From 46492bdf70ef9923f6f94dffd4b8bea7b0fd37d7 Mon Sep 17 00:00:00 2001 From: Brian Holt Date: Wed, 29 Nov 2023 00:37:20 -0600 Subject: [PATCH] add XRay tracing using Natchez this uses the X-Ray backend directly instead of using OTel, because AWS Lambda supports X-Ray by default, and including an OTel collector is extra work. It sounds like including the collector can be done by building it in as a Lambda Layer, but that's not something I've done before, and I think this will work for now. --- build.sbt | 11 +- .../LifecycleHookHandlerSpec.scala | 15 +- .../scala/com/dwolla/aws/ecs/EcsAlgSpec.scala | 1 + .../aws/autoscaling/AutoScalingAlg.scala | 44 ++++- .../autoscaling/LifecycleHookHandler.scala | 31 +-- .../com/dwolla/aws/autoscaling/model.scala | 6 + .../cloudformation/CloudFormationAlg.scala | 28 ++- .../scala/com/dwolla/aws/ec2/Ec2Alg.scala | 28 ++- .../scala/com/dwolla/aws/ecs/EcsAlg.scala | 157 ++++++++------- .../main/scala/com/dwolla/aws/ecs/model.scala | 181 +++++++++++++++++- .../src/main/scala/com/dwolla/aws/model.scala | 66 ++++++- .../scala/com/dwolla/aws/sns/SnsAlg.scala | 31 ++- .../com/dwolla/tracing/mtl/LocalSpan.scala | 41 ++++ .../draining/TerminationEventHandler.scala | 22 ++- .../autoscaling/ecs/draining/TestApp.scala | 48 +++-- .../ScaleOutPendingEventHandler.scala | 18 +- 16 files changed, 601 insertions(+), 127 deletions(-) create mode 100644 core/src/main/scala/com/dwolla/tracing/mtl/LocalSpan.scala diff --git a/build.sbt b/build.sbt index 6ecd2db..475668c 100644 --- a/build.sbt +++ b/build.sbt @@ -67,8 +67,13 @@ lazy val `autoscaling-ecs-core`: Project = project "io.circe" %% "circe-parser" % "0.14.6", "io.monix" %% "newtypes-core" % "0.2.3", "io.monix" %% "newtypes-circe-v0-14" % "0.2.3", + "org.tpolecat" %% "natchez-core" % "0.3.4", + "org.typelevel" %% "cats-tagless-core" % "0.15.0", + "org.typelevel" %% "alleycats-core" % "2.10.0", + "com.dwolla" %% "natchez-tagless" % "0.2.4", ) - } + }, + scalacOptions += "-Xmax-inlines:40", // because Task from ECS has 33 fields ) .dependsOn( `smithy4s-generated`, @@ -86,6 +91,7 @@ lazy val `core-tests` = project "org.typelevel" %% "scalacheck-effect-munit" % "2.0.0-M2" % Test, "org.typelevel" %% "log4cats-noop" % "2.6.0" % Test, "org.typelevel" %% "mouse" % "1.2.2" % Test, + "org.tpolecat" %% "natchez-noop" % "0.3.4" % Test, "io.circe" %% "circe-literal" % "0.14.6" % Test, "io.circe" %% "circe-testing" % "0.14.6" % Test, "com.47deg" %% "scalacheck-toolbox-datetime" % "0.7.0" % Test exclude("joda-time", "joda-time"), @@ -110,6 +116,7 @@ lazy val `autoscaling-ecs-draining-lambda` = project "org.typelevel" %% "feral-lambda" % "0.2.4", "org.typelevel" %% "log4cats-slf4j" % "2.6.0", "org.http4s" %% "http4s-ember-client" % "0.23.24", + "org.tpolecat" %% "natchez-xray" % "0.3.4", "com.amazonaws" % "aws-lambda-java-log4j2" % "1.6.0" % Runtime, "org.apache.logging.log4j" % "log4j-slf4j-impl" % "2.22.0" % Runtime, "org.typelevel" %% "cats-effect-testkit" % "3.5.2" % Test, @@ -120,6 +127,7 @@ lazy val `autoscaling-ecs-draining-lambda` = project "io.circe" %% "circe-literal" % "0.14.6" % Test, "io.circe" %% "circe-testing" % "0.14.6" % Test, "org.scala-lang.modules" %% "scala-java8-compat" % "1.0.2" % Test, + "com.dwolla" %% "dwolla-otel-natchez" % "0.2.2" % Test, ) }, topLevelDirectory := None, @@ -144,6 +152,7 @@ lazy val `registrator-health-check-lambda` = project "org.typelevel" %% "log4cats-slf4j" % "2.6.0", "org.http4s" %% "http4s-ember-client" % "0.23.21", "org.typelevel" %% "mouse" % "1.2.2", + "org.tpolecat" %% 
"natchez-xray" % "0.3.4", "com.amazonaws" % "aws-lambda-java-log4j2" % "1.6.0" % Runtime, "org.apache.logging.log4j" % "log4j-slf4j-impl" % "2.22.0" % Runtime, "org.typelevel" %% "cats-effect-testkit" % "3.5.2" % Test, diff --git a/core-tests/src/test/scala/com/dwolla/aws/autoscaling/LifecycleHookHandlerSpec.scala b/core-tests/src/test/scala/com/dwolla/aws/autoscaling/LifecycleHookHandlerSpec.scala index 2a7f76f..7195901 100644 --- a/core-tests/src/test/scala/com/dwolla/aws/autoscaling/LifecycleHookHandlerSpec.scala +++ b/core-tests/src/test/scala/com/dwolla/aws/autoscaling/LifecycleHookHandlerSpec.scala @@ -5,6 +5,7 @@ import _root_.io.circe.* import _root_.io.circe.literal.* import _root_.io.circe.syntax.* import cats.effect.* +import cats.mtl.Local import cats.syntax.all.* import com.dwolla.aws import com.dwolla.aws.autoscaling.given @@ -16,6 +17,9 @@ import org.scalacheck.effect.PropF.forAllF import org.typelevel.log4cats.LoggerFactory import org.typelevel.log4cats.noop.NoOpFactory import com.amazonaws.sns.TopicARN +import com.dwolla.tracing.mtl.LocalSpan +import natchez.Span +import natchez.noop.NoopEntrypoint class LifecycleHookHandlerSpec extends CatsEffectSuite @@ -74,10 +78,11 @@ class LifecycleHookHandlerSpec arbSubject: Option[String], ) => for { + given Local[IO, Span[IO]] <- LocalSpan() deferredLifecycleHookNotification <- Deferred[IO, LifecycleHookNotification] deferredSnsTopicArn <- Deferred[IO, TopicARN] - eventHandler = LifecycleHookHandler { case (arn, notif) => + eventHandler = LifecycleHookHandler(NoopEntrypoint[IO](), "TestHook") { case (arn, notif) => deferredLifecycleHookNotification.complete(notif) >> deferredSnsTopicArn.complete(arn).void } @@ -100,12 +105,12 @@ class LifecycleHookHandlerSpec arbContext: Context[IO], arbSubject: Option[String], ) => - val eventHandler = LifecycleHookHandler { case (arn, notif) => - IO(fail(s"TerminationEventHandler should not be called for test messages, but was called with $arn and $notif")) - } - for { + given Local[IO, Span[IO]] <- LocalSpan() snsEvent <- snsMessage(arbSnsTopicArn, testNotification, arbSubject).as[SnsEvent].liftTo[IO] + eventHandler = LifecycleHookHandler(NoopEntrypoint[IO](), "TestHook") { case (arn, notif) => + IO(fail(s"TerminationEventHandler should not be called for test messages, but was called with $arn and $notif")) + } output <- eventHandler(LambdaEnv.pure(snsEvent, arbContext)) } yield { assertEquals(output, None) diff --git a/core-tests/src/test/scala/com/dwolla/aws/ecs/EcsAlgSpec.scala b/core-tests/src/test/scala/com/dwolla/aws/ecs/EcsAlgSpec.scala index 293a57b..273d0a3 100644 --- a/core-tests/src/test/scala/com/dwolla/aws/ecs/EcsAlgSpec.scala +++ b/core-tests/src/test/scala/com/dwolla/aws/ecs/EcsAlgSpec.scala @@ -11,6 +11,7 @@ import com.dwolla.aws.ArbitraryPagination import com.dwolla.aws.ecs.* import fs2.{Chunk, Stream} import munit.{CatsEffectSuite, ScalaCheckEffectSuite} +import natchez.Trace.Implicits.noop import org.scalacheck.effect.PropF.forAllF import org.typelevel.log4cats.LoggerFactory import org.typelevel.log4cats.noop.NoOpFactory diff --git a/core/src/main/scala/com/dwolla/aws/autoscaling/AutoScalingAlg.scala b/core/src/main/scala/com/dwolla/aws/autoscaling/AutoScalingAlg.scala index c934347..3659b1a 100644 --- a/core/src/main/scala/com/dwolla/aws/autoscaling/AutoScalingAlg.scala +++ b/core/src/main/scala/com/dwolla/aws/autoscaling/AutoScalingAlg.scala @@ -1,14 +1,20 @@ -package com.dwolla.aws.autoscaling +package com.dwolla.aws +package autoscaling -import cats.effect.* +import cats.* 
import cats.effect.syntax.all.* +import cats.effect.{Trace as _, *} import cats.syntax.all.* +import cats.tagless.Trivial +import cats.tagless.aop.Aspect import com.amazonaws.autoscaling.{AutoScaling, LifecycleActionResult, XmlStringMaxLen19} import com.amazonaws.ec2.InstanceId import com.amazonaws.sns.* +import com.dwolla.aws.TraceableValueInstances.given import com.dwolla.aws.autoscaling.LifecycleState.* import com.dwolla.aws.sns.* import io.circe.syntax.* +import natchez.TraceableValue import org.typelevel.log4cats.{Logger, LoggerFactory} import scala.concurrent.duration.* @@ -22,6 +28,40 @@ trait AutoScalingAlg[F[_]] { } object AutoScalingAlg { + given Aspect[AutoScalingAlg, TraceableValue, Trivial] = new Aspect[AutoScalingAlg, TraceableValue, Trivial] { + override def weave[F[_]](af: AutoScalingAlg[F]): AutoScalingAlg[[A] =>> Aspect.Weave[F, TraceableValue, Trivial, A]] = + new AutoScalingAlg[[A] =>> Aspect.Weave[F, TraceableValue, Trivial, A]] { + override def pauseAndRecurse(topic: TopicARN, lifecycleHookNotification: LifecycleHookNotification, onlyIfInState: LifecycleState): Aspect.Weave[F, TraceableValue, Trivial, Unit] = + new Aspect.Weave[F, TraceableValue, Trivial, Unit]( + "AutoScalingAlg", + List(List( + Aspect.Advice.byValue[TraceableValue, TopicARN]("topic", topic), + Aspect.Advice.byValue[TraceableValue, LifecycleHookNotification]("lifecycleHookNotification", lifecycleHookNotification), + Aspect.Advice.byValue[TraceableValue, LifecycleState]("onlyIfInState", onlyIfInState), + )), + Aspect.Advice[F, Trivial, Unit]("pauseAndRecurse", af.pauseAndRecurse(topic, lifecycleHookNotification, onlyIfInState)) + ) + + override def continueAutoScaling(l: LifecycleHookNotification): Aspect.Weave[F, TraceableValue, Trivial, Unit] = + new Aspect.Weave[F, TraceableValue, Trivial, Unit]( + "AutoScalingAlg", + List(List( + Aspect.Advice.byValue[TraceableValue, LifecycleHookNotification]("l", l), + )), + Aspect.Advice[F, Trivial, Unit]("continueAutoScaling", af.continueAutoScaling(l)) + ) + } + + override def mapK[F[_], G[_]](af: AutoScalingAlg[F])(fk: F ~> G): AutoScalingAlg[G] = + new AutoScalingAlg[G] { + override def pauseAndRecurse(topic: TopicARN, lifecycleHookNotification: LifecycleHookNotification, onlyIfInState: LifecycleState): G[Unit] = + fk(af.pauseAndRecurse(topic, lifecycleHookNotification, onlyIfInState)) + + override def continueAutoScaling(l: LifecycleHookNotification): G[Unit] = + fk(af.continueAutoScaling(l)) + } + } + def apply[F[_] : Async : LoggerFactory](autoScalingClient: AutoScaling[F], sns: SnsAlg[F]): AutoScalingAlg[F] = new AutoScalingAlgImpl(autoScalingClient, sns) diff --git a/core/src/main/scala/com/dwolla/aws/autoscaling/LifecycleHookHandler.scala b/core/src/main/scala/com/dwolla/aws/autoscaling/LifecycleHookHandler.scala index 8213aab..09bb307 100644 --- a/core/src/main/scala/com/dwolla/aws/autoscaling/LifecycleHookHandler.scala +++ b/core/src/main/scala/com/dwolla/aws/autoscaling/LifecycleHookHandler.scala @@ -1,25 +1,32 @@ package com.dwolla.aws.autoscaling import cats.* +import cats.effect.{Trace as _, *} +import cats.mtl.Local import cats.syntax.all.* import com.amazonaws.sns.TopicARN import com.dwolla.aws.sns.* import feral.lambda.events.SnsEvent import feral.lambda.{INothing, LambdaEnv} import fs2.Stream +import natchez.{EntryPoint, Span} import org.typelevel.log4cats.LoggerFactory +import com.dwolla.tracing.syntax.* object LifecycleHookHandler { - def apply[F[_] : MonadThrow : LoggerFactory](eventBridge: (TopicARN, LifecycleHookNotification) => 
F[Unit]) - (using fs2.Compiler[F, F]): LambdaEnv[F, SnsEvent] => F[Option[INothing]] = env => - Stream.eval(env.event) - .map(_.records) - .flatMap(Stream.emits(_)) - .map(_.sns) - .evalMap(ParseLifecycleHookNotification[F]) - .unNone - .evalMap(eventBridge.tupled) - .compile - .drain - .as(None) + def apply[F[_] : MonadCancelThrow : LoggerFactory](entryPoint: EntryPoint[F], hookName: String) + (eventBridge: (TopicARN, LifecycleHookNotification) => F[Unit]) + (using fs2.Compiler[F, F], Local[F, Span[F]]): LambdaEnv[F, SnsEvent] => F[Option[INothing]] = env => + entryPoint.runInRoot(hookName) { + Stream.eval(env.event) + .map(_.records) + .flatMap(Stream.emits(_)) + .map(_.sns) + .evalMap(ParseLifecycleHookNotification[F]) + .unNone + .evalMap(eventBridge.tupled) + .compile + .drain + .as(None) + } } diff --git a/core/src/main/scala/com/dwolla/aws/autoscaling/model.scala b/core/src/main/scala/com/dwolla/aws/autoscaling/model.scala index db9e985..ec749d0 100644 --- a/core/src/main/scala/com/dwolla/aws/autoscaling/model.scala +++ b/core/src/main/scala/com/dwolla/aws/autoscaling/model.scala @@ -7,8 +7,10 @@ import com.amazonaws.autoscaling.* import com.amazonaws.ec2.* import com.dwolla.aws.AccountId import io.circe.* +import io.circe.syntax.* import monix.newtypes.NewtypeWrapped import monix.newtypes.integrations.* +import natchez.TraceableValue import java.time.Instant @@ -78,6 +80,8 @@ object LifecycleHookNotification extends DerivedCirceCodec { "LifecycleTransition", "NotificationMetadata", )(LifecycleHookNotification.apply) + + given TraceableValue[LifecycleHookNotification] = TraceableValue.stringToTraceValue.contramap(_.asJson.noSpaces) } object TestNotification extends DerivedCirceCodec { given Encoder[TestNotification] = @@ -150,4 +154,6 @@ object LifecycleState { } def fromString(s: String): Option[LifecycleState] = maybeFromString.lift(s) + + given TraceableValue[LifecycleState] = TraceableValue.stringToTraceValue.contramap(_.awsName) } diff --git a/core/src/main/scala/com/dwolla/aws/cloudformation/CloudFormationAlg.scala b/core/src/main/scala/com/dwolla/aws/cloudformation/CloudFormationAlg.scala index ef940a2..9af3896 100644 --- a/core/src/main/scala/com/dwolla/aws/cloudformation/CloudFormationAlg.scala +++ b/core/src/main/scala/com/dwolla/aws/cloudformation/CloudFormationAlg.scala @@ -1,9 +1,14 @@ -package com.dwolla.aws.cloudformation +package com.dwolla.aws +package cloudformation import cats.effect.* import cats.syntax.all.* +import cats.tagless.aop.Aspect +import cats.~> import com.amazonaws.cloudformation.* +import com.dwolla.aws.TraceableValueInstances.given import monix.newtypes.NewtypeWrapped +import natchez.TraceableValue import org.typelevel.log4cats.* import smithy4s.http.UnknownErrorResponse @@ -29,4 +34,25 @@ object CloudFormationAlg { } } } + + given Aspect[CloudFormationAlg, TraceableValue, TraceableValue] = new Aspect[CloudFormationAlg, TraceableValue, TraceableValue] { + override def weave[F[_]](af: CloudFormationAlg[F]): CloudFormationAlg[[A] =>> Aspect.Weave[F, TraceableValue, TraceableValue, A]] = + new CloudFormationAlg[[A] =>> Aspect.Weave[F, TraceableValue, TraceableValue, A]] { + override def physicalResourceIdFor(stack: StackArn, logicalResourceId: LogicalResourceId): Aspect.Weave[F, TraceableValue, TraceableValue, Option[PhysicalResourceId]] = + Aspect.Weave( + "CloudFormationAlg", + List(List( + Aspect.Advice.byValue[TraceableValue, StackArn]("stack", stack), + Aspect.Advice.byValue[TraceableValue, LogicalResourceId]("logicalResourceId", 
logicalResourceId), + )), + Aspect.Advice[F, TraceableValue, Option[PhysicalResourceId]]("physicalResourceIdFor", af.physicalResourceIdFor(stack, logicalResourceId)) + ) + } + + override def mapK[F[_], G[_]](af: CloudFormationAlg[F])(fk: F ~> G): CloudFormationAlg[G] = + new CloudFormationAlg[G] { + override def physicalResourceIdFor(stack: StackArn, logicalResourceId: LogicalResourceId): G[Option[PhysicalResourceId]] = + fk(af.physicalResourceIdFor(stack, logicalResourceId)) + } + } } diff --git a/core/src/main/scala/com/dwolla/aws/ec2/Ec2Alg.scala b/core/src/main/scala/com/dwolla/aws/ec2/Ec2Alg.scala index 1800bef..eafe8e4 100644 --- a/core/src/main/scala/com/dwolla/aws/ec2/Ec2Alg.scala +++ b/core/src/main/scala/com/dwolla/aws/ec2/Ec2Alg.scala @@ -1,10 +1,15 @@ -package com.dwolla.aws.ec2 +package com.dwolla.aws +package ec2 import cats.effect.* import cats.syntax.all.* +import cats.tagless.aop.Aspect +import cats.~> +import com.amazonaws.ec2.{Tag as _, *} import com.dwolla.aws.* +import com.dwolla.aws.TraceableValueInstances.given +import natchez.TraceableValue import org.typelevel.log4cats.* -import com.amazonaws.ec2.{Tag as _, *} trait Ec2Alg[F[_]] { def getTagsForInstance(id: InstanceId): F[List[Tag]] @@ -29,6 +34,25 @@ object Ec2Alg { } } } + + given Aspect[Ec2Alg, TraceableValue, TraceableValue] = new Aspect[Ec2Alg, TraceableValue, TraceableValue] { + override def weave[F[_]](af: Ec2Alg[F]): Ec2Alg[[A] =>> Aspect.Weave[F, TraceableValue, TraceableValue, A]] = + new Ec2Alg[[A] =>> Aspect.Weave[F, TraceableValue, TraceableValue, A]] { + override def getTagsForInstance(id: InstanceId): Aspect.Weave[F, TraceableValue, TraceableValue, List[Tag]] = + Aspect.Weave[F, TraceableValue, TraceableValue, List[Tag]]( + "Ec2Alg", + List(List( + Aspect.Advice.byValue[TraceableValue, InstanceId]("id", id), + )), + Aspect.Advice[F, TraceableValue, List[Tag]]("getTagsForInstance", af.getTagsForInstance(id)) + ) + } + + override def mapK[F[_], G[_]](af: Ec2Alg[F])(fk: F ~> G): Ec2Alg[G] = + new Ec2Alg[G] { + override def getTagsForInstance(id: InstanceId): G[List[Tag]] = fk(af.getTagsForInstance(id)) + } + } } extension [A] (maybeList: Option[List[A]]) { diff --git a/core/src/main/scala/com/dwolla/aws/ecs/EcsAlg.scala b/core/src/main/scala/com/dwolla/aws/ecs/EcsAlg.scala index dbb9b95..d7bde55 100644 --- a/core/src/main/scala/com/dwolla/aws/ecs/EcsAlg.scala +++ b/core/src/main/scala/com/dwolla/aws/ecs/EcsAlg.scala @@ -1,6 +1,7 @@ package com.dwolla.aws.ecs import cats.* +import cats.effect.MonadCancelThrow import cats.syntax.all.* import com.amazonaws.ec2.InstanceId import com.amazonaws.ecs.ECS @@ -9,7 +10,10 @@ import com.dwolla.aws.ecs.TaskStatus.{Running, stoppedTaskStatuses} import com.dwolla.fs2utils.Pagination import fs2.* import monix.newtypes.HasExtractor +import natchez.Trace +import natchez.Trace.given import org.typelevel.log4cats.{Logger, LoggerFactory} +import com.dwolla.aws.TraceableValueInstances.given abstract class EcsAlg[F[_] : Applicative, G[_]] { def listClusterArns: G[ClusterArn] @@ -26,44 +30,49 @@ abstract class EcsAlg[F[_] : Applicative, G[_]] { } object EcsAlg { - def apply[F[_] : Monad : LoggerFactory](ecs: ECS[F]) - (using Compiler[F, F]): EcsAlg[F, Stream[F, *]] = new EcsAlg[F, Stream[F, *]] { + def apply[F[_] : MonadCancelThrow : LoggerFactory : Trace](ecs: ECS[F]) + (using Compiler[F, F]): EcsAlg[F, Stream[F, *]] = new EcsAlg[F, Stream[F, *]] { override def listClusterArns: Stream[F, ClusterArn] = - Pagination.offsetUnfoldChunkEval { - ecs - .listClusters(_: 
Option[String], None) - .map { resp => - resp.clusterArns.map(Chunk.from(_)).getOrElse(Chunk.empty) -> resp.nextToken + Trace[Stream[F, *]].span("EcsAlg.listClusterArns") { + Pagination.offsetUnfoldChunkEval { + ecs + .listClusters(_: Option[String], None) + .map { resp => + resp.clusterArns.map(Chunk.from(_)).getOrElse(Chunk.empty) -> resp.nextToken + } } + .map(ClusterArn(_)) } - .map(ClusterArn(_)) override def listContainerInstances(cluster: ClusterArn): Stream[F, ContainerInstance] = - Pagination.offsetUnfoldChunkEval { (nextToken: Option[String]) => - ecs - .listContainerInstances(cluster.value.some, nextToken = nextToken) - .map { resp => - resp.containerInstanceArns.toChunk.map(ContainerInstanceId(_)) -> resp.nextToken + Trace[Stream[F, *]].span("EcsAlg.listContainerInstances") { + Trace[Stream[F, *]].put("cluster" -> cluster) >> + Pagination.offsetUnfoldChunkEval { (nextToken: Option[String]) => + ecs + .listContainerInstances(cluster.value.some, nextToken = nextToken) + .map { resp => + resp.containerInstanceArns.toChunk.map(ContainerInstanceId(_)) -> resp.nextToken + } } - } - .through(chunkedEcsRequest(ecs.describeContainerInstances(_, cluster = cluster.value.some))(_.containerInstances)) - .map { ci => - (ci.containerInstanceArn.map(ContainerInstanceId(_)), - ci.ec2InstanceId.map(InstanceId(_)), - ci.status.flatMap(ContainerInstanceStatus.fromStatus), - ) - .tupled - } - .unNone - .evalMap { (ci, ec2, status) => - listTasks(cluster, ci) - .through(chunkedEcsRequest(ecs.describeTasks(_, cluster.value.some))(_.tasks)) - .filterNot(_.lastStatus.flatMap(TaskStatus.fromString.lift).exists(stoppedTaskStatuses.contains)) - .compile - .count - .map(TaskCount(_)) - .map(ContainerInstance(ci, ec2, _, status)) - } + .through(chunkedEcsRequest(ecs.describeContainerInstances(_, cluster = cluster.value.some))(_.containerInstances)) + .map { ci => + (ci.containerInstanceArn.map(ContainerInstanceId(_)), + ci.ec2InstanceId.map(InstanceId(_)), + ci.status.flatMap(ContainerInstanceStatus.fromStatus), + ) + .tupled + } + .unNone + .evalMap { (ci, ec2, status) => + listTasks(cluster, ci) + .through(chunkedEcsRequest(ecs.describeTasks(_, cluster.value.some))(_.tasks)) + .filterNot(_.lastStatus.flatMap(TaskStatus.fromString.lift).exists(stoppedTaskStatuses.contains)) + .compile + .count + .map(TaskCount(_)) + .map(ContainerInstance(ci, ec2, _, status)) + } + } /** * Many ECS Describe* APIs accept up to 100 identifiers to be described in a single request. 
@@ -79,54 +88,66 @@ object EcsAlg { .unchunks override def findEc2Instance(ec2InstanceId: InstanceId): F[Option[(ClusterArn, ContainerInstance)]] = - LoggerFactory[F].create.flatMap { case given Logger[F] => - listClusterArns - // TODO listContainerInstances could use a CQL expression to narrow the search - .mproduct(listContainerInstances(_).filter(_.ec2InstanceId == ec2InstanceId)) - .compile - .last - .flatTap { ec2Instance => - Logger[F].info(s"EC2 Instance search results: $ec2Instance") + Trace[F].span("EcsAlg.findEc2Instance") { + Trace[F].put("ec2InstanceId" -> ec2InstanceId) >> + LoggerFactory[F].create.flatMap { case given Logger[F] => + listClusterArns + // TODO listContainerInstances could use a CQL expression to narrow the search + .mproduct(listContainerInstances(_).filter(_.ec2InstanceId == ec2InstanceId)) + .compile + .last + .flatTap { ec2Instance => + Logger[F].info(s"EC2 Instance search results: $ec2Instance") + } } } private def listTasks(cluster: ClusterArn, ci: ContainerInstanceId): Stream[F, TaskArn] = - for { - given _ <- Stream.eval(LoggerFactory[F].create) - _ <- Stream.eval(Logger[F].info(s"listing tasks on instance ${ci.value} in cluster $cluster")) - taskArn <- Pagination.offsetUnfoldChunkEval { (nextToken: Option[String]) => - for { - _ <- Logger[F].trace(s"cluster = ${cluster.value}, containerInstance = ${ci.value}, nextToken = $nextToken") - resp <- ecs.listTasks( - cluster = cluster.value.some, - containerInstance = ci.value.some, - nextToken = nextToken, - ) - } yield resp.taskArns.map(Chunk.from(_)).getOrElse(Chunk.empty) -> resp.nextToken - } - } yield TaskArn(taskArn) + Trace[Stream[F, *]].span("EcsAlg.listTasks") { + for { + _ <- Trace[Stream[F, *]].put("cluster" -> cluster, "ci" -> ci) + given _ <- Stream.eval(LoggerFactory[F].create) + _ <- Stream.eval(Logger[F].info(s"listing tasks on instance ${ci.value} in cluster $cluster")) + taskArn <- Pagination.offsetUnfoldChunkEval { (nextToken: Option[String]) => + for { + _ <- Logger[F].trace(s"cluster = ${cluster.value}, containerInstance = ${ci.value}, nextToken = $nextToken") + resp <- ecs.listTasks( + cluster = cluster.value.some, + containerInstance = ci.value.some, + nextToken = nextToken, + ) + } yield resp.taskArns.map(Chunk.from(_)).getOrElse(Chunk.empty) -> resp.nextToken + } + } yield TaskArn(taskArn) + } override def isTaskDefinitionRunningOnInstance(cluster: ClusterArn, ci: ContainerInstance, taskDefinition: TaskDefinitionArn): F[Boolean] = - LoggerFactory[F].create.flatMap { case given Logger[F] => - Logger[F].info(s"looking for task definition ${taskDefinition.value} on instance ${ci.containerInstanceId.value} in cluster ${cluster.value}") >> - listTasks(cluster, ci.containerInstanceId) - .through(chunkedEcsRequest(ecs.describeTasks(_, cluster.value.some))(_.tasks)) - .filter(_.taskDefinitionArn.map(TaskDefinitionArn(_)).contains(taskDefinition)) - .filter(_.lastStatus.flatMap(TaskStatus.fromString.lift).contains(Running)) - .head - .compile - .last - .map(_.isDefined) + Trace[F].span("EcsAlg.isTaskDefinitionRunningOnInstance") { + Trace[F].put("cluster" -> cluster, "ci" -> ci, "taskDefinition" -> taskDefinition) >> + LoggerFactory[F].create.flatMap { case given Logger[F] => + Logger[F].info(s"looking for task definition ${taskDefinition.value} on instance ${ci.containerInstanceId.value} in cluster ${cluster.value}") >> + listTasks(cluster, ci.containerInstanceId) + .through(chunkedEcsRequest(ecs.describeTasks(_, cluster.value.some))(_.tasks)) + 
.filter(_.taskDefinitionArn.map(TaskDefinitionArn(_)).contains(taskDefinition)) + .filter(_.lastStatus.flatMap(TaskStatus.fromString.lift).contains(Running)) + .head + .compile + .last + .map(_.isDefined) + } } override protected def drainInstanceImpl(cluster: ClusterArn, ci: ContainerInstance): F[Unit] = - LoggerFactory[F].create.flatMap { case given Logger[F] => - Logger[F].info(s"draining instance $ci in cluster $cluster") >> - ecs.updateContainerInstancesState(List(ci.containerInstanceId.value), com.amazonaws.ecs.ContainerInstanceStatus.DRAINING, cluster.value.some) - .void + Trace[F].span("EcsAlg.drainInstanceImpl") { + Trace[F].put("cluster" -> cluster, "ci" -> ci) >> + LoggerFactory[F].create.flatMap { case given Logger[F] => + Logger[F].info(s"draining instance $ci in cluster $cluster") >> + ecs.updateContainerInstancesState(List(ci.containerInstanceId.value), com.amazonaws.ecs.ContainerInstanceStatus.DRAINING, cluster.value.some) + .void + } } } diff --git a/core/src/main/scala/com/dwolla/aws/ecs/model.scala b/core/src/main/scala/com/dwolla/aws/ecs/model.scala index 89a1970..7e480b9 100644 --- a/core/src/main/scala/com/dwolla/aws/ecs/model.scala +++ b/core/src/main/scala/com/dwolla/aws/ecs/model.scala @@ -1,11 +1,18 @@ package com.dwolla.aws.ecs -import cats.Order import cats.syntax.all.* +import cats.tagless.aop.Aspect +import cats.{Eval, Order, ~>} import com.amazonaws.ec2.InstanceId -import com.dwolla.aws.AccountId +import com.amazonaws.ecs +import com.amazonaws.ecs.* +import com.dwolla.aws.TraceableValueInstances.given +import com.dwolla.aws.{AccountId, TraceableValueInstances, given} +import io.circe.JsonObject import monix.newtypes.* +import natchez.TraceableValue import smithy4s.aws.AwsRegion +import io.circe.syntax.* type ContainerInstanceId = ContainerInstanceId.Type object ContainerInstanceId extends NewtypeWrapped[String] @@ -29,6 +36,17 @@ case class ContainerInstance(containerInstanceId: ContainerInstanceId, countOfTasksNotStopped: TaskCount, status: ContainerInstanceStatus, ) +object ContainerInstance { + given TraceableValue[ContainerInstance] = + TraceableValue.stringToTraceValue.contramap { ci => + JsonObject( + "containerInstanceId" -> ci.containerInstanceId.value.asJson, + "ec2InstanceId" -> ci.ec2InstanceId.value.asJson, + "countOfTasksNotStopped" -> ci.countOfTasksNotStopped.value.asJson, + "status" -> ci.status.toString.asJson, + ).asJson.noSpaces + } +} enum ContainerInstanceStatus { case Active @@ -95,3 +113,162 @@ object TaskStatus { case "DELETED" => Deleted } } + +def traceableAdvice[A: TraceableValue](name: String, a: A): Aspect.Advice[Eval, TraceableValue] = + Aspect.Advice.byValue[TraceableValue, A](name, a) + +given Aspect[ECS, TraceableValue, TraceableValue] = + new Aspect[ECS, TraceableValue, TraceableValue] { + override def weave[F[_]](af: ECS[F]): ECS[[A] =>> Aspect.Weave[F, TraceableValue, TraceableValue, A]] = + new ECS[[A] =>> Aspect.Weave[F, TraceableValue, TraceableValue, A]] { + override def describeContainerInstances(containerInstances: List[String], + cluster: Option[String], + include: Option[List[ContainerInstanceField]]): Aspect.Weave[F, TraceableValue, TraceableValue, DescribeContainerInstancesResponse] = + Aspect.Weave[F, TraceableValue, TraceableValue, DescribeContainerInstancesResponse]( + "ECS", + List(List( + traceableAdvice("containerInstances", containerInstances), + traceableAdvice("cluster", cluster), + traceableAdvice("include", include), + )), + Aspect.Advice[F, TraceableValue, DescribeContainerInstancesResponse]( + 
"describeContainerInstances", + af.describeContainerInstances(containerInstances, cluster, include) + ) + ) + + override def describeTasks(tasks: List[String], + cluster: Option[String], + include: Option[List[TaskField]]): Aspect.Weave[F, TraceableValue, TraceableValue, DescribeTasksResponse] = + Aspect.Weave[F, TraceableValue, TraceableValue, DescribeTasksResponse]( + "ECS", + List(List( + traceableAdvice("tasks", tasks), + traceableAdvice("cluster", cluster), + traceableAdvice("include", include), + )), + Aspect.Advice[F, TraceableValue, DescribeTasksResponse]( + "describeTasks", + af.describeTasks(tasks, cluster, include) + ) + ) + + override def listClusters(nextToken: Option[String], + maxResults: Option[BoxedInteger]): Aspect.Weave[F, TraceableValue, TraceableValue, ListClustersResponse] = + Aspect.Weave[F, TraceableValue, TraceableValue, ListClustersResponse]( + "ECS", + List(List( + traceableAdvice("nextToken", nextToken), + traceableAdvice("maxResults", maxResults), + )), + Aspect.Advice[F, TraceableValue, ListClustersResponse]( + "listClusters", + af.listClusters(nextToken, maxResults) + ) + ) + + override def listContainerInstances(cluster: Option[String], + filter: Option[String], + nextToken: Option[String], + maxResults: Option[BoxedInteger], + status: Option[ecs.ContainerInstanceStatus]): Aspect.Weave[F, TraceableValue, TraceableValue, ListContainerInstancesResponse] = + Aspect.Weave[F, TraceableValue, TraceableValue, ListContainerInstancesResponse]( + "ECS", + List(List( + traceableAdvice("cluster", cluster), + traceableAdvice("filter", filter), + traceableAdvice("nextToken", nextToken), + traceableAdvice("maxResults", maxResults), + traceableAdvice("status", status), + )), + Aspect.Advice[F, TraceableValue, ListContainerInstancesResponse]( + "listContainerInstances", + af.listContainerInstances(cluster, filter, nextToken, maxResults, status) + ) + ) + + override def listTasks(cluster: Option[String], + containerInstance: Option[String], + family: Option[String], + nextToken: Option[String], + maxResults: Option[BoxedInteger], + startedBy: Option[String], + serviceName: Option[String], + desiredStatus: Option[DesiredStatus], + launchType: Option[LaunchType]): Aspect.Weave[F, TraceableValue, TraceableValue, ListTasksResponse] = + Aspect.Weave[F, TraceableValue, TraceableValue, ListTasksResponse]( + "ECS", + List(List( + traceableAdvice("cluster", cluster), + traceableAdvice("containerInstance", containerInstance), + traceableAdvice("family", family), + traceableAdvice("nextToken", nextToken), + traceableAdvice("maxResults", maxResults), + traceableAdvice("startedBy", startedBy), + traceableAdvice("serviceName", serviceName), + traceableAdvice("desiredStatus", desiredStatus), + traceableAdvice("launchType", launchType), + )), + Aspect.Advice[F, TraceableValue, ListTasksResponse]( + "listTasks", + af.listTasks(cluster, containerInstance, family, nextToken, maxResults, startedBy, serviceName, desiredStatus, launchType) + ) + ) + + override def updateContainerInstancesState(containerInstances: List[String], + status: ecs.ContainerInstanceStatus, + cluster: Option[String]): Aspect.Weave[F, TraceableValue, TraceableValue, UpdateContainerInstancesStateResponse] = + Aspect.Weave[F, TraceableValue, TraceableValue, UpdateContainerInstancesStateResponse]( + "ECS", + List(List( + traceableAdvice("containerInstances", containerInstances), + traceableAdvice("status", status), + traceableAdvice("cluster", cluster), + )), + Aspect.Advice[F, TraceableValue, 
UpdateContainerInstancesStateResponse]( + "updateContainerInstancesState", + af.updateContainerInstancesState(containerInstances, status, cluster) + ) + ) + } + + override def mapK[F[_], G[_]](af: ECS[F])(fk: F ~> G): ECS[G] = + new ECS[G] { + override def describeContainerInstances(containerInstances: List[String], + cluster: Option[String], + include: Option[List[ContainerInstanceField]]): G[DescribeContainerInstancesResponse] = + fk(af.describeContainerInstances(containerInstances, cluster, include)) + + override def describeTasks(tasks: List[String], + cluster: Option[String], + include: Option[List[TaskField]]): G[DescribeTasksResponse] = + fk(af.describeTasks(tasks, cluster, include)) + + override def listClusters(nextToken: Option[String], + maxResults: Option[BoxedInteger]): G[ListClustersResponse] = + fk(af.listClusters(nextToken, maxResults)) + + override def listContainerInstances(cluster: Option[String], + filter: Option[String], + nextToken: Option[String], + maxResults: Option[BoxedInteger], + status: Option[ecs.ContainerInstanceStatus]): G[ListContainerInstancesResponse] = + fk(af.listContainerInstances(cluster, filter, nextToken, maxResults, status)) + + override def listTasks(cluster: Option[String], + containerInstance: Option[String], + family: Option[String], + nextToken: Option[String], + maxResults: Option[BoxedInteger], + startedBy: Option[String], + serviceName: Option[String], + desiredStatus: Option[DesiredStatus], + launchType: Option[LaunchType]): G[ListTasksResponse] = + fk(af.listTasks(cluster, containerInstance, family, nextToken, maxResults, startedBy, serviceName, desiredStatus, launchType)) + + override def updateContainerInstancesState(containerInstances: List[String], + status: ecs.ContainerInstanceStatus, + cluster: Option[String]): G[UpdateContainerInstancesStateResponse] = + fk(af.updateContainerInstancesState(containerInstances, status, cluster)) + } + } diff --git a/core/src/main/scala/com/dwolla/aws/model.scala b/core/src/main/scala/com/dwolla/aws/model.scala index a895b66..62a58b9 100644 --- a/core/src/main/scala/com/dwolla/aws/model.scala +++ b/core/src/main/scala/com/dwolla/aws/model.scala @@ -1,19 +1,77 @@ package com.dwolla.aws +import alleycats.Empty +import com.github.plokhotnyuk.jsoniter_scala.core.WriterConfig import monix.newtypes.NewtypeWrapped import io.circe.{Decoder, Encoder} -import smithy4s.{Bijection, Newtype} +import io.circe.syntax.* +import monix.newtypes.integrations.DerivedCirceCodec +import natchez.TraceableValue +import smithy4s.json.Json.payloadCodecs +import smithy4s.{Bijection, Newtype, Schema} + +import scala.annotation.targetName type AccountId = AccountId.Type -object AccountId extends NewtypeWrapped[String] +object AccountId extends NewtypeWrapped[String] with DerivedCirceCodec type TagName = TagName.Type -object TagName extends NewtypeWrapped[String] +object TagName extends NewtypeWrapped[String] with DerivedCirceCodec type TagValue = TagValue.Type -object TagValue extends NewtypeWrapped[String] +object TagValue extends NewtypeWrapped[String] with DerivedCirceCodec case class Tag(name: TagName, value: TagValue) +object Tag { + given Encoder[Tag] = Encoder.forProduct2("name", "value") { t => (t.name, t.value) } + given TraceableValue[List[Tag]] = TraceableValue.stringToTraceValue.contramap(_.asJson.noSpaces) +} given[B <: Newtype[A]#Type, A: Encoder](using Bijection[A, B]): Encoder[B] = Encoder[A].contramap(summon[Bijection[A, B]].from) given[B <: Newtype[A]#Type, A: Decoder](using Bijection[A, B]): Decoder[B] = 
Decoder[A].map(summon[Bijection[A, B]].to) + +given[B <: Newtype[A]#Type, A: Empty](using Bijection[A, B]): Empty[B] = new Empty[B] { + override def empty: B = summon[Bijection[A, B]].to(Empty[A].empty) +} + +object TraceableValueInstances extends LowPriorityTraceableValueInstances { + given[B <: Newtype[A]#Type, A: TraceableValue](using Bijection[A, B]): TraceableValue[B] = TraceableValue[A].contramap(summon[Bijection[A, B]].from) + given[B <: monix.newtypes.Newtype[A]#Type, A: TraceableValue](using monix.newtypes.HasExtractor.Aux[B, A]): TraceableValue[B] = TraceableValue[A].contramap(summon[monix.newtypes.HasExtractor.Aux[B, A]].extract) + given[A: Empty : TraceableValue]: TraceableValue[Option[A]] = + TraceableValue[A].contramap(_.getOrElse(Empty[A].empty)) + @targetName("given_TraceableValue_A_by_schema") + given[A](using schemaA: Schema[A]): TraceableValue[A] = + TraceableValue.stringToTraceValue.contramap(asJsonString(schemaA)) + @targetName("given_TraceableValue_List_A_by_schema") + given[A](using schemaA: Schema[A]): TraceableValue[List[A]] = + TraceableValue.stringToTraceValue.contramap { + asJsonString(Schema.list(schemaA)) + } + @targetName("given_TraceableValue_Option_List_A_by_schema") + given[A](using schemaA: Schema[A]): TraceableValue[Option[List[A]]] = + TraceableValue.stringToTraceValue.contramap { + asJsonString(Schema.list(schemaA).option) + } + + @targetName("given_TraceableValue_List_A_as_json") + given[A: Encoder]: TraceableValue[List[A]] = + TraceableValue.stringToTraceValue.contramap(_.asJson.noSpaces) +} + +trait LowPriorityTraceableValueInstances { + @targetName("given_TraceableValue_Option_A_by_schema") + given[A](using schemaA: Schema[A]): TraceableValue[Option[A]] = + TraceableValue.stringToTraceValue.contramap { + asJsonString(schemaA.option) + } +} + + +private def asJsonString[A](schema: Schema[A]) + (a: A): String = + payloadCodecs + .withJsoniterWriterConfig(WriterConfig.withIndentionStep(0)) + .encoders + .fromSchema(schema) + .encode(a) + .toUTF8String diff --git a/core/src/main/scala/com/dwolla/aws/sns/SnsAlg.scala b/core/src/main/scala/com/dwolla/aws/sns/SnsAlg.scala index 0108c66..86ede0d 100644 --- a/core/src/main/scala/com/dwolla/aws/sns/SnsAlg.scala +++ b/core/src/main/scala/com/dwolla/aws/sns/SnsAlg.scala @@ -1,15 +1,42 @@ -package com.dwolla.aws.sns +package com.dwolla.aws +package sns import cats.* import cats.syntax.all.* -import org.typelevel.log4cats.* +import cats.tagless.Trivial +import cats.tagless.aop.Aspect import com.amazonaws.sns.* +import com.dwolla.aws.TraceableValueInstances.given +import natchez.TraceableValue +import org.typelevel.log4cats.* trait SnsAlg[F[_]] { def publish(topic: TopicARN, message: Message): F[Unit] } object SnsAlg { + given Aspect[SnsAlg, TraceableValue, Trivial] = new Aspect[SnsAlg, TraceableValue, Trivial] { + override def weave[F[_]](af: SnsAlg[F]): SnsAlg[[A] =>> Aspect.Weave[F, TraceableValue, Trivial, A]] = + new SnsAlg[[A] =>> Aspect.Weave[F, TraceableValue, Trivial, A]] { + override def publish(topic: TopicARN, message: Message): Aspect.Weave[F, TraceableValue, Trivial, Unit] = + Aspect.Weave( + "SnsAlg", + List(List( + Aspect.Advice.byValue[TraceableValue, TopicARN]("topic", topic), + Aspect.Advice.byValue[TraceableValue, Message]("message", message), + )), + Aspect.Advice[F, Trivial, Unit]("publish", af.publish(topic, message)) + ) + } + + override def mapK[F[_], G[_]](af: SnsAlg[F]) + (fk: F ~> G): SnsAlg[G] = + new SnsAlg[G] { + override def publish(topic: TopicARN, message: Message): G[Unit] = + 
fk(af.publish(topic, message)) + } + } + def apply[F[_] : Monad : LoggerFactory](client: SNS[F]): SnsAlg[F] = new SnsAlg[F] { override def publish(topic: TopicARN, message: Message): F[Unit] = LoggerFactory[F].create.flatMap { case given Logger[F] => diff --git a/core/src/main/scala/com/dwolla/tracing/mtl/LocalSpan.scala b/core/src/main/scala/com/dwolla/tracing/mtl/LocalSpan.scala new file mode 100644 index 0000000..d1fdf9a --- /dev/null +++ b/core/src/main/scala/com/dwolla/tracing/mtl/LocalSpan.scala @@ -0,0 +1,41 @@ +package com.dwolla.tracing.mtl + +import cats.* +import cats.effect.{Trace as _, *} +import cats.mtl.Local +import natchez.Span + +/** + * hopefully https://github.com/typelevel/cats-effect/pull/3429 lands and + * this all goes away when we update to cats-effect 3.6.0 + */ +object LocalSpan { + /** + * Given an `IOLocal[E]`, provides a `Local[IO, E]`. + * + * Copied from [[https://github.com/armanbilge/oxidized/blob/412be9cd0a60b901fd5f9157ea48bda8632c5527/core/src/main/scala/oxidized/instances/io.scala#L34-L43 armanbilge/oxidized]] + * but hopefully this instance can be brought into cats-effect someday and + * removed here. See [[https://github.com/typelevel/cats-effect/issues/3385 typelevel/cats-effect#3385]] + * for more discussion. + * + * TODO remove if made more widely available upstream + * + * @param ioLocal the `IOLocal` that propagates the state of the `E` element + * @tparam E the type of state to propagate + * @return a `Local[IO, E]` backed by the given `IOLocal[E]` + */ + def toLocal[E](ioLocal: IOLocal[E]): Local[IO, E] = + new Local[IO, E] { + override def local[A](fa: IO[A])(f: E => E): IO[A] = + ioLocal.get.flatMap { initial => + ioLocal.set(f(initial)) >> fa.guarantee(ioLocal.set(initial)) + } + + override def applicative: Applicative[IO] = IO.asyncForIO + + override def ask[E2 >: E]: IO[E2] = ioLocal.get + } + + def apply(initial: Span[IO] = Span.noop[IO]): IO[Local[IO, Span[IO]]] = + IOLocal(initial).map(toLocal) +} diff --git a/draining/src/main/scala/com/dwolla/autoscaling/ecs/draining/TerminationEventHandler.scala b/draining/src/main/scala/com/dwolla/autoscaling/ecs/draining/TerminationEventHandler.scala index 556b756..1b3f0d1 100644 --- a/draining/src/main/scala/com/dwolla/autoscaling/ecs/draining/TerminationEventHandler.scala +++ b/draining/src/main/scala/com/dwolla/autoscaling/ecs/draining/TerminationEventHandler.scala @@ -1,15 +1,22 @@ package com.dwolla.autoscaling.ecs.draining import cats.* -import cats.effect.* +import cats.effect.std.Random +import cats.effect.{Trace as _, *} +import cats.mtl.Local import com.amazonaws.autoscaling.AutoScaling import com.amazonaws.ecs.ECS import com.amazonaws.sns.SNS import com.dwolla.aws.autoscaling.{AutoScalingAlg, LifecycleHookHandler} -import com.dwolla.aws.ecs.EcsAlg +import com.dwolla.aws.ecs.{EcsAlg, given} import com.dwolla.aws.sns.SnsAlg +import com.dwolla.tracing.mtl.LocalSpan +import com.dwolla.tracing.syntax.* import feral.lambda.events.SnsEvent import feral.lambda.{INothing, IOLambda, LambdaEnv} +import natchez.Span +import natchez.mtl.given +import natchez.xray.XRay import org.http4s.ember.client.EmberClientBuilder import org.typelevel.log4cats.LoggerFactory import org.typelevel.log4cats.slf4j.Slf4jFactory @@ -21,11 +28,14 @@ class TerminationEventHandler extends IOLambda[SnsEvent, INothing] { for { client <- EmberClientBuilder.default[IO].build given LoggerFactory[IO] = Slf4jFactory.create[IO] + given Local[IO, Span[IO]] <- LocalSpan().toResource + given Random[IO] <- 
Random.scalaUtilRandom[IO].toResource + entryPoint <- XRay.entryPoint[IO]() awsEnv <- AwsEnvironment.default(client, AwsRegion.US_WEST_2) - ecs <- AwsClient(ECS, awsEnv).map(EcsAlg(_)) + ecs <- AwsClient(ECS, awsEnv).map(_.traceWithInputsAndOutputs).map(EcsAlg(_)) autoscalingClient <- AwsClient(AutoScaling, awsEnv) - sns <- AwsClient(SNS, awsEnv).map(SnsAlg[IO](_)) - autoscaling = AutoScalingAlg[IO](autoscalingClient, sns) + sns <- AwsClient(SNS, awsEnv).map(SnsAlg[IO](_).traceWithInputs) + autoscaling = AutoScalingAlg[IO](autoscalingClient, sns).traceWithInputs bridgeFunction = TerminationEventBridge(ecs, autoscaling) - } yield LifecycleHookHandler[IO](bridgeFunction) + } yield LifecycleHookHandler(entryPoint, "TerminationEventHandler")(bridgeFunction) } diff --git a/draining/src/test/scala/com/dwolla/autoscaling/ecs/draining/TestApp.scala b/draining/src/test/scala/com/dwolla/autoscaling/ecs/draining/TestApp.scala index 20553c3..59ad55d 100644 --- a/draining/src/test/scala/com/dwolla/autoscaling/ecs/draining/TestApp.scala +++ b/draining/src/test/scala/com/dwolla/autoscaling/ecs/draining/TestApp.scala @@ -3,33 +3,45 @@ package com.dwolla.autoscaling.ecs.draining import cats.* import cats.effect.* import cats.effect.std.Console +import cats.mtl.Local import cats.syntax.all.* import com.amazonaws.ecs.ECS -import com.dwolla.aws.ecs.EcsAlg +import com.dwolla.aws.ecs.{EcsAlg, given} +import com.dwolla.tracing.mtl.LocalSpan +import com.dwolla.tracing.syntax.* +import com.dwolla.tracing.{DwollaEnvironment, OpenTelemetryAtDwolla} import fs2.Stream +import natchez.Span +import natchez.mtl.given import org.http4s.ember.client.EmberClientBuilder import org.typelevel.log4cats.* import smithy4s.aws.* import smithy4s.aws.kernel.AwsRegion -object TestApp extends IOApp.Simple { - override def run: IO[Unit] = - Stream.resource { - for { - client <- EmberClientBuilder.default[IO].build - given LoggerFactory[IO] = new ConsoleLogger[IO] - awsEnv <- AwsEnvironment.default(client, AwsRegion.US_WEST_2) - ecs <- AwsClient(ECS, awsEnv).map(EcsAlg(_)) - } yield ecs - } - .flatMap { ecs => - ecs.listClusterArns - .filter(_.value.contains("Production")) - .flatMap(ecs.listContainerInstances) +object TestApp extends ResourceApp.Simple { + override def run: Resource[IO, Unit] = + OpenTelemetryAtDwolla[IO]("ecs-autoscaling-draining-hook", DwollaEnvironment.Local) + .flatMap(_.root("ECS Test App")) + .evalMap(LocalSpan(_)) + .evalMap { case given Local[IO, Span[IO]] => + Stream.resource { + for { + client <- EmberClientBuilder.default[IO].build + given LoggerFactory[IO] = new ConsoleLogger[IO] + awsEnv <- AwsEnvironment.default(client, AwsRegion.US_WEST_2) + ecs <- AwsClient(ECS, awsEnv).map(_.traceWithInputsAndOutputs).map(EcsAlg(_)) + } yield ecs + } + .flatMap { ecs => + ecs.listClusterArns + .filter(_.value.contains("Production")) + .flatMap(ecs.listContainerInstances) + } + .evalMap(c => IO.println(c)) + .compile + .drain } - .evalMap(c => IO.println(c)) - .compile - .drain + } class ConsoleLogger[F[_] : Applicative : Console] extends LoggerFactory[F] { diff --git a/registrator-health-check/src/main/scala/com/dwolla/autoscaling/ecs/registrator/ScaleOutPendingEventHandler.scala b/registrator-health-check/src/main/scala/com/dwolla/autoscaling/ecs/registrator/ScaleOutPendingEventHandler.scala index 64956a0..adf34e1 100644 --- a/registrator-health-check/src/main/scala/com/dwolla/autoscaling/ecs/registrator/ScaleOutPendingEventHandler.scala +++ 
b/registrator-health-check/src/main/scala/com/dwolla/autoscaling/ecs/registrator/ScaleOutPendingEventHandler.scala @@ -1,7 +1,9 @@ package com.dwolla.autoscaling.ecs.registrator import cats.* -import cats.effect.* +import cats.effect.std.Random +import cats.effect.{Trace as _, *} +import cats.mtl.Local import com.amazonaws.autoscaling.AutoScaling import com.amazonaws.cloudformation.CloudFormation import com.amazonaws.ec2.EC2 @@ -10,10 +12,15 @@ import com.amazonaws.sns.SNS import com.dwolla.aws.autoscaling.{AutoScalingAlg, LifecycleHookHandler} import com.dwolla.aws.cloudformation.CloudFormationAlg import com.dwolla.aws.ec2.Ec2Alg -import com.dwolla.aws.ecs.EcsAlg +import com.dwolla.aws.ecs.{EcsAlg, given} import com.dwolla.aws.sns.SnsAlg +import com.dwolla.tracing.mtl.LocalSpan +import com.dwolla.tracing.syntax.* import feral.lambda.events.SnsEvent import feral.lambda.{INothing, IOLambda, LambdaEnv} +import natchez.mtl.given +import natchez.xray.XRay +import natchez.Span import org.http4s.ember.client.EmberClientBuilder import org.typelevel.log4cats.LoggerFactory import org.typelevel.log4cats.slf4j.Slf4jFactory @@ -25,13 +32,16 @@ class ScaleOutPendingEventHandler extends IOLambda[SnsEvent, INothing] { for { client <- EmberClientBuilder.default[IO].build given LoggerFactory[IO] = Slf4jFactory.create[IO] + given Local[IO, Span[IO]] <- LocalSpan().toResource + given Random[IO] <- Random.scalaUtilRandom[IO].toResource + entryPoint <- XRay.entryPoint[IO]() awsEnv <- AwsEnvironment.default(client, AwsRegion.US_WEST_2) - ecs <- AwsClient(ECS, awsEnv).map(EcsAlg(_)) + ecs <- AwsClient(ECS, awsEnv).map(_.traceWithInputsAndOutputs).map(EcsAlg(_)) autoscalingClient <- AwsClient(AutoScaling, awsEnv) sns <- AwsClient(SNS, awsEnv).map(SnsAlg[IO](_)) ec2Client <- AwsClient(EC2, awsEnv).map(Ec2Alg[IO](_)) cloudformationClient <- AwsClient(CloudFormation, awsEnv).map(CloudFormationAlg[IO](_)) autoscaling = AutoScalingAlg[IO](autoscalingClient, sns) bridgeFunction = ScaleOutPendingEventBridge(ecs, autoscaling, ec2Client, cloudformationClient) - } yield LifecycleHookHandler[IO](bridgeFunction) + } yield LifecycleHookHandler(entryPoint, "ScaleOutPendingEventHandler")(bridgeFunction) }
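
For readers skimming the diff, the tracing wiring described in the commit message is easiest to see in one place. The sketch below condenses the TerminationEventHandler change; it reuses only names that appear in this patch (LocalSpan, XRay.entryPoint, the traceWithInputs / traceWithInputsAndOutputs syntax from natchez-tagless, TerminationEventBridge), the smithy4s.aws imports are assumed to match those shown in TestApp, and the handler signature is assumed to match feral's IOLambda — treat it as an illustration of the pattern, not additional code to apply.

package com.dwolla.autoscaling.ecs.draining

import cats.effect.std.Random
import cats.effect.{Trace as _, *}
import cats.mtl.Local
import com.amazonaws.autoscaling.AutoScaling
import com.amazonaws.ecs.ECS
import com.amazonaws.sns.SNS
import com.dwolla.aws.autoscaling.{AutoScalingAlg, LifecycleHookHandler}
import com.dwolla.aws.ecs.{EcsAlg, given}
import com.dwolla.aws.sns.SnsAlg
import com.dwolla.tracing.mtl.LocalSpan
import com.dwolla.tracing.syntax.*
import feral.lambda.events.SnsEvent
import feral.lambda.{INothing, IOLambda, LambdaEnv}
import natchez.Span
import natchez.mtl.given
import natchez.xray.XRay
import org.http4s.ember.client.EmberClientBuilder
import org.typelevel.log4cats.LoggerFactory
import org.typelevel.log4cats.slf4j.Slf4jFactory
import smithy4s.aws.*
import smithy4s.aws.kernel.AwsRegion

class TerminationEventHandler extends IOLambda[SnsEvent, INothing] {
  def handler: Resource[IO, LambdaEnv[IO, SnsEvent] => IO[Option[INothing]]] =
    for {
      client <- EmberClientBuilder.default[IO].build
      given LoggerFactory[IO] = Slf4jFactory.create[IO]

      // an IOLocal-backed Local[IO, Span[IO]], so natchez-mtl can thread the current
      // span through EcsAlg's Trace[F] calls without passing spans explicitly
      given Local[IO, Span[IO]] <- LocalSpan().toResource
      given Random[IO] <- Random.scalaUtilRandom[IO].toResource

      // natchez-xray reports spans to the local X-Ray daemon, which Lambda supports
      // out of the box, so no OTel collector (or collector Lambda Layer) is needed
      entryPoint <- XRay.entryPoint[IO]()

      awsEnv <- AwsEnvironment.default(client, AwsRegion.US_WEST_2)

      // the Aspect instances added in this patch let natchez-tagless record each
      // call's arguments (and, for ECS, results) as span attributes
      ecs <- AwsClient(ECS, awsEnv).map(_.traceWithInputsAndOutputs).map(EcsAlg(_))
      autoscalingClient <- AwsClient(AutoScaling, awsEnv)
      sns <- AwsClient(SNS, awsEnv).map(SnsAlg[IO](_).traceWithInputs)
      autoscaling = AutoScalingAlg[IO](autoscalingClient, sns).traceWithInputs
      bridgeFunction = TerminationEventBridge(ecs, autoscaling)

      // LifecycleHookHandler now takes the EntryPoint and opens a root span named
      // "TerminationEventHandler" around each SNS invocation
    } yield LifecycleHookHandler(entryPoint, "TerminationEventHandler")(bridgeFunction)
}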