Skip to content

Commit

Permalink
Map async connection exception to RedisConnectionException #708
Browse files Browse the repository at this point in the history
Lettuce now maps connection exceptions to RedisConnectionException retaining connection context to align with synchronous connect methods.
  • Loading branch information
mp911de committed Feb 21, 2018
1 parent 5d783ef commit 08faf45
Show file tree
Hide file tree
Showing 8 changed files with 278 additions and 29 deletions.
2 changes: 2 additions & 0 deletions src/main/java/io/lettuce/core/ConnectionFuture.java
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,8 @@ <U> ConnectionFuture<Void> thenAcceptBothAsync(CompletionStage<? extends U> othe
@Override
<U> ConnectionFuture<U> thenCompose(Function<? super T, ? extends CompletionStage<U>> fn);

<U> ConnectionFuture<U> thenCompose(BiFunction<? super T, ? super Throwable, ? extends CompletionStage<U>> fn);

@Override
<U> ConnectionFuture<U> thenComposeAsync(Function<? super T, ? extends CompletionStage<U>> fn);

Expand Down
30 changes: 30 additions & 0 deletions src/main/java/io/lettuce/core/DefaultConnectionFuture.java
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,36 @@ public <U> DefaultConnectionFuture<U> thenCompose(Function<? super T, ? extends
return adopt(delegate.thenCompose(fn));
}

@Override
public <U> ConnectionFuture<U> thenCompose(BiFunction<? super T, ? super Throwable, ? extends CompletionStage<U>> fn) {

CompletableFuture<U> future = new CompletableFuture<>();

delegate.whenComplete((v, e) -> {

try {
CompletionStage<U> apply = fn.apply(v, e);
apply.whenComplete((u, t) -> {

if (t != null) {
future.completeExceptionally(t);
} else {
future.complete(u);
}
});
} catch (Exception ex) {
ExecutionException result = new ExecutionException("Exception while applying thenCompose", ex);

if (e != null) {
result.addSuppressed(e);
}
future.completeExceptionally(result);
}
});

return adopt(future);
}

@Override
public <U> DefaultConnectionFuture<U> thenComposeAsync(Function<? super T, ? extends CompletionStage<U>> fn) {
return adopt(delegate.thenComposeAsync(fn));
Expand Down
56 changes: 38 additions & 18 deletions src/main/java/io/lettuce/core/RedisClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,14 @@
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CompletionStage;
import java.util.function.Supplier;

import reactor.core.publisher.Mono;
import io.lettuce.core.api.StatefulRedisConnection;
import io.lettuce.core.codec.RedisCodec;
import io.lettuce.core.codec.StringCodec;
import io.lettuce.core.internal.Futures;
import io.lettuce.core.internal.LettuceAssert;
import io.lettuce.core.output.StatusOutput;
import io.lettuce.core.protocol.*;
Expand Down Expand Up @@ -199,7 +201,8 @@ public StatefulRedisConnection<String, String> connect() {
public <K, V> StatefulRedisConnection<K, V> connect(RedisCodec<K, V> codec) {

checkForRedisURI();
return connectStandalone(codec, this.redisURI, timeout);

return getConnection(connectStandaloneAsync(codec, this.redisURI, timeout));
}

/**
Expand All @@ -211,7 +214,8 @@ public <K, V> StatefulRedisConnection<K, V> connect(RedisCodec<K, V> codec) {
public StatefulRedisConnection<String, String> connect(RedisURI redisURI) {

assertNotNull(redisURI);
return connectStandalone(newStringStringCodec(), redisURI, redisURI.getTimeout());

return getConnection(connectStandaloneAsync(newStringStringCodec(), redisURI, redisURI.getTimeout()));
}

/**
Expand All @@ -227,7 +231,8 @@ public StatefulRedisConnection<String, String> connect(RedisURI redisURI) {
public <K, V> StatefulRedisConnection<K, V> connect(RedisCodec<K, V> codec, RedisURI redisURI) {

assertNotNull(redisURI);
return connectStandalone(codec, redisURI, redisURI.getTimeout());

return getConnection(connectStandaloneAsync(codec, redisURI, redisURI.getTimeout()));
}

/**
Expand All @@ -244,20 +249,8 @@ public <K, V> StatefulRedisConnection<K, V> connect(RedisCodec<K, V> codec, Redi
public <K, V> ConnectionFuture<StatefulRedisConnection<K, V>> connectAsync(RedisCodec<K, V> codec, RedisURI redisURI) {

assertNotNull(redisURI);
return connectStandaloneAsync(codec, redisURI, redisURI.getTimeout());
}

private <K, V> StatefulRedisConnection<K, V> connectStandalone(RedisCodec<K, V> codec, RedisURI redisURI, Duration timeout) {
return getConnection(connectStandaloneAsync(codec, redisURI, timeout));
}

// Required by ReflectiveNodeConnectionFactory.
@SuppressWarnings("unused")
private <K, V> ConnectionFuture<StatefulRedisConnection<K, V>> connectStandaloneAsync(RedisCodec<K, V> codec,
RedisURI redisURI) {

assertNotNull(redisURI);
return connectStandaloneAsync(codec, redisURI, redisURI.getTimeout());
return transformAsyncConnectionException(connectStandaloneAsync(codec, redisURI, redisURI.getTimeout()));
}

private <K, V> ConnectionFuture<StatefulRedisConnection<K, V>> connectStandaloneAsync(RedisCodec<K, V> codec,
Expand Down Expand Up @@ -421,7 +414,7 @@ public <K, V> ConnectionFuture<StatefulRedisPubSubConnection<K, V>> connectPubSu
RedisURI redisURI) {

assertNotNull(redisURI);
return connectPubSubAsync(codec, redisURI, redisURI.getTimeout());
return transformAsyncConnectionException(connectPubSubAsync(codec, redisURI, redisURI.getTimeout()));
}

private <K, V> ConnectionFuture<StatefulRedisPubSubConnection<K, V>> connectPubSubAsync(RedisCodec<K, V> codec,
Expand Down Expand Up @@ -484,6 +477,7 @@ public <K, V> StatefulRedisSentinelConnection<K, V> connectSentinel(RedisCodec<K
public StatefulRedisSentinelConnection<String, String> connectSentinel(RedisURI redisURI) {

assertNotNull(redisURI);

return getConnection(connectSentinelAsync(newStringStringCodec(), redisURI, redisURI.getTimeout()));
}

Expand All @@ -500,6 +494,7 @@ public StatefulRedisSentinelConnection<String, String> connectSentinel(RedisURI
public <K, V> StatefulRedisSentinelConnection<K, V> connectSentinel(RedisCodec<K, V> codec, RedisURI redisURI) {

assertNotNull(redisURI);

return getConnection(connectSentinelAsync(codec, redisURI, redisURI.getTimeout()));
}

Expand All @@ -519,7 +514,8 @@ public <K, V> CompletableFuture<StatefulRedisSentinelConnection<K, V>> connectSe
RedisURI redisURI) {

assertNotNull(redisURI);
return connectSentinelAsync(codec, redisURI, redisURI.getTimeout());

return transformAsyncConnectionException(connectSentinelAsync(codec, redisURI, redisURI.getTimeout()), redisURI);
}

private <K, V> CompletableFuture<StatefulRedisSentinelConnection<K, V>> connectSentinelAsync(RedisCodec<K, V> codec,
Expand Down Expand Up @@ -751,6 +747,30 @@ private Mono<SocketAddress> lookupRedis(RedisURI sentinelUri) {
.then(Mono.just(it))));
}

private static <T> ConnectionFuture<T> transformAsyncConnectionException(ConnectionFuture<T> future) {

return future.thenCompose((v, e) -> {

if (e != null) {
return Futures.failed(RedisConnectionException.create(future.getRemoteAddress(), e));
}

return CompletableFuture.completedFuture(v);
});
}

private static <T> CompletableFuture<T> transformAsyncConnectionException(CompletionStage<T> future, RedisURI target) {

return ConnectionFuture.from(null, future.toCompletableFuture()).thenCompose((v, e) -> {

if (e != null) {
return Futures.failed(RedisConnectionException.create(target.toString(), e));
}

return CompletableFuture.completedFuture(v);
}).toCompletableFuture();
}

private static void checkValidRedisURI(RedisURI redisURI) {

LettuceAssert.notNull(redisURI, "A valid RedisURI is required");
Expand Down
12 changes: 12 additions & 0 deletions src/main/java/io/lettuce/core/RedisConnectionException.java
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,18 @@ public RedisConnectionException(String msg, Throwable cause) {
* @since 4.4
*/
public static RedisConnectionException create(SocketAddress remoteAddress, Throwable cause) {
return create(remoteAddress == null ? null : remoteAddress.toString(), cause);
}

/**
* Create a new {@link RedisConnectionException} given {@code remoteAddress} and the {@link Throwable cause}.
*
* @param remoteAddress remote address.
* @param cause the nested exception.
* @return the {@link RedisConnectionException}.
* @since 5.1
*/
public static RedisConnectionException create(String remoteAddress, Throwable cause) {

if (remoteAddress == null) {

Expand Down
22 changes: 18 additions & 4 deletions src/main/java/io/lettuce/core/cluster/RedisClusterClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.net.URI;
import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
Expand Down Expand Up @@ -345,7 +346,7 @@ public <K, V> StatefulRedisClusterConnection<K, V> connect(RedisCodec<K, V> code
initializePartitions();
}

return getConnection(connectAsync(codec));
return getConnection(connectClusterAsync(codec));
}

/**
Expand All @@ -371,7 +372,7 @@ public <K, V> StatefulRedisClusterConnection<K, V> connect(RedisCodec<K, V> code
*/
@SuppressWarnings("unchecked")
public <K, V> CompletableFuture<StatefulRedisClusterConnection<K, V>> connectAsync(RedisCodec<K, V> codec) {
return connectClusterAsync(codec);
return transformAsyncConnectionException(connectClusterAsync(codec), getInitialUris());
}

/**
Expand Down Expand Up @@ -419,7 +420,7 @@ public <K, V> StatefulRedisClusterPubSubConnection<K, V> connectPubSub(RedisCode
initializePartitions();
}

return getConnection(connectPubSubAsync(codec));
return getConnection(connectClusterPubSubAsync(codec));
}

/**
Expand All @@ -445,7 +446,7 @@ public <K, V> StatefulRedisClusterPubSubConnection<K, V> connectPubSub(RedisCode
*/
@SuppressWarnings("unchecked")
public <K, V> CompletableFuture<StatefulRedisClusterPubSubConnection<K, V>> connectPubSubAsync(RedisCodec<K, V> codec) {
return connectClusterPubSubAsync(codec);
return transformAsyncConnectionException(connectClusterPubSubAsync(codec), getInitialUris());
}

StatefulRedisConnection<String, String> connectToNode(SocketAddress socketAddress) {
Expand Down Expand Up @@ -1104,6 +1105,19 @@ boolean expireStaleConnections() {
return getClusterClientOptions() == null || getClusterClientOptions().isCloseStaleConnections();
}

protected static <T> CompletableFuture<T> transformAsyncConnectionException(CompletionStage<T> future,
Iterable<RedisURI> target) {

return ConnectionFuture.from(null, future.toCompletableFuture()).thenCompose((v, e) -> {

if (e != null) {
return Futures.failed(RedisConnectionException.create(target.toString(), e));
}

return CompletableFuture.completedFuture(v);
}).toCompletableFuture();
}

private static <K, V> void assertNotNull(RedisCodec<K, V> codec) {
LettuceAssert.notNull(codec, "RedisCodec must not be null");
}
Expand Down
57 changes: 51 additions & 6 deletions src/main/java/io/lettuce/core/masterslave/MasterSlave.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,15 @@

import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutionException;

import io.lettuce.core.ConnectionFuture;
import io.lettuce.core.RedisClient;
import io.lettuce.core.RedisConnectionException;
import io.lettuce.core.RedisURI;
import io.lettuce.core.codec.RedisCodec;
import io.lettuce.core.internal.Futures;
import io.lettuce.core.internal.LettuceAssert;
import io.lettuce.core.internal.LettuceLists;

Expand Down Expand Up @@ -99,7 +101,12 @@ public class MasterSlave {
*/
public static <K, V> StatefulRedisMasterSlaveConnection<K, V> connect(RedisClient redisClient, RedisCodec<K, V> codec,
RedisURI redisURI) {
return getConnection(connectAsync(redisClient, codec, redisURI));

LettuceAssert.notNull(redisClient, "RedisClient must not be null");
LettuceAssert.notNull(codec, "RedisCodec must not be null");
LettuceAssert.notNull(redisURI, "RedisURI must not be null");

return getConnection(connectAsyncSentinelOrAutodiscovery(redisClient, codec, redisURI), redisURI);
}

/**
Expand All @@ -120,6 +127,11 @@ public static <K, V> StatefulRedisMasterSlaveConnection<K, V> connect(RedisClien
*/
public static <K, V> CompletableFuture<StatefulRedisMasterSlaveConnection<K, V>> connectAsync(RedisClient redisClient,
RedisCodec<K, V> codec, RedisURI redisURI) {
return transformAsyncConnectionException(connectAsyncSentinelOrAutodiscovery(redisClient, codec, redisURI), redisURI);
}

private static <K, V> CompletableFuture<StatefulRedisMasterSlaveConnection<K, V>> connectAsyncSentinelOrAutodiscovery(
RedisClient redisClient, RedisCodec<K, V> codec, RedisURI redisURI) {

LettuceAssert.notNull(redisClient, "RedisClient must not be null");
LettuceAssert.notNull(codec, "RedisCodec must not be null");
Expand Down Expand Up @@ -150,7 +162,7 @@ public static <K, V> CompletableFuture<StatefulRedisMasterSlaveConnection<K, V>>
*/
public static <K, V> StatefulRedisMasterSlaveConnection<K, V> connect(RedisClient redisClient, RedisCodec<K, V> codec,
Iterable<RedisURI> redisURIs) {
return getConnection(connectAsync(redisClient, codec, redisURIs));
return getConnection(connectAsyncSentinelOrStaticSetup(redisClient, codec, redisURIs), redisURIs);
}

/**
Expand All @@ -171,6 +183,11 @@ public static <K, V> StatefulRedisMasterSlaveConnection<K, V> connect(RedisClien
*/
public static <K, V> CompletableFuture<StatefulRedisMasterSlaveConnection<K, V>> connectAsync(RedisClient redisClient,
RedisCodec<K, V> codec, Iterable<RedisURI> redisURIs) {
return transformAsyncConnectionException(connectAsyncSentinelOrStaticSetup(redisClient, codec, redisURIs), redisURIs);
}

private static <K, V> CompletableFuture<StatefulRedisMasterSlaveConnection<K, V>> connectAsyncSentinelOrStaticSetup(
RedisClient redisClient, RedisCodec<K, V> codec, Iterable<RedisURI> redisURIs) {

LettuceAssert.notNull(redisClient, "RedisClient must not be null");
LettuceAssert.notNull(codec, "RedisCodec must not be null");
Expand All @@ -195,24 +212,52 @@ private static boolean isSentinel(RedisURI redisURI) {
* the channel/connection initialization. Any exception is rethrown as {@link RedisConnectionException}.
*
* @param connectionFuture must not be null.
* @param context context information (single RedisURI, multiple URIs), used as connection target in the reported exception.
* @param <T> Connection type.
* @return the connection.
* @throws RedisConnectionException in case of connection failures.
*/
private static <T> T getConnection(CompletableFuture<T> connectionFuture) {
private static <T> T getConnection(CompletableFuture<T> connectionFuture, Object context) {

try {
return connectionFuture.get();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw RedisConnectionException.create(null, e);
throw RedisConnectionException.create(context.toString(), e);
} catch (Exception e) {

if (e instanceof ExecutionException) {
throw RedisConnectionException.create(null, e.getCause());

// filter intermediate RedisConnectionException exceptions that bloat the stack trace
if (e.getCause() instanceof RedisConnectionException
&& e.getCause().getCause() instanceof RedisConnectionException) {
throw RedisConnectionException.create(context.toString(), e.getCause().getCause());
}

throw RedisConnectionException.create(context.toString(), e.getCause());
}

throw RedisConnectionException.create(null, e);
throw RedisConnectionException.create(context.toString(), e);
}
}

private static <T> CompletableFuture<T> transformAsyncConnectionException(CompletionStage<T> future, Object context) {

return ConnectionFuture
.from(null, future.toCompletableFuture())
.thenCompose((v, e) -> {

if (e != null) {

// filter intermediate RedisConnectionException exceptions that bloat the stack trace
if (e.getCause() instanceof RedisConnectionException
&& e.getCause().getCause() instanceof RedisConnectionException) {
return Futures.failed(RedisConnectionException.create(context.toString(), e.getCause()));
}
return Futures.failed(RedisConnectionException.create(context.toString(), e));
}

return CompletableFuture.completedFuture(v);
}).toCompletableFuture();
}
}
Loading

0 comments on commit 08faf45

Please sign in to comment.