Skip to content

Commit

Permalink
Faster urllib/boto3
Browse files Browse the repository at this point in the history
  • Loading branch information
konstin committed Apr 5, 2024
1 parent 6c1ceed commit 6c6dbd3
Show file tree
Hide file tree
Showing 5 changed files with 194 additions and 7 deletions.
11 changes: 10 additions & 1 deletion crates/distribution-types/src/resolved.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::fmt::Display;
use std::fmt::{Display, Formatter};

use pep508_rs::PackageName;

Expand Down Expand Up @@ -42,6 +42,15 @@ impl ResolvedDistRef<'_> {
}
}

impl Display for ResolvedDistRef<'_> {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
match self {
Self::Installable(dist) => Display::fmt(dist, f),
Self::Installed(dist) => Display::fmt(dist, f),
}
}
}

impl Name for ResolvedDistRef<'_> {
fn name(&self) -> &PackageName {
match self {
Expand Down
3 changes: 2 additions & 1 deletion crates/uv-resolver/src/pubgrub/dependencies.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@ use uv_types::{Constraints, Overrides};

use crate::pubgrub::specifier::PubGrubSpecifier;
use crate::pubgrub::PubGrubPackage;
use crate::resolver::{Locals, Urls};
use crate::resolver::urls::Urls;
use crate::resolver::Locals;
use crate::ResolveError;

#[derive(Debug)]
Expand Down
2 changes: 1 addition & 1 deletion crates/uv-resolver/src/pubgrub/package.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use derivative::Derivative;
use pep508_rs::VerbatimUrl;
use uv_normalize::{ExtraName, PackageName};

use crate::resolver::Urls;
use crate::resolver::urls::Urls;

/// A PubGrub-compatible wrapper around a "Python package", with two notable characteristics:
///
Expand Down
161 changes: 161 additions & 0 deletions crates/uv-resolver/src/resolver/batch_prefetch.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,161 @@
use std::cmp::min;

use pubgrub::range::Range;
use rustc_hash::FxHashMap;
use tokio::sync::mpsc::Sender;
use tracing::{debug, trace};

use distribution_types::{DistributionMetadata, ResolvedDistRef};
use pep440_rs::Version;

use crate::candidate_selector::{CandidateDist, CandidateSelector};
use crate::pubgrub::PubGrubPackage;
use crate::resolver::Request;
use crate::{InMemoryIndex, ResolveError, VersionsResponse};

enum BatchPrefetchStrategy {
/// Go through the next versions assuming the existing selection and its constraints
/// remain.
Compatible(Range<Version>, Version),
/// We encounter cases (botocore) where the above doesn't work: Say we previously selected
/// a==x.y.z, which depends on b==x.y.z. a==x.y.z is incompatible, but we don't know that
/// yet. We just select b==x.y.z and want to prefetch, since for all versions of a we try,
/// we have to wait for the matching version of b. The selector gives us only one version of
/// b, so we're now here 0 versions prefetched. Instead, we guess that the next version of b
/// will be x.y.(z-1) and so forth.
InOrder(Version),
}

/// Prefetch a large number of versions if we already unsuccessfully tried many versions.
///
/// This is an optimization specifically targeted at cold cache urllib3/boto3/botocore, where we
/// have to fetch the metadata for a lot of versions.
#[derive(Default)]
pub(crate) struct BatchPrefetcher {
tried_versions: FxHashMap<PubGrubPackage, usize>,
last_prefetch: FxHashMap<PubGrubPackage, usize>,
}

impl BatchPrefetcher {
/// Prefetch a large number of versions if we already unsuccessfully tried many versions.
pub(crate) async fn prefetch_batches(
&mut self,
next: &PubGrubPackage,
version: &Version,
current_range: &Range<Version>,
request_sink: &Sender<Request>,
index: &InMemoryIndex,
selector: &CandidateSelector,
) -> anyhow::Result<(), ResolveError> {
let PubGrubPackage::Package(package_name, _, _) = &next else {
return Ok(());
};

let (num_tried, do_prefetch) = self.should_prefetch(next);
if !do_prefetch {
return Ok(());
}
let total_prefetch = min(num_tried, 50);
let mut counter = total_prefetch;

// This is immediate, we already fetched the version map.
let versions_response = index
.packages
.wait(package_name)
.await
.ok_or(ResolveError::Unregistered)?;

let VersionsResponse::Found(ref version_map) = *versions_response else {
return Ok(());
};

let mut phase = BatchPrefetchStrategy::Compatible(current_range.clone(), version.clone());
while counter > 0 {
counter -= 1;
let candidate = match phase {
BatchPrefetchStrategy::Compatible(range, last_version) => {
if let Some(candidate) =
selector.select_no_preference(package_name, &range, version_map)
{
let range = range.intersection(
&Range::singleton(candidate.version().clone()).complement(),
);
phase =
BatchPrefetchStrategy::Compatible(range, candidate.version().clone());
candidate
} else {
// We exhausted the compatible version, switch to ignoring the existing
// constraints on the package and instead going through versions in order.
phase = BatchPrefetchStrategy::InOrder(last_version);
continue;
}
}
BatchPrefetchStrategy::InOrder(last_version) => {
let range = if selector.use_highest_version(package_name) {
Range::strictly_lower_than(last_version)
} else {
Range::strictly_higher_than(last_version)
};
if let Some(candidate) =
selector.select_no_preference(package_name, &range, version_map)
{
phase = BatchPrefetchStrategy::InOrder(candidate.version().clone());
candidate
} else {
// Both strategies exhausted their candidates.
break;
}
}
};

let CandidateDist::Compatible(dist) = candidate.dist() else {
continue;
};
let dist = dist.for_resolution();

// Emit a request to fetch the metadata for this version.
trace!(
"Prefetching ({}) {}",
match phase {
BatchPrefetchStrategy::Compatible(_, _) => "compatible",
BatchPrefetchStrategy::InOrder(_) => "in order",
},
dist
);
if index.distributions.register(candidate.package_id()) {
let request = match dist {
ResolvedDistRef::Installable(dist) => Request::Dist(dist.clone()),
ResolvedDistRef::Installed(dist) => Request::Installed(dist.clone()),
};
request_sink.send(request).await?;
}
}

debug!(
"Prefetching {} {} versions",
total_prefetch - counter,
package_name
);

self.last_prefetch.insert(next.clone(), num_tried);
Ok(())
}

/// Each time we tried a version for a package, we register that here.
pub(crate) fn version_tried(&mut self, package: PubGrubPackage) {
*self.tried_versions.entry(package).or_default() += 1;
}

/// After 5, 10, 20, 40 tried versions, prefetch that many versions to start early but not
/// too aggressive. Later we schedule the prefetch of 50 versions every 20 versions, this gives
/// us a good buffer until we see prefetch again and is high enough to saturate the task pool.
fn should_prefetch(&self, next: &PubGrubPackage) -> (usize, bool) {
let num_tried = self.tried_versions.get(next).copied().unwrap_or_default();
let previous_prefetch = self.last_prefetch.get(next).copied().unwrap_or_default();
let do_prefetch = (num_tried >= 5 && previous_prefetch < 5)
|| (num_tried >= 10 && previous_prefetch < 10)
|| (num_tried >= 20 && previous_prefetch < 20)
|| (num_tried >= 20 && num_tried - previous_prefetch >= 20);
(num_tried, do_prefetch)
}
}
24 changes: 20 additions & 4 deletions crates/uv-resolver/src/resolver/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use pep440_rs::{Version, MIN_VERSION};
use pep508_rs::{MarkerEnvironment, Requirement};
use platform_tags::Tags;
use pypi_types::Metadata23;
pub(crate) use urls::Urls;
use urls::Urls;
use uv_client::{FlatIndex, RegistryClient};
use uv_distribution::DistributionDatabase;
use uv_interpreter::Interpreter;
Expand All @@ -44,6 +44,7 @@ use crate::pubgrub::{
};
use crate::python_requirement::PythonRequirement;
use crate::resolution::ResolutionGraph;
use crate::resolver::batch_prefetch::BatchPrefetcher;
pub use crate::resolver::index::InMemoryIndex;
pub use crate::resolver::provider::{
DefaultResolverProvider, PackageVersionsResult, ResolverProvider, VersionsResponse,
Expand All @@ -54,11 +55,12 @@ pub use crate::resolver::reporter::{BuildId, Reporter};
use crate::yanks::AllowedYanks;
use crate::{DependencyMode, Exclusions, Options};

mod batch_prefetch;
mod index;
mod locals;
mod provider;
mod reporter;
mod urls;
pub(crate) mod urls;

/// The package version is unavailable and cannot be used
/// Unlike [`PackageUnavailable`] this applies to a single version of the package
Expand Down Expand Up @@ -214,13 +216,13 @@ impl<
// A channel to fetch package metadata (e.g., given `flask`, fetch all versions) and version
// metadata (e.g., given `flask==1.0.0`, fetch the metadata for that version).
// Channel size is set to the same size as the task buffer for simplicity.
let (request_sink, request_stream) = tokio::sync::mpsc::channel(50);
let (request_sink, request_stream) = tokio::sync::mpsc::channel(300);

// Run the fetcher.
let requests_fut = self.fetch(request_stream).fuse();

// Run the solver.
let resolve_fut = self.solve(request_sink).fuse();
let resolve_fut = self.solve(request_sink).boxed().fuse();

// Wait for both to complete.
match tokio::try_join!(requests_fut, resolve_fut) {
Expand Down Expand Up @@ -256,6 +258,7 @@ impl<
request_sink: tokio::sync::mpsc::Sender<Request>,
) -> Result<ResolutionGraph, ResolveError> {
let root = PubGrubPackage::Root(self.project.clone());
let mut prefetcher = BatchPrefetcher::default();

// Keep track of the packages for which we've requested metadata.
let mut pins = FilePins::default();
Expand Down Expand Up @@ -304,6 +307,8 @@ impl<
};
next = highest_priority_pkg;

prefetcher.version_tried(next.clone());

let term_intersection = state
.partial_solution
.term_intersection_for_package(&next)
Expand Down Expand Up @@ -405,6 +410,17 @@ impl<
}
};

prefetcher
.prefetch_batches(
&next,
&version,
term_intersection.unwrap_positive(),
&request_sink,
self.index,
&self.selector,
)
.await?;

self.on_progress(&next, &version);

if added_dependencies
Expand Down

0 comments on commit 6c6dbd3

Please sign in to comment.