SOLR-17453: Drop CloudUtil.waitForState() in favor of common waitForState() (#2716)

Update tests so they use the standard ZkStateReader.waitForState(),
relying on a ZooKeeper watch, instead of actively polling the collection
state.

(cherry picked from commit 7d60883)
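
For context, here is a minimal sketch of the watch-based pattern the tests move to (not code from this commit; the helper name, collection argument, and 30-second timeout are illustrative, and the predicate mirrors the read-only checks used in the test diff below):

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.solr.common.cloud.ZkStateReader;

public class WaitForStateSketch {

  /**
   * Waits until the collection is no longer read-only. Unlike the removed
   * CloudUtil.waitForState(), which re-read the ClusterState in a 100 ms sleep
   * loop, ZkStateReader.waitForState() re-evaluates the predicate from a
   * ZooKeeper watch each time the collection state changes.
   */
  static void waitUntilWritable(ZkStateReader zkStateReader, String collection)
      throws InterruptedException, TimeoutException {
    zkStateReader.waitForState(
        collection,
        30, TimeUnit.SECONDS, // overall timeout replacing the poll loop
        (liveNodes, collectionState) ->
            collectionState != null && !collectionState.isReadOnly());
  }
}
```

Because the predicate is driven by a ZooKeeper watch, the wait returns as soon as the matching state is published rather than on the next polling interval.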
psalagnac authored and dsmiley committed Sep 20, 2024
1 parent cacb29e commit 72ad7b8
Showing 4 changed files with 56 additions and 212 deletions.
174 changes: 0 additions & 174 deletions solr/core/src/java/org/apache/solr/cloud/CloudUtil.java
@@ -18,40 +18,29 @@

import java.io.IOException;
import java.lang.invoke.MethodHandles;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import org.apache.commons.io.file.PathUtils;
import org.apache.solr.client.solrj.cloud.SolrCloudManager;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrException.ErrorCode;
import org.apache.solr.common.cloud.ClusterState;
import org.apache.solr.common.cloud.CollectionStatePredicate;
import org.apache.solr.common.cloud.DocCollection;
import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.cloud.Slice;
import org.apache.solr.common.cloud.SolrZkClient;
import org.apache.solr.core.CoreContainer;
import org.apache.solr.core.CoreDescriptor;
import org.apache.solr.core.SolrResourceLoader;
import org.apache.solr.util.TimeOut;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class CloudUtil {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());

public static final int DEFAULT_TIMEOUT = 90;

/**
* See if coreNodeName has been taken over by another baseUrl and unload core + throw exception if
* it has been.
@@ -157,169 +146,6 @@ public static Map<String, byte[]> getTrustedKeys(SolrZkClient zk, String dir) {
return result;
}

/**
* Wait for a particular collection state to appear.
*
* <p>This is a convenience method using the {@link #DEFAULT_TIMEOUT}
*
* @param cloudManager current instance of {@link SolrCloudManager}
* @param message a message to report on failure
* @param collection the collection to watch
* @param predicate a predicate to match against the collection state
*/
public static long waitForState(
final SolrCloudManager cloudManager,
final String message,
final String collection,
final CollectionStatePredicate predicate) {
AtomicReference<DocCollection> state = new AtomicReference<>();
AtomicReference<Set<String>> liveNodesLastSeen = new AtomicReference<>();
try {
return waitForState(
cloudManager,
collection,
DEFAULT_TIMEOUT,
TimeUnit.SECONDS,
(n, c) -> {
state.set(c);
liveNodesLastSeen.set(n);
return predicate.matches(n, c);
});
} catch (Exception e) {
throw new AssertionError(
message
+ "\n"
+ "Live Nodes: "
+ liveNodesLastSeen.get()
+ "\nLast available state: "
+ state.get(),
e);
}
}

/**
* Wait for a particular collection state to appear.
*
* <p>This is a convenience method using the {@link #DEFAULT_TIMEOUT}
*
* @param cloudManager current instance of {@link SolrCloudManager}
* @param collection the collection to watch
* @param wait timeout value
* @param unit timeout unit
* @param predicate a predicate to match against the collection state
*/
public static long waitForState(
final SolrCloudManager cloudManager,
final String collection,
long wait,
final TimeUnit unit,
final CollectionStatePredicate predicate)
throws InterruptedException, TimeoutException, IOException {
TimeOut timeout = new TimeOut(wait, unit, cloudManager.getTimeSource());
long timeWarn = timeout.timeLeft(TimeUnit.MILLISECONDS) / 4;
ClusterState state = null;
DocCollection coll = null;
while (!timeout.hasTimedOut()) {
state = cloudManager.getClusterState();
coll = state.getCollectionOrNull(collection);
// due to the way we manage collections in SimClusterStateProvider a null here
// can mean that a collection is still being created but has no replicas
if (coll == null) { // does not yet exist?
timeout.sleep(100);
continue;
}
if (predicate.matches(state.getLiveNodes(), coll)) {
log.trace("-- predicate matched with state {}", state);
return timeout.timeElapsed(TimeUnit.MILLISECONDS);
}
timeout.sleep(100);
if (timeout.timeLeft(TimeUnit.MILLISECONDS) < timeWarn) {
log.trace("-- still not matching predicate: {}", state);
}
}
throw new TimeoutException("last ClusterState: " + state + ", last coll state: " + coll);
}

/**
* Return a {@link CollectionStatePredicate} that returns true if a collection has the expected
* number of active shards and replicas
*
* @param expectedShards expected number of active shards
* @param expectedReplicas expected number of active replicas
*/
public static CollectionStatePredicate clusterShape(int expectedShards, int expectedReplicas) {
return clusterShape(expectedShards, expectedReplicas, false, false);
}

/**
* Return a {@link CollectionStatePredicate} that returns true if a collection has the expected
* number of shards and replicas.
*
* <p>Note: for shards marked as inactive the current Solr behavior is that replicas remain
* active.
*
* @param expectedShards expected number of shards
* @param expectedReplicas expected number of active replicas per shard
* @param withInactive if true then count also inactive shards
* @param requireLeaders if true then require that each shard has a leader
*/
public static CollectionStatePredicate clusterShape(
int expectedShards, int expectedReplicas, boolean withInactive, boolean requireLeaders) {
return (liveNodes, collectionState) -> {
if (collectionState == null) {
log.debug("-- null collection");
return false;
}
Collection<Slice> slices =
withInactive ? collectionState.getSlices() : collectionState.getActiveSlices();
if (slices.size() != expectedShards) {
if (log.isDebugEnabled()) {
log.debug(
"-- wrong number of slices for collection {}, expected={}, found={}: {}",
collectionState.getName(),
expectedShards,
collectionState.getSlices().size(),
collectionState.getSlices());
}
return false;
}
Set<String> leaderless = new HashSet<>();
for (Slice slice : slices) {
int activeReplicas = 0;
if (requireLeaders
&& slice.getState() != Slice.State.INACTIVE
&& slice.getLeader() == null) {
leaderless.add(slice.getName());
continue;
}
// skip other checks, we're going to fail anyway
if (!leaderless.isEmpty()) {
continue;
}
for (Replica replica : slice) {
if (replica.isActive(liveNodes)) activeReplicas++;
}
if (activeReplicas != expectedReplicas) {
if (log.isDebugEnabled()) {
log.debug(
"-- wrong number of active replicas for collection {} in slice {}, expected={}, found={}",
collectionState.getName(),
slice.getName(),
expectedReplicas,
activeReplicas);
}
return false;
}
}
if (leaderless.isEmpty()) {
return true;
} else {
log.info("-- shards without leaders: {}", leaderless);
return false;
}
};
}

/**
* Builds a string with sorted {@link CoreContainer#getLoadedCoreNames()} while truncating to the
* first 20 cores.
86 changes: 54 additions & 32 deletions solr/core/src/test/org/apache/solr/cloud/ReindexCollectionTest.java
@@ -20,6 +20,7 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
@@ -31,6 +32,7 @@
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
import org.apache.solr.client.solrj.response.CollectionAdminResponse;
import org.apache.solr.client.solrj.response.QueryResponse;
import org.apache.solr.client.solrj.response.RequestStatusState;
import org.apache.solr.cloud.api.collections.ReindexCollectionCmd;
import org.apache.solr.common.SolrDocument;
import org.apache.solr.common.SolrInputDocument;
@@ -39,6 +41,7 @@
import org.apache.solr.common.cloud.ImplicitDocRouter;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.params.CommonParams;
import org.apache.solr.common.util.TimeSource;
import org.apache.solr.util.LogLevel;
import org.apache.solr.util.TestInjection;
import org.apache.solr.util.TimeOut;
@@ -78,15 +81,32 @@ public void doBefore() {
.build();
}

private ReindexCollectionCmd.State getState(String collection) {
try {
return ReindexCollectionCmd.State.get(
ReindexCollectionCmd.getReindexingState(stateManager, collection)
.get(ReindexCollectionCmd.STATE));
} catch (Exception e) {
fail("Unexpected exception checking state of " + collection + ": " + e);
return null;
/** Wait for the reindexing state to have the specified value, or fail (can be null). */
private void waitForReindexingState(String collection, ReindexCollectionCmd.State expected) {

ReindexCollectionCmd.State lastSeen = null;

TimeOut timeout = new TimeOut(30, TimeUnit.SECONDS, TimeSource.NANO_TIME);
while (!timeout.hasTimedOut()) {
try {
lastSeen =
ReindexCollectionCmd.State.get(
ReindexCollectionCmd.getReindexingState(stateManager, collection)
.get(ReindexCollectionCmd.STATE));

// exit busy waiting if expected state was found
if (lastSeen == expected) {
break;
}
} catch (Exception e) {
fail("Unexpected exception checking state of " + collection + ": " + e);
return;
}
}

assertFalse(
String.format(Locale.ROOT, "Expecting state %s, last seen was %s", expected, lastSeen),
timeout.hasTimedOut());
}

@After
@@ -128,8 +148,7 @@ public void testBasicReindexing() throws Exception {
assertEquals(
status.toString(), (long) NUM_DOCS, ((Number) status.get("processedDocs")).longValue());

CloudUtil.waitForState(
cloudManager,
waitForState(
"did not finish copying in time",
targetCollection,
(liveNodes, coll) -> {
@@ -185,8 +204,7 @@ private void doTestSameTargetReindexing(boolean sourceRemove, boolean followAlia
}
assertNotNull("target collection not present after 30s", realTargetCollection);

CloudUtil.waitForState(
cloudManager,
waitForState(
"did not finish copying in time",
realTargetCollection,
(liveNodes, coll) -> {
@@ -227,8 +245,7 @@ public void testLossySchema() throws Exception {
.setConfigName("conf3");
req.process(solrClient);

CloudUtil.waitForState(
cloudManager,
waitForState(
"did not finish copying in time",
targetCollection,
(liveNodes, coll) -> {
@@ -271,8 +288,7 @@ public void testReshapeReindexing() throws Exception {
.setCollectionParam("q", "id:10*");
req.process(solrClient);

CloudUtil.waitForState(
cloudManager,
waitForState(
"did not finish copying in time",
targetCollection,
(liveNodes, coll) -> {
@@ -361,14 +377,13 @@ public void testFailure() throws Exception {
coll.getName().startsWith(ReindexCollectionCmd.CHK_COL_PREFIX));
});
// verify that the source collection is read-write and has no reindexing flags
CloudUtil.waitForState(
cloudManager,
waitForState(
"collection state is incorrect",
sourceCollection,
((liveNodes, collectionState) ->
(liveNodes, collectionState) ->
!collectionState.isReadOnly()
&& collectionState.getStr(ReindexCollectionCmd.REINDEXING_STATE) == null
&& getState(sourceCollection) == null));
&& collectionState.getStr(ReindexCollectionCmd.REINDEXING_STATE) == null);
waitForReindexingState(sourceCollection, null);
}

@Test
Expand All @@ -383,8 +398,7 @@ public void testAbort() throws Exception {
CollectionAdminRequest.reindexCollection(sourceCollection).setTarget(targetCollection);
String asyncId = req.processAsync(solrClient);
// wait for the source collection to be put in readOnly mode
CloudUtil.waitForState(
cloudManager,
waitForState(
"source collection didn't become readOnly",
sourceCollection,
(liveNodes, coll) -> coll.isReadOnly());
@@ -397,13 +411,11 @@
assertNotNull(rsp.toString(), status);
assertEquals(status.toString(), "aborting", status.get("state"));

CloudUtil.waitForState(
cloudManager,
waitForState(
"incorrect collection state",
sourceCollection,
((liveNodes, collectionState) ->
collectionState.isReadOnly()
&& getState(sourceCollection) == ReindexCollectionCmd.State.ABORTED));
(liveNodes, collectionState) -> collectionState.isReadOnly());
waitForReindexingState(sourceCollection, ReindexCollectionCmd.State.ABORTED);

// verify status
req.setCommand("status");
@@ -413,14 +425,24 @@
assertEquals(status.toString(), "aborted", status.get("state"));
// let the process continue
TestInjection.reindexLatch.countDown();
CloudUtil.waitForState(
cloudManager,
waitForState(
"source collection is in wrong state",
sourceCollection,
(liveNodes, docCollection) ->
!docCollection.isReadOnly() && getState(sourceCollection) == null);
(liveNodes, docCollection) -> !docCollection.isReadOnly());
waitForReindexingState(sourceCollection, null);

// verify the response
rsp = CollectionAdminRequest.requestStatus(asyncId).process(solrClient);
int retries = 0;
while (retries < 5
&& ((CollectionAdminRequest.RequestStatusResponse) rsp).getRequestStatus()
== RequestStatusState.RUNNING) {
// There is a chance the async status hasn't been updated yet when we retrieve it the first
// time. Just wait and retry a couple of times
Thread.sleep(100L);
rsp = CollectionAdminRequest.requestStatus(asyncId).process(solrClient);
retries++;
}
status = (Map<String, Object>) rsp.getResponse().get(ReindexCollectionCmd.REINDEX_STATUS);
assertNotNull(rsp.toString(), status);
assertEquals(status.toString(), "aborted", status.get("state"));
