Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Quarkus test hangs forever with a ResteasyReactiveClientRequestFilter and Stork #27337

Closed
vsevel opened this issue Aug 17, 2022 · 26 comments
Closed
Labels
area/stork kind/bug Something isn't working

Comments

@vsevel
Copy link
Contributor

vsevel commented Aug 17, 2022

Describe the bug

I have written a client request filter followingup the OidcClientRequestReactiveFilter example for suspending/resuming the context in a reactive context.
In my case I am using the following filter implementation, and a pseudo remote custom discovery resource, called from the stork client side representation.
cc @geoand @michalszynkiewicz @sberyozkin

Expected behavior

Test Stork3Test should succeed.

Actual behavior

The tests hangs forever after printing:

15:59:10 INFO  [or.ac.MyReactiveRequestFilter] (main) processing stork://hello-service:8081/greeting/hello; suspend request context
15:59:10 INFO  [or.ac.MyReactiveRequestFilter] (main) anonymous = true
15:59:10 INFO  [or.ac.MyReactiveRequestFilter] (vert.x-eventloop-thread-2) processing http://localhost:8081/discovery; suspend request context

How to Reproduce?

mvn package -Dtest=Stork3Test in project reproducer_reactive_stork

Output of uname -a or ver

No response

Output of java -version

No response

GraalVM version (if different from Java)

No response

Quarkus version or git rev

2.11.2

Build tool (ie. output of mvnw --version or gradlew --version)

No response

Additional information

see this thread

@vsevel vsevel added the kind/bug Something isn't working label Aug 17, 2022
@quarkus-bot
Copy link

quarkus-bot bot commented Aug 17, 2022

/cc @michalszynkiewicz

@quarkus-bot
Copy link

quarkus-bot bot commented Aug 17, 2022

You added a link to a Zulip discussion, please make sure the description of the issue is comprehensive and doesn't require accessing Zulip.

This message is automatically generated by a bot.

@michalszynkiewicz
Copy link
Member

michalszynkiewicz commented Aug 18, 2022

Can you try removing @Provider from the filter and registering the filter directly on your client? nevermind :)

@michalszynkiewicz
Copy link
Member

I spent a few hours on it and so far have no clue.
@cescoffier maybe you'd have some idea what may be going on?

BTW, @vsevel is the idea to use the filter for the actual client and the client used by the service discovery?

@vsevel
Copy link
Contributor Author

vsevel commented Aug 18, 2022

thanks @michalszynkiewicz for spending some time already.
yes, I could bypass that particular filter for the discovery call, and I have already internally. this a valid workaround.
but I was afraid this may be uncovering a broader issue, or is the scope limited to calls to discovery services because they are done from within the rest client, and that is what causing the issue...?

@vsevel
Copy link
Contributor Author

vsevel commented Aug 18, 2022

