From b8502d8f0152fd6a801e16c1c6f98d015b16c619 Mon Sep 17 00:00:00 2001 From: Martin Kouba Date: Tue, 2 Apr 2024 18:30:27 +0200 Subject: [PATCH] WebSockets Next: error handlers part 4 - use error handlers to process Mutiny Multi failures - update docs --- .../asciidoc/websockets-next-reference.adoc | 52 +++++++-- .../deployment/WebSocketServerProcessor.java | 58 ++++++---- ...nClosePathParamConnectionArgumentTest.java | 68 +++++++++++ ...OnOpenPathParamConnectionArgumentTest.java | 54 +++++++++ .../errors/MultiBinaryDecodeErrorTest.java | 63 +++++++++++ .../errors/MultiBinaryEncodeErrorTest.java | 63 +++++++++++ .../MultiFailureCloseConnectionTest.java | 70 ++++++++++++ .../next/test/errors/MultiFailureTest.java | 63 +++++++++++ .../test/errors/MultiTextDecodeErrorTest.java | 76 +++++++++++++ .../test/errors/MultiTextEncodeErrorTest.java | 107 ++++++++++++++++++ ...tipleAmbiguousGlobalErrorHandlersTest.java | 2 + .../io/quarkus/websockets/next/OnClose.java | 11 ++ .../next/runtime/WebSocketEndpoint.java | 3 + .../next/runtime/WebSocketEndpointBase.java | 23 ++-- .../next/runtime/WebSocketServerRecorder.java | 26 ++++- 15 files changed, 689 insertions(+), 50 deletions(-) create mode 100644 extensions/websockets-next/server/deployment/src/test/java/io/quarkus/websockets/next/test/args/OnClosePathParamConnectionArgumentTest.java create mode 100644 extensions/websockets-next/server/deployment/src/test/java/io/quarkus/websockets/next/test/args/OnOpenPathParamConnectionArgumentTest.java create mode 100644 extensions/websockets-next/server/deployment/src/test/java/io/quarkus/websockets/next/test/errors/MultiBinaryDecodeErrorTest.java create mode 100644 extensions/websockets-next/server/deployment/src/test/java/io/quarkus/websockets/next/test/errors/MultiBinaryEncodeErrorTest.java create mode 100644 extensions/websockets-next/server/deployment/src/test/java/io/quarkus/websockets/next/test/errors/MultiFailureCloseConnectionTest.java create mode 100644 extensions/websockets-next/server/deployment/src/test/java/io/quarkus/websockets/next/test/errors/MultiFailureTest.java create mode 100644 extensions/websockets-next/server/deployment/src/test/java/io/quarkus/websockets/next/test/errors/MultiTextDecodeErrorTest.java create mode 100644 extensions/websockets-next/server/deployment/src/test/java/io/quarkus/websockets/next/test/errors/MultiTextEncodeErrorTest.java diff --git a/docs/src/main/asciidoc/websockets-next-reference.adoc b/docs/src/main/asciidoc/websockets-next-reference.adoc index f2b1a92d6960b..5387561323b5f 100644 --- a/docs/src/main/asciidoc/websockets-next-reference.adoc +++ b/docs/src/main/asciidoc/websockets-next-reference.adoc @@ -3,7 +3,8 @@ This guide is maintained in the main Quarkus repository and pull requests should be submitted there: https://github.com/quarkusio/quarkus/tree/main/docs/src/main/asciidoc //// -= WebSockets-Next extension reference guide +[id="websockets-next-reference-guide"] += WebSockets Next extension reference guide :extension-status: preview include::_attributes.adoc[] :numbered: @@ -12,7 +13,8 @@ include::_attributes.adoc[] :topics: web,websockets :extensions: io.quarkus:quarkus-websockets-next -IMPORTANT: The `websockets-next` extension is experimental. The proposal API may change in future releases. +The `websockets-next` extension provides an experimental API to define _WebSocket_ endpoints declaratively. +The proposed API may change in future releases. == The WebSocket protocol @@ -193,6 +195,7 @@ A WebSocket endpoint comprises the following components: * At most one `@OnBinaryMessage` method: Handles the binary messages the connected client sends. * At most one `@OnOpen` method: Invoked when a client connects to the WebSocket. * At most one `@OnClose` method: Executed upon the client disconnecting from the WebSocket. +* Any number of `@OnError` methods: Invoked when an error occurs; that is when an endpoint callback throws a runtime error, or when a conversion errors occurs, or when a returned `io.smallrye.mutiny.Uni`/`io.smallrye.mutiny.Multi` receives a failure. Only some endpoints need to include all methods. However, it must contain at least `@On[Text|Binary]Message` or `@OnOpen`. @@ -222,23 +225,26 @@ Here are the rules governing execution: * Methods annotated with `@RunOnVirtualThread` are considered blocking and should execute on a virtual thread. * Blocking methods must execute on a worker thread if not annotated with `@RunOnVirtualThread`. * When `@RunOnVirtualThread` is employed, each invocation spawns a new virtual thread. -* Methods returning `CompletionStage` and `Uni` are considered non-blocking -* Methods returning `Multi` are considered non-blocking and must be subscribed to, except if they return their own `Multi`. +* Methods returning `CompletionStage`, `Uni` and `Multi` are considered non-blocking. * Methods returning `void` or plain objects are considered blocking. -=== Parameters +=== Method Parameters -These methods can accept parameters in two formats: +The method must accept exactly one message parameter: * The message object (of any type). * A `Multi` with X as the message type. -* Any other parameters should be flagged as errors. +However, it may also accept the following parameters: + + * `WebSocketConnection` + * `HandshakeRequest` + * `String` parameters annotated with `@PathParam` + The message object represents the data sent and can be accessed as either raw content (`String`, `JsonObject`, `JsonArray`, `Buffer` or `byte[]`) or deserialized high-level objects, which is the recommended approach. When receiving a `Multi`, the method is invoked once per connection, and the provided `Multi` receives the items transmitted by this connection. The method must subscribe to the `Multi` to receive these items (or return a Multi). -Cancelling this subscription closes the associated connection. === Allowed Returned Types @@ -324,7 +330,7 @@ When a method is intended to produce a message written to the client, it can emi Emitting `null` signifies no response to be sent to the client, allowing for skipping a response when needed. === JsonObject and JsonArray -Vert.x `JSONObject` and `JSONArray` instances bypass the serialization and deserialization mechanisms. +Vert.x `JsonObject` and `JsonArray` instances bypass the serialization and deserialization mechanisms. Messages are sent as text messages. === Broadcasting @@ -341,6 +347,8 @@ String emitToAll(String message) { The same principle applies to methods returning instances of `Multi` or `Uni`. +NOTE: If you need to select the connected clients that should receive the message, you can use `WebSocketConnection.broadcast().filter().sendText()`. + == OnOpen and OnClose methods The WebSocket endpoint can also be notified when a client connects or disconnects. @@ -369,8 +377,11 @@ These methods have access to the _session-scoped_ `WebSocketConnection` bean. === Parameters -Methods annotated with `@OnOpen` and `@OnClose` do not accept any parameters. -If such methods declare parameters, they will be flagged as errors and reported at build time. +Methods annotated with `@OnOpen` and `@OnClose` may accept the following parameters: + + * `WebSocketConnection` + * `HandshakeRequest` + * `String` parameters annotated with `@PathParam` === Allowed Returned Types @@ -425,6 +436,25 @@ String onOpen() { } ---- +== Error Handling + +The WebSocket endpoint can also be notified when an error occurs. +A WebSocket endpoint method annotated with `@io.quarkus.websockets.next.OnError` is invoked when an endpoint callback throws a runtime error, or when a conversion errors occurs, +or when a returned `io.smallrye.mutiny.Uni`/`io.smallrye.mutiny.Multi` receives a failure. + +The method must accept exactly one "error" parameter, i.e. a parameter that is assignable from `java.lang.Throwable`. +The method may also accept the following parameters: + +* `WebSocketConnection` +* `HandshakeRequest` +* `String` parameters annotated with `@PathParam` + +An endpoint may declare multiple methods annotated with `@io.quarkus.websockets.next.OnError`. +However, each method must declare a different error parameter. +The method that declares a most-specific supertype of the actual exception is selected. + +NOTE: The `@io.quarkus.websockets.next.OnError` annotation can be also used to declare a global error handler, i.e. a method that is not declared on a WebSocket endpoint. Such a method may not accept `@PathParam` paremeters. Error handlers declared on an endpoint take precedence over the global error handlers. + == Access to the WebSocketConnection The `io.quarkus.websockets.next.WebSocketConnection` object represents the WebSocket connection. diff --git a/extensions/websockets-next/server/deployment/src/main/java/io/quarkus/websockets/next/deployment/WebSocketServerProcessor.java b/extensions/websockets-next/server/deployment/src/main/java/io/quarkus/websockets/next/deployment/WebSocketServerProcessor.java index 6e58aabf0436b..64c430a41c071 100644 --- a/extensions/websockets-next/server/deployment/src/main/java/io/quarkus/websockets/next/deployment/WebSocketServerProcessor.java +++ b/extensions/websockets-next/server/deployment/src/main/java/io/quarkus/websockets/next/deployment/WebSocketServerProcessor.java @@ -463,7 +463,7 @@ private String generateEndpoint(WebSocketEndpointBuildItem endpoint, MethodDescriptor.ofMethod(WebSocketEndpointBase.class, "beanInstance", Object.class, String.class), doOnOpen.getThis(), doOnOpen.load(endpoint.bean.getIdentifier())); // Call the business method - TryBlock tryBlock = onErrorTryBlock(doOnOpen); + TryBlock tryBlock = onErrorTryBlock(doOnOpen, doOnOpen.getThis()); ResultHandle[] args = callback.generateArguments(tryBlock.getThis(), tryBlock, transformedAnnotations, index); ResultHandle ret = tryBlock.invokeVirtualMethod(MethodDescriptor.of(callback.method), beanInstance, args); encodeAndReturnResult(tryBlock.getThis(), tryBlock, callback, globalErrorHandlers, endpoint, ret); @@ -488,7 +488,7 @@ private String generateEndpoint(WebSocketEndpointBuildItem endpoint, MethodDescriptor.ofMethod(WebSocketEndpointBase.class, "beanInstance", Object.class, String.class), doOnClose.getThis(), doOnClose.load(endpoint.bean.getIdentifier())); // Call the business method - TryBlock tryBlock = onErrorTryBlock(doOnClose); + TryBlock tryBlock = onErrorTryBlock(doOnClose, doOnClose.getThis()); ResultHandle[] args = callback.generateArguments(tryBlock.getThis(), tryBlock, transformedAnnotations, index); ResultHandle ret = tryBlock.invokeVirtualMethod(MethodDescriptor.of(callback.method), beanInstance, args); encodeAndReturnResult(tryBlock.getThis(), tryBlock, callback, globalErrorHandlers, endpoint, ret); @@ -632,7 +632,7 @@ private void generateOnMessage(ClassCreator endpointCreator, WebSocketEndpointBu MethodCreator doOnMessage = endpointCreator.getMethodCreator("doOn" + messageType + "Message", Uni.class, methodParameterType); - TryBlock tryBlock = onErrorTryBlock(doOnMessage); + TryBlock tryBlock = onErrorTryBlock(doOnMessage, doOnMessage.getThis()); // Foo foo = beanInstance("foo"); ResultHandle beanInstance = tryBlock.invokeVirtualMethod( MethodDescriptor.ofMethod(WebSocketEndpointBase.class, "beanInstance", Object.class, String.class), @@ -673,13 +673,13 @@ private TryBlock uniFailureTryBlock(BytecodeCreator method) { return tryBlock; } - private TryBlock onErrorTryBlock(BytecodeCreator method) { + private TryBlock onErrorTryBlock(BytecodeCreator method, ResultHandle endpointThis) { TryBlock tryBlock = method.tryBlock(); CatchBlockCreator catchBlock = tryBlock.addCatch(Throwable.class); // return doOnError(t); catchBlock.returnValue(catchBlock.invokeVirtualMethod( MethodDescriptor.ofMethod(WebSocketEndpointBase.class, "doOnError", Uni.class, Throwable.class), - catchBlock.getThis(), catchBlock.getCaughtException())); + endpointThis, catchBlock.getCaughtException())); return tryBlock; } @@ -810,23 +810,28 @@ private ResultHandle encodeMessage(ResultHandle endpointThis, BytecodeCreator me return uniOnFailureDoOnError(endpointThis, method, callback, uniChain, endpoint, globalErrorHandlers); } } else if (callback.isReturnTypeMulti()) { - // return multiBinary(multi, broadcast, m -> { - // Buffer buffer = encodeBuffer(m); - // return sendBinary(buffer,broadcast); - //}); + // try { + // Buffer buffer = encodeBuffer(m); + // return sendBinary(buffer,broadcast); + // } catch(Throwable t) { + // return doOnError(t); + // } FunctionCreator fun = method.createFunction(Function.class); BytecodeCreator funBytecode = fun.getBytecode(); - ResultHandle buffer = encodeBuffer(funBytecode, callback.returnType().asParameterizedType().arguments().get(0), - funBytecode.getMethodParam(0), endpointThis, callback); - funBytecode.returnValue(funBytecode.invokeVirtualMethod( + // This checkcast should not be necessary but we need to use the endpoint in the function bytecode + // otherwise gizmo does not access the endpoint reference correcly + ResultHandle endpointBase = funBytecode.checkCast(endpointThis, WebSocketEndpointBase.class); + TryBlock tryBlock = onErrorTryBlock(fun.getBytecode(), endpointBase); + ResultHandle buffer = encodeBuffer(tryBlock, callback.returnType().asParameterizedType().arguments().get(0), + tryBlock.getMethodParam(0), endpointThis, callback); + tryBlock.returnValue(tryBlock.invokeVirtualMethod( MethodDescriptor.ofMethod(WebSocketEndpointBase.class, "sendBinary", Uni.class, Buffer.class, boolean.class), endpointThis, buffer, - funBytecode.load(callback.broadcast()))); + tryBlock.load(callback.broadcast()))); return method.invokeVirtualMethod(MethodDescriptor.ofMethod(WebSocketEndpointBase.class, - "multiBinary", Uni.class, Multi.class, boolean.class, Function.class), endpointThis, + "multiBinary", Uni.class, Multi.class, Function.class), endpointThis, value, - method.load(callback.broadcast()), fun.getInstance()); } else { // return sendBinary(buffer,broadcast); @@ -865,22 +870,29 @@ private ResultHandle encodeMessage(ResultHandle endpointThis, BytecodeCreator me } } else if (callback.isReturnTypeMulti()) { // return multiText(multi, broadcast, m -> { - // String text = encodeText(m); - // return sendText(buffer,broadcast); + // try { + // String text = encodeText(m); + // return sendText(buffer,broadcast); + // } catch(Throwable t) { + // return doOnError(t); + // } //}); FunctionCreator fun = method.createFunction(Function.class); BytecodeCreator funBytecode = fun.getBytecode(); - ResultHandle text = encodeText(funBytecode, callback.returnType().asParameterizedType().arguments().get(0), - funBytecode.getMethodParam(0), endpointThis, callback); - funBytecode.returnValue(funBytecode.invokeVirtualMethod( + // This checkcast should not be necessary but we need to use the endpoint in the function bytecode + // otherwise gizmo does not access the endpoint reference correcly + ResultHandle endpointBase = funBytecode.checkCast(endpointThis, WebSocketEndpointBase.class); + TryBlock tryBlock = onErrorTryBlock(fun.getBytecode(), endpointBase); + ResultHandle text = encodeText(tryBlock, callback.returnType().asParameterizedType().arguments().get(0), + tryBlock.getMethodParam(0), endpointThis, callback); + tryBlock.returnValue(tryBlock.invokeVirtualMethod( MethodDescriptor.ofMethod(WebSocketEndpointBase.class, "sendText", Uni.class, String.class, boolean.class), endpointThis, text, - funBytecode.load(callback.broadcast()))); + tryBlock.load(callback.broadcast()))); return method.invokeVirtualMethod(MethodDescriptor.ofMethod(WebSocketEndpointBase.class, - "multiText", Uni.class, Multi.class, boolean.class, Function.class), endpointThis, + "multiText", Uni.class, Multi.class, Function.class), endpointThis, value, - method.load(callback.broadcast()), fun.getInstance()); } else { // return sendText(text,broadcast); diff --git a/extensions/websockets-next/server/deployment/src/test/java/io/quarkus/websockets/next/test/args/OnClosePathParamConnectionArgumentTest.java b/extensions/websockets-next/server/deployment/src/test/java/io/quarkus/websockets/next/test/args/OnClosePathParamConnectionArgumentTest.java new file mode 100644 index 0000000000000..2fe500adbd50c --- /dev/null +++ b/extensions/websockets-next/server/deployment/src/test/java/io/quarkus/websockets/next/test/args/OnClosePathParamConnectionArgumentTest.java @@ -0,0 +1,68 @@ +package io.quarkus.websockets.next.test.args; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.net.URI; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; + +import jakarta.inject.Inject; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; + +import io.quarkus.test.QuarkusUnitTest; +import io.quarkus.test.common.http.TestHTTPResource; +import io.quarkus.websockets.next.OnClose; +import io.quarkus.websockets.next.OnOpen; +import io.quarkus.websockets.next.PathParam; +import io.quarkus.websockets.next.WebSocket; +import io.quarkus.websockets.next.WebSocketConnection; +import io.quarkus.websockets.next.test.utils.WSClient; +import io.vertx.core.Vertx; +import io.vertx.core.http.WebSocketConnectOptions; + +public class OnClosePathParamConnectionArgumentTest { + + @RegisterExtension + public static final QuarkusUnitTest test = new QuarkusUnitTest() + .withApplicationRoot(root -> { + root.addClasses(MontyEcho.class, WSClient.class); + }); + + @Inject + Vertx vertx; + + @TestHTTPResource("echo/monty/and/foo") + URI testUri; + + @Test + void testArguments() throws InterruptedException { + String header = "fool"; + WSClient client = WSClient.create(vertx).connect(new WebSocketConnectOptions().addHeader("X-Test", header), testUri); + client.disconnect(); + assertTrue(MontyEcho.CLOSED_LATCH.await(5, TimeUnit.SECONDS)); + assertEquals("foo:monty:fool", MontyEcho.CLOSED_MESSAGE.get()); + } + + @WebSocket(path = "/echo/{grail}/and/{life}") + public static class MontyEcho { + + static final CountDownLatch CLOSED_LATCH = new CountDownLatch(1); + static final AtomicReference CLOSED_MESSAGE = new AtomicReference<>(); + + @OnOpen + void open() { + } + + @OnClose + void close(@PathParam String life, @PathParam String grail, WebSocketConnection connection) { + CLOSED_MESSAGE.set(life + ":" + grail + ":" + connection.handshakeRequest().header("X-Test")); + CLOSED_LATCH.countDown(); + } + + } + +} diff --git a/extensions/websockets-next/server/deployment/src/test/java/io/quarkus/websockets/next/test/args/OnOpenPathParamConnectionArgumentTest.java b/extensions/websockets-next/server/deployment/src/test/java/io/quarkus/websockets/next/test/args/OnOpenPathParamConnectionArgumentTest.java new file mode 100644 index 0000000000000..97a8c7e1ca7f9 --- /dev/null +++ b/extensions/websockets-next/server/deployment/src/test/java/io/quarkus/websockets/next/test/args/OnOpenPathParamConnectionArgumentTest.java @@ -0,0 +1,54 @@ +package io.quarkus.websockets.next.test.args; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +import java.net.URI; + +import jakarta.inject.Inject; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; + +import io.quarkus.test.QuarkusUnitTest; +import io.quarkus.test.common.http.TestHTTPResource; +import io.quarkus.websockets.next.OnOpen; +import io.quarkus.websockets.next.PathParam; +import io.quarkus.websockets.next.WebSocket; +import io.quarkus.websockets.next.WebSocketConnection; +import io.quarkus.websockets.next.test.utils.WSClient; +import io.vertx.core.Vertx; +import io.vertx.core.http.WebSocketConnectOptions; + +public class OnOpenPathParamConnectionArgumentTest { + + @RegisterExtension + public static final QuarkusUnitTest test = new QuarkusUnitTest() + .withApplicationRoot(root -> { + root.addClasses(MontyEcho.class, WSClient.class); + }); + + @Inject + Vertx vertx; + + @TestHTTPResource("echo/monty/and/foo") + URI testUri; + + @Test + void testArguments() { + String header = "fool"; + WSClient client = WSClient.create(vertx).connect(new WebSocketConnectOptions().addHeader("X-Test", header), testUri); + client.waitForMessages(1); + assertEquals("foo:monty:fool", client.getMessages().get(0).toString()); + } + + @WebSocket(path = "/echo/{grail}/and/{life}") + public static class MontyEcho { + + @OnOpen + String process(@PathParam String life, @PathParam String grail, WebSocketConnection connection) { + return life + ":" + grail + ":" + connection.handshakeRequest().header("X-Test"); + } + + } + +} diff --git a/extensions/websockets-next/server/deployment/src/test/java/io/quarkus/websockets/next/test/errors/MultiBinaryDecodeErrorTest.java b/extensions/websockets-next/server/deployment/src/test/java/io/quarkus/websockets/next/test/errors/MultiBinaryDecodeErrorTest.java new file mode 100644 index 0000000000000..e14a37c3c990c --- /dev/null +++ b/extensions/websockets-next/server/deployment/src/test/java/io/quarkus/websockets/next/test/errors/MultiBinaryDecodeErrorTest.java @@ -0,0 +1,63 @@ +package io.quarkus.websockets.next.test.errors; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.net.URI; + +import jakarta.inject.Inject; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; + +import io.quarkus.test.QuarkusUnitTest; +import io.quarkus.test.common.http.TestHTTPResource; +import io.quarkus.websockets.next.BinaryDecodeException; +import io.quarkus.websockets.next.OnBinaryMessage; +import io.quarkus.websockets.next.OnError; +import io.quarkus.websockets.next.WebSocket; +import io.quarkus.websockets.next.test.utils.WSClient; +import io.smallrye.mutiny.Multi; +import io.vertx.core.Vertx; +import io.vertx.core.buffer.Buffer; +import io.vertx.mutiny.core.Context; + +public class MultiBinaryDecodeErrorTest { + + @RegisterExtension + public static final QuarkusUnitTest test = new QuarkusUnitTest() + .withApplicationRoot(root -> { + root.addClasses(Echo.class, WSClient.class); + }); + + @Inject + Vertx vertx; + + @TestHTTPResource("echo") + URI testUri; + + @Test + void testError() { + WSClient client = WSClient.create(vertx).connect(testUri); + client.send(Buffer.buffer("1")); + client.waitForMessages(1); + assertEquals("Problem decoding: 1", client.getLastMessage().toString()); + } + + @WebSocket(path = "/echo") + public static class Echo { + + @OnBinaryMessage + Multi process(Multi messages) { + return messages; + } + + @OnError + String decodingError(BinaryDecodeException e) { + assertTrue(Context.isOnWorkerThread()); + return "Problem decoding: " + e.getBytes().toString(); + } + + } + +} diff --git a/extensions/websockets-next/server/deployment/src/test/java/io/quarkus/websockets/next/test/errors/MultiBinaryEncodeErrorTest.java b/extensions/websockets-next/server/deployment/src/test/java/io/quarkus/websockets/next/test/errors/MultiBinaryEncodeErrorTest.java new file mode 100644 index 0000000000000..979feeb4dc737 --- /dev/null +++ b/extensions/websockets-next/server/deployment/src/test/java/io/quarkus/websockets/next/test/errors/MultiBinaryEncodeErrorTest.java @@ -0,0 +1,63 @@ +package io.quarkus.websockets.next.test.errors; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.net.URI; + +import jakarta.inject.Inject; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; + +import io.quarkus.test.QuarkusUnitTest; +import io.quarkus.test.common.http.TestHTTPResource; +import io.quarkus.websockets.next.BinaryEncodeException; +import io.quarkus.websockets.next.OnBinaryMessage; +import io.quarkus.websockets.next.OnError; +import io.quarkus.websockets.next.WebSocket; +import io.quarkus.websockets.next.test.utils.WSClient; +import io.smallrye.mutiny.Multi; +import io.vertx.core.Vertx; +import io.vertx.core.buffer.Buffer; +import io.vertx.mutiny.core.Context; + +public class MultiBinaryEncodeErrorTest { + + @RegisterExtension + public static final QuarkusUnitTest test = new QuarkusUnitTest() + .withApplicationRoot(root -> { + root.addClasses(Echo.class, WSClient.class); + }); + + @Inject + Vertx vertx; + + @TestHTTPResource("echo") + URI testUri; + + @Test + void testError() { + WSClient client = WSClient.create(vertx).connect(testUri); + client.send(Buffer.buffer("1")); + client.waitForMessages(1); + assertEquals("Problem encoding: 1", client.getLastMessage().toString()); + } + + @WebSocket(path = "/echo") + public static class Echo { + + @OnBinaryMessage + Multi process(Buffer message) { + return Multi.createFrom().item(Integer.parseInt(message.toString())); + } + + @OnError + String encodingError(BinaryEncodeException e) { + assertTrue(Context.isOnWorkerThread()); + return "Problem encoding: " + e.getEncodedObject().toString(); + } + + } + +} diff --git a/extensions/websockets-next/server/deployment/src/test/java/io/quarkus/websockets/next/test/errors/MultiFailureCloseConnectionTest.java b/extensions/websockets-next/server/deployment/src/test/java/io/quarkus/websockets/next/test/errors/MultiFailureCloseConnectionTest.java new file mode 100644 index 0000000000000..660e9192f1e02 --- /dev/null +++ b/extensions/websockets-next/server/deployment/src/test/java/io/quarkus/websockets/next/test/errors/MultiFailureCloseConnectionTest.java @@ -0,0 +1,70 @@ +package io.quarkus.websockets.next.test.errors; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +import java.net.URI; +import java.time.Duration; + +import jakarta.inject.Inject; + +import org.awaitility.Awaitility; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; + +import io.quarkus.test.QuarkusUnitTest; +import io.quarkus.test.common.http.TestHTTPResource; +import io.quarkus.websockets.next.OnError; +import io.quarkus.websockets.next.OnTextMessage; +import io.quarkus.websockets.next.WebSocket; +import io.quarkus.websockets.next.WebSocketConnection; +import io.quarkus.websockets.next.test.utils.WSClient; +import io.smallrye.mutiny.Multi; +import io.smallrye.mutiny.Uni; +import io.vertx.core.Vertx; + +public class MultiFailureCloseConnectionTest { + + @RegisterExtension + public static final QuarkusUnitTest test = new QuarkusUnitTest() + .withApplicationRoot(root -> { + root.addClasses(Echo.class, WSClient.class); + }); + + @Inject + Vertx vertx; + + @TestHTTPResource("echo") + URI testUri; + + @Test + void testError() { + WSClient client = WSClient.create(vertx).connect(testUri); + client.sendAndAwait("bar,foo,baz"); + // "bar" should be sent back + client.waitForMessages(1); + // "foo" results in a failure -> connection closed + Awaitility.await().atMost(Duration.ofSeconds(5)).until(() -> client.isClosed()); + // "foo" and "baz" should never be sent back + assertEquals(1, client.getMessages().size()); + } + + @WebSocket(path = "/echo") + public static class Echo { + + @OnTextMessage + Multi process(String message) { + return Multi.createFrom().items(message.split(",")).invoke(s -> { + if (s.equals("foo")) { + throw new IllegalArgumentException(); + } + }); + } + + @OnError + Uni runtimeProblem(IllegalArgumentException e, WebSocketConnection connection) { + return connection.close(); + } + + } + +} diff --git a/extensions/websockets-next/server/deployment/src/test/java/io/quarkus/websockets/next/test/errors/MultiFailureTest.java b/extensions/websockets-next/server/deployment/src/test/java/io/quarkus/websockets/next/test/errors/MultiFailureTest.java new file mode 100644 index 0000000000000..6abf64c6194dc --- /dev/null +++ b/extensions/websockets-next/server/deployment/src/test/java/io/quarkus/websockets/next/test/errors/MultiFailureTest.java @@ -0,0 +1,63 @@ +package io.quarkus.websockets.next.test.errors; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +import java.net.URI; + +import jakarta.inject.Inject; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; + +import io.quarkus.test.QuarkusUnitTest; +import io.quarkus.test.common.http.TestHTTPResource; +import io.quarkus.websockets.next.OnError; +import io.quarkus.websockets.next.OnTextMessage; +import io.quarkus.websockets.next.WebSocket; +import io.quarkus.websockets.next.test.utils.WSClient; +import io.smallrye.mutiny.Multi; +import io.vertx.core.Vertx; + +public class MultiFailureTest { + + @RegisterExtension + public static final QuarkusUnitTest test = new QuarkusUnitTest() + .withApplicationRoot(root -> { + root.addClasses(Echo.class, WSClient.class); + }); + + @Inject + Vertx vertx; + + @TestHTTPResource("echo") + URI testUri; + + @Test + void testError() { + WSClient client = WSClient.create(vertx).connect(testUri); + client.sendAndAwait("bar,foo,baz"); + client.waitForMessages(2); + assertEquals("bar", client.getMessages().get(0).toString()); + assertEquals("foo detected", client.getMessages().get(1).toString()); + } + + @WebSocket(path = "/echo") + public static class Echo { + + @OnTextMessage + Multi process(String message) { + return Multi.createFrom().items(message.split(",")).invoke(s -> { + if (s.equals("foo")) { + throw new IllegalArgumentException(); + } + }); + } + + @OnError + String runtimeProblem(IllegalArgumentException e) { + return "foo detected"; + } + + } + +} diff --git a/extensions/websockets-next/server/deployment/src/test/java/io/quarkus/websockets/next/test/errors/MultiTextDecodeErrorTest.java b/extensions/websockets-next/server/deployment/src/test/java/io/quarkus/websockets/next/test/errors/MultiTextDecodeErrorTest.java new file mode 100644 index 0000000000000..049e6ffc06a5a --- /dev/null +++ b/extensions/websockets-next/server/deployment/src/test/java/io/quarkus/websockets/next/test/errors/MultiTextDecodeErrorTest.java @@ -0,0 +1,76 @@ +package io.quarkus.websockets.next.test.errors; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.net.URI; + +import jakarta.inject.Inject; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; + +import io.quarkus.test.QuarkusUnitTest; +import io.quarkus.test.common.http.TestHTTPResource; +import io.quarkus.websockets.next.OnError; +import io.quarkus.websockets.next.OnTextMessage; +import io.quarkus.websockets.next.TextDecodeException; +import io.quarkus.websockets.next.WebSocket; +import io.quarkus.websockets.next.test.utils.WSClient; +import io.smallrye.mutiny.Multi; +import io.vertx.core.Vertx; +import io.vertx.mutiny.core.Context; + +public class MultiTextDecodeErrorTest { + + @RegisterExtension + public static final QuarkusUnitTest test = new QuarkusUnitTest() + .withApplicationRoot(root -> { + root.addClasses(Echo.class, WSClient.class); + }); + + @Inject + Vertx vertx; + + @TestHTTPResource("echo") + URI testUri; + + @Test + void testError() { + WSClient client = WSClient.create(vertx).connect(testUri); + client.send("not a json"); + client.waitForMessages(1); + assertEquals("Problem decoding: not a json", client.getLastMessage().toString()); + } + + @WebSocket(path = "/echo") + public static class Echo { + + @OnTextMessage + Multi process(Multi pojos) { + return pojos; + } + + @OnError + String decodingError(TextDecodeException e) { + assertTrue(Context.isOnWorkerThread()); + return "Problem decoding: " + e.getText(); + } + + } + + public static class Pojo { + + private String name; + + public String getName() { + return name; + } + + public void setName(String name) { + this.name = name; + } + + } + +} diff --git a/extensions/websockets-next/server/deployment/src/test/java/io/quarkus/websockets/next/test/errors/MultiTextEncodeErrorTest.java b/extensions/websockets-next/server/deployment/src/test/java/io/quarkus/websockets/next/test/errors/MultiTextEncodeErrorTest.java new file mode 100644 index 0000000000000..7df746edd1662 --- /dev/null +++ b/extensions/websockets-next/server/deployment/src/test/java/io/quarkus/websockets/next/test/errors/MultiTextEncodeErrorTest.java @@ -0,0 +1,107 @@ +package io.quarkus.websockets.next.test.errors; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.lang.reflect.Type; +import java.net.URI; + +import jakarta.annotation.Priority; +import jakarta.inject.Inject; +import jakarta.inject.Singleton; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; + +import io.quarkus.test.QuarkusUnitTest; +import io.quarkus.test.common.http.TestHTTPResource; +import io.quarkus.websockets.next.OnError; +import io.quarkus.websockets.next.OnTextMessage; +import io.quarkus.websockets.next.TextEncodeException; +import io.quarkus.websockets.next.TextMessageCodec; +import io.quarkus.websockets.next.WebSocket; +import io.quarkus.websockets.next.test.utils.WSClient; +import io.smallrye.mutiny.Multi; +import io.vertx.core.Vertx; +import io.vertx.core.json.JsonObject; +import io.vertx.mutiny.core.Context; + +public class MultiTextEncodeErrorTest { + + @RegisterExtension + public static final QuarkusUnitTest test = new QuarkusUnitTest() + .withApplicationRoot(root -> { + root.addClasses(Echo.class, WSClient.class); + }); + + @Inject + Vertx vertx; + + @TestHTTPResource("echo") + URI testUri; + + @Test + void testError() { + WSClient client = WSClient.create(vertx).connect(testUri); + client.send(new JsonObject().put("name", "Fixa").encode()); + client.waitForMessages(1); + assertEquals("java.lang.IllegalArgumentException:Fixa", client.getLastMessage().toString()); + } + + @WebSocket(path = "/echo") + public static class Echo { + + @OnTextMessage(outputCodec = BadCodec.class) + Multi process(Pojo pojo) { + return Multi.createFrom().item(pojo); + } + + @OnError + String encodingError(TextEncodeException e) { + assertTrue(Context.isOnWorkerThread()); + return e.getCause().toString() + ":" + e.getEncodedObject().toString(); + } + + } + + @Priority(-1) // Let the JsonTextMessageCodec decode the pojo + @Singleton + public static class BadCodec implements TextMessageCodec { + + @Override + public boolean supports(Type type) { + return type.equals(Pojo.class); + } + + @Override + public String encode(Pojo value) { + throw new IllegalArgumentException(); + } + + @Override + public Pojo decode(Type type, String value) { + throw new UnsupportedOperationException(); + } + + } + + public static class Pojo { + + private String name; + + public String getName() { + return name; + } + + public void setName(String name) { + this.name = name; + } + + @Override + public String toString() { + return name; + } + + } + +} diff --git a/extensions/websockets-next/server/deployment/src/test/java/io/quarkus/websockets/next/test/errors/MultipleAmbiguousGlobalErrorHandlersTest.java b/extensions/websockets-next/server/deployment/src/test/java/io/quarkus/websockets/next/test/errors/MultipleAmbiguousGlobalErrorHandlersTest.java index 39c39cdff3b2d..08751fc65f512 100644 --- a/extensions/websockets-next/server/deployment/src/test/java/io/quarkus/websockets/next/test/errors/MultipleAmbiguousGlobalErrorHandlersTest.java +++ b/extensions/websockets-next/server/deployment/src/test/java/io/quarkus/websockets/next/test/errors/MultipleAmbiguousGlobalErrorHandlersTest.java @@ -7,6 +7,7 @@ import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.RegisterExtension; +import io.quarkus.arc.Unremovable; import io.quarkus.test.QuarkusUnitTest; import io.quarkus.websockets.next.OnError; import io.quarkus.websockets.next.WebSocketServerException; @@ -25,6 +26,7 @@ void testMultipleAmbiguousErrorHandlers() { fail(); } + @Unremovable @ApplicationScoped public static class GlobalErrorHandlers { diff --git a/extensions/websockets-next/server/runtime/src/main/java/io/quarkus/websockets/next/OnClose.java b/extensions/websockets-next/server/runtime/src/main/java/io/quarkus/websockets/next/OnClose.java index 49707928265d6..636a6ec3fb723 100644 --- a/extensions/websockets-next/server/runtime/src/main/java/io/quarkus/websockets/next/OnClose.java +++ b/extensions/websockets-next/server/runtime/src/main/java/io/quarkus/websockets/next/OnClose.java @@ -6,12 +6,23 @@ import java.lang.annotation.Retention; import java.lang.annotation.Target; +import io.quarkus.websockets.next.WebSocketConnection.HandshakeRequest; import io.smallrye.common.annotation.Experimental; /** * A {@link WebSocket} endpoint method annotated with this annotation is invoked when the client disconnects from the * socket. *

