diff --git a/docs/src/main/asciidoc/websockets-next-reference.adoc b/docs/src/main/asciidoc/websockets-next-reference.adoc index 23d223caef1ca..b343654c44e07 100644 --- a/docs/src/main/asciidoc/websockets-next-reference.adoc +++ b/docs/src/main/asciidoc/websockets-next-reference.adoc @@ -233,20 +233,23 @@ Method receiving messages from the client are annotated with `@OnTextMessage` or ==== Invocation rules -When invoking these annotated methods, the _session_ scope linked to the WebSocket connection remains active. +When invoking the callback methods, the _session_ scope linked to the WebSocket connection remains active. In addition, the request scope is active until the completion of the method (or until it produces its result for async and reactive methods). -Quarkus WebSocket Next supports _blocking_ and _non-blocking_ logic, akin to Quarkus REST, determined by the method signature and additional annotations such as `@Blocking` and `@NonBlocking`. +WebSocket Next supports _blocking_ and _non-blocking_ logic, akin to Quarkus REST, determined from the return type of the method and additional annotations such as `@Blocking` and `@NonBlocking`. Here are the rules governing execution: -* Non-blocking methods must execute on the connection's event loop. -* Methods annotated with `@RunOnVirtualThread` are considered blocking and should execute on a virtual thread. -* Blocking methods must execute on a worker thread if not annotated with `@RunOnVirtualThread`. -* When `@RunOnVirtualThread` is employed, each invocation spawns a new virtual thread. -* Methods returning `CompletionStage`, `Uni` and `Multi` are considered non-blocking. -* Methods returning `void` or plain objects are considered blocking. -* Kotlin `suspend` functions are considered non-blocking. +* Methods annotated with `@RunOnVirtualThread`, `@Blocking` or `@Transactional` are considered blocking. +* Methods annotated with `@NonBlocking` are considered non-blocking. +* Methods declared on a class annotated with `@Transactional` are considered blocking unless annotated with `@NonBlocking`. +* If the method does not declare any of the annotations listed above the execution model is derived from the return type: +** Methods returning `Uni` and `Multi` are considered non-blocking. +** Methods returning `void` or any other type are considered blocking. +* Kotlin `suspend` functions are always considered non-blocking and may not be annotated with `@Blocking`, `@NonBlocking` or `@RunOnVirtualThread`. +* Non-blocking methods must execute on the connection's event loop thread. +* Blocking methods must execute on a worker thread unless annotated with `@RunOnVirtualThread`. +* Methods annotated with `@RunOnVirtualThread` must execute on a virtual thread, each invocation spawns a new virtual thread. ==== Method parameters diff --git a/extensions/websockets-next/deployment/pom.xml b/extensions/websockets-next/deployment/pom.xml index c6b47704d0f51..a1e5ec3f6e8ce 100644 --- a/extensions/websockets-next/deployment/pom.xml +++ b/extensions/websockets-next/deployment/pom.xml @@ -91,6 +91,11 @@ opentelemetry-semconv test + + jakarta.transaction + jakarta.transaction-api + test + diff --git a/extensions/websockets-next/deployment/src/main/java/io/quarkus/websockets/next/deployment/WebSocketDotNames.java b/extensions/websockets-next/deployment/src/main/java/io/quarkus/websockets/next/deployment/WebSocketDotNames.java index 539ca7ea6a415..8ecf78f867726 100644 --- a/extensions/websockets-next/deployment/src/main/java/io/quarkus/websockets/next/deployment/WebSocketDotNames.java +++ b/extensions/websockets-next/deployment/src/main/java/io/quarkus/websockets/next/deployment/WebSocketDotNames.java @@ -54,6 +54,7 @@ final class WebSocketDotNames { static final DotName HANDSHAKE_REQUEST = DotName.createSimple(HandshakeRequest.class); static final DotName THROWABLE = DotName.createSimple(Throwable.class); static final DotName CLOSE_REASON = DotName.createSimple(CloseReason.class); + static final DotName TRANSACTIONAL = DotName.createSimple("jakarta.transaction.Transactional"); static final List CALLBACK_ANNOTATIONS = List.of(ON_OPEN, ON_CLOSE, ON_BINARY_MESSAGE, ON_TEXT_MESSAGE, ON_PONG_MESSAGE, ON_ERROR); diff --git a/extensions/websockets-next/deployment/src/main/java/io/quarkus/websockets/next/deployment/WebSocketProcessor.java b/extensions/websockets-next/deployment/src/main/java/io/quarkus/websockets/next/deployment/WebSocketProcessor.java index 0b6eddb255515..6e5b680686edb 100644 --- a/extensions/websockets-next/deployment/src/main/java/io/quarkus/websockets/next/deployment/WebSocketProcessor.java +++ b/extensions/websockets-next/deployment/src/main/java/io/quarkus/websockets/next/deployment/WebSocketProcessor.java @@ -1578,13 +1578,16 @@ private static ExecutionModel executionModel(MethodInfo method, TransformedAnnot throw new WebSocketException("Kotlin `suspend` functions in WebSockets Next endpoints may not be " + "annotated @Blocking, @NonBlocking or @RunOnVirtualThread: " + method); } - if (transformedAnnotations.hasAnnotation(method, WebSocketDotNames.RUN_ON_VIRTUAL_THREAD)) { return ExecutionModel.VIRTUAL_THREAD; } else if (transformedAnnotations.hasAnnotation(method, WebSocketDotNames.BLOCKING)) { return ExecutionModel.WORKER_THREAD; } else if (transformedAnnotations.hasAnnotation(method, WebSocketDotNames.NON_BLOCKING)) { return ExecutionModel.EVENT_LOOP; + } else if (transformedAnnotations.hasAnnotation(method, WebSocketDotNames.TRANSACTIONAL) + || transformedAnnotations.hasAnnotation(method.declaringClass(), WebSocketDotNames.TRANSACTIONAL)) { + // Method annotated with @Transactional or declared on a class annotated @Transactional is also treated as a blocking method + return ExecutionModel.WORKER_THREAD; } else { return hasBlockingSignature(method) ? ExecutionModel.WORKER_THREAD : ExecutionModel.EVENT_LOOP; } diff --git a/extensions/websockets-next/deployment/src/test/java/io/quarkus/websockets/next/test/executionmodel/BlockingAnnotationTest.java b/extensions/websockets-next/deployment/src/test/java/io/quarkus/websockets/next/test/executionmodel/BlockingAnnotationTest.java index b31cb1d540daf..8de51e94a3bad 100644 --- a/extensions/websockets-next/deployment/src/test/java/io/quarkus/websockets/next/test/executionmodel/BlockingAnnotationTest.java +++ b/extensions/websockets-next/deployment/src/test/java/io/quarkus/websockets/next/test/executionmodel/BlockingAnnotationTest.java @@ -35,7 +35,8 @@ public class BlockingAnnotationTest { @Test void testEndoint() { - try (WSClient client = new WSClient(vertx).connect(endUri)) { + try (WSClient client = new WSClient(vertx)) { + client.connect(endUri); assertEquals("evenloop:false,worker:true", client.sendAndAwaitReply("foo").toString()); } } diff --git a/extensions/websockets-next/deployment/src/test/java/io/quarkus/websockets/next/test/executionmodel/NonBlockingAnnotationTest.java b/extensions/websockets-next/deployment/src/test/java/io/quarkus/websockets/next/test/executionmodel/NonBlockingAnnotationTest.java index 3c4da547354be..a144b28f0d932 100644 --- a/extensions/websockets-next/deployment/src/test/java/io/quarkus/websockets/next/test/executionmodel/NonBlockingAnnotationTest.java +++ b/extensions/websockets-next/deployment/src/test/java/io/quarkus/websockets/next/test/executionmodel/NonBlockingAnnotationTest.java @@ -34,7 +34,8 @@ public class NonBlockingAnnotationTest { @Test void testEndoint() { - try (WSClient client = new WSClient(vertx).connect(endUri)) { + try (WSClient client = new WSClient(vertx)) { + client.connect(endUri); assertEquals("evenloop:true,worker:false", client.sendAndAwaitReply("foo").toString()); } } diff --git a/extensions/websockets-next/deployment/src/test/java/io/quarkus/websockets/next/test/executionmodel/TransactionalClassTest.java b/extensions/websockets-next/deployment/src/test/java/io/quarkus/websockets/next/test/executionmodel/TransactionalClassTest.java new file mode 100644 index 0000000000000..ea4bc61b1c182 --- /dev/null +++ b/extensions/websockets-next/deployment/src/test/java/io/quarkus/websockets/next/test/executionmodel/TransactionalClassTest.java @@ -0,0 +1,55 @@ +package io.quarkus.websockets.next.test.executionmodel; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +import java.net.URI; + +import jakarta.inject.Inject; +import jakarta.transaction.Transactional; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; + +import io.quarkus.test.QuarkusUnitTest; +import io.quarkus.test.common.http.TestHTTPResource; +import io.quarkus.websockets.next.OnTextMessage; +import io.quarkus.websockets.next.WebSocket; +import io.quarkus.websockets.next.test.utils.WSClient; +import io.smallrye.mutiny.Uni; +import io.vertx.core.Context; +import io.vertx.core.Vertx; + +public class TransactionalClassTest { + + @RegisterExtension + public static final QuarkusUnitTest test = new QuarkusUnitTest() + .withApplicationRoot(root -> { + root.addClasses(Endpoint.class, WSClient.class); + }); + + @Inject + Vertx vertx; + + @TestHTTPResource("endpoint") + URI endUri; + + @Test + void testEndoint() { + try (WSClient client = new WSClient(vertx)) { + client.connect(endUri); + assertEquals("evenloop:false,worker:true", client.sendAndAwaitReply("foo").toString()); + } + } + + @Transactional + @WebSocket(path = "/endpoint") + public static class Endpoint { + + @OnTextMessage + Uni message(String ignored) { + return Uni.createFrom().item("evenloop:" + Context.isOnEventLoopThread() + ",worker:" + Context.isOnWorkerThread()); + } + + } + +} diff --git a/extensions/websockets-next/deployment/src/test/java/io/quarkus/websockets/next/test/executionmodel/TransactionalMethodTest.java b/extensions/websockets-next/deployment/src/test/java/io/quarkus/websockets/next/test/executionmodel/TransactionalMethodTest.java new file mode 100644 index 0000000000000..1bc299b54e47c --- /dev/null +++ b/extensions/websockets-next/deployment/src/test/java/io/quarkus/websockets/next/test/executionmodel/TransactionalMethodTest.java @@ -0,0 +1,55 @@ +package io.quarkus.websockets.next.test.executionmodel; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +import java.net.URI; + +import jakarta.inject.Inject; +import jakarta.transaction.Transactional; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; + +import io.quarkus.test.QuarkusUnitTest; +import io.quarkus.test.common.http.TestHTTPResource; +import io.quarkus.websockets.next.OnTextMessage; +import io.quarkus.websockets.next.WebSocket; +import io.quarkus.websockets.next.test.utils.WSClient; +import io.smallrye.mutiny.Uni; +import io.vertx.core.Context; +import io.vertx.core.Vertx; + +public class TransactionalMethodTest { + + @RegisterExtension + public static final QuarkusUnitTest test = new QuarkusUnitTest() + .withApplicationRoot(root -> { + root.addClasses(Endpoint.class, WSClient.class); + }); + + @Inject + Vertx vertx; + + @TestHTTPResource("endpoint") + URI endUri; + + @Test + void testEndoint() { + try (WSClient client = new WSClient(vertx)) { + client.connect(endUri); + assertEquals("evenloop:false,worker:true", client.sendAndAwaitReply("foo").toString()); + } + } + + @WebSocket(path = "/endpoint") + public static class Endpoint { + + @Transactional + @OnTextMessage + Uni message(String ignored) { + return Uni.createFrom().item("evenloop:" + Context.isOnEventLoopThread() + ",worker:" + Context.isOnWorkerThread()); + } + + } + +}