Skip to content

Commit

Permalink
Use INFO CLIENTS for connected clients discovery in Redis Cluster #1126
Browse files Browse the repository at this point in the history
Original pull request: #1127.
  • Loading branch information
xujs authored and mp911de committed Sep 25, 2019
1 parent c7ea587 commit 62f402a
Show file tree
Hide file tree
Showing 5 changed files with 36 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ public Requests requestTopology() {
}

/*
* Initiate {@code CLIENT LIST} on all connections and return the {@link Requests}.
* Initiate {@code INFO CLIENTS} on all connections and return the {@link Requests}.
*
* @return the {@link Requests}.
*/
Expand All @@ -115,8 +115,8 @@ public Requests requestClients() {
synchronized (this.connections) {
for (Map.Entry<RedisURI, StatefulRedisConnection<String, String>> entry : this.connections.entrySet()) {

CommandArgs<String, String> args = new CommandArgs<>(StringCodec.UTF8).add(CommandKeyword.LIST);
Command<String, String, String> command = new Command<>(CommandType.CLIENT,
CommandArgs<String, String> args = new CommandArgs<>(StringCodec.UTF8).add(CommandKeyword.CLIENTS);
Command<String, String, String> command = new Command<>(CommandType.INFO,
new StatusOutput<>(StringCodec.UTF8), args);
TimedAsyncCommand<String, String, String> timedCommand = new TimedAsyncCommand<>(command);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,13 @@ static boolean resultAvailable(RedisFuture<?> redisFuture) {
}

private int getClients(String rawClientsOutput) {
return rawClientsOutput.trim().split("\\n").length;
String[] rows = rawClientsOutput.trim().split("\\n");
for(String row : rows){
if(row.startsWith("connected_clients")){
return Integer.parseInt(row.trim().split(":")[1]);
}
}
return 0;
}

long getLatency() {
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/io/lettuce/core/protocol/CommandKeyword.java
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ public enum CommandKeyword implements ProtocolKeyword {

MIGRATING, IMPORTING, SKIPME, SLAVES, STORE, SUM, SEGFAULT, UNBLOCK, WEIGHTS,

WITHSCORES, XOR;
WITHSCORES, XOR, CLIENTS;

public final byte[] bytes;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,11 @@ void before() {
command.complete();
}

if (command.getType() == CommandType.INFO) {
command.getOutput().set(ByteBuffer.wrap("# Clients\nconnected_clients:2\nclient_longest_output_list:0\nclient_biggest_input_buf:0\nblocked_clients:0".getBytes()));
command.complete();
}

command.encodedAtNs = 10;
command.completedAtNs = 50;

Expand All @@ -140,6 +145,11 @@ void before() {
command.complete();
}

if (command.getType() == CommandType.INFO) {
command.getOutput().set(ByteBuffer.wrap("# Clients\nconnected_clients:2\nclient_longest_output_list:0\nclient_biggest_input_buf:0\nblocked_clients:0".getBytes()));
command.complete();
}

command.encodedAtNs = 10;
command.completedAtNs = 20;

Expand All @@ -149,6 +159,15 @@ void before() {
sut = new ClusterTopologyRefresh(nodeConnectionFactory, clientResources);
}

@Test
void getNodeTopologyView() throws Exception{
Requests requestedTopology = createClusterNodesRequests(1, NODE_1_VIEW);
Requests requestedClients = createClientListRequests(1, "# Clients\r\nconnected_clients:2438\r\nclient_longest_output_list:0\r\nclient_biggest_input_buf:0\r\nblocked_clients:0");
RedisURI redisURI = RedisURI.create("redis://localhost:1" );
NodeTopologyView nodeTopologyView = NodeTopologyView.from(redisURI, requestedTopology, requestedClients);
assertThat(nodeTopologyView.getConnectedClients()).isEqualTo(2438);
}

@Test
void getNodeSpecificViewsNode1IsFasterThanNode2() throws Exception {

Expand Down Expand Up @@ -220,7 +239,7 @@ void getNodeSpecificViewTestingNoAddrFilter() throws Exception {
+ "n6 10.37.110.65:7000 master - 0 1452553663844 45 connected 0-3828 6788-7996 10000-10038 15000-16383";

Requests clusterNodesRequests = createClusterNodesRequests(1, nodes1);
Requests clientRequests = createClientListRequests(1, "c1\nc2\n");
Requests clientRequests = createClientListRequests(1, "# Clients\r\nconnected_clients:2\r\nclient_longest_output_list:0\r\nclient_biggest_input_buf:0\r\nblocked_clients:0");

NodeTopologyViews nodeSpecificViews = sut
.getNodeSpecificViews(clusterNodesRequests, clientRequests, COMMAND_TIMEOUT_NS);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,17 +42,17 @@ void shouldCreateTopologyView() throws Exception {
String clusterNodesOutput = "1 127.0.0.1:7380 master,myself - 0 1401258245007 2 disconnected 8000-11999\n";
clusterNodesRequests.addRequest(redisURI, getCommand(clusterNodesOutput));

Requests clientListRequests = new Requests();
String clientListOutput = "id=2 addr=127.0.0.1:58919 fd=6 name= age=3 idle=0 flags=N db=0 sub=0 psub=0 multi=-1 qbuf=0 qbuf-free=32768 obl=0 oll=0 omem=0 events=r cmd=client\n";
clientListRequests.addRequest(redisURI, getCommand(clientListOutput));
Requests infoClientRequests = new Requests();
String infoClientOutput = "# Clients\r\nconnected_clients:1\r\nclient_longest_output_list:0\r\nclient_biggest_input_buf:0\r\nblocked_clients:0";
infoClientRequests.addRequest(redisURI, getCommand(infoClientOutput));

NodeTopologyView nodeTopologyView = NodeTopologyView.from(redisURI, clusterNodesRequests, clientListRequests);
NodeTopologyView nodeTopologyView = NodeTopologyView.from(redisURI, clusterNodesRequests, infoClientRequests);

assertThat(nodeTopologyView.isAvailable()).isTrue();
assertThat(nodeTopologyView.getConnectedClients()).isEqualTo(1);
assertThat(nodeTopologyView.getPartitions()).hasSize(1);
assertThat(nodeTopologyView.getClusterNodes()).isEqualTo(clusterNodesOutput);
assertThat(nodeTopologyView.getClientList()).isEqualTo(clientListOutput);
assertThat(nodeTopologyView.getClientList()).isEqualTo(infoClientOutput);
}

@Test
Expand Down

0 comments on commit 62f402a

Please sign in to comment.