diff --git a/modules/runtime-common/src/main/resources/reference.conf b/modules/runtime-common/src/main/resources/reference.conf index e218bcc..6e1713e 100644 --- a/modules/runtime-common/src/main/resources/reference.conf +++ b/modules/runtime-common/src/main/resources/reference.conf @@ -1,4 +1,10 @@ snowplow.defaults: { + http: { + client: { + maxConnectionsPerServer: 4 + } + } + statsd: { port: 8125 tags: {} diff --git a/modules/runtime-common/src/main/scala/com/snowplowanalytics/snowplow/runtime/HttpClient.scala b/modules/runtime-common/src/main/scala/com/snowplowanalytics/snowplow/runtime/HttpClient.scala new file mode 100644 index 0000000..51745e0 --- /dev/null +++ b/modules/runtime-common/src/main/scala/com/snowplowanalytics/snowplow/runtime/HttpClient.scala @@ -0,0 +1,41 @@ +/* + * Copyright (c) 2023-present Snowplow Analytics Ltd. All rights reserved. + * + * This program is licensed to you under the Snowplow Community License Version 1.0, + * and you may not use this file except in compliance with the Snowplow Community License Version 1.0. + * You may obtain a copy of the Snowplow Community License Version 1.0 at https://docs.snowplow.io/community-license-1.0 + */ +package com.snowplowanalytics.snowplow.runtime + +import cats.effect.{Async, Resource} +import org.http4s.blaze.client.BlazeClientBuilder +import org.http4s.client.Client +import io.circe._ +import io.circe.generic.semiauto._ + +object HttpClient { + case class Config( + maxConnectionsPerServer: Int + ) + + object Config { + implicit def telemetryConfigDecoder: Decoder[Config] = deriveDecoder + } + + /** + * Provides a http4s Client configured appropriately for a snowplow common-streams app + * + * Blaze option `maxConnectionsPerRequestKey` is set to something small. This is required so we + * can traverse over many schemas in parallel without overwhelming any single Iglu server. + * + * Blaze options `maxTotalConnections` and `maxWaitQueueLimit` are unlimited. This is appropriate + * because common-streams apps are already naturally rate limited by the flow of events. We do not + * want exceptions for exceeding number of requests allowed in a queue. + */ + def resource[F[_]: Async](config: Config): Resource[F, Client[F]] = + BlazeClientBuilder[F] + .withMaxConnectionsPerRequestKey(Function.const(config.maxConnectionsPerServer)) + .withMaxTotalConnections(Int.MaxValue) + .withMaxWaitQueueLimit(Int.MaxValue) + .resource +} diff --git a/project/Dependencies.scala b/project/Dependencies.scala index f9233a9..69bf274 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -16,7 +16,8 @@ object Dependencies { val catsRetry = "3.1.3" val fs2 = "3.10.0" val log4cats = "2.6.0" - val http4s = "0.23.26" + val http4s = "0.23.28" + val blaze = "0.23.16" val decline = "2.4.1" val circe = "0.14.6" val circeExtra = "0.14.3" @@ -59,6 +60,7 @@ object Dependencies { val catsRetry = "com.github.cb372" %% "cats-retry" % V.catsRetry val emberServer = "org.http4s" %% "http4s-ember-server" % V.http4s val http4sCirce = "org.http4s" %% "http4s-circe" % V.http4s + val blazeClient = "org.http4s" %% "http4s-blaze-client" % V.blaze val decline = "com.monovore" %% "decline-effect" % V.decline val circeConfig = "io.circe" %% "circe-config" % V.circeConfig val circeGeneric = "io.circe" %% "circe-generic" % V.circe @@ -164,6 +166,7 @@ object Dependencies { ) val runtimeCommonDependencies = Seq( + blazeClient, cats, catsEffectKernel, catsRetry,