Skip to content

Commit

Permalink
Extend health probe to report unhealthy on more error scenarios (#69)
Browse files Browse the repository at this point in the history
The health probe now reports unhealthy under these extra scenarios:

- Bad row sink is unhealthy – cannot write to the sink
- Fatal error happens when trying to write events to the lake
  • Loading branch information
istreeter authored and oguzhanunlu committed Nov 1, 2024
1 parent 8d3626c commit d8692fd
Show file tree
Hide file tree
Showing 7 changed files with 370 additions and 42 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
/**
* Copyright (c) 2013-present Snowplow Analytics Ltd. All rights reserved.
*
* This software is made available by Snowplow Analytics, Ltd., under the terms of the Snowplow
* Limited Use License Agreement, Version 1.0 located at
* https://docs.snowplow.io/limited-use-license-1.0 BY INSTALLING, DOWNLOADING, ACCESSING, USING OR
* DISTRIBUTING ANY PORTION OF THE SOFTWARE, YOU AGREE TO THE TERMS OF SUCH LICENSE AGREEMENT.
*/
package com.snowplowanalytics.snowplow.lakes

import cats.effect.{Concurrent, Ref}
import cats.implicits._
import cats.{Monad, Monoid, Show}
import com.snowplowanalytics.snowplow.runtime.HealthProbe
import com.snowplowanalytics.snowplow.runtime.HealthProbe.{Healthy, Unhealthy}
import com.snowplowanalytics.snowplow.sources.SourceAndAck

import scala.concurrent.duration.FiniteDuration

/**
 * Tracks the overall health of the application for the health probe endpoint.
 *
 * Overall health is the combination of:
 *   - the source's health, judged by whether events are being processed within `unhealthyLatency`
 *   - the health of each app-managed service (e.g. Spark writer, bad-row sink), which the
 *     application reports explicitly via [[setServiceHealth]]
 */
final class AppHealth[F[_]: Monad](
  unhealthyLatency: FiniteDuration,
  source: SourceAndAck[F],
  appManagedServices: Ref[F, Map[AppHealth.Service, Boolean]]
) {

  /** Current probe status: healthy only if the source and every tracked service are healthy. */
  def status: F[HealthProbe.Status] =
    for {
      fromSource <- getSourceHealth
      fromServices <- getAppManagedServicesHealth
    } yield fromServices.foldLeft(fromSource)(combineHealth)

  /** Record the latest known health of one app-managed service. */
  def setServiceHealth(service: AppHealth.Service, isHealthy: Boolean): F[Unit] =
    appManagedServices.update(_ + (service -> isHealthy))

  // One status per tracked service; unhealthy services carry a human-readable reason.
  private def getAppManagedServicesHealth: F[List[HealthProbe.Status]] =
    appManagedServices.get.map { services =>
      services.toList.map { case (service, isHealthy) =>
        if (isHealthy) HealthProbe.Healthy
        else HealthProbe.Unhealthy(show"$service is not healthy")
      }
    }

  // Translate the source's own health report into a probe status.
  private def getSourceHealth: F[HealthProbe.Status] =
    source.isHealthy(unhealthyLatency).map {
      case SourceAndAck.Healthy => HealthProbe.Healthy
      case other: SourceAndAck.Unhealthy => HealthProbe.Unhealthy(other.show)
    }

  // Healthy is the identity; two unhealthy reasons are joined with ", ".
  private def combineHealth(left: HealthProbe.Status, right: HealthProbe.Status): HealthProbe.Status =
    (left, right) match {
      case (Healthy, other) => other
      case (other, Healthy) => other
      case (Unhealthy(first), Unhealthy(second)) => Unhealthy(reason = s"$first, $second")
    }
}

object AppHealth {

  /** A component whose health the application itself reports (unlike the source, which is polled). */
  sealed trait Service

  object Service {
    case object SparkWriter extends Service
    case object BadSink extends Service

    implicit val show: Show[Service] = Show.show {
      case SparkWriter => "Spark writer"
      case BadSink => "Failed events sink"
    }
  }

  /**
   * Build an AppHealth with every app-managed service initially unhealthy.
   *
   * Services start as unhealthy so the probe reports unhealthy until each one has
   * demonstrated it is working (e.g. the sink is created, the table is initialized).
   */
  def init[F[_]: Concurrent](
    unhealthyLatency: FiniteDuration,
    source: SourceAndAck[F]
  ): F[AppHealth[F]] = {
    val initialServiceHealth: Map[Service, Boolean] =
      List(Service.SparkWriter, Service.BadSink).map(_ -> false).toMap
    Ref[F].of(initialServiceHealth).map(new AppHealth[F](unhealthyLatency, source, _))
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@

package com.snowplowanalytics.snowplow.lakes

import cats.{Functor, Monad}
import cats.implicits._
import cats.effect.{Async, Resource, Sync}
import cats.effect.unsafe.implicits.global
Expand Down Expand Up @@ -46,8 +45,9 @@ case class Environment[F[_]](
badSink: Sink[F],
resolver: Resolver[F],
httpClient: Client[F],
lakeWriter: LakeWriter[F],
lakeWriter: LakeWriter.WithHandledErrors[F],
metrics: Metrics[F],
appHealth: AppHealth[F],
cpuParallelism: Int,
inMemBatchBytes: Long,
windowing: EventProcessingConfig.TimedWindows,
Expand All @@ -65,24 +65,26 @@ object Environment {
): Resource[F, Environment[F]] =
for {
_ <- enableSentry[F](appInfo, config.main.monitoring.sentry)
sourceAndAck <- Resource.eval(toSource(config.main.input))
appHealth <- Resource.eval(AppHealth.init(config.main.monitoring.healthProbe.unhealthyLatency, sourceAndAck))
_ <- HealthProbe.resource(config.main.monitoring.healthProbe.port, appHealth.status)
resolver <- mkResolver[F](config.iglu)
httpClient <- BlazeClientBuilder[F].withExecutionContext(global.compute).resource
badSink <- toSink(config.main.output.bad.sink)
badSink <- toSink(config.main.output.bad.sink).evalTap(_ => appHealth.setServiceHealth(AppHealth.Service.BadSink, true))
windowing <- Resource.eval(EventProcessingConfig.TimedWindows.build(config.main.windowing, config.main.numEagerWindows))
(lakeWriter, lakeWriterHealth) <- LakeWriter.build[F](config.main.spark, config.main.output.good)
sourceAndAck <- Resource.eval(toSource(config.main.input))
lakeWriter <- LakeWriter.build(config.main.spark, config.main.output.good)
lakeWriterWrapped = LakeWriter.withHandledErrors(lakeWriter, appHealth)
metrics <- Resource.eval(Metrics.build(config.main.monitoring.metrics))
isHealthy = combineIsHealthy(sourceIsHealthy(config.main.monitoring.healthProbe, sourceAndAck), lakeWriterHealth)
_ <- HealthProbe.resource(config.main.monitoring.healthProbe.port, isHealthy)
cpuParallelism = chooseCpuParallelism(config.main)
} yield Environment(
appInfo = appInfo,
source = sourceAndAck,
badSink = badSink,
resolver = resolver,
httpClient = httpClient,
lakeWriter = lakeWriter,
lakeWriter = lakeWriterWrapped,
metrics = metrics,
appHealth = appHealth,
cpuParallelism = cpuParallelism,
inMemBatchBytes = config.main.inMemBatchBytes,
windowing = windowing,
Expand Down Expand Up @@ -136,16 +138,4 @@ object Environment {
.setScale(0, BigDecimal.RoundingMode.UP)
.toInt

private def sourceIsHealthy[F[_]: Functor](config: Config.HealthProbe, source: SourceAndAck[F]): F[HealthProbe.Status] =
source.isHealthy(config.unhealthyLatency).map {
case SourceAndAck.Healthy => HealthProbe.Healthy
case unhealthy: SourceAndAck.Unhealthy => HealthProbe.Unhealthy(unhealthy.show)
}

// TODO: This should move to common-streams
private def combineIsHealthy[F[_]: Monad](status: F[HealthProbe.Status]*): F[HealthProbe.Status] =
status.toList.foldM[F, HealthProbe.Status](HealthProbe.Healthy) {
case (unhealthy: HealthProbe.Unhealthy, _) => Monad[F].pure(unhealthy)
case (HealthProbe.Healthy, other) => other
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,12 @@ package com.snowplowanalytics.snowplow.lakes.processing

import cats.implicits._
import cats.data.NonEmptyList
import cats.effect.{Async, Ref, Resource, Sync}
import cats.effect.{Async, Resource, Sync}
import cats.effect.std.Mutex
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types.StructType

import com.snowplowanalytics.snowplow.runtime.HealthProbe
import com.snowplowanalytics.snowplow.lakes.Config
import com.snowplowanalytics.snowplow.lakes.{AppHealth, Config}
import com.snowplowanalytics.snowplow.lakes.tables.{DeltaWriter, HudiWriter, IcebergWriter, Writer}

trait LakeWriter[F[_]] {
Expand Down Expand Up @@ -68,26 +67,49 @@ trait LakeWriter[F[_]] {

object LakeWriter {

trait WithHandledErrors[F[_]] extends LakeWriter[F]

def build[F[_]: Async](
config: Config.Spark,
target: Config.Target
): Resource[F, (LakeWriter[F], F[HealthProbe.Status])] = {
): Resource[F, LakeWriter[F]] = {
val w = target match {
case c: Config.Delta => new DeltaWriter(c)
case c: Config.Hudi => new HudiWriter(c)
case c: Config.Iceberg => new IcebergWriter(c)
}
for {
session <- SparkUtils.session[F](config, w)
isHealthy <- Resource.eval(Ref[F].of(initialHealthStatus))
writerParallelism = chooseWriterParallelism(config)
mutex1 <- Resource.eval(Mutex[F])
mutex2 <- Resource.eval(Mutex[F])
} yield (impl(session, w, isHealthy, writerParallelism, mutex1, mutex2), isHealthy.get)
} yield impl(session, w, writerParallelism, mutex1, mutex2)
}

private def initialHealthStatus: HealthProbe.Status =
HealthProbe.Unhealthy("Destination table not initialized yet")
/**
 * Wrap a LakeWriter so that outcomes of table operations are reported to AppHealth.
 *
 * Delegates every operation to `underlying`; only `createTable` and `commit` touch
 * health state, because those are the calls that exercise the lake connection.
 */
def withHandledErrors[F[_]: Sync](underlying: LakeWriter[F], appHealth: AppHealth[F]): WithHandledErrors[F] = new WithHandledErrors[F] {
  // A successful createTable proves the writer works, so mark it healthy afterwards.
  def createTable: F[Unit] =
    underlying.createTable <* appHealth.setServiceHealth(AppHealth.Service.SparkWriter, isHealthy = true)

  // Pure local operation — no lake interaction, so no health reporting.
  def initializeLocalDataFrame(viewName: String): F[Unit] =
    underlying.initializeLocalDataFrame(viewName)

  // Pure local operation — no lake interaction, so no health reporting.
  def localAppendRows(
    viewName: String,
    rows: NonEmptyList[Row],
    schema: StructType
  ): F[Unit] =
    underlying.localAppendRows(viewName, rows, schema)

  // Local cleanup — no lake interaction, so no health reporting.
  def removeDataFrameFromDisk(viewName: String): F[Unit] =
    underlying.removeDataFrameFromDisk(viewName)

  // Mark unhealthy on any commit error (before the error propagates), healthy on success.
  def commit(viewName: String): F[Unit] =
    underlying
      .commit(viewName)
      .onError { case _ =>
        appHealth.setServiceHealth(AppHealth.Service.SparkWriter, isHealthy = false)
      } <* appHealth.setServiceHealth(AppHealth.Service.SparkWriter, isHealthy = true)
}

/**
* Implementation of the LakeWriter
Expand All @@ -105,13 +127,12 @@ object LakeWriter {
private def impl[F[_]: Sync](
spark: SparkSession,
w: Writer,
isHealthy: Ref[F, HealthProbe.Status],
writerParallelism: Int,
mutexForWriting: Mutex[F],
mutexForUnioning: Mutex[F]
): LakeWriter[F] = new LakeWriter[F] {
def createTable: F[Unit] =
w.prepareTable(spark) <* isHealthy.set(HealthProbe.Healthy)
w.prepareTable(spark)

def initializeLocalDataFrame(viewName: String): F[Unit] =
SparkUtils.initializeLocalDataFrame(spark, viewName)
Expand All @@ -131,9 +152,10 @@ object LakeWriter {
df <- mutexForUnioning.lock.surround {
SparkUtils.prepareFinalDataFrame(spark, viewName, writerParallelism)
}
_ <- mutexForWriting.lock.surround {
w.write(df)
}
_ <- mutexForWriting.lock
.surround {
w.write(df)
}
} yield ()
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ import com.snowplowanalytics.snowplow.badrows.{BadRow, Processor => BadRowProces
import com.snowplowanalytics.snowplow.badrows.Payload.{RawPayload => BadRowRawPayload}
import com.snowplowanalytics.snowplow.sources.{EventProcessingConfig, EventProcessor, TokenedEvents}
import com.snowplowanalytics.snowplow.sinks.ListOfList
import com.snowplowanalytics.snowplow.lakes.{Environment, Metrics}
import com.snowplowanalytics.snowplow.lakes.{AppHealth, Environment, Metrics}
import com.snowplowanalytics.snowplow.runtime.processing.BatchUp
import com.snowplowanalytics.snowplow.runtime.syntax.foldable._
import com.snowplowanalytics.snowplow.loaders.transform.{
Expand Down Expand Up @@ -223,23 +223,27 @@ object Processing {
}
}

private def handleParseFailures[F[_]: Applicative, A](
private def handleParseFailures[F[_]: Sync, A](
env: Environment[F],
badProcessor: BadRowProcessor
): Pipe[F, ParseResult, ParseResult] =
_.evalTap { batch =>
sendFailedEvents(env, badProcessor, batch.bad)
}

private def sendFailedEvents[F[_]: Applicative, A](
private def sendFailedEvents[F[_]: Sync, A](
env: Environment[F],
badProcessor: BadRowProcessor,
bad: List[BadRow]
): F[Unit] =
if (bad.nonEmpty) {
val serialized = bad.map(badRow => BadRowsSerializer.withMaxSize(badRow, badProcessor, env.badRowMaxSize))
env.metrics.addBad(bad.size) *>
env.badSink.sinkSimple(ListOfList.of(List(serialized)))
env.badSink
.sinkSimple(ListOfList.of(List(serialized)))
.onError { case _ =>
env.appHealth.setServiceHealth(AppHealth.Service.BadSink, isHealthy = false)
}
} else Applicative[F].unit

private def finalizeWindow[F[_]: Sync](
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,15 +64,19 @@ object MockEnvironment {
def build(windows: List[List[TokenedEvents]]): IO[MockEnvironment] =
for {
state <- Ref[IO].of(Vector.empty[Action])
source = testSourceAndAck(windows, state)
appHealth <- AppHealth.init(10.seconds, source)
_ <- appHealth.setServiceHealth(AppHealth.Service.BadSink, isHealthy = true)
} yield {
val env = Environment(
appInfo = TestSparkEnvironment.appInfo,
source = testSourceAndAck(windows, state),
source = source,
badSink = testSink(state),
resolver = Resolver[IO](Nil, None),
httpClient = testHttpClient,
lakeWriter = testLakeWriter(state),
metrics = testMetrics(state),
appHealth = appHealth,
inMemBatchBytes = 1000000L,
cpuParallelism = 1,
windowing = EventProcessingConfig.TimedWindows(1.minute, 1.0, 1),
Expand All @@ -82,7 +86,7 @@ object MockEnvironment {
MockEnvironment(state, env)
}

private def testLakeWriter(state: Ref[IO, Vector[Action]]): LakeWriter[IO] = new LakeWriter[IO] {
private def testLakeWriter(state: Ref[IO, Vector[Action]]): LakeWriter.WithHandledErrors[IO] = new LakeWriter.WithHandledErrors[IO] {
def createTable: IO[Unit] =
state.update(_ :+ CreatedTable)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,15 +32,20 @@ object TestSparkEnvironment {
windows: List[List[TokenedEvents]]
): Resource[IO, Environment[IO]] = for {
testConfig <- Resource.pure(TestConfig.defaults(target, tmpDir))
(lakeWriter, _) <- LakeWriter.build[IO](testConfig.spark, testConfig.output.good)
source = testSourceAndAck(windows)
appHealth <- Resource.eval(AppHealth.init(10.seconds, source))
_ <- Resource.eval(appHealth.setServiceHealth(AppHealth.Service.BadSink, isHealthy = true))
lakeWriter <- LakeWriter.build[IO](testConfig.spark, testConfig.output.good)
lakeWriterWrapped = LakeWriter.withHandledErrors(lakeWriter, appHealth)
} yield Environment(
appInfo = appInfo,
source = testSourceAndAck(windows),
source = source,
badSink = Sink[IO](_ => IO.unit),
resolver = Resolver[IO](Nil, None),
httpClient = testHttpClient,
lakeWriter = lakeWriter,
lakeWriter = lakeWriterWrapped,
metrics = testMetrics,
appHealth = appHealth,
inMemBatchBytes = 1000000L,
cpuParallelism = 1,
windowing = EventProcessingConfig.TimedWindows(1.minute, 1.0, 1),
Expand Down
Loading

0 comments on commit d8692fd

Please sign in to comment.