From 1965c04854567b86370a1070ff8e6176a82ff8aa Mon Sep 17 00:00:00 2001 From: Jay Miller <3744812+jaylmiller@users.noreply.github.com> Date: Thu, 16 Feb 2023 12:05:59 -0500 Subject: [PATCH 1/3] add SortExec input case to each merge bench case --- datafusion/core/benches/merge.rs | 72 ++++++++++++++++++++++++++++++++ 1 file changed, 72 insertions(+) diff --git a/datafusion/core/benches/merge.rs b/datafusion/core/benches/merge.rs index a7ac6cd41dac..f1c4736039f9 100644 --- a/datafusion/core/benches/merge.rs +++ b/datafusion/core/benches/merge.rs @@ -80,6 +80,7 @@ use arrow::{ /// Benchmarks for SortPreservingMerge stream use criterion::{criterion_group, criterion_main, Criterion}; +use datafusion::physical_plan::sorts::sort::SortExec; use datafusion::{ execution::context::TaskContext, physical_plan::{ @@ -136,11 +137,22 @@ fn criterion_benchmark(c: &mut Criterion) { b.iter(move || case.run()) }); + c.bench_function("merge i64 SortExec input", |b| { + let case = MergeBenchCase::new_with_sort_input(&I64_STREAMS); + + b.iter(move || case.run()) + }); + c.bench_function("merge f64", |b| { let case = MergeBenchCase::new(&F64_STREAMS); b.iter(move || case.run()) }); + c.bench_function("merge f64 SortExec input", |b| { + let case = MergeBenchCase::new_with_sort_input(&F64_STREAMS); + + b.iter(move || case.run()) + }); c.bench_function("merge utf8 low cardinality", |b| { let case = MergeBenchCase::new(&UTF8_LOW_CARDINALITY_STREAMS); @@ -148,39 +160,79 @@ fn criterion_benchmark(c: &mut Criterion) { b.iter(move || case.run()) }); + c.bench_function("merge utf8 low cardinality SortExec", |b| { + let case = MergeBenchCase::new_with_sort_input(&UTF8_LOW_CARDINALITY_STREAMS); + + b.iter(move || case.run()) + }); + c.bench_function("merge utf8 high cardinality", |b| { let case = MergeBenchCase::new(&UTF8_HIGH_CARDINALITY_STREAMS); b.iter(move || case.run()) }); + c.bench_function("merge utf8 high cardinality SortExec input", |b| { + let case = MergeBenchCase::new_with_sort_input(&UTF8_HIGH_CARDINALITY_STREAMS); + + b.iter(move || case.run()) + }); + c.bench_function("merge utf8 tuple", |b| { let case = MergeBenchCase::new(&UTF8_TUPLE_STREAMS); b.iter(move || case.run()) }); + c.bench_function("merge utf8 tuple SortExec input", |b| { + let case = MergeBenchCase::new_with_sort_input(&UTF8_TUPLE_STREAMS); + + b.iter(move || case.run()) + }); + c.bench_function("merge utf8 dictionary", |b| { let case = MergeBenchCase::new(&DICTIONARY_STREAMS); b.iter(move || case.run()) }); + c.bench_function("merge utf8 dictionary SortExec input", |b| { + let case = MergeBenchCase::new_with_sort_input(&DICTIONARY_STREAMS); + + b.iter(move || case.run()) + }); + c.bench_function("merge utf8 dictionary tuple", |b| { let case = MergeBenchCase::new(&DICTIONARY_TUPLE_STREAMS); b.iter(move || case.run()) }); + c.bench_function("merge utf8 dictionary tuple SortExec input", |b| { + let case = MergeBenchCase::new_with_sort_input(&DICTIONARY_TUPLE_STREAMS); + b.iter(move || case.run()) + }); + c.bench_function("merge mixed utf8 dictionary tuple", |b| { let case = MergeBenchCase::new(&MIXED_DICTIONARY_TUPLE_STREAMS); b.iter(move || case.run()) }); + c.bench_function("merge mixed utf8 dictionary tuple SortExec input", |b| { + let case = MergeBenchCase::new_with_sort_input(&MIXED_DICTIONARY_TUPLE_STREAMS); + b.iter(move || case.run()) + }); + c.bench_function("merge mixed tuple", |b| { let case = MergeBenchCase::new(&MIXED_TUPLE_STREAMS); b.iter(move || case.run()) }); + + c.bench_function("merge mixed tuple SortExec input", |b| { + let case = MergeBenchCase::new_with_sort_input(&MIXED_TUPLE_STREAMS); + + b.iter(move || case.run()) + }); } /// Encapsulates running each test case @@ -214,6 +266,26 @@ impl MergeBenchCase { } } + fn new_with_sort_input(partitions: &[Vec]) -> Self { + let runtime = tokio::runtime::Builder::new_multi_thread().build().unwrap(); + let session_ctx = SessionContext::new(); + let task_ctx = session_ctx.task_ctx(); + + let schema = partitions[0][0].schema(); + let sort = make_sort_exprs(schema.as_ref()); + + let projection = None; + let exec = Arc::new(MemoryExec::try_new(partitions, schema, projection).unwrap()); + let sort_exec = SortExec::try_new(sort.to_owned(), exec, None).unwrap(); + let plan = Arc::new(SortPreservingMergeExec::new(sort, Arc::new(sort_exec))); + + Self { + runtime, + task_ctx, + plan, + } + } + /// runs the specified plan to completion, draining all input and /// panic'ing on error fn run(&self) { From fdc3a550f8999d4271393871d413fc496cdac73d Mon Sep 17 00:00:00 2001 From: Jay Miller <3744812+jaylmiller@users.noreply.github.com> Date: Thu, 16 Feb 2023 12:06:43 -0500 Subject: [PATCH 2/3] fix lil typo error in sort bench --- datafusion/core/benches/sort.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/core/benches/sort.rs b/datafusion/core/benches/sort.rs index 2d9417d8bd3b..5d9905d9d989 100644 --- a/datafusion/core/benches/sort.rs +++ b/datafusion/core/benches/sort.rs @@ -104,7 +104,7 @@ fn criterion_benchmark(c: &mut Criterion) { b.iter(move || case.run()) }); c.bench_function("sort utf8 low cardinality preserve partitioning", |b| { - let case = SortBenchCase::new(&UTF8_LOW_CARDINALITY_STREAMS); + let case = SortBenchCasePreservePartitioning::new(&UTF8_LOW_CARDINALITY_STREAMS); b.iter(move || case.run()) }); From e67f5b3b8c04a5589cdac87f70f4e37bb23a6e63 Mon Sep 17 00:00:00 2001 From: Jay Miller <3744812+jaylmiller@users.noreply.github.com> Date: Sat, 18 Feb 2023 17:58:54 -0500 Subject: [PATCH 3/3] fix sort bench to actually use full data set in non-preserve partition case --- datafusion/core/benches/sort.rs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/datafusion/core/benches/sort.rs b/datafusion/core/benches/sort.rs index 5d9905d9d989..0507a9308a28 100644 --- a/datafusion/core/benches/sort.rs +++ b/datafusion/core/benches/sort.rs @@ -29,6 +29,7 @@ use arrow::{ /// Benchmarks for SortExec use criterion::{criterion_group, criterion_main, Criterion}; +use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec; use datafusion::{ execution::context::TaskContext, physical_plan::{memory::MemoryExec, sorts::sort::SortExec, ExecutionPlan}, @@ -199,7 +200,8 @@ impl SortBenchCase { let projection = None; let exec = MemoryExec::try_new(partitions, schema, projection).unwrap(); - let plan = Arc::new(SortExec::try_new(sort, Arc::new(exec), None).unwrap()); + let exec = Arc::new(CoalescePartitionsExec::new(Arc::new(exec))); + let plan = Arc::new(SortExec::try_new(sort, exec, None).unwrap()); Self { runtime,