feat: Use file cache to list partitions if available #9655

Closed
wants to merge 1 commit into from
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
269 changes: 265 additions & 4 deletions datafusion/core/src/datasource/listing/helpers.rs
@@ -17,6 +17,10 @@

//! Helper functions for the table implementation

use std::collections::BTreeSet;
use std::fmt::{Display, Formatter};
use std::ops::Deref;
use std::sync::Arc;

use super::PartitionedFile;
@@ -31,6 +35,8 @@ use arrow::{
record_batch::RecordBatch,
};
use arrow_schema::Fields;
use async_trait::async_trait;
use bytes::Bytes;
use datafusion_common::tree_node::{TreeNode, TreeNodeRecursion};
use datafusion_common::{internal_err, Column, DFField, DFSchema, DataFusionError};
use datafusion_expr::execution_props::ExecutionProps;
@@ -39,7 +45,11 @@ use datafusion_physical_expr::create_physical_expr;
use futures::stream::{BoxStream, FuturesUnordered, StreamExt, TryStreamExt};
use log::{debug, trace};
use object_store::path::Path;
use object_store::{ObjectMeta, ObjectStore};
use object_store::{
GetOptions, GetResult, ListResult, MultipartId, ObjectMeta, ObjectStore, PutOptions,
PutResult,
};
use tokio::io::AsyncWrite;

/// Check whether the given expression can be resolved using only the columns `col_names`.
/// This means that if this function returns true:
@@ -168,24 +178,154 @@ struct Partition {
files: Option<Vec<ObjectMeta>>,
}

#[derive(Debug, Default)]
struct ObjectMetaLister {
objects: Arc<Vec<ObjectMeta>>,
}

Review comment (Contributor):

Personally I would recommend a trie for this use case, as list_with_delimiter is called individually on every single partition, which means this code is going to perform O(N^2) in the worst case. I used sequence_trie for this in my own object store cache implementation.

IMO it would be ideal if the ListFilesCache itself returned a trie instead of a Vec -- then no conversion would need to happen at all.
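The suggestion can be sketched with a minimal hand-rolled trie. This is a hypothetical illustration only: PathTrie and its methods are invented names (the sequence_trie crate mentioned above offers a ready-made equivalent), and Path/ObjectMeta come from the object_store imports already in this file.

use std::collections::BTreeMap;

/// Cached listing indexed by path segment, so one level of listing is a
/// walk of O(depth) nodes instead of a scan of the whole Vec<ObjectMeta>.
#[derive(Default)]
struct PathTrie {
    children: BTreeMap<String, PathTrie>,
    /// Objects stored directly at this node, i.e. files in this "directory".
    objects: Vec<ObjectMeta>,
}

impl PathTrie {
    fn insert(&mut self, meta: ObjectMeta) {
        let mut node = self;
        let mut parts = meta.location.parts().peekable();
        while let Some(part) = parts.next() {
            // The final segment is the file name; stop at its parent node.
            if parts.peek().is_none() {
                break;
            }
            node = node.children.entry(part.as_ref().to_string()).or_default();
        }
        node.objects.push(meta);
    }

    /// One level of listing, the moral equivalent of list_with_delimiter:
    /// files directly under `prefix` plus the child "directory" names.
    fn list(&self, prefix: &Path) -> Option<(&[ObjectMeta], Vec<&str>)> {
        let mut node = self;
        for part in prefix.parts() {
            node = node.children.get(part.as_ref())?;
        }
        Some((
            node.objects.as_slice(),
            node.children.keys().map(String::as_str).collect(),
        ))
    }
}

Building the trie is a single pass over the cached Vec, after which every list_with_delimiter call is a cheap walk down the matching branch, which addresses the O(N^2) concern.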

impl Display for ObjectMetaLister {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
// Display is used in ObjectStore error and log messages, so give it a
// real implementation instead of panicking like the unused write paths.
write!(f, "ObjectMetaLister({} cached objects)", self.objects.len())
}
}

#[async_trait]
impl ObjectStore for ObjectMetaLister {
async fn put_opts(
&self,
location: &Path,
bytes: Bytes,
opts: PutOptions,
) -> object_store::Result<PutResult> {
unimplemented!()
}

async fn put_multipart(
&self,
location: &Path,
) -> object_store::Result<(MultipartId, Box<dyn AsyncWrite + Unpin + Send>)> {
unimplemented!()
}

async fn abort_multipart(
&self,
location: &Path,
multipart_id: &MultipartId,
) -> object_store::Result<()> {
unimplemented!()
}

async fn get_opts(
&self,
location: &Path,
options: GetOptions,
) -> object_store::Result<GetResult> {
unimplemented!()
}

async fn delete(&self, location: &Path) -> object_store::Result<()> {
unimplemented!()
}

fn list(
&self,
prefix: Option<&Path>,
) -> BoxStream<'_, object_store::Result<ObjectMeta>> {
unimplemented!()
}

async fn list_with_delimiter(
&self,
prefix: Option<&Path>,
) -> object_store::Result<ListResult> {
// Copied and slightly adapted from object_store::memory::InMemory::list_with_delimiter

let root = Path::default();
let prefix = prefix.unwrap_or(&root);

let mut common_prefixes = BTreeSet::new();

// Only objects in this base level should be returned in the
// response. Otherwise, we just collect the common prefixes.
let mut objects = vec![];
for object in self.objects.deref() {
let path = &object.location;
if !path.as_ref().starts_with(prefix.as_ref()) {
// Unlike the InMemory original, which iterates a sorted BTreeMap range
// starting at the prefix, the cached Vec is not guaranteed to be sorted,
// so skip non-matching entries rather than breaking out of the loop.
continue;
}

// Pop first element
let mut parts = match path.prefix_match(prefix) {
Some(parts) => parts,
// Should only return children of the prefix
None => continue,
};

let common_prefix = match parts.next() {
Some(p) => p,
None => continue,
};

if parts.next().is_some() {
common_prefixes.insert(prefix.child(common_prefix));
} else {
objects.push(object.clone());
}
}

Ok(ListResult {
objects,
common_prefixes: common_prefixes.into_iter().collect(),
})
}

async fn copy(&self, from: &Path, to: &Path) -> object_store::Result<()> {
unimplemented!()
}

async fn copy_if_not_exists(
&self,
from: &Path,
to: &Path,
) -> object_store::Result<()> {
unimplemented!()
}
}
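As a quick check of the semantics of the adapted list_with_delimiter, here is a hypothetical test sketch. It assumes the object_store version this PR compiles against (ObjectMeta carrying e_tag and version fields) and uses chrono::Utc for the timestamp; the paths and sizes are illustrative.

#[tokio::test]
async fn object_meta_lister_sketch() {
    // One cached file in a subdirectory of the table root.
    let objects = Arc::new(vec![ObjectMeta {
        location: Path::from("tablepath/mypartition=val1/file.parquet"),
        last_modified: chrono::Utc::now(),
        size: 100,
        e_tag: None,
        version: None,
    }]);
    let lister = ObjectMetaLister { objects };

    let result = lister
        .list_with_delimiter(Some(&Path::from("tablepath")))
        .await
        .unwrap();

    // file.parquet sits one level below the prefix, so it surfaces as a
    // common prefix ("directory") rather than as an object.
    assert!(result.objects.is_empty());
    assert_eq!(
        result.common_prefixes,
        vec![Path::from("tablepath/mypartition=val1")]
    );
}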

impl Partition {
/// List the direct children of this partition updating `self.files` with
/// any child files, and returning a list of child "directories"
async fn list(mut self, store: &dyn ObjectStore) -> Result<(Self, Vec<Path>)> {
async fn list(mut self, lister: &dyn ObjectStore) -> Result<(Self, Vec<Path>)> {
trace!("Listing partition {}", self.path);
let prefix = Some(&self.path).filter(|p| !p.as_ref().is_empty());
let result = store.list_with_delimiter(prefix).await?;
let result = lister.list_with_delimiter(prefix).await?;
self.files = Some(result.objects);
Ok((self, result.common_prefixes))
}
}

/// Returns a recursive list of the partitions in `table_path` up to `max_depth`
async fn list_partitions(
ctx: &SessionState,
store: &dyn ObjectStore,
table_path: &ListingTableUrl,
max_depth: usize,
) -> Result<Vec<Partition>> {
let mut cache_lister: Option<Box<dyn ObjectStore>> = None;

if let Some(cache) = ctx.runtime_env().cache_manager.get_list_files_cache() {
if let Some(objects) = cache.get(&table_path.prefix()) {
debug!("Hit files cache in list_partitions");
cache_lister = Some(Box::new(ObjectMetaLister { objects }));
}
};

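// Prefer the cache-backed lister when the listing cache has an entry for
// this table prefix; otherwise fall back to the store provided by the caller.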
let store: &dyn ObjectStore = if let Some(ref lister) = cache_lister {
&**lister
} else {
store
};

let partition = Partition {
path: table_path.prefix().clone(),
depth: 0,
@@ -343,7 +483,8 @@ pub async fn pruned_partition_list<'a>(
));
}

let partitions = list_partitions(store, table_path, partition_cols.len()).await?;
let partitions =
list_partitions(ctx, store, table_path, partition_cols.len()).await?;
debug!("Listed {} partitions", partitions.len());

let pruned =
@@ -431,9 +572,14 @@
mod tests {
use std::ops::Not;

use datafusion_execution::cache::cache_manager::CacheManagerConfig;
use datafusion_execution::cache::cache_unit;
use datafusion_execution::config::SessionConfig;
use datafusion_execution::runtime_env::{RuntimeConfig, RuntimeEnv};
use futures::StreamExt;

use crate::logical_expr::{case, col, lit};
use crate::prelude::SessionContext;
use crate::test::object_store::make_test_store_and_state;

use super::*;
@@ -538,6 +684,121 @@ mod tests {
assert_eq!(f2.partition_values, &[ScalarValue::from("val1"),]);
}

#[tokio::test]
async fn test_pruned_partition_list_with_cache() {
let (store, _) = make_test_store_and_state(&[
("tablepath/mypartition=val1/file.parquet", 100),
("tablepath/mypartition=val2/file.parquet", 100),
("tablepath/mypartition=val1/other=val3/file.parquet", 100),
]);

let cache_config = CacheManagerConfig::default();
let file_static_cache =
Arc::new(cache_unit::DefaultFileStatisticsCache::default());
let list_file_cache = Arc::new(cache_unit::DefaultListFilesCache::default());
let cache_config = cache_config
.with_files_statistics_cache(Some(file_static_cache.clone()))
.with_list_files_cache(Some(list_file_cache.clone()));
let rt = Arc::new(
RuntimeEnv::new(RuntimeConfig::new().with_cache_manager(cache_config))
.unwrap(),
);
let context = SessionContext::new_with_config_rt(SessionConfig::default(), rt);
let state = context.state();

// Cache is initially empty
let filter = Expr::eq(col("mypartition"), lit("val1"));
let pruned = pruned_partition_list(
&state,
store.as_ref(),
&ListingTableUrl::parse("file:///tablepath/").unwrap(),
&[filter],
".parquet",
&[(String::from("mypartition"), DataType::Utf8)],
)
.await
.expect("partition pruning failed")
.try_collect::<Vec<_>>()
.await
.unwrap();

assert_eq!(pruned.len(), 2);
let f1 = &pruned[0];
assert_eq!(
f1.object_meta.location.as_ref(),
"tablepath/mypartition=val1/file.parquet"
);
assert_eq!(&f1.partition_values, &[ScalarValue::from("val1")]);
let f2 = &pruned[1];
assert_eq!(
f2.object_meta.location.as_ref(),
"tablepath/mypartition=val1/other=val3/file.parquet"
);
assert_eq!(f2.partition_values, &[ScalarValue::from("val1"),]);

// Now without partition cols. This will call list_all_files, so it will fill the cache.
let filter = Expr::eq(col("mypartition"), lit("val1"));
let pruned = pruned_partition_list(
&state,
store.as_ref(),
&ListingTableUrl::parse("file:///tablepath/").unwrap(),
&[filter],
".parquet",
&[],
)
.await
.expect("partition pruning failed")
.try_collect::<Vec<_>>()
.await
.unwrap();

// Make sure files are cached now
assert_eq!(pruned.len(), 3);
let cache = context
.runtime_env()
.cache_manager
.get_list_files_cache()
.expect("cache");
let entries = cache
.get(
&ListingTableUrl::parse("file:///tablepath/")
.unwrap()
.prefix(),
)
.expect("cache entries");
assert_eq!(entries.len(), 3);

// Should get same result with the cache now
let filter = Expr::eq(col("mypartition"), lit("val1"));
let pruned = pruned_partition_list(
&state,
store.as_ref(),
&ListingTableUrl::parse("file:///tablepath/").unwrap(),
&[filter],
".parquet",
&[(String::from("mypartition"), DataType::Utf8)],
)
.await
.expect("partition pruning failed")
.try_collect::<Vec<_>>()
.await
.unwrap();

assert_eq!(pruned.len(), 2);
let f1 = &pruned[0];
assert_eq!(
f1.object_meta.location.as_ref(),
"tablepath/mypartition=val1/file.parquet"
);
assert_eq!(&f1.partition_values, &[ScalarValue::from("val1")]);
let f2 = &pruned[1];
assert_eq!(
f2.object_meta.location.as_ref(),
"tablepath/mypartition=val1/other=val3/file.parquet"
);
assert_eq!(f2.partition_values, &[ScalarValue::from("val1"),]);
}

#[tokio::test]
async fn test_pruned_partition_list_multi() {
let (store, state) = make_test_store_and_state(&[