Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Properly cache object-stores #14598

Merged
merged 1 commit into from
Feb 20, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 24 additions & 15 deletions crates/polars-io/src/cloud/object_store_setup.rs
Original file line number Diff line number Diff line change
@@ -1,17 +1,17 @@
use once_cell::sync::Lazy;
pub use options::*;
use polars_error::to_compute_err;
use polars_utils::aliases::PlHashMap;
use tokio::sync::RwLock;
use url::Url;

use super::*;

type CacheKey = (String, Option<CloudOptions>);

/// A very simple cache that only stores a single object-store.
/// This greatly reduces the query times as multiple object stores (when reading many small files)
/// Object stores must be cached. Every object-store will do DNS lookups and
/// get rate limited when querying the DNS (can take up to 5s).
/// Other reasons are connection pools that must be shared between as much as possible.
#[allow(clippy::type_complexity)]
static OBJECT_STORE_CACHE: Lazy<RwLock<Option<(CacheKey, Arc<dyn ObjectStore>)>>> =
static OBJECT_STORE_CACHE: Lazy<RwLock<PlHashMap<String, Arc<dyn ObjectStore>>>> =
Lazy::new(Default::default);

type BuildResult = PolarsResult<(CloudLocation, Arc<dyn ObjectStore>)>;
Expand All @@ -24,26 +24,31 @@ fn err_missing_feature(feature: &str, scheme: &str) -> BuildResult {
);
}

/// Get the key of a url for object store registration.
/// The credential info will be removed
fn url_to_key(url: &Url) -> String {
format!(
"{}://{}",
url.scheme(),
&url[url::Position::BeforeHost..url::Position::AfterPort],
)
}

/// Build an [`ObjectStore`] based on the URL and passed in url. Return the cloud location and an implementation of the object store.
pub async fn build_object_store(url: &str, options: Option<&CloudOptions>) -> BuildResult {
let parsed = parse_url(url).map_err(to_compute_err)?;
let cloud_location = CloudLocation::from_url(&parsed)?;

let options = options.cloned();
let key = (url.to_string(), options);
let key = url_to_key(&parsed);

{
let cache = OBJECT_STORE_CACHE.read().await;
if let Some((stored_key, store)) = cache.as_ref() {
if stored_key == &key {
return Ok((cloud_location, store.clone()));
}
if let Some(store) = cache.get(&key) {
return Ok((cloud_location, store.clone()));
}
}

let options = key
.1
.as_ref()
let options = options
.map(Cow::Borrowed)
.unwrap_or_else(|| Cow::Owned(Default::default()));

Expand Down Expand Up @@ -98,6 +103,10 @@ pub async fn build_object_store(url: &str, options: Option<&CloudOptions>) -> Bu
},
}?;
let mut cache = OBJECT_STORE_CACHE.write().await;
*cache = Some((key, store.clone()));
// Clear the cache if we surpass a certain amount of buckets. Don't expect that to happen.
if cache.len() > 512 {
cache.clear()
}
cache.insert(key, store.clone());
Ok((cloud_location, store))
}
Loading