Adds initialization progress to profile API #164
Conversation
Two general comments.
- Does the additional detector info require an additional data index? An additional internal field in the detector is a better option: it requires fewer resources (no new index or redundant data fields) and is easier to manage (fewer consistency issues from multiple indices).
- After initialization is completed, the status changes and collecting data for computing initialization progress should stop. Collecting data and computing can be done on request instead.
The idea is to put all state related to a detector in the info index. As you said, an additional detector info field will be added to the info index if required.
I prefer to keep collecting after the status is changed:
I think we should weigh the two options carefully: 1. creating a new index, and 2. adding a new field to the existing index. From what I have seen, option 2 is the more commonly used pattern in ES, and it's more efficient and easier to manage.
Tracking the progress can be done on request when the get profile API is called, correct?
We should use an index based on its usage. The detector index is for detector definitions, not intermediate state. That's why we have the checkpoint, job, and result indices. Currently, the info index is created to store intermediate state, not just init_progress. For example, we can implement custom routing and store the routing table in the info index. Custom routing allows more flexible routing, like allocating a model on the node with the most memory available.
That requires a broadcast call to all nodes, since we don't know which node has which model. Kibana would call this API every few minutes, which is costly.
is the new index really needed? if the detector index is to be kept unchanged, what about the job index, so all the execution-related details/job info is managed in one index rather than scattered across multiple indices?
it doesn't need to be a broadcast. If there are no stored initialization results, the model can be located using the hash, or the handler can directly pull the first partition to get the status and save it for future queries. The idea is to only do this work when it's needed and applicable.
In this case I think we can name the index like
I think so. We cannot modify the job index, as the job scheduler listens to any changes in that index; it would schedule a new job if any doc there changes.
Good idea. I will do this then. I still need the info index to store error information though. |
We can do that. Yaliang may add something to the index for his workbench. Will collect his idea on the name as well.
ok. a new index it is then. one minor thing regarding naming: i generally avoid names like info/data, for obvious reasons in software. state is good, or i wouldn't mind a name like internal state to be clear and explicit.
What're the "obvious reasons in software"? |
in software, code directs data. one should be surprised if a domain term is not data/info. Consider a book object vs a book data object. there is no information gain in stating the obvious. |
e33265c to 429b2e4 (compare)
Changed to .opendistro-anomaly-state.
Sudipto has some comments on the AD workbench; I will discuss with him today. How about we split this big code change into several smaller ones? I think we need more discussion about the new index. How about we first make changes to support the progress bar based on the current AD result index, to mitigate the long initialization problem?
We cannot support the progress bar based on the current AD result index. What's undecided besides the index name? We can easily change the name. What's an easy way to split this? Half of the PR is tests. I have mainly three parts: one is the change to store to the new index, one is a new transport action to support fetching RCF updates, and another is the changes in the consumer (the profile API). If I split this way, one PR has roughly 1000 lines, another has 600 lines, and another has 1400 lines. Are you good with that?
first round of screening
src/main/java/com/amazon/opendistroforelasticsearch/ad/indices/AnomalyDetectionIndices.java
src/main/java/com/amazon/opendistroforelasticsearch/ad/ml/ModelManager.java
src/main/java/com/amazon/opendistroforelasticsearch/ad/model/InitProgressProfile.java
src/main/java/com/amazon/opendistroforelasticsearch/ad/model/InitProgressProfile.java
finished the first pass.
one idea to maybe simplify the work is retrieving/computing progress this way:
1. the handler of the rest request will check the current indexed state
   1.a if the model is completed, there is no need for more updates; just return the state in the response
   1.b if the model is not completed, do step 2
2. the handler pulls the partition 0 checkpoint to retrieve the latest update count; if the count has changed from the count in step 1, update and index the new state and return the new state in the response; else, just return the state in the response.
therefore, after at most 256 updates, the state field will be final and no longer require updating or indexing.
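To make the suggested flow concrete, here is a minimal, self-contained sketch; a plain map stands in for the state index and a callback for the partition-0 checkpoint lookup, so none of the class or method names below are from the actual plugin.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.ToLongFunction;

// Sketch of the "compute init progress on request" idea. A map stands in for the
// state index and a function for the partition-0 checkpoint lookup; names are illustrative.
public class InitProgressSketch {
    private static final long FULL_UPDATES = 128; // RCF starts emitting scores after 128 updates

    private final Map<String, Long> indexedUpdates = new ConcurrentHashMap<>();
    private final ToLongFunction<String> partitionZeroUpdateCount;

    public InitProgressSketch(ToLongFunction<String> partitionZeroUpdateCount) {
        this.partitionZeroUpdateCount = partitionZeroUpdateCount;
    }

    public long getProgressPercent(String detectorId) {
        long stored = indexedUpdates.getOrDefault(detectorId, 0L);
        if (stored >= FULL_UPDATES) {
            return 100;                                    // 1.a: final; no further lookups or indexing
        }
        long latest = partitionZeroUpdateCount.applyAsLong(detectorId); // 2: read the checkpoint
        if (latest != stored) {
            indexedUpdates.put(detectorId, latest);        // update and "index" the new state
            stored = latest;
        }
        return Math.min(stored, FULL_UPDATES) * 100 / FULL_UPDATES;
    }
}
```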
src/main/java/com/amazon/opendistroforelasticsearch/ad/ml/ModelManager.java
src/main/java/com/amazon/opendistroforelasticsearch/ad/model/DetectorInternalState.java
src/main/java/com/amazon/opendistroforelasticsearch/ad/transport/ADStateManager.java
src/main/java/com/amazon/opendistroforelasticsearch/ad/transport/ADStateManager.java
 *
 * @param flags flags to be maintained
 */
void maintenanceFlag(ConcurrentHashMap<String, Instant> flags) {
minor. this method should be private
I wrote unit tests for it. That's why I am not using private.
try to avoid a) leaking implementation in the class interface and b) building brittle implementation-based tests.
duly noted. Having package-private for tests is not uncommon. People invented the @VisibleForTesting annotation for that.
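For reference, the pattern being described looks roughly like this; it is a generic illustration rather than the plugin's actual class, and it assumes Guava's @VisibleForTesting is on the classpath (otherwise a project-local annotation serves the same documentation purpose).

```java
import java.time.Instant;
import java.util.concurrent.ConcurrentHashMap;

import com.google.common.annotations.VisibleForTesting;

public class StateMaintainer {
    // Package-private (no modifier) so tests in the same package can exercise it directly;
    // the annotation only documents that the relaxed visibility exists for tests.
    @VisibleForTesting
    void maintenanceFlag(ConcurrentHashMap<String, Instant> flags) {
        // Illustrative body only: drop entries whose timestamp has passed.
        flags.entrySet().removeIf(entry -> entry.getValue().isBefore(Instant.now()));
    }
}
```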
that's better left for legacy code. for new code, it's better not to start with it.
...in/java/com/amazon/opendistroforelasticsearch/ad/transport/AnomalyResultTransportAction.java
src/main/java/com/amazon/opendistroforelasticsearch/ad/util/ThrowingConsumerWrapper.java
...in/java/com/amazon/opendistroforelasticsearch/ad/transport/handler/DetectorStateHandler.java
src/main/java/com/amazon/opendistroforelasticsearch/ad/AnomalyDetectorJobRunner.java
finished the first pass.
one idea to maybe simplify the work is retrieving/computing progress this way:
1. the handler of the rest request will check the current indexed state
   1.a if the model is completed, there is no need for more updates; just return the state in the response
   1.b if the model is not completed, do step 2
2. the handler pulls the partition 0 checkpoint to retrieve the latest update count; if the count has changed from the count in step 1, update and index the new state and return the new state in the response; else, just return the state in the response.
therefore, after at most 256 updates, the state field will be final and no longer require updating or indexing.
I'll reply when you answered my other question elsewhere.
src/main/java/com/amazon/opendistroforelasticsearch/ad/AnomalyDetectorJobRunner.java
src/main/java/com/amazon/opendistroforelasticsearch/ad/indices/AnomalyDetectionIndices.java
src/main/java/com/amazon/opendistroforelasticsearch/ad/ml/ModelManager.java
src/main/java/com/amazon/opendistroforelasticsearch/ad/ml/ModelManager.java
src/main/java/com/amazon/opendistroforelasticsearch/ad/model/DetectorInternalState.java
src/main/java/com/amazon/opendistroforelasticsearch/ad/transport/ADStateManager.java
...in/java/com/amazon/opendistroforelasticsearch/ad/transport/AnomalyResultTransportAction.java
...in/java/com/amazon/opendistroforelasticsearch/ad/transport/handler/DetectorStateHandler.java
src/main/java/com/amazon/opendistroforelasticsearch/ad/util/ThrowingConsumerWrapper.java
src/main/java/com/amazon/opendistroforelasticsearch/ad/AnomalyDetectorPlugin.java
src/main/java/com/amazon/opendistroforelasticsearch/ad/model/DetectorInternalState.java
src/main/java/com/amazon/opendistroforelasticsearch/ad/model/DetectorInternalState.java
src/main/java/com/amazon/opendistroforelasticsearch/ad/transport/ADStateManager.java
src/main/java/com/amazon/opendistroforelasticsearch/ad/transport/ADStateManager.java
...in/java/com/amazon/opendistroforelasticsearch/ad/transport/AnomalyResultTransportAction.java
src/main/java/com/amazon/opendistroforelasticsearch/ad/transport/RCFPollingTransportAction.java
try {
    Instant time = entry.getValue();
    if (time.plus(stateTtl).isBefore(clock.instant())) {
        flags.remove(detectorId);
We can save some memory for one detection interval if we remove the detector from currentCheckpoints, but that needs another index query to get the checkpoint, as almost every detector run will try to get the checkpoint. Can we just remove it when the detector is stopped/deleted?
state maintenance would remove it from memory. We don't have to do it manually.
state maintenance would remove it from memory. We don't have to do it manually.

Yeah, the question is: do we have to add some TTL to the checkpoint cache and remove it periodically? Can we simplify the logic to just remove one detector's checkpoint cache when the detector is stopped/deleted?
One case where this periodic cleanup is useful: the scheduled job's shard relocates and we don't use the coordinating node anymore. Before the detector gets deleted/stopped, that memory would stay there.
ab8c592 to 9d018b9 (compare)
This PR adds init_progress to the profile API. init_progress helps users track the initialization percentage, needed shingles, and estimated time to go if the future data stream is continuous (no missing data). Initialization percentage measures how far we are from observing RCF models emitting scores. The implementation fetches the RCF model's total updates while the AD job is running and materializes the value to the newly added index .opendistro-anomaly-info. Total updates records the number of times this RCF model has been updated.

Initialization percent is computed as x/128:
* if total updates > 128, x = 128; otherwise, x is the total updates
* 128 is the "output after" number in RCF: after observing 128 samples, RCF starts emitting scores

Needed shingles are computed as 128 - x. Estimated minutes to go is computed as needed shingles * detector interval.

This PR also materializes the error message of the most recent run to speed up the profile API's error fetching.

During each AD execution, we also check if a checkpoint is there (the result is saved and maintained like other AD states); if yes, we cold start immediately.

Testing done:
1. adds unit tests
2. ran e2e tests to verify the init_progress number makes sense
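As a standalone illustration of the arithmetic described above (not the plugin's actual profile code), with an assumed example of 48 total updates and a 10-minute detector interval:

```java
// Standalone illustration of the init_progress arithmetic; numbers are an assumed example.
public final class InitProgressMath {
    private static final long OUTPUT_AFTER = 128; // RCF emits scores after 128 updates

    public static void main(String[] args) {
        long totalUpdates = 48;            // example: the RCF model has been updated 48 times
        long detectorIntervalMinutes = 10; // example detector interval

        long x = Math.min(totalUpdates, OUTPUT_AFTER);
        long percent = 100 * x / OUTPUT_AFTER;                                // 37 (48/128)
        long neededShingles = OUTPUT_AFTER - x;                               // 80
        long estimatedMinutesToGo = neededShingles * detectorIntervalMinutes; // 800

        System.out.printf("init_progress=%d%%, needed_shingles=%d, estimated_minutes_left=%d%n",
            percent, neededShingles, estimatedMinutesToGo);
    }
}
```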
if (profilesToCollect.contains(ProfileName.INIT_PROGRESS)) {
    listener.onResponse(new DetectorProfile.Builder().build());
the status will be incorrect for a stopped detector?
This means init progress is empty for a stopped detector.
if the status of the model is completed, i would think the profile init status should also be completed to reflect the truth, whether the detector is running or stopped. otherwise, it might cause confusion that stopping a detector deletes the model or uncertainty about the model of a stopped detector.
long lastUpdateTimeMs = detectorState.getLastUpdateTime().toEpochMilli();

// if state index hasn't been updated, we should not use the error field
// For example, before enabled, error is stopped due to, we should not show
minor. i have difficulty in understanding this sentence.
Is the following easier to read?
// if state index hasn't been updated, we should not use the error field
// For example, before enabled, error is "stopped due to blah", we should not show
// this when the detector is enabled.
ok. now i understand it. a more complete sentence will be clearer.
For example, before (the detector is) enabled, if the error message contains the phrase "stopped due to blah", ....
will change.
} else {
    checkpointDao
        .getModelCheckpoint(
this will retrieve the entire model checkpoint (with many data samples that are not used), recompute all trees from the samples, and only read one field. it would be a more costly operation than reading one field from the state. it's worth more consideration whether this is scalable for get profile requests.
yeah, fortunately this is a one-time thing and we don't have to do it repeatedly. Also, when the shingle is ready later, the model can be used immediately. Let me know if you have any way to improve this.
i am worried that for every get profile call, the model gets pulled, loaded, and hosted.
some optimizations (a sketch of the first one follows this list):
- there is not much to save from getting the checkpoint, but the checkpoint doesn't need to be fully deserialized, i.e., the bulk of the model, the trees, is not needed. a lightweight deserialization involving only the forest's update count can be added to avoid recomputing trees and then discarding them.
- if the model is not hosted before deserialization, it doesn't need to be hosted, since it's not being used by active jobs; this saves memory space and a write back to the index later during maintenance.
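A sketch of the lightweight-deserialization idea from the first bullet. It assumes, purely for illustration, that the checkpoint document is JSON containing a total_updates field; the plugin's real checkpoint serialization may look quite different, and Jackson is used here only because it supports streaming reads.

```java
import java.io.IOException;

import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.core.JsonToken;

// Stream-parse a checkpoint and stop at the update count instead of rebuilding the forest.
// The "total_updates" field name and the JSON layout are assumptions for this sketch.
public final class CheckpointPeek {
    private static final JsonFactory FACTORY = new JsonFactory();

    public static long readTotalUpdates(String checkpointJson) throws IOException {
        try (JsonParser parser = FACTORY.createParser(checkpointJson)) {
            while (parser.nextToken() != null) {
                if (parser.getCurrentToken() == JsonToken.FIELD_NAME
                    && "total_updates".equals(parser.getCurrentName())) {
                    parser.nextToken();           // move to the numeric value
                    return parser.getLongValue(); // no trees deserialized or recomputed
                }
            }
        }
        return 0L; // field absent: treat as not yet initialized
    }
}
```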
The model does not need to be pulled, loaded, and hosted for every get profile call. Only the first profile call if the model hasn't been hosted.
"rcf_updates": { | ||
"type": "integer" | ||
}, |
this field is currently not used in this version
good catch. Removed
TransportState state = transportStates.get(adID);
if (state == null) {
    state = new TransportState(adID);
    transportStates.put(adID, state);
}
minor. ConcurrentHashMap::computeIfAbsent is the api for this case.
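The suggested refactor would look roughly like this; TransportState here is a minimal stand-in for the plugin's class, kept only to make the sketch self-contained.

```java
import java.util.concurrent.ConcurrentHashMap;

// computeIfAbsent replaces the get / null-check / put sequence and is atomic on ConcurrentHashMap.
class TransportState {
    final String adID;
    TransportState(String adID) { this.adID = adID; }
}

class TransportStateCache {
    private final ConcurrentHashMap<String, TransportState> transportStates = new ConcurrentHashMap<>();

    TransportState getOrCreate(String adID) {
        return transportStates.computeIfAbsent(adID, TransportState::new);
    }
}
```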
changed.
src/main/java/com/amazon/opendistroforelasticsearch/ad/transport/TransportStateManager.java
DetectorInternalState newState = null;
if (state == null) {
    newState = new DetectorInternalState.Builder().error(error).lastUpdateTime(Instant.now()).build();
} else if ((state.getError() == null && error != null) || (state.getError() != null && !state.getError().equals(error))) {
is it the same as Objects::equals ?
yes, refactored. Thanks :)
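A small standalone check (not plugin code) showing that the hand-written null-safe comparison and Objects.equals agree for every null/non-null combination:

```java
import java.util.Objects;

public final class ErrorComparisonCheck {
    // The original two-branch condition from the snippet above.
    static boolean changedManually(String recorded, String current) {
        return (recorded == null && current != null)
            || (recorded != null && !recorded.equals(current));
    }

    // The refactored form.
    static boolean changedWithObjectsEquals(String recorded, String current) {
        return !Objects.equals(recorded, current);
    }

    public static void main(String[] args) {
        String[] samples = { null, "error A", "error B" };
        for (String recorded : samples) {
            for (String current : samples) {
                if (changedManually(recorded, current) != changedWithObjectsEquals(recorded, current)) {
                    throw new AssertionError("mismatch for " + recorded + ", " + current);
                }
            }
        }
        System.out.println("both formulations agree");
    }
}
```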
clientUtil.<GetRequest, GetResponse>asyncRequest(getRequest, client::get, ActionListener.wrap(response -> {
    DetectorInternalState newState = null;
can error be part of transport state and trigger indexing when updated? so a get can be saved for each run.
The error would be saved in a coordinating node's state. We have no easy way to find the coordinating node for a detector except broadcasting. Any way to work around this?
On second thought, what you said makes sense. I'll keep the last error in transport state and access it in DetectorStateHandler.
if after each run, the detector coordinator itself, holding the anomaly result and error, manages detector error state rather than letting the job scheduler do it, does it still require messaging?
You mean: does it still require indexing into the state index?
Yes, the state can be cleared due to maintenance, or the coordinating node can change. We need a permanent place to host the data.
address/reply to Lai's comments
@@ -436,7 +442,8 @@ private void indexAnomalyResult(
    Instant.now(),
    response.getError()
);
anomalyResultHandler.indexAnomalyResult(anomalyResult);
anomalyResultHandler.index(anomalyResult, detectorId);
detectorStateHandler.saveError(response.getError(), detectorId);
are there any reasons not to invert the dependency, i.e., letting the detector state handler be a dependency within the state manager? there are two benefits:
- it removes the unnecessary coupling that the state manager and job runner for the same detector are on the same node. if/when the two run on separate nodes, error information doesn't need to be replicated.
- since the state manager already has the error, directly handing off/delegating to the handler is more natural than giving the info to the job runner and letting the job runner give it to the handler.
The saveError method needs to be called by a source that runs regularly; the job runner is that source. Both the state manager and the detector state handler react to the job runner.
About the two benefits:
- with or without my changes, the state manager and job runner for the same detector are on the same node.
- the state manager has the error because the job runner gives the error to the state manager. the job runner can give the error to the state manager because the job runner has access to detectorStateHandler.
- if the job runner (sender) and state manager (responder) are on separate nodes for better scalability in the future, why let this change add an unnecessary coupling to prevent that?
- the state manager is already a dependency of the transport action, which generates errors first-hand; why not let the transport action directly update the state manager, which updates the handler (the chain of command is job runner > transport action > state manager > state handler), rather than taking a longer route involving the high-level job runner for more lower-level details, when the job runner should only be responsible for sending job requests on schedule and have as little business logic as possible?
- I don't see that coming in the near future. The state manager is saving the state of the job runner. It makes no sense to save it on a different node.
- detectorStateHandler and anomalyResultHandler are both dependencies of the job runner. Previously, the logic of anomalyResultHandler was in the transport action, as you suggested. Yaliang separated it out and moved it to the job runner. It is easier to read and more consistent to keep it the way it is now.
hi, Lai, good questions. Before decoupling, we had no job runner and we handled all business logic and exceptions in the transport action, as it stands on top of other components. After decoupling, the job runner is the top component, so we moved the exception handling to the job runner to avoid scattering the exception handling logic across multiple components (transport actions, job runners, etc.). So the job runner will catch all exceptions, including its own, then decide the follow-up error handling, such as stopping the detector if there are not enough resources, and write the error to the AD result index. Now Kaituo adds detectorStateHandler, which will track the latest error. So it's the same design logic: do error handling in one place.
Of course, we may have other options, like creating a new error handler, passing it to every component to handle exceptions and persist to the AD result individually. Both options have pros and cons and need some tradeoff analysis. How about we keep the current design for this PR? We can discuss different options and solve the error handling in another PR if we have a better option.
this is not a blocker, as i have approved the pr.
it's good to use an interface to decouple business logic from work-execution dependencies. in other words, consider: if the job scheduler code (a work-execution dependency) changes/migrates to some other execution dependency, how much needs to be re-developed, copied and pasted, re-tested? can/should that work be further reduced? in the ideal case, the interface should serve to protect the business logic.
Thanks for the suggestion. Agree to "use interface to decouple business logic from work execution dependencies". I think we can restructure when we do the AD workbench, as it will change a lot of how detection tasks/jobs are scheduled, but we should not overdesign. Let's discuss more when we do the AD workbench.
@Override
public String executor() {
    return ThreadPool.Names.GENERIC;
is generic correct?
ThreadPool.Names.SAME is more appropriate as the operation is lightweight. Changed.
@@ -91,31 +92,31 @@ public ADStateManager(
 * @throws LimitExceededException when there is no sufficient resource available
 */
public int getPartitionNumber(String adID, AnomalyDetector detector) {
    Entry<Integer, Instant> partitonAndTime = partitionNumber.get(adID);
    if (partitonAndTime != null) {
    if (transportStates.containsKey(adID) && transportStates.get(adID).getPartitonNumber() != null) {
minor. computeIfPresent is more concise
computeIfPresent is for changing the mapping. Here, the mapping does not change; only a field of the mapping's value changes.
@@ -130,7 +131,13 @@ public void getAnomalyDetector(String adID, ActionListener<Optional<AnomalyDetec
) {
    ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.nextToken(), parser::getTokenLocation);
    AnomalyDetector detector = AnomalyDetector.parse(parser, response.getId());
    currentDetectors.put(adID, new SimpleEntry<>(detector, clock.instant()));
    TransportState state = transportStates.get(adID);
minor. computeIfAbsent is more concise.
changed
    return errorAndTime.getKey();
}

return IMPOSSIBLE_ERROR;
minor. this seems more like no_error.
changed
public void saveError(String error, String detectorId) {
    // trigger indexing if no error recorded (e.g., this detector got enabled just now)
    // or the recorded error is different than this one.
Should we update error to record the latest error time if the error message is the same?
We need the last update time to know whether the error was recorded before or after the enabled time. The current code path can handle that, so we don't need to record the latest error time if the error message is the same.
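A sketch of the staleness guard being described; the method and parameter names are assumed for illustration and are not the plugin's actual API.

```java
import java.time.Instant;

// Only surface a recorded error if the state document was written after the detector's
// latest enabled time; an error written before re-enabling (e.g. "stopped due to ...") is stale.
public final class ErrorFreshness {
    public static String errorToShow(String recordedError, Instant stateLastUpdateTime, Instant jobEnabledTime) {
        if (recordedError == null || stateLastUpdateTime == null || jobEnabledTime == null) {
            return null;
        }
        return stateLastUpdateTime.isAfter(jobEnabledTime) ? recordedError : null;
    }

    public static void main(String[] args) {
        Instant enabled = Instant.parse("2020-06-01T00:00:00Z");
        System.out.println(errorToShow("stopped due to blah", enabled.minusSeconds(60), enabled));  // null (stale)
        System.out.println(errorToShow("cannot get checkpoint", enabled.plusSeconds(60), enabled)); // shown
    }
}
```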
@@ -141,6 +141,7 @@ private AnomalyDetectorSettings() {}
public static final String ANOMALY_DETECTORS_INDEX_MAPPING_FILE = "mappings/anomaly-detectors.json";
public static final String ANOMALY_DETECTOR_JOBS_INDEX_MAPPING_FILE = "mappings/anomaly-detector-jobs.json";
public static final String ANOMALY_RESULTS_INDEX_MAPPING_FILE = "mappings/anomaly-results.json";
public static final String ANOMALY_DETECTOR_STATE_INDEX_MAPPING_FILE = "mappings/anomaly-state.json";
Minor: how about renaming the JSON file to "anomaly-detection-state.json"? In the AD workbench, we will introduce detection tasks and plan to put task execution state in this index, ".opendistro-anomaly-detection-state"; we will add more fields when the design is done.
BTW, how about we rename ANOMALY_DETECTOR_STATE_INDEX_MAPPING_FILE to ANOMALY_DETECTION_STATE_INDEX_MAPPING_FILE, similar to ANOMALY_DETECTOR_STATE_INDEX?
changed both
@@ -119,6 +121,10 @@ public void setSettings(Settings settings) {
    this.maxRetryForEndRunException = AnomalyDetectorSettings.MAX_RETRY_FOR_END_RUN_EXCEPTION.get(settings);
}

public void setDetectorStateHandler(DetectorStateHandler detectorStateHandler) {
minor, should we rename setDetectorStateHandler to setDetectionStateHandler? Same question for other ...detectorState...
renamed
LGTM. Thanks for the change!
* Adds initialization progress to profile API
Issue #, if available:
#148
Description of changes:
This PR adds init_progress to the profile API. init_progress helps users track the initialization percentage, needed shingles, and estimated time to go if the future data stream is continuous (no missing data).
Initialization percentage measures how far we are from observing RCF models emitting scores. The implementation fetches the RCF model's total updates while the AD job is running and materializes the value to the newly added index .opendistro-anomaly-info. Total updates records the number of times this RCF model has been updated.
Initialization percent is computed as x/128:
* if total updates > 128, x = 128; otherwise, x is the total updates
* 128 is the "output after" number in RCF: after observing 128 samples, RCF starts emitting scores
Needed shingles are computed as 128 - x. Estimated minutes to go is computed as needed shingles * detector interval.
This PR also materializes the error message of the most recent run to speed up the profile API's error fetching.
During each AD execution, we also check if a checkpoint is there (the result is saved and maintained like other AD states); if yes, we cold start immediately.
Testing done:
1. adds unit tests
2. ran e2e tests to verify the init_progress number makes sense
Sample output:
By submitting this pull request, I confirm that you can use, modify, copy, and redistribute this contribution, under the terms of your choice.