Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ML] Model storage for 3rd Party models #71323

Merged
merged 15 commits into from
Apr 12, 2021

Conversation

davidkyle
Copy link
Member

@davidkyle davidkyle commented Apr 6, 2021

The models are large binary blobs which need to be split into smaller chunks and base64 encoded as described in #70827. This PR introduces the code for re-assembling the chunks and streaming them to the inference process.

Closes #70827

@davidkyle davidkyle added >feature :ml Machine learning labels Apr 6, 2021
@elasticmachine elasticmachine added the Team:ML Meta label for the ML team label Apr 6, 2021
@elasticmachine
Copy link
Collaborator

Pinging @elastic/ml-core (Team:ML)

* @return The index the model is stored in
*/
public String getIndex() {
return InferenceIndexConstants.LATEST_INDEX_NAME;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This shouldn't be assuming the latest index. It will break as soon as we want to introduce another.

There is also the wider question of whether we want these big external models going into the same index that holds our internal inference documents. At small scale it's OK, but eventually somebody will upload 20GB of external models and we haven't thought through the details of how this index (now a system index) will scale up when required.

I am happy to leave that wider question to another day, so you can just add a TODO for that bit. But please change the overall approach to support patterns and multi-index aliases because otherwise we'll be stuck when scalability and upgrades become requirements.

return;
}

GetResponse got = client.prepareGet(modelStorage.getIndex(), docId)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should use an IDs search with size 1 across the pattern of indices that might store the state documents. This is what we do in the StateStreamer class:

SearchResponse stateResponse = client.prepareSearch(indexName)
.setSize(1)
.setQuery(QueryBuilders.idsQuery().addIds(stateDocId)).get();

If you look at the history of that class you'll see we started out with using the Get API, then switched to the Search API with an IDs search later on, because it's more robust to indices being split up, upgraded or reindexed. So we should use the more robust approach here from the start.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The use case is slightly different compared to AD jobs where the model state is updated over time whereas PyTorch models are static. An AD job might start writing index .ml-state-00001 then over time and successive upgrades move to .ml-state-00003. PyTorch models will be put once and if they are moved or the index deleted then the model storage document will have to be updated. Data streams and ILM are designed for fast changing time series data which I didn't consider a suitable paradigm for stable model storage. Upgraded and reindexed indices would be better handled with index aliases.

There is little difference between an ID search and Get request and the change is simple but so I will make it but for now I'd like to avoid spreading a model over multiple indices a this stage.

ModelStorage:: getIndex() is hard coded for now because there is an ongoing conversation about where to store the models and this fills the gap.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Data streams and ILM are designed for fast changing time series data

The maximum recommended shard size is 50GB, and our internal indices have 1 shard. ILM (outside of data streams) is also designed for rolling over indices based on size instead of time.

for now I'd like to avoid spreading a model over multiple indices a this stage

We won't set out to deliberately spread models over multiple indices, but at some point we're going to have to introduce a solution for the problem of the inference index getting too large, and it will be very hard for that solution to reliably keep models in the same index. For example, if we used size based ILM (tricky as the inference index is a system index) then it could roll over in between the documents of a single model. If we used some sort of custom rollover due to it being a system index then that code will be incredibly complicated if it has to avoid rolling over part way through indexing the documents of a single model. And if the eventual solution is to store the models in .ml-state then that is already using size-based ILM, and could very easily roll over in between the documents of a single model.

ModelStorage:: getIndex() is hard coded for now because there is an ongoing conversation about where to store the models and this fills the gap

There is no indication in the code that the way it is now is temporary. Please add a TODO because if it accidentally got left like this when the feature branch is merged to master then for any subsequent problem the person looking into it would be having to use git blame to find this PR to find why it was like that.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've reworked this PR significantly the ModelStorage class has gone and I'm reusing definitions used in boosted tree model storage. This means the code has to work the same as the existing model loading code in TrainedModelProvider which does support searching for model chunks over multiple indices but has post-processing logic to limit the results to those found in the first index.

@benwtrent can you comment on the bit of code linked above please. Was the reason for this that you were worried that model IDs, and therefore document IDs may not be unique over multiple indices?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Preamble: I don't think configuration indices should ever be managed by ILM. The current trained model index (inference config) is not managed my ILM. This adds unnecessary complexity. Honestly, if we are THIS concerned around static model configurations causing shards to increase, we really need to come up with another storage mechanism outside the index. The more and more we talk about "large blob storage in an index" the more it gets complicated, hacky, and unmaintainable.

OK, the reason for this check is that the .ml-inference-* indices are re-created, not updated. Consequently, it is possible to have multiple indices, but a newer index has new settings or mapped fields.

The query that grabs the model definition objects is as follows:

            .setQuery(QueryBuilders.constantScoreQuery(QueryBuilders
                .boolQuery()
                .filter(QueryBuilders.termQuery(TrainedModelConfig.MODEL_ID.getPreferredName(), modelId))
                .filter(QueryBuilders.termQuery(InferenceIndexConstants.DOC_TYPE.getPreferredName(),
                    TrainedModelDefinitionDoc.NAME))))
            .setSize(MAX_NUM_DEFINITION_DOCS)
            // First find the latest index
            .addSort("_index", SortOrder.DESC)
            // Then, sort by doc_num
            .addSort(SortBuilders.fieldSort(TrainedModelDefinitionDoc.DOC_NUM.getPreferredName())
                .order(SortOrder.ASC)
                .unmappedType("long"))
            .request();

