Skip to content

Commit

Permalink
Support IGNORE NULLS for NTH_VALUE window function (apache#9625)
Browse files Browse the repository at this point in the history
Co-authored-by: Huaxin Gao <[email protected]>
  • Loading branch information
huaxingao and Huaxin Gao authored Mar 18, 2024
1 parent d81e9ed commit 40bf0ea
Show file tree
Hide file tree
Showing 3 changed files with 205 additions and 10 deletions.
32 changes: 23 additions & 9 deletions datafusion/physical-expr/src/window/nth_value.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,19 +83,16 @@ impl NthValue {
name: impl Into<String>,
expr: Arc<dyn PhysicalExpr>,
data_type: DataType,
n: u32,
n: i64,
ignore_nulls: bool,
) -> Result<Self> {
if ignore_nulls {
return exec_err!("NTH_VALUE ignore_nulls is not supported yet");
}
match n {
0 => exec_err!("NTH_VALUE expects n to be non-zero"),
_ => Ok(Self {
name: name.into(),
expr,
data_type,
kind: NthValueKind::Nth(n as i64),
kind: NthValueKind::Nth(n),
ignore_nulls,
}),
}
Expand Down Expand Up @@ -267,20 +264,37 @@ impl PartitionEvaluator for NthValueEvaluator {
if index >= n_range {
// Outside the range, return NULL:
ScalarValue::try_from(arr.data_type())
} else if self.ignore_nulls {
let valid_indices = valid_indices.unwrap();
if index >= valid_indices.len() {
return ScalarValue::try_from(arr.data_type());
}
ScalarValue::try_from_array(
&slice.unwrap(),
valid_indices[index],
)
} else {
ScalarValue::try_from_array(arr, range.start + index)
}
}
Ordering::Less => {
let reverse_index = (-n) as usize;
if n_range >= reverse_index {
if n_range < reverse_index {
// Outside the range, return NULL:
ScalarValue::try_from(arr.data_type())
} else if self.ignore_nulls {
let valid_indices = valid_indices.unwrap();
if reverse_index > valid_indices.len() {
return ScalarValue::try_from(arr.data_type());
}
let new_index =
valid_indices[valid_indices.len() - reverse_index];
ScalarValue::try_from_array(&slice.unwrap(), new_index)
} else {
ScalarValue::try_from_array(
arr,
range.start + n_range - reverse_index,
)
} else {
// Outside the range, return NULL:
ScalarValue::try_from(arr.data_type())
}
}
Ordering::Equal => {
Expand Down
1 change: 0 additions & 1 deletion datafusion/physical-plan/src/windows/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -249,7 +249,6 @@ fn create_built_in_window_expr(
.clone()
.try_into()
.map_err(|e| DataFusionError::Execution(format!("{e:?}")))?;
let n: u32 = n as u32;
Arc::new(NthValue::nth(
name,
arg,
Expand Down
182 changes: 182 additions & 0 deletions datafusion/sqllogictest/test_files/window.slt
Original file line number Diff line number Diff line change
Expand Up @@ -4617,3 +4617,185 @@ NULL 1

statement ok
DROP TABLE t;

# Test for IGNORE NULLS with ORDER BY in NTH_VALUE (positive n).
# Ordered by column2, column1 reads NULL, NULL, 3, 4, 5, 6, so the third row
# is 3 while the third non-NULL value is 5.
statement ok
CREATE TABLE t AS VALUES (3, 3), (4, 4), (null::bigint, 1), (null::bigint, 2), (5, 5), (6, 6);

# Baseline: the row order the window queries below operate on.
query I
SELECT column1 FROM t ORDER BY column2;
----
NULL
NULL
3
4
5
6

# No explicit frame, NULLs counted: NULL for the first two rows, then 3
# (the third row in order) from the third row onward.
query I
SELECT nth_VALUE(column1, 3) OVER(ORDER BY column2) FROM t;
----
NULL
NULL
3
3
3
3

# Same window with IGNORE NULLS: the result stays NULL until three non-NULL
# values (3, 4, 5) have been seen, i.e. from the fifth row onward.
query I
SELECT nth_VALUE(column1, 3) IGNORE NULLS OVER(ORDER BY column2) FROM t;
----
NULL
NULL
NULL
NULL
5
5

# Frame spans all rows, NULLs counted: every row sees the third row, 3.
query I
SELECT nth_VALUE(column1, 3) OVER(ORDER BY column2 ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) FROM t;
----
3
3
3
3
3
3

# Frame spans all rows with IGNORE NULLS: every row sees the third non-NULL
# value, 5.
query I
SELECT nth_VALUE(column1, 3) IGNORE NULLS OVER(ORDER BY column2 ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) FROM t;
----
5
5
5
5
5
5

# Sliding RANGE frame (column2 within 1 of the current row), NULLs counted.
# The first and last rows have only two rows in their frame, so there is no
# third value and the result is NULL; the others return the last row of
# their three-row frame.
query I
SELECT nth_value(column1, 3) OVER(ORDER BY column2 RANGE BETWEEN 1 PRECEDING AND 1 FOLLOWING) FROM t;
----
NULL
3
4
5
6
NULL

# Same sliding frame with IGNORE NULLS: only the frames {3, 4, 5} and
# {4, 5, 6} contain three non-NULL values; every other frame yields NULL.
query I
SELECT nth_value(column1, 3) IGNORE NULLS OVER (ORDER BY column2 RANGE BETWEEN 1 PRECEDING AND 1 FOLLOWING) FROM t;
----
NULL
NULL
NULL
5
6
NULL

statement ok
DROP TABLE t;

# Test for negative index in NTH_VALUE, with and without IGNORE NULLS.
# A negative n counts from the end of the frame: -2 is the second-to-last
# value. Same data as the previous group.
statement ok
CREATE TABLE t AS VALUES (3, 3), (4, 4), (null::bigint, 1), (null::bigint, 2), (5, 5), (6, 6);

# Baseline: row order under ORDER BY column2 DESC, used by most queries below.
# The NULLs in column1 come last because they carry the smallest column2 values.
query I
SELECT column1 FROM t ORDER BY column2 DESC NULLS LAST;
----
6
5
4
3
NULL
NULL

# No explicit frame, NULLs counted: NULL on the first row (only one row seen),
# then the value one row behind the current row. The last row yields NULL
# because the second-to-last row is itself NULL.
query I
SELECT nth_VALUE(column1, -2) OVER(ORDER BY column2 DESC NULLS LAST) FROM t;
----
NULL
6
5
4
3
NULL

# Same window with IGNORE NULLS: once the trailing NULL rows are reached, the
# second-to-last non-NULL value stays at 4.
query I
SELECT nth_VALUE(column1, -2) IGNORE NULLS OVER(ORDER BY column2 DESC NULLS LAST) FROM t;
----
NULL
6
5
4
4
4

# NOTE: unlike its neighbours, this query orders by column2 ASCENDING (no DESC).
# Over the whole partition (NULL, NULL, 3, 4, 5, 6) the second-to-last row is 5.
query I
SELECT nth_VALUE(column1, -2) OVER(ORDER BY column2 ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) FROM t;
----
5
5
5
5
5
5

# Whole partition in descending order with IGNORE NULLS: the non-NULL values
# are 6, 5, 4, 3, so the second-to-last is 4.
query I
SELECT nth_VALUE(column1, -2) IGNORE NULLS OVER(ORDER BY column2 DESC NULLS LAST ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) FROM t;
----
4
4
4
4
4
4

# Sliding RANGE frame (column2 within 1 of the current row), NULLs counted:
# each row returns the second-to-last row of its frame; the last two rows get
# NULL because that position holds a NULL.
query I
SELECT nth_value(column1, -2) OVER (ORDER BY column2 DESC NULLS LAST RANGE BETWEEN 1 PRECEDING AND 1 FOLLOWING) FROM t;
----
6
5
4
3
NULL
NULL

# Same sliding frame with IGNORE NULLS: the fourth row's frame keeps only
# {4, 3}, giving 4; the last two rows have fewer than two non-NULL values in
# their frames, so they yield NULL.
query I
SELECT nth_value(column1, -2) IGNORE NULLS OVER (ORDER BY column2 DESC NULLS LAST RANGE BETWEEN 1 PRECEDING AND 1 FOLLOWING) FROM t;
----
6
5
4
4
NULL
NULL

statement ok
DROP TABLE t;

# Test for IGNORE NULLS with ORDER BY in NTH_VALUE when every value is NULL.
# All three variants below (default, RESPECT NULLS, IGNORE NULLS) are expected
# to return NULL for every row; column2 is selected to show the row order.
statement ok
CREATE TABLE t AS VALUES (null::bigint, 4), (null::bigint, 3), (null::bigint, 1), (null::bigint, 2);

# Default null handling: the second row of the frame is NULL.
query II
SELECT NTH_VALUE(column1, 2) OVER(ORDER BY column2 DESC NULLS LAST ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING), column2 FROM t;
----
NULL 4
NULL 3
NULL 2
NULL 1

# Explicit RESPECT NULLS: same expected output as the default above.
query II
SELECT NTH_VALUE(column1, 2) RESPECT NULLS OVER(ORDER BY column2 DESC NULLS LAST ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING), column2 FROM t;
----
NULL 4
NULL 3
NULL 2
NULL 1

# IGNORE NULLS: no non-NULL value exists, so the result is NULL rather than
# an error.
query II
SELECT NTH_VALUE(column1, 2) IGNORE NULLS OVER(ORDER BY column2 DESC NULLS LAST ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING), column2 FROM t;
----
NULL 4
NULL 3
NULL 2
NULL 1

0 comments on commit 40bf0ea

Please sign in to comment.