Skip to content

Commit

Permalink
KVL-874 Add unit tests for Telemetry (#9500)
Browse files Browse the repository at this point in the history
* Add unit tests for Telemetry
CHANGELOG_BEGIN
CHANGELOG_END

* Make Tracers injectable, improve the TelemetryContextSpec, add a metrics-test-lib package
  • Loading branch information
hubert-da authored Apr 27, 2021
1 parent c567680 commit d761853
Show file tree
Hide file tree
Showing 11 changed files with 815 additions and 28 deletions.
4 changes: 3 additions & 1 deletion bazel-java-deps.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -81,8 +81,10 @@ def install_java_deps():
"io.dropwizard.metrics:metrics-graphite:4.1.2",
"io.dropwizard.metrics:metrics-jmx:4.1.2",
"io.dropwizard.metrics:metrics-jvm:4.1.2",
"io.opentelemetry:opentelemetry-context:0.16.0",
"io.opentelemetry:opentelemetry-api:0.16.0",
"io.opentelemetry:opentelemetry-context:0.16.0",
"io.opentelemetry:opentelemetry-sdk-testing:0.16.0",
"io.opentelemetry:opentelemetry-sdk-trace:0.16.0",
"io.prometheus:simpleclient:0.8.1",
"io.prometheus:simpleclient_dropwizard:0.8.1",
"io.prometheus:simpleclient_httpserver:0.8.1",
Expand Down
28 changes: 27 additions & 1 deletion ledger/metrics/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -33,22 +33,48 @@ da_scala_library(
],
)

da_scala_library(
name = "metrics-test-lib",
srcs = glob(["src/test/lib/scala/**/*.scala"]),
versioned_scala_deps = {
"2.12": ["@maven//:org_scala_lang_modules_scala_collection_compat"],
},
visibility = [
"//visibility:public",
],
runtime_deps = [],
deps = [
":metrics",
"@maven//:io_opentelemetry_opentelemetry_api",
"@maven//:io_opentelemetry_opentelemetry_sdk_testing",
"@maven//:io_opentelemetry_opentelemetry_sdk_trace",
],
)

da_scala_test_suite(
name = "metrics-tests",
size = "small",
srcs = glob(["src/test/scala/**/*.scala"]),
srcs = glob(["src/test/suite/scala/**/*.scala"]),
scala_deps = [
"@maven//:com_typesafe_akka_akka_actor",
"@maven//:com_typesafe_akka_akka_stream",
"@maven//:org_scalactic_scalactic",
"@maven//:org_scalatest_scalatest",
],
versioned_scala_deps = {
"2.12": ["@maven//:org_scala_lang_modules_scala_collection_compat"],
},
deps = [
":metrics",
":metrics-test-lib",
"//ledger-api/rs-grpc-akka",
"//ledger-api/rs-grpc-bridge",
"//ledger-api/testing-utils",
"//libs-scala/concurrent",
"@maven//:io_dropwizard_metrics_metrics_core",
"@maven//:io_opentelemetry_opentelemetry_api",
"@maven//:io_opentelemetry_opentelemetry_context",
"@maven//:io_opentelemetry_opentelemetry_sdk_testing",
"@maven//:io_opentelemetry_opentelemetry_sdk_trace",
],
)
25 changes: 12 additions & 13 deletions ledger/metrics/src/main/scala/com/daml/telemetry/Telemetry.scala
Original file line number Diff line number Diff line change
Expand Up @@ -5,19 +5,17 @@ package com.daml.telemetry

import java.util.{Map => jMap}

import io.opentelemetry.api.trace.Span
import io.opentelemetry.api.trace.{Span, Tracer}

import scala.concurrent.Future

