Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WebSockets Next: get rid of UniHelper#toUni() #41467

Merged
merged 1 commit into from
Jun 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ void testError() throws InterruptedException {
.connectAndAwait();
connection.sendTextAndAwait("foo");
assertFalse(connection.isClosed());
connection.sendText("bar");
connection.sendTextAndAwait("bar");
assertTrue(ClientMessageErrorEndpoint.MESSAGE_LATCH.await(5, TimeUnit.SECONDS));
assertEquals("bar", ClientMessageErrorEndpoint.MESSAGES.get(0));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import io.quarkus.websockets.next.WebSocketClientException;
import io.quarkus.websockets.next.WebSocketsClientRuntimeConfig;
import io.smallrye.mutiny.Uni;
import io.smallrye.mutiny.vertx.UniHelper;
import io.vertx.core.Context;
import io.vertx.core.Handler;
import io.vertx.core.Vertx;
Expand Down Expand Up @@ -144,7 +143,7 @@ public Uni<WebSocketClientConnection> connect() {
throw new WebSocketClientException(e);
}

return UniHelper.toUni(client.connect(connectOptions))
return Uni.createFrom().completionStage(() -> client.connect(connectOptions).toCompletionStage())
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we replace the lambda with an anonymous class?

Copy link
Contributor

@geoand geoand Jun 26, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Always :)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm sorry but no. The reactive code is difficult to read even with lambdas. Also we use lambdas everywhere in this extension. How do you determine where to use lambda and where an anonymous class? I know that we used to avoid lambdas where possible due to some memory overhead but (1) I'd like to know if it's still the case for new JDKs and (2) I don't want to spend too much time on performance until we have some reasonable benchmarks.

Copy link
Contributor

@geoand geoand Jun 26, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

-1 for everyrhing mentioned above

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

-1 for everyrhing mentioned above

Any arguments?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The argument is simple: It's been a (loose) policy all along, so if we want to change it, we should do the bare minimum of trying to figure out whether it makes sense to or not, not just put forth generic arguments that could be applied to almost any situation.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The argument is simple: It's been a (loose) policy all along...

Yes, a very "loose" policy and I personally viloated this policy many times when dealing with async/reactive code ;-)

we should do the bare minimum of trying to figure out whether it makes sense to or not, not just put forth generic arguments that could be applied to almost any situation.

Readability/maintenance cost is IMO not a generic argument but I agree that we should verify whether the assumptions for this policy still apply.

@franz1981 do you happen to know if the assumption that lambdas bring significant performance overhead (memory, cpu, allocation, etc.) compared to annonymous classes still holds true?

I can try to prepare a simple (and naive) benchmark to verify this assumption...

@geoand I will merge this PR because we already have a bunch of lambdas in this extension but I've added a note to the #39148. If we decide to get rid of the lambdas (based on our findings) then it should be done globally (in the scope of the extension).

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fair enough

