Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Run reactive rest client on Vertx same context #19225

Merged
merged 1 commit into from
Sep 9, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -45,4 +45,9 @@ void shouldHaveApplicationScopeByDefault() {
Bean<?> resolvedBean = beanManager.resolve(beans);
assertThat(resolvedBean.getScope()).isEqualTo(ApplicationScoped.class);
}

@Test
void shouldInvokeClientResponseOnSameContext() {
assertThat(testBean.bug18977()).isEqualTo("Hello");
}
}
Original file line number Diff line number Diff line change
@@ -1,16 +1,26 @@
package io.quarkus.rest.client.reactive;

import javax.ws.rs.Consumes;
import javax.ws.rs.GET;
import javax.ws.rs.POST;
import javax.ws.rs.Path;
import javax.ws.rs.core.MediaType;

import org.eclipse.microprofile.rest.client.inject.RegisterRestClient;

import io.smallrye.mutiny.Uni;

@RegisterRestClient(configKey = "hello2")
public interface HelloClient2 {
@POST
@Consumes(MediaType.TEXT_PLAIN)
@Path("/")
String echo(String name);

@GET
String bug18977();

@GET
@Path("delay")
Uni<String> delay();
}
Original file line number Diff line number Diff line change
@@ -1,13 +1,21 @@
package io.quarkus.rest.client.reactive;

import java.time.Duration;

import javax.ws.rs.Consumes;
import javax.ws.rs.GET;
import javax.ws.rs.POST;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.core.Context;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Request;

import org.eclipse.microprofile.rest.client.inject.RestClient;
import org.junit.jupiter.api.Assertions;

import io.smallrye.mutiny.Uni;

@Path("/hello")
@Produces(MediaType.TEXT_PLAIN)
@Consumes(MediaType.TEXT_PLAIN)
Expand All @@ -16,4 +24,24 @@ public class HelloResource {
public String echo(String name, @Context Request request) {
return "hello, " + name;
}

@RestClient
HelloClient2 client2;

@GET
public Uni<String> something() {
Thread thread = Thread.currentThread();
return client2.delay()
.map(foo -> {
Assertions.assertSame(thread, Thread.currentThread());
return foo;
});
}

@Path("delay")
@GET
public Uni<String> delay() {
return Uni.createFrom().item("Hello")
.onItem().delayIt().by(Duration.ofMillis(500));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,4 +27,8 @@ String helloViaBuiltClient(String name) {
.build(HelloClient.class);
return helloClient.echo(name);
}

String bug18977() {
return client2.bug18977();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

import io.quarkus.rest.client.reactive.HelloClient2;
import io.quarkus.rest.client.reactive.HelloResource;
import io.quarkus.test.QuarkusUnitTest;
import io.quarkus.test.common.http.TestHTTPResource;
Expand All @@ -24,9 +25,11 @@ public class ProviderDisabledAutodiscoveryTest {
@RegisterExtension
static final QuarkusUnitTest config = new QuarkusUnitTest()
.setArchiveProducer(() -> ShrinkWrap.create(JavaArchive.class)
.addClasses(HelloResource.class, HelloClient.class, GlobalRequestFilter.class, GlobalResponseFilter.class)
.addClasses(HelloResource.class, HelloClient2.class, HelloClient.class, GlobalRequestFilter.class,
GlobalResponseFilter.class)
.addAsResource(
new StringAsset(setUrlForClass(HelloClient.class)
+ setUrlForClass(HelloClient2.class)
+ "quarkus.rest-client-reactive.provider-autodiscovery=false"),
"application.properties"));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

import io.quarkus.rest.client.reactive.HelloClient2;
import io.quarkus.rest.client.reactive.HelloResource;
import io.quarkus.test.QuarkusUnitTest;

Expand All @@ -23,12 +24,14 @@ public class ProviderPriorityTest {
.setArchiveProducer(() -> ShrinkWrap.create(JavaArchive.class)
.addClasses(HelloResource.class,
HelloClient.class,
HelloClient2.class,
HelloClientWithFilter.class,
ResponseFilterLowestPrio.class,
GlobalResponseFilter.class,
GlobalResponseFilterLowPrio.class)
.addAsResource(
new StringAsset(setUrlForClass(HelloClient.class)
+ setUrlForClass(HelloClient2.class)
+ setUrlForClass(HelloClientWithFilter.class)),
"application.properties"));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

import io.quarkus.rest.client.reactive.HelloClient2;
import io.quarkus.rest.client.reactive.HelloResource;
import io.quarkus.test.QuarkusUnitTest;
import io.quarkus.test.common.http.TestHTTPResource;
Expand All @@ -25,11 +26,12 @@ public class ProviderTest {
@RegisterExtension
static final QuarkusUnitTest config = new QuarkusUnitTest()
.setArchiveProducer(() -> ShrinkWrap.create(JavaArchive.class)
.addClasses(HelloResource.class, HelloClient.class, GlobalRequestFilter.class,
.addClasses(HelloResource.class, HelloClient2.class, HelloClient.class, GlobalRequestFilter.class,
GlobalResponseFilter.class, GlobalRequestFilterConstrainedToServer.class,
GlobalFeature.class)
.addAsResource(
new StringAsset(setUrlForClass(HelloClient.class)),
new StringAsset(setUrlForClass(HelloClient.class)
+ setUrlForClass(HelloClient2.class)),
"application.properties"));

@RestClient
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package org.jboss.resteasy.reactive.client.impl;

import io.vertx.core.Context;
import io.vertx.core.MultiMap;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.http.HttpClient;
Expand Down Expand Up @@ -219,7 +220,14 @@ public HttpClientResponse getVertxClientResponse() {
@Override
protected Executor getEventLoop() {
if (httpClientRequest == null) {
return restClient.getVertx().nettyEventLoopGroup().next();
// make sure we execute the client callbacks on the same context as the current thread
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is not right, if this is being called from a worker then it will dispatch on the worker. You still need to actually dispatch to the relevant IO thread.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How can this be called from a worker thread, though? I've always seen it called from an IO thread.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It could be called by the user in org.jboss.resteasy.reactive.client.spi.ResteasyReactiveClientRequestContext from a org.jboss.resteasy.reactive.client.spi.ResteasyReactiveClientRequestFilter, which could be called from pretty much anywhere.

TBH I am really worried about this 'swapping threads' approach. It opens the door to lots of subtle bugs as we now need to ensure that we always do some kind of safe hand off between the threads. We also need to be super careful with IO, as things work differently if you are not on your IO thread, because as soon as you resume IO its possible another IO thread has started running the input handler, so you need to thing about the thread safety semantics of that.

From a performance point of view it also sucks, as it will involve multiple IO threads waking up selectors that don't belong to them, which is terrible from a performance POV.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree with the concerns, there's many similar difficulties with Hibernate Reactive.

As a user, I think I'd be fine to have to deal with stronger restrictions in exchange for less [potential] trouble.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe we should just merge this for now (once the tests are passing), but look to the possibility of moving to a single thread model in future.

Thinking about it I am not sure how practical it will be, as having basically a pool of connections per IO thread is not going to be great for databases which have connection limits.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So can we merge this or not? Not trying to be pushy, but Hibernate Transactions are still broken without this (or a similar) fix :)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should merge it for now, as I said above. I have rebased so lets see what CI says.

Context context = restClient.getVertx().getOrCreateContext();
return new Executor() {
@Override
public void execute(Runnable command) {
context.runOnContext(v -> command.run());
}
};
} else {
return new Executor() {
@Override
Expand Down