Skip to content

Commit

Permalink
Prepare remote::remote_cache::CommandRunner for other providers (#19459)
Browse files Browse the repository at this point in the history
This does preparatory refactoring towards
#11149, by adjusting
`remote::remote_cache::CommandRunner` in a few ways to make it easier to
plop in new 'providers':

- package the various options for creating a provider/command runner
into structs, and a bunch of mechanical refactoring to use those structs
- explicit tests for the REAPI provider

This continues #19424, but, unlike that one, doesn't refactor
`remote_cache_tests.rs` to (mostly) use in-memory providers, as those
tests are significantly more complicated, with many more services than
just the get/update caching provider and I don't think it strictly
blocks #11149.

After this, the next steps towards #11149 will be:

1. implement new providers, with some sort of factory function for
constructing the appropriate provider
2. expose settings in `pants.toml` to select and configure those new
providers
  • Loading branch information
huonw authored Jul 19, 2023
1 parent d8c4df3 commit 237d7ab
Show file tree
Hide file tree
Showing 7 changed files with 281 additions and 106 deletions.
79 changes: 51 additions & 28 deletions src/rust/engine/process_execution/remote/src/remote_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ use process_execution::{
use process_execution::{make_execute_request, EntireExecuteRequest};

mod reapi;
#[cfg(test)]
mod reapi_tests;

#[derive(Clone, Copy, Debug, strum_macros::EnumString)]
#[strum(serialize_all = "snake_case")]
Expand All @@ -39,7 +41,7 @@ pub enum RemoteCacheWarningsBehavior {
/// This `ActionCacheProvider` trait captures the operations required to be able to cache command
/// executions remotely.
#[async_trait]
trait ActionCacheProvider: Sync + Send + 'static {
pub trait ActionCacheProvider: Sync + Send + 'static {
async fn update_action_result(
&self,
action_digest: Digest,
Expand All @@ -53,6 +55,29 @@ trait ActionCacheProvider: Sync + Send + 'static {
) -> Result<Option<ActionResult>, String>;
}

#[derive(Clone)]
pub struct RemoteCacheProviderOptions {
pub instance_name: Option<String>,
pub action_cache_address: String,
pub root_ca_certs: Option<Vec<u8>>,
pub headers: BTreeMap<String, String>,
pub concurrency_limit: usize,
pub rpc_timeout: Duration,
}

pub struct RemoteCacheRunnerOptions {
pub inner: Arc<dyn process_execution::CommandRunner>,
pub instance_name: Option<String>,
pub process_cache_namespace: Option<String>,
pub executor: task_executor::Executor,
pub store: Store,
pub cache_read: bool,
pub cache_write: bool,
pub warnings_behavior: RemoteCacheWarningsBehavior,
pub cache_content_behavior: CacheContentBehavior,
pub append_only_caches_base_path: Option<String>,
}

/// This `CommandRunner` implementation caches results remotely using the Action Cache service
/// of the Remote Execution API.
///
Expand Down Expand Up @@ -80,32 +105,21 @@ pub struct CommandRunner {

impl CommandRunner {
pub fn new(
inner: Arc<dyn process_execution::CommandRunner>,
instance_name: Option<String>,
process_cache_namespace: Option<String>,
executor: task_executor::Executor,
store: Store,
action_cache_address: &str,
root_ca_certs: Option<Vec<u8>>,
headers: BTreeMap<String, String>,
cache_read: bool,
cache_write: bool,
warnings_behavior: RemoteCacheWarningsBehavior,
cache_content_behavior: CacheContentBehavior,
concurrency_limit: usize,
rpc_timeout: Duration,
append_only_caches_base_path: Option<String>,
) -> Result<Self, String> {
let provider = Arc::new(reapi::Provider::new(
instance_name.clone(),
action_cache_address,
root_ca_certs,
headers,
concurrency_limit,
rpc_timeout,
)?);

Ok(CommandRunner {
RemoteCacheRunnerOptions {
inner,
instance_name,
process_cache_namespace,
executor,
store,
cache_read,
cache_write,
warnings_behavior,
cache_content_behavior,
append_only_caches_base_path,
}: RemoteCacheRunnerOptions,
provider: Arc<dyn ActionCacheProvider + 'static>,
) -> Self {
CommandRunner {
inner,
instance_name,
process_cache_namespace,
Expand All @@ -119,7 +133,16 @@ impl CommandRunner {
warnings_behavior,
read_errors_counter: Arc::new(Mutex::new(BTreeMap::new())),
write_errors_counter: Arc::new(Mutex::new(BTreeMap::new())),
})
}
}

pub fn from_provider_options(
runner_options: RemoteCacheRunnerOptions,
provider_options: RemoteCacheProviderOptions,
) -> Result<Self, String> {
let provider = Arc::new(reapi::Provider::new(provider_options)?);

Ok(Self::new(runner_options, provider))
}

/// Create a REAPI `Tree` protobuf for an output directory by traversing down from a Pants
Expand Down
20 changes: 10 additions & 10 deletions src/rust/engine/process_execution/remote/src/remote_cache/reapi.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
// Copyright 2023 Pants project contributors (see CONTRIBUTORS.md).
// Licensed under the Apache License, Version 2.0 (see LICENSE).
use std::collections::BTreeMap;
use std::convert::TryInto;
use std::sync::Arc;
use std::time::Duration;

use async_trait::async_trait;
use grpc_util::retry::{retry_call, status_is_retryable};
Expand All @@ -18,7 +16,7 @@ use crate::remote::apply_headers;
use process_execution::Context;
use tonic::{Code, Request};

use super::ActionCacheProvider;
use super::{ActionCacheProvider, RemoteCacheProviderOptions};

pub struct Provider {
instance_name: Option<String>,
Expand All @@ -27,12 +25,14 @@ pub struct Provider {

impl Provider {
pub fn new(
instance_name: Option<String>,
action_cache_address: &str,
root_ca_certs: Option<Vec<u8>>,
mut headers: BTreeMap<String, String>,
concurrency_limit: usize,
rpc_timeout: Duration,
RemoteCacheProviderOptions {
instance_name,
action_cache_address,
root_ca_certs,
mut headers,
concurrency_limit,
rpc_timeout,
}: RemoteCacheProviderOptions,
) -> Result<Self, String> {
let tls_client_config = if action_cache_address.starts_with("https://") {
Some(grpc_util::tls::Config::new_without_mtls(root_ca_certs).try_into()?)
Expand All @@ -41,7 +41,7 @@ impl Provider {
};

let endpoint = grpc_util::create_endpoint(
action_cache_address,
&action_cache_address,
tls_client_config.as_ref(),
&mut headers,
)?;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
// Copyright 2023 Pants project contributors (see CONTRIBUTORS.md).
// Licensed under the Apache License, Version 2.0 (see LICENSE).
use std::{collections::BTreeMap, time::Duration};

use hashing::Digest;
use mock::StubCAS;
use process_execution::Context;
use protos::gen::build::bazel::remote::execution::v2 as remexec;

use super::{reapi::Provider, ActionCacheProvider, RemoteCacheProviderOptions};

fn new_provider(cas: &StubCAS) -> Provider {
Provider::new(RemoteCacheProviderOptions {
instance_name: None,
action_cache_address: cas.address(),
root_ca_certs: None,
headers: BTreeMap::new(),
concurrency_limit: 256,
rpc_timeout: Duration::from_secs(2),
})
.unwrap()
}

#[tokio::test]
async fn get_action_result_existing() {
let cas = StubCAS::empty();
let provider = new_provider(&cas);

let action_digest = Digest::of_bytes(b"get_action_cache test");
let action_result = remexec::ActionResult {
exit_code: 123,
..Default::default()
};
cas
.action_cache
.action_map
.lock()
.insert(action_digest.hash, action_result.clone());

assert_eq!(
provider
.get_action_result(action_digest, &Context::default())
.await,
Ok(Some(action_result))
);
}

#[tokio::test]
async fn get_action_result_missing() {
let cas = StubCAS::empty();
let provider = new_provider(&cas);

let action_digest = Digest::of_bytes(b"update_action_cache test");

assert_eq!(
provider
.get_action_result(action_digest, &Context::default())
.await,
Ok(None)
);
}

#[tokio::test]
async fn get_action_result_grpc_error() {
let cas = StubCAS::builder().ac_always_errors().build();
let provider = new_provider(&cas);

let action_digest = Digest::of_bytes(b"get_action_result_grpc_error test");

let error = provider
.get_action_result(action_digest, &Context::default())
.await
.expect_err("Want err");

assert!(
error.contains("unavailable"),
"Bad error message, got: {error}"
);
}

#[tokio::test]
async fn update_action_cache() {
let cas = StubCAS::empty();
let provider = new_provider(&cas);

let action_digest = Digest::of_bytes(b"update_action_cache test");
let action_result = remexec::ActionResult {
exit_code: 123,
..Default::default()
};

provider
.update_action_result(action_digest, action_result.clone())
.await
.unwrap();

assert_eq!(
cas.action_cache.action_map.lock()[&action_digest.hash],
action_result
);
}

#[tokio::test]
async fn update_action_cache_grpc_error() {
let cas = StubCAS::builder().ac_always_errors().build();
let provider = new_provider(&cas);

let action_digest = Digest::of_bytes(b"update_action_cache_grpc_error test");
let action_result = remexec::ActionResult {
exit_code: 123,
..Default::default()
};

let error = provider
.update_action_result(action_digest, action_result.clone())
.await
.expect_err("Want err");

assert!(
error.contains("unavailable"),
"Bad error message, got: {error}"
);
}
78 changes: 45 additions & 33 deletions src/rust/engine/process_execution/remote/src/remote_cache_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@ use testutil::data::{TestData, TestDirectory, TestTree};
use workunit_store::{RunId, RunningWorkunit, WorkunitStore};

use crate::remote::ensure_action_stored_locally;
use crate::remote_cache::RemoteCacheWarningsBehavior;
use crate::remote_cache::{
RemoteCacheProviderOptions, RemoteCacheRunnerOptions, RemoteCacheWarningsBehavior,
};
use process_execution::{
make_execute_request, CacheContentBehavior, CommandRunner as CommandRunnerTrait, Context,
EntireExecuteRequest, FallibleProcessResultWithPlatform, Platform, Process, ProcessCacheScope,
Expand Down Expand Up @@ -146,22 +148,27 @@ fn create_cached_runner(
cache_content_behavior: CacheContentBehavior,
) -> Box<dyn CommandRunnerTrait> {
Box::new(
crate::remote_cache::CommandRunner::new(
local.into(),
None,
None,
store_setup.executor.clone(),
store_setup.store.clone(),
&store_setup.cas.address(),
None,
BTreeMap::default(),
true,
true,
RemoteCacheWarningsBehavior::FirstOnly,
cache_content_behavior,
256,
CACHE_READ_TIMEOUT,
None,
crate::remote_cache::CommandRunner::from_provider_options(
RemoteCacheRunnerOptions {
inner: local.into(),
instance_name: None,
process_cache_namespace: None,
executor: store_setup.executor.clone(),
store: store_setup.store.clone(),
cache_read: true,
cache_write: true,
warnings_behavior: RemoteCacheWarningsBehavior::FirstOnly,
cache_content_behavior,
append_only_caches_base_path: None,
},
RemoteCacheProviderOptions {
instance_name: None,
action_cache_address: store_setup.cas.address(),
root_ca_certs: None,
headers: BTreeMap::default(),
concurrency_limit: 256,
rpc_timeout: CACHE_READ_TIMEOUT,
},
)
.expect("caching command runner"),
)
Expand Down Expand Up @@ -731,22 +738,27 @@ async fn make_action_result_basic() {

let mock_command_runner = Arc::new(MockCommandRunner);
let cas = StubCAS::builder().build();
let runner = crate::remote_cache::CommandRunner::new(
mock_command_runner.clone(),
None,
None,
executor.clone(),
store.clone(),
&cas.address(),
None,
BTreeMap::default(),
true,
true,
RemoteCacheWarningsBehavior::FirstOnly,
CacheContentBehavior::Defer,
256,
CACHE_READ_TIMEOUT,
None,
let runner = crate::remote_cache::CommandRunner::from_provider_options(
RemoteCacheRunnerOptions {
inner: mock_command_runner.clone(),
instance_name: None,
process_cache_namespace: None,
executor: executor.clone(),
store: store.clone(),
cache_read: true,
cache_write: true,
warnings_behavior: RemoteCacheWarningsBehavior::FirstOnly,
cache_content_behavior: CacheContentBehavior::Defer,
append_only_caches_base_path: None,
},
RemoteCacheProviderOptions {
instance_name: None,
action_cache_address: cas.address(),
root_ca_certs: None,
headers: BTreeMap::default(),
concurrency_limit: 256,
rpc_timeout: CACHE_READ_TIMEOUT,
},
)
.expect("caching command runner");

Expand Down
Loading

0 comments on commit 237d7ab

Please sign in to comment.