Skip to content

Commit

Permalink
Deflakes async tests by switching to concurrent blocking queue (#606)
Browse files Browse the repository at this point in the history
This removes some Thread.sleep and other guards from tests by changing
assertions to use a blocking queue. It also improves some of the state
checking, particularly around leaked trace contexts. This uncovered two
cases where async instrumentation (vertx and netty) redundantly scoped a span.
  • Loading branch information
adriancole authored Feb 12, 2018
1 parent 7db7bd1 commit 66585ba
Show file tree
Hide file tree
Showing 29 changed files with 631 additions and 402 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,12 @@
import io.grpc.examples.helloworld.HelloReply;
import io.grpc.examples.helloworld.HelloRequest;
import java.io.IOException;
import java.util.Arrays;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
Expand All @@ -45,7 +47,11 @@
public class ITTracingClientInterceptor {
Logger testLogger = LogManager.getLogger();

ConcurrentLinkedDeque<Span> spans = new ConcurrentLinkedDeque<>();
/**
* See brave.http.ITHttp for rationale on using a concurrent blocking queue even though some calls,
* like those using blocking clients, happen on the main thread.
*/
BlockingQueue<Span> spans = new LinkedBlockingQueue<>();

GrpcTracing tracing = GrpcTracing.create(tracingBuilder(Sampler.ALWAYS_SAMPLE).build());
Tracer tracer = tracing.tracing().tracer();
Expand All @@ -60,6 +66,10 @@ public class ITTracingClientInterceptor {
/**
 * Shuts down the client and server, then verifies that no span is left in the queue.
 *
 * <p>Closing the current {@link Tracing} happens in a finally block so that it still runs when
 * shutdown throws or the leftover-span assertion fails.
 */
@After public void close() throws Exception {
  try {
    closeClient(client);
    server.stop();
    // From brave.http.ITHttp.close
    // Waits briefly so a late or redundant report is caught instead of silently ignored.
    assertThat(spans.poll(100, TimeUnit.MILLISECONDS))
        .withFailMessage("Span remaining in queue. Check for exception or redundant reporting")
        .isNull();
  } finally {
    Tracing current = Tracing.current();
    if (current != null) current.close();
  }
}
Expand All @@ -86,6 +96,8 @@ void closeClient(ManagedChannel client) throws Exception {
TraceContext context = server.takeRequest().context();
assertThat(context.parentId()).isNull();
assertThat(context.sampled()).isTrue();

spans.take();
}

@Test public void makesChildOfCurrentSpan() throws Exception {
Expand All @@ -101,6 +113,11 @@ void closeClient(ManagedChannel client) throws Exception {
.isEqualTo(parent.context().traceId());
assertThat(context.parentId())
.isEqualTo(parent.context().spanId());

// we report one local and one client span
assertThat(Arrays.asList(spans.take(), spans.take()))
.extracting(Span::kind)
.containsOnly(null, Span.Kind.CLIENT);
}

/**
Expand Down Expand Up @@ -131,6 +148,11 @@ void closeClient(ManagedChannel client) throws Exception {
} finally {
otherSpan.finish();
}

// Check we reported 2 local spans and 2 client spans
assertThat(Arrays.asList(spans.take(), spans.take(), spans.take(), spans.take()))
.extracting(Span::kind)
.containsOnly(null, Span.Kind.CLIENT);
}

/** Unlike Brave 3, Brave 4 propagates trace ids even when unsampled */
Expand All @@ -143,25 +165,39 @@ void closeClient(ManagedChannel client) throws Exception {

TraceContextOrSamplingFlags extracted = server.takeRequest();
assertThat(extracted.sampled()).isFalse();

// @After will check that nothing is reported
}

@Test public void reportsClientKindToZipkin() throws Exception {
GreeterGrpc.newBlockingStub(client).sayHello(HELLO_REQUEST);

assertThat(spans)
.extracting(Span::kind)
.containsExactly(Span.Kind.CLIENT);
Span span = spans.take();
assertThat(span.kind())
.isEqualTo(Span.Kind.CLIENT);
}

@Test public void defaultSpanNameIsMethodName() throws Exception {
GreeterGrpc.newBlockingStub(client).sayHello(HELLO_REQUEST);

assertThat(spans)
.extracting(Span::name)
.containsExactly("helloworld.greeter/sayhello");
Span span = spans.take();
assertThat(span.name())
.isEqualTo("helloworld.greeter/sayhello");
}

/**
 * Passes when a span is reported for a call made after the server is stopped. The helper takes
 * that span off the queue, so simply returning from it is the check.
 */
@Test public void onTransportException_reportsSpan() throws Exception {
  // the returned span is intentionally unused
  spanFromTransportException();
}

/**
 * The span reported for a call made after the server is stopped carries exactly two tags:
 * "error" and "grpc.status_code", both set to UNAVAILABLE.
 */
@Test public void onTransportException_addsErrorTag() throws Exception {
  assertThat(spanFromTransportException().tags()).containsExactly(
      entry("error", "UNAVAILABLE"),
      entry("grpc.status_code", "UNAVAILABLE")
  );
}

@Test public void reportsSpanOnTransportException() throws Exception {
Span spanFromTransportException() throws InterruptedException {
server.stop();

try {
Expand All @@ -170,10 +206,7 @@ void closeClient(ManagedChannel client) throws Exception {
} catch (StatusRuntimeException e) {
}

assertThat(spans).flatExtracting(s -> s.tags().entrySet()).containsExactly(
entry("error", "UNAVAILABLE"),
entry("grpc.status_code", "UNAVAILABLE")
);
return spans.take();
}

@Test public void addsErrorTag_onUnimplemented() throws Exception {
Expand All @@ -183,30 +216,21 @@ void closeClient(ManagedChannel client) throws Exception {
} catch (StatusRuntimeException e) {
}

assertThat(spans).flatExtracting(s -> s.tags().entrySet()).containsExactly(
Span span = spans.take();
assertThat(span.tags()).containsExactly(
entry("error", "UNIMPLEMENTED"),
entry("grpc.status_code", "UNIMPLEMENTED")
);
}

@Test public void addsErrorTag_onTransportException() throws Exception {
reportsSpanOnTransportException();

assertThat(spans).flatExtracting(s -> s.tags().entrySet()).containsExactly(
entry("error", "UNAVAILABLE"),
entry("grpc.status_code", "UNAVAILABLE")
);
}

@Test public void addsErrorTag_onCanceledFuture() throws Exception {
server.enqueueDelay(TimeUnit.SECONDS.toMillis(1));

ListenableFuture<HelloReply> resp = GreeterGrpc.newFutureStub(client).sayHello(HELLO_REQUEST);
assumeTrue("lost race on cancel", resp.cancel(true));

close(); // blocks until the cancel finished

assertThat(spans).flatExtracting(s -> s.tags().entrySet()).containsExactly(
Span span = spans.take();
assertThat(span.tags()).containsExactly(
entry("error", "CANCELLED"),
entry("grpc.status_code", "CANCELLED")
);
Expand Down Expand Up @@ -244,6 +268,8 @@ public void start(Listener<RespT> responseListener, Metadata headers) {

assertThat(scopes)
.containsKeys("before", "start");

spans.take();
}

@Test public void clientParserTest() throws Exception {
Expand Down Expand Up @@ -272,8 +298,9 @@ protected <ReqT, RespT> String spanName(MethodDescriptor<ReqT, RespT> methodDesc

GreeterGrpc.newBlockingStub(client).sayHello(HELLO_REQUEST);

assertThat(spans.getFirst().name()).isEqualTo("unary");
assertThat(spans).flatExtracting(s -> s.tags().keySet()).containsExactlyInAnyOrder(
Span span = spans.take();
assertThat(span.name()).isEqualTo("unary");
assertThat(span.tags()).containsKeys(
"grpc.message_received", "grpc.message_sent",
"grpc.message_received.visible", "grpc.message_sent.visible"
);
Expand All @@ -294,14 +321,15 @@ public void clientParserTestStreamingResponse() throws Exception {
Iterator<HelloReply> replies = GreeterGrpc.newBlockingStub(client)
.sayHelloWithManyReplies(HelloRequest.newBuilder().setName("this is dog").build());
assertThat(replies).hasSize(10);
assertThat(spans).hasSize(1);

Span span = spans.take();
// all response messages are tagged to the same span
assertThat(spans.getFirst().tags()).hasSize(10);
assertThat(span.tags()).hasSize(10);
}

Tracing.Builder tracingBuilder(Sampler sampler) {
return Tracing.newBuilder()
.spanReporter((zipkin2.reporter.Reporter<zipkin2.Span>) spans::add)
.spanReporter(spans::add)
.currentTraceContext( // connect to log4j
ThreadContextCurrentTraceContext.create(new StrictCurrentTraceContext()))
.sampler(sampler);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,8 @@
import io.grpc.examples.helloworld.HelloReply;
import io.grpc.examples.helloworld.HelloRequest;
import java.util.Iterator;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.logging.log4j.LogManager;
Expand All @@ -53,7 +54,8 @@ public class ITTracingServerInterceptor {

@Rule public ExpectedException thrown = ExpectedException.none();

ConcurrentLinkedDeque<Span> spans = new ConcurrentLinkedDeque<>();
/** See brave.http.ITHttp for rationale on using a concurrent blocking queue */
BlockingQueue<Span> spans = new LinkedBlockingQueue<>();

GrpcTracing grpcTracing;
Server server;
Expand Down Expand Up @@ -95,6 +97,10 @@ void init(@Nullable ServerInterceptor userInterceptor) throws Exception {
server.shutdown();
server.awaitTermination();
}
// From brave.http.ITHttp.close
assertThat(spans.poll(100, TimeUnit.MILLISECONDS))
.withFailMessage("Span remaining in queue. Check for exception or redundant reporting")
.isNull();
Tracing current = Tracing.current();
if (current != null) current.close();
}
Expand Down Expand Up @@ -123,11 +129,11 @@ public void start(Listener<RespT> responseListener, Metadata headers) {

GreeterGrpc.newBlockingStub(channel).sayHello(HELLO_REQUEST);

Span s = tryTakeSpan();
assertThat(s.traceId()).isEqualTo(traceId);
assertThat(s.parentId()).isEqualTo(parentId);
assertThat(s.id()).isEqualTo(spanId);
assertThat(s.shared()).isTrue();
Span span = spans.take();
assertThat(span.traceId()).isEqualTo(traceId);
assertThat(span.parentId()).isEqualTo(parentId);
assertThat(span.id()).isEqualTo(spanId);
assertThat(span.shared()).isTrue();
}

@Test public void createsChildWhenJoinDisabled() throws Exception {
Expand Down Expand Up @@ -157,11 +163,11 @@ public void start(Listener<RespT> responseListener, Metadata headers) {

GreeterGrpc.newBlockingStub(channel).sayHello(HELLO_REQUEST);

Span s = tryTakeSpan();
assertThat(s.traceId()).isEqualTo(traceId);
assertThat(s.parentId()).isEqualTo(spanId);
assertThat(s.id()).isNotEqualTo(spanId);
assertThat(s.shared()).isNull();
Span span = spans.take();
assertThat(span.traceId()).isEqualTo(traceId);
assertThat(span.parentId()).isEqualTo(spanId);
assertThat(span.id()).isNotEqualTo(spanId);
assertThat(span.shared()).isNull();
}

@Test public void samplingDisabled() throws Exception {
Expand All @@ -170,7 +176,7 @@ public void start(Listener<RespT> responseListener, Metadata headers) {

GreeterGrpc.newBlockingStub(client).sayHello(HELLO_REQUEST);

assertThat(spans).isEmpty();
// @After will check that nothing is reported
}

/**
Expand All @@ -194,24 +200,30 @@ public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(ServerCall<ReqT, Re

assertThat(fromUserInterceptor.get())
.isNotNull();

spans.take();
}

/** Makes a unary call and checks that the reply message is not empty. */
@Test public void currentSpanVisibleToImpl() throws Exception {
  HelloReply reply = GreeterGrpc.newBlockingStub(client).sayHello(HELLO_REQUEST);
  assertThat(reply.getMessage()).isNotEmpty();

  // Take the reported span off the queue; the @After check fails if one is left behind.
  spans.take();
}

@Test public void reportsServerKindToZipkin() throws Exception {
GreeterGrpc.newBlockingStub(client).sayHello(HELLO_REQUEST);

assertThat(tryTakeSpan().kind())
Span span = spans.take();
assertThat(span.kind())
.isEqualTo(Span.Kind.SERVER);
}

@Test public void defaultSpanNameIsMethodName() throws Exception {
GreeterGrpc.newBlockingStub(client).sayHello(HELLO_REQUEST);

assertThat(tryTakeSpan().name())
Span span = spans.take();
assertThat(span.name())
.isEqualTo("helloworld.greeter/sayhello");
}

Expand All @@ -221,7 +233,8 @@ public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(ServerCall<ReqT, Re
.sayHello(HelloRequest.newBuilder().setName("bad").build());
failBecauseExceptionWasNotThrown(StatusRuntimeException.class);
} catch (StatusRuntimeException e) {
assertThat(tryTakeSpan().tags()).containsExactly(
Span span = spans.take();
assertThat(span.tags()).containsExactly(
entry("error", "UNKNOWN"),
entry("grpc.status_code", "UNKNOWN")
);
Expand Down Expand Up @@ -254,7 +267,7 @@ protected <ReqT, RespT> String spanName(MethodDescriptor<ReqT, RespT> methodDesc

GreeterGrpc.newBlockingStub(client).sayHello(HELLO_REQUEST);

Span span = tryTakeSpan();
Span span = spans.take();
assertThat(span.name()).isEqualTo("unary");
assertThat(span.tags().keySet()).containsExactlyInAnyOrder(
"grpc.message_received", "grpc.message_sent",
Expand All @@ -276,7 +289,8 @@ protected <ReqT, RespT> String spanName(MethodDescriptor<ReqT, RespT> methodDesc
.sayHelloWithManyReplies(HELLO_REQUEST);
assertThat(replies).hasSize(10);
// all response messages are tagged to the same span
assertThat(tryTakeSpan().tags()).hasSize(10);
Span span = spans.take();
assertThat(span.tags()).hasSize(10);
}

Tracing.Builder tracingBuilder(Sampler sampler) {
Expand All @@ -286,13 +300,4 @@ Tracing.Builder tracingBuilder(Sampler sampler) {
ThreadContextCurrentTraceContext.create(new StrictCurrentTraceContext()))
.sampler(sampler);
}

// Flake detection logic. If the test fails after the retry it might not be a timing issue.
Span tryTakeSpan() throws InterruptedException {
if (spans.isEmpty()) {
testLogger.warn("delayed getting a span; retrying");
Thread.sleep(100);
}
return spans.poll();
}
}
Loading

0 comments on commit 66585ba

Please sign in to comment.