diff --git a/config/config.azure.reference.hocon b/config/config.azure.reference.hocon index 9545c131..bf5ffb10 100644 --- a/config/config.azure.reference.hocon +++ b/config/config.azure.reference.hocon @@ -117,6 +117,13 @@ # -- Change to `false` so events go the failed events stream instead of crashing the loader. "exitOnMissingIgluSchema": true + # -- Configuration of internal http client used for iglu resolver, alerts and telemetry + "http": { + "client": { + "maxConnectionsPerServer": 4 + } + } + "monitoring": { "metrics": { diff --git a/config/config.kinesis.reference.hocon b/config/config.kinesis.reference.hocon index 4456af30..5dabbfe2 100644 --- a/config/config.kinesis.reference.hocon +++ b/config/config.kinesis.reference.hocon @@ -31,11 +31,6 @@ # -- Only used if retrieval mode is type Polling. How many events the client may fetch in a single poll. "maxRecords": 1000 } - - # -- The number of batches of events which are pre-fetched from kinesis. - # -- Increasing this above 1 is not known to improve performance. - "bufferSize": 1 - } "output": { @@ -139,6 +134,13 @@ # -- Change to `false` so events go the failed events stream instead of crashing the loader. "exitOnMissingIgluSchema": true + # -- Configuration of internal http client used for iglu resolver, alerts and telemetry + "http": { + "client": { + "maxConnectionsPerServer": 4 + } + } + "monitoring": { "metrics": { diff --git a/config/config.pubsub.reference.hocon b/config/config.pubsub.reference.hocon index d4710624..de678752 100644 --- a/config/config.pubsub.reference.hocon +++ b/config/config.pubsub.reference.hocon @@ -119,6 +119,13 @@ # -- indicates an error that needs addressing. # -- Change to `false` so events go the failed events stream instead of crashing the loader. "exitOnMissingIgluSchema": true + + # -- Configuration of internal http client used for iglu resolver, alerts and telemetry + "http": { + "client": { + "maxConnectionsPerServer": 4 + } + } "monitoring": { "metrics": { diff --git a/modules/core/src/main/resources/reference.conf b/modules/core/src/main/resources/reference.conf index 13468414..50b93bbf 100644 --- a/modules/core/src/main/resources/reference.conf +++ b/modules/core/src/main/resources/reference.conf @@ -44,6 +44,10 @@ "legacyColumns": [] "exitOnMissingIgluSchema": true + "http": { + "client": ${snowplow.defaults.http.client} + } + "monitoring": { "metrics": { "statsd": ${snowplow.defaults.statsd} diff --git a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.bigquery/Config.scala b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.bigquery/Config.scala index ab7a30f5..4895ef27 100644 --- a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.bigquery/Config.scala +++ b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.bigquery/Config.scala @@ -18,7 +18,7 @@ import com.comcast.ip4s.Port import scala.concurrent.duration.FiniteDuration import com.snowplowanalytics.iglu.client.resolver.Resolver.ResolverConfig import com.snowplowanalytics.iglu.core.SchemaCriterion -import com.snowplowanalytics.snowplow.runtime.{AcceptedLicense, Metrics => CommonMetrics, Retrying, Telemetry, Webhook} +import com.snowplowanalytics.snowplow.runtime.{AcceptedLicense, HttpClient, Metrics => CommonMetrics, Retrying, Telemetry, Webhook} import com.snowplowanalytics.iglu.core.circe.CirceIgluCodecs.schemaCriterionDecoder import com.snowplowanalytics.snowplow.runtime.HealthProbe.decoders._ @@ -32,7 +32,8 @@ case class Config[+Source, +Sink]( license: AcceptedLicense, skipSchemas: List[SchemaCriterion], legacyColumns: List[SchemaCriterion], - exitOnMissingIgluSchema: Boolean + exitOnMissingIgluSchema: Boolean, + http: Config.Http ) object Config { @@ -91,6 +92,8 @@ object Config { tooManyColumns: TooManyColumnsRetries ) + case class Http(client: HttpClient.Config) + implicit def decoder[Source: Decoder, Sink: Decoder]: Decoder[Config[Source, Sink]] = { implicit val configuration = Configuration.default.withDiscriminator("type") implicit val sinkWithMaxSize = for { @@ -114,6 +117,7 @@ object Config { implicit val alterTableRetries = deriveConfiguredDecoder[AlterTableWaitRetries] implicit val tooManyColsRetries = deriveConfiguredDecoder[TooManyColumnsRetries] implicit val retriesDecoder = deriveConfiguredDecoder[Retries] + implicit val httpDecoder = deriveConfiguredDecoder[Http] // TODO add bigquery docs implicit val licenseDecoder = diff --git a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.bigquery/Environment.scala b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.bigquery/Environment.scala index b3ba9f4a..89654cfa 100644 --- a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.bigquery/Environment.scala +++ b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.bigquery/Environment.scala @@ -10,9 +10,7 @@ package com.snowplowanalytics.snowplow.bigquery import cats.implicits._ import cats.effect.{Async, Resource, Sync} -import cats.effect.unsafe.implicits.global import org.http4s.client.Client -import org.http4s.blaze.client.BlazeClientBuilder import io.sentry.Sentry import retry.RetryPolicy @@ -21,7 +19,7 @@ import com.snowplowanalytics.iglu.core.SchemaCriterion import com.snowplowanalytics.snowplow.sources.SourceAndAck import com.snowplowanalytics.snowplow.sinks.Sink import com.snowplowanalytics.snowplow.bigquery.processing.{BigQueryRetrying, BigQueryUtils, TableManager, Writer} -import com.snowplowanalytics.snowplow.runtime.{AppHealth, AppInfo, HealthProbe, Webhook} +import com.snowplowanalytics.snowplow.runtime.{AppHealth, AppInfo, HealthProbe, HttpClient, Webhook} case class Environment[F[_]]( appInfo: AppInfo, @@ -55,7 +53,7 @@ object Environment { sourceReporter = sourceAndAck.isHealthy(config.main.monitoring.healthProbe.unhealthyLatency).map(_.showIfUnhealthy) appHealth <- Resource.eval(AppHealth.init[F, Alert, RuntimeService](List(sourceReporter))) resolver <- mkResolver[F](config.iglu) - httpClient <- BlazeClientBuilder[F].withExecutionContext(global.compute).resource + httpClient <- HttpClient.resource[F](config.main.http.client) _ <- HealthProbe.resource(config.main.monitoring.healthProbe.port, appHealth) _ <- Webhook.resource(config.main.monitoring.webhook, appInfo, httpClient, appHealth) badSink <- diff --git a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.bigquery/processing/Processing.scala b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.bigquery/processing/Processing.scala index 355308d4..07452e24 100644 --- a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.bigquery/processing/Processing.scala +++ b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.bigquery/processing/Processing.scala @@ -141,7 +141,7 @@ object Processing { } /** Transform the Event into values compatible with the BigQuery sdk */ - private def transform[F[_]: Sync: RegistryLookup]( + private def transform[F[_]: Async: RegistryLookup]( env: Environment[F], badProcessor: BadRowProcessor ): Pipe[F, Batched, BatchAfterTransform] = diff --git a/modules/kafka/src/test/scala/com/snowplowanalytics/snowplow/bigquery/KafkaConfigSpec.scala b/modules/kafka/src/test/scala/com/snowplowanalytics/snowplow/bigquery/KafkaConfigSpec.scala index 449d6b54..a4ee54d5 100644 --- a/modules/kafka/src/test/scala/com/snowplowanalytics/snowplow/bigquery/KafkaConfigSpec.scala +++ b/modules/kafka/src/test/scala/com/snowplowanalytics/snowplow/bigquery/KafkaConfigSpec.scala @@ -17,7 +17,7 @@ import com.comcast.ip4s.Port import com.snowplowanalytics.iglu.core.SchemaCriterion import com.snowplowanalytics.snowplow.bigquery.Config.GcpUserAgent import com.snowplowanalytics.snowplow.runtime.Metrics.StatsdConfig -import com.snowplowanalytics.snowplow.runtime.{AcceptedLicense, ConfigParser, Retrying, Telemetry, Webhook} +import com.snowplowanalytics.snowplow.runtime.{AcceptedLicense, ConfigParser, HttpClient, Retrying, Telemetry, Webhook} import com.snowplowanalytics.snowplow.sinks.kafka.KafkaSinkConfig import com.snowplowanalytics.snowplow.sources.kafka.KafkaSourceConfig import org.http4s.implicits.http4sLiteralsSyntax @@ -126,7 +126,8 @@ object KafkaConfigSpec { license = AcceptedLicense(), skipSchemas = List.empty, legacyColumns = List.empty, - exitOnMissingIgluSchema = true + exitOnMissingIgluSchema = true, + http = Config.Http(HttpClient.Config(4)) ) private val extendedConfig = Config[KafkaSourceConfig, KafkaSinkConfig]( @@ -219,6 +220,7 @@ object KafkaConfigSpec { SchemaCriterion.parse("iglu:com.acme/legacy/jsonschema/1-*-*").get, SchemaCriterion.parse("iglu:com.acme/legacy/jsonschema/2-*-*").get ), - exitOnMissingIgluSchema = true + exitOnMissingIgluSchema = true, + http = Config.Http(HttpClient.Config(4)) ) } diff --git a/modules/kinesis/src/test/scala/com/snowplowanalytics/snowplow/bigquery/KinesisConfigSpec.scala b/modules/kinesis/src/test/scala/com/snowplowanalytics/snowplow/bigquery/KinesisConfigSpec.scala index 20e52909..6eda0025 100644 --- a/modules/kinesis/src/test/scala/com/snowplowanalytics/snowplow/bigquery/KinesisConfigSpec.scala +++ b/modules/kinesis/src/test/scala/com/snowplowanalytics/snowplow/bigquery/KinesisConfigSpec.scala @@ -17,10 +17,9 @@ import com.comcast.ip4s.Port import com.snowplowanalytics.iglu.core.SchemaCriterion import com.snowplowanalytics.snowplow.bigquery.Config.GcpUserAgent import com.snowplowanalytics.snowplow.runtime.Metrics.StatsdConfig -import com.snowplowanalytics.snowplow.runtime.{AcceptedLicense, ConfigParser, Retrying, Telemetry, Webhook} +import com.snowplowanalytics.snowplow.runtime.{AcceptedLicense, ConfigParser, HttpClient, Retrying, Telemetry, Webhook} import com.snowplowanalytics.snowplow.sinks.kinesis.{BackoffPolicy, KinesisSinkConfig} import com.snowplowanalytics.snowplow.sources.kinesis.KinesisSourceConfig -import eu.timepit.refined.types.all.PosInt import org.http4s.implicits.http4sLiteralsSyntax import org.specs2.Specification @@ -67,7 +66,6 @@ object KinesisConfigSpec { workerIdentifier = "test-hostname", initialPosition = KinesisSourceConfig.InitialPosition.Latest, retrievalMode = KinesisSourceConfig.Retrieval.Polling(1000), - bufferSize = PosInt.unsafeFrom(1), customEndpoint = None, dynamodbCustomEndpoint = None, cloudwatchCustomEndpoint = None, @@ -124,7 +122,8 @@ object KinesisConfigSpec { license = AcceptedLicense(), skipSchemas = List.empty, legacyColumns = List.empty, - exitOnMissingIgluSchema = true + exitOnMissingIgluSchema = true, + http = Config.Http(HttpClient.Config(4)) ) // workerIdentifer coming from "HOSTNAME" env variable set in BuildSettings @@ -135,7 +134,6 @@ object KinesisConfigSpec { workerIdentifier = "test-hostname", initialPosition = KinesisSourceConfig.InitialPosition.TrimHorizon, retrievalMode = KinesisSourceConfig.Retrieval.Polling(1000), - bufferSize = PosInt.unsafeFrom(1), customEndpoint = None, dynamodbCustomEndpoint = None, cloudwatchCustomEndpoint = None, @@ -214,6 +212,7 @@ object KinesisConfigSpec { SchemaCriterion.parse("iglu:com.acme/legacy/jsonschema/1-*-*").get, SchemaCriterion.parse("iglu:com.acme/legacy/jsonschema/2-*-*").get ), - exitOnMissingIgluSchema = true + exitOnMissingIgluSchema = true, + http = Config.Http(HttpClient.Config(4)) ) } diff --git a/modules/pubsub/src/test/scala/com/snowplowanalytics/snowplow/bigquery/PubsubConfigSpec.scala b/modules/pubsub/src/test/scala/com/snowplowanalytics/snowplow/bigquery/PubsubConfigSpec.scala index 2bfacfb6..a94b9524 100644 --- a/modules/pubsub/src/test/scala/com/snowplowanalytics/snowplow/bigquery/PubsubConfigSpec.scala +++ b/modules/pubsub/src/test/scala/com/snowplowanalytics/snowplow/bigquery/PubsubConfigSpec.scala @@ -18,7 +18,7 @@ import com.snowplowanalytics.iglu.core.SchemaCriterion import com.snowplowanalytics.snowplow.bigquery.Config.GcpUserAgent import com.snowplowanalytics.snowplow.pubsub.{GcpUserAgent => PubsubUserAgent} import com.snowplowanalytics.snowplow.runtime.Metrics.StatsdConfig -import com.snowplowanalytics.snowplow.runtime.{AcceptedLicense, ConfigParser, Retrying, Telemetry, Webhook} +import com.snowplowanalytics.snowplow.runtime.{AcceptedLicense, ConfigParser, HttpClient, Retrying, Telemetry, Webhook} import com.snowplowanalytics.snowplow.sinks.pubsub.PubsubSinkConfig import com.snowplowanalytics.snowplow.sources.pubsub.PubsubSourceConfig import org.http4s.implicits.http4sLiteralsSyntax @@ -121,7 +121,8 @@ object PubsubConfigSpec { license = AcceptedLicense(), skipSchemas = List.empty, legacyColumns = List.empty, - exitOnMissingIgluSchema = true + exitOnMissingIgluSchema = true, + http = Config.Http(HttpClient.Config(4)) ) private val extendedConfig = Config[PubsubSourceConfig, PubsubSinkConfig]( @@ -207,6 +208,7 @@ object PubsubConfigSpec { SchemaCriterion.parse("iglu:com.acme/legacy/jsonschema/1-*-*").get, SchemaCriterion.parse("iglu:com.acme/legacy/jsonschema/2-*-*").get ), - exitOnMissingIgluSchema = true + exitOnMissingIgluSchema = true, + http = Config.Http(HttpClient.Config(4)) ) } diff --git a/project/Dependencies.scala b/project/Dependencies.scala index 7657bcbe..c0c3cb97 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -29,7 +29,7 @@ object Dependencies { val bigquery = "2.34.2" // Snowplow - val streams = "0.8.0-M2" + val streams = "0.8.0-M4" val igluClient = "3.1.0" // tests