Skip to content

Commit

Permalink
PubSub source scale the parallel pull count with number of cores (#78)
Browse files Browse the repository at this point in the history
The PubSub Source parameter `parallelPullCount` was used to set the
parallelism of the underlying Subscriber. With a higher pull count, the
Subscriber can supply events more quickly to the downstream of the
application, but there is more overhead.

For typical Snowplow apps, a pull count of 1 is sufficient on small
instances. But when there is more cpu availalbe, the downstream app
processes events more quickly, and therefore we need a higher pull count
to provide the events.

This PR makes it so pull count is picked dynamically based on available
cpu. Snowplow apps on bigger instances will automatically get the
benefit of this change, without requiring the user to explicitly set the
pull count.
  • Loading branch information
istreeter authored May 25, 2024
1 parent 13a7d10 commit 9641083
Show file tree
Hide file tree
Showing 4 changed files with 19 additions and 5 deletions.
2 changes: 1 addition & 1 deletion modules/pubsub/src/main/resources/reference.conf
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
snowplow.defaults: {
sources: {
pubsub: {
parallelPullCount: 1
parallelPullFactor: 0.5
bufferMaxBytes: 10000000
maxAckExtensionPeriod: "1 hour"
minDurationPerAckExtension: "60 seconds"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -186,8 +186,9 @@ object PubsubSource {

private def runSubscriber[F[_]: Async](config: PubsubSourceConfig, control: Control): Resource[F, Unit] =
for {
executor <- executorResource(Sync[F].delay(Executors.newScheduledThreadPool(2 * config.parallelPullCount)))
direct <- executorResource(Sync[F].delay(MoreExecutors.newDirectExecutorService()))
parallelPullCount = chooseNumParallelPulls(config)
executor <- executorResource(Sync[F].delay(Executors.newScheduledThreadPool(2 * parallelPullCount)))
receiver = messageReceiver(config, control)
name = ProjectSubscriptionName.of(config.subscription.projectId, config.subscription.subscriptionId)
subscriber <- Resource.eval(Sync[F].delay {
Expand All @@ -196,7 +197,7 @@ object PubsubSource {
.setMaxAckExtensionPeriod(convertDuration(config.maxAckExtensionPeriod))
.setMaxDurationPerAckExtension(convertDuration(config.maxDurationPerAckExtension))
.setMinDurationPerAckExtension(convertDuration(config.minDurationPerAckExtension))
.setParallelPullCount(config.parallelPullCount)
.setParallelPullCount(parallelPullCount)
.setExecutorProvider(FixedExecutorProvider.create(executorForEventCallbacks(direct, executor)))
.setSystemExecutorProvider(FixedExecutorProvider.create(executor))
.setFlowControlSettings {
Expand Down Expand Up @@ -310,4 +311,17 @@ object PubsubSource {

private def convertDuration(d: FiniteDuration): ThreetenDuration =
ThreetenDuration.ofMillis(d.toMillis)

/**
* Converts `parallelPullFactor` to a suggested number of parallel pulls
*
* For bigger instances (more cores) the downstream processor can typically process events more
* quickly. So the PubSub subscriber needs more parallelism in order to keep downstream saturated
* with events.
*/
private def chooseNumParallelPulls(config: PubsubSourceConfig): Int =
(Runtime.getRuntime.availableProcessors * config.parallelPullFactor)
.setScale(0, BigDecimal.RoundingMode.UP)
.toInt

}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import com.snowplowanalytics.snowplow.pubsub.GcpUserAgent

case class PubsubSourceConfig(
subscription: PubsubSourceConfig.Subscription,
parallelPullCount: Int,
parallelPullFactor: BigDecimal,
bufferMaxBytes: Int,
maxAckExtensionPeriod: FiniteDuration,
minDurationPerAckExtension: FiniteDuration,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ class PubsubSourceConfigSpec extends Specification {

val expected = PubsubSourceConfig(
subscription = PubsubSourceConfig.Subscription("my-project", "my-subscription"),
parallelPullCount = 1,
parallelPullFactor = BigDecimal(0.5),
bufferMaxBytes = 10000000,
maxAckExtensionPeriod = 1.hour,
minDurationPerAckExtension = 1.minute,
Expand Down

0 comments on commit 9641083

Please sign in to comment.