.map(ws -> {
String clientId = BasicWebSocketConnector.class.getName();
TrafficLogger trafficLogger = TrafficLogger.forClient(config);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
import io.quarkus.websockets.next.HandshakeRequest;
import io.quarkus.websockets.next.WebSocketConnection.BroadcastSender;
import io.smallrye.mutiny.Uni;
import io.smallrye.mutiny.vertx.UniHelper;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.buffer.impl.BufferImpl;
import io.vertx.core.http.WebSocketBase;
Expand Down Expand Up @@ -55,14 +54,14 @@ public String pathParam(String name) {
}

public Uni<Void> sendText(String message) {
Uni<Void> uni = UniHelper.toUni(webSocket().writeTextMessage(message));
Uni<Void> uni = Uni.createFrom().completionStage(() -> webSocket().writeTextMessage(message).toCompletionStage());
return trafficLogger == null ? uni : uni.invoke(() -> {
trafficLogger.textMessageSent(this, message);
});
}

public Uni<Void> sendBinary(Buffer message) {
Uni<Void> uni = UniHelper.toUni(webSocket().writeBinaryMessage(message));
Uni<Void> uni = Uni.createFrom().completionStage(() -> webSocket().writeBinaryMessage(message).toCompletionStage());
return trafficLogger == null ? uni : uni.invoke(() -> trafficLogger.binaryMessageSent(this, message));
}

Expand All @@ -81,7 +80,7 @@ public <M> Uni<Void> sendText(M message) {
}

public Uni<Void> sendPing(Buffer data) {
return UniHelper.toUni(webSocket().writePing(data));
return Uni.createFrom().completionStage(() -> webSocket().writePing(data).toCompletionStage());
}

void sendAutoPing() {
Expand All @@ -93,7 +92,7 @@ void sendAutoPing() {
}

public Uni<Void> sendPong(Buffer data) {
return UniHelper.toUni(webSocket().writePong(data));
return Uni.createFrom().completionStage(() -> webSocket().writePong(data).toCompletionStage());
}

public Uni<Void> close() {
Expand All @@ -105,7 +104,8 @@ public Uni<Void> close(CloseReason reason) {
LOG.warnf("Connection already closed: %s", this);
return Uni.createFrom().voidItem();
}
return UniHelper.toUni(webSocket().close((short) reason.getCode(), reason.getMessage()));
return Uni.createFrom()
.completionStage(() -> webSocket().close((short) reason.getCode(), reason.getMessage()).toCompletionStage());
}

public boolean isSecure() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import io.quarkus.websockets.next.runtime.WebSocketClientRecorder.ClientEndpoint;
import io.quarkus.websockets.next.runtime.WebSocketClientRecorder.ClientEndpointsContext;
import io.smallrye.mutiny.Uni;
import io.smallrye.mutiny.vertx.UniHelper;
import io.vertx.core.Vertx;
import io.vertx.core.http.WebSocketClient;
import io.vertx.core.http.WebSocketConnectOptions;
Expand Down Expand Up @@ -92,7 +91,7 @@ public Uni<WebSocketClientConnection> connect() {
}
subprotocols.forEach(connectOptions::addSubProtocol);

return UniHelper.toUni(client.connect(connectOptions))
return Uni.createFrom().completionStage(() -> client.connect(connectOptions).toCompletionStage())
.map(ws -> {
TrafficLogger trafficLogger = TrafficLogger.forClient(config);
WebSocketClientConnectionImpl connection = new WebSocketClientConnectionImpl(clientEndpoint.clientId, ws,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
import io.quarkus.websockets.next.runtime.ConcurrencyLimiter.PromiseComplete;
import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;
import io.smallrye.mutiny.vertx.UniHelper;
import io.vertx.core.Context;
import io.vertx.core.Future;
import io.vertx.core.Handler;
Expand Down Expand Up @@ -256,7 +255,7 @@ public void handle(Void event) {
}
}
});
return UniHelper.toUni(promise.future());
return Uni.createFrom().completionStage(() -> promise.future().toCompletionStage());
}

public Object beanInstance() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;
import io.smallrye.mutiny.operators.multi.processors.BroadcastProcessor;
import io.smallrye.mutiny.vertx.UniHelper;
import io.vertx.core.Vertx;
import io.vertx.core.http.WebSocket;
import io.vertx.core.http.WebSocketClient;
Expand Down Expand Up @@ -110,12 +109,13 @@ public Uni<JsonObject> openDevConnection(String path, String endpointPath) {
}
WebSocketClient client = vertx.createWebSocketClient();
String connectionKey = UUID.randomUUID().toString();
Uni<WebSocket> uni = UniHelper.toUni(client
Uni<WebSocket> uni = Uni.createFrom().completionStage(() -> client
.connect(new WebSocketConnectOptions()
.setPort(httpConfig.port)
.setHost(httpConfig.host)
.setURI(path)
.addHeader(DEVUI_SOCKET_KEY_HEADER, connectionKey)));
.addHeader(DEVUI_SOCKET_KEY_HEADER, connectionKey))
.toCompletionStage());
return uni.onItem().transform(s -> {
LOG.debugf("Opened Dev UI connection with key %s to %s", connectionKey, path);
List<TextMessage> messages = new ArrayList<>();
Expand Down Expand Up @@ -181,7 +181,7 @@ private static String normalize(String path) {
public Uni<JsonObject> closeDevConnection(String connectionKey) {
DevWebSocket socket = sockets.remove(connectionKey);
if (socket != null) {
Uni<Void> uni = UniHelper.toUni(socket.socket.close());
Uni<Void> uni = Uni.createFrom().completionStage(() -> socket.socket.close().toCompletionStage());
return uni.onItem().transform(v -> {
LOG.debugf("Closed Dev UI connection with key %s", connectionKey);
return new JsonObject().put("success", true);
Expand All @@ -196,7 +196,7 @@ public Uni<JsonObject> closeDevConnection(String connectionKey) {
public Uni<JsonObject> sendTextMessage(String connectionKey, String message) {
DevWebSocket socket = sockets.get(connectionKey);
if (socket != null) {
Uni<Void> uni = UniHelper.toUni(socket.socket.writeTextMessage(message));
Uni<Void> uni = Uni.createFrom().completionStage(() -> socket.socket.writeTextMessage(message).toCompletionStage());
return uni.onItem().transform(v -> {
List<TextMessage> messages = socket.messages;
synchronized (messages) {
Expand Down