diff --git a/datafusion/core/src/datasource/listing/helpers.rs b/datafusion/core/src/datasource/listing/helpers.rs
index c53e8df35de8..86e594e687ad 100644
--- a/datafusion/core/src/datasource/listing/helpers.rs
+++ b/datafusion/core/src/datasource/listing/helpers.rs
@@ -17,6 +17,9 @@
 //! Helper functions for the table implementation
 
+use std::collections::BTreeSet;
+use std::fmt::{Display, Formatter};
+use std::ops::Deref;
 use std::sync::Arc;
 
 use super::PartitionedFile;
@@ -31,6 +34,8 @@ use arrow::{
     record_batch::RecordBatch,
 };
 use arrow_schema::Fields;
+use async_trait::async_trait;
+use bytes::Bytes;
 use datafusion_common::tree_node::{TreeNode, TreeNodeRecursion};
 use datafusion_common::{internal_err, Column, DFField, DFSchema, DataFusionError};
 use datafusion_expr::execution_props::ExecutionProps;
@@ -39,7 +44,11 @@ use datafusion_physical_expr::create_physical_expr;
 use futures::stream::{BoxStream, FuturesUnordered, StreamExt, TryStreamExt};
 use log::{debug, trace};
 use object_store::path::Path;
-use object_store::{ObjectMeta, ObjectStore};
+use object_store::{
+    GetOptions, GetResult, ListResult, MultipartId, ObjectMeta, ObjectStore, PutOptions,
+    PutResult,
+};
+use tokio::io::AsyncWrite;
 
 /// Check whether the given expression can be resolved using only the columns `col_names`.
 /// This means that if this function returns true:
@@ -168,13 +177,130 @@ struct Partition {
     files: Option<Vec<ObjectMeta>>,
 }
 
+#[derive(Debug, Default)]
+struct ObjectMetaLister {
+    objects: Arc<Vec<ObjectMeta>>,
+}
+
+impl Display for ObjectMetaLister {
+    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+        write!(f, "ObjectMetaLister")
+    }
+}
+
+#[async_trait]
+impl ObjectStore for ObjectMetaLister {
+    async fn put_opts(
+        &self,
+        location: &Path,
+        bytes: Bytes,
+        opts: PutOptions,
+    ) -> object_store::Result<PutResult> {
+        unimplemented!()
+    }
+
+    async fn put_multipart(
+        &self,
+        location: &Path,
+    ) -> object_store::Result<(MultipartId, Box<dyn AsyncWrite + Unpin + Send>)> {
+        unimplemented!()
+    }
+
+    async fn abort_multipart(
+        &self,
+        location: &Path,
+        multipart_id: &MultipartId,
+    ) -> object_store::Result<()> {
+        unimplemented!()
+    }
+
+    async fn get_opts(
+        &self,
+        location: &Path,
+        options: GetOptions,
+    ) -> object_store::Result<GetResult> {
+        unimplemented!()
+    }
+
+    async fn delete(&self, location: &Path) -> object_store::Result<()> {
+        unimplemented!()
+    }
+
+    fn list(
+        &self,
+        prefix: Option<&Path>,
+    ) -> BoxStream<'_, object_store::Result<ObjectMeta>> {
+        unimplemented!()
+    }
+
+    async fn list_with_delimiter(
+        &self,
+        prefix: Option<&Path>,
+    ) -> object_store::Result<ListResult> {
+        // Copied and slightly adapted from object_store::memory::InMemory::list_with_delimiter
+
+        let root = Path::default();
+        let prefix = prefix.unwrap_or(&root);
+
+        let mut common_prefixes = BTreeSet::new();
+
+        // Only objects in this base level should be returned in the
+        // response. Otherwise, we just collect the common prefixes.
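+        // Unlike `InMemory`, which walks a sorted `BTreeMap` range that starts
+        // at the prefix, the cached `objects` Vec is a flat, not necessarily
+        // sorted listing of all files under the table path.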
+        let mut objects = vec![];
+        for object in self.objects.deref() {
+            let path = &object.location;
+            if !path.as_ref().starts_with(prefix.as_ref()) {
+                continue;
+            }
+
+            // Pop first element
+            let mut parts = match path.prefix_match(prefix) {
+                Some(parts) => parts,
+                // Should only return children of the prefix
+                None => continue,
+            };
+
+            let common_prefix = match parts.next() {
+                Some(p) => p,
+                None => continue,
+            };
+
+            if parts.next().is_some() {
+                common_prefixes.insert(prefix.child(common_prefix));
+            } else {
+                objects.push(object.clone());
+            }
+        }
+
+        Ok(ListResult {
+            objects,
+            common_prefixes: common_prefixes.into_iter().collect(),
+        })
+    }
+
+    async fn copy(&self, from: &Path, to: &Path) -> object_store::Result<()> {
+        unimplemented!()
+    }
+
+    async fn copy_if_not_exists(
+        &self,
+        from: &Path,
+        to: &Path,
+    ) -> object_store::Result<()> {
+        unimplemented!()
+    }
+}
+
 impl Partition {
     /// List the direct children of this partition updating `self.files` with
     /// any child files, and returning a list of child "directories"
-    async fn list(mut self, store: &dyn ObjectStore) -> Result<(Self, Vec<Path>)> {
+    async fn list(mut self, lister: &dyn ObjectStore) -> Result<(Self, Vec<Path>)> {
         trace!("Listing partition {}", self.path);
         let prefix = Some(&self.path).filter(|p| !p.as_ref().is_empty());
-        let result = store.list_with_delimiter(prefix).await?;
+        let result = lister.list_with_delimiter(prefix).await?;
         self.files = Some(result.objects);
         Ok((self, result.common_prefixes))
     }
@@ -182,10 +308,26 @@ impl Partition {
 
 /// Returns a recursive list of the partitions in `table_path` up to `max_depth`
 async fn list_partitions(
+    ctx: &SessionState,
     store: &dyn ObjectStore,
     table_path: &ListingTableUrl,
     max_depth: usize,
 ) -> Result<Vec<Partition>> {
+    let mut cache_lister: Option<Box<dyn ObjectStore>> = None;
+
+    if let Some(cache) = ctx.runtime_env().cache_manager.get_list_files_cache() {
+        if let Some(objects) = cache.get(&table_path.prefix()) {
+            debug!("Hit files cache in list_partitions");
+            cache_lister = Some(Box::new(ObjectMetaLister { objects }));
+        }
+    };
+
+    let store: &dyn ObjectStore = if let Some(ref lister) = cache_lister {
+        &**lister
+    } else {
+        store
+    };
+
     let partition = Partition {
         path: table_path.prefix().clone(),
         depth: 0,
@@ -343,7 +485,8 @@ pub async fn pruned_partition_list<'a>(
         ));
     }
 
-    let partitions = list_partitions(store, table_path, partition_cols.len()).await?;
+    let partitions =
+        list_partitions(ctx, store, table_path, partition_cols.len()).await?;
     debug!("Listed {} partitions", partitions.len());
 
     let pruned =
@@ -431,9 +574,14 @@ where
 mod tests {
     use std::ops::Not;
 
+    use datafusion_execution::cache::cache_manager::CacheManagerConfig;
+    use datafusion_execution::cache::cache_unit;
+    use datafusion_execution::config::SessionConfig;
+    use datafusion_execution::runtime_env::{RuntimeConfig, RuntimeEnv};
     use futures::StreamExt;
 
     use crate::logical_expr::{case, col, lit};
+    use crate::prelude::SessionContext;
     use crate::test::object_store::make_test_store_and_state;
 
     use super::*;
@@ -538,6 +686,123 @@ mod tests {
         assert_eq!(f2.partition_values, &[ScalarValue::from("val1"),]);
     }
 
+    #[tokio::test]
+    async fn test_pruned_partition_list_with_cache() {
+        let (store, _state) = make_test_store_and_state(&[
+            ("tablepath/mypartition=val1/file.parquet", 100),
+            ("tablepath/mypartition=val2/file.parquet", 100),
+            ("tablepath/mypartition=val1/other=val3/file.parquet", 100),
+        ]);
+
+        let cache_config = CacheManagerConfig::default();
+        let file_static_cache =
+            Arc::new(cache_unit::DefaultFileStatisticsCache::default());
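+        // The statistics cache is configured alongside the list-files cache,
+        // but only the list-files cache is exercised by this test.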
+        let list_file_cache = Arc::new(cache_unit::DefaultListFilesCache::default());
+        let cache_config = cache_config
+            .with_files_statistics_cache(Some(file_static_cache.clone()))
+            .with_list_files_cache(Some(list_file_cache.clone()));
+        let rt = Arc::new(
+            RuntimeEnv::new(RuntimeConfig::new().with_cache_manager(cache_config))
+                .unwrap(),
+        );
+        let context = SessionContext::new_with_config_rt(SessionConfig::default(), rt);
+        let state = context.state();
+
+        // Cache is initially empty
+        let filter = Expr::eq(col("mypartition"), lit("val1"));
+        let pruned = pruned_partition_list(
+            &state,
+            store.as_ref(),
+            &ListingTableUrl::parse("file:///tablepath/").unwrap(),
+            &[filter],
+            ".parquet",
+            &[(String::from("mypartition"), DataType::Utf8)],
+        )
+        .await
+        .expect("partition pruning failed")
+        .try_collect::<Vec<_>>()
+        .await
+        .unwrap();
+
+        assert_eq!(pruned.len(), 2);
+        let f1 = &pruned[0];
+        assert_eq!(
+            f1.object_meta.location.as_ref(),
+            "tablepath/mypartition=val1/file.parquet"
+        );
+        assert_eq!(&f1.partition_values, &[ScalarValue::from("val1")]);
+        let f2 = &pruned[1];
+        assert_eq!(
+            f2.object_meta.location.as_ref(),
+            "tablepath/mypartition=val1/other=val3/file.parquet"
+        );
+        assert_eq!(f2.partition_values, &[ScalarValue::from("val1"),]);
+
+        // Now without partition cols. This will call list_all_files, so it will fill the cache.
+        let filter = Expr::eq(col("mypartition"), lit("val1"));
+        let pruned = pruned_partition_list(
+            &state,
+            store.as_ref(),
+            &ListingTableUrl::parse("file:///tablepath/").unwrap(),
+            &[filter],
+            ".parquet",
+            &[],
+        )
+        .await
+        .expect("partition pruning failed")
+        .try_collect::<Vec<_>>()
+        .await
+        .unwrap();
+
+        // Make sure files are cached now
+        assert_eq!(pruned.len(), 3);
+        let cache = context
+            .runtime_env()
+            .cache_manager
+            .get_list_files_cache()
+            .expect("cache");
+        let entries = cache
+            .get(
+                &ListingTableUrl::parse("file:///tablepath/")
+                    .unwrap()
+                    .prefix(),
+            )
+            .expect("cache entries");
+        assert_eq!(entries.len(), 3);
+
+        // Should get same result with the cache now
+        let filter = Expr::eq(col("mypartition"), lit("val1"));
+        let pruned = pruned_partition_list(
+            &state,
+            store.as_ref(),
+            &ListingTableUrl::parse("file:///tablepath/").unwrap(),
+            &[filter],
+            ".parquet",
+            &[(String::from("mypartition"), DataType::Utf8)],
+        )
+        .await
+        .expect("partition pruning failed")
+        .try_collect::<Vec<_>>()
+        .await
+        .unwrap();
+
+        assert_eq!(pruned.len(), 2);
+        let f1 = &pruned[0];
+        assert_eq!(
+            f1.object_meta.location.as_ref(),
+            "tablepath/mypartition=val1/file.parquet"
+        );
+        assert_eq!(&f1.partition_values, &[ScalarValue::from("val1")]);
+        let f2 = &pruned[1];
+        assert_eq!(
+            f2.object_meta.location.as_ref(),
+            "tablepath/mypartition=val1/other=val3/file.parquet"
+        );
+        assert_eq!(f2.partition_values, &[ScalarValue::from("val1"),]);
+    }
+
     #[tokio::test]
     async fn test_pruned_partition_list_multi() {
         let (store, state) = make_test_store_and_state(&[