Skip to content

Commit

Permalink
Update to Mutiny 2.6.0 and Reactive Messaging 4.20.0
Browse files Browse the repository at this point in the history
This also fixes the native compilation of JCTools which is now
used in Mutiny, and adds some related tests for native compilation.
  • Loading branch information
jponge committed Mar 28, 2024
1 parent 5d5bb73 commit 7d4b497
Show file tree
Hide file tree
Showing 12 changed files with 300 additions and 9 deletions.
8 changes: 4 additions & 4 deletions bom/application/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,8 @@
<smallrye-context-propagation.version>2.1.0</smallrye-context-propagation.version>
<smallrye-reactive-streams-operators.version>1.0.13</smallrye-reactive-streams-operators.version>
<smallrye-reactive-types-converter.version>3.0.1</smallrye-reactive-types-converter.version>
<smallrye-mutiny-vertx-binding.version>3.11.0</smallrye-mutiny-vertx-binding.version>
<smallrye-reactive-messaging.version>4.19.0</smallrye-reactive-messaging.version>
<smallrye-mutiny-vertx-binding.version>3.12.0</smallrye-mutiny-vertx-binding.version>
<smallrye-reactive-messaging.version>4.20.0</smallrye-reactive-messaging.version>
<smallrye-stork.version>2.6.0</smallrye-stork.version>
<jakarta.activation.version>2.1.3</jakarta.activation.version>
<jakarta.annotation-api.version>2.1.1</jakarta.annotation-api.version>
Expand Down Expand Up @@ -148,7 +148,7 @@
<brotli4j.version>1.16.0</brotli4j.version>
<reactive-streams.version>1.0.4</reactive-streams.version>
<jboss-logging.version>3.5.3.Final</jboss-logging.version>
<mutiny.version>2.5.8</mutiny.version>
<mutiny.version>2.6.0</mutiny.version>
<kafka3.version>3.7.0</kafka3.version>
<lz4.version>1.8.0</lz4.version> <!-- dependency of the kafka-clients that could be overridden by other imported BOMs in the platform -->
<snappy.version>1.1.10.5</snappy.version>
Expand Down Expand Up @@ -224,7 +224,7 @@
<org-crac.version>0.1.3</org-crac.version>
<sshd-common.version>2.12.0</sshd-common.version>
<mime4j.version>0.8.11</mime4j.version>
<mutiny-zero.version>1.0.0</mutiny-zero.version>
<mutiny-zero.version>1.1.0</mutiny-zero.version>
<pulsar-client.version>3.0.0</pulsar-client.version>
<async-http-client.version>2.12.3</async-http-client.version>
<!-- Dev UI -->
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package io.quarkus.mutiny.deployment;

import java.util.List;
import java.util.Optional;

import org.jboss.threads.ContextHandler;
Expand All @@ -10,6 +11,7 @@
import io.quarkus.deployment.builditem.ContextHandlerBuildItem;
import io.quarkus.deployment.builditem.ExecutorBuildItem;
import io.quarkus.deployment.builditem.ShutdownContextBuildItem;
import io.quarkus.deployment.builditem.nativeimage.UnsafeAccessedFieldBuildItem;
import io.quarkus.mutiny.runtime.MutinyInfrastructure;

public class MutinyProcessor {
Expand All @@ -31,4 +33,12 @@ public void buildTimeInit(MutinyInfrastructure recorder) {
recorder.configureThreadBlockingChecker();
recorder.configureOperatorLogger();
}

@BuildStep
public List<UnsafeAccessedFieldBuildItem> jctoolsUnsafeAccessedFields() {
return List.of(
new UnsafeAccessedFieldBuildItem(
"org.jctools.util.UnsafeRefArrayAccess",
"REF_ELEMENT_SHIFT"));
}
}
2 changes: 1 addition & 1 deletion independent-projects/arc/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@
<version.gizmo>1.8.0</version.gizmo>
<version.jandex>3.1.7</version.jandex>
<version.jboss-logging>3.5.3.Final</version.jboss-logging>
<version.mutiny>2.5.8</version.mutiny>
<version.mutiny>2.6.0</version.mutiny>
<version.bridger>1.6.Final</version.bridger>
<!-- test versions -->
<version.assertj>3.25.3</version.assertj>
Expand Down
2 changes: 1 addition & 1 deletion independent-projects/qute/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@
<version.compiler.plugin>3.12.1</version.compiler.plugin>
<version.enforcer.plugin>3.2.1</version.enforcer.plugin>
<version.surefire.plugin>3.2.5</version.surefire.plugin>
<version.smallrye-mutiny>2.5.8</version.smallrye-mutiny>
<version.smallrye-mutiny>2.6.0</version.smallrye-mutiny>
</properties>

