Sources should report stream latency of stuck events
Currently in common-streams apps, the _application_ (i.e. not this
library) is responsible for tracking latency of events consumed from the
stream.  Because of the way it is implemented in the apps, if an event
gets stuck (e.g. it cannot be written to the destination) then the latency
drops to zero for the period when the app is retrying the stuck event.

This commit aims to fix the problem where the `latency_millis` metric
wrongly drops to zero.

1. The source itself now tracks the latency of the event which is
   currently being processed by the downstream application.
2. The Metrics class now allows metric starting values to be wrapped in an
   effect, i.e. `F[Metrics.State]`. This means the application can pass
   in the latency reported by the Source.
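
For example, an application might seed each reporting period like this (a
sketch; `AppState`, `AppMetrics` and `latencyMillis` are illustrative names
from a hypothetical app, not part of this library):

    val initState: IO[AppState] =
      sourceAndAck.currentStreamLatency.map { latency =>
        // if an event is stuck downstream, the period starts at its true latency
        AppState.empty.copy(latencyMillis = latency.fold(0L)(_.toMillis))
      }
    // passed where a pure starting state was previously given:
    // new AppMetrics(ref, initState, config)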
istreeter committed Jan 3, 2025
1 parent d5f509b commit 98a9d8d
Showing 5 changed files with 144 additions and 16 deletions.
@@ -76,7 +76,7 @@ object TestMetrics {
ref: Ref[IO, TestState],
emptyState: TestState,
config: Option[Metrics.StatsdConfig]
-) extends Metrics[IO, TestState](ref, emptyState, config) {
+) extends Metrics[IO, TestState](ref, IO.pure(emptyState), config) {
def count(c: Int) = ref.update(s => s.copy(counter = s.counter + c))
def time(t: FiniteDuration) = ref.update(s => s.copy(timer = s.timer + t))
}
@@ -24,13 +24,14 @@ import java.nio.charset.StandardCharsets.UTF_8

abstract class Metrics[F[_]: Async, S <: Metrics.State](
ref: Ref[F, S],
-emptyState: S,
+initState: F[S],
config: Option[Metrics.StatsdConfig]
) {
def report: Stream[F, Nothing] =
Stream.resource(Metrics.makeReporters[F](config)).flatMap { reporters =>
def report = for {
-state <- ref.getAndSet(emptyState)
+nextState <- initState
+state <- ref.getAndSet(nextState)
kv = state.toKVMetrics
_ <- reporters.traverse(_.report(kv))
} yield ()
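
Each reporting period now begins by re-evaluating `initState`, and
`ref.getAndSet(nextState)` atomically installs the new period's starting
state while returning the finished period's state for reporting. In sketch
form (the comments are illustrative, not from this commit):

    for {
      next <- initState           // may read live data, e.g. the source's current latency
      prev <- ref.getAndSet(next) // prev belongs to the period just ended
      _    <- reporters.traverse(_.report(prev.toKVMetrics))
    } yield ()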
@@ -10,6 +10,7 @@ package com.snowplowanalytics.snowplow.sources
import cats.Show
import cats.implicits._
import fs2.Stream

import scala.concurrent.duration.FiniteDuration

/**
@@ -48,6 +49,22 @@ trait SourceAndAck[F[_]] {
* healthy. If any event is "stuck" then latency is high and the probe should report unhealthy.
*/
def isHealthy(maxAllowedProcessingLatency: FiniteDuration): F[SourceAndAck.HealthStatus]

/**
* Latency of the message that is currently being processed by the downstream application
*
* The returned value is `None` if this `SourceAndAck` is currently awaiting messages from
* upstream, e.g. it is doing a remote fetch.
*
* The returned value is `Some` if this `SourceAndAck` has emitted a message downstream to the
* application, and it is waiting for the downstream app to "pull" the next message from this
* `SourceAndAck`.
*
* This value should be used as the initial latency at the start of a statsd metrics reporting
* period. This ensures the app reports non-zero latency even when the app is stuck (e.g. it
* cannot load events to the destination).
*/
def currentStreamLatency: F[Option[FiniteDuration]]
}

object SourceAndAck {
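To illustrate the contract of `currentStreamLatency` (a hypothetical
timeline, expressed as comments; the times are illustrative):

    // 12:00:00  event written to the source stream
    // 12:00:05  source emits its batch downstream     -> Some(5.seconds)
    // 12:00:06  app still retrying the stuck batch    -> Some(6.seconds)
    // 12:00:07  batch acked, source fetching upstream -> None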
@@ -18,6 +18,7 @@ import org.typelevel.log4cats.{Logger, SelfAwareStructuredLogger}
import org.typelevel.log4cats.slf4j.Slf4jLogger

import scala.concurrent.duration.{DurationLong, FiniteDuration}
import java.time.Instant
import com.snowplowanalytics.snowplow.sources.{EventProcessingConfig, EventProcessor, SourceAndAck, TokenedEvents}

/**
@@ -67,22 +68,37 @@ private[sources] object LowLevelSource {
*/
private type State[C] = Map[Unique.Token, C]

/**
* Timestamps of the instantaneous state of the Source
*
* @param wallTime
* The real time (since epoch) that this Source emitted a batch to the downstream app for
* processing.
* @param streamTime
* For the last batch to be emitted downstream, this is the earliest timestamp according to the
* source, i.e. the time the event was written to the source stream. It is None if this stream type
* does not record timestamps, e.g. Kafka under some circumstances.
*
* Note that both values represent instants of time. They have different types (FiniteDuration and
* Instant) but that is just for convenience to match the incoming data.
*/
private case class LastEmittedTstamps(wallTime: FiniteDuration, streamTime: Option[Instant])

/**
* Mutable state used for measuring the event-processing latency from this source
*
- * For the batch that was last emitted downstream for processing, the `Option[FiniteDuration]`
- * represents the real time (since epoch) that the batch was emitted. If it is None then there is
- * no batch currently being processed.
+ * Holds a `LastEmittedTstamps` for the batch that was last emitted downstream for processing. It
+ * is None if there is no batch currently being processed downstream.
*/
-private type LatencyRef[F[_]] = Ref[F, Option[FiniteDuration]]
+private type LatencyRef[F[_]] = Ref[F, Option[LastEmittedTstamps]]
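
The two timestamps feed the two public probes (a summarizing sketch, not
code from this commit):

    // wallTime   -> isHealthy:            lag     = now - wallTime
    //               (how long the downstream app has held the current batch)
    // streamTime -> currentStreamLatency: latency = now - streamTime
    //               (how old the in-flight data itself is)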

/**
* Lifts the internal [[LowLevelSource]] into a [[SourceAndAck]], which is the public API of this
* library
*/
def toSourceAndAck[F[_]: Async, C](source: LowLevelSource[F, C]): F[SourceAndAck[F]] =
for {
-latencyRef <- Ref[F].of(Option.empty[FiniteDuration])
+latencyRef <- Ref[F].of(Option.empty[LastEmittedTstamps])
isConnectedRef <- Ref[F].of(false)
} yield sourceAndAckImpl(source, latencyRef, isConnectedRef)

@@ -119,12 +135,21 @@ private[sources] object LowLevelSource {
(isConnectedRef.get, latencyRef.get, source.lastLiveness, Sync[F].realTime).mapN {
case (false, _, _, _) =>
SourceAndAck.Disconnected
-case (_, Some(lastPullTime), _, now) if now - lastPullTime > maxAllowedProcessingLatency =>
+case (_, Some(LastEmittedTstamps(lastPullTime, _)), _, now) if now - lastPullTime > maxAllowedProcessingLatency =>
SourceAndAck.LaggingEventProcessor(now - lastPullTime)
case (_, _, lastLiveness, now) if now - lastLiveness > maxAllowedProcessingLatency =>
SourceAndAck.InactiveSource(now - lastLiveness)
case _ => SourceAndAck.Healthy
}

def currentStreamLatency: F[Option[FiniteDuration]] =
latencyRef.get.flatMap {
case Some(LastEmittedTstamps(_, Some(tstamp))) =>
Sync[F].realTime.map { now =>
Some(now - tstamp.toEpochMilli.millis)
}
case _ => none.pure[F]
}
}

private def nackUnhandled[F[_]: Monad, C](checkpointer: Checkpointer[F, C], ref: Ref[F, State[C]]): F[Unit] =
@@ -150,15 +175,15 @@
* An fs2 Pipe which records what time (duration since epoch) we last emitted a batch downstream
* for processing
*/
-private def monitorLatency[F[_]: Sync, A](ref: Ref[F, Option[FiniteDuration]]): Pipe[F, A, A] = {
+private def monitorLatency[F[_]: Sync, C](ref: LatencyRef[F]): Pipe[F, LowLevelEvents[C], LowLevelEvents[C]] = {

-def go(source: Stream[F, A]): Pull[F, A, Unit] =
+def go(source: Stream[F, LowLevelEvents[C]]): Pull[F, LowLevelEvents[C], Unit] =
source.pull.uncons1.flatMap {
case None => Pull.done
case Some((pulled, source)) =>
for {
now <- Pull.eval(Sync[F].realTime)
-_ <- Pull.eval(ref.set(Some(now)))
+_ <- Pull.eval(ref.set(Some(LastEmittedTstamps(now, pulled.earliestSourceTstamp))))
_ <- Pull.output1(pulled)
_ <- Pull.eval(ref.set(None))
_ <- go(source)
@@ -16,9 +16,10 @@ import cats.effect.testing.specs2.CatsEffect
import fs2.{Chunk, Stream}
import org.specs2.Specification

-import scala.concurrent.duration.{DurationInt, FiniteDuration}
+import scala.concurrent.duration.{DurationLong, FiniteDuration}
import java.nio.charset.StandardCharsets
+import java.time.Instant

import com.snowplowanalytics.snowplow.sources.{EventProcessingConfig, EventProcessor, SourceAndAck, TokenedEvents}

import java.nio.ByteBuffer
@@ -48,6 +49,11 @@ class LowLevelSourceSpec extends Specification with CatsEffect {
report healthy after all events have been processed and acked $health4
report disconnected while source is in between two active streams of events (e.g. during kafka rebalance) $health5
report unhealthy if the underlying low level source is lagging $health6

When reporting currentStreamLatency
report no timestamp when there are no events $latency1
report a timestamp if there are unprocessed events $latency2
report no timestamp after events are processed $latency3
"""

def e1 = {
@@ -639,6 +645,84 @@ class LowLevelSourceSpec extends Specification with CatsEffect {

TestControl.executeEmbed(io)
}

/** Specs for currentStreamLatency */

def latency1 = {

val config = EventProcessingConfig(EventProcessingConfig.NoWindowing)

// A source that emits nothing
val lowLevelSource = new LowLevelSource[IO, Unit] {
def checkpointer: Checkpointer[IO, Unit] = Checkpointer.acksOnly[IO, Unit](_ => IO.unit)
def stream: Stream[IO, Stream[IO, LowLevelEvents[Unit]]] = Stream.emit(Stream.never[IO])
def lastLiveness: IO[FiniteDuration] = IO.realTime
}

val io = for {
refActions <- Ref[IO].of(Vector.empty[Action])
sourceAndAck <- LowLevelSource.toSourceAndAck(lowLevelSource)
processor = testProcessor(refActions, TestSourceConfig(1, 1, 1.second, 1.second))
fiber <- sourceAndAck.stream(config, processor).compile.drain.start
_ <- IO.sleep(1.hour)
reportedLatency <- sourceAndAck.currentStreamLatency
_ <- fiber.cancel
} yield reportedLatency must beNone

TestControl.executeEmbed(io)
}

def latency2 = {

val config = EventProcessingConfig(EventProcessingConfig.NoWindowing)

val streamTstamp = Instant.parse("2024-01-02T03:04:05.123Z")
val testConfig = TestSourceConfig(
batchesPerRebalance = Int.MaxValue,
eventsPerBatch = 2,
timeBetweenBatches = 0.second,
timeToProcessBatch = 1.hour, // Processor is very slow to sink the events
streamTstamp = streamTstamp
)

val io = for {
_ <- IO.sleep(streamTstamp.toEpochMilli.millis + 2.minutes)
refActions <- Ref[IO].of(Vector.empty[Action])
sourceAndAck <- LowLevelSource.toSourceAndAck(testLowLevelSource(refActions, testConfig))
processor = testProcessor(refActions, testConfig)
fiber <- sourceAndAck.stream(config, processor).compile.drain.start
_ <- IO.sleep(5.minutes)
reportedLatency <- sourceAndAck.currentStreamLatency
_ <- fiber.cancel
} yield reportedLatency must beSome(7.minutes)

TestControl.executeEmbed(io)
}
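
// (Sanity check on latency2's expectation: under TestControl the clock starts
// at the epoch; the test sleeps to streamTstamp + 2.minutes before the source
// starts, then a further 5.minutes, so now - streamTstamp == 7.minutes.)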

def latency3 = {

val config = EventProcessingConfig(EventProcessingConfig.TimedWindows(1.hour, 1.0, 2))

val testConfig = TestSourceConfig(
batchesPerRebalance = Int.MaxValue,
eventsPerBatch = 2,
timeBetweenBatches = 1.second,
timeToProcessBatch = 1.second
)

val io = for {
refActions <- Ref[IO].of(Vector.empty[Action])
sourceAndAck <- LowLevelSource.toSourceAndAck(testLowLevelSource(refActions, testConfig))
processor = windowedProcessor(refActions, testConfig)
fiber <- sourceAndAck.stream(config, processor).compile.drain.start
_ <- IO.sleep(30.minutes)
reportedLatency <- sourceAndAck.currentStreamLatency
_ <- fiber.cancel
} yield reportedLatency must beNone

TestControl.executeEmbed(io)
}

}

object LowLevelSourceSpec {
@@ -657,7 +741,8 @@
eventsPerBatch: Int,
timeBetweenBatches: FiniteDuration,
timeToProcessBatch: FiniteDuration,
-timeToFinalizeWindow: FiniteDuration = 0.seconds
+timeToFinalizeWindow: FiniteDuration = 0.seconds,
+streamTstamp: Instant = Instant.EPOCH
)

/**
@@ -737,7 +822,7 @@
.map { numbers =>
val events = numbers.map(_.toString)
val asBytes = Chunk.from(events).map(e => ByteBuffer.wrap(e.getBytes(StandardCharsets.UTF_8)))
-LowLevelEvents(events = asBytes, ack = events.toList, earliestSourceTstamp = None)
+LowLevelEvents(events = asBytes, ack = events.toList, earliestSourceTstamp = Some(config.streamTstamp))
}
}
.flatMap { e =>
