-
Notifications
You must be signed in to change notification settings - Fork 596
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat(subscription): Support specified pk read log store #19274
base: main
Are you sure you want to change the base?
Conversation
d53bf7a
to
7f4e0cd
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This feat is not used in cursor manager currently and there is no test on it. How do we verify that it is working as expected?
src/expr/impl/src/scalar/cast.rs
Outdated
@@ -47,6 +47,15 @@ where | |||
}) | |||
} | |||
|
|||
#[function("cast(varchar) -> struct", type_infer = "unreachable")] | |||
pub fn str_parse_struct(elem: &str, ctx: &Context) -> Result<StructValue> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this change relevant to this PR?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
is to enable recovery from a single point, his logic = select * from table where (pk1,pk2,pk3)>(1,1,1);
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Left some early comments.
@@ -740,6 +742,52 @@ impl SubscriptionCursor { | |||
Ok(row) | |||
} | |||
|
|||
pub fn process_output_desc_row( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Given that the Field
and the row structure (i.e. which columns in which idx) are fixed, there are several things can be simplified in this method:
- We can use
row.project(<pk_col_idx_in_row>)
to get the pk instead of doing the filter_map and map checking every time. - We only need to calculate the
pk_fields
once and cache it in SubscriptionCursor instead of doing it every time when we get rows.
@@ -770,33 +818,116 @@ impl SubscriptionCursor { | |||
} | |||
}) | |||
.collect::<Vec<_>>(); | |||
let output_col_idx_with_out_hidden = output_col_idx | |||
let max_split_range_gap = context.session_ctx().config().max_split_range_gap() as u64; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why do we need this? I think we only have one range here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The call to the split_to_scan_ranges method requires that, here, it won't work
I hereby agree to the terms of the RisingWave Labs, Inc. Contributor License Agreement.
What's changed and what's your intention?
In this pr we will save the pk consumed by the cursor every time, and support the specified pk consumption log, this but this pr does not use them at present, we will open the cursor recovery in the next pr
Checklist
./risedev check
(or alias,./risedev c
)Documentation
Release note
If this PR includes changes that directly affect users or other significant modifications relevant to the community, kindly draft a release note to provide a concise summary of these changes. Please prioritize highlighting the impact these changes will have on users.