Skip to content

Commit

Permalink
Upgrade common-streams to 0.8.0-M4 (#383)
Browse files Browse the repository at this point in the history
  • Loading branch information
oguzhanunlu authored Sep 16, 2024
1 parent 32a1a0e commit 5d5d7f5
Show file tree
Hide file tree
Showing 11 changed files with 50 additions and 25 deletions.
7 changes: 7 additions & 0 deletions config/config.azure.reference.hocon
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,13 @@
# -- Change to `false` so events go the failed events stream instead of crashing the loader.
"exitOnMissingIgluSchema": true

# -- Configuration of internal http client used for iglu resolver, alerts and telemetry
"http": {
"client": {
"maxConnectionsPerServer": 4
}
}

"monitoring": {
"metrics": {

Expand Down
12 changes: 7 additions & 5 deletions config/config.kinesis.reference.hocon
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,6 @@
# -- Only used if retrieval mode is type Polling. How many events the client may fetch in a single poll.
"maxRecords": 1000
}

# -- The number of batches of events which are pre-fetched from kinesis.
# -- Increasing this above 1 is not known to improve performance.
"bufferSize": 1

}

"output": {
Expand Down Expand Up @@ -139,6 +134,13 @@
# -- Change to `false` so events go the failed events stream instead of crashing the loader.
"exitOnMissingIgluSchema": true

# -- Configuration of internal http client used for iglu resolver, alerts and telemetry
"http": {
"client": {
"maxConnectionsPerServer": 4
}
}

"monitoring": {
"metrics": {

Expand Down
7 changes: 7 additions & 0 deletions config/config.pubsub.reference.hocon
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,13 @@
# -- indicates an error that needs addressing.
# -- Change to `false` so events go the failed events stream instead of crashing the loader.
"exitOnMissingIgluSchema": true

# -- Configuration of internal http client used for iglu resolver, alerts and telemetry
"http": {
"client": {
"maxConnectionsPerServer": 4
}
}

"monitoring": {
"metrics": {
Expand Down
4 changes: 4 additions & 0 deletions modules/core/src/main/resources/reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,10 @@
"legacyColumns": []
"exitOnMissingIgluSchema": true

"http": {
"client": ${snowplow.defaults.http.client}
}

"monitoring": {
"metrics": {
"statsd": ${snowplow.defaults.statsd}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import com.comcast.ip4s.Port
import scala.concurrent.duration.FiniteDuration
import com.snowplowanalytics.iglu.client.resolver.Resolver.ResolverConfig
import com.snowplowanalytics.iglu.core.SchemaCriterion
import com.snowplowanalytics.snowplow.runtime.{AcceptedLicense, Metrics => CommonMetrics, Retrying, Telemetry, Webhook}
import com.snowplowanalytics.snowplow.runtime.{AcceptedLicense, HttpClient, Metrics => CommonMetrics, Retrying, Telemetry, Webhook}
import com.snowplowanalytics.iglu.core.circe.CirceIgluCodecs.schemaCriterionDecoder
import com.snowplowanalytics.snowplow.runtime.HealthProbe.decoders._

Expand All @@ -32,7 +32,8 @@ case class Config[+Source, +Sink](
license: AcceptedLicense,
skipSchemas: List[SchemaCriterion],
legacyColumns: List[SchemaCriterion],
exitOnMissingIgluSchema: Boolean
exitOnMissingIgluSchema: Boolean,
http: Config.Http
)

object Config {
Expand Down Expand Up @@ -91,6 +92,8 @@ object Config {
tooManyColumns: TooManyColumnsRetries
)

case class Http(client: HttpClient.Config)

implicit def decoder[Source: Decoder, Sink: Decoder]: Decoder[Config[Source, Sink]] = {
implicit val configuration = Configuration.default.withDiscriminator("type")
implicit val sinkWithMaxSize = for {
Expand All @@ -114,6 +117,7 @@ object Config {
implicit val alterTableRetries = deriveConfiguredDecoder[AlterTableWaitRetries]
implicit val tooManyColsRetries = deriveConfiguredDecoder[TooManyColumnsRetries]
implicit val retriesDecoder = deriveConfiguredDecoder[Retries]
implicit val httpDecoder = deriveConfiguredDecoder[Http]

// TODO add bigquery docs
implicit val licenseDecoder =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,7 @@ package com.snowplowanalytics.snowplow.bigquery

import cats.implicits._
import cats.effect.{Async, Resource, Sync}
import cats.effect.unsafe.implicits.global
import org.http4s.client.Client
import org.http4s.blaze.client.BlazeClientBuilder
import io.sentry.Sentry
import retry.RetryPolicy

Expand All @@ -21,7 +19,7 @@ import com.snowplowanalytics.iglu.core.SchemaCriterion
import com.snowplowanalytics.snowplow.sources.SourceAndAck
import com.snowplowanalytics.snowplow.sinks.Sink
import com.snowplowanalytics.snowplow.bigquery.processing.{BigQueryRetrying, BigQueryUtils, TableManager, Writer}
import com.snowplowanalytics.snowplow.runtime.{AppHealth, AppInfo, HealthProbe, Webhook}
import com.snowplowanalytics.snowplow.runtime.{AppHealth, AppInfo, HealthProbe, HttpClient, Webhook}

case class Environment[F[_]](
appInfo: AppInfo,
Expand Down Expand Up @@ -55,7 +53,7 @@ object Environment {
sourceReporter = sourceAndAck.isHealthy(config.main.monitoring.healthProbe.unhealthyLatency).map(_.showIfUnhealthy)
appHealth <- Resource.eval(AppHealth.init[F, Alert, RuntimeService](List(sourceReporter)))
resolver <- mkResolver[F](config.iglu)
httpClient <- BlazeClientBuilder[F].withExecutionContext(global.compute).resource
httpClient <- HttpClient.resource[F](config.main.http.client)
_ <- HealthProbe.resource(config.main.monitoring.healthProbe.port, appHealth)
_ <- Webhook.resource(config.main.monitoring.webhook, appInfo, httpClient, appHealth)
badSink <-
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ object Processing {
}

/** Transform the Event into values compatible with the BigQuery sdk */
private def transform[F[_]: Sync: RegistryLookup](
private def transform[F[_]: Async: RegistryLookup](
env: Environment[F],
badProcessor: BadRowProcessor
): Pipe[F, Batched, BatchAfterTransform] =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import com.comcast.ip4s.Port
import com.snowplowanalytics.iglu.core.SchemaCriterion
import com.snowplowanalytics.snowplow.bigquery.Config.GcpUserAgent
import com.snowplowanalytics.snowplow.runtime.Metrics.StatsdConfig
import com.snowplowanalytics.snowplow.runtime.{AcceptedLicense, ConfigParser, Retrying, Telemetry, Webhook}
import com.snowplowanalytics.snowplow.runtime.{AcceptedLicense, ConfigParser, HttpClient, Retrying, Telemetry, Webhook}
import com.snowplowanalytics.snowplow.sinks.kafka.KafkaSinkConfig
import com.snowplowanalytics.snowplow.sources.kafka.KafkaSourceConfig
import org.http4s.implicits.http4sLiteralsSyntax
Expand Down Expand Up @@ -126,7 +126,8 @@ object KafkaConfigSpec {
license = AcceptedLicense(),
skipSchemas = List.empty,
legacyColumns = List.empty,
exitOnMissingIgluSchema = true
exitOnMissingIgluSchema = true,
http = Config.Http(HttpClient.Config(4))
)

private val extendedConfig = Config[KafkaSourceConfig, KafkaSinkConfig](
Expand Down Expand Up @@ -219,6 +220,7 @@ object KafkaConfigSpec {
SchemaCriterion.parse("iglu:com.acme/legacy/jsonschema/1-*-*").get,
SchemaCriterion.parse("iglu:com.acme/legacy/jsonschema/2-*-*").get
),
exitOnMissingIgluSchema = true
exitOnMissingIgluSchema = true,
http = Config.Http(HttpClient.Config(4))
)
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,9 @@ import com.comcast.ip4s.Port
import com.snowplowanalytics.iglu.core.SchemaCriterion
import com.snowplowanalytics.snowplow.bigquery.Config.GcpUserAgent
import com.snowplowanalytics.snowplow.runtime.Metrics.StatsdConfig
import com.snowplowanalytics.snowplow.runtime.{AcceptedLicense, ConfigParser, Retrying, Telemetry, Webhook}
import com.snowplowanalytics.snowplow.runtime.{AcceptedLicense, ConfigParser, HttpClient, Retrying, Telemetry, Webhook}
import com.snowplowanalytics.snowplow.sinks.kinesis.{BackoffPolicy, KinesisSinkConfig}
import com.snowplowanalytics.snowplow.sources.kinesis.KinesisSourceConfig
import eu.timepit.refined.types.all.PosInt
import org.http4s.implicits.http4sLiteralsSyntax
import org.specs2.Specification

Expand Down Expand Up @@ -67,7 +66,6 @@ object KinesisConfigSpec {
workerIdentifier = "test-hostname",
initialPosition = KinesisSourceConfig.InitialPosition.Latest,
retrievalMode = KinesisSourceConfig.Retrieval.Polling(1000),
bufferSize = PosInt.unsafeFrom(1),
customEndpoint = None,
dynamodbCustomEndpoint = None,
cloudwatchCustomEndpoint = None,
Expand Down Expand Up @@ -124,7 +122,8 @@ object KinesisConfigSpec {
license = AcceptedLicense(),
skipSchemas = List.empty,
legacyColumns = List.empty,
exitOnMissingIgluSchema = true
exitOnMissingIgluSchema = true,
http = Config.Http(HttpClient.Config(4))
)

// workerIdentifer coming from "HOSTNAME" env variable set in BuildSettings
Expand All @@ -135,7 +134,6 @@ object KinesisConfigSpec {
workerIdentifier = "test-hostname",
initialPosition = KinesisSourceConfig.InitialPosition.TrimHorizon,
retrievalMode = KinesisSourceConfig.Retrieval.Polling(1000),
bufferSize = PosInt.unsafeFrom(1),
customEndpoint = None,
dynamodbCustomEndpoint = None,
cloudwatchCustomEndpoint = None,
Expand Down Expand Up @@ -214,6 +212,7 @@ object KinesisConfigSpec {
SchemaCriterion.parse("iglu:com.acme/legacy/jsonschema/1-*-*").get,
SchemaCriterion.parse("iglu:com.acme/legacy/jsonschema/2-*-*").get
),
exitOnMissingIgluSchema = true
exitOnMissingIgluSchema = true,
http = Config.Http(HttpClient.Config(4))
)
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import com.snowplowanalytics.iglu.core.SchemaCriterion
import com.snowplowanalytics.snowplow.bigquery.Config.GcpUserAgent
import com.snowplowanalytics.snowplow.pubsub.{GcpUserAgent => PubsubUserAgent}
import com.snowplowanalytics.snowplow.runtime.Metrics.StatsdConfig
import com.snowplowanalytics.snowplow.runtime.{AcceptedLicense, ConfigParser, Retrying, Telemetry, Webhook}
import com.snowplowanalytics.snowplow.runtime.{AcceptedLicense, ConfigParser, HttpClient, Retrying, Telemetry, Webhook}
import com.snowplowanalytics.snowplow.sinks.pubsub.PubsubSinkConfig
import com.snowplowanalytics.snowplow.sources.pubsub.PubsubSourceConfig
import org.http4s.implicits.http4sLiteralsSyntax
Expand Down Expand Up @@ -121,7 +121,8 @@ object PubsubConfigSpec {
license = AcceptedLicense(),
skipSchemas = List.empty,
legacyColumns = List.empty,
exitOnMissingIgluSchema = true
exitOnMissingIgluSchema = true,
http = Config.Http(HttpClient.Config(4))
)

private val extendedConfig = Config[PubsubSourceConfig, PubsubSinkConfig](
Expand Down Expand Up @@ -207,6 +208,7 @@ object PubsubConfigSpec {
SchemaCriterion.parse("iglu:com.acme/legacy/jsonschema/1-*-*").get,
SchemaCriterion.parse("iglu:com.acme/legacy/jsonschema/2-*-*").get
),
exitOnMissingIgluSchema = true
exitOnMissingIgluSchema = true,
http = Config.Http(HttpClient.Config(4))
)
}
2 changes: 1 addition & 1 deletion project/Dependencies.scala
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ object Dependencies {
val bigquery = "2.34.2"

// Snowplow
val streams = "0.8.0-M2"
val streams = "0.8.0-M4"
val igluClient = "3.1.0"

// tests
Expand Down

0 comments on commit 5d5d7f5

Please sign in to comment.