Skip to content
This repository has been archived by the owner on Aug 2, 2022. It is now read-only.

Commit

Permalink
Adds initialization progress to profile API
Browse files Browse the repository at this point in the history
This PR adds init_progress to profile API. init_progress helps users track initialization percentage, needed shingles, and estimated time to go if the future data stream is continuous (no missing data).

Initialization percentage measures how far away we can observe RCF models emitting scores.  The implementation fetches the RCF model's total updates while the AD job is running and materializes the value to the newly added index .opendistro-anomaly-info. Total updates record the number of times this RCF model has been updated. 

Initialization percent is computed as x/128: 
* if total updates > 128, x = 128. Otherwise, x is the total updates 
* 128 is our output after the number in RCF. After observing 128 samples, RCF starts emitting scores.

Needed shingles are computed as 128 -x.  Estimated minutes to go is computed as needed shingles * detector interval

This PR also materializes the error message in the most recent run to speed up profile API's error fetching.

During each AD execution, we also check if a checkpoint is there (the result is saved and maintained as other AD states), if yes, we cold start immediately.

Testing done:
1. adds unit tests
2. run e2e tests to verify init_progress number makes sense.
  • Loading branch information
kaituo committed Jun 25, 2020
1 parent 4712ede commit e33265c
Show file tree
Hide file tree
Showing 50 changed files with 2,494 additions and 852 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,8 @@
import com.amazon.opendistroforelasticsearch.ad.transport.AnomalyResultRequest;
import com.amazon.opendistroforelasticsearch.ad.transport.AnomalyResultResponse;
import com.amazon.opendistroforelasticsearch.ad.transport.AnomalyResultTransportAction;
import com.amazon.opendistroforelasticsearch.ad.transport.handler.AnomalyResultHandler;
import com.amazon.opendistroforelasticsearch.ad.transport.handler.AnomalyIndexHandler;
import com.amazon.opendistroforelasticsearch.ad.transport.handler.DetectorStateHandler;
import com.amazon.opendistroforelasticsearch.ad.util.ClientUtil;
import com.amazon.opendistroforelasticsearch.jobscheduler.spi.JobExecutionContext;
import com.amazon.opendistroforelasticsearch.jobscheduler.spi.LockModel;
Expand All @@ -77,8 +78,9 @@ public class AnomalyDetectorJobRunner implements ScheduledJobRunner {
private Client client;
private ClientUtil clientUtil;
private ThreadPool threadPool;
private AnomalyResultHandler anomalyResultHandler;
private AnomalyIndexHandler<AnomalyResult> anomalyResultHandler;
private ConcurrentHashMap<String, Integer> detectorEndRunExceptionCount;
private DetectorStateHandler detectorStateHandler;

public static AnomalyDetectorJobRunner getJobRunnerInstance() {
if (INSTANCE != null) {
Expand Down Expand Up @@ -110,7 +112,7 @@ public void setThreadPool(ThreadPool threadPool) {
this.threadPool = threadPool;
}

public void setAnomalyResultHandler(AnomalyResultHandler anomalyResultHandler) {
public void setAnomalyResultHandler(AnomalyIndexHandler<AnomalyResult> anomalyResultHandler) {
this.anomalyResultHandler = anomalyResultHandler;
}

Expand All @@ -119,6 +121,10 @@ public void setSettings(Settings settings) {
this.maxRetryForEndRunException = AnomalyDetectorSettings.MAX_RETRY_FOR_END_RUN_EXCEPTION.get(settings);
}

public void setDetectorStateHandler(DetectorStateHandler detectorStateHandler) {
this.detectorStateHandler = detectorStateHandler;
}

@Override
public void runJob(ScheduledJobParameter jobParameter, JobExecutionContext context) {
String detectorId = jobParameter.getName();
Expand Down Expand Up @@ -436,7 +442,8 @@ private void indexAnomalyResult(
Instant.now(),
response.getError()
);
anomalyResultHandler.indexAnomalyResult(anomalyResult);
anomalyResultHandler.index(anomalyResult, detectorId);
detectorStateHandler.saveError(response.getError(), detectorId);
} catch (Exception e) {
log.error("Failed to index anomaly result for " + detectorId, e);
} finally {
Expand Down Expand Up @@ -490,7 +497,8 @@ private void indexAnomalyResultException(
Instant.now(),
errorMessage
);
anomalyResultHandler.indexAnomalyResult(anomalyResult);
anomalyResultHandler.index(anomalyResult, detectorId);
detectorStateHandler.saveError(errorMessage, detectorId);
} catch (Exception e) {
log.error("Failed to index anomaly result for " + detectorId, e);
} finally {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,10 @@
import java.security.PrivilegedAction;
import java.time.Clock;
import java.util.Arrays;
import java.util.Calendar;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TimeZone;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;
Expand Down Expand Up @@ -83,6 +81,7 @@
import com.amazon.opendistroforelasticsearch.ad.model.AnomalyDetector;
import com.amazon.opendistroforelasticsearch.ad.model.AnomalyDetectorJob;
import com.amazon.opendistroforelasticsearch.ad.model.AnomalyResult;
import com.amazon.opendistroforelasticsearch.ad.model.DetectorInternalState;
import com.amazon.opendistroforelasticsearch.ad.rest.RestAnomalyDetectorJobAction;
import com.amazon.opendistroforelasticsearch.ad.rest.RestDeleteAnomalyDetectorAction;
import com.amazon.opendistroforelasticsearch.ad.rest.RestExecuteAnomalyDetectorAction;
Expand Down Expand Up @@ -111,18 +110,22 @@
import com.amazon.opendistroforelasticsearch.ad.transport.DeleteModelTransportAction;
import com.amazon.opendistroforelasticsearch.ad.transport.ProfileAction;
import com.amazon.opendistroforelasticsearch.ad.transport.ProfileTransportAction;
import com.amazon.opendistroforelasticsearch.ad.transport.RCFPollingAction;
import com.amazon.opendistroforelasticsearch.ad.transport.RCFPollingTransportAction;
import com.amazon.opendistroforelasticsearch.ad.transport.RCFResultAction;
import com.amazon.opendistroforelasticsearch.ad.transport.RCFResultTransportAction;
import com.amazon.opendistroforelasticsearch.ad.transport.StopDetectorAction;
import com.amazon.opendistroforelasticsearch.ad.transport.StopDetectorTransportAction;
import com.amazon.opendistroforelasticsearch.ad.transport.ThresholdResultAction;
import com.amazon.opendistroforelasticsearch.ad.transport.ThresholdResultTransportAction;
import com.amazon.opendistroforelasticsearch.ad.transport.handler.AnomalyResultHandler;
import com.amazon.opendistroforelasticsearch.ad.transport.handler.AnomalyIndexHandler;
import com.amazon.opendistroforelasticsearch.ad.transport.handler.DetectorStateHandler;
import com.amazon.opendistroforelasticsearch.ad.util.ClientUtil;
import com.amazon.opendistroforelasticsearch.ad.util.ColdStartRunner;
import com.amazon.opendistroforelasticsearch.ad.util.DiscoveryNodeFilterer;
import com.amazon.opendistroforelasticsearch.ad.util.IndexUtils;
import com.amazon.opendistroforelasticsearch.ad.util.Throttler;
import com.amazon.opendistroforelasticsearch.ad.util.ThrowingConsumerWrapper;
import com.amazon.opendistroforelasticsearch.jobscheduler.spi.JobSchedulerExtension;
import com.amazon.opendistroforelasticsearch.jobscheduler.spi.ScheduledJobParser;
import com.amazon.opendistroforelasticsearch.jobscheduler.spi.ScheduledJobRunner;
Expand Down Expand Up @@ -150,6 +153,8 @@ public class AnomalyDetectorPlugin extends Plugin implements ActionPlugin, Scrip
private NamedXContentRegistry xContentRegistry;
private ClientUtil clientUtil;
private DiscoveryNodeFilterer nodeFilter;
private IndexUtils indexUtils;
private DetectorStateHandler detectorStateHandler;

static {
SpecialPermission.check();
Expand All @@ -170,28 +175,34 @@ public List<RestHandler> getRestHandlers(
IndexNameExpressionResolver indexNameExpressionResolver,
Supplier<DiscoveryNodes> nodesInCluster
) {
AnomalyResultHandler anomalyResultHandler = new AnomalyResultHandler(

AnomalyIndexHandler<AnomalyResult> anomalyResultHandler;
anomalyResultHandler = new AnomalyIndexHandler<AnomalyResult>(
client,
settings,
clusterService,
indexNameExpressionResolver,
anomalyDetectionIndices,
threadPool
threadPool,
AnomalyResult.ANOMALY_RESULT_INDEX,
ThrowingConsumerWrapper.throwingConsumerWrapper(anomalyDetectionIndices::initAnomalyResultIndexDirectly),
anomalyDetectionIndices::doesAnomalyResultIndexExist,
false,
this.clientUtil,
this.indexUtils,
clusterService
);

AnomalyDetectorJobRunner jobRunner = AnomalyDetectorJobRunner.getJobRunnerInstance();
jobRunner.setClient(client);
jobRunner.setClientUtil(clientUtil);
jobRunner.setThreadPool(threadPool);
jobRunner.setAnomalyResultHandler(anomalyResultHandler);
jobRunner.setDetectorStateHandler(detectorStateHandler);
jobRunner.setSettings(settings);

AnomalyDetectorProfileRunner profileRunner = new AnomalyDetectorProfileRunner(
client,
this.xContentRegistry,
this.nodeFilter,
indexNameExpressionResolver,
clusterService,
Calendar.getInstance(TimeZone.getTimeZone("UTC"))
AnomalyDetectorSettings.NUM_MIN_SAMPLES
);
RestGetAnomalyDetectorAction restGetAnomalyDetectorAction = new RestGetAnomalyDetectorAction(profileRunner);
RestIndexAnomalyDetectorAction restIndexAnomalyDetectorAction = new RestIndexAnomalyDetectorAction(
Expand Down Expand Up @@ -257,7 +268,7 @@ public Collection<Object> createComponents(
Clock clock = Clock.systemUTC();
Throttler throttler = new Throttler(clock);
this.clientUtil = new ClientUtil(settings, client, throttler, threadPool);
IndexUtils indexUtils = new IndexUtils(client, clientUtil, clusterService);
this.indexUtils = new IndexUtils(client, clientUtil, clusterService, indexNameExpressionResolver);
anomalyDetectionIndices = new AnomalyDetectionIndices(client, clusterService, threadPool, settings);
this.clusterService = clusterService;
this.xContentRegistry = xContentRegistry;
Expand Down Expand Up @@ -347,6 +358,16 @@ public Collection<Object> createComponents(

adStats = new ADStats(indexUtils, modelManager, stats);
ADCircuitBreakerService adCircuitBreakerService = new ADCircuitBreakerService(jvmService).init();
this.detectorStateHandler = new DetectorStateHandler(
client,
settings,
threadPool,
ThrowingConsumerWrapper.throwingConsumerWrapper(anomalyDetectionIndices::initDetectorStateIndex),
anomalyDetectionIndices::doesDetectorStateIndexExist,
this.clientUtil,
this.indexUtils,
clusterService
);

return ImmutableList
.of(
Expand All @@ -367,7 +388,8 @@ public Collection<Object> createComponents(
adCircuitBreakerService,
adStats,
new MasterEventListener(clusterService, threadPool, client, clock, clientUtil, nodeFilter),
nodeFilter
nodeFilter,
detectorStateHandler
);
}

Expand Down Expand Up @@ -411,7 +433,13 @@ public List<Setting<?>> getSettings() {

@Override
public List<NamedXContentRegistry.Entry> getNamedXContent() {
return ImmutableList.of(AnomalyDetector.XCONTENT_REGISTRY, AnomalyResult.XCONTENT_REGISTRY);
return ImmutableList
.of(
AnomalyDetector.XCONTENT_REGISTRY,
AnomalyResult.XCONTENT_REGISTRY,
DetectorInternalState.XCONTENT_REGISTRY,
AnomalyDetectorJob.XCONTENT_REGISTRY
);
}

/*
Expand All @@ -428,7 +456,8 @@ public List<NamedXContentRegistry.Entry> getNamedXContent() {
new ActionHandler<>(AnomalyResultAction.INSTANCE, AnomalyResultTransportAction.class),
new ActionHandler<>(CronAction.INSTANCE, CronTransportAction.class),
new ActionHandler<>(ADStatsNodesAction.INSTANCE, ADStatsNodesTransportAction.class),
new ActionHandler<>(ProfileAction.INSTANCE, ProfileTransportAction.class)
new ActionHandler<>(ProfileAction.INSTANCE, ProfileTransportAction.class),
new ActionHandler<>(RCFPollingAction.INSTANCE, RCFPollingTransportAction.class)
);
}

Expand Down Expand Up @@ -464,7 +493,8 @@ public Collection<SystemIndexDescriptor> getSystemIndexDescriptors(Settings sett
new SystemIndexDescriptor(AnomalyDetectionIndices.ALL_AD_RESULTS_INDEX_PATTERN, "anomaly result"),
new SystemIndexDescriptor(AnomalyDetector.ANOMALY_DETECTORS_INDEX, "detector definition"),
new SystemIndexDescriptor(AnomalyDetectorJob.ANOMALY_DETECTOR_JOB_INDEX, "detector job"),
new SystemIndexDescriptor(CommonName.CHECKPOINT_INDEX_NAME, "model checkpoint")
new SystemIndexDescriptor(CommonName.CHECKPOINT_INDEX_NAME, "model checkpoint"),
new SystemIndexDescriptor(DetectorInternalState.DETECTOR_STATE_INDEX, "detector information like total rcf updates")
)
);
}
Expand Down
Loading

0 comments on commit e33265c

Please sign in to comment.