Fix the discrepancy between Profile API and real time tasks API (#770)
* Fix the discrepancy between Profile API and real time tasks API

This PR fixes the discrepancy by querying the result index when the total number of updates is less than 32. We already do something similar in the profile API, so I refactored the reusable code into ProfileUtil. We also cache whether we have queried the result index so that we do not repeatedly issue the extra query.

Testing done:
1. Repeated the repro steps in #502 and verified that the issue has been resolved.

Signed-off-by: Kaituo Li <[email protected]>
kaituo authored Jan 9, 2023
1 parent b49a36b commit 27ed498
Showing 15 changed files with 642 additions and 117 deletions.
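
Before the per-file diffs, here is a simplified sketch of the corrected realtime-task update flow introduced in ExecuteADResultResponseRecorder. It is illustrative only: the plugin code uses ActionListener, NodeStateManager, and ADTaskCacheManager, while the `ResultIndexCache`, `ResultIndexClient`, and `reportTotalUpdates` names below are stand-ins invented for this sketch (the real code also forwards a null update count when only an exception is recorded, which the sketch simply skips).

```java
import java.util.function.LongConsumer;

// Minimal sketch of the corrected realtime-task update flow; not the exact plugin code.
class RealtimeTaskUpdateSketch {

    interface ResultIndexCache {            // stand-in for ADTaskCacheManager
        boolean hasQueriedResultIndex(String detectorId);
        void markResultIndexQueried(String detectorId);
    }

    interface ResultIndexClient {           // stand-in for the search against the result index
        long countResultsAfterJobEnabledTime(String detectorId);
    }

    private final ResultIndexCache cache;
    private final ResultIndexClient resultIndex;
    private final int rcfMinSamples;        // NUM_MIN_SAMPLES, 32 in the plugin

    RealtimeTaskUpdateSketch(ResultIndexCache cache, ResultIndexClient resultIndex, int rcfMinSamples) {
        this.cache = cache;
        this.resultIndex = resultIndex;
        this.rcfMinSamples = rcfMinSamples;
    }

    void updateLatestRealtimeTask(String detectorId, Long rcfTotalUpdates, LongConsumer reportTotalUpdates) {
        // rcfTotalUpdates is null when only an exception message is being saved.
        if (cache.hasQueriedResultIndex(detectorId) || rcfTotalUpdates == null || rcfTotalUpdates >= rcfMinSamples) {
            if (rcfTotalUpdates != null) {
                reportTotalUpdates.accept(rcfTotalUpdates);
            }
            return;
        }
        // Double-check against the result index: results may already exist after the job's
        // enabled time even though the in-memory update count looks low.
        long resultsAfterEnable = resultIndex.countResultsAfterJobEnabledTime(detectorId);
        cache.markResultIndexQueried(detectorId); // issue the extra query at most once per detector
        long corrected = resultsAfterEnable > 0 ? rcfMinSamples : rcfTotalUpdates;
        reportTotalUpdates.accept(corrected);
    }
}
```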
8 changes: 2 additions & 6 deletions build.gradle
@@ -108,8 +108,8 @@ dependencies {
compileOnly "org.opensearch:opensearch-job-scheduler-spi:${job_scheduler_version}"
implementation "org.opensearch:common-utils:${common_utils_version}"
implementation "org.opensearch.client:opensearch-rest-client:${opensearch_version}"
implementation group: 'com.google.guava', name: 'guava', version:'31.0.1-jre'
implementation group: 'com.google.guava', name: 'failureaccess', version:'1.0.1'
compileOnly group: 'com.google.guava', name: 'guava', version:'31.0.1-jre'
compileOnly group: 'com.google.guava', name: 'failureaccess', version:'1.0.1'
implementation group: 'org.javassist', name: 'javassist', version:'3.28.0-GA'
implementation group: 'org.apache.commons', name: 'commons-math3', version: '3.6.1'
implementation group: 'com.google.code.gson', name: 'gson', version: '2.8.9'
@@ -121,10 +121,6 @@ dependencies {
implementation 'software.amazon.randomcutforest:randomcutforest-parkservices:3.0-rc3'
implementation 'software.amazon.randomcutforest:randomcutforest-core:3.0-rc3'

// we inherit jackson-core from opensearch core
implementation "com.fasterxml.jackson.core:jackson-databind:2.14.1"
implementation "com.fasterxml.jackson.core:jackson-annotations:2.14.1"

// used for serializing/deserializing rcf models.
implementation group: 'io.protostuff', name: 'protostuff-core', version: '1.8.0'
implementation group: 'io.protostuff', name: 'protostuff-runtime', version: '1.8.0'
2 changes: 0 additions & 2 deletions src/main/java/org/opensearch/ad/AnomalyDetectorJobRunner.java
@@ -560,14 +560,12 @@ private void stopAdJob(String detectorId, AnomalyDetectorFunction function) {
}, exception -> { log.error("JobRunner failed to update AD job as disabled for " + detectorId, exception); }));
} else {
log.info("AD Job was disabled for " + detectorId);
// function.execute();
}
} catch (IOException e) {
log.error("JobRunner failed to stop detector job " + detectorId, e);
}
} else {
log.info("AD Job was not found for " + detectorId);
// function.execute();
}
}, exception -> log.error("JobRunner failed to get detector job " + detectorId, exception));

5 changes: 4 additions & 1 deletion src/main/java/org/opensearch/ad/AnomalyDetectorPlugin.java
@@ -762,7 +762,10 @@ public PooledObject<LinkedBuffer> wrap(LinkedBuffer obj) {
adTaskManager,
nodeFilter,
threadPool,
client
client,
stateManager,
adTaskCacheManager,
AnomalyDetectorSettings.NUM_MIN_SAMPLES
);

// return objects used by Guice to inject dependencies for e.g.,
54 changes: 9 additions & 45 deletions src/main/java/org/opensearch/ad/AnomalyDetectorProfileRunner.java
@@ -40,7 +40,6 @@
import org.opensearch.ad.model.ADTaskType;
import org.opensearch.ad.model.AnomalyDetector;
import org.opensearch.ad.model.AnomalyDetectorJob;
import org.opensearch.ad.model.AnomalyResult;
import org.opensearch.ad.model.DetectorProfile;
import org.opensearch.ad.model.DetectorProfileName;
import org.opensearch.ad.model.DetectorState;
@@ -64,8 +63,6 @@
import org.opensearch.common.xcontent.NamedXContentRegistry;
import org.opensearch.common.xcontent.XContentParser;
import org.opensearch.common.xcontent.XContentType;
import org.opensearch.index.query.BoolQueryBuilder;
import org.opensearch.index.query.QueryBuilders;
import org.opensearch.search.SearchHits;
import org.opensearch.search.aggregations.Aggregation;
import org.opensearch.search.aggregations.AggregationBuilder;
@@ -451,14 +448,15 @@ private void profileMultiEntityDetectorStateRelated(
if (profileResponse.getTotalUpdates() < requiredSamples) {
// need to double check since what ProfileResponse returns is the highest priority entity currently in memory, but
// another entity might have already been initialized and sit somewhere else (in memory or on disk).
confirmMultiEntityDetectorInitStatus(
detector,
job.getEnabledTime().toEpochMilli(),
profileBuilder,
profilesToCollect,
profileResponse.getTotalUpdates(),
listener
);
long enabledTime = job.getEnabledTime().toEpochMilli();
long totalUpdates = profileResponse.getTotalUpdates();
ProfileUtil
.confirmDetectorRealtimeInitStatus(
detector,
enabledTime,
client,
onInittedEver(enabledTime, profileBuilder, profilesToCollect, detector, totalUpdates, listener)
);
} else {
createRunningStateAndInitProgress(profilesToCollect, profileBuilder);
listener.onResponse(profileBuilder.build());
@@ -471,18 +469,6 @@
}
}

private void confirmMultiEntityDetectorInitStatus(
AnomalyDetector detector,
long enabledTime,
DetectorProfile.Builder profile,
Set<DetectorProfileName> profilesToCollect,
long totalUpdates,
MultiResponsesDelegateActionListener<DetectorProfile> listener
) {
SearchRequest searchLatestResult = createInittedEverRequest(detector.getDetectorId(), enabledTime, detector.getResultIndex());
client.search(searchLatestResult, onInittedEver(enabledTime, profile, profilesToCollect, detector, totalUpdates, listener));
}

private ActionListener<SearchResponse> onInittedEver(
long lastUpdateTimeMs,
DetectorProfile.Builder profileBuilder,
@@ -602,26 +588,4 @@ private void processInitResponse(

listener.onResponse(builder.build());
}

/**
* Create search request to check if we have at least 1 anomaly score larger than 0 after AD job enabled time
* @param detectorId detector id
* @param enabledTime the time when AD job is enabled in milliseconds
* @return the search request
*/
private SearchRequest createInittedEverRequest(String detectorId, long enabledTime, String resultIndex) {
BoolQueryBuilder filterQuery = new BoolQueryBuilder();
filterQuery.filter(QueryBuilders.termQuery(AnomalyResult.DETECTOR_ID_FIELD, detectorId));
filterQuery.filter(QueryBuilders.rangeQuery(AnomalyResult.EXECUTION_END_TIME_FIELD).gte(enabledTime));
filterQuery.filter(QueryBuilders.rangeQuery(AnomalyResult.ANOMALY_SCORE_FIELD).gt(0));

SearchSourceBuilder source = new SearchSourceBuilder().query(filterQuery).size(1);

SearchRequest request = new SearchRequest(CommonName.ANOMALY_RESULT_INDEX_ALIAS);
request.source(source);
if (resultIndex != null) {
request.indices(resultIndex);
}
return request;
}
}
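
The ProfileUtil class that the deleted createInittedEverRequest logic moved into is among the changed files not shown in this excerpt. Reconstructed from the removed method above and the two call sites in this diff, a plausible sketch of the refactored helper looks like the following; the actual ProfileUtil in this commit may differ in details.

```java
import org.opensearch.action.ActionListener;
import org.opensearch.action.search.SearchRequest;
import org.opensearch.action.search.SearchResponse;
import org.opensearch.ad.constant.CommonName;
import org.opensearch.ad.model.AnomalyDetector;
import org.opensearch.ad.model.AnomalyResult;
import org.opensearch.client.Client;
import org.opensearch.index.query.BoolQueryBuilder;
import org.opensearch.index.query.QueryBuilders;
import org.opensearch.search.builder.SearchSourceBuilder;

// Sketch of the shared helper; mirrors the removed createInittedEverRequest logic.
public class ProfileUtil {

    /**
     * Check whether at least one anomaly result with a positive score exists after the
     * AD job's enabled time, querying the custom result index when one is configured.
     */
    public static void confirmDetectorRealtimeInitStatus(
        AnomalyDetector detector,
        long enabledTimeMs,
        Client client,
        ActionListener<SearchResponse> listener
    ) {
        BoolQueryBuilder filterQuery = new BoolQueryBuilder();
        filterQuery.filter(QueryBuilders.termQuery(AnomalyResult.DETECTOR_ID_FIELD, detector.getDetectorId()));
        filterQuery.filter(QueryBuilders.rangeQuery(AnomalyResult.EXECUTION_END_TIME_FIELD).gte(enabledTimeMs));
        filterQuery.filter(QueryBuilders.rangeQuery(AnomalyResult.ANOMALY_SCORE_FIELD).gt(0));

        // One matching hit is enough to prove the detector has produced results.
        SearchSourceBuilder source = new SearchSourceBuilder().query(filterQuery).size(1);
        SearchRequest request = new SearchRequest(CommonName.ANOMALY_RESULT_INDEX_ALIAS);
        request.source(source);
        if (detector.getResultIndex() != null) {
            // A custom result index, if configured, takes precedence over the default alias.
            request.indices(detector.getResultIndex());
        }
        client.search(request, listener);
    }
}
```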
134 changes: 118 additions & 16 deletions src/main/java/org/opensearch/ad/ExecuteADResultResponseRecorder.java
@@ -22,6 +22,8 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.action.ActionListener;
import org.opensearch.action.update.UpdateResponse;
import org.opensearch.ad.common.exception.AnomalyDetectionException;
import org.opensearch.ad.common.exception.EndRunException;
import org.opensearch.ad.common.exception.ResourceNotFoundException;
import org.opensearch.ad.constant.CommonErrorMessages;
@@ -32,6 +34,7 @@
import org.opensearch.ad.model.DetectorProfileName;
import org.opensearch.ad.model.FeatureData;
import org.opensearch.ad.model.IntervalTimeConfiguration;
import org.opensearch.ad.task.ADTaskCacheManager;
import org.opensearch.ad.task.ADTaskManager;
import org.opensearch.ad.transport.AnomalyResultResponse;
import org.opensearch.ad.transport.ProfileAction;
@@ -40,10 +43,12 @@
import org.opensearch.ad.transport.RCFPollingRequest;
import org.opensearch.ad.transport.handler.AnomalyIndexHandler;
import org.opensearch.ad.util.DiscoveryNodeFilterer;
import org.opensearch.ad.util.ExceptionUtil;
import org.opensearch.client.Client;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.commons.authuser.User;
import org.opensearch.search.SearchHits;
import org.opensearch.threadpool.ThreadPool;

public class ExecuteADResultResponseRecorder {
@@ -55,21 +60,30 @@ public class ExecuteADResultResponseRecorder {
private DiscoveryNodeFilterer nodeFilter;
private ThreadPool threadPool;
private Client client;
private NodeStateManager nodeStateManager;
private ADTaskCacheManager adTaskCacheManager;
private int rcfMinSamples;

public ExecuteADResultResponseRecorder(
AnomalyDetectionIndices anomalyDetectionIndices,
AnomalyIndexHandler<AnomalyResult> anomalyResultHandler,
ADTaskManager adTaskManager,
DiscoveryNodeFilterer nodeFilter,
ThreadPool threadPool,
Client client
Client client,
NodeStateManager nodeStateManager,
ADTaskCacheManager adTaskCacheManager,
int rcfMinSamples
) {
this.anomalyDetectionIndices = anomalyDetectionIndices;
this.anomalyResultHandler = anomalyResultHandler;
this.adTaskManager = adTaskManager;
this.nodeFilter = nodeFilter;
this.threadPool = threadPool;
this.client = client;
this.nodeStateManager = nodeStateManager;
this.adTaskCacheManager = adTaskCacheManager;
this.rcfMinSamples = rcfMinSamples;
}

public void indexAnomalyResult(
@@ -185,27 +199,66 @@ private void updateLatestRealtimeTask(
String error
) {
// Don't need info as this will be printed repeatedly in each interval
adTaskManager
.updateLatestRealtimeTaskOnCoordinatingNode(
ActionListener<UpdateResponse> listener = ActionListener.wrap(r -> {
if (r != null) {
log.debug("Updated latest realtime task successfully for detector {}, taskState: {}", detectorId, taskState);
}
}, e -> {
if ((e instanceof ResourceNotFoundException) && e.getMessage().contains(CAN_NOT_FIND_LATEST_TASK)) {
// Clear realtime task cache, will recreate AD task in next run, check AnomalyResultTransportAction.
log.error("Can't find latest realtime task of detector " + detectorId);
adTaskManager.removeRealtimeTaskCache(detectorId);
} else {
log.error("Failed to update latest realtime task for detector " + detectorId, e);
}
});

// rcfTotalUpdates is null when we save exception messages
if (!adTaskCacheManager.hasQueriedResultIndex(detectorId) && rcfTotalUpdates != null && rcfTotalUpdates < rcfMinSamples) {
// confirm the total updates number since it is possible that we have already had results after job enabling time
// If yes, total updates should be at least rcfMinSamples so that the init progress reaches 100%.
confirmTotalRCFUpdatesFound(
detectorId,
taskState,
rcfTotalUpdates,
detectorIntervalInMinutes,
error,
ActionListener.wrap(r -> {
if (r != null) {
log.debug("Updated latest realtime task successfully for detector {}, taskState: {}", detectorId, taskState);
}
}, e -> {
if ((e instanceof ResourceNotFoundException) && e.getMessage().contains(CAN_NOT_FIND_LATEST_TASK)) {
// Clear realtime task cache, will recreate AD task in next run, check AnomalyResultTransportAction.
log.error("Can't find latest realtime task of detector " + detectorId);
adTaskManager.removeRealtimeTaskCache(detectorId);
} else {
log.error("Failed to update latest realtime task for detector " + detectorId, e);
}
})
ActionListener
.wrap(
r -> adTaskManager
.updateLatestRealtimeTaskOnCoordinatingNode(
detectorId,
taskState,
r,
detectorIntervalInMinutes,
error,
listener
),
e -> {
log.error("Fail to confirm rcf update", e);
adTaskManager
.updateLatestRealtimeTaskOnCoordinatingNode(
detectorId,
taskState,
rcfTotalUpdates,
detectorIntervalInMinutes,
error,
listener
);
}
)
);
} else {
adTaskManager
.updateLatestRealtimeTaskOnCoordinatingNode(
detectorId,
taskState,
rcfTotalUpdates,
detectorIntervalInMinutes,
error,
listener
);
}
}

/**
@@ -285,4 +338,53 @@ public void indexAnomalyResultException(
}
}

private void confirmTotalRCFUpdatesFound(
String detectorId,
String taskState,
Long rcfTotalUpdates,
Long detectorIntervalInMinutes,
String error,
ActionListener<Long> listener
) {
nodeStateManager.getAnomalyDetector(detectorId, ActionListener.wrap(detectorOptional -> {
if (!detectorOptional.isPresent()) {
listener.onFailure(new AnomalyDetectionException(detectorId, "fail to get detector"));
return;
}
nodeStateManager.getAnomalyDetectorJob(detectorId, ActionListener.wrap(jobOptional -> {
if (!jobOptional.isPresent()) {
listener.onFailure(new AnomalyDetectionException(detectorId, "fail to get job"));
return;
}

ProfileUtil
.confirmDetectorRealtimeInitStatus(
detectorOptional.get(),
jobOptional.get().getEnabledTime().toEpochMilli(),
client,
ActionListener.wrap(searchResponse -> {
ActionListener.completeWith(listener, () -> {
SearchHits hits = searchResponse.getHits();
Long correctedTotalUpdates = rcfTotalUpdates;
if (hits.getTotalHits().value > 0L) {
// correct the number if we have already had results after job enabling time
// so that the detector won't stay initialized
correctedTotalUpdates = Long.valueOf(rcfMinSamples);
}
adTaskCacheManager.markResultIndexQueried(detectorId);
return correctedTotalUpdates;
});
}, exception -> {
if (ExceptionUtil.isIndexNotAvailable(exception)) {
// anomaly result index is not created yet
adTaskCacheManager.markResultIndexQueried(detectorId);
listener.onResponse(0L);
} else {
listener.onFailure(exception);
}
})
);
}, e -> listener.onFailure(new AnomalyDetectionException(detectorId, "fail to get job"))));
}, e -> listener.onFailure(new AnomalyDetectionException(detectorId, "fail to get detector"))));
}
}
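
The ADTaskCacheManager changes that back hasQueriedResultIndex and markResultIndexQueried are also among the files not shown above. Conceptually they amount to a per-detector boolean flag kept alongside the realtime task cache; the stand-alone class below is only a minimal sketch of that idea, not the plugin's actual cache code.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Minimal sketch of the "have we already queried the result index?" flag used by
// ExecuteADResultResponseRecorder; the real flag lives inside ADTaskCacheManager.
class ResultIndexQueryFlagCache {
    private final Map<String, Boolean> queriedResultIndex = new ConcurrentHashMap<>();

    /** True if the extra result-index query has already been issued for this detector. */
    boolean hasQueriedResultIndex(String detectorId) {
        return queriedResultIndex.getOrDefault(detectorId, false);
    }

    /** Record that the extra query has been issued so it is not repeated every interval. */
    void markResultIndexQueried(String detectorId) {
        queriedResultIndex.put(detectorId, true);
    }

    /** Clear the flag when the detector's realtime task cache is removed. */
    void remove(String detectorId) {
        queriedResultIndex.remove(detectorId);
    }
}
```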