Skip to content

Latest commit

 

History

History
361 lines (255 loc) · 16 KB

mutiny-primer.adoc

File metadata and controls

361 lines (255 loc) · 16 KB

Mutiny - Async for bare mortal

Mutiny is an intuitive, reactive programming library. It is the primary model to write reactive applications with Quarkus.

An event-driven reactive programming API

Mutiny is very different from the other reactive programming libraries. It takes a different approach to design your program. With Mutiny everything is event-driven: you receive events, and you react to them. This event-driven aspect embraces the asynchronous nature of distributed systems and provides an elegant and precise way to express continuation.

Mutiny offers two types that are both event-driven and lazy:

  • A Uni emits a single event (an item or a failure). Unis are convenient to represent asynchronous actions that return 0 or 1 result. A good example is the result of sending a message to a message broker queue.

  • A Multi emits multiple events (n items, 1 failure or 1 completion). Multis can represent streams of items, potentially unbounded. A good example is receiving messages from a message broker queue.

These two types allow representing any type of interactions. They are event sources. You observe them (subscription) and you get notified when they emit an item, a failure, or, in the case of a bounded Multi, a completion event. When you (the subscriber) receive the event, you can process it (e.g., transform it, filter it). With Mutiny, you are going to write code like onX().action(), which reads as “on item X do action”.

If you want to know more about Mutiny, and the concepts behind it, check the Mutiny Reference documentation.

Mutiny in Quarkus

Mutiny is the primary API when dealing with the reactive features from Quarkus. It means that most extensions support Mutiny either by exposing an API returning Unis and Multis (such as reactive data sources or rest clients) or understanding when your methods return a Uni or a Multi (such as RESTEasy Reactive or Reactive Messaging).

These integrations make Mutiny a prominent and cohesive model for every reactive application developed with Quarkus. In addition, Mutiny architecture allows fine-grain dead-code elimination which improves the memory usage when compiled to native (such as with Quarkus native mode or GraalVM native image compiler).

Why another reactive programming API?

Seasoned reactive developers may wonder why Quarkus introduced yet another reactive programming APIs while there are existing ones. Mutiny is taking a very different angle:

Event-Driven - Mutiny places events at the core of its design. With Mutiny, you observe events, react to them, and create elegant and readable processing pipelines. A Ph.D. in functional programming is not required.

Navigable - Even with intelligent code completion, classes with hundreds of methods are confusing. Mutiny provides a navigable and explicit API driving you towards the operator you need.

Non-Blocking I/O - Mutiny is the perfect companion to tame the asynchronous nature of applications with non-blocking I/O (which powers Quarkus). Declaratively compose operations, transform data, enforce progress, recover from failures, and more.

Made for an asynchronous world - Mutiny can be used in any asynchronous application such as event-driven microservices, message-based applications, network utilities, data stream processing, and of course…​ reactive applications!

Reactive Streams and Converters Built-In - Mutiny is based on the Reactive Streams specification, and so it can be integrated with any other reactive programming library. In addition, it proposes converters to interact with other popular libraries.

Mutiny and the I/O Threads

Quarkus is powered by a reactive engine, and when developing a reactive application, your code is executed on one of the few I/O threads. Remember, you must never block these threads, and the model would collapse if you do. So, you can’t use blocking I/O. Instead, you need to schedule the I/O operation and pass a continuation.

Reactive Execution Model and I/O Threads

The Mutiny event-driven paradigm is tailored for this. When the I/O operation completes successfully, the Uni that represents it emits an item event. When it fails, it emits a failure event. The continuation is simply and naturally expressed using the event-driven API.

Mutiny through Examples

Many Quarkus extensions expose Mutiny APIs. In this section, we use the MongoDB extension to illustrate how to use Mutiny.

Let’s imagine a simple structure representing an element from the periodic table:

public class Element {

   public String name;
   public String symbol;
   public int position;

   public Element(String name, String symbol, int position) {
       this.name = name;
       this.symbol = symbol;
       this.position = position;
   }

   public Element() {
        // Use by JSON mappers
   }
}

This structure contains the name, symbol, and position of the element. To retrieve and store elements into a Mongo collection, you can use the following code:

@ApplicationScoped
public class ElementService {

