Skip to content

Commit

Permalink
Stream large blobs to remote cache directly from local cache file (#1…
Browse files Browse the repository at this point in the history
…9711)

This (hopefully) optimises storing large blobs to a remote cache, by
streaming them directly from the file stored on disk in the "FSDB".

This builds on the FSDB local store work (#18153), relying on large
objects being stored as an immutable file on disk, in the cache managed
by Pants.

This is an optimisation in several ways:

- Cutting out an extra temporary file:
- Previously `Store::store_large_blob_remote` would load the whole blob
from the local store and then write that to a temporary file. This was
appropriate with LMBD-backed blobs.
- With new FSDB, there's already a file that can be used, no need for
that temporary, and so the file creation and writing overhead can be
eliminated .
- Reducing sync IO in async tasks, due to mmap:
- Previously `ByteStore::store_buffered` would take that temporary file
and mmap it, to be able to slice into `Bytes` more efficiently... except
this is secretly blocking/sync IO, happening within async tasks (AIUI:
when accessing a mmap'd byte that's only on disk, not yet in memory, the
whole OS thread is blocked/descheduled while the OS pulls the relevant
part of the file into memory, i.e. `tokio` can't run another task on
that thread).
- This new approach uses normal `tokio` async IO mechanisms to read the
file, and thus hopefully this has higher concurrency.
  - (This also eliminates the unmaintained `memmap` dependency.)

I haven't benchmarked this though.

My main motivation for this is firming up the provider API before adding
new byte store providers, for #11149. This also resolves some TODOs and
even eliminates some `unsafe`, yay!

The commits are individually reviewable.

Fixes #19049, fixes #14341 (`memmap` removed), closes #17234 (solves the
same problem but with an approach that wasn't possible at the time).
  • Loading branch information
huonw authored Sep 12, 2023
1 parent be5836a commit b625f09
Show file tree
Hide file tree
Showing 12 changed files with 380 additions and 225 deletions.
13 changes: 2 additions & 11 deletions src/rust/engine/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion src/rust/engine/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,6 @@ lmdb-rkv = { git = "https://github.com/pantsbuild/lmdb-rs.git", rev = "6ae7a552a
log = "0.4.17"
madvise = "0.1"
maplit = "1.0.1"
memmap = "0.7"
nails = "0.13"
nix = "0.25"
notify = { git = "https://github.com/pantsbuild/notify", rev = "276af0f3c5f300bfd534941386ba2f3b3a022df7" }
Expand Down
2 changes: 1 addition & 1 deletion src/rust/engine/fs/store/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ itertools = { workspace = true }
lmdb-rkv = { workspace = true }
log = { workspace = true }
madvise = { workspace = true }
memmap = { workspace = true }
parking_lot = { workspace = true }
prost = { workspace = true }
prost-types = { workspace = true }
Expand All @@ -36,6 +35,7 @@ task_executor = { path = "../../task_executor" }
tempfile = { workspace = true }
tokio-rustls = { workspace = true }
tokio = { workspace = true, features = ["fs"] }
tokio-util = { workspace = true, features = ["io"] }
tonic = { workspace = true, features = ["transport", "codegen", "tls", "tls-roots", "prost"] }
tower-service = { workspace = true }
tryfuture = { path = "../../tryfuture" }
Expand Down
52 changes: 20 additions & 32 deletions src/rust/engine/fs/store/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -809,15 +809,16 @@ impl Store {
remote_store
.clone()
.maybe_upload(digest, async move {
// TODO(John Sirois): Consider allowing configuration of when to buffer large blobs
// to disk to be independent of the remote store wire chunk size.
if digest.size_bytes > remote_store.store.chunk_size_bytes() {
Self::store_large_blob_remote(local, remote_store.store, entry_type, digest)
.await
} else {
Self::store_small_blob_remote(local, remote_store.store, entry_type, digest)
.await
}
match local.load_from_fs(digest).await? {
Some(path) => {
Self::store_fsdb_blob_remote(remote_store.store, digest, path).await?
}
None => {
Self::store_lmdb_blob_remote(local, remote_store.store, entry_type, digest)
.await?
}
};
Ok(())
})
.await
}
Expand All @@ -840,15 +841,17 @@ impl Store {
.boxed()
}

async fn store_small_blob_remote(
async fn store_lmdb_blob_remote(
local: local::ByteStore,
remote: remote::ByteStore,
entry_type: EntryType,
digest: Digest,
) -> Result<(), StoreError> {
// We need to copy the bytes into memory so that they may be used safely in an async
// future. While this unfortunately increases memory consumption, we prioritize
// being able to run `remote.store_bytes()` as async.
// being able to run `remote.store_bytes()` as async. In addition, this is only used
// for blobs in the LMDB store, most of which are small: large blobs end up in the
// FSDB store.
//
// See https://github.com/pantsbuild/pants/pull/9793 for an earlier implementation
// that used `Executor.block_on`, which avoided the clone but was blocking.
Expand All @@ -866,31 +869,16 @@ impl Store {
}
}

async fn store_large_blob_remote(
local: local::ByteStore,
async fn store_fsdb_blob_remote(
remote: remote::ByteStore,
entry_type: EntryType,
digest: Digest,
path: PathBuf,
) -> Result<(), StoreError> {
remote
.store_buffered(digest, |mut buffer| async {
let result = local
.load_bytes_with(entry_type, digest, move |bytes| {
buffer.write_all(bytes).map_err(|e| {
format!("Failed to write {entry_type:?} {digest:?} to temporary buffer: {e}")
})
})
.await?;
match result {
None => Err(StoreError::MissingDigest(
format!("Failed to upload {entry_type:?}: Not found in local store",),
digest,
)),
Some(Err(err)) => Err(err.into()),
Some(Ok(())) => Ok(()),
}
})
let file = tokio::fs::File::open(&path)
.await
.map_err(|e| format!("failed to read {digest:?} from {path:?}: {e}"))?;
remote.store_file(digest, file).await?;
Ok(())
}

///
Expand Down
94 changes: 31 additions & 63 deletions src/rust/engine/fs/store/src/remote.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
// Licensed under the Apache License, Version 2.0 (see LICENSE).
use std::collections::{BTreeMap, HashSet};
use std::fmt;
use std::ops::Range;
use std::sync::Arc;
use std::time::{Duration, Instant};

Expand All @@ -14,33 +13,36 @@ use hashing::Digest;
use log::Level;
use protos::gen::build::bazel::remote::execution::v2 as remexec;
use remexec::ServerCapabilities;
use tokio::fs::File;
use tokio::io::{AsyncSeekExt, AsyncWrite};
use workunit_store::{in_workunit, ObservationMetric};

use crate::StoreError;

mod reapi;
#[cfg(test)]
mod reapi_tests;

pub type ByteSource = Arc<(dyn Fn(Range<usize>) -> Bytes + Send + Sync + 'static)>;

#[async_trait]
pub trait ByteStoreProvider: Sync + Send + 'static {
async fn store_bytes(&self, digest: Digest, bytes: ByteSource) -> Result<(), String>;
/// Store the bytes readable from `file` into the remote store
async fn store_file(&self, digest: Digest, file: File) -> Result<(), String>;

/// Store the bytes in `bytes` into the remote store, as an optimisation of `store_file` when the
/// bytes are already in memory
async fn store_bytes(&self, digest: Digest, bytes: Bytes) -> Result<(), String>;

/// Load the data stored (if any) in the remote store for `digest` into `destination`. Returns
/// true when found, false when not.
async fn load(
&self,
digest: Digest,
destination: &mut dyn LoadDestination,
) -> Result<bool, String>;

/// Return any digests from `digests` that are not (currently) available in the remote store.
async fn list_missing_digests(
&self,
digests: &mut (dyn Iterator<Item = Digest> + Send),
) -> Result<HashSet<Digest>, String>;

fn chunk_size_bytes(&self) -> usize;
}

// TODO: Consider providing `impl Default`, similar to `super::LocalOptions`.
Expand Down Expand Up @@ -110,74 +112,40 @@ impl ByteStore {
Ok(ByteStore::new(instance_name, provider))
}

pub(crate) fn chunk_size_bytes(&self) -> usize {
self.provider.chunk_size_bytes()
}

pub async fn store_buffered<WriteToBuffer, WriteResult>(
&self,
digest: Digest,
write_to_buffer: WriteToBuffer,
) -> Result<(), StoreError>
where
WriteToBuffer: FnOnce(std::fs::File) -> WriteResult,
WriteResult: Future<Output = Result<(), StoreError>>,
{
let write_buffer = tempfile::tempfile().map_err(|e| {
format!("Failed to create a temporary blob upload buffer for {digest:?}: {e}")
})?;
let read_buffer = write_buffer.try_clone().map_err(|e| {
format!("Failed to create a read handle for the temporary upload buffer for {digest:?}: {e}")
})?;
write_to_buffer(write_buffer).await?;

// Unsafety: Mmap presents an immutable slice of bytes, but the underlying file that is mapped
// could be mutated by another process. We guard against this by creating an anonymous
// temporary file and ensuring it is written to and closed via the only other handle to it in
// the code just above.
let mmap = Arc::new(unsafe {
let mapping = memmap::Mmap::map(&read_buffer).map_err(|e| {
format!("Failed to memory map the temporary file buffer for {digest:?}: {e}")
})?;
if let Err(err) = madvise::madvise(
mapping.as_ptr(),
mapping.len(),
madvise::AccessPattern::Sequential,
) {
log::warn!(
"Failed to madvise(MADV_SEQUENTIAL) for the memory map of the temporary file buffer for \
{digest:?}. Continuing with possible reduced performance: {err}",
digest = digest,
err = err
)
}
Ok(mapping) as Result<memmap::Mmap, String>
}?);

/// Store the bytes readable from `file` into the remote store
pub async fn store_file(&self, digest: Digest, file: File) -> Result<(), String> {
self
.store_bytes_source(
digest,
Arc::new(move |range| Bytes::copy_from_slice(&mmap[range])),
)
.await?;

Ok(())
.store_tracking("store", digest, || self.provider.store_file(digest, file))
.await
}

/// Store the bytes in `bytes` into the remote store, as an optimisation of `store_file` when the
/// bytes are already in memory
pub async fn store_bytes(&self, bytes: Bytes) -> Result<(), String> {
let digest = Digest::of_bytes(&bytes);
self
.store_bytes_source(digest, Arc::new(move |range| bytes.slice(range)))
.store_tracking("store_bytes", digest, || {
self.provider.store_bytes(digest, bytes)
})
.await
}

async fn store_bytes_source(&self, digest: Digest, bytes: ByteSource) -> Result<(), String> {
async fn store_tracking<DoStore, Fut>(
&self,
workunit: &'static str,
digest: Digest,
do_store: DoStore,
) -> Result<(), String>
where
DoStore: FnOnce() -> Fut + Send,
Fut: Future<Output = Result<(), String>> + Send,
{
in_workunit!(
"store_bytes",
workunit,
Level::Trace,
desc = Some(format!("Storing {digest:?}")),
|workunit| async move {
let result = self.provider.store_bytes(digest, bytes).await;
let result = do_store().await;

if result.is_ok() {
workunit.record_observation(
Expand Down
Loading

0 comments on commit b625f09

Please sign in to comment.