Add entity model training #247
Conversation
This PR adds entity model training functions. When the models are missing from both the cache and the checkpoint, we need to run queries to get training data and then train models. To collect training data, we sample historical data and linearly interpolate data points between the samples. Specifically, we first note the maximum and minimum timestamps and sample at most 24 points (two neighboring samples are 60 points apart) between those minimum and maximum timestamps. Samples can be missing; we only interpolate points between neighboring samples that are present. We then transform the samples and interpolated points into shingles. Finally, the full shingles are used for cold start. Testing done: 1. added unit tests 2. end-to-end testing
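To make the interpolation step concrete, here is a minimal sketch of linear interpolation between two present neighboring samples (illustrative only; the class and method names are not the plugin's actual APIs). Only gaps between two present samples are filled; a missing endpoint means that gap is skipped.

```java
import java.util.ArrayList;
import java.util.List;

public class InterpolationDemo {
    // Linearly interpolate between two present neighboring samples,
    // producing numPoints points including both endpoints (numPoints >= 2).
    static List<double[]> interpolate(double[] left, double[] right, int numPoints) {
        List<double[]> result = new ArrayList<>(numPoints);
        int dim = left.length;
        for (int i = 0; i < numPoints; i++) {
            double fraction = (double) i / (numPoints - 1);
            double[] point = new double[dim];
            for (int d = 0; d < dim; d++) {
                point[d] = left[d] + (right[d] - left[d]) * fraction;
            }
            result.add(point);
        }
        return result;
    }

    public static void main(String[] args) {
        // Two present samples; interpolate 5 points including both endpoints.
        List<double[]> points = interpolate(new double[] { 0.0 }, new double[] { 4.0 }, 5);
        points.forEach(p -> System.out.println(p[0])); // prints 0.0, 1.0, 2.0, 3.0, 4.0
    }
}
```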
Codecov Report
```
@@             Coverage Diff              @@
##             master     #247      +/-   ##
============================================
+ Coverage     71.70%   72.81%   +1.10%
- Complexity     1367     1464      +97
============================================
  Files           157      164       +7
  Lines          6513     6867     +354
  Branches        493      533      +40
============================================
+ Hits           4670     5000     +330
- Misses         1610     1615       +5
- Partials        233      252      +19
```
Flags with carried forward coverage won't be shown.
```java
 * @param interpolator Used to generate data points between samples.
 * @param searchFeatureDao Used to issue ES queries.
 * @param shingleSize The size of a data point window that appear consecutively.
 * @param thresholdMinPvalue min P-value for thresholding
```
minor. indentation is off.
fixed
```java
double[] scores = new double[dataPoints.length];
Arrays.fill(scores, 0.);
```
minor. this line is not needed.
removed
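As background for this thread (a tiny illustrative snippet, not code from the PR): Java zero-initializes numeric arrays on allocation, so the Arrays.fill call adds nothing.

```java
import java.util.Arrays;

public class ZeroInitDemo {
    public static void main(String[] args) {
        double[] scores = new double[5];
        // Numeric arrays are zero-initialized on allocation,
        // so Arrays.fill(scores, 0.) would be a no-op here.
        System.out.println(Arrays.toString(scores)); // [0.0, 0.0, 0.0, 0.0, 0.0]
    }
}
```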
```java
EntityModel model = entityState.getModel();
assert (model != null);
```
why not set a new model in entityState?
The model might already contain samples passed from upstream callers.
i mean if model is null, why not set a new model?
good idea. Changed.
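A minimal sketch of the change discussed here, using simplified stand-ins for EntityModel and ModelState (the real classes and constructors in the plugin differ): if the state has no model yet, create one and attach it instead of asserting.

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Simplified stand-ins for the plugin's EntityModel / ModelState, just to show the pattern.
class EntityModel {
    final String modelId;
    final Queue<double[]> samples;
    EntityModel(String modelId, Queue<double[]> samples) {
        this.modelId = modelId;
        this.samples = samples;
    }
}

class ModelState {
    private EntityModel model;
    EntityModel getModel() { return model; }
    void setModel(EntityModel model) { this.model = model; }
}

public class NullModelDemo {
    public static void main(String[] args) {
        ModelState entityState = new ModelState();
        String modelId = "entity-1";
        // Instead of asserting non-null, create and attach a fresh model when none exists,
        // so samples from upstream callers and cold start data end up in one place.
        EntityModel model = entityState.getModel();
        if (model == null) {
            model = new EntityModel(modelId, new ArrayDeque<>());
            entityState.setModel(model);
        }
        System.out.println("model id: " + model.modelId);
    }
}
```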
```java
double[] scores = trainRCFModel(continuousDataPoints, modelId, rcf);
allScores.add(scores);
```
suggestion. using a list for the train method and addAll to allScores saves much work below.
I tried. I end up with a List&lt;Double&gt; and have to convert to Double[], then double[]. I am not sure the amount of code is smaller, and boxing/unboxing back and forth is not efficient.
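For context on the boxing concern (an illustrative snippet, not the PR's code): keeping per-batch scores as primitive double[] arrays and concatenating them at the end avoids creating a Double object per score, at the cost of the small concatenation loop below.

```java
import java.util.ArrayList;
import java.util.List;

public class ScoreConcatDemo {
    // Concatenate per-batch primitive score arrays into one array without boxing.
    static double[] concat(List<double[]> allScores) {
        int total = 0;
        for (double[] batch : allScores) {
            total += batch.length;
        }
        double[] combined = new double[total];
        int offset = 0;
        for (double[] batch : allScores) {
            System.arraycopy(batch, 0, combined, offset, batch.length);
            offset += batch.length;
        }
        return combined;
    }

    public static void main(String[] args) {
        List<double[]> allScores = new ArrayList<>();
        allScores.add(new double[] { 0.1, 0.2 });
        allScores.add(new double[] { 0.3 });
        System.out.println(concat(allScores).length); // 3
    }
}
```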
```java
entityState.setLastUsedTime(clock.instant());

// save to checkpoint
checkpointDao.write(entityState, modelId, true);
```
question. rather than saving each model individually, is it more efficient to do batch indexing?
I did. The write just writes to a buffer.
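A rough sketch of the buffered-write idea (illustrative only; the class and method names are placeholders, not the actual CheckpointDao API): per-model writes only append to an in-memory buffer, and a flush turns the buffered documents into a single bulk indexing request.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class BufferedCheckpointWriter {
    private final List<Map<String, Object>> buffer = new ArrayList<>();
    private final int bulkSize;

    BufferedCheckpointWriter(int bulkSize) {
        this.bulkSize = bulkSize;
    }

    // Called per model; cheap because it only buffers.
    void write(Map<String, Object> checkpointDoc) {
        buffer.add(checkpointDoc);
        if (buffer.size() >= bulkSize) {
            flush();
        }
    }

    // In the real plugin this step would translate the buffered documents
    // into one Elasticsearch bulk indexing request.
    void flush() {
        System.out.println("bulk indexing " + buffer.size() + " checkpoints");
        buffer.clear();
    }

    public static void main(String[] args) {
        BufferedCheckpointWriter writer = new BufferedCheckpointWriter(2);
        writer.write(Map.of("modelId", "entity-1"));
        writer.write(Map.of("modelId", "entity-2")); // triggers a flush
    }
}
```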
```java
}, listener::onFailure);

searchFeatureDao
    .getColdStartSamplesForPeriods(
```
are missing values from count/sum/etc aggregations filtered out?
I am reusing SearchFeatureDao.parseAggregations (https://github.com/opendistro-for-elasticsearch/anomaly-detection/blob/master/src/main/java/com/amazon/opendistroforelasticsearch/ad/feature/SearchFeatureDao.java#L561-L571) to parse each bucket. Missing values result in an empty bucket, which should give Optional.empty(), right? If so, those will be filtered out.
for count/sum/etc on missing data, the value of the bucket should be 0. A model trained with 0s from missing data will treat new non-zero data points as anomalies.
yeah, we should have some aggregation to differentiate the two cases: count with default 0 and count with missing values removed.
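To make the filtering path in this thread concrete (an illustrative snippet, not the PR's code): buckets parsed as Optional.empty() are dropped before training, which is why an aggregation that reports 0 instead of an empty bucket for missing data would slip through.

```java
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;

public class SampleFilterDemo {
    public static void main(String[] args) {
        // Samples parsed per bucket; a missing bucket is represented as Optional.empty().
        List<Optional<double[]>> featureSamples = List.of(
            Optional.of(new double[] { 1.0 }),
            Optional.empty(),                 // missing bucket, dropped below
            Optional.of(new double[] { 3.0 })
        );
        List<double[]> present = featureSamples
            .stream()
            .filter(Optional::isPresent)
            .map(Optional::get)
            .collect(Collectors.toList());
        System.out.println(present.size()); // 2
    }
}
```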
```java
ActionListener<List<Optional<double[]>>> getFeaturelistener = ActionListener.wrap(featureSamples -> {
    ArrayList<double[]> continuousSampledFeatures = new ArrayList<>(maxTrainSamples);

    // featuresSamples are in ascending order of time.
```
the time ranges in feature query are in descending order. when does the reversing happen?
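For readers following this question, one hypothetical way to restore ascending time order when the query ranges are built newest-first (illustrative only; the PR may handle the ordering elsewhere):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Optional;

public class ReverseSamplesDemo {
    public static void main(String[] args) {
        // Samples returned newest-first, matching descending query ranges.
        List<Optional<double[]>> featureSamples = new ArrayList<>(Arrays.asList(
            Optional.of(new double[] { 3.0 }),
            Optional.of(new double[] { 2.0 }),
            Optional.of(new double[] { 1.0 })
        ));
        // Reverse once so downstream code can assume ascending order of time.
        Collections.reverse(featureSamples);
        featureSamples.forEach(s -> System.out.println(s.get()[0])); // 1.0, 2.0, 3.0
    }
}
```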
```java
/**
 * TODO: make it work for shingle
```
isn't the given data already shingled data?
Current implementation only gives unshingled data.
isn't line 390/440 coldStartData.add(featureManager.batchShingle(points, entityShingleSize)) already producing shingled data?
yes, the entity state's samples are not shingled data.
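For readers following this thread, a rough sketch of what batch shingling produces (illustrative only; featureManager.batchShingle in the plugin may differ in details): each shingle concatenates shingleSize consecutive points into one flat vector, so only full windows yield shingles.

```java
public class ShingleDemo {
    // Build full shingles from consecutive points: shingle s concatenates
    // points[s] .. points[s + shingleSize - 1] into one flat vector.
    static double[][] batchShingle(double[][] points, int shingleSize) {
        int dim = points[0].length;
        int numShingles = points.length - shingleSize + 1;
        double[][] shingles = new double[numShingles][shingleSize * dim];
        for (int s = 0; s < numShingles; s++) {
            for (int j = 0; j < shingleSize; j++) {
                System.arraycopy(points[s + j], 0, shingles[s], j * dim, dim);
            }
        }
        return shingles;
    }

    public static void main(String[] args) {
        double[][] points = { { 1 }, { 2 }, { 3 }, { 4 } };
        double[][] shingles = batchShingle(points, 3);
        System.out.println(shingles.length + " x " + shingles[0].length); // 2 x 3
    }
}
```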
```java
 */
private void combineTrainSamples(List<double[][]> coldstartDatapoints, String modelId, ModelState<EntityModel> entityState) {
    EntityModel model = entityState.getModel();
    if (model != null) {
```
if this is null, why not set the model and keep the data?
good point. Changed.
```java
/**
 * TODO: make it work for shingle.
```
which part doesn't work for shingle?
We have to consider the timestamp of each data point for shingling.
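To illustrate what "considering the timestamp" could mean here (a hypothetical sketch, not the PR's implementation): only windows whose points are exactly one detection interval apart in time would be turned into shingles, so a gap breaks the window.

```java
import java.util.ArrayList;
import java.util.List;

public class TimestampAwareShingleDemo {
    // Only emit a shingle when shingleSize points are consecutive in time,
    // i.e. each neighboring pair is exactly intervalMillis apart.
    static List<double[]> shingle(long[] timestamps, double[][] points, int shingleSize, long intervalMillis) {
        List<double[]> shingles = new ArrayList<>();
        int dim = points[0].length;
        for (int end = shingleSize - 1; end < points.length; end++) {
            boolean consecutive = true;
            for (int j = end - shingleSize + 1; j < end; j++) {
                if (timestamps[j + 1] - timestamps[j] != intervalMillis) {
                    consecutive = false;
                    break;
                }
            }
            if (!consecutive) {
                continue;
            }
            double[] shingle = new double[shingleSize * dim];
            for (int j = 0; j < shingleSize; j++) {
                System.arraycopy(points[end - shingleSize + 1 + j], 0, shingle, j * dim, dim);
            }
            shingles.add(shingle);
        }
        return shingles;
    }

    public static void main(String[] args) {
        long[] timestamps = { 0, 60_000, 120_000, 300_000 }; // the last point has a gap
        double[][] points = { { 1 }, { 2 }, { 3 }, { 4 } };
        System.out.println(shingle(timestamps, points, 2, 60_000).size()); // 2
    }
}
```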
```java
/**
 * TODO: make it work for shingle
 * Precondition: we don't have enough training data.
```
question. is there an indicator that the historical data for an entity has been retrieved, or that retrieval has been attempted? if an entity gets only 10 samples, will it get the same 10 samples at a later time? Or, if there is no data for an entity, will the system try repeatedly?
We won't retry within one hour for an entity. Do you think this is long enough?
there will be some issues. In the case where samples are scarce, say just 1 data point, the repeated process means the training data only contains the same data points. If there is no data at all, this process will never end.
Yes. If the value is too large, we have a long initialization issue. If it is too small, we can waste resources. Given that our cold start is rate limited, the latter's impact is not that bad.
the main issue is incorrect training data. When there are 2 data points for an entity in the index, they will be added to the training data on the first run. On a second run later, the same data points will be added again, and again and again, until the required number of samples has been reached but all samples are just repeats.
I have a bloom filter to filter such cases. If a point only appears once or twice, it won't trigger cold start.
there are three main issues.
- feature query in cold start should not return 0 for count/sum/additive aggregation on missing data. The changes will be in a separate PR.
- cold start process might result in overlapping/duplicate samples. The issue is noted.
- shingling support is not implemented. The issue is noted.
Thanks Lai. Will put those things on the to-do list.
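A sketch of the bloom-filter "door keeper" mentioned above (illustrative only, assuming Guava's BloomFilter; the plugin's actual implementation may differ): an entity has to be seen at least twice before a cold start is triggered, which keeps one-off or empty entities from repeatedly kicking off training queries.

```java
import com.google.common.hash.BloomFilter;
import com.google.common.hash.Funnels;
import java.nio.charset.StandardCharsets;

public class DoorKeeperDemo {
    private final BloomFilter<String> seenOnce =
        BloomFilter.create(Funnels.stringFunnel(StandardCharsets.UTF_8), 1_000_000, 0.01);

    // Returns true only when the entity has (probably) been observed before.
    boolean shouldTriggerColdStart(String entityId) {
        if (seenOnce.mightContain(entityId)) {
            return true;
        }
        seenOnce.put(entityId);
        return false;
    }

    public static void main(String[] args) {
        DoorKeeperDemo doorKeeper = new DoorKeeperDemo();
        System.out.println(doorKeeper.shouldTriggerColdStart("entity-1")); // false (first sighting)
        System.out.println(doorKeeper.shouldTriggerColdStart("entity-1")); // true (seen before)
    }
}
```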
This PR is a conglomerate of the following PRs: opendistro-for-elasticsearch#247 opendistro-for-elasticsearch#249 opendistro-for-elasticsearch#250 opendistro-for-elasticsearch#252 opendistro-for-elasticsearch#253 opendistro-for-elasticsearch#256 opendistro-for-elasticsearch#257 opendistro-for-elasticsearch#258 opendistro-for-elasticsearch#259 opendistro-for-elasticsearch#260 opendistro-for-elasticsearch#261 opendistro-for-elasticsearch#262 opendistro-for-elasticsearch#263 opendistro-for-elasticsearch#264 opendistro-for-elasticsearch#265 opendistro-for-elasticsearch#266 opendistro-for-elasticsearch#267 opendistro-for-elasticsearch#268 opendistro-for-elasticsearch#269. This spreadsheet contains the mappings from files to PR numbers: https://quip-amazon.com/DiHkAmz9oSLu/HC-PR Testing done: 1. Add unit tests except for four classes (excluded in build.gradle); will add them in a later PR. 2. Manual testing passes.
* Add support for filtering the data by one categorical variable
Note: since there are a lot of dependencies, I only list the main class and test code to save reviewers' time. The build will fail due to missing dependencies. I will use that PR just for review and will not merge it. There will be one big PR in the end, merged once all review PRs get approved.