diff --git a/instrumentation/netty/netty-4.1/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/netty/v4_1/ChannelFutureInstrumentation.java b/instrumentation/netty/netty-4.1/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/netty/v4_1/ChannelFutureInstrumentation.java index 7491f0222e65..5db37d5de658 100644 --- a/instrumentation/netty/netty-4.1/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/netty/v4_1/ChannelFutureInstrumentation.java +++ b/instrumentation/netty/netty-4.1/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/netty/v4_1/ChannelFutureInstrumentation.java @@ -7,14 +7,19 @@ import static io.opentelemetry.javaagent.tooling.bytebuddy.matcher.AgentElementMatchers.implementsInterface; import static io.opentelemetry.javaagent.tooling.bytebuddy.matcher.ClassLoaderMatcher.hasClassesNamed; -import static java.util.Collections.singletonMap; +import static net.bytebuddy.matcher.ElementMatchers.isArray; import static net.bytebuddy.matcher.ElementMatchers.isMethod; import static net.bytebuddy.matcher.ElementMatchers.named; import static net.bytebuddy.matcher.ElementMatchers.takesArgument; +import io.netty.util.concurrent.Future; import io.netty.util.concurrent.GenericFutureListener; +import io.opentelemetry.context.Context; +import io.opentelemetry.javaagent.instrumentation.api.ContextStore; +import io.opentelemetry.javaagent.instrumentation.api.InstrumentationContext; import io.opentelemetry.javaagent.instrumentation.api.Java8BytecodeBridge; import io.opentelemetry.javaagent.tooling.TypeInstrumentation; +import java.util.HashMap; import java.util.Map; import net.bytebuddy.asm.Advice; import net.bytebuddy.description.method.MethodDescription; @@ -35,18 +40,83 @@ public ElementMatcher typeMatcher() { @Override public Map, String> transformers() { - return singletonMap( + Map, String> transformers = new HashMap<>(); + transformers.put( isMethod() .and(named("addListener")) .and(takesArgument(0, named("io.netty.util.concurrent.GenericFutureListener"))), ChannelFutureInstrumentation.class.getName() + "$AddListenerAdvice"); + transformers.put( + isMethod().and(named("addListeners")).and(takesArgument(0, isArray())), + ChannelFutureInstrumentation.class.getName() + "$AddListenersAdvice"); + transformers.put( + isMethod() + .and(named("removeListener")) + .and(takesArgument(0, named("io.netty.util.concurrent.GenericFutureListener"))), + ChannelFutureInstrumentation.class.getName() + "$RemoveListenerAdvice"); + transformers.put( + isMethod().and(named("removeListeners")).and(takesArgument(0, isArray())), + ChannelFutureInstrumentation.class.getName() + "$RemoveListenersAdvice"); + return transformers; } public static class AddListenerAdvice { @Advice.OnMethodEnter public static void wrapListener( - @Advice.Argument(value = 0, readOnly = false) GenericFutureListener listener) { - listener = new WrappedFutureListener(Java8BytecodeBridge.currentContext(), listener); + @Advice.Argument(value = 0, readOnly = false) + GenericFutureListener> listener) { + ContextStore contextStore = + InstrumentationContext.get(GenericFutureListener.class, GenericFutureListener.class); + listener = + FutureListenerWrappers.wrap(contextStore, Java8BytecodeBridge.currentContext(), listener); + } + } + + public static class AddListenersAdvice { + @Advice.OnMethodEnter + public static void wrapListener( + @Advice.Argument(value = 0, readOnly = false) + GenericFutureListener>[] listeners) { + + ContextStore contextStore = + InstrumentationContext.get(GenericFutureListener.class, GenericFutureListener.class); + Context context = Java8BytecodeBridge.currentContext(); + @SuppressWarnings("unchecked") + GenericFutureListener>[] wrappedListeners = + new GenericFutureListener[listeners.length]; + for (int i = 0; i < listeners.length; ++i) { + wrappedListeners[i] = FutureListenerWrappers.wrap(contextStore, context, listeners[i]); + } + listeners = wrappedListeners; + } + } + + public static class RemoveListenerAdvice { + @Advice.OnMethodEnter + public static void wrapListener( + @Advice.Argument(value = 0, readOnly = false) + GenericFutureListener> listener) { + ContextStore contextStore = + InstrumentationContext.get(GenericFutureListener.class, GenericFutureListener.class); + listener = FutureListenerWrappers.getWrapper(contextStore, listener); + } + } + + public static class RemoveListenersAdvice { + @Advice.OnMethodEnter + public static void wrapListener( + @Advice.Argument(value = 0, readOnly = false) + GenericFutureListener>[] listeners) { + + ContextStore contextStore = + InstrumentationContext.get(GenericFutureListener.class, GenericFutureListener.class); + @SuppressWarnings("unchecked") + GenericFutureListener>[] wrappedListeners = + new GenericFutureListener[listeners.length]; + for (int i = 0; i < listeners.length; ++i) { + wrappedListeners[i] = FutureListenerWrappers.getWrapper(contextStore, listeners[i]); + } + listeners = wrappedListeners; } } } diff --git a/instrumentation/netty/netty-4.1/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/netty/v4_1/FutureListenerWrappers.java b/instrumentation/netty/netty-4.1/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/netty/v4_1/FutureListenerWrappers.java new file mode 100644 index 000000000000..a467e2b9a3df --- /dev/null +++ b/instrumentation/netty/netty-4.1/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/netty/v4_1/FutureListenerWrappers.java @@ -0,0 +1,100 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.netty.v4_1; + +import io.netty.util.concurrent.Future; +import io.netty.util.concurrent.GenericFutureListener; +import io.netty.util.concurrent.GenericProgressiveFutureListener; +import io.netty.util.concurrent.ProgressiveFuture; +import io.opentelemetry.context.Context; +import io.opentelemetry.context.Scope; +import io.opentelemetry.javaagent.instrumentation.api.ContextStore; + +public final class FutureListenerWrappers { + @SuppressWarnings("unchecked") + public static GenericFutureListener> wrap( + ContextStore contextStore, + Context context, + GenericFutureListener> delegate) { + if (delegate instanceof WrappedFutureListener + || delegate instanceof WrappedProgressiveFutureListener) { + return delegate; + } + return (GenericFutureListener>) + contextStore.putIfAbsent( + delegate, + () -> { + if (delegate instanceof GenericProgressiveFutureListener) { + return new WrappedProgressiveFutureListener( + context, + (GenericProgressiveFutureListener>) delegate); + } else { + return new WrappedFutureListener( + context, (GenericFutureListener>) delegate); + } + }); + } + + public static GenericFutureListener> getWrapper( + ContextStore contextStore, + GenericFutureListener> delegate) { + GenericFutureListener> wrapper = + (GenericFutureListener>) contextStore.get(delegate); + return wrapper == null ? delegate : wrapper; + } + + private static final class WrappedFutureListener + implements GenericFutureListener> { + + private final Context context; + private final GenericFutureListener> delegate; + + private WrappedFutureListener( + Context context, GenericFutureListener> delegate) { + this.context = context; + this.delegate = delegate; + } + + @Override + public void operationComplete(Future future) throws Exception { + try (Scope ignored = context.makeCurrent()) { + delegate.operationComplete(future); + } + } + } + + private static final class WrappedProgressiveFutureListener + implements GenericProgressiveFutureListener> { + + private final Context context; + private final GenericProgressiveFutureListener> delegate; + + private WrappedProgressiveFutureListener( + Context context, + GenericProgressiveFutureListener> delegate) { + this.context = context; + this.delegate = delegate; + } + + @Override + public void operationProgressed( + ProgressiveFuture progressiveFuture, long l, long l1) throws Exception { + try (Scope ignored = context.makeCurrent()) { + delegate.operationProgressed(progressiveFuture, l, l1); + } + } + + @Override + public void operationComplete(ProgressiveFuture progressiveFuture) + throws Exception { + try (Scope ignored = context.makeCurrent()) { + delegate.operationComplete(progressiveFuture); + } + } + } + + private FutureListenerWrappers() {} +} diff --git a/instrumentation/netty/netty-4.1/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/netty/v4_1/WrappedFutureListener.java b/instrumentation/netty/netty-4.1/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/netty/v4_1/WrappedFutureListener.java deleted file mode 100644 index 223907bbd281..000000000000 --- a/instrumentation/netty/netty-4.1/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/netty/v4_1/WrappedFutureListener.java +++ /dev/null @@ -1,28 +0,0 @@ -/* - * Copyright The OpenTelemetry Authors - * SPDX-License-Identifier: Apache-2.0 - */ - -package io.opentelemetry.javaagent.instrumentation.netty.v4_1; - -import io.netty.util.concurrent.Future; -import io.netty.util.concurrent.GenericFutureListener; -import io.opentelemetry.context.Context; -import io.opentelemetry.context.Scope; - -public class WrappedFutureListener implements GenericFutureListener { - private final Context context; - private final GenericFutureListener delegate; - - public WrappedFutureListener(Context context, GenericFutureListener delegate) { - this.context = context; - this.delegate = delegate; - } - - @Override - public void operationComplete(Future future) throws Exception { - try (Scope ignored = context.makeCurrent()) { - delegate.operationComplete(future); - } - } -} diff --git a/instrumentation/netty/netty-4.1/javaagent/src/test/groovy/ChannelFutureTest.groovy b/instrumentation/netty/netty-4.1/javaagent/src/test/groovy/ChannelFutureTest.groovy new file mode 100644 index 000000000000..dd4f174bc04b --- /dev/null +++ b/instrumentation/netty/netty-4.1/javaagent/src/test/groovy/ChannelFutureTest.groovy @@ -0,0 +1,57 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +import io.netty.channel.embedded.EmbeddedChannel +import io.netty.util.concurrent.Future +import io.netty.util.concurrent.GenericFutureListener +import io.netty.util.concurrent.GenericProgressiveFutureListener +import io.netty.util.concurrent.ProgressiveFuture +import io.opentelemetry.instrumentation.test.AgentInstrumentationSpecification +import java.util.concurrent.TimeUnit +import java.util.concurrent.atomic.AtomicInteger + +class ChannelFutureTest extends AgentInstrumentationSpecification { + // regression test for https://github.com/open-telemetry/opentelemetry-java-instrumentation/issues/2705 + def "should clean up wrapped listeners"() { + given: + def channel = new EmbeddedChannel() + def counter = new AtomicInteger() + + def listener1 = newListener(counter) + channel.closeFuture().addListener(listener1) + channel.closeFuture().removeListener(listener1) + + def listener2 = newListener(counter) + def listener3 = newProgressiveListener(counter) + channel.closeFuture().addListeners(listener2, listener3) + channel.closeFuture().removeListeners(listener2, listener3) + + when: + channel.close().await(5, TimeUnit.SECONDS) + + then: + counter.get() == 0 + } + + private static GenericFutureListener newListener(AtomicInteger counter) { + new GenericFutureListener() { + void operationComplete(Future future) throws Exception { + counter.incrementAndGet() + } + } + } + + private static GenericFutureListener newProgressiveListener(AtomicInteger counter) { + new GenericProgressiveFutureListener() { + void operationProgressed(ProgressiveFuture future, long progress, long total) throws Exception { + counter.incrementAndGet() + } + + void operationComplete(Future future) throws Exception { + counter.incrementAndGet() + } + } + } +} diff --git a/javaagent-api/src/main/java/io/opentelemetry/javaagent/instrumentation/api/ContextStore.java b/javaagent-api/src/main/java/io/opentelemetry/javaagent/instrumentation/api/ContextStore.java index 18a782ee4791..efa26f5caf42 100644 --- a/javaagent-api/src/main/java/io/opentelemetry/javaagent/instrumentation/api/ContextStore.java +++ b/javaagent-api/src/main/java/io/opentelemetry/javaagent/instrumentation/api/ContextStore.java @@ -21,6 +21,7 @@ public interface ContextStore { * * @param context type */ + @FunctionalInterface interface Factory { /** Returns a new context instance. */