Skip to content

Commit

Permalink
StructuredTaskScope instrumentation (#11202)
Browse files Browse the repository at this point in the history
  • Loading branch information
ArtyomGabeev authored May 6, 2024
1 parent b93537d commit 35437d8
Show file tree
Hide file tree
Showing 3 changed files with 137 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ public List<TypeInstrumentation> typeInstrumentations() {
new JavaForkJoinTaskInstrumentation(),
new RunnableInstrumentation(),
new ThreadPoolExtendingExecutorInstrumentation(),
new VirtualThreadInstrumentation());
new VirtualThreadInstrumentation(),
new StructuredTaskScopeInstrumentation());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.javaagent.instrumentation.executors;

import static net.bytebuddy.matcher.ElementMatchers.named;
import static net.bytebuddy.matcher.ElementMatchers.takesArgument;

import io.opentelemetry.context.Context;
import io.opentelemetry.instrumentation.api.util.VirtualField;
import io.opentelemetry.javaagent.bootstrap.Java8BytecodeBridge;
import io.opentelemetry.javaagent.bootstrap.executors.ExecutorAdviceHelper;
import io.opentelemetry.javaagent.bootstrap.executors.PropagatedContext;
import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer;
import java.util.concurrent.Callable;
import net.bytebuddy.asm.Advice;
import net.bytebuddy.description.type.TypeDescription;
import net.bytebuddy.matcher.ElementMatcher;

public class StructuredTaskScopeInstrumentation implements TypeInstrumentation {

@Override
public ElementMatcher<TypeDescription> typeMatcher() {
return named("java.util.concurrent.StructuredTaskScope");
}

@Override
public void transform(TypeTransformer transformer) {
transformer.applyAdviceToMethod(
named("fork").and(takesArgument(0, Callable.class)),
this.getClass().getName() + "$ForkCallableAdvice");
}

@SuppressWarnings("unused")
public static class ForkCallableAdvice {

@Advice.OnMethodEnter(suppress = Throwable.class)
public static PropagatedContext enterCallableFork(@Advice.Argument(0) Callable<?> task) {
Context context = Java8BytecodeBridge.currentContext();
if (ExecutorAdviceHelper.shouldPropagateContext(context, task)) {
VirtualField<Callable<?>, PropagatedContext> virtualField =
VirtualField.find(Callable.class, PropagatedContext.class);
return ExecutorAdviceHelper.attachContextToTask(context, virtualField, task);
}
return null;
}

@Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class)
public static void exitCallableFork(
@Advice.Enter PropagatedContext propagatedContext, @Advice.Thrown Throwable throwable) {
ExecutorAdviceHelper.cleanUpAfterSubmit(propagatedContext, throwable);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.javaagent.instrumentation.executors;

import static org.assertj.core.api.Assertions.assertThat;

import io.opentelemetry.api.trace.SpanKind;
import io.opentelemetry.instrumentation.testing.junit.AgentInstrumentationExtension;
import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension;
import java.lang.reflect.Method;
import java.util.concurrent.Callable;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.condition.EnabledForJreRange;
import org.junit.jupiter.api.condition.JRE;
import org.junit.jupiter.api.extension.RegisterExtension;

@EnabledForJreRange(min = JRE.JAVA_21)
class StructuredTaskScopeTest {

@RegisterExtension
static final InstrumentationExtension testing = AgentInstrumentationExtension.create();

@Test
void multipleForkJoin() throws Exception {
Class<?> sofTaskScopeClass =
Class.forName("java.util.concurrent.StructuredTaskScope$ShutdownOnFailure");
Object taskScope = sofTaskScopeClass.getDeclaredConstructor().newInstance();
Class<?> taskScopeClass = Class.forName("java.util.concurrent.StructuredTaskScope");
Method forkMethod = taskScopeClass.getDeclaredMethod("fork", Callable.class);
Method joinMethod = taskScopeClass.getDeclaredMethod("join");
Method closeMethod = taskScopeClass.getDeclaredMethod("close");

Class<?> subtaskClass = Class.forName("java.util.concurrent.StructuredTaskScope$Subtask");
Method getMethod = subtaskClass.getDeclaredMethod("get");

Callable<String> callable1 =
() -> {
testing.runWithSpan("task1", () -> {});
return "a";
};
Callable<String> callable2 =
() -> {
testing.runWithSpan("task2", () -> {});
return "b";
};

String result =
testing.runWithSpan(
"parent",
() -> {
try {
Object fork1 = forkMethod.invoke(taskScope, callable1);
Object fork2 = forkMethod.invoke(taskScope, callable2);
joinMethod.invoke(taskScope);

return "" + getMethod.invoke(fork1) + getMethod.invoke(fork2);
} catch (Exception e) {
throw new AssertionError(e);
}
});

assertThat(result).isEqualTo("ab");

testing.waitAndAssertTraces(
trace ->
trace.hasSpansSatisfyingExactlyInAnyOrder(
span -> span.hasName("parent").hasKind(SpanKind.INTERNAL).hasNoParent(),
span ->
span.hasName("task1").hasKind(SpanKind.INTERNAL).hasParent(trace.getSpan(0)),
span ->
span.hasName("task2").hasKind(SpanKind.INTERNAL).hasParent(trace.getSpan(0))));

closeMethod.invoke(taskScope);
}
}

0 comments on commit 35437d8

Please sign in to comment.