So, we sort by indices first and then by the doc num.

Now, consider the situation where a model document does cross multiple indices, how do we know that the document order is guaranteed? It could be that a doc: 0 is in the older index (index A) then a doc: 0 is in the new index (index B), then a doc: 1 is in the old index again.

Now we have a broken compression string. ("doc: 0, index: B" + "doc: 0, index: A" + "doc: 1, index: B")

Preventing the index boundary cross ensures us that the documents are in the correct order. We could make this assurance in another way, but at the time, I didn't want to bother with it and thought that splitting a definition across indices was a bad idea (I still think that).

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Preamble: I don't think configuration indices should ever be managed by ILM. The current trained model index (inference config) is not managed my ILM. This adds unnecessary complexity. Honestly, if we are THIS concerned around static model configurations causing shards to increase, we really need to come up with another storage mechanism outside the index. The more and more we talk about "large blob storage in an index" the more it gets complicated, hacky, and unmaintainable.

The problem here is that "configuration" documents are usually small and few in number. When configuration documents are like this there is never a need for more than one index. .ml-config is an example of an index like this.

When configuration documents are huge there is a risk of overflowing the 50GB recommended maximum shard size. Arguably the documents in our .ml-state index are "configuration" documents, but they're not in the same index as the small documents that go in .ml-config.

The .ml-inference index is mixing the small and the large types of "configuration" in the same index. Eventually this will result in it blowing the 50GB recommended limit in some large installation in the future. It may be years before this happens but when it does happen it will not be easy to fix.

It may be that in the future we end up incrementing the latest inference index number not only for mappings changes, but also for scalability.

This doesn't mean that it's necessary to allow for splitting definitions across indices though. If the index is going to be managed by bespoke ML code then that bespoke code can enforce that the models aren't split between indices.

One observation about bespoke scalability driven rollover code for complicated scenarios is that we've never got around to it in the past. We have this problem with the ML shared results index. For a long time there has been the idea that we might need to roll it over, or, better still, dynamically switch jobs that are generating large numbers of results to using their own dedicated results indices. On the other hand, we have managed to improve scalability for the ML state indices where the usage pattern was simple enough to just use ILM.

The more and more we talk about "large blob storage in an index" the more it gets complicated, hacky, and unmaintainable.

Unfortunately at the moment we don't have anywhere else to put large blobs. I agree that in the long term it would be nice if Elasticsearch had a blob storage mechanism and then we could use that for all our large opaque binary data storage needs.

Until we have have such functionality we have to use an index, and if we want to just have one index with no possibility for expansion then maybe the solution is to restrict how many models can be stored in it. In other words, reject uploads of new models if the index would exceed a certain size afterwards. This would force users to delete old models before uploading new ones if they were close to the limit. It's not very "elastic", but at least protects against the index becoming unwieldy and causing problems in the cluster.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Firstly, I don't consider it decided that we will use .ml-inference to store PyTorch models. It is used here because I've adapted the existing code but I've argued against it elsewhere and Dave's point about mixing small config and large blobs is a good one.

The other side of the equation is uploading the models which will play a deciding role in where the models are stored and how they are managed. I have not yet run a full end-to-end example of uploading a large model, reassembling it and performing inference, there is much to be learned from doing so. Exploring the various model upload options that have been mooted will also feedback into this design.

@davidkyle
Copy link
Member Author

Boosted Trees store their model definition in TrainedModelDefinitionDocs and it would be better if there was a common solution to the model storage problem.

I will investigate using the existing storage format defined by TrainedModelDefinitionDoc