<modules>
Expand Down
6 changes: 3 additions & 3 deletions independent-projects/resteasy-reactive/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@
<version.compiler.plugin>3.12.1</version.compiler.plugin>
<version.enforcer.plugin>3.2.1</version.enforcer.plugin>
<version.surefire.plugin>3.2.5</version.surefire.plugin>
<mutiny.version>2.5.8</mutiny.version>
<mutiny.version>2.6.0</mutiny.version>
<smallrye-common.version>2.3.0</smallrye-common.version>
<vertx.version>4.5.5</vertx.version>
<rest-assured.version>5.4.0</rest-assured.version>
Expand All @@ -70,10 +70,10 @@
<yasson.version>3.0.3</yasson.version>
<jakarta.json.bind-api.version>3.0.0</jakarta.json.bind-api.version>
<awaitility.version>4.2.1</awaitility.version>
<smallrye-mutiny-vertx-core.version>3.10.0</smallrye-mutiny-vertx-core.version>
<smallrye-mutiny-vertx-core.version>3.12.0</smallrye-mutiny-vertx-core.version>
<reactive-streams.version>1.0.4</reactive-streams.version>
<mockito.version>5.11.0</mockito.version>
<mutiny-zero.version>1.0.0</mutiny-zero.version>
<mutiny-zero.version>1.1.0</mutiny-zero.version>

<!-- Forbidden API checks -->
<forbiddenapis-maven-plugin.version>3.4</forbiddenapis-maven-plugin.version>
Expand Down
10 changes: 10 additions & 0 deletions integration-tests/mutiny-native-jctools/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
# Quarkus - Integration Tests - Mutiny native JCTools support

This integration test checks that the Mutiny extension provides support for the native compilation of JCTools, which is now used internally in Mutiny instead of old custom data structures.

This is important as JCTools makes use of `sun.misc.Unsafe` in some places.

The tests do the following:

- create all kinds of queues behind the factory `io.smallrye.mutiny.helpers.queues.Queues` interface, and
- expose a few Mutiny pipelines where queues may be needed: overflow, custom emitters, etc.
104 changes: 104 additions & 0 deletions integration-tests/mutiny-native-jctools/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<parent>
<artifactId>quarkus-integration-tests-parent</artifactId>
<groupId>io.quarkus</groupId>
<version>999-SNAPSHOT</version>
</parent>

<artifactId>quarkus-integration-test-mutiny-native-jctools</artifactId>

<name>Quarkus - Integration Tests - Mutiny native JCTools support</name>

<dependencies>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-rest</artifactId>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-rest-client</artifactId>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-mutiny</artifactId>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-junit5</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.rest-assured</groupId>
<artifactId>rest-assured</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.awaitility</groupId>
<artifactId>awaitility</artifactId>
<scope>test</scope>
</dependency>

<!-- Minimal test dependencies to *-deployment artifacts for consistent build order -->
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-rest-deployment</artifactId>
<version>${project.version}</version>
<type>pom</type>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>*</groupId>
<artifactId>*</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-rest-client-deployment</artifactId>
<version>${project.version}</version>
<type>pom</type>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>*</groupId>
<artifactId>*</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-mutiny-deployment</artifactId>
<version>${project.version}</version>
<type>pom</type>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>*</groupId>
<artifactId>*</artifactId>
</exclusion>
</exclusions>
</dependency>
</dependencies>

<build>
<plugins>
<plugin>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-maven-plugin</artifactId>
<executions>
<execution>
<goals>
<goal>build</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>

</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
package io.quarkus.it.mutiny.nativejctools;

import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;

import jakarta.ws.rs.GET;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.core.MediaType;

import org.jboss.resteasy.reactive.RestStreamElementType;

import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.helpers.queues.Queues;
import io.smallrye.mutiny.infrastructure.Infrastructure;
import io.smallrye.mutiny.subscription.MultiEmitter;

