Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Change default Parquet writer settings to match arrow-rs (except for compression & statistics) #11558

Merged
merged 7 commits into from
Jul 23, 2024
97 changes: 88 additions & 9 deletions datafusion/common/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -314,6 +314,85 @@ config_namespace! {
}
}

/// When using the parquet feature,
/// use the same default writer settings as the extern parquet crate.
#[cfg(feature = "parquet")]
mod parquet_defaults {
    use parquet::basic::Compression;
    use parquet::file::properties as props;

    /// Default value for [`props::WriterProperties::data_page_size_limit`]
    pub const DEFAULT_PAGE_SIZE: usize = props::DEFAULT_PAGE_SIZE;
    /// Default value for [`props::WriterProperties::write_batch_size`]
    pub const DEFAULT_WRITE_BATCH_SIZE: usize = props::DEFAULT_WRITE_BATCH_SIZE;
    /// Default value for [`props::WriterProperties::writer_version`]
    pub const DEFAULT_WRITER_VERSION: &str = "1.0";
    /// Default value for [`props::WriterProperties::dictionary_enabled`]
    pub const DEFAULT_DICTIONARY_ENABLED: Option<bool> =
        Some(props::DEFAULT_DICTIONARY_ENABLED);
    /// Default value for [`props::WriterProperties::dictionary_page_size_limit`]
    pub const DEFAULT_DICTIONARY_PAGE_SIZE_LIMIT: usize =
        props::DEFAULT_DICTIONARY_PAGE_SIZE_LIMIT;
    /// Default value for [`props::WriterProperties::data_page_row_count_limit`]
    pub const DEFAULT_DATA_PAGE_ROW_COUNT_LIMIT: usize =
        props::DEFAULT_DATA_PAGE_ROW_COUNT_LIMIT;
    /// Default value for [`props::WriterProperties::max_statistics_size`]
    pub const DEFAULT_MAX_STATISTICS_SIZE: Option<usize> =
        Some(props::DEFAULT_MAX_STATISTICS_SIZE);
    /// Default value for [`props::WriterProperties::max_row_group_size`]
    pub const DEFAULT_MAX_ROW_GROUP_SIZE: usize = props::DEFAULT_MAX_ROW_GROUP_SIZE;
    /// Default value for [`props::WriterProperties::column_index_truncate_length`]
    pub const DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH: Option<usize> =
        props::DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH;

    // TODO: discuss if we want datafusion to use these defaults from the extern parquet
    // refer to https://github.com/apache/datafusion/issues/11367

    /// Default value for [`props::WriterProperties::statistics_enabled`]
    #[allow(dead_code)]
    pub const DEFAULT_STATISTICS_ENABLED: Option<&str> = Some("page");
    /// Default value for [`props::BloomFilterProperties::fpp`]
    #[allow(dead_code)]
    pub const DEFAULT_BLOOM_FILTER_FPP: Option<f64> =
        Some(props::DEFAULT_BLOOM_FILTER_FPP);
    /// Default value for [`props::BloomFilterProperties::ndv`]
    #[allow(dead_code)]
    pub const DEFAULT_BLOOM_FILTER_NDV: Option<u64> =
        Some(props::DEFAULT_BLOOM_FILTER_NDV);

    /// Default value for [`props::WriterProperties::compression`]
    #[allow(dead_code)]
    pub const DEFAULT_COMPRESSION: Compression = Compression::UNCOMPRESSED;
}

