diff --git a/solr/core/src/java/org/apache/solr/cloud/ActiveReplicaWatcher.java b/solr/core/src/java/org/apache/solr/cloud/ActiveReplicaWatcher.java index 4fe3963b7cd..7b9500a9e5b 100644 --- a/solr/core/src/java/org/apache/solr/cloud/ActiveReplicaWatcher.java +++ b/solr/core/src/java/org/apache/solr/cloud/ActiveReplicaWatcher.java @@ -119,6 +119,7 @@ public String toString() { } // synchronized due to SOLR-11535 + // TODO: remove `synchronized`, now that SOLR-11535 is fixed @Override public synchronized boolean onStateChanged(Set liveNodes, DocCollection collectionState) { if (log.isDebugEnabled()) { diff --git a/solr/core/src/java/org/apache/solr/cloud/ZkController.java b/solr/core/src/java/org/apache/solr/cloud/ZkController.java index 45293d6772a..81584eb0319 100644 --- a/solr/core/src/java/org/apache/solr/cloud/ZkController.java +++ b/solr/core/src/java/org/apache/solr/cloud/ZkController.java @@ -2883,6 +2883,7 @@ public UnloadCoreOnDeletedWatcher(String coreNodeName, String shard, String core @Override // synchronized due to SOLR-11535 + // TODO: remove `synchronized`, now that SOLR-11535 is fixed public synchronized boolean onStateChanged(DocCollection collectionState) { if (getCoreContainer().getCoreDescriptor(coreName) == null) return true; diff --git a/solr/core/src/test/org/apache/solr/cloud/overseer/ZkStateReaderTest.java b/solr/core/src/test/org/apache/solr/cloud/overseer/ZkStateReaderTest.java index 88e1dd54f77..de306340f48 100644 --- a/solr/core/src/test/org/apache/solr/cloud/overseer/ZkStateReaderTest.java +++ b/solr/core/src/test/org/apache/solr/cloud/overseer/ZkStateReaderTest.java @@ -25,11 +25,16 @@ import java.util.HashMap; import java.util.Map; import java.util.Set; +import java.util.concurrent.BrokenBarrierException; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.CyclicBarrier; import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.atomic.LongAdder; import org.apache.lucene.util.IOUtils; import org.apache.solr.SolrTestCaseJ4; import org.apache.solr.cloud.OverseerTest; @@ -669,6 +674,73 @@ public void testWatchRaceCondition() throws Exception { } } + /** + * Simulates race condition that might arise when state updates triggered by watch notification + * contend with removal of collection watches. + * + *

