Not an issue with Flink, but a missing closing parenthesis!
I am trying to follow the examples from the 1.16 (current, 2023-03-07) release.
The example is this:
stream.join(otherStream)
    .where(<KeySelector>)
    .equalTo(<KeySelector>)
    .window(<WindowAssigner>)
    .apply(<JoinFunction>);
My code looks like this:
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows
private val result = streamA.join(streamB)
    .where(new MySelector().getKey)
    .equalTo(new MySelector().getKey)
    .window(SlidingEventTimeWindows.of(Time.minutes(60), Time.minutes(60))
    .apply() // <- function does not exist!
IntelliJ offers a few completions there, but apply is not among them
(see screenshot). Any ideas?
build.sbt
ThisBuild / version := "0.1.0-SNAPSHOT"
ThisBuild / scalaVersion := "2.12.12"
lazy val root = (project in file("."))
  .settings(
    name := "apache-flink-example",
    libraryDependencies += "org.apache.flink" % "flink-streaming-scala_2.12" % "1.16.1",
    libraryDependencies += "org.apache.flink" % "flink-connector-kafka" % "1.16.1",
    libraryDependencies += "org.apache.flink" % "flink-connectors" % "1.16.1",
    libraryDependencies += "org.apache.flink" % "flink-core" % "1.16.1",
    libraryDependencies += "org.apache.flink" % "flink-java8" % "0.10.1",
    libraryDependencies += "org.apache.flink" % "flink-quickstart" % "1.16.1",
    libraryDependencies += "org.apache.flink" % "flink-libraries" % "1.16.1",
    libraryDependencies += "org.apache.flink" % "flink-clients" % "1.16.1",
    libraryDependencies += "org.json4s" %% "json4s-native" % "4.0.6",
  )
>Solution :
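The completions you see are the members of the SlidingEventTimeWindows
class, not of the joined stream, because the call to window( is never closed.
With the closing parenthesis missing, .apply() is read as a call on the result of
SlidingEventTimeWindows.of(...), which has no such method.
Add the missing parenthesis to the window call: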
private val result = streamA.join(streamB)
    .where(new MySelector().getKey)
    .equalTo(new MySelector().getKey)
    .window(SlidingEventTimeWindows.of(Time.minutes(60), Time.minutes(60))) // closing parenthesis added
    .apply() // pass your JoinFunction here, as in the documented example
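To see why the completion list changed, here is a minimal sketch in plain Scala with no Flink dependency. Assigner, WithWindow and Joined are hypothetical stand-ins invented for illustration, not Flink classes. They only mimic the shape of the chain: window(...) returns a type that has apply, while the argument passed to window does not.

```scala
// Hypothetical stand-ins; none of these are Flink classes.
object ParenthesisDemo {
  // Plays the role of the window assigner passed to window(...)
  final case class Assigner(sizeMinutes: Int) {
    def describe: String = s"window of $sizeMinutes min"
  }

  // Plays the role of the object returned by window(...); only this type has apply
  final class WithWindow(assigner: Assigner) {
    def apply(f: (Int, Int) => Int): Int = f(assigner.sizeMinutes, 1)
  }

  // Plays the role of the joined streams before a window is chosen
  final class Joined {
    def window(assigner: Assigner): WithWindow = new WithWindow(assigner)
  }

  // window(...) is closed before the dot, so apply resolves on WithWindow
  val closed: Int = new Joined().window(Assigner(60)).apply(_ + _)

  // If the dot comes before the closing parenthesis, the member is looked up
  // on the argument expression instead, so only Assigner members are offered:
  val onArgument: String = Assigner(60).describe

  // The equivalent of the code in the question does not compile, because an
  // Assigner instance has no apply method:
  // new Joined().window(Assigner(60).apply(_ + _))

  def main(args: Array[String]): Unit = {
    println(closed)
    println(onArgument)
  }
}
```

The same thing happened in the question: the IDE was completing on the value returned by SlidingEventTimeWindows.of(...) instead of on the value returned by window(...).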