Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Dispatch back to IO thread for non blocking requests #20438

Merged
merged 1 commit into from
Sep 29, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.launch
import org.jboss.resteasy.reactive.server.core.ResteasyReactiveRequestContext
import org.jboss.resteasy.reactive.server.spi.ServerRestHandler
import java.util.concurrent.Executor
import javax.enterprise.inject.spi.CDI

class FlowToPublisherHandler : ServerRestHandler {
Expand All @@ -27,7 +28,9 @@ class FlowToPublisherHandler : ServerRestHandler {
// ensure the proper CL is not lost in dev-mode
Thread.currentThread().contextClassLoader = originalTCCL
requestContext.result = result.asMulti()
requestContext.resume()
//run in a direct invocation executor to run the rest of the invocation in the co-route scope
//feels a bit fragile, but let's see how it goes
requestContext.resume(Executor { it.run() })
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@geoand I needed to do this to make the kotlin tests work. Not sure if I like it much but I think it is probably ok for now.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we can live with it :)

}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -317,6 +317,12 @@ public <T> T unwrap(Class<T> theType) {
return null;
}

@Override
public boolean isOnIoThread() {
//does not really apply to Servlet
return true;
}

@Override
public ServerHttpResponse setStatusCode(int code) {
response.setStatus(code);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ public Throwable getThrowable() {

protected abstract Executor getEventLoop();

protected Executor getContextExecutor() {
public Executor getContextExecutor() {
return null;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@
import org.jboss.resteasy.reactive.server.handlers.InputHandler;
import org.jboss.resteasy.reactive.server.handlers.InstanceHandler;
import org.jboss.resteasy.reactive.server.handlers.InvocationHandler;
import org.jboss.resteasy.reactive.server.handlers.NonBlockingHandler;
import org.jboss.resteasy.reactive.server.handlers.ParameterHandler;
import org.jboss.resteasy.reactive.server.handlers.PerRequestInstanceHandler;
import org.jboss.resteasy.reactive.server.handlers.RequestDeserializeHandler;
Expand Down Expand Up @@ -184,6 +185,7 @@ public RuntimeResource buildResourceMethod(ResourceClass clazz,
blockingHandlerIndex = Optional.of(handlers.size() - 1);
score.add(ScoreSystem.Category.Execution, ScoreSystem.Diagnostic.ExecutionBlocking);
} else {
handlers.add(NonBlockingHandler.INSTANCE);
score.add(ScoreSystem.Category.Execution, ScoreSystem.Diagnostic.ExecutionNonBlocking);
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package org.jboss.resteasy.reactive.server.handlers;

import org.jboss.resteasy.reactive.server.core.ResteasyReactiveRequestContext;
import org.jboss.resteasy.reactive.server.spi.ServerRestHandler;

public class NonBlockingHandler implements ServerRestHandler {

public static final NonBlockingHandler INSTANCE = new NonBlockingHandler();

@Override
public void handle(ResteasyReactiveRequestContext requestContext) throws Exception {
if (requestContext.serverRequest().isOnIoThread()) {
return;
}
requestContext.suspend();
requestContext.resume(requestContext.getContextExecutor());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,8 @@ default FormData getExistingParsedForm() {
return null;
}

boolean isOnIoThread();

interface ReadCallback {

void done();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,12 @@ public VertxResteasyReactiveRequestContext(Deployment deployment, ProvidersImpl
this.contextExecutor = new Executor() {
@Override
public void execute(Runnable command) {
internal.execute(command);
internal.runOnContext(new Handler<Void>() {
@Override
public void handle(Void unused) {
command.run();
}
});
}
};
}
Expand Down Expand Up @@ -101,7 +106,7 @@ protected EventLoop getEventLoop() {
return ((ConnectionBase) context.request().connection()).channel().eventLoop();
}

protected Executor getContextExecutor() {
public Executor getContextExecutor() {
return contextExecutor;
}

Expand Down Expand Up @@ -283,6 +288,11 @@ public FormData getExistingParsedForm() {
return ret;
}

@Override
public boolean isOnIoThread() {
return ((ConnectionBase) request.connection()).channel().eventLoop().inEventLoop();
}

@SuppressWarnings("unchecked")
@Override
public <T> T unwrap(Class<T> theType) {
Expand Down
17 changes: 17 additions & 0 deletions integration-tests/hibernate-reactive-postgresql/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,10 @@
<groupId>io.quarkus</groupId>
<artifactId>quarkus-resteasy-reactive-jsonb</artifactId>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-elytron-security-properties-file</artifactId>
</dependency>

<!-- test dependencies -->
<dependency>
Expand Down Expand Up @@ -84,6 +88,19 @@
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-elytron-security-properties-file-deployment</artifactId>
<version>${project.version}</version>
<type>pom</type>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>*</groupId>
<artifactId>*</artifactId>
</exclusion>
</exclusions>
</dependency>
</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,15 @@

import org.hibernate.reactive.mutiny.Mutiny;

import io.quarkus.security.Authenticated;
import io.smallrye.mutiny.Uni;
import io.vertx.mutiny.pgclient.PgPool;
import io.vertx.mutiny.sqlclient.Row;
import io.vertx.mutiny.sqlclient.RowSet;
import io.vertx.mutiny.sqlclient.Tuple;

@Path("/tests")
@Authenticated
public class HibernateReactiveTestEndpoint {

@Inject
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,3 +10,8 @@ quarkus.hibernate-orm.database.generation=drop-and-create
quarkus.datasource.reactive=true
quarkus.datasource.reactive.url=${postgres.reactive.url}


quarkus.security.users.embedded.enabled=true
quarkus.security.users.embedded.users.scott=jb0ss
quarkus.security.users.embedded.plain-text=true
quarkus.security.users.embedded.roles.scott=Admin,admin,Tester,user
Original file line number Diff line number Diff line change
Expand Up @@ -11,54 +11,62 @@

/**
* Test various JPA operations running in Quarkus
*
* Also makes sure that these work with a blocking security implementation
*/
@QuarkusTest
@TestHTTPEndpoint(HibernateReactiveTestEndpoint.class)
public class HibernateReactiveTest {

@Test
public void reactiveCowPersist() {
RestAssured.when()
RestAssured.given().when()
.auth().preemptive().basic("scott", "jb0ss")
.get("/reactiveCowPersist")
.then()
.body(containsString("\"name\":\"Carolina\"}")); //Use containsString as we don't know the Id this object will have
}

@Test
public void reactiveFindMutiny() {
RestAssured.when()
RestAssured.given().when()
.auth().preemptive().basic("scott", "jb0ss")
.get("/reactiveFindMutiny")
.then()
.body(is("{\"id\":5,\"name\":\"Aloi\"}"));
}

@Test
public void reactivePersist() {
RestAssured.when()
RestAssured.given().when()
.auth().preemptive().basic("scott", "jb0ss")
.get("/reactivePersist")
.then()
.body(is("Tulip"));
}

@Test
public void reactiveRemoveTransientEntity() {
RestAssured.when()
RestAssured.given().when()
.auth().preemptive().basic("scott", "jb0ss")
.get("/reactiveRemoveTransientEntity")
.then()
.body(is("OK"));
}

@Test
public void reactiveRemoveManagedEntity() {
RestAssured.when()
RestAssured.given().when()
.auth().preemptive().basic("scott", "jb0ss")
.get("/reactiveRemoveManagedEntity")
.then()
.body(is("OK"));
}

@Test
public void reactiveUpdate() {
RestAssured.when()
RestAssured.given().when()
.auth().preemptive().basic("scott", "jb0ss")
.get("/reactiveUpdate")
.then()
.body(is("Tina"));
Expand Down