diff --git a/build.sbt b/build.sbt index a07c06feb..baa5db1b3 100644 --- a/build.sbt +++ b/build.sbt @@ -179,11 +179,6 @@ lazy val kafkaDistroless = project .settings(addCompilerPlugin(betterMonadicFor)) .dependsOn(commonFs2) -lazy val bench = project - .in(file("modules/bench")) - .dependsOn(pubsub % "test->test") - .enablePlugins(JmhPlugin) - lazy val rabbitmq = project .in(file("modules/rabbitmq")) .enablePlugins(BuildInfoPlugin, JavaAppPackaging, SnowplowDockerPlugin) diff --git a/modules/bench/build.sbt b/modules/bench/build.sbt deleted file mode 100644 index 9ce58ff1f..000000000 --- a/modules/bench/build.sbt +++ /dev/null @@ -1,6 +0,0 @@ -Jmh / sourceDirectory := (Test / sourceDirectory).value -Jmh / classDirectory := (Test / classDirectory).value -Jmh / dependencyClasspath := (Test / dependencyClasspath).value -// rewire tasks, so that 'jmh:run' automatically invokes 'jmh:compile' (otherwise a clean 'jmh:run' would fail) -Jmh / compile := (Jmh / compile).dependsOn(Test / compile).value -Jmh / run := (Jmh / run).dependsOn(Jmh / Keys.compile).evaluated \ No newline at end of file diff --git a/modules/bench/src/test/resources/simplelogger.properties b/modules/bench/src/test/resources/simplelogger.properties deleted file mode 100644 index 02dcc4765..000000000 --- a/modules/bench/src/test/resources/simplelogger.properties +++ /dev/null @@ -1,2 +0,0 @@ -org.slf4j.simpleLogger.log.com.snowplowanalytics.snowplow.enrich.pubsub.Assets=off -org.slf4j.simpleLogger.log.com.snowplowanalytics.snowplow.enrich.pubsub.test.TestEnvironment=off diff --git a/modules/bench/src/test/scala/com.snowplowanalytics.snowplow.enrich.bench/EnrichBench.scala b/modules/bench/src/test/scala/com.snowplowanalytics.snowplow.enrich.bench/EnrichBench.scala deleted file mode 100644 index ebe8c0c1b..000000000 --- a/modules/bench/src/test/scala/com.snowplowanalytics.snowplow.enrich.bench/EnrichBench.scala +++ /dev/null @@ -1,143 +0,0 @@ -/* - * Copyright (c) 2020-2022 Snowplow Analytics Ltd. All rights reserved. - * - * This program is licensed to you under the Apache License Version 2.0, - * and you may not use this file except in compliance with the Apache License Version 2.0. - * You may obtain a copy of the Apache License Version 2.0 at http://www.apache.org/licenses/LICENSE-2.0. - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the Apache License Version 2.0 is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the Apache License Version 2.0 for the specific language governing permissions and limitations there under. - */ -package com.snowplowanalytics.snowplow.enrich.bench - -import org.openjdk.jmh.annotations._ - -import java.util.concurrent.TimeUnit - -import cats.effect.{ContextShift, IO, Clock, Blocker} - -import io.circe.literal._ - -import fs2.Stream - -import com.snowplowanalytics.iglu.client.Client - -import com.snowplowanalytics.snowplow.enrich.common.enrichments.EnrichmentRegistry -import com.snowplowanalytics.snowplow.enrich.common.loaders.ThriftLoader -import com.snowplowanalytics.snowplow.enrich.pubsub.test.TestEnvironment -import com.snowplowanalytics.snowplow.enrich.pubsub.{Enrich, Environment, EnrichSpec, Payload} - -import org.apache.http.message.BasicNameValuePair - - -/** - * @example - * {{{ - * jmh:run -i 15 -wi 10 -f1 -t1 EnrichBench - * }}} - */ -@State(Scope.Thread) -@BenchmarkMode(Array(Mode.AverageTime)) -@OutputTimeUnit(TimeUnit.MICROSECONDS) -class EnrichBench { - - implicit val ioClock: Clock[IO] = Clock.create[IO] - - val client = Client.parseDefault[IO](json""" - { - "schema": "iglu:com.snowplowanalytics.iglu/resolver-config/jsonschema/1-0-1", - "data": { - "cacheSize": 500, - "repositories": [ - { - "name": "Iglu Central", - "priority": 0, - "vendorPrefixes": [ "com.snowplowanalytics" ], - "connection": { - "http": { - "uri": "http://iglucentral.com" - } - } - }, - { - "name": "Iglu Central - GCP Mirror", - "priority": 1, - "vendorPrefixes": [ "com.snowplowanalytics" ], - "connection": { - "http": { - "uri": "http://mirror01.iglucentral.com" - } - } - } - ] - } - } - """).rethrowT.unsafeRunSync() - - @Benchmark - def measureEnrichWithMinimalPayload(state: EnrichBench.BenchState) = { - implicit val CS: ContextShift[IO] = state.contextShift - Enrich.enrichWith[IO](IO.pure(EnrichmentRegistry()), client, None, (_: Option[Long]) => IO.unit)(state.raw).unsafeRunSync() - } - - @Benchmark - def measureToCollectorPayload(state: EnrichBench.BenchState) = { - ThriftLoader.toCollectorPayload(state.raw.data, Enrich.processor) - } - - @Benchmark - @OperationsPerInvocation(50) // 5 events repetated 10 times - def measureRunWithNoEnrichments(state: EnrichBench.BenchState) = { - // We used this benchmark to check if running the whole `enrichWith` on a blocking - // thread-pool will give us increase in performance. Results haven't confirm it: - // EnrichBench.measureRunWithNoEnrichments avgt 15 341.144 ± 18.884 us/op <- smaller blocker - // EnrichBench.measureRunWithNoEnrichments avgt 15 326.608 ± 16.714 us/op <- wrapping blocker - // EnrichBench.measureRunWithNoEnrichments avgt 15 292.907 ± 15.894 us/op <- no blocker at all - // However, I'm still leaving the "smaller blocker" in a hope that with actual IO enrichments - // it will give the expected increase in performance - implicit val CS: ContextShift[IO] = state.contextShift - state.useEnvironment(e => Enrich.run[IO](e).compile.drain).unsafeRunSync() - } -} - -object EnrichBench { - @State(Scope.Benchmark) - class BenchState { - var raw: Payload[IO, Array[Byte]] = _ - var useEnvironment: (Environment[IO] => IO[Unit]) => IO[Unit] = _ - var contextShift: ContextShift[IO] = _ - var blocker: Blocker = _ - - @Setup(Level.Trial) - def setup(): Unit = { - - raw = EnrichSpec.payload[IO] - - val input = Stream.emits(List( - EnrichSpec.collectorPayload.copy( - querystring = new BasicNameValuePair("ip", "125.12.2.40") :: EnrichSpec.querystring - ), - EnrichSpec.collectorPayload.copy( - querystring = new BasicNameValuePair("ip", "125.12.2.41") :: EnrichSpec.querystring - ), - EnrichSpec.collectorPayload.copy( - querystring = new BasicNameValuePair("ip", "125.12.2.42") :: EnrichSpec.querystring - ), - EnrichSpec.collectorPayload.copy( - querystring = new BasicNameValuePair("ip", "125.12.2.43") :: EnrichSpec.querystring - ), - EnrichSpec.collectorPayload.copy( - querystring = new BasicNameValuePair("ip", "125.12.2.44") :: EnrichSpec.querystring - ), - )).repeatN(10).map(cp => Payload(cp.toRaw, IO.unit)).covary[IO] - - useEnvironment = TestEnvironment.make(input).map(_.env).use(_: Environment[IO] => IO[Unit]) - - contextShift = IO.contextShift(scala.concurrent.ExecutionContext.global) - - blocker = Blocker[IO].use(IO.pure).unsafeRunSync() - } - } -} diff --git a/modules/bench/src/test/scala/com.snowplowanalytics.snowplow.enrich.bench/EtlPipelineBench.scala b/modules/bench/src/test/scala/com.snowplowanalytics.snowplow.enrich.bench/EtlPipelineBench.scala deleted file mode 100644 index 8f2311517..000000000 --- a/modules/bench/src/test/scala/com.snowplowanalytics.snowplow.enrich.bench/EtlPipelineBench.scala +++ /dev/null @@ -1,84 +0,0 @@ -/* - * Copyright (c) 2020-2022 Snowplow Analytics Ltd. All rights reserved. - * - * This program is licensed to you under the Apache License Version 2.0, - * and you may not use this file except in compliance with the Apache License Version 2.0. - * You may obtain a copy of the Apache License Version 2.0 at http://www.apache.org/licenses/LICENSE-2.0. - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the Apache License Version 2.0 is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the Apache License Version 2.0 for the specific language governing permissions and limitations there under. - */ -package com.snowplowanalytics.snowplow.enrich.bench - -import org.openjdk.jmh.annotations._ - -import java.util.concurrent.TimeUnit - -import cats.Id -import cats.data.Validated - -import cats.effect.{IO, Clock} - -import io.circe.Json - -import com.snowplowanalytics.iglu.client.{Resolver, Client, CirceValidator} - -import com.snowplowanalytics.snowplow.enrich.common.EtlPipeline -import com.snowplowanalytics.snowplow.enrich.common.adapters.AdapterRegistry -import com.snowplowanalytics.snowplow.enrich.common.enrichments.EnrichmentRegistry -import com.snowplowanalytics.snowplow.enrich.common.SpecHelpers.adaptersSchemas - -import org.joda.time.DateTime - -@State(Scope.Thread) -@BenchmarkMode(Array(Mode.AverageTime, Mode.Throughput)) -@OutputTimeUnit(TimeUnit.MICROSECONDS) -class EtlPipelineBench { - - private implicit val ioClock: Clock[IO] = Clock.create[IO] - - private implicit val idClock: Clock[Id] = new Clock[Id] { - final def realTime(unit: TimeUnit): Id[Long] = - unit.convert(System.currentTimeMillis(), TimeUnit.MILLISECONDS) - final def monotonic(unit: TimeUnit): Id[Long] = - unit.convert(System.nanoTime(), TimeUnit.NANOSECONDS) - } - - @Benchmark - def measureProcessEventsIO(state: EtlPipelineBench.BenchState) = { - val payload = EnrichSpec.collectorPayload - EtlPipeline.processEvents[IO](state.adapterRegistry, state.enrichmentRegistryIo, Client.IgluCentral, Enrich.processor, state.dateTime, Validated.Valid(Some(payload))).unsafeRunSync() - } - - @Benchmark - def measureProcessEventsId(state: EtlPipelineBench.BenchState) = { - val payload = EnrichSpec.collectorPayload - EtlPipeline.processEvents[Id](state.adapterRegistry, state.enrichmentRegistryId, state.clientId, Enrich.processor, state.dateTime, Validated.Valid(Some(payload))) - } -} - -object EtlPipelineBench { - - - @State(Scope.Benchmark) - class BenchState { - var dateTime: DateTime = _ - var adapterRegistry: AdapterRegistry = _ - var enrichmentRegistryId: EnrichmentRegistry[Id] = _ - var enrichmentRegistryIo: EnrichmentRegistry[IO] = _ - var clientId: Client[Id, Json] = _ - var clientIO: Client[IO, Json] = _ - - @Setup(Level.Trial) - def setup(): Unit = { - dateTime = DateTime.parse("2010-06-30T01:20+02:00") - adapterRegistry = new AdapterRegistry(adaptersSchemas = adaptersSchemas) - enrichmentRegistryId = EnrichmentRegistry[Id]() - enrichmentRegistryIo = EnrichmentRegistry[IO]() - clientId = Client[Id, Json](Resolver(List(), None), CirceValidator) - clientIO = Client[IO, Json](Resolver(List(), None), CirceValidator) - } - } -} diff --git a/modules/bench/src/test/scala/com.snowplowanalytics.snowplow.enrich.bench/ThriftLoaderBench.scala b/modules/bench/src/test/scala/com.snowplowanalytics.snowplow.enrich.bench/ThriftLoaderBench.scala deleted file mode 100644 index 5193248fc..000000000 --- a/modules/bench/src/test/scala/com.snowplowanalytics.snowplow.enrich.bench/ThriftLoaderBench.scala +++ /dev/null @@ -1,55 +0,0 @@ -/* - * Copyright (c) 2020-2022 Snowplow Analytics Ltd. All rights reserved. - * - * This program is licensed to you under the Apache License Version 2.0, - * and you may not use this file except in compliance with the Apache License Version 2.0. - * You may obtain a copy of the Apache License Version 2.0 at http://www.apache.org/licenses/LICENSE-2.0. - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the Apache License Version 2.0 is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the Apache License Version 2.0 for the specific language governing permissions and limitations there under. - */ -package com.snowplowanalytics.snowplow.enrich.bench - -import org.openjdk.jmh.annotations._ -import java.util.concurrent.TimeUnit - -import com.snowplowanalytics.snowplow.enrich.common.loaders.ThriftLoader -import com.snowplowanalytics.snowplow.enrich.common.outputs.EnrichedEvent -import com.snowplowanalytics.snowplow.enrich.pubsub.{Enrich, EnrichSpec} - -@State(Scope.Thread) -@BenchmarkMode(Array(Mode.AverageTime)) -@OutputTimeUnit(TimeUnit.NANOSECONDS) -class ThriftLoaderBench { - - @Benchmark - def measureToCollectorPayload(state: ThriftLoaderBench.BenchState) = - ThriftLoader.toCollectorPayload(state.data, Enrich.processor) - - @Benchmark - def measureNormalize(state: ThriftLoaderBench.BenchState) = { - Enrich.serializeEnriched(state.event) - } -} - -object ThriftLoaderBench { - @State(Scope.Benchmark) - class BenchState { - var data: Array[Byte] = _ - var event: EnrichedEvent = _ - - @Setup(Level.Trial) - def setup(): Unit = { - data = EnrichSpec.collectorPayload.toRaw - - event = new EnrichedEvent() - event.setApp_id("foo") - event.setEvent_id("deadbeef-dead-dead-dead-deaddeafbeef") - event.setUser_ipaddress("128.0.1.2") - event.setUnstruct_event("""{"some": "json"}""") - } - } -} - diff --git a/project/plugins.sbt b/project/plugins.sbt index 93225d695..ff7866711 100644 --- a/project/plugins.sbt +++ b/project/plugins.sbt @@ -4,6 +4,5 @@ addSbtPlugin("org.scalameta" % "sbt-scalafmt" % "2.4.2") addSbtPlugin("io.github.davidgregory084" % "sbt-tpolecat" % "0.1.17") addSbtPlugin("org.scoverage" % "sbt-coveralls" % "1.3.1") addSbtPlugin("org.scoverage" % "sbt-scoverage" % "1.9.2") -addSbtPlugin("pl.project13.scala" % "sbt-jmh" % "0.4.0") addSbtPlugin("com.geirsson" % "sbt-ci-release" % "1.5.7") addSbtPlugin("com.snowplowanalytics" % "sbt-snowplow-release" % "0.2.1")