Skip to content

Commit

Permalink
Merge Decouple ConnectionWatchdog reconnect from timer thread #100
Browse files Browse the repository at this point in the history
Motivation: Netty's HashedWheelTimer runs single threaded and processes, therefore, one task at a time. lettuce's ConnectionWatchdog uses a blocking call (without a timeout) to the initializing future that may block the timer thread indefinitely. The blocking call stops the reconnection for all connections because the processing thread is blocked.
  • Loading branch information
mp911de committed Jul 8, 2015
1 parent fe6f445 commit c5411d4
Show file tree
Hide file tree
Showing 37 changed files with 215 additions and 117 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -50,11 +50,6 @@ public void channelInactive(ChannelHandlerContext ctx) throws Exception {
super.channelInactive(ctx);
}

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
super.channelRead(ctx, msg);
}

@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt instanceof ConnectionEvents.Close) {
Expand Down
8 changes: 4 additions & 4 deletions src/main/java/com/lambdaworks/redis/RedisChannelHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -197,14 +197,14 @@ public void setOptions(ClientOptions clientOptions) {
this.clientOptions = clientOptions;
}

public TimeUnit getTimeoutUnit() {
return unit;
}

public long getTimeout() {
return timeout;
}

public TimeUnit getTimeoutUnit() {
return unit;
}

protected <T> T syncHandler(Object asyncApi, Class<?>... interfaces) {
FutureSyncInvocationHandler<K, V> h = new FutureSyncInvocationHandler<>((StatefulConnection) this, asyncApi);
return (T) Proxy.newProxyInstance(AbstractRedisClient.class.getClassLoader(), interfaces, h);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package com.lambdaworks.redis.protocol;

import io.netty.channel.Channel;

/**
* @author <a href="mailto:[email protected]">Mark Paluch</a>
* @since 08.07.15 09:43
*/
class ChannelLogDescriptor {

static String logDescriptor(Channel channel) {

if (channel == null) {
return "unknown";
}

StringBuffer buffer = new StringBuffer(64);

if (channel.localAddress() != null) {
buffer.append(channel.localAddress()).append(" -> ");
}
if (channel.remoteAddress() != null) {
buffer.append(channel.remoteAddress());
}

if (!channel.isActive()) {
buffer.append(" (inactive)");
}

return buffer.toString();
}
}
30 changes: 20 additions & 10 deletions src/main/java/com/lambdaworks/redis/protocol/CommandHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

package com.lambdaworks.redis.protocol;

import java.nio.channels.ClosedChannelException;
import java.nio.charset.Charset;
import java.util.ArrayDeque;
import java.util.ArrayList;
Expand All @@ -14,6 +15,8 @@

import io.netty.buffer.ByteBuf;
import io.netty.channel.*;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;
import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory;

Expand All @@ -28,6 +31,7 @@
public class CommandHandler<K, V> extends ChannelDuplexHandler implements RedisChannelWriter<K, V> {

private static final InternalLogger logger = InternalLoggerFactory.getInstance(CommandHandler.class);
private static final WriteLogListener WRITE_LOG_LISTENER = new WriteLogListener();

protected ClientOptions clientOptions;
protected Queue<RedisCommand<K, V, ?>> queue;
Expand Down Expand Up @@ -176,7 +180,7 @@ public <T, C extends RedisCommand<K, V, T>> C write(C command) {

if (reliability == Reliability.AT_LEAST_ONCE) {
// commands are ok to stay within the queue, reconnect will retrigger them
channel.write(command, channel.voidPromise());
channel.write(command).addListener(WRITE_LOG_LISTENER);
channel.flush();
}
} else {
Expand Down Expand Up @@ -232,7 +236,6 @@ public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)
}
} catch (Exception e) {
cmd.completeExceptionally(e);
cmd.cancel();
promise.setFailure(e);
throw e;
}
Expand Down Expand Up @@ -483,14 +486,8 @@ private String logPrefix() {
if (logPrefix != null) {
return logPrefix;
}
StringBuffer buffer = new StringBuffer(16);
buffer.append('[');
if (channel != null) {
buffer.append(channel.remoteAddress());
} else {
buffer.append("not connected");
}
buffer.append(']');
StringBuffer buffer = new StringBuffer(64);
buffer.append('[').append(ChannelLogDescriptor.logDescriptor(channel)).append(']');
return logPrefix = buffer.toString();
}

Expand Down Expand Up @@ -524,4 +521,17 @@ public void operationComplete(ChannelFuture future) throws Exception {
}
}

