Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: prometheus attributes format to otel attributes #4188

Merged
merged 12 commits into from
Dec 13, 2024
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,15 @@ import java.time.{Duration, Instant}
case class OpenTelemetryMetrics[F[_]](meter: Meter, metrics: List[Metric[F, _]]) {

/** Registers a `request_active{path, method}` up-down-counter (assuming default labels). */
def addRequestsActive(labels: MetricLabels = MetricLabels.Default): OpenTelemetryMetrics[F] =
def addRequestsActive(labels: MetricLabels = OpenTelemetryAttributes): OpenTelemetryMetrics[F] =
copy(metrics = metrics :+ requestActive(meter, labels))

/** Registers a `request_total{path, method, status}` counter (assuming default labels). */
def addRequestsTotal(labels: MetricLabels = MetricLabels.Default): OpenTelemetryMetrics[F] =
def addRequestsTotal(labels: MetricLabels = OpenTelemetryAttributes): OpenTelemetryMetrics[F] =
copy(metrics = metrics :+ requestTotal(meter, labels))

/** Registers a `request_duration_seconds{path, method, status, phase}` histogram (assuming default labels). */
def addRequestsDuration(labels: MetricLabels = MetricLabels.Default): OpenTelemetryMetrics[F] =
def addRequestsDuration(labels: MetricLabels = OpenTelemetryAttributes): OpenTelemetryMetrics[F] =
copy(metrics = metrics :+ requestDuration(meter, labels))

/** Registers a custom metric. */
Expand All @@ -36,6 +36,32 @@ case class OpenTelemetryMetrics[F[_]](meter: Meter, metrics: List[Metric[F, _]])

object OpenTelemetryMetrics {

/** Default labels for OpenTelemetry-compliant metrics, as recommended here:
* https://opentelemetry.io/docs/specs/semconv/http/http-metrics/#http-server
*
* - `http.request.method` - HTTP request method (e.g., GET, POST).
* - `path` - The request path or route template.
* - `http.response.status_code` - HTTP response status code (200, 404, etc.).
*/
lazy val OpenTelemetryAttributes: MetricLabels = MetricLabels(
forRequest = List(
"http.request.method" -> { case (_, req) => req.method.method },
"url.scheme" -> { case (_, req) => req.uri.scheme.getOrElse("unknown") },
"path" -> { case (ep, _) => ep.showPathTemplate(showQueryParam = None) }
),
forResponse = List(
"http.response.status_code" -> {
case Right(r) => Some(r.code.code.toString)
// Default to 500 for exceptions
case Left(_) => Some("500")
},
"error.type" -> {
case Left(ex) => Some(ex.getClass.getName) // Exception class name for errors
case Right(_) => None
}
)
)

def apply[F[_]](meter: Meter): OpenTelemetryMetrics[F] = apply(meter, Nil)
def apply[F[_]](otel: OpenTelemetry): OpenTelemetryMetrics[F] = apply(defaultMeter(otel), Nil)
def apply[F[_]](otel: OpenTelemetry, metrics: List[Metric[F, _]]): OpenTelemetryMetrics[F] = apply(defaultMeter(otel), metrics)
Expand All @@ -50,7 +76,7 @@ object OpenTelemetryMetrics {
* measured separately up to the point where the headers are determined, and then once again when the whole response body is complete.
*/
def default[F[_]](otel: OpenTelemetry): OpenTelemetryMetrics[F] =
default(defaultMeter(otel), MetricLabels.Default)
default(defaultMeter(otel), OpenTelemetryAttributes)

/** Registers default metrics (see other variants) using custom labels. */
def default[F[_]](otel: OpenTelemetry, labels: MetricLabels): OpenTelemetryMetrics[F] = default(defaultMeter(otel), labels)
Expand All @@ -64,10 +90,10 @@ object OpenTelemetryMetrics {
* Status is by default the status code class (1xx, 2xx, etc.), and phase can be either `headers` or `body` - request duration is
* measured separately up to the point where the headers are determined, and then once again when the whole response body is complete.
*/
def default[F[_]](meter: Meter): OpenTelemetryMetrics[F] = default(meter, MetricLabels.Default)
def default[F[_]](meter: Meter): OpenTelemetryMetrics[F] = default(meter, OpenTelemetryAttributes)

/** Registers default metrics (see other variants) using custom labels. */
def default[F[_]](meter: Meter, labels: MetricLabels = MetricLabels.Default): OpenTelemetryMetrics[F] =
def default[F[_]](meter: Meter, labels: MetricLabels = OpenTelemetryAttributes): OpenTelemetryMetrics[F] =
OpenTelemetryMetrics(
meter,
List[Metric[F, _]](
Expand All @@ -80,7 +106,7 @@ object OpenTelemetryMetrics {
def requestActive[F[_]](meter: Meter, labels: MetricLabels): Metric[F, LongUpDownCounter] =
Metric[F, LongUpDownCounter](
meter
.upDownCounterBuilder("request_active")
.upDownCounterBuilder("http.server.active_requests")
.setDescription("Active HTTP requests")
.setUnit("1")
.build(),
Expand All @@ -97,7 +123,7 @@ object OpenTelemetryMetrics {
def requestTotal[F[_]](meter: Meter, labels: MetricLabels): Metric[F, LongCounter] =
Metric[F, LongCounter](
meter
.counterBuilder("request_total")
.counterBuilder("http.server.request.total")
.setDescription("Total HTTP requests")
.setUnit("1")
.build(),
Expand All @@ -108,6 +134,7 @@ object OpenTelemetryMetrics {
m.eval {
val otLabels =
merge(asOpenTelemetryAttributes(labels, ep, req), asOpenTelemetryAttributes(labels, Right(res), None))

counter.add(1, otLabels)
}
}
Expand All @@ -125,9 +152,9 @@ object OpenTelemetryMetrics {
def requestDuration[F[_]](meter: Meter, labels: MetricLabels): Metric[F, DoubleHistogram] =
Metric[F, DoubleHistogram](
meter
.histogramBuilder("request_duration")
.histogramBuilder("http.server.request.duration")
.setDescription("Duration of HTTP requests")
.setUnit("ms")
.setUnit("s")
.build(),
onRequest = (req, recorder, m) =>
m.eval {
Expand Down Expand Up @@ -170,7 +197,10 @@ object OpenTelemetryMetrics {
l.forRequest.foldLeft(Attributes.builder())((b, label) => { b.put(label._1, label._2(ep, req)) }).build()

private def asOpenTelemetryAttributes(l: MetricLabels, res: Either[Throwable, ServerResponse[_]], phase: Option[String]): Attributes = {
val builder = l.forResponse.foldLeft(Attributes.builder())((b, label) => { b.put(label._1, label._2(res)) })
val builder = Attributes.builder()
l.forResponse.foreach { case (key, valueFn) =>
valueFn(res).foreach(value => builder.put(key, value))
}
phase.foreach(v => builder.put(l.forResponsePhase.name, v))
builder.build()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import scala.concurrent.Future

class OpenTelemetryMetricsTest extends AnyFlatSpec with Matchers {

"default metrics" should "collect requests active" in {
"default metrics" should "collect http.server.active_requests" in {
// given
val reader = InMemoryMetricReader.create()
val provider = SdkMeterProvider.builder().registerMetricReader(reader).build()
Expand All @@ -51,17 +51,31 @@ class OpenTelemetryMetricsTest extends AnyFlatSpec with Matchers {

// then
val point = longSumData(reader).head
point.getAttributes shouldBe Attributes.of(AttributeKey.stringKey("method"), "GET", AttributeKey.stringKey("path"), "/person")
point.getAttributes shouldBe Attributes.of(
AttributeKey.stringKey("http.request.method"),
"GET",
AttributeKey.stringKey("path"),
"/person",
AttributeKey.stringKey("url.scheme"),
"http"
)
point.getValue shouldBe 1

ScalaFutures.whenReady(response, Timeout(Span(3, Seconds))) { _ =>
val point = longSumData(reader).head
point.getAttributes shouldBe Attributes.of(AttributeKey.stringKey("method"), "GET", AttributeKey.stringKey("path"), "/person")
point.getAttributes shouldBe Attributes.of(
AttributeKey.stringKey("http.request.method"),
"GET",
AttributeKey.stringKey("path"),
"/person",
AttributeKey.stringKey("url.scheme"),
"http"
)
point.getValue shouldBe 0
}
}

"default metrics" should "collect requests total" in {
"default metrics" should "collect http.server.request.total" in {
// given
val reader = InMemoryMetricReader.create()
val provider = SdkMeterProvider.builder().registerMetricReader(reader).build()
Expand All @@ -87,29 +101,33 @@ class OpenTelemetryMetricsTest extends AnyFlatSpec with Matchers {
.count {
case dp
if dp.getAttributes == Attributes.of(
AttributeKey.stringKey("method"),
AttributeKey.stringKey("http.request.method"),
"GET",
AttributeKey.stringKey("path"),
"/person",
AttributeKey.stringKey("status"),
"2xx"
AttributeKey.stringKey("url.scheme"),
"http",
AttributeKey.stringKey("http.response.status_code"),
"200",
) && dp.getValue == 2 =>
true
case dp
if dp.getAttributes == Attributes.of(
AttributeKey.stringKey("method"),
AttributeKey.stringKey("http.request.method"),
"GET",
AttributeKey.stringKey("path"),
"/person",
AttributeKey.stringKey("status"),
"4xx"
AttributeKey.stringKey("url.scheme"),
"http",
AttributeKey.stringKey("http.response.status_code"),
"400",
) && dp.getValue == 2 =>
true
case _ => false
} shouldBe 2
}

"default metrics" should "collect requests duration" in {
"default metrics" should "collect http.server.request.duration" in {
// given
val reader = InMemoryMetricReader.create()
val provider = SdkMeterProvider.builder().registerMetricReader(reader).build()
Expand Down Expand Up @@ -140,14 +158,16 @@ class OpenTelemetryMetricsTest extends AnyFlatSpec with Matchers {
val point = reader.collectAllMetrics().asScala.head.getHistogramData.getPoints.asScala
point.map(_.getAttributes) should contain(
Attributes.of(
AttributeKey.stringKey("method"),
AttributeKey.stringKey("http.request.method"),
"GET",
AttributeKey.stringKey("path"),
"/person",
AttributeKey.stringKey("status"),
"2xx",
AttributeKey.stringKey("http.response.status_code"),
"200",
AttributeKey.stringKey("phase"),
"body"
"body",
AttributeKey.stringKey("url.scheme"),
"http",
)
)
}
Expand Down Expand Up @@ -197,12 +217,14 @@ class OpenTelemetryMetricsTest extends AnyFlatSpec with Matchers {
// then
val point = longSumData(reader).head
point.getAttributes shouldBe Attributes.of(
AttributeKey.stringKey("method"),
AttributeKey.stringKey("http.request.method"),
"GET",
AttributeKey.stringKey("path"),
"/person",
AttributeKey.stringKey("status"),
"5xx"
AttributeKey.stringKey("url.scheme"),
"http",
AttributeKey.stringKey("http.response.status_code"),
"500"
)
point.getValue shouldBe 1
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,9 +87,12 @@ object ZioMetrics {

/** Convert into zio metric labels */
private def asZioLabel(l: MetricLabels, res: Either[Throwable, ServerResponse[_]], phase: Option[String]): Set[MetricLabel] = {
l.forResponse.map(label => zio.metrics.MetricLabel(label._1, label._2(res))) ++
phase.map(v => MetricLabel(l.forResponsePhase.name, v))
}.toSet
val responseLabels = l.forResponse.map { case (key, valueFn) =>
Copy link
Contributor Author

@varshith257 varshith257 Dec 11, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does we need to account Option[String] for Zio Metrics ig it may account for other metrics too?

Or better strategy to move this logic to OpenTelemetry module and introduce option labels than affecting other modules

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

that's fine, let's just have optional labels. This might be useful for ZIO as well

MetricLabel(key, valueFn(res).getOrElse("unknown"))
}
val phaseLabel = phase.map(v => MetricLabel(l.forResponsePhase.name, v))
(responseLabels ++ phaseLabel).toSet
}

/** Requests active metric collector. */
def requestActive[F[_]](namespace: String, labels: MetricLabels): Metric[F, Gauge[Long]] = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,15 +29,15 @@ case class EndpointMetric[F[_]](
case class ResponsePhaseLabel(name: String, headersValue: String, bodyValue: String)
case class MetricLabels(
forRequest: List[(String, (AnyEndpoint, ServerRequest) => String)],
forResponse: List[(String, Either[Throwable, ServerResponse[_]] => String)],
forResponse: List[(String, Either[Throwable, ServerResponse[_]] => Option[String])],
forResponsePhase: ResponsePhaseLabel = ResponsePhaseLabel("phase", "headers", "body")
) {
def namesForRequest: List[String] = forRequest.map { case (name, _) => name }
def namesForResponse: List[String] = forResponse.map { case (name, _) => name }

def valuesForRequest(ep: AnyEndpoint, req: ServerRequest): List[String] = forRequest.map { case (_, f) => f(ep, req) }
def valuesForResponse(res: ServerResponse[_]): List[String] = forResponse.map { case (_, f) => f(Right(res)) }
def valuesForResponse(ex: Throwable): List[String] = forResponse.map { case (_, f) => f(Left(ex)) }
def valuesForResponse(res: ServerResponse[_]): List[String] = forResponse.flatMap { case (_, f) => f(Right(res)).toList }
def valuesForResponse(ex: Throwable): List[String] = forResponse.flatMap { case (_, f) => f(Left(ex)).toList }
}

object MetricLabels {
Expand All @@ -51,15 +51,15 @@ object MetricLabels {
forResponse = List(
"status" -> {
case Right(r) =>
r.code match {
Some(r.code match {
case c if c.isInformational => "1xx"
case c if c.isSuccess => "2xx"
case c if c.isRedirect => "3xx"
case c if c.isClientError => "4xx"
case c if c.isServerError => "5xx"
case _ => ""
}
case Left(_) => "5xx"
})
case Left(_) => Some("5xx")
}
)
)
Expand Down
Loading