diff --git a/modules/runtime-common/src/main/resources/reference.conf b/modules/runtime-common/src/main/resources/reference.conf index e218bcc0..6e1713e6 100644 --- a/modules/runtime-common/src/main/resources/reference.conf +++ b/modules/runtime-common/src/main/resources/reference.conf @@ -1,4 +1,10 @@ snowplow.defaults: { + http: { + client: { + maxConnectionsPerServer: 4 + } + } + statsd: { port: 8125 tags: {} diff --git a/modules/runtime-common/src/main/scala/com/snowplowanalytics/snowplow/runtime/HttpClient.scala b/modules/runtime-common/src/main/scala/com/snowplowanalytics/snowplow/runtime/HttpClient.scala new file mode 100644 index 00000000..51745e09 --- /dev/null +++ b/modules/runtime-common/src/main/scala/com/snowplowanalytics/snowplow/runtime/HttpClient.scala @@ -0,0 +1,41 @@ +/* + * Copyright (c) 2023-present Snowplow Analytics Ltd. All rights reserved. + * + * This program is licensed to you under the Snowplow Community License Version 1.0, + * and you may not use this file except in compliance with the Snowplow Community License Version 1.0. + * You may obtain a copy of the Snowplow Community License Version 1.0 at https://docs.snowplow.io/community-license-1.0 + */ +package com.snowplowanalytics.snowplow.runtime + +import cats.effect.{Async, Resource} +import org.http4s.blaze.client.BlazeClientBuilder +import org.http4s.client.Client +import io.circe._ +import io.circe.generic.semiauto._ + +object HttpClient { + case class Config( + maxConnectionsPerServer: Int + ) + + object Config { + implicit def telemetryConfigDecoder: Decoder[Config] = deriveDecoder + } + + /** + * Provides a http4s Client configured appropriately for a snowplow common-streams app + * + * Blaze option `maxConnectionsPerRequestKey` is set to something small. This is required so we + * can traverse over many schemas in parallel without overwhelming any single Iglu server. + * + * Blaze options `maxTotalConnections` and `maxWaitQueueLimit` are unlimited. This is appropriate + * because common-streams apps are already naturally rate limited by the flow of events. We do not + * want exceptions for exceeding number of requests allowed in a queue. + */ + def resource[F[_]: Async](config: Config): Resource[F, Client[F]] = + BlazeClientBuilder[F] + .withMaxConnectionsPerRequestKey(Function.const(config.maxConnectionsPerServer)) + .withMaxTotalConnections(Int.MaxValue) + .withMaxWaitQueueLimit(Int.MaxValue) + .resource +} diff --git a/modules/runtime-common/src/main/scala/com/snowplowanalytics/snowplow/runtime/Telemetry.scala b/modules/runtime-common/src/main/scala/com/snowplowanalytics/snowplow/runtime/Telemetry.scala index 807bee92..fd3ba850 100644 --- a/modules/runtime-common/src/main/scala/com/snowplowanalytics/snowplow/runtime/Telemetry.scala +++ b/modules/runtime-common/src/main/scala/com/snowplowanalytics/snowplow/runtime/Telemetry.scala @@ -19,7 +19,7 @@ import io.circe._ import io.circe.syntax._ import io.circe.config.syntax._ import io.circe.generic.semiauto._ -import org.http4s.client.{Client => HttpClient} +import org.http4s.client.{Client => Http4sClient} import org.typelevel.log4cats.Logger import org.typelevel.log4cats.slf4j.Slf4jLogger @@ -52,7 +52,7 @@ object Telemetry { def stream[F[_]: Async]( config: Config, appInfo: AppInfo, - httpClient: HttpClient[F] + httpClient: Http4sClient[F] ): Stream[F, Nothing] = if (config.disable) Stream.never @@ -72,7 +72,7 @@ object Telemetry { private def initTracker[F[_]: Async]( config: Config, appName: String, - client: HttpClient[F] + client: Http4sClient[F] ): Resource[F, Tracker[F]] = for { implicit0(random: Random[F]) <- Resource.eval(Random.scalaUtilRandom) diff --git a/modules/runtime-common/src/test/scala/com/snowplowanalytics/snowplow/runtime/ConfigParserSpec.scala b/modules/runtime-common/src/test/scala/com/snowplowanalytics/snowplow/runtime/ConfigParserSpec.scala index 3840aadf..c63ffb2e 100644 --- a/modules/runtime-common/src/test/scala/com/snowplowanalytics/snowplow/runtime/ConfigParserSpec.scala +++ b/modules/runtime-common/src/test/scala/com/snowplowanalytics/snowplow/runtime/ConfigParserSpec.scala @@ -154,7 +154,7 @@ class ConfigParserSpec extends Specification with CatsEffect { } def configMissingField = { - val expected = "Cannot resolve config: DecodingFailure at .field41: Missing required field" + val expected = "Cannot resolve config: DecodingFailure at .field4.field41: Missing required field" val path = Paths.get("src/test/resources/config_parser_test/config_missing_field.hocon") ConfigParser .configFromFile[IO, TestConfig](path) diff --git a/modules/runtime-common/src/test/scala/com/snowplowanalytics/snowplow/runtime/MetricsConfigSpec.scala b/modules/runtime-common/src/test/scala/com/snowplowanalytics/snowplow/runtime/MetricsConfigSpec.scala index ed93bc1f..6ab23d9c 100644 --- a/modules/runtime-common/src/test/scala/com/snowplowanalytics/snowplow/runtime/MetricsConfigSpec.scala +++ b/modules/runtime-common/src/test/scala/com/snowplowanalytics/snowplow/runtime/MetricsConfigSpec.scala @@ -132,7 +132,7 @@ class MetricsConfigSpec extends Specification { val result = ConfigFactory.load(ConfigFactory.parseString(input)) result.as[StatsdWrapper] must beLeft.like { case e: DecodingFailure => - e.show must beEqualTo("DecodingFailure at .prefix: Missing required field") + e.show must beEqualTo("DecodingFailure at .xyz.prefix: Missing required field") } } diff --git a/project/Dependencies.scala b/project/Dependencies.scala index f9233a97..57eb8182 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -16,10 +16,11 @@ object Dependencies { val catsRetry = "3.1.3" val fs2 = "3.10.0" val log4cats = "2.6.0" - val http4s = "0.23.26" + val http4s = "0.23.28" + val blaze = "0.23.16" val decline = "2.4.1" - val circe = "0.14.6" - val circeExtra = "0.14.3" + val circe = "0.14.8" + val circeExtra = "0.14.4" val circeConfig = "0.10.1" val betterMonadicFor = "0.3.1" val kindProjector = "0.13.2" @@ -59,6 +60,7 @@ object Dependencies { val catsRetry = "com.github.cb372" %% "cats-retry" % V.catsRetry val emberServer = "org.http4s" %% "http4s-ember-server" % V.http4s val http4sCirce = "org.http4s" %% "http4s-circe" % V.http4s + val blazeClient = "org.http4s" %% "http4s-blaze-client" % V.blaze val decline = "com.monovore" %% "decline-effect" % V.decline val circeConfig = "io.circe" %% "circe-config" % V.circeConfig val circeGeneric = "io.circe" %% "circe-generic" % V.circe @@ -164,6 +166,7 @@ object Dependencies { ) val runtimeCommonDependencies = Seq( + blazeClient, cats, catsEffectKernel, catsRetry,