Skip to content

Commit

Permalink
Provide access to cluster connection using the advanced cluster API #71
Browse files Browse the repository at this point in the history
  • Loading branch information
mp911de committed Jun 4, 2015
1 parent 4468e8a commit 8f9c22d
Show file tree
Hide file tree
Showing 7 changed files with 315 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,11 @@ protected Object handleInvocation(Object proxy, Method method, Object[] args) th
return LettuceFutures.await(command, timeout, unit);
}

if (result instanceof RedisClusterAsyncConnection) {
return AbstractRedisClient.syncHandler((RedisChannelHandler) result, RedisConnection.class,
RedisClusterConnection.class);
}

return result;

} catch (InvocationTargetException e) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package com.lambdaworks.redis.cluster;

import com.lambdaworks.redis.RedisClusterAsyncConnection;
import com.lambdaworks.redis.RedisClusterConnection;

/**
* Advanced asynchronous cluster API.
*
* @author <a href="mailto:[email protected]">Mark Paluch</a>
* @since 3.3
*/
public interface RedisAdvancedClusterAsyncConnection<K, V> extends RedisClusterAsyncConnection<K, V> {

/**
* Retrieve a connection to the specified cluster node using the nodeId. Host and port are looked up in the node list.
*
* @param nodeId the node Id
* @return a connection to the requested cluster node
*/
RedisClusterAsyncConnection<K, V> getConnection(String nodeId);

/**
* Retrieve a connection to the specified cluster node using the nodeId.
*
* @param host the host
* @param port the port
* @return a connection to the requested cluster node
*/
RedisClusterAsyncConnection<K, V> getConnection(String host, int port);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
package com.lambdaworks.redis.cluster;

import java.util.concurrent.TimeUnit;

import com.lambdaworks.redis.RedisAsyncConnectionImpl;
import com.lambdaworks.redis.RedisChannelWriter;
import com.lambdaworks.redis.RedisClusterAsyncConnection;
import com.lambdaworks.redis.RedisException;
import com.lambdaworks.redis.RedisURI;
import com.lambdaworks.redis.cluster.models.partitions.Partitions;
import com.lambdaworks.redis.cluster.models.partitions.RedisClusterNode;
import com.lambdaworks.redis.codec.RedisCodec;
import io.netty.channel.ChannelHandler;

/**
* Advanced asynchronous Cluster connection.
*
* @author <a href="mailto:[email protected]">Mark Paluch</a>
* @since 3.3
*/
@ChannelHandler.Sharable
public class RedisAdvancedClusterAsyncConnectionImpl<K, V> extends RedisAsyncConnectionImpl<K, V> implements
RedisAdvancedClusterAsyncConnection<K, V> {

private Partitions partitions;

/**
* Initialize a new connection.
*
* @param writer the channel writer
* @param codec Codec used to encode/decode keys and values.
* @param timeout Maximum time to wait for a response.
* @param unit Unit of time for the timeout.
*/
public RedisAdvancedClusterAsyncConnectionImpl(RedisChannelWriter<K, V> writer, RedisCodec<K, V> codec, long timeout,
TimeUnit unit) {
super(writer, codec, timeout, unit);
}

ClusterDistributionChannelWriter<K, V> getWriter() {
return (ClusterDistributionChannelWriter) super.getChannelWriter();
}

@Override
public RedisClusterAsyncConnection<K, V> getConnection(String nodeId) {
RedisURI redisURI = lookup(nodeId);
if (redisURI == null) {
throw new RedisException("NodeId " + nodeId + " does not belong to the cluster");
}

return getConnection(redisURI.getHost(), redisURI.getPort());
}

private RedisURI lookup(String nodeId) {

for (RedisClusterNode partition : partitions) {
if (partition.getNodeId().equals(nodeId)) {
return partition.getUri();
}
}
return null;
}

@Override
public RedisClusterAsyncConnection<K, V> getConnection(String host, int port) {
// there is currently no check whether the node belongs to the cluster or not.
// A check against the partition table could be done, but this reflects only a particular
// point of view. What if the cluster is multi-homed, proxied, natted...?

RedisAsyncConnectionImpl<K, V> connection = getWriter().getClusterConnectionProvider().getConnection(
ClusterConnectionProvider.Intent.WRITE, host, port);

return connection;
}

public void setPartitions(Partitions partitions) {
this.partitions = partitions;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package com.lambdaworks.redis.cluster;

import com.lambdaworks.redis.RedisClusterConnection;

/**
* Advanced synchronous cluster API.
*
* @author <a href="mailto:[email protected]">Mark Paluch</a>
* @since 3.3
*/
public interface RedisAdvancedClusterConnection<K, V> extends RedisClusterConnection<K, V> {

/**
* Retrieve a connection to the specified cluster node using the nodeId. Host and port are looked up in the node list.
*
* @param nodeId the node Id
* @return a connection to the requested cluster node
*/
RedisClusterConnection<K, V> getConnection(String nodeId);

/**
* Retrieve a connection to the specified cluster node using the nodeId.
*
* @param host the host
* @param port the port
* @return a connection to the requested cluster node
*/
RedisClusterConnection<K, V> getConnection(String host, int port);
}
28 changes: 16 additions & 12 deletions src/main/java/com/lambdaworks/redis/cluster/RedisClusterClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ public RedisClusterClient(List<RedisURI> initialUris) {
*
* @return A new connection.
*/
public RedisClusterConnection<String, String> connectCluster() {
public RedisAdvancedClusterConnection<String, String> connectCluster() {

return connectCluster(newStringStringCodec());
}
Expand All @@ -93,17 +93,18 @@ public RedisClusterConnection<String, String> connectCluster() {
* @return A new connection.
*/
@SuppressWarnings("unchecked")
public <K, V> RedisClusterConnection<K, V> connectCluster(RedisCodec<K, V> codec) {
public <K, V> RedisAdvancedClusterConnection<K, V> connectCluster(RedisCodec<K, V> codec) {

return (RedisClusterConnection<K, V>) syncHandler(connectClusterAsyncImpl(codec), RedisClusterConnection.class);
return (RedisAdvancedClusterConnection<K, V>) syncHandler(connectClusterAsyncImpl(codec),
RedisAdvancedClusterConnection.class, RedisClusterConnection.class);
}

/**
* Creates a connection to the redis cluster.
*
* @return A new connection.
*/
public RedisClusterAsyncConnection<String, String> connectClusterAsync() {
public RedisAdvancedClusterAsyncConnection<String, String> connectClusterAsync() {
return connectClusterAsyncImpl(newStringStringCodec(), getSocketAddressSupplier());
}

Expand All @@ -115,7 +116,7 @@ public RedisClusterAsyncConnection<String, String> connectClusterAsync() {
* @param <V> Value type.
* @return A new connection.
*/
public <K, V> RedisClusterAsyncConnection<K, V> connectClusterAsync(RedisCodec<K, V> codec) {
public <K, V> RedisAdvancedClusterAsyncConnection<K, V> connectClusterAsync(RedisCodec<K, V> codec) {
return connectClusterAsyncImpl(codec, getSocketAddressSupplier());
}

Expand Down Expand Up @@ -164,7 +165,7 @@ <K, V> RedisAsyncConnectionImpl<K, V> connectClusterAsyncImpl(RedisCodec<K, V> c
* @param <V> Value type.
* @return a new connection
*/
<K, V> RedisAsyncConnectionImpl<K, V> connectClusterAsyncImpl(RedisCodec<K, V> codec,
<K, V> RedisAdvancedClusterAsyncConnectionImpl<K, V> connectClusterAsyncImpl(RedisCodec<K, V> codec,
final Supplier<SocketAddress> socketAddressSupplier) {

if (partitions == null) {
Expand All @@ -181,8 +182,10 @@ <K, V> RedisAsyncConnectionImpl<K, V> connectClusterAsyncImpl(RedisCodec<K, V> c

final ClusterDistributionChannelWriter<K, V> clusterWriter = new ClusterDistributionChannelWriter<K, V>(handler,
pooledClusterConnectionProvider);
RedisAsyncConnectionImpl<K, V> connection = newRedisAsyncConnectionImpl(clusterWriter, codec, timeout, unit);
RedisAdvancedClusterAsyncConnectionImpl<K, V> connection = newRedisAsyncConnectionImpl(clusterWriter, codec, timeout,
unit);

connection.setPartitions(partitions);
connectAsyncImpl(handler, connection, socketAddressSupplier);

connection.registerCloseables(closeableResources, connection, clusterWriter, pooledClusterConnectionProvider);
Expand Down Expand Up @@ -265,20 +268,21 @@ protected Partitions loadPartitions() {
}

/**
* Construct a new {@link RedisAsyncConnectionImpl}. Can be overridden in order to construct a subclass of
* {@link RedisAsyncConnectionImpl}
* Construct a new {@link RedisAdvancedClusterAsyncConnectionImpl}. Can be overridden in order to construct a subclass of
* {@link RedisAdvancedClusterAsyncConnectionImpl}
*
* @param channelWriter the channel writer
* @param codec the codec to use
* @param timeout Timeout value
* @param unit Timeout unit
* @param <K> Key type.
* @param <V> Value type.
* @return RedisAsyncConnectionImpl&lt;K, V&gt; instance
* @return RedisAdvancedClusterAsyncConnectionImpl&lt;K, V&gt; instance
*/
protected <K, V> RedisAsyncConnectionImpl<K, V> newRedisAsyncConnectionImpl(RedisChannelWriter<K, V> channelWriter,
protected <K, V> RedisAdvancedClusterAsyncConnectionImpl<K, V> newRedisAsyncConnectionImpl(
RedisChannelWriter<K, V> channelWriter,
RedisCodec<K, V> codec, long timeout, TimeUnit unit) {
return new RedisAsyncConnectionImpl<K, V>(channelWriter, codec, timeout, unit);
return new RedisAdvancedClusterAsyncConnectionImpl<K, V>(channelWriter, codec, timeout, unit);
}

protected RedisURI getFirstUri() {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
package com.lambdaworks.redis.cluster;

import java.util.concurrent.TimeUnit;

import org.apache.log4j.Logger;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Rule;

import com.google.common.collect.ImmutableList;
import com.lambdaworks.redis.RedisClusterAsyncConnection;
import com.lambdaworks.redis.RedisURI;
import com.lambdaworks.redis.TestSettings;

/**
* @author <a href="mailto:[email protected]">Mark Paluch</a>
*/
public class AbstractClusterTest {

public static final String host = TestSettings.hostAddr();
public static final int port1 = 7379;
public static final int port2 = 7380;
public static final int port3 = 7381;
public static final int port4 = 7382;

protected static RedisClusterClient clusterClient;

protected Logger log = Logger.getLogger(getClass());

protected RedisClusterAsyncConnection<String, String> redis1;

protected String key = "key";
protected String value = "value";

@Rule
public ClusterRule clusterRule = new ClusterRule(clusterClient, port1, port2, port3, port4);

@BeforeClass
public static void setupClusterClient() throws Exception {
clusterClient = new RedisClusterClient(ImmutableList.of(RedisURI.Builder.redis(host, port1).build()));
}

@AfterClass
public static void shutdownClusterClient() {
clusterClient.shutdown(0, 0, TimeUnit.MILLISECONDS);
}

public static int[] createSlots(int from, int to) {
int[] result = new int[to - from];
int counter = 0;
for (int i = from; i < to; i++) {
result[counter++] = i;

}
return result;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
package com.lambdaworks.redis.cluster;

import static com.google.code.tempusfugit.temporal.Duration.seconds;
import static com.google.code.tempusfugit.temporal.Timeout.timeout;
import static org.assertj.core.api.Assertions.assertThat;

import com.lambdaworks.redis.RedisException;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

import com.google.code.tempusfugit.temporal.Condition;
import com.google.code.tempusfugit.temporal.Duration;
import com.google.code.tempusfugit.temporal.ThreadSleep;
import com.google.code.tempusfugit.temporal.WaitFor;
import com.lambdaworks.redis.RedisClusterAsyncConnection;
import com.lambdaworks.redis.RedisClusterConnection;
import com.lambdaworks.redis.cluster.models.partitions.RedisClusterNode;

/**
* @author <a href="mailto:[email protected]">Mark Paluch</a>
*/
public class AdvancedClusterClientTest extends AbstractClusterTest {

private RedisAdvancedClusterAsyncConnection<String, String> connection;

@Before
public void before() throws Exception {

WaitFor.waitOrTimeout(new Condition() {
@Override
public boolean isSatisfied() {
return clusterRule.isStable();
}
}, timeout(seconds(5)), new ThreadSleep(Duration.millis(500)));

connection = clusterClient.connectClusterAsync();
}

@After
public void after() throws Exception {
connection.close();
}

@Test
public void nodeConnections() throws Exception {

assertThat(clusterClient.getPartitions()).hasSize(4);

for (RedisClusterNode redisClusterNode : clusterClient.getPartitions()) {
RedisClusterAsyncConnection<String, String> nodeConnection = connection.getConnection(redisClusterNode.getNodeId());

String myid = nodeConnection.clusterMyId().get();
assertThat(myid).isEqualTo(redisClusterNode.getNodeId());
}
}

@Test(expected = RedisException.class)
public void unknownNodeId() throws Exception {

connection.getConnection("unknown");
}

@Test(expected = RedisException.class)
public void invalidHost() throws Exception {
connection.getConnection("invalid-host", -1);
}

@Test
public void doWeirdThingsWithClusterconnections() throws Exception {

assertThat(clusterClient.getPartitions()).hasSize(4);

for (RedisClusterNode redisClusterNode : clusterClient.getPartitions()) {
RedisClusterAsyncConnection<String, String> nodeConnection = connection.getConnection(redisClusterNode.getNodeId());

nodeConnection.close();

RedisClusterAsyncConnection<String, String> nextConnection = connection.getConnection(redisClusterNode.getNodeId());
assertThat(connection).isNotSameAs(nextConnection);

}
}

@Test
public void syncConnections() throws Exception {

assertThat(clusterClient.getPartitions()).hasSize(4);

RedisAdvancedClusterConnection<String, String> sync = clusterClient.connectCluster();
for (RedisClusterNode redisClusterNode : clusterClient.getPartitions()) {
RedisClusterConnection<String, String> nodeConnection = sync.getConnection(redisClusterNode.getNodeId());

String myid = nodeConnection.clusterMyId();
assertThat(myid).isEqualTo(redisClusterNode.getNodeId());
}
}
}

0 comments on commit 8f9c22d

Please sign in to comment.