Skip to content

Commit

Permalink
fix(over window): fix table iter pk prefix to use deduped partition key (#19081)
Browse files Browse the repository at this point in the history

Signed-off-by: Richard Chien <[email protected]>
  • Loading branch information
stdrc authored Oct 24, 2024
1 parent bdfd892 commit 73a3d97
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 26 deletions.
18 changes: 14 additions & 4 deletions src/stream/src/executor/over_window/general.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ struct ExecutorInner<S: StateStore> {

schema: Schema,
calls: Vec<WindowFuncCall>,
partition_key_indices: Vec<usize>,
deduped_part_key_indices: Vec<usize>,
order_key_indices: Vec<usize>,
order_key_data_types: Vec<DataType>,
order_key_order_types: Vec<OrderType>,
Expand Down Expand Up @@ -88,9 +88,10 @@ impl<S: StateStore> Execute for OverWindowExecutor<S> {
}

impl<S: StateStore> ExecutorInner<S> {
/// Get deduplicated partition key from a full row, which happened to be the prefix of table PK.
fn get_partition_key(&self, full_row: impl Row) -> OwnedRow {
full_row
.project(&self.partition_key_indices)
.project(&self.deduped_part_key_indices)
.into_owned_row()
}

Expand Down Expand Up @@ -162,13 +163,22 @@ impl<S: StateStore> OverWindowExecutor<S> {
&input_info.pk_indices,
);

let deduped_part_key_indices = {
let mut dedup = HashSet::new();
args.partition_key_indices
.iter()
.filter(|i| dedup.insert(**i))
.copied()
.collect()
};

Self {
input: args.input,
inner: ExecutorInner {
actor_ctx: args.actor_ctx,
schema: args.schema,
calls: args.calls,
partition_key_indices: args.partition_key_indices,
deduped_part_key_indices,
order_key_indices: args.order_key_indices,
order_key_data_types,
order_key_order_types: args.order_key_order_types,
Expand Down Expand Up @@ -262,7 +272,7 @@ impl<S: StateStore> OverWindowExecutor<S> {
chunk: StreamChunk,
metrics: &'a OverWindowMetrics,
) {
// partition key => changes happened in the partition.
// (deduped) partition key => changes happened in the partition.
let mut deltas: BTreeMap<DefaultOrdered<OwnedRow>, PartitionDelta> = BTreeMap::new();
// input pk of update records of which the order key is changed.
let mut key_change_updated_pks = HashSet::new();
Expand Down
40 changes: 18 additions & 22 deletions src/stream/src/executor/over_window/over_partition.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,13 +70,13 @@ const MAGIC_CACHE_SIZE: usize = 1024;
const MAGIC_JITTER_PREVENTION: usize = MAGIC_CACHE_SIZE / 8;

pub(super) fn shrink_partition_cache(
this_partition_key: &OwnedRow,
deduped_part_key: &OwnedRow,
range_cache: &mut PartitionCache,
cache_policy: CachePolicy,
recently_accessed_range: RangeInclusive<StateKey>,
) {
tracing::trace!(
this_partition_key=?this_partition_key,
partition=?deduped_part_key,
cache_policy=?cache_policy,
recently_accessed_range=?recently_accessed_range,
"find the range to retain in the range cache"
Expand Down Expand Up @@ -218,7 +218,7 @@ pub(super) fn shrink_partition_cache(
};

tracing::trace!(
this_partition_key=?this_partition_key,
partition=?deduped_part_key,
retain_range=?(&start..=&end),
"retain range in the range cache"
);
Expand Down Expand Up @@ -290,7 +290,7 @@ impl<'a> AffectedRange<'a> {
/// By putting this type inside `private` module, we can avoid misuse of the internal fields and
/// methods.
pub(super) struct OverPartition<'a, S: StateStore> {
this_partition_key: &'a OwnedRow,
deduped_part_key: &'a OwnedRow,
range_cache: &'a mut PartitionCache,
cache_policy: CachePolicy,

Expand All @@ -312,7 +312,7 @@ const MAGIC_BATCH_SIZE: usize = 512;
impl<'a, S: StateStore> OverPartition<'a, S> {
#[allow(clippy::too_many_arguments)]
pub fn new(
this_partition_key: &'a OwnedRow,
deduped_part_key: &'a OwnedRow,
cache: &'a mut PartitionCache,
cache_policy: CachePolicy,
calls: &'a [WindowFuncCall],
Expand All @@ -337,7 +337,7 @@ impl<'a, S: StateStore> OverPartition<'a, S> {
.any(|call| call.frame.bounds.end_is_unbounded());

Self {
this_partition_key,
deduped_part_key,
range_cache: cache,
cache_policy,

Expand Down Expand Up @@ -658,17 +658,17 @@ impl<'a, S: StateStore> OverPartition<'a, S> {

if need_extend_leftward {
self.stats.left_miss_count += 1;
tracing::trace!(partition=?self.this_partition_key, "partition cache left extension triggered");
tracing::trace!(partition=?self.deduped_part_key, "partition cache left extension triggered");
let left_most = self.cache_real_first_key().unwrap_or(delta_first).clone();
self.extend_cache_leftward_by_n(table, &left_most).await?;
}
if need_extend_rightward {
self.stats.right_miss_count += 1;
tracing::trace!(partition=?self.this_partition_key, "partition cache right extension triggered");
tracing::trace!(partition=?self.deduped_part_key, "partition cache right extension triggered");
let right_most = self.cache_real_last_key().unwrap_or(delta_last).clone();
self.extend_cache_rightward_by_n(table, &right_most).await?;
}
tracing::trace!(partition=?self.this_partition_key, "partition cache extended");
tracing::trace!(partition=?self.deduped_part_key, "partition cache extended");
}
}

Expand Down Expand Up @@ -925,16 +925,12 @@ impl<'a, S: StateStore> OverPartition<'a, S> {
return Ok(());
}

tracing::trace!(partition=?self.this_partition_key, "loading the whole partition into cache");
tracing::trace!(partition=?self.deduped_part_key, "loading the whole partition into cache");

let mut new_cache = PartitionCache::new(); // shouldn't use `new_empty_partition_cache` here because we don't want sentinels
let sub_range: &(Bound<OwnedRow>, Bound<OwnedRow>) = &(Bound::Unbounded, Bound::Unbounded);
let table_iter = table
.iter_with_prefix(
self.this_partition_key,
sub_range,
PrefetchOptions::default(),
)
.iter_with_prefix(self.deduped_part_key, sub_range, PrefetchOptions::default())
.await?;

#[for_await]
Expand Down Expand Up @@ -969,7 +965,7 @@ impl<'a, S: StateStore> OverPartition<'a, S> {
{
// completely not overlapping, for the sake of simplicity, we re-init the cache
tracing::debug!(
partition=?self.this_partition_key,
partition=?self.deduped_part_key,
cache_first=?cache_real_first_key,
cache_last=?cache_real_last_key,
range=?range,
Expand All @@ -985,7 +981,7 @@ impl<'a, S: StateStore> OverPartition<'a, S> {
Bound::Included(self.row_conv.state_key_to_table_sub_pk(range.end())?),
);
tracing::debug!(
partition=?self.this_partition_key,
partition=?self.deduped_part_key,
table_sub_range=?table_sub_range,
"cache is empty, just loading the given range"
);
Expand All @@ -1007,7 +1003,7 @@ impl<'a, S: StateStore> OverPartition<'a, S> {
),
);
tracing::trace!(
partition=?self.this_partition_key,
partition=?self.deduped_part_key,
table_sub_range=?table_sub_range,
"loading the left half of given range"
);
Expand All @@ -1026,7 +1022,7 @@ impl<'a, S: StateStore> OverPartition<'a, S> {
Bound::Included(self.row_conv.state_key_to_table_sub_pk(range.end())?),
);
tracing::trace!(
partition=?self.this_partition_key,
partition=?self.deduped_part_key,
table_sub_range=?table_sub_range,
"loading the right half of given range"
);
Expand Down Expand Up @@ -1139,7 +1135,7 @@ impl<'a, S: StateStore> OverPartition<'a, S> {
) -> StreamExecutorResult<()> {
let stream = table
.iter_with_prefix(
self.this_partition_key,
self.deduped_part_key,
&table_sub_range,
PrefetchOptions::default(),
)
Expand Down Expand Up @@ -1171,7 +1167,7 @@ impl<'a, S: StateStore> OverPartition<'a, S> {
);
let rev_stream = table
.rev_iter_with_prefix(
self.this_partition_key,
self.deduped_part_key,
&sub_range,
PrefetchOptions::default(),
)
Expand Down Expand Up @@ -1215,7 +1211,7 @@ impl<'a, S: StateStore> OverPartition<'a, S> {
);
let stream = table
.iter_with_prefix(
self.this_partition_key,
self.deduped_part_key,
&sub_range,
PrefetchOptions::default(),
)
Expand Down

0 comments on commit 73a3d97

Please sign in to comment.