diff --git a/modules/runtime-common/src/main/resources/reference.conf b/modules/runtime-common/src/main/resources/reference.conf index 675be4f9..e218bcc0 100644 --- a/modules/runtime-common/src/main/resources/reference.conf +++ b/modules/runtime-common/src/main/resources/reference.conf @@ -5,6 +5,11 @@ snowplow.defaults: { period: "60 seconds" } + webhook: { + tags: {} + heartbeat: "60 minutes" + } + telemetry: { disable: false interval: "15 minutes" diff --git a/modules/runtime-common/src/main/scala/com/snowplowanalytics/snowplow/runtime/AppHealth.scala b/modules/runtime-common/src/main/scala/com/snowplowanalytics/snowplow/runtime/AppHealth.scala new file mode 100644 index 00000000..90f03ed4 --- /dev/null +++ b/modules/runtime-common/src/main/scala/com/snowplowanalytics/snowplow/runtime/AppHealth.scala @@ -0,0 +1,196 @@ +/* + * Copyright (c) 2014-present Snowplow Analytics Ltd. All rights reserved. + * + * This software is made available by Snowplow Analytics, Ltd., + * under the terms of the Snowplow Limited Use License Agreement, Version 1.0 + * located at https://docs.snowplow.io/limited-use-license-1.0 + * BY INSTALLING, DOWNLOADING, ACCESSING, USING OR DISTRIBUTING ANY PORTION + * OF THE SOFTWARE, YOU AGREE TO THE TERMS OF SUCH LICENSE AGREEMENT. 
+ */ + +package com.snowplowanalytics.snowplow.runtime + +import cats.{Eq, Show} +import cats.Monad +import cats.implicits._ +import cats.effect.{Async, Ref} +import fs2.concurrent.SignallingRef + +/** + * Class to collect, store and provide the health statuses of services required by the App + * + * @tparam SetupAlert + * Sealed trait of the alerts this app is allowed to send to the webhook for setup errors + * @tparam RuntimeService + * Sealed trait of the services that this app requires to be healthy + * + * ==App Boilerplate== + * + * To use this class in a Snowplow app, first enumerate the runtime services required by this app: + * + * {{{ + * trait MyAppRuntimeService + * case object BadRowsOutputStream extends MyAppRuntimeService + * case object IgluClient extends MyAppRuntimeService + * }}} + * + * Next, define the alerts this app is allowed to send to the webhook for setup errors + * + * {{{ + * trait MyAppAlert + * case object BadPassword extends MyAppAlert + * case class MissingPermissions(moreDetails: String) extends MyAppAlert + * }}} + * + * Implement a `cats.Show` for the runtime services and for the alerts + * + * {{{ + * val runtimeShow = Show[MyAppRuntimeService] { + * case BadRowsOutputStream => "Bad rows output stream" + * case IgluClient => "Iglu client" + * } + * + * val alertShow = Show[MyAppAlert] { + * case BadPassword => "Bad password" + * case MissingPermissions(moreDetails) => "Missing permissions " + moreDetails + * } + * }}} + * + * ==Environment initialization== + * + * Initialize the AppHealth as part of App Environment initialization: + * + * {{{ + * appHealth <- AppHealth.init[F, MyAppAlert, MyAppRuntimeService] + * }}} + * + * Initialize a health probe, so the app reports itself as unhealthy whenever a required service + * becomes unhealthy + * + * {{{ + * _ <- HealthProbe.resource(port, appHealth) + * }}} + * + * Initialize the webhook, so the app reports new setup errors or whenever the setup configuration + * becomes healthy +
* + * {{{ + * _ <- Webhook.resource(config, appInfo, httpClient, appHealth) + * }}} + * + * And finally, supply a health reporter for any runtime service that provides its own. Not all + * services fall into this category, but the source stream does. Pass them to `AppHealth.init`: + * + * {{{ + * AppHealth.init[F, MyAppAlert, MyAppRuntimeService](List(sourceAndAckIsHealthy)) + * }}} + * + * ==Application processing== + * + * Once the application enters its processing phase, you can set the health status for any runtime + * service or setup configuration + * + * {{{ + * // After catching an exception in the bad events stream + * _ <- Logger[F].error(e)("Problem with bad rows output stream") + * _ <- appHealth.beUnhealthyForRuntimeService(BadRowsOutputStream) + * + * // After bad rows stream becomes healthy again + * _ <- Logger[F].debug("Bad rows output stream is ok") + * _ <- appHealth.beHealthyForRuntimeService(BadRowsOutputStream) + * + * // After catching an exception with the external setup configuration + * // Note this will send an alert webhook + * _ <- Logger[F].error(e)("Problem with the provided password") + * _ <- appHealth.beUnhealthyForSetup(BadPassword) + * + * // After successful connection to the externally configured services + * // Note this will send the first heartbeat webhook + * _ <- Logger[F].info("Everything ok with the provided setup configuration") + * _ <- appHealth.beHealthyForSetup + * }}} + * + * The application processing code does not need to explicitly send any monitoring alert or adjust + * the health probe return code.
+ */ +class AppHealth[F[_]: Monad, SetupAlert, RuntimeService] private ( + private[runtime] val setupHealth: SignallingRef[F, AppHealth.SetupStatus[SetupAlert]], + unhealthyRuntimeServices: Ref[F, Set[RuntimeService]], + runtimeServiceReporters: List[F[Option[String]]] +) extends AppHealth.Interface[F, SetupAlert, RuntimeService] { + import AppHealth._ + + def beHealthyForSetup: F[Unit] = + setupHealth.set(SetupStatus.Healthy) + + def beUnhealthyForSetup(alert: SetupAlert): F[Unit] = + setupHealth.set(SetupStatus.Unhealthy(alert)) + + def beHealthyForRuntimeService(service: RuntimeService): F[Unit] = + unhealthyRuntimeServices.update(_ - service) + + def beUnhealthyForRuntimeService(service: RuntimeService): F[Unit] = + unhealthyRuntimeServices.update(_ + service) + + private[runtime] def unhealthyRuntimeServiceMessages(implicit show: Show[RuntimeService]): F[List[String]] = + for { + services <- unhealthyRuntimeServices.get + extras <- runtimeServiceReporters.sequence + } yield services.toList.map(_.show) ::: extras.flatten +} + +object AppHealth { + + /** + * Subset of AppHealth interface required during app's event-processing phase + * + * This interface should be part of the App's Environment. Processing Specs can provide a custom + * implementation. 
+ */ + trait Interface[F[_], -SetupAlert, -RuntimeService] { + def beHealthyForSetup: F[Unit] + + def beUnhealthyForSetup(alert: SetupAlert): F[Unit] + + def beHealthyForRuntimeService(service: RuntimeService): F[Unit] + + def beUnhealthyForRuntimeService(service: RuntimeService): F[Unit] + } + + private[runtime] sealed trait SetupStatus[+Alert] + private[runtime] object SetupStatus { + case object AwaitingHealth extends SetupStatus[Nothing] + case object Healthy extends SetupStatus[Nothing] + case class Unhealthy[Alert](alert: Alert) extends SetupStatus[Alert] + + implicit def eq[Alert: Show]: Eq[SetupStatus[Alert]] = Eq.instance { + case (Healthy, Healthy) => true + case (Healthy, _) => false + case (AwaitingHealth, AwaitingHealth) => true + case (AwaitingHealth, _) => false + case (Unhealthy(a1), Unhealthy(a2)) => a1.show === a2.show + case (Unhealthy(_), _) => false + } + } + + /** + * Initialize the AppHealth + * + * @tparam SetupAlert + * Sealed trait of the alerts this app is allowed to send to the webhook for setup errors + * @tparam RuntimeService + * Sealed trait of the services that this app requires to be healthy + * + * @param runtimeReporters + * Reporters for any additional service, not covered by `RuntimeService`. Reporters provide a + * String if a service is unhealthy. The String must be a short description of why the service + * is unhealthy. 
+ */ + def init[F[_]: Async, SetupAlert, RuntimeService]( + runtimeReporters: List[F[Option[String]]] + ): F[AppHealth[F, SetupAlert, RuntimeService]] = + for { + setupHealth <- SignallingRef[F, SetupStatus[SetupAlert]](SetupStatus.AwaitingHealth) + unhealthyRuntimeServices <- Ref[F].of(Set.empty[RuntimeService]) + } yield new AppHealth(setupHealth, unhealthyRuntimeServices, runtimeReporters) +} diff --git a/modules/runtime-common/src/main/scala/com/snowplowanalytics/snowplow/runtime/HealthProbe.scala b/modules/runtime-common/src/main/scala/com/snowplowanalytics/snowplow/runtime/HealthProbe.scala index 418cca9b..f3e019e5 100644 --- a/modules/runtime-common/src/main/scala/com/snowplowanalytics/snowplow/runtime/HealthProbe.scala +++ b/modules/runtime-common/src/main/scala/com/snowplowanalytics/snowplow/runtime/HealthProbe.scala @@ -7,9 +7,10 @@ */ package com.snowplowanalytics.snowplow.runtime -import cats.effect.{Async, Resource, Sync} -import cats.data.Kleisli +import cats.Show import cats.implicits._ +import cats.data.Kleisli +import cats.effect.{Async, Resource, Sync} import com.comcast.ip4s.{Ipv4Address, Port} import io.circe.Decoder import org.http4s.ember.server.EmberServerBuilder @@ -22,18 +23,17 @@ object HealthProbe { private implicit def logger[F[_]: Sync]: Logger[F] = Slf4jLogger.getLogger[F] - sealed trait Status - case object Healthy extends Status - case class Unhealthy(reason: String) extends Status - - def resource[F[_]: Async](port: Port, isHealthy: F[Status]): Resource[F, Unit] = { + def resource[F[_]: Async, RuntimeService: Show]( + port: Port, + appHealth: AppHealth[F, ?, RuntimeService] + ): Resource[F, Unit] = { implicit val network: Network[F] = Network.forAsync[F] EmberServerBuilder .default[F] .withHost(Ipv4Address.fromBytes(0, 0, 0, 0)) .withPort(port) .withMaxConnections(1) - .withHttpApp(httpApp(isHealthy)) + .withHttpApp(httpApp(appHealth)) .build .evalTap { _ => Logger[F].info(s"Health service listening on port $port") @@ -47,17 +47,49 
@@ object HealthProbe { } } - private def httpApp[F[_]: Sync](isHealthy: F[Status]): HttpApp[F] = + private[runtime] def httpApp[F[_]: Sync, RuntimeService: Show]( + appHealth: AppHealth[F, ?, RuntimeService] + ): HttpApp[F] = Kleisli { _ => - isHealthy.flatMap { - case Healthy => + val problemsF = for { + runtimeUnhealthies <- appHealth.unhealthyRuntimeServiceMessages + setupHealth <- appHealth.setupHealth.get + } yield { + val allUnhealthy = runtimeUnhealthies ++ (setupHealth match { + case AppHealth.SetupStatus.Unhealthy(_) => Some("External setup configuration") + case _ => None + }) + + val allAwaiting = setupHealth match { + case AppHealth.SetupStatus.AwaitingHealth => Some("External setup configuration") + case _ => None + } + + val unhealthyMsg = if (allUnhealthy.nonEmpty) { + val joined = allUnhealthy.mkString("Services are unhealthy [", ", ", "]") + Some(joined) + } else None + + val awaitingMsg = if (allAwaiting.nonEmpty) { + val joined = allAwaiting.mkString("Services are awaiting a healthy status [", ", ", "]") + Some(joined) + } else None + + if (unhealthyMsg.isEmpty && awaitingMsg.isEmpty) + None + else + Some((unhealthyMsg ++ awaitingMsg).mkString(" AND ")) + } + + problemsF.flatMap { + case Some(errorMsg) => + Logger[F].warn(s"Health probe returning 503: $errorMsg").as { + Response(status = Status.ServiceUnavailable) + } + case None => Logger[F].debug("Health probe returning 200").as { Response(status = Status.Ok) } - case Unhealthy(reason) => - Logger[F].warn(s"Health probe returning 503: $reason").as { - Response(status = Status.ServiceUnavailable) - } } } diff --git a/modules/runtime-common/src/main/scala/com/snowplowanalytics/snowplow/runtime/Retrying.scala b/modules/runtime-common/src/main/scala/com/snowplowanalytics/snowplow/runtime/Retrying.scala new file mode 100644 index 00000000..3a872939 --- /dev/null +++ b/modules/runtime-common/src/main/scala/com/snowplowanalytics/snowplow/runtime/Retrying.scala @@ -0,0 +1,119 @@ +/* + * Copyright (c) 
2014-present Snowplow Analytics Ltd. All rights reserved. + * + * This software is made available by Snowplow Analytics, Ltd., + * under the terms of the Snowplow Limited Use License Agreement, Version 1.0 + * located at https://docs.snowplow.io/limited-use-license-1.0 + * BY INSTALLING, DOWNLOADING, ACCESSING, USING OR DISTRIBUTING ANY PORTION + * OF THE SOFTWARE, YOU AGREE TO THE TERMS OF SUCH LICENSE AGREEMENT. + */ + +package com.snowplowanalytics.snowplow.runtime + +import cats.{Applicative, Show} +import cats.effect.Sync +import cats.implicits._ +import retry._ +import io.circe.Decoder +import io.circe.generic.semiauto._ +import io.circe.config.syntax._ +import retry.implicits.retrySyntaxError +import org.typelevel.log4cats.Logger +import org.typelevel.log4cats.slf4j.Slf4jLogger + +import scala.concurrent.duration.FiniteDuration + +object Retrying { + + private implicit def logger[F[_]: Sync]: Logger[F] = Slf4jLogger.getLogger[F] + + // Publicly accessible to help Snowplow apps that manage their own retrying + implicit def showRetryDetails: Show[RetryDetails] = Show { + case RetryDetails.GivingUp(totalRetries, totalDelay) => + s"Giving up on retrying, total retries: $totalRetries, total delay: ${totalDelay.toSeconds} seconds" + case RetryDetails.WillDelayAndRetry(nextDelay, retriesSoFar, cumulativeDelay) => + s"Will retry in ${nextDelay.toMillis} milliseconds, retries so far: $retriesSoFar, total delay so far: ${cumulativeDelay.toMillis} milliseconds" + } + + object Config { + + case class ForSetup(delay: FiniteDuration) + object ForSetup { + implicit def decoder: Decoder[ForSetup] = + deriveDecoder + } + + case class ForTransient(delay: FiniteDuration, attempts: Int) + object ForTransient { + implicit def decoder: Decoder[ForTransient] = + deriveDecoder + } + } + + def withRetries[F[_]: Sync: Sleep, Alert, RuntimeService, A]( + appHealth: AppHealth.Interface[F, Alert, RuntimeService], + configForTransient: Config.ForTransient, + configForSetup: 
Config.ForSetup, + service: RuntimeService, + toAlert: SetupExceptionMessages => Alert, + setupErrorCheck: PartialFunction[Throwable, String] + )( + action: F[A] + ): F[A] = + action + .retryingOnSomeErrors( + isWorthRetrying = checkingNestedExceptions(setupErrorCheck, _).nonEmpty.pure[F], + policy = policyForSetupErrors[F](configForSetup), + onError = logErrorAndSendAlert(appHealth, setupErrorCheck, toAlert, _, _) + ) + .retryingOnAllErrors( + policy = policyForTransientErrors[F](configForTransient), + onError = logErrorAndReportUnhealthy(appHealth, service, _, _) + ) + .productL(appHealth.beHealthyForRuntimeService(service)) + + private def policyForSetupErrors[F[_]: Applicative](config: Config.ForSetup): RetryPolicy[F] = + RetryPolicies.exponentialBackoff[F](config.delay) + + private def policyForTransientErrors[F[_]: Applicative](config: Config.ForTransient): RetryPolicy[F] = + RetryPolicies.fullJitter[F](config.delay).join(RetryPolicies.limitRetries(config.attempts - 1)) + + private def logErrorAndSendAlert[F[_]: Sync, Alert, RuntimeService]( + appHealth: AppHealth.Interface[F, Alert, RuntimeService], + setupErrorCheck: PartialFunction[Throwable, String], + toAlert: SetupExceptionMessages => Alert, + error: Throwable, + details: RetryDetails + ): F[Unit] = + logError(error, details) *> appHealth.beUnhealthyForSetup( + toAlert(SetupExceptionMessages(checkingNestedExceptions(setupErrorCheck, error))) + ) + + private def logError[F[_]: Sync](error: Throwable, details: RetryDetails): F[Unit] = + Logger[F].error(error)(show"Executing command failed. $details") + + private def logErrorAndReportUnhealthy[F[_]: Sync, RuntimeService]( + appHealth: AppHealth.Interface[F, ?, RuntimeService], + service: RuntimeService, + error: Throwable, + details: RetryDetails + ): F[Unit] = + logError(error, details) *> appHealth.beUnhealthyForRuntimeService(service) + + // Returns a list of reasons of why this was a destination setup error. 
+ // Or empty list if this was not caused by a destination setup error + private def checkingNestedExceptions( + setupErrorCheck: PartialFunction[Throwable, String], + t: Throwable + ): List[String] = + unnestThrowableCauses(t).map(setupErrorCheck.lift).flatten + + private def unnestThrowableCauses(t: Throwable): List[Throwable] = { + def go(t: Throwable, acc: List[Throwable]): List[Throwable] = + Option(t.getCause) match { + case Some(cause) => go(cause, cause :: acc) + case None => acc.reverse + } + go(t, List(t)) + } +} diff --git a/modules/runtime-common/src/main/scala/com/snowplowanalytics/snowplow/runtime/SetupExceptionMessages.scala b/modules/runtime-common/src/main/scala/com/snowplowanalytics/snowplow/runtime/SetupExceptionMessages.scala new file mode 100644 index 00000000..74d69daa --- /dev/null +++ b/modules/runtime-common/src/main/scala/com/snowplowanalytics/snowplow/runtime/SetupExceptionMessages.scala @@ -0,0 +1,38 @@ +/* + * Copyright (c) 2014-present Snowplow Analytics Ltd. All rights reserved. + * + * This software is made available by Snowplow Analytics, Ltd., + * under the terms of the Snowplow Limited Use License Agreement, Version 1.0 + * located at https://docs.snowplow.io/limited-use-license-1.0 + * BY INSTALLING, DOWNLOADING, ACCESSING, USING OR DISTRIBUTING ANY PORTION + * OF THE SOFTWARE, YOU AGREE TO THE TERMS OF SUCH LICENSE AGREEMENT. + */ +package com.snowplowanalytics.snowplow.runtime + +import cats.Show + +/** + * Represents all messages from an Exception associated with a setup error + * + * Messages are expected to be included in the webhook payload. This library assumes each message + * has been cleaned and sanitized by the Snowplow application. 
+ */ +case class SetupExceptionMessages(value: List[String]) + +object SetupExceptionMessages { + + implicit def showSetupExceptionMessages: Show[SetupExceptionMessages] = { + def removeDuplicateMessages(in: List[String]): List[String] = + in match { + case h :: t :: rest => + if (h.contains(t)) removeDuplicateMessages(h :: rest) + else if (t.contains(h)) removeDuplicateMessages(t :: rest) + else h :: removeDuplicateMessages(t :: rest) + case fewer => fewer + } + + Show.show { exceptionMessages => + removeDuplicateMessages(exceptionMessages.value).mkString(": ") + } + } +} diff --git a/modules/runtime-common/src/main/scala/com/snowplowanalytics/snowplow/runtime/Webhook.scala b/modules/runtime-common/src/main/scala/com/snowplowanalytics/snowplow/runtime/Webhook.scala index 2b6aeb23..bd8d9d0d 100644 --- a/modules/runtime-common/src/main/scala/com/snowplowanalytics/snowplow/runtime/Webhook.scala +++ b/modules/runtime-common/src/main/scala/com/snowplowanalytics/snowplow/runtime/Webhook.scala @@ -10,12 +10,15 @@ package com.snowplowanalytics.snowplow.runtime -import cats.effect.{Async, Sync} +import cats.effect.{Async, Resource, Sync} +import cats.effect.implicits._ import cats.implicits._ import cats.Show +import fs2.{Pipe, Pull, Stream} import io.circe.{Decoder, Json} import io.circe.generic.semiauto._ import io.circe.syntax._ +import io.circe.config.syntax._ import org.http4s.circe.jsonEncoder import org.http4s.client.Client import org.http4s.{Method, Request} @@ -23,16 +26,18 @@ import org.http4s.{ParseFailure, Uri} import org.typelevel.log4cats.Logger import org.typelevel.log4cats.slf4j.Slf4jLogger +import scala.concurrent.duration.FiniteDuration + import com.snowplowanalytics.iglu.core.circe.implicits._ import com.snowplowanalytics.iglu.core.{SchemaKey, SchemaVer, SelfDescribingData} -trait Webhook[F[_], Alert] { - def alert(message: Alert): F[Unit] -} - object Webhook { - final case class Config(endpoint: Uri, tags: Map[String, String]) + final case class Config( 
+ endpoint: Option[Uri], + tags: Map[String, String], + heartbeat: FiniteDuration + ) object Config { implicit def webhookConfigDecoder: Decoder[Config] = { @@ -42,32 +47,61 @@ object Webhook { } } - private implicit def logger[F[_]: Sync]: Logger[F] = Slf4jLogger.getLogger[F] + def resource[F[_]: Async, SetupAlert: Show]( + config: Config, + appInfo: AppInfo, + httpClient: Client[F], + appHealth: AppHealth[F, SetupAlert, ?] + ): Resource[F, Unit] = + stream(config, appInfo, httpClient, appHealth).compile.drain.background.void - def create[F[_]: Async, Alert: Show]( - config: Option[Config], + def stream[F[_]: Async, SetupAlert: Show]( + config: Config, appInfo: AppInfo, - httpClient: Client[F] - ): Webhook[F, Alert] = new Webhook[F, Alert] { - - override def alert(message: Alert): F[Unit] = - config match { - case Some(webhookConfig) => - val request = buildHttpRequest[F, Alert](webhookConfig, appInfo, message) - Logger[F].info(show"Sending alert to ${webhookConfig.endpoint} with details of the setup error...") *> - executeHttpRequest[F](webhookConfig, httpClient, request) - case None => - Logger[F].debug(s"Webhook monitoring is not configured, skipping alert: $message") + httpClient: Client[F], + appHealth: AppHealth[F, SetupAlert, ?] 
+ ): Stream[F, Nothing] = + appHealth.setupHealth.discrete.changes + .through(repeatPeriodically(config.heartbeat)) + .flatMap { + case AppHealth.SetupStatus.AwaitingHealth => + Stream.empty + case AppHealth.SetupStatus.Healthy => + buildHeartbeatHttpRequest[F](config, appInfo) + case AppHealth.SetupStatus.Unhealthy(alert) => + buildAlertHttpRequest[F, SetupAlert](config, appInfo, alert) } - } + .evalMap(executeHttpRequest(config, httpClient, _)) + .drain + + private implicit def logger[F[_]: Sync]: Logger[F] = Slf4jLogger.getLogger[F] - private def buildHttpRequest[F[_], Alert: Show]( + private def buildAlertHttpRequest[F[_]: Sync, SetupAlert: Show]( webhookConfig: Config, appInfo: AppInfo, - alert: Alert - ): Request[F] = - Request[F](Method.POST, webhookConfig.endpoint) - .withEntity(toSelfDescribingJson(alert, appInfo, webhookConfig.tags)) + reason: SetupAlert + ): Stream[F, Request[F]] = + webhookConfig.endpoint match { + case Some(endpoint) => + val request = Request[F](Method.POST, endpoint) + .withEntity(toAlertSdj(reason, appInfo, webhookConfig.tags)) + Stream.emit(request) + case None => + Stream.eval(Logger[F].info(show"Skipping setup alert because webhook is not configured: $reason")).drain + } + + private def buildHeartbeatHttpRequest[F[_]: Sync]( + webhookConfig: Config, + appInfo: AppInfo + ): Stream[F, Request[F]] = + webhookConfig.endpoint match { + case Some(endpoint) => + val request = Request[F](Method.POST, endpoint) + .withEntity(toHeartbeatSdj(appInfo, webhookConfig.tags)) + Stream.emit(request) + case None => + Stream.eval(Logger[F].info(s"Skipping heartbeat because webhook is not configured")).drain + } private def executeHttpRequest[F[_]: Async]( webhookConfig: Config, @@ -91,8 +125,8 @@ object Webhook { /** Restrict the length of an alert message to be compliant with alert iglu schema */ private val MaxAlertPayloadLength = 4096 - private def toSelfDescribingJson[Alert: Show]( - alert: Alert, + private def toAlertSdj[SetupAlert: Show]( + 
reason: SetupAlert, appInfo: AppInfo, tags: Map[String, String] ): Json = @@ -101,9 +135,47 @@ object Webhook { data = Json.obj( "appName" -> appInfo.name.asJson, "appVersion" -> appInfo.version.asJson, - "message" -> alert.show.take(MaxAlertPayloadLength).asJson, + "message" -> reason.show.take(MaxAlertPayloadLength).asJson, + "tags" -> tags.asJson + ) + ).normalize + + private def toHeartbeatSdj( + appInfo: AppInfo, + tags: Map[String, String] + ): Json = + SelfDescribingData( + schema = SchemaKey("com.snowplowanalytics.monitoring.loader", "heartbeat", "jsonschema", SchemaVer.Full(1, 0, 0)), + data = Json.obj( + "appName" -> appInfo.name.asJson, + "appVersion" -> appInfo.version.asJson, "tags" -> tags.asJson ) ).normalize + def repeatPeriodically[F[_]: Async, A](period: FiniteDuration): Pipe[F, A, A] = { + + def go(timedPull: Pull.Timed[F, A], toRepeat: Option[A]): Pull[F, A, Unit] = + timedPull.uncons.flatMap { + case None => + // Upstream finished + Pull.done + case Some((Left(_), next)) => + // Timer timed-out. + Pull.outputOption1[F, A](toRepeat) *> timedPull.timeout(period) *> go(next, toRepeat) + case Some((Right(chunk), next)) => + chunk.last match { + case Some(last) => + Pull.output[F, A](chunk) *> timedPull.timeout(period) *> go(next, Some(last)) + case None => + go(next, toRepeat) + } + } + + in => + in.pull.timed { timedPull => + go(timedPull, None) + }.stream + } + } diff --git a/modules/runtime-common/src/test/scala/com/snowplowanalytics/snowplow/runtime/AppHealthSpec.scala b/modules/runtime-common/src/test/scala/com/snowplowanalytics/snowplow/runtime/AppHealthSpec.scala new file mode 100644 index 00000000..a614d84b --- /dev/null +++ b/modules/runtime-common/src/test/scala/com/snowplowanalytics/snowplow/runtime/AppHealthSpec.scala @@ -0,0 +1,129 @@ +/* + * Copyright (c) 2023-present Snowplow Analytics Ltd. All rights reserved. 
+ * + * This program is licensed to you under the Snowplow Community License Version 1.0, + * and you may not use this file except in compliance with the Snowplow Community License Version 1.0. + * You may obtain a copy of the Snowplow Community License Version 1.0 at https://docs.snowplow.io/community-license-1.0 + */ +package com.snowplowanalytics.snowplow.runtime + +import cats.Show +import cats.effect.testing.specs2.CatsEffect +import cats.effect.{IO, Ref} +import org.specs2.Specification + +class AppHealthSpec extends Specification with CatsEffect { + import AppHealthSpec._ + + def is = s2""" + The AppHealth should: + For Runtime health: + Start healthy $runtime1 + Report one unhealthy service if one service is unhealthy $runtime2 + Report two unhealthy services if two services are unhealthy $runtime3 + Become unhealthy after one service recovers $runtime4 + Report one unhealthy service if two services were unhealthy and one recovers $runtime5 + Report healthy status for an external reporter $runtime6 + For Setup health: + Start with status of awaiting health $setup1 + Report unhealthy after told of a setup problem $setup2 + Report healthy after told of a healthy setup $setup3 + Recover from an unhealthy status when told $setup4 + Return to an unhealthy status when told $setup5 + """ + + def runtime1 = for { + appHealth <- AppHealth.init[IO, TestAlert, TestService](Nil) + statuses <- appHealth.unhealthyRuntimeServiceMessages + } yield statuses should beEmpty + + def runtime2 = for { + appHealth <- AppHealth.init[IO, TestAlert, TestService](Nil) + _ <- appHealth.beUnhealthyForRuntimeService(TestService1) + statuses <- appHealth.unhealthyRuntimeServiceMessages + } yield statuses should beEqualTo(List("test service 1")) + + def runtime3 = for { + appHealth <- AppHealth.init[IO, TestAlert, TestService](Nil) + _ <- appHealth.beUnhealthyForRuntimeService(TestService1) + _ <- appHealth.beUnhealthyForRuntimeService(TestService2) + statuses <- 
appHealth.unhealthyRuntimeServiceMessages + } yield statuses should containTheSameElementsAs(List("test service 1", "test service 2")) + + def runtime4 = for { + appHealth <- AppHealth.init[IO, TestAlert, TestService](Nil) + _ <- appHealth.beUnhealthyForRuntimeService(TestService1) + _ <- appHealth.beHealthyForRuntimeService(TestService1) + statuses <- appHealth.unhealthyRuntimeServiceMessages + } yield statuses should beEmpty + + def runtime5 = for { + appHealth <- AppHealth.init[IO, TestAlert, TestService](Nil) + _ <- appHealth.beUnhealthyForRuntimeService(TestService1) + _ <- appHealth.beUnhealthyForRuntimeService(TestService2) + _ <- appHealth.beHealthyForRuntimeService(TestService1) + statuses <- appHealth.unhealthyRuntimeServiceMessages + } yield statuses should beEqualTo(List("test service 2")) + + def runtime6 = for { + reporter <- Ref[IO].of(Option.empty[String]) + appHealth <- AppHealth.init[IO, TestAlert, TestService](List(reporter.get)) + result1 <- appHealth.unhealthyRuntimeServiceMessages + _ <- reporter.set(Some("test reporter unhealthy 1")) + result2 <- appHealth.unhealthyRuntimeServiceMessages + _ <- reporter.set(None) + result3 <- appHealth.unhealthyRuntimeServiceMessages + } yield List( + result1 should beEmpty, + result2 should beEqualTo(List("test reporter unhealthy 1")), + result3 should beEmpty + ).reduce(_ and _) + + def setup1 = for { + appHealth <- AppHealth.init[IO, TestAlert, TestService](Nil) + setupHealth <- appHealth.setupHealth.get + } yield setupHealth should beEqualTo(AppHealth.SetupStatus.AwaitingHealth) + + def setup2 = for { + appHealth <- AppHealth.init[IO, TestAlert, TestService](Nil) + _ <- appHealth.beUnhealthyForSetup(TestAlert1) + setupHealth <- appHealth.setupHealth.get + } yield setupHealth should beEqualTo(AppHealth.SetupStatus.Unhealthy(TestAlert1)) + + def setup3 = for { + appHealth <- AppHealth.init[IO, TestAlert, TestService](Nil) + _ <- appHealth.beHealthyForSetup + setupHealth <- appHealth.setupHealth.get + } 
yield setupHealth should beEqualTo(AppHealth.SetupStatus.Healthy) + + def setup4 = for { + appHealth <- AppHealth.init[IO, TestAlert, TestService](Nil) + _ <- appHealth.beUnhealthyForSetup(TestAlert1) + _ <- appHealth.beHealthyForSetup + setupHealth <- appHealth.setupHealth.get + } yield setupHealth should beEqualTo(AppHealth.SetupStatus.Healthy) + + def setup5 = for { + appHealth <- AppHealth.init[IO, TestAlert, TestService](Nil) + _ <- appHealth.beHealthyForSetup + _ <- appHealth.beUnhealthyForSetup(TestAlert1) + setupHealth <- appHealth.setupHealth.get + } yield setupHealth should beEqualTo(AppHealth.SetupStatus.Unhealthy(TestAlert1)) + +} + +object AppHealthSpec { + + sealed trait TestAlert + case object TestAlert1 extends TestAlert + case object TestAlert2 extends TestAlert + + sealed trait TestService + case object TestService1 extends TestService + case object TestService2 extends TestService + + implicit def showTestService: Show[TestService] = Show { + case TestService1 => "test service 1" + case TestService2 => "test service 2" + } +} diff --git a/modules/runtime-common/src/test/scala/com/snowplowanalytics/snowplow/runtime/HealthProbeSpec.scala b/modules/runtime-common/src/test/scala/com/snowplowanalytics/snowplow/runtime/HealthProbeSpec.scala new file mode 100644 index 00000000..108ed0ca --- /dev/null +++ b/modules/runtime-common/src/test/scala/com/snowplowanalytics/snowplow/runtime/HealthProbeSpec.scala @@ -0,0 +1,79 @@ +/* + * Copyright (c) 2023-present Snowplow Analytics Ltd. All rights reserved. + * + * This program is licensed to you under the Snowplow Community License Version 1.0, + * and you may not use this file except in compliance with the Snowplow Community License Version 1.0. 
+ * You may obtain a copy of the Snowplow Community License Version 1.0 at https://docs.snowplow.io/community-license-1.0 + */ +package com.snowplowanalytics.snowplow.runtime + +import cats.effect.testing.specs2.CatsEffect +import cats.effect.IO +import cats.Show +import org.http4s.{Request, Status} +import org.specs2.Specification + +class HealthProbeSpec extends Specification with CatsEffect { + import HealthProbeSpec._ + + def is = s2""" + The HealthProbe should: + Initially return 503 $probe1 + Return 200 after setup configuration is healthy $probe2 + Return 503 after setup configuration becomes unhealthy $probe3 + Return 503 if a runtime service is unhealthy $probe4 + Return 200 after a runtime service recovers $probe5 + """ + + def probe1 = for { + appHealth <- AppHealth.init[IO, TestAlert, TestService](Nil) + httpApp = HealthProbe.httpApp(appHealth) + response <- httpApp.run(Request[IO]()) + } yield response.status must beEqualTo(Status.ServiceUnavailable) + + def probe2 = for { + appHealth <- AppHealth.init[IO, TestAlert, TestService](Nil) + httpApp = HealthProbe.httpApp(appHealth) + _ <- appHealth.beHealthyForSetup + response <- httpApp.run(Request[IO]()) + } yield response.status must beEqualTo(Status.Ok) + + def probe3 = for { + appHealth <- AppHealth.init[IO, TestAlert, TestService](Nil) + httpApp = HealthProbe.httpApp(appHealth) + _ <- appHealth.beHealthyForSetup + _ <- appHealth.beUnhealthyForSetup(TestAlert1) + response <- httpApp.run(Request[IO]()) + } yield response.status must beEqualTo(Status.ServiceUnavailable) + + def probe4 = for { + appHealth <- AppHealth.init[IO, TestAlert, TestService](Nil) + httpApp = HealthProbe.httpApp(appHealth) + _ <- appHealth.beHealthyForSetup + _ <- appHealth.beUnhealthyForRuntimeService(TestService1) + response <- httpApp.run(Request[IO]()) + } yield response.status must beEqualTo(Status.ServiceUnavailable) + + def probe5 = for { + appHealth <- AppHealth.init[IO, TestAlert, TestService](Nil) + httpApp = 
HealthProbe.httpApp(appHealth) + _ <- appHealth.beHealthyForSetup + _ <- appHealth.beUnhealthyForRuntimeService(TestService1) + _ <- appHealth.beHealthyForRuntimeService(TestService1) + response <- httpApp.run(Request[IO]()) + } yield response.status must beEqualTo(Status.Ok) + +} + +object HealthProbeSpec { + + sealed trait TestAlert + case object TestAlert1 extends TestAlert + case object TestAlert2 extends TestAlert + + sealed trait TestService + case object TestService1 extends TestService + case object TestService2 extends TestService + + implicit def showTestAlert: Show[TestService] = Show[TestService](_.toString) +} diff --git a/modules/runtime-common/src/test/scala/com/snowplowanalytics/snowplow/runtime/WebhookConfigSpec.scala b/modules/runtime-common/src/test/scala/com/snowplowanalytics/snowplow/runtime/WebhookConfigSpec.scala new file mode 100644 index 00000000..48b5bd8f --- /dev/null +++ b/modules/runtime-common/src/test/scala/com/snowplowanalytics/snowplow/runtime/WebhookConfigSpec.scala @@ -0,0 +1,134 @@ +/* + * Copyright (c) 2023-present Snowplow Analytics Ltd. All rights reserved. + * + * This program is licensed to you under the Snowplow Community License Version 1.0, + * and you may not use this file except in compliance with the Snowplow Community License Version 1.0. 
+ * You may obtain a copy of the Snowplow Community License Version 1.0 at https://docs.snowplow.io/community-license-1.0 + */ +package com.snowplowanalytics.snowplow.runtime + +import cats.implicits._ +import com.typesafe.config.ConfigFactory +import io.circe.config.syntax.CirceConfigOps +import io.circe.DecodingFailure +import io.circe.Decoder +import io.circe.literal._ +import io.circe.generic.semiauto._ +import org.http4s.Uri +import org.specs2.Specification + +import scala.concurrent.duration.DurationLong + +class WebhookConfigSpec extends Specification { + import WebhookConfigSpec._ + + def is = s2""" + The webhook config decoder should: + Decode a valid JSON config when endpoint is set $e1 + Decode a valid JSON config when endpoint is missing $e2 + Not decode JSON if other required field is missing $e3 + The webhook defaults should: + Provide default values from reference.conf $e4 + Not provide default value for endpoint $e5 + + + """ + + def e1 = { + val json = json""" + { + "endpoint": "http://example.com/xyz?abc=123", + "tags": { + "abc": "xyz" + }, + "heartbeat": "42 seconds" + } + """ + + json.as[Webhook.Config] must beRight { (c: Webhook.Config) => + List( + c.endpoint must beSome(Uri.unsafeFromString("http://example.com/xyz?abc=123")), + c.tags must beEqualTo(Map("abc" -> "xyz")), + c.heartbeat must beEqualTo(42.seconds) + ).reduce(_ and _) + } + } + + def e2 = { + val json = json""" + { + "tags": { + "abc": "xyz" + }, + "heartbeat": "42 seconds" + } + """ + + json.as[Webhook.Config] must beRight { (c: Webhook.Config) => + List( + c.endpoint must beNone + ).reduce(_ and _) + } + } + + def e3 = { + + // missing heartbeat + val json = json""" + { + "endpoint": "http://example.com/xyz?abc=123", + "tags": { + "abc": "xyz" + } + } + """ + + json.as[Webhook.Config] must beLeft.like { case e: DecodingFailure => + e.show must beEqualTo("DecodingFailure at .heartbeat: Missing required field") + } + } + + def e4 = { + val input = s""" + |{ + | "xyz": 
$${snowplow.defaults.webhook} + | "xyz": { + | "endpoint": "http://example.com/xyz?abc=123" + | } + |} + |""".stripMargin + + val result = ConfigFactory.load(ConfigFactory.parseString(input)) + + val expected = Webhook.Config( + endpoint = Some(Uri.unsafeFromString("http://example.com/xyz?abc=123")), + tags = Map.empty, + heartbeat = 60.minutes + ) + + result.as[ConfigWrapper] must beRight { (w: ConfigWrapper) => + w.xyz must beEqualTo(expected) + } + } + + def e5 = { + val input = s""" + |{ + | "xyz": $${snowplow.defaults.webhook} + |} + |""".stripMargin + + val result = ConfigFactory.load(ConfigFactory.parseString(input)) + + result.as[ConfigWrapper] must beRight.like { case w: ConfigWrapper => + w.xyz.endpoint must beNone + } + } + +} + +object WebhookConfigSpec { + case class ConfigWrapper(xyz: Webhook.Config) + + implicit def wrapperDecoder: Decoder[ConfigWrapper] = deriveDecoder[ConfigWrapper] +} diff --git a/modules/runtime-common/src/test/scala/com/snowplowanalytics/snowplow/runtime/WebhookSpec.scala b/modules/runtime-common/src/test/scala/com/snowplowanalytics/snowplow/runtime/WebhookSpec.scala index 87175cba..4233c30e 100644 --- a/modules/runtime-common/src/test/scala/com/snowplowanalytics/snowplow/runtime/WebhookSpec.scala +++ b/modules/runtime-common/src/test/scala/com/snowplowanalytics/snowplow/runtime/WebhookSpec.scala @@ -8,19 +8,18 @@ package com.snowplowanalytics.snowplow.runtime import cats.{Applicative, Id, Show} -import cats.implicits._ import cats.effect.testing.specs2.CatsEffect import cats.effect.{Clock, IO, Ref, Resource} +import cats.effect.testkit.TestControl import org.http4s.{Headers, Method, Response} import org.http4s.client.Client import io.circe.Json -import io.circe.literal.JsonStringContext import io.circe.parser.{parse => circeParse} -import io.circe.DecodingFailure import org.http4s.Uri import org.specs2.Specification +import org.specs2.matcher.Matcher -import scala.concurrent.duration.FiniteDuration +import 
scala.concurrent.duration.{DurationInt, FiniteDuration} import java.util.concurrent.TimeUnit import com.snowplowanalytics.iglu.core.SelfDescribingData @@ -33,85 +32,182 @@ class WebhookSpec extends Specification with CatsEffect { import WebhookSpec._ def is = s2""" - The webhook config decoder should: - Decode a valid JSON config $decode1 - Not decode JSON if a required field is missing $decode2 The webhook should: - Not send any payloads if config is empty $send1 - Send a valid payload if given valid config $send2 - Ignore any exception raised by sending webhook $send3 + Not send any payloads if app health never leaves awaiting status $send1 + Send a single heartbeat after app becomes healthy for setup $send2 + Send a second heartbeat after configured period of time $send3 + Send a single alert after app becomes unhealthy for setup $send4 + Send multiple alerts if app becomes unhealthy for setup with different alert messages $send5 + Send alternating heartbeat and alert if app health flip flops $send6 + Not send any payloads if endpoint is not set in the configuration $send7 + Ignore any exception raised by sending webhook $send8 """ - def decode1 = { - val json = json""" - { - "endpoint": "http://example.com/xyz?abc=123", - "tags": { - "abc": "xyz" - } + def send1 = { + val io = resources().use { case (getReportedRequests, appHealth) => + val _ = appHealth + for { + _ <- IO.sleep(60.minutes) + reportedRequests <- getReportedRequests + } yield reportedRequests should beEmpty } - """ + TestControl.executeEmbed(io) + } - json.as[Option[Webhook.Config]] must beRight.like { case Some(c: Webhook.Config) => - List( - c.endpoint must beEqualTo(Uri.unsafeFromString("http://example.com/xyz?abc=123")), - c.tags must beEqualTo(Map("abc" -> "xyz")) + def send2 = { + val io = resources().use { case (getReportedRequests, appHealth) => + for { + _ <- appHealth.beHealthyForSetup + _ <- IO.sleep(5.minutes) + _ <- appHealth.beHealthyForSetup + _ <- IO.sleep(5.minutes) + _ <- 
appHealth.beHealthyForSetup + _ <- IO.sleep(5.minutes) + reportedRequests <- getReportedRequests + } yield List( + reportedRequests should haveSize(1), + reportedRequests must contain(beValidHeartbeatRequest) ).reduce(_ and _) } + TestControl.executeEmbed(io) } - def decode2 = { - val json = json""" - { - "tags": { - "abc": "xyz" - } + def send3 = { + val io = resources().use { case (getReportedRequests, appHealth) => + for { + _ <- appHealth.beHealthyForSetup + _ <- IO.sleep(45.minutes) + reportedRequests <- getReportedRequests + } yield List( + reportedRequests should haveSize(2), + reportedRequests must contain(beValidHeartbeatRequest).forall + ).reduce(_ and _) } - """ + TestControl.executeEmbed(io) + } - json.as[Option[Webhook.Config]] must beLeft.like { case e: DecodingFailure => - e.show must beEqualTo("DecodingFailure at .endpoint: Missing required field") + def send4 = { + val io = resources().use { case (getReportedRequests, appHealth) => + for { + _ <- appHealth.beUnhealthyForSetup(TestAlert("boom!")) + _ <- IO.sleep(5.minutes) + _ <- appHealth.beUnhealthyForSetup(TestAlert("boom!")) + _ <- IO.sleep(5.minutes) + _ <- appHealth.beUnhealthyForSetup(TestAlert("boom!")) + _ <- IO.sleep(5.minutes) + reportedRequests <- getReportedRequests + } yield List( + reportedRequests should haveSize(1), + reportedRequests must contain(beValidAlertRequest) + ).reduce(_ and _) } + TestControl.executeEmbed(io) } - def send1 = for { - ref <- Ref[IO].of(List.empty[ReportedRequest]) - httpClient = reportingHttpClient(ref) - webhook = Webhook.create[IO, TestAlert](None, testAppInfo, httpClient) - _ <- webhook.alert(TestAlert("this is a test")) - results <- ref.get - } yield results must beEmpty + def send5 = { + val io = resources().use { case (getReportedRequests, appHealth) => + for { + _ <- appHealth.beUnhealthyForSetup(TestAlert("boom 1")) + _ <- IO.sleep(5.minutes) + _ <- appHealth.beUnhealthyForSetup(TestAlert("boom 2")) + _ <- IO.sleep(5.minutes) + _ <- 
appHealth.beUnhealthyForSetup(TestAlert("boom 3")) + _ <- IO.sleep(5.minutes) + reportedRequests <- getReportedRequests + } yield List( + reportedRequests should haveSize(3), + reportedRequests must contain(beValidAlertRequest).forall + ).reduce(_ and _) + } + TestControl.executeEmbed(io) + } - def send2 = for { - ref <- Ref[IO].of(List.empty[ReportedRequest]) - httpClient = reportingHttpClient(ref) - webhook = Webhook.create[IO, TestAlert](Some(testConfig), testAppInfo, httpClient) - _ <- webhook.alert(TestAlert("this is a test")) - results <- ref.get - } yield List( - results must haveSize(1), - results must contain { req: ReportedRequest => - List( - mustHaveValidAlertBody(req.body), - req.method must beEqualTo(Method.POST), - req.uri must beEqualTo(testConfig.endpoint), - req.headers.toString must contain("Content-Type: application/json") + def send6 = { + val io = resources().use { case (getReportedRequests, appHealth) => + for { + _ <- appHealth.beHealthyForSetup + _ <- IO.sleep(5.minutes) + _ <- appHealth.beUnhealthyForSetup(TestAlert("boom!")) + _ <- IO.sleep(5.minutes) + _ <- appHealth.beHealthyForSetup + _ <- IO.sleep(5.minutes) + _ <- appHealth.beUnhealthyForSetup(TestAlert("boom!")) + _ <- IO.sleep(5.minutes) + reportedRequests <- getReportedRequests + } yield List( + reportedRequests should haveSize(4), + reportedRequests must contain(beValidAlertRequest).exactly(2.times), + reportedRequests must contain(beValidHeartbeatRequest).exactly(2.times) ).reduce(_ and _) } - ).reduce(_ and _) + TestControl.executeEmbed(io) + } - def send3 = { - val webhook = Webhook.create[IO, TestAlert](Some(testConfig), testAppInfo, errorRaisingHttpClient) - for { - _ <- webhook.alert(TestAlert("this is a test")) - } yield ok + def send7 = { + val config = testConfig.copy(endpoint = None) + val io = resources(config).use { case (getReportedRequests, appHealth) => + for { + _ <- appHealth.beHealthyForSetup + _ <- IO.sleep(5.minutes) + _ <- appHealth.beHealthyForSetup + _ <- 
IO.sleep(5.minutes) + _ <- appHealth.beHealthyForSetup + _ <- IO.sleep(5.minutes) + reportedRequests <- getReportedRequests + } yield reportedRequests should beEmpty + } + TestControl.executeEmbed(io) } - private def mustHaveValidAlertBody(body: String) = + def send8 = { + val resources = for { + appHealth <- Resource.eval(AppHealth.init[IO, TestAlert, TestService](Nil)) + _ <- Webhook.resource(testConfig, testAppInfo, errorRaisingHttpClient, appHealth) + } yield appHealth + + val io = resources.use { appHealth => + for { + _ <- appHealth.beHealthyForSetup + _ <- IO.sleep(30.minutes) + _ <- appHealth.beHealthyForSetup + _ <- IO.sleep(30.minutes) + _ <- appHealth.beHealthyForSetup + _ <- IO.sleep(30.minutes) + } yield ok + } + + TestControl.executeEmbed(io) + } + + private def beValidAlertRequest: Matcher[ReportedRequest] = { (req: ReportedRequest) => + List( + mustHaveValidAlertBody(req.body), + req.method must beEqualTo(Method.POST), + req.uri must beEqualTo(testEndpoint), + req.headers.toString must contain("Content-Type: application/json") + ).reduce(_ and _) + } + + private def beValidHeartbeatRequest: Matcher[ReportedRequest] = { (req: ReportedRequest) => + List( + mustHaveValidHeartbeatBody(req.body), + req.method must beEqualTo(Method.POST), + req.uri must beEqualTo(testEndpoint), + req.headers.toString must contain("Content-Type: application/json") + ).reduce(_ and _) + } + + private def mustHaveValidAlertBody = + mustHaveValidSdjBody("iglu:com.snowplowanalytics.monitoring.loader/alert/jsonschema/1-0-0") _ + + private def mustHaveValidHeartbeatBody = + mustHaveValidSdjBody("iglu:com.snowplowanalytics.monitoring.loader/heartbeat/jsonschema/1-0-0") _ + + private def mustHaveValidSdjBody(igluUri: String)(body: String) = circeParse(body) must beRight.like { case json: Json => json.as[SelfDescribingData[Json]] must beRight.like { case sdj: SelfDescribingData[Json] => List( - sdj.schema.toSchemaUri must 
beEqualTo("iglu:com.snowplowanalytics.monitoring.loader/alert/jsonschema/1-0-0"), + sdj.schema.toSchemaUri must beEqualTo(igluUri), igluCirceClient.check(sdj).value must beRight ).reduce(_ and _) } @@ -126,6 +222,10 @@ object WebhookSpec { implicit def testAlertShow: Show[TestAlert] = Show(_.msg) + sealed trait TestService + case object TestService1 extends TestService + case object TestService2 extends TestService + val testAppInfo: AppInfo = new AppInfo { def name: String = "testName" def version: String = "testVersion" @@ -133,9 +233,12 @@ object WebhookSpec { def cloud: String = "testCloud" } + def testEndpoint = Uri.unsafeFromString("http://example.com/xyz?abc=123") + def testConfig: Webhook.Config = Webhook.Config( - endpoint = Uri.unsafeFromString("http://example.com/xyz?abc=123"), - tags = Map("myTag" -> "myValue") + endpoint = Some(testEndpoint), + tags = Map("myTag" -> "myValue"), + heartbeat = 42.minutes ) // Used in tests to report the request that was sent to the webhook @@ -157,6 +260,22 @@ object WebhookSpec { } } + /** + * Resources for running a Spec + * + * @return + * an IO that records the requests sent to the webhook, and the AppHealth on which the spec can + * set healthy/unhealthy services + */ + def resources( + config: Webhook.Config = testConfig + ): Resource[IO, (IO[List[ReportedRequest]], AppHealth.Interface[IO, TestAlert, TestService])] = for { + ref <- Resource.eval(Ref[IO].of(List.empty[ReportedRequest])) + httpClient = reportingHttpClient(ref) + appHealth <- Resource.eval(AppHealth.init[IO, TestAlert, TestService](Nil)) + _ <- Webhook.resource(config, testAppInfo, httpClient, appHealth) + } yield (ref.get, appHealth) + // A http4s Client that raises exceptions def errorRaisingHttpClient: Client[IO] = Client[IO] { _ => diff --git a/modules/streams-core/src/main/scala/com.snowplowanalytics.snowplow/sources/SourceAndAck.scala b/modules/streams-core/src/main/scala/com.snowplowanalytics.snowplow/sources/SourceAndAck.scala index 
fbed0447..384daf76 100644 --- a/modules/streams-core/src/main/scala/com.snowplowanalytics.snowplow/sources/SourceAndAck.scala +++ b/modules/streams-core/src/main/scala/com.snowplowanalytics.snowplow/sources/SourceAndAck.scala @@ -52,7 +52,14 @@ trait SourceAndAck[F[_]] { object SourceAndAck { - sealed trait HealthStatus + sealed trait HealthStatus { self => + final def showIfUnhealthy: Option[String] = + self match { + case Healthy => None + case unhealthy: Unhealthy => Some(unhealthy.show) + } + } + case object Healthy extends HealthStatus sealed trait Unhealthy extends HealthStatus @@ -86,4 +93,5 @@ object SourceAndAck { case LaggingEventProcessor(latency) => show"Processing latency is $latency" case InactiveSource(duration) => show"Source of events has been inactive for $duration" } + } diff --git a/project/BuildSettings.scala b/project/BuildSettings.scala index eebc8d59..00dc2d4f 100644 --- a/project/BuildSettings.scala +++ b/project/BuildSettings.scala @@ -112,11 +112,14 @@ object BuildSettings { ) val igluTestSettings = Seq( + // TODO: Remove this dev repository after heartbeat schema is published + Test / igluRepository := "http://iglucentral-dev.com.s3-website-us-east-1.amazonaws.com/add-loader-heartbeat", Test / igluUris := Seq( // Iglu Central schemas used in tests will get pre-fetched by sbt "iglu:com.snowplowanalytics.iglu/anything-a/jsonschema/1-0-0", "iglu:com.snowplowanalytics.snowplow.media/ad_break_end_event/jsonschema/1-0-0", - "iglu:com.snowplowanalytics.monitoring.loader/alert/jsonschema/1-0-0" + "iglu:com.snowplowanalytics.monitoring.loader/alert/jsonschema/1-0-0", + "iglu:com.snowplowanalytics.monitoring.loader/heartbeat/jsonschema/1-0-0" ) ) } diff --git a/project/Dependencies.scala b/project/Dependencies.scala index 10e019e6..f9233a97 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -166,6 +166,7 @@ object Dependencies { val runtimeCommonDependencies = Seq( cats, catsEffectKernel, + catsRetry, circeConfig, 
circeGeneric, emberServer,