   final ReactiveMongoCollection<Element> collection;

   @Inject
   ElementService(ReactiveMongoClient client) {
       collection = client.getDatabase("quarkus")
               .getCollection("elements", Element.class);
   }

   public void add(Element element) {
       Uni<InsertOneResult> insertion = collection.insertOne(element);
       insertion
           .onItem().transform(r -> r.getInsertedId().asString())
           .subscribe().with(
               result -> System.out.println("inserted " + result),
               failure -> System.out.println("D'oh" + failure));
   }

   public void getAll() {
       collection.find()
           .subscribe().with(
              element -> System.out.println("Element: " + element),
             failure -> System.out.println("D'oh! " + failure),
             () -> System.out.println("No more elements")
       );
   }

}

First, the Mongo client is injected. Note that it uses the reactive variant (io.quarkus.mongodb.reactive.ReactiveMongoClient). In the initialize method, we retrieve and store the collection in which elements will be inserted.

The add method inserts an element in the collection. It receives the element as a parameter and uses the reactive API of the collection. Interacting with the database involves I/Os. The reactive principles forbid blocking while waiting for the interaction to complete. Instead, we schedule the operation and pass a continuation. The insertOne method returns a Uni, i.e., an asynchronous operation. That’s the scheduled I/O. We now need to express the continuation, which is done using the .onItem() method. .onItem() allows configuring what needs to happen when the observed Uni emits an item, in our case when the scheduled I/Os completes. In this example, we extract the inserted document id. The final step is the subscription. Without it, nothing would ever happen. Subscribing triggers the operation. The subscription method can also define handlers: the id value on success, or a failure when the insertion fails.

Let’s now look at the second method. It retrieves all the stored elements. In this case, it returns multiple items (one per stored element), so we are using a Multi. As for the insertion, getting the stored elements involves I/Os. The find is our operation. As for Uni, you need to subscribe to trigger the operation. The subscriber receives item events, a failure event, or a completion event when all the elements have been received.

Subscribing to a Uni or a Multi is essential, as without it, the operation is never executed. In Quarkus some extensions deal with the subscription for you. For example, in RESTEasy Reactive your HTTP methods can return a Uni or a Multi, and RESTEasy Reactive handles the subscription.

Mutiny Patterns

The example from the last section was simplistic on purpose. Let’s have a look at a few common patterns.

Observing events

You can observe the various kind of events using:

on{event}().invoke(ev → System.out.println(ev));

For example, for items use: onItem().invoke(item → …​);

For failure, use: onFailure().invoke(failure → …​)

The invoke method is synchronous. Sometimes you need to execute an asynchronous action. In this case use call, as in: onItem().call(item → someAsyncAction(item)). Note that call does not change the item, it just calls an asynchronous action, and when this one completes, it emits the original item downstream.

Transforming item

The first instrumental pattern consists of transforming the item events you receive. As we have seen in the previous section, it could indicate the successful insertion, or the elements stored in the database.

Transforming an item is done using: onItem().transform(item → …​.).

More details about transformation can be found in the Mutiny documentation.

Sequential composition

Sequential composition allows chaining dependent asynchronous operations. This is achieved using onItem().transformToUni(item → …​). Unlike transform, the function passed to transformToUni returns a Uni.

Uni<String> uni1 = …
uni1
.onItem().transformtoUni(item -> anotherAsynchronousAction(item));

More details about asynchronous transformation can be found in the Mutiny documentation.

Failure handling

So far we only handle the item events, but handling failure is essential. You can handle failures using onFailure().

For example, you can recover with a fallback item using onFailure().recoverWithItem(fallback):

Uni<String> uni1 = …
uni1
.onFailure().recoverWithItem(“my fallback value”);

You can also retry the operation such as in:

Uni<String> uni1 = …
uni1
.onFailure().retry().atMost(5);

More info about failure recovery can be found on the handling failure documentation and the retrying on failures documentation.

Events and Actions

The following tables list the events that you can receive for Uni and Multi. Each of them is associated with its own group (onX). The second table lists the classic action you can do upon an event. Note that some groups offer more possibilities.

Events from the upstream Events from the downstream

Uni

Subscription (1), Item (0..1), failure (0..1)

