diff --git a/bom/application/pom.xml b/bom/application/pom.xml index 19f5b6bfe58c0..b39ba6e61a514 100644 --- a/bom/application/pom.xml +++ b/bom/application/pom.xml @@ -59,15 +59,15 @@ 4.0.1 4.0.0 3.1.1 - 2.0.1 + 2.1.0.Beta1 3.0.0 6.1.0 4.0.0 2.0.0 1.0.13 - 2.7.0 - 2.30.1 - 4.1.1 + 3.0.0 + 3.2.0 + 4.1.1.RC1 1.4.1 2.1.0 2.1.1 @@ -144,9 +144,9 @@ 3.1.1 4.1.86.Final 1.8.0 - 1.0.3 + 1.0.4 3.5.0.Final - 1.9.0 + 2.1.0 3.3.2 1.8.0 1.1.8.4 @@ -214,6 +214,7 @@ 0.0.6 0.1.3 2.9.2 + 1.0.0 @@ -5439,6 +5440,11 @@ + + io.smallrye.reactive + mutiny-zero-flow-adapters + ${mutiny-zero.version} + org.eclipse.microprofile.reactive-streams-operators microprofile-reactive-streams-operators-api diff --git a/extensions/grpc/deployment/src/test/java/io/quarkus/grpc/server/GrpcReflectionTest.java b/extensions/grpc/deployment/src/test/java/io/quarkus/grpc/server/GrpcReflectionTest.java index 1e748b893d4ce..80d495f1e33ae 100644 --- a/extensions/grpc/deployment/src/test/java/io/quarkus/grpc/server/GrpcReflectionTest.java +++ b/extensions/grpc/deployment/src/test/java/io/quarkus/grpc/server/GrpcReflectionTest.java @@ -5,6 +5,7 @@ import java.util.Arrays; import java.util.List; +import java.util.concurrent.Flow; import org.jboss.shrinkwrap.api.ShrinkWrap; import org.jboss.shrinkwrap.api.spec.JavaArchive; @@ -12,8 +13,6 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.RegisterExtension; -import org.reactivestreams.Subscriber; -import org.reactivestreams.Subscription; import com.google.protobuf.ByteString; @@ -233,15 +232,15 @@ public void testRetrievingAllExtensionNumbersOfType() { assertThat(list).containsExactlyInAnyOrderElementsOf(expected); } - private static class ResettableSubscriber implements Subscriber { + private static class ResettableSubscriber implements Flow.Subscriber { - private Subscription subscription; + private Flow.Subscription subscription; private volatile T last; private boolean completed; private Throwable failure; @Override - public void onSubscribe(Subscription subscription) { + public void onSubscribe(Flow.Subscription subscription) { this.subscription = subscription; } diff --git a/extensions/mailer/runtime/src/main/java/io/quarkus/mailer/Attachment.java b/extensions/mailer/runtime/src/main/java/io/quarkus/mailer/Attachment.java index bc3e31c329600..7e11540c1eb99 100644 --- a/extensions/mailer/runtime/src/main/java/io/quarkus/mailer/Attachment.java +++ b/extensions/mailer/runtime/src/main/java/io/quarkus/mailer/Attachment.java @@ -3,8 +3,7 @@ import java.io.File; import java.util.Iterator; import java.util.NoSuchElementException; - -import org.reactivestreams.Publisher; +import java.util.concurrent.Flow.Publisher; import io.smallrye.mutiny.Multi; diff --git a/extensions/mailer/runtime/src/main/java/io/quarkus/mailer/Mail.java b/extensions/mailer/runtime/src/main/java/io/quarkus/mailer/Mail.java index 07e094bcf5b6f..5ee50038adc2e 100644 --- a/extensions/mailer/runtime/src/main/java/io/quarkus/mailer/Mail.java +++ b/extensions/mailer/runtime/src/main/java/io/quarkus/mailer/Mail.java @@ -2,8 +2,7 @@ import java.io.File; import java.util.*; - -import org.reactivestreams.Publisher; +import java.util.concurrent.Flow.Publisher; /** * Represents an e-mail. diff --git a/extensions/mailer/runtime/src/main/java/io/quarkus/mailer/runtime/MutinyMailerImpl.java b/extensions/mailer/runtime/src/main/java/io/quarkus/mailer/runtime/MutinyMailerImpl.java index e69674e7adcf5..302656b6e58a6 100644 --- a/extensions/mailer/runtime/src/main/java/io/quarkus/mailer/runtime/MutinyMailerImpl.java +++ b/extensions/mailer/runtime/src/main/java/io/quarkus/mailer/runtime/MutinyMailerImpl.java @@ -6,6 +6,7 @@ import java.util.List; import java.util.Map; import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.Flow.Publisher; import java.util.function.Function; import java.util.stream.Collectors; @@ -13,7 +14,6 @@ import jakarta.inject.Inject; import org.jboss.logging.Logger; -import org.reactivestreams.Publisher; import io.quarkus.mailer.Attachment; import io.quarkus.mailer.Mail; diff --git a/extensions/mailer/runtime/src/test/java/io/quarkus/mailer/AttachmentTest.java b/extensions/mailer/runtime/src/test/java/io/quarkus/mailer/AttachmentTest.java index cc73a1ec7bad1..b1bd405c634e2 100644 --- a/extensions/mailer/runtime/src/test/java/io/quarkus/mailer/AttachmentTest.java +++ b/extensions/mailer/runtime/src/test/java/io/quarkus/mailer/AttachmentTest.java @@ -7,12 +7,12 @@ import java.util.Iterator; import java.util.NoSuchElementException; import java.util.UUID; +import java.util.concurrent.Flow; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; -import org.reactivestreams.Publisher; import io.quarkus.mailer.runtime.MutinyMailerImpl; import io.smallrye.mutiny.Multi; @@ -70,7 +70,7 @@ void testInlineAttachmentCreationFromFile() { @Test void testAttachmentCreationFromStream() { - Publisher publisher = vertx.fileSystem().open(LOREM.getAbsolutePath(), new OpenOptions().setRead(true)) + Flow.Publisher publisher = vertx.fileSystem().open(LOREM.getAbsolutePath(), new OpenOptions().setRead(true)) .onItem().transformToMulti(af -> af.toMulti() .onItem().transformToIterable(this::getBytes)); @@ -109,7 +109,7 @@ public Byte next() { @Test void testInlineAttachmentCreationFromStream() { - Publisher publisher = vertx.fileSystem().open(LOREM.getAbsolutePath(), new OpenOptions().setRead(true)) + Flow.Publisher publisher = vertx.fileSystem().open(LOREM.getAbsolutePath(), new OpenOptions().setRead(true)) .onItem().transformToMulti(af -> af.toMulti() .onItem().transformToIterable(this::getBytes)); @@ -128,7 +128,7 @@ void testInlineAttachmentCreationFromStream() { @Test void testAttachmentCreationWithDescription() { - Publisher publisher = vertx.fileSystem().open(LOREM.getAbsolutePath(), new OpenOptions().setRead(true)) + Flow.Publisher publisher = vertx.fileSystem().open(LOREM.getAbsolutePath(), new OpenOptions().setRead(true)) .onItem().transformToMulti(af -> af.toMulti() .onItem().transformToIterable(this::getBytes)); @@ -154,7 +154,7 @@ private String getContent(Attachment attachment) { @Test void testInlineAttachmentCreationWithDescription() { - Publisher publisher = vertx.fileSystem().open(LOREM.getAbsolutePath(), new OpenOptions().setRead(true)) + Flow.Publisher publisher = vertx.fileSystem().open(LOREM.getAbsolutePath(), new OpenOptions().setRead(true)) .onItem().transformToMulti(af -> af.toMulti() .onItem().transformToIterable(this::getBytes)); diff --git a/extensions/mongodb-client/runtime/pom.xml b/extensions/mongodb-client/runtime/pom.xml index f02251708d3ee..187488cbdcbcf 100644 --- a/extensions/mongodb-client/runtime/pom.xml +++ b/extensions/mongodb-client/runtime/pom.xml @@ -24,6 +24,10 @@ io.quarkus quarkus-mutiny + + io.smallrye.reactive + mutiny-zero-flow-adapters + io.quarkus quarkus-vertx diff --git a/extensions/mongodb-client/runtime/src/main/java/io/quarkus/mongodb/impl/ReactiveMongoCollectionImpl.java b/extensions/mongodb-client/runtime/src/main/java/io/quarkus/mongodb/impl/ReactiveMongoCollectionImpl.java index 1196ff7fc668e..acac2af661e43 100644 --- a/extensions/mongodb-client/runtime/src/main/java/io/quarkus/mongodb/impl/ReactiveMongoCollectionImpl.java +++ b/extensions/mongodb-client/runtime/src/main/java/io/quarkus/mongodb/impl/ReactiveMongoCollectionImpl.java @@ -1,6 +1,7 @@ package io.quarkus.mongodb.impl; import java.util.List; +import java.util.concurrent.Flow; import org.bson.Document; import org.bson.codecs.configuration.CodecRegistry; @@ -47,6 +48,7 @@ import io.quarkus.mongodb.reactive.ReactiveMongoCollection; import io.smallrye.mutiny.Multi; import io.smallrye.mutiny.Uni; +import mutiny.zero.flow.adapters.AdaptersToFlow; public class ReactiveMongoCollectionImpl implements ReactiveMongoCollection { @@ -262,11 +264,11 @@ public Multi aggregate(ClientSession clientSession, List return Wrappers.toMulti(collection.aggregate(clientSession, pipeline, clazz)); } - private AggregatePublisher apply(AggregateOptions options, AggregatePublisher publisher) { + private Flow.Publisher apply(AggregateOptions options, AggregatePublisher publisher) { if (options == null) { - return publisher; + return AdaptersToFlow.publisher(publisher); } - return options.apply(publisher); + return AdaptersToFlow.publisher(options.apply(publisher)); } @Override @@ -409,11 +411,11 @@ public Multi mapReduce(String mapFunction, String reduceFunction, MapReduceOp return Multi.createFrom().publisher(apply(options, collection.mapReduce(mapFunction, reduceFunction))); } - private MapReducePublisher apply(MapReduceOptions options, MapReducePublisher mapReduce) { + private Flow.Publisher apply(MapReduceOptions options, MapReducePublisher mapReduce) { if (options == null) { - return mapReduce; + return AdaptersToFlow.publisher(mapReduce); } - return options.apply(mapReduce); + return AdaptersToFlow.publisher(options.apply(mapReduce)); } @Override diff --git a/extensions/mongodb-client/runtime/src/main/java/io/quarkus/mongodb/impl/ReactiveMongoDatabaseImpl.java b/extensions/mongodb-client/runtime/src/main/java/io/quarkus/mongodb/impl/ReactiveMongoDatabaseImpl.java index 41249cdf9f7ba..b439c44f8cdd5 100644 --- a/extensions/mongodb-client/runtime/src/main/java/io/quarkus/mongodb/impl/ReactiveMongoDatabaseImpl.java +++ b/extensions/mongodb-client/runtime/src/main/java/io/quarkus/mongodb/impl/ReactiveMongoDatabaseImpl.java @@ -1,6 +1,7 @@ package io.quarkus.mongodb.impl; import java.util.List; +import java.util.concurrent.Flow; import org.bson.Document; import org.bson.conversions.Bson; @@ -22,6 +23,7 @@ import io.quarkus.mongodb.reactive.ReactiveMongoDatabase; import io.smallrye.mutiny.Multi; import io.smallrye.mutiny.Uni; +import mutiny.zero.flow.adapters.AdaptersToFlow; public class ReactiveMongoDatabaseImpl implements ReactiveMongoDatabase { @@ -127,12 +129,11 @@ public Multi listCollections(Class clazz, CollectionListOptions option return Multi.createFrom().publisher(apply(options, database.listCollections(clazz))); } - private ListCollectionsPublisher apply(CollectionListOptions options, - ListCollectionsPublisher collections) { + private Flow.Publisher apply(CollectionListOptions options, ListCollectionsPublisher collections) { if (options == null) { - return collections; + return AdaptersToFlow.publisher(collections); } else { - return options.apply(collections); + return AdaptersToFlow.publisher(options.apply(collections)); } } diff --git a/extensions/mongodb-client/runtime/src/main/java/io/quarkus/mongodb/impl/Wrappers.java b/extensions/mongodb-client/runtime/src/main/java/io/quarkus/mongodb/impl/Wrappers.java index 7cb38ba3d350b..f9474f1899689 100644 --- a/extensions/mongodb-client/runtime/src/main/java/io/quarkus/mongodb/impl/Wrappers.java +++ b/extensions/mongodb-client/runtime/src/main/java/io/quarkus/mongodb/impl/Wrappers.java @@ -8,6 +8,7 @@ import io.smallrye.mutiny.Uni; import io.vertx.core.Context; import io.vertx.core.Vertx; +import mutiny.zero.flow.adapters.AdaptersToFlow; class Wrappers { @@ -17,7 +18,7 @@ private Wrappers() { static Uni toUni(Publisher publisher) { Context context = Vertx.currentContext(); - Uni uni = Uni.createFrom().publisher(publisher); + Uni uni = Uni.createFrom().publisher(AdaptersToFlow.publisher(publisher)); if (context != null) { return uni.emitOn(command -> context.runOnContext(x -> command.run())); } @@ -27,15 +28,16 @@ static Uni toUni(Publisher publisher) { static Multi toMulti(Publisher publisher) { Context context = Vertx.currentContext(); if (context != null) { - return Multi.createFrom().publisher(publisher).emitOn(command -> context.runOnContext(x -> command.run())); + return Multi.createFrom().publisher(AdaptersToFlow.publisher(publisher)) + .emitOn(command -> context.runOnContext(x -> command.run())); } else { - return Multi.createFrom().publisher(publisher); + return Multi.createFrom().publisher(AdaptersToFlow.publisher(publisher)); } } static Uni> toUniOfList(Publisher publisher) { Context context = Vertx.currentContext(); - Uni> uni = Multi.createFrom().publisher(publisher) + Uni> uni = Multi.createFrom().publisher(AdaptersToFlow.publisher(publisher)) .collect().asList(); if (context != null) { diff --git a/extensions/narayana-jta/runtime/pom.xml b/extensions/narayana-jta/runtime/pom.xml index fde4e86a79887..5a9116588ff8d 100644 --- a/extensions/narayana-jta/runtime/pom.xml +++ b/extensions/narayana-jta/runtime/pom.xml @@ -51,6 +51,10 @@ io.smallrye.reactive smallrye-reactive-converter-mutiny + + io.smallrye.reactive + mutiny-zero-flow-adapters + org.graalvm.nativeimage svm diff --git a/extensions/narayana-jta/runtime/src/main/java/io/quarkus/narayana/jta/runtime/interceptor/TransactionalInterceptorBase.java b/extensions/narayana-jta/runtime/src/main/java/io/quarkus/narayana/jta/runtime/interceptor/TransactionalInterceptorBase.java index c281e78eae50b..d965b3b1da999 100644 --- a/extensions/narayana-jta/runtime/src/main/java/io/quarkus/narayana/jta/runtime/interceptor/TransactionalInterceptorBase.java +++ b/extensions/narayana-jta/runtime/src/main/java/io/quarkus/narayana/jta/runtime/interceptor/TransactionalInterceptorBase.java @@ -9,6 +9,7 @@ import java.util.concurrent.CompletionException; import java.util.concurrent.CompletionStage; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Flow; import java.util.function.Function; import jakarta.inject.Inject; @@ -33,6 +34,8 @@ import io.smallrye.mutiny.Multi; import io.smallrye.reactive.converters.ReactiveTypeConverter; import io.smallrye.reactive.converters.Registry; +import mutiny.zero.flow.adapters.AdaptersToFlow; +import mutiny.zero.flow.adapters.AdaptersToReactiveStreams; public abstract class TransactionalInterceptorBase implements Serializable { @@ -138,8 +141,7 @@ protected Object invokeInOurTx(InvocationContext ic, TransactionManager tm, Runn // handle asynchronously if not throwing if (!throwing && ret != null) { ReactiveTypeConverter converter = null; - if (ret instanceof CompletionStage == false - && (ret instanceof Publisher == false || ic.getMethod().getReturnType() != Publisher.class)) { + if (!isCompletionStage(ret) && !isSomePublisher(ic, ret)) { @SuppressWarnings({ "rawtypes", "unchecked" }) Optional> lookup = Registry.lookup((Class) ret.getClass()); if (lookup.isPresent()) { @@ -151,12 +153,15 @@ protected Object invokeInOurTx(InvocationContext ic, TransactionManager tm, Runn } } } - if (ret instanceof CompletionStage) { + if (isCompletionStage(ret)) { ret = handleAsync(tm, tx, ic, ret, afterEndTransaction); // convert back if (converter != null) ret = converter.fromCompletionStage((CompletionStage) ret); - } else if (ret instanceof Publisher) { + } else if (isFlowPublisher(ret)) { + // FIXME this needs to be tested + ret = handleAsync(tm, tx, ic, ret, afterEndTransaction); + } else if (isLegacyPublisher(ret)) { ret = handleAsync(tm, tx, ic, ret, afterEndTransaction); // convert back if (converter != null) @@ -173,6 +178,23 @@ protected Object invokeInOurTx(InvocationContext ic, TransactionManager tm, Runn return ret; } + private static boolean isLegacyPublisher(Object ret) { + return ret instanceof Publisher; + } + + private boolean isSomePublisher(InvocationContext ic, Object ret) { + return isLegacyPublisher(ret) || (ic.getMethod().getReturnType() == Publisher.class) + || isFlowPublisher(ret) || (ic.getMethod().getReturnType() == Flow.Publisher.class); + } + + private static boolean isFlowPublisher(Object ret) { + return ret instanceof Flow.Publisher; + } + + private boolean isCompletionStage(Object ret) { + return ret instanceof CompletionStage; + } + private int getTransactionTimeoutFromAnnotation(InvocationContext ic) { TransactionConfiguration configAnnotation = getTransactionConfiguration(ic); @@ -223,7 +245,7 @@ protected Object handleAsync(TransactionManager tm, Transaction tx, InvocationCo // Suspend the transaction to remove it from the main request thread tm.suspend(); afterEndTransaction.run(); - if (ret instanceof CompletionStage) { + if (isCompletionStage(ret)) { return ((CompletionStage) ret).handle((v, t) -> { try { doInTransaction(tm, tx, () -> { @@ -249,8 +271,15 @@ protected Object handleAsync(TransactionManager tm, Transaction tx, InvocationCo throw new CompletionException(t); return v; }); - } else if (ret instanceof Publisher) { - ret = Multi.createFrom().publisher((Publisher) ret) + } else if (isLegacyPublisher(ret) || isFlowPublisher(ret)) { + Flow.Publisher pub; + boolean isLegacyRS = !isFlowPublisher(ret); + if (isLegacyRS) { + pub = AdaptersToFlow.publisher((Publisher) ret); + } else { + pub = (Flow.Publisher) ret; + } + ret = Multi.createFrom().publisher(pub) .onFailure().invoke(t -> { try { doInTransaction(tm, tx, () -> handleExceptionNoThrow(ic, t, tx)); @@ -276,6 +305,9 @@ protected Object handleAsync(TransactionManager tm, Transaction tx, InvocationCo throw new RuntimeException(e); } }); + if (isLegacyRS) { + ret = AdaptersToReactiveStreams.publisher((Multi) ret); + } } return ret; } diff --git a/extensions/reactive-routes/runtime/src/main/java/io/quarkus/vertx/web/runtime/MultiJsonArraySupport.java b/extensions/reactive-routes/runtime/src/main/java/io/quarkus/vertx/web/runtime/MultiJsonArraySupport.java index 38417f89b7ad0..f7da1f6d3e4ae 100644 --- a/extensions/reactive-routes/runtime/src/main/java/io/quarkus/vertx/web/runtime/MultiJsonArraySupport.java +++ b/extensions/reactive-routes/runtime/src/main/java/io/quarkus/vertx/web/runtime/MultiJsonArraySupport.java @@ -1,10 +1,9 @@ package io.quarkus.vertx.web.runtime; +import java.util.concurrent.Flow; +import java.util.concurrent.Flow.Subscriber; import java.util.function.Function; -import org.reactivestreams.Subscriber; -import org.reactivestreams.Subscription; - import io.smallrye.mutiny.Multi; import io.vertx.core.AsyncResult; import io.vertx.core.Handler; @@ -37,10 +36,10 @@ public Buffer apply(String s) { private static void write(Multi multi, RoutingContext rc) { HttpServerResponse response = rc.response(); multi.subscribe().withSubscriber(new Subscriber() { - Subscription upstream; + Flow.Subscription upstream; @Override - public void onSubscribe(Subscription subscription) { + public void onSubscribe(Flow.Subscription subscription) { this.upstream = subscription; this.upstream.request(1); } @@ -78,7 +77,7 @@ public void onComplete() { }); } - private static void onWriteDone(Subscription subscription, AsyncResult ar, RoutingContext rc) { + private static void onWriteDone(Flow.Subscription subscription, AsyncResult ar, RoutingContext rc) { if (ar.failed()) { rc.fail(ar.cause()); } else { diff --git a/extensions/reactive-routes/runtime/src/main/java/io/quarkus/vertx/web/runtime/MultiNdjsonSupport.java b/extensions/reactive-routes/runtime/src/main/java/io/quarkus/vertx/web/runtime/MultiNdjsonSupport.java index 4d7da7b2fc5c8..907ea871ae112 100644 --- a/extensions/reactive-routes/runtime/src/main/java/io/quarkus/vertx/web/runtime/MultiNdjsonSupport.java +++ b/extensions/reactive-routes/runtime/src/main/java/io/quarkus/vertx/web/runtime/MultiNdjsonSupport.java @@ -1,7 +1,7 @@ package io.quarkus.vertx.web.runtime; -import org.reactivestreams.Subscriber; -import org.reactivestreams.Subscription; +import java.util.concurrent.Flow.Subscriber; +import java.util.concurrent.Flow.Subscription; import io.smallrye.mutiny.Multi; import io.vertx.core.AsyncResult; diff --git a/extensions/reactive-routes/runtime/src/main/java/io/quarkus/vertx/web/runtime/MultiSseSupport.java b/extensions/reactive-routes/runtime/src/main/java/io/quarkus/vertx/web/runtime/MultiSseSupport.java index 11962e9bdd75c..1c3c323628760 100644 --- a/extensions/reactive-routes/runtime/src/main/java/io/quarkus/vertx/web/runtime/MultiSseSupport.java +++ b/extensions/reactive-routes/runtime/src/main/java/io/quarkus/vertx/web/runtime/MultiSseSupport.java @@ -1,11 +1,10 @@ package io.quarkus.vertx.web.runtime; +import java.util.concurrent.Flow; +import java.util.concurrent.Flow.Subscription; import java.util.concurrent.atomic.AtomicLong; import java.util.function.Function; -import org.reactivestreams.Subscriber; -import org.reactivestreams.Subscription; - import io.quarkus.vertx.web.ReactiveRoutes; import io.smallrye.mutiny.Multi; import io.vertx.core.AsyncResult; @@ -52,7 +51,7 @@ private static void onWriteDone(Subscription subscription, AsyncResult ar, public static void write(Multi multi, RoutingContext rc) { HttpServerResponse response = rc.response(); - multi.subscribe().withSubscriber(new Subscriber() { + multi.subscribe().withSubscriber(new Flow.Subscriber() { Subscription upstream; @Override @@ -86,7 +85,7 @@ public void onComplete() { public static void subscribeBuffer(Multi multi, RoutingContext rc) { HttpServerResponse response = rc.response(); - multi.subscribe().withSubscriber(new Subscriber() { + multi.subscribe().withSubscriber(new Flow.Subscriber() { Subscription upstream; final AtomicLong count = new AtomicLong(); diff --git a/extensions/reactive-routes/runtime/src/main/java/io/quarkus/vertx/web/runtime/MultiSupport.java b/extensions/reactive-routes/runtime/src/main/java/io/quarkus/vertx/web/runtime/MultiSupport.java index 58634cd62469f..efde297927100 100644 --- a/extensions/reactive-routes/runtime/src/main/java/io/quarkus/vertx/web/runtime/MultiSupport.java +++ b/extensions/reactive-routes/runtime/src/main/java/io/quarkus/vertx/web/runtime/MultiSupport.java @@ -1,11 +1,10 @@ package io.quarkus.vertx.web.runtime; +import java.util.concurrent.Flow.Subscriber; +import java.util.concurrent.Flow.Subscription; import java.util.function.Consumer; import java.util.function.Function; -import org.reactivestreams.Subscriber; -import org.reactivestreams.Subscription; - import io.smallrye.mutiny.Multi; import io.vertx.core.AsyncResult; import io.vertx.core.Handler; diff --git a/extensions/resteasy-classic/resteasy-mutiny-common/runtime/pom.xml b/extensions/resteasy-classic/resteasy-mutiny-common/runtime/pom.xml index 0894a6056a0a7..1d4eb9f9ebe43 100644 --- a/extensions/resteasy-classic/resteasy-mutiny-common/runtime/pom.xml +++ b/extensions/resteasy-classic/resteasy-mutiny-common/runtime/pom.xml @@ -22,6 +22,10 @@ io.quarkus quarkus-mutiny + + io.smallrye.reactive + mutiny-zero-flow-adapters + org.jboss.resteasy resteasy-client diff --git a/extensions/resteasy-classic/resteasy-mutiny-common/runtime/src/main/java/io/quarkus/resteasy/mutiny/common/runtime/MultiProvider.java b/extensions/resteasy-classic/resteasy-mutiny-common/runtime/src/main/java/io/quarkus/resteasy/mutiny/common/runtime/MultiProvider.java index fa53954b5168e..a8fcdac529730 100644 --- a/extensions/resteasy-classic/resteasy-mutiny-common/runtime/src/main/java/io/quarkus/resteasy/mutiny/common/runtime/MultiProvider.java +++ b/extensions/resteasy-classic/resteasy-mutiny-common/runtime/src/main/java/io/quarkus/resteasy/mutiny/common/runtime/MultiProvider.java @@ -4,10 +4,11 @@ import org.reactivestreams.Publisher; import io.smallrye.mutiny.Multi; +import mutiny.zero.flow.adapters.AdaptersToReactiveStreams; public class MultiProvider implements AsyncStreamProvider> { @Override public Publisher toAsyncStream(Multi multi) { - return multi; + return AdaptersToReactiveStreams.publisher(multi); } -} \ No newline at end of file +} diff --git a/extensions/resteasy-classic/resteasy-mutiny-common/runtime/src/test/java/io/quarkus/resteasy/mutiny/common/test/MultiProviderTest.java b/extensions/resteasy-classic/resteasy-mutiny-common/runtime/src/test/java/io/quarkus/resteasy/mutiny/common/test/MultiProviderTest.java index 1144b7e535044..9ff98a595da42 100644 --- a/extensions/resteasy-classic/resteasy-mutiny-common/runtime/src/test/java/io/quarkus/resteasy/mutiny/common/test/MultiProviderTest.java +++ b/extensions/resteasy-classic/resteasy-mutiny-common/runtime/src/test/java/io/quarkus/resteasy/mutiny/common/test/MultiProviderTest.java @@ -8,6 +8,7 @@ import io.quarkus.resteasy.mutiny.common.runtime.MultiProvider; import io.smallrye.mutiny.Multi; +import mutiny.zero.flow.adapters.AdaptersToFlow; public class MultiProviderTest { @@ -17,7 +18,8 @@ public class MultiProviderTest { public void test() { Multi multi = Multi.createFrom().items(1, 2, 3); Publisher publisher = provider.toAsyncStream(multi); - List list = Multi.createFrom().publisher(publisher).collect().asList().await().indefinitely(); + List list = Multi.createFrom().publisher(AdaptersToFlow.publisher(publisher)).collect().asList().await() + .indefinitely(); Assertions.assertEquals(1, list.get(0)); Assertions.assertEquals(2, list.get(1)); Assertions.assertEquals(3, list.get(2)); diff --git a/extensions/resteasy-reactive/quarkus-resteasy-reactive/deployment/src/test/java/io/quarkus/resteasy/reactive/server/test/stream/StreamResource.java b/extensions/resteasy-reactive/quarkus-resteasy-reactive/deployment/src/test/java/io/quarkus/resteasy/reactive/server/test/stream/StreamResource.java index c42dea196633a..a6f8ea1df87a7 100644 --- a/extensions/resteasy-reactive/quarkus-resteasy-reactive/deployment/src/test/java/io/quarkus/resteasy/reactive/server/test/stream/StreamResource.java +++ b/extensions/resteasy-reactive/quarkus-resteasy-reactive/deployment/src/test/java/io/quarkus/resteasy/reactive/server/test/stream/StreamResource.java @@ -1,6 +1,7 @@ package io.quarkus.resteasy.reactive.server.test.stream; import java.util.Date; +import java.util.concurrent.Flow; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; @@ -20,6 +21,7 @@ import io.smallrye.mutiny.Uni; import io.smallrye.mutiny.infrastructure.Infrastructure; import io.vertx.core.buffer.Buffer; +import mutiny.zero.flow.adapters.AdaptersToReactiveStreams; @Path("stream") public class StreamResource { @@ -43,10 +45,17 @@ public Multi getStreamedText() { @Path("text/stream/publisher") @GET @Produces(MediaType.TEXT_PLAIN) - public Publisher getStreamedTextPublisher() { + public Flow.Publisher getStreamedTextPublisher() { return Multi.createFrom().items("foo", "bar"); } + @Path("text/stream/legacy-publisher") + @GET + @Produces(MediaType.TEXT_PLAIN) + public Publisher getStreamedTextLegacyPublisher() { + return AdaptersToReactiveStreams.publisher(Multi.createFrom().items("foo", "bar")); + } + @Path("byte-arrays/collect") @GET @Produces(MediaType.TEXT_PLAIN) @@ -78,7 +87,7 @@ public Multi getStreamedCharArrays() { @Path("char-arrays/stream/publisher") @GET @Produces(MediaType.TEXT_PLAIN) - public Publisher getStreamedCharArraysPublisher() { + public Flow.Publisher getStreamedCharArraysPublisher() { return Multi.createFrom().items("foo".toCharArray(), "bar".toCharArray()); } diff --git a/extensions/resteasy-reactive/quarkus-resteasy-reactive/deployment/src/test/java/io/quarkus/resteasy/reactive/server/test/stream/StreamTestCase.java b/extensions/resteasy-reactive/quarkus-resteasy-reactive/deployment/src/test/java/io/quarkus/resteasy/reactive/server/test/stream/StreamTestCase.java index 75e9830a024f2..7a2135bd216cd 100644 --- a/extensions/resteasy-reactive/quarkus-resteasy-reactive/deployment/src/test/java/io/quarkus/resteasy/reactive/server/test/stream/StreamTestCase.java +++ b/extensions/resteasy-reactive/quarkus-resteasy-reactive/deployment/src/test/java/io/quarkus/resteasy/reactive/server/test/stream/StreamTestCase.java @@ -105,6 +105,10 @@ public void testStreaming() throws Exception { .then() .statusCode(200) .body(Matchers.equalTo("foobar")); + RestAssured.get("/stream/text/stream/legacy-publisher") + .then() + .statusCode(200) + .body(Matchers.equalTo("foobar")); RestAssured.get("/stream/text/collect") .then() .statusCode(200) diff --git a/extensions/smallrye-reactive-messaging-amqp/deployment/src/test/java/io/quarkus/smallrye/reactivemessaging/amqp/ProducingBean.java b/extensions/smallrye-reactive-messaging-amqp/deployment/src/test/java/io/quarkus/smallrye/reactivemessaging/amqp/ProducingBean.java index 93771a283c998..d708936335aa2 100644 --- a/extensions/smallrye-reactive-messaging-amqp/deployment/src/test/java/io/quarkus/smallrye/reactivemessaging/amqp/ProducingBean.java +++ b/extensions/smallrye-reactive-messaging-amqp/deployment/src/test/java/io/quarkus/smallrye/reactivemessaging/amqp/ProducingBean.java @@ -1,11 +1,11 @@ package io.quarkus.smallrye.reactivemessaging.amqp; import java.time.Duration; +import java.util.concurrent.Flow; import jakarta.enterprise.context.ApplicationScoped; import org.eclipse.microprofile.reactive.messaging.Outgoing; -import org.reactivestreams.Publisher; import io.smallrye.mutiny.Multi; import io.smallrye.mutiny.Uni; @@ -14,7 +14,7 @@ public class ProducingBean { @Outgoing("source") - public Publisher generate() { + public Flow.Publisher generate() { return Multi.createFrom().range(1, 11) .map(Integer::longValue) .map(i -> i * 2) diff --git a/extensions/smallrye-reactive-messaging-amqp/deployment/src/test/java/io/quarkus/smallrye/reactivemessaging/amqp/devmode/nohttp/Producer.java b/extensions/smallrye-reactive-messaging-amqp/deployment/src/test/java/io/quarkus/smallrye/reactivemessaging/amqp/devmode/nohttp/Producer.java index f7ae0181c2a3a..812ed0ac96845 100644 --- a/extensions/smallrye-reactive-messaging-amqp/deployment/src/test/java/io/quarkus/smallrye/reactivemessaging/amqp/devmode/nohttp/Producer.java +++ b/extensions/smallrye-reactive-messaging-amqp/deployment/src/test/java/io/quarkus/smallrye/reactivemessaging/amqp/devmode/nohttp/Producer.java @@ -1,18 +1,18 @@ package io.quarkus.smallrye.reactivemessaging.amqp.devmode.nohttp; import java.time.Duration; +import java.util.concurrent.Flow; import jakarta.enterprise.context.ApplicationScoped; import org.eclipse.microprofile.reactive.messaging.Outgoing; -import org.reactivestreams.Publisher; import io.smallrye.mutiny.Multi; @ApplicationScoped public class Producer { @Outgoing("source") - public Publisher generate() { + public Flow.Publisher generate() { return Multi.createFrom().ticks().every(Duration.ofMillis(200)) .onOverflow().drop() .map(i -> i * 1); diff --git a/extensions/smallrye-reactive-messaging/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/deployment/QuarkusMediatorConfigurationUtil.java b/extensions/smallrye-reactive-messaging/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/deployment/QuarkusMediatorConfigurationUtil.java index 59b0c766fae65..858b60cfd4195 100644 --- a/extensions/smallrye-reactive-messaging/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/deployment/QuarkusMediatorConfigurationUtil.java +++ b/extensions/smallrye-reactive-messaging/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/deployment/QuarkusMediatorConfigurationUtil.java @@ -114,11 +114,8 @@ public static QuarkusMediatorConfiguration create(MethodInfo methodInfo, boolean configuration.setProduction(validationOutput.getProduction()); configuration.setConsumption(validationOutput.getConsumption()); configuration.setIngestedPayloadType(validationOutput.getIngestedPayloadType()); - if (validationOutput.getUseBuilderTypes()) { - configuration.setUseBuilderTypes(validationOutput.getUseBuilderTypes()); - } else { - configuration.setUseBuilderTypes(false); - } + configuration.setUseBuilderTypes(validationOutput.getUseBuilderTypes()); + configuration.setUseReactiveStreams(validationOutput.getUseReactiveStreams()); if (acknowledgment == null) { acknowledgment = mediatorConfigurationSupport.processDefaultAcknowledgement(shape, diff --git a/extensions/smallrye-reactive-messaging/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/deployment/ReactiveMessagingDotNames.java b/extensions/smallrye-reactive-messaging/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/deployment/ReactiveMessagingDotNames.java index b43bddc727e68..b9db175cfb784 100644 --- a/extensions/smallrye-reactive-messaging/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/deployment/ReactiveMessagingDotNames.java +++ b/extensions/smallrye-reactive-messaging/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/deployment/ReactiveMessagingDotNames.java @@ -55,9 +55,8 @@ public final class ReactiveMessagingDotNames { static final DotName EMITTER_FACTORY_FOR = DotName.createSimple(EmitterFactoryFor.class.getName()); static final DotName INCOMING_CONNECTOR_FACTORY = DotName.createSimple(IncomingConnectorFactory.class.getName()); - static final DotName OUTGOING_CONNECTOR_FACTORY = DotName.createSimple(OutgoingConnectorFactory.class.getName()); - static final DotName INBOUND_CONNECTOR = DotName.createSimple(InboundConnector.class.getName()); + static final DotName OUTGOING_CONNECTOR_FACTORY = DotName.createSimple(OutgoingConnectorFactory.class.getName()); static final DotName OUTBOUND_CONNECTOR = DotName.createSimple(OutboundConnector.class.getName()); static final DotName SMALLRYE_BLOCKING = DotName.createSimple(io.smallrye.common.annotation.Blocking.class.getName()); diff --git a/extensions/smallrye-reactive-messaging/deployment/src/test/java/io/quarkus/smallrye/reactivemessaging/SmallRyeBlockingSubscriberTest.java b/extensions/smallrye-reactive-messaging/deployment/src/test/java/io/quarkus/smallrye/reactivemessaging/SmallRyeBlockingSubscriberTest.java index 4d89322315f35..68af7c38ffd81 100644 --- a/extensions/smallrye-reactive-messaging/deployment/src/test/java/io/quarkus/smallrye/reactivemessaging/SmallRyeBlockingSubscriberTest.java +++ b/extensions/smallrye-reactive-messaging/deployment/src/test/java/io/quarkus/smallrye/reactivemessaging/SmallRyeBlockingSubscriberTest.java @@ -5,6 +5,7 @@ import java.io.File; import java.util.List; +import java.util.concurrent.Flow; import java.util.stream.Collectors; import jakarta.enterprise.context.ApplicationScoped; @@ -13,7 +14,6 @@ import org.eclipse.microprofile.reactive.messaging.Outgoing; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.RegisterExtension; -import org.reactivestreams.Publisher; import io.quarkus.smallrye.reactivemessaging.blocking.beans.IncomingUsingSmallRyeBlocking; import io.quarkus.test.QuarkusUnitTest; @@ -47,7 +47,7 @@ public void testIncomingUsingRunOnWorkerThread() { @ApplicationScoped public static class ProduceIn { @Outgoing("in") - public Publisher produce() { + public Flow.Publisher produce() { return Multi.createFrom().items("a", "b", "c", "d", "e", "f"); } } diff --git a/extensions/smallrye-reactive-messaging/deployment/src/test/java/io/quarkus/smallrye/reactivemessaging/TransactionalSubscriberTest.java b/extensions/smallrye-reactive-messaging/deployment/src/test/java/io/quarkus/smallrye/reactivemessaging/TransactionalSubscriberTest.java index 04b6134a478a7..81e3a7639d388 100644 --- a/extensions/smallrye-reactive-messaging/deployment/src/test/java/io/quarkus/smallrye/reactivemessaging/TransactionalSubscriberTest.java +++ b/extensions/smallrye-reactive-messaging/deployment/src/test/java/io/quarkus/smallrye/reactivemessaging/TransactionalSubscriberTest.java @@ -5,6 +5,7 @@ import java.io.File; import java.util.List; +import java.util.concurrent.Flow; import java.util.stream.Collectors; import jakarta.enterprise.context.ApplicationScoped; @@ -13,7 +14,6 @@ import org.eclipse.microprofile.reactive.messaging.Outgoing; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.RegisterExtension; -import org.reactivestreams.Publisher; import io.quarkus.smallrye.reactivemessaging.blocking.beans.IncomingUsingTransactional; import io.quarkus.test.QuarkusUnitTest; @@ -47,7 +47,7 @@ public void testIncomingUsingRunOnWorkerThread() { @ApplicationScoped public static class ProduceIn { @Outgoing("in") - public Publisher produce() { + public Flow.Publisher produce() { return Multi.createFrom().items("a", "b", "c", "d", "e", "f"); } } diff --git a/extensions/smallrye-reactive-messaging/deployment/src/test/java/io/quarkus/smallrye/reactivemessaging/blocking/BlockingSubscriberTest.java b/extensions/smallrye-reactive-messaging/deployment/src/test/java/io/quarkus/smallrye/reactivemessaging/blocking/BlockingSubscriberTest.java index 76a4eb3fd2eb6..9db7283577cea 100644 --- a/extensions/smallrye-reactive-messaging/deployment/src/test/java/io/quarkus/smallrye/reactivemessaging/blocking/BlockingSubscriberTest.java +++ b/extensions/smallrye-reactive-messaging/deployment/src/test/java/io/quarkus/smallrye/reactivemessaging/blocking/BlockingSubscriberTest.java @@ -5,6 +5,7 @@ import java.io.File; import java.util.List; +import java.util.concurrent.Flow; import java.util.stream.Collectors; import jakarta.enterprise.context.ApplicationScoped; @@ -13,7 +14,6 @@ import org.eclipse.microprofile.reactive.messaging.Outgoing; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.RegisterExtension; -import org.reactivestreams.Publisher; import io.quarkus.smallrye.reactivemessaging.blocking.beans.IncomingCustomTwoBlockingBean; import io.quarkus.smallrye.reactivemessaging.blocking.beans.IncomingCustomUnorderedBlockingBean; @@ -72,7 +72,7 @@ public void testIncomingBlockingCustomPoolTwo() { public static class ProduceIn { @Outgoing("in") @Broadcast(2) - public Publisher produce() { + public Flow.Publisher produce() { return Multi.createFrom().items("a", "b", "c", "d", "e", "f"); } } diff --git a/extensions/smallrye-reactive-messaging/deployment/src/test/java/io/quarkus/smallrye/reactivemessaging/blocking/SmallRyeBlockingSubscriberTest.java b/extensions/smallrye-reactive-messaging/deployment/src/test/java/io/quarkus/smallrye/reactivemessaging/blocking/SmallRyeBlockingSubscriberTest.java index f2792b6cca643..dddaaf8838292 100644 --- a/extensions/smallrye-reactive-messaging/deployment/src/test/java/io/quarkus/smallrye/reactivemessaging/blocking/SmallRyeBlockingSubscriberTest.java +++ b/extensions/smallrye-reactive-messaging/deployment/src/test/java/io/quarkus/smallrye/reactivemessaging/blocking/SmallRyeBlockingSubscriberTest.java @@ -5,6 +5,7 @@ import java.io.File; import java.util.List; +import java.util.concurrent.Flow; import java.util.stream.Collectors; import jakarta.enterprise.context.ApplicationScoped; @@ -13,7 +14,6 @@ import org.eclipse.microprofile.reactive.messaging.Outgoing; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.RegisterExtension; -import org.reactivestreams.Publisher; import io.quarkus.smallrye.reactivemessaging.blocking.beans.IncomingUsingSmallRyeBlocking; import io.quarkus.test.QuarkusUnitTest; @@ -47,7 +47,7 @@ public void testIncomingUsingRunOnWorkerThread() { @ApplicationScoped public static class ProduceIn { @Outgoing("in") - public Publisher produce() { + public Flow.Publisher produce() { return Multi.createFrom().items("a", "b", "c", "d", "e", "f"); } } diff --git a/extensions/smallrye-reactive-messaging/deployment/src/test/java/io/quarkus/smallrye/reactivemessaging/channels/DeprecatedChannelConsumer.java b/extensions/smallrye-reactive-messaging/deployment/src/test/java/io/quarkus/smallrye/reactivemessaging/channels/DeprecatedChannelConsumer.java index 9ec142ab201a9..084ddf6a2678c 100644 --- a/extensions/smallrye-reactive-messaging/deployment/src/test/java/io/quarkus/smallrye/reactivemessaging/channels/DeprecatedChannelConsumer.java +++ b/extensions/smallrye-reactive-messaging/deployment/src/test/java/io/quarkus/smallrye/reactivemessaging/channels/DeprecatedChannelConsumer.java @@ -13,6 +13,7 @@ import io.smallrye.mutiny.Multi; import io.smallrye.reactive.messaging.annotations.Channel; +import mutiny.zero.flow.adapters.AdaptersToFlow; @ApplicationScoped public class DeprecatedChannelConsumer { @@ -22,7 +23,7 @@ public class DeprecatedChannelConsumer { Publisher> sourceStream; public List consume() { - return Multi.createFrom().publisher(sourceStream) + return Multi.createFrom().publisher(AdaptersToFlow.publisher(sourceStream)) .onItem().transform(Message::getPayload) .collect().asList() .await().indefinitely(); diff --git a/extensions/smallrye-reactive-messaging/deployment/src/test/java/io/quarkus/smallrye/reactivemessaging/signatures/IncomingsTest.java b/extensions/smallrye-reactive-messaging/deployment/src/test/java/io/quarkus/smallrye/reactivemessaging/signatures/IncomingsTest.java index 4102b9e5124a6..8800f0cc06904 100644 --- a/extensions/smallrye-reactive-messaging/deployment/src/test/java/io/quarkus/smallrye/reactivemessaging/signatures/IncomingsTest.java +++ b/extensions/smallrye-reactive-messaging/deployment/src/test/java/io/quarkus/smallrye/reactivemessaging/signatures/IncomingsTest.java @@ -5,6 +5,7 @@ import java.util.List; import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.Flow; import jakarta.enterprise.context.ApplicationScoped; import jakarta.inject.Inject; @@ -13,7 +14,6 @@ import org.eclipse.microprofile.reactive.messaging.Outgoing; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.RegisterExtension; -import org.reactivestreams.Publisher; import io.quarkus.test.QuarkusUnitTest; import io.smallrye.mutiny.Multi; @@ -39,7 +39,7 @@ public void testIncomingsWithTwoSources() { public static class ProducerOnA { @Outgoing("a") - public Publisher produce() { + public Flow.Publisher produce() { return Multi.createFrom().items("a", "b", "c"); } @@ -49,7 +49,7 @@ public Publisher produce() { public static class ProducerOnB { @Outgoing("b") - public Publisher produce() { + public Flow.Publisher produce() { return Multi.createFrom().items("d", "e", "f"); } diff --git a/extensions/smallrye-reactive-messaging/deployment/src/test/java/io/quarkus/smallrye/reactivemessaging/wiring/ConnectorAttachmentChannelTest.java b/extensions/smallrye-reactive-messaging/deployment/src/test/java/io/quarkus/smallrye/reactivemessaging/wiring/ConnectorAttachmentChannelTest.java index 647fea7bb98f7..3b9650e229ec0 100644 --- a/extensions/smallrye-reactive-messaging/deployment/src/test/java/io/quarkus/smallrye/reactivemessaging/wiring/ConnectorAttachmentChannelTest.java +++ b/extensions/smallrye-reactive-messaging/deployment/src/test/java/io/quarkus/smallrye/reactivemessaging/wiring/ConnectorAttachmentChannelTest.java @@ -23,6 +23,7 @@ import io.quarkus.test.QuarkusUnitTest; import io.smallrye.mutiny.Multi; +import mutiny.zero.flow.adapters.AdaptersToReactiveStreams; public class ConnectorAttachmentChannelTest { @@ -45,7 +46,8 @@ static class MyDummyConnector implements IncomingConnectorFactory { @Override public PublisherBuilder> getPublisherBuilder(Config config) { - return ReactiveStreams.fromPublisher(Multi.createFrom().range(0, 5).map(Message::of)); + return ReactiveStreams + .fromPublisher(AdaptersToReactiveStreams.publisher(Multi.createFrom().range(0, 5).map(Message::of))); } } diff --git a/extensions/smallrye-reactive-messaging/deployment/src/test/java/io/quarkus/smallrye/reactivemessaging/wiring/ConnectorAttachmentCustomEmitterTest.java b/extensions/smallrye-reactive-messaging/deployment/src/test/java/io/quarkus/smallrye/reactivemessaging/wiring/ConnectorAttachmentCustomEmitterTest.java index 37ef0f8a9fc51..e8dfb8b826dc6 100644 --- a/extensions/smallrye-reactive-messaging/deployment/src/test/java/io/quarkus/smallrye/reactivemessaging/wiring/ConnectorAttachmentCustomEmitterTest.java +++ b/extensions/smallrye-reactive-messaging/deployment/src/test/java/io/quarkus/smallrye/reactivemessaging/wiring/ConnectorAttachmentCustomEmitterTest.java @@ -5,6 +5,7 @@ import java.util.List; import java.util.Objects; import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.Flow; import jakarta.annotation.PostConstruct; import jakarta.enterprise.context.ApplicationScoped; @@ -24,7 +25,6 @@ import org.jboss.shrinkwrap.api.spec.JavaArchive; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.RegisterExtension; -import org.reactivestreams.Publisher; import io.quarkus.test.QuarkusUnitTest; import io.smallrye.mutiny.Multi; @@ -89,7 +89,7 @@ interface CustomEmitter extends EmitterType { static class CustomEmitterImpl implements MessagePublisherProvider, CustomEmitter { @Override - public Publisher> getPublisher() { + public Flow.Publisher> getPublisher() { return Multi.createFrom().range(0, 5).map(Message::of).map(m -> (Message) m); } } diff --git a/extensions/smallrye-reactive-messaging/deployment/src/test/java/io/quarkus/smallrye/reactivemessaging/wiring/ConnectorAttachmentIncomingTest.java b/extensions/smallrye-reactive-messaging/deployment/src/test/java/io/quarkus/smallrye/reactivemessaging/wiring/ConnectorAttachmentIncomingTest.java index 9c0842b500c35..4968931d27d56 100644 --- a/extensions/smallrye-reactive-messaging/deployment/src/test/java/io/quarkus/smallrye/reactivemessaging/wiring/ConnectorAttachmentIncomingTest.java +++ b/extensions/smallrye-reactive-messaging/deployment/src/test/java/io/quarkus/smallrye/reactivemessaging/wiring/ConnectorAttachmentIncomingTest.java @@ -22,6 +22,7 @@ import io.quarkus.test.QuarkusUnitTest; import io.smallrye.mutiny.Multi; +import mutiny.zero.flow.adapters.AdaptersToReactiveStreams; public class ConnectorAttachmentIncomingTest { @@ -44,7 +45,8 @@ static class MyDummyConnector implements IncomingConnectorFactory { @Override public PublisherBuilder> getPublisherBuilder(Config config) { - return ReactiveStreams.fromPublisher(Multi.createFrom().range(0, 5).map(Message::of)); + return ReactiveStreams + .fromPublisher(AdaptersToReactiveStreams.publisher(Multi.createFrom().range(0, 5).map(Message::of))); } } diff --git a/extensions/smallrye-reactive-messaging/deployment/src/test/java/io/quarkus/smallrye/reactivemessaging/wiring/DisabledConnectorAttachmentIncomingTest.java b/extensions/smallrye-reactive-messaging/deployment/src/test/java/io/quarkus/smallrye/reactivemessaging/wiring/DisabledConnectorAttachmentIncomingTest.java index 82a04a471ff40..dd059d54a748d 100644 --- a/extensions/smallrye-reactive-messaging/deployment/src/test/java/io/quarkus/smallrye/reactivemessaging/wiring/DisabledConnectorAttachmentIncomingTest.java +++ b/extensions/smallrye-reactive-messaging/deployment/src/test/java/io/quarkus/smallrye/reactivemessaging/wiring/DisabledConnectorAttachmentIncomingTest.java @@ -23,6 +23,7 @@ import io.quarkus.test.QuarkusUnitTest; import io.smallrye.mutiny.Multi; +import mutiny.zero.flow.adapters.AdaptersToReactiveStreams; public class DisabledConnectorAttachmentIncomingTest { @@ -48,7 +49,8 @@ static class MyDummyConnector implements IncomingConnectorFactory { @Override public PublisherBuilder> getPublisherBuilder(Config config) { - return ReactiveStreams.fromPublisher(Multi.createFrom().range(0, 5).map(Message::of)); + return ReactiveStreams + .fromPublisher(AdaptersToReactiveStreams.publisher(Multi.createFrom().range(0, 5).map(Message::of))); } } diff --git a/extensions/smallrye-reactive-messaging/deployment/src/test/java/io/quarkus/smallrye/reactivemessaging/wiring/IncomingChannelWithDotsTest.java b/extensions/smallrye-reactive-messaging/deployment/src/test/java/io/quarkus/smallrye/reactivemessaging/wiring/IncomingChannelWithDotsTest.java index 2217eebb3161e..9ffb14588289b 100644 --- a/extensions/smallrye-reactive-messaging/deployment/src/test/java/io/quarkus/smallrye/reactivemessaging/wiring/IncomingChannelWithDotsTest.java +++ b/extensions/smallrye-reactive-messaging/deployment/src/test/java/io/quarkus/smallrye/reactivemessaging/wiring/IncomingChannelWithDotsTest.java @@ -22,6 +22,7 @@ import io.quarkus.test.QuarkusUnitTest; import io.smallrye.mutiny.Multi; +import mutiny.zero.flow.adapters.AdaptersToReactiveStreams; public class IncomingChannelWithDotsTest { @@ -44,7 +45,8 @@ static class MyDummyConnector implements IncomingConnectorFactory { @Override public PublisherBuilder> getPublisherBuilder(Config config) { - return ReactiveStreams.fromPublisher(Multi.createFrom().range(0, 5).map(Message::of)); + return ReactiveStreams + .fromPublisher(AdaptersToReactiveStreams.publisher(Multi.createFrom().range(0, 5).map(Message::of))); } } diff --git a/extensions/smallrye-reactive-messaging/deployment/src/test/java/io/quarkus/smallrye/reactivemessaging/wiring/TwoConnectorsAttachmentIncomingTest.java b/extensions/smallrye-reactive-messaging/deployment/src/test/java/io/quarkus/smallrye/reactivemessaging/wiring/TwoConnectorsAttachmentIncomingTest.java index 68f708c54b04f..5ffe5ef6888cf 100644 --- a/extensions/smallrye-reactive-messaging/deployment/src/test/java/io/quarkus/smallrye/reactivemessaging/wiring/TwoConnectorsAttachmentIncomingTest.java +++ b/extensions/smallrye-reactive-messaging/deployment/src/test/java/io/quarkus/smallrye/reactivemessaging/wiring/TwoConnectorsAttachmentIncomingTest.java @@ -23,6 +23,7 @@ import io.quarkus.test.QuarkusUnitTest; import io.smallrye.mutiny.Multi; +import mutiny.zero.flow.adapters.AdaptersToReactiveStreams; public class TwoConnectorsAttachmentIncomingTest { @@ -47,7 +48,8 @@ static class MyDummyConnector implements IncomingConnectorFactory { @Override public PublisherBuilder> getPublisherBuilder(Config config) { - return ReactiveStreams.fromPublisher(Multi.createFrom().range(0, 5).map(Message::of)); + return ReactiveStreams + .fromPublisher(AdaptersToReactiveStreams.publisher(Multi.createFrom().range(0, 5).map(Message::of))); } } @@ -57,7 +59,8 @@ static class MySecondDummyConnector implements IncomingConnectorFactory { @Override public PublisherBuilder> getPublisherBuilder(Config config) { - return ReactiveStreams.fromPublisher(Multi.createFrom().range(0, 5).map(Message::of)); + return ReactiveStreams + .fromPublisher(AdaptersToReactiveStreams.publisher(Multi.createFrom().range(0, 5).map(Message::of))); } } diff --git a/extensions/smallrye-reactive-messaging/runtime/src/main/java/io/quarkus/smallrye/reactivemessaging/runtime/DuplicatedContextConnectorFactoryInterceptor.java b/extensions/smallrye-reactive-messaging/runtime/src/main/java/io/quarkus/smallrye/reactivemessaging/runtime/DuplicatedContextConnectorFactoryInterceptor.java index 3bb3703f4af2e..7ac562c0b1f4a 100644 --- a/extensions/smallrye-reactive-messaging/runtime/src/main/java/io/quarkus/smallrye/reactivemessaging/runtime/DuplicatedContextConnectorFactoryInterceptor.java +++ b/extensions/smallrye-reactive-messaging/runtime/src/main/java/io/quarkus/smallrye/reactivemessaging/runtime/DuplicatedContextConnectorFactoryInterceptor.java @@ -1,6 +1,7 @@ package io.quarkus.smallrye.reactivemessaging.runtime; import java.util.Optional; +import java.util.concurrent.Flow; import jakarta.annotation.Priority; import jakarta.interceptor.AroundInvoke; @@ -9,7 +10,6 @@ import org.eclipse.microprofile.reactive.messaging.Message; import org.eclipse.microprofile.reactive.streams.operators.PublisherBuilder; -import org.reactivestreams.Publisher; import io.quarkus.vertx.core.runtime.context.VertxContextSafetyToggle; import io.smallrye.common.vertx.VertxContext; @@ -29,7 +29,7 @@ public Object intercept(InvocationContext ctx) throws Exception { return result.map(DuplicatedContextConnectorFactoryInterceptor::setMessageContextSafe); } if (ctx.getMethod().getName().equals("getPublisher")) { - Publisher> result = (Publisher>) ctx.proceed(); + Flow.Publisher> result = (Flow.Publisher>) ctx.proceed(); return Multi.createFrom().publisher(result) .map(DuplicatedContextConnectorFactoryInterceptor::setMessageContextSafe); } diff --git a/extensions/smallrye-reactive-messaging/runtime/src/main/java/io/quarkus/smallrye/reactivemessaging/runtime/QuarkusMediatorConfiguration.java b/extensions/smallrye-reactive-messaging/runtime/src/main/java/io/quarkus/smallrye/reactivemessaging/runtime/QuarkusMediatorConfiguration.java index 48c3309a2dc1d..054e1111d8960 100644 --- a/extensions/smallrye-reactive-messaging/runtime/src/main/java/io/quarkus/smallrye/reactivemessaging/runtime/QuarkusMediatorConfiguration.java +++ b/extensions/smallrye-reactive-messaging/runtime/src/main/java/io/quarkus/smallrye/reactivemessaging/runtime/QuarkusMediatorConfiguration.java @@ -53,6 +53,8 @@ public class QuarkusMediatorConfiguration implements MediatorConfiguration { private Type ingestedPayload; + private boolean useReactiveStreams = false; + public String getBeanId() { return beanId; } @@ -260,4 +262,17 @@ public void setBlockingExecutionOrdered(boolean blockingExecutionOrdered) { public void setWorkerPoolName(String workerPoolName) { this.workerPoolName = workerPoolName; } + + public boolean isUseReactiveStreams() { + return useReactiveStreams; + } + + public void setUseReactiveStreams(boolean usesReactiveStreams) { + this.useReactiveStreams = usesReactiveStreams; + } + + @Override + public boolean usesReactiveStreams() { + return useReactiveStreams; + } } diff --git a/extensions/smallrye-reactive-messaging/runtime/src/main/java/io/quarkus/smallrye/reactivemessaging/runtime/devmode/DevModeSupportConnectorFactoryInterceptor.java b/extensions/smallrye-reactive-messaging/runtime/src/main/java/io/quarkus/smallrye/reactivemessaging/runtime/devmode/DevModeSupportConnectorFactoryInterceptor.java index e006416e9a4b8..ffeaae2e883b6 100644 --- a/extensions/smallrye-reactive-messaging/runtime/src/main/java/io/quarkus/smallrye/reactivemessaging/runtime/devmode/DevModeSupportConnectorFactoryInterceptor.java +++ b/extensions/smallrye-reactive-messaging/runtime/src/main/java/io/quarkus/smallrye/reactivemessaging/runtime/devmode/DevModeSupportConnectorFactoryInterceptor.java @@ -1,6 +1,7 @@ package io.quarkus.smallrye.reactivemessaging.runtime.devmode; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Flow; import java.util.function.Supplier; import jakarta.annotation.Priority; @@ -12,7 +13,6 @@ import org.eclipse.microprofile.reactive.streams.operators.PublisherBuilder; import org.eclipse.microprofile.reactive.streams.operators.ReactiveStreams; import org.eclipse.microprofile.reactive.streams.operators.SubscriberBuilder; -import org.reactivestreams.Publisher; import org.reactivestreams.Subscriber; import org.reactivestreams.Subscription; @@ -50,7 +50,7 @@ public Object intercept(InvocationContext ctx) throws Exception { }); } if (ctx.getMethod().getName().equals("getPublisher")) { - Publisher> result = (Publisher>) ctx.proceed(); + Flow.Publisher> result = (Flow.Publisher>) ctx.proceed(); return Multi.createFrom().publisher(result) .onItem().transformToUniAndConcatenate(msg -> Uni.createFrom().emitter(e -> { onMessage.get().whenComplete((restarted, error) -> { @@ -94,12 +94,12 @@ public void onComplete() { }); } if (ctx.getMethod().getName().equals("getSubscriber")) { - Subscriber> result = (Subscriber>) ctx.proceed(); - return new Subscriber>() { - private Subscriber> subscriber; + Flow.Subscriber> result = (Flow.Subscriber>) ctx.proceed(); + return new Flow.Subscriber>() { + private Flow.Subscriber> subscriber; @Override - public void onSubscribe(Subscription s) { + public void onSubscribe(Flow.Subscription s) { subscriber = result; subscriber.onSubscribe(s); } diff --git a/independent-projects/arc/pom.xml b/independent-projects/arc/pom.xml index 765663e81baf3..59b46fd781f2b 100644 --- a/independent-projects/arc/pom.xml +++ b/independent-projects/arc/pom.xml @@ -52,7 +52,7 @@ 2.1.1 1.6.0.Final 3.0.0 - 1.8.0 + 2.1.0 3.0.0-M8 1.6.8 diff --git a/independent-projects/qute/pom.xml b/independent-projects/qute/pom.xml index 4198521a2c067..52603dd01d575 100644 --- a/independent-projects/qute/pom.xml +++ b/independent-projects/qute/pom.xml @@ -47,7 +47,7 @@ 3.5.0.Final 3.0.0-M8 1.6.8 - 1.9.0 + 2.1.0 diff --git a/independent-projects/resteasy-reactive/client/runtime/src/main/java/org/jboss/resteasy/reactive/client/impl/multipart/MultiByteHttpData.java b/independent-projects/resteasy-reactive/client/runtime/src/main/java/org/jboss/resteasy/reactive/client/impl/multipart/MultiByteHttpData.java index 3223884280ba7..f85b6c136b5d4 100644 --- a/independent-projects/resteasy-reactive/client/runtime/src/main/java/org/jboss/resteasy/reactive/client/impl/multipart/MultiByteHttpData.java +++ b/independent-projects/resteasy-reactive/client/runtime/src/main/java/org/jboss/resteasy/reactive/client/impl/multipart/MultiByteHttpData.java @@ -20,10 +20,10 @@ import java.io.InputStream; import java.nio.charset.Charset; import java.util.concurrent.Executor; +import java.util.concurrent.Flow.Subscription; import java.util.function.Consumer; import org.jboss.logging.Logger; -import org.reactivestreams.Subscription; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; diff --git a/independent-projects/resteasy-reactive/common/processor/src/main/java/org/jboss/resteasy/reactive/common/processor/ResteasyReactiveDotNames.java b/independent-projects/resteasy-reactive/common/processor/src/main/java/org/jboss/resteasy/reactive/common/processor/ResteasyReactiveDotNames.java index a80eba40712df..dd8e13857d185 100644 --- a/independent-projects/resteasy-reactive/common/processor/src/main/java/org/jboss/resteasy/reactive/common/processor/ResteasyReactiveDotNames.java +++ b/independent-projects/resteasy-reactive/common/processor/src/main/java/org/jboss/resteasy/reactive/common/processor/ResteasyReactiveDotNames.java @@ -21,6 +21,7 @@ import java.util.SortedSet; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; +import java.util.concurrent.Flow.Publisher; import jakarta.annotation.Priority; import jakarta.enterprise.context.ApplicationScoped; @@ -93,7 +94,6 @@ import org.jboss.resteasy.reactive.RestSseElementType; import org.jboss.resteasy.reactive.RestStreamElementType; import org.jboss.resteasy.reactive.Separator; -import org.reactivestreams.Publisher; import io.smallrye.common.annotation.Blocking; import io.smallrye.common.annotation.NonBlocking; @@ -205,6 +205,7 @@ public final class ResteasyReactiveDotNames { public static final DotName COMPLETION_STAGE = DotName.createSimple(CompletionStage.class.getName()); public static final DotName COMPLETABLE_FUTURE = DotName.createSimple(CompletableFuture.class.getName()); public static final DotName PUBLISHER = DotName.createSimple(Publisher.class.getName()); + public static final DotName LEGACY_PUBLISHER = DotName.createSimple(org.reactivestreams.Publisher.class.getName()); public static final DotName REST_RESPONSE = DotName.createSimple(RestResponse.class.getName()); public static final DotName INTEGER = DotName.createSimple(Integer.class.getName()); diff --git a/independent-projects/resteasy-reactive/common/runtime/pom.xml b/independent-projects/resteasy-reactive/common/runtime/pom.xml index dcb2c969b0ca3..c3dca5b8bd030 100644 --- a/independent-projects/resteasy-reactive/common/runtime/pom.xml +++ b/independent-projects/resteasy-reactive/common/runtime/pom.xml @@ -22,6 +22,14 @@ io.smallrye.reactive mutiny + + org.reactivestreams + reactive-streams + + + io.smallrye.reactive + mutiny-zero-flow-adapters + io.smallrye.common smallrye-common-annotation diff --git a/independent-projects/resteasy-reactive/pom.xml b/independent-projects/resteasy-reactive/pom.xml index e42e3d25ccfe0..5f4d9dae9699a 100644 --- a/independent-projects/resteasy-reactive/pom.xml +++ b/independent-projects/resteasy-reactive/pom.xml @@ -59,7 +59,7 @@ 3.0.0-M8 1.6.8 - 1.9.0 + 2.1.0 1.13.2 4.3.5 4.5.1 @@ -70,7 +70,9 @@ 3.0.2 3.0.0 4.2.0 - 2.25.0 + 3.0.0 + 1.0.4 + 1.0.0 3.1 @@ -217,11 +219,22 @@ ${vertx.version} + + org.reactivestreams + reactive-streams + ${reactive-streams.version} + + io.smallrye.reactive mutiny ${mutiny.version} + + io.smallrye.reactive + mutiny-zero-flow-adapters + ${mutiny-zero.version} + io.smallrye.reactive smallrye-mutiny-vertx-core diff --git a/independent-projects/resteasy-reactive/server/processor/src/main/java/org/jboss/resteasy/reactive/server/processor/scanning/AsyncReturnTypeScanner.java b/independent-projects/resteasy-reactive/server/processor/src/main/java/org/jboss/resteasy/reactive/server/processor/scanning/AsyncReturnTypeScanner.java index d04a8d4133af4..5cf0e9ce56ad4 100644 --- a/independent-projects/resteasy-reactive/server/processor/src/main/java/org/jboss/resteasy/reactive/server/processor/scanning/AsyncReturnTypeScanner.java +++ b/independent-projects/resteasy-reactive/server/processor/src/main/java/org/jboss/resteasy/reactive/server/processor/scanning/AsyncReturnTypeScanner.java @@ -2,6 +2,7 @@ import static org.jboss.resteasy.reactive.common.processor.ResteasyReactiveDotNames.COMPLETABLE_FUTURE; import static org.jboss.resteasy.reactive.common.processor.ResteasyReactiveDotNames.COMPLETION_STAGE; +import static org.jboss.resteasy.reactive.common.processor.ResteasyReactiveDotNames.LEGACY_PUBLISHER; import static org.jboss.resteasy.reactive.common.processor.ResteasyReactiveDotNames.MULTI; import static org.jboss.resteasy.reactive.common.processor.ResteasyReactiveDotNames.PUBLISHER; import static org.jboss.resteasy.reactive.common.processor.ResteasyReactiveDotNames.UNI; @@ -32,7 +33,7 @@ public List scan(MethodInfo method, ClassInfo actualEndp return Collections.singletonList(new FixedHandlerChainCustomizer(new UniResponseHandler(), HandlerChainCustomizer.Phase.AFTER_METHOD_INVOKE)); } - if (returnTypeName.equals(MULTI) || returnTypeName.equals(PUBLISHER)) { + if (returnTypeName.equals(MULTI) || returnTypeName.equals(PUBLISHER) || returnTypeName.equals(LEGACY_PUBLISHER)) { return Collections.singletonList(new FixedHandlerChainCustomizer(new PublisherResponseHandler(), HandlerChainCustomizer.Phase.AFTER_METHOD_INVOKE)); } @@ -43,6 +44,7 @@ public List scan(MethodInfo method, ClassInfo actualEndp public boolean isMethodSignatureAsync(MethodInfo method) { DotName returnTypeName = method.returnType().name(); return returnTypeName.equals(COMPLETION_STAGE) || returnTypeName.equals(COMPLETABLE_FUTURE) || - returnTypeName.equals(UNI) || returnTypeName.equals(MULTI) || returnTypeName.equals(PUBLISHER); + returnTypeName.equals(UNI) || returnTypeName.equals(MULTI) || returnTypeName.equals(PUBLISHER) || + returnTypeName.equals(LEGACY_PUBLISHER); } } diff --git a/independent-projects/resteasy-reactive/server/runtime/src/main/java/org/jboss/resteasy/reactive/server/handlers/PublisherResponseHandler.java b/independent-projects/resteasy-reactive/server/runtime/src/main/java/org/jboss/resteasy/reactive/server/handlers/PublisherResponseHandler.java index c06a031fe7713..d10f6fcfdaecd 100644 --- a/independent-projects/resteasy-reactive/server/runtime/src/main/java/org/jboss/resteasy/reactive/server/handlers/PublisherResponseHandler.java +++ b/independent-projects/resteasy-reactive/server/runtime/src/main/java/org/jboss/resteasy/reactive/server/handlers/PublisherResponseHandler.java @@ -6,6 +6,9 @@ import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.concurrent.Flow.Publisher; +import java.util.concurrent.Flow.Subscriber; +import java.util.concurrent.Flow.Subscription; import java.util.function.BiConsumer; import java.util.function.BiFunction; import java.util.function.Consumer; @@ -23,9 +26,8 @@ import org.jboss.resteasy.reactive.server.model.HandlerChainCustomizer.Phase; import org.jboss.resteasy.reactive.server.spi.ServerRestHandler; import org.jboss.resteasy.reactive.server.spi.StreamingResponse; -import org.reactivestreams.Publisher; -import org.reactivestreams.Subscriber; -import org.reactivestreams.Subscription; + +import mutiny.zero.flow.adapters.AdaptersToFlow; /** * This handler is used to push streams of data to the client. @@ -246,8 +248,12 @@ public void handle(ResteasyReactiveRequestContext requestContext) @Override public void handle(ResteasyReactiveRequestContext requestContext) throws Exception { // FIXME: handle Response with entity being a Multi - if (requestContext.getResult() instanceof Publisher) { - Publisher result = (Publisher) requestContext.getResult(); + Object requestContextResult = requestContext.getResult(); + if (requestContextResult instanceof org.reactivestreams.Publisher) { + requestContextResult = AdaptersToFlow.publisher((org.reactivestreams.Publisher) requestContextResult); + } + if (requestContextResult instanceof Publisher) { + Publisher result = (Publisher) requestContextResult; // FIXME: if we make a pretend Response and go through the normal route, we will get // media type negotiation and fixed entity writer set up, perhaps it's better than // cancelling the normal route? diff --git a/independent-projects/resteasy-reactive/server/vertx/src/test/java/org/jboss/resteasy/reactive/server/vertx/test/stream/StreamResource.java b/independent-projects/resteasy-reactive/server/vertx/src/test/java/org/jboss/resteasy/reactive/server/vertx/test/stream/StreamResource.java index 17d71f5f4bc5f..41aebefcb6d1b 100644 --- a/independent-projects/resteasy-reactive/server/vertx/src/test/java/org/jboss/resteasy/reactive/server/vertx/test/stream/StreamResource.java +++ b/independent-projects/resteasy-reactive/server/vertx/src/test/java/org/jboss/resteasy/reactive/server/vertx/test/stream/StreamResource.java @@ -1,6 +1,7 @@ package org.jboss.resteasy.reactive.server.vertx.test.stream; import java.util.Date; +import java.util.concurrent.Flow.Publisher; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; @@ -11,12 +12,12 @@ import jakarta.ws.rs.core.MediaType; import org.jboss.resteasy.reactive.common.util.MultiCollectors; -import org.reactivestreams.Publisher; import io.smallrye.mutiny.Multi; import io.smallrye.mutiny.Uni; import io.smallrye.mutiny.infrastructure.Infrastructure; import io.vertx.core.buffer.Buffer; +import mutiny.zero.flow.adapters.AdaptersToReactiveStreams; @Path("stream") public class StreamResource { @@ -44,6 +45,13 @@ public Publisher getStreamedTextPublisher() { return Multi.createFrom().items("foo", "bar"); } + @Path("text/stream/legacy-publisher") + @GET + @Produces(MediaType.TEXT_PLAIN) + public org.reactivestreams.Publisher getStreamedTextLegacyPublisher() { + return AdaptersToReactiveStreams.publisher(Multi.createFrom().items("foo", "bar")); + } + @Path("byte-arrays/collect") @GET @Produces(MediaType.TEXT_PLAIN) diff --git a/independent-projects/resteasy-reactive/server/vertx/src/test/java/org/jboss/resteasy/reactive/server/vertx/test/stream/StreamTestCase.java b/independent-projects/resteasy-reactive/server/vertx/src/test/java/org/jboss/resteasy/reactive/server/vertx/test/stream/StreamTestCase.java index 10ff84498cda4..d97c50659ec71 100644 --- a/independent-projects/resteasy-reactive/server/vertx/src/test/java/org/jboss/resteasy/reactive/server/vertx/test/stream/StreamTestCase.java +++ b/independent-projects/resteasy-reactive/server/vertx/src/test/java/org/jboss/resteasy/reactive/server/vertx/test/stream/StreamTestCase.java @@ -104,6 +104,10 @@ public void testStreaming() throws Exception { .then() .statusCode(200) .body(Matchers.equalTo("foobar")); + RestAssured.get("/stream/text/stream/legacy-publisher") + .then() + .statusCode(200) + .body(Matchers.equalTo("foobar")); RestAssured.get("/stream/text/collect") .then() .statusCode(200) diff --git a/integration-tests/devmode/src/test/java/io/quarkus/test/devconsole/MyProcessor.java b/integration-tests/devmode/src/test/java/io/quarkus/test/devconsole/MyProcessor.java index affecb9fdf6c6..d9a63239dff43 100644 --- a/integration-tests/devmode/src/test/java/io/quarkus/test/devconsole/MyProcessor.java +++ b/integration-tests/devmode/src/test/java/io/quarkus/test/devconsole/MyProcessor.java @@ -1,5 +1,7 @@ package io.quarkus.test.devconsole; +import java.util.concurrent.Flow; + import jakarta.enterprise.context.ApplicationScoped; import jakarta.enterprise.event.Observes; import jakarta.enterprise.inject.Instance; @@ -8,9 +10,6 @@ import org.eclipse.microprofile.reactive.messaging.Channel; import org.eclipse.microprofile.reactive.messaging.Incoming; import org.eclipse.microprofile.reactive.messaging.Outgoing; -import org.reactivestreams.Publisher; -import org.reactivestreams.Subscriber; -import org.reactivestreams.Subscription; import io.smallrye.reactive.messaging.annotations.Broadcast; import io.vertx.core.http.HttpServerResponse; @@ -21,14 +20,14 @@ public class MyProcessor { @Inject @Channel("processed") - Instance> channel; + Instance> channel; public void init(@Observes Router router) { router.get().handler(rc -> { HttpServerResponse response = rc.response(); - channel.get().subscribe(new Subscriber() { + channel.get().subscribe(new Flow.Subscriber() { @Override - public void onSubscribe(Subscription subscription) { + public void onSubscribe(Flow.Subscription subscription) { response.putHeader("Transfer-encoding", "chunked"); subscription.request(2); } diff --git a/integration-tests/main/src/main/java/io/quarkus/it/reactive/ReactiveStreamOpsResource.java b/integration-tests/main/src/main/java/io/quarkus/it/reactive/ReactiveStreamOpsResource.java index 828d18c69d3ab..a78519c8d7ca6 100644 --- a/integration-tests/main/src/main/java/io/quarkus/it/reactive/ReactiveStreamOpsResource.java +++ b/integration-tests/main/src/main/java/io/quarkus/it/reactive/ReactiveStreamOpsResource.java @@ -6,6 +6,7 @@ import org.eclipse.microprofile.reactive.streams.operators.ReactiveStreams; import io.smallrye.mutiny.Multi; +import mutiny.zero.flow.adapters.AdaptersToReactiveStreams; @Path("/reactive") public class ReactiveStreamOpsResource { @@ -30,7 +31,7 @@ public String stream1() { @Path("/stream-mutiny") public String stream2() { StringBuilder builder = new StringBuilder(); - ReactiveStreams.fromPublisher(Multi.createFrom().items("d", "e", "f")) + ReactiveStreams.fromPublisher(AdaptersToReactiveStreams.publisher(Multi.createFrom().items("d", "e", "f"))) .map(String::toUpperCase) .forEach(builder::append) .run(); diff --git a/integration-tests/mongodb-panache-kotlin/src/main/kotlin/io/quarkus/it/mongodb/panache/reactive/book/ReactiveBookEntityResource.kt b/integration-tests/mongodb-panache-kotlin/src/main/kotlin/io/quarkus/it/mongodb/panache/reactive/book/ReactiveBookEntityResource.kt index 8237bed5f257c..9984585251220 100644 --- a/integration-tests/mongodb-panache-kotlin/src/main/kotlin/io/quarkus/it/mongodb/panache/reactive/book/ReactiveBookEntityResource.kt +++ b/integration-tests/mongodb-panache-kotlin/src/main/kotlin/io/quarkus/it/mongodb/panache/reactive/book/ReactiveBookEntityResource.kt @@ -19,9 +19,9 @@ import jakarta.ws.rs.core.Response import org.bson.types.ObjectId import org.jboss.logging.Logger import org.jboss.resteasy.reactive.RestStreamElementType -import org.reactivestreams.Publisher import java.net.URI import java.time.LocalDate.parse +import java.util.concurrent.Flow @Path("/reactive/books/entity") class ReactiveBookEntityResource { @@ -43,7 +43,7 @@ class ReactiveBookEntityResource { @Path("/stream") @Produces(MediaType.SERVER_SENT_EVENTS) @RestStreamElementType(MediaType.APPLICATION_JSON) - fun streamBooks(@QueryParam("sort") sort: String?): Publisher { + fun streamBooks(@QueryParam("sort") sort: String?): Flow.Publisher { return if (sort != null) { ReactiveBookEntity.streamAll(Sort.ascending(sort)) } else ReactiveBookEntity.streamAll() diff --git a/integration-tests/mongodb-panache-kotlin/src/main/kotlin/io/quarkus/it/mongodb/panache/reactive/book/ReactiveBookRepositoryResource.kt b/integration-tests/mongodb-panache-kotlin/src/main/kotlin/io/quarkus/it/mongodb/panache/reactive/book/ReactiveBookRepositoryResource.kt index 3ca8977a3dcc0..bdd3f2baa8c0c 100644 --- a/integration-tests/mongodb-panache-kotlin/src/main/kotlin/io/quarkus/it/mongodb/panache/reactive/book/ReactiveBookRepositoryResource.kt +++ b/integration-tests/mongodb-panache-kotlin/src/main/kotlin/io/quarkus/it/mongodb/panache/reactive/book/ReactiveBookRepositoryResource.kt @@ -21,9 +21,9 @@ import jakarta.ws.rs.core.Response import org.bson.types.ObjectId import org.jboss.logging.Logger import org.jboss.resteasy.reactive.RestStreamElementType -import org.reactivestreams.Publisher import java.net.URI import java.time.LocalDate +import java.util.concurrent.Flow @Path("/reactive/books/repository") class ReactiveBookRepositoryResource { @@ -48,7 +48,7 @@ class ReactiveBookRepositoryResource { @Path("/stream") @Produces(MediaType.SERVER_SENT_EVENTS) @RestStreamElementType(MediaType.APPLICATION_JSON) - fun streamBooks(@QueryParam("sort") sort: String?): Publisher { + fun streamBooks(@QueryParam("sort") sort: String?): Flow.Publisher { return if (sort != null) { reactiveBookRepository.streamAll(Sort.ascending(sort)) } else reactiveBookRepository.streamAll() diff --git a/integration-tests/mongodb-panache/src/main/java/io/quarkus/it/mongodb/panache/reactive/book/ReactiveBookEntityResource.java b/integration-tests/mongodb-panache/src/main/java/io/quarkus/it/mongodb/panache/reactive/book/ReactiveBookEntityResource.java index 38d640841f6b9..eddd1391f6189 100644 --- a/integration-tests/mongodb-panache/src/main/java/io/quarkus/it/mongodb/panache/reactive/book/ReactiveBookEntityResource.java +++ b/integration-tests/mongodb-panache/src/main/java/io/quarkus/it/mongodb/panache/reactive/book/ReactiveBookEntityResource.java @@ -3,6 +3,7 @@ import java.net.URI; import java.time.LocalDate; import java.util.List; +import java.util.concurrent.Flow; import jakarta.annotation.PostConstruct; import jakarta.ws.rs.*; @@ -12,7 +13,6 @@ import org.bson.types.ObjectId; import org.jboss.logging.Logger; import org.jboss.resteasy.reactive.RestStreamElementType; -import org.reactivestreams.Publisher; import io.quarkus.panache.common.Parameters; import io.quarkus.panache.common.Sort; @@ -41,7 +41,7 @@ public Uni> getBooks(@QueryParam("sort") String sort) { @Path("/stream") @Produces(MediaType.SERVER_SENT_EVENTS) @RestStreamElementType(MediaType.APPLICATION_JSON) - public Publisher streamBooks(@QueryParam("sort") String sort) { + public Flow.Publisher streamBooks(@QueryParam("sort") String sort) { if (sort != null) { return ReactiveBookEntity.streamAll(Sort.ascending(sort)); } diff --git a/integration-tests/mongodb-panache/src/main/java/io/quarkus/it/mongodb/panache/reactive/book/ReactiveBookRepositoryResource.java b/integration-tests/mongodb-panache/src/main/java/io/quarkus/it/mongodb/panache/reactive/book/ReactiveBookRepositoryResource.java index b0c97b81524b7..96e21828edc16 100644 --- a/integration-tests/mongodb-panache/src/main/java/io/quarkus/it/mongodb/panache/reactive/book/ReactiveBookRepositoryResource.java +++ b/integration-tests/mongodb-panache/src/main/java/io/quarkus/it/mongodb/panache/reactive/book/ReactiveBookRepositoryResource.java @@ -3,6 +3,7 @@ import java.net.URI; import java.time.LocalDate; import java.util.List; +import java.util.concurrent.Flow; import jakarta.annotation.PostConstruct; import jakarta.inject.Inject; @@ -13,7 +14,6 @@ import org.bson.types.ObjectId; import org.jboss.logging.Logger; import org.jboss.resteasy.reactive.RestStreamElementType; -import org.reactivestreams.Publisher; import io.quarkus.it.mongodb.panache.book.Book; import io.quarkus.panache.common.Parameters; @@ -45,7 +45,7 @@ public Uni> getBooks(@QueryParam("sort") String sort) { @Path("/stream") @Produces(MediaType.SERVER_SENT_EVENTS) @RestStreamElementType(MediaType.APPLICATION_JSON) - public Publisher streamBooks(@QueryParam("sort") String sort) { + public Flow.Publisher streamBooks(@QueryParam("sort") String sort) { if (sort != null) { return reactiveBookRepository.streamAll(Sort.ascending(sort)); } diff --git a/integration-tests/smallrye-context-propagation/src/test/java/io/quarkus/context/test/mutiny/MutinyContextEndpoint.java b/integration-tests/smallrye-context-propagation/src/test/java/io/quarkus/context/test/mutiny/MutinyContextEndpoint.java index d3e442c56e6b3..26b82ed2877cb 100644 --- a/integration-tests/smallrye-context-propagation/src/test/java/io/quarkus/context/test/mutiny/MutinyContextEndpoint.java +++ b/integration-tests/smallrye-context-propagation/src/test/java/io/quarkus/context/test/mutiny/MutinyContextEndpoint.java @@ -4,6 +4,7 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.Flow; import jakarta.annotation.PreDestroy; import jakarta.inject.Inject; @@ -23,7 +24,6 @@ import org.eclipse.microprofile.context.ManagedExecutor; import org.eclipse.microprofile.context.ThreadContext; import org.junit.jupiter.api.Assertions; -import org.reactivestreams.Publisher; import org.wildfly.common.Assert; import io.quarkus.arc.Arc; @@ -455,7 +455,7 @@ public Multi transactionPropagationWithMulti() throws SystemException { @Transactional @GET @Path("/transaction-multi-2") - public Publisher transactionPropagationWithMulti2() { + public Flow.Publisher transactionPropagationWithMulti2() { Multi ret = Multi.createFrom().item("OK"); // now delete both entities Assertions.assertEquals(2, Person.deleteAll());