Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WebSockets Next: fire CDI events for client connections added/removed #41348

Merged
merged 1 commit into from
Jun 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions docs/src/main/asciidoc/websockets-next-reference.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -623,6 +623,7 @@
There are also other convenient methods.
For example, `OpenConnections#findByEndpointId(String)` makes it easy to find connections for a specific endpoint.

[[server-cdi-events]]

Check warning on line 626 in docs/src/main/asciidoc/websockets-next-reference.adoc

View workflow job for this annotation

GitHub Actions / Linting with Vale

[vale] reported by reviewdog 🐶 [Quarkus.HeadingPunctuation] Do not use end punctuation in headings. Raw Output: {"message": "[Quarkus.HeadingPunctuation] Do not use end punctuation in headings.", "location": {"path": "docs/src/main/asciidoc/websockets-next-reference.adoc", "range": {"start": {"line": 626, "column": 12}}}, "severity": "INFO"}

Check warning on line 626 in docs/src/main/asciidoc/websockets-next-reference.adoc

View workflow job for this annotation

GitHub Actions / Linting with Vale

[vale] reported by reviewdog 🐶 [Quarkus.Headings] Use sentence-style capitalization in '6.3.2. CDI events'. Raw Output: {"message": "[Quarkus.Headings] Use sentence-style capitalization in '6.3.2. CDI events'.", "location": {"path": "docs/src/main/asciidoc/websockets-next-reference.adoc", "range": {"start": {"line": 626, "column": 12}}}, "severity": "INFO"}
==== CDI events

Quarkus fires a CDI event of type `io.quarkus.websockets.next.WebSocketConnection` with qualifier `@io.quarkus.websockets.next.Open` asynchronously when a new connection is opened.
Expand Down Expand Up @@ -938,9 +939,30 @@
There are also other convenient methods.
For example, `OpenClientConnections#findByClientId(String)` makes it easy to find connections for a specific endpoint.

[[client-cdi-events]]

Check warning on line 942 in docs/src/main/asciidoc/websockets-next-reference.adoc

View workflow job for this annotation

GitHub Actions / Linting with Vale

[vale] reported by reviewdog 🐶 [Quarkus.HeadingPunctuation] Do not use end punctuation in headings. Raw Output: {"message": "[Quarkus.HeadingPunctuation] Do not use end punctuation in headings.", "location": {"path": "docs/src/main/asciidoc/websockets-next-reference.adoc", "range": {"start": {"line": 942, "column": 14}}}, "severity": "INFO"}

Check warning on line 942 in docs/src/main/asciidoc/websockets-next-reference.adoc

View workflow job for this annotation

GitHub Actions / Linting with Vale

[vale] reported by reviewdog 🐶 [Quarkus.Headings] Use sentence-style capitalization in '7.2.2. CDI events'. Raw Output: {"message": "[Quarkus.Headings] Use sentence-style capitalization in '7.2.2. CDI events'.", "location": {"path": "docs/src/main/asciidoc/websockets-next-reference.adoc", "range": {"start": {"line": 942, "column": 14}}}, "severity": "INFO"}
==== CDI events

Quarkus fires a CDI event of type `io.quarkus.websockets.next.WebSocketClientConnection` with qualifier `@io.quarkus.websockets.next.Open` asynchronously when a new connection is opened.
Moreover, a CDI event of type `WebSocketClientConnection` with qualifier `@io.quarkus.websockets.next.Closed` is fired asynchronously when a connection is closed.

[source, java]
----
import jakarta.enterprise.event.ObservesAsync;
import io.quarkus.websockets.next.Open;
import io.quarkus.websockets.next.WebSocketClientConnection;

class MyBean {

void connectionOpened(@ObservesAsync @Open WebSocketClientConnection connection) { <1>
// This observer method is called when a connection is opened...
}
}
----
<1> An asynchronous observer method is executed using the default blocking executor service.

Check warning on line 961 in docs/src/main/asciidoc/websockets-next-reference.adoc

View workflow job for this annotation

GitHub Actions / Linting with Vale

[vale] reported by reviewdog 🐶 [Quarkus.TermsSuggestions] Depending on the context, consider using 'by using' or 'that uses' rather than 'using'. Raw Output: {"message": "[Quarkus.TermsSuggestions] Depending on the context, consider using 'by using' or 'that uses' rather than 'using'.", "location": {"path": "docs/src/main/asciidoc/websockets-next-reference.adoc", "range": {"start": {"line": 961, "column": 37}}}, "severity": "INFO"}

Check warning on line 961 in docs/src/main/asciidoc/websockets-next-reference.adoc

View workflow job for this annotation

GitHub Actions / Linting with Vale

