Skip to content

Commit

Permalink
Fix the discrepancy between Profile API and real time tasks API
Browse files Browse the repository at this point in the history
This PR fixes the discrepancy by querying the result index when the total updates is less than 32. We have done similar things in profile API so I refactored reusable code to ProfileUtil. We also cached whether we have queried the result index and won't repeatedly issue the extra query.

Testing done:
1. repeated repro steps in opensearch-project#502 and verified the issue has been resolved.

Signed-off-by: Kaituo Li <[email protected]>
  • Loading branch information
kaituo committed Jan 5, 2023
1 parent 20d2c5f commit 5dbc2f2
Show file tree
Hide file tree
Showing 13 changed files with 564 additions and 96 deletions.
2 changes: 0 additions & 2 deletions src/main/java/org/opensearch/ad/AnomalyDetectorJobRunner.java
Original file line number Diff line number Diff line change
Expand Up @@ -560,14 +560,12 @@ private void stopAdJob(String detectorId, AnomalyDetectorFunction function) {
}, exception -> { log.error("JobRunner failed to update AD job as disabled for " + detectorId, exception); }));
} else {
log.info("AD Job was disabled for " + detectorId);
// function.execute();
}
} catch (IOException e) {
log.error("JobRunner failed to stop detector job " + detectorId, e);
}
} else {
log.info("AD Job was not found for " + detectorId);
// function.execute();
}
}, exception -> log.error("JobRunner failed to get detector job " + detectorId, exception));

Expand Down
5 changes: 4 additions & 1 deletion src/main/java/org/opensearch/ad/AnomalyDetectorPlugin.java
Original file line number Diff line number Diff line change
Expand Up @@ -762,7 +762,10 @@ public PooledObject<LinkedBuffer> wrap(LinkedBuffer obj) {
adTaskManager,
nodeFilter,
threadPool,
client
client,
stateManager,
adTaskCacheManager,
AnomalyDetectorSettings.NUM_MIN_SAMPLES
);

