Skip to content

Commit

Permalink
Merge pull request quarkusio#14880 from geoand/quarkusio#14875
Browse files Browse the repository at this point in the history
Ensure that resume from a worker pool thread doesn't resume on event loop
  • Loading branch information
stuartwdouglas authored Feb 7, 2021
2 parents 9560207 + b143d47 commit 4155282
Show file tree
Hide file tree
Showing 3 changed files with 29 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@
import org.jboss.resteasy.reactive.server.spi.ServerRequestContext;
import org.junit.jupiter.api.Assertions;

import io.quarkus.runtime.BlockingOperationControl;
import io.smallrye.common.annotation.Blocking;
import io.vertx.core.http.HttpServerRequest;
import io.vertx.core.http.HttpServerResponse;

Expand All @@ -54,6 +56,16 @@ public String params(@RestPath String p,
+ q2.orElse("empty") + ", q3: " + q3.orElse(-1);
}

@Blocking
@POST
@Path("form-blocking")
public String formBlocking(@RestForm String f) {
if (!BlockingOperationControl.isBlockingAllowed()) {
throw new RuntimeException("should not have dispatched");
}
return f;
}

@GET
@Path("context")
public String context(// Spec:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -390,6 +390,12 @@ public void testNewParams() {
.then()
.log().ifError()
.body(Matchers.equalTo("data:OK\n\n"));
RestAssured.with()
.urlEncodingEnabled(false)
.formParam("f", "fv")
.post("/new-params/myklass;m=mv/myregex/form-blocking")
.then()
.body(Matchers.equalTo("fv"));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@ public abstract class AbstractResteasyReactiveContext<T extends AbstractResteasy
private boolean suspended = false;
private volatile boolean requestScopeActivated = false;
private volatile boolean running = false;
private volatile Executor executor;
private volatile Executor executor; // ephemerally set by handlers to signal that we resume, it needs to be on this executor
private volatile Executor lastExecutor; // contains the last executor which was provided during resume - needed to submit there if suspended again
private Map<String, Object> properties;
private final ThreadSetupAction requestContext;
private ThreadSetupAction.ThreadState currentRequestScope;
Expand Down Expand Up @@ -55,11 +56,19 @@ public synchronized void resume(Executor executor) {
this.executor = executor;
if (executor == null) {
suspended = false;
} else {
this.lastExecutor = executor;
}
} else {
suspended = false;
if (executor == null) {
getEventLoop().execute(this);
if (lastExecutor == null) {
getEventLoop().execute(this);
} else {
// we need to do this to ensure that if we suspended while not on the event-loop,
// that we come back on a thread from this executor
lastExecutor.execute(this);
}
} else {
executor.execute(this);
}
Expand Down

0 comments on commit 4155282

Please sign in to comment.