Stream large blobs to remote cache directly from local cache file #19711
Conversation
None => {
    Self::store_lmdb_blob_remote(local, remote_store.store, entry_type, digest)
As the comment in `store_lmdb_blob_remote` now notes, this code makes the assumption that anything in LMDB is small enough to load into memory. AIUI, `LARGE_FILE_SIZE_LIMIT` is 512KB, i.e. these blobs should be at most that large.

Do you think that's reasonable?
Yea, I do think that that is reasonable.
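For reference, the size-based split being discussed looks roughly like this sketch (the constant matches the thread; the enum and helper are hypothetical, not the PR's actual code):

```rust
// 512 KiB: blobs at or below this live in LMDB, larger ones in the FSDB.
const LARGE_FILE_SIZE_LIMIT: u64 = 512 * 1024;

enum UploadStrategy {
    /// Small LMDB blob: load it wholly into memory and upload it.
    InMemory,
    /// Large FSDB blob: stream it to the remote directly from its file.
    StreamFromFile,
}

fn upload_strategy(blob_size: u64) -> UploadStrategy {
    if blob_size <= LARGE_FILE_SIZE_LIMIT {
        UploadStrategy::InMemory
    } else {
        UploadStrategy::StreamFromFile
    }
}
```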
/// Store the bytes in `bytes` into the remote store, as an optimisation of `store` when the bytes
/// are already in memory
async fn store_bytes(&self, digest: Digest, bytes: Bytes) -> Result<(), String>;
I think there's a fair argument that there's no need for `store_bytes` and everything can go through `store`, using `Cursor::new(bytes)` to create an appropriate `AsyncRead` from `Bytes`.

My thinking is that passing `Bytes` in directly saves some memory copies for the batch case, where that object can be splatted into `store_bytes_batch` and its batch upload request directly, without copying or slicing or anything (whereas using `source` would require reading it into a separate `Bytes`).

Maybe that optimisation is irrelevant when this code does network IO anyway, and it'd be better to just have this trait be `store`, `load` and `list_missing_digests`.

Thoughts?
AFAICT, you're right that it is one fewer copy currently to pass in `Bytes`. As mentioned in the comment on `store_lmdb_blob_remote` though, we used to write in a streaming fashion while blocking a `spawn_blocking` task (with `block_on`): that meant that we were copying directly from an mmap into a protobuf to use with gRPC.

It's possible that we could re-introduce that optimization at some point, which would make the streaming batch `store_bytes` API superior again. But at the same time, these are fairly small blobs, so the benefits of streaming are definitely reduced.
#[tokio::test]
async fn store_source_read_error_immediately() {
Most of these tests are just copied/adapted from the `store_bytes` ones below, except for this one and `store_source_read_error_later`, which test the new/interesting code path: what happens if the `AsyncRead`s fail.
// an arbitrary source (e.g. file) might be small enough to write via the batch API, but we
// ignore that possibility for now
Is it worth considering this optimisation? It would mean pulling the entire (sufficiently-small) `source` into a `Bytes` to be able to call `self.store_bytes_batch`, similar to the `store_bytes` fn above.
It shouldn't be, due to the limitations on when FSDB is used. And right now that is the only source of data (we always capture locally and then upload).
Hm, just noting that the batch API size limit is 4194304 bytes (4 MiB), which is greater than the FSDB limit of 512 KiB, i.e. the REAPI code would be happy enough to upload some moderate FSDB-sourced files via the batch API (and, if we supported uploading multiple files in a batch, we could upload up to 8 FSDB-sourced files in one batch).

I will thus leave this comment here, but not take any action on it for now.
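To make the arithmetic concrete (the constants come from the numbers quoted above; the helper itself is hypothetical):

```rust
const BATCH_API_SIZE_LIMIT: usize = 4 * 1024 * 1024; // 4194304 bytes (4 MiB)
const FSDB_LARGE_FILE_SIZE_LIMIT: usize = 512 * 1024; // 512 KiB

/// Every FSDB-sourced blob fits the batch API, with room for 8 per batch.
fn fits_in_batch(blob_sizes: &[usize]) -> bool {
    blob_sizes.iter().sum::<usize>() <= BATCH_API_SIZE_LIMIT
}

fn main() {
    assert_eq!(BATCH_API_SIZE_LIMIT / FSDB_LARGE_FILE_SIZE_LIMIT, 8);
    assert!(fits_in_batch(&[FSDB_LARGE_FILE_SIZE_LIMIT; 8]));
}
```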
(While working on this, I noticed #19732. I'll fix that in an independent PR.)
Thanks!
if let Some(ref read_err) = *error_occurred.lock() {
    // check if reading `source` locally hit an error: if so, propagate that error (there will
    // likely be a remote error too, because our write will be too short, but the local error is
    // the interesting root cause)
👍
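A minimal sketch of the pattern in the quoted lines, with hypothetical names: record a local read error while streaming, then prefer it over the (likely consequent) remote error:

```rust
use std::io;
use std::sync::Mutex;

use tokio::io::{AsyncRead, AsyncReadExt};

async fn store_streaming<R: AsyncRead + Unpin>(mut source: R) -> Result<Vec<u8>, String> {
    // In the real code this Mutex is shared with a reader wrapper; it's kept
    // here only to mirror the quoted pattern.
    let error_occurred: Mutex<Option<io::Error>> = Mutex::new(None);

    let mut uploaded = Vec::new();
    let mut chunk = [0u8; 8192];
    // Stand-in for the remote write loop: accumulate chunks locally.
    let remote_result: Result<(), String> = loop {
        match source.read(&mut chunk).await {
            Ok(0) => break Ok(()),
            Ok(n) => uploaded.extend_from_slice(&chunk[..n]),
            Err(e) => {
                *error_occurred.lock().unwrap() = Some(e);
                // the remote side will now see a too-short write and fail too
                break Err("remote write was too short".to_owned());
            }
        }
    };

    // check if reading `source` locally hit an error: if so, propagate that
    // error as the interesting root cause, not the consequent remote error
    if let Some(ref read_err) = *error_occurred.lock().unwrap() {
        return Err(format!("failed to read local source: {read_err}"));
    }

    remote_result.map(|()| uploaded)
}
```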
This (hopefully) optimises storing large blobs to a remote cache, by streaming them directly from the file stored on disk in the "FSDB".
This builds on the FSDB local store work (#18153), relying on large objects being stored as an immutable file on disk, in the cache managed by Pants.
This is an optimisation in several ways:

- `Store::store_large_blob_remote` would load the whole blob from the local store and then write that to a temporary file. This was appropriate with LMDB-backed blobs.
- `ByteStore::store_buffered` would take that temporary file and mmap it, to be able to slice into `Bytes` more efficiently... except this is secretly blocking/sync IO, happening within async tasks (AIUI: when accessing a mmap'd byte that's only on disk, not yet in memory, the whole OS thread is blocked/descheduled while the OS pulls the relevant part of the file into memory, i.e. `tokio` can't run another task on that thread). The new code uses `tokio` async IO mechanisms to read the file, and thus hopefully this has higher concurrency.
- (This also removes the `memmap` dependency.)

I haven't benchmarked this though.

My main motivation for this is firming up the provider API before adding new byte store providers, for #11149. This also resolves some TODOs and even eliminates some `unsafe`, yay!

The commits are individually reviewable.

Fixes #19049, fixes #14341 (`memmap` removed), closes #17234 (solves the same problem but with an approach that wasn't possible at the time).