diff --git a/build.sbt b/build.sbt index 932f4604..8eab90fe 100644 --- a/build.sbt +++ b/build.sbt @@ -131,7 +131,7 @@ lazy val core = crossProject(JVMPlatform, JSPlatform, NativePlatform) "org.scodec" %%% "scodec-bits" % "1.1.34", "org.scodec" %%% "scodec-core" % (if (tlIsScala3.value) "2.2.0" else "1.11.10"), "org.scodec" %%% "scodec-cats" % "1.2.0", - "org.tpolecat" %%% "natchez-core" % natchezVersion, + "org.typelevel" %%% "otel4s-core-trace" % "0.0-2ef92ef-SNAPSHOT", "org.tpolecat" %%% "sourcepos" % "1.1.0", "org.scala-lang.modules" %%% "scala-collection-compat" % "2.9.0", ) ++ Seq( diff --git a/modules/core/js-native/src/main/scala/protocol/StartupPlatform.scala b/modules/core/js-native/src/main/scala/protocol/StartupPlatform.scala index 54783976..8b374c1e 100644 --- a/modules/core/js-native/src/main/scala/protocol/StartupPlatform.scala +++ b/modules/core/js-native/src/main/scala/protocol/StartupPlatform.scala @@ -6,7 +6,7 @@ package skunk.net.protocol import cats.MonadThrow import cats.syntax.all._ -import natchez.Trace +import org.typelevel.otel4s.trace.Tracer import skunk.net.MessageSocket import skunk.net.message._ import skunk.exception.{ @@ -16,12 +16,12 @@ import skunk.exception.{ private[protocol] trait StartupCompanionPlatform { this: Startup.type => - private[protocol] def authenticationSASL[F[_]: MonadThrow: MessageSocket: Trace]( + private[protocol] def authenticationSASL[F[_]: MonadThrow: MessageSocket: Tracer]( sm: StartupMessage, password: Option[String], mechanisms: List[String] ): F[Unit] = - Trace[F].span("authenticationSASL") { + Tracer[F].span("authenticationSASL") { if (mechanisms.contains(Scram.SaslMechanism)) { for { pw <- requirePassword[F](sm, password) diff --git a/modules/core/jvm/src/main/scala/net/protocol/StartupPlatform.scala b/modules/core/jvm/src/main/scala/net/protocol/StartupPlatform.scala index 1ebce1cf..7c230dac 100644 --- a/modules/core/jvm/src/main/scala/net/protocol/StartupPlatform.scala +++ b/modules/core/jvm/src/main/scala/net/protocol/StartupPlatform.scala @@ -9,7 +9,7 @@ import com.ongres.scram.common.stringprep.StringPreparations import cats.MonadError import cats.syntax.all._ -import natchez.Trace +import org.typelevel.otel4s.trace.Tracer import scala.util.control.NonFatal import skunk.net.MessageSocket import skunk.net.message._ @@ -20,14 +20,14 @@ import skunk.exception.{ private[protocol] trait StartupCompanionPlatform { this: Startup.type => - private[protocol] def authenticationSASL[F[_]: MessageSocket: Trace]( + private[protocol] def authenticationSASL[F[_]: MessageSocket: Tracer]( sm: StartupMessage, password: Option[String], mechanisms: List[String] )( implicit ev: MonadError[F, Throwable] ): F[Unit] = - Trace[F].span("authenticationSASL") { + Tracer[F].span("authenticationSASL").surround { for { client <- { try ScramClient. diff --git a/modules/core/shared/src/main/scala/Session.scala b/modules/core/shared/src/main/scala/Session.scala index 8337d467..5379546f 100644 --- a/modules/core/shared/src/main/scala/Session.scala +++ b/modules/core/shared/src/main/scala/Session.scala @@ -13,7 +13,7 @@ import fs2.concurrent.Signal import fs2.io.net.{ Network, SocketGroup, SocketOption } import fs2.Pipe import fs2.Stream -import natchez.Trace +import org.typelevel.otel4s.trace.Tracer import skunk.codec.all.bool import skunk.data._ import skunk.net.Protocol @@ -383,7 +383,7 @@ object Session { * @param queryCache Size of the cache for query checking * @group Constructors */ - def pooled[F[_]: Temporal: Trace: Network: Console]( + def pooled[F[_]: Temporal: Tracer: Network: Console]( host: String, port: Int = 5432, user: String, @@ -400,11 +400,11 @@ object Session { parseCache: Int = 1024, readTimeout: Duration = Duration.Inf, ): Resource[F, Resource[F, Session[F]]] = { - pooledF[F](host, port, user, database, password, max, debug, strategy, ssl, parameters, socketOptions, commandCache, queryCache, parseCache, readTimeout).map(_.apply(Trace[F])) + pooledF[F](host, port, user, database, password, max, debug, strategy, ssl, parameters, socketOptions, commandCache, queryCache, parseCache, readTimeout).map(_.apply(Tracer[F])) } /** - * Resource yielding a function from Trace to `SessionPool` managing up to `max` concurrent `Session`s. Typically you + * Resource yielding a function from Tracer to `SessionPool` managing up to `max` concurrent `Session`s. Typically you * will `use` this resource once on application startup and pass the resulting * `Resource[F, Session[F]]` to the rest of your program. * @@ -440,9 +440,9 @@ object Session { queryCache: Int = 1024, parseCache: Int = 1024, readTimeout: Duration = Duration.Inf, - ): Resource[F, Trace[F] => Resource[F, Session[F]]] = { + ): Resource[F, Tracer[F] => Resource[F, Session[F]]] = { - def session(socketGroup: SocketGroup[F], sslOp: Option[SSLNegotiation.Options[F]], cache: Describe.Cache[F])(implicit T: Trace[F]): Resource[F, Session[F]] = + def session(socketGroup: SocketGroup[F], sslOp: Option[SSLNegotiation.Options[F]], cache: Describe.Cache[F])(implicit T: Tracer[F]): Resource[F, Session[F]] = for { pc <- Resource.eval(Parse.Cache.empty[F](parseCache)) s <- fromSocketGroup[F](socketGroup, host, port, user, database, password, debug, strategy, socketOptions, sslOp, parameters, cache, pc, readTimeout) @@ -453,7 +453,7 @@ object Session { for { dc <- Resource.eval(Describe.Cache.empty[F](commandCache, queryCache)) sslOp <- ssl.toSSLNegotiationOptions(if (debug) logger.some else none) - pool <- Pool.ofF({implicit T: Trace[F] => session(Network[F], sslOp, dc)}, max)(Recyclers.full) + pool <- Pool.ofF({implicit T: Tracer[F] => session(Network[F], sslOp, dc)}, max)(Recyclers.full) } yield pool } @@ -463,7 +463,7 @@ object Session { * single-session pool. This method is shorthand for `Session.pooled(..., max = 1, ...).flatten`. * @see pooled */ - def single[F[_]: Temporal: Trace: Network: Console]( + def single[F[_]: Temporal: Tracer: Network: Console]( host: String, port: Int = 5432, user: String, @@ -478,10 +478,10 @@ object Session { parseCache: Int = 1024, readTimeout: Duration = Duration.Inf, ): Resource[F, Session[F]] = - singleF[F](host, port, user, database, password, debug, strategy, ssl, parameters, commandCache, queryCache, parseCache, readTimeout).apply(Trace[F]) + singleF[F](host, port, user, database, password, debug, strategy, ssl, parameters, commandCache, queryCache, parseCache, readTimeout).apply(Tracer[F]) /** - * Resource yielding logically unpooled sessions given a Trace. This can be convenient for demonstrations and + * Resource yielding logically unpooled sessions given a Tracer. This can be convenient for demonstrations and * programs that only need a single session. In reality each session is managed by its own * single-session pool. * @see pooledF @@ -500,8 +500,8 @@ object Session { queryCache: Int = 1024, parseCache: Int = 1024, readTimeout: Duration = Duration.Inf, - ): Trace[F] => Resource[F, Session[F]] = - Kleisli((_: Trace[F]) => pooledF( + ): Tracer[F] => Resource[F, Session[F]] = + Kleisli((_: Tracer[F]) => pooledF( host = host, port = port, user = user, @@ -517,10 +517,10 @@ object Session { parseCache = parseCache, readTimeout = readTimeout )).flatMap(f => - Kleisli { implicit T: Trace[F] => f(T) } + Kleisli { implicit T: Tracer[F] => f(T) } ).run - def fromSocketGroup[F[_]: Temporal: Trace: Console]( + def fromSocketGroup[F[_]: Temporal: Tracer: Console]( socketGroup: SocketGroup[F], host: String, port: Int = 5432, diff --git a/modules/core/shared/src/main/scala/exception/PostgresErrorException.scala b/modules/core/shared/src/main/scala/exception/PostgresErrorException.scala index 0dcb6ae3..b68496b7 100644 --- a/modules/core/shared/src/main/scala/exception/PostgresErrorException.scala +++ b/modules/core/shared/src/main/scala/exception/PostgresErrorException.scala @@ -5,7 +5,8 @@ package skunk.exception import cats.syntax.all._ -import natchez.TraceValue +import org.typelevel.otel4s.Attribute +import org.typelevel.otel4s.AttributeKey import skunk.SqlState import skunk.data.Type import skunk.util.Origin @@ -33,26 +34,26 @@ class PostgresErrorException ( argumentsOrigin = argumentsOrigin, ) { - override def fields: Map[String, TraceValue] = { - var map = super.fields - - map += "error.postgres.message" -> message - map += "error.postgres.severity" -> severity - map += "error.postgres.code" -> code - - internalPosition.foreach(a => map += "error.postgres.internalPosition" -> a) - internalQuery .foreach(a => map += "error.postgres.internalQuery" -> a) - where .foreach(a => map += "error.postgres.where" -> a) - schemaName .foreach(a => map += "error.postgres.schemaName" -> a) - tableName .foreach(a => map += "error.postgres.tableName" -> a) - columnName .foreach(a => map += "error.postgres.columnName" -> a) - dataTypeName .foreach(a => map += "error.postgres.dataTypeName" -> a) - constraintName .foreach(a => map += "error.postgres.constraintName" -> a) - fileName .foreach(a => map += "error.postgres.fileName" -> a) - line .foreach(a => map += "error.postgres.line" -> a) - routine .foreach(a => map += "error.postgres.routine" -> a) - - map + override def fields: List[Attribute[_]] = { + val builder = List.newBuilder[Attribute[_]] + + builder += Attribute(AttributeKey.string("error.postgres.message") , message) + builder += Attribute(AttributeKey.string("error.postgres.severity") , severity) + builder += Attribute(AttributeKey.string("error.postgres.code") , code) + + internalPosition.foreach(a => builder += Attribute(AttributeKey.long("error.postgres.internalPosition") , a.toLong)) + internalQuery .foreach(a => builder += Attribute(AttributeKey.long("error.postgres.internalQuery") , a.toLong)) + where .foreach(a => builder += Attribute(AttributeKey.string("error.postgres.where") , a)) + schemaName .foreach(a => builder += Attribute(AttributeKey.string("error.postgres.schemaName") , a)) + tableName .foreach(a => builder += Attribute(AttributeKey.string("error.postgres.tableName") , a)) + columnName .foreach(a => builder += Attribute(AttributeKey.string("error.postgres.columnName") , a)) + dataTypeName .foreach(a => builder += Attribute(AttributeKey.string("error.postgres.dataTypeName") , a)) + constraintName .foreach(a => builder += Attribute(AttributeKey.string("error.postgres.constraintName") , a)) + fileName .foreach(a => builder += Attribute(AttributeKey.string("error.postgres.fileName") , a)) + line .foreach(a => builder += Attribute(AttributeKey.long("error.postgres.line") , a.toLong)) + routine .foreach(a => builder += Attribute(AttributeKey.string("error.postgres.routine") , a)) + + builder.result() } /** diff --git a/modules/core/shared/src/main/scala/exception/SkunkException.scala b/modules/core/shared/src/main/scala/exception/SkunkException.scala index 2cc8d996..66ed3f2f 100644 --- a/modules/core/shared/src/main/scala/exception/SkunkException.scala +++ b/modules/core/shared/src/main/scala/exception/SkunkException.scala @@ -5,11 +5,11 @@ package skunk.exception import cats.syntax.all._ +import org.typelevel.otel4s.Attribute +import org.typelevel.otel4s.AttributeKey import skunk.data.Type import skunk.Query import skunk.util.{ CallSite, Origin, Pretty } -import natchez.Fields -import natchez.TraceValue class SkunkException protected[skunk]( val sql: Option[String], @@ -22,41 +22,41 @@ class SkunkException protected[skunk]( val sqlOrigin: Option[Origin] = None, val argumentsOrigin: Option[Origin] = None, val callSite: Option[CallSite] = None -) extends Exception(message) with Fields { +) extends Exception(message) { - override def fields: Map[String, TraceValue] = { + def fields: List[Attribute[_]] = { - var map: Map[String, TraceValue] = Map.empty + val builder = List.newBuilder[Attribute[_]] - map += "error.message" -> message + builder += Attribute(AttributeKey.string("error.message"), message) - sql .foreach(a => map += "error.sql" -> a) - position.foreach(a => map += "error.position" -> a) - detail .foreach(a => map += "error.detail" -> a) - hint .foreach(a => map += "error.hint" -> a) + sql .foreach(a => builder += Attribute(AttributeKey.string("error.sql") , a)) + position.foreach(a => builder += Attribute(AttributeKey.long("error.position") , a.toLong)) + detail .foreach(a => builder += Attribute(AttributeKey.string("error.detail") , a)) + hint .foreach(a => builder += Attribute(AttributeKey.string("error.hint") , a)) (arguments.zipWithIndex).foreach { case ((typ, os), n) => - map += s"error.argument.${n + 1}.type" -> typ.name - map += s"error.argument.${n + 1}.value" -> os.getOrElse[String]("NULL") + builder += Attribute(AttributeKey.string(s"error.argument.${n + 1}.type") , typ.name) + builder += Attribute(AttributeKey.string(s"error.argument.${n + 1}.value") , os.getOrElse[String]("NULL")) } sqlOrigin.foreach { o => - map += "error.sqlOrigin.file" -> o.file - map += "error.sqlOrigin.line" -> o.line + builder += Attribute(AttributeKey.string("error.sqlOrigin.file") , o.file) + builder += Attribute(AttributeKey.long("error.sqlOrigin.line") , o.line.toLong) } argumentsOrigin.foreach { o => - map += "error.argumentsOrigin.file" -> o.file - map += "error.argumentsOrigin.line" -> o.line + builder += Attribute(AttributeKey.string("error.argumentsOrigin.file") , o.file) + builder += Attribute(AttributeKey.long("error.argumentsOrigin.line") , o.line.toLong) } callSite.foreach { cs => - map += "error.callSite.origin.file" -> cs.origin.file - map += "error.callSite.origin.line" -> cs.origin.line - map += "error.callSite.origin.method" -> cs.methodName + builder += Attribute(AttributeKey.string("error.callSite.origin.file") , cs.origin.file) + builder += Attribute(AttributeKey.long("error.callSite.origin.line") , cs.origin.line.toLong) + builder += Attribute(AttributeKey.string("error.callSite.origin.method") , cs.methodName) } - map + builder.result() } protected def title: String = @@ -148,4 +148,4 @@ object SkunkException { argumentsOrigin = argsOrigin ) -} \ No newline at end of file +} diff --git a/modules/core/shared/src/main/scala/net/Protocol.scala b/modules/core/shared/src/main/scala/net/Protocol.scala index 44843113..31f1d8f4 100644 --- a/modules/core/shared/src/main/scala/net/Protocol.scala +++ b/modules/core/shared/src/main/scala/net/Protocol.scala @@ -13,7 +13,7 @@ import skunk.{ Command, Query, Statement, ~, Void } import skunk.data._ import skunk.util.{ Namer, Origin } import skunk.util.Typer -import natchez.Trace +import org.typelevel.otel4s.trace.Tracer import fs2.io.net.{ SocketGroup, SocketOption } import skunk.net.protocol.Describe import scala.concurrent.duration.Duration @@ -196,7 +196,7 @@ object Protocol { * @param host Postgres server host * @param port Postgres port, default 5432 */ - def apply[F[_]: Temporal: Trace: Console]( + def apply[F[_]: Temporal: Tracer: Console]( host: String, port: Int, debug: Boolean, @@ -213,7 +213,7 @@ object Protocol { p <- Resource.eval(fromMessageSocket(bms, nam, describeCache, parseCache)) } yield p - def fromMessageSocket[F[_]: Concurrent: Trace]( + def fromMessageSocket[F[_]: Concurrent: Tracer]( bms: BufferedMessageSocket[F], nam: Namer[F], dc: Describe.Cache[F], diff --git a/modules/core/shared/src/main/scala/net/protocol/Bind.scala b/modules/core/shared/src/main/scala/net/protocol/Bind.scala index ad51a0c4..f5436f1c 100644 --- a/modules/core/shared/src/main/scala/net/protocol/Bind.scala +++ b/modules/core/shared/src/main/scala/net/protocol/Bind.scala @@ -12,7 +12,10 @@ import skunk.net.message.{ Bind => BindMessage, Close => _, _ } import skunk.net.MessageSocket import skunk.net.Protocol.{ PreparedStatement, PortalId } import skunk.util.{ Origin, Namer } -import natchez.Trace +import org.typelevel.otel4s.Attribute +import org.typelevel.otel4s.AttributeKey +import org.typelevel.otel4s.trace.Span +import org.typelevel.otel4s.trace.Tracer trait Bind[F[_]] { @@ -26,7 +29,7 @@ trait Bind[F[_]] { object Bind { - def apply[F[_]: Exchange: MessageSocket: Namer: Trace]( + def apply[F[_]: Exchange: MessageSocket: Namer: Tracer]( implicit ev: MonadError[F, Throwable] ): Bind[F] = new Bind[F] { @@ -37,14 +40,14 @@ object Bind { argsOrigin: Origin ): Resource[F, PortalId] = Resource.make { - exchange("bind") { + exchange("bind") { (span: Span[F]) => for { pn <- nextName("portal").map(PortalId(_)) ea = statement.statement.encoder.encode(args) // encoded args - _ <- Trace[F].put( - "arguments" -> ea.map(_.orNull).mkString(","), - "portal-id" -> pn.value - ) + _ <- span.addAttributes( + Attribute(AttributeKey.string("arguments"), ea.map(_.orNull).mkString(",")), + Attribute(AttributeKey.string("portal-id"), pn.value) + ) _ <- send(BindMessage(pn.value, statement.id.value, ea)) _ <- send(Flush) _ <- flatExpect { diff --git a/modules/core/shared/src/main/scala/net/protocol/Close.scala b/modules/core/shared/src/main/scala/net/protocol/Close.scala index 573f3758..8f13e314 100644 --- a/modules/core/shared/src/main/scala/net/protocol/Close.scala +++ b/modules/core/shared/src/main/scala/net/protocol/Close.scala @@ -9,7 +9,10 @@ import cats.FlatMap import cats.syntax.all._ import skunk.net.message.{ Close => CloseMessage, Flush, CloseComplete } import skunk.net.MessageSocket -import natchez.Trace +import org.typelevel.otel4s.Attribute +import org.typelevel.otel4s.AttributeKey +import org.typelevel.otel4s.trace.Span +import org.typelevel.otel4s.trace.Tracer trait Close[F[_]] { def apply(portalId: Protocol.PortalId): F[Unit] @@ -18,18 +21,18 @@ trait Close[F[_]] { object Close { - def apply[F[_]: FlatMap: Exchange: MessageSocket: Trace]: Close[F] = + def apply[F[_]: FlatMap: Exchange: MessageSocket: Tracer]: Close[F] = new Close[F] { override def apply(portalId: Protocol.PortalId): F[Unit] = - exchange("close-portal") { - Trace[F].put("portal" -> portalId.value) *> + exchange("close-portal") { (span: Span[F]) => + span.addAttribute(Attribute(AttributeKey.string("portal"), portalId.value)) *> close(CloseMessage.portal(portalId.value)) } override def apply(statementId: Protocol.StatementId): F[Unit] = - exchange("close-statement") { - Trace[F].put("statement" -> statementId.value) *> + exchange("close-statement") { (span: Span[F]) => + span.addAttribute(Attribute(AttributeKey.string("statement"), statementId.value)) *> close(CloseMessage.statement(statementId.value)) } @@ -42,4 +45,4 @@ object Close { } -} \ No newline at end of file +} diff --git a/modules/core/shared/src/main/scala/net/protocol/Describe.scala b/modules/core/shared/src/main/scala/net/protocol/Describe.scala index e528d223..8245d013 100644 --- a/modules/core/shared/src/main/scala/net/protocol/Describe.scala +++ b/modules/core/shared/src/main/scala/net/protocol/Describe.scala @@ -14,7 +14,10 @@ import skunk.net.message.{ Describe => DescribeMessage, _ } import skunk.util.{ StatementCache, Typer } import skunk.exception.UnknownOidException import skunk.data.TypedRowDescription -import natchez.Trace +import org.typelevel.otel4s.Attribute +import org.typelevel.otel4s.AttributeKey +import org.typelevel.otel4s.trace.Span +import org.typelevel.otel4s.trace.Tracer import cats.data.OptionT trait Describe[F[_]] { @@ -24,16 +27,16 @@ trait Describe[F[_]] { object Describe { - def apply[F[_]: Exchange: MessageSocket: Trace](cache: Cache[F])( + def apply[F[_]: Exchange: MessageSocket: Tracer](cache: Cache[F])( implicit ev: MonadError[F, Throwable] ): Describe[F] = new Describe[F] { override def apply(cmd: skunk.Command[_], id: StatementId, ty: Typer): F[Unit] = - exchange("describe") { + exchange("describe") { (span: Span[F]) => OptionT(cache.commandCache.get(cmd)).getOrElseF { for { - _ <- Trace[F].put("statement-id" -> id.value) + _ <- span.addAttribute(Attribute(AttributeKey.string("statement-id"), id.value)) _ <- send(DescribeMessage.statement(id.value)) _ <- send(Flush) _ <- expect { case ParameterDescription(_) => } // always ok @@ -58,9 +61,9 @@ object Describe { override def apply[A](query: skunk.Query[_, A], id: StatementId, ty: Typer): F[TypedRowDescription] = OptionT(cache.queryCache.get(query)).getOrElseF { - exchange("describe") { + exchange("describe") { (span: Span[F]) => for { - _ <- Trace[F].put("statement-id" -> id.value) + _ <- span.addAttribute(Attribute(AttributeKey.string("statement-id"), id.value)) _ <- send(DescribeMessage.statement(id.value)) _ <- send(Flush) _ <- expect { case ParameterDescription(_) => } // always ok @@ -71,7 +74,7 @@ object Describe { td <- rd.typed(ty) match { case Left(err) => UnknownOidException(query, err, ty.strategy).raiseError[F, TypedRowDescription] case Right(td) => - Trace[F].put("column-types" -> td.fields.map(_.tpe).mkString("[", ", ", "]")).as(td) + span.addAttribute(Attribute(AttributeKey.string("column-types"), td.fields.map(_.tpe).mkString("[", ", ", "]"))).as(td) } _ <- ColumnAlignmentException(query, td).raiseError[F, Unit].unlessA(query.isDynamic || query.decoder.types === td.types) _ <- cache.queryCache.put(query, td) // on success diff --git a/modules/core/shared/src/main/scala/net/protocol/Execute.scala b/modules/core/shared/src/main/scala/net/protocol/Execute.scala index 5aec6726..d1ecf18f 100644 --- a/modules/core/shared/src/main/scala/net/protocol/Execute.scala +++ b/modules/core/shared/src/main/scala/net/protocol/Execute.scala @@ -12,7 +12,10 @@ import skunk.exception.PostgresErrorException import skunk.net.{ Protocol, MessageSocket } import skunk.net.message.{ Execute => ExecuteMessage, _ } import skunk.util.Typer -import natchez.Trace +import org.typelevel.otel4s.Attribute +import org.typelevel.otel4s.AttributeKey +import org.typelevel.otel4s.trace.Span +import org.typelevel.otel4s.trace.Tracer import skunk.exception.CopyNotSupportedException import skunk.exception.EmptyStatementException @@ -23,13 +26,13 @@ trait Execute[F[_]] { object Execute { - def apply[F[_]: Exchange: MessageSocket: Trace]( + def apply[F[_]: Exchange: MessageSocket: Tracer]( implicit ev: MonadError[F, Throwable] ): Execute[F] = new Unroll[F] with Execute[F] { override def apply[A](portal: Protocol.CommandPortal[F, A]): F[Completion] = - exchange("execute") { + exchange("execute") { (_: Span[F]) => for { _ <- send(ExecuteMessage(portal.id.value, 0)) _ <- send(Flush) @@ -74,11 +77,11 @@ object Execute { } override def apply[A, B](portal: Protocol.QueryPortal[F, A, B], maxRows: Int, ty: Typer): F[List[B] ~ Boolean] = - exchange("execute") { + exchange("execute") { (span: Span[F]) => for { - _ <- Trace[F].put( - "max-rows" -> maxRows, - "portal-id" -> portal.id.value + _ <- span.addAttributes( + Attribute(AttributeKey.long("max-rows"), maxRows.toLong), + Attribute(AttributeKey.string("portal-id"), portal.id.value) ) _ <- send(ExecuteMessage(portal.id.value, maxRows)) _ <- send(Flush) diff --git a/modules/core/shared/src/main/scala/net/protocol/Parse.scala b/modules/core/shared/src/main/scala/net/protocol/Parse.scala index 3ff2aea7..f3e4a863 100644 --- a/modules/core/shared/src/main/scala/net/protocol/Parse.scala +++ b/modules/core/shared/src/main/scala/net/protocol/Parse.scala @@ -16,7 +16,10 @@ import skunk.Statement import skunk.util.Namer import skunk.util.Typer import skunk.exception.{ UnknownTypeException, TooManyParametersException } -import natchez.Trace +import org.typelevel.otel4s.Attribute +import org.typelevel.otel4s.AttributeKey +import org.typelevel.otel4s.trace.Span +import org.typelevel.otel4s.trace.Tracer import cats.data.OptionT trait Parse[F[_]] { @@ -25,7 +28,7 @@ trait Parse[F[_]] { object Parse { - def apply[F[_]: Exchange: MessageSocket: Namer: Trace](cache: Cache[F])( + def apply[F[_]: Exchange: MessageSocket: Namer: Tracer](cache: Cache[F])( implicit ev: MonadError[F, Throwable] ): Parse[F] = new Parse[F] { @@ -37,13 +40,13 @@ object Parse { case Right(os) => OptionT(cache.value.get(statement)).getOrElseF { - exchange("parse") { + exchange("parse") { (span: Span[F]) => for { id <- nextName("statement").map(StatementId(_)) - _ <- Trace[F].put( - "statement-name" -> id.value, - "statement-sql" -> statement.sql, - "statement-parameter-types" -> os.map(n => ty.typeForOid(n, -1).getOrElse(n)).mkString("[", ", ", "]") + _ <- span.addAttributes( + Attribute(AttributeKey.string("statement-name"), id.value), + Attribute(AttributeKey.string("statement-sql"), statement.sql), + Attribute(AttributeKey.string("statement-parameter-types"), os.map(n => ty.typeForOid(n, -1).getOrElse(n)).mkString("[", ", ", "]")) ) _ <- send(ParseMessage(id.value, statement.sql, os)) _ <- send(Flush) diff --git a/modules/core/shared/src/main/scala/net/protocol/Prepare.scala b/modules/core/shared/src/main/scala/net/protocol/Prepare.scala index f7cfe37e..d193a6c2 100644 --- a/modules/core/shared/src/main/scala/net/protocol/Prepare.scala +++ b/modules/core/shared/src/main/scala/net/protocol/Prepare.scala @@ -14,7 +14,7 @@ import skunk.net.MessageSocket import skunk.net.Protocol.{ PreparedCommand, PreparedQuery, CommandPortal, QueryPortal } import skunk.util.{ Origin, Namer } import skunk.util.Typer -import natchez.Trace +import org.typelevel.otel4s.trace.Tracer trait Prepare[F[_]] { def apply[A](command: skunk.Command[A], ty: Typer): F[PreparedCommand[F, A]] @@ -23,7 +23,7 @@ trait Prepare[F[_]] { object Prepare { - def apply[F[_]: Exchange: MessageSocket: Namer: Trace](describeCache: Describe.Cache[F], parseCache: Parse.Cache[F])( + def apply[F[_]: Exchange: MessageSocket: Namer: Tracer](describeCache: Describe.Cache[F], parseCache: Parse.Cache[F])( implicit ev: MonadError[F, Throwable] ): Prepare[F] = new Prepare[F] { diff --git a/modules/core/shared/src/main/scala/net/protocol/Query.scala b/modules/core/shared/src/main/scala/net/protocol/Query.scala index 2d0d9f05..df8200c6 100644 --- a/modules/core/shared/src/main/scala/net/protocol/Query.scala +++ b/modules/core/shared/src/main/scala/net/protocol/Query.scala @@ -13,7 +13,10 @@ import skunk.net.message.{ Query => QueryMessage, _ } import skunk.net.MessageSocket import skunk.util.Typer import skunk.exception.UnknownOidException -import natchez.Trace +import org.typelevel.otel4s.Attribute +import org.typelevel.otel4s.AttributeKey +import org.typelevel.otel4s.trace.Span +import org.typelevel.otel4s.trace.Tracer import skunk.Statement import skunk.exception.SkunkException import skunk.exception.EmptyStatementException @@ -25,7 +28,7 @@ trait Query[F[_]] { object Query { - def apply[F[_]: Exchange: MessageSocket: Trace]( + def apply[F[_]: Exchange: MessageSocket: Tracer]( implicit ev: MonadError[F, Throwable] ): Query[F] = new Unroll[F] with Query[F] { @@ -70,9 +73,9 @@ object Query { } override def apply[B](query: skunk.Query[Void, B], ty: Typer): F[List[B]] = - exchange("query") { - Trace[F].put( - "query.sql" -> query.sql + exchange("query") { (span: Span[F]) => + span.addAttribute( + Attribute(AttributeKey.string("query.sql"), query.sql) ) *> send(QueryMessage(query.sql)) *> flatExpect { // If we get a RowDescription back it means we have a valid query as far as Postgres is @@ -161,9 +164,9 @@ object Query { } override def apply(command: Command[Void]): F[Completion] = - exchange("query") { - Trace[F].put( - "command.sql" -> command.sql + exchange("query") { (span: Span[F]) => + span.addAttribute( + Attribute(AttributeKey.string("command.sql"), command.sql) ) *> send(QueryMessage(command.sql)) *> flatExpect { case CommandComplete(c) => diff --git a/modules/core/shared/src/main/scala/net/protocol/Startup.scala b/modules/core/shared/src/main/scala/net/protocol/Startup.scala index 9d9e9035..7f0070ef 100644 --- a/modules/core/shared/src/main/scala/net/protocol/Startup.scala +++ b/modules/core/shared/src/main/scala/net/protocol/Startup.scala @@ -6,7 +6,10 @@ package skunk.net.protocol import cats.{ApplicativeError, MonadError} import cats.syntax.all._ -import natchez.Trace +import org.typelevel.otel4s.Attribute +import org.typelevel.otel4s.AttributeKey +import org.typelevel.otel4s.trace.Span +import org.typelevel.otel4s.trace.Tracer import scala.util.control.NonFatal import scodec.bits.ByteVector import skunk.net.MessageSocket @@ -25,18 +28,18 @@ trait Startup[F[_]] { object Startup extends StartupCompanionPlatform { - def apply[F[_]: Exchange: MessageSocket: Trace]( + def apply[F[_]: Exchange: MessageSocket: Tracer]( implicit ev: MonadError[F, Throwable] ): Startup[F] = new Startup[F] { override def apply(user: String, database: String, password: Option[String], parameters: Map[String, String]): F[Unit] = - exchange("startup") { + exchange("startup") { (span: Span[F]) => val sm = StartupMessage(user, database, parameters) for { - _ <- Trace[F].put( - "user" -> user, - "database" -> database - ) + _ <- span.addAttributes( + Attribute(AttributeKey.string("user"), user), + Attribute(AttributeKey.string("database"), database) + ) _ <- send(sm) _ <- flatExpectStartup(sm) { case AuthenticationOk => ().pure[F] @@ -58,13 +61,13 @@ object Startup extends StartupCompanionPlatform { } // already inside an exchange - private def authenticationCleartextPassword[F[_]: MessageSocket: Trace]( + private def authenticationCleartextPassword[F[_]: MessageSocket: Tracer]( sm: StartupMessage, password: Option[String] )( implicit ev: MonadError[F, Throwable] ): F[Unit] = - Trace[F].span("authenticationCleartextPassword") { + Tracer[F].span("authenticationCleartextPassword").surround { requirePassword[F](sm, password).flatMap { pw => for { _ <- send(PasswordMessage.cleartext(pw)) @@ -73,14 +76,14 @@ object Startup extends StartupCompanionPlatform { } } - private def authenticationMD5Password[F[_]: MessageSocket: Trace]( + private def authenticationMD5Password[F[_]: MessageSocket: Tracer]( sm: StartupMessage, password: Option[String], salt: Array[Byte] )( implicit ev: MonadError[F, Throwable] ): F[Unit] = - Trace[F].span("authenticationMD5Password") { + Tracer[F].span("authenticationMD5Password").surround { requirePassword[F](sm, password).flatMap { pw => for { _ <- send(PasswordMessage.md5(sm.user, pw, salt)) diff --git a/modules/core/shared/src/main/scala/net/protocol/Unroll.scala b/modules/core/shared/src/main/scala/net/protocol/Unroll.scala index 290a0497..3a4ce58b 100644 --- a/modules/core/shared/src/main/scala/net/protocol/Unroll.scala +++ b/modules/core/shared/src/main/scala/net/protocol/Unroll.scala @@ -14,7 +14,9 @@ import skunk.net.MessageSocket import skunk.net.Protocol.QueryPortal import skunk.util.Origin import skunk.data.TypedRowDescription -import natchez.Trace +import org.typelevel.otel4s.Attribute +import org.typelevel.otel4s.AttributeKey +import org.typelevel.otel4s.trace.Tracer import skunk.exception.PostgresErrorException import scala.util.control.NonFatal @@ -22,7 +24,7 @@ import scala.util.control.NonFatal * Superclass for `Query` and `Execute` sub-protocols, both of which need a way to accumulate * results in a `List` and report errors when decoding fails. */ -private[protocol] class Unroll[F[_]: MessageSocket: Trace]( +private[protocol] class Unroll[F[_]: MessageSocket: Tracer]( implicit ev: MonadError[F, Throwable] ) { @@ -82,17 +84,17 @@ private[protocol] class Unroll[F[_]: MessageSocket: Trace]( } val rows: F[List[List[Option[String]]] ~ Boolean] = - Trace[F].span("read") { + Tracer[F].span("read").use { span => accumulate(Nil).flatTap { case (rows, bool) => - Trace[F].put( - "row-count" -> rows.length, - "more-rows" -> bool + span.addAttributes( + Attribute(AttributeKey.long("row-count"), rows.length.toLong), + Attribute(AttributeKey.boolean("more-rows"), bool) ) } } rows.flatMap { case (rows, bool) => - Trace[F].span("decode") { + Tracer[F].span("decode").surround { rows.traverse { data => // https://github.com/tpolecat/skunk/issues/129 diff --git a/modules/core/shared/src/main/scala/net/protocol/package.scala b/modules/core/shared/src/main/scala/net/protocol/package.scala index e44dd1cb..b63d410d 100644 --- a/modules/core/shared/src/main/scala/net/protocol/package.scala +++ b/modules/core/shared/src/main/scala/net/protocol/package.scala @@ -6,14 +6,15 @@ package skunk.net import skunk.net.message._ import skunk.util.Namer +import skunk.util.Origin +import org.typelevel.otel4s.trace.Span +import org.typelevel.otel4s.trace.Tracer package object protocol { -import natchez.Trace -import skunk.util.Origin - def exchange[F[_]: Trace, A](label: String)(fa: F[A])( + def exchange[F[_]: Tracer, A](label: String)(f: Span[F] => F[A])( implicit exchange: Exchange[F] - ): F[A] = Trace[F].span(label)(exchange(fa)) + ): F[A] = Tracer[F].span(label).use(span => exchange(f(span))) def receive[F[_]](implicit ev: MessageSocket[F]): F[BackendMessage] = ev.receive @@ -39,4 +40,4 @@ import skunk.util.Origin def nextName[F[_]](prefix: String)(implicit ev: Namer[F]): F[String] = ev.nextName(prefix) -} \ No newline at end of file +} diff --git a/modules/core/shared/src/main/scala/util/Pool.scala b/modules/core/shared/src/main/scala/util/Pool.scala index 00a8d4bf..3a26dc5d 100644 --- a/modules/core/shared/src/main/scala/util/Pool.scala +++ b/modules/core/shared/src/main/scala/util/Pool.scala @@ -11,7 +11,7 @@ import cats.effect.implicits._ import cats.effect.Resource import cats.syntax.all._ import skunk.exception.SkunkException -import natchez.Trace +import org.typelevel.otel4s.trace.Tracer object Pool { @@ -43,12 +43,12 @@ object Pool { ) // Preserved for previous use, and specifically simpler use for - // Trace systems that are universal rather than shorter scoped. - def of[F[_]: Concurrent: Trace, A]( + // Tracer systems that are universal rather than shorter scoped. + def of[F[_]: Concurrent: Tracer, A]( rsrc: Resource[F, A], size: Int)( recycler: Recycler[F, A] - ): Resource[F, Resource[F, A]] = ofF({(_: Trace[F]) => rsrc}, size)(recycler).map(_.apply(Trace[F])) + ): Resource[F, Resource[F, A]] = ofF({(_: Tracer[F]) => rsrc}, size)(recycler).map(_.apply(Tracer[F])) /** * A pooled resource (which is itself a managed resource). @@ -58,10 +58,10 @@ object Pool { * yielding false here means the element should be freed and removed from the pool. */ def ofF[F[_]: Concurrent, A]( - rsrc: Trace[F] => Resource[F, A], + rsrc: Tracer[F] => Resource[F, A], size: Int)( recycler: Recycler[F, A] - ): Resource[F, Trace[F] => Resource[F, A]] = { + ): Resource[F, Tracer[F] => Resource[F, A]] = { // Just in case. assert(size > 0, s"Pool size must be positive (you passed $size).") @@ -77,14 +77,14 @@ object Pool { ) // We can construct a pool given a Ref containing our initial state. - def poolImpl(ref: Ref[F, State])(implicit T: Trace[F]): Resource[F, A] = { + def poolImpl(ref: Ref[F, State])(implicit T: Tracer[F]): Resource[F, A] = { // To give out an alloc we create a deferral first, which we will need if there are no slots // available. If there is a filled slot, remove it and yield its alloc. If there is an empty // slot, remove it and allocate. If there are no slots, enqueue the deferral and wait on it, // which will [semantically] block the caller until an alloc is returned to the pool. val give: F[Alloc] = - Trace[F].span("pool.allocate") { + Tracer[F].span("pool.allocate").surround { Deferred[F, Either[Throwable, Alloc]].flatMap { d => // If allocation fails for any reason then there's no resource to return to the pool @@ -98,7 +98,7 @@ object Pool { // all (defer and wait). ref.modify { case (Some(a) :: os, ds) => ((os, ds), a.pure[F]) - case (None :: os, ds) => ((os, ds), Concurrent[F].onError(rsrc(Trace[F]).allocated)(restore)) + case (None :: os, ds) => ((os, ds), Concurrent[F].onError(rsrc(Tracer[F]).allocated)(restore)) case (Nil, ds) => ((Nil, ds :+ d), d.get.flatMap(_.liftTo[F].onError(restore))) } .flatten @@ -109,7 +109,7 @@ object Pool { // there are a bunch of error conditions to consider. This operation is a finalizer and // cannot be canceled, so we don't need to worry about that case here. def take(a: Alloc): F[Unit] = - Trace[F].span("pool.free") { + Tracer[F].span("pool.free").surround { recycler(a._1).onError { case t => dispose(a) *> t.raiseError[F, Unit] } flatMap { @@ -121,7 +121,7 @@ object Pool { // Return `a` to the pool. If there are awaiting deferrals, complete the next one. Otherwise // push a filled slot into the queue. def recycle(a: Alloc): F[Unit] = - Trace[F].span("recycle") { + Tracer[F].span("recycle").surround { ref.modify { case (os, d :: ds) => ((os, ds), d.complete(a.asRight).void) // hand it back out case (os, Nil) => ((Some(a) :: os, Nil), ().pure[F]) // return to pool @@ -133,10 +133,10 @@ object Pool { // of `a`. If there are deferrals, remove the next one and complete it (failures in allocation // are handled by the awaiting deferral in `give` above). Always finalize `a` def dispose(a: Alloc): F[Unit] = - Trace[F].span("dispose") { + Tracer[F].span("dispose").surround { ref.modify { case (os, Nil) => ((os :+ None, Nil), ().pure[F]) // new empty slot - case (os, d :: ds) => ((os, ds), Concurrent[F].attempt(rsrc(Trace[F]).allocated).flatMap(d.complete).void) // alloc now! + case (os, d :: ds) => ((os, ds), Concurrent[F].attempt(rsrc(Tracer[F]).allocated).flatMap(d.complete).void) // alloc now! } .guarantee(a._2).flatten } @@ -169,8 +169,8 @@ object Pool { } - Resource.make(alloc)(free).map(a => {implicit T: Trace[F] => poolImpl(a)}) + Resource.make(alloc)(free).map(a => {implicit T: Tracer[F] => poolImpl(a)}) } -} \ No newline at end of file +}