-
Notifications
You must be signed in to change notification settings - Fork 987
Asynchronous API (4.0)
This guide will give you an impression how and when to use the asynchronous API provided by lettuce 4.x.
Asynchronous methodologies allow you to utilize better system resources, instead of wasting threads waiting for network or disk I/O. Threads can be fully utilized to perform other work instead. lettuce facilitates asynchronicity from building the client on top of netty that is a multithreaded, event-driven I/O framework. All communication is handled asynchronously. Once the foundation is able to processes commands concurrently, it is convenient to take advantage from the asynchronicity. It is way harder to turn a blocking and synchronous working software into a concurrently processing system.
Asynchronicity permits other processing to continue before the transmission has finished and the response of the transmission is processed. This means, in the context of lettuce and especially Redis, that multiple commands can be issued serially without the need of waiting to finish the preceding command. This mode of operation is also known as Pipelining. The following example should give you an impression of the mode of operation:
- Given client A and client B
- Client A triggers command
SET A=B
- Client B triggers at the same time of Client A command
SET C=D
- Redis receives command from Client A
- Redis receives command from Client B
- Redis processes
SET A=B
and respondsOK
to Client A - Client A receives the response and stores the response in the response handle
- Redis processes
SET C=D
and respondsOK
to Client B - Client B receives the response and stores the response in the response handle
Both clients from the example above can be either two threads or connections within an application or two physically separated clients.
Clients can operate concurrently to each other by either being separate processes, threads, event-loops, actors, fibers, etc. Redis processes incoming commands serially and operates mostly single-threaded. This means, commands are processed in the order they are received with some characteristic that we'll cover later.
Let's take the simplified example and enhance it by some program flow details:
- Given client A
- Client A triggers command
SET A=B
- Client A uses the asynchronous API and can perform other processing
- Redis receives command from Client A
- Redis processes
SET A=B
and respondsOK
to Client A - Client A receives the response and stores the response in the response handle
- Client A can access now the response to its command without waiting (non-blocking)
The Client A takes advantage from not waiting on the result of the command so it can process computational work or issue another Redis command. The client can work with the command result as soon as the response is available.
While this guide helps you to understand the asynchronous API it is worthwhile to learn the impact on the synchronous API. The general approach of the synchronous API is no different than the asynchronous API. In both cases, the same facilities are used to invoke and transport commands to the Redis server. The only difference is a blocking behavior of the caller that is using the synchronous API. Blocking happens on command level and affects only the command completion part, meaning multiple clients using the synchronous API can invoke commands on the same connection and at the same time without blocking each other. A call on the synchronous API is unblocked at the moment a command response was processed.
- Given client A and client B
- Client A triggers command
SET A=B
on the synchronous API and waits for the result - Client B triggers at the same time of Client A command
SET C=D
on the synchronous API and waits for the result - Redis receives command from Client A
- Redis receives command from Client B
- Redis processes
SET A=B
and respondsOK
to Client A - Client A receives the response and unblocks the program flow of Client A
- Redis processes
SET C=D
and respondsOK
to Client B - Client B receives the response and unblocks the program flow of Client B
Beginning with lettuce 4.0 two threads may share one connection with the use of the same or the different API. So one thread may use the synchronous API without affecting another thread using the asynchronous API.
However, there are some cases you should not share a connection among threads to avoid side-effects. The cases are:
- Disabling flush-after-command to improve performance
- The use of blocking operations like
BLPOP
. Blocking operations are queued on Redis until they can be executed. While one connection is blocked, other connections can issue commands to Redis. Once a command unblocks the blocking command (that said anLPUSH
orRPUSH
hits the list), the blocked connection is unblocked and can proceed after that. - Transactions
- Using multiple databases
Every command invocation on the asynchronous API creates a RedisFuture<T>
that can be canceled, awaited and subscribed (listener). A CompleteableFuture<T>
or RedisFuture<T>
is a pointer to the result that is initially unknown since the computation of its value is yet incomplete. A RedisFuture<T>
provides operations for synchronization and chaining.
CompletableFuture<String> future = new CompletableFuture<>();
System.out.println("Current state: " + future.isDone());
future.complete("my value");
System.out.println("Current state: " + future.isDone());
System.out.println("Got value: " + future.get());
The example prints the following lines:
Current state: false
Current state: true
Got value: my value
Attaching a listener to a future allows chaining. Promises can be used synonymous to futures, but not every future is a promise. A promise guarantees a callback/notification and thus it has come to its name.
A simple listener that gets called once the future completes:
final CompletableFuture<String> future = new CompletableFuture<>();
future.thenRun(new Runnable() {
@Override
public void run() {
try {
System.out.println("Got value: " + future.get());
} catch (Exception e) {
e.printStackTrace();
}
}
});
System.out.println("Current state: " + future.isDone());
future.complete("my value");
System.out.println("Current state: " + future.isDone());
The value processing moves from the caller into a listener that is then called by whoever completes the future. The example prints the following lines:
Current state: false
Got value: my value
Current state: true
The code from above requires exception handling since calls to the get()
method can lead to exceptions. Exceptions raised during the computation of the Future<T>
are transported within an ExecutionException
. Another exception that may be thrown is the InterruptedException
. This is because calls to get()
are blocking calls and the blocked thread can be interrupted at any time. Just think about a system shutdown.
The CompletionStage<T>
type allows since Java 8 a much more sophisticated handling of futures. A CompletionStage<T>
can consume, transform and build a chain of value processing. The code from above can be rewritten in Java 8 in the following style:
CompletableFuture<String> future = new CompletableFuture<>();
future.thenAccept(new Consumer<String>() {
@Override
public void accept(String value) {
System.out.println("Got value: " + value);
}
});
System.out.println("Current state: " + future.isDone());
future.complete("my value");
System.out.println("Current state: " + future.isDone());
The example prints the following lines:
Current state: false
Got value: my value
Current state: true
You can find the full reference for the CompletionStage<T>
type in the Java 8 API documentation.
lettuce futures can be used for initial and chaining operations. When using lettuce futures, you will notice the non-blocking behavior. This is because all I/O and command processing are handled asynchronously using the netty EventLoop. The lettuce RedisFuture<T>
extends a CompletionStage<T>
so all methods of the base type are available.
lettuce exposes its futures on the Standalone, Sentinel, Publish/Subscribe and Cluster APIs.
Connecting to Redis is insanely simple:
RedisClient client = RedisClient.create("redis://localhost");
RedisAsyncCommands<String, String> commands = client.connect().async();
In the next step, obtaining a value from a key requires the GET
operation:
RedisFuture<String> future = commands.get("key");
The first thing you want to do when working with futures is to consume them. Consuming a futures means obtaining the value. Here is an example that blocks the calling thread and prints the value:
RedisFuture<String> future = commands.get("key");
String value = future.get();
System.out.println(value);
Invocations to the get()
method (pull-style) block the calling thread at least until the value is computed but in the worst case indefinitely. Using timeouts is always a good idea to not exhaust your threads.
try {
RedisFuture<String> future = commands.get("key");
String value = future.get(1, TimeUnit.MINUTES);
System.out.println(value);
} catch (Exception e) {
e.printStackTrace();
}
The example will wait at most 1 minute for the future to complete. If the timeout exceeds, a TimeoutException
is thrown to signal the timeout.
Futures can also be consumed in a push style, meaning when the RedisFuture<T>
is completed, a follow-up action is triggered:
RedisFuture<String> future = commands.get("key");
future.thenAccept(new Consumer<String>() {
@Override
public void accept(String value) {
System.out.println(value);
}
});
Alternatively, written in Java 8 lambdas:
RedisFuture<String> future = commands.get("key");
future.thenAccept(System.out::println);
lettuce futures are completed on the netty EventLoop. Consuming and chaining futures on the default thread is always a good idea except for one case: Blocking/long-running operations. As a rule of thumb, never block the event loop. If you need to chain futures using blocking calls, use the thenAcceptAsync()
/thenRunAsync()
methods to fork the processing to another thread. The ...async()
methods need a threading infrastructure for execution, by default the ForkJoinPool.commonPool()
is used. The ForkJoinPool
is statically constructed and does not grow with increasing load. Using default Executor
s is almost always the better idea.
Executor sharedExecutor = ...
RedisFuture<String> future = commands.get("key");
future.thenAcceptAsync(new Consumer<String>() {
@Override
public void accept(String value) {
System.out.println(value);
}
}, sharedExecutor);
A key point when using futures is the synchronization. Futures are usually used to:
- Trigger multiple invocations without the urge to wait for the predecessors (Batching)
- Invoking a command without awaiting the result at all (Fire&Forget)
- Invoking a command and perform other computing in the meantime (Decoupling)
- Adding concurrency to certain computational efforts (Concurrency)
There are several ways how to wait or get notified in case a future completes. Certain synchronization techniques apply to some motivations why you want to use futures.
Blocking synchronization comes handy if you perform batching/add concurrency to certain parts of your system. An example to batching can be setting/retrieving multiple values and awaiting the results before a certain point within processing.
List<RedisFuture<String>> futures = new ArrayList<RedisFuture<String>>();
for (int i = 0; i < 10; i++) {
futures.add(commands.get("key-" + i, "value-" + i));
}
LettuceFutures.awaitAll(1, TimeUnit.MINUTES, futures.toArray(new RedisFuture[futures.size()]));
The code from above does not wait until a certain command completes before it issues another one. The synchronization is done after all commands are issued.
The example code can easily be turned into a Fire&Forget pattern by omitting the call to LettuceFutures.awaitAll()
.
A single future execution can be also awaited, meaning an opt-in to wait for a certain time but without raising an exception:
RedisFuture<String> future = commands.get("key");
if(!future.await(1, TimeUnit.MINUTES)) {
System.out.println("Could not complete within the timeout");
}
Calling await()
is friendlier to call since it throws only an InterruptedException
in case the blocked thread is interrupted. You are already familiar with the get()
method for synchronization, so we will not bother you with this one.
At last, there is another way to synchronize futures in a blocking way. The major caveat is that you will become responsible to handle thread interruptions. If you do not handle that aspect, you will not be able to shut down your system properly if it is in a running state.
RedisFuture<String> future = commands.get("key");
while (!future.isDone()) {
// do something ...
}
While the isDone()
method does not aim primarily for synchronization use, it might come handy to perform other computational efforts while the command is executed.
Futures can be synchronized/chained in a non-blocking style to improve thread utilization. Chaining works very well in systems relying on event-driven characteristics. Future chaining builds up a chain of one or more futures that are executed serially, and every chain member handles a part in the computation. The CompletionStage<T>
API offers various methods to chain and transform futures. A simple transformation of the value can be done using the thenApply()
method:
future.thenApply(new Function<String, Integer>() {
@Override
public Integer apply(String value) {
return value.length();
}
}).thenAccept(new Consumer<Integer>() {
@Override
public void accept(Integer integer) {
System.out.println("Got value: " + integer);
}
});
Alternatively, written in Java 8 lambdas:
future.thenApply(String::length)
.thenAccept(integer -> System.out.println("Got value: " + integer));
The thenApply()
method accepts a function that transforms the value into another one. The final thenAccept()
method consumes the value for final processing.
You have already seen the thenRun()
method from previous examples. The thenRun()
method can be used to handle future completions in case the data is not crucial to your flow:
future.thenRun(new Runnable() {
@Override
public void run() {
System.out.println("Finished the future.");
}
});
Keep in mind to execute the Runnable
on a custom Executor
if you are doing blocking calls within the Runnable
.
Another chaining method worth mentioning is the either-or chaining. While either-or fits not fully to synchronization its worthwhile to see it in action.
A couple of ...Either()
methods are available on a CompletionStage<T>
, see the Java 8 API docs for the full reference. The either-or pattern consumes the value from the first future that is completed. A good example might be two services returning the same data, for instance, a Master-Slave scenario, but you want to return the data as fast as possible:
RedisStringAsyncCommands<String, String> master = masterClient.connect().async();
RedisStringAsyncCommands<String, String> slave = slaveClient.connect().async();
RedisFuture<String> future = master.get("key");
future.acceptEither(slave.get("key"), new Consumer<String>() {
@Override
public void accept(String value) {
System.out.println("Got value: " + value);
}
});
Error handling is an indispensable component of every real world application and should to be considered from the beginning on. Futures provide some mechanisms to deal with errors.
In general, you want to react in the following ways:
- Return a default value instead
- Use a backup future
- Retry the future
RedisFuture<T>
s transport exceptions if any occurred. Calls to the get()
method throw the occurred exception wrapped within an ExecutionException
(this is different to lettuce 3.x). You can find more details within the Javadoc on CompletionStage
The following code falls back to a default value after it runs to an exception by using the handle()
method:
future.handle(new BiFunction<String, Throwable, String>() {
@Override
public Integer apply(String value, Throwable throwable) {
if(throwable != null) {
return "default value";
}
return value;
}
}).thenAccept(new Consumer<String>() {
@Override
public void accept(String value) {
System.out.println("Got value: " + value);
}
});
More sophisticated code could decide on behalf of the throwable type that value to return, as the shortcut example using the exceptionally()
method:
future.exceptionally(new Function<Throwable, String>() {
@Override
public String apply(Throwable throwable) {
if (throwable instanceof IllegalStateException) {
return "default value";
}
return "other default value";
}
});
Retrying futures and recovery using futures is not part of the Java 8 CompleteableFuture<T>
. See the Reactive API for comfortable ways handling with exceptions.
Basic operations
RedisAsyncCommands<String, String> async = client.connect().async();
RedisFuture<String> set = async.set("key", "value");
RedisFuture<String> get = async.get("key");
set.get() == "OK"
get.get() == "value"
Waiting for a future with a timeout
RedisAsyncCommands<String, String> async = client.connect().async();
RedisFuture<String> set = async.set("key", "value");
RedisFuture<String> get = async.get("key");
set.await(1, SECONDS) == true
set.get() == "OK"
get.get(1, TimeUnit.MINUTES) == "value"
Using listeners for a future
RedisStringAsyncCommands<String, String> async = client.connect().async();
RedisFuture<String> set = async.set("key", "value");
Runnable listener = new Runnable() {
@Override
public void run() {
...;
}
};
set.thenRun(listener);
Lettuce documentation was moved to https://redis.github.io/lettuce/overview/
Intro
Getting started
- Getting started
- Redis URI and connection details
- Basic usage
- Asynchronous API
- Reactive API
- Publish/Subscribe
- Transactions/Multi
- Scripting and Functions
- Redis Command Interfaces
- FAQ
HA and Sharding
Advanced usage
- Configuring Client resources
- Client Options
- Dynamic Command Interfaces
- SSL Connections
- Native Transports
- Unix Domain Sockets
- Streaming API
- Events
- Command Latency Metrics
- Tracing
- Stateful Connections
- Pipelining/Flushing
- Connection Pooling
- Graal Native Image
- Custom commands
Integration and Extension
Internals