Skip to content

Commit

Permalink
Allow only pub/sub commands for subscribed connections #579
Browse files Browse the repository at this point in the history
Pub/Sub connections that are subscribed to at least channel/pattern allow only subscription commands to be executed and reject execution of commands other than SUBSCRIBE, PSUBSCRIBE, UNSUBSCRIBE, PUNSUBSCRIBE and QUIT.

Response correlation cannot be guaranteed because Redis can publish a message first and then process the actual command which would parse the message as command response and treat the actual command response as pub/sub message. A deep response inspection could help to identify non-pub/sub message responses but it's no guarantee.
  • Loading branch information
mp911de committed Oct 6, 2017
1 parent 21056f6 commit d580c72
Show file tree
Hide file tree
Showing 4 changed files with 48 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,21 +16,19 @@
package com.lambdaworks.redis.pubsub;

import java.lang.reflect.Array;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Set;
import java.util.*;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;

import com.lambdaworks.redis.*;
import com.lambdaworks.redis.api.sync.RedisCommands;
import com.lambdaworks.redis.cluster.api.sync.RedisClusterCommands;
import com.lambdaworks.redis.codec.RedisCodec;
import com.lambdaworks.redis.protocol.CommandType;
import com.lambdaworks.redis.protocol.ConnectionWatchdog;
import com.lambdaworks.redis.protocol.RedisCommand;
import com.lambdaworks.redis.pubsub.api.async.RedisPubSubAsyncCommands;
import com.lambdaworks.redis.pubsub.api.rx.RedisPubSubReactiveCommands;
import com.lambdaworks.redis.pubsub.api.sync.RedisPubSubCommands;

import io.netty.channel.ChannelHandler;
import io.netty.util.internal.ConcurrentSet;

Expand All @@ -48,10 +46,23 @@
public class StatefulRedisPubSubConnectionImpl<K, V> extends StatefulRedisConnectionImpl<K, V> implements
StatefulRedisPubSubConnection<K, V> {

private static final Set<String> ALLOWED_COMMANDS_SUBSCRIBED;

protected final List<RedisPubSubListener<K, V>> listeners;
protected final Set<K> channels;
protected final Set<K> patterns;

static {

ALLOWED_COMMANDS_SUBSCRIBED = new HashSet<>(5, 1);

ALLOWED_COMMANDS_SUBSCRIBED.add(CommandType.SUBSCRIBE.name());
ALLOWED_COMMANDS_SUBSCRIBED.add(CommandType.PSUBSCRIBE.name());
ALLOWED_COMMANDS_SUBSCRIBED.add(CommandType.UNSUBSCRIBE.name());
ALLOWED_COMMANDS_SUBSCRIBED.add(CommandType.PUNSUBSCRIBE.name());
ALLOWED_COMMANDS_SUBSCRIBED.add(CommandType.QUIT.name());
}

/**
* Initialize a new connection.
*
Expand Down Expand Up @@ -139,6 +150,21 @@ public void activated() {
resubscribe();
}

@Override
public <T, C extends RedisCommand<K, V, T>> C dispatch(C command) {

if (!channels.isEmpty() || !patterns.isEmpty()) {

if (!ALLOWED_COMMANDS_SUBSCRIBED.contains(command.getType().name())) {

throw new RedisException(String.format("Command %s not allowed while subscribed. Allowed commands are: %s",
command.getType().name(), ALLOWED_COMMANDS_SUBSCRIBED));
}
}

return super.dispatch(command);
}

/**
* Re-subscribe to all previously subscribed channels and patterns.
*
Expand Down
4 changes: 2 additions & 2 deletions src/test/java/com/lambdaworks/SslTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -262,10 +262,10 @@ public void pubSubSslAndBreakConnection() throws Exception {
assertThat(future.get()).doesNotContain("c1", "c2");
assertThat(future.isDone()).isEqualTo(true);

RedisFuture<List<String>> defectFuture = connection.pubsubChannels();
RedisFuture<Void> defectFuture = connection.subscribe("foo");

try {
assertThat(defectFuture.get()).doesNotContain("c1", "c2");
defectFuture.get();
fail("Missing ExecutionException with nested SSLHandshakeException");
} catch (InterruptedException e) {
fail("Missing ExecutionException with nested SSLHandshakeException");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,6 @@ public void testRegularClientPubSubPublish() throws Exception {
RedisCommands<String, String> otherNodeConnection = connection.getConnection(otherNode.getNodeId()).sync();
otherNodeConnection.publish(key, value);
assertThat(listener.getChannels().take()).isEqualTo(key);

}

private RedisClusterNode getOtherThan(String nodeId) {
Expand Down
14 changes: 14 additions & 0 deletions src/test/java/com/lambdaworks/redis/pubsub/PubSubCommandTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package com.lambdaworks.redis.pubsub;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.assertj.core.api.Fail.fail;
import static org.hamcrest.CoreMatchers.hasItem;
import static org.junit.Assert.assertThat;
Expand Down Expand Up @@ -382,6 +383,19 @@ public void removeListener() throws Exception {
assertThat(messages.poll(10, TimeUnit.MILLISECONDS)).isNull();
}

@Test
public void pingNotAllowedInSubscriptionState() throws Exception {

pubsub.subscribe(channel).get();

assertThatThrownBy(() -> pubsub.ping().get()).isInstanceOf(RedisException.class).hasMessageContaining("not allowed");
pubsub.unsubscribe(channel);

Wait.untilTrue(() -> channels.size() == 2).waitOrTimeout();

assertThat(pubsub.ping().get()).isEqualTo("PONG");
}

// RedisPubSubListener implementation

@Override
Expand Down

0 comments on commit d580c72

Please sign in to comment.