From 0e727742a064fb93750e9df3305bb697aeb9cc8d Mon Sep 17 00:00:00 2001 From: Ian Streeter Date: Wed, 14 Aug 2024 16:20:46 +0100 Subject: [PATCH] [amendment] move reporters into init --- .../snowplow/runtime/AppHealth.scala | 25 ++++++++----------- .../snowplow/runtime/HealthProbe.scala | 2 +- .../snowplow/runtime/AppHealthSpec.scala | 23 ++++++++--------- .../snowplow/runtime/HealthProbeSpec.scala | 10 ++++---- .../snowplow/runtime/WebhookSpec.scala | 4 +-- .../sources/SourceAndAck.scala | 10 +++++++- 6 files changed, 39 insertions(+), 35 deletions(-) diff --git a/modules/runtime-common/src/main/scala/com/snowplowanalytics/snowplow/runtime/AppHealth.scala b/modules/runtime-common/src/main/scala/com/snowplowanalytics/snowplow/runtime/AppHealth.scala index 6484055..02f0521 100644 --- a/modules/runtime-common/src/main/scala/com/snowplowanalytics/snowplow/runtime/AppHealth.scala +++ b/modules/runtime-common/src/main/scala/com/snowplowanalytics/snowplow/runtime/AppHealth.scala @@ -121,7 +121,7 @@ import fs2.concurrent.SignallingRef class AppHealth[F[_]: Monad, SetupAlert, RuntimeService] private ( private[runtime] val setupHealth: SignallingRef[F, AppHealth.SetupStatus[SetupAlert]], unhealthyRuntimeServices: Ref[F, Set[RuntimeService]], - runtimeServiceReporters: Ref[F, List[F[Option[String]]]] + runtimeServiceReporters: List[F[Option[String]]] ) extends AppHealth.Interface[F, SetupAlert, RuntimeService] { import AppHealth._ @@ -137,19 +137,10 @@ class AppHealth[F[_]: Monad, SetupAlert, RuntimeService] private ( def becomeUnhealthyForRuntimeService(service: RuntimeService): F[Unit] = unhealthyRuntimeServices.update(_ + service) - /** - * Add a reporter which is counted as an unhealthy runtime service if it returns a `Some`. - * - * The returned string must be a short description of why the service is unhealthy. - */ - def addRuntimeHealthReporter(reporter: F[Option[String]]): F[Unit] = - runtimeServiceReporters.update(reporter :: _) - private[runtime] def unhealthyRuntimeServiceMessages(implicit show: Show[RuntimeService]): F[List[String]] = for { services <- unhealthyRuntimeServices.get - reporters <- runtimeServiceReporters.get - extras <- reporters.sequence + extras <- runtimeServiceReporters.sequence } yield services.toList.map(_.show) ::: extras.flatten } @@ -194,11 +185,17 @@ object AppHealth { * Sealed trait of the alerts this app is allowed to send to the webhook for setup errors * @tparam RuntimeService * Sealed trait of the services that this app requires to be healthy + * + * @param runtimeReporters + * Reporters for any additional service, not covered by `RuntimeService`. Reporters provide a + * String if a service is unhealthy. The String must be a short description of why the service + * is unhealthy. */ - def init[F[_]: Async, SetupAlert, RuntimeService]: F[AppHealth[F, SetupAlert, RuntimeService]] = + def init[F[_]: Async, SetupAlert, RuntimeService]( + runtimeReporters: List[F[Option[String]]] + ): F[AppHealth[F, SetupAlert, RuntimeService]] = for { setupHealth <- SignallingRef[F, SetupStatus[SetupAlert]](SetupStatus.AwaitingHealth) unhealthyRuntimeServices <- Ref[F].of(Set.empty[RuntimeService]) - reporters <- Ref[F].of(List.empty[F[Option[String]]]) - } yield new AppHealth(setupHealth, unhealthyRuntimeServices, reporters) + } yield new AppHealth(setupHealth, unhealthyRuntimeServices, runtimeReporters) } diff --git a/modules/runtime-common/src/main/scala/com/snowplowanalytics/snowplow/runtime/HealthProbe.scala b/modules/runtime-common/src/main/scala/com/snowplowanalytics/snowplow/runtime/HealthProbe.scala index 1563a78..f3e019e 100644 --- a/modules/runtime-common/src/main/scala/com/snowplowanalytics/snowplow/runtime/HealthProbe.scala +++ b/modules/runtime-common/src/main/scala/com/snowplowanalytics/snowplow/runtime/HealthProbe.scala @@ -71,7 +71,7 @@ object HealthProbe { } else None val awaitingMsg = if (allAwaiting.nonEmpty) { - val joined = allUnhealthy.mkString("Services are awaiting a healthy status [", ", ", "]") + val joined = allAwaiting.mkString("Services are awaiting a healthy status [", ", ", "]") Some(joined) } else None diff --git a/modules/runtime-common/src/test/scala/com/snowplowanalytics/snowplow/runtime/AppHealthSpec.scala b/modules/runtime-common/src/test/scala/com/snowplowanalytics/snowplow/runtime/AppHealthSpec.scala index a53661c..a2c4dc6 100644 --- a/modules/runtime-common/src/test/scala/com/snowplowanalytics/snowplow/runtime/AppHealthSpec.scala +++ b/modules/runtime-common/src/test/scala/com/snowplowanalytics/snowplow/runtime/AppHealthSpec.scala @@ -33,32 +33,32 @@ class AppHealthSpec extends Specification with CatsEffect { """ def runtime1 = for { - appHealth <- AppHealth.init[IO, TestAlert, TestService] + appHealth <- AppHealth.init[IO, TestAlert, TestService](Nil) statuses <- appHealth.unhealthyRuntimeServiceMessages } yield statuses should beEmpty def runtime2 = for { - appHealth <- AppHealth.init[IO, TestAlert, TestService] + appHealth <- AppHealth.init[IO, TestAlert, TestService](Nil) _ <- appHealth.becomeUnhealthyForRuntimeService(TestService1) statuses <- appHealth.unhealthyRuntimeServiceMessages } yield statuses should beEqualTo(List("test service 1")) def runtime3 = for { - appHealth <- AppHealth.init[IO, TestAlert, TestService] + appHealth <- AppHealth.init[IO, TestAlert, TestService](Nil) _ <- appHealth.becomeUnhealthyForRuntimeService(TestService1) _ <- appHealth.becomeUnhealthyForRuntimeService(TestService2) statuses <- appHealth.unhealthyRuntimeServiceMessages } yield statuses should containTheSameElementsAs(List("test service 1", "test service 2")) def runtime4 = for { - appHealth <- AppHealth.init[IO, TestAlert, TestService] + appHealth <- AppHealth.init[IO, TestAlert, TestService](Nil) _ <- appHealth.becomeUnhealthyForRuntimeService(TestService1) _ <- appHealth.becomeHealthyForRuntimeService(TestService1) statuses <- appHealth.unhealthyRuntimeServiceMessages } yield statuses should beEmpty def runtime5 = for { - appHealth <- AppHealth.init[IO, TestAlert, TestService] + appHealth <- AppHealth.init[IO, TestAlert, TestService](Nil) _ <- appHealth.becomeUnhealthyForRuntimeService(TestService1) _ <- appHealth.becomeUnhealthyForRuntimeService(TestService2) _ <- appHealth.becomeHealthyForRuntimeService(TestService1) @@ -66,9 +66,8 @@ class AppHealthSpec extends Specification with CatsEffect { } yield statuses should beEqualTo(List("test service 2")) def runtime6 = for { - appHealth <- AppHealth.init[IO, TestAlert, TestService] reporter <- Ref[IO].of(Option.empty[String]) - _ <- appHealth.addRuntimeHealthReporter(reporter.get) + appHealth <- AppHealth.init[IO, TestAlert, TestService](List(reporter.get)) result1 <- appHealth.unhealthyRuntimeServiceMessages _ <- reporter.set(Some("test reporter unhealthy 1")) result2 <- appHealth.unhealthyRuntimeServiceMessages @@ -81,31 +80,31 @@ class AppHealthSpec extends Specification with CatsEffect { ).reduce(_ and _) def setup1 = for { - appHealth <- AppHealth.init[IO, TestAlert, TestService] + appHealth <- AppHealth.init[IO, TestAlert, TestService](Nil) setupHealth <- appHealth.setupHealth.get } yield setupHealth should beEqualTo(AppHealth.SetupStatus.AwaitingHealth) def setup2 = for { - appHealth <- AppHealth.init[IO, TestAlert, TestService] + appHealth <- AppHealth.init[IO, TestAlert, TestService](Nil) _ <- appHealth.becomeUnhealthyForSetup(TestAlert1) setupHealth <- appHealth.setupHealth.get } yield setupHealth should beEqualTo(AppHealth.SetupStatus.Unhealthy(TestAlert1)) def setup3 = for { - appHealth <- AppHealth.init[IO, TestAlert, TestService] + appHealth <- AppHealth.init[IO, TestAlert, TestService](Nil) _ <- appHealth.becomeHealthyForSetup setupHealth <- appHealth.setupHealth.get } yield setupHealth should beEqualTo(AppHealth.SetupStatus.Healthy) def setup4 = for { - appHealth <- AppHealth.init[IO, TestAlert, TestService] + appHealth <- AppHealth.init[IO, TestAlert, TestService](Nil) _ <- appHealth.becomeUnhealthyForSetup(TestAlert1) _ <- appHealth.becomeHealthyForSetup setupHealth <- appHealth.setupHealth.get } yield setupHealth should beEqualTo(AppHealth.SetupStatus.Healthy) def setup5 = for { - appHealth <- AppHealth.init[IO, TestAlert, TestService] + appHealth <- AppHealth.init[IO, TestAlert, TestService](Nil) _ <- appHealth.becomeHealthyForSetup _ <- appHealth.becomeUnhealthyForSetup(TestAlert1) setupHealth <- appHealth.setupHealth.get diff --git a/modules/runtime-common/src/test/scala/com/snowplowanalytics/snowplow/runtime/HealthProbeSpec.scala b/modules/runtime-common/src/test/scala/com/snowplowanalytics/snowplow/runtime/HealthProbeSpec.scala index 072bfaf..5481254 100644 --- a/modules/runtime-common/src/test/scala/com/snowplowanalytics/snowplow/runtime/HealthProbeSpec.scala +++ b/modules/runtime-common/src/test/scala/com/snowplowanalytics/snowplow/runtime/HealthProbeSpec.scala @@ -26,20 +26,20 @@ class HealthProbeSpec extends Specification with CatsEffect { """ def probe1 = for { - appHealth <- AppHealth.init[IO, TestAlert, TestService] + appHealth <- AppHealth.init[IO, TestAlert, TestService](Nil) httpApp = HealthProbe.httpApp(appHealth) response <- httpApp.run(Request[IO]()) } yield response.status must beEqualTo(Status.ServiceUnavailable) def probe2 = for { - appHealth <- AppHealth.init[IO, TestAlert, TestService] + appHealth <- AppHealth.init[IO, TestAlert, TestService](Nil) httpApp = HealthProbe.httpApp(appHealth) _ <- appHealth.becomeHealthyForSetup response <- httpApp.run(Request[IO]()) } yield response.status must beEqualTo(Status.Ok) def probe3 = for { - appHealth <- AppHealth.init[IO, TestAlert, TestService] + appHealth <- AppHealth.init[IO, TestAlert, TestService](Nil) httpApp = HealthProbe.httpApp(appHealth) _ <- appHealth.becomeHealthyForSetup _ <- appHealth.becomeUnhealthyForSetup(TestAlert1) @@ -47,7 +47,7 @@ class HealthProbeSpec extends Specification with CatsEffect { } yield response.status must beEqualTo(Status.ServiceUnavailable) def probe4 = for { - appHealth <- AppHealth.init[IO, TestAlert, TestService] + appHealth <- AppHealth.init[IO, TestAlert, TestService](Nil) httpApp = HealthProbe.httpApp(appHealth) _ <- appHealth.becomeHealthyForSetup _ <- appHealth.becomeUnhealthyForRuntimeService(TestService1) @@ -55,7 +55,7 @@ class HealthProbeSpec extends Specification with CatsEffect { } yield response.status must beEqualTo(Status.ServiceUnavailable) def probe5 = for { - appHealth <- AppHealth.init[IO, TestAlert, TestService] + appHealth <- AppHealth.init[IO, TestAlert, TestService](Nil) httpApp = HealthProbe.httpApp(appHealth) _ <- appHealth.becomeHealthyForSetup _ <- appHealth.becomeUnhealthyForRuntimeService(TestService1) diff --git a/modules/runtime-common/src/test/scala/com/snowplowanalytics/snowplow/runtime/WebhookSpec.scala b/modules/runtime-common/src/test/scala/com/snowplowanalytics/snowplow/runtime/WebhookSpec.scala index d439815..c63bfa2 100644 --- a/modules/runtime-common/src/test/scala/com/snowplowanalytics/snowplow/runtime/WebhookSpec.scala +++ b/modules/runtime-common/src/test/scala/com/snowplowanalytics/snowplow/runtime/WebhookSpec.scala @@ -161,7 +161,7 @@ class WebhookSpec extends Specification with CatsEffect { def send8 = { val resources = for { - appHealth <- Resource.eval(AppHealth.init[IO, TestAlert, TestService]) + appHealth <- Resource.eval(AppHealth.init[IO, TestAlert, TestService](Nil)) _ <- Webhook.resource(testConfig, testAppInfo, errorRaisingHttpClient, appHealth) } yield appHealth @@ -275,7 +275,7 @@ object WebhookSpec { ): Resource[IO, (IO[List[ReportedRequest]], AppHealth.Interface[IO, TestAlert, TestService])] = for { ref <- Resource.eval(Ref[IO].of(List.empty[ReportedRequest])) httpClient = reportingHttpClient(ref) - appHealth <- Resource.eval(AppHealth.init[IO, TestAlert, TestService]) + appHealth <- Resource.eval(AppHealth.init[IO, TestAlert, TestService](Nil)) _ <- Webhook.resource(config, testAppInfo, httpClient, appHealth) } yield (ref.get, appHealth) diff --git a/modules/streams-core/src/main/scala/com.snowplowanalytics.snowplow/sources/SourceAndAck.scala b/modules/streams-core/src/main/scala/com.snowplowanalytics.snowplow/sources/SourceAndAck.scala index fbed044..384daf7 100644 --- a/modules/streams-core/src/main/scala/com.snowplowanalytics.snowplow/sources/SourceAndAck.scala +++ b/modules/streams-core/src/main/scala/com.snowplowanalytics.snowplow/sources/SourceAndAck.scala @@ -52,7 +52,14 @@ trait SourceAndAck[F[_]] { object SourceAndAck { - sealed trait HealthStatus + sealed trait HealthStatus { self => + final def showIfUnhealthy: Option[String] = + self match { + case Healthy => None + case unhealthy: Unhealthy => Some(unhealthy.show) + } + } + case object Healthy extends HealthStatus sealed trait Unhealthy extends HealthStatus @@ -86,4 +93,5 @@ object SourceAndAck { case LaggingEventProcessor(latency) => show"Processing latency is $latency" case InactiveSource(duration) => show"Source of events has been inactive for $duration" } + }