Skip to content
This repository has been archived by the owner on Aug 2, 2022. It is now read-only.

Adds initialization progress to profile API #164

Merged
merged 7 commits into from
Jul 14, 2020
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,8 @@
import com.amazon.opendistroforelasticsearch.ad.transport.AnomalyResultRequest;
import com.amazon.opendistroforelasticsearch.ad.transport.AnomalyResultResponse;
import com.amazon.opendistroforelasticsearch.ad.transport.AnomalyResultTransportAction;
import com.amazon.opendistroforelasticsearch.ad.transport.handler.AnomalyResultHandler;
import com.amazon.opendistroforelasticsearch.ad.transport.handler.AnomalyIndexHandler;
import com.amazon.opendistroforelasticsearch.ad.transport.handler.DetectorStateHandler;
import com.amazon.opendistroforelasticsearch.ad.util.ClientUtil;
import com.amazon.opendistroforelasticsearch.jobscheduler.spi.JobExecutionContext;
import com.amazon.opendistroforelasticsearch.jobscheduler.spi.LockModel;
Expand All @@ -77,8 +78,9 @@ public class AnomalyDetectorJobRunner implements ScheduledJobRunner {
private Client client;
private ClientUtil clientUtil;
private ThreadPool threadPool;
private AnomalyResultHandler anomalyResultHandler;
private AnomalyIndexHandler<AnomalyResult> anomalyResultHandler;
private ConcurrentHashMap<String, Integer> detectorEndRunExceptionCount;
private DetectorStateHandler detectorStateHandler;

public static AnomalyDetectorJobRunner getJobRunnerInstance() {
if (INSTANCE != null) {
Expand Down Expand Up @@ -110,7 +112,7 @@ public void setThreadPool(ThreadPool threadPool) {
this.threadPool = threadPool;
}

public void setAnomalyResultHandler(AnomalyResultHandler anomalyResultHandler) {
public void setAnomalyResultHandler(AnomalyIndexHandler<AnomalyResult> anomalyResultHandler) {
this.anomalyResultHandler = anomalyResultHandler;
}

Expand All @@ -119,6 +121,10 @@ public void setSettings(Settings settings) {
this.maxRetryForEndRunException = AnomalyDetectorSettings.MAX_RETRY_FOR_END_RUN_EXCEPTION.get(settings);
}

public void setDetectorStateHandler(DetectorStateHandler detectorStateHandler) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

minor, should we rename setDetectorStateHandler as setDetectionStateHandler ? Same question for other ...detectorState...

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

renamed

this.detectorStateHandler = detectorStateHandler;
}

@Override
public void runJob(ScheduledJobParameter jobParameter, JobExecutionContext context) {
String detectorId = jobParameter.getName();
Expand Down Expand Up @@ -436,7 +442,8 @@ private void indexAnomalyResult(
Instant.now(),
response.getError()
);
anomalyResultHandler.indexAnomalyResult(anomalyResult);
anomalyResultHandler.index(anomalyResult, detectorId);
detectorStateHandler.saveError(response.getError(), detectorId);
kaituo marked this conversation as resolved.
Show resolved Hide resolved
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

are there any reasons not inverse the dependency, i.e., letting detector state handler be a dependency within state manager? there are for two benefits

  1. removes the unnecessary dependency that state manager and job runner for the same detector are on the same node. if/when the two are run on separate nodes, error information doesn't need to be replicated.
  2. since state manager already has the error, directly handing off/delegating to the handler is more natural than giving the info to job runner and let job runner give it to the handler.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

saveError method needs to be called by a source that runs regularly. job runner is the source. Both state manager and detector state handler reacts to job runner.

About the two benefits:

  1. with or without my changes, state manager and job runner for the same detector are on the same node.
  2. state manager has the error because job runner gives the error to state manager. job runner can give the error to state manager because job runner has access to detectorStateHandler.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  1. if job runner (sender) and state manager (responder) are on separate nodes for better scalability in the future, why let this change add an unnecessary coupling to prevent that?

  2. the state manager is already a dependency of transport action, which generates errors first hand, why not letting the transport action directly update state manager, which updates the handler, (the chain of command is job runner > transport action > state manager > state handler), rather than taking a longer route involving the high-level job runner for more lower level details, which should only be responsible for sending job requests by schedule and have as little business logic as possible?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  1. I don't see it is coming in the near feature. State manager is saving the state of job runner. It makes no sense to save it in different node.
  2. detectorStateHandler and anomalyResultHandler are both dependency of job runner. Previously, the logic of anomalyResultHandler is in the transport action, as you suggested. Yaliang separated it out and moved it to job runner. It is easier to read and being consistent by keeping the way as it is now.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  1. if job runner (sender) and state manager (responder) are on separate nodes for better scalability in the future, why let this change add an unnecessary coupling to prevent that?
  2. the state manager is already a dependency of transport action, which generates errors first hand, why not letting the transport action directly update state manager, which updates the handler, (the chain of command is job runner > transport action > state manager > state handler), rather than taking a longer route involving the high-level job runner for more lower level details, which should only be responsible for sending job requests by schedule and have as little business logic as possible?

hi, Lai, good questions, before decoupling, we have no job runner and we handle all business logic and exceptions in transport action as it stands on top of other components. After decoupling, job runner is the top component, split the exception handling to job runner to avoid the exception handling logic scattered into multiple components(transport actions, job runners, etc). So job runner will catch all exceptions including its own exceptions, then decide the following error handling such as stop detector if no enough resource, and write error in AD result index. Now Kaituo add detectorStateHandler, which will track the latest error. So it's same design logic to do error handling in one place.

Of course, we may have other options to do this, like create a new error handler, pass it to every component to handle exception and persist in AD result individually. Both options have pros and cons, need to do some tradeoff. How about we keep current design for this PR? We can discuss different options and solve the error handling in another PR if we have better option.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is not a blocker, as i have approved the pr.

it's good to use interface to decouple business logic from work execution dependencies. in another word, consider if the job scheduler code (a work execution dependency) change/migrate to some other dependency for execution, how much needs to re-developed, copy and pasted, re-tested? can/should the work be further reduced? in the ideal case, the interface should serve to protect the business logic.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the suggestion. Agree to "use interface to decouple business logic from work execution dependencies". I think we can restructure when do AD workbench as it will change a lot of how to schedule detection task/job but not overdesign. Let's discuss more when do AD workbench.

} catch (Exception e) {
log.error("Failed to index anomaly result for " + detectorId, e);
} finally {
Expand Down Expand Up @@ -490,7 +497,8 @@ private void indexAnomalyResultException(
Instant.now(),
errorMessage
);
anomalyResultHandler.indexAnomalyResult(anomalyResult);
anomalyResultHandler.index(anomalyResult, detectorId);
detectorStateHandler.saveError(errorMessage, detectorId);
} catch (Exception e) {
log.error("Failed to index anomaly result for " + detectorId, e);
} finally {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,10 @@
import java.security.PrivilegedAction;
import java.time.Clock;
import java.util.Arrays;
import java.util.Calendar;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TimeZone;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;
Expand Down Expand Up @@ -83,6 +81,7 @@
import com.amazon.opendistroforelasticsearch.ad.model.AnomalyDetector;
import com.amazon.opendistroforelasticsearch.ad.model.AnomalyDetectorJob;
import com.amazon.opendistroforelasticsearch.ad.model.AnomalyResult;
import com.amazon.opendistroforelasticsearch.ad.model.DetectorInternalState;
import com.amazon.opendistroforelasticsearch.ad.rest.RestAnomalyDetectorJobAction;
import com.amazon.opendistroforelasticsearch.ad.rest.RestDeleteAnomalyDetectorAction;
import com.amazon.opendistroforelasticsearch.ad.rest.RestExecuteAnomalyDetectorAction;
Expand All @@ -100,7 +99,6 @@
import com.amazon.opendistroforelasticsearch.ad.stats.suppliers.IndexStatusSupplier;
import com.amazon.opendistroforelasticsearch.ad.stats.suppliers.ModelsOnNodeSupplier;
import com.amazon.opendistroforelasticsearch.ad.stats.suppliers.SettableSupplier;
import com.amazon.opendistroforelasticsearch.ad.transport.ADStateManager;
import com.amazon.opendistroforelasticsearch.ad.transport.ADStatsNodesAction;
import com.amazon.opendistroforelasticsearch.ad.transport.ADStatsNodesTransportAction;
import com.amazon.opendistroforelasticsearch.ad.transport.AnomalyResultAction;
Expand All @@ -111,18 +109,23 @@
import com.amazon.opendistroforelasticsearch.ad.transport.DeleteModelTransportAction;
import com.amazon.opendistroforelasticsearch.ad.transport.ProfileAction;
import com.amazon.opendistroforelasticsearch.ad.transport.ProfileTransportAction;
import com.amazon.opendistroforelasticsearch.ad.transport.RCFPollingAction;
import com.amazon.opendistroforelasticsearch.ad.transport.RCFPollingTransportAction;
import com.amazon.opendistroforelasticsearch.ad.transport.RCFResultAction;
import com.amazon.opendistroforelasticsearch.ad.transport.RCFResultTransportAction;
import com.amazon.opendistroforelasticsearch.ad.transport.StopDetectorAction;
import com.amazon.opendistroforelasticsearch.ad.transport.StopDetectorTransportAction;
import com.amazon.opendistroforelasticsearch.ad.transport.ThresholdResultAction;
import com.amazon.opendistroforelasticsearch.ad.transport.ThresholdResultTransportAction;
import com.amazon.opendistroforelasticsearch.ad.transport.handler.AnomalyResultHandler;
import com.amazon.opendistroforelasticsearch.ad.transport.TransportStateManager;
import com.amazon.opendistroforelasticsearch.ad.transport.handler.AnomalyIndexHandler;
import com.amazon.opendistroforelasticsearch.ad.transport.handler.DetectorStateHandler;
import com.amazon.opendistroforelasticsearch.ad.util.ClientUtil;
import com.amazon.opendistroforelasticsearch.ad.util.ColdStartRunner;
import com.amazon.opendistroforelasticsearch.ad.util.DiscoveryNodeFilterer;
import com.amazon.opendistroforelasticsearch.ad.util.IndexUtils;
import com.amazon.opendistroforelasticsearch.ad.util.Throttler;
import com.amazon.opendistroforelasticsearch.ad.util.ThrowingConsumerWrapper;
import com.amazon.opendistroforelasticsearch.jobscheduler.spi.JobSchedulerExtension;
import com.amazon.opendistroforelasticsearch.jobscheduler.spi.ScheduledJobParser;
import com.amazon.opendistroforelasticsearch.jobscheduler.spi.ScheduledJobRunner;
Expand Down Expand Up @@ -150,6 +153,8 @@ public class AnomalyDetectorPlugin extends Plugin implements ActionPlugin, Scrip
private NamedXContentRegistry xContentRegistry;
private ClientUtil clientUtil;
private DiscoveryNodeFilterer nodeFilter;
private IndexUtils indexUtils;
private DetectorStateHandler detectorStateHandler;

static {
SpecialPermission.check();
Expand All @@ -170,28 +175,34 @@ public List<RestHandler> getRestHandlers(
IndexNameExpressionResolver indexNameExpressionResolver,
Supplier<DiscoveryNodes> nodesInCluster
) {
AnomalyResultHandler anomalyResultHandler = new AnomalyResultHandler(

AnomalyIndexHandler<AnomalyResult> anomalyResultHandler;
kaituo marked this conversation as resolved.
Show resolved Hide resolved
anomalyResultHandler = new AnomalyIndexHandler<AnomalyResult>(
client,
settings,
clusterService,
indexNameExpressionResolver,
anomalyDetectionIndices,
threadPool
threadPool,
AnomalyResult.ANOMALY_RESULT_INDEX,
ThrowingConsumerWrapper.throwingConsumerWrapper(anomalyDetectionIndices::initAnomalyResultIndexDirectly),
anomalyDetectionIndices::doesAnomalyResultIndexExist,
false,
this.clientUtil,
this.indexUtils,
clusterService
);

AnomalyDetectorJobRunner jobRunner = AnomalyDetectorJobRunner.getJobRunnerInstance();
jobRunner.setClient(client);
jobRunner.setClientUtil(clientUtil);
jobRunner.setThreadPool(threadPool);
jobRunner.setAnomalyResultHandler(anomalyResultHandler);
jobRunner.setDetectorStateHandler(detectorStateHandler);
jobRunner.setSettings(settings);

AnomalyDetectorProfileRunner profileRunner = new AnomalyDetectorProfileRunner(
client,
this.xContentRegistry,
this.nodeFilter,
indexNameExpressionResolver,
clusterService,
Calendar.getInstance(TimeZone.getTimeZone("UTC"))
AnomalyDetectorSettings.NUM_MIN_SAMPLES
);
RestGetAnomalyDetectorAction restGetAnomalyDetectorAction = new RestGetAnomalyDetectorAction(profileRunner);
RestIndexAnomalyDetectorAction restIndexAnomalyDetectorAction = new RestIndexAnomalyDetectorAction(
Expand Down Expand Up @@ -257,7 +268,7 @@ public Collection<Object> createComponents(
Clock clock = Clock.systemUTC();
Throttler throttler = new Throttler(clock);
this.clientUtil = new ClientUtil(settings, client, throttler, threadPool);
IndexUtils indexUtils = new IndexUtils(client, clientUtil, clusterService);
this.indexUtils = new IndexUtils(client, clientUtil, clusterService, indexNameExpressionResolver);
anomalyDetectionIndices = new AnomalyDetectionIndices(client, clusterService, threadPool, settings);
this.clusterService = clusterService;
this.xContentRegistry = xContentRegistry;
Expand Down Expand Up @@ -301,7 +312,7 @@ public Collection<Object> createComponents(
);

HashRing hashRing = new HashRing(nodeFilter, clock, settings);
ADStateManager stateManager = new ADStateManager(
TransportStateManager stateManager = new TransportStateManager(
client,
xContentRegistry,
modelManager,
Expand Down Expand Up @@ -348,6 +359,18 @@ public Collection<Object> createComponents(

adStats = new ADStats(indexUtils, modelManager, stats);
ADCircuitBreakerService adCircuitBreakerService = new ADCircuitBreakerService(jvmService).init();
this.detectorStateHandler = new DetectorStateHandler(
client,
settings,
threadPool,
ThrowingConsumerWrapper.throwingConsumerWrapper(anomalyDetectionIndices::initDetectorStateIndex),
anomalyDetectionIndices::doesDetectorStateIndexExist,
this.clientUtil,
this.indexUtils,
clusterService,
xContentRegistry,
stateManager
);

return ImmutableList
.of(
Expand All @@ -368,7 +391,8 @@ public Collection<Object> createComponents(
adCircuitBreakerService,
adStats,
new MasterEventListener(clusterService, threadPool, client, clock, clientUtil, nodeFilter),
nodeFilter
nodeFilter,
detectorStateHandler
);
}

Expand Down Expand Up @@ -413,7 +437,13 @@ public List<Setting<?>> getSettings() {

@Override
public List<NamedXContentRegistry.Entry> getNamedXContent() {
return ImmutableList.of(AnomalyDetector.XCONTENT_REGISTRY, AnomalyResult.XCONTENT_REGISTRY);
return ImmutableList
.of(
AnomalyDetector.XCONTENT_REGISTRY,
AnomalyResult.XCONTENT_REGISTRY,
DetectorInternalState.XCONTENT_REGISTRY,
AnomalyDetectorJob.XCONTENT_REGISTRY
);
}

/*
Expand All @@ -430,7 +460,8 @@ public List<NamedXContentRegistry.Entry> getNamedXContent() {
new ActionHandler<>(AnomalyResultAction.INSTANCE, AnomalyResultTransportAction.class),
new ActionHandler<>(CronAction.INSTANCE, CronTransportAction.class),
new ActionHandler<>(ADStatsNodesAction.INSTANCE, ADStatsNodesTransportAction.class),
new ActionHandler<>(ProfileAction.INSTANCE, ProfileTransportAction.class)
new ActionHandler<>(ProfileAction.INSTANCE, ProfileTransportAction.class),
new ActionHandler<>(RCFPollingAction.INSTANCE, RCFPollingTransportAction.class)
);
}

Expand Down Expand Up @@ -466,7 +497,8 @@ public Collection<SystemIndexDescriptor> getSystemIndexDescriptors(Settings sett
new SystemIndexDescriptor(AnomalyDetectionIndices.ALL_AD_RESULTS_INDEX_PATTERN, "anomaly result"),
new SystemIndexDescriptor(AnomalyDetector.ANOMALY_DETECTORS_INDEX, "detector definition"),
new SystemIndexDescriptor(AnomalyDetectorJob.ANOMALY_DETECTOR_JOB_INDEX, "detector job"),
new SystemIndexDescriptor(CommonName.CHECKPOINT_INDEX_NAME, "model checkpoint")
new SystemIndexDescriptor(CommonName.CHECKPOINT_INDEX_NAME, "model checkpoint"),
new SystemIndexDescriptor(DetectorInternalState.DETECTOR_STATE_INDEX, "detector information like total rcf updates")
)
);
}
Expand Down
Loading