/// When not using the parquet feature, provide a manual copy
/// of the extern parquet's settings in order to compile.
///
/// This is required since the [`ParquetOptions`] are extended with the
/// `config_namespace` macro, which does not handle internal configuration macros.
#[cfg(not(feature = "parquet"))]
mod parquet_defaults {
/// Manual copy of the parquet crate's `DEFAULT_PAGE_SIZE` (1 MiB).
pub const DEFAULT_PAGE_SIZE: usize = 1024 * 1024;
/// Manual copy of the parquet crate's `DEFAULT_WRITE_BATCH_SIZE`.
pub const DEFAULT_WRITE_BATCH_SIZE: usize = 1024;
/// Manual copy of the parquet crate's `DEFAULT_WRITER_VERSION`.
pub const DEFAULT_WRITER_VERSION: &str = "1.0";
/// Manual copy of the parquet crate's `DEFAULT_DICTIONARY_ENABLED`.
pub const DEFAULT_DICTIONARY_ENABLED: Option<bool> = Some(true);
/// Manual copy of the parquet crate's `DEFAULT_DICTIONARY_PAGE_SIZE_LIMIT`
/// (same as the data page size limit).
pub const DEFAULT_DICTIONARY_PAGE_SIZE_LIMIT: usize = DEFAULT_PAGE_SIZE;
/// Manual copy of the parquet crate's `DEFAULT_DATA_PAGE_ROW_COUNT_LIMIT`.
pub const DEFAULT_DATA_PAGE_ROW_COUNT_LIMIT: usize = 20_000;
/// Manual copy of the parquet crate's `DEFAULT_MAX_STATISTICS_SIZE`.
pub const DEFAULT_MAX_STATISTICS_SIZE: Option<usize> = Some(4096);
/// Manual copy of the parquet crate's `DEFAULT_MAX_ROW_GROUP_SIZE` (1M rows).
pub const DEFAULT_MAX_ROW_GROUP_SIZE: usize = 1024 * 1024;
/// Manual copy of the parquet crate's `DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH`.
pub const DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH: Option<usize> = Some(64);

// TODO: discuss if we want datafusion to use these defaults from the extern parquet
// refer to https://github.com/apache/datafusion/issues/11367

#[allow(dead_code)]
/// Manual copy of the parquet crate's default statistics level.
pub const DEFAULT_STATISTICS_ENABLED: Option<&str> = Some("page");
#[allow(dead_code)]
/// Manual copy of the parquet crate's `DEFAULT_BLOOM_FILTER_FPP`.
pub const DEFAULT_BLOOM_FILTER_FPP: Option<f64> = Some(0.05);
#[allow(dead_code)]
/// Manual copy of the parquet crate's `DEFAULT_BLOOM_FILTER_NDV`.
pub const DEFAULT_BLOOM_FILTER_NDV: Option<u64> = Some(1_000_000_u64);
}

