From 0c86901f1b96b1063310f33764332ae844787c2f Mon Sep 17 00:00:00 2001 From: ritchie Date: Tue, 20 Feb 2024 05:36:49 +0100 Subject: [PATCH] feat: Properly cache object-stores --- .../polars-io/src/cloud/object_store_setup.rs | 39 ++++++++++++------- 1 file changed, 24 insertions(+), 15 deletions(-) diff --git a/crates/polars-io/src/cloud/object_store_setup.rs b/crates/polars-io/src/cloud/object_store_setup.rs index 64860be34183..94fe4563f9d3 100644 --- a/crates/polars-io/src/cloud/object_store_setup.rs +++ b/crates/polars-io/src/cloud/object_store_setup.rs @@ -1,17 +1,17 @@ use once_cell::sync::Lazy; pub use options::*; use polars_error::to_compute_err; +use polars_utils::aliases::PlHashMap; use tokio::sync::RwLock; +use url::Url; use super::*; -type CacheKey = (String, Option); - -/// A very simple cache that only stores a single object-store. -/// This greatly reduces the query times as multiple object stores (when reading many small files) +/// Object stores must be cached. Every object-store will do DNS lookups and /// get rate limited when querying the DNS (can take up to 5s). +/// Other reasons are connection pools that must be shared between as much as possible. #[allow(clippy::type_complexity)] -static OBJECT_STORE_CACHE: Lazy)>>> = +static OBJECT_STORE_CACHE: Lazy>>> = Lazy::new(Default::default); type BuildResult = PolarsResult<(CloudLocation, Arc)>; @@ -24,26 +24,31 @@ fn err_missing_feature(feature: &str, scheme: &str) -> BuildResult { ); } +/// Get the key of a url for object store registration. +/// The credential info will be removed +fn url_to_key(url: &Url) -> String { + format!( + "{}://{}", + url.scheme(), + &url[url::Position::BeforeHost..url::Position::AfterPort], + ) +} + /// Build an [`ObjectStore`] based on the URL and passed in url. Return the cloud location and an implementation of the object store. pub async fn build_object_store(url: &str, options: Option<&CloudOptions>) -> BuildResult { let parsed = parse_url(url).map_err(to_compute_err)?; let cloud_location = CloudLocation::from_url(&parsed)?; - let options = options.cloned(); - let key = (url.to_string(), options); + let key = url_to_key(&parsed); { let cache = OBJECT_STORE_CACHE.read().await; - if let Some((stored_key, store)) = cache.as_ref() { - if stored_key == &key { - return Ok((cloud_location, store.clone())); - } + if let Some(store) = cache.get(&key) { + return Ok((cloud_location, store.clone())); } } - let options = key - .1 - .as_ref() + let options = options .map(Cow::Borrowed) .unwrap_or_else(|| Cow::Owned(Default::default())); @@ -98,6 +103,10 @@ pub async fn build_object_store(url: &str, options: Option<&CloudOptions>) -> Bu }, }?; let mut cache = OBJECT_STORE_CACHE.write().await; - *cache = Some((key, store.clone())); + // Clear the cache if we surpass a certain amount of buckets. Don't expect that to happen. + if cache.len() > 512 { + cache.clear() + } + cache.insert(key, store.clone()); Ok((cloud_location, store)) }