Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(storage): support non_pk_prefix_watermark state cleaning #19889

Open
wants to merge 24 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
605f235
feat(storage): basic of non_pk_watermark state clean
Li0k Dec 23, 2024
501d374
feat(storage): ignore non_pk_prefix_watermark compaction
Li0k Dec 23, 2024
3544c0e
Merge branch 'main' of https://github.com/risingwavelabs/risingwave i…
Li0k Dec 23, 2024
d1a39a8
fix ut
Li0k Dec 23, 2024
7c3f521
fix panic
Li0k Dec 23, 2024
e3dbc73
Merge branch 'main' of https://github.com/risingwavelabs/risingwave i…
Li0k Dec 23, 2024
b71eff9
refactor(storage): refactor watermark type
Li0k Dec 25, 2024
9e0af8e
Merge branch 'main' of https://github.com/risingwavelabs/risingwave i…
Li0k Dec 25, 2024
74336d6
typo
Li0k Dec 25, 2024
96de9ba
fix(storage): fix wateramrk_col_idx_in_pk
Li0k Dec 25, 2024
3127678
Merge branch 'main' of https://github.com/risingwavelabs/risingwave i…
Li0k Dec 25, 2024
49a48ad
fix check
Li0k Dec 25, 2024
bb7a29b
Merge branch 'main' of https://github.com/risingwavelabs/risingwave i…
Li0k Dec 27, 2024
6b0b295
refactor
Li0k Dec 27, 2024
3c23aa3
typo
Li0k Dec 27, 2024
3113463
Merge branch 'main' of https://github.com/risingwavelabs/risingwave i…
Li0k Dec 27, 2024
b2e158e
fix panic
Li0k Dec 30, 2024
fd308de
Merge branch 'main' of https://github.com/risingwavelabs/risingwave i…
Li0k Dec 30, 2024
bf28307
typo
Li0k Dec 30, 2024
369d718
typo
Li0k Dec 30, 2024
3500061
Merge branch 'main' of https://github.com/risingwavelabs/risingwave i…
Li0k Dec 30, 2024
ef4c752
address comments
Li0k Jan 8, 2025
5ed4920
Merge branch 'main' of https://github.com/risingwavelabs/risingwave i…
Li0k Jan 8, 2025
8130c61
refactor
Li0k Jan 10, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions proto/hummock.proto
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,9 @@ message TableWatermarks {

// The direction of the table watermark.
bool is_ascending = 2;

// The table watermark is non-pk prefix table watermark.
bool is_non_pk_prefix = 3;
}

message EpochNewChangeLog {
Expand Down
12 changes: 12 additions & 0 deletions src/common/src/util/row_serde.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,18 @@ impl OrderedRowSerde {
}
}

#[must_use]
pub fn index(&self, idx: usize) -> Cow<'_, Self> {
if 1 == self.order_types.len() {
Cow::Borrowed(self)
} else {
Cow::Owned(Self {
schema: vec![self.schema[idx].clone()],
order_types: vec![self.order_types[idx]],
})
}
}

