Skip to content

Commit

Permalink
Yield after channel send and move cpu tasks to thread
Browse files Browse the repository at this point in the history
  • Loading branch information
konstin committed Jan 29, 2024
1 parent 471ef3f commit b347a00
Show file tree
Hide file tree
Showing 7 changed files with 113 additions and 61 deletions.
4 changes: 2 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ tokio-util = { version = "0.7.10", features = ["compat"] }
toml = { version = "0.8.8" }
toml_edit = { version = "0.21.0" }
tracing = { version = "0.1.40" }
tracing-durations-export = { version = "0.1.0", features = ["plot"] }
tracing-durations-export = { version = "0.1.2", features = ["plot"] }
tracing-indicatif = { version = "0.3.6" }
tracing-subscriber = { version = "0.3.18", features = ["env-filter"] }
tracing-tree = { version = "0.3.0" }
Expand Down
12 changes: 8 additions & 4 deletions crates/puffin-client/src/cached_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ impl CachedClient {
/// client.
#[instrument(skip_all)]
pub async fn get_cached_with_callback<
Payload: Serialize + DeserializeOwned + Send,
Payload: Serialize + DeserializeOwned + Send + 'static,
CallBackError,
Callback,
CallbackReturn,
Expand Down Expand Up @@ -172,7 +172,7 @@ impl CachedClient {
}
}

