Skip to content

Commit

Permalink
Fix netty 4.1 instrumentation not removing future listeners (#2851)
Browse files Browse the repository at this point in the history
* Fix netty 4.1 instrumentation not removing future listeners

* Code review follow-up

* Use InstrumentationContext
  • Loading branch information
Mateusz Rzeszutek authored Apr 27, 2021
1 parent 2df0bb4 commit 77f8be8
Show file tree
Hide file tree
Showing 5 changed files with 232 additions and 32 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,19 @@

import static io.opentelemetry.javaagent.tooling.bytebuddy.matcher.AgentElementMatchers.implementsInterface;
import static io.opentelemetry.javaagent.tooling.bytebuddy.matcher.ClassLoaderMatcher.hasClassesNamed;
import static java.util.Collections.singletonMap;
import static net.bytebuddy.matcher.ElementMatchers.isArray;
import static net.bytebuddy.matcher.ElementMatchers.isMethod;
import static net.bytebuddy.matcher.ElementMatchers.named;
import static net.bytebuddy.matcher.ElementMatchers.takesArgument;

import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;
import io.opentelemetry.context.Context;
import io.opentelemetry.javaagent.instrumentation.api.ContextStore;
import io.opentelemetry.javaagent.instrumentation.api.InstrumentationContext;
import io.opentelemetry.javaagent.instrumentation.api.Java8BytecodeBridge;
import io.opentelemetry.javaagent.tooling.TypeInstrumentation;
import java.util.HashMap;
import java.util.Map;
import net.bytebuddy.asm.Advice;
import net.bytebuddy.description.method.MethodDescription;
Expand All @@ -35,18 +40,83 @@ public ElementMatcher<TypeDescription> typeMatcher() {

@Override
public Map<? extends ElementMatcher<? super MethodDescription>, String> transformers() {
return singletonMap(
Map<ElementMatcher.Junction<MethodDescription>, String> transformers = new HashMap<>();
transformers.put(
isMethod()
.and(named("addListener"))
.and(takesArgument(0, named("io.netty.util.concurrent.GenericFutureListener"))),
ChannelFutureInstrumentation.class.getName() + "$AddListenerAdvice");
transformers.put(
isMethod().and(named("addListeners")).and(takesArgument(0, isArray())),
ChannelFutureInstrumentation.class.getName() + "$AddListenersAdvice");
transformers.put(
isMethod()
.and(named("removeListener"))
.and(takesArgument(0, named("io.netty.util.concurrent.GenericFutureListener"))),
ChannelFutureInstrumentation.class.getName() + "$RemoveListenerAdvice");
transformers.put(
isMethod().and(named("removeListeners")).and(takesArgument(0, isArray())),
ChannelFutureInstrumentation.class.getName() + "$RemoveListenersAdvice");
return transformers;
}

public static class AddListenerAdvice {
@Advice.OnMethodEnter
public static void wrapListener(
@Advice.Argument(value = 0, readOnly = false) GenericFutureListener listener) {
listener = new WrappedFutureListener(Java8BytecodeBridge.currentContext(), listener);
@Advice.Argument(value = 0, readOnly = false)
GenericFutureListener<? extends Future<? super Void>> listener) {
ContextStore<GenericFutureListener, GenericFutureListener> contextStore =
InstrumentationContext.get(GenericFutureListener.class, GenericFutureListener.class);
listener =
FutureListenerWrappers.wrap(contextStore, Java8BytecodeBridge.currentContext(), listener);
}
}

public static class AddListenersAdvice {
@Advice.OnMethodEnter
public static void wrapListener(
@Advice.Argument(value = 0, readOnly = false)
GenericFutureListener<? extends Future<? super Void>>[] listeners) {

ContextStore<GenericFutureListener, GenericFutureListener> contextStore =
InstrumentationContext.get(GenericFutureListener.class, GenericFutureListener.class);
Context context = Java8BytecodeBridge.currentContext();
@SuppressWarnings("unchecked")
GenericFutureListener<? extends Future<? super Void>>[] wrappedListeners =
new GenericFutureListener[listeners.length];
for (int i = 0; i < listeners.length; ++i) {
wrappedListeners[i] = FutureListenerWrappers.wrap(contextStore, context, listeners[i]);
}
listeners = wrappedListeners;
}
}

public static class RemoveListenerAdvice {
@Advice.OnMethodEnter
public static void wrapListener(
@Advice.Argument(value = 0, readOnly = false)
GenericFutureListener<? extends Future<? super Void>> listener) {
ContextStore<GenericFutureListener, GenericFutureListener> contextStore =
InstrumentationContext.get(GenericFutureListener.class, GenericFutureListener.class);
listener = FutureListenerWrappers.getWrapper(contextStore, listener);
}
}

public static class RemoveListenersAdvice {
@Advice.OnMethodEnter
public static void wrapListener(
@Advice.Argument(value = 0, readOnly = false)
GenericFutureListener<? extends Future<? super Void>>[] listeners) {

ContextStore<GenericFutureListener, GenericFutureListener> contextStore =
InstrumentationContext.get(GenericFutureListener.class, GenericFutureListener.class);
@SuppressWarnings("unchecked")
GenericFutureListener<? extends Future<? super Void>>[] wrappedListeners =
new GenericFutureListener[listeners.length];
for (int i = 0; i < listeners.length; ++i) {
wrappedListeners[i] = FutureListenerWrappers.getWrapper(contextStore, listeners[i]);
}
listeners = wrappedListeners;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.javaagent.instrumentation.netty.v4_1;

import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;
import io.netty.util.concurrent.GenericProgressiveFutureListener;
import io.netty.util.concurrent.ProgressiveFuture;
import io.opentelemetry.context.Context;
import io.opentelemetry.context.Scope;
import io.opentelemetry.javaagent.instrumentation.api.ContextStore;

public final class FutureListenerWrappers {
@SuppressWarnings("unchecked")
public static GenericFutureListener<? extends Future<? super Void>> wrap(
ContextStore<GenericFutureListener, GenericFutureListener> contextStore,
Context context,
GenericFutureListener<? extends Future<? super Void>> delegate) {
if (delegate instanceof WrappedFutureListener
|| delegate instanceof WrappedProgressiveFutureListener) {
return delegate;
}
return (GenericFutureListener<? extends Future<? super Void>>)
contextStore.putIfAbsent(
delegate,
() -> {
if (delegate instanceof GenericProgressiveFutureListener) {
return new WrappedProgressiveFutureListener(
context,
(GenericProgressiveFutureListener<ProgressiveFuture<? super Void>>) delegate);
} else {
return new WrappedFutureListener(
context, (GenericFutureListener<Future<? super Void>>) delegate);
}
});
}

public static GenericFutureListener<? extends Future<? super Void>> getWrapper(
ContextStore<GenericFutureListener, GenericFutureListener> contextStore,
GenericFutureListener<? extends Future<? super Void>> delegate) {
GenericFutureListener<? extends Future<? super Void>> wrapper =
(GenericFutureListener<? extends Future<? super Void>>) contextStore.get(delegate);
return wrapper == null ? delegate : wrapper;
}

private static final class WrappedFutureListener
implements GenericFutureListener<Future<? super Void>> {

private final Context context;
private final GenericFutureListener<Future<? super Void>> delegate;

private WrappedFutureListener(
Context context, GenericFutureListener<Future<? super Void>> delegate) {
this.context = context;
this.delegate = delegate;
}

@Override
public void operationComplete(Future<? super Void> future) throws Exception {
try (Scope ignored = context.makeCurrent()) {
delegate.operationComplete(future);
}
}
}

private static final class WrappedProgressiveFutureListener
implements GenericProgressiveFutureListener<ProgressiveFuture<? super Void>> {

private final Context context;
private final GenericProgressiveFutureListener<ProgressiveFuture<? super Void>> delegate;

private WrappedProgressiveFutureListener(
Context context,
GenericProgressiveFutureListener<ProgressiveFuture<? super Void>> delegate) {
this.context = context;
this.delegate = delegate;
}

@Override
public void operationProgressed(
ProgressiveFuture<? super Void> progressiveFuture, long l, long l1) throws Exception {
try (Scope ignored = context.makeCurrent()) {
delegate.operationProgressed(progressiveFuture, l, l1);
}
}

@Override
public void operationComplete(ProgressiveFuture<? super Void> progressiveFuture)
throws Exception {
try (Scope ignored = context.makeCurrent()) {
delegate.operationComplete(progressiveFuture);
}
}
}

private FutureListenerWrappers() {}
}

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

import io.netty.channel.embedded.EmbeddedChannel
import io.netty.util.concurrent.Future
import io.netty.util.concurrent.GenericFutureListener
import io.netty.util.concurrent.GenericProgressiveFutureListener
import io.netty.util.concurrent.ProgressiveFuture
import io.opentelemetry.instrumentation.test.AgentInstrumentationSpecification
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicInteger

class ChannelFutureTest extends AgentInstrumentationSpecification {
// regression test for https://github.com/open-telemetry/opentelemetry-java-instrumentation/issues/2705
def "should clean up wrapped listeners"() {
given:
def channel = new EmbeddedChannel()
def counter = new AtomicInteger()

def listener1 = newListener(counter)
channel.closeFuture().addListener(listener1)
channel.closeFuture().removeListener(listener1)

def listener2 = newListener(counter)
def listener3 = newProgressiveListener(counter)
channel.closeFuture().addListeners(listener2, listener3)
channel.closeFuture().removeListeners(listener2, listener3)

when:
channel.close().await(5, TimeUnit.SECONDS)

then:
counter.get() == 0
}

private static GenericFutureListener newListener(AtomicInteger counter) {
new GenericFutureListener() {
void operationComplete(Future future) throws Exception {
counter.incrementAndGet()
}
}
}

private static GenericFutureListener newProgressiveListener(AtomicInteger counter) {
new GenericProgressiveFutureListener() {
void operationProgressed(ProgressiveFuture future, long progress, long total) throws Exception {
counter.incrementAndGet()
}

void operationComplete(Future future) throws Exception {
counter.incrementAndGet()
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ public interface ContextStore<K, C> {
*
* @param <C> context type
*/
@FunctionalInterface
interface Factory<C> {

/** Returns a new context instance. */
Expand Down

0 comments on commit 77f8be8

Please sign in to comment.