Skip to content

Commit

Permalink
Yield after channel send and move cpu tasks to thread
Browse files Browse the repository at this point in the history
  • Loading branch information
konstin committed Jan 29, 2024
1 parent ff5ebb1 commit 0f15008
Show file tree
Hide file tree
Showing 10 changed files with 147 additions and 63 deletions.
4 changes: 2 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ tokio-util = { version = "0.7.10", features = ["compat"] }
toml = { version = "0.8.8" }
toml_edit = { version = "0.21.0" }
tracing = { version = "0.1.40" }
tracing-durations-export = { version = "0.1.0", features = ["plot"] }
tracing-durations-export = { version = "0.1.2", features = ["plot"] }
tracing-indicatif = { version = "0.3.6" }
tracing-subscriber = { version = "0.3.18", features = ["env-filter"] }
tracing-tree = { version = "0.3.0" }
Expand Down
8 changes: 8 additions & 0 deletions crates/distribution-types/src/id.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use std::fmt::{Display, Formatter};

/// A unique identifier for a package (e.g., `black==23.10.0`).
#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord)]
pub struct PackageId(String);
Expand All @@ -8,6 +10,12 @@ impl PackageId {
}
}

impl Display for PackageId {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
f.write_str(&self.0)
}
}

/// A unique identifier for a distribution (e.g., `black-23.10.0-py3-none-any.whl`).
#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord)]
pub struct DistributionId(String);
Expand Down
12 changes: 8 additions & 4 deletions crates/puffin-client/src/cached_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ impl CachedClient {
/// client.
#[instrument(skip_all)]
pub async fn get_cached_with_callback<
Payload: Serialize + DeserializeOwned + Send,
Payload: Serialize + DeserializeOwned + Send + 'static,
CallBackError,
Callback,
CallbackReturn,
Expand Down Expand Up @@ -172,7 +172,7 @@ impl CachedClient {
}
}

