From 676737b65e2c5b7b858ef421ef17d89dabffe2c1 Mon Sep 17 00:00:00 2001 From: Kevin Risden Date: Fri, 29 Sep 2023 14:02:04 -0400 Subject: [PATCH] SOLR-17004: ZkStateReader waitForState should check clusterState before using watchers (#1945) --- solr/CHANGES.txt | 2 ++ .../cloud/LeaderElectionIntegrationTest.java | 10 +++++- .../solr/common/cloud/ZkStateReader.java | 33 +++++++++++++++---- 3 files changed, 38 insertions(+), 7 deletions(-) diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt index d903f12c4b4..c8bb3f212a0 100644 --- a/solr/CHANGES.txt +++ b/solr/CHANGES.txt @@ -96,6 +96,8 @@ Optimizations * SOLR-16989: Optimize and consolidate reuse of DocValues iterators for value retrieval (Michael Gibney) +* SOLR-17004: ZkStateReader waitForState should check clusterState before using watchers (Kevin Risden) + Bug Fixes --------------------- diff --git a/solr/core/src/test/org/apache/solr/cloud/LeaderElectionIntegrationTest.java b/solr/core/src/test/org/apache/solr/cloud/LeaderElectionIntegrationTest.java index 4491476f030..5da2f862ea2 100644 --- a/solr/core/src/test/org/apache/solr/cloud/LeaderElectionIntegrationTest.java +++ b/solr/core/src/test/org/apache/solr/cloud/LeaderElectionIntegrationTest.java @@ -79,15 +79,22 @@ public void testSimpleSliceLeaderElection() throws Exception { .getCoreDescriptor() .getCloudDescriptor() .getShardId()); + String jettyNodeName = jetty.getNodeName(); // must get before shutdown jetty.stop(); stoppedRunners.add(jetty); + waitForState( + "Leader should not be " + jettyNodeName, + collection, + (n, c) -> + c.getLeader("shard1") != null + && !jettyNodeName.equals(c.getLeader("shard1").getNodeName())); } for (JettySolrRunner runner : stoppedRunners) { runner.start(); } waitForState( - "Expected to see nodes come back " + collection, collection, (n, c) -> n.size() == 6); + "Expected to see nodes come back for " + collection, collection, (n, c) -> n.size() == 6); 
CollectionAdminRequest.deleteCollection(collection).process(cluster.getSolrClient()); // testLeaderElectionAfterClientTimeout @@ -99,6 +106,7 @@ public void testSimpleSliceLeaderElection() throws Exception { // timeout the leader String leader = getLeader(collection); JettySolrRunner jetty = getRunner(leader); + assertNotNull(jetty); cluster.expireZkSession(jetty); for (int i = 0; i < 60; i++) { // wait till leader is changed diff --git a/solr/solrj-zookeeper/src/java/org/apache/solr/common/cloud/ZkStateReader.java b/solr/solrj-zookeeper/src/java/org/apache/solr/common/cloud/ZkStateReader.java index 17f7bdbc5d5..4879733e7fe 100644 --- a/solr/solrj-zookeeper/src/java/org/apache/solr/common/cloud/ZkStateReader.java +++ b/solr/solrj-zookeeper/src/java/org/apache/solr/common/cloud/ZkStateReader.java @@ -936,7 +936,6 @@ public Replica getLeaderRetry(String collection, String shard) throws Interrupte /** Get shard leader properties, with retry if none exist. */ public Replica getLeaderRetry(String collection, String shard, int timeout) throws InterruptedException { - AtomicReference coll = new AtomicReference<>(); AtomicReference leader = new AtomicReference<>(); try { waitForState( @@ -945,7 +944,6 @@ public Replica getLeaderRetry(String collection, String shard, int timeout) TimeUnit.MILLISECONDS, (n, c) -> { if (c == null) return false; - coll.set(c); Replica l = getLeader(n, c, shard); if (l != null) { log.debug("leader found for {}/{} to be {}", collection, shard, l); @@ -1802,6 +1800,18 @@ public void waitForState( throw new AlreadyClosedException(); } + // Check predicate against known clusterState before trying to add watchers + if (clusterState != null) { + Set liveNodes = clusterState.getLiveNodes(); + DocCollection docCollection = clusterState.getCollectionOrNull(collection); + if (liveNodes != null && docCollection != null) { + if (predicate.matches(liveNodes, docCollection)) { + log.debug("Found {} directly in clusterState", predicate); + return; + } + } + } 
+ final CountDownLatch latch = new CountDownLatch(1); waitLatches.add(latch); AtomicReference<DocCollection> docCollection = new AtomicReference<>(); @@ -1855,12 +1865,23 @@ public DocCollection waitForState( throw new AlreadyClosedException(); } + // Check predicate against known clusterState before trying to add watchers + if (clusterState != null) { + DocCollection docCollection = clusterState.getCollectionOrNull(collection); + if (docCollection != null) { + if (predicate.test(docCollection)) { + log.debug("Found {} directly in clusterState", predicate); + return docCollection; + } + } + } + final CountDownLatch latch = new CountDownLatch(1); waitLatches.add(latch); - AtomicReference<DocCollection> docCollection = new AtomicReference<>(); + AtomicReference<DocCollection> docCollectionReference = new AtomicReference<>(); DocCollectionWatcher watcher = (c) -> { - docCollection.set(c); + docCollectionReference.set(c); boolean matches = predicate.test(c); if (matches) latch.countDown(); @@ -1875,8 +1896,8 @@ public DocCollection waitForState( "Timeout waiting to see state for collection=" + collection + " :" - + docCollection.get()); - return docCollection.get(); + + docCollectionReference.get()); + return docCollectionReference.get(); } finally { removeDocCollectionWatcher(collection, watcher); waitLatches.remove(latch);