From 9641083bc341a14397cb6cbf0b0bd62b6a8c6a5f Mon Sep 17 00:00:00 2001 From: Ian Streeter Date: Sat, 25 May 2024 09:31:45 +0100 Subject: [PATCH] PubSub source scale the parallel pull count with number of cores (#78) The PubSub Source parameter `parallelPullCount` was used to set the parallelism of the underlying Subscriber. With a higher pull count, the Subscriber can supply events more quickly to the downstream of the application, but there is more overhead. For typical Snowplow apps, a pull count of 1 is sufficient on small instances. But when there is more cpu available, the downstream app processes events more quickly, and therefore we need a higher pull count to provide the events. This PR makes it so pull count is picked dynamically based on available cpu. Snowplow apps on bigger instances will automatically get the benefit of this change, without requiring the user to explicitly set the pull count. --- .../pubsub/src/main/resources/reference.conf | 2 +- .../snowplow/sources/pubsub/PubsubSource.scala | 18 ++++++++++++++++-- .../sources/pubsub/PubsubSourceConfig.scala | 2 +- .../pubsub/PubsubSourceConfigSpec.scala | 2 +- 4 files changed, 19 insertions(+), 5 deletions(-) diff --git a/modules/pubsub/src/main/resources/reference.conf b/modules/pubsub/src/main/resources/reference.conf index 0cef7e4..c7f9e5d 100644 --- a/modules/pubsub/src/main/resources/reference.conf +++ b/modules/pubsub/src/main/resources/reference.conf @@ -1,7 +1,7 @@ snowplow.defaults: { sources: { pubsub: { - parallelPullCount: 1 + parallelPullFactor: 0.5 bufferMaxBytes: 10000000 maxAckExtensionPeriod: "1 hour" minDurationPerAckExtension: "60 seconds" diff --git a/modules/pubsub/src/main/scala/com/snowplowanalytics/snowplow/sources/pubsub/PubsubSource.scala b/modules/pubsub/src/main/scala/com/snowplowanalytics/snowplow/sources/pubsub/PubsubSource.scala index 585ab33..08754f3 100644 --- a/modules/pubsub/src/main/scala/com/snowplowanalytics/snowplow/sources/pubsub/PubsubSource.scala +++ 
b/modules/pubsub/src/main/scala/com/snowplowanalytics/snowplow/sources/pubsub/PubsubSource.scala @@ -186,8 +186,9 @@ object PubsubSource { private def runSubscriber[F[_]: Async](config: PubsubSourceConfig, control: Control): Resource[F, Unit] = for { - executor <- executorResource(Sync[F].delay(Executors.newScheduledThreadPool(2 * config.parallelPullCount))) direct <- executorResource(Sync[F].delay(MoreExecutors.newDirectExecutorService())) + parallelPullCount = chooseNumParallelPulls(config) + executor <- executorResource(Sync[F].delay(Executors.newScheduledThreadPool(2 * parallelPullCount))) receiver = messageReceiver(config, control) name = ProjectSubscriptionName.of(config.subscription.projectId, config.subscription.subscriptionId) subscriber <- Resource.eval(Sync[F].delay { @@ -196,7 +197,7 @@ object PubsubSource { .setMaxAckExtensionPeriod(convertDuration(config.maxAckExtensionPeriod)) .setMaxDurationPerAckExtension(convertDuration(config.maxDurationPerAckExtension)) .setMinDurationPerAckExtension(convertDuration(config.minDurationPerAckExtension)) - .setParallelPullCount(config.parallelPullCount) + .setParallelPullCount(parallelPullCount) .setExecutorProvider(FixedExecutorProvider.create(executorForEventCallbacks(direct, executor))) .setSystemExecutorProvider(FixedExecutorProvider.create(executor)) .setFlowControlSettings { @@ -310,4 +311,17 @@ object PubsubSource { private def convertDuration(d: FiniteDuration): ThreetenDuration = ThreetenDuration.ofMillis(d.toMillis) + + /** + * Converts `parallelPullFactor` to a suggested number of parallel pulls + * + * For bigger instances (more cores) the downstream processor can typically process events more + * quickly. So the PubSub subscriber needs more parallelism in order to keep downstream saturated + * with events. 
+ */ + private def chooseNumParallelPulls(config: PubsubSourceConfig): Int = + (Runtime.getRuntime.availableProcessors * config.parallelPullFactor) + .setScale(0, BigDecimal.RoundingMode.UP) + .toInt + } diff --git a/modules/pubsub/src/main/scala/com/snowplowanalytics/snowplow/sources/pubsub/PubsubSourceConfig.scala b/modules/pubsub/src/main/scala/com/snowplowanalytics/snowplow/sources/pubsub/PubsubSourceConfig.scala index eb2af1b..50f45a9 100644 --- a/modules/pubsub/src/main/scala/com/snowplowanalytics/snowplow/sources/pubsub/PubsubSourceConfig.scala +++ b/modules/pubsub/src/main/scala/com/snowplowanalytics/snowplow/sources/pubsub/PubsubSourceConfig.scala @@ -17,7 +17,7 @@ import com.snowplowanalytics.snowplow.pubsub.GcpUserAgent case class PubsubSourceConfig( subscription: PubsubSourceConfig.Subscription, - parallelPullCount: Int, + parallelPullFactor: BigDecimal, bufferMaxBytes: Int, maxAckExtensionPeriod: FiniteDuration, minDurationPerAckExtension: FiniteDuration, diff --git a/modules/pubsub/src/test/scala/com/snowplowanalytics/snowplow/sources/pubsub/PubsubSourceConfigSpec.scala b/modules/pubsub/src/test/scala/com/snowplowanalytics/snowplow/sources/pubsub/PubsubSourceConfigSpec.scala index 3bc3c3a..eaf7a3c 100644 --- a/modules/pubsub/src/test/scala/com/snowplowanalytics/snowplow/sources/pubsub/PubsubSourceConfigSpec.scala +++ b/modules/pubsub/src/test/scala/com/snowplowanalytics/snowplow/sources/pubsub/PubsubSourceConfigSpec.scala @@ -42,7 +42,7 @@ class PubsubSourceConfigSpec extends Specification { val expected = PubsubSourceConfig( subscription = PubsubSourceConfig.Subscription("my-project", "my-subscription"), - parallelPullCount = 1, + parallelPullFactor = BigDecimal(0.5), bufferMaxBytes = 10000000, maxAckExtensionPeriod = 1.hour, minDurationPerAckExtension = 1.minute,