Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Parallelize Resolver #3627

Merged
merged 11 commits into from
May 17, 2024
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion crates/bench/benches/uv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ mod resolver {
&index,
&hashes,
&build_context,
&installed_packages,
installed_packages,
DistributionDatabase::new(client, &build_context, concurrency.downloads),
)?;

Expand Down
1 change: 1 addition & 0 deletions crates/once-map/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,3 +15,4 @@ workspace = true
[dependencies]
dashmap = { workspace = true }
tokio = { workspace = true }
futures = { workspace = true }
21 changes: 21 additions & 0 deletions crates/once-map/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,27 @@ impl<K: Eq + Hash, V: Clone> OnceMap<K, V> {
}
}

/// Wait for the result of a job that is running, in a blocking context.
///
/// Returns `None` if there is no entry for `key`; otherwise blocks the current thread until
/// the entry is filled and returns a clone of the stored value.
///
/// Will hang if [`OnceMap::done`] isn't called for this key.
pub fn wait_blocking(&self, key: &K) -> Option<V> {
    // Grab the notifier (or return early if the value is already there) and release the map
    // guard at the end of this block, so the map is never locked while we block.
    let notify = {
        let entry = self.items.get(key)?;
        match entry.value() {
            Value::Filled(value) => return Some(value.clone()),
            Value::Waiting(notify) => notify.clone(),
        }
    };

    // Create the notification future *before* re-checking the map.
    // NOTE(review): this assumes `notify` is a `tokio::sync::Notify` and that `done` wakes
    // waiters via `notify_waiters` — confirm against `OnceMap::done`. In that case only
    // futures that already exist when the wakeup is sent receive it, so creating the future
    // after the check would leave a window in which the wakeup is lost and we block forever.
    let notification = notify.notified();

    // The value may have been inserted between the first lookup and the creation of the
    // notification future; if so, return it instead of waiting for a wakeup already sent.
    {
        let entry = self.items.get(key).expect("map is append-only");
        if let Value::Filled(value) = entry.value() {
            return Some(value.clone());
        }
    }

    // Block the current thread until the job signals completion.
    futures::executor::block_on(notification);

    let entry = self.items.get(key).expect("map is append-only");
    match entry.value() {
        Value::Filled(value) => Some(value.clone()),
        Value::Waiting(_) => unreachable!("notify was called"),
    }
}