+ * The method must return {@code void} or {@code io.smallrye.mutiny.Uni}. + * The method may accept the following parameters: + *

    + *
  • {@link WebSocketConnection}
  • + *
  • {@link HandshakeRequest}
  • + *
  • {@link String} parameters annotated with {@link PathParam}
  • + *
+ * Note that it's not possible to send a message to the current connection as the socket is already closed when the method + * invoked. However, it is possible to send messages to other open connections. + *

* An endpoint may declare at most one method annotated with this annotation. */ @Retention(RUNTIME) diff --git a/extensions/websockets-next/server/runtime/src/main/java/io/quarkus/websockets/next/runtime/WebSocketEndpoint.java b/extensions/websockets-next/server/runtime/src/main/java/io/quarkus/websockets/next/runtime/WebSocketEndpoint.java index 3a0d543ae426a..5ad60e04a69dd 100644 --- a/extensions/websockets-next/server/runtime/src/main/java/io/quarkus/websockets/next/runtime/WebSocketEndpoint.java +++ b/extensions/websockets-next/server/runtime/src/main/java/io/quarkus/websockets/next/runtime/WebSocketEndpoint.java @@ -3,6 +3,7 @@ import java.lang.reflect.Type; import io.quarkus.websockets.next.WebSocket; +import io.smallrye.mutiny.Uni; import io.vertx.core.Future; import io.vertx.core.buffer.Buffer; @@ -76,6 +77,8 @@ default ExecutionModel onCloseExecutionModel() { return ExecutionModel.NONE; } + Uni doOnError(Throwable t); + enum ExecutionModel { WORKER_THREAD, VIRTUAL_THREAD, diff --git a/extensions/websockets-next/server/runtime/src/main/java/io/quarkus/websockets/next/runtime/WebSocketEndpointBase.java b/extensions/websockets-next/server/runtime/src/main/java/io/quarkus/websockets/next/runtime/WebSocketEndpointBase.java index 4a0df4119c9cc..051362461babe 100644 --- a/extensions/websockets-next/server/runtime/src/main/java/io/quarkus/websockets/next/runtime/WebSocketEndpointBase.java +++ b/extensions/websockets-next/server/runtime/src/main/java/io/quarkus/websockets/next/runtime/WebSocketEndpointBase.java @@ -262,6 +262,7 @@ protected Uni doOnClose(Object message) { return Uni.createFrom().voidItem(); } + @Override public Uni doOnError(Throwable t) { // This method is overriden if there is at least one error handler defined return Uni.createFrom().failure(t); @@ -293,18 +294,19 @@ public Uni sendText(String message, boolean broadcast) { return broadcast ? connection.broadcast().sendText(message) : connection.sendText(message); } - public Uni multiText(Multi multi, boolean broadcast, Function> itemFun) { - multi.onFailure() - .call(connection::close) + public Uni multiText(Multi multi, Function> action) { + multi.onFailure().recoverWithMulti(t -> doOnError(t).toMulti()) .subscribe().with( m -> { - itemFun.apply(m) + // Encode and send message + action.apply(m) + .onFailure().recoverWithUni(this::doOnError) .subscribe() .with(v -> LOG.debugf("Multi >> text message: %s", connection), t -> LOG.errorf(t, "Unable to send text message from Multi: %s", connection)); }, t -> { - LOG.errorf(t, "Unable to send text message from Multi - connection was closed: %s ", connection); + LOG.errorf(t, "Unable to send text message from Multi: %s ", connection); }); return Uni.createFrom().voidItem(); } @@ -313,18 +315,19 @@ public Uni sendBinary(Buffer message, boolean broadcast) { return broadcast ? connection.broadcast().sendBinary(message) : connection.sendBinary(message); } - public Uni multiBinary(Multi multi, boolean broadcast, Function> itemFun) { - multi.onFailure() - .call(connection::close) + public Uni multiBinary(Multi multi, Function> action) { + multi.onFailure().recoverWithMulti(t -> doOnError(t).toMulti()) .subscribe().with( m -> { - itemFun.apply(m) + // Encode and send message + action.apply(m) + .onFailure().recoverWithUni(this::doOnError) .subscribe() .with(v -> LOG.debugf("Multi >> binary message: %s", connection), t -> LOG.errorf(t, "Unable to send binary message from Multi: %s", connection)); }, t -> { - LOG.errorf(t, "Unable to send text message from Multi - connection was closed: %s ", connection); + LOG.errorf(t, "Unable to send text message from Multi: %s ", connection); }); return Uni.createFrom().voidItem(); } diff --git a/extensions/websockets-next/server/runtime/src/main/java/io/quarkus/websockets/next/runtime/WebSocketServerRecorder.java b/extensions/websockets-next/server/runtime/src/main/java/io/quarkus/websockets/next/runtime/WebSocketServerRecorder.java index 83b1a934f7983..c97dfc8107630 100644 --- a/extensions/websockets-next/server/runtime/src/main/java/io/quarkus/websockets/next/runtime/WebSocketServerRecorder.java +++ b/extensions/websockets-next/server/runtime/src/main/java/io/quarkus/websockets/next/runtime/WebSocketServerRecorder.java @@ -164,9 +164,16 @@ public void handle(Void event) { } else { textMessageHandler(connection, endpoint, ws, onOpenContext, m -> { contextSupport.start(); - textBroadcastProcessor.onNext(endpoint.decodeTextMultiItem(m)); - LOG.debugf("Text message >> Multi: %s", connection); - contextSupport.end(false); + try { + textBroadcastProcessor.onNext(endpoint.decodeTextMultiItem(m)); + LOG.debugf("Text message >> Multi: %s", connection); + } catch (Throwable throwable) { + endpoint.doOnError(throwable).subscribe().with( + v -> LOG.debugf("Text message >> Multi: %s", connection), + t -> LOG.errorf(t, "Unable to send text message to Multi: %s", connection)); + } finally { + contextSupport.end(false); + } }, false); } @@ -185,9 +192,16 @@ public void handle(Void event) { } else { binaryMessageHandler(connection, endpoint, ws, onOpenContext, m -> { contextSupport.start(); - binaryBroadcastProcessor.onNext(endpoint.decodeBinaryMultiItem(m)); - LOG.debugf("Binary message >> Multi: %s", connection); - contextSupport.end(false); + try { + binaryBroadcastProcessor.onNext(endpoint.decodeBinaryMultiItem(m)); + LOG.debugf("Binary message >> Multi: %s", connection); + } catch (Throwable throwable) { + endpoint.doOnError(throwable).subscribe().with( + v -> LOG.debugf("Binary message >> Multi: %s", connection), + t -> LOG.errorf(t, "Unable to send binary message to Multi: %s", connection)); + } finally { + contextSupport.end(false); + } }, false); }