This repository has been archived by the owner on Aug 2, 2022. It is now read-only.

Improve profile API's error fetching efficiency #117

Merged
@@ -124,10 +124,12 @@
import java.security.PrivilegedAction;
import java.time.Clock;
import java.util.Arrays;
import java.util.Calendar;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TimeZone;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@@ -186,7 +188,14 @@ public List<RestHandler> getRestHandlers(
jobRunner.setAnomalyResultHandler(anomalyResultHandler);
jobRunner.setSettings(settings);

AnomalyDetectorProfileRunner profileRunner = new AnomalyDetectorProfileRunner(client, this.xContentRegistry, this.nodeFilter);
AnomalyDetectorProfileRunner profileRunner = new AnomalyDetectorProfileRunner(
client,
this.xContentRegistry,
this.nodeFilter,
indexNameExpressionResolver,
clusterService,
Calendar.getInstance(TimeZone.getTimeZone("UTC"))
);
RestGetAnomalyDetectorAction restGetAnomalyDetectorAction = new RestGetAnomalyDetectorAction(restController, profileRunner);
RestIndexAnomalyDetectorAction restIndexAnomalyDetectorAction = new RestIndexAnomalyDetectorAction(
settings,
@@ -20,7 +20,16 @@
import static org.elasticsearch.common.xcontent.XContentParserUtils.ensureExpectedToken;

import java.io.IOException;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TimeZone;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -29,8 +38,11 @@
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.xcontent.LoggingDeprecationHandler;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.common.xcontent.XContentParseException;
@@ -39,6 +51,7 @@
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.query.RangeQueryBuilder;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.builder.SearchSourceBuilder;
@@ -62,13 +75,26 @@ public class AnomalyDetectorProfileRunner {
private Client client;
private NamedXContentRegistry xContentRegistry;
private DiscoveryNodeFilterer nodeFilter;
private final IndexNameExpressionResolver indexNameExpressionResolver;
static String FAIL_TO_FIND_DETECTOR_MSG = "Fail to find detector with id: ";
static String FAIL_TO_GET_PROFILE_MSG = "Fail to get profile for detector ";

public AnomalyDetectorProfileRunner(Client client, NamedXContentRegistry xContentRegistry, DiscoveryNodeFilterer nodeFilter) {
private final ClusterService clusterService;
private Calendar calendar;

public AnomalyDetectorProfileRunner(
Client client,
NamedXContentRegistry xContentRegistry,
DiscoveryNodeFilterer nodeFilter,
IndexNameExpressionResolver indexNameExpressionResolver,
ClusterService clusterService,
Calendar calendar
) {
this.client = client;
this.xContentRegistry = xContentRegistry;
this.nodeFilter = nodeFilter;
this.indexNameExpressionResolver = indexNameExpressionResolver;
this.clusterService = clusterService;
this.calendar = calendar;
}

public void profile(String detectorId, ActionListener<DetectorProfile> listener, Set<ProfileName> profiles) {
@@ -127,7 +153,7 @@ private void prepareProfile(
profileState(detectorId, enabledTimeMs, listener, job.isEnabled());
}
if (profiles.contains(ProfileName.ERROR)) {
profileError(detectorId, enabledTimeMs, listener);
profileError(detectorId, enabledTimeMs, job.getDisabledTime(), listener);
}

if (profiles.contains(ProfileName.COORDINATING_NODE)
@@ -208,7 +234,7 @@ private ActionListener<SearchResponse> onInittedEver(
return ActionListener.wrap(searchResponse -> {
SearchHits hits = searchResponse.getHits();
DetectorProfile profile = new DetectorProfile();
if (hits.getTotalHits().value == 0L) {
if (hits.getHits().length == 0L) {
profile.setState(DetectorState.INIT);
} else {
profile.setState(DetectorState.RUNNING);
@@ -234,20 +260,81 @@ }
}

/**
* Preconditions:
* 1. Result indices are rotated with the name pattern ".opendistro-anomaly-results-history-{now/d}-1", where {now/d} resolves to the current date in UTC.
* 2. The latest entry with an error is recorded between the enabled time and the disabled time. Note the disabled time can be null.
*
* The error is populated if the error field of the latest anomaly result is not empty.
* @param detectorId detector id
* @param enabledTime the time when AD job is enabled in milliseconds
* @param enabledTimeMillis the time when AD job is enabled in milliseconds
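* @param disabledTime the time when the AD job is disabled; null if the job has never been disabled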
* @param listener listener to process the returned error or exception
*/
private void profileError(String detectorId, long enabledTime, MultiResponsesDelegateActionListener<DetectorProfile> listener) {
SearchRequest searchLatestResult = createLatestAnomalyResultRequest(detectorId, enabledTime);
private void profileError(
String detectorId,
long enabledTimeMillis,
Instant disabledTime,
MultiResponsesDelegateActionListener<DetectorProfile> listener
) {
String[] latestIndex = null;

long disabledTimeMillis = 0;
if (disabledTime != null) {
disabledTimeMillis = disabledTime.toEpochMilli();
}
if (enabledTimeMillis > disabledTimeMillis) {
// detector is still running
latestIndex = new String[1];
latestIndex[0] = AnomalyResult.ANOMALY_RESULT_INDEX;
} else {
String[] concreteIndices = indexNameExpressionResolver
.concreteIndexNames(
clusterService.state(),
IndicesOptions.lenientExpandOpen(),
AnomalyDetectionIndices.ALL_AD_RESULTS_INDEX_PATTERN
);

// find the latest result index among candidates such as .opendistro-anomaly-results-history-2020.04.06-1 and
// .opendistro-anomaly-results-history-2020.04.07-000002
long maxTimestamp = -1;
Map<Long, List<String>> candidateIndices = new HashMap<>();
for (String indexName : concreteIndices) {
Matcher m = Pattern.compile("\\.opendistro-anomaly-results-history-(\\d{4})\\.(\\d{2})\\.(\\d{2})-\\d+").matcher(indexName);
if (m.matches()) {
int year = Integer.parseInt(m.group(1));
int month = Integer.parseInt(m.group(2));
int date = Integer.parseInt(m.group(3));
// Calendar months are 0-based (January == 0), so subtract 1 from the parsed month
calendar.clear();
calendar.set(year, month - 1, date);
Contributor:
will Jan. cause any issue? I guess in the case of Jan., month is 1; not sure if this can cause any issue.

Contributor:
I would suggest you use the current year/month/date to initialize the calendar, and do calendar.add(Calendar.MONTH, -1) instead.

Contributor (replying to the question above):
just checked the Java doc; the month for Jan is 0 here.

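For reference, a minimal standalone sketch of the 0-based month convention discussed in the thread above (the class name and the example date are illustrative, not part of this change):

```java
import java.util.Calendar;
import java.util.TimeZone;

public class CalendarMonthDemo {
    public static void main(String[] args) {
        Calendar calendar = Calendar.getInstance(TimeZone.getTimeZone("UTC"));
        calendar.clear();
        // Calendar months are 0-based: Calendar.JANUARY == 0. An index named
        // .opendistro-anomaly-results-history-2020.01.08-1 therefore maps to
        // calendar.set(2020, 1 - 1, 8), i.e. the parsed month minus one.
        calendar.set(2020, Calendar.JANUARY, 8);
        System.out.println(calendar.getTimeInMillis()); // 1578441600000, i.e. 2020-01-08T00:00:00Z
    }
}
```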
// 2020.05.08 is translated to 1588896000000
long timestamp = calendar.getTimeInMillis();

// a candidate index can be created before or after the enabled time, but it is definitely created before the disabled time
if (timestamp <= disabledTimeMillis && maxTimestamp <= timestamp) {
maxTimestamp = timestamp;
// we can have two rotations on the same day and we don't know which one has our data, so we keep all
Contributor @ylwu-amzn (May 8, 2020):
One edge case: suppose the detector interval is 1 minute.
1. The detector's last run is at 2020-05-07, 11:59:50 PM, and the AD result indices roll over as .opendistro-anomaly-results-history-2020.05.07-001.
2. The detector's next run will be at 2020-05-08, 00:00:50 AM. If the user stops the detector at 2020-05-08, 00:00:10 AM, the detector will not have any AD result on 2020-05-08.
So this code change will check the latest AD result index of 2020-05-08, since 2020-05-08 <= 2020-05-08 00:00:10 AM (disabledTime), but we can't find any AD result for this detector on 2020-05-08. How about checking the last two days' AD result indices to make sure we can always get an AD result? Similar to setting the monitor interval to 2 * detector_interval.

Member Author:
Good point. Changed.

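A minimal sketch of the reviewer's two-day-window suggestion, assuming candidate indices are grouped by creation-day timestamp as in the loop above; the helper class and its names are hypothetical and this is not the actual follow-up change:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;

final class RecentIndexSelector {
    private static final long TWO_DAYS_MILLIS = TimeUnit.DAYS.toMillis(2);

    // Hypothetical helper: given index creation timestamps (millis) mapped to index names,
    // keep every index created no later than the disabled time and within the two days before
    // it, so a detector stopped right after a rollover still has a candidate index that
    // actually contains its results.
    static String[] selectRecentIndices(Map<Long, List<String>> indicesByCreationDay, long disabledTimeMillis) {
        List<String> selected = new ArrayList<>();
        for (Map.Entry<Long, List<String>> entry : indicesByCreationDay.entrySet()) {
            long creationDay = entry.getKey();
            if (creationDay <= disabledTimeMillis && disabledTimeMillis - creationDay <= TWO_DAYS_MILLIS) {
                selected.addAll(entry.getValue());
            }
        }
        return selected.toArray(new String[0]);
    }
}
```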
List<String> indexList = candidateIndices.computeIfAbsent(timestamp, k -> new ArrayList<String>());
indexList.add(indexName);
}
}
}
latestIndex = candidateIndices.getOrDefault(maxTimestamp, new ArrayList<String>()).toArray(new String[0]);
}

if (latestIndex == null || latestIndex.length == 0) {
// no result index found: either no anomaly result has been created yet, or the detector's result indices have been deleted.
listener.onResponse(new DetectorProfile());
return;
}
SearchRequest searchLatestResult = createLatestAnomalyResultRequest(detectorId, enabledTimeMillis, disabledTimeMillis, latestIndex);
client.search(searchLatestResult, onGetLatestAnomalyResult(listener, detectorId));
}

private ActionListener<SearchResponse> onGetLatestAnomalyResult(ActionListener<DetectorProfile> listener, String detectorId) {
return ActionListener.wrap(searchResponse -> {
SearchHits hits = searchResponse.getHits();
if (hits.getTotalHits().value == 0L) {
if (hits.getHits().length == 0L) {
listener.onResponse(new DetectorProfile());
} else {
SearchHit hit = hits.getAt(0);
@@ -259,12 +346,12 @@ ) {
) {
ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.nextToken(), parser::getTokenLocation);
AnomalyResult result = parser.namedObject(AnomalyResult.class, AnomalyResult.PARSE_FIELD_NAME, null);

DetectorProfile profile = new DetectorProfile();
if (result.getError() != null) {
profile.setError(result.getError());
}
listener.onResponse(profile);

} catch (IOException | XContentParseException | NullPointerException e) {
logger.error("Fail to parse anomaly result with " + hit.toString());
listener.onFailure(new RuntimeException("Fail to find detector error: " + detectorId, e));
@@ -292,7 +379,10 @@ private SearchRequest createInittedEverRequest(String detectorId, long enabledTi
filterQuery.filter(QueryBuilders.rangeQuery(AnomalyResult.EXECUTION_END_TIME_FIELD).gte(enabledTime));
filterQuery.filter(QueryBuilders.rangeQuery(AnomalyResult.ANOMALY_SCORE_FIELD).gt(0));

SearchSourceBuilder source = new SearchSourceBuilder().query(filterQuery).size(1);
// We only need 1 matching document and have no interest in the total number of documents that match the query.
// With track_total_hits disabled, ES will not try to count matching documents and can terminate the query as soon as
// 1 document has been collected per segment.
SearchSourceBuilder source = new SearchSourceBuilder().query(filterQuery).size(1).trackTotalHits(false);

SearchRequest request = new SearchRequest(AnomalyResult.ANOMALY_RESULT_INDEX);
request.source(source);
@@ -305,16 +395,23 @@ private SearchRequest createLatestAnomalyResultRequest(String detectorId, long enabledTi
* @param enabledTime the time when AD job is enabled in milliseconds
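* @param disabledTime the time when the AD job is disabled in milliseconds; 0 if the job has never been disabled
* @param index the result indices to search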
* @return the search request
*/
private SearchRequest createLatestAnomalyResultRequest(String detectorId, long enabledTime) {
private SearchRequest createLatestAnomalyResultRequest(String detectorId, long enabledTime, long disabledTime, String[] index) {
BoolQueryBuilder filterQuery = new BoolQueryBuilder();
filterQuery.filter(QueryBuilders.termQuery(AnomalyResult.DETECTOR_ID_FIELD, detectorId));
filterQuery.filter(QueryBuilders.rangeQuery(AnomalyResult.EXECUTION_END_TIME_FIELD).gte(enabledTime));
RangeQueryBuilder rangeBuilder = QueryBuilders.rangeQuery(AnomalyResult.EXECUTION_END_TIME_FIELD).gte(enabledTime);
if (disabledTime >= enabledTime) {
rangeBuilder.lte(disabledTime);
}
filterQuery.filter(rangeBuilder);

FieldSortBuilder sortQuery = new FieldSortBuilder(AnomalyResult.EXECUTION_END_TIME_FIELD).order(SortOrder.DESC);

SearchSourceBuilder source = new SearchSourceBuilder().query(filterQuery).size(1).sort(sortQuery);
// We only look for the latest 1 occurrence and have no interest in the total number of documents that match the query.
// With track_total_hits disabled, ES will not try to count matching documents and can terminate the query as soon as
// 1 document has been collected per segment.
SearchSourceBuilder source = new SearchSourceBuilder().query(filterQuery).size(1).sort(sortQuery).trackTotalHits(false);

SearchRequest request = new SearchRequest(AnomalyDetectionIndices.ALL_AD_RESULTS_INDEX_PATTERN);
SearchRequest request = new SearchRequest(index);
request.source(source);
return request;
}