diff --git a/core/src/main/resources/reference.conf b/core/src/main/resources/reference.conf index e4cdd9b7e..34d70f8ba 100644 --- a/core/src/main/resources/reference.conf +++ b/core/src/main/resources/reference.conf @@ -94,6 +94,8 @@ networking { maxConnections = 1024 idleTimeout = 610 seconds + responseHeaderTimeout = 2 seconds + bodyReadTimeout = 500 millis } enableDefaultRedirect = false diff --git a/core/src/main/scala/com.snowplowanalytics.snowplow.collector.core/Config.scala b/core/src/main/scala/com.snowplowanalytics.snowplow.collector.core/Config.scala index e087e05f9..e09577b8c 100644 --- a/core/src/main/scala/com.snowplowanalytics.snowplow.collector.core/Config.scala +++ b/core/src/main/scala/com.snowplowanalytics.snowplow.collector.core/Config.scala @@ -158,7 +158,9 @@ object Config { case class Networking( maxConnections: Int, - idleTimeout: FiniteDuration + idleTimeout: FiniteDuration, + responseHeaderTimeout: FiniteDuration, + bodyReadTimeout: FiniteDuration ) case class License( diff --git a/core/src/main/scala/com.snowplowanalytics.snowplow.collector.core/HttpServer.scala b/core/src/main/scala/com.snowplowanalytics.snowplow.collector.core/HttpServer.scala index 4a2e46db7..83b154247 100644 --- a/core/src/main/scala/com.snowplowanalytics.snowplow.collector.core/HttpServer.scala +++ b/core/src/main/scala/com.snowplowanalytics.snowplow.collector.core/HttpServer.scala @@ -80,6 +80,7 @@ object HttpServer { .withHttpApp(hstsMiddleware(hsts, routes.orNotFound)) .withIdleTimeout(networking.idleTimeout) .withMaxConnections(networking.maxConnections) + .withResponseHeaderTimeout(networking.responseHeaderTimeout) .cond(secure, _.withSslContext(SSLContext.getDefault)) .resource diff --git a/core/src/main/scala/com.snowplowanalytics.snowplow.collector.core/Routes.scala b/core/src/main/scala/com.snowplowanalytics.snowplow.collector.core/Routes.scala index 1cb21b0e0..59cec534b 100644 --- a/core/src/main/scala/com.snowplowanalytics.snowplow.collector.core/Routes.scala +++ b/core/src/main/scala/com.snowplowanalytics.snowplow.collector.core/Routes.scala @@ -10,17 +10,19 @@ */ package com.snowplowanalytics.snowplow.collector.core +import scala.concurrent.duration.FiniteDuration import cats.implicits._ -import cats.effect.Sync +import cats.effect.{Async, Sync} import org.http4s._ import org.http4s.dsl.Http4sDsl import org.http4s.implicits._ import com.comcast.ip4s.Dns -class Routes[F[_]: Sync]( +class Routes[F[_]: Async]( enableDefaultRedirect: Boolean, enableRootResponse: Boolean, enableCrossdomainTracking: Boolean, + bodyReadTimeout: FiniteDuration, service: IService[F] ) extends Http4sDsl[F] { @@ -49,7 +51,7 @@ class Routes[F[_]: Sync]( case req @ POST -> Root / vendor / version => val path = service.determinePath(vendor, version) service.cookie( - body = req.bodyText.compile.string.map(Some(_)), + body = req.bodyText.through(Streams.timeoutOnIdle(bodyReadTimeout)).compile.string.map(Some(_)), path = path, request = req, pixelExpected = false, diff --git a/core/src/main/scala/com.snowplowanalytics.snowplow.collector.core/Run.scala b/core/src/main/scala/com.snowplowanalytics.snowplow.collector.core/Run.scala index 7fbbb7dab..24765102f 100644 --- a/core/src/main/scala/com.snowplowanalytics.snowplow.collector.core/Run.scala +++ b/core/src/main/scala/com.snowplowanalytics.snowplow.collector.core/Run.scala @@ -98,6 +98,7 @@ object Run { config.enableDefaultRedirect, config.rootResponse.enabled, config.crossDomain.enabled, + config.networking.responseHeaderTimeout, collectorService ).value, if (config.ssl.enable) config.ssl.port else config.port, diff --git a/core/src/main/scala/com.snowplowanalytics.snowplow.collector.core/Streams.scala b/core/src/main/scala/com.snowplowanalytics.snowplow.collector.core/Streams.scala new file mode 100644 index 000000000..43f871396 --- /dev/null +++ b/core/src/main/scala/com.snowplowanalytics.snowplow.collector.core/Streams.scala @@ -0,0 +1,29 @@ +/** + * Copyright (c) 2013-present Snowplow Analytics Ltd. + * All rights reserved. + * + * This software is made available by Snowplow Analytics, Ltd., + * under the terms of the Snowplow Limited Use License Agreement, Version 1.0 + * located at https://docs.snowplow.io/limited-use-license-1.0 + * BY INSTALLING, DOWNLOADING, ACCESSING, USING OR DISTRIBUTING ANY PORTION + * OF THE SOFTWARE, YOU AGREE TO THE TERMS OF SUCH LICENSE AGREEMENT. + */ +package com.snowplowanalytics.snowplow.collector.core + +import scala.concurrent.duration.FiniteDuration +import cats.effect.Async +import fs2.{Pipe, Pull} + +object Streams { + def timeoutOnIdle[F[_]: Async, A](duration: FiniteDuration): Pipe[F, A, A] = + _.pull.timed { timedPull => + def go(timedPull: Pull.Timed[F, A]): Pull[F, A, Unit] = + timedPull.timeout(duration) >> + timedPull.uncons.flatMap { + case Some((Right(elems), next)) => Pull.output(elems) >> go(next) + case _ => Pull.done + } + + go(timedPull) + }.stream +} diff --git a/core/src/test/scala/com.snowplowanalytics.snowplow.collector.core/RoutesSpec.scala b/core/src/test/scala/com.snowplowanalytics.snowplow.collector.core/RoutesSpec.scala index d75249b8b..2df2627f8 100644 --- a/core/src/test/scala/com.snowplowanalytics.snowplow.collector.core/RoutesSpec.scala +++ b/core/src/test/scala/com.snowplowanalytics.snowplow.collector.core/RoutesSpec.scala @@ -73,7 +73,9 @@ class RoutesSpec extends Specification { ) = { val service = new TestService() val routes = - new Routes(enabledDefaultRedirect, enableRootResponse, enableCrossdomainTracking, service).value.orNotFound + new Routes(enabledDefaultRedirect, enableRootResponse, enableCrossdomainTracking, 500.millis, service) + .value + .orNotFound val routesWithHsts = HttpServer.hstsMiddleware(Config.HSTS(enableHsts, 180.days), routes) (service, routesWithHsts) } diff --git a/core/src/test/scala/com.snowplowanalytics.snowplow.collector.core/StreamsSpec.scala b/core/src/test/scala/com.snowplowanalytics.snowplow.collector.core/StreamsSpec.scala new file mode 100644 index 000000000..fa734babf --- /dev/null +++ b/core/src/test/scala/com.snowplowanalytics.snowplow.collector.core/StreamsSpec.scala @@ -0,0 +1,22 @@ +package com.snowplowanalytics.snowplow.collector.core + +import scala.concurrent.duration._ +import org.specs2.mutable.Specification +import cats.effect.IO +import cats.effect.unsafe.implicits.global +import fs2.Stream + +class StreamsSpec extends Specification { + + "Streams" should { + "allow terminating a stream early when idle" in { + Stream + .emits[IO, Int](Vector(1, 2, 3)) + .onComplete(Stream.empty[IO].delayBy(20.seconds)) + .through(Streams.timeoutOnIdle(100.millis)) + .compile + .count + .unsafeRunSync() must beEqualTo(3) + } + } +} diff --git a/core/src/test/scala/com.snowplowanalytics.snowplow.collector.core/TestUtils.scala b/core/src/test/scala/com.snowplowanalytics.snowplow.collector.core/TestUtils.scala index e880d00c9..92fd420f2 100644 --- a/core/src/test/scala/com.snowplowanalytics.snowplow.collector.core/TestUtils.scala +++ b/core/src/test/scala/com.snowplowanalytics.snowplow.collector.core/TestUtils.scala @@ -119,7 +119,9 @@ object TestUtils { ), networking = Networking( 1024, - 610.seconds + 610.seconds, + 2.second, + 500.millis ), enableDefaultRedirect = false, redirectDomains = Set.empty[String], diff --git a/kafka/src/test/scala/com.snowplowanalytics.snowplow.collectors.scalastream/KafkaConfigSpec.scala b/kafka/src/test/scala/com.snowplowanalytics.snowplow.collectors.scalastream/KafkaConfigSpec.scala index 74d92d742..4ae717e90 100644 --- a/kafka/src/test/scala/com.snowplowanalytics.snowplow.collectors.scalastream/KafkaConfigSpec.scala +++ b/kafka/src/test/scala/com.snowplowanalytics.snowplow.collectors.scalastream/KafkaConfigSpec.scala @@ -169,8 +169,10 @@ object KafkaConfigSpec { autoGeneratedId = None ), networking = Config.Networking( - maxConnections = 1024, - idleTimeout = 610.seconds + maxConnections = 1024, + idleTimeout = 610.seconds, + responseHeaderTimeout = 2.seconds, + bodyReadTimeout = 500.millis ), license = Config.License(accept = true) ) diff --git a/kinesis/src/test/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/KinesisConfigSpec.scala b/kinesis/src/test/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/KinesisConfigSpec.scala index b59a6a532..3f0c57ed7 100644 --- a/kinesis/src/test/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/KinesisConfigSpec.scala +++ b/kinesis/src/test/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/KinesisConfigSpec.scala @@ -123,8 +123,10 @@ object KinesisConfigSpec { redirectDomains = Set.empty, preTerminationPeriod = 10.seconds, networking = Config.Networking( - maxConnections = 1024, - idleTimeout = 610.seconds + maxConnections = 1024, + idleTimeout = 610.seconds, + responseHeaderTimeout = 2.seconds, + bodyReadTimeout = 500.millis ), streams = Config.Streams( useIpAddressAsPartitionKey = false, diff --git a/nsq/src/test/scala/com.snowplowanalytics.snowplow.collectors.scalastream/NsqConfigSpec.scala b/nsq/src/test/scala/com.snowplowanalytics.snowplow.collectors.scalastream/NsqConfigSpec.scala index dcc615105..ce15c7afe 100644 --- a/nsq/src/test/scala/com.snowplowanalytics.snowplow.collectors.scalastream/NsqConfigSpec.scala +++ b/nsq/src/test/scala/com.snowplowanalytics.snowplow.collectors.scalastream/NsqConfigSpec.scala @@ -156,8 +156,10 @@ object NsqConfigSpec { autoGeneratedId = None ), networking = Config.Networking( - maxConnections = 1024, - idleTimeout = 610.seconds + maxConnections = 1024, + idleTimeout = 610.seconds, + responseHeaderTimeout = 2.seconds, + bodyReadTimeout = 500.millis ), license = Config.License(accept = true) ) diff --git a/pubsub/src/test/scala/com.snowplowanalytics.snowplow.collectors.scalastream/ConfigSpec.scala b/pubsub/src/test/scala/com.snowplowanalytics.snowplow.collectors.scalastream/ConfigSpec.scala index 4431ec1bb..6df175fde 100644 --- a/pubsub/src/test/scala/com.snowplowanalytics.snowplow.collectors.scalastream/ConfigSpec.scala +++ b/pubsub/src/test/scala/com.snowplowanalytics.snowplow.collectors.scalastream/ConfigSpec.scala @@ -112,8 +112,10 @@ object ConfigSpec { redirectDomains = Set.empty, preTerminationPeriod = 10.seconds, networking = Config.Networking( - maxConnections = 1024, - idleTimeout = 610.seconds + maxConnections = 1024, + idleTimeout = 610.seconds, + responseHeaderTimeout = 2.seconds, + bodyReadTimeout = 500.millis ), streams = Config.Streams( useIpAddressAsPartitionKey = false, diff --git a/sqs/src/test/scala/com.snowplowanalytics.snowplow.collectors.scalastream/SqsConfigSpec.scala b/sqs/src/test/scala/com.snowplowanalytics.snowplow.collectors.scalastream/SqsConfigSpec.scala index 6f6cc5e2d..fdd0ade6c 100644 --- a/sqs/src/test/scala/com.snowplowanalytics.snowplow.collectors.scalastream/SqsConfigSpec.scala +++ b/sqs/src/test/scala/com.snowplowanalytics.snowplow.collectors.scalastream/SqsConfigSpec.scala @@ -113,8 +113,10 @@ object SqsConfigSpec { redirectDomains = Set.empty, preTerminationPeriod = 10.seconds, networking = Config.Networking( - maxConnections = 1024, - idleTimeout = 610.seconds + maxConnections = 1024, + idleTimeout = 610.seconds, + responseHeaderTimeout = 2.seconds, + bodyReadTimeout = 500.millis ), streams = Config.Streams( useIpAddressAsPartitionKey = false,