Parallelize resolver (#3627)
## Summary

This PR introduces parallelism to the resolver. Specifically, we can
perform PubGrub resolution on a separate thread, while keeping all I/O
on the tokio thread. We already have the infrastructure set up for this
with the channel and `OnceMap`, which makes this change relatively
simple. The main change required was removing the lifetimes on some of
the types that are shared between the resolver and the PubGrub thread.
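
The split can be sketched with standard-library primitives alone. This is a simplified stand-in, not uv's implementation: the real code keeps I/O on the tokio runtime and shares results through the existing channel and `OnceMap`; all names below are hypothetical.

```rust
use std::sync::mpsc;
use std::thread;

/// A request sent from the resolution thread to the I/O side.
struct MetadataRequest(String);

/// Run CPU-bound "resolution" on a dedicated thread while the caller
/// services metadata requests (standing in for the tokio I/O task).
fn resolve_with_io<F>(packages: Vec<String>, fetch: F) -> Vec<(String, String)>
where
    F: Fn(&str) -> String,
{
    let (req_tx, req_rx) = mpsc::channel::<MetadataRequest>();
    let (resp_tx, resp_rx) = mpsc::channel::<String>();

    let resolver = thread::spawn(move || {
        let mut resolved = Vec::new();
        for name in packages {
            // Ask the I/O side for metadata, then block until it arrives.
            req_tx.send(MetadataRequest(name.clone())).unwrap();
            let metadata = resp_rx.recv().unwrap();
            resolved.push((name, metadata));
        }
        resolved // dropping `req_tx` here ends the I/O loop below
    });

    // The "I/O" side answers requests until the resolver hangs up.
    for MetadataRequest(name) in req_rx {
        resp_tx.send(fetch(&name)).unwrap();
    }
    resolver.join().unwrap()
}
```

The key property is that the resolution thread only ever blocks on a channel, so the I/O side is free to fetch (or prefetch) concurrently.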

A related PR, #1163, found that
adding `yield_now` calls improved throughput. With optimal scheduling we
might be able to get away with everything on the same thread here.
However, in the ideal pipeline with perfect prefetching, the resolution
and prefetching can run completely in parallel without depending on one
another. While this would be very difficult to achieve, even with our
current prefetching pattern we see a consistent performance improvement
from parallelism.

This does also require reverting a few of the changes from
#3413, but not all of them. The
sharing is isolated to the resolver task.

## Test Plan

On smaller resolution tasks, performance is mixed, with ~2%
improvements or regressions in either direction. On medium-to-large
resolution tasks, however, we see the benefits of parallelism, with
improvements anywhere from 10% to 50%.

```
./scripts/requirements/jupyter.in
Benchmark 1: ./target/profiling/baseline (resolve-warm)
  Time (mean ± σ):      29.2 ms ±   1.8 ms    [User: 20.3 ms, System: 29.8 ms]
  Range (min … max):    26.4 ms …  36.0 ms    91 runs
 
Benchmark 2: ./target/profiling/parallel (resolve-warm)
  Time (mean ± σ):      25.5 ms ±   1.0 ms    [User: 19.5 ms, System: 25.5 ms]
  Range (min … max):    23.6 ms …  27.8 ms    99 runs
 
Summary
  ./target/profiling/parallel (resolve-warm) ran
    1.15 ± 0.08 times faster than ./target/profiling/baseline (resolve-warm)
```
```
./scripts/requirements/boto3.in   
Benchmark 1: ./target/profiling/baseline (resolve-warm)
  Time (mean ± σ):     487.1 ms ±   6.2 ms    [User: 464.6 ms, System: 61.6 ms]
  Range (min … max):   480.0 ms … 497.3 ms    10 runs
 
Benchmark 2: ./target/profiling/parallel (resolve-warm)
  Time (mean ± σ):     430.8 ms ±   9.3 ms    [User: 529.0 ms, System: 77.2 ms]
  Range (min … max):   417.1 ms … 442.5 ms    10 runs
 
Summary
  ./target/profiling/parallel (resolve-warm) ran
    1.13 ± 0.03 times faster than ./target/profiling/baseline (resolve-warm)
```
```
./scripts/requirements/airflow.in 
Benchmark 1: ./target/profiling/baseline (resolve-warm)
  Time (mean ± σ):     478.1 ms ±  18.8 ms    [User: 482.6 ms, System: 205.0 ms]
  Range (min … max):   454.7 ms … 508.9 ms    10 runs
 
Benchmark 2: ./target/profiling/parallel (resolve-warm)
  Time (mean ± σ):     308.7 ms ±  11.7 ms    [User: 428.5 ms, System: 209.5 ms]
  Range (min … max):   287.8 ms … 323.1 ms    10 runs
 
Summary
  ./target/profiling/parallel (resolve-warm) ran
    1.55 ± 0.08 times faster than ./target/profiling/baseline (resolve-warm)
```
ibraheemdev authored May 17, 2024
1 parent 70a1782 commit 39af09f
Showing 27 changed files with 337 additions and 303 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

2 changes: 1 addition & 1 deletion crates/bench/benches/uv.rs
@@ -108,7 +108,7 @@ mod resolver {
&index,
&hashes,
&build_context,
&installed_packages,
installed_packages,
DistributionDatabase::new(client, &build_context, concurrency.downloads),
)?;

1 change: 1 addition & 0 deletions crates/once-map/Cargo.toml
@@ -15,3 +15,4 @@ workspace = true
[dependencies]
dashmap = { workspace = true }
tokio = { workspace = true }
futures = { workspace = true }
21 changes: 21 additions & 0 deletions crates/once-map/src/lib.rs
@@ -63,6 +63,27 @@ impl<K: Eq + Hash, V: Clone> OnceMap<K, V> {
}
}

/// Wait for the result of a job that is running, in a blocking context.
///
/// Will hang if [`OnceMap::done`] isn't called for this key.
pub fn wait_blocking(&self, key: &K) -> Option<V> {
let entry = self.items.get(key)?;
match entry.value() {
Value::Filled(value) => Some(value.clone()),
Value::Waiting(notify) => {
let notify = notify.clone();
drop(entry);
futures::executor::block_on(notify.notified());

let entry = self.items.get(key).expect("map is append-only");
match entry.value() {
Value::Filled(value) => Some(value.clone()),
Value::Waiting(_) => unreachable!("notify was called"),
}
}
}
}

/// Return the result of a previous job, if any.
pub fn get<Q: ?Sized + Hash + Eq>(&self, key: &Q) -> Option<V>
where
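
The semantics of `wait_blocking`/`done` can be mimicked with standard-library primitives. This is a simplified stand-in for illustration only: the real `OnceMap` is built on `DashMap` and wakes waiters through tokio's `Notify`, awaited synchronously via `futures::executor::block_on` as shown in the diff above.

```rust
use std::collections::HashMap;
use std::sync::{Condvar, Mutex};

/// A std-only stand-in for `OnceMap`: `done` publishes a value once,
/// and `wait_blocking` blocks the calling thread until it arrives.
struct BlockingOnceMap {
    items: Mutex<HashMap<String, String>>,
    cond: Condvar,
}

impl BlockingOnceMap {
    fn new() -> Self {
        Self {
            items: Mutex::new(HashMap::new()),
            cond: Condvar::new(),
        }
    }

    /// Publish the result for `key` and wake all blocked waiters.
    fn done(&self, key: String, value: String) {
        self.items.lock().unwrap().insert(key, value);
        self.cond.notify_all();
    }

    /// Like the real `wait_blocking`, this hangs if `done` is never
    /// called for this key.
    fn wait_blocking(&self, key: &str) -> String {
        let mut guard = self.items.lock().unwrap();
        loop {
            if let Some(value) = guard.get(key) {
                return value.clone();
            }
            // Release the lock and sleep until `done` notifies us.
            guard = self.cond.wait(guard).unwrap();
        }
    }
}
```

This is what lets the PubGrub thread block cheaply on metadata that the tokio side is still fetching.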
2 changes: 1 addition & 1 deletion crates/uv-dispatch/src/lib.rs
@@ -153,7 +153,7 @@ impl<'a> BuildContext for BuildDispatch<'a> {
self.index,
&HashStrategy::None,
self,
&EmptyInstalledPackages,
EmptyInstalledPackages,
DistributionDatabase::new(self.client, self, self.concurrency.downloads),
)?;
let graph = resolver.resolve().await.with_context(|| {
2 changes: 1 addition & 1 deletion crates/uv-installer/src/plan.rs
@@ -68,7 +68,7 @@ impl<'a> Planner<'a> {
#[allow(clippy::too_many_arguments)]
pub fn build(
self,
mut site_packages: SitePackages<'_>,
mut site_packages: SitePackages,
reinstall: &Reinstall,
no_binary: &NoBinary,
hasher: &HashStrategy,
18 changes: 9 additions & 9 deletions crates/uv-installer/src/site_packages.rs
@@ -23,9 +23,9 @@ use crate::satisfies::RequirementSatisfaction;
/// An index over the packages installed in an environment.
///
/// Packages are indexed by both name and (for editable installs) URL.
#[derive(Debug)]
pub struct SitePackages<'a> {
venv: &'a PythonEnvironment,
#[derive(Debug, Clone)]
pub struct SitePackages {
venv: PythonEnvironment,
/// The vector of all installed distributions. The `by_name` and `by_url` indices index into
/// this vector. The vector may contain `None` values, which represent distributions that were
/// removed from the virtual environment.
@@ -38,9 +38,9 @@ pub struct SitePackages<'a> {
by_url: FxHashMap<Url, Vec<usize>>,
}

impl<'a> SitePackages<'a> {
impl SitePackages {
/// Build an index of installed packages from the given Python executable.
pub fn from_executable(venv: &'a PythonEnvironment) -> Result<SitePackages<'a>> {
pub fn from_executable(venv: &PythonEnvironment) -> Result<SitePackages> {
let mut distributions: Vec<Option<InstalledDist>> = Vec::new();
let mut by_name = FxHashMap::default();
let mut by_url = FxHashMap::default();
@@ -68,7 +68,7 @@ impl<'a> SitePackages<'a> {
}
Err(err) if err.kind() == std::io::ErrorKind::NotFound => {
return Ok(Self {
venv,
venv: venv.clone(),
distributions,
by_name,
by_url,
@@ -107,7 +107,7 @@ impl<'a> SitePackages<'a> {
}

Ok(Self {
venv,
venv: venv.clone(),
distributions,
by_name,
by_url,
@@ -439,7 +439,7 @@ pub enum SatisfiesResult {
Unsatisfied(String),
}

impl IntoIterator for SitePackages<'_> {
impl IntoIterator for SitePackages {
type Item = InstalledDist;
type IntoIter = Flatten<std::vec::IntoIter<Option<InstalledDist>>>;

@@ -540,7 +540,7 @@ impl Diagnostic {
}
}

impl InstalledPackagesProvider for SitePackages<'_> {
impl InstalledPackagesProvider for SitePackages {
fn iter(&self) -> impl Iterator<Item = &InstalledDist> {
self.iter()
}
59 changes: 32 additions & 27 deletions crates/uv-interpreter/src/environment.rs
@@ -1,6 +1,7 @@
use itertools::Either;
use std::env;
use std::path::{Path, PathBuf};
use std::sync::Arc;

use same_file::is_same_file;

@@ -12,7 +13,10 @@ use crate::{find_default_python, find_requested_python, Error, Interpreter, Targ

/// A Python environment, consisting of a Python [`Interpreter`] and its associated paths.
#[derive(Debug, Clone)]
pub struct PythonEnvironment {
pub struct PythonEnvironment(Arc<PythonEnvironmentShared>);

#[derive(Debug, Clone)]
struct PythonEnvironmentShared {
root: PathBuf,
interpreter: Interpreter,
}
@@ -46,68 +50,69 @@ impl PythonEnvironment {
interpreter.base_prefix().display()
);

Ok(Self {
Ok(Self(Arc::new(PythonEnvironmentShared {
root: venv,
interpreter,
})
})))
}

/// Create a [`PythonEnvironment`] for a Python interpreter specifier (e.g., a path or a binary name).
pub fn from_requested_python(python: &str, cache: &Cache) -> Result<Self, Error> {
let Some(interpreter) = find_requested_python(python, cache)? else {
return Err(Error::RequestedPythonNotFound(python.to_string()));
};
Ok(Self {
Ok(Self(Arc::new(PythonEnvironmentShared {
root: interpreter.prefix().to_path_buf(),
interpreter,
})
})))
}

/// Create a [`PythonEnvironment`] for the default Python interpreter.
pub fn from_default_python(cache: &Cache) -> Result<Self, Error> {
let interpreter = find_default_python(cache)?;
Ok(Self {
Ok(Self(Arc::new(PythonEnvironmentShared {
root: interpreter.prefix().to_path_buf(),
interpreter,
})
})))
}

/// Create a [`PythonEnvironment`] from an existing [`Interpreter`] and root directory.
pub fn from_interpreter(interpreter: Interpreter) -> Self {
Self {
Self(Arc::new(PythonEnvironmentShared {
root: interpreter.prefix().to_path_buf(),
interpreter,
}
}))
}

/// Create a [`PythonEnvironment`] from an existing [`Interpreter`] and `--target` directory.
#[must_use]
pub fn with_target(self, target: Target) -> Self {
Self {
interpreter: self.interpreter.with_target(target),
..self
}
let inner = Arc::unwrap_or_clone(self.0);
Self(Arc::new(PythonEnvironmentShared {
interpreter: inner.interpreter.with_target(target),
..inner
}))
}

/// Returns the root (i.e., `prefix`) of the Python interpreter.
pub fn root(&self) -> &Path {
&self.root
&self.0.root
}

/// Return the [`Interpreter`] for this virtual environment.
pub fn interpreter(&self) -> &Interpreter {
&self.interpreter
&self.0.interpreter
}

/// Return the [`PyVenvConfiguration`] for this virtual environment, as extracted from the
/// `pyvenv.cfg` file.
pub fn cfg(&self) -> Result<PyVenvConfiguration, Error> {
Ok(PyVenvConfiguration::parse(self.root.join("pyvenv.cfg"))?)
Ok(PyVenvConfiguration::parse(self.0.root.join("pyvenv.cfg"))?)
}

/// Returns the location of the Python executable.
pub fn python_executable(&self) -> &Path {
self.interpreter.sys_executable()
self.0.interpreter.sys_executable()
}

/// Returns an iterator over the `site-packages` directories inside a virtual environment.
@@ -118,11 +123,11 @@ impl PythonEnvironment {
/// Some distributions also create symbolic links from `purelib` to `platlib`; in such cases, we
/// still deduplicate the entries, returning a single path.
pub fn site_packages(&self) -> impl Iterator<Item = &Path> {
if let Some(target) = self.interpreter.target() {
if let Some(target) = self.0.interpreter.target() {
Either::Left(std::iter::once(target.root()))
} else {
let purelib = self.interpreter.purelib();
let platlib = self.interpreter.platlib();
let purelib = self.0.interpreter.purelib();
let platlib = self.0.interpreter.platlib();
Either::Right(std::iter::once(purelib).chain(
if purelib == platlib || is_same_file(purelib, platlib).unwrap_or(false) {
None
@@ -135,31 +140,31 @@ impl PythonEnvironment {

/// Returns the path to the `bin` directory inside a virtual environment.
pub fn scripts(&self) -> &Path {
self.interpreter.scripts()
self.0.interpreter.scripts()
}

/// Grab a file lock for the virtual environment to prevent concurrent writes across processes.
pub fn lock(&self) -> Result<LockedFile, std::io::Error> {
if let Some(target) = self.interpreter.target() {
if let Some(target) = self.0.interpreter.target() {
// If we're installing into a `--target`, use a target-specific lock file.
LockedFile::acquire(
target.root().join(".lock"),
target.root().simplified_display(),
)
} else if self.interpreter.is_virtualenv() {
} else if self.0.interpreter.is_virtualenv() {
// If the environment is a virtualenv, use a virtualenv-specific lock file.
LockedFile::acquire(self.root.join(".lock"), self.root.simplified_display())
LockedFile::acquire(self.0.root.join(".lock"), self.0.root.simplified_display())
} else {
// Otherwise, use a global lock file.
LockedFile::acquire(
env::temp_dir().join(format!("uv-{}.lock", cache_key::digest(&self.root))),
self.root.simplified_display(),
env::temp_dir().join(format!("uv-{}.lock", cache_key::digest(&self.0.root))),
self.0.root.simplified_display(),
)
}
}

/// Return the [`Interpreter`] for this virtual environment.
pub fn into_interpreter(self) -> Interpreter {
self.interpreter
Arc::unwrap_or_clone(self.0).interpreter
}
}
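
The `PythonEnvironment` change above is the general recipe this PR uses for dropping lifetimes: move the fields into a shared struct behind an `Arc`, keep the public type as a cheaply clonable handle, and unwrap the `Arc` in consuming methods. A minimal sketch of the pattern, with hypothetical names:

```rust
use std::sync::Arc;

/// A cheaply clonable, `'static` handle: no borrowed lifetime, so it
/// can cross thread boundaries (e.g. into the resolver thread).
#[derive(Debug, Clone)]
struct Environment(Arc<EnvironmentShared>);

#[derive(Debug)]
struct EnvironmentShared {
    root: String,
}

impl Environment {
    fn new(root: String) -> Self {
        Self(Arc::new(EnvironmentShared { root }))
    }

    /// Borrowing accessors just reach through the `Arc`.
    fn root(&self) -> &str {
        &self.0.root
    }

    /// Consuming accessor, mirroring `into_interpreter`: take ownership
    /// without a deep clone when this is the last handle.
    fn into_root(self) -> String {
        Arc::try_unwrap(self.0)
            .map(|shared| shared.root)
            .unwrap_or_else(|arc| arc.root.clone())
    }
}
```

Cloning the handle is a reference-count bump, which is why the reverted sharing from #3413 stays cheap inside the resolver task.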
28 changes: 15 additions & 13 deletions crates/uv-requirements/src/lookahead.rs
@@ -1,4 +1,4 @@
use std::collections::VecDeque;
use std::{collections::VecDeque, sync::Arc};

use futures::stream::FuturesUnordered;
use futures::StreamExt;
@@ -197,17 +197,18 @@ impl<'a, Context: BuildContext> LookaheadResolver<'a, Context> {
// Fetch the metadata for the distribution.
let requires_dist = {
let id = dist.version_id();
if let Some(archive) = self
.index
.get_metadata(&id)
.as_deref()
.and_then(|response| {
if let MetadataResponse::Found(archive, ..) = response {
Some(archive)
} else {
None
}
})
if let Some(archive) =
self.index
.distributions()
.get(&id)
.as_deref()
.and_then(|response| {
if let MetadataResponse::Found(archive, ..) = response {
Some(archive)
} else {
None
}
})
{
// If the metadata is already in the index, return it.
archive
Expand All @@ -234,7 +235,8 @@ impl<'a, Context: BuildContext> LookaheadResolver<'a, Context> {

// Insert the metadata into the index.
self.index
.insert_metadata(id, MetadataResponse::Found(archive));
.distributions()
.done(id, Arc::new(MetadataResponse::Found(archive)));

requires_dist
.into_iter()
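
Both this call site and the one in `source_tree.rs` below follow the same get-or-compute shape against the shared index. Stripped of the `OnceMap` machinery, it reduces to something like the following sketch (illustrative names, with a plain `HashMap` standing in for the concurrent index):

```rust
use std::collections::HashMap;
use std::sync::Arc;

/// Consult the shared index first; otherwise compute the metadata and
/// publish it, which is what `.distributions().done(id, ...)` does in
/// the diff (additionally waking any concurrent waiters).
fn get_or_fetch(
    index: &mut HashMap<String, Arc<String>>,
    id: &str,
    fetch: impl FnOnce() -> String,
) -> Arc<String> {
    if let Some(archive) = index.get(id) {
        // Already resolved by another task: reuse it.
        return Arc::clone(archive);
    }
    let archive = Arc::new(fetch());
    index.insert(id.to_string(), Arc::clone(&archive));
    archive
}
```

Wrapping the stored value in an `Arc` is what the `done(id, Arc::new(...))` calls above add relative to the old `insert_metadata`: the response can now be handed to multiple threads without copying.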
27 changes: 15 additions & 12 deletions crates/uv-requirements/src/source_tree.rs
@@ -1,5 +1,6 @@
use std::borrow::Cow;
use std::path::{Path, PathBuf};
use std::sync::Arc;

use anyhow::{Context, Result};
use futures::stream::FuturesOrdered;
@@ -117,17 +118,18 @@ impl<'a, Context: BuildContext> SourceTreeResolver<'a, Context> {
// Fetch the metadata for the distribution.
let metadata = {
let id = VersionId::from_url(source.url());
if let Some(archive) = self
.index
.get_metadata(&id)
.as_deref()
.and_then(|response| {
if let MetadataResponse::Found(archive) = response {
Some(archive)
} else {
None
}
})
if let Some(archive) =
self.index
.distributions()
.get(&id)
.as_deref()
.and_then(|response| {
if let MetadataResponse::Found(archive) = response {
Some(archive)
} else {
None
}
})
{
// If the metadata is already in the index, return it.
archive.metadata.clone()
Expand All @@ -138,7 +140,8 @@ impl<'a, Context: BuildContext> SourceTreeResolver<'a, Context> {

// Insert the metadata into the index.
self.index
.insert_metadata(id, MetadataResponse::Found(archive.clone()));
.distributions()
.done(id, Arc::new(MetadataResponse::Found(archive.clone())));

archive.metadata
}