Skip to content

Commit

Permalink
Guard initialization against premature disconnect #608
Browse files Browse the repository at this point in the history
Properly propagate connect/disconnect events to prevent a race in which the disconnect is faster than setting up future chaining for event propagation.
  • Loading branch information
mp911de committed Oct 13, 2017
1 parent 32bfd07 commit 5d1f1fc
Show file tree
Hide file tree
Showing 5 changed files with 15 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -293,6 +293,7 @@ protected <K, V, T extends RedisChannelHandler<K, V>> ConnectionFuture<T> initia
redisBootstrap.handler(initializer);

clientResources.nettyCustomizer().afterBootstrapInitialized(redisBootstrap);
CompletableFuture<Boolean> initFuture = initializer.channelInitialized();
ChannelFuture connectFuture = redisBootstrap.connect(redisAddress);

connectFuture.addListener(future -> {
Expand All @@ -305,7 +306,6 @@ protected <K, V, T extends RedisChannelHandler<K, V>> ConnectionFuture<T> initia
return;
}

CompletableFuture<Boolean> initFuture = initializer.channelInitialized();
initFuture.whenComplete((success, throwable) -> {

if (throwable == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,11 @@ public CompletableFuture<Boolean> channelInitialized() {
public void channelInactive(ChannelHandlerContext ctx) throws Exception {

clientResources.eventBus().publish(new DisconnectedEvent(local(ctx), remote(ctx)));

if (!initializedFuture.isDone()) {
initializedFuture.completeExceptionally(new RedisConnectionException("Connection closed prematurely"));
}

initializedFuture = new CompletableFuture<>();
pingCommand = null;
super.channelInactive(ctx);
Expand Down Expand Up @@ -142,6 +147,7 @@ static void pingBeforeActivate(AsyncCommand<?, ?, ?> cmd, CompletableFuture<Bool
if (cmd.isDone() || initializedFuture.isDone()) {
return;
}

initializedFuture.completeExceptionally(ExceptionFactory.createTimeoutException(
"Cannot initialize channel (PING before activate)", timeout, timeUnit));
};
Expand Down
6 changes: 6 additions & 0 deletions src/main/java/com/lambdaworks/redis/SslConnectionBuilder.java
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,12 @@ public CompletableFuture<Boolean> channelInitialized() {

@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {

if (!initializedFuture.isDone()) {
initializedFuture.completeExceptionally(new RedisConnectionException(
"Connection closed prematurely"));
}

initializedFuture = new CompletableFuture<>();
pingCommand = null;
super.channelInactive(ctx);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ public class CommandHandler<K, V> extends ChannelDuplexHandler implements RedisC

private Thread exclusiveLockOwner;
private RedisChannelHandler<K, V> redisChannelHandler;
private Throwable connectionError;
private volatile Throwable connectionError;
private String logPrefix;
private boolean autoFlushCommands = true;
private PristineFallbackCommand fallbackCommand;
Expand Down
1 change: 1 addition & 0 deletions src/test/java/com/lambdaworks/redis/support/CdiTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import javax.enterprise.inject.Produces;

import com.lambdaworks.TestClientResources;
import org.apache.webbeans.cditest.CdiTestContainer;
import org.apache.webbeans.cditest.CdiTestContainerLoader;
import org.junit.AfterClass;
Expand Down

0 comments on commit 5d1f1fc

Please sign in to comment.