Add transport action for model inference #249
Conversation
The coordinating node computes feature vectors for multiple entities within a time interval through a query, finds model host nodes using consistent hashing, and asks the model hosting nodes to infer the anomaly grade and confidence of each feature vector. This PR adds transport actions for model hosting nodes to infer a feature vector's anomaly grade and confidence. Model hosting nodes process entity features in a loop: we first check whether the entity's models are in memory; if yes, we use the models to infer the anomaly grade and save the results; if no, we skip the entity.
Testing done:
* added unit tests to cover the newly added transport actions
* end-to-end testing
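A minimal sketch of that per-entity loop, assuming hypothetical `cache`, `modelManager`, and `resultHandler` helpers (the PR's actual class names may differ):

```java
import java.util.Map;
import java.util.Map.Entry;

// Hypothetical sketch of the model-hosting node's inference loop.
void processEntities(String detectorId, Map<String, double[]> entities) {
    for (Entry<String, double[]> entity : entities.entrySet()) {
        String entityName = entity.getKey();
        // Only infer when the entity's models are already in memory.
        EntityModel model = cache.get(detectorId, entityName); // hypothetical cache lookup
        if (model == null) {
            continue; // model not loaded: skip this entity
        }
        // Infer anomaly grade and confidence from the feature vector, then save.
        ThresholdingResult result = modelManager.score(entity.getValue(), model); // hypothetical scorer
        resultHandler.write(detectorId, entityName, result);
    }
}
```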
Codecov Report
@@            Coverage Diff            @@
##             master     #249   +/-   ##
=========================================
  Coverage     73.01%   73.01%
  Complexity     1461     1461
=========================================
  Files           164      164
  Lines          6834     6834
  Branches        527      527
=========================================
  Hits           4990     4990
  Misses         1594     1594
  Partials        250      250
Flags with carried forward coverage won't be shown.
for (Entry<String, double[]> entity : request.getEntities().entrySet()) {
    String entityName = entity.getKey();
    // For ES, the limit of the document ID is 512 bytes.
    // truncate to 256 characters if too long since we are using it as part of document id.
This comment doesn't match the code logic below: line 137 skips the entity if its name is too long; it does not truncate to 256 characters.
good catch. fixed.
How do we handle a long document ID? Will ES throw an exception if the document ID exceeds the length limit?
Not sure. The docs say it can handle at most 512 bytes.
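For context, a hedged sketch of the skip-if-too-long guard the fixed comment refers to; the 256-character threshold comes from the original comment, and the surrounding names are illustrative:

```java
// ES limits document IDs to 512 bytes. Since the entity name becomes part of
// the document ID, skip entities whose name is too long instead of truncating.
private static final int MAX_ENTITY_NAME_LENGTH = 256; // illustrative threshold

// inside the entity loop:
if (entityName.length() > MAX_ENTITY_NAME_LENGTH) {
    LOG.warn("Skip entity {} because its name is longer than {} characters",
        entityName, MAX_ENTITY_NAME_LENGTH);
    continue;
}
```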
}
this.anomalyResultHandler.flush(currentBulkRequest, detectorId);
// bulk all accumulated checkpoint requests
this.checkpointDao.flush();
How about adding a listener as a parameter of the flush function? If the flush succeeds, execute listener.onResponse(new AcknowledgedResponse(true)); if it fails, execute listener.onFailure. Otherwise the user never knows whether the flush succeeded or not.
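A sketch of the proposed signature, using the PR's `ADResultBulkRequest` and an assumed `ADResultBulkAction`; this reflects the reviewer's suggestion, not the merged code:

```java
// Proposed (not merged): surface flush success/failure to the caller.
public void flush(ADResultBulkRequest bulkRequest, ActionListener<AcknowledgedResponse> listener) {
    client.execute(ADResultBulkAction.INSTANCE, bulkRequest, ActionListener.wrap(
        response -> listener.onResponse(new AcknowledgedResponse(true)),
        listener::onFailure
    ));
}
```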
I didn't do it because:
First, flush is asynchronous; we don't know when it is going to finish. In our performance testing, we found our queue had 70k checkpoints. Holding the job too long may fail the following jobs.
Second, I don't know what the action item would be for customers if they knew their checkpoints failed. Mostly they are agnostic to checkpoints.
> We don't know when it is going to finish. In our performance testing, we found our queue had 70k checkpoints. Holding the job too long may fail the following jobs.

Makes sense. Do we catch the exception in checkpointDao and write it to the AD result?

> what's the action item for customers if they know their checkpoints fail

Users would know the system is under pressure; they can:
1. Scale up/out their cluster.
2. Stop some testing/low-priority detectors.
3. Tune the detector configuration, e.g., use fewer features, adjust the detector interval, etc.
We don't in checkpointDao; we write to logs. Writing to the state index might be an option: we could have a field called checkpointError. How's that?
The action items you mentioned are great.
I think it's OK to track that in the state index.
Will do it after the release.
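A hypothetical sketch of that follow-up: on checkpoint failure, upsert a checkpointError field into the detector's state index (the index name and helper method are assumptions, not merged code):

```java
// Hypothetical: record checkpoint write failures in the state index
// under a "checkpointError" field instead of only logging them.
void onCheckpointFailure(String detectorId, Exception e) {
    LOG.error("Checkpoint write failed for detector " + detectorId, e);
    UpdateRequest update = new UpdateRequest(".opendistro-anomaly-detection-state", detectorId) // assumed index name
        .doc(Map.of("checkpointError", e.getMessage()))
        .docAsUpsert(true);
    client.update(update, ActionListener.wrap(
        r -> LOG.debug("Recorded checkpoint error for detector {}", detectorId),
        ex -> LOG.warn("Failed to record checkpoint error for detector " + detectorId, ex)
    ));
}
```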
// result.getRcfScore() = 0 means the model is not initialized
// result.getGrade() = 0 means it is not an anomaly
// Writing regardless causes many EsRejectedExecutionExceptions
if (result.getRcfScore() > 0 && (!onlySaveAnomalies || result.getGrade() > 0)) {
If the model is not initialized (rcfScore == 0) for a long time, e.g., due to not enough data or some error, how does the user know what's going on if we don't write an AD result? The init progress bar is only for the most active entity.
If the entity is in cache, profile can call getTotalUpdates(String detectorId, String entityId). If not, the profile API has to go to a checkpoint, load it into memory, and check its total updates.
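A sketch of that lookup path; the `isActive` check, the checkpoint-restore helper, and the model accessors are assumed names:

```java
// Sketch: report an entity model's total updates for init progress.
void getEntityUpdates(String detectorId, String entityId, ActionListener<Long> listener) {
    if (cache.isActive(detectorId, entityId)) { // assumed cache check
        // Hot path: the model is in memory.
        listener.onResponse(cache.getTotalUpdates(detectorId, entityId));
    } else {
        // Cold path: load the persisted checkpoint and read its update count.
        checkpointDao.restoreModelCheckpoint(detectorId, entityId, ActionListener.wrap(
            model -> listener.onResponse(model.getRcf().getTotalUpdates()), // assumed accessors
            listener::onFailure
        ));
    }
}
```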
Cache/checkpoint will be cleared periodically. I have some concerns for Ops: if users want to know what happened, we can only rely on the service log. We can address this in the next phase; it's OK for now if we have enough logs.
agreed.
    listener.onResponse(new AcknowledgedResponse(true));
}, exception -> {
    LOG.error("fail to get entity's anomaly grade", exception);
Suggest adding the detector ID and the start/end times to the error message for easier log analysis.
added
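The amended log line presumably looks something like this; `detectorId` and the request's `getStart()`/`getEnd()` accessors are assumptions about the surrounding code:

```java
// Sketch: include detector ID and the query window in the failure log.
exception -> LOG.error(String.format(java.util.Locale.ROOT,
    "fail to get entity's anomaly grade for detector [%s], start [%d], end [%d]",
    detectorId, request.getStart(), request.getEnd()), exception)
```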
public class EntityResultAction extends ActionType<AcknowledgedResponse> {
    public static final EntityResultAction INSTANCE = new EntityResultAction();
    public static final String NAME = "cluster:admin/ad/entity/result";
Let's follow our opendistro convention for all our actions; you can take a look at other examples. Maybe "cluster:admin/opendistro/ad/entity/result"?
changed
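After the rename, the class likely looks like this; the constructor body is the standard ActionType pattern, assumed here rather than copied from the PR:

```java
public class EntityResultAction extends ActionType<AcknowledgedResponse> {
    public static final EntityResultAction INSTANCE = new EntityResultAction();
    // Renamed to follow the opendistro convention.
    public static final String NAME = "cluster:admin/opendistro/ad/entity/result";

    private EntityResultAction() {
        super(NAME, AcknowledgedResponse::new);
    }
}
```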
String categoricalField = detector.getCategoryField().get(0);

ADResultBulkRequest currentBulkRequest = new ADResultBulkRequest();
// index pressure is high. Only save anomalies
Just curious, how did we determine the index pressure is high?
We get exceptions from ES if index pressure is high.
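A sketch of that signal, assuming a simple flag flipped by the bulk-failure handler; the real PR may gate this differently:

```java
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;

// If ES rejects bulk writes, treat it as index pressure and shed load by
// saving only anomalous results until pressure subsides (illustrative flag).
private volatile boolean onlySaveAnomalies = false;

private void onBulkFailure(Exception e) {
    if (ExceptionsHelper.unwrapCause(e) instanceof EsRejectedExecutionException) {
        onlySaveAnomalies = true;
    }
}
```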
LGTM. Thanks for the change!
Thanks for taking care of this. Changes look good to me.
This PR is a conglomerate of the following PRs: opendistro-for-elasticsearch#247 opendistro-for-elasticsearch#249 opendistro-for-elasticsearch#250 opendistro-for-elasticsearch#252 opendistro-for-elasticsearch#253 opendistro-for-elasticsearch#256 opendistro-for-elasticsearch#257 opendistro-for-elasticsearch#258 opendistro-for-elasticsearch#259 opendistro-for-elasticsearch#260 opendistro-for-elasticsearch#261 opendistro-for-elasticsearch#262 opendistro-for-elasticsearch#263 opendistro-for-elasticsearch#264 opendistro-for-elasticsearch#265 opendistro-for-elasticsearch#266 opendistro-for-elasticsearch#267 opendistro-for-elasticsearch#268 opendistro-for-elasticsearch#269. This spreadsheet contains the mappings from files to PR numbers: https://quip-amazon.com/DiHkAmz9oSLu/HC-PR
Testing done:
1. Added unit tests except for four classes (excluded in build.gradle); will add them in a later PR.
2. Manual testing passes.
* Add support for filtering the data by one categorical variable
Note: since there are a lot of dependencies, I only list the main class and test code to save reviewers' time. The build will fail due to missing dependencies. I will use this PR just for review and will not merge it; there will be one big PR at the end, merged once all the review PRs are approved.