// return objects used by Guice to inject dependencies for e.g.,
Expand Down
54 changes: 9 additions & 45 deletions src/main/java/org/opensearch/ad/AnomalyDetectorProfileRunner.java
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@
import org.opensearch.ad.model.ADTaskType;
import org.opensearch.ad.model.AnomalyDetector;
import org.opensearch.ad.model.AnomalyDetectorJob;
import org.opensearch.ad.model.AnomalyResult;
import org.opensearch.ad.model.DetectorProfile;
import org.opensearch.ad.model.DetectorProfileName;
import org.opensearch.ad.model.DetectorState;
Expand All @@ -64,8 +63,6 @@
import org.opensearch.common.xcontent.NamedXContentRegistry;
import org.opensearch.common.xcontent.XContentParser;
import org.opensearch.common.xcontent.XContentType;
import org.opensearch.index.query.BoolQueryBuilder;
import org.opensearch.index.query.QueryBuilders;
import org.opensearch.search.SearchHits;
import org.opensearch.search.aggregations.Aggregation;
import org.opensearch.search.aggregations.AggregationBuilder;
Expand Down Expand Up @@ -451,14 +448,15 @@ private void profileMultiEntityDetectorStateRelated(
if (profileResponse.getTotalUpdates() < requiredSamples) {
// need to double check since what ProfileResponse returns is the highest priority entity currently in memory, but
// another entity might have already been initialized and sit somewhere else (in memory or on disk).
confirmMultiEntityDetectorInitStatus(
detector,
job.getEnabledTime().toEpochMilli(),
profileBuilder,
profilesToCollect,
profileResponse.getTotalUpdates(),
listener
);
long enabledTime = job.getEnabledTime().toEpochMilli();
long totalUpdates = profileResponse.getTotalUpdates();
ProfileUtil
.confirmDetectorInitStatus(
detector,
enabledTime,
client,
onInittedEver(enabledTime, profileBuilder, profilesToCollect, detector, totalUpdates, listener)
);
} else {
createRunningStateAndInitProgress(profilesToCollect, profileBuilder);
listener.onResponse(profileBuilder.build());
Expand All @@ -471,18 +469,6 @@ private void profileMultiEntityDetectorStateRelated(
}
}

private void confirmMultiEntityDetectorInitStatus(
AnomalyDetector detector,
long enabledTime,
DetectorProfile.Builder profile,
Set<DetectorProfileName> profilesToCollect,
long totalUpdates,
MultiResponsesDelegateActionListener<DetectorProfile> listener
) {
SearchRequest searchLatestResult = createInittedEverRequest(detector.getDetectorId(), enabledTime, detector.getResultIndex());
client.search(searchLatestResult, onInittedEver(enabledTime, profile, profilesToCollect, detector, totalUpdates, listener));
}

private ActionListener<SearchResponse> onInittedEver(
long lastUpdateTimeMs,
DetectorProfile.Builder profileBuilder,
Expand Down Expand Up @@ -602,26 +588,4 @@ private void processInitResponse(

listener.onResponse(builder.build());
}

/**
* Create search request to check if we have at least 1 anomaly score larger than 0 after AD job enabled time
* @param detectorId detector id
* @param enabledTime the time when AD job is enabled in milliseconds
* @return the search request
*/
private SearchRequest createInittedEverRequest(String detectorId, long enabledTime, String resultIndex) {
BoolQueryBuilder filterQuery = new BoolQueryBuilder();
filterQuery.filter(QueryBuilders.termQuery(AnomalyResult.DETECTOR_ID_FIELD, detectorId));
filterQuery.filter(QueryBuilders.rangeQuery(AnomalyResult.EXECUTION_END_TIME_FIELD).gte(enabledTime));
filterQuery.filter(QueryBuilders.rangeQuery(AnomalyResult.ANOMALY_SCORE_FIELD).gt(0));

SearchSourceBuilder source = new SearchSourceBuilder().query(filterQuery).size(1);

SearchRequest request = new SearchRequest(CommonName.ANOMALY_RESULT_INDEX_ALIAS);
request.source(source);
if (resultIndex != null) {
request.indices(resultIndex);
}
return request;
}
}
132 changes: 116 additions & 16 deletions src/main/java/org/opensearch/ad/ExecuteADResultResponseRecorder.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.action.ActionListener;
import org.opensearch.action.update.UpdateResponse;
import org.opensearch.ad.common.exception.AnomalyDetectionException;
import org.opensearch.ad.common.exception.EndRunException;
import org.opensearch.ad.common.exception.ResourceNotFoundException;
import org.opensearch.ad.constant.CommonErrorMessages;
Expand All @@ -32,6 +34,7 @@
import org.opensearch.ad.model.DetectorProfileName;
import org.opensearch.ad.model.FeatureData;
import org.opensearch.ad.model.IntervalTimeConfiguration;
import org.opensearch.ad.task.ADTaskCacheManager;
import org.opensearch.ad.task.ADTaskManager;
import org.opensearch.ad.transport.AnomalyResultResponse;
import org.opensearch.ad.transport.ProfileAction;
Expand All @@ -40,10 +43,12 @@
import org.opensearch.ad.transport.RCFPollingRequest;
import org.opensearch.ad.transport.handler.AnomalyIndexHandler;
import org.opensearch.ad.util.DiscoveryNodeFilterer;
import org.opensearch.ad.util.ExceptionUtil;
import org.opensearch.client.Client;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.commons.authuser.User;
import org.opensearch.search.SearchHits;
import org.opensearch.threadpool.ThreadPool;

public class ExecuteADResultResponseRecorder {
Expand All @@ -55,21 +60,30 @@ public class ExecuteADResultResponseRecorder {
private DiscoveryNodeFilterer nodeFilter;
private ThreadPool threadPool;
private Client client;
private NodeStateManager nodeStateManager;
private ADTaskCacheManager adTaskCacheManager;
private int rcfMinSamples;

public ExecuteADResultResponseRecorder(
AnomalyDetectionIndices anomalyDetectionIndices,
AnomalyIndexHandler<AnomalyResult> anomalyResultHandler,
ADTaskManager adTaskManager,
DiscoveryNodeFilterer nodeFilter,
ThreadPool threadPool,
Client client
Client client,
NodeStateManager nodeStateManager,
ADTaskCacheManager adTaskCacheManager,
int rcfMinSamples
) {
this.anomalyDetectionIndices = anomalyDetectionIndices;
this.anomalyResultHandler = anomalyResultHandler;
this.adTaskManager = adTaskManager;
this.nodeFilter = nodeFilter;
this.threadPool = threadPool;
this.client = client;
this.nodeStateManager = nodeStateManager;
this.adTaskCacheManager = adTaskCacheManager;
this.rcfMinSamples = rcfMinSamples;
}

public void indexAnomalyResult(
Expand Down Expand Up @@ -185,27 +199,66 @@ private void updateLatestRealtimeTask(
String error
) {
// Don't need info as this will be printed repeatedly in each interval
adTaskManager
.updateLatestRealtimeTaskOnCoordinatingNode(
ActionListener<UpdateResponse> listener = ActionListener.wrap(r -> {
if (r != null) {
log.debug("Updated latest realtime task successfully for detector {}, taskState: {}", detectorId, taskState);
}
}, e -> {
if ((e instanceof ResourceNotFoundException) && e.getMessage().contains(CAN_NOT_FIND_LATEST_TASK)) {
// Clear realtime task cache, will recreate AD task in next run, check AnomalyResultTransportAction.
log.error("Can't find latest realtime task of detector " + detectorId);
adTaskManager.removeRealtimeTaskCache(detectorId);
} else {
log.error("Failed to update latest realtime task for detector " + detectorId, e);
}
});

// rcfTotalUpdates is null when we save exception messages
if (!adTaskCacheManager.hasQueriedResultIndex(detectorId) && rcfTotalUpdates != null && rcfTotalUpdates < rcfMinSamples) {
// confirm the total updates number since it is possible that we have already had results after job enabling time
// If yes, total updates should be at least rcfMinSamples so that the init progress reaches 100%.
confirmTotalRCFUpdatesFound(
detectorId,
taskState,
rcfTotalUpdates,
detectorIntervalInMinutes,
error,
ActionListener.wrap(r -> {
if (r != null) {
log.debug("Updated latest realtime task successfully for detector {}, taskState: {}", detectorId, taskState);
}
}, e -> {
if ((e instanceof ResourceNotFoundException) && e.getMessage().contains(CAN_NOT_FIND_LATEST_TASK)) {
// Clear realtime task cache, will recreate AD task in next run, check AnomalyResultTransportAction.
log.error("Can't find latest realtime task of detector " + detectorId);
adTaskManager.removeRealtimeTaskCache(detectorId);
} else {
log.error("Failed to update latest realtime task for detector " + detectorId, e);
}
})
ActionListener
.wrap(
r -> adTaskManager
.updateLatestRealtimeTaskOnCoordinatingNode(
detectorId,
taskState,
r,
detectorIntervalInMinutes,
error,
listener
),
e -> {
log.error("Fail to confirm rcf update", e);
adTaskManager
.updateLatestRealtimeTaskOnCoordinatingNode(
detectorId,
taskState,
rcfTotalUpdates,
detectorIntervalInMinutes,
error,
listener
);
}
)
);
} else {
adTaskManager
.updateLatestRealtimeTaskOnCoordinatingNode(
detectorId,
taskState,
rcfTotalUpdates,
detectorIntervalInMinutes,
error,
listener
);
}
}

/**
Expand Down Expand Up @@ -285,4 +338,51 @@ public void indexAnomalyResultException(
}
}

private void confirmTotalRCFUpdatesFound(
String detectorId,
String taskState,
Long rcfTotalUpdates,
Long detectorIntervalInMinutes,
String error,
ActionListener<Long> listener
) {
nodeStateManager.getAnomalyDetector(detectorId, ActionListener.wrap(detectorOptional -> {
if (!detectorOptional.isPresent()) {
listener.onFailure(new AnomalyDetectionException(detectorId, "fail to get detector"));
return;
}
nodeStateManager.getAnomalyDetectorJob(detectorId, ActionListener.wrap(jobOptional -> {
if (!jobOptional.isPresent()) {
listener.onFailure(new AnomalyDetectionException(detectorId, "fail to get job"));
return;
}
adTaskCacheManager.markResultIndexQueried(detectorId);
ProfileUtil
.confirmDetectorInitStatus(
detectorOptional.get(),
jobOptional.get().getEnabledTime().toEpochMilli(),
client,
ActionListener.wrap(searchResponse -> {
ActionListener.completeWith(listener, () -> {
SearchHits hits = searchResponse.getHits();
Long correctedTotalUpdates = rcfTotalUpdates;
if (hits.getTotalHits().value > 0L) {
// correct the number if we have already had results after job enabling time
// so that the detector won't stay initialized
correctedTotalUpdates = Long.valueOf(rcfMinSamples);
}
return correctedTotalUpdates;
});
}, exception -> {
if (ExceptionUtil.isIndexNotAvailable(exception)) {
// anomaly result index is not created yet
listener.onResponse(0L);
} else {
listener.onFailure(exception);
}
})
);
}, e -> listener.onFailure(new AnomalyDetectionException(detectorId, "fail to get job"))));
}, e -> listener.onFailure(new AnomalyDetectionException(detectorId, "fail to get detector"))));
}
}
57 changes: 57 additions & 0 deletions src/main/java/org/opensearch/ad/ProfileUtil.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*
* Modifications Copyright OpenSearch Contributors. See
* GitHub history for details.
*/

package org.opensearch.ad;

import org.opensearch.action.ActionListener;
import org.opensearch.action.search.SearchRequest;
import org.opensearch.action.search.SearchResponse;
import org.opensearch.ad.constant.CommonName;
import org.opensearch.ad.model.AnomalyDetector;
import org.opensearch.ad.model.AnomalyResult;
import org.opensearch.client.Client;
import org.opensearch.index.query.BoolQueryBuilder;
import org.opensearch.index.query.QueryBuilders;
import org.opensearch.search.builder.SearchSourceBuilder;

public class ProfileUtil {
/**
* Create search request to check if we have at least 1 anomaly score larger than 0 after AD job enabled time
* @param detectorId detector id
* @param enabledTime the time when AD job is enabled in milliseconds
* @return the search request
*/
private static SearchRequest createInittedEverRequest(String detectorId, long enabledTime, String resultIndex) {
BoolQueryBuilder filterQuery = new BoolQueryBuilder();
filterQuery.filter(QueryBuilders.termQuery(AnomalyResult.DETECTOR_ID_FIELD, detectorId));
filterQuery.filter(QueryBuilders.rangeQuery(AnomalyResult.EXECUTION_END_TIME_FIELD).gte(enabledTime));
filterQuery.filter(QueryBuilders.rangeQuery(AnomalyResult.ANOMALY_SCORE_FIELD).gt(0));

SearchSourceBuilder source = new SearchSourceBuilder().query(filterQuery).size(1);

SearchRequest request = new SearchRequest(CommonName.ANOMALY_RESULT_INDEX_ALIAS);
request.source(source);
if (resultIndex != null) {
request.indices(resultIndex);
}
return request;
}

public static void confirmDetectorInitStatus(
AnomalyDetector detector,
long enabledTime,
Client client,
ActionListener<SearchResponse> listener
) {
SearchRequest searchLatestResult = createInittedEverRequest(detector.getDetectorId(), enabledTime, detector.getResultIndex());
client.search(searchLatestResult, listener);
}
}
Loading

0 comments on commit 5dbc2f2

Please sign in to comment.