Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Port the bulk of the process_execution crate to async/await #9676

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/rust/engine/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions src/rust/engine/process_execution/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ authors = [ "Pants Build <[email protected]>" ]
publish = false

[dependencies]
async-trait = "0.1"
copy_dir = "0.1.2"
walkdir = "2"
async_semaphore = { path = "../async_semaphore" }
Expand Down
206 changes: 94 additions & 112 deletions src/rust/engine/process_execution/src/cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,17 @@ use crate::{
};
use std::sync::Arc;

use async_trait::async_trait;
use bincode;
use bytes::Bytes;
use futures::compat::Future01CompatExt;
use futures::future::{self as future03, TryFutureExt};
use futures01::{future, Future};
use futures::future;
use futures01::Future;
use log::{debug, warn};
use protobuf::Message;
use serde::{Deserialize, Serialize};

use boxfuture::{BoxFuture, Boxable};
use hashing::Fingerprint;
use serde::{Deserialize, Serialize};
use sharded_lmdb::ShardedLmdb;
use store::Store;

Expand Down Expand Up @@ -49,107 +49,93 @@ impl CommandRunner {
}
}

#[async_trait]
impl crate::CommandRunner for CommandRunner {
fn extract_compatible_request(&self, req: &MultiPlatformProcess) -> Option<Process> {
self.underlying.extract_compatible_request(req)
}

// TODO: Maybe record WorkUnits for local cache checks.
fn run(
async fn run(
&self,
req: MultiPlatformProcess,
context: Context,
) -> BoxFuture<FallibleProcessResultWithPlatform, String> {
) -> Result<FallibleProcessResultWithPlatform, String> {
let digest = crate::digest(req.clone(), &self.metadata);
let key = digest.0;

let command_runner = self.clone();
self
.lookup(key)
.then(move |maybe_result| {
match maybe_result {
Ok(Some(result)) => return future::ok(result).to_boxed(),
Err(err) => {
warn!("Error loading process execution result from local cache: {} - continuing to execute", err);
// Falling through to re-execute.
},
Ok(None) => {
// Falling through to execute.
},
}
command_runner
.underlying
.run(req, context)
.and_then(move |result| {
if result.exit_code == 0 {
command_runner
.store(key, &result)
.then(|store_result| {
if let Err(err) = store_result {
debug!("Error storing process execution result to local cache: {} - ignoring and continuing", err);
}
Ok(result)
}).to_boxed()
} else {
future::ok(result).to_boxed()
}
})
.to_boxed()
match self.lookup(key).await {
Ok(Some(result)) => return Ok(result),
Err(err) => {
warn!(
"Error loading process execution result from local cache: {} - continuing to execute",
err
);
// Falling through to re-execute.
}
Ok(None) => {
// Falling through to execute.
}
}

})
.to_boxed()
let result = command_runner.underlying.run(req, context).await?;
if result.exit_code == 0 {
if let Err(err) = command_runner.store(key, &result).await {
debug!(
"Error storing process execution result to local cache: {} - ignoring and continuing",
err
);
}
}
Ok(result)
}
}

impl CommandRunner {
fn lookup(
async fn lookup(
&self,
fingerprint: Fingerprint,
) -> impl Future<Item = Option<FallibleProcessResultWithPlatform>, Error = String> {
) -> Result<Option<FallibleProcessResultWithPlatform>, String> {
use bazel_protos::remote_execution::ExecuteResponse;
let file_store = self.file_store.clone();

let command_runner = self.clone();
Box::pin(async move {
let maybe_execute_response: Option<(ExecuteResponse, Platform)> = command_runner
.process_execution_store
.load_bytes_with(fingerprint.clone(), move |bytes| {
let decoded: PlatformAndResponseBytes = bincode::deserialize(&bytes[..])
.map_err(|err| format!("Could not deserialize platform and response: {}", err))?;

let platform = decoded.platform;

let mut execute_response = ExecuteResponse::new();
execute_response
.merge_from_bytes(&decoded.response_bytes)
.map_err(|e| format!("Invalid ExecuteResponse: {:?}", e))?;

Ok((execute_response, platform))
})
.await?;

if let Some((execute_response, platform)) = maybe_execute_response {
crate::remote::populate_fallible_execution_result(
file_store,
execute_response,
vec![],
platform,
)
.map(Some)
.compat()
.await
} else {
Ok(None)
}
})
.compat()
let maybe_execute_response: Option<(ExecuteResponse, Platform)> = self
.process_execution_store
.load_bytes_with(fingerprint.clone(), move |bytes| {
let decoded: PlatformAndResponseBytes = bincode::deserialize(&bytes[..])
.map_err(|err| format!("Could not deserialize platform and response: {}", err))?;

let platform = decoded.platform;

let mut execute_response = ExecuteResponse::new();
execute_response
.merge_from_bytes(&decoded.response_bytes)
.map_err(|e| format!("Invalid ExecuteResponse: {:?}", e))?;

Ok((execute_response, platform))
})
.await?;

if let Some((execute_response, platform)) = maybe_execute_response {
crate::remote::populate_fallible_execution_result(
self.file_store.clone(),
execute_response,
vec![],
platform,
)
.map(Some)
.compat()
.await
} else {
Ok(None)
}
}

fn store(
async fn store(
&self,
fingerprint: Fingerprint,
result: &FallibleProcessResultWithPlatform,
) -> impl Future<Item = (), Error = String> {
) -> Result<(), String> {
let mut execute_response = bazel_protos::remote_execution::ExecuteResponse::new();
execute_response.set_cached_result(true);
let action_result = execute_response.mut_result();
Expand All @@ -160,46 +146,42 @@ impl CommandRunner {
directory.set_tree_digest((&result.output_directory).into());
directory
});
let process_execution_store = self.process_execution_store.clone();
// TODO: Should probably have a configurable lease time which is larger than default.
// (This isn't super urgent because we don't ever actually GC this store. So also...)
// TODO: GC the local process execution cache.
//

let platform = result.platform;
let (stdout_digest, stderr_digest) = future::try_join(
self
.file_store
.store_file_bytes(result.stdout.clone(), true),
self
.file_store
.store_file_bytes(result.stderr.clone(), true),
)
.await?;

let command_runner = self.clone();
let stdout = result.stdout.clone();
let stderr = result.stderr.clone();
Box::pin(async move {
let (stdout_digest, stderr_digest) = future03::try_join(
command_runner.file_store.store_file_bytes(stdout, true),
command_runner.file_store.store_file_bytes(stderr, true),
let action_result = execute_response.mut_result();
action_result.set_stdout_digest((&stdout_digest).into());
action_result.set_stderr_digest((&stderr_digest).into());
let response_bytes = execute_response
.write_to_bytes()
.map_err(|err| format!("Error serializing execute process result to cache: {}", err))?;

let bytes_to_store = bincode::serialize(&PlatformAndResponseBytes {
platform: result.platform,
response_bytes,
})
.map(Bytes::from)
.map_err(|err| {
format!(
"Error serializing platform and execute process result: {}",
err
)
.await?;
let action_result = execute_response.mut_result();
action_result.set_stdout_digest((&stdout_digest).into());
action_result.set_stderr_digest((&stderr_digest).into());
let response_bytes = execute_response
.write_to_bytes()
.map_err(|err| format!("Error serializing execute process result to cache: {}", err))?;

let bytes_to_store = bincode::serialize(&PlatformAndResponseBytes {
platform,
response_bytes,
})
.map(Bytes::from)
.map_err(|err| {
format!(
"Error serializing platform and execute process result: {}",
err
)
})?;
})?;

process_execution_store
.store_bytes(fingerprint, bytes_to_store, false)
.await
})
.compat()
self
.process_execution_store
.store_bytes(fingerprint, bytes_to_store, false)
.await
}
}
12 changes: 2 additions & 10 deletions src/rust/engine/process_execution/src/cache_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ use crate::{
CommandRunner as CommandRunnerTrait, Context, FallibleProcessResultWithPlatform,
PlatformConstraint, Process, ProcessMetadata,
};
use futures::compat::Future01CompatExt;
use hashing::EMPTY_DIGEST;
use sharded_lmdb::ShardedLmdb;
use std::collections::{BTreeMap, BTreeSet};
Expand Down Expand Up @@ -64,10 +63,7 @@ async fn run_roundtrip(script_exit_code: i8) -> RoundtripResults {
is_nailgunnable: false,
};

let local_result = local
.run(request.clone().into(), Context::default())
.compat()
.await;
let local_result = local.run(request.clone().into(), Context::default()).await;

let cache_dir = TempDir::new().unwrap();
let max_lmdb_size = 50 * 1024 * 1024; //50 MB - I didn't pick that number but it seems reasonable.
Expand All @@ -90,7 +86,6 @@ async fn run_roundtrip(script_exit_code: i8) -> RoundtripResults {

let uncached_result = caching
.run(request.clone().into(), Context::default())
.compat()
.await;

assert_eq!(local_result, uncached_result);
Expand All @@ -99,10 +94,7 @@ async fn run_roundtrip(script_exit_code: i8) -> RoundtripResults {
// fail due to a FileNotFound error. So, If the second run succeeds, that implies that the
// cache was successfully used.
std::fs::remove_file(&script_path).unwrap();
let maybe_cached_result = caching
.run(request.into(), Context::default())
.compat()
.await;
let maybe_cached_result = caching.run(request.into(), Context::default()).await;

RoundtripResults {
uncached: uncached_result,
Expand Down
Loading