Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SOLR-17582 Stream CLUSTERSTATUS API response #2916

Open
wants to merge 8 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion solr/core/src/java/org/apache/solr/cli/StatusTool.java
Original file line number Diff line number Diff line change
Expand Up @@ -353,7 +353,8 @@ protected Map<String, String> getCloudStatus(SolrClient solrClient, String zkHos
cloudStatus.put("liveNodes", String.valueOf(liveNodes.size()));

// TODO get this as a metric from the metrics API instead, or something else.
var collections = (NamedList<Object>) json.findRecursive("cluster", "collections");
Map<String, Object> collections =
(Map<String, Object>) json.findRecursive("cluster", "collections");
cloudStatus.put("collections", String.valueOf(collections.size()));

return cloudStatus;
Expand Down
129 changes: 80 additions & 49 deletions solr/core/src/java/org/apache/solr/handler/admin/ClusterStatus.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,20 @@
*/
package org.apache.solr.handler.admin;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Stream;
import org.apache.solr.common.MapWriter;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.cloud.Aliases;
import org.apache.solr.common.cloud.ClusterState;
Expand Down Expand Up @@ -180,6 +183,12 @@ private void fetchClusterStatusForCollOrAlias(
String routeKey = solrParams.get(ShardParams._ROUTE_);
String shard = solrParams.get(ZkStateReader.SHARD_ID_PROP);

Set<String> requestedShards = new HashSet<>();
if (shard != null) {
String[] paramShards = shard.split(",");
requestedShards.addAll(Arrays.asList(paramShards));
}
dsmiley marked this conversation as resolved.
Show resolved Hide resolved

Stream<DocCollection> collectionStream;
if (collection == null) {
collectionStream = clusterState.collectionStream();
Expand All @@ -205,54 +214,32 @@ private void fetchClusterStatusForCollOrAlias(
}
}

// TODO use an Iterable to stream the data to the client instead of gathering it all in mem

NamedList<Object> collectionProps = new SimpleOrderedMap<>();

collectionStream.forEach(
clusterStateCollection -> {
Map<String, Object> collectionStatus;
String name = clusterStateCollection.getName();

Set<String> requestedShards = new HashSet<>();
if (routeKey != null) {
DocRouter router = clusterStateCollection.getRouter();
Collection<Slice> slices =
router.getSearchSlices(routeKey, null, clusterStateCollection);
for (Slice slice : slices) {
requestedShards.add(slice.getName());
}
}
if (shard != null) {
String[] paramShards = shard.split(",");
requestedShards.addAll(Arrays.asList(paramShards));
}

byte[] bytes = Utils.toJSON(clusterStateCollection);
@SuppressWarnings("unchecked")
Map<String, Object> docCollection = (Map<String, Object>) Utils.fromJSON(bytes);
collectionStatus = getCollectionStatus(docCollection, name, requestedShards);

collectionStatus.put("znodeVersion", clusterStateCollection.getZNodeVersion());
collectionStatus.put(
"creationTimeMillis", clusterStateCollection.getCreationTime().toEpochMilli());

if (collectionVsAliases.containsKey(name) && !collectionVsAliases.get(name).isEmpty()) {
collectionStatus.put("aliases", collectionVsAliases.get(name));
MapWriter collectionPropsWriter =
ew -> {
Iterator<Map<String, Object>> it =
collectionStream
.map(
(collectionState) ->
collectionPropsResponse(
collectionState,
collectionVsAliases,
routeKey,
liveNodes,
requestedShards))
.iterator();
while (it.hasNext()) {
dsmiley marked this conversation as resolved.
Show resolved Hide resolved
Map<String, Object> props = it.next();
props.forEach(
(key, value) -> {
try {
ew.put(key, value);
} catch (IOException e) {
throw new RuntimeException(e);
}
});
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am thinking, if we can change up DocCollection in another PR, we can delete all those post processing functions and condense the response to this.

MapWriter collectionPropsWriter =
        ew -> {
          Iterator<DocCollection> it = collectionStream.iterator();
          while (it.hasNext()) {
            DocCollection collState = it.next();
            ew.put(collState.getName(), collState);
          }
        };

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I should mention Solr has two similar APIs, ClusterStatus & ColStatus. See org.apache.solr.handler.admin.ColStatus#getColStatus. It'd be awesome if there was a single method that takes a DocCollection (and some other params as needed) to produce a NamedList. At least ColStatus's code doesn't have the sad JSON round-trip. Maybe it's own PR or not as you wish.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Gotcha. I'll take a look at this in a separate PR/Jira

}
String configName = clusterStateCollection.getConfigName();
collectionStatus.put("configName", configName);
if (solrParams.getBool("prs", false) && clusterStateCollection.isPerReplicaState()) {
PerReplicaStates prs = clusterStateCollection.getPerReplicaStates();
collectionStatus.put("PRS", prs);
}
collectionProps.add(name, collectionStatus);
});

// now we need to walk the collectionProps tree to cross-check replica state with live nodes
crossCheckReplicaStateWithLiveNodes(liveNodes, collectionProps);

clusterStatus.add("collections", collectionProps);
};
clusterStatus.add("collections", collectionPropsWriter);
}

private void addAliasMap(Aliases aliases, NamedList<Object> clusterStatus) {
Expand Down Expand Up @@ -307,8 +294,8 @@ private Map<String, Object> getCollectionStatus(
*/
@SuppressWarnings("unchecked")
protected void crossCheckReplicaStateWithLiveNodes(
List<String> liveNodes, NamedList<Object> collectionProps) {
for (Map.Entry<String, Object> next : collectionProps) {
List<String> liveNodes, Map<String, Object> collectionProps) {
for (Map.Entry<String, Object> next : collectionProps.entrySet()) {
Map<String, Object> collMap = (Map<String, Object>) next.getValue();
Map<String, Object> shards = (Map<String, Object>) collMap.get("shards");
for (Object nextShard : shards.values()) {
Expand Down Expand Up @@ -368,4 +355,48 @@ public static Map<String, Object> postProcessCollectionJSON(Map<String, Object>
collection.put("health", Health.combine(healthStates).toString());
return collection;
}

private Map<String, Object> collectionPropsResponse(
dsmiley marked this conversation as resolved.
Show resolved Hide resolved
DocCollection clusterStateCollection,
Map<String, List<String>> collectionVsAliases,
String routeKey,
List<String> liveNodes,
Set<String> requestedShards) {
Map<String, Object> collectionProps = new HashMap<>();
Map<String, Object> collectionStatus;
String name = clusterStateCollection.getName();

if (routeKey != null) {
DocRouter router = clusterStateCollection.getRouter();
Collection<Slice> slices = router.getSearchSlices(routeKey, null, clusterStateCollection);
for (Slice slice : slices) {
requestedShards.add(slice.getName());
dsmiley marked this conversation as resolved.
Show resolved Hide resolved
}
}

byte[] bytes = Utils.toJSON(clusterStateCollection);
@SuppressWarnings("unchecked")
Map<String, Object> docCollection = (Map<String, Object>) Utils.fromJSON(bytes);
collectionStatus = getCollectionStatus(docCollection, name, requestedShards);

collectionStatus.put("znodeVersion", clusterStateCollection.getZNodeVersion());
collectionStatus.put(
"creationTimeMillis", clusterStateCollection.getCreationTime().toEpochMilli());

if (collectionVsAliases.containsKey(name) && !collectionVsAliases.get(name).isEmpty()) {
collectionStatus.put("aliases", collectionVsAliases.get(name));
}
String configName = clusterStateCollection.getConfigName();
collectionStatus.put("configName", configName);
if (solrParams.getBool("prs", false) && clusterStateCollection.isPerReplicaState()) {
PerReplicaStates prs = clusterStateCollection.getPerReplicaStates();
collectionStatus.put("PRS", prs);
}
collectionProps.put(name, collectionStatus);
dsmiley marked this conversation as resolved.
Show resolved Hide resolved

// now we need to walk the collectionProps tree to cross-check replica state with live
// nodes
crossCheckReplicaStateWithLiveNodes(liveNodes, collectionProps);
return collectionProps;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -129,10 +129,12 @@ private void testModifyCollection() throws Exception {
.getResponse();
NamedList<?> cluster = (NamedList<?>) rsp.get("cluster");
assertNotNull("Cluster state should not be null", cluster);
NamedList<?> collections = (NamedList<?>) cluster.get("collections");
Map<?, ?> collections = (Map<?, ?>) cluster.get("collections");
assertNotNull("Collections should not be null in cluster state", collections);
assertEquals(1, collections.size());
assertEquals("25", collections._getStr(List.of(COLLECTION_NAME, "replicationFactor"), null));
Map<?, ?> collectionProperties = (Map<?, ?>) collections.get(COLLECTION_NAME);
collectionProperties.get("replicationFactor");
assertEquals("25", collectionProperties.get("replicationFactor"));

params = new ModifiableSolrParams();
params.set("action", CollectionParams.CollectionAction.MODIFYCOLLECTION.toString());
Expand All @@ -151,10 +153,12 @@ private void testModifyCollection() throws Exception {
System.out.println(rsp);
cluster = (NamedList<?>) rsp.get("cluster");
assertNotNull("Cluster state should not be null", cluster);
collections = (NamedList<?>) cluster.get("collections");
collections = (Map<?, ?>) cluster.get("collections");
assertNotNull("Collections should not be null in cluster state", collections);
assertEquals(1, collections.size());
assertNull(collections._getStr(List.of(COLLECTION_NAME, "replicationFactor"), null));
collectionProperties = (Map<?, ?>) collections.get(COLLECTION_NAME);
collectionProperties.get("replicationFactor");
assertNull(collectionProperties.get("replicationFactor"));

params = new ModifiableSolrParams();
params.set("action", CollectionParams.CollectionAction.MODIFYCOLLECTION.toString());
Expand Down Expand Up @@ -253,7 +257,7 @@ private void testNoConfigset() throws Exception {
NamedList<?> rsp = client.request(req);
NamedList<?> cluster = (NamedList<?>) rsp.get("cluster");
assertNotNull("Cluster state should not be null", cluster);
NamedList<?> collections = (NamedList<?>) cluster.get("collections");
Map<?, ?> collections = (Map<?, ?>) cluster.get("collections");
assertNotNull("Collections should not be null in cluster state", collections);
assertNotNull(
"Testing to insure collections are returned", collections.get(COLLECTION_NAME1));
Expand All @@ -280,7 +284,7 @@ private void assertCountsForRepFactorAndNrtReplicas(CloudSolrClient client, Stri
NamedList<Object> rsp = client.request(request);
NamedList<?> cluster = (NamedList<?>) rsp.get("cluster");
assertNotNull("Cluster state should not be null", cluster);
NamedList<?> collections = (NamedList<?>) cluster.get("collections");
Map<?, ?> collections = (Map<?, ?>) cluster.get("collections");
assertNotNull("Collections should not be null in cluster state", collections);
assertEquals(1, collections.size());
@SuppressWarnings({"unchecked"})
Expand All @@ -302,7 +306,7 @@ private void clusterStatusWithCollectionAndShard() throws IOException, SolrServe
NamedList<Object> rsp = client.request(request);
NamedList<?> cluster = (NamedList<?>) rsp.get("cluster");
assertNotNull("Cluster state should not be null", cluster);
NamedList<?> collections = (NamedList<?>) cluster.get("collections");
Map<?, ?> collections = (Map<?, ?>) cluster.get("collections");
assertNotNull("Collections should not be null in cluster state", collections);
assertNotNull(collections.get(COLLECTION_NAME));
assertEquals(1, collections.size());
Expand All @@ -328,7 +332,7 @@ private void clusterStatusWithCollectionAndMultipleShards()
NamedList<Object> rsp = request.process(client).getResponse();
NamedList<?> cluster = (NamedList<?>) rsp.get("cluster");
assertNotNull("Cluster state should not be null", cluster);
NamedList<?> collections = (NamedList<?>) cluster.get("collections");
Map<?, ?> collections = (Map<?, ?>) cluster.get("collections");
assertNotNull("Collections should not be null in cluster state", collections);
assertNotNull(collections.get(COLLECTION_NAME));
assertEquals(1, collections.size());
Expand Down Expand Up @@ -463,7 +467,7 @@ private void clusterStatusNoCollection() throws Exception {
NamedList<Object> rsp = client.request(request);
NamedList<?> cluster = (NamedList<?>) rsp.get("cluster");
assertNotNull("Cluster state should not be null", cluster);
NamedList<?> collections = (NamedList<?>) cluster.get("collections");
Map<?, ?> collections = (Map<?, ?>) cluster.get("collections");
assertNotNull("Collections should not be null in cluster state", collections);
assertNotNull(collections.get(COLLECTION_NAME1));
assertEquals(4, collections.size());
Expand All @@ -485,7 +489,7 @@ private void clusterStatusWithCollection() throws IOException, SolrServerExcepti
NamedList<Object> rsp = client.request(request);
NamedList<?> cluster = (NamedList<?>) rsp.get("cluster");
assertNotNull("Cluster state should not be null", cluster);
NamedList<?> collections = (NamedList<?>) cluster.get("collections");
Map<?, ?> collections = (Map<?, ?>) cluster.get("collections");
assertNotNull("Collections should not be null in cluster state", collections);
assertEquals(1, collections.size());
@SuppressWarnings({"unchecked"})
Expand Down Expand Up @@ -515,7 +519,7 @@ private void clusterStatusZNodeVersion() throws Exception {
NamedList<Object> rsp = client.request(request);
NamedList<Object> cluster = (NamedList<Object>) rsp.get("cluster");
assertNotNull("Cluster state should not be null", cluster);
NamedList<Object> collections = (NamedList<Object>) cluster.get("collections");
Map<?, ?> collections = (Map<?, ?>) cluster.get("collections");
assertNotNull("Collections should not be null in cluster state", collections);
assertEquals(1, collections.size());
Map<String, Object> collection = (Map<String, Object>) collections.get(cname);
Expand All @@ -531,7 +535,7 @@ private void clusterStatusZNodeVersion() throws Exception {

rsp = client.request(request);
cluster = (NamedList<Object>) rsp.get("cluster");
collections = (NamedList<Object>) cluster.get("collections");
collections = (Map<?, ?>) cluster.get("collections");
collection = (Map<String, Object>) collections.get(cname);
Integer newVersion = (Integer) collection.get("znodeVersion");
assertNotNull(newVersion);
Expand All @@ -558,7 +562,7 @@ private void clusterStatusWithRouteKey() throws IOException, SolrServerException
NamedList<Object> cluster = (NamedList<Object>) rsp.get("cluster");
assertNotNull("Cluster state should not be null", cluster);
@SuppressWarnings({"unchecked"})
NamedList<Object> collections = (NamedList<Object>) cluster.get("collections");
Map<?, ?> collections = (Map<?, ?>) cluster.get("collections");
assertNotNull("Collections should not be null in cluster state", collections);
assertNotNull(collections.get(DEFAULT_COLLECTION));
assertEquals(1, collections.size());
Expand Down Expand Up @@ -605,7 +609,7 @@ private void clusterStatusAliasTest() throws Exception {
DEFAULT_COLLECTION + "," + COLLECTION_NAME,
aliases.get("myalias"));

NamedList<Object> collections = (NamedList<Object>) cluster.get("collections");
Map<?, ?> collections = (Map<?, ?>) cluster.get("collections");
assertNotNull("Collections should not be null in cluster state", collections);
assertNotNull(collections.get(DEFAULT_COLLECTION));
Map<String, Object> collection = (Map<String, Object>) collections.get(DEFAULT_COLLECTION);
Expand All @@ -625,7 +629,7 @@ private void clusterStatusAliasTest() throws Exception {

cluster = (NamedList<Object>) rsp.get("cluster");
assertNotNull("Cluster state should not be null", cluster);
collections = (NamedList<Object>) cluster.get("collections");
collections = (Map<?, ?>) cluster.get("collections");
assertNotNull("Collections should not be null in cluster state", collections);
assertNotNull(collections.get(DEFAULT_COLLECTION));
assertNotNull(collections.get(COLLECTION_NAME));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.util.CollectionUtil;
import org.apache.solr.common.util.EnvUtils;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.common.util.SimpleOrderedMap;
import org.apache.solr.common.util.Utils;
import org.slf4j.Logger;
Expand Down Expand Up @@ -140,11 +139,11 @@ private ClusterState fetchClusterState(SolrClient client)
liveNodesTimestamp = System.nanoTime();
}

var collectionsNl = (NamedList<Map<String, Object>>) cluster.get("collections");
var collectionsMap = (Map<String, Map<String, Object>>) cluster.get("collections");

Map<String, DocCollection> collStateByName =
CollectionUtil.newLinkedHashMap(collectionsNl.size());
for (Entry<String, Map<String, Object>> entry : collectionsNl) {
CollectionUtil.newLinkedHashMap(collectionsMap.size());
for (Entry<String, Map<String, Object>> entry : collectionsMap.entrySet()) {
collStateByName.put(
entry.getKey(), getDocCollectionFromObjects(entry.getKey(), entry.getValue()));
}
Expand Down Expand Up @@ -185,11 +184,12 @@ private DocCollection fetchCollectionState(SolrClient client, String collection)
SimpleOrderedMap<?> cluster =
submitClusterStateRequest(client, collection, ClusterStateRequestType.FETCH_COLLECTION);

var collStateMap = (Map<String, Object>) cluster.findRecursive("collections", collection);
if (collStateMap == null) {
var collState = (Map<String, Object>) cluster.findRecursive("collections", collection);

if (collState == null) {
throw new NotACollectionException(); // probably an alias
}
return getDocCollectionFromObjects(collection, collStateMap);
return getDocCollectionFromObjects(collection, collState);
}

private SimpleOrderedMap<?> submitClusterStateRequest(
Expand Down
10 changes: 6 additions & 4 deletions solr/solrj/src/java/org/apache/solr/common/util/NamedList.java
Original file line number Diff line number Diff line change
Expand Up @@ -319,7 +319,7 @@ private void killAll(String name) {
/**
* Recursively parses the NamedList structure to arrive at a specific element. As you descend the
* NamedList tree, the last element can be any type, including NamedList, but the previous
* elements MUST be NamedList objects themselves. A null value is returned if the indicated
* elements MUST be NamedList or Map objects themselves. A null value is returned if the indicated
* hierarchy doesn't exist, but NamedList allows null values so that could be the actual value at
* the end of the path.
*
Expand Down Expand Up @@ -348,9 +348,9 @@ public Object findRecursive(String... args) {
* it to this list. Then we retrieve the first key from this list and
* assign it to value.
*
* On the next loop, we check whether the retrieved value is a NamedList.
* If it is, then we drop to that NamedList, grab the value of the
* next key, and start the loop over. If it is not a NamedList, then we
* On the next loop, we check whether the retrieved value is a NamedList or Map.
* If it is, then we drop to that NamedList/Map, grab the value of the
* next key, and start the loop over. If it is not a NamedList/Map, then we
* assign the value to null and break out of the loop.
*
* Assigning the value to null and then breaking out of the loop seems
Expand All @@ -363,6 +363,8 @@ public Object findRecursive(String... args) {
} else {
if (value instanceof NamedList) {
currentList = (NamedList<?>) value;
} else if (value instanceof Map) {
currentList = new NamedList<>((Map<String, Object>) value);
dsmiley marked this conversation as resolved.
Show resolved Hide resolved
} else {
value = null;
break;
Expand Down
Loading
Loading