Skip to content

Commit

Permalink
add XRay tracing using Natchez
Browse files Browse the repository at this point in the history
this uses the X-Ray backend directly instead of using OTel, because
AWS Lambda supports X-Ray by default, and including an OTel collector is
extra work. It sounds like including the collector can be done by
building it in as a Lambda Layer, but that's not something I've done
before, and I think this will work for now.
  • Loading branch information
bpholt committed Nov 29, 2023
1 parent 83f2464 commit 46492bd
Show file tree
Hide file tree
Showing 16 changed files with 601 additions and 127 deletions.
11 changes: 10 additions & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,13 @@ lazy val `autoscaling-ecs-core`: Project = project
"io.circe" %% "circe-parser" % "0.14.6",
"io.monix" %% "newtypes-core" % "0.2.3",
"io.monix" %% "newtypes-circe-v0-14" % "0.2.3",
"org.tpolecat" %% "natchez-core" % "0.3.4",
"org.typelevel" %% "cats-tagless-core" % "0.15.0",
"org.typelevel" %% "alleycats-core" % "2.10.0",
"com.dwolla" %% "natchez-tagless" % "0.2.4",
)
}
},
scalacOptions += "-Xmax-inlines:40", // because Task from ECS has 33 fields
)
.dependsOn(
`smithy4s-generated`,
Expand All @@ -86,6 +91,7 @@ lazy val `core-tests` = project
"org.typelevel" %% "scalacheck-effect-munit" % "2.0.0-M2" % Test,
"org.typelevel" %% "log4cats-noop" % "2.6.0" % Test,
"org.typelevel" %% "mouse" % "1.2.2" % Test,
"org.tpolecat" %% "natchez-noop" % "0.3.4" % Test,
"io.circe" %% "circe-literal" % "0.14.6" % Test,
"io.circe" %% "circe-testing" % "0.14.6" % Test,
"com.47deg" %% "scalacheck-toolbox-datetime" % "0.7.0" % Test exclude("joda-time", "joda-time"),
Expand All @@ -110,6 +116,7 @@ lazy val `autoscaling-ecs-draining-lambda` = project
"org.typelevel" %% "feral-lambda" % "0.2.4",
"org.typelevel" %% "log4cats-slf4j" % "2.6.0",
"org.http4s" %% "http4s-ember-client" % "0.23.24",
"org.tpolecat" %% "natchez-xray" % "0.3.4",
"com.amazonaws" % "aws-lambda-java-log4j2" % "1.6.0" % Runtime,
"org.apache.logging.log4j" % "log4j-slf4j-impl" % "2.22.0" % Runtime,
"org.typelevel" %% "cats-effect-testkit" % "3.5.2" % Test,
Expand All @@ -120,6 +127,7 @@ lazy val `autoscaling-ecs-draining-lambda` = project
"io.circe" %% "circe-literal" % "0.14.6" % Test,
"io.circe" %% "circe-testing" % "0.14.6" % Test,
"org.scala-lang.modules" %% "scala-java8-compat" % "1.0.2" % Test,
"com.dwolla" %% "dwolla-otel-natchez" % "0.2.2" % Test,
)
},
topLevelDirectory := None,
Expand All @@ -144,6 +152,7 @@ lazy val `registrator-health-check-lambda` = project
"org.typelevel" %% "log4cats-slf4j" % "2.6.0",
"org.http4s" %% "http4s-ember-client" % "0.23.21",
"org.typelevel" %% "mouse" % "1.2.2",
"org.tpolecat" %% "natchez-xray" % "0.3.4",
"com.amazonaws" % "aws-lambda-java-log4j2" % "1.6.0" % Runtime,
"org.apache.logging.log4j" % "log4j-slf4j-impl" % "2.22.0" % Runtime,
"org.typelevel" %% "cats-effect-testkit" % "3.5.2" % Test,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import _root_.io.circe.*
import _root_.io.circe.literal.*
import _root_.io.circe.syntax.*
import cats.effect.*
import cats.mtl.Local
import cats.syntax.all.*
import com.dwolla.aws
import com.dwolla.aws.autoscaling.given
Expand All @@ -16,6 +17,9 @@ import org.scalacheck.effect.PropF.forAllF
import org.typelevel.log4cats.LoggerFactory
import org.typelevel.log4cats.noop.NoOpFactory
import com.amazonaws.sns.TopicARN
import com.dwolla.tracing.mtl.LocalSpan
import natchez.Span
import natchez.noop.NoopEntrypoint