[vale] reported by reviewdog 🐶 [Quarkus.Headings] Use sentence-style capitalization in '7.3. Configuring SSL/TLS'. Raw Output: {"message": "[Quarkus.Headings] Use sentence-style capitalization in '7.3. Configuring SSL/TLS'.", "location": {"path": "docs/src/main/asciidoc/websockets-next-reference.adoc", "range": {"start": {"line": 961, "column": 86}}}, "severity": "INFO"}

Check warning on line 961 in docs/src/main/asciidoc/websockets-next-reference.adoc

View workflow job for this annotation

GitHub Actions / Linting with Vale

[vale] reported by reviewdog 🐶 [Quarkus.HeadingPunctuation] Do not use end punctuation in headings. Raw Output: {"message": "[Quarkus.HeadingPunctuation] Do not use end punctuation in headings.", "location": {"path": "docs/src/main/asciidoc/websockets-next-reference.adoc", "range": {"start": {"line": 961, "column": 86}}}, "severity": "INFO"}

=== Configuring SSL/TLS

To establish a TLS connection, you need to configure a _named_ configuration using the xref:./tls-registry-reference.adoc[TLS registry]:

Check warning on line 965 in docs/src/main/asciidoc/websockets-next-reference.adoc

View workflow job for this annotation

GitHub Actions / Linting with Vale

[vale] reported by reviewdog 🐶 [Quarkus.Fluff] Depending on the context, consider using 'Rewrite the sentence, or use 'must', instead of' rather than 'need to'. Raw Output: {"message": "[Quarkus.Fluff] Depending on the context, consider using 'Rewrite the sentence, or use 'must', instead of' rather than 'need to'.", "location": {"path": "docs/src/main/asciidoc/websockets-next-reference.adoc", "range": {"start": {"line": 965, "column": 36}}}, "severity": "INFO"}

Check warning on line 965 in docs/src/main/asciidoc/websockets-next-reference.adoc

View workflow job for this annotation

GitHub Actions / Linting with Vale

[vale] reported by reviewdog 🐶 [Quarkus.TermsSuggestions] Depending on the context, consider using 'by using' or 'that uses' rather than 'using'. Raw Output: {"message": "[Quarkus.TermsSuggestions] Depending on the context, consider using 'by using' or 'that uses' rather than 'using'.", "location": {"path": "docs/src/main/asciidoc/websockets-next-reference.adoc", "range": {"start": {"line": 965, "column": 66}}}, "severity": "INFO"}

[source, properties]
----
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
package io.quarkus.websockets.next.test.client;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;

import java.net.URI;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

import jakarta.enterprise.event.ObservesAsync;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;

import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

import io.quarkus.test.QuarkusUnitTest;
import io.quarkus.test.common.http.TestHTTPResource;
import io.quarkus.websockets.next.Closed;
import io.quarkus.websockets.next.OnClose;
import io.quarkus.websockets.next.OnOpen;
import io.quarkus.websockets.next.OnTextMessage;
import io.quarkus.websockets.next.Open;
import io.quarkus.websockets.next.WebSocket;
import io.quarkus.websockets.next.WebSocketClient;
import io.quarkus.websockets.next.WebSocketClientConnection;
import io.quarkus.websockets.next.WebSocketConnector;
import io.quarkus.websockets.next.test.utils.WSClient;

