Use callbacks and bug fix (opendistro-for-elasticsearch#83)
* Use callbacks and bug fix

This PR includes the following changes:

1. Remove classes from jacocoExclusions that no longer need to be excluded, since we now have enough test coverage for them.
2. Use ClientUtil instead of Elasticsearch's client in the AD job runner.
3. Use one function to get the number of partitioned forests. Previously, ModelManager and ADStateManager had redundant copies of this code.
4. Change ADStateManager.getAnomalyDetector to use a callback.
5. Change AnomalyResultTransportAction to use a callback to get features.
6. Add handling in AnomalyResultTransportAction for the cases where all features have been disabled or the user's index does not exist.
7. Change the methods that get RCF and threshold results to use callbacks, and add handling for the IndexNotFoundException they can now throw; previously these methods could not throw IndexNotFoundException.
8. Remove unused fields in StopDetectorTransportAction and AnomalyResultTransportAction.
9. Unwrap EsRejectedExecutionException, as it can be nested inside RemoteTransportException. Previously we did not recognize the nested exception and thus skipped retrying anomaly result writes (see the sketch after this list).
10. Add an error field to the anomaly result schema.
11. Fix tests broken by these changes.
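
A minimal sketch of the unwrapping in change 9. The helper name and placement are hypothetical; only the nesting behavior (an EsRejectedExecutionException arriving wrapped in a RemoteTransportException) comes from this commit message:

    import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;

    final class ExceptionUtil {
        // Walk the cause chain so a write rejection nested inside a
        // RemoteTransportException is still recognized and can trigger a
        // retry of the anomaly result write. Hypothetical helper, not the
        // commit's actual implementation.
        static EsRejectedExecutionException findRejectedExecutionCause(Throwable t) {
            for (Throwable cur = t; cur != null; cur = cur.getCause()) {
                if (cur instanceof EsRejectedExecutionException) {
                    return (EsRejectedExecutionException) cur;
                }
            }
            return null;
        }
    }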

Testing done:
1. Unit and integration tests pass.
2. Ran end-to-end testing and verified the fix achieves its purpose (see the sketch after this list):
   * the timeout issue is gone
   * when all features have been disabled or the index does not exist, we retry a few more times and then disable the AD job
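
A hypothetical sketch of the retry-then-disable flow exercised by the second test. The fields maxRetryForEndRunException and detectorEndRunExceptionCount and the isEndNow() check all appear in the AnomalyDetectorJobRunner diff below; the control flow itself is an assumption, not copied from the commit:

    // Hypothetical control flow; only the fields and isEndNow() are from the diff.
    private void handleEndRunException(String detectorId, EndRunException exception) {
        if (exception.isEndNow()) {
            stopAdJob(detectorId); // the exception says to end the job immediately
            return;
        }
        int count = detectorEndRunExceptionCount.merge(detectorId, 1, Integer::sum);
        if (count > maxRetryForEndRunException) {
            stopAdJob(detectorId); // give up after a few failed runs and disable the AD job
        }
        // otherwise do nothing here: the scheduler's next run serves as the retry
    }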
kaituo authored Apr 14, 2020
1 parent 4e4b343 commit 0c33050
Showing 21 changed files with 429 additions and 342 deletions.
18 changes: 6 additions & 12 deletions build.gradle
@@ -255,23 +255,17 @@ List<String> jacocoExclusions = [
'com.amazon.opendistroforelasticsearch.ad.common.exception.AnomalyDetectionException',
'com.amazon.opendistroforelasticsearch.ad.util.ClientUtil',

'com.amazon.opendistroforelasticsearch.ad.ml.*',
'com.amazon.opendistroforelasticsearch.ad.feature.*',
'com.amazon.opendistroforelasticsearch.ad.dataprocessor.*',
'com.amazon.opendistroforelasticsearch.ad.AnomalyDetectorRunner',
'com.amazon.opendistroforelasticsearch.ad.resthandler.RestGetAnomalyResultAction',
'com.amazon.opendistroforelasticsearch.ad.metrics.MetricFactory',
'com.amazon.opendistroforelasticsearch.ad.indices.AnomalyDetectionIndices',
'com.amazon.opendistroforelasticsearch.ad.transport.ForwardAction',
'com.amazon.opendistroforelasticsearch.ad.transport.ForwardTransportAction',
'com.amazon.opendistroforelasticsearch.ad.transport.StopDetectorAction',
'com.amazon.opendistroforelasticsearch.ad.transport.StopDetectorRequest',
'com.amazon.opendistroforelasticsearch.ad.transport.StopDetectorResponse',
'com.amazon.opendistroforelasticsearch.ad.transport.StopDetectorTransportAction',
'com.amazon.opendistroforelasticsearch.ad.transport.ADStatsAction',
'com.amazon.opendistroforelasticsearch.ad.transport.CronRequest',
'com.amazon.opendistroforelasticsearch.ad.transport.DeleteDetectorAction',
'com.amazon.opendistroforelasticsearch.ad.util.ParseUtils'
'com.amazon.opendistroforelasticsearch.ad.transport.CronTransportAction',
'com.amazon.opendistroforelasticsearch.ad.transport.CronRequest',
'com.amazon.opendistroforelasticsearch.ad.transport.ADStatsAction',
'com.amazon.opendistroforelasticsearch.ad.AnomalyDetectorRunner',
'com.amazon.opendistroforelasticsearch.ad.indices.AnomalyDetectionIndices',
'com.amazon.opendistroforelasticsearch.ad.util.ParseUtils',
]

jacocoTestCoverageVerification {

@@ -28,6 +28,7 @@
import com.amazon.opendistroforelasticsearch.ad.transport.AnomalyResultResponse;
import com.amazon.opendistroforelasticsearch.ad.transport.AnomalyResultTransportAction;
import com.amazon.opendistroforelasticsearch.ad.transport.handler.AnomalyResultHandler;
import com.amazon.opendistroforelasticsearch.ad.util.ClientUtil;
import com.amazon.opendistroforelasticsearch.jobscheduler.spi.JobExecutionContext;
import com.amazon.opendistroforelasticsearch.jobscheduler.spi.LockModel;
import com.amazon.opendistroforelasticsearch.jobscheduler.spi.ScheduledJobParameter;
@@ -39,7 +40,9 @@
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.settings.Settings;
@@ -71,6 +74,7 @@ public class AnomalyDetectorJobRunner implements ScheduledJobRunner {
private Settings settings;
private int maxRetryForEndRunException;
private Client client;
private ClientUtil clientUtil;
private ThreadPool threadPool;
private AnomalyResultHandler anomalyResultHandler;
private ConcurrentHashMap<String, Integer> detectorEndRunExceptionCount;
@@ -97,6 +101,10 @@ public void setClient(Client client) {
this.client = client;
}

public void setClientUtil(ClientUtil clientUtil) {
this.clientUtil = clientUtil;
}

public void setThreadPool(ThreadPool threadPool) {
this.threadPool = threadPool;
}
@@ -258,7 +266,7 @@ protected void handleAdException(
) {
String detectorId = jobParameter.getName();
if (exception instanceof EndRunException) {
log.error("EndRunException happened when executed anomaly result action for " + detectorId, exception);
log.error("EndRunException happened when executing anomaly result action for " + detectorId, exception);

if (((EndRunException) exception).isEndNow()) {
// Stop AD job if EndRunException shows we should end job now.
@@ -349,9 +357,8 @@ private void stopAdJob(String detectorId) {
try {
GetRequest getRequest = new GetRequest(AnomalyDetectorJob.ANOMALY_DETECTOR_JOB_INDEX).id(detectorId);

client.get(getRequest, ActionListener.wrap(response -> {
clientUtil.<GetRequest, GetResponse>asyncRequest(getRequest, client::get, ActionListener.wrap(response -> {
if (response.isExists()) {
String s = response.getSourceAsString();
try (
XContentParser parser = XContentType.JSON
.xContent()
@@ -374,14 +381,19 @@ private void stopAdJob(String detectorId) {
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
.source(newJob.toXContent(XContentBuilder.builder(XContentType.JSON.xContent()), XCONTENT_WITH_TYPE))
.id(detectorId);
client.index(indexRequest, ActionListener.wrap(indexResponse -> {
if (indexResponse != null
&& (indexResponse.getResult() == CREATED || indexResponse.getResult() == UPDATED)) {
log.info("AD Job was disabled by JobRunner for " + detectorId);
} else {
log.warn("Failed to disable AD job for " + detectorId);
}
}, exception -> log.error("JobRunner failed to update AD job as disabled for " + detectorId, exception)));
clientUtil
.<IndexRequest, IndexResponse>asyncRequest(
indexRequest,
client::index,
ActionListener.wrap(indexResponse -> {
if (indexResponse != null
&& (indexResponse.getResult() == CREATED || indexResponse.getResult() == UPDATED)) {
log.info("AD Job was disabled by JobRunner for " + detectorId);
} else {
log.warn("Failed to disable AD job for " + detectorId);
}
}, exception -> log.error("JobRunner failed to update AD job as disabled for " + detectorId, exception))
);
}
} catch (IOException e) {
log.error("JobRunner failed to stop detector job " + detectorId, e);
@@ -143,6 +143,7 @@ public class AnomalyDetectorPlugin extends Plugin implements ActionPlugin, Scrip
private ThreadPool threadPool;
private IndexNameExpressionResolver indexNameExpressionResolver;
private ADStats adStats;
private ClientUtil clientUtil;

static {
SpecialPermission.check();
@@ -174,6 +175,7 @@ public List<RestHandler> getRestHandlers(
);
AnomalyDetectorJobRunner jobRunner = AnomalyDetectorJobRunner.getJobRunnerInstance();
jobRunner.setClient(client);
jobRunner.setClientUtil(clientUtil);
jobRunner.setThreadPool(threadPool);
jobRunner.setAnomalyResultHandler(anomalyResultHandler);
jobRunner.setSettings(settings);
@@ -237,7 +239,7 @@ public Collection<Object> createComponents(
Settings settings = environment.settings();
Clock clock = Clock.systemUTC();
Throttler throttler = new Throttler(clock);
ClientUtil clientUtil = new ClientUtil(settings, client, throttler, threadPool);
this.clientUtil = new ClientUtil(settings, client, throttler, threadPool);
IndexUtils indexUtils = new IndexUtils(client, clientUtil, clusterService);
anomalyDetectionIndices = new AnomalyDetectionIndices(client, clusterService, threadPool, settings, clientUtil);
this.clusterService = clusterService;
@@ -272,7 +274,8 @@ public Collection<Object> createComponents(
HybridThresholdingModel.class,
AnomalyDetectorSettings.MIN_PREVIEW_SIZE,
AnomalyDetectorSettings.HOURLY_MAINTENANCE,
AnomalyDetectorSettings.HOURLY_MAINTENANCE
AnomalyDetectorSettings.HOURLY_MAINTENANCE,
AnomalyDetectorSettings.SHINGLE_SIZE
);

HashRing hashRing = new HashRing(clusterService, clock, settings);
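
One subtlety in this file: clientUtil is promoted from a local variable in createComponents to a field, because getRestHandlers (called separately by Elasticsearch) must hand it to the JobRunner singleton. A condensed sketch of that wiring; the parameter lists are simplified from the real plugin API, and only the two clientUtil lines are taken from the hunks above:

    import java.util.Collection;
    import java.util.Collections;
    import java.util.List;
    // remaining imports as in the diff above

    public class PluginWiringSketch {
        private ClientUtil clientUtil;

        // createComponents builds the shared ClientUtil once at node startup
        public Collection<Object> createComponents(Settings settings, Client client,
                                                   Throttler throttler, ThreadPool threadPool) {
            this.clientUtil = new ClientUtil(settings, client, throttler, threadPool);
            return Collections.singletonList(clientUtil);
        }

        // getRestHandlers reads the field and wires the JobRunner singleton
        public List<RestHandler> getRestHandlers(Client client) {
            AnomalyDetectorJobRunner jobRunner = AnomalyDetectorJobRunner.getJobRunnerInstance();
            jobRunner.setClient(client);
            jobRunner.setClientUtil(clientUtil); // new in this commit
            return Collections.emptyList();
        }
    }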
@@ -58,6 +58,7 @@ public AnomalyDetectorRunner(ModelManager modelManager, FeatureManager featureMa
* @param startTime detection period start time
* @param endTime detection period end time
* @param listener handle anomaly result
* @throws IOException - if a user gives wrong query input when defining a detector
*/
public void executeDetector(AnomalyDetector detector, Instant startTime, Instant endTime, ActionListener<List<AnomalyResult>> listener)
throws IOException {
@@ -1,5 +1,5 @@
/*
* Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
* Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
@@ -23,6 +23,7 @@
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.Arrays;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Optional;
@@ -108,6 +109,7 @@ public String getName() {
private final CheckpointDao checkpointDao;
private final Gson gson;
private final Clock clock;
private final int shingleSize;

// A tree of N samples has 2N nodes, with one bounding box for each node.
private static final long BOUNDING_BOXES = 2L;
@@ -160,7 +162,8 @@ public ModelManager(
Class<? extends ThresholdingModel> thresholdingModelClass,
int minPreviewSize,
Duration modelTtl,
Duration checkpointInterval
Duration checkpointInterval,
int shingleSize
) {

this.clusterService = clusterService;
@@ -188,6 +191,7 @@ public ModelManager(

this.forests = new ConcurrentHashMap<>();
this.thresholds = new ConcurrentHashMap<>();
this.shingleSize = shingleSize;
}

/**
@@ -272,6 +276,36 @@ public Entry<Integer, Integer> getPartitionedForestSizes(RandomCutForest forest,
return new SimpleImmutableEntry<>(numPartitions, forestSize);
}

/**
* Construct an RCF model and then partition it by forest size.
*
* An RCF model is constructed based on the number of input features.
*
* The model is first partitioned into the desired size based on heap.
* If there are more partitions than the number of nodes in the cluster,
* the model is partitioned by the number of nodes and verified to
* ensure the size of a partition does not exceed the max size limit based on heap.
*
* @param detector detector object
* @return a pair of the number of partitions and the size of a partition (number of trees)
* @throws LimitExceededException when there is insufficient resource available
*/
public Entry<Integer, Integer> getPartitionedForestSizes(AnomalyDetector detector) {
String detectorId = detector.getDetectorId();
int rcfNumFeatures = detector.getEnabledFeatureIds().size() * shingleSize;
return getPartitionedForestSizes(
RandomCutForest
.builder()
.dimensions(rcfNumFeatures)
.sampleSize(rcfNumSamplesInTree)
.numberOfTrees(rcfNumTrees)
.outputAfter(rcfNumSamplesInTree)
.parallelExecutionEnabled(false)
.build(),
detectorId
);
}

/**
* Gets the estimated size of a RCF model.
*
@@ -542,20 +576,22 @@ public void trainModel(AnomalyDetector anomalyDetector, double[][] dataPoints) {
if (dataPoints.length == 0 || dataPoints[0].length == 0) {
throw new IllegalArgumentException("Data points must not be empty.");
}
if (dataPoints[0].length != anomalyDetector.getEnabledFeatureIds().size() * shingleSize) {
throw new IllegalArgumentException(
String
.format(
Locale.ROOT,
"Feature dimension is not correct, we expect %s but get %d",
anomalyDetector.getEnabledFeatureIds().size() * shingleSize,
dataPoints[0].length
)
);
}
int rcfNumFeatures = dataPoints[0].length;

// Create partitioned RCF models
Entry<Integer, Integer> partitionResults = getPartitionedForestSizes(
RandomCutForest
.builder()
.dimensions(rcfNumFeatures)
.sampleSize(rcfNumSamplesInTree)
.numberOfTrees(rcfNumTrees)
.outputAfter(rcfNumSamplesInTree)
.parallelExecutionEnabled(false)
.build(),
anomalyDetector.getDetectorId()
);
Entry<Integer, Integer> partitionResults = getPartitionedForestSizes(anomalyDetector);

int numForests = partitionResults.getKey();
int forestSize = partitionResults.getValue();
double[] scores = new double[dataPoints.length];
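
Change 3 in action: trainModel (above) now delegates RCF sizing to the one-argument getPartitionedForestSizes, so ModelManager and ADStateManager share one code path. Note the new dimension check: with, say, 2 enabled features and a shingle size of 8, each data point must have 16 values. An illustrative caller, where everything except getPartitionedForestSizes is assumed context:

    // Illustrative caller; mirrors what trainModel does above.
    void sizeModel(ModelManager modelManager, AnomalyDetector anomalyDetector) {
        Map.Entry<Integer, Integer> partitions = modelManager.getPartitionedForestSizes(anomalyDetector);
        int numForests = partitions.getKey();   // number of RCF partitions to create
        int forestSize = partitions.getValue(); // trees per partition
        // build numForests forests with forestSize trees each...
    }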
