From 98a9d8d27e96eb90938f51133c0bedbce2611e1c Mon Sep 17 00:00:00 2001 From: Ian Streeter Date: Fri, 3 Jan 2025 09:25:38 +0000 Subject: [PATCH] Sources should report stream latency of stuck events Currently in common-streams apps, the _application_ (i.e. not this library) is responsible for tracking latency of events consumed from the stream. Because of the way it is implemented in the apps, if an event gets stuck (e.g. cannot be written to destination) then the latency drops to zero for the period when the app is retrying the stuck event. This commit aims to fix the problem where the `latency_millis` metric wrongly drops to zero. 1. The source itself now tracks the latency of the event which is currently being processed by the downstream application. 2. The Metrics class allows metric starting values to be wrapped in and Effect, i.e. `F[Metrics.State]`. This means the application can pass in the latency reported by the Source. --- .../snowplow/runtime/MetricsSpec.scala | 2 +- .../snowplow/runtime/Metrics.scala | 5 +- .../sources/SourceAndAck.scala | 17 ++++ .../sources/internal/LowLevelSource.scala | 43 +++++++-- .../sources/internal/LowLevelSourceSpec.scala | 93 ++++++++++++++++++- 5 files changed, 144 insertions(+), 16 deletions(-) diff --git a/modules/it/src/test/scala/com/snowplowanalytics/snowplow/runtime/MetricsSpec.scala b/modules/it/src/test/scala/com/snowplowanalytics/snowplow/runtime/MetricsSpec.scala index 17e6e90d..89588b98 100644 --- a/modules/it/src/test/scala/com/snowplowanalytics/snowplow/runtime/MetricsSpec.scala +++ b/modules/it/src/test/scala/com/snowplowanalytics/snowplow/runtime/MetricsSpec.scala @@ -76,7 +76,7 @@ object TestMetrics { ref: Ref[IO, TestState], emptyState: TestState, config: Option[Metrics.StatsdConfig] - ) extends Metrics[IO, TestState](ref, emptyState, config) { + ) extends Metrics[IO, TestState](ref, IO.pure(emptyState), config) { def count(c: Int) = ref.update(s => s.copy(counter = s.counter + c)) def time(t: FiniteDuration) = ref.update(s => s.copy(timer = s.timer + t)) } diff --git a/modules/runtime-common/src/main/scala/com/snowplowanalytics/snowplow/runtime/Metrics.scala b/modules/runtime-common/src/main/scala/com/snowplowanalytics/snowplow/runtime/Metrics.scala index ea88db9c..90951e62 100644 --- a/modules/runtime-common/src/main/scala/com/snowplowanalytics/snowplow/runtime/Metrics.scala +++ b/modules/runtime-common/src/main/scala/com/snowplowanalytics/snowplow/runtime/Metrics.scala @@ -24,13 +24,14 @@ import java.nio.charset.StandardCharsets.UTF_8 abstract class Metrics[F[_]: Async, S <: Metrics.State]( ref: Ref[F, S], - emptyState: S, + initState: F[S], config: Option[Metrics.StatsdConfig] ) { def report: Stream[F, Nothing] = Stream.resource(Metrics.makeReporters[F](config)).flatMap { reporters => def report = for { - state <- ref.getAndSet(emptyState) + nextState <- initState + state <- ref.getAndSet(nextState) kv = state.toKVMetrics _ <- reporters.traverse(_.report(kv)) } yield () diff --git a/modules/streams-core/src/main/scala/com.snowplowanalytics.snowplow/sources/SourceAndAck.scala b/modules/streams-core/src/main/scala/com.snowplowanalytics.snowplow/sources/SourceAndAck.scala index 384daf76..7bef1633 100644 --- a/modules/streams-core/src/main/scala/com.snowplowanalytics.snowplow/sources/SourceAndAck.scala +++ b/modules/streams-core/src/main/scala/com.snowplowanalytics.snowplow/sources/SourceAndAck.scala @@ -10,6 +10,7 @@ package com.snowplowanalytics.snowplow.sources import cats.Show import cats.implicits._ import fs2.Stream + import scala.concurrent.duration.FiniteDuration /** @@ -48,6 +49,22 @@ trait SourceAndAck[F[_]] { * healthy. If any event is "stuck" then latency is high and the probe should report unhealthy. */ def isHealthy(maxAllowedProcessingLatency: FiniteDuration): F[SourceAndAck.HealthStatus] + + /** + * Latency of the message that is currently being processed by the downstream application + * + * The returned value is `None` if this `SourceAndAck` is currently awaiting messages from + * upstream, e.g. it is doing a remote fetch. + * + * The returned value is `Some` if this `SourceAndAck` has emitted a message downstream to the + * application, and it is waiting for the downstream app to "pull" the next message from this + * `SourceAndAck`. + * + * This value should be used as the initial latency at the start of a statsd metrics reporting + * period. This ensures the app reports non-zero latency even when the app is stuck (e.g. cannot + * load events to destination). + */ + def currentStreamLatency: F[Option[FiniteDuration]] } object SourceAndAck { diff --git a/modules/streams-core/src/main/scala/com.snowplowanalytics.snowplow/sources/internal/LowLevelSource.scala b/modules/streams-core/src/main/scala/com.snowplowanalytics.snowplow/sources/internal/LowLevelSource.scala index 3ac01bb6..3373e73d 100644 --- a/modules/streams-core/src/main/scala/com.snowplowanalytics.snowplow/sources/internal/LowLevelSource.scala +++ b/modules/streams-core/src/main/scala/com.snowplowanalytics.snowplow/sources/internal/LowLevelSource.scala @@ -18,6 +18,7 @@ import org.typelevel.log4cats.{Logger, SelfAwareStructuredLogger} import org.typelevel.log4cats.slf4j.Slf4jLogger import scala.concurrent.duration.{DurationLong, FiniteDuration} +import java.time.Instant import com.snowplowanalytics.snowplow.sources.{EventProcessingConfig, EventProcessor, SourceAndAck, TokenedEvents} /** @@ -67,14 +68,29 @@ private[sources] object LowLevelSource { */ private type State[C] = Map[Unique.Token, C] + /** + * Timestamps of the instantaneous state of the Source + * + * @param wallTime + * The real time (since epoch) that this Source emitted a batch to the downstream app for + * processing. + * @param streamTime + * For the last batch to be emitted downstream, this is the earliest timestamp according to the + * source, i.e. time the event was written to the source stream. It is None if this stream type + * does not record timestamps, e.g. Kafka under some circumstances. + * + * Note that both values represent instants of time. They have different types (FiniteDuration and + * Instant) but that is just for convenience to match the incoming data. + */ + private case class LastEmittedTstamps(wallTime: FiniteDuration, streamTime: Option[Instant]) + /** * Mutable state used for measuring the event-processing latency from this source * - * For the batch that was last emitted downstream for processing, the `Option[FiniteDuration]` - * represents the real time (since epoch) that the batch was emitted. If it is None then there is - * no batch currently being processed. + * Holds a `LastEmittedTstamps` for the batch that was last emitted downstream for processing. It + * is None if there is no batch currently being processed downstream. */ - private type LatencyRef[F[_]] = Ref[F, Option[FiniteDuration]] + private type LatencyRef[F[_]] = Ref[F, Option[LastEmittedTstamps]] /** * Lifts the internal [[LowLevelSource]] into a [[SourceAndAck]], which is the public API of this @@ -82,7 +98,7 @@ private[sources] object LowLevelSource { */ def toSourceAndAck[F[_]: Async, C](source: LowLevelSource[F, C]): F[SourceAndAck[F]] = for { - latencyRef <- Ref[F].of(Option.empty[FiniteDuration]) + latencyRef <- Ref[F].of(Option.empty[LastEmittedTstamps]) isConnectedRef <- Ref[F].of(false) } yield sourceAndAckImpl(source, latencyRef, isConnectedRef) @@ -119,12 +135,21 @@ private[sources] object LowLevelSource { (isConnectedRef.get, latencyRef.get, source.lastLiveness, Sync[F].realTime).mapN { case (false, _, _, _) => SourceAndAck.Disconnected - case (_, Some(lastPullTime), _, now) if now - lastPullTime > maxAllowedProcessingLatency => + case (_, Some(LastEmittedTstamps(lastPullTime, _)), _, now) if now - lastPullTime > maxAllowedProcessingLatency => SourceAndAck.LaggingEventProcessor(now - lastPullTime) case (_, _, lastLiveness, now) if now - lastLiveness > maxAllowedProcessingLatency => SourceAndAck.InactiveSource(now - lastLiveness) case _ => SourceAndAck.Healthy } + + def currentStreamLatency: F[Option[FiniteDuration]] = + latencyRef.get.flatMap { + case Some(LastEmittedTstamps(_, Some(tstamp))) => + Sync[F].realTime.map { now => + Some(now - tstamp.toEpochMilli.millis) + } + case _ => none.pure[F] + } } private def nackUnhandled[F[_]: Monad, C](checkpointer: Checkpointer[F, C], ref: Ref[F, State[C]]): F[Unit] = @@ -150,15 +175,15 @@ private[sources] object LowLevelSource { * An fs2 Pipe which records what time (duration since epoch) we last emitted a batch downstream * for processing */ - private def monitorLatency[F[_]: Sync, A](ref: Ref[F, Option[FiniteDuration]]): Pipe[F, A, A] = { + private def monitorLatency[F[_]: Sync, C](ref: LatencyRef[F]): Pipe[F, LowLevelEvents[C], LowLevelEvents[C]] = { - def go(source: Stream[F, A]): Pull[F, A, Unit] = + def go(source: Stream[F, LowLevelEvents[C]]): Pull[F, LowLevelEvents[C], Unit] = source.pull.uncons1.flatMap { case None => Pull.done case Some((pulled, source)) => for { now <- Pull.eval(Sync[F].realTime) - _ <- Pull.eval(ref.set(Some(now))) + _ <- Pull.eval(ref.set(Some(LastEmittedTstamps(now, pulled.earliestSourceTstamp)))) _ <- Pull.output1(pulled) _ <- Pull.eval(ref.set(None)) _ <- go(source) diff --git a/modules/streams-core/src/test/scala/com.snowplowanalytics.snowplow/sources/internal/LowLevelSourceSpec.scala b/modules/streams-core/src/test/scala/com.snowplowanalytics.snowplow/sources/internal/LowLevelSourceSpec.scala index a7dfece8..1b46954f 100644 --- a/modules/streams-core/src/test/scala/com.snowplowanalytics.snowplow/sources/internal/LowLevelSourceSpec.scala +++ b/modules/streams-core/src/test/scala/com.snowplowanalytics.snowplow/sources/internal/LowLevelSourceSpec.scala @@ -16,9 +16,10 @@ import cats.effect.testing.specs2.CatsEffect import fs2.{Chunk, Stream} import org.specs2.Specification -import scala.concurrent.duration.{DurationInt, FiniteDuration} - +import scala.concurrent.duration.{DurationLong, FiniteDuration} import java.nio.charset.StandardCharsets +import java.time.Instant + import com.snowplowanalytics.snowplow.sources.{EventProcessingConfig, EventProcessor, SourceAndAck, TokenedEvents} import java.nio.ByteBuffer @@ -48,6 +49,11 @@ class LowLevelSourceSpec extends Specification with CatsEffect { report healthy after all events have been processed and acked $health4 report disconnected while source is in between two active streams of events (e.g. during kafka rebalance) $health5 report unhealthy if the underlying low level source is lagging $health6 + + When reporting currentStreamLatency + report no timestamp when there are no events $latency1 + report a timestamp if there are unprocessed events $latency2 + report no timestamp after events are processed $latency3 """ def e1 = { @@ -639,6 +645,84 @@ class LowLevelSourceSpec extends Specification with CatsEffect { TestControl.executeEmbed(io) } + + /** Specs for health check */ + + def latency1 = { + + val config = EventProcessingConfig(EventProcessingConfig.NoWindowing) + + // A source that emits nothing + val lowLevelSource = new LowLevelSource[IO, Unit] { + def checkpointer: Checkpointer[IO, Unit] = Checkpointer.acksOnly[IO, Unit](_ => IO.unit) + def stream: Stream[IO, Stream[IO, LowLevelEvents[Unit]]] = Stream.emit(Stream.never[IO]) + def lastLiveness: IO[FiniteDuration] = IO.realTime + } + + val io = for { + refActions <- Ref[IO].of(Vector.empty[Action]) + sourceAndAck <- LowLevelSource.toSourceAndAck(lowLevelSource) + processor = testProcessor(refActions, TestSourceConfig(1, 1, 1.second, 1.second)) + fiber <- sourceAndAck.stream(config, processor).compile.drain.start + _ <- IO.sleep(1.hour) + reportedLatency <- sourceAndAck.currentStreamLatency + _ <- fiber.cancel + } yield reportedLatency must beNone + + TestControl.executeEmbed(io) + } + + def latency2 = { + + val config = EventProcessingConfig(EventProcessingConfig.NoWindowing) + + val streamTstamp = Instant.parse("2024-01-02T03:04:05.123Z") + val testConfig = TestSourceConfig( + batchesPerRebalance = Int.MaxValue, + eventsPerBatch = 2, + timeBetweenBatches = 0.second, + timeToProcessBatch = 1.hour, // Processor is very slow to sink the events + streamTstamp = streamTstamp + ) + + val io = for { + _ <- IO.sleep(streamTstamp.toEpochMilli.millis + 2.minutes) + refActions <- Ref[IO].of(Vector.empty[Action]) + sourceAndAck <- LowLevelSource.toSourceAndAck(testLowLevelSource(refActions, testConfig)) + processor = testProcessor(refActions, testConfig) + fiber <- sourceAndAck.stream(config, processor).compile.drain.start + _ <- IO.sleep(5.minutes) + reportedLatency <- sourceAndAck.currentStreamLatency + _ <- fiber.cancel + } yield reportedLatency must beSome(7.minutes) + + TestControl.executeEmbed(io) + } + + def latency3 = { + + val config = EventProcessingConfig(EventProcessingConfig.TimedWindows(1.hour, 1.0, 2)) + + val testConfig = TestSourceConfig( + batchesPerRebalance = Int.MaxValue, + eventsPerBatch = 2, + timeBetweenBatches = 1.second, + timeToProcessBatch = 1.second + ) + + val io = for { + refActions <- Ref[IO].of(Vector.empty[Action]) + sourceAndAck <- LowLevelSource.toSourceAndAck(testLowLevelSource(refActions, testConfig)) + processor = windowedProcessor(refActions, testConfig) + fiber <- sourceAndAck.stream(config, processor).compile.drain.start + _ <- IO.sleep(30.minutes) + reportedLatency <- sourceAndAck.currentStreamLatency + _ <- fiber.cancel + } yield reportedLatency must beNone + + TestControl.executeEmbed(io) + } + } object LowLevelSourceSpec { @@ -657,7 +741,8 @@ object LowLevelSourceSpec { eventsPerBatch: Int, timeBetweenBatches: FiniteDuration, timeToProcessBatch: FiniteDuration, - timeToFinalizeWindow: FiniteDuration = 0.seconds + timeToFinalizeWindow: FiniteDuration = 0.seconds, + streamTstamp: Instant = Instant.EPOCH ) /** @@ -737,7 +822,7 @@ object LowLevelSourceSpec { .map { numbers => val events = numbers.map(_.toString) val asBytes = Chunk.from(events).map(e => ByteBuffer.wrap(e.getBytes(StandardCharsets.UTF_8))) - LowLevelEvents(events = asBytes, ack = events.toList, earliestSourceTstamp = None) + LowLevelEvents(events = asBytes, ack = events.toList, earliestSourceTstamp = Some(config.streamTstamp)) } } .flatMap { e =>