-
Notifications
You must be signed in to change notification settings - Fork 36
Add multi-entity checkpoints read and write #256
Conversation
We need checkpoints to save states and models on disk. In single-entity detectors, we store rcf and threshold models separately in different docs. In multi-entity detectors, we need to store them together as we don't use distributed models anymore. We also need to store recent sample history when the models are not ready. This PR adds functions to serialize models and samples together in one doc and deserialize them when needed. Also, we bulk indexing multi-entity detectors' checkpoints. Bulk requests will yield much better performance than single-document index requests Testing done: 1. added unit tests. 2. end-to-end testing
Codecov Report
@@ Coverage Diff @@
## master #256 +/- ##
=========================================
Coverage 73.01% 73.01%
Complexity 1461 1461
=========================================
Files 164 164
Lines 6834 6834
Branches 527 527
=========================================
Hits 4990 4990
Misses 1594 1594
Partials 250 250
Flags with carried forward coverage won't be shown. Click here to find out more.
|
src/main/java/com/amazon/opendistroforelasticsearch/ad/ml/CheckpointDao.java
Outdated
Show resolved
Hide resolved
src/main/java/com/amazon/opendistroforelasticsearch/ad/ml/CheckpointDao.java
Show resolved
Hide resolved
src/main/java/com/amazon/opendistroforelasticsearch/ad/ml/CheckpointDao.java
Show resolved
Hide resolved
src/main/java/com/amazon/opendistroforelasticsearch/ad/ml/CheckpointDao.java
Show resolved
Hide resolved
if (indexUtil.doesCheckpointIndexExist()) { | ||
saveModelCheckpointSync(source, modelId); | ||
} else { | ||
indexUtil.initCheckpointIndex(ActionListener.wrap(initResponse -> { | ||
if (initResponse.isAcknowledged()) { | ||
saveModelCheckpointSync(source, modelId); | ||
} else { | ||
throw new RuntimeException("Creating checkpoint with mappings call not acknowledged."); | ||
} | ||
}, exception -> { | ||
if (ExceptionsHelper.unwrapCause(exception) instanceof ResourceAlreadyExistsException) { | ||
// It is possible the index has been created while we sending the create request | ||
saveModelCheckpointSync(source, modelId); | ||
} else { | ||
logger.error(String.format("Unexpected error creating index %s", indexName), exception); | ||
} | ||
})); | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
looks like code is same as below. you may refactor them into a single method to avoid duplicate code.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I tried, ended up giving up. The difference is the save method: one is calling saveModelCheckpointSync with two parameters, while another is calling saveModelCheckpointAsync with threee parameters. I created a funcitonal interface to consume three parameters (since JDK does not provide one) and make a generic method using functional interface. I gave up because this interface and method are only used once inside the class and the amount of code is more.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
what if have a method called saveModelCheckpoint(source, modelId, isAsync, listener)
, and use isAsync to determine which saveModelCheckpointSync/saveModelCheckpointAsync method to call?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
good idea. Changed.
if (indexUtil.doesCheckpointIndexExist()) { | ||
saveModelCheckpointSync(source, modelId); | ||
} else { | ||
indexUtil.initCheckpointIndex(ActionListener.wrap(initResponse -> { | ||
if (initResponse.isAcknowledged()) { | ||
saveModelCheckpointSync(source, modelId); | ||
} else { | ||
throw new RuntimeException("Creating checkpoint with mappings call not acknowledged."); | ||
} | ||
}, exception -> { | ||
if (ExceptionsHelper.unwrapCause(exception) instanceof ResourceAlreadyExistsException) { | ||
// It is possible the index has been created while we sending the create request | ||
saveModelCheckpointSync(source, modelId); | ||
} else { | ||
logger.error(String.format("Unexpected error creating index %s", indexName), exception); | ||
} | ||
})); | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
what if have a method called saveModelCheckpoint(source, modelId, isAsync, listener)
, and use isAsync to determine which saveModelCheckpointSync/saveModelCheckpointAsync method to call?
// It is possible the index has been created while we sending the create request | ||
flush(bulkRequest); | ||
} else { | ||
logger.error(String.format("Unexpected error creating index %s", indexName), exception); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In which case we will call write()
method of checkpointDao? Without knowing that, I am not sure whether it is okay to swallow the exception here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we will call it whenever we need to save a checkpoint. Any better option to not swallow it?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
what if we throw the unexpected exception, will it break anything or make any change to existing workflow?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
not completely sure without testing. I guess the upstream will eventually swallow it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
i see. thanks.
src/main/java/com/amazon/opendistroforelasticsearch/ad/ml/CheckpointDao.java
Show resolved
Hide resolved
src/main/java/com/amazon/opendistroforelasticsearch/ad/ml/CheckpointDao.java
Outdated
Show resolved
Hide resolved
src/main/java/com/amazon/opendistroforelasticsearch/ad/ml/CheckpointDao.java
Show resolved
Hide resolved
This PR is a conglomerate of the following PRs. opendistro-for-elasticsearch#247 opendistro-for-elasticsearch#249 opendistro-for-elasticsearch#250 opendistro-for-elasticsearch#252 opendistro-for-elasticsearch#253 opendistro-for-elasticsearch#256 opendistro-for-elasticsearch#257 opendistro-for-elasticsearch#258 opendistro-for-elasticsearch#259 opendistro-for-elasticsearch#260 opendistro-for-elasticsearch#261 opendistro-for-elasticsearch#262 opendistro-for-elasticsearch#263 opendistro-for-elasticsearch#264 opendistro-for-elasticsearch#265 opendistro-for-elasticsearch#266 opendistro-for-elasticsearch#267 opendistro-for-elasticsearch#268 opendistro-for-elasticsearch#269 This spreadsheet contains the mappings from files to PR number: https://quip-amazon.com/DiHkAmz9oSLu/HC-PR Testing done: 1. Add unit tests except four classes (excluded in build.gradle). Will add them in the later PR. 2. Manual testing passes.
* Add support filtering the data by one categorical variable This PR is a conglomerate of the following PRs. #247 #249 #250 #252 #253 #256 #257 #258 #259 #260 #261 #262 #263 #264 #265 #266 #267 #268 #269 This spreadsheet contains the mappings from files to PR number: https://quip-amazon.com/DiHkAmz9oSLu/HC-PR Testing done: 1. Add unit tests except four classes (excluded in build.gradle). Will add them in the later PR. 2. Manual testing passes.
Note: since there are a lot of dependencies, I only list the main class and test code to save reviewers' time. The build will fail due to missing dependencies. I will use that PR just for review. will not merge it. Will have a big one in the end and merge once after all review PRs get approved.
Issue #, if available:
Description of changes:
We need checkpoints to save states and models on disk. In single-entity detectors, we store rcf and threshold models separately in different docs. In multi-entity detectors, we need to store them together as we don't use distributed models anymore. We also need to store recent sample history when the models are not ready.
This PR adds functions to serialize models and samples together in one doc and deserialize them when needed. Also, we bulk indexing multi-entity detectors' checkpoints. Bulk requests will yield much better performance than single-document index requests What's more, I add detectorId field in the checkpoint index to be able to query checkpoints by detector id.
Testing done:
By submitting this pull request, I confirm that you can use, modify, copy, and redistribute this contribution, under the terms of your choice.