Commit

Add parquet feature flag, enabled by default, and make parquet conditional (#7745)

* Make parquet an option by adding multiple cfg attributes without significant code changes.

* Extract parquet logic into submodule from execution::context

* Extract parquet logic into submodule from datafusion_core::dataframe

* Extract more logic into submodule from execution::context

* Move tests from execution::context

* Rename submodules
ongchi authored Oct 25, 2023
1 parent 48ea4b2 commit 12a6316
Showing 25 changed files with 994 additions and 639 deletions.
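The first bullet above ("adding multiple cfg attributes") is the standard Rust feature-gating pattern: gated items simply drop out of the build when the feature is off. A minimal, self-contained sketch, assuming a crate that declares a "parquet" feature (names are illustrative, not the commit's code):

#[cfg(feature = "parquet")]
mod parquet_support {
    // Compiled only when the `parquet` feature is enabled.
    pub fn describe() -> &'static str {
        "parquet readers and writers are available"
    }
}

pub fn build_summary() -> &'static str {
    // cfg! is resolved at compile time, but both branches stay type-checked.
    if cfg!(feature = "parquet") {
        "built with parquet support"
    } else {
        "built without parquet support"
    }
}

Builds that keep the default features behave exactly as before this commit; only builds with default-features = false and no explicit "parquet" feature lose the gated items.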
2 changes: 1 addition & 1 deletion datafusion-cli/Cargo.toml
@@ -34,7 +34,7 @@ async-trait = "0.1.41"
aws-config = "0.55"
aws-credential-types = "0.55"
clap = { version = "3", features = ["derive", "cargo"] }
-datafusion = { path = "../datafusion/core", version = "32.0.0", features = ["avro", "crypto_expressions", "encoding_expressions", "regex_expressions", "unicode_expressions", "compression"] }
+datafusion = { path = "../datafusion/core", version = "32.0.0", features = ["avro", "crypto_expressions", "encoding_expressions", "parquet", "regex_expressions", "unicode_expressions", "compression"] }
dirs = "4.0.0"
env_logger = "0.9"
mimalloc = { version = "0.1", default-features = false }
3 changes: 1 addition & 2 deletions datafusion/common/Cargo.toml
@@ -35,8 +35,7 @@ path = "src/lib.rs"
[features]
avro = ["apache-avro"]
backtrace = []
default = ["parquet"]
pyarrow = ["pyo3", "arrow/pyarrow"]
pyarrow = ["pyo3", "arrow/pyarrow", "parquet"]

[dependencies]
ahash = { version = "0.8", default-features = false, features = ["runtime-rng"] }
1 change: 1 addition & 0 deletions datafusion/common/src/test_util.rs
@@ -180,6 +180,7 @@ pub fn arrow_test_data() -> String {
/// let filename = format!("{}/binary.parquet", testdata);
/// assert!(std::path::PathBuf::from(filename).exists());
/// ```
+#[cfg(feature = "parquet")]
pub fn parquet_test_data() -> String {
match get_data_dir("PARQUET_TEST_DATA", "../../parquet-testing/data") {
Ok(pb) => pb.display().to_string(),
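Because parquet_test_data now exists only when the feature is enabled, callers must carry the same gate. A sketch of a hypothetical downstream test (not part of this commit) that reuses the binary.parquet fixture from the doc example above:

#[cfg(feature = "parquet")]
#[test]
fn parquet_fixture_is_reachable() {
    // Resolves the parquet-testing data directory, as in the doc example.
    let testdata = datafusion_common::test_util::parquet_test_data();
    let filename = format!("{testdata}/binary.parquet");
    assert!(std::path::PathBuf::from(filename).exists());
}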
10 changes: 5 additions & 5 deletions datafusion/core/Cargo.toml
@@ -39,11 +39,12 @@ avro = ["apache-avro", "num-traits", "datafusion-common/avro"]
backtrace = ["datafusion-common/backtrace"]
compression = ["xz2", "bzip2", "flate2", "zstd", "async-compression"]
crypto_expressions = ["datafusion-physical-expr/crypto_expressions", "datafusion-optimizer/crypto_expressions"]
default = ["crypto_expressions", "encoding_expressions", "regex_expressions", "unicode_expressions", "compression"]
default = ["crypto_expressions", "encoding_expressions", "regex_expressions", "unicode_expressions", "compression", "parquet"]
encoding_expressions = ["datafusion-physical-expr/encoding_expressions"]
# Used for testing ONLY: causes all values to hash to the same value (test for collisions)
force_hash_collisions = []
pyarrow = ["datafusion-common/pyarrow"]
parquet = ["datafusion-common/parquet", "dep:parquet"]
pyarrow = ["datafusion-common/pyarrow", "parquet"]
regex_expressions = ["datafusion-physical-expr/regex_expressions", "datafusion-optimizer/regex_expressions"]
serde = ["arrow-schema/serde"]
simd = ["arrow/simd"]
@@ -61,7 +62,7 @@ bytes = "1.4"
bzip2 = { version = "0.4.3", optional = true }
chrono = { workspace = true }
dashmap = "5.4.0"
-datafusion-common = { path = "../common", version = "32.0.0", features = ["parquet", "object_store"] }
+datafusion-common = { path = "../common", version = "32.0.0", features = ["object_store"], default-features = false }
datafusion-execution = { path = "../execution", version = "32.0.0" }
datafusion-expr = { path = "../expr", version = "32.0.0" }
datafusion-optimizer = { path = "../optimizer", version = "32.0.0", default-features = false }
@@ -80,7 +81,7 @@ num-traits = { version = "0.2", optional = true }
num_cpus = "1.13.0"
object_store = "0.7.0"
parking_lot = "0.12"
-parquet = { workspace = true }
+parquet = { workspace = true, optional = true }
percent-encoding = "2.2.0"
pin-project-lite = "^0.2.7"
rand = "0.8"
@@ -93,7 +94,6 @@ uuid = { version = "1.0", features = ["v4"] }
xz2 = { version = "0.1", optional = true }
zstd = { version = "0.13", optional = true, default-features = false }


[dev-dependencies]
async-trait = "0.1.53"
bigdecimal = "0.4.1"
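Here the parquet crate becomes an optional dependency, and the new parquet feature enables it through dep:parquet while also forwarding datafusion-common/parquet. Any direct use of the crate therefore needs the same cfg gate; a small sketch (illustrative, not the commit's code) using the WriterProperties API that the removed compression test further down also exercises:

#[cfg(feature = "parquet")]
use parquet::file::properties::WriterProperties;

// Exists only in builds where the optional `parquet` crate is linked in.
#[cfg(feature = "parquet")]
pub fn default_writer_properties() -> WriterProperties {
    WriterProperties::builder().build()
}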
@@ -17,6 +17,9 @@

//! [`DataFrame`] API for building and executing query plans.

+#[cfg(feature = "parquet")]
+mod parquet;
+
use std::any::Any;
use std::sync::Arc;

@@ -27,15 +30,11 @@ use arrow::datatypes::{DataType, Field};
use async_trait::async_trait;
use datafusion_common::file_options::csv_writer::CsvWriterOptions;
use datafusion_common::file_options::json_writer::JsonWriterOptions;
-use datafusion_common::file_options::parquet_writer::{
-default_builder, ParquetWriterOptions,
-};
use datafusion_common::parsers::CompressionTypeVariant;
use datafusion_common::{
DataFusionError, FileType, FileTypeWriterOptions, SchemaError, UnnestOptions,
};
use datafusion_expr::dml::CopyOptions;
-use parquet::file::properties::WriterProperties;

use datafusion_common::{Column, DFSchema, ScalarValue};
use datafusion_expr::{
@@ -1067,40 +1066,6 @@
DataFrame::new(self.session_state, plan).collect().await
}

-/// Write a `DataFrame` to a Parquet file.
-pub async fn write_parquet(
-self,
-path: &str,
-options: DataFrameWriteOptions,
-writer_properties: Option<WriterProperties>,
-) -> Result<Vec<RecordBatch>, DataFusionError> {
-if options.overwrite {
-return Err(DataFusionError::NotImplemented(
-"Overwrites are not implemented for DataFrame::write_parquet.".to_owned(),
-));
-}
-match options.compression{
-CompressionTypeVariant::UNCOMPRESSED => (),
-_ => return Err(DataFusionError::Configuration("DataFrame::write_parquet method does not support compression set via DataFrameWriteOptions. Set parquet compression via writer_properties instead.".to_owned()))
-}
-let props = match writer_properties {
-Some(props) => props,
-None => default_builder(self.session_state.config_options())?.build(),
-};
-let file_type_writer_options =
-FileTypeWriterOptions::Parquet(ParquetWriterOptions::new(props));
-let copy_options = CopyOptions::WriterOptions(Box::new(file_type_writer_options));
-let plan = LogicalPlanBuilder::copy_to(
-self.plan,
-path.into(),
-FileType::PARQUET,
-options.single_file_output,
-copy_options,
-)?
-.build()?;
-DataFrame::new(self.session_state, plan).collect().await
-}
-
/// Executes a query and writes the results to a partitioned JSON file.
pub async fn write_json(
self,
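The write_parquet body removed above is not gone; per the commit message it moves into the feature-gated parquet submodule declared at the top of this file (mod parquet;). A self-contained sketch of the pattern with hypothetical types (not DataFusion's): a second impl block inside a cfg-gated module adds methods that exist only when the feature is enabled.

pub struct Frame;

impl Frame {
    // Always available, regardless of features.
    pub fn collect(self) -> Vec<String> {
        Vec::new()
    }
}

#[cfg(feature = "parquet")]
mod parquet {
    use super::Frame;

    impl Frame {
        // Compiled only with the `parquet` feature, mirroring how
        // `DataFrame::write_parquet` now lives in the dataframe parquet submodule.
        pub fn write_parquet(self, _path: &str) -> Result<(), String> {
            Ok(())
        }
    }
}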
@@ -1365,19 +1330,12 @@
WindowFunction,
};
use datafusion_physical_expr::expressions::Column;
-use object_store::local::LocalFileSystem;
-use parquet::basic::{BrotliLevel, GzipLevel, ZstdLevel};
-use parquet::file::reader::FileReader;
-use tempfile::TempDir;
-use url::Url;

use crate::execution::context::SessionConfig;
-use crate::execution::options::{CsvReadOptions, ParquetReadOptions};
use crate::physical_plan::ColumnarValue;
use crate::physical_plan::Partitioning;
use crate::physical_plan::PhysicalExpr;
-use crate::test_util;
-use crate::test_util::parquet_test_data;
+use crate::test_util::{register_aggregate_csv, test_table, test_table_with_name};
use crate::{assert_batches_sorted_eq, execution::context::SessionContext};

use super::*;
@@ -1798,31 +1756,6 @@
Ok(ctx.sql(sql).await?.into_unoptimized_plan())
}

-async fn test_table_with_name(name: &str) -> Result<DataFrame> {
-let mut ctx = SessionContext::new();
-register_aggregate_csv(&mut ctx, name).await?;
-ctx.table(name).await
-}
-
-async fn test_table() -> Result<DataFrame> {
-test_table_with_name("aggregate_test_100").await
-}
-
-async fn register_aggregate_csv(
-ctx: &mut SessionContext,
-table_name: &str,
-) -> Result<()> {
-let schema = test_util::aggr_test_schema();
-let testdata = test_util::arrow_test_data();
-ctx.register_csv(
-table_name,
-&format!("{testdata}/csv/aggregate_test_100.csv"),
-CsvReadOptions::new().schema(schema.as_ref()),
-)
-.await?;
-Ok(())
-}
-
#[tokio::test]
async fn with_column() -> Result<()> {
let df = test_table().await?.select_columns(&["c1", "c2", "c3"])?;
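The three helpers deleted above reappear through the new use crate::test_util::{register_aggregate_csv, test_table, test_table_with_name}; import in the previous hunk. Roughly, they now look like this in the crate's test_util module (a sketch assembled from the removed code; the exact import paths and visibility are assumptions):

use crate::dataframe::DataFrame;
use crate::error::Result;
use crate::execution::context::SessionContext;
use crate::execution::options::CsvReadOptions;

pub async fn test_table_with_name(name: &str) -> Result<DataFrame> {
    let mut ctx = SessionContext::new();
    register_aggregate_csv(&mut ctx, name).await?;
    ctx.table(name).await
}

pub async fn test_table() -> Result<DataFrame> {
    test_table_with_name("aggregate_test_100").await
}

pub async fn register_aggregate_csv(
    ctx: &mut SessionContext,
    table_name: &str,
) -> Result<()> {
    // aggr_test_schema and arrow_test_data are the existing helpers in this module.
    let schema = aggr_test_schema();
    let testdata = arrow_test_data();
    ctx.register_csv(
        table_name,
        &format!("{testdata}/csv/aggregate_test_100.csv"),
        CsvReadOptions::new().schema(schema.as_ref()),
    )
    .await?;
    Ok(())
}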
@@ -2227,33 +2160,6 @@
Ok(())
}

-#[tokio::test]
-async fn filter_pushdown_dataframe() -> Result<()> {
-let ctx = SessionContext::new();
-
-ctx.register_parquet(
-"test",
-&format!("{}/alltypes_plain.snappy.parquet", parquet_test_data()),
-ParquetReadOptions::default(),
-)
-.await?;
-
-ctx.register_table("t1", ctx.table("test").await?.into_view())?;
-
-let df = ctx
-.table("t1")
-.await?
-.filter(col("id").eq(lit(1)))?
-.select_columns(&["bool_col", "int_col"])?;
-
-let plan = df.explain(false, false)?.collect().await?;
-// Filters all the way to Parquet
-let formatted = pretty::pretty_format_batches(&plan)?.to_string();
-assert!(formatted.contains("FilterExec: id@0 = 1"));
-
-Ok(())
-}
-
#[tokio::test]
async fn cast_expr_test() -> Result<()> {
let df = test_table()
@@ -2538,53 +2444,4 @@

Ok(())
}

-#[tokio::test]
-async fn write_parquet_with_compression() -> Result<()> {
-let test_df = test_table().await?;
-
-let output_path = "file://local/test.parquet";
-let test_compressions = vec![
-parquet::basic::Compression::SNAPPY,
-parquet::basic::Compression::LZ4,
-parquet::basic::Compression::LZ4_RAW,
-parquet::basic::Compression::GZIP(GzipLevel::default()),
-parquet::basic::Compression::BROTLI(BrotliLevel::default()),
-parquet::basic::Compression::ZSTD(ZstdLevel::default()),
-];
-for compression in test_compressions.into_iter() {
-let df = test_df.clone();
-let tmp_dir = TempDir::new()?;
-let local = Arc::new(LocalFileSystem::new_with_prefix(&tmp_dir)?);
-let local_url = Url::parse("file://local").unwrap();
-let ctx = &test_df.session_state;
-ctx.runtime_env().register_object_store(&local_url, local);
-df.write_parquet(
-output_path,
-DataFrameWriteOptions::new().with_single_file_output(true),
-Some(
-WriterProperties::builder()
-.set_compression(compression)
-.build(),
-),
-)
-.await?;
-
-// Check that file actually used the specified compression
-let file = std::fs::File::open(tmp_dir.into_path().join("test.parquet"))?;
-
-let reader =
-parquet::file::serialized_reader::SerializedFileReader::new(file)
-.unwrap();
-
-let parquet_metadata = reader.metadata();
-
-let written_compression =
-parquet_metadata.row_group(0).column(0).compression();
-
-assert_eq!(written_compression, compression);
-}
-
-Ok(())
-}
}
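For builds that keep the default features nothing changes for users; the parquet APIs exercised by the removed tests are still available. A hedged end-to-end sketch based on the signatures visible above (file paths and the exact import locations are assumptions):

use datafusion::dataframe::DataFrameWriteOptions;
use datafusion::prelude::*;
use parquet::file::properties::WriterProperties;

#[tokio::main]
async fn main() -> datafusion::error::Result<()> {
    let ctx = SessionContext::new();

    // Register an existing Parquet file as a table (path is hypothetical).
    ctx.register_parquet(
        "test",
        "data/alltypes_plain.snappy.parquet",
        ParquetReadOptions::default(),
    )
    .await?;

    // Filters like this can be pushed down to the Parquet scan, as the
    // removed filter_pushdown_dataframe test asserted via its EXPLAIN output.
    let df = ctx
        .table("test")
        .await?
        .filter(col("id").eq(lit(1)))?
        .select_columns(&["bool_col", "int_col"])?;

    // Write the result back out; Parquet compression is configured through
    // WriterProperties, not DataFrameWriteOptions.
    df.write_parquet(
        "out/result.parquet",
        DataFrameWriteOptions::new().with_single_file_output(true),
        Some(WriterProperties::builder().build()),
    )
    .await?;

    Ok(())
}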