From 57fc33328bd4093fea8eab1a8789ad9759147c82 Mon Sep 17 00:00:00 2001 From: Georgios Andrianakis Date: Mon, 12 Apr 2021 12:50:49 +0300 Subject: [PATCH 1/2] Fix race condition on reactive client with streaming and SSE responses Fixes: #16227 --- .../reactive/server/test/stream/StreamTestCase.java | 3 --- .../resteasy/reactive/client/impl/MultiInvoker.java | 9 +++------ .../reactive/client/impl/SseEventSourceImpl.java | 6 +++--- 3 files changed, 6 insertions(+), 12 deletions(-) diff --git a/extensions/resteasy-reactive/quarkus-resteasy-reactive/deployment/src/test/java/io/quarkus/resteasy/reactive/server/test/stream/StreamTestCase.java b/extensions/resteasy-reactive/quarkus-resteasy-reactive/deployment/src/test/java/io/quarkus/resteasy/reactive/server/test/stream/StreamTestCase.java index d0bee34930599..a4a207511708c 100644 --- a/extensions/resteasy-reactive/quarkus-resteasy-reactive/deployment/src/test/java/io/quarkus/resteasy/reactive/server/test/stream/StreamTestCase.java +++ b/extensions/resteasy-reactive/quarkus-resteasy-reactive/deployment/src/test/java/io/quarkus/resteasy/reactive/server/test/stream/StreamTestCase.java @@ -18,7 +18,6 @@ import org.jboss.shrinkwrap.api.ShrinkWrap; import org.jboss.shrinkwrap.api.spec.JavaArchive; import org.junit.jupiter.api.Assertions; -import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.condition.DisabledOnOs; import org.junit.jupiter.api.condition.OS; @@ -89,7 +88,6 @@ public void testStreaming() throws Exception { } @Test - @Disabled("https://github.com/quarkusio/quarkus/issues/16227") public void testClientStreaming() throws Exception { Client client = ClientBuilder.newBuilder().build(); WebTarget target = client.target(uri.toString() + "stream/text/stream"); @@ -141,7 +139,6 @@ public void testInfiniteStreamClosedByClientAfterRegistration() throws Exception } @Test - @Disabled("https://github.com/quarkusio/quarkus/issues/16227") public void testSse() throws InterruptedException { Client client = ClientBuilder.newBuilder().build(); WebTarget target = client.target(uri.toString() + "stream/sse"); diff --git a/independent-projects/resteasy-reactive/client/runtime/src/main/java/org/jboss/resteasy/reactive/client/impl/MultiInvoker.java b/independent-projects/resteasy-reactive/client/runtime/src/main/java/org/jboss/resteasy/reactive/client/impl/MultiInvoker.java index 192e11eeea04d..e7fcd50e42e4f 100644 --- a/independent-projects/resteasy-reactive/client/runtime/src/main/java/org/jboss/resteasy/reactive/client/impl/MultiInvoker.java +++ b/independent-projects/resteasy-reactive/client/runtime/src/main/java/org/jboss/resteasy/reactive/client/impl/MultiInvoker.java @@ -5,7 +5,6 @@ import io.vertx.core.Handler; import io.vertx.core.buffer.Buffer; import io.vertx.core.http.HttpClientResponse; -import io.vertx.core.http.HttpConnection; import io.vertx.core.net.impl.ConnectionBase; import java.io.ByteArrayInputStream; import java.util.concurrent.TimeUnit; @@ -155,11 +154,9 @@ private void registerForChunks(MultiRequest multiRequest, multiRequest.emitter.fail(t); } }); - HttpConnection connection = vertxClientResponse.request().connection(); - // this captures the server closing - connection.closeHandler(v -> { - multiRequest.emitter.complete(); - }); + // we don't add a closeHandler handler on the connection as it can race with this handler + // and close before the emitter emits anything + // see: https://github.com/quarkusio/quarkus/pull/16438 vertxClientResponse.handler(new Handler() { @Override public void handle(Buffer buffer) { diff --git a/independent-projects/resteasy-reactive/client/runtime/src/main/java/org/jboss/resteasy/reactive/client/impl/SseEventSourceImpl.java b/independent-projects/resteasy-reactive/client/runtime/src/main/java/org/jboss/resteasy/reactive/client/impl/SseEventSourceImpl.java index 98f3b5bc71dce..3c0732f04f021 100644 --- a/independent-projects/resteasy-reactive/client/runtime/src/main/java/org/jboss/resteasy/reactive/client/impl/SseEventSourceImpl.java +++ b/independent-projects/resteasy-reactive/client/runtime/src/main/java/org/jboss/resteasy/reactive/client/impl/SseEventSourceImpl.java @@ -128,11 +128,11 @@ private void registerOnClient(HttpClientResponse vertxClientResponse) { // that is set in ClientSendRequestHandler vertxClientResponse.request().exceptionHandler(null); connection = vertxClientResponse.request().connection(); - connection.closeHandler(v -> { - close(true); - }); String sseContentTypeHeader = vertxClientResponse.getHeader(CommonSseUtil.SSE_CONTENT_TYPE); sseParser.setSseContentTypeHeader(sseContentTypeHeader); + // we don't add a closeHandler handler on the connection as it can race with this handler + // and close before the emitter emits anything + // see: https://github.com/quarkusio/quarkus/pull/16438 vertxClientResponse.handler(sseParser); vertxClientResponse.endHandler(v -> { close(true); From 633605394e50b432a4eefaa9f90f2ac39162a2f2 Mon Sep 17 00:00:00 2001 From: Georgios Andrianakis Date: Mon, 12 Apr 2021 12:52:01 +0300 Subject: [PATCH 2/2] Apply minor polish to reactive client classes --- .../resteasy/reactive/client/impl/MultiInvoker.java | 4 ++-- .../reactive/client/impl/SseEventSourceImpl.java | 10 +++++----- 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/independent-projects/resteasy-reactive/client/runtime/src/main/java/org/jboss/resteasy/reactive/client/impl/MultiInvoker.java b/independent-projects/resteasy-reactive/client/runtime/src/main/java/org/jboss/resteasy/reactive/client/impl/MultiInvoker.java index e7fcd50e42e4f..b6cd2aabd79b5 100644 --- a/independent-projects/resteasy-reactive/client/runtime/src/main/java/org/jboss/resteasy/reactive/client/impl/MultiInvoker.java +++ b/independent-projects/resteasy-reactive/client/runtime/src/main/java/org/jboss/resteasy/reactive/client/impl/MultiInvoker.java @@ -17,7 +17,7 @@ public class MultiInvoker extends AbstractRxInvoker> { - private WebTargetImpl target; + private final WebTargetImpl target; public MultiInvoker(WebTargetImpl target) { this.target = target; @@ -44,7 +44,7 @@ static class MultiRequest { private final AtomicReference onCancel = new AtomicReference<>(); - private MultiEmitter emitter; + private final MultiEmitter emitter; private static final Runnable CLEARED = () -> { }; diff --git a/independent-projects/resteasy-reactive/client/runtime/src/main/java/org/jboss/resteasy/reactive/client/impl/SseEventSourceImpl.java b/independent-projects/resteasy-reactive/client/runtime/src/main/java/org/jboss/resteasy/reactive/client/impl/SseEventSourceImpl.java index 3c0732f04f021..87f525ff3a5be 100644 --- a/independent-projects/resteasy-reactive/client/runtime/src/main/java/org/jboss/resteasy/reactive/client/impl/SseEventSourceImpl.java +++ b/independent-projects/resteasy-reactive/client/runtime/src/main/java/org/jboss/resteasy/reactive/client/impl/SseEventSourceImpl.java @@ -21,17 +21,17 @@ public class SseEventSourceImpl implements SseEventSource, Handler { private TimeUnit reconnectUnit; private long reconnectDelay; - private WebTargetImpl webTarget; + private final WebTargetImpl webTarget; // this tracks user request to open/close private volatile boolean isOpen; // this tracks whether we have a connection open private volatile boolean isInProgress; - private List> consumers = new ArrayList<>(); - private List> errorListeners = new ArrayList<>(); - private List completionListeners = new ArrayList<>(); + private final List> consumers = new ArrayList<>(); + private final List> errorListeners = new ArrayList<>(); + private final List completionListeners = new ArrayList<>(); private HttpConnection connection; - private SseParser sseParser; + private final SseParser sseParser; private long timerId = -1; private boolean receivedClientClose;