diff --git a/build.sbt b/build.sbt
index 690ac8feb..e21c45764 100644
--- a/build.sbt
+++ b/build.sbt
@@ -174,7 +174,8 @@ lazy val summingbird = Project(
   summingbirdBuilder,
   summingbirdChill,
   summingbirdExample,
-  summingbirdCoreTest
+  summingbirdCoreTest,
+  summingbirdOnlinePerf
 )
 
 /**
@@ -314,6 +315,10 @@ lazy val summingbirdOnline = module("online").settings(
   summingbirdClient
 )
 
+lazy val summingbirdOnlinePerf = module("online-perf")
+  .enablePlugins(JmhPlugin)
+  .dependsOn(summingbirdOnline)
+
 lazy val summingbirdStorm = module("storm").settings(
   parallelExecution in Test := false,
   libraryDependencies ++= Seq(
diff --git a/project/plugins.sbt b/project/plugins.sbt
index 07e60b7e7..7bb926a63 100644
--- a/project/plugins.sbt
+++ b/project/plugins.sbt
@@ -6,4 +6,4 @@ addSbtPlugin("com.typesafe" % "sbt-mima-plugin" % "0.1.12")
 addSbtPlugin("com.typesafe.sbt" % "sbt-ghpages" % "0.5.4")
 addSbtPlugin("org.scoverage" % "sbt-scoverage" % "1.5.0")
 addSbtPlugin("org.xerial.sbt" % "sbt-sonatype" % "1.0")
-addSbtPlugin("pl.project13.scala" % "sbt-jmh" % "0.2.18")
+addSbtPlugin("pl.project13.scala" % "sbt-jmh" % "0.2.24")
diff --git a/summingbird-online-perf/src/main/scala/com/twitter/summingbird/online/executor/ComposedFlatMapPerformance.scala b/summingbird-online-perf/src/main/scala/com/twitter/summingbird/online/executor/ComposedFlatMapPerformance.scala
new file mode 100644
index 000000000..47dc261b5
--- /dev/null
+++ b/summingbird-online-perf/src/main/scala/com/twitter/summingbird/online/executor/ComposedFlatMapPerformance.scala
@@ -0,0 +1,82 @@
+package com.twitter.summingbird.online.executor
+
+import com.twitter.conversions.time.longToTimeableNumber
+import com.twitter.summingbird.online.FlatMapOperation
+import com.twitter.summingbird.online.option.{ MaxEmitPerExecute, MaxFutureWaitTime, MaxWaitingFutures }
+import java.util.concurrent.TimeUnit
+import org.openjdk.jmh.annotations.{ Benchmark, BenchmarkMode, Mode, OutputTimeUnit, Scope, State }
+import org.openjdk.jmh.infra.Blackhole
+
+/**
+ * Command to run (should be executed from sbt with project summingbird-online-perf):
+ * jmh:run -i 10 -wi 10 -f1 -t1 ComposedFlatMapPerformance
+ *
+ * Benchmark                                                                     Mode  Cnt     Score    Error  Units
+ * ComposedFlatMapPerformance.measureDirect                                      avgt   10   239.003 ± 19.990  ms/op
+ * ComposedFlatMapPerformance.measureDirectOperationContainer                    avgt   10   295.847 ± 14.774  ms/op
+ * ComposedFlatMapPerformance.measureOperationContainerComposeThroughOperation   avgt   10  2733.537 ± 63.415  ms/op
+ * ComposedFlatMapPerformance.measureOperationContainerDirectCompose             avgt   10  1084.457 ± 22.968  ms/op
+ */
+@State(Scope.Thread)
+class ComposedFlatMapPerformance {
+  val size = 1000000
+  val input = Array.range(0, size)
+
+  val f1: (Int => TraversableOnce[Int]) = x => Some(x * 2)
+  val f2: (Int => TraversableOnce[(Int, Int)]) = x => Some((x, x))
+  val f3: (((Int, Int)) => TraversableOnce[Int]) = pair => List(
+    pair._1,
+    pair._2,
+    pair._1 + pair._2,
+    pair._1 * pair._2
+  )
+
+  val composed: (Int) => TraversableOnce[Int] =
+    flatMap[Int, (Int, Int), Int](flatMap[Int, Int, (Int, Int)](f1, f2), f3)
+  val composedContainerDirect: OperationContainer[Int, Int, InputState[Int]] =
+    new SimpleFlatMap(composed)
+  val composedContainer: OperationContainer[Int, Int, InputState[Int]] =
+    new IntermediateFlatMap(
+      FlatMapOperation.apply(f1).flatMap(f2).flatMap(f3),
+      MaxWaitingFutures(1000),
+      MaxFutureWaitTime(1000.millis),
+      MaxEmitPerExecute(Int.MaxValue)
+    )
+  val directlyComposedContainer: OperationContainer[Int, Int, InputState[Int]] =
+    new IntermediateFlatMap(
+      FlatMapOperation.apply(composed),
+      MaxWaitingFutures(1000),
+      MaxFutureWaitTime(1000.millis),
+      MaxEmitPerExecute(Int.MaxValue)
+    )
+
+  @Benchmark
+  @BenchmarkMode(Array(Mode.AverageTime))
+  @OutputTimeUnit(TimeUnit.MILLISECONDS)
+  def measureDirect(bh: Blackhole): Unit =
+    input.foreach(composed.apply(_).foreach(bh.consume))
+
+  @Benchmark
+  @BenchmarkMode(Array(Mode.AverageTime))
+  @OutputTimeUnit(TimeUnit.MILLISECONDS)
+  def measureDirectOperationContainer(bh: Blackhole): Unit =
+    TestUtils.testOperationContainer(composedContainerDirect, input, bh)
+
+  @Benchmark
+  @BenchmarkMode(Array(Mode.AverageTime))
+  @OutputTimeUnit(TimeUnit.MILLISECONDS)
+  def measureOperationContainerComposeThroughOperation(bh: Blackhole): Unit =
+    TestUtils.testOperationContainer(composedContainer, input, bh)
+
+  @Benchmark
+  @BenchmarkMode(Array(Mode.AverageTime))
+  @OutputTimeUnit(TimeUnit.MILLISECONDS)
+  def measureOperationContainerDirectCompose(bh: Blackhole): Unit =
+    TestUtils.testOperationContainer(directlyComposedContainer, input, bh)
+
+  def flatMap[A, B, C](
+    f1: A => TraversableOnce[B],
+    f2: B => TraversableOnce[C]
+  ): (A => TraversableOnce[C]) =
+    x => f1(x).flatMap(f2)
+}
diff --git a/summingbird-online-perf/src/main/scala/com/twitter/summingbird/online/executor/ComposedFlatMapWithTimePerformance.scala b/summingbird-online-perf/src/main/scala/com/twitter/summingbird/online/executor/ComposedFlatMapWithTimePerformance.scala
new file mode 100644
index 000000000..1d5d6cd52
--- /dev/null
+++ b/summingbird-online-perf/src/main/scala/com/twitter/summingbird/online/executor/ComposedFlatMapWithTimePerformance.scala
@@ -0,0 +1,110 @@
+package com.twitter.summingbird.online.executor
+
+import com.twitter.conversions.time.longToTimeableNumber
+import com.twitter.summingbird.batch.Timestamp
+import com.twitter.summingbird.online.FlatMapOperation
+import com.twitter.summingbird.online.option.{ MaxEmitPerExecute, MaxFutureWaitTime, MaxWaitingFutures }
+import java.util.concurrent.TimeUnit
+import org.openjdk.jmh.annotations.{ Benchmark, BenchmarkMode, Mode, OutputTimeUnit, Scope, State }
+import org.openjdk.jmh.infra.Blackhole
+
+/**
+ * Command to run (should be executed from sbt with project summingbird-online-perf):
+ * jmh:run -i 10 -wi 10 -f1 -t1 ComposedFlatMapWithTimePerformance
+ *
+ * Benchmark                                                                             Mode  Cnt     Score    Error  Units
+ * ComposedFlatMapWithTimePerformance.measureDirectComplex                               avgt   10   340.341 ±  4.343  ms/op
+ * ComposedFlatMapWithTimePerformance.measureDirectOperationContainer                    avgt   10   386.593 ±  6.474  ms/op
+ * ComposedFlatMapWithTimePerformance.measureDirectSimple                                avgt   10   279.628 ±  6.175  ms/op
+ * ComposedFlatMapWithTimePerformance.measureOperationContainerComposeThroughOperation   avgt   10  2978.141 ± 75.213  ms/op
+ * ComposedFlatMapWithTimePerformance.measureOperationContainerDirectCompose             avgt   10  1198.601 ± 54.871  ms/op
+ */
+@State(Scope.Thread)
+class ComposedFlatMapWithTimePerformance {
+  val size = 1000000
+  // Exactly `size` elements, like `Array.range(0, size)` in the sibling benchmarks.
+  val input = Array.tabulate(size)(i => (Timestamp(0), i))
+
+  val f1: (Int => TraversableOnce[Int]) = x => Some(x * 2)
+  val f2: (Int => TraversableOnce[(Int, Int)]) = x => Some((x, x))
+  val f3: (((Int, Int)) => TraversableOnce[Int]) = pair => List(
+    pair._1,
+    pair._2,
+    pair._1 + pair._2,
+    pair._1 * pair._2
+  )
+
+  val composed: (Int) => TraversableOnce[Int] =
+    flatMap[Int, (Int, Int), Int](flatMap[Int, Int, (Int, Int)](f1, f2), f3)
+  val composedOp: FlatMapOperation[Int, Int] =
+    FlatMapOperation.apply(f1).flatMap(f2).flatMap(f3)
+
+  val composedWithTimeSimple: ((Timestamp, Int)) => TraversableOnce[(Timestamp, Int)] =
+    withTime(composed)
+  val composedWithTimeComplex: ((Timestamp, Int)) => TraversableOnce[(Timestamp, Int)] =
+    flatMap(
+      flatMap(withTime(f1), withTime(f2)),
+      withTime(f3)
+    )
+  val composedContainerDirect: OperationContainer[(Timestamp, Int), (Timestamp, Int), InputState[Int]] =
+    new SimpleFlatMap(composedWithTimeComplex)
+  val composedContainerComposeThroughOperation:
+    OperationContainer[(Timestamp, Int), (Timestamp, Int), InputState[Int]] = new IntermediateFlatMap(
+      withTimeOp(composedOp),
+      MaxWaitingFutures(1000),
+      MaxFutureWaitTime(1000.millis),
+      MaxEmitPerExecute(Int.MaxValue)
+    )
+  val composedContainerDirectCompose:
+    OperationContainer[(Timestamp, Int), (Timestamp, Int), InputState[Int]] = new IntermediateFlatMap(
+      FlatMapOperation.apply(composedWithTimeComplex),
+      MaxWaitingFutures(1000),
+      MaxFutureWaitTime(1000.millis),
+      MaxEmitPerExecute(Int.MaxValue)
+    )
+
+  @Benchmark
+  @BenchmarkMode(Array(Mode.AverageTime))
+  @OutputTimeUnit(TimeUnit.MILLISECONDS)
+  def measureDirectSimple(bh: Blackhole): Unit =
+    input.foreach(composedWithTimeSimple.apply(_).foreach(bh.consume))
+
+  @Benchmark
+  @BenchmarkMode(Array(Mode.AverageTime))
+  @OutputTimeUnit(TimeUnit.MILLISECONDS)
+  def measureDirectComplex(bh: Blackhole): Unit =
+    input.foreach(composedWithTimeComplex.apply(_).foreach(bh.consume))
+
+  @Benchmark
+  @BenchmarkMode(Array(Mode.AverageTime))
+  @OutputTimeUnit(TimeUnit.MILLISECONDS)
+  def measureDirectOperationContainer(bh: Blackhole): Unit =
+    TestUtils.testOperationContainer(composedContainerDirect, input, bh)
+
+  @Benchmark
+  @BenchmarkMode(Array(Mode.AverageTime))
+  @OutputTimeUnit(TimeUnit.MILLISECONDS)
+  def measureOperationContainerComposeThroughOperation(bh: Blackhole): Unit =
+    TestUtils.testOperationContainer(composedContainerComposeThroughOperation, input, bh)
+
+  @Benchmark
+  @BenchmarkMode(Array(Mode.AverageTime))
+  @OutputTimeUnit(TimeUnit.MILLISECONDS)
+  def measureOperationContainerDirectCompose(bh: Blackhole): Unit =
+    TestUtils.testOperationContainer(composedContainerDirectCompose, input, bh)
+
+  def flatMap[A, B, C](
+    f1: A => TraversableOnce[B],
+    f2: B => TraversableOnce[C]
+  ): (A => TraversableOnce[C]) =
+    x => f1(x).flatMap(f2)
+
+  def withTime[A, B](f: A => TraversableOnce[B]): ((Timestamp, A)) => TraversableOnce[(Timestamp, B)] = {
+    case (timestamp, v) => f.apply(v).map((timestamp, _))
+  }
+
+  def withTimeOp[A, B](f: FlatMapOperation[A, B]): FlatMapOperation[(Timestamp, A), (Timestamp, B)] =
+    FlatMapOperation.generic { case (timestamp, v) =>
+      f.apply(v).map(elements => elements.map((timestamp, _)))
+    }
+}
diff --git a/summingbird-online-perf/src/main/scala/com/twitter/summingbird/online/executor/IdentityFlatMapPerformance.scala b/summingbird-online-perf/src/main/scala/com/twitter/summingbird/online/executor/IdentityFlatMapPerformance.scala
new file mode 100644
index 000000000..c341d7612
--- /dev/null
+++ b/summingbird-online-perf/src/main/scala/com/twitter/summingbird/online/executor/IdentityFlatMapPerformance.scala
@@ -0,0 +1,52 @@
+package com.twitter.summingbird.online.executor
+
+import com.twitter.conversions.time.longToTimeableNumber
+import com.twitter.summingbird.online.FlatMapOperation
+import com.twitter.summingbird.online.option.{ MaxEmitPerExecute, MaxFutureWaitTime, MaxWaitingFutures }
+import java.util.concurrent.TimeUnit
+import org.openjdk.jmh.annotations.{ Benchmark, BenchmarkMode, Mode, OutputTimeUnit, Scope, State }
+import org.openjdk.jmh.infra.Blackhole
+
+/**
+ * Command to run (should be executed from sbt with project summingbird-online-perf):
+ * jmh:run -i 10 -wi 10 -f1 -t1 IdentityFlatMapPerformance
+ *
+ * Benchmark                                                     Mode  Cnt    Score    Error  Units
+ * IdentityFlatMapPerformance.measureDirect                      avgt   10   42.184 ±  3.518  ms/op
+ * IdentityFlatMapPerformance.measureDirectOperationContainer    avgt   10  101.944 ±  3.956  ms/op
+ * IdentityFlatMapPerformance.measureOperationContainer          avgt   10  817.659 ± 36.591  ms/op
+ */
+@State(Scope.Thread)
+class IdentityFlatMapPerformance {
+  val size = 1000000
+  val input = Array.range(0, size)
+
+  val f: (Int => TraversableOnce[Int]) = x => Iterator(x)
+  val containerDirect: OperationContainer[Int, Int, InputState[Int]] =
+    new SimpleFlatMap(f)
+  val container: OperationContainer[Int, Int, InputState[Int]] =
+    new IntermediateFlatMap(
+      FlatMapOperation.apply(f),
+      MaxWaitingFutures(1000),
+      MaxFutureWaitTime(1000.millis),
+      MaxEmitPerExecute(Int.MaxValue)
+    )
+
+  @Benchmark
+  @BenchmarkMode(Array(Mode.AverageTime))
+  @OutputTimeUnit(TimeUnit.MILLISECONDS)
+  def measureDirect(bh: Blackhole): Unit =
+    input.foreach(f.apply(_).foreach(bh.consume))
+
+  @Benchmark
+  @BenchmarkMode(Array(Mode.AverageTime))
+  @OutputTimeUnit(TimeUnit.MILLISECONDS)
+  def measureDirectOperationContainer(bh: Blackhole): Unit =
+    TestUtils.testOperationContainer(containerDirect, input, bh)
+
+  @Benchmark
+  @BenchmarkMode(Array(Mode.AverageTime))
+  @OutputTimeUnit(TimeUnit.MILLISECONDS)
+  def measureOperationContainer(bh: Blackhole): Unit =
+    TestUtils.testOperationContainer(container, input, bh)
+}
diff --git a/summingbird-online-perf/src/main/scala/com/twitter/summingbird/online/executor/SimpleFlatMap.scala b/summingbird-online-perf/src/main/scala/com/twitter/summingbird/online/executor/SimpleFlatMap.scala
new file mode 100644
index 000000000..7c3f891e6
--- /dev/null
+++ b/summingbird-online-perf/src/main/scala/com/twitter/summingbird/online/executor/SimpleFlatMap.scala
@@ -0,0 +1,20 @@
+package com.twitter.summingbird.online.executor
+
+import chain.Chain
+import scala.util.Try
+
+/**
+ * Minimal [[OperationContainer]] used by the benchmarks in this module: it applies `f`
+ * synchronously inside `execute` and never has anything to emit on a tick.
+ */
+private[executor] class SimpleFlatMap[I, O, S](f: I => TraversableOnce[O])
+  extends OperationContainer[I, O, S] {
+
+  /** Nothing is ever buffered, so a tick emits nothing. */
+  override def executeTick: TraversableOnce[(Chain[S], Try[TraversableOnce[O]])] =
+    None
+
+  /** Applies `f` to `data` right away and pairs the result with the single input `state`. */
+  override def execute(state: S, data: I): TraversableOnce[(Chain[S], Try[TraversableOnce[O]])] =
+    Some((Chain.single(state), Try(f(data))))
+}
diff --git a/summingbird-online-perf/src/main/scala/com/twitter/summingbird/online/executor/TestUtils.scala b/summingbird-online-perf/src/main/scala/com/twitter/summingbird/online/executor/TestUtils.scala
new file mode 100644
index 000000000..83153099a
--- /dev/null
+++ b/summingbird-online-perf/src/main/scala/com/twitter/summingbird/online/executor/TestUtils.scala
@@ -0,0 +1,49 @@
+package com.twitter.summingbird.online.executor
+
+import chain.Chain
+import org.openjdk.jmh.infra.Blackhole
+import scala.concurrent.duration.DurationInt
+import scala.util.{Failure, Success, Try}
+
+/** Helpers shared by the JMH benchmarks in this module. */
+object TestUtils {
+  /** Pause between two `executeTick` polls while results are still outstanding. */
+  val TickDelay = 5.millis
+
+  /**
+   * Pushes every element of `inputs` through `container` and feeds everything
+   * it emits to `bh`, polling `executeTick` until each input has been
+   * accounted for.
+   *
+   * @param container operation container under test
+   * @param inputs    values passed to `execute`, one call per element
+   * @param bh        JMH blackhole that consumes emitted values and failures
+   */
+  def testOperationContainer[I, O](
+    container: OperationContainer[I, O, InputState[Int]],
+    inputs: Array[I],
+    bh: Blackhole
+  ): Unit = {
+    var left = inputs.length
+
+    // Consumes one batch of emissions; each state in an emitted chain accounts for one input.
+    def drain(emitted: TraversableOnce[(Chain[InputState[Int]], Try[TraversableOnce[O]])]): Unit =
+      emitted.foreach {
+        case (states, output) =>
+          states.foreach { _ => left -= 1 }
+          output match {
+            case Success(value) => value.foreach(bh.consume)
+            case Failure(exception) => bh.consume(exception)
+          }
+      }
+
+    inputs.foreach(input => drain(container.execute(InputState[Int](1), input)))
+
+    // `> 0` rather than `!= 0`: if more states come back than inputs went in,
+    // the counter goes negative and `!= 0` would spin forever.
+    while (left > 0) {
+      Thread.sleep(TickDelay.toMillis)
+      drain(container.executeTick)
+    }
+  }
+}