the issue is that I have weird behaviors on other calls too, not just on the discovery service.
to overcome this, I added:

    @Override
    public void filter(ResteasyReactiveClientRequestContext requestContext) {

        if (Infrastructure.canCallerThreadBeBlocked()) {
            ...
            return;
        }
...

and I have started excluding explicit calls from the filter. but that does not bring a high level of confidence.

@cescoffier
Copy link
Member

That call just checks that you are not on an event loop. This seems to be the case for the second call.

Suspending the request may prevent the body from being read, which could be a problem.

@vsevel
Copy link
Contributor Author

vsevel commented Aug 18, 2022

I guess I am missing the idiomatic construct. my requirement is simply to be able to intercept outgoing calls, adding some attributes in the header based on the injected SecurityIdentity

That call just checks that you are not on an event loop. This seems to be the case for the second call.

in my example, the first call is happening on the main thread (this is triggered from a test). I use the canCallerThreadBeBlocked to execute the code synchronously when I can.

Suspending the request may prevent the body from being read, which could be a problem.

not sure what you mean. I strictly applied the approach taken in the oidc reactive filter. what is the proper way to exercise the SecurityIdentity?

@cescoffier
Copy link
Member

Looking into it.

@cescoffier
Copy link
Member

The exception is swallowed:

javax.enterprise.context.ContextNotActiveException: RequestScoped context was not active when trying to obtain a bean instance for a client proxy of CLASS bean [class=io.quarkus.security.runtime.SecurityIdentityAssociation, id=aba1c02763d7b5960e5b4bb68a012c84567f5f56]
	- you can activate the request context for a specific method using the @ActivateRequestContext interceptor binding
	at io.quarkus.arc.impl.ClientProxies.getDelegate(ClientProxies.java:53)
	at io.quarkus.security.runtime.SecurityIdentityAssociation_ClientProxy.arc$delegate(Unknown Source)
	at io.quarkus.security.runtime.SecurityIdentityAssociation_ClientProxy.getDeferredIdentity(Unknown Source)
	at org.acme.MyReactiveRequestFilter.filter(MyReactiveRequestFilter.java:37)
	at org.jboss.resteasy.reactive.client.spi.ResteasyReactiveClientRequestFilter.filter(ResteasyReactiveClientRequestFilter.java:11)
	at org.jboss.resteasy.reactive.client.handlers.ClientRequestFilterRestHandler.handle(ClientRequestFilterRestHandler.java:24)
	at org.jboss.resteasy.reactive.client.handlers.ClientRequestFilterRestHandler.handle(ClientRequestFilterRestHandler.java:9)
	at org.jboss.resteasy.reactive.common.core.AbstractResteasyReactiveContext.invokeHandler(AbstractResteasyReactiveContext.java:218)
	at org.jboss.resteasy.reactive.common.core.AbstractResteasyReactiveContext.run(AbstractResteasyReactiveContext.java:140)
	at org.jboss.resteasy.reactive.client.impl.AsyncInvokerImpl.performRequestInternal(AsyncInvokerImpl.java:262)
	at org.jboss.resteasy.reactive.client.impl.AsyncInvokerImpl.performRequestInternal(AsyncInvokerImpl.java:252)
	at org.jboss.resteasy.reactive.client.impl.AsyncInvokerImpl.method(AsyncInvokerImpl.java:219)
	at org.jboss.resteasy.reactive.client.impl.UniInvoker$1.get(UniInvoker.java:24)
	at org.jboss.resteasy.reactive.client.impl.UniInvoker$1.get(UniInvoker.java:21)

@cescoffier
Copy link
Member

You cannot access the currentIdentityAssociation after suspending as it's a request scoped bean, and you suspend the request scope.

@cescoffier
Copy link
Member

BTW, it's a synchronous exception that is swallowed, something should be added to RR to log the exception. Most probably in org.jboss.resteasy.reactive.client.handlers.ClientRequestFilterRestHandler.

@cescoffier
Copy link
Member

@michalszynkiewicz @vsevel
This is working:

package org.acme;

import javax.enterprise.context.control.ActivateRequestContext;
import javax.enterprise.inject.spi.CDI;
import javax.inject.Inject;
import javax.ws.rs.core.Response;
import javax.ws.rs.ext.Provider;

import io.quarkus.arc.Arc;
import io.quarkus.security.identity.CurrentIdentityAssociation;
import io.quarkus.security.identity.SecurityIdentity;
import io.smallrye.mutiny.Uni;
import org.jboss.resteasy.reactive.client.spi.ResteasyReactiveClientRequestContext;
import org.jboss.resteasy.reactive.client.spi.ResteasyReactiveClientRequestFilter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Provider
public class MyReactiveRequestFilter implements ResteasyReactiveClientRequestFilter {

    private static final Logger log = LoggerFactory.getLogger(MyReactiveRequestFilter.class);

    @Inject
    CurrentIdentityAssociation currentIdentityAssociation;

    @Override
    @ActivateRequestContext
    public void filter(ResteasyReactiveClientRequestContext requestContext) {

        log.info("processing " + requestContext.getUri() + "; suspend request context");
        requestContext.suspend();

        try {
            currentIdentityAssociation.getDeferredIdentity()
                    .log()
                    .subscribe()
                    .with(securityIdentity -> {
                                log.info("anonymous = " + securityIdentity.isAnonymous());
                                requestContext.resume();
                            },
                            throwable -> {
                                log.error("filter failed", throwable);
                                requestContext.abortWith(Response.status(500).build());
                                requestContext.resume();
                            });
        } catch (Throwable e) {
            log.error("Something really bad happened and get buried real deep", e);
            requestContext.abortWith(Response.status(500).build());
            requestContext.resume();
        }
    }
}

I just added the ActivateRequestContext annotation because, and that is where my knowledge ends, there is a case where the filter is called without a request context active. It seems to come from a Uni created from a CompletionStage (generally a very bad idea because of the threading issues). I was not able to find the exact code calling the filter method. @michalszynkiewicz, maybe you know?

The following solution is probably better as it clearly identifies the boundary:

package org.acme;

import javax.enterprise.context.control.ActivateRequestContext;
import javax.enterprise.inject.spi.CDI;
import javax.inject.Inject;
import javax.ws.rs.core.Response;
import javax.ws.rs.ext.Provider;

import io.quarkus.arc.Arc;
import io.quarkus.security.identity.CurrentIdentityAssociation;
import io.quarkus.security.identity.SecurityIdentity;
import io.smallrye.mutiny.Uni;
import org.jboss.resteasy.reactive.client.spi.ResteasyReactiveClientRequestContext;
import org.jboss.resteasy.reactive.client.spi.ResteasyReactiveClientRequestFilter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Provider
public class MyReactiveRequestFilter implements ResteasyReactiveClientRequestFilter {

    private static final Logger log = LoggerFactory.getLogger(MyReactiveRequestFilter.class);

    @Inject
    CurrentIdentityAssociation currentIdentityAssociation;

    @Override
    public void filter(ResteasyReactiveClientRequestContext requestContext) {
        boolean requestScopeActivated = false;
        if (! Arc.container().requestContext().isActive()) {
            Arc.container().requestContext().activate();
            requestScopeActivated = true;
        }

        log.info("processing " + requestContext.getUri() + "; suspend request context");
        requestContext.suspend();

        try {
            boolean shouldTerminateRequestScope = requestScopeActivated;
            currentIdentityAssociation.getDeferredIdentity()
                    .subscribe()
                    .with(securityIdentity -> {
                                log.info("anonymous = " + securityIdentity.isAnonymous());
                                deactivateRequestScopeIfNeeded(shouldTerminateRequestScope);
                                requestContext.resume();
                            },
                            throwable -> {
                                log.error("filter failed", throwable);
                                deactivateRequestScopeIfNeeded(shouldTerminateRequestScope);
                                requestContext.abortWith(Response.status(500).build());
                                requestContext.resume();
                            });
        } catch (Throwable e) {
            log.error("Something really bad happened and get buried real deep", e);
            requestContext.abortWith(Response.status(500).build());
            requestContext.resume();
        }
    }

    private void deactivateRequestScopeIfNeeded(boolean shouldTerminateRequestScope) {
        if (shouldTerminateRequestScope) {
            Arc.container().requestContext().deactivate();
        }
    }
}

@vsevel
Copy link
Contributor Author

vsevel commented Aug 19, 2022

ok. just a bit over my head ;-)

