Skip to content
This repository has been archived by the owner on Aug 3, 2019. It is now read-only.

Commit

Permalink
Provide a socket address supplier based on the cluster topology #9
Browse files Browse the repository at this point in the history
- Refactor circular iteration into RoundRobin class
- Introduce ConnectionAware and EventExecutorAware interfaces to inject resources into address suppliers
- Add HelloCluster address supplier that obtains its address based on the HELLO command

This commit is a pre-requisite for #8
  • Loading branch information
mp911de committed Oct 14, 2015
1 parent fa4d5d5 commit f54d580
Show file tree
Hide file tree
Showing 12 changed files with 364 additions and 37 deletions.
37 changes: 25 additions & 12 deletions src/main/java/biz/paluch/spinach/DisqueClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,7 @@
import java.util.concurrent.TimeUnit;

import biz.paluch.spinach.api.DisqueConnection;
import biz.paluch.spinach.impl.DisqueConnectionImpl;
import biz.paluch.spinach.impl.SocketAddressSupplier;
import biz.paluch.spinach.impl.SocketAddressSupplierFactory;
import biz.paluch.spinach.impl.*;

import com.google.common.base.Supplier;
import com.lambdaworks.redis.*;
Expand Down Expand Up @@ -147,17 +145,10 @@ private <K, V> DisqueConnectionImpl<K, V> connect0(RedisCodec<K, V> codec, final

logger.debug("Trying to get a Disque connection for one of: " + disqueURI.getConnectionPoints());

ConnectionBuilder connectionBuilder;

final RedisURI redisURI = new RedisURI();
toRedisURI(disqueURI, null, redisURI);
if (disqueURI.isSsl()) {
connectionBuilder = SslConnectionBuilder.sslConnectionBuilder().ssl(redisURI);
} else {
connectionBuilder = ConnectionBuilder.connectionBuilder();
}

connectionBuilder.clientOptions(options);
connectionBuilder(commandHandler, connection, null, connectionBuilder, redisURI);
ConnectionBuilder connectionBuilder = connectionBuilder(disqueURI, options, commandHandler, connection, redisURI);

boolean connected = false;
Exception causingException = null;
Expand Down Expand Up @@ -209,9 +200,31 @@ public SocketAddress get() {
throw new RedisConnectionException("Cannot connect to Disque: " + disqueURI, causingException);
}

if (socketAddressSupplier instanceof ConnectionAware) {
((ConnectionAware) socketAddressSupplier).setConnection(connection);
}

if (socketAddressSupplier instanceof EventExecutorAware) {
((EventExecutorAware) socketAddressSupplier).setEventExecutor(genericWorkerPool);
}

return connection;
}

private <K, V> ConnectionBuilder connectionBuilder(DisqueURI disqueURI, ClientOptions options,
CommandHandler<K, V> commandHandler, DisqueConnectionImpl<K, V> connection, RedisURI redisURI) {
ConnectionBuilder connectionBuilder;
if (disqueURI.isSsl()) {
connectionBuilder = SslConnectionBuilder.sslConnectionBuilder().ssl(redisURI);
} else {
connectionBuilder = ConnectionBuilder.connectionBuilder();
}

connectionBuilder.clientOptions(options);
connectionBuilder(commandHandler, connection, null, connectionBuilder, redisURI);
return connectionBuilder;
}