class LifecycleHookHandlerSpec
extends CatsEffectSuite
Expand Down Expand Up @@ -74,10 +78,11 @@ class LifecycleHookHandlerSpec
arbSubject: Option[String],
) =>
for {
given Local[IO, Span[IO]] <- LocalSpan()
deferredLifecycleHookNotification <- Deferred[IO, LifecycleHookNotification]
deferredSnsTopicArn <- Deferred[IO, TopicARN]

eventHandler = LifecycleHookHandler { case (arn, notif) =>
eventHandler = LifecycleHookHandler(NoopEntrypoint[IO](), "TestHook") { case (arn, notif) =>
deferredLifecycleHookNotification.complete(notif) >>
deferredSnsTopicArn.complete(arn).void
}
Expand All @@ -100,12 +105,12 @@ class LifecycleHookHandlerSpec
arbContext: Context[IO],
arbSubject: Option[String],
) =>
val eventHandler = LifecycleHookHandler { case (arn, notif) =>
IO(fail(s"TerminationEventHandler should not be called for test messages, but was called with $arn and $notif"))
}

for {
given Local[IO, Span[IO]] <- LocalSpan()
snsEvent <- snsMessage(arbSnsTopicArn, testNotification, arbSubject).as[SnsEvent].liftTo[IO]
eventHandler = LifecycleHookHandler(NoopEntrypoint[IO](), "TestHook") { case (arn, notif) =>
IO(fail(s"TerminationEventHandler should not be called for test messages, but was called with $arn and $notif"))
}
output <- eventHandler(LambdaEnv.pure(snsEvent, arbContext))
} yield {
assertEquals(output, None)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import com.dwolla.aws.ArbitraryPagination
import com.dwolla.aws.ecs.*
import fs2.{Chunk, Stream}
import munit.{CatsEffectSuite, ScalaCheckEffectSuite}
import natchez.Trace.Implicits.noop
import org.scalacheck.effect.PropF.forAllF
import org.typelevel.log4cats.LoggerFactory
import org.typelevel.log4cats.noop.NoOpFactory
Expand Down
Original file line number Diff line number Diff line change
@@ -1,14 +1,20 @@
package com.dwolla.aws.autoscaling
package com.dwolla.aws
package autoscaling

import cats.effect.*
import cats.*
import cats.effect.syntax.all.*
import cats.effect.{Trace as _, *}
import cats.syntax.all.*
import cats.tagless.Trivial
import cats.tagless.aop.Aspect
import com.amazonaws.autoscaling.{AutoScaling, LifecycleActionResult, XmlStringMaxLen19}
import com.amazonaws.ec2.InstanceId
import com.amazonaws.sns.*
import com.dwolla.aws.TraceableValueInstances.given
import com.dwolla.aws.autoscaling.LifecycleState.*
import com.dwolla.aws.sns.*
import io.circe.syntax.*
import natchez.TraceableValue
import org.typelevel.log4cats.{Logger, LoggerFactory}

import scala.concurrent.duration.*
Expand All @@ -22,6 +28,40 @@ trait AutoScalingAlg[F[_]] {
}

object AutoScalingAlg {
given Aspect[AutoScalingAlg, TraceableValue, Trivial] = new Aspect[AutoScalingAlg, TraceableValue, Trivial] {
override def weave[F[_]](af: AutoScalingAlg[F]): AutoScalingAlg[[A] =>> Aspect.Weave[F, TraceableValue, Trivial, A]] =
new AutoScalingAlg[[A] =>> Aspect.Weave[F, TraceableValue, Trivial, A]] {
override def pauseAndRecurse(topic: TopicARN, lifecycleHookNotification: LifecycleHookNotification, onlyIfInState: LifecycleState): Aspect.Weave[F, TraceableValue, Trivial, Unit] =
new Aspect.Weave[F, TraceableValue, Trivial, Unit](
"AutoScalingAlg",
List(List(
Aspect.Advice.byValue[TraceableValue, TopicARN]("topic", topic),
Aspect.Advice.byValue[TraceableValue, LifecycleHookNotification]("lifecycleHookNotification", lifecycleHookNotification),
Aspect.Advice.byValue[TraceableValue, LifecycleState]("onlyIfInState", onlyIfInState),
)),
Aspect.Advice[F, Trivial, Unit]("pauseAndRecurse", af.pauseAndRecurse(topic, lifecycleHookNotification, onlyIfInState))
)

override def continueAutoScaling(l: LifecycleHookNotification): Aspect.Weave[F, TraceableValue, Trivial, Unit] =
new Aspect.Weave[F, TraceableValue, Trivial, Unit](
"AutoScalingAlg",
List(List(
Aspect.Advice.byValue[TraceableValue, LifecycleHookNotification]("l", l),
)),
Aspect.Advice[F, Trivial, Unit]("continueAutoScaling", af.continueAutoScaling(l))
)
}

override def mapK[F[_], G[_]](af: AutoScalingAlg[F])(fk: F ~> G): AutoScalingAlg[G] =
new AutoScalingAlg[G] {
override def pauseAndRecurse(topic: TopicARN, lifecycleHookNotification: LifecycleHookNotification, onlyIfInState: LifecycleState): G[Unit] =
fk(af.pauseAndRecurse(topic, lifecycleHookNotification, onlyIfInState))

override def continueAutoScaling(l: LifecycleHookNotification): G[Unit] =
fk(af.continueAutoScaling(l))
}
}

def apply[F[_] : Async : LoggerFactory](autoScalingClient: AutoScaling[F],
sns: SnsAlg[F]): AutoScalingAlg[F] =
new AutoScalingAlgImpl(autoScalingClient, sns)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,25 +1,32 @@
package com.dwolla.aws.autoscaling

import cats.*
import cats.effect.{Trace as _, *}
import cats.mtl.Local
import cats.syntax.all.*
import com.amazonaws.sns.TopicARN
import com.dwolla.aws.sns.*
import feral.lambda.events.SnsEvent
import feral.lambda.{INothing, LambdaEnv}
import fs2.Stream
import natchez.{EntryPoint, Span}
import org.typelevel.log4cats.LoggerFactory
import com.dwolla.tracing.syntax.*

object LifecycleHookHandler {
def apply[F[_] : MonadThrow : LoggerFactory](eventBridge: (TopicARN, LifecycleHookNotification) => F[Unit])
(using fs2.Compiler[F, F]): LambdaEnv[F, SnsEvent] => F[Option[INothing]] = env =>
Stream.eval(env.event)
.map(_.records)
.flatMap(Stream.emits(_))
.map(_.sns)
.evalMap(ParseLifecycleHookNotification[F])
.unNone
.evalMap(eventBridge.tupled)
.compile
.drain
.as(None)
def apply[F[_] : MonadCancelThrow : LoggerFactory](entryPoint: EntryPoint[F], hookName: String)
(eventBridge: (TopicARN, LifecycleHookNotification) => F[Unit])
(using fs2.Compiler[F, F], Local[F, Span[F]]): LambdaEnv[F, SnsEvent] => F[Option[INothing]] = env =>
entryPoint.runInRoot(hookName) {
Stream.eval(env.event)
.map(_.records)
.flatMap(Stream.emits(_))
.map(_.sns)
.evalMap(ParseLifecycleHookNotification[F])
.unNone
.evalMap(eventBridge.tupled)
.compile
.drain
.as(None)
}
}
6 changes: 6 additions & 0 deletions core/src/main/scala/com/dwolla/aws/autoscaling/model.scala
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,10 @@ import com.amazonaws.autoscaling.*
import com.amazonaws.ec2.*
import com.dwolla.aws.AccountId
import io.circe.*
import io.circe.syntax.*
import monix.newtypes.NewtypeWrapped
import monix.newtypes.integrations.*
import natchez.TraceableValue

import java.time.Instant

Expand Down Expand Up @@ -78,6 +80,8 @@ object LifecycleHookNotification extends DerivedCirceCodec {
"LifecycleTransition",
"NotificationMetadata",
)(LifecycleHookNotification.apply)

given TraceableValue[LifecycleHookNotification] = TraceableValue.stringToTraceValue.contramap(_.asJson.noSpaces)
}
object TestNotification extends DerivedCirceCodec {
given Encoder[TestNotification] =
Expand Down Expand Up @@ -150,4 +154,6 @@ object LifecycleState {
}

def fromString(s: String): Option[LifecycleState] = maybeFromString.lift(s)

given TraceableValue[LifecycleState] = TraceableValue.stringToTraceValue.contramap(_.awsName)
}
Original file line number Diff line number Diff line change
@@ -1,9 +1,14 @@
package com.dwolla.aws.cloudformation
package com.dwolla.aws
package cloudformation

import cats.effect.*
import cats.syntax.all.*
import cats.tagless.aop.Aspect
import cats.~>
import com.amazonaws.cloudformation.*
import com.dwolla.aws.TraceableValueInstances.given
import monix.newtypes.NewtypeWrapped
import natchez.TraceableValue
import org.typelevel.log4cats.*
import smithy4s.http.UnknownErrorResponse

Expand All @@ -29,4 +34,25 @@ object CloudFormationAlg {
}
}
}

