Skip to content

Commit

Permalink
Propagate context into redisson async callback (open-telemetry#5313)
Browse files Browse the repository at this point in the history
* Propagate context into redisson async callback

* Apply suggestions from code review

Co-authored-by: Anuraag Agrawal <[email protected]>

Co-authored-by: Anuraag Agrawal <[email protected]>
  • Loading branch information
2 people authored and RashmiRam committed May 23, 2022
1 parent a2f98ca commit e9de2b2
Show file tree
Hide file tree
Showing 9 changed files with 201 additions and 24 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.javaagent.instrumentation.redisson;

import io.opentelemetry.context.Context;
import io.opentelemetry.context.Scope;
import java.util.concurrent.CompletableFuture;

public final class CompletableFutureWrapper<T> extends CompletableFuture<T>
implements PromiseWrapper<T> {
private volatile EndOperationListener<T> endOperationListener;

private CompletableFutureWrapper(CompletableFuture<T> delegate, Context context) {
this.whenComplete(
(result, error) -> {
EndOperationListener<T> endOperationListener = this.endOperationListener;
if (endOperationListener != null) {
endOperationListener.accept(result, error);
}
try (Scope ignored = context.makeCurrent()) {
if (error != null) {
delegate.completeExceptionally(error);
} else {
delegate.complete(result);
}
}
});
}

public static <T> CompletableFuture<T> wrap(CompletableFuture<T> delegate) {
if (delegate instanceof CompletableFutureWrapper) {
return delegate;
}

return new CompletableFutureWrapper<>(delegate, Context.current());
}

@Override
public void setEndOperationListener(EndOperationListener<T> endOperationListener) {
this.endOperationListener = endOperationListener;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@

import static io.opentelemetry.javaagent.instrumentation.redisson.RedissonSingletons.instrumenter;

import io.netty.util.concurrent.Future;
import io.opentelemetry.context.Context;
import java.util.function.BiConsumer;

Expand All @@ -20,10 +19,6 @@ public EndOperationListener(Context context, RedissonRequest request) {
this.request = request;
}

public void operationComplete(Future<T> future) {
instrumenter().end(context, request, null, future.cause());
}

@Override
public void accept(T t, Throwable error) {
instrumenter().end(context, request, null, error);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.javaagent.instrumentation.redisson;

public interface PromiseWrapper<T> {

void setEndOperationListener(EndOperationListener<T> endOperationListener);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.javaagent.instrumentation.redisson;

import static net.bytebuddy.matcher.ElementMatchers.isConstructor;
import static net.bytebuddy.matcher.ElementMatchers.named;
import static net.bytebuddy.matcher.ElementMatchers.namedOneOf;
import static net.bytebuddy.matcher.ElementMatchers.takesArgument;

import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer;
import java.util.concurrent.CompletableFuture;
import net.bytebuddy.asm.Advice;
import net.bytebuddy.description.type.TypeDescription;
import net.bytebuddy.matcher.ElementMatcher;
import org.redisson.misc.RPromise;

public class RedisCommandDataInstrumentation implements TypeInstrumentation {

@Override
public ElementMatcher<TypeDescription> typeMatcher() {
return namedOneOf(
"org.redisson.client.protocol.CommandData", "org.redisson.client.protocol.CommandsData");
}

@Override
public void transform(TypeTransformer transformer) {
// before 3.16.8
transformer.applyAdviceToMethod(
isConstructor().and(takesArgument(0, named("org.redisson.misc.RPromise"))),
this.getClass().getName() + "$WrapPromiseAdvice");
// since 3.16.8
transformer.applyAdviceToMethod(
isConstructor().and(takesArgument(0, named("java.util.concurrent.CompletableFuture"))),
this.getClass().getName() + "$WrapCompletableFutureAdvice");
}

@SuppressWarnings("unused")
public static class WrapPromiseAdvice {

@Advice.OnMethodEnter(suppress = Throwable.class)
public static void onEnter(@Advice.Argument(value = 0, readOnly = false) RPromise<?> promise) {
promise = RedissonPromiseWrapper.wrap(promise);
}
}

@SuppressWarnings("unused")
public static class WrapCompletableFutureAdvice {

@Advice.OnMethodEnter(suppress = Throwable.class)
public static void onEnter(
@Advice.Argument(value = 0, readOnly = false) CompletableFuture<?> completableFuture) {
completableFuture = CompletableFutureWrapper.wrap(completableFuture);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer;
import java.net.InetSocketAddress;
import java.util.concurrent.CompletionStage;
import net.bytebuddy.asm.Advice;
import net.bytebuddy.description.type.TypeDescription;
import net.bytebuddy.matcher.ElementMatcher;
Expand Down Expand Up @@ -43,15 +42,22 @@ public static void onEnter(
@Advice.Local("otelRequest") RedissonRequest request,
@Advice.Local("otelContext") Context context,
@Advice.Local("otelScope") Scope scope) {

Context parentContext = currentContext();
InetSocketAddress remoteAddress = (InetSocketAddress) connection.getChannel().remoteAddress();
request = RedissonRequest.create(remoteAddress, arg);
PromiseWrapper<?> promise = request.getPromiseWrapper();
if (promise == null) {
return;
}
if (!instrumenter().shouldStart(parentContext, request)) {
return;
}

context = instrumenter().start(parentContext, request);
scope = context.makeCurrent();

promise.setEndOperationListener(new EndOperationListener<>(context, request));
}

@Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class)
Expand All @@ -66,13 +72,10 @@ public static void stopSpan(
}
scope.close();

CompletionStage<?> promise = request.getPromise();
if (promise == null || throwable != null) {
if (throwable != null) {
instrumenter().end(context, request, null, throwable);
} else {
// span ended in EndOperationListener
promise.whenComplete(new EndOperationListener<>(context, request));
}
// span ended in EndOperationListener
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

package io.opentelemetry.javaagent.instrumentation.redisson;

import static java.util.Collections.singletonList;
import static java.util.Arrays.asList;

import com.google.auto.service.AutoService;
import io.opentelemetry.javaagent.extension.instrumentation.InstrumentationModule;
Expand All @@ -21,6 +21,6 @@ public RedissonInstrumentationModule() {

@Override
public List<TypeInstrumentation> typeInstrumentations() {
return singletonList(new RedisConnectionInstrumentation());
return asList(new RedisConnectionInstrumentation(), new RedisCommandDataInstrumentation());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.javaagent.instrumentation.redisson;

import io.opentelemetry.context.Context;
import io.opentelemetry.context.Scope;
import org.redisson.misc.RPromise;
import org.redisson.misc.RedissonPromise;

public class RedissonPromiseWrapper<T> extends RedissonPromise<T> implements PromiseWrapper<T> {
private volatile EndOperationListener<T> endOperationListener;

private RedissonPromiseWrapper(RPromise<T> delegate, Context context) {
this.whenComplete(
(result, error) -> {
EndOperationListener<T> endOperationListener = this.endOperationListener;
if (endOperationListener != null) {
endOperationListener.accept(result, error);
}
try (Scope ignored = context.makeCurrent()) {
if (error != null) {
delegate.tryFailure(error);
} else {
delegate.trySuccess(result);
}
}
});
}

public static <T> RPromise<T> wrap(RPromise<T> delegate) {
if (delegate instanceof RedissonPromiseWrapper) {
return delegate;
}

return new RedissonPromiseWrapper<>(delegate, Context.current());
}

@Override
public void setEndOperationListener(EndOperationListener<T> endOperationListener) {
this.endOperationListener = endOperationListener;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,16 @@ private static String normalizeSingleCommand(CommandData<?, ?> command) {
}

@Nullable
public CompletionStage<?> getPromise() {
public PromiseWrapper<?> getPromiseWrapper() {
CompletionStage<?> promise = getPromise();
if (promise instanceof PromiseWrapper) {
return (PromiseWrapper<?>) promise;
}
return null;
}

@Nullable
private CompletionStage<?> getPromise() {
Object command = getCommand();
if (command instanceof CommandData && COMMAND_DATA_GET_PROMISE != null) {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import spock.lang.Shared
import java.util.concurrent.TimeUnit

import static io.opentelemetry.api.trace.SpanKind.CLIENT
import static io.opentelemetry.api.trace.SpanKind.INTERNAL

class RedissonAsyncClientTest extends AgentInstrumentationSpecification {

Expand Down Expand Up @@ -85,19 +86,29 @@ class RedissonAsyncClientTest extends AgentInstrumentationSpecification {
def "test future whenComplete"() {
when:
RSet<String> rSet = redisson.getSet("set1")
RFuture<Boolean> result = rSet.addAsync("s1")
result.whenComplete({ res, throwable ->
RList<String> strings = redisson.getList("list1")
strings.add("a")
})
RFuture<Boolean> result = runWithSpan("parent") {
RFuture<Boolean> result = rSet.addAsync("s1")
result.whenComplete({ res, throwable ->
new Exception().printStackTrace()
RList<String> strings = redisson.getList("list1")
strings.add("a")
})
return result
}

then:
result.get(30, TimeUnit.SECONDS)
assertTraces(2) {
trace(0, 1) {
assertTraces(1) {
trace(0, 3) {
span(0) {
name "parent"
kind INTERNAL
hasNoParent()
}
span(1) {
name "SADD"
kind CLIENT
childOf(span(0))
attributes {
"$SemanticAttributes.DB_SYSTEM" "redis"
"$SemanticAttributes.NET_PEER_IP" "127.0.0.1"
Expand All @@ -107,11 +118,10 @@ class RedissonAsyncClientTest extends AgentInstrumentationSpecification {
"$SemanticAttributes.DB_OPERATION" "SADD"
}
}
}
trace(1, 1) {
span(0) {
span(2) {
name "RPUSH"
kind CLIENT
childOf(span(0))
attributes {
"$SemanticAttributes.DB_SYSTEM" "redis"
"$SemanticAttributes.NET_PEER_IP" "127.0.0.1"
Expand Down

0 comments on commit e9de2b2

Please sign in to comment.