Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

update to mutiny 0.6.0 #10676

Merged
merged 3 commits into from
Jul 13, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion bom/runtime/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@
<netty.version>4.1.49.Final</netty.version>
<reactive-streams.version>1.0.3</reactive-streams.version>
<jboss-logging.version>3.3.2.Final</jboss-logging.version>
<mutiny.version>0.5.4</mutiny.version>
<mutiny.version>0.6.0</mutiny.version>
<axle-client.version>0.1.1</axle-client.version>
<mutiny-client.version>0.1.1</mutiny-client.version>
<kafka2.version>2.5.0</kafka2.version>
Expand Down
4 changes: 2 additions & 2 deletions docs/src/main/asciidoc/amazon-dynamodb.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -413,7 +413,7 @@ public class FruitAsyncService extends AbstractService {

public Uni<List<Fruit>> findAll() {
return Uni.createFrom().completionStage(() -> dynamoDB.scan(scanRequest()))
.onItem().apply(res -> res.items().stream().map(Fruit::from).collect(Collectors.toList()));
.onItem().transform(res -> res.items().stream().map(Fruit::from).collect(Collectors.toList()));
}

public Uni<List<Fruit>> add(Fruit fruit) {
Expand All @@ -423,7 +423,7 @@ public class FruitAsyncService extends AbstractService {

public Uni<Fruit> get(String name) {
return Uni.createFrom().completionStage(() -> dynamoDB.getItem(getRequest(name)))
.onItem().apply(resp -> Fruit.from(resp.item()));
.onItem().transform(resp -> Fruit.from(resp.item()));
}
}
----
Expand Down
12 changes: 7 additions & 5 deletions docs/src/main/asciidoc/amazon-kms.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -260,17 +260,19 @@ public class QuarkusKmsAsyncResource {
@Path("/encrypt")
public Uni<String> encrypt(String data) {
return Uni.createFrom().completionStage(kms.encrypt(req -> req.keyId(keyArn).plaintext(SdkBytes.fromUtf8String(data))))
.onItem().apply(EncryptResponse::ciphertextBlob)
.onItem().apply(blob -> Base64.encodeBase64String(blob.asByteArray()));
.onItem().transform(EncryptResponse::ciphertextBlob)
.onItem().transform(blob -> Base64.encodeBase64String(blob.asByteArray()));
}

@POST
@Path("/decrypt")
public Uni<String> decrypt(String data) {
return Uni.createFrom().item(SdkBytes.fromByteArray(Base64.decodeBase64(data.getBytes())))
.onItem().produceCompletionStage(msg -> kms.decrypt(req -> req.keyId(keyArn).ciphertextBlob(msg)))
.onItem().apply(DecryptResponse::plaintext)
.onItem().apply(SdkBytes::asUtf8String);
.onItem().transformToUni(msg ->
Uni.createFrom().completionStage(kms.decrypt(req -> req.keyId(keyArn).ciphertextBlob(msg)))
)
.onItem().transform(DecryptResponse::plaintext)
.onItem().transform(SdkBytes::asUtf8String);
}
}
----
Expand Down
2 changes: 1 addition & 1 deletion docs/src/main/asciidoc/amazon-s3.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -516,7 +516,7 @@ public class S3AsyncClientResource extends CommonResource {
.build();

return Uni.createFrom().completionStage(() -> s3.listObjects(listRequest))
.onItem().apply(result -> toFileItems(result));
.onItem().transform(result -> toFileItems(result));
}

private List<FileObject> toFileItems(ListObjectsResponse objects) {
Expand Down
21 changes: 12 additions & 9 deletions docs/src/main/asciidoc/amazon-sns.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -700,8 +700,8 @@ public class QuarksCannonAsyncResource {
return Uni.createFrom()
.completionStage(sns.publish(p -> p.topicArn(topicArn).message(message)))
.onItem().invoke(item -> LOGGER.infov("Fired Quark[{0}, {1}}]", quark.getFlavor(), quark.getSpin()))
.onItem().apply(PublishResponse::messageId)
.onItem().apply(id -> Response.ok().entity(id).build());
.onItem().transform(PublishResponse::messageId)
.onItem().transform(id -> Response.ok().entity(id).build());
}
}
----
Expand Down Expand Up @@ -770,14 +770,17 @@ public class QuarksShieldAsyncResource {

if (messageType.equals(NOTIFICATION_TYPE)) {
return Uni.createFrom().item(readObject(SnsNotification.class, message))
.onItem().apply(notification -> readObject(Quark.class, notification.getMessage()))
.onItem().transform(notification -> readObject(Quark.class, notification.getMessage()))
.onItem().invoke(quark -> LOGGER.infov("Quark[{0}, {1}] collision with the shield.", quark.getFlavor(), quark.getSpin()))
.onItem().apply(quark -> Response.ok().build());
.onItem().transform(quark -> Response.ok().build());
} else if (messageType.equals(SUBSCRIPTION_CONFIRMATION_TYPE)) {
return Uni.createFrom().item(readObject(SnsSubscriptionConfirmation.class, message))
.onItem().produceCompletionStage(msg -> sns.confirmSubscription(confirm -> confirm.topicArn(topicArn).token(msg.getToken())))
.onItem().transformToUni(msg ->
Uni.createFrom().completionStage(
sns.confirmSubscription(confirm -> confirm.topicArn(topicArn).token(msg.getToken())))
)
.onItem().invoke(resp -> LOGGER.info("Subscription confirmed. Ready for quarks collisions."))
.onItem().apply(resp -> Response.ok().build());
.onItem().transform(resp -> Response.ok().build());
} else if (messageType.equals(UNSUBSCRIPTION_CONFIRMATION_TYPE)) {
LOGGER.info("We are unsubscribed");
return Uni.createFrom().item(Response.ok().build());
Expand All @@ -791,10 +794,10 @@ public class QuarksShieldAsyncResource {
public Uni<Response> subscribe() {
return Uni.createFrom()
.completionStage(sns.subscribe(s -> s.topicArn(topicArn).protocol("http").endpoint(notificationEndpoint())))
.onItem().apply(SubscribeResponse::subscriptionArn)
.onItem().transform(SubscribeResponse::subscriptionArn)
.onItem().invoke(this::setSubscriptionArn)
.onItem().invoke(arn -> LOGGER.infov("Subscribed Quarks shield with id = {0} ", arn))
.onItem().apply(arn -> Response.ok().entity(arn).build());
.onItem().transform(arn -> Response.ok().entity(arn).build());
}

@POST
Expand All @@ -805,7 +808,7 @@ public class QuarksShieldAsyncResource {
.completionStage(sns.unsubscribe(s -> s.subscriptionArn(subscriptionArn)))
.onItem().invoke(arn -> LOGGER.infov("Unsubscribed quarks shield for id = {0}", subscriptionArn))
.onItem().invoke(arn -> subscriptionArn = null)
.onItem().apply(arn -> Response.ok().build());
.onItem().transform(arn -> Response.ok().build());
} else {
return Uni.createFrom().item(Response.status(400).entity("Not subscribed yet").build());
}
Expand Down
8 changes: 4 additions & 4 deletions docs/src/main/asciidoc/amazon-sqs.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -379,8 +379,8 @@ public class QuarksCannonAsyncResource {
return Uni.createFrom()
.completionStage(sqs.sendMessage(m -> m.queueUrl(queueUrl).messageBody(message)))
.onItem().invoke(item -> LOGGER.infov("Fired Quark[{0}, {1}}]", quark.getFlavor(), quark.getSpin()))
.onItem().apply(SendMessageResponse::messageId)
.onItem().apply(id -> Response.ok().entity(id).build());
.onItem().transform(SendMessageResponse::messageId)
.onItem().transform(id -> Response.ok().entity(id).build());
}
}
----
Expand Down Expand Up @@ -428,8 +428,8 @@ public class QuarksShieldAsyncResource {
public Uni<List<Quark>> receive() {
return Uni.createFrom()
.completionStage(sqs.receiveMessage(m -> m.maxNumberOfMessages(10).queueUrl(queueUrl)))
.onItem().apply(ReceiveMessageResponse::messages)
.onItem().apply(m -> m.stream().map(Message::body).map(this::toQuark).collect(Collectors.toList()));
.onItem().transform(ReceiveMessageResponse::messages)
.onItem().transform(m -> m.stream().map(Message::body).map(this::toQuark).collect(Collectors.toList()));
}

private Quark toQuark(String message) {
Expand Down
36 changes: 18 additions & 18 deletions docs/src/main/asciidoc/getting-started-reactive.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ client = WebClient.create(vertx,
Uni<JsonObject> uni =
client.get("/api/fruit/" + name)
.send()
.onItem().apply(resp -> {
.onItem().transform(resp -> {
if (resp.statusCode() == 200) {
return resp.bodyAsJsonObject();
} else {
Expand Down Expand Up @@ -245,7 +245,7 @@ public class ReactiveGreetingService {

public Uni<String> greeting(String name) {
return Uni.createFrom().item(name)
.onItem().apply(n -> String.format("hello %s", name));
.onItem().transform(n -> String.format("hello %s", name));
}
}
----
Expand Down Expand Up @@ -312,7 +312,7 @@ In the `ReactiveGreetingService`, add the following method:
----
public Multi<String> greetings(int count, String name) {
return Multi.createFrom().ticks().every(Duration.ofSeconds(1))
.onItem().apply(n -> String.format("hello %s - %d", name, n))
.onItem().transform(n -> String.format("hello %s - %d", name, n))
.transform().byTakingFirstItems(count);
}
----
Expand Down Expand Up @@ -456,30 +456,30 @@ public class Fruit {
public static Multi<Fruit> findAll(PgPool client) {
return client.query("SELECT id, name FROM fruits ORDER BY name ASC").execute()
// Create a Multi from the set of rows:
.onItem().produceMulti(set -> Multi.createFrom().items(() -> StreamSupport.stream(set.spliterator(), false)))
.onItem().transformToMulti(set -> Multi.createFrom().items(() -> StreamSupport.stream(set.spliterator(), false)))
// For each row create a fruit instance
.onItem().apply(Fruit::from);
.onItem().transform(Fruit::from);
}

public static Uni<Fruit> findById(PgPool client, Long id) {
return client.preparedQuery("SELECT id, name FROM fruits WHERE id = $1").execute(Tuple.of(id))
.onItem().apply(RowSet::iterator)
.onItem().apply(iterator -> iterator.hasNext() ? from(iterator.next()) : null);
.onItem().transform(RowSet::iterator)
.onItem().transform(iterator -> iterator.hasNext() ? from(iterator.next()) : null);
}

public Uni<Long> save(PgPool client) {
return client.preparedQuery("INSERT INTO fruits (name) VALUES ($1) RETURNING (id)").execute(Tuple.of(name))
.onItem().apply(pgRowSet -> pgRowSet.iterator().next().getLong("id"));
.onItem().transform(pgRowSet -> pgRowSet.iterator().next().getLong("id"));
}

public Uni<Boolean> update(PgPool client) {
return client.preparedQuery("UPDATE fruits SET name = $1 WHERE id = $2").execute(Tuple.of(name, id))
.onItem().apply(pgRowSet -> pgRowSet.rowCount() == 1);
.onItem().transform(pgRowSet -> pgRowSet.rowCount() == 1);
}

public static Uni<Boolean> delete(PgPool client, Long id) {
return client.preparedQuery("DELETE FROM fruits WHERE id = $1").execute(Tuple.of(id))
.onItem().apply(pgRowSet -> pgRowSet.rowCount() == 1);
.onItem().transform(pgRowSet -> pgRowSet.rowCount() == 1);
}

private static Fruit from(Row row) {
Expand Down Expand Up @@ -559,31 +559,31 @@ public class FruitResource {
@Path("{id}")
public Uni<Response> getSingle(@PathParam Long id) {
return Fruit.findById(client, id)
.onItem().apply(fruit -> fruit != null ? Response.ok(fruit) : Response.status(Status.NOT_FOUND))
.onItem().apply(ResponseBuilder::build);
.onItem().transform(fruit -> fruit != null ? Response.ok(fruit) : Response.status(Status.NOT_FOUND))
.onItem().transform(ResponseBuilder::build);
}

@POST
public Uni<Response> create(Fruit fruit) {
return fruit.save(client)
.onItem().apply(id -> URI.create("/fruits/" + id))
.onItem().apply(uri -> Response.created(uri).build());
.onItem().transform(id -> URI.create("/fruits/" + id))
.onItem().transform(uri -> Response.created(uri).build());
}

@PUT
@Path("{id}")
public Uni<Response> update(@PathParam Long id, Fruit fruit) {
return fruit.update(client)
.onItem().apply(updated -> updated ? Status.OK : Status.NOT_FOUND)
.onItem().apply(status -> Response.status(status).build());
.onItem().transform(updated -> updated ? Status.OK : Status.NOT_FOUND)
.onItem().transform(status -> Response.status(status).build());
}

@DELETE
@Path("{id}")
public Uni<Response> delete(@PathParam Long id) {
return Fruit.delete(client, id)
.onItem().apply(deleted -> deleted ? Status.NO_CONTENT : Status.NOT_FOUND)
.onItem().apply(status -> Response.status(status).build());
.onItem().transform(deleted -> deleted ? Status.NO_CONTENT : Status.NOT_FOUND)
.onItem().transform(status -> Response.status(status).build());
}
}
----
Expand Down
6 changes: 3 additions & 3 deletions docs/src/main/asciidoc/grpc-service-consumption.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ public String helloBlocking(@PathParam("name") String name) {
@Path("/mutiny/{name}")
public Uni<String> helloMutiny(@PathParam("name") String name) {
return mutinyHelloService.sayHello(HelloRequest.newBuilder().setName(name).build())
.onItem().apply(HelloReply::getMessage);
.onItem().transform(HelloReply::getMessage);
}
----

Expand Down Expand Up @@ -118,7 +118,7 @@ public class StreamingEndpoint {
public Multi<String> invokeSource() {
// Retrieve a stream
return client.source(Empty.newBuilder().build())
.onItem().apply(Item::getValue);
.onItem().transform(Item::getValue);
}

@GET
Expand All @@ -138,7 +138,7 @@ public class StreamingEndpoint {
Multi<Item> inputs = Multi.createFrom().range(0, max)
.map(i -> Integer.toString(i))
.map(i -> Item.newBuilder().setValue(i).build());
return client.pipe(inputs).onItem().apply(Item::getValue);
return client.pipe(inputs).onItem().transform(Item::getValue);
}

}
Expand Down
2 changes: 1 addition & 1 deletion docs/src/main/asciidoc/grpc-service-implementation.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ public class StreamingService extends MutinyStreamingGrpc.StreamingImplBase {
.map(Item::getValue)
.map(Long::parseLong)
.onItem().scan(() -> 0L, Long::sum)
.onItem().apply(l -> Item.newBuilder().setValue(Long.toString(l)).build());
.onItem().transform(l -> Item.newBuilder().setValue(Long.toString(l)).build());
}
}
----
Expand Down
10 changes: 5 additions & 5 deletions docs/src/main/asciidoc/reactive-event-bus.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,7 @@ public class EventResource {
@Path("{name}")
public Uni<String> greeting(@PathParam String name) {
return bus.<String>request("greeting", name) // <2>
.onItem().apply(Message::body);
.onItem().transform(Message::body);
}
}
----
Expand All @@ -250,7 +250,7 @@ bus.sendAndForget("greeting", name)
bus.publish("greeting", name)
// Case 3
Uni<String> response = bus.<String>request("address", "hello, how are you?")
.onItem().apply(Message::body);
.onItem().transform(Message::body);
----

== Putting things together - bridging HTTP and messages
Expand Down Expand Up @@ -302,7 +302,7 @@ public class EventResource {
@Path("{name}")
public Uni<String> greeting(@PathParam String name) {
return bus.<String>request("greeting", name) // <1>
.onItem().apply(Message::body); // <2>
.onItem().transform(Message::body); // <2>
}
}
----
Expand Down Expand Up @@ -376,7 +376,7 @@ So you can exchange objects as follows:
@Path("{name}")
public Uni<String> greeting(@PathParam String name) {
return bus.<String>request("greeting", new MyName(name))
.onItem().apply(Message::body);
.onItem().transform(Message::body);
}

@ConsumeEvent(value = "greeting")
Expand All @@ -395,7 +395,7 @@ If you want to use a specific codec, you need to explicitly set it on both ends:
public Uni<String> greeting(@PathParam String name) {
return bus.<String>request("greeting", name,
new DeliveryOptions().setCodecName(MyNameCodec.class.getName())) // <1>
.onItem().apply(Message::body);
.onItem().transform(Message::body);
}

@ConsumeEvent(value = "greeting", codec = MyNameCodec.class) // <2>
Expand Down
Loading