From 1027c2cdbbbac61436b8235b2be1729af4146d17 Mon Sep 17 00:00:00 2001 From: tison Date: Tue, 21 May 2024 22:59:34 +0800 Subject: [PATCH] CURATOR-696. Fix double leader for LeaderLatch (#500) Signed-off-by: tison Co-authored-by: Kezhu Wang --- curator-recipes/pom.xml | 6 ++ .../framework/recipes/leader/LeaderLatch.java | 78 +++++++++++-------- .../recipes/leader/TestLeaderLatch.java | 65 +++++++++++++++- 3 files changed, 116 insertions(+), 33 deletions(-) diff --git a/curator-recipes/pom.xml b/curator-recipes/pom.xml index 927af0c28..1ac5a8b75 100644 --- a/curator-recipes/pom.xml +++ b/curator-recipes/pom.xml @@ -83,6 +83,12 @@ test + + org.assertj + assertj-core + test + + org.awaitility awaitility diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java index 80509dbf8..4d20f9afd 100644 --- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java +++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java @@ -509,7 +509,7 @@ public void processResult(CuratorFramework client, CuratorEvent event) throws Ex getChildren(); } } else { - log.error("getChildren() failed. rc = " + event.getResultCode()); + log.error("getChildren() failed. rc = {}", event.getResultCode()); } } }; @@ -548,43 +548,57 @@ private void checkLeadership(List children) throws Exception { log.debug("checkLeadership with id: {}, ourPath: {}, children: {}", id, localOurPath, sortedChildren); if (ourIndex < 0) { - log.error("Can't find our node. Resetting. Index: " + ourIndex); + log.error("Can't find our node. Resetting. 
Index: {}", ourIndex); reset(); - } else if (ourIndex == 0) { - lastPathIsLeader.set(localOurPath); - setLeadership(true); - } else { - setLeadership(false); - String watchPath = sortedChildren.get(ourIndex - 1); - Watcher watcher = new Watcher() { - @Override - public void process(WatchedEvent event) { - if (state.get() == State.STARTED && event.getType() == Event.EventType.NodeDeleted) { - try { - getChildren(); - } catch (Exception ex) { - ThreadUtils.checkInterrupted(ex); - log.error("An error occurred checking the leadership.", ex); + return; + } + + if (ourIndex == 0) { + client.getData() + .inBackground((client, event) -> { + final long ephemeralOwner = + event.getStat() != null ? event.getStat().getEphemeralOwner() : -1; + final long thisSessionId = + client.getZookeeperClient().getZooKeeper().getSessionId(); + if (ephemeralOwner != thisSessionId) { + // this node is gone - reset + reset(); + } else { + lastPathIsLeader.set(localOurPath); + setLeadership(true); } - } - } - }; + }) + .forPath(localOurPath); + return; + } - BackgroundCallback callback = new BackgroundCallback() { - @Override - public void processResult(CuratorFramework client, CuratorEvent event) throws Exception { - if (event.getResultCode() == KeeperException.Code.NONODE.intValue()) { - // previous node is gone - retry getChildren + setLeadership(false); + String watchPath = sortedChildren.get(ourIndex - 1); + Watcher watcher = new Watcher() { + @Override + public void process(WatchedEvent event) { + if (state.get() == State.STARTED && event.getType() == Event.EventType.NodeDeleted) { + try { getChildren(); + } catch (Exception ex) { + ThreadUtils.checkInterrupted(ex); + log.error("An error occurred checking the leadership.", ex); } } - }; - // use getData() instead of exists() to avoid leaving unneeded watchers which is a type of resource leak - client.getData() - .usingWatcher(watcher) - .inBackground(callback) - .forPath(ZKPaths.makePath(latchPath, watchPath)); - } + } + }; + + 
BackgroundCallback callback = new BackgroundCallback() { + @Override + public void processResult(CuratorFramework client, CuratorEvent event) throws Exception { + if (event.getResultCode() == KeeperException.Code.NONODE.intValue()) { + // previous node is gone - retry getChildren + getChildren(); + } + } + }; + // use getData() instead of exists() to avoid leaving unneeded watchers which is a type of resource leak + client.getData().usingWatcher(watcher).inBackground(callback).forPath(ZKPaths.makePath(latchPath, watchPath)); } private void getChildren() throws Exception { diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java index dc79f6668..528b317ff 100644 --- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java +++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java @@ -19,6 +19,7 @@ package org.apache.curator.framework.recipes.leader; +import static org.assertj.core.api.Assertions.assertThat; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertNull; @@ -72,9 +73,12 @@ import org.awaitility.Awaitility; import org.junit.jupiter.api.Tag; import org.junit.jupiter.api.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; @Tag(CuratorTestBase.zk35TestCompatibilityGroup) public class TestLeaderLatch extends BaseClassForTests { + private static final Logger LOG = LoggerFactory.getLogger(TestLeaderLatch.class); private static final String PATH_NAME = "/one/two/me"; private static final int MAX_LOOPS = 5; @@ -208,6 +212,58 @@ public void testWatchedNodeDeletedOnReconnect() throws Exception { } } + @Test + public void testSessionInterruptionDoNotCauseBrainSplit() throws Exception { + final String latchPath 
= "/testSessionInterruptionDoNotCauseBrainSplit"; + final Timing2 timing = new Timing2(); + final BlockingQueue events0 = new LinkedBlockingQueue<>(); + final BlockingQueue events1 = new LinkedBlockingQueue<>(); + + final List closeableResources = new ArrayList<>(); + try { + final String id0 = "id0"; + final CuratorFramework client0 = createAndStartClient(server.getConnectString(), timing, id0, null); + closeableResources.add(client0); + final LeaderLatch latch0 = createAndStartLeaderLatch(client0, latchPath, id0, events0); + closeableResources.add(latch0); + + assertThat(events0.poll(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS)) + .isNotNull() + .isEqualTo(new TestEvent(id0, TestEventType.GAINED_LEADERSHIP)); + + final String id1 = "id1"; + final CuratorFramework client1 = createAndStartClient(server.getConnectString(), timing, id1, null); + closeableResources.add(client1); + final LeaderLatch latch1 = createAndStartLeaderLatch(client1, latchPath, id1, events1); + closeableResources.add(latch1); + + // wait for the non-leading LeaderLatch (i.e. latch1) instance to be done with its creation + // this call is time-consuming but necessary because we don't have a handle to detect the end of the reset + // call + timing.forWaiting().sleepABit(); + + assertTrue(latch0.hasLeadership()); + assertFalse(latch1.hasLeadership()); + + client0.getZookeeperClient().getZooKeeper().getTestable().injectSessionExpiration(); + + assertThat(events1.poll(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS)) + .isNotNull() + .isEqualTo(new TestEvent(id1, TestEventType.GAINED_LEADERSHIP)); + + assertThat(events0.poll(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS)) + .isNotNull() + .isEqualTo(new TestEvent(id0, TestEventType.LOST_LEADERSHIP)); + // No leadership granted to old leader after session changed, hence no brain split. 
+ assertThat(events0.poll(20, TimeUnit.MILLISECONDS)) + .isNotEqualTo(new TestEvent(id0, TestEventType.GAINED_LEADERSHIP)); + } finally { + // reverse is necessary for closing the LeaderLatch instances before closing the corresponding client + Collections.reverse(closeableResources); + closeableResources.forEach(CloseableUtils::closeQuietly); + } + } + @Test public void testResettingOfLeadershipAfterConcurrentLeadershipChange() throws Exception { final String latchPath = "/test"; @@ -316,7 +372,9 @@ private static CuratorFramework createAndStartClient( client.getConnectionStateListenable().addListener((client1, newState) -> { if (newState == ConnectionState.CONNECTED) { - events.add(new TestEvent(id, TestEventType.GAINED_CONNECTION)); + if (events != null) { + events.add(new TestEvent(id, TestEventType.GAINED_CONNECTION)); + } } }); @@ -366,6 +424,11 @@ public boolean equals(Object o) { TestEvent testEvent = (TestEvent) o; return Objects.equals(id, testEvent.id) && eventType == testEvent.eventType; } + + @Override + public String toString() { + return "TestEvent{" + "eventType=" + eventType + ", id='" + id + '\'' + '}'; + } } @Test