/// Note: prefer [`Row::memcmp_serialize`] if possible.
pub fn serialize(&self, row: impl Row, append_to: impl BufMut) {
self.serialize_datums(row.iter(), append_to)
Expand Down
2 changes: 2 additions & 0 deletions src/meta/src/hummock/compaction/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,8 @@ impl CompactStatus {
pub fn is_trivial_reclaim(task: &CompactTask) -> bool {
// Currently all VnodeWatermark tasks are trivial reclaim.
if task.task_type == TaskType::VnodeWatermark {
assert!(task.input_ssts.len() == 2);
assert!(task.input_ssts[1].table_infos.is_empty());
return true;
}
let exist_table_ids = HashSet::<u32>::from_iter(task.existing_table_ids.clone());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ fn should_delete_key_by_watermark(
let Some(w) = watermark.vnode_watermarks.get(&vnode) else {
return false;
};
watermark.direction.filter_by_watermark(key, w)
watermark.direction.key_filter_by_watermark(key, w)
}

#[cfg(test)]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ fn safe_epoch_read_table_watermarks(
state_table_info: &HummockVersionStateTableInfo,
member_table_ids: &BTreeSet<TableId>,
) -> BTreeMap<TableId, ReadTableWatermark> {
safe_epoch_read_table_watermarks_impl(&safe_epoch_table_watermarks_impl(
safe_epoch_read_table_watermarks_impl(safe_epoch_table_watermarks_impl(
table_watermarks,
state_table_info,
&member_table_ids
Expand Down
19 changes: 18 additions & 1 deletion src/meta/src/hummock/manager/compaction/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ use risingwave_hummock_sdk::sstable_info::SstableInfo;
use risingwave_hummock_sdk::table_stats::{
add_prost_table_stats_map, purge_prost_table_stats, PbTableStatsMap,
};
use risingwave_hummock_sdk::table_watermark::WatermarkSerdeType;
use risingwave_hummock_sdk::version::{GroupDelta, IntraLevelDelta};
use risingwave_hummock_sdk::{
compact_task_to_string, statistics_compact_task, CompactionGroupId, HummockCompactionTaskId,
Expand Down Expand Up @@ -728,6 +729,22 @@ impl HummockManager {
}
}

let table_watermarks = version
.latest_version()
.table_watermarks
.iter()
.filter_map(|(table_id, table_watermarks)| {
if matches!(
table_watermarks.watermark_type,
WatermarkSerdeType::PkPrefix,
) {
Some((*table_id, table_watermarks.clone()))
} else {
None
}
})
.collect();
Comment on lines +732 to +746
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually why don't we do the filtering inside the picker instead like in here if the watermark type is part of TableWatermarks:

We can avoid cloning the table watermark, which can be large given that it stores bytes from user data, with no harm.


while let Some(compact_task) = compact_status.get_compact_task(
version
.latest_version()
Expand All @@ -742,7 +759,7 @@ impl HummockManager {
selector,
&table_id_to_option,
developer_config.clone(),
&version.latest_version().table_watermarks,
&table_watermarks,
&version.latest_version().state_table_info,
) {
let target_level_id = compact_task.input.target_level as u32;
Expand Down
5 changes: 5 additions & 0 deletions src/storage/benches/bench_compactor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -136,11 +136,13 @@ async fn build_table(
},
);
let table_id_to_vnode = HashMap::from_iter(vec![(0, VirtualNode::COUNT_FOR_TEST)]);
let table_id_to_watermark_serde = HashMap::from_iter(vec![(0, None)]);
let mut builder = SstableBuilder::<_, Xor16FilterBuilder>::for_test(
sstable_object_id,
writer,
opt,
table_id_to_vnode,
table_id_to_watermark_serde,
);
let value = b"1234567890123456789";
let mut full_key = test_key_of(0, epoch, TableId::new(0));
Expand Down Expand Up @@ -186,11 +188,14 @@ async fn build_table_2(
);

let table_id_to_vnode = HashMap::from_iter(vec![(table_id, VirtualNode::COUNT_FOR_TEST)]);
let table_id_to_watermark_serde = HashMap::from_iter(vec![(0, None)]);

let mut builder = SstableBuilder::<_, Xor16FilterBuilder>::for_test(
sstable_object_id,
writer,
opt,
table_id_to_vnode,
table_id_to_watermark_serde,
);
let mut full_key = test_key_of(0, epoch, TableId::new(table_id));
let table_key_len = full_key.user_key.table_key.len();
Expand Down
23 changes: 18 additions & 5 deletions src/storage/benches/bench_merge_iter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,16 @@

use std::cell::RefCell;
use std::collections::BTreeMap;
use std::sync::Arc;

use bytes::Bytes;
use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion};
use futures::executor::block_on;
use risingwave_hummock_sdk::key::TableKey;
use risingwave_storage::compaction_catalog_manager::CompactionCatalogAgent;
use risingwave_storage::hummock::iterator::{
Forward, HummockIterator, HummockIteratorUnion, MergeIterator, SkipWatermarkIterator,
Forward, HummockIterator, HummockIteratorUnion, MergeIterator,
NonPkPrefixSkipWatermarkIterator, SkipWatermarkIterator,
};
use risingwave_storage::hummock::shared_buffer::shared_buffer_batch::{
SharedBufferBatch, SharedBufferBatchIterator, SharedBufferValue,
Expand Down Expand Up @@ -108,10 +111,20 @@ fn criterion_benchmark(c: &mut Criterion) {
},
);

let merge_iter = RefCell::new(SkipWatermarkIterator::new(
MergeIterator::new(gen_interleave_shared_buffer_batch_iter(10000, 100)),
BTreeMap::new(),
));
let combine_iter = {
let iter = SkipWatermarkIterator::new(
MergeIterator::new(gen_interleave_shared_buffer_batch_iter(10000, 100)),
BTreeMap::new(),
);

NonPkPrefixSkipWatermarkIterator::new(
iter,
BTreeMap::new(),
Arc::new(CompactionCatalogAgent::dummy()),
)
};

let merge_iter = RefCell::new(combine_iter);
c.bench_with_input(
BenchmarkId::new("bench-merge-iter-skip-empty-watermark", "unordered"),
&merge_iter,
Expand Down
11 changes: 10 additions & 1 deletion src/storage/benches/bench_multi_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,16 @@ impl<F: SstableWriterFactory> TableBuilderFactory for LocalTableBuilderFactory<F
TableId::default().into(),
VirtualNode::COUNT_FOR_TEST,
)]);
let builder = SstableBuilder::for_test(id, writer, self.options.clone(), table_id_to_vnode);

let table_id_to_watermark_serde = HashMap::from_iter(vec![(0, None)]);

let builder = SstableBuilder::for_test(
id,
writer,
self.options.clone(),
table_id_to_vnode,
table_id_to_watermark_serde,
);

Ok(builder)
}
Expand Down
3 changes: 2 additions & 1 deletion src/storage/benches/bench_table_watermarks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use risingwave_common::hash::VirtualNode;
use risingwave_common::util::epoch::test_epoch;
use risingwave_hummock_sdk::compaction_group::StaticCompactionGroupId;
use risingwave_hummock_sdk::table_watermark::{
TableWatermarks, TableWatermarksIndex, VnodeWatermark, WatermarkDirection,
TableWatermarks, TableWatermarksIndex, VnodeWatermark, WatermarkDirection, WatermarkSerdeType,
};
use risingwave_hummock_sdk::version::{HummockVersion, HummockVersionStateTableInfo};
use risingwave_hummock_sdk::HummockEpoch;
Expand Down Expand Up @@ -101,6 +101,7 @@ fn gen_committed_table_watermarks(
})
.collect(),
direction: WatermarkDirection::Ascending,
watermark_type: WatermarkSerdeType::PkPrefix,
}
}

Expand Down
45 changes: 44 additions & 1 deletion src/storage/hummock_sdk/src/compact_task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::{BTreeMap, HashMap};
use std::collections::{BTreeMap, HashMap, HashSet};
use std::mem::size_of;

use itertools::Itertools;
Expand All @@ -22,6 +22,7 @@ use risingwave_pb::hummock::{
PbCompactTask, PbKeyRange, PbTableOption, PbTableSchema, PbTableStats, PbValidationTask,
};

use crate::compaction_group::StateTableId;
use crate::key_range::KeyRange;
use crate::level::InputLevel;
use crate::sstable_info::SstableInfo;
Expand Down Expand Up @@ -114,6 +115,48 @@ impl CompactTask {
}
}

