Merge remote-tracking branch 'origin/main' into 5398-timestamp_with_formats
Omega359 committed Jan 16, 2024
2 parents c5e230a + 8cf1abb commit ef6723b
Showing 65 changed files with 1,965 additions and 1,132 deletions.
17 changes: 17 additions & 0 deletions benchmarks/bench.sh
@@ -74,6 +74,7 @@ parquet: Benchmark of parquet reader's filtering speed
sort: Benchmark of sorting speed
clickbench_1: ClickBench queries against a single parquet file
clickbench_partitioned: ClickBench queries against a partitioned (100 files) parquet
clickbench_extended: ClickBench "inspired" queries against a single parquet (DataFusion specific)
**********
* Supported Configuration (Environment Variables)
@@ -155,6 +156,9 @@ main() {
clickbench_partitioned)
data_clickbench_partitioned
;;
clickbench_extended)
data_clickbench_1
;;
*)
echo "Error: unknown benchmark '$BENCHMARK' for data generation"
usage
@@ -193,6 +197,7 @@ main() {
run_sort
run_clickbench_1
run_clickbench_partitioned
run_clickbench_extended
;;
tpch)
run_tpch "1"
@@ -218,6 +223,9 @@ main() {
clickbench_partitioned)
run_clickbench_partitioned
;;
clickbench_extended)
run_clickbench_extended
;;
*)
echo "Error: unknown benchmark '$BENCHMARK' for run"
usage
@@ -401,6 +409,15 @@ run_clickbench_partitioned() {
$CARGO_COMMAND --bin dfbench -- clickbench --iterations 5 --path "${DATA_DIR}/hits_partitioned" --queries-path "${SCRIPT_DIR}/queries/clickbench/queries.sql" -o ${RESULTS_FILE}
}

# Runs the clickbench "extended" benchmark with a single large parquet file
run_clickbench_extended() {
RESULTS_FILE="${RESULTS_DIR}/clickbench_extended.json"
echo "RESULTS_FILE: ${RESULTS_FILE}"
echo "Running clickbench (1 file) extended benchmark..."
$CARGO_COMMAND --bin dfbench -- clickbench --iterations 5 --path "${DATA_DIR}/hits.parquet" --queries-path "${SCRIPT_DIR}/queries/clickbench/extended.sql" -o ${RESULTS_FILE}
}


compare_benchmarks() {
BASE_RESULTS_DIR="${SCRIPT_DIR}/results"
BRANCH1="${ARG2}"
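
Based on the case arms above, `clickbench_extended` reuses the single-file ClickBench dataset for data generation (`data_clickbench_1`) and gets its own run target. A usage sketch of the script's `data`/`run` subcommands:

```bash
# Download/generate the single-file ClickBench dataset (shared with clickbench_1)
./bench.sh data clickbench_extended

# Run the DataFusion-specific "extended" queries against hits.parquet
./bench.sh run clickbench_extended
```
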
33 changes: 33 additions & 0 deletions benchmarks/queries/clickbench/README.md
@@ -0,0 +1,33 @@
# ClickBench queries

This directory contains queries for the ClickBench benchmark https://benchmark.clickhouse.com/

ClickBench focuses on aggregation and filtering performance (it has no joins).

## Files:
* `queries.sql` - Actual ClickBench queries, downloaded from the [ClickBench repository]
* `extended.sql` - "Extended" DataFusion specific queries.

[ClickBench repository]: https://github.com/ClickHouse/ClickBench/blob/main/datafusion/queries.sql

## "Extended" Queries
The "extended" queries are not part of the official ClickBench benchmark.
Instead, they are used to test DataFusion features that are not
covered by the standard benchmark.

Each description below is for the corresponding line in `extended.sql` (line 1
is `Q0`, line 2 is `Q1`, etc.)

### Q0
Models initial data exploration to understand some statistics of the data.
Important Query Properties: multiple `COUNT DISTINCT` on strings.

```sql
SELECT
COUNT(DISTINCT "SearchPhrase"), COUNT(DISTINCT "MobilePhone"), COUNT(DISTINCT "MobilePhoneModel")
FROM hits;
```

1 change: 0 additions & 1 deletion benchmarks/queries/clickbench/README.txt

This file was deleted.

1 change: 1 addition & 0 deletions benchmarks/queries/clickbench/extended.sql
@@ -0,0 +1 @@
SELECT COUNT(DISTINCT "SearchPhrase"), COUNT(DISTINCT "MobilePhone"), COUNT(DISTINCT "MobilePhoneModel") FROM hits;
66 changes: 41 additions & 25 deletions benchmarks/src/clickbench.rs
@@ -15,13 +15,14 @@
// specific language governing permissions and limitations
// under the License.

use std::path::Path;
use std::{path::PathBuf, time::Instant};

use datafusion::{
common::exec_err,
error::{DataFusionError, Result},
prelude::SessionContext,
};
use datafusion_common::exec_datafusion_err;
use structopt::StructOpt;

use crate::{BenchmarkRun, CommonOpt};
@@ -69,15 +70,49 @@ pub struct RunOpt {
output_path: Option<PathBuf>,
}

const CLICKBENCH_QUERY_START_ID: usize = 0;
const CLICKBENCH_QUERY_END_ID: usize = 42;
struct AllQueries {
queries: Vec<String>,
}

impl AllQueries {
fn try_new(path: &Path) -> Result<Self> {
// ClickBench has all queries in a single file identified by line number
let all_queries = std::fs::read_to_string(path)
.map_err(|e| exec_datafusion_err!("Could not open {path:?}: {e}"))?;
Ok(Self {
queries: all_queries.lines().map(|s| s.to_string()).collect(),
})
}

/// Returns the text of query `query_id`
fn get_query(&self, query_id: usize) -> Result<&str> {
self.queries
.get(query_id)
.ok_or_else(|| {
let min_id = self.min_query_id();
let max_id = self.max_query_id();
exec_datafusion_err!(
"Invalid query id {query_id}. Must be between {min_id} and {max_id}"
)
})
.map(|s| s.as_str())
}

fn min_query_id(&self) -> usize {
0
}

fn max_query_id(&self) -> usize {
self.queries.len() - 1
}
}
impl RunOpt {
pub async fn run(self) -> Result<()> {
println!("Running benchmarks with the following options: {self:?}");
let queries = AllQueries::try_new(self.queries_path.as_path())?;
let query_range = match self.query {
Some(query_id) => query_id..=query_id,
None => CLICKBENCH_QUERY_START_ID..=CLICKBENCH_QUERY_END_ID,
None => queries.min_query_id()..=queries.max_query_id(),
};

let config = self.common.config();
@@ -88,12 +123,12 @@ impl RunOpt {
let mut benchmark_run = BenchmarkRun::new();
for query_id in query_range {
benchmark_run.start_new_case(&format!("Query {query_id}"));
let sql = self.get_query(query_id)?;
let sql = queries.get_query(query_id)?;
println!("Q{query_id}: {sql}");

for i in 0..iterations {
let start = Instant::now();
let results = ctx.sql(&sql).await?.collect().await?;
let results = ctx.sql(sql).await?.collect().await?;
let elapsed = start.elapsed();
let ms = elapsed.as_secs_f64() * 1000.0;
let row_count: usize = results.iter().map(|b| b.num_rows()).sum();
@@ -120,23 +155,4 @@ impl RunOpt {
)
})
}

/// Returns the text of query `query_id`
fn get_query(&self, query_id: usize) -> Result<String> {
if query_id > CLICKBENCH_QUERY_END_ID {
return exec_err!(
"Invalid query id {query_id}. Must be between {CLICKBENCH_QUERY_START_ID} and {CLICKBENCH_QUERY_END_ID}"
);
}

let path = self.queries_path.as_path();

// ClickBench has all queries in a single file identified by line number
let all_queries = std::fs::read_to_string(path).map_err(|e| {
DataFusionError::Execution(format!("Could not open {path:?}: {e}"))
})?;
let all_queries: Vec<_> = all_queries.lines().collect();

Ok(all_queries.get(query_id).map(|s| s.to_string()).unwrap())
}
}
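
The refactor replaces the hard-coded `CLICKBENCH_QUERY_END_ID` bound with a range derived from the query file itself. A minimal sketch of the one-query-per-line convention `AllQueries` relies on (made-up queries, not the real ClickBench file):

```rust
fn main() {
    // Each line of the queries file is one query; ids are zero-based line indexes.
    let file_contents = "SELECT 1;\nSELECT 2;\nSELECT 3;";
    let queries: Vec<&str> = file_contents.lines().collect();

    // The valid id range is now 0..=queries.len() - 1 instead of a constant.
    let min_query_id = 0;
    let max_query_id = queries.len() - 1;

    assert_eq!((min_query_id, max_query_id), (0, 2));
    assert_eq!(queries[1], "SELECT 2;"); // Q1 is line 2 of the file
}
```
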
3 changes: 2 additions & 1 deletion datafusion-cli/src/command.rs
@@ -79,7 +79,8 @@ impl Command {
filename, e
))
})?;
exec_from_lines(ctx, &mut BufReader::new(file), print_options).await;
exec_from_lines(ctx, &mut BufReader::new(file), print_options)
.await?;
Ok(())
} else {
exec_err!("Required filename argument is missing")
26 changes: 14 additions & 12 deletions datafusion-cli/src/exec.rs
@@ -53,21 +53,20 @@ pub async fn exec_from_commands(
ctx: &mut SessionContext,
commands: Vec<String>,
print_options: &PrintOptions,
) {
) -> Result<()> {
for sql in commands {
match exec_and_print(ctx, print_options, sql).await {
Ok(_) => {}
Err(err) => println!("{err}"),
}
exec_and_print(ctx, print_options, sql).await?;
}

Ok(())
}

/// run and execute SQL statements and commands from a file, against a context with the given print options
pub async fn exec_from_lines(
ctx: &mut SessionContext,
reader: &mut BufReader<File>,
print_options: &PrintOptions,
) {
) -> Result<()> {
let mut query = "".to_owned();

for line in reader.lines() {
@@ -97,26 +96,28 @@ pub async fn exec_from_lines(
// run the leftover query if the last statement doesn't contain ';'
// ignore if it only consists of '\n'
if query.contains(|c| c != '\n') {
match exec_and_print(ctx, print_options, query).await {
Ok(_) => {}
Err(err) => println!("{err}"),
}
exec_and_print(ctx, print_options, query).await?;
}

Ok(())
}

pub async fn exec_from_files(
ctx: &mut SessionContext,
files: Vec<String>,
print_options: &PrintOptions,
) {
) -> Result<()> {
let files = files
.into_iter()
.map(|file_path| File::open(file_path).unwrap())
.collect::<Vec<_>>();

for file in files {
let mut reader = BufReader::new(file);
exec_from_lines(ctx, &mut reader, print_options).await;
exec_from_lines(ctx, &mut reader, print_options).await?;
}

Ok(())
}

/// run and execute SQL statements and commands against a context with the given print options
@@ -215,6 +216,7 @@ async fn exec_and_print(
MsSQL, ClickHouse, BigQuery, Ansi."
)
})?;

let statements = DFParser::parse_sql_with_dialect(&sql, dialect.as_ref())?;
for statement in statements {
let mut plan = ctx.state().statement_to_plan(statement).await?;
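
Across `exec.rs`, errors are now propagated with `?` instead of being printed and swallowed, so a failing statement aborts the remaining work. A self-contained sketch of the behavioral difference (hypothetical `run` helper, not the CLI's real API):

```rust
fn run(sql: &str) -> Result<(), String> {
    if sql.contains("bad") {
        Err(format!("error while running '{sql}'"))
    } else {
        Ok(())
    }
}

fn main() -> Result<(), String> {
    let commands = ["SELECT 1;", "bad sql", "SELECT 2;"];

    // Old behavior: print each error and keep going.
    for sql in commands {
        if let Err(err) = run(sql) {
            println!("{err}");
        }
    }

    // New behavior: `?` stops at the first error, which the caller
    // ultimately turns into a non-zero exit code.
    for sql in commands {
        run(sql)?;
    }
    Ok(())
}
```
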
20 changes: 16 additions & 4 deletions datafusion-cli/src/main.rs
@@ -18,6 +18,7 @@
use std::collections::HashMap;
use std::env;
use std::path::Path;
use std::process::ExitCode;
use std::str::FromStr;
use std::sync::{Arc, OnceLock};

@@ -138,7 +139,18 @@ struct Args {
}

#[tokio::main]
pub async fn main() -> Result<()> {
/// Calls [`main_inner`], then handles printing errors and returning the correct exit code
pub async fn main() -> ExitCode {
if let Err(e) = main_inner().await {
println!("Error: {e}");
return ExitCode::FAILURE;
}

ExitCode::SUCCESS
}

/// Main CLI entrypoint
async fn main_inner() -> Result<()> {
env_logger::init();
let args = Args::parse();

@@ -216,7 +228,7 @@ pub async fn main() -> Result<()> {

if commands.is_empty() && files.is_empty() {
if !rc.is_empty() {
exec::exec_from_files(&mut ctx, rc, &print_options).await
exec::exec_from_files(&mut ctx, rc, &print_options).await?;
}
// TODO maybe we can have thiserror for cli but for now let's keep it simple
return exec::exec_from_repl(&mut ctx, &mut print_options)
@@ -225,11 +237,11 @@
}

if !files.is_empty() {
exec::exec_from_files(&mut ctx, files, &print_options).await;
exec::exec_from_files(&mut ctx, files, &print_options).await?;
}

if !commands.is_empty() {
exec::exec_from_commands(&mut ctx, commands, &print_options).await;
exec::exec_from_commands(&mut ctx, commands, &print_options).await?;
}

Ok(())
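
Together with the `ExitCode` change in `main`, propagated errors now surface as a non-zero process status rather than only a printed message. A hypothetical shell check (the `-c/--command` flag is assumed here; exact error text may differ):

```bash
# A failing query should now exit non-zero instead of exiting 0
datafusion-cli -c "SELECT * FROM table_that_does_not_exist"
echo "exit status: $?"   # expected: non-zero (ExitCode::FAILURE, typically 1)
```
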
2 changes: 1 addition & 1 deletion datafusion-examples/examples/advanced_udwf.rs
@@ -220,7 +220,7 @@ async fn main() -> Result<()> {
vec![col("speed")], // smooth_it(speed)
vec![col("car")], // PARTITION BY car
vec![col("time").sort(true, true)], // ORDER BY time ASC
WindowFrame::new(false),
WindowFrame::new(None),
);
let df = ctx.table("cars").await?.window(vec![window_expr])?;

2 changes: 1 addition & 1 deletion datafusion-examples/examples/simple_udwf.rs
@@ -123,7 +123,7 @@ async fn main() -> Result<()> {
vec![col("speed")], // smooth_it(speed)
vec![col("car")], // PARTITION BY car
vec![col("time").sort(true, true)], // ORDER BY time ASC
WindowFrame::new(false),
WindowFrame::new(None),
);
let df = ctx.table("cars").await?.window(vec![window_expr])?;

2 changes: 1 addition & 1 deletion datafusion/common/src/config.rs
@@ -403,7 +403,7 @@ config_namespace! {
/// parquet files by serializing them in parallel. Each column
/// in each row group in each output file is serialized in parallel,
/// leveraging a maximum possible core count of n_files*n_row_groups*n_columns.
pub allow_single_file_parallelism: bool, default = true
pub allow_single_file_parallelism: bool, default = false

/// By default parallel parquet writer is tuned for minimum
/// memory usage in a streaming execution plan. You may see
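
With the parallel single-file parquet writer now opt-in, it can presumably be re-enabled per session through the config system. A sketch assuming the option is exposed under the `datafusion.execution.parquet` namespace (verify the path against your DataFusion version):

```sql
-- Opt back in to the parallel single-file parquet writer
SET datafusion.execution.parquet.allow_single_file_parallelism = true;
```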