@Path("/tests")
public class MyResource {

@GET
@Path("create-queues")
public String createQueues() {
ArrayList<Long> sums = new ArrayList<>();
List.of(
Queues.createMpscQueue(),
Queues.createMpscArrayQueue(2048),
Queues.createSpscArrayQueue(2048),
Queues.createSpscUnboundedQueue(2048),
Queues.createSpscUnboundedArrayQueue(2048),
Queues.createSpscChunkedArrayQueue(2048)).forEach(queue -> {
for (int i = 0; i < 1024; i++) {
queue.offer(i);
}
long sum = 0L;
for (Integer n = (Integer) queue.poll(); n != null; n = (Integer) queue.poll()) {
sum = sum + n;
}
sums.add(sum);
});
return "Ok :: " + sums.stream()
.map(Objects::toString)
.collect(Collectors.joining("/"));
}

@GET
@Path("ticks-overflow")
public String ticksWithOverflow() {
List<String> ticks = Multi.createFrom().ticks().every(Duration.ofMillis(10))
.onOverflow().bufferUnconditionally()
.onItem().transform(tick -> ":")
.select().first(10)
.collect().asList().await().atMost(Duration.ofSeconds(5));
return String.join("", ticks);
}

@GET
@RestStreamElementType(MediaType.TEXT_PLAIN)
@Path("emitter")
public Multi<String> emitter() {
AtomicInteger counter = new AtomicInteger();
return Multi.createFrom().emitter(emitter -> {
ScheduledExecutorService scheduler = Infrastructure.getDefaultWorkerPool();
scheduler.schedule(() -> emitAndSchedule(counter, emitter), 100, TimeUnit.MILLISECONDS);
});
}

private void emitAndSchedule(AtomicInteger counter, MultiEmitter<? super String> emitter) {
ScheduledExecutorService scheduler = Infrastructure.getDefaultWorkerPool();
int n = counter.getAndIncrement();
if (n < 5) {
for (int i = 0; i < n * 10; i++) {
emitter.emit(String.valueOf(i));
}
scheduler.schedule(() -> emitAndSchedule(counter, emitter), 125, TimeUnit.MILLISECONDS);
} else {
emitter.complete();
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@

Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package io.quarkus.it.mutiny.nativejctools;

import io.quarkus.test.junit.QuarkusIntegrationTest;

@QuarkusIntegrationTest
public class MyResourceIT extends MyResourceTest {
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
package io.quarkus.it.mutiny.nativejctools;

import static io.restassured.RestAssured.get;
import static org.awaitility.Awaitility.await;
import static org.hamcrest.CoreMatchers.is;
import static org.junit.jupiter.api.Assertions.assertIterableEquals;

import java.time.Duration;
import java.util.ArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

import jakarta.ws.rs.client.Client;
import jakarta.ws.rs.client.ClientBuilder;
import jakarta.ws.rs.client.WebTarget;
import jakarta.ws.rs.sse.SseEventSource;

import org.junit.jupiter.api.Test;

import io.quarkus.test.junit.QuarkusTest;
import io.restassured.RestAssured;

@QuarkusTest
public class MyResourceTest {

@Test
public void testCreateQueues() {
get("/tests/create-queues")
.then()
.body(is("Ok :: 523776/523776/523776/523776/523776/523776"))
.statusCode(200);
}

@Test
public void testTicksWithOverflow() {
get("/tests/ticks-overflow")
.then()
.body(is("::::::::::"))
.statusCode(200);
}

@Test
public void testEmitter() throws Throwable {
Client client = ClientBuilder.newClient();
WebTarget target = client.target("http://localhost:" + RestAssured.port + "/tests/emitter");
SseEventSource eventSource = SseEventSource.target(target).build();

AtomicBoolean done = new AtomicBoolean();
AtomicReference<Throwable> failure = new AtomicReference<>();
ArrayList<String> events = new ArrayList<>();

eventSource.register(
event -> events.add(event.readData()),
failure::set,
() -> done.set(true));
eventSource.open();

await().atMost(Duration.ofSeconds(10))
.until(() -> done.get() || failure.get() != null);

if (failure.get() != null) {
throw failure.get();
}

ArrayList<String> expected = new ArrayList<>();
for (int i = 0; i < 5; i++) {
for (int j = 0; j < i * 10; j++) {
expected.add(String.valueOf(j));
}
}
assertIterableEquals(expected, events);
}
}
1 change: 1 addition & 0 deletions integration-tests/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -396,6 +396,7 @@
<module>mtls-certificates</module>
<!-- Virtual threads test -->
<module>virtual-threads</module>
<module>mutiny-native-jctools</module>
</modules>
</profile>

Expand Down

0 comments on commit 7d4b497

Please sign in to comment.