Skip to content

Commit

Permalink
SOLR-17004: ZkStateReader waitForState should check clusterState before using watchers (#1945)
Browse files Browse the repository at this point in the history
  • Loading branch information
risdenk committed Oct 2, 2023
1 parent 28be875 commit 676737b
Show file tree
Hide file tree
Showing 3 changed files with 38 additions and 7 deletions.
2 changes: 2 additions & 0 deletions solr/CHANGES.txt
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,8 @@ Optimizations

* SOLR-16989: Optimize and consolidate reuse of DocValues iterators for value retrieval (Michael Gibney)

* SOLR-17004: ZkStateReader waitForState should check clusterState before using watchers (Kevin Risden)

Bug Fixes
---------------------

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,15 +79,22 @@ public void testSimpleSliceLeaderElection() throws Exception {
.getCoreDescriptor()
.getCloudDescriptor()
.getShardId());
String jettyNodeName = jetty.getNodeName(); // must get before shutdown
jetty.stop();
stoppedRunners.add(jetty);
waitForState(
"Leader should not be " + jettyNodeName,
collection,
(n, c) ->
c.getLeader("shard1") != null
&& !jettyNodeName.equals(c.getLeader("shard1").getNodeName()));
}

for (JettySolrRunner runner : stoppedRunners) {
runner.start();
}
waitForState(
"Expected to see nodes come back " + collection, collection, (n, c) -> n.size() == 6);
"Expected to see nodes come back for " + collection, collection, (n, c) -> n.size() == 6);
CollectionAdminRequest.deleteCollection(collection).process(cluster.getSolrClient());

// testLeaderElectionAfterClientTimeout
Expand All @@ -99,6 +106,7 @@ public void testSimpleSliceLeaderElection() throws Exception {
// timeout the leader
String leader = getLeader(collection);
JettySolrRunner jetty = getRunner(leader);
assertNotNull(jetty);
cluster.expireZkSession(jetty);

for (int i = 0; i < 60; i++) { // wait till leader is changed
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -936,7 +936,6 @@ public Replica getLeaderRetry(String collection, String shard) throws Interrupte
/** Get shard leader properties, with retry if none exist. */
public Replica getLeaderRetry(String collection, String shard, int timeout)
throws InterruptedException {
AtomicReference<DocCollection> coll = new AtomicReference<>();
AtomicReference<Replica> leader = new AtomicReference<>();
try {
waitForState(
Expand All @@ -945,7 +944,6 @@ public Replica getLeaderRetry(String collection, String shard, int timeout)
TimeUnit.MILLISECONDS,
(n, c) -> {
if (c == null) return false;
coll.set(c);
Replica l = getLeader(n, c, shard);
if (l != null) {
log.debug("leader found for {}/{} to be {}", collection, shard, l);
Expand Down Expand Up @@ -1802,6 +1800,18 @@ public void waitForState(
throw new AlreadyClosedException();
}

// Check predicate against known clusterState before trying to add watchers
if (clusterState != null) {
Set<String> liveNodes = clusterState.getLiveNodes();
DocCollection docCollection = clusterState.getCollectionOrNull(collection);
if (liveNodes != null && docCollection != null) {
if (predicate.matches(liveNodes, docCollection)) {
log.debug("Found {} directly in clusterState", predicate);
return;
}
}
}

final CountDownLatch latch = new CountDownLatch(1);
waitLatches.add(latch);
AtomicReference<DocCollection> docCollection = new AtomicReference<>();
Expand Down Expand Up @@ -1855,12 +1865,23 @@ public DocCollection waitForState(
throw new AlreadyClosedException();
}

// Check predicate against known clusterState before trying to add watchers
if (clusterState != null) {
DocCollection docCollection = clusterState.getCollectionOrNull(collection);
if (docCollection != null) {
if (predicate.test(docCollection)) {
log.debug("Found {} directly in clusterState", predicate);
return docCollection;
}
}
}

final CountDownLatch latch = new CountDownLatch(1);
waitLatches.add(latch);
AtomicReference<DocCollection> docCollection = new AtomicReference<>();
AtomicReference<DocCollection> docCollectionReference = new AtomicReference<>();
DocCollectionWatcher watcher =
(c) -> {
docCollection.set(c);
docCollectionReference.set(c);
boolean matches = predicate.test(c);
if (matches) latch.countDown();

Expand All @@ -1875,8 +1896,8 @@ public DocCollection waitForState(
"Timeout waiting to see state for collection="
+ collection
+ " :"
+ docCollection.get());
return docCollection.get();
+ docCollectionReference.get());
return docCollectionReference.get();
} finally {
removeDocCollectionWatcher(collection, watcher);
waitLatches.remove(latch);
Expand Down

0 comments on commit 676737b

Please sign in to comment.