From 30e3561ba5fe1e1ca9ee4eb550983606006f4db2 Mon Sep 17 00:00:00 2001 From: Martin Kouba Date: Fri, 21 Jun 2024 10:49:44 +0200 Subject: [PATCH] WebSockets Next: fire CDI events for client connections added/removed - resolves #41333 --- .../asciidoc/websockets-next-reference.adoc | 22 ++++ .../client/ClientConnectionEventsTest.java | 121 ++++++++++++++++++ .../io/quarkus/websockets/next/Closed.java | 5 +- .../java/io/quarkus/websockets/next/Open.java | 5 +- .../next/runtime/ClientConnectionManager.java | 24 ++++ 5 files changed, 175 insertions(+), 2 deletions(-) create mode 100644 extensions/websockets-next/deployment/src/test/java/io/quarkus/websockets/next/test/client/ClientConnectionEventsTest.java diff --git a/docs/src/main/asciidoc/websockets-next-reference.adoc b/docs/src/main/asciidoc/websockets-next-reference.adoc index 99b9e1ccacfb6..f2af366a9bd49 100644 --- a/docs/src/main/asciidoc/websockets-next-reference.adoc +++ b/docs/src/main/asciidoc/websockets-next-reference.adoc @@ -623,6 +623,7 @@ class MyBean { There are also other convenient methods. For example, `OpenConnections#findByEndpointId(String)` makes it easy to find connections for a specific endpoint. +[[server-cdi-events]] ==== CDI events Quarkus fires a CDI event of type `io.quarkus.websockets.next.WebSocketConnection` with qualifier `@io.quarkus.websockets.next.Open` asynchronously when a new connection is opened. @@ -938,6 +939,27 @@ class MyBean { There are also other convenient methods. For example, `OpenClientConnections#findByClientId(String)` makes it easy to find connections for a specific endpoint. +[[client-cdi-events]] +==== CDI events + +Quarkus fires a CDI event of type `io.quarkus.websockets.next.WebSocketClientConnection` with qualifier `@io.quarkus.websockets.next.Open` asynchronously when a new connection is opened. +Moreover, a CDI event of type `WebSocketClientConnection` with qualifier `@io.quarkus.websockets.next.Closed` is fired asynchronously when a connection is closed. + +[source, java] +---- +import jakarta.enterprise.event.ObservesAsync; +import io.quarkus.websockets.next.Open; +import io.quarkus.websockets.next.WebSocketClientConnection; + +class MyBean { + + void connectionOpened(@ObservesAsync @Open WebSocketClientConnection connection) { <1> + // This observer method is called when a connection is opened... + } +} +---- +<1> An asynchronous observer method is executed using the default blocking executor service. + === Configuring SSL/TLS To establish a TLS connection, you need to configure a _named_ configuration using the xref:./tls-registry-reference.adoc[TLS registry]: diff --git a/extensions/websockets-next/deployment/src/test/java/io/quarkus/websockets/next/test/client/ClientConnectionEventsTest.java b/extensions/websockets-next/deployment/src/test/java/io/quarkus/websockets/next/test/client/ClientConnectionEventsTest.java new file mode 100644 index 0000000000000..8e7f17c7cf50e --- /dev/null +++ b/extensions/websockets-next/deployment/src/test/java/io/quarkus/websockets/next/test/client/ClientConnectionEventsTest.java @@ -0,0 +1,121 @@ +package io.quarkus.websockets.next.test.client; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.net.URI; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; + +import jakarta.enterprise.event.ObservesAsync; +import jakarta.inject.Inject; +import jakarta.inject.Singleton; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; + +import io.quarkus.test.QuarkusUnitTest; +import io.quarkus.test.common.http.TestHTTPResource; +import io.quarkus.websockets.next.Closed; +import io.quarkus.websockets.next.OnClose; +import io.quarkus.websockets.next.OnOpen; +import io.quarkus.websockets.next.OnTextMessage; +import io.quarkus.websockets.next.Open; +import io.quarkus.websockets.next.WebSocket; +import io.quarkus.websockets.next.WebSocketClient; +import io.quarkus.websockets.next.WebSocketClientConnection; +import io.quarkus.websockets.next.WebSocketConnector; +import io.quarkus.websockets.next.test.utils.WSClient; + +public class ClientConnectionEventsTest { + + @RegisterExtension + public static final QuarkusUnitTest test = new QuarkusUnitTest() + .withApplicationRoot(root -> { + root.addClasses(Endpoint.class, ObservingBean.class, WSClient.class); + }); + + @TestHTTPResource("/") + URI baseUri; + + @Inject + WebSocketConnector connector; + + @Test + void testEvents() throws Exception { + // Open connection, EndpointClient sends a message with client connection id + WebSocketClientConnection connection = connector + .baseUri(baseUri) + .connectAndAwait(); + // Wait for the message + assertTrue(Endpoint.MESSAGE_LATCH.await(5, TimeUnit.SECONDS)); + // Assert the @Open event was fired + assertTrue(ObservingBean.OPEN_LATCH.await(5, TimeUnit.SECONDS)); + assertNotNull(ObservingBean.OPEN_CONN.get()); + assertEquals(connection.id(), ObservingBean.OPEN_CONN.get().id()); + assertEquals(connection.id(), Endpoint.MESSAGE.get()); + // Close the connection + connection.closeAndAwait(); + assertTrue(EndpointClient.CLOSED_LATCH.await(5, TimeUnit.SECONDS)); + // Assert the @Closed event was fired + assertTrue(ObservingBean.CLOSED_LATCH.await(5, TimeUnit.SECONDS)); + assertNotNull(ObservingBean.CLOSED_CONN.get()); + assertEquals(connection.id(), ObservingBean.CLOSED_CONN.get().id()); + } + + @WebSocket(path = "/endpoint") + public static class Endpoint { + + static final AtomicReference MESSAGE = new AtomicReference<>(); + + static final CountDownLatch MESSAGE_LATCH = new CountDownLatch(1); + + @OnTextMessage + void message(String message) { + MESSAGE.set(message); + MESSAGE_LATCH.countDown(); + } + + } + + @WebSocketClient(path = "/endpoint") + public static class EndpointClient { + + static final CountDownLatch CLOSED_LATCH = new CountDownLatch(1); + + @OnOpen + String open(WebSocketClientConnection connection) { + return connection.id(); + } + + @OnClose + void close() { + CLOSED_LATCH.countDown(); + } + + } + + @Singleton + public static class ObservingBean { + + static final CountDownLatch OPEN_LATCH = new CountDownLatch(1); + static final CountDownLatch CLOSED_LATCH = new CountDownLatch(1); + + static final AtomicReference OPEN_CONN = new AtomicReference<>(); + static final AtomicReference CLOSED_CONN = new AtomicReference<>(); + + void onOpen(@ObservesAsync @Open WebSocketClientConnection connection) { + OPEN_CONN.set(connection); + OPEN_LATCH.countDown(); + } + + void onClose(@ObservesAsync @Closed WebSocketClientConnection connection) { + CLOSED_CONN.set(connection); + CLOSED_LATCH.countDown(); + } + + } + +} diff --git a/extensions/websockets-next/runtime/src/main/java/io/quarkus/websockets/next/Closed.java b/extensions/websockets-next/runtime/src/main/java/io/quarkus/websockets/next/Closed.java index ea65c81e00bb9..5dd26d74bac36 100644 --- a/extensions/websockets-next/runtime/src/main/java/io/quarkus/websockets/next/Closed.java +++ b/extensions/websockets-next/runtime/src/main/java/io/quarkus/websockets/next/Closed.java @@ -16,7 +16,10 @@ import jakarta.inject.Qualifier; /** - * A CDI event of type {@link WebSocketConnection} with this qualifier is fired asynchronously when a connection is closed. + * This qualifier is used for CDI events fired asynchronously when a WebSocket connection is closed. + *