trait Telemetry {
/** @param tracer An OpenTelemetry Tracer that can be used for building spans. */
abstract class Telemetry(protected val tracer: Tracer) {

/** Returns a telemetry context from the OpenTelemetry context stored in the gRPC
* thread local context.
* This is used to recover the tracing metadata from an incoming gRPC call,
* and used for the subsequent spans.
*
* Current implementation only works with OpenTelemetry < 0.10. Later versions of
* OpenTelemetry use a different context object.
*/
def contextFromGrpcThreadLocalContext(): TelemetryContext

Expand Down Expand Up @@ -71,32 +69,33 @@ trait Telemetry {
protected def rootContext: TelemetryContext
}

/** Default implementation of Telemetry. Uses OpenTelemetry to generate and gather traces.
*/
object DefaultTelemetry extends Telemetry {
abstract class DefaultTelemetry(override protected val tracer: Tracer) extends Telemetry(tracer) {

override def contextFromGrpcThreadLocalContext(): TelemetryContext = {
DefaultTelemetryContext(Span.current)
DefaultTelemetryContext(tracer, Span.current)
}

override def contextFromMetadata(metadata: Option[jMap[String, String]]): TelemetryContext = {
metadata
.flatMap(Tracing.decodeTraceMetadata) match {
case None =>
RootDefaultTelemetryContext
RootDefaultTelemetryContext(tracer)
case Some(context) =>
DefaultTelemetryContext(Span.fromContext(context))
DefaultTelemetryContext(tracer, Span.fromContext(context))
}
}

override protected def rootContext: TelemetryContext = RootDefaultTelemetryContext
override protected def rootContext: TelemetryContext = RootDefaultTelemetryContext(tracer)
}

/** Default implementation of Telemetry. Uses OpenTelemetry to generate and gather traces. */
object DefaultTelemetry extends DefaultTelemetry(OpenTelemetryTracer)

/** Implementation of Telemetry that does nothing.
*
* It always returns NoOpTelemetryContext, and just executes without modification any given code block function.
*/
object NoOpTelemetry extends Telemetry {
object NoOpTelemetry extends Telemetry(Tracer.getDefault) {

override def contextFromGrpcThreadLocalContext(): TelemetryContext = NoOpTelemetryContext

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ package com.daml.telemetry
import java.util.{HashMap => jHashMap, Map => jMap}

import com.daml.dec.DirectExecutionContext
import io.opentelemetry.api.trace.Span
import io.opentelemetry.api.trace.{Span, Tracer}
import io.opentelemetry.context.Context

import scala.concurrent.Future
Expand Down Expand Up @@ -81,7 +81,8 @@ trait TelemetryContext {

/** Default implementation of TelemetryContext. Uses OpenTelemetry to generate and gather traces.
*/
protected class DefaultTelemetryContext(protected val span: Span) extends TelemetryContext {
protected class DefaultTelemetryContext(protected val tracer: Tracer, protected val span: Span)
extends TelemetryContext {

def setAttribute(attribute: SpanAttribute, value: String): TelemetryContext = {
span.setAttribute(attribute.key, value)
Expand All @@ -97,15 +98,14 @@ protected class DefaultTelemetryContext(protected val span: Span) extends Teleme
): Future[T] = {
val subSpan = createSubSpan(spanName, kind, attributes: _*)

val result = body(DefaultTelemetryContext(subSpan))
result.onComplete {
val result = body(DefaultTelemetryContext(tracer, subSpan))
result.andThen {
case Failure(t) =>
subSpan.recordException(t)
subSpan.end()
case Success(_) =>
subSpan.end()
}(DirectExecutionContext)
result
}

override def runInNewSpan[T](
Expand All @@ -118,7 +118,7 @@ protected class DefaultTelemetryContext(protected val span: Span) extends Teleme
val subSpan = createSubSpan(spanName, kind, attributes: _*)

try {
body(DefaultTelemetryContext(subSpan))
body(DefaultTelemetryContext(tracer, subSpan))
} finally {
subSpan.end()
}
Expand All @@ -130,7 +130,7 @@ protected class DefaultTelemetryContext(protected val span: Span) extends Teleme
attributes: (SpanAttribute, String)*
): Span = {
val subSpan =
OpenTelemetryTracer
tracer
.spanBuilder(spanName)
.setParent(Context.current.`with`(span))
.setSpanKind(kind.kind)
Expand Down Expand Up @@ -159,18 +159,19 @@ protected class DefaultTelemetryContext(protected val span: Span) extends Teleme
}

object DefaultTelemetryContext {
def apply(span: Span): DefaultTelemetryContext =
new DefaultTelemetryContext(span)
def apply(tracer: Tracer, span: Span): DefaultTelemetryContext =
new DefaultTelemetryContext(tracer, span)
}

protected object RootDefaultTelemetryContext extends DefaultTelemetryContext(Span.getInvalid) {
protected class RootDefaultTelemetryContext(override protected val tracer: Tracer)
extends DefaultTelemetryContext(tracer, Span.getInvalid) {
override protected def createSubSpan(
spanName: String,
kind: SpanKind,
attributes: (SpanAttribute, String)*
): Span = {
val subSpan =
OpenTelemetryTracer.spanBuilder(spanName).setNoParent().setSpanKind(kind.kind).startSpan()
tracer.spanBuilder(spanName).setNoParent().setSpanKind(kind.kind).startSpan()
for {
(attribute, value) <- attributes
} {
Expand All @@ -180,6 +181,11 @@ protected object RootDefaultTelemetryContext extends DefaultTelemetryContext(Spa
}
}

object RootDefaultTelemetryContext {
def apply(tracer: Tracer): RootDefaultTelemetryContext =
new RootDefaultTelemetryContext(tracer)
}

/** Implementation of Telemetry that does nothing.
*
* It always returns NoOpTelemetryContext, and just executes without modification any given code block function.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
// Copyright (c) 2021 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
// SPDX-License-Identifier: Apache-2.0

package com.daml.telemetry

import io.opentelemetry.sdk.testing.exporter.InMemorySpanExporter
import io.opentelemetry.sdk.trace.SdkTracerProvider
import io.opentelemetry.sdk.trace.export.SimpleSpanProcessor

import scala.jdk.CollectionConverters._

trait TelemetrySpecBase {

protected val anInstrumentationName = "com.daml.telemetry.TelemetrySpec"
protected val aSpanName = "aSpan"
protected val anApplicationIdSpanAttribute: (SpanAttribute, String) =
SpanAttribute.ApplicationId -> "anApplicationId"
protected val aCommandIdSpanAttribute: (SpanAttribute, String) =
SpanAttribute.CommandId -> "aCommandId"

protected val spanExporter: InMemorySpanExporter = InMemorySpanExporter.create
protected val tracerProvider: SdkTracerProvider = SdkTracerProvider
.builder()
.addSpanProcessor(SimpleSpanProcessor.create(spanExporter))
.build()

protected object TestTelemetry extends DefaultTelemetry(tracerProvider.get(anInstrumentationName))

protected implicit class RichInMemorySpanExporter(exporter: InMemorySpanExporter) {
def finishedSpanAttributes: Map[SpanAttribute, String] = {
val finishedSpans = exporter.getFinishedSpanItems.asScala
finishedSpans.flatMap { span =>
val attributes = span.getAttributes.asMap.asScala
attributes.map { case (key, value) =>
SpanAttribute(key.toString) -> value.toString
}
}.toMap
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
// Copyright (c) 2021 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
// SPDX-License-Identifier: Apache-2.0

package com.daml.telemetry

import io.opentelemetry.api.trace.Span
import io.opentelemetry.context.Context
import org.scalatest.matchers.should.Matchers
import org.scalatest.wordspec.AnyWordSpecLike
import org.scalatest.{Assertion, BeforeAndAfterEach}

/** Other cases are covered by [[TelemetrySpec]] */
class TelemetryContextSpec
extends TelemetrySpecBase
with AnyWordSpecLike
with Matchers
with BeforeAndAfterEach {

override protected def afterEach(): Unit = spanExporter.reset()

"DefaultTelemetryContext.runInOpenTelemetryScope" should {
"run a body and create a current context with a span" in {
val tracer = tracerProvider.get(anInstrumentationName)
val span = tracer
.spanBuilder(aSpanName)
.setAttribute(
anApplicationIdSpanAttribute._1.key,
anApplicationIdSpanAttribute._2,
)
.startSpan()

runInOpenTelemetryScopeAndAssert(DefaultTelemetryContext(tracer, span))

val attributes = spanExporter.finishedSpanAttributes
attributes should contain(anApplicationIdSpanAttribute)
}
}

"RootDefaultTelemetryContext.runInOpenTelemetryScope" should {
"run a body" in {
val tracer = tracerProvider.get(anInstrumentationName)
runInOpenTelemetryScopeAndAssert(RootDefaultTelemetryContext(tracer))
}
}

"NoOpTelemetryContext.runInOpenTelemetryScope" should {
"run a body" in {
runInOpenTelemetryScopeAndAssert(NoOpTelemetryContext)
}
}

private def runInOpenTelemetryScopeAndAssert(telemetryContext: TelemetryContext): Assertion = {
var placeholder: Option[_] = None
telemetryContext.runInOpenTelemetryScope {
Span
.fromContext(Context.current())
.end() // end the span from the current context to be able to make assertions on its attributes
placeholder = Some(())
}
placeholder shouldBe Some(())
}
}
Loading

0 comments on commit d761853

Please sign in to comment.