From 450681e17a86d19dcae88621dc8f6922598b1dac Mon Sep 17 00:00:00 2001 From: Martin Kouba Date: Wed, 26 Jun 2024 13:46:27 +0200 Subject: [PATCH] WebSockets Next: get rid of UniHelper#toUni() - and replace it with Uni.createFrom().completionStage(Supplier) - we need lazy subscription of the produced Uni --- .../UnhandledMessageFailureLogStrategyTest.java | 2 +- .../next/runtime/BasicWebSocketConnectorImpl.java | 3 +-- .../next/runtime/WebSocketConnectionBase.java | 12 ++++++------ .../next/runtime/WebSocketConnectorImpl.java | 3 +-- .../next/runtime/WebSocketEndpointBase.java | 3 +-- .../runtime/devui/WebSocketNextJsonRPCService.java | 10 +++++----- 6 files changed, 15 insertions(+), 18 deletions(-) diff --git a/extensions/websockets-next/deployment/src/test/java/io/quarkus/websockets/next/test/client/UnhandledMessageFailureLogStrategyTest.java b/extensions/websockets-next/deployment/src/test/java/io/quarkus/websockets/next/test/client/UnhandledMessageFailureLogStrategyTest.java index 1b047d03e5bd7..664cbf1caf286 100644 --- a/extensions/websockets-next/deployment/src/test/java/io/quarkus/websockets/next/test/client/UnhandledMessageFailureLogStrategyTest.java +++ b/extensions/websockets-next/deployment/src/test/java/io/quarkus/websockets/next/test/client/UnhandledMessageFailureLogStrategyTest.java @@ -38,7 +38,7 @@ void testError() throws InterruptedException { .connectAndAwait(); connection.sendTextAndAwait("foo"); assertFalse(connection.isClosed()); - connection.sendText("bar"); + connection.sendTextAndAwait("bar"); assertTrue(ClientMessageErrorEndpoint.MESSAGE_LATCH.await(5, TimeUnit.SECONDS)); assertEquals("bar", ClientMessageErrorEndpoint.MESSAGES.get(0)); } diff --git a/extensions/websockets-next/runtime/src/main/java/io/quarkus/websockets/next/runtime/BasicWebSocketConnectorImpl.java b/extensions/websockets-next/runtime/src/main/java/io/quarkus/websockets/next/runtime/BasicWebSocketConnectorImpl.java index fb33732ca6c47..620369dddf0f4 100644 --- a/extensions/websockets-next/runtime/src/main/java/io/quarkus/websockets/next/runtime/BasicWebSocketConnectorImpl.java +++ b/extensions/websockets-next/runtime/src/main/java/io/quarkus/websockets/next/runtime/BasicWebSocketConnectorImpl.java @@ -22,7 +22,6 @@ import io.quarkus.websockets.next.WebSocketClientException; import io.quarkus.websockets.next.WebSocketsClientRuntimeConfig; import io.smallrye.mutiny.Uni; -import io.smallrye.mutiny.vertx.UniHelper; import io.vertx.core.Context; import io.vertx.core.Handler; import io.vertx.core.Vertx; @@ -144,7 +143,7 @@ public Uni connect() { throw new WebSocketClientException(e); } - return UniHelper.toUni(client.connect(connectOptions)) + return Uni.createFrom().completionStage(() -> client.connect(connectOptions).toCompletionStage()) .map(ws -> { String clientId = BasicWebSocketConnector.class.getName(); TrafficLogger trafficLogger = TrafficLogger.forClient(config); diff --git a/extensions/websockets-next/runtime/src/main/java/io/quarkus/websockets/next/runtime/WebSocketConnectionBase.java b/extensions/websockets-next/runtime/src/main/java/io/quarkus/websockets/next/runtime/WebSocketConnectionBase.java index 4a0749a4f875c..3b5694e9ac8c6 100644 --- a/extensions/websockets-next/runtime/src/main/java/io/quarkus/websockets/next/runtime/WebSocketConnectionBase.java +++ b/extensions/websockets-next/runtime/src/main/java/io/quarkus/websockets/next/runtime/WebSocketConnectionBase.java @@ -11,7 +11,6 @@ import io.quarkus.websockets.next.HandshakeRequest; import io.quarkus.websockets.next.WebSocketConnection.BroadcastSender; import io.smallrye.mutiny.Uni; -import io.smallrye.mutiny.vertx.UniHelper; import io.vertx.core.buffer.Buffer; import io.vertx.core.buffer.impl.BufferImpl; import io.vertx.core.http.WebSocketBase; @@ -55,14 +54,14 @@ public String pathParam(String name) { } public Uni sendText(String message) { - Uni uni = UniHelper.toUni(webSocket().writeTextMessage(message)); + Uni uni = Uni.createFrom().completionStage(() -> webSocket().writeTextMessage(message).toCompletionStage()); return trafficLogger == null ? uni : uni.invoke(() -> { trafficLogger.textMessageSent(this, message); }); } public Uni sendBinary(Buffer message) { - Uni uni = UniHelper.toUni(webSocket().writeBinaryMessage(message)); + Uni uni = Uni.createFrom().completionStage(() -> webSocket().writeBinaryMessage(message).toCompletionStage()); return trafficLogger == null ? uni : uni.invoke(() -> trafficLogger.binaryMessageSent(this, message)); } @@ -81,7 +80,7 @@ public Uni sendText(M message) { } public Uni sendPing(Buffer data) { - return UniHelper.toUni(webSocket().writePing(data)); + return Uni.createFrom().completionStage(() -> webSocket().writePing(data).toCompletionStage()); } void sendAutoPing() { @@ -93,7 +92,7 @@ void sendAutoPing() { } public Uni sendPong(Buffer data) { - return UniHelper.toUni(webSocket().writePong(data)); + return Uni.createFrom().completionStage(() -> webSocket().writePong(data).toCompletionStage()); } public Uni close() { @@ -105,7 +104,8 @@ public Uni close(CloseReason reason) { LOG.warnf("Connection already closed: %s", this); return Uni.createFrom().voidItem(); } - return UniHelper.toUni(webSocket().close((short) reason.getCode(), reason.getMessage())); + return Uni.createFrom() + .completionStage(() -> webSocket().close((short) reason.getCode(), reason.getMessage()).toCompletionStage()); } public boolean isSecure() { diff --git a/extensions/websockets-next/runtime/src/main/java/io/quarkus/websockets/next/runtime/WebSocketConnectorImpl.java b/extensions/websockets-next/runtime/src/main/java/io/quarkus/websockets/next/runtime/WebSocketConnectorImpl.java index 359a400f5160a..cb04b5cbb61ad 100644 --- a/extensions/websockets-next/runtime/src/main/java/io/quarkus/websockets/next/runtime/WebSocketConnectorImpl.java +++ b/extensions/websockets-next/runtime/src/main/java/io/quarkus/websockets/next/runtime/WebSocketConnectorImpl.java @@ -23,7 +23,6 @@ import io.quarkus.websockets.next.runtime.WebSocketClientRecorder.ClientEndpoint; import io.quarkus.websockets.next.runtime.WebSocketClientRecorder.ClientEndpointsContext; import io.smallrye.mutiny.Uni; -import io.smallrye.mutiny.vertx.UniHelper; import io.vertx.core.Vertx; import io.vertx.core.http.WebSocketClient; import io.vertx.core.http.WebSocketConnectOptions; @@ -92,7 +91,7 @@ public Uni connect() { } subprotocols.forEach(connectOptions::addSubProtocol); - return UniHelper.toUni(client.connect(connectOptions)) + return Uni.createFrom().completionStage(() -> client.connect(connectOptions).toCompletionStage()) .map(ws -> { TrafficLogger trafficLogger = TrafficLogger.forClient(config); WebSocketClientConnectionImpl connection = new WebSocketClientConnectionImpl(clientEndpoint.clientId, ws, diff --git a/extensions/websockets-next/runtime/src/main/java/io/quarkus/websockets/next/runtime/WebSocketEndpointBase.java b/extensions/websockets-next/runtime/src/main/java/io/quarkus/websockets/next/runtime/WebSocketEndpointBase.java index 5a1874647a0e3..937532cd52636 100644 --- a/extensions/websockets-next/runtime/src/main/java/io/quarkus/websockets/next/runtime/WebSocketEndpointBase.java +++ b/extensions/websockets-next/runtime/src/main/java/io/quarkus/websockets/next/runtime/WebSocketEndpointBase.java @@ -18,7 +18,6 @@ import io.quarkus.websockets.next.runtime.ConcurrencyLimiter.PromiseComplete; import io.smallrye.mutiny.Multi; import io.smallrye.mutiny.Uni; -import io.smallrye.mutiny.vertx.UniHelper; import io.vertx.core.Context; import io.vertx.core.Future; import io.vertx.core.Handler; @@ -256,7 +255,7 @@ public void handle(Void event) { } } }); - return UniHelper.toUni(promise.future()); + return Uni.createFrom().completionStage(() -> promise.future().toCompletionStage()); } public Object beanInstance() { diff --git a/extensions/websockets-next/runtime/src/main/java/io/quarkus/websockets/next/runtime/devui/WebSocketNextJsonRPCService.java b/extensions/websockets-next/runtime/src/main/java/io/quarkus/websockets/next/runtime/devui/WebSocketNextJsonRPCService.java index 5df2e1d395b28..5878b1e6950fd 100644 --- a/extensions/websockets-next/runtime/src/main/java/io/quarkus/websockets/next/runtime/devui/WebSocketNextJsonRPCService.java +++ b/extensions/websockets-next/runtime/src/main/java/io/quarkus/websockets/next/runtime/devui/WebSocketNextJsonRPCService.java @@ -23,7 +23,6 @@ import io.smallrye.mutiny.Multi; import io.smallrye.mutiny.Uni; import io.smallrye.mutiny.operators.multi.processors.BroadcastProcessor; -import io.smallrye.mutiny.vertx.UniHelper; import io.vertx.core.Vertx; import io.vertx.core.http.WebSocket; import io.vertx.core.http.WebSocketClient; @@ -110,12 +109,13 @@ public Uni openDevConnection(String path, String endpointPath) { } WebSocketClient client = vertx.createWebSocketClient(); String connectionKey = UUID.randomUUID().toString(); - Uni uni = UniHelper.toUni(client + Uni uni = Uni.createFrom().completionStage(() -> client .connect(new WebSocketConnectOptions() .setPort(httpConfig.port) .setHost(httpConfig.host) .setURI(path) - .addHeader(DEVUI_SOCKET_KEY_HEADER, connectionKey))); + .addHeader(DEVUI_SOCKET_KEY_HEADER, connectionKey)) + .toCompletionStage()); return uni.onItem().transform(s -> { LOG.debugf("Opened Dev UI connection with key %s to %s", connectionKey, path); List messages = new ArrayList<>(); @@ -181,7 +181,7 @@ private static String normalize(String path) { public Uni closeDevConnection(String connectionKey) { DevWebSocket socket = sockets.remove(connectionKey); if (socket != null) { - Uni uni = UniHelper.toUni(socket.socket.close()); + Uni uni = Uni.createFrom().completionStage(() -> socket.socket.close().toCompletionStage()); return uni.onItem().transform(v -> { LOG.debugf("Closed Dev UI connection with key %s", connectionKey); return new JsonObject().put("success", true); @@ -196,7 +196,7 @@ public Uni closeDevConnection(String connectionKey) { public Uni sendTextMessage(String connectionKey, String message) { DevWebSocket socket = sockets.get(connectionKey); if (socket != null) { - Uni uni = UniHelper.toUni(socket.socket.writeTextMessage(message)); + Uni uni = Uni.createFrom().completionStage(() -> socket.socket.writeTextMessage(message).toCompletionStage()); return uni.onItem().transform(v -> { List messages = socket.messages; synchronized (messages) {