Skip to content

Commit

Permalink
Fix argument order for Sentinel-related Pub/Sub messages in Master/Sl…
Browse files Browse the repository at this point in the history
…ave #668

Lettuce now evaluates the correct message and channel arguments to determine whether it should refresh/reconnect sentinels.
  • Loading branch information
mp911de committed Dec 13, 2017
1 parent 8d9932c commit 8e79e04
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 25 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2011-2016 the original author or authors.
* Copyright 2011-2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -33,7 +33,6 @@
import io.lettuce.core.protocol.LettuceCharsets;
import io.lettuce.core.pubsub.RedisPubSubAdapter;
import io.lettuce.core.pubsub.StatefulRedisPubSubConnection;

import io.netty.util.concurrent.EventExecutorGroup;
import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory;
Expand Down Expand Up @@ -237,7 +236,7 @@ protected void onEvent(Consumer<Timeout> timeoutConsumer) {
}
}

static interface MessagePredicate extends BiPredicate<String, String> {
interface MessagePredicate extends BiPredicate<String, String> {

@Override
boolean test(String message, String channel);
Expand Down Expand Up @@ -294,7 +293,8 @@ public boolean test(String channel, String message) {
private static class SentinelReconnectMessagePredicate implements MessagePredicate {

@Override
public boolean test(String message, String channel) {
public boolean test(String channel, String message) {

if (channel.equals("+sentinel")) {
return true;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2011-2016 the original author or authors.
* Copyright 2011-2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -42,7 +42,6 @@
import io.lettuce.core.pubsub.StatefulRedisPubSubConnection;
import io.lettuce.core.pubsub.api.async.RedisPubSubAsyncCommands;
import io.lettuce.core.resource.ClientResources;

import io.netty.util.concurrent.EventExecutorGroup;

/**
Expand Down Expand Up @@ -79,7 +78,7 @@ public class SentinelTopologyRefreshTest {
private SentinelTopologyRefresh sut;

@Before
public void before() throws Exception {
public void before() {

when(redisClient.connectPubSub(any(StringCodec.class), eq(host1))).thenReturn(connection);
when(clientResources.eventExecutorGroup()).thenReturn(eventExecutors);
Expand All @@ -90,7 +89,7 @@ public void before() throws Exception {
}

@Test
public void bind() throws Exception {
public void bind() {

sut.bind(refreshRunnable);

Expand All @@ -99,7 +98,7 @@ public void bind() throws Exception {
}

@Test
public void bindWithSecondSentinelFails() throws Exception {
public void bindWithSecondSentinelFails() {

sut = new SentinelTopologyRefresh(redisClient, "mymaster", Arrays.asList(host1, host2));

Expand All @@ -114,7 +113,7 @@ public void bindWithSecondSentinelFails() throws Exception {
}

@Test
public void bindWithSentinelRecovery() throws Exception {
public void bindWithSentinelRecovery() {

StatefulRedisPubSubConnection<String, String> connection2 = mock(StatefulRedisPubSubConnection.class);
RedisPubSubAsyncCommands<String, String> async2 = mock(RedisPubSubAsyncCommands.class);
Expand All @@ -138,13 +137,16 @@ public void bindWithSentinelRecovery() throws Exception {
adapter.message("*", "+sentinel",
"sentinel c14cc895bb0479c91312cee0e0440b7d99ad367b 127.0.0.1 26380 @ mymaster 127.0.0.1 6483");

verify(eventExecutors, times(1)).schedule(captor.capture(), anyLong(), any());
captor.getValue().run();

verify(redisClient, times(2)).connectPubSub(any(), eq(host2));
assertThat(connections).containsKey(host1).containsKey(host2).hasSize(2);
verify(refreshRunnable, never()).run();
}

@Test
public void bindDuringClose() throws Exception {
public void bindDuringClose() {

sut = new SentinelTopologyRefresh(redisClient, "mymaster", Arrays.asList(host1, host2));

Expand All @@ -171,7 +173,7 @@ public void bindDuringClose() throws Exception {
}

@Test
public void close() throws Exception {
public void close() {

sut.bind(refreshRunnable);
sut.close();
Expand All @@ -181,7 +183,7 @@ public void close() throws Exception {
}

@Test
public void bindAfterClose() throws Exception {
public void bindAfterClose() {

sut.close();
sut.bind(refreshRunnable);
Expand All @@ -191,9 +193,10 @@ public void bindAfterClose() throws Exception {
}

@Test
public void shouldNotProcessOtherEvents() throws Exception {
public void shouldNotProcessOtherEvents() {

RedisPubSubAdapter<String, String> adapter = getAdapter();
sut.bind(refreshRunnable);

adapter.message("*", "*", "irreleval");

Expand All @@ -203,9 +206,10 @@ public void shouldNotProcessOtherEvents() throws Exception {
}

@Test
public void shouldProcessElectedLeader() throws Exception {
public void shouldProcessElectedLeader() {

RedisPubSubAdapter<String, String> adapter = getAdapter();
sut.bind(refreshRunnable);

adapter.message("*", "+elected-leader", "master mymaster 127.0.0.1");

Expand All @@ -215,9 +219,10 @@ public void shouldProcessElectedLeader() throws Exception {
}

@Test
public void shouldProcessSwitchMaster() throws Exception {
public void shouldProcessSwitchMaster() {

RedisPubSubAdapter<String, String> adapter = getAdapter();
sut.bind(refreshRunnable);

adapter.message("*", "+switch-master", "mymaster 127.0.0.1");

Expand All @@ -227,9 +232,10 @@ public void shouldProcessSwitchMaster() throws Exception {
}

@Test
public void shouldProcessFixSlaveConfig() throws Exception {
public void shouldProcessFixSlaveConfig() {

RedisPubSubAdapter<String, String> adapter = getAdapter();
sut.bind(refreshRunnable);

adapter.message("*", "fix-slave-config", "@ mymaster 127.0.0.1");

Expand All @@ -239,9 +245,10 @@ public void shouldProcessFixSlaveConfig() throws Exception {
}

@Test
public void shouldProcessConvertToSlave() throws Exception {
public void shouldProcessConvertToSlave() {

RedisPubSubAdapter<String, String> adapter = getAdapter();
sut.bind(refreshRunnable);

adapter.message("*", "+convert-to-slave", "@ mymaster 127.0.0.1");

Expand All @@ -251,9 +258,10 @@ public void shouldProcessConvertToSlave() throws Exception {
}

@Test
public void shouldProcessRoleChange() throws Exception {
public void shouldProcessRoleChange() {

RedisPubSubAdapter<String, String> adapter = getAdapter();
sut.bind(refreshRunnable);

adapter.message("*", "+role-change", "@ mymaster 127.0.0.1");

Expand All @@ -263,9 +271,10 @@ public void shouldProcessRoleChange() throws Exception {
}

@Test
public void shouldProcessFailoverEnd() throws Exception {
public void shouldProcessFailoverEnd() {

RedisPubSubAdapter<String, String> adapter = getAdapter();
sut.bind(refreshRunnable);

adapter.message("*", "failover-end", "");

Expand All @@ -275,9 +284,10 @@ public void shouldProcessFailoverEnd() throws Exception {
}

@Test
public void shouldProcessFailoverTimeout() throws Exception {
public void shouldProcessFailoverTimeout() {

RedisPubSubAdapter<String, String> adapter = getAdapter();
sut.bind(refreshRunnable);

adapter.message("*", "failover-end-for-timeout", "");

Expand All @@ -287,9 +297,10 @@ public void shouldProcessFailoverTimeout() throws Exception {
}

@Test
public void shouldExecuteOnceWithinATimeout() throws Exception {
public void shouldExecuteOnceWithinATimeout() {

RedisPubSubAdapter<String, String> adapter = getAdapter();
sut.bind(refreshRunnable);

adapter.message("*", "failover-end-for-timeout", "");
adapter.message("*", "failover-end-for-timeout", "");
Expand All @@ -300,9 +311,10 @@ public void shouldExecuteOnceWithinATimeout() throws Exception {
}

@Test
public void shouldNotProcessIfExecutorIsShuttingDown() throws Exception {
public void shouldNotProcessIfExecutorIsShuttingDown() {

RedisPubSubAdapter<String, String> adapter = getAdapter();
sut.bind(refreshRunnable);
when(eventExecutors.isShuttingDown()).thenReturn(true);

adapter.message("*", "failover-end-for-timeout", "");
Expand All @@ -312,8 +324,6 @@ public void shouldNotProcessIfExecutorIsShuttingDown() throws Exception {
}

private RedisPubSubAdapter<String, String> getAdapter() {

sut.bind(refreshRunnable);
return (RedisPubSubAdapter<String, String>) ReflectionTestUtils.getField(sut, "adapter");
}
}

0 comments on commit 8e79e04

Please sign in to comment.