Skip to content

Commit

Permalink
Cluster connection failover when cluster topology changes #97
Browse files Browse the repository at this point in the history
- Provide ClusterClientOptions to enable/disable topology refresh
- Implement ASK redirection
- Prevent connections to other nodes than the partion view provides (exact check)
- Polishing
  • Loading branch information
mp911de committed Jul 19, 2015
1 parent 1fb4263 commit b1ff431
Show file tree
Hide file tree
Showing 23 changed files with 958 additions and 137 deletions.
6 changes: 3 additions & 3 deletions src/main/java/com/lambdaworks/redis/ClientOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import java.io.Serializable;

/**
* Client Options to control the behavior of {@link RedisClient} and {@link com.lambdaworks.redis.cluster.RedisClusterClient}.
* Client Options to control the behavior of {@link RedisClient}.
*
* @author <a href="mailto:[email protected]">Mark Paluch</a>
*/
Expand Down Expand Up @@ -88,14 +88,14 @@ public ClientOptions build() {
}
}

private ClientOptions(Builder builder) {
protected ClientOptions(Builder builder) {
pingBeforeActivateConnection = builder.pingBeforeActivateConnection;
cancelCommandsOnReconnectFailure = builder.cancelCommandsOnReconnectFailure;
autoReconnect = builder.autoReconnect;
suspendReconnectOnProtocolFailure = builder.suspendReconnectOnProtocolFailure;
}

private ClientOptions(ClientOptions original) {
protected ClientOptions(ClientOptions original) {
this.pingBeforeActivateConnection = original.pingBeforeActivateConnection;
this.autoReconnect = original.autoReconnect;
this.cancelCommandsOnReconnectFailure = original.cancelCommandsOnReconnectFailure;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

import com.google.common.cache.CacheBuilder;
Expand Down Expand Up @@ -77,11 +79,23 @@ protected Object handleInvocation(Object proxy, Method method, Object[] args) th
}
}

if (redisCommand instanceof Future<?>) {
if (redisCommand.isDone()) {
try {
redisCommand.get();
} catch (InterruptedException e) {
throw e;
} catch (ExecutionException e) {
throw new RedisException(e.getCause());
}
}
}

return awaitedResult;
}

