Add a config to force using string view in benchmark #11514

Merged · 8 commits · Jul 19, 2024
1 change: 0 additions & 1 deletion Cargo.toml
@@ -154,7 +154,6 @@ large_futures = "warn"
[workspace.lints.rust]
unused_imports = "deny"


## Temporary arrow-rs patch until 52.2.0 is released

[patch.crates-io]
4 changes: 3 additions & 1 deletion benchmarks/src/clickbench.rs
@@ -116,7 +116,9 @@ impl RunOpt {
None => queries.min_query_id()..=queries.max_query_id(),
};

let config = self.common.config();
let mut config = self.common.config();
config.options_mut().execution.schema_force_string_view = self.common.string_view;

let ctx = SessionContext::new_with_config(config);
self.register_hits(&ctx).await?;

3 changes: 3 additions & 0 deletions benchmarks/src/tpch/run.rs
@@ -120,6 +120,7 @@ impl RunOpt {
.config()
.with_collect_statistics(!self.disable_statistics);
config.options_mut().optimizer.prefer_hash_join = self.prefer_hash_join;
config.options_mut().execution.schema_force_string_view = self.common.string_view;
let ctx = SessionContext::new_with_config(config);

// register tables
@@ -339,6 +340,7 @@ mod tests {
partitions: Some(2),
batch_size: 8192,
debug: false,
string_view: false,
};
let opt = RunOpt {
query: Some(query),
@@ -372,6 +374,7 @@ mod tests {
partitions: Some(2),
batch_size: 8192,
debug: false,
string_view: false,
};
let opt = RunOpt {
query: Some(query),
5 changes: 5 additions & 0 deletions benchmarks/src/util/options.rs
@@ -37,6 +37,11 @@ pub struct CommonOpt {
/// Activate debug mode to see more details
#[structopt(short, long)]
pub debug: bool,

/// If true, will use StringViewArray/BinaryViewArray instead of
/// StringArray/BinaryArray when reading Parquet files
#[structopt(long)]
pub string_view: bool,
}

impl CommonOpt {
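For context on how the new flag surfaces on the command line: structopt renames fields to kebab-case by default, so the `string_view` field becomes a `--string-view` switch. A minimal, self-contained sketch of that behavior (illustrative only, not code from this PR):

```rust
use structopt::StructOpt;

/// Minimal stand-in for the benchmark's CommonOpt, reduced to the new flag.
#[derive(Debug, StructOpt)]
struct CommonOpt {
    /// If true, will use StringViewArray/BinaryViewArray instead of
    /// StringArray/BinaryArray when reading Parquet files
    #[structopt(long)]
    string_view: bool,
}

fn main() {
    // Passing `--string-view` sets the flag; omitting it leaves it false.
    let opt = CommonOpt::from_args();
    println!("string_view = {}", opt.string_view);
}
```

Passing `--string-view` to a benchmark run therefore sets `self.common.string_view`, which the `RunOpt` changes above copy into `execution.schema_force_string_view` on the session config.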
4 changes: 4 additions & 0 deletions datafusion/common/src/config.rs
@@ -311,6 +311,10 @@ config_namespace! {

/// Should DataFusion keep the columns used for partition_by in the output RecordBatches
pub keep_partition_by_columns: bool, default = false

/// If true, listing tables will read columns of type `Utf8`/`LargeUtf8` as `Utf8View`,
/// and `Binary`/`LargeBinary` as `BinaryView`.
pub schema_force_string_view: bool, default = false
}
}

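Outside the benchmark harness, the new execution option can be toggled like any other DataFusion config knob. A minimal sketch (assuming the usual `SessionConfig`/`SessionContext` APIs; not code from this PR) showing both the programmatic path the benchmarks use and a runtime SQL `SET`:

```rust
use datafusion::prelude::*;

#[tokio::main]
async fn main() -> datafusion::error::Result<()> {
    // Programmatic: flip the execution option before building the context,
    // mirroring what the benchmark RunOpt changes do.
    let mut config = SessionConfig::new();
    config.options_mut().execution.schema_force_string_view = true;
    let ctx = SessionContext::new_with_config(config);

    // Alternative: change the option at runtime through SQL.
    ctx.sql("SET datafusion.execution.schema_force_string_view = true")
        .await?;
    Ok(())
}
```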
27 changes: 26 additions & 1 deletion datafusion/core/src/datasource/listing/table.rs
@@ -410,7 +410,32 @@ impl ListingOptions {
.try_collect()
.await?;

self.format.infer_schema(state, &store, &files).await
let mut schema = self.format.infer_schema(state, &store, &files).await?;

if state.config_options().execution.schema_force_string_view {
let transformed_fields: Vec<Arc<Field>> = schema
.fields
.iter()
.map(|field| match field.data_type() {
DataType::Utf8 | DataType::LargeUtf8 => Arc::new(Field::new(
field.name(),
DataType::Utf8View,
field.is_nullable(),
)),
DataType::Binary | DataType::LargeBinary => Arc::new(Field::new(
field.name(),
DataType::BinaryView,
field.is_nullable(),
)),
_ => field.clone(),
})
.collect();
schema = Arc::new(Schema::new_with_metadata(
transformed_fields,
schema.metadata.clone(),
));
}
Ok(schema)
}

/// Infers the partition columns stored in `LOCATION` and compares
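The end-to-end effect of the `infer_schema` change above, sketched under assumptions: the file `hits.parquet` and the column `URL` are hypothetical, and the check assumes `register_parquet` routes schema inference through `ListingOptions::infer_schema`, so a column that would normally infer as `Utf8` is reported as `Utf8View` once the option is on:

```rust
use datafusion::arrow::datatypes::DataType;
use datafusion::prelude::*;

#[tokio::main]
async fn main() -> datafusion::error::Result<()> {
    let mut config = SessionConfig::new();
    config.options_mut().execution.schema_force_string_view = true;
    let ctx = SessionContext::new_with_config(config);

    // Hypothetical Parquet file with a string column named "URL".
    ctx.register_parquet("hits", "hits.parquet", ParquetReadOptions::default())
        .await?;

    // With the option enabled, the inferred listing-table schema should expose
    // the column as Utf8View rather than Utf8.
    let df = ctx.table("hits").await?;
    let field = df.schema().field_with_unqualified_name("URL")?;
    assert_eq!(field.data_type(), &DataType::Utf8View);
    Ok(())
}
```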
2 changes: 2 additions & 0 deletions datafusion/sqllogictest/test_files/information_schema.slt
@@ -205,6 +205,7 @@ datafusion.execution.parquet.statistics_enabled NULL
datafusion.execution.parquet.write_batch_size 1024
datafusion.execution.parquet.writer_version 1.0
datafusion.execution.planning_concurrency 13
datafusion.execution.schema_force_string_view false
datafusion.execution.soft_max_rows_per_output_file 50000000
datafusion.execution.sort_in_place_threshold_bytes 1048576
datafusion.execution.sort_spill_reservation_bytes 10485760
@@ -289,6 +290,7 @@ datafusion.execution.parquet.statistics_enabled NULL Sets if statistics are enab
datafusion.execution.parquet.write_batch_size 1024 Sets write_batch_size in bytes
datafusion.execution.parquet.writer_version 1.0 Sets parquet writer version valid values are "1.0" and "2.0"
datafusion.execution.planning_concurrency 13 Fan-out during initial physical planning. This is mostly use to plan `UNION` children in parallel. Defaults to the number of CPU cores on the system
datafusion.execution.schema_force_string_view false If true, the parquet reader will replace `Utf8`/`LargeUtf8` with `Utf8View`, and `Binary`/`LargeBinary` with `BinaryView`.
datafusion.execution.soft_max_rows_per_output_file 50000000 Target number of rows in output files when writing multiple. This is a soft max, so it can be exceeded slightly. There also will be one file smaller than the limit if the total number of rows written is not roughly divisible by the soft max
datafusion.execution.sort_in_place_threshold_bytes 1048576 When sorting, below what size should data be concatenated and sorted in a single RecordBatch rather than sorted in batches and merged.
datafusion.execution.sort_spill_reservation_bytes 10485760 Specifies the reserved memory for each spillable sort operation to facilitate an in-memory merge. When a sort operation spills to disk, the in-memory data must be sorted and merged before being written to a file. This setting reserves a specific amount of memory for that in-memory sort/merge process. Note: This setting is irrelevant if the sort operation cannot spill (i.e., if there's no `DiskManager` configured).
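The sqllogictest additions above pin the new setting's default via `information_schema.df_settings`; the same check can be done from Rust. A small sketch (assuming the information schema is enabled on the context):

```rust
use datafusion::prelude::*;

#[tokio::main]
async fn main() -> datafusion::error::Result<()> {
    let config = SessionConfig::new().with_information_schema(true);
    let ctx = SessionContext::new_with_config(config);

    // Should print one row: schema_force_string_view with its default of false.
    ctx.sql(
        "SELECT name, value FROM information_schema.df_settings \
         WHERE name = 'datafusion.execution.schema_force_string_view'",
    )
    .await?
    .show()
    .await?;
    Ok(())
}
```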
1 change: 1 addition & 0 deletions docs/source/user-guide/configs.md
@@ -87,6 +87,7 @@ Environment variables are read during `SessionConfig` initialisation so they mus
| datafusion.execution.enable_recursive_ctes | true | Should DataFusion support recursive CTEs |
| datafusion.execution.split_file_groups_by_statistics | false | Attempt to eliminate sorts by packing & sorting files with non-overlapping statistics into the same file groups. Currently experimental |
| datafusion.execution.keep_partition_by_columns | false | Should DataFusion keep the columns used for partition_by in the output RecordBatches |
| datafusion.execution.schema_force_string_view | false | If true, the parquet reader will replace `Utf8`/`LargeUtf8` with `Utf8View`, and `Binary`/`LargeBinary` with `BinaryView`. |
| datafusion.optimizer.enable_distinct_aggregation_soft_limit | true | When set to true, the optimizer will push a limit operation into grouped aggregations which have no aggregate expressions, as a soft limit, emitting groups once the limit is reached, before all rows in the group are read. |
| datafusion.optimizer.enable_round_robin_repartition | true | When set to true, the physical plan optimizer will try to add round robin repartitioning to increase parallelism to leverage more CPU cores |
| datafusion.optimizer.enable_topk_aggregation | true | When set to true, the optimizer will attempt to perform limit operations during aggregations, if possible |