Skip to content

Commit

Permalink
SOLR-11535: Fix race condition in singleton-per-collection StateWatcher creation (#151)
Browse files Browse the repository at this point in the history

back-ported from active upstream PR as of commit
405e1a8
  • Loading branch information
magibney authored Oct 6, 2023
1 parent 38c7694 commit 84d0d39
Show file tree
Hide file tree
Showing 4 changed files with 105 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@ public String toString() {
}

// synchronized due to SOLR-11535
// TODO: remove `synchronized`, now that SOLR-11535 is fixed
@Override
public synchronized boolean onStateChanged(Set<String> liveNodes, DocCollection collectionState) {
if (log.isDebugEnabled()) {
Expand Down
1 change: 1 addition & 0 deletions solr/core/src/java/org/apache/solr/cloud/ZkController.java
Original file line number Diff line number Diff line change
Expand Up @@ -2883,6 +2883,7 @@ public UnloadCoreOnDeletedWatcher(String coreNodeName, String shard, String core

@Override
// synchronized due to SOLR-11535
// TODO: remove `synchronized`, now that SOLR-11535 is fixed
public synchronized boolean onStateChanged(DocCollection collectionState) {
if (getCoreContainer().getCoreDescriptor(coreName) == null) return true;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,16 @@
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.atomic.LongAdder;
import org.apache.lucene.util.IOUtils;
import org.apache.solr.SolrTestCaseJ4;
import org.apache.solr.cloud.OverseerTest;
Expand Down Expand Up @@ -669,6 +674,73 @@ public void testWatchRaceCondition() throws Exception {
}
}

/**
 * Reproduces the scenario in which state updates driven by watch notifications compete with the
 * removal of collection watches.
 *
 * <p>With a single map backing both "collection watches" and "latest state of watched
 * collection", that race should be gone: the tracking watcher registered here must be invoked
 * exactly once for every znode version written by the test.
 */
public void testStateWatcherRaceCondition() throws Exception {
  final int extraWatchers = 10;
  final int iterations = 10;
  ZkStateWriter stateWriter = fixture.writer;
  final ZkStateReader stateReader = fixture.reader;
  fixture.zkClient.makePath(ZkStateReader.COLLECTIONS_ZKNODE + "/c1", true);

  // Churn: register and immediately unregister a number of throwaway watchers.
  for (int n = 0; n < extraWatchers; n++) {
    DocCollectionWatcher throwaway = (ignoredState) -> false;
    try {
      stateReader.registerDocCollectionWatcher("c1", throwaway);
    } finally {
      stateReader.removeDocCollectionWatcher("c1", throwaway);
    }
  }

  // znode version -> number of callback invocations observed for that version
  final ConcurrentHashMap<Integer, LongAdder> invocationsByVersion = new ConcurrentHashMap<>();
  // two parties: the watcher callback and the test loop below
  CyclicBarrier callbackBarrier = new CyclicBarrier(2);
  stateReader.registerDocCollectionWatcher(
      "c1",
      (collectionState) -> {
        if (collectionState == null) {
          // nothing to record while the collection has no state; stay registered
          return false;
        }
        try {
          callbackBarrier.await(250, TimeUnit.MILLISECONDS);
        } catch (InterruptedException | TimeoutException | BrokenBarrierException e) {
          throw new RuntimeException(e);
        }
        invocationsByVersion
            .computeIfAbsent(collectionState.getZNodeVersion(), (version) -> new LongAdder())
            .increment();
        // never ask to be removed
        return false;
      });

  ClusterState currentClusterState = stateReader.getClusterState();
  int dataVersion = -1;
  for (int round = 0; round < iterations; round++) {
    // create (first round) or update (later rounds) the collection
    DocCollection updated =
        DocCollection.create(
            "c1",
            new HashMap<>(),
            Map.of(ZkStateReader.CONFIGNAME_PROP, ConfigSetsHandler.DEFAULT_CONFIGSET_NAME),
            DocRouter.DEFAULT,
            dataVersion,
            PerReplicaStatesFetcher.getZkClientPrsSupplier(
                fixture.zkClient, DocCollection.getCollectionPath("c1")));
    ZkWriteCommand writeCommand = new ZkWriteCommand("c1", updated);
    stateWriter.enqueueUpdate(currentClusterState, Collections.singletonList(writeCommand), null);
    currentClusterState = stateWriter.writePendingUpdates();
    // rendezvous with the watcher callback before moving on
    callbackBarrier.await(250, TimeUnit.MILLISECONDS);
    // create an additional znode named "c1<round>" under the collections root
    fixture.zkClient.makePath(ZkStateReader.COLLECTIONS_ZKNODE + "/c1" + round, true);
    dataVersion = currentClusterState.getCollectionOrNull("c1").getZNodeVersion();
  }

  // one map entry per iteration ...
  assertEquals(iterations, invocationsByVersion.size());
  // ... each of which was hit exactly once
  String failureMessage = "wrong number of watchers (expected 1): " + invocationsByVersion;
  boolean onceForEveryVersion =
      invocationsByVersion.values().stream().allMatch((adder) -> adder.sum() == 1L);
  assertTrue(failureMessage, onceForEveryVersion);
}

/**
* Ensure that collection state fetching (getCollectionLive etc.) would not throw exception when
* the state.json is deleted in between the state.json read and PRS entries read
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -372,6 +372,11 @@ private StatefulCollectionWatch compute(

/**
 * A {@link CollectionWatch} over {@link DocCollectionWatcher}s that additionally carries a
 * {@link DocCollection} state and the {@link StateWatcher} currently associated with the
 * collection.
 */
private static class StatefulCollectionWatch extends CollectionWatch<DocCollectionWatcher> {
  /**
   * The watcher currently associated with this entry. It is not final: besides being set at
   * construction it is reassigned when collections are refreshed, and a {@link StateWatcher}
   * compares it against itself to detect that it has been replaced; hence {@code volatile}.
   */
  private volatile StateWatcher associatedWatcher;

  // NOTE(review): presumably the most recently observed state of the watched collection; the
  // reads and writes of this field are not visible in this hunk -- confirm.
  private DocCollection currentState;

  private StatefulCollectionWatch(StateWatcher initialWatcher) {
    associatedWatcher = initialWatcher;
  }
}

public static final Set<String> KNOWN_CLUSTER_PROPS =
Expand Down Expand Up @@ -642,8 +647,10 @@ private void constructState(Set<String> changedCollections) {

/** Refresh collections. */
private void refreshCollections() {
for (String coll : collectionWatches.watchedCollections()) {
new StateWatcher(coll).refreshAndWatch();
for (Entry<String, StatefulCollectionWatch> e : collectionWatches.watchedCollectionEntries()) {
StateWatcher newStateWatcher = new StateWatcher(e.getKey());
e.getValue().associatedWatcher = newStateWatcher;
newStateWatcher.refreshAndWatch();
}
}

Expand Down Expand Up @@ -1331,8 +1338,9 @@ public void process(WatchedEvent event) {
return;
}

if (!collectionWatches.watchedCollections().contains(coll)) {
// This collection is no longer interesting, stop watching.
StatefulCollectionWatch scw = collectionWatches.statefulWatchesByCollectionName.get(coll);
if (scw == null || scw.associatedWatcher != this) {
// Collection no longer interesting, or we have been replaced by a different watcher.
log.debug("Uninteresting collection {}", coll);
return;
}
Expand Down Expand Up @@ -1668,19 +1676,21 @@ public static String getCollectionPath(String coll) {
* @see ZkStateReader#unregisterCore(String)
*/
public void registerCore(String collection) {
AtomicBoolean reconstructState = new AtomicBoolean(false);
AtomicReference<StateWatcher> newWatcherRef = new AtomicReference<>();
collectionWatches.compute(
collection,
(k, v) -> {
if (v == null) {
reconstructState.set(true);
v = new StatefulCollectionWatch();
StateWatcher stateWatcher = new StateWatcher(collection);
newWatcherRef.set(stateWatcher);
v = new StatefulCollectionWatch(stateWatcher);
}
v.coreRefCount++;
return v;
});
if (reconstructState.get()) {
new StateWatcher(collection).refreshAndWatch();
StateWatcher newWatcher = newWatcherRef.get();
if (newWatcher != null) {
newWatcher.refreshAndWatch();
}
}

Expand Down Expand Up @@ -1752,26 +1762,29 @@ public void registerCollectionStateWatcher(
* <p>The Watcher will automatically be removed when it's <code>onStateChanged</code> returns
* <code>true</code>
*/
public void registerDocCollectionWatcher(String collection, DocCollectionWatcher stateWatcher) {
AtomicBoolean watchSet = new AtomicBoolean(false);
public void registerDocCollectionWatcher(
String collection, DocCollectionWatcher docCollectionWatcher) {
AtomicReference<StateWatcher> newWatcherRef = new AtomicReference<>();
collectionWatches.compute(
collection,
(k, v) -> {
if (v == null) {
v = new StatefulCollectionWatch();
watchSet.set(true);
StateWatcher stateWatcher = new StateWatcher(collection);
newWatcherRef.set(stateWatcher);
v = new StatefulCollectionWatch(stateWatcher);
}
v.stateWatchers.add(stateWatcher);
v.stateWatchers.add(docCollectionWatcher);
return v;
});

if (watchSet.get()) {
new StateWatcher(collection).refreshAndWatch();
StateWatcher newWatcher = newWatcherRef.get();
if (newWatcher != null) {
newWatcher.refreshAndWatch();
}

DocCollection state = clusterState.getCollectionOrNull(collection);
if (stateWatcher.onStateChanged(state) == true) {
removeDocCollectionWatcher(collection, stateWatcher);
if (docCollectionWatcher.onStateChanged(state) == true) {
removeDocCollectionWatcher(collection, docCollectionWatcher);
}
}

Expand Down

0 comments on commit 84d0d39

Please sign in to comment.