Skip to content

Commit

Permalink
Merge pull request #16438 from geoand/#16227
Browse files Browse the repository at this point in the history
Fix race condition on reactive client with streaming and SSE responses
  • Loading branch information
gastaldi authored Apr 13, 2021
2 parents 9d5f8d5 + 6336053 commit eb169e8
Show file tree
Hide file tree
Showing 3 changed files with 13 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
import org.jboss.shrinkwrap.api.ShrinkWrap;
import org.jboss.shrinkwrap.api.spec.JavaArchive;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.condition.DisabledOnOs;
import org.junit.jupiter.api.condition.OS;
Expand Down Expand Up @@ -89,7 +88,6 @@ public void testStreaming() throws Exception {
}

@Test
@Disabled("https://github.com/quarkusio/quarkus/issues/16227")
public void testClientStreaming() throws Exception {
Client client = ClientBuilder.newBuilder().build();
WebTarget target = client.target(uri.toString() + "stream/text/stream");
Expand Down Expand Up @@ -141,7 +139,6 @@ public void testInfiniteStreamClosedByClientAfterRegistration() throws Exception
}

@Test
@Disabled("https://github.com/quarkusio/quarkus/issues/16227")
public void testSse() throws InterruptedException {
Client client = ClientBuilder.newBuilder().build();
WebTarget target = client.target(uri.toString() + "stream/sse");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
import io.vertx.core.Handler;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.http.HttpClientResponse;
import io.vertx.core.http.HttpConnection;
import io.vertx.core.net.impl.ConnectionBase;
import java.io.ByteArrayInputStream;
import java.util.concurrent.TimeUnit;
Expand All @@ -18,7 +17,7 @@

public class MultiInvoker extends AbstractRxInvoker<Multi<?>> {

private WebTargetImpl target;
private final WebTargetImpl target;

public MultiInvoker(WebTargetImpl target) {
this.target = target;
Expand All @@ -45,7 +44,7 @@ static class MultiRequest<R> {

private final AtomicReference<Runnable> onCancel = new AtomicReference<>();

private MultiEmitter<? super R> emitter;
private final MultiEmitter<? super R> emitter;

private static final Runnable CLEARED = () -> {
};
Expand Down Expand Up @@ -155,11 +154,9 @@ private <R> void registerForChunks(MultiRequest<? super R> multiRequest,
multiRequest.emitter.fail(t);
}
});
HttpConnection connection = vertxClientResponse.request().connection();
// this captures the server closing
connection.closeHandler(v -> {
multiRequest.emitter.complete();
});
// we don't add a closeHandler handler on the connection as it can race with this handler
// and close before the emitter emits anything
// see: https://github.com/quarkusio/quarkus/pull/16438
vertxClientResponse.handler(new Handler<Buffer>() {
@Override
public void handle(Buffer buffer) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,17 +21,17 @@ public class SseEventSourceImpl implements SseEventSource, Handler<Long> {
private TimeUnit reconnectUnit;
private long reconnectDelay;

private WebTargetImpl webTarget;
private final WebTargetImpl webTarget;
// this tracks user request to open/close
private volatile boolean isOpen;
// this tracks whether we have a connection open
private volatile boolean isInProgress;

private List<Consumer<InboundSseEvent>> consumers = new ArrayList<>();
private List<Consumer<Throwable>> errorListeners = new ArrayList<>();
private List<Runnable> completionListeners = new ArrayList<>();
private final List<Consumer<InboundSseEvent>> consumers = new ArrayList<>();
private final List<Consumer<Throwable>> errorListeners = new ArrayList<>();
private final List<Runnable> completionListeners = new ArrayList<>();
private HttpConnection connection;
private SseParser sseParser;
private final SseParser sseParser;
private long timerId = -1;
private boolean receivedClientClose;

Expand Down Expand Up @@ -128,11 +128,11 @@ private void registerOnClient(HttpClientResponse vertxClientResponse) {
// that is set in ClientSendRequestHandler
vertxClientResponse.request().exceptionHandler(null);
connection = vertxClientResponse.request().connection();
connection.closeHandler(v -> {
close(true);
});
String sseContentTypeHeader = vertxClientResponse.getHeader(CommonSseUtil.SSE_CONTENT_TYPE);
sseParser.setSseContentTypeHeader(sseContentTypeHeader);
// we don't add a closeHandler handler on the connection as it can race with this handler
// and close before the emitter emits anything
// see: https://github.com/quarkusio/quarkus/pull/16438
vertxClientResponse.handler(sseParser);
vertxClientResponse.endHandler(v -> {
close(true);
Expand Down

0 comments on commit eb169e8

Please sign in to comment.