Cancellation

Multi

Subscription (1), Item (0..n), failure (0..1), completion (0..1)

Cancellation, Request

Check the full list of events on the event documentation.

Action API Comment

transform

onItem().transform(Function<I, O> function);

Transform the event into another event using a synchronous function. The downstream receives the result of the function (or a failure if the transformation failed).

transformToUni

onItem().transformToUni(Function<I, Uni<O>> function);

Transform the event into another event using an asynchronous function. The downstream receives the item emitted by the produced Uni (or a failure if the transformation failed). If the produced Uni emits a failure, that failure is passed to the downstream.

invoke

onItem().invoke(Consumer<I> consumer)

Invokes the synchronous consumer. This is particularly convenient to execute side effects actions. The downstream receives the original event, or a failure if the consumer throws an exception

call

onItem().call(Function<I, Uni<?>>)

Invokes the asynchronous function. This is particularly convenient to execute asynchronous side effect actions.The downstream receives the original event, or a failure if the consumer throws an exception or if the produced Uni emits a failure.

fail

onItem().failWith(Function<I, Throwable>)

Emits a failure when it receives the event.

complete (Multi only)

onItem().complete()

Emits the completion event when it receives the event.

Other patterns

Mutiny provides lots of other features. Head over to the Mutiny documentation to see many more examples, including the whole list of events and how to handle them.

Some frequently asked guides are the following:

Shortcuts

When using Uni, having to write onItem() can be cumbersome. Fortunately, Mutiny provides a set of shortcut to make your code more concise:

Shortcut Equivalent

uni.map(x → y)

uni.onItem().transform(x → y)

uni.flatMap(x → uni2)

uni.onItem().transformToUni(x → uni2)

uni.chain(x → uni2)

uni.onItem().transformToUni(x → uni2)

uni.invoke(x → System.out.println(x))

uni.onItem().invoke(x → System.out.println(x))

uni.call(x → uni2)

uni.onItem().call(x → uni2)

uni.eventually(() → System.out.println("eventually"))

uni.onItemOrFailure().invoke((ignoredItem, ignoredException) → System.out.println("eventually"))

uni.eventually(() → uni2)

uni.onItemOrFailure().call((ignoredItem, ignoredException) → uni2)

uni.replaceWith(x)

uni.onItem().transform(ignored → x)

uni.replaceWith(uni2)

uni.onItem().transformToUni(ignored → uni2)

uni.replaceIfNullWith(x)

uni.onItem().ifNull().continueWith(x)

Reactive Streams

Mutiny uses Reactive Streams. Multi implements Publisher and enforces the back-pressure protocol. Emissions are constrained by the request emitted from the downstream subscribers. Thus, it does not overload the subscribers. Note that in some cases, you can’t follow this protocol (because the Multi emits events that can’t be controlled, such as time, or measures sensors). In this case, Mutiny provides a way to control the overflow using onOverflow().

Uni does not implement Reactive Streams Publisher. A Uni can only emit one event, so subscribing to the Uni is enough to express your intent to use the result and does not need the request protocol ceremony.

Mutiny and Vert.x

Vert.x is a toolkit to build reactive applications and systems. It provides a huge ecosystem of libraries following the reactive principles (i.e., non-blocking and asynchronous). Vert.x is an essential part of Quarkus, as it provides its reactive capabilities.

In addition, the whole Vert.x API can be used with Quarkus. To provide a cohesive experience, the Vert.x API is also available using a Mutiny variant, i.e., returns Uni and Multi.

More details about this API can be found on: https://quarkus.io/blog/mutiny-vertx/.

Mutiny Integration in Quarkus

The integration of Mutiny in Quarkus goes beyond just the library. Mutiny exposes hooks that allow Quarkus and Mutiny to be closely integrated:

  • Calling await or toIterable would fail if you are running on an I/O thread, preventing blocking the I/O thread;

  • The log() operator use the Quarkus logger;

  • The default Mutiny thread pool is the Quarkus worker thread pool;

  • Context Propagation is enabled by default when using Mutiny Uni and Multi

More details about the infrastructure integration can be found on https://smallrye.io/smallrye-mutiny/latest/guides/framework-integration/.