Skip to content

Commit

Permalink
Handle reconnects on an own executor group (instead of the timer/even…
Browse files Browse the repository at this point in the history
…t loop group) and honor timeouts when waiting for reconnects #100

Motivation: Performing connect tasks on the event loop can block the channel threads. In consequence, no further channel message processing is possible and threads run into a dead-lock situation. Reconnects are handled now in a own executor group not harming the channel's event loop.

 The reconnect futures use the connection's timeout for the whole reconnect sequence (TCP connect and initializations such as PING or SSL) to prevent infinite blocking.
  • Loading branch information
mp911de committed Jul 11, 2015
1 parent d74dc37 commit d1fb078
Show file tree
Hide file tree
Showing 4 changed files with 102 additions and 34 deletions.
10 changes: 8 additions & 2 deletions src/main/java/com/lambdaworks/redis/AbstractRedisClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,9 @@
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.util.HashedWheelTimer;
import io.netty.util.concurrent.DefaultEventExecutorGroup;
import io.netty.util.concurrent.EventExecutorGroup;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GlobalEventExecutor;
import io.netty.util.internal.SystemPropertyUtil;
import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory;
Expand Down Expand Up @@ -58,6 +59,8 @@ public abstract class AbstractRedisClient {
@Deprecated
protected EventLoopGroup eventLoopGroup;

protected EventExecutorGroup genericWorkerPool;

protected final Map<Class<? extends EventLoopGroup>, EventLoopGroup> eventLoopGroups;
protected final HashedWheelTimer timer;
protected final ChannelGroup channels;
Expand All @@ -70,7 +73,8 @@ public abstract class AbstractRedisClient {
protected AbstractRedisClient() {
timer = new HashedWheelTimer();
eventLoopGroups = new ConcurrentHashMap<Class<? extends EventLoopGroup>, EventLoopGroup>();
channels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
genericWorkerPool = new DefaultEventExecutorGroup(DEFAULT_EVENT_LOOP_THREADS);
channels = new DefaultChannelGroup(genericWorkerPool.next());
timer.start();
unit = TimeUnit.SECONDS;
}
Expand Down Expand Up @@ -127,6 +131,7 @@ protected void connectionBuilder(CommandHandler<?, ?> handler, RedisChannelHandl
connectionBuilder.bootstrap(redisBootstrap);
connectionBuilder.channelGroup(channels).connectionEvents(connectionEvents).timer(timer);
connectionBuilder.commandHandler(handler).socketAddressSupplier(socketAddressSupplier).connection(connection);
connectionBuilder.workerPool(genericWorkerPool);

}

Expand Down Expand Up @@ -265,6 +270,7 @@ public void shutdown(long quietPeriod, long timeout, TimeUnit timeUnit) {
List<Future<?>> closeFutures = Lists.newArrayList();
ChannelGroupFuture closeFuture = channels.close();

closeFutures.add(genericWorkerPool.shutdownGracefully(quietPeriod, timeout, timeUnit));
closeFutures.add(closeFuture);

for (EventLoopGroup eventExecutors : eventLoopGroups.values()) {
Expand Down
15 changes: 13 additions & 2 deletions src/main/java/com/lambdaworks/redis/ConnectionBuilder.java
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package com.lambdaworks.redis;

import static com.google.common.base.Preconditions.*;
import static com.google.common.base.Preconditions.checkState;

import java.net.SocketAddress;
import java.util.List;
Expand All @@ -14,8 +14,12 @@

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.group.ChannelGroup;
import io.netty.util.Timer;
import io.netty.util.concurrent.EventExecutorGroup;

/**
* @author <a href="mailto:[email protected]">Mark Paluch</a>
Expand All @@ -31,6 +35,7 @@ public class ConnectionBuilder {
private Timer timer;
private Bootstrap bootstrap;
private ClientOptions clientOptions;
private EventExecutorGroup workerPool;
private long timeout;
private TimeUnit timeUnit;

Expand Down Expand Up @@ -67,6 +72,11 @@ public ConnectionBuilder clientOptions(ClientOptions clientOptions) {
return this;
}

public ConnectionBuilder workerPool(EventExecutorGroup workerPool) {
this.workerPool = workerPool;
return this;
}

public ConnectionBuilder connectionEvents(ConnectionEvents connectionEvents) {
this.connectionEvents = connectionEvents;
return this;
Expand Down Expand Up @@ -108,7 +118,8 @@ protected List<ChannelHandler> buildHandlers() {
checkState(timer != null, "timer must be set for autoReconnect=true");
checkState(socketAddressSupplier != null, "socketAddressSupplier must be set for autoReconnect=true");

ConnectionWatchdog watchdog = new ConnectionWatchdog(clientOptions, bootstrap, timer, socketAddressSupplier);
ConnectionWatchdog watchdog = new ConnectionWatchdog(clientOptions, bootstrap, timer, workerPool,
socketAddressSupplier);

watchdog.setListenOnChannelInactive(true);
handlers.add(watchdog);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -425,7 +425,8 @@ private void cancelCommands(String message) {
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
if (debugEnabled) {
logger.debug("{} exceptionCaught() {}", logPrefix(), cause);
logger.debug("{} exceptionCaught()", logPrefix(), cause);
logger.debug(cause.getMessage(), cause);
}
if (!queue.isEmpty()) {
RedisCommand<K, V, ?> command = queue.poll();
Expand Down Expand Up @@ -465,7 +466,7 @@ public void close() {
if (currentChannel != null) {
currentChannel.pipeline().fireUserEventTriggered(new ConnectionEvents.PrepareClose());
currentChannel.pipeline().fireUserEventTriggered(new ConnectionEvents.Close());
currentChannel.closeFuture().syncUninterruptibly();
currentChannel.pipeline().close().syncUninterruptibly();
}
}

Expand Down
106 changes: 78 additions & 28 deletions src/main/java/com/lambdaworks/redis/protocol/ConnectionWatchdog.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import java.net.SocketAddress;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import com.google.common.base.Supplier;
import com.lambdaworks.redis.ClientOptions;
Expand All @@ -13,11 +14,17 @@
import com.lambdaworks.redis.RedisChannelInitializer;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.group.ChannelGroup;
import io.netty.util.Timeout;
import io.netty.util.Timer;
import io.netty.util.TimerTask;
import io.netty.util.concurrent.EventExecutorGroup;
import io.netty.util.internal.logging.InternalLogLevel;
import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory;
Expand All @@ -35,6 +42,7 @@ public class ConnectionWatchdog extends ChannelInboundHandlerAdapter implements

private static final InternalLogger logger = InternalLoggerFactory.getInstance(ConnectionWatchdog.class);

private final EventExecutorGroup reconnectWorkers;
private final ClientOptions clientOptions;
private final Bootstrap bootstrap;
private boolean listenOnChannelInactive;
Expand All @@ -49,6 +57,11 @@ public class ConnectionWatchdog extends ChannelInboundHandlerAdapter implements
private long lastReconnectionLogging = -1;
private String logPrefix;

private TimeUnit timeoutUnit = TimeUnit.SECONDS;
private long timeout = 60;

private volatile ChannelFuture currentFuture;

/**
* Create a new watchdog that adds to new connections to the supplied {@link ChannelGroup} and establishes a new
* {@link Channel} when disconnected, while reconnect is true.
Expand All @@ -57,8 +70,8 @@ public class ConnectionWatchdog extends ChannelInboundHandlerAdapter implements
* @param bootstrap Configuration for new channels.
* @param timer Timer used for delayed reconnect.
*/
public ConnectionWatchdog(ClientOptions clientOptions, Bootstrap bootstrap, Timer timer) {
this(clientOptions, bootstrap, timer, null);
public ConnectionWatchdog(ClientOptions clientOptions, Bootstrap bootstrap, EventExecutorGroup reconnectWorkers, Timer timer) {
this(clientOptions, bootstrap, timer, reconnectWorkers, null);
}

/**
Expand All @@ -71,17 +84,21 @@ public ConnectionWatchdog(ClientOptions clientOptions, Bootstrap bootstrap, Time
* @param socketAddressSupplier the socket address suplier for gaining an address to reconnect to
*/
public ConnectionWatchdog(ClientOptions clientOptions, Bootstrap bootstrap, Timer timer,
Supplier<SocketAddress> socketAddressSupplier) {
EventExecutorGroup reconnectWorkers, Supplier<SocketAddress> socketAddressSupplier) {
this.clientOptions = clientOptions;
this.bootstrap = bootstrap;
this.timer = timer;
this.reconnectWorkers = reconnectWorkers;
this.socketAddressSupplier = socketAddressSupplier;
}

@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
logger.debug("{} userEventTriggered({}, {})", logPrefix, ctx, evt);
if (evt instanceof ConnectionEvents.PrepareClose) {
if (currentFuture != null && !currentFuture.isDone()) {
currentFuture.cancel(true);
}
ConnectionEvents.PrepareClose prepareClose = (ConnectionEvents.PrepareClose) evt;
setListenOnChannelInactive(false);
prepareClose.getPrepareCloseFuture().set(true);
Expand All @@ -96,6 +113,7 @@ public void channelActive(ChannelHandlerContext ctx) throws Exception {
channel = ctx.channel();
attempts = 0;
remoteAddress = channel.remoteAddress();

super.channelActive(ctx);
}

Expand All @@ -105,9 +123,15 @@ public void channelInactive(ChannelHandlerContext ctx) throws Exception {
logger.debug("{} channelInactive({})", logPrefix, ctx);
channel = null;
if (listenOnChannelInactive && !reconnectSuspended) {
RedisChannelHandler channelHandler = ctx.pipeline().get(RedisChannelHandler.class);
if (channelHandler != null) {
timeout = channelHandler.getTimeout();
timeoutUnit = channelHandler.getTimeoutUnit();
}

scheduleReconnect();
} else {
logger.debug("{} Reconnect scheduling disabled", logPrefix, ctx);
logger.debug("{} Reconnect scheduling disabled", logPrefix(), ctx);
logger.debug("");
}
super.channelInactive(ctx);
Expand All @@ -131,17 +155,22 @@ public void scheduleReconnect() {
int timeout = 2 << attempts;
timer.newTimeout(new TimerTask() {
@Override
public void run(Timeout timeout) throws Exception {
public void run(final Timeout timeout) throws Exception {

if (!isEventLoopGroupActive()) {
logger.debug("isEventLoopGroupActive() == false");
return;
}

bootstrap.group().submit(new Callable<Object>() {
if (reconnectWorkers != null) {
ConnectionWatchdog.this.run(timeout);
return;
}

reconnectWorkers.submit(new Callable<Object>() {
@Override
public Object call() throws Exception {
ConnectionWatchdog.this.run(null);
ConnectionWatchdog.this.run(timeout);
return null;
}
});
Expand Down Expand Up @@ -182,55 +211,75 @@ public void run(Timeout timeout) throws Exception {

try {
reconnect(infoLevel, warnLevel);
} catch (InterruptedException e) {
return;
} catch (Exception e) {
logger.log(warnLevel, "Cannot connect: {}", e.toString());
scheduleReconnect();
}
}

private void reconnect(InternalLogLevel infoLevel, InternalLogLevel warnLevel) throws InterruptedException {
private void reconnect(InternalLogLevel infoLevel, InternalLogLevel warnLevel) throws Exception {

logger.log(infoLevel, "Reconnecting, last destination was " + remoteAddress);

if (socketAddressSupplier != null) {
try {
remoteAddress = socketAddressSupplier.get();
} catch (RuntimeException e) {
logger.log(warnLevel, "Cannot retrieve the current address from socketAddressSupplier: " + e.toString());
logger.log(warnLevel, "Cannot retrieve the current address from socketAddressSupplier: " + e.toString()
+ ", reusing old address " + remoteAddress);
}
}

ChannelFuture connect = bootstrap.connect(remoteAddress);
connect.sync().await();

RedisChannelInitializer channelInitializer = connect.channel().pipeline().get(RedisChannelInitializer.class);
CommandHandler commandHandler = connect.channel().pipeline().get(CommandHandler.class);
RedisChannelHandler channelHandler = connect.channel().pipeline().get(RedisChannelHandler.class);

try {
channelInitializer.channelInitialized().get(channelHandler.getTimeout(), channelHandler.getTimeoutUnit());
logger.log(infoLevel, "Reconnected to " + remoteAddress);
} catch (Exception e) {
long timeLeft = timeoutUnit.toNanos(timeout);
long start = System.nanoTime();
currentFuture = bootstrap.connect(remoteAddress);
if (!currentFuture.await(timeLeft, TimeUnit.NANOSECONDS)) {
if (currentFuture.isCancellable()) {
currentFuture.cancel(true);
}

if (clientOptions.isCancelCommandsOnReconnectFailure()) {
commandHandler.reset();
throw new TimeoutException("Reconnection attempt exceeded timeout of " + timeout + " " + timeoutUnit);
}
currentFuture.sync();

RedisChannelInitializer channelInitializer = currentFuture.channel().pipeline().get(RedisChannelInitializer.class);
CommandHandler commandHandler = currentFuture.channel().pipeline().get(CommandHandler.class);

try {
timeLeft -= System.nanoTime() - start;
channelInitializer.channelInitialized().get(Math.max(0, timeLeft), TimeUnit.NANOSECONDS);
logger.log(infoLevel, "Reconnected to " + remoteAddress);
} catch (Exception e) {
if (clientOptions.isCancelCommandsOnReconnectFailure()) {
commandHandler.reset();
}

if (clientOptions.isSuspendReconnectOnProtocolFailure()) {
logger.error("Cannot initialize channel. Disabling autoReconnect", e);
setReconnectSuspended(true);
} else {
logger.error("Cannot initialize channel.", e);
if (clientOptions.isSuspendReconnectOnProtocolFailure()) {
logger.error("Cannot initialize channel. Disabling autoReconnect", e);
setReconnectSuspended(true);
} else {
logger.error("Cannot initialize channel.", e);
throw e;
}
}
} finally {
currentFuture = null;
}

}

private boolean isEventLoopGroupActive() {
if (bootstrap.group().isShutdown() || bootstrap.group().isTerminated() || bootstrap.group().isShuttingDown()) {
return false;
}

if (reconnectWorkers != null
&& (reconnectWorkers.isShutdown() || reconnectWorkers.isTerminated() || reconnectWorkers.isShuttingDown())) {
return false;
}

return true;
}

Expand Down Expand Up @@ -267,6 +316,7 @@ public boolean isReconnectSuspended() {
}

public void setReconnectSuspended(boolean reconnectSuspended) {

this.reconnectSuspended = reconnectSuspended;
}

Expand Down

0 comments on commit d1fb078

Please sign in to comment.