Skip to content

Commit

Permalink
Merge pull request #19595 from stuartwdouglas/race-fix
Browse files Browse the repository at this point in the history
Fix Resteasy Reactive race
  • Loading branch information
geoand authored Aug 24, 2021
2 parents 08b3740 + 819fd27 commit 5482946
Show file tree
Hide file tree
Showing 3 changed files with 127 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

import io.quarkus.resteasy.reactive.server.test.ExceptionUtil;
import io.quarkus.test.QuarkusUnitTest;
import io.restassured.RestAssured;

Expand All @@ -26,7 +27,7 @@ public class PerClassThrowableExceptionMapperTest {
@Override
public JavaArchive get() {
return ShrinkWrap.create(JavaArchive.class)
.addClasses(HasCustomThrowableHandlerResource.class);
.addClasses(HasCustomThrowableHandlerResource.class, ExceptionUtil.class);
}
});

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
package io.quarkus.resteasy.reactive.server.test.stress;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Consumer;

import javax.ws.rs.GET;
import javax.ws.rs.Path;

import org.hamcrest.Matchers;
import org.jboss.jandex.ClassInfo;
import org.jboss.jandex.MethodInfo;
import org.jboss.resteasy.reactive.common.model.ResourceClass;
import org.jboss.resteasy.reactive.server.core.ResteasyReactiveRequestContext;
import org.jboss.resteasy.reactive.server.model.HandlerChainCustomizer;
import org.jboss.resteasy.reactive.server.model.ServerResourceMethod;
import org.jboss.resteasy.reactive.server.processor.scanning.MethodScanner;
import org.jboss.resteasy.reactive.server.spi.ServerRestHandler;
import org.jboss.shrinkwrap.api.ShrinkWrap;
import org.jboss.shrinkwrap.api.spec.JavaArchive;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

import io.quarkus.builder.BuildChainBuilder;
import io.quarkus.builder.BuildContext;
import io.quarkus.builder.BuildStep;
import io.quarkus.resteasy.reactive.server.spi.MethodScannerBuildItem;
import io.quarkus.test.QuarkusUnitTest;
import io.restassured.RestAssured;

/**
* Tests lots of suspends/resumes per request
*/
public class SuspendResumeStressTest {

private static volatile ExecutorService executorService;

@RegisterExtension
static QuarkusUnitTest test = new QuarkusUnitTest()
.addBuildChainCustomizer(new Consumer<BuildChainBuilder>() {
@Override
public void accept(BuildChainBuilder buildChainBuilder) {
buildChainBuilder.addBuildStep(new BuildStep() {
@Override
public void execute(BuildContext context) {
context.produce(new MethodScannerBuildItem(new MethodScanner() {
@Override
public List<HandlerChainCustomizer> scan(MethodInfo method, ClassInfo actualEndpointClass,
Map<String, Object> methodContext) {
return Collections.singletonList(new Custom());
}
}));
}
}).produces(MethodScannerBuildItem.class).build();
}
})
.setArchiveProducer(() -> ShrinkWrap.create(JavaArchive.class)
.addClass(HelloResource.class));

@Test
public void testSuspendResumeStressTest() {
executorService = Executors.newFixedThreadPool(10);
try {
for (int i = 0; i < 100; ++i) {
RestAssured.when().get("/hello").then().body(Matchers.is("hello"));
}
} finally {
executorService.shutdownNow();
executorService = null;
}
}

@Path("hello")
public static class HelloResource {

@GET
public String hello() {
return "hello";
}

}

public static class Custom implements HandlerChainCustomizer {
@Override
public List<ServerRestHandler> handlers(Phase phase, ResourceClass resourceClass, ServerResourceMethod resourceMethod) {
List<ServerRestHandler> handlers = new ArrayList<>();
for (int i = 0; i < 100; ++i) {
handlers.add(new ResumeHandler());
}
return handlers;
}
}

public static class ResumeHandler implements ServerRestHandler {

@Override
public void handle(ResteasyReactiveRequestContext requestContext) throws Exception {
requestContext.suspend();
executorService.execute(requestContext::resume);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,6 @@ public void run() {
//unless there are pre-mapping filters as these may require CDI
boolean disasociateRequestScope = false;
boolean aborted = false;
Executor exec = null;
try {
while (position < handlers.length) {
int pos = position;
Expand All @@ -152,15 +151,7 @@ public void run() {
requestScopeActivated = false;
requestScopeDeactivated();
}
if (this.executor != null) {
//resume happened in the meantime
suspended = false;
exec = this.executor;
// prevent future suspensions from re-submitting the task
this.executor = null;
return;
} else if (suspended) {
running = false;
if (suspended) {
processingSuspended = true;
return;
}
Expand All @@ -184,22 +175,34 @@ public void run() {
// we need to make sure we don't close the underlying stream in the event loop if the task
// has been offloaded to the executor
if ((position == handlers.length && !processingSuspended) || aborted) {
exec = null;
close();
} else {
if (disasociateRequestScope) {
requestScopeDeactivated();
currentRequestScope.deactivate();
}
beginAsyncProcessing();
}
if (exec != null) {
//outside sync block
exec.execute(this);
} else {
Executor exec = null;
boolean resumed = false;
synchronized (this) {
running = false;
if (this.executor != null) {
//resume happened in the meantime
suspended = false;
exec = this.executor;
// prevent future suspensions from re-submitting the task
this.executor = null;
} else if (!suspended) {
resumed = true;
}
}
if (exec != null) {
//outside sync block
exec.execute(this);
} else if (resumed) {
resume();
}

}
}
}
Expand Down

0 comments on commit 5482946

Please sign in to comment.