From e77020ef83f83fb07d6a9961381a2d1c6262e42b Mon Sep 17 00:00:00 2001 From: Mark Paluch Date: Tue, 17 Jun 2014 17:48:02 +0200 Subject: [PATCH] Support for PUBSUB --- README.md | 2 +- .../redis/BaseRedisAsyncConnection.java | 9 +++++ .../redis/BaseRedisConnection.java | 9 +++++ .../redis/RedisAsyncConnectionImpl.java | 20 ++++++++++ .../redis/RedisCommandBuilder.java | 20 ++++++++++ .../redis/protocol/CommandKeyword.java | 2 +- .../redis/protocol/CommandType.java | 2 +- .../lambdaworks/redis/PubSubCommandTest.java | 39 +++++++++++++++++++ 8 files changed, 100 insertions(+), 3 deletions(-) diff --git a/README.md b/README.md index 26784a1a9b..6f15411e0e 100644 --- a/README.md +++ b/README.md @@ -17,7 +17,7 @@ Lettuce works with: Currently following commands are __not yet__ supported: -* Server Commands: DEBUG SEGFAULT, MONITOR, PUBSUB +* Server Commands: MONITOR I'm pursuing the client code as a fork since the latest changes weren't merged back into https://github/wg/lettuce I'd love not adding yet another Java redis client, but unfortunately it's the current situation. diff --git a/lettuce/src/main/java/com/lambdaworks/redis/BaseRedisAsyncConnection.java b/lettuce/src/main/java/com/lambdaworks/redis/BaseRedisAsyncConnection.java index a02d4daaf8..1f9705591e 100644 --- a/lettuce/src/main/java/com/lambdaworks/redis/BaseRedisAsyncConnection.java +++ b/lettuce/src/main/java/com/lambdaworks/redis/BaseRedisAsyncConnection.java @@ -2,6 +2,7 @@ import java.io.Closeable; import java.util.List; +import java.util.Map; /** * @@ -16,6 +17,14 @@ public interface BaseRedisAsyncConnection extends Closeable { RedisFuture publish(K channel, V message); + RedisFuture> pubsubChannels(); + + RedisFuture> pubsubChannels(K channel); + + RedisFuture> pubsubNumsub(K... channels); + + RedisFuture pubsubNumpat(); + RedisFuture echo(V msg); RedisFuture ping(); diff --git a/lettuce/src/main/java/com/lambdaworks/redis/BaseRedisConnection.java b/lettuce/src/main/java/com/lambdaworks/redis/BaseRedisConnection.java index d212bf1efa..0eff4c06e0 100644 --- a/lettuce/src/main/java/com/lambdaworks/redis/BaseRedisConnection.java +++ b/lettuce/src/main/java/com/lambdaworks/redis/BaseRedisConnection.java @@ -2,6 +2,7 @@ import java.io.Closeable; import java.util.List; +import java.util.Map; /** * @@ -16,6 +17,14 @@ public interface BaseRedisConnection extends Closeable { Long publish(K channel, V message); + List pubsubChannels(); + + List pubsubChannels(K channel); + + Map pubsubNumsub(K... channels); + + Long pubsubNumpat(); + V echo(V msg); String ping(); diff --git a/lettuce/src/main/java/com/lambdaworks/redis/RedisAsyncConnectionImpl.java b/lettuce/src/main/java/com/lambdaworks/redis/RedisAsyncConnectionImpl.java index 32b671bf77..9a736226c8 100644 --- a/lettuce/src/main/java/com/lambdaworks/redis/RedisAsyncConnectionImpl.java +++ b/lettuce/src/main/java/com/lambdaworks/redis/RedisAsyncConnectionImpl.java @@ -592,6 +592,26 @@ public RedisFuture publish(K channel, V message) { return dispatch(commandBuilder.publish(channel, message)); } + @Override + public RedisFuture> pubsubChannels() { + return dispatch(commandBuilder.pubsubChannels()); + } + + @Override + public RedisFuture> pubsubChannels(K channel) { + return dispatch(commandBuilder.pubsubChannels(channel)); + } + + @Override + public RedisFuture> pubsubNumsub(K... channels) { + return dispatch(commandBuilder.pubsubNumsub(channels)); + } + + @Override + public RedisFuture pubsubNumpat() { + return dispatch(commandBuilder.pubsubNumpat()); + } + @Override public RedisFuture quit() { return dispatch(commandBuilder.quit()); diff --git a/lettuce/src/main/java/com/lambdaworks/redis/RedisCommandBuilder.java b/lettuce/src/main/java/com/lambdaworks/redis/RedisCommandBuilder.java index 6daecd711e..13817367a5 100644 --- a/lettuce/src/main/java/com/lambdaworks/redis/RedisCommandBuilder.java +++ b/lettuce/src/main/java/com/lambdaworks/redis/RedisCommandBuilder.java @@ -506,6 +506,26 @@ public Command publish(K channel, V message) { return createCommand(PUBLISH, new IntegerOutput(codec), args); } + public Command> pubsubChannels() { + CommandArgs args = new CommandArgs(codec).add(CHANNELS); + return createCommand(PUBSUB, new KeyListOutput(codec), args); + } + + public Command> pubsubChannels(K pattern) { + CommandArgs args = new CommandArgs(codec).add(CHANNELS).addKey(pattern); + return createCommand(PUBSUB, new KeyListOutput(codec), args); + } + + public Command> pubsubNumsub(K... pattern) { + CommandArgs args = new CommandArgs(codec).add(NUMSUB).addKeys(pattern); + return createCommand(PUBSUB, (MapOutput) new MapOutput((RedisCodec) codec), args); + } + + public Command pubsubNumpat() { + CommandArgs args = new CommandArgs(codec).add(NUMPAT); + return createCommand(PUBSUB, new IntegerOutput(codec), args); + } + public Command quit() { return createCommand(QUIT, new StatusOutput(codec)); } diff --git a/lettuce/src/main/java/com/lambdaworks/redis/protocol/CommandKeyword.java b/lettuce/src/main/java/com/lambdaworks/redis/protocol/CommandKeyword.java index b56bbdb3ce..4e2e9aa9ad 100644 --- a/lettuce/src/main/java/com/lambdaworks/redis/protocol/CommandKeyword.java +++ b/lettuce/src/main/java/com/lambdaworks/redis/protocol/CommandKeyword.java @@ -8,7 +8,7 @@ * @author Will Glozer */ public enum CommandKeyword { - ADDSLOTS, AFTER, AGGREGATE, ALPHA, AND, ASC, BEFORE, BY, COUNT, DELSLOTS, DESC, SOFT, HARD, ENCODING, FAILOVER, FORGET, FLUSH, FORCE, FLUSHSLOTS, GETNAME, GETKEYSINSLOT, IDLETIME, KILL, LEN, LIMIT, LIST, LOAD, MATCH, MAX, MEET, MIN, MOVED, NO, NODE, NODES, NOSAVE, NOT, ONE, OR, PAUSE, REFCOUNT, REPLICATE, RESET, REWRITE, RESETSTAT, SETNAME, SETSLOT, MIGRATING, IMPORTING, SLAVES, STORE, SUM, SEGFAULT, WEIGHTS, WITHSCORES, XOR, REMOVE; + ADDSLOTS, AFTER, AGGREGATE, ALPHA, AND, ASC, BEFORE, BY, CHANNELS, COUNT, DELSLOTS, DESC, SOFT, HARD, ENCODING, FAILOVER, FORGET, FLUSH, FORCE, FLUSHSLOTS, GETNAME, GETKEYSINSLOT, IDLETIME, KILL, LEN, LIMIT, LIST, LOAD, MATCH, MAX, MEET, MIN, MOVED, NO, NODE, NODES, NOSAVE, NOT, NUMSUB, NUMPAT, ONE, OR, PAUSE, REFCOUNT, REPLICATE, RESET, REWRITE, RESETSTAT, SETNAME, SETSLOT, MIGRATING, IMPORTING, SLAVES, STORE, SUM, SEGFAULT, WEIGHTS, WITHSCORES, XOR, REMOVE; public byte[] bytes; diff --git a/lettuce/src/main/java/com/lambdaworks/redis/protocol/CommandType.java b/lettuce/src/main/java/com/lambdaworks/redis/protocol/CommandType.java index 88403298f5..c2f194558b 100644 --- a/lettuce/src/main/java/com/lambdaworks/redis/protocol/CommandType.java +++ b/lettuce/src/main/java/com/lambdaworks/redis/protocol/CommandType.java @@ -46,7 +46,7 @@ public enum CommandType { // Pub/Sub - PSUBSCRIBE, PUBLISH, PUNSUBSCRIBE, SUBSCRIBE, UNSUBSCRIBE, + PSUBSCRIBE, PUBLISH, PUNSUBSCRIBE, SUBSCRIBE, UNSUBSCRIBE, PUBSUB, // Sets diff --git a/lettuce/src/test/java/com/lambdaworks/redis/PubSubCommandTest.java b/lettuce/src/test/java/com/lambdaworks/redis/PubSubCommandTest.java index 6d02ea8193..c0b7e7d248 100644 --- a/lettuce/src/test/java/com/lambdaworks/redis/PubSubCommandTest.java +++ b/lettuce/src/test/java/com/lambdaworks/redis/PubSubCommandTest.java @@ -2,9 +2,13 @@ package com.lambdaworks.redis; +import static org.hamcrest.CoreMatchers.hasItem; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertThat; +import java.util.List; +import java.util.Map; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; @@ -92,6 +96,41 @@ public void psubscribe() throws Exception { assertEquals(1, (long) counts.take()); } + @Test + public void pubsubChannels() throws Exception { + pubsub.subscribe(channel); + + List result = redis.pubsubChannels(); + assertThat(result, hasItem(channel)); + } + + @Test + public void pubsubChannelsWithArg() throws Exception { + pubsub.subscribe(channel); + + List result = redis.pubsubChannels(pattern); + assertThat(result, hasItem(channel)); + } + + @Test + public void pubsubNumsub() throws Exception { + + pubsub.subscribe(channel); + + Map result = redis.pubsubNumsub(channel); + assertEquals(1, result.size()); + assertEquals("1", result.get(channel)); + } + + @Test + public void pubsubNumpat() throws Exception { + + pubsub.psubscribe(pattern); + + Long result = redis.pubsubNumpat(); + assertEquals(1L, result.longValue()); + } + @Test(timeout = 200) public void punsubscribe() throws Exception { pubsub.punsubscribe(pattern);