Skip to content

Commit

Permalink
WebSockets Next: configuration updates
Browse files Browse the repository at this point in the history
- remove unimplemented timeout
- add compression support/level
- add max message size
- also invoke error handlers for connection exceptions
- resolves quarkusio#39590
  • Loading branch information
mkouba committed Apr 18, 2024
1 parent 572f286 commit 6a1d47a
Show file tree
Hide file tree
Showing 5 changed files with 111 additions and 9 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
package io.quarkus.websockets.next.test.maxmessagesize;

import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;

import java.net.URI;
import java.util.concurrent.atomic.AtomicBoolean;

import jakarta.inject.Inject;

import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

import io.quarkus.test.QuarkusUnitTest;
import io.quarkus.test.common.http.TestHTTPResource;
import io.quarkus.websockets.next.OnError;
import io.quarkus.websockets.next.OnTextMessage;
import io.quarkus.websockets.next.WebSocket;
import io.quarkus.websockets.next.test.utils.WSClient;
import io.vertx.core.Vertx;

public class MaxMessageSizeTest {

@RegisterExtension
public static final QuarkusUnitTest test = new QuarkusUnitTest()
.withApplicationRoot(root -> {
root.addClasses(Echo.class, WSClient.class);
}).overrideConfigKey("quarkus.websockets-next.max-message-size", "10");

@Inject
Vertx vertx;

@TestHTTPResource("/echo")
URI echoUri;

@Test
void testMaxMessageSize() {
WSClient client = WSClient.create(vertx).connect(echoUri);
String msg = "foo".repeat(10);
String reply = client.sendAndAwaitReply(msg).toString();
assertNotEquals(msg, reply);
assertTrue(Echo.ISE_THROWN.get());
}

@WebSocket(path = "/echo")
public static class Echo {

static final AtomicBoolean ISE_THROWN = new AtomicBoolean();

@OnTextMessage
String process(String message) {
return message;
}

@OnError
String onError(IllegalStateException ise) {
ISE_THROWN.set(true);
return ise.getMessage();
}

}

}
Original file line number Diff line number Diff line change
@@ -1,29 +1,42 @@
package io.quarkus.websockets.next;

import java.time.Duration;
import java.util.List;
import java.util.Optional;
import java.util.OptionalInt;

import io.quarkus.runtime.annotations.ConfigPhase;
import io.quarkus.runtime.annotations.ConfigRoot;
import io.smallrye.config.ConfigMapping;
import io.smallrye.config.WithDefault;
import io.vertx.core.http.HttpServerOptions;

@ConfigMapping(prefix = "quarkus.websockets-next")
@ConfigRoot(phase = ConfigPhase.RUN_TIME)
public interface WebSocketsRuntimeConfig {

/**
* See <a href="https://datatracker.ietf.org/doc/html/rfc6455#page-12">The WebSocket Protocol</a>
*
* @return the supported subprotocols
*/
Optional<List<String>> supportedSubprotocols();

/**
* TODO Not implemented yet.
*
* The default timeout to complete processing of a message.
* Compression Extensions for WebSocket are supported by default.
* <p>
* See also <a href="https://datatracker.ietf.org/doc/html/rfc7692">RFC 7692</a>
*/
Optional<Duration> timeout();
@WithDefault("true")
boolean perMessageCompressionSupported();

/**
* The compression level must be a value between 0 and 9. The default value is
* {@value HttpServerOptions#DEFAULT_WEBSOCKET_COMPRESSION_LEVEL}.
*/
OptionalInt compressionLevel();

/**
* The maximum size of a message in bytes. The default values is
* {@value HttpServerOptions#DEFAULT_MAX_WEBSOCKET_MESSAGE_SIZE}.
*/
OptionalInt maxMessageSize();

}
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ void endSession() {
}

ContextState currentRequestContextState() {
return requestContext.getState();
return requestContext.getStateIfActive();
}

static Context createNewDuplicatedContext(Context context, WebSocketConnection connection) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,23 @@ public class WebSocketHttpServerOptionsCustomizer implements HttpServerOptionsCu

@Override
public void customizeHttpServer(HttpServerOptions options) {
config.supportedSubprotocols().orElse(List.of()).forEach(options::addWebSocketSubProtocol);
customize(options);
}

@Override
public void customizeHttpsServer(HttpServerOptions options) {
customize(options);
}

private void customize(HttpServerOptions options) {
config.supportedSubprotocols().orElse(List.of()).forEach(options::addWebSocketSubProtocol);
options.setPerMessageWebSocketCompressionSupported(config.perMessageCompressionSupported());
if (config.compressionLevel().isPresent()) {
options.setWebSocketCompressionLevel(config.compressionLevel().getAsInt());
}
if (config.maxMessageSize().isPresent()) {
options.setMaxWebSocketMessageSize(config.maxMessageSize().getAsInt());
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,21 @@ public void handle(Void event) {
});
}
});

ws.exceptionHandler(new Handler<Throwable>() {
@Override
public void handle(Throwable t) {
ContextSupport.createNewDuplicatedContext(context, connection).runOnContext(new Handler<Void>() {
@Override
public void handle(Void event) {
endpoint.doOnError(t).subscribe().with(
v -> LOG.debugf("Error [%s] processed: %s", t.getClass(), connection),
t -> LOG.errorf(t, "Unhandled error occured: %s", t.toString(),
connection));
}
});
}
});
});
}
};
Expand Down

0 comments on commit 6a1d47a

Please sign in to comment.