/// Return the result of a previous job, if any.
pub fn get<Q: ?Sized + Hash + Eq>(&self, key: &Q) -> Option<V>
where
Expand Down
2 changes: 1 addition & 1 deletion crates/uv-dispatch/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ impl<'a> BuildContext for BuildDispatch<'a> {
self.index,
&HashStrategy::None,
self,
&EmptyInstalledPackages,
EmptyInstalledPackages,
DistributionDatabase::new(self.client, self, self.concurrency.downloads),
)?;
let graph = resolver.resolve().await.with_context(|| {
Expand Down
2 changes: 1 addition & 1 deletion crates/uv-installer/src/plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ impl<'a> Planner<'a> {
#[allow(clippy::too_many_arguments)]
pub fn build(
self,
mut site_packages: SitePackages<'_>,
mut site_packages: SitePackages,
reinstall: &Reinstall,
no_binary: &NoBinary,
hasher: &HashStrategy,
Expand Down
18 changes: 9 additions & 9 deletions crates/uv-installer/src/site_packages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,9 @@ use crate::satisfies::RequirementSatisfaction;
/// An index over the packages installed in an environment.
///
/// Packages are indexed by both name and (for editable installs) URL.
#[derive(Debug)]
pub struct SitePackages<'a> {
venv: &'a PythonEnvironment,
#[derive(Debug, Clone)]
pub struct SitePackages {
venv: PythonEnvironment,
/// The vector of all installed distributions. The `by_name` and `by_url` indices index into
/// this vector. The vector may contain `None` values, which represent distributions that were
/// removed from the virtual environment.
Expand All @@ -38,9 +38,9 @@ pub struct SitePackages<'a> {
by_url: FxHashMap<Url, Vec<usize>>,
}

impl<'a> SitePackages<'a> {
impl SitePackages {
/// Build an index of installed packages from the given Python executable.
pub fn from_executable(venv: &'a PythonEnvironment) -> Result<SitePackages<'a>> {
pub fn from_executable(venv: &PythonEnvironment) -> Result<SitePackages> {
let mut distributions: Vec<Option<InstalledDist>> = Vec::new();
let mut by_name = FxHashMap::default();
let mut by_url = FxHashMap::default();
Expand Down Expand Up @@ -68,7 +68,7 @@ impl<'a> SitePackages<'a> {
}
Err(err) if err.kind() == std::io::ErrorKind::NotFound => {
return Ok(Self {
venv,
venv: venv.clone(),
distributions,
by_name,
by_url,
Expand Down Expand Up @@ -107,7 +107,7 @@ impl<'a> SitePackages<'a> {
}

Ok(Self {
venv,
venv: venv.clone(),
distributions,
by_name,
by_url,
Expand Down Expand Up @@ -439,7 +439,7 @@ pub enum SatisfiesResult {
Unsatisfied(String),
}

impl IntoIterator for SitePackages<'_> {
impl IntoIterator for SitePackages {
type Item = InstalledDist;
type IntoIter = Flatten<std::vec::IntoIter<Option<InstalledDist>>>;

Expand Down Expand Up @@ -540,7 +540,7 @@ impl Diagnostic {
}
}

impl InstalledPackagesProvider for SitePackages<'_> {
impl InstalledPackagesProvider for SitePackages {
fn iter(&self) -> impl Iterator<Item = &InstalledDist> {
self.iter()
}
Expand Down
59 changes: 32 additions & 27 deletions crates/uv-interpreter/src/environment.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use itertools::Either;
use std::env;
use std::path::{Path, PathBuf};
use std::sync::Arc;

use same_file::is_same_file;

Expand All @@ -12,7 +13,10 @@ use crate::{find_default_python, find_requested_python, Error, Interpreter, Targ

/// A Python environment, consisting of a Python [`Interpreter`] and its associated paths.
#[derive(Debug, Clone)]
pub struct PythonEnvironment {
pub struct PythonEnvironment(Arc<SharedPythonEnvironment>);

#[derive(Debug, Clone)]
struct SharedPythonEnvironment {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just for my knowledge, is this the typical naming scheme for this pattern?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I usually use Foo and FooInner personally. I don't think I've seen SharedFoo much? I like adding a suffix personally. So I'd prefer FooShared (or whatever). But I don't have a strong opinion.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Cool thanks! Inner makes a bit more sense to me.

Copy link
Member Author

@ibraheemdev ibraheemdev May 17, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I sort of avoid Inner because it feels like a catch-all naming convention. A suffix seems slightly better for readability, I'll switch to that.

root: PathBuf,
interpreter: Interpreter,
}
Expand Down Expand Up @@ -46,68 +50,69 @@ impl PythonEnvironment {
interpreter.base_prefix().display()
);

Ok(Self {
Ok(Self(Arc::new(SharedPythonEnvironment {
root: venv,
interpreter,
})
})))
}

/// Create a [`PythonEnvironment`] for a Python interpreter specifier (e.g., a path or a binary name).
pub fn from_requested_python(python: &str, cache: &Cache) -> Result<Self, Error> {
let Some(interpreter) = find_requested_python(python, cache)? else {
return Err(Error::RequestedPythonNotFound(python.to_string()));
};
Ok(Self {
Ok(Self(Arc::new(SharedPythonEnvironment {
root: interpreter.prefix().to_path_buf(),
interpreter,
})
})))
}

/// Create a [`PythonEnvironment`] for the default Python interpreter.
pub fn from_default_python(cache: &Cache) -> Result<Self, Error> {
let interpreter = find_default_python(cache)?;
Ok(Self {
Ok(Self(Arc::new(SharedPythonEnvironment {
root: interpreter.prefix().to_path_buf(),
interpreter,
})
})))
}

/// Create a [`PythonEnvironment`] from an existing [`Interpreter`], using its prefix as the root directory.
pub fn from_interpreter(interpreter: Interpreter) -> Self {
Self {
Self(Arc::new(SharedPythonEnvironment {
root: interpreter.prefix().to_path_buf(),
interpreter,
}
}))
}

/// Create a [`PythonEnvironment`] from an existing [`Interpreter`] and `--target` directory.
#[must_use]
pub fn with_target(self, target: Target) -> Self {
Self {
interpreter: self.interpreter.with_target(target),
..self
}
let inner = Arc::unwrap_or_clone(self.0);
Self(Arc::new(SharedPythonEnvironment {
interpreter: inner.interpreter.with_target(target),
..inner
}))
}

/// Returns the root (i.e., `prefix`) of the Python interpreter.
pub fn root(&self) -> &Path {
&self.root
&self.0.root
}

/// Return the [`Interpreter`] for this virtual environment.
pub fn interpreter(&self) -> &Interpreter {
&self.interpreter
&self.0.interpreter
}

/// Return the [`PyVenvConfiguration`] for this virtual environment, as extracted from the
/// `pyvenv.cfg` file.
pub fn cfg(&self) -> Result<PyVenvConfiguration, Error> {
Ok(PyVenvConfiguration::parse(self.root.join("pyvenv.cfg"))?)
Ok(PyVenvConfiguration::parse(self.0.root.join("pyvenv.cfg"))?)
}

/// Returns the location of the Python executable.
pub fn python_executable(&self) -> &Path {
self.interpreter.sys_executable()
self.0.interpreter.sys_executable()
}

/// Returns an iterator over the `site-packages` directories inside a virtual environment.
Expand All @@ -118,11 +123,11 @@ impl PythonEnvironment {
/// Some distributions also create symbolic links from `purelib` to `platlib`; in such cases, we
/// still deduplicate the entries, returning a single path.
pub fn site_packages(&self) -> impl Iterator<Item = &Path> {
if let Some(target) = self.interpreter.target() {
if let Some(target) = self.0.interpreter.target() {
Either::Left(std::iter::once(target.root()))
} else {
let purelib = self.interpreter.purelib();
let platlib = self.interpreter.platlib();
let purelib = self.0.interpreter.purelib();
let platlib = self.0.interpreter.platlib();
Either::Right(std::iter::once(purelib).chain(
if purelib == platlib || is_same_file(purelib, platlib).unwrap_or(false) {
None
Expand All @@ -135,31 +140,31 @@ impl PythonEnvironment {

/// Returns the path to the `bin` directory inside a virtual environment.
pub fn scripts(&self) -> &Path {
self.interpreter.scripts()
self.0.interpreter.scripts()
}

/// Grab a file lock for the virtual environment to prevent concurrent writes across processes.
pub fn lock(&self) -> Result<LockedFile, std::io::Error> {
if let Some(target) = self.interpreter.target() {
if let Some(target) = self.0.interpreter.target() {
// If we're installing into a `--target`, use a target-specific lock file.
LockedFile::acquire(
target.root().join(".lock"),
target.root().simplified_display(),
)
} else if self.interpreter.is_virtualenv() {
} else if self.0.interpreter.is_virtualenv() {
// If the environment is a virtualenv, use a virtualenv-specific lock file.
LockedFile::acquire(self.root.join(".lock"), self.root.simplified_display())
LockedFile::acquire(self.0.root.join(".lock"), self.0.root.simplified_display())
} else {
// Otherwise, use a global lock file.
LockedFile::acquire(
env::temp_dir().join(format!("uv-{}.lock", cache_key::digest(&self.root))),
self.root.simplified_display(),
env::temp_dir().join(format!("uv-{}.lock", cache_key::digest(&self.0.root))),
self.0.root.simplified_display(),
)
}
}

/// Consume this environment and return its [`Interpreter`].
pub fn into_interpreter(self) -> Interpreter {
self.interpreter
Arc::unwrap_or_clone(self.0).interpreter
}
}
28 changes: 15 additions & 13 deletions crates/uv-requirements/src/lookahead.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::collections::VecDeque;
use std::{collections::VecDeque, sync::Arc};

use futures::stream::FuturesUnordered;
use futures::StreamExt;
Expand Down Expand Up @@ -197,17 +197,18 @@ impl<'a, Context: BuildContext> LookaheadResolver<'a, Context> {
// Fetch the metadata for the distribution.
let requires_dist = {
let id = dist.version_id();
if let Some(archive) = self
.index
.get_metadata(&id)
.as_deref()
.and_then(|response| {
if let MetadataResponse::Found(archive, ..) = response {
Some(archive)
} else {
None
}
})
if let Some(archive) =
self.index
.distributions()
.get(&id)
.as_deref()
.and_then(|response| {
if let MetadataResponse::Found(archive, ..) = response {
Some(archive)
} else {
None
}
})
{
// If the metadata is already in the index, return it.
archive
Expand All @@ -234,7 +235,8 @@ impl<'a, Context: BuildContext> LookaheadResolver<'a, Context> {

// Insert the metadata into the index.
self.index
.insert_metadata(id, MetadataResponse::Found(archive));
.distributions()
.done(id, Arc::new(MetadataResponse::Found(archive)));

requires_dist
.into_iter()
Expand Down
27 changes: 15 additions & 12 deletions crates/uv-requirements/src/source_tree.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use std::borrow::Cow;
use std::path::{Path, PathBuf};
use std::sync::Arc;

use anyhow::{Context, Result};
use futures::stream::FuturesOrdered;
Expand Down Expand Up @@ -117,17 +118,18 @@ impl<'a, Context: BuildContext> SourceTreeResolver<'a, Context> {
// Fetch the metadata for the distribution.
let metadata = {
let id = VersionId::from_url(source.url());
if let Some(archive) = self
.index
.get_metadata(&id)
.as_deref()
.and_then(|response| {
if let MetadataResponse::Found(archive) = response {
Some(archive)
} else {
None
}
})
if let Some(archive) =
self.index
.distributions()
.get(&id)
.as_deref()
.and_then(|response| {
if let MetadataResponse::Found(archive) = response {
Some(archive)
} else {
None
}
})
{
// If the metadata is already in the index, return it.
archive.metadata.clone()
Expand All @@ -138,7 +140,8 @@ impl<'a, Context: BuildContext> SourceTreeResolver<'a, Context> {

// Insert the metadata into the index.
self.index
.insert_metadata(id, MetadataResponse::Found(archive.clone()));
.distributions()
.done(id, Arc::new(MetadataResponse::Found(archive.clone())));

archive.metadata
}
Expand Down
Loading
Loading