diff --git a/solr/core/src/java/org/apache/solr/cloud/CloudUtil.java b/solr/core/src/java/org/apache/solr/cloud/CloudUtil.java index de9181e505c..e1fc0e3e456 100644 --- a/solr/core/src/java/org/apache/solr/cloud/CloudUtil.java +++ b/solr/core/src/java/org/apache/solr/cloud/CloudUtil.java @@ -18,23 +18,15 @@ import java.io.IOException; import java.lang.invoke.MethodHandles; -import java.util.Collection; import java.util.Collections; import java.util.HashMap; -import java.util.HashSet; import java.util.List; import java.util.Map; -import java.util.Set; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; -import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; import org.apache.commons.io.file.PathUtils; -import org.apache.solr.client.solrj.cloud.SolrCloudManager; import org.apache.solr.common.SolrException; import org.apache.solr.common.SolrException.ErrorCode; import org.apache.solr.common.cloud.ClusterState; -import org.apache.solr.common.cloud.CollectionStatePredicate; import org.apache.solr.common.cloud.DocCollection; import org.apache.solr.common.cloud.Replica; import org.apache.solr.common.cloud.Slice; @@ -42,7 +34,6 @@ import org.apache.solr.core.CoreContainer; import org.apache.solr.core.CoreDescriptor; import org.apache.solr.core.SolrResourceLoader; -import org.apache.solr.util.TimeOut; import org.apache.zookeeper.KeeperException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -50,8 +41,6 @@ public class CloudUtil { private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); - public static final int DEFAULT_TIMEOUT = 90; - /** * See if coreNodeName has been taken over by another baseUrl and unload core + throw exception if * it has been. @@ -157,169 +146,6 @@ public static Map<String, byte[]> getTrustedKeys(SolrZkClient zk, String dir) { return result; } - /** - * Wait for a particular collection state to appear. - * - * <p>This is a convenience method using the {@link #DEFAULT_TIMEOUT} - * - * @param cloudManager current instance of {@link SolrCloudManager} - * @param message a message to report on failure - * @param collection the collection to watch - * @param predicate a predicate to match against the collection state - */ - public static long waitForState( - final SolrCloudManager cloudManager, - final String message, - final String collection, - final CollectionStatePredicate predicate) { - AtomicReference<DocCollection> state = new AtomicReference<>(); - AtomicReference<Set<String>> liveNodesLastSeen = new AtomicReference<>(); - try { - return waitForState( - cloudManager, - collection, - DEFAULT_TIMEOUT, - TimeUnit.SECONDS, - (n, c) -> { - state.set(c); - liveNodesLastSeen.set(n); - return predicate.matches(n, c); - }); - } catch (Exception e) { - throw new AssertionError( - message - + "\n" - + "Live Nodes: " - + liveNodesLastSeen.get() - + "\nLast available state: " - + state.get(), - e); - } - } - - /** - * Wait for a particular collection state to appear. - * - * <p>This is a convenience method using the {@link #DEFAULT_TIMEOUT} - * - * @param cloudManager current instance of {@link SolrCloudManager} - * @param collection the collection to watch - * @param wait timeout value - * @param unit timeout unit - * @param predicate a predicate to match against the collection state - */ - public static long waitForState( - final SolrCloudManager cloudManager, - final String collection, - long wait, - final TimeUnit unit, - final CollectionStatePredicate predicate) - throws InterruptedException, TimeoutException, IOException { - TimeOut timeout = new TimeOut(wait, unit, cloudManager.getTimeSource()); - long timeWarn = timeout.timeLeft(TimeUnit.MILLISECONDS) / 4; - ClusterState state = null; - DocCollection coll = null; - while (!timeout.hasTimedOut()) { - state = cloudManager.getClusterState(); - coll = state.getCollectionOrNull(collection); - // due to the way we manage collections in SimClusterStateProvider a null here - // can mean that a collection is still being created but has no replicas - if (coll == null) { // does not yet exist? - timeout.sleep(100); - continue; - } - if (predicate.matches(state.getLiveNodes(), coll)) { - log.trace("-- predicate matched with state {}", state); - return timeout.timeElapsed(TimeUnit.MILLISECONDS); - } - timeout.sleep(100); - if (timeout.timeLeft(TimeUnit.MILLISECONDS) < timeWarn) { - log.trace("-- still not matching predicate: {}", state); - } - } - throw new TimeoutException("last ClusterState: " + state + ", last coll state: " + coll); - } - - /** - * Return a {@link CollectionStatePredicate} that returns true if a collection has the expected - * number of active shards and replicas - * - * @param expectedShards expected number of active shards - * @param expectedReplicas expected number of active replicas - */ - public static CollectionStatePredicate clusterShape(int expectedShards, int expectedReplicas) { - return clusterShape(expectedShards, expectedReplicas, false, false); - } - - /** - * Return a {@link CollectionStatePredicate} that returns true if a collection has the expected - * number of shards and replicas. - * - * <p>Note: for shards marked as inactive the current Solr behavior is that replicas remain - * active. - * - * @param expectedShards expected number of shards - * @param expectedReplicas expected number of active replicas per shard - * @param withInactive if true then count also inactive shards - * @param requireLeaders if true then require that each shard has a leader - */ - public static CollectionStatePredicate clusterShape( - int expectedShards, int expectedReplicas, boolean withInactive, boolean requireLeaders) { - return (liveNodes, collectionState) -> { - if (collectionState == null) { - log.debug("-- null collection"); - return false; - } - Collection<Slice> slices = - withInactive ? collectionState.getSlices() : collectionState.getActiveSlices(); - if (slices.size() != expectedShards) { - if (log.isDebugEnabled()) { - log.debug( - "-- wrong number of slices for collection {}, expected={}, found={}: {}", - collectionState.getName(), - expectedShards, - collectionState.getSlices().size(), - collectionState.getSlices()); - } - return false; - } - Set<String> leaderless = new HashSet<>(); - for (Slice slice : slices) { - int activeReplicas = 0; - if (requireLeaders - && slice.getState() != Slice.State.INACTIVE - && slice.getLeader() == null) { - leaderless.add(slice.getName()); - continue; - } - // skip other checks, we're going to fail anyway - if (!leaderless.isEmpty()) { - continue; - } - for (Replica replica : slice) { - if (replica.isActive(liveNodes)) activeReplicas++; - } - if (activeReplicas != expectedReplicas) { - if (log.isDebugEnabled()) { - log.debug( - "-- wrong number of active replicas for collection {} in slice {}, expected={}, found={}", - collectionState.getName(), - slice.getName(), - expectedReplicas, - activeReplicas); - } - return false; - } - } - if (leaderless.isEmpty()) { - return true; - } else { - log.info("-- shards without leaders: {}", leaderless); - return false; - } - }; - } - /** * Builds a string with sorted {@link CoreContainer#getLoadedCoreNames()} while truncating to the * first 20 cores. diff --git a/solr/core/src/test/org/apache/solr/cloud/ReindexCollectionTest.java b/solr/core/src/test/org/apache/solr/cloud/ReindexCollectionTest.java index cd0c61d709c..29e85f66905 100644 --- a/solr/core/src/test/org/apache/solr/cloud/ReindexCollectionTest.java +++ b/solr/core/src/test/org/apache/solr/cloud/ReindexCollectionTest.java @@ -20,6 +20,7 @@ import java.util.ArrayList; import java.util.Collections; import java.util.List; +import java.util.Locale; import java.util.Map; import java.util.Optional; import java.util.concurrent.CountDownLatch; @@ -31,6 +32,7 @@ import org.apache.solr.client.solrj.request.CollectionAdminRequest; import org.apache.solr.client.solrj.response.CollectionAdminResponse; import org.apache.solr.client.solrj.response.QueryResponse; +import org.apache.solr.client.solrj.response.RequestStatusState; import org.apache.solr.cloud.api.collections.ReindexCollectionCmd; import org.apache.solr.common.SolrDocument; import org.apache.solr.common.SolrInputDocument; @@ -39,6 +41,7 @@ import org.apache.solr.common.cloud.ImplicitDocRouter; import org.apache.solr.common.cloud.ZkStateReader; import org.apache.solr.common.params.CommonParams; +import org.apache.solr.common.util.TimeSource; import org.apache.solr.util.LogLevel; import org.apache.solr.util.TestInjection; import org.apache.solr.util.TimeOut; @@ -78,15 +81,32 @@ public void doBefore() { .build(); } - private ReindexCollectionCmd.State getState(String collection) { - try { - return ReindexCollectionCmd.State.get( - ReindexCollectionCmd.getReindexingState(stateManager, collection) - .get(ReindexCollectionCmd.STATE)); - } catch (Exception e) { - fail("Unexpected exception checking state of " + collection + ": " + e); - return null; + /** Wait for the reindexing state to have the specified value, or fail (can be null). */ + private void waitForReindexingState(String collection, ReindexCollectionCmd.State expected) { + + ReindexCollectionCmd.State lastSeen = null; + + TimeOut timeout = new TimeOut(30, TimeUnit.SECONDS, TimeSource.NANO_TIME); + while (!timeout.hasTimedOut()) { + try { + lastSeen = + ReindexCollectionCmd.State.get( + ReindexCollectionCmd.getReindexingState(stateManager, collection) + .get(ReindexCollectionCmd.STATE)); + + // exit busy waiting if expected state was found + if (lastSeen == expected) { + break; + } + } catch (Exception e) { + fail("Unexpected exception checking state of " + collection + ": " + e); + return; + } } + + assertFalse( + String.format(Locale.ROOT, "Expecting state %s, last seen was %s", expected, lastSeen), + timeout.hasTimedOut()); } @After @@ -128,8 +148,7 @@ public void testBasicReindexing() throws Exception { assertEquals( status.toString(), (long) NUM_DOCS, ((Number) status.get("processedDocs")).longValue()); - CloudUtil.waitForState( - cloudManager, + waitForState( "did not finish copying in time", targetCollection, (liveNodes, coll) -> { @@ -185,8 +204,7 @@ private void doTestSameTargetReindexing(boolean sourceRemove, boolean followAlia } assertNotNull("target collection not present after 30s", realTargetCollection); - CloudUtil.waitForState( - cloudManager, + waitForState( "did not finish copying in time", realTargetCollection, (liveNodes, coll) -> { @@ -227,8 +245,7 @@ public void testLossySchema() throws Exception { .setConfigName("conf3"); req.process(solrClient); - CloudUtil.waitForState( - cloudManager, + waitForState( "did not finish copying in time", targetCollection, (liveNodes, coll) -> { @@ -271,8 +288,7 @@ public void testReshapeReindexing() throws Exception { .setCollectionParam("q", "id:10*"); req.process(solrClient); - CloudUtil.waitForState( - cloudManager, + waitForState( "did not finish copying in time", targetCollection, (liveNodes, coll) -> { @@ -361,14 +377,13 @@ public void testFailure() throws Exception { coll.getName().startsWith(ReindexCollectionCmd.CHK_COL_PREFIX)); }); // verify that the source collection is read-write and has no reindexing flags - CloudUtil.waitForState( - cloudManager, + waitForState( "collection state is incorrect", sourceCollection, - ((liveNodes, collectionState) -> + (liveNodes, collectionState) -> !collectionState.isReadOnly() - && collectionState.getStr(ReindexCollectionCmd.REINDEXING_STATE) == null - && getState(sourceCollection) == null)); + && collectionState.getStr(ReindexCollectionCmd.REINDEXING_STATE) == null); + waitForReindexingState(sourceCollection, null); } @Test @@ -383,8 +398,7 @@ public void testAbort() throws Exception { CollectionAdminRequest.reindexCollection(sourceCollection).setTarget(targetCollection); String asyncId = req.processAsync(solrClient); // wait for the source collection to be put in readOnly mode - CloudUtil.waitForState( - cloudManager, + waitForState( "source collection didn't become readOnly", sourceCollection, (liveNodes, coll) -> coll.isReadOnly()); @@ -397,13 +411,11 @@ public void testAbort() throws Exception { assertNotNull(rsp.toString(), status); assertEquals(status.toString(), "aborting", status.get("state")); - CloudUtil.waitForState( - cloudManager, + waitForState( "incorrect collection state", sourceCollection, - ((liveNodes, collectionState) -> - collectionState.isReadOnly() - && getState(sourceCollection) == ReindexCollectionCmd.State.ABORTED)); + (liveNodes, collectionState) -> collectionState.isReadOnly()); + waitForReindexingState(sourceCollection, ReindexCollectionCmd.State.ABORTED); // verify status req.setCommand("status"); @@ -413,14 +425,24 @@ public void testAbort() throws Exception { assertEquals(status.toString(), "aborted", status.get("state")); // let the process continue TestInjection.reindexLatch.countDown(); - CloudUtil.waitForState( - cloudManager, + waitForState( "source collection is in wrong state", sourceCollection, - (liveNodes, docCollection) -> - !docCollection.isReadOnly() && getState(sourceCollection) == null); + (liveNodes, docCollection) -> !docCollection.isReadOnly()); + waitForReindexingState(sourceCollection, null); + // verify the response rsp = CollectionAdminRequest.requestStatus(asyncId).process(solrClient); + int retries = 0; + while (retries < 5 + && ((CollectionAdminRequest.RequestStatusResponse) rsp).getRequestStatus() + == RequestStatusState.RUNNING) { + // There is a chance the async status hasn't been updated yet when we retrieve it the first + // time. Just wait and retry a couple of times + Thread.sleep(100L); + rsp = CollectionAdminRequest.requestStatus(asyncId).process(solrClient); + retries++; + } status = (Map<String, Object>) rsp.getResponse().get(ReindexCollectionCmd.REINDEX_STATUS); assertNotNull(rsp.toString(), status); assertEquals(status.toString(), "aborted", status.get("state")); diff --git a/solr/core/src/test/org/apache/solr/search/TestCpuAllowedLimit.java b/solr/core/src/test/org/apache/solr/search/TestCpuAllowedLimit.java index 1fd6c87616d..2a61aa9eebd 100644 --- a/solr/core/src/test/org/apache/solr/search/TestCpuAllowedLimit.java +++ b/solr/core/src/test/org/apache/solr/search/TestCpuAllowedLimit.java @@ -24,7 +24,6 @@ import org.apache.solr.client.solrj.SolrClient; import org.apache.solr.client.solrj.request.CollectionAdminRequest; import org.apache.solr.client.solrj.response.QueryResponse; -import org.apache.solr.cloud.CloudUtil; import org.apache.solr.cloud.SolrCloudTestCase; import org.apache.solr.index.NoMergePolicyFactory; import org.apache.solr.util.TestInjection; @@ -78,8 +77,7 @@ public static void setupClass() throws Exception { CollectionAdminRequest.Create create = CollectionAdminRequest.createCollection(COLLECTION, "conf", 3, 2); create.process(solrClient); - CloudUtil.waitForState( - cluster.getOpenOverseer().getSolrCloudManager(), "active", COLLECTION, clusterShape(3, 6)); + waitForState("active", COLLECTION, clusterShape(3, 6)); for (int j = 0; j < 100; j++) { solrClient.add(COLLECTION, sdoc("id", "id-" + j, "val_i", j % 5)); solrClient.commit(COLLECTION); // need to commit every doc to create many segments. diff --git a/solr/core/src/test/org/apache/solr/search/TestQueryLimits.java b/solr/core/src/test/org/apache/solr/search/TestQueryLimits.java index ae1b0a32d6c..7b984bbf05b 100644 --- a/solr/core/src/test/org/apache/solr/search/TestQueryLimits.java +++ b/solr/core/src/test/org/apache/solr/search/TestQueryLimits.java @@ -20,7 +20,6 @@ import org.apache.solr.client.solrj.SolrClient; import org.apache.solr.client.solrj.request.CollectionAdminRequest; import org.apache.solr.client.solrj.response.QueryResponse; -import org.apache.solr.cloud.CloudUtil; import org.apache.solr.cloud.SolrCloudTestCase; import org.apache.solr.util.TestInjection; import org.apache.solr.util.ThreadCpuTimer; @@ -39,8 +38,7 @@ public static void setupCluster() throws Exception { CollectionAdminRequest.Create create = CollectionAdminRequest.createCollection(COLLECTION, "conf", 3, 2); create.process(solrClient); - CloudUtil.waitForState( - cluster.getOpenOverseer().getSolrCloudManager(), "active", COLLECTION, clusterShape(3, 6)); + waitForState("active", COLLECTION, clusterShape(3, 6)); for (int j = 0; j < 100; j++) { solrClient.add( COLLECTION,