Skip to content
This repository has been archived by the owner on Jan 20, 2022. It is now read-only.

Add performance tests for flatMap operation in OperationContainer #740

Open
wants to merge 4 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,8 @@ lazy val summingbird = Project(
summingbirdBuilder,
summingbirdChill,
summingbirdExample,
summingbirdCoreTest
summingbirdCoreTest,
summingbirdOnlinePerf
)

/**
Expand Down Expand Up @@ -314,6 +315,10 @@ lazy val summingbirdOnline = module("online").settings(
summingbirdClient
)

lazy val summingbirdOnlinePerf = module("online-perf")
.enablePlugins(JmhPlugin)
.dependsOn(summingbirdOnline)

lazy val summingbirdStorm = module("storm").settings(
parallelExecution in Test := false,
libraryDependencies ++= Seq(
Expand Down
2 changes: 1 addition & 1 deletion project/plugins.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -6,4 +6,4 @@ addSbtPlugin("com.typesafe" % "sbt-mima-plugin" % "0.1.12")
addSbtPlugin("com.typesafe.sbt" % "sbt-ghpages" % "0.5.4")
addSbtPlugin("org.scoverage" % "sbt-scoverage" % "1.5.0")
addSbtPlugin("org.xerial.sbt" % "sbt-sonatype" % "1.0")
addSbtPlugin("pl.project13.scala" % "sbt-jmh" % "0.2.18")
addSbtPlugin("pl.project13.scala" % "sbt-jmh" % "0.2.24")
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
package com.twitter.summingbird.online.executor

import com.twitter.conversions.time.longToTimeableNumber
import com.twitter.summingbird.online.FlatMapOperation
import com.twitter.summingbird.online.option.{ MaxEmitPerExecute, MaxFutureWaitTime, MaxWaitingFutures }
import java.util.concurrent.TimeUnit
import org.openjdk.jmh.annotations.{ Benchmark, BenchmarkMode, Mode, OutputTimeUnit, Scope, State }
import org.openjdk.jmh.infra.Blackhole

/**
* Command to run (should be executed from sbt with project summingbird-online-perf):
* jmh:run -i 10 -wi 10 -f1 -t1 ComposedFlatMapPerformance
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I suggest adding these options:

-prof gc to enable the GC profiler
-gc to force a GC between runs. Without it, 10 warmup and actual iterations might not be enough to exclude GC noise
-foe true to fail the benchmark if the code fails. I've seen benchmarks that only test that the benchmark is wrong :)
-jvmArgs "-Xms1G -Xmx1G" to make sure that the allocated memory is the same across different dev environments

*
* Benchmark Mode Cnt Score Error Units
* ComposedFlatMapPerformance.measureDirect avgt 10 239.003 ± 19.990 ms/op
* ComposedFlatMapPerformance.measureDirectOperationContainer avgt 10 295.847 ± 14.774 ms/op
* ComposedFlatMapPerformance.measureOperationContainerComposeThroughOperation avgt 10 2733.537 ± 63.415 ms/op
* ComposedFlatMapPerformance.measureOperationContainerDirectCompose avgt 10 1084.457 ± 22.968 ms/op
*/
@State(Scope.Thread)
class ComposedFlatMapPerformance {
val size = 1000000
val input = Array.range(0, size)

val f1: (Int => TraversableOnce[Int]) = x => Some(x * 2)
val f2: (Int => TraversableOnce[(Int, Int)]) = x => Some((x, x))
val f3: (((Int, Int)) => TraversableOnce[Int]) = pair => List(
pair._1,
pair._2,
pair._1 + pair._2,
pair._1 * pair._2
)

val composed: (Int) => TraversableOnce[Int] =
flatMap[Int, (Int, Int), Int](flatMap[Int, Int, (Int, Int)](f1, f2), f3)
val composedContainerDirect: OperationContainer[Int, Int, InputState[Int]] =
new SimpleFlatMap(composed)
val composedContainer: OperationContainer[Int, Int, InputState[Int]] =
new IntermediateFlatMap(
FlatMapOperation.apply(f1).flatMap(f2).flatMap(f3),
MaxWaitingFutures(1000),
MaxFutureWaitTime(1000.millis),
MaxEmitPerExecute(Int.MaxValue)
)
val directlyComposedContainer: OperationContainer[Int, Int, InputState[Int]] =
new IntermediateFlatMap(
FlatMapOperation.apply(composed),
MaxWaitingFutures(1000),
MaxFutureWaitTime(1000.millis),
MaxEmitPerExecute(Int.MaxValue)
)

@Benchmark
@BenchmarkMode(Array(Mode.AverageTime))
@OutputTimeUnit(TimeUnit.MILLISECONDS)
def measureDirect(bh: Blackhole): Unit =
input.foreach(composed.apply(_).foreach(bh.consume))
Copy link

@fwbrasil fwbrasil Jul 7, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The function allocation here (composed.apply(_)) introduces some noise. Maybe it's worth extracting it to a class val


@Benchmark
@BenchmarkMode(Array(Mode.AverageTime))
@OutputTimeUnit(TimeUnit.MILLISECONDS)
def measureDirectOperationContainer(bh: Blackhole): Unit =
TestUtils.testOperationContainer(composedContainerDirect, input, bh)

@Benchmark
@BenchmarkMode(Array(Mode.AverageTime))
@OutputTimeUnit(TimeUnit.MILLISECONDS)
def measureOperationContainerComposeThroughOperation(bh: Blackhole): Unit =
TestUtils.testOperationContainer(composedContainer, input, bh)

@Benchmark
@BenchmarkMode(Array(Mode.AverageTime))
@OutputTimeUnit(TimeUnit.MILLISECONDS)
def measureOperationContainerDirectCompose(bh: Blackhole): Unit =
TestUtils.testOperationContainer(directlyComposedContainer, input, bh)

def flatMap[A, B, C](
f1: A => TraversableOnce[B],
f2: B => TraversableOnce[C]
): (A => TraversableOnce[C]) =
x => f1(x).flatMap(f2)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
package com.twitter.summingbird.online.executor

import com.twitter.conversions.time.longToTimeableNumber
import com.twitter.summingbird.batch.Timestamp
import com.twitter.summingbird.online.FlatMapOperation
import com.twitter.summingbird.online.option.{ MaxEmitPerExecute, MaxFutureWaitTime, MaxWaitingFutures }
import java.util.concurrent.TimeUnit
import org.openjdk.jmh.annotations.{ Benchmark, BenchmarkMode, Mode, OutputTimeUnit, Scope, State }
import org.openjdk.jmh.infra.Blackhole

/**
* Command to run (should be executed from sbt with project summingbird-online-perf):
* jmh:run -i 10 -wi 10 -f1 -t1 ComposedFlatMapWithTimePerformance
*
* Benchmark Mode Cnt Score Error Units
* ComposedFlatMapWithTimePerformance.measureDirectComplex avgt 10 340.341 ± 4.343 ms/op
* ComposedFlatMapWithTimePerformance.measureDirectOperationContainer avgt 10 386.593 ± 6.474 ms/op
* ComposedFlatMapWithTimePerformance.measureDirectSimple avgt 10 279.628 ± 6.175 ms/op
* ComposedFlatMapWithTimePerformance.measureOperationContainerComposeThroughOperation avgt 10 2978.141 ± 75.213 ms/op
* ComposedFlatMapWithTimePerformance.measureOperationContainerDirectCompose avgt 10 1198.601 ± 54.871 ms/op
*/
@State(Scope.Thread)
class ComposedFlatMapWithTimePerformance {
val size = 1000000
val input = (0 to size).map(i => (Timestamp(0), i)).toArray

val f1: (Int => TraversableOnce[Int]) = x => Some(x * 2)
val f2: (Int => TraversableOnce[(Int, Int)]) = x => Some((x, x))
val f3: (((Int, Int)) => TraversableOnce[Int]) = pair => List(
pair._1,
pair._2,
pair._1 + pair._2,
pair._1 * pair._2
)

val composed: (Int) => TraversableOnce[Int] =
flatMap[Int, (Int, Int), Int](flatMap[Int, Int, (Int, Int)](f1, f2), f3)
val composedOp: FlatMapOperation[Int, Int] =
FlatMapOperation.apply(f1).flatMap(f2).flatMap(f3)

val composedWithTimeSimple: ((Timestamp, Int)) => TraversableOnce[(Timestamp, Int)] =
withTime(composed)
val composedWithTimeComplex: ((Timestamp, Int)) => TraversableOnce[(Timestamp, Int)] =
flatMap(
flatMap(withTime(f1), withTime(f2)),
withTime(f3)
)
val composedContainerDirect: OperationContainer[(Timestamp, Int), (Timestamp, Int), InputState[Int]] =
new SimpleFlatMap(composedWithTimeComplex)
val composedContainerComposeThroughOperation:
OperationContainer[(Timestamp, Int), (Timestamp, Int), InputState[Int]] = new IntermediateFlatMap(
withTimeOp(composedOp),
MaxWaitingFutures(1000),
MaxFutureWaitTime(1000.millis),
MaxEmitPerExecute(Int.MaxValue)
)
val composedContainerDirectCompose:
OperationContainer[(Timestamp, Int), (Timestamp, Int), InputState[Int]] = new IntermediateFlatMap(
FlatMapOperation.apply(composedWithTimeComplex),
MaxWaitingFutures(1000),
MaxFutureWaitTime(1000.millis),
MaxEmitPerExecute(Int.MaxValue)
)

@Benchmark
@BenchmarkMode(Array(Mode.AverageTime))
@OutputTimeUnit(TimeUnit.MILLISECONDS)
def measureDirectSimple(bh: Blackhole): Unit =
input.foreach(composedWithTimeSimple.apply(_).foreach(bh.consume))

@Benchmark
@BenchmarkMode(Array(Mode.AverageTime))
@OutputTimeUnit(TimeUnit.MILLISECONDS)
def measureDirectComplex(bh: Blackhole): Unit =
input.foreach(composedWithTimeComplex.apply(_).foreach(bh.consume))

@Benchmark
@BenchmarkMode(Array(Mode.AverageTime))
@OutputTimeUnit(TimeUnit.MILLISECONDS)
def measureDirectOperationContainer(bh: Blackhole): Unit =
TestUtils.testOperationContainer(composedContainerDirect, input, bh)

@Benchmark
@BenchmarkMode(Array(Mode.AverageTime))
@OutputTimeUnit(TimeUnit.MILLISECONDS)
def measureOperationContainerComposeThroughOperation(bh: Blackhole): Unit =
TestUtils.testOperationContainer(composedContainerComposeThroughOperation, input, bh)

@Benchmark
@BenchmarkMode(Array(Mode.AverageTime))
@OutputTimeUnit(TimeUnit.MILLISECONDS)
def measureOperationContainerDirectCompose(bh: Blackhole): Unit =
TestUtils.testOperationContainer(composedContainerDirectCompose, input, bh)

def flatMap[A, B, C](
f1: A => TraversableOnce[B],
f2: B => TraversableOnce[C]
): (A => TraversableOnce[C]) =
x => f1(x).flatMap(f2)

def withTime[A, B](f: A => TraversableOnce[B]): ((Timestamp, A)) => TraversableOnce[(Timestamp, B)] = {
case (timestamp, v) => f.apply(v).map((timestamp, _))
}

def withTimeOp[A, B](f: FlatMapOperation[A, B]): FlatMapOperation[(Timestamp, A), (Timestamp, B)] =
FlatMapOperation.generic { case (timestamp, v) =>
f.apply(v).map(elements => elements.map((timestamp, _)))
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
package com.twitter.summingbird.online.executor

import com.twitter.conversions.time.longToTimeableNumber
import com.twitter.summingbird.online.FlatMapOperation
import com.twitter.summingbird.online.option.{ MaxEmitPerExecute, MaxFutureWaitTime, MaxWaitingFutures }
import java.util.concurrent.TimeUnit
import org.openjdk.jmh.annotations.{ Benchmark, BenchmarkMode, Mode, OutputTimeUnit, Scope, State }
import org.openjdk.jmh.infra.Blackhole

/**
* Command to run (should be executed from sbt with project summingbird-online-perf):
* jmh:run -i 10 -wi 10 -f1 -t1 IdentityFlatMapPerformance
*
* Benchmark Mode Cnt Score Error Units
* IdentityFlatMapPerformance.measureDirect avgt 10 42.184 ± 3.518 ms/op
* IdentityFlatMapPerformance.measureDirectOperationContainer avgt 10 101.944 ± 3.956 ms/op
* IdentityFlatMapPerformance.measureOperationContainer avgt 10 817.659 ± 36.591 ms/op
*/
@State(Scope.Thread)
class IdentityFlatMapPerformance {
val size = 1000000
val input = Array.range(0, size)

val f: (Int => TraversableOnce[Int]) = x => Iterator(x)
val containerDirect: OperationContainer[Int, Int, InputState[Int]] =
new SimpleFlatMap(f)
val container: OperationContainer[Int, Int, InputState[Int]] =
new IntermediateFlatMap(
FlatMapOperation.apply(f),
MaxWaitingFutures(1000),
MaxFutureWaitTime(1000.millis),
MaxEmitPerExecute(Int.MaxValue)
)

@Benchmark
@BenchmarkMode(Array(Mode.AverageTime))
@OutputTimeUnit(TimeUnit.MILLISECONDS)
def measureDirect(bh: Blackhole): Unit =
input.foreach(f.apply(_).foreach(bh.consume))

@Benchmark
@BenchmarkMode(Array(Mode.AverageTime))
@OutputTimeUnit(TimeUnit.MILLISECONDS)
def measureDirectOperationContainer(bh: Blackhole): Unit =
TestUtils.testOperationContainer(containerDirect, input, bh)

@Benchmark
@BenchmarkMode(Array(Mode.AverageTime))
@OutputTimeUnit(TimeUnit.MILLISECONDS)
def measureOperationContainer(bh: Blackhole): Unit =
TestUtils.testOperationContainer(container, input, bh)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package com.twitter.summingbird.online.executor

import chain.Chain
import scala.util.Try

private[executor] class SimpleFlatMap[I, O, S](f: I => TraversableOnce[O])
extends OperationContainer[I, O, S] {

override def executeTick: TraversableOnce[(Chain[S], Try[TraversableOnce[O]])] =
None

override def execute(state: S,data: I): TraversableOnce[(Chain[S], Try[TraversableOnce[O]])] =
Some((Chain.single(state), Try(f(data))))
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package com.twitter.summingbird.online.executor

import org.openjdk.jmh.infra.Blackhole
import scala.concurrent.duration.DurationInt
import scala.util.{Failure, Success}

object TestUtils {
val TickDelay = 5.millis

def testOperationContainer[I, O](
container: OperationContainer[I, O, InputState[Int]],
inputs: Array[I],
bh: Blackhole
): Unit = {
var left = inputs.length
for (
input <- inputs;
emitted <- container.execute(InputState[Int](1), input)
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you have a sense of how expensive container.execute is? I ask because testOperationContainer introduces some noise so its overhead could dominate the final result.

) {
emitted match {
case (states, output) =>
states.foreach { _ => left -= 1 }
output match {
case Success(value) => value.foreach(bh.consume)
case Failure(exception) => bh.consume(exception)
}
}
}
while (left != 0) {
Thread.sleep(TickDelay.toMillis)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm pretty skeptical we are comparing apples to apples when we compare this to non OperationContainers.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm skeptical as well, but I wanted to see at least some number for non OperationContainer execution. In the end I don't think it should be anyhow actionable.

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thread.sleep adds noise to the result. The blackhole provides a method to simulate CPU usage. I think it's consumeCpu

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry, I've just noticed that this is actually to check when it's completed. Maybe you could use a CountDownLatch for it?

for (emitted <- container.executeTick) {
emitted match {
case (states, output) =>
states.foreach { _ => left -= 1 }
output match {
case Success(value) => value.foreach(bh.consume)
case Failure(exception) => bh.consume(exception)
}
}
}
}
}
}