Skip to content

Commit

Permalink
Add data models for health stats API (#120) (#126)
Browse files Browse the repository at this point in the history
* Add data models for health stats api



* update PR based on comments



---------


(cherry picked from commit d74adb3)

Signed-off-by: Chenyang Ji <[email protected]>
Signed-off-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
Co-authored-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
  • Loading branch information
1 parent 6843023 commit 929141c
Show file tree
Hide file tree
Showing 15 changed files with 712 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
package org.opensearch.plugin.insights.core.service;

import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.DEFAULT_GROUPING_TYPE;
import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.QUERY_INSIGHTS_EXECUTOR;
import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.getExporterSettings;

import java.io.IOException;
Expand All @@ -18,6 +19,7 @@
import java.util.List;
import java.util.Map;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.stream.Collectors;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.client.Client;
Expand All @@ -33,6 +35,8 @@
import org.opensearch.plugin.insights.rules.model.GroupingType;
import org.opensearch.plugin.insights.rules.model.MetricType;
import org.opensearch.plugin.insights.rules.model.SearchQueryRecord;
import org.opensearch.plugin.insights.rules.model.healthStats.QueryInsightsHealthStats;
import org.opensearch.plugin.insights.rules.model.healthStats.TopQueriesHealthStats;
import org.opensearch.plugin.insights.settings.QueryInsightsSettings;
import org.opensearch.telemetry.metrics.MetricsRegistry;
import org.opensearch.threadpool.Scheduler;
Expand Down Expand Up @@ -439,4 +443,20 @@ protected void doClose() throws IOException {
queryInsightsExporterFactory.closeAllExporters();
queryInsightsReaderFactory.closeAllReaders();
}

/**
* Get health stats for query insights services
*
* @return QueryInsightsHealthStats
*/
public QueryInsightsHealthStats getHealthStats() {
Map<MetricType, TopQueriesHealthStats> topQueriesHealthStatsMap = topQueriesServices.entrySet()
.stream()
.collect(Collectors.toMap(Map.Entry::getKey, entry -> entry.getValue().getHealthStats()));
return new QueryInsightsHealthStats(
threadPool.info(QUERY_INSIGHTS_EXECUTOR),
this.queryRecordsQueue.size(),
topQueriesHealthStatsMap
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import org.opensearch.plugin.insights.rules.model.GroupingType;
import org.opensearch.plugin.insights.rules.model.MetricType;
import org.opensearch.plugin.insights.rules.model.SearchQueryRecord;
import org.opensearch.plugin.insights.rules.model.healthStats.TopQueriesHealthStats;
import org.opensearch.plugin.insights.settings.QueryInsightsSettings;
import org.opensearch.threadpool.ThreadPool;

Expand Down Expand Up @@ -516,4 +517,13 @@ private void drain() {
topQueriesHistorySnapshot.set(new ArrayList<>());
topQueriesCurrentSnapshot.set(new ArrayList<>());
}

/**
* Get top queries service health stats
*
* @return TopQueriesHealthStats
*/
public TopQueriesHealthStats getHealthStats() {
return new TopQueriesHealthStats(this.topQueriesStore.size(), this.queryGrouper.getHealthStats());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.opensearch.plugin.insights.rules.model.GroupingType;
import org.opensearch.plugin.insights.rules.model.MetricType;
import org.opensearch.plugin.insights.rules.model.SearchQueryRecord;
import org.opensearch.plugin.insights.rules.model.healthStats.QueryGrouperHealthStats;
import org.opensearch.plugin.insights.settings.QueryInsightsSettings;

/**
Expand All @@ -39,12 +40,12 @@ public class MinMaxHeapQueryGrouper implements QueryGrouper {
/**
* Metric type for the current grouping service
*/
private MetricType metricType;
private final MetricType metricType;

/**
* Aggregation type for the current grouping service
*/
private AggregationType aggregationType;
private final AggregationType aggregationType;
/**
* Map storing groupingId to Tuple containing Aggregate search query record and boolean.
* SearchQueryRecord: Aggregate search query record to store the aggregate of a metric type based on the aggregation type..
Expand All @@ -53,18 +54,18 @@ public class MinMaxHeapQueryGrouper implements QueryGrouper {
* boolean: True if the aggregate record is in the Top N queries priority query (min heap) and False if the aggregate
* record is in the Max Heap
*/
private ConcurrentHashMap<String, Tuple<SearchQueryRecord, Boolean>> groupIdToAggSearchQueryRecord;
private final ConcurrentHashMap<String, Tuple<SearchQueryRecord, Boolean>> groupIdToAggSearchQueryRecord;
/**
* Min heap to keep track of the Top N query groups and is passed from TopQueriesService as the topQueriesStore
*/
private PriorityBlockingQueue<SearchQueryRecord> minHeapTopQueriesStore;
private final PriorityBlockingQueue<SearchQueryRecord> minHeapTopQueriesStore;
/**
* The Max heap is an overflow data structure used to manage records that exceed the capacity of the Min heap.
* It stores all records not included in the Top N query results. When the aggregate measurement for one of these
* records is updated and it now qualifies as part of the Top N, the record is moved from the Max heap to the Min heap,
* and the records are rearranged accordingly.
*/
private PriorityBlockingQueue<SearchQueryRecord> maxHeapQueryStore;
private final PriorityBlockingQueue<SearchQueryRecord> maxHeapQueryStore;

/**
* Top N size based on the configuration set
Expand All @@ -80,11 +81,11 @@ public class MinMaxHeapQueryGrouper implements QueryGrouper {
private int maxGroups;

public MinMaxHeapQueryGrouper(
MetricType metricType,
GroupingType groupingType,
AggregationType aggregationType,
PriorityBlockingQueue<SearchQueryRecord> topQueriesStore,
int topNSize
final MetricType metricType,
final GroupingType groupingType,
final AggregationType aggregationType,
final PriorityBlockingQueue<SearchQueryRecord> topQueriesStore,
final int topNSize
) {
this.groupingType = groupingType;
this.metricType = metricType;
Expand All @@ -103,7 +104,7 @@ public MinMaxHeapQueryGrouper(
* @return return the search query record that represents the group
*/
@Override
public SearchQueryRecord add(SearchQueryRecord searchQueryRecord) {
public SearchQueryRecord add(final SearchQueryRecord searchQueryRecord) {
if (groupingType == GroupingType.NONE) {
throw new IllegalArgumentException("Do not use addQueryToGroup when GroupingType is None");
}
Expand All @@ -120,8 +121,7 @@ public SearchQueryRecord add(SearchQueryRecord searchQueryRecord) {
// Add to min PQ and promote to max
// If max PQ is empty return else try to promote record from max to min
if (!groupIdToAggSearchQueryRecord.containsKey(groupId)) {
boolean maxGroupsLimitReached = checkMaxGroupsLimitReached(groupId);
if (maxGroupsLimitReached) {
if (checkMaxGroupsLimitReached(groupId)) {
return null;
}
aggregateSearchQueryRecord = searchQueryRecord;
Expand Down Expand Up @@ -158,7 +158,7 @@ public void drain() {
* @return grouping type changed
*/
@Override
public boolean setGroupingType(GroupingType newGroupingType) {
public boolean setGroupingType(final GroupingType newGroupingType) {
if (this.groupingType != newGroupingType) {
this.groupingType = newGroupingType;
drain();
Expand All @@ -183,7 +183,7 @@ public GroupingType getGroupingType() {
* @return max groups changed
*/
@Override
public boolean setMaxGroups(int maxGroups) {
public boolean setMaxGroups(final int maxGroups) {
if (this.maxGroups != maxGroups) {
this.maxGroups = maxGroups;
drain();
Expand All @@ -197,17 +197,21 @@ public boolean setMaxGroups(int maxGroups) {
* @param newSize new size
*/
@Override
public void updateTopNSize(int newSize) {
public void updateTopNSize(final int newSize) {
this.topNSize = newSize;
}

private void addToMinPQ(SearchQueryRecord searchQueryRecord, String groupId) {
private void addToMinPQ(final SearchQueryRecord searchQueryRecord, final String groupId) {
minHeapTopQueriesStore.add(searchQueryRecord);
groupIdToAggSearchQueryRecord.put(groupId, new Tuple<>(searchQueryRecord, true));
overflow();
}

private void addAndPromote(SearchQueryRecord searchQueryRecord, SearchQueryRecord aggregateSearchQueryRecord, String groupId) {
private void addAndPromote(
final SearchQueryRecord searchQueryRecord,
final SearchQueryRecord aggregateSearchQueryRecord,
final String groupId
) {
Number measurementToAdd = searchQueryRecord.getMeasurement(metricType);
aggregateSearchQueryRecord.addMeasurement(metricType, measurementToAdd);
addToMinPQ(aggregateSearchQueryRecord, groupId);
Expand All @@ -228,7 +232,7 @@ private void overflow() {
}
}

private boolean checkMaxGroupsLimitReached(String groupId) {
private boolean checkMaxGroupsLimitReached(final String groupId) {
if (maxGroups <= maxHeapQueryStore.size() && minHeapTopQueriesStore.size() >= topNSize) {
log.warn(
"Exceeded [{}] setting threshold which is set at {}. Discarding new group with id {}.",
Expand Down Expand Up @@ -259,11 +263,11 @@ int numberOfTopGroups() {
}

/**
* Get groupingId. This should be query hashcode for SIMILARITY grouping and user_id for USER_ID grouping.
* Get groupingId. This should be the query hashcode for SIMILARITY grouping and user_id for USER_ID grouping.
* @param searchQueryRecord record
* @return Grouping Id
*/
private String getGroupingId(SearchQueryRecord searchQueryRecord) {
private String getGroupingId(final SearchQueryRecord searchQueryRecord) {
switch (groupingType) {
case SIMILARITY:
return searchQueryRecord.getAttributes().get(Attribute.QUERY_HASHCODE).toString();
Expand All @@ -273,4 +277,13 @@ private String getGroupingId(SearchQueryRecord searchQueryRecord) {
throw new IllegalArgumentException("The following grouping type is not supported : " + groupingType);
}
}

/**
* Get health stats of the MinMaxHeapQueryGrouperService
*
* @return QueryGrouperHealthStats
*/
public QueryGrouperHealthStats getHealthStats() {
return new QueryGrouperHealthStats(this.groupIdToAggSearchQueryRecord.size(), this.maxHeapQueryStore.size());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

import org.opensearch.plugin.insights.rules.model.GroupingType;
import org.opensearch.plugin.insights.rules.model.SearchQueryRecord;
import org.opensearch.plugin.insights.rules.model.healthStats.QueryGrouperHealthStats;

/**
* Interface for grouping search queries based on grouping type for the metric type.
Expand Down Expand Up @@ -57,4 +58,11 @@ public interface QueryGrouper {
* @param topNSize the new top N size
*/
void updateTopNSize(int topNSize);

/**
* Get health stats of the QueryGrouperService
*
* @return QueryGrouperHealthStats
*/
QueryGrouperHealthStats getHealthStats();
}
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ public static MetricType fromString(final String metricType) {
* @param metricType the MetricType to write
* @throws IOException IOException
*/
static void writeTo(final StreamOutput out, final MetricType metricType) throws IOException {
public static void writeTo(final StreamOutput out, final MetricType metricType) throws IOException {
out.writeString(metricType.toString());
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.plugin.insights.rules.model.healthStats;

import java.io.IOException;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.common.io.stream.StreamOutput;
import org.opensearch.core.common.io.stream.Writeable;
import org.opensearch.core.xcontent.ToXContentFragment;
import org.opensearch.core.xcontent.XContentBuilder;

/**
* Represents the health statistics of the query grouper.
*/
public class QueryGrouperHealthStats implements ToXContentFragment, Writeable {
private final int queryGroupCount;
private final int queryGroupHeapSize;
private static final String QUERY_GROUP_COUNT_TOTAL = "QueryGroupCount_Total";
private static final String QUERY_GROUP_COUNT_MAX_HEAP = "QueryGroupCount_MaxHeap";

/**
* Constructor to read QueryGrouperHealthStats from a StreamInput.
*
* @param in the StreamInput to read the QueryGrouperHealthStats from
* @throws IOException IOException
*/
public QueryGrouperHealthStats(final StreamInput in) throws IOException {
this.queryGroupCount = in.readInt();
this.queryGroupHeapSize = in.readInt();
}

/**
* Constructor of QueryGrouperHealthStats
*
* @param queryGroupCount Number of groups in the grouper
* @param queryGroupHeapSize Heap size of the grouper
*/
public QueryGrouperHealthStats(final int queryGroupCount, final int queryGroupHeapSize) {
this.queryGroupCount = queryGroupCount;
this.queryGroupHeapSize = queryGroupHeapSize;
}

/**
* Write QueryGrouperHealthStats Object to output stream
* @param out streamOutput
* @throws IOException IOException
*/
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeInt(queryGroupCount);
out.writeInt(queryGroupHeapSize);
}

/**
* Write QueryGrouperHealthStats object to XContent
*
* @param builder XContentBuilder
* @param params Parameters
* @return XContentBuilder
* @throws IOException IOException
*/
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.field(QUERY_GROUP_COUNT_TOTAL, queryGroupCount);
builder.field(QUERY_GROUP_COUNT_MAX_HEAP, queryGroupHeapSize);
return builder;
}

/**
* Gets the number of query groups.
*
* @return the query group count
*/
public int getQueryGroupCount() {
return queryGroupCount;
}

/**
* Gets the query group heap size.
*
* @return the query group heap size
*/
public int getQueryGroupHeapSize() {
return queryGroupHeapSize;
}
}
Loading

0 comments on commit 929141c

Please sign in to comment.