From a2a8c428f53c99cd324c900579e5c41e1cb372eb Mon Sep 17 00:00:00 2001 From: Richard Chien Date: Wed, 6 Nov 2024 15:45:19 +0800 Subject: [PATCH 1/5] fix first_value and last_value to not ignore NULLs Signed-off-by: Richard Chien --- src/expr/impl/src/aggregate/general.rs | 38 ++++++++++++++++++++++++-- 1 file changed, 36 insertions(+), 2 deletions(-) diff --git a/src/expr/impl/src/aggregate/general.rs b/src/expr/impl/src/aggregate/general.rs index 0c94312335b4b..52ec7909225d4 100644 --- a/src/expr/impl/src/aggregate/general.rs +++ b/src/expr/impl/src/aggregate/general.rs @@ -124,13 +124,47 @@ fn max(state: T, input: T) -> T { state.max(input) } +/// Note that different from `min` and `max`, `first_value` doesn't ignore `NULL` values. +/// +/// ```slt +/// statement ok +/// create table t(v1 int, ts int); +/// +/// statement ok +/// insert into t values (null, 1), (2, 2), (null, 3); +/// +/// query I +/// select first_value(v1 order by ts) from t; +/// ---- +/// NULL +/// +/// statement ok +/// drop table t; +/// ``` #[aggregate("first_value(*) -> auto", state = "ref")] -fn first_value(state: T, _: T) -> T { +fn first_value(state: Option, _: Option) -> Option { state } +/// Note that different from `min` and `max`, `last_value` doesn't ignore `NULL` values. +/// +/// ```slt +/// statement ok +/// create table t(v1 int, ts int); +/// +/// statement ok +/// insert into t values (null, 1), (2, 2), (null, 3); +/// +/// query I +/// select last_value(v1 order by ts) from t; +/// ---- +/// NULL +/// +/// statement ok +/// drop table t; +/// ``` #[aggregate("last_value(*) -> auto", state = "ref")] -fn last_value(_: T, input: T) -> T { +fn last_value(_: Option, input: Option) -> Option { input } From 771e07a785a5c54a07bab1aa1e73628f2a6c3116 Mon Sep 17 00:00:00 2001 From: Richard Chien Date: Thu, 7 Nov 2024 17:18:07 +0800 Subject: [PATCH 2/5] move `first_value` and `last_value` to separate source file Signed-off-by: Richard Chien --- .../impl/src/aggregate/first_last_value.rs | 69 +++++++++++++++++++ src/expr/impl/src/aggregate/general.rs | 53 -------------- src/expr/impl/src/aggregate/mod.rs | 1 + 3 files changed, 70 insertions(+), 53 deletions(-) create mode 100644 src/expr/impl/src/aggregate/first_last_value.rs diff --git a/src/expr/impl/src/aggregate/first_last_value.rs b/src/expr/impl/src/aggregate/first_last_value.rs new file mode 100644 index 0000000000000..102e51f7c6696 --- /dev/null +++ b/src/expr/impl/src/aggregate/first_last_value.rs @@ -0,0 +1,69 @@ +// Copyright 2024 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use risingwave_common::types::{Datum, DatumRef, ScalarRefImpl}; +use risingwave_expr::aggregate; + +/// Note that different from `min` and `max`, `first_value` doesn't ignore `NULL` values. +/// +/// ```slt +/// statement ok +/// create table t(v1 int, ts int); +/// +/// statement ok +/// insert into t values (null, 1), (2, 2), (null, 3); +/// +/// query I +/// select first_value(v1 order by ts) from t; +/// ---- +/// NULL +/// +/// statement ok +/// drop table t; +/// ``` +#[aggregate("first_value(*) -> auto", state = "ref")] +fn first_value(state: Option, _: Option) -> Option { + state +} + +/// Note that different from `min` and `max`, `last_value` doesn't ignore `NULL` values. +/// +/// ```slt +/// statement ok +/// create table t(v1 int, ts int); +/// +/// statement ok +/// insert into t values (null, 1), (2, 2), (null, 3); +/// +/// query I +/// select last_value(v1 order by ts) from t; +/// ---- +/// NULL +/// +/// statement ok +/// drop table t; +/// ``` +#[aggregate("last_value(*) -> auto", state = "ref")] // TODO(): `last_value(any) -> any` +fn last_value(_: Option, input: Option) -> Option { + input +} + +#[aggregate("internal_last_seen_value(*) -> auto", state = "ref", internal)] +fn internal_last_seen_value(state: T, input: T, retract: bool) -> T { + if retract { + state + } else { + input + } +} diff --git a/src/expr/impl/src/aggregate/general.rs b/src/expr/impl/src/aggregate/general.rs index 52ec7909225d4..daaea5e782fd1 100644 --- a/src/expr/impl/src/aggregate/general.rs +++ b/src/expr/impl/src/aggregate/general.rs @@ -124,59 +124,6 @@ fn max(state: T, input: T) -> T { state.max(input) } -/// Note that different from `min` and `max`, `first_value` doesn't ignore `NULL` values. -/// -/// ```slt -/// statement ok -/// create table t(v1 int, ts int); -/// -/// statement ok -/// insert into t values (null, 1), (2, 2), (null, 3); -/// -/// query I -/// select first_value(v1 order by ts) from t; -/// ---- -/// NULL -/// -/// statement ok -/// drop table t; -/// ``` -#[aggregate("first_value(*) -> auto", state = "ref")] -fn first_value(state: Option, _: Option) -> Option { - state -} - -/// Note that different from `min` and `max`, `last_value` doesn't ignore `NULL` values. -/// -/// ```slt -/// statement ok -/// create table t(v1 int, ts int); -/// -/// statement ok -/// insert into t values (null, 1), (2, 2), (null, 3); -/// -/// query I -/// select last_value(v1 order by ts) from t; -/// ---- -/// NULL -/// -/// statement ok -/// drop table t; -/// ``` -#[aggregate("last_value(*) -> auto", state = "ref")] -fn last_value(_: Option, input: Option) -> Option { - input -} - -#[aggregate("internal_last_seen_value(*) -> auto", state = "ref", internal)] -fn internal_last_seen_value(state: T, input: T, retract: bool) -> T { - if retract { - state - } else { - input - } -} - /// Note the following corner cases: /// /// ```slt diff --git a/src/expr/impl/src/aggregate/mod.rs b/src/expr/impl/src/aggregate/mod.rs index 349574018fedf..881465b4cf82f 100644 --- a/src/expr/impl/src/aggregate/mod.rs +++ b/src/expr/impl/src/aggregate/mod.rs @@ -20,6 +20,7 @@ mod bit_or; mod bit_xor; mod bool_and; mod bool_or; +mod first_last_value; mod general; mod jsonb_agg; mod mode; From a25e0b581cd7db027c05403b2fe545ec2cca4098 Mon Sep 17 00:00:00 2001 From: Richard Chien Date: Mon, 11 Nov 2024 17:46:30 +0800 Subject: [PATCH 3/5] impl `first_value` with custom state Signed-off-by: Richard Chien --- .../impl/src/aggregate/first_last_value.rs | 35 ++++++++++++++++--- 1 file changed, 30 insertions(+), 5 deletions(-) diff --git a/src/expr/impl/src/aggregate/first_last_value.rs b/src/expr/impl/src/aggregate/first_last_value.rs index 102e51f7c6696..e4577361c6089 100644 --- a/src/expr/impl/src/aggregate/first_last_value.rs +++ b/src/expr/impl/src/aggregate/first_last_value.rs @@ -12,8 +12,10 @@ // See the License for the specific language governing permissions and // limitations under the License. -use risingwave_common::types::{Datum, DatumRef, ScalarRefImpl}; +use risingwave_common::types::{Datum, ScalarRefImpl}; +use risingwave_common_estimate_size::EstimateSize; use risingwave_expr::aggregate; +use risingwave_expr::aggregate::AggStateDyn; /// Note that different from `min` and `max`, `first_value` doesn't ignore `NULL` values. /// @@ -32,9 +34,32 @@ use risingwave_expr::aggregate; /// statement ok /// drop table t; /// ``` -#[aggregate("first_value(*) -> auto", state = "ref")] -fn first_value(state: Option, _: Option) -> Option { - state +#[aggregate("first_value(any) -> any")] +fn first_value(state: &mut FirstValueState, input: Option>) { + if state.0.is_none() { + state.0 = Some(input.map(|x| x.into_scalar_impl())); + } +} + +#[derive(Debug, Clone, Default)] +struct FirstValueState(Option); + +impl EstimateSize for FirstValueState { + fn estimated_heap_size(&self) -> usize { + self.0.estimated_heap_size() + } +} + +impl AggStateDyn for FirstValueState {} + +impl From<&FirstValueState> for Datum { + fn from(state: &FirstValueState) -> Self { + if let Some(state) = &state.0 { + state.clone() + } else { + None + } + } } /// Note that different from `min` and `max`, `last_value` doesn't ignore `NULL` values. @@ -54,7 +79,7 @@ fn first_value(state: Option, _: Option) -> Option { /// statement ok /// drop table t; /// ``` -#[aggregate("last_value(*) -> auto", state = "ref")] // TODO(): `last_value(any) -> any` +#[aggregate("last_value(*) -> auto", state = "ref")] // TODO(rc): `last_value(any) -> any` fn last_value(_: Option, input: Option) -> Option { input } From b52a775a07703c7d2abdd4b4a9f0385fc8f52b74 Mon Sep 17 00:00:00 2001 From: Richard Chien Date: Mon, 11 Nov 2024 17:50:20 +0800 Subject: [PATCH 4/5] simplify Signed-off-by: Richard Chien --- src/expr/impl/src/aggregate/first_last_value.rs | 8 +------- 1 file changed, 1 insertion(+), 7 deletions(-) diff --git a/src/expr/impl/src/aggregate/first_last_value.rs b/src/expr/impl/src/aggregate/first_last_value.rs index e4577361c6089..841442148f722 100644 --- a/src/expr/impl/src/aggregate/first_last_value.rs +++ b/src/expr/impl/src/aggregate/first_last_value.rs @@ -41,15 +41,9 @@ fn first_value(state: &mut FirstValueState, input: Option>) { } } -#[derive(Debug, Clone, Default)] +#[derive(Debug, Clone, Default, EstimateSize)] struct FirstValueState(Option); -impl EstimateSize for FirstValueState { - fn estimated_heap_size(&self) -> usize { - self.0.estimated_heap_size() - } -} - impl AggStateDyn for FirstValueState {} impl From<&FirstValueState> for Datum { From 155765db797b001a1c190c8723878045542a66dd Mon Sep 17 00:00:00 2001 From: Richard Chien Date: Tue, 12 Nov 2024 20:20:17 +0800 Subject: [PATCH 5/5] correct varchar generation in sqlsmith Signed-off-by: Richard Chien --- src/tests/sqlsmith/src/sql_gen/scalar.rs | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/src/tests/sqlsmith/src/sql_gen/scalar.rs b/src/tests/sqlsmith/src/sql_gen/scalar.rs index 62cd7218dcc90..a532f6138c596 100644 --- a/src/tests/sqlsmith/src/sql_gen/scalar.rs +++ b/src/tests/sqlsmith/src/sql_gen/scalar.rs @@ -81,11 +81,15 @@ impl SqlGenerator<'_, R> { data_type: AstDataType::SmallInt, value: self.gen_int(i16::MIN as isize, i16::MAX as isize), })), - T::Varchar => Expr::Value(Value::SingleQuotedString( - (0..10) - .map(|_| self.rng.sample(Alphanumeric) as char) - .collect(), - )), + T::Varchar => Expr::Cast { + // since we are generating random scalar literal, we should cast it to avoid unknown type + expr: Box::new(Expr::Value(Value::SingleQuotedString( + (0..10) + .map(|_| self.rng.sample(Alphanumeric) as char) + .collect(), + ))), + data_type: AstDataType::Varchar, + }, T::Decimal => Expr::Nested(Box::new(Expr::Value(Value::Number(self.gen_float())))), T::Float64 => Expr::Nested(Box::new(Expr::TypedString { data_type: AstDataType::Float(None),