impl CompactTask {
// The compact task may need to reclaim key with TTL
pub fn contains_ttl(&self) -> bool {
self.table_options
.iter()
.any(|(_, table_option)| table_option.retention_seconds.is_some_and(|ttl| ttl > 0))
}

// The compact task may need to reclaim key with range tombstone
pub fn contains_range_tombstone(&self) -> bool {
self.input_ssts
.iter()
.flat_map(|level| level.table_infos.iter())
.any(|sst| sst.range_tombstone_count > 0)
}

// The compact task may need to reclaim key with split sst
pub fn contains_split_sst(&self) -> bool {
self.input_ssts
.iter()
.flat_map(|level| level.table_infos.iter())
.any(|sst| sst.sst_id != sst.object_id)
}

pub fn get_table_ids_from_input_ssts(&self) -> impl Iterator<Item = StateTableId> {
self.input_ssts
.iter()
.flat_map(|level| level.table_infos.iter())
.flat_map(|sst| sst.table_ids.clone())
.sorted()
.unique()
}

// filter the table-id that in existing_table_ids with the table-id in compact-task
pub fn build_compact_table_ids(&self) -> Vec<StateTableId> {
let existing_table_ids: HashSet<u32> = HashSet::from_iter(self.existing_table_ids.clone());
self.get_table_ids_from_input_ssts()
.filter(|table_id| existing_table_ids.contains(table_id))
.collect()
}
}

impl From<PbCompactTask> for CompactTask {
#[expect(deprecated)]
fn from(pb_compact_task: PbCompactTask) -> Self {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,7 @@ pub fn safe_epoch_table_watermarks_impl(
Some(TableWatermarks {
watermarks: vec![(*first_epoch, first_epoch_watermark.clone())],
direction: table_watermarks.direction,
watermark_type: table_watermarks.watermark_type,
})
} else {
None
Expand All @@ -156,10 +157,10 @@ pub fn safe_epoch_table_watermarks_impl(
}

pub fn safe_epoch_read_table_watermarks_impl(
safe_epoch_watermarks: &BTreeMap<u32, TableWatermarks>,
safe_epoch_watermarks: BTreeMap<u32, TableWatermarks>,
) -> BTreeMap<TableId, ReadTableWatermark> {
safe_epoch_watermarks
.iter()
.into_iter()
.map(|(table_id, watermarks)| {
assert_eq!(watermarks.watermarks.len(), 1);
let vnode_watermarks = &watermarks.watermarks.first().expect("should exist").1;
Expand All @@ -177,7 +178,7 @@ pub fn safe_epoch_read_table_watermarks_impl(
}
}
(
TableId::from(*table_id),
TableId::from(table_id),
ReadTableWatermark {
direction: watermarks.direction,
vnode_watermarks: vnode_watermark_map,
Expand Down
Loading
Loading