Skip to content

Commit

Permalink
Avoid batch prefetching for un-optimized registries
Browse files Browse the repository at this point in the history
  • Loading branch information
charliermarsh committed Sep 9, 2024
1 parent d9cd282 commit 494555c
Show file tree
Hide file tree
Showing 23 changed files with 193 additions and 90 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 3 additions & 1 deletion crates/bench/benches/uv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ mod resolver {

use anyhow::Result;

use distribution_types::IndexLocations;
use distribution_types::{IndexCapabilities, IndexLocations};
use install_wheel_rs::linker::LinkMode;
use pep440_rs::Version;
use pep508_rs::{MarkerEnvironment, MarkerEnvironmentBuilder};
Expand Down Expand Up @@ -152,6 +152,7 @@ mod resolver {
);
let flat_index = FlatIndex::default();
let git = GitResolver::default();
let capabilities = IndexCapabilities::default();
let hashes = HashStrategy::None;
let in_flight = InFlight::default();
let index = InMemoryIndex::default();
Expand Down Expand Up @@ -179,6 +180,7 @@ mod resolver {
&flat_index,
&index,
&git,
&capabilities,
&in_flight,
IndexStrategy::default(),
&config_settings,
Expand Down
1 change: 1 addition & 0 deletions crates/distribution-types/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ fs-err = { workspace = true }
itertools = { workspace = true }
jiff = { workspace = true }
rkyv = { workspace = true }
rustc-hash = { workspace = true }
schemars = { workspace = true, optional = true }
serde = { workspace = true, features = ["derive"] }
serde_json = { workspace = true }
Expand Down
27 changes: 26 additions & 1 deletion crates/distribution-types/src/index_url.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
use itertools::Either;
use rustc_hash::FxHashSet;
use std::borrow::Cow;
use std::fmt::{Display, Formatter};
use std::ops::Deref;
use std::path::Path;
use std::str::FromStr;
use std::sync::LazyLock;
use std::sync::{Arc, LazyLock, RwLock};
use thiserror::Error;
use url::{ParseError, Url};

Expand Down Expand Up @@ -485,3 +486,27 @@ impl From<IndexLocations> for IndexUrls {
}
}
}

/// A map of [`IndexUrl`]s to their capabilities.
///
/// For now, we only support a single capability (range requests), and we only store an index if
/// it _doesn't_ support range requests. The benefit is that the map is almost always empty, so
/// validating capabilities is extremely cheap.
#[derive(Debug, Default, Clone)]
pub struct IndexCapabilities(Arc<RwLock<FxHashSet<IndexUrl>>>);

impl IndexCapabilities {
/// Returns `true` if the given [`IndexUrl`] supports range requests.
pub fn supports_range_requests(&self, index_url: &IndexUrl) -> bool {
!self.0.read().unwrap().contains(index_url)
}

/// Mark an [`IndexUrl`] as not supporting range requests.
pub fn set_supports_range_requests(&self, index_url: IndexUrl, supports: bool) {
if supports {
self.0.write().unwrap().remove(&index_url);
} else {
self.0.write().unwrap().insert(index_url);
}
}
}
17 changes: 8 additions & 9 deletions crates/distribution-types/src/prioritized_distribution.rs
Original file line number Diff line number Diff line change
Expand Up @@ -468,15 +468,14 @@ impl<'a> CompatibleDist<'a> {
}
}