it is strange to me to force the activation of the request context.
my impression was that if the scope was not active, there was no reason to execute the functional code (a case of propagating headers based on the current security identity). so I ended up with the simpler (incorrect?) code that is passing our regression tests:

    @Override
    public void filter(ResteasyReactiveClientRequestContext requestContext) {

        if (Arc.container().requestContext().isActive()) {
            currentIdentityAssociation.getDeferredIdentity().subscribe().with(
                    identity -> putAuthorizationHeader(requestContext, identity),
                    throwable -> fail(throwable, requestContext));
        }
    }

    private void putAuthorizationHeader(ResteasyReactiveClientRequestContext requestContext, SecurityIdentity identity) {
        if (!identity.isAnonymous()) {
            MultivaluedMap<String, Object> headers = requestContext.getHeaders();
            headers.putSingle("x", "y");
        }
    }

@cescoffier
Copy link
Member

Your approach is fine!
I wonder why the scope is not activated on the second call - this is where I need @michalszynkiewicz insights.

@michalszynkiewicz
Copy link
Member

When getServiceInstance is called, the request scope is active. But within initialization of the deferred (and memoized) Uni, it is not.
I don't know the Arc internals well enough to say why is that but it seems reasonable to me, the initalization of service instances is "scheduled" from a different place, so it should probably not reuse the caller's request context.

@vsevel
Copy link
Contributor Author

vsevel commented Aug 19, 2022

out of curiosity, what is the purpose of requestContext.suspend()? in what situations do we need to do it? I could not find any doc, may be execpt #16615
it seems to be associated with reactive filters, which it is I suppose when you start using currentIdentityAssociation.getDeferredIdentity()?
I tried re-adding the suspend/resume, and it works (but it worked also before).

@michalszynkiewicz
Copy link
Member

