Skip to content
This repository has been archived by the owner on Aug 2, 2022. It is now read-only.

Adds initialization progress to profile API #164

Merged
merged 7 commits into from
Jul 14, 2020
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,8 @@
import com.amazon.opendistroforelasticsearch.ad.transport.AnomalyResultRequest;
import com.amazon.opendistroforelasticsearch.ad.transport.AnomalyResultResponse;
import com.amazon.opendistroforelasticsearch.ad.transport.AnomalyResultTransportAction;
import com.amazon.opendistroforelasticsearch.ad.transport.handler.AnomalyResultHandler;
import com.amazon.opendistroforelasticsearch.ad.transport.handler.AnomalyIndexHandler;
import com.amazon.opendistroforelasticsearch.ad.transport.handler.DetectorStateHandler;
import com.amazon.opendistroforelasticsearch.ad.util.ClientUtil;
import com.amazon.opendistroforelasticsearch.jobscheduler.spi.JobExecutionContext;
import com.amazon.opendistroforelasticsearch.jobscheduler.spi.LockModel;
Expand All @@ -77,8 +78,9 @@ public class AnomalyDetectorJobRunner implements ScheduledJobRunner {
private Client client;
private ClientUtil clientUtil;
private ThreadPool threadPool;
private AnomalyResultHandler anomalyResultHandler;
private AnomalyIndexHandler<AnomalyResult> anomalyResultHandler;
private ConcurrentHashMap<String, Integer> detectorEndRunExceptionCount;
private DetectorStateHandler detectorStateHandler;

public static AnomalyDetectorJobRunner getJobRunnerInstance() {
if (INSTANCE != null) {
Expand Down Expand Up @@ -110,7 +112,7 @@ public void setThreadPool(ThreadPool threadPool) {
this.threadPool = threadPool;
}

public void setAnomalyResultHandler(AnomalyResultHandler anomalyResultHandler) {
public void setAnomalyResultHandler(AnomalyIndexHandler<AnomalyResult> anomalyResultHandler) {
this.anomalyResultHandler = anomalyResultHandler;
}

Expand All @@ -119,6 +121,10 @@ public void setSettings(Settings settings) {
this.maxRetryForEndRunException = AnomalyDetectorSettings.MAX_RETRY_FOR_END_RUN_EXCEPTION.get(settings);
}

public void setDetectorStateHandler(DetectorStateHandler detectorStateHandler) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Minor: should we rename setDetectorStateHandler to setDetectionStateHandler? Same question for the other ...detectorState... names.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

renamed

this.detectorStateHandler = detectorStateHandler;
}

@Override
public void runJob(ScheduledJobParameter jobParameter, JobExecutionContext context) {
String detectorId = jobParameter.getName();
Expand Down Expand Up @@ -436,7 +442,8 @@ private void indexAnomalyResult(
Instant.now(),
response.getError()
);
anomalyResultHandler.indexAnomalyResult(anomalyResult);
anomalyResultHandler.index(anomalyResult, detectorId);
detectorStateHandler.saveError(response.getError(), detectorId, jobParameter.getEnabledTime());
} catch (Exception e) {
log.error("Failed to index anomaly result for " + detectorId, e);
} finally {
Expand Down Expand Up @@ -490,7 +497,8 @@ private void indexAnomalyResultException(
Instant.now(),
errorMessage
);
anomalyResultHandler.indexAnomalyResult(anomalyResult);
anomalyResultHandler.index(anomalyResult, detectorId);
detectorStateHandler.saveError(errorMessage, detectorId, jobParameter.getEnabledTime());
} catch (Exception e) {
log.error("Failed to index anomaly result for " + detectorId, e);
} finally {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,10 @@
import java.security.PrivilegedAction;
import java.time.Clock;
import java.util.Arrays;
import java.util.Calendar;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TimeZone;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;
Expand Down Expand Up @@ -83,6 +81,7 @@
import com.amazon.opendistroforelasticsearch.ad.model.AnomalyDetector;
import com.amazon.opendistroforelasticsearch.ad.model.AnomalyDetectorJob;
import com.amazon.opendistroforelasticsearch.ad.model.AnomalyResult;
import com.amazon.opendistroforelasticsearch.ad.model.DetectorInternalState;
import com.amazon.opendistroforelasticsearch.ad.rest.RestAnomalyDetectorJobAction;
import com.amazon.opendistroforelasticsearch.ad.rest.RestDeleteAnomalyDetectorAction;
import com.amazon.opendistroforelasticsearch.ad.rest.RestExecuteAnomalyDetectorAction;
Expand All @@ -100,7 +99,6 @@
import com.amazon.opendistroforelasticsearch.ad.stats.suppliers.IndexStatusSupplier;
import com.amazon.opendistroforelasticsearch.ad.stats.suppliers.ModelsOnNodeSupplier;
import com.amazon.opendistroforelasticsearch.ad.stats.suppliers.SettableSupplier;
import com.amazon.opendistroforelasticsearch.ad.transport.ADStateManager;
import com.amazon.opendistroforelasticsearch.ad.transport.ADStatsNodesAction;
import com.amazon.opendistroforelasticsearch.ad.transport.ADStatsNodesTransportAction;
import com.amazon.opendistroforelasticsearch.ad.transport.AnomalyResultAction;
Expand All @@ -111,18 +109,23 @@
import com.amazon.opendistroforelasticsearch.ad.transport.DeleteModelTransportAction;
import com.amazon.opendistroforelasticsearch.ad.transport.ProfileAction;
import com.amazon.opendistroforelasticsearch.ad.transport.ProfileTransportAction;
import com.amazon.opendistroforelasticsearch.ad.transport.RCFPollingAction;
import com.amazon.opendistroforelasticsearch.ad.transport.RCFPollingTransportAction;
import com.amazon.opendistroforelasticsearch.ad.transport.RCFResultAction;
import com.amazon.opendistroforelasticsearch.ad.transport.RCFResultTransportAction;
import com.amazon.opendistroforelasticsearch.ad.transport.StopDetectorAction;
import com.amazon.opendistroforelasticsearch.ad.transport.StopDetectorTransportAction;
import com.amazon.opendistroforelasticsearch.ad.transport.ThresholdResultAction;
import com.amazon.opendistroforelasticsearch.ad.transport.ThresholdResultTransportAction;
import com.amazon.opendistroforelasticsearch.ad.transport.handler.AnomalyResultHandler;
import com.amazon.opendistroforelasticsearch.ad.transport.TransportStateManager;
import com.amazon.opendistroforelasticsearch.ad.transport.handler.AnomalyIndexHandler;
import com.amazon.opendistroforelasticsearch.ad.transport.handler.DetectorStateHandler;
import com.amazon.opendistroforelasticsearch.ad.util.ClientUtil;
import com.amazon.opendistroforelasticsearch.ad.util.ColdStartRunner;
import com.amazon.opendistroforelasticsearch.ad.util.DiscoveryNodeFilterer;
import com.amazon.opendistroforelasticsearch.ad.util.IndexUtils;
import com.amazon.opendistroforelasticsearch.ad.util.Throttler;
import com.amazon.opendistroforelasticsearch.ad.util.ThrowingConsumerWrapper;
import com.amazon.opendistroforelasticsearch.jobscheduler.spi.JobSchedulerExtension;
import com.amazon.opendistroforelasticsearch.jobscheduler.spi.ScheduledJobParser;
import com.amazon.opendistroforelasticsearch.jobscheduler.spi.ScheduledJobRunner;
Expand Down Expand Up @@ -150,6 +153,8 @@ public class AnomalyDetectorPlugin extends Plugin implements ActionPlugin, Scrip
private NamedXContentRegistry xContentRegistry;
private ClientUtil clientUtil;
private DiscoveryNodeFilterer nodeFilter;
private IndexUtils indexUtils;
private DetectorStateHandler detectorStateHandler;

static {
SpecialPermission.check();
Expand All @@ -170,28 +175,34 @@ public List<RestHandler> getRestHandlers(
IndexNameExpressionResolver indexNameExpressionResolver,
Supplier<DiscoveryNodes> nodesInCluster
) {
AnomalyResultHandler anomalyResultHandler = new AnomalyResultHandler(

AnomalyIndexHandler<AnomalyResult> anomalyResultHandler;
kaituo marked this conversation as resolved.
Show resolved Hide resolved
anomalyResultHandler = new AnomalyIndexHandler<AnomalyResult>(
client,
settings,
clusterService,
indexNameExpressionResolver,
anomalyDetectionIndices,
threadPool
threadPool,
AnomalyResult.ANOMALY_RESULT_INDEX,
ThrowingConsumerWrapper.throwingConsumerWrapper(anomalyDetectionIndices::initAnomalyResultIndexDirectly),
anomalyDetectionIndices::doesAnomalyResultIndexExist,
false,
this.clientUtil,
this.indexUtils,
clusterService
);

AnomalyDetectorJobRunner jobRunner = AnomalyDetectorJobRunner.getJobRunnerInstance();
jobRunner.setClient(client);
jobRunner.setClientUtil(clientUtil);
jobRunner.setThreadPool(threadPool);
jobRunner.setAnomalyResultHandler(anomalyResultHandler);
jobRunner.setDetectorStateHandler(detectorStateHandler);
jobRunner.setSettings(settings);

AnomalyDetectorProfileRunner profileRunner = new AnomalyDetectorProfileRunner(
client,
this.xContentRegistry,
this.nodeFilter,
indexNameExpressionResolver,
clusterService,
Calendar.getInstance(TimeZone.getTimeZone("UTC"))
AnomalyDetectorSettings.NUM_MIN_SAMPLES
);
RestGetAnomalyDetectorAction restGetAnomalyDetectorAction = new RestGetAnomalyDetectorAction(profileRunner);
RestIndexAnomalyDetectorAction restIndexAnomalyDetectorAction = new RestIndexAnomalyDetectorAction(
Expand Down Expand Up @@ -257,7 +268,7 @@ public Collection<Object> createComponents(
Clock clock = Clock.systemUTC();
Throttler throttler = new Throttler(clock);
this.clientUtil = new ClientUtil(settings, client, throttler, threadPool);
IndexUtils indexUtils = new IndexUtils(client, clientUtil, clusterService);
this.indexUtils = new IndexUtils(client, clientUtil, clusterService, indexNameExpressionResolver);
anomalyDetectionIndices = new AnomalyDetectionIndices(client, clusterService, threadPool, settings);
this.clusterService = clusterService;
this.xContentRegistry = xContentRegistry;
Expand Down Expand Up @@ -301,7 +312,7 @@ public Collection<Object> createComponents(
);

HashRing hashRing = new HashRing(nodeFilter, clock, settings);
ADStateManager stateManager = new ADStateManager(
TransportStateManager stateManager = new TransportStateManager(
client,
xContentRegistry,
modelManager,
Expand Down Expand Up @@ -348,6 +359,17 @@ public Collection<Object> createComponents(

adStats = new ADStats(indexUtils, modelManager, stats);
ADCircuitBreakerService adCircuitBreakerService = new ADCircuitBreakerService(jvmService).init();
this.detectorStateHandler = new DetectorStateHandler(
client,
settings,
threadPool,
ThrowingConsumerWrapper.throwingConsumerWrapper(anomalyDetectionIndices::initDetectorStateIndex),
anomalyDetectionIndices::doesDetectorStateIndexExist,
this.clientUtil,
this.indexUtils,
clusterService,
xContentRegistry
);

return ImmutableList
.of(
Expand All @@ -368,7 +390,8 @@ public Collection<Object> createComponents(
adCircuitBreakerService,
adStats,
new MasterEventListener(clusterService, threadPool, client, clock, clientUtil, nodeFilter),
nodeFilter
nodeFilter,
detectorStateHandler
);
}

Expand Down Expand Up @@ -413,7 +436,13 @@ public List<Setting<?>> getSettings() {

@Override
public List<NamedXContentRegistry.Entry> getNamedXContent() {
return ImmutableList.of(AnomalyDetector.XCONTENT_REGISTRY, AnomalyResult.XCONTENT_REGISTRY);
return ImmutableList
.of(
AnomalyDetector.XCONTENT_REGISTRY,
AnomalyResult.XCONTENT_REGISTRY,
DetectorInternalState.XCONTENT_REGISTRY,
AnomalyDetectorJob.XCONTENT_REGISTRY
);
}

/*
Expand All @@ -430,7 +459,8 @@ public List<NamedXContentRegistry.Entry> getNamedXContent() {
new ActionHandler<>(AnomalyResultAction.INSTANCE, AnomalyResultTransportAction.class),
new ActionHandler<>(CronAction.INSTANCE, CronTransportAction.class),
new ActionHandler<>(ADStatsNodesAction.INSTANCE, ADStatsNodesTransportAction.class),
new ActionHandler<>(ProfileAction.INSTANCE, ProfileTransportAction.class)
new ActionHandler<>(ProfileAction.INSTANCE, ProfileTransportAction.class),
new ActionHandler<>(RCFPollingAction.INSTANCE, RCFPollingTransportAction.class)
);
}

Expand Down Expand Up @@ -466,7 +496,8 @@ public Collection<SystemIndexDescriptor> getSystemIndexDescriptors(Settings sett
new SystemIndexDescriptor(AnomalyDetectionIndices.ALL_AD_RESULTS_INDEX_PATTERN, "anomaly result"),
new SystemIndexDescriptor(AnomalyDetector.ANOMALY_DETECTORS_INDEX, "detector definition"),
new SystemIndexDescriptor(AnomalyDetectorJob.ANOMALY_DETECTOR_JOB_INDEX, "detector job"),
new SystemIndexDescriptor(CommonName.CHECKPOINT_INDEX_NAME, "model checkpoint")
new SystemIndexDescriptor(CommonName.CHECKPOINT_INDEX_NAME, "model checkpoint"),
new SystemIndexDescriptor(DetectorInternalState.DETECTOR_STATE_INDEX, "detector information like total rcf updates")
)
);
}
Expand Down
Loading