Fix the discrepancy between Profile API and real time tasks API
This PR fixes the discrepancy by querying the result index when the total number of updates is less than 32. We have done something similar in the Profile API, so I refactored the reusable code into ProfileUtil. We also cache whether we have queried the result index so that we don't repeatedly issue the extra query.
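
At a high level, the change follows the pattern sketched below. This is a minimal illustration only: the class, method, and field names (RealtimeInitChecker, resolveTotalUpdates, resultIndexQueried) are simplified stand-ins for the logic that actually lives in ExecuteADResultResponseRecorder, ADTaskCacheManager, and ProfileUtil in the diff further down.

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class RealtimeInitChecker {
    // Threshold mentioned in the description; the plugin passes AnomalyDetectorSettings.NUM_MIN_SAMPLES.
    private static final int RCF_MIN_SAMPLES = 32;

    // Caches, per detector, whether the result index has already been queried.
    private final Map<String, Boolean> resultIndexQueried = new ConcurrentHashMap<>();

    /** Returns the (possibly corrected) total update count to report for the realtime task. */
    public long resolveTotalUpdates(String detectorId, long rcfTotalUpdates, long jobEnabledTimeMs) {
        // Issue the extra query at most once per detector, and only while the model looks uninitialized.
        if (rcfTotalUpdates >= RCF_MIN_SAMPLES || resultIndexQueried.getOrDefault(detectorId, false)) {
            return rcfTotalUpdates;
        }
        boolean hasRealtimeResult = hasRealtimeResultAfter(detectorId, jobEnabledTimeMs);
        resultIndexQueried.put(detectorId, true); // remember so we don't repeatedly issue the extra query
        // If a realtime anomaly result already exists after the job enabled time, report at least
        // RCF_MIN_SAMPLES so the realtime task's init progress reaches 100%.
        return hasRealtimeResult ? RCF_MIN_SAMPLES : rcfTotalUpdates;
    }

    // Placeholder for the search built in ProfileUtil: a query on the anomaly result index filtering on
    // detector id, execution end time >= job enabled time, anomaly score > 0, and no task id.
    private boolean hasRealtimeResultAfter(String detectorId, long jobEnabledTimeMs) {
        return false;
    }
}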

Testing done:
1. Repeated the repro steps in #502 and verified the issue has been resolved.

Signed-off-by: Kaituo Li <[email protected]>
kaituo authored and amitgalitz committed Jan 9, 2023
1 parent 1e8f858 commit 5559f40
Showing 16 changed files with 646 additions and 113 deletions.
2 changes: 0 additions & 2 deletions src/main/java/org/opensearch/ad/AnomalyDetectorJobRunner.java
@@ -560,14 +560,12 @@ private void stopAdJob(String detectorId, AnomalyDetectorFunction function) {
}, exception -> { log.error("JobRunner failed to update AD job as disabled for " + detectorId, exception); }));
} else {
log.info("AD Job was disabled for " + detectorId);
// function.execute();
}
} catch (IOException e) {
log.error("JobRunner failed to stop detector job " + detectorId, e);
}
} else {
log.info("AD Job was not found for " + detectorId);
// function.execute();
}
}, exception -> log.error("JobRunner failed to get detector job " + detectorId, exception));

5 changes: 4 additions & 1 deletion src/main/java/org/opensearch/ad/AnomalyDetectorPlugin.java
@@ -762,7 +762,10 @@ public PooledObject<LinkedBuffer> wrap(LinkedBuffer obj) {
adTaskManager,
nodeFilter,
threadPool,
client
client,
stateManager,
adTaskCacheManager,
AnomalyDetectorSettings.NUM_MIN_SAMPLES
);

// return objects used by Guice to inject dependencies for e.g.,
54 changes: 9 additions & 45 deletions src/main/java/org/opensearch/ad/AnomalyDetectorProfileRunner.java
@@ -40,7 +40,6 @@
import org.opensearch.ad.model.ADTaskType;
import org.opensearch.ad.model.AnomalyDetector;
import org.opensearch.ad.model.AnomalyDetectorJob;
import org.opensearch.ad.model.AnomalyResult;
import org.opensearch.ad.model.DetectorProfile;
import org.opensearch.ad.model.DetectorProfileName;
import org.opensearch.ad.model.DetectorState;
@@ -64,8 +63,6 @@
import org.opensearch.common.xcontent.NamedXContentRegistry;
import org.opensearch.common.xcontent.XContentParser;
import org.opensearch.common.xcontent.XContentType;
import org.opensearch.index.query.BoolQueryBuilder;
import org.opensearch.index.query.QueryBuilders;
import org.opensearch.search.SearchHits;
import org.opensearch.search.aggregations.Aggregation;
import org.opensearch.search.aggregations.AggregationBuilder;
@@ -451,14 +448,15 @@ private void profileMultiEntityDetectorStateRelated(
if (profileResponse.getTotalUpdates() < requiredSamples) {
// need to double check since what ProfileResponse returns is the highest priority entity currently in memory, but
// another entity might have already been initialized and sit somewhere else (in memory or on disk).
confirmMultiEntityDetectorInitStatus(
detector,
job.getEnabledTime().toEpochMilli(),
profileBuilder,
profilesToCollect,
profileResponse.getTotalUpdates(),
listener
);
long enabledTime = job.getEnabledTime().toEpochMilli();
long totalUpdates = profileResponse.getTotalUpdates();
ProfileUtil
.confirmDetectorRealtimeInitStatus(
detector,
enabledTime,
client,
onInittedEver(enabledTime, profileBuilder, profilesToCollect, detector, totalUpdates, listener)
);
} else {
createRunningStateAndInitProgress(profilesToCollect, profileBuilder);
listener.onResponse(profileBuilder.build());
Expand All @@ -471,18 +469,6 @@ private void profileMultiEntityDetectorStateRelated(
}
}

private void confirmMultiEntityDetectorInitStatus(
AnomalyDetector detector,
long enabledTime,
DetectorProfile.Builder profile,
Set<DetectorProfileName> profilesToCollect,
long totalUpdates,
MultiResponsesDelegateActionListener<DetectorProfile> listener
) {
SearchRequest searchLatestResult = createInittedEverRequest(detector.getDetectorId(), enabledTime, detector.getResultIndex());
client.search(searchLatestResult, onInittedEver(enabledTime, profile, profilesToCollect, detector, totalUpdates, listener));
}

private ActionListener<SearchResponse> onInittedEver(
long lastUpdateTimeMs,
DetectorProfile.Builder profileBuilder,
@@ -602,26 +588,4 @@ private void processInitResponse(

listener.onResponse(builder.build());
}

/**
* Create search request to check if we have at least 1 anomaly score larger than 0 after AD job enabled time
* @param detectorId detector id
* @param enabledTime the time when AD job is enabled in milliseconds
* @return the search request
*/
private SearchRequest createInittedEverRequest(String detectorId, long enabledTime, String resultIndex) {
BoolQueryBuilder filterQuery = new BoolQueryBuilder();
filterQuery.filter(QueryBuilders.termQuery(AnomalyResult.DETECTOR_ID_FIELD, detectorId));
filterQuery.filter(QueryBuilders.rangeQuery(AnomalyResult.EXECUTION_END_TIME_FIELD).gte(enabledTime));
filterQuery.filter(QueryBuilders.rangeQuery(AnomalyResult.ANOMALY_SCORE_FIELD).gt(0));

SearchSourceBuilder source = new SearchSourceBuilder().query(filterQuery).size(1);

SearchRequest request = new SearchRequest(CommonName.ANOMALY_RESULT_INDEX_ALIAS);
request.source(source);
if (resultIndex != null) {
request.indices(resultIndex);
}
return request;
}
}
134 changes: 118 additions & 16 deletions src/main/java/org/opensearch/ad/ExecuteADResultResponseRecorder.java
@@ -22,6 +22,8 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.action.ActionListener;
import org.opensearch.action.update.UpdateResponse;
import org.opensearch.ad.common.exception.AnomalyDetectionException;
import org.opensearch.ad.common.exception.EndRunException;
import org.opensearch.ad.common.exception.ResourceNotFoundException;
import org.opensearch.ad.constant.CommonErrorMessages;
@@ -32,6 +34,7 @@
import org.opensearch.ad.model.DetectorProfileName;
import org.opensearch.ad.model.FeatureData;
import org.opensearch.ad.model.IntervalTimeConfiguration;
import org.opensearch.ad.task.ADTaskCacheManager;
import org.opensearch.ad.task.ADTaskManager;
import org.opensearch.ad.transport.AnomalyResultResponse;
import org.opensearch.ad.transport.ProfileAction;
@@ -40,10 +43,12 @@
import org.opensearch.ad.transport.RCFPollingRequest;
import org.opensearch.ad.transport.handler.AnomalyIndexHandler;
import org.opensearch.ad.util.DiscoveryNodeFilterer;
import org.opensearch.ad.util.ExceptionUtil;
import org.opensearch.client.Client;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.commons.authuser.User;
import org.opensearch.search.SearchHits;
import org.opensearch.threadpool.ThreadPool;

public class ExecuteADResultResponseRecorder {
@@ -55,21 +60,30 @@ public class ExecuteADResultResponseRecorder {
private DiscoveryNodeFilterer nodeFilter;
private ThreadPool threadPool;
private Client client;
private NodeStateManager nodeStateManager;
private ADTaskCacheManager adTaskCacheManager;
private int rcfMinSamples;

public ExecuteADResultResponseRecorder(
AnomalyDetectionIndices anomalyDetectionIndices,
AnomalyIndexHandler<AnomalyResult> anomalyResultHandler,
ADTaskManager adTaskManager,
DiscoveryNodeFilterer nodeFilter,
ThreadPool threadPool,
Client client
Client client,
NodeStateManager nodeStateManager,
ADTaskCacheManager adTaskCacheManager,
int rcfMinSamples
) {
this.anomalyDetectionIndices = anomalyDetectionIndices;
this.anomalyResultHandler = anomalyResultHandler;
this.adTaskManager = adTaskManager;
this.nodeFilter = nodeFilter;
this.threadPool = threadPool;
this.client = client;
this.nodeStateManager = nodeStateManager;
this.adTaskCacheManager = adTaskCacheManager;
this.rcfMinSamples = rcfMinSamples;
}

public void indexAnomalyResult(
@@ -185,27 +199,66 @@ private void updateLatestRealtimeTask(
String error
) {
// Don't need info as this will be printed repeatedly in each interval
adTaskManager
.updateLatestRealtimeTaskOnCoordinatingNode(
ActionListener<UpdateResponse> listener = ActionListener.wrap(r -> {
if (r != null) {
log.debug("Updated latest realtime task successfully for detector {}, taskState: {}", detectorId, taskState);
}
}, e -> {
if ((e instanceof ResourceNotFoundException) && e.getMessage().contains(CAN_NOT_FIND_LATEST_TASK)) {
// Clear realtime task cache, will recreate AD task in next run, check AnomalyResultTransportAction.
log.error("Can't find latest realtime task of detector " + detectorId);
adTaskManager.removeRealtimeTaskCache(detectorId);
} else {
log.error("Failed to update latest realtime task for detector " + detectorId, e);
}
});

// rcfTotalUpdates is null when we save exception messages
if (!adTaskCacheManager.hasQueriedResultIndex(detectorId) && rcfTotalUpdates != null && rcfTotalUpdates < rcfMinSamples) {
// confirm the total updates number since it is possible that we have already had results after job enabling time
// If yes, total updates should be at least rcfMinSamples so that the init progress reaches 100%.
confirmTotalRCFUpdatesFound(
detectorId,
taskState,
rcfTotalUpdates,
detectorIntervalInMinutes,
error,
ActionListener.wrap(r -> {
if (r != null) {
log.debug("Updated latest realtime task successfully for detector {}, taskState: {}", detectorId, taskState);
}
}, e -> {
if ((e instanceof ResourceNotFoundException) && e.getMessage().contains(CAN_NOT_FIND_LATEST_TASK)) {
// Clear realtime task cache, will recreate AD task in next run, check AnomalyResultTransportAction.
log.error("Can't find latest realtime task of detector " + detectorId);
adTaskManager.removeRealtimeTaskCache(detectorId);
} else {
log.error("Failed to update latest realtime task for detector " + detectorId, e);
}
})
ActionListener
.wrap(
r -> adTaskManager
.updateLatestRealtimeTaskOnCoordinatingNode(
detectorId,
taskState,
r,
detectorIntervalInMinutes,
error,
listener
),
e -> {
log.error("Fail to confirm rcf update", e);
adTaskManager
.updateLatestRealtimeTaskOnCoordinatingNode(
detectorId,
taskState,
rcfTotalUpdates,
detectorIntervalInMinutes,
error,
listener
);
}
)
);
} else {
adTaskManager
.updateLatestRealtimeTaskOnCoordinatingNode(
detectorId,
taskState,
rcfTotalUpdates,
detectorIntervalInMinutes,
error,
listener
);
}
}

/**
@@ -285,4 +338,53 @@ public void indexAnomalyResultException(
}
}

private void confirmTotalRCFUpdatesFound(
String detectorId,
String taskState,
Long rcfTotalUpdates,
Long detectorIntervalInMinutes,
String error,
ActionListener<Long> listener
) {
nodeStateManager.getAnomalyDetector(detectorId, ActionListener.wrap(detectorOptional -> {
if (!detectorOptional.isPresent()) {
listener.onFailure(new AnomalyDetectionException(detectorId, "fail to get detector"));
return;
}
nodeStateManager.getAnomalyDetectorJob(detectorId, ActionListener.wrap(jobOptional -> {
if (!jobOptional.isPresent()) {
listener.onFailure(new AnomalyDetectionException(detectorId, "fail to get job"));
return;
}

ProfileUtil
.confirmDetectorRealtimeInitStatus(
detectorOptional.get(),
jobOptional.get().getEnabledTime().toEpochMilli(),
client,
ActionListener.wrap(searchResponse -> {
ActionListener.completeWith(listener, () -> {
SearchHits hits = searchResponse.getHits();
Long correctedTotalUpdates = rcfTotalUpdates;
if (hits.getTotalHits().value > 0L) {
// correct the number if we have already had results after job enabling time
// so that the detector won't stay initialized
correctedTotalUpdates = Long.valueOf(rcfMinSamples);
}
adTaskCacheManager.markResultIndexQueried(detectorId);
return correctedTotalUpdates;
});
}, exception -> {
if (ExceptionUtil.isIndexNotAvailable(exception)) {
// anomaly result index is not created yet
adTaskCacheManager.markResultIndexQueried(detectorId);
listener.onResponse(0L);
} else {
listener.onFailure(exception);
}
})
);
}, e -> listener.onFailure(new AnomalyDetectionException(detectorId, "fail to get job"))));
}, e -> listener.onFailure(new AnomalyDetectionException(detectorId, "fail to get detector"))));
}
}
68 changes: 68 additions & 0 deletions src/main/java/org/opensearch/ad/ProfileUtil.java
@@ -0,0 +1,68 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*
* Modifications Copyright OpenSearch Contributors. See
* GitHub history for details.
*/

package org.opensearch.ad;

import org.opensearch.action.ActionListener;
import org.opensearch.action.search.SearchRequest;
import org.opensearch.action.search.SearchResponse;
import org.opensearch.ad.constant.CommonName;
import org.opensearch.ad.model.AnomalyDetector;
import org.opensearch.ad.model.AnomalyResult;
import org.opensearch.client.Client;
import org.opensearch.index.query.BoolQueryBuilder;
import org.opensearch.index.query.ExistsQueryBuilder;
import org.opensearch.index.query.QueryBuilders;
import org.opensearch.search.builder.SearchSourceBuilder;

public class ProfileUtil {
/**
* Create search request to check if we have at least 1 anomaly score larger than 0 after AD job enabled time.
* Note this function is only meant to check for status of real time analysis.
*
* @param detectorId detector id
* @param enabledTime the time when AD job is enabled in milliseconds
* @return the search request
*/
private static SearchRequest createRealtimeInittedEverRequest(String detectorId, long enabledTime, String resultIndex) {
BoolQueryBuilder filterQuery = new BoolQueryBuilder();
filterQuery.filter(QueryBuilders.termQuery(AnomalyResult.DETECTOR_ID_FIELD, detectorId));
filterQuery.filter(QueryBuilders.rangeQuery(AnomalyResult.EXECUTION_END_TIME_FIELD).gte(enabledTime));
filterQuery.filter(QueryBuilders.rangeQuery(AnomalyResult.ANOMALY_SCORE_FIELD).gt(0));
// Historical analysis result also stored in result index, which has non-null task_id.
// For realtime detection result, we should filter task_id == null
ExistsQueryBuilder taskIdExistsFilter = QueryBuilders.existsQuery(AnomalyResult.TASK_ID_FIELD);
filterQuery.mustNot(taskIdExistsFilter);

SearchSourceBuilder source = new SearchSourceBuilder().query(filterQuery).size(1);

SearchRequest request = new SearchRequest(CommonName.ANOMALY_RESULT_INDEX_ALIAS);
request.source(source);
if (resultIndex != null) {
request.indices(resultIndex);
}
return request;
}

public static void confirmDetectorRealtimeInitStatus(
AnomalyDetector detector,
long enabledTime,
Client client,
ActionListener<SearchResponse> listener
) {
SearchRequest searchLatestResult = createRealtimeInittedEverRequest(
detector.getDetectorId(),
enabledTime,
detector.getResultIndex()
);
client.search(searchLatestResult, listener);
}
}
2 changes: 2 additions & 0 deletions src/main/java/org/opensearch/ad/constant/CommonName.java
@@ -25,6 +25,8 @@ public class CommonName {

// The alias of the index in which to write AD result history
public static final String ANOMALY_RESULT_INDEX_ALIAS = ".opendistro-anomaly-results";
// index pattern matching to all ad result indices
public static final String ANOMALY_RESULT_INDEX_ALL = ".opendistro-anomaly-results-history*";

// ======================================
// Format name
