From 65272c976a1e2bd1658928fa5229f65274d61da0 Mon Sep 17 00:00:00 2001 From: Ian Streeter Date: Tue, 1 Oct 2024 14:25:01 +0100 Subject: [PATCH] Pubsub source open more transport channels Previously the pubsub SDK opened just a single transport channel (roughly equivalent to a TCP channel). Some snowplow apps we need to batch up a very large number of events from the source before acking. In those circumstances it is good to open more streaming pulls (a type of RPC) to keep the pubsub subscription healthy. But to open more streaming pulls we also need to open more transport channels (TCP channels) to support all the RPCs. This commit adds a new config parameter `maxPullsPerTransportChannel`. The default value 16 is good for most apps. --- .../pubsub/src/main/resources/reference.conf | 1 + .../sources/pubsub/PubsubSource.scala | 26 ++++++++++++++++++- .../sources/pubsub/PubsubSourceConfig.scala | 3 ++- .../pubsub/PubsubSourceConfigSpec.scala | 17 ++++++------ 4 files changed, 37 insertions(+), 10 deletions(-) diff --git a/modules/pubsub/src/main/resources/reference.conf b/modules/pubsub/src/main/resources/reference.conf index c7f9e5d..dc03ea3 100644 --- a/modules/pubsub/src/main/resources/reference.conf +++ b/modules/pubsub/src/main/resources/reference.conf @@ -10,6 +10,7 @@ snowplow.defaults: { productName: "Snowplow OSS" } shutdownTimeout: "30 seconds" + maxPullsPerTransportChannel: 16 } } diff --git a/modules/pubsub/src/main/scala/com/snowplowanalytics/snowplow/sources/pubsub/PubsubSource.scala b/modules/pubsub/src/main/scala/com/snowplowanalytics/snowplow/sources/pubsub/PubsubSource.scala index 6c3c901..eaacea4 100644 --- a/modules/pubsub/src/main/scala/com/snowplowanalytics/snowplow/sources/pubsub/PubsubSource.scala +++ b/modules/pubsub/src/main/scala/com/snowplowanalytics/snowplow/sources/pubsub/PubsubSource.scala @@ -21,7 +21,8 @@ import java.time.Instant import com.google.api.core.ApiService import com.google.api.gax.batching.FlowControlSettings import com.google.api.gax.core.FixedExecutorProvider -import com.google.cloud.pubsub.v1.{AckReplyConsumer, MessageReceiver, Subscriber} +import com.google.api.gax.grpc.ChannelPoolSettings +import com.google.cloud.pubsub.v1.{AckReplyConsumer, MessageReceiver, Subscriber, SubscriptionAdminSettings} import com.google.common.util.concurrent.{ForwardingExecutorService, ListeningExecutorService, MoreExecutors} import com.google.pubsub.v1.{ProjectSubscriptionName, PubsubMessage} import org.threeten.bp.{Duration => ThreetenDuration} @@ -191,6 +192,7 @@ object PubsubSource { for { direct <- executorResource(Sync[F].delay(MoreExecutors.newDirectExecutorService())) parallelPullCount = chooseNumParallelPulls(config) + channelCount = chooseNumTransportChannels(config, parallelPullCount) executor <- executorResource(Sync[F].delay(Executors.newScheduledThreadPool(2 * parallelPullCount))) receiver = messageReceiver(config, control) name = ProjectSubscriptionName.of(config.subscription.projectId, config.subscription.subscriptionId) @@ -208,6 +210,16 @@ object PubsubSource { FlowControlSettings.getDefaultInstance } .setHeaderProvider(GcpUserAgent.headerProvider(config.gcpUserAgent)) + .setChannelProvider { + SubscriptionAdminSettings.defaultGrpcTransportProviderBuilder + .setMaxInboundMessageSize(20 << 20) // copies Subscriber hard-coded default + .setMaxInboundMetadataSize(20 << 20) // copies Subscriber hard-coded default + .setKeepAliveTime(ThreetenDuration.ofMinutes(5)) // copies Subscriber hard-coded default + .setChannelPoolSettings { + ChannelPoolSettings.staticallySized(channelCount) + } + .build + } .build }) _ <- Resource.eval(Sync[F].delay { @@ -327,4 +339,16 @@ object PubsubSource { .setScale(0, BigDecimal.RoundingMode.UP) .toInt + /** + * Picks a sensible number of GRPC transport channels (roughly equivalent to a TCP connection) + * + * GRPC has a hard limit of 100 concurrent RPCs on a channel. And experience shows it is healthy + * to stay much under that limit. If we need to open a large number of streaming pulls then we + * might approach/exceed that limit. + */ + private def chooseNumTransportChannels(config: PubsubSourceConfig, parallelPullCount: Int): Int = + (BigDecimal(parallelPullCount) / config.maxPullsPerTransportChannel) + .setScale(0, BigDecimal.RoundingMode.UP) + .toInt + } diff --git a/modules/pubsub/src/main/scala/com/snowplowanalytics/snowplow/sources/pubsub/PubsubSourceConfig.scala b/modules/pubsub/src/main/scala/com/snowplowanalytics/snowplow/sources/pubsub/PubsubSourceConfig.scala index 50f45a9..0b0e871 100644 --- a/modules/pubsub/src/main/scala/com/snowplowanalytics/snowplow/sources/pubsub/PubsubSourceConfig.scala +++ b/modules/pubsub/src/main/scala/com/snowplowanalytics/snowplow/sources/pubsub/PubsubSourceConfig.scala @@ -23,7 +23,8 @@ case class PubsubSourceConfig( minDurationPerAckExtension: FiniteDuration, maxDurationPerAckExtension: FiniteDuration, gcpUserAgent: GcpUserAgent, - shutdownTimeout: FiniteDuration + shutdownTimeout: FiniteDuration, + maxPullsPerTransportChannel: Int ) object PubsubSourceConfig { diff --git a/modules/pubsub/src/test/scala/com/snowplowanalytics/snowplow/sources/pubsub/PubsubSourceConfigSpec.scala b/modules/pubsub/src/test/scala/com/snowplowanalytics/snowplow/sources/pubsub/PubsubSourceConfigSpec.scala index eaf7a3c..94b9c3f 100644 --- a/modules/pubsub/src/test/scala/com/snowplowanalytics/snowplow/sources/pubsub/PubsubSourceConfigSpec.scala +++ b/modules/pubsub/src/test/scala/com/snowplowanalytics/snowplow/sources/pubsub/PubsubSourceConfigSpec.scala @@ -41,14 +41,15 @@ class PubsubSourceConfigSpec extends Specification { val result = ConfigFactory.load(ConfigFactory.parseString(input)) val expected = PubsubSourceConfig( - subscription = PubsubSourceConfig.Subscription("my-project", "my-subscription"), - parallelPullFactor = BigDecimal(0.5), - bufferMaxBytes = 10000000, - maxAckExtensionPeriod = 1.hour, - minDurationPerAckExtension = 1.minute, - maxDurationPerAckExtension = 10.minutes, - gcpUserAgent = GcpUserAgent("Snowplow OSS", "example-version"), - shutdownTimeout = 30.seconds + subscription = PubsubSourceConfig.Subscription("my-project", "my-subscription"), + parallelPullFactor = BigDecimal(0.5), + bufferMaxBytes = 10000000, + maxAckExtensionPeriod = 1.hour, + minDurationPerAckExtension = 1.minute, + maxDurationPerAckExtension = 10.minutes, + gcpUserAgent = GcpUserAgent("Snowplow OSS", "example-version"), + shutdownTimeout = 30.seconds, + maxPullsPerTransportChannel = 16 ) result.as[Wrapper] must beRight.like { case w: Wrapper =>