diff --git a/proto/hummock.proto b/proto/hummock.proto index 15f3d61a7cf2b..c362aa006c003 100644 --- a/proto/hummock.proto +++ b/proto/hummock.proto @@ -149,6 +149,9 @@ message TableWatermarks { // The direction of the table watermark. bool is_ascending = 2; + + // Whether the table watermark is a non-pk-prefix table watermark. + bool is_non_pk_prefix = 3; } message EpochNewChangeLog { diff --git a/src/common/src/util/row_serde.rs b/src/common/src/util/row_serde.rs index 57b540621958c..285e988e9e8b0 100644 --- a/src/common/src/util/row_serde.rs +++ b/src/common/src/util/row_serde.rs @@ -52,6 +52,18 @@ impl OrderedRowSerde { } } + #[must_use] + pub fn index(&self, idx: usize) -> Cow<'_, Self> { + if 1 == self.order_types.len() { + Cow::Borrowed(self) + } else { + Cow::Owned(Self { + schema: vec![self.schema[idx].clone()], + order_types: vec![self.order_types[idx]], + }) + } + } + /// Note: prefer [`Row::memcmp_serialize`] if possible. pub fn serialize(&self, row: impl Row, append_to: impl BufMut) { self.serialize_datums(row.iter(), append_to) diff --git a/src/meta/src/hummock/compaction/mod.rs b/src/meta/src/hummock/compaction/mod.rs index d5115569eabc9..4ad03e02c6d37 100644 --- a/src/meta/src/hummock/compaction/mod.rs +++ b/src/meta/src/hummock/compaction/mod.rs @@ -174,6 +174,8 @@ impl CompactStatus { pub fn is_trivial_reclaim(task: &CompactTask) -> bool { // Currently all VnodeWatermark tasks are trivial reclaim. if task.task_type == TaskType::VnodeWatermark { + assert!(task.input_ssts.len() == 2); + assert!(task.input_ssts[1].table_infos.is_empty()); return true; } let exist_table_ids = HashSet::<u32>::from_iter(task.existing_table_ids.clone()); diff --git a/src/meta/src/hummock/compaction/picker/vnode_watermark_picker.rs b/src/meta/src/hummock/compaction/picker/vnode_watermark_picker.rs index dcfb4a59daa94..85f51b131135f 100644 --- a/src/meta/src/hummock/compaction/picker/vnode_watermark_picker.rs +++ b/src/meta/src/hummock/compaction/picker/vnode_watermark_picker.rs @@ -101,7 +101,7 @@ fn should_delete_key_by_watermark( let Some(w) = watermark.vnode_watermarks.get(&vnode) else { return false; }; - watermark.direction.filter_by_watermark(key, w) + watermark.direction.key_filter_by_watermark(key, w) } #[cfg(test)] diff --git a/src/meta/src/hummock/compaction/selector/vnode_watermark_selector.rs b/src/meta/src/hummock/compaction/selector/vnode_watermark_selector.rs index c88e275d1e1c3..347cc7d4388ce 100644 --- a/src/meta/src/hummock/compaction/selector/vnode_watermark_selector.rs +++ b/src/meta/src/hummock/compaction/selector/vnode_watermark_selector.rs @@ -76,7 +76,7 @@ fn safe_epoch_read_table_watermarks( state_table_info: &HummockVersionStateTableInfo, member_table_ids: &BTreeSet<TableId>, ) -> BTreeMap<TableId, ReadTableWatermark> { - safe_epoch_read_table_watermarks_impl(&safe_epoch_table_watermarks_impl( + safe_epoch_read_table_watermarks_impl(safe_epoch_table_watermarks_impl( table_watermarks, state_table_info, &member_table_ids diff --git a/src/meta/src/hummock/manager/compaction/mod.rs b/src/meta/src/hummock/manager/compaction/mod.rs index bc91fc0676fd4..204e4c3797281 100644 --- a/src/meta/src/hummock/manager/compaction/mod.rs +++ b/src/meta/src/hummock/manager/compaction/mod.rs @@ -48,6 +48,7 @@ use risingwave_hummock_sdk::sstable_info::SstableInfo; use risingwave_hummock_sdk::table_stats::{ add_prost_table_stats_map, purge_prost_table_stats, PbTableStatsMap, }; +use risingwave_hummock_sdk::table_watermark::WatermarkSerdeType; use risingwave_hummock_sdk::version::{GroupDelta, IntraLevelDelta}; use risingwave_hummock_sdk::{
compact_task_to_string, statistics_compact_task, CompactionGroupId, HummockCompactionTaskId, @@ -728,6 +729,22 @@ impl HummockManager { } } + let table_watermarks = version + .latest_version() + .table_watermarks + .iter() + .filter_map(|(table_id, table_watermarks)| { + if matches!( + table_watermarks.watermark_type, + WatermarkSerdeType::PkPrefix, + ) { + Some((*table_id, table_watermarks.clone())) + } else { + None + } + }) + .collect(); + while let Some(compact_task) = compact_status.get_compact_task( version .latest_version() @@ -742,7 +759,7 @@ impl HummockManager { selector, &table_id_to_option, developer_config.clone(), - &version.latest_version().table_watermarks, + &table_watermarks, &version.latest_version().state_table_info, ) { let target_level_id = compact_task.input.target_level as u32; diff --git a/src/storage/benches/bench_compactor.rs b/src/storage/benches/bench_compactor.rs index bb1d1026aa7a4..33e9d080ec8d2 100644 --- a/src/storage/benches/bench_compactor.rs +++ b/src/storage/benches/bench_compactor.rs @@ -136,11 +136,13 @@ async fn build_table( }, ); let table_id_to_vnode = HashMap::from_iter(vec![(0, VirtualNode::COUNT_FOR_TEST)]); + let table_id_to_watermark_serde = HashMap::from_iter(vec![(0, None)]); let mut builder = SstableBuilder::<_, Xor16FilterBuilder>::for_test( sstable_object_id, writer, opt, table_id_to_vnode, + table_id_to_watermark_serde, ); let value = b"1234567890123456789"; let mut full_key = test_key_of(0, epoch, TableId::new(0)); @@ -186,11 +188,14 @@ async fn build_table_2( ); let table_id_to_vnode = HashMap::from_iter(vec![(table_id, VirtualNode::COUNT_FOR_TEST)]); + let table_id_to_watermark_serde = HashMap::from_iter(vec![(0, None)]); + let mut builder = SstableBuilder::<_, Xor16FilterBuilder>::for_test( sstable_object_id, writer, opt, table_id_to_vnode, + table_id_to_watermark_serde, ); let mut full_key = test_key_of(0, epoch, TableId::new(table_id)); let table_key_len = full_key.user_key.table_key.len(); diff --git a/src/storage/benches/bench_merge_iter.rs b/src/storage/benches/bench_merge_iter.rs index 2357ab78674d5..6775054bd542c 100644 --- a/src/storage/benches/bench_merge_iter.rs +++ b/src/storage/benches/bench_merge_iter.rs @@ -14,13 +14,16 @@ use std::cell::RefCell; use std::collections::BTreeMap; +use std::sync::Arc; use bytes::Bytes; use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion}; use futures::executor::block_on; use risingwave_hummock_sdk::key::TableKey; +use risingwave_storage::compaction_catalog_manager::CompactionCatalogAgent; use risingwave_storage::hummock::iterator::{ - Forward, HummockIterator, HummockIteratorUnion, MergeIterator, SkipWatermarkIterator, + Forward, HummockIterator, HummockIteratorUnion, MergeIterator, + NonPkPrefixSkipWatermarkIterator, SkipWatermarkIterator, }; use risingwave_storage::hummock::shared_buffer::shared_buffer_batch::{ SharedBufferBatch, SharedBufferBatchIterator, SharedBufferValue, @@ -108,10 +111,20 @@ fn criterion_benchmark(c: &mut Criterion) { }, ); - let merge_iter = RefCell::new(SkipWatermarkIterator::new( - MergeIterator::new(gen_interleave_shared_buffer_batch_iter(10000, 100)), - BTreeMap::new(), - )); + let combine_iter = { + let iter = SkipWatermarkIterator::new( + MergeIterator::new(gen_interleave_shared_buffer_batch_iter(10000, 100)), + BTreeMap::new(), + ); + + NonPkPrefixSkipWatermarkIterator::new( + iter, + BTreeMap::new(), + Arc::new(CompactionCatalogAgent::dummy()), + ) + }; + + let merge_iter = RefCell::new(combine_iter); c.bench_with_input( 
BenchmarkId::new("bench-merge-iter-skip-empty-watermark", "unordered"), &merge_iter, diff --git a/src/storage/benches/bench_multi_builder.rs b/src/storage/benches/bench_multi_builder.rs index 8cd54ce782f2c..07fd6eb7993fe 100644 --- a/src/storage/benches/bench_multi_builder.rs +++ b/src/storage/benches/bench_multi_builder.rs @@ -90,7 +90,16 @@ impl TableBuilderFactory for LocalTableBuilderFactory diff --git a/src/storage/hummock_sdk/src/compact_task.rs b/src/storage/hummock_sdk/src/compact_task.rs +impl CompactTask { + pub fn contains_ttl(&self) -> bool { + self.table_options .iter() .any(|(_, table_option)| table_option.retention_seconds.is_some_and(|ttl| ttl > 0)) + } + + // The compact task may need to reclaim keys covered by range tombstones + pub fn contains_range_tombstone(&self) -> bool { + self.input_ssts .iter() .flat_map(|level| level.table_infos.iter()) .any(|sst| sst.range_tombstone_count > 0) + } + + // The compact task may need to reclaim keys from split SSTs + pub fn contains_split_sst(&self) -> bool { + self.input_ssts .iter() .flat_map(|level| level.table_infos.iter()) .any(|sst| sst.sst_id != sst.object_id) + } + + pub fn get_table_ids_from_input_ssts(&self) -> impl Iterator<Item = u32> + '_ { + self.input_ssts .iter() .flat_map(|level| level.table_infos.iter()) .flat_map(|sst| sst.table_ids.clone()) .sorted() .unique() + } + + // Restrict the table ids found in the input SSTs to those still present in `existing_table_ids` + pub fn build_compact_table_ids(&self) -> Vec<u32> { + let existing_table_ids: HashSet<u32> = HashSet::from_iter(self.existing_table_ids.clone()); + self.get_table_ids_from_input_ssts() .filter(|table_id| existing_table_ids.contains(table_id)) .collect() + } +} + impl From<PbCompactTask> for CompactTask { #[expect(deprecated)] fn from(pb_compact_task: PbCompactTask) -> Self { diff --git a/src/storage/hummock_sdk/src/compaction_group/hummock_version_ext.rs b/src/storage/hummock_sdk/src/compaction_group/hummock_version_ext.rs index ee316f75ffd65..70824eedd7c2c 100644 --- a/src/storage/hummock_sdk/src/compaction_group/hummock_version_ext.rs +++ b/src/storage/hummock_sdk/src/compaction_group/hummock_version_ext.rs @@ -136,6 +136,7 @@ pub fn safe_epoch_table_watermarks_impl( Some(TableWatermarks { watermarks: vec![(*first_epoch, first_epoch_watermark.clone())], direction: table_watermarks.direction, + watermark_type: table_watermarks.watermark_type, }) } else { None @@ -156,10 +157,10 @@ pub fn safe_epoch_table_watermarks_impl( } pub fn safe_epoch_read_table_watermarks_impl( - safe_epoch_watermarks: &BTreeMap<u32, TableWatermarks>, + safe_epoch_watermarks: BTreeMap<u32, TableWatermarks>, ) -> BTreeMap<TableId, ReadTableWatermark> { safe_epoch_watermarks - .iter() + .into_iter() .map(|(table_id, watermarks)| { assert_eq!(watermarks.watermarks.len(), 1); let vnode_watermarks = &watermarks.watermarks.first().expect("should exist").1; @@ -177,7 +178,7 @@ pub fn safe_epoch_read_table_watermarks_impl( } } ( - TableId::from(*table_id), + TableId::from(table_id), ReadTableWatermark { direction: watermarks.direction, vnode_watermarks: vnode_watermark_map, diff --git a/src/storage/hummock_sdk/src/table_watermark.rs b/src/storage/hummock_sdk/src/table_watermark.rs index 33e5127b730fd..2faacbdcaf116 100644 --- a/src/storage/hummock_sdk/src/table_watermark.rs +++ b/src/storage/hummock_sdk/src/table_watermark.rs @@ -24,6 +24,8 @@ use itertools::Itertools; use risingwave_common::bitmap::{Bitmap, BitmapBuilder}; use risingwave_common::catalog::TableId; use risingwave_common::hash::{VirtualNode, VnodeBitmapExt}; +use risingwave_common::types::ToDatumRef; +use risingwave_common::util::sort_util::{cmp_datum, OrderType}; use risingwave_common_estimate_size::EstimateSize; use 
risingwave_pb::hummock::table_watermarks::PbEpochNewWatermarks; use risingwave_pb::hummock::{PbVnodeWatermark, TableWatermarks as PbTableWatermarks}; @@ -292,7 +294,11 @@ impl Display for WatermarkDirection { } impl WatermarkDirection { - pub fn filter_by_watermark(&self, key: impl AsRef<[u8]>, watermark: impl AsRef<[u8]>) -> bool { + pub fn key_filter_by_watermark( + &self, + key: impl AsRef<[u8]>, + watermark: impl AsRef<[u8]>, + ) -> bool { let key = key.as_ref(); let watermark = watermark.as_ref(); match self { @@ -301,6 +307,26 @@ impl WatermarkDirection { } } + pub fn datum_filter_by_watermark( + &self, + watermark_col_in_pk: impl ToDatumRef, + watermark: impl ToDatumRef, + order_type: OrderType, + ) -> bool { + let watermark_col_in_pk = watermark_col_in_pk.to_datum_ref(); + let watermark = watermark.to_datum_ref(); + match self { + WatermarkDirection::Ascending => { + // watermark_col_in_pk < watermark + cmp_datum(watermark_col_in_pk, watermark, order_type).is_lt() + } + WatermarkDirection::Descending => { + // watermark_col_in_pk > watermark + cmp_datum(watermark_col_in_pk, watermark, order_type).is_gt() + } + } + } + pub fn is_ascending(&self) -> bool { match self { WatermarkDirection::Ascending => true, @@ -382,6 +408,7 @@ pub struct TableWatermarks { // later epoch at the back pub watermarks: Vec<(HummockEpoch, Arc<[VnodeWatermark]>)>, pub direction: WatermarkDirection, + pub watermark_type: WatermarkSerdeType, } impl TableWatermarks { @@ -389,12 +416,14 @@ impl TableWatermarks { epoch: HummockEpoch, watermarks: Vec<VnodeWatermark>, direction: WatermarkDirection, + watermark_type: WatermarkSerdeType, ) -> Self { let mut this = Self { direction, watermarks: Vec::new(), + watermark_type, }; - this.add_new_epoch_watermarks(epoch, watermarks.into(), direction); + this.add_new_epoch_watermarks(epoch, watermarks.into(), direction, watermark_type); this } @@ -403,8 +432,11 @@ impl TableWatermarks { epoch: HummockEpoch, watermarks: Arc<[VnodeWatermark]>, direction: WatermarkDirection, + watermark_type: WatermarkSerdeType, ) { assert_eq!(self.direction, direction); + assert_eq!(self.watermark_type, watermark_type); + if let Some((prev_epoch, _)) = self.watermarks.last() { assert!(*prev_epoch < epoch); } @@ -449,6 +481,11 @@ impl TableWatermarks { } else { WatermarkDirection::Descending }, + watermark_type: if pb.is_non_pk_prefix { + WatermarkSerdeType::NonPkPrefix + } else { + WatermarkSerdeType::PkPrefix + }, } } } @@ -456,19 +493,30 @@ impl TableWatermarks { pub fn merge_multiple_new_table_watermarks( table_watermarks_list: impl IntoIterator<Item = HashMap<TableId, TableWatermarks>>, ) -> HashMap<TableId, TableWatermarks> { - let mut ret: HashMap<TableId, (WatermarkDirection, BTreeMap<HummockEpoch, Vec<VnodeWatermark>>)> = - HashMap::new(); + #[allow(clippy::type_complexity)] + let mut ret: HashMap< + TableId, + ( + WatermarkDirection, + BTreeMap<HummockEpoch, Vec<VnodeWatermark>>, + WatermarkSerdeType, + ), + > = HashMap::new(); for table_watermarks in table_watermarks_list { for (table_id, new_table_watermarks) in table_watermarks { let epoch_watermarks = match ret.entry(table_id) { Entry::Occupied(entry) => { - let (direction, epoch_watermarks) = entry.into_mut(); + let (direction, epoch_watermarks, watermark_type) = entry.into_mut(); assert_eq!(&new_table_watermarks.direction, direction); + assert_eq!(&new_table_watermarks.watermark_type, watermark_type); epoch_watermarks } Entry::Vacant(entry) => { - let (_, epoch_watermarks) = - entry.insert((new_table_watermarks.direction, BTreeMap::new())); + let (_, epoch_watermarks, _) = entry.insert(( + new_table_watermarks.direction, + BTreeMap::new(), + new_table_watermarks.watermark_type, + )); epoch_watermarks } }; @@ 
-481,19 +529,22 @@ pub fn merge_multiple_new_table_watermarks( } } ret.into_iter() - .map(|(table_id, (direction, epoch_watermarks))| { - ( - table_id, - TableWatermarks { - direction, - // ordered from earlier epoch to later epoch - watermarks: epoch_watermarks - .into_iter() - .map(|(epoch, watermarks)| (epoch, Arc::from(watermarks))) - .collect(), - }, - ) - }) + .map( + |(table_id, (direction, epoch_watermarks, watermark_type))| { + ( + table_id, + TableWatermarks { + direction, + // ordered from earlier epoch to later epoch + watermarks: epoch_watermarks + .into_iter() + .map(|(epoch, watermarks)| (epoch, Arc::from(watermarks))) + .collect(), + watermark_type, + }, + ) + }, + ) .collect() } @@ -599,6 +650,7 @@ impl TableWatermarks { *self = TableWatermarks { watermarks: result_epoch_watermark, direction: self.direction, + watermark_type: self.watermark_type, } } } @@ -645,6 +697,11 @@ impl From<&PbTableWatermarks> for TableWatermarks { } else { WatermarkDirection::Descending }, + watermark_type: if pb.is_non_pk_prefix { + WatermarkSerdeType::NonPkPrefix + } else { + WatermarkSerdeType::PkPrefix + }, } } } @@ -664,6 +721,10 @@ impl From<&TableWatermarks> for PbTableWatermarks { WatermarkDirection::Ascending => true, WatermarkDirection::Descending => false, }, + is_non_pk_prefix: match table_watermarks.watermark_type { + WatermarkSerdeType::NonPkPrefix => true, + WatermarkSerdeType::PkPrefix => false, + }, } } } @@ -689,6 +750,11 @@ impl From for TableWatermarks { } else { WatermarkDirection::Descending }, + watermark_type: if pb.is_non_pk_prefix { + WatermarkSerdeType::NonPkPrefix + } else { + WatermarkSerdeType::PkPrefix + }, } } } @@ -708,10 +774,20 @@ impl From for PbTableWatermarks { WatermarkDirection::Ascending => true, WatermarkDirection::Descending => false, }, + is_non_pk_prefix: match table_watermarks.watermark_type { + WatermarkSerdeType::NonPkPrefix => true, + WatermarkSerdeType::PkPrefix => false, + }, } } } +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum WatermarkSerdeType { + PkPrefix, + NonPkPrefix, +} + #[cfg(test)] mod tests { use std::collections::Bound::Included; @@ -732,7 +808,7 @@ mod tests { use crate::key::{is_empty_key_range, prefixed_range_with_vnode, TableKeyRange}; use crate::table_watermark::{ merge_multiple_new_table_watermarks, TableWatermarks, TableWatermarksIndex, VnodeWatermark, - WatermarkDirection, + WatermarkDirection, WatermarkSerdeType, }; use crate::version::HummockVersion; @@ -752,6 +828,7 @@ mod tests { let watermark2 = Bytes::from("watermark2"); let watermark3 = Bytes::from("watermark3"); let watermark4 = Bytes::from("watermark4"); + let watermark_type = WatermarkSerdeType::PkPrefix; let mut table_watermarks = TableWatermarks::single_epoch( epoch1, vec![VnodeWatermark::new( @@ -759,6 +836,7 @@ mod tests { watermark1.clone(), )], direction, + watermark_type, ); let epoch2 = epoch1.next_epoch(); table_watermarks.add_new_epoch_watermarks( @@ -769,6 +847,7 @@ mod tests { )] .into(), direction, + watermark_type, ); let mut table_watermark_checkpoint = table_watermarks.clone(); @@ -781,6 +860,7 @@ mod tests { watermark3.clone(), )], direction, + watermark_type, ); table_watermarks.add_new_epoch_watermarks( epoch3, @@ -790,6 +870,7 @@ mod tests { )] .into(), direction, + watermark_type, ); let epoch4 = epoch3.next_epoch(); let epoch5 = epoch4.next_epoch(); @@ -801,6 +882,7 @@ mod tests { )] .into(), direction, + watermark_type, ); second_table_watermark.add_new_epoch_watermarks( epoch5, @@ -810,6 +892,7 @@ mod tests { )] .into(), direction, + 
watermark_type, ); table_watermark_checkpoint.apply_new_table_watermarks(&second_table_watermark); @@ -824,6 +907,7 @@ mod tests { let watermark2 = Bytes::from("watermark2"); let watermark3 = Bytes::from("watermark3"); let watermark4 = Bytes::from("watermark4"); + let watermark_type = WatermarkSerdeType::PkPrefix; let mut table_watermarks = TableWatermarks::single_epoch( epoch1, vec![VnodeWatermark::new( @@ -831,6 +915,7 @@ mod tests { watermark1.clone(), )], direction, + watermark_type, ); let epoch2 = epoch1.next_epoch(); table_watermarks.add_new_epoch_watermarks( @@ -841,6 +926,7 @@ mod tests { )] .into(), direction, + watermark_type, ); let epoch3 = epoch2.next_epoch(); table_watermarks.add_new_epoch_watermarks( @@ -851,6 +937,7 @@ mod tests { )] .into(), direction, + watermark_type, ); let epoch4 = epoch3.next_epoch(); let epoch5 = epoch4.next_epoch(); @@ -862,6 +949,7 @@ mod tests { )] .into(), direction, + watermark_type, ); let mut table_watermarks_checkpoint = table_watermarks.clone(); @@ -899,6 +987,7 @@ mod tests { ) ], direction, + watermark_type, } ); @@ -925,6 +1014,7 @@ mod tests { ) ], direction, + watermark_type, } ); @@ -951,6 +1041,7 @@ mod tests { ) ], direction, + watermark_type, } ); @@ -970,6 +1061,7 @@ mod tests { .into() )], direction, + watermark_type, } ); } @@ -1000,6 +1092,7 @@ mod tests { .map(|epoch: u64| epoch_new_watermark(epoch, vec![&bitmap])) .collect(), direction: WatermarkDirection::Ascending, + watermark_type: WatermarkSerdeType::PkPrefix, } } let table1_watermark1 = build_table_watermark(0..3, vec![1, 2, 4]); @@ -1024,6 +1117,7 @@ mod tests { epoch_new_watermark(5, vec![&build_bitmap(4..6)]), ], direction: WatermarkDirection::Ascending, + watermark_type: WatermarkSerdeType::PkPrefix, }, ); expected.insert(TableId::new(2), table2_watermark); @@ -1209,6 +1303,7 @@ mod tests { .into(), )], direction: WatermarkDirection::Ascending, + watermark_type: WatermarkSerdeType::PkPrefix, } .into(), ); diff --git a/src/storage/hummock_test/src/compactor_tests.rs b/src/storage/hummock_test/src/compactor_tests.rs index 193e1ef6f5267..711ee400ce777 100644 --- a/src/storage/hummock_test/src/compactor_tests.rs +++ b/src/storage/hummock_test/src/compactor_tests.rs @@ -39,7 +39,7 @@ pub(crate) mod tests { use risingwave_hummock_sdk::sstable_info::SstableInfo; use risingwave_hummock_sdk::table_stats::to_prost_table_stats_map; use risingwave_hummock_sdk::table_watermark::{ - ReadTableWatermark, TableWatermarks, VnodeWatermark, WatermarkDirection, + ReadTableWatermark, TableWatermarks, VnodeWatermark, WatermarkDirection, WatermarkSerdeType, }; use risingwave_hummock_sdk::version::HummockVersion; use risingwave_hummock_sdk::{can_concat, CompactionGroupId}; @@ -66,7 +66,7 @@ pub(crate) mod tests { }; use risingwave_storage::hummock::iterator::test_utils::mock_sstable_store; use risingwave_storage::hummock::iterator::{ - ConcatIterator, SkipWatermarkIterator, UserIterator, + ConcatIterator, NonPkPrefixSkipWatermarkIterator, SkipWatermarkIterator, UserIterator, }; use risingwave_storage::hummock::sstable_store::SstableStoreRef; use risingwave_storage::hummock::test_utils::gen_test_sstable_info; @@ -926,9 +926,11 @@ pub(crate) mod tests { let table_id_to_vnode = HashMap::from_iter([(existing_table_id, VirtualNode::COUNT_FOR_TEST)]); + let table_id_to_watermark_serde = HashMap::from_iter(vec![(0, None)]); let compaction_catalog_agent_ref = Arc::new(CompactionCatalogAgent::new( FilterKeyExtractorImpl::Multi(multi_filter_key_extractor), table_id_to_vnode, + 
table_id_to_watermark_serde, )); let compact_ctx = get_compactor_context(&storage); @@ -1771,6 +1773,7 @@ pub(crate) mod tests { vec![VnodeWatermark::new(bitmap.clone(), watermark_key.clone())].into(), )], direction: WatermarkDirection::Ascending, + watermark_type: WatermarkSerdeType::PkPrefix, }, ); @@ -1830,21 +1833,38 @@ pub(crate) mod tests { } } let watermark = BTreeMap::from_iter([(TableId::new(1), watermark)]); + let compaction_catalog_agent = CompactionCatalogAgent::for_test(vec![1]); - let mut normal_iter = UserIterator::for_test( - SkipWatermarkIterator::new( + let combine_iter = { + let iter = SkipWatermarkIterator::new( ConcatIterator::new(ret, sstable_store.clone(), read_options.clone()), watermark.clone(), - ), - (Bound::Unbounded, Bound::Unbounded), - ); - let mut fast_iter = UserIterator::for_test( - SkipWatermarkIterator::new( - ConcatIterator::new(fast_ret, sstable_store, read_options), - watermark, - ), - (Bound::Unbounded, Bound::Unbounded), - ); + ); + + NonPkPrefixSkipWatermarkIterator::new( + iter, + BTreeMap::default(), + compaction_catalog_agent.clone(), + ) + }; + + let mut normal_iter = + UserIterator::for_test(combine_iter, (Bound::Unbounded, Bound::Unbounded)); + + let combine_iter = { + let iter = SkipWatermarkIterator::new( + ConcatIterator::new(fast_ret, sstable_store.clone(), read_options.clone()), + watermark.clone(), + ); + + NonPkPrefixSkipWatermarkIterator::new( + iter, + BTreeMap::default(), + compaction_catalog_agent.clone(), + ) + }; + let mut fast_iter = + UserIterator::for_test(combine_iter, (Bound::Unbounded, Bound::Unbounded)); normal_iter.rewind().await.unwrap(); fast_iter.rewind().await.unwrap(); let mut count = 0; diff --git a/src/storage/hummock_test/src/hummock_storage_tests.rs b/src/storage/hummock_test/src/hummock_storage_tests.rs index e0ffdb6a5eebe..87d8b1bce31b6 100644 --- a/src/storage/hummock_test/src/hummock_storage_tests.rs +++ b/src/storage/hummock_test/src/hummock_storage_tests.rs @@ -33,7 +33,7 @@ use risingwave_hummock_sdk::key_range::KeyRange; use risingwave_hummock_sdk::sstable_info::{SstableInfo, SstableInfoInner}; use risingwave_hummock_sdk::table_stats::TableStats; use risingwave_hummock_sdk::table_watermark::{ - TableWatermarksIndex, VnodeWatermark, WatermarkDirection, + TableWatermarksIndex, VnodeWatermark, WatermarkDirection, WatermarkSerdeType, }; use risingwave_hummock_sdk::{EpochWithGap, LocalSstableInfo}; use risingwave_meta::hummock::test_utils::get_compaction_group_id_by_table_id; @@ -2268,6 +2268,7 @@ async fn test_table_watermark() { table_watermarks: Some(( WatermarkDirection::Ascending, vec![VnodeWatermark::new(vnode_bitmap, gen_inner_key(watermark1))], + WatermarkSerdeType::PkPrefix, )), switch_op_consistency_level: None, }, @@ -2595,6 +2596,7 @@ async fn test_table_watermark() { table_watermarks: Some(( WatermarkDirection::Ascending, vec![VnodeWatermark::new(vnode_bitmap, gen_inner_key(5))], + WatermarkSerdeType::PkPrefix, )), switch_op_consistency_level: None, }, diff --git a/src/storage/hummock_trace/src/opts.rs b/src/storage/hummock_trace/src/opts.rs index b42a682c5eb97..04a26f9cfc90d 100644 --- a/src/storage/hummock_trace/src/opts.rs +++ b/src/storage/hummock_trace/src/opts.rs @@ -263,7 +263,7 @@ pub struct TracedInitOptions { #[derive(Debug, Clone, PartialEq, Eq, Decode, Encode)] pub struct TracedSealCurrentEpochOptions { // The watermark is serialized into protobuf - pub table_watermarks: Option<(bool, Vec<Vec<u8>>)>, + pub table_watermarks: Option<(bool, Vec<Vec<u8>>, bool)>, pub switch_op_consistency_level: Option<TracedOpConsistencyLevel>, }
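The new `datum_filter_by_watermark` added in `table_watermark.rs` above differs from `key_filter_by_watermark` in that it compares decoded datums rather than raw key bytes: for a non-pk-prefix watermark, the watermark column is not a memcmp-comparable prefix of the table key. A minimal sketch of the semantics; the `should_drop` helper and the `Int64` values are illustrative, not part of the diff:

```rust
use risingwave_common::types::{Datum, ScalarImpl};
use risingwave_common::util::sort_util::OrderType;
use risingwave_hummock_sdk::table_watermark::WatermarkDirection;

// Illustrative helper: under an ascending watermark, a key is reclaimable
// when its watermark column compares strictly less than the watermark datum.
fn should_drop(col_in_pk: Datum, watermark: Datum) -> bool {
    WatermarkDirection::Ascending.datum_filter_by_watermark(
        col_in_pk,
        watermark,
        OrderType::ascending(),
    )
}

fn main() {
    let col = Some(ScalarImpl::Int64(5));
    let wm = Some(ScalarImpl::Int64(10));
    assert!(should_drop(col, wm)); // 5 < 10 under ascending order, so the key is dropped
}
```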
diff --git a/src/storage/src/compaction_catalog_manager.rs b/src/storage/src/compaction_catalog_manager.rs index 54cffd4357855..b817633c5e5ac 100644 --- a/src/storage/src/compaction_catalog_manager.rs +++ b/src/storage/src/compaction_catalog_manager.rs @@ -16,6 +16,7 @@ use std::collections::{HashMap, HashSet}; use std::fmt::Debug; use std::sync::Arc; +use itertools::Itertools; use parking_lot::RwLock; use risingwave_common::catalog::ColumnDesc; use risingwave_common::hash::{VirtualNode, VnodeCountCompat}; @@ -314,13 +315,23 @@ impl CompactionCatalogManager { let mut multi_filter_key_extractor = MultiFilterKeyExtractor::default(); let mut table_id_to_vnode = HashMap::new(); + let mut table_id_to_watermark_serde = HashMap::new(); + { let guard = self.table_id_to_catalog.read(); table_ids.retain(|table_id| match guard.get(table_id) { Some(table_catalog) => { + // filter-key-extractor multi_filter_key_extractor .register(*table_id, FilterKeyExtractorImpl::from_table(table_catalog)); + + // vnode table_id_to_vnode.insert(*table_id, table_catalog.vnode_count()); + + // watermark + table_id_to_watermark_serde .insert(*table_id, build_watermark_col_serde(table_catalog)); + false } @@ -346,9 +357,16 @@ impl CompactionCatalogManager { let table_id = table.id; let key_extractor = FilterKeyExtractorImpl::from_table(&table); let vnode = table.vnode_count(); + let watermark_serde = build_watermark_col_serde(&table); guard.insert(table_id, table); + // filter-key-extractor multi_filter_key_extractor.register(table_id, key_extractor); + + // vnode table_id_to_vnode.insert(table_id, vnode); + + // watermark + table_id_to_watermark_serde.insert(table_id, watermark_serde); } } } @@ -356,6 +374,7 @@ impl CompactionCatalogManager { Ok(Arc::new(CompactionCatalogAgent::new( FilterKeyExtractorImpl::Multi(multi_filter_key_extractor), table_id_to_vnode, + table_id_to_watermark_serde, ))) } @@ -365,15 +384,23 @@ impl CompactionCatalogManager { ) -> CompactionCatalogAgentRef { let mut multi_filter_key_extractor = MultiFilterKeyExtractor::default(); let mut table_id_to_vnode = HashMap::new(); + let mut table_id_to_watermark_serde = HashMap::new(); for (table_id, table_catalog) in table_catalogs { + // filter-key-extractor multi_filter_key_extractor .register(table_id, FilterKeyExtractorImpl::from_table(&table_catalog)); + + // vnode table_id_to_vnode.insert(table_id, table_catalog.vnode_count()); + + // watermark + table_id_to_watermark_serde.insert(table_id, build_watermark_col_serde(&table_catalog)); } Arc::new(CompactionCatalogAgent::new( FilterKeyExtractorImpl::Multi(multi_filter_key_extractor), table_id_to_vnode, + table_id_to_watermark_serde, )) } } @@ -384,16 +411,25 @@ impl CompactionCatalogManager { pub struct CompactionCatalogAgent { filter_key_extractor_manager: FilterKeyExtractorImpl, table_id_to_vnode: HashMap<StateTableId, usize>, + // table_id -> (pk_prefix_serde, clean_watermark_col_serde, watermark_col_idx) + // Cached here to avoid rebuilding the serdes repeatedly + table_id_to_watermark_serde: + HashMap<StateTableId, Option<(OrderedRowSerde, OrderedRowSerde, usize)>>, } impl CompactionCatalogAgent { pub fn new( filter_key_extractor_manager: FilterKeyExtractorImpl, table_id_to_vnode: HashMap<StateTableId, usize>, + table_id_to_watermark_serde: HashMap< + StateTableId, + Option<(OrderedRowSerde, OrderedRowSerde, usize)>, + >, ) -> Self { Self { filter_key_extractor_manager, table_id_to_vnode, + table_id_to_watermark_serde, } } @@ -401,6 +437,7 @@ impl CompactionCatalogAgent { Self { filter_key_extractor_manager: FilterKeyExtractorImpl::Dummy(DummyFilterKeyExtractor), table_id_to_vnode: Default::default(), + 
table_id_to_watermark_serde: Default::default(), } } @@ -408,14 +445,20 @@ impl CompactionCatalogAgent { let full_key_filter_key_extractor = FilterKeyExtractorImpl::FullKey(FullKeyFilterKeyExtractor); - let table_id_to_vnode = table_ids + let table_id_to_vnode: HashMap<StateTableId, usize> = table_ids .into_iter() .map(|table_id| (table_id, VirtualNode::COUNT_FOR_TEST)) .collect(); + let table_id_to_watermark_serde = table_id_to_vnode .keys() .map(|table_id| (*table_id, None)) .collect(); + Arc::new(CompactionCatalogAgent::new( full_key_filter_key_extractor, table_id_to_vnode, + table_id_to_watermark_serde, )) } } @@ -435,6 +478,22 @@ impl CompactionCatalogAgent { }) } + pub fn watermark_serde( + &self, + table_id: StateTableId, + ) -> Option<(OrderedRowSerde, OrderedRowSerde, usize)> { + self.table_id_to_watermark_serde .get(&table_id) .unwrap_or_else(|| { + panic!( + "table_id not found {} all_table_ids {:?}", + table_id, + self.table_id_to_watermark_serde.keys() + ) + }) + .clone() + } + pub fn table_id_to_vnode_ref(&self) -> &HashMap<StateTableId, usize> { &self.table_id_to_vnode } @@ -447,6 +506,53 @@ impl CompactionCatalogAgent { pub type CompactionCatalogManagerRef = Arc<CompactionCatalogManager>; pub type CompactionCatalogAgentRef = Arc<CompactionCatalogAgent>; +fn build_watermark_col_serde( + table_catalog: &Table, +) -> Option<(OrderedRowSerde, OrderedRowSerde, usize)> { + match table_catalog.clean_watermark_index_in_pk { + None => { + // Either not a watermark table, or the watermark column is the first pk column (pk-prefix watermark) + None + } + + Some(clean_watermark_index_in_pk) => { + use risingwave_common::types::DataType; + let table_columns: Vec<ColumnDesc> = table_catalog .columns .iter() .map(|col| col.column_desc.as_ref().unwrap().into()) .collect(); + + let pk_data_types: Vec<DataType> = table_catalog .pk .iter() .map(|col_order| { table_columns[col_order.column_index as usize] .data_type .clone() }) .collect(); + + let pk_order_types = table_catalog .pk .iter() .map(|col_order| OrderType::from_protobuf(col_order.get_order_type().unwrap())) .collect_vec(); + + assert_eq!(pk_data_types.len(), pk_order_types.len()); + let pk_serde = OrderedRowSerde::new(pk_data_types, pk_order_types); + let watermark_col_serde = pk_serde .index(clean_watermark_index_in_pk as usize) .into_owned(); + Some(( + pk_serde, + watermark_col_serde, + clean_watermark_index_in_pk as usize, + )) + } + } +} + #[cfg(test)] mod tests { use std::mem; diff --git a/src/storage/src/hummock/compactor/compaction_utils.rs b/src/storage/src/hummock/compactor/compaction_utils.rs index 7b335d64531b2..4f4611014634c 100644 --- a/src/storage/src/hummock/compactor/compaction_utils.rs +++ b/src/storage/src/hummock/compactor/compaction_utils.rs @@ -27,6 +27,7 @@ use risingwave_hummock_sdk::key::FullKey; use risingwave_hummock_sdk::key_range::KeyRange; use risingwave_hummock_sdk::sstable_info::SstableInfo; use risingwave_hummock_sdk::table_stats::TableStatsMap; +use risingwave_hummock_sdk::table_watermark::{TableWatermarks, WatermarkSerdeType}; use risingwave_hummock_sdk::{can_concat, EpochWithGap, KeyComparator}; use risingwave_pb::hummock::compact_task::PbTaskType; use risingwave_pb::hummock::{BloomFilterType, PbLevelType, PbTableSchema}; @@ -39,7 +40,8 @@ use crate::hummock::compactor::{ TtlCompactionFilter, }; use crate::hummock::iterator::{ - Forward, HummockIterator, MergeIterator, SkipWatermarkIterator, UserIterator, + Forward, HummockIterator, MergeIterator, NonPkPrefixSkipWatermarkIterator, + SkipWatermarkIterator, UserIterator, }; use crate::hummock::multi_builder::TableBuilderFactory; use 
crate::hummock::sstable::DEFAULT_ENTRY_SIZE; @@ -322,27 +324,13 @@ pub fn estimate_task_output_capacity(context: CompactorContext, task: &CompactTa pub async fn check_compaction_result( compact_task: &CompactTask, context: CompactorContext, + compaction_catalog_agent_ref: CompactionCatalogAgentRef, ) -> HummockResult<bool> { - let has_ttl = compact_task .table_options .iter() .any(|(_, table_option)| table_option.retention_seconds.is_some_and(|ttl| ttl > 0)); - - let mut compact_table_ids = compact_task .input_ssts .iter() .flat_map(|level| level.table_infos.iter()) .flat_map(|sst| sst.table_ids.clone()) .collect_vec(); - compact_table_ids.sort(); - compact_table_ids.dedup(); - let existing_table_ids: HashSet<u32> = - HashSet::from_iter(compact_task.existing_table_ids.clone()); - let need_clean_state_table = compact_table_ids .iter() .any(|table_id| !existing_table_ids.contains(table_id)); + let mut table_ids_from_input_ssts = compact_task.get_table_ids_from_input_ssts(); + let need_clean_state_table = table_ids_from_input_ssts .any(|table_id| !compact_task.existing_table_ids.contains(&table_id)); // This check method does not consider keys dropped by the compaction filter. - if has_ttl || need_clean_state_table { + if compact_task.contains_ttl() || need_clean_state_table { return Ok(true); } @@ -379,13 +367,25 @@ pub async fn check_compaction_result( } let iter = MergeIterator::for_compactor(table_iters); - let left_iter = UserIterator::new( - SkipWatermarkIterator::from_safe_epoch_watermarks(iter, &compact_task.table_watermarks), - (Bound::Unbounded, Bound::Unbounded), - u64::MAX, - 0, - None, - ); + let (pk_watermarks, non_pk_prefix_watermarks) = split_watermark_from_task(compact_task); + let left_iter = { + let skip_watermark_iter = + SkipWatermarkIterator::from_safe_epoch_watermarks(iter, pk_watermarks.clone()); + + let combine_iter = NonPkPrefixSkipWatermarkIterator::from_safe_epoch_watermarks( + skip_watermark_iter, + non_pk_prefix_watermarks.clone(), + compaction_catalog_agent_ref.clone(), + ); + + UserIterator::new( + combine_iter, + (Bound::Unbounded, Bound::Unbounded), + u64::MAX, + 0, + None, + ) + }; let iter = ConcatSstableIterator::new( compact_task.existing_table_ids.clone(), compact_task.sorted_output_ssts.clone(), @@ -394,13 +394,24 @@ pub async fn check_compaction_result( Arc::new(TaskProgress::default()), context.storage_opts.compactor_iter_max_io_retry_times, ); - let right_iter = UserIterator::new( - SkipWatermarkIterator::from_safe_epoch_watermarks(iter, &compact_task.table_watermarks), - (Bound::Unbounded, Bound::Unbounded), - u64::MAX, - 0, - None, - ); + let right_iter = { + let skip_watermark_iter = + SkipWatermarkIterator::from_safe_epoch_watermarks(iter, pk_watermarks); + + let combine_iter = NonPkPrefixSkipWatermarkIterator::from_safe_epoch_watermarks( + skip_watermark_iter, + non_pk_prefix_watermarks, + compaction_catalog_agent_ref, + ); + + UserIterator::new( + combine_iter, + (Bound::Unbounded, Bound::Unbounded), + u64::MAX, + 0, + None, + ) + }; check_result(left_iter, right_iter).await } @@ -513,36 +524,12 @@ pub fn optimize_by_copy_block(compact_task: &CompactTask, context: &CompactorCon .map(|table_info| table_info.total_key_count) .sum::<u64>(); - let has_tombstone = compact_task .input_ssts .iter() .flat_map(|level| level.table_infos.iter()) .any(|sst| sst.range_tombstone_count > 0); - let has_ttl = compact_task .table_options .iter() .any(|(_, table_option)| table_option.retention_seconds.is_some_and(|ttl| ttl > 0)); - - let 
has_split_sst = compact_task - .input_ssts - .iter() - .flat_map(|level| level.table_infos.iter()) - .any(|sst| sst.sst_id != sst.object_id); - - let compact_table_ids: HashSet = HashSet::from_iter( - compact_task - .input_ssts - .iter() - .flat_map(|level| level.table_infos.iter()) - .flat_map(|sst| sst.table_ids.clone()), - ); - let single_table = compact_table_ids.len() == 1; - + let single_table = compact_task.build_compact_table_ids().len() == 1; context.storage_opts.enable_fast_compaction && all_ssts_are_blocked_filter - && !has_tombstone - && !has_ttl - && !has_split_sst + && !compact_task.contains_range_tombstone() + && !compact_task.contains_ttl() + && !compact_task.contains_split_sst() && single_table && compact_task.target_level > 0 && compact_task.input_ssts.len() == 2 @@ -676,3 +663,23 @@ pub fn calculate_task_parallelism_impl( let parallelism = compaction_size.div_ceil(parallel_compact_size); worker_num.min(parallelism.min(max_sub_compaction as u64) as usize) } + +pub fn split_watermark_from_task( + compact_task: &CompactTask, +) -> ( + BTreeMap, + BTreeMap, +) { + let mut pk_watermarks = BTreeMap::default(); + let mut non_pk_prefix_watermarks = BTreeMap::default(); + + for (table_id, watermark) in &compact_task.table_watermarks { + if let WatermarkSerdeType::PkPrefix = watermark.watermark_type { + pk_watermarks.insert(*table_id, watermark.clone()); + } else { + non_pk_prefix_watermarks.insert(*table_id, watermark.clone()); + } + } + + (pk_watermarks, non_pk_prefix_watermarks) +} diff --git a/src/storage/src/hummock/compactor/compactor_runner.rs b/src/storage/src/hummock/compactor/compactor_runner.rs index 929de0f69f860..f5602bf792f87 100644 --- a/src/storage/src/hummock/compactor/compactor_runner.rs +++ b/src/storage/src/hummock/compactor/compactor_runner.rs @@ -38,6 +38,7 @@ use risingwave_pb::hummock::LevelType; use thiserror_ext::AsReport; use tokio::sync::oneshot::Receiver; +use super::compaction_utils::split_watermark_from_task; use super::iterator::MonitoredCompactorIterator; use super::task_progress::TaskProgress; use super::{CompactionStatistics, TaskConfig}; @@ -53,7 +54,8 @@ use crate::hummock::compactor::{ CompactorContext, }; use crate::hummock::iterator::{ - Forward, HummockIterator, MergeIterator, SkipWatermarkIterator, ValueMeta, + Forward, HummockIterator, MergeIterator, NonPkPrefixSkipWatermarkIterator, + SkipWatermarkIterator, ValueMeta, }; use crate::hummock::multi_builder::{CapacitySplitTableBuilder, TableBuilderFactory}; use crate::hummock::utils::MemoryTracker; @@ -138,7 +140,8 @@ impl CompactorRunner { compaction_catalog_agent_ref: CompactionCatalogAgentRef, task_progress: Arc, ) -> HummockResult { - let iter = self.build_sst_iter(task_progress.clone())?; + let iter = + self.build_sst_iter(task_progress.clone(), compaction_catalog_agent_ref.clone())?; let (ssts, compaction_stat) = self .compactor .compact_key_range( @@ -157,6 +160,7 @@ impl CompactorRunner { fn build_sst_iter( &self, task_progress: Arc, + compaction_catalog_agent_ref: CompactionCatalogAgentRef, ) -> HummockResult> { let compactor_iter_max_io_retry_times = self .compactor @@ -239,13 +243,26 @@ impl CompactorRunner { // The `SkipWatermarkIterator` is used to handle the table watermark state cleaning introduced // in https://github.com/risingwavelabs/risingwave/issues/13148 - Ok(SkipWatermarkIterator::from_safe_epoch_watermarks( - MonitoredCompactorIterator::new( - MergeIterator::for_compactor(table_iters), - task_progress.clone(), - ), - &self.compact_task.table_watermarks, - )) + let 
combine_iter = { + let (pk_watermarks, non_pk_prefix_watermarks) = + split_watermark_from_task(&self.compact_task); + + let skip_watermark_iter = SkipWatermarkIterator::from_safe_epoch_watermarks( + MonitoredCompactorIterator::new( + MergeIterator::for_compactor(table_iters), + task_progress.clone(), + ), + pk_watermarks, + ); + + NonPkPrefixSkipWatermarkIterator::from_safe_epoch_watermarks( + skip_watermark_iter, + non_pk_prefix_watermarks, + compaction_catalog_agent_ref, + ) + }; + + Ok(combine_iter) } } @@ -575,19 +592,7 @@ pub async fn compact( ), Option, ) { - let existing_table_ids: HashSet = - HashSet::from_iter(compact_task.existing_table_ids.clone()); - let compact_table_ids = Vec::from_iter( - compact_task - .input_ssts - .iter() - .flat_map(|level| level.table_infos.iter()) - .flat_map(|sst| sst.table_ids.clone()) - .filter(|table_id| existing_table_ids.contains(table_id)) - .sorted() - .unique(), - ); - + let compact_table_ids = compact_task.build_compact_table_ids(); let compaction_catalog_agent_ref = match compaction_catalog_manager_ref .acquire(compact_table_ids.clone()) .await diff --git a/src/storage/src/hummock/compactor/fast_compactor_runner.rs b/src/storage/src/hummock/compactor/fast_compactor_runner.rs index 5fd1f061d875f..bb7abe94466b1 100644 --- a/src/storage/src/hummock/compactor/fast_compactor_runner.rs +++ b/src/storage/src/hummock/compactor/fast_compactor_runner.rs @@ -32,11 +32,12 @@ use risingwave_hummock_sdk::{can_concat, compact_task_to_string, EpochWithGap, L use crate::compaction_catalog_manager::CompactionCatalogAgentRef; use crate::hummock::block_stream::BlockDataStream; +use crate::hummock::compactor::compaction_utils::split_watermark_from_task; use crate::hummock::compactor::task_progress::TaskProgress; use crate::hummock::compactor::{ CompactionStatistics, Compactor, CompactorContext, RemoteBuilderFactory, TaskConfig, }; -use crate::hummock::iterator::SkipWatermarkState; +use crate::hummock::iterator::{NonPkPrefixSkipWatermarkState, SkipWatermarkState}; use crate::hummock::multi_builder::{CapacitySplitTableBuilder, TableBuilderFactory}; use crate::hummock::sstable_store::SstableStoreRef; use crate::hummock::value::HummockValue; @@ -403,7 +404,7 @@ impl CompactorRunner { context .storage_opts .compactor_concurrent_uploading_sst_count, - compaction_catalog_agent_ref, + compaction_catalog_agent_ref.clone(), ); assert_eq!( task.input_ssts.len(), @@ -425,10 +426,23 @@ impl CompactorRunner { task_progress.clone(), context.storage_opts.compactor_iter_max_io_retry_times, )); - let state = SkipWatermarkState::from_safe_epoch_watermarks(&task.table_watermarks); + + let (pk_watermarks, non_pk_prefix_watermarks) = split_watermark_from_task(&task); + + let state = SkipWatermarkState::from_safe_epoch_watermarks(pk_watermarks); + let non_pk_prefix_state = NonPkPrefixSkipWatermarkState::from_safe_epoch_watermarks( + non_pk_prefix_watermarks, + compaction_catalog_agent_ref, + ); Self { - executor: CompactTaskExecutor::new(sst_builder, task_config, task_progress, state), + executor: CompactTaskExecutor::new( + sst_builder, + task_config, + task_progress, + state, + non_pk_prefix_state, + ), left, right, task_id: task.task_id, @@ -619,9 +633,10 @@ pub struct CompactTaskExecutor { builder: CapacitySplitTableBuilder, task_config: TaskConfig, task_progress: Arc, - state: SkipWatermarkState, + skip_watermark_state: SkipWatermarkState, last_key_is_delete: bool, progress_key_num: u32, + non_pk_prefix_skip_watermark_state: NonPkPrefixSkipWatermarkState, } impl 
CompactTaskExecutor { @@ -629,7 +644,8 @@ impl CompactTaskExecutor { builder: CapacitySplitTableBuilder, task_config: TaskConfig, task_progress: Arc<TaskProgress>, - state: SkipWatermarkState, + skip_watermark_state: SkipWatermarkState, + non_pk_prefix_skip_watermark_state: NonPkPrefixSkipWatermarkState, ) -> Self { Self { builder, @@ -640,8 +656,9 @@ impl CompactTaskExecutor { last_table_id: None, last_table_stats: TableStats::default(), task_progress, - state, + skip_watermark_state, progress_key_num: 0, + non_pk_prefix_skip_watermark_state, } } @@ -677,7 +694,9 @@ impl CompactTaskExecutor { iter: &mut BlockIterator, target_key: FullKey<&[u8]>, ) -> HummockResult<()> { - self.state.reset_watermark(); + self.skip_watermark_state.reset_watermark(); + self.non_pk_prefix_skip_watermark_state.reset_watermark(); + while iter.is_valid() && iter.key().le(&target_key) { let is_new_user_key = !self.last_key.is_empty() && iter.key().user_key != self.last_key.user_key.as_ref(); @@ -702,7 +721,8 @@ impl CompactTaskExecutor { } else if !self.task_config.retain_multiple_version && !is_first_or_new_user_key { drop = true; } - if self.state.has_watermark() && self.state.should_delete(&iter.key()) { + + if self.watermark_should_delete(&iter.key()) { drop = true; self.last_key_is_delete = true; } @@ -749,9 +769,17 @@ impl CompactTaskExecutor { // because it would cause a deleted key to become visible to users again. return false; } - if self.state.has_watermark() && self.state.should_delete(smallest_key) { + + if self.watermark_should_delete(smallest_key) { return false; } + true } + + fn watermark_should_delete(&mut self, key: &FullKey<&[u8]>) -> bool { + (self.skip_watermark_state.has_watermark() && self.skip_watermark_state.should_delete(key)) + || (self.non_pk_prefix_skip_watermark_state.has_watermark() + && self.non_pk_prefix_skip_watermark_state.should_delete(key)) + } } diff --git a/src/storage/src/hummock/compactor/mod.rs b/src/storage/src/hummock/compactor/mod.rs index c8c1b78790e85..9d6300fc91358 100644 --- a/src/storage/src/hummock/compactor/mod.rs +++ b/src/storage/src/hummock/compactor/mod.rs @@ -540,15 +540,22 @@ pub fn start_compactor( let need_check_task = !compact_task.sorted_output_ssts.is_empty() && compact_task.task_status == TaskStatus::Success; if enable_check_compaction_result && need_check_task { - match check_compaction_result(&compact_task, context.clone()) - .await - { + let compact_table_ids = compact_task.build_compact_table_ids(); + match compaction_catalog_manager_ref.acquire(compact_table_ids).await { + Ok(compaction_catalog_agent_ref) => { + match check_compaction_result(&compact_task, context.clone(), compaction_catalog_agent_ref).await + { + Err(e) => { + tracing::warn!(error = %e.as_report(), "Failed to check compaction task {}",compact_task.task_id); + } + Ok(true) => (), + Ok(false) => { + panic!("Failed to pass consistency check for result of compaction task:\n{:?}", compact_task_to_string(&compact_task)); + } + } + }, Err(e) => { - tracing::warn!(error = %e.as_report(), "Failed to check compaction task {}",compact_task.task_id); - } - Ok(true) => (), - Ok(false) => { - panic!("Failed to pass consistency check for result of compaction task:\n{:?}", compact_task_to_string(&compact_task)); + tracing::warn!(error = %e.as_report(), "failed to acquire compaction catalog agent"); } } } @@ -691,7 +698,7 @@ pub fn start_shared_compactor( compact_task, rx, Box::new(shared_compactor_object_id_manager), - compaction_catalog_agent_ref, + compaction_catalog_agent_ref.clone(), ) .await; 
shutdown.lock().unwrap().remove(&task_id); @@ -712,7 +719,7 @@ pub fn start_shared_compactor( let enable_check_compaction_result = context.storage_opts.check_compaction_result; let need_check_task = !compact_task.sorted_output_ssts.is_empty() && compact_task.task_status == TaskStatus::Success; if enable_check_compaction_result && need_check_task { - match check_compaction_result(&compact_task, context.clone()).await { + match check_compaction_result(&compact_task, context.clone(),compaction_catalog_agent_ref).await { Err(e) => { tracing::warn!(error = %e.as_report(), "Failed to check compaction task {}", task_id); }, diff --git a/src/storage/src/hummock/event_handler/uploader/mod.rs b/src/storage/src/hummock/event_handler/uploader/mod.rs index e3ad72f4d6099..d80abce3856ea 100644 --- a/src/storage/src/hummock/event_handler/uploader/mod.rs +++ b/src/storage/src/hummock/event_handler/uploader/mod.rs @@ -34,7 +34,7 @@ use risingwave_common::catalog::TableId; use risingwave_common::metrics::UintGauge; use risingwave_common::must_match; use risingwave_hummock_sdk::table_watermark::{ - TableWatermarks, VnodeWatermark, WatermarkDirection, + TableWatermarks, VnodeWatermark, WatermarkDirection, WatermarkSerdeType, }; use risingwave_hummock_sdk::{HummockEpoch, HummockSstableObjectId, LocalSstableInfo}; use task_manager::{TaskManager, UploadingTaskStatus}; @@ -336,6 +336,7 @@ impl TableUnsyncData { epoch: HummockEpoch, table_watermarks: Vec, direction: WatermarkDirection, + watermark_type: WatermarkSerdeType, ) { if table_watermarks.is_empty() { return; @@ -361,12 +362,17 @@ impl TableUnsyncData { } } match &mut self.table_watermarks { - Some((prev_direction, prev_watermarks)) => { + Some((prev_direction, prev_watermarks, prev_watermark_type)) => { assert_eq!( *prev_direction, direction, "table id {} new watermark direction not match with previous", self.table_id ); + assert_eq!( + *prev_watermark_type, watermark_type, + "table id {} new watermark watermark_type not match with previous", + self.table_id + ); match prev_watermarks.entry(epoch) { Entry::Occupied(mut entry) => { let (prev_watermarks, vnode_bitmap) = entry.get_mut(); @@ -386,6 +392,7 @@ impl TableUnsyncData { self.table_watermarks = Some(( direction, BTreeMap::from_iter([(epoch, (table_watermarks, vnode_bitmap))]), + watermark_type, )); } } @@ -398,20 +405,28 @@ impl UploaderData { table_id: TableId, direction: WatermarkDirection, watermarks: impl Iterator)>, + watermark_type: WatermarkSerdeType, ) { let mut table_watermarks: Option = None; for (epoch, watermarks) in watermarks { match &mut table_watermarks { Some(prev_watermarks) => { + assert_eq!(prev_watermarks.direction, direction); + assert_eq!(prev_watermarks.watermark_type, watermark_type); prev_watermarks.add_new_epoch_watermarks( epoch, Arc::from(watermarks), direction, + watermark_type, ); } None => { - table_watermarks = - Some(TableWatermarks::single_epoch(epoch, watermarks, direction)); + table_watermarks = Some(TableWatermarks::single_epoch( + epoch, + watermarks, + direction, + watermark_type, + )); } } } @@ -628,6 +643,7 @@ struct TableUnsyncData { table_watermarks: Option<( WatermarkDirection, BTreeMap, BitmapBuilder)>, + WatermarkSerdeType, )>, spill_tasks: BTreeMap>, unsync_epochs: BTreeMap, @@ -671,6 +687,7 @@ impl TableUnsyncData { Option<( WatermarkDirection, impl Iterator)>, + WatermarkSerdeType, )>, impl Iterator, BTreeMap, @@ -692,14 +709,11 @@ impl TableUnsyncData { .map(move |(instance_id, data)| (*instance_id, data.sync(epoch))), self.table_watermarks .as_mut() 
- .map(|(direction, watermarks)| { - let watermarks = take_before_epoch(watermarks, epoch); - ( - *direction, - watermarks - .into_iter() - .map(|(epoch, (watermarks, _))| (epoch, watermarks)), - ) + .map(|(direction, watermarks, watermark_type)| { + let watermarks = take_before_epoch(watermarks, epoch) + .into_iter() + .map(|(epoch, (watermarks, _))| (epoch, watermarks)); + (*direction, watermarks, *watermark_type) }), take_before_epoch(&mut self.spill_tasks, epoch) .into_values() @@ -737,7 +751,7 @@ impl TableUnsyncData { self.instance_data .values() .for_each(|instance_data| instance_data.assert_after_epoch(epoch)); - if let Some((_, watermarks)) = &self.table_watermarks + if let Some((_, watermarks, _)) = &self.table_watermarks && let Some((oldest_epoch, _)) = watermarks.first_key_value() { assert_gt!(*oldest_epoch, epoch); @@ -892,8 +906,8 @@ impl UnsyncData { table_data.stopped_next_epoch = Some(next_epoch); } } - if let Some((direction, table_watermarks)) = opts.table_watermarks { - table_data.add_table_watermarks(epoch, table_watermarks, direction); + if let Some((direction, table_watermarks, watermark_type)) = opts.table_watermarks { + table_data.add_table_watermarks(epoch, table_watermarks, direction, watermark_type); } } @@ -1029,12 +1043,13 @@ impl UploaderData { true } }); - if let Some((direction, watermarks)) = table_watermarks { + if let Some((direction, watermarks, watermark_type)) = table_watermarks { Self::add_table_watermarks( &mut all_table_watermarks, *table_id, direction, watermarks, + watermark_type, ); } for task_id in task_ids { diff --git a/src/storage/src/hummock/iterator/skip_watermark.rs b/src/storage/src/hummock/iterator/skip_watermark.rs index a6c7cc4fa49ef..a43a3755c7ebe 100644 --- a/src/storage/src/hummock/iterator/skip_watermark.rs +++ b/src/storage/src/hummock/iterator/skip_watermark.rs @@ -18,6 +18,9 @@ use std::collections::{BTreeMap, VecDeque}; use bytes::Bytes; use risingwave_common::catalog::TableId; use risingwave_common::hash::VirtualNode; +use risingwave_common::row::Row; +use risingwave_common::types::Datum; +use risingwave_common::util::row_serde::OrderedRowSerde; use risingwave_hummock_sdk::compaction_group::hummock_version_ext::safe_epoch_read_table_watermarks_impl; use risingwave_hummock_sdk::key::FullKey; use risingwave_hummock_sdk::table_stats::{add_table_stats_map, TableStats, TableStatsMap}; @@ -25,6 +28,7 @@ use risingwave_hummock_sdk::table_watermark::{ ReadTableWatermark, TableWatermarks, WatermarkDirection, }; +use crate::compaction_catalog_manager::CompactionCatalogAgentRef; use crate::hummock::iterator::{Forward, HummockIterator, ValueMeta}; use crate::hummock::value::HummockValue; use crate::hummock::HummockResult; @@ -54,7 +58,7 @@ impl> SkipWatermarkIterator { pub fn from_safe_epoch_watermarks( inner: I, - safe_epoch_watermarks: &BTreeMap, + safe_epoch_watermarks: BTreeMap, ) -> Self { Self { inner, @@ -186,7 +190,7 @@ impl SkipWatermarkState { } pub fn from_safe_epoch_watermarks( - safe_epoch_watermarks: &BTreeMap, + safe_epoch_watermarks: BTreeMap, ) -> Self { let watermarks = safe_epoch_read_table_watermarks_impl(safe_epoch_watermarks); Self::new(watermarks) @@ -206,7 +210,7 @@ impl SkipWatermarkState { return false; } Ordering::Equal => { - return direction.filter_by_watermark(inner_key, watermark); + return direction.key_filter_by_watermark(inner_key, watermark); } Ordering::Greater => { // The current key has advanced over the watermark. 
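The next hunks add the datum-level counterpart of `SkipWatermarkState`. The two filters are meant to be stacked, as `compactor_runner.rs` above already does; roughly as in this sketch, which assumes the diff's APIs and that `split_watermark_from_task` is importable from the compactor's `compaction_utils` module:

```rust
use risingwave_hummock_sdk::compact_task::CompactTask;
use risingwave_storage::compaction_catalog_manager::CompactionCatalogAgentRef;
use risingwave_storage::hummock::iterator::{
    Forward, HummockIterator, NonPkPrefixSkipWatermarkIterator, SkipWatermarkIterator,
};

// Stack both watermark filters over a raw compaction iterator, mirroring
// `build_sst_iter` in compactor_runner.rs (monitoring and error handling omitted).
// `split_watermark_from_task` is the helper added in compaction_utils.rs above.
fn stack_watermark_filters<I: HummockIterator<Direction = Forward>>(
    iter: I,
    compact_task: &CompactTask,
    agent: CompactionCatalogAgentRef,
) -> NonPkPrefixSkipWatermarkIterator<SkipWatermarkIterator<I>> {
    // Partition the task's watermarks by serde type.
    let (pk_watermarks, non_pk_prefix_watermarks) = split_watermark_from_task(compact_task);
    // Byte-level pass: pk-prefix watermarks compare serialized key bytes directly.
    let iter = SkipWatermarkIterator::from_safe_epoch_watermarks(iter, pk_watermarks);
    // Datum-level pass: non-pk-prefix watermarks deserialize the pk through the
    // catalog agent's per-table serde and compare the watermark column as a datum.
    NonPkPrefixSkipWatermarkIterator::from_safe_epoch_watermarks(
        iter,
        non_pk_prefix_watermarks,
        agent,
    )
}
```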
@@ -303,24 +307,341 @@ impl SkipWatermarkState { } } +pub struct NonPkPrefixSkipWatermarkState { + watermarks: BTreeMap, + remain_watermarks: VecDeque<(TableId, VirtualNode, WatermarkDirection, Datum)>, + compaction_catalog_agent_ref: CompactionCatalogAgentRef, + + last_serde: Option<(OrderedRowSerde, OrderedRowSerde, usize)>, +} + +impl NonPkPrefixSkipWatermarkState { + pub fn new( + watermarks: BTreeMap, + compaction_catalog_agent_ref: CompactionCatalogAgentRef, + ) -> Self { + Self { + remain_watermarks: VecDeque::new(), + watermarks, + compaction_catalog_agent_ref, + last_serde: None, + } + } + + pub fn from_safe_epoch_watermarks( + safe_epoch_watermarks: BTreeMap, + compaction_catalog_agent_ref: CompactionCatalogAgentRef, + ) -> Self { + let watermarks = safe_epoch_read_table_watermarks_impl(safe_epoch_watermarks); + Self::new(watermarks, compaction_catalog_agent_ref) + } + + #[inline(always)] + pub fn has_watermark(&self) -> bool { + !self.remain_watermarks.is_empty() + } + + pub fn should_delete(&mut self, key: &FullKey<&[u8]>) -> bool { + if let Some((table_id, vnode, direction, watermark)) = self.remain_watermarks.front() { + let key_table_id = key.user_key.table_id; + let (key_vnode, inner_key) = key.user_key.table_key.split_vnode(); + match (&key_table_id, &key_vnode).cmp(&(table_id, vnode)) { + Ordering::Less => { + return false; + } + Ordering::Equal => { + let (pk_prefix_serde, watermark_col_serde, watermark_col_idx_in_pk) = + match self.last_serde.as_ref() { + Some(serde) => serde, + None => { + self.last_serde = self + .compaction_catalog_agent_ref + .watermark_serde(table_id.table_id()); + self.last_serde.as_ref().unwrap() + } + }; + let row = pk_prefix_serde + .deserialize(inner_key) + .unwrap_or_else(|_| { + panic!("Failed to deserialize pk_prefix inner_key {:?} serde data_types {:?} order_types {:?}", inner_key, pk_prefix_serde.get_data_types(), pk_prefix_serde.get_order_types()); + }); + let watermark_col_in_pk = row.datum_at(*watermark_col_idx_in_pk); + return direction.datum_filter_by_watermark( + watermark_col_in_pk, + watermark, + watermark_col_serde.get_order_types()[0], + ); + } + Ordering::Greater => { + // The current key has advanced over the watermark. + // We may advance the watermark before advancing the key. 
+ return self.advance_watermark(key); + } + } + } + false + } + + pub fn reset_watermark(&mut self) { + self.remain_watermarks = self + .watermarks + .iter() + .flat_map(|(table_id, read_watermarks)| { + let watermark_serde = self.compaction_catalog_agent_ref.watermark_serde(table_id.table_id()).map(|(_pk_serde, watermark_serde, _watermark_col_idx_in_pk)| watermark_serde); + + read_watermarks + .vnode_watermarks + .iter() + .map(move |(vnode, watermarks)| { + ( + *table_id, + *vnode, + read_watermarks.direction, + { + let watermark_serde = watermark_serde.as_ref().unwrap(); + let row = watermark_serde + .deserialize(watermarks).unwrap_or_else(|_| { + panic!("Failed to deserialize watermark {:?} serde data_types {:?} order_types {:?}", watermarks, watermark_serde.get_data_types(), watermark_serde.get_order_types()); + }); + row[0].clone() + }, + ) + }) + }) + .collect(); + } + + fn advance_watermark(&mut self, key: &FullKey<&[u8]>) -> bool { + let key_table_id = key.user_key.table_id; + let (key_vnode, inner_key) = key.user_key.table_key.split_vnode(); + while let Some((table_id, vnode, direction, watermark)) = self.remain_watermarks.front() { + match (table_id, vnode).cmp(&(&key_table_id, &key_vnode)) { + Ordering::Less => { + self.remain_watermarks.pop_front(); + continue; + } + Ordering::Equal => { + self.last_serde = self + .compaction_catalog_agent_ref + .watermark_serde(table_id.table_id()); + let (pk_prefix_serde, watermark_col_serde, watermark_col_idx_in_pk) = + match self.last_serde.as_ref() { + Some(serde) => serde, + None => { + self.last_serde = self + .compaction_catalog_agent_ref + .watermark_serde(table_id.table_id()); + self.last_serde.as_ref().unwrap() + } + }; + + let row = pk_prefix_serde + .deserialize(inner_key) + .unwrap_or_else(|_| { + panic!("Failed to deserialize pk_prefix inner_key {:?} serde data_types {:?} order_types {:?}", inner_key, pk_prefix_serde.get_data_types(), pk_prefix_serde.get_order_types()); + }); + let watermark_col_in_pk = row.datum_at(*watermark_col_idx_in_pk); + + return direction.datum_filter_by_watermark( + watermark_col_in_pk, + watermark, + watermark_col_serde.get_order_types()[0], + ); + } + Ordering::Greater => { + return false; + } + } + } + false + } +} + +pub struct NonPkPrefixSkipWatermarkIterator { + inner: I, + state: NonPkPrefixSkipWatermarkState, + /// The stats of skipped key-value pairs for each table. + skipped_entry_table_stats: TableStatsMap, + /// The id of table currently undergoing processing. + last_table_id: Option, + /// The stats of table currently undergoing processing. 
+    last_table_stats: TableStats,
+}
+
+impl<I: HummockIterator<Direction = Forward>> NonPkPrefixSkipWatermarkIterator<I> {
+    pub fn new(
+        inner: I,
+        watermarks: BTreeMap<TableId, ReadTableWatermark>,
+        compaction_catalog_agent_ref: CompactionCatalogAgentRef,
+    ) -> Self {
+        Self {
+            inner,
+            state: NonPkPrefixSkipWatermarkState::new(watermarks, compaction_catalog_agent_ref),
+            skipped_entry_table_stats: TableStatsMap::default(),
+            last_table_id: None,
+            last_table_stats: TableStats::default(),
+        }
+    }
+
+    pub fn from_safe_epoch_watermarks(
+        inner: I,
+        safe_epoch_watermarks: BTreeMap<u32, TableWatermarks>,
+        compaction_catalog_agent_ref: CompactionCatalogAgentRef,
+    ) -> Self {
+        Self {
+            inner,
+            state: NonPkPrefixSkipWatermarkState::from_safe_epoch_watermarks(
+                safe_epoch_watermarks,
+                compaction_catalog_agent_ref,
+            ),
+            skipped_entry_table_stats: TableStatsMap::default(),
+            last_table_id: None,
+            last_table_stats: TableStats::default(),
+        }
+    }
+
+    fn reset_watermark(&mut self) {
+        self.state.reset_watermark();
+    }
+
+    fn reset_skipped_entry_table_stats(&mut self) {
+        self.skipped_entry_table_stats = TableStatsMap::default();
+        self.last_table_id = None;
+        self.last_table_stats = TableStats::default();
+    }
+
+    /// Advance the key until the iterator becomes invalid or the current key is no
+    /// longer filtered by the latest watermark.
+    /// Callers should ensure that the first remaining watermark has been advanced to
+    /// the current key.
+    async fn advance_key_and_watermark(&mut self) -> HummockResult<()> {
+        // Advance the key and the watermark in an interleaved manner until neither
+        // changes any more.
+        while self.inner.is_valid() {
+            if !self.state.should_delete(&self.inner.key()) {
+                break;
+            }
+
+            let table_id = self.inner.key().user_key.table_id.table_id;
+
+            if self
+                .last_table_id
+                .map_or(true, |last_table_id| last_table_id != table_id)
+            {
+                self.add_last_table_stats();
+                self.last_table_id = Some(table_id);
+            }
+            self.last_table_stats.total_key_count -= 1;
+            self.last_table_stats.total_key_size -= self.inner.key().encoded_len() as i64;
+            self.last_table_stats.total_value_size -= self.inner.value().encoded_len() as i64;
+
+            self.inner.next().await?;
+        }
+        self.add_last_table_stats();
+        Ok(())
+    }
+
+    fn add_last_table_stats(&mut self) {
+        let Some(last_table_id) = self.last_table_id.take() else {
+            return;
+        };
+        let delta = std::mem::take(&mut self.last_table_stats);
+        let e = self
+            .skipped_entry_table_stats
+            .entry(last_table_id)
+            .or_default();
+        e.total_key_count += delta.total_key_count;
+        e.total_key_size += delta.total_key_size;
+        e.total_value_size += delta.total_value_size;
+    }
+}
+
+impl<I: HummockIterator<Direction = Forward>> HummockIterator
+    for NonPkPrefixSkipWatermarkIterator<I>
+{
+    type Direction = Forward;
+
+    async fn next(&mut self) -> HummockResult<()> {
+        self.inner.next().await?;
+        // Check whether there is any remaining watermark and return early to avoid
+        // calling the async `advance_key_and_watermark`; benchmarks showed a
+        // performance regression without this early return.
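+        // `has_watermark` is only an emptiness check on the remaining-watermark
+        // queue, so it is cheap enough to run on every `next` call.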
+ if self.state.has_watermark() { + self.advance_key_and_watermark().await?; + } + Ok(()) + } + + fn key(&self) -> FullKey<&[u8]> { + self.inner.key() + } + + fn value(&self) -> HummockValue<&[u8]> { + self.inner.value() + } + + fn is_valid(&self) -> bool { + self.inner.is_valid() + } + + async fn rewind(&mut self) -> HummockResult<()> { + self.reset_watermark(); + self.reset_skipped_entry_table_stats(); + self.inner.rewind().await?; + self.advance_key_and_watermark().await?; + Ok(()) + } + + async fn seek<'a>(&'a mut self, key: FullKey<&'a [u8]>) -> HummockResult<()> { + self.reset_watermark(); + self.reset_skipped_entry_table_stats(); + self.inner.seek(key).await?; + self.advance_key_and_watermark().await?; + Ok(()) + } + + fn collect_local_statistic(&self, stats: &mut StoreLocalStatistic) { + add_table_stats_map( + &mut stats.skipped_by_watermark_table_stats, + &self.skipped_entry_table_stats, + ); + self.inner.collect_local_statistic(stats) + } + + fn value_meta(&self) -> ValueMeta { + self.inner.value_meta() + } +} + #[cfg(test)] mod tests { - use std::collections::BTreeMap; + use std::collections::{BTreeMap, HashMap}; use std::iter::{empty, once}; + use std::sync::Arc; use bytes::Bytes; use itertools::Itertools; use risingwave_common::catalog::TableId; use risingwave_common::hash::VirtualNode; + use risingwave_common::row::{OwnedRow, RowExt}; + use risingwave_common::types::{DataType, ScalarImpl}; use risingwave_common::util::epoch::test_epoch; + use risingwave_common::util::row_serde::OrderedRowSerde; + use risingwave_common::util::sort_util::OrderType; use risingwave_hummock_sdk::key::{gen_key_from_str, FullKey, TableKey, UserKey}; use risingwave_hummock_sdk::table_watermark::{ReadTableWatermark, WatermarkDirection}; use risingwave_hummock_sdk::EpochWithGap; - use crate::hummock::iterator::{HummockIterator, SkipWatermarkIterator}; + use crate::compaction_catalog_manager::{ + CompactionCatalogAgent, FilterKeyExtractorImpl, FullKeyFilterKeyExtractor, + }; + use crate::hummock::iterator::{ + HummockIterator, MergeIterator, NonPkPrefixSkipWatermarkIterator, SkipWatermarkIterator, + }; use crate::hummock::shared_buffer::shared_buffer_batch::{ SharedBufferBatch, SharedBufferValue, }; + use crate::row_serde::row_serde_util::{serialize_pk, serialize_pk_with_vnode}; const EPOCH: u64 = test_epoch(1); const TABLE_ID: TableId = TableId::new(233); @@ -366,12 +687,13 @@ mod tests { fn build_batch( pairs: impl Iterator, SharedBufferValue)>, + table_id: TableId, ) -> Option { let pairs: Vec<_> = pairs.collect(); if pairs.is_empty() { None } else { - Some(SharedBufferBatch::for_test(pairs, EPOCH, TABLE_ID)) + Some(SharedBufferBatch::for_test(pairs, EPOCH, table_id)) } } @@ -383,7 +705,7 @@ mod tests { if let Some(watermark) = table_watermarks.vnode_watermarks.get(&key.vnode_part()) { !table_watermarks .direction - .filter_by_watermark(key.key_part(), watermark) + .key_filter_by_watermark(key.key_part(), watermark) } else { true } @@ -398,7 +720,8 @@ mod tests { watermarks: impl IntoIterator, direction: WatermarkDirection, ) { - let test_index = [(0, 2), (0, 3), (0, 4), (1, 1), (1, 3), (4, 2), (8, 1)]; + let test_index: [(usize, usize); 7] = + [(0, 2), (0, 3), (0, 4), (1, 1), (1, 3), (4, 2), (8, 1)]; let items = test_index .iter() .map(|(vnode, key_index)| gen_key_value(*vnode, *key_index)) @@ -417,12 +740,13 @@ mod tests { }; let gen_iters = || { - let batch = build_batch(filter_with_watermarks( - items.clone().into_iter(), - read_watermark.clone(), - )); + let batch = build_batch( + 
filter_with_watermarks(items.clone().into_iter(), read_watermark.clone()), + TABLE_ID, + ); + let iter = SkipWatermarkIterator::new( - build_batch(items.clone().into_iter()) + build_batch(items.clone().into_iter(), TABLE_ID) .unwrap() .into_forward_iter(), BTreeMap::from_iter(once((TABLE_ID, read_watermark.clone()))), @@ -491,4 +815,646 @@ mod tests { async fn test_advance_multi_vnode() { test_watermark(vec![(1, 2), (8, 0)], WatermarkDirection::Ascending).await; } + + #[tokio::test] + async fn test_non_pk_prefix_watermark() { + fn gen_key_value( + vnode: usize, + col_0: i32, + col_1: i32, + col_2: i32, + col_3: i32, + pk_serde: &OrderedRowSerde, + pk_indices: &[usize], + ) -> (TableKey, SharedBufferValue) { + let r = OwnedRow::new(vec![ + Some(ScalarImpl::Int32(col_0)), + Some(ScalarImpl::Int32(col_1)), + Some(ScalarImpl::Int32(col_2)), // watermark column + Some(ScalarImpl::Int32(col_3)), + ]); + + let pk = r.project(pk_indices); + + let k1 = serialize_pk_with_vnode(pk, pk_serde, VirtualNode::from_index(vnode)); + let v1 = SharedBufferValue::Insert(Bytes::copy_from_slice( + format!("{}-value-{}", vnode, col_2).as_bytes(), + )); + (k1, v1) + } + + let watermark_direction = WatermarkDirection::Ascending; + + let watermark_col_serde = + OrderedRowSerde::new(vec![DataType::Int32], vec![OrderType::ascending()]); + let pk_serde = OrderedRowSerde::new( + vec![DataType::Int32, DataType::Int32, DataType::Int32], + vec![ + OrderType::ascending(), + OrderType::ascending(), + OrderType::ascending(), + ], + ); + + let pk_indices = vec![0, 2, 3]; + let watermark_col_idx_in_pk = 1; + + { + // test single vnode + let shared_buffer_batch = { + let mut kv_pairs = (0..10) + .map(|i| gen_key_value(0, i, 0, i, i, &pk_serde, &pk_indices)) + .collect_vec(); + kv_pairs.sort_by(|(k1, _), (k2, _)| k1.cmp(k2)); + build_batch(kv_pairs.into_iter(), TABLE_ID) + } + .unwrap(); + + { + // empty read watermark + let read_watermark = ReadTableWatermark { + direction: watermark_direction, + vnode_watermarks: BTreeMap::default(), + }; + + let compaction_catalog_agent_ref = + CompactionCatalogAgent::for_test(vec![TABLE_ID.into()]); + + let mut iter = NonPkPrefixSkipWatermarkIterator::new( + shared_buffer_batch.clone().into_forward_iter(), + BTreeMap::from_iter(once((TABLE_ID, read_watermark.clone()))), + compaction_catalog_agent_ref, + ); + + iter.rewind().await.unwrap(); + assert!(iter.is_valid()); + for i in 0..10 { + let (k, _v) = gen_key_value(0, i, 0, i, i, &pk_serde, &pk_indices); + assert_eq!(iter.key().user_key.table_key.as_ref(), k.as_ref()); + iter.next().await.unwrap(); + } + assert!(!iter.is_valid()); + } + + { + // test watermark + let watermark = { + let r1 = OwnedRow::new(vec![Some(ScalarImpl::Int32(5))]); + serialize_pk(r1, &watermark_col_serde) + }; + + let read_watermark = ReadTableWatermark { + direction: watermark_direction, + vnode_watermarks: BTreeMap::from_iter(once(( + VirtualNode::from_index(0), + watermark.clone(), + ))), + }; + + let full_key_filter_key_extractor = + FilterKeyExtractorImpl::FullKey(FullKeyFilterKeyExtractor); + + let table_id_to_vnode = + HashMap::from_iter(once((TABLE_ID.table_id(), VirtualNode::COUNT_FOR_TEST))); + + let table_id_to_watermark_serde = HashMap::from_iter(once(( + TABLE_ID.table_id(), + Some(( + pk_serde.clone(), + watermark_col_serde.clone(), + watermark_col_idx_in_pk, + )), + ))); + + let compaction_catalog_agent_ref = Arc::new(CompactionCatalogAgent::new( + full_key_filter_key_extractor, + table_id_to_vnode, + table_id_to_watermark_serde, + )); + + let mut iter = 
NonPkPrefixSkipWatermarkIterator::new( + shared_buffer_batch.clone().into_forward_iter(), + BTreeMap::from_iter(once((TABLE_ID, read_watermark.clone()))), + compaction_catalog_agent_ref, + ); + + iter.rewind().await.unwrap(); + assert!(iter.is_valid()); + for i in 5..10 { + let (k, _v) = gen_key_value(0, i, 0, i, i, &pk_serde, &pk_indices); + assert_eq!(iter.key().user_key.table_key.as_ref(), k.as_ref()); + iter.next().await.unwrap(); + } + assert!(!iter.is_valid()); + } + } + + { + // test multi vnode + let shared_buffer_batch = { + let mut kv_pairs = (0..10_i32) + .map(|i| gen_key_value(i as usize % 2, 10 - i, 0, i, i, &pk_serde, &pk_indices)) + .collect_vec(); + + kv_pairs.sort_by(|(k1, _), (k2, _)| k1.cmp(k2)); + build_batch(kv_pairs.into_iter(), TABLE_ID) + }; + + { + // test watermark + let watermark = { + let r1 = OwnedRow::new(vec![Some(ScalarImpl::Int32(5))]); + serialize_pk(r1, &watermark_col_serde) + }; + + let read_watermark = ReadTableWatermark { + direction: watermark_direction, + vnode_watermarks: BTreeMap::from_iter( + (0..2).map(|i| (VirtualNode::from_index(i), watermark.clone())), + ), + }; + + let full_key_filter_key_extractor = + FilterKeyExtractorImpl::FullKey(FullKeyFilterKeyExtractor); + + let table_id_to_vnode = + HashMap::from_iter(once((TABLE_ID.table_id(), VirtualNode::COUNT_FOR_TEST))); + + let table_id_to_watermark_serde = HashMap::from_iter(once(( + TABLE_ID.table_id(), + Some(( + pk_serde.clone(), + watermark_col_serde.clone(), + watermark_col_idx_in_pk, + )), + ))); + + let compaction_catalog_agent_ref = Arc::new(CompactionCatalogAgent::new( + full_key_filter_key_extractor, + table_id_to_vnode, + table_id_to_watermark_serde, + )); + + let mut iter = NonPkPrefixSkipWatermarkIterator::new( + shared_buffer_batch.clone().unwrap().into_forward_iter(), + BTreeMap::from_iter(once((TABLE_ID, read_watermark.clone()))), + compaction_catalog_agent_ref, + ); + + iter.rewind().await.unwrap(); + assert!(iter.is_valid()); + let mut kv_pairs = (5..10_i32) + .map(|i| { + let (k, v) = + gen_key_value(i as usize % 2, 10 - i, 0, i, i, &pk_serde, &pk_indices); + (k, v) + }) + .collect_vec(); + kv_pairs.sort_by(|(k1, _), (k2, _)| k1.cmp(k2)); + let mut index = 0; + while iter.is_valid() { + assert!(kv_pairs[index].0.as_ref() == iter.key().user_key.table_key.as_ref()); + iter.next().await.unwrap(); + index += 1; + } + } + + { + // test watermark + let watermark = { + let r1 = OwnedRow::new(vec![Some(ScalarImpl::Int32(5))]); + serialize_pk(r1, &watermark_col_serde) + }; + + let read_watermark = ReadTableWatermark { + direction: watermark_direction, + vnode_watermarks: BTreeMap::from_iter( + (0..2).map(|i| (VirtualNode::from_index(i), watermark.clone())), + ), + }; + + let full_key_filter_key_extractor = + FilterKeyExtractorImpl::FullKey(FullKeyFilterKeyExtractor); + + let table_id_to_vnode = + HashMap::from_iter(once((TABLE_ID.table_id(), VirtualNode::COUNT_FOR_TEST))); + + let table_id_to_watermark_serde = HashMap::from_iter(once(( + TABLE_ID.table_id(), + Some(( + pk_serde.clone(), + watermark_col_serde.clone(), + watermark_col_idx_in_pk, + )), + ))); + + let compaction_catalog_agent_ref = Arc::new(CompactionCatalogAgent::new( + full_key_filter_key_extractor, + table_id_to_vnode, + table_id_to_watermark_serde, + )); + + let mut iter = NonPkPrefixSkipWatermarkIterator::new( + shared_buffer_batch.clone().unwrap().into_forward_iter(), + BTreeMap::from_iter(once((TABLE_ID, read_watermark.clone()))), + compaction_catalog_agent_ref, + ); + + iter.rewind().await.unwrap(); + 
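+ // Ascending watermark 5 on vnodes 0 and 1: only rows whose watermark column is >= 5 survive the rewind.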
assert!(iter.is_valid()); + let mut kv_pairs = (5..10_i32) + .map(|i| { + let (k, v) = + gen_key_value(i as usize % 2, 10 - i, 0, i, i, &pk_serde, &pk_indices); + (k, v) + }) + .collect_vec(); + kv_pairs.sort_by(|(k1, _), (k2, _)| k1.cmp(k2)); + let mut index = 0; + while iter.is_valid() { + assert!(kv_pairs[index].0.as_ref() == iter.key().user_key.table_key.as_ref()); + iter.next().await.unwrap(); + index += 1; + } + } + + { + // test watermark Descending + let watermark = { + let r1 = OwnedRow::new(vec![Some(ScalarImpl::Int32(5))]); + serialize_pk(r1, &watermark_col_serde) + }; + + let read_watermark = ReadTableWatermark { + direction: WatermarkDirection::Descending, + vnode_watermarks: BTreeMap::from_iter( + (0..2).map(|i| (VirtualNode::from_index(i), watermark.clone())), + ), + }; + + let full_key_filter_key_extractor = + FilterKeyExtractorImpl::FullKey(FullKeyFilterKeyExtractor); + + let table_id_to_vnode = + HashMap::from_iter(once((TABLE_ID.table_id(), VirtualNode::COUNT_FOR_TEST))); + + let table_id_to_watermark_serde = HashMap::from_iter(once(( + TABLE_ID.table_id(), + Some(( + pk_serde.clone(), + watermark_col_serde.clone(), + watermark_col_idx_in_pk, + )), + ))); + + let compaction_catalog_agent_ref = Arc::new(CompactionCatalogAgent::new( + full_key_filter_key_extractor, + table_id_to_vnode, + table_id_to_watermark_serde, + )); + + let mut iter = NonPkPrefixSkipWatermarkIterator::new( + shared_buffer_batch.clone().unwrap().into_forward_iter(), + BTreeMap::from_iter(once((TABLE_ID, read_watermark.clone()))), + compaction_catalog_agent_ref, + ); + + iter.rewind().await.unwrap(); + assert!(iter.is_valid()); + let mut kv_pairs = (0..=5_i32) + .map(|i| { + let (k, v) = + gen_key_value(i as usize % 2, 10 - i, 0, i, i, &pk_serde, &pk_indices); + (k, v) + }) + .collect_vec(); + kv_pairs.sort_by(|(k1, _), (k2, _)| k1.cmp(k2)); + let mut index = 0; + while iter.is_valid() { + assert!(kv_pairs[index].0.as_ref() == iter.key().user_key.table_key.as_ref()); + iter.next().await.unwrap(); + index += 1; + } + } + } + } + + #[tokio::test] + async fn test_mix_watermark() { + fn gen_key_value( + vnode: usize, + col_0: i32, + col_1: i32, + col_2: i32, + col_3: i32, + pk_serde: &OrderedRowSerde, + pk_indices: &[usize], + ) -> (TableKey, SharedBufferValue) { + let r = OwnedRow::new(vec![ + Some(ScalarImpl::Int32(col_0)), + Some(ScalarImpl::Int32(col_1)), + Some(ScalarImpl::Int32(col_2)), // watermark column + Some(ScalarImpl::Int32(col_3)), + ]); + + let pk = r.project(pk_indices); + + let k1 = serialize_pk_with_vnode(pk, pk_serde, VirtualNode::from_index(vnode)); + let v1 = SharedBufferValue::Insert(Bytes::copy_from_slice( + format!("{}-value-{}-{}-{}-{}", vnode, col_0, col_1, col_2, col_3).as_bytes(), + )); + + (k1, v1) + } + + let watermark_col_serde = + OrderedRowSerde::new(vec![DataType::Int32], vec![OrderType::ascending()]); + let t1_pk_serde = OrderedRowSerde::new( + vec![DataType::Int32, DataType::Int32, DataType::Int32], + vec![ + OrderType::ascending(), + OrderType::ascending(), + OrderType::ascending(), + ], + ); + + let t1_pk_indices = vec![0, 2, 3]; + let t1_watermark_col_idx_in_pk = 1; + + let t2_pk_indices = vec![0, 1, 2]; + + let t2_pk_serde = OrderedRowSerde::new( + vec![DataType::Int32, DataType::Int32, DataType::Int32], + vec![ + OrderType::ascending(), + OrderType::ascending(), + OrderType::ascending(), + ], + ); + + let t1_id = TABLE_ID; + let t2_id = TableId::from(t1_id.table_id() + 1); + + let t1_shared_buffer_batch = { + let mut kv_pairs = (0..10_i32) + .map(|i| { + 
gen_key_value( + i as usize % 2, + 10 - i, + 0, + i, + i, + &t1_pk_serde, + &t1_pk_indices, + ) + }) + .collect_vec(); + + kv_pairs.sort_by(|(k1, _), (k2, _)| k1.cmp(k2)); + build_batch(kv_pairs.into_iter(), t1_id).unwrap() + }; + + let t2_shared_buffer_batch = { + let mut kv_pairs = (0..10_i32) + .map(|i| { + gen_key_value( + i as usize % 2, + 10 - i, + 0, + 0, + 0, + &t2_pk_serde, + &t2_pk_indices, + ) + }) + .collect_vec(); + + kv_pairs.sort_by(|(k1, _), (k2, _)| k1.cmp(k2)); + build_batch(kv_pairs.into_iter(), t2_id).unwrap() + }; + + let t1_watermark = { + let r1 = OwnedRow::new(vec![Some(ScalarImpl::Int32(5))]); + serialize_pk(r1, &watermark_col_serde) + }; + + let t1_read_watermark = ReadTableWatermark { + direction: WatermarkDirection::Ascending, + vnode_watermarks: BTreeMap::from_iter( + (0..2).map(|i| (VirtualNode::from_index(i), t1_watermark.clone())), + ), + }; + + let t2_watermark = { + let r1 = OwnedRow::new(vec![Some(ScalarImpl::Int32(5))]); + serialize_pk(r1, &watermark_col_serde) + }; + + let t2_read_watermark = ReadTableWatermark { + direction: WatermarkDirection::Ascending, + vnode_watermarks: BTreeMap::from_iter( + (0..2).map(|i| (VirtualNode::from_index(i), t2_watermark.clone())), + ), + }; + + { + // test non pk prefix watermark + let t1_iter = t1_shared_buffer_batch.clone().into_forward_iter(); + let t2_iter = t2_shared_buffer_batch.clone().into_forward_iter(); + let iter_vec = vec![t1_iter, t2_iter]; + let merge_iter = MergeIterator::new(iter_vec); + + let full_key_filter_key_extractor = + FilterKeyExtractorImpl::FullKey(FullKeyFilterKeyExtractor); + + let table_id_to_vnode = + HashMap::from_iter(once((TABLE_ID.table_id(), VirtualNode::COUNT_FOR_TEST))); + + let table_id_to_watermark_serde = HashMap::from_iter(once(( + t1_id.table_id(), + Some(( + t1_pk_serde.clone(), + watermark_col_serde.clone(), + t1_watermark_col_idx_in_pk, + )), + ))); + + let compaction_catalog_agent_ref = Arc::new(CompactionCatalogAgent::new( + full_key_filter_key_extractor, + table_id_to_vnode, + table_id_to_watermark_serde, + )); + + let mut iter = NonPkPrefixSkipWatermarkIterator::new( + merge_iter, + BTreeMap::from_iter(once((TABLE_ID, t1_read_watermark.clone()))), + compaction_catalog_agent_ref, + ); + + iter.rewind().await.unwrap(); + assert!(iter.is_valid()); + let mut t1_kv_pairs = (5..10_i32) + .map(|i| { + let (k, v) = gen_key_value( + i as usize % 2, + 10 - i, + 0, + i, + i, + &t1_pk_serde, + &t1_pk_indices, + ); + (k, v) + }) + .collect_vec(); + + t1_kv_pairs.sort_by(|(k1, _), (k2, _)| k1.cmp(k2)); + + let mut t2_kv_pairs = (0..10_i32) + .map(|i| { + gen_key_value( + i as usize % 2, + 10 - i, + 0, + 0, + 0, + &t2_pk_serde, + &t2_pk_indices, + ) + }) + .collect_vec(); + + t2_kv_pairs.sort_by(|(k1, _), (k2, _)| k1.cmp(k2)); + let mut index = 0; + for _ in 0..t1_kv_pairs.len() { + assert!(t1_kv_pairs[index].0.as_ref() == iter.key().user_key.table_key.as_ref()); + iter.next().await.unwrap(); + index += 1; + } + + assert!(iter.is_valid()); + assert_eq!(t1_kv_pairs.len(), index); + + index = 0; + for _ in 0..t2_kv_pairs.len() { + assert!(t2_kv_pairs[index].0.as_ref() == iter.key().user_key.table_key.as_ref()); + iter.next().await.unwrap(); + index += 1; + } + + assert!(!iter.is_valid()); + assert_eq!(t2_kv_pairs.len(), index); + } + + { + let t1_iter = t1_shared_buffer_batch.clone().into_forward_iter(); + let t2_iter = t2_shared_buffer_batch.clone().into_forward_iter(); + let iter_vec = vec![t1_iter, t2_iter]; + let merge_iter = MergeIterator::new(iter_vec); + + let 
full_key_filter_key_extractor = + FilterKeyExtractorImpl::FullKey(FullKeyFilterKeyExtractor); + + let table_id_to_vnode = HashMap::from_iter( + vec![ + (t1_id.table_id(), VirtualNode::COUNT_FOR_TEST), + (t2_id.table_id(), VirtualNode::COUNT_FOR_TEST), + ] + .into_iter(), + ); + + let table_id_to_watermark_serde = HashMap::from_iter( + vec![ + ( + t1_id.table_id(), + Some(( + t1_pk_serde.clone(), + watermark_col_serde.clone(), + t1_watermark_col_idx_in_pk, + )), + ), + (t2_id.table_id(), None), + ] + .into_iter(), + ); + + let compaction_catalog_agent_ref = Arc::new(CompactionCatalogAgent::new( + full_key_filter_key_extractor, + table_id_to_vnode, + table_id_to_watermark_serde, + )); + + let non_pk_prefix_iter = NonPkPrefixSkipWatermarkIterator::new( + merge_iter, + BTreeMap::from_iter(once((t1_id, t1_read_watermark.clone()))), + compaction_catalog_agent_ref.clone(), + ); + + let mut mix_iter = SkipWatermarkIterator::new( + non_pk_prefix_iter, + BTreeMap::from_iter(once((t2_id, t2_read_watermark.clone()))), + ); + + mix_iter.rewind().await.unwrap(); + assert!(mix_iter.is_valid()); + + let mut t1_kv_pairs = (5..10_i32) + .map(|i| { + let (k, v) = gen_key_value( + i as usize % 2, + 10 - i, + 0, + i, + i, + &t1_pk_serde, + &t1_pk_indices, + ); + (k, v) + }) + .collect_vec(); + + t1_kv_pairs.sort_by(|(k1, _), (k2, _)| k1.cmp(k2)); + + let mut t2_kv_pairs = (0..=5_i32) + .map(|i| { + gen_key_value( + i as usize % 2, + 10 - i, + 0, + 0, + 0, + &t2_pk_serde, + &t2_pk_indices, + ) + }) + .collect_vec(); + + t2_kv_pairs.sort_by(|(k1, _), (k2, _)| k1.cmp(k2)); + + let mut index = 0; + for _ in 0..t1_kv_pairs.len() { + assert!( + t1_kv_pairs[index].0.as_ref() == mix_iter.key().user_key.table_key.as_ref() + ); + mix_iter.next().await.unwrap(); + index += 1; + } + + assert!(mix_iter.is_valid()); + assert_eq!(t1_kv_pairs.len(), index); + + index = 0; + + for _ in 0..t2_kv_pairs.len() { + assert!( + t2_kv_pairs[index].0.as_ref() == mix_iter.key().user_key.table_key.as_ref() + ); + mix_iter.next().await.unwrap(); + index += 1; + } + + assert!(!mix_iter.is_valid()); + assert_eq!(t2_kv_pairs.len(), index); + } + } } diff --git a/src/storage/src/hummock/sstable/builder.rs b/src/storage/src/hummock/sstable/builder.rs index 1662bc2795fac..95a57fcd6695c 100644 --- a/src/storage/src/hummock/sstable/builder.rs +++ b/src/storage/src/hummock/sstable/builder.rs @@ -17,6 +17,7 @@ use std::sync::Arc; use std::time::SystemTime; use bytes::{Bytes, BytesMut}; +use risingwave_common::util::row_serde::OrderedRowSerde; use risingwave_hummock_sdk::compaction_group::StateTableId; use risingwave_hummock_sdk::key::{user_key, FullKey, MAX_KEY_LEN}; use risingwave_hummock_sdk::key_range::KeyRange; @@ -136,10 +137,15 @@ impl SstableBuilder { writer: W, options: SstableBuilderOptions, table_id_to_vnode: HashMap, + table_id_to_watermark_serde: HashMap< + u32, + Option<(OrderedRowSerde, OrderedRowSerde, usize)>, + >, ) -> Self { let compaction_catalog_agent_ref = Arc::new(CompactionCatalogAgent::new( FilterKeyExtractorImpl::FullKey(FullKeyFilterKeyExtractor), table_id_to_vnode, + table_id_to_watermark_serde, )); Self::new( @@ -737,7 +743,14 @@ pub(super) mod tests { }; let table_id_to_vnode = HashMap::from_iter(vec![(0, VirtualNode::COUNT_FOR_TEST)]); - let b = SstableBuilder::for_test(0, mock_sst_writer(&opt), opt, table_id_to_vnode); + let table_id_to_watermark_serde = HashMap::from_iter(vec![(0, None)]); + let b = SstableBuilder::for_test( + 0, + mock_sst_writer(&opt), + opt, + table_id_to_vnode, + table_id_to_watermark_serde, + ); 
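+ // (table 0 maps to `None`: no non-pk-prefix watermark serde is configured in these tests)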
b.finish().await.unwrap(); } @@ -747,7 +760,14 @@ pub(super) mod tests { let opt = default_builder_opt_for_test(); let table_id_to_vnode = HashMap::from_iter(vec![(0, VirtualNode::COUNT_FOR_TEST)]); - let mut b = SstableBuilder::for_test(0, mock_sst_writer(&opt), opt, table_id_to_vnode); + let table_id_to_watermark_serde = HashMap::from_iter(vec![(0, None)]); + let mut b = SstableBuilder::for_test( + 0, + mock_sst_writer(&opt), + opt, + table_id_to_vnode, + table_id_to_watermark_serde, + ); for i in 0..TEST_KEYS_COUNT { b.add_for_test( @@ -787,6 +807,7 @@ pub(super) mod tests { // build remote table let sstable_store = mock_sstable_store().await; let table_id_to_vnode = HashMap::from_iter(vec![(0, VirtualNode::COUNT_FOR_TEST)]); + let table_id_to_watermark_serde = HashMap::from_iter(vec![(0, None)]); let sst_info = gen_test_sstable_impl::, F>( opts, 0, @@ -794,6 +815,7 @@ pub(super) mod tests { sstable_store.clone(), CachePolicy::NotFill, table_id_to_vnode, + table_id_to_watermark_serde, ) .await; let table = sstable_store @@ -850,10 +872,12 @@ pub(super) mod tests { (2, VirtualNode::COUNT_FOR_TEST), (3, VirtualNode::COUNT_FOR_TEST), ]); + let table_id_to_watermark_serde = HashMap::from_iter(vec![(1, None), (2, None), (3, None)]); let compaction_catalog_agent_ref = Arc::new(CompactionCatalogAgent::new( FilterKeyExtractorImpl::Multi(filter), table_id_to_vnode, + table_id_to_watermark_serde, )); let mut builder = SstableBuilder::new( diff --git a/src/storage/src/hummock/sstable/multi_builder.rs b/src/storage/src/hummock/sstable/multi_builder.rs index 7c23683b25f22..86b6b4cae4aea 100644 --- a/src/storage/src/hummock/sstable/multi_builder.rs +++ b/src/storage/src/hummock/sstable/multi_builder.rs @@ -385,7 +385,15 @@ impl TableBuilderFactory for LocalTableBuilderFactory { TableId::default().table_id(), VirtualNode::COUNT_FOR_TEST, )]); - let builder = SstableBuilder::for_test(id, writer, self.options.clone(), table_id_to_vnode); + let table_id_to_watermark_serde = + HashMap::from_iter(vec![(TableId::default().table_id(), None)]); + let builder = SstableBuilder::for_test( + id, + writer, + self.options.clone(), + table_id_to_vnode, + table_id_to_watermark_serde, + ); Ok(builder) } @@ -588,9 +596,12 @@ mod tests { BTreeMap::from([(1_u32, 4_u32), (2_u32, 4_u32), (3_u32, 4_u32)]); let table_id_to_vnode = HashMap::from_iter(vec![(1, 64), (2, 128), (3, 256)]); + let table_id_to_watermark_serde = + HashMap::from_iter(vec![(1, None), (2, None), (3, None)]); let compaction_catalog_agent_ref = Arc::new(CompactionCatalogAgent::new( FilterKeyExtractorImpl::FullKey(FullKeyFilterKeyExtractor), table_id_to_vnode, + table_id_to_watermark_serde, )); let mut builder = CapacitySplitTableBuilder::new( diff --git a/src/storage/src/hummock/sstable/xor_filter.rs b/src/storage/src/hummock/sstable/xor_filter.rs index 6461e505ccc41..5948c4f272846 100644 --- a/src/storage/src/hummock/sstable/xor_filter.rs +++ b/src/storage/src/hummock/sstable/xor_filter.rs @@ -482,9 +482,11 @@ mod tests { .create_sst_writer(object_id, writer_opts); let table_id_to_vnode = HashMap::from_iter(vec![(0, VirtualNode::COUNT_FOR_TEST)]); + let table_id_to_watermark_serde = HashMap::from_iter(vec![(0, None)]); let compaction_catalog_agent_ref = Arc::new(CompactionCatalogAgent::new( FilterKeyExtractorImpl::FullKey(FullKeyFilterKeyExtractor), table_id_to_vnode, + table_id_to_watermark_serde, )); let mut builder = SstableBuilder::new( diff --git a/src/storage/src/hummock/store/local_hummock_storage.rs 
b/src/storage/src/hummock/store/local_hummock_storage.rs index 69952838e9829..1f51e3e1c0b2d 100644 --- a/src/storage/src/hummock/store/local_hummock_storage.rs +++ b/src/storage/src/hummock/store/local_hummock_storage.rs @@ -25,6 +25,7 @@ use risingwave_common::hash::VirtualNode; use risingwave_common::util::epoch::MAX_SPILL_TIMES; use risingwave_hummock_sdk::key::{is_empty_key_range, vnode_range, TableKey, TableKeyRange}; use risingwave_hummock_sdk::sstable_info::SstableInfo; +use risingwave_hummock_sdk::table_watermark::WatermarkSerdeType; use risingwave_hummock_sdk::EpochWithGap; use tracing::{warn, Instrument}; @@ -504,7 +505,11 @@ impl LocalStateStore for LocalHummockStorage { next_epoch, prev_epoch ); - if let Some((direction, watermarks)) = &mut opts.table_watermarks { + + // only update the PkPrefix watermark for read + if let Some((direction, watermarks, WatermarkSerdeType::PkPrefix)) = + &mut opts.table_watermarks + { let mut read_version = self.read_version.write(); read_version.filter_regress_watermarks(watermarks); if !watermarks.is_empty() { @@ -512,9 +517,11 @@ impl LocalStateStore for LocalHummockStorage { direction: *direction, epoch: prev_epoch, vnode_watermarks: watermarks.clone(), + watermark_type: WatermarkSerdeType::PkPrefix, }); } } + if !self.is_replicated && self .event_sender diff --git a/src/storage/src/hummock/store/version.rs b/src/storage/src/hummock/store/version.rs index 8e0c7a589b203..a47850c53e224 100644 --- a/src/storage/src/hummock/store/version.rs +++ b/src/storage/src/hummock/store/version.rs @@ -33,7 +33,7 @@ use risingwave_hummock_sdk::key::{ use risingwave_hummock_sdk::key_range::KeyRangeCommon; use risingwave_hummock_sdk::sstable_info::SstableInfo; use risingwave_hummock_sdk::table_watermark::{ - TableWatermarksIndex, VnodeWatermark, WatermarkDirection, + TableWatermarksIndex, VnodeWatermark, WatermarkDirection, WatermarkSerdeType, }; use risingwave_hummock_sdk::{EpochWithGap, HummockEpoch, LocalSstableInfo}; use risingwave_pb::hummock::LevelType; @@ -141,6 +141,7 @@ pub enum VersionUpdate { direction: WatermarkDirection, epoch: HummockEpoch, vnode_watermarks: Vec, + watermark_type: WatermarkSerdeType, }, } @@ -248,19 +249,24 @@ impl HummockReadVersion { Self { table_id, instance_id, - table_watermarks: committed_version.table_watermarks.get(&table_id).map( - |table_watermarks| { - TableWatermarksIndex::new_committed( - table_watermarks.clone(), - committed_version - .state_table_info - .info() - .get(&table_id) - .expect("should exist") - .committed_epoch, - ) - }, - ), + table_watermarks: { + match committed_version.table_watermarks.get(&table_id) { + Some(table_watermarks) => match table_watermarks.watermark_type { + WatermarkSerdeType::PkPrefix => Some(TableWatermarksIndex::new_committed( + table_watermarks.clone(), + committed_version + .state_table_info + .info() + .get(&table_id) + .expect("should exist") + .committed_epoch, + )), + + WatermarkSerdeType::NonPkPrefix => None, /* do not fill the non-pk prefix watermark to index */ + }, + None => None, + } + }, staging: StagingVersion { imm: VecDeque::default(), sst: VecDeque::default(), @@ -406,7 +412,9 @@ impl HummockReadVersion { direction, epoch, vnode_watermarks, + watermark_type, } => { + assert_eq!(WatermarkSerdeType::PkPrefix, watermark_type); if let Some(watermark_index) = &mut self.table_watermarks { watermark_index.add_epoch_watermark( epoch, diff --git a/src/storage/src/hummock/test_utils.rs b/src/storage/src/hummock/test_utils.rs index 0459c16574865..b65fcee59061e 100644 --- 
a/src/storage/src/hummock/test_utils.rs +++ b/src/storage/src/hummock/test_utils.rs @@ -25,6 +25,7 @@ use risingwave_common::catalog::TableId; use risingwave_common::config::EvictionConfig; use risingwave_common::hash::VirtualNode; use risingwave_common::util::epoch::test_epoch; +use risingwave_common::util::row_serde::OrderedRowSerde; use risingwave_hummock_sdk::compaction_group::StateTableId; use risingwave_hummock_sdk::key::{FullKey, TableKey, UserKey}; use risingwave_hummock_sdk::key_range::KeyRange; @@ -162,7 +163,14 @@ pub async fn gen_test_sstable_data( TableId::default().table_id(), VirtualNode::COUNT_FOR_TEST, )]); - let mut b = SstableBuilder::for_test(0, mock_sst_writer(&opts), opts, table_id_to_vnode); + let table_id_to_watermark_serde = HashMap::from_iter(vec![(0, None)]); + let mut b = SstableBuilder::for_test( + 0, + mock_sst_writer(&opts), + opts, + table_id_to_vnode, + table_id_to_watermark_serde, + ); for (key, value) in kv_iter { b.add_for_test(key.to_ref(), value.as_slice()) .await @@ -233,6 +241,7 @@ pub async fn gen_test_sstable_impl + Clone + Default + Eq, F: Fil sstable_store: SstableStoreRef, policy: CachePolicy, table_id_to_vnode: HashMap, + table_id_to_watermark_serde: HashMap>, ) -> SstableInfo { let writer_opts = SstableWriterOptions { capacity_hint: None, @@ -246,6 +255,7 @@ pub async fn gen_test_sstable_impl + Clone + Default + Eq, F: Fil let compaction_catalog_agent_ref = Arc::new(CompactionCatalogAgent::new( FilterKeyExtractorImpl::FullKey(FullKeyFilterKeyExtractor), table_id_to_vnode, + table_id_to_watermark_serde, )); let mut b = SstableBuilder::<_, F>::new( @@ -283,6 +293,10 @@ pub async fn gen_test_sstable + Clone + Default + Eq>( TableId::default().table_id(), VirtualNode::COUNT_FOR_TEST, )]); + + let table_id_to_watermark_serde = + HashMap::from_iter(vec![(TableId::default().table_id(), None)]); + let sst_info = gen_test_sstable_impl::<_, Xor16FilterBuilder>( opts, object_id, @@ -290,6 +304,7 @@ pub async fn gen_test_sstable + Clone + Default + Eq>( sstable_store.clone(), CachePolicy::NotFill, table_id_to_vnode, + table_id_to_watermark_serde, ) .await; @@ -310,9 +325,10 @@ pub async fn gen_test_sstable_with_table_ids + Clone + Default + table_ids: Vec, ) -> (TableHolder, SstableInfo) { let table_id_to_vnode = table_ids - .into_iter() - .map(|table_id| (table_id, VirtualNode::COUNT_FOR_TEST)) + .iter() + .map(|table_id| (*table_id, VirtualNode::COUNT_FOR_TEST)) .collect(); + let table_id_to_watermark_serde = table_ids.iter().map(|table_id| (*table_id, None)).collect(); let sst_info = gen_test_sstable_impl::<_, Xor16FilterBuilder>( opts, @@ -321,6 +337,7 @@ pub async fn gen_test_sstable_with_table_ids + Clone + Default + sstable_store.clone(), CachePolicy::NotFill, table_id_to_vnode, + table_id_to_watermark_serde, ) .await; @@ -344,6 +361,10 @@ pub async fn gen_test_sstable_info + Clone + Default + Eq>( TableId::default().table_id(), VirtualNode::COUNT_FOR_TEST, )]); + + let table_id_to_watermark_serde = + HashMap::from_iter(vec![(TableId::default().table_id(), None)]); + gen_test_sstable_impl::<_, BlockedXor16FilterBuilder>( opts, object_id, @@ -351,6 +372,7 @@ pub async fn gen_test_sstable_info + Clone + Default + Eq>( sstable_store, CachePolicy::NotFill, table_id_to_vnode, + table_id_to_watermark_serde, ) .await } @@ -366,6 +388,10 @@ pub async fn gen_test_sstable_with_range_tombstone( TableId::default().table_id(), VirtualNode::COUNT_FOR_TEST, )]); + + let table_id_to_watermark_serde = + HashMap::from_iter(vec![(TableId::default().table_id(), 
None)]); + gen_test_sstable_impl::<_, Xor16FilterBuilder>( opts, object_id, @@ -373,6 +399,7 @@ pub async fn gen_test_sstable_with_range_tombstone( sstable_store.clone(), CachePolicy::Fill(CacheHint::Normal), table_id_to_vnode, + table_id_to_watermark_serde, ) .await } diff --git a/src/storage/src/memory.rs b/src/storage/src/memory.rs index bd388fc47222b..cc5812ce56d9e 100644 --- a/src/storage/src/memory.rs +++ b/src/storage/src/memory.rs @@ -971,7 +971,7 @@ impl LocalStateStore for RangeKvLocalStateStore { next_epoch, prev_epoch ); - if let Some((direction, watermarks)) = opts.table_watermarks { + if let Some((direction, watermarks, _watermark_type)) = opts.table_watermarks { let delete_ranges = watermarks .iter() .flat_map(|vnode_watermark| { diff --git a/src/storage/src/store.rs b/src/storage/src/store.rs index 531ca629d4d65..69cc850b8c314 100644 --- a/src/storage/src/store.rs +++ b/src/storage/src/store.rs @@ -29,7 +29,9 @@ use risingwave_common::catalog::{TableId, TableOption}; use risingwave_common::hash::VirtualNode; use risingwave_common::util::epoch::{Epoch, EpochPair}; use risingwave_hummock_sdk::key::{FullKey, TableKey, TableKeyRange}; -use risingwave_hummock_sdk::table_watermark::{VnodeWatermark, WatermarkDirection}; +use risingwave_hummock_sdk::table_watermark::{ + VnodeWatermark, WatermarkDirection, WatermarkSerdeType, +}; use risingwave_hummock_sdk::HummockReadEpoch; use risingwave_hummock_trace::{ TracedInitOptions, TracedNewLocalOptions, TracedOpConsistencyLevel, TracedPrefetchOptions, @@ -774,25 +776,31 @@ impl From for InitOptions { #[derive(Clone, Debug)] pub struct SealCurrentEpochOptions { - pub table_watermarks: Option<(WatermarkDirection, Vec)>, + pub table_watermarks: Option<(WatermarkDirection, Vec, WatermarkSerdeType)>, pub switch_op_consistency_level: Option, } impl From for TracedSealCurrentEpochOptions { fn from(value: SealCurrentEpochOptions) -> Self { TracedSealCurrentEpochOptions { - table_watermarks: value.table_watermarks.map(|(direction, watermarks)| { - ( - direction == WatermarkDirection::Ascending, - watermarks - .into_iter() - .map(|watermark| { - let pb_watermark = PbVnodeWatermark::from(watermark); - Message::encode_to_vec(&pb_watermark) - }) - .collect(), - ) - }), + table_watermarks: value.table_watermarks.map( + |(direction, watermarks, watermark_type)| { + ( + direction == WatermarkDirection::Ascending, + watermarks + .into_iter() + .map(|watermark| { + let pb_watermark = PbVnodeWatermark::from(watermark); + Message::encode_to_vec(&pb_watermark) + }) + .collect(), + match watermark_type { + WatermarkSerdeType::NonPkPrefix => true, + WatermarkSerdeType::PkPrefix => false, + }, + ) + }, + ), switch_op_consistency_level: value .switch_op_consistency_level .map(|level| matches!(level, OpConsistencyLevel::ConsistentOldValue { .. 
})), @@ -803,23 +811,30 @@ impl From for TracedSealCurrentEpochOptions { impl From for SealCurrentEpochOptions { fn from(value: TracedSealCurrentEpochOptions) -> SealCurrentEpochOptions { SealCurrentEpochOptions { - table_watermarks: value.table_watermarks.map(|(is_ascending, watermarks)| { - ( - if is_ascending { - WatermarkDirection::Ascending - } else { - WatermarkDirection::Descending - }, - watermarks - .into_iter() - .map(|serialized_watermark| { - Message::decode(serialized_watermark.as_slice()) - .map(|pb: PbVnodeWatermark| VnodeWatermark::from(pb)) - .expect("should not failed") - }) - .collect(), - ) - }), + table_watermarks: value.table_watermarks.map( + |(is_ascending, watermarks, is_non_pk_prefix)| { + ( + if is_ascending { + WatermarkDirection::Ascending + } else { + WatermarkDirection::Descending + }, + watermarks + .into_iter() + .map(|serialized_watermark| { + Message::decode(serialized_watermark.as_slice()) + .map(|pb: PbVnodeWatermark| VnodeWatermark::from(pb)) + .expect("should not failed") + }) + .collect(), + if is_non_pk_prefix { + WatermarkSerdeType::NonPkPrefix + } else { + WatermarkSerdeType::PkPrefix + }, + ) + }, + ), switch_op_consistency_level: value.switch_op_consistency_level.map(|enable| { if enable { OpConsistencyLevel::ConsistentOldValue { diff --git a/src/stream/src/common/log_store_impl/kv_log_store/writer.rs b/src/stream/src/common/log_store_impl/kv_log_store/writer.rs index 4a0b354d322f3..a69cc854a6582 100644 --- a/src/stream/src/common/log_store_impl/kv_log_store/writer.rs +++ b/src/stream/src/common/log_store_impl/kv_log_store/writer.rs @@ -23,7 +23,9 @@ use risingwave_common::hash::VnodeBitmapExt; use risingwave_common::util::epoch::EpochPair; use risingwave_common_estimate_size::EstimateSize; use risingwave_connector::sink::log_store::{LogStoreResult, LogWriter}; -use risingwave_hummock_sdk::table_watermark::{VnodeWatermark, WatermarkDirection}; +use risingwave_hummock_sdk::table_watermark::{ + VnodeWatermark, WatermarkDirection, WatermarkSerdeType, +}; use risingwave_storage::store::{InitOptions, LocalStateStore, SealCurrentEpochOptions}; use tokio::sync::watch; @@ -179,7 +181,11 @@ impl LogWriter for KvLogStoreWriter { self.state_store.seal_current_epoch( next_epoch, SealCurrentEpochOptions { - table_watermarks: Some((WatermarkDirection::Ascending, watermark)), + table_watermarks: Some(( + WatermarkDirection::Ascending, + watermark, + WatermarkSerdeType::PkPrefix, + )), switch_op_consistency_level: None, }, ); diff --git a/src/stream/src/common/table/state_table.rs b/src/stream/src/common/table/state_table.rs index 5e0b924548b10..26dca0b3b2234 100644 --- a/src/stream/src/common/table/state_table.rs +++ b/src/stream/src/common/table/state_table.rs @@ -42,7 +42,9 @@ use risingwave_hummock_sdk::key::{ end_bound_of_prefix, prefixed_range_with_vnode, start_bound_of_excluded_prefix, CopyFromSlice, TableKey, TableKeyRange, }; -use risingwave_hummock_sdk::table_watermark::{VnodeWatermark, WatermarkDirection}; +use risingwave_hummock_sdk::table_watermark::{ + VnodeWatermark, WatermarkDirection, WatermarkSerdeType, +}; use risingwave_hummock_sdk::HummockReadEpoch; use risingwave_pb::catalog::Table; use risingwave_storage::error::{ErrorKind, StorageError, StorageResult}; @@ -158,6 +160,8 @@ pub struct StateTableInner< output_indices: Vec, op_consistency_level: StateTableOpConsistencyLevel, + + clean_watermark_index_in_pk: Option, } /// `StateTable` will use `BasicSerde` as default @@ -441,23 +445,28 @@ where ); // Restore persisted table watermark. 
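+ // For a non-pk-prefix watermark, the committed watermark below is deserialized with the serde of the single watermark column in the pk, selected via `clean_watermark_index_in_pk`, rather than always the first pk column.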
- let prefix_deser = if pk_indices.is_empty() { + let watermark_serde = if pk_indices.is_empty() { None } else { - Some(pk_serde.prefix(1)) + match table_catalog.clean_watermark_index_in_pk { + None => Some(pk_serde.index(0)), + Some(clean_watermark_index_in_pk) => { + Some(pk_serde.index(clean_watermark_index_in_pk as usize)) + } + } }; let max_watermark_of_vnodes = distribution .vnodes() .iter_vnodes() .filter_map(|vnode| local_state_store.get_table_watermark(vnode)) .max(); - let committed_watermark = if let Some(deser) = prefix_deser + let committed_watermark = if let Some(deser) = watermark_serde && let Some(max_watermark) = max_watermark_of_vnodes { - let deserialized = deser - .deserialize(&max_watermark) - .ok() - .and_then(|row| row[0].clone()); + let deserialized = deser.deserialize(&max_watermark).ok().and_then(|row| { + assert!(row.len() == 1); + row[0].clone() + }); if deserialized.is_none() { tracing::error!( vnodes = ?distribution.vnodes(), @@ -523,6 +532,7 @@ where output_indices, i2o_mapping, op_consistency_level: state_table_op_consistency_level, + clean_watermark_index_in_pk: table_catalog.clean_watermark_index_in_pk, } } @@ -1035,7 +1045,8 @@ where for entry in merged_stream.take(self.watermark_cache.capacity()) { let keyed_row = entry?; let pk = self.pk_serde.deserialize(keyed_row.key())?; - if !pk.is_null_at(0) { + // watermark column should be part of the pk + if !pk.is_null_at(self.clean_watermark_index_in_pk.unwrap_or(0) as usize) { pks.push(pk); } } @@ -1058,16 +1069,31 @@ where } /// Commit pending watermark and return vnode bitmap-watermark pairs to seal. - fn commit_pending_watermark(&mut self) -> Option<(WatermarkDirection, Vec)> { + fn commit_pending_watermark( + &mut self, + ) -> Option<(WatermarkDirection, Vec, WatermarkSerdeType)> { let watermark = self.pending_watermark.take(); watermark.as_ref().inspect(|watermark| { trace!(table_id = %self.table_id, watermark = ?watermark, "state cleaning"); }); - let prefix_serializer = if self.pk_indices().is_empty() { + let watermark_serializer = if self.pk_indices().is_empty() { None } else { - Some(self.pk_serde.prefix(1)) + match self.clean_watermark_index_in_pk { + None => Some(self.pk_serde.index(0)), + Some(clean_watermark_index_in_pk) => { + Some(self.pk_serde.index(clean_watermark_index_in_pk as usize)) + } + } + }; + + let watermark_type = match self.clean_watermark_index_in_pk { + None => WatermarkSerdeType::PkPrefix, + Some(clean_watermark_index_in_pk) => match clean_watermark_index_in_pk { + 0 => WatermarkSerdeType::PkPrefix, + _ => WatermarkSerdeType::NonPkPrefix, + }, }; let should_clean_watermark = match watermark { @@ -1095,31 +1121,34 @@ where let watermark_suffix = watermark.as_ref().map(|watermark| { serialize_pk( row::once(Some(watermark.clone())), - prefix_serializer.as_ref().unwrap(), + watermark_serializer.as_ref().unwrap(), ) }); - let mut seal_watermark: Option<(WatermarkDirection, VnodeWatermark)> = None; + let mut seal_watermark: Option<(WatermarkDirection, VnodeWatermark, WatermarkSerdeType)> = + None; // Compute Delete Ranges if should_clean_watermark && let Some(watermark_suffix) = watermark_suffix { trace!(table_id = %self.table_id, watermark = ?watermark_suffix, vnodes = ?{ self.vnodes().iter_vnodes().collect_vec() }, "delete range"); - if prefix_serializer + + let order_type = watermark_serializer .as_ref() .unwrap() .get_order_types() - .first() - .unwrap() - .is_ascending() - { + .get(0) + .unwrap(); + + if order_type.is_ascending() { seal_watermark = Some(( 
WatermarkDirection::Ascending, VnodeWatermark::new( self.vnodes().clone(), Bytes::copy_from_slice(watermark_suffix.as_ref()), ), + watermark_type, )); } else { seal_watermark = Some(( @@ -1128,6 +1157,7 @@ where self.vnodes().clone(), Bytes::copy_from_slice(watermark_suffix.as_ref()), ), + watermark_type, )); } } @@ -1144,7 +1174,9 @@ where self.watermark_cache.clear(); } - seal_watermark.map(|(direction, watermark)| (direction, vec![watermark])) + seal_watermark.map(|(direction, watermark, is_non_pk_prefix)| { + (direction, vec![watermark], is_non_pk_prefix) + }) } pub async fn try_flush(&mut self) -> StreamExecutorResult<()> { diff --git a/src/stream/src/common/table/test_state_table.rs b/src/stream/src/common/table/test_state_table.rs index df64d18354b33..17b9d43e9c3a4 100644 --- a/src/stream/src/common/table/test_state_table.rs +++ b/src/stream/src/common/table/test_state_table.rs @@ -20,7 +20,7 @@ use risingwave_common::array::{Op, StreamChunk}; use risingwave_common::bitmap::Bitmap; use risingwave_common::catalog::{ColumnDesc, ColumnId, TableId}; use risingwave_common::row::{self, OwnedRow}; -use risingwave_common::types::{DataType, Scalar, Timestamptz}; +use risingwave_common::types::{DataType, Scalar, ScalarImpl, Timestamptz}; use risingwave_common::util::epoch::{test_epoch, EpochPair}; use risingwave_common::util::sort_util::OrderType; use risingwave_common::util::value_encoding::BasicSerde; @@ -2097,3 +2097,133 @@ async fn test_replicated_state_table_replication() { ); } } + +#[tokio::test] +async fn test_non_pk_prefix_watermark_read() { + // Define the base table to replicate + const TEST_TABLE_ID: TableId = TableId { table_id: 233 }; + let order_types = vec![OrderType::ascending(), OrderType::ascending()]; + let column_ids = [ColumnId::from(0), ColumnId::from(1), ColumnId::from(2)]; + let column_descs = vec![ + ColumnDesc::unnamed(column_ids[0], DataType::Int32), + ColumnDesc::unnamed(column_ids[1], DataType::Int32), + ColumnDesc::unnamed(column_ids[2], DataType::Int32), + ]; + let pk_indices = vec![0_usize, 1_usize]; + let read_prefix_len_hint = 1; + let mut table = gen_pbtable( + TEST_TABLE_ID, + column_descs, + order_types, + pk_indices, + read_prefix_len_hint, + ); + + // non-pk-prefix watermark + let watermark_col_idx = 1; + table.watermark_indices = vec![watermark_col_idx]; + table.clean_watermark_index_in_pk = Some(1); + let test_env = prepare_hummock_test_env().await; + test_env.register_table(table.clone()).await; + + // Create the base state table + let mut state_table: crate::common::table::state_table::StateTableInner = + StateTable::from_table_catalog_inconsistent_op(&table, test_env.storage.clone(), None) + .await; + + let mut epoch = EpochPair::new_test_epoch(test_epoch(1)); + test_env + .storage + .start_epoch(epoch.curr, HashSet::from_iter([TEST_TABLE_ID])); + state_table.init_epoch(epoch).await.unwrap(); + + // Insert first record into base state table + let r1 = OwnedRow::new(vec![ + Some(0_i32.into()), + Some(1_i32.into()), + Some(1_i32.into()), + ]); + state_table.insert(r1.clone()); + + let r2 = OwnedRow::new(vec![ + Some(0_i32.into()), + Some(2_i32.into()), + Some(2_i32.into()), + ]); + state_table.insert(r2.clone()); + + let r3 = OwnedRow::new(vec![ + Some(0_i32.into()), + Some(3_i32.into()), + Some(3_i32.into()), + ]); + state_table.insert(r3.clone()); + + epoch.inc_for_test(); + test_env + .storage + .start_epoch(epoch.curr, HashSet::from_iter([TEST_TABLE_ID])); + state_table.commit(epoch).await.unwrap(); + test_env.commit_epoch(epoch.prev).await; + + 
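+ // r1, r2 and r3 are committed and should now be visible to reads.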
{ + // test read + let item_1 = state_table + .get_row(OwnedRow::new(vec![Some(0_i32.into()), Some(1_i32.into())])) + .await + .unwrap() + .unwrap(); + assert_eq!(r1, item_1); + + let item_2 = state_table + .get_row(OwnedRow::new(vec![Some(0_i32.into()), Some(2_i32.into())])) + .await + .unwrap() + .unwrap(); + + assert_eq!(r2, item_2); + + let item_3 = state_table + .get_row(OwnedRow::new(vec![Some(0_i32.into()), Some(3_i32.into())])) + .await + .unwrap() + .unwrap(); + + assert_eq!(r3, item_3); + } + + { + // update watermark + let watermark = ScalarImpl::Int32(1); + state_table.update_watermark(watermark); + + epoch.inc_for_test(); + test_env + .storage + .start_epoch(epoch.curr, HashSet::from_iter([TEST_TABLE_ID])); + state_table.commit(epoch).await.unwrap(); + test_env.commit_epoch(epoch.prev).await; + + // do not rewrite key-range or filter data for non-pk-prefix watermark + let item_1 = state_table + .get_row(OwnedRow::new(vec![Some(0_i32.into()), Some(1_i32.into())])) + .await + .unwrap() + .unwrap(); + assert_eq!(r1, item_1); + + let item_2 = state_table + .get_row(OwnedRow::new(vec![Some(0_i32.into()), Some(2_i32.into())])) + .await + .unwrap() + .unwrap(); + assert_eq!(r2, item_2); + + let item_3 = state_table + .get_row(OwnedRow::new(vec![Some(0_i32.into()), Some(3_i32.into())])) + .await + .unwrap() + .unwrap(); + assert_eq!(r3, item_3); + } +}
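To make the serde plumbing above concrete, here is a minimal sketch (not part of the patch; `watermark_serde_for_pk` is a hypothetical helper) of how a table whose pk is `(col_0, col_2, col_3)` with watermark column `col_2` yields the single-column watermark serde via `OrderedRowSerde::index`, matching the schema used in `test_non_pk_prefix_watermark`:

```rust
use risingwave_common::types::DataType;
use risingwave_common::util::row_serde::OrderedRowSerde;
use risingwave_common::util::sort_util::OrderType;

fn watermark_serde_for_pk() {
    // pk columns: (col_0, col_2, col_3); col_2 is the watermark column, so
    // `clean_watermark_index_in_pk` is 1 and the watermark is non-pk-prefix.
    let pk_serde = OrderedRowSerde::new(
        vec![DataType::Int32, DataType::Int32, DataType::Int32],
        vec![
            OrderType::ascending(),
            OrderType::ascending(),
            OrderType::ascending(),
        ],
    );

    // `index(1)` projects the pk serde down to the single watermark column,
    // the same way `commit_pending_watermark` builds `watermark_serializer`.
    let watermark_serde = pk_serde.index(1);
    assert_eq!(watermark_serde.get_order_types().len(), 1);
}
```

Any index other than 0 here is what makes the watermark `WatermarkSerdeType::NonPkPrefix`, which is exactly how `commit_pending_watermark` computes `watermark_type` from `clean_watermark_index_in_pk`.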