Skip to content

Commit

Permalink
Refactor application health monitoring
Browse files Browse the repository at this point in the history
Application health is needed for two distinct reasons:

1. Sending alerts to a monitoring webhook for setup errors. Relatedly,
   sending heartbeat messages to the webhook when healthy.
2. The HTTP health probe.  Needed so that the orchestration environment
   (kubernetes or whatever) can kill the pod when unhealthy

Several Snowplow apps each implement their own logic of when to toggle
the health probe and when to send an alert.  This PR consolidates that
logic into one place.

After this PR, the application code just needs to call methods on the
AppHealth class.  This lib then manages webhook events and health probe,
based on the current status of the AppHealth.
  • Loading branch information
istreeter committed Aug 13, 2024
1 parent d44d7e2 commit b29df23
Show file tree
Hide file tree
Showing 8 changed files with 882 additions and 106 deletions.
5 changes: 5 additions & 0 deletions modules/runtime-common/src/main/resources/reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,11 @@ snowplow.defaults: {
period: "60 seconds"
}

webhook: {
tags: {}
heartbeat: "60 minutes"
}

telemetry: {
disable: false
interval: "15 minutes"
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,204 @@
/*
* Copyright (c) 2014-present Snowplow Analytics Ltd. All rights reserved.
*
* This software is made available by Snowplow Analytics, Ltd.,
* under the terms of the Snowplow Limited Use License Agreement, Version 1.0
* located at https://docs.snowplow.io/limited-use-license-1.0
* BY INSTALLING, DOWNLOADING, ACCESSING, USING OR DISTRIBUTING ANY PORTION
* OF THE SOFTWARE, YOU AGREE TO THE TERMS OF SUCH LICENSE AGREEMENT.
*/

package com.snowplowanalytics.snowplow.runtime

import cats.Eq
import cats.{Applicative, Monad}
import cats.implicits._
import cats.effect.{Async, Ref}
import fs2.concurrent.SignallingRef

/**
* Class to collect, store and provide the health statuses of services required by the App
*
* @tparam SetupAlert
* Sealed trait of the alerts this app is allowed to send to the webhook for setup errors
* @tparam RuntimeService
* Sealed trait of the services that this app requires to be healthy
*
* ==App Boilerplate==
*
* To use this class in a Snowplow app, first enumerate the runtime services required by this app:
*
* {{{
* trait MyAppRuntimeService
* case object BadRowsOutputStream extends MyAppRuntimeService
* case object EventsInputStream extends MyAppRuntimeService
* }}}
*
* Next, define the alerts this app is allowed to send to the webhook for setup errors
*
* {{{
* trait MyAppAlert
* case object BadPassword extends MyAppAlert
* case class MissingPermissions(moreDetails: String) extends MyAppAlert
* }}}
*
* Implement a `cats.Show` for the runtime services and for the alerts
*
* {{{
* val runtimeShow = Show[MyAppRuntimeService] {
* case BadRowsOutputStream => "Bad rows output stream"
* case EventsInputStream => "Events input stream"
* }
*
* val alertShow = Show[MyAppAlert] {
* case BadPassword => "Bad password"
* case MissingPermissions(moreDetails) => s"Missing permissions $moreDetails"
* }
* }}}
*
* Implement a `cats.Eq` for the alerts, so we can alert on anything uniquely new
* {{{
* val alertEq = Eq.fromUniversalEquals[Alert] // Do it better than this
* }}}
*
* ==Environment initialization==
*
* Initialize the AppHealth as part of App Environment initialization:
*
* {{{
* appHealth <- AppHealth.init[F, MyAppAlert, MyAppRuntimeService]
* }}}
*
* Initialize a health probe, so the app reports itself as unhealthy whenever a required service
* becomes unhealthy
*
* {{{
* _ <- HealthProbe.resource(port, appHealthy)
* }}}
*
* Initialize the webhook, so the app reports new setup errors or whenever the setup configuration
* becomes healthy
*
* {{{
* _ <- Webhook.resource(config, appInfo, httpClient, appHealth)
* }}}
*
* And finally, register any runtime service that provides its own health reporter. Not all services
* fall into this category, but the source stream does fall into this category.
*
* {{{
* _ <- appHealth.addRuntimeHealthReporter(EventsInputStream, sourceAndAckIsHealthy)
* }}}
*
* ==Application processing==
*
* Once the application enters its processing phase, you can set the health status for any runtime
* service or setup configuration
*
* {{{
* // After catching an exception in the bad events stream
* _ <- Logger[F].error(e)("Problem with bad rows output stream")
* _ <- appHealth.becomeUnhealthyForRuntimeService(BadRowsOutputStream)
*
* // After bad rows stream becomes healthy again
* _ <- Logger[F].debug("Bad rows output stream is ok")
* _ <- appHealth.becomeHealthyForRuntimeService(BadRowsOutputStream)
*
* // After catching an exception with the external setup configuration
* // Note this will send an alert webhook
* _ <- Logger[F].error(e)("Problem with the provided password")
* _ <- appHealth.becomeUnhealthyForSetup(BadPassword)
*
* // After successful connection to the externally configured services
* // Note this will send the first hearbeat webhook
* _ <- Logger[F].error(e)("Everything ok with the provided setup configuration")
* _ <- appHealth.becomeHealthyForSetup
* }}}
*
* The application processing code does not need to explicitly send any monitoring alert or adjust
* the health probe return code.
*/
class AppHealth[F[_]: Monad, SetupAlert, RuntimeService] private (
private[runtime] val setupHealth: SignallingRef[F, AppHealth.SetupStatus[SetupAlert]],
runtimeHealth: Ref[F, Map[RuntimeService, F[AppHealth.RuntimeServiceStatus]]]
) extends AppHealth.Interface[F, SetupAlert, RuntimeService] {
import AppHealth._

def becomeHealthyForSetup: F[Unit] =
setupHealth.set(SetupStatus.Healthy)

def becomeUnhealthyForSetup(alert: SetupAlert): F[Unit] =
setupHealth.set(SetupStatus.Unhealthy(alert))

def becomeHealthyForRuntimeService(service: RuntimeService): F[Unit] =
runtimeHealth.update(_ - service)

def becomeUnhealthyForRuntimeService(service: RuntimeService): F[Unit] =
runtimeHealth.update(_ + (service -> Applicative[F].pure(RuntimeServiceStatus.Unhealthy)))

def addRuntimeHealthReporter(service: RuntimeService, reporter: F[RuntimeServiceStatus]): F[Unit] =
runtimeHealth.update(_ + (service -> reporter))

private[runtime] def unhealthyRuntimeServices: F[List[RuntimeService]] =
for {
asMap <- runtimeHealth.get
pairs <- asMap.toList.traverse { case (service, statusF) =>
statusF.map((_, service))
}
} yield pairs.collect { case (RuntimeServiceStatus.Unhealthy, service) => service }
}

object AppHealth {

/**
* Subset of AppHealth interface required during app's event-processing phase
*
* This interface should be part of the App's Environment. Processing Specs can provide a custom
* implementation.
*/
trait Interface[F[_], -SetupAlert, -RuntimeService] {
def becomeHealthyForSetup: F[Unit]

def becomeUnhealthyForSetup(alert: SetupAlert): F[Unit]

def becomeHealthyForRuntimeService(service: RuntimeService): F[Unit]

def becomeUnhealthyForRuntimeService(service: RuntimeService): F[Unit]
}

private[runtime] sealed trait SetupStatus[+Alert]
private[runtime] object SetupStatus {
case object AwaitingHealth extends SetupStatus[Nothing]
case object Healthy extends SetupStatus[Nothing]
case class Unhealthy[Alert](alert: Alert) extends SetupStatus[Alert]

implicit def eq[Alert: Eq]: Eq[SetupStatus[Alert]] = Eq.instance {
case (Healthy, Healthy) => true
case (Healthy, _) => false
case (AwaitingHealth, AwaitingHealth) => true
case (AwaitingHealth, _) => false
case (Unhealthy(a1), Unhealthy(a2)) => Eq[Alert].eqv(a1, a2)
case (Unhealthy(_), _) => false
}
}

sealed trait RuntimeServiceStatus
object RuntimeServiceStatus {
case object Healthy extends RuntimeServiceStatus
case object Unhealthy extends RuntimeServiceStatus
}

/**
* Initialize the AppHealth
*
* @tparam SetupAlert
* Sealed trait of the alerts this app is allowed to send to the webhook for setup errors
* @tparam RuntimeService
* Sealed trait of the services that this app requires to be healthy
*/
def init[F[_]: Async, SetupAlert, RuntimeService]: F[AppHealth[F, SetupAlert, RuntimeService]] =
for {
setupHealth <- SignallingRef[F, SetupStatus[SetupAlert]](SetupStatus.AwaitingHealth)
runtimeHealth <- Ref[F].of(Map.empty[RuntimeService, F[AppHealth.RuntimeServiceStatus]])
} yield new AppHealth(setupHealth, runtimeHealth)
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,10 @@
*/
package com.snowplowanalytics.snowplow.runtime

import cats.effect.{Async, Resource, Sync}
import cats.data.Kleisli
import cats.Show
import cats.implicits._
import cats.data.Kleisli
import cats.effect.{Async, Resource, Sync}
import com.comcast.ip4s.{Ipv4Address, Port}
import io.circe.Decoder
import org.http4s.ember.server.EmberServerBuilder
Expand All @@ -22,18 +23,17 @@ object HealthProbe {

private implicit def logger[F[_]: Sync]: Logger[F] = Slf4jLogger.getLogger[F]

sealed trait Status
case object Healthy extends Status
case class Unhealthy(reason: String) extends Status

def resource[F[_]: Async](port: Port, isHealthy: F[Status]): Resource[F, Unit] = {
def resource[F[_]: Async, RuntimeService: Show](
port: Port,
appHealth: AppHealth[F, ?, RuntimeService]
): Resource[F, Unit] = {
implicit val network: Network[F] = Network.forAsync[F]
EmberServerBuilder
.default[F]
.withHost(Ipv4Address.fromBytes(0, 0, 0, 0))
.withPort(port)
.withMaxConnections(1)
.withHttpApp(httpApp(isHealthy))
.withHttpApp(httpApp(appHealth))
.build
.evalTap { _ =>
Logger[F].info(s"Health service listening on port $port")
Expand All @@ -47,17 +47,49 @@ object HealthProbe {
}
}

private def httpApp[F[_]: Sync](isHealthy: F[Status]): HttpApp[F] =
private[runtime] def httpApp[F[_]: Sync, RuntimeService: Show](
appHealth: AppHealth[F, ?, RuntimeService]
): HttpApp[F] =
Kleisli { _ =>
isHealthy.flatMap {
case Healthy =>
val problemsF = for {
runtimeUnhealthies <- appHealth.unhealthyRuntimeServices
setupHealth <- appHealth.setupHealth.get
} yield {
val allUnhealthy = runtimeUnhealthies.map(_.show) ++ (setupHealth match {
case AppHealth.SetupStatus.Unhealthy(_) => Some("External setup configuration")
case _ => None
})

val allAwaiting = setupHealth match {
case AppHealth.SetupStatus.AwaitingHealth => Some("External setup configuration")
case _ => None
}

val unhealthyMsg = if (allUnhealthy.nonEmpty) {
val joined = allUnhealthy.mkString("Services are unhealthy [", ", ", "]")
Some(joined)
} else None

val awaitingMsg = if (allAwaiting.nonEmpty) {
val joined = allUnhealthy.mkString("Services are awaiting a healthy status [", ", ", "]")
Some(joined)
} else None

if (unhealthyMsg.isEmpty && awaitingMsg.isEmpty)
None
else
Some((unhealthyMsg ++ awaitingMsg).mkString(" AND "))
}

problemsF.flatMap {
case Some(errorMsg) =>
Logger[F].warn(s"Health probe returning 503: $errorMsg").as {
Response(status = Status.ServiceUnavailable)
}
case None =>
Logger[F].debug("Health probe returning 200").as {
Response(status = Status.Ok)
}
case Unhealthy(reason) =>
Logger[F].warn(s"Health probe returning 503: $reason").as {
Response(status = Status.ServiceUnavailable)
}
}
}

Expand Down
Loading

0 comments on commit b29df23

Please sign in to comment.