Skip to content

Commit

Permalink
Prepare store::remote::ByteStore for other providers (#19424)
Browse files Browse the repository at this point in the history
This does preparatory refactoring towards #11149 (and probably also
#19049), by adjusting `store::remote::ByteStore` in a few ways to make
it easier to plop in new 'providers':

- package the various options for creating a provider into a struct, and
a bunch of mechanical refactoring to use that struct
- improved tests:
  - explicit tests for the REAPI provider
  - more extensive exercising of the `ByteStore` interface, including
extra tests for combinations like "`load_file` when digest is missing",
and (to me) more logical test names
  - testing the 'normal' `ByteStore` interface via fully in-memory simple
`Provider` instead of needing to run the `StubCAS`
  - testing some more things at lower levels, like the REAPI provider
doing hash verification, the `LoadDestination::reset` methods doing the
right thing and `ByteStore::store_buffered`

The commits are mostly individually reviewable, although I'd recommend
having a sense of the big picture as described above before going
through them one-by-one.

After this, the next steps towards #11149 will be:

1. do something similar for the action cache
2. implement new providers, with some sort of factory function for going
from `RemoteOptions` to an appropriate `Arc<dyn ByteStoreProvider +
'static>`
3. expose settings to select and configure those new providers
  • Loading branch information
huonw authored Jul 12, 2023
1 parent 0392d37 commit 6d03494
Show file tree
Hide file tree
Showing 13 changed files with 736 additions and 503 deletions.
24 changes: 12 additions & 12 deletions src/rust/engine/fs/brfs/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ use log::{debug, error, warn};
use parking_lot::Mutex;
use protos::gen::build::bazel::remote::execution::v2 as remexec;
use protos::require_digest;
use store::{Store, StoreError};
use store::{RemoteOptions, Store, StoreError};
use tokio::signal::unix::{signal, SignalKind};
use tokio::task;
use tokio_stream::wrappers::SignalStream;
Expand Down Expand Up @@ -767,22 +767,22 @@ async fn main() {
Store::local_only(runtime.clone(), store_path).expect("Error making local store.");
let store = match args.value_of("server-address") {
Some(address) => local_only_store
.into_with_remote(
address,
args.value_of("remote-instance-name").map(str::to_owned),
tls::Config::new_without_mtls(root_ca_certs),
.into_with_remote(RemoteOptions {
cas_address: address.to_owned(),
instance_name: args.value_of("remote-instance-name").map(str::to_owned),
tls_config: tls::Config::new_without_mtls(root_ca_certs),
headers,
4 * 1024 * 1024,
std::time::Duration::from_secs(5 * 60),
1,
args
chunk_size_bytes: 4 * 1024 * 1024,
rpc_timeout: std::time::Duration::from_secs(5 * 60),
rpc_retries: 1,
rpc_concurrency_limit: args
.value_of_t::<usize>("rpc-concurrency-limit")
.expect("Bad rpc-concurrency-limit flag"),
None,
args
capabilities_cell_opt: None,
batch_api_size_limit: args
.value_of_t::<usize>("batch-api-size-limit")
.expect("Bad batch-api-size-limit flag"),
)
})
.expect("Error making remote store"),
None => local_only_store,
};
Expand Down
25 changes: 13 additions & 12 deletions src/rust/engine/fs/fs_util/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,8 @@ use protos::require_digest;
use serde_derive::Serialize;
use std::collections::{BTreeMap, BTreeSet};
use store::{
Snapshot, SnapshotOps, Store, StoreError, StoreFileByDigest, SubsetParams, UploadSummary,
RemoteOptions, Snapshot, SnapshotOps, Store, StoreError, StoreFileByDigest, SubsetParams,
UploadSummary,
};
use workunit_store::WorkunitStore;

Expand Down Expand Up @@ -342,7 +343,7 @@ async fn execute(top_match: &clap::ArgMatches) -> Result<(), ExitError> {
.map_err(|e| format!("Failed to open/create store for directory {store_dir:?}: {e}"))?;
let (store_result, store_has_remote) = match top_match.value_of("server-address") {
Some(cas_address) => {
let chunk_size = top_match
let chunk_size_bytes = top_match
.value_of_t::<usize>("chunk-bytes")
.expect("Bad chunk-bytes flag");

Expand Down Expand Up @@ -411,31 +412,31 @@ async fn execute(top_match: &clap::ArgMatches) -> Result<(), ExitError> {
}

(
local_only.into_with_remote(
cas_address,
top_match
local_only.into_with_remote(RemoteOptions {
cas_address: cas_address.to_owned(),
instance_name: top_match
.value_of("remote-instance-name")
.map(str::to_owned),
tls_config,
headers,
chunk_size,
chunk_size_bytes,
// This deadline is really only in place because otherwise DNS failures
// leave this hanging forever.
//
// Make fs_util have a very long deadline (because it's not configurable,
// like it is inside pants).
Duration::from_secs(30 * 60),
top_match
rpc_timeout: Duration::from_secs(30 * 60),
rpc_retries: top_match
.value_of_t::<usize>("rpc-attempts")
.expect("Bad rpc-attempts flag"),
top_match
rpc_concurrency_limit: top_match
.value_of_t::<usize>("rpc-concurrency-limit")
.expect("Bad rpc-concurrency-limit flag"),
None,
top_match
capabilities_cell_opt: None,
batch_api_size_limit: top_match
.value_of_t::<usize>("batch-api-size-limit")
.expect("Bad batch-api-size-limit flag"),
),
}),
true,
)
}
Expand Down
32 changes: 6 additions & 26 deletions src/rust/engine/fs/store/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ mod snapshot_ops_tests;
mod snapshot_tests;
pub use crate::snapshot_ops::{SnapshotOps, SubsetParams};

use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};
use std::collections::{BTreeSet, HashMap, HashSet};
use std::fmt::{self, Debug, Display};
use std::fs::OpenOptions;
use std::fs::Permissions as FSPermissions;
Expand All @@ -64,7 +64,7 @@ use parking_lot::Mutex;
use prost::Message;
use protos::gen::build::bazel::remote::execution::v2 as remexec;
use protos::require_digest;
use remexec::{ServerCapabilities, Tree};
use remexec::Tree;
use serde_derive::Serialize;
use sharded_lmdb::DEFAULT_LEASE_TIME;
#[cfg(target_os = "macos")]
Expand All @@ -84,6 +84,7 @@ mod local;
pub mod local_tests;

mod remote;
pub use remote::RemoteOptions;
#[cfg(test)]
mod remote_tests;

Expand Down Expand Up @@ -372,32 +373,11 @@ impl Store {
/// Add remote storage to a Store. If it is missing a value which it tries to load, it will
/// attempt to back-fill its local storage from the remote storage.
///
pub fn into_with_remote(
self,
cas_address: &str,
instance_name: Option<String>,
tls_config: grpc_util::tls::Config,
headers: BTreeMap<String, String>,
chunk_size_bytes: usize,
rpc_timeout: Duration,
rpc_retries: usize,
rpc_concurrency_limit: usize,
capabilities_cell_opt: Option<Arc<OnceCell<ServerCapabilities>>>,
batch_api_size_limit: usize,
) -> Result<Store, String> {
pub fn into_with_remote(self, remote_options: RemoteOptions) -> Result<Store, String> {
Ok(Store {
local: self.local,
remote: Some(RemoteStore::new(remote::ByteStore::new(
cas_address,
instance_name,
tls_config,
headers,
chunk_size_bytes,
rpc_timeout,
rpc_retries,
rpc_concurrency_limit,
capabilities_cell_opt,
batch_api_size_limit,
remote: Some(RemoteStore::new(remote::ByteStore::from_options(
remote_options,
)?)),
immutable_inputs_base: self.immutable_inputs_base,
})
Expand Down
57 changes: 29 additions & 28 deletions src/rust/engine/fs/store/src/remote.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ use workunit_store::{in_workunit, ObservationMetric};
use crate::StoreError;

mod reapi;
#[cfg(test)]
mod reapi_tests;

pub type ByteSource = Arc<(dyn Fn(Range<usize>) -> Bytes + Send + Sync + 'static)>;

Expand All @@ -41,6 +43,21 @@ pub trait ByteStoreProvider: Sync + Send + 'static {
fn chunk_size_bytes(&self) -> usize;
}

// TODO: Consider providing `impl Default`, similar to `super::LocalOptions`.
#[derive(Clone)]
pub struct RemoteOptions {
pub cas_address: String,
pub instance_name: Option<String>,
pub headers: BTreeMap<String, String>,
pub tls_config: grpc_util::tls::Config,
pub chunk_size_bytes: usize,
pub rpc_timeout: Duration,
pub rpc_retries: usize,
pub rpc_concurrency_limit: usize,
pub capabilities_cell_opt: Option<Arc<OnceCell<ServerCapabilities>>>,
pub batch_api_size_limit: usize,
}

#[derive(Clone)]
pub struct ByteStore {
instance_name: Option<String>,
Expand Down Expand Up @@ -77,36 +94,20 @@ impl LoadDestination for Vec<u8> {
}

impl ByteStore {
// TODO: Consider extracting these options to a struct with `impl Default`, similar to
// `super::LocalOptions`.
pub fn new(
cas_address: &str,
instance_name: Option<String>,
tls_config: grpc_util::tls::Config,
headers: BTreeMap<String, String>,
chunk_size_bytes: usize,
rpc_timeout: Duration,
rpc_retries: usize,
rpc_concurrency_limit: usize,
capabilities_cell_opt: Option<Arc<OnceCell<ServerCapabilities>>>,
batch_api_size_limit: usize,
) -> Result<ByteStore, String> {
let provider = Arc::new(reapi::Provider::new(
cas_address,
instance_name.clone(),
tls_config,
headers,
chunk_size_bytes,
rpc_timeout,
rpc_retries,
rpc_concurrency_limit,
capabilities_cell_opt,
batch_api_size_limit,
)?);
Ok(ByteStore {
provider: Arc<dyn ByteStoreProvider + 'static>,
) -> ByteStore {
ByteStore {
instance_name,
provider,
})
}
}

pub fn from_options(options: RemoteOptions) -> Result<ByteStore, String> {
let instance_name = options.instance_name.clone();
let provider = Arc::new(reapi::Provider::new(options)?);
Ok(ByteStore::new(instance_name, provider))
}

pub(crate) fn chunk_size_bytes(&self) -> usize {
Expand All @@ -116,10 +117,10 @@ impl ByteStore {
pub async fn store_buffered<WriteToBuffer, WriteResult>(
&self,
digest: Digest,
mut write_to_buffer: WriteToBuffer,
write_to_buffer: WriteToBuffer,
) -> Result<(), StoreError>
where
WriteToBuffer: FnMut(std::fs::File) -> WriteResult,
WriteToBuffer: FnOnce(std::fs::File) -> WriteResult,
WriteResult: Future<Output = Result<(), StoreError>>,
{
let write_buffer = tempfile::tempfile().map_err(|e| {
Expand Down
48 changes: 22 additions & 26 deletions src/rust/engine/fs/store/src/remote/reapi.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
// Copyright 2023 Pants project contributors (see CONTRIBUTORS.md).
// Licensed under the Apache License, Version 2.0 (see LICENSE).
use std::cmp::min;
use std::collections::{BTreeMap, HashSet};
use std::collections::HashSet;
use std::convert::TryInto;
use std::fmt;
use std::sync::Arc;
use std::time::{Duration, Instant};
use std::time::Instant;

use async_oncecell::OnceCell;
use async_trait::async_trait;
Expand All @@ -27,6 +27,8 @@ use tokio::sync::Mutex;
use tonic::{Code, Request, Status};
use workunit_store::{Metric, ObservationMetric};

use crate::RemoteOptions;

use super::{ByteSource, ByteStoreProvider, LoadDestination};

pub struct Provider {
Expand Down Expand Up @@ -64,32 +66,24 @@ impl std::error::Error for ByteStoreError {}
impl Provider {
// TODO: Consider extracting these options to a struct with `impl Default`, similar to
// `super::LocalOptions`.
pub fn new(
cas_address: &str,
instance_name: Option<String>,
tls_config: grpc_util::tls::Config,
mut headers: BTreeMap<String, String>,
chunk_size_bytes: usize,
rpc_timeout: Duration,
rpc_retries: usize,
rpc_concurrency_limit: usize,
capabilities_cell_opt: Option<Arc<OnceCell<ServerCapabilities>>>,
batch_api_size_limit: usize,
) -> Result<Provider, String> {
let tls_client_config = if cas_address.starts_with("https://") {
Some(tls_config.try_into()?)
pub fn new(mut options: RemoteOptions) -> Result<Provider, String> {
let tls_client_config = if options.cas_address.starts_with("https://") {
Some(options.tls_config.try_into()?)
} else {
None
};

let endpoint =
grpc_util::create_endpoint(cas_address, tls_client_config.as_ref(), &mut headers)?;
let http_headers = headers_to_http_header_map(&headers)?;
let endpoint = grpc_util::create_endpoint(
&options.cas_address,
tls_client_config.as_ref(),
&mut options.headers,
)?;
let http_headers = headers_to_http_header_map(&options.headers)?;
let channel = layered_service(
tonic::transport::Channel::balance_list(vec![endpoint].into_iter()),
rpc_concurrency_limit,
options.rpc_concurrency_limit,
http_headers,
Some((rpc_timeout, Metric::RemoteStoreRequestTimeouts)),
Some((options.rpc_timeout, Metric::RemoteStoreRequestTimeouts)),
);

let byte_stream_client = Arc::new(ByteStreamClient::new(channel.clone()));
Expand All @@ -99,14 +93,16 @@ impl Provider {
let capabilities_client = Arc::new(CapabilitiesClient::new(channel));

Ok(Provider {
instance_name,
chunk_size_bytes,
_rpc_attempts: rpc_retries + 1,
instance_name: options.instance_name,
chunk_size_bytes: options.chunk_size_bytes,
_rpc_attempts: options.rpc_retries + 1,
byte_stream_client,
cas_client,
capabilities_cell: capabilities_cell_opt.unwrap_or_else(|| Arc::new(OnceCell::new())),
capabilities_cell: options
.capabilities_cell_opt
.unwrap_or_else(|| Arc::new(OnceCell::new())),
capabilities_client,
batch_api_size_limit,
batch_api_size_limit: options.batch_api_size_limit,
})
}

Expand Down
Loading

0 comments on commit 6d03494

Please sign in to comment.