private void validateUrisAreOfSameConnectionType(List<? extends ConnectionPoint> connectionPoints) {
boolean unixDomainSocket = false;
boolean inetSocket = false;
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/biz/paluch/spinach/cluster/DisqueNode.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
* Representation of a redis cluster node.
*
* @author <a href="mailto:[email protected]">Mark Paluch</a>
* @since 3.0
* @since 0.2
*/
@SuppressWarnings("serial")
public class DisqueNode implements Serializable {
Expand Down
105 changes: 105 additions & 0 deletions src/main/java/biz/paluch/spinach/impl/ClusterAwareNodeSupport.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
package biz.paluch.spinach.impl;

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkState;

import java.io.Serializable;
import java.util.*;

import biz.paluch.spinach.api.DisqueConnection;
import biz.paluch.spinach.cluster.DisqueNode;

import com.google.common.collect.Lists;

/**
* Convenient base class for classes that rely on the cluster topology of Disque. Typically subclassed by
* {@link SocketAddressSupplier SocketAddressSuppliers}.
*
* @author <a href="mailto:[email protected]">Mark Paluch</a>
*/
public abstract class ClusterAwareNodeSupport {

private DisqueConnection<Object, Object> disqueConnection;
private final List<DisqueNode> nodes = Lists.newArrayList();

protected void reloadNodes() {

Hello hello = HelloParser.parse(disqueConnection.sync().hello());
Collections.sort(hello.nodes, new Comparator<PrioritizedDisqueNode>() {
@Override
public int compare(PrioritizedDisqueNode o1, PrioritizedDisqueNode o2) {

if (o1.priority == o2.priority) {
return o1.disqueNode.getPort() - o2.disqueNode.getPort();
}
return o1.priority - o2.priority;
}
});
this.nodes.clear();

for (PrioritizedDisqueNode node : hello.nodes) {
this.nodes.add(node.disqueNode);
}
}

public <K, V> void setConnection(DisqueConnection<K, V> disqueConnection) {
this.disqueConnection = (DisqueConnection<Object, Object>) disqueConnection;
}

public List<DisqueNode> getNodes() {
return nodes;
}

/**
* Disque node with priority.
*/
static class PrioritizedDisqueNode implements Serializable {

DisqueNode disqueNode;
int priority;

}

static class Hello {
long version;
String nodeId;
List<PrioritizedDisqueNode> nodes = Lists.newArrayList();

}

static class HelloParser implements Serializable {

public static Hello parse(List<Object> hello) {

checkArgument(hello.size() > 2, "HELLO output must contain more than two elements");
checkArgument(Long.valueOf(1).equals(hello.get(0)), "Only HELLO version 1 supported. Received HELLO version is "
+ hello.get(0));

Hello result = new Hello();
result.version = (Long) hello.get(0);
result.nodeId = (String) hello.get(1);

for (int i = 2; i < hello.size(); i++) {
checkState(hello.get(i) instanceof Collection, "HELLO output at index " + i + " is not a collection");
Collection<Object> nodeDetails = (Collection<Object>) hello.get(i);
checkState(nodeDetails.size() > 3, "HELLO output at index " + i + " has less than 4 elements");

Iterator<Object> iterator = nodeDetails.iterator();

DisqueNode disqueNode = new DisqueNode();
disqueNode.setNodeId((String) iterator.next());
disqueNode.setAddr((String) iterator.next());
disqueNode.setPort(Integer.parseInt((String) iterator.next()));

PrioritizedDisqueNode prioritizedDisqueNode = new PrioritizedDisqueNode();
prioritizedDisqueNode.disqueNode = disqueNode;
prioritizedDisqueNode.priority = Integer.parseInt((String) iterator.next());

result.nodes.add(prioritizedDisqueNode);
}

return result;
}
}

}
22 changes: 22 additions & 0 deletions src/main/java/biz/paluch/spinach/impl/ConnectionAware.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package biz.paluch.spinach.impl;

import biz.paluch.spinach.api.DisqueConnection;

/**
* Interface to be implemented by {@link SocketAddressSupplier} that want to be aware of their connection.
*
* @author <a href="mailto:[email protected]">Mark Paluch</a>
*/
public interface ConnectionAware {

/**
* Set the {@link DisqueConnection connection}.
* <p>
* Invoked after activating and authenticating the connection.
*
* @param disqueConnection the connection
* @param <K> Key type
* @param <V> Value type
*/
<K, V> void setConnection(DisqueConnection<K, V> disqueConnection);
}
20 changes: 20 additions & 0 deletions src/main/java/biz/paluch/spinach/impl/EventExecutorAware.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package biz.paluch.spinach.impl;

import io.netty.util.concurrent.EventExecutor;

import java.util.concurrent.ScheduledExecutorService;

/**
* Interface to be implemented by {@link SocketAddressSupplier} that want to be aware of the {@link ScheduledExecutorService}.
*
* @author <a href="mailto:[email protected]">Mark Paluch</a>
*/
public interface EventExecutorAware {

/**
* Set the {@link ScheduledExecutorService event executor}. Invoked after activating and authenticating the connection.
*
* @param eventExecutor the eventExecutor
*/
void setEventExecutor(ScheduledExecutorService eventExecutor);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
package biz.paluch.spinach.impl;

import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.SocketAddress;

import biz.paluch.spinach.api.DisqueConnection;
import biz.paluch.spinach.cluster.DisqueNode;

/**
* @author <a href="mailto:[email protected]">Mark Paluch</a>
*/
public class HelloClusterSocketAddressSupplier extends ClusterAwareNodeSupport implements SocketAddressSupplier,
ConnectionAware {

private final SocketAddressSupplier bootstrap;

private RoundRobin<DisqueNode> roundRobin;
private String preferredNodeIdPrefix;

public HelloClusterSocketAddressSupplier(SocketAddressSupplier bootstrap) {
this.bootstrap = bootstrap;
}

@Override
public SocketAddress get() {

if (getNodes().isEmpty()) {
return bootstrap.get();
}

DisqueNode disqueNode = roundRobin.next();
return new InetSocketAddress(disqueNode.getAddr(), disqueNode.getPort());
}

@Override
public <K, V> void setConnection(DisqueConnection<K, V> disqueConnection) {
super.setConnection(disqueConnection);
reloadNodes();
}

@Override
protected void reloadNodes() {
super.reloadNodes();

if (preferredNodeIdPrefix != null) {
for (DisqueNode disqueNode : getNodes()) {
if (disqueNode.getNodeId().startsWith(preferredNodeIdPrefix)) {
roundRobin = new RoundRobin<DisqueNode>(getNodes(), disqueNode);
return;
}
}
}

roundRobin = new RoundRobin<DisqueNode>(getNodes());
}

public void setPreferredNodeIdPrefix(String preferredNodeIdPrefix) {
this.preferredNodeIdPrefix = preferredNodeIdPrefix;
}
}
48 changes: 48 additions & 0 deletions src/main/java/biz/paluch/spinach/impl/RoundRobin.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
package biz.paluch.spinach.impl;

import java.util.Collection;

/**
* Circular element provider. This class allows infinite scrolling over a collection with the possibility to provide an initial
* offset.
*
* @author <a href="mailto:[email protected]">Mark Paluch</a>
*/
class RoundRobin<V> {

protected final Collection<? extends V> collection;
protected V offset;

public RoundRobin(Collection<? extends V> collection) {
this(collection, null);
}

public RoundRobin(Collection<? extends V> collection, V offset) {
this.collection = collection;
this.offset = offset;
}

/**
* Returns the next item.
*
* @return the next item
*/
public V next() {
if (offset != null) {
boolean accept = false;
for (V element : collection) {
if (element == offset) {
accept = true;
continue;
}

if (accept) {
return offset = element;
}
}
}

return offset = collection.iterator().next();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -8,48 +8,30 @@
import com.lambdaworks.redis.ConnectionPoint;

/**
* Round-Robin socket address supplier. Connection points are iterated circular without an end.
* Round-Robin socket address supplier. Connection points are iterated circular/infinitely.
*
* @author <a href="mailto:[email protected]">Mark Paluch</a>
*/
public class RoundRobinSocketAddressSupplier implements SocketAddressSupplier {

protected final Collection<? extends ConnectionPoint> connectionPoint;
protected ConnectionPoint offset;
protected RoundRobin<? extends ConnectionPoint> roundRobin;

public RoundRobinSocketAddressSupplier(Collection<? extends ConnectionPoint> connectionPoints) {
this(connectionPoints, null);
}

public RoundRobinSocketAddressSupplier(Collection<? extends ConnectionPoint> connectionPoints, ConnectionPoint offset) {
this.connectionPoint = connectionPoints;
this.offset = offset;
this.roundRobin = new RoundRobin<ConnectionPoint>(connectionPoints, offset);
}

@Override
public SocketAddress get() {
ConnectionPoint connectionPoint = getConnectionPoint();
ConnectionPoint connectionPoint = roundRobin.next();
return getSocketAddress(connectionPoint);
}

private ConnectionPoint getConnectionPoint() {
if (offset != null) {
boolean accept = false;
for (ConnectionPoint point : connectionPoint) {
if (point == offset) {
accept = true;
continue;
}

if (accept) {
return offset = point;
}
}
}

return offset = connectionPoint.iterator().next();
}

protected static SocketAddress getSocketAddress(ConnectionPoint connectionPoint) {
if (connectionPoint instanceof DisqueURI.DisqueSocket) {
return ((DisqueURI.DisqueSocket) connectionPoint).getResolvedAddress();
Expand Down
Loading

0 comments on commit f54d580

Please sign in to comment.