Skip to content

Commit

Permalink
When using SSE send the headers (and flush them) as soon as we have t…
Browse files Browse the repository at this point in the history
…he Publisher (so before subscription).

Fix quarkusio#22762

(cherry picked from commit 23ab5a0)
  • Loading branch information
cescoffier authored and gsmet committed Feb 21, 2022
1 parent ddd0f35 commit 03ad7cf
Show file tree
Hide file tree
Showing 3 changed files with 22 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -151,8 +151,6 @@ public static void setHeaders(ResteasyReactiveRequestContext context, ServerHttp

public static void setHeaders(ResteasyReactiveRequestContext context, ServerHttpResponse response,
List<PublisherResponseHandler.StreamingResponseCustomizer> customizers) {
// FIXME: spec says we should flush the headers when first message is sent or when the resource method returns, whichever
// happens first
if (!response.headWritten()) {
response.setStatusCode(Response.Status.OK.getStatusCode());
response.setResponseHeader(HttpHeaders.CONTENT_TYPE, MediaType.SERVER_SENT_EVENTS);
Expand All @@ -164,6 +162,7 @@ public static void setHeaders(ResteasyReactiveRequestContext context, ServerHttp
customizers.get(i).customize(response);
}
// FIXME: other headers?

}
}
}
Original file line number Diff line number Diff line change
@@ -1,10 +1,14 @@
package org.jboss.resteasy.reactive.server.handlers;

import static org.jboss.resteasy.reactive.server.jaxrs.SseEventSinkImpl.EMPTY_BUFFER;

import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import javax.ws.rs.core.MediaType;
import org.jboss.logging.Logger;
import org.jboss.resteasy.reactive.server.core.ResteasyReactiveRequestContext;
Expand Down Expand Up @@ -40,9 +44,9 @@ private static class SseMultiSubscriber extends AbstractMultiSubscriber {
@Override
public void onNext(Object item) {
OutboundSseEventImpl event = new OutboundSseEventImpl.BuilderImpl().data(item).build();
SseUtil.send(requestContext, event, customizers).handle(new BiFunction<Object, Throwable, Object>() {
SseUtil.send(requestContext, event, customizers).whenComplete(new BiConsumer<Object, Throwable>() {
@Override
public Object apply(Object v, Throwable t) {
public void accept(Object v, Throwable t) {
if (t != null) {
// need to cancel because the exception didn't come from the Multi
subscription.cancel();
Expand All @@ -51,7 +55,6 @@ public Object apply(Object v, Throwable t) {
// send in the next item
subscription.request(1);
}
return null;
}
});
}
Expand Down Expand Up @@ -204,11 +207,11 @@ public void handle(ResteasyReactiveRequestContext requestContext) throws Excepti
requestContext.setResponseContentType(mediaTypes[0]);
// this is the non-async return type
requestContext.setGenericReturnType(requestContext.getTarget().getReturnType());
// we have several possibilities here, but in all we suspend
requestContext.suspend();

if (mediaTypes[0].isCompatible(MediaType.SERVER_SENT_EVENTS_TYPE)) {
handleSse(requestContext, result);
} else {
requestContext.suspend();
boolean json = mediaTypes[0].isCompatible(MediaType.APPLICATION_JSON_TYPE);
handleStreaming(requestContext, result, json);
}
Expand All @@ -220,7 +223,18 @@ private void handleStreaming(ResteasyReactiveRequestContext requestContext, Publ
}

private void handleSse(ResteasyReactiveRequestContext requestContext, Publisher<?> result) {
result.subscribe(new SseMultiSubscriber(requestContext, streamingResponseCustomizers));
SseUtil.setHeaders(requestContext, requestContext.serverResponse(), streamingResponseCustomizers);
requestContext.suspend();
requestContext.serverResponse().write(EMPTY_BUFFER, new Consumer<Throwable>() {
@Override
public void accept(Throwable throwable) {
if (throwable == null) {
result.subscribe(new SseMultiSubscriber(requestContext, streamingResponseCustomizers));
} else {
requestContext.resume(throwable);
}
}
});
}

public interface StreamingResponseCustomizer {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@

public class SseEventSinkImpl implements SseEventSink {

private static final byte[] EMPTY_BUFFER = new byte[0];
public static final byte[] EMPTY_BUFFER = new byte[0];
private ResteasyReactiveRequestContext context;
private SseBroadcasterImpl broadcaster;
private boolean closed;
Expand Down

0 comments on commit 03ad7cf

Please sign in to comment.