Skip to content

Commit

Permalink
SOLR-16701: Race condition on PRS enabled collection deletion (#1460)
Browse files Browse the repository at this point in the history
(cherry picked from commit f22a51c)
  • Loading branch information
patsonluk authored and magibney committed Sep 23, 2023
1 parent 5bf79fd commit f1ba2c2
Show file tree
Hide file tree
Showing 5 changed files with 247 additions and 0 deletions.
2 changes: 2 additions & 0 deletions solr/CHANGES.txt
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,8 @@ Bug Fixes

* SOLR-16925: Fix indentation for JacksonJsonWriter (Houston Putman)

* SOLR-16701: Fix race condition on PRS enabled collection deletion (Patson Luk)

Dependency Upgrades
---------------------

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,16 +43,19 @@
import org.apache.solr.common.cloud.PerReplicaStates;
import org.apache.solr.common.cloud.PerReplicaStatesOps;
import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.cloud.Slice;
import org.apache.solr.common.cloud.SolrZkClient;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.util.CommonTestInjection;
import org.apache.solr.common.util.ExecutorUtil;
import org.apache.solr.common.util.SolrNamedThreadFactory;
import org.apache.solr.common.util.TimeSource;
import org.apache.solr.common.util.Utils;
import org.apache.solr.common.util.ZLibCompressor;
import org.apache.solr.handler.admin.ConfigSetsHandler;
import org.apache.solr.util.LogLevel;
import org.apache.solr.util.TimeOut;
import org.apache.zookeeper.KeeperException;
import org.junit.After;
import org.junit.Before;
import org.slf4j.Logger;
Expand Down Expand Up @@ -664,4 +667,121 @@ public void testWatchRaceCondition() throws Exception {
ExecutorUtil.awaitTermination(executorService);
}
}

