Skip to content

Commit

Permalink
WebSockets Next: add support for Kotlin suspend functions
Browse files Browse the repository at this point in the history
Kotlin `suspend` functions are treated like Java methods that return `Uni`.
That is, they are considered non-blocking. The implementation uses CDI
method invokers (to avoid custom bytecode generation), which actually
convert the `suspend` function result into a `Uni` under the hood.

With this commit, only single-shot `suspend` functions are supported;
`suspend` functions returning `Flow` are not supported yet.
  • Loading branch information
Ladicek committed Jul 10, 2024
1 parent 389f694 commit 2517290
Show file tree
Hide file tree
Showing 25 changed files with 941 additions and 79 deletions.
5 changes: 5 additions & 0 deletions bom/application/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -2166,6 +2166,11 @@
<artifactId>quarkus-websockets-next-deployment</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-websockets-next-kotlin</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-undertow-spi</artifactId>
Expand Down
3 changes: 3 additions & 0 deletions docs/src/main/asciidoc/kotlin.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -504,6 +504,9 @@ The following extensions provide support for Kotlin Coroutines by allowing the u
|`quarkus-vertx`
|Support is provided for `@ConsumeEvent` methods

|`quarkus-websockets-next`
|Support is provided for server-side and client-side endpoint methods

|===

=== Kotlin coroutines and Mutiny
Expand Down
28 changes: 17 additions & 11 deletions docs/src/main/asciidoc/websockets-next-reference.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ The session context remains active until the `@OnClose` method completes executi
In cases where a WebSocket endpoint does not declare an `@OnOpen` method, the session context is still created.
It remains active until the connection terminates, regardless of the presence of an `@OnClose` method.

Methods annotated with `@OnTextMessage,` `@OnBinaryMessage,` `@OnOpen`, and `@OnClose` also have the request scoped activated for the duration of the method execution (until it produced its result).
Methods annotated with `@OnTextMessage,` `@OnBinaryMessage,` `@OnOpen`, and `@OnClose` also have the request scope activated for the duration of the method execution (until it produced its result).

[[callback-methods]]
=== Callback methods
Expand Down Expand Up @@ -224,6 +224,7 @@ Here are the rules governing execution:
* When `@RunOnVirtualThread` is employed, each invocation spawns a new virtual thread.
* Methods returning `CompletionStage`, `Uni` and `Multi` are considered non-blocking.
* Methods returning `void` or plain objects are considered blocking.
* Kotlin `suspend` functions are considered non-blocking.

==== Method parameters

