From f082ae939461c56f03004c7aa401f5c2b76e08b6 Mon Sep 17 00:00:00 2001 From: MiNG Date: Sat, 2 May 2020 00:35:52 +0800 Subject: [PATCH] Support trace context propagation on `ListenableFuture` returned by `submitListenable`. --- .../instrument/TracedListenableFuture.java | 86 ++++++ .../TracedListenableFutureCallback.java | 69 +++++ .../TracedThreadPoolTaskExecutor.java | 6 +- .../TracedThreadPoolTaskScheduler.java | 7 +- .../TracedListenableFutureTest.java | 258 ++++++++++++++++++ 5 files changed, 420 insertions(+), 6 deletions(-) create mode 100644 instrument-starters/opentracing-spring-cloud-core/src/main/java/io/opentracing/contrib/spring/cloud/async/instrument/TracedListenableFuture.java create mode 100644 instrument-starters/opentracing-spring-cloud-core/src/main/java/io/opentracing/contrib/spring/cloud/async/instrument/TracedListenableFutureCallback.java create mode 100644 instrument-starters/opentracing-spring-cloud-core/src/test/java/io/opentracing/contrib/spring/cloud/async/instrument/TracedListenableFutureTest.java diff --git a/instrument-starters/opentracing-spring-cloud-core/src/main/java/io/opentracing/contrib/spring/cloud/async/instrument/TracedListenableFuture.java b/instrument-starters/opentracing-spring-cloud-core/src/main/java/io/opentracing/contrib/spring/cloud/async/instrument/TracedListenableFuture.java new file mode 100644 index 00000000..74908ccc --- /dev/null +++ b/instrument-starters/opentracing-spring-cloud-core/src/main/java/io/opentracing/contrib/spring/cloud/async/instrument/TracedListenableFuture.java @@ -0,0 +1,86 @@ +/** + * Copyright 2017-2020 The OpenTracing Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package io.opentracing.contrib.spring.cloud.async.instrument; + +import io.opentracing.Span; +import io.opentracing.Tracer; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import org.springframework.util.concurrent.FailureCallback; +import org.springframework.util.concurrent.ListenableFuture; +import org.springframework.util.concurrent.ListenableFutureCallback; +import org.springframework.util.concurrent.SuccessCallback; + +/** + * @author MiNG + */ +public class TracedListenableFuture implements ListenableFuture { + + private final ListenableFuture delegate; + private final Tracer tracer; + private final Span span; + + public TracedListenableFuture(ListenableFuture delegate, Tracer tracer) { + this(delegate, tracer, tracer.activeSpan()); + } + + public TracedListenableFuture(ListenableFuture delegate, Tracer tracer, Span span) { + this.delegate = delegate; + this.tracer = tracer; + this.span = span; + } + + @Override + public void addCallback(ListenableFutureCallback callback) { + delegate.addCallback(new TracedListenableFutureCallback<>(callback, tracer, span)); + } + + @Override + public void addCallback(SuccessCallback successCallback, FailureCallback failureCallback) { + delegate.addCallback(new TracedListenableFutureCallback<>(successCallback, failureCallback, tracer, span)); + } + + @Override + public CompletableFuture completable() { + return delegate.completable(); + } + + @Override + public boolean cancel(boolean mayInterruptIfRunning) { + return delegate.cancel(mayInterruptIfRunning); + } + + @Override + public boolean isCancelled() { + return delegate.isCancelled(); + } + + @Override + public boolean isDone() { + return delegate.isDone(); + } + + @Override + public T get() throws InterruptedException, ExecutionException { + return delegate.get(); + } + + @Override + public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { + return delegate.get(timeout, unit); + } +} diff --git a/instrument-starters/opentracing-spring-cloud-core/src/main/java/io/opentracing/contrib/spring/cloud/async/instrument/TracedListenableFutureCallback.java b/instrument-starters/opentracing-spring-cloud-core/src/main/java/io/opentracing/contrib/spring/cloud/async/instrument/TracedListenableFutureCallback.java new file mode 100644 index 00000000..74373d0a --- /dev/null +++ b/instrument-starters/opentracing-spring-cloud-core/src/main/java/io/opentracing/contrib/spring/cloud/async/instrument/TracedListenableFutureCallback.java @@ -0,0 +1,69 @@ +/** + * Copyright 2017-2020 The OpenTracing Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package io.opentracing.contrib.spring.cloud.async.instrument; + +import io.opentracing.Scope; +import io.opentracing.Span; +import io.opentracing.Tracer; +import org.springframework.lang.Nullable; +import org.springframework.util.Assert; +import org.springframework.util.concurrent.FailureCallback; +import org.springframework.util.concurrent.ListenableFutureCallback; +import org.springframework.util.concurrent.SuccessCallback; + +/** + * @author MiNG + */ +public class TracedListenableFutureCallback implements ListenableFutureCallback { + + private final SuccessCallback successDelegate; + private final FailureCallback failureDelegate; + private final Tracer tracer; + private final Span span; + + public TracedListenableFutureCallback(ListenableFutureCallback delegate, Tracer tracer) { + this(delegate, delegate, tracer, tracer.activeSpan()); + } + + public TracedListenableFutureCallback(ListenableFutureCallback delegate, Tracer tracer, Span span) { + this(delegate, delegate, tracer, span); + } + + public TracedListenableFutureCallback(SuccessCallback successDelegate, FailureCallback failureDelegate, Tracer tracer) { + this(successDelegate, failureDelegate, tracer, tracer.activeSpan()); + } + + public TracedListenableFutureCallback(@Nullable SuccessCallback successDelegate, @Nullable FailureCallback failureDelegate, Tracer tracer, Span span) { + Assert.notNull(successDelegate, "'successDelegate' must not be null"); + Assert.notNull(failureDelegate, "'failureDelegate' must not be null"); + this.successDelegate = successDelegate; + this.failureDelegate = failureDelegate; + this.tracer = tracer; + this.span = span; + } + + @Override + public void onSuccess(T result) { + try (Scope ignored = span == null ? null : tracer.scopeManager().activate(span)) { + successDelegate.onSuccess(result); + } + } + + @Override + public void onFailure(Throwable ex) { + try (Scope ignored = span == null ? null : tracer.scopeManager().activate(span)) { + failureDelegate.onFailure(ex); + } + } +} diff --git a/instrument-starters/opentracing-spring-cloud-core/src/main/java/io/opentracing/contrib/spring/cloud/async/instrument/TracedThreadPoolTaskExecutor.java b/instrument-starters/opentracing-spring-cloud-core/src/main/java/io/opentracing/contrib/spring/cloud/async/instrument/TracedThreadPoolTaskExecutor.java index d0b25353..d5bb57ba 100644 --- a/instrument-starters/opentracing-spring-cloud-core/src/main/java/io/opentracing/contrib/spring/cloud/async/instrument/TracedThreadPoolTaskExecutor.java +++ b/instrument-starters/opentracing-spring-cloud-core/src/main/java/io/opentracing/contrib/spring/cloud/async/instrument/TracedThreadPoolTaskExecutor.java @@ -1,5 +1,5 @@ /** - * Copyright 2017-2019 The OpenTracing Authors + * Copyright 2017-2020 The OpenTracing Authors * * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except * in compliance with the License. You may obtain a copy of the License at @@ -59,12 +59,12 @@ public Future submit(Callable task) { @Override public ListenableFuture submitListenable(Runnable task) { - return this.delegate.submitListenable(new TracedRunnable(task, tracer)); + return new TracedListenableFuture<>(this.delegate.submitListenable(new TracedRunnable(task, tracer)), tracer); } @Override public ListenableFuture submitListenable(Callable task) { - return this.delegate.submitListenable(new TracedCallable<>(task, tracer)); + return new TracedListenableFuture<>(this.delegate.submitListenable(new TracedCallable<>(task, tracer)), tracer); } @Override diff --git a/instrument-starters/opentracing-spring-cloud-core/src/main/java/io/opentracing/contrib/spring/cloud/async/instrument/TracedThreadPoolTaskScheduler.java b/instrument-starters/opentracing-spring-cloud-core/src/main/java/io/opentracing/contrib/spring/cloud/async/instrument/TracedThreadPoolTaskScheduler.java index 3bf47d5a..baae829f 100644 --- a/instrument-starters/opentracing-spring-cloud-core/src/main/java/io/opentracing/contrib/spring/cloud/async/instrument/TracedThreadPoolTaskScheduler.java +++ b/instrument-starters/opentracing-spring-cloud-core/src/main/java/io/opentracing/contrib/spring/cloud/async/instrument/TracedThreadPoolTaskScheduler.java @@ -1,5 +1,5 @@ /** - * Copyright 2017-2019 The OpenTracing Authors + * Copyright 2017-2020 The OpenTracing Authors * * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except * in compliance with the License. You may obtain a copy of the License at @@ -26,6 +26,7 @@ import java.util.concurrent.ScheduledFuture; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.ThreadFactory; + import org.springframework.lang.Nullable; import org.springframework.scheduling.Trigger; import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler; @@ -113,12 +114,12 @@ public Future submit(Callable task) { @Override public ListenableFuture submitListenable(Runnable task) { - return delegate.submitListenable(new TracedRunnable(task, tracer)); + return new TracedListenableFuture<>(delegate.submitListenable(new TracedRunnable(task, tracer)), tracer); } @Override public ListenableFuture submitListenable(Callable task) { - return delegate.submitListenable(new TracedCallable<>(task, tracer)); + return new TracedListenableFuture<>(delegate.submitListenable(new TracedCallable<>(task, tracer)), tracer); } @Override diff --git a/instrument-starters/opentracing-spring-cloud-core/src/test/java/io/opentracing/contrib/spring/cloud/async/instrument/TracedListenableFutureTest.java b/instrument-starters/opentracing-spring-cloud-core/src/test/java/io/opentracing/contrib/spring/cloud/async/instrument/TracedListenableFutureTest.java new file mode 100644 index 00000000..95802aae --- /dev/null +++ b/instrument-starters/opentracing-spring-cloud-core/src/test/java/io/opentracing/contrib/spring/cloud/async/instrument/TracedListenableFutureTest.java @@ -0,0 +1,258 @@ +/** + * Copyright 2017-2020 The OpenTracing Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package io.opentracing.contrib.spring.cloud.async.instrument; + +import io.opentracing.Scope; +import io.opentracing.Span; +import io.opentracing.mock.MockTracer; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.locks.LockSupport; + +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.Test; +import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; +import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler; +import org.springframework.util.concurrent.ListenableFuture; + +/** + * Tests for {@link TracedListenableFuture}, to ensure context is propagated to all callbacks. + * + * @author MiNG + */ +public class TracedListenableFutureTest { + + private static final MockTracer TRACER = new MockTracer(); + private static final TracedThreadPoolTaskExecutor EXECUTOR; + static { + final ThreadPoolTaskExecutor delegate = new ThreadPoolTaskExecutor(); + delegate.initialize(); + EXECUTOR = new TracedThreadPoolTaskExecutor(TRACER, delegate); + } + private static final TracedThreadPoolTaskScheduler SCHEDULER; + static { + final ThreadPoolTaskScheduler delegate = new ThreadPoolTaskScheduler(); + delegate.initialize(); + SCHEDULER = new TracedThreadPoolTaskScheduler(TRACER, delegate); + } + + @AfterClass + public static void cleanup() { + EXECUTOR.destroy(); + SCHEDULER.destroy(); + TRACER.close(); + } + + @Test(timeout = 1000) + public void executor_submitListenable_runnable_onSuccess() throws Exception { + final Span span = TRACER.buildSpan("executor_submitListenable_runnable_onSuccess").start(); + try (Scope ignored = TRACER.activateSpan(span)) { + final ListenableFuture listenableFuture = EXECUTOR.submitListenable(() -> { + // Force callback run in thread pool + LockSupport.parkNanos(100_000_000); + Assert.assertSame(span, TRACER.activeSpan()); + }); + final CompletableFuture callbackResult = new CompletableFuture<>(); + listenableFuture.addCallback( + r -> callbackResult.complete(span == TRACER.activeSpan()), + e -> { /* NOOP */ } + ); + listenableFuture.get(); + Assert.assertTrue(callbackResult.get()); + } + } + + @Test(timeout = 1000) + public void executor_submitListenable_callable_onSuccess() throws Exception { + final Span span = TRACER.buildSpan("executor_submitListenable_callable_onSuccess").start(); + try (Scope ignored = TRACER.activateSpan(span)) { + final ListenableFuture listenableFuture = EXECUTOR.submitListenable(() -> { + // Force callback run in thread pool + LockSupport.parkNanos(100_000_000); + Assert.assertSame(span, TRACER.activeSpan()); + return null; + }); + final CompletableFuture callbackResult = new CompletableFuture<>(); + listenableFuture.addCallback( + r -> callbackResult.complete(span == TRACER.activeSpan()), + e -> { /* NOOP */ } + ); + listenableFuture.get(); + Assert.assertTrue(callbackResult.get()); + } + } + + @Test(timeout = 1000) + public void executor_submitListenable_runnable_onFailure() throws Exception { + final Span span = TRACER.buildSpan("executor_submitListenable_runnable_onFailure").start(); + try (Scope ignored = TRACER.activateSpan(span)) { + final ListenableFuture listenableFuture = EXECUTOR.submitListenable(() -> { + // Force callback run in thread pool + LockSupport.parkNanos(100_000_000); + Assert.assertSame(span, TRACER.activeSpan()); + throw new Exception(); + }); + final CompletableFuture callbackResult = new CompletableFuture<>(); + listenableFuture.addCallback( + r -> { /* NOOP */ }, + e -> { + if (e instanceof AssertionError) { + callbackResult.completeExceptionally(e); + } else { + callbackResult.complete(span == TRACER.activeSpan()); + } + } + ); + try { + listenableFuture.get(); + Assert.fail(); + } catch (Exception e) { + // success + } + Assert.assertTrue(callbackResult.get()); + } + } + + @Test(timeout = 1000) + public void executor_submitListenable_callable_onFailure() throws Exception { + final Span span = TRACER.buildSpan("executor_submitListenable_callable_onFailure").start(); + try (Scope ignored = TRACER.activateSpan(span)) { + final ListenableFuture listenableFuture = EXECUTOR.submitListenable(() -> { + // Force callback run in thread pool + LockSupport.parkNanos(100_000_000); + Assert.assertSame(span, TRACER.activeSpan()); + throw new Exception(); + }); + final CompletableFuture callbackResult = new CompletableFuture<>(); + listenableFuture.addCallback( + r -> { /* NOOP */ }, + e -> { + if (e instanceof AssertionError) { + callbackResult.completeExceptionally(e); + } else { + callbackResult.complete(span == TRACER.activeSpan()); + } + } + ); + try { + listenableFuture.get(); + Assert.fail(); + } catch (Exception e) { + // success + } + Assert.assertTrue(callbackResult.get()); + } + } + + @Test(timeout = 1000) + public void scheduler_submitListenable_runnable_onSuccess() throws Exception { + final Span span = TRACER.buildSpan("executor_submitListenable_runnable_onSuccess").start(); + try (Scope ignored = TRACER.activateSpan(span)) { + final ListenableFuture listenableFuture = SCHEDULER.submitListenable(() -> { + // Force callback run in thread pool + LockSupport.parkNanos(100_000_000); + Assert.assertSame(span, TRACER.activeSpan()); + }); + final CompletableFuture callbackResult = new CompletableFuture<>(); + listenableFuture.addCallback( + r -> callbackResult.complete(span == TRACER.activeSpan()), + e -> { /* NOOP */ } + ); + listenableFuture.get(); + Assert.assertTrue(callbackResult.get()); + } + } + + @Test(timeout = 1000) + public void scheduler_submitListenable_callable_onSuccess() throws Exception { + final Span span = TRACER.buildSpan("executor_submitListenable_callable_onSuccess").start(); + try (Scope ignored = TRACER.activateSpan(span)) { + final ListenableFuture listenableFuture = SCHEDULER.submitListenable(() -> { + // Force callback run in thread pool + LockSupport.parkNanos(100_000_000); + Assert.assertSame(span, TRACER.activeSpan()); + return null; + }); + final CompletableFuture callbackResult = new CompletableFuture<>(); + listenableFuture.addCallback( + r -> callbackResult.complete(span == TRACER.activeSpan()), + e -> { /* NOOP */ } + ); + listenableFuture.get(); + Assert.assertTrue(callbackResult.get()); + } + } + + @Test(timeout = 1000) + public void scheduler_submitListenable_runnable_onFailure() throws Exception { + final Span span = TRACER.buildSpan("executor_submitListenable_runnable_onFailure").start(); + try (Scope ignored = TRACER.activateSpan(span)) { + final ListenableFuture listenableFuture = SCHEDULER.submitListenable(() -> { + // Force callback run in thread pool + LockSupport.parkNanos(100_000_000); + Assert.assertSame(span, TRACER.activeSpan()); + throw new Exception(); + }); + final CompletableFuture callbackResult = new CompletableFuture<>(); + listenableFuture.addCallback( + r -> { /* NOOP */ }, + e -> { + if (e instanceof AssertionError) { + callbackResult.completeExceptionally(e); + } else { + callbackResult.complete(span == TRACER.activeSpan()); + } + } + ); + try { + listenableFuture.get(); + Assert.fail(); + } catch (Exception e) { + // success + } + Assert.assertTrue(callbackResult.get()); + } + } + + @Test(timeout = 1000) + public void scheduler_submitListenable_callable_onFailure() throws Exception { + final Span span = TRACER.buildSpan("executor_submitListenable_callable_onFailure").start(); + try (Scope ignored = TRACER.activateSpan(span)) { + final ListenableFuture listenableFuture = SCHEDULER.submitListenable(() -> { + // Force callback run in thread pool + LockSupport.parkNanos(100_000_000); + Assert.assertSame(span, TRACER.activeSpan()); + throw new Exception(); + }); + final CompletableFuture callbackResult = new CompletableFuture<>(); + listenableFuture.addCallback( + r -> { /* NOOP */ }, + e -> { + if (e instanceof AssertionError) { + callbackResult.completeExceptionally(e); + } else { + callbackResult.complete(span == TRACER.activeSpan()); + } + } + ); + try { + listenableFuture.get(); + Assert.fail(); + } catch (Exception e) { + // success + } + Assert.assertTrue(callbackResult.get()); + } + } +}