From 3b97370be9714b15509c77f618b579a9e554f255 Mon Sep 17 00:00:00 2001 From: Brian Holt Date: Thu, 3 Oct 2024 12:01:36 -0500 Subject: [PATCH] spans created by TracedThriftServer should be Server spans by default --- .../async/finagle/TracedThriftServer.scala | 108 +++++++++++------- project/AsyncUtilsBuildPlugin.scala | 2 +- 2 files changed, 68 insertions(+), 42 deletions(-) diff --git a/finagle-natchez/src/main/scala/com/dwolla/util/async/finagle/TracedThriftServer.scala b/finagle-natchez/src/main/scala/com/dwolla/util/async/finagle/TracedThriftServer.scala index 09d270a07..f96a4367e 100644 --- a/finagle-natchez/src/main/scala/com/dwolla/util/async/finagle/TracedThriftServer.scala +++ b/finagle-natchez/src/main/scala/com/dwolla/util/async/finagle/TracedThriftServer.scala @@ -1,20 +1,19 @@ package com.dwolla.util.async.finagle -import cats._ -import cats.effect._ +import cats.* +import cats.effect.* import cats.effect.std.{Dispatcher, Env} -import cats.mtl._ -import cats.syntax.all._ -import cats.tagless._ -import cats.tagless.aop._ -import cats.tagless.implicits._ +import cats.mtl.* +import cats.syntax.all.* +import cats.tagless.aop.* +import cats.tagless.implicits.* import com.comcast.ip4s.{IpAddress, SocketAddress} -import com.dwolla.util.async.finagle.HigherKindedToMethodPerEndpoint._ -import com.dwolla.util.async.twitter._ -import com.twitter.finagle._ +import com.dwolla.util.async.finagle.HigherKindedToMethodPerEndpoint.* +import com.dwolla.util.async.twitter.* +import com.twitter.finagle.* import com.twitter.finagle.tracing.TraceId import com.twitter.util.{Future, Promise} -import natchez._ +import natchez.* import scala.concurrent.ExecutionContext import scala.util.{Failure, Success} @@ -44,39 +43,35 @@ object TracedThriftServer { ) (implicit ec: ExecutionContext, LocalSpan: Local[F, Span[F]]): Resource[F, ListeningServer] = + TracedThriftServer(addr, label, iface, entryPoint, Span.Options.Defaults.withSpanKind(Span.SpanKind.Server)) + + /** + * Builds a `ListeningServer` with Zipkin and Natchez tracing enabled. The `Thrift[F]` implementation will + * + * @param addr the socket address at which to listen for connections. (Typically `0.0.0.0:port`) + * @param label the name to assign the service in the Zipkin traces + * @param iface the Thrift method-per-endpoint implementation. Must be implemented in `Kleisli[F, Span[F], *]` so the span continued from Zipkin can be injected into the program. + * @param entryPoint the Natchez `EntryPoint` responsible for creating `Span` instances based on the Trace IDs coming from Finagle/Zipkin + * @param spanOptions allows the caller to set specific Span options that should be set on each new trace + * @param ec `ExecutionContext` where Twitter Futures will be completed when the Scala Future output by `Dispatcher.unsafeToFuture` completes + * @param LocalSpan `Local[F, Span[F]]` used to continue or start a new root span when a Thrift request is received + * @tparam F the effect in which to operate, which must have `Async[F]` and `Env[F]` instances available + * @tparam Thrift the higher-kinded MethodPerEndpoint Thrift algebra generated by scrooge and modified by the `AddCatsTaglessInstances` scalafix + * @return a `Resource[F, ListeningServer]` managing the lifecycle of the underlying Finagle server + */ + def apply[F[_] : Async : Env, Thrift[_[_]] : HigherKindedToMethodPerEndpoint : Instrument](addr: SocketAddress[IpAddress], + label: String, + iface: Thrift[F], + entryPoint: EntryPoint[F], + spanOptions: Span.Options, + ) + (implicit ec: ExecutionContext, + LocalSpan: Local[F, Span[F]]): Resource[F, ListeningServer] = Dispatcher.parallel[F] - .map(unsafeMapKToFuture(_, iface.instrument, entryPoint)) + .map(new UnsafeInstrumentationToFuture[F, Thrift](_, entryPoint, spanOptions)) + .map(iface.instrument.mapK(_)) .flatMap(t => Resource.make(acquire(addr, label, t))(release[F])) - private def unsafeMapKToFuture[F[_] : Async, Thrift[_[_]] : FunctorK](dispatcher: Dispatcher[F], - iface: Thrift[Instrumentation[F, *]], - entryPoint: EntryPoint[F], - ) - (implicit ec: ExecutionContext, - LocalSpan: Local[F, Span[F]]): Thrift[Future] = - iface.mapK(new (Instrumentation[F, *] ~> Future) { - override def apply[A](fa: Instrumentation[F, A]): Future[A] = - currentTraceId().flatMap { maybeTraceId => - val p = Promise[A]() - - dispatcher.unsafeToFuture { - entryPoint.continueOrElseRoot( - s"${fa.algebraName}.${fa.methodName}", - maybeTraceId - .map(ZipkinKernel.asKernel) - .getOrElse(Kernel(Map.empty)) - ) - .use(Local[F, Span[F]].scope(fa.value)) - } - .onComplete { - case Success(a) => p.setValue(a) - case Failure(ex) => p.setException(ex) - } - - p - } - }) - private def currentTraceId(): Future[Option[TraceId]] = Future(com.twitter.finagle.tracing.Trace.idOption) @@ -96,4 +91,35 @@ object TracedThriftServer { private def release[F[_] : Async](s: ListeningServer): F[Unit] = liftFuture(Sync[F].delay(s.close())) + + private class UnsafeInstrumentationToFuture[F[_] : MonadCancelThrow, Thrift[_[_]]](dispatcher: Dispatcher[F], + entryPoint: EntryPoint[F], + spanOptions: Span.Options, + ) + (implicit ec: ExecutionContext, + LocalSpan: Local[F, Span[F]]) extends (Instrumentation[F, *] ~> Future) { + override def apply[A](fa: Instrumentation[F, A]): Future[A] = + currentTraceId().flatMap { maybeTraceId => + val p = Promise[A]() + + dispatcher.unsafeToFuture { + entryPoint + .continueOrElseRoot( + name = s"${fa.algebraName}.${fa.methodName}", + kernel = maybeTraceId + .map(ZipkinKernel.asKernel) + .getOrElse(Kernel(Map.empty)), + options = spanOptions + ) + .use(Local[F, Span[F]].scope(fa.value)) + } + .onComplete { + case Success(a) => p.setValue(a) + case Failure(ex) => p.setException(ex) + } + + p + } + } + } diff --git a/project/AsyncUtilsBuildPlugin.scala b/project/AsyncUtilsBuildPlugin.scala index 65a0c430e..f956ed42e 100644 --- a/project/AsyncUtilsBuildPlugin.scala +++ b/project/AsyncUtilsBuildPlugin.scala @@ -339,7 +339,7 @@ object AsyncUtilsBuildPlugin extends AutoPlugin { ), startYear := Option(2021), sonatypeCredentialHost := xerial.sbt.Sonatype.sonatypeLegacy, - tlBaseVersion := "1.1", + tlBaseVersion := "1.2", tlCiReleaseBranches := Seq("main"), mergifyRequiredJobs ++= Seq("validate-steward"), mergifyStewardConfig ~= { _.map {