Apache flink join: ".apply()" does not exist? (scala)

Advertisements

Not an issue with flink, but missing a closing bracket!

I am trying to follow the examples from the 1.16 (current, 2023-03-07) release.

The example is this:

stream.join(otherStream)
    .where(<KeySelector>)
    .equalTo(<KeySelector>)
    .window(<WindowAssigner>)
    .apply(<JoinFunction>);

My code looks like this:

import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows

private val result = streamA.join(streamB)
    .where(new MySelector().getKey)
    .equalTo(new MySelector().getKey)
    .window(SlidingEventTimeWindows.of(Time.minutes(60), Time.minutes(60))
    .apply()  // <- function does not exist! 
  

Intellij gives a couple of options there but no apply (see screenshot). Any ideas?

build.sbt

ThisBuild / version := "0.1.0-SNAPSHOT"

ThisBuild / scalaVersion := "2.12.12"

lazy val root = (project in file("."))
  .settings(
    name := "apache-flink-example",
    libraryDependencies += "org.apache.flink" % "flink-streaming-scala_2.12" % "1.16.1",
    libraryDependencies += "org.apache.flink" % "flink-connector-kafka" % "1.16.1",
    libraryDependencies += "org.apache.flink" % "flink-connectors" % "1.16.1",
    libraryDependencies += "org.apache.flink" % "flink-core" % "1.16.1",
    libraryDependencies += "org.apache.flink" % "flink-java8" % "0.10.1",
    libraryDependencies += "org.apache.flink" % "flink-quickstart" % "1.16.1",
    libraryDependencies += "org.apache.flink" % "flink-libraries" % "1.16.1",
    libraryDependencies += "org.apache.flink" % "flink-clients" % "1.16.1",
    libraryDependencies += "org.json4s" %% "json4s-native" % "4.0.6",
  )

>Solution :

The methods you see are the methods available for the SlidingEventTimeWindows class, because the function is not closed.

There is a parenthesis missing in the window function. Try this:

private val result = streamA.join(streamB)
    .where(new MySelector().getKey)
    .equalTo(new MySelector().getKey)
    .window(SlidingEventTimeWindows.of(Time.minutes(60), Time.minutes(60))) //check the parenthesis
    .apply()  

Leave a ReplyCancel reply