Skip to content

Commit

Permalink
Allow only pub/sub commands for subscribed connections #579
Browse files Browse the repository at this point in the history
Pub/Sub connections that are subscribed to at least channel/pattern allow only subscription commands to be executed and reject execution of commands other than SUBSCRIBE, PSUBSCRIBE, UNSUBSCRIBE, PUNSUBSCRIBE and QUIT.

Response correlation cannot be guaranteed because Redis can publish a message first and then process the actual command which would parse the message as command response and treat the actual command response as pub/sub message. A deep response inspection could help to identify non-pub/sub message responses but it's no guarantee.
  • Loading branch information
mp911de committed Oct 6, 2017
1 parent 1ed5403 commit 2a6b3ee
Show file tree
Hide file tree
Showing 3 changed files with 49 additions and 5 deletions.
34 changes: 32 additions & 2 deletions src/main/java/io/lettuce/core/pubsub/PubSubEndpoint.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2011-2016 the original author or authors.
* Copyright 2011-2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -15,24 +15,39 @@
*/
package io.lettuce.core.pubsub;

import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArrayList;

import io.lettuce.core.ClientOptions;
import io.lettuce.core.RedisException;
import io.lettuce.core.protocol.CommandType;
import io.lettuce.core.protocol.DefaultEndpoint;

import io.lettuce.core.protocol.RedisCommand;
import io.netty.util.internal.ConcurrentSet;

/**
* @author Mark Paluch
*/
public class PubSubEndpoint<K, V> extends DefaultEndpoint {

private static final Set<String> ALLOWED_COMMANDS_SUBSCRIBED;
private final List<RedisPubSubListener<K, V>> listeners = new CopyOnWriteArrayList<>();
private final Set<K> channels;
private final Set<K> patterns;

static {

ALLOWED_COMMANDS_SUBSCRIBED = new HashSet<>(5, 1);

ALLOWED_COMMANDS_SUBSCRIBED.add(CommandType.SUBSCRIBE.name());
ALLOWED_COMMANDS_SUBSCRIBED.add(CommandType.PSUBSCRIBE.name());
ALLOWED_COMMANDS_SUBSCRIBED.add(CommandType.UNSUBSCRIBE.name());
ALLOWED_COMMANDS_SUBSCRIBED.add(CommandType.PUNSUBSCRIBE.name());
ALLOWED_COMMANDS_SUBSCRIBED.add(CommandType.QUIT.name());
}

/**
* Initialize a new instance that handles commands from the supplied queue.
*
Expand Down Expand Up @@ -75,6 +90,21 @@ public Set<K> getPatterns() {
return patterns;
}

@Override
public <K1, V1, T> RedisCommand<K1, V1, T> write(RedisCommand<K1, V1, T> command) {

if (!channels.isEmpty() || !patterns.isEmpty()) {

if (!ALLOWED_COMMANDS_SUBSCRIBED.contains(command.getType().name())) {

throw new RedisException(String.format("Command %s not allowed while subscribed. Allowed commands are: %s",
command.getType().name(), ALLOWED_COMMANDS_SUBSCRIBED));
}
}

return super.write(command);
}

public void notifyMessage(PubSubOutput<K, V, V> output) {

// drop empty messages
Expand Down
4 changes: 2 additions & 2 deletions src/test/java/io/lettuce/SslTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -261,10 +261,10 @@ public void pubSubSslAndBreakConnection() throws Exception {
assertThat(future.get()).doesNotContain("c1", "c2");
assertThat(future.isDone()).isEqualTo(true);

RedisFuture<List<String>> defectFuture = connection.pubsubChannels();
RedisFuture<Void> defectFuture = connection.subscribe("foo");

try {
assertThat(defectFuture.get()).doesNotContain("c1", "c2");
defectFuture.get();
fail("Missing ExecutionException with nested SSLHandshakeException");
} catch (InterruptedException e) {
fail("Missing ExecutionException with nested SSLHandshakeException");
Expand Down
16 changes: 15 additions & 1 deletion src/test/java/io/lettuce/core/pubsub/PubSubCommandTest.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2011-2016 the original author or authors.
* Copyright 2011-2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -16,6 +16,7 @@
package io.lettuce.core.pubsub;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.assertj.core.api.Fail.fail;
import static org.hamcrest.CoreMatchers.hasItem;
import static org.junit.Assert.assertThat;
Expand Down Expand Up @@ -382,6 +383,19 @@ public void removeListener() throws Exception {
assertThat(messages.poll(10, TimeUnit.MILLISECONDS)).isNull();
}

@Test
public void pingNotAllowedInSubscriptionState() throws Exception {

pubsub.subscribe(channel).get();

assertThatThrownBy(() -> pubsub.ping().get()).isInstanceOf(RedisException.class).hasMessageContaining("not allowed");
pubsub.unsubscribe(channel);

Wait.untilTrue(() -> channels.size() == 2).waitOrTimeout();

assertThat(pubsub.ping().get()).isEqualTo("PONG");
}

// RedisPubSubListener implementation

@Override
Expand Down

0 comments on commit 2a6b3ee

Please sign in to comment.