diff --git a/e2e_test/over_window/generated/batch/main.slt.part b/e2e_test/over_window/generated/batch/main.slt.part index 5e91838cfac89..dcd341e3e714f 100644 --- a/e2e_test/over_window/generated/batch/main.slt.part +++ b/e2e_test/over_window/generated/batch/main.slt.part @@ -9,3 +9,4 @@ include ./expr_in_win_func/mod.slt.part include ./agg_in_win_func/mod.slt.part include ./opt_agg_then_join/mod.slt.part include ./with_filter/mod.slt.part +include ./no_effect_updates/mod.slt.part diff --git a/e2e_test/over_window/generated/batch/no_effect_updates/mod.slt.part b/e2e_test/over_window/generated/batch/no_effect_updates/mod.slt.part new file mode 100644 index 0000000000000..827b54113b962 --- /dev/null +++ b/e2e_test/over_window/generated/batch/no_effect_updates/mod.slt.part @@ -0,0 +1,94 @@ +# This file is generated by `gen.py`. Do not edit it manually! + +# Test handling of updates having no effect on window function outputs. + +statement ok +create table t ( + id int + , foo int + , bar int +); + +statement ok +create view v1 as +select + * + , rank() over (partition by 1::int order by foo) as r1 +from t; + +statement ok +create view v2 as +select + * + , rank() over (partition by 1::int order by bar) as r2 +from t; + +statement ok +insert into t values + (100001, 701, 805) +, (100002, 700, 806) +, (100003, 723, 807) +, (100004, 702, 808); + +query iii +select * from v1 order by r1, id; +---- +100002 700 806 1 +100001 701 805 2 +100004 702 808 3 +100003 723 807 4 + +query iii +select * from v2 order by r2, id; +---- +100001 701 805 1 +100002 700 806 2 +100003 723 807 3 +100004 702 808 4 + +statement ok +update t set foo = 733 where id = 100001; + +query iii +select * from v1 order by r1, id; +---- +100002 700 806 1 +100004 702 808 2 +100003 723 807 3 +100001 733 805 4 + +query iii +select * from v2 order by r2, id; +---- +100001 733 805 1 +100002 700 806 2 +100003 723 807 3 +100004 702 808 4 + +statement ok +update t set bar = 804 where id = 100001; + +query iii +select * from v1 order by r1, id; +---- +100002 700 806 1 +100004 702 808 2 +100003 723 807 3 +100001 733 804 4 + +query iii +select * from v2 order by r2, id; +---- +100001 733 804 1 +100002 700 806 2 +100003 723 807 3 +100004 702 808 4 + +statement ok +drop view v1; + +statement ok +drop view v2; + +statement ok +drop table t; diff --git a/e2e_test/over_window/generated/streaming/main.slt.part b/e2e_test/over_window/generated/streaming/main.slt.part index 5e91838cfac89..dcd341e3e714f 100644 --- a/e2e_test/over_window/generated/streaming/main.slt.part +++ b/e2e_test/over_window/generated/streaming/main.slt.part @@ -9,3 +9,4 @@ include ./expr_in_win_func/mod.slt.part include ./agg_in_win_func/mod.slt.part include ./opt_agg_then_join/mod.slt.part include ./with_filter/mod.slt.part +include ./no_effect_updates/mod.slt.part diff --git a/e2e_test/over_window/generated/streaming/no_effect_updates/mod.slt.part b/e2e_test/over_window/generated/streaming/no_effect_updates/mod.slt.part new file mode 100644 index 0000000000000..0261fbb1cc454 --- /dev/null +++ b/e2e_test/over_window/generated/streaming/no_effect_updates/mod.slt.part @@ -0,0 +1,94 @@ +# This file is generated by `gen.py`. Do not edit it manually! + +# Test handling of updates having no effect on window function outputs. + +statement ok +create table t ( + id int + , foo int + , bar int +); + +statement ok +create materialized view v1 as +select + * + , rank() over (partition by 1::int order by foo) as r1 +from t; + +statement ok +create materialized view v2 as +select + * + , rank() over (partition by 1::int order by bar) as r2 +from t; + +statement ok +insert into t values + (100001, 701, 805) +, (100002, 700, 806) +, (100003, 723, 807) +, (100004, 702, 808); + +query iii +select * from v1 order by r1, id; +---- +100002 700 806 1 +100001 701 805 2 +100004 702 808 3 +100003 723 807 4 + +query iii +select * from v2 order by r2, id; +---- +100001 701 805 1 +100002 700 806 2 +100003 723 807 3 +100004 702 808 4 + +statement ok +update t set foo = 733 where id = 100001; + +query iii +select * from v1 order by r1, id; +---- +100002 700 806 1 +100004 702 808 2 +100003 723 807 3 +100001 733 805 4 + +query iii +select * from v2 order by r2, id; +---- +100001 733 805 1 +100002 700 806 2 +100003 723 807 3 +100004 702 808 4 + +statement ok +update t set bar = 804 where id = 100001; + +query iii +select * from v1 order by r1, id; +---- +100002 700 806 1 +100004 702 808 2 +100003 723 807 3 +100001 733 804 4 + +query iii +select * from v2 order by r2, id; +---- +100001 733 804 1 +100002 700 806 2 +100003 723 807 3 +100004 702 808 4 + +statement ok +drop materialized view v1; + +statement ok +drop materialized view v2; + +statement ok +drop table t; diff --git a/e2e_test/over_window/templates/main.slt.part b/e2e_test/over_window/templates/main.slt.part index 7f7ad74ab7424..e7ec434b8cfaf 100644 --- a/e2e_test/over_window/templates/main.slt.part +++ b/e2e_test/over_window/templates/main.slt.part @@ -7,3 +7,4 @@ include ./expr_in_win_func/mod.slt.part include ./agg_in_win_func/mod.slt.part include ./opt_agg_then_join/mod.slt.part include ./with_filter/mod.slt.part +include ./no_effect_updates/mod.slt.part diff --git a/e2e_test/over_window/templates/no_effect_updates/mod.slt.part b/e2e_test/over_window/templates/no_effect_updates/mod.slt.part new file mode 100644 index 0000000000000..32627f5f38f0c --- /dev/null +++ b/e2e_test/over_window/templates/no_effect_updates/mod.slt.part @@ -0,0 +1,92 @@ +# Test handling of updates having no effect on window function outputs. + +statement ok +create table t ( + id int + , foo int + , bar int +); + +statement ok +create $view_type v1 as +select + * + , rank() over (partition by 1::int order by foo) as r1 +from t; + +statement ok +create $view_type v2 as +select + * + , rank() over (partition by 1::int order by bar) as r2 +from t; + +statement ok +insert into t values + (100001, 701, 805) +, (100002, 700, 806) +, (100003, 723, 807) +, (100004, 702, 808); + +query iii +select * from v1 order by r1, id; +---- +100002 700 806 1 +100001 701 805 2 +100004 702 808 3 +100003 723 807 4 + +query iii +select * from v2 order by r2, id; +---- +100001 701 805 1 +100002 700 806 2 +100003 723 807 3 +100004 702 808 4 + +statement ok +update t set foo = 733 where id = 100001; + +query iii +select * from v1 order by r1, id; +---- +100002 700 806 1 +100004 702 808 2 +100003 723 807 3 +100001 733 805 4 + +query iii +select * from v2 order by r2, id; +---- +100001 733 805 1 +100002 700 806 2 +100003 723 807 3 +100004 702 808 4 + +statement ok +update t set bar = 804 where id = 100001; + +query iii +select * from v1 order by r1, id; +---- +100002 700 806 1 +100004 702 808 2 +100003 723 807 3 +100001 733 804 4 + +query iii +select * from v2 order by r2, id; +---- +100001 733 804 1 +100002 700 806 2 +100003 723 807 3 +100004 702 808 4 + +statement ok +drop $view_type v1; + +statement ok +drop $view_type v2; + +statement ok +drop table t; diff --git a/src/common/src/array/stream_record.rs b/src/common/src/array/stream_record.rs index 1037f92b286ce..cfd474017d933 100644 --- a/src/common/src/array/stream_record.rs +++ b/src/common/src/array/stream_record.rs @@ -28,7 +28,7 @@ pub enum RecordType { } /// Generic type to represent a row change. -#[derive(Debug, Clone)] +#[derive(Debug, Clone, Copy)] pub enum Record { Insert { new_row: R }, Delete { old_row: R }, diff --git a/src/stream/src/common/table/state_table.rs b/src/stream/src/common/table/state_table.rs index 55957af9e6e5b..0ecb0eac90718 100644 --- a/src/stream/src/common/table/state_table.rs +++ b/src/stream/src/common/table/state_table.rs @@ -644,16 +644,16 @@ where pub async fn get_encoded_row(&self, pk: impl Row) -> StreamExecutorResult> { assert!(pk.len() <= self.pk_indices.len()); - if self.prefix_hint_len != 0 { - debug_assert_eq!(self.prefix_hint_len, pk.len()); - } - let serialized_pk = serialize_pk_with_vnode(&pk, &self.pk_serde, self.compute_vnode_by_pk(&pk)); let prefix_hint = if self.prefix_hint_len != 0 && self.prefix_hint_len == pk.len() { Some(serialized_pk.slice(VirtualNode::SIZE..)) } else { + #[cfg(debug_assertions)] + if self.prefix_hint_len != 0 { + warn!("prefix_hint_len is not equal to pk.len(), may not be able to utilize bloom filter"); + } None }; diff --git a/src/stream/src/executor/over_window/general.rs b/src/stream/src/executor/over_window/general.rs index be71950694a04..1a5073b6e9546 100644 --- a/src/stream/src/executor/over_window/general.rs +++ b/src/stream/src/executor/over_window/general.rs @@ -17,6 +17,7 @@ use std::marker::PhantomData; use std::ops::RangeInclusive; use delta_btree_map::Change; +use itertools::Itertools; use risingwave_common::array::stream_record::Record; use risingwave_common::array::Op; use risingwave_common::row::RowExt; @@ -149,6 +150,8 @@ pub(super) struct Calls { pub(super) range_frames: Vec, pub(super) start_is_unbounded: bool, pub(super) end_is_unbounded: bool, + /// Deduplicated indices of all arguments of all calls. + pub(super) all_arg_indices: Vec, } impl Calls { @@ -171,12 +174,19 @@ impl Calls { .iter() .any(|call| call.frame.bounds.end_is_unbounded()); + let all_arg_indices = calls + .iter() + .flat_map(|call| call.args.val_indices().iter().copied()) + .dedup() + .collect(); + Self { calls, super_rows_frame_bounds, range_frames, start_is_unbounded, end_is_unbounded, + all_arg_indices, } } @@ -326,8 +336,12 @@ impl OverWindowExecutor { chunk: StreamChunk, metrics: &'a OverWindowMetrics, ) { - // (deduped) partition key => changes happened in the partition. - let mut deltas: BTreeMap, PartitionDelta> = BTreeMap::new(); + // (deduped) partition key => ( + // significant changes happened in the partition, + // no-effect changes happened in the partition, + // ) + let mut deltas: BTreeMap, (PartitionDelta, PartitionDelta)> = + BTreeMap::new(); // input pk of update records of which the order key is changed. let mut key_change_updated_pks = HashSet::new(); @@ -336,16 +350,16 @@ impl OverWindowExecutor { match record { Record::Insert { new_row } => { let part_key = this.get_partition_key(new_row).into(); - let part_delta = deltas.entry(part_key).or_default(); - part_delta.insert( + let (delta, _) = deltas.entry(part_key).or_default(); + delta.insert( this.row_to_cache_key(new_row)?, Change::Insert(new_row.into_owned_row()), ); } Record::Delete { old_row } => { let part_key = this.get_partition_key(old_row).into(); - let part_delta = deltas.entry(part_key).or_default(); - part_delta.insert(this.row_to_cache_key(old_row)?, Change::Delete); + let (delta, _) = deltas.entry(part_key).or_default(); + delta.insert(this.row_to_cache_key(old_row)?, Change::Delete); } Record::Update { old_row, new_row } => { let old_part_key = this.get_partition_key(old_row).into(); @@ -354,23 +368,31 @@ impl OverWindowExecutor { let new_state_key = this.row_to_cache_key(new_row)?; if old_part_key == new_part_key && old_state_key == new_state_key { // not a key-change update - let part_delta = deltas.entry(old_part_key).or_default(); - part_delta.insert(old_state_key, Change::Insert(new_row.into_owned_row())); + let (delta, no_effect_delta) = deltas.entry(old_part_key).or_default(); + if old_row.project(&this.calls.all_arg_indices) + == new_row.project(&this.calls.all_arg_indices) + { + // partition key, order key and arguments are all the same + no_effect_delta + .insert(old_state_key, Change::Insert(new_row.into_owned_row())); + } else { + delta.insert(old_state_key, Change::Insert(new_row.into_owned_row())); + } } else if old_part_key == new_part_key { // order-change update, split into delete + insert, will be merged after // building changes key_change_updated_pks.insert(this.get_input_pk(old_row)); - let part_delta = deltas.entry(old_part_key).or_default(); - part_delta.insert(old_state_key, Change::Delete); - part_delta.insert(new_state_key, Change::Insert(new_row.into_owned_row())); + let (delta, _) = deltas.entry(old_part_key).or_default(); + delta.insert(old_state_key, Change::Delete); + delta.insert(new_state_key, Change::Insert(new_row.into_owned_row())); } else { // partition-change update, split into delete + insert // NOTE(rc): Since we append partition key to logical pk, we can't merge the // delete + insert back to update later. // TODO: IMO this behavior is problematic. Deep discussion is needed. - let old_part_delta = deltas.entry(old_part_key).or_default(); + let (old_part_delta, _) = deltas.entry(old_part_key).or_default(); old_part_delta.insert(old_state_key, Change::Delete); - let new_part_delta = deltas.entry(new_part_key).or_default(); + let (new_part_delta, _) = deltas.entry(new_part_key).or_default(); new_part_delta .insert(new_state_key, Change::Insert(new_row.into_owned_row())); } @@ -384,7 +406,7 @@ impl OverWindowExecutor { let mut chunk_builder = StreamChunkBuilder::new(this.chunk_size, this.schema.data_types()); // Build final changes partition by partition. - for (part_key, delta) in deltas { + for (part_key, (delta, no_effect_delta)) in deltas { vars.stats.cache_lookup += 1; if !vars.cached_partitions.contains(&part_key.0) { vars.stats.cache_miss += 1; @@ -392,6 +414,54 @@ impl OverWindowExecutor { .put(part_key.0.clone(), new_empty_partition_cache()); } let mut cache = vars.cached_partitions.get_mut(&part_key).unwrap(); + + // First, handle `Update`s that don't affect window function outputs. + // Be careful that changes in `delta` may (though we believe unlikely) affect the + // window function outputs of rows in `no_effect_delta`, so before handling `delta` + // we need to write all changes to state table, range cache and chunk builder. + for (key, change) in no_effect_delta { + let new_row = change.into_insert().unwrap(); // new row of an `Update` + + let (old_row, from_cache) = if let Some(old_row) = cache.inner().get(&key).cloned() + { + // Got old row from range cache. + (old_row, true) + } else { + // Retrieve old row from state table. + let table_pk = (&new_row).project(this.state_table.pk_indices()); + // The accesses to the state table is ordered by table PK, so ideally we + // can leverage the block cache under the hood. + if let Some(old_row) = this.state_table.get_row(table_pk).await? { + (old_row, false) + } else { + consistency_panic!(?part_key, ?key, ?new_row, "updating non-existing row"); + continue; + } + }; + + // concatenate old outputs + let input_len = new_row.len(); + let new_row = OwnedRow::new( + new_row + .into_iter() + .chain(old_row.as_inner().iter().skip(input_len).cloned()) // chain old outputs + .collect(), + ); + + // apply & emit the change + let record = Record::Update { + old_row: &old_row, + new_row: &new_row, + }; + if let Some(chunk) = chunk_builder.append_record(record.as_ref()) { + yield chunk; + } + this.state_table.write_record(record); + if from_cache { + cache.insert(key, new_row); + } + } + let mut partition = OverPartition::new( &part_key, &mut cache, @@ -406,6 +476,10 @@ impl OverWindowExecutor { }, ); + if delta.is_empty() { + continue; + } + // Build changes for current partition. let (part_changes, accessed_range) = partition.build_changes(&this.state_table, delta).await?; diff --git a/src/stream/src/executor/over_window/over_partition.rs b/src/stream/src/executor/over_window/over_partition.rs index 8406d094d7ff7..93f17703f3073 100644 --- a/src/stream/src/executor/over_window/over_partition.rs +++ b/src/stream/src/executor/over_window/over_partition.rs @@ -398,13 +398,16 @@ impl<'a, S: StateStore> OverPartition<'a, S> { let input_schema_len = table.get_data_types().len() - self.calls.len(); let calls = self.calls; + // return values let mut part_changes = BTreeMap::new(); + let mut accessed_range: Option> = None; + + // stats let mut accessed_entry_count = 0; let mut compute_count = 0; let mut same_output_count = 0; - // Find affected ranges, this also ensures that all rows in the affected ranges are loaded - // into the cache. + // Find affected ranges, this also ensures that all rows in the affected ranges are loaded into the cache. let (part_with_delta, affected_ranges) = self.find_affected_ranges(table, &mut delta).await?; @@ -424,8 +427,6 @@ impl<'a, S: StateStore> OverPartition<'a, S> { } } - let mut accessed_range: Option> = None; - for AffectedRange { first_frame_start, first_curr_key, @@ -595,13 +596,13 @@ impl<'a, S: StateStore> OverPartition<'a, S> { 'a: 'delta, 's: 'delta, { - self.ensure_delta_in_cache(table, delta).await?; - let delta = &*delta; // let's make it immutable - if delta.is_empty() { return Ok((DeltaBTreeMap::new(self.range_cache.inner(), delta), vec![])); } + self.ensure_delta_in_cache(table, delta).await?; + let delta = &*delta; // let's make it immutable + let delta_first = delta.first_key_value().unwrap().0.as_normal_expect(); let delta_last = delta.last_key_value().unwrap().0.as_normal_expect();