Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Query grouping framework for Top N queries and group by query similarity #66

Merged
merged 29 commits into from
Sep 4, 2024
Merged
Show file tree
Hide file tree
Changes from 27 commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
c14385f
Query grouping framework and group by query similarity
deshsidd Aug 2, 2024
0a923b8
Spotless apply
deshsidd Aug 2, 2024
f56df30
Build fix
deshsidd Aug 2, 2024
0995a3a
Properly configure settings update consumer
deshsidd Aug 2, 2024
af90f5c
Address review comments
deshsidd Aug 5, 2024
a337ef0
Refactor unit tests
deshsidd Aug 5, 2024
08bb29d
Decouple Measurement and MetricType
deshsidd Aug 6, 2024
6b35126
Aggregate type NONE will ensure no aggregations computed
deshsidd Aug 6, 2024
d940428
Perform renaming
deshsidd Aug 26, 2024
cfdf489
Integrate query shape library with grouping
deshsidd Aug 26, 2024
e3a87f6
Spotless
deshsidd Aug 26, 2024
edf492d
Create and consume string hashcode interface
deshsidd Aug 26, 2024
0c469ac
Health checks in code
deshsidd Aug 28, 2024
fcd4097
Fix tests and spotless apply
deshsidd Aug 28, 2024
c695463
Minor fixes
deshsidd Aug 28, 2024
a90169a
Max groups setting and unit tests
deshsidd Aug 29, 2024
ebee6ae
Address review comments
deshsidd Aug 30, 2024
b98050d
Address review comments
deshsidd Aug 30, 2024
cd05fb6
Create query grouper interface and top query store interface
deshsidd Aug 30, 2024
3147fa6
Address review comments
deshsidd Aug 30, 2024
3dfe973
Removed unused interface
deshsidd Aug 30, 2024
980eaa8
Rebase main and spotless
deshsidd Aug 31, 2024
32cec07
Renaming variable
deshsidd Sep 3, 2024
a04da0c
Remove TopQueriesStore interface
deshsidd Sep 3, 2024
2db647d
Drain top queries service on group change
deshsidd Sep 3, 2024
07ce827
Rename max groups setting and allow minimum 0
deshsidd Sep 4, 2024
357d2cd
Make write/read from io backword compatible
deshsidd Sep 4, 2024
d891909
Minor fix
deshsidd Sep 4, 2024
3493e65
Refactor query grouper
deshsidd Sep 4, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,8 @@ public List<Setting<?>> getSettings() {
QueryInsightsSettings.TOP_N_MEMORY_QUERIES_SIZE,
QueryInsightsSettings.TOP_N_MEMORY_QUERIES_WINDOW_SIZE,
QueryInsightsSettings.TOP_N_MEMORY_EXPORTER_SETTINGS,
QueryInsightsSettings.TOP_N_QUERIES_GROUP_BY,
QueryInsightsSettings.TOP_N_QUERIES_MAX_GROUPS_EXCLUDING_N,
QueryCategorizationSettings.SEARCH_QUERY_METRICS_ENABLED_SETTING
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
package org.opensearch.plugin.insights.core.listener;

import static org.opensearch.plugin.insights.settings.QueryCategorizationSettings.SEARCH_QUERY_METRICS_ENABLED_SETTING;
import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.TOP_N_QUERIES_GROUP_BY;
import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.TOP_N_QUERIES_MAX_GROUPS_EXCLUDING_N;
import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.getTopNEnabledSetting;
import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.getTopNSizeSetting;
import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.getTopNWindowSizeSetting;
Expand All @@ -31,7 +33,9 @@
import org.opensearch.core.tasks.resourcetracker.TaskResourceInfo;
import org.opensearch.core.xcontent.ToXContent;
import org.opensearch.plugin.insights.core.service.QueryInsightsService;
import org.opensearch.plugin.insights.core.service.categorizer.QueryShapeGenerator;
import org.opensearch.plugin.insights.rules.model.Attribute;
import org.opensearch.plugin.insights.rules.model.Measurement;
import org.opensearch.plugin.insights.rules.model.MetricType;
import org.opensearch.plugin.insights.rules.model.SearchQueryRecord;
import org.opensearch.tasks.Task;
Expand Down Expand Up @@ -101,6 +105,26 @@ public QueryInsightsListener(
this.queryInsightsService.setWindowSize(type, clusterService.getClusterSettings().get(getTopNWindowSizeSetting(type)));
}

// Settings endpoints set for grouping top n queries
clusterService.getClusterSettings()
.addSettingsUpdateConsumer(
TOP_N_QUERIES_GROUP_BY,
v -> this.queryInsightsService.setGrouping(v),
v -> this.queryInsightsService.validateGrouping(v)
);
this.queryInsightsService.validateGrouping(clusterService.getClusterSettings().get(TOP_N_QUERIES_GROUP_BY));
this.queryInsightsService.setGrouping(clusterService.getClusterSettings().get(TOP_N_QUERIES_GROUP_BY));
deshsidd marked this conversation as resolved.
Show resolved Hide resolved

clusterService.getClusterSettings()
.addSettingsUpdateConsumer(
TOP_N_QUERIES_MAX_GROUPS_EXCLUDING_N,
v -> this.queryInsightsService.setMaximumGroups(v),
v -> this.queryInsightsService.validateMaximumGroups(v)
);
this.queryInsightsService.validateMaximumGroups(clusterService.getClusterSettings().get(TOP_N_QUERIES_MAX_GROUPS_EXCLUDING_N));
this.queryInsightsService.setMaximumGroups(clusterService.getClusterSettings().get(TOP_N_QUERIES_MAX_GROUPS_EXCLUDING_N));

// Settings endpoints set for search query metrics
clusterService.getClusterSettings()
.addSettingsUpdateConsumer(SEARCH_QUERY_METRICS_ENABLED_SETTING, v -> setSearchQueryMetricsEnabled(v));
setSearchQueryMetricsEnabled(clusterService.getClusterSettings().get(SEARCH_QUERY_METRICS_ENABLED_SETTING));
Expand Down Expand Up @@ -191,32 +215,40 @@ private void constructSearchQueryRecord(final SearchPhaseContext context, final

final SearchRequest request = context.getRequest();
try {
Map<MetricType, Number> measurements = new HashMap<>();
Map<MetricType, Measurement> measurements = new HashMap<>();
if (shouldCollect(MetricType.LATENCY)) {
measurements.put(
MetricType.LATENCY,
TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - searchRequestContext.getAbsoluteStartNanos())
new Measurement(TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - searchRequestContext.getAbsoluteStartNanos()))
);
}
if (shouldCollect(MetricType.CPU)) {
measurements.put(
MetricType.CPU,
tasksResourceUsages.stream().map(a -> a.getTaskResourceUsage().getCpuTimeInNanos()).mapToLong(Long::longValue).sum()
new Measurement(
tasksResourceUsages.stream().map(a -> a.getTaskResourceUsage().getCpuTimeInNanos()).mapToLong(Long::longValue).sum()
)
);
}
if (shouldCollect(MetricType.MEMORY)) {
measurements.put(
MetricType.MEMORY,
tasksResourceUsages.stream().map(a -> a.getTaskResourceUsage().getMemoryInBytes()).mapToLong(Long::longValue).sum()
new Measurement(
tasksResourceUsages.stream().map(a -> a.getTaskResourceUsage().getMemoryInBytes()).mapToLong(Long::longValue).sum()
)
);
}

String hashcode = QueryShapeGenerator.getShapeHashCodeAsString(request.source(), false);
deshsidd marked this conversation as resolved.
Show resolved Hide resolved

Map<Attribute, Object> attributes = new HashMap<>();
attributes.put(Attribute.SEARCH_TYPE, request.searchType().toString().toLowerCase(Locale.ROOT));
attributes.put(Attribute.SOURCE, request.source());
attributes.put(Attribute.TOTAL_SHARDS, context.getNumShards());
attributes.put(Attribute.INDICES, request.indices());
attributes.put(Attribute.PHASE_LATENCY_MAP, searchRequestContext.phaseTookMap());
attributes.put(Attribute.TASK_RESOURCE_USAGES, tasksResourceUsages);
attributes.put(Attribute.QUERY_HASHCODE, hashcode);

Map<String, Object> labels = new HashMap<>();
// Retrieve user provided label if exists
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

package org.opensearch.plugin.insights.core.service;

import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.DEFAULT_GROUPING_TYPE;
import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.getExporterSettings;

import java.io.IOException;
Expand All @@ -27,6 +28,7 @@
import org.opensearch.common.unit.TimeValue;
import org.opensearch.plugin.insights.core.exporter.QueryInsightsExporterFactory;
import org.opensearch.plugin.insights.core.service.categorizer.SearchQueryCategorizer;
import org.opensearch.plugin.insights.rules.model.GroupingType;
import org.opensearch.plugin.insights.rules.model.MetricType;
import org.opensearch.plugin.insights.rules.model.SearchQueryRecord;
import org.opensearch.plugin.insights.settings.QueryInsightsSettings;
Expand Down Expand Up @@ -73,6 +75,11 @@ public class QueryInsightsService extends AbstractLifecycleComponent {
*/
final QueryInsightsExporterFactory queryInsightsExporterFactory;

/**
* Flags for enabling insight data grouping for different metric types
*/
private GroupingType groupingType;

private volatile boolean searchQueryMetricsEnabled;

private SearchQueryCategorizer searchQueryCategorizer;
Expand Down Expand Up @@ -112,16 +119,17 @@ public QueryInsightsService(

this.searchQueryCategorizer = SearchQueryCategorizer.getInstance(metricsRegistry);
this.enableSearchQueryMetricsFeature(false);
this.groupingType = DEFAULT_GROUPING_TYPE;
}

/**
* Ingest the query data into in-memory stores
*
* @param record the record to ingest
* @return SearchQueryRecord
* @return true/false
*/
public boolean addRecord(final SearchQueryRecord record) {
boolean shouldAdd = searchQueryMetricsEnabled;
boolean shouldAdd = isSearchQueryMetricsFeatureEnabled() || isGroupingEnabled();
deshsidd marked this conversation as resolved.
Show resolved Hide resolved
if (!shouldAdd) {
for (Map.Entry<MetricType, TopQueriesService> entry : topQueriesServices.entrySet()) {
if (!enableCollect.get(entry.getKey())) {
Expand Down Expand Up @@ -185,6 +193,67 @@ public void enableCollection(final MetricType metricType, final boolean enable)
this.topQueriesServices.get(metricType).setEnabled(enable);
}

/**
* Validate grouping given grouping type setting
* @param groupingTypeSetting grouping setting
*/
public void validateGrouping(final String groupingTypeSetting) {
GroupingType.getGroupingTypeFromSettingAndValidate(groupingTypeSetting);
}

/**
* Set grouping
* @param groupingTypeSetting grouping
*/
public void setGrouping(final String groupingTypeSetting) {
GroupingType newGroupingType = GroupingType.getGroupingTypeFromSettingAndValidate(groupingTypeSetting);
GroupingType oldGroupingType = groupingType;

if (oldGroupingType != newGroupingType) {
groupingType = newGroupingType;

for (MetricType metricType : MetricType.allMetricTypes()) {
this.topQueriesServices.get(metricType).setGrouping(newGroupingType);
}
}
}

/**
* Set max number of groups
* @param maxGroups maximum number of groups that should be tracked when calculating Top N groups
*/
public void setMaximumGroups(final int maxGroups) {
for (MetricType metricType : MetricType.allMetricTypes()) {
this.topQueriesServices.get(metricType).setMaxGroups(maxGroups);
}
}

/**
* Validate max number of groups. Should be between 1 and MAX_GROUPS_LIMIT
* @param maxGroups maximum number of groups that should be tracked when calculating Top N groups
*/
public void validateMaximumGroups(final int maxGroups) {
if (maxGroups < 0 || maxGroups > QueryInsightsSettings.MAX_GROUPS_EXCLUDING_TOPN_LIMIT) {
throw new IllegalArgumentException(
"Max groups setting"
+ " should be between 0 and "
+ QueryInsightsSettings.MAX_GROUPS_EXCLUDING_TOPN_LIMIT
+ ", was ("
+ maxGroups
+ ")"
);
}
}

/**
* Get the grouping type based on the metricType
* @return GroupingType
*/

public GroupingType getGrouping() {
return groupingType;
}

/**
* Get if the Query Insights data collection is enabled for a MetricType
*
Expand Down Expand Up @@ -226,9 +295,18 @@ public boolean isSearchQueryMetricsFeatureEnabled() {
return this.searchQueryMetricsEnabled;
}

/**
* Is grouping feature enabled and TopN feature enabled
* @return boolean
*/
public boolean isGroupingEnabled() {
return this.groupingType != GroupingType.NONE && isTopNFeatureEnabled();
}

/**
* Enable/Disable search query metrics feature.
* @param enable enable/disable search query metrics feature
* Stops query insights service if no features enabled
*/
public void enableSearchQueryMetricsFeature(boolean enable) {
searchQueryMetricsEnabled = enable;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
import java.util.Collection;
import java.util.List;
import java.util.Locale;
import java.util.PriorityQueue;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import java.util.stream.Stream;
Expand All @@ -35,6 +35,10 @@
import org.opensearch.plugin.insights.core.exporter.QueryInsightsExporter;
import org.opensearch.plugin.insights.core.exporter.QueryInsightsExporterFactory;
import org.opensearch.plugin.insights.core.exporter.SinkType;
import org.opensearch.plugin.insights.core.service.grouper.MinMaxHeapQueryGrouper;
import org.opensearch.plugin.insights.core.service.grouper.QueryGrouper;
import org.opensearch.plugin.insights.rules.model.AggregationType;
import org.opensearch.plugin.insights.rules.model.GroupingType;
import org.opensearch.plugin.insights.rules.model.MetricType;
import org.opensearch.plugin.insights.rules.model.SearchQueryRecord;
import org.opensearch.plugin.insights.settings.QueryInsightsSettings;
Expand Down Expand Up @@ -66,7 +70,7 @@ public class TopQueriesService {
/**
* The internal thread-safe store that holds the top n queries insight data
*/
private final PriorityQueue<SearchQueryRecord> topQueriesStore;
private final PriorityBlockingQueue<SearchQueryRecord> topQueriesStore;

/**
* The AtomicReference of a snapshot of the current window top queries for getters to consume
Expand All @@ -93,6 +97,8 @@ public class TopQueriesService {
*/
private QueryInsightsExporter exporter;

private QueryGrouper queryGrouper;

TopQueriesService(
final MetricType metricType,
final ThreadPool threadPool,
Expand All @@ -106,9 +112,16 @@ public class TopQueriesService {
this.windowSize = QueryInsightsSettings.DEFAULT_WINDOW_SIZE;
this.windowStart = -1L;
this.exporter = null;
topQueriesStore = new PriorityQueue<>(topNSize, (a, b) -> SearchQueryRecord.compare(a, b, metricType));
topQueriesStore = new PriorityBlockingQueue<>(topNSize, (a, b) -> SearchQueryRecord.compare(a, b, metricType));
topQueriesCurrentSnapshot = new AtomicReference<>(new ArrayList<>());
topQueriesHistorySnapshot = new AtomicReference<>(new ArrayList<>());
queryGrouper = new MinMaxHeapQueryGrouper(
metricType,
QueryInsightsSettings.DEFAULT_GROUPING_TYPE,
AggregationType.AVERAGE,
topQueriesStore,
topNSize
);
}

/**
Expand All @@ -118,6 +131,7 @@ public class TopQueriesService {
*/
public void setTopNSize(final int topNSize) {
this.topNSize = topNSize;
this.queryGrouper.updateTopNSize(topNSize);
}

/**
Expand Down Expand Up @@ -169,6 +183,20 @@ public void setWindowSize(final TimeValue windowSize) {
this.windowStart = -1L;
}

public void setGrouping(final GroupingType groupingType) {
boolean changed = queryGrouper.setGroupingType(groupingType);
if (changed) {
drain();
}
}

public void setMaxGroups(final int maxGroups) {
boolean changed = queryGrouper.setMaxGroups(maxGroups);
if (changed) {
drain();
}
}

/**
* Validate if the window size is valid, based on internal constrains.
*
Expand Down Expand Up @@ -306,10 +334,16 @@ void consumeRecords(final List<SearchQueryRecord> records) {
}

private void addToTopNStore(final List<SearchQueryRecord> records) {
topQueriesStore.addAll(records);
// remove top elements for fix sizing priority queue
while (topQueriesStore.size() > topNSize) {
topQueriesStore.poll();
if (queryGrouper.getGroupingType() != GroupingType.NONE) {
for (SearchQueryRecord record : records) {
queryGrouper.add(record);
}
} else {
topQueriesStore.addAll(records);
// remove top elements for fix sizing priority queue
while (topQueriesStore.size() > topNSize) {
topQueriesStore.poll();
}
}
}

Expand All @@ -329,6 +363,9 @@ private void rotateWindowIfNecessary(final long newWindowStart) {
}
topQueriesHistorySnapshot.set(history);
topQueriesStore.clear();
if (queryGrouper.getGroupingType() != GroupingType.NONE) {
queryGrouper.drain();
}
topQueriesCurrentSnapshot.set(new ArrayList<>());
windowStart = newWindowStart;
// export to the configured sink
Expand Down Expand Up @@ -368,4 +405,13 @@ public List<SearchQueryRecord> getTopQueriesCurrentSnapshot() {
public void close() throws IOException {
queryInsightsExporterFactory.closeExporter(this.exporter);
}

/**
* Drain internal stores.
*/
private void drain() {
topQueriesStore.clear();
topQueriesHistorySnapshot.set(new ArrayList<>());
topQueriesCurrentSnapshot.set(new ArrayList<>());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,12 @@ public static MurmurHash3.Hash128 getShapeHashCode(SearchSourceBuilder source, B
return MurmurHash3.hash128(shapeBytes.bytes, 0, shapeBytes.length, 0, new MurmurHash3.Hash128());
}

public static String getShapeHashCodeAsString(SearchSourceBuilder source, Boolean showFields) {
MurmurHash3.Hash128 hashcode = getShapeHashCode(source, showFields);
String hashAsString = Long.toHexString(hashcode.h1) + Long.toHexString(hashcode.h2);
return hashAsString;
}

/**
* Method to build search query shape given a source
* @param source search request source
Expand Down
Loading
Loading