OpenTelemetry context not propagated to a fork-join pool thread via the OpenTelemetryMpContextPropagation #31417

Closed
ppatierno opened this issue Feb 25, 2023 · 22 comments · Fixed by #31546

Comments

@ppatierno
Contributor

While porting the Strimzi HTTP - Kafka bridge from pure Vert.x to Quarkus (using RESTEasy Reactive), I need OpenTelemetry context propagation across threads.

In our code, a REST call is executed on the Vert.x event loop, which carries the OTel context (with the HTTP call span) within the Vert.x context. The REST call then uses CompletableFuture.supplyAsync to run a Kafka client call (in order to free the Vert.x event loop quickly). The Kafka client uses an interceptor (from the OpenTelemetry Kafka instrumentation library) to add its own span, which has to be a child of the HTTP call span.
The problem is that the original OTel context is not propagated to the fork-join pool thread where the Kafka call runs.

To work around this, I found that:

  • I can use a ManagedExecutor instance to run the CompletableFuture.supplyAsync code in a Quarkus pool thread, and the OTel context is propagated correctly.
  • I can annotate the REST method with @Blocking, drop CompletableFuture.supplyAsync, and run everything in the same worker thread (not using the Vert.x event loop at all).
  • I found OpenTelemetryMpContextPropagation (coming in Quarkus 3.0.0), which should help to do the same by capturing the context with threadContext.withContextCapture on the CompletableFuture.supplyAsync call.

While the first two options work, the third one doesn't.
If I understood the purpose of OpenTelemetryMpContextPropagation correctly, I expected the OTel context to be propagated from a Quarkus managed thread (i.e. Vert.x event loop, any worker thread, ...) to a fork-join pool thread as well, but that does not seem to happen.
In our code, the Kafka-related span is not attached as a child of the HTTP request span, because the OTel context is not propagated (the Vert.x context is missing). I was able to see why and replicated it in the OpenTelemetryMpContextPropagationTest just by printing the trace and span IDs this way:

@GET
@Path("/helloWithContextPropagation")
public CompletionStage<String> helloWithCustomExecutor() {
    // Request thread: the HTTP span is current here.
    System.out.println("Thread " + Thread.currentThread() + " ----> "
            + Span.current().getSpanContext().getTraceId() + " - "
            + Span.current().getSpanContext().getSpanId());
    String message = "Hello/" + Span.current().getSpanContext().getTraceId();
    return this.threadContext
            .withContextCapture(CompletableFuture.supplyAsync(
                    () -> {
                        // Runs on customExecutorService (the fork-join pool thread in the output below).
                        System.out.println("Thread " + Thread.currentThread() + " ----> "
                                + Span.current().getSpanContext().getTraceId() + " - "
                                + Span.current().getSpanContext().getSpanId());
                        return message;
                    }, this.customExecutorService))
            .thenApplyAsync(msg -> {
                // Dependent stage of the future returned by withContextCapture.
                System.out.println("Thread " + Thread.currentThread() + " ----> "
                        + Span.current().getSpanContext().getTraceId() + " - "
                        + Span.current().getSpanContext().getSpanId());
                return msg + "-" + Span.current().getSpanContext().getTraceId();
            });
}

The output is something like this:

Thread Thread[executor-thread-0,5,main] ----> 249af904b222cf7684706bedbb0a26ff - 943063dd4c81d983
Thread Thread[ForkJoinPool-1-worker-19,5,main] ----> 00000000000000000000000000000000 - 0000000000000000
Thread Thread[executor-thread-1,5,main] ----> 249af904b222cf7684706bedbb0a26ff - 943063dd4c81d983

As you can see, the OTel context is not propagated to the fork-join pool thread, which is not managed by Quarkus. It becomes available again in thenApplyAsync, I guess because that code is executed in a Quarkus managed thread.
AFAIU, the purpose of OpenTelemetryMpContextPropagation was to make the OTel context available across threads even if they are not managed by Quarkus, and so not covered by Vert.x context propagation.
Is there anything I misunderstood about the usage of OpenTelemetryMpContextPropagation, or is it a bug?
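
For readers who want to reproduce the symptom without Quarkus, here is a minimal sketch that assumes the current trace context is bound to the calling thread. The TRACE_ID thread-local and the class name are hypothetical stand-ins, not the OpenTelemetry API; the all-zeros default mimics the invalid trace ID in the output above.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class ThreadLocalLossDemo {
    // Hypothetical stand-in for a thread-bound tracing context (not the OTel API).
    static final ThreadLocal<String> TRACE_ID = ThreadLocal.withInitial(() -> "00000000");

    /** Sets a trace id on the caller, then returns the id observed by a task on another pool. */
    static String traceIdSeenByAsyncTask() {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            TRACE_ID.set("249af904");
            // The supplier runs on the pool thread, which has its own (default) value.
            return CompletableFuture.supplyAsync(TRACE_ID::get, pool).join();
        } finally {
            TRACE_ID.remove();
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("async task saw: " + traceIdSeenByAsyncTask());
    }
}
```

Nothing in the plain JDK carries the value across the thread hop, which is why some component has to capture and restore it explicitly.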

@quarkus-bot

quarkus-bot bot commented Feb 25, 2023

/cc @brunobat (opentelemetry), @radcortez (opentelemetry)

@ppatierno
Contributor Author

@radcortez I opened this issue; you might be interested in it.

@brunobat
Contributor

Hi @ppatierno, can you please explain where this OpenTelemetryMpContextPropagation class is?

My understanding is that you need managed threads to work in a container like Quarkus.

@ppatierno
Contributor Author

@brunobat it was suggested and added by this PR #30478
It's available in Quarkus 3.0.0 only but, as suggested by @radcortez, I just copied the code into my project.
Anyway, unrelated to my project, my question is why it's not doing what it is supposed to do, even with the official corresponding Quarkus unit test as explained above.
Or I didn't understand the purpose of that class :-)
As I mentioned, without that class, but using a ManagedExecutor, everything works fine and the async code runs in a Quarkus managed worker thread instead of a fork-join pool one. But it seems that OpenTelemetryMpContextPropagation was added for this purpose as well.

@brunobat
Contributor

My understanding is that it attaches the OTel context to a thread customised with org.eclipse.microprofile.context.spi.ThreadContextProvider, which is MicroProfile related and managed. Not a fork/join pool.

@ppatierno
Contributor Author

So I don't understand the role of the OpenTelemetryMpContextPropagation class yet.
Without using it at all, and just using a ManagedExecutor to run my CompletableFuture.supplyAsync code, the context is propagated correctly (because the code runs on a Quarkus managed thread rather than a fork-join pool thread).
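
To illustrate why an executor that is aware of the context makes the difference, here is a simplified model in plain Java. It is not the ManagedExecutor implementation; the capturing wrapper, the TRACE_ID thread-local and the class name are assumptions made for the sketch. The idea is that the executor reads the context on the submitting thread and restores it on the worker thread around each task.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class CapturingExecutorSketch {
    // Hypothetical stand-in for a thread-bound tracing context (not the OTel API).
    static final ThreadLocal<String> TRACE_ID = ThreadLocal.withInitial(() -> "00000000");

    /** Wraps an executor so that each task runs with the submitter's TRACE_ID. */
    static Executor capturing(Executor delegate) {
        return task -> {
            String captured = TRACE_ID.get();      // read on the submitting thread
            delegate.execute(() -> {
                String previous = TRACE_ID.get();
                TRACE_ID.set(captured);            // restore on the worker thread
                try {
                    task.run();
                } finally {
                    TRACE_ID.set(previous);        // do not leak into later tasks
                }
            });
        };
    }

    /** Returns the trace id seen by a supplyAsync task, with or without the wrapper. */
    static String traceIdSeenThrough(boolean capture) {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            TRACE_ID.set("249af904");
            Executor executor = capture ? capturing(pool) : pool;
            return CompletableFuture.supplyAsync(TRACE_ID::get, executor).join();
        } finally {
            TRACE_ID.remove();
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("plain pool:      " + traceIdSeenThrough(false));
        System.out.println("capturing pool:  " + traceIdSeenThrough(true));
    }
}
```

In this model the supplier itself sees the caller's value, because the capture happens at the moment supplyAsync hands the task to the executor.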

@radcortez
Member

The issue is that the thenApplyAsync is not using the custom executor in the test. So it works, as you noticed, because it executes in the Quarkus managed worker thread.

I've opened #31546 to fix this in the test.

@ppatierno
Contributor Author

ppatierno commented Mar 2, 2023

@radcortez I am confused; I don't understand how it fixes my problem, especially because in my use case I don't have a thenApplyAsync. If I run the test with your fix I get:

Thread Thread[executor-thread-0,5,main] ----> 0e86b6dc0b77f82107401408e681068c - 5f7b36f74fddb93e
Thread Thread[ForkJoinPool-1-worker-19,5,main] ----> 00000000000000000000000000000000 - 0000000000000000
Thread Thread[ForkJoinPool-1-worker-19,5,main] ----> 0e86b6dc0b77f82107401408e681068c - 5f7b36f74fddb93e

So the only difference is that the thenApplyAsync code now runs in the fork-join pool thread and has the right context ... but that is too late for my use case. I need the context to be right inside the code running in CompletableFuture.supplyAsync. Even with your fix (which actually fixes the test) you see an all-zeros context where I need the right one. That is where the Kafka interceptor OTel instrumentation code runs to create a span, which needs the right context in order to be set as a child of the previous span.

The output I would expect, at least from my point of view if context propagation were working, would be this:

Thread Thread[executor-thread-0,5,main] ----> 0e86b6dc0b77f82107401408e681068c - 5f7b36f74fddb93e
Thread Thread[ForkJoinPool-1-worker-19,5,main] ----> 0e86b6dc0b77f82107401408e681068c - 5f7b36f74fddb93e
Thread Thread[ForkJoinPool-1-worker-19,5,main] ----> 0e86b6dc0b77f82107401408e681068c - 5f7b36f74fddb93e

@quarkus-bot quarkus-bot bot added this to the 3.0 - main milestone Mar 2, 2023
@radcortez
Member

It may be related to what the Kafka interceptor OTel instrumentation is doing.

Do you have a reproducer I can look at?

@ppatierno
Contributor Author

ppatierno commented Mar 2, 2023

@radcortez I would like to take the Kafka interceptor OTel out of the picture ... why is the context not available in the CompletableFuture.supplyAsync in the above test?
Again ... we see this output now with your fix:

Thread Thread[executor-thread-0,5,main] ----> 0e86b6dc0b77f82107401408e681068c - 5f7b36f74fddb93e
Thread Thread[ForkJoinPool-1-worker-19,5,main] ----> 00000000000000000000000000000000 - 0000000000000000
Thread Thread[ForkJoinPool-1-worker-19,5,main] ----> 0e86b6dc0b77f82107401408e681068c - 5f7b36f74fddb93e

Why is the context trace ID / span ID all zeros in the "middle"? If the context is right there, it will be OK in the Kafka interceptor OTel as well.
For this reason I switched to using this test, taking my code out of the picture.
What am I missing here? Why do you think that all the zeros are OK? If they are OK, it will never work with any code running in the CompletableFuture.supplyAsync call (not just the Kafka interceptor OTel).
Imagine that in that CompletableFuture.supplyAsync call I create a span and want to make it a child of the main span: it won't work because the context is not propagated, right?

@radcortez
Member

Sorry, I didn't read your question correctly the first time and just focused on the requirement of OpenTelemetryMpContextPropagation :(

The issue is that withContextCapture does not propagate the context:

https://github.com/eclipse/microprofile-context-propagation/blob/master/api/src/main/java/org/eclipse/microprofile/context/ThreadContext.java#L612-L614

Only the new CompletableFuture is affected by the propagation done by OpenTelemetryMpContextPropagation.

Is there a reason you want to apply the code in the withContextCapture method and not in the new CompletableFuture?

@ppatierno
Contributor Author

The issue is that withContextCapture does not propagate the context:

https://github.com/eclipse/microprofile-context-propagation/blob/master/api/src/main/java/org/eclipse/microprofile/context/ThreadContext.java#L612-L614

Only the new CompletableFuture is affected by the propagation done by OpenTelemetryMpContextPropagation.

Got it!

Is there a reason you want to apply the code in the withContextCapture method and not in the new CompletableFuture?

Let me reply with another question then ... what should I have in the withContextCapture method, an "empty" CompletableFuture in order to run my actual code, where I need the context, in the thenApplyAsync? I don't understand ...
So, more generally, what should be called inside the withContextCapture method if the context is not available there?
Maybe what you expect is ...

  1. in the withContextCapture method I run some async operation (no span is created)
  2. in the following thenApplyAsync (where context is propagated) I should create the span
If the above thinking is right, it cannot work in my case because the context needs to be available in the withContextCapture method for the Kafka interceptor used there.

@radcortez
Member

Maybe what you expect is ...

  1. in the withContextCapture method I run some async operation (no span is created)
  2. in the following thenApplyAsync (where context is propagated) I should create the span
If the above thinking is right, it cannot work in my case because the context needs to be available in the withContextCapture method for the Kafka interceptor used there.

Correct. I'm not saying that you are wrong. It is just how the spec is designed, with withContextCapture not receiving the propagated context. Maybe we need to find some other way to propagate the OTel context in there too.

Is it possible to supply the Kafka interceptor to the next completable future? Theoretically, I don't see a reason for that not to work. What issue do you experience with it?

@Ladicek are you aware of a way to propagate a context in withContextCapture? Or was that left out on purpose?

@ppatierno
Contributor Author

ppatierno commented Mar 2, 2023

Correct. I'm not saying that you are wrong. It is just how the spec is designed, with withContextCapture not receiving the propagated context. Maybe we need to find some other way to propagate the OTel context in there too.

Ah ok, then we are on the same page! That was the missing piece on my side, it is just by design.

Is it possible to supply the Kafka interceptor to the next completable future? Theoretically, I don't see a reason for that not to work. What issue do you experience with it?

The Kafka interceptor is something that you set when you create a Kafka producer client. It is called when you use the Kafka producer client send operation (to send a message to a Kafka topic). Before actually sending the message, the interceptor runs some code and then sends the message. So that is where it adds the span to trace the send operation.
This Kafka client send operation is exactly what happens in the completable future wrapped by withContextCapture (FYI, withContextCapture is not needed for my purpose; it was not there before, I added it only for context propagation).

@radcortez
Member

But you do call some sort of send or equivalent in withContextCapture to send the message (which internally calls the Kafka OTel interceptor), right?

What if you wrap that call in a Runnable in withContextCapture so you can call send in a propagated thenApplyAsync?

@ppatierno
Contributor Author

What if you wrap that call in a Runnable in withContextCapture so you can call send in a propagated thenApplyAsync?

I didn't get your suggestion actually :-(
The send should happen in the Runnable, so how would it happen in the thenApplyAsync?

Let me share part of the current code, which is not up to date (my attempts at OpenTelemetry integration are only local for now).

So this is the REST method that is called from outside and starts the process of sending the message through RestSourceBridgeEndpoint.send:
https://github.com/ppatierno/strimzi-kafka-bridge/blob/road-to-quarkus/src/main/java/io/strimzi/kafka/bridge/quarkus/RestBridge.java#L136

This returns a completable future; it starts a CompletableFuture.supplyAsync here:

https://github.com/ppatierno/strimzi-kafka-bridge/blob/road-to-quarkus/src/main/java/io/strimzi/kafka/bridge/quarkus/RestSourceBridgeEndpoint.java#L149

which then calls this.kafkaBridgeProducer.send. Its code is here, and it is where the actual Kafka client is called to send the message (see this.producer.send), so that is where the interceptor is called:

https://github.com/ppatierno/strimzi-kafka-bridge/blob/road-to-quarkus/src/main/java/io/strimzi/kafka/bridge/quarkus/KafkaBridgeProducer.java#L58

So what I tried in the first REST-related method was:

return this.threadContext.withContextCapture(source.send(routingContext, body, topicName, async));

I was hoping that wrapping that completable future, together with OpenTelemetryMpContextPropagation, would help with context propagation.

@Ladicek
Contributor

Ladicek commented Mar 3, 2023

I don't really know what this issue is all about, but seeing code like

return this.threadContext.withContextCapture(source.send(routingContext, body, topicName, async));

immediately triggers an alert in my head. This is virtually always wrong and if it compiles, that's by accident. The context propagation API is supposed to be used like this:

return this.threadContext.withContextCapture(() -> source.send(routingContext, body, topicName, async));

@ppatierno
Contributor Author

ppatierno commented Mar 3, 2023

I don't really know what this issue is all about

What exactly isn't clear from the description in my very first comment, which includes the printed log output?

immediately triggers an alert in my head. This is virtually always wrong and if it compiles, that's by accident.

I see that withContextCapture has an overload like this https://download.eclipse.org/microprofile/microprofile-context-propagation-1.0/apidocs/org/eclipse/microprofile/context/ThreadContext.html#withContextCapture-java.util.concurrent.CompletionStage-
It takes a CompletionStage, which is exactly what the source.send method returns. You are referring to the overload with a Runnable instead. So I think the code compiles, and not by accident, right?

@radcortez
Member

radcortez commented Mar 3, 2023

I didn't get your suggestion actually :-(
The send should happen in the Runnable, so how would it happen in the thenApplyAsync?

Something like this:

threadContext
        .withContextCapture(CompletableFuture.supplyAsync(new Supplier<Runnable>() {
            @Override
            public Runnable get() {
                return new Runnable() {
                    @Override
                    public void run() {
                        // Send message here
                    }
                };
            }
        }, customExecutorService))
        .thenApplyAsync(new Function<Runnable, Void>() {
            @Override
            public Void apply(final Runnable runnable) {
                runnable.run();
                return null;
            }
        }, customExecutorService);
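
The shape of this workaround can be modelled in plain Java, under the assumption stated earlier in the thread that only the dependent stage runs with the captured context. The contextualFunction helper and the TRACE_ID thread-local below are hypothetical stand-ins used to imitate that behaviour; they are not the MicroProfile or OpenTelemetry APIs.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Function;

class DeferredSendSketch {
    // Hypothetical stand-in for a thread-bound tracing context (not the OTel API).
    static final ThreadLocal<String> TRACE_ID = ThreadLocal.withInitial(() -> "00000000");

    /** Imitates a context-propagating dependent stage: capture now, restore around apply(). */
    static <T, R> Function<T, R> contextualFunction(Function<T, R> fn) {
        String captured = TRACE_ID.get();
        return input -> {
            String previous = TRACE_ID.get();
            TRACE_ID.set(captured);
            try {
                return fn.apply(input);
            } finally {
                TRACE_ID.set(previous);
            }
        };
    }

    /** Records which trace id the first stage and the deferred "send" observe. */
    static List<String> run() {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        List<String> seen = Collections.synchronizedList(new ArrayList<>());
        try {
            TRACE_ID.set("0e86b6dc");
            // Stage 1 only builds the Runnable; it does not send anything yet.
            CompletableFuture<Runnable> first = CompletableFuture.supplyAsync(() -> {
                seen.add("stage1:" + TRACE_ID.get());
                Runnable send = () -> seen.add("send:" + TRACE_ID.get());
                return send;
            }, pool);
            // Stage 2 runs the deferred send with the captured context restored.
            Function<Runnable, Void> runIt = contextualFunction((Runnable r) -> {
                r.run();
                return null;
            });
            first.thenApplyAsync(runIt, pool).join();
            return new ArrayList<>(seen);
        } finally {
            TRACE_ID.remove();
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

The first stage still sees the default value; only the deferred send, executed in the dependent stage, sees the caller's trace ID.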

@Ladicek
Contributor

Ladicek commented Mar 3, 2023

Sorry, when I said "I don't really know what this issue is all about", I meant I haven't been involved here and I don't have the time to go through the history. I didn't mean to imply that the issue is unclear -- just to state that I don't know it.

Now, you can of course call ThreadContext.withContextCapture(CompletionStage), but you can't expect that the running process whose result is represented by the CompletionStage will have contexts propagated -- it's already running by the time withContextCapture is called. Hence I believe using the Runnable, Callable or Supplier overloads is almost always what you need.

(And now I see that those overloads are not actually overloads [of withContextCapture]. They are contextualRunnable, contextualCallable etc.)
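
As a rough illustration of the difference, here is a plain-Java model of a contextual wrapper applied to the supplier before it is submitted. The contextualSupplier helper below is a hypothetical stand-in written for this sketch and only imitates the capture-then-restore idea; TRACE_ID is likewise an assumption and not the OTel API.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Supplier;

class ContextualSupplierSketch {
    // Hypothetical stand-in for a thread-bound tracing context (not the OTel API).
    static final ThreadLocal<String> TRACE_ID = ThreadLocal.withInitial(() -> "00000000");

    /** Captures TRACE_ID at wrap time and restores it around every later call to get(). */
    static <T> Supplier<T> contextualSupplier(Supplier<T> supplier) {
        String captured = TRACE_ID.get();
        return () -> {
            String previous = TRACE_ID.get();
            TRACE_ID.set(captured);
            try {
                return supplier.get();
            } finally {
                TRACE_ID.set(previous);
            }
        };
    }

    /** Returns the trace id seen inside the supplier, wrapped or not before submission. */
    static String run(boolean wrapSupplier) {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            TRACE_ID.set("0e86b6dc");
            Supplier<String> work = TRACE_ID::get;
            // Wrapping must happen before supplyAsync, while the work has not started yet.
            Supplier<String> submitted = wrapSupplier ? contextualSupplier(work) : work;
            return CompletableFuture.supplyAsync(submitted, pool).join();
        } finally {
            TRACE_ID.remove();
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("unwrapped supplier: " + run(false));
        System.out.println("wrapped supplier:   " + run(true));
    }
}
```

Wrapping the future after supplyAsync has been called cannot achieve this, because by then the supplier may already be running on the other thread.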

@ppatierno
Contributor Author

@radcortez ... it seems too much :-) but I understand what you are proposing.
Isn't it better at this point to get rid of withContextCapture and just run my CompletableFuture.supplyAsync call with a ManagedExecutor instance? It would run the async call in a Quarkus managed thread and, as I wrote at the beginning of this thread, it works like a charm. So it makes me wonder: what is the real purpose of the OpenTelemetryMpContextPropagation class?

but you can't expect that the running process whose result is represented by the CompletionStage will have contexts propagated -- it's already running by the time withContextCapture is called

@Ladicek I see your point ... I could try some refactoring to use Runnable, Callable or Supplier but, to be honest, as per the above comment, I am not sure I really need withContextCapture. It seems to add complexity with no real advantage.

@Ladicek
Contributor

Ladicek commented Mar 3, 2023

ManagedExecutor is indeed often easier to use than ThreadContext.
