Context propagation not working in async calls #947

Closed
AndreasPetersen opened this issue Aug 2, 2023 · 6 comments · Fixed by #950

@AndreasPetersen

When using the HTTP Async Transport, the context is not propagated, even with quarkus-smallrye-context-propagation in place. This is in contrast to async REST requests, where the context is correctly propagated.

I have created a reproducer project here. In it, you will find a test which calls an HTTP server REST endpoint, which makes a call to:

  • A sync REST client
  • An async REST client
  • A sync SOAP client
  • An async SOAP client. This test case fails.

I'm using the Micrometer metrics feature with Quarkus CXF, and I've created a MeterFilter that adds tags to the created metrics from a @RequestScoped bean. However, since no context propagation is done on the async SOAP client call, the MeterFilter fails with:

2023-08-02 13:51:09,179 WARN  [org.apa.cxf.pha.PhaseInterceptorChain] (default-workqueue-1) Exception in handleFault on interceptor org.apache.cxf.metrics.interceptors.MetricsMessageInInterceptor@6f400618: jakarta.enterprise.context.ContextNotActiveException: RequestScoped context was not active when trying to obtain a bean instance for a client proxy of CLASS bean [class=org.acme.RequestScopedHeader, id=88dfd968e166472ee5433588607fd7b273d3ab8a]
	- you can activate the request context for a specific method using the @ActivateRequestContext interceptor binding
	at io.quarkus.arc.impl.ClientProxies.getDelegate(ClientProxies.java:55)
	at org.acme.RequestScopedHeader_ClientProxy.arc$delegate(Unknown Source)
	at org.acme.RequestScopedHeader_ClientProxy.getHeaderValue(Unknown Source)
	at org.acme.MeterFilterProducer$1.map(MeterFilterProducer.java:22)
	at io.micrometer.core.instrument.MeterRegistry.getMappedId(MeterRegistry.java:593)
	at io.micrometer.core.instrument.MeterRegistry.registerMeterIfNecessary(MeterRegistry.java:576)
	at io.micrometer.core.instrument.MeterRegistry.timer(MeterRegistry.java:323)
	at io.micrometer.core.instrument.Timer$Builder.register(Timer.java:444)
	at org.apache.cxf.metrics.micrometer.MicrometerMetricsContext.stop(MicrometerMetricsContext.java:124)
	at org.apache.cxf.metrics.micrometer.MicrometerMetricsContext.record(MicrometerMetricsContext.java:104)
	at org.apache.cxf.metrics.micrometer.MicrometerClientMetricsContext.record(MicrometerClientMetricsContext.java:56)
	at org.apache.cxf.metrics.micrometer.MicrometerMetricsContext.stop(MicrometerMetricsContext.java:79)
	at org.apache.cxf.metrics.micrometer.MicrometerClientMetricsContext.stop(MicrometerClientMetricsContext.java:46)
	at org.apache.cxf.metrics.ExchangeMetrics.stop(ExchangeMetrics.java:75)
	at org.apache.cxf.metrics.interceptors.AbstractMetricsInterceptor.stop(AbstractMetricsInterceptor.java:214)
	at org.apache.cxf.metrics.interceptors.MetricsMessageInInterceptor.handleFault(MetricsMessageInInterceptor.java:50)
	at org.apache.cxf.phase.PhaseInterceptorChain.unwind(PhaseInterceptorChain.java:498)
	at org.apache.cxf.phase.PhaseInterceptorChain.wrapExceptionAsFault(PhaseInterceptorChain.java:349)
	at org.apache.cxf.phase.PhaseInterceptorChain.doIntercept(PhaseInterceptorChain.java:331)
	at org.apache.cxf.endpoint.ClientImpl.onMessage(ClientImpl.java:808)
	at org.apache.cxf.transport.http.HTTPConduit$WrappedOutputStream.handleResponseInternal(HTTPConduit.java:1738)
	at org.apache.cxf.transport.http.HTTPConduit$WrappedOutputStream$1.run(HTTPConduit.java:1216)
	at org.apache.cxf.workqueue.AutomaticWorkQueueImpl$3.run(AutomaticWorkQueueImpl.java:413)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
	at org.apache.cxf.workqueue.AutomaticWorkQueueImpl$AWQThreadFactory$1.run(AutomaticWorkQueueImpl.java:346)
	at java.base/java.lang.Thread.run(Thread.java:833)
ppalaga added a commit to ppalaga/quarkus-cxf that referenced this issue Aug 3, 2023
ppalaga added a commit to ppalaga/quarkus-cxf that referenced this issue Aug 3, 2023
Contributor

ppalaga commented Aug 3, 2023

Thanks for the precise analysis and reproducer, @AndreasPetersen!

I added the reproducer and I tried to fix the issue in #950

I thought it would be enough to instruct CXF to submit the async client requests through org.eclipse.microprofile.context.ManagedExecutor obtained from the CDI container. I can see the tasks being executed on the expected thread, executor-thread-2:

INFO  [io.qua.cxf.hc5.it.MeterFilterProducer] (executor-thread-2) == in MeterFilter

and in the stack trace, there is io.smallrye.context.impl.wrappers.SlowContextualRunnable.run(SlowContextualRunnable.java:19), which should IMO set up the context:

        try (CleanAutoCloseable activeState = state.begin()) {
            runnable.run();
        }

but it is still failing with the following stack trace:

Exception in handleFault on interceptor org.apache.cxf.metrics.interceptors.MetricsMessageInInterceptor@53922ff4: jakarta.enterprise.context.ContextNotActiveException: RequestScoped context was not active when trying to obtain a bean instance for a client proxy of CLASS bean [class=io.quarkiverse.cxf.hc5.it.RequestScopedHeader, id=2e1be9b7658c70e3149847c70302371c0eea4152]
[INFO] [stdout]         - you can activate the request context for a specific method using the @ActivateRequestContext interceptor binding
[INFO] [stdout]         at io.quarkus.arc.impl.ClientProxies.getDelegate(ClientProxies.java:55)
[INFO] [stdout]         at io.quarkiverse.cxf.hc5.it.RequestScopedHeader_ClientProxy.arc$delegate(Unknown Source)
[INFO] [stdout]         at io.quarkiverse.cxf.hc5.it.RequestScopedHeader_ClientProxy.getHeaderValue(Unknown Source)
[INFO] [stdout]         at io.quarkiverse.cxf.hc5.it.MeterFilterProducer$1.map(MeterFilterProducer.java:29)
[INFO] [stdout]         at io.micrometer.core.instrument.MeterRegistry.getMappedId(MeterRegistry.java:593)
[INFO] [stdout]         at io.micrometer.core.instrument.MeterRegistry.registerMeterIfNecessary(MeterRegistry.java:576)
[INFO] [stdout]         at io.micrometer.core.instrument.MeterRegistry.timer(MeterRegistry.java:323)
[INFO] [stdout]         at io.micrometer.core.instrument.Timer$Builder.register(Timer.java:444)
[INFO] [stdout]         at org.apache.cxf.metrics.micrometer.MicrometerMetricsContext.stop(MicrometerMetricsContext.java:124)
[INFO] [stdout]         at org.apache.cxf.metrics.micrometer.MicrometerMetricsContext.record(MicrometerMetricsContext.java:104)
[INFO] [stdout]         at org.apache.cxf.metrics.micrometer.MicrometerClientMetricsContext.record(MicrometerClientMetricsContext.java:56)
[INFO] [stdout]         at org.apache.cxf.metrics.micrometer.MicrometerMetricsContext.stop(MicrometerMetricsContext.java:79)
[INFO] [stdout]         at org.apache.cxf.metrics.micrometer.MicrometerClientMetricsContext.stop(MicrometerClientMetricsContext.java:46)
[INFO] [stdout]         at org.apache.cxf.metrics.ExchangeMetrics.stop(ExchangeMetrics.java:75)
[INFO] [stdout]         at org.apache.cxf.metrics.interceptors.AbstractMetricsInterceptor.stop(AbstractMetricsInterceptor.java:214)
[INFO] [stdout]         at org.apache.cxf.metrics.interceptors.MetricsMessageInInterceptor.handleFault(MetricsMessageInInterceptor.java:50)
[INFO] [stdout]         at org.apache.cxf.phase.PhaseInterceptorChain.unwind(PhaseInterceptorChain.java:498)
[INFO] [stdout]         at org.apache.cxf.phase.PhaseInterceptorChain.wrapExceptionAsFault(PhaseInterceptorChain.java:349)
[INFO] [stdout]         at org.apache.cxf.phase.PhaseInterceptorChain.doIntercept(PhaseInterceptorChain.java:331)
[INFO] [stdout]         at org.apache.cxf.endpoint.ClientImpl.onMessage(ClientImpl.java:808)
[INFO] [stdout]         at org.apache.cxf.transport.http.HTTPConduit$WrappedOutputStream.handleResponseInternal(HTTPConduit.java:1738)
[INFO] [stdout]         at org.apache.cxf.transport.http.HTTPConduit$WrappedOutputStream$1.run(HTTPConduit.java:1216)
[INFO] [stdout]         at io.quarkiverse.cxf.transport.http.hc5.QuarkusWorkQueueImpl$1.run(QuarkusWorkQueueImpl.java:28)
[INFO] [stdout]         at io.smallrye.context.impl.wrappers.SlowContextualRunnable.run(SlowContextualRunnable.java:19)
[INFO] [stdout]         at io.quarkus.vertx.core.runtime.VertxCoreRecorder$14.runWith(VertxCoreRecorder.java:581)
[INFO] [stdout]         at org.jboss.threads.EnhancedQueueExecutor$Task.run(EnhancedQueueExecutor.java:2513)
[INFO] [stdout]         at org.jboss.threads.EnhancedQueueExecutor$ThreadBody.run(EnhancedQueueExecutor.java:1512)
[INFO] [stdout]         at org.jboss.threads.DelegatingRunnable.run(DelegatingRunnable.java:29)
[INFO] [stdout]         at org.jboss.threads.ThreadLocalResettingRunnable.run(ThreadLocalResettingRunnable.java:29)
[INFO] [stdout]         at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
[INFO] [stdout]         at java.base/java.lang.Thread.run(Thread.java:833)

@cescoffier @geoand @mkouba would you mind looking at the second commit in #950 and giving any hints about what is missing?


mkouba commented Aug 4, 2023

> @cescoffier @geoand @mkouba would you mind looking at the second commit in #950 and giving any hints about what is missing?

CC @manovotn @FroMage


FroMage commented Aug 7, 2023

OK, so, in my opinion, what happens here is that you're not capturing the context when you create the Runnable, as you should, but when you schedule it, which is too late.

Context Propagation is supposed to happen by capturing the context when it's set up, saving it for later, and then restoring it when it's no longer there (in another thread, for example).

You're using ManagedExecutor, which is only for running async tasks on worker threads while propagating the current context. But I'm not even sure you need that. What you should be using is ThreadContext.contextualRunnable at the moment you create the Runnable that you will want to schedule later, because the moment you create it is (presumably) when you have the contexts that you want to capture. Then, later, you can use ManagedExecutor if you want to offload it to a worker thread, or just call it if you are already on an async thread.

Does this help?
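
To illustrate the capture-at-creation point made in this comment, here is a minimal plain-Java sketch. It is a simulation only: it does not use the MicroProfile Context Propagation API, and `REQUEST_CONTEXT`, `contextual` and `demo` are hypothetical names, with a `ThreadLocal` standing in for the request context. The `contextual` helper plays roughly the role that `ThreadContext.contextualRunnable` plays in the discussion.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

class CaptureAtCreationDemo {

    // Hypothetical stand-in for a request-scoped value (not a real CDI context).
    static final ThreadLocal<String> REQUEST_CONTEXT = new ThreadLocal<>();

    // Captures the caller's context NOW and restores it around task.run() LATER.
    static Runnable contextual(Runnable task) {
        final String captured = REQUEST_CONTEXT.get();
        return () -> {
            String previous = REQUEST_CONTEXT.get();
            REQUEST_CONTEXT.set(captured);
            try {
                task.run();
            } finally {
                // Put back whatever the executing thread had before.
                if (previous == null) {
                    REQUEST_CONTEXT.remove();
                } else {
                    REQUEST_CONTEXT.set(previous);
                }
            }
        };
    }

    // Returns what the task observed when wrapped early ([0]) and when wrapped too late ([1]).
    static String[] demo() {
        ExecutorService worker = Executors.newSingleThreadExecutor();
        AtomicReference<String> seen = new AtomicReference<>();
        Runnable task = () -> seen.set(REQUEST_CONTEXT.get());
        try {
            REQUEST_CONTEXT.set("request-42");
            Runnable wrappedEarly = contextual(task); // context is active here
            REQUEST_CONTEXT.remove();                 // the "request" ends

            worker.submit(wrappedEarly).get(5, TimeUnit.SECONDS);
            String early = seen.get();

            seen.set(null);
            // Wrapping on the worker thread, where nothing is active, captures nothing.
            worker.submit(() -> contextual(task).run()).get(5, TimeUnit.SECONDS);
            String late = seen.get();
            return new String[] { early, late };
        } catch (Exception e) {
            throw new RuntimeException(e);
        } finally {
            REQUEST_CONTEXT.remove();
            worker.shutdown();
        }
    }

    public static void main(String[] args) {
        String[] result = demo();
        System.out.println("wrapped at creation: " + result[0]);
        System.out.println("wrapped too late:    " + result[1]);
    }
}
```

In this sketch the task wrapped while the context was active still sees `request-42` on the worker thread, while the one wrapped on a thread without the context sees `null`, which mirrors the "too late" case described above.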

Contributor

ppalaga commented Aug 7, 2023

Thanks a lot @FroMage! I needed exactly this kind of conceptual intro. Let me try your hint with ThreadContext.contextualRunnable.

ppalaga added a commit to ppalaga/quarkus-cxf that referenced this issue Aug 7, 2023
ppalaga added a commit to ppalaga/quarkus-cxf that referenced this issue Aug 7, 2023
Contributor

ppalaga commented Aug 7, 2023

> What you should be using is ThreadContext.contextualRunnable at the moment you create the Runnable

This was indeed crucial for me: I found out that the runnable is created in HC5's own event loop thread, where the context is not available, so submitting the response consumer runnable to ManagedExecutor was not enough. I was able to hack into CXF internals and (indirectly) wrap the runnable from the Vert.x event loop, where the context is available. I hope that's the right thing to do.

> Then, later, you can use ManagedExecutor if you want to offload it to a worker thread

I had to keep using ManagedExecutor because otherwise CXF would use its own executor and with that one in place, the scenario was failing.

Thanks again, @FroMage, the context propagation part is clear.

Now I can turn to CXF experts to figure out how to make my hack more elegant.
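
The situation described in this comment, where the response runnable is created on a thread that has no context, can be sketched in plain Java as follows. This is only an illustration under assumptions and not the change made in #950: a `ThreadLocal` stands in for the request context, a single-thread executor stands in for the HTTP client's event loop, and `snapshotOfCurrentContext` is a hypothetical helper, similar in spirit to `ThreadContext.currentContextExecutor()` from MicroProfile Context Propagation.

```java
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

class SnapshotOnCallerThreadDemo {

    // Hypothetical stand-in for a request-scoped value (not a real CDI context).
    static final ThreadLocal<String> REQUEST_CONTEXT = new ThreadLocal<>();

    // Snapshots the caller's context as an Executor. Tasks passed to execute()
    // run on the calling thread, with the snapshot applied for their duration.
    static Executor snapshotOfCurrentContext() {
        final String captured = REQUEST_CONTEXT.get();
        return task -> {
            String previous = REQUEST_CONTEXT.get();
            REQUEST_CONTEXT.set(captured);
            try {
                task.run();
            } finally {
                if (previous == null) {
                    REQUEST_CONTEXT.remove();
                } else {
                    REQUEST_CONTEXT.set(previous);
                }
            }
        };
    }

    // Returns the context value observed by a handler created on the I/O thread.
    static String demo() {
        // Stands in for the HTTP client's own event loop thread.
        ExecutorService ioThread = Executors.newSingleThreadExecutor();
        AtomicReference<String> seen = new AtomicReference<>();
        try {
            REQUEST_CONTEXT.set("request-42");
            // Taken on the calling thread, where the context is available.
            Executor snapshot = snapshotOfCurrentContext();
            REQUEST_CONTEXT.remove();

            ioThread.submit(() -> {
                // The handler is created here, where no context is active,
                // so wrapping it on this thread alone would capture nothing.
                Runnable responseHandler = () -> seen.set(REQUEST_CONTEXT.get());
                snapshot.execute(responseHandler);
            }).get(5, TimeUnit.SECONDS);
            return seen.get();
        } catch (Exception e) {
            throw new RuntimeException(e);
        } finally {
            REQUEST_CONTEXT.remove();
            ioThread.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("handler saw: " + demo());
    }
}
```

The design point is that the snapshot has to be taken on the thread that still has the context. Runnables created later on other threads can then be run through it, and could additionally be handed to a worker pool if offloading is needed.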


FroMage commented Aug 8, 2023

OK, great :) Glad that it was this and not something hard to find :)
