Skip to content

Commit

Permalink
Refactor to asynchronous connect #640
Browse files Browse the repository at this point in the history
Lettuce now uses fully asynchronous connects for initial connection, address resolution and Sentinel/Redis Cluster connect. This change eliminates all blocking bits in the connection initialization for:

* Redis Standalone
* Redis Pub/Pub
* Redis Sentinel (not Master/Slave)
* Redis Cluster
* Redis Cluster Pub/Sub

RedisClusterClient now also exposes connectAsync(…) and connect connectPubSubAsync(…). Connection disposal also exposes asynchronous methods via closeAsync() returning a CompletableFuture.
  • Loading branch information
mp911de committed Nov 7, 2017
1 parent ea4297a commit 8cfb2e8
Show file tree
Hide file tree
Showing 35 changed files with 792 additions and 422 deletions.
69 changes: 58 additions & 11 deletions src/main/java/io/lettuce/core/AbstractRedisClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@
import java.util.Set;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;

import reactor.core.publisher.Mono;
import io.lettuce.core.Transports.NativeTransports;
import io.lettuce.core.internal.LettuceAssert;
import io.lettuce.core.protocol.ConnectionWatchdog;
Expand Down Expand Up @@ -139,7 +139,7 @@ public void setDefaultTimeout(long timeout, TimeUnit unit) {
* @param connectionBuilder connection builder to configure the connection
* @param redisURI URI of the redis instance
*/
protected void connectionBuilder(Supplier<SocketAddress> socketAddressSupplier, ConnectionBuilder connectionBuilder,
protected void connectionBuilder(Mono<SocketAddress> socketAddressSupplier, ConnectionBuilder connectionBuilder,
RedisURI redisURI) {

Bootstrap redisBootstrap = new Bootstrap();
Expand Down Expand Up @@ -237,6 +237,33 @@ protected <T> T getConnection(ConnectionFuture<T> connectionFuture) {
}
}

/**
* Retrieve the connection from {@link ConnectionFuture}. Performs a blocking {@link ConnectionFuture#get()} to synchronize
* the channel/connection initialization. Any exception is rethrown as {@link RedisConnectionException}.
*
* @param connectionFuture must not be null.
* @param <T> Connection type.
* @return the connection.
* @throws RedisConnectionException in case of connection failures.
* @since 5.0
*/
protected <T> T getConnection(CompletableFuture<T> connectionFuture) {

try {
return connectionFuture.get();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw RedisConnectionException.create(e);
} catch (Exception e) {

if (e instanceof ExecutionException) {
throw RedisConnectionException.create(e.getCause());
}

throw RedisConnectionException.create(e);
}
}

/**
* Connect and initialize a channel from {@link ConnectionBuilder}.
*
Expand All @@ -248,15 +275,33 @@ protected <T> T getConnection(ConnectionFuture<T> connectionFuture) {
protected <K, V, T extends RedisChannelHandler<K, V>> ConnectionFuture<T> initializeChannelAsync(
ConnectionBuilder connectionBuilder) {

SocketAddress redisAddress = connectionBuilder.socketAddress();
Mono<SocketAddress> socketAddressSupplier = connectionBuilder.socketAddress();

if (clientResources.eventExecutorGroup().isShuttingDown()) {
throw new IllegalStateException("Cannot connect, Event executor group is terminated.");
}

CompletableFuture<SocketAddress> socketAddressFuture = new CompletableFuture<>();
CompletableFuture<Channel> channelReadyFuture = new CompletableFuture<>();

socketAddressSupplier.doOnError(socketAddressFuture::completeExceptionally).doOnNext(socketAddressFuture::complete)
.subscribe(redisAddress -> {

if (channelReadyFuture.isCancelled()) {
return;
}
initializeChannelAsync0(connectionBuilder, channelReadyFuture, redisAddress);
}, channelReadyFuture::completeExceptionally);

return new DefaultConnectionFuture<>(socketAddressFuture, channelReadyFuture.thenApply(channel -> (T) connectionBuilder
.connection()));
}

private void initializeChannelAsync0(ConnectionBuilder connectionBuilder, CompletableFuture<Channel> channelReadyFuture,
SocketAddress redisAddress) {

logger.debug("Connecting to Redis at {}", redisAddress);

CompletableFuture<Channel> channelReadyFuture = new CompletableFuture<>();
Bootstrap redisBootstrap = connectionBuilder.bootstrap();

RedisChannelInitializer initializer = connectionBuilder.build();
Expand All @@ -266,6 +311,14 @@ protected <K, V, T extends RedisChannelHandler<K, V>> ConnectionFuture<T> initia
CompletableFuture<Boolean> initFuture = initializer.channelInitialized();
ChannelFuture connectFuture = redisBootstrap.connect(redisAddress);

channelReadyFuture.whenComplete((c, t) -> {

if (t instanceof CancellationException) {
connectFuture.cancel(true);
initFuture.cancel(true);
}
});

connectFuture.addListener(future -> {

if (!future.isSuccess()) {
Expand All @@ -279,6 +332,7 @@ protected <K, V, T extends RedisChannelHandler<K, V>> ConnectionFuture<T> initia
initFuture.whenComplete((success, throwable) -> {

if (throwable == null) {

logger.debug("Connecting to Redis at {}: Success", redisAddress);
RedisChannelHandler<?, ?> connection = connectionBuilder.connection();
connection.registerCloseables(closeableResources, connection);
Expand All @@ -299,15 +353,8 @@ protected <K, V, T extends RedisChannelHandler<K, V>> ConnectionFuture<T> initia
failure = throwable;
}
channelReadyFuture.completeExceptionally(failure);

CompletableFuture<Boolean> response = new CompletableFuture<>();
response.completeExceptionally(failure);

});
});

return new DefaultConnectionFuture<T>(redisAddress, channelReadyFuture.thenApply(channel -> (T) connectionBuilder
.connection()));
}

/**
Expand Down
9 changes: 5 additions & 4 deletions src/main/java/io/lettuce/core/ConnectionBuilder.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.util.List;
import java.util.function.Supplier;

import reactor.core.publisher.Mono;
import io.lettuce.core.codec.Utf8StringCodec;
import io.lettuce.core.internal.LettuceAssert;
import io.lettuce.core.protocol.*;
Expand All @@ -42,7 +43,7 @@ public class ConnectionBuilder {
private static final Supplier<AsyncCommand<?, ?, ?>> PING_COMMAND_SUPPLIER = () -> new AsyncCommand<>(
INITIALIZING_CMD_BUILDER.ping());

private Supplier<SocketAddress> socketAddressSupplier;
private Mono<SocketAddress> socketAddressSupplier;
private ConnectionEvents connectionEvents;
private RedisChannelHandler<?, ?> connection;
private Endpoint endpoint;
Expand Down Expand Up @@ -122,14 +123,14 @@ public RedisChannelInitializer build() {
return new PlainChannelInitializer(pingCommandSupplier, this::buildHandlers, clientResources, timeout);
}

public ConnectionBuilder socketAddressSupplier(Supplier<SocketAddress> socketAddressSupplier) {
public ConnectionBuilder socketAddressSupplier(Mono<SocketAddress> socketAddressSupplier) {
this.socketAddressSupplier = socketAddressSupplier;
return this;
}

public SocketAddress socketAddress() {
public Mono<SocketAddress> socketAddress() {
LettuceAssert.assertState(socketAddressSupplier != null, "SocketAddressSupplier must be set");
return socketAddressSupplier.get();
return socketAddressSupplier;
}

public ConnectionBuilder timeout(Duration timeout) {
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/io/lettuce/core/ConnectionFuture.java
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ static <T> ConnectionFuture<T> from(SocketAddress remoteAddress, CompletableFutu
/**
* Return the remote {@link SocketAddress}.
*
* @return the remote {@link SocketAddress}.
* @return the remote {@link SocketAddress}. May be {@literal null} until the socket address is resolved.
*/
SocketAddress getRemoteAddress();

Expand Down
15 changes: 13 additions & 2 deletions src/main/java/io/lettuce/core/DefaultConnectionFuture.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,17 +30,28 @@
*/
class DefaultConnectionFuture<T> extends CompletableFuture<T> implements ConnectionFuture<T> {

private final SocketAddress remoteAddress;
private final CompletableFuture<SocketAddress> remoteAddress;
private final CompletableFuture<T> delegate;

public DefaultConnectionFuture(SocketAddress remoteAddress, CompletableFuture<T> delegate) {

this.remoteAddress = CompletableFuture.completedFuture(remoteAddress);
this.delegate = delegate;
}

public DefaultConnectionFuture(CompletableFuture<SocketAddress> remoteAddress, CompletableFuture<T> delegate) {

this.remoteAddress = remoteAddress;
this.delegate = delegate;
}

public SocketAddress getRemoteAddress() {
return remoteAddress;

if (remoteAddress.isDone() && !remoteAddress.isCompletedExceptionally()) {
return remoteAddress.join();
}

return null;
}

private <U> DefaultConnectionFuture<U> adopt(CompletableFuture<U> newFuture) {
Expand Down
68 changes: 52 additions & 16 deletions src/main/java/io/lettuce/core/RedisChannelHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@
import java.time.Duration;
import java.util.Arrays;
import java.util.Collection;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;

import io.lettuce.core.api.StatefulConnection;
import io.lettuce.core.internal.LettuceAssert;
Expand All @@ -42,14 +44,22 @@
public abstract class RedisChannelHandler<K, V> implements Closeable, ConnectionFacade {

private static final InternalLogger logger = InternalLoggerFactory.getInstance(RedisChannelHandler.class);
private static final AtomicIntegerFieldUpdater<RedisChannelHandler> CLOSED = AtomicIntegerFieldUpdater.newUpdater(
RedisChannelHandler.class, "closed");

private static final int ST_OPEN = 0;
private static final int ST_CLOSED = 1;

private Duration timeout;
private CloseEvents closeEvents = new CloseEvents();

private final RedisChannelWriter channelWriter;
private final boolean debugEnabled = logger.isDebugEnabled();
private final CompletableFuture<Void> closeFuture = new CompletableFuture<>();

private volatile boolean closed;
// accessed via CLOSED
@SuppressWarnings("unused")
private volatile int closed = ST_OPEN;
private volatile boolean active = true;
private volatile ClientOptions clientOptions;

Expand Down Expand Up @@ -96,27 +106,55 @@ public void setTimeout(long timeout, TimeUnit unit) {
}

/**
* Close the connection.
* Close the connection (synchronous).
*/
@Override
public synchronized void close() {
public void close() {

if (debugEnabled) {
logger.debug("close()");
}

if (closed) {
closeAsync().join();
}

/**
* Close the connection (asynchronous).
*
* @since 5.1
*/
public CompletableFuture<Void> closeAsync() {

if (debugEnabled) {
logger.debug("closeAsync()");
}

if (CLOSED.get(this) == ST_CLOSED) {
logger.warn("Connection is already closed");
return;
return closeFuture;
}

if (!closed) {
if (CLOSED.compareAndSet(this, ST_OPEN, ST_CLOSED)) {

active = false;
closed = true;
channelWriter.close();
closeEvents.fireEventClosed(this);
closeEvents = new CloseEvents();
CompletableFuture<Void> future = channelWriter.closeAsync();

future.whenComplete((v, t) -> {

closeEvents.fireEventClosed(this);
closeEvents = new CloseEvents();

if (t != null) {
closeFuture.completeExceptionally(t);
} else {
closeFuture.complete(v);
}
});
} else {
logger.warn("Connection is already closed (concurrently)");
}

return closeFuture;
}

protected <T> RedisCommand<K, V, T> dispatch(RedisCommand<K, V, T> cmd) {
Expand All @@ -143,7 +181,8 @@ protected <T> RedisCommand<K, V, T> dispatch(RedisCommand<K, V, T> cmd) {
* @param registry registry of closeables
* @param closeables closeables to register
*/
public void registerCloseables(final Collection<Closeable> registry, final Closeable... closeables) {
public void registerCloseables(final Collection<Closeable> registry, Closeable... closeables) {

registry.addAll(Arrays.asList(closeables));

addListener(resource -> {
Expand All @@ -170,19 +209,18 @@ protected void addListener(CloseEvents.CloseListener listener) {
}

/**
*
* @return true if the connection is closed (final state in the connection lifecyle).
*/
public boolean isClosed() {
return closed;
return CLOSED.get(this) == ST_CLOSED;
}

/**
* Notification when the connection becomes active (connected).
*/
public void activated() {
active = true;
closed = false;
CLOSED.set(this, ST_OPEN);
}

/**
Expand All @@ -193,15 +231,13 @@ public void deactivated() {
}

/**
*
* @return the channel writer
*/
public RedisChannelWriter getChannelWriter() {
return channelWriter;
}

/**
*
* @return true if the connection is active and not closed.
*/
public boolean isOpen() {
Expand Down
9 changes: 9 additions & 0 deletions src/main/java/io/lettuce/core/RedisChannelWriter.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

import java.io.Closeable;
import java.util.Collection;
import java.util.concurrent.CompletableFuture;

import io.lettuce.core.protocol.ConnectionFacade;
import io.lettuce.core.protocol.RedisCommand;
Expand Down Expand Up @@ -55,6 +56,14 @@ public interface RedisChannelWriter extends Closeable {
@Override
void close();

/**
* Asynchronously close the {@link RedisChannelWriter}.
*
* @return future for result synchronization.
* @since 5.1
*/
CompletableFuture<Void> closeAsync();

/**
* Reset the writer state. Queued commands will be canceled and the internal state will be reset. This is useful when the
* internal state machine gets out of sync with the connection.
Expand Down
Loading

0 comments on commit 8cfb2e8

Please sign in to comment.