async fn read_cache<Payload: Serialize + DeserializeOwned + Send>(
async fn read_cache<Payload: Serialize + DeserializeOwned + Send + 'static>(
cache_entry: &CacheEntry,
) -> Option<DataWithCachePolicy<Payload>> {
let read_span = info_span!("read_cache", file = %cache_entry.path().display());
Expand All @@ -185,8 +185,12 @@ impl CachedClient {
"parse_cache",
path = %cache_entry.path().display()
);
let parse_result = parse_span
.in_scope(|| rmp_serde::from_slice::<DataWithCachePolicy<Payload>>(&cached));
let parse_result = tokio::task::spawn_blocking(move || {
parse_span
.in_scope(|| rmp_serde::from_slice::<DataWithCachePolicy<Payload>>(&cached))
})
.await
.expect("Tokio executor failed, was there a panic?");
match parse_result {
Ok(data) => Some(data),
Err(err) => {
Expand Down
2 changes: 1 addition & 1 deletion crates/puffin-dev/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ rustc-hash = { workspace = true }
tempfile = { workspace = true }
tokio = { workspace = true }
tracing = { workspace = true }
tracing-durations-export = { version = "0.1.0", features = ["plot"] }
tracing-durations-export = { version = "0.1.2", features = ["plot"] }
tracing-indicatif = { workspace = true }
tracing-subscriber = { workspace = true }
url = { workspace = true }
Expand Down
41 changes: 28 additions & 13 deletions crates/puffin-resolver/src/resolver/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@ use std::sync::Arc;

use anyhow::Result;
use dashmap::{DashMap, DashSet};
use futures::channel::mpsc::UnboundedReceiver;
use futures::channel::mpsc::{
UnboundedReceiver as MpscUnboundedReceiver, UnboundedSender as MpscUnboundedSender,
};
use futures::{FutureExt, StreamExt};
use itertools::Itertools;
use pubgrub::error::PubGrubError;
Expand Down Expand Up @@ -243,7 +245,7 @@ impl<'a, Provider: ResolverProvider> Resolver<'a, Provider> {
#[instrument(skip_all)]
async fn solve(
&self,
request_sink: &futures::channel::mpsc::UnboundedSender<Request>,
request_sink: &MpscUnboundedSender<Request>,
) -> Result<ResolutionGraph, ResolveError> {
let root = PubGrubPackage::Root(self.project.clone());

Expand Down Expand Up @@ -384,11 +386,11 @@ impl<'a, Provider: ResolverProvider> Resolver<'a, Provider> {

/// Visit a [`PubGrubPackage`] prior to selection. This should be called on a [`PubGrubPackage`]
/// before it is selected, to allow metadata to be fetched in parallel.
fn visit_package(
async fn visit_package(
&self,
package: &PubGrubPackage,
priorities: &mut PubGrubPriorities,
request_sink: &futures::channel::mpsc::UnboundedSender<Request>,
request_sink: &MpscUnboundedSender<Request>,
) -> Result<(), ResolveError> {
match package {
PubGrubPackage::Root(_) => {}
Expand All @@ -409,14 +411,16 @@ impl<'a, Provider: ResolverProvider> Resolver<'a, Provider> {
}
}
}
// Yield after sending on a channel to allow the subscribers to continue
tokio::task::yield_now().await;
Ok(())
}

/// Visit the set of [`PubGrubPackage`] candidates prior to selection. This allows us to fetch
/// metadata for all of the packages in parallel.
fn pre_visit<'data>(
packages: impl Iterator<Item = (&'data PubGrubPackage, &'data Range<Version>)>,
request_sink: &futures::channel::mpsc::UnboundedSender<Request>,
request_sink: &MpscUnboundedSender<Request>,
) -> Result<(), ResolveError> {
// Iterate over the potential packages, and fetch file metadata for any of them. These
// represent our current best guesses for the versions that we _might_ select.
Expand All @@ -437,9 +441,9 @@ impl<'a, Provider: ResolverProvider> Resolver<'a, Provider> {
package: &PubGrubPackage,
range: &Range<Version>,
pins: &mut FilePins,
request_sink: &futures::channel::mpsc::UnboundedSender<Request>,
request_sink: &MpscUnboundedSender<Request>,
) -> Result<Option<Version>, ResolveError> {
return match package {
let result = match package {
PubGrubPackage::Root(_) => Ok(Some(MIN_VERSION.clone())),

PubGrubPackage::Python(PubGrubPython::Installed) => {
Expand Down Expand Up @@ -577,15 +581,19 @@ impl<'a, Provider: ResolverProvider> Resolver<'a, Provider> {
Ok(Some(version))
}
};
// Yield after sending on a channel to allow the subscribers to continue
tokio::task::yield_now().await;
result
}

/// Given a candidate package and version, return its dependencies.
#[instrument(skip_all, fields(%package, %version))]
async fn get_dependencies(
&self,
package: &PubGrubPackage,
version: &Version,
priorities: &mut PubGrubPriorities,
request_sink: &futures::channel::mpsc::UnboundedSender<Request>,
request_sink: &MpscUnboundedSender<Request>,
) -> Result<Dependencies, ResolveError> {
match package {
PubGrubPackage::Root(_) => {
Expand All @@ -611,7 +619,8 @@ impl<'a, Provider: ResolverProvider> Resolver<'a, Provider> {
debug!("Adding direct dependency: {package}{version}");

// Emit a request to fetch the metadata for this package.
self.visit_package(package, priorities, request_sink)?;
self.visit_package(package, priorities, request_sink)
.await?;
}

// Add a dependency on each editable.
Expand Down Expand Up @@ -692,7 +701,8 @@ impl<'a, Provider: ResolverProvider> Resolver<'a, Provider> {
debug!("Adding transitive dependency: {package}{version}");

// Emit a request to fetch the metadata for this package.
self.visit_package(package, priorities, request_sink)?;
self.visit_package(package, priorities, request_sink)
.await?;
}

// If a package has an extra, insert a constraint on the base package.
Expand All @@ -709,7 +719,10 @@ impl<'a, Provider: ResolverProvider> Resolver<'a, Provider> {
}

/// Fetch the metadata for a stream of packages and versions.
async fn fetch(&self, request_stream: UnboundedReceiver<Request>) -> Result<(), ResolveError> {
async fn fetch(
&self,
request_stream: MpscUnboundedReceiver<Request>,
) -> Result<(), ResolveError> {
let mut response_stream = request_stream
.map(|request| self.process_request(request).boxed())
.buffer_unordered(500);
Expand Down Expand Up @@ -754,6 +767,8 @@ impl<'a, Provider: ResolverProvider> Resolver<'a, Provider> {
}
None => {}
}
// Yield after `OnceMap` insertion and sending on a channel to allow the subscribers to continue
tokio::task::yield_now().await;
}

Ok::<(), ResolveError>(())
Expand Down Expand Up @@ -898,10 +913,10 @@ impl Display for Request {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
match self {
Request::Package(package_name) => {
write!(f, "Package {package_name}")
write!(f, "Versions {package_name}")
}
Request::Dist(dist) => {
write!(f, "Dist {dist}")
write!(f, "Metadata {dist}")
}
Request::Prefetch(package_name, range) => {
write!(f, "Prefetch {package_name} {range}")
Expand Down
111 changes: 72 additions & 39 deletions crates/puffin-resolver/src/resolver/provider.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
use std::future::Future;
use std::ops::Deref;
use std::sync::Arc;

use anyhow::Result;
use chrono::{DateTime, Utc};
use futures::FutureExt;
use url::Url;

use distribution_types::Dist;
use distribution_types::{Dist, IndexUrl};
use platform_tags::Tags;
use puffin_client::{FlatIndex, RegistryClient};
use puffin_client::{FlatIndex, RegistryClient, SimpleMetadata};
use puffin_distribution::DistributionDatabase;
use puffin_normalize::PackageName;
use puffin_traits::{BuildContext, NoBinary};
Expand Down Expand Up @@ -45,17 +47,29 @@ pub trait ResolverProvider: Send + Sync {
/// The main IO backend for the resolver, which does cached network requests using the
/// [`RegistryClient`] and [`DistributionDatabase`].
pub struct DefaultResolverProvider<'a, Context: BuildContext + Send + Sync> {
/// The [`RegistryClient`] used to query the index.
client: &'a RegistryClient,
/// The [`DistributionDatabase`] used to build source distributions.
fetcher: DistributionDatabase<'a, Context>,
inner: Arc<DefaultResolverProviderInner>,
}

pub struct DefaultResolverProviderInner {
/// The [`RegistryClient`] used to query the index.
client: RegistryClient,
/// These are the entries from `--find-links` that act as overrides for index responses.
flat_index: &'a FlatIndex,
tags: &'a Tags,
flat_index: FlatIndex,
tags: Tags,
python_requirement: PythonRequirement,
exclude_newer: Option<DateTime<Utc>>,
allowed_yanks: AllowedYanks,
no_binary: &'a NoBinary,
no_binary: NoBinary,
}

/// Lets the fields of the shared [`DefaultResolverProviderInner`] be read directly through a
/// [`DefaultResolverProvider`].
impl<'a, Context: BuildContext + Send + Sync> Deref for DefaultResolverProvider<'a, Context> {
    type Target = DefaultResolverProviderInner;

    /// Borrow the inner state stored behind the provider's `Arc`.
    fn deref(&self) -> &Self::Target {
        // `&Arc<T>` coerces to `&T` at the return site.
        &self.inner
    }
}

impl<'a, Context: BuildContext + Send + Sync> DefaultResolverProvider<'a, Context> {
Expand All @@ -72,50 +86,69 @@ impl<'a, Context: BuildContext + Send + Sync> DefaultResolverProvider<'a, Contex
no_binary: &'a NoBinary,
) -> Self {
Self {
client,
fetcher,
flat_index,
tags,
python_requirement,
exclude_newer,
allowed_yanks,
no_binary,
inner: Arc::new(DefaultResolverProviderInner {
client: client.clone(),
flat_index: flat_index.clone(),
tags: tags.clone(),
python_requirement,
exclude_newer,
allowed_yanks,
no_binary: no_binary.clone(),
}),
}
}
}

/// Convert the outcome of a simple-index lookup for `package_name` into a
/// [`VersionMapResponse`].
///
/// The shared provider state is taken as an owned `Arc`, so this function can be moved into the
/// closure passed to `tokio::task::spawn_blocking` by `get_version_map`.
///
/// * On success, the index metadata is turned into a [`VersionMap`], combined with any flat-index
///   entry registered for the package.
/// * If the lookup failed with `PackageNotFound` or `NoIndex`, the flat-index entry for the
///   package (if any) is used on its own; otherwise the error is returned.
/// * Any other error kind is returned unchanged.
fn simple_to_version_map(
    self_arc: Arc<DefaultResolverProviderInner>,
    result: Result<(IndexUrl, SimpleMetadata), puffin_client::Error>,
    package_name: &PackageName,
) -> VersionMapResponse {
    match result {
        Ok((index, metadata)) => Ok(VersionMap::from_metadata(
            metadata,
            // `package_name` is already a reference; no extra borrow needed.
            package_name,
            &index,
            &self_arc.tags,
            &self_arc.python_requirement,
            &self_arc.allowed_yanks,
            self_arc.exclude_newer.as_ref(),
            self_arc.flat_index.get(package_name).cloned(),
            &self_arc.no_binary,
        )),
        Err(err) => match err.into_kind() {
            // The package is missing from the index (or there is no index at all): fall back to
            // the flat-index entry when one exists.
            kind @ (puffin_client::ErrorKind::PackageNotFound(_)
            | puffin_client::ErrorKind::NoIndex(_)) => {
                if let Some(flat_index) = self_arc.flat_index.get(package_name).cloned() {
                    Ok(VersionMap::from(flat_index))
                } else {
                    Err(kind.into())
                }
            }
            kind => Err(kind.into()),
        },
    }
}

impl<'a, Context: BuildContext + Send + Sync> ResolverProvider
for DefaultResolverProvider<'a, Context>
{
fn get_version_map<'io>(
&'io self,
package_name: &'io PackageName,
) -> impl Future<Output = VersionMapResponse> + Send + 'io {
self.client
.simple(package_name)
.map(move |result| match result {
Ok((index, metadata)) => Ok(VersionMap::from_metadata(
metadata,
package_name,
&index,
self.tags,
&self.python_requirement,
&self.allowed_yanks,
self.exclude_newer.as_ref(),
self.flat_index.get(package_name).cloned(),
self.no_binary,
)),
Err(err) => match err.into_kind() {
kind @ (puffin_client::ErrorKind::PackageNotFound(_)
| puffin_client::ErrorKind::NoIndex(_)) => {
if let Some(flat_index) = self.flat_index.get(package_name).cloned() {
Ok(VersionMap::from(flat_index))
} else {
Err(kind.into())
}
}
kind => Err(kind.into()),
},
let package_name_owned = package_name.clone();
let self_arc = self.inner.clone();
self.inner
.client
.simple(&package_name)
.then(|result| async move {
tokio::task::spawn_blocking(move || {
simple_to_version_map(self_arc, result, &package_name_owned)
})
.await
.unwrap()
})
}

Expand Down
2 changes: 1 addition & 1 deletion crates/puffin-traits/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ impl Display for BuildKind {
}
}

#[derive(Debug)]
#[derive(Debug, Clone)]
pub enum NoBinary {
/// Allow installation of any wheel.
None,
Expand Down

0 comments on commit b347a00

Please sign in to comment.