given Aspect[CloudFormationAlg, TraceableValue, TraceableValue] = new Aspect[CloudFormationAlg, TraceableValue, TraceableValue] {
override def weave[F[_]](af: CloudFormationAlg[F]): CloudFormationAlg[[A] =>> Aspect.Weave[F, TraceableValue, TraceableValue, A]] =
new CloudFormationAlg[[A] =>> Aspect.Weave[F, TraceableValue, TraceableValue, A]] {
override def physicalResourceIdFor(stack: StackArn, logicalResourceId: LogicalResourceId): Aspect.Weave[F, TraceableValue, TraceableValue, Option[PhysicalResourceId]] =
Aspect.Weave(
"CloudFormationAlg",
List(List(
Aspect.Advice.byValue[TraceableValue, StackArn]("stack", stack),
Aspect.Advice.byValue[TraceableValue, LogicalResourceId]("logicalResourceId", logicalResourceId),
)),
Aspect.Advice[F, TraceableValue, Option[PhysicalResourceId]]("physicalResourceIdFor", af.physicalResourceIdFor(stack, logicalResourceId))
)
}

override def mapK[F[_], G[_]](af: CloudFormationAlg[F])(fk: F ~> G): CloudFormationAlg[G] =
new CloudFormationAlg[G] {
override def physicalResourceIdFor(stack: StackArn, logicalResourceId: LogicalResourceId): G[Option[PhysicalResourceId]] =
fk(af.physicalResourceIdFor(stack, logicalResourceId))
}
}
}
28 changes: 26 additions & 2 deletions core/src/main/scala/com/dwolla/aws/ec2/Ec2Alg.scala
Original file line number Diff line number Diff line change
@@ -1,10 +1,15 @@
package com.dwolla.aws.ec2
package com.dwolla.aws
package ec2

