From 2fdacc1e453e2503548e79b3b048163bb2d5c6af Mon Sep 17 00:00:00 2001 From: Lauri Tulmin Date: Thu, 14 Mar 2024 10:20:16 +0200 Subject: [PATCH] Disable context propagation when virtual thread is switched to the carrier thread --- .../executors/ExecutorAdviceHelper.java | 28 +++++++++- .../ExecutorsInstrumentationModule.java | 3 +- .../VirtualThreadInstrumentation.java | 56 +++++++++++++++++++ 3 files changed, 85 insertions(+), 2 deletions(-) create mode 100644 instrumentation/executors/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/executors/VirtualThreadInstrumentation.java diff --git a/instrumentation/executors/bootstrap/src/main/java/io/opentelemetry/javaagent/bootstrap/executors/ExecutorAdviceHelper.java b/instrumentation/executors/bootstrap/src/main/java/io/opentelemetry/javaagent/bootstrap/executors/ExecutorAdviceHelper.java index 3e80665e1f50..7892157b9aa0 100644 --- a/instrumentation/executors/bootstrap/src/main/java/io/opentelemetry/javaagent/bootstrap/executors/ExecutorAdviceHelper.java +++ b/instrumentation/executors/bootstrap/src/main/java/io/opentelemetry/javaagent/bootstrap/executors/ExecutorAdviceHelper.java @@ -17,13 +17,35 @@ */ public final class ExecutorAdviceHelper { + private static final ThreadLocal propagationDisabled = new ThreadLocal<>(); + + /** + * Temporarily disable context propagation for current thread. Call {@link #enablePropagation()} + * to re-enable the propagation. + */ + public static void disablePropagation() { + propagationDisabled.set(Boolean.TRUE); + } + + /** + * Enable context propagation for current thread after it was disabled by calling {@link + * #disablePropagation()}. + */ + public static void enablePropagation() { + propagationDisabled.remove(); + } + + private static boolean isPropagationDisabled() { + return propagationDisabled.get() != null; + } + /** * Check if {@code context} should be propagated to the passed {@code task}. This method must be * called before each {@link #attachContextToTask(Context, VirtualField, Object)} call to ensure * that unwanted tasks are not instrumented. */ public static boolean shouldPropagateContext(Context context, @Nullable Object task) { - if (task == null) { + if (task == null || isPropagationDisabled()) { return false; } @@ -89,6 +111,10 @@ public static void cleanUpAfterSubmit( /** Clean context attached to the given task. */ public static void cleanPropagatedContext( VirtualField virtualField, T task) { + if (isPropagationDisabled()) { + return; + } + PropagatedContext propagatedContext = virtualField.get(task); if (propagatedContext != null) { propagatedContext.clear(); diff --git a/instrumentation/executors/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/executors/ExecutorsInstrumentationModule.java b/instrumentation/executors/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/executors/ExecutorsInstrumentationModule.java index 93a3d83235ae..0e0e58e32a82 100644 --- a/instrumentation/executors/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/executors/ExecutorsInstrumentationModule.java +++ b/instrumentation/executors/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/executors/ExecutorsInstrumentationModule.java @@ -32,6 +32,7 @@ public List typeInstrumentations() { new JavaExecutorInstrumentation(), new JavaForkJoinTaskInstrumentation(), new RunnableInstrumentation(), - new ThreadPoolExtendingExecutorInstrumentation()); + new ThreadPoolExtendingExecutorInstrumentation(), + new VirtualThreadInstrumentation()); } } diff --git a/instrumentation/executors/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/executors/VirtualThreadInstrumentation.java b/instrumentation/executors/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/executors/VirtualThreadInstrumentation.java new file mode 100644 index 000000000000..8aad00eda23a --- /dev/null +++ b/instrumentation/executors/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/executors/VirtualThreadInstrumentation.java @@ -0,0 +1,56 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.executors; + +import static net.bytebuddy.matcher.ElementMatchers.named; +import static net.bytebuddy.matcher.ElementMatchers.takesArguments; + +import io.opentelemetry.javaagent.bootstrap.executors.ExecutorAdviceHelper; +import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation; +import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer; +import net.bytebuddy.asm.Advice; +import net.bytebuddy.description.type.TypeDescription; +import net.bytebuddy.matcher.ElementMatcher; + +public class VirtualThreadInstrumentation implements TypeInstrumentation { + + @Override + public ElementMatcher typeMatcher() { + return named("java.lang.VirtualThread"); + } + + @Override + public void transform(TypeTransformer transformer) { + // Disable context propagation when virtual thread is switched to the carrier thread. We should + // not propagate context on the carrier thread. Also, context propagation code can cause the + // carrier thread to park when it normally does not park, which may be unexpected for the jvm. + // https://github.com/open-telemetry/opentelemetry-java-instrumentation/issues/10747 + transformer.applyAdviceToMethod( + named("switchToCarrierThread").and(takesArguments(0)), + this.getClass().getName() + "$SwitchToCarrierAdvice"); + transformer.applyAdviceToMethod( + named("switchToVirtualThread").and(takesArguments(1)), + this.getClass().getName() + "$SwitchToVirtualAdvice"); + } + + @SuppressWarnings("unused") + public static class SwitchToCarrierAdvice { + + @Advice.OnMethodExit(suppress = Throwable.class) + public static void exit() { + ExecutorAdviceHelper.disablePropagation(); + } + } + + @SuppressWarnings("unused") + public static class SwitchToVirtualAdvice { + + @Advice.OnMethodEnter(suppress = Throwable.class) + public static void enter() { + ExecutorAdviceHelper.enablePropagation(); + } + } +}