async fn read_cache<Payload: Serialize + DeserializeOwned + Send>(
async fn read_cache<Payload: Serialize + DeserializeOwned + Send + 'static>(
cache_entry: &CacheEntry,
) -> Option<DataWithCachePolicy<Payload>> {
let read_span = info_span!("read_cache", file = %cache_entry.path().display());
Expand All @@ -185,8 +185,12 @@ impl CachedClient {
"parse_cache",
path = %cache_entry.path().display()
);
let parse_result = parse_span
.in_scope(|| rmp_serde::from_slice::<DataWithCachePolicy<Payload>>(&cached));
let parse_result = tokio::task::spawn_blocking(move || {
parse_span
.in_scope(|| rmp_serde::from_slice::<DataWithCachePolicy<Payload>>(&cached))
})
.await
.expect("Tokio executor failed, was there a panic?");
match parse_result {
Ok(data) => Some(data),
Err(err) => {
Expand Down
2 changes: 1 addition & 1 deletion crates/puffin-client/src/registry_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -241,7 +241,7 @@ impl RegistryClient {
/// 1. From a [PEP 658](https://peps.python.org/pep-0658/) data-dist-info-metadata url
/// 2. From a remote wheel by partial zip reading
/// 3. From a (temp) download of a remote wheel (this is a fallback, the webserver should support range requests)
#[instrument(skip(self))]
#[instrument(skip_all, fields(%built_dist))]
pub async fn wheel_metadata(&self, built_dist: &BuiltDist) -> Result<Metadata21, Error> {
let metadata = match &built_dist {
BuiltDist::Registry(wheel) => match &wheel.file.url {
Expand Down
2 changes: 1 addition & 1 deletion crates/puffin-dev/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ rustc-hash = { workspace = true }
tempfile = { workspace = true }
tokio = { workspace = true }
tracing = { workspace = true }
tracing-durations-export = { version = "0.1.0", features = ["plot"] }
tracing-durations-export = { version = "0.1.2", features = ["plot"] }
tracing-indicatif = { workspace = true }
tracing-subscriber = { workspace = true }
url = { workspace = true }
Expand Down
2 changes: 1 addition & 1 deletion crates/puffin-distribution/src/distribution_database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -330,7 +330,7 @@ impl<'a, Context: BuildContext + Send + Sync> DistributionDatabase<'a, Context>
/// Returns the [`Metadata21`], along with a "precise" URL for the source distribution, if
/// possible. For example, given a Git dependency with a reference to a branch or tag, return a
/// URL with a precise reference to the current commit of that branch or tag.
#[instrument(skip(self))]
#[instrument(skip_all, fields(%dist))]
pub async fn get_or_build_wheel_metadata(
&self,
dist: &Dist,
Expand Down
65 changes: 52 additions & 13 deletions crates/puffin-resolver/src/resolver/mod.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
//! Given a set of requirements, find a set of compatible packages.

use std::fmt::{Display, Formatter};
use std::sync::Arc;

use anyhow::Result;
use dashmap::{DashMap, DashSet};
use futures::channel::mpsc::UnboundedReceiver;
use futures::channel::mpsc::{
UnboundedReceiver as MpscUnboundedReceiver, UnboundedSender as MpscUnboundedSender,
};
use futures::{FutureExt, StreamExt};
use itertools::Itertools;
use pubgrub::error::PubGrubError;
Expand All @@ -13,7 +16,7 @@ use pubgrub::solver::{Incompatibility, State};
use pubgrub::type_aliases::DependencyConstraints;
use rustc_hash::{FxHashMap, FxHashSet};
use tokio::select;
use tracing::{debug, instrument, trace};
use tracing::{debug, info_span, instrument, trace, Instrument};
use url::Url;

use distribution_filename::WheelFilename;
Expand Down Expand Up @@ -242,7 +245,7 @@ impl<'a, Provider: ResolverProvider> Resolver<'a, Provider> {
#[instrument(skip_all)]
async fn solve(
&self,
request_sink: &futures::channel::mpsc::UnboundedSender<Request>,
request_sink: &MpscUnboundedSender<Request>,
) -> Result<ResolutionGraph, ResolveError> {
let root = PubGrubPackage::Root(self.project.clone());

Expand Down Expand Up @@ -383,11 +386,11 @@ impl<'a, Provider: ResolverProvider> Resolver<'a, Provider> {

/// Visit a [`PubGrubPackage`] prior to selection. This should be called on a [`PubGrubPackage`]
/// before it is selected, to allow metadata to be fetched in parallel.
fn visit_package(
async fn visit_package(
&self,
package: &PubGrubPackage,
priorities: &mut PubGrubPriorities,
request_sink: &futures::channel::mpsc::UnboundedSender<Request>,
request_sink: &MpscUnboundedSender<Request>,
) -> Result<(), ResolveError> {
match package {
PubGrubPackage::Root(_) => {}
Expand All @@ -408,14 +411,16 @@ impl<'a, Provider: ResolverProvider> Resolver<'a, Provider> {
}
}
}
// Yield after sending on a channel to allow the subscribers to continue
tokio::task::yield_now().await;
Ok(())
}

/// Visit the set of [`PubGrubPackage`] candidates prior to selection. This allows us to fetch
/// metadata for all of the packages in parallel.
fn pre_visit<'data>(
packages: impl Iterator<Item = (&'data PubGrubPackage, &'data Range<Version>)>,
request_sink: &futures::channel::mpsc::UnboundedSender<Request>,
request_sink: &MpscUnboundedSender<Request>,
) -> Result<(), ResolveError> {
// Iterate over the potential packages, and fetch file metadata for any of them. These
// represent our current best guesses for the versions that we _might_ select.
Expand All @@ -430,14 +435,15 @@ impl<'a, Provider: ResolverProvider> Resolver<'a, Provider> {

/// Given a set of candidate packages, choose the next package (and version) to add to the
/// partial solution.
#[instrument(skip_all, fields(%package))]
async fn choose_version(
&self,
package: &PubGrubPackage,
range: &Range<Version>,
pins: &mut FilePins,
request_sink: &futures::channel::mpsc::UnboundedSender<Request>,
request_sink: &MpscUnboundedSender<Request>,
) -> Result<Option<Version>, ResolveError> {
return match package {
let result = match package {
PubGrubPackage::Root(_) => Ok(Some(MIN_VERSION.clone())),

PubGrubPackage::Python(PubGrubPython::Installed) => {
Expand Down Expand Up @@ -570,15 +576,19 @@ impl<'a, Provider: ResolverProvider> Resolver<'a, Provider> {
Ok(Some(version))
}
};
// Yield after sending on a channel to allow the subscribers to continue
tokio::task::yield_now().await;
result
}

/// Given a candidate package and version, return its dependencies.
#[instrument(skip_all, fields(%package, %version))]
async fn get_dependencies(
&self,
package: &PubGrubPackage,
version: &Version,
priorities: &mut PubGrubPriorities,
request_sink: &futures::channel::mpsc::UnboundedSender<Request>,
request_sink: &MpscUnboundedSender<Request>,
) -> Result<Dependencies, ResolveError> {
match package {
PubGrubPackage::Root(_) => {
Expand All @@ -604,7 +614,8 @@ impl<'a, Provider: ResolverProvider> Resolver<'a, Provider> {
debug!("Adding direct dependency: {package}{version}");

// Emit a request to fetch the metadata for this package.
self.visit_package(package, priorities, request_sink)?;
self.visit_package(package, priorities, request_sink)
.await?;
}

// Add a dependency on each editable.
Expand Down Expand Up @@ -664,7 +675,12 @@ impl<'a, Provider: ResolverProvider> Resolver<'a, Provider> {
return Ok(Dependencies::Available(constraints));
}

let entry = self.index.distributions.wait(&package_id).await?;
let entry = self
.index
.distributions
.wait(&package_id)
.instrument(info_span!("distributions_wait", %package_id))
.await?;
let metadata = entry.value();

let mut constraints = PubGrubDependencies::from_requirements(
Expand All @@ -680,7 +696,8 @@ impl<'a, Provider: ResolverProvider> Resolver<'a, Provider> {
debug!("Adding transitive dependency: {package}{version}");

// Emit a request to fetch the metadata for this package.
self.visit_package(package, priorities, request_sink)?;
self.visit_package(package, priorities, request_sink)
.await?;
}

// If a package has an extra, insert a constraint on the base package.
Expand All @@ -697,7 +714,10 @@ impl<'a, Provider: ResolverProvider> Resolver<'a, Provider> {
}

/// Fetch the metadata for a stream of packages and versions.
async fn fetch(&self, request_stream: UnboundedReceiver<Request>) -> Result<(), ResolveError> {
async fn fetch(
&self,
request_stream: MpscUnboundedReceiver<Request>,
) -> Result<(), ResolveError> {
let mut response_stream = request_stream
.map(|request| self.process_request(request).boxed())
.buffer_unordered(500);
Expand Down Expand Up @@ -742,11 +762,14 @@ impl<'a, Provider: ResolverProvider> Resolver<'a, Provider> {
}
None => {}
}
// Yield after OnceMap` insertion and sending on a channel to allow the subscribers to continue
tokio::task::yield_now().await;
}

Ok::<(), ResolveError>(())
}

#[instrument(skip_all, fields(%request))]
async fn process_request(&self, request: Request) -> Result<Option<Response>, ResolveError> {
match request {
// Fetch package metadata from the registry.
Expand Down Expand Up @@ -881,6 +904,22 @@ enum Request {
Prefetch(PackageName, Range<Version>),
}

impl Display for Request {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
match self {
Request::Package(package_name) => {
write!(f, "Versions {package_name}")
}
Request::Dist(dist) => {
write!(f, "Metadata {dist}")
}
Request::Prefetch(package_name, range) => {
write!(f, "Prefetch {package_name} {range}")
}
}
}
}

#[derive(Debug)]
#[allow(clippy::large_enum_variant)]
enum Response {
Expand Down
Loading

0 comments on commit 0f15008

Please sign in to comment.