From a4e5e55d643f28650166a251c3f3e3c3aa6414d5 Mon Sep 17 00:00:00 2001 From: Ibraheem Ahmed Date: Wed, 15 May 2024 16:53:11 -0400 Subject: [PATCH 01/11] spawn pubgrub solver as separate thread --- Cargo.lock | 3 + crates/once-map/Cargo.toml | 1 + crates/once-map/src/lib.rs | 21 + crates/uv-installer/src/plan.rs | 2 +- crates/uv-installer/src/site_packages.rs | 16 +- crates/uv-resolver/Cargo.toml | 2 + crates/uv-resolver/src/error.rs | 10 +- crates/uv-resolver/src/resolution/graph.rs | 6 +- .../src/resolver/batch_prefetch.rs | 11 +- crates/uv-resolver/src/resolver/index.rs | 14 +- crates/uv-resolver/src/resolver/mod.rs | 697 ++++++++++-------- crates/uv-types/src/traits.rs | 2 +- crates/uv/src/commands/pip/editables.rs | 2 +- crates/uv/src/commands/pip/install.rs | 4 +- crates/uv/src/commands/project/mod.rs | 2 +- 15 files changed, 431 insertions(+), 362 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 971beeae8233..afb8cbc8eda1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2364,6 +2364,7 @@ name = "once-map" version = "0.0.1" dependencies = [ "dashmap", + "futures", "tokio", ] @@ -4998,6 +4999,7 @@ dependencies = [ "cache-key", "chrono", "clap", + "dashmap", "derivative", "distribution-filename", "distribution-types", @@ -5016,6 +5018,7 @@ dependencies = [ "platform-tags", "pubgrub", "pypi-types", + "rayon", "requirements-txt", "rkyv", "rustc-hash", diff --git a/crates/once-map/Cargo.toml b/crates/once-map/Cargo.toml index 8e6add42aeb7..9e31e9e441e5 100644 --- a/crates/once-map/Cargo.toml +++ b/crates/once-map/Cargo.toml @@ -15,3 +15,4 @@ workspace = true [dependencies] dashmap = { workspace = true } tokio = { workspace = true } +futures = { workspace = true } diff --git a/crates/once-map/src/lib.rs b/crates/once-map/src/lib.rs index 990626f6e508..7bdd279b49bb 100644 --- a/crates/once-map/src/lib.rs +++ b/crates/once-map/src/lib.rs @@ -63,6 +63,27 @@ impl OnceMap { } } + /// Wait for the result of a job that is running. + /// + /// Will hang if [`OnceMap::done`] isn't called for this key. + pub fn wait_blocking(&self, key: &K) -> Option { + let entry = self.items.get(key)?; + match entry.value() { + Value::Filled(value) => Some(value.clone()), + Value::Waiting(notify) => { + let notify = notify.clone(); + drop(entry); + futures::executor::block_on(notify.notified()); + + let entry = self.items.get(key).expect("map is append-only"); + match entry.value() { + Value::Filled(value) => Some(value.clone()), + Value::Waiting(_) => unreachable!("notify was called"), + } + } + } + } + /// Return the result of a previous job, if any. pub fn get(&self, key: &Q) -> Option where diff --git a/crates/uv-installer/src/plan.rs b/crates/uv-installer/src/plan.rs index 85709c89c806..67c7c2b98b56 100644 --- a/crates/uv-installer/src/plan.rs +++ b/crates/uv-installer/src/plan.rs @@ -68,7 +68,7 @@ impl<'a> Planner<'a> { #[allow(clippy::too_many_arguments)] pub fn build( self, - mut site_packages: SitePackages<'_>, + mut site_packages: SitePackages, reinstall: &Reinstall, no_binary: &NoBinary, hasher: &HashStrategy, diff --git a/crates/uv-installer/src/site_packages.rs b/crates/uv-installer/src/site_packages.rs index be56d9c10d68..723a47fd0aca 100644 --- a/crates/uv-installer/src/site_packages.rs +++ b/crates/uv-installer/src/site_packages.rs @@ -24,8 +24,8 @@ use crate::satisfies::RequirementSatisfaction; /// /// Packages are indexed by both name and (for editable installs) URL. 
#[derive(Debug)] -pub struct SitePackages<'a> { - venv: &'a PythonEnvironment, +pub struct SitePackages { + venv: PythonEnvironment, /// The vector of all installed distributions. The `by_name` and `by_url` indices index into /// this vector. The vector may contain `None` values, which represent distributions that were /// removed from the virtual environment. @@ -38,9 +38,9 @@ pub struct SitePackages<'a> { by_url: FxHashMap>, } -impl<'a> SitePackages<'a> { +impl SitePackages { /// Build an index of installed packages from the given Python executable. - pub fn from_executable(venv: &'a PythonEnvironment) -> Result> { + pub fn from_executable(venv: &PythonEnvironment) -> Result { let mut distributions: Vec> = Vec::new(); let mut by_name = FxHashMap::default(); let mut by_url = FxHashMap::default(); @@ -68,7 +68,7 @@ impl<'a> SitePackages<'a> { } Err(err) if err.kind() == std::io::ErrorKind::NotFound => { return Ok(Self { - venv, + venv: venv.clone(), distributions, by_name, by_url, @@ -107,7 +107,7 @@ impl<'a> SitePackages<'a> { } Ok(Self { - venv, + venv: venv.clone(), distributions, by_name, by_url, @@ -439,7 +439,7 @@ pub enum SatisfiesResult { Unsatisfied(String), } -impl IntoIterator for SitePackages<'_> { +impl IntoIterator for SitePackages { type Item = InstalledDist; type IntoIter = Flatten>>; @@ -540,7 +540,7 @@ impl Diagnostic { } } -impl InstalledPackagesProvider for SitePackages<'_> { +impl InstalledPackagesProvider for SitePackages { fn iter(&self) -> impl Iterator { self.iter() } diff --git a/crates/uv-resolver/Cargo.toml b/crates/uv-resolver/Cargo.toml index 7239d3079527..305c97a42f85 100644 --- a/crates/uv-resolver/Cargo.toml +++ b/crates/uv-resolver/Cargo.toml @@ -56,6 +56,8 @@ tokio = { workspace = true } tokio-stream = { workspace = true } tracing = { workspace = true } url = { workspace = true } +dashmap = { workspace = true } +rayon = { workspace = true } [dev-dependencies] uv-interpreter = { workspace = true } diff --git a/crates/uv-resolver/src/error.rs b/crates/uv-resolver/src/error.rs index 8031f944c417..e16584a28b53 100644 --- a/crates/uv-resolver/src/error.rs +++ b/crates/uv-resolver/src/error.rs @@ -1,7 +1,6 @@ use std::collections::{BTreeMap, BTreeSet}; use std::fmt::Formatter; use std::ops::Deref; -use std::rc::Rc; use std::sync::Arc; use indexmap::IndexMap; @@ -239,7 +238,7 @@ impl NoSolutionError { mut self, python_requirement: &PythonRequirement, visited: &SharedSet, - package_versions: &OnceMap>, + package_versions: &OnceMap>, ) -> Self { let mut available_versions = IndexMap::default(); for package in self.derivation_tree.packages() { @@ -263,7 +262,7 @@ impl NoSolutionError { // tree, but were never visited during resolution. We _may_ have metadata for // these packages, but it's non-deterministic, and omitting them ensures that // we represent the state of the resolver at the time of failure. 
- if visited.borrow().contains(name) { + if visited.contains(name) { if let Some(response) = package_versions.get(name) { if let VersionsResponse::Found(ref version_maps) = *response { for version_map in version_maps { @@ -304,7 +303,6 @@ impl NoSolutionError { mut self, unavailable_packages: &SharedMap, ) -> Self { - let unavailable_packages = unavailable_packages.borrow(); let mut new = FxHashMap::default(); for package in self.derivation_tree.packages() { if let PubGrubPackage::Package(name, _, _) = package { @@ -324,11 +322,11 @@ impl NoSolutionError { incomplete_packages: &SharedMap>, ) -> Self { let mut new = FxHashMap::default(); - let incomplete_packages = incomplete_packages.borrow(); for package in self.derivation_tree.packages() { if let PubGrubPackage::Package(name, _, _) = package { if let Some(versions) = incomplete_packages.get(name) { - for (version, reason) in versions.borrow().iter() { + for entry in versions.iter() { + let (version, reason) = entry.pair(); new.entry(name.clone()) .or_insert_with(BTreeMap::default) .insert(version.clone(), reason.clone()); diff --git a/crates/uv-resolver/src/resolution/graph.rs b/crates/uv-resolver/src/resolution/graph.rs index ae0789b83f61..ff04afe86c65 100644 --- a/crates/uv-resolver/src/resolution/graph.rs +++ b/crates/uv-resolver/src/resolution/graph.rs @@ -1,5 +1,5 @@ use std::hash::BuildHasherDefault; -use std::rc::Rc; +use std::sync::Arc; use pubgrub::range::Range; use pubgrub::solver::{Kind, State}; @@ -45,8 +45,8 @@ impl ResolutionGraph { pub(crate) fn from_state( selection: &SelectedDependencies, pins: &FilePins, - packages: &OnceMap>, - distributions: &OnceMap>, + packages: &OnceMap>, + distributions: &OnceMap>, state: &State, preferences: &Preferences, editables: Editables, diff --git a/crates/uv-resolver/src/resolver/batch_prefetch.rs b/crates/uv-resolver/src/resolver/batch_prefetch.rs index a5d25937825a..603a18c6ec64 100644 --- a/crates/uv-resolver/src/resolver/batch_prefetch.rs +++ b/crates/uv-resolver/src/resolver/batch_prefetch.rs @@ -3,7 +3,7 @@ use std::cmp::min; use itertools::Itertools; use pubgrub::range::Range; use rustc_hash::FxHashMap; -use tokio::sync::mpsc::Sender; +use tokio::sync::mpsc::UnboundedSender; use tracing::{debug, trace}; use distribution_types::DistributionMetadata; @@ -44,12 +44,12 @@ pub(crate) struct BatchPrefetcher { impl BatchPrefetcher { /// Prefetch a large number of versions if we already unsuccessfully tried many versions. - pub(crate) async fn prefetch_batches( + pub(crate) fn prefetch_batches( &mut self, next: &PubGrubPackage, version: &Version, current_range: &Range, - request_sink: &Sender, + request_sink: &UnboundedSender, index: &InMemoryIndex, selector: &CandidateSelector, ) -> anyhow::Result<(), ResolveError> { @@ -66,8 +66,7 @@ impl BatchPrefetcher { // This is immediate, we already fetched the version map. 
             let versions_response = index
                 .packages
-                .wait(package_name)
-                .await
+                .wait_blocking(package_name)
                 .ok_or(ResolveError::Unregistered)?;
 
             let VersionsResponse::Found(ref version_map) = *versions_response else {
@@ -144,7 +143,7 @@ impl BatchPrefetcher {
                 prefetch_count += 1;
                 if index.distributions.register(candidate.version_id()) {
                     let request = Request::from(dist);
-                    request_sink.send(request).await?;
+                    request_sink.send(request)?;
                 }
             }
 
diff --git a/crates/uv-resolver/src/resolver/index.rs b/crates/uv-resolver/src/resolver/index.rs
index 8f06bbd07cdf..aae0a31299ba 100644
--- a/crates/uv-resolver/src/resolver/index.rs
+++ b/crates/uv-resolver/src/resolver/index.rs
@@ -1,4 +1,4 @@
-use std::rc::Rc;
+use std::sync::Arc;
 
 use distribution_types::VersionId;
 use once_map::OnceMap;
@@ -11,30 +11,30 @@ pub struct InMemoryIndex {
     /// A map from package name to the metadata for that package and the index where the metadata
     /// came from.
-    pub(crate) packages: OnceMap<PackageName, Rc<VersionsResponse>>,
+    pub(crate) packages: OnceMap<PackageName, Arc<VersionsResponse>>,
 
     /// A map from package ID to metadata for that distribution.
-    pub(crate) distributions: OnceMap<VersionId, Rc<MetadataResponse>>,
+    pub(crate) distributions: OnceMap<VersionId, Arc<MetadataResponse>>,
 }
 
 impl InMemoryIndex {
     /// Insert a [`VersionsResponse`] into the index.
     pub fn insert_package(&self, package_name: PackageName, response: VersionsResponse) {
-        self.packages.done(package_name, Rc::new(response));
+        self.packages.done(package_name, Arc::new(response));
     }
 
     /// Insert a [`Metadata23`] into the index.
     pub fn insert_metadata(&self, version_id: VersionId, response: MetadataResponse) {
-        self.distributions.done(version_id, Rc::new(response));
+        self.distributions.done(version_id, Arc::new(response));
    }
 
     /// Get the [`VersionsResponse`] for a given package name, without waiting.
-    pub fn get_package(&self, package_name: &PackageName) -> Option<Rc<VersionsResponse>> {
+    pub fn get_package(&self, package_name: &PackageName) -> Option<Arc<VersionsResponse>> {
         self.packages.get(package_name)
     }
 
     /// Get the [`MetadataResponse`] for a given package ID, without waiting.
-    pub fn get_metadata(&self, version_id: &VersionId) -> Option<Rc<MetadataResponse>> {
+    pub fn get_metadata(&self, version_id: &VersionId) -> Option<Arc<MetadataResponse>> {
         self.distributions.get(version_id)
     }
 }
diff --git a/crates/uv-resolver/src/resolver/mod.rs b/crates/uv-resolver/src/resolver/mod.rs
index cdb12bd66d91..9d8e5a5afff9 100644
--- a/crates/uv-resolver/src/resolver/mod.rs
+++ b/crates/uv-resolver/src/resolver/mod.rs
@@ -1,22 +1,21 @@
 //! Given a set of requirements, find a set of compatible packages.
use std::borrow::Cow; -use std::cell::RefCell; -use std::collections::{HashMap, HashSet}; use std::fmt::{Display, Formatter}; use std::ops::Deref; -use std::rc::Rc; use std::sync::Arc; use anyhow::Result; +use dashmap::{DashMap, DashSet}; use futures::{FutureExt, StreamExt}; use itertools::Itertools; use pubgrub::error::PubGrubError; use pubgrub::range::Range; use pubgrub::solver::{Incompatibility, State}; use rustc_hash::{FxHashMap, FxHashSet}; -use tokio_stream::wrappers::ReceiverStream; -use tracing::{debug, enabled, info_span, instrument, trace, warn, Instrument, Level}; +use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender}; +use tokio_stream::wrappers::UnboundedReceiverStream; +use tracing::{debug, enabled, instrument, trace, warn, Level}; use distribution_types::{ BuiltDist, Dist, DistributionMetadata, IncompatibleDist, IncompatibleSource, IncompatibleWheel, @@ -172,23 +171,13 @@ enum ResolverVersion { Unavailable(Version, UnavailableVersion), } -pub(crate) type SharedMap = Rc>>; -pub(crate) type SharedSet = Rc>>; +pub(crate) type SharedMap = Arc>; +pub(crate) type SharedSet = Arc>; pub struct Resolver<'a, Provider: ResolverProvider, InstalledPackages: InstalledPackagesProvider> { project: Option, - requirements: Vec, - constraints: Constraints, - overrides: Overrides, preferences: Preferences, exclusions: Exclusions, - editables: Editables, - urls: Urls, - locals: Locals, - dependency_mode: DependencyMode, - hasher: &'a HashStrategy, - /// When not set, the resolver is in "universal" mode. - markers: Option<&'a MarkerEnvironment>, python_requirement: &'a PythonRequirement, selector: CandidateSelector, index: &'a InMemoryIndex, @@ -201,6 +190,20 @@ pub struct Resolver<'a, Provider: ResolverProvider, InstalledPackages: Installed visited: SharedSet, reporter: Option>, provider: Provider, + solver_state: Option>, +} + +struct SolverState<'a> { + requirements: Vec, + constraints: Constraints, + overrides: Overrides, + editables: Editables, + urls: Urls, + locals: Locals, + dependency_mode: DependencyMode, + hasher: &'a HashStrategy, + /// When not set, the resolver is in "universal" mode. 
+ markers: Option<&'a MarkerEnvironment>, } impl<'a, Context: BuildContext, InstalledPackages: InstalledPackagesProvider> @@ -278,28 +281,33 @@ impl<'a, Provider: ResolverProvider, InstalledPackages: InstalledPackagesProvide provider: Provider, installed_packages: &'a InstalledPackages, ) -> Result { - Ok(Self { - index, - unavailable_packages: SharedMap::default(), - incomplete_packages: SharedMap::default(), - visited: SharedSet::default(), - selector: CandidateSelector::for_resolution(options, &manifest, markers), + let selector = CandidateSelector::for_resolution(options, &manifest, markers); + let solver_state = SolverState { dependency_mode: options.dependency_mode, urls: Urls::from_manifest(&manifest, markers, options.dependency_mode)?, locals: Locals::from_manifest(&manifest, markers, options.dependency_mode), - project: manifest.project, requirements: manifest.requirements, constraints: manifest.constraints, overrides: manifest.overrides, - preferences: Preferences::from_iter(manifest.preferences, markers), - exclusions: manifest.exclusions, editables: Editables::from_requirements(manifest.editables), hasher, markers, + }; + + Ok(Self { + index, + unavailable_packages: SharedMap::default(), + incomplete_packages: SharedMap::default(), + visited: SharedSet::default(), + selector, + project: manifest.project, + preferences: Preferences::from_iter(manifest.preferences, markers), + exclusions: manifest.exclusions, python_requirement, reporter: None, provider, installed_packages, + solver_state: Some(solver_state), }) } @@ -315,17 +323,29 @@ impl<'a, Provider: ResolverProvider, InstalledPackages: InstalledPackagesProvide } /// Resolve a set of requirements into a set of pinned versions. - pub async fn resolve(self) -> Result { + pub async fn resolve(mut self) -> Result { // A channel to fetch package metadata (e.g., given `flask`, fetch all versions) and version // metadata (e.g., given `flask==1.0.0`, fetch the metadata for that version). // Channel size is set large to accommodate batch prefetching. - let (request_sink, request_stream) = tokio::sync::mpsc::channel(300); + let (request_sink, request_stream) = tokio::sync::mpsc::unbounded_channel(); + let solver_state = self.solver_state.take().unwrap(); // Run the fetcher. let requests_fut = self.fetch(request_stream).fuse(); // Run the solver. - let resolve_fut = self.solve(request_sink).boxed_local().fuse(); + #[allow(unsafe_code)] + let solver: Solver<'static, InstalledPackages> = + unsafe { std::mem::transmute(Solver::new(&self, solver_state)) }; + let (tx, rx) = tokio::sync::oneshot::channel(); + std::thread::Builder::new() + .name("solver".into()) + .spawn(move || { + let result = solver.solve(request_sink); + tx.send(result).unwrap(); + }) + .unwrap(); + let resolve_fut = async { rx.await.unwrap() }; // Wait for both to complete. match tokio::try_join!(requests_fut, resolve_fut) { @@ -355,11 +375,285 @@ impl<'a, Provider: ResolverProvider, InstalledPackages: InstalledPackagesProvide } } + /// Fetch the metadata for a stream of packages and versions. + async fn fetch(&self, request_stream: UnboundedReceiver) -> Result<(), ResolveError> { + let mut response_stream = UnboundedReceiverStream::new(request_stream) + .map(|request| self.process_request(request).boxed_local()) + // Allow as many futures as possible to start in the background. + // Backpressure is provided by at a more granular level by `DistributionDatabase` + // and `SourceDispatch`, as well as the bounded request channel. 
+ .buffer_unordered(usize::MAX); + + while let Some(response) = response_stream.next().await { + match response? { + Some(Response::Package(package_name, version_map)) => { + trace!("Received package metadata for: {package_name}"); + self.index + .packages + .done(package_name, Arc::new(version_map)); + } + Some(Response::Installed { dist, metadata }) => { + trace!("Received installed distribution metadata for: {dist}"); + self.index.distributions.done( + dist.version_id(), + Arc::new(MetadataResponse::Found(ArchiveMetadata::from(metadata))), + ); + } + Some(Response::Dist { + dist: Dist::Built(dist), + metadata, + }) => { + trace!("Received built distribution metadata for: {dist}"); + match &metadata { + MetadataResponse::InvalidMetadata(err) => { + warn!("Unable to extract metadata for {dist}: {err}"); + } + MetadataResponse::InvalidStructure(err) => { + warn!("Unable to extract metadata for {dist}: {err}"); + } + _ => {} + } + self.index + .distributions + .done(dist.version_id(), Arc::new(metadata)); + } + Some(Response::Dist { + dist: Dist::Source(dist), + metadata, + }) => { + trace!("Received source distribution metadata for: {dist}"); + match &metadata { + MetadataResponse::InvalidMetadata(err) => { + warn!("Unable to extract metadata for {dist}: {err}"); + } + MetadataResponse::InvalidStructure(err) => { + warn!("Unable to extract metadata for {dist}: {err}"); + } + _ => {} + } + self.index + .distributions + .done(dist.version_id(), Arc::new(metadata)); + } + None => {} + } + } + + Ok::<(), ResolveError>(()) + } + + #[instrument(skip_all, fields(%request))] + async fn process_request(&self, request: Request) -> Result, ResolveError> { + match request { + // Fetch package metadata from the registry. + Request::Package(package_name) => { + let package_versions = self + .provider + .get_package_versions(&package_name) + .boxed_local() + .await + .map_err(ResolveError::Client)?; + + Ok(Some(Response::Package(package_name, package_versions))) + } + + // Fetch distribution metadata from the distribution database. + Request::Dist(dist) => { + let metadata = self + .provider + .get_or_build_wheel_metadata(&dist) + .boxed_local() + .await + .map_err(|err| match dist.clone() { + Dist::Built(built_dist @ BuiltDist::Path(_)) => { + ResolveError::Read(Box::new(built_dist), err) + } + Dist::Source(source_dist @ SourceDist::Path(_)) => { + ResolveError::Build(Box::new(source_dist), err) + } + Dist::Source(source_dist @ SourceDist::Directory(_)) => { + ResolveError::Build(Box::new(source_dist), err) + } + Dist::Built(built_dist) => ResolveError::Fetch(Box::new(built_dist), err), + Dist::Source(source_dist) => { + ResolveError::FetchAndBuild(Box::new(source_dist), err) + } + })?; + Ok(Some(Response::Dist { dist, metadata })) + } + + Request::Installed(dist) => { + let metadata = dist + .metadata() + .map_err(|err| ResolveError::ReadInstalled(Box::new(dist.clone()), err))?; + Ok(Some(Response::Installed { dist, metadata })) + } + + // Pre-fetch the package and distribution metadata. + Request::Prefetch(package_name, range) => { + // Wait for the package metadata to become available. 
+ let versions_response = self + .index + .packages + .wait(&package_name) + .await + .ok_or(ResolveError::Unregistered)?; + + let version_map = match *versions_response { + VersionsResponse::Found(ref version_map) => version_map, + // Short-circuit if we did not find any versions for the package + VersionsResponse::NoIndex => { + self.unavailable_packages + .insert(package_name.clone(), UnavailablePackage::NoIndex); + + return Ok(None); + } + VersionsResponse::Offline => { + self.unavailable_packages + .insert(package_name.clone(), UnavailablePackage::Offline); + + return Ok(None); + } + VersionsResponse::NotFound => { + self.unavailable_packages + .insert(package_name.clone(), UnavailablePackage::NotFound); + + return Ok(None); + } + }; + + // Try to find a compatible version. If there aren't any compatible versions, + // short-circuit. + let Some(candidate) = self.selector.select( + &package_name, + &range, + version_map, + &self.preferences, + self.installed_packages, + &self.exclusions, + ) else { + return Ok(None); + }; + + // If there is not a compatible distribution, short-circuit. + let Some(dist) = candidate.compatible() else { + return Ok(None); + }; + + // Emit a request to fetch the metadata for this version. + if self.index.distributions.register(candidate.version_id()) { + let dist = dist.for_resolution().to_owned(); + + let response = match dist { + ResolvedDist::Installable(dist) => { + let metadata = self + .provider + .get_or_build_wheel_metadata(&dist) + .boxed_local() + .await + .map_err(|err| match dist.clone() { + Dist::Built(built_dist @ BuiltDist::Path(_)) => { + ResolveError::Read(Box::new(built_dist), err) + } + Dist::Source(source_dist @ SourceDist::Path(_)) => { + ResolveError::Build(Box::new(source_dist), err) + } + Dist::Source(source_dist @ SourceDist::Directory(_)) => { + ResolveError::Build(Box::new(source_dist), err) + } + Dist::Built(built_dist) => { + ResolveError::Fetch(Box::new(built_dist), err) + } + Dist::Source(source_dist) => { + ResolveError::FetchAndBuild(Box::new(source_dist), err) + } + })?; + Response::Dist { dist, metadata } + } + ResolvedDist::Installed(dist) => { + let metadata = dist.metadata().map_err(|err| { + ResolveError::ReadInstalled(Box::new(dist.clone()), err) + })?; + Response::Installed { dist, metadata } + } + }; + + Ok(Some(response)) + } else { + Ok(None) + } + } + } + } + + fn on_complete(&self) { + if let Some(reporter) = self.reporter.as_ref() { + reporter.on_complete(); + } + } +} + +struct Solver<'a, InstalledPackages: InstalledPackagesProvider> { + project: Option, + requirements: Vec, + constraints: Constraints, + overrides: Overrides, + preferences: &'a Preferences, + exclusions: &'a Exclusions, + editables: Editables, + urls: Urls, + locals: Locals, + dependency_mode: DependencyMode, + hasher: &'a HashStrategy, + /// When not set, the resolver is in "universal" mode. + markers: Option<&'a MarkerEnvironment>, + python_requirement: PythonRequirement, + selector: CandidateSelector, + index: &'a InMemoryIndex, + installed_packages: &'a InstalledPackages, + /// Incompatibilities for packages that are entirely unavailable. + unavailable_packages: SharedMap, + /// Incompatibilities for packages that are unavailable at specific versions. + incomplete_packages: SharedMap>, + /// The set of all registry-based packages visited during resolution. 
+ visited: SharedSet, + reporter: Option>, +} + +impl<'a, InstalledPackages: InstalledPackagesProvider> Solver<'a, InstalledPackages> { + fn new( + resolver: &'a Resolver<'a, Provider, InstalledPackages>, + state: SolverState<'a>, + ) -> Self { + Solver { + project: resolver.project.clone(), + requirements: state.requirements, + constraints: state.constraints, + overrides: state.overrides, + editables: state.editables, + urls: state.urls, + locals: state.locals, + dependency_mode: state.dependency_mode, + hasher: state.hasher, + markers: state.markers, + preferences: &resolver.preferences, + exclusions: &resolver.exclusions, + python_requirement: resolver.python_requirement.clone(), + selector: resolver.selector.clone(), + index: resolver.index, + installed_packages: resolver.installed_packages, + unavailable_packages: resolver.unavailable_packages.clone(), + incomplete_packages: resolver.incomplete_packages.clone(), + visited: resolver.visited.clone(), + reporter: resolver.reporter.clone(), + } + } + /// Run the PubGrub solver. #[instrument(skip_all)] - async fn solve( + fn solve( &self, - request_sink: tokio::sync::mpsc::Sender, + request_sink: UnboundedSender, ) -> Result { let root = PubGrubPackage::Root(self.project.clone()); let mut prefetcher = BatchPrefetcher::default(); @@ -386,8 +680,7 @@ impl<'a, Provider: ResolverProvider, InstalledPackages: InstalledPackagesProvide Self::pre_visit( state.pubgrub.partial_solution.prioritized_packages(), &request_sink, - ) - .await?; + )?; } // Choose a package version. @@ -421,14 +714,12 @@ impl<'a, Provider: ResolverProvider, InstalledPackages: InstalledPackagesProvide .ok_or_else(|| { PubGrubError::Failure("a package was chosen but we don't have a term.".into()) })?; - let decision = self - .choose_version( - &state.next, - term_intersection.unwrap_positive(), - &mut state.pins, - &request_sink, - ) - .await?; + let decision = self.choose_version( + &state.next, + term_intersection.unwrap_positive(), + &mut state.pins, + &request_sink, + )?; // Pick the next compatible version. let version = match decision { @@ -443,7 +734,7 @@ impl<'a, Provider: ResolverProvider, InstalledPackages: InstalledPackagesProvide // Check if the decision was due to the package being unavailable if let PubGrubPackage::Package(ref package_name, _, _) = state.next { - if let Some(entry) = self.unavailable_packages.borrow().get(package_name) { + if let Some(entry) = self.unavailable_packages.get(package_name) { state .pubgrub .add_incompatibility(Incompatibility::custom_term( @@ -512,16 +803,14 @@ impl<'a, Provider: ResolverProvider, InstalledPackages: InstalledPackagesProvide } }; - prefetcher - .prefetch_batches( - &state.next, - &version, - term_intersection.unwrap_positive(), - &request_sink, - self.index, - &self.selector, - ) - .await?; + prefetcher.prefetch_batches( + &state.next, + &version, + term_intersection.unwrap_positive(), + &request_sink, + self.index, + &self.selector, + )?; self.on_progress(&state.next, &version); @@ -533,10 +822,12 @@ impl<'a, Provider: ResolverProvider, InstalledPackages: InstalledPackagesProvide { // Retrieve that package dependencies. let package = &state.next; - let dependencies = match self - .get_dependencies(package, &version, &mut state.priorities, &request_sink) - .await? - { + let dependencies = match self.get_dependencies( + package, + &version, + &mut state.priorities, + &request_sink, + )? 
{ Dependencies::Unavailable(reason) => { state .pubgrub @@ -590,10 +881,10 @@ impl<'a, Provider: ResolverProvider, InstalledPackages: InstalledPackagesProvide /// Visit a [`PubGrubPackage`] prior to selection. This should be called on a [`PubGrubPackage`] /// before it is selected, to allow metadata to be fetched in parallel. - async fn visit_package( + fn visit_package( &self, package: &PubGrubPackage, - request_sink: &tokio::sync::mpsc::Sender, + request_sink: &UnboundedSender, ) -> Result<(), ResolveError> { match package { PubGrubPackage::Root(_) => {} @@ -607,7 +898,7 @@ impl<'a, Provider: ResolverProvider, InstalledPackages: InstalledPackagesProvide // Emit a request to fetch the metadata for this package. if self.index.packages.register(name.clone()) { - request_sink.send(Request::Package(name.clone())).await?; + request_sink.send(Request::Package(name.clone()))?; } } PubGrubPackage::Package(name, _extra, Some(url)) => { @@ -624,7 +915,7 @@ impl<'a, Provider: ResolverProvider, InstalledPackages: InstalledPackagesProvide // Emit a request to fetch the metadata for this distribution. let dist = Dist::from_url(name.clone(), url.clone())?; if self.index.distributions.register(dist.version_id()) { - request_sink.send(Request::Dist(dist)).await?; + request_sink.send(Request::Dist(dist))?; } } } @@ -633,9 +924,9 @@ impl<'a, Provider: ResolverProvider, InstalledPackages: InstalledPackagesProvide /// Visit the set of [`PubGrubPackage`] candidates prior to selection. This allows us to fetch /// metadata for all of the packages in parallel. - async fn pre_visit<'data>( + fn pre_visit<'data>( packages: impl Iterator)>, - request_sink: &tokio::sync::mpsc::Sender, + request_sink: &UnboundedSender, ) -> Result<(), ResolveError> { // Iterate over the potential packages, and fetch file metadata for any of them. These // represent our current best guesses for the versions that we _might_ select. @@ -643,9 +934,7 @@ impl<'a, Provider: ResolverProvider, InstalledPackages: InstalledPackagesProvide let PubGrubPackage::Package(package_name, None, None) = package else { continue; }; - request_sink - .send(Request::Prefetch(package_name.clone(), range.clone())) - .await?; + request_sink.send(Request::Prefetch(package_name.clone(), range.clone()))?; } Ok(()) } @@ -655,12 +944,12 @@ impl<'a, Provider: ResolverProvider, InstalledPackages: InstalledPackagesProvide /// /// Returns [None] when there are no versions in the given range. #[instrument(skip_all, fields(%package))] - async fn choose_version( + fn choose_version( &self, package: &'a PubGrubPackage, range: &Range, pins: &mut FilePins, - request_sink: &tokio::sync::mpsc::Sender, + request_sink: &UnboundedSender, ) -> Result, ResolveError> { match package { PubGrubPackage::Root(_) => Ok(Some(ResolverVersion::Available(MIN_VERSION.clone()))), @@ -719,8 +1008,7 @@ impl<'a, Provider: ResolverProvider, InstalledPackages: InstalledPackagesProvide let response = self .index .distributions - .wait(&dist.version_id()) - .await + .wait_blocking(&dist.version_id()) .ok_or(ResolveError::Unregistered)?; // If we failed to fetch the metadata for a URL, we can't proceed. 
@@ -728,26 +1016,25 @@ impl<'a, Provider: ResolverProvider, InstalledPackages: InstalledPackagesProvide MetadataResponse::Found(archive) => &archive.metadata, MetadataResponse::Offline => { self.unavailable_packages - .borrow_mut() .insert(package_name.clone(), UnavailablePackage::Offline); return Ok(None); } MetadataResponse::InvalidMetadata(err) => { - self.unavailable_packages.borrow_mut().insert( + self.unavailable_packages.insert( package_name.clone(), UnavailablePackage::InvalidMetadata(err.to_string()), ); return Ok(None); } MetadataResponse::InconsistentMetadata(err) => { - self.unavailable_packages.borrow_mut().insert( + self.unavailable_packages.insert( package_name.clone(), UnavailablePackage::InvalidMetadata(err.to_string()), ); return Ok(None); } MetadataResponse::InvalidStructure(err) => { - self.unavailable_packages.borrow_mut().insert( + self.unavailable_packages.insert( package_name.clone(), UnavailablePackage::InvalidStructure(err.to_string()), ); @@ -784,29 +1071,24 @@ impl<'a, Provider: ResolverProvider, InstalledPackages: InstalledPackagesProvide let versions_response = self .index .packages - .wait(package_name) - .instrument(info_span!("package_wait", %package_name)) - .await + .wait_blocking(package_name) .ok_or(ResolveError::Unregistered)?; - self.visited.borrow_mut().insert(package_name.clone()); + self.visited.insert(package_name.clone()); let version_maps = match *versions_response { VersionsResponse::Found(ref version_maps) => version_maps.as_slice(), VersionsResponse::NoIndex => { self.unavailable_packages - .borrow_mut() .insert(package_name.clone(), UnavailablePackage::NoIndex); &[] } VersionsResponse::Offline => { self.unavailable_packages - .borrow_mut() .insert(package_name.clone(), UnavailablePackage::Offline); &[] } VersionsResponse::NotFound => { self.unavailable_packages - .borrow_mut() .insert(package_name.clone(), UnavailablePackage::NotFound); &[] } @@ -865,7 +1147,7 @@ impl<'a, Provider: ResolverProvider, InstalledPackages: InstalledPackagesProvide if matches!(package, PubGrubPackage::Package(_, _, _)) { if self.index.distributions.register(candidate.version_id()) { let request = Request::from(dist.for_resolution()); - request_sink.send(request).await?; + request_sink.send(request)?; } } @@ -876,12 +1158,12 @@ impl<'a, Provider: ResolverProvider, InstalledPackages: InstalledPackagesProvide /// Given a candidate package and version, return its dependencies. #[instrument(skip_all, fields(%package, %version))] - async fn get_dependencies( + fn get_dependencies( &self, package: &PubGrubPackage, version: &Version, priorities: &mut PubGrubPriorities, - request_sink: &tokio::sync::mpsc::Sender, + request_sink: &UnboundedSender, ) -> Result { match package { PubGrubPackage::Root(_) => { @@ -913,7 +1195,7 @@ impl<'a, Provider: ResolverProvider, InstalledPackages: InstalledPackagesProvide priorities.insert(package, version); // Emit a request to fetch the metadata for this package. - self.visit_package(package, request_sink).await?; + self.visit_package(package, request_sink)?; } // Add a dependency on each editable. @@ -975,9 +1257,7 @@ impl<'a, Provider: ResolverProvider, InstalledPackages: InstalledPackagesProvide // Wait for the metadata to be available. 
self.index .distributions - .wait(&version_id) - .instrument(info_span!("distributions_wait", %version_id)) - .await + .wait_blocking(&version_id) .ok_or(ResolveError::Unregistered)?; } @@ -1010,7 +1290,7 @@ impl<'a, Provider: ResolverProvider, InstalledPackages: InstalledPackagesProvide priorities.insert(dep_package, dep_version); // Emit a request to fetch the metadata for this package. - self.visit_package(dep_package, request_sink).await?; + self.visit_package(dep_package, request_sink)?; } return Ok(Dependencies::Available(dependencies.into())); @@ -1024,11 +1304,7 @@ impl<'a, Provider: ResolverProvider, InstalledPackages: InstalledPackagesProvide let version_id = dist.version_id(); // If the package does not exist in the registry or locally, we cannot fetch its dependencies - if self - .unavailable_packages - .borrow() - .get(package_name) - .is_some() + if self.unavailable_packages.get(package_name).is_some() && self .installed_packages .get_packages(package_name) @@ -1047,29 +1323,23 @@ impl<'a, Provider: ResolverProvider, InstalledPackages: InstalledPackagesProvide let response = self .index .distributions - .wait(&version_id) - .instrument(info_span!("distributions_wait", %version_id)) - .await + .wait_blocking(&version_id) .ok_or(ResolveError::Unregistered)?; let metadata = match &*response { MetadataResponse::Found(archive) => &archive.metadata, MetadataResponse::Offline => { self.incomplete_packages - .borrow_mut() .entry(package_name.clone()) .or_default() - .borrow_mut() .insert(version.clone(), IncompletePackage::Offline); return Ok(Dependencies::Unavailable(UnavailableVersion::Offline)); } MetadataResponse::InvalidMetadata(err) => { warn!("Unable to extract metadata for {package_name}: {err}"); self.incomplete_packages - .borrow_mut() .entry(package_name.clone()) .or_default() - .borrow_mut() .insert( version.clone(), IncompletePackage::InvalidMetadata(err.to_string()), @@ -1081,10 +1351,8 @@ impl<'a, Provider: ResolverProvider, InstalledPackages: InstalledPackagesProvide MetadataResponse::InconsistentMetadata(err) => { warn!("Unable to extract metadata for {package_name}: {err}"); self.incomplete_packages - .borrow_mut() .entry(package_name.clone()) .or_default() - .borrow_mut() .insert( version.clone(), IncompletePackage::InconsistentMetadata(err.to_string()), @@ -1096,10 +1364,8 @@ impl<'a, Provider: ResolverProvider, InstalledPackages: InstalledPackagesProvide MetadataResponse::InvalidStructure(err) => { warn!("Unable to extract metadata for {package_name}: {err}"); self.incomplete_packages - .borrow_mut() .entry(package_name.clone()) .or_default() - .borrow_mut() .insert( version.clone(), IncompletePackage::InvalidStructure(err.to_string()), @@ -1134,7 +1400,7 @@ impl<'a, Provider: ResolverProvider, InstalledPackages: InstalledPackagesProvide priorities.insert(dep_package, dep_version); // Emit a request to fetch the metadata for this package. - self.visit_package(dep_package, request_sink).await?; + self.visit_package(dep_package, request_sink)?; } Ok(Dependencies::Available(dependencies.into())) @@ -1154,221 +1420,6 @@ impl<'a, Provider: ResolverProvider, InstalledPackages: InstalledPackagesProvide } } - /// Fetch the metadata for a stream of packages and versions. - async fn fetch( - &self, - request_stream: tokio::sync::mpsc::Receiver, - ) -> Result<(), ResolveError> { - let mut response_stream = ReceiverStream::new(request_stream) - .map(|request| self.process_request(request).boxed_local()) - // Allow as many futures as possible to start in the background. 
- // Backpressure is provided by at a more granular level by `DistributionDatabase` - // and `SourceDispatch`, as well as the bounded request channel. - .buffer_unordered(usize::MAX); - - while let Some(response) = response_stream.next().await { - match response? { - Some(Response::Package(package_name, version_map)) => { - trace!("Received package metadata for: {package_name}"); - self.index.packages.done(package_name, Rc::new(version_map)); - } - Some(Response::Installed { dist, metadata }) => { - trace!("Received installed distribution metadata for: {dist}"); - self.index.distributions.done( - dist.version_id(), - Rc::new(MetadataResponse::Found(ArchiveMetadata::from(metadata))), - ); - } - Some(Response::Dist { - dist: Dist::Built(dist), - metadata, - }) => { - trace!("Received built distribution metadata for: {dist}"); - match &metadata { - MetadataResponse::InvalidMetadata(err) => { - warn!("Unable to extract metadata for {dist}: {err}"); - } - MetadataResponse::InvalidStructure(err) => { - warn!("Unable to extract metadata for {dist}: {err}"); - } - _ => {} - } - self.index - .distributions - .done(dist.version_id(), Rc::new(metadata)); - } - Some(Response::Dist { - dist: Dist::Source(dist), - metadata, - }) => { - trace!("Received source distribution metadata for: {dist}"); - match &metadata { - MetadataResponse::InvalidMetadata(err) => { - warn!("Unable to extract metadata for {dist}: {err}"); - } - MetadataResponse::InvalidStructure(err) => { - warn!("Unable to extract metadata for {dist}: {err}"); - } - _ => {} - } - self.index - .distributions - .done(dist.version_id(), Rc::new(metadata)); - } - None => {} - } - } - - Ok::<(), ResolveError>(()) - } - - #[instrument(skip_all, fields(%request))] - async fn process_request(&self, request: Request) -> Result, ResolveError> { - match request { - // Fetch package metadata from the registry. - Request::Package(package_name) => { - let package_versions = self - .provider - .get_package_versions(&package_name) - .boxed_local() - .await - .map_err(ResolveError::Client)?; - - Ok(Some(Response::Package(package_name, package_versions))) - } - - // Fetch distribution metadata from the distribution database. - Request::Dist(dist) => { - let metadata = self - .provider - .get_or_build_wheel_metadata(&dist) - .boxed_local() - .await - .map_err(|err| match dist.clone() { - Dist::Built(built_dist @ BuiltDist::Path(_)) => { - ResolveError::Read(Box::new(built_dist), err) - } - Dist::Source(source_dist @ SourceDist::Path(_)) => { - ResolveError::Build(Box::new(source_dist), err) - } - Dist::Source(source_dist @ SourceDist::Directory(_)) => { - ResolveError::Build(Box::new(source_dist), err) - } - Dist::Built(built_dist) => ResolveError::Fetch(Box::new(built_dist), err), - Dist::Source(source_dist) => { - ResolveError::FetchAndBuild(Box::new(source_dist), err) - } - })?; - Ok(Some(Response::Dist { dist, metadata })) - } - - Request::Installed(dist) => { - let metadata = dist - .metadata() - .map_err(|err| ResolveError::ReadInstalled(Box::new(dist.clone()), err))?; - Ok(Some(Response::Installed { dist, metadata })) - } - - // Pre-fetch the package and distribution metadata. - Request::Prefetch(package_name, range) => { - // Wait for the package metadata to become available. 
- let versions_response = self - .index - .packages - .wait(&package_name) - .await - .ok_or(ResolveError::Unregistered)?; - - let version_map = match *versions_response { - VersionsResponse::Found(ref version_map) => version_map, - // Short-circuit if we did not find any versions for the package - VersionsResponse::NoIndex => { - self.unavailable_packages - .borrow_mut() - .insert(package_name.clone(), UnavailablePackage::NoIndex); - - return Ok(None); - } - VersionsResponse::Offline => { - self.unavailable_packages - .borrow_mut() - .insert(package_name.clone(), UnavailablePackage::Offline); - - return Ok(None); - } - VersionsResponse::NotFound => { - self.unavailable_packages - .borrow_mut() - .insert(package_name.clone(), UnavailablePackage::NotFound); - - return Ok(None); - } - }; - - // Try to find a compatible version. If there aren't any compatible versions, - // short-circuit. - let Some(candidate) = self.selector.select( - &package_name, - &range, - version_map, - &self.preferences, - self.installed_packages, - &self.exclusions, - ) else { - return Ok(None); - }; - - // If there is not a compatible distribution, short-circuit. - let Some(dist) = candidate.compatible() else { - return Ok(None); - }; - - // Emit a request to fetch the metadata for this version. - if self.index.distributions.register(candidate.version_id()) { - let dist = dist.for_resolution().to_owned(); - - let response = match dist { - ResolvedDist::Installable(dist) => { - let metadata = self - .provider - .get_or_build_wheel_metadata(&dist) - .boxed_local() - .await - .map_err(|err| match dist.clone() { - Dist::Built(built_dist @ BuiltDist::Path(_)) => { - ResolveError::Read(Box::new(built_dist), err) - } - Dist::Source(source_dist @ SourceDist::Path(_)) => { - ResolveError::Build(Box::new(source_dist), err) - } - Dist::Source(source_dist @ SourceDist::Directory(_)) => { - ResolveError::Build(Box::new(source_dist), err) - } - Dist::Built(built_dist) => { - ResolveError::Fetch(Box::new(built_dist), err) - } - Dist::Source(source_dist) => { - ResolveError::FetchAndBuild(Box::new(source_dist), err) - } - })?; - Response::Dist { dist, metadata } - } - ResolvedDist::Installed(dist) => { - let metadata = dist.metadata().map_err(|err| { - ResolveError::ReadInstalled(Box::new(dist.clone()), err) - })?; - Response::Installed { dist, metadata } - } - }; - - Ok(Some(response)) - } else { - Ok(None) - } - } - } - } - fn on_progress(&self, package: &PubGrubPackage, version: &Version) { if let Some(reporter) = self.reporter.as_ref() { match package { @@ -1384,12 +1435,6 @@ impl<'a, Provider: ResolverProvider, InstalledPackages: InstalledPackagesProvide } } } - - fn on_complete(&self) { - if let Some(reporter) = self.reporter.as_ref() { - reporter.on_complete(); - } - } } /// State that is used during unit propagation in the resolver. 
diff --git a/crates/uv-types/src/traits.rs b/crates/uv-types/src/traits.rs index d85a5ec17407..f75ca2b2c1ca 100644 --- a/crates/uv-types/src/traits.rs +++ b/crates/uv-types/src/traits.rs @@ -128,7 +128,7 @@ pub trait SourceBuildTrait { } /// A wrapper for [`uv_installer::SitePackages`] -pub trait InstalledPackagesProvider { +pub trait InstalledPackagesProvider: Send + Sync + 'static { fn iter(&self) -> impl Iterator; fn get_packages(&self, name: &PackageName) -> Vec<&InstalledDist>; } diff --git a/crates/uv/src/commands/pip/editables.rs b/crates/uv/src/commands/pip/editables.rs index efaade03bc5a..ad1c610458e9 100644 --- a/crates/uv/src/commands/pip/editables.rs +++ b/crates/uv/src/commands/pip/editables.rs @@ -43,7 +43,7 @@ impl ResolvedEditables { #[allow(clippy::too_many_arguments)] pub(crate) async fn resolve( editables: Vec, - site_packages: &SitePackages<'_>, + site_packages: &SitePackages, reinstall: &Reinstall, hasher: &HashStrategy, interpreter: &Interpreter, diff --git a/crates/uv/src/commands/pip/install.rs b/crates/uv/src/commands/pip/install.rs index 7539e7c72b23..0e5510945632 100644 --- a/crates/uv/src/commands/pip/install.rs +++ b/crates/uv/src/commands/pip/install.rs @@ -574,7 +574,7 @@ async fn resolve( project: Option, editables: &[ResolvedEditable], hasher: &HashStrategy, - site_packages: &SitePackages<'_>, + site_packages: &SitePackages, reinstall: &Reinstall, upgrade: &Upgrade, interpreter: &Interpreter, @@ -733,7 +733,7 @@ async fn resolve( async fn install( resolution: &Resolution, editables: &[ResolvedEditable], - site_packages: SitePackages<'_>, + site_packages: SitePackages, reinstall: &Reinstall, no_binary: &NoBinary, link_mode: LinkMode, diff --git a/crates/uv/src/commands/project/mod.rs b/crates/uv/src/commands/project/mod.rs index 23b5441a4bc0..5677df0ab716 100644 --- a/crates/uv/src/commands/project/mod.rs +++ b/crates/uv/src/commands/project/mod.rs @@ -242,7 +242,7 @@ pub(crate) async fn resolve( #[allow(clippy::too_many_arguments)] pub(crate) async fn install( resolution: &Resolution, - site_packages: SitePackages<'_>, + site_packages: SitePackages, no_binary: &NoBinary, link_mode: LinkMode, index_urls: &IndexLocations, From 8b9bd82e4e3374969ebb35bf6fd6f441663c086d Mon Sep 17 00:00:00 2001 From: Ibraheem Ahmed Date: Thu, 16 May 2024 11:47:27 -0400 Subject: [PATCH 02/11] remove lifetimes from resolver --- crates/bench/benches/uv.rs | 2 +- crates/uv-dispatch/src/lib.rs | 2 +- crates/uv-installer/src/site_packages.rs | 2 +- crates/uv-requirements/src/lookahead.rs | 28 ++-- crates/uv-requirements/src/source_tree.rs | 27 ++-- crates/uv-requirements/src/unnamed.rs | 24 ++-- crates/uv-resolver/src/preferences.rs | 17 +-- crates/uv-resolver/src/resolution/graph.rs | 2 +- .../src/resolver/batch_prefetch.rs | 3 +- crates/uv-resolver/src/resolver/index.rs | 23 ++-- crates/uv-resolver/src/resolver/mod.rs | 130 +++++++++--------- crates/uv-resolver/tests/resolver.rs | 2 +- crates/uv-types/src/traits.rs | 3 +- crates/uv/src/commands/pip/compile.rs | 2 +- crates/uv/src/commands/pip/install.rs | 4 +- crates/uv/src/commands/pip/sync.rs | 2 +- crates/uv/src/commands/project/lock.rs | 2 +- crates/uv/src/commands/project/mod.rs | 2 +- crates/uv/src/commands/project/run.rs | 2 +- 19 files changed, 143 insertions(+), 136 deletions(-) diff --git a/crates/bench/benches/uv.rs b/crates/bench/benches/uv.rs index 8816dbaaa2ad..05c84fa68977 100644 --- a/crates/bench/benches/uv.rs +++ b/crates/bench/benches/uv.rs @@ -108,7 +108,7 @@ mod resolver { &index, &hashes, &build_context, - 
&installed_packages, + installed_packages, DistributionDatabase::new(client, &build_context, concurrency.downloads), )?; diff --git a/crates/uv-dispatch/src/lib.rs b/crates/uv-dispatch/src/lib.rs index e5a185128b5e..37d2d74c9c75 100644 --- a/crates/uv-dispatch/src/lib.rs +++ b/crates/uv-dispatch/src/lib.rs @@ -153,7 +153,7 @@ impl<'a> BuildContext for BuildDispatch<'a> { self.index, &HashStrategy::None, self, - &EmptyInstalledPackages, + EmptyInstalledPackages, DistributionDatabase::new(self.client, self, self.concurrency.downloads), )?; let graph = resolver.resolve().await.with_context(|| { diff --git a/crates/uv-installer/src/site_packages.rs b/crates/uv-installer/src/site_packages.rs index 723a47fd0aca..05fe50b63eb8 100644 --- a/crates/uv-installer/src/site_packages.rs +++ b/crates/uv-installer/src/site_packages.rs @@ -23,7 +23,7 @@ use crate::satisfies::RequirementSatisfaction; /// An index over the packages installed in an environment. /// /// Packages are indexed by both name and (for editable installs) URL. -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct SitePackages { venv: PythonEnvironment, /// The vector of all installed distributions. The `by_name` and `by_url` indices index into diff --git a/crates/uv-requirements/src/lookahead.rs b/crates/uv-requirements/src/lookahead.rs index 188ee02a6e4f..e016319f0759 100644 --- a/crates/uv-requirements/src/lookahead.rs +++ b/crates/uv-requirements/src/lookahead.rs @@ -1,4 +1,4 @@ -use std::collections::VecDeque; +use std::{collections::VecDeque, sync::Arc}; use futures::stream::FuturesUnordered; use futures::StreamExt; @@ -197,17 +197,18 @@ impl<'a, Context: BuildContext> LookaheadResolver<'a, Context> { // Fetch the metadata for the distribution. let requires_dist = { let id = dist.version_id(); - if let Some(archive) = self - .index - .get_metadata(&id) - .as_deref() - .and_then(|response| { - if let MetadataResponse::Found(archive, ..) = response { - Some(archive) - } else { - None - } - }) + if let Some(archive) = + self.index + .distributions() + .get(&id) + .as_deref() + .and_then(|response| { + if let MetadataResponse::Found(archive, ..) = response { + Some(archive) + } else { + None + } + }) { // If the metadata is already in the index, return it. archive @@ -234,7 +235,8 @@ impl<'a, Context: BuildContext> LookaheadResolver<'a, Context> { // Insert the metadata into the index. self.index - .insert_metadata(id, MetadataResponse::Found(archive)); + .distributions() + .done(id, Arc::new(MetadataResponse::Found(archive))); requires_dist .into_iter() diff --git a/crates/uv-requirements/src/source_tree.rs b/crates/uv-requirements/src/source_tree.rs index 0cbf5a9d7543..548b16411132 100644 --- a/crates/uv-requirements/src/source_tree.rs +++ b/crates/uv-requirements/src/source_tree.rs @@ -1,5 +1,6 @@ use std::borrow::Cow; use std::path::{Path, PathBuf}; +use std::sync::Arc; use anyhow::{Context, Result}; use futures::stream::FuturesOrdered; @@ -117,17 +118,18 @@ impl<'a, Context: BuildContext> SourceTreeResolver<'a, Context> { // Fetch the metadata for the distribution. 
let metadata = { let id = VersionId::from_url(source.url()); - if let Some(archive) = self - .index - .get_metadata(&id) - .as_deref() - .and_then(|response| { - if let MetadataResponse::Found(archive) = response { - Some(archive) - } else { - None - } - }) + if let Some(archive) = + self.index + .distributions() + .get(&id) + .as_deref() + .and_then(|response| { + if let MetadataResponse::Found(archive) = response { + Some(archive) + } else { + None + } + }) { // If the metadata is already in the index, return it. archive.metadata.clone() @@ -138,7 +140,8 @@ impl<'a, Context: BuildContext> SourceTreeResolver<'a, Context> { // Insert the metadata into the index. self.index - .insert_metadata(id, MetadataResponse::Found(archive.clone())); + .distributions() + .done(id, Arc::new(MetadataResponse::Found(archive.clone()))); archive.metadata } diff --git a/crates/uv-requirements/src/unnamed.rs b/crates/uv-requirements/src/unnamed.rs index 2c22cb8674e8..2379c9f31fdf 100644 --- a/crates/uv-requirements/src/unnamed.rs +++ b/crates/uv-requirements/src/unnamed.rs @@ -1,6 +1,7 @@ use std::borrow::Cow; use std::path::Path; use std::str::FromStr; +use std::sync::Arc; use anyhow::Result; use configparser::ini::Ini; @@ -254,13 +255,18 @@ impl<'a, Context: BuildContext> NamedRequirementsResolver<'a, Context> { // Fetch the metadata for the distribution. let name = { let id = VersionId::from_url(source.url()); - if let Some(archive) = index.get_metadata(&id).as_deref().and_then(|response| { - if let MetadataResponse::Found(archive) = response { - Some(archive) - } else { - None - } - }) { + if let Some(archive) = index + .distributions() + .get(&id) + .as_deref() + .and_then(|response| { + if let MetadataResponse::Found(archive) = response { + Some(archive) + } else { + None + } + }) + { // If the metadata is already in the index, return it. archive.metadata.name.clone() } else { @@ -272,7 +278,9 @@ impl<'a, Context: BuildContext> NamedRequirementsResolver<'a, Context> { let name = archive.metadata.name.clone(); // Insert the metadata into the index. - index.insert_metadata(id, MetadataResponse::Found(archive)); + index + .distributions() + .done(id, Arc::new(MetadataResponse::Found(archive))); name } diff --git a/crates/uv-resolver/src/preferences.rs b/crates/uv-resolver/src/preferences.rs index d86963126ef4..e4f658b88832 100644 --- a/crates/uv-resolver/src/preferences.rs +++ b/crates/uv-resolver/src/preferences.rs @@ -1,4 +1,4 @@ -use std::str::FromStr; +use std::{str::FromStr, sync::Arc}; use rustc_hash::FxHashMap; use tracing::trace; @@ -69,7 +69,7 @@ impl Preference { /// A set of pinned packages that should be preserved during resolution, if possible. #[derive(Debug, Clone)] -pub(crate) struct Preferences(FxHashMap); +pub(crate) struct Preferences(Arc>); impl Preferences { /// Create a map of pinned packages from an iterator of [`Preference`] entries. @@ -81,10 +81,10 @@ impl Preferences { preferences: PreferenceIterator, markers: Option<&MarkerEnvironment>, ) -> Self { - Self( - // TODO(zanieb): We should explicitly ensure that when a package name is seen multiple times - // that the newest or oldest version is preferred dependning on the resolution strategy; - // right now, the order is dependent on the given iterator. + // TODO(zanieb): We should explicitly ensure that when a package name is seen multiple times + // that the newest or oldest version is preferred dependning on the resolution strategy; + // right now, the order is dependent on the given iterator. 
+ let preferences = preferences .into_iter() .filter_map(|preference| { @@ -130,8 +130,9 @@ impl Preferences { } } }) - .collect(), - ) + .collect(); + + Self(Arc::new(preferences)) } /// Return the pinned version for a package, if any. diff --git a/crates/uv-resolver/src/resolution/graph.rs b/crates/uv-resolver/src/resolution/graph.rs index ff04afe86c65..a3853deba0f5 100644 --- a/crates/uv-resolver/src/resolution/graph.rs +++ b/crates/uv-resolver/src/resolution/graph.rs @@ -458,7 +458,7 @@ impl ResolutionGraph { VersionOrUrlRef::Url(verbatim_url) => VersionId::from_url(verbatim_url.raw()), }; let res = index - .distributions + .distributions() .get(&version_id) .expect("every package in resolution graph has metadata"); let MetadataResponse::Found(archive, ..) = &*res else { diff --git a/crates/uv-resolver/src/resolver/batch_prefetch.rs b/crates/uv-resolver/src/resolver/batch_prefetch.rs index 603a18c6ec64..dd7be0a04cde 100644 --- a/crates/uv-resolver/src/resolver/batch_prefetch.rs +++ b/crates/uv-resolver/src/resolver/batch_prefetch.rs @@ -65,7 +65,7 @@ impl BatchPrefetcher { // This is immediate, we already fetched the version map. let versions_response = index - .packages + .packages() .wait_blocking(package_name) .ok_or(ResolveError::Unregistered)?; @@ -141,6 +141,7 @@ impl BatchPrefetcher { dist ); prefetch_count += 1; + if index.distributions.register(candidate.version_id()) { let request = Request::from(dist); request_sink.send(request)?; diff --git a/crates/uv-resolver/src/resolver/index.rs b/crates/uv-resolver/src/resolver/index.rs index aae0a31299ba..66cd31a6dcea 100644 --- a/crates/uv-resolver/src/resolver/index.rs +++ b/crates/uv-resolver/src/resolver/index.rs @@ -7,8 +7,11 @@ use uv_normalize::PackageName; use crate::resolver::provider::{MetadataResponse, VersionsResponse}; /// In-memory index of package metadata. +#[derive(Default, Clone)] +pub struct InMemoryIndex(Arc); + #[derive(Default)] -pub struct InMemoryIndex { +struct InMemoryIndexState { /// A map from package name to the metadata for that package and the index where the metadata /// came from. pub(crate) packages: OnceMap>, @@ -19,22 +22,12 @@ pub struct InMemoryIndex { impl InMemoryIndex { /// Insert a [`VersionsResponse`] into the index. - pub fn insert_package(&self, package_name: PackageName, response: VersionsResponse) { - self.packages.done(package_name, Arc::new(response)); + pub fn packages(&self) -> &OnceMap> { + &self.0.packages } /// Insert a [`Metadata23`] into the index. - pub fn insert_metadata(&self, version_id: VersionId, response: MetadataResponse) { - self.distributions.done(version_id, Arc::new(response)); - } - - /// Get the [`VersionsResponse`] for a given package name, without waiting. - pub fn get_package(&self, package_name: &PackageName) -> Option> { - self.packages.get(package_name) - } - - /// Get the [`MetadataResponse`] for a given package ID, without waiting. 
- pub fn get_metadata(&self, version_id: &VersionId) -> Option> { - self.distributions.get(version_id) + pub fn distributions(&self) -> &OnceMap> { + &self.0.distributions } } diff --git a/crates/uv-resolver/src/resolver/mod.rs b/crates/uv-resolver/src/resolver/mod.rs index 9d8e5a5afff9..d00ca44fbd0e 100644 --- a/crates/uv-resolver/src/resolver/mod.rs +++ b/crates/uv-resolver/src/resolver/mod.rs @@ -174,14 +174,14 @@ enum ResolverVersion { pub(crate) type SharedMap = Arc>; pub(crate) type SharedSet = Arc>; -pub struct Resolver<'a, Provider: ResolverProvider, InstalledPackages: InstalledPackagesProvider> { +pub struct Resolver { project: Option, preferences: Preferences, exclusions: Exclusions, - python_requirement: &'a PythonRequirement, + python_requirement: PythonRequirement, selector: CandidateSelector, - index: &'a InMemoryIndex, - installed_packages: &'a InstalledPackages, + index: InMemoryIndex, + installed_packages: InstalledPackages, /// Incompatibilities for packages that are entirely unavailable. unavailable_packages: SharedMap, /// Incompatibilities for packages that are unavailable at specific versions. @@ -190,10 +190,10 @@ pub struct Resolver<'a, Provider: ResolverProvider, InstalledPackages: Installed visited: SharedSet, reporter: Option>, provider: Provider, - solver_state: Option>, + solver_state: Option, } -struct SolverState<'a> { +struct SolverState { requirements: Vec, constraints: Constraints, overrides: Overrides, @@ -201,13 +201,13 @@ struct SolverState<'a> { urls: Urls, locals: Locals, dependency_mode: DependencyMode, - hasher: &'a HashStrategy, + hasher: HashStrategy, /// When not set, the resolver is in "universal" mode. - markers: Option<&'a MarkerEnvironment>, + markers: Option, } impl<'a, Context: BuildContext, InstalledPackages: InstalledPackagesProvider> - Resolver<'a, DefaultResolverProvider<'a, Context>, InstalledPackages> + Resolver, InstalledPackages> { /// Initialize a new resolver using the default backend doing real requests. /// @@ -238,7 +238,7 @@ impl<'a, Context: BuildContext, InstalledPackages: InstalledPackagesProvider> index: &'a InMemoryIndex, hasher: &'a HashStrategy, build_context: &'a Context, - installed_packages: &'a InstalledPackages, + installed_packages: InstalledPackages, database: DistributionDatabase<'a, Context>, ) -> Result { let provider = DefaultResolverProvider::new( @@ -266,23 +266,24 @@ impl<'a, Context: BuildContext, InstalledPackages: InstalledPackagesProvider> } } -impl<'a, Provider: ResolverProvider, InstalledPackages: InstalledPackagesProvider> - Resolver<'a, Provider, InstalledPackages> +impl + Resolver { /// Initialize a new resolver using a user provided backend. 
#[allow(clippy::too_many_arguments)] pub fn new_custom_io( manifest: Manifest, options: Options, - hasher: &'a HashStrategy, - markers: Option<&'a MarkerEnvironment>, - python_requirement: &'a PythonRequirement, - index: &'a InMemoryIndex, + hasher: &HashStrategy, + markers: Option<&MarkerEnvironment>, + python_requirement: &PythonRequirement, + index: &InMemoryIndex, provider: Provider, - installed_packages: &'a InstalledPackages, + installed_packages: InstalledPackages, ) -> Result { let selector = CandidateSelector::for_resolution(options, &manifest, markers); let solver_state = SolverState { + hasher: hasher.clone(), dependency_mode: options.dependency_mode, urls: Urls::from_manifest(&manifest, markers, options.dependency_mode)?, locals: Locals::from_manifest(&manifest, markers, options.dependency_mode), @@ -290,12 +291,11 @@ impl<'a, Provider: ResolverProvider, InstalledPackages: InstalledPackagesProvide constraints: manifest.constraints, overrides: manifest.overrides, editables: Editables::from_requirements(manifest.editables), - hasher, - markers, + markers: markers.cloned(), }; Ok(Self { - index, + index: index.clone(), unavailable_packages: SharedMap::default(), incomplete_packages: SharedMap::default(), visited: SharedSet::default(), @@ -303,7 +303,7 @@ impl<'a, Provider: ResolverProvider, InstalledPackages: InstalledPackagesProvide project: manifest.project, preferences: Preferences::from_iter(manifest.preferences, markers), exclusions: manifest.exclusions, - python_requirement, + python_requirement: python_requirement.clone(), reporter: None, provider, installed_packages, @@ -334,9 +334,7 @@ impl<'a, Provider: ResolverProvider, InstalledPackages: InstalledPackagesProvide let requests_fut = self.fetch(request_stream).fuse(); // Run the solver. 
- #[allow(unsafe_code)] - let solver: Solver<'static, InstalledPackages> = - unsafe { std::mem::transmute(Solver::new(&self, solver_state)) }; + let solver = Solver::new(&self, solver_state); let (tx, rx) = tokio::sync::oneshot::channel(); std::thread::Builder::new() .name("solver".into()) @@ -358,12 +356,12 @@ impl<'a, Provider: ResolverProvider, InstalledPackages: InstalledPackagesProvide Err(if let ResolveError::NoSolution(err) = err { ResolveError::NoSolution( err.with_available_versions( - self.python_requirement, + &self.python_requirement, &self.visited, - &self.index.packages, + self.index.packages(), ) .with_selector(self.selector.clone()) - .with_python_requirement(self.python_requirement) + .with_python_requirement(&self.python_requirement) .with_index_locations(self.provider.index_locations()) .with_unavailable_packages(&self.unavailable_packages) .with_incomplete_packages(&self.incomplete_packages), @@ -389,12 +387,12 @@ impl<'a, Provider: ResolverProvider, InstalledPackages: InstalledPackagesProvide Some(Response::Package(package_name, version_map)) => { trace!("Received package metadata for: {package_name}"); self.index - .packages + .packages() .done(package_name, Arc::new(version_map)); } Some(Response::Installed { dist, metadata }) => { trace!("Received installed distribution metadata for: {dist}"); - self.index.distributions.done( + self.index.distributions().done( dist.version_id(), Arc::new(MetadataResponse::Found(ArchiveMetadata::from(metadata))), ); @@ -414,7 +412,7 @@ impl<'a, Provider: ResolverProvider, InstalledPackages: InstalledPackagesProvide _ => {} } self.index - .distributions + .distributions() .done(dist.version_id(), Arc::new(metadata)); } Some(Response::Dist { @@ -432,7 +430,7 @@ impl<'a, Provider: ResolverProvider, InstalledPackages: InstalledPackagesProvide _ => {} } self.index - .distributions + .distributions() .done(dist.version_id(), Arc::new(metadata)); } None => {} @@ -494,7 +492,7 @@ impl<'a, Provider: ResolverProvider, InstalledPackages: InstalledPackagesProvide // Wait for the package metadata to become available. let versions_response = self .index - .packages + .packages() .wait(&package_name) .await .ok_or(ResolveError::Unregistered)?; @@ -529,7 +527,7 @@ impl<'a, Provider: ResolverProvider, InstalledPackages: InstalledPackagesProvide &range, version_map, &self.preferences, - self.installed_packages, + &self.installed_packages, &self.exclusions, ) else { return Ok(None); @@ -541,7 +539,7 @@ impl<'a, Provider: ResolverProvider, InstalledPackages: InstalledPackagesProvide }; // Emit a request to fetch the metadata for this version. - if self.index.distributions.register(candidate.version_id()) { + if self.index.distributions().register(candidate.version_id()) { let dist = dist.for_resolution().to_owned(); let response = match dist { @@ -593,24 +591,24 @@ impl<'a, Provider: ResolverProvider, InstalledPackages: InstalledPackagesProvide } } -struct Solver<'a, InstalledPackages: InstalledPackagesProvider> { +struct Solver { project: Option, requirements: Vec, constraints: Constraints, overrides: Overrides, - preferences: &'a Preferences, - exclusions: &'a Exclusions, + preferences: Preferences, + exclusions: Exclusions, editables: Editables, urls: Urls, locals: Locals, dependency_mode: DependencyMode, - hasher: &'a HashStrategy, + hasher: HashStrategy, /// When not set, the resolver is in "universal" mode. 
- markers: Option<&'a MarkerEnvironment>, + markers: Option, python_requirement: PythonRequirement, selector: CandidateSelector, - index: &'a InMemoryIndex, - installed_packages: &'a InstalledPackages, + index: InMemoryIndex, + installed_packages: InstalledPackages, /// Incompatibilities for packages that are entirely unavailable. unavailable_packages: SharedMap, /// Incompatibilities for packages that are unavailable at specific versions. @@ -620,13 +618,12 @@ struct Solver<'a, InstalledPackages: InstalledPackagesProvider> { reporter: Option>, } -impl<'a, InstalledPackages: InstalledPackagesProvider> Solver<'a, InstalledPackages> { - fn new( - resolver: &'a Resolver<'a, Provider, InstalledPackages>, - state: SolverState<'a>, +impl Solver { + fn new<'a, Provider: ResolverProvider>( + resolver: &Resolver, + state: SolverState, ) -> Self { Solver { - project: resolver.project.clone(), requirements: state.requirements, constraints: state.constraints, overrides: state.overrides, @@ -636,12 +633,13 @@ impl<'a, InstalledPackages: InstalledPackagesProvider> Solver<'a, InstalledPacka dependency_mode: state.dependency_mode, hasher: state.hasher, markers: state.markers, - preferences: &resolver.preferences, - exclusions: &resolver.exclusions, + project: resolver.project.clone(), + preferences: resolver.preferences.clone(), + exclusions: resolver.exclusions.clone(), python_requirement: resolver.python_requirement.clone(), selector: resolver.selector.clone(), - index: resolver.index, - installed_packages: resolver.installed_packages, + index: resolver.index.clone(), + installed_packages: resolver.installed_packages.clone(), unavailable_packages: resolver.unavailable_packages.clone(), incomplete_packages: resolver.incomplete_packages.clone(), visited: resolver.visited.clone(), @@ -696,8 +694,8 @@ impl<'a, InstalledPackages: InstalledPackagesProvider> Solver<'a, InstalledPacka return ResolutionGraph::from_state( &selection, &state.pins, - &self.index.packages, - &self.index.distributions, + self.index.packages(), + self.index.distributions(), &state.pubgrub, &self.preferences, self.editables.clone(), @@ -808,7 +806,7 @@ impl<'a, InstalledPackages: InstalledPackagesProvider> Solver<'a, InstalledPacka &version, term_intersection.unwrap_positive(), &request_sink, - self.index, + &self.index, &self.selector, )?; @@ -897,7 +895,7 @@ impl<'a, InstalledPackages: InstalledPackagesProvider> Solver<'a, InstalledPacka } // Emit a request to fetch the metadata for this package. - if self.index.packages.register(name.clone()) { + if self.index.packages().register(name.clone()) { request_sink.send(Request::Package(name.clone()))?; } } @@ -914,7 +912,7 @@ impl<'a, InstalledPackages: InstalledPackagesProvider> Solver<'a, InstalledPacka // Emit a request to fetch the metadata for this distribution. 
let dist = Dist::from_url(name.clone(), url.clone())?; - if self.index.distributions.register(dist.version_id()) { + if self.index.distributions().register(dist.version_id()) { request_sink.send(Request::Dist(dist))?; } } @@ -946,7 +944,7 @@ impl<'a, InstalledPackages: InstalledPackagesProvider> Solver<'a, InstalledPacka #[instrument(skip_all, fields(%package))] fn choose_version( &self, - package: &'a PubGrubPackage, + package: &PubGrubPackage, range: &Range, pins: &mut FilePins, request_sink: &UnboundedSender, @@ -1007,7 +1005,7 @@ impl<'a, InstalledPackages: InstalledPackagesProvider> Solver<'a, InstalledPacka let dist = PubGrubDistribution::from_url(package_name, url); let response = self .index - .distributions + .distributions() .wait_blocking(&dist.version_id()) .ok_or(ResolveError::Unregistered)?; @@ -1070,7 +1068,7 @@ impl<'a, InstalledPackages: InstalledPackagesProvider> Solver<'a, InstalledPacka // Wait for the metadata to be available. let versions_response = self .index - .packages + .packages() .wait_blocking(package_name) .ok_or(ResolveError::Unregistered)?; self.visited.insert(package_name.clone()); @@ -1102,7 +1100,7 @@ impl<'a, InstalledPackages: InstalledPackagesProvider> Solver<'a, InstalledPacka range, version_maps, &self.preferences, - self.installed_packages, + &self.installed_packages, &self.exclusions, ) else { // Short circuit: we couldn't find _any_ versions for a package. @@ -1145,7 +1143,7 @@ impl<'a, InstalledPackages: InstalledPackagesProvider> Solver<'a, InstalledPacka // Emit a request to fetch the metadata for this version. if matches!(package, PubGrubPackage::Package(_, _, _)) { - if self.index.distributions.register(candidate.version_id()) { + if self.index.distributions().register(candidate.version_id()) { let request = Request::from(dist.for_resolution()); request_sink.send(request)?; } @@ -1176,7 +1174,7 @@ impl<'a, InstalledPackages: InstalledPackagesProvider> Solver<'a, InstalledPacka None, &self.urls, &self.locals, - self.markers, + self.markers.as_ref(), ); let mut dependencies = match dependencies { @@ -1224,7 +1222,7 @@ impl<'a, InstalledPackages: InstalledPackagesProvider> Solver<'a, InstalledPacka // Add any constraints. for constraint in self.constraints.get(&metadata.name).into_iter().flatten() { - if constraint.evaluate_markers(self.markers, &[]) { + if constraint.evaluate_markers(self.markers.as_ref(), &[]) { let PubGrubRequirement { package, version } = PubGrubRequirement::from_constraint( constraint, @@ -1256,7 +1254,7 @@ impl<'a, InstalledPackages: InstalledPackagesProvider> Solver<'a, InstalledPacka // Wait for the metadata to be available. self.index - .distributions + .distributions() .wait_blocking(&version_id) .ok_or(ResolveError::Unregistered)?; } @@ -1280,7 +1278,7 @@ impl<'a, InstalledPackages: InstalledPackagesProvider> Solver<'a, InstalledPacka extra.as_ref(), &self.urls, &self.locals, - self.markers, + self.markers.as_ref(), )?; for (dep_package, dep_version) in dependencies.iter() { @@ -1322,7 +1320,7 @@ impl<'a, InstalledPackages: InstalledPackagesProvider> Solver<'a, InstalledPacka // Wait for the metadata to be available. 
let response = self .index - .distributions + .distributions() .wait_blocking(&version_id) .ok_or(ResolveError::Unregistered)?; @@ -1390,7 +1388,7 @@ impl<'a, InstalledPackages: InstalledPackagesProvider> Solver<'a, InstalledPacka extra.as_ref(), &self.urls, &self.locals, - self.markers, + self.markers.as_ref(), )?; for (dep_package, dep_version) in dependencies.iter() { diff --git a/crates/uv-resolver/tests/resolver.rs b/crates/uv-resolver/tests/resolver.rs index 5eca33aa0e05..874400fdd75a 100644 --- a/crates/uv-resolver/tests/resolver.rs +++ b/crates/uv-resolver/tests/resolver.rs @@ -145,7 +145,7 @@ async fn resolve( &index, &hashes, &build_context, - &installed_packages, + installed_packages, DistributionDatabase::new(&client, &build_context, concurrency.downloads), )?; Ok(resolver.resolve().await?) diff --git a/crates/uv-types/src/traits.rs b/crates/uv-types/src/traits.rs index f75ca2b2c1ca..0940d47d2ff5 100644 --- a/crates/uv-types/src/traits.rs +++ b/crates/uv-types/src/traits.rs @@ -128,12 +128,13 @@ pub trait SourceBuildTrait { } /// A wrapper for [`uv_installer::SitePackages`] -pub trait InstalledPackagesProvider: Send + Sync + 'static { +pub trait InstalledPackagesProvider: Clone + Send + 'static { fn iter(&self) -> impl Iterator; fn get_packages(&self, name: &PackageName) -> Vec<&InstalledDist>; } /// An [`InstalledPackagesProvider`] with no packages in it. +#[derive(Clone)] pub struct EmptyInstalledPackages; impl InstalledPackagesProvider for EmptyInstalledPackages { diff --git a/crates/uv/src/commands/pip/compile.rs b/crates/uv/src/commands/pip/compile.rs index 2080543027e9..1a6e266c87aa 100644 --- a/crates/uv/src/commands/pip/compile.rs +++ b/crates/uv/src/commands/pip/compile.rs @@ -545,7 +545,7 @@ pub(crate) async fn pip_compile( &top_level_index, &hasher, &build_dispatch, - &EmptyInstalledPackages, + EmptyInstalledPackages, DistributionDatabase::new(&client, &build_dispatch, concurrency.downloads), )? .with_reporter(ResolverReporter::from(printer)); diff --git a/crates/uv/src/commands/pip/install.rs b/crates/uv/src/commands/pip/install.rs index 0e5510945632..ae1d540a20e5 100644 --- a/crates/uv/src/commands/pip/install.rs +++ b/crates/uv/src/commands/pip/install.rs @@ -427,7 +427,7 @@ pub(crate) async fn pip_install( project, &editables, &hasher, - &site_packages, + site_packages.clone(), &reinstall, &upgrade, &interpreter, @@ -574,7 +574,7 @@ async fn resolve( project: Option, editables: &[ResolvedEditable], hasher: &HashStrategy, - site_packages: &SitePackages, + site_packages: SitePackages, reinstall: &Reinstall, upgrade: &Upgrade, interpreter: &Interpreter, diff --git a/crates/uv/src/commands/pip/sync.rs b/crates/uv/src/commands/pip/sync.rs index 2525466cd8c2..c2370928078f 100644 --- a/crates/uv/src/commands/pip/sync.rs +++ b/crates/uv/src/commands/pip/sync.rs @@ -381,7 +381,7 @@ pub(crate) async fn pip_sync( &hasher, &build_dispatch, // TODO(zanieb): We should consider support for installed packages in pip sync - &EmptyInstalledPackages, + EmptyInstalledPackages, DistributionDatabase::new(&client, &build_dispatch, concurrency.downloads), )? .with_reporter(reporter); diff --git a/crates/uv/src/commands/project/lock.rs b/crates/uv/src/commands/project/lock.rs index 081d9e620330..52da445e26da 100644 --- a/crates/uv/src/commands/project/lock.rs +++ b/crates/uv/src/commands/project/lock.rs @@ -111,7 +111,7 @@ pub(crate) async fn lock( // Resolve the requirements. 
let resolution = project::resolve( spec, - &EmptyInstalledPackages, + EmptyInstalledPackages, &hasher, &interpreter, tags, diff --git a/crates/uv/src/commands/project/mod.rs b/crates/uv/src/commands/project/mod.rs index 5677df0ab716..35bb04752068 100644 --- a/crates/uv/src/commands/project/mod.rs +++ b/crates/uv/src/commands/project/mod.rs @@ -115,7 +115,7 @@ pub(crate) fn init( #[allow(clippy::too_many_arguments)] pub(crate) async fn resolve( spec: RequirementsSpecification, - installed_packages: &InstalledPackages, + installed_packages: InstalledPackages, hasher: &HashStrategy, interpreter: &Interpreter, tags: &Tags, diff --git a/crates/uv/src/commands/project/run.rs b/crates/uv/src/commands/project/run.rs index 9ead77768d18..312b67a87bda 100644 --- a/crates/uv/src/commands/project/run.rs +++ b/crates/uv/src/commands/project/run.rs @@ -292,7 +292,7 @@ async fn update_environment( // Resolve the requirements. let resolution = match project::resolve( spec, - &site_packages, + site_packages.clone(), &hasher, &interpreter, tags, From 6823288e36fcaaef6441fab04a755de206c9e876 Mon Sep 17 00:00:00 2001 From: Ibraheem Ahmed Date: Thu, 16 May 2024 11:54:10 -0400 Subject: [PATCH 03/11] update internal docs --- crates/once-map/src/lib.rs | 2 +- crates/uv-resolver/src/resolver/mod.rs | 19 +++++++++++++++---- 2 files changed, 16 insertions(+), 5 deletions(-) diff --git a/crates/once-map/src/lib.rs b/crates/once-map/src/lib.rs index 7bdd279b49bb..3fcedba4979d 100644 --- a/crates/once-map/src/lib.rs +++ b/crates/once-map/src/lib.rs @@ -63,7 +63,7 @@ impl OnceMap { } } - /// Wait for the result of a job that is running. + /// Wait for the result of a job that is running, in a blocking context. /// /// Will hang if [`OnceMap::done`] isn't called for this key. pub fn wait_blocking(&self, key: &K) -> Option { diff --git a/crates/uv-resolver/src/resolver/mod.rs b/crates/uv-resolver/src/resolver/mod.rs index d00ca44fbd0e..b828a25a20e9 100644 --- a/crates/uv-resolver/src/resolver/mod.rs +++ b/crates/uv-resolver/src/resolver/mod.rs @@ -4,6 +4,7 @@ use std::borrow::Cow; use std::fmt::{Display, Formatter}; use std::ops::Deref; use std::sync::Arc; +use std::thread; use anyhow::Result; use dashmap::{DashMap, DashSet}; @@ -193,6 +194,7 @@ pub struct Resolver, } +/// State used by the solver task. struct SolverState { requirements: Vec, constraints: Constraints, @@ -333,17 +335,22 @@ impl // Run the fetcher. let requests_fut = self.fetch(request_stream).fuse(); - // Run the solver. + // Spawn the solver thread. let solver = Solver::new(&self, solver_state); let (tx, rx) = tokio::sync::oneshot::channel(); - std::thread::Builder::new() - .name("solver".into()) + thread::Builder::new() + .name("uv-resolver".into()) .spawn(move || { let result = solver.solve(request_sink); tx.send(result).unwrap(); }) .unwrap(); - let resolve_fut = async { rx.await.unwrap() }; + + let resolve_fut = async move { + rx.await + .map_err(|_| ResolveError::ChannelClosed) + .and_then(|result| result) + }; // Wait for both to complete. match tokio::try_join!(requests_fut, resolve_fut) { @@ -591,6 +598,10 @@ impl } } +/// The PubGrub solver task. +/// +/// This solver task is spawned on a separate thread and delegates all I/O to +/// the async thread through a channel the `InMemoryIndex`. 
struct Solver { project: Option, requirements: Vec, From 374afaf9d2187d3c49d6afb029c19082b972b473 Mon Sep 17 00:00:00 2001 From: Ibraheem Ahmed Date: Thu, 16 May 2024 12:00:23 -0400 Subject: [PATCH 04/11] wrap `PythonEnvironment` in `Arc` --- crates/uv-interpreter/src/environment.rs | 59 ++++++++++--------- .../src/resolver/batch_prefetch.rs | 2 +- crates/uv-resolver/src/resolver/index.rs | 4 +- crates/uv-resolver/src/resolver/mod.rs | 11 ++-- 4 files changed, 41 insertions(+), 35 deletions(-) diff --git a/crates/uv-interpreter/src/environment.rs b/crates/uv-interpreter/src/environment.rs index 2811755e572f..6f5d655b7ec2 100644 --- a/crates/uv-interpreter/src/environment.rs +++ b/crates/uv-interpreter/src/environment.rs @@ -1,6 +1,7 @@ use itertools::Either; use std::env; use std::path::{Path, PathBuf}; +use std::sync::Arc; use same_file::is_same_file; @@ -12,7 +13,10 @@ use crate::{find_default_python, find_requested_python, Error, Interpreter, Targ /// A Python environment, consisting of a Python [`Interpreter`] and its associated paths. #[derive(Debug, Clone)] -pub struct PythonEnvironment { +pub struct PythonEnvironment(Arc); + +#[derive(Debug, Clone)] +struct SharedPythonEnvironment { root: PathBuf, interpreter: Interpreter, } @@ -46,10 +50,10 @@ impl PythonEnvironment { interpreter.base_prefix().display() ); - Ok(Self { + Ok(Self(Arc::new(SharedPythonEnvironment { root: venv, interpreter, - }) + }))) } /// Create a [`PythonEnvironment`] for a Python interpreter specifier (e.g., a path or a binary name). @@ -57,57 +61,58 @@ impl PythonEnvironment { let Some(interpreter) = find_requested_python(python, cache)? else { return Err(Error::RequestedPythonNotFound(python.to_string())); }; - Ok(Self { + Ok(Self(Arc::new(SharedPythonEnvironment { root: interpreter.prefix().to_path_buf(), interpreter, - }) + }))) } /// Create a [`PythonEnvironment`] for the default Python interpreter. pub fn from_default_python(cache: &Cache) -> Result { let interpreter = find_default_python(cache)?; - Ok(Self { + Ok(Self(Arc::new(SharedPythonEnvironment { root: interpreter.prefix().to_path_buf(), interpreter, - }) + }))) } /// Create a [`PythonEnvironment`] from an existing [`Interpreter`] and root directory. pub fn from_interpreter(interpreter: Interpreter) -> Self { - Self { + Self(Arc::new(SharedPythonEnvironment { root: interpreter.prefix().to_path_buf(), interpreter, - } + })) } /// Create a [`PythonEnvironment`] from an existing [`Interpreter`] and `--target` directory. #[must_use] pub fn with_target(self, target: Target) -> Self { - Self { - interpreter: self.interpreter.with_target(target), - ..self - } + let inner = Arc::unwrap_or_clone(self.0); + Self(Arc::new(SharedPythonEnvironment { + interpreter: inner.interpreter.with_target(target), + ..inner + })) } /// Returns the root (i.e., `prefix`) of the Python interpreter. pub fn root(&self) -> &Path { - &self.root + &self.0.root } /// Return the [`Interpreter`] for this virtual environment. pub fn interpreter(&self) -> &Interpreter { - &self.interpreter + &self.0.interpreter } /// Return the [`PyVenvConfiguration`] for this virtual environment, as extracted from the /// `pyvenv.cfg` file. pub fn cfg(&self) -> Result { - Ok(PyVenvConfiguration::parse(self.root.join("pyvenv.cfg"))?) + Ok(PyVenvConfiguration::parse(self.0.root.join("pyvenv.cfg"))?) } /// Returns the location of the Python executable. 
pub fn python_executable(&self) -> &Path { - self.interpreter.sys_executable() + self.0.interpreter.sys_executable() } /// Returns an iterator over the `site-packages` directories inside a virtual environment. @@ -118,11 +123,11 @@ impl PythonEnvironment { /// Some distributions also create symbolic links from `purelib` to `platlib`; in such cases, we /// still deduplicate the entries, returning a single path. pub fn site_packages(&self) -> impl Iterator { - if let Some(target) = self.interpreter.target() { + if let Some(target) = self.0.interpreter.target() { Either::Left(std::iter::once(target.root())) } else { - let purelib = self.interpreter.purelib(); - let platlib = self.interpreter.platlib(); + let purelib = self.0.interpreter.purelib(); + let platlib = self.0.interpreter.platlib(); Either::Right(std::iter::once(purelib).chain( if purelib == platlib || is_same_file(purelib, platlib).unwrap_or(false) { None @@ -135,31 +140,31 @@ impl PythonEnvironment { /// Returns the path to the `bin` directory inside a virtual environment. pub fn scripts(&self) -> &Path { - self.interpreter.scripts() + self.0.interpreter.scripts() } /// Grab a file lock for the virtual environment to prevent concurrent writes across processes. pub fn lock(&self) -> Result { - if let Some(target) = self.interpreter.target() { + if let Some(target) = self.0.interpreter.target() { // If we're installing into a `--target`, use a target-specific lock file. LockedFile::acquire( target.root().join(".lock"), target.root().simplified_display(), ) - } else if self.interpreter.is_virtualenv() { + } else if self.0.interpreter.is_virtualenv() { // If the environment a virtualenv, use a virtualenv-specific lock file. - LockedFile::acquire(self.root.join(".lock"), self.root.simplified_display()) + LockedFile::acquire(self.0.root.join(".lock"), self.0.root.simplified_display()) } else { // Otherwise, use a global lock file. LockedFile::acquire( - env::temp_dir().join(format!("uv-{}.lock", cache_key::digest(&self.root))), - self.root.simplified_display(), + env::temp_dir().join(format!("uv-{}.lock", cache_key::digest(&self.0.root))), + self.0.root.simplified_display(), ) } } /// Return the [`Interpreter`] for this virtual environment. pub fn into_interpreter(self) -> Interpreter { - self.interpreter + Arc::unwrap_or_clone(self.0).interpreter } } diff --git a/crates/uv-resolver/src/resolver/batch_prefetch.rs b/crates/uv-resolver/src/resolver/batch_prefetch.rs index dd7be0a04cde..d4fce8b6c273 100644 --- a/crates/uv-resolver/src/resolver/batch_prefetch.rs +++ b/crates/uv-resolver/src/resolver/batch_prefetch.rs @@ -142,7 +142,7 @@ impl BatchPrefetcher { ); prefetch_count += 1; - if index.distributions.register(candidate.version_id()) { + if index.distributions().register(candidate.version_id()) { let request = Request::from(dist); request_sink.send(request)?; } diff --git a/crates/uv-resolver/src/resolver/index.rs b/crates/uv-resolver/src/resolver/index.rs index 66cd31a6dcea..0f91cc68df69 100644 --- a/crates/uv-resolver/src/resolver/index.rs +++ b/crates/uv-resolver/src/resolver/index.rs @@ -8,10 +8,10 @@ use crate::resolver::provider::{MetadataResponse, VersionsResponse}; /// In-memory index of package metadata. #[derive(Default, Clone)] -pub struct InMemoryIndex(Arc); +pub struct InMemoryIndex(Arc); #[derive(Default)] -struct InMemoryIndexState { +struct SharedInMemoryIndex { /// A map from package name to the metadata for that package and the index where the metadata /// came from. 
pub(crate) packages: OnceMap>, diff --git a/crates/uv-resolver/src/resolver/mod.rs b/crates/uv-resolver/src/resolver/mod.rs index b828a25a20e9..41a5d56f58e6 100644 --- a/crates/uv-resolver/src/resolver/mod.rs +++ b/crates/uv-resolver/src/resolver/mod.rs @@ -14,7 +14,8 @@ use pubgrub::error::PubGrubError; use pubgrub::range::Range; use pubgrub::solver::{Incompatibility, State}; use rustc_hash::{FxHashMap, FxHashSet}; -use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender}; +use tokio::sync::mpsc::{self, UnboundedReceiver, UnboundedSender}; +use tokio::sync::oneshot; use tokio_stream::wrappers::UnboundedReceiverStream; use tracing::{debug, enabled, instrument, trace, warn, Level}; @@ -329,15 +330,15 @@ impl // A channel to fetch package metadata (e.g., given `flask`, fetch all versions) and version // metadata (e.g., given `flask==1.0.0`, fetch the metadata for that version). // Channel size is set large to accommodate batch prefetching. - let (request_sink, request_stream) = tokio::sync::mpsc::unbounded_channel(); + let (request_sink, request_stream) = mpsc::unbounded_channel(); let solver_state = self.solver_state.take().unwrap(); // Run the fetcher. let requests_fut = self.fetch(request_stream).fuse(); - // Spawn the solver thread. + // Spawn the PubGrub solver task. let solver = Solver::new(&self, solver_state); - let (tx, rx) = tokio::sync::oneshot::channel(); + let (tx, rx) = oneshot::channel(); thread::Builder::new() .name("uv-resolver".into()) .spawn(move || { @@ -630,7 +631,7 @@ struct Solver { } impl Solver { - fn new<'a, Provider: ResolverProvider>( + fn new( resolver: &Resolver, state: SolverState, ) -> Self { From 33fd1779a30437a3e2c688fb8a8845be96896fb8 Mon Sep 17 00:00:00 2001 From: Ibraheem Ahmed Date: Thu, 16 May 2024 12:34:14 -0400 Subject: [PATCH 05/11] remove unused `rayon` dependency --- Cargo.lock | 1 - crates/uv-resolver/Cargo.toml | 1 - 2 files changed, 2 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index afb8cbc8eda1..21b812a7ea3f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5018,7 +5018,6 @@ dependencies = [ "platform-tags", "pubgrub", "pypi-types", - "rayon", "requirements-txt", "rkyv", "rustc-hash", diff --git a/crates/uv-resolver/Cargo.toml b/crates/uv-resolver/Cargo.toml index 305c97a42f85..5dfbbcb2b7e8 100644 --- a/crates/uv-resolver/Cargo.toml +++ b/crates/uv-resolver/Cargo.toml @@ -57,7 +57,6 @@ tokio-stream = { workspace = true } tracing = { workspace = true } url = { workspace = true } dashmap = { workspace = true } -rayon = { workspace = true } [dev-dependencies] uv-interpreter = { workspace = true } From e8f274ec94a75d17a493ec7c8be6f44f9888f505 Mon Sep 17 00:00:00 2001 From: Ibraheem Ahmed Date: Thu, 16 May 2024 14:54:44 -0400 Subject: [PATCH 06/11] simplify resolver state --- crates/uv-resolver/src/resolver/mod.rs | 607 ++++++++++++------------- crates/uv-types/src/traits.rs | 2 +- 2 files changed, 282 insertions(+), 327 deletions(-) diff --git a/crates/uv-resolver/src/resolver/mod.rs b/crates/uv-resolver/src/resolver/mod.rs index 41a5d56f58e6..5921f3d7bd19 100644 --- a/crates/uv-resolver/src/resolver/mod.rs +++ b/crates/uv-resolver/src/resolver/mod.rs @@ -177,9 +177,34 @@ pub(crate) type SharedMap = Arc>; pub(crate) type SharedSet = Arc>; pub struct Resolver { + state: Shared, + provider: Provider, +} + +impl Deref + for Resolver +{ + type Target = Shared; + + fn deref(&self) -> &Self::Target { + &self.state + } +} + +pub struct Shared { project: Option, + requirements: Vec, + constraints: Constraints, + overrides: Overrides, preferences: 
Preferences, exclusions: Exclusions, + editables: Editables, + urls: Urls, + locals: Locals, + dependency_mode: DependencyMode, + hasher: HashStrategy, + /// When not set, the resolver is in "universal" mode. + markers: Option, python_requirement: PythonRequirement, selector: CandidateSelector, index: InMemoryIndex, @@ -191,22 +216,6 @@ pub struct Resolver, reporter: Option>, - provider: Provider, - solver_state: Option, -} - -/// State used by the solver task. -struct SolverState { - requirements: Vec, - constraints: Constraints, - overrides: Overrides, - editables: Editables, - urls: Urls, - locals: Locals, - dependency_mode: DependencyMode, - hasher: HashStrategy, - /// When not set, the resolver is in "universal" mode. - markers: Option, } impl<'a, Context: BuildContext, InstalledPackages: InstalledPackagesProvider> @@ -284,60 +293,60 @@ impl provider: Provider, installed_packages: InstalledPackages, ) -> Result { - let selector = CandidateSelector::for_resolution(options, &manifest, markers); - let solver_state = SolverState { - hasher: hasher.clone(), + let state = Shared { + index: index.clone(), + unavailable_packages: SharedMap::default(), + incomplete_packages: SharedMap::default(), + visited: SharedSet::default(), + selector: CandidateSelector::for_resolution(options, &manifest, markers), dependency_mode: options.dependency_mode, urls: Urls::from_manifest(&manifest, markers, options.dependency_mode)?, locals: Locals::from_manifest(&manifest, markers, options.dependency_mode), + project: manifest.project, requirements: manifest.requirements, constraints: manifest.constraints, overrides: manifest.overrides, - editables: Editables::from_requirements(manifest.editables), - markers: markers.cloned(), - }; - - Ok(Self { - index: index.clone(), - unavailable_packages: SharedMap::default(), - incomplete_packages: SharedMap::default(), - visited: SharedSet::default(), - selector, - project: manifest.project, preferences: Preferences::from_iter(manifest.preferences, markers), exclusions: manifest.exclusions, + editables: Editables::from_requirements(manifest.editables), + hasher: hasher.clone(), + markers: markers.cloned(), python_requirement: python_requirement.clone(), reporter: None, - provider, installed_packages, - solver_state: Some(solver_state), - }) + }; + Ok(Self { provider, state }) } /// Set the [`Reporter`] to use for this installer. #[must_use] pub fn with_reporter(self, reporter: impl Reporter + 'static) -> Self { let reporter = Arc::new(reporter); + Self { - reporter: Some(reporter.clone()), + state: Shared { + reporter: Some(reporter.clone()), + ..self.state + }, provider: self.provider.with_reporter(Facade { reporter }), - ..self } } /// Resolve a set of requirements into a set of pinned versions. - pub async fn resolve(mut self) -> Result { + pub async fn resolve(self) -> Result { + let state = Arc::new(self.state); + let provider = Arc::new(self.provider); + // A channel to fetch package metadata (e.g., given `flask`, fetch all versions) and version // metadata (e.g., given `flask==1.0.0`, fetch the metadata for that version). // Channel size is set large to accommodate batch prefetching. let (request_sink, request_stream) = mpsc::unbounded_channel(); - let solver_state = self.solver_state.take().unwrap(); // Run the fetcher. - let requests_fut = self.fetch(request_stream).fuse(); + let requests_fut = state.clone().fetch(provider.clone(), request_stream).fuse(); // Spawn the PubGrub solver task. 
- let solver = Solver::new(&self, solver_state); + let solver = state.clone(); let (tx, rx) = oneshot::channel(); thread::Builder::new() .name("uv-resolver".into()) @@ -356,7 +365,7 @@ impl // Wait for both to complete. match tokio::try_join!(requests_fut, resolve_fut) { Ok(((), resolution)) => { - self.on_complete(); + state.on_complete(); Ok(resolution) } Err(err) => { @@ -364,15 +373,15 @@ impl Err(if let ResolveError::NoSolution(err) = err { ResolveError::NoSolution( err.with_available_versions( - &self.python_requirement, - &self.visited, - self.index.packages(), + &state.python_requirement, + &state.visited, + state.index.packages(), ) - .with_selector(self.selector.clone()) - .with_python_requirement(&self.python_requirement) - .with_index_locations(self.provider.index_locations()) - .with_unavailable_packages(&self.unavailable_packages) - .with_incomplete_packages(&self.incomplete_packages), + .with_selector(state.selector.clone()) + .with_python_requirement(&state.python_requirement) + .with_index_locations(provider.index_locations()) + .with_unavailable_packages(&state.unavailable_packages) + .with_incomplete_packages(&state.incomplete_packages), ) } else { err @@ -380,289 +389,13 @@ impl } } } - - /// Fetch the metadata for a stream of packages and versions. - async fn fetch(&self, request_stream: UnboundedReceiver) -> Result<(), ResolveError> { - let mut response_stream = UnboundedReceiverStream::new(request_stream) - .map(|request| self.process_request(request).boxed_local()) - // Allow as many futures as possible to start in the background. - // Backpressure is provided by at a more granular level by `DistributionDatabase` - // and `SourceDispatch`, as well as the bounded request channel. - .buffer_unordered(usize::MAX); - - while let Some(response) = response_stream.next().await { - match response? { - Some(Response::Package(package_name, version_map)) => { - trace!("Received package metadata for: {package_name}"); - self.index - .packages() - .done(package_name, Arc::new(version_map)); - } - Some(Response::Installed { dist, metadata }) => { - trace!("Received installed distribution metadata for: {dist}"); - self.index.distributions().done( - dist.version_id(), - Arc::new(MetadataResponse::Found(ArchiveMetadata::from(metadata))), - ); - } - Some(Response::Dist { - dist: Dist::Built(dist), - metadata, - }) => { - trace!("Received built distribution metadata for: {dist}"); - match &metadata { - MetadataResponse::InvalidMetadata(err) => { - warn!("Unable to extract metadata for {dist}: {err}"); - } - MetadataResponse::InvalidStructure(err) => { - warn!("Unable to extract metadata for {dist}: {err}"); - } - _ => {} - } - self.index - .distributions() - .done(dist.version_id(), Arc::new(metadata)); - } - Some(Response::Dist { - dist: Dist::Source(dist), - metadata, - }) => { - trace!("Received source distribution metadata for: {dist}"); - match &metadata { - MetadataResponse::InvalidMetadata(err) => { - warn!("Unable to extract metadata for {dist}: {err}"); - } - MetadataResponse::InvalidStructure(err) => { - warn!("Unable to extract metadata for {dist}: {err}"); - } - _ => {} - } - self.index - .distributions() - .done(dist.version_id(), Arc::new(metadata)); - } - None => {} - } - } - - Ok::<(), ResolveError>(()) - } - - #[instrument(skip_all, fields(%request))] - async fn process_request(&self, request: Request) -> Result, ResolveError> { - match request { - // Fetch package metadata from the registry. 
- Request::Package(package_name) => { - let package_versions = self - .provider - .get_package_versions(&package_name) - .boxed_local() - .await - .map_err(ResolveError::Client)?; - - Ok(Some(Response::Package(package_name, package_versions))) - } - - // Fetch distribution metadata from the distribution database. - Request::Dist(dist) => { - let metadata = self - .provider - .get_or_build_wheel_metadata(&dist) - .boxed_local() - .await - .map_err(|err| match dist.clone() { - Dist::Built(built_dist @ BuiltDist::Path(_)) => { - ResolveError::Read(Box::new(built_dist), err) - } - Dist::Source(source_dist @ SourceDist::Path(_)) => { - ResolveError::Build(Box::new(source_dist), err) - } - Dist::Source(source_dist @ SourceDist::Directory(_)) => { - ResolveError::Build(Box::new(source_dist), err) - } - Dist::Built(built_dist) => ResolveError::Fetch(Box::new(built_dist), err), - Dist::Source(source_dist) => { - ResolveError::FetchAndBuild(Box::new(source_dist), err) - } - })?; - Ok(Some(Response::Dist { dist, metadata })) - } - - Request::Installed(dist) => { - let metadata = dist - .metadata() - .map_err(|err| ResolveError::ReadInstalled(Box::new(dist.clone()), err))?; - Ok(Some(Response::Installed { dist, metadata })) - } - - // Pre-fetch the package and distribution metadata. - Request::Prefetch(package_name, range) => { - // Wait for the package metadata to become available. - let versions_response = self - .index - .packages() - .wait(&package_name) - .await - .ok_or(ResolveError::Unregistered)?; - - let version_map = match *versions_response { - VersionsResponse::Found(ref version_map) => version_map, - // Short-circuit if we did not find any versions for the package - VersionsResponse::NoIndex => { - self.unavailable_packages - .insert(package_name.clone(), UnavailablePackage::NoIndex); - - return Ok(None); - } - VersionsResponse::Offline => { - self.unavailable_packages - .insert(package_name.clone(), UnavailablePackage::Offline); - - return Ok(None); - } - VersionsResponse::NotFound => { - self.unavailable_packages - .insert(package_name.clone(), UnavailablePackage::NotFound); - - return Ok(None); - } - }; - - // Try to find a compatible version. If there aren't any compatible versions, - // short-circuit. - let Some(candidate) = self.selector.select( - &package_name, - &range, - version_map, - &self.preferences, - &self.installed_packages, - &self.exclusions, - ) else { - return Ok(None); - }; - - // If there is not a compatible distribution, short-circuit. - let Some(dist) = candidate.compatible() else { - return Ok(None); - }; - - // Emit a request to fetch the metadata for this version. 
- if self.index.distributions().register(candidate.version_id()) { - let dist = dist.for_resolution().to_owned(); - - let response = match dist { - ResolvedDist::Installable(dist) => { - let metadata = self - .provider - .get_or_build_wheel_metadata(&dist) - .boxed_local() - .await - .map_err(|err| match dist.clone() { - Dist::Built(built_dist @ BuiltDist::Path(_)) => { - ResolveError::Read(Box::new(built_dist), err) - } - Dist::Source(source_dist @ SourceDist::Path(_)) => { - ResolveError::Build(Box::new(source_dist), err) - } - Dist::Source(source_dist @ SourceDist::Directory(_)) => { - ResolveError::Build(Box::new(source_dist), err) - } - Dist::Built(built_dist) => { - ResolveError::Fetch(Box::new(built_dist), err) - } - Dist::Source(source_dist) => { - ResolveError::FetchAndBuild(Box::new(source_dist), err) - } - })?; - Response::Dist { dist, metadata } - } - ResolvedDist::Installed(dist) => { - let metadata = dist.metadata().map_err(|err| { - ResolveError::ReadInstalled(Box::new(dist.clone()), err) - })?; - Response::Installed { dist, metadata } - } - }; - - Ok(Some(response)) - } else { - Ok(None) - } - } - } - } - - fn on_complete(&self) { - if let Some(reporter) = self.reporter.as_ref() { - reporter.on_complete(); - } - } } -/// The PubGrub solver task. -/// -/// This solver task is spawned on a separate thread and delegates all I/O to -/// the async thread through a channel the `InMemoryIndex`. -struct Solver { - project: Option, - requirements: Vec, - constraints: Constraints, - overrides: Overrides, - preferences: Preferences, - exclusions: Exclusions, - editables: Editables, - urls: Urls, - locals: Locals, - dependency_mode: DependencyMode, - hasher: HashStrategy, - /// When not set, the resolver is in "universal" mode. - markers: Option, - python_requirement: PythonRequirement, - selector: CandidateSelector, - index: InMemoryIndex, - installed_packages: InstalledPackages, - /// Incompatibilities for packages that are entirely unavailable. - unavailable_packages: SharedMap, - /// Incompatibilities for packages that are unavailable at specific versions. - incomplete_packages: SharedMap>, - /// The set of all registry-based packages visited during resolution. - visited: SharedSet, - reporter: Option>, -} - -impl Solver { - fn new( - resolver: &Resolver, - state: SolverState, - ) -> Self { - Solver { - requirements: state.requirements, - constraints: state.constraints, - overrides: state.overrides, - editables: state.editables, - urls: state.urls, - locals: state.locals, - dependency_mode: state.dependency_mode, - hasher: state.hasher, - markers: state.markers, - project: resolver.project.clone(), - preferences: resolver.preferences.clone(), - exclusions: resolver.exclusions.clone(), - python_requirement: resolver.python_requirement.clone(), - selector: resolver.selector.clone(), - index: resolver.index.clone(), - installed_packages: resolver.installed_packages.clone(), - unavailable_packages: resolver.unavailable_packages.clone(), - incomplete_packages: resolver.incomplete_packages.clone(), - visited: resolver.visited.clone(), - reporter: resolver.reporter.clone(), - } - } - +impl Shared { /// Run the PubGrub solver. #[instrument(skip_all)] fn solve( - &self, + self: Arc, request_sink: UnboundedSender, ) -> Result { let root = PubGrubPackage::Root(self.project.clone()); @@ -1430,6 +1163,222 @@ impl Solver { } } + /// Fetch the metadata for a stream of packages and versions. 
+ async fn fetch( + self: Arc, + provider: Arc, + request_stream: UnboundedReceiver, + ) -> Result<(), ResolveError> { + let mut response_stream = UnboundedReceiverStream::new(request_stream) + .map(|request| self.process_request(request, &*provider).boxed_local()) + // Allow as many futures as possible to start in the background. + // Backpressure is provided by at a more granular level by `DistributionDatabase` + // and `SourceDispatch`, as well as the bounded request channel. + .buffer_unordered(usize::MAX); + + while let Some(response) = response_stream.next().await { + match response? { + Some(Response::Package(package_name, version_map)) => { + trace!("Received package metadata for: {package_name}"); + self.index + .packages() + .done(package_name, Arc::new(version_map)); + } + Some(Response::Installed { dist, metadata }) => { + trace!("Received installed distribution metadata for: {dist}"); + self.index.distributions().done( + dist.version_id(), + Arc::new(MetadataResponse::Found(ArchiveMetadata::from(metadata))), + ); + } + Some(Response::Dist { + dist: Dist::Built(dist), + metadata, + }) => { + trace!("Received built distribution metadata for: {dist}"); + match &metadata { + MetadataResponse::InvalidMetadata(err) => { + warn!("Unable to extract metadata for {dist}: {err}"); + } + MetadataResponse::InvalidStructure(err) => { + warn!("Unable to extract metadata for {dist}: {err}"); + } + _ => {} + } + self.index + .distributions() + .done(dist.version_id(), Arc::new(metadata)); + } + Some(Response::Dist { + dist: Dist::Source(dist), + metadata, + }) => { + trace!("Received source distribution metadata for: {dist}"); + match &metadata { + MetadataResponse::InvalidMetadata(err) => { + warn!("Unable to extract metadata for {dist}: {err}"); + } + MetadataResponse::InvalidStructure(err) => { + warn!("Unable to extract metadata for {dist}: {err}"); + } + _ => {} + } + self.index + .distributions() + .done(dist.version_id(), Arc::new(metadata)); + } + None => {} + } + } + + Ok::<(), ResolveError>(()) + } + + #[instrument(skip_all, fields(%request))] + async fn process_request( + &self, + request: Request, + provider: &Provider, + ) -> Result, ResolveError> { + match request { + // Fetch package metadata from the registry. + Request::Package(package_name) => { + let package_versions = provider + .get_package_versions(&package_name) + .boxed_local() + .await + .map_err(ResolveError::Client)?; + + Ok(Some(Response::Package(package_name, package_versions))) + } + + // Fetch distribution metadata from the distribution database. + Request::Dist(dist) => { + let metadata = provider + .get_or_build_wheel_metadata(&dist) + .boxed_local() + .await + .map_err(|err| match dist.clone() { + Dist::Built(built_dist @ BuiltDist::Path(_)) => { + ResolveError::Read(Box::new(built_dist), err) + } + Dist::Source(source_dist @ SourceDist::Path(_)) => { + ResolveError::Build(Box::new(source_dist), err) + } + Dist::Source(source_dist @ SourceDist::Directory(_)) => { + ResolveError::Build(Box::new(source_dist), err) + } + Dist::Built(built_dist) => ResolveError::Fetch(Box::new(built_dist), err), + Dist::Source(source_dist) => { + ResolveError::FetchAndBuild(Box::new(source_dist), err) + } + })?; + Ok(Some(Response::Dist { dist, metadata })) + } + + Request::Installed(dist) => { + let metadata = dist + .metadata() + .map_err(|err| ResolveError::ReadInstalled(Box::new(dist.clone()), err))?; + Ok(Some(Response::Installed { dist, metadata })) + } + + // Pre-fetch the package and distribution metadata. 
+ Request::Prefetch(package_name, range) => { + // Wait for the package metadata to become available. + let versions_response = self + .index + .packages() + .wait(&package_name) + .await + .ok_or(ResolveError::Unregistered)?; + + let version_map = match *versions_response { + VersionsResponse::Found(ref version_map) => version_map, + // Short-circuit if we did not find any versions for the package + VersionsResponse::NoIndex => { + self.unavailable_packages + .insert(package_name.clone(), UnavailablePackage::NoIndex); + + return Ok(None); + } + VersionsResponse::Offline => { + self.unavailable_packages + .insert(package_name.clone(), UnavailablePackage::Offline); + + return Ok(None); + } + VersionsResponse::NotFound => { + self.unavailable_packages + .insert(package_name.clone(), UnavailablePackage::NotFound); + + return Ok(None); + } + }; + + // Try to find a compatible version. If there aren't any compatible versions, + // short-circuit. + let Some(candidate) = self.selector.select( + &package_name, + &range, + version_map, + &self.preferences, + &self.installed_packages, + &self.exclusions, + ) else { + return Ok(None); + }; + + // If there is not a compatible distribution, short-circuit. + let Some(dist) = candidate.compatible() else { + return Ok(None); + }; + + // Emit a request to fetch the metadata for this version. + if self.index.distributions().register(candidate.version_id()) { + let dist = dist.for_resolution().to_owned(); + + let response = match dist { + ResolvedDist::Installable(dist) => { + let metadata = provider + .get_or_build_wheel_metadata(&dist) + .boxed_local() + .await + .map_err(|err| match dist.clone() { + Dist::Built(built_dist @ BuiltDist::Path(_)) => { + ResolveError::Read(Box::new(built_dist), err) + } + Dist::Source(source_dist @ SourceDist::Path(_)) => { + ResolveError::Build(Box::new(source_dist), err) + } + Dist::Source(source_dist @ SourceDist::Directory(_)) => { + ResolveError::Build(Box::new(source_dist), err) + } + Dist::Built(built_dist) => { + ResolveError::Fetch(Box::new(built_dist), err) + } + Dist::Source(source_dist) => { + ResolveError::FetchAndBuild(Box::new(source_dist), err) + } + })?; + Response::Dist { dist, metadata } + } + ResolvedDist::Installed(dist) => { + let metadata = dist.metadata().map_err(|err| { + ResolveError::ReadInstalled(Box::new(dist.clone()), err) + })?; + Response::Installed { dist, metadata } + } + }; + + Ok(Some(response)) + } else { + Ok(None) + } + } + } + } + fn on_progress(&self, package: &PubGrubPackage, version: &Version) { if let Some(reporter) = self.reporter.as_ref() { match package { @@ -1445,6 +1394,12 @@ impl Solver { } } } + + fn on_complete(&self) { + if let Some(reporter) = self.reporter.as_ref() { + reporter.on_complete(); + } + } } /// State that is used during unit propagation in the resolver. 
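For orientation, the shape this commit lands on: the PubGrub solver runs on a plain OS thread, pushes metadata requests to the async fetcher over an mpsc channel, and hands its final result back over a oneshot channel, with the shared resolver state behind an `Arc`. The following is a minimal, self-contained sketch of that layout; the names (`solve`, `Error`, the `String` request type) are placeholders rather than uv's actual API.

use std::thread;

use tokio::sync::{mpsc, oneshot};

#[derive(Debug)]
struct Error(String);

// Stand-in for `ResolverState::solve`: runs on its own (blocking) thread and
// asks the async side for work through the request channel.
fn solve(request_sink: mpsc::UnboundedSender<String>) -> Result<String, Error> {
    request_sink
        .send("some-package".to_string())
        .map_err(|err| Error(err.to_string()))?;
    Ok("resolution".to_string())
}

#[tokio::main]
async fn main() -> Result<(), Error> {
    let (request_sink, mut request_stream) = mpsc::unbounded_channel::<String>();
    let (tx, rx) = oneshot::channel();

    // Spawn the solver on a dedicated thread; it reports back over `tx`.
    thread::Builder::new()
        .name("solver".into())
        .spawn(move || {
            let result = solve(request_sink);
            let _ = tx.send(result); // the receiver may already be gone
        })
        .expect("failed to spawn solver thread");

    // Stand-in for `fetch`: service requests until the solver drops its sender.
    let requests_fut = async {
        while let Some(request) = request_stream.recv().await {
            println!("fetching metadata for {request}");
        }
        Ok::<(), Error>(())
    };

    // Stand-in for `resolve_fut`: surface both channel and solver errors.
    let resolve_fut = async {
        match rx.await {
            Ok(result) => result,
            Err(err) => Err(Error(err.to_string())),
        }
    };

    // Drive the fetcher and wait for the solver, as with `tokio::try_join!` above.
    let ((), resolution) = tokio::try_join!(requests_fut, resolve_fut)?;
    println!("resolved: {resolution}");
    Ok(())
}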
diff --git a/crates/uv-types/src/traits.rs b/crates/uv-types/src/traits.rs index 0940d47d2ff5..e71d9fb05818 100644 --- a/crates/uv-types/src/traits.rs +++ b/crates/uv-types/src/traits.rs @@ -128,7 +128,7 @@ pub trait SourceBuildTrait { } /// A wrapper for [`uv_installer::SitePackages`] -pub trait InstalledPackagesProvider: Clone + Send + 'static { +pub trait InstalledPackagesProvider: Clone + Send + Sync + 'static { fn iter(&self) -> impl Iterator; fn get_packages(&self, name: &PackageName) -> Vec<&InstalledDist>; } From 93456000f62089a620faa3f18aafe3754c3687ed Mon Sep 17 00:00:00 2001 From: Ibraheem Ahmed Date: Thu, 16 May 2024 15:00:18 -0400 Subject: [PATCH 07/11] revert to bounded channels --- .../src/resolver/batch_prefetch.rs | 6 ++-- crates/uv-resolver/src/resolver/mod.rs | 28 +++++++++---------- 2 files changed, 17 insertions(+), 17 deletions(-) diff --git a/crates/uv-resolver/src/resolver/batch_prefetch.rs b/crates/uv-resolver/src/resolver/batch_prefetch.rs index d4fce8b6c273..311fa3b54952 100644 --- a/crates/uv-resolver/src/resolver/batch_prefetch.rs +++ b/crates/uv-resolver/src/resolver/batch_prefetch.rs @@ -3,7 +3,7 @@ use std::cmp::min; use itertools::Itertools; use pubgrub::range::Range; use rustc_hash::FxHashMap; -use tokio::sync::mpsc::UnboundedSender; +use tokio::sync::mpsc::Sender; use tracing::{debug, trace}; use distribution_types::DistributionMetadata; @@ -49,7 +49,7 @@ impl BatchPrefetcher { next: &PubGrubPackage, version: &Version, current_range: &Range, - request_sink: &UnboundedSender, + request_sink: &Sender, index: &InMemoryIndex, selector: &CandidateSelector, ) -> anyhow::Result<(), ResolveError> { @@ -144,7 +144,7 @@ impl BatchPrefetcher { if index.distributions().register(candidate.version_id()) { let request = Request::from(dist); - request_sink.send(request)?; + request_sink.blocking_send(request)?; } } diff --git a/crates/uv-resolver/src/resolver/mod.rs b/crates/uv-resolver/src/resolver/mod.rs index 5921f3d7bd19..5b8533266bf6 100644 --- a/crates/uv-resolver/src/resolver/mod.rs +++ b/crates/uv-resolver/src/resolver/mod.rs @@ -14,9 +14,9 @@ use pubgrub::error::PubGrubError; use pubgrub::range::Range; use pubgrub::solver::{Incompatibility, State}; use rustc_hash::{FxHashMap, FxHashSet}; -use tokio::sync::mpsc::{self, UnboundedReceiver, UnboundedSender}; +use tokio::sync::mpsc::{self, Receiver, Sender}; use tokio::sync::oneshot; -use tokio_stream::wrappers::UnboundedReceiverStream; +use tokio_stream::wrappers::ReceiverStream; use tracing::{debug, enabled, instrument, trace, warn, Level}; use distribution_types::{ @@ -340,7 +340,7 @@ impl // A channel to fetch package metadata (e.g., given `flask`, fetch all versions) and version // metadata (e.g., given `flask==1.0.0`, fetch the metadata for that version). // Channel size is set large to accommodate batch prefetching. - let (request_sink, request_stream) = mpsc::unbounded_channel(); + let (request_sink, request_stream) = mpsc::channel(300); // Run the fetcher. 
let requests_fut = state.clone().fetch(provider.clone(), request_stream).fuse(); @@ -396,7 +396,7 @@ impl Shared { #[instrument(skip_all)] fn solve( self: Arc, - request_sink: UnboundedSender, + request_sink: Sender, ) -> Result { let root = PubGrubPackage::Root(self.project.clone()); let mut prefetcher = BatchPrefetcher::default(); @@ -627,7 +627,7 @@ impl Shared { fn visit_package( &self, package: &PubGrubPackage, - request_sink: &UnboundedSender, + request_sink: &Sender, ) -> Result<(), ResolveError> { match package { PubGrubPackage::Root(_) => {} @@ -641,7 +641,7 @@ impl Shared { // Emit a request to fetch the metadata for this package. if self.index.packages().register(name.clone()) { - request_sink.send(Request::Package(name.clone()))?; + request_sink.blocking_send(Request::Package(name.clone()))?; } } PubGrubPackage::Package(name, _extra, Some(url)) => { @@ -658,7 +658,7 @@ impl Shared { // Emit a request to fetch the metadata for this distribution. let dist = Dist::from_url(name.clone(), url.clone())?; if self.index.distributions().register(dist.version_id()) { - request_sink.send(Request::Dist(dist))?; + request_sink.blocking_send(Request::Dist(dist))?; } } } @@ -669,7 +669,7 @@ impl Shared { /// metadata for all of the packages in parallel. fn pre_visit<'data>( packages: impl Iterator)>, - request_sink: &UnboundedSender, + request_sink: &Sender, ) -> Result<(), ResolveError> { // Iterate over the potential packages, and fetch file metadata for any of them. These // represent our current best guesses for the versions that we _might_ select. @@ -677,7 +677,7 @@ impl Shared { let PubGrubPackage::Package(package_name, None, None) = package else { continue; }; - request_sink.send(Request::Prefetch(package_name.clone(), range.clone()))?; + request_sink.blocking_send(Request::Prefetch(package_name.clone(), range.clone()))?; } Ok(()) } @@ -692,7 +692,7 @@ impl Shared { package: &PubGrubPackage, range: &Range, pins: &mut FilePins, - request_sink: &UnboundedSender, + request_sink: &Sender, ) -> Result, ResolveError> { match package { PubGrubPackage::Root(_) => Ok(Some(ResolverVersion::Available(MIN_VERSION.clone()))), @@ -890,7 +890,7 @@ impl Shared { if matches!(package, PubGrubPackage::Package(_, _, _)) { if self.index.distributions().register(candidate.version_id()) { let request = Request::from(dist.for_resolution()); - request_sink.send(request)?; + request_sink.blocking_send(request)?; } } @@ -906,7 +906,7 @@ impl Shared { package: &PubGrubPackage, version: &Version, priorities: &mut PubGrubPriorities, - request_sink: &UnboundedSender, + request_sink: &Sender, ) -> Result { match package { PubGrubPackage::Root(_) => { @@ -1167,9 +1167,9 @@ impl Shared { async fn fetch( self: Arc, provider: Arc, - request_stream: UnboundedReceiver, + request_stream: Receiver, ) -> Result<(), ResolveError> { - let mut response_stream = UnboundedReceiverStream::new(request_stream) + let mut response_stream = ReceiverStream::new(request_stream) .map(|request| self.process_request(request, &*provider).boxed_local()) // Allow as many futures as possible to start in the background. 
// Backpressure is provided by at a more granular level by `DistributionDatabase` From 362e189de605b90f464793af5507b34bcdea9597 Mon Sep 17 00:00:00 2001 From: Ibraheem Ahmed Date: Thu, 16 May 2024 15:04:45 -0400 Subject: [PATCH 08/11] rename and document resolver states --- crates/uv-resolver/src/error.rs | 12 ++++---- crates/uv-resolver/src/resolver/mod.rs | 41 ++++++++++---------------- 2 files changed, 20 insertions(+), 33 deletions(-) diff --git a/crates/uv-resolver/src/error.rs b/crates/uv-resolver/src/error.rs index e16584a28b53..5132d61e2d40 100644 --- a/crates/uv-resolver/src/error.rs +++ b/crates/uv-resolver/src/error.rs @@ -8,6 +8,7 @@ use pubgrub::range::Range; use pubgrub::report::{DefaultStringReporter, DerivationTree, External, Reporter}; use rustc_hash::FxHashMap; +use dashmap::{DashMap, DashSet}; use distribution_types::{BuiltDist, IndexLocations, InstalledDist, ParsedUrlError, SourceDist}; use once_map::OnceMap; use pep440_rs::Version; @@ -18,10 +19,7 @@ use crate::candidate_selector::CandidateSelector; use crate::dependency_provider::UvDependencyProvider; use crate::pubgrub::{PubGrubPackage, PubGrubPython, PubGrubReportFormatter}; use crate::python_requirement::PythonRequirement; -use crate::resolver::{ - IncompletePackage, SharedMap, SharedSet, UnavailablePackage, UnavailableReason, - VersionsResponse, -}; +use crate::resolver::{IncompletePackage, UnavailablePackage, UnavailableReason, VersionsResponse}; #[derive(Debug, thiserror::Error)] pub enum ResolveError { @@ -237,7 +235,7 @@ impl NoSolutionError { pub(crate) fn with_available_versions( mut self, python_requirement: &PythonRequirement, - visited: &SharedSet, + visited: &DashSet, package_versions: &OnceMap>, ) -> Self { let mut available_versions = IndexMap::default(); @@ -301,7 +299,7 @@ impl NoSolutionError { #[must_use] pub(crate) fn with_unavailable_packages( mut self, - unavailable_packages: &SharedMap, + unavailable_packages: &DashMap, ) -> Self { let mut new = FxHashMap::default(); for package in self.derivation_tree.packages() { @@ -319,7 +317,7 @@ impl NoSolutionError { #[must_use] pub(crate) fn with_incomplete_packages( mut self, - incomplete_packages: &SharedMap>, + incomplete_packages: &DashMap>, ) -> Self { let mut new = FxHashMap::default(); for package in self.derivation_tree.packages() { diff --git a/crates/uv-resolver/src/resolver/mod.rs b/crates/uv-resolver/src/resolver/mod.rs index 5b8533266bf6..89d7f2c56685 100644 --- a/crates/uv-resolver/src/resolver/mod.rs +++ b/crates/uv-resolver/src/resolver/mod.rs @@ -173,25 +173,14 @@ enum ResolverVersion { Unavailable(Version, UnavailableVersion), } -pub(crate) type SharedMap = Arc>; -pub(crate) type SharedSet = Arc>; - pub struct Resolver { - state: Shared, + state: ResolverState, provider: Provider, } -impl Deref - for Resolver -{ - type Target = Shared; - - fn deref(&self) -> &Self::Target { - &self.state - } -} - -pub struct Shared { +/// State that is shared between the prefetcher and the PubGrub solver during +/// resolution. +struct ResolverState { project: Option, requirements: Vec, constraints: Constraints, @@ -210,11 +199,11 @@ pub struct Shared { index: InMemoryIndex, installed_packages: InstalledPackages, /// Incompatibilities for packages that are entirely unavailable. - unavailable_packages: SharedMap, + unavailable_packages: DashMap, /// Incompatibilities for packages that are unavailable at specific versions. 
- incomplete_packages: SharedMap>, + incomplete_packages: DashMap>, /// The set of all registry-based packages visited during resolution. - visited: SharedSet, + visited: DashSet, reporter: Option>, } @@ -293,11 +282,11 @@ impl provider: Provider, installed_packages: InstalledPackages, ) -> Result { - let state = Shared { + let state = ResolverState { index: index.clone(), - unavailable_packages: SharedMap::default(), - incomplete_packages: SharedMap::default(), - visited: SharedSet::default(), + unavailable_packages: DashMap::default(), + incomplete_packages: DashMap::default(), + visited: DashSet::default(), selector: CandidateSelector::for_resolution(options, &manifest, markers), dependency_mode: options.dependency_mode, urls: Urls::from_manifest(&manifest, markers, options.dependency_mode)?, @@ -324,7 +313,7 @@ impl let reporter = Arc::new(reporter); Self { - state: Shared { + state: ResolverState { reporter: Some(reporter.clone()), ..self.state }, @@ -391,7 +380,7 @@ impl } } -impl Shared { +impl ResolverState { /// Run the PubGrub solver. #[instrument(skip_all)] fn solve( @@ -400,7 +389,7 @@ impl Shared { ) -> Result { let root = PubGrubPackage::Root(self.project.clone()); let mut prefetcher = BatchPrefetcher::default(); - let mut state = ResolverState { + let mut state = SolveState { pubgrub: State::init(root.clone(), MIN_VERSION.clone()), next: root, pins: FilePins::default(), @@ -1404,7 +1393,7 @@ impl Shared { /// State that is used during unit propagation in the resolver. #[derive(Clone)] -struct ResolverState { +struct SolveState { /// The internal state used by the resolver. /// /// Note that not all parts of this state are strictly internal. For From a4479106badaa321d77a3a5143c12e742b63535f Mon Sep 17 00:00:00 2001 From: Ibraheem Ahmed Date: Thu, 16 May 2024 15:07:02 -0400 Subject: [PATCH 09/11] fix clippy lints --- crates/uv-resolver/src/preferences.rs | 3 ++- crates/uv-resolver/src/resolver/mod.rs | 4 ++-- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/crates/uv-resolver/src/preferences.rs b/crates/uv-resolver/src/preferences.rs index e4f658b88832..5f1aa636bfa8 100644 --- a/crates/uv-resolver/src/preferences.rs +++ b/crates/uv-resolver/src/preferences.rs @@ -1,4 +1,5 @@ -use std::{str::FromStr, sync::Arc}; +use std::str::FromStr; +use std::sync::Arc; use rustc_hash::FxHashMap; use tracing::trace; diff --git a/crates/uv-resolver/src/resolver/mod.rs b/crates/uv-resolver/src/resolver/mod.rs index 89d7f2c56685..87f9f636c44d 100644 --- a/crates/uv-resolver/src/resolver/mod.rs +++ b/crates/uv-resolver/src/resolver/mod.rs @@ -304,7 +304,7 @@ impl reporter: None, installed_packages, }; - Ok(Self { provider, state }) + Ok(Self { state, provider }) } /// Set the [`Reporter`] to use for this installer. @@ -334,7 +334,7 @@ impl // Run the fetcher. let requests_fut = state.clone().fetch(provider.clone(), request_stream).fuse(); - // Spawn the PubGrub solver task. + // Spawn the PubGrub solver on a dedicated thread. 
let solver = state.clone(); let (tx, rx) = oneshot::channel(); thread::Builder::new() From d1d0f299244b10ec1a3efb81b3329aa5de3a8abc Mon Sep 17 00:00:00 2001 From: Ibraheem Ahmed Date: Fri, 17 May 2024 11:30:21 -0400 Subject: [PATCH 10/11] update docs --- crates/uv-resolver/src/resolver/index.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/crates/uv-resolver/src/resolver/index.rs b/crates/uv-resolver/src/resolver/index.rs index 0f91cc68df69..fedcbcffe1d5 100644 --- a/crates/uv-resolver/src/resolver/index.rs +++ b/crates/uv-resolver/src/resolver/index.rs @@ -14,19 +14,19 @@ pub struct InMemoryIndex(Arc); struct SharedInMemoryIndex { /// A map from package name to the metadata for that package and the index where the metadata /// came from. - pub(crate) packages: OnceMap>, + packages: OnceMap>, /// A map from package ID to metadata for that distribution. - pub(crate) distributions: OnceMap>, + distributions: OnceMap>, } impl InMemoryIndex { - /// Insert a [`VersionsResponse`] into the index. + /// Returns a reference to the package metadata map. pub fn packages(&self) -> &OnceMap> { &self.0.packages } - /// Insert a [`Metadata23`] into the index. + /// Returns a reference to the distribution metadata map. pub fn distributions(&self) -> &OnceMap> { &self.0.distributions } From 4f2abc471363871a9468c3755daca7f562749552 Mon Sep 17 00:00:00 2001 From: Ibraheem Ahmed Date: Fri, 17 May 2024 11:39:49 -0400 Subject: [PATCH 11/11] rename `SharedPythonEnvironment` to `PythonEnvironmentShared` --- crates/uv-interpreter/src/environment.rs | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/crates/uv-interpreter/src/environment.rs b/crates/uv-interpreter/src/environment.rs index 6f5d655b7ec2..69df98e39154 100644 --- a/crates/uv-interpreter/src/environment.rs +++ b/crates/uv-interpreter/src/environment.rs @@ -13,10 +13,10 @@ use crate::{find_default_python, find_requested_python, Error, Interpreter, Targ /// A Python environment, consisting of a Python [`Interpreter`] and its associated paths. #[derive(Debug, Clone)] -pub struct PythonEnvironment(Arc); +pub struct PythonEnvironment(Arc); #[derive(Debug, Clone)] -struct SharedPythonEnvironment { +struct PythonEnvironmentShared { root: PathBuf, interpreter: Interpreter, } @@ -50,7 +50,7 @@ impl PythonEnvironment { interpreter.base_prefix().display() ); - Ok(Self(Arc::new(SharedPythonEnvironment { + Ok(Self(Arc::new(PythonEnvironmentShared { root: venv, interpreter, }))) @@ -61,7 +61,7 @@ impl PythonEnvironment { let Some(interpreter) = find_requested_python(python, cache)? else { return Err(Error::RequestedPythonNotFound(python.to_string())); }; - Ok(Self(Arc::new(SharedPythonEnvironment { + Ok(Self(Arc::new(PythonEnvironmentShared { root: interpreter.prefix().to_path_buf(), interpreter, }))) @@ -70,7 +70,7 @@ impl PythonEnvironment { /// Create a [`PythonEnvironment`] for the default Python interpreter. pub fn from_default_python(cache: &Cache) -> Result { let interpreter = find_default_python(cache)?; - Ok(Self(Arc::new(SharedPythonEnvironment { + Ok(Self(Arc::new(PythonEnvironmentShared { root: interpreter.prefix().to_path_buf(), interpreter, }))) @@ -78,7 +78,7 @@ impl PythonEnvironment { /// Create a [`PythonEnvironment`] from an existing [`Interpreter`] and root directory. 
pub fn from_interpreter(interpreter: Interpreter) -> Self { - Self(Arc::new(SharedPythonEnvironment { + Self(Arc::new(PythonEnvironmentShared { root: interpreter.prefix().to_path_buf(), interpreter, })) @@ -88,7 +88,7 @@ impl PythonEnvironment { #[must_use] pub fn with_target(self, target: Target) -> Self { let inner = Arc::unwrap_or_clone(self.0); - Self(Arc::new(SharedPythonEnvironment { + Self(Arc::new(PythonEnvironmentShared { interpreter: inner.interpreter.with_target(target), ..inner }))
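
Below is a minimal, self-contained sketch of the threading pattern this series adopts; it is not code from the patches themselves. The names (`solve`, `fetch`, `Request`, the "uv-resolver" thread name, the channel capacity) are illustrative assumptions. It shows a synchronous solver running on a dedicated OS thread, feeding a bounded `tokio::sync::mpsc` channel via `blocking_send`, while an async task drains the channel and a `oneshot` channel carries the final result back, mirroring the wiring in `crates/uv-resolver/src/resolver/mod.rs`.

use std::thread;

use tokio::sync::{mpsc, oneshot};

#[derive(Debug)]
enum Request {
    Package(String),
}

// Runs on a dedicated thread: a CPU-bound, synchronous solve loop that emits
// fetch requests over a bounded channel. `blocking_send` is safe here because
// this thread is not driving the async runtime.
fn solve(request_sink: mpsc::Sender<Request>) -> Result<String, String> {
    for name in ["anyio", "idna", "sniffio"] {
        request_sink
            .blocking_send(Request::Package(name.to_string()))
            .map_err(|err| err.to_string())?;
    }
    Ok("resolution".to_string())
}

// Runs on the async runtime: drains the request channel and performs the I/O
// (network fetches, in the real resolver) concurrently with the solver thread.
// Ends when the solver drops its sender.
async fn fetch(mut requests: mpsc::Receiver<Request>) {
    while let Some(request) = requests.recv().await {
        println!("fetching metadata for {request:?}");
    }
}

#[tokio::main]
async fn main() {
    let (request_sink, request_stream) = mpsc::channel(32);
    let (tx, rx) = oneshot::channel();

    // Spawn the solver on its own OS thread and hand the result back through
    // a oneshot channel, as in the `thread::Builder` + `oneshot` wiring above.
    thread::Builder::new()
        .name("uv-resolver".into())
        .spawn(move || {
            let _ = tx.send(solve(request_sink));
        })
        .expect("failed to spawn solver thread");

    // Drive the fetcher and wait for the solver concurrently.
    let ((), result) = tokio::join!(fetch(request_stream), async {
        rx.await.expect("solver thread dropped the result sender")
    });
    println!("solved: {result:?}");
}

The bounded channel is what distinguishes this from the earlier `UnboundedSender` approach: if the fetcher falls behind, `blocking_send` parks the solver thread instead of letting the request queue grow without bound, so backpressure now exists between the solver and the fetcher in addition to the more granular backpressure inside `DistributionDatabase`.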