Skip to content

Commit

Permalink
WebSockets Next: add limit of messages kept for a Dev UI connection
Browse files Browse the repository at this point in the history
- otherwise the connection can consume all memory and the UI may become
unresponsive, e.g. if an endpoint sends a new message every millisecond
  • Loading branch information
mkouba committed Jun 25, 2024
1 parent f5bc656 commit 2536d52
Show file tree
Hide file tree
Showing 3 changed files with 67 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ export class QwcWebSocketNextEndpoints extends LitElement {
cursor: pointer;
}
.top-bar {
display: flex;
align-items: baseline;
gap: 20px;
padding-left: 20px;
Expand Down Expand Up @@ -62,7 +61,8 @@ export class QwcWebSocketNextEndpoints extends LitElement {
_selectedEndpoint: {state: true},
_selectedConnection: {state: true},
_endpointsAndConnections: {state: true},
_textMessages: {state: true}
_textMessages: {state: true},
_connectionMessagesLimit: {state: false}
};

constructor() {
Expand All @@ -83,6 +83,7 @@ export class QwcWebSocketNextEndpoints extends LitElement {
e.connections = jsonResponse.result[e.generatedClazz];
return e;
});
this._connectionMessagesLimit = jsonResponse.result.connectionMessageLimit;
})
.then(() => {
this._conntectionStatusStream = this.jsonRpc.connectionStatus().onNext(jsonResponse => {
Expand Down Expand Up @@ -219,7 +220,7 @@ export class QwcWebSocketNextEndpoints extends LitElement {
<vaadin-icon icon="font-awesome-solid:caret-left" slot="prefix"></vaadin-icon>
Back
</vaadin-button>
<h4>${this._selectedEndpoint.clazz} · Open Connections</h4>
<h4>Open connections for endpoint: <code>${this._selectedEndpoint.clazz}</code></h4>
</div>`;
}

Expand All @@ -234,11 +235,15 @@ export class QwcWebSocketNextEndpoints extends LitElement {
<vaadin-icon icon="font-awesome-solid:xmark" slot="prefix"></vaadin-icon>
Close connection
</vaadin-button>
<vaadin-button disabled>
Connection messages limit: ${this._connectionMessagesLimit}
</vaadin-button>
<vaadin-button @click="${this._clearMessages}">
<vaadin-icon icon="font-awesome-solid:trash" slot="prefix"></vaadin-icon>
Clear messages
</vaadin-button>
<h4>${this._selectedEndpoint.clazz} · Dev UI Connection · <code>${this._selectedConnection.handshakePath}</code></h4>
<h4>Connection: <code>${this._selectedConnection.id}</code></h4>
<h3>Endpoint: <code>${this._selectedEndpoint.clazz}</code> &nbsp;|&nbsp; Handshake path: <code>${this._selectedConnection.handshakePath}</code></h3>
</div>`;
}

Expand Down Expand Up @@ -286,7 +291,7 @@ export class QwcWebSocketNextEndpoints extends LitElement {

_renderClazz(endpoint) {
return html`
<strong>${endpoint.clazz}</strong>
<strong><code>${endpoint.clazz}</code></strong>
`;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,11 @@ public interface WebSocketsServerRuntimeConfig {
*/
Security security();

/**
* Dev mode configuration.
*/
DevMode devMode();

/**
* Traffic logging config.
*/
Expand All @@ -75,4 +80,15 @@ interface Security {

}

interface DevMode {

/**
* The limit of messages kept for a Dev UI connection. If less than zero then no messages are stored and sent to the Dev
* UI view.
*/
@WithDefault("1000")
long connectionMessagesLimit();

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.ListIterator;
Expand All @@ -18,6 +17,7 @@

import io.quarkus.vertx.http.runtime.HttpConfiguration;
import io.quarkus.websockets.next.WebSocketConnection;
import io.quarkus.websockets.next.WebSocketsServerRuntimeConfig;
import io.quarkus.websockets.next.runtime.ConnectionManager;
import io.quarkus.websockets.next.runtime.ConnectionManager.ConnectionListener;
import io.smallrye.mutiny.Multi;
Expand Down Expand Up @@ -51,12 +51,16 @@ public class WebSocketNextJsonRPCService implements ConnectionListener {

private final HttpConfiguration httpConfig;

WebSocketNextJsonRPCService(ConnectionManager connectionManager, Vertx vertx, HttpConfiguration httpConfig) {
private final WebSocketsServerRuntimeConfig.DevMode devModeConfig;

WebSocketNextJsonRPCService(ConnectionManager connectionManager, Vertx vertx, HttpConfiguration httpConfig,
WebSocketsServerRuntimeConfig config) {
this.connectionStatus = BroadcastProcessor.create();
this.connectionMessages = BroadcastProcessor.create();
this.connectionManager = connectionManager;
this.vertx = vertx;
this.httpConfig = httpConfig;
this.devModeConfig = config.devMode();
this.sockets = new ConcurrentHashMap<>();
connectionManager.addListener(this);
}
Expand All @@ -80,15 +84,17 @@ public JsonObject getConnections(List<String> endpoints) {
}
json.put(endpoint, array);
}
json.put("connectionMessagesLimit", devModeConfig.connectionMessagesLimit());
return json;
}

public JsonArray getMessages(String connectionKey) {
DevWebSocket socket = sockets.get(connectionKey);
if (socket != null) {
JsonArray ret = new JsonArray();
synchronized (socket.messages) {
for (ListIterator<TextMessage> it = socket.messages.listIterator(socket.messages.size()); it.hasPrevious();) {
List<TextMessage> messages = socket.messages;
synchronized (messages) {
for (ListIterator<TextMessage> it = messages.listIterator(messages.size()); it.hasPrevious();) {
ret.add(it.previous().toJsonObject());
}
}
Expand All @@ -112,13 +118,18 @@ public Uni<JsonObject> openDevConnection(String path, String endpointPath) {
.addHeader(DEVUI_SOCKET_KEY_HEADER, connectionKey)));
return uni.onItem().transform(s -> {
LOG.debugf("Opened Dev UI connection with key %s to %s", connectionKey, path);
List<TextMessage> messages = Collections.synchronizedList(new ArrayList<>());
List<TextMessage> messages = new ArrayList<>();
s.textMessageHandler(m -> {
TextMessage t = new TextMessage(true, m, LocalDateTime.now());
messages.add(t);
connectionMessages
.onNext(t.toJsonObject()
.put("key", connectionKey));
synchronized (messages) {
if (messages.size() < devModeConfig.connectionMessagesLimit()) {
TextMessage t = new TextMessage(true, m, LocalDateTime.now());
messages.add(t);
connectionMessages.onNext(t.toJsonObject().put("key", connectionKey));
} else {
LOG.debugf("Opened Dev UI connection [%s] received a message but the limit [%s] has been reached",
connectionKey, devModeConfig.connectionMessagesLimit());
}
}
});
sockets.put(connectionKey, new DevWebSocket(s, messages));
return new JsonObject().put("success", true).put("key", connectionKey);
Expand Down Expand Up @@ -187,12 +198,18 @@ public Uni<JsonObject> sendTextMessage(String connectionKey, String message) {
if (socket != null) {
Uni<Void> uni = UniHelper.toUni(socket.socket.writeTextMessage(message));
return uni.onItem().transform(v -> {
LOG.debugf("Sent text message to connection with key %s", connectionKey);
TextMessage t = new TextMessage(false, message, LocalDateTime.now());
socket.messages.add(t);
connectionMessages
.onNext(t.toJsonObject()
.put("key", connectionKey));
List<TextMessage> messages = socket.messages;
synchronized (messages) {
if (messages.size() < devModeConfig.connectionMessagesLimit()) {
TextMessage t = new TextMessage(false, message, LocalDateTime.now());
messages.add(t);
connectionMessages.onNext(t.toJsonObject().put("key", connectionKey));
LOG.debugf("Sent text message to connection with key %s", connectionKey);
} else {
LOG.debugf("Sent text message to connection [%s] but the limit [%s] has been reached",
connectionKey, devModeConfig.connectionMessagesLimit());
}
}
return new JsonObject().put("success", true);
}).onFailure().recoverWithItem(t -> {
LOG.errorf(t, "Unable to send text message to connection with key %s", connectionKey);
Expand All @@ -205,7 +222,7 @@ public Uni<JsonObject> sendTextMessage(String connectionKey, String message) {
public JsonObject clearMessages(String connectionKey) {
DevWebSocket socket = sockets.get(connectionKey);
if (socket != null) {
socket.messages.clear();
socket.clearMessages();
return new JsonObject().put("success", true);
}
return new JsonObject().put("success", false);
Expand Down Expand Up @@ -240,6 +257,12 @@ JsonObject toJsonObject(String endpoint, WebSocketConnection c) {
}

record DevWebSocket(WebSocket socket, List<TextMessage> messages) {

void clearMessages() {
synchronized (messages) {
messages.clear();
}
}
}

record TextMessage(boolean incoming, String text, LocalDateTime timestamp) {
Expand All @@ -252,6 +275,7 @@ JsonObject toJsonObject() {
.put("className", incoming ? "incoming" : "outgoing")
.put("userAbbr", incoming ? "IN" : "OUT");
}

}

}

0 comments on commit 2536d52

Please sign in to comment.