Commit 71d47a3

Merge remote-tracking branch 'upstream/main'

Omega359 committed Mar 18, 2024
2 parents 6a450b4 + 269563a
Showing 65 changed files with 3,445 additions and 1,570 deletions.
11 changes: 9 additions & 2 deletions .github/workflows/pr_benchmarks.yml
@@ -21,7 +21,7 @@ jobs:
with:
ref: refs/pull/${{ github.event.issue.number }}/head

- - name: Setup data and generate unique result names
+ - name: Setup test data
# Workaround for `the input device is not a TTY`, appropriated from https://github.com/actions/runner/issues/241
shell: 'script -q -e -c "bash -e {0}"'
run: |
@@ -31,7 +31,8 @@
# Setup the TPC-H data set with a scale factor of 10
./bench.sh data tpch
# Generate a unique-ish identifiers for the results
+ - name: Generate unique result names
+ run: |
echo "HEAD_LONG_SHA=$(git log -1 --format='%H')" >> "$GITHUB_ENV"
echo "HEAD_SHORT_SHA=$(git log -1 --format='%h' --abbrev=7)" >> "$GITHUB_ENV"
echo "BASE_SHORT_SHA=$(echo "${{ github.sha }}" | cut -c1-7)" >> "$GITHUB_ENV"
@@ -43,6 +44,12 @@ jobs:
cd benchmarks
./bench.sh run tpch
+ # For some reason this step doesn't seem to propagate the env var down into the script
+ if [ -d "results/HEAD" ]; then
+ echo "Moving results into ${{ env.HEAD_SHORT_SHA }}"
+ mv results/HEAD results/${{ env.HEAD_SHORT_SHA }}
+ fi
- name: Checkout base commit
uses: actions/checkout@v4
10 changes: 4 additions & 6 deletions datafusion-cli/Cargo.lock

(Generated lock file; diff not rendered.)

4 changes: 1 addition & 3 deletions datafusion/common/src/config.rs
@@ -296,9 +296,7 @@ config_namespace! {
pub listing_table_ignore_subdirectory: bool, default = true

/// Should DataFusion support recursive CTEs
- /// Defaults to false since this feature is a work in progress and may not
- /// behave as expected
- pub enable_recursive_ctes: bool, default = false
+ pub enable_recursive_ctes: bool, default = true
}
}
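For illustration (not part of this commit): with the default flipped to `true`, recursive CTEs run without any configuration change. A minimal sketch, assuming the `datafusion` and `tokio` crates of this era and that the option lives under the `datafusion.execution` config namespace:

use datafusion::error::Result;
use datafusion::prelude::SessionContext;

#[tokio::main]
async fn main() -> Result<()> {
    let ctx = SessionContext::new();
    // Recursive CTEs now work out of the box with the default config.
    let df = ctx
        .sql(
            "WITH RECURSIVE nodes AS (
                 SELECT 1 AS n
                 UNION ALL
                 SELECT n + 1 FROM nodes WHERE n < 5
             )
             SELECT * FROM nodes",
        )
        .await?;
    df.show().await?;
    // To opt out, the flag can presumably be flipped back, e.g.:
    // ctx.sql("SET datafusion.execution.enable_recursive_ctes = false").await?;
    Ok(())
}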

165 changes: 100 additions & 65 deletions datafusion/core/tests/fuzz_cases/window_fuzz.rs
@@ -27,7 +27,8 @@ use datafusion::physical_plan::sorts::sort::SortExec;
use datafusion::physical_plan::windows::{
create_window_expr, BoundedWindowAggExec, WindowAggExec,
};
- use datafusion::physical_plan::{collect, ExecutionPlan, InputOrderMode};
+ use datafusion::physical_plan::InputOrderMode::{Linear, PartiallySorted, Sorted};
+ use datafusion::physical_plan::{collect, InputOrderMode};
use datafusion::prelude::{SessionConfig, SessionContext};
use datafusion_common::{Result, ScalarValue};
use datafusion_common_runtime::SpawnedTask;
@@ -44,8 +45,6 @@ use hashbrown::HashMap;
use rand::rngs::StdRng;
use rand::{Rng, SeedableRng};

- use datafusion_physical_plan::InputOrderMode::{Linear, PartiallySorted, Sorted};

#[tokio::test(flavor = "multi_thread", worker_threads = 16)]
async fn window_bounded_window_random_comparison() -> Result<()> {
// make_staggered_batches gives result sorted according to a, b, c
@@ -515,7 +514,8 @@ fn get_random_window_frame(rng: &mut StdRng, is_linear: bool) -> WindowFrame {
} else {
WindowFrameUnits::Groups
};
- match units {
+
+ let mut window_frame = match units {
// In RANGE queries, window frame boundaries should match the column type
WindowFrameUnits::Range => {
let start_bound = if start_bound.is_preceding {
@@ -566,6 +566,47 @@ fn get_random_window_frame(rng: &mut StdRng, is_linear: bool) -> WindowFrame {
// should work only with WindowAggExec
window_frame
- }
+ };
+ convert_bound_to_current_row_if_applicable(rng, &mut window_frame.start_bound);
+ convert_bound_to_current_row_if_applicable(rng, &mut window_frame.end_bound);
+ window_frame
}

+ /// This utility converts `PRECEDING(0)` or `FOLLOWING(0)` specifiers in window
+ /// frame bounds to `CURRENT ROW` with 50% probability. This enables us to test
+ /// behaviour of the system in the `CURRENT ROW` mode.
+ fn convert_bound_to_current_row_if_applicable(
+ rng: &mut StdRng,
+ bound: &mut WindowFrameBound,
+ ) {
+ match bound {
+ WindowFrameBound::Preceding(value) | WindowFrameBound::Following(value) => {
+ if let Ok(zero) = ScalarValue::new_zero(&value.data_type()) {
+ if value == &zero && rng.gen_range(0..2) == 0 {
+ *bound = WindowFrameBound::CurrentRow;
+ }
+ }
+ }
+ _ => {}
+ }
+ }
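For illustration (not from this diff), a hedged usage sketch of the helper above; it assumes the surrounding test module's imports (`StdRng`, `SeedableRng`, `ScalarValue`, `WindowFrameBound`) are in scope and mirrors the SQL equivalence of `0 PRECEDING` and `CURRENT ROW`:

// Sketch only: exercises convert_bound_to_current_row_if_applicable directly.
let mut rng = StdRng::seed_from_u64(42);
let mut bound = WindowFrameBound::Preceding(ScalarValue::UInt64(Some(0)));
convert_bound_to_current_row_if_applicable(&mut rng, &mut bound);
// With probability 1/2, `bound` is now WindowFrameBound::CurrentRow;
// otherwise it remains the original zero-offset PRECEDING bound.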

+ /// This utility determines whether a given window frame can be executed with
+ /// multiple ORDER BY expressions. As an example, range frames with offset (such
+ /// as `RANGE BETWEEN 1 PRECEDING AND 1 FOLLOWING`) cannot have ORDER BY clauses
+ /// of the form `[ORDER BY a ASC, b ASC, ...]`
+ fn can_accept_multi_orderby(window_frame: &WindowFrame) -> bool {
+ match window_frame.units {
+ WindowFrameUnits::Rows => true,
+ WindowFrameUnits::Range => {
+ // Range can only accept multi ORDER BY clauses when bounds are
+ // CURRENT ROW or UNBOUNDED PRECEDING/FOLLOWING:
+ (window_frame.start_bound.is_unbounded()
+ || window_frame.start_bound == WindowFrameBound::CurrentRow)
+ && (window_frame.end_bound.is_unbounded()
+ || window_frame.end_bound == WindowFrameBound::CurrentRow)
+ }
+ WindowFrameUnits::Groups => true,
+ }
+ }
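For intuition (not part of this commit), a hedged sketch of what the check above accepts, built with hypothetical frames; it assumes `WindowFrame`'s fields are publicly constructible here, as the surrounding test code of this era suggests:

// ROWS frames always tolerate multi-column ORDER BY:
let rows_frame = WindowFrame {
    units: WindowFrameUnits::Rows,
    start_bound: WindowFrameBound::Preceding(ScalarValue::UInt64(Some(1))),
    end_bound: WindowFrameBound::Following(ScalarValue::UInt64(Some(1))),
};
assert!(can_accept_multi_orderby(&rows_frame));

// RANGE frames with numeric offsets need a single ORDER BY key:
let range_offset_frame = WindowFrame {
    units: WindowFrameUnits::Range,
    start_bound: WindowFrameBound::Preceding(ScalarValue::UInt64(Some(1))),
    end_bound: WindowFrameBound::Following(ScalarValue::UInt64(Some(1))),
};
assert!(!can_accept_multi_orderby(&range_offset_frame));

// RANGE is fine when bounds are UNBOUNDED (a null scalar) or CURRENT ROW:
let range_current_row_frame = WindowFrame {
    units: WindowFrameUnits::Range,
    start_bound: WindowFrameBound::Preceding(ScalarValue::Null),
    end_bound: WindowFrameBound::CurrentRow,
};
assert!(can_accept_multi_orderby(&range_current_row_frame));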

@@ -588,13 +629,16 @@ async fn run_window_test(
let mut orderby_exprs = vec![];
for column in &orderby_columns {
orderby_exprs.push(PhysicalSortExpr {
- expr: col(column, &schema).unwrap(),
+ expr: col(column, &schema)?,
options: SortOptions::default(),
})
}
+ if orderby_exprs.len() > 1 && !can_accept_multi_orderby(&window_frame) {
+ orderby_exprs = orderby_exprs[0..1].to_vec();
+ }
let mut partitionby_exprs = vec![];
for column in &partition_by_columns {
- partitionby_exprs.push(col(column, &schema).unwrap());
+ partitionby_exprs.push(col(column, &schema)?);
}
let mut sort_keys = vec![];
for partition_by_expr in &partitionby_exprs {
@@ -609,7 +653,7 @@
}
}

- let concat_input_record = concat_batches(&schema, &input1).unwrap();
+ let concat_input_record = concat_batches(&schema, &input1)?;
let source_sort_keys = vec![
PhysicalSortExpr {
expr: col("a", &schema)?,
@@ -624,73 +668,59 @@
options: Default::default(),
},
];
- let memory_exec =
- MemoryExec::try_new(&[vec![concat_input_record]], schema.clone(), None).unwrap();
- let memory_exec = memory_exec.with_sort_information(vec![source_sort_keys.clone()]);
- let mut exec1 = Arc::new(memory_exec) as Arc<dyn ExecutionPlan>;
+ let mut exec1 = Arc::new(
+ MemoryExec::try_new(&[vec![concat_input_record]], schema.clone(), None)?
+ .with_sort_information(vec![source_sort_keys.clone()]),
+ ) as _;
// The table is ordered according to ORDER BY a, b, c. In the linear test we use PARTITION BY b, ORDER BY a.
// For WindowAggExec to produce a correct result, it needs the table to be ordered by b, a. Hence, add a sort.
if is_linear {
- exec1 = Arc::new(SortExec::new(sort_keys.clone(), exec1)) as _;
+ exec1 = Arc::new(SortExec::new(sort_keys, exec1)) as _;
}

- let usual_window_exec = Arc::new(
- WindowAggExec::try_new(
- vec![create_window_expr(
- &window_fn,
- fn_name.clone(),
- &args,
- &partitionby_exprs,
- &orderby_exprs,
- Arc::new(window_frame.clone()),
- schema.as_ref(),
- false,
- )
- .unwrap()],
- exec1,
- vec![],
- )
- .unwrap(),
- ) as _;
+ let usual_window_exec = Arc::new(WindowAggExec::try_new(
+ vec![create_window_expr(
+ &window_fn,
+ fn_name.clone(),
+ &args,
+ &partitionby_exprs,
+ &orderby_exprs,
+ Arc::new(window_frame.clone()),
+ schema.as_ref(),
+ false,
+ )?],
+ exec1,
+ vec![],
+ )?) as _;
let exec2 = Arc::new(
- MemoryExec::try_new(&[input1.clone()], schema.clone(), None)
- .unwrap()
+ MemoryExec::try_new(&[input1.clone()], schema.clone(), None)?
.with_sort_information(vec![source_sort_keys.clone()]),
);
- let running_window_exec = Arc::new(
- BoundedWindowAggExec::try_new(
- vec![create_window_expr(
- &window_fn,
- fn_name,
- &args,
- &partitionby_exprs,
- &orderby_exprs,
- Arc::new(window_frame.clone()),
- schema.as_ref(),
- false,
- )
- .unwrap()],
- exec2,
- vec![],
- search_mode,
- )
- .unwrap(),
- ) as Arc<dyn ExecutionPlan>;
+ let running_window_exec = Arc::new(BoundedWindowAggExec::try_new(
+ vec![create_window_expr(
+ &window_fn,
+ fn_name,
+ &args,
+ &partitionby_exprs,
+ &orderby_exprs,
+ Arc::new(window_frame.clone()),
+ schema.as_ref(),
+ false,
+ )?],
+ exec2,
+ vec![],
+ search_mode.clone(),
+ )?) as _;
let task_ctx = ctx.task_ctx();
- let collected_usual = collect(usual_window_exec, task_ctx.clone()).await.unwrap();
- let collected_running = collect(running_window_exec, task_ctx.clone())
- .await
- .unwrap();
+ let collected_usual = collect(usual_window_exec, task_ctx.clone()).await?;
+ let collected_running = collect(running_window_exec, task_ctx).await?;

// BoundedWindowAggExec should produce more chunks than the usual WindowAggExec.
// Otherwise it means that we cannot generate results in running mode.
assert!(collected_running.len() > collected_usual.len());
// compare
- let usual_formatted = pretty_format_batches(&collected_usual).unwrap().to_string();
- let running_formatted = pretty_format_batches(&collected_running)
- .unwrap()
- .to_string();
+ let usual_formatted = pretty_format_batches(&collected_usual)?.to_string();
+ let running_formatted = pretty_format_batches(&collected_running)?.to_string();

let mut usual_formatted_sorted: Vec<&str> = usual_formatted.trim().lines().collect();
usual_formatted_sorted.sort_unstable();
@@ -703,11 +733,16 @@
.zip(&running_formatted_sorted)
.enumerate()
{
- assert_eq!(
- (i, usual_line),
- (i, running_line),
- "Inconsistent result for window_frame: {window_frame:?}, window_fn: {window_fn:?}, args:{args:?}"
- );
+ if !usual_line.eq(running_line) {
+ println!("Inconsistent result for window_frame at line:{i:?}: {window_frame:?}, window_fn: {window_fn:?}, args:{args:?}, pb_cols:{partition_by_columns:?}, ob_cols:{orderby_columns:?}, search_mode:{search_mode:?}");
+ println!("--------usual_formatted_sorted----------------running_formatted_sorted--------");
+ for (line1, line2) in
+ usual_formatted_sorted.iter().zip(running_formatted_sorted)
+ {
+ println!("{:?} --- {:?}", line1, line2);
+ }
+ unreachable!();
+ }
}
Ok(())
}
22 changes: 22 additions & 0 deletions datafusion/core/tests/memory_limit.rs
@@ -301,6 +301,28 @@ async fn sort_spill_reservation() {
test.with_config(config).with_expected_success().run().await;
}

+ #[tokio::test]
+ async fn oom_recursive_cte() {
+ TestCase::new()
+ .with_query(
+ "WITH RECURSIVE nodes AS (
+ SELECT 1 as id
+ UNION ALL
+ SELECT UNNEST(RANGE(id+1, id+1000)) as id
+ FROM nodes
+ WHERE id < 10
+ )
+ SELECT * FROM nodes;",
+ )
+ .with_expected_errors(vec![
+ "Resources exhausted: Failed to allocate additional",
+ "RecursiveQuery",
+ ])
+ .with_memory_limit(2_000)
+ .run()
+ .await
+ }

/// Run the query with the specified memory limit,
/// and verify that the expected errors are returned
#[derive(Clone, Debug)]
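`TestCase` is a helper local to this test file; the knob it exercises is DataFusion's memory pool. For illustration (not part of this diff), a standalone sketch of capping the pool, assuming the `RuntimeEnv`/`RuntimeConfig` API of this era:

use std::sync::Arc;
use datafusion::error::Result;
use datafusion::execution::runtime_env::{RuntimeConfig, RuntimeEnv};
use datafusion::prelude::{SessionConfig, SessionContext};

fn context_with_memory_limit(bytes: usize) -> Result<SessionContext> {
    // Queries that exceed the cap fail with "Resources exhausted"
    // instead of exhausting process memory.
    let runtime = RuntimeEnv::new(RuntimeConfig::new().with_memory_limit(bytes, 1.0))?;
    Ok(SessionContext::new_with_config_rt(
        SessionConfig::new(),
        Arc::new(runtime),
    ))
}

Running the recursive query above through such a context should surface the listed errors.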