Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Parallelize Resolver #3627

Merged
merged 11 commits into from
May 17, 2024
3 changes: 3 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/once-map/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,3 +15,4 @@ workspace = true
[dependencies]
dashmap = { workspace = true }
tokio = { workspace = true }
futures = { workspace = true }
21 changes: 21 additions & 0 deletions crates/once-map/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,27 @@ impl<K: Eq + Hash, V: Clone> OnceMap<K, V> {
}
}

/// Wait for the result of a job that is running.
///
/// Will hang if [`OnceMap::done`] isn't called for this key.
pub fn wait_blocking(&self, key: &K) -> Option<V> {
let entry = self.items.get(key)?;
match entry.value() {
Value::Filled(value) => Some(value.clone()),
Value::Waiting(notify) => {
let notify = notify.clone();
drop(entry);
futures::executor::block_on(notify.notified());

let entry = self.items.get(key).expect("map is append-only");
match entry.value() {
Value::Filled(value) => Some(value.clone()),
Value::Waiting(_) => unreachable!("notify was called"),
}
}
}
}

/// Return the result of a previous job, if any.
pub fn get<Q: ?Sized + Hash + Eq>(&self, key: &Q) -> Option<V>
where
Expand Down
2 changes: 1 addition & 1 deletion crates/uv-installer/src/plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ impl<'a> Planner<'a> {
#[allow(clippy::too_many_arguments)]
pub fn build(
self,
mut site_packages: SitePackages<'_>,
mut site_packages: SitePackages,
reinstall: &Reinstall,
no_binary: &NoBinary,
hasher: &HashStrategy,
Expand Down
16 changes: 8 additions & 8 deletions crates/uv-installer/src/site_packages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ use crate::satisfies::RequirementSatisfaction;
///
/// Packages are indexed by both name and (for editable installs) URL.
#[derive(Debug)]
pub struct SitePackages<'a> {
venv: &'a PythonEnvironment,
pub struct SitePackages {
venv: PythonEnvironment,
/// The vector of all installed distributions. The `by_name` and `by_url` indices index into
/// this vector. The vector may contain `None` values, which represent distributions that were
/// removed from the virtual environment.
Expand All @@ -38,9 +38,9 @@ pub struct SitePackages<'a> {
by_url: FxHashMap<Url, Vec<usize>>,
}

impl<'a> SitePackages<'a> {
impl SitePackages {
/// Build an index of installed packages from the given Python executable.
pub fn from_executable(venv: &'a PythonEnvironment) -> Result<SitePackages<'a>> {
pub fn from_executable(venv: &PythonEnvironment) -> Result<SitePackages> {
let mut distributions: Vec<Option<InstalledDist>> = Vec::new();
let mut by_name = FxHashMap::default();
let mut by_url = FxHashMap::default();
Expand Down Expand Up @@ -68,7 +68,7 @@ impl<'a> SitePackages<'a> {
}
Err(err) if err.kind() == std::io::ErrorKind::NotFound => {
return Ok(Self {
venv,
venv: venv.clone(),
distributions,
by_name,
by_url,
Expand Down Expand Up @@ -107,7 +107,7 @@ impl<'a> SitePackages<'a> {
}

Ok(Self {
venv,
venv: venv.clone(),
distributions,
by_name,
by_url,
Expand Down Expand Up @@ -439,7 +439,7 @@ pub enum SatisfiesResult {
Unsatisfied(String),
}

impl IntoIterator for SitePackages<'_> {
impl IntoIterator for SitePackages {
type Item = InstalledDist;
type IntoIter = Flatten<std::vec::IntoIter<Option<InstalledDist>>>;

Expand Down Expand Up @@ -540,7 +540,7 @@ impl Diagnostic {
}
}

impl InstalledPackagesProvider for SitePackages<'_> {
impl InstalledPackagesProvider for SitePackages {
fn iter(&self) -> impl Iterator<Item = &InstalledDist> {
self.iter()
}
Expand Down
2 changes: 2 additions & 0 deletions crates/uv-resolver/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@ tokio = { workspace = true }
tokio-stream = { workspace = true }
tracing = { workspace = true }
url = { workspace = true }
dashmap = { workspace = true }
rayon = { workspace = true }

[dev-dependencies]
uv-interpreter = { workspace = true }
Expand Down
10 changes: 4 additions & 6 deletions crates/uv-resolver/src/error.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
use std::collections::{BTreeMap, BTreeSet};
use std::fmt::Formatter;
use std::ops::Deref;
use std::rc::Rc;
use std::sync::Arc;

use indexmap::IndexMap;
Expand Down Expand Up @@ -239,7 +238,7 @@ impl NoSolutionError {
mut self,
python_requirement: &PythonRequirement,
visited: &SharedSet<PackageName>,
package_versions: &OnceMap<PackageName, Rc<VersionsResponse>>,
package_versions: &OnceMap<PackageName, Arc<VersionsResponse>>,
) -> Self {
let mut available_versions = IndexMap::default();
for package in self.derivation_tree.packages() {
Expand All @@ -263,7 +262,7 @@ impl NoSolutionError {
// tree, but were never visited during resolution. We _may_ have metadata for
// these packages, but it's non-deterministic, and omitting them ensures that
// we represent the state of the resolver at the time of failure.
if visited.borrow().contains(name) {
if visited.contains(name) {
if let Some(response) = package_versions.get(name) {
if let VersionsResponse::Found(ref version_maps) = *response {
for version_map in version_maps {
Expand Down Expand Up @@ -304,7 +303,6 @@ impl NoSolutionError {
mut self,
unavailable_packages: &SharedMap<PackageName, UnavailablePackage>,
) -> Self {
let unavailable_packages = unavailable_packages.borrow();
let mut new = FxHashMap::default();
for package in self.derivation_tree.packages() {
if let PubGrubPackage::Package(name, _, _) = package {
Expand All @@ -324,11 +322,11 @@ impl NoSolutionError {
incomplete_packages: &SharedMap<PackageName, SharedMap<Version, IncompletePackage>>,
) -> Self {
let mut new = FxHashMap::default();
let incomplete_packages = incomplete_packages.borrow();
for package in self.derivation_tree.packages() {
if let PubGrubPackage::Package(name, _, _) = package {
if let Some(versions) = incomplete_packages.get(name) {
for (version, reason) in versions.borrow().iter() {
for entry in versions.iter() {
let (version, reason) = entry.pair();
new.entry(name.clone())
.or_insert_with(BTreeMap::default)
.insert(version.clone(), reason.clone());
Expand Down
6 changes: 3 additions & 3 deletions crates/uv-resolver/src/resolution/graph.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use std::hash::BuildHasherDefault;
use std::rc::Rc;
use std::sync::Arc;

use pubgrub::range::Range;
use pubgrub::solver::{Kind, State};
Expand Down Expand Up @@ -45,8 +45,8 @@ impl ResolutionGraph {
pub(crate) fn from_state(
selection: &SelectedDependencies<UvDependencyProvider>,
pins: &FilePins,
packages: &OnceMap<PackageName, Rc<VersionsResponse>>,
distributions: &OnceMap<VersionId, Rc<MetadataResponse>>,
packages: &OnceMap<PackageName, Arc<VersionsResponse>>,
distributions: &OnceMap<VersionId, Arc<MetadataResponse>>,
state: &State<UvDependencyProvider>,
preferences: &Preferences,
editables: Editables,
Expand Down
11 changes: 5 additions & 6 deletions crates/uv-resolver/src/resolver/batch_prefetch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use std::cmp::min;
use itertools::Itertools;
use pubgrub::range::Range;
use rustc_hash::FxHashMap;
use tokio::sync::mpsc::Sender;
use tokio::sync::mpsc::UnboundedSender;
ibraheemdev marked this conversation as resolved.
Show resolved Hide resolved
use tracing::{debug, trace};

use distribution_types::DistributionMetadata;
Expand Down Expand Up @@ -44,12 +44,12 @@ pub(crate) struct BatchPrefetcher {

impl BatchPrefetcher {
/// Prefetch a large number of versions if we already unsuccessfully tried many versions.
pub(crate) async fn prefetch_batches(
pub(crate) fn prefetch_batches(
&mut self,
next: &PubGrubPackage,
version: &Version,
current_range: &Range<Version>,
request_sink: &Sender<Request>,
request_sink: &UnboundedSender<Request>,
index: &InMemoryIndex,
selector: &CandidateSelector,
) -> anyhow::Result<(), ResolveError> {
Expand All @@ -66,8 +66,7 @@ impl BatchPrefetcher {
// This is immediate, we already fetched the version map.
let versions_response = index
.packages
.wait(package_name)
.await
.wait_blocking(package_name)
.ok_or(ResolveError::Unregistered)?;

let VersionsResponse::Found(ref version_map) = *versions_response else {
Expand Down Expand Up @@ -144,7 +143,7 @@ impl BatchPrefetcher {
prefetch_count += 1;
if index.distributions.register(candidate.version_id()) {
let request = Request::from(dist);
request_sink.send(request).await?;
request_sink.send(request)?;
}
}

Expand Down
14 changes: 7 additions & 7 deletions crates/uv-resolver/src/resolver/index.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::rc::Rc;
use std::sync::Arc;

use distribution_types::VersionId;
use once_map::OnceMap;
Expand All @@ -11,30 +11,30 @@ use crate::resolver::provider::{MetadataResponse, VersionsResponse};
pub struct InMemoryIndex {
/// A map from package name to the metadata for that package and the index where the metadata
/// came from.
pub(crate) packages: OnceMap<PackageName, Rc<VersionsResponse>>,
pub(crate) packages: OnceMap<PackageName, Arc<VersionsResponse>>,

/// A map from package ID to metadata for that distribution.
pub(crate) distributions: OnceMap<VersionId, Rc<MetadataResponse>>,
pub(crate) distributions: OnceMap<VersionId, Arc<MetadataResponse>>,
ibraheemdev marked this conversation as resolved.
Show resolved Hide resolved
}

impl InMemoryIndex {
/// Insert a [`VersionsResponse`] into the index.
pub fn insert_package(&self, package_name: PackageName, response: VersionsResponse) {
self.packages.done(package_name, Rc::new(response));
self.packages.done(package_name, Arc::new(response));
}

/// Insert a [`Metadata23`] into the index.
pub fn insert_metadata(&self, version_id: VersionId, response: MetadataResponse) {
self.distributions.done(version_id, Rc::new(response));
self.distributions.done(version_id, Arc::new(response));
}

/// Get the [`VersionsResponse`] for a given package name, without waiting.
pub fn get_package(&self, package_name: &PackageName) -> Option<Rc<VersionsResponse>> {
pub fn get_package(&self, package_name: &PackageName) -> Option<Arc<VersionsResponse>> {
self.packages.get(package_name)
}

/// Get the [`MetadataResponse`] for a given package ID, without waiting.
pub fn get_metadata(&self, version_id: &VersionId) -> Option<Rc<MetadataResponse>> {
pub fn get_metadata(&self, version_id: &VersionId) -> Option<Arc<MetadataResponse>> {
self.distributions.get(version_id)
}
}
Loading