Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve Tracing #177

Merged
merged 3 commits into from
Jul 1, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 2 additions & 6 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,6 @@ lazy val `http4s-consul-middleware` = crossProject(JSPlatform, JVMPlatform)
.settings(
description := "http4s middleware to discover the host and port for an HTTP request using Consul",
tpolecatScalacOptions += ScalacOptions.release("8"),
tlMimaPreviousVersions ++= {
if (scalaVersion.value.startsWith("2.")) Set("0.1.0")
else Set.empty
},
libraryDependencies ++= {
val http4sVersion = "0.23.27"
val munitVersion = "1.0.0"
Expand All @@ -54,8 +50,9 @@ lazy val `http4s-consul-middleware` = crossProject(JSPlatform, JVMPlatform)
"org.typelevel" %%% "keypool" % "0.4.9",
"org.typelevel" %%% "case-insensitive" % "1.4.0",
"org.typelevel" %%% "cats-effect" % "3.5.4",
"org.typelevel" %%% "cats-mtl" % "1.4.0",
"org.tpolecat" %%% "natchez-core" % "0.3.5",
"org.tpolecat" %%% "natchez-http4s" % "0.5.0",
"org.tpolecat" %%% "natchez-mtl" % "0.3.5",
"org.typelevel" %%% "log4cats-noop" % log4catsVersion % Test,
"org.http4s" %%% "http4s-ember-client" % http4sVersion % Test,
"org.http4s" %%% "http4s-dsl" % http4sVersion % Test,
Expand All @@ -67,7 +64,6 @@ lazy val `http4s-consul-middleware` = crossProject(JSPlatform, JVMPlatform)
compilerPlugin("com.olegpy" %% "better-monadic-for" % "0.3.1"),
)
else Seq.empty)

},
)
.jvmSettings(
Expand Down
Original file line number Diff line number Diff line change
@@ -1,15 +1,16 @@
package com.dwolla.consul.examples

import cats.effect._
import natchez.Trace
import natchez.noop.NoopSpan
import natchez.Span
import natchez.mtl.natchezMtlTraceForLocal
import natchez.noop.NoopEntrypoint
import org.typelevel.log4cats.LoggerFactory
import org.typelevel.log4cats.noop.NoOpFactory

trait ConsulMiddlewareAppPlatform extends IOApp.Simple {
trait ConsulMiddlewareAppPlatform extends IOApp.Simple with LocalTracing {
private implicit val noOpFactory: LoggerFactory[IO] = NoOpFactory[IO]

override def run: IO[Unit] = Trace.ioTrace(NoopSpan[IO]()).flatMap { implicit trace =>
new ConsulMiddlewareApp[IO].run
override def run: IO[Unit] = IOLocal(Span.noop[IO]).map(catsMtlEffectLocalForIO(_)).flatMap { implicit L =>
new ConsulMiddlewareApp[IO](NoopEntrypoint[IO]()).run
}
}
Original file line number Diff line number Diff line change
@@ -1,15 +1,16 @@
package com.dwolla.consul.examples

import cats.effect._
import natchez.Trace
import natchez.noop.NoopSpan
import natchez.Span
import natchez.mtl.natchezMtlTraceForLocal
import natchez.noop.NoopEntrypoint
import org.typelevel.log4cats.LoggerFactory
import org.typelevel.log4cats.noop.NoOpFactory

trait ConsulServiceDiscoveryAlgAppPlatform extends IOApp.Simple {
trait ConsulServiceDiscoveryAlgAppPlatform extends IOApp.Simple with LocalTracing {
private implicit val noOpFactory: LoggerFactory[IO] = NoOpFactory[IO]

override def run: IO[Unit] = Trace.ioTrace(NoopSpan[IO]()).flatMap { implicit trace =>
new ConsulServiceDiscoveryAlgApp[IO].run
override def run: IO[Unit] = IOLocal(Span.noop[IO]).map(catsMtlEffectLocalForIO(_)).flatMap { implicit L =>
new ConsulServiceDiscoveryAlgApp[IO](NoopEntrypoint[IO]()).run
}
}
Original file line number Diff line number Diff line change
@@ -1,17 +1,17 @@
package com.dwolla.consul.examples

import cats.data._
import cats.syntax.all._
import cats.effect.{Trace => _, _}
import cats.syntax.all._
import io.jaegertracing.Configuration._
import natchez._
import natchez.jaeger._
import natchez.mtl._
import org.typelevel.log4cats.LoggerFactory
import org.typelevel.log4cats.slf4j.Slf4jFactory

import java.net.URI

trait ConsulMiddlewareAppPlatform extends IOApp.Simple {
trait ConsulMiddlewareAppPlatform extends IOApp.Simple with LocalTracing {
private def jaegerEntryPoint[F[_] : Sync]: Resource[F, EntryPoint[F]] =
Jaeger.entryPoint("ConsulMiddlewareApp", Either.catchNonFatal(new URI("http://localhost:16686")).toOption) { c =>
Sync[F].delay {
Expand All @@ -23,11 +23,16 @@ trait ConsulMiddlewareAppPlatform extends IOApp.Simple {

override def run: IO[Unit] =
jaegerEntryPoint[IO]
.flatMap(_.root("ConsulMiddlewareApp"))
.evalMap {
implicit val loggerFactory: LoggerFactory[IO] = Slf4jFactory.create[IO]
.flatMap { ep =>
ep.root("ConsulMiddlewareApp")
.evalMap {
implicit val loggerFactory: LoggerFactory[IO] = Slf4jFactory.create[IO]

new ConsulMiddlewareApp[ReaderT[IO, Span[IO], *]].run.run
IOLocal(_).flatMap { implicit l =>
new ConsulMiddlewareApp(ep).run
}
}
}
.use_

}
Original file line number Diff line number Diff line change
@@ -1,17 +1,17 @@
package com.dwolla.consul.examples

import cats.data.Kleisli
import cats.effect.{Trace => _, _}
import cats.syntax.all._
import io.jaegertracing.Configuration._
import natchez._
import natchez.jaeger._
import natchez.mtl.natchezMtlTraceForLocal
import org.typelevel.log4cats.LoggerFactory
import org.typelevel.log4cats.slf4j.Slf4jFactory

import java.net.URI

trait ConsulServiceDiscoveryAlgAppPlatform extends IOApp.Simple {
trait ConsulServiceDiscoveryAlgAppPlatform extends IOApp.Simple with LocalTracing {
private def jaegerEntryPoint[F[_] : Sync]: Resource[F, EntryPoint[F]] =
Jaeger.entryPoint("ConsulServiceDiscoveryAlgApp", Either.catchNonFatal(new URI("http://localhost:16686")).toOption) { c =>
Sync[F].delay {
Expand All @@ -23,11 +23,15 @@ trait ConsulServiceDiscoveryAlgAppPlatform extends IOApp.Simple {

override def run: IO[Unit] =
jaegerEntryPoint[IO]
.flatMap(_.root("ConsulServiceDiscoveryAlgApp"))
.evalMap {
implicit val loggerFactory: LoggerFactory[IO] = Slf4jFactory.create[IO]
.flatMap { ep =>
ep.root("ConsulServiceDiscoveryAlgApp")
.evalMap {
implicit val loggerFactory: LoggerFactory[IO] = Slf4jFactory.create[IO]

new ConsulServiceDiscoveryAlgApp[Kleisli[IO, Span[IO], *]].run.run
IOLocal(_).flatMap { implicit l =>
new ConsulServiceDiscoveryAlgApp(ep).run
}
}
}
.use_
}
Original file line number Diff line number Diff line change
@@ -1,17 +1,18 @@
package com.dwolla.consul

import cats.effect.kernel.Resource.ExitCase
import cats.{Monad, ~>}
import cats.effect.std.Random
import cats.effect.syntax.all._
import cats.effect.{Trace => _, _}
import cats.mtl.Local
import cats.syntax.all._
import cats.{Monad, ~>}
import com.dwolla.consul.ThirdPartyTypeCodecs._
import fs2.Stream
import io.circe.optics.JsonPath.root
import io.circe.{Decoder, Json}
import monocle.Traversal
import natchez.Trace
import natchez.{EntryPoint, Span, Trace}
import org.http4s.Method.GET
import org.http4s._
import org.http4s.circe.jsonOf
Expand Down Expand Up @@ -54,7 +55,9 @@ trait ConsulServiceDiscoveryAlg[F[_]] { self =>
object ConsulServiceDiscoveryAlg {
def apply[F[_] : Temporal : LoggerFactory : Random : Trace](consulBaseUri: Uri,
longPollTimeout: FiniteDuration,
client: Client[F]): F[ConsulServiceDiscoveryAlg[F]] =
client: Client[F],
entryPoint: EntryPoint[F])
(implicit L: Local[F, Span[F]]): F[ConsulServiceDiscoveryAlg[F]] =
LoggerFactory[F]
.create(LoggerName("com.dwolla.consul.ConsulServiceDiscoveryAlg"))
.map { implicit l =>
Expand All @@ -63,7 +66,7 @@ object ConsulServiceDiscoveryAlg {
lookup[F](serviceName, consulBaseUri, None, longPollTimeout, client)
.toResource
.flatMap { case (initialValue, initialConsulIndex) =>
continuallyUpdating(serviceName, initialValue, initialConsulIndex, consulBaseUri, longPollTimeout, client)
continuallyUpdating(serviceName, initialValue, initialConsulIndex, consulBaseUri, longPollTimeout, client, entryPoint)
}
.onFinalize(Logger[F].trace(s"👋 shutting down authoritiesForService($serviceName)"))
}
Expand Down Expand Up @@ -145,25 +148,42 @@ object ConsulServiceDiscoveryAlg {
initialConsulIndex: Option[ConsulIndex],
consulBase: Uri,
longPollTimeout: FiniteDuration,
client: Client[F]): Resource[F, F[Vector[Uri.Authority]]] =
client: Client[F],
entryPoint: EntryPoint[F],
)
(implicit L: Local[F, Span[F]]): Resource[F, F[Vector[Uri.Authority]]] =
Stream.unfoldEval(initialConsulIndex) { maybeIndex =>
// since this is a background task, it doesn't make sense to attach it to the trace that initially started it
import natchez.Trace.Implicits.noop
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The noop Trace[F] previously passed to lookup prevented the span creation immediately within lookup (e.g. the span named com.dwolla.consul.ConsulServiceDiscoveryAlg.lookup) but the issue is observable when using a Client[F] that has been wrapped in the Natchez middleware from natchez-http4s. Assuming it's using a Local[F, Span[F]]-based MTL Trace[F], the span referenced by the Natchez middleware is not the noop span, but the current value of the Local instance, meaning the middleware creates a new span and sends that trace's kernel as part of the request being made.


lookup[F](serviceName, consulBase, maybeIndex, longPollTimeout, client)
.map(_.leftMap(_.some)) // if we successfully got values, wrap them in Some so we can unNone later
.handleErrorWith {
// TODO maybe we should introduce some kind of escalating delay here?
Logger[F].warn(_)("🔥 An exception occurred getting service details from Consul; retrying")
.as((none[Vector[Uri.Authority]], maybeIndex)) // continue successfully, but emit None so the failure can be filtered out later
}
.map(_.some) // this stream will unfold forever (well, until its Resource is finalized)
inNewLinkedRootSpan(entryPoint) { // since this is a background task, it doesn't make sense
import natchez.mtl._ // to directly attach it to the trace that initially started it,
// but linking it to the new root span is helpful

lookup[F](serviceName, consulBase, maybeIndex, longPollTimeout, client)
.map(_.leftMap(_.some)) // if we successfully got values, wrap them in Some so we can unNone later
.handleErrorWith {
// TODO maybe we should introduce some kind of escalating delay here?
Logger[F].warn(_)("🔥 An exception occurred getting service details from Consul; retrying")
.as((none[Vector[Uri.Authority]], maybeIndex)) // continue successfully, but emit None so the failure can be filtered out later
}
.map(_.some) // this stream will unfold forever (well, until its Resource is finalized)
}
}
.unNone // errors returned by `lookup` are emitted as None, so filter them out
.unNone // errors returned by `lookup` are emitted as None, so filter them out
.holdResource(initialValue)
.onFinalize(Logger[F].trace(s"👋 shutting down continuallyUpdating($serviceName, …)"))
.map(_.get)

private def inNewLinkedRootSpan[F[_] : MonadCancelThrow, A](entryPoint: EntryPoint[F])
(fa: F[A])
(implicit L: Local[F, Span[F]]): F[A] =
natchez.mtl.natchezMtlTraceForLocal
.kernel
.map(Span.Options.Defaults.withLink)
.flatMap {
entryPoint
.root("com.dwolla.consul.ConsulServiceDiscoveryAlg.continuallyUpdating", _)
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We technically don't need to create a new root span here; we could use the Local[F, Span[F]] to set the no-op span for the fa scope. But we have to break bincompat to demand the Local[F, Span[F]], so we might as well also ask for the EntryPoint[F] and create a new root span.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And AFAICT we really do need Local[F, Span[F]]. One alternative I tried, using Trace[F].span("continuallyUpdating", Span.Options.Suppress), did successfully suppress the span creation from the Natchez http4s middleware, but the duration of the continuallyUpdating was still tied to the execution of the background requests, so doesn't accomplish the main goal.

.use(Local[F, Span[F]].scope(fa))
}

private[consul] def serviceListUri(consulBase: Uri,
serviceName: ServiceName,
index: Option[ConsulIndex],
Expand All @@ -172,15 +192,6 @@ object ConsulServiceDiscoveryAlg {
consulBase / "v1" / "health" / "service" / serviceName +? OnlyHealthyServices +?? index +?? index.as(WaitPeriod(longPollTimeout))

private implicit def jsonEntityDecoder[F[_] : Concurrent, A: Decoder]: EntityDecoder[F, A] = jsonOf[F, A]

@deprecated("used traced version", "0.2.0")
def apply[F[_]](consulBaseUri: Uri,
longPollTimeout: FiniteDuration,
client: Client[F],
F: Temporal[F],
L: LoggerFactory[F],
R: Random[F]): F[ConsulServiceDiscoveryAlg[F]] =
ConsulServiceDiscoveryAlg(consulBaseUri, longPollTimeout, client)(F, L, R, natchez.Trace.Implicits.noop(F))
}

abstract class AbstractConsulServiceDiscoveryAlg[F[_] : Random : Monad] extends ConsulServiceDiscoveryAlg[F] {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,9 +56,4 @@ object ConsulUriResolver {
.flatTap(newUri => Logger[F].trace(s" rewrote $source to $newUri"))

}

@deprecated("used traced version", "0.2.0")
def apply[F[_]](backgroundResolver: ConsulServiceDiscoveryAlg[F], F: Async[F], L: LoggerFactory[F]): Resource[F, ConsulUriResolver[F]] =
ConsulUriResolver(backgroundResolver)(F, L, natchez.Trace.Implicits.noop(F))

}
Original file line number Diff line number Diff line change
Expand Up @@ -40,13 +40,4 @@ object ConsulMiddleware {
}
.onFinalize(Logger[F].trace("👋 shutting down ConsulMiddleware"))
}

@deprecated("used traced version", "0.2.0")
def apply[F[_]](consulServiceDiscoveryAlg: ConsulServiceDiscoveryAlg[F],
client: Client[F],
F: Async[F],
L: LoggerFactory[F]): Resource[F, Client[F]] = {
ConsulMiddleware(consulServiceDiscoveryAlg)(client)(F, L, natchez.Trace.Implicits.noop(F))
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,14 @@ package examples

import cats.effect.{Trace => _, _}
import cats.effect.std.Random
import cats.mtl.Local
import cats.syntax.all._
import com.dwolla.consul._
import com.dwolla.consul.examples.ConsulMiddlewareApp.consulAwareClient
import com.dwolla.consul.http4s.ConsulMiddleware
import fs2.Stream
import fs2.io.net.Network
import natchez.Trace
import natchez.{EntryPoint, Span, Trace}
import org.http4s.Method.GET
import org.http4s._
import org.http4s.client.Client
Expand All @@ -20,7 +21,8 @@ import org.typelevel.log4cats.{Logger, LoggerFactory}

import scala.concurrent.duration._

class ConsulMiddlewareApp[F[_] : Async : LoggerFactory : Trace : Network] extends Http4sClientDsl[F] {
class ConsulMiddlewareApp[F[_] : Async : LoggerFactory : Trace : Network](entryPoint: EntryPoint[F])
(implicit L: Local[F, Span[F]]) extends Http4sClientDsl[F] {
val exampleConsulUri: Uri = uri"consul://httpd/"

def run: F[Unit] =
Expand All @@ -29,7 +31,7 @@ class ConsulMiddlewareApp[F[_] : Async : LoggerFactory : Trace : Network] extend
.create
.flatMap { implicit logger: Logger[F] =>
(for {
client <- Stream.resource(consulAwareClient[F])
client <- Stream.resource(consulAwareClient(entryPoint))
_ <- Stream.repeatEval {
client
.successful(GET(exampleConsulUri))
Expand All @@ -48,13 +50,15 @@ class ConsulMiddlewareApp[F[_] : Async : LoggerFactory : Trace : Network] extend
}

object ConsulMiddlewareApp extends ConsulMiddlewareAppPlatform {
private[ConsulMiddlewareApp] def consulAwareClient[F[_] : Async : Random : LoggerFactory : Trace : Network]: Resource[F, Client[F]] =
(consulServiceDiscoveryAlg[F], normalClient[F])
private[ConsulMiddlewareApp] def consulAwareClient[F[_] : Async : Random : LoggerFactory : Trace : Network](entryPoint: EntryPoint[F])
(implicit L: Local[F, Span[F]]): Resource[F, Client[F]] =
(consulServiceDiscoveryAlg[F](entryPoint), normalClient[F])
.parMapN(ConsulMiddleware(_)(_))
.flatten

private def consulServiceDiscoveryAlg[F[_] : Async : Random : LoggerFactory : Trace : Network]: Resource[F, ConsulServiceDiscoveryAlg[F]] =
longPollClient[F].evalMap(ConsulServiceDiscoveryAlg(uri"http://localhost:8500", 1.minute, _))
private def consulServiceDiscoveryAlg[F[_] : Async : Random : LoggerFactory : Trace : Network](entryPoint: EntryPoint[F])
(implicit L: Local[F, Span[F]]): Resource[F, ConsulServiceDiscoveryAlg[F]] =
longPollClient[F].evalMap(ConsulServiceDiscoveryAlg(uri"http://localhost:8500", 1.minute, _, entryPoint))

private def longPollClient[F[_] : Async : Network]: Resource[F, Client[F]] = clientWithTimeout(75.seconds)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,20 @@ package examples

import cats.effect.{Trace => _, _}
import cats.effect.std.Random
import cats.mtl.Local
import cats.syntax.all._
import fs2.Stream
import fs2.io.net.Network
import natchez.Trace
import natchez.{EntryPoint, Span, Trace}
import org.http4s.client.dsl.Http4sClientDsl
import org.http4s.ember.client.EmberClientBuilder
import org.http4s.syntax.all._
import org.typelevel.log4cats.{Logger, LoggerFactory}

import scala.concurrent.duration._

class ConsulServiceDiscoveryAlgApp[F[_] : Async : LoggerFactory : Trace : Network] extends Http4sClientDsl[F] {
class ConsulServiceDiscoveryAlgApp[F[_] : Async : LoggerFactory : Trace : Network](entryPoint: EntryPoint[F])
(implicit L: Local[F, Span[F]]) extends Http4sClientDsl[F] {
private implicit def loggerR(implicit L: Logger[F]): Logger[Resource[F, *]] = Logger[F].mapK(Resource.liftK)

private val serviceName = ServiceName("httpd")
Expand All @@ -26,7 +28,7 @@ class ConsulServiceDiscoveryAlgApp[F[_] : Async : LoggerFactory : Trace : Networ
EmberClientBuilder
.default[F]
.build
.evalMap(ConsulServiceDiscoveryAlg(uri"http://localhost:8500", 1.minute, _))
.evalMap(ConsulServiceDiscoveryAlg(uri"http://localhost:8500", 1.minute, _, entryPoint))
}
.flatMap { alg =>
Stream
Expand Down
Loading