Skip to content

Commit

Permalink
Upgrade common-streams to 0.8.0-M6
Browse files Browse the repository at this point in the history
  • Loading branch information
oguzhanunlu committed Oct 16, 2024
1 parent d23d372 commit beac457
Show file tree
Hide file tree
Showing 5 changed files with 25 additions and 20 deletions.
3 changes: 3 additions & 0 deletions config/config.pubsub.reference.hocon
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@
# -- The actual value used is guided by runtime statistics collected by the pubsub client library.
"minDurationPerAckExtension": "60 seconds"
"maxDurationPerAckExtension": "600 seconds"

# -- Max num of streaming pulls to open per transport channel
"maxPullsPerTransportChannel": 16
}

"output": {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ object KafkaConfigSpec {
metrics = Config.Metrics(None),
sentry = None,
healthProbe = Config.HealthProbe(port = Port.fromInt(8000).get, unhealthyLatency = 5.minutes),
webhook = Webhook.Config(endpoint = None, tags = Map.empty, heartbeat = 60.minutes)
webhook = Webhook.Config(endpoint = None, tags = Map.empty, heartbeat = 5.minutes)
),
license = AcceptedLicense(),
skipSchemas = List.empty,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ object KinesisConfigSpec {
metrics = Config.Metrics(None),
sentry = None,
healthProbe = Config.HealthProbe(port = Port.fromInt(8000).get, unhealthyLatency = 5.minutes),
webhook = Webhook.Config(endpoint = None, tags = Map.empty, heartbeat = 60.minutes)
webhook = Webhook.Config(endpoint = None, tags = Map.empty, heartbeat = 5.minutes)
),
license = AcceptedLicense(),
skipSchemas = List.empty,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,14 +62,15 @@ class PubsubConfigSpec extends Specification with CatsEffect {
object PubsubConfigSpec {
private val minimalConfig = Config[PubsubSourceConfig, PubsubSinkConfig](
input = PubsubSourceConfig(
subscription = PubsubSourceConfig.Subscription("my-project", "snowplow-enriched"),
parallelPullFactor = BigDecimal(0.5),
bufferMaxBytes = 10000000,
maxAckExtensionPeriod = 1.hour,
minDurationPerAckExtension = 1.minute,
maxDurationPerAckExtension = 10.minutes,
gcpUserAgent = PubsubUserAgent("Snowplow OSS", "bigquery-loader"),
shutdownTimeout = 30.seconds
subscription = PubsubSourceConfig.Subscription("my-project", "snowplow-enriched"),
parallelPullFactor = BigDecimal(0.5),
bufferMaxBytes = 10000000,
maxAckExtensionPeriod = 1.hour,
minDurationPerAckExtension = 1.minute,
maxDurationPerAckExtension = 10.minutes,
gcpUserAgent = PubsubUserAgent("Snowplow OSS", "bigquery-loader"),
shutdownTimeout = 30.seconds,
maxPullsPerTransportChannel = 16
),
output = Config.Output(
good = Config.BigQuery(
Expand Down Expand Up @@ -116,7 +117,7 @@ object PubsubConfigSpec {
metrics = Config.Metrics(None),
sentry = None,
healthProbe = Config.HealthProbe(port = Port.fromInt(8000).get, unhealthyLatency = 5.minutes),
webhook = Webhook.Config(endpoint = None, tags = Map.empty, heartbeat = 60.minutes)
webhook = Webhook.Config(endpoint = None, tags = Map.empty, heartbeat = 5.minutes)
),
license = AcceptedLicense(),
skipSchemas = List.empty,
Expand All @@ -127,14 +128,15 @@ object PubsubConfigSpec {

private val extendedConfig = Config[PubsubSourceConfig, PubsubSinkConfig](
input = PubsubSourceConfig(
subscription = PubsubSourceConfig.Subscription("my-project", "snowplow-enriched"),
parallelPullFactor = BigDecimal(0.5),
bufferMaxBytes = 1000000,
maxAckExtensionPeriod = 1.hour,
minDurationPerAckExtension = 1.minute,
maxDurationPerAckExtension = 10.minutes,
gcpUserAgent = PubsubUserAgent("Snowplow OSS", "bigquery-loader"),
shutdownTimeout = 30.seconds
subscription = PubsubSourceConfig.Subscription("my-project", "snowplow-enriched"),
parallelPullFactor = BigDecimal(0.5),
bufferMaxBytes = 1000000,
maxAckExtensionPeriod = 1.hour,
minDurationPerAckExtension = 1.minute,
maxDurationPerAckExtension = 10.minutes,
gcpUserAgent = PubsubUserAgent("Snowplow OSS", "bigquery-loader"),
shutdownTimeout = 30.seconds,
maxPullsPerTransportChannel = 16
),
output = Config.Output(
good = Config.BigQuery(
Expand Down
2 changes: 1 addition & 1 deletion project/Dependencies.scala
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ object Dependencies {
val bigquery = "2.34.2"

// Snowplow
val streams = "0.8.0-M4"
val streams = "0.8.0-M6"
val igluClient = "3.1.0"

// tests
Expand Down

0 comments on commit beac457

Please sign in to comment.