Skip to content

Commit

Permalink
When using SSE send the headers (and flush them) as soon as we have t…
Browse files Browse the repository at this point in the history
…he Publisher (so before subscription).

Fix #22762
  • Loading branch information
cescoffier committed Feb 21, 2022
1 parent 22cf7ed commit 23ab5a0
Show file tree
Hide file tree
Showing 3 changed files with 22 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -151,8 +151,6 @@ public static void setHeaders(ResteasyReactiveRequestContext context, ServerHttp

public static void setHeaders(ResteasyReactiveRequestContext context, ServerHttpResponse response,
List<PublisherResponseHandler.StreamingResponseCustomizer> customizers) {
// FIXME: spec says we should flush the headers when first message is sent or when the resource method returns, whichever
// happens first
if (!response.headWritten()) {
response.setStatusCode(Response.Status.OK.getStatusCode());
response.setResponseHeader(HttpHeaders.CONTENT_TYPE, MediaType.SERVER_SENT_EVENTS);
Expand All @@ -164,6 +162,7 @@ public static void setHeaders(ResteasyReactiveRequestContext context, ServerHttp
customizers.get(i).customize(response);
}
// FIXME: other headers?

}
}
}
Original file line number Diff line number Diff line change
@@ -1,10 +1,14 @@
package org.jboss.resteasy.reactive.server.handlers;

import static org.jboss.resteasy.reactive.server.jaxrs.SseEventSinkImpl.EMPTY_BUFFER;

import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import javax.ws.rs.core.MediaType;
import org.jboss.logging.Logger;
import org.jboss.resteasy.reactive.common.util.RestMediaType;
Expand Down Expand Up @@ -43,9 +47,9 @@ private static class SseMultiSubscriber extends AbstractMultiSubscriber {
@Override
public void onNext(Object item) {
OutboundSseEventImpl event = new OutboundSseEventImpl.BuilderImpl().data(item).build();
SseUtil.send(requestContext, event, customizers).handle(new BiFunction<Object, Throwable, Object>() {
SseUtil.send(requestContext, event, customizers).whenComplete(new BiConsumer<Object, Throwable>() {
@Override
public Object apply(Object v, Throwable t) {
public void accept(Object v, Throwable t) {
if (t != null) {
// need to cancel because the exception didn't come from the Multi
subscription.cancel();
Expand All @@ -54,7 +58,6 @@ public Object apply(Object v, Throwable t) {
// send in the next item
subscription.request(1);
}
return null;
}
});
}
Expand Down Expand Up @@ -250,11 +253,11 @@ public void handle(ResteasyReactiveRequestContext requestContext) throws Excepti
requestContext.setResponseContentType(mediaType);
// this is the non-async return type
requestContext.setGenericReturnType(requestContext.getTarget().getReturnType());
// we have several possibilities here, but in all we suspend
requestContext.suspend();

if (mediaType.isCompatible(MediaType.SERVER_SENT_EVENTS_TYPE)) {
handleSse(requestContext, result);
} else {
requestContext.suspend();
boolean json = mediaType.toString().contains(JSON);
if (requiresChunkedStream(mediaType)) {
handleChunkedStreaming(requestContext, result, json);
Expand All @@ -279,7 +282,18 @@ private void handleStreaming(ResteasyReactiveRequestContext requestContext, Publ
}

private void handleSse(ResteasyReactiveRequestContext requestContext, Publisher<?> result) {
result.subscribe(new SseMultiSubscriber(requestContext, streamingResponseCustomizers));
SseUtil.setHeaders(requestContext, requestContext.serverResponse(), streamingResponseCustomizers);
requestContext.suspend();
requestContext.serverResponse().write(EMPTY_BUFFER, new Consumer<Throwable>() {
@Override
public void accept(Throwable throwable) {
if (throwable == null) {
result.subscribe(new SseMultiSubscriber(requestContext, streamingResponseCustomizers));
} else {
requestContext.resume(throwable);
}
}
});
}

public interface StreamingResponseCustomizer {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@

public class SseEventSinkImpl implements SseEventSink {

private static final byte[] EMPTY_BUFFER = new byte[0];
public static final byte[] EMPTY_BUFFER = new byte[0];
private ResteasyReactiveRequestContext context;
private SseBroadcasterImpl broadcaster;
private boolean closed;
Expand Down

0 comments on commit 23ab5a0

Please sign in to comment.