fix(source): pause source correctly #19148
@@ -564,9 +567,14 @@ impl<S: StateStore> SourceExecutor<S> {

if let Some(mutation) = barrier.mutation.as_deref() {
    match mutation {
        // XXX: Is it possible that the stream is self_paused, and we have pause mutation now? In this case, it will panic.
pause_stream and resume_stream have been modified to warn instead of panic when seeing unexpected pause/resume.
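As a rough illustration of the "warn instead of panic" behavior described above, here is a minimal sketch. The `PausableStream` type and its `warnings` field are hypothetical stand-ins invented for this example, not RisingWave's actual types; a real executor would more likely emit the message through a logging facility than collect it in a vector.

```rust
/// Hypothetical stand-in for a pausable stream handle (an assumption for
/// illustration; not the actual RisingWave type).
#[derive(Debug, Default)]
pub struct PausableStream {
    paused: bool,
    /// Warnings are collected here so the behavior is observable in a test.
    /// Real code would probably log them instead.
    pub warnings: Vec<String>,
}

impl PausableStream {
    pub fn is_paused(&self) -> bool {
        self.paused
    }

    /// Pause the stream. A redundant pause records a warning and is
    /// otherwise a no-op, rather than panicking.
    pub fn pause_stream(&mut self) {
        if self.paused {
            self.warnings
                .push("pause_stream called while already paused".to_string());
            return;
        }
        self.paused = true;
    }

    /// Resume the stream. A redundant resume records a warning and is
    /// otherwise a no-op, rather than panicking.
    pub fn resume_stream(&mut self) {
        if !self.paused {
            self.warnings
                .push("resume_stream called while not paused".to_string());
            return;
        }
        self.paused = false;
    }
}
```

With this shape, a pause mutation arriving while the stream is already paused leaves the state unchanged and only leaves a trace for debugging.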
@@ -548,8 +549,10 @@ impl<S: StateStore> SourceExecutor<S> {
last_barrier_time = Instant::now();

if self_paused {
There are two other places where we use self_paused. It would be better to fix them as well.
    self_paused = false;
}

let mut split_changed = false;
if let Some(ref mutation) = barrier.mutation.as_deref() {
    match mutation {
        Mutation::Pause => {
            command_paused = true;
            pause_reader!();
I think we should also ensure that pause_reader!() is called only when only one of command_paused and self_paused is false. Otherwise, the right arm of the backfill stream will be lost. See the pause_reader implementation.
If source backfilling is expected to strictly adhere to the barrier pause, it must pause here.
I'll look into pause_reader!.
Got your idea. Fixed.
LGTM. Thanks for the fix.
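The concern in this thread, that a repeated pause can lose the right arm of the backfill stream, can be sketched with a toy model. It assumes that pausing swaps the live arm for a pending placeholder and stashes the original for later. `BackfillReader`, `Arm`, and the method names are hypothetical and only model the hazard; they are not the actual pause_reader! macro.

```rust
/// Hypothetical model of one arm of a two-armed backfill stream.
#[derive(Debug, Clone, PartialEq)]
pub enum Arm {
    /// A live upstream, identified by a name for illustration.
    Live(&'static str),
    /// A placeholder that never yields, used while paused.
    Pending,
}

/// Hypothetical reader that stashes its live arm while paused.
pub struct BackfillReader {
    active: Arm,
    stashed: Option<Arm>,
}

impl BackfillReader {
    pub fn new(name: &'static str) -> Self {
        Self { active: Arm::Live(name), stashed: None }
    }

    pub fn active(&self) -> &Arm {
        &self.active
    }

    /// Unguarded pause: a second call overwrites the stash with the
    /// placeholder, so the original live arm is lost.
    pub fn pause_unguarded(&mut self) {
        let old = std::mem::replace(&mut self.active, Arm::Pending);
        self.stashed = Some(old);
    }

    /// Guarded pause: does nothing if the reader is already paused.
    pub fn pause(&mut self) {
        if self.stashed.is_some() {
            return;
        }
        self.pause_unguarded();
    }

    /// Restore the stashed arm, if any.
    pub fn resume(&mut self) {
        if let Some(arm) = self.stashed.take() {
            self.active = arm;
        }
    }
}
```

Under these assumptions, two unguarded pauses followed by a resume restore the placeholder rather than the live arm, while the guarded variant restores the live arm as intended.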
I hereby agree to the terms of the RisingWave Labs, Inc. Contributor License Agreement.
What's changed and what's your intention?
Unlike other executors, which pause and resume only on barrier commands, the source executor additionally pauses and resumes itself based on self_paused. However, there is a corner case where the self_paused mechanism resumes the source executor incorrectly: self_paused pauses the source executor, and self_paused is then used to resume the source executor, incorrectly. The source executor is expected to be resumed by the following [resume] barrier.
This PR fixes this issue.
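One way to picture the fix is to track the two pause reasons separately and treat the executor as paused while either is set. The sketch below is a simplified model under that assumption; `PauseState`, its methods, and the two-variant `Mutation` enum are hypothetical and much smaller than the real executor code.

```rust
/// Simplified, hypothetical subset of barrier mutations.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Mutation {
    Pause,
    Resume,
}

/// Tracks the two independent reasons the executor may be paused.
#[derive(Debug, Default)]
pub struct PauseState {
    command_paused: bool,
    self_paused: bool,
}

impl PauseState {
    /// The executor reads data only when neither reason applies.
    pub fn is_paused(&self) -> bool {
        self.command_paused || self.self_paused
    }

    /// The executor pauses itself (for example, when barriers are slow).
    pub fn self_pause(&mut self) {
        self.self_paused = true;
    }

    /// On every barrier, the self-imposed pause is cleared, but the
    /// command pause changes only on an explicit Pause/Resume mutation.
    pub fn on_barrier(&mut self, mutation: Option<Mutation>) {
        self.self_paused = false;
        match mutation {
            Some(Mutation::Pause) => self.command_paused = true,
            Some(Mutation::Resume) => self.command_paused = false,
            None => {}
        }
    }
}
```

In this model, a plain barrier that arrives after a Pause command clears only the self-imposed pause, so the executor stays paused until a Resume mutation arrives.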
Checklist
./risedev check (or alias, ./risedev c)
Documentation
Release note
If this PR includes changes that directly affect users or other significant modifications relevant to the community, kindly draft a release note to provide a concise summary of these changes. Please prioritize highlighting the impact these changes will have on users.