In REST Client Reactive, the "usual" filter is (at least in some situations) called on the event loop.
If you need to perform some IO operation there, you can suspend the context before invoking it (still without blocking the loop, using executeBlocking if required), and resume when it's done, to avoid blocking the event loop.

@michalszynkiewicz
Copy link
Member

It looks like the problem is in the fact that an exception is thrown (and the rest client handler schedules processing it) while the request context is suspended.
Changing the filter to the following works:

        requestContext.suspend();

        try {
            currentIdentityAssociation.getDeferredIdentity().subscribe()
                    .with(securityIdentity -> {
                                log.info("anonymous = " + securityIdentity.isAnonymous());
                                requestContext.resume();
                            },
                            throwable -> {
                                log.error("filter failed", throwable);
                                requestContext.abortWith(Response.status(500).build());
                                requestContext.resume();
                            });
        } catch (Exception any) {
            requestContext.resume();
        }

It still needs to be fixed in the handler, at least by logging some error, so that users know that they have to handle such exceptions

@vsevel
Copy link
Contributor Author

vsevel commented Aug 19, 2022

may be a finally would be better?

        requestContext.suspend();

        try {
...
        } finally {
            requestContext.resume();
        }

or is this problematic since the subscribers may not have executed yet, and we could end up executing the resume from the finally before the resume from the subscribers? I see AbstractResteasyReactiveContext.suspended is not thread safe. so may be something like that:

            requestContext.suspend();
            try {
                currentIdentityAssociation.getDeferredIdentity().subscribe()
                        .with(securityIdentity -> {
                                   // ...
                                    requestContext.resume();
                                },
                                throwable -> {
                                    // ...
                                    requestContext.resume();
                                });
            } catch (RuntimeException e) {
                requestContext.resume(); // assuming this will not be done in the subscribers
                throw e;
            }

If you need to perform some IO operation there, you can suspend the context before invoking it (still without blocking the loop, using executeBlocking if required), and resume when it's done, to avoid blocking the event loop.

so if we do not perform any blocking operation, then request/resume is not required I take it.

@michalszynkiewicz
Copy link
Member

may be a finally would be better?

        requestContext.suspend();

        try {
...
        } finally {
            requestContext.resume();
        }

or is this problematic since the subscribers may not have executed yet, and we could end up executing the resume from the finally before the resume from the subscribers?
Yes, resume would be called too early when everything is alright with finally

If you need to perform some IO operation there, you can suspend the context before invoking it (still without blocking the loop, using executeBlocking if required), and resume when it's done, to avoid blocking the event loop.

so if we do not perform any blocking operation, then request/resume is not required I take it.

Exactly

@michalszynkiewicz
Copy link
Member

With the linked PR the error will be logged and the processing will continue resulting in io.smallrye.stork.api.NoServiceInstanceFoundException: No service instance found

I'm not 100% sure it's proper though, let's see what people who worked more on the internals think ;)

@vsevel
Copy link
Contributor Author

vsevel commented Aug 19, 2022

ok so may be something like this then:

            requestContext.suspend();
            try {
                currentIdentityAssociation.getDeferredIdentity().subscribe()
                        .with(securityIdentity -> {
                                   // ...
                                    requestContext.resume();
                                },
                                throwable -> {
                                    // ...
                                    requestContext.resume();
                                });
            } catch (RuntimeException e) {
                requestContext.resume(); // assuming this will not be done in the subscribers
                throw e;
            }

@michalszynkiewicz
Copy link
Member

Yes, I forgot about rethrowing in my snippet above :)

@vsevel
Copy link
Contributor Author

vsevel commented Aug 19, 2022

so I think we are at the end of this thread. thanks so much for your attention @michalszynkiewicz and @cescoffier . I have a much better understanding now. this was really a learning experience.
during your next stop in Geneva, don't forget to stop by to receive your swiss chocolate bar ;-)

@michalszynkiewicz
Copy link
Member

let's close it then, thanks in advance for the chocolate bar :D

michalszynkiewicz added a commit to michalszynkiewicz/quarkus that referenced this issue Aug 19, 2022
sheilamjones pushed a commit to sheilamjones/quarkus that referenced this issue Aug 22, 2022
gsmet pushed a commit to gsmet/quarkus that referenced this issue Aug 23, 2022
fercomunello pushed a commit to fercomunello/quarkus that referenced this issue Aug 31, 2022
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
area/stork kind/bug Something isn't working
Projects
None yet
Development

No branches or pull requests

3 participants