diff --git a/bom/runtime/pom.xml b/bom/runtime/pom.xml
index eb4826548f819..558a7044191de 100644
--- a/bom/runtime/pom.xml
+++ b/bom/runtime/pom.xml
@@ -128,7 +128,7 @@
4.1.49.Final
1.0.3
3.3.2.Final
- 0.5.4
+ 0.6.0
0.1.1
0.1.1
2.5.0
diff --git a/docs/src/main/asciidoc/amazon-dynamodb.adoc b/docs/src/main/asciidoc/amazon-dynamodb.adoc
index b94a87d3cbf1f..275bb4742fc0a 100644
--- a/docs/src/main/asciidoc/amazon-dynamodb.adoc
+++ b/docs/src/main/asciidoc/amazon-dynamodb.adoc
@@ -413,7 +413,7 @@ public class FruitAsyncService extends AbstractService {
public Uni> findAll() {
return Uni.createFrom().completionStage(() -> dynamoDB.scan(scanRequest()))
- .onItem().apply(res -> res.items().stream().map(Fruit::from).collect(Collectors.toList()));
+ .onItem().transform(res -> res.items().stream().map(Fruit::from).collect(Collectors.toList()));
}
public Uni> add(Fruit fruit) {
@@ -423,7 +423,7 @@ public class FruitAsyncService extends AbstractService {
public Uni get(String name) {
return Uni.createFrom().completionStage(() -> dynamoDB.getItem(getRequest(name)))
- .onItem().apply(resp -> Fruit.from(resp.item()));
+ .onItem().transform(resp -> Fruit.from(resp.item()));
}
}
----
diff --git a/docs/src/main/asciidoc/amazon-kms.adoc b/docs/src/main/asciidoc/amazon-kms.adoc
index 4d65c90600500..4cff8c7673b6a 100644
--- a/docs/src/main/asciidoc/amazon-kms.adoc
+++ b/docs/src/main/asciidoc/amazon-kms.adoc
@@ -260,17 +260,19 @@ public class QuarkusKmsAsyncResource {
@Path("/encrypt")
public Uni encrypt(String data) {
return Uni.createFrom().completionStage(kms.encrypt(req -> req.keyId(keyArn).plaintext(SdkBytes.fromUtf8String(data))))
- .onItem().apply(EncryptResponse::ciphertextBlob)
- .onItem().apply(blob -> Base64.encodeBase64String(blob.asByteArray()));
+ .onItem().transform(EncryptResponse::ciphertextBlob)
+ .onItem().transform(blob -> Base64.encodeBase64String(blob.asByteArray()));
}
@POST
@Path("/decrypt")
public Uni decrypt(String data) {
return Uni.createFrom().item(SdkBytes.fromByteArray(Base64.decodeBase64(data.getBytes())))
- .onItem().produceCompletionStage(msg -> kms.decrypt(req -> req.keyId(keyArn).ciphertextBlob(msg)))
- .onItem().apply(DecryptResponse::plaintext)
- .onItem().apply(SdkBytes::asUtf8String);
+ .onItem().transformToUni(msg ->
+ Uni.createFrom().completionStage(kms.decrypt(req -> req.keyId(keyArn).ciphertextBlob(msg)))
+ )
+ .onItem().transform(DecryptResponse::plaintext)
+ .onItem().transform(SdkBytes::asUtf8String);
}
}
----
diff --git a/docs/src/main/asciidoc/amazon-s3.adoc b/docs/src/main/asciidoc/amazon-s3.adoc
index 685eda8690b53..7356ec055f850 100644
--- a/docs/src/main/asciidoc/amazon-s3.adoc
+++ b/docs/src/main/asciidoc/amazon-s3.adoc
@@ -516,7 +516,7 @@ public class S3AsyncClientResource extends CommonResource {
.build();
return Uni.createFrom().completionStage(() -> s3.listObjects(listRequest))
- .onItem().apply(result -> toFileItems(result));
+ .onItem().transform(result -> toFileItems(result));
}
private List toFileItems(ListObjectsResponse objects) {
diff --git a/docs/src/main/asciidoc/amazon-sns.adoc b/docs/src/main/asciidoc/amazon-sns.adoc
index bd2a4ee1c122a..5f1e9e03bf37b 100644
--- a/docs/src/main/asciidoc/amazon-sns.adoc
+++ b/docs/src/main/asciidoc/amazon-sns.adoc
@@ -700,8 +700,8 @@ public class QuarksCannonAsyncResource {
return Uni.createFrom()
.completionStage(sns.publish(p -> p.topicArn(topicArn).message(message)))
.onItem().invoke(item -> LOGGER.infov("Fired Quark[{0}, {1}}]", quark.getFlavor(), quark.getSpin()))
- .onItem().apply(PublishResponse::messageId)
- .onItem().apply(id -> Response.ok().entity(id).build());
+ .onItem().transform(PublishResponse::messageId)
+ .onItem().transform(id -> Response.ok().entity(id).build());
}
}
----
@@ -770,14 +770,17 @@ public class QuarksShieldAsyncResource {
if (messageType.equals(NOTIFICATION_TYPE)) {
return Uni.createFrom().item(readObject(SnsNotification.class, message))
- .onItem().apply(notification -> readObject(Quark.class, notification.getMessage()))
+ .onItem().transform(notification -> readObject(Quark.class, notification.getMessage()))
.onItem().invoke(quark -> LOGGER.infov("Quark[{0}, {1}] collision with the shield.", quark.getFlavor(), quark.getSpin()))
- .onItem().apply(quark -> Response.ok().build());
+ .onItem().transform(quark -> Response.ok().build());
} else if (messageType.equals(SUBSCRIPTION_CONFIRMATION_TYPE)) {
return Uni.createFrom().item(readObject(SnsSubscriptionConfirmation.class, message))
- .onItem().produceCompletionStage(msg -> sns.confirmSubscription(confirm -> confirm.topicArn(topicArn).token(msg.getToken())))
+ .onItem().transformToUni(msg ->
+ Uni.createFrom().completionStage(
+ sns.confirmSubscription(confirm -> confirm.topicArn(topicArn).token(msg.getToken())))
+ )
.onItem().invoke(resp -> LOGGER.info("Subscription confirmed. Ready for quarks collisions."))
- .onItem().apply(resp -> Response.ok().build());
+ .onItem().transform(resp -> Response.ok().build());
} else if (messageType.equals(UNSUBSCRIPTION_CONFIRMATION_TYPE)) {
LOGGER.info("We are unsubscribed");
return Uni.createFrom().item(Response.ok().build());
@@ -791,10 +794,10 @@ public class QuarksShieldAsyncResource {
public Uni subscribe() {
return Uni.createFrom()
.completionStage(sns.subscribe(s -> s.topicArn(topicArn).protocol("http").endpoint(notificationEndpoint())))
- .onItem().apply(SubscribeResponse::subscriptionArn)
+ .onItem().transform(SubscribeResponse::subscriptionArn)
.onItem().invoke(this::setSubscriptionArn)
.onItem().invoke(arn -> LOGGER.infov("Subscribed Quarks shield with id = {0} ", arn))
- .onItem().apply(arn -> Response.ok().entity(arn).build());
+ .onItem().transform(arn -> Response.ok().entity(arn).build());
}
@POST
@@ -805,7 +808,7 @@ public class QuarksShieldAsyncResource {
.completionStage(sns.unsubscribe(s -> s.subscriptionArn(subscriptionArn)))
.onItem().invoke(arn -> LOGGER.infov("Unsubscribed quarks shield for id = {0}", subscriptionArn))
.onItem().invoke(arn -> subscriptionArn = null)
- .onItem().apply(arn -> Response.ok().build());
+ .onItem().transform(arn -> Response.ok().build());
} else {
return Uni.createFrom().item(Response.status(400).entity("Not subscribed yet").build());
}
diff --git a/docs/src/main/asciidoc/amazon-sqs.adoc b/docs/src/main/asciidoc/amazon-sqs.adoc
index c413718ed65ff..644949c490080 100644
--- a/docs/src/main/asciidoc/amazon-sqs.adoc
+++ b/docs/src/main/asciidoc/amazon-sqs.adoc
@@ -379,8 +379,8 @@ public class QuarksCannonAsyncResource {
return Uni.createFrom()
.completionStage(sqs.sendMessage(m -> m.queueUrl(queueUrl).messageBody(message)))
.onItem().invoke(item -> LOGGER.infov("Fired Quark[{0}, {1}}]", quark.getFlavor(), quark.getSpin()))
- .onItem().apply(SendMessageResponse::messageId)
- .onItem().apply(id -> Response.ok().entity(id).build());
+ .onItem().transform(SendMessageResponse::messageId)
+ .onItem().transform(id -> Response.ok().entity(id).build());
}
}
----
@@ -428,8 +428,8 @@ public class QuarksShieldAsyncResource {
public Uni> receive() {
return Uni.createFrom()
.completionStage(sqs.receiveMessage(m -> m.maxNumberOfMessages(10).queueUrl(queueUrl)))
- .onItem().apply(ReceiveMessageResponse::messages)
- .onItem().apply(m -> m.stream().map(Message::body).map(this::toQuark).collect(Collectors.toList()));
+ .onItem().transform(ReceiveMessageResponse::messages)
+ .onItem().transform(m -> m.stream().map(Message::body).map(this::toQuark).collect(Collectors.toList()));
}
private Quark toQuark(String message) {
diff --git a/docs/src/main/asciidoc/getting-started-reactive.adoc b/docs/src/main/asciidoc/getting-started-reactive.adoc
index 3cf196eb9127b..bfcaac781c09c 100644
--- a/docs/src/main/asciidoc/getting-started-reactive.adoc
+++ b/docs/src/main/asciidoc/getting-started-reactive.adoc
@@ -143,7 +143,7 @@ client = WebClient.create(vertx,
Uni uni =
client.get("/api/fruit/" + name)
.send()
- .onItem().apply(resp -> {
+ .onItem().transform(resp -> {
if (resp.statusCode() == 200) {
return resp.bodyAsJsonObject();
} else {
@@ -245,7 +245,7 @@ public class ReactiveGreetingService {
public Uni greeting(String name) {
return Uni.createFrom().item(name)
- .onItem().apply(n -> String.format("hello %s", name));
+ .onItem().transform(n -> String.format("hello %s", name));
}
}
----
@@ -312,7 +312,7 @@ In the `ReactiveGreetingService`, add the following method:
----
public Multi greetings(int count, String name) {
return Multi.createFrom().ticks().every(Duration.ofSeconds(1))
- .onItem().apply(n -> String.format("hello %s - %d", name, n))
+ .onItem().transform(n -> String.format("hello %s - %d", name, n))
.transform().byTakingFirstItems(count);
}
----
@@ -456,30 +456,30 @@ public class Fruit {
public static Multi findAll(PgPool client) {
return client.query("SELECT id, name FROM fruits ORDER BY name ASC").execute()
// Create a Multi from the set of rows:
- .onItem().produceMulti(set -> Multi.createFrom().items(() -> StreamSupport.stream(set.spliterator(), false)))
+ .onItem().transformToMulti(set -> Multi.createFrom().items(() -> StreamSupport.stream(set.spliterator(), false)))
// For each row create a fruit instance
- .onItem().apply(Fruit::from);
+ .onItem().transform(Fruit::from);
}
public static Uni findById(PgPool client, Long id) {
return client.preparedQuery("SELECT id, name FROM fruits WHERE id = $1").execute(Tuple.of(id))
- .onItem().apply(RowSet::iterator)
- .onItem().apply(iterator -> iterator.hasNext() ? from(iterator.next()) : null);
+ .onItem().transform(RowSet::iterator)
+ .onItem().transform(iterator -> iterator.hasNext() ? from(iterator.next()) : null);
}
public Uni save(PgPool client) {
return client.preparedQuery("INSERT INTO fruits (name) VALUES ($1) RETURNING (id)").execute(Tuple.of(name))
- .onItem().apply(pgRowSet -> pgRowSet.iterator().next().getLong("id"));
+ .onItem().transform(pgRowSet -> pgRowSet.iterator().next().getLong("id"));
}
public Uni update(PgPool client) {
return client.preparedQuery("UPDATE fruits SET name = $1 WHERE id = $2").execute(Tuple.of(name, id))
- .onItem().apply(pgRowSet -> pgRowSet.rowCount() == 1);
+ .onItem().transform(pgRowSet -> pgRowSet.rowCount() == 1);
}
public static Uni delete(PgPool client, Long id) {
return client.preparedQuery("DELETE FROM fruits WHERE id = $1").execute(Tuple.of(id))
- .onItem().apply(pgRowSet -> pgRowSet.rowCount() == 1);
+ .onItem().transform(pgRowSet -> pgRowSet.rowCount() == 1);
}
private static Fruit from(Row row) {
@@ -559,31 +559,31 @@ public class FruitResource {
@Path("{id}")
public Uni getSingle(@PathParam Long id) {
return Fruit.findById(client, id)
- .onItem().apply(fruit -> fruit != null ? Response.ok(fruit) : Response.status(Status.NOT_FOUND))
- .onItem().apply(ResponseBuilder::build);
+ .onItem().transform(fruit -> fruit != null ? Response.ok(fruit) : Response.status(Status.NOT_FOUND))
+ .onItem().transform(ResponseBuilder::build);
}
@POST
public Uni create(Fruit fruit) {
return fruit.save(client)
- .onItem().apply(id -> URI.create("/fruits/" + id))
- .onItem().apply(uri -> Response.created(uri).build());
+ .onItem().transform(id -> URI.create("/fruits/" + id))
+ .onItem().transform(uri -> Response.created(uri).build());
}
@PUT
@Path("{id}")
public Uni update(@PathParam Long id, Fruit fruit) {
return fruit.update(client)
- .onItem().apply(updated -> updated ? Status.OK : Status.NOT_FOUND)
- .onItem().apply(status -> Response.status(status).build());
+ .onItem().transform(updated -> updated ? Status.OK : Status.NOT_FOUND)
+ .onItem().transform(status -> Response.status(status).build());
}
@DELETE
@Path("{id}")
public Uni delete(@PathParam Long id) {
return Fruit.delete(client, id)
- .onItem().apply(deleted -> deleted ? Status.NO_CONTENT : Status.NOT_FOUND)
- .onItem().apply(status -> Response.status(status).build());
+ .onItem().transform(deleted -> deleted ? Status.NO_CONTENT : Status.NOT_FOUND)
+ .onItem().transform(status -> Response.status(status).build());
}
}
----
diff --git a/docs/src/main/asciidoc/grpc-service-consumption.adoc b/docs/src/main/asciidoc/grpc-service-consumption.adoc
index f04a887898b02..6669fda56a54a 100644
--- a/docs/src/main/asciidoc/grpc-service-consumption.adoc
+++ b/docs/src/main/asciidoc/grpc-service-consumption.adoc
@@ -70,7 +70,7 @@ public String helloBlocking(@PathParam("name") String name) {
@Path("/mutiny/{name}")
public Uni helloMutiny(@PathParam("name") String name) {
return mutinyHelloService.sayHello(HelloRequest.newBuilder().setName(name).build())
- .onItem().apply(HelloReply::getMessage);
+ .onItem().transform(HelloReply::getMessage);
}
----
@@ -118,7 +118,7 @@ public class StreamingEndpoint {
public Multi invokeSource() {
// Retrieve a stream
return client.source(Empty.newBuilder().build())
- .onItem().apply(Item::getValue);
+ .onItem().transform(Item::getValue);
}
@GET
@@ -138,7 +138,7 @@ public class StreamingEndpoint {
Multi- inputs = Multi.createFrom().range(0, max)
.map(i -> Integer.toString(i))
.map(i -> Item.newBuilder().setValue(i).build());
- return client.pipe(inputs).onItem().apply(Item::getValue);
+ return client.pipe(inputs).onItem().transform(Item::getValue);
}
}
diff --git a/docs/src/main/asciidoc/grpc-service-implementation.adoc b/docs/src/main/asciidoc/grpc-service-implementation.adoc
index f01c20b7356d6..3b85b3b68ed15 100644
--- a/docs/src/main/asciidoc/grpc-service-implementation.adoc
+++ b/docs/src/main/asciidoc/grpc-service-implementation.adoc
@@ -129,7 +129,7 @@ public class StreamingService extends MutinyStreamingGrpc.StreamingImplBase {
.map(Item::getValue)
.map(Long::parseLong)
.onItem().scan(() -> 0L, Long::sum)
- .onItem().apply(l -> Item.newBuilder().setValue(Long.toString(l)).build());
+ .onItem().transform(l -> Item.newBuilder().setValue(Long.toString(l)).build());
}
}
----
diff --git a/docs/src/main/asciidoc/reactive-event-bus.adoc b/docs/src/main/asciidoc/reactive-event-bus.adoc
index ed15233761a22..d21b8e72fbcfc 100644
--- a/docs/src/main/asciidoc/reactive-event-bus.adoc
+++ b/docs/src/main/asciidoc/reactive-event-bus.adoc
@@ -229,7 +229,7 @@ public class EventResource {
@Path("{name}")
public Uni greeting(@PathParam String name) {
return bus.request("greeting", name) // <2>
- .onItem().apply(Message::body);
+ .onItem().transform(Message::body);
}
}
----
@@ -250,7 +250,7 @@ bus.sendAndForget("greeting", name)
bus.publish("greeting", name)
// Case 3
Uni response = bus.request("address", "hello, how are you?")
- .onItem().apply(Message::body);
+ .onItem().transform(Message::body);
----
== Putting things together - bridging HTTP and messages
@@ -302,7 +302,7 @@ public class EventResource {
@Path("{name}")
public Uni greeting(@PathParam String name) {
return bus.request("greeting", name) // <1>
- .onItem().apply(Message::body); // <2>
+ .onItem().transform(Message::body); // <2>
}
}
----
@@ -376,7 +376,7 @@ So you can exchange objects as follows:
@Path("{name}")
public Uni greeting(@PathParam String name) {
return bus.request("greeting", new MyName(name))
- .onItem().apply(Message::body);
+ .onItem().transform(Message::body);
}
@ConsumeEvent(value = "greeting")
@@ -395,7 +395,7 @@ If you want to use a specific codec, you need to explicitly set it on both ends:
public Uni greeting(@PathParam String name) {
return bus.request("greeting", name,
new DeliveryOptions().setCodecName(MyNameCodec.class.getName())) // <1>
- .onItem().apply(Message::body);
+ .onItem().transform(Message::body);
}
@ConsumeEvent(value = "greeting", codec = MyNameCodec.class) // <2>
diff --git a/docs/src/main/asciidoc/reactive-sql-clients.adoc b/docs/src/main/asciidoc/reactive-sql-clients.adoc
index 4f8aba843f920..8e7dd391897e3 100644
--- a/docs/src/main/asciidoc/reactive-sql-clients.adoc
+++ b/docs/src/main/asciidoc/reactive-sql-clients.adoc
@@ -221,8 +221,8 @@ A `RowSet` is an `java.lang.Iterable
` and thus can be converted to a `Multi
[source,java]
----
Multi fruits = rowSet
- .onItem().produceMulti(set -> Multi.createFrom().iterable(set))
- .onItem().apply(Fruit::from);
+ .onItem().transformToMulti(set -> Multi.createFrom().iterable(set))
+ .onItem().transform(Fruit::from);
----
The `Fruit#from` method converts a `Row` instance to a `Fruit` instance.
@@ -243,8 +243,8 @@ Putting it all together, the `Fruit.findAll` method looks like:
----
public static Multi findAll(PgPool client) {
return client.query("SELECT id, name FROM fruits ORDER BY name ASC").execute()
- .onItem().produceMulti(set -> Multi.createFrom().iterable(set))
- .onItem().apply(Fruit::from);
+ .onItem().transformToMulti(set -> Multi.createFrom().iterable(set))
+ .onItem().transform(Fruit::from);
}
----
@@ -292,8 +292,8 @@ Equipped with this tooling, we are able to safely use an `id` provided by the us
----
public static Uni findById(PgPool client, Long id) {
return client.preparedQuery("SELECT id, name FROM fruits WHERE id = $1").execute(Tuple.of(id)) // <1>
- .onItem().apply(RowSet::iterator) // <2>
- .onItem().apply(iterator -> iterator.hasNext() ? from(iterator.next()) : null); // <3>
+ .onItem().transform(RowSet::iterator) // <2>
+ .onItem().transform(iterator -> iterator.hasNext() ? from(iterator.next()) : null); // <3>
}
----
<1> Create a `Tuple` to hold the prepared query parameters.
@@ -309,8 +309,8 @@ And in the JAX-RS resource:
@Path("{id}")
public Uni getSingle(@PathParam Long id) {
return Fruit.findById(client, id)
- .onItem().apply(fruit -> fruit != null ? Response.ok(fruit) : Response.status(Status.NOT_FOUND)) // <1>
- .onItem().apply(ResponseBuilder::build); // <2>
+ .onItem().transform(fruit -> fruit != null ? Response.ok(fruit) : Response.status(Status.NOT_FOUND)) // <1>
+ .onItem().transform(ResponseBuilder::build); // <2>
}
----
<1> Prepare a JAX-RS response with either the `Fruit` instance if found or the `404` status code.
@@ -323,7 +323,7 @@ The same logic applies when saving a `Fruit`:
----
public Uni save(PgPool client) {
return client.preparedQuery("INSERT INTO fruits (name) VALUES ($1) RETURNING (id)").execute(Tuple.of(name))
- .onItem().apply(pgRowSet -> pgRowSet.iterator().next().getLong("id"));
+ .onItem().transform(pgRowSet -> pgRowSet.iterator().next().getLong("id"));
}
----
@@ -335,8 +335,8 @@ And in the web resource we handle the `POST` request:
@POST
public Uni create(Fruit fruit) {
return fruit.save(client)
- .onItem().apply(id -> URI.create("/fruits/" + id))
- .onItem().apply(uri -> Response.created(uri).build());
+ .onItem().transform(id -> URI.create("/fruits/" + id))
+ .onItem().transform(uri -> Response.created(uri).build());
}
----
@@ -354,7 +354,7 @@ Let's use this to support removal of fruits in the database:
----
public static Uni delete(PgPool client, Long id) {
return client.preparedQuery("DELETE FROM fruits WHERE id = $1").execute(Tuple.of(id))
- .onItem().apply(pgRowSet -> pgRowSet.rowCount() == 1); // <1>
+ .onItem().transform(pgRowSet -> pgRowSet.rowCount() == 1); // <1>
}
----
<1> Inspect metadata to determine if a fruit has been actually deleted.
@@ -368,8 +368,8 @@ And to handle the HTTP `DELETE` method in the web resource:
@Path("{id}")
public Uni delete(@PathParam Long id) {
return Fruit.delete(client, id)
- .onItem().apply(deleted -> deleted ? Status.NO_CONTENT : Status.NOT_FOUND)
- .onItem().apply(status -> Response.status(status).build());
+ .onItem().transform(deleted -> deleted ? Status.NO_CONTENT : Status.NOT_FOUND)
+ .onItem().transform(status -> Response.status(status).build());
}
----
@@ -517,7 +517,7 @@ return SqlClientHelper.inTransactionUni(client, tx -> tx
.preparedQuery("INSERT INTO person (firstname,lastname) VALUES ($1,$2) RETURNING (id)")
.execute(Tuple.of(person.getFirstName(), person.getLastName()))
- .onItem().produceUni(id -> tx.preparedQuery("INSERT INTO addr (person_id,addrline1) VALUES ($1,$2)")
+ .onItem().transformToUni(id -> tx.preparedQuery("INSERT INTO addr (person_id,addrline1) VALUES ($1,$2)")
.execute(Tuple.of(id.iterator().next().getLong("id"), person.getLastName())))
.onItem().ignore().andContinueWithNull());
diff --git a/docs/src/main/asciidoc/vertx.adoc b/docs/src/main/asciidoc/vertx.adoc
index 11dde733aa90c..59442e87773a6 100644
--- a/docs/src/main/asciidoc/vertx.adoc
+++ b/docs/src/main/asciidoc/vertx.adoc
@@ -177,7 +177,7 @@ vertx.fileSystem().readFile("lorem-ipsum.txt", ar -> {
// Mutiny Vert.x:
vertx.fileSystem().readFile("lorem-ipsum.txt")
- .onItem().apply(buffer -> buffer.toString("UTF-8"))
+ .onItem().transform(buffer -> buffer.toString("UTF-8"))
.subscribe()
.with(
content -> System.out.println("Content: " + content),
@@ -268,7 +268,7 @@ public class GreetingResource {
@Produces(MediaType.TEXT_PLAIN)
public Uni doSomethingAsync() {
return vertx.fileSystem().readFile("/META-INF/resources/lorem.txt")
- .onItem().apply(b -> b.toString("UTF-8"));
+ .onItem().transform(b -> b.toString("UTF-8"));
}
}
----
@@ -535,7 +535,7 @@ public class ResourceUsingWebClient {
public Uni getFruitData(@PathParam("name") String name) {
return client.get("/api/fruit/" + name)
.send()
- .onItem().apply(resp -> {
+ .onItem().transform(resp -> {
if (resp.statusCode() == 200) {
return resp.bodyAsJsonObject();
} else {
diff --git a/extensions/funqy/funqy-server-common/runtime/src/main/java/io/quarkus/funqy/runtime/FunctionInvoker.java b/extensions/funqy/funqy-server-common/runtime/src/main/java/io/quarkus/funqy/runtime/FunctionInvoker.java
index 595c71a539f35..b0957e1bba513 100644
--- a/extensions/funqy/funqy-server-common/runtime/src/main/java/io/quarkus/funqy/runtime/FunctionInvoker.java
+++ b/extensions/funqy/funqy-server-common/runtime/src/main/java/io/quarkus/funqy/runtime/FunctionInvoker.java
@@ -119,7 +119,7 @@ public void invoke(FunqyServerRequest request, FunqyServerResponse response) {
try {
Object result = method.invoke(target, args);
if (isAsync()) {
- response.setOutput(((Uni>) result).onFailure().apply(t -> new ApplicationException(t)));
+ response.setOutput(((Uni>) result).onFailure().transform(t -> new ApplicationException(t)));
} else {
response.setOutput(Uni.createFrom().item(result));
}
diff --git a/extensions/grpc/deployment/src/test/java/io/quarkus/grpc/server/services/MutinyTestService.java b/extensions/grpc/deployment/src/test/java/io/quarkus/grpc/server/services/MutinyTestService.java
index 5011b614d9a7c..1cd32ebd4467f 100644
--- a/extensions/grpc/deployment/src/test/java/io/quarkus/grpc/server/services/MutinyTestService.java
+++ b/extensions/grpc/deployment/src/test/java/io/quarkus/grpc/server/services/MutinyTestService.java
@@ -68,7 +68,7 @@ public Multi halfDuplexCall(
.map(r -> r.getPayload().getBody().toStringUtf8())
.map(String::toUpperCase)
.collectItems().asList()
- .onItem().produceMulti(s -> Multi.createFrom().iterable(s))
+ .onItem().transformToMulti(s -> Multi.createFrom().iterable(s))
.map(r -> Messages.Payload.newBuilder().setBody(ByteString.copyFromUtf8(r)).build())
.map(r -> Messages.StreamingOutputCallResponse.newBuilder().setPayload(r).build());
diff --git a/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/runtime/reflection/ReflectionService.java b/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/runtime/reflection/ReflectionService.java
index bf97b7530e556..bd7cac116b6f4 100644
--- a/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/runtime/reflection/ReflectionService.java
+++ b/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/runtime/reflection/ReflectionService.java
@@ -35,7 +35,7 @@ public ReflectionService(List definitions) {
@Override
public Multi serverReflectionInfo(Multi request) {
return request
- .onItem().apply(new Function() {
+ .onItem().transform(new Function() {
@Override
public ServerReflectionResponse apply(ServerReflectionRequest req) {
switch (req.getMessageRequestCase()) {
diff --git a/extensions/mailer/runtime/src/main/java/io/quarkus/mailer/runtime/MailTemplateInstanceImpl.java b/extensions/mailer/runtime/src/main/java/io/quarkus/mailer/runtime/MailTemplateInstanceImpl.java
index 8bb6948c2c49b..37b6863d6062e 100644
--- a/extensions/mailer/runtime/src/main/java/io/quarkus/mailer/runtime/MailTemplateInstanceImpl.java
+++ b/extensions/mailer/runtime/src/main/java/io/quarkus/mailer/runtime/MailTemplateInstanceImpl.java
@@ -6,6 +6,7 @@
import java.util.Map;
import java.util.concurrent.CompletionStage;
import java.util.function.Function;
+import java.util.function.Supplier;
import java.util.stream.Collectors;
import io.quarkus.mailer.Mail;
@@ -94,8 +95,14 @@ public CompletionStage send() {
if (variant.mediaType.equals(Variant.TEXT_HTML) || variant.mediaType.equals(Variant.TEXT_PLAIN)) {
results.add(new Result(variant,
Uni.createFrom().completionStage(
- () -> templateInstance.setAttribute(TemplateInstance.SELECTED_VARIANT, variant).data(data)
- .renderAsync())));
+ new Supplier>() {
+ @Override
+ public CompletionStage extends String> get() {
+ return templateInstance
+ .setAttribute(TemplateInstance.SELECTED_VARIANT, variant).data(data)
+ .renderAsync();
+ }
+ })));
}
}
if (results.isEmpty()) {
@@ -104,7 +111,12 @@ public CompletionStage send() {
List> unis = results.stream().map(Result::getValue).collect(Collectors.toList());
return Uni.combine().all().unis(unis)
.combinedWith(combine(results))
- .onItem().produceUni(m -> mailer.send(m))
+ .chain(new Function>() {
+ @Override
+ public Uni extends Void> apply(Mail m) {
+ return mailer.send(m);
+ }
+ })
.subscribeAsCompletionStage();
} else {
throw new IllegalStateException("No template variant found");
@@ -112,17 +124,20 @@ public CompletionStage send() {
}
private Function, Mail> combine(List results) {
- return ignored -> {
- for (Result res : results) {
- // We can safely access the content here: 1. it has been resolved, 2. it's cached.
- String content = res.value.await().indefinitely();
- if (res.variant.mediaType.equals(Variant.TEXT_HTML)) {
- mail.setHtml(content);
- } else if (res.variant.mediaType.equals(Variant.TEXT_PLAIN)) {
- mail.setText(content);
+ return new Function, Mail>() {
+ @Override
+ public Mail apply(List> ignored) {
+ for (Result res : results) {
+ // We can safely access the content here: 1. it has been resolved, 2. it's cached.
+ String content = res.value.await().indefinitely();
+ if (res.variant.mediaType.equals(Variant.TEXT_HTML)) {
+ mail.setHtml(content);
+ } else if (res.variant.mediaType.equals(Variant.TEXT_PLAIN)) {
+ mail.setText(content);
+ }
}
+ return mail;
}
- return mail;
};
}
diff --git a/extensions/mailer/runtime/src/main/java/io/quarkus/mailer/runtime/MutinyMailerImpl.java b/extensions/mailer/runtime/src/main/java/io/quarkus/mailer/runtime/MutinyMailerImpl.java
index fab13b519757d..213a1594c7073 100644
--- a/extensions/mailer/runtime/src/main/java/io/quarkus/mailer/runtime/MutinyMailerImpl.java
+++ b/extensions/mailer/runtime/src/main/java/io/quarkus/mailer/runtime/MutinyMailerImpl.java
@@ -6,6 +6,7 @@
import java.util.List;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.function.Function;
import java.util.stream.Collectors;
import javax.enterprise.context.ApplicationScoped;
@@ -45,6 +46,13 @@ public class MutinyMailerImpl implements ReactiveMailer {
@Inject
MailerSupport mailerSupport;
+ private static final Function, Void> ignore = new Function, Void>() {
+ @Override
+ public Void apply(List> results) {
+ return null;
+ }
+ };;
+
@Override
public Uni send(Mail... mails) {
if (mails == null) {
@@ -52,12 +60,21 @@ public Uni send(Mail... mails) {
}
List> unis = stream(mails)
- .map(mail -> toMailMessage(mail)
- .onItem().produceUni(mailMessage -> send(mail, mailMessage)))
+ .map(new Function>() {
+ @Override
+ public Uni apply(Mail mail) {
+ return MutinyMailerImpl.this.toMailMessage(mail)
+ .chain(new Function>() {
+ @Override
+ public Uni extends Void> apply(MailMessage mailMessage) {
+ return send(mail, mailMessage);
+ }
+ });
+ }
+ })
.collect(Collectors.toList());
- return Uni.combine().all().unis(unis).combinedWith(results -> null);
-
+ return Uni.combine().all().unis(unis).combinedWith(ignore);
}
private Uni send(Mail mail, MailMessage message) {
@@ -146,7 +163,7 @@ private Uni toMailAttachment(Attachment attachment) {
}
return getAttachmentStream(vertx, attachment)
- .onItem().apply(attach::setData);
+ .onItem().transform(attach::setData);
}
public static Uni getAttachmentStream(Vertx vertx, Attachment attachment) {
diff --git a/extensions/mailer/runtime/src/test/java/io/quarkus/mailer/AttachmentTest.java b/extensions/mailer/runtime/src/test/java/io/quarkus/mailer/AttachmentTest.java
index 40c34b20b00c4..16e4e8e5412d2 100644
--- a/extensions/mailer/runtime/src/test/java/io/quarkus/mailer/AttachmentTest.java
+++ b/extensions/mailer/runtime/src/test/java/io/quarkus/mailer/AttachmentTest.java
@@ -71,8 +71,8 @@ void testInlineAttachmentCreationFromFile() {
@Test
void testAttachmentCreationFromStream() {
Publisher publisher = vertx.fileSystem().open(LOREM.getAbsolutePath(), new OpenOptions().setRead(true))
- .onItem().produceMulti(af -> af.toMulti()
- .onItem().produceIterable(this::getBytes).concatenate());
+ .onItem().transformToMulti(af -> af.toMulti()
+ .onItem().transformToIterable(this::getBytes));
Attachment attachment = new Attachment("lorem.txt", publisher, "text/plain");
assertThat(attachment.getFile()).isNull();
@@ -110,8 +110,8 @@ public Byte next() {
@Test
void testInlineAttachmentCreationFromStream() {
Publisher publisher = vertx.fileSystem().open(LOREM.getAbsolutePath(), new OpenOptions().setRead(true))
- .onItem().produceMulti(af -> af.toMulti()
- .onItem().produceIterable(this::getBytes).concatenate());
+ .onItem().transformToMulti(af -> af.toMulti()
+ .onItem().transformToIterable(this::getBytes));
Attachment attachment = new Attachment("lorem.txt", publisher, "text/plain", "");
assertThat(attachment.getFile()).isNull();
@@ -129,8 +129,8 @@ void testInlineAttachmentCreationFromStream() {
@Test
void testAttachmentCreationWithDescription() {
Publisher publisher = vertx.fileSystem().open(LOREM.getAbsolutePath(), new OpenOptions().setRead(true))
- .onItem().produceMulti(af -> af.toMulti()
- .onItem().produceIterable(this::getBytes).concatenate());
+ .onItem().transformToMulti(af -> af.toMulti()
+ .onItem().transformToIterable(this::getBytes));
Attachment attachment = new Attachment("lorem.txt", publisher, "text/plain",
DESCRIPTION, Attachment.DISPOSITION_ATTACHMENT);
@@ -155,8 +155,8 @@ private String getContent(Attachment attachment) {
@Test
void testInlineAttachmentCreationWithDescription() {
Publisher publisher = vertx.fileSystem().open(LOREM.getAbsolutePath(), new OpenOptions().setRead(true))
- .onItem().produceMulti(af -> af.toMulti()
- .onItem().produceIterable(this::getBytes).concatenate());
+ .onItem().transformToMulti(af -> af.toMulti()
+ .onItem().transformToIterable(this::getBytes));
Attachment attachment = new Attachment("lorem.txt", publisher, "text/plain",
DESCRIPTION, Attachment.DISPOSITION_INLINE);
diff --git a/extensions/mongodb-client/runtime/src/test/java/io/quarkus/mongodb/reactive/ListDatabaseTest.java b/extensions/mongodb-client/runtime/src/test/java/io/quarkus/mongodb/reactive/ListDatabaseTest.java
index 821039b3b3530..8a1644f535e4c 100644
--- a/extensions/mongodb-client/runtime/src/test/java/io/quarkus/mongodb/reactive/ListDatabaseTest.java
+++ b/extensions/mongodb-client/runtime/src/test/java/io/quarkus/mongodb/reactive/ListDatabaseTest.java
@@ -39,13 +39,12 @@ void testListingDatabasesWithNoCreatedDatabases() {
.containsExactlyInAnyOrder("local", "admin");
List names = client.startSession()
- .onItem().produceUni(session -> client.listDatabaseNames(session).collectItems().asList())
+ .chain(session -> client.listDatabaseNames(session).collectItems().asList())
.await().indefinitely();
assertThat(names).containsExactlyInAnyOrder("local", "admin");
names = client.startSession()
- .onItem()
- .produceUni(session -> client.listDatabases(session).map(doc -> doc.getString("name")).collectItems().asList())
+ .chain(session -> client.listDatabases(session).map(doc -> doc.getString("name")).collectItems().asList())
.await().indefinitely();
assertThat(names).containsExactlyInAnyOrder("local", "admin");
}
diff --git a/extensions/mongodb-client/runtime/src/test/java/io/quarkus/mongodb/reactive/ReactiveMongoClientTest.java b/extensions/mongodb-client/runtime/src/test/java/io/quarkus/mongodb/reactive/ReactiveMongoClientTest.java
index 5901eb379dc2d..60a86c987ad68 100644
--- a/extensions/mongodb-client/runtime/src/test/java/io/quarkus/mongodb/reactive/ReactiveMongoClientTest.java
+++ b/extensions/mongodb-client/runtime/src/test/java/io/quarkus/mongodb/reactive/ReactiveMongoClientTest.java
@@ -45,8 +45,8 @@ void testFindOneReturnsObjectWithId() {
ReactiveMongoCollection myCollection = database.getCollection(collection);
Document document = createDoc();
myCollection.insertOne(document)
- .onItem().produceUni(x -> myCollection.find(eq("foo", "bar")).collectItems().first())
- .onItem().invoke(found -> {
+ .then(() -> myCollection.find(eq("foo", "bar")).collectItems().first())
+ .invoke(found -> {
assertThat(found).isNotNull();
assertThat(found.getObjectId("_id")).isNotNull();
})
@@ -60,8 +60,8 @@ void testFindOneReturnsEmptyWhenNonMatches() {
ReactiveMongoCollection myCollection = database.getCollection(collection);
Document document = createDoc();
myCollection.insertOne(document)
- .onItem().produceUni(x -> myCollection.find(eq("nothing", "missing")).collectItems().first())
- .onItem().invoke(opt -> assertThat(opt).isNull())
+ .then(() -> myCollection.find(eq("nothing", "missing")).collectItems().first())
+ .invoke(opt -> assertThat(opt).isNull())
.await().indefinitely();
}
@@ -122,7 +122,7 @@ void testFindBatch() {
}
List documents = new CopyOnWriteArrayList<>();
myCollection.insertMany(toBeInserted)
- .onItem().produceUni(x -> myCollection.find(new FindOptions().sort(eq("foo", 1)))
+ .then(() -> myCollection.find(new FindOptions().sort(eq("foo", 1)))
.onItem().invoke(documents::add)
.onItem().ignoreAsUni())
.await().indefinitely();
@@ -143,7 +143,7 @@ void testFindBatchWithClass() {
}
List documents = new CopyOnWriteArrayList<>();
myCollection.insertMany(toBeInserted)
- .onItem().produceUni(x -> myCollection.find(Document.class, new FindOptions().sort(eq("foo", 1)))
+ .then(() -> myCollection.find(Document.class, new FindOptions().sort(eq("foo", 1)))
.onItem().invoke(documents::add)
.onItem().ignoreAsUni())
.await().indefinitely();
@@ -164,7 +164,7 @@ void testFindBatchWithFilter() {
}
List documents = new CopyOnWriteArrayList<>();
myCollection.insertMany(toBeInserted)
- .onItem().produceUni(x -> myCollection.find(new FindOptions().filter(eq("num", 123)).sort(eq("foo", 1)))
+ .then(() -> myCollection.find(new FindOptions().filter(eq("num", 123)).sort(eq("foo", 1)))
.onItem().invoke(documents::add)
.onItem().ignoreAsUni())
.await().indefinitely();
@@ -185,7 +185,7 @@ void testFindBatchWithFilterAndClass() {
}
List documents = new CopyOnWriteArrayList<>();
myCollection.insertMany(toBeInserted)
- .onItem().produceUni(x -> myCollection.find(Document.class,
+ .then(() -> myCollection.find(Document.class,
new FindOptions().filter(eq("num", 123)).sort(eq("foo", 1)))
.onItem().invoke(documents::add)
.onItem().ignoreAsUni())
@@ -231,7 +231,7 @@ private Uni upsertDoc(String collection, Document docToInsert,
.updateMany(eq("foo", docToInsert.getString("foo")),
insertStatement,
new UpdateOptions().upsert(true))
- .onItem().produceUni(result -> {
+ .chain(result -> {
assertThat(result.getModifiedCount()).isEqualTo(0);
if (expectedId == null) {
assertThat(0).isEqualTo(result.getMatchedCount());
@@ -264,10 +264,10 @@ void testAggregate() {
pipeline.add(doc4);
Optional optional = client.getDatabase(DATABASE).createCollection(collection)
- .onItem().produceUni(x -> insertDocs(client, collection, numDocs))
- .onItem().produceMulti(x -> client.getDatabase(DATABASE).getCollection(collection).aggregate(pipeline))
+ .then(() -> insertDocs(client, collection, numDocs))
+ .onItem().transformToMulti(x -> client.getDatabase(DATABASE).getCollection(collection).aggregate(pipeline))
.collectItems().first()
- .onItem().apply(doc -> doc.getInteger("foo_starting_with_bar1"))
+ .onItem().transform(doc -> doc.getInteger("foo_starting_with_bar1"))
.await().asOptional().indefinitely();
assertThat(optional).contains(11);
}
diff --git a/extensions/panache/mongodb-panache/runtime/src/main/java/io/quarkus/mongodb/panache/reactive/runtime/ReactiveMongoOperations.java b/extensions/panache/mongodb-panache/runtime/src/main/java/io/quarkus/mongodb/panache/reactive/runtime/ReactiveMongoOperations.java
index cbe3c09c52797..6e93e4c48fea4 100644
--- a/extensions/panache/mongodb-panache/runtime/src/main/java/io/quarkus/mongodb/panache/reactive/runtime/ReactiveMongoOperations.java
+++ b/extensions/panache/mongodb-panache/runtime/src/main/java/io/quarkus/mongodb/panache/reactive/runtime/ReactiveMongoOperations.java
@@ -336,13 +336,13 @@ private static ReactiveMongoClient mongoClient(MongoEntity entity) {
public static Uni