From 53b6b89dc04111b5855573c147432be6418af56e Mon Sep 17 00:00:00 2001 From: Vamshi Maskuri <117595548+varshith257@users.noreply.github.com> Date: Wed, 30 Oct 2024 17:23:47 +0530 Subject: [PATCH] Add OpenTelemetry attributes to the metrics backend (#2327) --- .../OpenTelemetryMetricsBackend.scala | 92 ++++++++++++++----- .../OpenTelemetryMetricsBackendTest.scala | 41 ++++++++- 2 files changed, 110 insertions(+), 23 deletions(-) diff --git a/observability/opentelemetry-metrics-backend/src/main/scala/sttp/client4/opentelemetry/OpenTelemetryMetricsBackend.scala b/observability/opentelemetry-metrics-backend/src/main/scala/sttp/client4/opentelemetry/OpenTelemetryMetricsBackend.scala index e4eaede43..149fda5a4 100644 --- a/observability/opentelemetry-metrics-backend/src/main/scala/sttp/client4/opentelemetry/OpenTelemetryMetricsBackend.scala +++ b/observability/opentelemetry-metrics-backend/src/main/scala/sttp/client4/opentelemetry/OpenTelemetryMetricsBackend.scala @@ -1,7 +1,7 @@ package sttp.client4.opentelemetry import io.opentelemetry.api.OpenTelemetry -import io.opentelemetry.api.common.Attributes +import io.opentelemetry.api.common.{AttributeKey, Attributes} import io.opentelemetry.api.metrics.{DoubleHistogram, LongCounter, LongUpDownCounter, Meter} import sttp.client4.listener.{ListenerBackend, RequestListener} import sttp.client4.wrappers.FollowRedirectsBackend @@ -108,44 +108,60 @@ private class OpenTelemetryMetricsListener( private val upAndDownCounter = new ConcurrentHashMap[String, LongUpDownCounter]() override def beforeRequest(request: GenericRequest[_, _]): Option[Long] = { - updateInProgressCounter(request, 1) - recordHistogram(requestToSizeHistogramMapper(request), request.contentLength) - requestToLatencyHistogramMapper(request).map(_ => clock.millis()) + val attributes = createRequestAttributes(request) + + updateInProgressCounter(request, 1, attributes) + recordHistogram(requestToSizeHistogramMapper(request), request.contentLength, attributes) + requestToLatencyHistogramMapper(request).map { _ => + val timestamp = clock.millis() + timestamp + } } override def requestSuccessful(request: GenericRequest[_, _], response: Response[_], tag: Option[Long]): Unit = { + val requestAttributes = createRequestAttributes(request) + val responseAttributes = createResponseAttributes(response) + + val combinedAttributes = requestAttributes.toBuilder().putAll(responseAttributes).build() + if (response.isSuccess) { - incrementCounter(responseToSuccessCounterMapper(response)) + incrementCounter(responseToSuccessCounterMapper(response), combinedAttributes) } else { - incrementCounter(requestToErrorCounterMapper(response)) + incrementCounter(requestToErrorCounterMapper(response), combinedAttributes) } - recordHistogram(responseToSizeHistogramMapper(response), response.contentLength) - recordHistogram(requestToLatencyHistogramMapper(request), tag.map(clock.millis() - _)) - updateInProgressCounter(request, -1) + + recordHistogram(responseToSizeHistogramMapper(response), response.contentLength, combinedAttributes) + recordHistogram(requestToLatencyHistogramMapper(request), tag.map(clock.millis() - _), combinedAttributes) + updateInProgressCounter(request, -1, requestAttributes) } - override def requestException(request: GenericRequest[_, _], tag: Option[Long], e: Exception): Unit = + override def requestException(request: GenericRequest[_, _], tag: Option[Long], e: Exception): Unit = { + val requestAttributes = createRequestAttributes(request) + val errorAttributes = createErrorAttributes(e) + HttpError.find(e) match { case Some(HttpError(body, statusCode)) => requestSuccessful(request, Response(body, statusCode, request.onlyMetadata), tag) case _ => - incrementCounter(requestToFailureCounterMapper(request, e)) - recordHistogram(requestToLatencyHistogramMapper(request), tag.map(clock.millis() - _)) - updateInProgressCounter(request, -1) + incrementCounter(requestToFailureCounterMapper(request, e), errorAttributes) + recordHistogram(requestToLatencyHistogramMapper(request), tag.map(clock.millis() - _), errorAttributes) + updateInProgressCounter(request, -1, requestAttributes) } + } - private def updateInProgressCounter[R, T](request: GenericRequest[T, R], delta: Long): Unit = + private def updateInProgressCounter[R, T](request: GenericRequest[T, R], delta: Long, attributes: Attributes): Unit = requestToInProgressCounterMapper(request) - .foreach(config => - getOrCreateMetric(upAndDownCounter, config, createNewUpDownCounter).add(delta, config.attributes) - ) + .foreach(config => getOrCreateMetric(upAndDownCounter, config, createNewUpDownCounter).add(delta, attributes)) - private def recordHistogram(config: Option[HistogramCollectorConfig], size: Option[Long]): Unit = config.foreach { - cfg => - getOrCreateHistogram(histograms, cfg, createNewHistogram).record(size.getOrElse(0L).toDouble, cfg.attributes) + private def recordHistogram( + config: Option[HistogramCollectorConfig], + size: Option[Long], + attributes: Attributes + ): Unit = config.foreach { cfg => + getOrCreateHistogram(histograms, cfg, createNewHistogram).record(size.getOrElse(0L).toDouble, attributes) } - private def incrementCounter(collectorConfig: Option[CollectorConfig]): Unit = + private def incrementCounter(collectorConfig: Option[CollectorConfig], attributes: Attributes): Unit = collectorConfig .foreach(config => getOrCreateMetric(counters, config, createNewCounter).add(1, config.attributes)) @@ -195,6 +211,40 @@ private class OpenTelemetryMetricsListener( b = config.description.fold(b)(b.setDescription) b.build() } + + /* + OpenTelemetry HTTP Client Metrics Spec: Mapping request attributes as per + https://opentelemetry.io/docs/specs/semconv/http/http-metrics/#http-client + * */ + private def createRequestAttributes(request: GenericRequest[_, _]): Attributes = { + val attributes = Attributes + .builder() + .put(AttributeKey.stringKey("http.request.method"), request.method.method) + .put(AttributeKey.stringKey("server.address"), request.uri.host.getOrElse("unknown")) + .put(AttributeKey.longKey("server.port"), request.uri.port.getOrElse(80)) + .build() + + attributes + } + + /* + OpenTelemetry HTTP Client Metrics Spec: Mapping response attributes as per + https://opentelemetry.io/docs/specs/semconv/http/http-metrics/#http-client + * */ + private def createResponseAttributes(response: Response[_]): Attributes = + Attributes + .builder() + .put(AttributeKey.longKey("http.response.status_code"), response.code.code) + .build() + + private def createErrorAttributes(e: Throwable): Attributes = { + val errorType = e match { + case _: java.net.UnknownHostException => "unknown_host" + case _ => e.getClass.getSimpleName + } + Attributes.builder().put("error.type", errorType).build() + } + } case class CollectorConfig( diff --git a/observability/opentelemetry-metrics-backend/src/test/scala/sttp/client4/opentelemetry/OpenTelemetryMetricsBackendTest.scala b/observability/opentelemetry-metrics-backend/src/test/scala/sttp/client4/opentelemetry/OpenTelemetryMetricsBackendTest.scala index 995cc97f4..e7a83c531 100644 --- a/observability/opentelemetry-metrics-backend/src/test/scala/sttp/client4/opentelemetry/OpenTelemetryMetricsBackendTest.scala +++ b/observability/opentelemetry-metrics-backend/src/test/scala/sttp/client4/opentelemetry/OpenTelemetryMetricsBackendTest.scala @@ -4,6 +4,7 @@ import io.opentelemetry.sdk.OpenTelemetrySdk import io.opentelemetry.sdk.metrics.SdkMeterProvider import io.opentelemetry.sdk.metrics.data.{HistogramPointData, MetricData} import io.opentelemetry.sdk.testing.exporter.InMemoryMetricReader +import io.opentelemetry.api.common.{AttributeKey, Attributes} import org.scalatest.OptionValues import org.scalatest.flatspec.AnyFlatSpec import org.scalatest.matchers.should.Matchers @@ -166,7 +167,7 @@ class OpenTelemetryMetricsBackendTest extends AnyFlatSpec with Matchers with Opt getHistogramValue(reader, OpenTelemetryMetricsBackend.DefaultResponseSizeHistogramName).value.getSum shouldBe 50 } - it should "use histogram for request latencies" in { + it should "use histogram for request latencies and validate attributes" in { // given val response = ResponseStub("Ok", StatusCode.Ok, "Ok", Seq(Header.contentLength(10))) val backendStub = SyncBackendStub.whenAnyRequest.thenRespond(response) @@ -177,7 +178,8 @@ class OpenTelemetryMetricsBackendTest extends AnyFlatSpec with Matchers with Opt (0 until 5).foreach(_ => basicRequest.get(uri"http://127.0.0.1/foo").send(backend)) // then - getHistogramValue(reader, OpenTelemetryMetricsBackend.DefaultLatencyHistogramName).map(_.getSum) should not be empty + val metrics = reader.collectAllMetrics().asScala.toList + specTest(metrics, OpenTelemetryMetricsBackend.DefaultLatencyHistogramName) } it should "use error counter when http error is thrown" in { @@ -238,6 +240,20 @@ class OpenTelemetryMetricsBackendTest extends AnyFlatSpec with Matchers with Opt getMetricValue(reader, OpenTelemetryMetricsBackend.DefaultErrorCounterName) shouldBe None } + it should "validate http.client.request.duration semantic conventions" in { + // given + val reader = InMemoryMetricReader.create() + val backend = OpenTelemetryMetricsBackend(stubAlwaysOk, spawnNewOpenTelemetry(reader)) + + // when + basicRequest.get(uri"http://127.0.0.1/foo").send(backend) + + // then + val metrics = reader.collectAllMetrics().asScala.toList + val expectedMetricName = "http.client.request.duration" + specTest(metrics, expectedMetricName) + } + private[this] def getMetricValue(reader: InMemoryMetricReader, name: String): Option[Long] = reader .collectAllMetrics() @@ -261,4 +277,25 @@ class OpenTelemetryMetricsBackendTest extends AnyFlatSpec with Matchers with Opt .find(_.getName.equals(name)) .head + private[this] def specTest(metrics: List[MetricData], expectedMetricName: String): Unit = { + val metric = metrics.find(_.getName == expectedMetricName) + assert( + metric.isDefined, + s"$expectedMetricName metric is missing. Available [${metrics.map(_.getName).mkString(", ")}]" + ) + + val clue = s"[$expectedMetricName] has a mismatched property" + + metric.foreach { md => + assert(md.getName == expectedMetricName, clue) + assert(md.getUnit == "ms", clue) + + md.getHistogramData.getPoints.forEach { point => + val attributes = point.getAttributes + assert(attributes.get(AttributeKey.stringKey("http.request.method")) == "GET") + assert(attributes.get(AttributeKey.stringKey("server.address")) == "127.0.0.1") + assert(attributes.get(AttributeKey.longKey("http.response.status_code")) == 200L) + } + } + } }