/**
* A generic future listener which logs unsuccessful writes.
*
*/
static class WriteLogListener implements GenericFutureListener<Future<Void>> {

@Override
public void operationComplete(Future<Void> future) throws Exception {
if (!future.isSuccess() && !(future.cause() instanceof ClosedChannelException))
logger.warn(future.cause().getMessage(), future.cause());
}

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,13 @@
package com.lambdaworks.redis.protocol;

import java.net.SocketAddress;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;

import com.google.common.base.Supplier;
import com.lambdaworks.redis.ClientOptions;
import com.lambdaworks.redis.ConnectionEvents;
import com.lambdaworks.redis.RedisChannelHandler;
import com.lambdaworks.redis.RedisChannelInitializer;

import io.netty.bootstrap.Bootstrap;
Expand Down Expand Up @@ -50,6 +52,7 @@ public class ConnectionWatchdog extends ChannelInboundHandlerAdapter implements
private SocketAddress remoteAddress;
private int attempts;
private long lastReconnectionLogging = -1;
private String logPrefix;

/**
* Create a new watchdog that adds to new connections to the supplied {@link ChannelGroup} and establishes a new
Expand Down Expand Up @@ -82,7 +85,7 @@ public ConnectionWatchdog(ClientOptions clientOptions, Bootstrap bootstrap, Time

@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
logger.debug("userEventTriggered(" + ctx + ", " + evt + ")");
logger.debug("{} userEventTriggered({}, {})", logPrefix(), ctx, evt);
if (evt instanceof ConnectionEvents.PrepareClose) {
ConnectionEvents.PrepareClose prepareClose = (ConnectionEvents.PrepareClose) evt;
setListenOnChannelInactive(false);
Expand All @@ -94,7 +97,7 @@ public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exc
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {

logger.debug("channelActive(" + ctx + ")");
logger.debug("{} channelActive({})", logPrefix(), ctx);
channel = ctx.channel();
attempts = 0;
remoteAddress = channel.remoteAddress();
Expand All @@ -104,10 +107,13 @@ public void channelActive(ChannelHandlerContext ctx) throws Exception {
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {

logger.debug("channelInactive(" + ctx + ")");
logger.debug("{} channelInactive({})", logPrefix(), ctx);
channel = null;
if (listenOnChannelInactive && !reconnectSuspended) {
scheduleReconnect();
} else {
logger.debug("{} Reconnect scheduling disabled", logPrefix(), ctx);
logger.debug("");
}
super.channelInactive(ctx);
}
Expand All @@ -116,9 +122,10 @@ public void channelInactive(ChannelHandlerContext ctx) throws Exception {
* Schedule reconnect if channel is not available/not active.
*/
public void scheduleReconnect() {
logger.debug("scheduleReconnect()");
logger.debug("{} scheduleReconnect()", logPrefix());

if (!isEventLoopGroupActive()) {
logger.debug("isEventLoopGroupActive() == false");
return;
}

Expand All @@ -127,9 +134,26 @@ public void scheduleReconnect() {
attempts++;
}
int timeout = 2 << attempts;
timer.newTimeout(this, timeout, TimeUnit.MILLISECONDS);
timer.newTimeout(new TimerTask() {
@Override
public void run(Timeout timeout) throws Exception {

if (!isEventLoopGroupActive()) {
logger.debug("isEventLoopGroupActive() == false");
return;
}

bootstrap.group().submit(new Callable<Object>() {
@Override
public Object call() throws Exception {
ConnectionWatchdog.this.run(null);
return null;
}
});
}
}, timeout, TimeUnit.MILLISECONDS);
} else {
logger.debug("Skipping scheduleReconnect() because I have an active channel");
logger.debug("{} Skipping scheduleReconnect() because I have an active channel", logPrefix());
}
}

