Skip to content

Commit

Permalink
Consider Sentinel slave changes to refresh topology #669
Browse files Browse the repository at this point in the history
Lettuce now considers Sentinel messages on channels +sdown, -sdown and +slave as signals to refresh topology. This change allows to add new nodes during runtime and close connections to nodes that are not available (connectable). Temporary failures to connect a slave result in closing the client connection until the node is reachable again.
  • Loading branch information
mp911de committed Dec 19, 2017
1 parent c3b777b commit 883b3e4
Show file tree
Hide file tree
Showing 2 changed files with 46 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -358,6 +358,8 @@ interface MessagePredicate extends BiPredicate<String, String> {
private static class TopologyRefreshMessagePredicate implements MessagePredicate {

private final String masterId;
private Set<String> TOPOLOGY_CHANGE_CHANNELS = new HashSet<>(Arrays.asList("+slave", "+sdown", "-sdown",
"fix-slave-config", "+convert-to-slave", "+role-change"));

TopologyRefreshMessagePredicate(String masterId) {
this.masterId = masterId;
Expand All @@ -367,26 +369,20 @@ private static class TopologyRefreshMessagePredicate implements MessagePredicate
public boolean test(String channel, String message) {

// trailing spaces after the master name are not bugs
if (channel.equals("+elected-leader")) {
if (channel.equals("+elected-leader") || channel.equals("+reset-master")) {
if (message.startsWith(String.format("master %s ", masterId))) {
return true;
}
}

if (channel.equals("+switch-master")) {
if (message.startsWith(String.format("%s ", masterId))) {
return true;
}
}

if (channel.equals("fix-slave-config")) {
if (TOPOLOGY_CHANGE_CHANNELS.contains(channel)) {
if (message.contains(String.format("@ %s ", masterId))) {
return true;
}
}

if (channel.equals("+convert-to-slave") || channel.equals("+role-change")) {
if (message.contains(String.format("@ %s ", masterId))) {
if (channel.equals("+switch-master")) {
if (message.startsWith(String.format("%s ", masterId))) {
return true;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -227,13 +227,52 @@ public void shouldNotProcessOtherEvents() {
RedisPubSubAdapter<String, String> adapter = getAdapter();
sut.bind(refreshRunnable);

adapter.message("*", "*", "irreleval");
adapter.message("*", "*", "irrelevant");

verify(redisClient, times(3)).getResources();
verify(redisClient).connectPubSubAsync(any(), any());
verifyNoMoreInteractions(redisClient);
}

@Test
public void shouldProcessSlaveDown() {

RedisPubSubAdapter<String, String> adapter = getAdapter();
sut.bind(refreshRunnable);

adapter.message("*", "+sdown", "slave 127.0.0.1:6483 127.0.0.1 6483 @ mymaster 127.0.0.1 6482");

verify(eventExecutors, times(1)).schedule(captor.capture(), anyLong(), any());
captor.getValue().run();
verify(refreshRunnable, times(1)).run();
}

@Test
public void shouldProcessSlaveAdded() {

RedisPubSubAdapter<String, String> adapter = getAdapter();
sut.bind(refreshRunnable);

adapter.message("*", "+slave", "slave 127.0.0.1:8483 127.0.0.1 8483 @ mymaster 127.0.0.1 6482");

verify(eventExecutors, times(1)).schedule(captor.capture(), anyLong(), any());
captor.getValue().run();
verify(refreshRunnable, times(1)).run();
}

@Test
public void shouldProcessSlaveBackUp() {

RedisPubSubAdapter<String, String> adapter = getAdapter();
sut.bind(refreshRunnable);

adapter.message("*", "-sdown", "slave 127.0.0.1:6483 127.0.0.1 6483 @ mymaster 127.0.0.1 6482");

verify(eventExecutors, times(1)).schedule(captor.capture(), anyLong(), any());
captor.getValue().run();
verify(refreshRunnable, times(1)).run();
}

@Test
public void shouldProcessElectedLeader() {

Expand Down

0 comments on commit 883b3e4

Please sign in to comment.