Skip to content

Commit

Permalink
Enable initial support for read from slaves in Master-Slave setups #125
Browse files Browse the repository at this point in the history
This commit adds the API for master-slave setups featuring topology discovery and selecting the source for read operations (read from slave/master/nearest node). The change can improve the usage of slave nodes within a setup and requires no longer multiple connections managed by user code but the connection management is built into the client.
  • Loading branch information
mp911de committed Nov 24, 2015
1 parent f80d8a1 commit 9fef4d4
Show file tree
Hide file tree
Showing 18 changed files with 1,456 additions and 6 deletions.
8 changes: 4 additions & 4 deletions src/main/java/com/lambdaworks/redis/RedisClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -761,7 +761,7 @@ protected SocketAddress getSocketAddress(RedisURI redisURI) throws InterruptedEx
if (redisURI.getSentinelMasterId() != null && !redisURI.getSentinels().isEmpty()) {
logger.debug("Connecting to Redis using Sentinels " + redisURI.getSentinels() + ", MasterId "
+ redisURI.getSentinelMasterId());
redisAddress = lookupRedis(redisURI.getSentinelMasterId());
redisAddress = lookupRedis(redisURI);

if (redisAddress == null) {
throw new RedisConnectionException("Cannot provide redisAddress using sentinel for masterId "
Expand All @@ -774,11 +774,11 @@ protected SocketAddress getSocketAddress(RedisURI redisURI) throws InterruptedEx
return redisAddress;
}

private SocketAddress lookupRedis(String sentinelMasterId) throws InterruptedException, TimeoutException,
private SocketAddress lookupRedis(RedisURI sentinelUri) throws InterruptedException, TimeoutException,
ExecutionException {
RedisSentinelAsyncCommands<String, String> connection = connectSentinel().async();
RedisSentinelAsyncCommands<String, String> connection = connectSentinel(sentinelUri).async();
try {
return connection.getMasterAddrByName(sentinelMasterId).get(timeout, unit);
return connection.getMasterAddrByName(sentinelUri.getSentinelMasterId()).get(timeout, unit);
} finally {
connection.close();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ class ReadOnlyCommands {
enum CommandName {
ASKING, BITCOUNT, BITPOS, CLIENT, COMMAND, DUMP, ECHO, EXISTS,
/**/GEODIST, GEOPOS, GEORADIUS, GEORADIUSBYMEMBER, GET, GETBIT,
/**/GETRANGE, HEXISTS, HGET, HGETALL, HKEYS, HLEN, HMGET, HSCAN, /* TODO #117 HSTRLEN , */
/**/GETRANGE, HEXISTS, HGET, HGETALL, HKEYS, HLEN, HMGET, HSCAN, HSTRLEN,
/**/HVALS, INFO, KEYS, LINDEX, LLEN, LRANGE, MGET, MULTI, PFCOUNT, PTTL,
/**/RANDOMKEY, READWRITE, SCAN, SCARD, SCRIPT,
/**/SDIFF, SINTER, SISMEMBER, SMEMBERS, SRANDMEMBER, SSCAN, STRLEN,
Expand Down
85 changes: 85 additions & 0 deletions src/main/java/com/lambdaworks/redis/masterslave/MasterSlave.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
package com.lambdaworks.redis.masterslave;

import com.lambdaworks.redis.RedisClient;
import com.lambdaworks.redis.RedisURI;
import com.lambdaworks.redis.api.StatefulRedisConnection;
import com.lambdaworks.redis.codec.RedisCodec;

/**
* Master-Slave connection API.
* <p>
* This API allows connections to Redis Master/Slave setups which run either Standalone or are managed by Redis Sentinel.
* Master-Slave connections incorporate topology discovery and source selection for read operations using
* {@link com.lambdaworks.redis.ReadFrom}. Regular Standalone connections using {@link RedisClient#connect()} are single-node
* connections without balancing/topology discovery.
* </p>
* <p>
* Connections can be obtained by providing the {@link RedisClient}, a {@link RedisURI} and a {@link RedisCodec}. <code>
* RedisClient client = RedisClient.create();
* StatefulRedisMasterSlaveConnection<String, String> connection = MasterSlave.connect(client,
* RedisURI.create("redis://localhost"),
* new Utf8StringCodec());
* // ...
*
* connection.close();
* client.shutdown();
* </code>
* </p>
* <h3>Topology discovery</h3>
* <p>
* Master-Slave topologies are either static or semi-static. Redis Standalone instances with attached slaves feature no
* failover/HA mechanism and are static setups. Redis Sentinel managed instances are controlled by Redis Sentinel and allow
* failover (which include master promotion). The {@link MasterSlave} API supports both mechanisms. The topology is provided by
* a {@link TopologyProvider}:
*
* <ul>
* <li>{@link MasterSlaveTopologyProvider}: Topology lookup using the {@code INFO REPLICATION} output. Slaves are listed as
* {@code slaveN=...} entries.</li>
* <li>{@link SentinelTopologyProvider}: Topology lookup using the Redis Sentinel API. In particular {@code SENTINEL SLAVES}
* output.</li>
* </ul>
*
* The topology is discovered once during the connection phase but is not updated afterwards.
* </p>
*
* @author <a href="mailto:[email protected]">Mark Paluch</a>
*/
public class MasterSlave {

/**
* Open a new connection to a Redis Master-Slave server/servers using the supplied {@link RedisURI} and the supplied
* {@link RedisCodec codec} to encode/decode keys.
*
* @param redisClient the Redis client
* @param codec Use this codec to encode/decode keys and values, must not be {@literal null}
* @param redisURI the Redis server to connect to, must not be {@literal null}
* @param <K> Key type
* @param <V> Value type
* @return A new connection
*/
public static <K, V> StatefulRedisMasterSlaveConnection<K, V> connect(RedisClient redisClient, RedisCodec<K, V> codec,
RedisURI redisURI) {

StatefulRedisConnection<K, V> masterConnection = redisClient.connect(codec, redisURI);
TopologyProvider topologyProvider;
if (redisURI.getSentinels().isEmpty()) {
topologyProvider = new MasterSlaveTopologyProvider(masterConnection, redisURI);
} else {
topologyProvider = new SentinelTopologyProvider(redisURI.getSentinelMasterId(), redisClient, redisURI);
}

MasterSlaveConnectionProvider<K, V> connectionProvider = new MasterSlaveConnectionProvider<>(redisClient, codec,
masterConnection);

MasterSlaveTopologyRefresh refresh = new MasterSlaveTopologyRefresh(redisClient, topologyProvider);
connectionProvider.setKnownNodes(refresh.getNodes(redisURI));

MasterSlaveChannelWriter<K, V> channelWriter = new MasterSlaveChannelWriter<>(connectionProvider);

StatefulRedisMasterSlaveConnectionImpl<K, V> connection = new StatefulRedisMasterSlaveConnectionImpl<>(channelWriter,
codec, redisURI.getTimeout(), redisURI.getUnit());

return connection;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
package com.lambdaworks.redis.masterslave;

import static com.google.common.base.Preconditions.checkArgument;

import com.lambdaworks.redis.ReadFrom;
import com.lambdaworks.redis.RedisChannelHandler;
import com.lambdaworks.redis.RedisChannelWriter;
import com.lambdaworks.redis.RedisException;
import com.lambdaworks.redis.api.StatefulRedisConnection;
import com.lambdaworks.redis.protocol.ProtocolKeyword;
import com.lambdaworks.redis.protocol.RedisCommand;

/**
* Channel writer/dispatcher that dispatches commands based on the intent to different connections.
*
* @author <a href="mailto:[email protected]">Mark Paluch</a>
*/
class MasterSlaveChannelWriter<K, V> implements RedisChannelWriter<K, V> {

private MasterSlaveConnectionProvider masterSlaveConnectionProvider;
private boolean closed = false;

public MasterSlaveChannelWriter(MasterSlaveConnectionProvider masterSlaveConnectionProvider) {
this.masterSlaveConnectionProvider = masterSlaveConnectionProvider;
}

@Override
public <T, C extends RedisCommand<K, V, T>> C write(C command) {
checkArgument(command != null, "command must not be null");

if (closed) {
throw new RedisException("Connection is closed");
}

MasterSlaveConnectionProvider.Intent intent = getIntent(command.getType());
StatefulRedisConnection<K, V> connection = masterSlaveConnectionProvider.getConnection(intent);

return connection.dispatch(command);
}

private MasterSlaveConnectionProvider.Intent getIntent(ProtocolKeyword type) {
for (ProtocolKeyword readOnlyCommand : ReadOnlyCommands.READ_ONLY_COMMANDS) {
if (readOnlyCommand == type) {
return MasterSlaveConnectionProvider.Intent.READ;
}
}
return MasterSlaveConnectionProvider.Intent.WRITE;
}

@Override
public void close() {

if (closed) {
return;
}

closed = true;

if (masterSlaveConnectionProvider != null) {
masterSlaveConnectionProvider.close();
masterSlaveConnectionProvider = null;
}
}

public MasterSlaveConnectionProvider getMasterSlaveConnectionProvider() {
return masterSlaveConnectionProvider;
}

@Override
public void setRedisChannelHandler(RedisChannelHandler<K, V> redisChannelHandler) {

}

@Override
public void setAutoFlushCommands(boolean autoFlush) {
masterSlaveConnectionProvider.setAutoFlushCommands(autoFlush);
}

@Override
public void flushCommands() {
masterSlaveConnectionProvider.flushCommands();
}

@Override
public void reset() {
masterSlaveConnectionProvider.reset();
}

/**
* Set from which nodes data is read. The setting is used as default for read operations on this connection. See the
* documentation for {@link ReadFrom} for more information.
*
* @param readFrom the read from setting, must not be {@literal null}
*/
public void setReadFrom(ReadFrom readFrom) {
masterSlaveConnectionProvider.setReadFrom(readFrom);
}

/**
* Gets the {@link ReadFrom} setting for this connection. Defaults to {@link ReadFrom#MASTER} if not set.
*
* @return the read from setting
*/
public ReadFrom getReadFrom() {
return masterSlaveConnectionProvider.getReadFrom();
}

}
Loading

0 comments on commit 9fef4d4

Please sign in to comment.