Skip to content

Commit

Permalink
always populate resource usage metrics (#41)
Browse files Browse the repository at this point in the history
Signed-off-by: Chenyang Ji <[email protected]>
  • Loading branch information
ansjcy authored Jul 23, 2024
1 parent 6b0dddd commit e99dfcf
Show file tree
Hide file tree
Showing 12 changed files with 63 additions and 72 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,10 @@ public void onRequestFailure(final SearchPhaseContext context, final SearchReque
constructSearchQueryRecord(context, searchRequestContext);
}

private boolean shouldCollect(MetricType metricType) {
return queryInsightsService.isSearchQueryMetricsFeatureEnabled() || queryInsightsService.isCollectionEnabled(metricType);
}

private void constructSearchQueryRecord(final SearchPhaseContext context, final SearchRequestContext searchRequestContext) {
SearchTask searchTask = context.getTask();
List<TaskResourceInfo> tasksResourceUsages = searchRequestContext.getPhaseResourceUsage();
Expand All @@ -154,19 +158,19 @@ private void constructSearchQueryRecord(final SearchPhaseContext context, final
final SearchRequest request = context.getRequest();
try {
Map<MetricType, Number> measurements = new HashMap<>();
if (queryInsightsService.isCollectionEnabled(MetricType.LATENCY)) {
if (shouldCollect(MetricType.LATENCY)) {
measurements.put(
MetricType.LATENCY,
TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - searchRequestContext.getAbsoluteStartNanos())
);
}
if (queryInsightsService.isCollectionEnabled(MetricType.CPU)) {
if (shouldCollect(MetricType.CPU)) {
measurements.put(
MetricType.CPU,
tasksResourceUsages.stream().map(a -> a.getTaskResourceUsage().getCpuTimeInNanos()).mapToLong(Long::longValue).sum()
);
}
if (queryInsightsService.isCollectionEnabled(MetricType.MEMORY)) {
if (shouldCollect(MetricType.MEMORY)) {
measurements.put(
MetricType.MEMORY,
tasksResourceUsages.stream().map(a -> a.getTaskResourceUsage().getMemoryInBytes()).mapToLong(Long::longValue).sum()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,17 +123,19 @@ public QueryInsightsService(
* @param record the record to ingest
*/
public boolean addRecord(final SearchQueryRecord record) {
boolean shouldAdd = false;
for (Map.Entry<MetricType, TopQueriesService> entry : topQueriesServices.entrySet()) {
if (!enableCollect.get(entry.getKey())) {
continue;
}
List<SearchQueryRecord> currentSnapshot = entry.getValue().getTopQueriesCurrentSnapshot();
// skip add to top N queries store if the incoming record is smaller than the Nth record
if (currentSnapshot.size() < entry.getValue().getTopNSize()
|| SearchQueryRecord.compare(record, currentSnapshot.get(0), entry.getKey()) > 0) {
shouldAdd = true;
break;
boolean shouldAdd = searchQueryMetricsEnabled;
if (!shouldAdd) {
for (Map.Entry<MetricType, TopQueriesService> entry : topQueriesServices.entrySet()) {
if (!enableCollect.get(entry.getKey())) {
continue;
}
List<SearchQueryRecord> currentSnapshot = entry.getValue().getTopQueriesCurrentSnapshot();
// skip add to top N queries store if the incoming record is smaller than the Nth record
if (currentSnapshot.size() < entry.getValue().getTopNSize()
|| SearchQueryRecord.compare(record, currentSnapshot.get(0), entry.getKey()) > 0) {
shouldAdd = true;
break;
}
}
}
if (shouldAdd) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,10 @@ public SearchQueryAggregationCategorizer(SearchQueryCounters searchQueryCounters
* @param aggregatorFactories input aggregations
* @param measurements latency, cpu, memory measurements
*/
public void incrementSearchQueryAggregationCounters(Collection<AggregationBuilder> aggregatorFactories, Map<MetricType, Number> measurements) {
public void incrementSearchQueryAggregationCounters(
Collection<AggregationBuilder> aggregatorFactories,
Map<MetricType, Number> measurements
) {
for (AggregationBuilder aggregationBuilder : aggregatorFactories) {
incrementCountersRecursively(aggregationBuilder, measurements);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ public SearchQueryCounters(MetricsRegistry metricsRegistry) {
* Increment counter
* @param queryBuilder query builder
* @param level level of query builder, 0 being highest level
* @param measurements metrics measurements
*/
public void incrementCounter(QueryBuilder queryBuilder, int level, Map<MetricType, Number> measurements) {
String uniqueQueryCounterName = queryBuilder.getName();
Expand All @@ -116,6 +117,7 @@ public void incrementCounter(QueryBuilder queryBuilder, int level, Map<MetricTyp
* Increment aggregate counter
* @param value value to increment
* @param tags tags
* @param measurements metrics measurements
*/
public void incrementAggCounter(double value, Tags tags, Map<MetricType, Number> measurements) {
aggCounter.add(value, tags);
Expand All @@ -126,6 +128,7 @@ public void incrementAggCounter(double value, Tags tags, Map<MetricType, Number>
* Increment sort counter
* @param value value to increment
* @param tags tags
* @param measurements metrics measurements
*/
public void incrementSortCounter(double value, Tags tags, Map<MetricType, Number> measurements) {
sortCounter.add(value, tags);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,9 +133,9 @@ public void writeTo(final StreamOutput out) throws IOException {
out.writeLong(timestamp);
out.writeMap(measurements, (stream, metricType) -> MetricType.writeTo(out, metricType), StreamOutput::writeGenericValue);
out.writeMap(
attributes,
(stream, attribute) -> Attribute.writeTo(out, attribute),
(stream, attributeValue) -> Attribute.writeValueTo(out, attributeValue)
attributes,
(stream, attribute) -> Attribute.writeTo(out, attribute),
(stream, attributeValue) -> Attribute.writeValueTo(out, attributeValue)
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@

package org.opensearch.plugin.insights;

import org.junit.Before;
import org.opensearch.action.ActionRequest;
import org.opensearch.client.Client;
import org.opensearch.cluster.service.ClusterService;
Expand All @@ -30,6 +29,7 @@
import org.opensearch.threadpool.ExecutorBuilder;
import org.opensearch.threadpool.ScalingExecutorBuilder;
import org.opensearch.threadpool.ThreadPool;
import org.junit.Before;

import java.util.Arrays;
import java.util.List;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,6 @@

import static java.util.Collections.emptyMap;
import static java.util.Collections.emptySet;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.opensearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.opensearch.test.OpenSearchTestCase.buildNewFakeTransportAddress;
import static org.opensearch.test.OpenSearchTestCase.random;
Expand All @@ -47,6 +45,8 @@
import static org.opensearch.test.OpenSearchTestCase.randomIntBetween;
import static org.opensearch.test.OpenSearchTestCase.randomLong;
import static org.opensearch.test.OpenSearchTestCase.randomLongBetween;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;

final public class QueryInsightsTestUtils {

Expand Down Expand Up @@ -209,8 +209,8 @@ public static boolean checkRecordsEquals(List<SearchQueryRecord> records1, List<
return false;
} else if (value instanceof Map
&& !Maps.deepEquals((Map<Object, Object>) value, (Map<Object, Object>) attributes2.get(attribute))) {
return false;
}
return false;
}
}
}
return true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,17 +8,12 @@

package org.opensearch.plugin.insights.core.listener;

import org.junit.Before;
import org.mockito.ArgumentCaptor;
import org.mockito.MockitoAnnotations;
import org.mockito.MockitoSession;
import org.opensearch.action.search.SearchPhaseContext;
import org.opensearch.action.search.SearchRequest;
import org.opensearch.action.search.SearchRequestContext;
import org.opensearch.action.search.SearchTask;
import org.opensearch.action.search.SearchType;
import org.opensearch.action.support.replication.ClusterStateCreationUtils;
import org.opensearch.client.Client;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.collect.Tuple;
Expand All @@ -37,12 +32,11 @@
import org.opensearch.search.aggregations.support.ValueType;
import org.opensearch.search.builder.SearchSourceBuilder;
import org.opensearch.tasks.Task;
import org.opensearch.telemetry.metrics.MetricsRegistry;
import org.opensearch.telemetry.metrics.tags.Tags;
import org.opensearch.test.ClusterServiceUtils;
import org.opensearch.test.OpenSearchTestCase;
import org.opensearch.threadpool.TestThreadPool;
import org.opensearch.threadpool.ThreadPool;
import org.junit.Before;

import java.util.ArrayList;
import java.util.Collections;
Expand All @@ -54,11 +48,11 @@
import java.util.concurrent.Phaser;
import java.util.concurrent.TimeUnit;

import org.mockito.ArgumentCaptor;

import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
Expand Down Expand Up @@ -115,7 +109,7 @@ public void testOnRequestEnd() throws InterruptedException {
Collections.singletonMap(Task.X_OPAQUE_ID, "userLabel")
);

String[] indices = new String[]{"index-1", "index-2"};
String[] indices = new String[] { "index-1", "index-2" };

Map<String, Long> phaseLatencyMap = new HashMap<>();
phaseLatencyMap.put("expand", 0L);
Expand Down Expand Up @@ -164,7 +158,7 @@ public void testConcurrentOnRequestEnd() throws InterruptedException {
Collections.singletonMap(Task.X_OPAQUE_ID, "userLabel")
);

String[] indices = new String[]{"index-1", "index-2"};
String[] indices = new String[] { "index-1", "index-2" };

Map<String, Long> phaseLatencyMap = new HashMap<>();
phaseLatencyMap.put("expand", 0L);
Expand Down Expand Up @@ -252,7 +246,6 @@ public void testTopNFeatureEnabledDisabled() {
verify(queryInsightsService3, never()).checkAndStopQueryInsights();
verify(queryInsightsService3, never()).checkAndRestartQueryInsights();


// Test case 4: Only CPU enabled initially, enable latency and verify expected behavior
QueryInsightsService queryInsightsService4 = mock(QueryInsightsService.class);
QueryInsightsListener queryInsightsListener4 = new QueryInsightsListener(clusterService, queryInsightsService4);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@

package org.opensearch.plugin.insights.core.service;

import org.junit.Before;
import org.opensearch.client.Client;
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.Settings;
Expand All @@ -19,12 +18,11 @@
import org.opensearch.telemetry.metrics.noop.NoopMetricsRegistry;
import org.opensearch.test.OpenSearchTestCase;
import org.opensearch.threadpool.ThreadPool;
import org.junit.Before;

import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;

/**
* Unit Tests for {@link QueryInsightsService}.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,22 +75,16 @@ public void testSmallNSize() {
}

public void testValidateTopNSize() {
assertThrows(IllegalArgumentException.class, () -> {
topQueriesService.validateTopNSize(QueryInsightsSettings.MAX_N_SIZE + 1);
});
assertThrows(IllegalArgumentException.class, () -> { topQueriesService.validateTopNSize(QueryInsightsSettings.MAX_N_SIZE + 1); });
}

public void testValidateNegativeTopNSize() {
assertThrows(IllegalArgumentException.class, () -> {
topQueriesService.validateTopNSize(-1);
});
assertThrows(IllegalArgumentException.class, () -> { topQueriesService.validateTopNSize(-1); });
}

public void testGetTopQueriesWhenNotEnabled() {
topQueriesService.setEnabled(false);
assertThrows(IllegalArgumentException.class, () -> {
topQueriesService.getTopQueriesRecords(false);
});
assertThrows(IllegalArgumentException.class, () -> { topQueriesService.getTopQueriesRecords(false); });
}

public void testValidateWindowSize() {
Expand All @@ -100,12 +94,8 @@ public void testValidateWindowSize() {
assertThrows(IllegalArgumentException.class, () -> {
topQueriesService.validateWindowSize(new TimeValue(QueryInsightsSettings.MIN_WINDOW_SIZE.getSeconds() - 1, TimeUnit.SECONDS));
});
assertThrows(IllegalArgumentException.class, () -> {
topQueriesService.validateWindowSize(new TimeValue(2, TimeUnit.DAYS));
});
assertThrows(IllegalArgumentException.class, () -> {
topQueriesService.validateWindowSize(new TimeValue(7, TimeUnit.MINUTES));
});
assertThrows(IllegalArgumentException.class, () -> { topQueriesService.validateWindowSize(new TimeValue(2, TimeUnit.DAYS)); });
assertThrows(IllegalArgumentException.class, () -> { topQueriesService.validateWindowSize(new TimeValue(7, TimeUnit.MINUTES)); });
}

private static void runUntilTimeoutOrFinish(DeterministicTaskQueue deterministicTaskQueue, long duration) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,6 @@

package org.opensearch.plugin.insights.core.service.categorizor;

import org.junit.After;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.opensearch.index.query.BoolQueryBuilder;
import org.opensearch.index.query.BoostingQueryBuilder;
import org.opensearch.index.query.MatchNoneQueryBuilder;
Expand All @@ -37,21 +34,24 @@
import org.opensearch.telemetry.metrics.MetricsRegistry;
import org.opensearch.telemetry.metrics.tags.Tags;
import org.opensearch.test.OpenSearchTestCase;
import org.junit.After;
import org.junit.Before;

import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

import org.mockito.ArgumentCaptor;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;

import static org.opensearch.plugin.insights.QueryInsightsTestUtils.generateQueryInsightRecords;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.opensearch.plugin.insights.QueryInsightsTestUtils.generateQueryInsightRecords;

public final class SearchQueryCategorizerTests extends OpenSearchTestCase {

Expand All @@ -71,19 +71,18 @@ public void setup() {
invocation -> mock(Counter.class)
);

when(metricsRegistry.createHistogram(any(String.class), any(String.class), any(String.class)))
.thenAnswer(new Answer<Histogram>() {
@Override
public Histogram answer(InvocationOnMock invocation) throws Throwable {
// Extract arguments to identify which histogram is being created
String name = invocation.getArgument(0);
// Create a mock histogram
Histogram histogram = mock(Histogram.class);
// Store histogram in map for lookup
histogramMap.put(name, histogram);
return histogram;
}
});
when(metricsRegistry.createHistogram(any(String.class), any(String.class), any(String.class))).thenAnswer(new Answer<Histogram>() {
@Override
public Histogram answer(InvocationOnMock invocation) throws Throwable {
// Extract arguments to identify which histogram is being created
String name = invocation.getArgument(0);
// Create a mock histogram
Histogram histogram = mock(Histogram.class);
// Store histogram in map for lookup
histogramMap.put(name, histogram);
return histogram;
}
});
searchQueryCategorizer = SearchQueryCategorizer.getInstance(metricsRegistry);
}

Expand Down Expand Up @@ -327,7 +326,6 @@ private void verifyMeasurementHistogramsIncremented(SearchQueryRecord record, in
Histogram queryTypeCpuHistogram = histogramMap.get("search.query.type.cpu.histogram");
Histogram queryTypeMemoryHistogram = histogramMap.get("search.query.type.memory.histogram");


verify(queryTypeLatencyHistogram, times(times)).record(eq(expectedLatency), any(Tags.class));
verify(queryTypeCpuHistogram, times(times)).record(eq(expectedCpu), any(Tags.class));
verify(queryTypeMemoryHistogram, times(times)).record(eq(expectedMemory), any(Tags.class));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,10 @@

import com.carrotsearch.randomizedtesting.annotations.Name;
import com.carrotsearch.randomizedtesting.annotations.ParametersFactory;

import org.opensearch.test.rest.yaml.ClientYamlTestCandidate;
import org.opensearch.test.rest.yaml.OpenSearchClientYamlSuiteTestCase;


public class QueryInsightsClientYamlTestSuiteIT extends OpenSearchClientYamlSuiteTestCase {

public QueryInsightsClientYamlTestSuiteIT(@Name("yaml") ClientYamlTestCandidate testCandidate) {
Expand Down

0 comments on commit e99dfcf

Please sign in to comment.