Skip to content

Commit

Permalink
Merge branch '1.1.x'
Browse files Browse the repository at this point in the history
  • Loading branch information
marcingrzejszczak committed Sep 13, 2023
2 parents 5a2490b + 89b4e94 commit 6ab449f
Show file tree
Hide file tree
Showing 5 changed files with 80 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,4 +25,5 @@ dependencies {
testImplementation 'io.zipkin.brave:brave-instrumentation-http-tests'
testImplementation 'ch.qos.logback:logback-classic'
testImplementation 'io.projectreactor:reactor-core'
testImplementation 'io.projectreactor:reactor-core-micrometer'
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,15 @@
*/
package io.micrometer.tracing.brave.bridge;

import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import brave.Tracing;
import brave.baggage.BaggageField;
Expand All @@ -30,15 +37,21 @@
import io.micrometer.common.util.internal.logging.InternalLogger;
import io.micrometer.common.util.internal.logging.InternalLoggerFactory;
import io.micrometer.context.ContextRegistry;
import io.micrometer.observation.ObservationRegistry;
import io.micrometer.observation.contextpropagation.ObservationThreadLocalAccessor;
import io.micrometer.tracing.BaggageInScope;
import io.micrometer.tracing.CurrentTraceContext;
import io.micrometer.tracing.Span;
import io.micrometer.tracing.Tracer;
import io.micrometer.tracing.contextpropagation.ObservationAwareSpanThreadLocalAccessor;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;
import reactor.core.observability.micrometer.Micrometer;
import reactor.core.publisher.Hooks;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;

import static org.assertj.core.api.BDDAssertions.then;
Expand Down Expand Up @@ -74,6 +87,8 @@ class BaggageTests {

Tracer tracer = new BraveTracer(this.braveTracer, this.bridgeContext, new BraveBaggageManager());

ObservationRegistry observationRegistry = ObservationThreadLocalAccessor.getInstance().getObservationRegistry();

@AfterEach
void cleanup() {
tracing.close();
Expand Down Expand Up @@ -162,9 +177,11 @@ void injectAndExtractKeepsTheBaggageWithLegacyApi() {
}

@Test
void baggageWithContextPropagation() {
void baggageWithContextPropagation() throws InterruptedException, ExecutionException, TimeoutException {
ContextRegistry.getInstance().registerThreadLocalAccessor(new ObservationAwareSpanThreadLocalAccessor(tracer));
Hooks.enableAutomaticContextPropagation();
ExecutorService executorService = Executors.newSingleThreadScheduledExecutor();
Scheduler scheduler = Schedulers.fromExecutor(executorService);

Span span = tracer.nextSpan().start();
try (Tracer.SpanInScope spanInScope = tracer.withSpan(span)) {
Expand All @@ -174,14 +191,23 @@ void baggageWithContextPropagation() {
log.info(
"BAGGAGE OUTSIDE OF REACTOR [" + baggageOutside + "], thread [" + Thread.currentThread() + "]");
String baggageFromReactor = Mono.just(KEY_1)
.delayElement(Duration.ofMillis(1), scheduler)
.tap(Micrometer.observation(observationRegistry))
.publishOn(Schedulers.boundedElastic())
.flatMap(s -> Mono.just(this.tracer.getBaggage(s).get())
.doOnNext(baggage -> log.info("BAGGAGE IN OF REACTOR [" + baggageOutside + "], thread ["
+ Thread.currentThread() + "]")))
.block();
then(baggageFromReactor).isEqualTo(VALUE_1);
}
then(tracer.currentSpan()).isEqualTo(span);
}
then(tracer.currentSpan()).isNull();

Future<Boolean> submit = executorService.submit(() -> tracer.currentSpan() == null);
boolean noCurrentSpan = submit.get(1, TimeUnit.SECONDS);

Assertions.assertThat(noCurrentSpan).isTrue();
}

@Test
Expand All @@ -207,4 +233,9 @@ void baggageWithContextPropagationWithLegacyApi() {
}
}

@AfterAll
static void clear() {
ContextRegistry.getInstance().removeThreadLocalAccessor(ObservationAwareSpanThreadLocalAccessor.KEY);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -26,4 +26,5 @@ dependencies {
testImplementation 'org.mockito:mockito-core'
testImplementation 'ch.qos.logback:logback-classic'
testImplementation 'io.projectreactor:reactor-core'
testImplementation 'io.projectreactor:reactor-core-micrometer'
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,22 @@
*/
package io.micrometer.tracing.otel.bridge;

import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import io.micrometer.common.util.internal.logging.InternalLogger;
import io.micrometer.common.util.internal.logging.InternalLoggerFactory;
import io.micrometer.context.ContextRegistry;
import io.micrometer.observation.ObservationRegistry;
import io.micrometer.observation.contextpropagation.ObservationThreadLocalAccessor;
import io.micrometer.tracing.BaggageInScope;
import io.micrometer.tracing.Span;
import io.micrometer.tracing.Tracer;
Expand All @@ -30,15 +43,15 @@
import io.opentelemetry.extension.trace.propagation.B3Propagator;
import io.opentelemetry.sdk.OpenTelemetrySdk;
import io.opentelemetry.sdk.trace.SdkTracerProvider;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Test;
import reactor.core.observability.micrometer.Micrometer;
import reactor.core.publisher.Hooks;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

import static org.assertj.core.api.BDDAssertions.then;

class BaggageTests {
Expand Down Expand Up @@ -78,6 +91,8 @@ class BaggageTests {
Tracer tracer = new OtelTracer(otelTracer, otelCurrentTraceContext, event -> {
}, otelBaggageManager);

ObservationRegistry observationRegistry = ObservationThreadLocalAccessor.getInstance().getObservationRegistry();

@Test
void canSetAndGetBaggage() {
// GIVEN
Expand Down Expand Up @@ -135,9 +150,11 @@ void injectAndExtractKeepsTheBaggage() {
}

@Test
void baggageWithContextPropagation() {
void baggageWithContextPropagation() throws InterruptedException, ExecutionException, TimeoutException {
ContextRegistry.getInstance().registerThreadLocalAccessor(new ObservationAwareSpanThreadLocalAccessor(tracer));
Hooks.enableAutomaticContextPropagation();
ExecutorService executorService = Executors.newSingleThreadScheduledExecutor();
Scheduler scheduler = Schedulers.fromExecutor(executorService);

Span span = tracer.nextSpan().start();
try (Tracer.SpanInScope spanInScope = tracer.withSpan(span)) {
Expand All @@ -147,14 +164,23 @@ void baggageWithContextPropagation() {
log.info(
"BAGGAGE OUTSIDE OF REACTOR [" + baggageOutside + "], thread [" + Thread.currentThread() + "]");
String baggageFromReactor = Mono.just(KEY_1)
.delayElement(Duration.ofMillis(1), scheduler)
.tap(Micrometer.observation(observationRegistry))
.publishOn(Schedulers.boundedElastic())
.flatMap(s -> Mono.just(this.tracer.getBaggage(s).get())
.doOnNext(baggage -> log.info("BAGGAGE IN OF REACTOR [" + baggageOutside + "], thread ["
+ Thread.currentThread() + "]")))
.block();
then(baggageFromReactor).isEqualTo(VALUE_1);
then(tracer.currentSpan()).isEqualTo(span);
}
}
then(tracer.currentSpan()).isNull();

Future<Boolean> submit = executorService.submit(() -> tracer.currentSpan() == null);
boolean noCurrentSpan = submit.get(1, TimeUnit.SECONDS);

Assertions.assertThat(noCurrentSpan).isTrue();
}

@Test
Expand All @@ -180,4 +206,8 @@ void baggageWithContextPropagationWithLegacyApi() {
}
}

@AfterAll
static void clear() {
ContextRegistry.getInstance().removeThreadLocalAccessor(ObservationAwareSpanThreadLocalAccessor.KEY);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,12 @@
*/
package io.micrometer.tracing.contextpropagation;

import java.io.Closeable;
import java.io.IOException;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;

import io.micrometer.common.util.internal.logging.InternalLogger;
import io.micrometer.common.util.internal.logging.InternalLoggerFactory;
import io.micrometer.context.ContextRegistry;
Expand All @@ -26,12 +32,6 @@
import io.micrometer.tracing.Tracer;
import io.micrometer.tracing.handler.TracingObservationHandler;

import java.io.Closeable;
import java.io.IOException;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;

/**
* A {@link ThreadLocalAccessor} to put and restore current {@link Span} depending on
* whether {@link ObservationThreadLocalAccessor} did some work or not (if
Expand Down Expand Up @@ -121,11 +121,9 @@ public Span getValue() {
public void setValue(Span value) {
SpanAction spanAction = spanActions.get(Thread.currentThread());
Tracer.SpanInScope scope = this.tracer.withSpan(value);
if (spanAction == null) {
spanAction = new SpanAction(spanActions);
spanActions.put(Thread.currentThread(), spanAction);
}
spanAction.setScope(scope);
SpanAction newSpanAction = new SpanAction(spanActions, spanAction);
spanActions.put(Thread.currentThread(), newSpanAction);
newSpanAction.setScope(scope);
}

@Override
Expand Down Expand Up @@ -171,8 +169,8 @@ static class SpanAction implements Closeable {

Closeable scope;

SpanAction(Map<Thread, SpanAction> spanActions) {
this.previous = spanActions.get(Thread.currentThread());
SpanAction(Map<Thread, SpanAction> spanActions, SpanAction previous) {
this.previous = previous;
this.todo = spanActions;
}

Expand Down

0 comments on commit 6ab449f

Please sign in to comment.