Skip to content

Commit

Permalink
Merge pull request #30686 from geoand/#30681
Browse files Browse the repository at this point in the history
Don't fail send when a sse sink has been closed
  • Loading branch information
geoand authored Jan 30, 2023
2 parents dd48b4f + da98a10 commit 0d6eac7
Showing 1 changed file with 48 additions and 4 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package org.jboss.resteasy.reactive.server.jaxrs;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
Expand Down Expand Up @@ -37,8 +38,9 @@ public synchronized void onClose(Consumer<SseEventSink> onClose) {
public synchronized void register(SseEventSink sseEventSink) {
Objects.requireNonNull(sseEventSink);
checkClosed();
if (sseEventSink instanceof SseEventSinkImpl == false)
if (sseEventSink instanceof SseEventSinkImpl == false) {
throw new IllegalArgumentException("Can only work with Quarkus-REST instances: " + sseEventSink);
}
((SseEventSinkImpl) sseEventSink).register(this);
sinks.add(sseEventSink);
}
Expand All @@ -50,20 +52,62 @@ public synchronized CompletionStage<?> broadcast(OutboundSseEvent event) {
CompletableFuture<?>[] cfs = new CompletableFuture[sinks.size()];
for (int i = 0; i < sinks.size(); i++) {
SseEventSink sseEventSink = sinks.get(i);
cfs[i] = sseEventSink.send(event).toCompletableFuture();
CompletionStage<?> cs;
try {
cs = sseEventSink.send(event).exceptionally((t) -> {
// do not propagate the exception to the returned CF
// apparently, the goal is to close this sink and not report the error
// of the broadcast operation
notifyOnErrorListeners(sseEventSink, t);
return null;
});
} catch (Exception e) {
// do not propagate the exception to the returned CF
// apparently, the goal is to close this sink and not report the error
// of the broadcast operation
notifyOnErrorListeners(sseEventSink, e);
cs = CompletableFuture.completedFuture(null);
}
cfs[i] = cs.toCompletableFuture();
}
return CompletableFuture.allOf(cfs);
}

private void notifyOnErrorListeners(SseEventSink eventSink, Throwable throwable) {
// We have to notify close listeners if the SSE event output has been
// closed (either by client closing the connection (IOException) or by
// calling SseEventSink.close() (IllegalStateException) on the server
// side).
if (throwable instanceof IOException || throwable instanceof IllegalStateException) {
notifyOnCloseListeners(eventSink);
}
onErrorListeners.forEach(consumer -> {
consumer.accept(eventSink, throwable);
});
}

private void notifyOnCloseListeners(SseEventSink eventSink) {
// First remove the eventSink from the outputQueue to ensure that
// concurrent calls to this method will notify listeners only once for a
// given eventSink instance.
if (sinks.remove(eventSink)) {
onCloseListeners.forEach(consumer -> {
consumer.accept(eventSink);
});
}
}

private void checkClosed() {
if (isClosed)
if (isClosed) {
throw new IllegalStateException("Broadcaster has been closed");
}
}

@Override
public synchronized void close() {
if (isClosed)
if (isClosed) {
return;
}
isClosed = true;
for (SseEventSink sink : sinks) {
// this will in turn fire close events to our listeners
Expand Down

0 comments on commit 0d6eac7

Please sign in to comment.