Skip to content

Commit

Permalink
Polishing #1126
Browse files Browse the repository at this point in the history
Add author tags. Use regex to discover number of clients.

Original pull request: #1127.
  • Loading branch information
mp911de committed Sep 25, 2019
1 parent 62f402a commit 97700e8
Show file tree
Hide file tree
Showing 5 changed files with 47 additions and 37 deletions.
10 changes: 5 additions & 5 deletions src/main/java/io/lettuce/core/cluster/topology/Connections.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
/**
* @author Mark Paluch
* @author Christian Weitendorf
* @author Xujs
*/
class Connections {

Expand Down Expand Up @@ -91,8 +92,8 @@ public Requests requestTopology() {
for (Map.Entry<RedisURI, StatefulRedisConnection<String, String>> entry : this.connections.entrySet()) {

CommandArgs<String, String> args = new CommandArgs<>(StringCodec.UTF8).add(CommandKeyword.NODES);
Command<String, String, String> command = new Command<>(CommandType.CLUSTER, new StatusOutput<>(
StringCodec.UTF8), args);
Command<String, String, String> command = new Command<>(CommandType.CLUSTER,
new StatusOutput<>(StringCodec.UTF8), args);
TimedAsyncCommand<String, String, String> timedCommand = new TimedAsyncCommand<>(command);

entry.getValue().dispatch(timedCommand);
Expand All @@ -115,9 +116,8 @@ public Requests requestClients() {
synchronized (this.connections) {
for (Map.Entry<RedisURI, StatefulRedisConnection<String, String>> entry : this.connections.entrySet()) {

CommandArgs<String, String> args = new CommandArgs<>(StringCodec.UTF8).add(CommandKeyword.CLIENTS);
Command<String, String, String> command = new Command<>(CommandType.INFO,
new StatusOutput<>(StringCodec.UTF8), args);
Command<String, String, String> command = new Command<>(CommandType.INFO, new StatusOutput<>(StringCodec.UTF8),
new CommandArgs<>(StringCodec.UTF8).add("CLIENTS"));
TimedAsyncCommand<String, String, String> timedCommand = new TimedAsyncCommand<>(command);

entry.getValue().dispatch(timedCommand);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
package io.lettuce.core.cluster.topology;

import java.util.concurrent.ExecutionException;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import io.lettuce.core.RedisFuture;
import io.lettuce.core.RedisURI;
Expand All @@ -25,9 +27,11 @@

/**
* @author Mark Paluch
* @author Xujs
*/
class NodeTopologyView {

private static final Pattern NUMBER = Pattern.compile("(\\d+)");
private final boolean available;
private final RedisURI redisURI;

Expand All @@ -39,7 +43,7 @@ class NodeTopologyView {

private final String clientList;

NodeTopologyView(RedisURI redisURI) {
private NodeTopologyView(RedisURI redisURI) {

this.available = false;
this.redisURI = redisURI;
Expand Down Expand Up @@ -74,7 +78,7 @@ static NodeTopologyView from(RedisURI redisURI, Requests clusterNodesRequests, R
return new NodeTopologyView(redisURI);
}

static boolean resultAvailable(RedisFuture<?> redisFuture) {
private static boolean resultAvailable(RedisFuture<?> redisFuture) {

if (redisFuture != null && redisFuture.isDone() && !redisFuture.isCancelled()) {
return true;
Expand All @@ -85,9 +89,11 @@ static boolean resultAvailable(RedisFuture<?> redisFuture) {

private int getClients(String rawClientsOutput) {
String[] rows = rawClientsOutput.trim().split("\\n");
for(String row : rows){
if(row.startsWith("connected_clients")){
return Integer.parseInt(row.trim().split(":")[1]);
for (String row : rows) {

Matcher matcher = NUMBER.matcher(row);
if (matcher.find()) {
return Integer.parseInt(matcher.group(1));
}
}
return 0;
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/io/lettuce/core/protocol/CommandKeyword.java
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ public enum CommandKeyword implements ProtocolKeyword {

MIGRATING, IMPORTING, SKIPME, SLAVES, STORE, SUM, SEGFAULT, UNBLOCK, WEIGHTS,

WITHSCORES, XOR, CLIENTS;
WITHSCORES, XOR;

public final byte[] bytes;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,9 @@ void before() {
}

if (command.getType() == CommandType.INFO) {
command.getOutput().set(ByteBuffer.wrap("# Clients\nconnected_clients:2\nclient_longest_output_list:0\nclient_biggest_input_buf:0\nblocked_clients:0".getBytes()));
command.getOutput().set(ByteBuffer.wrap(
"# Clients\nconnected_clients:2\nclient_longest_output_list:0\nclient_biggest_input_buf:0\nblocked_clients:0"
.getBytes()));
command.complete();
}

Expand All @@ -146,7 +148,9 @@ void before() {
}

if (command.getType() == CommandType.INFO) {
command.getOutput().set(ByteBuffer.wrap("# Clients\nconnected_clients:2\nclient_longest_output_list:0\nclient_biggest_input_buf:0\nblocked_clients:0".getBytes()));
command.getOutput().set(ByteBuffer.wrap(
"# Clients\nconnected_clients:2\nclient_longest_output_list:0\nclient_biggest_input_buf:0\nblocked_clients:0"
.getBytes()));
command.complete();
}

Expand All @@ -160,10 +164,11 @@ void before() {
}

@Test
void getNodeTopologyView() throws Exception{
void getNodeTopologyView() throws Exception {
Requests requestedTopology = createClusterNodesRequests(1, NODE_1_VIEW);
Requests requestedClients = createClientListRequests(1, "# Clients\r\nconnected_clients:2438\r\nclient_longest_output_list:0\r\nclient_biggest_input_buf:0\r\nblocked_clients:0");
RedisURI redisURI = RedisURI.create("redis://localhost:1" );
Requests requestedClients = createClientListRequests(1,
"# Clients\r\nconnected_clients:2438\r\nclient_longest_output_list:0\r\nclient_biggest_input_buf:0\r\nblocked_clients:0");
RedisURI redisURI = RedisURI.create("redis://localhost:1");
NodeTopologyView nodeTopologyView = NodeTopologyView.from(redisURI, requestedTopology, requestedClients);
assertThat(nodeTopologyView.getConnectedClients()).isEqualTo(2438);
}
Expand Down Expand Up @@ -239,10 +244,11 @@ void getNodeSpecificViewTestingNoAddrFilter() throws Exception {
+ "n6 10.37.110.65:7000 master - 0 1452553663844 45 connected 0-3828 6788-7996 10000-10038 15000-16383";

Requests clusterNodesRequests = createClusterNodesRequests(1, nodes1);
Requests clientRequests = createClientListRequests(1, "# Clients\r\nconnected_clients:2\r\nclient_longest_output_list:0\r\nclient_biggest_input_buf:0\r\nblocked_clients:0");
Requests clientRequests = createClientListRequests(1,
"# Clients\r\nconnected_clients:2\r\nclient_longest_output_list:0\r\nclient_biggest_input_buf:0\r\nblocked_clients:0");

NodeTopologyViews nodeSpecificViews = sut
.getNodeSpecificViews(clusterNodesRequests, clientRequests, COMMAND_TIMEOUT_NS);
NodeTopologyViews nodeSpecificViews = sut.getNodeSpecificViews(clusterNodesRequests, clientRequests,
COMMAND_TIMEOUT_NS);

List<Partitions> values = new ArrayList<>(nodeSpecificViews.toMap().values());

Expand All @@ -267,8 +273,8 @@ void getNodeSpecificViewsNode2IsFasterThanNode1() throws Exception {

Requests clientRequests = createClientListRequests(5, "c1\nc2\n").mergeWith(createClientListRequests(1, "c1\nc2\n"));

NodeTopologyViews nodeSpecificViews = sut
.getNodeSpecificViews(clusterNodesRequests, clientRequests, COMMAND_TIMEOUT_NS);
NodeTopologyViews nodeSpecificViews = sut.getNodeSpecificViews(clusterNodesRequests, clientRequests,
COMMAND_TIMEOUT_NS);
List<Partitions> values = new ArrayList<>(nodeSpecificViews.toMap().values());

assertThat(values).hasSize(2);
Expand Down Expand Up @@ -394,8 +400,8 @@ void undiscoveredAdditionalNodesShouldBeLastUsingClientCount() {

List<RedisClusterNode> nodes = TopologyComparators.sortByClientCount(partitions);

assertThat(nodes).hasSize(2).extracting(RedisClusterNode::getUri)
.containsSequence(seed.get(0), RedisURI.create("127.0.0.1", 7381));
assertThat(nodes).hasSize(2).extracting(RedisClusterNode::getUri).containsSequence(seed.get(0),
RedisURI.create("127.0.0.1", 7381));
}

@Test
Expand All @@ -414,8 +420,8 @@ void discoveredAdditionalNodesShouldBeOrderedUsingClientCount() {

List<RedisClusterNode> nodes = TopologyComparators.sortByClientCount(partitions);

assertThat(nodes).hasSize(2).extracting(RedisClusterNode::getUri)
.containsSequence(RedisURI.create("127.0.0.1", 7381), seed.get(0));
assertThat(nodes).hasSize(2).extracting(RedisClusterNode::getUri).containsSequence(RedisURI.create("127.0.0.1", 7381),
seed.get(0));
}

@Test
Expand All @@ -432,8 +438,8 @@ void undiscoveredAdditionalNodesShouldBeLastUsingLatency() {

List<RedisClusterNode> nodes = TopologyComparators.sortByLatency(partitions);

assertThat(nodes).hasSize(2).extracting(RedisClusterNode::getUri)
.containsSequence(seed.get(0), RedisURI.create("127.0.0.1", 7381));
assertThat(nodes).hasSize(2).extracting(RedisClusterNode::getUri).containsSequence(seed.get(0),
RedisURI.create("127.0.0.1", 7381));
}

@Test
Expand All @@ -452,8 +458,8 @@ void discoveredAdditionalNodesShouldBeOrderedUsingLatency() {

List<RedisClusterNode> nodes = TopologyComparators.sortByLatency(partitions);

assertThat(nodes).hasSize(2).extracting(RedisClusterNode::getUri)
.containsSequence(RedisURI.create("127.0.0.1", 7381), seed.get(0));
assertThat(nodes).hasSize(2).extracting(RedisClusterNode::getUri).containsSequence(RedisURI.create("127.0.0.1", 7381),
seed.get(0));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@

/**
* @author Mark Paluch
* @author Xujs
*/
class RequestsUnitTests {

Expand All @@ -43,13 +44,13 @@ void shouldCreateTopologyView() throws Exception {
clusterNodesRequests.addRequest(redisURI, getCommand(clusterNodesOutput));

Requests infoClientRequests = new Requests();
String infoClientOutput = "# Clients\r\nconnected_clients:1\r\nclient_longest_output_list:0\r\nclient_biggest_input_buf:0\r\nblocked_clients:0";
String infoClientOutput = "# Clients\r\nconnected_clients:100\r\nclient_longest_output_list:0\r\nclient_biggest_input_buf:0\r\nblocked_clients:0";
infoClientRequests.addRequest(redisURI, getCommand(infoClientOutput));

NodeTopologyView nodeTopologyView = NodeTopologyView.from(redisURI, clusterNodesRequests, infoClientRequests);

assertThat(nodeTopologyView.isAvailable()).isTrue();
assertThat(nodeTopologyView.getConnectedClients()).isEqualTo(1);
assertThat(nodeTopologyView.getConnectedClients()).isEqualTo(100);
assertThat(nodeTopologyView.getPartitions()).hasSize(1);
assertThat(nodeTopologyView.getClusterNodes()).isEqualTo(clusterNodesOutput);
assertThat(nodeTopologyView.getClientList()).isEqualTo(infoClientOutput);
Expand Down Expand Up @@ -79,8 +80,7 @@ void awaitShouldReturnAwaitedTime() throws Exception {

RedisURI redisURI = RedisURI.create("localhost", 6379);
Requests requests = new Requests();
Command<String, String, String> command = new Command<>(CommandType.TYPE,
new StatusOutput<>(new Utf8StringCodec()));
Command<String, String, String> command = new Command<>(CommandType.TYPE, new StatusOutput<>(new Utf8StringCodec()));
TimedAsyncCommand timedAsyncCommand = new TimedAsyncCommand(command);

requests.addRequest(redisURI, timedAsyncCommand);
Expand All @@ -93,8 +93,7 @@ void awaitShouldReturnAwaitedTimeIfNegative() throws Exception {

RedisURI redisURI = RedisURI.create("localhost", 6379);
Requests requests = new Requests();
Command<String, String, String> command = new Command<>(CommandType.TYPE,
new StatusOutput<>(new Utf8StringCodec()));
Command<String, String, String> command = new Command<>(CommandType.TYPE, new StatusOutput<>(new Utf8StringCodec()));
TimedAsyncCommand timedAsyncCommand = new TimedAsyncCommand(command);

requests.addRequest(redisURI, timedAsyncCommand);
Expand All @@ -104,8 +103,7 @@ void awaitShouldReturnAwaitedTimeIfNegative() throws Exception {
}

private TimedAsyncCommand getCommand(String response) {
Command<String, String, String> command = new Command<>(CommandType.TYPE,
new StatusOutput<>(new Utf8StringCodec()));
Command<String, String, String> command = new Command<>(CommandType.TYPE, new StatusOutput<>(new Utf8StringCodec()));
TimedAsyncCommand timedAsyncCommand = new TimedAsyncCommand(command);

command.getOutput().set(ByteBuffer.wrap(response.getBytes()));
Expand Down

0 comments on commit 97700e8

Please sign in to comment.