From beac4573c823fbe4e40381a63b4cdb9538a8c898 Mon Sep 17 00:00:00 2001 From: Oguzhan Unlu Date: Wed, 16 Oct 2024 12:14:40 +0300 Subject: [PATCH] Upgrade common-streams to 0.8.0-M6 --- config/config.pubsub.reference.hocon | 3 ++ .../snowplow/bigquery/KafkaConfigSpec.scala | 2 +- .../snowplow/bigquery/KinesisConfigSpec.scala | 2 +- .../snowplow/bigquery/PubsubConfigSpec.scala | 36 ++++++++++--------- project/Dependencies.scala | 2 +- 5 files changed, 25 insertions(+), 20 deletions(-) diff --git a/config/config.pubsub.reference.hocon b/config/config.pubsub.reference.hocon index de678752..475f9deb 100644 --- a/config/config.pubsub.reference.hocon +++ b/config/config.pubsub.reference.hocon @@ -24,6 +24,9 @@ # -- The actual value used is guided by runtime statistics collected by the pubsub client library. "minDurationPerAckExtension": "60 seconds" "maxDurationPerAckExtension": "600 seconds" + + # -- Max num of streaming pulls to open per transport channel + "maxPullsPerTransportChannel": 16 } "output": { diff --git a/modules/kafka/src/test/scala/com/snowplowanalytics/snowplow/bigquery/KafkaConfigSpec.scala b/modules/kafka/src/test/scala/com/snowplowanalytics/snowplow/bigquery/KafkaConfigSpec.scala index a4ee54d5..70ef6e4b 100644 --- a/modules/kafka/src/test/scala/com/snowplowanalytics/snowplow/bigquery/KafkaConfigSpec.scala +++ b/modules/kafka/src/test/scala/com/snowplowanalytics/snowplow/bigquery/KafkaConfigSpec.scala @@ -121,7 +121,7 @@ object KafkaConfigSpec { metrics = Config.Metrics(None), sentry = None, healthProbe = Config.HealthProbe(port = Port.fromInt(8000).get, unhealthyLatency = 5.minutes), - webhook = Webhook.Config(endpoint = None, tags = Map.empty, heartbeat = 60.minutes) + webhook = Webhook.Config(endpoint = None, tags = Map.empty, heartbeat = 5.minutes) ), license = AcceptedLicense(), skipSchemas = List.empty, diff --git a/modules/kinesis/src/test/scala/com/snowplowanalytics/snowplow/bigquery/KinesisConfigSpec.scala b/modules/kinesis/src/test/scala/com/snowplowanalytics/snowplow/bigquery/KinesisConfigSpec.scala index 6eda0025..03270401 100644 --- a/modules/kinesis/src/test/scala/com/snowplowanalytics/snowplow/bigquery/KinesisConfigSpec.scala +++ b/modules/kinesis/src/test/scala/com/snowplowanalytics/snowplow/bigquery/KinesisConfigSpec.scala @@ -117,7 +117,7 @@ object KinesisConfigSpec { metrics = Config.Metrics(None), sentry = None, healthProbe = Config.HealthProbe(port = Port.fromInt(8000).get, unhealthyLatency = 5.minutes), - webhook = Webhook.Config(endpoint = None, tags = Map.empty, heartbeat = 60.minutes) + webhook = Webhook.Config(endpoint = None, tags = Map.empty, heartbeat = 5.minutes) ), license = AcceptedLicense(), skipSchemas = List.empty, diff --git a/modules/pubsub/src/test/scala/com/snowplowanalytics/snowplow/bigquery/PubsubConfigSpec.scala b/modules/pubsub/src/test/scala/com/snowplowanalytics/snowplow/bigquery/PubsubConfigSpec.scala index a94b9524..dcc2637e 100644 --- a/modules/pubsub/src/test/scala/com/snowplowanalytics/snowplow/bigquery/PubsubConfigSpec.scala +++ b/modules/pubsub/src/test/scala/com/snowplowanalytics/snowplow/bigquery/PubsubConfigSpec.scala @@ -62,14 +62,15 @@ class PubsubConfigSpec extends Specification with CatsEffect { object PubsubConfigSpec { private val minimalConfig = Config[PubsubSourceConfig, PubsubSinkConfig]( input = PubsubSourceConfig( - subscription = PubsubSourceConfig.Subscription("my-project", "snowplow-enriched"), - parallelPullFactor = BigDecimal(0.5), - bufferMaxBytes = 10000000, - maxAckExtensionPeriod = 1.hour, - minDurationPerAckExtension = 1.minute, - maxDurationPerAckExtension = 10.minutes, - gcpUserAgent = PubsubUserAgent("Snowplow OSS", "bigquery-loader"), - shutdownTimeout = 30.seconds + subscription = PubsubSourceConfig.Subscription("my-project", "snowplow-enriched"), + parallelPullFactor = BigDecimal(0.5), + bufferMaxBytes = 10000000, + maxAckExtensionPeriod = 1.hour, + minDurationPerAckExtension = 1.minute, + maxDurationPerAckExtension = 10.minutes, + gcpUserAgent = PubsubUserAgent("Snowplow OSS", "bigquery-loader"), + shutdownTimeout = 30.seconds, + maxPullsPerTransportChannel = 16 ), output = Config.Output( good = Config.BigQuery( @@ -116,7 +117,7 @@ object PubsubConfigSpec { metrics = Config.Metrics(None), sentry = None, healthProbe = Config.HealthProbe(port = Port.fromInt(8000).get, unhealthyLatency = 5.minutes), - webhook = Webhook.Config(endpoint = None, tags = Map.empty, heartbeat = 60.minutes) + webhook = Webhook.Config(endpoint = None, tags = Map.empty, heartbeat = 5.minutes) ), license = AcceptedLicense(), skipSchemas = List.empty, @@ -127,14 +128,15 @@ object PubsubConfigSpec { private val extendedConfig = Config[PubsubSourceConfig, PubsubSinkConfig]( input = PubsubSourceConfig( - subscription = PubsubSourceConfig.Subscription("my-project", "snowplow-enriched"), - parallelPullFactor = BigDecimal(0.5), - bufferMaxBytes = 1000000, - maxAckExtensionPeriod = 1.hour, - minDurationPerAckExtension = 1.minute, - maxDurationPerAckExtension = 10.minutes, - gcpUserAgent = PubsubUserAgent("Snowplow OSS", "bigquery-loader"), - shutdownTimeout = 30.seconds + subscription = PubsubSourceConfig.Subscription("my-project", "snowplow-enriched"), + parallelPullFactor = BigDecimal(0.5), + bufferMaxBytes = 1000000, + maxAckExtensionPeriod = 1.hour, + minDurationPerAckExtension = 1.minute, + maxDurationPerAckExtension = 10.minutes, + gcpUserAgent = PubsubUserAgent("Snowplow OSS", "bigquery-loader"), + shutdownTimeout = 30.seconds, + maxPullsPerTransportChannel = 16 ), output = Config.Output( good = Config.BigQuery( diff --git a/project/Dependencies.scala b/project/Dependencies.scala index c0c3cb97..c4016142 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -29,7 +29,7 @@ object Dependencies { val bigquery = "2.34.2" // Snowplow - val streams = "0.8.0-M4" + val streams = "0.8.0-M6" val igluClient = "3.1.0" // tests