Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
  • Loading branch information
vietk committed Sep 26, 2024
2 parents b7e7d3e + 9839fcf commit 3ee5f8c
Show file tree
Hide file tree
Showing 3 changed files with 57 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import com.google.protobuf.util.JsonFormat;

import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.api.baggage.Baggage;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.SpanBuilder;
import io.opentelemetry.api.trace.StatusCode;
Expand Down Expand Up @@ -168,24 +169,28 @@ public void process(Record<KIn, VIn> record) {
final TextMapPropagator propagator = openTelemetry.getPropagators().getTextMapPropagator();

// going through all propagation field names defined in the OTel configuration
// we look if any of them has been set with a non null value in the headers of the incoming message
// we look if any of them has been set with a non-null value in the headers of the incoming message
Context extractedContext = null;
if (propagator.fields()
.stream()
.map(record.headers()::lastHeader)
.anyMatch(Objects::nonNull)) {
// if that is the case, let's extract a Context initialized with the parent trace id and span id
// present as headers in the incoming message
Context extractedContext = propagator.extract(Context.current(), record.headers(), textMapGetter);
// if that is the case, let's extract a Context initialized with the parent trace id, span id
// and baggage present as headers in the incoming message
extractedContext = propagator.extract(Context.current(), record.headers(), textMapGetter);
// use the context as parent span for the built span
spanBuilder.setParent(extractedContext);
// we clean the headers to avoid their propagation in any outgoing message (knowing that by
// default Kafka Streams copies all headers of the incoming message into any outgoing message)
propagator.fields().forEach(record.headers()::remove);
}
Span span = spanBuilder.startSpan();
try (Scope scope = span.makeCurrent()) {
// baggage need to be explicitly set as current otherwise it is not propagated (baggage is independent of span
// in opentelemetry) and actually lost as kafka headers are cleaned
try (Scope ignored = (extractedContext != null) ? Baggage.fromContext(extractedContext).makeCurrent() : Scope.noop();
Scope scope = span.makeCurrent()) {
try {
// now that the context has been to the new started child span of this microservice we replace
// now that the context has been set to the new started child span of this microservice, we replace
// the headers in the incoming message so when an outgoing message is produced with the copied
// header values it already has the span id from this new child span
propagator.inject(Context.current(), record.headers(), textMapSetter);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,11 @@ public final class KafkaStreamsProcessorHeaders {
*/
public static final String W3C_TRACE_ID = "traceparent";

/**
* W3C tracing baggage. It is propagated by the opentelemetry if configured to do so.
*/
public static final String W3C_BAGGAGE = "baggage";

/**
* The reason of the failure.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
package io.quarkiverse.kafkastreamsprocessor.impl.decorator.processor;

import static io.opentelemetry.sdk.testing.assertj.TracesAssert.assertThat;
import static io.quarkiverse.kafkastreamsprocessor.impl.protocol.KafkaStreamsProcessorHeaders.W3C_BAGGAGE;
import static io.quarkiverse.kafkastreamsprocessor.impl.protocol.KafkaStreamsProcessorHeaders.W3C_TRACE_ID;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.allOf;
import static org.hamcrest.Matchers.containsString;
Expand Down Expand Up @@ -70,6 +72,8 @@
import com.google.protobuf.util.JsonFormat;

import io.opentelemetry.api.GlobalOpenTelemetry;
import io.opentelemetry.api.baggage.Baggage;
import io.opentelemetry.api.baggage.propagation.W3CBaggagePropagator;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.SpanContext;
import io.opentelemetry.api.trace.SpanId;
Expand All @@ -78,8 +82,12 @@
import io.opentelemetry.api.trace.TraceId;
import io.opentelemetry.api.trace.TraceState;
import io.opentelemetry.api.trace.Tracer;
import io.opentelemetry.api.trace.propagation.W3CTraceContextPropagator;
import io.opentelemetry.context.Context;
import io.opentelemetry.context.Scope;
import io.opentelemetry.context.propagation.ContextPropagators;
import io.opentelemetry.context.propagation.TextMapPropagator;
import io.opentelemetry.sdk.OpenTelemetrySdk;
import io.opentelemetry.sdk.testing.junit5.OpenTelemetryExtension;
import io.opentelemetry.sdk.trace.IdGenerator;
import io.quarkiverse.kafkastreamsprocessor.impl.TestException;
Expand Down Expand Up @@ -313,6 +321,31 @@ void shouldLogRawToStringValueIfNotProtobuf() throws Throwable {
assertThat(getLogs(), hasItem(allOf(containsString("DEBUG"), containsString("time=1970-01-01T00:00Z"))));
}

@Test
void shouldPropagateOpentelemetryW3CBaggage() {
// header value format here: https://www.w3.org/TR/baggage/#baggage-http-header-format
Headers headers = new RecordHeaders().add(W3C_TRACE_ID, TRACE_PARENT.getBytes())
.add(W3C_BAGGAGE, "key1=value1,key2=value2".getBytes());
Record<String, Ping> record = new Record<>(null, Ping.newBuilder().setMessage("blabla").build(), 0L, headers);

// the opentelemetry injected and used throughout the unit tests in this class is not configured with W3C baggage propagator
// as coming from the OpenTelemetryExtension, so we need to create a new one with baggage propagator.
try (OpenTelemetrySdk openTelemetryWithBaggageSdk = OpenTelemetrySdk.builder()
.setPropagators(ContextPropagators.create(TextMapPropagator.composite(W3CTraceContextPropagator.getInstance(),
W3CBaggagePropagator.getInstance())))
.build()) {
decorator = new TracingDecorator<>(new LogOpentelemetryBaggageProcessor(), openTelemetryWithBaggageSdk,
kafkaTextMapGetter, kafkaTextMapSetter, openTelemetryWithBaggageSdk.getTracer("test"), PROCESSOR_NAME,
jsonPrinter);
decorator.init(processorContext);

decorator.process(record);

assertThat(getLogs(), hasItem(allOf(containsString("DEBUG"), containsString("baggage: key1 value1"))));
assertThat(getLogs(), hasItem(allOf(containsString("DEBUG"), containsString("baggage: key2 value2"))));
}
}

@Slf4j
static class ReadMDCProcessor implements Processor<String, Ping, String, Ping> {
String traceId;
Expand All @@ -331,6 +364,14 @@ public void process(Record<String, Ping> record) {
}
}

@Slf4j
static class LogOpentelemetryBaggageProcessor implements Processor<String, Ping, String, Ping> {
@Override
public void process(Record<String, Ping> record) {
Baggage.current().forEach((key, baggageEntry) -> log.debug("baggage: {} {}", key, baggageEntry.getValue()));
}
}

public static String w3cHeader(String traceId, String spanId) {
return String.format("00-%s-%s-01", StringUtils.leftPad(traceId, TraceId.getLength(), '0'),
StringUtils.leftPad(spanId, SpanId.getLength(), '0'));
Expand Down

0 comments on commit 3ee5f8c

Please sign in to comment.