From d54572177623bf2d8f25be5b1b1bc5ff34948046 Mon Sep 17 00:00:00 2001 From: Ian Streeter Date: Tue, 19 Mar 2024 08:20:31 +0000 Subject: [PATCH] Improve timings of overlapping windows (#64) In Lake Loader, there is an edge-case scenario where the time taken to "commit" events to the Lake becomes longer than the duration of the configured timed window. Previously, because of how windows got timed, the edge case scenario caused the window sizes to get smaller and smaller as it waited for the events to be committed. This PR changes how a window length is timed. After this PR, the processing time of a window remains constant, even if the "commit" phase starts to exceed the window size. It allows the loader to stay healthier even during edge-case scenarios. --- .../sources/internal/LowLevelSource.scala | 19 +++++++++---------- 1 file changed, 9 insertions(+), 10 deletions(-) diff --git a/modules/streams-core/src/main/scala/com.snowplowanalytics.snowplow/sources/internal/LowLevelSource.scala b/modules/streams-core/src/main/scala/com.snowplowanalytics.snowplow/sources/internal/LowLevelSource.scala index 1fe9f95..4d1f4be 100644 --- a/modules/streams-core/src/main/scala/com.snowplowanalytics.snowplow/sources/internal/LowLevelSource.scala +++ b/modules/streams-core/src/main/scala/com.snowplowanalytics.snowplow/sources/internal/LowLevelSource.scala @@ -241,21 +241,20 @@ private[sources] object LowLevelSource { case Some(q) => Pull.eval(q.offer(None)) >> Pull.done } case Some((Left(_), next)) => - val openWindow = - Pull.eval(Logger[F].info(s"Opening new window with duration ${config.duration}")) >> next.timeout(config.duration) current match { - case None => openWindow >> go(next, None) - case Some(q) => openWindow >> Pull.eval(q.offer(None)) >> go(next, None) + case None => go(next, None) + case Some(q) => Pull.eval(q.offer(None)) >> go(next, None) } case Some((Right(chunk), next)) => current match { case None => - for { + val pull = for { q <- Pull.eval(Queue.synchronous[F, Option[A]]) _ <- Pull.output1(Stream.fromQueueNoneTerminated(q)) _ <- Pull.eval(chunk.traverse(a => q.offer(Some(a)))) - _ <- go(next, Some(q)) - } yield () + _ <- Pull.eval(Logger[F].info(s"Opening new window with duration ${config.duration}")) >> next.timeout(config.duration) + } yield go(next, Some(q)) + pull.flatten case Some(q) => Pull.eval(chunk.traverse(a => q.offer(Some(a)))) >> go(next, Some(q)) } @@ -265,11 +264,11 @@ private[sources] object LowLevelSource { in.pull .timed { timedPull: Pull.Timed[F, A] => val timeout = timeoutForFirstWindow(config) - for { + val pull = for { _ <- Pull.eval(Logger[F].info(s"Opening first window with randomly adjusted duration of $timeout")) _ <- timedPull.timeout(timeout) - _ <- go(timedPull, None) - } yield () + } yield go(timedPull, None) + pull.flatten } .stream .prefetch // This prefetch is required to pull items into the emitted stream