From 66585ba174ebe6cba9b0863e96f81cb29a64ffca Mon Sep 17 00:00:00 2001 From: Adrian Cole Date: Mon, 12 Feb 2018 03:49:22 +0300 Subject: [PATCH] Deflakes async tests by switching to concurrent blocking queue (#606) This removes some Thread.sleep and other guards from tests by changing assertions to use a blocking queue. It also improves some of the state checking, particularly around leaked trace contexts. This uncovered two where async instrumentation (vertx and netty) redundantly scoped a span. --- .../grpc/ITTracingClientInterceptor.java | 90 ++++++++----- .../grpc/ITTracingServerInterceptor.java | 59 +++++---- .../src/main/java/brave/http/ITHttp.java | 116 ++++++++++++++--- .../java/brave/http/ITHttpAsyncClient.java | 54 ++++++++ .../main/java/brave/http/ITHttpClient.java | 119 ++++++++---------- .../main/java/brave/http/ITHttpServer.java | 103 +++++++-------- .../java/brave/http/ITServlet25Container.java | 2 + .../java/brave/http/ITServlet3Container.java | 6 +- .../java/brave/http/HttpServerHandler.java | 30 +++-- .../ITTracingHttpAsyncClientBuilder.java | 6 +- .../ITTracingHttpClientBuilder.java | 7 +- .../brave/jaxrs2/ITTracingFeature_Client.java | 35 +++--- .../jaxrs2/ITTracingFeature_Container.java | 6 +- .../sampling/ITDeclarativeSampling.java | 10 +- .../brave/kafka/clients/ITKafkaTracing.java | 92 ++++++++------ .../mysql/ITTracingStatementInterceptor.java | 5 +- .../mysql6/ITTracingStatementInterceptor.java | 5 +- .../brave/netty/http/NettyHttpTracing.java | 3 + .../netty/http/TracingHttpServerHandler.java | 15 ++- .../brave/okhttp3/ITTracingCallFactory.java | 18 ++- .../brave/okhttp3/ITTracingInterceptor.java | 7 +- .../java/brave/p6spy/ITTracingP6Factory.java | 7 +- .../p6spy/TracingJdbcEventListenerTest.java | 7 +- .../java/brave/sparkjava/ITSparkTracing.java | 20 +-- ...ITTracingClientHttpRequestInterceptor.java | 6 +- ...cingAsyncClientHttpRequestInterceptor.java | 34 ++--- ...ITTracingClientHttpRequestInterceptor.java | 18 ++- .../web/TracingRoutingContextHandler.java | 117 ++++++++++------- .../brave/vertx/web/ITVertxWebTracing.java | 36 ++++-- 29 files changed, 631 insertions(+), 402 deletions(-) create mode 100644 instrumentation/http-tests/src/main/java/brave/http/ITHttpAsyncClient.java diff --git a/instrumentation/grpc/src/test/java/brave/grpc/ITTracingClientInterceptor.java b/instrumentation/grpc/src/test/java/brave/grpc/ITTracingClientInterceptor.java index 13e159a4aa..81cba33820 100644 --- a/instrumentation/grpc/src/test/java/brave/grpc/ITTracingClientInterceptor.java +++ b/instrumentation/grpc/src/test/java/brave/grpc/ITTracingClientInterceptor.java @@ -24,10 +24,12 @@ import io.grpc.examples.helloworld.HelloReply; import io.grpc.examples.helloworld.HelloRequest; import java.io.IOException; +import java.util.Arrays; import java.util.Iterator; import java.util.Map; +import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentLinkedDeque; +import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -45,7 +47,11 @@ public class ITTracingClientInterceptor { Logger testLogger = LogManager.getLogger(); - ConcurrentLinkedDeque spans = new ConcurrentLinkedDeque<>(); + /** + * See brave.http.ITHttp for rationale on using a concurrent blocking queue eventhough some calls, + * like those using blocking clients, happen on the main thread. + */ + BlockingQueue spans = new LinkedBlockingQueue<>(); GrpcTracing tracing = GrpcTracing.create(tracingBuilder(Sampler.ALWAYS_SAMPLE).build()); Tracer tracer = tracing.tracing().tracer(); @@ -60,6 +66,10 @@ public class ITTracingClientInterceptor { @After public void close() throws Exception { closeClient(client); server.stop(); + // From brave.http.ITHttp.close + assertThat(spans.poll(100, TimeUnit.MILLISECONDS)) + .withFailMessage("Span remaining in queue. Check for exception or redundant reporting") + .isNull(); Tracing current = Tracing.current(); if (current != null) current.close(); } @@ -86,6 +96,8 @@ void closeClient(ManagedChannel client) throws Exception { TraceContext context = server.takeRequest().context(); assertThat(context.parentId()).isNull(); assertThat(context.sampled()).isTrue(); + + spans.take(); } @Test public void makesChildOfCurrentSpan() throws Exception { @@ -101,6 +113,11 @@ void closeClient(ManagedChannel client) throws Exception { .isEqualTo(parent.context().traceId()); assertThat(context.parentId()) .isEqualTo(parent.context().spanId()); + + // we report one local and one client span + assertThat(Arrays.asList(spans.take(), spans.take())) + .extracting(Span::kind) + .containsOnly(null, Span.Kind.CLIENT); } /** @@ -131,6 +148,11 @@ void closeClient(ManagedChannel client) throws Exception { } finally { otherSpan.finish(); } + + // Check we reported 2 local spans and 2 client spans + assertThat(Arrays.asList(spans.take(), spans.take(), spans.take(), spans.take())) + .extracting(Span::kind) + .containsOnly(null, Span.Kind.CLIENT); } /** Unlike Brave 3, Brave 4 propagates trace ids even when unsampled */ @@ -143,25 +165,39 @@ void closeClient(ManagedChannel client) throws Exception { TraceContextOrSamplingFlags extracted = server.takeRequest(); assertThat(extracted.sampled()).isFalse(); + + // @After will check that nothing is reported } @Test public void reportsClientKindToZipkin() throws Exception { GreeterGrpc.newBlockingStub(client).sayHello(HELLO_REQUEST); - assertThat(spans) - .extracting(Span::kind) - .containsExactly(Span.Kind.CLIENT); + Span span = spans.take(); + assertThat(span.kind()) + .isEqualTo(Span.Kind.CLIENT); } @Test public void defaultSpanNameIsMethodName() throws Exception { GreeterGrpc.newBlockingStub(client).sayHello(HELLO_REQUEST); - assertThat(spans) - .extracting(Span::name) - .containsExactly("helloworld.greeter/sayhello"); + Span span = spans.take(); + assertThat(span.name()) + .isEqualTo("helloworld.greeter/sayhello"); + } + + @Test public void onTransportException_reportsSpan() throws Exception { + spanFromTransportException(); + } + + @Test public void onTransportException_addsErrorTag() throws Exception { + Span span = spanFromTransportException(); + assertThat(span.tags()).containsExactly( + entry("error", "UNAVAILABLE"), + entry("grpc.status_code", "UNAVAILABLE") + ); } - @Test public void reportsSpanOnTransportException() throws Exception { + Span spanFromTransportException() throws InterruptedException { server.stop(); try { @@ -170,10 +206,7 @@ void closeClient(ManagedChannel client) throws Exception { } catch (StatusRuntimeException e) { } - assertThat(spans).flatExtracting(s -> s.tags().entrySet()).containsExactly( - entry("error", "UNAVAILABLE"), - entry("grpc.status_code", "UNAVAILABLE") - ); + return spans.take(); } @Test public void addsErrorTag_onUnimplemented() throws Exception { @@ -183,30 +216,21 @@ void closeClient(ManagedChannel client) throws Exception { } catch (StatusRuntimeException e) { } - assertThat(spans).flatExtracting(s -> s.tags().entrySet()).containsExactly( + Span span = spans.take(); + assertThat(span.tags()).containsExactly( entry("error", "UNIMPLEMENTED"), entry("grpc.status_code", "UNIMPLEMENTED") ); } - @Test public void addsErrorTag_onTransportException() throws Exception { - reportsSpanOnTransportException(); - - assertThat(spans).flatExtracting(s -> s.tags().entrySet()).containsExactly( - entry("error", "UNAVAILABLE"), - entry("grpc.status_code", "UNAVAILABLE") - ); - } - @Test public void addsErrorTag_onCanceledFuture() throws Exception { server.enqueueDelay(TimeUnit.SECONDS.toMillis(1)); ListenableFuture resp = GreeterGrpc.newFutureStub(client).sayHello(HELLO_REQUEST); assumeTrue("lost race on cancel", resp.cancel(true)); - close(); // blocks until the cancel finished - - assertThat(spans).flatExtracting(s -> s.tags().entrySet()).containsExactly( + Span span = spans.take(); + assertThat(span.tags()).containsExactly( entry("error", "CANCELLED"), entry("grpc.status_code", "CANCELLED") ); @@ -244,6 +268,8 @@ public void start(Listener responseListener, Metadata headers) { assertThat(scopes) .containsKeys("before", "start"); + + spans.take(); } @Test public void clientParserTest() throws Exception { @@ -272,8 +298,9 @@ protected String spanName(MethodDescriptor methodDesc GreeterGrpc.newBlockingStub(client).sayHello(HELLO_REQUEST); - assertThat(spans.getFirst().name()).isEqualTo("unary"); - assertThat(spans).flatExtracting(s -> s.tags().keySet()).containsExactlyInAnyOrder( + Span span = spans.take(); + assertThat(span.name()).isEqualTo("unary"); + assertThat(span.tags()).containsKeys( "grpc.message_received", "grpc.message_sent", "grpc.message_received.visible", "grpc.message_sent.visible" ); @@ -294,14 +321,15 @@ public void clientParserTestStreamingResponse() throws Exception { Iterator replies = GreeterGrpc.newBlockingStub(client) .sayHelloWithManyReplies(HelloRequest.newBuilder().setName("this is dog").build()); assertThat(replies).hasSize(10); - assertThat(spans).hasSize(1); + + Span span = spans.take(); // all response messages are tagged to the same span - assertThat(spans.getFirst().tags()).hasSize(10); + assertThat(span.tags()).hasSize(10); } Tracing.Builder tracingBuilder(Sampler sampler) { return Tracing.newBuilder() - .spanReporter((zipkin2.reporter.Reporter) spans::add) + .spanReporter(spans::add) .currentTraceContext( // connect to log4j ThreadContextCurrentTraceContext.create(new StrictCurrentTraceContext())) .sampler(sampler); diff --git a/instrumentation/grpc/src/test/java/brave/grpc/ITTracingServerInterceptor.java b/instrumentation/grpc/src/test/java/brave/grpc/ITTracingServerInterceptor.java index 2e144f458e..737704a063 100644 --- a/instrumentation/grpc/src/test/java/brave/grpc/ITTracingServerInterceptor.java +++ b/instrumentation/grpc/src/test/java/brave/grpc/ITTracingServerInterceptor.java @@ -29,7 +29,8 @@ import io.grpc.examples.helloworld.HelloReply; import io.grpc.examples.helloworld.HelloRequest; import java.util.Iterator; -import java.util.concurrent.ConcurrentLinkedDeque; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; import org.apache.logging.log4j.LogManager; @@ -53,7 +54,8 @@ public class ITTracingServerInterceptor { @Rule public ExpectedException thrown = ExpectedException.none(); - ConcurrentLinkedDeque spans = new ConcurrentLinkedDeque<>(); + /** See brave.http.ITHttp for rationale on using a concurrent blocking queue */ + BlockingQueue spans = new LinkedBlockingQueue<>(); GrpcTracing grpcTracing; Server server; @@ -95,6 +97,10 @@ void init(@Nullable ServerInterceptor userInterceptor) throws Exception { server.shutdown(); server.awaitTermination(); } + // From brave.http.ITHttp.close + assertThat(spans.poll(100, TimeUnit.MILLISECONDS)) + .withFailMessage("Span remaining in queue. Check for exception or redundant reporting") + .isNull(); Tracing current = Tracing.current(); if (current != null) current.close(); } @@ -123,11 +129,11 @@ public void start(Listener responseListener, Metadata headers) { GreeterGrpc.newBlockingStub(channel).sayHello(HELLO_REQUEST); - Span s = tryTakeSpan(); - assertThat(s.traceId()).isEqualTo(traceId); - assertThat(s.parentId()).isEqualTo(parentId); - assertThat(s.id()).isEqualTo(spanId); - assertThat(s.shared()).isTrue(); + Span span = spans.take(); + assertThat(span.traceId()).isEqualTo(traceId); + assertThat(span.parentId()).isEqualTo(parentId); + assertThat(span.id()).isEqualTo(spanId); + assertThat(span.shared()).isTrue(); } @Test public void createsChildWhenJoinDisabled() throws Exception { @@ -157,11 +163,11 @@ public void start(Listener responseListener, Metadata headers) { GreeterGrpc.newBlockingStub(channel).sayHello(HELLO_REQUEST); - Span s = tryTakeSpan(); - assertThat(s.traceId()).isEqualTo(traceId); - assertThat(s.parentId()).isEqualTo(spanId); - assertThat(s.id()).isNotEqualTo(spanId); - assertThat(s.shared()).isNull(); + Span span = spans.take(); + assertThat(span.traceId()).isEqualTo(traceId); + assertThat(span.parentId()).isEqualTo(spanId); + assertThat(span.id()).isNotEqualTo(spanId); + assertThat(span.shared()).isNull(); } @Test public void samplingDisabled() throws Exception { @@ -170,7 +176,7 @@ public void start(Listener responseListener, Metadata headers) { GreeterGrpc.newBlockingStub(client).sayHello(HELLO_REQUEST); - assertThat(spans).isEmpty(); + // @After will check that nothing is reported } /** @@ -194,24 +200,30 @@ public ServerCall.Listener interceptCall(ServerCall ServerCall.Listener interceptCall(ServerCall String spanName(MethodDescriptor methodDesc GreeterGrpc.newBlockingStub(client).sayHello(HELLO_REQUEST); - Span span = tryTakeSpan(); + Span span = spans.take(); assertThat(span.name()).isEqualTo("unary"); assertThat(span.tags().keySet()).containsExactlyInAnyOrder( "grpc.message_received", "grpc.message_sent", @@ -276,7 +289,8 @@ protected String spanName(MethodDescriptor methodDesc .sayHelloWithManyReplies(HELLO_REQUEST); assertThat(replies).hasSize(10); // all response messages are tagged to the same span - assertThat(tryTakeSpan().tags()).hasSize(10); + Span span = spans.take(); + assertThat(span.tags()).hasSize(10); } Tracing.Builder tracingBuilder(Sampler sampler) { @@ -286,13 +300,4 @@ Tracing.Builder tracingBuilder(Sampler sampler) { ThreadContextCurrentTraceContext.create(new StrictCurrentTraceContext())) .sampler(sampler); } - - // Flake detection logic. If the test fails after the retry it might not be a timing issue. - Span tryTakeSpan() throws InterruptedException { - if (spans.isEmpty()) { - testLogger.warn("delayed getting a span; retrying"); - Thread.sleep(100); - } - return spans.poll(); - } } diff --git a/instrumentation/http-tests/src/main/java/brave/http/ITHttp.java b/instrumentation/http-tests/src/main/java/brave/http/ITHttp.java index b088586747..9387ad4b66 100644 --- a/instrumentation/http-tests/src/main/java/brave/http/ITHttp.java +++ b/instrumentation/http-tests/src/main/java/brave/http/ITHttp.java @@ -1,33 +1,117 @@ package brave.http; import brave.Tracing; +import brave.internal.HexCodec; import brave.propagation.B3Propagation; import brave.propagation.CurrentTraceContext; import brave.propagation.ExtraFieldPropagation; import brave.propagation.StrictCurrentTraceContext; import brave.propagation.TraceContext; import brave.sampler.Sampler; -import java.util.Map; -import java.util.concurrent.ConcurrentLinkedDeque; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; import okhttp3.mockwebserver.MockWebServer; import org.junit.After; import org.junit.Rule; -import org.junit.rules.ExpectedException; +import zipkin2.Annotation; import zipkin2.Span; import static org.assertj.core.api.Assertions.assertThat; +/** + * This is the base class for http-based integration tests. It has a few features to ensure tests + * cover common instrumentation bugs. Most of this optimizes for instrumentation occurring on a + * different thread than main (which does the assertions). + * + *

    + *
  • {@link StrictCurrentTraceContext} double-checks threads don't leak contexts
  • + *
  • Span reporting double-checks the span was de-scoped on finish, to prevent leaks
  • + *
  • Spans report into a concurrent blocking queue to prevent assertions race conditions
  • + *
  • After tests complete, the queue is strictly checked to catch redundant span reporting
  • + *
+ * + *

As a blocking queue is used, {@link #takeSpan take a span} to perform assertions on it. + * + *

{@code
+ * Span span = takeSpan();
+ * assertThat(span.traceId()).isEqualTo(traceId);
+ * }
+ * + * All spans reported must be taken before the test completes! + * + *

Debugging test failures

+ * + *

If a test hangs, likely {@link BlockingQueue#take()} is being called when a span wasn't + * reported. An exception or bug could cause this (for example, the error handling route not + * calling {@link brave.Span#finish()}). + * + *

If a test fails on {@link After}, it can mean that your test created a span, but didn't {@link BlockingQueue#take()} + * it off the queue. If you are testing something that creates a span, you may not want to verify + * each one. In this case, at least take them similar to below: + * + *

{@code
+ * for (int i = 0; i < 10; i++) takeSpan(); // we expected 10 spans
+ * }
+ * + *

This code looks hard.. why are we using a concurrent queue? My http client is easy

+ * + *

Some http client instrumentation are fully synchronous (everything on the main thread). + * Testing such instrumentation could be easier, ex reporting into a list. Some other race-detecting + * features may feel overkill in this case. + * + *

Consider though, this is a base class for all http instrumentation: servers (always report off + * main thread) and asynchronous clients (often report off main). Also, even blocking clients can + * execute their "on headers received" hook on a separate thread! Even if the http client you are + * working on does everything on the same thread, a small change could invalidate that assumption. + * If something written to work on one thread is suddenly working on two threads, tests can fail + * "randomly", perhaps not until an unrelated change to JRE. When tests fail, they also make it + * impossible to release new code until we disable the test or fix it. Bugs or race conditions + * instrumentation can be very time consuming to solve. For example, they can appear as "flakes" in + * CI servers such as Travis, which can be near impossible to debug. + * + *

Bottom-line is that we accept that strict tests are harder up front, and not necessary for a + * few types of blocking client instrumentation. However, the majority of http instrumentation have + * to concern themselves with multi-threaded behavior and if we always do, the chances of builds + * breaking are less. + */ public abstract class ITHttp { - @Rule public ExpectedException thrown = ExpectedException.none(); + public static final String EXTRA_KEY = "user-id"; + static final String CONTEXT_LEAK = "context.leak"; + @Rule public MockWebServer server = new MockWebServer(); - public static String EXTRA_KEY = "user-id"; - protected ConcurrentLinkedDeque spans = new ConcurrentLinkedDeque<>(); + /** + * When testing servers or asynchronous clients, spans are reported on a worker thread. In order + * to read them on the main thread, we use a concurrent queue. As some implementations report + * after a response is sent, we use a blocking queue to prevent race conditions in tests. + */ + private BlockingQueue spans = new LinkedBlockingQueue<>(); + + /** Call this to block until a span was reported */ + protected Span takeSpan() throws InterruptedException { + Span result = spans.take(); + assertThat(result.annotations()) + .extracting(Annotation::value) + .doesNotContain(CONTEXT_LEAK); + return result; + } protected CurrentTraceContext currentTraceContext = new StrictCurrentTraceContext(); protected HttpTracing httpTracing; + /** + * On close, we check that all spans have been verified by the test. This ensures bad behavior + * such as duplicate reporting doesn't occur. The impact is that every span must at least be + * {@link BlockingQueue#poll() polled} before the end of each method. + * + *

This also closes the current instance of tracing, to prevent it from being accidentally + * visible to other test classes which call {@link Tracing#current()}. + */ @After public void close() throws Exception { + assertThat(spans.poll(100, TimeUnit.MILLISECONDS)) + .withFailMessage("Span remaining in queue. Check for exception or redundant reporting") + .isNull(); Tracing current = Tracing.current(); if (current != null) current.close(); } @@ -37,22 +121,22 @@ Tracing.Builder tracingBuilder(Sampler sampler) { .spanReporter(s -> { // make sure the context was cleared prior to finish.. no leaks! TraceContext current = httpTracing.tracing().currentTraceContext().get(); + boolean contextLeak = false; if (current != null) { - assertThat(current.spanId()) - .isNotEqualTo(s.id()); + // add annotation in addition to throwing, in case we are off the main thread + if (HexCodec.toLowerHex(current.spanId()).equals(s.id())) { + s = s.toBuilder().addAnnotation(s.timestampAsLong(), CONTEXT_LEAK).build(); + contextLeak = true; + } } spans.add(s); + // throw so that we can see the path to the code that leaked the context + if (contextLeak) { + throw new AssertionError(CONTEXT_LEAK + " on " + Thread.currentThread().getName()); + } }) .propagationFactory(ExtraFieldPropagation.newFactory(B3Propagation.FACTORY, EXTRA_KEY)) .currentTraceContext(currentTraceContext) .sampler(sampler); } - - void assertReportedTagsInclude(String key, String... values) { - assertThat(spans) - .flatExtracting(s -> s.tags().entrySet()) - .filteredOn(e -> e.getKey().equals(key)) - .extracting(Map.Entry::getValue) - .containsExactly(values); - } } diff --git a/instrumentation/http-tests/src/main/java/brave/http/ITHttpAsyncClient.java b/instrumentation/http-tests/src/main/java/brave/http/ITHttpAsyncClient.java new file mode 100644 index 0000000000..e4d4462195 --- /dev/null +++ b/instrumentation/http-tests/src/main/java/brave/http/ITHttpAsyncClient.java @@ -0,0 +1,54 @@ +package brave.http; + +import brave.Tracer; +import brave.Tracer.SpanInScope; +import brave.internal.HexCodec; +import java.util.Arrays; +import java.util.concurrent.TimeUnit; +import okhttp3.mockwebserver.MockResponse; +import okhttp3.mockwebserver.RecordedRequest; +import org.junit.Test; +import zipkin2.Span; + +import static org.assertj.core.api.Assertions.assertThat; + +public abstract class ITHttpAsyncClient extends ITHttpClient { + + protected abstract void getAsync(C client, String pathIncludingQuery) throws Exception; + + /** + * This tests that the parent is determined at the time the request was made, not when the request + * was executed. + */ + @Test public void usesParentFromInvocationTime() throws Exception { + Tracer tracer = httpTracing.tracing().tracer(); + server.enqueue(new MockResponse().setBodyDelay(300, TimeUnit.MILLISECONDS)); + server.enqueue(new MockResponse()); + + brave.Span parent = tracer.newTrace().name("test").start(); + try (SpanInScope ws = tracer.withSpanInScope(parent)) { + getAsync(client, "/items/1"); + getAsync(client, "/items/2"); + } finally { + parent.finish(); + } + + brave.Span otherSpan = tracer.newTrace().name("test2").start(); + try (SpanInScope ws = tracer.withSpanInScope(otherSpan)) { + for (int i = 0; i < 2; i++) { + RecordedRequest request = server.takeRequest(); + assertThat(request.getHeader("x-b3-traceId")) + .isEqualTo(parent.context().traceIdString()); + assertThat(request.getHeader("x-b3-parentspanid")) + .isEqualTo(HexCodec.toLowerHex(parent.context().spanId())); + } + } finally { + otherSpan.finish(); + } + + // Check we reported 2 local spans and 2 client spans + assertThat(Arrays.asList(takeSpan(), takeSpan(), takeSpan(), takeSpan())) + .extracting(Span::kind) + .containsOnly(null, Span.Kind.CLIENT); + } +} diff --git a/instrumentation/http-tests/src/main/java/brave/http/ITHttpClient.java b/instrumentation/http-tests/src/main/java/brave/http/ITHttpClient.java index b64c96ebd7..10396c9a52 100644 --- a/instrumentation/http-tests/src/main/java/brave/http/ITHttpClient.java +++ b/instrumentation/http-tests/src/main/java/brave/http/ITHttpClient.java @@ -7,7 +7,7 @@ import brave.propagation.ExtraFieldPropagation; import brave.propagation.SamplingFlags; import brave.sampler.Sampler; -import java.util.concurrent.TimeUnit; +import java.util.Arrays; import okhttp3.mockwebserver.MockResponse; import okhttp3.mockwebserver.RecordedRequest; import okhttp3.mockwebserver.SocketPolicy; @@ -37,8 +37,6 @@ public abstract class ITHttpClient extends ITHttp { protected abstract void post(C client, String pathIncludingQuery, String body) throws Exception; - protected abstract void getAsync(C client, String pathIncludingQuery) throws Exception; - @Override @After public void close() throws Exception { closeClient(client); super.close(); @@ -52,6 +50,8 @@ public abstract class ITHttpClient extends ITHttp { assertThat(request.getHeaders().toMultimap()) .containsKeys("x-b3-traceId", "x-b3-spanId") .containsEntry("x-b3-sampled", asList("1")); + + takeSpan(); } @Test public void makesChildOfCurrentSpan() throws Exception { @@ -70,6 +70,10 @@ public abstract class ITHttpClient extends ITHttp { .isEqualTo(parent.context().traceIdString()); assertThat(request.getHeader("x-b3-parentspanid")) .isEqualTo(HexCodec.toLowerHex(parent.context().spanId())); + + assertThat(Arrays.asList(takeSpan(), takeSpan())) + .extracting(Span::kind) + .containsOnly(null, Span.Kind.CLIENT); } @Test public void propagatesExtra_newTrace() throws Exception { @@ -86,6 +90,11 @@ public abstract class ITHttpClient extends ITHttp { assertThat(server.takeRequest().getHeader(EXTRA_KEY)) .isEqualTo("joey"); + + // we report one local and one client span + assertThat(Arrays.asList(takeSpan(), takeSpan())) + .extracting(Span::kind) + .containsOnly(null, Span.Kind.CLIENT); } @Test public void propagatesExtra_unsampledTrace() throws Exception { @@ -104,37 +113,6 @@ public abstract class ITHttpClient extends ITHttp { .isEqualTo("joey"); } - /** - * This tests that the parent is determined at the time the request was made, not when the request - * was executed. - */ - @Test public void usesParentFromInvocationTime() throws Exception { - Tracer tracer = httpTracing.tracing().tracer(); - server.enqueue(new MockResponse().setBodyDelay(1, TimeUnit.SECONDS)); - server.enqueue(new MockResponse()); - - brave.Span parent = tracer.newTrace().name("test").start(); - try (SpanInScope ws = tracer.withSpanInScope(parent)) { - getAsync(client, "/foo"); - getAsync(client, "/foo"); - } finally { - parent.finish(); - } - - brave.Span otherSpan = tracer.newTrace().name("test2").start(); - try (SpanInScope ws = tracer.withSpanInScope(otherSpan)) { - for (int i = 0; i < 2; i++) { - RecordedRequest request = server.takeRequest(); - assertThat(request.getHeader("x-b3-traceId")) - .isEqualTo(parent.context().traceIdString()); - assertThat(request.getHeader("x-b3-parentspanid")) - .isEqualTo(HexCodec.toLowerHex(parent.context().spanId())); - } - } finally { - otherSpan.finish(); - } - } - /** Unlike Brave 3, Brave 4 propagates trace ids even when unsampled */ @Test public void propagates_sampledFalse() throws Exception { close(); @@ -170,9 +148,9 @@ public abstract class ITHttpClient extends ITHttp { server.enqueue(new MockResponse()); get(client, "/foo"); - assertThat(spans) - .extracting(Span::kind) - .containsExactly(Span.Kind.CLIENT); + Span span = takeSpan(); + assertThat(span.kind()) + .isEqualTo(Span.Kind.CLIENT); } @Test @@ -180,9 +158,9 @@ public void reportsServerAddress() throws Exception { server.enqueue(new MockResponse()); get(client, "/foo"); - assertThat(spans) - .extracting(Span::remoteEndpoint) - .containsExactly(Endpoint.newBuilder() + Span span = takeSpan(); + assertThat(span.remoteEndpoint()) + .isEqualTo(Endpoint.newBuilder() .ip("127.0.0.1") .port(server.getPort()).build() ); @@ -192,9 +170,9 @@ public void reportsServerAddress() throws Exception { server.enqueue(new MockResponse()); get(client, "/foo"); - assertThat(spans) - .extracting(Span::name) - .containsExactly("get"); + Span span = takeSpan(); + assertThat(span.name()) + .isEqualTo("get"); } @Test public void supportsPortableCustomization() throws Exception { @@ -217,16 +195,16 @@ public void request(HttpAdapter adapter, Req req, server.enqueue(new MockResponse()); get(client, uri); - assertThat(spans) - .extracting(Span::name) - .containsExactly("get /foo"); + Span span = takeSpan(); + assertThat(span.name()) + .isEqualTo("get /foo"); - assertThat(spans) - .extracting(Span::remoteServiceName) - .containsExactly("remote-service"); + assertThat(span.remoteServiceName()) + .isEqualTo("remote-service"); - assertReportedTagsInclude("http.url", url(uri)); - assertReportedTagsInclude("context.visible", "true"); + assertThat(span.tags()) + .containsEntry("http.url", url(uri)) + .containsEntry("context.visible", "true"); } @Test public void addsStatusCodeWhenNotOk() throws Exception { @@ -238,8 +216,10 @@ public void request(HttpAdapter adapter, Req req, // some clients think 400 is an error } - assertReportedTagsInclude("http.status_code", "400"); - assertReportedTagsInclude("error", "400"); + Span span = takeSpan(); + assertThat(span.tags()) + .containsEntry("http.status_code", "400") + .containsEntry("error", "400"); } @Test public void redirect() throws Exception { @@ -249,7 +229,7 @@ public void request(HttpAdapter adapter, Req req, server.enqueue(new MockResponse().setResponseCode(404)); // hehe to a bad location! brave.Span parent = tracer.newTrace().name("test").start(); - try (Tracer.SpanInScope ws = tracer.withSpanInScope(parent)) { + try (SpanInScope ws = tracer.withSpanInScope(parent)) { get(client, "/foo"); } catch (RuntimeException e) { // some think 404 is an exception @@ -257,7 +237,12 @@ public void request(HttpAdapter adapter, Req req, parent.finish(); } - assertReportedTagsInclude("http.path", "/foo", "/bar"); + Span client1 = takeSpan(); + Span client2 = takeSpan(); + assertThat(Arrays.asList(client1.tags().get("http.path"), client2.tags().get("http.path"))) + .contains("/foo", "/bar"); + + assertThat(takeSpan().kind()).isNull(); // local } @Test public void post() throws Exception { @@ -270,12 +255,16 @@ public void request(HttpAdapter adapter, Req req, assertThat(server.takeRequest().getBody().readUtf8()) .isEqualTo(body); - assertThat(spans) - .extracting(Span::name) - .containsExactly("post"); + Span span = takeSpan(); + assertThat(span.name()) + .isEqualTo("post"); } @Test public void reportsSpanOnTransportException() throws Exception { + checkReportsSpanOnTransportException(); + } + + Span checkReportsSpanOnTransportException() throws InterruptedException { server.enqueue(new MockResponse().setSocketPolicy(SocketPolicy.DISCONNECT_AT_START)); try { @@ -284,15 +273,13 @@ public void request(HttpAdapter adapter, Req req, // ok, but the span should include an error! } - assertThat(spans).hasSize(1); + return takeSpan(); } @Test public void addsErrorTagOnTransportException() throws Exception { - reportsSpanOnTransportException(); - - assertThat(spans) - .flatExtracting(s -> s.tags().keySet()) - .contains("error"); + Span span = checkReportsSpanOnTransportException(); + assertThat(span.tags()) + .containsKey("error"); } @Test public void httpPathTagExcludesQueryParams() throws Exception { @@ -301,7 +288,9 @@ public void request(HttpAdapter adapter, Req req, server.enqueue(new MockResponse()); get(client, path); - assertReportedTagsInclude("http.path", "/foo"); + Span span = takeSpan(); + assertThat(span.tags()) + .containsEntry("http.path", "/foo"); } protected String url(String pathIncludingQuery) { diff --git a/instrumentation/http-tests/src/main/java/brave/http/ITHttpServer.java b/instrumentation/http-tests/src/main/java/brave/http/ITHttpServer.java index 0b9e2c2a2e..31e5048313 100644 --- a/instrumentation/http-tests/src/main/java/brave/http/ITHttpServer.java +++ b/instrumentation/http-tests/src/main/java/brave/http/ITHttpServer.java @@ -4,7 +4,6 @@ import brave.propagation.ExtraFieldPropagation; import brave.sampler.Sampler; import java.io.IOException; -import java.util.Map; import okhttp3.OkHttpClient; import okhttp3.Request; import okhttp3.Response; @@ -22,8 +21,7 @@ public abstract class ITHttpServer extends ITHttp { OkHttpClient client = new OkHttpClient(); - @Before - public void setup() throws Exception { + @Before public void setup() throws Exception { httpTracing = HttpTracing.create(tracingBuilder(Sampler.ALWAYS_SAMPLE).build()); init(); } @@ -48,16 +46,17 @@ public void usesExistingTraceId() throws Exception { .header("X-B3-Sampled", "1") .build()); - assertThat(spans).allSatisfy(s -> { - assertThat(s.traceId()).isEqualTo(traceId); - assertThat(s.parentId()).isEqualTo(parentId); - assertThat(s.id()).isEqualTo(spanId); - }); + Span span = takeSpan(); + assertThat(span.traceId()).isEqualTo(traceId); + assertThat(span.parentId()).isEqualTo(parentId); + assertThat(span.id()).isEqualTo(spanId); } @Test public void readsExtra_newTrace() throws Exception { readsExtra(new Request.Builder()); + + takeSpan(); } @Test @@ -65,7 +64,7 @@ public void readsExtra_unsampled() throws Exception { readsExtra(new Request.Builder() .header("X-B3-Sampled", "0")); - assertThat(spans).isEmpty(); + // @After will check that nothing is reported } @Test @@ -76,17 +75,16 @@ public void readsExtra_existingTrace() throws Exception { .header("X-B3-TraceId", traceId) .header("X-B3-SpanId", traceId)); - assertThat(spans).allSatisfy(s -> { - assertThat(s.traceId()).isEqualTo(traceId); - assertThat(s.id()).isEqualTo(traceId); - }); + Span span = takeSpan(); + assertThat(span.traceId()).isEqualTo(traceId); + assertThat(span.id()).isEqualTo(traceId); } /** * The /extra endpoint should copy the key {@link #EXTRA_KEY} to the response body using * {@link ExtraFieldPropagation#get(String)}. */ - void readsExtra(Request.Builder builder) throws IOException { + void readsExtra(Request.Builder builder) throws Exception { Request request = builder.url(url("/extra")) // this is the pre-configured key we can pass through .header(EXTRA_KEY, "joey").build(); @@ -105,8 +103,7 @@ public void samplingDisabled() throws Exception { get("/foo"); - assertThat(spans) - .isEmpty(); + // @After will check that nothing is reported } @Test public void customSampler() throws Exception { @@ -122,8 +119,7 @@ public void samplingDisabled() throws Exception { assertThat(response.isSuccessful()).isTrue(); } - assertThat(spans) - .isEmpty(); + // @After will check that nothing is reported } /** @@ -133,7 +129,7 @@ public void samplingDisabled() throws Exception { public void async() throws Exception { get("/async"); - assertThat(spans).hasSize(1); + takeSpan(); } /** @@ -146,10 +142,8 @@ public void async() throws Exception { public void createsChildSpan() throws Exception { get("/child"); - assertThat(spans).hasSize(2); - - Span child = spans.pop(); - Span parent = spans.pop(); + Span child = takeSpan(); + Span parent = takeSpan(); assertThat(parent.traceId()).isEqualTo(child.traceId()); assertThat(parent.id()).isEqualTo(child.parentId()); @@ -161,9 +155,9 @@ public void createsChildSpan() throws Exception { public void reportsClientAddress() throws Exception { get("/foo"); - assertThat(spans) - .flatExtracting(Span::remoteEndpoint) - .doesNotContainNull(); + Span span = takeSpan(); + assertThat(span.remoteEndpoint()) + .isNotNull(); } @Test @@ -172,8 +166,8 @@ public void reportsClientAddress_XForwardedFor() throws Exception { .header("X-Forwarded-For", "1.2.3.4") .build()); - assertThat(spans) - .extracting(Span::remoteEndpoint) + Span span = takeSpan(); + assertThat(span.remoteEndpoint()) .extracting(Endpoint::ipv4) .contains("1.2.3.4"); } @@ -182,18 +176,18 @@ public void reportsClientAddress_XForwardedFor() throws Exception { public void reportsServerKindToZipkin() throws Exception { get("/foo"); - assertThat(spans) - .extracting(Span::kind) - .containsExactly(Span.Kind.SERVER); + Span span = takeSpan(); + assertThat(span.kind()) + .isEqualTo(Span.Kind.SERVER); } @Test public void defaultSpanNameIsMethodName() throws Exception { get("/foo"); - assertThat(spans) - .extracting(Span::name) - .containsExactly("get"); + Span span = takeSpan(); + assertThat(span.name()) + .isEqualTo("get"); } @Test @@ -211,12 +205,12 @@ public void request(HttpAdapter adapter, Req req, SpanCustomizer c String uri = "/foo?z=2&yAA=1"; get(uri); - assertThat(spans) - .extracting(Span::name) - .containsExactly("get /foo"); - - assertReportedTagsInclude("http.url", url(uri)); - assertReportedTagsInclude("context.visible", "true"); + Span span = takeSpan(); + assertThat(span.name()) + .isEqualTo("get /foo"); + assertThat(span.tags()) + .containsEntry("http.url", url(uri)) + .containsEntry("context.visible", "true"); } @Test @@ -227,8 +221,10 @@ public void addsStatusCode_badRequest() throws Exception { // some servers think 400 is an error } - assertReportedTagsInclude("http.status_code", "400"); - assertReportedTagsInclude("error", "400"); + Span span = takeSpan(); + assertThat(span.tags()) + .containsEntry("http.status_code", "400") + .containsEntry("error", "400"); } @Test @@ -241,10 +237,10 @@ public void reportsSpanOnException_async() throws Exception { reportsSpanOnException("/exceptionAsync"); } - private void reportsSpanOnException(String path) throws IOException { + Span reportsSpanOnException(String path) throws Exception { get(path); - assertThat(spans).hasSize(1); + return takeSpan(); } @Test @@ -261,7 +257,9 @@ public void addsErrorTagOnException_async() throws Exception { public void httpPathTagExcludesQueryParams() throws Exception { get("/foo?z=2&yAA=1"); - assertReportedTagsInclude("http.path", "/foo"); + Span span = takeSpan(); + assertThat(span.tags()) + .containsEntry("http.path", "/foo"); } protected Response get(String path) throws IOException { @@ -286,16 +284,9 @@ protected Response get(Request request) throws IOException { } } - private void addsErrorTagOnException(String path) throws IOException { - reportsSpanOnException(path); - try { - Thread.sleep(1000L); - } catch (InterruptedException e) { - e.printStackTrace(); - } - assertThat(spans) - .flatExtracting(s -> s.tags().entrySet()) - .extracting(Map.Entry::getKey) - .contains("error"); + private void addsErrorTagOnException(String path) throws Exception { + Span span = reportsSpanOnException(path); + assertThat(span.tags()) + .containsKey("error"); } } diff --git a/instrumentation/http-tests/src/main/java/brave/http/ITServlet25Container.java b/instrumentation/http-tests/src/main/java/brave/http/ITServlet25Container.java index 8605e31a29..ea3ae81bff 100644 --- a/instrumentation/http-tests/src/main/java/brave/http/ITServlet25Container.java +++ b/instrumentation/http-tests/src/main/java/brave/http/ITServlet25Container.java @@ -88,6 +88,8 @@ public void doFilter(ServletRequest request, ServletResponse response, FilterCha assertThat(response.header(EXTRA_KEY)) .isEqualTo("abcdefg"); } + + takeSpan(); } @Override diff --git a/instrumentation/http-tests/src/main/java/brave/http/ITServlet3Container.java b/instrumentation/http-tests/src/main/java/brave/http/ITServlet3Container.java index bbcda25d74..ef438a5c94 100644 --- a/instrumentation/http-tests/src/main/java/brave/http/ITServlet3Container.java +++ b/instrumentation/http-tests/src/main/java/brave/http/ITServlet3Container.java @@ -13,8 +13,6 @@ import org.junit.AfterClass; import org.junit.Test; -import static org.assertj.core.api.Assertions.assertThat; - public abstract class ITServlet3Container extends ITServlet25Container { static ExecutorService executor = Executors.newCachedThreadPool(); @@ -32,13 +30,13 @@ static class AsyncServlet extends HttpServlet { @Test public void forward() throws Exception { get("/forward"); - assertThat(spans).hasSize(1); + takeSpan(); } @Test public void forwardAsync() throws Exception { get("/forwardAsync"); - assertThat(spans).hasSize(1); + takeSpan(); } static class ForwardServlet extends HttpServlet { diff --git a/instrumentation/http/src/main/java/brave/http/HttpServerHandler.java b/instrumentation/http/src/main/java/brave/http/HttpServerHandler.java index 7526974939..8f9f711164 100644 --- a/instrumentation/http/src/main/java/brave/http/HttpServerHandler.java +++ b/instrumentation/http/src/main/java/brave/http/HttpServerHandler.java @@ -4,7 +4,6 @@ import brave.SpanCustomizer; import brave.Tracer; import brave.internal.Nullable; -import brave.internal.Platform; import brave.propagation.TraceContext; import brave.propagation.TraceContextOrSamplingFlags; import zipkin2.Endpoint; @@ -34,7 +33,12 @@ public final class HttpServerHandler { public static HttpServerHandler create(HttpTracing httpTracing, HttpServerAdapter adapter) { - return new HttpServerHandler<>(httpTracing, adapter); + return new HttpServerHandler<>( + httpTracing.tracing().tracer(), + httpTracing.serverSampler(), + httpTracing.serverParser(), + adapter + ); } final Tracer tracer; @@ -42,10 +46,15 @@ public static HttpServerHandler create(HttpTracing httpTracing, final HttpServerParser parser; final HttpServerAdapter adapter; - HttpServerHandler(HttpTracing httpTracing, HttpServerAdapter adapter) { - this.tracer = httpTracing.tracing().tracer(); - this.sampler = httpTracing.serverSampler(); - this.parser = httpTracing.serverParser(); + HttpServerHandler( + Tracer tracer, + HttpSampler sampler, + HttpServerParser parser, + HttpServerAdapter adapter + ) { + this.tracer = tracer; + this.sampler = sampler; + this.parser = parser; this.adapter = adapter; } @@ -63,6 +72,8 @@ public Span handleReceive(TraceContext.Extractor extractor, Req request) { * Like {@link #handleReceive(TraceContext.Extractor, Object)}, except for when the carrier of * trace data is not the same as the request. * + *

Request data is parsed before the span is started. + * * @see HttpServerParser#request(HttpAdapter, Object, SpanCustomizer) */ public Span handleReceive(TraceContext.Extractor extractor, C carrier, Req request) { @@ -71,7 +82,13 @@ public Span handleReceive(TraceContext.Extractor extractor, C carrier, Re // all of the parsing here occur before a timestamp is recorded on the span span.kind(Span.Kind.SERVER); + parseRequest(request, span); + + return span.start(); + } + void parseRequest(Req request, Span span) { + if (span.isNoop()) return; // Ensure user-code can read the current trace context Tracer.SpanInScope ws = tracer.withSpanInScope(span); try { @@ -84,7 +101,6 @@ public Span handleReceive(TraceContext.Extractor extractor, C carrier, Re if (adapter.parseClientAddress(request, remoteEndpoint)) { span.remoteEndpoint(remoteEndpoint.build()); } - return span.start(); } /** Creates a potentially noop span representing this request */ diff --git a/instrumentation/httpasyncclient/src/test/java/brave/httpasyncclient/ITTracingHttpAsyncClientBuilder.java b/instrumentation/httpasyncclient/src/test/java/brave/httpasyncclient/ITTracingHttpAsyncClientBuilder.java index 6df6eebd8f..7308d565d1 100644 --- a/instrumentation/httpasyncclient/src/test/java/brave/httpasyncclient/ITTracingHttpAsyncClientBuilder.java +++ b/instrumentation/httpasyncclient/src/test/java/brave/httpasyncclient/ITTracingHttpAsyncClientBuilder.java @@ -1,6 +1,6 @@ package brave.httpasyncclient; -import brave.http.ITHttpClient; +import brave.http.ITHttpAsyncClient; import java.io.IOException; import java.net.URI; import okhttp3.mockwebserver.MockResponse; @@ -15,7 +15,7 @@ import static org.assertj.core.api.Assertions.assertThat; -public class ITTracingHttpAsyncClientBuilder extends ITHttpClient { +public class ITTracingHttpAsyncClientBuilder extends ITHttpAsyncClient { @Override protected CloseableHttpAsyncClient newClient(int port) { CloseableHttpAsyncClient result = TracingHttpAsyncClientBuilder.create(httpTracing).build(); @@ -60,5 +60,7 @@ protected void post(CloseableHttpAsyncClient client, String pathIncludingQuery, RecordedRequest request = server.takeRequest(); assertThat(request.getHeader("x-b3-traceId")) .isEqualTo(request.getHeader("my-id")); + + takeSpan(); } } diff --git a/instrumentation/httpclient/src/test/java/brave/httpclient/ITTracingHttpClientBuilder.java b/instrumentation/httpclient/src/test/java/brave/httpclient/ITTracingHttpClientBuilder.java index 53bb819a3d..62a0ab0e5d 100644 --- a/instrumentation/httpclient/src/test/java/brave/httpclient/ITTracingHttpClientBuilder.java +++ b/instrumentation/httpclient/src/test/java/brave/httpclient/ITTracingHttpClientBuilder.java @@ -10,7 +10,6 @@ import org.apache.http.client.methods.HttpPost; import org.apache.http.entity.StringEntity; import org.apache.http.impl.client.CloseableHttpClient; -import org.junit.AssumptionViolatedException; import org.junit.Test; import static org.apache.http.util.EntityUtils.consume; @@ -38,10 +37,6 @@ public class ITTracingHttpClientBuilder extends ITHttpClient { +public class ITTracingFeature_Client extends ITHttpAsyncClient { ExecutorService executor = Executors.newSingleThreadExecutor(); @@ -27,17 +27,16 @@ public class ITTracingFeature_Client extends ITHttpClient { .build(); } - @Override protected void closeClient(Client client) throws IOException { + @Override protected void closeClient(Client client) { client.close(); executor.shutdown(); } - @Override protected void get(Client client, String pathIncludingQuery) throws IOException { + @Override protected void get(Client client, String pathIncludingQuery) { client.target(url(pathIncludingQuery)).request().buildGet().invoke().close(); } - @Override protected void post(Client client, String pathIncludingQuery, String body) - throws Exception { + @Override protected void post(Client client, String pathIncludingQuery, String body) { client.target(url(pathIncludingQuery)).request() .buildPost(Entity.text(body)) .invoke().close(); @@ -64,25 +63,23 @@ public class ITTracingFeature_Client extends ITHttpClient { RecordedRequest request = server.takeRequest(); assertThat(request.getHeader("x-b3-traceId")) .isEqualTo(request.getHeader("my-id")); + + takeSpan(); } - @Override @Test(expected = AssertionError.class) - public void redirect() throws Exception { // blind to the implementation of redirects - super.redirect(); + @Override @Ignore("blind to the implementation of redirects") + public void redirect() { } - @Override @Test(expected = AssertionError.class) - public void reportsServerAddress() throws Exception { // doesn't know the remote address - super.reportsServerAddress(); + @Override @Ignore("doesn't know the remote address") + public void reportsServerAddress() { } - @Override @Test(expected = AssertionError.class) // doesn't yet close a span on exception - public void reportsSpanOnTransportException() throws Exception { - super.reportsSpanOnTransportException(); + @Override @Ignore("doesn't yet close a span on exception") + public void reportsSpanOnTransportException() { } - @Override @Test(expected = AssertionError.class) // doesn't yet close a span on exception - public void addsErrorTagOnTransportException() throws Exception { - super.addsErrorTagOnTransportException(); + @Override @Ignore("doesn't yet close a span on exception") + public void addsErrorTagOnTransportException() { } } diff --git a/instrumentation/jaxrs2/src/test/java/brave/jaxrs2/ITTracingFeature_Container.java b/instrumentation/jaxrs2/src/test/java/brave/jaxrs2/ITTracingFeature_Container.java index 893d2c3ae5..95bf9a3a6d 100644 --- a/instrumentation/jaxrs2/src/test/java/brave/jaxrs2/ITTracingFeature_Container.java +++ b/instrumentation/jaxrs2/src/test/java/brave/jaxrs2/ITTracingFeature_Container.java @@ -93,9 +93,9 @@ public String spanName(HttpAdapter adapter, Req req) { get("/foo"); - assertThat(spans) - .extracting(Span::name) - .containsExactly("foo"); + Span span = takeSpan(); + assertThat(span.name()) + .isEqualTo("foo"); } @Override public void init(ServletContextHandler handler) { diff --git a/instrumentation/jaxrs2/src/test/java/brave/jaxrs2/features/sampling/ITDeclarativeSampling.java b/instrumentation/jaxrs2/src/test/java/brave/jaxrs2/features/sampling/ITDeclarativeSampling.java index 18b9cf9124..0a6b3c09ee 100644 --- a/instrumentation/jaxrs2/src/test/java/brave/jaxrs2/features/sampling/ITDeclarativeSampling.java +++ b/instrumentation/jaxrs2/src/test/java/brave/jaxrs2/features/sampling/ITDeclarativeSampling.java @@ -7,7 +7,8 @@ import brave.http.ServletContainer; import brave.jaxrs2.TracingBootstrap; import java.io.IOException; -import java.util.concurrent.ConcurrentLinkedDeque; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; import javax.ws.rs.GET; import javax.ws.rs.Path; import okhttp3.OkHttpClient; @@ -24,7 +25,8 @@ import static org.assertj.core.api.Assertions.assertThat; public class ITDeclarativeSampling extends ServletContainer { - ConcurrentLinkedDeque spans = new ConcurrentLinkedDeque<>(); + /** Spans are reported on a server thread, but we read them on the main thread */ + BlockingQueue spans = new LinkedBlockingQueue<>(); OkHttpClient client = new OkHttpClient(); HttpTracing httpTracing = HttpTracing.newBuilder(Tracing.newBuilder() .spanReporter(spans::add) @@ -69,13 +71,13 @@ public void annotationOverridesHttpSampler() throws Exception { assertThat(spans).isEmpty(); get("/foo"); - assertThat(spans).isNotEmpty(); + spans.take(); } @Test public void lackOfAnnotationFallsback() throws Exception { get("/baz"); - assertThat(spans).isNotEmpty(); + spans.take(); } @Override public void init(ServletContextHandler handler) { diff --git a/instrumentation/kafka-clients/src/test/java/brave/kafka/clients/ITKafkaTracing.java b/instrumentation/kafka-clients/src/test/java/brave/kafka/clients/ITKafkaTracing.java index 12725c93e5..a3d4109d5c 100644 --- a/instrumentation/kafka-clients/src/test/java/brave/kafka/clients/ITKafkaTracing.java +++ b/instrumentation/kafka-clients/src/test/java/brave/kafka/clients/ITKafkaTracing.java @@ -13,7 +13,9 @@ import java.util.ArrayList; import java.util.Collections; import java.util.List; -import java.util.concurrent.ConcurrentLinkedDeque; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; @@ -37,8 +39,12 @@ public class ITKafkaTracing { String TEST_KEY = "foo"; String TEST_VALUE = "bar"; - ConcurrentLinkedDeque consumerSpans = new ConcurrentLinkedDeque<>(); - ConcurrentLinkedDeque producerSpans = new ConcurrentLinkedDeque<>(); + /** + * See brave.http.ITHttp for rationale on using a concurrent blocking queue eventhough some calls, + * like consumer operations, happen on the main thread. + */ + BlockingQueue consumerSpans = new LinkedBlockingQueue<>(); + BlockingQueue producerSpans = new LinkedBlockingQueue<>(); KafkaTracing consumerTracing = KafkaTracing.create(Tracing.newBuilder() .spanReporter(consumerSpans::add) @@ -55,8 +61,18 @@ public class ITKafkaTracing { Producer producer; Consumer consumer; + // See brave.http.ITHttp for rationale on polling after tests complete + @After public void checkConsumedAllSpans() throws Exception { + assertThat(producerSpans.poll(100, TimeUnit.MILLISECONDS)) + .withFailMessage("Producer spans remaining in queue. Check for redundant reporting") + .isNull(); + assertThat(consumerSpans.poll(100, TimeUnit.MILLISECONDS)) + .withFailMessage("Consumer spans remaining in queue. Check for redundant reporting") + .isNull(); + } + @After - public void close() throws Exception { + public void close() { if (producer != null) producer.close(); if (consumer != null) consumer.close(); Tracing current = Tracing.current(); @@ -73,22 +89,20 @@ public void joinSpan_deprecated_because_it_writes_to_old_span() throws Exception ConsumerRecords records = consumer.poll(10000); assertThat(records).hasSize(1); - assertThat(producerSpans).hasSize(1); - assertThat(consumerSpans).hasSize(1); + Span producerSpan = producerSpans.take(); + Span consumerSpan = consumerSpans.take(); - assertThat(consumerSpans.getFirst().traceId()) - .isEqualTo(producerSpans.getFirst().traceId()); + assertThat(consumerSpan.traceId()) + .isEqualTo(producerSpan.traceId()); for (ConsumerRecord record : records) { brave.Span joined = consumerTracing.joinSpan(record); - joined.annotate("foo"); - joined.flush(); + joined.abandon(); // Re-using this span happens "after" it is completed, which will make the UI look strange // Instead, use nextSpan to create a span representing message processing. - assertThat(consumerSpans) - .filteredOn(s -> s.id().equals(HexCodec.toLowerHex(joined.context().spanId()))) - .hasSize(2); + assertThat(consumerSpan.id()) + .isEqualTo(HexCodec.toLowerHex(joined.context().spanId())); } } @@ -106,22 +120,22 @@ public void poll_creates_one_consumer_span_per_extracted_context() throws Except ConsumerRecords records = consumer.poll(10000); assertThat(records).hasSize(2); - assertThat(producerSpans).hasSize(2); - assertThat(consumerSpans).hasSize(2); + Span producerSpan1 = producerSpans.take(), producerSpan2 = producerSpans.take(); + Span consumerSpan1 = consumerSpans.take(), consumerSpan2 = consumerSpans.take(); // Check to see the trace is continued between the producer and the consumer // we don't know the order the spans will come in. Correlate with the tag instead. - String firstTopic = producerSpans.getFirst().tags().get(KAFKA_TOPIC_TAG); - if (firstTopic.equals(consumerSpans.getFirst().tags().get(KAFKA_TOPIC_TAG))) { - assertThat(producerSpans.getFirst().traceId()) - .isEqualTo(consumerSpans.getFirst().traceId()); - assertThat(producerSpans.getLast().traceId()) - .isEqualTo(consumerSpans.getLast().traceId()); + String firstTopic = producerSpan1.tags().get(KAFKA_TOPIC_TAG); + if (firstTopic.equals(consumerSpan1.tags().get(KAFKA_TOPIC_TAG))) { + assertThat(producerSpan1.traceId()) + .isEqualTo(consumerSpan1.traceId()); + assertThat(producerSpan2.traceId()) + .isEqualTo(consumerSpan2.traceId()); } else { - assertThat(producerSpans.getFirst().traceId()) - .isEqualTo(consumerSpans.getLast().traceId()); - assertThat(producerSpans.getLast().traceId()) - .isEqualTo(consumerSpans.getFirst().traceId()); + assertThat(producerSpan1.traceId()) + .isEqualTo(consumerSpan2.traceId()); + assertThat(producerSpan2.traceId()) + .isEqualTo(consumerSpan1.traceId()); } } @@ -141,8 +155,9 @@ public void poll_creates_one_consumer_span_per_topic() throws Exception { ConsumerRecords records = consumer.poll(10000); assertThat(records).hasSize(10); - assertThat(producerSpans).isEmpty(); // not traced - assertThat(consumerSpans).hasSize(2); // one per topic! + consumerSpans.take(); + consumerSpans.take(); + // producerSpans empty as not traced } @Test @@ -155,14 +170,14 @@ public void nextSpan_makes_child() throws Exception { ConsumerRecords records = consumer.poll(10000); assertThat(records).hasSize(1); - assertThat(producerSpans).hasSize(1); - assertThat(consumerSpans).hasSize(1); + Span producerSpan = producerSpans.take(); + Span consumerSpan = consumerSpans.take(); for (ConsumerRecord record : records) { brave.Span processor = consumerTracing.nextSpan(record); - Span consumerSpan = consumerSpans.stream() - .filter(s -> s.tags().get(KAFKA_TOPIC_TAG).equals(record.topic())).findAny().get(); + assertThat(consumerSpan.tags()) + .containsEntry(KAFKA_TOPIC_TAG, record.topic()); assertThat(processor.context().traceIdString()).isEqualTo(consumerSpan.traceId()); assertThat(HexCodec.toLowerHex(processor.context().parentId())).isEqualTo(consumerSpan.id()); @@ -170,9 +185,9 @@ public void nextSpan_makes_child() throws Exception { processor.start().name("processor").finish(); // The processor doesn't taint the consumer span which has already finished - assertThat(consumerSpans) - .extracting(Span::id) - .containsOnly(HexCodec.toLowerHex(processor.context().spanId()), consumerSpan.id()); + Span processorSpan = consumerSpans.take(); + assertThat(processorSpan.id()) + .isNotEqualTo(consumerSpan.id()); } } @@ -231,16 +246,15 @@ public void continues_a_trace_when_only_trace_id_propagated() throws Exception { ConsumerRecords records = consumer.poll(10000); assertThat(records).hasSize(1); - assertThat(producerSpans).hasSize(1); - assertThat(consumerSpans).hasSize(1); + Span producerSpan = producerSpans.take(); + Span consumerSpan = consumerSpans.take(); - assertThat(consumerSpans.getFirst().traceId()) - .isEqualTo(producerSpans.getFirst().traceId()); + assertThat(producerSpan.traceId()) + .isEqualTo(consumerSpan.traceId()); for (ConsumerRecord record : records) { TraceContext forProcessor = consumerTracing.nextSpan(record).context(); - Span consumerSpan = consumerSpans.getLast(); assertThat(forProcessor.traceIdString()).isEqualTo(consumerSpan.traceId()); } } diff --git a/instrumentation/mysql/src/test/java/brave/mysql/ITTracingStatementInterceptor.java b/instrumentation/mysql/src/test/java/brave/mysql/ITTracingStatementInterceptor.java index 1a13002259..52756073b5 100644 --- a/instrumentation/mysql/src/test/java/brave/mysql/ITTracingStatementInterceptor.java +++ b/instrumentation/mysql/src/test/java/brave/mysql/ITTracingStatementInterceptor.java @@ -9,7 +9,7 @@ import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.SQLException; -import java.util.concurrent.ConcurrentLinkedDeque; +import java.util.ArrayList; import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -22,7 +22,8 @@ public class ITTracingStatementInterceptor { static final String QUERY = "select 'hello world'"; - ConcurrentLinkedDeque spans = new ConcurrentLinkedDeque<>(); + /** JDBC is synchronous and we aren't using thread pools: everything happens on the main thread */ + ArrayList spans = new ArrayList<>(); Tracing tracing = tracingBuilder(Sampler.ALWAYS_SAMPLE).build(); Connection connection; diff --git a/instrumentation/mysql6/src/test/java/brave/mysql6/ITTracingStatementInterceptor.java b/instrumentation/mysql6/src/test/java/brave/mysql6/ITTracingStatementInterceptor.java index f88e58aee5..b5eb69b5ee 100644 --- a/instrumentation/mysql6/src/test/java/brave/mysql6/ITTracingStatementInterceptor.java +++ b/instrumentation/mysql6/src/test/java/brave/mysql6/ITTracingStatementInterceptor.java @@ -9,7 +9,7 @@ import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.SQLException; -import java.util.concurrent.ConcurrentLinkedDeque; +import java.util.ArrayList; import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -23,7 +23,8 @@ public class ITTracingStatementInterceptor { static final String QUERY = "select 'hello world'"; - ConcurrentLinkedDeque spans = new ConcurrentLinkedDeque<>(); + /** JDBC is synchronous and we aren't using thread pools: everything happens on the main thread */ + ArrayList spans = new ArrayList<>(); Tracing tracing = tracingBuilder(Sampler.ALWAYS_SAMPLE).build(); Connection connection; diff --git a/instrumentation/netty-codec-http/src/main/java/brave/netty/http/NettyHttpTracing.java b/instrumentation/netty-codec-http/src/main/java/brave/netty/http/NettyHttpTracing.java index bb26565811..51ae3e4e62 100644 --- a/instrumentation/netty-codec-http/src/main/java/brave/netty/http/NettyHttpTracing.java +++ b/instrumentation/netty-codec-http/src/main/java/brave/netty/http/NettyHttpTracing.java @@ -1,6 +1,7 @@ package brave.netty.http; import brave.Span; +import brave.Tracer.SpanInScope; import brave.Tracing; import brave.http.HttpTracing; import io.netty.channel.ChannelDuplexHandler; @@ -8,6 +9,8 @@ public final class NettyHttpTracing { static final AttributeKey SPAN_ATTRIBUTE = AttributeKey.valueOf(Span.class.getName()); + static final AttributeKey SPAN_IN_SCOPE_ATTRIBUTE = + AttributeKey.valueOf(SpanInScope.class.getName()); public static NettyHttpTracing create(Tracing tracing) { return new NettyHttpTracing(HttpTracing.create(tracing)); diff --git a/instrumentation/netty-codec-http/src/main/java/brave/netty/http/TracingHttpServerHandler.java b/instrumentation/netty-codec-http/src/main/java/brave/netty/http/TracingHttpServerHandler.java index de09920e79..24231aaa7a 100644 --- a/instrumentation/netty-codec-http/src/main/java/brave/netty/http/TracingHttpServerHandler.java +++ b/instrumentation/netty-codec-http/src/main/java/brave/netty/http/TracingHttpServerHandler.java @@ -55,15 +55,19 @@ final class TracingHttpServerHandler extends ChannelDuplexHandler { Span span = nextSpan(extractor.extract(request.headers()), request).kind(Span.Kind.SERVER); ctx.channel().attr(NettyHttpTracing.SPAN_ATTRIBUTE).set(span); + SpanInScope spanInScope = tracer.withSpanInScope(span); + ctx.channel().attr(NettyHttpTracing.SPAN_IN_SCOPE_ATTRIBUTE).set(spanInScope); // Place the span in scope so that downstream code can read trace IDs - try (SpanInScope ws = tracer.withSpanInScope(span)) { + try { if (!span.isNoop()) { parser.request(adapter, request, span); maybeParseClientAddress(ctx.channel(), request, span); span.start(); } ctx.fireChannelRead(msg); + } finally { + spanInScope.close(); } } @@ -99,11 +103,16 @@ Span nextSpan(TraceContextOrSamplingFlags extracted, HttpRequest request) { return; } - // Place the span in scope so that downstream code can read trace IDs HttpResponse response = (HttpResponse) msg; - try (SpanInScope ws = tracer.withSpanInScope(span)) { + + // Guard re-scoping the same span + SpanInScope spanInScope = ctx.channel().attr(NettyHttpTracing.SPAN_IN_SCOPE_ATTRIBUTE).get(); + if (spanInScope == null) spanInScope = tracer.withSpanInScope(span); + try { ctx.write(msg, prm); parser.response(adapter, response, null, span); + } finally { + spanInScope.close(); // clear scope before reporting span.finish(); } } diff --git a/instrumentation/okhttp3/src/test/java/brave/okhttp3/ITTracingCallFactory.java b/instrumentation/okhttp3/src/test/java/brave/okhttp3/ITTracingCallFactory.java index 782f980cfb..7d4b622681 100644 --- a/instrumentation/okhttp3/src/test/java/brave/okhttp3/ITTracingCallFactory.java +++ b/instrumentation/okhttp3/src/test/java/brave/okhttp3/ITTracingCallFactory.java @@ -1,8 +1,9 @@ package brave.okhttp3; import brave.Tracer; -import brave.http.ITHttpClient; +import brave.http.ITHttpAsyncClient; import java.io.IOException; +import java.util.Arrays; import java.util.concurrent.TimeUnit; import okhttp3.Call; import okhttp3.Callback; @@ -14,10 +15,11 @@ import okhttp3.mockwebserver.MockResponse; import okhttp3.mockwebserver.RecordedRequest; import org.junit.Test; +import zipkin2.Span; import static org.assertj.core.api.Assertions.assertThat; -public class ITTracingCallFactory extends ITHttpClient { +public class ITTracingCallFactory extends ITHttpAsyncClient { @Override protected Call.Factory newClient(int port) { return TracingCallFactory.create(httpTracing, new OkHttpClient.Builder() @@ -28,7 +30,7 @@ public class ITTracingCallFactory extends ITHttpClient { ); } - @Override protected void closeClient(Call.Factory client) throws IOException { + @Override protected void closeClient(Call.Factory client) { ((TracingCallFactory) client).ok.dispatcher().executorService().shutdownNow(); } @@ -45,15 +47,14 @@ public class ITTracingCallFactory extends ITHttpClient { .execute(); } - @Override protected void getAsync(Call.Factory client, String pathIncludingQuery) - throws Exception { + @Override protected void getAsync(Call.Factory client, String pathIncludingQuery) { client.newCall(new Request.Builder().url(url(pathIncludingQuery)).build()) .enqueue(new Callback() { @Override public void onFailure(Call call, IOException e) { e.printStackTrace(); } - @Override public void onResponse(Call call, Response response) throws IOException { + @Override public void onResponse(Call call, Response response) { } }); } @@ -79,5 +80,10 @@ public class ITTracingCallFactory extends ITHttpClient { RecordedRequest request = server.takeRequest(); assertThat(request.getHeader("x-b3-traceId")) .isEqualTo(request.getHeader("my-id")); + + // we report one local and one client span + assertThat(Arrays.asList(takeSpan(), takeSpan())) + .extracting(Span::kind) + .containsOnly(null, Span.Kind.CLIENT); } } diff --git a/instrumentation/okhttp3/src/test/java/brave/okhttp3/ITTracingInterceptor.java b/instrumentation/okhttp3/src/test/java/brave/okhttp3/ITTracingInterceptor.java index b2271216f1..f75946e1d4 100644 --- a/instrumentation/okhttp3/src/test/java/brave/okhttp3/ITTracingInterceptor.java +++ b/instrumentation/okhttp3/src/test/java/brave/okhttp3/ITTracingInterceptor.java @@ -1,6 +1,6 @@ package brave.okhttp3; -import brave.http.ITHttpClient; +import brave.http.ITHttpAsyncClient; import java.io.IOException; import java.util.concurrent.TimeUnit; import okhttp3.Call; @@ -12,7 +12,7 @@ import okhttp3.RequestBody; import okhttp3.Response; -public class ITTracingInterceptor extends ITHttpClient { +public class ITTracingInterceptor extends ITHttpAsyncClient { @Override protected Call.Factory newClient(int port) { return new OkHttpClient.Builder() @@ -44,8 +44,7 @@ public class ITTracingInterceptor extends ITHttpClient { .execute(); } - @Override protected void getAsync(Call.Factory client, String pathIncludingQuery) - throws Exception { + @Override protected void getAsync(Call.Factory client, String pathIncludingQuery) { client.newCall(new Request.Builder().url(url(pathIncludingQuery)).build()) .enqueue(new Callback() { @Override public void onFailure(Call call, IOException e) { diff --git a/instrumentation/p6spy/src/test/java/brave/p6spy/ITTracingP6Factory.java b/instrumentation/p6spy/src/test/java/brave/p6spy/ITTracingP6Factory.java index f07438a833..94fd369b88 100644 --- a/instrumentation/p6spy/src/test/java/brave/p6spy/ITTracingP6Factory.java +++ b/instrumentation/p6spy/src/test/java/brave/p6spy/ITTracingP6Factory.java @@ -9,7 +9,7 @@ import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.SQLException; -import java.util.concurrent.ConcurrentLinkedDeque; +import java.util.ArrayList; import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -27,7 +27,8 @@ public class ITTracingP6Factory { DerbyUtils.disableLog(); } - ConcurrentLinkedDeque spans = new ConcurrentLinkedDeque<>(); + /** JDBC is synchronous and we aren't using thread pools: everything happens on the main thread */ + ArrayList spans = new ArrayList<>(); Tracing tracing = tracingBuilder(Sampler.ALWAYS_SAMPLE, spans).build(); Connection connection; @@ -103,7 +104,7 @@ void prepareExecuteSelect(String query) throws SQLException { } } - static Tracing.Builder tracingBuilder(Sampler sampler, ConcurrentLinkedDeque spans) { + static Tracing.Builder tracingBuilder(Sampler sampler, ArrayList spans) { return Tracing.newBuilder() .spanReporter(spans::add) .currentTraceContext(new StrictCurrentTraceContext()) diff --git a/instrumentation/p6spy/src/test/java/brave/p6spy/TracingJdbcEventListenerTest.java b/instrumentation/p6spy/src/test/java/brave/p6spy/TracingJdbcEventListenerTest.java index 57000c011c..839d5d2bf3 100644 --- a/instrumentation/p6spy/src/test/java/brave/p6spy/TracingJdbcEventListenerTest.java +++ b/instrumentation/p6spy/src/test/java/brave/p6spy/TracingJdbcEventListenerTest.java @@ -9,7 +9,7 @@ import java.sql.Connection; import java.sql.DatabaseMetaData; import java.sql.SQLException; -import java.util.concurrent.ConcurrentLinkedDeque; +import java.util.ArrayList; import org.junit.Test; import org.junit.runner.RunWith; import org.mockito.Mock; @@ -119,7 +119,7 @@ public class TracingJdbcEventListenerTest { } @Test public void nullSqlWontNPE() throws SQLException { - ConcurrentLinkedDeque spans = new ConcurrentLinkedDeque<>(); + ArrayList spans = new ArrayList<>(); try (Tracing tracing = tracingBuilder(Sampler.ALWAYS_SAMPLE, spans).build()) { when(statementInformation.getSql()).thenReturn(null); @@ -137,8 +137,7 @@ public class TracingJdbcEventListenerTest { } @Test public void handleAfterExecute_without_beforeExecute_getting_called() { - ConcurrentLinkedDeque spans = new ConcurrentLinkedDeque<>(); - Tracing tracing = tracingBuilder(Sampler.ALWAYS_SAMPLE, spans).build(); + Tracing tracing = tracingBuilder(Sampler.ALWAYS_SAMPLE, new ArrayList<>()).build(); Span span = tracing.tracer().nextSpan().start(); try (SpanInScope spanInScope = tracing.tracer().withSpanInScope(span)) { TracingJdbcEventListener listener = new TracingJdbcEventListener("", false); diff --git a/instrumentation/sparkjava/src/test/java/brave/sparkjava/ITSparkTracing.java b/instrumentation/sparkjava/src/test/java/brave/sparkjava/ITSparkTracing.java index c14635fe34..a27a61fdc0 100644 --- a/instrumentation/sparkjava/src/test/java/brave/sparkjava/ITSparkTracing.java +++ b/instrumentation/sparkjava/src/test/java/brave/sparkjava/ITSparkTracing.java @@ -3,19 +3,21 @@ import brave.http.ITHttpServer; import brave.propagation.ExtraFieldPropagation; import org.junit.After; -import org.junit.ComparisonFailure; -import org.junit.Test; +import org.junit.Ignore; import spark.Spark; public class ITSparkTracing extends ITHttpServer { - /** - * Async tests are ignored until https://github.com/perwendel/spark/issues/208 - */ - @Override - @Test(expected = ComparisonFailure.class) - public void async() throws Exception { - super.async(); + @Override @Ignore("ignored until https://github.com/perwendel/spark/issues/208") + public void async() { + } + + @Override @Ignore("ignored until https://github.com/perwendel/spark/issues/208") + public void addsErrorTagOnException_async() { + } + + @Override @Ignore("ignored until https://github.com/perwendel/spark/issues/208") + public void reportsSpanOnException_async() { } @Override protected void init() throws Exception { diff --git a/instrumentation/spring-web/src/it/spring3/src/test/java/brave/spring/web3/ITTracingClientHttpRequestInterceptor.java b/instrumentation/spring-web/src/it/spring3/src/test/java/brave/spring/web3/ITTracingClientHttpRequestInterceptor.java index 1895c343dd..f78ccbaff1 100644 --- a/instrumentation/spring-web/src/it/spring3/src/test/java/brave/spring/web3/ITTracingClientHttpRequestInterceptor.java +++ b/instrumentation/spring-web/src/it/spring3/src/test/java/brave/spring/web3/ITTracingClientHttpRequestInterceptor.java @@ -46,10 +46,6 @@ ClientHttpRequestFactory configureClient(ClientHttpRequestInterceptor intercepto restTemplate.postForObject(url(uri), content, String.class); } - @Override protected void getAsync(ClientHttpRequestFactory client, String uri) { - throw new AssumptionViolatedException("TODO: async rest template has its own interceptor"); - } - @Test public void currentSpanVisibleToUserInterceptors() throws Exception { server.enqueue(new MockResponse()); @@ -64,6 +60,8 @@ ClientHttpRequestFactory configureClient(ClientHttpRequestInterceptor intercepto RecordedRequest request = server.takeRequest(); assertThat(request.getHeader("x-b3-traceId")) .isEqualTo(request.getHeader("my-id")); + + takeSpan(); } @Override @Test(expected = AssertionError.class) diff --git a/instrumentation/spring-web/src/test/java/brave/spring/web/ITTracingAsyncClientHttpRequestInterceptor.java b/instrumentation/spring-web/src/test/java/brave/spring/web/ITTracingAsyncClientHttpRequestInterceptor.java index d4276eefbd..403da94216 100644 --- a/instrumentation/spring-web/src/test/java/brave/spring/web/ITTracingAsyncClientHttpRequestInterceptor.java +++ b/instrumentation/spring-web/src/test/java/brave/spring/web/ITTracingAsyncClientHttpRequestInterceptor.java @@ -1,12 +1,12 @@ package brave.spring.web; -import brave.http.ITHttpClient; +import brave.http.ITHttpAsyncClient; import java.net.URI; import java.util.Arrays; import java.util.Collections; -import java.util.concurrent.ExecutionException; import okhttp3.mockwebserver.MockResponse; import okhttp3.mockwebserver.RecordedRequest; +import org.junit.Ignore; import org.junit.Test; import org.springframework.http.RequestEntity; import org.springframework.http.client.AsyncClientHttpRequestFactory; @@ -18,7 +18,7 @@ import static org.assertj.core.api.Assertions.assertThat; public class ITTracingAsyncClientHttpRequestInterceptor - extends ITHttpClient { + extends ITHttpAsyncClient { AsyncClientHttpRequestInterceptor interceptor; AsyncClientHttpRequestFactory configureClient(AsyncClientHttpRequestInterceptor interceptor) { @@ -42,25 +42,15 @@ AsyncClientHttpRequestFactory configureClient(AsyncClientHttpRequestInterceptor throws Exception { AsyncRestTemplate restTemplate = new AsyncRestTemplate(client); restTemplate.setInterceptors(Collections.singletonList(interceptor)); - try { - restTemplate.getForEntity(url(pathIncludingQuery), String.class).get(); - } finally { - // TODO: understand why we need sleeps. maybe there's an executor we can change to same thread - Thread.sleep(100); - } + restTemplate.getForEntity(url(pathIncludingQuery), String.class).get(); } @Override protected void post(AsyncClientHttpRequestFactory client, String uri, String content) throws Exception { AsyncRestTemplate restTemplate = new AsyncRestTemplate(client); restTemplate.setInterceptors(Collections.singletonList(interceptor)); - try { - restTemplate.postForEntity(url(uri), RequestEntity.post(URI.create(url(uri))).body(content), - String.class).get(); - } finally { - // TODO: understand why we need sleeps. maybe there's an executor we can change to same thread - Thread.sleep(100); - } + restTemplate.postForEntity(url(uri), RequestEntity.post(URI.create(url(uri))).body(content), + String.class).get(); } @Override protected void getAsync(AsyncClientHttpRequestFactory client, String uri) { @@ -83,15 +73,15 @@ AsyncClientHttpRequestFactory configureClient(AsyncClientHttpRequestInterceptor RecordedRequest request = server.takeRequest(); assertThat(request.getHeader("x-b3-traceId")) .isEqualTo(request.getHeader("my-id")); + + takeSpan(); } - @Override @Test(expected = ExecutionException.class) - public void redirect() throws Exception { // blind to the implementation of redirects - super.redirect(); + @Override @Ignore("blind to the implementation of redirects") + public void redirect() { } - @Override @Test(expected = AssertionError.class) - public void reportsServerAddress() throws Exception { // doesn't know the remote address - super.reportsServerAddress(); + @Override @Ignore("doesn't know the remote address") + public void reportsServerAddress() { } } diff --git a/instrumentation/spring-web/src/test/java/brave/spring/web/ITTracingClientHttpRequestInterceptor.java b/instrumentation/spring-web/src/test/java/brave/spring/web/ITTracingClientHttpRequestInterceptor.java index 96de423d14..0914f27e8b 100644 --- a/instrumentation/spring-web/src/test/java/brave/spring/web/ITTracingClientHttpRequestInterceptor.java +++ b/instrumentation/spring-web/src/test/java/brave/spring/web/ITTracingClientHttpRequestInterceptor.java @@ -5,7 +5,7 @@ import java.util.Collections; import okhttp3.mockwebserver.MockResponse; import okhttp3.mockwebserver.RecordedRequest; -import org.junit.AssumptionViolatedException; +import org.junit.Ignore; import org.junit.Test; import org.springframework.http.client.ClientHttpRequestFactory; import org.springframework.http.client.ClientHttpRequestInterceptor; @@ -45,10 +45,6 @@ ClientHttpRequestFactory configureClient(ClientHttpRequestInterceptor intercepto restTemplate.postForObject(url(uri), content, String.class); } - @Override protected void getAsync(ClientHttpRequestFactory client, String uri) { - throw new AssumptionViolatedException("TODO: async rest template has its own interceptor"); - } - @Test public void currentSpanVisibleToUserInterceptors() throws Exception { server.enqueue(new MockResponse()); @@ -63,15 +59,15 @@ ClientHttpRequestFactory configureClient(ClientHttpRequestInterceptor intercepto RecordedRequest request = server.takeRequest(); assertThat(request.getHeader("x-b3-traceId")) .isEqualTo(request.getHeader("my-id")); + + takeSpan(); } - @Override @Test(expected = AssertionError.class) - public void redirect() throws Exception { // blind to the implementation of redirects - super.redirect(); + @Override @Ignore("blind to the implementation of redirects") + public void redirect() { } - @Override @Test(expected = AssertionError.class) - public void reportsServerAddress() throws Exception { // doesn't know the remote address - super.reportsServerAddress(); + @Override @Ignore("doesn't know the remote address") + public void reportsServerAddress() { } } diff --git a/instrumentation/vertx-web/src/main/java/brave/vertx/web/TracingRoutingContextHandler.java b/instrumentation/vertx-web/src/main/java/brave/vertx/web/TracingRoutingContextHandler.java index d07a9171e8..ab5246a90f 100644 --- a/instrumentation/vertx-web/src/main/java/brave/vertx/web/TracingRoutingContextHandler.java +++ b/instrumentation/vertx-web/src/main/java/brave/vertx/web/TracingRoutingContextHandler.java @@ -15,7 +15,14 @@ import zipkin2.Endpoint; /** - * Idea for how to address re-route was from {@code TracingHandler} in https://github.com/opentracing-contrib/java-vertx-web + *

Why not rely on {@code context.request().endHandler()} to finish a span?

+ *

There can be only one {@link HttpServerRequest#endHandler(Handler) end handler}. We can't rely + * on {@code endHandler()} as a user can override it in their route. If they did, we'd leak an + * unfinished span. For this reason, we speculatively use both an end handler and an end header + * handler. + * + *

The hint that we need to re-attach the headers handler on re-route came from looking at + * {@code TracingHandler} in https://github.com/opentracing-contrib/java-vertx-web */ final class TracingRoutingContextHandler implements Handler { static final Getter GETTER = new Getter() { @@ -29,68 +36,84 @@ final class TracingRoutingContextHandler implements Handler { }; final Tracer tracer; - final HttpServerHandler handler; + final HttpServerHandler serverHandler; final TraceContext.Extractor extractor; TracingRoutingContextHandler(HttpTracing httpTracing) { tracer = httpTracing.tracing().tracer(); - handler = HttpServerHandler.create(httpTracing, ADAPTER); + serverHandler = HttpServerHandler.create(httpTracing, new Adapter()); extractor = httpTracing.tracing().propagation().extractor(GETTER); } @Override public void handle(RoutingContext context) { - boolean newRequest = false; - Span span = context.get(Span.class.getName()); - if (span == null) { - newRequest = true; - span = handler.handleReceive(extractor, context.request()); - context.put(Span.class.getName(), span); + TracingHandler tracingHandler = context.get(TracingHandler.class.getName()); + if (tracingHandler != null) { // then we already have a span + if (!context.failed()) { // re-routed, so re-attach the end handler + context.addHeadersEndHandler(tracingHandler); + } + context.next(); + return; } - if (newRequest || !context.failed()) { // re-routed, so re-attach the end handler - // Note: In Brave, finishing a client span after headers sent is normal. - context.addHeadersEndHandler(finishHttpSpan(context, span)); - } + Span span = serverHandler.handleReceive(extractor, context.request()); + TracingHandler handler = new TracingHandler(context, span); + context.put(TracingHandler.class.getName(), handler); + + // When a route ends a request directly, this will finish the span + context.request().endHandler(handler); + // When a route overwrites the above endHandler, this will finish the span + context.addHeadersEndHandler(handler); try (Tracer.SpanInScope ws = tracer.withSpanInScope(span)) { context.next(); - } catch (RuntimeException | Error e) { - handler.handleSend(null, e, span); - throw e; } } - Handler finishHttpSpan(RoutingContext context, Span span) { - return v -> handler.handleSend(context.response(), context.failure(), span); + class TracingHandler implements Handler { + final RoutingContext context; + final Span span; + + TracingHandler(RoutingContext context, Span span) { + this.context = context; + this.span = span; + } + + @Override public void handle(Void aVoid) { + if (!context.request().isEnded()) return; + serverHandler.handleSend(context.response(), context.failure(), span); + } } - static final HttpServerAdapter ADAPTER = - new HttpServerAdapter() { - @Override public String method(HttpServerRequest request) { - return request.method().name(); - } - - @Override public String url(HttpServerRequest request) { - return request.absoluteURI(); - } - - @Override public String requestHeader(HttpServerRequest request, String name) { - return request.headers().get(name); - } - - @Override public Integer statusCode(HttpServerResponse response) { - return response.getStatusCode(); - } - - @Override - public boolean parseClientAddress(HttpServerRequest req, Endpoint.Builder builder) { - if (super.parseClientAddress(req, builder)) return true; - SocketAddress addr = req.remoteAddress(); - if (builder.parseIp(addr.host())) { - builder.port(addr.port()); - return true; - } - return false; - } - }; + static final class Adapter extends HttpServerAdapter { + @Override public String method(HttpServerRequest request) { + return request.method().name(); + } + + @Override public String path(HttpServerRequest request) { + return request.path(); + } + + @Override public String url(HttpServerRequest request) { + return request.absoluteURI(); + } + + @Override public String requestHeader(HttpServerRequest request, String name) { + return request.headers().get(name); + } + + @Override public Integer statusCode(HttpServerResponse response) { + return response.getStatusCode(); + } + + @Override + public boolean parseClientAddress(HttpServerRequest req, Endpoint.Builder builder) { + if (super.parseClientAddress(req, builder)) return true; + SocketAddress addr = req.remoteAddress(); + if (builder.parseIp(addr.host())) { + builder.port(addr.port()); + return true; + } + return false; + } + } } diff --git a/instrumentation/vertx-web/src/test/java/brave/vertx/web/ITVertxWebTracing.java b/instrumentation/vertx-web/src/test/java/brave/vertx/web/ITVertxWebTracing.java index 636fc76d06..045a1f04d8 100644 --- a/instrumentation/vertx-web/src/test/java/brave/vertx/web/ITVertxWebTracing.java +++ b/instrumentation/vertx-web/src/test/java/brave/vertx/web/ITVertxWebTracing.java @@ -1,5 +1,8 @@ package brave.vertx.web; +import brave.SpanCustomizer; +import brave.http.HttpAdapter; +import brave.http.HttpServerParser; import brave.http.ITHttpServer; import brave.propagation.ExtraFieldPropagation; import io.vertx.core.Handler; @@ -13,9 +16,9 @@ import java.util.concurrent.TimeUnit; import org.junit.After; import org.junit.Test; +import zipkin2.Span; import static org.assertj.core.api.Assertions.assertThat; -import static org.assertj.core.api.Assertions.entry; public class ITVertxWebTracing extends ITHttpServer { Vertx vertx; @@ -36,6 +39,9 @@ public class ITVertxWebTracing extends ITHttpServer { router.route("/reroute").handler(ctx -> { ctx.reroute("/foo"); }); + router.route("/rerouteAsync").handler(ctx -> { + ctx.reroute("/async"); + }); router.route("/extra").handler(ctx -> { ctx.response().end(ExtraFieldPropagation.get(EXTRA_KEY)); }); @@ -53,7 +59,6 @@ public class ITVertxWebTracing extends ITHttpServer { ctx.request().endHandler(v -> ctx.fail(new Exception())); }); - Handler routingContextHandler = VertxWebTracing.create(httpTracing).routingContextHandler(); router.route() @@ -75,12 +80,29 @@ public class ITVertxWebTracing extends ITHttpServer { // makes sure we don't accidentally rewrite the incoming http path @Test public void handlesReroute() throws Exception { - get("/reroute"); + handlesReroute("/reroute"); + } + + @Test public void handlesRerouteAsync() throws Exception { + handlesReroute("/rerouteAsync"); + } + + void handlesReroute(String path) throws Exception { + httpTracing = httpTracing.toBuilder().serverParser(new HttpServerParser() { + @Override + public void request(HttpAdapter adapter, Req req, SpanCustomizer customizer) { + super.request(adapter, req, customizer); + customizer.tag("http.url", adapter.url(req)); // just the path is logged by default + } + }).build(); + init(); + + get(path); - assertThat(spans) - .hasSize(1) - .flatExtracting(s -> s.tags().entrySet()) - .contains(entry("http.path", "/reroute")); + Span span = takeSpan(); + assertThat(span.tags()) + .containsEntry("http.path", path) + .containsEntry("http.url", url(path)); } @Override