From 3f5e7306a5df0b6161dab3bb81bce9407dc53995 Mon Sep 17 00:00:00 2001 From: Charlie Marsh Date: Tue, 30 Jan 2024 12:25:22 -0800 Subject: [PATCH] Remove `WaitMap` dependency (#1183) ## Summary This is an attempt to https://github.com/astral-sh/puffin/pull/1163 by removing the `WaitMap` and gaining more granular control over the values that we hold over `await` boundaries. --- Cargo.lock | 117 ++++--------------- Cargo.toml | 1 - crates/once-map/Cargo.toml | 4 +- crates/once-map/src/lib.rs | 92 +++++++-------- crates/puffin-dispatch/Cargo.toml | 1 - crates/puffin-installer/Cargo.toml | 1 - crates/puffin-installer/src/downloader.rs | 23 ++-- crates/puffin-resolver/Cargo.toml | 1 - crates/puffin-resolver/src/error.rs | 7 +- crates/puffin-resolver/src/resolution.rs | 12 +- crates/puffin-resolver/src/resolver/index.rs | 10 -- crates/puffin-resolver/src/resolver/mod.rs | 78 +++++++------ crates/puffin/Cargo.toml | 1 - 13 files changed, 130 insertions(+), 218 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 50a00af47560..843c63e246f4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -17,15 +17,6 @@ version = "1.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f26201604c87b1e01bd3d98f8d5d9a8fcbb815e8cedb41ffccbeb4bf593a35fe" -[[package]] -name = "ahash" -version = "0.3.8" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e8fd72866655d1904d6b0997d0b07ba561047d070fbe29de039031c641b61217" -dependencies = [ - "const-random", -] - [[package]] name = "ahash" version = "0.7.7" @@ -251,7 +242,7 @@ checksum = "2089b7e3f35b9dd2d0ed921ead4f6d318c27680d4a5bd167b3ee120edb105837" dependencies = [ "addr2line", "cc", - "cfg-if 1.0.0", + "cfg-if", "libc", "miniz_oxide", "object", @@ -464,12 +455,6 @@ dependencies = [ "libc", ] -[[package]] -name = "cfg-if" -version = "0.1.10" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4785bdd1c96b2a846b2bd7cc02e86b6b3dbf14e7e53446c4f54c92a361040822" - [[package]] name = "cfg-if" version = "1.0.0" @@ -602,26 +587,6 @@ dependencies = [ "windows-sys 0.52.0", ] -[[package]] -name = "const-random" -version = "0.1.17" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5aaf16c9c2c612020bcfd042e170f6e32de9b9d75adb5277cdbbd2e2c8c8299a" -dependencies = [ - "const-random-macro", -] - -[[package]] -name = "const-random-macro" -version = "0.1.16" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f9d839f2a20b0aee515dc581a6172f2321f96cab76c1a38a4c584a194955390e" -dependencies = [ - "getrandom", - "once_cell", - "tiny-keccak", -] - [[package]] name = "core-foundation" version = "0.9.4" @@ -653,7 +618,7 @@ version = "1.3.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b540bd8bc810d3885c6ea91e2018302f68baba2129ab3e88f32389ee9370880d" dependencies = [ - "cfg-if 1.0.0", + "cfg-if", ] [[package]] @@ -752,24 +717,13 @@ dependencies = [ "memchr", ] -[[package]] -name = "dashmap" -version = "3.11.10" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0f260e2fc850179ef410018660006951c1b55b79e8087e87111a2c388994b9b5" -dependencies = [ - "ahash 0.3.8", - "cfg-if 0.1.10", - "num_cpus", -] - [[package]] name = "dashmap" version = "5.5.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "978747c1d849a7d2ee5e8adc0159961c48fb7e5db2f06af6723b80123bb53856" dependencies = [ - "cfg-if 1.0.0", + "cfg-if", "hashbrown 0.14.3", "lock_api", "once_cell", @@ -910,7 +864,7 @@ version = "0.8.33" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7268b386296a025e474d5140678f75d6de9493ae55a5d709eeb9dd08149945e1" dependencies = [ - "cfg-if 1.0.0", + "cfg-if", ] [[package]] @@ -941,7 +895,7 @@ version = "0.2.23" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1ee447700ac8aa0b2f2bd7bc4462ad686ba06baa6727ac149a2d6277f0d240fd" dependencies = [ - "cfg-if 1.0.0", + "cfg-if", "libc", "redox_syscall 0.4.1", "windows-sys 0.52.0", @@ -1132,7 +1086,7 @@ version = "0.2.12" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "190092ea657667030ac6a35e305e62fc4dd69fd98ac98631e5d3a2b1575a12b5" dependencies = [ - "cfg-if 1.0.0", + "cfg-if", "js-sys", "libc", "wasi", @@ -1247,7 +1201,7 @@ version = "2.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bc52e53916c08643f1b56ec082790d1e86a32e58dc5268f897f313fbae7b4872" dependencies = [ - "cfg-if 1.0.0", + "cfg-if", "crunchy", ] @@ -1257,7 +1211,7 @@ version = "0.12.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8a9ee70c43aaf417c914396645a0fa852624801b24ebb7ae78fe8272889ac888" dependencies = [ - "ahash 0.7.7", + "ahash", ] [[package]] @@ -1573,7 +1527,7 @@ version = "0.1.12" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7a5bbe824c507c5da5956355e86a746d82e0e1464f65d862cc5e71da70e94b2c" dependencies = [ - "cfg-if 1.0.0", + "cfg-if", "js-sys", "wasm-bindgen", "web-sys", @@ -1974,9 +1928,9 @@ dependencies = [ name = "once-map" version = "0.0.1" dependencies = [ - "rustc-hash", + "dashmap", "thiserror", - "waitmap", + "tokio", ] [[package]] @@ -2076,7 +2030,7 @@ version = "0.8.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "60a2cfe6f0ad2bfc16aefa463b497d5c7a5ecd44a23efa72aa342d90177356dc" dependencies = [ - "cfg-if 1.0.0", + "cfg-if", "instant", "libc", "redox_syscall 0.2.16", @@ -2090,7 +2044,7 @@ version = "0.9.9" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4c42a9226546d68acdd9c0a280d17ce19bfe27a46bf68784e4066115788d008e" dependencies = [ - "cfg-if 1.0.0", + "cfg-if", "libc", "redox_syscall 0.4.1", "smallvec", @@ -2414,7 +2368,6 @@ dependencies = [ "tracing-subscriber", "tracing-tree", "url", - "waitmap", "which", ] @@ -2581,7 +2534,6 @@ dependencies = [ "tempfile", "tokio", "tracing", - "waitmap", ] [[package]] @@ -2708,7 +2660,6 @@ dependencies = [ "tokio", "tracing", "url", - "waitmap", ] [[package]] @@ -2757,7 +2708,7 @@ dependencies = [ "cache-key", "chrono", "clap", - "dashmap 5.5.3", + "dashmap", "derivative", "distribution-filename", "distribution-types", @@ -2797,7 +2748,6 @@ dependencies = [ "tokio-util", "tracing", "url", - "waitmap", "zip", ] @@ -2845,7 +2795,7 @@ version = "0.20.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9a89dc7a5850d0e983be1ec2a463a171d20990487c3cfcd68b5363f1ee3d6fe0" dependencies = [ - "cfg-if 1.0.0", + "cfg-if", "indoc", "libc", "memoffset", @@ -3071,7 +3021,7 @@ version = "0.1.14" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "767be24c0da52e7448d495b8d162506a9aa125426651d547d545d6c2b4b65b62" dependencies = [ - "cfg-if 1.0.0", + "cfg-if", "rustix", "windows", ] @@ -3490,7 +3440,7 @@ version = "0.10.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e3bf829a2d51ab4a5ddf1352d8470c140cadc8301b2ae1789db023f01cedd6ba" dependencies = [ - "cfg-if 1.0.0", + "cfg-if", "cpufeatures", "digest", ] @@ -3501,7 +3451,7 @@ version = "0.10.8" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "793db75ad2bcafc3ffa7c68b215fee268f537982cd901d132f89c6343f3a3dc8" dependencies = [ - "cfg-if 1.0.0", + "cfg-if", "cpufeatures", "digest", ] @@ -3706,7 +3656,7 @@ version = "3.9.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "01ce4141aa927a6d1bd34a041795abd0db1cccba5d5f24b009f694bdf3a1f3fa" dependencies = [ - "cfg-if 1.0.0", + "cfg-if", "fastrand", "redox_syscall 0.4.1", "rustix", @@ -3744,7 +3694,7 @@ version = "3.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "adcb7fd841cd518e279be3d5a3eb0636409487998a4aff22f3de87b81e88384f" dependencies = [ - "cfg-if 1.0.0", + "cfg-if", "proc-macro2", "quote", "syn 2.0.48", @@ -3808,7 +3758,7 @@ version = "1.1.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3fdd6f064ccff2d6567adcb3873ca630700f00b5ad3f060c25b5dcfd9a4ce152" dependencies = [ - "cfg-if 1.0.0", + "cfg-if", "once_cell", ] @@ -3861,15 +3811,6 @@ dependencies = [ "time-core", ] -[[package]] -name = "tiny-keccak" -version = "2.0.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2c9d3793400a45f954c52e73d068316d76b6f4e36977e3fcebb13a2721e80237" -dependencies = [ - "crunchy", -] - [[package]] name = "tinytemplate" version = "1.2.1" @@ -4288,16 +4229,6 @@ dependencies = [ "libc", ] -[[package]] -name = "waitmap" -version = "1.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "28491611b6b9a0b9f027be139a4be792b13a20780100dd8b054d44dbf596d52b" -dependencies = [ - "dashmap 3.11.10", - "smallvec", -] - [[package]] name = "walkdir" version = "2.4.0" @@ -4329,7 +4260,7 @@ version = "0.2.90" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b1223296a201415c7fad14792dbefaace9bd52b62d33453ade1c5b5f07555406" dependencies = [ - "cfg-if 1.0.0", + "cfg-if", "wasm-bindgen-macro", ] @@ -4354,7 +4285,7 @@ version = "0.4.40" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bde2032aeb86bdfaecc8b261eef3cba735cc426c1f3a3416d1e0791be95fc461" dependencies = [ - "cfg-if 1.0.0", + "cfg-if", "js-sys", "wasm-bindgen", "web-sys", @@ -4643,7 +4574,7 @@ version = "0.50.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "524e57b2c537c0f9b1e69f1965311ec12182b4122e45035b1508cd24d2adadb1" dependencies = [ - "cfg-if 1.0.0", + "cfg-if", "windows-sys 0.48.0", ] diff --git a/Cargo.toml b/Cargo.toml index 19b014dc04ff..79dbd4150ae5 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -104,7 +104,6 @@ unicode-width = { version = "0.1.11" } unscanny = { version = "0.1.0" } url = { version = "2.5.0" } uuid = { version = "1.7.0", default-features = false } -waitmap = { version = "1.1.0" } walkdir = { version = "2.4.0" } which = { version = "6.0.0" } zip = { version = "0.6.6", default-features = false, features = ["deflate"] } diff --git a/crates/once-map/Cargo.toml b/crates/once-map/Cargo.toml index 654e70e871fc..71e5d99006af 100644 --- a/crates/once-map/Cargo.toml +++ b/crates/once-map/Cargo.toml @@ -13,6 +13,6 @@ license = { workspace = true } workspace = true [dependencies] -rustc-hash = { workspace = true } +dashmap = { workspace = true } thiserror = { workspace = true } -waitmap = { workspace = true } +tokio = { workspace = true } diff --git a/crates/once-map/src/lib.rs b/crates/once-map/src/lib.rs index fbc78e6d6839..0ac543981309 100644 --- a/crates/once-map/src/lib.rs +++ b/crates/once-map/src/lib.rs @@ -1,11 +1,9 @@ use std::borrow::Borrow; -use std::collections::hash_map::RandomState; - use std::hash::Hash; -use std::sync::Mutex; +use std::sync::Arc; -use rustc_hash::FxHashSet; -use waitmap::{Ref, WaitMap}; +use dashmap::DashMap; +use tokio::sync::Notify; /// Run tasks only once and store the results in a parallel hash map. /// @@ -14,9 +12,7 @@ use waitmap::{Ref, WaitMap}; /// dist builds, we want to wait until the other task is done and get a reference to the same /// result. pub struct OnceMap { - /// Computations that were started, including those that were finished. - started: Mutex>, - wait_map: WaitMap, + items: DashMap>, } impl OnceMap { @@ -25,72 +21,68 @@ impl OnceMap { /// If this method returns `true`, you need to start a job and call [`OnceMap::done`] eventually /// or other tasks will hang. If it returns `false`, this job is already in progress and you /// can [`OnceMap::wait`] for the result. - pub fn register(&self, key: &Q) -> bool - where - K: Borrow, - Q: ?Sized + Hash + Eq + ToOwned, - { - let mut lock = self.started.lock().unwrap(); - if lock.contains(key) { - return false; + pub fn register(&self, key: K) -> bool { + let entry = self.items.entry(key); + match entry { + dashmap::mapref::entry::Entry::Occupied(_) => false, + dashmap::mapref::entry::Entry::Vacant(entry) => { + entry.insert(Value::Waiting(Arc::new(Notify::new()))); + true + } } - lock.insert(key.to_owned()) - } - - /// Like [`OnceMap::register`], but takes ownership of the key. - pub fn register_owned(&self, key: K) -> bool { - let mut lock = self.started.lock().unwrap(); - if lock.contains(&key) { - return false; - } - lock.insert(key) } /// Submit the result of a job you registered. pub fn done(&self, key: K, value: V) { - self.wait_map.insert(key, value); + if let Some(Value::Waiting(notify)) = self.items.insert(key, Value::Filled(Arc::new(value))) + { + notify.notify_waiters(); + } } /// Wait for the result of a job that is running. /// /// Will hang if [`OnceMap::done`] isn't called for this key. - pub async fn wait( - &self, - key: &Q, - ) -> Result, Error> - where - K: Borrow + for<'a> From<&'a Q>, - { - self.wait_map.wait(key).await.ok_or(Error::Canceled) + pub async fn wait(&self, key: &K) -> Option> { + let entry = self.items.get(key)?; + match entry.value() { + Value::Filled(value) => Some(value.clone()), + Value::Waiting(notify) => { + let notify = notify.clone(); + drop(entry); + notify.notified().await; + + let entry = self.items.get(key).expect("map is append-only"); + match entry.value() { + Value::Filled(value) => Some(value.clone()), + Value::Waiting(_) => unreachable!("notify was called"), + } + } + } } /// Return the result of a previous job, if any. - pub fn get(&self, key: &Q) -> Option> + pub fn get(&self, key: &Q) -> Option> where K: Borrow, { - self.wait_map.get(key) - } - - /// Cancel all waiting tasks. - /// - /// Warning: waiting on tasks that have been canceled will cause the map to hang. - pub fn cancel_all(&self) { - self.wait_map.cancel_all(); + let entry = self.items.get(key)?; + match entry.value() { + Value::Filled(value) => Some(value.clone()), + Value::Waiting(_) => None, + } } } impl Default for OnceMap { fn default() -> Self { Self { - started: Mutex::new(FxHashSet::default()), - wait_map: WaitMap::new(), + items: DashMap::new(), } } } -#[derive(Debug, thiserror::Error)] -pub enum Error { - #[error("The operation was canceled")] - Canceled, +enum Value { + Waiting(Arc), + Filled(Arc), } diff --git a/crates/puffin-dispatch/Cargo.toml b/crates/puffin-dispatch/Cargo.toml index 4dabdf40c918..7ea34e33165d 100644 --- a/crates/puffin-dispatch/Cargo.toml +++ b/crates/puffin-dispatch/Cargo.toml @@ -35,4 +35,3 @@ itertools = { workspace = true } tempfile = { workspace = true } tokio = { workspace = true } tracing = { workspace = true } -waitmap = { workspace = true } diff --git a/crates/puffin-installer/Cargo.toml b/crates/puffin-installer/Cargo.toml index ffe5250a89a2..9583c6a350f1 100644 --- a/crates/puffin-installer/Cargo.toml +++ b/crates/puffin-installer/Cargo.toml @@ -42,4 +42,3 @@ thiserror = { workspace = true } tokio = { workspace = true } tracing = { workspace = true } url = { workspace = true } -waitmap = { workspace = true } diff --git a/crates/puffin-installer/src/downloader.rs b/crates/puffin-installer/src/downloader.rs index 55090ea0d1b2..a17f6b120c60 100644 --- a/crates/puffin-installer/src/downloader.rs +++ b/crates/puffin-installer/src/downloader.rs @@ -29,8 +29,6 @@ pub enum Error { Editable(#[from] puffin_distribution::Error), #[error("Unzip failed in another thread: {0}")] Thread(String), - #[error(transparent)] - OnceMap(#[from] once_map::Error), } /// Download, build, and unzip a set of distributions. @@ -159,7 +157,7 @@ impl<'a, Context: BuildContext + Send + Sync> Downloader<'a, Context> { #[instrument(skip_all, fields(name = % dist, size = ? dist.size(), url = dist.file().map(| file | file.url.to_string()).unwrap_or_default()))] pub async fn get_wheel(&self, dist: Dist, in_flight: &InFlight) -> Result { let id = dist.distribution_id(); - let wheel = if in_flight.downloads.register(&id) { + if in_flight.downloads.register(id.clone()) { let download: LocalWheel = self .database .get_or_build_wheel(dist.clone()) @@ -170,24 +168,25 @@ impl<'a, Context: BuildContext + Send + Sync> Downloader<'a, Context> { match result { Ok(cached) => { in_flight.downloads.done(id, Ok(cached.clone())); - cached + Ok(cached) } Err(err) => { in_flight.downloads.done(id, Err(err.to_string())); - return Err(err); + Err(err) } } } else { - in_flight + let result = in_flight .downloads .wait(&id) - .await? - .value() - .clone() - .map_err(Error::Thread)? - }; + .await + .expect("missing value for registered task"); - Ok(wheel) + match result.as_ref() { + Ok(cached) => Ok(cached.clone()), + Err(err) => Err(Error::Thread(err.to_string())), + } + } } /// Unzip a locally-available wheel into the cache. diff --git a/crates/puffin-resolver/Cargo.toml b/crates/puffin-resolver/Cargo.toml index 59ef78a7e42c..def0233bb10f 100644 --- a/crates/puffin-resolver/Cargo.toml +++ b/crates/puffin-resolver/Cargo.toml @@ -57,7 +57,6 @@ tokio = { workspace = true, features = ["macros"] } tokio-util = { workspace = true, features = ["compat"] } tracing = { workspace = true } url = { workspace = true } -waitmap = { workspace = true } zip = { workspace = true } [dev-dependencies] diff --git a/crates/puffin-resolver/src/error.rs b/crates/puffin-resolver/src/error.rs index 32b17c6faf5b..d6f18685b3ba 100644 --- a/crates/puffin-resolver/src/error.rs +++ b/crates/puffin-resolver/src/error.rs @@ -36,8 +36,8 @@ pub enum ResolveError { #[error(transparent)] Join(#[from] tokio::task::JoinError), - #[error(transparent)] - OnceMap(#[from] once_map::Error), + #[error("Attempted to wait on an unregistered task")] + Unregistered, #[error("Package metadata name `{metadata}` does not match given name `{given}`")] NameMismatch { @@ -193,8 +193,7 @@ impl NoSolutionError { // these packages, but it's non-deterministic, and omitting them ensures that // we represent the state of the resolver at the time of failure. if visited.contains(name) { - if let Some(entry) = package_versions.get(name) { - let version_map = entry.value(); + if let Some(version_map) = package_versions.get(name) { available_versions.insert( package.clone(), version_map diff --git a/crates/puffin-resolver/src/resolution.rs b/crates/puffin-resolver/src/resolution.rs index 6f500bdf7e33..be2575f3fbde 100644 --- a/crates/puffin-resolver/src/resolution.rs +++ b/crates/puffin-resolver/src/resolution.rs @@ -68,8 +68,7 @@ impl ResolutionGraph { .clone(); // Add its hashes to the index. - if let Some(entry) = packages.get(package_name) { - let version_map = entry.value(); + if let Some(version_map) = packages.get(package_name) { hashes.insert(package_name.clone(), { let mut hashes = version_map.hashes(version); hashes.sort_unstable(); @@ -94,8 +93,7 @@ impl ResolutionGraph { }; // Add its hashes to the index. - if let Some(entry) = packages.get(package_name) { - let version_map = entry.value(); + if let Some(version_map) = packages.get(package_name) { hashes.insert(package_name.clone(), { let mut hashes = version_map.hashes(version); hashes.sort_unstable(); @@ -110,10 +108,9 @@ impl ResolutionGraph { PubGrubPackage::Package(package_name, Some(extra), None) => { // Validate that the `extra` exists. let dist = PubGrubDistribution::from_registry(package_name, version); - let entry = distributions + let metadata = distributions .get(&dist.package_id()) .expect("Every package should have metadata"); - let metadata = entry.value(); if !metadata.provides_extras.contains(extra) { let pinned_package = pins @@ -130,10 +127,9 @@ impl ResolutionGraph { PubGrubPackage::Package(package_name, Some(extra), Some(url)) => { // Validate that the `extra` exists. let dist = PubGrubDistribution::from_url(package_name, url); - let entry = distributions + let metadata = distributions .get(&dist.package_id()) .expect("Every package should have metadata"); - let metadata = entry.value(); if !metadata.provides_extras.contains(extra) { let url = redirects.get(url).map_or_else( diff --git a/crates/puffin-resolver/src/resolver/index.rs b/crates/puffin-resolver/src/resolver/index.rs index c8e58fbfd2f1..7b8d0bb5ef53 100644 --- a/crates/puffin-resolver/src/resolver/index.rs +++ b/crates/puffin-resolver/src/resolver/index.rs @@ -23,13 +23,3 @@ pub struct InMemoryIndex { /// `git+https://github.com/pallets/flask.git@c2f65dd1cfff0672b902fd5b30815f0b4137214c`. pub(crate) redirects: DashMap, } - -impl InMemoryIndex { - /// Cancel all waiting tasks. - /// - /// Warning: waiting on tasks that have been canceled will cause the index to hang. - pub(crate) fn cancel_all(&self) { - self.packages.cancel_all(); - self.distributions.cancel_all(); - } -} diff --git a/crates/puffin-resolver/src/resolver/mod.rs b/crates/puffin-resolver/src/resolver/mod.rs index 09b2d8cd1093..74155a33a482 100644 --- a/crates/puffin-resolver/src/resolver/mod.rs +++ b/crates/puffin-resolver/src/resolver/mod.rs @@ -139,7 +139,7 @@ impl<'a, Provider: ResolverProvider> Resolver<'a, Provider> { // Mock editable responses. let package_id = dist.package_id(); - index.distributions.register(&package_id); + index.distributions.register(package_id.clone()); index.distributions.done(package_id, metadata.clone()); editables.insert( dist.name().clone(), @@ -217,10 +217,6 @@ impl<'a, Provider: ResolverProvider> Resolver<'a, Provider> { } resolution = resolve_fut => { resolution.map_err(|err| { - // Ensure that any waiting tasks are cancelled prior to accessing any of the - // index entries. - self.index.cancel_all(); - // Add version information to improve unsat error messages. if let ResolveError::NoSolution(err) = err { ResolveError::NoSolution( @@ -386,7 +382,7 @@ impl<'a, Provider: ResolverProvider> Resolver<'a, Provider> { /// Visit a [`PubGrubPackage`] prior to selection. This should be called on a [`PubGrubPackage`] /// before it is selected, to allow metadata to be fetched in parallel. - fn visit_package( + async fn visit_package( &self, package: &PubGrubPackage, priorities: &mut PubGrubPriorities, @@ -397,17 +393,23 @@ impl<'a, Provider: ResolverProvider> Resolver<'a, Provider> { PubGrubPackage::Python(_) => {} PubGrubPackage::Package(package_name, _extra, None) => { // Emit a request to fetch the metadata for this package. - if self.index.packages.register(package_name) { + if self.index.packages.register(package_name.clone()) { priorities.add(package_name.clone()); request_sink.unbounded_send(Request::Package(package_name.clone()))?; + + // Yield to allow subscribers to continue, as the channel is sync. + tokio::task::yield_now().await; } } PubGrubPackage::Package(package_name, _extra, Some(url)) => { // Emit a request to fetch the metadata for this distribution. let dist = Dist::from_url(package_name.clone(), url.clone())?; - if self.index.distributions.register_owned(dist.package_id()) { + if self.index.distributions.register(dist.package_id()) { priorities.add(dist.name().clone()); request_sink.unbounded_send(Request::Dist(dist))?; + + // Yield to allow subscribers to continue, as the channel is sync. + tokio::task::yield_now().await; } } } @@ -492,8 +494,12 @@ impl<'a, Provider: ResolverProvider> Resolver<'a, Provider> { } else { // Otherwise, assume this is a source distribution. let dist = PubGrubDistribution::from_url(package_name, url); - let entry = self.index.distributions.wait(&dist.package_id()).await?; - let metadata = entry.value(); + let metadata = self + .index + .distributions + .wait(&dist.package_id()) + .await + .ok_or(ResolveError::Unregistered)?; let version = &metadata.version; if range.contains(version) { Ok(Some(version.clone())) @@ -505,13 +511,13 @@ impl<'a, Provider: ResolverProvider> Resolver<'a, Provider> { PubGrubPackage::Package(package_name, extra, None) => { // Wait for the metadata to be available. - let entry = self + let version_map = self .index .packages .wait(package_name) .instrument(info_span!("package_wait", %package_name)) - .await?; - let version_map = entry.value(); + .await + .ok_or(ResolveError::Unregistered)?; self.visited.insert(package_name.clone()); if let Some(extra) = extra { @@ -523,7 +529,8 @@ impl<'a, Provider: ResolverProvider> Resolver<'a, Provider> { } // Find a compatible version. - let Some(candidate) = self.selector.select(package_name, range, version_map) else { + let Some(candidate) = self.selector.select(package_name, range, &version_map) + else { // Short circuit: we couldn't find _any_ compatible versions for a package. return Ok(None); }; @@ -567,13 +574,12 @@ impl<'a, Provider: ResolverProvider> Resolver<'a, Provider> { let version = candidate.version().clone(); // Emit a request to fetch the metadata for this version. - if self - .index - .distributions - .register_owned(candidate.package_id()) - { + if self.index.distributions.register(candidate.package_id()) { let dist = candidate.resolve().dist.clone(); request_sink.unbounded_send(Request::Dist(dist))?; + + // Yield to allow subscribers to continue, as the channel is sync. + tokio::task::yield_now().await; } Ok(Some(version)) @@ -613,7 +619,8 @@ impl<'a, Provider: ResolverProvider> Resolver<'a, Provider> { debug!("Adding direct dependency: {package}{version}"); // Emit a request to fetch the metadata for this package. - self.visit_package(package, priorities, request_sink)?; + self.visit_package(package, priorities, request_sink) + .await?; } // Add a dependency on each editable. @@ -661,7 +668,7 @@ impl<'a, Provider: ResolverProvider> Resolver<'a, Provider> { // If the package is known to be incompatible, return the Python version as an // incompatibility, and skip fetching the metadata. if let Some(entry) = self.incompatibilities.get(&package_id) { - let requires_python = entry.value(); + let requires_python = entry; let version = requires_python .iter() .map(PubGrubSpecifier::try_from) @@ -678,13 +685,13 @@ impl<'a, Provider: ResolverProvider> Resolver<'a, Provider> { return Ok(Dependencies::Available(constraints)); } - let entry = self + let metadata = self .index .distributions .wait(&package_id) .instrument(info_span!("distributions_wait", %package_id)) - .await?; - let metadata = entry.value(); + .await + .ok_or(ResolveError::Unregistered)?; let mut constraints = PubGrubDependencies::from_requirements( &metadata.requires_dist, @@ -699,7 +706,8 @@ impl<'a, Provider: ResolverProvider> Resolver<'a, Provider> { debug!("Adding transitive dependency: {package}{version}"); // Emit a request to fetch the metadata for this package. - self.visit_package(package, priorities, request_sink)?; + self.visit_package(package, priorities, request_sink) + .await?; } // If a package has an extra, insert a constraint on the base package. @@ -761,6 +769,9 @@ impl<'a, Provider: ResolverProvider> Resolver<'a, Provider> { } None => {} } + + // Yield to allow subscribers to continue, as the channel is sync. + tokio::task::yield_now().await; } Ok::<(), ResolveError>(()) @@ -809,12 +820,16 @@ impl<'a, Provider: ResolverProvider> Resolver<'a, Provider> { // Pre-fetch the package and distribution metadata. Request::Prefetch(package_name, range) => { // Wait for the package metadata to become available. - let entry = self.index.packages.wait(&package_name).await?; - let version_map = entry.value(); + let version_map = self + .index + .packages + .wait(&package_name) + .await + .ok_or(ResolveError::Unregistered)?; // Try to find a compatible version. If there aren't any compatible versions, // short-circuit and return `None`. - let Some(candidate) = self.selector.select(&package_name, &range, version_map) + let Some(candidate) = self.selector.select(&package_name, &range, &version_map) else { return Ok(None); }; @@ -827,13 +842,8 @@ impl<'a, Provider: ResolverProvider> Resolver<'a, Provider> { } // Emit a request to fetch the metadata for this version. - if self - .index - .distributions - .register_owned(candidate.package_id()) - { + if self.index.distributions.register(candidate.package_id()) { let dist = candidate.resolve().dist.clone(); - drop(entry); let (metadata, precise) = self .provider diff --git a/crates/puffin/Cargo.toml b/crates/puffin/Cargo.toml index bd23298a47c9..253268d3e1a9 100644 --- a/crates/puffin/Cargo.toml +++ b/crates/puffin/Cargo.toml @@ -68,7 +68,6 @@ tracing-durations-export = { workspace = true, features = ["plot"], optional = t tracing-subscriber = { workspace = true } tracing-tree = { workspace = true } url = { workspace = true } -waitmap = { workspace = true } which = { workspace = true } [target.'cfg(target_os = "windows")'.dependencies]