From 4dbc8084584492b1d57550480ef8b20317050315 Mon Sep 17 00:00:00 2001 From: Timur Abishev Date: Thu, 15 Jun 2017 17:24:42 -0700 Subject: [PATCH 1/4] Add some performance testing to OperationContainer --- build.sbt | 7 +- project/plugins.sbt | 2 +- .../executor/ComposedFlatMapPerformance.scala | 82 +++++++++++++ .../ComposedFlatMapWithTimePerformance.scala | 109 ++++++++++++++++++ .../executor/IdentityFlatMapPerformance.scala | 52 +++++++++ .../online/executor/TestUtils.scala | 43 +++++++ .../online/executor/SimpleFlatMap.scala | 11 ++ 7 files changed, 304 insertions(+), 2 deletions(-) create mode 100644 summingbird-online-perf/src/main/scala/com/twitter/summingbird/online/executor/ComposedFlatMapPerformance.scala create mode 100644 summingbird-online-perf/src/main/scala/com/twitter/summingbird/online/executor/ComposedFlatMapWithTimePerformance.scala create mode 100644 summingbird-online-perf/src/main/scala/com/twitter/summingbird/online/executor/IdentityFlatMapPerformance.scala create mode 100644 summingbird-online-perf/src/main/scala/com/twitter/summingbird/online/executor/TestUtils.scala create mode 100644 summingbird-online/src/main/scala/com/twitter/summingbird/online/executor/SimpleFlatMap.scala diff --git a/build.sbt b/build.sbt index 690ac8feb..e21c45764 100644 --- a/build.sbt +++ b/build.sbt @@ -174,7 +174,8 @@ lazy val summingbird = Project( summingbirdBuilder, summingbirdChill, summingbirdExample, - summingbirdCoreTest + summingbirdCoreTest, + summingbirdOnlinePerf ) /** @@ -314,6 +315,10 @@ lazy val summingbirdOnline = module("online").settings( summingbirdClient ) +lazy val summingbirdOnlinePerf = module("online-perf") + .enablePlugins(JmhPlugin) + .dependsOn(summingbirdOnline) + lazy val summingbirdStorm = module("storm").settings( parallelExecution in Test := false, libraryDependencies ++= Seq( diff --git a/project/plugins.sbt b/project/plugins.sbt index 07e60b7e7..7bb926a63 100644 --- a/project/plugins.sbt +++ b/project/plugins.sbt @@ -6,4 +6,4 @@ addSbtPlugin("com.typesafe" % "sbt-mima-plugin" % "0.1.12") addSbtPlugin("com.typesafe.sbt" % "sbt-ghpages" % "0.5.4") addSbtPlugin("org.scoverage" % "sbt-scoverage" % "1.5.0") addSbtPlugin("org.xerial.sbt" % "sbt-sonatype" % "1.0") -addSbtPlugin("pl.project13.scala" % "sbt-jmh" % "0.2.18") +addSbtPlugin("pl.project13.scala" % "sbt-jmh" % "0.2.24") diff --git a/summingbird-online-perf/src/main/scala/com/twitter/summingbird/online/executor/ComposedFlatMapPerformance.scala b/summingbird-online-perf/src/main/scala/com/twitter/summingbird/online/executor/ComposedFlatMapPerformance.scala new file mode 100644 index 000000000..a3dd7fb17 --- /dev/null +++ b/summingbird-online-perf/src/main/scala/com/twitter/summingbird/online/executor/ComposedFlatMapPerformance.scala @@ -0,0 +1,82 @@ +package com.twitter.summingbird.online.executor + +import com.twitter.conversions.time.longToTimeableNumber +import com.twitter.summingbird.online.FlatMapOperation +import com.twitter.summingbird.online.option.{MaxEmitPerExecute, MaxFutureWaitTime, MaxWaitingFutures} +import java.util.concurrent.TimeUnit +import org.openjdk.jmh.annotations.{Benchmark, BenchmarkMode, Mode, OutputTimeUnit, Scope, State} +import org.openjdk.jmh.infra.Blackhole + +/** + * Command to run (should be executed from sbt with project summingbird-online-perf): + * jmh:run -i 10 -wi 10 -f1 -t1 ComposedFlatMapPerformance + * + * Benchmark Mode Cnt Score Error Units + * ComposedFlatMapPerformance.measureDirect avgt 10 239.003 ± 19.990 ms/op + * ComposedFlatMapPerformance.measureDirectOperationContainer avgt 10 366.307 ± 109.717 ms/op + * ComposedFlatMapPerformance.measureOperationContainerComposeThroughOperation avgt 10 2733.537 ± 63.415 ms/op + * ComposedFlatMapPerformance.measureOperationContainerDirectCompose avgt 10 1084.457 ± 22.968 ms/op + */ +@State(Scope.Thread) +class ComposedFlatMapPerformance { + val size = 1000000 + val input = Array.range(0, size) + + val f1: (Int => TraversableOnce[Int]) = x => Some(x * 2) + val f2: (Int => TraversableOnce[(Int, Int)]) = x => Some((x, x)) + val f3: (((Int, Int)) => TraversableOnce[Int]) = pair => List( + pair._1, + pair._2, + pair._1 + pair._2, + pair._1 * pair._2 + ) + + val composed: (Int) => TraversableOnce[Int] = + flatMap[Int, (Int, Int), Int](flatMap[Int, Int, (Int, Int)](f1, f2), f3) + val composedContainerDirect: OperationContainer[Int, Int, InputState[Int]] = + new SimpleFlatMap(composed) + val composedContainer: OperationContainer[Int, Int, InputState[Int]] = + new IntermediateFlatMap( + FlatMapOperation.apply(f1).flatMap(f2).flatMap(f3), + MaxWaitingFutures(1000), + MaxFutureWaitTime(1000.millis), + MaxEmitPerExecute(Int.MaxValue) + ) + val directlyComposedContainer: OperationContainer[Int, Int, InputState[Int]] = + new IntermediateFlatMap( + FlatMapOperation.apply(composed), + MaxWaitingFutures(1000), + MaxFutureWaitTime(1000.millis), + MaxEmitPerExecute(Int.MaxValue) + ) + + @Benchmark + @BenchmarkMode(Array(Mode.AverageTime)) + @OutputTimeUnit(TimeUnit.MILLISECONDS) + def measureDirect(bh: Blackhole): Unit = + input.foreach(composed.apply(_).foreach(bh.consume)) + + @Benchmark + @BenchmarkMode(Array(Mode.AverageTime)) + @OutputTimeUnit(TimeUnit.MILLISECONDS) + def measureDirectOperationContainer(bh: Blackhole): Unit = + TestUtils.testOperationContainer(composedContainerDirect, input, bh) + + @Benchmark + @BenchmarkMode(Array(Mode.AverageTime)) + @OutputTimeUnit(TimeUnit.MILLISECONDS) + def measureOperationContainerComposeThroughOperation(bh: Blackhole): Unit = + TestUtils.testOperationContainer(composedContainer, input, bh) + + @Benchmark + @BenchmarkMode(Array(Mode.AverageTime)) + @OutputTimeUnit(TimeUnit.MILLISECONDS) + def measureOperationContainerDirectCompose(bh: Blackhole): Unit = + TestUtils.testOperationContainer(directlyComposedContainer, input, bh) + + def flatMap[A, B, C]( + f1: A => TraversableOnce[B], + f2: B => TraversableOnce[C] + ): (A => TraversableOnce[C]) = + x => f1(x).flatMap(f2) +} diff --git a/summingbird-online-perf/src/main/scala/com/twitter/summingbird/online/executor/ComposedFlatMapWithTimePerformance.scala b/summingbird-online-perf/src/main/scala/com/twitter/summingbird/online/executor/ComposedFlatMapWithTimePerformance.scala new file mode 100644 index 000000000..8ce20667d --- /dev/null +++ b/summingbird-online-perf/src/main/scala/com/twitter/summingbird/online/executor/ComposedFlatMapWithTimePerformance.scala @@ -0,0 +1,109 @@ +package com.twitter.summingbird.online.executor + +import com.twitter.conversions.time.longToTimeableNumber +import com.twitter.summingbird.batch.Timestamp +import com.twitter.summingbird.online.FlatMapOperation +import com.twitter.summingbird.online.option.{MaxEmitPerExecute, MaxFutureWaitTime, MaxWaitingFutures} +import java.util.concurrent.TimeUnit +import org.openjdk.jmh.annotations.{Benchmark, BenchmarkMode, Mode, OutputTimeUnit, Scope, State} +import org.openjdk.jmh.infra.Blackhole + +/** + * Command to run (should be executed from sbt with project summingbird-online-perf): + * jmh:run -i 10 -wi 10 -f1 -t1 ComposedFlatMapWithTimePerformance + * + * Benchmark Mode Cnt Score Error Units + * ComposedFlatMapWithTimePerformance.measureDirectComplex avgt 10 340.341 ± 4.343 ms/op + * ComposedFlatMapWithTimePerformance.measureDirectOperationContainer avgt 10 396.599 ± 9.106 ms/op + * ComposedFlatMapWithTimePerformance.measureDirectSimple avgt 10 279.628 ± 6.175 ms/op + * ComposedFlatMapWithTimePerformance.measureOperationContainerComposeThroughOperation avgt 10 2978.141 ± 75.213 ms/op + * ComposedFlatMapWithTimePerformance.measureOperationContainerDirectCompose avgt 10 1198.601 ± 54.871 ms/op + */ +@State(Scope.Thread) +class ComposedFlatMapWithTimePerformance { + val size = 1000000 + val input = (0 to size).map(i => (Timestamp(0), i)).toArray + + val f1: (Int => TraversableOnce[Int]) = x => Some(x * 2) + val f2: (Int => TraversableOnce[(Int, Int)]) = x => Some((x, x)) + val f3: (((Int, Int)) => TraversableOnce[Int]) = pair => List( + pair._1, + pair._2, + pair._1 + pair._2, + pair._1 * pair._2 + ) + + val composed: (Int) => TraversableOnce[Int] = + flatMap[Int, (Int, Int), Int](flatMap[Int, Int, (Int, Int)](f1, f2), f3) + val composedOp: FlatMapOperation[Int, Int] = + FlatMapOperation.apply(f1).flatMap(f2).flatMap(f3) + + val composedWithTimeSimple: ((Timestamp, Int)) => TraversableOnce[(Timestamp, Int)] = + withTime(composed) + val composedWithTimeComplex: ((Timestamp, Int)) => TraversableOnce[(Timestamp, Int)] = + flatMap( + flatMap(withTime(f1), withTime(f2)), + withTime(f3) + ) + val composedContainerDirect: OperationContainer[(Timestamp, Int), (Timestamp, Int), InputState[Int]] = + new SimpleFlatMap(composedWithTimeComplex) + val composedContainerComposeThroughOperation: + OperationContainer[(Timestamp, Int), (Timestamp, Int), InputState[Int]] = new IntermediateFlatMap( + withTimeOp(composedOp), + MaxWaitingFutures(1000), + MaxFutureWaitTime(1000.millis), + MaxEmitPerExecute(Int.MaxValue) + ) + val composedContainerDirectCompose: + OperationContainer[(Timestamp, Int), (Timestamp, Int), InputState[Int]] = new IntermediateFlatMap( + FlatMapOperation.apply(composedWithTimeComplex), + MaxWaitingFutures(1000), + MaxFutureWaitTime(1000.millis), + MaxEmitPerExecute(Int.MaxValue) + ) + + @Benchmark + @BenchmarkMode(Array(Mode.AverageTime)) + @OutputTimeUnit(TimeUnit.MILLISECONDS) + def measureDirectSimple(bh: Blackhole): Unit = + input.foreach(composedWithTimeSimple.apply(_).foreach(bh.consume)) + + @Benchmark + @BenchmarkMode(Array(Mode.AverageTime)) + @OutputTimeUnit(TimeUnit.MILLISECONDS) + def measureDirectComplex(bh: Blackhole): Unit = + input.foreach(composedWithTimeComplex.apply(_).foreach(bh.consume)) + + @Benchmark + @BenchmarkMode(Array(Mode.AverageTime)) + @OutputTimeUnit(TimeUnit.MILLISECONDS) + def measureDirectOperationContainer(bh: Blackhole): Unit = + TestUtils.testOperationContainer(composedContainerDirect, input, bh) + + @Benchmark + @BenchmarkMode(Array(Mode.AverageTime)) + @OutputTimeUnit(TimeUnit.MILLISECONDS) + def measureOperationContainerComposeThroughOperation(bh: Blackhole): Unit = + TestUtils.testOperationContainer(composedContainerComposeThroughOperation, input, bh) + + @Benchmark + @BenchmarkMode(Array(Mode.AverageTime)) + @OutputTimeUnit(TimeUnit.MILLISECONDS) + def measureOperationContainerDirectCompose(bh: Blackhole): Unit = + TestUtils.testOperationContainer(composedContainerDirectCompose, input, bh) + + def flatMap[A, B, C]( + f1: A => TraversableOnce[B], + f2: B => TraversableOnce[C] + ): (A => TraversableOnce[C]) = + x => f1(x).flatMap(f2) + + def withTime[A, B](f: A => TraversableOnce[B]): ((Timestamp, A)) => TraversableOnce[(Timestamp, B)] = { + case (timestamp, v) => f.apply(v).map((timestamp, _)) + } + + def withTimeOp[A, B](f: FlatMapOperation[A, B]): FlatMapOperation[(Timestamp, A), (Timestamp, B)] = + FlatMapOperation.generic { case (timestamp, v) => + f.apply(v).map(elements => elements.map((timestamp, _))) + } +} diff --git a/summingbird-online-perf/src/main/scala/com/twitter/summingbird/online/executor/IdentityFlatMapPerformance.scala b/summingbird-online-perf/src/main/scala/com/twitter/summingbird/online/executor/IdentityFlatMapPerformance.scala new file mode 100644 index 000000000..dc6fb9104 --- /dev/null +++ b/summingbird-online-perf/src/main/scala/com/twitter/summingbird/online/executor/IdentityFlatMapPerformance.scala @@ -0,0 +1,52 @@ +package com.twitter.summingbird.online.executor + +import com.twitter.conversions.time.longToTimeableNumber +import com.twitter.summingbird.online.FlatMapOperation +import com.twitter.summingbird.online.option.{MaxEmitPerExecute, MaxFutureWaitTime, MaxWaitingFutures} +import java.util.concurrent.TimeUnit +import org.openjdk.jmh.annotations.{Benchmark, BenchmarkMode, Mode, OutputTimeUnit, Scope, State} +import org.openjdk.jmh.infra.Blackhole + +/** + * Command to run (should be executed from sbt with project summingbird-online-perf): + * jmh:run -i 10 -wi 10 -f1 -t1 IdentityFlatMapPerformance + * + * Benchmark Mode Cnt Score Error Units + * IdentityFlatMapPerformance.measureDirect avgt 10 42.184 ± 3.518 ms/op + * IdentityFlatMapPerformance.measureDirectOperationContainer avgt 10 87.605 ± 3.843 ms/op + * IdentityFlatMapPerformance.measureOperationContainer avgt 10 817.659 ± 36.591 ms/op + */ +@State(Scope.Thread) +class IdentityFlatMapPerformance { + val size = 1000000 + val input = Array.range(0, size) + + val f: (Int => TraversableOnce[Int]) = x => Iterator(x) + val containerDirect: OperationContainer[Int, Int, InputState[Int]] = + new SimpleFlatMap(f) + val container: OperationContainer[Int, Int, InputState[Int]] = + new IntermediateFlatMap( + FlatMapOperation.apply(f), + MaxWaitingFutures(1000), + MaxFutureWaitTime(1000.millis), + MaxEmitPerExecute(Int.MaxValue) + ) + + @Benchmark + @BenchmarkMode(Array(Mode.AverageTime)) + @OutputTimeUnit(TimeUnit.MILLISECONDS) + def measureDirect(bh: Blackhole): Unit = + input.foreach(f.apply(_).foreach(bh.consume)) + + @Benchmark + @BenchmarkMode(Array(Mode.AverageTime)) + @OutputTimeUnit(TimeUnit.MILLISECONDS) + def measureDirectOperationContainer(bh: Blackhole): Unit = + TestUtils.testOperationContainer(containerDirect, input, bh) + + @Benchmark + @BenchmarkMode(Array(Mode.AverageTime)) + @OutputTimeUnit(TimeUnit.MILLISECONDS) + def measureOperationContainer(bh: Blackhole): Unit = + TestUtils.testOperationContainer(container, input, bh) +} diff --git a/summingbird-online-perf/src/main/scala/com/twitter/summingbird/online/executor/TestUtils.scala b/summingbird-online-perf/src/main/scala/com/twitter/summingbird/online/executor/TestUtils.scala new file mode 100644 index 000000000..83153099a --- /dev/null +++ b/summingbird-online-perf/src/main/scala/com/twitter/summingbird/online/executor/TestUtils.scala @@ -0,0 +1,43 @@ +package com.twitter.summingbird.online.executor + +import org.openjdk.jmh.infra.Blackhole +import scala.concurrent.duration.DurationInt +import scala.util.{Failure, Success} + +object TestUtils { + val TickDelay = 5.millis + + def testOperationContainer[I, O]( + container: OperationContainer[I, O, InputState[Int]], + inputs: Array[I], + bh: Blackhole + ): Unit = { + var left = inputs.length + for ( + input <- inputs; + emitted <- container.execute(InputState[Int](1), input) + ) { + emitted match { + case (states, output) => + states.foreach { _ => left -= 1 } + output match { + case Success(value) => value.foreach(bh.consume) + case Failure(exception) => bh.consume(exception) + } + } + } + while (left != 0) { + Thread.sleep(TickDelay.toMillis) + for (emitted <- container.executeTick) { + emitted match { + case (states, output) => + states.foreach { _ => left -= 1 } + output match { + case Success(value) => value.foreach(bh.consume) + case Failure(exception) => bh.consume(exception) + } + } + } + } + } +} diff --git a/summingbird-online/src/main/scala/com/twitter/summingbird/online/executor/SimpleFlatMap.scala b/summingbird-online/src/main/scala/com/twitter/summingbird/online/executor/SimpleFlatMap.scala new file mode 100644 index 000000000..8c951a5de --- /dev/null +++ b/summingbird-online/src/main/scala/com/twitter/summingbird/online/executor/SimpleFlatMap.scala @@ -0,0 +1,11 @@ +package com.twitter.summingbird.online.executor +import chain.Chain +import scala.util.{Success, Try} + +class SimpleFlatMap[I, O, S](f: I => TraversableOnce[O]) extends OperationContainer[I, O, S] { + override def executeTick: TraversableOnce[(Chain[S], Try[TraversableOnce[O]])] = + None + + override def execute(state: S,data: I): TraversableOnce[(Chain[S], Try[TraversableOnce[O]])] = + Some((Chain.single(state), Success(f(data)))) +} From a2b653af98c11fabb3aca0823f9a89ad753082ce Mon Sep 17 00:00:00 2001 From: Timur Abishev Date: Thu, 6 Jul 2017 16:22:19 -0700 Subject: [PATCH 2/4] Use Try instead of Success --- .../online/executor/ComposedFlatMapPerformance.scala | 2 +- .../online/executor/ComposedFlatMapWithTimePerformance.scala | 2 +- .../online/executor/IdentityFlatMapPerformance.scala | 2 +- .../twitter/summingbird/online/executor/SimpleFlatMap.scala | 4 ++-- 4 files changed, 5 insertions(+), 5 deletions(-) diff --git a/summingbird-online-perf/src/main/scala/com/twitter/summingbird/online/executor/ComposedFlatMapPerformance.scala b/summingbird-online-perf/src/main/scala/com/twitter/summingbird/online/executor/ComposedFlatMapPerformance.scala index a3dd7fb17..50191162a 100644 --- a/summingbird-online-perf/src/main/scala/com/twitter/summingbird/online/executor/ComposedFlatMapPerformance.scala +++ b/summingbird-online-perf/src/main/scala/com/twitter/summingbird/online/executor/ComposedFlatMapPerformance.scala @@ -13,7 +13,7 @@ import org.openjdk.jmh.infra.Blackhole * * Benchmark Mode Cnt Score Error Units * ComposedFlatMapPerformance.measureDirect avgt 10 239.003 ± 19.990 ms/op - * ComposedFlatMapPerformance.measureDirectOperationContainer avgt 10 366.307 ± 109.717 ms/op + * ComposedFlatMapPerformance.measureDirectOperationContainer avgt 10 295.847 ± 14.774 ms/op * ComposedFlatMapPerformance.measureOperationContainerComposeThroughOperation avgt 10 2733.537 ± 63.415 ms/op * ComposedFlatMapPerformance.measureOperationContainerDirectCompose avgt 10 1084.457 ± 22.968 ms/op */ diff --git a/summingbird-online-perf/src/main/scala/com/twitter/summingbird/online/executor/ComposedFlatMapWithTimePerformance.scala b/summingbird-online-perf/src/main/scala/com/twitter/summingbird/online/executor/ComposedFlatMapWithTimePerformance.scala index 8ce20667d..1fdf6ac5c 100644 --- a/summingbird-online-perf/src/main/scala/com/twitter/summingbird/online/executor/ComposedFlatMapWithTimePerformance.scala +++ b/summingbird-online-perf/src/main/scala/com/twitter/summingbird/online/executor/ComposedFlatMapWithTimePerformance.scala @@ -14,7 +14,7 @@ import org.openjdk.jmh.infra.Blackhole * * Benchmark Mode Cnt Score Error Units * ComposedFlatMapWithTimePerformance.measureDirectComplex avgt 10 340.341 ± 4.343 ms/op - * ComposedFlatMapWithTimePerformance.measureDirectOperationContainer avgt 10 396.599 ± 9.106 ms/op + * ComposedFlatMapWithTimePerformance.measureDirectOperationContainer avgt 10 386.593 ± 6.474 ms/op * ComposedFlatMapWithTimePerformance.measureDirectSimple avgt 10 279.628 ± 6.175 ms/op * ComposedFlatMapWithTimePerformance.measureOperationContainerComposeThroughOperation avgt 10 2978.141 ± 75.213 ms/op * ComposedFlatMapWithTimePerformance.measureOperationContainerDirectCompose avgt 10 1198.601 ± 54.871 ms/op diff --git a/summingbird-online-perf/src/main/scala/com/twitter/summingbird/online/executor/IdentityFlatMapPerformance.scala b/summingbird-online-perf/src/main/scala/com/twitter/summingbird/online/executor/IdentityFlatMapPerformance.scala index dc6fb9104..abc50ff5d 100644 --- a/summingbird-online-perf/src/main/scala/com/twitter/summingbird/online/executor/IdentityFlatMapPerformance.scala +++ b/summingbird-online-perf/src/main/scala/com/twitter/summingbird/online/executor/IdentityFlatMapPerformance.scala @@ -13,7 +13,7 @@ import org.openjdk.jmh.infra.Blackhole * * Benchmark Mode Cnt Score Error Units * IdentityFlatMapPerformance.measureDirect avgt 10 42.184 ± 3.518 ms/op - * IdentityFlatMapPerformance.measureDirectOperationContainer avgt 10 87.605 ± 3.843 ms/op + * IdentityFlatMapPerformance.measureDirectOperationContainer avgt 10 101.944 ± 3.956 ms/op * IdentityFlatMapPerformance.measureOperationContainer avgt 10 817.659 ± 36.591 ms/op */ @State(Scope.Thread) diff --git a/summingbird-online/src/main/scala/com/twitter/summingbird/online/executor/SimpleFlatMap.scala b/summingbird-online/src/main/scala/com/twitter/summingbird/online/executor/SimpleFlatMap.scala index 8c951a5de..0090e9471 100644 --- a/summingbird-online/src/main/scala/com/twitter/summingbird/online/executor/SimpleFlatMap.scala +++ b/summingbird-online/src/main/scala/com/twitter/summingbird/online/executor/SimpleFlatMap.scala @@ -1,11 +1,11 @@ package com.twitter.summingbird.online.executor import chain.Chain -import scala.util.{Success, Try} +import scala.util.Try class SimpleFlatMap[I, O, S](f: I => TraversableOnce[O]) extends OperationContainer[I, O, S] { override def executeTick: TraversableOnce[(Chain[S], Try[TraversableOnce[O]])] = None override def execute(state: S,data: I): TraversableOnce[(Chain[S], Try[TraversableOnce[O]])] = - Some((Chain.single(state), Success(f(data)))) + Some((Chain.single(state), Try(f(data)))) } From 69bde7c9cb587a0e6001cd167a7a8e316bcea64b Mon Sep 17 00:00:00 2001 From: Timur Abishev Date: Thu, 6 Jul 2017 22:21:43 -0700 Subject: [PATCH 3/4] Move SimpleFlatMap to perf module --- .../twitter/summingbird/online/executor/SimpleFlatMap.scala | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) rename {summingbird-online => summingbird-online-perf}/src/main/scala/com/twitter/summingbird/online/executor/SimpleFlatMap.scala (73%) diff --git a/summingbird-online/src/main/scala/com/twitter/summingbird/online/executor/SimpleFlatMap.scala b/summingbird-online-perf/src/main/scala/com/twitter/summingbird/online/executor/SimpleFlatMap.scala similarity index 73% rename from summingbird-online/src/main/scala/com/twitter/summingbird/online/executor/SimpleFlatMap.scala rename to summingbird-online-perf/src/main/scala/com/twitter/summingbird/online/executor/SimpleFlatMap.scala index 0090e9471..7c3f891e6 100644 --- a/summingbird-online/src/main/scala/com/twitter/summingbird/online/executor/SimpleFlatMap.scala +++ b/summingbird-online-perf/src/main/scala/com/twitter/summingbird/online/executor/SimpleFlatMap.scala @@ -1,8 +1,11 @@ package com.twitter.summingbird.online.executor + import chain.Chain import scala.util.Try -class SimpleFlatMap[I, O, S](f: I => TraversableOnce[O]) extends OperationContainer[I, O, S] { +private[executor] class SimpleFlatMap[I, O, S](f: I => TraversableOnce[O]) + extends OperationContainer[I, O, S] { + override def executeTick: TraversableOnce[(Chain[S], Try[TraversableOnce[O]])] = None From f3e48456fedf9805a7588b7f0063395dc4bf47b2 Mon Sep 17 00:00:00 2001 From: Timur Abishev Date: Thu, 6 Jul 2017 22:22:46 -0700 Subject: [PATCH 4/4] Fix imports style --- .../online/executor/ComposedFlatMapPerformance.scala | 4 ++-- .../online/executor/ComposedFlatMapWithTimePerformance.scala | 4 ++-- .../online/executor/IdentityFlatMapPerformance.scala | 4 ++-- 3 files changed, 6 insertions(+), 6 deletions(-) diff --git a/summingbird-online-perf/src/main/scala/com/twitter/summingbird/online/executor/ComposedFlatMapPerformance.scala b/summingbird-online-perf/src/main/scala/com/twitter/summingbird/online/executor/ComposedFlatMapPerformance.scala index 50191162a..47dc261b5 100644 --- a/summingbird-online-perf/src/main/scala/com/twitter/summingbird/online/executor/ComposedFlatMapPerformance.scala +++ b/summingbird-online-perf/src/main/scala/com/twitter/summingbird/online/executor/ComposedFlatMapPerformance.scala @@ -2,9 +2,9 @@ package com.twitter.summingbird.online.executor import com.twitter.conversions.time.longToTimeableNumber import com.twitter.summingbird.online.FlatMapOperation -import com.twitter.summingbird.online.option.{MaxEmitPerExecute, MaxFutureWaitTime, MaxWaitingFutures} +import com.twitter.summingbird.online.option.{ MaxEmitPerExecute, MaxFutureWaitTime, MaxWaitingFutures } import java.util.concurrent.TimeUnit -import org.openjdk.jmh.annotations.{Benchmark, BenchmarkMode, Mode, OutputTimeUnit, Scope, State} +import org.openjdk.jmh.annotations.{ Benchmark, BenchmarkMode, Mode, OutputTimeUnit, Scope, State } import org.openjdk.jmh.infra.Blackhole /** diff --git a/summingbird-online-perf/src/main/scala/com/twitter/summingbird/online/executor/ComposedFlatMapWithTimePerformance.scala b/summingbird-online-perf/src/main/scala/com/twitter/summingbird/online/executor/ComposedFlatMapWithTimePerformance.scala index 1fdf6ac5c..1d5d6cd52 100644 --- a/summingbird-online-perf/src/main/scala/com/twitter/summingbird/online/executor/ComposedFlatMapWithTimePerformance.scala +++ b/summingbird-online-perf/src/main/scala/com/twitter/summingbird/online/executor/ComposedFlatMapWithTimePerformance.scala @@ -3,9 +3,9 @@ package com.twitter.summingbird.online.executor import com.twitter.conversions.time.longToTimeableNumber import com.twitter.summingbird.batch.Timestamp import com.twitter.summingbird.online.FlatMapOperation -import com.twitter.summingbird.online.option.{MaxEmitPerExecute, MaxFutureWaitTime, MaxWaitingFutures} +import com.twitter.summingbird.online.option.{ MaxEmitPerExecute, MaxFutureWaitTime, MaxWaitingFutures } import java.util.concurrent.TimeUnit -import org.openjdk.jmh.annotations.{Benchmark, BenchmarkMode, Mode, OutputTimeUnit, Scope, State} +import org.openjdk.jmh.annotations.{ Benchmark, BenchmarkMode, Mode, OutputTimeUnit, Scope, State } import org.openjdk.jmh.infra.Blackhole /** diff --git a/summingbird-online-perf/src/main/scala/com/twitter/summingbird/online/executor/IdentityFlatMapPerformance.scala b/summingbird-online-perf/src/main/scala/com/twitter/summingbird/online/executor/IdentityFlatMapPerformance.scala index abc50ff5d..c341d7612 100644 --- a/summingbird-online-perf/src/main/scala/com/twitter/summingbird/online/executor/IdentityFlatMapPerformance.scala +++ b/summingbird-online-perf/src/main/scala/com/twitter/summingbird/online/executor/IdentityFlatMapPerformance.scala @@ -2,9 +2,9 @@ package com.twitter.summingbird.online.executor import com.twitter.conversions.time.longToTimeableNumber import com.twitter.summingbird.online.FlatMapOperation -import com.twitter.summingbird.online.option.{MaxEmitPerExecute, MaxFutureWaitTime, MaxWaitingFutures} +import com.twitter.summingbird.online.option.{ MaxEmitPerExecute, MaxFutureWaitTime, MaxWaitingFutures } import java.util.concurrent.TimeUnit -import org.openjdk.jmh.annotations.{Benchmark, BenchmarkMode, Mode, OutputTimeUnit, Scope, State} +import org.openjdk.jmh.annotations.{ Benchmark, BenchmarkMode, Mode, OutputTimeUnit, Scope, State } import org.openjdk.jmh.infra.Blackhole /**