Skip to content

Commit

Permalink
feat: Properly cache object-stores
Browse files Browse the repository at this point in the history
  • Loading branch information
ritchie46 committed Feb 20, 2024
1 parent 7cf1045 commit 0c86901
Showing 1 changed file with 24 additions and 15 deletions.
39 changes: 24 additions & 15 deletions crates/polars-io/src/cloud/object_store_setup.rs
Original file line number Diff line number Diff line change
@@ -1,17 +1,17 @@
use once_cell::sync::Lazy;
pub use options::*;
use polars_error::to_compute_err;
use polars_utils::aliases::PlHashMap;
use tokio::sync::RwLock;
use url::Url;

use super::*;

type CacheKey = (String, Option<CloudOptions>);

/// A very simple cache that only stores a single object-store.
/// This greatly reduces the query times as multiple object stores (when reading many small files)
/// Object stores must be cached: every object store performs DNS lookups and
/// can get rate limited when querying the DNS (which can take up to 5s).
/// Another reason is that connection pools should be shared as much as possible.
#[allow(clippy::type_complexity)]
static OBJECT_STORE_CACHE: Lazy<RwLock<Option<(CacheKey, Arc<dyn ObjectStore>)>>> =
static OBJECT_STORE_CACHE: Lazy<RwLock<PlHashMap<String, Arc<dyn ObjectStore>>>> =
Lazy::new(Default::default);

type BuildResult = PolarsResult<(CloudLocation, Arc<dyn ObjectStore>)>;
Expand All @@ -24,26 +24,31 @@ fn err_missing_feature(feature: &str, scheme: &str) -> BuildResult {
);
}

/// Compute the key under which the object store for `url` is registered.
///
/// The key has the form `<scheme>://<host-and-port>`. The slice taken from the
/// URL starts at the host, so credential info (username/password) is left out.
fn url_to_key(url: &Url) -> String {
    // Everything from the start of the host up to the end of the port.
    let host_and_port = &url[url::Position::BeforeHost..url::Position::AfterPort];
    [url.scheme(), "://", host_and_port].concat()
}

/// Build an [`ObjectStore`] based on the URL and the passed-in options. Returns the cloud location and an implementation of the object store.
pub async fn build_object_store(url: &str, options: Option<&CloudOptions>) -> BuildResult {
let parsed = parse_url(url).map_err(to_compute_err)?;
let cloud_location = CloudLocation::from_url(&parsed)?;

let options = options.cloned();
let key = (url.to_string(), options);
let key = url_to_key(&parsed);

{
let cache = OBJECT_STORE_CACHE.read().await;
if let Some((stored_key, store)) = cache.as_ref() {
if stored_key == &key {
return Ok((cloud_location, store.clone()));
}
if let Some(store) = cache.get(&key) {
return Ok((cloud_location, store.clone()));
}
}

let options = key
.1
.as_ref()
let options = options
.map(Cow::Borrowed)
.unwrap_or_else(|| Cow::Owned(Default::default()));

Expand Down Expand Up @@ -98,6 +103,10 @@ pub async fn build_object_store(url: &str, options: Option<&CloudOptions>) -> Bu
},
}?;
let mut cache = OBJECT_STORE_CACHE.write().await;
*cache = Some((key, store.clone()));
// Clear the cache if we surpass a certain number of buckets. We don't expect that to happen.
if cache.len() > 512 {
cache.clear()
}
cache.insert(key, store.clone());
Ok((cloud_location, store))
}

0 comments on commit 0c86901

Please sign in to comment.