Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Backport 2.x] Optimize NodeIndicesStats output behind flag #15513

Merged
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Add concurrent search support for Derived Fields ([#15326](https://github.com/opensearch-project/OpenSearch/pull/15326))
- [Workload Management] Add query group stats constructs ([#15343](https://github.com/opensearch-project/OpenSearch/pull/15343)))
- Add runAs to Subject interface and introduce IdentityAwarePlugin extension point ([#14630](https://github.com/opensearch-project/OpenSearch/pull/14630))
- Optimize NodeIndicesStats output behind flag ([#14454](https://github.com/opensearch-project/OpenSearch/pull/14454))

### Dependencies
- Bump `netty` from 4.1.111.Final to 4.1.112.Final ([#15081](https://github.com/opensearch-project/OpenSearch/pull/15081))
Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ public class CommonStatsFlags implements Writeable, Cloneable {
// Used for metric CACHE_STATS, to determine which caches to report stats for
private EnumSet<CacheType> includeCaches = EnumSet.noneOf(CacheType.class);
private String[] levels = new String[0];
private boolean includeIndicesStatsByLevel = false;

/**
* @param flags flags to set. If no flags are supplied, default flags will be set.
Expand Down Expand Up @@ -106,6 +107,9 @@ public CommonStatsFlags(StreamInput in) throws IOException {
includeCaches = in.readEnumSet(CacheType.class);
levels = in.readStringArray();
}
if (in.getVersion().onOrAfter(Version.V_2_17_0)) {
includeIndicesStatsByLevel = in.readBoolean();
}
}

@Override
Expand Down Expand Up @@ -135,6 +139,9 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeEnumSet(includeCaches);
out.writeStringArrayNullable(levels);
}
if (out.getVersion().onOrAfter(Version.V_2_17_0)) {
out.writeBoolean(includeIndicesStatsByLevel);
}
}

/**
Expand Down Expand Up @@ -273,6 +280,14 @@ public boolean includeSegmentFileSizes() {
return this.includeSegmentFileSizes;
}

public void setIncludeIndicesStatsByLevel(boolean includeIndicesStatsByLevel) {
this.includeIndicesStatsByLevel = includeIndicesStatsByLevel;
}

public boolean getIncludeIndicesStatsByLevel() {
return this.includeIndicesStatsByLevel;
}

public boolean isSet(Flag flag) {
return flags.contains(flag);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -758,8 +758,12 @@
break;
}
}

return new NodeIndicesStats(commonStats, statsByShard(this, flags), searchRequestStats);
if (flags.getIncludeIndicesStatsByLevel()) {
NodeIndicesStats.StatsLevel statsLevel = NodeIndicesStats.getAcceptedLevel(flags.getLevels());
return new NodeIndicesStats(commonStats, statsByShard(this, flags), searchRequestStats, statsLevel);

Check warning on line 763 in server/src/main/java/org/opensearch/indices/IndicesService.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/indices/IndicesService.java#L762-L763

Added lines #L762 - L763 were not covered by tests
} else {
return new NodeIndicesStats(commonStats, statsByShard(this, flags), searchRequestStats);
}
}

Map<Index, List<IndexShardStats>> statsByShard(final IndicesService indicesService, final CommonStatsFlags flags) {
Expand Down
199 changes: 171 additions & 28 deletions server/src/main/java/org/opensearch/indices/NodeIndicesStats.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@

package org.opensearch.indices;

import org.opensearch.Version;
import org.opensearch.action.admin.indices.stats.CommonStats;
import org.opensearch.action.admin.indices.stats.IndexShardStats;
import org.opensearch.action.admin.indices.stats.ShardStats;
Expand Down Expand Up @@ -63,9 +64,11 @@

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

/**
* Global information on indices stats running on a specific node.
Expand All @@ -74,26 +77,27 @@
*/
@PublicApi(since = "1.0.0")
public class NodeIndicesStats implements Writeable, ToXContentFragment {
private CommonStats stats;
private Map<Index, List<IndexShardStats>> statsByShard;
protected CommonStats stats;
protected Map<Index, CommonStats> statsByIndex;
protected Map<Index, List<IndexShardStats>> statsByShard;

public NodeIndicesStats(StreamInput in) throws IOException {
stats = new CommonStats(in);
if (in.readBoolean()) {
int entries = in.readVInt();
statsByShard = new HashMap<>();
for (int i = 0; i < entries; i++) {
Index index = new Index(in);
int indexShardListSize = in.readVInt();
List<IndexShardStats> indexShardStats = new ArrayList<>(indexShardListSize);
for (int j = 0; j < indexShardListSize; j++) {
indexShardStats.add(new IndexShardStats(in));
}
statsByShard.put(index, indexShardStats);
if (in.getVersion().onOrAfter(Version.V_2_17_0)) {
// contains statsByIndex
if (in.readBoolean()) {
statsByIndex = readStatsByIndex(in);
}
}
if (in.readBoolean()) {
statsByShard = readStatsByShard(in);
}
}

/**
* Without passing the information of the levels to the constructor, we return the Node-level aggregated stats as
* {@link CommonStats} along with a hash-map containing Index to List of Shard Stats.
*/
public NodeIndicesStats(CommonStats oldStats, Map<Index, List<IndexShardStats>> statsByShard, SearchRequestStats searchRequestStats) {
// this.stats = stats;
this.statsByShard = statsByShard;
Expand All @@ -112,6 +116,90 @@
}
}

/**
* Passing the level information to the nodes allows us to aggregate the stats based on the level passed. This
* allows us to aggregate based on NodeLevel (default - if no level is passed) or Index level if `indices` level is
* passed and finally return the statsByShards map if `shards` level is passed. This allows us to reduce ser/de of
* stats and return only the information that is required while returning to the client.
*/
public NodeIndicesStats(
CommonStats oldStats,
Map<Index, List<IndexShardStats>> statsByShard,
SearchRequestStats searchRequestStats,
StatsLevel level
) {
// make a total common stats from old ones and current ones
this.stats = oldStats;
for (List<IndexShardStats> shardStatsList : statsByShard.values()) {
for (IndexShardStats indexShardStats : shardStatsList) {
for (ShardStats shardStats : indexShardStats.getShards()) {
stats.add(shardStats.getStats());
}
}
}

if (this.stats.search != null) {
this.stats.search.setSearchRequestStats(searchRequestStats);

Check warning on line 142 in server/src/main/java/org/opensearch/indices/NodeIndicesStats.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/indices/NodeIndicesStats.java#L142

Added line #L142 was not covered by tests
}

if (level != null) {
switch (level) {
case INDICES:
this.statsByIndex = createStatsByIndex(statsByShard);
break;
case SHARDS:
this.statsByShard = statsByShard;
break;
}
}
}

/**
* By default, the levels passed from the transport action will be a list of strings, since NodeIndicesStats can
* only aggregate on one level, we pick the first accepted level else we ignore if no known level is passed. Level is
* selected based on enum defined in {@link StatsLevel}
*
* Note - we are picking the first level as multiple levels are not supported in the previous versions.
* @param levels - levels sent in the request.
*
* @return Corresponding identified enum {@link StatsLevel}
*/
public static StatsLevel getAcceptedLevel(String[] levels) {
if (levels != null && levels.length > 0) {
Optional<StatsLevel> level = Arrays.stream(StatsLevel.values())
.filter(field -> field.getRestName().equals(levels[0]))
.findFirst();
return level.orElseThrow(() -> new IllegalArgumentException("Level provided is not supported by NodeIndicesStats"));

Check warning on line 172 in server/src/main/java/org/opensearch/indices/NodeIndicesStats.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/indices/NodeIndicesStats.java#L169-L172

Added lines #L169 - L172 were not covered by tests
}
return null;

Check warning on line 174 in server/src/main/java/org/opensearch/indices/NodeIndicesStats.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/indices/NodeIndicesStats.java#L174

Added line #L174 was not covered by tests
}

private Map<Index, CommonStats> readStatsByIndex(StreamInput in) throws IOException {
Map<Index, CommonStats> statsByIndex = new HashMap<>();
int indexEntries = in.readVInt();
for (int i = 0; i < indexEntries; i++) {
Index index = new Index(in);
CommonStats commonStats = new CommonStats(in);
statsByIndex.put(index, commonStats);
}
return statsByIndex;
}

private Map<Index, List<IndexShardStats>> readStatsByShard(StreamInput in) throws IOException {
Map<Index, List<IndexShardStats>> statsByShard = new HashMap<>();
int entries = in.readVInt();
for (int i = 0; i < entries; i++) {
Index index = new Index(in);
int indexShardListSize = in.readVInt();
List<IndexShardStats> indexShardStats = new ArrayList<>(indexShardListSize);
for (int j = 0; j < indexShardListSize; j++) {
indexShardStats.add(new IndexShardStats(in));
}
statsByShard.put(index, indexShardStats);
}
return statsByShard;
}

@Nullable
public StoreStats getStore() {
return stats.getStore();
Expand Down Expand Up @@ -195,7 +283,31 @@
@Override
public void writeTo(StreamOutput out) throws IOException {
stats.writeTo(out);

if (out.getVersion().onOrAfter(Version.V_2_17_0)) {
out.writeBoolean(statsByIndex != null);
if (statsByIndex != null) {
writeStatsByIndex(out);
}
}

out.writeBoolean(statsByShard != null);
if (statsByShard != null) {
writeStatsByShards(out);
}
}

private void writeStatsByIndex(StreamOutput out) throws IOException {
if (statsByIndex != null) {
out.writeVInt(statsByIndex.size());
for (Map.Entry<Index, CommonStats> entry : statsByIndex.entrySet()) {
entry.getKey().writeTo(out);
entry.getValue().writeTo(out);
}
}
}

private void writeStatsByShards(StreamOutput out) throws IOException {
if (statsByShard != null) {
out.writeVInt(statsByShard.size());
for (Map.Entry<Index, List<IndexShardStats>> entry : statsByShard.entrySet()) {
Expand All @@ -210,29 +322,46 @@

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
final String level = params.param("level", "node");
final boolean isLevelValid = "indices".equalsIgnoreCase(level)
|| "node".equalsIgnoreCase(level)
|| "shards".equalsIgnoreCase(level);
final String level = params.param("level", StatsLevel.NODE.getRestName());
final boolean isLevelValid = StatsLevel.NODE.getRestName().equalsIgnoreCase(level)
|| StatsLevel.INDICES.getRestName().equalsIgnoreCase(level)
|| StatsLevel.SHARDS.getRestName().equalsIgnoreCase(level);
if (!isLevelValid) {
throw new IllegalArgumentException("level parameter must be one of [indices] or [node] or [shards] but was [" + level + "]");
throw new IllegalArgumentException(
"level parameter must be one of ["
+ StatsLevel.INDICES.getRestName()
+ "] or ["
+ StatsLevel.NODE.getRestName()
+ "] or ["
+ StatsLevel.SHARDS.getRestName()
+ "] but was ["
+ level
+ "]"
);
}

// "node" level
builder.startObject(Fields.INDICES);
builder.startObject(StatsLevel.INDICES.getRestName());
stats.toXContent(builder, params);

if ("indices".equals(level)) {
Map<Index, CommonStats> indexStats = createStatsByIndex();
builder.startObject(Fields.INDICES);
for (Map.Entry<Index, CommonStats> entry : indexStats.entrySet()) {
if (StatsLevel.INDICES.getRestName().equals(level)) {
assert statsByIndex != null || statsByShard != null : "Expected shard stats or index stats in response for generating ["
+ StatsLevel.INDICES
+ "] field";
if (statsByIndex == null) {
statsByIndex = createStatsByIndex(statsByShard);
}

builder.startObject(StatsLevel.INDICES.getRestName());
for (Map.Entry<Index, CommonStats> entry : statsByIndex.entrySet()) {
builder.startObject(entry.getKey().getName());
entry.getValue().toXContent(builder, params);
builder.endObject();
}
builder.endObject();
} else if ("shards".equals(level)) {
builder.startObject("shards");
} else if (StatsLevel.SHARDS.getRestName().equals(level)) {
builder.startObject(StatsLevel.SHARDS.getRestName());
assert statsByShard != null : "Expected shard stats in response for generating [" + StatsLevel.SHARDS + "] field";
for (Map.Entry<Index, List<IndexShardStats>> entry : statsByShard.entrySet()) {
builder.startArray(entry.getKey().getName());
for (IndexShardStats indexShardStats : entry.getValue()) {
Expand All @@ -251,7 +380,7 @@
return builder;
}

private Map<Index, CommonStats> createStatsByIndex() {
private Map<Index, CommonStats> createStatsByIndex(Map<Index, List<IndexShardStats>> statsByShard) {
Map<Index, CommonStats> statsMap = new HashMap<>();
for (Map.Entry<Index, List<IndexShardStats>> entry : statsByShard.entrySet()) {
if (!statsMap.containsKey(entry.getKey())) {
Expand Down Expand Up @@ -281,7 +410,21 @@
*
* @opensearch.internal
*/
static final class Fields {
static final String INDICES = "indices";
@PublicApi(since = "2.17.0")
public enum StatsLevel {
INDICES("indices"),
SHARDS("shards"),
NODE("node");

private final String restName;

StatsLevel(String restName) {
this.restName = restName;
}

public String getRestName() {
return restName;
}

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,7 @@ public RestChannelConsumer prepareRequest(final RestRequest request, final NodeC
String[] levels = Strings.splitStringByCommaToArray(request.param("level"));
nodesStatsRequest.indices().setLevels(levels);
nodesStatsRequest.setIncludeDiscoveryNodes(false);
nodesStatsRequest.indices().setIncludeIndicesStatsByLevel(true);

return channel -> client.admin().cluster().nodesStats(nodesStatsRequest, new NodesResponseRestListener<>(channel));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,7 @@
NodesStatsRequest.Metric.PROCESS.metricName(),
NodesStatsRequest.Metric.SCRIPT.metricName()
);
nodesStatsRequest.indices().setIncludeIndicesStatsByLevel(true);

Check warning on line 151 in server/src/main/java/org/opensearch/rest/action/cat/RestNodesAction.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/rest/action/cat/RestNodesAction.java#L151

Added line #L151 was not covered by tests
client.admin().cluster().nodesStats(nodesStatsRequest, new RestResponseListener<NodesStatsResponse>(channel) {
@Override
public RestResponse buildResponse(NodesStatsResponse nodesStatsResponse) throws Exception {
Expand Down
Loading
Loading