-
Notifications
You must be signed in to change notification settings - Fork 674
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
SOLR-17582 Stream CLUSTERSTATUS API response #2916
base: main
Are you sure you want to change the base?
Changes from 5 commits
f64e3d3
bbbc62d
6ed4a95
3627bda
c5a485f
8b8d37d
457da45
f6e77cd
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -16,17 +16,20 @@ | |
*/ | ||
package org.apache.solr.handler.admin; | ||
|
||
import java.io.IOException; | ||
import java.util.ArrayList; | ||
import java.util.Arrays; | ||
import java.util.Collection; | ||
import java.util.Collections; | ||
import java.util.HashMap; | ||
import java.util.HashSet; | ||
import java.util.Iterator; | ||
import java.util.List; | ||
import java.util.Map; | ||
import java.util.Objects; | ||
import java.util.Set; | ||
import java.util.stream.Stream; | ||
import org.apache.solr.common.MapWriter; | ||
import org.apache.solr.common.SolrException; | ||
import org.apache.solr.common.cloud.Aliases; | ||
import org.apache.solr.common.cloud.ClusterState; | ||
|
@@ -205,54 +208,24 @@ private void fetchClusterStatusForCollOrAlias( | |
} | ||
} | ||
|
||
// TODO use an Iterable to stream the data to the client instead of gathering it all in mem | ||
|
||
NamedList<Object> collectionProps = new SimpleOrderedMap<>(); | ||
|
||
collectionStream.forEach( | ||
clusterStateCollection -> { | ||
Map<String, Object> collectionStatus; | ||
String name = clusterStateCollection.getName(); | ||
|
||
Set<String> requestedShards = new HashSet<>(); | ||
if (routeKey != null) { | ||
DocRouter router = clusterStateCollection.getRouter(); | ||
Collection<Slice> slices = | ||
router.getSearchSlices(routeKey, null, clusterStateCollection); | ||
for (Slice slice : slices) { | ||
requestedShards.add(slice.getName()); | ||
} | ||
MapWriter collectionPropsWriter = | ||
ew -> { | ||
SolrCollectionProperiesIterator collectionProps = | ||
new SolrCollectionProperiesIterator( | ||
collectionStream.iterator(), collectionVsAliases, routeKey, liveNodes, shard); | ||
while (collectionProps.hasNext()) { | ||
Map<String, Object> collectionPropsMap = (collectionProps.next().asMap()); | ||
collectionPropsMap.forEach( | ||
(key, value) -> { | ||
try { | ||
ew.put(key, value); | ||
} catch (IOException e) { | ||
throw new RuntimeException(e); | ||
} | ||
}); | ||
} | ||
if (shard != null) { | ||
String[] paramShards = shard.split(","); | ||
requestedShards.addAll(Arrays.asList(paramShards)); | ||
} | ||
|
||
byte[] bytes = Utils.toJSON(clusterStateCollection); | ||
@SuppressWarnings("unchecked") | ||
Map<String, Object> docCollection = (Map<String, Object>) Utils.fromJSON(bytes); | ||
collectionStatus = getCollectionStatus(docCollection, name, requestedShards); | ||
|
||
collectionStatus.put("znodeVersion", clusterStateCollection.getZNodeVersion()); | ||
collectionStatus.put( | ||
"creationTimeMillis", clusterStateCollection.getCreationTime().toEpochMilli()); | ||
|
||
if (collectionVsAliases.containsKey(name) && !collectionVsAliases.get(name).isEmpty()) { | ||
collectionStatus.put("aliases", collectionVsAliases.get(name)); | ||
} | ||
String configName = clusterStateCollection.getConfigName(); | ||
collectionStatus.put("configName", configName); | ||
if (solrParams.getBool("prs", false) && clusterStateCollection.isPerReplicaState()) { | ||
PerReplicaStates prs = clusterStateCollection.getPerReplicaStates(); | ||
collectionStatus.put("PRS", prs); | ||
} | ||
collectionProps.add(name, collectionStatus); | ||
}); | ||
|
||
// now we need to walk the collectionProps tree to cross-check replica state with live nodes | ||
crossCheckReplicaStateWithLiveNodes(liveNodes, collectionProps); | ||
|
||
clusterStatus.add("collections", collectionProps); | ||
}; | ||
clusterStatus.add("collections", collectionPropsWriter); | ||
} | ||
|
||
private void addAliasMap(Aliases aliases, NamedList<Object> clusterStatus) { | ||
|
@@ -368,4 +341,77 @@ public static Map<String, Object> postProcessCollectionJSON(Map<String, Object> | |
collection.put("health", Health.combine(healthStates).toString()); | ||
return collection; | ||
} | ||
|
||
private class SolrCollectionProperiesIterator implements Iterator<NamedList<Object>> { | ||
|
||
final Iterator<DocCollection> it; | ||
Map<String, List<String>> collectionVsAliases; | ||
String routeKey; | ||
List<String> liveNodes; | ||
String shard; | ||
|
||
public SolrCollectionProperiesIterator( | ||
Iterator<DocCollection> it, | ||
Map<String, List<String>> collectionVsAliases, | ||
String routeKey, | ||
List<String> liveNodes, | ||
String shard) { | ||
this.it = it; | ||
this.collectionVsAliases = collectionVsAliases; | ||
this.routeKey = routeKey; | ||
this.liveNodes = liveNodes; | ||
this.shard = shard; | ||
} | ||
|
||
@Override | ||
public boolean hasNext() { | ||
return it.hasNext(); | ||
} | ||
|
||
@Override | ||
public NamedList<Object> next() { | ||
NamedList<Object> collectionProps = new SimpleOrderedMap<>(); | ||
DocCollection clusterStateCollection = it.next(); | ||
Map<String, Object> collectionStatus; | ||
String name = clusterStateCollection.getName(); | ||
|
||
Set<String> requestedShards = new HashSet<>(); | ||
if (routeKey != null) { | ||
DocRouter router = clusterStateCollection.getRouter(); | ||
Collection<Slice> slices = router.getSearchSlices(routeKey, null, clusterStateCollection); | ||
for (Slice slice : slices) { | ||
requestedShards.add(slice.getName()); | ||
} | ||
} | ||
if (shard != null) { | ||
String[] paramShards = shard.split(","); | ||
requestedShards.addAll(Arrays.asList(paramShards)); | ||
} | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. ideally this is done up front; not per iteration There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Moved it out so its not per iteration. |
||
|
||
byte[] bytes = Utils.toJSON(clusterStateCollection); | ||
@SuppressWarnings("unchecked") | ||
Map<String, Object> docCollection = (Map<String, Object>) Utils.fromJSON(bytes); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. why round-trip this? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. So looked at this and I understand now why it was done this way. It wants to just write this out to the Was looking to avoid this back and forth, but there were a few options. I tried using an Other way is to introduce a some kind of Another option which actually looks like the way we should go is I found that the I am thinking we should refactor DocCollection to so we can just return this but the changes were much more drastic but may be worth it. Maybe in a different JIRA? This scope continues to creep with me adding improvement to |
||
collectionStatus = getCollectionStatus(docCollection, name, requestedShards); | ||
|
||
collectionStatus.put("znodeVersion", clusterStateCollection.getZNodeVersion()); | ||
collectionStatus.put( | ||
"creationTimeMillis", clusterStateCollection.getCreationTime().toEpochMilli()); | ||
|
||
if (collectionVsAliases.containsKey(name) && !collectionVsAliases.get(name).isEmpty()) { | ||
collectionStatus.put("aliases", collectionVsAliases.get(name)); | ||
} | ||
String configName = clusterStateCollection.getConfigName(); | ||
collectionStatus.put("configName", configName); | ||
if (solrParams.getBool("prs", false) && clusterStateCollection.isPerReplicaState()) { | ||
PerReplicaStates prs = clusterStateCollection.getPerReplicaStates(); | ||
collectionStatus.put("PRS", prs); | ||
} | ||
collectionProps.add(name, collectionStatus); | ||
|
||
// now we need to walk the collectionProps tree to cross-check replica state with live | ||
// nodes | ||
crossCheckReplicaStateWithLiveNodes(liveNodes, collectionProps); | ||
return collectionProps; | ||
} | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I was hoping we wouldn't need a custom Iterator. We do need a method that takes in DocCollection (and some other context that is the same across collections) and returns a NamedList. With such a method, we can call it via streamOfDocCollection.map(collState -> theMethod(collState, routeKey, liveNodes, etc.).iterator()
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah yeah don't know why I didn't do this first. Changed it appropriately.