Skip to content
This repository has been archived by the owner on Aug 2, 2022. It is now read-only.

Commit

Permalink
Change to use callbacks in cold start
Browse files Browse the repository at this point in the history
This PR changes the code path of cold start in the transport layer to use callbacks.

Previously, I created AD’s ExecutorService that has one thread for cold starts in ColdStartRunner. When we need to trigger a cold start, we can submit a task in the ExecutorService, and consult a hash map (keyed by detector Id) that cached the results of recent cold start results. Since I have to invoke the cold start thread in various callbacks, I created a cold start thread pool and put the cold start result in the transport state.

This PR also handles new exceptions like invalid queries introduced by recent changes on ModelManager and FeatureManager.

This PR lowers the severity of a couple of log messages in HashRing and RCFPollingTransportAction to avoid overwhelming readers of log files. These log messages are common.

This PR corrects typos and updates known causes of EndRunException in comments.

Testing done:
1. Simulated cold start failures: Exceptions of cold starts can be seen by the transport layer.  EndRunException can cause AD jobs to be terminated.
2. Happy case of a cold start still works.
  • Loading branch information
kaituo committed Aug 13, 2020
1 parent f03f63f commit 49f82fe
Show file tree
Hide file tree
Showing 18 changed files with 579 additions and 401 deletions.
1 change: 1 addition & 0 deletions codecov.yml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ coverage:
- "cli/"
flags:
- cli
patch: off
comment:
layout: "reach, diff, flags, files"
behavior: default
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,6 @@
import com.amazon.opendistroforelasticsearch.ad.transport.handler.AnomalyIndexHandler;
import com.amazon.opendistroforelasticsearch.ad.transport.handler.DetectionStateHandler;
import com.amazon.opendistroforelasticsearch.ad.util.ClientUtil;
import com.amazon.opendistroforelasticsearch.ad.util.ColdStartRunner;
import com.amazon.opendistroforelasticsearch.ad.util.DiscoveryNodeFilterer;
import com.amazon.opendistroforelasticsearch.ad.util.IndexUtils;
import com.amazon.opendistroforelasticsearch.ad.util.Throttler;
Expand Down Expand Up @@ -320,7 +319,6 @@ public Collection<Object> createComponents(
clock,
AnomalyDetectorSettings.HOURLY_MAINTENANCE
);
ColdStartRunner runner = new ColdStartRunner();
FeatureManager featureManager = new FeatureManager(
searchFeatureDao,
interpolator,
Expand All @@ -333,7 +331,8 @@ public Collection<Object> createComponents(
AnomalyDetectorSettings.MAX_IMPUTATION_NEIGHBOR_DISTANCE,
AnomalyDetectorSettings.PREVIEW_SAMPLE_RATE,
AnomalyDetectorSettings.MAX_PREVIEW_SAMPLES,
AnomalyDetectorSettings.HOURLY_MAINTENANCE
AnomalyDetectorSettings.HOURLY_MAINTENANCE,
threadPool
);
anomalyDetectorRunner = new AnomalyDetectorRunner(modelManager, featureManager, AnomalyDetectorSettings.MAX_PREVIEW_RESULTS);

Expand Down Expand Up @@ -386,7 +385,6 @@ public Collection<Object> createComponents(
modelManager,
clock,
stateManager,
runner,
new ADClusterEventListener(clusterService, hashRing, modelManager, nodeFilter),
adCircuitBreakerService,
adStats,
Expand All @@ -398,8 +396,8 @@ public Collection<Object> createComponents(

@Override
public List<ExecutorBuilder<?>> getExecutorBuilders(Settings settings) {
return Collections
.singletonList(
return Arrays
.asList(
new FixedExecutorBuilder(
settings,
AD_THREAD_POOL_NAME,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ public boolean build() {

// Check cooldown period
if (clock.millis() - lastUpdate <= coolDownPeriod.getMillis()) {
LOG.info(COOLDOWN_MSG);
LOG.debug(COOLDOWN_MSG);
return false;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,10 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.ThreadedActionListener;
import org.elasticsearch.threadpool.ThreadPool;

import com.amazon.opendistroforelasticsearch.ad.AnomalyDetectorPlugin;
import com.amazon.opendistroforelasticsearch.ad.common.exception.EndRunException;
import com.amazon.opendistroforelasticsearch.ad.constant.CommonErrorMessages;
import com.amazon.opendistroforelasticsearch.ad.dataprocessor.Interpolator;
Expand Down Expand Up @@ -72,6 +75,7 @@ public class FeatureManager {
private final double previewSampleRate;
private final int maxPreviewSamples;
private final Duration featureBufferTtl;
private final ThreadPool threadPool;

/**
* Constructor with dependencies and configuration.
Expand All @@ -88,6 +92,7 @@ public class FeatureManager {
* @param previewSampleRate number of samples to number of all the data points in the preview time range
* @param maxPreviewSamples max number of samples from search for preview features
* @param featureBufferTtl time to live for stale feature buffers
* @param threadPool object through which we can invoke different threadpool using different names
*/
public FeatureManager(
SearchFeatureDao searchFeatureDao,
Expand All @@ -101,7 +106,8 @@ public FeatureManager(
int maxNeighborDistance,
double previewSampleRate,
int maxPreviewSamples,
Duration featureBufferTtl
Duration featureBufferTtl,
ThreadPool threadPool
) {
this.searchFeatureDao = searchFeatureDao;
this.interpolator = interpolator;
Expand All @@ -117,6 +123,7 @@ public FeatureManager(
this.featureBufferTtl = featureBufferTtl;

this.detectorIdsToTimeShingles = new ConcurrentHashMap<>();
this.threadPool = threadPool;
}

/**
Expand Down Expand Up @@ -331,10 +338,12 @@ public Optional<double[][]> getColdStartData(AnomalyDetector detector) {
* onFailure is called with EndRunException on feature query creation errors
*/
public void getColdStartData(AnomalyDetector detector, ActionListener<Optional<double[][]>> listener) {
ActionListener<Optional<Long>> latestTimeListener = ActionListener
.wrap(latest -> getColdStartSamples(latest, detector, listener), listener::onFailure);
searchFeatureDao
.getLatestDataTime(
detector,
ActionListener.wrap(latest -> getColdStartSamples(latest, detector, listener), listener::onFailure)
new ThreadedActionListener<>(logger, threadPool, AnomalyDetectorPlugin.AD_THREAD_POOL_NAME, latestTimeListener, false)
);
}

Expand All @@ -343,11 +352,19 @@ private void getColdStartSamples(Optional<Long> latest, AnomalyDetector detector
if (latest.isPresent()) {
List<Entry<Long, Long>> sampleRanges = getColdStartSampleRanges(detector, latest.get());
try {
ActionListener<List<Optional<double[]>>> getFeaturesListener = ActionListener
.wrap(samples -> processColdStartSamples(samples, shingleSize, listener), listener::onFailure);
searchFeatureDao
.getFeatureSamplesForPeriods(
detector,
sampleRanges,
ActionListener.wrap(samples -> processColdStartSamples(samples, shingleSize, listener), listener::onFailure)
new ThreadedActionListener<>(
logger,
threadPool,
AnomalyDetectorPlugin.AD_THREAD_POOL_NAME,
getFeaturesListener,
false
)
);
} catch (IOException e) {
listener.onFailure(new EndRunException(detector.getDetectorId(), CommonErrorMessages.INVALID_SEARCH_QUERY_MSG, e, true));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,7 @@ public String getDetectorIdForModelId(String modelId) {
* @param forest RCF configuration, including forest size
* @param detectorId ID of the detector with no effects on partitioning
* @return a pair of number of partitions and size of a parition (number of trees)
* @throws LimitExceededException when there is no sufficient resouce available
* @throws LimitExceededException when there is no sufficient resource available
*/
public Entry<Integer, Integer> getPartitionedForestSizes(RandomCutForest forest, String detectorId) {
long totalSize = estimateModelSize(forest);
Expand Down Expand Up @@ -295,7 +295,7 @@ public Entry<Integer, Integer> getPartitionedForestSizes(RandomCutForest forest,
*
* @param detector detector object
* @return a pair of number of partitions and size of a parition (number of trees)
* @throws LimitExceededException when there is no sufficient resouce available
* @throws LimitExceededException when there is no sufficient resource available
*/
public Entry<Integer, Integer> getPartitionedForestSizes(AnomalyDetector detector) {
int shingleSize = detector.getShingleSize();
Expand Down
Loading

0 comments on commit 49f82fe

Please sign in to comment.