/**
* Ensure that collection state fetching (getCollectionLive etc.) would not throw exception when
* the state.json is deleted in between the state.json read and PRS entries read
*/
public void testDeletePrsCollection() throws Exception {
ZkStateWriter writer = fixture.writer;
ZkStateReader reader = fixture.reader;

String collectionName = "c1";
fixture.zkClient.makePath(ZkStateReader.COLLECTIONS_ZKNODE + "/" + collectionName, true);

ClusterState clusterState = reader.getClusterState();

String nodeName = "node1:10000_solr";
String sliceName = "shard1";
Slice slice = new Slice(sliceName, Map.of(), Map.of(), collectionName);

// create new collection
DocCollection state =
DocCollection.create(
collectionName,
Map.of(sliceName, slice),
Collections.singletonMap(DocCollection.CollectionStateProps.PER_REPLICA_STATE, true),
DocRouter.DEFAULT,
0,
PerReplicaStatesOps.getZkClientPrsSupplier(
fixture.zkClient, DocCollection.getCollectionPath(collectionName)));
ZkWriteCommand wc = new ZkWriteCommand(collectionName, state);
writer.enqueueUpdate(clusterState, Collections.singletonList(wc), null);
clusterState = writer.writePendingUpdates();

TimeOut timeOut = new TimeOut(5000, TimeUnit.MILLISECONDS, TimeSource.NANO_TIME);
timeOut.waitFor(
"Timeout on waiting for c1 to show up in cluster state",
() -> reader.getClusterState().getCollectionOrNull(collectionName) != null);

String collectionPath = ZkStateReader.getCollectionPath(collectionName);

// now create the replica, take note that this has to be done after DocCollection creation with
// empty slice, otherwise the DocCollection ctor would fetch the PRS entries and throw
// exceptions
String replicaBaseUrl = Utils.getBaseUrlForNodeName(nodeName, "http");

String replicaName = "replica1";
Replica replica =
new Replica(
replicaName,
Map.of(
ZkStateReader.CORE_NAME_PROP,
"core1",
ZkStateReader.STATE_PROP,
Replica.State.ACTIVE.toString(),
ZkStateReader.NODE_NAME_PROP,
nodeName,
ZkStateReader.BASE_URL_PROP,
replicaBaseUrl,
ZkStateReader.REPLICA_TYPE,
Replica.Type.NRT.name()),
collectionName,
sliceName);

wc =
new ZkWriteCommand(
collectionName, SliceMutator.updateReplica(state, slice, replica.getName(), replica));
writer.enqueueUpdate(clusterState, Collections.singletonList(wc), null);
clusterState = writer.writePendingUpdates();

timeOut.waitFor(
"Timeout on waiting for replica to show up in cluster state",
() ->
reader.getCollectionLive(collectionName).getSlice(sliceName).getReplica(replicaName)
!= null);

try (CommonTestInjection.BreakpointSetter breakpointSetter =
new CommonTestInjection.BreakpointSetter()) {
// set breakpoint such that after state.json fetch and before PRS entry fetch, we can delete
// the state.json and PRS entries to trigger the race condition
breakpointSetter.setImplementation(
PerReplicaStatesOps.class.getName() + "/beforePrsFetch",
(args) -> {
try {
// this is invoked after ZkStateReader.fetchCollectionState has fetched the state.json
// but before PRS entries.
// call delete state.json on ZK directly, very tricky to control execution order with
// writer.enqueueUpdate
reader.getZkClient().clean(collectionPath);
} catch (InterruptedException e) {
throw new RuntimeException(e);
} catch (KeeperException e) {
throw new RuntimeException(e);
}
});

// set breakpoint to verify the expected PrsZkNodeNotFoundException is indeed thrown within
// the execution flow, such exception is caught within the logic and not thrown to the
// caller
AtomicBoolean prsZkNodeNotFoundExceptionThrown = new AtomicBoolean(false);
breakpointSetter.setImplementation(
ZkStateReader.class.getName() + "/exercised",
(args) -> {
if (args[0] instanceof PerReplicaStatesOps.PrsZkNodeNotFoundException) {
prsZkNodeNotFoundExceptionThrown.set(true);
}
});

timeOut.waitFor(
"Timeout waiting for collection state to become null",
() -> {
// this should not throw exception even if the PRS entry read is delayed artificially
// (by previous command) and deleted after the following getCollectionLive call
return reader.getCollectionLive(collectionName) == null;
});

assertTrue(prsZkNodeNotFoundExceptionThrown.get());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import java.util.Set;
import java.util.function.Function;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.util.CommonTestInjection;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.Op;
Expand Down Expand Up @@ -56,6 +57,8 @@ public class PerReplicaStatesOps {
public static PerReplicaStates fetch(
String path, SolrZkClient zkClient, PerReplicaStates current) {
try {
assert CommonTestInjection.injectBreakpoint(
PerReplicaStatesOps.class.getName() + "/beforePrsFetch");
if (current != null) {
Stat stat = zkClient.exists(current.path, null, true);
if (stat == null) return new PerReplicaStates(path, 0, Collections.emptyList());
Expand All @@ -64,6 +67,11 @@ public static PerReplicaStates fetch(
Stat stat = new Stat();
List<String> children = zkClient.getChildren(path, null, stat, true);
return new PerReplicaStates(path, stat.getCversion(), Collections.unmodifiableList(children));
} catch (KeeperException.NoNodeException e) {
throw new PrsZkNodeNotFoundException(
SolrException.ErrorCode.SERVER_ERROR,
"Error fetching per-replica states. The node [" + path + "] is not found",
e);
} catch (KeeperException e) {
throw new SolrException(
SolrException.ErrorCode.SERVER_ERROR, "Error fetching per-replica states", e);
Expand All @@ -76,6 +84,12 @@ public static PerReplicaStates fetch(
}
}

public static class PrsZkNodeNotFoundException extends SolrException {
private PrsZkNodeNotFoundException(ErrorCode code, String msg, Throwable cause) {
super(code, msg, cause);
}
}

public static DocCollection.PrsSupplier getZkClientPrsSupplier(
SolrZkClient zkClient, String collectionPath) {
return () -> fetch(collectionPath, zkClient, null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1627,6 +1627,21 @@ private DocCollection fetchCollectionState(String coll, Watcher watcher)
}
}
return null;
} catch (PerReplicaStatesOps.PrsZkNodeNotFoundException e) {
assert CommonTestInjection.injectBreakpoint(
ZkStateReader.class.getName() + "/exercised", e);
// could be a race condition that state.json and PRS entries are deleted between the
// state.json fetch and PRS entry fetch
Stat exists = zkClient.exists(collectionPath, watcher, true);
if (exists == null) {
log.info(
"PRS entry for collection {} not found in ZK. It was probably deleted between state.json read and PRS entry read.",
coll);

return null;
} else {
throw e; // unexpected, PRS node not found but the collection state.json still exists
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,14 @@

package org.apache.solr.common.util;

import java.io.Closeable;
import java.io.IOException;
import java.lang.invoke.MethodHandles;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -32,6 +38,7 @@ public class CommonTestInjection {

private static volatile Map<String, String> additionalSystemProps = null;
private static volatile Integer delay = null;
private static final ConcurrentMap<String, Breakpoint> breakpoints = new ConcurrentHashMap<>();

public static void reset() {
additionalSystemProps = null;
Expand Down Expand Up @@ -73,4 +80,93 @@ public static boolean injectDelay() {
}
return true;
}

/**
* Injects a breakpoint that pauses the existing code execution, executes the code defined in the
* breakpoint implementation and then resumes afterward. The breakpoint implementation is looked
* up by the corresponding key used in {@link BreakpointSetter#setImplementation(String,
* Breakpoint)}
*
* <p>An example usages :
*
* <ol>
* <li>Inject a precise wait until a race condition is fulfilled before proceeding with original
* code execution
* <li>Inject a flag to catch exception statement which handles the exception without
* re-throwing. This could verify caught exception does get triggered
* </ol>
*
* <p>This should always be a part of an assert statement (ie assert injectBreakpoint(key)) such
* that it will be skipped for normal code execution
*
* @see BreakpointSetter#setImplementation(String, Breakpoint)
* @param key could simply be the fully qualified class name or more granular like class name +
* other id (such as method name). This should only be set by corresponding unit test cases
* with CommonTestInjection#setBreakpoint
* @param args optional arguments list to be passed to the Breakpoint
*/
public static boolean injectBreakpoint(String key, Object... args) {
Breakpoint breakpoint = breakpoints.get(key);
if (breakpoint != null) {
log.info("Breakpoint with key {} is triggered", key);
breakpoint.executeAndResume(args);
log.info("Breakpoint with key {} was executed and normal code execution resumes", key);
} else {
log.debug(
"Breakpoint with key {} is triggered but there's no implementation set. Skipping...",
key);
}
return true;
}

public interface Breakpoint {
/**
* Code execution should break at where the breakpoint was injected, then it would execute this
* method and resumes the execution afterward.
*/
void executeAndResume(Object... args);
}

/**
* Breakpoints should be set via this {@link BreakpointSetter} within the test case and close
* should be invoked as cleanup. Since this is closeable, it should usually be used in the
* try-with-resource syntax, such as:
*
* <pre>{@code
* try (BreakpointSetter breakpointSetter = new BreakpointSetter() {
* //... test code here that calls breakpointSetter.setImplementation(...)
* }
* }</pre>
*/
public static class BreakpointSetter implements Closeable {
private Set<String> keys = new HashSet<>();
/**
* This is usually set by the test cases.
*
* <p>If a breakpoint implementation is set by this method, then code execution would break at
* the code execution point marked by CommonTestInjection#injectBreakpoint with matching key,
* executes the provided implementation in the {@link Breakpoint}, then resumes the normal code
* execution.
*
* @see CommonTestInjection#injectBreakpoint(String, Object...)
* @param key could simply be the fully qualified class name or more granular like class name +
* other id (such as method name). This should batch the key used in injectBreakpoint
* @param implementation The Breakpoint implementation
*/
public void setImplementation(String key, Breakpoint implementation) {
if (breakpoints.containsKey(key)) {
throw new IllegalArgumentException(
"Cannot redefine Breakpoint implementation with key " + key);
}
breakpoints.put(key, implementation);
keys.add(key);
}

@Override
public void close() throws IOException {
for (String key : keys) {
breakpoints.remove(key);
}
}
}
}

0 comments on commit f1ba2c2

Please sign in to comment.