Skip to content

Commit

Permalink
Support for
Browse files Browse the repository at this point in the history
PUBSUB
  • Loading branch information
mp911de committed Jun 17, 2014
1 parent a47686c commit e77020e
Show file tree
Hide file tree
Showing 8 changed files with 100 additions and 3 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ Lettuce works with:

Currently following commands are __not yet__ supported:

* Server Commands: DEBUG SEGFAULT, MONITOR, PUBSUB
* Server Commands: MONITOR

I'm pursuing the client code as a fork since the latest changes weren't merged back into https://github/wg/lettuce
I'd love not adding yet another Java redis client, but unfortunately it's the current situation.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import java.io.Closeable;
import java.util.List;
import java.util.Map;

/**
*
Expand All @@ -16,6 +17,14 @@ public interface BaseRedisAsyncConnection<K, V> extends Closeable {

RedisFuture<Long> publish(K channel, V message);

RedisFuture<List<K>> pubsubChannels();

RedisFuture<List<K>> pubsubChannels(K channel);

RedisFuture<Map<K, Long>> pubsubNumsub(K... channels);

RedisFuture<Long> pubsubNumpat();

RedisFuture<V> echo(V msg);

RedisFuture<String> ping();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import java.io.Closeable;
import java.util.List;
import java.util.Map;

/**
*
Expand All @@ -16,6 +17,14 @@ public interface BaseRedisConnection<K, V> extends Closeable {

Long publish(K channel, V message);

List<K> pubsubChannels();

List<K> pubsubChannels(K channel);

Map<K, Long> pubsubNumsub(K... channels);

Long pubsubNumpat();

V echo(V msg);

String ping();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -592,6 +592,26 @@ public RedisFuture<Long> publish(K channel, V message) {
return dispatch(commandBuilder.publish(channel, message));
}

@Override
public RedisFuture<List<K>> pubsubChannels() {
return dispatch(commandBuilder.pubsubChannels());
}

@Override
public RedisFuture<List<K>> pubsubChannels(K channel) {
return dispatch(commandBuilder.pubsubChannels(channel));
}

@Override
public RedisFuture<Map<K, Long>> pubsubNumsub(K... channels) {
return dispatch(commandBuilder.pubsubNumsub(channels));
}

@Override
public RedisFuture<Long> pubsubNumpat() {
return dispatch(commandBuilder.pubsubNumpat());
}

@Override
public RedisFuture<String> quit() {
return dispatch(commandBuilder.quit());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -506,6 +506,26 @@ public Command<K, V, Long> publish(K channel, V message) {
return createCommand(PUBLISH, new IntegerOutput<K, V>(codec), args);
}

public Command<K, V, List<K>> pubsubChannels() {
CommandArgs<K, V> args = new CommandArgs<K, V>(codec).add(CHANNELS);
return createCommand(PUBSUB, new KeyListOutput<K, V>(codec), args);
}

public Command<K, V, List<K>> pubsubChannels(K pattern) {
CommandArgs<K, V> args = new CommandArgs<K, V>(codec).add(CHANNELS).addKey(pattern);
return createCommand(PUBSUB, new KeyListOutput<K, V>(codec), args);
}

public Command<K, V, Map<K, Long>> pubsubNumsub(K... pattern) {
CommandArgs<K, V> args = new CommandArgs<K, V>(codec).add(NUMSUB).addKeys(pattern);
return createCommand(PUBSUB, (MapOutput) new MapOutput<K, Long>((RedisCodec) codec), args);
}

public Command<K, V, Long> pubsubNumpat() {
CommandArgs<K, V> args = new CommandArgs<K, V>(codec).add(NUMPAT);
return createCommand(PUBSUB, new IntegerOutput<K, V>(codec), args);
}

public Command<K, V, String> quit() {
return createCommand(QUIT, new StatusOutput<K, V>(codec));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
* @author Will Glozer
*/
public enum CommandKeyword {
ADDSLOTS, AFTER, AGGREGATE, ALPHA, AND, ASC, BEFORE, BY, COUNT, DELSLOTS, DESC, SOFT, HARD, ENCODING, FAILOVER, FORGET, FLUSH, FORCE, FLUSHSLOTS, GETNAME, GETKEYSINSLOT, IDLETIME, KILL, LEN, LIMIT, LIST, LOAD, MATCH, MAX, MEET, MIN, MOVED, NO, NODE, NODES, NOSAVE, NOT, ONE, OR, PAUSE, REFCOUNT, REPLICATE, RESET, REWRITE, RESETSTAT, SETNAME, SETSLOT, MIGRATING, IMPORTING, SLAVES, STORE, SUM, SEGFAULT, WEIGHTS, WITHSCORES, XOR, REMOVE;
ADDSLOTS, AFTER, AGGREGATE, ALPHA, AND, ASC, BEFORE, BY, CHANNELS, COUNT, DELSLOTS, DESC, SOFT, HARD, ENCODING, FAILOVER, FORGET, FLUSH, FORCE, FLUSHSLOTS, GETNAME, GETKEYSINSLOT, IDLETIME, KILL, LEN, LIMIT, LIST, LOAD, MATCH, MAX, MEET, MIN, MOVED, NO, NODE, NODES, NOSAVE, NOT, NUMSUB, NUMPAT, ONE, OR, PAUSE, REFCOUNT, REPLICATE, RESET, REWRITE, RESETSTAT, SETNAME, SETSLOT, MIGRATING, IMPORTING, SLAVES, STORE, SUM, SEGFAULT, WEIGHTS, WITHSCORES, XOR, REMOVE;

public byte[] bytes;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ public enum CommandType {

// Pub/Sub

PSUBSCRIBE, PUBLISH, PUNSUBSCRIBE, SUBSCRIBE, UNSUBSCRIBE,
PSUBSCRIBE, PUBLISH, PUNSUBSCRIBE, SUBSCRIBE, UNSUBSCRIBE, PUBSUB,

// Sets

Expand Down
39 changes: 39 additions & 0 deletions lettuce/src/test/java/com/lambdaworks/redis/PubSubCommandTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,13 @@

package com.lambdaworks.redis;

import static org.hamcrest.CoreMatchers.hasItem;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertThat;

import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -92,6 +96,41 @@ public void psubscribe() throws Exception {
assertEquals(1, (long) counts.take());
}

@Test
public void pubsubChannels() throws Exception {
pubsub.subscribe(channel);

List<String> result = redis.pubsubChannels();
assertThat(result, hasItem(channel));
}

@Test
public void pubsubChannelsWithArg() throws Exception {
pubsub.subscribe(channel);

List<String> result = redis.pubsubChannels(pattern);
assertThat(result, hasItem(channel));
}

@Test
public void pubsubNumsub() throws Exception {

pubsub.subscribe(channel);

Map<String, Long> result = redis.pubsubNumsub(channel);
assertEquals(1, result.size());
assertEquals("1", result.get(channel));
}

@Test
public void pubsubNumpat() throws Exception {

pubsub.psubscribe(pattern);

Long result = redis.pubsubNumpat();
assertEquals(1L, result.longValue());
}

@Test(timeout = 200)
public void punsubscribe() throws Exception {
pubsub.punsubscribe(pattern);
Expand Down

0 comments on commit e77020e

Please sign in to comment.