From a39bc15cf6146edb945a1bc0ab91bfee2ba9a86e Mon Sep 17 00:00:00 2001
From: Wosin
Date: Sun, 19 Feb 2023 01:33:35 +0100
Subject: [PATCH 1/5] Implement aggregate on Windows with Semigroup.

---
 .../com.ariskk.flink4s/WindowedStream.scala  | 20 +++++++++-
 .../com.ariskk.flink4s/FlinkExecutor.scala   |  4 +-
 .../WindowedStreamSpec.scala                 | 40 +++++++++++++++++++
 3 files changed, 61 insertions(+), 3 deletions(-)

diff --git a/src/main/scala/com.ariskk.flink4s/WindowedStream.scala b/src/main/scala/com.ariskk.flink4s/WindowedStream.scala
index 386fb87..c649384 100644
--- a/src/main/scala/com.ariskk.flink4s/WindowedStream.scala
+++ b/src/main/scala/com.ariskk.flink4s/WindowedStream.scala
@@ -1,8 +1,10 @@
 package com.ariskk.flink4s
 
+import cats.Semigroup
+import cats.kernel.Monoid
 import org.apache.flink.streaming.api.datastream.{WindowedStream => JavaWStream}
 import org.apache.flink.streaming.api.windowing.windows.Window
-import org.apache.flink.api.common.functions.ReduceFunction
+import org.apache.flink.api.common.functions.{AggregateFunction, ReduceFunction}
 import org.apache.flink.api.common.typeinfo.TypeInformation
 
 final case class WindowedStream[T, K, W <: Window](stream: JavaWStream[T, K, W])(implicit
@@ -16,4 +18,20 @@ final case class WindowedStream[T, K, W <: Window](stream: JavaWStream[T, K, W])
     DataStream(stream.reduce(reducer))
   }
 
+  def aggregate[A, O](f: (A, T) => A)(outF: A => O)(implicit monoid: Monoid[A], aggTypeInfo: TypeInformation[A],
+      typeInformation: TypeInformation[O]): DataStream[O] = {
+    val reducer = new AggregateFunction[T, A, O] {
+
+      override def createAccumulator(): A = Monoid.empty[A]
+
+      override def add(value: T, accumulator: A): A = f(accumulator, value)
+
+      override def getResult(accumulator: A): O = outF(accumulator)
+
+      override def merge(a: A, b: A): A = Semigroup
+        .combine(a, b)
+    }
+    DataStream(stream.aggregate[A,O](reducer, aggTypeInfo, typeInformation))(typeInformation)
+  }
+
 }
diff --git a/src/test/scala/com.ariskk.flink4s/FlinkExecutor.scala b/src/test/scala/com.ariskk.flink4s/FlinkExecutor.scala
index 8d96484..bc6eecb 100644
--- a/src/test/scala/com.ariskk.flink4s/FlinkExecutor.scala
+++ b/src/test/scala/com.ariskk.flink4s/FlinkExecutor.scala
@@ -1,10 +1,10 @@
 package com.ariskk.flink4s
 
 import scala.util.Random
-
 import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration
 import org.apache.flink.test.util.MiniClusterWithClientResource
 import org.apache.flink.contrib.streaming.state.RocksDBStateBackend
+import org.apache.flink.runtime.state.hashmap.HashMapStateBackend
 
 object FlinkExecutor {
 
@@ -25,7 +25,7 @@
   def newEnv(parallelism: Int = 2): StreamExecutionEnvironment = {
     val env = StreamExecutionEnvironment.getExecutionEnvironment
     env.setParallelism(parallelism)
-    val rocks = new RocksDBStateBackend(s"file:///tmp/flink-${Random.nextString(10)}")
+    val rocks = new HashMapStateBackend()
     env.setStateBackend(rocks)
     env
   }
diff --git a/src/test/scala/com.ariskk.flink4s/WindowedStreamSpec.scala b/src/test/scala/com.ariskk.flink4s/WindowedStreamSpec.scala
index 5c06477..0fd4a8d 100644
--- a/src/test/scala/com.ariskk.flink4s/WindowedStreamSpec.scala
+++ b/src/test/scala/com.ariskk.flink4s/WindowedStreamSpec.scala
@@ -41,6 +41,46 @@ final class WindowedStreamSpec extends AnyFunSpec with Matchers {
 
       results.size should equal(4)
       results shouldBe List(50, 100, 100, 100)
     }
+
+    it("should apply aggregation to count window with slide") {
+      val env = FlinkExecutor.newEnv(parallelism = 1)
+      val stream = env.fromCollection((1 to 200).toList.map(_ => 1))
+      val results = stream
+        .keyBy(identity)
+        .countWindow(100, 50)
+        .aggregate[Int, Int]((agg, i) => agg + i)(identity(_))
+        .runAndCollect
+
+      results.size should equal(4)
+      results shouldBe List(50, 100, 100, 100)
+    }
+
+
+    it("should apply aggregation if aggregator and out are different types") {
+      val env = FlinkExecutor.newEnv(parallelism = 1)
+      val stream = env.fromCollection((1 to 200).toList.map(_ => 1))
+      val results = stream
+        .keyBy(identity)
+        .countWindow(100, 50)
+        .aggregate[Int, String]((agg, i) => agg + i)(_.toString)
+        .runAndCollect
+
+      results.size should equal(4)
+      results shouldBe List("50", "100", "100", "100")
+    }
+
+    it("should apply aggregation if aggregator and out are different types") {
+      val env = FlinkExecutor.newEnv(parallelism = 1)
+      val stream = env.fromCollection((1 to 200).toList.map(_ => 1))
+      val results = stream
+        .keyBy(identity)
+        .countWindow(100, 50)
+        .aggregate[Int, String]((agg, i) => agg + i)(_.toString)
+        .runAndCollect
+
+      results.size should equal(4)
+      results shouldBe List("50", "100", "100", "100")
+    }
   }
 }
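
A call-site sketch of the aggregate API as patch 1 leaves it (illustration only, not part of the series). It assumes cats >= 2.2.0, where Monoid[Int] (empty = 0) is already in implicit scope, and it reuses the FlinkExecutor, countWindow and runAndCollect helpers the spec imports:

    import com.ariskk.flink4s.TypeInfo.intTypeInfo

    val env = FlinkExecutor.newEnv(parallelism = 1)
    val sums = env
      .fromCollection(List.fill(200)(1))
      .keyBy(identity)
      .countWindow(100, 50)                               // sliding count window
      .aggregate[Int, Int]((acc, i) => acc + i)(identity) // seed comes from Monoid[Int].empty
      .runAndCollect                                      // List(50, 100, 100, 100), as in the spec
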
From 2205925212cbb5649dde2f060b75364bb115423d Mon Sep 17 00:00:00 2001
From: Wosin
Date: Sun, 19 Feb 2023 01:39:27 +0100
Subject: [PATCH 2/5] Implement aggregate on Windows with Semigroup.

---
 .../WindowedStreamSpec.scala | 18 +++++++++++++-----
 1 file changed, 13 insertions(+), 5 deletions(-)

diff --git a/src/test/scala/com.ariskk.flink4s/WindowedStreamSpec.scala b/src/test/scala/com.ariskk.flink4s/WindowedStreamSpec.scala
index 0fd4a8d..97db5c1 100644
--- a/src/test/scala/com.ariskk.flink4s/WindowedStreamSpec.scala
+++ b/src/test/scala/com.ariskk.flink4s/WindowedStreamSpec.scala
@@ -1,11 +1,12 @@
 package com.ariskk.flink4s
 
-import scala.collection.mutable.{Buffer => MutableBuffer}
+import cats.Monoid
+import cats.kernel.Semigroup
+import scala.collection.mutable.{Buffer => MutableBuffer}
 import org.scalatest.funspec.AnyFunSpec
 import org.scalatest.matchers.should.Matchers
 import org.apache.flink.streaming.api.functions.sink.SinkFunction
-
 import com.ariskk.flink4s.TypeInfo.{intTypeInfo, stringTypeInfo}
 
 final class WindowedStreamSpec extends AnyFunSpec with Matchers {
 
@@ -69,17 +70,24 @@ final class WindowedStreamSpec extends AnyFunSpec with Matchers {
      results.size should equal(4)
      results shouldBe List("50", "100", "100", "100")
    }
 
-    it("should apply aggregation if aggregator and out are different types") {
+    it("should apply aggregation based on Monoid") {
      val env = FlinkExecutor.newEnv(parallelism = 1)
      val stream = env.fromCollection((1 to 200).toList.map(_ => 1))
+      implicit val monoid: Monoid[Int] = new Monoid[Int] {
+
+        override def empty: Int = 5
+
+        override def combine(x: Int, y: Int): Int = x + y
+      }
+
      val results = stream
        .keyBy(identity)
        .countWindow(100, 50)
-        .aggregate[Int, String]((agg, i) => agg + i)(_.toString)
+        .aggregate[Int, Int]((agg, i) => agg + i)(identity(_))
        .runAndCollect
 
      results.size should equal(4)
-      results shouldBe List("50", "100", "100", "100")
+      results shouldBe List(55, 105, 105, 105)
    }
  }
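
The expected values in the new Monoid test follow from createAccumulator delegating to Monoid.empty: each window's accumulator starts at 5 rather than 0, so the half-filled first sliding window sums to 5 + 50 = 55 and the full windows to 5 + 100 = 105. The same arithmetic in plain Scala, with no Flink involved (names are illustrative):

    import cats.Monoid

    // Deliberately non-lawful Monoid: empty = 5 is not an identity for +,
    // which is exactly what makes the seed visible in the window sums.
    val monoid5: Monoid[Int] = new Monoid[Int] {
      def empty: Int = 5
      def combine(x: Int, y: Int): Int = x + y
    }

    List.fill(50)(1).foldLeft(monoid5.empty)(monoid5.combine)  // 55
    List.fill(100)(1).foldLeft(monoid5.empty)(monoid5.combine) // 105
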
From 2260e2b87e529e9a938e8ce2545a60ba9fb915e7 Mon Sep 17 00:00:00 2001
From: Wosin
Date: Sun, 19 Feb 2023 01:41:02 +0100
Subject: [PATCH 3/5] Revert local run change.

---
 src/main/scala/com.ariskk.flink4s/WindowedStream.scala | 4 ++--
 src/test/scala/com.ariskk.flink4s/FlinkExecutor.scala  | 4 ++--
 2 files changed, 4 insertions(+), 4 deletions(-)

diff --git a/src/main/scala/com.ariskk.flink4s/WindowedStream.scala b/src/main/scala/com.ariskk.flink4s/WindowedStream.scala
index c649384..a0e5397 100644
--- a/src/main/scala/com.ariskk.flink4s/WindowedStream.scala
+++ b/src/main/scala/com.ariskk.flink4s/WindowedStream.scala
@@ -18,7 +18,7 @@ final case class WindowedStream[T, K, W <: Window](stream: JavaWStream[T, K, W])
     DataStream(stream.reduce(reducer))
   }
 
-  def aggregate[A, O](f: (A, T) => A)(outF: A => O)(implicit monoid: Monoid[A], aggTypeInfo: TypeInformation[A],
+  def aggregate[A, O](f: (A, T) => A)(outF: A => O)(implicit monoid: Monoid[A], aggTypeInformation: TypeInformation[A],
       typeInformation: TypeInformation[O]): DataStream[O] = {
     val reducer = new AggregateFunction[T, A, O] {
 
@@ -31,7 +31,7 @@ final case class WindowedStream[T, K, W <: Window](stream: JavaWStream[T, K, W])
       override def merge(a: A, b: A): A = Semigroup
         .combine(a, b)
     }
-    DataStream(stream.aggregate[A,O](reducer, aggTypeInfo, typeInformation))(typeInformation)
+    DataStream(stream.aggregate[A,O](reducer, aggTypeInformation, typeInformation))(typeInformation)
   }
 
 }
diff --git a/src/test/scala/com.ariskk.flink4s/FlinkExecutor.scala b/src/test/scala/com.ariskk.flink4s/FlinkExecutor.scala
index bc6eecb..8d96484 100644
--- a/src/test/scala/com.ariskk.flink4s/FlinkExecutor.scala
+++ b/src/test/scala/com.ariskk.flink4s/FlinkExecutor.scala
@@ -1,10 +1,10 @@
 package com.ariskk.flink4s
 
 import scala.util.Random
+
 import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration
 import org.apache.flink.test.util.MiniClusterWithClientResource
 import org.apache.flink.contrib.streaming.state.RocksDBStateBackend
-import org.apache.flink.runtime.state.hashmap.HashMapStateBackend
 
 object FlinkExecutor {
 
@@ -25,7 +25,7 @@
   def newEnv(parallelism: Int = 2): StreamExecutionEnvironment = {
     val env = StreamExecutionEnvironment.getExecutionEnvironment
     env.setParallelism(parallelism)
-    val rocks = new HashMapStateBackend()
+    val rocks = new RocksDBStateBackend(s"file:///tmp/flink-${Random.nextString(10)}")
     env.setStateBackend(rocks)
     env
   }
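
One caveat on the restored RocksDB line: scala.util.Random.nextString draws arbitrary Unicode characters, so the checkpoint URI can end up with code points that are awkward or invalid in file paths. An alphanumeric suffix sidesteps that; a sketch of the alternative (an aside, not something this series changes):

    // Hypothetical variant of the restored line, using only [a-zA-Z0-9] in the path.
    val suffix = scala.util.Random.alphanumeric.take(10).mkString
    val rocks  = new RocksDBStateBackend(s"file:///tmp/flink-$suffix")
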
From 7efabbfcfcfd1a3c1eab203085665152ee0230e9 Mon Sep 17 00:00:00 2001
From: Wosin
Date: Sun, 19 Feb 2023 01:43:55 +0100
Subject: [PATCH 4/5] Some cleanup.

---
 .../scala/com.ariskk.flink4s/WindowedStream.scala | 13 +++++++------
 1 file changed, 7 insertions(+), 6 deletions(-)

diff --git a/src/main/scala/com.ariskk.flink4s/WindowedStream.scala b/src/main/scala/com.ariskk.flink4s/WindowedStream.scala
index a0e5397..7239e76 100644
--- a/src/main/scala/com.ariskk.flink4s/WindowedStream.scala
+++ b/src/main/scala/com.ariskk.flink4s/WindowedStream.scala
@@ -18,20 +18,21 @@ final case class WindowedStream[T, K, W <: Window](stream: JavaWStream[T, K, W])
     DataStream(stream.reduce(reducer))
   }
 
-  def aggregate[A, O](f: (A, T) => A)(outF: A => O)(implicit monoid: Monoid[A], aggTypeInformation: TypeInformation[A],
-      typeInformation: TypeInformation[O]): DataStream[O] = {
+  def aggregate[A, O](aggregateF: (A, T) => A)(outputF: A => O)(implicit monoid: Monoid[A],
+      aggTypeInformation: TypeInformation[A],
+      typeInformation: TypeInformation[O]): DataStream[O] = {
     val reducer = new AggregateFunction[T, A, O] {
 
       override def createAccumulator(): A = Monoid.empty[A]
 
-      override def add(value: T, accumulator: A): A = f(accumulator, value)
+      override def add(value: T, accumulator: A): A = aggregateF(accumulator, value)
 
-      override def getResult(accumulator: A): O = outF(accumulator)
+      override def getResult(accumulator: A): O = outputF(accumulator)
 
-      override def merge(a: A, b: A): A = Semigroup
+      override def merge(a: A, b: A): A = Monoid
         .combine(a, b)
     }
-    DataStream(stream.aggregate[A,O](reducer, aggTypeInformation, typeInformation))(typeInformation)
+    DataStream(stream.aggregate[A,O](reducer, aggTypeInformation, typeInformation))
   }
 
 }

From 26b32394df1d270a4ace02a63d7e12488f838427 Mon Sep 17 00:00:00 2001
From: Wosin
Date: Thu, 23 Feb 2023 18:42:24 +0100
Subject: [PATCH 5/5] Fix aggregate + remove monoid.

---
 .../scala/com.ariskk.flink4s/WindowedStream.scala | 15 ++++++++-------
 1 file changed, 8 insertions(+), 7 deletions(-)

diff --git a/src/main/scala/com.ariskk.flink4s/WindowedStream.scala b/src/main/scala/com.ariskk.flink4s/WindowedStream.scala
index 7239e76..664eb58 100644
--- a/src/main/scala/com.ariskk.flink4s/WindowedStream.scala
+++ b/src/main/scala/com.ariskk.flink4s/WindowedStream.scala
@@ -18,21 +18,22 @@ final case class WindowedStream[T, K, W <: Window](stream: JavaWStream[T, K, W])
     DataStream(stream.reduce(reducer))
   }
 
-  def aggregate[A, O](aggregateF: (A, T) => A)(outputF: A => O)(implicit monoid: Monoid[A],
-      aggTypeInformation: TypeInformation[A],
-      typeInformation: TypeInformation[O]): DataStream[O] = {
+  def aggregate[A, O](agg: A, mergeF: (A, A) => A)(aggregateF: (A, T) => A)(outputF: A => O)(
+      implicit
+      aggTypeInformation: TypeInformation[A],
+      typeInformation: TypeInformation[O]
+  ): DataStream[O] = {
     val reducer = new AggregateFunction[T, A, O] {
 
-      override def createAccumulator(): A = Monoid.empty[A]
+      override def createAccumulator(): A = agg
 
       override def add(value: T, accumulator: A): A = aggregateF(accumulator, value)
 
       override def getResult(accumulator: A): O = outputF(accumulator)
 
-      override def merge(a: A, b: A): A = Monoid
-        .combine(a, b)
+      override def merge(a: A, b: A): A = mergeF(a, b)
     }
-    DataStream(stream.aggregate[A,O](reducer, aggTypeInformation, typeInformation))
+    DataStream(stream.aggregate[A, O](reducer, aggTypeInformation, typeInformation))
   }
 
 }
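
For reference, a call-site sketch of the final signature from patch 5 (illustration only, not part of the series; it assumes the spec's intTypeInfo and stringTypeInfo implicits are in scope). The seed accumulator and the merge function are now explicit arguments, so callers written against the earlier Monoid-based shape, such as the spec's .aggregate[Int, Int]((agg, i) => agg + i)(identity(_)), would need the extra argument list:

    val env = FlinkExecutor.newEnv(parallelism = 1)
    val strings = env
      .fromCollection(List.fill(200)(1))
      .keyBy(identity)
      .countWindow(100, 50)
      // seed = 0 and merge = _ + _ replace the removed Monoid instance
      .aggregate[Int, String](0, _ + _)((acc, i) => acc + i)(_.toString)
      .runAndCollect // List("50", "100", "100", "100")
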