From affe13677a74e7402d9acf048e1a901a4398e6db Mon Sep 17 00:00:00 2001 From: Yongting You <2010youy01@gmail.com> Date: Fri, 15 Nov 2024 12:46:18 +0800 Subject: [PATCH] review --- benchmarks/README.md | 10 +- benchmarks/bench.sh | 15 +- benchmarks/src/bin/dfbench.rs | 4 +- benchmarks/src/lib.rs | 1 + benchmarks/src/sort_tpch.rs | 320 ++++++++++++++++++++++++++++++++++ 5 files changed, 337 insertions(+), 13 deletions(-) create mode 100644 benchmarks/src/sort_tpch.rs diff --git a/benchmarks/README.md b/benchmarks/README.md index a5c37ca3f5fc..8051b943f5d0 100644 --- a/benchmarks/README.md +++ b/benchmarks/README.md @@ -330,23 +330,23 @@ steps. The tests sort the entire dataset using several different sort orders. -## Sort Integration +## Sort TPCH Test performance of end-to-end sort SQL queries. (While the `Sort` benchmark focuses on a single sort executor, this benchmark tests how sorting is executed across multiple CPU cores by benchmarking sorting the whole relational table.) Sort integration benchmark runs whole table sort queries on TPCH `lineitem` table, with different characteristics. For example, different number of sort keys, different sort key cardinality, different number of payload columns, etc. -See [`sort_integration.rs`](src/bin/sort_integration.rs) for more details. +See [`sort_tpch.rs`](src/sort_tpch.rs) for more details. -### Sort Integration Benchmark Example Runs +### Sort TPCH Benchmark Example Runs 1. Run all queries with default setting: ```bash - cargo run --release --bin sort_integration -- benchmark -p '....../datafusion/benchmarks/data/tpch_sf1' -o '/tmp/sort_integration.json' + cargo run --release --bin dfbench -- sort-tpch -p '....../datafusion/benchmarks/data/tpch_sf1' -o '/tmp/sort_integration.json' ``` 2. Run a specific query: ```bash - cargo run --release --bin sort_integration -- benchmark -p '....../datafusion/benchmarks/data/tpch_sf1' -o '/tmp/sort_integration.json' --query 2 + cargo run --release --bin dfbench -- sort-tpch -p '....../datafusion/benchmarks/data/tpch_sf1' -o '/tmp/sort_integration.json' --query 2 ``` 3. Run all queries with `bench.sh` script: diff --git a/benchmarks/bench.sh b/benchmarks/bench.sh index 8a2fc1a08b7d..b02bfee2454e 100755 --- a/benchmarks/bench.sh +++ b/benchmarks/bench.sh @@ -75,6 +75,7 @@ tpch10: TPCH inspired benchmark on Scale Factor (SF) 10 (~10GB), tpch_mem10: TPCH inspired benchmark on Scale Factor (SF) 10 (~10GB), query from memory parquet: Benchmark of parquet reader's filtering speed sort: Benchmark of sorting speed +sort_tpch: Benchmark of sorting speed for end-to-end sort queries on TPCH dataset clickbench_1: ClickBench queries against a single parquet file clickbench_partitioned: ClickBench queries against a partitioned (100 files) parquet clickbench_extended: ClickBench \"inspired\" queries against a single parquet (DataFusion specific) @@ -175,7 +176,7 @@ main() { # same data as for tpch data_tpch "1" ;; - sort_integration) + sort_tpch) # same data as for tpch data_tpch "1" ;; @@ -256,8 +257,8 @@ main() { external_aggr) run_external_aggr ;; - sort_integration) - run_sort_integration + sort_tpch) + run_sort_tpch ;; *) echo "Error: unknown benchmark '$BENCHMARK' for run" @@ -557,13 +558,13 @@ run_external_aggr() { } # Runs the sort integration benchmark -run_sort_integration() { +run_sort_tpch() { TPCH_DIR="${DATA_DIR}/tpch_sf1" - RESULTS_FILE="${RESULTS_DIR}/sort_integration.json" + RESULTS_FILE="${RESULTS_DIR}/sort_tpch.json" echo "RESULTS_FILE: ${RESULTS_FILE}" - echo "Running sort integration benchmark..." + echo "Running sort tpch benchmark..." - $CARGO_COMMAND --bin sort_integration -- benchmark --iterations 5 --path "${TPCH_DIR}" -o "${RESULTS_FILE}" + $CARGO_COMMAND --bin dfbench -- sort-tpch --iterations 5 --path "${TPCH_DIR}" -o "${RESULTS_FILE}" } diff --git a/benchmarks/src/bin/dfbench.rs b/benchmarks/src/bin/dfbench.rs index f7b84116e793..81aa5437dd5f 100644 --- a/benchmarks/src/bin/dfbench.rs +++ b/benchmarks/src/bin/dfbench.rs @@ -33,7 +33,7 @@ static ALLOC: snmalloc_rs::SnMalloc = snmalloc_rs::SnMalloc; #[global_allocator] static ALLOC: mimalloc::MiMalloc = mimalloc::MiMalloc; -use datafusion_benchmarks::{clickbench, imdb, parquet_filter, sort, tpch}; +use datafusion_benchmarks::{clickbench, imdb, parquet_filter, sort, sort_tpch, tpch}; #[derive(Debug, StructOpt)] #[structopt(about = "benchmark command")] @@ -43,6 +43,7 @@ enum Options { Clickbench(clickbench::RunOpt), ParquetFilter(parquet_filter::RunOpt), Sort(sort::RunOpt), + SortTpch(sort_tpch::RunOpt), Imdb(imdb::RunOpt), } @@ -57,6 +58,7 @@ pub async fn main() -> Result<()> { Options::Clickbench(opt) => opt.run().await, Options::ParquetFilter(opt) => opt.run().await, Options::Sort(opt) => opt.run().await, + Options::SortTpch(opt) => opt.run().await, Options::Imdb(opt) => opt.run().await, } } diff --git a/benchmarks/src/lib.rs b/benchmarks/src/lib.rs index 02410e0cfa01..2d37d78764d7 100644 --- a/benchmarks/src/lib.rs +++ b/benchmarks/src/lib.rs @@ -20,5 +20,6 @@ pub mod clickbench; pub mod imdb; pub mod parquet_filter; pub mod sort; +pub mod sort_tpch; pub mod tpch; pub mod util; diff --git a/benchmarks/src/sort_tpch.rs b/benchmarks/src/sort_tpch.rs new file mode 100644 index 000000000000..4b83b3b8889a --- /dev/null +++ b/benchmarks/src/sort_tpch.rs @@ -0,0 +1,320 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! This module provides integration benchmark for sort operation. +//! It will run different sort SQL queries on TPCH `lineitem` parquet dataset. +//! +//! Another `Sort` benchmark focus on single core execution. This benchmark +//! runs end-to-end sort queries and test the performance on multiple CPU cores. + +use futures::StreamExt; +use std::path::PathBuf; +use std::sync::Arc; +use structopt::StructOpt; + +use datafusion::datasource::file_format::parquet::ParquetFormat; +use datafusion::datasource::listing::{ + ListingOptions, ListingTable, ListingTableConfig, ListingTableUrl, +}; +use datafusion::datasource::{MemTable, TableProvider}; +use datafusion::error::Result; +use datafusion::execution::runtime_env::RuntimeConfig; +use datafusion::physical_plan::display::DisplayableExecutionPlan; +use datafusion::physical_plan::{displayable, execute_stream}; +use datafusion::prelude::*; +use datafusion_common::instant::Instant; +use datafusion_common::DEFAULT_PARQUET_EXTENSION; + +use crate::util::{BenchmarkRun, CommonOpt}; + +#[derive(Debug, StructOpt)] +pub struct RunOpt { + /// Common options + #[structopt(flatten)] + common: CommonOpt, + + /// Sort query number. If not specified, runs all queries + #[structopt(short, long)] + query: Option, + + /// Path to data files (lineitem). Only parquet format is supported + #[structopt(parse(from_os_str), required = true, short = "p", long = "path")] + path: PathBuf, + + /// Path to JSON benchmark result to be compare using `compare.py` + #[structopt(parse(from_os_str), short = "o", long = "output")] + output_path: Option, + + /// Load the data into a MemTable before executing the query + #[structopt(short = "m", long = "mem-table")] + mem_table: bool, +} + +struct QueryResult { + elapsed: std::time::Duration, + row_count: usize, +} + +impl RunOpt { + const SORT_TABLES: [&'static str; 1] = ["lineitem"]; + + /// Sort queries with different characteristics: + /// - Sort key with fixed length or variable length (VARCHAR) + /// - Sort key with different cardinality + /// - Different number of sort keys + /// - Different number of payload columns (thin: 1 additional column other + /// than sort keys; wide: all columns except sort keys) + /// + /// DataSet is `lineitem` table in TPCH dataset (16 columns, 6M rows for + /// scale factor 1.0, cardinality is counted from SF1 dataset) + /// + /// Key Columns: + /// - Column `l_linenumber`, type: `INTEGER`, cardinality: 7 + /// - Column `l_suppkey`, type: `BIGINT`, cardinality: 10k + /// - Column `l_orderkey`, type: `BIGINT`, cardinality: 1.5M + /// - Column `l_comment`, type: `VARCHAR`, cardinality: 4.5M (len is ~26 chars) + /// + /// Payload Columns: + /// - Thin variant: `l_partkey` column with `BIGINT` type (1 column) + /// - Wide variant: all columns except for possible key columns (12 columns) + const SORT_QUERIES: [&'static str; 10] = [ + // Q1: 1 sort key (type: INTEGER, cardinality: 7) + 1 payload column + r#" + SELECT l_linenumber, l_partkey + FROM lineitem + ORDER BY l_linenumber + "#, + // Q2: 1 sort key (type: BIGINT, cardinality: 1.5M) + 1 payload column + r#" + SELECT l_orderkey, l_partkey + FROM lineitem + ORDER BY l_orderkey + "#, + // Q3: 1 sort key (type: VARCHAR, cardinality: 4.5M) + 1 payload column + r#" + SELECT l_comment, l_partkey + FROM lineitem + ORDER BY l_comment + "#, + // Q4: 2 sort keys {(BIGINT, 1.5M), (INTEGER, 7)} + 1 payload column + r#" + SELECT l_orderkey, l_linenumber, l_partkey + FROM lineitem + ORDER BY l_orderkey, l_linenumber + "#, + // Q5: 3 sort keys {(INTEGER, 7), (BIGINT, 10k), (BIGINT, 1.5M)} + no payload column + r#" + SELECT l_linenumber, l_suppkey, l_orderkey + FROM lineitem + ORDER BY l_linenumber, l_suppkey, l_orderkey + "#, + // Q6: 3 sort keys {(INTEGER, 7), (BIGINT, 10k), (BIGINT, 1.5M)} + 1 payload column + r#" + SELECT l_linenumber, l_suppkey, l_orderkey, l_partkey + FROM lineitem + ORDER BY l_linenumber, l_suppkey, l_orderkey + "#, + // Q7: 3 sort keys {(INTEGER, 7), (BIGINT, 10k), (BIGINT, 1.5M)} + 12 all other columns + r#" + SELECT l_linenumber, l_suppkey, l_orderkey, + l_partkey, l_quantity, l_extendedprice, l_discount, l_tax, + l_returnflag, l_linestatus, l_shipdate, l_commitdate, + l_receiptdate, l_shipinstruct, l_shipmode + FROM lineitem + ORDER BY l_linenumber, l_suppkey, l_orderkey + "#, + // Q8: 4 sort keys {(BIGINT, 1.5M), (BIGINT, 10k), (INTEGER, 7), (VARCHAR, 4.5M)} + no payload column + r#" + SELECT l_orderkey, l_suppkey, l_linenumber, l_comment + FROM lineitem + ORDER BY l_orderkey, l_suppkey, l_linenumber, l_comment + "#, + // Q9: 4 sort keys {(BIGINT, 1.5M), (BIGINT, 10k), (INTEGER, 7), (VARCHAR, 4.5M)} + 1 payload column + r#" + SELECT l_orderkey, l_suppkey, l_linenumber, l_comment, l_partkey + FROM lineitem + ORDER BY l_orderkey, l_suppkey, l_linenumber, l_comment + "#, + // Q10: 4 sort keys {(BIGINT, 1.5M), (BIGINT, 10k), (INTEGER, 7), (VARCHAR, 4.5M)} + 12 all other columns + r#" + SELECT l_orderkey, l_suppkey, l_linenumber, l_comment, + l_partkey, l_quantity, l_extendedprice, l_discount, l_tax, + l_returnflag, l_linestatus, l_shipdate, l_commitdate, + l_receiptdate, l_shipinstruct, l_shipmode + FROM lineitem + ORDER BY l_orderkey, l_suppkey, l_linenumber, l_comment + "#, + ]; + + /// If query is specified from command line, run only that query. + /// Otherwise, run all queries. + pub async fn run(&self) -> Result<()> { + let mut benchmark_run = BenchmarkRun::new(); + + let query_range = match self.query { + Some(query_id) => query_id..=query_id, + None => 1..=Self::SORT_QUERIES.len(), + }; + + for query_id in query_range { + benchmark_run.start_new_case(&format!("{query_id}")); + + let query_results = self.benchmark_query(query_id).await?; + for iter in query_results { + benchmark_run.write_iter(iter.elapsed, iter.row_count); + } + } + + benchmark_run.maybe_write_json(self.output_path.as_ref())?; + + Ok(()) + } + + /// Benchmark query `query_id` in `SORT_QUERIES` + async fn benchmark_query(&self, query_id: usize) -> Result> { + let config = self.common.config(); + + let runtime_config = RuntimeConfig::new().build_arc()?; + let ctx = SessionContext::new_with_config_rt(config, runtime_config); + + // register tables + self.register_tables(&ctx).await?; + + let mut millis = vec![]; + // run benchmark + let mut query_results = vec![]; + for i in 0..self.iterations() { + let start = Instant::now(); + + let query_idx = query_id - 1; // 1-indexed -> 0-indexed + let sql = Self::SORT_QUERIES[query_idx]; + + let row_count = self.execute_query(&ctx, sql).await?; + + let elapsed = start.elapsed(); //.as_secs_f64() * 1000.0; + let ms = elapsed.as_secs_f64() * 1000.0; + millis.push(ms); + + println!( + "Q{query_id} iteration {i} took {ms:.1} ms and returned {row_count} rows" + ); + query_results.push(QueryResult { elapsed, row_count }); + } + + let avg = millis.iter().sum::() / millis.len() as f64; + println!("Q{query_id} avg time: {avg:.2} ms"); + + Ok(query_results) + } + + async fn register_tables(&self, ctx: &SessionContext) -> Result<()> { + for table in Self::SORT_TABLES { + let table_provider = { self.get_table(ctx, table).await? }; + + if self.mem_table { + println!("Loading table '{table}' into memory"); + let start = Instant::now(); + let memtable = + MemTable::load(table_provider, Some(self.partitions()), &ctx.state()) + .await?; + println!( + "Loaded table '{}' into memory in {} ms", + table, + start.elapsed().as_millis() + ); + ctx.register_table(table, Arc::new(memtable))?; + } else { + ctx.register_table(table, table_provider)?; + } + } + Ok(()) + } + + async fn execute_query(&self, ctx: &SessionContext, sql: &str) -> Result { + let debug = self.common.debug; + let plan = ctx.sql(sql).await?; + let (state, plan) = plan.into_parts(); + + if debug { + println!("=== Logical plan ===\n{plan}\n"); + } + + let plan = state.optimize(&plan)?; + if debug { + println!("=== Optimized logical plan ===\n{plan}\n"); + } + let physical_plan = state.create_physical_plan(&plan).await?; + if debug { + println!( + "=== Physical plan ===\n{}\n", + displayable(physical_plan.as_ref()).indent(true) + ); + } + + let mut row_count = 0; + + let mut stream = execute_stream(physical_plan.clone(), state.task_ctx())?; + while let Some(batch) = stream.next().await { + row_count += batch.unwrap().num_rows(); + } + + if debug { + println!( + "=== Physical plan with metrics ===\n{}\n", + DisplayableExecutionPlan::with_metrics(physical_plan.as_ref()) + .indent(true) + ); + } + + Ok(row_count) + } + + async fn get_table( + &self, + ctx: &SessionContext, + table: &str, + ) -> Result> { + let path = self.path.to_str().unwrap(); + + // Obtain a snapshot of the SessionState + let state = ctx.state(); + let path = format!("{path}/{table}"); + let format = Arc::new( + ParquetFormat::default() + .with_options(ctx.state().table_options().parquet.clone()), + ); + let extension = DEFAULT_PARQUET_EXTENSION; + + let options = ListingOptions::new(format) + .with_file_extension(extension) + .with_collect_stat(state.config().collect_statistics()); + + let table_path = ListingTableUrl::parse(path)?; + let config = ListingTableConfig::new(table_path).with_listing_options(options); + let config = config.infer_schema(&state).await?; + + Ok(Arc::new(ListingTable::try_new(config)?)) + } + + fn iterations(&self) -> usize { + self.common.iterations + } + + fn partitions(&self) -> usize { + self.common.partitions.unwrap_or(num_cpus::get()) + } +}