From a26697c5ee6ecd4cf0e41f25f1be414cc29e8566 Mon Sep 17 00:00:00 2001 From: kojilin Date: Sun, 17 Jun 2018 23:55:09 +0900 Subject: [PATCH 1/2] Make zipkin's current context can be nested Apply this change will fix TraceContext leak in thread problem. There is a case that we keep armeria's request context aware CompletableFuture for fetching data, and let multiple request register callback on that future. So there is a problem that future uses first request's eventloop and RequestContext to call waiting handlers, but each handler may wrap with it's RequestContext, so there is nested RequestContext on same thread(onEnter->onEnter->onExit->onExit). Another solution is application developer should always jump back to current request's context aware eventloop. e.g. ``` cachedFuture.acceptAsync((r, e)->{ ... }, RequestContext.current.contextAwareEventloop); ``` I'm not sure if armeria side need to take care of this or not... --- .../client/tracing/HttpTracingClient.java | 3 +- .../internal/tracing/SpanContextUtil.java | 22 ++++++--- .../internal/tracing/SpanInScopeWrapper.java | 47 +++++++++++++++++++ .../server/tracing/HttpTracingService.java | 3 +- 4 files changed, 67 insertions(+), 8 deletions(-) create mode 100644 zipkin/src/main/java/com/linecorp/armeria/internal/tracing/SpanInScopeWrapper.java diff --git a/zipkin/src/main/java/com/linecorp/armeria/client/tracing/HttpTracingClient.java b/zipkin/src/main/java/com/linecorp/armeria/client/tracing/HttpTracingClient.java index 1a05996a99d..cfacc48b5a2 100644 --- a/zipkin/src/main/java/com/linecorp/armeria/client/tracing/HttpTracingClient.java +++ b/zipkin/src/main/java/com/linecorp/armeria/client/tracing/HttpTracingClient.java @@ -33,6 +33,7 @@ import com.linecorp.armeria.common.logging.RequestLogAvailability; import com.linecorp.armeria.internal.tracing.AsciiStringKeyFactory; import com.linecorp.armeria.internal.tracing.SpanContextUtil; +import com.linecorp.armeria.internal.tracing.SpanInScopeWrapper; import brave.Span; import brave.Span.Kind; @@ -52,7 +53,7 @@ */ public class HttpTracingClient extends SimpleDecoratingClient { - private static final FastThreadLocal SPAN_IN_THREAD = new FastThreadLocal<>(); + private static final FastThreadLocal SPAN_IN_THREAD = new FastThreadLocal<>(); /** * Creates a new tracing {@link Client} decorator using the specified {@link Tracing} instance. diff --git a/zipkin/src/main/java/com/linecorp/armeria/internal/tracing/SpanContextUtil.java b/zipkin/src/main/java/com/linecorp/armeria/internal/tracing/SpanContextUtil.java index dcf3967f7db..4d7c5b252c8 100644 --- a/zipkin/src/main/java/com/linecorp/armeria/internal/tracing/SpanContextUtil.java +++ b/zipkin/src/main/java/com/linecorp/armeria/internal/tracing/SpanContextUtil.java @@ -32,13 +32,23 @@ public final class SpanContextUtil { /** * Sets up the {@link RequestContext} to push and pop the {@link Span} whenever it is entered/exited. */ - public static void setupContext(FastThreadLocal threadLocalSpan, RequestContext ctx, Span span, - Tracer tracer) { - ctx.onEnter(unused -> threadLocalSpan.set(tracer.withSpanInScope(span))); + public static void setupContext(FastThreadLocal threadLocalSpan, RequestContext ctx, + Span span, Tracer tracer) { + ctx.onEnter(unused -> { + final SpanInScopeWrapper current = threadLocalSpan.get(); + final SpanInScope newScope = tracer.withSpanInScope(span); + threadLocalSpan.set(new SpanInScopeWrapper(newScope, current)); + }); ctx.onExit(unused -> { - final SpanInScope spanInScope = threadLocalSpan.get(); - if (spanInScope != null) { - spanInScope.close(); + final SpanInScopeWrapper spanInScope = threadLocalSpan.get(); + if (spanInScope == null) { + return; + } + spanInScope.close(); + final SpanInScopeWrapper previousScope = spanInScope.getPrevious(); + if (previousScope != null) { + threadLocalSpan.set(previousScope); + } else { threadLocalSpan.remove(); } }); diff --git a/zipkin/src/main/java/com/linecorp/armeria/internal/tracing/SpanInScopeWrapper.java b/zipkin/src/main/java/com/linecorp/armeria/internal/tracing/SpanInScopeWrapper.java new file mode 100644 index 00000000000..8fc63c73538 --- /dev/null +++ b/zipkin/src/main/java/com/linecorp/armeria/internal/tracing/SpanInScopeWrapper.java @@ -0,0 +1,47 @@ +/* + * Copyright 2018 LINE Corporation + * + * LINE Corporation licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package com.linecorp.armeria.internal.tracing; + +import javax.annotation.Nullable; + +import brave.Tracer.SpanInScope; + +/** + * SpanInScope Wrapper for keeping previous span scope. + */ +public final class SpanInScopeWrapper implements AutoCloseable { + + private final SpanInScope spanInScope; + + @Nullable + private final SpanInScopeWrapper previous; + + public SpanInScopeWrapper(SpanInScope current, @Nullable SpanInScopeWrapper previous) { + this.spanInScope = current; + this.previous = previous; + } + + @Nullable + public SpanInScopeWrapper getPrevious() { + return previous; + } + + @Override + public void close() { + spanInScope.close(); + } +} diff --git a/zipkin/src/main/java/com/linecorp/armeria/server/tracing/HttpTracingService.java b/zipkin/src/main/java/com/linecorp/armeria/server/tracing/HttpTracingService.java index 0359725900e..6b93c2d70f1 100644 --- a/zipkin/src/main/java/com/linecorp/armeria/server/tracing/HttpTracingService.java +++ b/zipkin/src/main/java/com/linecorp/armeria/server/tracing/HttpTracingService.java @@ -24,6 +24,7 @@ import com.linecorp.armeria.common.logging.RequestLogAvailability; import com.linecorp.armeria.internal.tracing.AsciiStringKeyFactory; import com.linecorp.armeria.internal.tracing.SpanContextUtil; +import com.linecorp.armeria.internal.tracing.SpanInScopeWrapper; import com.linecorp.armeria.server.Service; import com.linecorp.armeria.server.ServiceRequestContext; import com.linecorp.armeria.server.SimpleDecoratingService; @@ -46,7 +47,7 @@ */ public class HttpTracingService extends SimpleDecoratingService { - private static final FastThreadLocal SPAN_IN_THREAD = new FastThreadLocal<>(); + private static final FastThreadLocal SPAN_IN_THREAD = new FastThreadLocal<>(); /** * Creates a new tracing {@link Service} decorator using the specified {@link Tracing} instance. From b61e37a85cf9610054d299af4d8970df9e97819a Mon Sep 17 00:00:00 2001 From: kojilin Date: Mon, 18 Jun 2018 12:53:57 +0900 Subject: [PATCH 2/2] Append test. --- .../internal/tracing/SpanContextUtil.java | 2 +- .../internal/tracing/SpanInScopeWrapper.java | 4 +- .../tracing/HttpTracingIntegrationTest.java | 29 +-- ...tpTracingNestedContextIntegrationTest.java | 184 ++++++++++++++++++ .../armeria/it/tracing/ReporterImpl.java | 51 +++++ 5 files changed, 240 insertions(+), 30 deletions(-) create mode 100644 zipkin/src/test/java/com/linecorp/armeria/it/tracing/HttpTracingNestedContextIntegrationTest.java create mode 100644 zipkin/src/test/java/com/linecorp/armeria/it/tracing/ReporterImpl.java diff --git a/zipkin/src/main/java/com/linecorp/armeria/internal/tracing/SpanContextUtil.java b/zipkin/src/main/java/com/linecorp/armeria/internal/tracing/SpanContextUtil.java index 4d7c5b252c8..0ceed3fcaf4 100644 --- a/zipkin/src/main/java/com/linecorp/armeria/internal/tracing/SpanContextUtil.java +++ b/zipkin/src/main/java/com/linecorp/armeria/internal/tracing/SpanContextUtil.java @@ -45,7 +45,7 @@ public static void setupContext(FastThreadLocal threadLocalS return; } spanInScope.close(); - final SpanInScopeWrapper previousScope = spanInScope.getPrevious(); + final SpanInScopeWrapper previousScope = spanInScope.previous(); if (previousScope != null) { threadLocalSpan.set(previousScope); } else { diff --git a/zipkin/src/main/java/com/linecorp/armeria/internal/tracing/SpanInScopeWrapper.java b/zipkin/src/main/java/com/linecorp/armeria/internal/tracing/SpanInScopeWrapper.java index 8fc63c73538..9896cec1dc6 100644 --- a/zipkin/src/main/java/com/linecorp/armeria/internal/tracing/SpanInScopeWrapper.java +++ b/zipkin/src/main/java/com/linecorp/armeria/internal/tracing/SpanInScopeWrapper.java @@ -21,7 +21,7 @@ import brave.Tracer.SpanInScope; /** - * SpanInScope Wrapper for keeping previous span scope. + * {@link SpanInScope} wrapper that keeps the previous {@link SpanInScope}. */ public final class SpanInScopeWrapper implements AutoCloseable { @@ -36,7 +36,7 @@ public SpanInScopeWrapper(SpanInScope current, @Nullable SpanInScopeWrapper prev } @Nullable - public SpanInScopeWrapper getPrevious() { + public SpanInScopeWrapper previous() { return previous; } diff --git a/zipkin/src/test/java/com/linecorp/armeria/it/tracing/HttpTracingIntegrationTest.java b/zipkin/src/test/java/com/linecorp/armeria/it/tracing/HttpTracingIntegrationTest.java index de7df237638..282ab08c4f9 100644 --- a/zipkin/src/test/java/com/linecorp/armeria/it/tracing/HttpTracingIntegrationTest.java +++ b/zipkin/src/test/java/com/linecorp/armeria/it/tracing/HttpTracingIntegrationTest.java @@ -24,15 +24,11 @@ import static com.linecorp.armeria.common.thrift.ThriftSerializationFormats.BINARY; import static org.assertj.core.api.Assertions.assertThat; -import java.util.ArrayList; import java.util.Arrays; -import java.util.Collections; import java.util.List; -import java.util.concurrent.BlockingQueue; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CountDownLatch; import java.util.concurrent.Executors; -import java.util.concurrent.LinkedBlockingQueue; import java.util.stream.IntStream; import org.apache.thrift.async.AsyncMethodCallback; @@ -68,7 +64,6 @@ import brave.propagation.CurrentTraceContext; import brave.sampler.Sampler; import zipkin2.Span; -import zipkin2.reporter.Reporter; public class HttpTracingIntegrationTest { @@ -162,7 +157,7 @@ public void setupClients() { .build(HelloService.Iface.class); zipClient = new ClientBuilder(server.uri(BINARY, "/zip")) .decorator(HttpRequest.class, HttpResponse.class, - HttpTracingClient.newDecorator(newTracing("client/zip"))) + HttpTracingClient.newDecorator(newTracing("client/zip"))) .build(HelloService.Iface.class); fooClientWithoutTracing = Clients.newClient(server.uri(BINARY, "/foo"), HelloService.Iface.class); barClient = newClient("/bar"); @@ -177,7 +172,7 @@ public void tearDown() { @After public void shouldHaveNoExtraSpans() { - assertThat(spanReporter.spans).isEmpty(); + assertThat(spanReporter.getSpans()).isEmpty(); } private static HttpTracingService decorate(String name, Service service) { @@ -331,24 +326,4 @@ public void onError(Exception exception) { resultHandler.onError(exception); } } - - private static class ReporterImpl implements Reporter { - private final BlockingQueue spans = new LinkedBlockingQueue<>(); - - @Override - public void report(Span span) { - spans.add(span); - } - - Span[] take(int numSpans) throws InterruptedException { - final List taken = new ArrayList<>(); - while (taken.size() < numSpans) { - taken.add(spans.take()); - } - - // Reverse the collected spans to sort the spans by request time. - Collections.reverse(taken); - return taken.toArray(new Span[numSpans]); - } - } } diff --git a/zipkin/src/test/java/com/linecorp/armeria/it/tracing/HttpTracingNestedContextIntegrationTest.java b/zipkin/src/test/java/com/linecorp/armeria/it/tracing/HttpTracingNestedContextIntegrationTest.java new file mode 100644 index 00000000000..c38413eeba5 --- /dev/null +++ b/zipkin/src/test/java/com/linecorp/armeria/it/tracing/HttpTracingNestedContextIntegrationTest.java @@ -0,0 +1,184 @@ +/* + * Copyright 2018 LINE Corporation + * + * LINE Corporation licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package com.linecorp.armeria.it.tracing; + +import static com.google.common.collect.ImmutableSet.toImmutableSet; +import static com.linecorp.armeria.common.HttpStatus.INTERNAL_SERVER_ERROR; +import static com.linecorp.armeria.common.HttpStatus.OK; +import static java.util.concurrent.TimeUnit.MILLISECONDS; +import static java.util.concurrent.TimeUnit.SECONDS; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.fail; +import static org.awaitility.Awaitility.with; + +import java.util.Arrays; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; + +import org.awaitility.core.ConditionTimeoutException; +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; + +import com.linecorp.armeria.client.HttpClient; +import com.linecorp.armeria.common.AggregatedHttpMessage; +import com.linecorp.armeria.common.HttpRequest; +import com.linecorp.armeria.common.HttpResponse; +import com.linecorp.armeria.common.HttpStatus; +import com.linecorp.armeria.common.RequestContext; +import com.linecorp.armeria.common.util.SafeCloseable; +import com.linecorp.armeria.server.AbstractHttpService; +import com.linecorp.armeria.server.ServerBuilder; +import com.linecorp.armeria.server.Service; +import com.linecorp.armeria.server.ServiceRequestContext; +import com.linecorp.armeria.server.tracing.HttpTracingService; +import com.linecorp.armeria.testing.server.ServerRule; + +import brave.Tracing; +import brave.propagation.CurrentTraceContext; +import brave.sampler.Sampler; +import zipkin2.Span; + +public class HttpTracingNestedContextIntegrationTest { + + private static final ReporterImpl spanReporter = new ReporterImpl(); + + private HttpClient poolHttpClient; + + private final CountDownLatch waitCreateCache = new CountDownLatch(1); + + @Rule + public final ServerRule server = new ServerRule() { + @Override + protected void configure(ServerBuilder sb) throws Exception { + + final CountDownLatch countDownLatch = new CountDownLatch(2); + final AtomicReference> cache = new AtomicReference<>(); + + sb.service("/non-trace", new AbstractHttpService() { + @Override + protected HttpResponse doGet(ServiceRequestContext ctx, HttpRequest req) + throws Exception { + if (Tracing.currentTracer().currentSpan() != null) { + return HttpResponse.of(INTERNAL_SERVER_ERROR); + } + return HttpResponse.of(OK); + } + }); + + sb.service("/create-cache", decorate("service/create-cache", new AbstractHttpService() { + @Override + protected HttpResponse doGet(ServiceRequestContext ctx, HttpRequest req) + throws Exception { + final CompletableFuture future = CompletableFuture.supplyAsync(() -> { + try { + countDownLatch.await(10, TimeUnit.SECONDS); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + return OK; + }, RequestContext.current().contextAwareEventLoop()); + cache.set(future); + waitCreateCache.countDown(); + return HttpResponse.from(future.thenApply(status -> { + if (Tracing.currentTracer().currentSpan() == null) { + return HttpResponse.of(INTERNAL_SERVER_ERROR); + } + return HttpResponse.of(status); + })); + } + })); + + sb.service("/read-cache", decorate("service/read-cache", new AbstractHttpService() { + @Override + protected HttpResponse doGet(ServiceRequestContext ctx, HttpRequest req) throws Exception { + try { + final RequestContext requestContext = RequestContext.current(); + return HttpResponse.from( + cache.get().thenApply(status -> { + try (SafeCloseable ignored = RequestContext.push(requestContext)) { + if (Tracing.currentTracer().currentSpan() == null) { + return HttpResponse.of(INTERNAL_SERVER_ERROR); + } + return HttpResponse.of(status); + } + })); + } finally { + countDownLatch.countDown(); + } + } + })); + } + }; + + @Before + public void setupClients() { + poolHttpClient = HttpClient.of(server.uri("/")); + } + + @After + public void tearDown() { + Tracing.current().close(); + } + + @After + public void shouldHaveNoExtraSpans() { + assertThat(spanReporter.getSpans()).isEmpty(); + } + + private static HttpTracingService decorate(String name, Service service) { + return HttpTracingService.newDecorator(newTracing(name)).apply(service); + } + + private static Tracing newTracing(String name) { + return Tracing.newBuilder() + .currentTraceContext(CurrentTraceContext.Default.create()) + .localServiceName(name) + .spanReporter(spanReporter) + .sampler(Sampler.ALWAYS_SAMPLE) + .build(); + } + + @Test(timeout = 20000) + public void testNestedRequestContext() throws Exception { + final CompletableFuture create = poolHttpClient.get("/create-cache").aggregate(); + waitCreateCache.await(3, SECONDS); + final CompletableFuture read1 = poolHttpClient.get("/read-cache").aggregate(); + final CompletableFuture read2 = poolHttpClient.get("/read-cache").aggregate(); + + assertThat(create.get().status()).isEqualTo(OK); + assertThat(read1.get().status()).isEqualTo(OK); + assertThat(read2.get().status()).isEqualTo(OK); + + final Span[] spans = spanReporter.take(3); + assertThat(Arrays.stream(spans).map(Span::traceId).collect(toImmutableSet())).hasSize(3); + + try { + with().pollInterval(10, MILLISECONDS) + .then() + .atMost(10, SECONDS) + .untilAsserted( + () -> assertThat(poolHttpClient.get("/non-trace").aggregate().get().status()) + .isEqualTo(INTERNAL_SERVER_ERROR)); + fail("There is a leaked context."); + } catch (ConditionTimeoutException ignored) { + } + } +} diff --git a/zipkin/src/test/java/com/linecorp/armeria/it/tracing/ReporterImpl.java b/zipkin/src/test/java/com/linecorp/armeria/it/tracing/ReporterImpl.java new file mode 100644 index 00000000000..fc8cf2d3bba --- /dev/null +++ b/zipkin/src/test/java/com/linecorp/armeria/it/tracing/ReporterImpl.java @@ -0,0 +1,51 @@ +/* + * Copyright 2018 LINE Corporation + * + * LINE Corporation licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package com.linecorp.armeria.it.tracing; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; + +import zipkin2.Span; +import zipkin2.reporter.Reporter; + +class ReporterImpl implements Reporter { + + private final BlockingQueue spans = new LinkedBlockingQueue<>(); + + @Override + public void report(Span span) { + spans.add(span); + } + + BlockingQueue getSpans() { + return spans; + } + + Span[] take(int numSpans) throws InterruptedException { + final List taken = new ArrayList<>(); + while (taken.size() < numSpans) { + taken.add(spans.take()); + } + + // Reverse the collected spans to sort the spans by request time. + Collections.reverse(taken); + return taken.toArray(new Span[numSpans]); + } +}