From f4381f920cc7ff689a10e673e05b6b02a70688b5 Mon Sep 17 00:00:00 2001 From: Lauri Tulmin Date: Wed, 21 Apr 2021 16:33:49 +0300 Subject: [PATCH 1/2] Fix rxjava2 NoSuchFieldError --- .../rxjava2/TracingAssembly.java | 22 ++++++----- .../rxjava2/TracingObserver.java | 37 ++++++++++++++++++- .../AbstractRxJava2SubscriptionTest.groovy | 30 ++++++++++++++- 3 files changed, 76 insertions(+), 13 deletions(-) diff --git a/instrumentation/rxjava/rxjava-2.0/library/src/main/java/io/opentelemetry/instrumentation/rxjava2/TracingAssembly.java b/instrumentation/rxjava/rxjava-2.0/library/src/main/java/io/opentelemetry/instrumentation/rxjava2/TracingAssembly.java index 26175bf3646f..af9cbd3b8ca8 100644 --- a/instrumentation/rxjava/rxjava-2.0/library/src/main/java/io/opentelemetry/instrumentation/rxjava2/TracingAssembly.java +++ b/instrumentation/rxjava/rxjava-2.0/library/src/main/java/io/opentelemetry/instrumentation/rxjava2/TracingAssembly.java @@ -178,16 +178,18 @@ private static void enableFlowable() { @SuppressWarnings({"rawtypes", "unchecked"}) private static void enableObservable() { - oldOnObservableSubscribe = RxJavaPlugins.getOnObservableSubscribe(); - RxJavaPlugins.setOnObservableSubscribe( - biCompose( - oldOnObservableSubscribe, - (observable, observer) -> { - final Context context = Context.current(); - try (Scope ignored = context.makeCurrent()) { - return new TracingObserver(observer, context); - } - })); + if (TracingObserver.canEnable()) { + oldOnObservableSubscribe = RxJavaPlugins.getOnObservableSubscribe(); + RxJavaPlugins.setOnObservableSubscribe( + biCompose( + oldOnObservableSubscribe, + (observable, observer) -> { + final Context context = Context.current(); + try (Scope ignored = context.makeCurrent()) { + return new TracingObserver(observer, context); + } + })); + } } @SuppressWarnings({"rawtypes", "unchecked"}) diff --git a/instrumentation/rxjava/rxjava-2.0/library/src/main/java/io/opentelemetry/instrumentation/rxjava2/TracingObserver.java b/instrumentation/rxjava/rxjava-2.0/library/src/main/java/io/opentelemetry/instrumentation/rxjava2/TracingObserver.java index f411e50c0d59..52c082eaa1e0 100644 --- a/instrumentation/rxjava/rxjava-2.0/library/src/main/java/io/opentelemetry/instrumentation/rxjava2/TracingObserver.java +++ b/instrumentation/rxjava/rxjava-2.0/library/src/main/java/io/opentelemetry/instrumentation/rxjava2/TracingObserver.java @@ -27,8 +27,11 @@ import io.reactivex.Observer; import io.reactivex.internal.fuseable.QueueDisposable; import io.reactivex.internal.observers.BasicFuseableObserver; +import java.lang.invoke.MethodHandle; +import java.lang.invoke.MethodHandles; class TracingObserver extends BasicFuseableObserver { + private static final MethodHandle queueDisposableGetter = getQueueDisposableGetter(); // BasicFuseableObserver#actual has been renamed to downstream in newer versions, we can't use it // in this class @@ -64,7 +67,7 @@ public void onComplete() { @Override public int requestFusion(int mode) { - final QueueDisposable qd = this.qs; + final QueueDisposable qd = getQueueDisposable(); if (qd != null) { final int m = qd.requestFusion(mode); sourceMode = m; @@ -75,6 +78,36 @@ public int requestFusion(int mode) { @Override public T poll() throws Exception { - return qs.poll(); + return getQueueDisposable().poll(); + } + + private QueueDisposable getQueueDisposable() { + try { + return (QueueDisposable) queueDisposableGetter.invoke(this); + } catch (Throwable throwable) { + throw new IllegalStateException(throwable); + } + } + + private static MethodHandle getGetterHandle(String fieldName) { + try { + return MethodHandles.lookup() + .findGetter(BasicFuseableObserver.class, fieldName, QueueDisposable.class); + } catch (NoSuchFieldException | IllegalAccessException ignored) { + } + return null; + } + + private static MethodHandle getQueueDisposableGetter() { + MethodHandle getter = getGetterHandle("qd"); + if (getter == null) { + // in versions before 2.2.21 field was named "qs" + getter = getGetterHandle("qs"); + } + return getter; + } + + public static boolean canEnable() { + return queueDisposableGetter != null; } } diff --git a/instrumentation/rxjava/rxjava-2.0/testing/src/main/groovy/io/opentelemetry/instrumentation/rxjava2/AbstractRxJava2SubscriptionTest.groovy b/instrumentation/rxjava/rxjava-2.0/testing/src/main/groovy/io/opentelemetry/instrumentation/rxjava2/AbstractRxJava2SubscriptionTest.groovy index c248f0e0438e..752472677d6d 100644 --- a/instrumentation/rxjava/rxjava-2.0/testing/src/main/groovy/io/opentelemetry/instrumentation/rxjava2/AbstractRxJava2SubscriptionTest.groovy +++ b/instrumentation/rxjava/rxjava-2.0/testing/src/main/groovy/io/opentelemetry/instrumentation/rxjava2/AbstractRxJava2SubscriptionTest.groovy @@ -6,10 +6,12 @@ package io.opentelemetry.instrumentation.rxjava2 import static io.opentelemetry.instrumentation.test.utils.TraceUtils.basicSpan +import static io.opentelemetry.instrumentation.test.utils.TraceUtils.runInternalSpan import static io.opentelemetry.instrumentation.test.utils.TraceUtils.runUnderTrace import io.opentelemetry.api.GlobalOpenTelemetry import io.opentelemetry.instrumentation.test.InstrumentationSpecification +import io.reactivex.Observable import io.reactivex.Single import io.reactivex.functions.Consumer @@ -17,7 +19,7 @@ import java.util.concurrent.CountDownLatch abstract class AbstractRxJava2SubscriptionTest extends InstrumentationSpecification { - def "subscription test"() { + def "subscribe single test"() { when: CountDownLatch latch = new CountDownLatch(1) runUnderTrace("parent") { @@ -43,6 +45,32 @@ abstract class AbstractRxJava2SubscriptionTest extends InstrumentationSpecificat } } + def "test observable fusion"() { + when: + CountDownLatch latch = new CountDownLatch(1) + runUnderTrace("parent") { + Observable integerObservable = Observable.just(1, 2, 3, 4) + integerObservable.concatMap({ + return Observable.just(it) + }).count().subscribe(new Consumer() { + @Override + void accept(Long count) { + runInternalSpan("child") + latch.countDown() + } + }) + } + latch.await() + + then: + assertTraces(1) { + trace(0, 2) { + basicSpan(it, 0, "parent") + basicSpan(it, 1, "child", span(0)) + } + } + } + static class Connection { static int query() { def span = GlobalOpenTelemetry.getTracer("test").spanBuilder("Connection.query").startSpan() From 0f6583bec066e00b985f98a03127ae5b960e2911 Mon Sep 17 00:00:00 2001 From: Lauri Tulmin Date: Wed, 21 Apr 2021 17:05:14 +0300 Subject: [PATCH 2/2] correct version number where field name changed --- .../opentelemetry/instrumentation/rxjava2/TracingObserver.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/instrumentation/rxjava/rxjava-2.0/library/src/main/java/io/opentelemetry/instrumentation/rxjava2/TracingObserver.java b/instrumentation/rxjava/rxjava-2.0/library/src/main/java/io/opentelemetry/instrumentation/rxjava2/TracingObserver.java index 52c082eaa1e0..1fb334df6b89 100644 --- a/instrumentation/rxjava/rxjava-2.0/library/src/main/java/io/opentelemetry/instrumentation/rxjava2/TracingObserver.java +++ b/instrumentation/rxjava/rxjava-2.0/library/src/main/java/io/opentelemetry/instrumentation/rxjava2/TracingObserver.java @@ -101,7 +101,7 @@ private static MethodHandle getGetterHandle(String fieldName) { private static MethodHandle getQueueDisposableGetter() { MethodHandle getter = getGetterHandle("qd"); if (getter == null) { - // in versions before 2.2.21 field was named "qs" + // in versions before 2.2.1 field was named "qs" getter = getGetterHandle("qs"); } return getter;