Skip to content
This repository has been archived by the owner on Aug 2, 2022. It is now read-only.

Commit

Permalink
Improve profile API's error fetching efficiency (#117)
Browse files Browse the repository at this point in the history
* Improve profile API's error fetching efficiency

Previously, profile API scans all anomaly result indices to get a detector's most recent error, which can cause performance bottleneck with large anomaly result indices. This PR improves this aspect via various efforts.

First, when a detector is running, we only need to scan the current index, not all of the rolled over ones since we are interested in the latest error.
Second, when a detector is disabled, we only need to scan the latest anomaly result indices created before the detector's enable time.
Third, setting track total hits to false makes ES terminate the search early. ES will not try to count the number of documents and will be able to end the query as soon as N documents have been collected per segment.

Testing done:
1. patched a cluster with 1,000 detectors and 2GB anomaly result indices. Without the PR, scanning anomaly result indices 1000 times would timeout after 30 seconds. After the PR, we would not see the timeout.
2. A detector's error message can be on a rotated index. Adds a test case to make sure we get error info from a .opendistro-anomaly-results index that has been rolled over.
  • Loading branch information
kaituo authored May 9, 2020
1 parent 85d2768 commit a40ccf6
Show file tree
Hide file tree
Showing 4 changed files with 297 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -124,10 +124,12 @@
import java.security.PrivilegedAction;
import java.time.Clock;
import java.util.Arrays;
import java.util.Calendar;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TimeZone;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;
Expand Down Expand Up @@ -186,7 +188,14 @@ public List<RestHandler> getRestHandlers(
jobRunner.setAnomalyResultHandler(anomalyResultHandler);
jobRunner.setSettings(settings);

AnomalyDetectorProfileRunner profileRunner = new AnomalyDetectorProfileRunner(client, this.xContentRegistry, this.nodeFilter);
AnomalyDetectorProfileRunner profileRunner = new AnomalyDetectorProfileRunner(
client,
this.xContentRegistry,
this.nodeFilter,
indexNameExpressionResolver,
clusterService,
Calendar.getInstance(TimeZone.getTimeZone("UTC"))
);
RestGetAnomalyDetectorAction restGetAnomalyDetectorAction = new RestGetAnomalyDetectorAction(restController, profileRunner);
RestIndexAnomalyDetectorAction restIndexAnomalyDetectorAction = new RestIndexAnomalyDetectorAction(
settings,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,16 @@
import static org.elasticsearch.common.xcontent.XContentParserUtils.ensureExpectedToken;

import java.io.IOException;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TimeZone;
import java.util.TreeMap;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
Expand All @@ -29,8 +38,11 @@
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.xcontent.LoggingDeprecationHandler;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.common.xcontent.XContentParseException;
Expand All @@ -39,6 +51,7 @@
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.query.RangeQueryBuilder;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.builder.SearchSourceBuilder;
Expand All @@ -62,13 +75,26 @@ public class AnomalyDetectorProfileRunner {
private Client client;
private NamedXContentRegistry xContentRegistry;
private DiscoveryNodeFilterer nodeFilter;
private final IndexNameExpressionResolver indexNameExpressionResolver;
static String FAIL_TO_FIND_DETECTOR_MSG = "Fail to find detector with id: ";
static String FAIL_TO_GET_PROFILE_MSG = "Fail to get profile for detector ";

public AnomalyDetectorProfileRunner(Client client, NamedXContentRegistry xContentRegistry, DiscoveryNodeFilterer nodeFilter) {
private final ClusterService clusterService;
private Calendar calendar;

/**
 * Builds a profile runner.
 *
 * @param client ES client for issuing search/get requests
 * @param xContentRegistry registry used to parse anomaly results
 * @param nodeFilter filter to pick eligible AD nodes
 * @param indexNameExpressionResolver resolves the AD result index pattern to concrete index names
 * @param clusterService provides cluster state for index resolution
 * @param calendar calendar used to convert rolled-over index-name dates to epoch millis;
 *                 production passes a UTC calendar, tests may pass a fixed one
 */
public AnomalyDetectorProfileRunner(
    Client client,
    NamedXContentRegistry xContentRegistry,
    DiscoveryNodeFilterer nodeFilter,
    IndexNameExpressionResolver indexNameExpressionResolver,
    ClusterService clusterService,
    Calendar calendar
) {
    this.client = client;
    this.xContentRegistry = xContentRegistry;
    this.nodeFilter = nodeFilter;
    this.indexNameExpressionResolver = indexNameExpressionResolver;
    this.clusterService = clusterService;
    // Bug fix: use the injected calendar instead of silently discarding it and
    // re-creating a UTC instance — otherwise the parameter (and test injection) is useless.
    this.calendar = calendar;
}

public void profile(String detectorId, ActionListener<DetectorProfile> listener, Set<ProfileName> profiles) {
Expand Down Expand Up @@ -127,7 +153,7 @@ private void prepareProfile(
profileState(detectorId, enabledTimeMs, listener, job.isEnabled());
}
if (profiles.contains(ProfileName.ERROR)) {
profileError(detectorId, enabledTimeMs, listener);
profileError(detectorId, enabledTimeMs, job.getDisabledTime(), listener);
}

if (profiles.contains(ProfileName.COORDINATING_NODE)
Expand Down Expand Up @@ -208,7 +234,7 @@ private ActionListener<SearchResponse> onInittedEver(
return ActionListener.wrap(searchResponse -> {
SearchHits hits = searchResponse.getHits();
DetectorProfile profile = new DetectorProfile();
if (hits.getTotalHits().value == 0L) {
if (hits.getHits().length == 0L) {
profile.setState(DetectorState.INIT);
} else {
profile.setState(DetectorState.RUNNING);
Expand All @@ -234,20 +260,106 @@ private ActionListener<SearchResponse> onInittedEver(
}

/**
* Precondition:
* 1. Index are rotated with name pattern ".opendistro-anomaly-results-history-{now/d}-1" and now is using UTC.
* 2. Latest entry with error is recorded within enabled and disabled time. Note disabled time can be null.
*
* Error is populated if error of the latest anomaly result is not empty.
*
* Two optimization to avoid scanning all anomaly result indices to get a detector's most recent error
*
* First, when a detector is running, we only need to scan the current index, not all of the rolled over ones
* since we are interested in the latest error.
* Second, when a detector is disabled, we only need to scan the latest anomaly result indices created before the
* detector's disable time.
*
* @param detectorId detector id
* @param enabledTime the time when AD job is enabled in milliseconds
* @param enabledTimeMillis the time when AD job is enabled in milliseconds
* @param listener listener to process the returned error or exception
*/
private void profileError(String detectorId, long enabledTime, MultiResponsesDelegateActionListener<DetectorProfile> listener) {
SearchRequest searchLatestResult = createLatestAnomalyResultRequest(detectorId, enabledTime);
private void profileError(
    String detectorId,
    long enabledTimeMillis,
    Instant disabledTime,
    MultiResponsesDelegateActionListener<DetectorProfile> listener
) {
    // A null disabled time means the detector has never been disabled; treat it as 0 so the
    // "still running" comparison below holds for any real enable time.
    long disabledTimeMillis = 0;
    if (disabledTime != null) {
        disabledTimeMillis = disabledTime.toEpochMilli();
    }

    final String[] latestIndex;
    if (enabledTimeMillis > disabledTimeMillis) {
        // Detector is still running: the most recent error, if any, must be in the current
        // (not yet rolled over) result index, so only scan that one.
        latestIndex = new String[] { AnomalyResult.ANOMALY_RESULT_INDEX };
    } else {
        String[] concreteIndices = indexNameExpressionResolver
            .concreteIndexNames(
                clusterService.state(),
                IndicesOptions.lenientExpandOpen(),
                AnomalyDetectionIndices.ALL_AD_RESULTS_INDEX_PATTERN
            );

        // Find the latest result indices among rolled-over names such as
        // .opendistro-anomaly-results-history-2020.04.06-1 and
        // .opendistro-anomaly-results-history-2020.04.07-000002.
        // Compile the name pattern once, outside the loop, instead of once per index.
        Pattern resultIndexPattern = Pattern
            .compile("\\.opendistro-anomaly-results-history-(\\d{4})\\.(\\d{2})\\.(\\d{2})-\\d+");
        long maxTimestamp = -1;
        TreeMap<Long, List<String>> candidateIndices = new TreeMap<>();
        for (String indexName : concreteIndices) {
            Matcher m = resultIndexPattern.matcher(indexName);
            if (m.matches()) {
                int year = Integer.parseInt(m.group(1));
                int month = Integer.parseInt(m.group(2));
                int date = Integer.parseInt(m.group(3));
                // Calendar months are 0-based, hence month - 1.
                calendar.clear();
                calendar.set(year, month - 1, date);
                // e.g. 2020.05.08 is translated to 1588896000000
                long timestamp = calendar.getTimeInMillis();

                // a candidate index can be created before or after enabled time, but the index is
                // definitely created before disabled time
                if (timestamp <= disabledTimeMillis && maxTimestamp <= timestamp) {
                    maxTimestamp = timestamp;
                    // we can have two rotations on the same day and we don't know which one has
                    // our data, so we keep all
                    List<String> indexList = candidateIndices.computeIfAbsent(timestamp, k -> new ArrayList<String>());
                    indexList.add(indexName);
                }
            }
        }

        List<String> candidates = new ArrayList<String>();
        List<String> latestCandidate = candidateIndices.get(maxTimestamp);
        if (latestCandidate != null) {
            candidates.addAll(latestCandidate);
        }

        // look back one more index for an edge case:
        // Suppose detector interval is 1 minute. Detector last run is at 2020-05-07, 11:59:50 PM,
        // then AD result indices rolled over as .opendistro-anomaly-results-history-2020.05.07-001
        // Detector next run will be 2020-05-08, 00:00:50 AM. If a user stops the detector at
        // 2020-05-08 00:00:10 AM, the detector will not have an AD result on 2020-05-08.
        // We check AD result indices one day earlier to make sure we can always get AD results.
        Map.Entry<Long, List<String>> earlierCandidate = candidateIndices.lowerEntry(maxTimestamp);
        if (earlierCandidate != null) {
            candidates.addAll(earlierCandidate.getValue());
        }
        latestIndex = candidates.toArray(new String[0]);
    }

    if (latestIndex.length == 0) {
        // no result index found: can be due to the anomaly result not being created yet or the
        // detector's result indices having been deleted.
        listener.onResponse(new DetectorProfile());
        return;
    }
    SearchRequest searchLatestResult = createLatestAnomalyResultRequest(detectorId, enabledTimeMillis, disabledTimeMillis, latestIndex);
    client.search(searchLatestResult, onGetLatestAnomalyResult(listener, detectorId));
}

private ActionListener<SearchResponse> onGetLatestAnomalyResult(ActionListener<DetectorProfile> listener, String detectorId) {
return ActionListener.wrap(searchResponse -> {
SearchHits hits = searchResponse.getHits();
if (hits.getTotalHits().value == 0L) {
if (hits.getHits().length == 0L) {
listener.onResponse(new DetectorProfile());
} else {
SearchHit hit = hits.getAt(0);
Expand All @@ -259,12 +371,12 @@ private ActionListener<SearchResponse> onGetLatestAnomalyResult(ActionListener<D
) {
ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.nextToken(), parser::getTokenLocation);
AnomalyResult result = parser.namedObject(AnomalyResult.class, AnomalyResult.PARSE_FIELD_NAME, null);

DetectorProfile profile = new DetectorProfile();
if (result.getError() != null) {
profile.setError(result.getError());
}
listener.onResponse(profile);

} catch (IOException | XContentParseException | NullPointerException e) {
logger.error("Fail to parse anomaly result with " + hit.toString());
listener.onFailure(new RuntimeException("Fail to find detector error: " + detectorId, e));
Expand Down Expand Up @@ -292,7 +404,10 @@ private SearchRequest createInittedEverRequest(String detectorId, long enabledTi
filterQuery.filter(QueryBuilders.rangeQuery(AnomalyResult.EXECUTION_END_TIME_FIELD).gte(enabledTime));
filterQuery.filter(QueryBuilders.rangeQuery(AnomalyResult.ANOMALY_SCORE_FIELD).gt(0));

SearchSourceBuilder source = new SearchSourceBuilder().query(filterQuery).size(1);
// I am only looking for last 1 occurrence and have no interest in the total number of documents that match the query.
// ES will not try to count the number of documents and will be able to terminate the query as soon as 1 document
// have been collected per segment.
SearchSourceBuilder source = new SearchSourceBuilder().query(filterQuery).size(1).trackTotalHits(false);

SearchRequest request = new SearchRequest(AnomalyResult.ANOMALY_RESULT_INDEX);
request.source(source);
Expand All @@ -305,16 +420,23 @@ private SearchRequest createInittedEverRequest(String detectorId, long enabledTi
* @param enabledTime the time when AD job is enabled in milliseconds
* @return the search request
*/
private SearchRequest createLatestAnomalyResultRequest(String detectorId, long enabledTime) {
private SearchRequest createLatestAnomalyResultRequest(String detectorId, long enabledTime, long disabledTime, String[] index) {
    // Restrict execution-end-time to the window the job was active: always bounded below by the
    // enable time, and bounded above by the disable time only when the detector has been stopped
    // after that enable time (otherwise it is still running and there is no upper bound).
    RangeQueryBuilder executionTimeRange = QueryBuilders.rangeQuery(AnomalyResult.EXECUTION_END_TIME_FIELD).gte(enabledTime);
    if (disabledTime >= enabledTime) {
        executionTimeRange.lte(disabledTime);
    }

    BoolQueryBuilder query = new BoolQueryBuilder()
        .filter(QueryBuilders.termQuery(AnomalyResult.DETECTOR_ID_FIELD, detectorId))
        .filter(executionTimeRange);

    // Sort newest-first and keep a single hit: we only care about the latest result's error.
    FieldSortBuilder newestFirst = new FieldSortBuilder(AnomalyResult.EXECUTION_END_TIME_FIELD).order(SortOrder.DESC);

    // We only want the last occurrence and have no interest in the total number of matching
    // documents; disabling total-hit tracking lets ES stop counting and terminate the query as
    // soon as enough documents have been collected per segment.
    SearchSourceBuilder source = new SearchSourceBuilder().query(query).size(1).sort(newestFirst).trackTotalHits(false);

    SearchRequest request = new SearchRequest(index);
    request.source(source);
    return request;
}
Expand Down
Loading

0 comments on commit a40ccf6

Please sign in to comment.