From 8e79e04a5e90b8340be5301d4bd1c2ebf5176884 Mon Sep 17 00:00:00 2001 From: Mark Paluch Date: Wed, 13 Dec 2017 12:09:07 +0100 Subject: [PATCH] Fix argument order for Sentinel-related Pub/Sub messages in Master/Slave #668 Lettuce now evaluates the correct message and channel arguments to determine whether it should refresh/reconnect sentinels. --- .../masterslave/SentinelTopologyRefresh.java | 8 +-- .../SentinelTopologyRefreshTest.java | 52 +++++++++++-------- 2 files changed, 35 insertions(+), 25 deletions(-) diff --git a/src/main/java/io/lettuce/core/masterslave/SentinelTopologyRefresh.java b/src/main/java/io/lettuce/core/masterslave/SentinelTopologyRefresh.java index f9f3557bcc..60698a8948 100644 --- a/src/main/java/io/lettuce/core/masterslave/SentinelTopologyRefresh.java +++ b/src/main/java/io/lettuce/core/masterslave/SentinelTopologyRefresh.java @@ -1,5 +1,5 @@ /* - * Copyright 2011-2016 the original author or authors. + * Copyright 2011-2017 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -33,7 +33,6 @@ import io.lettuce.core.protocol.LettuceCharsets; import io.lettuce.core.pubsub.RedisPubSubAdapter; import io.lettuce.core.pubsub.StatefulRedisPubSubConnection; - import io.netty.util.concurrent.EventExecutorGroup; import io.netty.util.internal.logging.InternalLogger; import io.netty.util.internal.logging.InternalLoggerFactory; @@ -237,7 +236,7 @@ protected void onEvent(Consumer timeoutConsumer) { } } - static interface MessagePredicate extends BiPredicate { + interface MessagePredicate extends BiPredicate { @Override boolean test(String message, String channel); @@ -294,7 +293,8 @@ public boolean test(String channel, String message) { private static class SentinelReconnectMessagePredicate implements MessagePredicate { @Override - public boolean test(String message, String channel) { + public boolean test(String channel, String message) { + if (channel.equals("+sentinel")) { return true; } diff --git a/src/test/java/io/lettuce/core/masterslave/SentinelTopologyRefreshTest.java b/src/test/java/io/lettuce/core/masterslave/SentinelTopologyRefreshTest.java index af8471bd8b..c0bfb270d1 100644 --- a/src/test/java/io/lettuce/core/masterslave/SentinelTopologyRefreshTest.java +++ b/src/test/java/io/lettuce/core/masterslave/SentinelTopologyRefreshTest.java @@ -1,5 +1,5 @@ /* - * Copyright 2011-2016 the original author or authors. + * Copyright 2011-2017 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -42,7 +42,6 @@ import io.lettuce.core.pubsub.StatefulRedisPubSubConnection; import io.lettuce.core.pubsub.api.async.RedisPubSubAsyncCommands; import io.lettuce.core.resource.ClientResources; - import io.netty.util.concurrent.EventExecutorGroup; /** @@ -79,7 +78,7 @@ public class SentinelTopologyRefreshTest { private SentinelTopologyRefresh sut; @Before - public void before() throws Exception { + public void before() { when(redisClient.connectPubSub(any(StringCodec.class), eq(host1))).thenReturn(connection); when(clientResources.eventExecutorGroup()).thenReturn(eventExecutors); @@ -90,7 +89,7 @@ public void before() throws Exception { } @Test - public void bind() throws Exception { + public void bind() { sut.bind(refreshRunnable); @@ -99,7 +98,7 @@ public void bind() throws Exception { } @Test - public void bindWithSecondSentinelFails() throws Exception { + public void bindWithSecondSentinelFails() { sut = new SentinelTopologyRefresh(redisClient, "mymaster", Arrays.asList(host1, host2)); @@ -114,7 +113,7 @@ public void bindWithSecondSentinelFails() throws Exception { } @Test - public void bindWithSentinelRecovery() throws Exception { + public void bindWithSentinelRecovery() { StatefulRedisPubSubConnection connection2 = mock(StatefulRedisPubSubConnection.class); RedisPubSubAsyncCommands async2 = mock(RedisPubSubAsyncCommands.class); @@ -138,13 +137,16 @@ public void bindWithSentinelRecovery() throws Exception { adapter.message("*", "+sentinel", "sentinel c14cc895bb0479c91312cee0e0440b7d99ad367b 127.0.0.1 26380 @ mymaster 127.0.0.1 6483"); + verify(eventExecutors, times(1)).schedule(captor.capture(), anyLong(), any()); + captor.getValue().run(); + verify(redisClient, times(2)).connectPubSub(any(), eq(host2)); assertThat(connections).containsKey(host1).containsKey(host2).hasSize(2); verify(refreshRunnable, never()).run(); } @Test - public void bindDuringClose() throws Exception { + public void bindDuringClose() { sut = new SentinelTopologyRefresh(redisClient, "mymaster", Arrays.asList(host1, host2)); @@ -171,7 +173,7 @@ public void bindDuringClose() throws Exception { } @Test - public void close() throws Exception { + public void close() { sut.bind(refreshRunnable); sut.close(); @@ -181,7 +183,7 @@ public void close() throws Exception { } @Test - public void bindAfterClose() throws Exception { + public void bindAfterClose() { sut.close(); sut.bind(refreshRunnable); @@ -191,9 +193,10 @@ public void bindAfterClose() throws Exception { } @Test - public void shouldNotProcessOtherEvents() throws Exception { + public void shouldNotProcessOtherEvents() { RedisPubSubAdapter adapter = getAdapter(); + sut.bind(refreshRunnable); adapter.message("*", "*", "irreleval"); @@ -203,9 +206,10 @@ public void shouldNotProcessOtherEvents() throws Exception { } @Test - public void shouldProcessElectedLeader() throws Exception { + public void shouldProcessElectedLeader() { RedisPubSubAdapter adapter = getAdapter(); + sut.bind(refreshRunnable); adapter.message("*", "+elected-leader", "master mymaster 127.0.0.1"); @@ -215,9 +219,10 @@ public void shouldProcessElectedLeader() throws Exception { } @Test - public void shouldProcessSwitchMaster() throws Exception { + public void shouldProcessSwitchMaster() { RedisPubSubAdapter adapter = getAdapter(); + sut.bind(refreshRunnable); adapter.message("*", "+switch-master", "mymaster 127.0.0.1"); @@ -227,9 +232,10 @@ public void shouldProcessSwitchMaster() throws Exception { } @Test - public void shouldProcessFixSlaveConfig() throws Exception { + public void shouldProcessFixSlaveConfig() { RedisPubSubAdapter adapter = getAdapter(); + sut.bind(refreshRunnable); adapter.message("*", "fix-slave-config", "@ mymaster 127.0.0.1"); @@ -239,9 +245,10 @@ public void shouldProcessFixSlaveConfig() throws Exception { } @Test - public void shouldProcessConvertToSlave() throws Exception { + public void shouldProcessConvertToSlave() { RedisPubSubAdapter adapter = getAdapter(); + sut.bind(refreshRunnable); adapter.message("*", "+convert-to-slave", "@ mymaster 127.0.0.1"); @@ -251,9 +258,10 @@ public void shouldProcessConvertToSlave() throws Exception { } @Test - public void shouldProcessRoleChange() throws Exception { + public void shouldProcessRoleChange() { RedisPubSubAdapter adapter = getAdapter(); + sut.bind(refreshRunnable); adapter.message("*", "+role-change", "@ mymaster 127.0.0.1"); @@ -263,9 +271,10 @@ public void shouldProcessRoleChange() throws Exception { } @Test - public void shouldProcessFailoverEnd() throws Exception { + public void shouldProcessFailoverEnd() { RedisPubSubAdapter adapter = getAdapter(); + sut.bind(refreshRunnable); adapter.message("*", "failover-end", ""); @@ -275,9 +284,10 @@ public void shouldProcessFailoverEnd() throws Exception { } @Test - public void shouldProcessFailoverTimeout() throws Exception { + public void shouldProcessFailoverTimeout() { RedisPubSubAdapter adapter = getAdapter(); + sut.bind(refreshRunnable); adapter.message("*", "failover-end-for-timeout", ""); @@ -287,9 +297,10 @@ public void shouldProcessFailoverTimeout() throws Exception { } @Test - public void shouldExecuteOnceWithinATimeout() throws Exception { + public void shouldExecuteOnceWithinATimeout() { RedisPubSubAdapter adapter = getAdapter(); + sut.bind(refreshRunnable); adapter.message("*", "failover-end-for-timeout", ""); adapter.message("*", "failover-end-for-timeout", ""); @@ -300,9 +311,10 @@ public void shouldExecuteOnceWithinATimeout() throws Exception { } @Test - public void shouldNotProcessIfExecutorIsShuttingDown() throws Exception { + public void shouldNotProcessIfExecutorIsShuttingDown() { RedisPubSubAdapter adapter = getAdapter(); + sut.bind(refreshRunnable); when(eventExecutors.isShuttingDown()).thenReturn(true); adapter.message("*", "failover-end-for-timeout", ""); @@ -312,8 +324,6 @@ public void shouldNotProcessIfExecutorIsShuttingDown() throws Exception { } private RedisPubSubAdapter getAdapter() { - - sut.bind(refreshRunnable); return (RedisPubSubAdapter) ReflectionTestUtils.getField(sut, "adapter"); } }