Skip to content

Commit

Permalink
Merge pull request quarkusio#17938 from stuartwdouglas/rr-servlet
Browse files Browse the repository at this point in the history
Resteasy Reactive servlet fixes
  • Loading branch information
stuartwdouglas authored Jun 16, 2021
2 parents 4478b87 + cc6605b commit 805aa6c
Show file tree
Hide file tree
Showing 2 changed files with 95 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@

import javax.enterprise.event.Event;
import javax.servlet.AsyncContext;
import javax.servlet.ReadListener;
import javax.servlet.ServletInputStream;
import javax.servlet.ServletOutputStream;
import javax.servlet.WriteListener;
import javax.servlet.http.HttpServletRequest;
Expand Down Expand Up @@ -58,6 +60,7 @@ public class ServletRequestContext extends ResteasyReactiveRequestContext
final HttpServletResponse response;
AsyncContext asyncContext;
ServletWriteListener writeListener;
ServletReadListener readListener;
byte[] asyncWriteData;
boolean closed;
Consumer<Throwable> asyncWriteHandler;
Expand Down Expand Up @@ -203,7 +206,11 @@ public String getRequestNormalisedPath() {

@Override
public String getRequestAbsoluteUri() {
return request.getRequestURL().toString();
if (request.getQueryString() == null) {
return request.getRequestURL().toString();
} else {
return request.getRequestURL().append("?").append(request.getQueryString()).toString();
}
}

@Override
Expand Down Expand Up @@ -276,20 +283,17 @@ public ServerHttpResponse pauseRequestInput() {

@Override
public ServerHttpResponse resumeRequestInput() {
//TODO
return this;
}

@Override
public ServerHttpResponse setReadListener(ReadCallback callback) {
byte[] buf = new byte[1024];
int r;
try {
InputStream in = request.getInputStream();
while ((r = in.read(buf)) > 0) {
callback.data(ByteBuffer.wrap(buf, 0, r));
ServletInputStream in = request.getInputStream();
if (!request.isAsyncStarted()) {
request.startAsync();
}
callback.done();
in.setReadListener(new ServletReadListener(in, callback));
} catch (IOException e) {
resume(e);
}
Expand Down Expand Up @@ -521,6 +525,88 @@ public synchronized void onError(Throwable t) {
}
}

class ServletReadListener implements ReadListener {

final ServletInputStream inputStream;
final ReadCallback readCallback;
boolean paused;
boolean allDone;
Throwable problem;

ServletReadListener(ServletInputStream inputStream, ReadCallback readCallback) {
this.inputStream = inputStream;
this.readCallback = readCallback;
}

@Override
public void onDataAvailable() throws IOException {
synchronized (this) {
if (paused) {
return;
}
}
doRead();

}

private void doRead() {
if (inputStream.isReady()) {
byte[] buf = new byte[1024];
try {
int r = inputStream.read(buf);
readCallback.data(ByteBuffer.wrap(buf, 0, r));
} catch (IOException e) {
ServletRequestContext.this.resume(problem);
}
}
}

synchronized void pause() {
paused = true;
}

void resume() {
boolean allDone;
Throwable problem;
synchronized (this) {
paused = false;
allDone = this.allDone;
this.allDone = false;
problem = this.problem;
this.problem = null;
}
if (problem != null) {
ServletRequestContext.this.resume(problem);
} else if (allDone) {
readCallback.done();
} else {
doRead();
}
}

@Override
public void onAllDataRead() throws IOException {
synchronized (this) {
if (paused) {
allDone = true;
return;
}
}
readCallback.done();
}

@Override
public void onError(Throwable t) {
synchronized (this) {
if (paused) {
problem = t;
return;
}
}
ServletRequestContext.this.resume(t);
}
}

static final class MapEntry<K, V> implements Map.Entry<K, V> {

private final K key;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,5 +14,6 @@ public SseResponseWriterHandler() {
@Override
public void handle(ResteasyReactiveRequestContext requestContext) throws Exception {
requestContext.getSseEventSink().sendInitialResponse(requestContext.serverResponse());
requestContext.suspend();
}
}

0 comments on commit 805aa6c

Please sign in to comment.