diff --git a/src/main/java/com/lambdaworks/redis/AbstractRedisAsyncCommands.java b/src/main/java/com/lambdaworks/redis/AbstractRedisAsyncCommands.java index a46d86c561..3b2f77a7fe 100644 --- a/src/main/java/com/lambdaworks/redis/AbstractRedisAsyncCommands.java +++ b/src/main/java/com/lambdaworks/redis/AbstractRedisAsyncCommands.java @@ -1684,4 +1684,14 @@ public void reset() { public StatefulConnection getConnection() { return connection; } + + @Override + public void setAutoFlushCommands(boolean autoFlush) { + connection.setAutoFlushCommands(autoFlush); + } + + @Override + public void flushCommands() { + connection.flushCommands(); + } } diff --git a/src/main/java/com/lambdaworks/redis/BaseRedisAsyncConnection.java b/src/main/java/com/lambdaworks/redis/BaseRedisAsyncConnection.java index 59a9c7e1ef..c1f2697b26 100644 --- a/src/main/java/com/lambdaworks/redis/BaseRedisAsyncConnection.java +++ b/src/main/java/com/lambdaworks/redis/BaseRedisAsyncConnection.java @@ -158,4 +158,19 @@ public interface BaseRedisAsyncConnection extends Closeable, BaseRedisAsyn */ boolean isOpen(); + /** + * Disable or enable auto-flush behavior. Default is {@literal true}. If autoFlushCommands is disabled, multiple commands + * can be issued without writing them actually to the transport. Commands are buffered until a {@link #flushCommands()} is + * issued. After calling {@link #flushCommands()} commands are sent to the transport and executed by Redis. + * + * @param autoFlush state of autoFlush. + */ + void setAutoFlushCommands(boolean autoFlush); + + /** + * Flush pending commands. This commands forces a flush on the channel and can be used to buffer ("pipeline") commands to + * achieve batching. No-op if channel is not connected. + */ + void flushCommands(); + } diff --git a/src/main/java/com/lambdaworks/redis/RedisChannelHandler.java b/src/main/java/com/lambdaworks/redis/RedisChannelHandler.java index 872d705fd5..b95cb30ad2 100644 --- a/src/main/java/com/lambdaworks/redis/RedisChannelHandler.java +++ b/src/main/java/com/lambdaworks/redis/RedisChannelHandler.java @@ -209,4 +209,12 @@ protected T syncHandler(Object asyncApi, Class... interfaces) { FutureSyncInvocationHandler h = new FutureSyncInvocationHandler<>((StatefulConnection) this, asyncApi); return (T) Proxy.newProxyInstance(AbstractRedisClient.class.getClassLoader(), interfaces, h); } + + public void setAutoFlushCommands(boolean autoFlush) { + getChannelWriter().setAutoFlushCommands(autoFlush); + } + + public void flushCommands() { + getChannelWriter().flushCommands(); + } } diff --git a/src/main/java/com/lambdaworks/redis/RedisChannelWriter.java b/src/main/java/com/lambdaworks/redis/RedisChannelWriter.java index a755c6842e..7a11e677e7 100644 --- a/src/main/java/com/lambdaworks/redis/RedisChannelWriter.java +++ b/src/main/java/com/lambdaworks/redis/RedisChannelWriter.java @@ -40,4 +40,19 @@ public interface RedisChannelWriter extends Closeable { * @param redisChannelHandler the channel handler (external connection object) */ void setRedisChannelHandler(RedisChannelHandler redisChannelHandler); + + /** + * Disable or enable auto-flush behavior. Default is {@literal true}. If autoFlushCommands is disabled, multiple commands + * can be issued without writing them actually to the transport. Commands are buffered until a {@link #flushCommands()} is + * issued. After calling {@link #flushCommands()} commands are sent to the transport and executed by Redis. + * + * @param autoFlush state of autoFlush. + */ + void setAutoFlushCommands(boolean autoFlush); + + /** + * Flush pending commands. This commands forces a flush on the channel and can be used to buffer ("pipeline") commands to + * achieve batching. No-op if channel is not connected. + */ + void flushCommands(); } diff --git a/src/main/java/com/lambdaworks/redis/RedisConnectionPool.java b/src/main/java/com/lambdaworks/redis/RedisConnectionPool.java index d47103f828..fb37f7e778 100644 --- a/src/main/java/com/lambdaworks/redis/RedisConnectionPool.java +++ b/src/main/java/com/lambdaworks/redis/RedisConnectionPool.java @@ -176,7 +176,7 @@ void addListener(CloseEvents.CloseListener listener) { * @since 3.0 */ static class PooledConnectionInvocationHandler extends AbstractInvocationHandler { - public static final Set DISABLED_METHODS = ImmutableSet.of("auth", "select", "quit", "getStatefulConnection"); + public static final Set DISABLED_METHODS = ImmutableSet.of("auth", "select", "quit", "getStatefulConnection", "setAutoFlushCommands"); private T connection; private final RedisConnectionPool pool; diff --git a/src/main/java/com/lambdaworks/redis/api/StatefulConnection.java b/src/main/java/com/lambdaworks/redis/api/StatefulConnection.java index ae4d7b131c..8dac4e9887 100644 --- a/src/main/java/com/lambdaworks/redis/api/StatefulConnection.java +++ b/src/main/java/com/lambdaworks/redis/api/StatefulConnection.java @@ -65,4 +65,19 @@ public interface StatefulConnection extends AutoCloseable { * internal state machine gets out of sync with the connection. */ void reset(); + + /** + * Disable or enable auto-flush behavior. Default is {@literal true}. If autoFlushCommands is disabled, multiple commands + * can be issued without writing them actually to the transport. Commands are buffered until a {@link #flushCommands()} is + * issued. After calling {@link #flushCommands()} commands are sent to the transport and executed by Redis. + * + * @param autoFlush state of autoFlush. + */ + void setAutoFlushCommands(boolean autoFlush); + + /** + * Flush pending commands. This commands forces a flush on the channel and can be used to buffer ("pipeline") commands to + * achieve batching. No-op if channel is not connected. + */ + void flushCommands(); } diff --git a/src/main/java/com/lambdaworks/redis/cluster/ClusterConnectionProvider.java b/src/main/java/com/lambdaworks/redis/cluster/ClusterConnectionProvider.java index 43601feb6b..264459f6a6 100644 --- a/src/main/java/com/lambdaworks/redis/cluster/ClusterConnectionProvider.java +++ b/src/main/java/com/lambdaworks/redis/cluster/ClusterConnectionProvider.java @@ -56,6 +56,21 @@ interface ClusterConnectionProvider extends Closeable { */ void setPartitions(Partitions partitions); + /** + * Disable or enable auto-flush behavior. Default is {@literal true}. If autoFlushCommands is disabled, multiple commands + * can be issued without writing them actually to the transport. Commands are buffered until a {@link #flushCommands()} is + * issued. After calling {@link #flushCommands()} commands are sent to the transport and executed by Redis. + * + * @param autoFlush state of autoFlush. + */ + void setAutoFlushCommands(boolean autoFlush); + + /** + * Flush pending commands. This commands forces a flush on the channel and can be used to buffer ("pipeline") commands to + * achieve batching. No-op if channel is not connected. + */ + void flushCommands(); + enum Intent { READ, WRITE; } diff --git a/src/main/java/com/lambdaworks/redis/cluster/ClusterDistributionChannelWriter.java b/src/main/java/com/lambdaworks/redis/cluster/ClusterDistributionChannelWriter.java index 96e87b07be..7fd6c9f42d 100644 --- a/src/main/java/com/lambdaworks/redis/cluster/ClusterDistributionChannelWriter.java +++ b/src/main/java/com/lambdaworks/redis/cluster/ClusterDistributionChannelWriter.java @@ -131,6 +131,16 @@ public void setRedisChannelHandler(RedisChannelHandler redisChannelHandler defaultWriter.setRedisChannelHandler(redisChannelHandler); } + @Override + public void setAutoFlushCommands(boolean autoFlush) { + getClusterConnectionProvider().setAutoFlushCommands(autoFlush); + } + + @Override + public void flushCommands() { + getClusterConnectionProvider().flushCommands(); + } + public ClusterConnectionProvider getClusterConnectionProvider() { return clusterConnectionProvider; } diff --git a/src/main/java/com/lambdaworks/redis/cluster/PooledClusterConnectionProvider.java b/src/main/java/com/lambdaworks/redis/cluster/PooledClusterConnectionProvider.java index 699664c314..ff472e7edc 100644 --- a/src/main/java/com/lambdaworks/redis/cluster/PooledClusterConnectionProvider.java +++ b/src/main/java/com/lambdaworks/redis/cluster/PooledClusterConnectionProvider.java @@ -9,11 +9,10 @@ import com.google.common.cache.LoadingCache; import com.google.common.collect.ImmutableMap; import com.lambdaworks.redis.LettuceStrings; -import com.lambdaworks.redis.RedisAsyncConnection; +import com.lambdaworks.redis.RedisChannelHandler; import com.lambdaworks.redis.RedisException; import com.lambdaworks.redis.RedisURI; import com.lambdaworks.redis.api.StatefulRedisConnection; -import com.lambdaworks.redis.cluster.api.StatefulRedisClusterConnection; import com.lambdaworks.redis.cluster.models.partitions.Partitions; import com.lambdaworks.redis.cluster.models.partitions.RedisClusterNode; import com.lambdaworks.redis.codec.RedisCodec; @@ -37,6 +36,9 @@ class PooledClusterConnectionProvider implements ClusterConnectionProvider private final StatefulRedisConnection writers[] = new StatefulRedisConnection[SlotHash.SLOT_COUNT]; private Partitions partitions; + private boolean autoFlushCommands = true; + private Object stateLock = new Object(); + public PooledClusterConnectionProvider(final RedisClusterClient redisClusterClient, final RedisCodec redisCodec) { this.debugEnabled = logger.isDebugEnabled(); this.connections = CacheBuilder.newBuilder().build(new CacheLoader>() { @@ -48,6 +50,10 @@ public StatefulRedisConnection load(PoolKey key) throws Exception { connection.sync().readOnly(); } + synchronized (stateLock) { + connection.setAutoFlushCommands(autoFlushCommands); + } + return connection; } }); @@ -186,7 +192,30 @@ public void setPartitions(Partitions partitions) { resetPartitions(); } + @Override + public void setAutoFlushCommands(boolean autoFlush) { + synchronized (stateLock) { + this.autoFlushCommands = autoFlush; + } + + for (StatefulRedisConnection connection : connections.asMap().values()) { + connection.setAutoFlushCommands(autoFlush); + } + } + + @Override + public void flushCommands() { + + for (StatefulRedisConnection connection : connections.asMap().values()) { + connection.flushCommands(); + } + + } + protected void resetPartitions() { - Arrays.fill(writers, null); + + synchronized (stateLock) { + Arrays.fill(writers, null); + } } } diff --git a/src/main/java/com/lambdaworks/redis/cluster/RedisAdvancedClusterAsyncCommandsImpl.java b/src/main/java/com/lambdaworks/redis/cluster/RedisAdvancedClusterAsyncCommandsImpl.java index 56d214b37b..502c7d9663 100644 --- a/src/main/java/com/lambdaworks/redis/cluster/RedisAdvancedClusterAsyncCommandsImpl.java +++ b/src/main/java/com/lambdaworks/redis/cluster/RedisAdvancedClusterAsyncCommandsImpl.java @@ -160,7 +160,6 @@ public RedisFuture mset(Map map) { return null; }); - } @Override @@ -201,7 +200,6 @@ private T execute(Callable function) { } catch (Exception e) { throw new RedisException(e); } - } @Override @@ -253,5 +251,4 @@ protected AsyncNodeSelection nodes(Predicate predicate, return (AsyncNodeSelection) Proxy.newProxyInstance(NodeSelection.class.getClassLoader(), new Class[] { NodeSelectionAsyncCommands.class, AsyncNodeSelection.class }, h); } - } diff --git a/src/main/java/com/lambdaworks/redis/protocol/CommandHandler.java b/src/main/java/com/lambdaworks/redis/protocol/CommandHandler.java index 2ea11a4ccf..ea7bc1dac4 100644 --- a/src/main/java/com/lambdaworks/redis/protocol/CommandHandler.java +++ b/src/main/java/com/lambdaworks/redis/protocol/CommandHandler.java @@ -11,10 +11,20 @@ import java.util.concurrent.locks.ReentrantLock; import com.google.common.annotations.VisibleForTesting; -import com.lambdaworks.redis.*; +import com.lambdaworks.redis.ClientOptions; +import com.lambdaworks.redis.ConnectionEvents; +import com.lambdaworks.redis.RedisChannelHandler; +import com.lambdaworks.redis.RedisChannelWriter; +import com.lambdaworks.redis.RedisException; import io.netty.buffer.ByteBuf; -import io.netty.channel.*; +import io.netty.channel.Channel; +import io.netty.channel.ChannelDuplexHandler; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelFutureListener; +import io.netty.channel.ChannelHandler; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelPromise; import io.netty.util.concurrent.Future; import io.netty.util.concurrent.GenericFutureListener; import io.netty.util.internal.logging.InternalLogger; @@ -58,6 +68,7 @@ public class CommandHandler extends ChannelDuplexHandler implements RedisC private RedisChannelHandler redisChannelHandler; private Throwable connectionError; private String logPrefix; + private boolean autoFlushCommands = true; /** * Initialize a new instance that handles commands from the supplied queue. @@ -80,7 +91,7 @@ public CommandHandler(ClientOptions clientOptions, Queue> @Override public void channelRegistered(ChannelHandlerContext ctx) throws Exception { setState(LifecycleState.REGISTERED); - buffer = ctx.alloc().heapBuffer(); + buffer = ctx.alloc().directBuffer(8192 * 8); rsm = new RedisStateMachine(); synchronized (stateLock) { channel = ctx.channel(); @@ -175,14 +186,17 @@ public > C write(C command) { if (reliability == Reliability.AT_MOST_ONCE) { // cancel on exceptions and remove from queue, because there is no housekeeping channel.write(command).addListener(new AtMostOnceWriteListener(command, queue)); - channel.flush(); } if (reliability == Reliability.AT_LEAST_ONCE) { // commands are ok to stay within the queue, reconnect will retrigger them channel.write(command).addListener(WRITE_LOG_LISTENER); + } + + if (autoFlushCommands) { channel.flush(); } + } else { if (commandBuffer.contains(command) || queue.contains(command)) { @@ -217,6 +231,13 @@ private boolean isConnected() { && lifecycleState.ordinal() <= LifecycleState.DISCONNECTED.ordinal(); } + @Override + public void flushCommands() { + if (channel != null && isConnected() && channel.isActive()) { + channel.flush(); + } + } + /** * * @see io.netty.channel.ChannelDuplexHandler#write(io.netty.channel.ChannelHandlerContext, java.lang.Object, @@ -482,6 +503,13 @@ public void setRedisChannelHandler(RedisChannelHandler redisChannelHandler this.redisChannelHandler = redisChannelHandler; } + @Override + public void setAutoFlushCommands(boolean autoFlush) { + synchronized (stateLock) { + this.autoFlushCommands = autoFlush; + } + } + private String logPrefix() { if (logPrefix != null) { return logPrefix; diff --git a/src/test/java/com/lambdaworks/redis/PipeliningTest.java b/src/test/java/com/lambdaworks/redis/PipeliningTest.java new file mode 100644 index 0000000000..9021fdea96 --- /dev/null +++ b/src/test/java/com/lambdaworks/redis/PipeliningTest.java @@ -0,0 +1,86 @@ +package com.lambdaworks.redis; + +import static org.assertj.core.api.Assertions.assertThat; + +import java.util.List; +import java.util.concurrent.TimeUnit; + +import org.junit.Test; + +import com.google.common.collect.Lists; + +/** + * @author Mark Paluch + */ +public class PipeliningTest extends AbstractRedisClientTest { + + @Test + public void basic() throws Exception { + + RedisAsyncConnection connection = client.connectAsync(); + connection.setAutoFlushCommands(false); + + int iterations = 1000; + List> futures = triggerSet(connection, iterations); + + verifyNotExecuted(iterations); + + connection.flushCommands(); + + LettuceFutures.awaitAll(5, TimeUnit.SECONDS, futures.toArray(new RedisFuture[futures.size()])); + + verifyExecuted(iterations); + + connection.close(); + } + + protected void verifyExecuted(int iterations) { + for (int i = 0; i < iterations; i++) { + assertThat(redis.get(key(i))).as("Key " + key(i) + " must be " + value(i)).isEqualTo(value(i)); + } + } + + @Test + public void setAutoFlushTrueDoesNotFlush() throws Exception { + + RedisAsyncConnection connection = client.connectAsync(); + connection.setAutoFlushCommands(false); + + int iterations = 1000; + List> futures = triggerSet(connection, iterations); + + verifyNotExecuted(iterations); + + connection.setAutoFlushCommands(true); + + verifyNotExecuted(iterations); + + connection.flushCommands(); + boolean result = LettuceFutures.awaitAll(5, TimeUnit.SECONDS, futures.toArray(new RedisFuture[futures.size()])); + assertThat(result).isTrue(); + + connection.close(); + } + + protected void verifyNotExecuted(int iterations) { + for (int i = 0; i < iterations; i++) { + assertThat(redis.get(key(i))).as("Key " + key(i) + " must be null").isNull(); + } + } + + protected List> triggerSet(RedisAsyncConnection connection, int iterations) { + List> futures = Lists.newArrayList(); + for (int i = 0; i < iterations; i++) { + futures.add(connection.set(key(i), value(i))); + } + return futures; + } + + protected String value(int i) { + return value + "-" + i; + } + + protected String key(int i) { + return key + "-" + i; + } +} diff --git a/src/test/java/com/lambdaworks/redis/PoolConnectionTest.java b/src/test/java/com/lambdaworks/redis/PoolConnectionTest.java index a26560ae75..c384d98b39 100644 --- a/src/test/java/com/lambdaworks/redis/PoolConnectionTest.java +++ b/src/test/java/com/lambdaworks/redis/PoolConnectionTest.java @@ -36,6 +36,21 @@ public void getStatefulConnection() throws Exception { c1.getStatefulConnection(); } + @Test + public void setAutoFlushCommandsNotAllowed() throws Exception { + + RedisConnectionPool> pool = client.asyncPool(); + RedisAsyncConnection c1 = pool.allocateConnection(); + try { + c1.setAutoFlushCommands(true); + fail("Missing UnsupportedOperationException"); + } catch (Exception e) { + assertThat(e).isInstanceOf(UnsupportedOperationException.class); + } finally { + pool.close(); + } + } + @Test public void sameConnectionAfterFree() throws Exception { RedisConnectionPool> pool = client.pool(); diff --git a/src/test/java/com/lambdaworks/redis/cluster/AdvancedClusterClientTest.java b/src/test/java/com/lambdaworks/redis/cluster/AdvancedClusterClientTest.java index c4d30bb394..404e6d0616 100644 --- a/src/test/java/com/lambdaworks/redis/cluster/AdvancedClusterClientTest.java +++ b/src/test/java/com/lambdaworks/redis/cluster/AdvancedClusterClientTest.java @@ -11,6 +11,12 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; +import java.util.List; +import java.util.concurrent.TimeUnit; + +import com.google.common.collect.Lists; +import com.lambdaworks.redis.LettuceFutures; +import com.lambdaworks.redis.RedisFuture; import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -30,6 +36,8 @@ import com.lambdaworks.redis.cluster.api.sync.RedisAdvancedClusterCommands; import com.lambdaworks.redis.cluster.api.sync.RedisClusterCommands; import com.lambdaworks.redis.cluster.models.partitions.Partitions; +import com.lambdaworks.redis.RedisClusterConnection; +import com.lambdaworks.redis.RedisException; import com.lambdaworks.redis.cluster.models.partitions.RedisClusterNode; /** @@ -386,4 +394,44 @@ public void testSync() throws Exception { assertThat(sync.getStatefulConnection()).isSameAs(commands.getStatefulConnection()); } + + @Test + public void pipelining() throws Exception { + + RedisAdvancedClusterConnection verificationConnection = clusterClient.connectCluster(); + + // preheat the first connection + commands.get(key(0)).get(); + + int iterations = 1000; + commands.setAutoFlushCommands(false); + List> futures = Lists.newArrayList(); + for (int i = 0; i < iterations; i++) { + futures.add(commands.set(key(i), value(i))); + } + + for (int i = 0; i < iterations; i++) { + assertThat(verificationConnection.get(key(i))).as("Key " + key(i) + " must be null").isNull(); + } + + commands.flushCommands(); + boolean result = LettuceFutures.awaitAll(5, TimeUnit.SECONDS, futures.toArray(new RedisFuture[futures.size()])); + assertThat(result).isTrue(); + + for (int i = 0; i < iterations; i++) { + assertThat(verificationConnection.get(key(i))).as("Key " + key(i) + " must be " + value(i)).isEqualTo(value(i)); + } + + verificationConnection.close(); + + } + + protected String value(int i) { + return value + "-" + i; + } + + protected String key(int i) { + return key + "-" + i; + } + } diff --git a/src/test/java/com/lambdaworks/redis/commands/KeyCommandTest.java b/src/test/java/com/lambdaworks/redis/commands/KeyCommandTest.java index 9697662dc5..a97b1f4c3b 100644 --- a/src/test/java/com/lambdaworks/redis/commands/KeyCommandTest.java +++ b/src/test/java/com/lambdaworks/redis/commands/KeyCommandTest.java @@ -158,8 +158,8 @@ public void pttl() throws Exception { assertThat((long) redis.pttl(key)).isEqualTo(-2); redis.set(key, value); assertThat((long) redis.pttl(key)).isEqualTo(-1); - redis.pexpire(key, 10); - assertThat(redis.pttl(key) <= 10 && redis.pttl(key) > 0).isTrue(); + redis.pexpire(key, 50); + assertThat(redis.pttl(key) <= 50 && redis.pttl(key) > 0).isTrue(); } @Test diff --git a/src/test/java/com/lambdaworks/redis/pubsub/PubSubCommandTest.java b/src/test/java/com/lambdaworks/redis/pubsub/PubSubCommandTest.java index 59fb698d82..c4727a1274 100644 --- a/src/test/java/com/lambdaworks/redis/pubsub/PubSubCommandTest.java +++ b/src/test/java/com/lambdaworks/redis/pubsub/PubSubCommandTest.java @@ -2,6 +2,8 @@ package com.lambdaworks.redis.pubsub; +import static com.google.code.tempusfugit.temporal.Duration.seconds; +import static com.google.code.tempusfugit.temporal.Timeout.timeout; import static org.assertj.core.api.Assertions.assertThat; import static org.hamcrest.CoreMatchers.hasItem; import static org.junit.Assert.assertThat; @@ -13,16 +15,21 @@ import java.util.concurrent.TimeUnit; import com.lambdaworks.Wait; -import com.lambdaworks.redis.AbstractRedisClientTest; -import com.lambdaworks.redis.FastShutdown; -import com.lambdaworks.redis.RedisClient; -import com.lambdaworks.redis.RedisFuture; +import com.lambdaworks.redis.*; +import com.lambdaworks.redis.api.async.RedisAsyncCommands; import com.lambdaworks.redis.pubsub.api.async.RedisPubSubAsyncCommands; import org.junit.After; import org.junit.Before; import org.junit.Test; import com.google.common.collect.Lists; +import com.google.code.tempusfugit.temporal.Condition; +import com.google.code.tempusfugit.temporal.WaitFor; +import com.google.common.collect.Lists; +import com.google.common.util.concurrent.MoreExecutors; +import com.lambdaworks.redis.pubsub.RedisPubSubAdapter; +import com.lambdaworks.redis.pubsub.RedisPubSubConnection; +import com.lambdaworks.redis.pubsub.RedisPubSubListener; public class PubSubCommandTest extends AbstractRedisClientTest implements RedisPubSubListener { private RedisPubSubAsyncCommands pubsub; @@ -92,6 +99,25 @@ public void message() throws Exception { assertThat(messages.take()).isEqualTo(message); } + @Test(timeout = 2000) + public void pipelinedMessage() throws Exception { + pubsub.subscribe(channel); + assertThat(channels.take()).isEqualTo(channel); + RedisAsyncCommands connection = client.connectAsync(); + + connection.setAutoFlushCommands(false); + connection.publish(channel, message); + Thread.sleep(100); + + assertThat(channels).isEmpty(); + connection.flushCommands(); + + assertThat(channels.take()).isEqualTo(channel); + assertThat(messages.take()).isEqualTo(message); + + connection.close(); + } + @Test(timeout = 2000) public void pmessage() throws Exception { pubsub.psubscribe(pattern).await(1, TimeUnit.MINUTES); @@ -108,6 +134,24 @@ public void pmessage() throws Exception { assertThat(messages.take()).isEqualTo("msg 2!"); } + @Test(timeout = 2000) + public void pipelinedSubscribe() throws Exception { + + pubsub.setAutoFlushCommands(false); + pubsub.subscribe(channel); + Thread.sleep(100); + assertThat(channels).isEmpty(); + pubsub.flushCommands(); + + assertThat(channels.take()).isEqualTo(channel); + + redis.publish(channel, message); + + assertThat(channels.take()).isEqualTo(channel); + assertThat(messages.take()).isEqualTo(message); + + } + @Test(timeout = 2000) public void psubscribe() throws Exception { RedisFuture psubscribe = pubsub.psubscribe(pattern);