/// Returns whether the distribution is a source distribution.
///
/// Avoid building source distributions we don't need.
pub fn prefetchable(&self) -> bool {
match *self {
CompatibleDist::SourceDist { .. } => false,
CompatibleDist::InstalledDist(_)
| CompatibleDist::CompatibleWheel { .. }
| CompatibleDist::IncompatibleWheel { .. } => true,
/// Returns a [`RegistryBuiltWheel`] if the distribution includes a compatible or incompatible
/// wheel.
pub fn wheel(&self) -> Option<&RegistryBuiltWheel> {
match self {
CompatibleDist::InstalledDist(_) => None,
CompatibleDist::SourceDist { .. } => None,
CompatibleDist::CompatibleWheel { wheel, .. } => Some(wheel),
CompatibleDist::IncompatibleWheel { wheel, .. } => Some(wheel),
}
}
}
Expand Down
158 changes: 91 additions & 67 deletions crates/uv-client/src/registry_client.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
use std::collections::BTreeMap;
use std::fmt::Debug;
use std::io;
use std::path::PathBuf;
use std::str::FromStr;

Expand All @@ -16,7 +15,9 @@ use tracing::{info_span, instrument, trace, warn, Instrument};
use url::Url;

use distribution_filename::{DistFilename, SourceDistFilename, WheelFilename};
use distribution_types::{BuiltDist, File, FileLocation, IndexUrl, IndexUrls, Name};
use distribution_types::{
BuiltDist, File, FileLocation, IndexCapabilities, IndexUrl, IndexUrls, Name, RegistryBuiltDist,
};
use install_wheel_rs::metadata::{find_archive_dist_info, is_metadata_entry};
use pep440_rs::Version;
use pep508_rs::MarkerEnvironment;
Expand Down Expand Up @@ -147,7 +148,7 @@ impl<'a> RegistryClientBuilder<'a> {
}

impl<'a> TryFrom<BaseClientBuilder<'a>> for RegistryClientBuilder<'a> {
type Error = io::Error;
type Error = std::io::Error;

fn try_from(value: BaseClientBuilder<'a>) -> Result<Self, Self::Error> {
Ok(Self {
Expand Down Expand Up @@ -402,7 +403,11 @@ impl RegistryClient {
/// 2. From a remote wheel by partial zip reading
/// 3. From a (temp) download of a remote wheel (this is a fallback, the webserver should support range requests)
#[instrument(skip_all, fields(% built_dist))]
pub async fn wheel_metadata(&self, built_dist: &BuiltDist) -> Result<Metadata23, Error> {
pub async fn wheel_metadata(
&self,
built_dist: &BuiltDist,
capabilities: &IndexCapabilities,
) -> Result<Metadata23, Error> {
let metadata = match &built_dist {
BuiltDist::Registry(wheels) => {
#[derive(Debug, Clone)]
Expand Down Expand Up @@ -451,7 +456,7 @@ impl RegistryClient {
.await?
}
WheelLocation::Url(url) => {
self.wheel_metadata_registry(&wheel.index, &wheel.file, &url)
self.wheel_metadata_registry(&wheel.index, &wheel.file, &url, capabilities)
.await?
}
}
Expand All @@ -460,7 +465,9 @@ impl RegistryClient {
self.wheel_metadata_no_pep658(
&wheel.filename,
&wheel.url,
None,
WheelCache::Url(&wheel.url),
capabilities,
)
.await?
}
Expand Down Expand Up @@ -489,6 +496,7 @@ impl RegistryClient {
index: &IndexUrl,
file: &File,
url: &Url,
capabilities: &IndexCapabilities,
) -> Result<Metadata23, Error> {
// If the metadata file is available at its own url (PEP 658), download it from there.
let filename = WheelFilename::from_str(&file.filename).map_err(ErrorKind::WheelFilename)?;
Expand Down Expand Up @@ -536,8 +544,14 @@ impl RegistryClient {
// If we lack PEP 658 support, try using HTTP range requests to read only the
// `.dist-info/METADATA` file from the zip, and if that also fails, download the whole wheel
// into the cache and read from there
self.wheel_metadata_no_pep658(&filename, url, WheelCache::Index(index))
.await
self.wheel_metadata_no_pep658(
&filename,
url,
Some(index),
WheelCache::Index(index),
capabilities,
)
.await
}
}

Expand All @@ -546,7 +560,9 @@ impl RegistryClient {
&self,
filename: &'data WheelFilename,
url: &'data Url,
index: Option<&'data IndexUrl>,
cache_shard: WheelCache<'data>,
capabilities: &'data IndexCapabilities,
) -> Result<Metadata23, Error> {
let cache_entry = self.cache.entry(
CacheBucket::Wheels,
Expand All @@ -562,72 +578,80 @@ impl RegistryClient {
Connectivity::Offline => CacheControl::AllowStale,
};

let req = self
.uncached_client(url)
.head(url.clone())
.header(
"accept-encoding",
http::HeaderValue::from_static("identity"),
)
.build()
.map_err(ErrorKind::from)?;
// Attempt to fetch via a range request.
if index.map_or(true, |index| capabilities.supports_range_requests(index)) {
let req = self
.uncached_client(url)
.head(url.clone())
.header(
"accept-encoding",
http::HeaderValue::from_static("identity"),
)
.build()
.map_err(ErrorKind::from)?;

// Copy authorization headers from the HEAD request to subsequent requests
let mut headers = HeaderMap::default();
if let Some(authorization) = req.headers().get("authorization") {
headers.append("authorization", authorization.clone());
}
// Copy authorization headers from the HEAD request to subsequent requests
let mut headers = HeaderMap::default();
if let Some(authorization) = req.headers().get("authorization") {
headers.append("authorization", authorization.clone());
}

// This response callback is special, we actually make a number of subsequent requests to
// fetch the file from the remote zip.
let read_metadata_range_request = |response: Response| {
async {
let mut reader = AsyncHttpRangeReader::from_head_response(
self.uncached_client(url).clone(),
response,
url.clone(),
headers,
// This response callback is special, we actually make a number of subsequent requests to
// fetch the file from the remote zip.
let read_metadata_range_request = |response: Response| {
async {
let mut reader = AsyncHttpRangeReader::from_head_response(
self.uncached_client(url).clone(),
response,
url.clone(),
headers,
)
.await
.map_err(ErrorKind::AsyncHttpRangeReader)?;
trace!("Getting metadata for {filename} by range request");
let text = wheel_metadata_from_remote_zip(filename, &mut reader).await?;
let metadata = Metadata23::parse_metadata(text.as_bytes()).map_err(|err| {
Error::from(ErrorKind::MetadataParseError(
filename.clone(),
url.to_string(),
Box::new(err),
))
})?;
Ok::<Metadata23, CachedClientError<Error>>(metadata)
}
.boxed_local()
.instrument(info_span!("read_metadata_range_request", wheel = %filename))
};

let result = self
.cached_client()
.get_serde(
req,
&cache_entry,
cache_control,
read_metadata_range_request,
)
.await
.map_err(ErrorKind::AsyncHttpRangeReader)?;
trace!("Getting metadata for {filename} by range request");
let text = wheel_metadata_from_remote_zip(filename, &mut reader).await?;
let metadata = Metadata23::parse_metadata(text.as_bytes()).map_err(|err| {
Error::from(ErrorKind::MetadataParseError(
filename.clone(),
url.to_string(),
Box::new(err),
))
})?;
Ok::<Metadata23, CachedClientError<Error>>(metadata)
}
.boxed_local()
.instrument(info_span!("read_metadata_range_request", wheel = %filename))
};
.map_err(crate::Error::from);

let result = self
.cached_client()
.get_serde(
req,
&cache_entry,
cache_control,
read_metadata_range_request,
)
.await
.map_err(crate::Error::from);

match result {
Ok(metadata) => return Ok(metadata),
Err(err) => {
if err.is_http_range_requests_unsupported() {
// The range request version failed. Fall back to streaming the file to search
// for the METADATA file.
warn!("Range requests not supported for {filename}; streaming wheel");
} else {
return Err(err);
match result {
Ok(metadata) => return Ok(metadata),
Err(err) => {
if err.is_http_range_requests_unsupported() {
// The range request version failed. Fall back to streaming the file to search
// for the METADATA file.
warn!("Range requests not supported for {filename}; streaming wheel");

// Mark the index as not supporting range requests.
if let Some(index) = index {
capabilities.set_supports_range_requests(index.clone(), false);
}
} else {
return Err(err);
}
}
}
};
};
}

// Create a request to stream the file.
let req = self
Expand Down
5 changes: 3 additions & 2 deletions crates/uv-client/tests/remote_metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use anyhow::Result;
use url::Url;

use distribution_filename::WheelFilename;
use distribution_types::{BuiltDist, DirectUrlBuiltDist};
use distribution_types::{BuiltDist, DirectUrlBuiltDist, IndexCapabilities};
use pep508_rs::VerbatimUrl;
use uv_cache::Cache;
use uv_client::RegistryClientBuilder;
Expand All @@ -24,7 +24,8 @@ async fn remote_metadata_with_and_without_cache() -> Result<()> {
location: Url::parse(url).unwrap(),
url: VerbatimUrl::from_str(url).unwrap(),
});
let metadata = client.wheel_metadata(&dist).await.unwrap();
let capabilities = IndexCapabilities::default();
let metadata = client.wheel_metadata(&dist, &capabilities).await.unwrap();
assert_eq!(metadata.version.to_string(), "4.66.1");
}

Expand Down
Loading

0 comments on commit 494555c

Please sign in to comment.