Expand All @@ -248,10 +249,12 @@ The method must subscribe to the `Multi` to receive these items (or return a Mul
Methods annotated with `@OnTextMessage` or `@OnBinaryMessage` can return various types to handle WebSocket communication efficiently:

* `void`: Indicates a blocking method where no explicit response is sent back to the client.
* `Uni<Void>`: Denotes a non-blocking method where the completion of the returned Uni signifies the end of processing. No explicit response is sent back to the client.
* `Uni<Void>`: Denotes a non-blocking method where the completion of the returned `Uni` signifies the end of processing. No explicit response is sent back to the client.
* An object of type `X` represents a blocking method in which the returned object is serialized and sent back to the client as a response.
* `Uni<X>`: Specifies a non-blocking method where the item emitted by the non-null `Uni` is sent to the client as a response.
* `Multi<X>`: Indicates a non-blocking method where the items emitted by the non-null `Multi` are sequentially sent to the client until completion or cancellation.
* Kotlin `suspend` function returning `Unit`: Denotes a non-blocking method where no explicit response is sent back to the client.
* Kotlin `suspend` function returning `X`: Specifies a non-blocking method where the returned item is sent to the client as a response.

Here are some examples of these methods:

Expand Down Expand Up @@ -381,6 +384,8 @@ The supported return types for `@OnOpen` methods are:
* An object of type `X`: Represents a blocking method where the returned object is serialized and sent back to the client.
* `Uni<X>`: Specifies a non-blocking method where the item emitted by the non-null `Uni` is sent to the client.
* `Multi<X>`: Indicates a non-blocking method where the items emitted by the non-null `Multi` are sequentially sent to the client until completion or cancellation.
* Kotlin `suspend` function returning `Unit`: Denotes a non-blocking method where no explicit message is sent back to the client.
* Kotlin `suspend` function returning `X`: Specifies a non-blocking method where the returned item is sent to the client.

Items sent to the client are <<serialization,serialized>> except for the `String`, `io.vertx.core.json.JsonObject`, `io.vertx.core.json.JsonArray`, `io.vertx.core.buffer.Buffer`, and `byte[]` types.
In the case of `Multi`, Quarkus subscribes to the returned `Multi` and writes the items to the `WebSocket` as they are emitted.
Expand All @@ -391,6 +396,7 @@ For `@OnClose` methods, the supported return types include:

* `void`: The method is considered blocking.
* `Uni<Void>`: The method is considered non-blocking.
* Kotlin `suspend` function returning `Unit`: The method is considered non-blocking.

NOTE: `@OnClose` methods declared on a server endpoint cannot send items to the connected client by returning objects.
They can only send messages to the other clients by using the `WebSocketConnection` object.
Expand Down Expand Up @@ -424,7 +430,7 @@ Alternatively, an error message can be logged or no operation performed.

The WebSocket Next extension supports automatic serialization and deserialization of messages.

Objects of type `String`, `JsonObject`, `JsonArray`, `Buffer`, and `byte[]` are sent as-is and by-pass the serialization and deserialization.
Objects of type `String`, `JsonObject`, `JsonArray`, `Buffer`, and `byte[]` are sent as-is and bypass the serialization and deserialization.
When no codec is provided, the serialization and deserialization convert the message from/to JSON automatically.

When you need to customize the serialization and deserialization, you can provide a custom codec.
Expand Down Expand Up @@ -485,7 +491,7 @@ Item find(Item item) {
//....
}
----
1. Specify the codec to use for both the deserialization of the incoming message
1. Specify the codec to use for the deserialization of the incoming message
2. Specify the codec to use for the serialization of the outgoing message

=== Ping/pong messages
Expand All @@ -509,7 +515,7 @@ quarkus.websockets-next.server.auto-ping-interval=2 <1>

The `@OnPongMessage` annotation is used to define a callback that consumes pong messages sent from the client/server.
An endpoint must declare at most one method annotated with `@OnPongMessage`.
The callback method must return either `void` or `Uni<Void>`, and it must accept a single parameter of type `Buffer`.
The callback method must return either `void` or `Uni<Void>` (or be a Kotlin `suspend` function returning `Unit`), and it must accept a single parameter of type `Buffer`.

[source,java]
----
Expand Down Expand Up @@ -539,18 +545,18 @@ This extension reuses the _main_ HTTP server.

Thus, the configuration of the WebSocket server is done in the `quarkus.http.` configuration section.

WebSocket paths configured within the application are concatenated with the root path defined by `quarkus.http.root` (which defaults to /).
WebSocket paths configured within the application are concatenated with the root path defined by `quarkus.http.root` (which defaults to `/`).
This concatenation ensures that WebSocket endpoints are appropriately positioned within the application's URL structure.

Refer to the xref:http-reference.adoc[HTTP guide] for more details.

=== Sub-websockets endpoints

A `@WebSocket` endpoint can encapsulate static nested classes, which are also annotated with /`@WebSocket` and represent _sub-websockets_.
The resulting path of these sub-web sockets concatenates the path from the enclosing class and the nested class.
A `@WebSocket` endpoint can encapsulate static nested classes, which are also annotated with `@WebSocket` and represent _sub-websockets_.
The resulting path of these sub-websockets concatenates the path from the enclosing class and the nested class.
The resulting path is normalized, following the HTTP URL rules.

Sub-web sockets inherit access to the path parameters declared in the `@WebSocket` annotation of both the enclosing and nested classes.
Sub-websockets inherit access to the path parameters declared in the `@WebSocket` annotation of both the enclosing and nested classes.
The `consumePrimary` method within the enclosing class can access the `version` parameter in the following example.
Meanwhile, the `consumeNested` method within the nested class can access both `version` and `id` parameters:

Expand Down Expand Up @@ -884,9 +890,9 @@ public class MyBean {
<4> Set the execution model for callback handlers. By default, the callback may block the current thread. However in this case, the callback is executed on the event loop and may not block the current thread.
<5> The lambda will be called for every text message sent from the server.

The basic connector is closed to a low-level API and is reserved for advanced users.
The basic connector is closer to a low-level API and is reserved for advanced users.
However, unlike others low-level WebSocket clients, it is still a CDI bean and can be injected in other beans.
It also provides a way to configure the execution model of the callbacks, ensuring the optimal integration with the rest of Quarkus.
It also provides a way to configure the execution model of the callbacks, ensuring optimal integration with the rest of Quarkus.

[[ws-client-connection]]
=== WebSocket client connection
Expand Down
72 changes: 72 additions & 0 deletions extensions/websockets-next/deployment/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -65,10 +65,56 @@
<artifactId>smallrye-certificate-generator-junit5</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.jetbrains.kotlin</groupId>
<artifactId>kotlin-stdlib</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.jetbrains.kotlinx</groupId>
<artifactId>kotlinx-coroutines-core</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.smallrye.reactive</groupId>
<artifactId>mutiny-kotlin</artifactId>
<scope>test</scope>
</dependency>
</dependencies>

<build>
<plugins>
<plugin>
<groupId>org.jetbrains.kotlin</groupId>
<artifactId>kotlin-maven-plugin</artifactId>
<version>${kotlin.version}</version>
<executions>
<execution>
<id>compile</id>
<goals>
<goal>compile</goal>
</goals>
<configuration>
<sourceDirs>
<sourceDir>${project.basedir}/src/main/kotlin</sourceDir>
<sourceDir>${project.basedir}/src/main/java</sourceDir>
</sourceDirs>
</configuration>
</execution>
<execution>
<id>test-compile</id>
<goals>
<goal>test-compile</goal>
</goals>
<configuration>
<sourceDirs>
<sourceDir>${project.basedir}/src/test/kotlin</sourceDir>
<sourceDir>${project.basedir}/src/test/java</sourceDir>
</sourceDirs>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
Expand All @@ -80,6 +126,32 @@
</path>
</annotationProcessorPaths>
</configuration>
<executions>
<!-- Replacing default-compile as it is treated specially by Maven -->
<execution>
<id>default-compile</id>
<phase>none</phase>
</execution>
<!-- Replacing default-testCompile as it is treated specially by Maven -->
<execution>
<id>default-testCompile</id>
<phase>none</phase>
</execution>
<execution>
<id>java-compile</id>
<phase>compile</phase>
<goals>
<goal>compile</goal>
</goals>
</execution>
<execution>
<id>java-test-compile</id>
<phase>test-compile</phase>
<goals>
<goal>testCompile</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<artifactId>maven-surefire-plugin</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,10 @@

import io.quarkus.arc.deployment.TransformedAnnotationsBuildItem;
import io.quarkus.arc.processor.Annotations;
import io.quarkus.arc.processor.BeanInfo;
import io.quarkus.arc.processor.DotNames;
import io.quarkus.arc.processor.KotlinDotNames;
import io.quarkus.arc.processor.KotlinUtils;
import io.quarkus.gizmo.BytecodeCreator;
import io.quarkus.gizmo.FieldDescriptor;
import io.quarkus.gizmo.ResultHandle;
Expand All @@ -35,15 +38,17 @@ public class Callback {
public final Target target;
public final String endpointPath;
public final AnnotationInstance annotation;
public final BeanInfo bean;
public final MethodInfo method;
public final ExecutionModel executionModel;
public final MessageType messageType;
public final List<CallbackArgument> arguments;

public Callback(Target target, AnnotationInstance annotation, MethodInfo method, ExecutionModel executionModel,
CallbackArgumentsBuildItem callbackArguments, TransformedAnnotationsBuildItem transformedAnnotations,
String endpointPath, IndexView index) {
public Callback(Target target, AnnotationInstance annotation, BeanInfo bean, MethodInfo method,
ExecutionModel executionModel, CallbackArgumentsBuildItem callbackArguments,
TransformedAnnotationsBuildItem transformedAnnotations, String endpointPath, IndexView index) {
this.target = target;
this.bean = bean;
this.method = method;
this.annotation = annotation;
this.executionModel = executionModel;
Expand Down Expand Up @@ -104,6 +109,15 @@ public boolean isReturnTypeMulti() {
return WebSocketDotNames.MULTI.equals(returnType().name());
}

public boolean isKotlinSuspendFunction() {
return KotlinUtils.isKotlinSuspendMethod(method);
}

public boolean isKotlinSuspendFunctionReturningUnit() {
return KotlinUtils.isKotlinSuspendMethod(method)
&& KotlinUtils.getKotlinSuspendMethodResult(method).name().equals(KotlinDotNames.UNIT);
}

public boolean acceptsMessage() {
return messageType != MessageType.NONE;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package io.quarkus.websockets.next.deployment;

import io.quarkus.arc.processor.KotlinUtils;
import io.quarkus.gizmo.ResultHandle;

public class KotlinContinuationCallbackArgument implements CallbackArgument {
@Override
public boolean matches(ParameterContext context) {
return KotlinUtils.isKotlinContinuationParameter(context.parameter());
}

@Override
public ResultHandle get(InvocationBytecodeContext context) {
// the actual value is provided by the invoker
return context.bytecode().loadNull();
}
}
Loading

0 comments on commit 2517290

Please sign in to comment.