diff --git a/instrumentation/rxjava/rxjava-2.0/library/src/main/java/io/opentelemetry/instrumentation/rxjava2/TracingAssembly.java b/instrumentation/rxjava/rxjava-2.0/library/src/main/java/io/opentelemetry/instrumentation/rxjava2/TracingAssembly.java index 26175bf3646f..af9cbd3b8ca8 100644 --- a/instrumentation/rxjava/rxjava-2.0/library/src/main/java/io/opentelemetry/instrumentation/rxjava2/TracingAssembly.java +++ b/instrumentation/rxjava/rxjava-2.0/library/src/main/java/io/opentelemetry/instrumentation/rxjava2/TracingAssembly.java @@ -178,16 +178,18 @@ private static void enableFlowable() { @SuppressWarnings({"rawtypes", "unchecked"}) private static void enableObservable() { - oldOnObservableSubscribe = RxJavaPlugins.getOnObservableSubscribe(); - RxJavaPlugins.setOnObservableSubscribe( - biCompose( - oldOnObservableSubscribe, - (observable, observer) -> { - final Context context = Context.current(); - try (Scope ignored = context.makeCurrent()) { - return new TracingObserver(observer, context); - } - })); + if (TracingObserver.canEnable()) { + oldOnObservableSubscribe = RxJavaPlugins.getOnObservableSubscribe(); + RxJavaPlugins.setOnObservableSubscribe( + biCompose( + oldOnObservableSubscribe, + (observable, observer) -> { + final Context context = Context.current(); + try (Scope ignored = context.makeCurrent()) { + return new TracingObserver(observer, context); + } + })); + } } @SuppressWarnings({"rawtypes", "unchecked"}) diff --git a/instrumentation/rxjava/rxjava-2.0/library/src/main/java/io/opentelemetry/instrumentation/rxjava2/TracingObserver.java b/instrumentation/rxjava/rxjava-2.0/library/src/main/java/io/opentelemetry/instrumentation/rxjava2/TracingObserver.java index f411e50c0d59..1fb334df6b89 100644 --- a/instrumentation/rxjava/rxjava-2.0/library/src/main/java/io/opentelemetry/instrumentation/rxjava2/TracingObserver.java +++ b/instrumentation/rxjava/rxjava-2.0/library/src/main/java/io/opentelemetry/instrumentation/rxjava2/TracingObserver.java @@ -27,8 +27,11 @@ import io.reactivex.Observer; import io.reactivex.internal.fuseable.QueueDisposable; import io.reactivex.internal.observers.BasicFuseableObserver; +import java.lang.invoke.MethodHandle; +import java.lang.invoke.MethodHandles; class TracingObserver extends BasicFuseableObserver { + private static final MethodHandle queueDisposableGetter = getQueueDisposableGetter(); // BasicFuseableObserver#actual has been renamed to downstream in newer versions, we can't use it // in this class @@ -64,7 +67,7 @@ public void onComplete() { @Override public int requestFusion(int mode) { - final QueueDisposable qd = this.qs; + final QueueDisposable qd = getQueueDisposable(); if (qd != null) { final int m = qd.requestFusion(mode); sourceMode = m; @@ -75,6 +78,36 @@ public int requestFusion(int mode) { @Override public T poll() throws Exception { - return qs.poll(); + return getQueueDisposable().poll(); + } + + private QueueDisposable getQueueDisposable() { + try { + return (QueueDisposable) queueDisposableGetter.invoke(this); + } catch (Throwable throwable) { + throw new IllegalStateException(throwable); + } + } + + private static MethodHandle getGetterHandle(String fieldName) { + try { + return MethodHandles.lookup() + .findGetter(BasicFuseableObserver.class, fieldName, QueueDisposable.class); + } catch (NoSuchFieldException | IllegalAccessException ignored) { + } + return null; + } + + private static MethodHandle getQueueDisposableGetter() { + MethodHandle getter = getGetterHandle("qd"); + if (getter == null) { + // in versions before 2.2.1 field was named "qs" + getter = getGetterHandle("qs"); + } + return getter; + } + + public static boolean canEnable() { + return queueDisposableGetter != null; } } diff --git a/instrumentation/rxjava/rxjava-2.0/testing/src/main/groovy/io/opentelemetry/instrumentation/rxjava2/AbstractRxJava2SubscriptionTest.groovy b/instrumentation/rxjava/rxjava-2.0/testing/src/main/groovy/io/opentelemetry/instrumentation/rxjava2/AbstractRxJava2SubscriptionTest.groovy index c248f0e0438e..752472677d6d 100644 --- a/instrumentation/rxjava/rxjava-2.0/testing/src/main/groovy/io/opentelemetry/instrumentation/rxjava2/AbstractRxJava2SubscriptionTest.groovy +++ b/instrumentation/rxjava/rxjava-2.0/testing/src/main/groovy/io/opentelemetry/instrumentation/rxjava2/AbstractRxJava2SubscriptionTest.groovy @@ -6,10 +6,12 @@ package io.opentelemetry.instrumentation.rxjava2 import static io.opentelemetry.instrumentation.test.utils.TraceUtils.basicSpan +import static io.opentelemetry.instrumentation.test.utils.TraceUtils.runInternalSpan import static io.opentelemetry.instrumentation.test.utils.TraceUtils.runUnderTrace import io.opentelemetry.api.GlobalOpenTelemetry import io.opentelemetry.instrumentation.test.InstrumentationSpecification +import io.reactivex.Observable import io.reactivex.Single import io.reactivex.functions.Consumer @@ -17,7 +19,7 @@ import java.util.concurrent.CountDownLatch abstract class AbstractRxJava2SubscriptionTest extends InstrumentationSpecification { - def "subscription test"() { + def "subscribe single test"() { when: CountDownLatch latch = new CountDownLatch(1) runUnderTrace("parent") { @@ -43,6 +45,32 @@ abstract class AbstractRxJava2SubscriptionTest extends InstrumentationSpecificat } } + def "test observable fusion"() { + when: + CountDownLatch latch = new CountDownLatch(1) + runUnderTrace("parent") { + Observable integerObservable = Observable.just(1, 2, 3, 4) + integerObservable.concatMap({ + return Observable.just(it) + }).count().subscribe(new Consumer() { + @Override + void accept(Long count) { + runInternalSpan("child") + latch.countDown() + } + }) + } + latch.await() + + then: + assertTraces(1) { + trace(0, 2) { + basicSpan(it, 0, "parent") + basicSpan(it, 1, "child", span(0)) + } + } + } + static class Connection { static int query() { def span = GlobalOpenTelemetry.getTracer("test").spanBuilder("Connection.query").startSpan()