-
Notifications
You must be signed in to change notification settings - Fork 674
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
SOLR-17582 Stream CLUSTERSTATUS API response #2916
base: main
Are you sure you want to change the base?
Changes from 7 commits
f64e3d3
bbbc62d
6ed4a95
3627bda
c5a485f
8b8d37d
457da45
f6e77cd
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -16,8 +16,8 @@ | |
*/ | ||
package org.apache.solr.handler.admin; | ||
|
||
import java.io.IOException; | ||
import java.util.ArrayList; | ||
import java.util.Arrays; | ||
import java.util.Collection; | ||
import java.util.Collections; | ||
import java.util.HashMap; | ||
|
@@ -27,6 +27,7 @@ | |
import java.util.Objects; | ||
import java.util.Set; | ||
import java.util.stream.Stream; | ||
import org.apache.solr.common.MapWriter; | ||
import org.apache.solr.common.SolrException; | ||
import org.apache.solr.common.cloud.Aliases; | ||
import org.apache.solr.common.cloud.ClusterState; | ||
|
@@ -180,6 +181,14 @@ private void fetchClusterStatusForCollOrAlias( | |
String routeKey = solrParams.get(ShardParams._ROUTE_); | ||
String shard = solrParams.get(ZkStateReader.SHARD_ID_PROP); | ||
|
||
Set<String> requestedShards; | ||
if (shard != null) { | ||
String[] paramShards = shard.split(","); | ||
requestedShards = Set.of(paramShards); | ||
} else { | ||
requestedShards = Set.of(); | ||
} | ||
|
||
Stream<DocCollection> collectionStream; | ||
if (collection == null) { | ||
collectionStream = clusterState.collectionStream(); | ||
|
@@ -205,54 +214,25 @@ private void fetchClusterStatusForCollOrAlias( | |
} | ||
} | ||
|
||
// TODO use an Iterable to stream the data to the client instead of gathering it all in mem | ||
|
||
NamedList<Object> collectionProps = new SimpleOrderedMap<>(); | ||
|
||
collectionStream.forEach( | ||
clusterStateCollection -> { | ||
Map<String, Object> collectionStatus; | ||
String name = clusterStateCollection.getName(); | ||
|
||
Set<String> requestedShards = new HashSet<>(); | ||
if (routeKey != null) { | ||
DocRouter router = clusterStateCollection.getRouter(); | ||
Collection<Slice> slices = | ||
router.getSearchSlices(routeKey, null, clusterStateCollection); | ||
for (Slice slice : slices) { | ||
requestedShards.add(slice.getName()); | ||
} | ||
} | ||
if (shard != null) { | ||
String[] paramShards = shard.split(","); | ||
requestedShards.addAll(Arrays.asList(paramShards)); | ||
} | ||
|
||
byte[] bytes = Utils.toJSON(clusterStateCollection); | ||
@SuppressWarnings("unchecked") | ||
Map<String, Object> docCollection = (Map<String, Object>) Utils.fromJSON(bytes); | ||
collectionStatus = getCollectionStatus(docCollection, name, requestedShards); | ||
|
||
collectionStatus.put("znodeVersion", clusterStateCollection.getZNodeVersion()); | ||
collectionStatus.put( | ||
"creationTimeMillis", clusterStateCollection.getCreationTime().toEpochMilli()); | ||
|
||
if (collectionVsAliases.containsKey(name) && !collectionVsAliases.get(name).isEmpty()) { | ||
collectionStatus.put("aliases", collectionVsAliases.get(name)); | ||
} | ||
String configName = clusterStateCollection.getConfigName(); | ||
collectionStatus.put("configName", configName); | ||
if (solrParams.getBool("prs", false) && clusterStateCollection.isPerReplicaState()) { | ||
PerReplicaStates prs = clusterStateCollection.getPerReplicaStates(); | ||
collectionStatus.put("PRS", prs); | ||
} | ||
collectionProps.add(name, collectionStatus); | ||
}); | ||
|
||
// now we need to walk the collectionProps tree to cross-check replica state with live nodes | ||
crossCheckReplicaStateWithLiveNodes(liveNodes, collectionProps); | ||
|
||
clusterStatus.add("collections", collectionProps); | ||
MapWriter collectionPropsWriter = | ||
ew -> { | ||
collectionStream.forEach( | ||
(collectionState) -> { | ||
try { | ||
ew.put( | ||
collectionState.getName(), | ||
buildResponseForCollection( | ||
collectionState, | ||
collectionVsAliases, | ||
routeKey, | ||
liveNodes, | ||
requestedShards)); | ||
} catch (IOException e) { | ||
throw new RuntimeException(e); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why does buildResponseForCollection throw IOException? That's suspicious. If you must, catch in there and throw a suitable exception like SolrException (which extends RuntimeException and is generally preferred within Solr over RE). There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. putNoEx then |
||
} | ||
}); | ||
}; | ||
clusterStatus.add("collections", collectionPropsWriter); | ||
} | ||
|
||
private void addAliasMap(Aliases aliases, NamedList<Object> clusterStatus) { | ||
|
@@ -307,23 +287,20 @@ private Map<String, Object> getCollectionStatus( | |
*/ | ||
@SuppressWarnings("unchecked") | ||
protected void crossCheckReplicaStateWithLiveNodes( | ||
List<String> liveNodes, NamedList<Object> collectionProps) { | ||
for (Map.Entry<String, Object> next : collectionProps) { | ||
Map<String, Object> collMap = (Map<String, Object>) next.getValue(); | ||
Map<String, Object> shards = (Map<String, Object>) collMap.get("shards"); | ||
for (Object nextShard : shards.values()) { | ||
Map<String, Object> shardMap = (Map<String, Object>) nextShard; | ||
Map<String, Object> replicas = (Map<String, Object>) shardMap.get("replicas"); | ||
for (Object nextReplica : replicas.values()) { | ||
Map<String, Object> replicaMap = (Map<String, Object>) nextReplica; | ||
if (Replica.State.getState((String) replicaMap.get(ZkStateReader.STATE_PROP)) | ||
!= Replica.State.DOWN) { | ||
// not down, so verify the node is live | ||
String node_name = (String) replicaMap.get(ZkStateReader.NODE_NAME_PROP); | ||
if (!liveNodes.contains(node_name)) { | ||
// node is not live, so this replica is actually down | ||
replicaMap.put(ZkStateReader.STATE_PROP, Replica.State.DOWN.toString()); | ||
} | ||
List<String> liveNodes, Map<String, Object> collectionProps) { | ||
Map<String, Object> shards = (Map<String, Object>) collectionProps.get("shards"); | ||
for (Object nextShard : shards.values()) { | ||
Map<String, Object> shardMap = (Map<String, Object>) nextShard; | ||
Map<String, Object> replicas = (Map<String, Object>) shardMap.get("replicas"); | ||
for (Object nextReplica : replicas.values()) { | ||
Map<String, Object> replicaMap = (Map<String, Object>) nextReplica; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I almost want to cry just glancing at this. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'll change those There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. totally understood. In some old Solr code like this, there's always "and one more thing" we could/should do but ultimately snowballs the scope out of control. I leave it to you to do as you wish. Thank you for your contribution here; I didn't mean to get more out of you than you bargained for :-) |
||
if (Replica.State.getState((String) replicaMap.get(ZkStateReader.STATE_PROP)) | ||
!= Replica.State.DOWN) { | ||
// not down, so verify the node is live | ||
String node_name = (String) replicaMap.get(ZkStateReader.NODE_NAME_PROP); | ||
if (!liveNodes.contains(node_name)) { | ||
// node is not live, so this replica is actually down | ||
replicaMap.put(ZkStateReader.STATE_PROP, Replica.State.DOWN.toString()); | ||
} | ||
} | ||
} | ||
|
@@ -368,4 +345,47 @@ public static Map<String, Object> postProcessCollectionJSON(Map<String, Object> | |
collection.put("health", Health.combine(healthStates).toString()); | ||
return collection; | ||
} | ||
|
||
private Map<String, Object> buildResponseForCollection( | ||
DocCollection clusterStateCollection, | ||
Map<String, List<String>> collectionVsAliases, | ||
String routeKey, | ||
List<String> liveNodes, | ||
Set<String> requestedShards) { | ||
Map<String, Object> collectionStatus; | ||
Set<String> shards = new HashSet<>(requestedShards); | ||
String name = clusterStateCollection.getName(); | ||
|
||
if (routeKey != null) { | ||
DocRouter router = clusterStateCollection.getRouter(); | ||
Collection<Slice> slices = router.getSearchSlices(routeKey, null, clusterStateCollection); | ||
for (Slice slice : slices) { | ||
shards.add(slice.getName()); | ||
} | ||
} | ||
|
||
byte[] bytes = Utils.toJSON(clusterStateCollection); | ||
@SuppressWarnings("unchecked") | ||
Map<String, Object> docCollection = (Map<String, Object>) Utils.fromJSON(bytes); | ||
collectionStatus = getCollectionStatus(docCollection, name, shards); | ||
|
||
collectionStatus.put("znodeVersion", clusterStateCollection.getZNodeVersion()); | ||
collectionStatus.put( | ||
"creationTimeMillis", clusterStateCollection.getCreationTime().toEpochMilli()); | ||
|
||
if (collectionVsAliases.containsKey(name) && !collectionVsAliases.get(name).isEmpty()) { | ||
collectionStatus.put("aliases", collectionVsAliases.get(name)); | ||
} | ||
String configName = clusterStateCollection.getConfigName(); | ||
collectionStatus.put("configName", configName); | ||
if (solrParams.getBool("prs", false) && clusterStateCollection.isPerReplicaState()) { | ||
PerReplicaStates prs = clusterStateCollection.getPerReplicaStates(); | ||
collectionStatus.put("PRS", prs); | ||
} | ||
|
||
// now we need to walk the collectionProps tree to cross-check replica state with live nodes | ||
crossCheckReplicaStateWithLiveNodes(liveNodes, collectionStatus); | ||
|
||
return collectionStatus; | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
friggin beautiful now
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
putNoEx
removes the try/catch we can get it cleaner!