Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor application health monitoring #82

Merged
merged 7 commits into from
Aug 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions modules/runtime-common/src/main/resources/reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,11 @@ snowplow.defaults: {
period: "60 seconds"
}

webhook: {
tags: {}
heartbeat: "60 minutes"
}

telemetry: {
disable: false
interval: "15 minutes"
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,196 @@
/*
* Copyright (c) 2014-present Snowplow Analytics Ltd. All rights reserved.
*
* This software is made available by Snowplow Analytics, Ltd.,
* under the terms of the Snowplow Limited Use License Agreement, Version 1.0
* located at https://docs.snowplow.io/limited-use-license-1.0
* BY INSTALLING, DOWNLOADING, ACCESSING, USING OR DISTRIBUTING ANY PORTION
* OF THE SOFTWARE, YOU AGREE TO THE TERMS OF SUCH LICENSE AGREEMENT.
*/

package com.snowplowanalytics.snowplow.runtime

import cats.{Eq, Show}
import cats.Monad
import cats.implicits._
import cats.effect.{Async, Ref}
import fs2.concurrent.SignallingRef

/**
* Class to collect, store and provide the health statuses of services required by the App
*
* @tparam SetupAlert
* Sealed trait of the alerts this app is allowed to send to the webhook for setup errors
* @tparam RuntimeService
* Sealed trait of the services that this app requires to be healthy
*
* ==App Boilerplate==
*
* To use this class in a Snowplow app, first enumerate the runtime services required by this app:
*
* {{{
* trait MyAppRuntimeService
* case object BadRowsOutputStream extends MyAppRuntimeService
* case object IgluClient extends MyAppRuntimeService
* }}}
*
* Next, define the alerts this app is allowed to send to the webhook for setup errors
*
* {{{
* trait MyAppAlert
* case object BadPassword extends MyAppAlert
* case class MissingPermissions(moreDetails: String) extends MyAppAlert
* }}}
*
* Implement a `cats.Show` for the runtime services and for the alerts
*
* {{{
* val runtimeShow = Show[MyAppRuntimeService] {
* case BadRowsOutputStream => "Bad rows output stream"
* case IgluClient => "Iglu client"
* }
*
* val alertShow = Show[MyAppAlert] {
* case BadPassword => "Bad password"
* case MissingPermissions(moreDetails) => "Missing permissions " + moreDetails
* }
* }}}
*
* ==Environment initialization==
*
* Initialize the AppHealth as part of App Environment initialization:
*
* {{{
* appHealth <- AppHealth.init[F, MyAppAlert, MyAppRuntimeService](runtimeReporters)
* }}}
*
* Initialize a health probe, so the app reports itself as unhealthy whenever a required service
* becomes unhealthy
*
* {{{
* _ <- HealthProbe.resource(port, appHealth)
* }}}
*
* Initialize the webhook, so the app reports new setup errors or whenever the setup configuration
* becomes healthy
*
* {{{
* _ <- Webhook.resource(config, appInfo, httpClient, appHealth)
* }}}
*
* And finally, any runtime service that provides its own health reporter must be included in the
* `runtimeReporters` list passed to `AppHealth.init`; each reporter is an `F[Option[String]]`. Not
* all services fall into this category, but the source stream does fall into this category.
*
* {{{
* val runtimeReporters = List(sourceAndAckIsHealthy)
* }}}
*
* ==Application processing==
*
* Once the application enters its processing phase, you can set the health status for any runtime
* service or setup configuration
*
* {{{
* // After catching an exception in the bad events stream
* _ <- Logger[F].error(e)("Problem with bad rows output stream")
* _ <- appHealth.beUnhealthyForRuntimeService(BadRowsOutputStream)
*
* // After bad rows stream becomes healthy again
* _ <- Logger[F].debug("Bad rows output stream is ok")
* _ <- appHealth.beHealthyForRuntimeService(BadRowsOutputStream)
*
* // After catching an exception with the external setup configuration
* // Note this will send an alert webhook
* _ <- Logger[F].error(e)("Problem with the provided password")
* _ <- appHealth.beUnhealthyForSetup(BadPassword)
*
* // After successful connection to the externally configured services
* // Note this will send the first heartbeat webhook
* _ <- Logger[F].info("Everything ok with the provided setup configuration")
* _ <- appHealth.beHealthyForSetup
* }}}
*
* The application processing code does not need to explicitly send any monitoring alert or adjust
* the health probe return code.
*/
class AppHealth[F[_]: Monad, SetupAlert, RuntimeService] private (
pondzix marked this conversation as resolved.
Show resolved Hide resolved
private[runtime] val setupHealth: SignallingRef[F, AppHealth.SetupStatus[SetupAlert]],
unhealthyRuntimeServices: Ref[F, Set[RuntimeService]],
runtimeServiceReporters: List[F[Option[String]]]
) extends AppHealth.Interface[F, SetupAlert, RuntimeService] {
import AppHealth._

/**
 * Record that the externally provided setup configuration is working, by setting the setup
 * health signal to `SetupStatus.Healthy`.
 */
def beHealthyForSetup: F[Unit] = {
  val healthy: SetupStatus[SetupAlert] = SetupStatus.Healthy
  setupHealth.set(healthy)
}

/**
 * Record a problem with the externally provided setup configuration, by setting the setup
 * health signal to `SetupStatus.Unhealthy` carrying the given alert.
 *
 * @param alert
 *   Description of the setup problem that was detected
 */
def beUnhealthyForSetup(alert: SetupAlert): F[Unit] = {
  val unhealthy: SetupStatus[SetupAlert] = SetupStatus.Unhealthy(alert)
  setupHealth.set(unhealthy)
}

/**
 * Remove a runtime service from the set of services currently recorded as unhealthy.
 *
 * @param service
 *   The runtime service that is now considered healthy
 */
def beHealthyForRuntimeService(service: RuntimeService): F[Unit] =
  unhealthyRuntimeServices.update { currentlyUnhealthy =>
    currentlyUnhealthy - service
  }

/**
 * Add a runtime service to the set of services currently recorded as unhealthy.
 *
 * @param service
 *   The runtime service that is now considered unhealthy
 */
def beUnhealthyForRuntimeService(service: RuntimeService): F[Unit] =
  unhealthyRuntimeServices.update { currentlyUnhealthy =>
    currentlyUnhealthy + service
  }

/**
 * Collect a description of every runtime problem currently known.
 *
 * The result lists the shown name of each service recorded as unhealthy, followed by every
 * message produced by the registered runtime service reporters.
 *
 * @param show
 *   Used to render each unhealthy `RuntimeService` as a String
 * @return
 *   The descriptions; empty when nothing is recorded as unhealthy and no reporter yields a message
 */
private[runtime] def unhealthyRuntimeServiceMessages(implicit show: Show[RuntimeService]): F[List[String]] =
  unhealthyRuntimeServices.get.flatMap { flaggedServices =>
    // The recorded set is read first, then every reporter is evaluated in list order
    runtimeServiceReporters.sequence.map { reported =>
      val flaggedMessages = flaggedServices.toList.map(service => show.show(service))
      flaggedMessages ::: reported.flatten
    }
  }
}

object AppHealth {

/**
* Subset of AppHealth interface required during app's event-processing phase
*
* This interface should be part of the App's Environment. Processing Specs can provide a custom
* implementation.
*/
trait Interface[F[_], -SetupAlert, -RuntimeService] {
pondzix marked this conversation as resolved.
Show resolved Hide resolved
/** Report that the externally provided setup configuration is healthy */
def beHealthyForSetup: F[Unit]

/**
 * Report a problem with the externally provided setup configuration
 *
 * @param alert
 *   The alert describing the setup problem
 */
def beUnhealthyForSetup(alert: SetupAlert): F[Unit]

/**
 * Report that a runtime service is healthy
 *
 * @param service
 *   The runtime service whose status is being reported
 */
def beHealthyForRuntimeService(service: RuntimeService): F[Unit]

/**
 * Report that a runtime service is unhealthy
 *
 * @param service
 *   The runtime service whose status is being reported
 */
def beUnhealthyForRuntimeService(service: RuntimeService): F[Unit]
}

private[runtime] sealed trait SetupStatus[+Alert]
private[runtime] object SetupStatus {
case object AwaitingHealth extends SetupStatus[Nothing]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It only affects what is printed in logs, right? In health probe it still means 503 unhealthy?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  • When we first transition to Healthy then we send a heartbeat event to the webhook
  • When we first transition to Unhealthy then we send an alert event to the webhook

Therefore we need a type which is neither Healthy nor Unhealthy so we detect the transition. Hence AwaitingHealth.

case object Healthy extends SetupStatus[Nothing]
case class Unhealthy[Alert](alert: Alert) extends SetupStatus[Alert]

/**
 * Equality for setup statuses.
 *
 * Two statuses are equal when they are the same kind of status. Two `Unhealthy` statuses are
 * additionally required to have alerts with the same `Show` rendering.
 */
implicit def eq[Alert: Show]: Eq[SetupStatus[Alert]] = Eq.instance { (left, right) =>
  left match {
    case Healthy        => right == Healthy
    case AwaitingHealth => right == AwaitingHealth
    case Unhealthy(leftAlert) =>
      right match {
        // Alerts are compared by their shown text, not by structural equality
        case Unhealthy(rightAlert) => leftAlert.show === rightAlert.show
        case _                     => false
      }
  }
}
}

/**
 * Create a new AppHealth
 *
 * The setup status starts as `SetupStatus.AwaitingHealth` and the set of unhealthy runtime
 * services starts empty.
 *
 * @tparam SetupAlert
 *   Sealed trait of the alerts this app is allowed to send to the webhook for setup errors
 * @tparam RuntimeService
 *   Sealed trait of the services that this app requires to be healthy
 *
 * @param runtimeReporters
 *   Reporters for any additional service, not covered by `RuntimeService`. A reporter yields a
 *   String when its service is unhealthy; the String must be a short description of why the
 *   service is unhealthy.
 */
def init[F[_]: Async, SetupAlert, RuntimeService](
  runtimeReporters: List[F[Option[String]]]
): F[AppHealth[F, SetupAlert, RuntimeService]] =
  SignallingRef[F, SetupStatus[SetupAlert]](SetupStatus.AwaitingHealth).flatMap { setupSignal =>
    Ref[F].of(Set.empty[RuntimeService]).map { unhealthyServices =>
      new AppHealth(setupSignal, unhealthyServices, runtimeReporters)
    }
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,10 @@
*/
package com.snowplowanalytics.snowplow.runtime

import cats.effect.{Async, Resource, Sync}
import cats.data.Kleisli
import cats.Show
import cats.implicits._
import cats.data.Kleisli
import cats.effect.{Async, Resource, Sync}
import com.comcast.ip4s.{Ipv4Address, Port}
import io.circe.Decoder
import org.http4s.ember.server.EmberServerBuilder
Expand All @@ -22,18 +23,17 @@ object HealthProbe {

private implicit def logger[F[_]: Sync]: Logger[F] = Slf4jLogger.getLogger[F]

sealed trait Status
case object Healthy extends Status
case class Unhealthy(reason: String) extends Status

def resource[F[_]: Async](port: Port, isHealthy: F[Status]): Resource[F, Unit] = {
def resource[F[_]: Async, RuntimeService: Show](
port: Port,
appHealth: AppHealth[F, ?, RuntimeService]
): Resource[F, Unit] = {
implicit val network: Network[F] = Network.forAsync[F]
EmberServerBuilder
.default[F]
.withHost(Ipv4Address.fromBytes(0, 0, 0, 0))
.withPort(port)
.withMaxConnections(1)
.withHttpApp(httpApp(isHealthy))
.withHttpApp(httpApp(appHealth))
.build
.evalTap { _ =>
Logger[F].info(s"Health service listening on port $port")
Expand All @@ -47,17 +47,49 @@ object HealthProbe {
}
}

private def httpApp[F[_]: Sync](isHealthy: F[Status]): HttpApp[F] =
private[runtime] def httpApp[F[_]: Sync, RuntimeService: Show](
appHealth: AppHealth[F, ?, RuntimeService]
): HttpApp[F] =
Kleisli { _ =>
isHealthy.flatMap {
case Healthy =>
// Yields a description of everything preventing a 200 response, or None when the app is healthy.
val problemsF = appHealth.unhealthyRuntimeServiceMessages.flatMap { runtimeMessages =>
  appHealth.setupHealth.get.map { setupStatus =>
    val setupLabel = "External setup configuration"

    // The setup configuration counts as unhealthy, as awaiting, or as neither
    val (unhealthyServices, awaitingServices) = setupStatus match {
      case AppHealth.SetupStatus.Unhealthy(_)   => (runtimeMessages :+ setupLabel, List.empty[String])
      case AppHealth.SetupStatus.AwaitingHealth => (runtimeMessages, List(setupLabel))
      case _                                    => (runtimeMessages, List.empty[String])
    }

    // Renders a non-empty group as "<prefix>a, b]"; an empty group yields no message
    def describe(services: List[String], prefix: String): Option[String] =
      if (services.isEmpty) None
      else Some(services.mkString(prefix, ", ", "]"))

    val problems = List(
      describe(unhealthyServices, "Services are unhealthy ["),
      describe(awaitingServices, "Services are awaiting a healthy status [")
    ).flatten

    if (problems.isEmpty) None
    else Some(problems.mkString(" AND "))
  }
}

problemsF.flatMap {
case Some(errorMsg) =>
Logger[F].warn(s"Health probe returning 503: $errorMsg").as {
Response(status = Status.ServiceUnavailable)
}
case None =>
Logger[F].debug("Health probe returning 200").as {
Response(status = Status.Ok)
}
case Unhealthy(reason) =>
Logger[F].warn(s"Health probe returning 503: $reason").as {
Response(status = Status.ServiceUnavailable)
}
}
}

Expand Down
Loading
Loading