blockingCall(listener -> client().prepareIndex(InferenceIndexConstants.LATEST_INDEX_NAME)
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
.setSource(xContentBuilder)
.setId(TrainedModelDefinitionDoc.docId(modelId, 0))
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This test was overwriting the same document rather than writing multiple docs. The code it was testing is sound but fixing this meant changing the test logic

@@ -136,6 +137,9 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
builder.field(TrainedModelConfig.MODEL_ID.getPreferredName(), modelId);
builder.field(DOC_NUM.getPreferredName(), docNum);
builder.field(DEFINITION_LENGTH.getPreferredName(), definitionLength);
if (totalDefinitionLength != null) {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The mappings exists for this field but it was not being written. I added a test


private static final Logger logger = LogManager.getLogger(ChunkedTrainedModelRestorer.class);

private static final int MAX_NUM_DEFINITION_DOCS = 20;
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a low value for a max search size but the Boosted tree models are in 16MB chunks so even 20 is a lot of data.

TrainedModelProvider::MAX_NUM_DEFINITION_DOCS = 100 is a lot for a single search (100 * 16MB) but not large enough to break ES. Depending on the model chunk size this max search size value may be tweaked

Copy link
Member

@benwtrent benwtrent left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am against calling the pytorch documents a "state". They are a "configuration". But that is a naming quibble.

I do think they are more akin to a model configuration object than a continuously changing "current model state" idea.

Comment on lines +142 to +160
for (SearchHit hit : searchResponse.getHits().getHits()) {
try {
TrainedModelDefinitionDoc doc =
parseModelDefinitionDocLenientlyFromSource(hit.getSourceRef(), modelId, xContentRegistry);
lastNum = doc.getDocNum();

boolean continueSearching = modelConsumer.apply(doc);
if (continueSearching == false) {
// signal we are finished
successConsumer.accept(Boolean.FALSE);
return;
}

} catch (IOException e) {
logger.error(new ParameterizedMessage("[{}] error writing model definition", modelId), e);
errorConsumer.accept(e);
return;
}
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be good to make sure that these docs don't spread across numerous indices. Or, we make sure that they are in the correct order.

Given the query, if these docs spread numerous indices, we don't have guarantees that the docs are in the right order.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This test doesn't cover the failure scenario:

doc 0 in index 2
doc 0 in index 1
doc 1 in index 2

This may not be possible given the current APIs, but that is the scenario which I was concerned with.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Given that the query filters on model_id how could there be 2 doc 0s for a given model? Isn't there a check against creating 2 models with the same name?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, there are checks everywhere. This is simply another one.

* @param listener error and success listener
*/
public void writeStateToStream(String modelId, OutputStream restoreStream, ActionListener<Boolean> listener) {
ChunkedTrainedModelRestorer restorer = new ChunkedTrainedModelRestorer(modelId, client, executorService, xContentRegistry);
Copy link
Contributor

@droberts195 droberts195 Apr 8, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if we should set the search size to 1 here.

At the moment with the default search size of 10:

  • We load 160MB of UTF-8 base64 encoded strings (10 * 16MB).
  • Then we parse one of these into a UTF-16 base64 encoded string, so that's an extra 32MB.
  • While those two formats are held in memory, we then convert back to UTF-8 in a way that doesn't reuse the original UTF-8 representation, so that's an extra 16MB.
  • Then we create the raw bytes, which on average would be about 1/4 of the size, so 4MB.

So all that adds to 212MB.

ML nodes in Cloud have JVM heaps capped at 2GB, so that's a big chunk to be using for a single operation.

For the anomaly detector model state we take care not to hold the same data in multiple formats simultaneously:

  • We load one doc at a time
  • We never convert the UTF-8 base64 encoded data to UTF-16
  • We stream it to the C++ which has a streaming base64 decoder so also never holds the full data in two formats

So in this case we only use the 16MB of a single model state document in UTF-8 format.

A sliding scale of simplicity and return on investment for reducing the memory usage of restoration is:

  1. Search for 1 document at a time
  2. Change TrainedModelDefinition so that it stores the model definition in a byte[] containing base64 UTF-8 bytes rather than UTF-16 String
  3. Change TrainedModelDefinition so that it stores the model definition in a raw byte[] rather than UTF-16 String (decoded directly from the UTF-8 of _source rather than going via an intermediate temporary String)

2 and 3 are non-trivial pieces of work but 1 is a one-liner so could be added to this PR.

I realise we already have this problem with the TrainedModelProvider class. I expect we've never hit it in practice because our boosted tree models are in reality considerably smaller than the maximum. But these PyTorch models seem to be bigger more often, so it would be good to consider memory efficiency from the start.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

++

2 might actually be a viable option. XContentGenerator has a writeBinary(byte[] value) method which calls a Jackson writeBinary(byte[] value) which in turn base64 encodes the bytes. XContentParser does the reverse with byte[] binaryValue(). For BWC the binary data could be written to a new field and when parsing the internal content would be created from either the old string field or the new binary one.

There is a complication that when the model is PUT it is already base64 encoded and would have to be un-encoded before storage otherwise it would be double base64 encoded. The savings would be significant though and worth investigating for a follow up PR

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is a good idea as well!

Wish I thought of it 🤦

Copy link
Contributor

@droberts195 droberts195 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since this is going to the feature branch I am happy to merge this as-is and iterate on the more tricky decisions in future PRs.

@davidkyle davidkyle mentioned this pull request Apr 12, 2021
1 task
@davidkyle davidkyle merged commit 52df9ad into elastic:feature/pytorch-inference Apr 12, 2021
@davidkyle davidkyle deleted the model-storage branch April 12, 2021 19:00
davidkyle added a commit that referenced this pull request Jun 3, 2021
The feature branch contains changes to configure PyTorch models with a 
TrainedModelConfig and defines a format to store the binary models. 
The _start and _stop deployment actions control the model lifecycle 
and the model can be directly evaluated with the _infer endpoint. 
2 Types of NLP tasks are supported: Named Entity Recognition and Fill Mask.

The feature branch consists of these PRs: #73523, #72218, #71679
#71323, #71035, #71177, #70713
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
>feature :ml Machine learning Team:ML Meta label for the ML team
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants