From 08faf456e37a76645de728ef2a27eddbcaab2466 Mon Sep 17 00:00:00 2001 From: Mark Paluch Date: Wed, 21 Feb 2018 17:08:34 +0100 Subject: [PATCH] Map async connection exception to RedisConnectionException #708 Lettuce now maps connection exceptions to RedisConnectionException retaining connection context to align with synchronous connect methods. --- .../io/lettuce/core/ConnectionFuture.java | 2 + .../lettuce/core/DefaultConnectionFuture.java | 30 +++++ .../java/io/lettuce/core/RedisClient.java | 56 +++++--- .../core/RedisConnectionException.java | 12 ++ .../core/cluster/RedisClusterClient.java | 22 ++- .../lettuce/core/masterslave/MasterSlave.java | 57 +++++++- .../io/lettuce/core/ConnectionFutureTest.java | 126 ++++++++++++++++++ .../masterslave/MasterSlaveSentinelTest.java | 2 +- 8 files changed, 278 insertions(+), 29 deletions(-) create mode 100644 src/test/java/io/lettuce/core/ConnectionFutureTest.java diff --git a/src/main/java/io/lettuce/core/ConnectionFuture.java b/src/main/java/io/lettuce/core/ConnectionFuture.java index 0c7d60c45d..37f8a3a791 100644 --- a/src/main/java/io/lettuce/core/ConnectionFuture.java +++ b/src/main/java/io/lettuce/core/ConnectionFuture.java @@ -172,6 +172,8 @@ ConnectionFuture thenAcceptBothAsync(CompletionStage othe @Override ConnectionFuture thenCompose(Function> fn); + ConnectionFuture thenCompose(BiFunction> fn); + @Override ConnectionFuture thenComposeAsync(Function> fn); diff --git a/src/main/java/io/lettuce/core/DefaultConnectionFuture.java b/src/main/java/io/lettuce/core/DefaultConnectionFuture.java index cb1b56f841..fd9cab4a90 100644 --- a/src/main/java/io/lettuce/core/DefaultConnectionFuture.java +++ b/src/main/java/io/lettuce/core/DefaultConnectionFuture.java @@ -241,6 +241,36 @@ public DefaultConnectionFuture thenCompose(Function ConnectionFuture thenCompose(BiFunction> fn) { + + CompletableFuture future = new CompletableFuture<>(); + + delegate.whenComplete((v, e) -> { + + try { + CompletionStage apply = fn.apply(v, e); + apply.whenComplete((u, t) -> { + + if (t != null) { + future.completeExceptionally(t); + } else { + future.complete(u); + } + }); + } catch (Exception ex) { + ExecutionException result = new ExecutionException("Exception while applying thenCompose", ex); + + if (e != null) { + result.addSuppressed(e); + } + future.completeExceptionally(result); + } + }); + + return adopt(future); + } + @Override public DefaultConnectionFuture thenComposeAsync(Function> fn) { return adopt(delegate.thenComposeAsync(fn)); diff --git a/src/main/java/io/lettuce/core/RedisClient.java b/src/main/java/io/lettuce/core/RedisClient.java index 4168dfed96..2890981d18 100644 --- a/src/main/java/io/lettuce/core/RedisClient.java +++ b/src/main/java/io/lettuce/core/RedisClient.java @@ -23,12 +23,14 @@ import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; +import java.util.concurrent.CompletionStage; import java.util.function.Supplier; import reactor.core.publisher.Mono; import io.lettuce.core.api.StatefulRedisConnection; import io.lettuce.core.codec.RedisCodec; import io.lettuce.core.codec.StringCodec; +import io.lettuce.core.internal.Futures; import io.lettuce.core.internal.LettuceAssert; import io.lettuce.core.output.StatusOutput; import io.lettuce.core.protocol.*; @@ -199,7 +201,8 @@ public StatefulRedisConnection connect() { public StatefulRedisConnection connect(RedisCodec codec) { checkForRedisURI(); - return connectStandalone(codec, this.redisURI, timeout); + + return getConnection(connectStandaloneAsync(codec, this.redisURI, timeout)); } /** @@ -211,7 +214,8 @@ public StatefulRedisConnection connect(RedisCodec codec) { public StatefulRedisConnection connect(RedisURI redisURI) { assertNotNull(redisURI); - return connectStandalone(newStringStringCodec(), redisURI, redisURI.getTimeout()); + + return getConnection(connectStandaloneAsync(newStringStringCodec(), redisURI, redisURI.getTimeout())); } /** @@ -227,7 +231,8 @@ public StatefulRedisConnection connect(RedisURI redisURI) { public StatefulRedisConnection connect(RedisCodec codec, RedisURI redisURI) { assertNotNull(redisURI); - return connectStandalone(codec, redisURI, redisURI.getTimeout()); + + return getConnection(connectStandaloneAsync(codec, redisURI, redisURI.getTimeout())); } /** @@ -244,20 +249,8 @@ public StatefulRedisConnection connect(RedisCodec codec, Redi public ConnectionFuture> connectAsync(RedisCodec codec, RedisURI redisURI) { assertNotNull(redisURI); - return connectStandaloneAsync(codec, redisURI, redisURI.getTimeout()); - } - private StatefulRedisConnection connectStandalone(RedisCodec codec, RedisURI redisURI, Duration timeout) { - return getConnection(connectStandaloneAsync(codec, redisURI, timeout)); - } - - // Required by ReflectiveNodeConnectionFactory. - @SuppressWarnings("unused") - private ConnectionFuture> connectStandaloneAsync(RedisCodec codec, - RedisURI redisURI) { - - assertNotNull(redisURI); - return connectStandaloneAsync(codec, redisURI, redisURI.getTimeout()); + return transformAsyncConnectionException(connectStandaloneAsync(codec, redisURI, redisURI.getTimeout())); } private ConnectionFuture> connectStandaloneAsync(RedisCodec codec, @@ -421,7 +414,7 @@ public ConnectionFuture> connectPubSu RedisURI redisURI) { assertNotNull(redisURI); - return connectPubSubAsync(codec, redisURI, redisURI.getTimeout()); + return transformAsyncConnectionException(connectPubSubAsync(codec, redisURI, redisURI.getTimeout())); } private ConnectionFuture> connectPubSubAsync(RedisCodec codec, @@ -484,6 +477,7 @@ public StatefulRedisSentinelConnection connectSentinel(RedisCodec connectSentinel(RedisURI redisURI) { assertNotNull(redisURI); + return getConnection(connectSentinelAsync(newStringStringCodec(), redisURI, redisURI.getTimeout())); } @@ -500,6 +494,7 @@ public StatefulRedisSentinelConnection connectSentinel(RedisURI public StatefulRedisSentinelConnection connectSentinel(RedisCodec codec, RedisURI redisURI) { assertNotNull(redisURI); + return getConnection(connectSentinelAsync(codec, redisURI, redisURI.getTimeout())); } @@ -519,7 +514,8 @@ public CompletableFuture> connectSe RedisURI redisURI) { assertNotNull(redisURI); - return connectSentinelAsync(codec, redisURI, redisURI.getTimeout()); + + return transformAsyncConnectionException(connectSentinelAsync(codec, redisURI, redisURI.getTimeout()), redisURI); } private CompletableFuture> connectSentinelAsync(RedisCodec codec, @@ -751,6 +747,30 @@ private Mono lookupRedis(RedisURI sentinelUri) { .then(Mono.just(it)))); } + private static ConnectionFuture transformAsyncConnectionException(ConnectionFuture future) { + + return future.thenCompose((v, e) -> { + + if (e != null) { + return Futures.failed(RedisConnectionException.create(future.getRemoteAddress(), e)); + } + + return CompletableFuture.completedFuture(v); + }); + } + + private static CompletableFuture transformAsyncConnectionException(CompletionStage future, RedisURI target) { + + return ConnectionFuture.from(null, future.toCompletableFuture()).thenCompose((v, e) -> { + + if (e != null) { + return Futures.failed(RedisConnectionException.create(target.toString(), e)); + } + + return CompletableFuture.completedFuture(v); + }).toCompletableFuture(); + } + private static void checkValidRedisURI(RedisURI redisURI) { LettuceAssert.notNull(redisURI, "A valid RedisURI is required"); diff --git a/src/main/java/io/lettuce/core/RedisConnectionException.java b/src/main/java/io/lettuce/core/RedisConnectionException.java index c4bffaeec8..75e6eb161f 100644 --- a/src/main/java/io/lettuce/core/RedisConnectionException.java +++ b/src/main/java/io/lettuce/core/RedisConnectionException.java @@ -53,6 +53,18 @@ public RedisConnectionException(String msg, Throwable cause) { * @since 4.4 */ public static RedisConnectionException create(SocketAddress remoteAddress, Throwable cause) { + return create(remoteAddress == null ? null : remoteAddress.toString(), cause); + } + + /** + * Create a new {@link RedisConnectionException} given {@code remoteAddress} and the {@link Throwable cause}. + * + * @param remoteAddress remote address. + * @param cause the nested exception. + * @return the {@link RedisConnectionException}. + * @since 5.1 + */ + public static RedisConnectionException create(String remoteAddress, Throwable cause) { if (remoteAddress == null) { diff --git a/src/main/java/io/lettuce/core/cluster/RedisClusterClient.java b/src/main/java/io/lettuce/core/cluster/RedisClusterClient.java index ce2b2c8338..d51dd8314c 100644 --- a/src/main/java/io/lettuce/core/cluster/RedisClusterClient.java +++ b/src/main/java/io/lettuce/core/cluster/RedisClusterClient.java @@ -20,6 +20,7 @@ import java.net.URI; import java.util.*; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; @@ -345,7 +346,7 @@ public StatefulRedisClusterConnection connect(RedisCodec code initializePartitions(); } - return getConnection(connectAsync(codec)); + return getConnection(connectClusterAsync(codec)); } /** @@ -371,7 +372,7 @@ public StatefulRedisClusterConnection connect(RedisCodec code */ @SuppressWarnings("unchecked") public CompletableFuture> connectAsync(RedisCodec codec) { - return connectClusterAsync(codec); + return transformAsyncConnectionException(connectClusterAsync(codec), getInitialUris()); } /** @@ -419,7 +420,7 @@ public StatefulRedisClusterPubSubConnection connectPubSub(RedisCode initializePartitions(); } - return getConnection(connectPubSubAsync(codec)); + return getConnection(connectClusterPubSubAsync(codec)); } /** @@ -445,7 +446,7 @@ public StatefulRedisClusterPubSubConnection connectPubSub(RedisCode */ @SuppressWarnings("unchecked") public CompletableFuture> connectPubSubAsync(RedisCodec codec) { - return connectClusterPubSubAsync(codec); + return transformAsyncConnectionException(connectClusterPubSubAsync(codec), getInitialUris()); } StatefulRedisConnection connectToNode(SocketAddress socketAddress) { @@ -1104,6 +1105,19 @@ boolean expireStaleConnections() { return getClusterClientOptions() == null || getClusterClientOptions().isCloseStaleConnections(); } + protected static CompletableFuture transformAsyncConnectionException(CompletionStage future, + Iterable target) { + + return ConnectionFuture.from(null, future.toCompletableFuture()).thenCompose((v, e) -> { + + if (e != null) { + return Futures.failed(RedisConnectionException.create(target.toString(), e)); + } + + return CompletableFuture.completedFuture(v); + }).toCompletableFuture(); + } + private static void assertNotNull(RedisCodec codec) { LettuceAssert.notNull(codec, "RedisCodec must not be null"); } diff --git a/src/main/java/io/lettuce/core/masterslave/MasterSlave.java b/src/main/java/io/lettuce/core/masterslave/MasterSlave.java index 888c7d93fd..38323e730d 100644 --- a/src/main/java/io/lettuce/core/masterslave/MasterSlave.java +++ b/src/main/java/io/lettuce/core/masterslave/MasterSlave.java @@ -17,6 +17,7 @@ import java.util.List; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; import java.util.concurrent.ExecutionException; import io.lettuce.core.ConnectionFuture; @@ -24,6 +25,7 @@ import io.lettuce.core.RedisConnectionException; import io.lettuce.core.RedisURI; import io.lettuce.core.codec.RedisCodec; +import io.lettuce.core.internal.Futures; import io.lettuce.core.internal.LettuceAssert; import io.lettuce.core.internal.LettuceLists; @@ -99,7 +101,12 @@ public class MasterSlave { */ public static StatefulRedisMasterSlaveConnection connect(RedisClient redisClient, RedisCodec codec, RedisURI redisURI) { - return getConnection(connectAsync(redisClient, codec, redisURI)); + + LettuceAssert.notNull(redisClient, "RedisClient must not be null"); + LettuceAssert.notNull(codec, "RedisCodec must not be null"); + LettuceAssert.notNull(redisURI, "RedisURI must not be null"); + + return getConnection(connectAsyncSentinelOrAutodiscovery(redisClient, codec, redisURI), redisURI); } /** @@ -120,6 +127,11 @@ public static StatefulRedisMasterSlaveConnection connect(RedisClien */ public static CompletableFuture> connectAsync(RedisClient redisClient, RedisCodec codec, RedisURI redisURI) { + return transformAsyncConnectionException(connectAsyncSentinelOrAutodiscovery(redisClient, codec, redisURI), redisURI); + } + + private static CompletableFuture> connectAsyncSentinelOrAutodiscovery( + RedisClient redisClient, RedisCodec codec, RedisURI redisURI) { LettuceAssert.notNull(redisClient, "RedisClient must not be null"); LettuceAssert.notNull(codec, "RedisCodec must not be null"); @@ -150,7 +162,7 @@ public static CompletableFuture> */ public static StatefulRedisMasterSlaveConnection connect(RedisClient redisClient, RedisCodec codec, Iterable redisURIs) { - return getConnection(connectAsync(redisClient, codec, redisURIs)); + return getConnection(connectAsyncSentinelOrStaticSetup(redisClient, codec, redisURIs), redisURIs); } /** @@ -171,6 +183,11 @@ public static StatefulRedisMasterSlaveConnection connect(RedisClien */ public static CompletableFuture> connectAsync(RedisClient redisClient, RedisCodec codec, Iterable redisURIs) { + return transformAsyncConnectionException(connectAsyncSentinelOrStaticSetup(redisClient, codec, redisURIs), redisURIs); + } + + private static CompletableFuture> connectAsyncSentinelOrStaticSetup( + RedisClient redisClient, RedisCodec codec, Iterable redisURIs) { LettuceAssert.notNull(redisClient, "RedisClient must not be null"); LettuceAssert.notNull(codec, "RedisCodec must not be null"); @@ -195,24 +212,52 @@ private static boolean isSentinel(RedisURI redisURI) { * the channel/connection initialization. Any exception is rethrown as {@link RedisConnectionException}. * * @param connectionFuture must not be null. + * @param context context information (single RedisURI, multiple URIs), used as connection target in the reported exception. * @param Connection type. * @return the connection. * @throws RedisConnectionException in case of connection failures. */ - private static T getConnection(CompletableFuture connectionFuture) { + private static T getConnection(CompletableFuture connectionFuture, Object context) { try { return connectionFuture.get(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); - throw RedisConnectionException.create(null, e); + throw RedisConnectionException.create(context.toString(), e); } catch (Exception e) { if (e instanceof ExecutionException) { - throw RedisConnectionException.create(null, e.getCause()); + + // filter intermediate RedisConnectionException exceptions that bloat the stack trace + if (e.getCause() instanceof RedisConnectionException + && e.getCause().getCause() instanceof RedisConnectionException) { + throw RedisConnectionException.create(context.toString(), e.getCause().getCause()); + } + + throw RedisConnectionException.create(context.toString(), e.getCause()); } - throw RedisConnectionException.create(null, e); + throw RedisConnectionException.create(context.toString(), e); } } + + private static CompletableFuture transformAsyncConnectionException(CompletionStage future, Object context) { + + return ConnectionFuture + .from(null, future.toCompletableFuture()) + .thenCompose((v, e) -> { + + if (e != null) { + + // filter intermediate RedisConnectionException exceptions that bloat the stack trace + if (e.getCause() instanceof RedisConnectionException + && e.getCause().getCause() instanceof RedisConnectionException) { + return Futures.failed(RedisConnectionException.create(context.toString(), e.getCause())); + } + return Futures.failed(RedisConnectionException.create(context.toString(), e)); + } + + return CompletableFuture.completedFuture(v); + }).toCompletableFuture(); + } } diff --git a/src/test/java/io/lettuce/core/ConnectionFutureTest.java b/src/test/java/io/lettuce/core/ConnectionFutureTest.java new file mode 100644 index 0000000000..63c86e186c --- /dev/null +++ b/src/test/java/io/lettuce/core/ConnectionFutureTest.java @@ -0,0 +1,126 @@ +/* + * Copyright 2018 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.lettuce.core; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; + +import org.junit.Test; + +import io.lettuce.core.internal.Futures; + +/** + * @author Mark Paluch + */ +public class ConnectionFutureTest { + + @Test + public void shouldComposeTransformToError() { + + CompletableFuture foo = new CompletableFuture<>(); + + ConnectionFuture transformed = ConnectionFuture.from(null, foo).thenCompose((s, t) -> { + + if (t != null) { + return Futures.failed(new IllegalStateException(t)); + } + return Futures.failed(new IllegalStateException()); + }); + + foo.complete("foo"); + + assertThat(transformed).isDone(); + assertThat(transformed.toCompletableFuture()).isCompletedExceptionally(); + assertThatThrownBy(transformed::join).hasRootCauseInstanceOf(IllegalStateException.class); + } + + @Test + public void composeTransformShouldFailWhileTransformation() { + + CompletableFuture foo = new CompletableFuture<>(); + + ConnectionFuture transformed = ConnectionFuture.from(null, foo).thenCompose((s, t) -> { + throw new IllegalStateException(); + }); + + foo.complete("foo"); + + assertThat(transformed).isDone(); + assertThat(transformed.toCompletableFuture()).isCompletedExceptionally(); + assertThatThrownBy(transformed::join).hasRootCauseInstanceOf(IllegalStateException.class); + } + + @Test + public void composeTransformShouldFailWhileTransformationRetainOriginalException() { + + CompletableFuture foo = new CompletableFuture<>(); + + ConnectionFuture transformed = ConnectionFuture.from(null, foo).thenCompose((s, t) -> { + throw new IllegalStateException(); + }); + + Throwable t = new Throwable(); + foo.completeExceptionally(t); + + assertThat(transformed).isDone(); + assertThat(transformed.toCompletableFuture()).isCompletedExceptionally(); + + try { + transformed.join(); + } catch (CompletionException e) { + + assertThat(e).hasRootCauseInstanceOf(IllegalStateException.class); + assertThat(e.getCause()).hasSuppressedException(t); + } + } + + @Test + public void shouldComposeWithErrorFlow() { + + CompletableFuture foo = new CompletableFuture<>(); + CompletableFuture exceptional = new CompletableFuture<>(); + + ConnectionFuture transformed1 = ConnectionFuture.from(null, foo).thenCompose((s, t) -> { + + if (t != null) { + return Futures.failed(new IllegalStateException(t)); + } + return CompletableFuture.completedFuture(s); + }); + + ConnectionFuture transformed2 = ConnectionFuture.from(null, exceptional).thenCompose((s, t) -> { + + if (t != null) { + return Futures.failed(new IllegalStateException(t)); + } + return CompletableFuture.completedFuture(s); + }); + + foo.complete("foo"); + exceptional.completeExceptionally(new IllegalArgumentException("foo")); + + assertThat(transformed1).isDone(); + assertThat(transformed1.toCompletableFuture()).isCompletedWithValue("foo"); + + assertThat(transformed2).isDone(); + assertThat(transformed2.toCompletableFuture()).isCompletedExceptionally(); + assertThatThrownBy(transformed2::join).hasCauseInstanceOf(IllegalStateException.class).hasRootCauseInstanceOf( + IllegalArgumentException.class); + } +} diff --git a/src/test/java/io/lettuce/core/masterslave/MasterSlaveSentinelTest.java b/src/test/java/io/lettuce/core/masterslave/MasterSlaveSentinelTest.java index 803ed15bde..edd5eb1902 100644 --- a/src/test/java/io/lettuce/core/masterslave/MasterSlaveSentinelTest.java +++ b/src/test/java/io/lettuce/core/masterslave/MasterSlaveSentinelTest.java @@ -111,7 +111,7 @@ public void testMasterSlaveSentinelWithUnavailableSentinels() throws Exception { MasterSlave.connect(sentinelClient, new Utf8StringCodec(), uri); fail("Missing RedisConnectionException"); } catch (RedisConnectionException e) { - assertThat(e.getCause()).hasCauseInstanceOf(IOException.class); + assertThat(e.getCause()).hasRootCauseInstanceOf(IOException.class); } }