From 77ed2d93ed8502546457fc6d388d484e9ff5b702 Mon Sep 17 00:00:00 2001 From: Georgios Andrianakis Date: Mon, 29 Jan 2024 10:52:19 +0200 Subject: [PATCH] Fix request hanging condition This condition could happen when a TLS secured connection had a lot of data written to it very fast from an event loop thread. The problem was that there was a race condition on the registration of the drain handler, which then resulted in the pending data never being written out. The fix is to always register the drain handler Resolves: #37807 --- .../vertx/ResteasyReactiveOutputStream.java | 68 +++++++++++-------- 1 file changed, 39 insertions(+), 29 deletions(-) diff --git a/independent-projects/resteasy-reactive/server/vertx/src/main/java/org/jboss/resteasy/reactive/server/vertx/ResteasyReactiveOutputStream.java b/independent-projects/resteasy-reactive/server/vertx/src/main/java/org/jboss/resteasy/reactive/server/vertx/ResteasyReactiveOutputStream.java index 6a2fcf0a3ea95..a0135f6876a77 100644 --- a/independent-projects/resteasy-reactive/server/vertx/src/main/java/org/jboss/resteasy/reactive/server/vertx/ResteasyReactiveOutputStream.java +++ b/independent-projects/resteasy-reactive/server/vertx/src/main/java/org/jboss/resteasy/reactive/server/vertx/ResteasyReactiveOutputStream.java @@ -57,6 +57,9 @@ public void handle(Throwable event) { } } }); + Handler handler = new DrainHandler(this); + request.response().drainHandler(handler); + request.response().closeHandler(handler); context.getContext().addEndHandler(new Handler>() { @Override @@ -90,7 +93,7 @@ public void write(ByteBuf data, boolean last) throws IOException { boolean bufferRequired = awaitWriteable() || (overflow != null && overflow.size() > 0); if (bufferRequired) { //just buffer everything - registerDrainHandler(); + // registerDrainHandler(); if (overflow == null) { overflow = new ByteArrayOutputStream(); } @@ -135,7 +138,7 @@ private boolean awaitWriteable() throws IOException { if (request.response().closed()) { throw new IOException("Connection has been closed"); } - registerDrainHandler(); + // registerDrainHandler(); try { waitingForDrain = true; request.connection().wait(); @@ -148,33 +151,14 @@ private boolean awaitWriteable() throws IOException { return false; } - private void registerDrainHandler() { - if (!drainHandlerRegistered) { - drainHandlerRegistered = true; - Handler handler = new Handler() { - @Override - public void handle(Void event) { - synchronized (request.connection()) { - if (waitingForDrain) { - request.connection().notifyAll(); - } - if (overflow != null) { - if (overflow.size() > 0) { - if (closed) { - request.response().end(Buffer.buffer(overflow.toByteArray()), null); - } else { - request.response().write(Buffer.buffer(overflow.toByteArray()), null); - } - overflow.reset(); - } - } - } - } - }; - request.response().drainHandler(handler); - request.response().closeHandler(handler); - } - } + // private void registerDrainHandler() { + // if (!drainHandlerRegistered) { + // drainHandlerRegistered = true; + // Handler handler = new DrainHandler(this); + // request.response().drainHandler(handler); + // request.response().closeHandler(handler); + // } + // } /** * {@inheritDoc} @@ -299,4 +283,30 @@ public void close() throws IOException { } } + private static class DrainHandler implements Handler { + private final ResteasyReactiveOutputStream out; + + public DrainHandler(ResteasyReactiveOutputStream out) { + this.out = out; + } + + @Override + public void handle(Void event) { + synchronized (out.request.connection()) { + if (out.waitingForDrain) { + out.request.connection().notifyAll(); + } + if (out.overflow != null) { + if (out.overflow.size() > 0) { + if (out.closed) { + out.request.response().end(Buffer.buffer(out.overflow.toByteArray()), null); + } else { + out.request.response().write(Buffer.buffer(out.overflow.toByteArray()), null); + } + out.overflow.reset(); + } + } + } + } + } }