Skip to content

Commit

Permalink
Fix Pub/Sub Cluster connection re-activation #2534
Browse files Browse the repository at this point in the history
We now eagerly obtain the cluster node identifier, before resubscribing to prevent command failures.
  • Loading branch information
mp911de committed Oct 24, 2023
1 parent 87a75e6 commit 9055a69
Show file tree
Hide file tree
Showing 3 changed files with 50 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -171,9 +171,12 @@ public CompletableFuture<StatefulRedisPubSubConnection<K, V>> getConnectionAsync

@Override
public void activated() {
super.activated();

async.clusterMyId().thenAccept(this::setNodeId);
if (!endpoint.isSubscribed()) {
async.clusterMyId().thenAccept(this::setNodeId);
}

super.activated();
}

public void setPartitions(Partitions partitions) {
Expand Down
9 changes: 7 additions & 2 deletions src/main/java/io/lettuce/core/pubsub/PubSubEndpoint.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,12 @@
*/
package io.lettuce.core.pubsub;

import java.util.*;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;

Expand Down Expand Up @@ -192,7 +197,7 @@ private static boolean isAllowed(RedisCommand<?, ?, ?> command) {
return ALLOWED_COMMANDS_SUBSCRIBED.contains(command.getType().name());
}

private boolean isSubscribed() {
public boolean isSubscribed() {
return subscribeWritten && (hasChannelSubscriptions() || hasPatternSubscriptions());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,10 @@
*/
package io.lettuce.core.cluster.pubsub;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.*;

import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;

import javax.inject.Inject;
Expand All @@ -34,7 +30,6 @@

import io.lettuce.core.RedisURI;
import io.lettuce.core.TestSupport;
import io.lettuce.core.api.push.PushMessage;
import io.lettuce.core.api.sync.RedisCommands;
import io.lettuce.core.cluster.RedisClusterClient;
import io.lettuce.core.cluster.api.StatefulRedisClusterConnection;
Expand All @@ -45,12 +40,13 @@
import io.lettuce.core.cluster.pubsub.api.reactive.PubSubReactiveNodeSelection;
import io.lettuce.core.cluster.pubsub.api.sync.NodeSelectionPubSubCommands;
import io.lettuce.core.cluster.pubsub.api.sync.PubSubNodeSelection;
import io.lettuce.core.event.command.CommandFailedEvent;
import io.lettuce.core.event.command.CommandListener;
import io.lettuce.core.pubsub.StatefulRedisPubSubConnection;
import io.lettuce.core.support.PubSubTestListener;
import io.lettuce.test.LettuceExtension;
import io.lettuce.test.TestFutures;
import io.lettuce.test.Wait;
import io.lettuce.test.condition.EnabledOnCommand;

/**
* @author Mark Paluch
Expand All @@ -61,10 +57,13 @@ class RedisClusterPubSubConnectionIntegrationTests extends TestSupport {
private final RedisClusterClient clusterClient;

private final PubSubTestListener connectionListener = new PubSubTestListener();

private final PubSubTestListener nodeListener = new PubSubTestListener();

private StatefulRedisClusterConnection<String, String> connection;

private StatefulRedisClusterPubSubConnection<String, String> pubSubConnection;

private StatefulRedisClusterPubSubConnection<String, String> pubSubConnection2;

@Inject
Expand Down Expand Up @@ -101,6 +100,34 @@ void testRegularClientPubSubChannels() {
assertThat(channelsOnOtherNode).isEmpty();
}

@Test
void myIdWorksAfterDisconnect() throws InterruptedException {

BlockingQueue<CommandFailedEvent> failedEvents = new LinkedBlockingQueue<CommandFailedEvent>();

CommandListener listener = new CommandListener() {

@Override
public void commandFailed(CommandFailedEvent event) {
failedEvents.add(event);
}

};
clusterClient.addListener(listener);

StatefulRedisClusterPubSubConnection<String, String> pubsub = clusterClient.connectPubSub();
pubsub.sync().subscribe("foo");
pubsub.async().quit();

Thread.sleep(100);
Wait.untilTrue(pubsub::isOpen).waitOrTimeout();

pubsub.close();
clusterClient.removeListener(listener);

assertThat(failedEvents).isEmpty();
}

@Test
void testRegularClientPublish() throws Exception {

Expand Down Expand Up @@ -164,8 +191,7 @@ void testGetConnectionAsyncByNodeId() {
RedisClusterNode partition = pubSubConnection.getPartitions().getPartition(0);

StatefulRedisPubSubConnection<String, String> node = TestFutures
.getOrTimeout(pubSubConnection.getConnectionAsync(partition
.getNodeId()));
.getOrTimeout(pubSubConnection.getConnectionAsync(partition.getNodeId()));

assertThat(node.sync().ping()).isEqualTo("PONG");
}
Expand All @@ -177,8 +203,7 @@ void testGetConnectionAsyncByHostAndPort() {

RedisURI uri = partition.getUri();
StatefulRedisPubSubConnection<String, String> node = TestFutures
.getOrTimeout(pubSubConnection.getConnectionAsync(uri.getHost(),
uri.getPort()));
.getOrTimeout(pubSubConnection.getConnectionAsync(uri.getHost(), uri.getPort()));

assertThat(node.sync().ping()).isEqualTo("PONG");
}
Expand Down Expand Up @@ -297,6 +322,7 @@ void testClusterListener() throws Exception {
public void message(RedisClusterNode node, String pattern, String channel, String message) {
nodes.add(node);
}

});

PubSubNodeSelection<String, String> masters = pubSubConnection.sync().masters();
Expand Down Expand Up @@ -326,4 +352,5 @@ private RedisClusterNode getOtherThan(String nodeId) {

throw new IllegalStateException("No other nodes than " + nodeId + " available");
}

}

0 comments on commit 9055a69

Please sign in to comment.