if (result instanceof RedisClusterAsyncConnection) {
return AbstractRedisClient.syncHandler((RedisChannelHandler) result, RedisConnection.class,
return AbstractRedisClient.syncHandler((RedisChannelHandler<?, ?>) result, RedisConnection.class,
RedisClusterConnection.class);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
*/
class PlainChannelInitializer extends io.netty.channel.ChannelInitializer<Channel> implements RedisChannelInitializer {

final static RedisCommandBuilder INITIALIZING_CMD_BUILDER = new RedisCommandBuilder(new Utf8StringCodec());
final static RedisCommandBuilder<String, String> INITIALIZING_CMD_BUILDER = new RedisCommandBuilder(new Utf8StringCodec());

protected boolean pingBeforeActivate;
private List<ChannelHandler> handlers;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1748,7 +1748,7 @@ public Command<K, V, List<GeoCoordinates>> geopos(K key, V[] members) {
}

public Command<K, V, Double> geodist(K key, V from, V to, GeoArgs.Unit unit) {
CommandArgs<K, V> args = new CommandArgs<K, V>(codec).addKey(key).addValues(from, to);
CommandArgs<K, V> args = new CommandArgs<K, V>(codec).addKey(key).addValue(from).addValue(to);

if (unit != null) {
args.add(unit.name());
Expand Down
117 changes: 117 additions & 0 deletions src/main/java/com/lambdaworks/redis/cluster/ClusterClientOptions.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
package com.lambdaworks.redis.cluster;

import java.util.concurrent.TimeUnit;

import com.lambdaworks.redis.ClientOptions;

/**
* Client Options to control the behavior of {@link RedisClusterClient}.
*
* @author <a href="mailto:[email protected]">Mark Paluch</a>
*/
public class ClusterClientOptions extends ClientOptions {
private final boolean refreshClusterView;
private final long refreshPeriod;
private final TimeUnit refreshPeriodUnit;

/**
* Create a copy of {@literal options}
*
* @param options the original
* @return A new instance of {@link ClusterClientOptions} containing the values of {@literal options}
*/
public static ClusterClientOptions copyOf(ClusterClientOptions options) {
return new ClusterClientOptions(options);
}

/**
* Builder for {@link ClusterClientOptions}.
*/
public static class Builder extends ClientOptions.Builder {

private boolean refreshClusterView = false;
private long refreshPeriod = 60;
private TimeUnit refreshPeriodUnit = TimeUnit.SECONDS;

/**
* Enable regular cluster topology updates. The client starts updating the cluster topology in the intervals of
* {@link Builder#refreshPeriod} /{@link Builder#refreshPeriodUnit}. Defaults to {@literal false}.
*
* @param refreshClusterView {@literal true} enable regular cluster topology updates or {@literal false} to disable
* auto-updating
* @return {@code this}
*/
public Builder refreshClusterView(boolean refreshClusterView) {
this.refreshClusterView = refreshClusterView;
return this;
}

/**
* Set the refresh period. Defaults to {@literal 60 SECONDS}
*
* @param refreshPeriod period for triggering topology updates
* @param refreshPeriodUnit unit for {@code refreshPeriod}
* @return {@code this}
*/
public Builder refreshPeriod(long refreshPeriod, TimeUnit refreshPeriodUnit) {
this.refreshPeriod = refreshPeriod;
this.refreshPeriodUnit = refreshPeriodUnit;
return this;
}

/**
* Create a new instance of {@link ClusterClientOptions}
*
* @return new instance of {@link ClusterClientOptions}
*/
public ClusterClientOptions build() {
return new ClusterClientOptions(this);
}
}

protected ClusterClientOptions(Builder builder) {
super(builder);
this.refreshClusterView = builder.refreshClusterView;
this.refreshPeriod = builder.refreshPeriod;
this.refreshPeriodUnit = builder.refreshPeriodUnit;
}

protected ClusterClientOptions(ClusterClientOptions original) {
super(original);
this.refreshClusterView = original.refreshClusterView;
this.refreshPeriod = original.refreshPeriod;
this.refreshPeriodUnit = original.refreshPeriodUnit;
}

protected ClusterClientOptions() {
this.refreshClusterView = false;
this.refreshPeriod = 60;
this.refreshPeriodUnit = TimeUnit.SECONDS;
}

/**
* Flag, whether regular cluster topology updates are updated. The client starts updating the cluster topology in the
* intervals of {@link #getRefreshPeriod()} /{@link #getRefreshPeriodUnit()}. Defaults to {@literal false}
*
* @return
*/
public boolean isRefreshClusterView() {
return refreshClusterView;
}

/**
*
* @return the period between the regular cluster topology updates.
*/
public long getRefreshPeriod() {
return refreshPeriod;
}

/**
*
* @return unit for the {@link #getRefreshPeriod()}
*/
public TimeUnit getRefreshPeriodUnit() {
return refreshPeriodUnit;
}
}
19 changes: 15 additions & 4 deletions src/main/java/com/lambdaworks/redis/cluster/ClusterCommand.java
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
package com.lambdaworks.redis.cluster;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -41,8 +39,14 @@ public CommandOutput<K, V, T> getOutput() {
public void complete() {
executions++;

if (executions < executionLimit && isMoved()) {
retry.write(this);
try {
if (executions < executionLimit && (isMoved() || isAsk())) {
retry.write(this);
return;
}
} catch (Exception e) {
setException(e);
command.complete();
return;
}

Expand All @@ -56,6 +60,13 @@ public boolean isMoved() {
return false;
}

public boolean isAsk() {
if (getError() != null && getError().startsWith(CommandKeyword.ASK.name())) {
return true;
}
return false;
}

@Override
public void addListener(Runnable listener, Executor executor) {
command.addListener(listener, executor);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,11 @@ interface ClusterConnectionProvider extends Closeable {
*/
void reset();

/**
* Close connections that are not in use anymore.
*/
void closeStaleConnections();

/**
* Update partitions.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,12 +52,25 @@ public <T> RedisCommand<K, V, T> write(RedisCommand<K, V, T> command) {

if (commandToSend instanceof ClusterCommand) {
ClusterCommand<K, V, T> clusterCommand = (ClusterCommand<K, V, T>) commandToSend;
if (!clusterCommand.isDone() && clusterCommand.isMoved()) {
HostAndPort moveTarget = getMoveTarget(clusterCommand.getError());
commandToSend.getOutput().setError((String) null);
RedisAsyncConnectionImpl<K, V> connection = clusterConnectionProvider.getConnection(
ClusterConnectionProvider.Intent.WRITE, moveTarget.getHostText(), moveTarget.getPort());
channelWriter = connection.getChannelWriter();
if (!clusterCommand.isDone()) {
if (clusterCommand.isMoved()) {
HostAndPort moveTarget = getMoveTarget(clusterCommand.getError());
commandToSend.getOutput().setError((String) null);
RedisAsyncConnectionImpl<K, V> connection = clusterConnectionProvider.getConnection(
ClusterConnectionProvider.Intent.WRITE, moveTarget.getHostText(), moveTarget.getPort());
channelWriter = connection.getChannelWriter();
}

if (clusterCommand.isAsk()) {
HostAndPort askTarget = getAskTarget(clusterCommand.getError());
commandToSend.getOutput().setError((String) null);
RedisAsyncConnectionImpl<K, V> connection = clusterConnectionProvider.getConnection(
ClusterConnectionProvider.Intent.WRITE, askTarget.getHostText(), askTarget.getPort());
channelWriter = connection.getChannelWriter();

// set asking bit
connection.asking();
}
}
}

Expand Down Expand Up @@ -93,6 +106,17 @@ private HostAndPort getMoveTarget(String errorMessage) {
return HostAndPort.fromString(movedMessageParts.get(2));
}

private HostAndPort getAskTarget(String errorMessage) {

checkArgument(LettuceStrings.isNotEmpty(errorMessage), "errorMessage must not be empty");
checkArgument(errorMessage.startsWith(CommandKeyword.ASK.name()), "errorMessage must start with " + CommandKeyword.ASK);

List<String> movedMessageParts = Splitter.on(' ').splitToList(errorMessage);
checkArgument(movedMessageParts.size() >= 3, "errorMessage must consist of 3 tokens (" + movedMessageParts + ")");

return HostAndPort.fromString(movedMessageParts.get(2));
}

protected int getHash(byte[] encodedKey) {
return SlotHash.getSlot(encodedKey);
}
Expand Down
Loading

0 comments on commit b1ff431

Please sign in to comment.