Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Backport 2.16] always populate resource usage metrics for categorization #42

Merged
merged 1 commit into from
Jul 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,10 @@ public void onRequestFailure(final SearchPhaseContext context, final SearchReque
constructSearchQueryRecord(context, searchRequestContext);
}

private boolean shouldCollect(MetricType metricType) {
return queryInsightsService.isSearchQueryMetricsFeatureEnabled() || queryInsightsService.isCollectionEnabled(metricType);
}

private void constructSearchQueryRecord(final SearchPhaseContext context, final SearchRequestContext searchRequestContext) {
SearchTask searchTask = context.getTask();
List<TaskResourceInfo> tasksResourceUsages = searchRequestContext.getPhaseResourceUsage();
Expand All @@ -154,19 +158,19 @@ private void constructSearchQueryRecord(final SearchPhaseContext context, final
final SearchRequest request = context.getRequest();
try {
Map<MetricType, Number> measurements = new HashMap<>();
if (queryInsightsService.isCollectionEnabled(MetricType.LATENCY)) {
if (shouldCollect(MetricType.LATENCY)) {
measurements.put(
MetricType.LATENCY,
TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - searchRequestContext.getAbsoluteStartNanos())
);
}
if (queryInsightsService.isCollectionEnabled(MetricType.CPU)) {
if (shouldCollect(MetricType.CPU)) {
measurements.put(
MetricType.CPU,
tasksResourceUsages.stream().map(a -> a.getTaskResourceUsage().getCpuTimeInNanos()).mapToLong(Long::longValue).sum()
);
}
if (queryInsightsService.isCollectionEnabled(MetricType.MEMORY)) {
if (shouldCollect(MetricType.MEMORY)) {
measurements.put(
MetricType.MEMORY,
tasksResourceUsages.stream().map(a -> a.getTaskResourceUsage().getMemoryInBytes()).mapToLong(Long::longValue).sum()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,17 +123,19 @@ public QueryInsightsService(
* @param record the record to ingest
*/
public boolean addRecord(final SearchQueryRecord record) {
boolean shouldAdd = false;
for (Map.Entry<MetricType, TopQueriesService> entry : topQueriesServices.entrySet()) {
if (!enableCollect.get(entry.getKey())) {
continue;
}
List<SearchQueryRecord> currentSnapshot = entry.getValue().getTopQueriesCurrentSnapshot();
// skip add to top N queries store if the incoming record is smaller than the Nth record
if (currentSnapshot.size() < entry.getValue().getTopNSize()
|| SearchQueryRecord.compare(record, currentSnapshot.get(0), entry.getKey()) > 0) {
shouldAdd = true;
break;
boolean shouldAdd = searchQueryMetricsEnabled;
if (!shouldAdd) {
for (Map.Entry<MetricType, TopQueriesService> entry : topQueriesServices.entrySet()) {
if (!enableCollect.get(entry.getKey())) {
continue;
}
List<SearchQueryRecord> currentSnapshot = entry.getValue().getTopQueriesCurrentSnapshot();
// skip add to top N queries store if the incoming record is smaller than the Nth record
if (currentSnapshot.size() < entry.getValue().getTopNSize()
|| SearchQueryRecord.compare(record, currentSnapshot.get(0), entry.getKey()) > 0) {
shouldAdd = true;
break;
}
}
}
if (shouldAdd) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,10 @@ public SearchQueryAggregationCategorizer(SearchQueryCounters searchQueryCounters
* @param aggregatorFactories input aggregations
* @param measurements latency, cpu, memory measurements
*/
public void incrementSearchQueryAggregationCounters(Collection<AggregationBuilder> aggregatorFactories, Map<MetricType, Number> measurements) {
public void incrementSearchQueryAggregationCounters(
Collection<AggregationBuilder> aggregatorFactories,
Map<MetricType, Number> measurements
) {
for (AggregationBuilder aggregationBuilder : aggregatorFactories) {
incrementCountersRecursively(aggregationBuilder, measurements);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ public SearchQueryCounters(MetricsRegistry metricsRegistry) {
* Increment counter
* @param queryBuilder query builder
* @param level level of query builder, 0 being highest level
* @param measurements metrics measurements
*/
public void incrementCounter(QueryBuilder queryBuilder, int level, Map<MetricType, Number> measurements) {
String uniqueQueryCounterName = queryBuilder.getName();
Expand All @@ -116,6 +117,7 @@ public void incrementCounter(QueryBuilder queryBuilder, int level, Map<MetricTyp
* Increment aggregate counter
* @param value value to increment
* @param tags tags
* @param measurements metrics measurements
*/
public void incrementAggCounter(double value, Tags tags, Map<MetricType, Number> measurements) {
aggCounter.add(value, tags);
Expand All @@ -126,6 +128,7 @@ public void incrementAggCounter(double value, Tags tags, Map<MetricType, Number>
* Increment sort counter
* @param value value to increment
* @param tags tags
* @param measurements metrics measurements
*/
public void incrementSortCounter(double value, Tags tags, Map<MetricType, Number> measurements) {
sortCounter.add(value, tags);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,9 +133,9 @@ public void writeTo(final StreamOutput out) throws IOException {
out.writeLong(timestamp);
out.writeMap(measurements, (stream, metricType) -> MetricType.writeTo(out, metricType), StreamOutput::writeGenericValue);
out.writeMap(
attributes,
(stream, attribute) -> Attribute.writeTo(out, attribute),
(stream, attributeValue) -> Attribute.writeValueTo(out, attributeValue)
attributes,
(stream, attribute) -> Attribute.writeTo(out, attribute),
(stream, attributeValue) -> Attribute.writeValueTo(out, attributeValue)
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@

package org.opensearch.plugin.insights;

import org.junit.Before;
import org.opensearch.action.ActionRequest;
import org.opensearch.client.Client;
import org.opensearch.cluster.service.ClusterService;
Expand All @@ -30,6 +29,7 @@
import org.opensearch.threadpool.ExecutorBuilder;
import org.opensearch.threadpool.ScalingExecutorBuilder;
import org.opensearch.threadpool.ThreadPool;
import org.junit.Before;

import java.util.Arrays;
import java.util.List;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,6 @@

import static java.util.Collections.emptyMap;
import static java.util.Collections.emptySet;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.opensearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.opensearch.test.OpenSearchTestCase.buildNewFakeTransportAddress;
import static org.opensearch.test.OpenSearchTestCase.random;
Expand All @@ -47,6 +45,8 @@
import static org.opensearch.test.OpenSearchTestCase.randomIntBetween;
import static org.opensearch.test.OpenSearchTestCase.randomLong;
import static org.opensearch.test.OpenSearchTestCase.randomLongBetween;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;

final public class QueryInsightsTestUtils {

Expand Down Expand Up @@ -209,8 +209,8 @@ public static boolean checkRecordsEquals(List<SearchQueryRecord> records1, List<
return false;
} else if (value instanceof Map
&& !Maps.deepEquals((Map<Object, Object>) value, (Map<Object, Object>) attributes2.get(attribute))) {
return false;
}
return false;
}
}
}
return true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,17 +8,12 @@

package org.opensearch.plugin.insights.core.listener;

import org.junit.Before;
import org.mockito.ArgumentCaptor;
import org.mockito.MockitoAnnotations;
import org.mockito.MockitoSession;
import org.opensearch.action.search.SearchPhaseContext;
import org.opensearch.action.search.SearchRequest;
import org.opensearch.action.search.SearchRequestContext;
import org.opensearch.action.search.SearchTask;
import org.opensearch.action.search.SearchType;
import org.opensearch.action.support.replication.ClusterStateCreationUtils;
import org.opensearch.client.Client;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.collect.Tuple;
Expand All @@ -37,12 +32,11 @@
import org.opensearch.search.aggregations.support.ValueType;
import org.opensearch.search.builder.SearchSourceBuilder;
import org.opensearch.tasks.Task;
import org.opensearch.telemetry.metrics.MetricsRegistry;
import org.opensearch.telemetry.metrics.tags.Tags;
import org.opensearch.test.ClusterServiceUtils;
import org.opensearch.test.OpenSearchTestCase;
import org.opensearch.threadpool.TestThreadPool;
import org.opensearch.threadpool.ThreadPool;
import org.junit.Before;

import java.util.ArrayList;
import java.util.Collections;
Expand All @@ -54,11 +48,11 @@
import java.util.concurrent.Phaser;
import java.util.concurrent.TimeUnit;

import org.mockito.ArgumentCaptor;

import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
Expand Down Expand Up @@ -115,7 +109,7 @@ public void testOnRequestEnd() throws InterruptedException {
Collections.singletonMap(Task.X_OPAQUE_ID, "userLabel")
);

String[] indices = new String[]{"index-1", "index-2"};
String[] indices = new String[] { "index-1", "index-2" };

Map<String, Long> phaseLatencyMap = new HashMap<>();
phaseLatencyMap.put("expand", 0L);
Expand Down Expand Up @@ -164,7 +158,7 @@ public void testConcurrentOnRequestEnd() throws InterruptedException {
Collections.singletonMap(Task.X_OPAQUE_ID, "userLabel")
);

String[] indices = new String[]{"index-1", "index-2"};
String[] indices = new String[] { "index-1", "index-2" };

Map<String, Long> phaseLatencyMap = new HashMap<>();
phaseLatencyMap.put("expand", 0L);
Expand Down Expand Up @@ -252,7 +246,6 @@ public void testTopNFeatureEnabledDisabled() {
verify(queryInsightsService3, never()).checkAndStopQueryInsights();
verify(queryInsightsService3, never()).checkAndRestartQueryInsights();


// Test case 4: Only CPU enabled initially, enable latency and verify expected behavior
QueryInsightsService queryInsightsService4 = mock(QueryInsightsService.class);
QueryInsightsListener queryInsightsListener4 = new QueryInsightsListener(clusterService, queryInsightsService4);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@

package org.opensearch.plugin.insights.core.service;

import org.junit.Before;
import org.opensearch.client.Client;
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.Settings;
Expand All @@ -19,12 +18,11 @@
import org.opensearch.telemetry.metrics.noop.NoopMetricsRegistry;
import org.opensearch.test.OpenSearchTestCase;
import org.opensearch.threadpool.ThreadPool;
import org.junit.Before;

import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;

/**
* Unit Tests for {@link QueryInsightsService}.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,22 +75,16 @@ public void testSmallNSize() {
}

public void testValidateTopNSize() {
assertThrows(IllegalArgumentException.class, () -> {
topQueriesService.validateTopNSize(QueryInsightsSettings.MAX_N_SIZE + 1);
});
assertThrows(IllegalArgumentException.class, () -> { topQueriesService.validateTopNSize(QueryInsightsSettings.MAX_N_SIZE + 1); });
}

public void testValidateNegativeTopNSize() {
assertThrows(IllegalArgumentException.class, () -> {
topQueriesService.validateTopNSize(-1);
});
assertThrows(IllegalArgumentException.class, () -> { topQueriesService.validateTopNSize(-1); });
}

public void testGetTopQueriesWhenNotEnabled() {
topQueriesService.setEnabled(false);
assertThrows(IllegalArgumentException.class, () -> {
topQueriesService.getTopQueriesRecords(false);
});
assertThrows(IllegalArgumentException.class, () -> { topQueriesService.getTopQueriesRecords(false); });
}

public void testValidateWindowSize() {
Expand All @@ -100,12 +94,8 @@ public void testValidateWindowSize() {
assertThrows(IllegalArgumentException.class, () -> {
topQueriesService.validateWindowSize(new TimeValue(QueryInsightsSettings.MIN_WINDOW_SIZE.getSeconds() - 1, TimeUnit.SECONDS));
});
assertThrows(IllegalArgumentException.class, () -> {
topQueriesService.validateWindowSize(new TimeValue(2, TimeUnit.DAYS));
});
assertThrows(IllegalArgumentException.class, () -> {
topQueriesService.validateWindowSize(new TimeValue(7, TimeUnit.MINUTES));
});
assertThrows(IllegalArgumentException.class, () -> { topQueriesService.validateWindowSize(new TimeValue(2, TimeUnit.DAYS)); });
assertThrows(IllegalArgumentException.class, () -> { topQueriesService.validateWindowSize(new TimeValue(7, TimeUnit.MINUTES)); });
}

private static void runUntilTimeoutOrFinish(DeterministicTaskQueue deterministicTaskQueue, long duration) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,6 @@

package org.opensearch.plugin.insights.core.service.categorizor;

import org.junit.After;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.opensearch.index.query.BoolQueryBuilder;
import org.opensearch.index.query.BoostingQueryBuilder;
import org.opensearch.index.query.MatchNoneQueryBuilder;
Expand All @@ -37,21 +34,24 @@
import org.opensearch.telemetry.metrics.MetricsRegistry;
import org.opensearch.telemetry.metrics.tags.Tags;
import org.opensearch.test.OpenSearchTestCase;
import org.junit.After;
import org.junit.Before;

import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

import org.mockito.ArgumentCaptor;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;

import static org.opensearch.plugin.insights.QueryInsightsTestUtils.generateQueryInsightRecords;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.opensearch.plugin.insights.QueryInsightsTestUtils.generateQueryInsightRecords;

public final class SearchQueryCategorizerTests extends OpenSearchTestCase {

Expand All @@ -71,19 +71,18 @@ public void setup() {
invocation -> mock(Counter.class)
);

when(metricsRegistry.createHistogram(any(String.class), any(String.class), any(String.class)))
.thenAnswer(new Answer<Histogram>() {
@Override
public Histogram answer(InvocationOnMock invocation) throws Throwable {
// Extract arguments to identify which histogram is being created
String name = invocation.getArgument(0);
// Create a mock histogram
Histogram histogram = mock(Histogram.class);
// Store histogram in map for lookup
histogramMap.put(name, histogram);
return histogram;
}
});
when(metricsRegistry.createHistogram(any(String.class), any(String.class), any(String.class))).thenAnswer(new Answer<Histogram>() {
@Override
public Histogram answer(InvocationOnMock invocation) throws Throwable {
// Extract arguments to identify which histogram is being created
String name = invocation.getArgument(0);
// Create a mock histogram
Histogram histogram = mock(Histogram.class);
// Store histogram in map for lookup
histogramMap.put(name, histogram);
return histogram;
}
});
searchQueryCategorizer = SearchQueryCategorizer.getInstance(metricsRegistry);
}

Expand Down Expand Up @@ -327,7 +326,6 @@ private void verifyMeasurementHistogramsIncremented(SearchQueryRecord record, in
Histogram queryTypeCpuHistogram = histogramMap.get("search.query.type.cpu.histogram");
Histogram queryTypeMemoryHistogram = histogramMap.get("search.query.type.memory.histogram");


verify(queryTypeLatencyHistogram, times(times)).record(eq(expectedLatency), any(Tags.class));
verify(queryTypeCpuHistogram, times(times)).record(eq(expectedCpu), any(Tags.class));
verify(queryTypeMemoryHistogram, times(times)).record(eq(expectedMemory), any(Tags.class));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,10 @@

import com.carrotsearch.randomizedtesting.annotations.Name;
import com.carrotsearch.randomizedtesting.annotations.ParametersFactory;

import org.opensearch.test.rest.yaml.ClientYamlTestCandidate;
import org.opensearch.test.rest.yaml.OpenSearchClientYamlSuiteTestCase;


public class QueryInsightsClientYamlTestSuiteIT extends OpenSearchClientYamlSuiteTestCase {

public QueryInsightsClientYamlTestSuiteIT(@Name("yaml") ClientYamlTestCandidate testCandidate) {
Expand Down
Loading