Eagerness of windows should be configurable
common-streams has a feature in which a timed window of events is allowed to start processing
before the previous window has finalized. This is a great feature for making full use of available
CPU. It means we are always working the CPU hard, even if some slow I/O is required to finalize
the window.

Until now, the eagerness only stretched to consecutive windows. E.g. if window 1 is still
finalizing then window 2 is allowed to start processing, but window 3 is not allowed to start
processing.

For Lake Loader I found it is better to let the eagerness stretch further.  E.g. window 3 is
allowed to start processing even if windows 1 and 2 are both still finalizing.  I also found it is
better to allow consecutive windows to be finalizing at the same time. E.g. window 2 can start its
finalization even if window 1 is still finishing its finalization.

This PR makes it configurable how many windows may start eagerly ahead of a finalizing window.
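
The rule described above can be sketched as a small pure function. This is a simplified, hypothetical model for illustration only: `EagerWindowModel` and `mayStart` are not part of common-streams, and the real implementation relies on stream concurrency rather than an explicit check like this one.

```scala
// Hypothetical model of the eagerness rule; not code from common-streams.
object EagerWindowModel {

  /**
   * @param window          index of the window that wants to start processing
   * @param stillFinalizing indexes of earlier windows that have not yet finished finalizing
   * @param numEagerWindows how many windows may start ahead of a finalizing window
   */
  def mayStart(window: Int, stillFinalizing: Set[Int], numEagerWindows: Int): Boolean =
    // Every unfinished window must be at most `numEagerWindows` positions behind.
    stillFinalizing.forall(earlier => window - earlier <= numEagerWindows)

  def main(args: Array[String]): Unit = {
    // Previous behaviour, equivalent to numEagerWindows = 1.
    println(mayStart(window = 2, stillFinalizing = Set(1), numEagerWindows = 1))    // true
    println(mayStart(window = 3, stillFinalizing = Set(1, 2), numEagerWindows = 1)) // false
    // With numEagerWindows = 2, window 3 may start while windows 1 and 2 finalize.
    println(mayStart(window = 3, stillFinalizing = Set(1, 2), numEagerWindows = 2)) // true
  }
}
```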
istreeter committed May 15, 2024
1 parent 4a154bf commit bacdcd2
Showing 3 changed files with 476 additions and 238 deletions.
@@ -43,15 +43,23 @@ object EventProcessingConfig {
    *   several parallel instances of the app all start at the same time. All instances in the group
    *   should end windows at slightly different times, so that downstream gets a more steady flow of
    *   completed batches.
+   * @param numEagerWindows
+   *   Controls how many windows are allowed to start eagerly ahead of an earlier window that is
+   *   still being finalized. For example, if numEagerWindows=2 then window 42 is allowed to start
+   *   while windows 40 and 41 are still finalizing.
    */
-  case class TimedWindows(duration: FiniteDuration, firstWindowScaling: Double) extends Windowing
+  case class TimedWindows(
+    duration: FiniteDuration,
+    firstWindowScaling: Double,
+    numEagerWindows: Int
+  ) extends Windowing

   object TimedWindows {
-    def build[F[_]: Sync](duration: FiniteDuration): F[TimedWindows] =
+    def build[F[_]: Sync](duration: FiniteDuration, numEagerWindows: Int): F[TimedWindows] =
       for {
         random <- Random.scalaUtilRandom
         factor <- random.nextDouble
-      } yield TimedWindows(duration, factor)
+      } yield TimedWindows(duration, factor, numEagerWindows)
   }

 }
@@ -102,7 +102,7 @@ private[sources] object LowLevelSource {
         tokenedSources
           .zip(sinks)
           .map { case (tokenedSource, sink) => sink(tokenedSource) }
-          .parJoin(2) // so we start processing the next window while the previous window is still finishing up.
+          .parJoin(eagerness(config.windowing)) // so we start processing the next window while the previous window is still finishing up.
       }

     str.flatten
@@ -185,14 +185,13 @@ private[sources] object LowLevelSource {
     checkpointer: Checkpointer[F, C],
     control: EagerWindows.Control[F]
   ): Pipe[F, TokenedEvents, Nothing] =
-    _.append(Stream.eval(control.waitForPreviousWindow).drain)
-      .evalTap { case TokenedEvents(events, _, _) =>
-        Logger[F].debug(s"Batch of ${events.size} events received from the source stream")
-      }
+    _.evalTap { case TokenedEvents(events, _, _) =>
+      Logger[F].debug(s"Batch of ${events.size} events received from the source stream")
+    }
       .through(processor)
       .chunks
-      .evalTap(_ => control.waitForPreviousWindow)
       .prefetch // This prefetch means we can ack messages concurrently with processing the next batch
+      .evalTap(_ => control.waitForPreviousWindow)
       .evalMap { chunk =>
         chunk
           .traverse { token =>
@@ -223,6 +222,12 @@ private[sources] object LowLevelSource {
       case tw: EventProcessingConfig.TimedWindows => timedWindows(tw)
     }

+  private def eagerness(config: EventProcessingConfig.Windowing): Int =
+    config match {
+      case EventProcessingConfig.NoWindowing      => 1
+      case tw: EventProcessingConfig.TimedWindows => tw.numEagerWindows + 1
+    }
+
   /**
    * An fs2 Pipe which converts a stream of `A` into a stream of `Stream[F, A]`. Each stream in the
    * output provides events over a fixed window of time. When the window is over, the inner stream
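
To make the mapping in `eagerness` concrete, here is a minimal, self-contained sketch. The types are stand-ins reduced to what the example needs (the wrapper object `EagernessSketch` is hypothetical), and the mapping is copied from the diff above. It suggests that `numEagerWindows = 1` reproduces the previously hard-coded `parJoin(2)`, assuming callers pass that value.

```scala
import scala.concurrent.duration._

// Stand-in types for illustration; the real ones live in EventProcessingConfig.
object EagernessSketch {
  sealed trait Windowing
  case object NoWindowing extends Windowing
  final case class TimedWindows(
    duration: FiniteDuration,
    firstWindowScaling: Double,
    numEagerWindows: Int
  ) extends Windowing

  // Same mapping as the `eagerness` helper in the diff: the window currently
  // being finalized plus the windows allowed to start eagerly ahead of it.
  def eagerness(config: Windowing): Int =
    config match {
      case NoWindowing      => 1
      case tw: TimedWindows => tw.numEagerWindows + 1
    }

  def main(args: Array[String]): Unit = {
    println(eagerness(NoWindowing))                      // 1
    println(eagerness(TimedWindows(5.minutes, 0.5, 1)))  // 2, same as the old parJoin(2)
    println(eagerness(TimedWindows(5.minutes, 0.5, 2)))  // 3
  }
}
```

The result is the value passed to `parJoin`, so it bounds how many window streams are open at the same time.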