Such race condition should no longer exist with the new code that uses a single map for both + * "collection watches" and "latest state of watched collection" + */ + public void testStateWatcherRaceCondition() throws Exception { + ZkStateWriter writer = fixture.writer; + final ZkStateReader reader = fixture.reader; + fixture.zkClient.makePath(ZkStateReader.COLLECTIONS_ZKNODE + "/c1", true); + int extraWatchers = 10; + int iterations = 10; + for (int i = 0; i < extraWatchers; i++) { + // add and remove a bunch of watchers + DocCollectionWatcher w = (coll) -> false; + try { + reader.registerDocCollectionWatcher("c1", w); + } finally { + reader.removeDocCollectionWatcher("c1", w); + } + } + final ConcurrentHashMap invoked = new ConcurrentHashMap<>(); + CyclicBarrier barrier = new CyclicBarrier(2); + reader.registerDocCollectionWatcher( + "c1", + (coll) -> { + // add a watcher that tracks how many times it's invoked per znode version + if (coll != null) { + try { + barrier.await(250, TimeUnit.MILLISECONDS); + } catch (InterruptedException | TimeoutException | BrokenBarrierException e) { + throw new RuntimeException(e); + } + invoked.computeIfAbsent(coll.getZNodeVersion(), (k) -> new LongAdder()).increment(); + } + return false; + }); + + ClusterState clusterState = reader.getClusterState(); + int dataVersion = -1; + for (int i = 0; i < iterations; i++) { + // create or update collection + DocCollection state = + DocCollection.create( + "c1", + new HashMap<>(), + Map.of(ZkStateReader.CONFIGNAME_PROP, ConfigSetsHandler.DEFAULT_CONFIGSET_NAME), + DocRouter.DEFAULT, + dataVersion, + PerReplicaStatesFetcher.getZkClientPrsSupplier( + fixture.zkClient, DocCollection.getCollectionPath("c1"))); + ZkWriteCommand wc = new ZkWriteCommand("c1", state); + writer.enqueueUpdate(clusterState, Collections.singletonList(wc), null); + clusterState = writer.writePendingUpdates(); + barrier.await(250, TimeUnit.MILLISECONDS); // wait for the watch callback to execute + fixture.zkClient.makePath(ZkStateReader.COLLECTIONS_ZKNODE + "/c1" + i, true); + dataVersion = clusterState.getCollectionOrNull("c1").getZNodeVersion(); + } + // expect to have been invoked for each iteration ... + assertEquals(iterations, invoked.size()); + // ... and only _once_ for each iteration + assertTrue( + "wrong number of watchers (expected 1): " + invoked, + invoked.values().stream().mapToLong(LongAdder::sum).allMatch((l) -> l == 1)); + } + /** * Ensure that collection state fetching (getCollectionLive etc.) would not throw exception when * the state.json is deleted in between the state.json read and PRS entries read diff --git a/solr/solrj-zookeeper/src/java/org/apache/solr/common/cloud/ZkStateReader.java b/solr/solrj-zookeeper/src/java/org/apache/solr/common/cloud/ZkStateReader.java index 0e36734fdf1..11a020dc6b8 100644 --- a/solr/solrj-zookeeper/src/java/org/apache/solr/common/cloud/ZkStateReader.java +++ b/solr/solrj-zookeeper/src/java/org/apache/solr/common/cloud/ZkStateReader.java @@ -372,6 +372,11 @@ private StatefulCollectionWatch compute( private static class StatefulCollectionWatch extends CollectionWatch { private DocCollection currentState; + private volatile StateWatcher associatedWatcher; + + private StatefulCollectionWatch(StateWatcher associatedWatcher) { + this.associatedWatcher = associatedWatcher; + } } public static final Set KNOWN_CLUSTER_PROPS = @@ -642,8 +647,10 @@ private void constructState(Set changedCollections) { /** Refresh collections. */ private void refreshCollections() { - for (String coll : collectionWatches.watchedCollections()) { - new StateWatcher(coll).refreshAndWatch(); + for (Entry e : collectionWatches.watchedCollectionEntries()) { + StateWatcher newStateWatcher = new StateWatcher(e.getKey()); + e.getValue().associatedWatcher = newStateWatcher; + newStateWatcher.refreshAndWatch(); } } @@ -1331,8 +1338,9 @@ public void process(WatchedEvent event) { return; } - if (!collectionWatches.watchedCollections().contains(coll)) { - // This collection is no longer interesting, stop watching. + StatefulCollectionWatch scw = collectionWatches.statefulWatchesByCollectionName.get(coll); + if (scw == null || scw.associatedWatcher != this) { + // Collection no longer interesting, or we have been replaced by a different watcher. log.debug("Uninteresting collection {}", coll); return; } @@ -1668,19 +1676,21 @@ public static String getCollectionPath(String coll) { * @see ZkStateReader#unregisterCore(String) */ public void registerCore(String collection) { - AtomicBoolean reconstructState = new AtomicBoolean(false); + AtomicReference newWatcherRef = new AtomicReference<>(); collectionWatches.compute( collection, (k, v) -> { if (v == null) { - reconstructState.set(true); - v = new StatefulCollectionWatch(); + StateWatcher stateWatcher = new StateWatcher(collection); + newWatcherRef.set(stateWatcher); + v = new StatefulCollectionWatch(stateWatcher); } v.coreRefCount++; return v; }); - if (reconstructState.get()) { - new StateWatcher(collection).refreshAndWatch(); + StateWatcher newWatcher = newWatcherRef.get(); + if (newWatcher != null) { + newWatcher.refreshAndWatch(); } } @@ -1752,26 +1762,29 @@ public void registerCollectionStateWatcher( *

The Watcher will automatically be removed when it's onStateChanged returns * true */ - public void registerDocCollectionWatcher(String collection, DocCollectionWatcher stateWatcher) { - AtomicBoolean watchSet = new AtomicBoolean(false); + public void registerDocCollectionWatcher( + String collection, DocCollectionWatcher docCollectionWatcher) { + AtomicReference newWatcherRef = new AtomicReference<>(); collectionWatches.compute( collection, (k, v) -> { if (v == null) { - v = new StatefulCollectionWatch(); - watchSet.set(true); + StateWatcher stateWatcher = new StateWatcher(collection); + newWatcherRef.set(stateWatcher); + v = new StatefulCollectionWatch(stateWatcher); } - v.stateWatchers.add(stateWatcher); + v.stateWatchers.add(docCollectionWatcher); return v; }); - if (watchSet.get()) { - new StateWatcher(collection).refreshAndWatch(); + StateWatcher newWatcher = newWatcherRef.get(); + if (newWatcher != null) { + newWatcher.refreshAndWatch(); } DocCollection state = clusterState.getCollectionOrNull(collection); - if (stateWatcher.onStateChanged(state) == true) { - removeDocCollectionWatcher(collection, stateWatcher); + if (docCollectionWatcher.onStateChanged(state) == true) { + removeDocCollectionWatcher(collection, docCollectionWatcher); } }