diff --git a/instrumentation/reactor/reactor-netty/reactor-netty-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactornetty/v1_0/ConstantSizeStack.java b/instrumentation/reactor/reactor-netty/reactor-netty-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactornetty/v1_0/ConstantSizeStack.java deleted file mode 100644 index 2ff026d07726..000000000000 --- a/instrumentation/reactor/reactor-netty/reactor-netty-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactornetty/v1_0/ConstantSizeStack.java +++ /dev/null @@ -1,52 +0,0 @@ -/* - * Copyright The OpenTelemetry Authors - * SPDX-License-Identifier: Apache-2.0 - */ - -package io.opentelemetry.javaagent.instrumentation.reactornetty.v1_0; - -import io.opentelemetry.instrumentation.api.internal.GuardedBy; -import javax.annotation.Nullable; - -final class ConstantSizeStack { - - private final Object lock = new Object(); - - @GuardedBy("lock") - private final Object[] items; - - @GuardedBy("lock") - private int currentIndex; - - ConstantSizeStack(int size) { - items = new Object[size]; - // will start from 0 on the first push() call - currentIndex = size - 1; - } - - void push(T item) { - synchronized (lock) { - currentIndex = (currentIndex + 1) % items.length; - items[currentIndex] = item; - } - } - - @Nullable - T pop() { - synchronized (lock) { - T item = peek(); - items[currentIndex] = null; - int size = items.length; - currentIndex = ((currentIndex == 0 ? size : currentIndex) - 1) % size; - return item; - } - } - - @Nullable - @SuppressWarnings("unchecked") - T peek() { - synchronized (lock) { - return (T) items[currentIndex]; - } - } -} diff --git a/instrumentation/reactor/reactor-netty/reactor-netty-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactornetty/v1_0/HttpResponseReceiverInstrumenter.java b/instrumentation/reactor/reactor-netty/reactor-netty-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactornetty/v1_0/HttpResponseReceiverInstrumenter.java index 8ba67e38ee3f..2c6f90826a23 100644 --- a/instrumentation/reactor/reactor-netty/reactor-netty-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactornetty/v1_0/HttpResponseReceiverInstrumenter.java +++ b/instrumentation/reactor/reactor-netty/reactor-netty-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactornetty/v1_0/HttpResponseReceiverInstrumenter.java @@ -24,8 +24,7 @@ public final class HttpResponseReceiverInstrumenter { // this method adds several stateful listeners that execute the instrumenter lifecycle during HTTP // request processing // it should be used just before one of the response*() methods is called - after this point the - // HTTP - // request is no longer modifiable by the user + // HTTP request is no longer modifiable by the user @Nullable public static HttpClient.ResponseReceiver instrument(HttpClient.ResponseReceiver receiver) { // receiver should always be an HttpClientFinalizer, which both extends HttpClient and diff --git a/instrumentation/reactor/reactor-netty/reactor-netty-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactornetty/v1_0/InstrumentationContexts.java b/instrumentation/reactor/reactor-netty/reactor-netty-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactornetty/v1_0/InstrumentationContexts.java index 34601e6468d4..e6af2f990268 100644 --- a/instrumentation/reactor/reactor-netty/reactor-netty-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactornetty/v1_0/InstrumentationContexts.java +++ b/instrumentation/reactor/reactor-netty/reactor-netty-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactornetty/v1_0/InstrumentationContexts.java @@ -9,17 +9,23 @@ import io.opentelemetry.context.Context; import io.opentelemetry.instrumentation.api.instrumenter.http.HttpClientResend; +import java.util.Queue; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.logging.Level; +import java.util.logging.Logger; import javax.annotation.Nullable; import reactor.netty.http.client.HttpClientRequest; import reactor.netty.http.client.HttpClientResponse; final class InstrumentationContexts { + static final Logger logger = Logger.getLogger(InstrumentationContexts.class.getName()); + private volatile Context parentContext; // on retries, reactor-netty starts the next resend attempt before it ends the previous one (i.e. // it calls the callback functions in that order); thus for a short moment there can be 2 // coexisting HTTP client spans - private final ConstantSizeStack clientContexts = new ConstantSizeStack<>(2); + private final Queue clientContexts = new ArrayBlockingQueue<>(2, true); void initialize(Context parentContext) { this.parentContext = HttpClientResend.initialize(parentContext); @@ -41,13 +47,19 @@ Context startClientSpan(HttpClientRequest request) { Context context = null; if (instrumenter().shouldStart(parentContext, request)) { context = instrumenter().start(parentContext, request); - clientContexts.push(new RequestAndContext(request, context)); + if (!clientContexts.offer(new RequestAndContext(request, context))) { + // should not ever happen in reality + String message = + "Could not instrument HTTP client request; not enough space in the request queue"; + logger.log(Level.FINE, message); + instrumenter().end(context, request, null, new IllegalStateException(message)); + } } return context; } void endClientSpan(@Nullable HttpClientResponse response, @Nullable Throwable error) { - RequestAndContext requestAndContext = clientContexts.pop(); + RequestAndContext requestAndContext = clientContexts.poll(); if (requestAndContext != null) { instrumenter().end(requestAndContext.context, requestAndContext.request, response, error); }