public class ClientConnectionEventsTest {

@RegisterExtension
public static final QuarkusUnitTest test = new QuarkusUnitTest()
.withApplicationRoot(root -> {
root.addClasses(Endpoint.class, ObservingBean.class, WSClient.class);
});

@TestHTTPResource("/")
URI baseUri;

@Inject
WebSocketConnector<EndpointClient> connector;

@Test
void testEvents() throws Exception {
// Open connection, EndpointClient sends a message with client connection id
WebSocketClientConnection connection = connector
.baseUri(baseUri)
.connectAndAwait();
// Wait for the message
assertTrue(Endpoint.MESSAGE_LATCH.await(5, TimeUnit.SECONDS));
// Assert the @Open event was fired
assertTrue(ObservingBean.OPEN_LATCH.await(5, TimeUnit.SECONDS));
assertNotNull(ObservingBean.OPEN_CONN.get());
assertEquals(connection.id(), ObservingBean.OPEN_CONN.get().id());
assertEquals(connection.id(), Endpoint.MESSAGE.get());
// Close the connection
connection.closeAndAwait();
assertTrue(EndpointClient.CLOSED_LATCH.await(5, TimeUnit.SECONDS));
// Assert the @Closed event was fired
assertTrue(ObservingBean.CLOSED_LATCH.await(5, TimeUnit.SECONDS));
assertNotNull(ObservingBean.CLOSED_CONN.get());
assertEquals(connection.id(), ObservingBean.CLOSED_CONN.get().id());
}

@WebSocket(path = "/endpoint")
public static class Endpoint {

static final AtomicReference<String> MESSAGE = new AtomicReference<>();

static final CountDownLatch MESSAGE_LATCH = new CountDownLatch(1);

@OnTextMessage
void message(String message) {
MESSAGE.set(message);
MESSAGE_LATCH.countDown();
}

}

@WebSocketClient(path = "/endpoint")
public static class EndpointClient {

static final CountDownLatch CLOSED_LATCH = new CountDownLatch(1);

@OnOpen
String open(WebSocketClientConnection connection) {
return connection.id();
}

@OnClose
void close() {
CLOSED_LATCH.countDown();
}

}

@Singleton
public static class ObservingBean {

static final CountDownLatch OPEN_LATCH = new CountDownLatch(1);
static final CountDownLatch CLOSED_LATCH = new CountDownLatch(1);

static final AtomicReference<WebSocketClientConnection> OPEN_CONN = new AtomicReference<>();
static final AtomicReference<WebSocketClientConnection> CLOSED_CONN = new AtomicReference<>();

void onOpen(@ObservesAsync @Open WebSocketClientConnection connection) {
OPEN_CONN.set(connection);
OPEN_LATCH.countDown();
}

void onClose(@ObservesAsync @Closed WebSocketClientConnection connection) {
CLOSED_CONN.set(connection);
CLOSED_LATCH.countDown();
}

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,10 @@
import jakarta.inject.Qualifier;

/**
* A CDI event of type {@link WebSocketConnection} with this qualifier is fired asynchronously when a connection is closed.
* This qualifier is used for CDI events fired asynchronously when a WebSocket connection is closed.
* <p>
* The payload is {@link WebSocketConnection} for server connections and {@link WebSocketClientConnection} for client
* connections.
*
* @see ObservesAsync
* @see Event#fireAsync(Object)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,10 @@
import jakarta.inject.Qualifier;

/**
* A CDI event of type {@link WebSocketConnection} with this qualifier is fired asynchronously when a new connection is opened.
* This qualifier is used for CDI events fired asynchronously when a new WebSocket connection is opened.
* <p>
* The payload is {@link WebSocketConnection} for server connections and {@link WebSocketClientConnection} for client
* connections.
*
* @see ObservesAsync
* @see Event#fireAsync(Object)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,15 @@
import java.util.stream.Stream;

import jakarta.annotation.PreDestroy;
import jakarta.enterprise.event.Event;
import jakarta.inject.Singleton;

import org.jboss.logging.Logger;

import io.quarkus.arc.Arc;
import io.quarkus.arc.ArcContainer;
import io.quarkus.websockets.next.Closed;
import io.quarkus.websockets.next.Open;
import io.quarkus.websockets.next.OpenClientConnections;
import io.quarkus.websockets.next.WebSocketClientConnection;

Expand All @@ -25,6 +30,19 @@ public class ClientConnectionManager implements OpenClientConnections {

private final List<ClientConnectionListener> listeners = new CopyOnWriteArrayList<>();

private final Event<WebSocketClientConnection> openEvent;
private final Event<WebSocketClientConnection> closedEvent;

ClientConnectionManager(@Open Event<WebSocketClientConnection> openEvent,
@Closed Event<WebSocketClientConnection> closedEvent) {
ArcContainer container = Arc.container();
this.openEvent = container.resolveObserverMethods(WebSocketClientConnection.class, Open.Literal.INSTANCE).isEmpty()
? null
: openEvent;
this.closedEvent = container.resolveObserverMethods(WebSocketClientConnection.class, Closed.Literal.INSTANCE)
.isEmpty() ? null : closedEvent;
}

@Override
public Iterator<WebSocketClientConnection> iterator() {
return stream().iterator();
Expand All @@ -38,6 +56,9 @@ public Stream<WebSocketClientConnection> stream() {
void add(String endpoint, WebSocketClientConnection connection) {
LOG.debugf("Add client connection: %s", connection);
if (endpointToConnections.computeIfAbsent(endpoint, e -> ConcurrentHashMap.newKeySet()).add(connection)) {
if (openEvent != null) {
openEvent.fireAsync(connection);
}
if (!listeners.isEmpty()) {
for (ClientConnectionListener listener : listeners) {
try {
Expand All @@ -56,6 +77,9 @@ void remove(String endpoint, WebSocketClientConnection connection) {
Set<WebSocketClientConnection> connections = endpointToConnections.get(endpoint);
if (connections != null) {
if (connections.remove(connection)) {
if (closedEvent != null) {
closedEvent.fireAsync(connection);
}
if (!listeners.isEmpty()) {
for (ClientConnectionListener listener : listeners) {
try {
Expand Down