Skip to content

Commit

Permalink
Minor Resteast Reactive Servlet fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
stuartwdouglas committed Feb 4, 2021
1 parent 467a587 commit ebd18c8
Show file tree
Hide file tree
Showing 14 changed files with 169 additions and 68 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,10 @@ public ArcThreadSetupAction(ManagedContext managedContext) {
public ThreadState activateInitial() {
managedContext.activate();
InjectableContext.ContextState state = managedContext.getState();
return toThreadState(state);
}

private ThreadState toThreadState(InjectableContext.ContextState state) {
return new ThreadState() {
@Override
public void close() {
Expand All @@ -34,4 +38,9 @@ public void deactivate() {
}
};
}

@Override
public ThreadState currentState() {
return toThreadState(managedContext.getState());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import io.quarkus.arc.Arc;
import io.quarkus.arc.impl.LazyValue;
import io.quarkus.resteasy.reactive.server.runtime.ResteasyReactiveSecurityContext;
import io.quarkus.runtime.BlockingOperationControl;
import io.quarkus.security.identity.SecurityIdentity;
import io.quarkus.vertx.http.runtime.security.QuarkusHttpUser;
import io.undertow.server.HttpServerExchange;
Expand All @@ -58,6 +59,7 @@ public class ServletRequestContext extends ResteasyReactiveRequestContext
AsyncContext asyncContext;
ServletWriteListener writeListener;
byte[] asyncWriteData;
boolean closed;
Consumer<Throwable> asyncWriteHandler;
protected Consumer<ResteasyReactiveRequestContext> preCommitTask;

Expand All @@ -81,10 +83,14 @@ protected void beginAsyncProcessing() {
}

@Override
public void close() {
super.close();
if (asyncContext != null) {
asyncContext.complete();
public synchronized void close() {
if (asyncWriteData != null) {
closed = true;
} else {
super.close();
if (asyncContext != null) {
asyncContext.complete();
}
}
}

Expand Down Expand Up @@ -197,7 +203,7 @@ public String getRequestNormalisedPath() {

@Override
public String getRequestAbsoluteUri() {
return request.getRequestURI();
return request.getRequestURL().toString();
}

@Override
Expand Down Expand Up @@ -279,6 +285,15 @@ public InputStream createInputStream(ByteBuffer existingData) {
return new ServletResteasyReactiveInputStream(existingData, request);
}

@Override
public InputStream createInputStream() {
try {
return request.getInputStream();
} catch (IOException e) {
throw new RuntimeException(e);
}
}

@Override
public ServerHttpResponse pauseRequestInput() {
//TODO
Expand Down Expand Up @@ -346,23 +361,31 @@ public boolean headWritten() {

@Override
public ServerHttpResponse end(byte[] data) {
try {
response.getOutputStream().write(data);
response.getOutputStream().close();
} catch (IOException e) {
log.debug("IoException writing response", e);
if (BlockingOperationControl.isBlockingAllowed()) {
try {
response.getOutputStream().write(data);
response.getOutputStream().close();
} catch (IOException e) {
log.debug("IoException writing response", e);
}
} else {
write(data, new Consumer<Throwable>() {
@Override
public void accept(Throwable throwable) {
try {
response.getOutputStream().close();
} catch (IOException e) {
log.debug("IoException writing response", e);
}
}
});
}
return this;
}

@Override
public ServerHttpResponse end(String data) {
try {
response.getOutputStream().write(data.getBytes(StandardCharsets.UTF_8));
response.getOutputStream().close();
} catch (IOException e) {
log.debug("IoException writing response", e);
}
end(data.getBytes(StandardCharsets.UTF_8));
return this;
}

Expand Down Expand Up @@ -422,17 +445,22 @@ public ServerHttpResponse write(byte[] data, Consumer<Throwable> asyncResultHand
asyncResultHandler.accept(e);
}
} else {
if (writeListener == null) {
try {
ServletOutputStream outputStream = response.getOutputStream();
outputStream.setWriteListener(writeListener = new ServletWriteListener(outputStream));
} catch (IOException e) {
asyncResultHandler.accept(e);
synchronized (this) {
if (asyncWriteData != null) {
throw new IllegalStateException("Cannot write more than one piece of async data at a time");
}
} else {
asyncWriteData = data;
asyncWriteHandler = asyncResultHandler;
writeListener.onWritePossible();
if (writeListener == null) {
try {
ServletOutputStream outputStream = response.getOutputStream();
outputStream.setWriteListener(writeListener = new ServletWriteListener(outputStream));
} catch (IOException e) {
asyncResultHandler.accept(e);
}
} else {
writeListener.onWritePossible();
}
}
}
return this;
Expand Down Expand Up @@ -484,29 +512,37 @@ class ServletWriteListener implements WriteListener {
}

@Override
public synchronized void onWritePossible() {
if (!outputStream.isReady()) {
return;
}
Consumer<Throwable> ctx = asyncWriteHandler;
byte[] data = asyncWriteData;
asyncWriteHandler = null;
asyncWriteData = null;
try {
outputStream.write(data);
ctx.accept(null);
} catch (IOException e) {
ctx.accept(e);
public void onWritePossible() {
synchronized (ServletRequestContext.this) {
if (!outputStream.isReady()) {
return;
}
Consumer<Throwable> ctx = asyncWriteHandler;
byte[] data = asyncWriteData;
asyncWriteHandler = null;
asyncWriteData = null;
try {
outputStream.write(data);
ctx.accept(null);
} catch (IOException e) {
ctx.accept(e);
}
if (closed) {
close();
}
}
}

@Override
public synchronized void onError(Throwable t) {
if (asyncWriteHandler != null) {
Consumer<Throwable> ctx = asyncWriteHandler;
asyncWriteHandler = null;
asyncWriteData = null;
ctx.accept(t);
synchronized (ServletRequestContext.this) {
if (asyncWriteHandler != null) {
Consumer<Throwable> ctx = asyncWriteHandler;
asyncWriteHandler = null;
asyncWriteData = null;
ctx.accept(t);
close();
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,4 +26,9 @@ public ResteasyReactiveRequestContext createContext(Deployment deployment, Provi
(HttpServletResponse) src.getServletResponse(), requestContext, handlerChain, abortHandlerChain,
(RoutingContext) ((VertxHttpExchange) src.getExchange().getDelegate()).getContext(), src.getExchange());
}

@Override
public boolean isDefaultBlocking() {
return true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,12 @@ public void handle(ResteasyReactiveRequestContext context) throws Exception {
context.serverRequest().getRequestMethod().equals(HttpMethod.HEAD)) {
return;
}

VertxResteasyReactiveRequestContext vertxContext = (VertxResteasyReactiveRequestContext) context;
RoutingContext routingContext = vertxContext.getContext();
vertxContext.setInputStream(new VertxInputStream(routingContext, timeout));
if (context instanceof VertxResteasyReactiveRequestContext) {
//TODO: this should not be installed for servlet
VertxResteasyReactiveRequestContext vertxContext = (VertxResteasyReactiveRequestContext) context;
RoutingContext routingContext = vertxContext.getContext();
vertxContext.setInputStream(new VertxInputStream(routingContext, timeout));
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ public ResteasyReactiveRequestContext createContext(Deployment deployment,
handlerChain,
abortHandlerChain, currentIdentityAssociation);
}

};
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,9 @@ public void run() {
}
requestScopeActivated = false;
}
} else {
requestScopeActivated = false;
requestScopeDeactivated();
}
if (this.executor != null) {
//resume happened in the meantime
Expand Down Expand Up @@ -179,12 +182,16 @@ public void requireCDIRequestScope() {
return;
}
requestScopeActivated = true;
if (currentRequestScope == null) {
currentRequestScope = requestContext.activateInitial();
handleRequestScopeActivation();
if (isRequestScopeManagementRequired()) {
if (currentRequestScope == null) {
currentRequestScope = requestContext.activateInitial();
} else {
currentRequestScope.activate();
}
} else {
currentRequestScope.activate();
currentRequestScope = requestContext.currentState();
}
handleRequestScopeActivation();
}

protected abstract void handleRequestScopeActivation();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ public interface ThreadSetupAction {

ThreadState activateInitial();

ThreadState currentState();

interface ThreadState {
void close();

Expand Down Expand Up @@ -32,5 +34,10 @@ public void deactivate() {
}
};
}

@Override
public ThreadState currentState() {
return activateInitial();
}
};
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,4 +9,12 @@ public interface RequestContextFactory {
ResteasyReactiveRequestContext createContext(Deployment deployment, ProvidersImpl providers,
Object context,
ThreadSetupAction requestContext, ServerRestHandler[] handlerChain, ServerRestHandler[] abortHandlerChain);

/**
* @return <code>true</code> if requests default to blocking when created by this factory
*/
default boolean isDefaultBlocking() {
return false;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import javax.ws.rs.ext.ReaderInterceptor;
import javax.ws.rs.ext.WriterInterceptor;
import org.jboss.resteasy.reactive.common.core.AbstractResteasyReactiveContext;
import org.jboss.resteasy.reactive.common.util.EmptyInputStream;
import org.jboss.resteasy.reactive.common.util.Encode;
import org.jboss.resteasy.reactive.common.util.PathSegmentImpl;
import org.jboss.resteasy.reactive.server.core.serialization.EntityWriter;
Expand Down Expand Up @@ -128,7 +127,7 @@ public abstract class ResteasyReactiveRequestContext
/**
* The input stream, if an entity is present.
*/
private InputStream inputStream = EmptyInputStream.INSTANCE;
private InputStream inputStream;

/**
* used for {@link UriInfo#getMatchedURIs()}
Expand Down Expand Up @@ -634,6 +633,7 @@ protected void handleRequestScopeActivation() {

@Override
protected void requestScopeDeactivated() {
CurrentRequestManager.set(null);
}

@Override
Expand Down Expand Up @@ -698,7 +698,14 @@ public List<UriMatch> getMatchedURIs() {
return matchedURIs;
}

public boolean hasInputStream() {
return inputStream != null;
}

public InputStream getInputStream() {
if (inputStream == null) {
inputStream = serverRequest().createInputStream();
}
return inputStream;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ public BeanFactory.BeanInstance<?> apply(Class<?> aClass) {
});
RuntimeResourceDeployment runtimeResourceDeployment = new RuntimeResourceDeployment(info, executorSupplier,
blockingInputHandlerSupplier,
interceptorDeployment, dynamicEntityWriter, resourceLocatorHandler);
interceptorDeployment, dynamicEntityWriter, resourceLocatorHandler, requestContextFactory.isDefaultBlocking());
List<ResourceClass> possibleSubResource = new ArrayList<>(locatableResourceClasses);
possibleSubResource.addAll(resourceClasses); //the TCK uses normal resources also as sub resources
for (ResourceClass clazz : possibleSubResource) {
Expand Down
Loading

0 comments on commit ebd18c8

Please sign in to comment.