diff --git a/bom/application/pom.xml b/bom/application/pom.xml index 6e1d7b25a8d297..ad5e6c795e6cc6 100644 --- a/bom/application/pom.xml +++ b/bom/application/pom.xml @@ -59,15 +59,15 @@ <smallrye-health.version>4.0.1</smallrye-health.version> <smallrye-metrics.version>4.0.0</smallrye-metrics.version> <smallrye-open-api.version>3.1.1</smallrye-open-api.version> - <smallrye-graphql.version>2.0.1</smallrye-graphql.version> + <smallrye-graphql.version>2.1.0.Beta1</smallrye-graphql.version> <smallrye-opentracing.version>3.0.0</smallrye-opentracing.version> <smallrye-fault-tolerance.version>6.1.0</smallrye-fault-tolerance.version> <smallrye-jwt.version>4.0.0</smallrye-jwt.version> <smallrye-context-propagation.version>2.0.0</smallrye-context-propagation.version> <smallrye-reactive-streams-operators.version>1.0.13</smallrye-reactive-streams-operators.version> - <smallrye-reactive-types-converter.version>2.7.0</smallrye-reactive-types-converter.version> - <smallrye-mutiny-vertx-binding.version>2.30.1</smallrye-mutiny-vertx-binding.version> - <smallrye-reactive-messaging.version>4.2.0</smallrye-reactive-messaging.version> + <smallrye-reactive-types-converter.version>3.0.0</smallrye-reactive-types-converter.version> + <smallrye-mutiny-vertx-binding.version>3.2.0</smallrye-mutiny-vertx-binding.version> + <smallrye-reactive-messaging.version>4.2.0.RC1</smallrye-reactive-messaging.version> <smallrye-stork.version>1.4.1</smallrye-stork.version> <jakarta.activation.version>2.1.0</jakarta.activation.version> <jakarta.annotation-api.version>2.1.1</jakarta.annotation-api.version> @@ -144,9 +144,9 @@ <caffeine.version>3.1.1</caffeine.version> <netty.version>4.1.87.Final</netty.version> <brotli4j.version>1.8.0</brotli4j.version> - <reactive-streams.version>1.0.3</reactive-streams.version> + <reactive-streams.version>1.0.4</reactive-streams.version> <jboss-logging.version>3.5.0.Final</jboss-logging.version> - <mutiny.version>1.9.0</mutiny.version> + <mutiny.version>2.1.0</mutiny.version> <kafka3.version>3.3.2</kafka3.version> <lz4.version>1.8.0</lz4.version> <!-- dependency of the kafka-clients that could be overridden by other imported BOMs in the platform --> <snappy.version>1.1.8.4</snappy.version> @@ -216,6 +216,7 @@ <org-crac.version>0.1.3</org-crac.version> <sshd-common.version>2.9.2</sshd-common.version> <mime4j.version>0.8.9</mime4j.version> + <mutiny-zero.version>1.0.0</mutiny-zero.version> </properties> <dependencyManagement> @@ -5451,6 +5452,11 @@ </exclusion> </exclusions> </dependency> + <dependency> + <groupId>io.smallrye.reactive</groupId> + <artifactId>mutiny-zero-flow-adapters</artifactId> + <version>${mutiny-zero.version}</version> + </dependency> <dependency> <groupId>org.eclipse.microprofile.reactive-streams-operators</groupId> <artifactId>microprofile-reactive-streams-operators-api</artifactId> diff --git a/extensions/grpc/deployment/src/test/java/io/quarkus/grpc/server/GrpcReflectionTest.java b/extensions/grpc/deployment/src/test/java/io/quarkus/grpc/server/GrpcReflectionTest.java index 1e748b893d4ce5..80d495f1e33ae2 100644 --- a/extensions/grpc/deployment/src/test/java/io/quarkus/grpc/server/GrpcReflectionTest.java +++ b/extensions/grpc/deployment/src/test/java/io/quarkus/grpc/server/GrpcReflectionTest.java @@ -5,6 +5,7 @@ import java.util.Arrays; import java.util.List; +import java.util.concurrent.Flow; import org.jboss.shrinkwrap.api.ShrinkWrap; import org.jboss.shrinkwrap.api.spec.JavaArchive; @@ -12,8 +13,6 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.RegisterExtension; -import org.reactivestreams.Subscriber; -import org.reactivestreams.Subscription; import com.google.protobuf.ByteString; @@ -233,15 +232,15 @@ public void testRetrievingAllExtensionNumbersOfType() { assertThat(list).containsExactlyInAnyOrderElementsOf(expected); } - private static class ResettableSubscriber<T> implements Subscriber<T> { + private static class ResettableSubscriber<T> implements Flow.Subscriber<T> { - private Subscription subscription; + private Flow.Subscription subscription; private volatile T last; private boolean completed; private Throwable failure; @Override - public void onSubscribe(Subscription subscription) { + public void onSubscribe(Flow.Subscription subscription) { this.subscription = subscription; } diff --git a/extensions/mailer/runtime/src/main/java/io/quarkus/mailer/Attachment.java b/extensions/mailer/runtime/src/main/java/io/quarkus/mailer/Attachment.java index bc3e31c3296001..7e11540c1eb99e 100644 --- a/extensions/mailer/runtime/src/main/java/io/quarkus/mailer/Attachment.java +++ b/extensions/mailer/runtime/src/main/java/io/quarkus/mailer/Attachment.java @@ -3,8 +3,7 @@ import java.io.File; import java.util.Iterator; import java.util.NoSuchElementException; - -import org.reactivestreams.Publisher; +import java.util.concurrent.Flow.Publisher; import io.smallrye.mutiny.Multi; diff --git a/extensions/mailer/runtime/src/main/java/io/quarkus/mailer/Mail.java b/extensions/mailer/runtime/src/main/java/io/quarkus/mailer/Mail.java index 07e094bcf5b6f7..5ee50038adc2ea 100644 --- a/extensions/mailer/runtime/src/main/java/io/quarkus/mailer/Mail.java +++ b/extensions/mailer/runtime/src/main/java/io/quarkus/mailer/Mail.java @@ -2,8 +2,7 @@ import java.io.File; import java.util.*; - -import org.reactivestreams.Publisher; +import java.util.concurrent.Flow.Publisher; /** * Represents an e-mail. diff --git a/extensions/mailer/runtime/src/main/java/io/quarkus/mailer/runtime/MutinyMailerImpl.java b/extensions/mailer/runtime/src/main/java/io/quarkus/mailer/runtime/MutinyMailerImpl.java index e69674e7adcf52..302656b6e58a6e 100644 --- a/extensions/mailer/runtime/src/main/java/io/quarkus/mailer/runtime/MutinyMailerImpl.java +++ b/extensions/mailer/runtime/src/main/java/io/quarkus/mailer/runtime/MutinyMailerImpl.java @@ -6,6 +6,7 @@ import java.util.List; import java.util.Map; import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.Flow.Publisher; import java.util.function.Function; import java.util.stream.Collectors; @@ -13,7 +14,6 @@ import jakarta.inject.Inject; import org.jboss.logging.Logger; -import org.reactivestreams.Publisher; import io.quarkus.mailer.Attachment; import io.quarkus.mailer.Mail; diff --git a/extensions/mailer/runtime/src/test/java/io/quarkus/mailer/AttachmentTest.java b/extensions/mailer/runtime/src/test/java/io/quarkus/mailer/AttachmentTest.java index cc73a1ec7bad11..b1bd405c634e2d 100644 --- a/extensions/mailer/runtime/src/test/java/io/quarkus/mailer/AttachmentTest.java +++ b/extensions/mailer/runtime/src/test/java/io/quarkus/mailer/AttachmentTest.java @@ -7,12 +7,12 @@ import java.util.Iterator; import java.util.NoSuchElementException; import java.util.UUID; +import java.util.concurrent.Flow; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; -import org.reactivestreams.Publisher; import io.quarkus.mailer.runtime.MutinyMailerImpl; import io.smallrye.mutiny.Multi; @@ -70,7 +70,7 @@ void testInlineAttachmentCreationFromFile() { @Test void testAttachmentCreationFromStream() { - Publisher<Byte> publisher = vertx.fileSystem().open(LOREM.getAbsolutePath(), new OpenOptions().setRead(true)) + Flow.Publisher<Byte> publisher = vertx.fileSystem().open(LOREM.getAbsolutePath(), new OpenOptions().setRead(true)) .onItem().transformToMulti(af -> af.toMulti() .onItem().transformToIterable(this::getBytes)); @@ -109,7 +109,7 @@ public Byte next() { @Test void testInlineAttachmentCreationFromStream() { - Publisher<Byte> publisher = vertx.fileSystem().open(LOREM.getAbsolutePath(), new OpenOptions().setRead(true)) + Flow.Publisher<Byte> publisher = vertx.fileSystem().open(LOREM.getAbsolutePath(), new OpenOptions().setRead(true)) .onItem().transformToMulti(af -> af.toMulti() .onItem().transformToIterable(this::getBytes)); @@ -128,7 +128,7 @@ void testInlineAttachmentCreationFromStream() { @Test void testAttachmentCreationWithDescription() { - Publisher<Byte> publisher = vertx.fileSystem().open(LOREM.getAbsolutePath(), new OpenOptions().setRead(true)) + Flow.Publisher<Byte> publisher = vertx.fileSystem().open(LOREM.getAbsolutePath(), new OpenOptions().setRead(true)) .onItem().transformToMulti(af -> af.toMulti() .onItem().transformToIterable(this::getBytes)); @@ -154,7 +154,7 @@ private String getContent(Attachment attachment) { @Test void testInlineAttachmentCreationWithDescription() { - Publisher<Byte> publisher = vertx.fileSystem().open(LOREM.getAbsolutePath(), new OpenOptions().setRead(true)) + Flow.Publisher<Byte> publisher = vertx.fileSystem().open(LOREM.getAbsolutePath(), new OpenOptions().setRead(true)) .onItem().transformToMulti(af -> af.toMulti() .onItem().transformToIterable(this::getBytes)); diff --git a/extensions/mongodb-client/runtime/pom.xml b/extensions/mongodb-client/runtime/pom.xml index d9e0efbdeb7404..b76174a626e587 100644 --- a/extensions/mongodb-client/runtime/pom.xml +++ b/extensions/mongodb-client/runtime/pom.xml @@ -24,6 +24,10 @@ <groupId>io.quarkus</groupId> <artifactId>quarkus-mutiny</artifactId> </dependency> + <dependency> + <groupId>io.smallrye.reactive</groupId> + <artifactId>mutiny-zero-flow-adapters</artifactId> + </dependency> <dependency> <groupId>io.quarkus</groupId> <artifactId>quarkus-vertx</artifactId> diff --git a/extensions/mongodb-client/runtime/src/main/java/io/quarkus/mongodb/impl/ReactiveMongoCollectionImpl.java b/extensions/mongodb-client/runtime/src/main/java/io/quarkus/mongodb/impl/ReactiveMongoCollectionImpl.java index 1196ff7fc668ed..acac2af661e43e 100644 --- a/extensions/mongodb-client/runtime/src/main/java/io/quarkus/mongodb/impl/ReactiveMongoCollectionImpl.java +++ b/extensions/mongodb-client/runtime/src/main/java/io/quarkus/mongodb/impl/ReactiveMongoCollectionImpl.java @@ -1,6 +1,7 @@ package io.quarkus.mongodb.impl; import java.util.List; +import java.util.concurrent.Flow; import org.bson.Document; import org.bson.codecs.configuration.CodecRegistry; @@ -47,6 +48,7 @@ import io.quarkus.mongodb.reactive.ReactiveMongoCollection; import io.smallrye.mutiny.Multi; import io.smallrye.mutiny.Uni; +import mutiny.zero.flow.adapters.AdaptersToFlow; public class ReactiveMongoCollectionImpl<T> implements ReactiveMongoCollection<T> { @@ -262,11 +264,11 @@ public <D> Multi<D> aggregate(ClientSession clientSession, List<? extends Bson> return Wrappers.toMulti(collection.aggregate(clientSession, pipeline, clazz)); } - private <D> AggregatePublisher<D> apply(AggregateOptions options, AggregatePublisher<D> publisher) { + private <D> Flow.Publisher<D> apply(AggregateOptions options, AggregatePublisher<D> publisher) { if (options == null) { - return publisher; + return AdaptersToFlow.publisher(publisher); } - return options.apply(publisher); + return AdaptersToFlow.publisher(options.apply(publisher)); } @Override @@ -409,11 +411,11 @@ public Multi<T> mapReduce(String mapFunction, String reduceFunction, MapReduceOp return Multi.createFrom().publisher(apply(options, collection.mapReduce(mapFunction, reduceFunction))); } - private <D> MapReducePublisher<D> apply(MapReduceOptions options, MapReducePublisher<D> mapReduce) { + private <D> Flow.Publisher<D> apply(MapReduceOptions options, MapReducePublisher<D> mapReduce) { if (options == null) { - return mapReduce; + return AdaptersToFlow.publisher(mapReduce); } - return options.apply(mapReduce); + return AdaptersToFlow.publisher(options.apply(mapReduce)); } @Override diff --git a/extensions/mongodb-client/runtime/src/main/java/io/quarkus/mongodb/impl/ReactiveMongoDatabaseImpl.java b/extensions/mongodb-client/runtime/src/main/java/io/quarkus/mongodb/impl/ReactiveMongoDatabaseImpl.java index 41249cdf9f7ba2..b439c44f8cdd5f 100644 --- a/extensions/mongodb-client/runtime/src/main/java/io/quarkus/mongodb/impl/ReactiveMongoDatabaseImpl.java +++ b/extensions/mongodb-client/runtime/src/main/java/io/quarkus/mongodb/impl/ReactiveMongoDatabaseImpl.java @@ -1,6 +1,7 @@ package io.quarkus.mongodb.impl; import java.util.List; +import java.util.concurrent.Flow; import org.bson.Document; import org.bson.conversions.Bson; @@ -22,6 +23,7 @@ import io.quarkus.mongodb.reactive.ReactiveMongoDatabase; import io.smallrye.mutiny.Multi; import io.smallrye.mutiny.Uni; +import mutiny.zero.flow.adapters.AdaptersToFlow; public class ReactiveMongoDatabaseImpl implements ReactiveMongoDatabase { @@ -127,12 +129,11 @@ public <T> Multi<T> listCollections(Class<T> clazz, CollectionListOptions option return Multi.createFrom().publisher(apply(options, database.listCollections(clazz))); } - private <T> ListCollectionsPublisher<T> apply(CollectionListOptions options, - ListCollectionsPublisher<T> collections) { + private <T> Flow.Publisher<T> apply(CollectionListOptions options, ListCollectionsPublisher<T> collections) { if (options == null) { - return collections; + return AdaptersToFlow.publisher(collections); } else { - return options.apply(collections); + return AdaptersToFlow.publisher(options.apply(collections)); } } diff --git a/extensions/mongodb-client/runtime/src/main/java/io/quarkus/mongodb/impl/Wrappers.java b/extensions/mongodb-client/runtime/src/main/java/io/quarkus/mongodb/impl/Wrappers.java index 7cb38ba3d350b1..f9474f1899689b 100644 --- a/extensions/mongodb-client/runtime/src/main/java/io/quarkus/mongodb/impl/Wrappers.java +++ b/extensions/mongodb-client/runtime/src/main/java/io/quarkus/mongodb/impl/Wrappers.java @@ -8,6 +8,7 @@ import io.smallrye.mutiny.Uni; import io.vertx.core.Context; import io.vertx.core.Vertx; +import mutiny.zero.flow.adapters.AdaptersToFlow; class Wrappers { @@ -17,7 +18,7 @@ private Wrappers() { static <T> Uni<T> toUni(Publisher<T> publisher) { Context context = Vertx.currentContext(); - Uni<T> uni = Uni.createFrom().publisher(publisher); + Uni<T> uni = Uni.createFrom().publisher(AdaptersToFlow.publisher(publisher)); if (context != null) { return uni.emitOn(command -> context.runOnContext(x -> command.run())); } @@ -27,15 +28,16 @@ static <T> Uni<T> toUni(Publisher<T> publisher) { static <T> Multi<T> toMulti(Publisher<T> publisher) { Context context = Vertx.currentContext(); if (context != null) { - return Multi.createFrom().publisher(publisher).emitOn(command -> context.runOnContext(x -> command.run())); + return Multi.createFrom().publisher(AdaptersToFlow.publisher(publisher)) + .emitOn(command -> context.runOnContext(x -> command.run())); } else { - return Multi.createFrom().publisher(publisher); + return Multi.createFrom().publisher(AdaptersToFlow.publisher(publisher)); } } static <T> Uni<List<T>> toUniOfList(Publisher<T> publisher) { Context context = Vertx.currentContext(); - Uni<List<T>> uni = Multi.createFrom().publisher(publisher) + Uni<List<T>> uni = Multi.createFrom().publisher(AdaptersToFlow.publisher(publisher)) .collect().asList(); if (context != null) { diff --git a/extensions/narayana-jta/runtime/pom.xml b/extensions/narayana-jta/runtime/pom.xml index a15244b309cacd..6a9caf4cbd5fce 100644 --- a/extensions/narayana-jta/runtime/pom.xml +++ b/extensions/narayana-jta/runtime/pom.xml @@ -51,6 +51,10 @@ <groupId>io.smallrye.reactive</groupId> <artifactId>smallrye-reactive-converter-mutiny</artifactId> </dependency> + <dependency> + <groupId>io.smallrye.reactive</groupId> + <artifactId>mutiny-zero-flow-adapters</artifactId> + </dependency> <dependency> <groupId>org.graalvm.nativeimage</groupId> <artifactId>svm</artifactId> diff --git a/extensions/narayana-jta/runtime/src/main/java/io/quarkus/narayana/jta/runtime/interceptor/TransactionalInterceptorBase.java b/extensions/narayana-jta/runtime/src/main/java/io/quarkus/narayana/jta/runtime/interceptor/TransactionalInterceptorBase.java index c281e78eae50be..d965b3b1da999a 100644 --- a/extensions/narayana-jta/runtime/src/main/java/io/quarkus/narayana/jta/runtime/interceptor/TransactionalInterceptorBase.java +++ b/extensions/narayana-jta/runtime/src/main/java/io/quarkus/narayana/jta/runtime/interceptor/TransactionalInterceptorBase.java @@ -9,6 +9,7 @@ import java.util.concurrent.CompletionException; import java.util.concurrent.CompletionStage; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Flow; import java.util.function.Function; import jakarta.inject.Inject; @@ -33,6 +34,8 @@ import io.smallrye.mutiny.Multi; import io.smallrye.reactive.converters.ReactiveTypeConverter; import io.smallrye.reactive.converters.Registry; +import mutiny.zero.flow.adapters.AdaptersToFlow; +import mutiny.zero.flow.adapters.AdaptersToReactiveStreams; public abstract class TransactionalInterceptorBase implements Serializable { @@ -138,8 +141,7 @@ protected Object invokeInOurTx(InvocationContext ic, TransactionManager tm, Runn // handle asynchronously if not throwing if (!throwing && ret != null) { ReactiveTypeConverter<Object> converter = null; - if (ret instanceof CompletionStage == false - && (ret instanceof Publisher == false || ic.getMethod().getReturnType() != Publisher.class)) { + if (!isCompletionStage(ret) && !isSomePublisher(ic, ret)) { @SuppressWarnings({ "rawtypes", "unchecked" }) Optional<ReactiveTypeConverter<Object>> lookup = Registry.lookup((Class) ret.getClass()); if (lookup.isPresent()) { @@ -151,12 +153,15 @@ protected Object invokeInOurTx(InvocationContext ic, TransactionManager tm, Runn } } } - if (ret instanceof CompletionStage) { + if (isCompletionStage(ret)) { ret = handleAsync(tm, tx, ic, ret, afterEndTransaction); // convert back if (converter != null) ret = converter.fromCompletionStage((CompletionStage<?>) ret); - } else if (ret instanceof Publisher) { + } else if (isFlowPublisher(ret)) { + // FIXME this needs to be tested + ret = handleAsync(tm, tx, ic, ret, afterEndTransaction); + } else if (isLegacyPublisher(ret)) { ret = handleAsync(tm, tx, ic, ret, afterEndTransaction); // convert back if (converter != null) @@ -173,6 +178,23 @@ protected Object invokeInOurTx(InvocationContext ic, TransactionManager tm, Runn return ret; } + private static boolean isLegacyPublisher(Object ret) { + return ret instanceof Publisher; + } + + private boolean isSomePublisher(InvocationContext ic, Object ret) { + return isLegacyPublisher(ret) || (ic.getMethod().getReturnType() == Publisher.class) + || isFlowPublisher(ret) || (ic.getMethod().getReturnType() == Flow.Publisher.class); + } + + private static boolean isFlowPublisher(Object ret) { + return ret instanceof Flow.Publisher; + } + + private boolean isCompletionStage(Object ret) { + return ret instanceof CompletionStage; + } + private int getTransactionTimeoutFromAnnotation(InvocationContext ic) { TransactionConfiguration configAnnotation = getTransactionConfiguration(ic); @@ -223,7 +245,7 @@ protected Object handleAsync(TransactionManager tm, Transaction tx, InvocationCo // Suspend the transaction to remove it from the main request thread tm.suspend(); afterEndTransaction.run(); - if (ret instanceof CompletionStage) { + if (isCompletionStage(ret)) { return ((CompletionStage<?>) ret).handle((v, t) -> { try { doInTransaction(tm, tx, () -> { @@ -249,8 +271,15 @@ protected Object handleAsync(TransactionManager tm, Transaction tx, InvocationCo throw new CompletionException(t); return v; }); - } else if (ret instanceof Publisher) { - ret = Multi.createFrom().publisher((Publisher<?>) ret) + } else if (isLegacyPublisher(ret) || isFlowPublisher(ret)) { + Flow.Publisher<?> pub; + boolean isLegacyRS = !isFlowPublisher(ret); + if (isLegacyRS) { + pub = AdaptersToFlow.publisher((Publisher<?>) ret); + } else { + pub = (Flow.Publisher<?>) ret; + } + ret = Multi.createFrom().publisher(pub) .onFailure().invoke(t -> { try { doInTransaction(tm, tx, () -> handleExceptionNoThrow(ic, t, tx)); @@ -276,6 +305,9 @@ protected Object handleAsync(TransactionManager tm, Transaction tx, InvocationCo throw new RuntimeException(e); } }); + if (isLegacyRS) { + ret = AdaptersToReactiveStreams.publisher((Multi<?>) ret); + } } return ret; } diff --git a/extensions/reactive-routes/runtime/src/main/java/io/quarkus/vertx/web/runtime/MultiJsonArraySupport.java b/extensions/reactive-routes/runtime/src/main/java/io/quarkus/vertx/web/runtime/MultiJsonArraySupport.java index 38417f89b7ad0f..f7da1f6d3e4ae1 100644 --- a/extensions/reactive-routes/runtime/src/main/java/io/quarkus/vertx/web/runtime/MultiJsonArraySupport.java +++ b/extensions/reactive-routes/runtime/src/main/java/io/quarkus/vertx/web/runtime/MultiJsonArraySupport.java @@ -1,10 +1,9 @@ package io.quarkus.vertx.web.runtime; +import java.util.concurrent.Flow; +import java.util.concurrent.Flow.Subscriber; import java.util.function.Function; -import org.reactivestreams.Subscriber; -import org.reactivestreams.Subscription; - import io.smallrye.mutiny.Multi; import io.vertx.core.AsyncResult; import io.vertx.core.Handler; @@ -37,10 +36,10 @@ public Buffer apply(String s) { private static void write(Multi<Buffer> multi, RoutingContext rc) { HttpServerResponse response = rc.response(); multi.subscribe().withSubscriber(new Subscriber<Buffer>() { - Subscription upstream; + Flow.Subscription upstream; @Override - public void onSubscribe(Subscription subscription) { + public void onSubscribe(Flow.Subscription subscription) { this.upstream = subscription; this.upstream.request(1); } @@ -78,7 +77,7 @@ public void onComplete() { }); } - private static void onWriteDone(Subscription subscription, AsyncResult<Void> ar, RoutingContext rc) { + private static void onWriteDone(Flow.Subscription subscription, AsyncResult<Void> ar, RoutingContext rc) { if (ar.failed()) { rc.fail(ar.cause()); } else { diff --git a/extensions/reactive-routes/runtime/src/main/java/io/quarkus/vertx/web/runtime/MultiNdjsonSupport.java b/extensions/reactive-routes/runtime/src/main/java/io/quarkus/vertx/web/runtime/MultiNdjsonSupport.java index 4d7da7b2fc5c8e..907ea871ae1125 100644 --- a/extensions/reactive-routes/runtime/src/main/java/io/quarkus/vertx/web/runtime/MultiNdjsonSupport.java +++ b/extensions/reactive-routes/runtime/src/main/java/io/quarkus/vertx/web/runtime/MultiNdjsonSupport.java @@ -1,7 +1,7 @@ package io.quarkus.vertx.web.runtime; -import org.reactivestreams.Subscriber; -import org.reactivestreams.Subscription; +import java.util.concurrent.Flow.Subscriber; +import java.util.concurrent.Flow.Subscription; import io.smallrye.mutiny.Multi; import io.vertx.core.AsyncResult; diff --git a/extensions/reactive-routes/runtime/src/main/java/io/quarkus/vertx/web/runtime/MultiSseSupport.java b/extensions/reactive-routes/runtime/src/main/java/io/quarkus/vertx/web/runtime/MultiSseSupport.java index 11962e9bdd75c4..1c3c3236287608 100644 --- a/extensions/reactive-routes/runtime/src/main/java/io/quarkus/vertx/web/runtime/MultiSseSupport.java +++ b/extensions/reactive-routes/runtime/src/main/java/io/quarkus/vertx/web/runtime/MultiSseSupport.java @@ -1,11 +1,10 @@ package io.quarkus.vertx.web.runtime; +import java.util.concurrent.Flow; +import java.util.concurrent.Flow.Subscription; import java.util.concurrent.atomic.AtomicLong; import java.util.function.Function; -import org.reactivestreams.Subscriber; -import org.reactivestreams.Subscription; - import io.quarkus.vertx.web.ReactiveRoutes; import io.smallrye.mutiny.Multi; import io.vertx.core.AsyncResult; @@ -52,7 +51,7 @@ private static void onWriteDone(Subscription subscription, AsyncResult<Void> ar, public static void write(Multi<Buffer> multi, RoutingContext rc) { HttpServerResponse response = rc.response(); - multi.subscribe().withSubscriber(new Subscriber<Buffer>() { + multi.subscribe().withSubscriber(new Flow.Subscriber<Buffer>() { Subscription upstream; @Override @@ -86,7 +85,7 @@ public void onComplete() { public static void subscribeBuffer(Multi<Buffer> multi, RoutingContext rc) { HttpServerResponse response = rc.response(); - multi.subscribe().withSubscriber(new Subscriber<Buffer>() { + multi.subscribe().withSubscriber(new Flow.Subscriber<Buffer>() { Subscription upstream; final AtomicLong count = new AtomicLong(); diff --git a/extensions/reactive-routes/runtime/src/main/java/io/quarkus/vertx/web/runtime/MultiSupport.java b/extensions/reactive-routes/runtime/src/main/java/io/quarkus/vertx/web/runtime/MultiSupport.java index 58634cd62469f0..efde2979271002 100644 --- a/extensions/reactive-routes/runtime/src/main/java/io/quarkus/vertx/web/runtime/MultiSupport.java +++ b/extensions/reactive-routes/runtime/src/main/java/io/quarkus/vertx/web/runtime/MultiSupport.java @@ -1,11 +1,10 @@ package io.quarkus.vertx.web.runtime; +import java.util.concurrent.Flow.Subscriber; +import java.util.concurrent.Flow.Subscription; import java.util.function.Consumer; import java.util.function.Function; -import org.reactivestreams.Subscriber; -import org.reactivestreams.Subscription; - import io.smallrye.mutiny.Multi; import io.vertx.core.AsyncResult; import io.vertx.core.Handler; diff --git a/extensions/resteasy-classic/resteasy-mutiny-common/runtime/pom.xml b/extensions/resteasy-classic/resteasy-mutiny-common/runtime/pom.xml index f9bc83e92c9e72..cf3625af52f05a 100644 --- a/extensions/resteasy-classic/resteasy-mutiny-common/runtime/pom.xml +++ b/extensions/resteasy-classic/resteasy-mutiny-common/runtime/pom.xml @@ -22,6 +22,10 @@ <groupId>io.quarkus</groupId> <artifactId>quarkus-mutiny</artifactId> </dependency> + <dependency> + <groupId>io.smallrye.reactive</groupId> + <artifactId>mutiny-zero-flow-adapters</artifactId> + </dependency> <dependency> <groupId>org.jboss.resteasy</groupId> <artifactId>resteasy-client</artifactId> diff --git a/extensions/resteasy-classic/resteasy-mutiny-common/runtime/src/main/java/io/quarkus/resteasy/mutiny/common/runtime/MultiProvider.java b/extensions/resteasy-classic/resteasy-mutiny-common/runtime/src/main/java/io/quarkus/resteasy/mutiny/common/runtime/MultiProvider.java index fa53954b5168e3..a8fcdac529730b 100644 --- a/extensions/resteasy-classic/resteasy-mutiny-common/runtime/src/main/java/io/quarkus/resteasy/mutiny/common/runtime/MultiProvider.java +++ b/extensions/resteasy-classic/resteasy-mutiny-common/runtime/src/main/java/io/quarkus/resteasy/mutiny/common/runtime/MultiProvider.java @@ -4,10 +4,11 @@ import org.reactivestreams.Publisher; import io.smallrye.mutiny.Multi; +import mutiny.zero.flow.adapters.AdaptersToReactiveStreams; public class MultiProvider implements AsyncStreamProvider<Multi<?>> { @Override public Publisher<?> toAsyncStream(Multi<?> multi) { - return multi; + return AdaptersToReactiveStreams.publisher(multi); } -} \ No newline at end of file +} diff --git a/extensions/resteasy-classic/resteasy-mutiny-common/runtime/src/test/java/io/quarkus/resteasy/mutiny/common/test/MultiProviderTest.java b/extensions/resteasy-classic/resteasy-mutiny-common/runtime/src/test/java/io/quarkus/resteasy/mutiny/common/test/MultiProviderTest.java index 1144b7e5350440..9ff98a595da42b 100644 --- a/extensions/resteasy-classic/resteasy-mutiny-common/runtime/src/test/java/io/quarkus/resteasy/mutiny/common/test/MultiProviderTest.java +++ b/extensions/resteasy-classic/resteasy-mutiny-common/runtime/src/test/java/io/quarkus/resteasy/mutiny/common/test/MultiProviderTest.java @@ -8,6 +8,7 @@ import io.quarkus.resteasy.mutiny.common.runtime.MultiProvider; import io.smallrye.mutiny.Multi; +import mutiny.zero.flow.adapters.AdaptersToFlow; public class MultiProviderTest { @@ -17,7 +18,8 @@ public class MultiProviderTest { public void test() { Multi<?> multi = Multi.createFrom().items(1, 2, 3); Publisher<?> publisher = provider.toAsyncStream(multi); - List<?> list = Multi.createFrom().publisher(publisher).collect().asList().await().indefinitely(); + List<?> list = Multi.createFrom().publisher(AdaptersToFlow.publisher(publisher)).collect().asList().await() + .indefinitely(); Assertions.assertEquals(1, list.get(0)); Assertions.assertEquals(2, list.get(1)); Assertions.assertEquals(3, list.get(2)); diff --git a/extensions/resteasy-reactive/quarkus-resteasy-reactive/deployment/src/test/java/io/quarkus/resteasy/reactive/server/test/stream/StreamResource.java b/extensions/resteasy-reactive/quarkus-resteasy-reactive/deployment/src/test/java/io/quarkus/resteasy/reactive/server/test/stream/StreamResource.java index c42dea196633a7..a6f8ea1df87a7b 100644 --- a/extensions/resteasy-reactive/quarkus-resteasy-reactive/deployment/src/test/java/io/quarkus/resteasy/reactive/server/test/stream/StreamResource.java +++ b/extensions/resteasy-reactive/quarkus-resteasy-reactive/deployment/src/test/java/io/quarkus/resteasy/reactive/server/test/stream/StreamResource.java @@ -1,6 +1,7 @@ package io.quarkus.resteasy.reactive.server.test.stream; import java.util.Date; +import java.util.concurrent.Flow; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; @@ -20,6 +21,7 @@ import io.smallrye.mutiny.Uni; import io.smallrye.mutiny.infrastructure.Infrastructure; import io.vertx.core.buffer.Buffer; +import mutiny.zero.flow.adapters.AdaptersToReactiveStreams; @Path("stream") public class StreamResource { @@ -43,10 +45,17 @@ public Multi<String> getStreamedText() { @Path("text/stream/publisher") @GET @Produces(MediaType.TEXT_PLAIN) - public Publisher<String> getStreamedTextPublisher() { + public Flow.Publisher<String> getStreamedTextPublisher() { return Multi.createFrom().items("foo", "bar"); } + @Path("text/stream/legacy-publisher") + @GET + @Produces(MediaType.TEXT_PLAIN) + public Publisher<String> getStreamedTextLegacyPublisher() { + return AdaptersToReactiveStreams.publisher(Multi.createFrom().items("foo", "bar")); + } + @Path("byte-arrays/collect") @GET @Produces(MediaType.TEXT_PLAIN) @@ -78,7 +87,7 @@ public Multi<char[]> getStreamedCharArrays() { @Path("char-arrays/stream/publisher") @GET @Produces(MediaType.TEXT_PLAIN) - public Publisher<char[]> getStreamedCharArraysPublisher() { + public Flow.Publisher<char[]> getStreamedCharArraysPublisher() { return Multi.createFrom().items("foo".toCharArray(), "bar".toCharArray()); } diff --git a/extensions/resteasy-reactive/quarkus-resteasy-reactive/deployment/src/test/java/io/quarkus/resteasy/reactive/server/test/stream/StreamTestCase.java b/extensions/resteasy-reactive/quarkus-resteasy-reactive/deployment/src/test/java/io/quarkus/resteasy/reactive/server/test/stream/StreamTestCase.java index 75e9830a024f2c..7a2135bd216cd0 100644 --- a/extensions/resteasy-reactive/quarkus-resteasy-reactive/deployment/src/test/java/io/quarkus/resteasy/reactive/server/test/stream/StreamTestCase.java +++ b/extensions/resteasy-reactive/quarkus-resteasy-reactive/deployment/src/test/java/io/quarkus/resteasy/reactive/server/test/stream/StreamTestCase.java @@ -105,6 +105,10 @@ public void testStreaming() throws Exception { .then() .statusCode(200) .body(Matchers.equalTo("foobar")); + RestAssured.get("/stream/text/stream/legacy-publisher") + .then() + .statusCode(200) + .body(Matchers.equalTo("foobar")); RestAssured.get("/stream/text/collect") .then() .statusCode(200) diff --git a/extensions/smallrye-reactive-messaging-amqp/deployment/src/test/java/io/quarkus/smallrye/reactivemessaging/amqp/ProducingBean.java b/extensions/smallrye-reactive-messaging-amqp/deployment/src/test/java/io/quarkus/smallrye/reactivemessaging/amqp/ProducingBean.java index 93771a283c998d..d708936335aa23 100644 --- a/extensions/smallrye-reactive-messaging-amqp/deployment/src/test/java/io/quarkus/smallrye/reactivemessaging/amqp/ProducingBean.java +++ b/extensions/smallrye-reactive-messaging-amqp/deployment/src/test/java/io/quarkus/smallrye/reactivemessaging/amqp/ProducingBean.java @@ -1,11 +1,11 @@ package io.quarkus.smallrye.reactivemessaging.amqp; import java.time.Duration; +import java.util.concurrent.Flow; import jakarta.enterprise.context.ApplicationScoped; import org.eclipse.microprofile.reactive.messaging.Outgoing; -import org.reactivestreams.Publisher; import io.smallrye.mutiny.Multi; import io.smallrye.mutiny.Uni; @@ -14,7 +14,7 @@ public class ProducingBean { @Outgoing("source") - public Publisher<Long> generate() { + public Flow.Publisher<Long> generate() { return Multi.createFrom().range(1, 11) .map(Integer::longValue) .map(i -> i * 2) diff --git a/extensions/smallrye-reactive-messaging-amqp/deployment/src/test/java/io/quarkus/smallrye/reactivemessaging/amqp/devmode/nohttp/Producer.java b/extensions/smallrye-reactive-messaging-amqp/deployment/src/test/java/io/quarkus/smallrye/reactivemessaging/amqp/devmode/nohttp/Producer.java index f7ae0181c2a3ae..812ed0ac96845b 100644 --- a/extensions/smallrye-reactive-messaging-amqp/deployment/src/test/java/io/quarkus/smallrye/reactivemessaging/amqp/devmode/nohttp/Producer.java +++ b/extensions/smallrye-reactive-messaging-amqp/deployment/src/test/java/io/quarkus/smallrye/reactivemessaging/amqp/devmode/nohttp/Producer.java @@ -1,18 +1,18 @@ package io.quarkus.smallrye.reactivemessaging.amqp.devmode.nohttp; import java.time.Duration; +import java.util.concurrent.Flow; import jakarta.enterprise.context.ApplicationScoped; import org.eclipse.microprofile.reactive.messaging.Outgoing; -import org.reactivestreams.Publisher; import io.smallrye.mutiny.Multi; @ApplicationScoped public class Producer { @Outgoing("source") - public Publisher<Long> generate() { + public Flow.Publisher<Long> generate() { return Multi.createFrom().ticks().every(Duration.ofMillis(200)) .onOverflow().drop() .map(i -> i * 1); diff --git a/extensions/smallrye-reactive-messaging/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/deployment/QuarkusMediatorConfigurationUtil.java b/extensions/smallrye-reactive-messaging/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/deployment/QuarkusMediatorConfigurationUtil.java index 59b0c766fae651..858b60cfd41953 100644 --- a/extensions/smallrye-reactive-messaging/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/deployment/QuarkusMediatorConfigurationUtil.java +++ b/extensions/smallrye-reactive-messaging/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/deployment/QuarkusMediatorConfigurationUtil.java @@ -114,11 +114,8 @@ public static QuarkusMediatorConfiguration create(MethodInfo methodInfo, boolean configuration.setProduction(validationOutput.getProduction()); configuration.setConsumption(validationOutput.getConsumption()); configuration.setIngestedPayloadType(validationOutput.getIngestedPayloadType()); - if (validationOutput.getUseBuilderTypes()) { - configuration.setUseBuilderTypes(validationOutput.getUseBuilderTypes()); - } else { - configuration.setUseBuilderTypes(false); - } + configuration.setUseBuilderTypes(validationOutput.getUseBuilderTypes()); + configuration.setUseReactiveStreams(validationOutput.getUseReactiveStreams()); if (acknowledgment == null) { acknowledgment = mediatorConfigurationSupport.processDefaultAcknowledgement(shape, diff --git a/extensions/smallrye-reactive-messaging/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/deployment/ReactiveMessagingDotNames.java b/extensions/smallrye-reactive-messaging/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/deployment/ReactiveMessagingDotNames.java index b43bddc727e68e..b9db175cfb7844 100644 --- a/extensions/smallrye-reactive-messaging/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/deployment/ReactiveMessagingDotNames.java +++ b/extensions/smallrye-reactive-messaging/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/deployment/ReactiveMessagingDotNames.java @@ -55,9 +55,8 @@ public final class ReactiveMessagingDotNames { static final DotName EMITTER_FACTORY_FOR = DotName.createSimple(EmitterFactoryFor.class.getName()); static final DotName INCOMING_CONNECTOR_FACTORY = DotName.createSimple(IncomingConnectorFactory.class.getName()); - static final DotName OUTGOING_CONNECTOR_FACTORY = DotName.createSimple(OutgoingConnectorFactory.class.getName()); - static final DotName INBOUND_CONNECTOR = DotName.createSimple(InboundConnector.class.getName()); + static final DotName OUTGOING_CONNECTOR_FACTORY = DotName.createSimple(OutgoingConnectorFactory.class.getName()); static final DotName OUTBOUND_CONNECTOR = DotName.createSimple(OutboundConnector.class.getName()); static final DotName SMALLRYE_BLOCKING = DotName.createSimple(io.smallrye.common.annotation.Blocking.class.getName()); diff --git a/extensions/smallrye-reactive-messaging/deployment/src/test/java/io/quarkus/smallrye/reactivemessaging/SmallRyeBlockingSubscriberTest.java b/extensions/smallrye-reactive-messaging/deployment/src/test/java/io/quarkus/smallrye/reactivemessaging/SmallRyeBlockingSubscriberTest.java index 4d89322315f354..68af7c38ffd812 100644 --- a/extensions/smallrye-reactive-messaging/deployment/src/test/java/io/quarkus/smallrye/reactivemessaging/SmallRyeBlockingSubscriberTest.java +++ b/extensions/smallrye-reactive-messaging/deployment/src/test/java/io/quarkus/smallrye/reactivemessaging/SmallRyeBlockingSubscriberTest.java @@ -5,6 +5,7 @@ import java.io.File; import java.util.List; +import java.util.concurrent.Flow; import java.util.stream.Collectors; import jakarta.enterprise.context.ApplicationScoped; @@ -13,7 +14,6 @@ import org.eclipse.microprofile.reactive.messaging.Outgoing; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.RegisterExtension; -import org.reactivestreams.Publisher; import io.quarkus.smallrye.reactivemessaging.blocking.beans.IncomingUsingSmallRyeBlocking; import io.quarkus.test.QuarkusUnitTest; @@ -47,7 +47,7 @@ public void testIncomingUsingRunOnWorkerThread() { @ApplicationScoped public static class ProduceIn { @Outgoing("in") - public Publisher<String> produce() { + public Flow.Publisher<String> produce() { return Multi.createFrom().items("a", "b", "c", "d", "e", "f"); } } diff --git a/extensions/smallrye-reactive-messaging/deployment/src/test/java/io/quarkus/smallrye/reactivemessaging/TransactionalSubscriberTest.java b/extensions/smallrye-reactive-messaging/deployment/src/test/java/io/quarkus/smallrye/reactivemessaging/TransactionalSubscriberTest.java index 04b6134a478a72..81e3a7639d388a 100644 --- a/extensions/smallrye-reactive-messaging/deployment/src/test/java/io/quarkus/smallrye/reactivemessaging/TransactionalSubscriberTest.java +++ b/extensions/smallrye-reactive-messaging/deployment/src/test/java/io/quarkus/smallrye/reactivemessaging/TransactionalSubscriberTest.java @@ -5,6 +5,7 @@ import java.io.File; import java.util.List; +import java.util.concurrent.Flow; import java.util.stream.Collectors; import jakarta.enterprise.context.ApplicationScoped; @@ -13,7 +14,6 @@ import org.eclipse.microprofile.reactive.messaging.Outgoing; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.RegisterExtension; -import org.reactivestreams.Publisher; import io.quarkus.smallrye.reactivemessaging.blocking.beans.IncomingUsingTransactional; import io.quarkus.test.QuarkusUnitTest; @@ -47,7 +47,7 @@ public void testIncomingUsingRunOnWorkerThread() { @ApplicationScoped public static class ProduceIn { @Outgoing("in") - public Publisher<String> produce() { + public Flow.Publisher<String> produce() { return Multi.createFrom().items("a", "b", "c", "d", "e", "f"); } } diff --git a/extensions/smallrye-reactive-messaging/deployment/src/test/java/io/quarkus/smallrye/reactivemessaging/blocking/BlockingSubscriberTest.java b/extensions/smallrye-reactive-messaging/deployment/src/test/java/io/quarkus/smallrye/reactivemessaging/blocking/BlockingSubscriberTest.java index 76a4eb3fd2eb66..9db7283577cea9 100644 --- a/extensions/smallrye-reactive-messaging/deployment/src/test/java/io/quarkus/smallrye/reactivemessaging/blocking/BlockingSubscriberTest.java +++ b/extensions/smallrye-reactive-messaging/deployment/src/test/java/io/quarkus/smallrye/reactivemessaging/blocking/BlockingSubscriberTest.java @@ -5,6 +5,7 @@ import java.io.File; import java.util.List; +import java.util.concurrent.Flow; import java.util.stream.Collectors; import jakarta.enterprise.context.ApplicationScoped; @@ -13,7 +14,6 @@ import org.eclipse.microprofile.reactive.messaging.Outgoing; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.RegisterExtension; -import org.reactivestreams.Publisher; import io.quarkus.smallrye.reactivemessaging.blocking.beans.IncomingCustomTwoBlockingBean; import io.quarkus.smallrye.reactivemessaging.blocking.beans.IncomingCustomUnorderedBlockingBean; @@ -72,7 +72,7 @@ public void testIncomingBlockingCustomPoolTwo() { public static class ProduceIn { @Outgoing("in") @Broadcast(2) - public Publisher<String> produce() { + public Flow.Publisher<String> produce() { return Multi.createFrom().items("a", "b", "c", "d", "e", "f"); } } diff --git a/extensions/smallrye-reactive-messaging/deployment/src/test/java/io/quarkus/smallrye/reactivemessaging/blocking/SmallRyeBlockingSubscriberTest.java b/extensions/smallrye-reactive-messaging/deployment/src/test/java/io/quarkus/smallrye/reactivemessaging/blocking/SmallRyeBlockingSubscriberTest.java index f2792b6cca643b..dddaaf88382921 100644 --- a/extensions/smallrye-reactive-messaging/deployment/src/test/java/io/quarkus/smallrye/reactivemessaging/blocking/SmallRyeBlockingSubscriberTest.java +++ b/extensions/smallrye-reactive-messaging/deployment/src/test/java/io/quarkus/smallrye/reactivemessaging/blocking/SmallRyeBlockingSubscriberTest.java @@ -5,6 +5,7 @@ import java.io.File; import java.util.List; +import java.util.concurrent.Flow; import java.util.stream.Collectors; import jakarta.enterprise.context.ApplicationScoped; @@ -13,7 +14,6 @@ import org.eclipse.microprofile.reactive.messaging.Outgoing; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.RegisterExtension; -import org.reactivestreams.Publisher; import io.quarkus.smallrye.reactivemessaging.blocking.beans.IncomingUsingSmallRyeBlocking; import io.quarkus.test.QuarkusUnitTest; @@ -47,7 +47,7 @@ public void testIncomingUsingRunOnWorkerThread() { @ApplicationScoped public static class ProduceIn { @Outgoing("in") - public Publisher<String> produce() { + public Flow.Publisher<String> produce() { return Multi.createFrom().items("a", "b", "c", "d", "e", "f"); } } diff --git a/extensions/smallrye-reactive-messaging/deployment/src/test/java/io/quarkus/smallrye/reactivemessaging/channels/DeprecatedChannelConsumer.java b/extensions/smallrye-reactive-messaging/deployment/src/test/java/io/quarkus/smallrye/reactivemessaging/channels/DeprecatedChannelConsumer.java index 9ec142ab201a9c..084ddf6a2678c7 100644 --- a/extensions/smallrye-reactive-messaging/deployment/src/test/java/io/quarkus/smallrye/reactivemessaging/channels/DeprecatedChannelConsumer.java +++ b/extensions/smallrye-reactive-messaging/deployment/src/test/java/io/quarkus/smallrye/reactivemessaging/channels/DeprecatedChannelConsumer.java @@ -13,6 +13,7 @@ import io.smallrye.mutiny.Multi; import io.smallrye.reactive.messaging.annotations.Channel; +import mutiny.zero.flow.adapters.AdaptersToFlow; @ApplicationScoped public class DeprecatedChannelConsumer { @@ -22,7 +23,7 @@ public class DeprecatedChannelConsumer { Publisher<Message<String>> sourceStream; public List<String> consume() { - return Multi.createFrom().publisher(sourceStream) + return Multi.createFrom().publisher(AdaptersToFlow.publisher(sourceStream)) .onItem().transform(Message::getPayload) .collect().asList() .await().indefinitely(); diff --git a/extensions/smallrye-reactive-messaging/deployment/src/test/java/io/quarkus/smallrye/reactivemessaging/signatures/IncomingsTest.java b/extensions/smallrye-reactive-messaging/deployment/src/test/java/io/quarkus/smallrye/reactivemessaging/signatures/IncomingsTest.java index 4102b9e5124a62..8800f0cc069045 100644 --- a/extensions/smallrye-reactive-messaging/deployment/src/test/java/io/quarkus/smallrye/reactivemessaging/signatures/IncomingsTest.java +++ b/extensions/smallrye-reactive-messaging/deployment/src/test/java/io/quarkus/smallrye/reactivemessaging/signatures/IncomingsTest.java @@ -5,6 +5,7 @@ import java.util.List; import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.Flow; import jakarta.enterprise.context.ApplicationScoped; import jakarta.inject.Inject; @@ -13,7 +14,6 @@ import org.eclipse.microprofile.reactive.messaging.Outgoing; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.RegisterExtension; -import org.reactivestreams.Publisher; import io.quarkus.test.QuarkusUnitTest; import io.smallrye.mutiny.Multi; @@ -39,7 +39,7 @@ public void testIncomingsWithTwoSources() { public static class ProducerOnA { @Outgoing("a") - public Publisher<String> produce() { + public Flow.Publisher<String> produce() { return Multi.createFrom().items("a", "b", "c"); } @@ -49,7 +49,7 @@ public Publisher<String> produce() { public static class ProducerOnB { @Outgoing("b") - public Publisher<String> produce() { + public Flow.Publisher<String> produce() { return Multi.createFrom().items("d", "e", "f"); } diff --git a/extensions/smallrye-reactive-messaging/deployment/src/test/java/io/quarkus/smallrye/reactivemessaging/wiring/ConnectorAttachmentChannelTest.java b/extensions/smallrye-reactive-messaging/deployment/src/test/java/io/quarkus/smallrye/reactivemessaging/wiring/ConnectorAttachmentChannelTest.java index 647fea7bb98f75..3b9650e229ec0d 100644 --- a/extensions/smallrye-reactive-messaging/deployment/src/test/java/io/quarkus/smallrye/reactivemessaging/wiring/ConnectorAttachmentChannelTest.java +++ b/extensions/smallrye-reactive-messaging/deployment/src/test/java/io/quarkus/smallrye/reactivemessaging/wiring/ConnectorAttachmentChannelTest.java @@ -23,6 +23,7 @@ import io.quarkus.test.QuarkusUnitTest; import io.smallrye.mutiny.Multi; +import mutiny.zero.flow.adapters.AdaptersToReactiveStreams; public class ConnectorAttachmentChannelTest { @@ -45,7 +46,8 @@ static class MyDummyConnector implements IncomingConnectorFactory { @Override public PublisherBuilder<? extends Message<?>> getPublisherBuilder(Config config) { - return ReactiveStreams.fromPublisher(Multi.createFrom().range(0, 5).map(Message::of)); + return ReactiveStreams + .fromPublisher(AdaptersToReactiveStreams.publisher(Multi.createFrom().range(0, 5).map(Message::of))); } } diff --git a/extensions/smallrye-reactive-messaging/deployment/src/test/java/io/quarkus/smallrye/reactivemessaging/wiring/ConnectorAttachmentCustomEmitterTest.java b/extensions/smallrye-reactive-messaging/deployment/src/test/java/io/quarkus/smallrye/reactivemessaging/wiring/ConnectorAttachmentCustomEmitterTest.java index 37ef0f8a9fc516..e8dfb8b826dc68 100644 --- a/extensions/smallrye-reactive-messaging/deployment/src/test/java/io/quarkus/smallrye/reactivemessaging/wiring/ConnectorAttachmentCustomEmitterTest.java +++ b/extensions/smallrye-reactive-messaging/deployment/src/test/java/io/quarkus/smallrye/reactivemessaging/wiring/ConnectorAttachmentCustomEmitterTest.java @@ -5,6 +5,7 @@ import java.util.List; import java.util.Objects; import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.Flow; import jakarta.annotation.PostConstruct; import jakarta.enterprise.context.ApplicationScoped; @@ -24,7 +25,6 @@ import org.jboss.shrinkwrap.api.spec.JavaArchive; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.RegisterExtension; -import org.reactivestreams.Publisher; import io.quarkus.test.QuarkusUnitTest; import io.smallrye.mutiny.Multi; @@ -89,7 +89,7 @@ interface CustomEmitter<T> extends EmitterType { static class CustomEmitterImpl<T> implements MessagePublisherProvider<T>, CustomEmitter<T> { @Override - public Publisher<Message<? extends T>> getPublisher() { + public Flow.Publisher<Message<? extends T>> getPublisher() { return Multi.createFrom().range(0, 5).map(Message::of).map(m -> (Message<T>) m); } } diff --git a/extensions/smallrye-reactive-messaging/deployment/src/test/java/io/quarkus/smallrye/reactivemessaging/wiring/ConnectorAttachmentIncomingTest.java b/extensions/smallrye-reactive-messaging/deployment/src/test/java/io/quarkus/smallrye/reactivemessaging/wiring/ConnectorAttachmentIncomingTest.java index 9c0842b500c356..4968931d27d56d 100644 --- a/extensions/smallrye-reactive-messaging/deployment/src/test/java/io/quarkus/smallrye/reactivemessaging/wiring/ConnectorAttachmentIncomingTest.java +++ b/extensions/smallrye-reactive-messaging/deployment/src/test/java/io/quarkus/smallrye/reactivemessaging/wiring/ConnectorAttachmentIncomingTest.java @@ -22,6 +22,7 @@ import io.quarkus.test.QuarkusUnitTest; import io.smallrye.mutiny.Multi; +import mutiny.zero.flow.adapters.AdaptersToReactiveStreams; public class ConnectorAttachmentIncomingTest { @@ -44,7 +45,8 @@ static class MyDummyConnector implements IncomingConnectorFactory { @Override public PublisherBuilder<? extends Message<?>> getPublisherBuilder(Config config) { - return ReactiveStreams.fromPublisher(Multi.createFrom().range(0, 5).map(Message::of)); + return ReactiveStreams + .fromPublisher(AdaptersToReactiveStreams.publisher(Multi.createFrom().range(0, 5).map(Message::of))); } } diff --git a/extensions/smallrye-reactive-messaging/deployment/src/test/java/io/quarkus/smallrye/reactivemessaging/wiring/DisabledConnectorAttachmentIncomingTest.java b/extensions/smallrye-reactive-messaging/deployment/src/test/java/io/quarkus/smallrye/reactivemessaging/wiring/DisabledConnectorAttachmentIncomingTest.java index 82a04a471ff403..dd059d54a748d1 100644 --- a/extensions/smallrye-reactive-messaging/deployment/src/test/java/io/quarkus/smallrye/reactivemessaging/wiring/DisabledConnectorAttachmentIncomingTest.java +++ b/extensions/smallrye-reactive-messaging/deployment/src/test/java/io/quarkus/smallrye/reactivemessaging/wiring/DisabledConnectorAttachmentIncomingTest.java @@ -23,6 +23,7 @@ import io.quarkus.test.QuarkusUnitTest; import io.smallrye.mutiny.Multi; +import mutiny.zero.flow.adapters.AdaptersToReactiveStreams; public class DisabledConnectorAttachmentIncomingTest { @@ -48,7 +49,8 @@ static class MyDummyConnector implements IncomingConnectorFactory { @Override public PublisherBuilder<? extends Message<?>> getPublisherBuilder(Config config) { - return ReactiveStreams.fromPublisher(Multi.createFrom().range(0, 5).map(Message::of)); + return ReactiveStreams + .fromPublisher(AdaptersToReactiveStreams.publisher(Multi.createFrom().range(0, 5).map(Message::of))); } } diff --git a/extensions/smallrye-reactive-messaging/deployment/src/test/java/io/quarkus/smallrye/reactivemessaging/wiring/IncomingChannelWithDotsTest.java b/extensions/smallrye-reactive-messaging/deployment/src/test/java/io/quarkus/smallrye/reactivemessaging/wiring/IncomingChannelWithDotsTest.java index 2217eebb3161e4..9ffb14588289b6 100644 --- a/extensions/smallrye-reactive-messaging/deployment/src/test/java/io/quarkus/smallrye/reactivemessaging/wiring/IncomingChannelWithDotsTest.java +++ b/extensions/smallrye-reactive-messaging/deployment/src/test/java/io/quarkus/smallrye/reactivemessaging/wiring/IncomingChannelWithDotsTest.java @@ -22,6 +22,7 @@ import io.quarkus.test.QuarkusUnitTest; import io.smallrye.mutiny.Multi; +import mutiny.zero.flow.adapters.AdaptersToReactiveStreams; public class IncomingChannelWithDotsTest { @@ -44,7 +45,8 @@ static class MyDummyConnector implements IncomingConnectorFactory { @Override public PublisherBuilder<? extends Message<?>> getPublisherBuilder(Config config) { - return ReactiveStreams.fromPublisher(Multi.createFrom().range(0, 5).map(Message::of)); + return ReactiveStreams + .fromPublisher(AdaptersToReactiveStreams.publisher(Multi.createFrom().range(0, 5).map(Message::of))); } } diff --git a/extensions/smallrye-reactive-messaging/deployment/src/test/java/io/quarkus/smallrye/reactivemessaging/wiring/TwoConnectorsAttachmentIncomingTest.java b/extensions/smallrye-reactive-messaging/deployment/src/test/java/io/quarkus/smallrye/reactivemessaging/wiring/TwoConnectorsAttachmentIncomingTest.java index 68f708c54b04f3..5ffe5ef6888cf3 100644 --- a/extensions/smallrye-reactive-messaging/deployment/src/test/java/io/quarkus/smallrye/reactivemessaging/wiring/TwoConnectorsAttachmentIncomingTest.java +++ b/extensions/smallrye-reactive-messaging/deployment/src/test/java/io/quarkus/smallrye/reactivemessaging/wiring/TwoConnectorsAttachmentIncomingTest.java @@ -23,6 +23,7 @@ import io.quarkus.test.QuarkusUnitTest; import io.smallrye.mutiny.Multi; +import mutiny.zero.flow.adapters.AdaptersToReactiveStreams; public class TwoConnectorsAttachmentIncomingTest { @@ -47,7 +48,8 @@ static class MyDummyConnector implements IncomingConnectorFactory { @Override public PublisherBuilder<? extends Message<?>> getPublisherBuilder(Config config) { - return ReactiveStreams.fromPublisher(Multi.createFrom().range(0, 5).map(Message::of)); + return ReactiveStreams + .fromPublisher(AdaptersToReactiveStreams.publisher(Multi.createFrom().range(0, 5).map(Message::of))); } } @@ -57,7 +59,8 @@ static class MySecondDummyConnector implements IncomingConnectorFactory { @Override public PublisherBuilder<? extends Message<?>> getPublisherBuilder(Config config) { - return ReactiveStreams.fromPublisher(Multi.createFrom().range(0, 5).map(Message::of)); + return ReactiveStreams + .fromPublisher(AdaptersToReactiveStreams.publisher(Multi.createFrom().range(0, 5).map(Message::of))); } } diff --git a/extensions/smallrye-reactive-messaging/runtime/src/main/java/io/quarkus/smallrye/reactivemessaging/runtime/DuplicatedContextConnectorFactoryInterceptor.java b/extensions/smallrye-reactive-messaging/runtime/src/main/java/io/quarkus/smallrye/reactivemessaging/runtime/DuplicatedContextConnectorFactoryInterceptor.java index 3bb3703f4af2ec..7ac562c0b1f4a9 100644 --- a/extensions/smallrye-reactive-messaging/runtime/src/main/java/io/quarkus/smallrye/reactivemessaging/runtime/DuplicatedContextConnectorFactoryInterceptor.java +++ b/extensions/smallrye-reactive-messaging/runtime/src/main/java/io/quarkus/smallrye/reactivemessaging/runtime/DuplicatedContextConnectorFactoryInterceptor.java @@ -1,6 +1,7 @@ package io.quarkus.smallrye.reactivemessaging.runtime; import java.util.Optional; +import java.util.concurrent.Flow; import jakarta.annotation.Priority; import jakarta.interceptor.AroundInvoke; @@ -9,7 +10,6 @@ import org.eclipse.microprofile.reactive.messaging.Message; import org.eclipse.microprofile.reactive.streams.operators.PublisherBuilder; -import org.reactivestreams.Publisher; import io.quarkus.vertx.core.runtime.context.VertxContextSafetyToggle; import io.smallrye.common.vertx.VertxContext; @@ -29,7 +29,7 @@ public Object intercept(InvocationContext ctx) throws Exception { return result.map(DuplicatedContextConnectorFactoryInterceptor::setMessageContextSafe); } if (ctx.getMethod().getName().equals("getPublisher")) { - Publisher<Message<?>> result = (Publisher<Message<?>>) ctx.proceed(); + Flow.Publisher<Message<?>> result = (Flow.Publisher<Message<?>>) ctx.proceed(); return Multi.createFrom().publisher(result) .map(DuplicatedContextConnectorFactoryInterceptor::setMessageContextSafe); } diff --git a/extensions/smallrye-reactive-messaging/runtime/src/main/java/io/quarkus/smallrye/reactivemessaging/runtime/QuarkusMediatorConfiguration.java b/extensions/smallrye-reactive-messaging/runtime/src/main/java/io/quarkus/smallrye/reactivemessaging/runtime/QuarkusMediatorConfiguration.java index 48c3309a2dc1d3..054e1111d89602 100644 --- a/extensions/smallrye-reactive-messaging/runtime/src/main/java/io/quarkus/smallrye/reactivemessaging/runtime/QuarkusMediatorConfiguration.java +++ b/extensions/smallrye-reactive-messaging/runtime/src/main/java/io/quarkus/smallrye/reactivemessaging/runtime/QuarkusMediatorConfiguration.java @@ -53,6 +53,8 @@ public class QuarkusMediatorConfiguration implements MediatorConfiguration { private Type ingestedPayload; + private boolean useReactiveStreams = false; + public String getBeanId() { return beanId; } @@ -260,4 +262,17 @@ public void setBlockingExecutionOrdered(boolean blockingExecutionOrdered) { public void setWorkerPoolName(String workerPoolName) { this.workerPoolName = workerPoolName; } + + public boolean isUseReactiveStreams() { + return useReactiveStreams; + } + + public void setUseReactiveStreams(boolean usesReactiveStreams) { + this.useReactiveStreams = usesReactiveStreams; + } + + @Override + public boolean usesReactiveStreams() { + return useReactiveStreams; + } } diff --git a/extensions/smallrye-reactive-messaging/runtime/src/main/java/io/quarkus/smallrye/reactivemessaging/runtime/devmode/DevModeSupportConnectorFactoryInterceptor.java b/extensions/smallrye-reactive-messaging/runtime/src/main/java/io/quarkus/smallrye/reactivemessaging/runtime/devmode/DevModeSupportConnectorFactoryInterceptor.java index e006416e9a4b8d..ffeaae2e883b6f 100644 --- a/extensions/smallrye-reactive-messaging/runtime/src/main/java/io/quarkus/smallrye/reactivemessaging/runtime/devmode/DevModeSupportConnectorFactoryInterceptor.java +++ b/extensions/smallrye-reactive-messaging/runtime/src/main/java/io/quarkus/smallrye/reactivemessaging/runtime/devmode/DevModeSupportConnectorFactoryInterceptor.java @@ -1,6 +1,7 @@ package io.quarkus.smallrye.reactivemessaging.runtime.devmode; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Flow; import java.util.function.Supplier; import jakarta.annotation.Priority; @@ -12,7 +13,6 @@ import org.eclipse.microprofile.reactive.streams.operators.PublisherBuilder; import org.eclipse.microprofile.reactive.streams.operators.ReactiveStreams; import org.eclipse.microprofile.reactive.streams.operators.SubscriberBuilder; -import org.reactivestreams.Publisher; import org.reactivestreams.Subscriber; import org.reactivestreams.Subscription; @@ -50,7 +50,7 @@ public Object intercept(InvocationContext ctx) throws Exception { }); } if (ctx.getMethod().getName().equals("getPublisher")) { - Publisher<Message<?>> result = (Publisher<Message<?>>) ctx.proceed(); + Flow.Publisher<Message<?>> result = (Flow.Publisher<Message<?>>) ctx.proceed(); return Multi.createFrom().publisher(result) .onItem().transformToUniAndConcatenate(msg -> Uni.createFrom().emitter(e -> { onMessage.get().whenComplete((restarted, error) -> { @@ -94,12 +94,12 @@ public void onComplete() { }); } if (ctx.getMethod().getName().equals("getSubscriber")) { - Subscriber<Message<?>> result = (Subscriber<Message<?>>) ctx.proceed(); - return new Subscriber<Message<?>>() { - private Subscriber<Message<?>> subscriber; + Flow.Subscriber<Message<?>> result = (Flow.Subscriber<Message<?>>) ctx.proceed(); + return new Flow.Subscriber<Message<?>>() { + private Flow.Subscriber<Message<?>> subscriber; @Override - public void onSubscribe(Subscription s) { + public void onSubscribe(Flow.Subscription s) { subscriber = result; subscriber.onSubscribe(s); } diff --git a/independent-projects/arc/pom.xml b/independent-projects/arc/pom.xml index beffa29e305c8a..289d0292e34b77 100644 --- a/independent-projects/arc/pom.xml +++ b/independent-projects/arc/pom.xml @@ -52,7 +52,7 @@ <version.jakarta-annotation>2.1.1</version.jakarta-annotation> <version.gizmo>1.6.0.Final</version.gizmo> <version.jpa>3.0.0</version.jpa> - <version.mutiny>1.8.0</version.mutiny> + <version.mutiny>2.1.0</version.mutiny> <version.surefire.plugin>3.0.0-M8</version.surefire.plugin> <nexus-staging-maven-plugin.version>1.6.8</nexus-staging-maven-plugin.version> diff --git a/independent-projects/qute/pom.xml b/independent-projects/qute/pom.xml index 2000c354185aad..3b50866e86d90d 100644 --- a/independent-projects/qute/pom.xml +++ b/independent-projects/qute/pom.xml @@ -47,7 +47,7 @@ <version.jboss-logging>3.5.0.Final</version.jboss-logging> <version.surefire.plugin>3.0.0-M8</version.surefire.plugin> <version.nexus-staging-maven-plugin>1.6.8</version.nexus-staging-maven-plugin> - <version.smallrye-mutiny>1.9.0</version.smallrye-mutiny> + <version.smallrye-mutiny>2.1.0</version.smallrye-mutiny> </properties> <modules> diff --git a/independent-projects/resteasy-reactive/client/runtime/src/main/java/org/jboss/resteasy/reactive/client/impl/multipart/MultiByteHttpData.java b/independent-projects/resteasy-reactive/client/runtime/src/main/java/org/jboss/resteasy/reactive/client/impl/multipart/MultiByteHttpData.java index 3223884280ba78..f85b6c136b5d4e 100644 --- a/independent-projects/resteasy-reactive/client/runtime/src/main/java/org/jboss/resteasy/reactive/client/impl/multipart/MultiByteHttpData.java +++ b/independent-projects/resteasy-reactive/client/runtime/src/main/java/org/jboss/resteasy/reactive/client/impl/multipart/MultiByteHttpData.java @@ -20,10 +20,10 @@ import java.io.InputStream; import java.nio.charset.Charset; import java.util.concurrent.Executor; +import java.util.concurrent.Flow.Subscription; import java.util.function.Consumer; import org.jboss.logging.Logger; -import org.reactivestreams.Subscription; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; diff --git a/independent-projects/resteasy-reactive/common/processor/src/main/java/org/jboss/resteasy/reactive/common/processor/ResteasyReactiveDotNames.java b/independent-projects/resteasy-reactive/common/processor/src/main/java/org/jboss/resteasy/reactive/common/processor/ResteasyReactiveDotNames.java index a80eba40712df0..dd8e13857d185b 100644 --- a/independent-projects/resteasy-reactive/common/processor/src/main/java/org/jboss/resteasy/reactive/common/processor/ResteasyReactiveDotNames.java +++ b/independent-projects/resteasy-reactive/common/processor/src/main/java/org/jboss/resteasy/reactive/common/processor/ResteasyReactiveDotNames.java @@ -21,6 +21,7 @@ import java.util.SortedSet; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; +import java.util.concurrent.Flow.Publisher; import jakarta.annotation.Priority; import jakarta.enterprise.context.ApplicationScoped; @@ -93,7 +94,6 @@ import org.jboss.resteasy.reactive.RestSseElementType; import org.jboss.resteasy.reactive.RestStreamElementType; import org.jboss.resteasy.reactive.Separator; -import org.reactivestreams.Publisher; import io.smallrye.common.annotation.Blocking; import io.smallrye.common.annotation.NonBlocking; @@ -205,6 +205,7 @@ public final class ResteasyReactiveDotNames { public static final DotName COMPLETION_STAGE = DotName.createSimple(CompletionStage.class.getName()); public static final DotName COMPLETABLE_FUTURE = DotName.createSimple(CompletableFuture.class.getName()); public static final DotName PUBLISHER = DotName.createSimple(Publisher.class.getName()); + public static final DotName LEGACY_PUBLISHER = DotName.createSimple(org.reactivestreams.Publisher.class.getName()); public static final DotName REST_RESPONSE = DotName.createSimple(RestResponse.class.getName()); public static final DotName INTEGER = DotName.createSimple(Integer.class.getName()); diff --git a/independent-projects/resteasy-reactive/common/runtime/pom.xml b/independent-projects/resteasy-reactive/common/runtime/pom.xml index 69002f5afe25bb..2447d17a347488 100644 --- a/independent-projects/resteasy-reactive/common/runtime/pom.xml +++ b/independent-projects/resteasy-reactive/common/runtime/pom.xml @@ -22,6 +22,14 @@ <groupId>io.smallrye.reactive</groupId> <artifactId>mutiny</artifactId> </dependency> + <dependency> + <groupId>org.reactivestreams</groupId> + <artifactId>reactive-streams</artifactId> + </dependency> + <dependency> + <groupId>io.smallrye.reactive</groupId> + <artifactId>mutiny-zero-flow-adapters</artifactId> + </dependency> <dependency> <groupId>io.smallrye.common</groupId> <artifactId>smallrye-common-annotation</artifactId> diff --git a/independent-projects/resteasy-reactive/pom.xml b/independent-projects/resteasy-reactive/pom.xml index df3d4d1aad2ea6..4a0ee489451c4a 100644 --- a/independent-projects/resteasy-reactive/pom.xml +++ b/independent-projects/resteasy-reactive/pom.xml @@ -59,7 +59,7 @@ <version.surefire.plugin>3.0.0-M8</version.surefire.plugin> <nexus-staging-maven-plugin.version>1.6.8</nexus-staging-maven-plugin.version> - <mutiny.version>1.9.0</mutiny.version> + <mutiny.version>2.1.0</mutiny.version> <smallrye-common.version>1.13.2</smallrye-common.version> <vertx.version>4.3.5</vertx.version> <rest-assured.version>4.5.1</rest-assured.version> @@ -70,7 +70,9 @@ <yasson.version>3.0.2</yasson.version> <jakarta.json.bind-api.version>3.0.0</jakarta.json.bind-api.version> <awaitility.version>4.2.0</awaitility.version> - <smallrye-mutiny-vertx-core.version>2.25.0</smallrye-mutiny-vertx-core.version> + <smallrye-mutiny-vertx-core.version>3.0.0</smallrye-mutiny-vertx-core.version> + <reactive-streams.version>1.0.4</reactive-streams.version> + <mutiny-zero.version>1.0.0</mutiny-zero.version> <!-- Forbidden API checks --> <forbiddenapis-maven-plugin.version>3.4</forbiddenapis-maven-plugin.version> @@ -217,11 +219,22 @@ <version>${vertx.version}</version> </dependency> + <dependency> + <groupId>org.reactivestreams</groupId> + <artifactId>reactive-streams</artifactId> + <version>${reactive-streams.version}</version> + </dependency> + <dependency> <groupId>io.smallrye.reactive</groupId> <artifactId>mutiny</artifactId> <version>${mutiny.version}</version> </dependency> + <dependency> + <groupId>io.smallrye.reactive</groupId> + <artifactId>mutiny-zero-flow-adapters</artifactId> + <version>${mutiny-zero.version}</version> + </dependency> <dependency> <groupId>io.smallrye.reactive</groupId> <artifactId>smallrye-mutiny-vertx-core</artifactId> diff --git a/independent-projects/resteasy-reactive/server/processor/src/main/java/org/jboss/resteasy/reactive/server/processor/scanning/AsyncReturnTypeScanner.java b/independent-projects/resteasy-reactive/server/processor/src/main/java/org/jboss/resteasy/reactive/server/processor/scanning/AsyncReturnTypeScanner.java index d04a8d4133af48..5cf0e9ce56ad48 100644 --- a/independent-projects/resteasy-reactive/server/processor/src/main/java/org/jboss/resteasy/reactive/server/processor/scanning/AsyncReturnTypeScanner.java +++ b/independent-projects/resteasy-reactive/server/processor/src/main/java/org/jboss/resteasy/reactive/server/processor/scanning/AsyncReturnTypeScanner.java @@ -2,6 +2,7 @@ import static org.jboss.resteasy.reactive.common.processor.ResteasyReactiveDotNames.COMPLETABLE_FUTURE; import static org.jboss.resteasy.reactive.common.processor.ResteasyReactiveDotNames.COMPLETION_STAGE; +import static org.jboss.resteasy.reactive.common.processor.ResteasyReactiveDotNames.LEGACY_PUBLISHER; import static org.jboss.resteasy.reactive.common.processor.ResteasyReactiveDotNames.MULTI; import static org.jboss.resteasy.reactive.common.processor.ResteasyReactiveDotNames.PUBLISHER; import static org.jboss.resteasy.reactive.common.processor.ResteasyReactiveDotNames.UNI; @@ -32,7 +33,7 @@ public List<HandlerChainCustomizer> scan(MethodInfo method, ClassInfo actualEndp return Collections.singletonList(new FixedHandlerChainCustomizer(new UniResponseHandler(), HandlerChainCustomizer.Phase.AFTER_METHOD_INVOKE)); } - if (returnTypeName.equals(MULTI) || returnTypeName.equals(PUBLISHER)) { + if (returnTypeName.equals(MULTI) || returnTypeName.equals(PUBLISHER) || returnTypeName.equals(LEGACY_PUBLISHER)) { return Collections.singletonList(new FixedHandlerChainCustomizer(new PublisherResponseHandler(), HandlerChainCustomizer.Phase.AFTER_METHOD_INVOKE)); } @@ -43,6 +44,7 @@ public List<HandlerChainCustomizer> scan(MethodInfo method, ClassInfo actualEndp public boolean isMethodSignatureAsync(MethodInfo method) { DotName returnTypeName = method.returnType().name(); return returnTypeName.equals(COMPLETION_STAGE) || returnTypeName.equals(COMPLETABLE_FUTURE) || - returnTypeName.equals(UNI) || returnTypeName.equals(MULTI) || returnTypeName.equals(PUBLISHER); + returnTypeName.equals(UNI) || returnTypeName.equals(MULTI) || returnTypeName.equals(PUBLISHER) || + returnTypeName.equals(LEGACY_PUBLISHER); } } diff --git a/independent-projects/resteasy-reactive/server/runtime/src/main/java/org/jboss/resteasy/reactive/server/handlers/PublisherResponseHandler.java b/independent-projects/resteasy-reactive/server/runtime/src/main/java/org/jboss/resteasy/reactive/server/handlers/PublisherResponseHandler.java index c06a031fe77133..d10f6fcfdaecd0 100644 --- a/independent-projects/resteasy-reactive/server/runtime/src/main/java/org/jboss/resteasy/reactive/server/handlers/PublisherResponseHandler.java +++ b/independent-projects/resteasy-reactive/server/runtime/src/main/java/org/jboss/resteasy/reactive/server/handlers/PublisherResponseHandler.java @@ -6,6 +6,9 @@ import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.concurrent.Flow.Publisher; +import java.util.concurrent.Flow.Subscriber; +import java.util.concurrent.Flow.Subscription; import java.util.function.BiConsumer; import java.util.function.BiFunction; import java.util.function.Consumer; @@ -23,9 +26,8 @@ import org.jboss.resteasy.reactive.server.model.HandlerChainCustomizer.Phase; import org.jboss.resteasy.reactive.server.spi.ServerRestHandler; import org.jboss.resteasy.reactive.server.spi.StreamingResponse; -import org.reactivestreams.Publisher; -import org.reactivestreams.Subscriber; -import org.reactivestreams.Subscription; + +import mutiny.zero.flow.adapters.AdaptersToFlow; /** * This handler is used to push streams of data to the client. @@ -246,8 +248,12 @@ public void handle(ResteasyReactiveRequestContext requestContext) @Override public void handle(ResteasyReactiveRequestContext requestContext) throws Exception { // FIXME: handle Response with entity being a Multi - if (requestContext.getResult() instanceof Publisher) { - Publisher<?> result = (Publisher<?>) requestContext.getResult(); + Object requestContextResult = requestContext.getResult(); + if (requestContextResult instanceof org.reactivestreams.Publisher) { + requestContextResult = AdaptersToFlow.publisher((org.reactivestreams.Publisher<?>) requestContextResult); + } + if (requestContextResult instanceof Publisher) { + Publisher<?> result = (Publisher<?>) requestContextResult; // FIXME: if we make a pretend Response and go through the normal route, we will get // media type negotiation and fixed entity writer set up, perhaps it's better than // cancelling the normal route? diff --git a/independent-projects/resteasy-reactive/server/vertx/src/test/java/org/jboss/resteasy/reactive/server/vertx/test/stream/StreamResource.java b/independent-projects/resteasy-reactive/server/vertx/src/test/java/org/jboss/resteasy/reactive/server/vertx/test/stream/StreamResource.java index 17d71f5f4bc5f0..41aebefcb6d1b7 100644 --- a/independent-projects/resteasy-reactive/server/vertx/src/test/java/org/jboss/resteasy/reactive/server/vertx/test/stream/StreamResource.java +++ b/independent-projects/resteasy-reactive/server/vertx/src/test/java/org/jboss/resteasy/reactive/server/vertx/test/stream/StreamResource.java @@ -1,6 +1,7 @@ package org.jboss.resteasy.reactive.server.vertx.test.stream; import java.util.Date; +import java.util.concurrent.Flow.Publisher; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; @@ -11,12 +12,12 @@ import jakarta.ws.rs.core.MediaType; import org.jboss.resteasy.reactive.common.util.MultiCollectors; -import org.reactivestreams.Publisher; import io.smallrye.mutiny.Multi; import io.smallrye.mutiny.Uni; import io.smallrye.mutiny.infrastructure.Infrastructure; import io.vertx.core.buffer.Buffer; +import mutiny.zero.flow.adapters.AdaptersToReactiveStreams; @Path("stream") public class StreamResource { @@ -44,6 +45,13 @@ public Publisher<String> getStreamedTextPublisher() { return Multi.createFrom().items("foo", "bar"); } + @Path("text/stream/legacy-publisher") + @GET + @Produces(MediaType.TEXT_PLAIN) + public org.reactivestreams.Publisher<String> getStreamedTextLegacyPublisher() { + return AdaptersToReactiveStreams.publisher(Multi.createFrom().items("foo", "bar")); + } + @Path("byte-arrays/collect") @GET @Produces(MediaType.TEXT_PLAIN) diff --git a/independent-projects/resteasy-reactive/server/vertx/src/test/java/org/jboss/resteasy/reactive/server/vertx/test/stream/StreamTestCase.java b/independent-projects/resteasy-reactive/server/vertx/src/test/java/org/jboss/resteasy/reactive/server/vertx/test/stream/StreamTestCase.java index 10ff84498cda4b..d97c50659ec71a 100644 --- a/independent-projects/resteasy-reactive/server/vertx/src/test/java/org/jboss/resteasy/reactive/server/vertx/test/stream/StreamTestCase.java +++ b/independent-projects/resteasy-reactive/server/vertx/src/test/java/org/jboss/resteasy/reactive/server/vertx/test/stream/StreamTestCase.java @@ -104,6 +104,10 @@ public void testStreaming() throws Exception { .then() .statusCode(200) .body(Matchers.equalTo("foobar")); + RestAssured.get("/stream/text/stream/legacy-publisher") + .then() + .statusCode(200) + .body(Matchers.equalTo("foobar")); RestAssured.get("/stream/text/collect") .then() .statusCode(200) diff --git a/integration-tests/devmode/src/test/java/io/quarkus/test/devconsole/MyProcessor.java b/integration-tests/devmode/src/test/java/io/quarkus/test/devconsole/MyProcessor.java index affecb9fdf6c63..d9a63239dff434 100644 --- a/integration-tests/devmode/src/test/java/io/quarkus/test/devconsole/MyProcessor.java +++ b/integration-tests/devmode/src/test/java/io/quarkus/test/devconsole/MyProcessor.java @@ -1,5 +1,7 @@ package io.quarkus.test.devconsole; +import java.util.concurrent.Flow; + import jakarta.enterprise.context.ApplicationScoped; import jakarta.enterprise.event.Observes; import jakarta.enterprise.inject.Instance; @@ -8,9 +10,6 @@ import org.eclipse.microprofile.reactive.messaging.Channel; import org.eclipse.microprofile.reactive.messaging.Incoming; import org.eclipse.microprofile.reactive.messaging.Outgoing; -import org.reactivestreams.Publisher; -import org.reactivestreams.Subscriber; -import org.reactivestreams.Subscription; import io.smallrye.reactive.messaging.annotations.Broadcast; import io.vertx.core.http.HttpServerResponse; @@ -21,14 +20,14 @@ public class MyProcessor { @Inject @Channel("processed") - Instance<Publisher<String>> channel; + Instance<Flow.Publisher<String>> channel; public void init(@Observes Router router) { router.get().handler(rc -> { HttpServerResponse response = rc.response(); - channel.get().subscribe(new Subscriber<String>() { + channel.get().subscribe(new Flow.Subscriber<String>() { @Override - public void onSubscribe(Subscription subscription) { + public void onSubscribe(Flow.Subscription subscription) { response.putHeader("Transfer-encoding", "chunked"); subscription.request(2); } diff --git a/integration-tests/main/src/main/java/io/quarkus/it/reactive/ReactiveStreamOpsResource.java b/integration-tests/main/src/main/java/io/quarkus/it/reactive/ReactiveStreamOpsResource.java index 828d18c69d3ab4..a78519c8d7ca61 100644 --- a/integration-tests/main/src/main/java/io/quarkus/it/reactive/ReactiveStreamOpsResource.java +++ b/integration-tests/main/src/main/java/io/quarkus/it/reactive/ReactiveStreamOpsResource.java @@ -6,6 +6,7 @@ import org.eclipse.microprofile.reactive.streams.operators.ReactiveStreams; import io.smallrye.mutiny.Multi; +import mutiny.zero.flow.adapters.AdaptersToReactiveStreams; @Path("/reactive") public class ReactiveStreamOpsResource { @@ -30,7 +31,7 @@ public String stream1() { @Path("/stream-mutiny") public String stream2() { StringBuilder builder = new StringBuilder(); - ReactiveStreams.fromPublisher(Multi.createFrom().items("d", "e", "f")) + ReactiveStreams.fromPublisher(AdaptersToReactiveStreams.publisher(Multi.createFrom().items("d", "e", "f"))) .map(String::toUpperCase) .forEach(builder::append) .run(); diff --git a/integration-tests/mongodb-panache-kotlin/src/main/kotlin/io/quarkus/it/mongodb/panache/reactive/book/ReactiveBookEntityResource.kt b/integration-tests/mongodb-panache-kotlin/src/main/kotlin/io/quarkus/it/mongodb/panache/reactive/book/ReactiveBookEntityResource.kt index 8237bed5f257c7..9984585251220c 100644 --- a/integration-tests/mongodb-panache-kotlin/src/main/kotlin/io/quarkus/it/mongodb/panache/reactive/book/ReactiveBookEntityResource.kt +++ b/integration-tests/mongodb-panache-kotlin/src/main/kotlin/io/quarkus/it/mongodb/panache/reactive/book/ReactiveBookEntityResource.kt @@ -19,9 +19,9 @@ import jakarta.ws.rs.core.Response import org.bson.types.ObjectId import org.jboss.logging.Logger import org.jboss.resteasy.reactive.RestStreamElementType -import org.reactivestreams.Publisher import java.net.URI import java.time.LocalDate.parse +import java.util.concurrent.Flow @Path("/reactive/books/entity") class ReactiveBookEntityResource { @@ -43,7 +43,7 @@ class ReactiveBookEntityResource { @Path("/stream") @Produces(MediaType.SERVER_SENT_EVENTS) @RestStreamElementType(MediaType.APPLICATION_JSON) - fun streamBooks(@QueryParam("sort") sort: String?): Publisher<ReactiveBookEntity> { + fun streamBooks(@QueryParam("sort") sort: String?): Flow.Publisher<ReactiveBookEntity> { return if (sort != null) { ReactiveBookEntity.streamAll(Sort.ascending(sort)) } else ReactiveBookEntity.streamAll() diff --git a/integration-tests/mongodb-panache-kotlin/src/main/kotlin/io/quarkus/it/mongodb/panache/reactive/book/ReactiveBookRepositoryResource.kt b/integration-tests/mongodb-panache-kotlin/src/main/kotlin/io/quarkus/it/mongodb/panache/reactive/book/ReactiveBookRepositoryResource.kt index 3ca8977a3dcc0c..bdd3f2baa8c0c1 100644 --- a/integration-tests/mongodb-panache-kotlin/src/main/kotlin/io/quarkus/it/mongodb/panache/reactive/book/ReactiveBookRepositoryResource.kt +++ b/integration-tests/mongodb-panache-kotlin/src/main/kotlin/io/quarkus/it/mongodb/panache/reactive/book/ReactiveBookRepositoryResource.kt @@ -21,9 +21,9 @@ import jakarta.ws.rs.core.Response import org.bson.types.ObjectId import org.jboss.logging.Logger import org.jboss.resteasy.reactive.RestStreamElementType -import org.reactivestreams.Publisher import java.net.URI import java.time.LocalDate +import java.util.concurrent.Flow @Path("/reactive/books/repository") class ReactiveBookRepositoryResource { @@ -48,7 +48,7 @@ class ReactiveBookRepositoryResource { @Path("/stream") @Produces(MediaType.SERVER_SENT_EVENTS) @RestStreamElementType(MediaType.APPLICATION_JSON) - fun streamBooks(@QueryParam("sort") sort: String?): Publisher<Book> { + fun streamBooks(@QueryParam("sort") sort: String?): Flow.Publisher<Book> { return if (sort != null) { reactiveBookRepository.streamAll(Sort.ascending(sort)) } else reactiveBookRepository.streamAll() diff --git a/integration-tests/mongodb-panache/src/main/java/io/quarkus/it/mongodb/panache/reactive/book/ReactiveBookEntityResource.java b/integration-tests/mongodb-panache/src/main/java/io/quarkus/it/mongodb/panache/reactive/book/ReactiveBookEntityResource.java index 38d640841f6b9b..eddd1391f6189f 100644 --- a/integration-tests/mongodb-panache/src/main/java/io/quarkus/it/mongodb/panache/reactive/book/ReactiveBookEntityResource.java +++ b/integration-tests/mongodb-panache/src/main/java/io/quarkus/it/mongodb/panache/reactive/book/ReactiveBookEntityResource.java @@ -3,6 +3,7 @@ import java.net.URI; import java.time.LocalDate; import java.util.List; +import java.util.concurrent.Flow; import jakarta.annotation.PostConstruct; import jakarta.ws.rs.*; @@ -12,7 +13,6 @@ import org.bson.types.ObjectId; import org.jboss.logging.Logger; import org.jboss.resteasy.reactive.RestStreamElementType; -import org.reactivestreams.Publisher; import io.quarkus.panache.common.Parameters; import io.quarkus.panache.common.Sort; @@ -41,7 +41,7 @@ public Uni<List<ReactiveBookEntity>> getBooks(@QueryParam("sort") String sort) { @Path("/stream") @Produces(MediaType.SERVER_SENT_EVENTS) @RestStreamElementType(MediaType.APPLICATION_JSON) - public Publisher<ReactiveBookEntity> streamBooks(@QueryParam("sort") String sort) { + public Flow.Publisher<ReactiveBookEntity> streamBooks(@QueryParam("sort") String sort) { if (sort != null) { return ReactiveBookEntity.streamAll(Sort.ascending(sort)); } diff --git a/integration-tests/mongodb-panache/src/main/java/io/quarkus/it/mongodb/panache/reactive/book/ReactiveBookRepositoryResource.java b/integration-tests/mongodb-panache/src/main/java/io/quarkus/it/mongodb/panache/reactive/book/ReactiveBookRepositoryResource.java index b0c97b81524b79..96e21828edc16f 100644 --- a/integration-tests/mongodb-panache/src/main/java/io/quarkus/it/mongodb/panache/reactive/book/ReactiveBookRepositoryResource.java +++ b/integration-tests/mongodb-panache/src/main/java/io/quarkus/it/mongodb/panache/reactive/book/ReactiveBookRepositoryResource.java @@ -3,6 +3,7 @@ import java.net.URI; import java.time.LocalDate; import java.util.List; +import java.util.concurrent.Flow; import jakarta.annotation.PostConstruct; import jakarta.inject.Inject; @@ -13,7 +14,6 @@ import org.bson.types.ObjectId; import org.jboss.logging.Logger; import org.jboss.resteasy.reactive.RestStreamElementType; -import org.reactivestreams.Publisher; import io.quarkus.it.mongodb.panache.book.Book; import io.quarkus.panache.common.Parameters; @@ -45,7 +45,7 @@ public Uni<List<Book>> getBooks(@QueryParam("sort") String sort) { @Path("/stream") @Produces(MediaType.SERVER_SENT_EVENTS) @RestStreamElementType(MediaType.APPLICATION_JSON) - public Publisher<Book> streamBooks(@QueryParam("sort") String sort) { + public Flow.Publisher<Book> streamBooks(@QueryParam("sort") String sort) { if (sort != null) { return reactiveBookRepository.streamAll(Sort.ascending(sort)); } diff --git a/integration-tests/smallrye-context-propagation/src/test/java/io/quarkus/context/test/mutiny/MutinyContextEndpoint.java b/integration-tests/smallrye-context-propagation/src/test/java/io/quarkus/context/test/mutiny/MutinyContextEndpoint.java index d3e442c56e6b30..26b82ed2877cbb 100644 --- a/integration-tests/smallrye-context-propagation/src/test/java/io/quarkus/context/test/mutiny/MutinyContextEndpoint.java +++ b/integration-tests/smallrye-context-propagation/src/test/java/io/quarkus/context/test/mutiny/MutinyContextEndpoint.java @@ -4,6 +4,7 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.Flow; import jakarta.annotation.PreDestroy; import jakarta.inject.Inject; @@ -23,7 +24,6 @@ import org.eclipse.microprofile.context.ManagedExecutor; import org.eclipse.microprofile.context.ThreadContext; import org.junit.jupiter.api.Assertions; -import org.reactivestreams.Publisher; import org.wildfly.common.Assert; import io.quarkus.arc.Arc; @@ -455,7 +455,7 @@ public Multi<String> transactionPropagationWithMulti() throws SystemException { @Transactional @GET @Path("/transaction-multi-2") - public Publisher<String> transactionPropagationWithMulti2() { + public Flow.Publisher<String> transactionPropagationWithMulti2() { Multi<String> ret = Multi.createFrom().item("OK"); // now delete both entities Assertions.assertEquals(2, Person.deleteAll());