Skip to content

Commit

Permalink
Support for SENTINEL MASTERS
Browse files Browse the repository at this point in the history
  • Loading branch information
mp911de committed Jun 29, 2014
1 parent 854c9ff commit b6f2fb5
Show file tree
Hide file tree
Showing 12 changed files with 139 additions and 5 deletions.
7 changes: 7 additions & 0 deletions lettuce/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,13 @@
<version>1.2.17</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.hamcrest</groupId>
<artifactId>hamcrest-library</artifactId>
<version>1.3</version>
<scope>test</scope>
</dependency>

</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,11 @@ public void debugSegfault() {
dispatch(commandBuilder.debugSegfault());
}

@Override
public void debugOom() {
dispatch(commandBuilder.debugOom());
}

@Override
public RedisFuture<Long> decr(K key) {
return dispatch(commandBuilder.decr(key));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,11 @@ public Command<K, V, Void> debugSegfault() {
return createCommand(DEBUG, null, args);
}

public Command<K, V, Void> debugOom() {
CommandArgs<K, V> args = new CommandArgs<K, V>(codec).add("OOM");
return createCommand(DEBUG, null, args);
}

public Command<K, V, Long> decr(K key) {
return createCommand(DECR, new IntegerOutput<K, V>(codec), key);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import java.io.Closeable;
import java.net.SocketAddress;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Future;

Expand All @@ -23,6 +24,13 @@ public interface RedisSentinelAsyncConnection<K, V> extends Closeable {
*/
Future<SocketAddress> getMasterAddrByName(K key);

/**
* Enumerates all the monitored masters and their states.
*
* @return RedisFuture&lt;Map&lt;K, V&gt;&gt;
*/
RedisFuture<List<Map<K, V>>> masters();

/**
* Show the state and info of the specified master.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,12 @@ public SocketAddress apply(List<V> input) {
return result;
}

@Override
public RedisFuture<List<Map<K, V>>> masters() {

return dispatch(commandBuilder.masters());
}

@Override
public RedisFuture<Map<K, V>> master(K key) {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,10 +114,15 @@ public interface RedisServerAsyncConnection<K, V> {
RedisFuture<String> debugObject(K key);

/**
* Make the server crash.
* Make the server crash: Invalid pointer access.
*/
void debugSegfault();

/**
* Make the server crash: Out of memory.
*/
void debugOom();

/**
* Remove all keys from all databases.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,10 +114,15 @@ public interface RedisServerConnection<K, V> {
String debugObject(K key);

/**
* Make the server crash.
* Make the server crash: Invalid pointer access.
*/
void debugSegfault();

/**
* Make the server crash: Out of memory.
*/
void debugOom();

/**
* Remove all keys from all databases.
*
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
package com.lambdaworks.redis;

import static com.lambdaworks.redis.protocol.CommandKeyword.FAILOVER;
import static com.lambdaworks.redis.protocol.CommandKeyword.RESET;
import static com.lambdaworks.redis.protocol.CommandKeyword.SLAVES;
import static com.lambdaworks.redis.protocol.CommandType.MONITOR;
import static com.lambdaworks.redis.protocol.CommandType.PING;
import static com.lambdaworks.redis.protocol.CommandType.SENTINEL;
Expand All @@ -10,6 +13,7 @@

import com.lambdaworks.redis.codec.RedisCodec;
import com.lambdaworks.redis.output.IntegerOutput;
import com.lambdaworks.redis.output.ListOfMapsOutput;
import com.lambdaworks.redis.output.MapOutput;
import com.lambdaworks.redis.output.StatusOutput;
import com.lambdaworks.redis.output.ValueListOutput;
Expand All @@ -31,23 +35,28 @@ public Command<K, V, List<V>> getMasterAddrByKey(K key) {
return createCommand(SENTINEL, new ValueListOutput<K, V>(codec), args);
}

public Command<K, V, List<Map<K, V>>> masters() {
CommandArgs<K, V> args = new CommandArgs<K, V>(codec).add("masters");
return createCommand(SENTINEL, new ListOfMapsOutput<K, V>(codec), args);
}

public Command<K, V, Map<K, V>> master(K key) {
CommandArgs<K, V> args = new CommandArgs<K, V>(codec).add("master").addKey(key);
return createCommand(SENTINEL, new MapOutput<K, V>(codec), args);
}

public Command<K, V, Map<K, V>> slaves(K key) {
CommandArgs<K, V> args = new CommandArgs<K, V>(codec).add("slaves").addKey(key);
CommandArgs<K, V> args = new CommandArgs<K, V>(codec).add(SLAVES).addKey(key);
return createCommand(SENTINEL, new MapOutput<K, V>(codec), args);
}

public Command<K, V, Long> reset(K key) {
CommandArgs<K, V> args = new CommandArgs<K, V>(codec).add("reset").addKey(key);
CommandArgs<K, V> args = new CommandArgs<K, V>(codec).add(RESET).addKey(key);
return createCommand(SENTINEL, new IntegerOutput<K, V>(codec), args);
}

public Command<K, V, String> failover(K key) {
CommandArgs<K, V> args = new CommandArgs<K, V>(codec).add("failover").addKey(key);
CommandArgs<K, V> args = new CommandArgs<K, V>(codec).add(FAILOVER).addKey(key);
return createCommand(SENTINEL, new StatusOutput<K, V>(codec), args);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
// Copyright (C) 2011 - Will Glozer. All rights reserved.

package com.lambdaworks.redis.output;

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import com.lambdaworks.redis.codec.RedisCodec;
import com.lambdaworks.redis.protocol.CommandOutput;

/**
* {@link java.util.List} of maps output.
*
* @param <K> Key type.
* @param <V> Value type.
*
* @author Will Glozer
*/
public class ListOfMapsOutput<K, V> extends CommandOutput<K, V, List<Map<K, V>>> {
private MapOutput<K, V> nested;
private int mapCount = -1;
private List<Integer> counts = new ArrayList<Integer>();

public ListOfMapsOutput(RedisCodec<K, V> codec) {
super(codec, new ArrayList<Map<K, V>>());
nested = new MapOutput<K, V>(codec);
}

@Override
public void set(ByteBuffer bytes) {
nested.set(bytes);
}

@Override
public void complete(int depth) {

if (counts.size() > 0) {
int expectedSize = counts.get(0);

if (nested.get().size() == expectedSize) {
counts.remove(0);
output.add(new HashMap<K, V>(nested.get()));
nested.get().clear();
}
}
}

@Override
public void multi(int count) {
if (mapCount == -1) {
mapCount = count;
} else {
// div 2 because of key value pair counts twice
counts.add(count / 2);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -122,4 +122,8 @@ public String toString() {
sb.append(']');
return sb.toString();
}

public void multi(int count) {

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,7 @@ public boolean decode(ByteBuf buffer, RedisCommand<K, V, ?> command, CommandOutp
length = (int) readLong(buffer, buffer.readerIndex(), end);
state.count = length;
buffer.markReaderIndex();
output.multi(state.count);
}

if (state.count <= 0) {
Expand All @@ -150,6 +151,7 @@ public boolean decode(ByteBuf buffer, RedisCommand<K, V, ?> command, CommandOutp

state.count--;
stack.addFirst(new State());

continue loop;
case BYTES:
if ((bytes = readBytes(buffer, state.count)) == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,14 @@
package com.lambdaworks.redis;

import static org.hamcrest.CoreMatchers.containsString;
import static org.hamcrest.Matchers.greaterThan;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertThat;

import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Future;

Expand Down Expand Up @@ -63,6 +66,21 @@ public void getSlaveAddr() throws Exception {

}

@Test
public void masters() throws Exception {

Future<List<Map<String, String>>> result = sentinel.masters();
List<Map<String, String>> list = result.get();

assertThat(list.size(), greaterThan(0));

Map<String, String> map = list.get(0);
assertNotNull(map.get("flags"));
assertNotNull(map.get("config-epoch"));
assertNotNull(map.get("port"));

}

@Test
public void getSlaveDownstate() throws Exception {

Expand Down

0 comments on commit b6f2fb5

Please sign in to comment.