+ * The payload is {@link WebSocketConnection} for server connections and {@link WebSocketClientConnection} for client + * connections. * * @see ObservesAsync * @see Event#fireAsync(Object) diff --git a/extensions/websockets-next/runtime/src/main/java/io/quarkus/websockets/next/Open.java b/extensions/websockets-next/runtime/src/main/java/io/quarkus/websockets/next/Open.java index 08a17eb71f591..4af50adfd95c0 100644 --- a/extensions/websockets-next/runtime/src/main/java/io/quarkus/websockets/next/Open.java +++ b/extensions/websockets-next/runtime/src/main/java/io/quarkus/websockets/next/Open.java @@ -16,7 +16,10 @@ import jakarta.inject.Qualifier; /** - * A CDI event of type {@link WebSocketConnection} with this qualifier is fired asynchronously when a new connection is opened. + * This qualifier is used for CDI events fired asynchronously when a new WebSocket connection is opened. + *

+ * The payload is {@link WebSocketConnection} for server connections and {@link WebSocketClientConnection} for client + * connections. * * @see ObservesAsync * @see Event#fireAsync(Object) diff --git a/extensions/websockets-next/runtime/src/main/java/io/quarkus/websockets/next/runtime/ClientConnectionManager.java b/extensions/websockets-next/runtime/src/main/java/io/quarkus/websockets/next/runtime/ClientConnectionManager.java index a81eef5786d64..ba5b038e32950 100644 --- a/extensions/websockets-next/runtime/src/main/java/io/quarkus/websockets/next/runtime/ClientConnectionManager.java +++ b/extensions/websockets-next/runtime/src/main/java/io/quarkus/websockets/next/runtime/ClientConnectionManager.java @@ -9,10 +9,15 @@ import java.util.stream.Stream; import jakarta.annotation.PreDestroy; +import jakarta.enterprise.event.Event; import jakarta.inject.Singleton; import org.jboss.logging.Logger; +import io.quarkus.arc.Arc; +import io.quarkus.arc.ArcContainer; +import io.quarkus.websockets.next.Closed; +import io.quarkus.websockets.next.Open; import io.quarkus.websockets.next.OpenClientConnections; import io.quarkus.websockets.next.WebSocketClientConnection; @@ -25,6 +30,19 @@ public class ClientConnectionManager implements OpenClientConnections { private final List listeners = new CopyOnWriteArrayList<>(); + private final Event openEvent; + private final Event closedEvent; + + ClientConnectionManager(@Open Event openEvent, + @Closed Event closedEvent) { + ArcContainer container = Arc.container(); + this.openEvent = container.resolveObserverMethods(WebSocketClientConnection.class, Open.Literal.INSTANCE).isEmpty() + ? null + : openEvent; + this.closedEvent = container.resolveObserverMethods(WebSocketClientConnection.class, Closed.Literal.INSTANCE) + .isEmpty() ? null : closedEvent; + } + @Override public Iterator iterator() { return stream().iterator(); @@ -38,6 +56,9 @@ public Stream stream() { void add(String endpoint, WebSocketClientConnection connection) { LOG.debugf("Add client connection: %s", connection); if (endpointToConnections.computeIfAbsent(endpoint, e -> ConcurrentHashMap.newKeySet()).add(connection)) { + if (openEvent != null) { + openEvent.fireAsync(connection); + } if (!listeners.isEmpty()) { for (ClientConnectionListener listener : listeners) { try { @@ -56,6 +77,9 @@ void remove(String endpoint, WebSocketClientConnection connection) { Set connections = endpointToConnections.get(endpoint); if (connections != null) { if (connections.remove(connection)) { + if (closedEvent != null) { + closedEvent.fireAsync(connection); + } if (!listeners.isEmpty()) { for (ClientConnectionListener listener : listeners) { try {