Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Minor Resteasy Reactive Servlet fixes #14783

Merged
merged 1 commit into from
Feb 4, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,10 @@ public ArcThreadSetupAction(ManagedContext managedContext) {
public ThreadState activateInitial() {
managedContext.activate();
InjectableContext.ContextState state = managedContext.getState();
return toThreadState(state);
}

private ThreadState toThreadState(InjectableContext.ContextState state) {
return new ThreadState() {
@Override
public void close() {
Expand All @@ -34,4 +38,9 @@ public void deactivate() {
}
};
}

@Override
public ThreadState currentState() {
return toThreadState(managedContext.getState());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import io.quarkus.arc.Arc;
import io.quarkus.arc.impl.LazyValue;
import io.quarkus.resteasy.reactive.server.runtime.ResteasyReactiveSecurityContext;
import io.quarkus.runtime.BlockingOperationControl;
import io.quarkus.security.identity.SecurityIdentity;
import io.quarkus.vertx.http.runtime.security.QuarkusHttpUser;
import io.undertow.server.HttpServerExchange;
Expand All @@ -58,6 +59,7 @@ public class ServletRequestContext extends ResteasyReactiveRequestContext
AsyncContext asyncContext;
ServletWriteListener writeListener;
byte[] asyncWriteData;
boolean closed;
Consumer<Throwable> asyncWriteHandler;
protected Consumer<ResteasyReactiveRequestContext> preCommitTask;

Expand All @@ -81,10 +83,14 @@ protected void beginAsyncProcessing() {
}

@Override
public void close() {
super.close();
if (asyncContext != null) {
asyncContext.complete();
public synchronized void close() {
if (asyncWriteData != null) {
closed = true;
} else {
super.close();
if (asyncContext != null) {
asyncContext.complete();
}
}
}

Expand Down Expand Up @@ -197,7 +203,7 @@ public String getRequestNormalisedPath() {

@Override
public String getRequestAbsoluteUri() {
return request.getRequestURI();
return request.getRequestURL().toString();
}

@Override
Expand Down Expand Up @@ -279,6 +285,15 @@ public InputStream createInputStream(ByteBuffer existingData) {
return new ServletResteasyReactiveInputStream(existingData, request);
}

@Override
public InputStream createInputStream() {
try {
return request.getInputStream();
} catch (IOException e) {
throw new RuntimeException(e);
}
}

@Override
public ServerHttpResponse pauseRequestInput() {
//TODO
Expand Down Expand Up @@ -346,23 +361,31 @@ public boolean headWritten() {

@Override
public ServerHttpResponse end(byte[] data) {
try {
response.getOutputStream().write(data);
response.getOutputStream().close();
} catch (IOException e) {
log.debug("IoException writing response", e);
if (BlockingOperationControl.isBlockingAllowed()) {
try {
response.getOutputStream().write(data);
response.getOutputStream().close();
} catch (IOException e) {
log.debug("IoException writing response", e);
}
} else {
write(data, new Consumer<Throwable>() {
@Override
public void accept(Throwable throwable) {
try {
response.getOutputStream().close();
} catch (IOException e) {
log.debug("IoException writing response", e);
}
}
});
}
return this;
}

@Override
public ServerHttpResponse end(String data) {
try {
response.getOutputStream().write(data.getBytes(StandardCharsets.UTF_8));
response.getOutputStream().close();
} catch (IOException e) {
log.debug("IoException writing response", e);
}
end(data.getBytes(StandardCharsets.UTF_8));
return this;
}

Expand Down Expand Up @@ -422,17 +445,22 @@ public ServerHttpResponse write(byte[] data, Consumer<Throwable> asyncResultHand
asyncResultHandler.accept(e);
}
} else {
if (writeListener == null) {
try {
ServletOutputStream outputStream = response.getOutputStream();
outputStream.setWriteListener(writeListener = new ServletWriteListener(outputStream));
} catch (IOException e) {
asyncResultHandler.accept(e);
synchronized (this) {
if (asyncWriteData != null) {
throw new IllegalStateException("Cannot write more than one piece of async data at a time");
}
} else {
asyncWriteData = data;
asyncWriteHandler = asyncResultHandler;
writeListener.onWritePossible();
if (writeListener == null) {
try {
ServletOutputStream outputStream = response.getOutputStream();
outputStream.setWriteListener(writeListener = new ServletWriteListener(outputStream));
} catch (IOException e) {
asyncResultHandler.accept(e);
}
} else {
writeListener.onWritePossible();
}
}
}
return this;
Expand Down Expand Up @@ -484,29 +512,37 @@ class ServletWriteListener implements WriteListener {
}

@Override
public synchronized void onWritePossible() {
if (!outputStream.isReady()) {
return;
}
Consumer<Throwable> ctx = asyncWriteHandler;
byte[] data = asyncWriteData;
asyncWriteHandler = null;
asyncWriteData = null;
try {
outputStream.write(data);
ctx.accept(null);
} catch (IOException e) {
ctx.accept(e);
public void onWritePossible() {
synchronized (ServletRequestContext.this) {
if (!outputStream.isReady()) {
return;
}
Consumer<Throwable> ctx = asyncWriteHandler;
byte[] data = asyncWriteData;
asyncWriteHandler = null;
asyncWriteData = null;
try {
outputStream.write(data);
ctx.accept(null);
} catch (IOException e) {
ctx.accept(e);
}
if (closed) {
close();
}
}
}

@Override
public synchronized void onError(Throwable t) {
if (asyncWriteHandler != null) {
Consumer<Throwable> ctx = asyncWriteHandler;
asyncWriteHandler = null;
asyncWriteData = null;
ctx.accept(t);
synchronized (ServletRequestContext.this) {
if (asyncWriteHandler != null) {
Consumer<Throwable> ctx = asyncWriteHandler;
asyncWriteHandler = null;
asyncWriteData = null;
ctx.accept(t);
close();
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,4 +26,9 @@ public ResteasyReactiveRequestContext createContext(Deployment deployment, Provi
(HttpServletResponse) src.getServletResponse(), requestContext, handlerChain, abortHandlerChain,
(RoutingContext) ((VertxHttpExchange) src.getExchange().getDelegate()).getContext(), src.getExchange());
}

@Override
public boolean isDefaultBlocking() {
return true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,12 @@ public void handle(ResteasyReactiveRequestContext context) throws Exception {
context.serverRequest().getRequestMethod().equals(HttpMethod.HEAD)) {
return;
}

VertxResteasyReactiveRequestContext vertxContext = (VertxResteasyReactiveRequestContext) context;
RoutingContext routingContext = vertxContext.getContext();
vertxContext.setInputStream(new VertxInputStream(routingContext, timeout));
if (context instanceof VertxResteasyReactiveRequestContext) {
//TODO: this should not be installed for servlet
VertxResteasyReactiveRequestContext vertxContext = (VertxResteasyReactiveRequestContext) context;
RoutingContext routingContext = vertxContext.getContext();
vertxContext.setInputStream(new VertxInputStream(routingContext, timeout));
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ public ResteasyReactiveRequestContext createContext(Deployment deployment,
handlerChain,
abortHandlerChain, currentIdentityAssociation);
}

};
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,9 @@ public void run() {
}
requestScopeActivated = false;
}
} else {
requestScopeActivated = false;
requestScopeDeactivated();
}
if (this.executor != null) {
//resume happened in the meantime
Expand Down Expand Up @@ -155,8 +158,8 @@ public void run() {
close();
} else {
if (disasociateRequestScope) {
currentRequestScope.deactivate();
requestScopeDeactivated();
currentRequestScope.deactivate();
}
beginAsyncProcessing();
}
Expand All @@ -179,12 +182,16 @@ public void requireCDIRequestScope() {
return;
}
requestScopeActivated = true;
if (currentRequestScope == null) {
currentRequestScope = requestContext.activateInitial();
handleRequestScopeActivation();
if (isRequestScopeManagementRequired()) {
if (currentRequestScope == null) {
currentRequestScope = requestContext.activateInitial();
} else {
currentRequestScope.activate();
}
} else {
currentRequestScope.activate();
currentRequestScope = requestContext.currentState();
}
handleRequestScopeActivation();
}

protected abstract void handleRequestScopeActivation();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ public interface ThreadSetupAction {

ThreadState activateInitial();

ThreadState currentState();

interface ThreadState {
void close();

Expand Down Expand Up @@ -32,5 +34,10 @@ public void deactivate() {
}
};
}

@Override
public ThreadState currentState() {
return activateInitial();
}
};
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,4 +9,12 @@ public interface RequestContextFactory {
ResteasyReactiveRequestContext createContext(Deployment deployment, ProvidersImpl providers,
Object context,
ThreadSetupAction requestContext, ServerRestHandler[] handlerChain, ServerRestHandler[] abortHandlerChain);

/**
* @return <code>true</code> if requests default to blocking when created by this factory
*/
default boolean isDefaultBlocking() {
return false;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import javax.ws.rs.ext.ReaderInterceptor;
import javax.ws.rs.ext.WriterInterceptor;
import org.jboss.resteasy.reactive.common.core.AbstractResteasyReactiveContext;
import org.jboss.resteasy.reactive.common.util.EmptyInputStream;
import org.jboss.resteasy.reactive.common.util.Encode;
import org.jboss.resteasy.reactive.common.util.PathSegmentImpl;
import org.jboss.resteasy.reactive.server.core.serialization.EntityWriter;
Expand Down Expand Up @@ -128,7 +127,7 @@ public abstract class ResteasyReactiveRequestContext
/**
* The input stream, if an entity is present.
*/
private InputStream inputStream = EmptyInputStream.INSTANCE;
private InputStream inputStream;

/**
* used for {@link UriInfo#getMatchedURIs()}
Expand Down Expand Up @@ -634,6 +633,7 @@ protected void handleRequestScopeActivation() {

@Override
protected void requestScopeDeactivated() {
CurrentRequestManager.set(null);
}

@Override
Expand Down Expand Up @@ -698,7 +698,14 @@ public List<UriMatch> getMatchedURIs() {
return matchedURIs;
}

public boolean hasInputStream() {
return inputStream != null;
}

public InputStream getInputStream() {
if (inputStream == null) {
inputStream = serverRequest().createInputStream();
}
return inputStream;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ public BeanFactory.BeanInstance<?> apply(Class<?> aClass) {
});
RuntimeResourceDeployment runtimeResourceDeployment = new RuntimeResourceDeployment(info, executorSupplier,
blockingInputHandlerSupplier,
interceptorDeployment, dynamicEntityWriter, resourceLocatorHandler);
interceptorDeployment, dynamicEntityWriter, resourceLocatorHandler, requestContextFactory.isDefaultBlocking());
List<ResourceClass> possibleSubResource = new ArrayList<>(locatableResourceClasses);
possibleSubResource.addAll(resourceClasses); //the TCK uses normal resources also as sub resources
for (ResourceClass clazz : possibleSubResource) {
Expand Down
Loading