From 2ce5f185be136f54c82fd11470859dee2a9e92ea Mon Sep 17 00:00:00 2001
From: Henri Froese
Date: Sun, 17 Mar 2024 19:12:44 +0100
Subject: [PATCH] feat: Use file cache to list partitions if available

When discovering partitions for pruning, if no partition columns are
specified, we call `list_all_files`, which uses the `list_files_cache`
if it exists and is filled. If partition columns are specified, before
this change we recursively listed files in the object store to discover
partitions. That happens on every request, and listing files in e.g.
AWS S3 can be slow (especially with 100k+ files).

With this change, if the `list_files_cache` exists and is filled, we
get all files from there and use them to discover partitions.
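
In sketch form, the new lookup in `list_partitions` (a condensed
restatement of the diff below): when the `list_files_cache` is filled for
the table prefix, the cached `ObjectMeta`s are wrapped in the new
`ObjectMetaLister`, which serves `list_with_delimiter` from memory; on a
cache miss we fall back to the real object store.

    let mut cache_lister: Option<Box<dyn ObjectStore>> = None;
    if let Some(cache) = ctx.runtime_env().cache_manager.get_list_files_cache() {
        if let Some(objects) = cache.get(&table_path.prefix()) {
            // Serve partition discovery from the cached ObjectMetas
            cache_lister = Some(Box::new(ObjectMetaLister { objects }));
        }
    }
    // Cache miss: keep using the object store passed in by the caller
    let store: &dyn ObjectStore = if let Some(ref lister) = cache_lister {
        &**lister
    } else {
        store
    };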

Closes #9654
---
 .../core/src/datasource/listing/helpers.rs | 269 +++++++++++++++++-
 1 file changed, 265 insertions(+), 4 deletions(-)

diff --git a/datafusion/core/src/datasource/listing/helpers.rs b/datafusion/core/src/datasource/listing/helpers.rs
index c53e8df35de8..86e594e687ad 100644
--- a/datafusion/core/src/datasource/listing/helpers.rs
+++ b/datafusion/core/src/datasource/listing/helpers.rs
@@ -17,6 +17,10 @@
 
 //! Helper functions for the table implementation
 
+use std::collections::BTreeSet;
+use std::fmt::{Display, Formatter};
+use std::ops::Deref;
+use std::ptr::null;
 use std::sync::Arc;
 
 use super::PartitionedFile;
@@ -31,6 +35,8 @@ use arrow::{
     record_batch::RecordBatch,
 };
 use arrow_schema::Fields;
+use async_trait::async_trait;
+use bytes::Bytes;
 use datafusion_common::tree_node::{TreeNode, TreeNodeRecursion};
 use datafusion_common::{internal_err, Column, DFField, DFSchema, DataFusionError};
 use datafusion_expr::execution_props::ExecutionProps;
@@ -39,7 +45,11 @@ use datafusion_physical_expr::create_physical_expr;
 use futures::stream::{BoxStream, FuturesUnordered, StreamExt, TryStreamExt};
 use log::{debug, trace};
 use object_store::path::Path;
-use object_store::{ObjectMeta, ObjectStore};
+use object_store::{
+    GetOptions, GetResult, ListResult, MultipartId, ObjectMeta, ObjectStore, PutOptions,
+    PutResult,
+};
+use tokio::io::AsyncWrite;
 
 /// Check whether the given expression can be resolved using only the columns `col_names`.
 /// This means that if this function returns true:
@@ -168,13 +178,127 @@ struct Partition {
     files: Option<Vec<ObjectMeta>>,
 }
 
+#[derive(Debug, Default)]
+struct ObjectMetaLister {
+    objects: Arc<Vec<ObjectMeta>>,
+}
+
+impl Display for ObjectMetaLister {
+    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+        unimplemented!()
+    }
+}
+
+#[async_trait]
+impl ObjectStore for ObjectMetaLister {
+    async fn put_opts(
+        &self,
+        location: &Path,
+        bytes: Bytes,
+        opts: PutOptions,
+    ) -> object_store::Result<PutResult> {
+        unimplemented!()
+    }
+
+    async fn put_multipart(
+        &self,
+        location: &Path,
+    ) -> object_store::Result<(MultipartId, Box<dyn AsyncWrite + Unpin + Send>)> {
+        unimplemented!()
+    }
+
+    async fn abort_multipart(
+        &self,
+        location: &Path,
+        multipart_id: &MultipartId,
+    ) -> object_store::Result<()> {
+        unimplemented!()
+    }
+
+    async fn get_opts(
+        &self,
+        location: &Path,
+        options: GetOptions,
+    ) -> object_store::Result<GetResult> {
+        unimplemented!()
+    }
+
+    async fn delete(&self, location: &Path) -> object_store::Result<()> {
+        unimplemented!()
+    }
+
+    fn list(
+        &self,
+        prefix: Option<&Path>,
+    ) -> BoxStream<'_, object_store::Result<ObjectMeta>> {
+        unimplemented!()
+    }
+
+    async fn list_with_delimiter(
+        &self,
+        prefix: Option<&Path>,
+    ) -> object_store::Result<ListResult> {
+        // Copied and slightly adapted from object_store::memory::InMemory::list_with_delimiter
+
+        let root = Path::default();
+        let prefix = prefix.unwrap_or(&root);
+
+        let mut common_prefixes = BTreeSet::new();
+
+        // Only objects in this base level should be returned in the
+        // response. Otherwise, we just collect the common prefixes.
+        let mut objects = vec![];
+        for object in self.objects.deref() {
+            let path = &object.location;
+            if !path.as_ref().starts_with(prefix.as_ref()) {
+                break;
+            }
+
+            // Pop first element
+            let mut parts = match path.prefix_match(prefix) {
+                Some(parts) => parts,
+                // Should only return children of the prefix
+                None => continue,
+            };
+
+            let common_prefix = match parts.next() {
+                Some(p) => p,
+                None => continue,
+            };
+
+            if parts.next().is_some() {
+                common_prefixes.insert(prefix.child(common_prefix));
+            } else {
+                objects.push(object.clone());
+            }
+        }
+
+        Ok(ListResult {
+            objects,
+            common_prefixes: common_prefixes.into_iter().collect(),
+        })
+    }
+
+    async fn copy(&self, from: &Path, to: &Path) -> object_store::Result<()> {
+        unimplemented!()
+    }
+
+    async fn copy_if_not_exists(
+        &self,
+        from: &Path,
+        to: &Path,
+    ) -> object_store::Result<()> {
+        unimplemented!()
+    }
+}
+
 impl Partition {
     /// List the direct children of this partition updating `self.files` with
     /// any child files, and returning a list of child "directories"
-    async fn list(mut self, store: &dyn ObjectStore) -> Result<(Self, Vec<Path>)> {
+    async fn list(mut self, lister: &dyn ObjectStore) -> Result<(Self, Vec<Path>)> {
         trace!("Listing partition {}", self.path);
         let prefix = Some(&self.path).filter(|p| !p.as_ref().is_empty());
-        let result = store.list_with_delimiter(prefix).await?;
+        let result = lister.list_with_delimiter(prefix).await?;
         self.files = Some(result.objects);
         Ok((self, result.common_prefixes))
     }
@@ -182,10 +306,26 @@ impl Partition {
 
 /// Returns a recursive list of the partitions in `table_path` up to `max_depth`
 async fn list_partitions(
+    ctx: &SessionState,
     store: &dyn ObjectStore,
     table_path: &ListingTableUrl,
     max_depth: usize,
 ) -> Result<Vec<Partition>> {
+    let mut cache_lister: Option<Box<dyn ObjectStore>> = None;
+
+    if let Some(cache) = ctx.runtime_env().cache_manager.get_list_files_cache() {
+        if let Some(objects) = cache.get(&table_path.prefix()) {
+            debug!("Hit files cache in list_partitions");
+            cache_lister = Some(Box::new(ObjectMetaLister { objects }));
+        }
+    };
+
+    let store: &dyn ObjectStore = if let Some(ref lister) = cache_lister {
+        &**lister
+    } else {
+        store
+    };
+
     let partition = Partition {
         path: table_path.prefix().clone(),
         depth: 0,
@@ -343,7 +483,8 @@ pub async fn pruned_partition_list<'a>(
         ));
     }
 
-    let partitions = list_partitions(store, table_path, partition_cols.len()).await?;
+    let partitions =
+        list_partitions(ctx, store, table_path, partition_cols.len()).await?;
     debug!("Listed {} partitions", partitions.len());
 
     let pruned =
@@ -431,9 +572,14 @@ where
 mod tests {
     use std::ops::Not;
 
+    use datafusion_execution::cache::cache_manager::CacheManagerConfig;
+    use datafusion_execution::cache::cache_unit;
+    use datafusion_execution::config::SessionConfig;
+    use datafusion_execution::runtime_env::{RuntimeConfig, RuntimeEnv};
     use futures::StreamExt;
 
     use crate::logical_expr::{case, col, lit};
+    use crate::prelude::SessionContext;
     use crate::test::object_store::make_test_store_and_state;
 
     use super::*;
@@ -538,6 +684,121 @@ mod tests {
         assert_eq!(f2.partition_values, &[ScalarValue::from("val1"),]);
     }
 
+    #[tokio::test]
+    async fn test_pruned_partition_list_with_cache() {
+        let (store, state) = make_test_store_and_state(&[
+            ("tablepath/mypartition=val1/file.parquet", 100),
+            ("tablepath/mypartition=val2/file.parquet", 100),
+            ("tablepath/mypartition=val1/other=val3/file.parquet", 100),
+        ]);
+
+        let cache_config = CacheManagerConfig::default();
+        let file_static_cache =
+            Arc::new(cache_unit::DefaultFileStatisticsCache::default());
+        let list_file_cache = Arc::new(cache_unit::DefaultListFilesCache::default());
+        let cache_config = cache_config
+            .with_files_statistics_cache(Some(file_static_cache.clone()))
+            .with_list_files_cache(Some(list_file_cache.clone()));
+        let rt = Arc::new(
+            RuntimeEnv::new(RuntimeConfig::new().with_cache_manager(cache_config))
+                .unwrap(),
+        );
+        let context = SessionContext::new_with_config_rt(SessionConfig::default(), rt);
+        let state = context.state();
+
+        // Cache is initially empty
+        let filter = Expr::eq(col("mypartition"), lit("val1"));
+        let pruned = pruned_partition_list(
+            &state,
+            store.as_ref(),
+            &ListingTableUrl::parse("file:///tablepath/").unwrap(),
+            &[filter],
+            ".parquet",
+            &[(String::from("mypartition"), DataType::Utf8)],
+        )
+        .await
+        .expect("partition pruning failed")
+        .try_collect::<Vec<_>>()
+        .await
+        .unwrap();
+
+        assert_eq!(pruned.len(), 2);
+        let f1 = &pruned[0];
+        assert_eq!(
+            f1.object_meta.location.as_ref(),
+            "tablepath/mypartition=val1/file.parquet"
+        );
+        assert_eq!(&f1.partition_values, &[ScalarValue::from("val1")]);
+        let f2 = &pruned[1];
+        assert_eq!(
+            f2.object_meta.location.as_ref(),
+            "tablepath/mypartition=val1/other=val3/file.parquet"
+        );
+        assert_eq!(f2.partition_values, &[ScalarValue::from("val1"),]);
+
+        // Now without partition cols. This will call list_all_files, so it will fill the cache.
+        let filter = Expr::eq(col("mypartition"), lit("val1"));
+        let pruned = pruned_partition_list(
+            &state,
+            store.as_ref(),
+            &ListingTableUrl::parse("file:///tablepath/").unwrap(),
+            &[filter],
+            ".parquet",
+            &[],
+        )
+        .await
+        .expect("partition pruning failed")
+        .try_collect::<Vec<_>>()
+        .await
+        .unwrap();
+
+        // Make sure files are cached now
+        assert_eq!(pruned.len(), 3);
+        let cache = context
+            .runtime_env()
+            .cache_manager
+            .get_list_files_cache()
+            .expect("cache");
+        let entries = cache
+            .get(
+                &ListingTableUrl::parse("file:///tablepath/")
+                    .unwrap()
+                    .prefix(),
+            )
+            .expect("cache entries");
+        assert_eq!(entries.len(), 3);
+
+        // Should get same result with the cache now
+        let filter = Expr::eq(col("mypartition"), lit("val1"));
+        let pruned = pruned_partition_list(
+            &state,
+            store.as_ref(),
+            &ListingTableUrl::parse("file:///tablepath/").unwrap(),
+            &[filter],
+            ".parquet",
+            &[(String::from("mypartition"), DataType::Utf8)],
+        )
+        .await
+        .expect("partition pruning failed")
+        .try_collect::<Vec<_>>()
+        .await
+        .unwrap();
+
+        assert_eq!(pruned.len(), 2);
+        let f1 = &pruned[0];
+        assert_eq!(
+            f1.object_meta.location.as_ref(),
+            "tablepath/mypartition=val1/file.parquet"
+        );
+        assert_eq!(&f1.partition_values, &[ScalarValue::from("val1")]);
+        let f2 = &pruned[1];
+        assert_eq!(
+            f2.object_meta.location.as_ref(),
+            "tablepath/mypartition=val1/other=val3/file.parquet"
+        );
+        assert_eq!(f2.partition_values, &[ScalarValue::from("val1"),]);
+    }
+
     #[tokio::test]
     async fn test_pruned_partition_list_multi() {
         let (store, state) = make_test_store_and_state(&[