Skip to content

Commit

Permalink
Trace core with otel4s
Browse files Browse the repository at this point in the history
  • Loading branch information
rossabaker committed Jan 31, 2023
1 parent 3349de9 commit 604fd44
Show file tree
Hide file tree
Showing 18 changed files with 174 additions and 149 deletions.
2 changes: 1 addition & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ lazy val core = crossProject(JVMPlatform, JSPlatform, NativePlatform)
"org.scodec" %%% "scodec-bits" % "1.1.34",
"org.scodec" %%% "scodec-core" % (if (tlIsScala3.value) "2.2.0" else "1.11.10"),
"org.scodec" %%% "scodec-cats" % "1.2.0",
"org.tpolecat" %%% "natchez-core" % natchezVersion,
"org.typelevel" %%% "otel4s-core-trace" % "0.0-2ef92ef-SNAPSHOT",
"org.tpolecat" %%% "sourcepos" % "1.1.0",
"org.scala-lang.modules" %%% "scala-collection-compat" % "2.9.0",
) ++ Seq(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ package skunk.net.protocol

import cats.MonadThrow
import cats.syntax.all._
import natchez.Trace
import org.typelevel.otel4s.trace.Tracer
import skunk.net.MessageSocket
import skunk.net.message._
import skunk.exception.{
Expand All @@ -16,12 +16,12 @@ import skunk.exception.{

private[protocol] trait StartupCompanionPlatform { this: Startup.type =>

private[protocol] def authenticationSASL[F[_]: MonadThrow: MessageSocket: Trace](
private[protocol] def authenticationSASL[F[_]: MonadThrow: MessageSocket: Tracer](
sm: StartupMessage,
password: Option[String],
mechanisms: List[String]
): F[Unit] =
Trace[F].span("authenticationSASL") {
Tracer[F].span("authenticationSASL") {
if (mechanisms.contains(Scram.SaslMechanism)) {
for {
pw <- requirePassword[F](sm, password)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import com.ongres.scram.common.stringprep.StringPreparations

import cats.MonadError
import cats.syntax.all._
import natchez.Trace
import org.typelevel.otel4s.trace.Tracer
import scala.util.control.NonFatal
import skunk.net.MessageSocket
import skunk.net.message._
Expand All @@ -20,14 +20,14 @@ import skunk.exception.{

private[protocol] trait StartupCompanionPlatform { this: Startup.type =>

private[protocol] def authenticationSASL[F[_]: MessageSocket: Trace](
private[protocol] def authenticationSASL[F[_]: MessageSocket: Tracer](
sm: StartupMessage,
password: Option[String],
mechanisms: List[String]
)(
implicit ev: MonadError[F, Throwable]
): F[Unit] =
Trace[F].span("authenticationSASL") {
Tracer[F].span("authenticationSASL").surround {
for {
client <- {
try ScramClient.
Expand Down
28 changes: 14 additions & 14 deletions modules/core/shared/src/main/scala/Session.scala
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import fs2.concurrent.Signal
import fs2.io.net.{ Network, SocketGroup, SocketOption }
import fs2.Pipe
import fs2.Stream
import natchez.Trace
import org.typelevel.otel4s.trace.Tracer
import skunk.codec.all.bool
import skunk.data._
import skunk.net.Protocol
Expand Down Expand Up @@ -383,7 +383,7 @@ object Session {
* @param queryCache Size of the cache for query checking
* @group Constructors
*/
def pooled[F[_]: Temporal: Trace: Network: Console](
def pooled[F[_]: Temporal: Tracer: Network: Console](
host: String,
port: Int = 5432,
user: String,
Expand All @@ -400,11 +400,11 @@ object Session {
parseCache: Int = 1024,
readTimeout: Duration = Duration.Inf,
): Resource[F, Resource[F, Session[F]]] = {
pooledF[F](host, port, user, database, password, max, debug, strategy, ssl, parameters, socketOptions, commandCache, queryCache, parseCache, readTimeout).map(_.apply(Trace[F]))
pooledF[F](host, port, user, database, password, max, debug, strategy, ssl, parameters, socketOptions, commandCache, queryCache, parseCache, readTimeout).map(_.apply(Tracer[F]))
}

/**
* Resource yielding a function from Trace to `SessionPool` managing up to `max` concurrent `Session`s. Typically you
* Resource yielding a function from Tracer to `SessionPool` managing up to `max` concurrent `Session`s. Typically you
* will `use` this resource once on application startup and pass the resulting
* `Resource[F, Session[F]]` to the rest of your program.
*
Expand Down Expand Up @@ -440,9 +440,9 @@ object Session {
queryCache: Int = 1024,
parseCache: Int = 1024,
readTimeout: Duration = Duration.Inf,
): Resource[F, Trace[F] => Resource[F, Session[F]]] = {
): Resource[F, Tracer[F] => Resource[F, Session[F]]] = {

def session(socketGroup: SocketGroup[F], sslOp: Option[SSLNegotiation.Options[F]], cache: Describe.Cache[F])(implicit T: Trace[F]): Resource[F, Session[F]] =
def session(socketGroup: SocketGroup[F], sslOp: Option[SSLNegotiation.Options[F]], cache: Describe.Cache[F])(implicit T: Tracer[F]): Resource[F, Session[F]] =
for {
pc <- Resource.eval(Parse.Cache.empty[F](parseCache))
s <- fromSocketGroup[F](socketGroup, host, port, user, database, password, debug, strategy, socketOptions, sslOp, parameters, cache, pc, readTimeout)
Expand All @@ -453,7 +453,7 @@ object Session {
for {
dc <- Resource.eval(Describe.Cache.empty[F](commandCache, queryCache))
sslOp <- ssl.toSSLNegotiationOptions(if (debug) logger.some else none)
pool <- Pool.ofF({implicit T: Trace[F] => session(Network[F], sslOp, dc)}, max)(Recyclers.full)
pool <- Pool.ofF({implicit T: Tracer[F] => session(Network[F], sslOp, dc)}, max)(Recyclers.full)
} yield pool
}

Expand All @@ -463,7 +463,7 @@ object Session {
* single-session pool. This method is shorthand for `Session.pooled(..., max = 1, ...).flatten`.
* @see pooled
*/
def single[F[_]: Temporal: Trace: Network: Console](
def single[F[_]: Temporal: Tracer: Network: Console](
host: String,
port: Int = 5432,
user: String,
Expand All @@ -478,10 +478,10 @@ object Session {
parseCache: Int = 1024,
readTimeout: Duration = Duration.Inf,
): Resource[F, Session[F]] =
singleF[F](host, port, user, database, password, debug, strategy, ssl, parameters, commandCache, queryCache, parseCache, readTimeout).apply(Trace[F])
singleF[F](host, port, user, database, password, debug, strategy, ssl, parameters, commandCache, queryCache, parseCache, readTimeout).apply(Tracer[F])

/**
* Resource yielding logically unpooled sessions given a Trace. This can be convenient for demonstrations and
* Resource yielding logically unpooled sessions given a Tracer. This can be convenient for demonstrations and
* programs that only need a single session. In reality each session is managed by its own
* single-session pool.
* @see pooledF
Expand All @@ -500,8 +500,8 @@ object Session {
queryCache: Int = 1024,
parseCache: Int = 1024,
readTimeout: Duration = Duration.Inf,
): Trace[F] => Resource[F, Session[F]] =
Kleisli((_: Trace[F]) => pooledF(
): Tracer[F] => Resource[F, Session[F]] =
Kleisli((_: Tracer[F]) => pooledF(
host = host,
port = port,
user = user,
Expand All @@ -517,10 +517,10 @@ object Session {
parseCache = parseCache,
readTimeout = readTimeout
)).flatMap(f =>
Kleisli { implicit T: Trace[F] => f(T) }
Kleisli { implicit T: Tracer[F] => f(T) }
).run

def fromSocketGroup[F[_]: Temporal: Trace: Console](
def fromSocketGroup[F[_]: Temporal: Tracer: Console](
socketGroup: SocketGroup[F],
host: String,
port: Int = 5432,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@
package skunk.exception

import cats.syntax.all._
import natchez.TraceValue
import org.typelevel.otel4s.Attribute
import org.typelevel.otel4s.AttributeKey
import skunk.SqlState
import skunk.data.Type
import skunk.util.Origin
Expand Down Expand Up @@ -33,26 +34,26 @@ class PostgresErrorException (
argumentsOrigin = argumentsOrigin,
) {

override def fields: Map[String, TraceValue] = {
var map = super.fields

map += "error.postgres.message" -> message
map += "error.postgres.severity" -> severity
map += "error.postgres.code" -> code

internalPosition.foreach(a => map += "error.postgres.internalPosition" -> a)
internalQuery .foreach(a => map += "error.postgres.internalQuery" -> a)
where .foreach(a => map += "error.postgres.where" -> a)
schemaName .foreach(a => map += "error.postgres.schemaName" -> a)
tableName .foreach(a => map += "error.postgres.tableName" -> a)
columnName .foreach(a => map += "error.postgres.columnName" -> a)
dataTypeName .foreach(a => map += "error.postgres.dataTypeName" -> a)
constraintName .foreach(a => map += "error.postgres.constraintName" -> a)
fileName .foreach(a => map += "error.postgres.fileName" -> a)
line .foreach(a => map += "error.postgres.line" -> a)
routine .foreach(a => map += "error.postgres.routine" -> a)

map
override def fields: List[Attribute[_]] = {
val builder = List.newBuilder[Attribute[_]]

builder += Attribute(AttributeKey.string("error.postgres.message") , message)
builder += Attribute(AttributeKey.string("error.postgres.severity") , severity)
builder += Attribute(AttributeKey.string("error.postgres.code") , code)

internalPosition.foreach(a => builder += Attribute(AttributeKey.long("error.postgres.internalPosition") , a.toLong))
internalQuery .foreach(a => builder += Attribute(AttributeKey.long("error.postgres.internalQuery") , a.toLong))
where .foreach(a => builder += Attribute(AttributeKey.string("error.postgres.where") , a))
schemaName .foreach(a => builder += Attribute(AttributeKey.string("error.postgres.schemaName") , a))
tableName .foreach(a => builder += Attribute(AttributeKey.string("error.postgres.tableName") , a))
columnName .foreach(a => builder += Attribute(AttributeKey.string("error.postgres.columnName") , a))
dataTypeName .foreach(a => builder += Attribute(AttributeKey.string("error.postgres.dataTypeName") , a))
constraintName .foreach(a => builder += Attribute(AttributeKey.string("error.postgres.constraintName") , a))
fileName .foreach(a => builder += Attribute(AttributeKey.string("error.postgres.fileName") , a))
line .foreach(a => builder += Attribute(AttributeKey.long("error.postgres.line") , a.toLong))
routine .foreach(a => builder += Attribute(AttributeKey.string("error.postgres.routine") , a))

builder.result()
}

/**
Expand Down
42 changes: 21 additions & 21 deletions modules/core/shared/src/main/scala/exception/SkunkException.scala
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,11 @@
package skunk.exception

import cats.syntax.all._
import org.typelevel.otel4s.Attribute
import org.typelevel.otel4s.AttributeKey
import skunk.data.Type
import skunk.Query
import skunk.util.{ CallSite, Origin, Pretty }
import natchez.Fields
import natchez.TraceValue

class SkunkException protected[skunk](
val sql: Option[String],
Expand All @@ -22,41 +22,41 @@ class SkunkException protected[skunk](
val sqlOrigin: Option[Origin] = None,
val argumentsOrigin: Option[Origin] = None,
val callSite: Option[CallSite] = None
) extends Exception(message) with Fields {
) extends Exception(message) {

override def fields: Map[String, TraceValue] = {
def fields: List[Attribute[_]] = {

var map: Map[String, TraceValue] = Map.empty
val builder = List.newBuilder[Attribute[_]]

map += "error.message" -> message
builder += Attribute(AttributeKey.string("error.message"), message)

sql .foreach(a => map += "error.sql" -> a)
position.foreach(a => map += "error.position" -> a)
detail .foreach(a => map += "error.detail" -> a)
hint .foreach(a => map += "error.hint" -> a)
sql .foreach(a => builder += Attribute(AttributeKey.string("error.sql") , a))
position.foreach(a => builder += Attribute(AttributeKey.long("error.position") , a.toLong))
detail .foreach(a => builder += Attribute(AttributeKey.string("error.detail") , a))
hint .foreach(a => builder += Attribute(AttributeKey.string("error.hint") , a))

(arguments.zipWithIndex).foreach { case ((typ, os), n) =>
map += s"error.argument.${n + 1}.type" -> typ.name
map += s"error.argument.${n + 1}.value" -> os.getOrElse[String]("NULL")
builder += Attribute(AttributeKey.string(s"error.argument.${n + 1}.type") , typ.name)
builder += Attribute(AttributeKey.string(s"error.argument.${n + 1}.value") , os.getOrElse[String]("NULL"))
}

sqlOrigin.foreach { o =>
map += "error.sqlOrigin.file" -> o.file
map += "error.sqlOrigin.line" -> o.line
builder += Attribute(AttributeKey.string("error.sqlOrigin.file") , o.file)
builder += Attribute(AttributeKey.long("error.sqlOrigin.line") , o.line.toLong)
}

argumentsOrigin.foreach { o =>
map += "error.argumentsOrigin.file" -> o.file
map += "error.argumentsOrigin.line" -> o.line
builder += Attribute(AttributeKey.string("error.argumentsOrigin.file") , o.file)
builder += Attribute(AttributeKey.long("error.argumentsOrigin.line") , o.line.toLong)
}

callSite.foreach { cs =>
map += "error.callSite.origin.file" -> cs.origin.file
map += "error.callSite.origin.line" -> cs.origin.line
map += "error.callSite.origin.method" -> cs.methodName
builder += Attribute(AttributeKey.string("error.callSite.origin.file") , cs.origin.file)
builder += Attribute(AttributeKey.long("error.callSite.origin.line") , cs.origin.line.toLong)
builder += Attribute(AttributeKey.string("error.callSite.origin.method") , cs.methodName)
}

map
builder.result()
}

protected def title: String =
Expand Down Expand Up @@ -148,4 +148,4 @@ object SkunkException {
argumentsOrigin = argsOrigin
)

}
}
6 changes: 3 additions & 3 deletions modules/core/shared/src/main/scala/net/Protocol.scala
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import skunk.{ Command, Query, Statement, ~, Void }
import skunk.data._
import skunk.util.{ Namer, Origin }
import skunk.util.Typer
import natchez.Trace
import org.typelevel.otel4s.trace.Tracer
import fs2.io.net.{ SocketGroup, SocketOption }
import skunk.net.protocol.Describe
import scala.concurrent.duration.Duration
Expand Down Expand Up @@ -196,7 +196,7 @@ object Protocol {
* @param host Postgres server host
* @param port Postgres port, default 5432
*/
def apply[F[_]: Temporal: Trace: Console](
def apply[F[_]: Temporal: Tracer: Console](
host: String,
port: Int,
debug: Boolean,
Expand All @@ -213,7 +213,7 @@ object Protocol {
p <- Resource.eval(fromMessageSocket(bms, nam, describeCache, parseCache))
} yield p

def fromMessageSocket[F[_]: Concurrent: Trace](
def fromMessageSocket[F[_]: Concurrent: Tracer](
bms: BufferedMessageSocket[F],
nam: Namer[F],
dc: Describe.Cache[F],
Expand Down
17 changes: 10 additions & 7 deletions modules/core/shared/src/main/scala/net/protocol/Bind.scala
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,10 @@ import skunk.net.message.{ Bind => BindMessage, Close => _, _ }
import skunk.net.MessageSocket
import skunk.net.Protocol.{ PreparedStatement, PortalId }
import skunk.util.{ Origin, Namer }
import natchez.Trace
import org.typelevel.otel4s.Attribute
import org.typelevel.otel4s.AttributeKey
import org.typelevel.otel4s.trace.Span
import org.typelevel.otel4s.trace.Tracer

trait Bind[F[_]] {

Expand All @@ -26,7 +29,7 @@ trait Bind[F[_]] {

object Bind {

def apply[F[_]: Exchange: MessageSocket: Namer: Trace](
def apply[F[_]: Exchange: MessageSocket: Namer: Tracer](
implicit ev: MonadError[F, Throwable]
): Bind[F] =
new Bind[F] {
Expand All @@ -37,14 +40,14 @@ object Bind {
argsOrigin: Origin
): Resource[F, PortalId] =
Resource.make {
exchange("bind") {
exchange("bind") { (span: Span[F]) =>
for {
pn <- nextName("portal").map(PortalId(_))
ea = statement.statement.encoder.encode(args) // encoded args
_ <- Trace[F].put(
"arguments" -> ea.map(_.orNull).mkString(","),
"portal-id" -> pn.value
)
_ <- span.addAttributes(
Attribute(AttributeKey.string("arguments"), ea.map(_.orNull).mkString(",")),
Attribute(AttributeKey.string("portal-id"), pn.value)
)
_ <- send(BindMessage(pn.value, statement.id.value, ea))
_ <- send(Flush)
_ <- flatExpect {
Expand Down
Loading

0 comments on commit 604fd44

Please sign in to comment.