diff --git a/bom/application/pom.xml b/bom/application/pom.xml
index 2d36d5c687efb..a0f86ac29cf94 100644
--- a/bom/application/pom.xml
+++ b/bom/application/pom.xml
@@ -59,15 +59,15 @@
4.0.1
4.0.0
3.1.1
- 2.0.1
+ 2.1.0.Beta1
3.0.0
6.1.0
4.0.0
2.0.0
1.0.13
- 2.7.0
- 2.30.1
- 4.1.1
+ 3.0.0
+ 3.2.0
+ 4.1.1.RC1
1.4.1
2.1.0
2.1.1
@@ -145,9 +145,9 @@
3.1.1
4.1.86.Final
1.8.0
- 1.0.3
+ 1.0.4
3.5.0.Final
- 1.8.0
+ 2.0.0
3.3.1
1.8.0
1.1.8.4
@@ -217,6 +217,7 @@
0.0.6
0.1.3
2.9.2
+ 1.0.0
@@ -5442,6 +5443,11 @@
+
+ io.smallrye.reactive
+ mutiny-zero-flow-adapters
+ ${mutiny-zero.version}
+
org.eclipse.microprofile.reactive-streams-operators
microprofile-reactive-streams-operators-api
diff --git a/extensions/grpc/deployment/src/test/java/io/quarkus/grpc/server/GrpcReflectionTest.java b/extensions/grpc/deployment/src/test/java/io/quarkus/grpc/server/GrpcReflectionTest.java
index 1e748b893d4ce..80d495f1e33ae 100644
--- a/extensions/grpc/deployment/src/test/java/io/quarkus/grpc/server/GrpcReflectionTest.java
+++ b/extensions/grpc/deployment/src/test/java/io/quarkus/grpc/server/GrpcReflectionTest.java
@@ -5,6 +5,7 @@
import java.util.Arrays;
import java.util.List;
+import java.util.concurrent.Flow;
import org.jboss.shrinkwrap.api.ShrinkWrap;
import org.jboss.shrinkwrap.api.spec.JavaArchive;
@@ -12,8 +13,6 @@
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
-import org.reactivestreams.Subscriber;
-import org.reactivestreams.Subscription;
import com.google.protobuf.ByteString;
@@ -233,15 +232,15 @@ public void testRetrievingAllExtensionNumbersOfType() {
assertThat(list).containsExactlyInAnyOrderElementsOf(expected);
}
- private static class ResettableSubscriber implements Subscriber {
+ private static class ResettableSubscriber implements Flow.Subscriber {
- private Subscription subscription;
+ private Flow.Subscription subscription;
private volatile T last;
private boolean completed;
private Throwable failure;
@Override
- public void onSubscribe(Subscription subscription) {
+ public void onSubscribe(Flow.Subscription subscription) {
this.subscription = subscription;
}
diff --git a/extensions/mailer/runtime/src/main/java/io/quarkus/mailer/Attachment.java b/extensions/mailer/runtime/src/main/java/io/quarkus/mailer/Attachment.java
index bc3e31c329600..7e11540c1eb99 100644
--- a/extensions/mailer/runtime/src/main/java/io/quarkus/mailer/Attachment.java
+++ b/extensions/mailer/runtime/src/main/java/io/quarkus/mailer/Attachment.java
@@ -3,8 +3,7 @@
import java.io.File;
import java.util.Iterator;
import java.util.NoSuchElementException;
-
-import org.reactivestreams.Publisher;
+import java.util.concurrent.Flow.Publisher;
import io.smallrye.mutiny.Multi;
diff --git a/extensions/mailer/runtime/src/main/java/io/quarkus/mailer/Mail.java b/extensions/mailer/runtime/src/main/java/io/quarkus/mailer/Mail.java
index 07e094bcf5b6f..5ee50038adc2e 100644
--- a/extensions/mailer/runtime/src/main/java/io/quarkus/mailer/Mail.java
+++ b/extensions/mailer/runtime/src/main/java/io/quarkus/mailer/Mail.java
@@ -2,8 +2,7 @@
import java.io.File;
import java.util.*;
-
-import org.reactivestreams.Publisher;
+import java.util.concurrent.Flow.Publisher;
/**
* Represents an e-mail.
diff --git a/extensions/mailer/runtime/src/main/java/io/quarkus/mailer/runtime/MutinyMailerImpl.java b/extensions/mailer/runtime/src/main/java/io/quarkus/mailer/runtime/MutinyMailerImpl.java
index e69674e7adcf5..302656b6e58a6 100644
--- a/extensions/mailer/runtime/src/main/java/io/quarkus/mailer/runtime/MutinyMailerImpl.java
+++ b/extensions/mailer/runtime/src/main/java/io/quarkus/mailer/runtime/MutinyMailerImpl.java
@@ -6,6 +6,7 @@
import java.util.List;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.Flow.Publisher;
import java.util.function.Function;
import java.util.stream.Collectors;
@@ -13,7 +14,6 @@
import jakarta.inject.Inject;
import org.jboss.logging.Logger;
-import org.reactivestreams.Publisher;
import io.quarkus.mailer.Attachment;
import io.quarkus.mailer.Mail;
diff --git a/extensions/mailer/runtime/src/test/java/io/quarkus/mailer/AttachmentTest.java b/extensions/mailer/runtime/src/test/java/io/quarkus/mailer/AttachmentTest.java
index cc73a1ec7bad1..b1bd405c634e2 100644
--- a/extensions/mailer/runtime/src/test/java/io/quarkus/mailer/AttachmentTest.java
+++ b/extensions/mailer/runtime/src/test/java/io/quarkus/mailer/AttachmentTest.java
@@ -7,12 +7,12 @@
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.UUID;
+import java.util.concurrent.Flow;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
-import org.reactivestreams.Publisher;
import io.quarkus.mailer.runtime.MutinyMailerImpl;
import io.smallrye.mutiny.Multi;
@@ -70,7 +70,7 @@ void testInlineAttachmentCreationFromFile() {
@Test
void testAttachmentCreationFromStream() {
- Publisher publisher = vertx.fileSystem().open(LOREM.getAbsolutePath(), new OpenOptions().setRead(true))
+ Flow.Publisher publisher = vertx.fileSystem().open(LOREM.getAbsolutePath(), new OpenOptions().setRead(true))
.onItem().transformToMulti(af -> af.toMulti()
.onItem().transformToIterable(this::getBytes));
@@ -109,7 +109,7 @@ public Byte next() {
@Test
void testInlineAttachmentCreationFromStream() {
- Publisher publisher = vertx.fileSystem().open(LOREM.getAbsolutePath(), new OpenOptions().setRead(true))
+ Flow.Publisher publisher = vertx.fileSystem().open(LOREM.getAbsolutePath(), new OpenOptions().setRead(true))
.onItem().transformToMulti(af -> af.toMulti()
.onItem().transformToIterable(this::getBytes));
@@ -128,7 +128,7 @@ void testInlineAttachmentCreationFromStream() {
@Test
void testAttachmentCreationWithDescription() {
- Publisher publisher = vertx.fileSystem().open(LOREM.getAbsolutePath(), new OpenOptions().setRead(true))
+ Flow.Publisher publisher = vertx.fileSystem().open(LOREM.getAbsolutePath(), new OpenOptions().setRead(true))
.onItem().transformToMulti(af -> af.toMulti()
.onItem().transformToIterable(this::getBytes));
@@ -154,7 +154,7 @@ private String getContent(Attachment attachment) {
@Test
void testInlineAttachmentCreationWithDescription() {
- Publisher publisher = vertx.fileSystem().open(LOREM.getAbsolutePath(), new OpenOptions().setRead(true))
+ Flow.Publisher publisher = vertx.fileSystem().open(LOREM.getAbsolutePath(), new OpenOptions().setRead(true))
.onItem().transformToMulti(af -> af.toMulti()
.onItem().transformToIterable(this::getBytes));
diff --git a/extensions/mongodb-client/runtime/pom.xml b/extensions/mongodb-client/runtime/pom.xml
index f02251708d3ee..187488cbdcbcf 100644
--- a/extensions/mongodb-client/runtime/pom.xml
+++ b/extensions/mongodb-client/runtime/pom.xml
@@ -24,6 +24,10 @@
io.quarkus
quarkus-mutiny
+
+ io.smallrye.reactive
+ mutiny-zero-flow-adapters
+
io.quarkus
quarkus-vertx
diff --git a/extensions/mongodb-client/runtime/src/main/java/io/quarkus/mongodb/impl/ReactiveMongoCollectionImpl.java b/extensions/mongodb-client/runtime/src/main/java/io/quarkus/mongodb/impl/ReactiveMongoCollectionImpl.java
index 1196ff7fc668e..acac2af661e43 100644
--- a/extensions/mongodb-client/runtime/src/main/java/io/quarkus/mongodb/impl/ReactiveMongoCollectionImpl.java
+++ b/extensions/mongodb-client/runtime/src/main/java/io/quarkus/mongodb/impl/ReactiveMongoCollectionImpl.java
@@ -1,6 +1,7 @@
package io.quarkus.mongodb.impl;
import java.util.List;
+import java.util.concurrent.Flow;
import org.bson.Document;
import org.bson.codecs.configuration.CodecRegistry;
@@ -47,6 +48,7 @@
import io.quarkus.mongodb.reactive.ReactiveMongoCollection;
import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;
+import mutiny.zero.flow.adapters.AdaptersToFlow;
public class ReactiveMongoCollectionImpl implements ReactiveMongoCollection {
@@ -262,11 +264,11 @@ public Multi aggregate(ClientSession clientSession, List extends Bson>
return Wrappers.toMulti(collection.aggregate(clientSession, pipeline, clazz));
}
- private AggregatePublisher apply(AggregateOptions options, AggregatePublisher publisher) {
+ private Flow.Publisher apply(AggregateOptions options, AggregatePublisher publisher) {
if (options == null) {
- return publisher;
+ return AdaptersToFlow.publisher(publisher);
}
- return options.apply(publisher);
+ return AdaptersToFlow.publisher(options.apply(publisher));
}
@Override
@@ -409,11 +411,11 @@ public Multi mapReduce(String mapFunction, String reduceFunction, MapReduceOp
return Multi.createFrom().publisher(apply(options, collection.mapReduce(mapFunction, reduceFunction)));
}
- private MapReducePublisher apply(MapReduceOptions options, MapReducePublisher mapReduce) {
+ private Flow.Publisher apply(MapReduceOptions options, MapReducePublisher mapReduce) {
if (options == null) {
- return mapReduce;
+ return AdaptersToFlow.publisher(mapReduce);
}
- return options.apply(mapReduce);
+ return AdaptersToFlow.publisher(options.apply(mapReduce));
}
@Override
diff --git a/extensions/mongodb-client/runtime/src/main/java/io/quarkus/mongodb/impl/ReactiveMongoDatabaseImpl.java b/extensions/mongodb-client/runtime/src/main/java/io/quarkus/mongodb/impl/ReactiveMongoDatabaseImpl.java
index 41249cdf9f7ba..b439c44f8cdd5 100644
--- a/extensions/mongodb-client/runtime/src/main/java/io/quarkus/mongodb/impl/ReactiveMongoDatabaseImpl.java
+++ b/extensions/mongodb-client/runtime/src/main/java/io/quarkus/mongodb/impl/ReactiveMongoDatabaseImpl.java
@@ -1,6 +1,7 @@
package io.quarkus.mongodb.impl;
import java.util.List;
+import java.util.concurrent.Flow;
import org.bson.Document;
import org.bson.conversions.Bson;
@@ -22,6 +23,7 @@
import io.quarkus.mongodb.reactive.ReactiveMongoDatabase;
import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;
+import mutiny.zero.flow.adapters.AdaptersToFlow;
public class ReactiveMongoDatabaseImpl implements ReactiveMongoDatabase {
@@ -127,12 +129,11 @@ public Multi listCollections(Class clazz, CollectionListOptions option
return Multi.createFrom().publisher(apply(options, database.listCollections(clazz)));
}
- private ListCollectionsPublisher apply(CollectionListOptions options,
- ListCollectionsPublisher collections) {
+ private Flow.Publisher apply(CollectionListOptions options, ListCollectionsPublisher collections) {
if (options == null) {
- return collections;
+ return AdaptersToFlow.publisher(collections);
} else {
- return options.apply(collections);
+ return AdaptersToFlow.publisher(options.apply(collections));
}
}
diff --git a/extensions/mongodb-client/runtime/src/main/java/io/quarkus/mongodb/impl/Wrappers.java b/extensions/mongodb-client/runtime/src/main/java/io/quarkus/mongodb/impl/Wrappers.java
index 7cb38ba3d350b..f9474f1899689 100644
--- a/extensions/mongodb-client/runtime/src/main/java/io/quarkus/mongodb/impl/Wrappers.java
+++ b/extensions/mongodb-client/runtime/src/main/java/io/quarkus/mongodb/impl/Wrappers.java
@@ -8,6 +8,7 @@
import io.smallrye.mutiny.Uni;
import io.vertx.core.Context;
import io.vertx.core.Vertx;
+import mutiny.zero.flow.adapters.AdaptersToFlow;
class Wrappers {
@@ -17,7 +18,7 @@ private Wrappers() {
static Uni toUni(Publisher publisher) {
Context context = Vertx.currentContext();
- Uni uni = Uni.createFrom().publisher(publisher);
+ Uni uni = Uni.createFrom().publisher(AdaptersToFlow.publisher(publisher));
if (context != null) {
return uni.emitOn(command -> context.runOnContext(x -> command.run()));
}
@@ -27,15 +28,16 @@ static Uni toUni(Publisher publisher) {
static Multi toMulti(Publisher publisher) {
Context context = Vertx.currentContext();
if (context != null) {
- return Multi.createFrom().publisher(publisher).emitOn(command -> context.runOnContext(x -> command.run()));
+ return Multi.createFrom().publisher(AdaptersToFlow.publisher(publisher))
+ .emitOn(command -> context.runOnContext(x -> command.run()));
} else {
- return Multi.createFrom().publisher(publisher);
+ return Multi.createFrom().publisher(AdaptersToFlow.publisher(publisher));
}
}
static Uni> toUniOfList(Publisher publisher) {
Context context = Vertx.currentContext();
- Uni> uni = Multi.createFrom().publisher(publisher)
+ Uni> uni = Multi.createFrom().publisher(AdaptersToFlow.publisher(publisher))
.collect().asList();
if (context != null) {
diff --git a/extensions/narayana-jta/runtime/pom.xml b/extensions/narayana-jta/runtime/pom.xml
index fde4e86a79887..5a9116588ff8d 100644
--- a/extensions/narayana-jta/runtime/pom.xml
+++ b/extensions/narayana-jta/runtime/pom.xml
@@ -51,6 +51,10 @@
io.smallrye.reactive
smallrye-reactive-converter-mutiny
+
+ io.smallrye.reactive
+ mutiny-zero-flow-adapters
+
org.graalvm.nativeimage
svm
diff --git a/extensions/narayana-jta/runtime/src/main/java/io/quarkus/narayana/jta/runtime/interceptor/TransactionalInterceptorBase.java b/extensions/narayana-jta/runtime/src/main/java/io/quarkus/narayana/jta/runtime/interceptor/TransactionalInterceptorBase.java
index c281e78eae50b..d965b3b1da999 100644
--- a/extensions/narayana-jta/runtime/src/main/java/io/quarkus/narayana/jta/runtime/interceptor/TransactionalInterceptorBase.java
+++ b/extensions/narayana-jta/runtime/src/main/java/io/quarkus/narayana/jta/runtime/interceptor/TransactionalInterceptorBase.java
@@ -9,6 +9,7 @@
import java.util.concurrent.CompletionException;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Flow;
import java.util.function.Function;
import jakarta.inject.Inject;
@@ -33,6 +34,8 @@
import io.smallrye.mutiny.Multi;
import io.smallrye.reactive.converters.ReactiveTypeConverter;
import io.smallrye.reactive.converters.Registry;
+import mutiny.zero.flow.adapters.AdaptersToFlow;
+import mutiny.zero.flow.adapters.AdaptersToReactiveStreams;
public abstract class TransactionalInterceptorBase implements Serializable {
@@ -138,8 +141,7 @@ protected Object invokeInOurTx(InvocationContext ic, TransactionManager tm, Runn
// handle asynchronously if not throwing
if (!throwing && ret != null) {
ReactiveTypeConverter
+
+ io.smallrye.reactive
+ mutiny-zero-flow-adapters
+
org.jboss.resteasy
resteasy-client
diff --git a/extensions/resteasy-classic/resteasy-mutiny-common/runtime/src/main/java/io/quarkus/resteasy/mutiny/common/runtime/MultiProvider.java b/extensions/resteasy-classic/resteasy-mutiny-common/runtime/src/main/java/io/quarkus/resteasy/mutiny/common/runtime/MultiProvider.java
index fa53954b5168e..a8fcdac529730 100644
--- a/extensions/resteasy-classic/resteasy-mutiny-common/runtime/src/main/java/io/quarkus/resteasy/mutiny/common/runtime/MultiProvider.java
+++ b/extensions/resteasy-classic/resteasy-mutiny-common/runtime/src/main/java/io/quarkus/resteasy/mutiny/common/runtime/MultiProvider.java
@@ -4,10 +4,11 @@
import org.reactivestreams.Publisher;
import io.smallrye.mutiny.Multi;
+import mutiny.zero.flow.adapters.AdaptersToReactiveStreams;
public class MultiProvider implements AsyncStreamProvider> {
@Override
public Publisher> toAsyncStream(Multi> multi) {
- return multi;
+ return AdaptersToReactiveStreams.publisher(multi);
}
-}
\ No newline at end of file
+}
diff --git a/extensions/resteasy-classic/resteasy-mutiny-common/runtime/src/test/java/io/quarkus/resteasy/mutiny/common/test/MultiProviderTest.java b/extensions/resteasy-classic/resteasy-mutiny-common/runtime/src/test/java/io/quarkus/resteasy/mutiny/common/test/MultiProviderTest.java
index 1144b7e535044..9ff98a595da42 100644
--- a/extensions/resteasy-classic/resteasy-mutiny-common/runtime/src/test/java/io/quarkus/resteasy/mutiny/common/test/MultiProviderTest.java
+++ b/extensions/resteasy-classic/resteasy-mutiny-common/runtime/src/test/java/io/quarkus/resteasy/mutiny/common/test/MultiProviderTest.java
@@ -8,6 +8,7 @@
import io.quarkus.resteasy.mutiny.common.runtime.MultiProvider;
import io.smallrye.mutiny.Multi;
+import mutiny.zero.flow.adapters.AdaptersToFlow;
public class MultiProviderTest {
@@ -17,7 +18,8 @@ public class MultiProviderTest {
public void test() {
Multi> multi = Multi.createFrom().items(1, 2, 3);
Publisher> publisher = provider.toAsyncStream(multi);
- List> list = Multi.createFrom().publisher(publisher).collect().asList().await().indefinitely();
+ List> list = Multi.createFrom().publisher(AdaptersToFlow.publisher(publisher)).collect().asList().await()
+ .indefinitely();
Assertions.assertEquals(1, list.get(0));
Assertions.assertEquals(2, list.get(1));
Assertions.assertEquals(3, list.get(2));
diff --git a/extensions/resteasy-reactive/quarkus-resteasy-reactive/deployment/src/test/java/io/quarkus/resteasy/reactive/server/test/stream/StreamResource.java b/extensions/resteasy-reactive/quarkus-resteasy-reactive/deployment/src/test/java/io/quarkus/resteasy/reactive/server/test/stream/StreamResource.java
index c42dea196633a..a6f8ea1df87a7 100644
--- a/extensions/resteasy-reactive/quarkus-resteasy-reactive/deployment/src/test/java/io/quarkus/resteasy/reactive/server/test/stream/StreamResource.java
+++ b/extensions/resteasy-reactive/quarkus-resteasy-reactive/deployment/src/test/java/io/quarkus/resteasy/reactive/server/test/stream/StreamResource.java
@@ -1,6 +1,7 @@
package io.quarkus.resteasy.reactive.server.test.stream;
import java.util.Date;
+import java.util.concurrent.Flow;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
@@ -20,6 +21,7 @@
import io.smallrye.mutiny.Uni;
import io.smallrye.mutiny.infrastructure.Infrastructure;
import io.vertx.core.buffer.Buffer;
+import mutiny.zero.flow.adapters.AdaptersToReactiveStreams;
@Path("stream")
public class StreamResource {
@@ -43,10 +45,17 @@ public Multi getStreamedText() {
@Path("text/stream/publisher")
@GET
@Produces(MediaType.TEXT_PLAIN)
- public Publisher getStreamedTextPublisher() {
+ public Flow.Publisher getStreamedTextPublisher() {
return Multi.createFrom().items("foo", "bar");
}
+ @Path("text/stream/legacy-publisher")
+ @GET
+ @Produces(MediaType.TEXT_PLAIN)
+ public Publisher getStreamedTextLegacyPublisher() {
+ return AdaptersToReactiveStreams.publisher(Multi.createFrom().items("foo", "bar"));
+ }
+
@Path("byte-arrays/collect")
@GET
@Produces(MediaType.TEXT_PLAIN)
@@ -78,7 +87,7 @@ public Multi getStreamedCharArrays() {
@Path("char-arrays/stream/publisher")
@GET
@Produces(MediaType.TEXT_PLAIN)
- public Publisher getStreamedCharArraysPublisher() {
+ public Flow.Publisher getStreamedCharArraysPublisher() {
return Multi.createFrom().items("foo".toCharArray(), "bar".toCharArray());
}
diff --git a/extensions/resteasy-reactive/quarkus-resteasy-reactive/deployment/src/test/java/io/quarkus/resteasy/reactive/server/test/stream/StreamTestCase.java b/extensions/resteasy-reactive/quarkus-resteasy-reactive/deployment/src/test/java/io/quarkus/resteasy/reactive/server/test/stream/StreamTestCase.java
index 75e9830a024f2..7a2135bd216cd 100644
--- a/extensions/resteasy-reactive/quarkus-resteasy-reactive/deployment/src/test/java/io/quarkus/resteasy/reactive/server/test/stream/StreamTestCase.java
+++ b/extensions/resteasy-reactive/quarkus-resteasy-reactive/deployment/src/test/java/io/quarkus/resteasy/reactive/server/test/stream/StreamTestCase.java
@@ -105,6 +105,10 @@ public void testStreaming() throws Exception {
.then()
.statusCode(200)
.body(Matchers.equalTo("foobar"));
+ RestAssured.get("/stream/text/stream/legacy-publisher")
+ .then()
+ .statusCode(200)
+ .body(Matchers.equalTo("foobar"));
RestAssured.get("/stream/text/collect")
.then()
.statusCode(200)
diff --git a/extensions/smallrye-reactive-messaging-amqp/deployment/src/test/java/io/quarkus/smallrye/reactivemessaging/amqp/ProducingBean.java b/extensions/smallrye-reactive-messaging-amqp/deployment/src/test/java/io/quarkus/smallrye/reactivemessaging/amqp/ProducingBean.java
index 93771a283c998..d708936335aa2 100644
--- a/extensions/smallrye-reactive-messaging-amqp/deployment/src/test/java/io/quarkus/smallrye/reactivemessaging/amqp/ProducingBean.java
+++ b/extensions/smallrye-reactive-messaging-amqp/deployment/src/test/java/io/quarkus/smallrye/reactivemessaging/amqp/ProducingBean.java
@@ -1,11 +1,11 @@
package io.quarkus.smallrye.reactivemessaging.amqp;
import java.time.Duration;
+import java.util.concurrent.Flow;
import jakarta.enterprise.context.ApplicationScoped;
import org.eclipse.microprofile.reactive.messaging.Outgoing;
-import org.reactivestreams.Publisher;
import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;
@@ -14,7 +14,7 @@
public class ProducingBean {
@Outgoing("source")
- public Publisher generate() {
+ public Flow.Publisher generate() {
return Multi.createFrom().range(1, 11)
.map(Integer::longValue)
.map(i -> i * 2)
diff --git a/extensions/smallrye-reactive-messaging-amqp/deployment/src/test/java/io/quarkus/smallrye/reactivemessaging/amqp/devmode/nohttp/Producer.java b/extensions/smallrye-reactive-messaging-amqp/deployment/src/test/java/io/quarkus/smallrye/reactivemessaging/amqp/devmode/nohttp/Producer.java
index f7ae0181c2a3a..812ed0ac96845 100644
--- a/extensions/smallrye-reactive-messaging-amqp/deployment/src/test/java/io/quarkus/smallrye/reactivemessaging/amqp/devmode/nohttp/Producer.java
+++ b/extensions/smallrye-reactive-messaging-amqp/deployment/src/test/java/io/quarkus/smallrye/reactivemessaging/amqp/devmode/nohttp/Producer.java
@@ -1,18 +1,18 @@
package io.quarkus.smallrye.reactivemessaging.amqp.devmode.nohttp;
import java.time.Duration;
+import java.util.concurrent.Flow;
import jakarta.enterprise.context.ApplicationScoped;
import org.eclipse.microprofile.reactive.messaging.Outgoing;
-import org.reactivestreams.Publisher;
import io.smallrye.mutiny.Multi;
@ApplicationScoped
public class Producer {
@Outgoing("source")
- public Publisher generate() {
+ public Flow.Publisher generate() {
return Multi.createFrom().ticks().every(Duration.ofMillis(200))
.onOverflow().drop()
.map(i -> i * 1);
diff --git a/extensions/smallrye-reactive-messaging/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/deployment/QuarkusMediatorConfigurationUtil.java b/extensions/smallrye-reactive-messaging/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/deployment/QuarkusMediatorConfigurationUtil.java
index 59b0c766fae65..858b60cfd4195 100644
--- a/extensions/smallrye-reactive-messaging/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/deployment/QuarkusMediatorConfigurationUtil.java
+++ b/extensions/smallrye-reactive-messaging/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/deployment/QuarkusMediatorConfigurationUtil.java
@@ -114,11 +114,8 @@ public static QuarkusMediatorConfiguration create(MethodInfo methodInfo, boolean
configuration.setProduction(validationOutput.getProduction());
configuration.setConsumption(validationOutput.getConsumption());
configuration.setIngestedPayloadType(validationOutput.getIngestedPayloadType());
- if (validationOutput.getUseBuilderTypes()) {
- configuration.setUseBuilderTypes(validationOutput.getUseBuilderTypes());
- } else {
- configuration.setUseBuilderTypes(false);
- }
+ configuration.setUseBuilderTypes(validationOutput.getUseBuilderTypes());
+ configuration.setUseReactiveStreams(validationOutput.getUseReactiveStreams());
if (acknowledgment == null) {
acknowledgment = mediatorConfigurationSupport.processDefaultAcknowledgement(shape,
diff --git a/extensions/smallrye-reactive-messaging/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/deployment/ReactiveMessagingDotNames.java b/extensions/smallrye-reactive-messaging/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/deployment/ReactiveMessagingDotNames.java
index b43bddc727e68..b9db175cfb784 100644
--- a/extensions/smallrye-reactive-messaging/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/deployment/ReactiveMessagingDotNames.java
+++ b/extensions/smallrye-reactive-messaging/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/deployment/ReactiveMessagingDotNames.java
@@ -55,9 +55,8 @@ public final class ReactiveMessagingDotNames {
static final DotName EMITTER_FACTORY_FOR = DotName.createSimple(EmitterFactoryFor.class.getName());
static final DotName INCOMING_CONNECTOR_FACTORY = DotName.createSimple(IncomingConnectorFactory.class.getName());
- static final DotName OUTGOING_CONNECTOR_FACTORY = DotName.createSimple(OutgoingConnectorFactory.class.getName());
-
static final DotName INBOUND_CONNECTOR = DotName.createSimple(InboundConnector.class.getName());
+ static final DotName OUTGOING_CONNECTOR_FACTORY = DotName.createSimple(OutgoingConnectorFactory.class.getName());
static final DotName OUTBOUND_CONNECTOR = DotName.createSimple(OutboundConnector.class.getName());
static final DotName SMALLRYE_BLOCKING = DotName.createSimple(io.smallrye.common.annotation.Blocking.class.getName());
diff --git a/extensions/smallrye-reactive-messaging/deployment/src/test/java/io/quarkus/smallrye/reactivemessaging/SmallRyeBlockingSubscriberTest.java b/extensions/smallrye-reactive-messaging/deployment/src/test/java/io/quarkus/smallrye/reactivemessaging/SmallRyeBlockingSubscriberTest.java
index 4d89322315f35..68af7c38ffd81 100644
--- a/extensions/smallrye-reactive-messaging/deployment/src/test/java/io/quarkus/smallrye/reactivemessaging/SmallRyeBlockingSubscriberTest.java
+++ b/extensions/smallrye-reactive-messaging/deployment/src/test/java/io/quarkus/smallrye/reactivemessaging/SmallRyeBlockingSubscriberTest.java
@@ -5,6 +5,7 @@
import java.io.File;
import java.util.List;
+import java.util.concurrent.Flow;
import java.util.stream.Collectors;
import jakarta.enterprise.context.ApplicationScoped;
@@ -13,7 +14,6 @@
import org.eclipse.microprofile.reactive.messaging.Outgoing;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
-import org.reactivestreams.Publisher;
import io.quarkus.smallrye.reactivemessaging.blocking.beans.IncomingUsingSmallRyeBlocking;
import io.quarkus.test.QuarkusUnitTest;
@@ -47,7 +47,7 @@ public void testIncomingUsingRunOnWorkerThread() {
@ApplicationScoped
public static class ProduceIn {
@Outgoing("in")
- public Publisher produce() {
+ public Flow.Publisher produce() {
return Multi.createFrom().items("a", "b", "c", "d", "e", "f");
}
}
diff --git a/extensions/smallrye-reactive-messaging/deployment/src/test/java/io/quarkus/smallrye/reactivemessaging/TransactionalSubscriberTest.java b/extensions/smallrye-reactive-messaging/deployment/src/test/java/io/quarkus/smallrye/reactivemessaging/TransactionalSubscriberTest.java
index 04b6134a478a7..81e3a7639d388 100644
--- a/extensions/smallrye-reactive-messaging/deployment/src/test/java/io/quarkus/smallrye/reactivemessaging/TransactionalSubscriberTest.java
+++ b/extensions/smallrye-reactive-messaging/deployment/src/test/java/io/quarkus/smallrye/reactivemessaging/TransactionalSubscriberTest.java
@@ -5,6 +5,7 @@
import java.io.File;
import java.util.List;
+import java.util.concurrent.Flow;
import java.util.stream.Collectors;
import jakarta.enterprise.context.ApplicationScoped;
@@ -13,7 +14,6 @@
import org.eclipse.microprofile.reactive.messaging.Outgoing;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
-import org.reactivestreams.Publisher;
import io.quarkus.smallrye.reactivemessaging.blocking.beans.IncomingUsingTransactional;
import io.quarkus.test.QuarkusUnitTest;
@@ -47,7 +47,7 @@ public void testIncomingUsingRunOnWorkerThread() {
@ApplicationScoped
public static class ProduceIn {
@Outgoing("in")
- public Publisher produce() {
+ public Flow.Publisher produce() {
return Multi.createFrom().items("a", "b", "c", "d", "e", "f");
}
}
diff --git a/extensions/smallrye-reactive-messaging/deployment/src/test/java/io/quarkus/smallrye/reactivemessaging/blocking/BlockingSubscriberTest.java b/extensions/smallrye-reactive-messaging/deployment/src/test/java/io/quarkus/smallrye/reactivemessaging/blocking/BlockingSubscriberTest.java
index 76a4eb3fd2eb6..9db7283577cea 100644
--- a/extensions/smallrye-reactive-messaging/deployment/src/test/java/io/quarkus/smallrye/reactivemessaging/blocking/BlockingSubscriberTest.java
+++ b/extensions/smallrye-reactive-messaging/deployment/src/test/java/io/quarkus/smallrye/reactivemessaging/blocking/BlockingSubscriberTest.java
@@ -5,6 +5,7 @@
import java.io.File;
import java.util.List;
+import java.util.concurrent.Flow;
import java.util.stream.Collectors;
import jakarta.enterprise.context.ApplicationScoped;
@@ -13,7 +14,6 @@
import org.eclipse.microprofile.reactive.messaging.Outgoing;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
-import org.reactivestreams.Publisher;
import io.quarkus.smallrye.reactivemessaging.blocking.beans.IncomingCustomTwoBlockingBean;
import io.quarkus.smallrye.reactivemessaging.blocking.beans.IncomingCustomUnorderedBlockingBean;
@@ -72,7 +72,7 @@ public void testIncomingBlockingCustomPoolTwo() {
public static class ProduceIn {
@Outgoing("in")
@Broadcast(2)
- public Publisher produce() {
+ public Flow.Publisher produce() {
return Multi.createFrom().items("a", "b", "c", "d", "e", "f");
}
}
diff --git a/extensions/smallrye-reactive-messaging/deployment/src/test/java/io/quarkus/smallrye/reactivemessaging/blocking/SmallRyeBlockingSubscriberTest.java b/extensions/smallrye-reactive-messaging/deployment/src/test/java/io/quarkus/smallrye/reactivemessaging/blocking/SmallRyeBlockingSubscriberTest.java
index f2792b6cca643..dddaaf8838292 100644
--- a/extensions/smallrye-reactive-messaging/deployment/src/test/java/io/quarkus/smallrye/reactivemessaging/blocking/SmallRyeBlockingSubscriberTest.java
+++ b/extensions/smallrye-reactive-messaging/deployment/src/test/java/io/quarkus/smallrye/reactivemessaging/blocking/SmallRyeBlockingSubscriberTest.java
@@ -5,6 +5,7 @@
import java.io.File;
import java.util.List;
+import java.util.concurrent.Flow;
import java.util.stream.Collectors;
import jakarta.enterprise.context.ApplicationScoped;
@@ -13,7 +14,6 @@
import org.eclipse.microprofile.reactive.messaging.Outgoing;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
-import org.reactivestreams.Publisher;
import io.quarkus.smallrye.reactivemessaging.blocking.beans.IncomingUsingSmallRyeBlocking;
import io.quarkus.test.QuarkusUnitTest;
@@ -47,7 +47,7 @@ public void testIncomingUsingRunOnWorkerThread() {
@ApplicationScoped
public static class ProduceIn {
@Outgoing("in")
- public Publisher produce() {
+ public Flow.Publisher produce() {
return Multi.createFrom().items("a", "b", "c", "d", "e", "f");
}
}
diff --git a/extensions/smallrye-reactive-messaging/deployment/src/test/java/io/quarkus/smallrye/reactivemessaging/channels/DeprecatedChannelConsumer.java b/extensions/smallrye-reactive-messaging/deployment/src/test/java/io/quarkus/smallrye/reactivemessaging/channels/DeprecatedChannelConsumer.java
index 9ec142ab201a9..084ddf6a2678c 100644
--- a/extensions/smallrye-reactive-messaging/deployment/src/test/java/io/quarkus/smallrye/reactivemessaging/channels/DeprecatedChannelConsumer.java
+++ b/extensions/smallrye-reactive-messaging/deployment/src/test/java/io/quarkus/smallrye/reactivemessaging/channels/DeprecatedChannelConsumer.java
@@ -13,6 +13,7 @@
import io.smallrye.mutiny.Multi;
import io.smallrye.reactive.messaging.annotations.Channel;
+import mutiny.zero.flow.adapters.AdaptersToFlow;
@ApplicationScoped
public class DeprecatedChannelConsumer {
@@ -22,7 +23,7 @@ public class DeprecatedChannelConsumer {
Publisher> sourceStream;
public List consume() {
- return Multi.createFrom().publisher(sourceStream)
+ return Multi.createFrom().publisher(AdaptersToFlow.publisher(sourceStream))
.onItem().transform(Message::getPayload)
.collect().asList()
.await().indefinitely();
diff --git a/extensions/smallrye-reactive-messaging/deployment/src/test/java/io/quarkus/smallrye/reactivemessaging/signatures/IncomingsTest.java b/extensions/smallrye-reactive-messaging/deployment/src/test/java/io/quarkus/smallrye/reactivemessaging/signatures/IncomingsTest.java
index 4102b9e5124a6..8800f0cc06904 100644
--- a/extensions/smallrye-reactive-messaging/deployment/src/test/java/io/quarkus/smallrye/reactivemessaging/signatures/IncomingsTest.java
+++ b/extensions/smallrye-reactive-messaging/deployment/src/test/java/io/quarkus/smallrye/reactivemessaging/signatures/IncomingsTest.java
@@ -5,6 +5,7 @@
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.Flow;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
@@ -13,7 +14,6 @@
import org.eclipse.microprofile.reactive.messaging.Outgoing;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
-import org.reactivestreams.Publisher;
import io.quarkus.test.QuarkusUnitTest;
import io.smallrye.mutiny.Multi;
@@ -39,7 +39,7 @@ public void testIncomingsWithTwoSources() {
public static class ProducerOnA {
@Outgoing("a")
- public Publisher produce() {
+ public Flow.Publisher produce() {
return Multi.createFrom().items("a", "b", "c");
}
@@ -49,7 +49,7 @@ public Publisher produce() {
public static class ProducerOnB {
@Outgoing("b")
- public Publisher produce() {
+ public Flow.Publisher produce() {
return Multi.createFrom().items("d", "e", "f");
}
diff --git a/extensions/smallrye-reactive-messaging/deployment/src/test/java/io/quarkus/smallrye/reactivemessaging/wiring/ConnectorAttachmentChannelTest.java b/extensions/smallrye-reactive-messaging/deployment/src/test/java/io/quarkus/smallrye/reactivemessaging/wiring/ConnectorAttachmentChannelTest.java
index 647fea7bb98f7..3b9650e229ec0 100644
--- a/extensions/smallrye-reactive-messaging/deployment/src/test/java/io/quarkus/smallrye/reactivemessaging/wiring/ConnectorAttachmentChannelTest.java
+++ b/extensions/smallrye-reactive-messaging/deployment/src/test/java/io/quarkus/smallrye/reactivemessaging/wiring/ConnectorAttachmentChannelTest.java
@@ -23,6 +23,7 @@
import io.quarkus.test.QuarkusUnitTest;
import io.smallrye.mutiny.Multi;
+import mutiny.zero.flow.adapters.AdaptersToReactiveStreams;
public class ConnectorAttachmentChannelTest {
@@ -45,7 +46,8 @@ static class MyDummyConnector implements IncomingConnectorFactory {
@Override
public PublisherBuilder extends Message>> getPublisherBuilder(Config config) {
- return ReactiveStreams.fromPublisher(Multi.createFrom().range(0, 5).map(Message::of));
+ return ReactiveStreams
+ .fromPublisher(AdaptersToReactiveStreams.publisher(Multi.createFrom().range(0, 5).map(Message::of)));
}
}
diff --git a/extensions/smallrye-reactive-messaging/deployment/src/test/java/io/quarkus/smallrye/reactivemessaging/wiring/ConnectorAttachmentCustomEmitterTest.java b/extensions/smallrye-reactive-messaging/deployment/src/test/java/io/quarkus/smallrye/reactivemessaging/wiring/ConnectorAttachmentCustomEmitterTest.java
index 37ef0f8a9fc51..e8dfb8b826dc6 100644
--- a/extensions/smallrye-reactive-messaging/deployment/src/test/java/io/quarkus/smallrye/reactivemessaging/wiring/ConnectorAttachmentCustomEmitterTest.java
+++ b/extensions/smallrye-reactive-messaging/deployment/src/test/java/io/quarkus/smallrye/reactivemessaging/wiring/ConnectorAttachmentCustomEmitterTest.java
@@ -5,6 +5,7 @@
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.Flow;
import jakarta.annotation.PostConstruct;
import jakarta.enterprise.context.ApplicationScoped;
@@ -24,7 +25,6 @@
import org.jboss.shrinkwrap.api.spec.JavaArchive;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
-import org.reactivestreams.Publisher;
import io.quarkus.test.QuarkusUnitTest;
import io.smallrye.mutiny.Multi;
@@ -89,7 +89,7 @@ interface CustomEmitter extends EmitterType {
static class CustomEmitterImpl implements MessagePublisherProvider, CustomEmitter {
@Override
- public Publisher> getPublisher() {
+ public Flow.Publisher> getPublisher() {
return Multi.createFrom().range(0, 5).map(Message::of).map(m -> (Message) m);
}
}
diff --git a/extensions/smallrye-reactive-messaging/deployment/src/test/java/io/quarkus/smallrye/reactivemessaging/wiring/ConnectorAttachmentIncomingTest.java b/extensions/smallrye-reactive-messaging/deployment/src/test/java/io/quarkus/smallrye/reactivemessaging/wiring/ConnectorAttachmentIncomingTest.java
index 9c0842b500c35..4968931d27d56 100644
--- a/extensions/smallrye-reactive-messaging/deployment/src/test/java/io/quarkus/smallrye/reactivemessaging/wiring/ConnectorAttachmentIncomingTest.java
+++ b/extensions/smallrye-reactive-messaging/deployment/src/test/java/io/quarkus/smallrye/reactivemessaging/wiring/ConnectorAttachmentIncomingTest.java
@@ -22,6 +22,7 @@
import io.quarkus.test.QuarkusUnitTest;
import io.smallrye.mutiny.Multi;
+import mutiny.zero.flow.adapters.AdaptersToReactiveStreams;
public class ConnectorAttachmentIncomingTest {
@@ -44,7 +45,8 @@ static class MyDummyConnector implements IncomingConnectorFactory {
@Override
public PublisherBuilder extends Message>> getPublisherBuilder(Config config) {
- return ReactiveStreams.fromPublisher(Multi.createFrom().range(0, 5).map(Message::of));
+ return ReactiveStreams
+ .fromPublisher(AdaptersToReactiveStreams.publisher(Multi.createFrom().range(0, 5).map(Message::of)));
}
}
diff --git a/extensions/smallrye-reactive-messaging/deployment/src/test/java/io/quarkus/smallrye/reactivemessaging/wiring/DisabledConnectorAttachmentIncomingTest.java b/extensions/smallrye-reactive-messaging/deployment/src/test/java/io/quarkus/smallrye/reactivemessaging/wiring/DisabledConnectorAttachmentIncomingTest.java
index 82a04a471ff40..dd059d54a748d 100644
--- a/extensions/smallrye-reactive-messaging/deployment/src/test/java/io/quarkus/smallrye/reactivemessaging/wiring/DisabledConnectorAttachmentIncomingTest.java
+++ b/extensions/smallrye-reactive-messaging/deployment/src/test/java/io/quarkus/smallrye/reactivemessaging/wiring/DisabledConnectorAttachmentIncomingTest.java
@@ -23,6 +23,7 @@
import io.quarkus.test.QuarkusUnitTest;
import io.smallrye.mutiny.Multi;
+import mutiny.zero.flow.adapters.AdaptersToReactiveStreams;
public class DisabledConnectorAttachmentIncomingTest {
@@ -48,7 +49,8 @@ static class MyDummyConnector implements IncomingConnectorFactory {
@Override
public PublisherBuilder extends Message>> getPublisherBuilder(Config config) {
- return ReactiveStreams.fromPublisher(Multi.createFrom().range(0, 5).map(Message::of));
+ return ReactiveStreams
+ .fromPublisher(AdaptersToReactiveStreams.publisher(Multi.createFrom().range(0, 5).map(Message::of)));
}
}
diff --git a/extensions/smallrye-reactive-messaging/deployment/src/test/java/io/quarkus/smallrye/reactivemessaging/wiring/IncomingChannelWithDotsTest.java b/extensions/smallrye-reactive-messaging/deployment/src/test/java/io/quarkus/smallrye/reactivemessaging/wiring/IncomingChannelWithDotsTest.java
index 2217eebb3161e..9ffb14588289b 100644
--- a/extensions/smallrye-reactive-messaging/deployment/src/test/java/io/quarkus/smallrye/reactivemessaging/wiring/IncomingChannelWithDotsTest.java
+++ b/extensions/smallrye-reactive-messaging/deployment/src/test/java/io/quarkus/smallrye/reactivemessaging/wiring/IncomingChannelWithDotsTest.java
@@ -22,6 +22,7 @@
import io.quarkus.test.QuarkusUnitTest;
import io.smallrye.mutiny.Multi;
+import mutiny.zero.flow.adapters.AdaptersToReactiveStreams;
public class IncomingChannelWithDotsTest {
@@ -44,7 +45,8 @@ static class MyDummyConnector implements IncomingConnectorFactory {
@Override
public PublisherBuilder extends Message>> getPublisherBuilder(Config config) {
- return ReactiveStreams.fromPublisher(Multi.createFrom().range(0, 5).map(Message::of));
+ return ReactiveStreams
+ .fromPublisher(AdaptersToReactiveStreams.publisher(Multi.createFrom().range(0, 5).map(Message::of)));
}
}
diff --git a/extensions/smallrye-reactive-messaging/deployment/src/test/java/io/quarkus/smallrye/reactivemessaging/wiring/TwoConnectorsAttachmentIncomingTest.java b/extensions/smallrye-reactive-messaging/deployment/src/test/java/io/quarkus/smallrye/reactivemessaging/wiring/TwoConnectorsAttachmentIncomingTest.java
index 68f708c54b04f..5ffe5ef6888cf 100644
--- a/extensions/smallrye-reactive-messaging/deployment/src/test/java/io/quarkus/smallrye/reactivemessaging/wiring/TwoConnectorsAttachmentIncomingTest.java
+++ b/extensions/smallrye-reactive-messaging/deployment/src/test/java/io/quarkus/smallrye/reactivemessaging/wiring/TwoConnectorsAttachmentIncomingTest.java
@@ -23,6 +23,7 @@
import io.quarkus.test.QuarkusUnitTest;
import io.smallrye.mutiny.Multi;
+import mutiny.zero.flow.adapters.AdaptersToReactiveStreams;
public class TwoConnectorsAttachmentIncomingTest {
@@ -47,7 +48,8 @@ static class MyDummyConnector implements IncomingConnectorFactory {
@Override
public PublisherBuilder extends Message>> getPublisherBuilder(Config config) {
- return ReactiveStreams.fromPublisher(Multi.createFrom().range(0, 5).map(Message::of));
+ return ReactiveStreams
+ .fromPublisher(AdaptersToReactiveStreams.publisher(Multi.createFrom().range(0, 5).map(Message::of)));
}
}
@@ -57,7 +59,8 @@ static class MySecondDummyConnector implements IncomingConnectorFactory {
@Override
public PublisherBuilder extends Message>> getPublisherBuilder(Config config) {
- return ReactiveStreams.fromPublisher(Multi.createFrom().range(0, 5).map(Message::of));
+ return ReactiveStreams
+ .fromPublisher(AdaptersToReactiveStreams.publisher(Multi.createFrom().range(0, 5).map(Message::of)));
}
}
diff --git a/extensions/smallrye-reactive-messaging/runtime/src/main/java/io/quarkus/smallrye/reactivemessaging/runtime/DuplicatedContextConnectorFactoryInterceptor.java b/extensions/smallrye-reactive-messaging/runtime/src/main/java/io/quarkus/smallrye/reactivemessaging/runtime/DuplicatedContextConnectorFactoryInterceptor.java
index 3bb3703f4af2e..7ac562c0b1f4a 100644
--- a/extensions/smallrye-reactive-messaging/runtime/src/main/java/io/quarkus/smallrye/reactivemessaging/runtime/DuplicatedContextConnectorFactoryInterceptor.java
+++ b/extensions/smallrye-reactive-messaging/runtime/src/main/java/io/quarkus/smallrye/reactivemessaging/runtime/DuplicatedContextConnectorFactoryInterceptor.java
@@ -1,6 +1,7 @@
package io.quarkus.smallrye.reactivemessaging.runtime;
import java.util.Optional;
+import java.util.concurrent.Flow;
import jakarta.annotation.Priority;
import jakarta.interceptor.AroundInvoke;
@@ -9,7 +10,6 @@
import org.eclipse.microprofile.reactive.messaging.Message;
import org.eclipse.microprofile.reactive.streams.operators.PublisherBuilder;
-import org.reactivestreams.Publisher;
import io.quarkus.vertx.core.runtime.context.VertxContextSafetyToggle;
import io.smallrye.common.vertx.VertxContext;
@@ -29,7 +29,7 @@ public Object intercept(InvocationContext ctx) throws Exception {
return result.map(DuplicatedContextConnectorFactoryInterceptor::setMessageContextSafe);
}
if (ctx.getMethod().getName().equals("getPublisher")) {
- Publisher> result = (Publisher>) ctx.proceed();
+ Flow.Publisher> result = (Flow.Publisher>) ctx.proceed();
return Multi.createFrom().publisher(result)
.map(DuplicatedContextConnectorFactoryInterceptor::setMessageContextSafe);
}
diff --git a/extensions/smallrye-reactive-messaging/runtime/src/main/java/io/quarkus/smallrye/reactivemessaging/runtime/QuarkusMediatorConfiguration.java b/extensions/smallrye-reactive-messaging/runtime/src/main/java/io/quarkus/smallrye/reactivemessaging/runtime/QuarkusMediatorConfiguration.java
index 48c3309a2dc1d..054e1111d8960 100644
--- a/extensions/smallrye-reactive-messaging/runtime/src/main/java/io/quarkus/smallrye/reactivemessaging/runtime/QuarkusMediatorConfiguration.java
+++ b/extensions/smallrye-reactive-messaging/runtime/src/main/java/io/quarkus/smallrye/reactivemessaging/runtime/QuarkusMediatorConfiguration.java
@@ -53,6 +53,8 @@ public class QuarkusMediatorConfiguration implements MediatorConfiguration {
private Type ingestedPayload;
+ private boolean useReactiveStreams = false;
+
public String getBeanId() {
return beanId;
}
@@ -260,4 +262,17 @@ public void setBlockingExecutionOrdered(boolean blockingExecutionOrdered) {
public void setWorkerPoolName(String workerPoolName) {
this.workerPoolName = workerPoolName;
}
+
+ public boolean isUseReactiveStreams() {
+ return useReactiveStreams;
+ }
+
+ public void setUseReactiveStreams(boolean usesReactiveStreams) {
+ this.useReactiveStreams = usesReactiveStreams;
+ }
+
+ @Override
+ public boolean usesReactiveStreams() {
+ return useReactiveStreams;
+ }
}
diff --git a/extensions/smallrye-reactive-messaging/runtime/src/main/java/io/quarkus/smallrye/reactivemessaging/runtime/devmode/DevModeSupportConnectorFactoryInterceptor.java b/extensions/smallrye-reactive-messaging/runtime/src/main/java/io/quarkus/smallrye/reactivemessaging/runtime/devmode/DevModeSupportConnectorFactoryInterceptor.java
index e006416e9a4b8..ffeaae2e883b6 100644
--- a/extensions/smallrye-reactive-messaging/runtime/src/main/java/io/quarkus/smallrye/reactivemessaging/runtime/devmode/DevModeSupportConnectorFactoryInterceptor.java
+++ b/extensions/smallrye-reactive-messaging/runtime/src/main/java/io/quarkus/smallrye/reactivemessaging/runtime/devmode/DevModeSupportConnectorFactoryInterceptor.java
@@ -1,6 +1,7 @@
package io.quarkus.smallrye.reactivemessaging.runtime.devmode;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Flow;
import java.util.function.Supplier;
import jakarta.annotation.Priority;
@@ -12,7 +13,6 @@
import org.eclipse.microprofile.reactive.streams.operators.PublisherBuilder;
import org.eclipse.microprofile.reactive.streams.operators.ReactiveStreams;
import org.eclipse.microprofile.reactive.streams.operators.SubscriberBuilder;
-import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
@@ -50,7 +50,7 @@ public Object intercept(InvocationContext ctx) throws Exception {
});
}
if (ctx.getMethod().getName().equals("getPublisher")) {
- Publisher> result = (Publisher>) ctx.proceed();
+ Flow.Publisher> result = (Flow.Publisher>) ctx.proceed();
return Multi.createFrom().publisher(result)
.onItem().transformToUniAndConcatenate(msg -> Uni.createFrom().emitter(e -> {
onMessage.get().whenComplete((restarted, error) -> {
@@ -94,12 +94,12 @@ public void onComplete() {
});
}
if (ctx.getMethod().getName().equals("getSubscriber")) {
- Subscriber> result = (Subscriber>) ctx.proceed();
- return new Subscriber>() {
- private Subscriber> subscriber;
+ Flow.Subscriber> result = (Flow.Subscriber>) ctx.proceed();
+ return new Flow.Subscriber>() {
+ private Flow.Subscriber> subscriber;
@Override
- public void onSubscribe(Subscription s) {
+ public void onSubscribe(Flow.Subscription s) {
subscriber = result;
subscriber.onSubscribe(s);
}
diff --git a/independent-projects/arc/pom.xml b/independent-projects/arc/pom.xml
index d1f29e3e352e8..04488053f8391 100644
--- a/independent-projects/arc/pom.xml
+++ b/independent-projects/arc/pom.xml
@@ -52,7 +52,7 @@
2.1.1
1.5.0.Final
3.0.0
- 1.8.0
+ 2.0.0
3.0.0-M7
1.6.8
diff --git a/independent-projects/qute/pom.xml b/independent-projects/qute/pom.xml
index 57a363e3ebfdc..9042dc167f627 100644
--- a/independent-projects/qute/pom.xml
+++ b/independent-projects/qute/pom.xml
@@ -47,7 +47,7 @@
3.5.0.Final
3.0.0-M7
1.6.8
- 1.8.0
+ 2.0.0
diff --git a/independent-projects/resteasy-reactive/client/runtime/src/main/java/org/jboss/resteasy/reactive/client/impl/multipart/MultiByteHttpData.java b/independent-projects/resteasy-reactive/client/runtime/src/main/java/org/jboss/resteasy/reactive/client/impl/multipart/MultiByteHttpData.java
index 3223884280ba7..f85b6c136b5d4 100644
--- a/independent-projects/resteasy-reactive/client/runtime/src/main/java/org/jboss/resteasy/reactive/client/impl/multipart/MultiByteHttpData.java
+++ b/independent-projects/resteasy-reactive/client/runtime/src/main/java/org/jboss/resteasy/reactive/client/impl/multipart/MultiByteHttpData.java
@@ -20,10 +20,10 @@
import java.io.InputStream;
import java.nio.charset.Charset;
import java.util.concurrent.Executor;
+import java.util.concurrent.Flow.Subscription;
import java.util.function.Consumer;
import org.jboss.logging.Logger;
-import org.reactivestreams.Subscription;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
diff --git a/independent-projects/resteasy-reactive/common/processor/src/main/java/org/jboss/resteasy/reactive/common/processor/ResteasyReactiveDotNames.java b/independent-projects/resteasy-reactive/common/processor/src/main/java/org/jboss/resteasy/reactive/common/processor/ResteasyReactiveDotNames.java
index 0e0273aef9d87..d1d87e4b2f17a 100644
--- a/independent-projects/resteasy-reactive/common/processor/src/main/java/org/jboss/resteasy/reactive/common/processor/ResteasyReactiveDotNames.java
+++ b/independent-projects/resteasy-reactive/common/processor/src/main/java/org/jboss/resteasy/reactive/common/processor/ResteasyReactiveDotNames.java
@@ -21,6 +21,7 @@
import java.util.SortedSet;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
+import java.util.concurrent.Flow.Publisher;
import jakarta.annotation.Priority;
import jakarta.enterprise.context.ApplicationScoped;
@@ -93,7 +94,6 @@
import org.jboss.resteasy.reactive.RestSseElementType;
import org.jboss.resteasy.reactive.RestStreamElementType;
import org.jboss.resteasy.reactive.Separator;
-import org.reactivestreams.Publisher;
import io.smallrye.common.annotation.Blocking;
import io.smallrye.common.annotation.NonBlocking;
@@ -205,6 +205,7 @@ public final class ResteasyReactiveDotNames {
public static final DotName COMPLETION_STAGE = DotName.createSimple(CompletionStage.class.getName());
public static final DotName COMPLETABLE_FUTURE = DotName.createSimple(CompletableFuture.class.getName());
public static final DotName PUBLISHER = DotName.createSimple(Publisher.class.getName());
+ public static final DotName LEGACY_PUBLISHER = DotName.createSimple(org.reactivestreams.Publisher.class.getName());
public static final DotName REST_RESPONSE = DotName.createSimple(RestResponse.class.getName());
public static final DotName INTEGER = DotName.createSimple(Integer.class.getName());
diff --git a/independent-projects/resteasy-reactive/common/runtime/pom.xml b/independent-projects/resteasy-reactive/common/runtime/pom.xml
index dcb2c969b0ca3..c3dca5b8bd030 100644
--- a/independent-projects/resteasy-reactive/common/runtime/pom.xml
+++ b/independent-projects/resteasy-reactive/common/runtime/pom.xml
@@ -22,6 +22,14 @@
io.smallrye.reactive
mutiny
+
+ org.reactivestreams
+ reactive-streams
+
+
+ io.smallrye.reactive
+ mutiny-zero-flow-adapters
+
io.smallrye.common
smallrye-common-annotation
diff --git a/independent-projects/resteasy-reactive/pom.xml b/independent-projects/resteasy-reactive/pom.xml
index 438718e893a68..73ad7ed9d5a56 100644
--- a/independent-projects/resteasy-reactive/pom.xml
+++ b/independent-projects/resteasy-reactive/pom.xml
@@ -59,7 +59,7 @@
3.0.0-M7
1.6.8
- 1.8.0
+ 2.0.0
1.13.2
4.3.5
4.5.1
@@ -70,7 +70,9 @@
3.0.2
3.0.0
4.2.0
- 2.25.0
+ 3.0.0
+ 1.0.4
+ 1.0.0
3.1
@@ -217,11 +219,22 @@
${vertx.version}
+
+ org.reactivestreams
+ reactive-streams
+ ${reactive-streams.version}
+
+
io.smallrye.reactive
mutiny
${mutiny.version}
+
+ io.smallrye.reactive
+ mutiny-zero-flow-adapters
+ ${mutiny-zero.version}
+
io.smallrye.reactive
smallrye-mutiny-vertx-core
diff --git a/independent-projects/resteasy-reactive/server/processor/src/main/java/org/jboss/resteasy/reactive/server/processor/scanning/AsyncReturnTypeScanner.java b/independent-projects/resteasy-reactive/server/processor/src/main/java/org/jboss/resteasy/reactive/server/processor/scanning/AsyncReturnTypeScanner.java
index d04a8d4133af4..5cf0e9ce56ad4 100644
--- a/independent-projects/resteasy-reactive/server/processor/src/main/java/org/jboss/resteasy/reactive/server/processor/scanning/AsyncReturnTypeScanner.java
+++ b/independent-projects/resteasy-reactive/server/processor/src/main/java/org/jboss/resteasy/reactive/server/processor/scanning/AsyncReturnTypeScanner.java
@@ -2,6 +2,7 @@
import static org.jboss.resteasy.reactive.common.processor.ResteasyReactiveDotNames.COMPLETABLE_FUTURE;
import static org.jboss.resteasy.reactive.common.processor.ResteasyReactiveDotNames.COMPLETION_STAGE;
+import static org.jboss.resteasy.reactive.common.processor.ResteasyReactiveDotNames.LEGACY_PUBLISHER;
import static org.jboss.resteasy.reactive.common.processor.ResteasyReactiveDotNames.MULTI;
import static org.jboss.resteasy.reactive.common.processor.ResteasyReactiveDotNames.PUBLISHER;
import static org.jboss.resteasy.reactive.common.processor.ResteasyReactiveDotNames.UNI;
@@ -32,7 +33,7 @@ public List scan(MethodInfo method, ClassInfo actualEndp
return Collections.singletonList(new FixedHandlerChainCustomizer(new UniResponseHandler(),
HandlerChainCustomizer.Phase.AFTER_METHOD_INVOKE));
}
- if (returnTypeName.equals(MULTI) || returnTypeName.equals(PUBLISHER)) {
+ if (returnTypeName.equals(MULTI) || returnTypeName.equals(PUBLISHER) || returnTypeName.equals(LEGACY_PUBLISHER)) {
return Collections.singletonList(new FixedHandlerChainCustomizer(new PublisherResponseHandler(),
HandlerChainCustomizer.Phase.AFTER_METHOD_INVOKE));
}
@@ -43,6 +44,7 @@ public List scan(MethodInfo method, ClassInfo actualEndp
public boolean isMethodSignatureAsync(MethodInfo method) {
DotName returnTypeName = method.returnType().name();
return returnTypeName.equals(COMPLETION_STAGE) || returnTypeName.equals(COMPLETABLE_FUTURE) ||
- returnTypeName.equals(UNI) || returnTypeName.equals(MULTI) || returnTypeName.equals(PUBLISHER);
+ returnTypeName.equals(UNI) || returnTypeName.equals(MULTI) || returnTypeName.equals(PUBLISHER) ||
+ returnTypeName.equals(LEGACY_PUBLISHER);
}
}
diff --git a/independent-projects/resteasy-reactive/server/runtime/src/main/java/org/jboss/resteasy/reactive/server/handlers/PublisherResponseHandler.java b/independent-projects/resteasy-reactive/server/runtime/src/main/java/org/jboss/resteasy/reactive/server/handlers/PublisherResponseHandler.java
index c06a031fe7713..d10f6fcfdaecd 100644
--- a/independent-projects/resteasy-reactive/server/runtime/src/main/java/org/jboss/resteasy/reactive/server/handlers/PublisherResponseHandler.java
+++ b/independent-projects/resteasy-reactive/server/runtime/src/main/java/org/jboss/resteasy/reactive/server/handlers/PublisherResponseHandler.java
@@ -6,6 +6,9 @@
import java.util.Collections;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.Flow.Publisher;
+import java.util.concurrent.Flow.Subscriber;
+import java.util.concurrent.Flow.Subscription;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import java.util.function.Consumer;
@@ -23,9 +26,8 @@
import org.jboss.resteasy.reactive.server.model.HandlerChainCustomizer.Phase;
import org.jboss.resteasy.reactive.server.spi.ServerRestHandler;
import org.jboss.resteasy.reactive.server.spi.StreamingResponse;
-import org.reactivestreams.Publisher;
-import org.reactivestreams.Subscriber;
-import org.reactivestreams.Subscription;
+
+import mutiny.zero.flow.adapters.AdaptersToFlow;
/**
* This handler is used to push streams of data to the client.
@@ -246,8 +248,12 @@ public void handle(ResteasyReactiveRequestContext requestContext)
@Override
public void handle(ResteasyReactiveRequestContext requestContext) throws Exception {
// FIXME: handle Response with entity being a Multi
- if (requestContext.getResult() instanceof Publisher) {
- Publisher> result = (Publisher>) requestContext.getResult();
+ Object requestContextResult = requestContext.getResult();
+ if (requestContextResult instanceof org.reactivestreams.Publisher) {
+ requestContextResult = AdaptersToFlow.publisher((org.reactivestreams.Publisher>) requestContextResult);
+ }
+ if (requestContextResult instanceof Publisher) {
+ Publisher> result = (Publisher>) requestContextResult;
// FIXME: if we make a pretend Response and go through the normal route, we will get
// media type negotiation and fixed entity writer set up, perhaps it's better than
// cancelling the normal route?
diff --git a/independent-projects/resteasy-reactive/server/vertx/src/test/java/org/jboss/resteasy/reactive/server/vertx/test/stream/StreamResource.java b/independent-projects/resteasy-reactive/server/vertx/src/test/java/org/jboss/resteasy/reactive/server/vertx/test/stream/StreamResource.java
index 17d71f5f4bc5f..41aebefcb6d1b 100644
--- a/independent-projects/resteasy-reactive/server/vertx/src/test/java/org/jboss/resteasy/reactive/server/vertx/test/stream/StreamResource.java
+++ b/independent-projects/resteasy-reactive/server/vertx/src/test/java/org/jboss/resteasy/reactive/server/vertx/test/stream/StreamResource.java
@@ -1,6 +1,7 @@
package org.jboss.resteasy.reactive.server.vertx.test.stream;
import java.util.Date;
+import java.util.concurrent.Flow.Publisher;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
@@ -11,12 +12,12 @@
import jakarta.ws.rs.core.MediaType;
import org.jboss.resteasy.reactive.common.util.MultiCollectors;
-import org.reactivestreams.Publisher;
import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;
import io.smallrye.mutiny.infrastructure.Infrastructure;
import io.vertx.core.buffer.Buffer;
+import mutiny.zero.flow.adapters.AdaptersToReactiveStreams;
@Path("stream")
public class StreamResource {
@@ -44,6 +45,13 @@ public Publisher getStreamedTextPublisher() {
return Multi.createFrom().items("foo", "bar");
}
+ @Path("text/stream/legacy-publisher")
+ @GET
+ @Produces(MediaType.TEXT_PLAIN)
+ public org.reactivestreams.Publisher getStreamedTextLegacyPublisher() {
+ return AdaptersToReactiveStreams.publisher(Multi.createFrom().items("foo", "bar"));
+ }
+
@Path("byte-arrays/collect")
@GET
@Produces(MediaType.TEXT_PLAIN)
diff --git a/independent-projects/resteasy-reactive/server/vertx/src/test/java/org/jboss/resteasy/reactive/server/vertx/test/stream/StreamTestCase.java b/independent-projects/resteasy-reactive/server/vertx/src/test/java/org/jboss/resteasy/reactive/server/vertx/test/stream/StreamTestCase.java
index 10ff84498cda4..d97c50659ec71 100644
--- a/independent-projects/resteasy-reactive/server/vertx/src/test/java/org/jboss/resteasy/reactive/server/vertx/test/stream/StreamTestCase.java
+++ b/independent-projects/resteasy-reactive/server/vertx/src/test/java/org/jboss/resteasy/reactive/server/vertx/test/stream/StreamTestCase.java
@@ -104,6 +104,10 @@ public void testStreaming() throws Exception {
.then()
.statusCode(200)
.body(Matchers.equalTo("foobar"));
+ RestAssured.get("/stream/text/stream/legacy-publisher")
+ .then()
+ .statusCode(200)
+ .body(Matchers.equalTo("foobar"));
RestAssured.get("/stream/text/collect")
.then()
.statusCode(200)
diff --git a/integration-tests/devmode/src/test/java/io/quarkus/test/devconsole/MyProcessor.java b/integration-tests/devmode/src/test/java/io/quarkus/test/devconsole/MyProcessor.java
index affecb9fdf6c6..d9a63239dff43 100644
--- a/integration-tests/devmode/src/test/java/io/quarkus/test/devconsole/MyProcessor.java
+++ b/integration-tests/devmode/src/test/java/io/quarkus/test/devconsole/MyProcessor.java
@@ -1,5 +1,7 @@
package io.quarkus.test.devconsole;
+import java.util.concurrent.Flow;
+
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.event.Observes;
import jakarta.enterprise.inject.Instance;
@@ -8,9 +10,6 @@
import org.eclipse.microprofile.reactive.messaging.Channel;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Outgoing;
-import org.reactivestreams.Publisher;
-import org.reactivestreams.Subscriber;
-import org.reactivestreams.Subscription;
import io.smallrye.reactive.messaging.annotations.Broadcast;
import io.vertx.core.http.HttpServerResponse;
@@ -21,14 +20,14 @@ public class MyProcessor {
@Inject
@Channel("processed")
- Instance> channel;
+ Instance> channel;
public void init(@Observes Router router) {
router.get().handler(rc -> {
HttpServerResponse response = rc.response();
- channel.get().subscribe(new Subscriber() {
+ channel.get().subscribe(new Flow.Subscriber() {
@Override
- public void onSubscribe(Subscription subscription) {
+ public void onSubscribe(Flow.Subscription subscription) {
response.putHeader("Transfer-encoding", "chunked");
subscription.request(2);
}
diff --git a/integration-tests/main/src/main/java/io/quarkus/it/reactive/ReactiveStreamOpsResource.java b/integration-tests/main/src/main/java/io/quarkus/it/reactive/ReactiveStreamOpsResource.java
index 828d18c69d3ab..a78519c8d7ca6 100644
--- a/integration-tests/main/src/main/java/io/quarkus/it/reactive/ReactiveStreamOpsResource.java
+++ b/integration-tests/main/src/main/java/io/quarkus/it/reactive/ReactiveStreamOpsResource.java
@@ -6,6 +6,7 @@
import org.eclipse.microprofile.reactive.streams.operators.ReactiveStreams;
import io.smallrye.mutiny.Multi;
+import mutiny.zero.flow.adapters.AdaptersToReactiveStreams;
@Path("/reactive")
public class ReactiveStreamOpsResource {
@@ -30,7 +31,7 @@ public String stream1() {
@Path("/stream-mutiny")
public String stream2() {
StringBuilder builder = new StringBuilder();
- ReactiveStreams.fromPublisher(Multi.createFrom().items("d", "e", "f"))
+ ReactiveStreams.fromPublisher(AdaptersToReactiveStreams.publisher(Multi.createFrom().items("d", "e", "f")))
.map(String::toUpperCase)
.forEach(builder::append)
.run();
diff --git a/integration-tests/mongodb-panache-kotlin/src/main/kotlin/io/quarkus/it/mongodb/panache/reactive/book/ReactiveBookEntityResource.kt b/integration-tests/mongodb-panache-kotlin/src/main/kotlin/io/quarkus/it/mongodb/panache/reactive/book/ReactiveBookEntityResource.kt
index 8237bed5f257c..9984585251220 100644
--- a/integration-tests/mongodb-panache-kotlin/src/main/kotlin/io/quarkus/it/mongodb/panache/reactive/book/ReactiveBookEntityResource.kt
+++ b/integration-tests/mongodb-panache-kotlin/src/main/kotlin/io/quarkus/it/mongodb/panache/reactive/book/ReactiveBookEntityResource.kt
@@ -19,9 +19,9 @@ import jakarta.ws.rs.core.Response
import org.bson.types.ObjectId
import org.jboss.logging.Logger
import org.jboss.resteasy.reactive.RestStreamElementType
-import org.reactivestreams.Publisher
import java.net.URI
import java.time.LocalDate.parse
+import java.util.concurrent.Flow
@Path("/reactive/books/entity")
class ReactiveBookEntityResource {
@@ -43,7 +43,7 @@ class ReactiveBookEntityResource {
@Path("/stream")
@Produces(MediaType.SERVER_SENT_EVENTS)
@RestStreamElementType(MediaType.APPLICATION_JSON)
- fun streamBooks(@QueryParam("sort") sort: String?): Publisher {
+ fun streamBooks(@QueryParam("sort") sort: String?): Flow.Publisher {
return if (sort != null) {
ReactiveBookEntity.streamAll(Sort.ascending(sort))
} else ReactiveBookEntity.streamAll()
diff --git a/integration-tests/mongodb-panache-kotlin/src/main/kotlin/io/quarkus/it/mongodb/panache/reactive/book/ReactiveBookRepositoryResource.kt b/integration-tests/mongodb-panache-kotlin/src/main/kotlin/io/quarkus/it/mongodb/panache/reactive/book/ReactiveBookRepositoryResource.kt
index 3ca8977a3dcc0..bdd3f2baa8c0c 100644
--- a/integration-tests/mongodb-panache-kotlin/src/main/kotlin/io/quarkus/it/mongodb/panache/reactive/book/ReactiveBookRepositoryResource.kt
+++ b/integration-tests/mongodb-panache-kotlin/src/main/kotlin/io/quarkus/it/mongodb/panache/reactive/book/ReactiveBookRepositoryResource.kt
@@ -21,9 +21,9 @@ import jakarta.ws.rs.core.Response
import org.bson.types.ObjectId
import org.jboss.logging.Logger
import org.jboss.resteasy.reactive.RestStreamElementType
-import org.reactivestreams.Publisher
import java.net.URI
import java.time.LocalDate
+import java.util.concurrent.Flow
@Path("/reactive/books/repository")
class ReactiveBookRepositoryResource {
@@ -48,7 +48,7 @@ class ReactiveBookRepositoryResource {
@Path("/stream")
@Produces(MediaType.SERVER_SENT_EVENTS)
@RestStreamElementType(MediaType.APPLICATION_JSON)
- fun streamBooks(@QueryParam("sort") sort: String?): Publisher {
+ fun streamBooks(@QueryParam("sort") sort: String?): Flow.Publisher {
return if (sort != null) {
reactiveBookRepository.streamAll(Sort.ascending(sort))
} else reactiveBookRepository.streamAll()
diff --git a/integration-tests/mongodb-panache/src/main/java/io/quarkus/it/mongodb/panache/reactive/book/ReactiveBookEntityResource.java b/integration-tests/mongodb-panache/src/main/java/io/quarkus/it/mongodb/panache/reactive/book/ReactiveBookEntityResource.java
index 38d640841f6b9..eddd1391f6189 100644
--- a/integration-tests/mongodb-panache/src/main/java/io/quarkus/it/mongodb/panache/reactive/book/ReactiveBookEntityResource.java
+++ b/integration-tests/mongodb-panache/src/main/java/io/quarkus/it/mongodb/panache/reactive/book/ReactiveBookEntityResource.java
@@ -3,6 +3,7 @@
import java.net.URI;
import java.time.LocalDate;
import java.util.List;
+import java.util.concurrent.Flow;
import jakarta.annotation.PostConstruct;
import jakarta.ws.rs.*;
@@ -12,7 +13,6 @@
import org.bson.types.ObjectId;
import org.jboss.logging.Logger;
import org.jboss.resteasy.reactive.RestStreamElementType;
-import org.reactivestreams.Publisher;
import io.quarkus.panache.common.Parameters;
import io.quarkus.panache.common.Sort;
@@ -41,7 +41,7 @@ public Uni> getBooks(@QueryParam("sort") String sort) {
@Path("/stream")
@Produces(MediaType.SERVER_SENT_EVENTS)
@RestStreamElementType(MediaType.APPLICATION_JSON)
- public Publisher streamBooks(@QueryParam("sort") String sort) {
+ public Flow.Publisher streamBooks(@QueryParam("sort") String sort) {
if (sort != null) {
return ReactiveBookEntity.streamAll(Sort.ascending(sort));
}
diff --git a/integration-tests/mongodb-panache/src/main/java/io/quarkus/it/mongodb/panache/reactive/book/ReactiveBookRepositoryResource.java b/integration-tests/mongodb-panache/src/main/java/io/quarkus/it/mongodb/panache/reactive/book/ReactiveBookRepositoryResource.java
index b0c97b81524b7..96e21828edc16 100644
--- a/integration-tests/mongodb-panache/src/main/java/io/quarkus/it/mongodb/panache/reactive/book/ReactiveBookRepositoryResource.java
+++ b/integration-tests/mongodb-panache/src/main/java/io/quarkus/it/mongodb/panache/reactive/book/ReactiveBookRepositoryResource.java
@@ -3,6 +3,7 @@
import java.net.URI;
import java.time.LocalDate;
import java.util.List;
+import java.util.concurrent.Flow;
import jakarta.annotation.PostConstruct;
import jakarta.inject.Inject;
@@ -13,7 +14,6 @@
import org.bson.types.ObjectId;
import org.jboss.logging.Logger;
import org.jboss.resteasy.reactive.RestStreamElementType;
-import org.reactivestreams.Publisher;
import io.quarkus.it.mongodb.panache.book.Book;
import io.quarkus.panache.common.Parameters;
@@ -45,7 +45,7 @@ public Uni> getBooks(@QueryParam("sort") String sort) {
@Path("/stream")
@Produces(MediaType.SERVER_SENT_EVENTS)
@RestStreamElementType(MediaType.APPLICATION_JSON)
- public Publisher streamBooks(@QueryParam("sort") String sort) {
+ public Flow.Publisher streamBooks(@QueryParam("sort") String sort) {
if (sort != null) {
return reactiveBookRepository.streamAll(Sort.ascending(sort));
}
diff --git a/integration-tests/smallrye-context-propagation/src/test/java/io/quarkus/context/test/mutiny/MutinyContextEndpoint.java b/integration-tests/smallrye-context-propagation/src/test/java/io/quarkus/context/test/mutiny/MutinyContextEndpoint.java
index d3e442c56e6b3..26b82ed2877cb 100644
--- a/integration-tests/smallrye-context-propagation/src/test/java/io/quarkus/context/test/mutiny/MutinyContextEndpoint.java
+++ b/integration-tests/smallrye-context-propagation/src/test/java/io/quarkus/context/test/mutiny/MutinyContextEndpoint.java
@@ -4,6 +4,7 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.Flow;
import jakarta.annotation.PreDestroy;
import jakarta.inject.Inject;
@@ -23,7 +24,6 @@
import org.eclipse.microprofile.context.ManagedExecutor;
import org.eclipse.microprofile.context.ThreadContext;
import org.junit.jupiter.api.Assertions;
-import org.reactivestreams.Publisher;
import org.wildfly.common.Assert;
import io.quarkus.arc.Arc;
@@ -455,7 +455,7 @@ public Multi transactionPropagationWithMulti() throws SystemException {
@Transactional
@GET
@Path("/transaction-multi-2")
- public Publisher transactionPropagationWithMulti2() {
+ public Flow.Publisher transactionPropagationWithMulti2() {
Multi ret = Multi.createFrom().item("OK");
// now delete both entities
Assertions.assertEquals(2, Person.deleteAll());