From f1ba2c2cb8050ad958326141d86e573a28ab0516 Mon Sep 17 00:00:00 2001 From: patsonluk Date: Fri, 22 Sep 2023 17:21:46 -0700 Subject: [PATCH] SOLR-16701: Race condition on PRS enabled collection deletion (#1460) (cherry picked from commit f22a51cc64f83f7b1268d9f3a4c50e36249bdd87) --- solr/CHANGES.txt | 2 + .../cloud/overseer/ZkStateReaderTest.java | 120 ++++++++++++++++++ .../common/cloud/PerReplicaStatesOps.java | 14 ++ .../solr/common/cloud/ZkStateReader.java | 15 +++ .../solr/common/util/CommonTestInjection.java | 96 ++++++++++++++ 5 files changed, 247 insertions(+) diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt index 9978cea69ff..ba6d4cf9855 100644 --- a/solr/CHANGES.txt +++ b/solr/CHANGES.txt @@ -147,6 +147,8 @@ Bug Fixes * SOLR-16925: Fix indentation for JacksonJsonWriter (Houston Putman) +* SOLR-16701: Fix race condition on PRS enabled collection deletion (Patson Luk) + Dependency Upgrades --------------------- diff --git a/solr/core/src/test/org/apache/solr/cloud/overseer/ZkStateReaderTest.java b/solr/core/src/test/org/apache/solr/cloud/overseer/ZkStateReaderTest.java index 6e1f4221943..3e1ca33963c 100644 --- a/solr/core/src/test/org/apache/solr/cloud/overseer/ZkStateReaderTest.java +++ b/solr/core/src/test/org/apache/solr/cloud/overseer/ZkStateReaderTest.java @@ -43,16 +43,19 @@ import org.apache.solr.common.cloud.PerReplicaStates; import org.apache.solr.common.cloud.PerReplicaStatesOps; import org.apache.solr.common.cloud.Replica; +import org.apache.solr.common.cloud.Slice; import org.apache.solr.common.cloud.SolrZkClient; import org.apache.solr.common.cloud.ZkStateReader; import org.apache.solr.common.util.CommonTestInjection; import org.apache.solr.common.util.ExecutorUtil; import org.apache.solr.common.util.SolrNamedThreadFactory; import org.apache.solr.common.util.TimeSource; +import org.apache.solr.common.util.Utils; import org.apache.solr.common.util.ZLibCompressor; import org.apache.solr.handler.admin.ConfigSetsHandler; import 
org.apache.solr.util.LogLevel; import org.apache.solr.util.TimeOut; +import org.apache.zookeeper.KeeperException; import org.junit.After; import org.junit.Before; import org.slf4j.Logger; @@ -664,4 +667,121 @@ public void testWatchRaceCondition() throws Exception { ExecutorUtil.awaitTermination(executorService); } } + + /** + * Ensure that collection state fetching (getCollectionLive etc.) would not throw exception when + * the state.json is deleted in between the state.json read and PRS entries read + */ + public void testDeletePrsCollection() throws Exception { + ZkStateWriter writer = fixture.writer; + ZkStateReader reader = fixture.reader; + + String collectionName = "c1"; + fixture.zkClient.makePath(ZkStateReader.COLLECTIONS_ZKNODE + "/" + collectionName, true); + + ClusterState clusterState = reader.getClusterState(); + + String nodeName = "node1:10000_solr"; + String sliceName = "shard1"; + Slice slice = new Slice(sliceName, Map.of(), Map.of(), collectionName); + + // create new collection + DocCollection state = + DocCollection.create( + collectionName, + Map.of(sliceName, slice), + Collections.singletonMap(DocCollection.CollectionStateProps.PER_REPLICA_STATE, true), + DocRouter.DEFAULT, + 0, + PerReplicaStatesOps.getZkClientPrsSupplier( + fixture.zkClient, DocCollection.getCollectionPath(collectionName))); + ZkWriteCommand wc = new ZkWriteCommand(collectionName, state); + writer.enqueueUpdate(clusterState, Collections.singletonList(wc), null); + clusterState = writer.writePendingUpdates(); + + TimeOut timeOut = new TimeOut(5000, TimeUnit.MILLISECONDS, TimeSource.NANO_TIME); + timeOut.waitFor( + "Timeout on waiting for c1 to show up in cluster state", + () -> reader.getClusterState().getCollectionOrNull(collectionName) != null); + + String collectionPath = ZkStateReader.getCollectionPath(collectionName); + + // now create the replica, take note that this has to be done after DocCollection creation with + // empty slice, otherwise the DocCollection ctor 
would fetch the PRS entries and throw + // exceptions + String replicaBaseUrl = Utils.getBaseUrlForNodeName(nodeName, "http"); + + String replicaName = "replica1"; + Replica replica = + new Replica( + replicaName, + Map.of( + ZkStateReader.CORE_NAME_PROP, + "core1", + ZkStateReader.STATE_PROP, + Replica.State.ACTIVE.toString(), + ZkStateReader.NODE_NAME_PROP, + nodeName, + ZkStateReader.BASE_URL_PROP, + replicaBaseUrl, + ZkStateReader.REPLICA_TYPE, + Replica.Type.NRT.name()), + collectionName, + sliceName); + + wc = + new ZkWriteCommand( + collectionName, SliceMutator.updateReplica(state, slice, replica.getName(), replica)); + writer.enqueueUpdate(clusterState, Collections.singletonList(wc), null); + clusterState = writer.writePendingUpdates(); + + timeOut.waitFor( + "Timeout on waiting for replica to show up in cluster state", + () -> + reader.getCollectionLive(collectionName).getSlice(sliceName).getReplica(replicaName) + != null); + + try (CommonTestInjection.BreakpointSetter breakpointSetter = + new CommonTestInjection.BreakpointSetter()) { + // set breakpoint such that after state.json fetch and before PRS entry fetch, we can delete + // the state.json and PRS entries to trigger the race condition + breakpointSetter.setImplementation( + PerReplicaStatesOps.class.getName() + "/beforePrsFetch", + (args) -> { + try { + // this is invoked after ZkStateReader.fetchCollectionState has fetched the state.json + // but before PRS entries. 
+ // call delete state.json on ZK directly, very tricky to control execution order with + // writer.enqueueUpdate + reader.getZkClient().clean(collectionPath); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } catch (KeeperException e) { + throw new RuntimeException(e); + } + }); + + // set breakpoint to verify the expected PrsZkNodeNotFoundException is indeed thrown within + // the execution flow, such exception is caught within the logic and not thrown to the + // caller + AtomicBoolean prsZkNodeNotFoundExceptionThrown = new AtomicBoolean(false); + breakpointSetter.setImplementation( + ZkStateReader.class.getName() + "/exercised", + (args) -> { + if (args[0] instanceof PerReplicaStatesOps.PrsZkNodeNotFoundException) { + prsZkNodeNotFoundExceptionThrown.set(true); + } + }); + + timeOut.waitFor( + "Timeout waiting for collection state to become null", + () -> { + // this should not throw exception even if the PRS entry read is delayed artificially + // (by previous command) and deleted after the following getCollectionLive call + return reader.getCollectionLive(collectionName) == null; + }); + + assertTrue(prsZkNodeNotFoundExceptionThrown.get()); + } + } } diff --git a/solr/solrj-zookeeper/src/java/org/apache/solr/common/cloud/PerReplicaStatesOps.java b/solr/solrj-zookeeper/src/java/org/apache/solr/common/cloud/PerReplicaStatesOps.java index 475b852e75c..dd0b14fd3e1 100644 --- a/solr/solrj-zookeeper/src/java/org/apache/solr/common/cloud/PerReplicaStatesOps.java +++ b/solr/solrj-zookeeper/src/java/org/apache/solr/common/cloud/PerReplicaStatesOps.java @@ -27,6 +27,7 @@ import java.util.Set; import java.util.function.Function; import org.apache.solr.common.SolrException; +import org.apache.solr.common.util.CommonTestInjection; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.Op; @@ -56,6 +57,8 @@ public class PerReplicaStatesOps { public static PerReplicaStates fetch( String path, 
SolrZkClient zkClient, PerReplicaStates current) { try { + assert CommonTestInjection.injectBreakpoint( + PerReplicaStatesOps.class.getName() + "/beforePrsFetch"); if (current != null) { Stat stat = zkClient.exists(current.path, null, true); if (stat == null) return new PerReplicaStates(path, 0, Collections.emptyList()); @@ -64,6 +67,11 @@ public static PerReplicaStates fetch( Stat stat = new Stat(); List children = zkClient.getChildren(path, null, stat, true); return new PerReplicaStates(path, stat.getCversion(), Collections.unmodifiableList(children)); + } catch (KeeperException.NoNodeException e) { + throw new PrsZkNodeNotFoundException( + SolrException.ErrorCode.SERVER_ERROR, + "Error fetching per-replica states. The node [" + path + "] is not found", + e); } catch (KeeperException e) { throw new SolrException( SolrException.ErrorCode.SERVER_ERROR, "Error fetching per-replica states", e); @@ -76,6 +84,12 @@ public static PerReplicaStates fetch( } } + public static class PrsZkNodeNotFoundException extends SolrException { + private PrsZkNodeNotFoundException(ErrorCode code, String msg, Throwable cause) { + super(code, msg, cause); + } + } + public static DocCollection.PrsSupplier getZkClientPrsSupplier( SolrZkClient zkClient, String collectionPath) { return () -> fetch(collectionPath, zkClient, null); diff --git a/solr/solrj-zookeeper/src/java/org/apache/solr/common/cloud/ZkStateReader.java b/solr/solrj-zookeeper/src/java/org/apache/solr/common/cloud/ZkStateReader.java index d8115fdf1b8..17f7bdbc5d5 100644 --- a/solr/solrj-zookeeper/src/java/org/apache/solr/common/cloud/ZkStateReader.java +++ b/solr/solrj-zookeeper/src/java/org/apache/solr/common/cloud/ZkStateReader.java @@ -1627,6 +1627,21 @@ private DocCollection fetchCollectionState(String coll, Watcher watcher) } } return null; + } catch (PerReplicaStatesOps.PrsZkNodeNotFoundException e) { + assert CommonTestInjection.injectBreakpoint( + ZkStateReader.class.getName() + "/exercised", e); + // could be a race 
condition that state.json and PRS entries are deleted between the + // state.json fetch and PRS entry fetch + Stat exists = zkClient.exists(collectionPath, watcher, true); + if (exists == null) { + log.info( + "PRS entry for collection {} not found in ZK. It was probably deleted between state.json read and PRS entry read.", + coll); + + return null; + } else { + throw e; // unexpected, PRS node not found but the collection state.json still exists + } } } } diff --git a/solr/solrj/src/java/org/apache/solr/common/util/CommonTestInjection.java b/solr/solrj/src/java/org/apache/solr/common/util/CommonTestInjection.java index e1d7caa8dd9..bcacd715a71 100644 --- a/solr/solrj/src/java/org/apache/solr/common/util/CommonTestInjection.java +++ b/solr/solrj/src/java/org/apache/solr/common/util/CommonTestInjection.java @@ -17,8 +17,14 @@ package org.apache.solr.common.util; +import java.io.Closeable; +import java.io.IOException; import java.lang.invoke.MethodHandles; +import java.util.HashSet; import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -32,6 +38,7 @@ public class CommonTestInjection { private static volatile Map additionalSystemProps = null; private static volatile Integer delay = null; + private static final ConcurrentMap breakpoints = new ConcurrentHashMap<>(); public static void reset() { additionalSystemProps = null; @@ -73,4 +80,93 @@ public static boolean injectDelay() { } return true; } + + /** + * Injects a breakpoint that pauses the existing code execution, executes the code defined in the + * breakpoint implementation and then resumes afterward. The breakpoint implementation is looked + * up by the corresponding key used in {@link BreakpointSetter#setImplementation(String, + * Breakpoint)} + * + *

Example usages: + * + *

    + *
  1. Inject a precise wait until a race condition is fulfilled before proceeding with original + * code execution + *
  2. Inject a flag into a catch block which handles the exception without + * re-throwing. This can verify that the caught exception does get triggered + *
+ * + *

This should always be a part of an assert statement (i.e. assert injectBreakpoint(key)) such + * that it will be skipped for normal code execution + * + * @see BreakpointSetter#setImplementation(String, Breakpoint) + * @param key could simply be the fully qualified class name or more granular like class name + + * other id (such as method name). This should only be set by corresponding unit test cases + * with BreakpointSetter#setImplementation + * @param args optional arguments list to be passed to the Breakpoint + */ + public static boolean injectBreakpoint(String key, Object... args) { + Breakpoint breakpoint = breakpoints.get(key); + if (breakpoint != null) { + log.info("Breakpoint with key {} is triggered", key); + breakpoint.executeAndResume(args); + log.info("Breakpoint with key {} was executed and normal code execution resumes", key); + } else { + log.debug( + "Breakpoint with key {} is triggered but there's no implementation set. Skipping...", + key); + } + return true; + } + + public interface Breakpoint { + /** + * Code execution should break at where the breakpoint was injected, then it would execute this + * method and resume the execution afterward. + */ + void executeAndResume(Object... args); + } + + /** + * Breakpoints should be set via this {@link BreakpointSetter} within the test case and close + * should be invoked as cleanup. Since this is closeable, it should usually be used in the + * try-with-resources syntax, such as: + * + *

{@code
+   * try (BreakpointSetter breakpointSetter = new BreakpointSetter()) {
+   *     //... test code here that calls breakpointSetter.setImplementation(...)
+   * }
+   * }
+ */ + public static class BreakpointSetter implements Closeable { + private Set keys = new HashSet<>(); + /** + * This is usually set by the test cases. + * + *

If a breakpoint implementation is set by this method, then code execution would break at + * the code execution point marked by CommonTestInjection#injectBreakpoint with matching key, + * execute the provided implementation in the {@link Breakpoint}, then resume the normal code + * execution. + * + * @see CommonTestInjection#injectBreakpoint(String, Object...) + * @param key could simply be the fully qualified class name or more granular like class name + + * other id (such as method name). This should match the key used in injectBreakpoint + * @param implementation The Breakpoint implementation + */ + public void setImplementation(String key, Breakpoint implementation) { + if (breakpoints.containsKey(key)) { + throw new IllegalArgumentException( + "Cannot redefine Breakpoint implementation with key " + key); + } + breakpoints.put(key, implementation); + keys.add(key); + } + + @Override + public void close() throws IOException { + for (String key : keys) { + breakpoints.remove(key); + } + } + } }