From 585bc3a629b92ea7a86ebfe8bf762dbef4155710 Mon Sep 17 00:00:00 2001
From: Raphael Taylor-Davies <1781103+tustvold@users.noreply.github.com>
Date: Wed, 1 Jun 2022 21:58:02 +0100
Subject: [PATCH] Remove ObjectStore from FileScanConfig and ListingTableConfig
 (#2668)

* Remove ObjectStore from FileScanConfig and ListingTableConfig

* Update ballista pin
---
 benchmarks/src/bin/tpch.rs                    |   3 +-
 datafusion-examples/examples/flight_server.rs |   4 +-
 .../core/benches/sort_limit_query_sql.rs      |   3 +-
 datafusion/core/src/catalog/schema.rs         |   7 +-
 .../core/src/datasource/file_format/mod.rs    |   1 -
 .../core/src/datasource/listing/table.rs      | 141 ++++++++----------
 datafusion/core/src/execution/context.rs      |  29 ++--
 datafusion/core/src/execution/runtime_env.rs  |   2 +-
 .../src/physical_optimizer/repartition.rs     |   2 -
 .../src/physical_plan/file_format/avro.rs     |   9 +-
 .../core/src/physical_plan/file_format/csv.rs |   6 +-
 .../src/physical_plan/file_format/json.rs     |  37 +++--
 .../core/src/physical_plan/file_format/mod.rs |   6 +-
 .../src/physical_plan/file_format/parquet.rs  |  20 ++-
 datafusion/core/src/test/mod.rs               |   5 +-
 datafusion/core/src/test/object_store.rs      |   7 +
 datafusion/core/tests/path_partition.rs       |  33 ++--
 datafusion/core/tests/row.rs                  |   1 -
 datafusion/proto/src/logical_plan.rs          |  10 +-
 dev/build-arrow-ballista.sh                   |   2 +-
 20 files changed, 152 insertions(+), 176 deletions(-)

diff --git a/benchmarks/src/bin/tpch.rs b/benchmarks/src/bin/tpch.rs
index 4e49bff09d9d..5d4895698ad0 100644
--- a/benchmarks/src/bin/tpch.rs
+++ b/benchmarks/src/bin/tpch.rs
@@ -44,7 +44,6 @@ use datafusion::{
 };
 use datafusion::{
     arrow::util::pretty,
-    datafusion_data_access::object_store::local::LocalFileSystem,
     datasource::listing::{ListingOptions, ListingTable, ListingTableConfig},
 };
@@ -427,7 +426,7 @@ fn get_table(
     };
 
     let table_path = ListingTableUrl::parse(path)?;
-    let config = ListingTableConfig::new(Arc::new(LocalFileSystem {}), table_path)
+    let config = ListingTableConfig::new(table_path)
         .with_listing_options(options)
         .with_schema(schema);
 
diff --git a/datafusion-examples/examples/flight_server.rs b/datafusion-examples/examples/flight_server.rs
index a3d7c0f56644..5dbd694ac0f5 100644
--- a/datafusion-examples/examples/flight_server.rs
+++ b/datafusion-examples/examples/flight_server.rs
@@ -19,7 +19,6 @@ use std::pin::Pin;
 use std::sync::Arc;
 
 use arrow_flight::SchemaAsIpc;
-use datafusion::datafusion_data_access::object_store::local::LocalFileSystem;
 use datafusion::datasource::file_format::parquet::ParquetFormat;
 use datafusion::datasource::listing::{ListingOptions, ListingTableUrl};
 use futures::Stream;
@@ -71,8 +70,9 @@ impl FlightService for FlightServiceImpl {
         let table_path =
             ListingTableUrl::parse(&request.path[0]).map_err(to_tonic_err)?;
 
+        let ctx = SessionContext::new();
         let schema = listing_options
-            .infer_schema(Arc::new(LocalFileSystem {}), &table_path)
+            .infer_schema(&ctx.state(), &table_path)
             .await
             .unwrap();
 
diff --git a/datafusion/core/benches/sort_limit_query_sql.rs b/datafusion/core/benches/sort_limit_query_sql.rs
index 198eb941f14d..e7aa33bd70bd 100644
--- a/datafusion/core/benches/sort_limit_query_sql.rs
+++ b/datafusion/core/benches/sort_limit_query_sql.rs
@@ -18,7 +18,6 @@
 #[macro_use]
 extern crate criterion;
 use criterion::Criterion;
-use datafusion::datafusion_data_access::object_store::local::LocalFileSystem;
 use datafusion::datasource::file_format::csv::CsvFormat;
 use datafusion::datasource::listing::{
     ListingOptions, ListingTable, ListingTableConfig, ListingTableUrl,
@@ -71,7 +70,7 @@ fn create_context() -> Arc<Mutex<SessionContext>> {
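The call sites above show the shape of the migration: `ListingTableConfig::new` takes only a `ListingTableUrl`, and schema inference resolves the store from the session state. A minimal sketch of the new pattern (the table name and path are hypothetical; a fresh `SessionContext` already provides the default `file://` store):

```rust
use std::sync::Arc;

use datafusion::datasource::file_format::parquet::ParquetFormat;
use datafusion::datasource::listing::{
    ListingOptions, ListingTable, ListingTableConfig, ListingTableUrl,
};
use datafusion::error::Result;
use datafusion::prelude::SessionContext;

async fn register_parquet_table(ctx: &SessionContext) -> Result<()> {
    // The URL alone now identifies both the store (by scheme) and the path
    let table_path = ListingTableUrl::parse("file:///data/my_table/")?;

    let options = ListingOptions::new(Arc::new(ParquetFormat::default()));

    // Inference resolves the ObjectStore from the SessionState's RuntimeEnv
    let schema = options.infer_schema(&ctx.state(), &table_path).await?;

    let config = ListingTableConfig::new(table_path)
        .with_listing_options(options)
        .with_schema(schema);

    ctx.register_table("my_table", Arc::new(ListingTable::try_new(config)?))?;
    Ok(())
}
```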
     // create CSV data source
     let listing_options = ListingOptions::new(Arc::new(CsvFormat::default()));
 
-    let config = ListingTableConfig::new(Arc::new(LocalFileSystem {}), table_path)
+    let config = ListingTableConfig::new(table_path)
         .with_listing_options(listing_options)
         .with_schema(schema);
 
diff --git a/datafusion/core/src/catalog/schema.rs b/datafusion/core/src/catalog/schema.rs
index 6771f3e8d2e9..db25c1edcc18 100644
--- a/datafusion/core/src/catalog/schema.rs
+++ b/datafusion/core/src/catalog/schema.rs
@@ -171,11 +171,10 @@ mod tests {
         let ctx = SessionContext::new();
 
         let store = Arc::new(LocalFileSystem {});
-        ctx.runtime_env()
-            .register_object_store("file", store.clone());
+        ctx.runtime_env().register_object_store("file", store);
 
-        let config = ListingTableConfig::new(store, table_path)
-            .infer()
+        let config = ListingTableConfig::new(table_path)
+            .infer(&ctx.state())
             .await
             .unwrap();
         let table = ListingTable::try_new(config).unwrap();
diff --git a/datafusion/core/src/datasource/file_format/mod.rs b/datafusion/core/src/datasource/file_format/mod.rs
index eae86fa9cca7..a15750394903 100644
--- a/datafusion/core/src/datasource/file_format/mod.rs
+++ b/datafusion/core/src/datasource/file_format/mod.rs
@@ -115,7 +115,6 @@ pub(crate) mod test_util {
         let exec = format
             .create_physical_plan(
                 FileScanConfig {
-                    object_store: store,
                     object_store_url: ObjectStoreUrl::local_filesystem(),
                     file_schema,
                     file_groups,
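With `object_store` gone from `FileScanConfig`, a scan configuration is plain data: it names a store via `ObjectStoreUrl` and defers the lookup to execution time. A sketch of constructing one under the new shape (schema, path, and sizes are placeholders; the field list is as of this commit):

```rust
use std::sync::Arc;

use arrow::datatypes::{DataType, Field, Schema};
use datafusion::datasource::listing::PartitionedFile;
use datafusion::datasource::object_store::ObjectStoreUrl;
use datafusion::physical_plan::file_format::FileScanConfig;
use datafusion::physical_plan::Statistics;

fn example_scan_config() -> FileScanConfig {
    FileScanConfig {
        // A URL, not an Arc<dyn ObjectStore>: the store itself is resolved
        // from the RuntimeEnv when the plan executes
        object_store_url: ObjectStoreUrl::local_filesystem(),
        file_schema: Arc::new(Schema::new(vec![Field::new(
            "c1",
            DataType::Utf8,
            true,
        )])),
        file_groups: vec![vec![PartitionedFile::new(
            "/data/file.parquet".to_string(),
            1024,
        )]],
        statistics: Statistics::default(),
        projection: None,
        limit: None,
        table_partition_cols: vec![],
    }
}
```

Because the config now carries serializable data rather than a handle to a live store, plans can cross process boundaries and be re-bound to whatever registry the executing context carries, which the Ballista pin update at the end of this patch suggests is the motivation.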
diff --git a/datafusion/core/src/datasource/listing/table.rs b/datafusion/core/src/datasource/listing/table.rs
index bc7ed26042a3..29cde3c9faab 100644
--- a/datafusion/core/src/datasource/listing/table.rs
+++ b/datafusion/core/src/datasource/listing/table.rs
@@ -45,14 +45,11 @@ use crate::{
 };
 
 use super::PartitionedFile;
-use datafusion_data_access::object_store::ObjectStore;
 
 use super::helpers::{expr_applicable_for_cols, pruned_partition_list, split_files};
 
 /// Configuration for creating a 'ListingTable'
 pub struct ListingTableConfig {
-    /// `ObjectStore` that contains the files for the `ListingTable`.
-    pub object_store: Arc<dyn ObjectStore>,
     /// Path on the `ObjectStore` for creating `ListingTable`.
     pub table_path: ListingTableUrl,
     /// Optional `SchemaRef` for the to be created `ListingTable`.
@@ -63,9 +60,8 @@ impl ListingTableConfig {
     /// Creates new `ListingTableConfig`. The `SchemaRef` and `ListingOptions` are inferred based on the suffix of the provided `table_path`.
-    pub fn new(object_store: Arc<dyn ObjectStore>, table_path: ListingTableUrl) -> Self {
+    pub fn new(table_path: ListingTableUrl) -> Self {
         Self {
-            object_store,
             table_path,
             file_schema: None,
             options: None,
@@ -74,7 +70,6 @@ impl ListingTableConfig {
     /// Add `schema` to `ListingTableConfig`
     pub fn with_schema(self, schema: SchemaRef) -> Self {
         Self {
-            object_store: self.object_store,
             table_path: self.table_path,
             file_schema: Some(schema),
             options: self.options,
@@ -84,7 +79,6 @@ impl ListingTableConfig {
     /// Add `listing_options` to `ListingTableConfig`
     pub fn with_listing_options(self, listing_options: ListingOptions) -> Self {
         Self {
-            object_store: self.object_store,
             table_path: self.table_path,
             file_schema: self.file_schema,
             options: Some(listing_options),
@@ -105,10 +99,12 @@ impl ListingTableConfig {
     }
 
     /// Infer `ListingOptions` based on `table_path` suffix.
-    pub async fn infer_options(self) -> Result<Self> {
+    pub async fn infer_options(self, ctx: &SessionState) -> Result<Self> {
+        let store = ctx.runtime_env.object_store(&self.table_path)?;
+
         let file = self
             .table_path
-            .list_all_files(self.object_store.as_ref(), "")
+            .list_all_files(store.as_ref(), "")
             .next()
             .await
             .ok_or_else(|| DataFusionError::Internal("No files for table".into()))??;
@@ -123,12 +119,11 @@ impl ListingTableConfig {
             format,
             collect_stat: true,
             file_extension: file_type.to_string(),
-            target_partitions: num_cpus::get(),
+            target_partitions: ctx.config.target_partitions,
             table_partition_cols: vec![],
         };
 
         Ok(Self {
-            object_store: self.object_store,
             table_path: self.table_path,
             file_schema: self.file_schema,
             options: Some(listing_options),
@@ -136,15 +131,12 @@ impl ListingTableConfig {
     }
 
     /// Infer `SchemaRef` based on `table_path` suffix. Requires `self.options` to be set prior to using.
-    pub async fn infer_schema(self) -> Result<Self> {
+    pub async fn infer_schema(self, ctx: &SessionState) -> Result<Self> {
         match self.options {
             Some(options) => {
-                let schema = options
-                    .infer_schema(self.object_store.clone(), &self.table_path)
-                    .await?;
+                let schema = options.infer_schema(ctx, &self.table_path).await?;
 
                 Ok(Self {
-                    object_store: self.object_store,
                     table_path: self.table_path,
                     file_schema: Some(schema),
                     options: Some(options),
@@ -157,8 +149,8 @@ impl ListingTableConfig {
     }
 
     /// Convenience wrapper for calling `infer_options` and `infer_schema`
-    pub async fn infer(self) -> Result<Self> {
-        self.infer_options().await?.infer_schema().await
+    pub async fn infer(self, ctx: &SessionState) -> Result<Self> {
+        self.infer_options(ctx).await?.infer_schema(ctx).await
     }
 }
@@ -212,11 +204,16 @@ impl ListingOptions {
     /// locally or ask a remote service to do it (e.g a scheduler).
     pub async fn infer_schema<'a>(
         &'a self,
-        store: Arc<dyn ObjectStore>,
+        ctx: &SessionState,
         table_path: &'a ListingTableUrl,
     ) -> Result<SchemaRef> {
-        let list_stream = table_path.list_all_files(store.as_ref(), &self.file_extension);
-        let files: Vec<_> = list_stream.try_collect().await?;
+        let store = ctx.runtime_env.object_store(table_path)?;
+
+        let files: Vec<_> = table_path
+            .list_all_files(store.as_ref(), &self.file_extension)
+            .try_collect()
+            .await?;
+
         self.format.infer_schema(&store, &files).await
     }
 }
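Note that `infer_options` now reads `target_partitions` from the session configuration instead of calling `num_cpus::get()`, so inferred `ListingOptions` follow whatever the session was built with. A sketch (the value 8 is arbitrary):

```rust
use datafusion::prelude::{SessionConfig, SessionContext};

fn context_with_eight_target_partitions() -> SessionContext {
    // Inferred ListingOptions::target_partitions will now be 8 for this
    // session, rather than the machine's CPU count
    SessionContext::with_config(SessionConfig::new().with_target_partitions(8))
}
```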
@@ -224,7 +221,6 @@ impl ListingOptions {
 
 /// An implementation of `TableProvider` that uses the object store
 /// or file system listing capability to get the list of files.
 pub struct ListingTable {
-    object_store: Arc<dyn ObjectStore>,
     table_path: ListingTableUrl,
     /// File fields only
     file_schema: SchemaRef,
@@ -261,8 +257,7 @@ impl ListingTable {
         }
 
         let table = Self {
-            object_store: config.object_store.clone(),
-            table_path: config.table_path.clone(),
+            table_path: config.table_path,
             file_schema,
             table_schema: Arc::new(Schema::new(table_fields)),
             options,
@@ -271,11 +266,6 @@ impl ListingTable {
         Ok(table)
     }
 
-    /// Get object store ref
-    pub fn object_store(&self) -> &Arc<dyn ObjectStore> {
-        &self.object_store
-    }
-
     /// Get path ref
     pub fn table_path(&self) -> &ListingTableUrl {
         &self.table_path
@@ -303,13 +293,13 @@ impl TableProvider for ListingTable {
 
     async fn scan(
         &self,
-        _ctx: &SessionState,
+        ctx: &SessionState,
         projection: &Option<Vec<usize>>,
         filters: &[Expr],
         limit: Option<usize>,
     ) -> Result<Arc<dyn ExecutionPlan>> {
         let (partitioned_file_lists, statistics) =
-            self.list_files_for_scan(filters, limit).await?;
+            self.list_files_for_scan(ctx, filters, limit).await?;
 
         // if no files need to be read, return an `EmptyExec`
         if partitioned_file_lists.is_empty() {
@@ -323,7 +313,6 @@ impl TableProvider for ListingTable {
             .format
             .create_physical_plan(
                 FileScanConfig {
-                    object_store: Arc::clone(&self.object_store),
                     object_store_url: self.table_path.object_store(),
                     file_schema: Arc::clone(&self.file_schema),
                     file_groups: partitioned_file_lists,
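`TableProvider::scan` now receives a usable `SessionState` (previously ignored as `_ctx`), which is how `ListingTable` reaches the store at scan time; custom providers can do the same. A hypothetical helper illustrating just the lookup, under the assumption that `SessionState::runtime_env` is a public field as the hunks above use it:

```rust
use std::sync::Arc;

use datafusion::datafusion_data_access::object_store::ObjectStore;
use datafusion::datasource::listing::ListingTableUrl;
use datafusion::error::Result;
use datafusion::execution::context::SessionState;

// Hypothetical helper mirroring what ListingTable::scan now does: resolve
// the store for a table URL from the state passed into scan()
fn store_for_scan(
    ctx: &SessionState,
    table_path: &ListingTableUrl,
) -> Result<Arc<dyn ObjectStore>> {
    ctx.runtime_env.object_store(table_path)
}
```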
@@ -358,12 +347,14 @@ impl ListingTable {
     /// be distributed to different threads / executors.
     async fn list_files_for_scan<'a>(
         &'a self,
+        ctx: &'a SessionState,
         filters: &'a [Expr],
         limit: Option<usize>,
     ) -> Result<(Vec<Vec<PartitionedFile>>, Statistics)> {
+        let store = ctx.runtime_env.object_store(&self.table_path)?;
         // list files (with partitions)
         let file_list = pruned_partition_list(
-            self.object_store.as_ref(),
+            store.as_ref(),
             &self.table_path,
             filters,
             &self.options.file_extension,
@@ -373,25 +364,17 @@ impl ListingTable {
 
         // collect the statistics if required by the config
         // TODO: Collect statistics and schema in single-pass
-        let object_store = Arc::clone(&self.object_store);
-        let files = file_list.then(move |part_file| {
-            let object_store = object_store.clone();
-            async move {
-                let part_file = part_file?;
-                let statistics = if self.options.collect_stat {
-                    self.options
-                        .format
-                        .infer_stats(
-                            &object_store,
-                            self.file_schema.clone(),
-                            &part_file.file_meta,
-                        )
-                        .await?
-                } else {
-                    Statistics::default()
-                };
-                Ok((part_file, statistics)) as Result<(PartitionedFile, Statistics)>
-            }
+        let files = file_list.then(|part_file| async {
+            let part_file = part_file?;
+            let statistics = if self.options.collect_stat {
+                self.options
+                    .format
+                    .infer_stats(&store, self.file_schema.clone(), &part_file.file_meta)
+                    .await?
+            } else {
+                Statistics::default()
+            };
+            Ok((part_file, statistics)) as Result<(PartitionedFile, Statistics)>
         });
 
         let (files, statistics) =
@@ -409,10 +392,9 @@ mod tests {
     use crate::datasource::file_format::avro::DEFAULT_AVRO_EXTENSION;
     use crate::prelude::SessionContext;
     use crate::{
-        datafusion_data_access::object_store::local::LocalFileSystem,
         datasource::file_format::{avro::AvroFormat, parquet::ParquetFormat},
         logical_plan::{col, lit},
-        test::{columns, object_store::TestObjectStore},
+        test::{columns, object_store::register_test_store},
     };
     use arrow::datatypes::DataType;
@@ -422,7 +404,7 @@ mod tests {
     async fn read_single_file() -> Result<()> {
         let ctx = SessionContext::new();
 
-        let table = load_table("alltypes_plain.parquet").await?;
+        let table = load_table(&ctx, "alltypes_plain.parquet").await?;
         let projection = None;
         let exec = table
             .scan(&ctx.state(), &projection, &[], None)
             .await?
@@ -444,17 +426,18 @@ mod tests {
         let testdata = crate::test_util::parquet_test_data();
         let filename = format!("{}/{}", testdata, "alltypes_plain.parquet");
         let table_path = ListingTableUrl::parse(filename).unwrap();
+
+        let ctx = SessionContext::new();
+        let state = ctx.state();
+
         let opt = ListingOptions::new(Arc::new(ParquetFormat::default()));
-        let schema = opt
-            .infer_schema(Arc::new(LocalFileSystem {}), &table_path)
-            .await?;
-        let config = ListingTableConfig::new(Arc::new(LocalFileSystem {}), table_path)
+        let schema = opt.infer_schema(&state, &table_path).await?;
+        let config = ListingTableConfig::new(table_path)
             .with_listing_options(opt)
             .with_schema(schema);
 
         let table = ListingTable::try_new(config)?;
 
-        let ctx = SessionContext::new();
-        let exec = table.scan(&ctx.state(), &None, &[], None).await?;
+        let exec = table.scan(&state, &None, &[], None).await?;
         assert_eq!(exec.statistics().num_rows, Some(8));
         assert_eq!(exec.statistics().total_byte_size, Some(671));
@@ -463,8 +446,9 @@ mod tests {
 
     #[tokio::test]
     async fn read_empty_table() -> Result<()> {
+        let ctx = SessionContext::new();
         let path = String::from("table/p1=v1/file.avro");
-        let store = TestObjectStore::new_arc(&[(&path, 100)]);
+        register_test_store(&ctx, &[(&path, 100)]);
 
         let opt = ListingOptions {
             file_extension: DEFAULT_AVRO_EXTENSION.to_owned(),
@@ -474,10 +458,10 @@ mod tests {
             collect_stat: true,
         };
 
-        let table_path = ListingTableUrl::parse("file:///table/").unwrap();
+        let table_path = ListingTableUrl::parse("test:///table/").unwrap();
         let file_schema =
             Arc::new(Schema::new(vec![Field::new("a", DataType::Boolean, false)]));
-        let config = ListingTableConfig::new(store, table_path)
+        let config = ListingTableConfig::new(table_path)
             .with_listing_options(opt)
             .with_schema(file_schema);
         let table = ListingTable::try_new(config)?;
@@ -490,7 +474,6 @@ mod tests {
 
         // this will filter out the only file in the store
         let filter = Expr::not_eq(col("p1"), lit("v1"));
-        let ctx = SessionContext::new();
         let scan = table
             .scan(&ctx.state(), &None, &[filter], None)
             .await
@@ -516,7 +499,7 @@ mod tests {
                 "bucket/key-prefix/file3",
                 "bucket/key-prefix/file4",
             ],
-            "file:///bucket/key-prefix/",
+            "test:///bucket/key-prefix/",
             12,
             5,
         )
@@ -530,7 +513,7 @@ mod tests {
                 "bucket/key-prefix/file2",
                 "bucket/key-prefix/file3",
             ],
-            "file:///bucket/key-prefix/",
+            "test:///bucket/key-prefix/",
             4,
             4,
         )
@@ -545,14 +528,14 @@ mod tests {
                 "bucket/key-prefix/file3",
                 "bucket/key-prefix/file4",
             ],
-            "file:///bucket/key-prefix/",
+            "test:///bucket/key-prefix/",
             2,
             2,
         )
         .await?;
"file:///bucket/key-prefix/", 2, 0) + assert_list_files_for_scan_grouping(&[], "test:///bucket/key-prefix/", 2, 0) .await?; // files that don't match the prefix @@ -562,7 +545,7 @@ mod tests { "bucket/key-prefix/file1", "bucket/other-prefix/roguefile", ], - "file:///bucket/key-prefix/", + "test:///bucket/key-prefix/", 10, 2, ) @@ -570,12 +553,16 @@ mod tests { Ok(()) } - async fn load_table(name: &str) -> Result> { + async fn load_table( + ctx: &SessionContext, + name: &str, + ) -> Result> { let testdata = crate::test_util::parquet_test_data(); let filename = format!("{}/{}", testdata, name); let table_path = ListingTableUrl::parse(filename).unwrap(); - let config = ListingTableConfig::new(Arc::new(LocalFileSystem {}), table_path) - .infer() + + let config = ListingTableConfig::new(table_path) + .infer(&ctx.state()) .await?; let table = ListingTable::try_new(config)?; Ok(Arc::new(table)) @@ -589,8 +576,8 @@ mod tests { target_partitions: usize, output_partitioning: usize, ) -> Result<()> { - let mock_store = - TestObjectStore::new_arc(&files.iter().map(|f| (*f, 10)).collect::>()); + let ctx = SessionContext::new(); + register_test_store(&ctx, &files.iter().map(|f| (*f, 10)).collect::>()); let format = AvroFormat {}; @@ -605,13 +592,13 @@ mod tests { let schema = Schema::new(vec![Field::new("a", DataType::Boolean, false)]); let table_path = ListingTableUrl::parse(table_prefix).unwrap(); - let config = ListingTableConfig::new(mock_store, table_path) + let config = ListingTableConfig::new(table_path) .with_listing_options(opt) .with_schema(Arc::new(schema)); let table = ListingTable::try_new(config)?; - let (file_list, _) = table.list_files_for_scan(&[], None).await?; + let (file_list, _) = table.list_files_for_scan(&ctx.state(), &[], None).await?; assert_eq!(file_list.len(), output_partitioning); diff --git a/datafusion/core/src/execution/context.rs b/datafusion/core/src/execution/context.rs index 29049fcdfbcc..d5b372c368fc 100644 --- a/datafusion/core/src/execution/context.rs +++ b/datafusion/core/src/execution/context.rs @@ -521,7 +521,6 @@ impl SessionContext { options: AvroReadOptions<'_>, ) -> Result> { let table_path = ListingTableUrl::parse(table_path)?; - let object_store = self.runtime_env().object_store(&table_path)?; let target_partitions = self.copied_config().target_partitions; let listing_options = options.to_listing_options(target_partitions); @@ -530,12 +529,12 @@ impl SessionContext { Some(s) => s, None => { listing_options - .infer_schema(Arc::clone(&object_store), &table_path) + .infer_schema(&self.state(), &table_path) .await? } }; - let config = ListingTableConfig::new(object_store, table_path) + let config = ListingTableConfig::new(table_path) .with_listing_options(listing_options) .with_schema(resolved_schema); let provider = ListingTable::try_new(config)?; @@ -549,7 +548,6 @@ impl SessionContext { options: NdJsonReadOptions<'_>, ) -> Result> { let table_path = ListingTableUrl::parse(table_path)?; - let object_store = self.runtime_env().object_store(&table_path)?; let target_partitions = self.copied_config().target_partitions; let listing_options = options.to_listing_options(target_partitions); @@ -558,11 +556,11 @@ impl SessionContext { Some(s) => s, None => { listing_options - .infer_schema(Arc::clone(&object_store), &table_path) + .infer_schema(&self.state(), &table_path) .await? 
diff --git a/datafusion/core/src/execution/context.rs b/datafusion/core/src/execution/context.rs
index 29049fcdfbcc..d5b372c368fc 100644
--- a/datafusion/core/src/execution/context.rs
+++ b/datafusion/core/src/execution/context.rs
@@ -521,7 +521,6 @@ impl SessionContext {
         options: AvroReadOptions<'_>,
     ) -> Result<Arc<DataFrame>> {
         let table_path = ListingTableUrl::parse(table_path)?;
-        let object_store = self.runtime_env().object_store(&table_path)?;
         let target_partitions = self.copied_config().target_partitions;
 
         let listing_options = options.to_listing_options(target_partitions);
@@ -530,12 +529,12 @@ impl SessionContext {
             Some(s) => s,
             None => {
                 listing_options
-                    .infer_schema(Arc::clone(&object_store), &table_path)
+                    .infer_schema(&self.state(), &table_path)
                     .await?
             }
         };
 
-        let config = ListingTableConfig::new(object_store, table_path)
+        let config = ListingTableConfig::new(table_path)
             .with_listing_options(listing_options)
             .with_schema(resolved_schema);
         let provider = ListingTable::try_new(config)?;
@@ -549,7 +548,6 @@ impl SessionContext {
         options: NdJsonReadOptions<'_>,
     ) -> Result<Arc<DataFrame>> {
         let table_path = ListingTableUrl::parse(table_path)?;
-        let object_store = self.runtime_env().object_store(&table_path)?;
         let target_partitions = self.copied_config().target_partitions;
 
         let listing_options = options.to_listing_options(target_partitions);
@@ -558,11 +556,11 @@ impl SessionContext {
             Some(s) => s,
             None => {
                 listing_options
-                    .infer_schema(Arc::clone(&object_store), &table_path)
+                    .infer_schema(&self.state(), &table_path)
                     .await?
             }
         };
-        let config = ListingTableConfig::new(object_store, table_path)
+        let config = ListingTableConfig::new(table_path)
             .with_listing_options(listing_options)
             .with_schema(resolved_schema);
         let provider = ListingTable::try_new(config)?;
@@ -585,18 +583,17 @@ impl SessionContext {
         options: CsvReadOptions<'_>,
     ) -> Result<Arc<DataFrame>> {
         let table_path = ListingTableUrl::parse(table_path)?;
-        let object_store = self.runtime_env().object_store(&table_path)?;
         let target_partitions = self.copied_config().target_partitions;
         let listing_options = options.to_listing_options(target_partitions);
         let resolved_schema = match options.schema {
             Some(s) => Arc::new(s.to_owned()),
             None => {
                 listing_options
-                    .infer_schema(Arc::clone(&object_store), &table_path)
+                    .infer_schema(&self.state(), &table_path)
                     .await?
             }
         };
-        let config = ListingTableConfig::new(object_store, table_path.clone())
+        let config = ListingTableConfig::new(table_path.clone())
             .with_listing_options(listing_options)
             .with_schema(resolved_schema);
 
@@ -611,17 +608,16 @@ impl SessionContext {
         options: ParquetReadOptions<'_>,
     ) -> Result<Arc<DataFrame>> {
         let table_path = ListingTableUrl::parse(table_path)?;
-        let object_store = self.runtime_env().object_store(&table_path)?;
         let target_partitions = self.copied_config().target_partitions;
 
         let listing_options = options.to_listing_options(target_partitions);
 
         // with parquet we resolve the schema in all cases
         let resolved_schema = listing_options
-            .infer_schema(Arc::clone(&object_store), &table_path)
+            .infer_schema(&self.state(), &table_path)
             .await?;
 
-        let config = ListingTableConfig::new(object_store, table_path)
+        let config = ListingTableConfig::new(table_path)
             .with_listing_options(listing_options)
             .with_schema(resolved_schema);
 
@@ -649,16 +645,11 @@ impl SessionContext {
         provided_schema: Option<SchemaRef>,
     ) -> Result<()> {
         let table_path = ListingTableUrl::parse(table_path)?;
-        let object_store = self.runtime_env().object_store(&table_path)?;
         let resolved_schema = match provided_schema {
-            None => {
-                options
-                    .infer_schema(Arc::clone(&object_store), &table_path)
-                    .await?
-            }
+            None => options.infer_schema(&self.state(), &table_path).await?,
             Some(s) => s,
         };
-        let config = ListingTableConfig::new(object_store, table_path)
+        let config = ListingTableConfig::new(table_path)
             .with_listing_options(options)
             .with_schema(resolved_schema);
         let table = ListingTable::try_new(config)?;
diff --git a/datafusion/core/src/execution/runtime_env.rs b/datafusion/core/src/execution/runtime_env.rs
index 26d1471a1df5..2f134990fec7 100644
--- a/datafusion/core/src/execution/runtime_env.rs
+++ b/datafusion/core/src/execution/runtime_env.rs
@@ -100,7 +100,7 @@ impl RuntimeEnv {
             .register_store(scheme, object_store)
     }
 
-    /// Retrieves a `ObjectStore` instance by scheme
+    /// Retrieves an `ObjectStore` instance for a URL
     pub fn object_store(&self, url: impl AsRef<Url>) -> Result<Arc<dyn ObjectStore>> {
         self.object_store_registry
             .get_by_url(url)
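The doc change reflects the lookup contract: the registry is keyed by scheme but queried with a full URL. A sketch of both outcomes, assuming only the default `file` store is registered:

```rust
use datafusion::datasource::listing::ListingTableUrl;
use datafusion::prelude::SessionContext;

fn lookup_by_url() {
    let ctx = SessionContext::new();

    // The local filesystem store is registered under "file" by default
    let local = ListingTableUrl::parse("file:///tmp/data/").unwrap();
    assert!(ctx.runtime_env().object_store(&local).is_ok());

    // A scheme with no registered store yields an Err, not a panic
    let remote = ListingTableUrl::parse("s3://bucket/data/").unwrap();
    assert!(ctx.runtime_env().object_store(&remote).is_err());
}
```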
             projection: Some(vec![0, 1, file_schema.fields().len(), 2]),
-            object_store,
             object_store_url,
             file_groups: vec![vec![partitioned_file]],
             file_schema,
diff --git a/datafusion/core/src/physical_plan/file_format/csv.rs b/datafusion/core/src/physical_plan/file_format/csv.rs
index b78662e46713..7dddb70e95a9 100644
--- a/datafusion/core/src/physical_plan/file_format/csv.rs
+++ b/datafusion/core/src/physical_plan/file_format/csv.rs
@@ -137,8 +137,12 @@ impl ExecutionPlan for CsvExec {
             )) as BatchIter
         };
 
+        let object_store = context
+            .runtime_env()
+            .object_store(&self.base_config.object_store_url)?;
+
         Ok(Box::pin(FileStream::new(
-            Arc::clone(&self.base_config.object_store),
+            object_store,
             self.base_config.file_groups[partition].clone(),
             fun,
             Arc::clone(&self.projected_schema),
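`AvroExec` and `CsvExec` above, and `NdJsonExec` and `ParquetExec` below, all follow the same pattern: the store is resolved at the top of `execute` from the `TaskContext` instead of being carried in the plan. The lookup in isolation (a hypothetical helper; `base_config` is the plan's `FileScanConfig`):

```rust
use std::sync::Arc;

use datafusion::datafusion_data_access::object_store::ObjectStore;
use datafusion::error::Result;
use datafusion::execution::context::TaskContext;
use datafusion::physical_plan::file_format::FileScanConfig;

// Hypothetical helper mirroring the lookup each file-format ExecutionPlan
// now performs when a partition is executed
fn resolve_store(
    base_config: &FileScanConfig,
    context: &Arc<TaskContext>,
) -> Result<Arc<dyn ObjectStore>> {
    context
        .runtime_env()
        .object_store(&base_config.object_store_url)
}
```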
diff --git a/datafusion/core/src/physical_plan/file_format/json.rs b/datafusion/core/src/physical_plan/file_format/json.rs
index 3a179a7a29fb..397fee5fe234 100644
--- a/datafusion/core/src/physical_plan/file_format/json.rs
+++ b/datafusion/core/src/physical_plan/file_format/json.rs
@@ -116,8 +116,12 @@ impl ExecutionPlan for NdJsonExec {
                 as BatchIter
         };
 
+        let object_store = context
+            .runtime_env()
+            .object_store(&self.base_config.object_store_url)?;
+
         Ok(Box::pin(FileStream::new(
-            Arc::clone(&self.base_config.object_store),
+            object_store,
             self.base_config.file_groups[partition].clone(),
             fun,
             Arc::clone(&self.projected_schema),
@@ -192,28 +196,24 @@ mod tests {
     use arrow::datatypes::{Field, Schema};
     use futures::StreamExt;
 
-    use crate::datafusion_data_access::object_store::local::LocalFileSystem;
     use crate::datasource::file_format::{json::JsonFormat, FileFormat};
     use crate::datasource::listing::PartitionedFile;
     use crate::datasource::object_store::ObjectStoreUrl;
     use crate::prelude::NdJsonReadOptions;
     use crate::prelude::*;
     use datafusion_data_access::object_store::local::local_unpartitioned_file;
-    use datafusion_data_access::object_store::ObjectStore;
     use tempfile::TempDir;
 
     use super::*;
 
     const TEST_DATA_BASE: &str = "tests/jsons";
 
-    async fn prepare_store() -> (
-        Arc<dyn ObjectStore>,
-        ObjectStoreUrl,
-        Vec<Vec<PartitionedFile>>,
-        SchemaRef,
-    ) {
-        let store = Arc::new(LocalFileSystem {}) as _;
+    async fn prepare_store(
+        ctx: &SessionContext,
+    ) -> (ObjectStoreUrl, Vec<Vec<PartitionedFile>>, SchemaRef) {
         let store_url = ObjectStoreUrl::local_filesystem();
+        let store = ctx.runtime_env().object_store(&store_url).unwrap();
+
         let path = format!("{}/1.json", TEST_DATA_BASE);
         let meta = local_unpartitioned_file(path);
 
         let schema = JsonFormat::default()
@@ -221,7 +221,7 @@ mod tests {
             .await
             .unwrap();
 
-        (store, store_url, vec![vec![meta.into()]], schema)
+        (store_url, vec![vec![meta.into()]], schema)
     }
 
     #[tokio::test]
@@ -230,11 +230,10 @@ mod tests {
         let task_ctx = session_ctx.task_ctx();
         use arrow::datatypes::DataType;
 
-        let (object_store, object_store_url, file_groups, file_schema) =
-            prepare_store().await;
+        let (object_store_url, file_groups, file_schema) =
+            prepare_store(&session_ctx).await;
 
         let exec = NdJsonExec::new(FileScanConfig {
-            object_store,
             object_store_url,
             file_groups,
             file_schema,
@@ -289,8 +288,8 @@ mod tests {
         let session_ctx = SessionContext::new();
         let task_ctx = session_ctx.task_ctx();
         use arrow::datatypes::DataType;
-        let (object_store, object_store_url, file_groups, actual_schema) =
-            prepare_store().await;
+        let (object_store_url, file_groups, actual_schema) =
+            prepare_store(&session_ctx).await;
 
         let mut fields = actual_schema.fields().clone();
         fields.push(Field::new("missing_col", DataType::Int32, true));
 
         let file_schema = Arc::new(Schema::new(fields));
 
         let exec = NdJsonExec::new(FileScanConfig {
-            object_store,
             object_store_url,
             file_groups,
             file_schema,
@@ -330,11 +328,10 @@ mod tests {
     async fn nd_json_exec_file_projection() -> Result<()> {
         let session_ctx = SessionContext::new();
         let task_ctx = session_ctx.task_ctx();
-        let (object_store, object_store_url, file_groups, file_schema) =
-            prepare_store().await;
+        let (object_store_url, file_groups, file_schema) =
+            prepare_store(&session_ctx).await;
 
         let exec = NdJsonExec::new(FileScanConfig {
-            object_store,
             object_store_url,
             file_groups,
             file_schema,
diff --git a/datafusion/core/src/physical_plan/file_format/mod.rs b/datafusion/core/src/physical_plan/file_format/mod.rs
index 0fed497d3c61..86015c472463 100644
--- a/datafusion/core/src/physical_plan/file_format/mod.rs
+++ b/datafusion/core/src/physical_plan/file_format/mod.rs
@@ -45,7 +45,6 @@ use crate::{
 };
 use arrow::array::{new_null_array, UInt16BufferBuilder};
 use arrow::record_batch::RecordBatchOptions;
-use datafusion_data_access::object_store::ObjectStore;
 use lazy_static::lazy_static;
 use log::info;
 use std::{
@@ -66,8 +65,6 @@ lazy_static! {
 /// any given file format.
 #[derive(Debug, Clone)]
 pub struct FileScanConfig {
-    /// Store from which the `files` should be fetched
-    pub object_store: Arc<dyn ObjectStore>,
     /// Object store URL
     pub object_store_url: ObjectStoreUrl,
     /// Schema before projection. It contains the columns that are expected
@@ -402,7 +399,7 @@ fn create_dict_array(
 #[cfg(test)]
 mod tests {
     use crate::{
-        test::{build_table_i32, columns, object_store::TestObjectStore},
+        test::{build_table_i32, columns},
         test_util::aggr_test_schema,
     };
 
@@ -659,7 +656,6 @@ mod tests {
             file_schema,
             file_groups: vec![vec![]],
             limit: None,
-            object_store: TestObjectStore::new_arc(&[]),
             object_store_url: ObjectStoreUrl::parse("test:///").unwrap(),
             projection,
             statistics,
diff --git a/datafusion/core/src/physical_plan/file_format/parquet.rs b/datafusion/core/src/physical_plan/file_format/parquet.rs
index 81c4dd4279b4..b1e629be5357 100644
--- a/datafusion/core/src/physical_plan/file_format/parquet.rs
+++ b/datafusion/core/src/physical_plan/file_format/parquet.rs
@@ -215,11 +215,15 @@ impl ExecutionPlan for ParquetExec {
             &self.base_config.table_partition_cols,
         );
 
+        let object_store = context
+            .runtime_env()
+            .object_store(&self.base_config.object_store_url)?;
+
         let stream = ParquetExecStream {
             error: false,
             partition_index,
             metrics: self.metrics.clone(),
-            object_store: self.base_config.object_store.clone(),
+            object_store,
             pruning_predicate: self.pruning_predicate.clone(),
             batch_size: context.session_config().batch_size,
             schema: self.projected_schema.clone(),
@@ -686,7 +690,6 @@ mod tests {
         // prepare the scan
         let parquet_exec = ParquetExec::new(
             FileScanConfig {
-                object_store: Arc::new(LocalFileSystem {}),
                 object_store_url: ObjectStoreUrl::local_filesystem(),
                 file_groups: vec![file_groups],
                 file_schema,
@@ -1073,7 +1076,6 @@ mod tests {
     ) -> Result<()> {
         let parquet_exec = ParquetExec::new(
             FileScanConfig {
-                object_store: Arc::new(LocalFileSystem {}),
                 object_store_url: ObjectStoreUrl::local_filesystem(),
                 file_groups,
                 file_schema,
@@ -1139,9 +1141,15 @@ mod tests {
     async fn parquet_exec_with_partition() -> Result<()> {
         let session_ctx = SessionContext::new();
         let task_ctx = session_ctx.task_ctx();
+
+        let object_store_url = ObjectStoreUrl::local_filesystem();
+        let store = session_ctx
+            .runtime_env()
+            .object_store(&object_store_url)
+            .unwrap();
+
         let testdata = crate::test_util::parquet_test_data();
         let filename = format!("{}/alltypes_plain.parquet", testdata);
-        let store = Arc::new(LocalFileSystem {}) as _;
 
         let meta = local_unpartitioned_file(filename);
 
@@ -1162,8 +1170,7 @@ mod tests {
 
         let parquet_exec = ParquetExec::new(
             FileScanConfig {
-                object_store: store,
-                object_store_url: ObjectStoreUrl::local_filesystem(),
+                object_store_url,
                 file_groups: vec![vec![partitioned_file]],
                 file_schema: schema,
                 statistics: Statistics::default(),
@@ -1222,7 +1229,6 @@ mod tests {
 
         let parquet_exec = ParquetExec::new(
             FileScanConfig {
-                object_store: Arc::new(LocalFileSystem {}),
                 object_store_url: ObjectStoreUrl::local_filesystem(),
                 file_groups: vec![vec![partitioned_file]],
                 file_schema: Arc::new(Schema::empty()),
diff --git a/datafusion/core/src/test/mod.rs b/datafusion/core/src/test/mod.rs
index 19a9db9a4ed2..dd00a50286e0 100644
--- a/datafusion/core/src/test/mod.rs
+++ b/datafusion/core/src/test/mod.rs
@@ -29,9 +29,7 @@ use array::{Array, ArrayRef};
 use arrow::array::{self, DecimalBuilder, Int32Array};
 use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
 use arrow::record_batch::RecordBatch;
-use datafusion_data_access::object_store::local::{
-    local_unpartitioned_file, LocalFileSystem,
-};
+use datafusion_data_access::object_store::local::local_unpartitioned_file;
 use futures::{Future, FutureExt};
 use std::fs::File;
 use std::io::prelude::*;
@@ -120,7 +118,6 @@ pub fn partitioned_csv_config(
     };
 
     Ok(FileScanConfig {
-        object_store: Arc::new(LocalFileSystem {}),
         object_store_url: ObjectStoreUrl::local_filesystem(),
         file_schema: schema,
         file_groups,
diff --git a/datafusion/core/src/test/object_store.rs b/datafusion/core/src/test/object_store.rs
index 95a6f16ed3c6..fdd05334670f 100644
--- a/datafusion/core/src/test/object_store.rs
+++ b/datafusion/core/src/test/object_store.rs
@@ -26,9 +26,16 @@ use crate::datafusion_data_access::{
     object_store::{FileMetaStream, ListEntryStream, ObjectReader, ObjectStore},
     FileMeta, Result, SizedFile,
 };
+use crate::prelude::SessionContext;
 use async_trait::async_trait;
 use futures::{stream, AsyncRead, StreamExt};
 
+/// Registers a test object store with the provided `ctx`
+pub(crate) fn register_test_store(ctx: &SessionContext, files: &[(&str, u64)]) {
+    ctx.runtime_env()
+        .register_object_store("test", TestObjectStore::new_arc(files));
+}
+
 #[derive(Debug)]
 /// An object store implem that is useful for testing.
 /// `ObjectReader`s are filled with zero bytes.
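With the helper above, tests wire fake files into a context once and address them through `test:///` URLs instead of threading a store through every config. A sketch of the intended usage (the helper is `pub(crate)`, so this only compiles inside the `datafusion` crate itself; the file names are arbitrary):

```rust
// Inside datafusion's own test code
use crate::datasource::listing::ListingTableUrl;
use crate::prelude::SessionContext;
use crate::test::object_store::register_test_store;

#[tokio::test]
async fn resolves_registered_test_files() {
    let ctx = SessionContext::new();
    register_test_store(&ctx, &[("bucket/key/file1", 100), ("bucket/key/file2", 100)]);

    // The fake files are now visible under the "test" scheme
    let url = ListingTableUrl::parse("test:///bucket/key/").unwrap();
    assert!(ctx.runtime_env().object_store(&url).is_ok());
}
```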
diff --git a/datafusion/core/tests/path_partition.rs b/datafusion/core/tests/path_partition.rs
index a7e6ea45252d..e3da9d986e0d 100644
--- a/datafusion/core/tests/path_partition.rs
+++ b/datafusion/core/tests/path_partition.rs
@@ -54,7 +54,7 @@ async fn parquet_distinct_partition_col() -> Result<()> {
             "year=2021/month=10/day=28/file.parquet",
         ],
         &["year", "month", "day"],
-        "",
+        "mirror:///",
         "alltypes_plain.parquet",
     )
     .await;
@@ -183,7 +183,7 @@ async fn csv_filter_with_file_col() -> Result<()> {
             "mytable/date=2021-10-28/file.csv",
         ],
         &["date"],
-        "file:///mytable",
+        "mirror:///mytable",
     );
 
     let result = ctx
@@ -219,7 +219,7 @@ async fn csv_projection_on_partition() -> Result<()> {
             "mytable/date=2021-10-28/file.csv",
         ],
         &["date"],
-        "file:///mytable",
+        "mirror:///mytable",
     );
 
     let result = ctx
@@ -256,7 +256,7 @@ async fn csv_grouping_by_partition() -> Result<()> {
             "mytable/date=2021-10-28/file.csv",
         ],
         &["date"],
-        "file:///mytable",
+        "mirror:///mytable",
     );
 
     let result = ctx
@@ -290,7 +290,7 @@ async fn parquet_multiple_partitions() -> Result<()> {
             "year=2021/month=10/day=28/file.parquet",
         ],
         &["year", "month", "day"],
-        "",
+        "mirror:///",
         "alltypes_plain.parquet",
     )
     .await;
@@ -332,7 +332,7 @@ async fn parquet_statistics() -> Result<()> {
             "year=2021/month=10/day=28/file.parquet",
         ],
         &["year", "month", "day"],
-        "",
+        "mirror:///",
         // This is the only file we found in the test set with
         // actual stats. It has 1 column / 1 row.
         "single_nan.parquet",
@@ -392,7 +392,7 @@ async fn parquet_overlapping_columns() -> Result<()> {
             "id=3/file.parquet",
         ],
         &["id"],
-        "",
+        "mirror:///",
         "alltypes_plain.parquet",
     )
     .await;
@@ -415,13 +415,16 @@ fn register_partitioned_aggregate_csv(
     let testdata = arrow_test_data();
     let csv_file_path = format!("{}/csv/aggregate_test_100.csv", testdata);
     let file_schema = test_util::aggr_test_schema();
-    let object_store = MirroringObjectStore::new_arc(csv_file_path, store_paths);
+    ctx.runtime_env().register_object_store(
+        "mirror",
+        MirroringObjectStore::new_arc(csv_file_path, store_paths),
+    );
 
     let mut options = ListingOptions::new(Arc::new(CsvFormat::default()));
     options.table_partition_cols = partition_cols.iter().map(|&s| s.to_owned()).collect();
 
     let table_path = ListingTableUrl::parse(table_path).unwrap();
-    let config = ListingTableConfig::new(object_store, table_path)
+    let config = ListingTableConfig::new(table_path)
         .with_listing_options(options)
         .with_schema(file_schema);
     let table = ListingTable::try_new(config).unwrap();
@@ -439,23 +442,25 @@ async fn register_partitioned_alltypes_parquet(
 ) {
     let testdata = parquet_test_data();
     let parquet_file_path = format!("{}/{}", testdata, source_file);
-    let object_store =
-        MirroringObjectStore::new_arc(parquet_file_path.clone(), store_paths);
+    ctx.runtime_env().register_object_store(
+        "mirror",
+        MirroringObjectStore::new_arc(parquet_file_path.clone(), store_paths),
+    );
 
     let mut options = ListingOptions::new(Arc::new(ParquetFormat::default()));
     options.table_partition_cols = partition_cols.iter().map(|&s| s.to_owned()).collect();
     options.collect_stat = true;
 
-    let table_path = ListingTableUrl::parse(format!("mirror:///{}", table_path)).unwrap();
+    let table_path = ListingTableUrl::parse(table_path).unwrap();
     let store_path =
         ListingTableUrl::parse(format!("mirror:///{}", store_paths[0])).unwrap();
 
     let file_schema = options
-        .infer_schema(Arc::clone(&object_store), &store_path)
+        .infer_schema(&ctx.state(), &store_path)
         .await
         .expect("Parquet schema inference failed");
 
-    let config = ListingTableConfig::new(object_store, table_path)
+    let config = ListingTableConfig::new(table_path)
         .with_listing_options(options)
         .with_schema(file_schema);
diff --git a/datafusion/core/tests/row.rs b/datafusion/core/tests/row.rs
index 947ebc699b48..1de6af06ad2c 100644
--- a/datafusion/core/tests/row.rs
+++ b/datafusion/core/tests/row.rs
@@ -97,7 +97,6 @@ async fn get_exec(
     let exec = format
         .create_physical_plan(
             FileScanConfig {
-                object_store,
                 object_store_url,
                 file_schema,
                 file_groups,
diff --git a/datafusion/proto/src/logical_plan.rs b/datafusion/proto/src/logical_plan.rs
index 4eff00e01e8d..c6ab11458ff4 100644
--- a/datafusion/proto/src/logical_plan.rs
+++ b/datafusion/proto/src/logical_plan.rs
@@ -420,15 +420,7 @@ impl AsLogicalPlan for LogicalPlanNode {
                     target_partitions: scan.target_partitions as usize,
                 };
 
-                let object_store = ctx.runtime_env().object_store(&table_path)?;
-
-                println!(
-                    "Found object store {:?} for path {}",
-                    object_store,
-                    scan.path.as_str()
-                );
-
-                let config = ListingTableConfig::new(object_store, table_path)
+                let config = ListingTableConfig::new(table_path)
                     .with_listing_options(options)
                     .with_schema(Arc::new(schema));
 
diff --git a/dev/build-arrow-ballista.sh b/dev/build-arrow-ballista.sh
index 1d287c46019c..82c0be1099fb 100755
--- a/dev/build-arrow-ballista.sh
+++ b/dev/build-arrow-ballista.sh
@@ -24,7 +24,7 @@ rm -rf arrow-ballista 2>/dev/null
 
 # clone the repo
 # TODO make repo/branch configurable
-git clone https://github.com/tustvold/arrow-ballista -b session-state-table-provider
+git clone https://github.com/tustvold/arrow-ballista -b remove-object-store-plans
 
 # update dependencies to local crates
 python ./dev/make-ballista-deps-local.py