Expand All @@ -145,6 +169,7 @@ public void scheduleReconnect() {
public void run(Timeout timeout) throws Exception {

if (!isEventLoopGroupActive()) {
logger.debug("isEventLoopGroupActive() == false");
return;
}

Expand All @@ -163,7 +188,7 @@ public void run(Timeout timeout) throws Exception {
try {
reconnect(infoLevel, warnLevel);
} catch (Exception e) {
logger.log(warnLevel, "Cannot connect: " + e.toString());
logger.log(warnLevel, "Cannot connect: {}", e.toString());
scheduleReconnect();
}
}
Expand All @@ -184,10 +209,11 @@ private void reconnect(InternalLogLevel infoLevel, InternalLogLevel warnLevel) t
connect.sync().await();

RedisChannelInitializer channelInitializer = connect.channel().pipeline().get(RedisChannelInitializer.class);
CommandHandler<?, ?> commandHandler = connect.channel().pipeline().get(CommandHandler.class);
try {
CommandHandler commandHandler = connect.channel().pipeline().get(CommandHandler.class);
RedisChannelHandler channelHandler = connect.channel().pipeline().get(RedisChannelHandler.class);

channelInitializer.channelInitialized().get();
try {
channelInitializer.channelInitialized().get(channelHandler.getTimeout(), channelHandler.getTimeoutUnit());
logger.log(infoLevel, "Reconnected to " + remoteAddress);
} catch (Exception e) {

Expand Down Expand Up @@ -248,4 +274,14 @@ public boolean isReconnectSuspended() {
public void setReconnectSuspended(boolean reconnectSuspended) {
this.reconnectSuspended = reconnectSuspended;
}

private String logPrefix() {
if (logPrefix != null) {
return logPrefix;
}
StringBuffer buffer = new StringBuffer(64);
buffer.append('[').append(ChannelLogDescriptor.logDescriptor(channel)).append(']');
return logPrefix = buffer.toString();
}

}
Original file line number Diff line number Diff line change
@@ -1,18 +1,17 @@
package biz.paluch.redis.extensibility;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.*;

import java.util.concurrent.TimeUnit;

import com.lambdaworks.redis.pubsub.RedisPubSubAsyncCommandsImpl;
import com.lambdaworks.redis.pubsub.api.async.RedisPubSubAsyncCommands;
import org.apache.log4j.Logger;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;

import com.lambdaworks.redis.FastShutdown;
import com.lambdaworks.redis.RedisConnection;
import com.lambdaworks.redis.TestSettings;
import com.lambdaworks.redis.pubsub.RedisPubSubAsyncCommandsImpl;
import com.lambdaworks.redis.pubsub.api.async.RedisPubSubAsyncCommands;

/**
* Test for override/extensability of RedisClient
Expand All @@ -38,7 +37,7 @@ protected static MyExtendedRedisClient getRedisClient() {

@AfterClass
public static void shutdownClient() {
client.shutdown(0, 0, TimeUnit.MILLISECONDS);
FastShutdown.shutdown(client);
}

@Test
Expand Down
2 changes: 1 addition & 1 deletion src/test/java/com/lambdaworks/SslTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ public void before() throws Exception {

@AfterClass
public static void afterClass() {
redisClient.shutdown();
FastShutdown.shutdown(redisClient);
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ public WithPasswordRequired() throws Exception {
try {
run(client);
} finally {
client.shutdown(0, 0, TimeUnit.MILLISECONDS);
FastShutdown.shutdown(client);
}
} finally {

Expand Down
2 changes: 1 addition & 1 deletion src/test/java/com/lambdaworks/redis/AllTheAPIsTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ public static void beforeClass() throws Exception {
@BeforeClass
public static void afterClass() throws Exception {
if (clusterClient != null) {
clusterClient.shutdown();
FastShutdown.shutdown(clusterClient);
}
}

Expand Down
8 changes: 4 additions & 4 deletions src/test/java/com/lambdaworks/redis/ClientTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,7 @@ public void cancelCommandsOnReconnectFailure() throws Exception {

assertThat(connection.isOpen()).isFalse();
connectionWatchdog.setReconnectSuspended(false);
connectionWatchdog.scheduleReconnect();
connectionWatchdog.run(null);
Thread.sleep(500);
assertThat(connection.isOpen()).isFalse();

Expand Down Expand Up @@ -309,7 +309,7 @@ public boolean isSatisfied() {
assertThat(listener.onConnected).isEqualTo(statefulRedisConnection);
assertThat(listener.onDisconnected).isEqualTo(statefulRedisConnection);

client.shutdown();
FastShutdown.shutdown(client);
}

@Test
Expand Down Expand Up @@ -338,7 +338,7 @@ public boolean isSatisfied() {
assertThat(removedListener.onDisconnected).isNull();
assertThat(removedListener.onException).isNull();

client.shutdown();
FastShutdown.shutdown(client);

}

Expand Down Expand Up @@ -436,7 +436,7 @@ public void emptyClient() throws Exception {
} catch (IllegalArgumentException e) {
assertThat(e).hasMessageContaining("RedisURI");
}
client.shutdown();
FastShutdown.shutdown(client);
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ public void run(RedisClient client) {
RedisConnection<String, String> authConnection = redisClient.connect().sync();
authConnection.ping();
authConnection.close();
redisClient.shutdown();
FastShutdown.shutdown(redisClient);
}
};
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ public DefaultRedisClient() {
Runtime.getRuntime().addShutdownHook(new Thread() {
@Override
public void run() {
redisClient.shutdown(0, 0, TimeUnit.MILLISECONDS);
FastShutdown.shutdown(redisClient);
}
});
}
Expand Down
Loading

0 comments on commit c5411d4

Please sign in to comment.