import cats.effect.*
import cats.syntax.all.*
import cats.tagless.aop.Aspect
import cats.~>
import com.amazonaws.ec2.{Tag as _, *}
import com.dwolla.aws.*
import com.dwolla.aws.TraceableValueInstances.given
import natchez.TraceableValue
import org.typelevel.log4cats.*
import com.amazonaws.ec2.{Tag as _, *}

trait Ec2Alg[F[_]] {
def getTagsForInstance(id: InstanceId): F[List[Tag]]
Expand All @@ -29,6 +34,25 @@ object Ec2Alg {
}
}
}

given Aspect[Ec2Alg, TraceableValue, TraceableValue] = new Aspect[Ec2Alg, TraceableValue, TraceableValue] {
override def weave[F[_]](af: Ec2Alg[F]): Ec2Alg[[A] =>> Aspect.Weave[F, TraceableValue, TraceableValue, A]] =
new Ec2Alg[[A] =>> Aspect.Weave[F, TraceableValue, TraceableValue, A]] {
override def getTagsForInstance(id: InstanceId): Aspect.Weave[F, TraceableValue, TraceableValue, List[Tag]] =
Aspect.Weave[F, TraceableValue, TraceableValue, List[Tag]](
"Ec2Alg",
List(List(
Aspect.Advice.byValue[TraceableValue, InstanceId]("id", id),
)),
Aspect.Advice[F, TraceableValue, List[Tag]]("getTagsForInstance", af.getTagsForInstance(id))
)
}

override def mapK[F[_], G[_]](af: Ec2Alg[F])(fk: F ~> G): Ec2Alg[G] =
new Ec2Alg[G] {
override def getTagsForInstance(id: InstanceId): G[List[Tag]] = fk(af.getTagsForInstance(id))
}
}
}

extension [A] (maybeList: Option[List[A]]) {
Expand Down
Loading

0 comments on commit 46492bd

Please sign in to comment.