From 2536d52c511384adeedd76a2c821cc80712d9ded Mon Sep 17 00:00:00 2001 From: Martin Kouba Date: Tue, 25 Jun 2024 17:14:43 +0200 Subject: [PATCH] WebSockets Next: add limit of messages kept for a Dev UI connection - otherwise the connection can consume all memory and the UI may become unresponsive, e.g. if an endpoint sends a new message every millisecond --- .../resources/dev-ui/qwc-wsn-endpoints.js | 15 +++-- .../next/WebSocketsServerRuntimeConfig.java | 16 +++++ .../devui/WebSocketNextJsonRPCService.java | 58 +++++++++++++------ 3 files changed, 67 insertions(+), 22 deletions(-) diff --git a/extensions/websockets-next/deployment/src/main/resources/dev-ui/qwc-wsn-endpoints.js b/extensions/websockets-next/deployment/src/main/resources/dev-ui/qwc-wsn-endpoints.js index 1b6d808339c9f..b18e57b0665d1 100644 --- a/extensions/websockets-next/deployment/src/main/resources/dev-ui/qwc-wsn-endpoints.js +++ b/extensions/websockets-next/deployment/src/main/resources/dev-ui/qwc-wsn-endpoints.js @@ -32,7 +32,6 @@ export class QwcWebSocketNextEndpoints extends LitElement { cursor: pointer; } .top-bar { - display: flex; align-items: baseline; gap: 20px; padding-left: 20px; @@ -62,7 +61,8 @@ export class QwcWebSocketNextEndpoints extends LitElement { _selectedEndpoint: {state: true}, _selectedConnection: {state: true}, _endpointsAndConnections: {state: true}, - _textMessages: {state: true} + _textMessages: {state: true}, + _connectionMessagesLimit: {state: false} }; constructor() { @@ -83,6 +83,7 @@ export class QwcWebSocketNextEndpoints extends LitElement { e.connections = jsonResponse.result[e.generatedClazz]; return e; }); + this._connectionMessagesLimit = jsonResponse.result.connectionMessageLimit; }) .then(() => { this._conntectionStatusStream = this.jsonRpc.connectionStatus().onNext(jsonResponse => { @@ -219,7 +220,7 @@ export class QwcWebSocketNextEndpoints extends LitElement { Back -

${this._selectedEndpoint.clazz} · Open Connections

+

Open connections for endpoint: ${this._selectedEndpoint.clazz}

`; } @@ -234,11 +235,15 @@ export class QwcWebSocketNextEndpoints extends LitElement { Close connection + + Connection messages limit: ${this._connectionMessagesLimit} + Clear messages -

${this._selectedEndpoint.clazz} · Dev UI Connection · ${this._selectedConnection.handshakePath}

+

Connection: ${this._selectedConnection.id}

+

Endpoint: ${this._selectedEndpoint.clazz}  |  Handshake path: ${this._selectedConnection.handshakePath}

`; } @@ -286,7 +291,7 @@ export class QwcWebSocketNextEndpoints extends LitElement { _renderClazz(endpoint) { return html` - ${endpoint.clazz} + ${endpoint.clazz} `; } diff --git a/extensions/websockets-next/runtime/src/main/java/io/quarkus/websockets/next/WebSocketsServerRuntimeConfig.java b/extensions/websockets-next/runtime/src/main/java/io/quarkus/websockets/next/WebSocketsServerRuntimeConfig.java index 3d4b71a427dd0..650067a60aa41 100644 --- a/extensions/websockets-next/runtime/src/main/java/io/quarkus/websockets/next/WebSocketsServerRuntimeConfig.java +++ b/extensions/websockets-next/runtime/src/main/java/io/quarkus/websockets/next/WebSocketsServerRuntimeConfig.java @@ -59,6 +59,11 @@ public interface WebSocketsServerRuntimeConfig { */ Security security(); + /** + * Dev mode configuration. + */ + DevMode devMode(); + /** * Traffic logging config. */ @@ -75,4 +80,15 @@ interface Security { } + interface DevMode { + + /** + * The limit of messages kept for a Dev UI connection. If less than zero then no messages are stored and sent to the Dev + * UI view. + */ + @WithDefault("1000") + long connectionMessagesLimit(); + + } + } diff --git a/extensions/websockets-next/runtime/src/main/java/io/quarkus/websockets/next/runtime/devui/WebSocketNextJsonRPCService.java b/extensions/websockets-next/runtime/src/main/java/io/quarkus/websockets/next/runtime/devui/WebSocketNextJsonRPCService.java index 810da7a9568fa..5df2e1d395b28 100644 --- a/extensions/websockets-next/runtime/src/main/java/io/quarkus/websockets/next/runtime/devui/WebSocketNextJsonRPCService.java +++ b/extensions/websockets-next/runtime/src/main/java/io/quarkus/websockets/next/runtime/devui/WebSocketNextJsonRPCService.java @@ -4,7 +4,6 @@ import java.time.ZoneId; import java.time.format.DateTimeFormatter; import java.util.ArrayList; -import java.util.Collections; import java.util.Comparator; import java.util.List; import java.util.ListIterator; @@ -18,6 +17,7 @@ import io.quarkus.vertx.http.runtime.HttpConfiguration; import io.quarkus.websockets.next.WebSocketConnection; +import io.quarkus.websockets.next.WebSocketsServerRuntimeConfig; import io.quarkus.websockets.next.runtime.ConnectionManager; import io.quarkus.websockets.next.runtime.ConnectionManager.ConnectionListener; import io.smallrye.mutiny.Multi; @@ -51,12 +51,16 @@ public class WebSocketNextJsonRPCService implements ConnectionListener { private final HttpConfiguration httpConfig; - WebSocketNextJsonRPCService(ConnectionManager connectionManager, Vertx vertx, HttpConfiguration httpConfig) { + private final WebSocketsServerRuntimeConfig.DevMode devModeConfig; + + WebSocketNextJsonRPCService(ConnectionManager connectionManager, Vertx vertx, HttpConfiguration httpConfig, + WebSocketsServerRuntimeConfig config) { this.connectionStatus = BroadcastProcessor.create(); this.connectionMessages = BroadcastProcessor.create(); this.connectionManager = connectionManager; this.vertx = vertx; this.httpConfig = httpConfig; + this.devModeConfig = config.devMode(); this.sockets = new ConcurrentHashMap<>(); connectionManager.addListener(this); } @@ -80,6 +84,7 @@ public JsonObject getConnections(List endpoints) { } json.put(endpoint, array); } + json.put("connectionMessagesLimit", devModeConfig.connectionMessagesLimit()); return json; } @@ -87,8 +92,9 @@ public JsonArray getMessages(String connectionKey) { DevWebSocket socket = sockets.get(connectionKey); if (socket != null) { JsonArray ret = new JsonArray(); - synchronized (socket.messages) { - for (ListIterator it = socket.messages.listIterator(socket.messages.size()); it.hasPrevious();) { + List messages = socket.messages; + synchronized (messages) { + for (ListIterator it = messages.listIterator(messages.size()); it.hasPrevious();) { ret.add(it.previous().toJsonObject()); } } @@ -112,13 +118,18 @@ public Uni openDevConnection(String path, String endpointPath) { .addHeader(DEVUI_SOCKET_KEY_HEADER, connectionKey))); return uni.onItem().transform(s -> { LOG.debugf("Opened Dev UI connection with key %s to %s", connectionKey, path); - List messages = Collections.synchronizedList(new ArrayList<>()); + List messages = new ArrayList<>(); s.textMessageHandler(m -> { - TextMessage t = new TextMessage(true, m, LocalDateTime.now()); - messages.add(t); - connectionMessages - .onNext(t.toJsonObject() - .put("key", connectionKey)); + synchronized (messages) { + if (messages.size() < devModeConfig.connectionMessagesLimit()) { + TextMessage t = new TextMessage(true, m, LocalDateTime.now()); + messages.add(t); + connectionMessages.onNext(t.toJsonObject().put("key", connectionKey)); + } else { + LOG.debugf("Opened Dev UI connection [%s] received a message but the limit [%s] has been reached", + connectionKey, devModeConfig.connectionMessagesLimit()); + } + } }); sockets.put(connectionKey, new DevWebSocket(s, messages)); return new JsonObject().put("success", true).put("key", connectionKey); @@ -187,12 +198,18 @@ public Uni sendTextMessage(String connectionKey, String message) { if (socket != null) { Uni uni = UniHelper.toUni(socket.socket.writeTextMessage(message)); return uni.onItem().transform(v -> { - LOG.debugf("Sent text message to connection with key %s", connectionKey); - TextMessage t = new TextMessage(false, message, LocalDateTime.now()); - socket.messages.add(t); - connectionMessages - .onNext(t.toJsonObject() - .put("key", connectionKey)); + List messages = socket.messages; + synchronized (messages) { + if (messages.size() < devModeConfig.connectionMessagesLimit()) { + TextMessage t = new TextMessage(false, message, LocalDateTime.now()); + messages.add(t); + connectionMessages.onNext(t.toJsonObject().put("key", connectionKey)); + LOG.debugf("Sent text message to connection with key %s", connectionKey); + } else { + LOG.debugf("Sent text message to connection [%s] but the limit [%s] has been reached", + connectionKey, devModeConfig.connectionMessagesLimit()); + } + } return new JsonObject().put("success", true); }).onFailure().recoverWithItem(t -> { LOG.errorf(t, "Unable to send text message to connection with key %s", connectionKey); @@ -205,7 +222,7 @@ public Uni sendTextMessage(String connectionKey, String message) { public JsonObject clearMessages(String connectionKey) { DevWebSocket socket = sockets.get(connectionKey); if (socket != null) { - socket.messages.clear(); + socket.clearMessages(); return new JsonObject().put("success", true); } return new JsonObject().put("success", false); @@ -240,6 +257,12 @@ JsonObject toJsonObject(String endpoint, WebSocketConnection c) { } record DevWebSocket(WebSocket socket, List messages) { + + void clearMessages() { + synchronized (messages) { + messages.clear(); + } + } } record TextMessage(boolean incoming, String text, LocalDateTime timestamp) { @@ -252,6 +275,7 @@ JsonObject toJsonObject() { .put("className", incoming ? "incoming" : "outgoing") .put("userAbbr", incoming ? "IN" : "OUT"); } + } }