Skip to content

Commit

Permalink
Fix randomly adjusted first window size (#79)
Browse files Browse the repository at this point in the history
common-streams has a feature where the first window is randomly adjusted
to a different size. This helps in a deployment where a large number of
pods might all start at the same time, but we pods to have mutually
staggered windows, e.g. to avoid write conflicts in the lake loader.

Unfortunately I broke the feature in #64 so this fixes it again.

I also add a test so it won't get broken again.
  • Loading branch information
istreeter authored May 20, 2024
1 parent bacdcd2 commit 13a7d10
Show file tree
Hide file tree
Showing 2 changed files with 55 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -272,7 +272,9 @@ private[sources] object LowLevelSource {
val pull = for {
_ <- Pull.eval(Logger[F].info(s"Opening first window with randomly adjusted duration of $timeout"))
_ <- timedPull.timeout(timeout)
} yield go(timedPull, None)
q <- Pull.eval(Queue.synchronous[F, Option[A]])
_ <- Pull.output1(Stream.fromQueueNoneTerminated(q))
} yield go(timedPull, Some(q))
pull.flatten
}
.stream
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ class LowLevelSourceSpec extends Specification with CatsEffect {
cleanly checkpoint pending window when a stream is interrupted $windowed2
not checkpoint events if the event processor throws an exception $windowed3
eagerly start windows when previous window is still finalizing $windowed4
use a short first window according to the configuration $windowed5

When reporting healthy status
report healthy when there are no events $health1
Expand Down Expand Up @@ -439,6 +440,57 @@ class LowLevelSourceSpec extends Specification with CatsEffect {
TestControl.executeEmbed(io)
}

def windowed5 = {

val config = EventProcessingConfig(
EventProcessingConfig.TimedWindows(
duration = 60.seconds,
firstWindowScaling = 0.25, // so first window is 15 seconds
numEagerWindows = 2
)
)

val testConfig = TestSourceConfig(
batchesPerRebalance = Int.MaxValue,
eventsPerBatch = 2,
timeBetweenBatches = 11.seconds,
timeToProcessBatch = 1.second
)

val io = for {
refActions <- Ref[IO].of(Vector.empty[Action])
sourceAndAck <- LowLevelSource.toSourceAndAck(testLowLevelSource(refActions, testConfig))
processor = windowedProcessor(refActions, testConfig)
fiber <- sourceAndAck.stream(config, processor).compile.drain.start
_ <- IO.sleep(91.seconds)
_ <- fiber.cancel
result <- refActions.get
} yield result must beEqualTo(
Vector(
Action.ProcessorStartedWindow("1970-01-01T00:00:00Z"),
Action.ProcessorReceivedEvents("1970-01-01T00:00:00Z", List("1", "2")),
Action.ProcessorReceivedEvents("1970-01-01T00:00:11Z", List("3", "4")),
Action.ProcessorReachedEndOfWindow("1970-01-01T00:00:15Z"),
Action.Checkpointed(List("1", "2", "3", "4")),
Action.ProcessorStartedWindow("1970-01-01T00:00:22Z"),
Action.ProcessorReceivedEvents("1970-01-01T00:00:22Z", List("5", "6")),
Action.ProcessorReceivedEvents("1970-01-01T00:00:33Z", List("7", "8")),
Action.ProcessorReceivedEvents("1970-01-01T00:00:44Z", List("9", "10")),
Action.ProcessorReceivedEvents("1970-01-01T00:00:55Z", List("11", "12")),
Action.ProcessorReceivedEvents("1970-01-01T00:01:06Z", List("13", "14")),
Action.ProcessorReceivedEvents("1970-01-01T00:01:17Z", List("15", "16")),
Action.ProcessorReachedEndOfWindow("1970-01-01T00:01:22Z"),
Action.Checkpointed(List("5", "6", "7", "8", "9", "10", "11", "12", "13", "14", "15", "16")),
Action.ProcessorStartedWindow("1970-01-01T00:01:28Z"),
Action.ProcessorReceivedEvents("1970-01-01T00:01:28Z", List("17", "18")),
Action.ProcessorReachedEndOfWindow("1970-01-01T00:01:31Z"),
Action.Checkpointed(List("17", "18"))
)
)

TestControl.executeEmbed(io)
}

/** Specs for health check */

def health1 = {
Expand Down

0 comments on commit 13a7d10

Please sign in to comment.