From cf67cc1e6a89af4e92fa32bfe1fb73dd23c66b82 Mon Sep 17 00:00:00 2001 From: Martin Kouba Date: Fri, 12 Jul 2024 11:26:16 +0200 Subject: [PATCH] WebSockets Next: broadcasting fixes - intentionally ignore 'WebSocket is closed' failures - do not fail fast but collect all failures --- .../websockets/next/runtime/Endpoints.java | 2 +- .../next/runtime/WebSocketConnectionImpl.java | 16 ++++++++++++---- 2 files changed, 13 insertions(+), 5 deletions(-) diff --git a/extensions/websockets-next/runtime/src/main/java/io/quarkus/websockets/next/runtime/Endpoints.java b/extensions/websockets-next/runtime/src/main/java/io/quarkus/websockets/next/runtime/Endpoints.java index 26bf46d6421a8..12a2b327fa6b1 100644 --- a/extensions/websockets-next/runtime/src/main/java/io/quarkus/websockets/next/runtime/Endpoints.java +++ b/extensions/websockets-next/runtime/src/main/java/io/quarkus/websockets/next/runtime/Endpoints.java @@ -291,7 +291,7 @@ private static boolean isSecurityFailure(Throwable throwable) { || throwable instanceof ForbiddenException; } - private static boolean isWebSocketIsClosedFailure(Throwable throwable, WebSocketConnectionBase connection) { + static boolean isWebSocketIsClosedFailure(Throwable throwable, WebSocketConnectionBase connection) { if (!connection.isClosed()) { return false; } diff --git a/extensions/websockets-next/runtime/src/main/java/io/quarkus/websockets/next/runtime/WebSocketConnectionImpl.java b/extensions/websockets-next/runtime/src/main/java/io/quarkus/websockets/next/runtime/WebSocketConnectionImpl.java index d1d4cad07638e..de23dd4779d78 100644 --- a/extensions/websockets-next/runtime/src/main/java/io/quarkus/websockets/next/runtime/WebSocketConnectionImpl.java +++ b/extensions/websockets-next/runtime/src/main/java/io/quarkus/websockets/next/runtime/WebSocketConnectionImpl.java @@ -219,18 +219,26 @@ public Uni sendPong(Buffer data) { throw new UnsupportedOperationException(); } - private Uni doSend(BiFunction> function, M message) { + private Uni doSend(BiFunction> sendFunction, M message) { Set connections = connectionManager.getConnections(generatedEndpointClass); if (connections.isEmpty()) { return Uni.createFrom().voidItem(); } List> unis = new ArrayList<>(connections.size()); for (WebSocketConnection connection : connections) { - if (connection.isOpen() && (filter == null || filter.test(connection))) { - unis.add(function.apply(connection, message)); + if (connection.isOpen() + && (filter == null || filter.test(connection))) { + unis.add(sendFunction.apply(connection, message) + // Intentionally ignore 'WebSocket is closed' failures + // It might happen that the connection is closed in the mean time + .onFailure(t -> Endpoints.isWebSocketIsClosedFailure(t, (WebSocketConnectionBase) connection)) + .recoverWithNull()); } } - return unis.isEmpty() ? Uni.createFrom().voidItem() : Uni.join().all(unis).andFailFast().replaceWithVoid(); + if (unis.isEmpty()) { + return Uni.createFrom().voidItem(); + } + return Uni.join().all(unis).andCollectFailures().replaceWithVoid(); } }