From a1e7307008591a7f291ab6009958be5a8a0ed980 Mon Sep 17 00:00:00 2001 From: Artyom Gabeev Date: Mon, 22 Apr 2024 17:02:46 +0300 Subject: [PATCH 1/2] StructuredTaskScope instrumentation --- .../ExecutorsInstrumentationModule.java | 3 +- .../StructuredTaskScopeInstrumentation.java | 48 ++++++++++++ .../executors/StructuredTaskScopeTest.java | 78 +++++++++++++++++++ 3 files changed, 128 insertions(+), 1 deletion(-) create mode 100644 instrumentation/executors/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/executors/StructuredTaskScopeInstrumentation.java create mode 100644 instrumentation/executors/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/executors/StructuredTaskScopeTest.java diff --git a/instrumentation/executors/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/executors/ExecutorsInstrumentationModule.java b/instrumentation/executors/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/executors/ExecutorsInstrumentationModule.java index 0e0e58e32a82..ff1e753b4de9 100644 --- a/instrumentation/executors/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/executors/ExecutorsInstrumentationModule.java +++ b/instrumentation/executors/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/executors/ExecutorsInstrumentationModule.java @@ -33,6 +33,7 @@ public List typeInstrumentations() { new JavaForkJoinTaskInstrumentation(), new RunnableInstrumentation(), new ThreadPoolExtendingExecutorInstrumentation(), - new VirtualThreadInstrumentation()); + new VirtualThreadInstrumentation(), + new StructuredTaskScopeInstrumentation()); } } diff --git a/instrumentation/executors/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/executors/StructuredTaskScopeInstrumentation.java b/instrumentation/executors/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/executors/StructuredTaskScopeInstrumentation.java new file mode 100644 index 000000000000..8e988955875b --- /dev/null +++ b/instrumentation/executors/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/executors/StructuredTaskScopeInstrumentation.java @@ -0,0 +1,48 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.executors; + +import static net.bytebuddy.matcher.ElementMatchers.named; +import static net.bytebuddy.matcher.ElementMatchers.takesArgument; + +import io.opentelemetry.context.Context; +import io.opentelemetry.instrumentation.api.util.VirtualField; +import io.opentelemetry.javaagent.bootstrap.Java8BytecodeBridge; +import io.opentelemetry.javaagent.bootstrap.executors.ExecutorAdviceHelper; +import io.opentelemetry.javaagent.bootstrap.executors.PropagatedContext; +import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation; +import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer; +import java.util.concurrent.Callable; +import net.bytebuddy.asm.Advice; +import net.bytebuddy.description.type.TypeDescription; +import net.bytebuddy.matcher.ElementMatcher; + +public class StructuredTaskScopeInstrumentation implements TypeInstrumentation { + + @Override + public ElementMatcher typeMatcher() { + return named("java.util.concurrent.StructuredTaskScope"); + } + + @Override + public void transform(TypeTransformer transformer) { + transformer.applyAdviceToMethod( + named("fork").and(takesArgument(0, Callable.class)), + this.getClass().getName() + "$ForkCallableAdvice"); + } + + @SuppressWarnings("unused") + public static class ForkCallableAdvice { + + @Advice.OnMethodEnter(suppress = Throwable.class) + public static PropagatedContext enterCallableFork(@Advice.Argument(0) Callable task) { + Context context = Java8BytecodeBridge.currentContext(); + VirtualField, PropagatedContext> virtualField = + VirtualField.find(Callable.class, PropagatedContext.class); + return ExecutorAdviceHelper.attachContextToTask(context, virtualField, task); + } + } +} diff --git a/instrumentation/executors/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/executors/StructuredTaskScopeTest.java b/instrumentation/executors/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/executors/StructuredTaskScopeTest.java new file mode 100644 index 000000000000..6d4d2be34881 --- /dev/null +++ b/instrumentation/executors/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/executors/StructuredTaskScopeTest.java @@ -0,0 +1,78 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.executors; + +import static org.assertj.core.api.Assertions.assertThat; + +import io.opentelemetry.api.trace.SpanKind; +import io.opentelemetry.instrumentation.testing.junit.AgentInstrumentationExtension; +import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension; +import java.lang.reflect.Method; +import java.util.concurrent.Callable; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.condition.EnabledForJreRange; +import org.junit.jupiter.api.condition.JRE; +import org.junit.jupiter.api.extension.RegisterExtension; + +@EnabledForJreRange(min = JRE.JAVA_21) +class StructuredTaskScopeTest { + + @RegisterExtension + static final InstrumentationExtension testing = AgentInstrumentationExtension.create(); + + @Test + void multipleForkJoin() throws Exception { + Class sofTaskScopeClass = + Class.forName("java.util.concurrent.StructuredTaskScope$ShutdownOnFailure"); + Object taskScope = sofTaskScopeClass.getDeclaredConstructor().newInstance(); + Class taskScopeClass = Class.forName("java.util.concurrent.StructuredTaskScope"); + Method forkMethod = taskScopeClass.getDeclaredMethod("fork", Callable.class); + Method joinMethod = taskScopeClass.getDeclaredMethod("join"); + Method closeMethod = taskScopeClass.getDeclaredMethod("close"); + + Class subtaskClass = Class.forName("java.util.concurrent.StructuredTaskScope$Subtask"); + Method getMethod = subtaskClass.getDeclaredMethod("get"); + + Callable callable1 = + () -> { + testing.runWithSpan("task1", () -> {}); + return "a"; + }; + Callable callable2 = + () -> { + testing.runWithSpan("task2", () -> {}); + return "b"; + }; + + String result = + testing.runWithSpan( + "parent", + () -> { + try { + Object fork1 = forkMethod.invoke(taskScope, callable1); + Object fork2 = forkMethod.invoke(taskScope, callable2); + joinMethod.invoke(taskScope); + + return "" + getMethod.invoke(fork1) + getMethod.invoke(fork2); + } catch (Exception e) { + throw new AssertionError(e); + } + }); + + assertThat(result).isEqualTo("ab"); + + testing.waitAndAssertTraces( + trace -> + trace.hasSpansSatisfyingExactlyInAnyOrder( + span -> span.hasName("parent").hasKind(SpanKind.INTERNAL).hasNoParent(), + span -> + span.hasName("task1").hasKind(SpanKind.INTERNAL).hasParent(trace.getSpan(0)), + span -> + span.hasName("task2").hasKind(SpanKind.INTERNAL).hasParent(trace.getSpan(0)))); + + closeMethod.invoke(taskScope); + } +} From b47cfa42d1f722aa44a9b7c72cc91eebb6cff345 Mon Sep 17 00:00:00 2001 From: Artyom Gabeev Date: Mon, 29 Apr 2024 15:43:30 +0300 Subject: [PATCH 2/2] Apply CR changes --- .../StructuredTaskScopeInstrumentation.java | 15 ++++++++++++--- 1 file changed, 12 insertions(+), 3 deletions(-) diff --git a/instrumentation/executors/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/executors/StructuredTaskScopeInstrumentation.java b/instrumentation/executors/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/executors/StructuredTaskScopeInstrumentation.java index 8e988955875b..16678b268517 100644 --- a/instrumentation/executors/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/executors/StructuredTaskScopeInstrumentation.java +++ b/instrumentation/executors/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/executors/StructuredTaskScopeInstrumentation.java @@ -40,9 +40,18 @@ public static class ForkCallableAdvice { @Advice.OnMethodEnter(suppress = Throwable.class) public static PropagatedContext enterCallableFork(@Advice.Argument(0) Callable task) { Context context = Java8BytecodeBridge.currentContext(); - VirtualField, PropagatedContext> virtualField = - VirtualField.find(Callable.class, PropagatedContext.class); - return ExecutorAdviceHelper.attachContextToTask(context, virtualField, task); + if (ExecutorAdviceHelper.shouldPropagateContext(context, task)) { + VirtualField, PropagatedContext> virtualField = + VirtualField.find(Callable.class, PropagatedContext.class); + return ExecutorAdviceHelper.attachContextToTask(context, virtualField, task); + } + return null; + } + + @Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class) + public static void exitCallableFork( + @Advice.Enter PropagatedContext propagatedContext, @Advice.Thrown Throwable throwable) { + ExecutorAdviceHelper.cleanUpAfterSubmit(propagatedContext, throwable); } } }