From cc6605bdf98613a51b4c53103bc58ad763539492 Mon Sep 17 00:00:00 2001 From: Stuart Douglas Date: Wed, 16 Jun 2021 15:32:36 +1000 Subject: [PATCH] resteasy-reactive servlet fixes Fix some issues when running the testsuite against the Servlet module --- .../runtime/ServletRequestContext.java | 102 ++++++++++++++++-- .../handlers/SseResponseWriterHandler.java | 1 + 2 files changed, 95 insertions(+), 8 deletions(-) diff --git a/extensions/resteasy-reactive/quarkus-resteasy-reactive-servlet/runtime/src/main/java/io/quarkus/resteasy/reactive/server/servlet/runtime/ServletRequestContext.java b/extensions/resteasy-reactive/quarkus-resteasy-reactive-servlet/runtime/src/main/java/io/quarkus/resteasy/reactive/server/servlet/runtime/ServletRequestContext.java index cfd0f04f77e51..e68df66ccf8a1 100644 --- a/extensions/resteasy-reactive/quarkus-resteasy-reactive-servlet/runtime/src/main/java/io/quarkus/resteasy/reactive/server/servlet/runtime/ServletRequestContext.java +++ b/extensions/resteasy-reactive/quarkus-resteasy-reactive-servlet/runtime/src/main/java/io/quarkus/resteasy/reactive/server/servlet/runtime/ServletRequestContext.java @@ -18,6 +18,8 @@ import javax.enterprise.event.Event; import javax.servlet.AsyncContext; +import javax.servlet.ReadListener; +import javax.servlet.ServletInputStream; import javax.servlet.ServletOutputStream; import javax.servlet.WriteListener; import javax.servlet.http.HttpServletRequest; @@ -58,6 +60,7 @@ public class ServletRequestContext extends ResteasyReactiveRequestContext final HttpServletResponse response; AsyncContext asyncContext; ServletWriteListener writeListener; + ServletReadListener readListener; byte[] asyncWriteData; boolean closed; Consumer asyncWriteHandler; @@ -203,7 +206,11 @@ public String getRequestNormalisedPath() { @Override public String getRequestAbsoluteUri() { - return request.getRequestURL().toString(); + if (request.getQueryString() == null) { + return request.getRequestURL().toString(); + } else { + return request.getRequestURL().append("?").append(request.getQueryString()).toString(); + } } @Override @@ -276,20 +283,17 @@ public ServerHttpResponse pauseRequestInput() { @Override public ServerHttpResponse resumeRequestInput() { - //TODO return this; } @Override public ServerHttpResponse setReadListener(ReadCallback callback) { - byte[] buf = new byte[1024]; - int r; try { - InputStream in = request.getInputStream(); - while ((r = in.read(buf)) > 0) { - callback.data(ByteBuffer.wrap(buf, 0, r)); + ServletInputStream in = request.getInputStream(); + if (!request.isAsyncStarted()) { + request.startAsync(); } - callback.done(); + in.setReadListener(new ServletReadListener(in, callback)); } catch (IOException e) { resume(e); } @@ -521,6 +525,88 @@ public synchronized void onError(Throwable t) { } } + class ServletReadListener implements ReadListener { + + final ServletInputStream inputStream; + final ReadCallback readCallback; + boolean paused; + boolean allDone; + Throwable problem; + + ServletReadListener(ServletInputStream inputStream, ReadCallback readCallback) { + this.inputStream = inputStream; + this.readCallback = readCallback; + } + + @Override + public void onDataAvailable() throws IOException { + synchronized (this) { + if (paused) { + return; + } + } + doRead(); + + } + + private void doRead() { + if (inputStream.isReady()) { + byte[] buf = new byte[1024]; + try { + int r = inputStream.read(buf); + readCallback.data(ByteBuffer.wrap(buf, 0, r)); + } catch (IOException e) { + ServletRequestContext.this.resume(problem); + } + } + } + + synchronized void pause() { + paused = true; + } + + void resume() { + boolean allDone; + Throwable problem; + synchronized (this) { + paused = false; + allDone = this.allDone; + this.allDone = false; + problem = this.problem; + this.problem = null; + } + if (problem != null) { + ServletRequestContext.this.resume(problem); + } else if (allDone) { + readCallback.done(); + } else { + doRead(); + } + } + + @Override + public void onAllDataRead() throws IOException { + synchronized (this) { + if (paused) { + allDone = true; + return; + } + } + readCallback.done(); + } + + @Override + public void onError(Throwable t) { + synchronized (this) { + if (paused) { + problem = t; + return; + } + } + ServletRequestContext.this.resume(t); + } + } + static final class MapEntry implements Map.Entry { private final K key; diff --git a/independent-projects/resteasy-reactive/server/runtime/src/main/java/org/jboss/resteasy/reactive/server/handlers/SseResponseWriterHandler.java b/independent-projects/resteasy-reactive/server/runtime/src/main/java/org/jboss/resteasy/reactive/server/handlers/SseResponseWriterHandler.java index c1a2526f12224..3352297a708ba 100644 --- a/independent-projects/resteasy-reactive/server/runtime/src/main/java/org/jboss/resteasy/reactive/server/handlers/SseResponseWriterHandler.java +++ b/independent-projects/resteasy-reactive/server/runtime/src/main/java/org/jboss/resteasy/reactive/server/handlers/SseResponseWriterHandler.java @@ -14,5 +14,6 @@ public SseResponseWriterHandler() { @Override public void handle(ResteasyReactiveRequestContext requestContext) throws Exception { requestContext.getSseEventSink().sendInitialResponse(requestContext.serverResponse()); + requestContext.suspend(); } }