From 23ab5a000f0c5c23c0e3b9a149bc1786a991fe55 Mon Sep 17 00:00:00 2001 From: Clement Escoffier Date: Mon, 21 Feb 2022 14:34:09 +0100 Subject: [PATCH] When using SSE send the headers (and flush them) as soon as we have the Publisher (so before subscription). Fix #22762 --- .../reactive/server/core/SseUtil.java | 3 +-- .../handlers/PublisherResponseHandler.java | 26 ++++++++++++++----- .../server/jaxrs/SseEventSinkImpl.java | 2 +- 3 files changed, 22 insertions(+), 9 deletions(-) diff --git a/independent-projects/resteasy-reactive/server/runtime/src/main/java/org/jboss/resteasy/reactive/server/core/SseUtil.java b/independent-projects/resteasy-reactive/server/runtime/src/main/java/org/jboss/resteasy/reactive/server/core/SseUtil.java index 963d71a82fec5..07564a0982b12 100644 --- a/independent-projects/resteasy-reactive/server/runtime/src/main/java/org/jboss/resteasy/reactive/server/core/SseUtil.java +++ b/independent-projects/resteasy-reactive/server/runtime/src/main/java/org/jboss/resteasy/reactive/server/core/SseUtil.java @@ -151,8 +151,6 @@ public static void setHeaders(ResteasyReactiveRequestContext context, ServerHttp public static void setHeaders(ResteasyReactiveRequestContext context, ServerHttpResponse response, List customizers) { - // FIXME: spec says we should flush the headers when first message is sent or when the resource method returns, whichever - // happens first if (!response.headWritten()) { response.setStatusCode(Response.Status.OK.getStatusCode()); response.setResponseHeader(HttpHeaders.CONTENT_TYPE, MediaType.SERVER_SENT_EVENTS); @@ -164,6 +162,7 @@ public static void setHeaders(ResteasyReactiveRequestContext context, ServerHttp customizers.get(i).customize(response); } // FIXME: other headers? + } } } diff --git a/independent-projects/resteasy-reactive/server/runtime/src/main/java/org/jboss/resteasy/reactive/server/handlers/PublisherResponseHandler.java b/independent-projects/resteasy-reactive/server/runtime/src/main/java/org/jboss/resteasy/reactive/server/handlers/PublisherResponseHandler.java index 43ddb8b91cf78..64dc218adb12a 100644 --- a/independent-projects/resteasy-reactive/server/runtime/src/main/java/org/jboss/resteasy/reactive/server/handlers/PublisherResponseHandler.java +++ b/independent-projects/resteasy-reactive/server/runtime/src/main/java/org/jboss/resteasy/reactive/server/handlers/PublisherResponseHandler.java @@ -1,10 +1,14 @@ package org.jboss.resteasy.reactive.server.handlers; +import static org.jboss.resteasy.reactive.server.jaxrs.SseEventSinkImpl.EMPTY_BUFFER; + import java.nio.charset.StandardCharsets; import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.function.BiConsumer; import java.util.function.BiFunction; +import java.util.function.Consumer; import javax.ws.rs.core.MediaType; import org.jboss.logging.Logger; import org.jboss.resteasy.reactive.common.util.RestMediaType; @@ -43,9 +47,9 @@ private static class SseMultiSubscriber extends AbstractMultiSubscriber { @Override public void onNext(Object item) { OutboundSseEventImpl event = new OutboundSseEventImpl.BuilderImpl().data(item).build(); - SseUtil.send(requestContext, event, customizers).handle(new BiFunction() { + SseUtil.send(requestContext, event, customizers).whenComplete(new BiConsumer() { @Override - public Object apply(Object v, Throwable t) { + public void accept(Object v, Throwable t) { if (t != null) { // need to cancel because the exception didn't come from the Multi subscription.cancel(); @@ -54,7 +58,6 @@ public Object apply(Object v, Throwable t) { // send in the next item subscription.request(1); } - return null; } }); } @@ -250,11 +253,11 @@ public void handle(ResteasyReactiveRequestContext requestContext) throws Excepti requestContext.setResponseContentType(mediaType); // this is the non-async return type requestContext.setGenericReturnType(requestContext.getTarget().getReturnType()); - // we have several possibilities here, but in all we suspend - requestContext.suspend(); + if (mediaType.isCompatible(MediaType.SERVER_SENT_EVENTS_TYPE)) { handleSse(requestContext, result); } else { + requestContext.suspend(); boolean json = mediaType.toString().contains(JSON); if (requiresChunkedStream(mediaType)) { handleChunkedStreaming(requestContext, result, json); @@ -279,7 +282,18 @@ private void handleStreaming(ResteasyReactiveRequestContext requestContext, Publ } private void handleSse(ResteasyReactiveRequestContext requestContext, Publisher result) { - result.subscribe(new SseMultiSubscriber(requestContext, streamingResponseCustomizers)); + SseUtil.setHeaders(requestContext, requestContext.serverResponse(), streamingResponseCustomizers); + requestContext.suspend(); + requestContext.serverResponse().write(EMPTY_BUFFER, new Consumer() { + @Override + public void accept(Throwable throwable) { + if (throwable == null) { + result.subscribe(new SseMultiSubscriber(requestContext, streamingResponseCustomizers)); + } else { + requestContext.resume(throwable); + } + } + }); } public interface StreamingResponseCustomizer { diff --git a/independent-projects/resteasy-reactive/server/runtime/src/main/java/org/jboss/resteasy/reactive/server/jaxrs/SseEventSinkImpl.java b/independent-projects/resteasy-reactive/server/runtime/src/main/java/org/jboss/resteasy/reactive/server/jaxrs/SseEventSinkImpl.java index 6cdf099eab4fc..48735ac388c20 100644 --- a/independent-projects/resteasy-reactive/server/runtime/src/main/java/org/jboss/resteasy/reactive/server/jaxrs/SseEventSinkImpl.java +++ b/independent-projects/resteasy-reactive/server/runtime/src/main/java/org/jboss/resteasy/reactive/server/jaxrs/SseEventSinkImpl.java @@ -11,7 +11,7 @@ public class SseEventSinkImpl implements SseEventSink { - private static final byte[] EMPTY_BUFFER = new byte[0]; + public static final byte[] EMPTY_BUFFER = new byte[0]; private ResteasyReactiveRequestContext context; private SseBroadcasterImpl broadcaster; private boolean closed;