config_namespace! {
/// Options for reading and writing parquet files
///
Expand Down Expand Up @@ -357,14 +436,14 @@ config_namespace! {
// and map to parquet::file::properties::WriterProperties

/// (writing) Sets best effort maximum size of data page in bytes
pub data_pagesize_limit: usize, default = 1024 * 1024
pub data_pagesize_limit: usize, default = parquet_defaults::DEFAULT_PAGE_SIZE

/// (writing) Sets write_batch_size in bytes
pub write_batch_size: usize, default = 1024
pub write_batch_size: usize, default = parquet_defaults::DEFAULT_WRITE_BATCH_SIZE

/// (writing) Sets parquet writer version
/// valid values are "1.0" and "2.0"
pub writer_version: String, default = "1.0".into()
pub writer_version: String, default = parquet_defaults::DEFAULT_WRITER_VERSION.to_string()

/// (writing) Sets default parquet compression codec.
/// Valid values are: uncompressed, snappy, gzip(level),
Expand All @@ -375,10 +454,10 @@ config_namespace! {

/// (writing) Sets if dictionary encoding is enabled. If NULL, uses
/// default parquet writer setting
pub dictionary_enabled: Option<bool>, default = None
pub dictionary_enabled: Option<bool>, default = parquet_defaults::DEFAULT_DICTIONARY_ENABLED

/// (writing) Sets best effort maximum dictionary page size, in bytes
pub dictionary_page_size_limit: usize, default = 1024 * 1024
pub dictionary_page_size_limit: usize, default = parquet_defaults::DEFAULT_DICTIONARY_PAGE_SIZE_LIMIT

/// (writing) Sets if statistics are enabled for any column
/// Valid values are: "none", "chunk", and "page"
Expand All @@ -388,21 +467,21 @@ config_namespace! {

/// (writing) Sets max statistics size for any column. If NULL, uses
/// default parquet writer setting
pub max_statistics_size: Option<usize>, default = None
pub max_statistics_size: Option<usize>, default = parquet_defaults::DEFAULT_MAX_STATISTICS_SIZE

/// (writing) Target maximum number of rows in each row group (defaults to 1M
/// rows). Writing larger row groups requires more memory to write, but
/// can get better compression and be faster to read.
pub max_row_group_size: usize, default = 1024 * 1024
pub max_row_group_size: usize, default = parquet_defaults::DEFAULT_MAX_ROW_GROUP_SIZE

/// (writing) Sets "created by" property
pub created_by: String, default = concat!("datafusion version ", env!("CARGO_PKG_VERSION")).into()

/// (writing) Sets column index truncate length
pub column_index_truncate_length: Option<usize>, default = None
pub column_index_truncate_length: Option<usize>, default = parquet_defaults::DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH

/// (writing) Sets best effort maximum number of rows in data page
pub data_page_row_count_limit: usize, default = usize::MAX
pub data_page_row_count_limit: usize, default = parquet_defaults::DEFAULT_DATA_PAGE_ROW_COUNT_LIMIT

/// (writing) Sets default encoding for any column.
/// Valid values are: plain, plain_dictionary, rle,
Expand Down
72 changes: 0 additions & 72 deletions datafusion/common/src/file_options/parquet_writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -644,56 +644,6 @@ mod tests {
"datafusion's default is zstd"
);

// TODO: data_page_row_count_limit defaults do not match
// refer to https://github.com/apache/datafusion/issues/11367
assert_eq!(
default_writer_props.data_page_row_count_limit(),
20_000,
"extern parquet's default data_page_row_count_limit is 20_000"
);
assert_eq!(
from_datafusion_defaults.data_page_row_count_limit(),
usize::MAX,
"datafusion's default is usize::MAX"
);

// TODO: column_index_truncate_length do not match
// refer to https://github.com/apache/datafusion/issues/11367
assert_eq!(
default_writer_props.column_index_truncate_length(),
Some(64),
"extern parquet's default is 64"
);
assert_eq!(
from_datafusion_defaults.column_index_truncate_length(),
None,
"datafusion's default is None"
);

// The next few examples are where datafusion's default is None.
// But once datafusion's TableParquetOptions are converted to a WriterProperties,
// then we get the extern parquet's defaults.
//
// In other words, we do not get indeterminate behavior in the output writer props.
// But this is only because we use the extern parquet's defaults when we leave
// the datafusion setting as None.

// datafusion's `None` for Option<bool> => becomes parquet's true
// TODO: should this be changed?
// refer to https://github.com/apache/datafusion/issues/11367
assert!(
default_writer_props.dictionary_enabled(&"default".into()),
"extern parquet's default is true"
);
assert_eq!(
default_table_writer_opts.global.dictionary_enabled, None,
"datafusion's has no default"
);
assert!(
from_datafusion_defaults.dictionary_enabled(&"default".into()),
"should see the extern parquet's default over-riding datafusion's None",
);

// datafusion's `None` for Option<String> => becomes parquet's EnabledStatistics::Page
// TODO: should this be changed?
// refer to https://github.com/apache/datafusion/issues/11367
Expand All @@ -712,35 +662,13 @@ mod tests {
"should see the extern parquet's default over-riding datafusion's None",
);

// datafusion's `None` for Option<usize> => becomes parquet's 4096
// TODO: should this be changed?
// refer to https://github.com/apache/datafusion/issues/11367
assert_eq!(
default_writer_props.max_statistics_size(&"default".into()),
4096,
"extern parquet's default is 4096"
);
assert_eq!(
default_table_writer_opts.global.max_statistics_size, None,
"datafusion's has no default"
);
assert_eq!(
default_writer_props.max_statistics_size(&"default".into()),
4096,
"should see the extern parquet's default over-riding datafusion's None",
);

// Confirm all other settings are equal.
// First resolve the known discrepancies, (set as the same).
// TODO: once we fix the above mis-matches, we should be able to remove this.
let mut from_extern_parquet =
session_config_from_writer_props(&default_writer_props);
from_extern_parquet.global.compression = Some("zstd(3)".into());
from_extern_parquet.global.data_page_row_count_limit = usize::MAX;
from_extern_parquet.global.column_index_truncate_length = None;
from_extern_parquet.global.dictionary_enabled = None;
from_extern_parquet.global.statistics_enabled = None;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there any rationale for not setting the statistics value to the same as in the arrow-rs writer?

Copy link
Contributor Author

@wiedld wiedld Jul 22, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The statistics_enabled and the bloom filter setting will require a bit more updates to do. In order to keep the diff small, (and more easily reviewable), I was going to do in a follow up PR. Is that ok @alamb ?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sounds good to me 👍 thank you

from_extern_parquet.global.max_statistics_size = None;

// Expected: the remaining should match
let same_created_by = default_table_writer_opts.global.created_by.clone(); // we expect these to be different
Expand Down
16 changes: 8 additions & 8 deletions datafusion/sqllogictest/test_files/information_schema.slt
Original file line number Diff line number Diff line change
Expand Up @@ -183,17 +183,17 @@ datafusion.execution.parquet.bloom_filter_fpp NULL
datafusion.execution.parquet.bloom_filter_ndv NULL
datafusion.execution.parquet.bloom_filter_on_read true
datafusion.execution.parquet.bloom_filter_on_write false
datafusion.execution.parquet.column_index_truncate_length NULL
datafusion.execution.parquet.column_index_truncate_length 64
datafusion.execution.parquet.compression zstd(3)
datafusion.execution.parquet.created_by datafusion
datafusion.execution.parquet.data_page_row_count_limit 18446744073709551615
datafusion.execution.parquet.data_page_row_count_limit 20000
datafusion.execution.parquet.data_pagesize_limit 1048576
datafusion.execution.parquet.dictionary_enabled NULL
datafusion.execution.parquet.dictionary_enabled true
datafusion.execution.parquet.dictionary_page_size_limit 1048576
datafusion.execution.parquet.enable_page_index true
datafusion.execution.parquet.encoding NULL
datafusion.execution.parquet.max_row_group_size 1048576
datafusion.execution.parquet.max_statistics_size NULL
datafusion.execution.parquet.max_statistics_size 4096
datafusion.execution.parquet.maximum_buffered_record_batches_per_stream 2
datafusion.execution.parquet.maximum_parallel_row_group_writers 1
datafusion.execution.parquet.metadata_size_hint NULL
Expand Down Expand Up @@ -267,17 +267,17 @@ datafusion.execution.parquet.bloom_filter_fpp NULL (writing) Sets bloom filter f
datafusion.execution.parquet.bloom_filter_ndv NULL (writing) Sets bloom filter number of distinct values. If NULL, uses default parquet writer setting
datafusion.execution.parquet.bloom_filter_on_read true (writing) Use any available bloom filters when reading parquet files
datafusion.execution.parquet.bloom_filter_on_write false (writing) Write bloom filters for all columns when creating parquet files
datafusion.execution.parquet.column_index_truncate_length NULL (writing) Sets column index truncate length
datafusion.execution.parquet.column_index_truncate_length 64 (writing) Sets column index truncate length
datafusion.execution.parquet.compression zstd(3) (writing) Sets default parquet compression codec. Valid values are: uncompressed, snappy, gzip(level), lzo, brotli(level), lz4, zstd(level), and lz4_raw. These values are not case sensitive. If NULL, uses default parquet writer setting
datafusion.execution.parquet.created_by datafusion (writing) Sets "created by" property
datafusion.execution.parquet.data_page_row_count_limit 18446744073709551615 (writing) Sets best effort maximum number of rows in data page
datafusion.execution.parquet.data_page_row_count_limit 20000 (writing) Sets best effort maximum number of rows in data page
datafusion.execution.parquet.data_pagesize_limit 1048576 (writing) Sets best effort maximum size of data page in bytes
datafusion.execution.parquet.dictionary_enabled NULL (writing) Sets if dictionary encoding is enabled. If NULL, uses default parquet writer setting
datafusion.execution.parquet.dictionary_enabled true (writing) Sets if dictionary encoding is enabled. If NULL, uses default parquet writer setting
datafusion.execution.parquet.dictionary_page_size_limit 1048576 (writing) Sets best effort maximum dictionary page size, in bytes
datafusion.execution.parquet.enable_page_index true (reading) If true, reads the Parquet data page level metadata (the Page Index), if present, to reduce the I/O and number of rows decoded.
datafusion.execution.parquet.encoding NULL (writing) Sets default encoding for any column. Valid values are: plain, plain_dictionary, rle, bit_packed, delta_binary_packed, delta_length_byte_array, delta_byte_array, rle_dictionary, and byte_stream_split. These values are not case sensitive. If NULL, uses default parquet writer setting
datafusion.execution.parquet.max_row_group_size 1048576 (writing) Target maximum number of rows in each row group (defaults to 1M rows). Writing larger row groups requires more memory to write, but can get better compression and be faster to read.
datafusion.execution.parquet.max_statistics_size NULL (writing) Sets max statistics size for any column. If NULL, uses default parquet writer setting
datafusion.execution.parquet.max_statistics_size 4096 (writing) Sets max statistics size for any column. If NULL, uses default parquet writer setting
datafusion.execution.parquet.maximum_buffered_record_batches_per_stream 2 (writing) By default parallel parquet writer is tuned for minimum memory usage in a streaming execution plan. You may see a performance benefit when writing large parquet files by increasing maximum_parallel_row_group_writers and maximum_buffered_record_batches_per_stream if your system has idle cores and can tolerate additional memory usage. Boosting these values is likely worthwhile when writing out already in-memory data, such as from a cached data frame.
datafusion.execution.parquet.maximum_parallel_row_group_writers 1 (writing) By default parallel parquet writer is tuned for minimum memory usage in a streaming execution plan. You may see a performance benefit when writing large parquet files by increasing maximum_parallel_row_group_writers and maximum_buffered_record_batches_per_stream if your system has idle cores and can tolerate additional memory usage. Boosting these values is likely worthwhile when writing out already in-memory data, such as from a cached data frame.
datafusion.execution.parquet.metadata_size_hint NULL (reading) If specified, the parquet reader will try and fetch the last `size_hint` bytes of the parquet file optimistically. If not specified, two reads are required: One read to fetch the 8-byte parquet footer and another to fetch the metadata length encoded in the footer
Expand Down
Loading