From 12a63163a9ed872a8df82f8a23e6c4253f3eb8b9 Mon Sep 17 00:00:00 2001 From: Chih Wang Date: Thu, 26 Oct 2023 05:06:47 +0800 Subject: [PATCH] Add `parquet` feature flag, enabled by default, and make parquet conditional (#7745) * Make parquet an option by adding multiple cfg attributes without significant code changes. * Extract parquet logic into submodule from execution::context * Extract parquet logic into submodule from datafusion_core::dataframe * Extract more logic into submodule from execution::context * Move tests from execution::context * Rename submodules --- datafusion-cli/Cargo.toml | 2 +- datafusion/common/Cargo.toml | 3 +- datafusion/common/src/test_util.rs | 1 + datafusion/core/Cargo.toml | 10 +- .../src/{dataframe.rs => dataframe/mod.rs} | 151 +------- datafusion/core/src/dataframe/parquet.rs | 162 ++++++++ .../file_format/file_compression_type.rs | 18 +- .../core/src/datasource/file_format/mod.rs | 1 + .../src/datasource/file_format/options.rs | 7 +- .../core/src/datasource/listing/table.rs | 13 +- .../src/datasource/listing_table_factory.rs | 8 +- datafusion/core/src/datasource/mod.rs | 1 + .../core/src/datasource/physical_plan/mod.rs | 6 +- datafusion/core/src/execution/context/avro.rs | 83 ++++ datafusion/core/src/execution/context/csv.rs | 143 +++++++ datafusion/core/src/execution/context/json.rs | 69 ++++ .../execution/{context.rs => context/mod.rs} | 363 +----------------- .../core/src/execution/context/parquet.rs | 154 ++++++++ datafusion/core/src/lib.rs | 1 + .../enforce_distribution.rs | 67 +++- datafusion/core/src/physical_planner.rs | 2 + datafusion/core/src/test_util/mod.rs | 77 +++- datafusion/proto/Cargo.toml | 3 +- datafusion/proto/src/logical_plan/mod.rs | 66 ++-- datafusion/proto/src/physical_plan/mod.rs | 222 ++++++----- 25 files changed, 994 insertions(+), 639 deletions(-) rename datafusion/core/src/{dataframe.rs => dataframe/mod.rs} (94%) create mode 100644 datafusion/core/src/dataframe/parquet.rs create mode 100644 datafusion/core/src/execution/context/avro.rs create mode 100644 datafusion/core/src/execution/context/csv.rs create mode 100644 datafusion/core/src/execution/context/json.rs rename datafusion/core/src/execution/{context.rs => context/mod.rs} (87%) create mode 100644 datafusion/core/src/execution/context/parquet.rs diff --git a/datafusion-cli/Cargo.toml b/datafusion-cli/Cargo.toml index 64e094437c5f..7dd9cb8bcb37 100644 --- a/datafusion-cli/Cargo.toml +++ b/datafusion-cli/Cargo.toml @@ -34,7 +34,7 @@ async-trait = "0.1.41" aws-config = "0.55" aws-credential-types = "0.55" clap = { version = "3", features = ["derive", "cargo"] } -datafusion = { path = "../datafusion/core", version = "32.0.0", features = ["avro", "crypto_expressions", "encoding_expressions", "regex_expressions", "unicode_expressions", "compression"] } +datafusion = { path = "../datafusion/core", version = "32.0.0", features = ["avro", "crypto_expressions", "encoding_expressions", "parquet", "regex_expressions", "unicode_expressions", "compression"] } dirs = "4.0.0" env_logger = "0.9" mimalloc = { version = "0.1", default-features = false } diff --git a/datafusion/common/Cargo.toml b/datafusion/common/Cargo.toml index 047c502d5cc2..490fbeacad85 100644 --- a/datafusion/common/Cargo.toml +++ b/datafusion/common/Cargo.toml @@ -35,8 +35,7 @@ path = "src/lib.rs" [features] avro = ["apache-avro"] backtrace = [] -default = ["parquet"] -pyarrow = ["pyo3", "arrow/pyarrow"] +pyarrow = ["pyo3", "arrow/pyarrow", "parquet"] [dependencies] ahash = { version = "0.8", default-features = false, 
features = ["runtime-rng"] } diff --git a/datafusion/common/src/test_util.rs b/datafusion/common/src/test_util.rs index 60f1df7fd11a..9a4433782157 100644 --- a/datafusion/common/src/test_util.rs +++ b/datafusion/common/src/test_util.rs @@ -180,6 +180,7 @@ pub fn arrow_test_data() -> String { /// let filename = format!("{}/binary.parquet", testdata); /// assert!(std::path::PathBuf::from(filename).exists()); /// ``` +#[cfg(feature = "parquet")] pub fn parquet_test_data() -> String { match get_data_dir("PARQUET_TEST_DATA", "../../parquet-testing/data") { Ok(pb) => pb.display().to_string(), diff --git a/datafusion/core/Cargo.toml b/datafusion/core/Cargo.toml index 30e0d005e92e..5f9d28bd620b 100644 --- a/datafusion/core/Cargo.toml +++ b/datafusion/core/Cargo.toml @@ -39,11 +39,12 @@ avro = ["apache-avro", "num-traits", "datafusion-common/avro"] backtrace = ["datafusion-common/backtrace"] compression = ["xz2", "bzip2", "flate2", "zstd", "async-compression"] crypto_expressions = ["datafusion-physical-expr/crypto_expressions", "datafusion-optimizer/crypto_expressions"] -default = ["crypto_expressions", "encoding_expressions", "regex_expressions", "unicode_expressions", "compression"] +default = ["crypto_expressions", "encoding_expressions", "regex_expressions", "unicode_expressions", "compression", "parquet"] encoding_expressions = ["datafusion-physical-expr/encoding_expressions"] # Used for testing ONLY: causes all values to hash to the same value (test for collisions) force_hash_collisions = [] -pyarrow = ["datafusion-common/pyarrow"] +parquet = ["datafusion-common/parquet", "dep:parquet"] +pyarrow = ["datafusion-common/pyarrow", "parquet"] regex_expressions = ["datafusion-physical-expr/regex_expressions", "datafusion-optimizer/regex_expressions"] serde = ["arrow-schema/serde"] simd = ["arrow/simd"] @@ -61,7 +62,7 @@ bytes = "1.4" bzip2 = { version = "0.4.3", optional = true } chrono = { workspace = true } dashmap = "5.4.0" -datafusion-common = { path = "../common", version = "32.0.0", features = ["parquet", "object_store"] } +datafusion-common = { path = "../common", version = "32.0.0", features = ["object_store"], default-features = false } datafusion-execution = { path = "../execution", version = "32.0.0" } datafusion-expr = { path = "../expr", version = "32.0.0" } datafusion-optimizer = { path = "../optimizer", version = "32.0.0", default-features = false } @@ -80,7 +81,7 @@ num-traits = { version = "0.2", optional = true } num_cpus = "1.13.0" object_store = "0.7.0" parking_lot = "0.12" -parquet = { workspace = true } +parquet = { workspace = true, optional = true } percent-encoding = "2.2.0" pin-project-lite = "^0.2.7" rand = "0.8" @@ -93,7 +94,6 @@ uuid = { version = "1.0", features = ["v4"] } xz2 = { version = "0.1", optional = true } zstd = { version = "0.13", optional = true, default-features = false } - [dev-dependencies] async-trait = "0.1.53" bigdecimal = "0.4.1" diff --git a/datafusion/core/src/dataframe.rs b/datafusion/core/src/dataframe/mod.rs similarity index 94% rename from datafusion/core/src/dataframe.rs rename to datafusion/core/src/dataframe/mod.rs index 2e192c2a782e..0a99c331826c 100644 --- a/datafusion/core/src/dataframe.rs +++ b/datafusion/core/src/dataframe/mod.rs @@ -17,6 +17,9 @@ //! [`DataFrame`] API for building and executing query plans. 
+#[cfg(feature = "parquet")] +mod parquet; + use std::any::Any; use std::sync::Arc; @@ -27,15 +30,11 @@ use arrow::datatypes::{DataType, Field}; use async_trait::async_trait; use datafusion_common::file_options::csv_writer::CsvWriterOptions; use datafusion_common::file_options::json_writer::JsonWriterOptions; -use datafusion_common::file_options::parquet_writer::{ - default_builder, ParquetWriterOptions, -}; use datafusion_common::parsers::CompressionTypeVariant; use datafusion_common::{ DataFusionError, FileType, FileTypeWriterOptions, SchemaError, UnnestOptions, }; use datafusion_expr::dml::CopyOptions; -use parquet::file::properties::WriterProperties; use datafusion_common::{Column, DFSchema, ScalarValue}; use datafusion_expr::{ @@ -1067,40 +1066,6 @@ impl DataFrame { DataFrame::new(self.session_state, plan).collect().await } - /// Write a `DataFrame` to a Parquet file. - pub async fn write_parquet( - self, - path: &str, - options: DataFrameWriteOptions, - writer_properties: Option, - ) -> Result, DataFusionError> { - if options.overwrite { - return Err(DataFusionError::NotImplemented( - "Overwrites are not implemented for DataFrame::write_parquet.".to_owned(), - )); - } - match options.compression{ - CompressionTypeVariant::UNCOMPRESSED => (), - _ => return Err(DataFusionError::Configuration("DataFrame::write_parquet method does not support compression set via DataFrameWriteOptions. Set parquet compression via writer_properties instead.".to_owned())) - } - let props = match writer_properties { - Some(props) => props, - None => default_builder(self.session_state.config_options())?.build(), - }; - let file_type_writer_options = - FileTypeWriterOptions::Parquet(ParquetWriterOptions::new(props)); - let copy_options = CopyOptions::WriterOptions(Box::new(file_type_writer_options)); - let plan = LogicalPlanBuilder::copy_to( - self.plan, - path.into(), - FileType::PARQUET, - options.single_file_output, - copy_options, - )? - .build()?; - DataFrame::new(self.session_state, plan).collect().await - } - /// Executes a query and writes the results to a partitioned JSON file. 
pub async fn write_json( self, @@ -1365,19 +1330,12 @@ mod tests { WindowFunction, }; use datafusion_physical_expr::expressions::Column; - use object_store::local::LocalFileSystem; - use parquet::basic::{BrotliLevel, GzipLevel, ZstdLevel}; - use parquet::file::reader::FileReader; - use tempfile::TempDir; - use url::Url; use crate::execution::context::SessionConfig; - use crate::execution::options::{CsvReadOptions, ParquetReadOptions}; use crate::physical_plan::ColumnarValue; use crate::physical_plan::Partitioning; use crate::physical_plan::PhysicalExpr; - use crate::test_util; - use crate::test_util::parquet_test_data; + use crate::test_util::{register_aggregate_csv, test_table, test_table_with_name}; use crate::{assert_batches_sorted_eq, execution::context::SessionContext}; use super::*; @@ -1798,31 +1756,6 @@ mod tests { Ok(ctx.sql(sql).await?.into_unoptimized_plan()) } - async fn test_table_with_name(name: &str) -> Result { - let mut ctx = SessionContext::new(); - register_aggregate_csv(&mut ctx, name).await?; - ctx.table(name).await - } - - async fn test_table() -> Result { - test_table_with_name("aggregate_test_100").await - } - - async fn register_aggregate_csv( - ctx: &mut SessionContext, - table_name: &str, - ) -> Result<()> { - let schema = test_util::aggr_test_schema(); - let testdata = test_util::arrow_test_data(); - ctx.register_csv( - table_name, - &format!("{testdata}/csv/aggregate_test_100.csv"), - CsvReadOptions::new().schema(schema.as_ref()), - ) - .await?; - Ok(()) - } - #[tokio::test] async fn with_column() -> Result<()> { let df = test_table().await?.select_columns(&["c1", "c2", "c3"])?; @@ -2227,33 +2160,6 @@ mod tests { Ok(()) } - #[tokio::test] - async fn filter_pushdown_dataframe() -> Result<()> { - let ctx = SessionContext::new(); - - ctx.register_parquet( - "test", - &format!("{}/alltypes_plain.snappy.parquet", parquet_test_data()), - ParquetReadOptions::default(), - ) - .await?; - - ctx.register_table("t1", ctx.table("test").await?.into_view())?; - - let df = ctx - .table("t1") - .await? - .filter(col("id").eq(lit(1)))? 
- .select_columns(&["bool_col", "int_col"])?; - - let plan = df.explain(false, false)?.collect().await?; - // Filters all the way to Parquet - let formatted = pretty::pretty_format_batches(&plan)?.to_string(); - assert!(formatted.contains("FilterExec: id@0 = 1")); - - Ok(()) - } - #[tokio::test] async fn cast_expr_test() -> Result<()> { let df = test_table() @@ -2538,53 +2444,4 @@ mod tests { Ok(()) } - - #[tokio::test] - async fn write_parquet_with_compression() -> Result<()> { - let test_df = test_table().await?; - - let output_path = "file://local/test.parquet"; - let test_compressions = vec![ - parquet::basic::Compression::SNAPPY, - parquet::basic::Compression::LZ4, - parquet::basic::Compression::LZ4_RAW, - parquet::basic::Compression::GZIP(GzipLevel::default()), - parquet::basic::Compression::BROTLI(BrotliLevel::default()), - parquet::basic::Compression::ZSTD(ZstdLevel::default()), - ]; - for compression in test_compressions.into_iter() { - let df = test_df.clone(); - let tmp_dir = TempDir::new()?; - let local = Arc::new(LocalFileSystem::new_with_prefix(&tmp_dir)?); - let local_url = Url::parse("file://local").unwrap(); - let ctx = &test_df.session_state; - ctx.runtime_env().register_object_store(&local_url, local); - df.write_parquet( - output_path, - DataFrameWriteOptions::new().with_single_file_output(true), - Some( - WriterProperties::builder() - .set_compression(compression) - .build(), - ), - ) - .await?; - - // Check that file actually used the specified compression - let file = std::fs::File::open(tmp_dir.into_path().join("test.parquet"))?; - - let reader = - parquet::file::serialized_reader::SerializedFileReader::new(file) - .unwrap(); - - let parquet_metadata = reader.metadata(); - - let written_compression = - parquet_metadata.row_group(0).column(0).compression(); - - assert_eq!(written_compression, compression); - } - - Ok(()) - } } diff --git a/datafusion/core/src/dataframe/parquet.rs b/datafusion/core/src/dataframe/parquet.rs new file mode 100644 index 000000000000..36ef90c987e3 --- /dev/null +++ b/datafusion/core/src/dataframe/parquet.rs @@ -0,0 +1,162 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use datafusion_common::file_options::parquet_writer::{ + default_builder, ParquetWriterOptions, +}; +use parquet::file::properties::WriterProperties; + +use super::{ + CompressionTypeVariant, CopyOptions, DataFrame, DataFrameWriteOptions, + DataFusionError, FileType, FileTypeWriterOptions, LogicalPlanBuilder, RecordBatch, +}; + +impl DataFrame { + /// Write a `DataFrame` to a Parquet file. 
+ pub async fn write_parquet( + self, + path: &str, + options: DataFrameWriteOptions, + writer_properties: Option, + ) -> Result, DataFusionError> { + if options.overwrite { + return Err(DataFusionError::NotImplemented( + "Overwrites are not implemented for DataFrame::write_parquet.".to_owned(), + )); + } + match options.compression{ + CompressionTypeVariant::UNCOMPRESSED => (), + _ => return Err(DataFusionError::Configuration("DataFrame::write_parquet method does not support compression set via DataFrameWriteOptions. Set parquet compression via writer_properties instead.".to_owned())) + } + let props = match writer_properties { + Some(props) => props, + None => default_builder(self.session_state.config_options())?.build(), + }; + let file_type_writer_options = + FileTypeWriterOptions::Parquet(ParquetWriterOptions::new(props)); + let copy_options = CopyOptions::WriterOptions(Box::new(file_type_writer_options)); + let plan = LogicalPlanBuilder::copy_to( + self.plan, + path.into(), + FileType::PARQUET, + options.single_file_output, + copy_options, + )? + .build()?; + DataFrame::new(self.session_state, plan).collect().await + } +} + +#[cfg(test)] +mod tests { + use std::sync::Arc; + + use object_store::local::LocalFileSystem; + use parquet::basic::{BrotliLevel, GzipLevel, ZstdLevel}; + use parquet::file::reader::FileReader; + use tempfile::TempDir; + use url::Url; + + use datafusion_expr::{col, lit}; + + use crate::arrow::util::pretty; + use crate::execution::context::SessionContext; + use crate::execution::options::ParquetReadOptions; + use crate::test_util; + + use super::super::Result; + use super::*; + + #[tokio::test] + async fn filter_pushdown_dataframe() -> Result<()> { + let ctx = SessionContext::new(); + + ctx.register_parquet( + "test", + &format!( + "{}/alltypes_plain.snappy.parquet", + test_util::parquet_test_data() + ), + ParquetReadOptions::default(), + ) + .await?; + + ctx.register_table("t1", ctx.table("test").await?.into_view())?; + + let df = ctx + .table("t1") + .await? + .filter(col("id").eq(lit(1)))? 
+ .select_columns(&["bool_col", "int_col"])?; + + let plan = df.explain(false, false)?.collect().await?; + // Filters all the way to Parquet + let formatted = pretty::pretty_format_batches(&plan)?.to_string(); + assert!(formatted.contains("FilterExec: id@0 = 1")); + + Ok(()) + } + + #[tokio::test] + async fn write_parquet_with_compression() -> Result<()> { + let test_df = test_util::test_table().await?; + + let output_path = "file://local/test.parquet"; + let test_compressions = vec![ + parquet::basic::Compression::SNAPPY, + parquet::basic::Compression::LZ4, + parquet::basic::Compression::LZ4_RAW, + parquet::basic::Compression::GZIP(GzipLevel::default()), + parquet::basic::Compression::BROTLI(BrotliLevel::default()), + parquet::basic::Compression::ZSTD(ZstdLevel::default()), + ]; + for compression in test_compressions.into_iter() { + let df = test_df.clone(); + let tmp_dir = TempDir::new()?; + let local = Arc::new(LocalFileSystem::new_with_prefix(&tmp_dir)?); + let local_url = Url::parse("file://local").unwrap(); + let ctx = &test_df.session_state; + ctx.runtime_env().register_object_store(&local_url, local); + df.write_parquet( + output_path, + DataFrameWriteOptions::new().with_single_file_output(true), + Some( + WriterProperties::builder() + .set_compression(compression) + .build(), + ), + ) + .await?; + + // Check that file actually used the specified compression + let file = std::fs::File::open(tmp_dir.into_path().join("test.parquet"))?; + + let reader = + parquet::file::serialized_reader::SerializedFileReader::new(file) + .unwrap(); + + let parquet_metadata = reader.metadata(); + + let written_compression = + parquet_metadata.row_group(0).column(0).compression(); + + assert_eq!(written_compression, compression); + } + + Ok(()) + } +} diff --git a/datafusion/core/src/datasource/file_format/file_compression_type.rs b/datafusion/core/src/datasource/file_format/file_compression_type.rs index bd2868767090..3dac7c293050 100644 --- a/datafusion/core/src/datasource/file_format/file_compression_type.rs +++ b/datafusion/core/src/datasource/file_format/file_compression_type.rs @@ -237,7 +237,14 @@ impl FileTypeExt for FileType { match self { FileType::JSON | FileType::CSV => Ok(format!("{}{}", ext, c.get_ext())), - FileType::PARQUET | FileType::AVRO | FileType::ARROW => match c.variant { + FileType::AVRO | FileType::ARROW => match c.variant { + UNCOMPRESSED => Ok(ext), + _ => Err(DataFusionError::Internal( + "FileCompressionType can be specified for CSV/JSON FileType.".into(), + )), + }, + #[cfg(feature = "parquet")] + FileType::PARQUET => match c.variant { UNCOMPRESSED => Ok(ext), _ => Err(DataFusionError::Internal( "FileCompressionType can be specified for CSV/JSON FileType.".into(), @@ -276,10 +283,13 @@ mod tests { ); } + let mut ty_ext_tuple = vec![]; + ty_ext_tuple.push((FileType::AVRO, ".avro")); + #[cfg(feature = "parquet")] + ty_ext_tuple.push((FileType::PARQUET, ".parquet")); + // Cannot specify compression for these file types - for (file_type, extension) in - [(FileType::AVRO, ".avro"), (FileType::PARQUET, ".parquet")] - { + for (file_type, extension) in ty_ext_tuple { assert_eq!( file_type .get_ext_with_compression(FileCompressionType::UNCOMPRESSED) diff --git a/datafusion/core/src/datasource/file_format/mod.rs b/datafusion/core/src/datasource/file_format/mod.rs index 293f062d86a9..b541e2a1d44c 100644 --- a/datafusion/core/src/datasource/file_format/mod.rs +++ b/datafusion/core/src/datasource/file_format/mod.rs @@ -27,6 +27,7 @@ pub mod csv; pub mod file_compression_type; pub mod 
json; pub mod options; +#[cfg(feature = "parquet")] pub mod parquet; pub mod write; diff --git a/datafusion/core/src/datasource/file_format/options.rs b/datafusion/core/src/datasource/file_format/options.rs index 40d9878a0134..41a70e6d2f8f 100644 --- a/datafusion/core/src/datasource/file_format/options.rs +++ b/datafusion/core/src/datasource/file_format/options.rs @@ -25,12 +25,12 @@ use datafusion_common::{plan_err, DataFusionError}; use crate::datasource::file_format::arrow::ArrowFormat; use crate::datasource::file_format::file_compression_type::FileCompressionType; +#[cfg(feature = "parquet")] +use crate::datasource::file_format::parquet::ParquetFormat; use crate::datasource::file_format::DEFAULT_SCHEMA_INFER_MAX_RECORD; use crate::datasource::listing::{ListingTableInsertMode, ListingTableUrl}; use crate::datasource::{ - file_format::{ - avro::AvroFormat, csv::CsvFormat, json::JsonFormat, parquet::ParquetFormat, - }, + file_format::{avro::AvroFormat, csv::CsvFormat, json::JsonFormat}, listing::ListingOptions, }; use crate::error::Result; @@ -542,6 +542,7 @@ impl ReadOptions<'_> for CsvReadOptions<'_> { } } +#[cfg(feature = "parquet")] #[async_trait] impl ReadOptions<'_> for ParquetReadOptions<'_> { fn to_listing_options(&self, config: &SessionConfig) -> ListingOptions { diff --git a/datafusion/core/src/datasource/listing/table.rs b/datafusion/core/src/datasource/listing/table.rs index bd878932d80f..822a78a5522a 100644 --- a/datafusion/core/src/datasource/listing/table.rs +++ b/datafusion/core/src/datasource/listing/table.rs @@ -23,6 +23,8 @@ use std::{any::Any, sync::Arc}; use super::helpers::{expr_applicable_for_cols, pruned_partition_list, split_files}; use super::PartitionedFile; +#[cfg(feature = "parquet")] +use crate::datasource::file_format::parquet::ParquetFormat; use crate::datasource::{ file_format::{ arrow::ArrowFormat, @@ -30,7 +32,6 @@ use crate::datasource::{ csv::CsvFormat, file_compression_type::{FileCompressionType, FileTypeExt}, json::JsonFormat, - parquet::ParquetFormat, FileFormat, }, get_statistics_with_limit, @@ -150,6 +151,7 @@ impl ListingTableConfig { FileType::JSON => Arc::new( JsonFormat::default().with_file_compression_type(file_compression_type), ), + #[cfg(feature = "parquet")] FileType::PARQUET => Arc::new(ParquetFormat::default()), }; @@ -1019,15 +1021,15 @@ mod tests { use std::fs::File; use super::*; + #[cfg(feature = "parquet")] + use crate::datasource::file_format::parquet::ParquetFormat; use crate::datasource::{provider_as_source, MemTable}; use crate::execution::options::ArrowReadOptions; use crate::physical_plan::collect; use crate::prelude::*; use crate::{ assert_batches_eq, - datasource::file_format::{ - avro::AvroFormat, file_compression_type::FileTypeExt, parquet::ParquetFormat, - }, + datasource::file_format::{avro::AvroFormat, file_compression_type::FileTypeExt}, execution::options::ReadOptions, logical_expr::{col, lit}, test::{columns, object_store::register_test_store}, @@ -1090,6 +1092,7 @@ mod tests { Ok(()) } + #[cfg(feature = "parquet")] #[tokio::test] async fn load_table_stats_by_default() -> Result<()> { let testdata = crate::test_util::parquet_test_data(); @@ -1113,6 +1116,7 @@ mod tests { Ok(()) } + #[cfg(feature = "parquet")] #[tokio::test] async fn load_table_stats_when_no_stats() -> Result<()> { let testdata = crate::test_util::parquet_test_data(); @@ -1137,6 +1141,7 @@ mod tests { Ok(()) } + #[cfg(feature = "parquet")] #[tokio::test] async fn test_try_create_output_ordering() { let testdata = 
crate::test_util::parquet_test_data(); diff --git a/datafusion/core/src/datasource/listing_table_factory.rs b/datafusion/core/src/datasource/listing_table_factory.rs index e74bf6fa6499..26f40518979a 100644 --- a/datafusion/core/src/datasource/listing_table_factory.rs +++ b/datafusion/core/src/datasource/listing_table_factory.rs @@ -23,10 +23,11 @@ use std::sync::Arc; use super::listing::ListingTableInsertMode; +#[cfg(feature = "parquet")] +use crate::datasource::file_format::parquet::ParquetFormat; use crate::datasource::file_format::{ arrow::ArrowFormat, avro::AvroFormat, csv::CsvFormat, - file_compression_type::FileCompressionType, json::JsonFormat, parquet::ParquetFormat, - FileFormat, + file_compression_type::FileCompressionType, json::JsonFormat, FileFormat, }; use crate::datasource::listing::{ ListingOptions, ListingTable, ListingTableConfig, ListingTableUrl, @@ -79,6 +80,7 @@ impl TableProviderFactory for ListingTableFactory { .with_delimiter(cmd.delimiter as u8) .with_file_compression_type(file_compression_type), ), + #[cfg(feature = "parquet")] FileType::PARQUET => Arc::new(ParquetFormat::default()), FileType::AVRO => Arc::new(AvroFormat), FileType::JSON => Arc::new( @@ -157,6 +159,7 @@ impl TableProviderFactory for ListingTableFactory { Some(mode) => ListingTableInsertMode::from_str(mode.as_str()), None => match file_type { FileType::CSV => Ok(ListingTableInsertMode::AppendToFile), + #[cfg(feature = "parquet")] FileType::PARQUET => Ok(ListingTableInsertMode::AppendNewFiles), FileType::AVRO => Ok(ListingTableInsertMode::AppendNewFiles), FileType::JSON => Ok(ListingTableInsertMode::AppendToFile), @@ -196,6 +199,7 @@ impl TableProviderFactory for ListingTableFactory { json_writer_options.compression = cmd.file_compression_type; FileTypeWriterOptions::JSON(json_writer_options) } + #[cfg(feature = "parquet")] FileType::PARQUET => file_type_writer_options, FileType::ARROW => file_type_writer_options, FileType::AVRO => file_type_writer_options, diff --git a/datafusion/core/src/datasource/mod.rs b/datafusion/core/src/datasource/mod.rs index 455818056f2c..3ace2c239852 100644 --- a/datafusion/core/src/datasource/mod.rs +++ b/datafusion/core/src/datasource/mod.rs @@ -42,5 +42,6 @@ pub use self::memory::MemTable; pub use self::provider::TableProvider; pub use self::view::ViewTable; pub use crate::logical_expr::TableType; +#[cfg(feature = "parquet")] pub(crate) use statistics::get_col_stats; pub use statistics::get_statistics_with_limit; diff --git a/datafusion/core/src/datasource/physical_plan/mod.rs b/datafusion/core/src/datasource/physical_plan/mod.rs index 57844aac5181..3f84f87eb5d5 100644 --- a/datafusion/core/src/datasource/physical_plan/mod.rs +++ b/datafusion/core/src/datasource/physical_plan/mod.rs @@ -23,17 +23,20 @@ mod csv; mod file_scan_config; mod file_stream; mod json; +#[cfg(feature = "parquet")] pub mod parquet; pub(crate) use self::csv::plan_to_csv; pub use self::csv::{CsvConfig, CsvExec, CsvOpener}; -pub(crate) use self::file_scan_config::PartitionColumnProjector; pub(crate) use self::json::plan_to_json; +#[cfg(feature = "parquet")] pub(crate) use self::parquet::plan_to_parquet; +#[cfg(feature = "parquet")] pub use self::parquet::{ParquetExec, ParquetFileMetrics, ParquetFileReaderFactory}; pub use arrow_file::ArrowExec; pub use avro::AvroExec; +use file_scan_config::PartitionColumnProjector; pub use file_scan_config::{ wrap_partition_type_in_dict, wrap_partition_value_in_dict, FileScanConfig, }; @@ -798,6 +801,7 @@ mod tests { } /// Unit tests for 
`repartition_file_groups()` + #[cfg(feature = "parquet")] mod repartition_file_groups_test { use datafusion_common::Statistics; use itertools::Itertools; diff --git a/datafusion/core/src/execution/context/avro.rs b/datafusion/core/src/execution/context/avro.rs new file mode 100644 index 000000000000..d60e79862ef2 --- /dev/null +++ b/datafusion/core/src/execution/context/avro.rs @@ -0,0 +1,83 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::sync::Arc; + +use super::super::options::{AvroReadOptions, ReadOptions}; +use super::{DataFilePaths, DataFrame, Result, SessionContext}; + +impl SessionContext { + /// Creates a [`DataFrame`] for reading an Avro data source. + /// + /// For more control such as reading multiple files, you can use + /// [`read_table`](Self::read_table) with a [`super::ListingTable`]. + /// + /// For an example, see [`read_csv`](Self::read_csv) + pub async fn read_avro( + &self, + table_paths: P, + options: AvroReadOptions<'_>, + ) -> Result { + self._read_type(table_paths, options).await + } + + /// Registers an Avro file as a table that can be referenced from + /// SQL statements executed against this context. + pub async fn register_avro( + &self, + name: &str, + table_path: &str, + options: AvroReadOptions<'_>, + ) -> Result<()> { + let listing_options = options.to_listing_options(&self.copied_config()); + + self.register_listing_table( + name, + table_path, + listing_options, + options.schema.map(|s| Arc::new(s.to_owned())), + None, + ) + .await?; + Ok(()) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use async_trait::async_trait; + + // Test for compilation error when calling read_* functions from an #[async_trait] function. + // See https://github.com/apache/arrow-datafusion/issues/1154 + #[async_trait] + trait CallReadTrait { + async fn call_read_avro(&self) -> DataFrame; + } + + struct CallRead {} + + #[async_trait] + impl CallReadTrait for CallRead { + async fn call_read_avro(&self) -> DataFrame { + let ctx = SessionContext::new(); + ctx.read_avro("dummy", AvroReadOptions::default()) + .await + .unwrap() + } + } +} diff --git a/datafusion/core/src/execution/context/csv.rs b/datafusion/core/src/execution/context/csv.rs new file mode 100644 index 000000000000..f3675422c7d5 --- /dev/null +++ b/datafusion/core/src/execution/context/csv.rs @@ -0,0 +1,143 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::sync::Arc;
+
+use crate::datasource::physical_plan::plan_to_csv;
+
+use super::super::options::{CsvReadOptions, ReadOptions};
+use super::{DataFilePaths, DataFrame, ExecutionPlan, Result, SessionContext};
+
+impl SessionContext {
+    /// Creates a [`DataFrame`] for reading a CSV data source.
+    ///
+    /// For more control such as reading multiple files, you can use
+    /// [`read_table`](Self::read_table) with a [`super::ListingTable`].
+    ///
+    /// Example usage is given below:
+    ///
+    /// ```
+    /// use datafusion::prelude::*;
+    /// # use datafusion::error::Result;
+    /// # #[tokio::main]
+    /// # async fn main() -> Result<()> {
+    /// let ctx = SessionContext::new();
+    /// // You can read a single file using `read_csv`
+    /// let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
+    /// // you can also read multiple files:
+    /// let df = ctx.read_csv(vec!["tests/data/example.csv", "tests/data/example.csv"], CsvReadOptions::new()).await?;
+    /// # Ok(())
+    /// # }
+    /// ```
+    pub async fn read_csv<P: DataFilePaths>(
+        &self,
+        table_paths: P,
+        options: CsvReadOptions<'_>,
+    ) -> Result<DataFrame> {
+        self._read_type(table_paths, options).await
+    }
+
+    /// Registers a CSV file as a table which can be referenced from SQL
+    /// statements executed against this context.
+    pub async fn register_csv(
+        &self,
+        name: &str,
+        table_path: &str,
+        options: CsvReadOptions<'_>,
+    ) -> Result<()> {
+        let listing_options = options.to_listing_options(&self.copied_config());
+
+        self.register_listing_table(
+            name,
+            table_path,
+            listing_options,
+            options.schema.map(|s| Arc::new(s.to_owned())),
+            None,
+        )
+        .await?;
+
+        Ok(())
+    }
+
+    /// Executes a query and writes the results to a partitioned CSV file.
+    pub async fn write_csv(
+        &self,
+        plan: Arc<dyn ExecutionPlan>,
+        path: impl AsRef<str>,
+    ) -> Result<()> {
+        plan_to_csv(self.task_ctx(), plan, path).await
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use crate::assert_batches_eq;
+    use crate::test_util::{plan_and_collect, populate_csv_partitions};
+    use async_trait::async_trait;
+    use tempfile::TempDir;
+
+    #[tokio::test]
+    async fn query_csv_with_custom_partition_extension() -> Result<()> {
+        let tmp_dir = TempDir::new()?;
+
+        // The main stipulation of this test: use a file extension that isn't .csv.
+        let file_extension = ".tst";
+
+        let ctx = SessionContext::new();
+        let schema = populate_csv_partitions(&tmp_dir, 2, file_extension)?;
+        ctx.register_csv(
+            "test",
+            tmp_dir.path().to_str().unwrap(),
+            CsvReadOptions::new()
+                .schema(&schema)
+                .file_extension(file_extension),
+        )
+        .await?;
+        let results =
+            plan_and_collect(&ctx, "SELECT SUM(c1), SUM(c2), COUNT(*) FROM test").await?;
+
+        assert_eq!(results.len(), 1);
+        let expected = [
+            "+--------------+--------------+----------+",
+            "| SUM(test.c1) | SUM(test.c2) | COUNT(*) |",
+            "+--------------+--------------+----------+",
+            "| 10           | 110          | 20       |",
+            "+--------------+--------------+----------+",
+        ];
+        assert_batches_eq!(expected, &results);
+
+        Ok(())
+    }
+
+    // Test for compilation error when calling read_* functions from an #[async_trait] function.
+    // See https://github.com/apache/arrow-datafusion/issues/1154
+    #[async_trait]
+    trait CallReadTrait {
+        async fn call_read_csv(&self) -> DataFrame;
+    }
+
+    struct CallRead {}
+
+    #[async_trait]
+    impl CallReadTrait for CallRead {
+        async fn call_read_csv(&self) -> DataFrame {
+            let ctx = SessionContext::new();
+            ctx.read_csv("dummy", CsvReadOptions::new()).await.unwrap()
+        }
+    }
+}
diff --git a/datafusion/core/src/execution/context/json.rs b/datafusion/core/src/execution/context/json.rs
new file mode 100644
index 000000000000..f67693aa8f31
--- /dev/null
+++ b/datafusion/core/src/execution/context/json.rs
@@ -0,0 +1,69 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::sync::Arc;
+
+use crate::datasource::physical_plan::plan_to_json;
+
+use super::super::options::{NdJsonReadOptions, ReadOptions};
+use super::{DataFilePaths, DataFrame, ExecutionPlan, Result, SessionContext};
+
+impl SessionContext {
+    /// Creates a [`DataFrame`] for reading a JSON data source.
+    ///
+    /// For more control such as reading multiple files, you can use
+    /// [`read_table`](Self::read_table) with a [`super::ListingTable`].
+    ///
+    /// For an example, see [`read_csv`](Self::read_csv)
+    pub async fn read_json<P: DataFilePaths>(
+        &self,
+        table_paths: P,
+        options: NdJsonReadOptions<'_>,
+    ) -> Result<DataFrame> {
+        self._read_type(table_paths, options).await
+    }
+
+    /// Registers a JSON file as a table that can be referenced
+    /// from SQL statements executed against this context.
+    pub async fn register_json(
+        &self,
+        name: &str,
+        table_path: &str,
+        options: NdJsonReadOptions<'_>,
+    ) -> Result<()> {
+        let listing_options = options.to_listing_options(&self.copied_config());
+
+        self.register_listing_table(
+            name,
+            table_path,
+            listing_options,
+            options.schema.map(|s| Arc::new(s.to_owned())),
+            None,
+        )
+        .await?;
+        Ok(())
+    }
+
+    /// Executes a query and writes the results to a partitioned JSON file.
+    pub async fn write_json(
+        &self,
+        plan: Arc<dyn ExecutionPlan>,
+        path: impl AsRef<str>,
+    ) -> Result<()> {
+        plan_to_json(self.task_ctx(), plan, path).await
+    }
+}
diff --git a/datafusion/core/src/execution/context.rs b/datafusion/core/src/execution/context/mod.rs
similarity index 87%
rename from datafusion/core/src/execution/context.rs
rename to datafusion/core/src/execution/context/mod.rs
index 8bd4de742d69..d523c39ee01e 100644
--- a/datafusion/core/src/execution/context.rs
+++ b/datafusion/core/src/execution/context/mod.rs
@@ -16,6 +16,13 @@
 // under the License.
 
 //!
[`SessionContext`] contains methods for registering data sources and executing queries + +mod avro; +mod csv; +mod json; +#[cfg(feature = "parquet")] +mod parquet; + use crate::{ catalog::{CatalogList, MemoryCatalogList}, datasource::{ @@ -77,7 +84,6 @@ use datafusion_sql::{ use sqlparser::dialect::dialect_from_str; use crate::config::ConfigOptions; -use crate::datasource::physical_plan::{plan_to_csv, plan_to_json, plan_to_parquet}; use crate::execution::{runtime_env::RuntimeEnv, FunctionRegistry}; use crate::physical_plan::udaf::AggregateUDF; use crate::physical_plan::udf::ScalarUDF; @@ -92,7 +98,6 @@ use datafusion_sql::{ parser::DFParser, planner::{ContextProvider, SqlToRel}, }; -use parquet::file::properties::WriterProperties; use url::Url; use crate::catalog::information_schema::{InformationSchemaProvider, INFORMATION_SCHEMA}; @@ -110,9 +115,7 @@ use crate::execution::options::ArrowReadOptions; pub use datafusion_execution::config::SessionConfig; pub use datafusion_execution::TaskContext; -use super::options::{ - AvroReadOptions, CsvReadOptions, NdJsonReadOptions, ParquetReadOptions, ReadOptions, -}; +use super::options::ReadOptions; /// DataFilePaths adds a method to convert strings and vector of strings to vector of [`ListingTableUrl`] URLs. /// This allows methods such [`SessionContext::read_csv`] and [`SessionContext::read_avro`] @@ -856,34 +859,6 @@ impl SessionContext { self.read_table(Arc::new(provider)) } - /// Creates a [`DataFrame`] for reading an Avro data source. - /// - /// For more control such as reading multiple files, you can use - /// [`read_table`](Self::read_table) with a [`ListingTable`]. - /// - /// For an example, see [`read_csv`](Self::read_csv) - pub async fn read_avro( - &self, - table_paths: P, - options: AvroReadOptions<'_>, - ) -> Result { - self._read_type(table_paths, options).await - } - - /// Creates a [`DataFrame`] for reading an JSON data source. - /// - /// For more control such as reading multiple files, you can use - /// [`read_table`](Self::read_table) with a [`ListingTable`]. - /// - /// For an example, see [`read_csv`](Self::read_csv) - pub async fn read_json( - &self, - table_paths: P, - options: NdJsonReadOptions<'_>, - ) -> Result { - self._read_type(table_paths, options).await - } - /// Creates a [`DataFrame`] for reading an Arrow data source. /// /// For more control such as reading multiple files, you can use @@ -906,48 +881,6 @@ impl SessionContext { )) } - /// Creates a [`DataFrame`] for reading a CSV data source. - /// - /// For more control such as reading multiple files, you can use - /// [`read_table`](Self::read_table) with a [`ListingTable`]. - /// - /// Example usage is given below: - /// - /// ``` - /// use datafusion::prelude::*; - /// # use datafusion::error::Result; - /// # #[tokio::main] - /// # async fn main() -> Result<()> { - /// let ctx = SessionContext::new(); - /// // You can read a single file using `read_csv` - /// let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?; - /// // you can also read multiple files: - /// let df = ctx.read_csv(vec!["tests/data/example.csv", "tests/data/example.csv"], CsvReadOptions::new()).await?; - /// # Ok(()) - /// # } - /// ``` - pub async fn read_csv( - &self, - table_paths: P, - options: CsvReadOptions<'_>, - ) -> Result { - self._read_type(table_paths, options).await - } - - /// Creates a [`DataFrame`] for reading a Parquet data source. 
- /// - /// For more control such as reading multiple files, you can use - /// [`read_table`](Self::read_table) with a [`ListingTable`]. - /// - /// For an example, see [`read_csv`](Self::read_csv) - pub async fn read_parquet( - &self, - table_paths: P, - options: ParquetReadOptions<'_>, - ) -> Result { - self._read_type(table_paths, options).await - } - /// Creates a [`DataFrame`] for a [`TableProvider`] such as a /// [`ListingTable`] or a custom user defined provider. pub fn read_table(&self, provider: Arc) -> Result { @@ -1008,91 +941,6 @@ impl SessionContext { Ok(()) } - /// Registers a CSV file as a table which can referenced from SQL - /// statements executed against this context. - pub async fn register_csv( - &self, - name: &str, - table_path: &str, - options: CsvReadOptions<'_>, - ) -> Result<()> { - let listing_options = options.to_listing_options(&self.copied_config()); - - self.register_listing_table( - name, - table_path, - listing_options, - options.schema.map(|s| Arc::new(s.to_owned())), - None, - ) - .await?; - - Ok(()) - } - - /// Registers a JSON file as a table that it can be referenced - /// from SQL statements executed against this context. - pub async fn register_json( - &self, - name: &str, - table_path: &str, - options: NdJsonReadOptions<'_>, - ) -> Result<()> { - let listing_options = options.to_listing_options(&self.copied_config()); - - self.register_listing_table( - name, - table_path, - listing_options, - options.schema.map(|s| Arc::new(s.to_owned())), - None, - ) - .await?; - Ok(()) - } - - /// Registers a Parquet file as a table that can be referenced from SQL - /// statements executed against this context. - pub async fn register_parquet( - &self, - name: &str, - table_path: &str, - options: ParquetReadOptions<'_>, - ) -> Result<()> { - let listing_options = options.to_listing_options(&self.state.read().config); - - self.register_listing_table( - name, - table_path, - listing_options, - options.schema.map(|s| Arc::new(s.to_owned())), - None, - ) - .await?; - Ok(()) - } - - /// Registers an Avro file as a table that can be referenced from - /// SQL statements executed against this context. - pub async fn register_avro( - &self, - name: &str, - table_path: &str, - options: AvroReadOptions<'_>, - ) -> Result<()> { - let listing_options = options.to_listing_options(&self.copied_config()); - - self.register_listing_table( - name, - table_path, - listing_options, - options.schema.map(|s| Arc::new(s.to_owned())), - None, - ) - .await?; - Ok(()) - } - /// Registers an Arrow file as a table that can be referenced from /// SQL statements executed against this context. pub async fn register_arrow( @@ -1268,34 +1116,6 @@ impl SessionContext { self.state().create_physical_plan(logical_plan).await } - /// Executes a query and writes the results to a partitioned CSV file. - pub async fn write_csv( - &self, - plan: Arc, - path: impl AsRef, - ) -> Result<()> { - plan_to_csv(self.task_ctx(), plan, path).await - } - - /// Executes a query and writes the results to a partitioned JSON file. - pub async fn write_json( - &self, - plan: Arc, - path: impl AsRef, - ) -> Result<()> { - plan_to_json(self.task_ctx(), plan, path).await - } - - /// Executes a query and writes the results to a partitioned Parquet file. 
- pub async fn write_parquet( - &self, - plan: Arc, - path: impl AsRef, - writer_properties: Option, - ) -> Result<()> { - plan_to_parquet(self.task_ctx(), plan, path, writer_properties).await - } - /// Get a new TaskContext to run in this session pub fn task_ctx(&self) -> Arc { Arc::new(TaskContext::from(self)) @@ -1447,6 +1267,7 @@ impl SessionState { // Create table_factories for all default formats let mut table_factories: HashMap> = HashMap::new(); + #[cfg(feature = "parquet")] table_factories.insert("PARQUET".into(), Arc::new(ListingTableFactory::new())); table_factories.insert("CSV".into(), Arc::new(ListingTableFactory::new())); table_factories.insert("JSON".into(), Arc::new(ListingTableFactory::new())); @@ -2238,22 +2059,21 @@ impl<'a> TreeNodeVisitor for BadPlanVisitor<'a> { #[cfg(test)] mod tests { + use super::super::options::CsvReadOptions; use super::*; use crate::assert_batches_eq; use crate::execution::context::QueryPlanner; use crate::execution::memory_pool::MemoryConsumer; use crate::execution::runtime_env::RuntimeConfig; use crate::test; - use crate::test_util::parquet_test_data; + use crate::test_util::{plan_and_collect, populate_csv_partitions}; use crate::variable::VarType; - use arrow::record_batch::RecordBatch; - use arrow_schema::{Field, Schema}; + use arrow_schema::Schema; use async_trait::async_trait; use datafusion_expr::Expr; - use std::fs::File; + use std::env; use std::path::PathBuf; use std::sync::Weak; - use std::{env, io::prelude::*}; use tempfile::TempDir; #[tokio::test] @@ -2348,39 +2168,6 @@ mod tests { Ok(()) } - #[tokio::test] - async fn query_csv_with_custom_partition_extension() -> Result<()> { - let tmp_dir = TempDir::new()?; - - // The main stipulation of this test: use a file extension that isn't .csv. - let file_extension = ".tst"; - - let ctx = SessionContext::new(); - let schema = populate_csv_partitions(&tmp_dir, 2, file_extension)?; - ctx.register_csv( - "test", - tmp_dir.path().to_str().unwrap(), - CsvReadOptions::new() - .schema(&schema) - .file_extension(file_extension), - ) - .await?; - let results = - plan_and_collect(&ctx, "SELECT SUM(c1), SUM(c2), COUNT(*) FROM test").await?; - - assert_eq!(results.len(), 1); - let expected = [ - "+--------------+--------------+----------+", - "| SUM(test.c1) | SUM(test.c2) | COUNT(*) |", - "+--------------+--------------+----------+", - "| 10 | 110 | 20 |", - "+--------------+--------------+----------+", - ]; - assert_batches_eq!(expected, &results); - - Ok(()) - } - #[tokio::test] async fn send_context_to_threads() -> Result<()> { // ensure SessionContexts can be used in a multi-threaded @@ -2645,60 +2432,6 @@ mod tests { Ok(()) } - #[tokio::test] - async fn read_with_glob_path() -> Result<()> { - let ctx = SessionContext::new(); - - let df = ctx - .read_parquet( - format!("{}/alltypes_plain*.parquet", parquet_test_data()), - ParquetReadOptions::default(), - ) - .await?; - let results = df.collect().await?; - let total_rows: usize = results.iter().map(|rb| rb.num_rows()).sum(); - // alltypes_plain.parquet = 8 rows, alltypes_plain.snappy.parquet = 2 rows, alltypes_dictionary.parquet = 2 rows - assert_eq!(total_rows, 10); - Ok(()) - } - - #[tokio::test] - async fn read_with_glob_path_issue_2465() -> Result<()> { - let ctx = SessionContext::new(); - - let df = ctx - .read_parquet( - // it was reported that when a path contains // (two consecutive separator) no files were found - // in this test, regardless of parquet_test_data() value, our path now contains a // - 
format!("{}/..//*/alltypes_plain*.parquet", parquet_test_data()), - ParquetReadOptions::default(), - ) - .await?; - let results = df.collect().await?; - let total_rows: usize = results.iter().map(|rb| rb.num_rows()).sum(); - // alltypes_plain.parquet = 8 rows, alltypes_plain.snappy.parquet = 2 rows, alltypes_dictionary.parquet = 2 rows - assert_eq!(total_rows, 10); - Ok(()) - } - - #[tokio::test] - async fn read_from_registered_table_with_glob_path() -> Result<()> { - let ctx = SessionContext::new(); - - ctx.register_parquet( - "test", - &format!("{}/alltypes_plain*.parquet", parquet_test_data()), - ParquetReadOptions::default(), - ) - .await?; - let df = ctx.sql("SELECT * FROM test").await?; - let results = df.collect().await?; - let total_rows: usize = results.iter().map(|rb| rb.num_rows()).sum(); - // alltypes_plain.parquet = 8 rows, alltypes_plain.snappy.parquet = 2 rows, alltypes_dictionary.parquet = 2 rows - assert_eq!(total_rows, 10); - Ok(()) - } - struct MyPhysicalPlanner {} #[async_trait] @@ -2738,43 +2471,6 @@ mod tests { } } - /// Execute SQL and return results - async fn plan_and_collect( - ctx: &SessionContext, - sql: &str, - ) -> Result> { - ctx.sql(sql).await?.collect().await - } - - /// Generate CSV partitions within the supplied directory - fn populate_csv_partitions( - tmp_dir: &TempDir, - partition_count: usize, - file_extension: &str, - ) -> Result { - // define schema for data source (csv file) - let schema = Arc::new(Schema::new(vec![ - Field::new("c1", DataType::UInt32, false), - Field::new("c2", DataType::UInt64, false), - Field::new("c3", DataType::Boolean, false), - ])); - - // generate a partitioned file - for partition in 0..partition_count { - let filename = format!("partition-{partition}.{file_extension}"); - let file_path = tmp_dir.path().join(filename); - let mut file = File::create(file_path)?; - - // generate some data - for i in 0..=10 { - let data = format!("{},{},{}\n", partition, i, i % 2 == 0); - file.write_all(data.as_bytes())?; - } - } - - Ok(schema) - } - /// Generate a partitioned CSV file and register it with an execution context async fn create_ctx( tmp_dir: &TempDir, @@ -2796,37 +2492,4 @@ mod tests { Ok(ctx) } - - // Test for compilation error when calling read_* functions from an #[async_trait] function. - // See https://github.com/apache/arrow-datafusion/issues/1154 - #[async_trait] - trait CallReadTrait { - async fn call_read_csv(&self) -> DataFrame; - async fn call_read_avro(&self) -> DataFrame; - async fn call_read_parquet(&self) -> DataFrame; - } - - struct CallRead {} - - #[async_trait] - impl CallReadTrait for CallRead { - async fn call_read_csv(&self) -> DataFrame { - let ctx = SessionContext::new(); - ctx.read_csv("dummy", CsvReadOptions::new()).await.unwrap() - } - - async fn call_read_avro(&self) -> DataFrame { - let ctx = SessionContext::new(); - ctx.read_avro("dummy", AvroReadOptions::default()) - .await - .unwrap() - } - - async fn call_read_parquet(&self) -> DataFrame { - let ctx = SessionContext::new(); - ctx.read_parquet("dummy", ParquetReadOptions::default()) - .await - .unwrap() - } - } } diff --git a/datafusion/core/src/execution/context/parquet.rs b/datafusion/core/src/execution/context/parquet.rs new file mode 100644 index 000000000000..b02576c6a868 --- /dev/null +++ b/datafusion/core/src/execution/context/parquet.rs @@ -0,0 +1,154 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. 
See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::sync::Arc;
+
+use parquet::file::properties::WriterProperties;
+
+use crate::datasource::physical_plan::plan_to_parquet;
+
+use super::super::options::{ParquetReadOptions, ReadOptions};
+use super::{DataFilePaths, DataFrame, ExecutionPlan, Result, SessionContext};
+
+impl SessionContext {
+    /// Creates a [`DataFrame`] for reading a Parquet data source.
+    ///
+    /// For more control such as reading multiple files, you can use
+    /// [`read_table`](Self::read_table) with a [`super::ListingTable`].
+    ///
+    /// For an example, see [`read_csv`](Self::read_csv)
+    pub async fn read_parquet<P: DataFilePaths>(
+        &self,
+        table_paths: P,
+        options: ParquetReadOptions<'_>,
+    ) -> Result<DataFrame> {
+        self._read_type(table_paths, options).await
+    }
+
+    /// Registers a Parquet file as a table that can be referenced from SQL
+    /// statements executed against this context.
+    pub async fn register_parquet(
+        &self,
+        name: &str,
+        table_path: &str,
+        options: ParquetReadOptions<'_>,
+    ) -> Result<()> {
+        let listing_options = options.to_listing_options(&self.state.read().config);
+
+        self.register_listing_table(
+            name,
+            table_path,
+            listing_options,
+            options.schema.map(|s| Arc::new(s.to_owned())),
+            None,
+        )
+        .await?;
+        Ok(())
+    }
+
+    /// Executes a query and writes the results to a partitioned Parquet file.
+    pub async fn write_parquet(
+        &self,
+        plan: Arc<dyn ExecutionPlan>,
+        path: impl AsRef<str>,
+        writer_properties: Option<WriterProperties>,
+    ) -> Result<()> {
+        plan_to_parquet(self.task_ctx(), plan, path, writer_properties).await
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use async_trait::async_trait;
+
+    use crate::test_util::parquet_test_data;
+
+    use super::*;
+
+    #[tokio::test]
+    async fn read_with_glob_path() -> Result<()> {
+        let ctx = SessionContext::new();
+
+        let df = ctx
+            .read_parquet(
+                format!("{}/alltypes_plain*.parquet", parquet_test_data()),
+                ParquetReadOptions::default(),
+            )
+            .await?;
+        let results = df.collect().await?;
+        let total_rows: usize = results.iter().map(|rb| rb.num_rows()).sum();
+        // alltypes_plain.parquet = 8 rows, alltypes_plain.snappy.parquet = 2 rows, alltypes_dictionary.parquet = 2 rows
+        assert_eq!(total_rows, 10);
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn read_with_glob_path_issue_2465() -> Result<()> {
+        let ctx = SessionContext::new();
+
+        let df = ctx
+            .read_parquet(
+                // it was reported that when a path contains // (two consecutive separators) no files were found
+                // in this test, regardless of parquet_test_data() value, our path now contains a //
+                format!("{}/..//*/alltypes_plain*.parquet", parquet_test_data()),
+                ParquetReadOptions::default(),
+            )
+            .await?;
+        let results = df.collect().await?;
+        let total_rows: usize = results.iter().map(|rb| rb.num_rows()).sum();
+        // alltypes_plain.parquet = 8 rows, alltypes_plain.snappy.parquet = 2 rows, alltypes_dictionary.parquet = 2 rows
+        assert_eq!(total_rows, 10);
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn read_from_registered_table_with_glob_path() -> Result<()> {
+        let ctx = SessionContext::new();
+
+        ctx.register_parquet(
+            "test",
+            &format!("{}/alltypes_plain*.parquet", parquet_test_data()),
+            ParquetReadOptions::default(),
+        )
+        .await?;
+        let df = ctx.sql("SELECT * FROM test").await?;
+        let results = df.collect().await?;
+        let total_rows: usize = results.iter().map(|rb| rb.num_rows()).sum();
+        // alltypes_plain.parquet = 8 rows, alltypes_plain.snappy.parquet = 2 rows, alltypes_dictionary.parquet = 2 rows
+        assert_eq!(total_rows, 10);
+        Ok(())
+    }
+
+    // Test for compilation error when calling read_* functions from an #[async_trait] function.
+    // See https://github.com/apache/arrow-datafusion/issues/1154
+    #[async_trait]
+    trait CallReadTrait {
+        async fn call_read_parquet(&self) -> DataFrame;
+    }
+
+    struct CallRead {}
+
+    #[async_trait]
+    impl CallReadTrait for CallRead {
+        async fn call_read_parquet(&self) -> DataFrame {
+            let ctx = SessionContext::new();
+            ctx.read_parquet("dummy", ParquetReadOptions::default())
+                .await
+                .unwrap()
+        }
+    }
+}
diff --git a/datafusion/core/src/lib.rs b/datafusion/core/src/lib.rs
index 5e9f130eade5..bf9a4abf4f2d 100644
--- a/datafusion/core/src/lib.rs
+++ b/datafusion/core/src/lib.rs
@@ -437,6 +437,7 @@ pub mod variable;
 // re-export dependencies from arrow-rs to minimize version maintenance for crate users
 pub use arrow;
+#[cfg(feature = "parquet")]
 pub use parquet;
 
 // re-export DataFusion sub-crates at the top level.
Use `pub use *` diff --git a/datafusion/core/src/physical_optimizer/enforce_distribution.rs b/datafusion/core/src/physical_optimizer/enforce_distribution.rs index 9cd7eff4722b..d3fbc46a6659 100644 --- a/datafusion/core/src/physical_optimizer/enforce_distribution.rs +++ b/datafusion/core/src/physical_optimizer/enforce_distribution.rs @@ -26,7 +26,9 @@ use std::fmt::Formatter; use std::sync::Arc; use crate::config::ConfigOptions; -use crate::datasource::physical_plan::{CsvExec, ParquetExec}; +use crate::datasource::physical_plan::CsvExec; +#[cfg(feature = "parquet")] +use crate::datasource::physical_plan::ParquetExec; use crate::error::Result; use crate::physical_optimizer::utils::{ add_sort_above, get_children_exectrees, get_plan_string, is_coalesce_partitions, @@ -1306,6 +1308,7 @@ fn ensure_distribution( // When `repartition_file_scans` is set, leverage source operators // (`ParquetExec`, `CsvExec` etc.) to increase parallelism at the source. if repartition_file_scans { + #[cfg(feature = "parquet")] if let Some(parquet_exec) = child.as_any().downcast_ref::() { @@ -1313,9 +1316,8 @@ fn ensure_distribution( target_partitions, repartition_file_min_size, )); - } else if let Some(csv_exec) = - child.as_any().downcast_ref::() - { + } + if let Some(csv_exec) = child.as_any().downcast_ref::() { if let Some(csv_exec) = csv_exec.get_repartitioned( target_partitions, repartition_file_min_size, @@ -1680,7 +1682,9 @@ mod tests { use crate::datasource::file_format::file_compression_type::FileCompressionType; use crate::datasource::listing::PartitionedFile; use crate::datasource::object_store::ObjectStoreUrl; - use crate::datasource::physical_plan::{FileScanConfig, ParquetExec}; + use crate::datasource::physical_plan::FileScanConfig; + #[cfg(feature = "parquet")] + use crate::datasource::physical_plan::ParquetExec; use crate::physical_optimizer::enforce_sorting::EnforceSorting; use crate::physical_optimizer::output_requirements::OutputRequirements; use crate::physical_plan::aggregates::{ @@ -1819,10 +1823,12 @@ mod tests { ])) } + #[cfg(feature = "parquet")] fn parquet_exec() -> Arc { parquet_exec_with_sort(vec![]) } + #[cfg(feature = "parquet")] fn parquet_exec_with_sort( output_ordering: Vec>, ) -> Arc { @@ -1843,11 +1849,13 @@ mod tests { )) } + #[cfg(feature = "parquet")] fn parquet_exec_multiple() -> Arc { parquet_exec_multiple_sorted(vec![]) } // Created a sorted parquet exec with multiple files + #[cfg(feature = "parquet")] fn parquet_exec_multiple_sorted( output_ordering: Vec>, ) -> Arc { @@ -2202,6 +2210,7 @@ mod tests { } #[test] + #[cfg(feature = "parquet")] fn multi_hash_joins() -> Result<()> { let left = parquet_exec(); let alias_pairs: Vec<(String, String)> = vec![ @@ -2364,6 +2373,7 @@ mod tests { } #[test] + #[cfg(feature = "parquet")] fn multi_joins_after_alias() -> Result<()> { let left = parquet_exec(); let right = parquet_exec(); @@ -2443,6 +2453,7 @@ mod tests { } #[test] + #[cfg(feature = "parquet")] fn multi_joins_after_multi_alias() -> Result<()> { let left = parquet_exec(); let right = parquet_exec(); @@ -2498,6 +2509,7 @@ mod tests { } #[test] + #[cfg(feature = "parquet")] fn join_after_agg_alias() -> Result<()> { // group by (a as a1) let left = aggregate_exec_with_alias( @@ -2537,6 +2549,7 @@ mod tests { } #[test] + #[cfg(feature = "parquet")] fn hash_join_key_ordering() -> Result<()> { // group by (a as a1, b as b1) let left = aggregate_exec_with_alias( @@ -2589,6 +2602,7 @@ mod tests { } #[test] + #[cfg(feature = "parquet")] fn multi_hash_join_key_ordering() -> Result<()> { 
let left = parquet_exec(); let alias_pairs: Vec<(String, String)> = vec![ @@ -2705,6 +2719,7 @@ mod tests { } #[test] + #[cfg(feature = "parquet")] fn reorder_join_keys_to_left_input() -> Result<()> { let left = parquet_exec(); let alias_pairs: Vec<(String, String)> = vec![ @@ -2835,6 +2850,7 @@ mod tests { } #[test] + #[cfg(feature = "parquet")] fn reorder_join_keys_to_right_input() -> Result<()> { let left = parquet_exec(); let alias_pairs: Vec<(String, String)> = vec![ @@ -2960,6 +2976,7 @@ mod tests { } #[test] + #[cfg(feature = "parquet")] fn multi_smj_joins() -> Result<()> { let left = parquet_exec(); let alias_pairs: Vec<(String, String)> = vec![ @@ -3233,6 +3250,7 @@ mod tests { } #[test] + #[cfg(feature = "parquet")] fn smj_join_key_ordering() -> Result<()> { // group by (a as a1, b as b1) let left = aggregate_exec_with_alias( @@ -3328,6 +3346,7 @@ mod tests { } #[test] + #[cfg(feature = "parquet")] fn merge_does_not_need_sort() -> Result<()> { // see https://github.com/apache/arrow-datafusion/issues/4331 let schema = schema(); @@ -3368,6 +3387,7 @@ mod tests { } #[test] + #[cfg(feature = "parquet")] fn union_to_interleave() -> Result<()> { // group by (a as a1) let left = aggregate_exec_with_alias( @@ -3409,6 +3429,7 @@ mod tests { } #[test] + #[cfg(feature = "parquet")] fn added_repartition_to_single_partition() -> Result<()> { let alias = vec![("a".to_string(), "a".to_string())]; let plan = aggregate_exec_with_alias(parquet_exec(), alias); @@ -3427,6 +3448,7 @@ mod tests { } #[test] + #[cfg(feature = "parquet")] fn repartition_deepest_node() -> Result<()> { let alias = vec![("a".to_string(), "a".to_string())]; let plan = aggregate_exec_with_alias(filter_exec(parquet_exec()), alias); @@ -3446,6 +3468,7 @@ mod tests { } #[test] + #[cfg(feature = "parquet")] fn repartition_unsorted_limit() -> Result<()> { let plan = limit_exec(filter_exec(parquet_exec())); @@ -3465,6 +3488,7 @@ mod tests { } #[test] + #[cfg(feature = "parquet")] fn repartition_sorted_limit() -> Result<()> { let schema = schema(); let sort_key = vec![PhysicalSortExpr { @@ -3487,6 +3511,7 @@ mod tests { } #[test] + #[cfg(feature = "parquet")] fn repartition_sorted_limit_with_filter() -> Result<()> { let schema = schema(); let sort_key = vec![PhysicalSortExpr { @@ -3512,6 +3537,7 @@ mod tests { } #[test] + #[cfg(feature = "parquet")] fn repartition_ignores_limit() -> Result<()> { let alias = vec![("a".to_string(), "a".to_string())]; let plan = aggregate_exec_with_alias( @@ -3542,6 +3568,7 @@ mod tests { } #[test] + #[cfg(feature = "parquet")] fn repartition_ignores_union() -> Result<()> { let plan = union_exec(vec![parquet_exec(); 5]); @@ -3561,6 +3588,7 @@ mod tests { } #[test] + #[cfg(feature = "parquet")] fn repartition_through_sort_preserving_merge() -> Result<()> { // sort preserving merge with non-sorted input let schema = schema(); @@ -3583,6 +3611,7 @@ mod tests { } #[test] + #[cfg(feature = "parquet")] fn repartition_ignores_sort_preserving_merge() -> Result<()> { // sort preserving merge already sorted input, let schema = schema(); @@ -3614,6 +3643,7 @@ mod tests { } #[test] + #[cfg(feature = "parquet")] fn repartition_ignores_sort_preserving_merge_with_union() -> Result<()> { // 2 sorted parquet files unioned (partitions are concatenated, sort is preserved) let schema = schema(); @@ -3646,6 +3676,7 @@ mod tests { } #[test] + #[cfg(feature = "parquet")] fn repartition_does_not_destroy_sort() -> Result<()> { // SortRequired // Parquet(sorted) @@ -3671,6 +3702,7 @@ mod tests { } #[test] + #[cfg(feature = 
"parquet")] fn repartition_does_not_destroy_sort_more_complex() -> Result<()> { // model a more complicated scenario where one child of a union can be repartitioned for performance // but the other can not be @@ -3709,6 +3741,7 @@ mod tests { } #[test] + #[cfg(feature = "parquet")] fn repartition_transitively_with_projection() -> Result<()> { let schema = schema(); let proj_exprs = vec![( @@ -3751,6 +3784,7 @@ mod tests { } #[test] + #[cfg(feature = "parquet")] fn repartition_ignores_transitively_with_projection() -> Result<()> { let schema = schema(); let sort_key = vec![PhysicalSortExpr { @@ -3781,6 +3815,7 @@ mod tests { } #[test] + #[cfg(feature = "parquet")] fn repartition_transitively_past_sort_with_projection() -> Result<()> { let schema = schema(); let sort_key = vec![PhysicalSortExpr { @@ -3810,6 +3845,7 @@ mod tests { } #[test] + #[cfg(feature = "parquet")] fn repartition_transitively_past_sort_with_filter() -> Result<()> { let schema = schema(); let sort_key = vec![PhysicalSortExpr { @@ -3842,6 +3878,7 @@ mod tests { } #[test] + #[cfg(feature = "parquet")] fn repartition_transitively_past_sort_with_projection_and_filter() -> Result<()> { let schema = schema(); let sort_key = vec![PhysicalSortExpr { @@ -3883,6 +3920,7 @@ mod tests { } #[test] + #[cfg(feature = "parquet")] fn parallelization_single_partition() -> Result<()> { let alias = vec![("a".to_string(), "a".to_string())]; let plan_parquet = aggregate_exec_with_alias(parquet_exec(), alias.clone()); @@ -3971,6 +4009,7 @@ mod tests { } #[test] + #[cfg(feature = "parquet")] fn parallelization_two_partitions() -> Result<()> { let alias = vec![("a".to_string(), "a".to_string())]; let plan_parquet = @@ -3998,6 +4037,7 @@ mod tests { } #[test] + #[cfg(feature = "parquet")] fn parallelization_two_partitions_into_four() -> Result<()> { let alias = vec![("a".to_string(), "a".to_string())]; let plan_parquet = @@ -4025,6 +4065,7 @@ mod tests { } #[test] + #[cfg(feature = "parquet")] fn parallelization_sorted_limit() -> Result<()> { let schema = schema(); let sort_key = vec![PhysicalSortExpr { @@ -4057,6 +4098,7 @@ mod tests { } #[test] + #[cfg(feature = "parquet")] fn parallelization_limit_with_filter() -> Result<()> { let schema = schema(); let sort_key = vec![PhysicalSortExpr { @@ -4102,6 +4144,7 @@ mod tests { } #[test] + #[cfg(feature = "parquet")] fn parallelization_ignores_limit() -> Result<()> { let alias = vec![("a".to_string(), "a".to_string())]; let plan_parquet = aggregate_exec_with_alias( @@ -4152,6 +4195,7 @@ mod tests { } #[test] + #[cfg(feature = "parquet")] fn parallelization_union_inputs() -> Result<()> { let plan_parquet = union_exec(vec![parquet_exec(); 5]); let plan_csv = union_exec(vec![csv_exec(); 5]); @@ -4181,6 +4225,7 @@ mod tests { } #[test] + #[cfg(feature = "parquet")] fn parallelization_prior_to_sort_preserving_merge() -> Result<()> { let schema = schema(); let sort_key = vec![PhysicalSortExpr { @@ -4211,6 +4256,7 @@ mod tests { } #[test] + #[cfg(feature = "parquet")] fn parallelization_sort_preserving_merge_with_union() -> Result<()> { let schema = schema(); let sort_key = vec![PhysicalSortExpr { @@ -4245,6 +4291,7 @@ mod tests { } #[test] + #[cfg(feature = "parquet")] fn parallelization_does_not_benefit() -> Result<()> { let schema = schema(); let sort_key = vec![PhysicalSortExpr { @@ -4273,6 +4320,7 @@ mod tests { } #[test] + #[cfg(feature = "parquet")] fn parallelization_ignores_transitively_with_projection_parquet() -> Result<()> { // sorted input let schema = schema(); @@ -4353,6 +4401,7 @@ mod tests { 
} #[test] + #[cfg(feature = "parquet")] fn remove_redundant_roundrobins() -> Result<()> { let input = parquet_exec(); let repartition = repartition_exec(repartition_exec(input)); @@ -4403,6 +4452,7 @@ mod tests { } #[test] + #[cfg(feature = "parquet")] fn do_not_preserve_ordering_through_repartition() -> Result<()> { let schema = schema(); let sort_key = vec![PhysicalSortExpr { @@ -4435,6 +4485,7 @@ mod tests { } #[test] + #[cfg(feature = "parquet")] fn do_not_preserve_ordering_through_repartition2() -> Result<()> { let schema = schema(); let sort_key = vec![PhysicalSortExpr { @@ -4473,6 +4524,7 @@ mod tests { } #[test] + #[cfg(feature = "parquet")] fn do_not_preserve_ordering_through_repartition3() -> Result<()> { let schema = schema(); let sort_key = vec![PhysicalSortExpr { @@ -4495,6 +4547,7 @@ mod tests { } #[test] + #[cfg(feature = "parquet")] fn do_not_put_sort_when_input_is_invalid() -> Result<()> { let schema = schema(); let sort_key = vec![PhysicalSortExpr { @@ -4533,6 +4586,7 @@ mod tests { } #[test] + #[cfg(feature = "parquet")] fn put_sort_when_input_is_valid() -> Result<()> { let schema = schema(); let sort_key = vec![PhysicalSortExpr { @@ -4575,6 +4629,7 @@ mod tests { } #[test] + #[cfg(feature = "parquet")] fn do_not_add_unnecessary_hash() -> Result<()> { let schema = schema(); let sort_key = vec![PhysicalSortExpr { @@ -4630,6 +4685,7 @@ mod tests { } #[test] + #[cfg(feature = "parquet")] fn optimize_away_unnecessary_repartition() -> Result<()> { let physical_plan = coalesce_partitions_exec(repartition_exec(parquet_exec())); let expected = &[ @@ -4649,6 +4705,7 @@ mod tests { } #[test] + #[cfg(feature = "parquet")] fn optimize_away_unnecessary_repartition2() -> Result<()> { let physical_plan = filter_exec(repartition_exec(coalesce_partitions_exec( filter_exec(repartition_exec(parquet_exec())), diff --git a/datafusion/core/src/physical_planner.rs b/datafusion/core/src/physical_planner.rs index 419f62cff664..f941e88f3a36 100644 --- a/datafusion/core/src/physical_planner.rs +++ b/datafusion/core/src/physical_planner.rs @@ -25,6 +25,7 @@ use crate::datasource::file_format::arrow::ArrowFormat; use crate::datasource::file_format::avro::AvroFormat; use crate::datasource::file_format::csv::CsvFormat; use crate::datasource::file_format::json::JsonFormat; +#[cfg(feature = "parquet")] use crate::datasource::file_format::parquet::ParquetFormat; use crate::datasource::file_format::write::FileWriterMode; use crate::datasource::file_format::FileFormat; @@ -599,6 +600,7 @@ impl DefaultPhysicalPlanner { let sink_format: Arc = match file_format { FileType::CSV => Arc::new(CsvFormat::default()), + #[cfg(feature = "parquet")] FileType::PARQUET => Arc::new(ParquetFormat::default()), FileType::JSON => Arc::new(JsonFormat::default()), FileType::AVRO => Arc::new(AvroFormat {} ), diff --git a/datafusion/core/src/test_util/mod.rs b/datafusion/core/src/test_util/mod.rs index 4fe022f1769d..c6b43de0c18d 100644 --- a/datafusion/core/src/test_util/mod.rs +++ b/datafusion/core/src/test_util/mod.rs @@ -17,15 +17,21 @@ //! 
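
Note: in the physical planner above, the `FileType::PARQUET` match arm is compiled only when the feature is on. A self-contained sketch of cfg-gating a variant and its arm together (the enum below is illustrative, not `datafusion_common::FileType`):

// Illustrative enum, not datafusion_common::FileType.
#[allow(dead_code)]
#[derive(Debug)]
enum SinkFileType {
    Csv,
    Json,
    Avro,
    #[cfg(feature = "parquet")]
    Parquet,
}

fn sink_format_name(file_type: &SinkFileType) -> &'static str {
    match file_type {
        SinkFileType::Csv => "CsvFormat",
        SinkFileType::Json => "JsonFormat",
        SinkFileType::Avro => "AvroFormat",
        // Compiled out together with the variant when the feature is off.
        #[cfg(feature = "parquet")]
        SinkFileType::Parquet => "ParquetFormat",
    }
}

fn main() {
    println!("{}", sink_format_name(&SinkFileType::Csv));
}

Gating the variant and the arm with the same predicate keeps the match exhaustive in both configurations.
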
Utility functions to make testing DataFusion based crates easier

+#[cfg(feature = "parquet")]
 pub mod parquet;

 use std::any::Any;
 use std::collections::HashMap;
+use std::fs::File;
+use std::io::Write;
 use std::path::Path;
 use std::pin::Pin;
 use std::sync::Arc;
 use std::task::{Context, Poll};

+use tempfile::TempDir;
+
+use crate::dataframe::DataFrame;
 use crate::datasource::provider::TableProviderFactory;
 use crate::datasource::{empty::EmptyTable, provider_as_source, TableProvider};
 use crate::error::Result;
@@ -48,9 +54,9 @@ use async_trait::async_trait;
 use futures::Stream;

 // backwards compatibility
-pub use datafusion_common::test_util::{
-    arrow_test_data, get_data_dir, parquet_test_data,
-};
+#[cfg(feature = "parquet")]
+pub use datafusion_common::test_util::parquet_test_data;
+pub use datafusion_common::test_util::{arrow_test_data, get_data_dir};

 pub use datafusion_common::{assert_batches_eq, assert_batches_sorted_eq};

@@ -102,6 +108,71 @@ pub fn aggr_test_schema() -> SchemaRef {
     Arc::new(schema)
 }

+/// Register session context for the aggregate_test_100.csv file
+pub async fn register_aggregate_csv(
+    ctx: &mut SessionContext,
+    table_name: &str,
+) -> Result<()> {
+    let schema = aggr_test_schema();
+    let testdata = arrow_test_data();
+    ctx.register_csv(
+        table_name,
+        &format!("{testdata}/csv/aggregate_test_100.csv"),
+        CsvReadOptions::new().schema(schema.as_ref()),
+    )
+    .await?;
+    Ok(())
+}
+
+/// Create a table from the aggregate_test_100.csv file with the specified name
+pub async fn test_table_with_name(name: &str) -> Result<DataFrame> {
+    let mut ctx = SessionContext::new();
+    register_aggregate_csv(&mut ctx, name).await?;
+    ctx.table(name).await
+}
+
+/// Create a table from the aggregate_test_100.csv file with the name "aggregate_test_100"
+pub async fn test_table() -> Result<DataFrame> {
+    test_table_with_name("aggregate_test_100").await
+}
+
+/// Execute SQL and return results
+pub async fn plan_and_collect(
+    ctx: &SessionContext,
+    sql: &str,
+) -> Result<Vec<RecordBatch>> {
+    ctx.sql(sql).await?.collect().await
+}
+
+/// Generate CSV partitions within the supplied directory
+pub fn populate_csv_partitions(
+    tmp_dir: &TempDir,
+    partition_count: usize,
+    file_extension: &str,
+) -> Result<SchemaRef> {
+    // define schema for data source (csv file)
+    let schema = Arc::new(Schema::new(vec![
+        Field::new("c1", DataType::UInt32, false),
+        Field::new("c2", DataType::UInt64, false),
+        Field::new("c3", DataType::Boolean, false),
+    ]));
+
+    // generate a partitioned file
+    for partition in 0..partition_count {
+        let filename = format!("partition-{partition}.{file_extension}");
+        let file_path = tmp_dir.path().join(filename);
+        let mut file = File::create(file_path)?;
+
+        // generate some data
+        for i in 0..=10 {
+            let data = format!("{},{},{}\n", partition, i, i % 2 == 0);
+            file.write_all(data.as_bytes())?;
+        }
+    }
+
+    Ok(schema)
+}
+
 /// TableFactory for tests
 pub struct TestTableFactory {}

diff --git a/datafusion/proto/Cargo.toml b/datafusion/proto/Cargo.toml
index 94e77088a7e8..32e10e58a7d7 100644
--- a/datafusion/proto/Cargo.toml
+++ b/datafusion/proto/Cargo.toml
@@ -36,8 +36,9 @@ name = "datafusion_proto"
 path = "src/lib.rs"

 [features]
-default = []
+default = ["parquet"]
 json = ["pbjson", "serde", "serde_json"]
+parquet = ["datafusion/parquet", "datafusion-common/parquet"]

 [dependencies]
 arrow = { workspace = true }
diff --git a/datafusion/proto/src/logical_plan/mod.rs b/datafusion/proto/src/logical_plan/mod.rs
index df76fbb81396..e426c598523e 100644
--- a/datafusion/proto/src/logical_plan/mod.rs
+++
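
Note: the new test helpers above write a small partitioned CSV dataset and query it through a `SessionContext`. A usage sketch of the same layout (assuming the `tempfile` crate, which the diff already imports; the register/query step is indicated only in comments rather than guessing exact DataFusion signatures):

use std::fs::File;
use std::io::Write;

use tempfile::TempDir;

fn main() -> std::io::Result<()> {
    let tmp_dir = TempDir::new()?;
    let partition_count = 4;

    for partition in 0..partition_count {
        // Same row shape as the helper: c1 (u32), c2 (u64), c3 (bool).
        let path = tmp_dir.path().join(format!("partition-{partition}.csv"));
        let mut file = File::create(path)?;
        for i in 0..=10 {
            writeln!(file, "{},{},{}", partition, i, i % 2 == 0)?;
        }
    }

    // In a test, one would now hand tmp_dir.path() to SessionContext::register_csv
    // (with the matching schema and file extension) and run queries through
    // plan_and_collect(&ctx, "SELECT ...").
    println!(
        "wrote {} CSV partitions under {}",
        partition_count,
        tmp_dir.path().display()
    );
    Ok(())
}
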
b/datafusion/proto/src/logical_plan/mod.rs @@ -31,11 +31,11 @@ use crate::{ }; use arrow::datatypes::{DataType, Schema, SchemaRef}; +#[cfg(feature = "parquet")] +use datafusion::datasource::file_format::parquet::ParquetFormat; use datafusion::{ datasource::{ - file_format::{ - avro::AvroFormat, csv::CsvFormat, parquet::ParquetFormat, FileFormat, - }, + file_format::{avro::AvroFormat, csv::CsvFormat, FileFormat}, listing::{ListingOptions, ListingTable, ListingTableConfig, ListingTableUrl}, view::ViewTable, TableProvider, @@ -336,6 +336,7 @@ impl AsLogicalPlan for LogicalPlanNode { "logical_plan::from_proto() Unsupported file format '{self:?}'" )) })? { + #[cfg(feature = "parquet")] &FileFormatType::Parquet(protobuf::ParquetFormat {}) => { Arc::new(ParquetFormat::default()) } @@ -849,28 +850,49 @@ impl AsLogicalPlan for LogicalPlanNode { if let Some(listing_table) = source.downcast_ref::() { let any = listing_table.options().format.as_any(); - let file_format_type = if any.is::() { - FileFormatType::Parquet(protobuf::ParquetFormat {}) - } else if let Some(csv) = any.downcast_ref::() { - FileFormatType::Csv(protobuf::CsvFormat { - delimiter: byte_to_string(csv.delimiter(), "delimiter")?, - has_header: csv.has_header(), - quote: byte_to_string(csv.quote(), "quote")?, - optional_escape: if let Some(escape) = csv.escape() { - Some(protobuf::csv_format::OptionalEscape::Escape( - byte_to_string(escape, "escape")?, - )) - } else { - None - }, - }) - } else if any.is::() { - FileFormatType::Avro(protobuf::AvroFormat {}) - } else { - return Err(proto_error(format!( + let file_format_type = { + let mut maybe_some_type = None; + + #[cfg(feature = "parquet")] + if any.is::() { + maybe_some_type = + Some(FileFormatType::Parquet(protobuf::ParquetFormat {})) + }; + + if let Some(csv) = any.downcast_ref::() { + maybe_some_type = + Some(FileFormatType::Csv(protobuf::CsvFormat { + delimiter: byte_to_string( + csv.delimiter(), + "delimiter", + )?, + has_header: csv.has_header(), + quote: byte_to_string(csv.quote(), "quote")?, + optional_escape: if let Some(escape) = csv.escape() { + Some( + protobuf::csv_format::OptionalEscape::Escape( + byte_to_string(escape, "escape")?, + ), + ) + } else { + None + }, + })) + } + + if any.is::() { + maybe_some_type = + Some(FileFormatType::Avro(protobuf::AvroFormat {})) + } + + if let Some(file_format_type) = maybe_some_type { + file_format_type + } else { + return Err(proto_error(format!( "Error converting file format, {:?} is invalid as a datafusion format.", listing_table.options().format ))); + } }; let options = listing_table.options(); diff --git a/datafusion/proto/src/physical_plan/mod.rs b/datafusion/proto/src/physical_plan/mod.rs index ef870d8ac20b..431b8e42cdaf 100644 --- a/datafusion/proto/src/physical_plan/mod.rs +++ b/datafusion/proto/src/physical_plan/mod.rs @@ -22,7 +22,9 @@ use std::sync::Arc; use datafusion::arrow::compute::SortOptions; use datafusion::arrow::datatypes::SchemaRef; use datafusion::datasource::file_format::file_compression_type::FileCompressionType; -use datafusion::datasource::physical_plan::{AvroExec, CsvExec, ParquetExec}; +#[cfg(feature = "parquet")] +use datafusion::datasource::physical_plan::ParquetExec; +use datafusion::datasource::physical_plan::{AvroExec, CsvExec}; use datafusion::execution::runtime_env::RuntimeEnv; use datafusion::execution::FunctionRegistry; use datafusion::physical_plan::aggregates::{create_aggregate_expr, AggregateMode}; @@ -171,6 +173,7 @@ impl AsExecutionPlan for PhysicalPlanNode { }, 
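
Note: in the logical-plan serializer, the `if/else if` chain that produced `file_format_type` is replaced by an `Option` accumulator so the parquet probe can carry its own `#[cfg]` attribute. A compact sketch of that pattern with stand-in format types (not the `datafusion` file-format structs):

use std::any::Any;

#[allow(dead_code)]
#[derive(Debug)]
enum FormatTag {
    Csv,
    Avro,
    #[cfg(feature = "parquet")]
    Parquet,
}

struct CsvFormat;
struct AvroFormat;
#[cfg(feature = "parquet")]
#[allow(dead_code)]
struct ParquetFormat;

fn tag_for(format: &dyn Any) -> Result<FormatTag, String> {
    let mut maybe_tag = None;

    // Each probe is an independent statement, so this one can be cfg-gated.
    #[cfg(feature = "parquet")]
    if format.is::<ParquetFormat>() {
        maybe_tag = Some(FormatTag::Parquet);
    }
    if format.is::<CsvFormat>() {
        maybe_tag = Some(FormatTag::Csv);
    }
    if format.is::<AvroFormat>() {
        maybe_tag = Some(FormatTag::Avro);
    }

    maybe_tag.ok_or_else(|| "unrecognized file format".to_string())
}

fn main() {
    assert!(matches!(tag_for(&CsvFormat), Ok(FormatTag::Csv)));
    assert!(matches!(tag_for(&AvroFormat), Ok(FormatTag::Avro)));
    assert!(tag_for(&0_u8).is_err());
}

If no probe matches, the accumulated `None` becomes the same "invalid as a datafusion format" error path the chain produced before.
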
FileCompressionType::UNCOMPRESSED, ))), + #[cfg(feature = "parquet")] PhysicalPlanType::ParquetScan(scan) => { let base_config = parse_protobuf_file_scan_config( scan.base_conf.as_ref().unwrap(), @@ -796,7 +799,7 @@ impl AsExecutionPlan for PhysicalPlanNode { let plan = plan.as_any(); if let Some(exec) = plan.downcast_ref::() { - Ok(protobuf::PhysicalPlanNode { + return Ok(protobuf::PhysicalPlanNode { physical_plan_type: Some(PhysicalPlanType::Explain( protobuf::ExplainExecNode { schema: Some(exec.schema().as_ref().try_into()?), @@ -808,8 +811,10 @@ impl AsExecutionPlan for PhysicalPlanNode { verbose: exec.verbose(), }, )), - }) - } else if let Some(exec) = plan.downcast_ref::() { + }); + } + + if let Some(exec) = plan.downcast_ref::() { let input = protobuf::PhysicalPlanNode::try_from_physical_plan( exec.input().to_owned(), extension_codec, @@ -820,7 +825,7 @@ impl AsExecutionPlan for PhysicalPlanNode { .map(|expr| expr.0.clone().try_into()) .collect::>>()?; let expr_name = exec.expr().iter().map(|expr| expr.1.clone()).collect(); - Ok(protobuf::PhysicalPlanNode { + return Ok(protobuf::PhysicalPlanNode { physical_plan_type: Some(PhysicalPlanType::Projection(Box::new( protobuf::ProjectionExecNode { input: Some(Box::new(input)), @@ -828,13 +833,15 @@ impl AsExecutionPlan for PhysicalPlanNode { expr_name, }, ))), - }) - } else if let Some(exec) = plan.downcast_ref::() { + }); + } + + if let Some(exec) = plan.downcast_ref::() { let input = protobuf::PhysicalPlanNode::try_from_physical_plan( exec.input().to_owned(), extension_codec, )?; - Ok(protobuf::PhysicalPlanNode { + return Ok(protobuf::PhysicalPlanNode { physical_plan_type: Some(PhysicalPlanType::Analyze(Box::new( protobuf::AnalyzeExecNode { verbose: exec.verbose(), @@ -843,27 +850,31 @@ impl AsExecutionPlan for PhysicalPlanNode { schema: Some(exec.schema().as_ref().try_into()?), }, ))), - }) - } else if let Some(exec) = plan.downcast_ref::() { + }); + } + + if let Some(exec) = plan.downcast_ref::() { let input = protobuf::PhysicalPlanNode::try_from_physical_plan( exec.input().to_owned(), extension_codec, )?; - Ok(protobuf::PhysicalPlanNode { + return Ok(protobuf::PhysicalPlanNode { physical_plan_type: Some(PhysicalPlanType::Filter(Box::new( protobuf::FilterExecNode { input: Some(Box::new(input)), expr: Some(exec.predicate().clone().try_into()?), }, ))), - }) - } else if let Some(limit) = plan.downcast_ref::() { + }); + } + + if let Some(limit) = plan.downcast_ref::() { let input = protobuf::PhysicalPlanNode::try_from_physical_plan( limit.input().to_owned(), extension_codec, )?; - Ok(protobuf::PhysicalPlanNode { + return Ok(protobuf::PhysicalPlanNode { physical_plan_type: Some(PhysicalPlanType::GlobalLimit(Box::new( protobuf::GlobalLimitExecNode { input: Some(Box::new(input)), @@ -874,21 +885,25 @@ impl AsExecutionPlan for PhysicalPlanNode { }, }, ))), - }) - } else if let Some(limit) = plan.downcast_ref::() { + }); + } + + if let Some(limit) = plan.downcast_ref::() { let input = protobuf::PhysicalPlanNode::try_from_physical_plan( limit.input().to_owned(), extension_codec, )?; - Ok(protobuf::PhysicalPlanNode { + return Ok(protobuf::PhysicalPlanNode { physical_plan_type: Some(PhysicalPlanType::LocalLimit(Box::new( protobuf::LocalLimitExecNode { input: Some(Box::new(input)), fetch: limit.fetch() as u32, }, ))), - }) - } else if let Some(exec) = plan.downcast_ref::() { + }); + } + + if let Some(exec) = plan.downcast_ref::() { let left = protobuf::PhysicalPlanNode::try_from_physical_plan( exec.left().to_owned(), extension_codec, @@ -943,7 
+958,7 @@ impl AsExecutionPlan for PhysicalPlanNode { PartitionMode::Auto => protobuf::PartitionMode::Auto, }; - Ok(protobuf::PhysicalPlanNode { + return Ok(protobuf::PhysicalPlanNode { physical_plan_type: Some(PhysicalPlanType::HashJoin(Box::new( protobuf::HashJoinExecNode { left: Some(Box::new(left)), @@ -955,8 +970,10 @@ impl AsExecutionPlan for PhysicalPlanNode { filter, }, ))), - }) - } else if let Some(exec) = plan.downcast_ref::() { + }); + } + + if let Some(exec) = plan.downcast_ref::() { let left = protobuf::PhysicalPlanNode::try_from_physical_plan( exec.left().to_owned(), extension_codec, @@ -965,15 +982,16 @@ impl AsExecutionPlan for PhysicalPlanNode { exec.right().to_owned(), extension_codec, )?; - Ok(protobuf::PhysicalPlanNode { + return Ok(protobuf::PhysicalPlanNode { physical_plan_type: Some(PhysicalPlanType::CrossJoin(Box::new( protobuf::CrossJoinExecNode { left: Some(Box::new(left)), right: Some(Box::new(right)), }, ))), - }) - } else if let Some(exec) = plan.downcast_ref::() { + }); + } + if let Some(exec) = plan.downcast_ref::() { let groups: Vec = exec .group_expr() .groups() @@ -1046,7 +1064,7 @@ impl AsExecutionPlan for PhysicalPlanNode { .map(|expr| expr.0.to_owned().try_into()) .collect::>>()?; - Ok(protobuf::PhysicalPlanNode { + return Ok(protobuf::PhysicalPlanNode { physical_plan_type: Some(PhysicalPlanType::Aggregate(Box::new( protobuf::AggregateExecNode { group_expr, @@ -1062,33 +1080,38 @@ impl AsExecutionPlan for PhysicalPlanNode { groups, }, ))), - }) - } else if let Some(empty) = plan.downcast_ref::() { + }); + } + + if let Some(empty) = plan.downcast_ref::() { let schema = empty.schema().as_ref().try_into()?; - Ok(protobuf::PhysicalPlanNode { + return Ok(protobuf::PhysicalPlanNode { physical_plan_type: Some(PhysicalPlanType::Empty( protobuf::EmptyExecNode { produce_one_row: empty.produce_one_row(), schema: Some(schema), }, )), - }) - } else if let Some(coalesce_batches) = plan.downcast_ref::() - { + }); + } + + if let Some(coalesce_batches) = plan.downcast_ref::() { let input = protobuf::PhysicalPlanNode::try_from_physical_plan( coalesce_batches.input().to_owned(), extension_codec, )?; - Ok(protobuf::PhysicalPlanNode { + return Ok(protobuf::PhysicalPlanNode { physical_plan_type: Some(PhysicalPlanType::CoalesceBatches(Box::new( protobuf::CoalesceBatchesExecNode { input: Some(Box::new(input)), target_batch_size: coalesce_batches.target_batch_size() as u32, }, ))), - }) - } else if let Some(exec) = plan.downcast_ref::() { - Ok(protobuf::PhysicalPlanNode { + }); + } + + if let Some(exec) = plan.downcast_ref::() { + return Ok(protobuf::PhysicalPlanNode { physical_plan_type: Some(PhysicalPlanType::CsvScan( protobuf::CsvScanExecNode { base_conf: Some(exec.base_config().try_into()?), @@ -1104,41 +1127,50 @@ impl AsExecutionPlan for PhysicalPlanNode { }, }, )), - }) - } else if let Some(exec) = plan.downcast_ref::() { + }); + } + + #[cfg(feature = "parquet")] + if let Some(exec) = plan.downcast_ref::() { let predicate = exec .predicate() .map(|pred| pred.clone().try_into()) .transpose()?; - Ok(protobuf::PhysicalPlanNode { + return Ok(protobuf::PhysicalPlanNode { physical_plan_type: Some(PhysicalPlanType::ParquetScan( protobuf::ParquetScanExecNode { base_conf: Some(exec.base_config().try_into()?), predicate, }, )), - }) - } else if let Some(exec) = plan.downcast_ref::() { - Ok(protobuf::PhysicalPlanNode { + }); + } + + if let Some(exec) = plan.downcast_ref::() { + return Ok(protobuf::PhysicalPlanNode { physical_plan_type: Some(PhysicalPlanType::AvroScan( 
protobuf::AvroScanExecNode { base_conf: Some(exec.base_config().try_into()?), }, )), - }) - } else if let Some(exec) = plan.downcast_ref::() { + }); + } + + if let Some(exec) = plan.downcast_ref::() { let input = protobuf::PhysicalPlanNode::try_from_physical_plan( exec.input().to_owned(), extension_codec, )?; - Ok(protobuf::PhysicalPlanNode { + return Ok(protobuf::PhysicalPlanNode { physical_plan_type: Some(PhysicalPlanType::Merge(Box::new( protobuf::CoalescePartitionsExecNode { input: Some(Box::new(input)), }, ))), - }) - } else if let Some(exec) = plan.downcast_ref::() { + }); + } + + if let Some(exec) = plan.downcast_ref::() { let input = protobuf::PhysicalPlanNode::try_from_physical_plan( exec.input().to_owned(), extension_codec, @@ -1162,15 +1194,17 @@ impl AsExecutionPlan for PhysicalPlanNode { } }; - Ok(protobuf::PhysicalPlanNode { + return Ok(protobuf::PhysicalPlanNode { physical_plan_type: Some(PhysicalPlanType::Repartition(Box::new( protobuf::RepartitionExecNode { input: Some(Box::new(input)), partition_method: Some(pb_partition_method), }, ))), - }) - } else if let Some(exec) = plan.downcast_ref::() { + }); + } + + if let Some(exec) = plan.downcast_ref::() { let input = protobuf::PhysicalPlanNode::try_from_physical_plan( exec.input().to_owned(), extension_codec, @@ -1191,7 +1225,7 @@ impl AsExecutionPlan for PhysicalPlanNode { }) }) .collect::>>()?; - Ok(protobuf::PhysicalPlanNode { + return Ok(protobuf::PhysicalPlanNode { physical_plan_type: Some(PhysicalPlanType::Sort(Box::new( protobuf::SortExecNode { input: Some(Box::new(input)), @@ -1203,8 +1237,10 @@ impl AsExecutionPlan for PhysicalPlanNode { preserve_partitioning: exec.preserve_partitioning(), }, ))), - }) - } else if let Some(union) = plan.downcast_ref::() { + }); + } + + if let Some(union) = plan.downcast_ref::() { let mut inputs: Vec = vec![]; for input in union.inputs() { inputs.push(protobuf::PhysicalPlanNode::try_from_physical_plan( @@ -1212,12 +1248,14 @@ impl AsExecutionPlan for PhysicalPlanNode { extension_codec, )?); } - Ok(protobuf::PhysicalPlanNode { + return Ok(protobuf::PhysicalPlanNode { physical_plan_type: Some(PhysicalPlanType::Union( protobuf::UnionExecNode { inputs }, )), - }) - } else if let Some(exec) = plan.downcast_ref::() { + }); + } + + if let Some(exec) = plan.downcast_ref::() { let input = protobuf::PhysicalPlanNode::try_from_physical_plan( exec.input().to_owned(), extension_codec, @@ -1238,7 +1276,7 @@ impl AsExecutionPlan for PhysicalPlanNode { }) }) .collect::>>()?; - Ok(protobuf::PhysicalPlanNode { + return Ok(protobuf::PhysicalPlanNode { physical_plan_type: Some(PhysicalPlanType::SortPreservingMerge( Box::new(protobuf::SortPreservingMergeExecNode { input: Some(Box::new(input)), @@ -1246,8 +1284,10 @@ impl AsExecutionPlan for PhysicalPlanNode { fetch: exec.fetch().map(|f| f as i64).unwrap_or(-1), }), )), - }) - } else if let Some(exec) = plan.downcast_ref::() { + }); + } + + if let Some(exec) = plan.downcast_ref::() { let left = protobuf::PhysicalPlanNode::try_from_physical_plan( exec.left().to_owned(), extension_codec, @@ -1283,7 +1323,7 @@ impl AsExecutionPlan for PhysicalPlanNode { }) .map_or(Ok(None), |v: Result| v.map(Some))?; - Ok(protobuf::PhysicalPlanNode { + return Ok(protobuf::PhysicalPlanNode { physical_plan_type: Some(PhysicalPlanType::NestedLoopJoin(Box::new( protobuf::NestedLoopJoinExecNode { left: Some(Box::new(left)), @@ -1292,8 +1332,10 @@ impl AsExecutionPlan for PhysicalPlanNode { filter, }, ))), - }) - } else if let Some(exec) = plan.downcast_ref::() { + }); + } + + if 
let Some(exec) = plan.downcast_ref::() { let input = protobuf::PhysicalPlanNode::try_from_physical_plan( exec.input().to_owned(), extension_codec, @@ -1311,7 +1353,7 @@ impl AsExecutionPlan for PhysicalPlanNode { .map(|e| e.clone().try_into()) .collect::>>()?; - Ok(protobuf::PhysicalPlanNode { + return Ok(protobuf::PhysicalPlanNode { physical_plan_type: Some(PhysicalPlanType::Window(Box::new( protobuf::WindowAggExecNode { input: Some(Box::new(input)), @@ -1320,8 +1362,10 @@ impl AsExecutionPlan for PhysicalPlanNode { partition_search_mode: None, }, ))), - }) - } else if let Some(exec) = plan.downcast_ref::() { + }); + } + + if let Some(exec) = plan.downcast_ref::() { let input = protobuf::PhysicalPlanNode::try_from_physical_plan( exec.input().to_owned(), extension_codec, @@ -1359,7 +1403,7 @@ impl AsExecutionPlan for PhysicalPlanNode { } }; - Ok(protobuf::PhysicalPlanNode { + return Ok(protobuf::PhysicalPlanNode { physical_plan_type: Some(PhysicalPlanType::Window(Box::new( protobuf::WindowAggExecNode { input: Some(Box::new(input)), @@ -1368,32 +1412,32 @@ impl AsExecutionPlan for PhysicalPlanNode { partition_search_mode: Some(partition_search_mode), }, ))), - }) - } else { - let mut buf: Vec = vec![]; - match extension_codec.try_encode(plan_clone.clone(), &mut buf) { - Ok(_) => { - let inputs: Vec = plan_clone - .children() - .into_iter() - .map(|i| { - protobuf::PhysicalPlanNode::try_from_physical_plan( - i, - extension_codec, - ) - }) - .collect::>()?; + }); + } - Ok(protobuf::PhysicalPlanNode { - physical_plan_type: Some(PhysicalPlanType::Extension( - protobuf::PhysicalExtensionNode { node: buf, inputs }, - )), + let mut buf: Vec = vec![]; + match extension_codec.try_encode(plan_clone.clone(), &mut buf) { + Ok(_) => { + let inputs: Vec = plan_clone + .children() + .into_iter() + .map(|i| { + protobuf::PhysicalPlanNode::try_from_physical_plan( + i, + extension_codec, + ) }) - } - Err(e) => internal_err!( - "Unsupported plan and extension codec failed with [{e}]. Plan: {plan_clone:?}" - ), + .collect::>()?; + + Ok(protobuf::PhysicalPlanNode { + physical_plan_type: Some(PhysicalPlanType::Extension( + protobuf::PhysicalExtensionNode { node: buf, inputs }, + )), + }) } + Err(e) => internal_err!( + "Unsupported plan and extension codec failed with [{e}]. Plan: {plan_clone:?}" + ), } } }
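
Note: the serializer above now returns early from each recognized plan node instead of chaining `else if` blocks, which lets the `ParquetExec` branch be dropped by `#[cfg(feature = "parquet")]` and leaves the extension-codec fallback as straight-line code at the end. A minimal sketch of that control flow with placeholder types (not the `datafusion_proto` API):

use std::any::Any;

struct CsvScan;
#[cfg(feature = "parquet")]
#[allow(dead_code)]
struct ParquetScan;

fn encode(plan: &dyn Any) -> Result<String, String> {
    if plan.downcast_ref::<CsvScan>().is_some() {
        return Ok("CsvScanExecNode".to_string());
    }

    // This entire branch disappears when the assumed `parquet` feature is off.
    #[cfg(feature = "parquet")]
    if plan.downcast_ref::<ParquetScan>().is_some() {
        return Ok("ParquetScanExecNode".to_string());
    }

    // Nothing matched: defer to an (assumed) extension codec, mirroring the
    // try_encode fallback at the end of the real function.
    encode_with_extension_codec(plan)
}

fn encode_with_extension_codec(_plan: &dyn Any) -> Result<String, String> {
    Err("unsupported plan and extension codec failed".to_string())
}

fn main() {
    assert_eq!(encode(&CsvScan), Ok("CsvScanExecNode".to_string()));
    assert!(encode(&0_i32).is_err());
}

Each early `return Ok(...)` corresponds to one `PhysicalPlanType` variant in the real code; only plans nothing recognizes reach the fallback.
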