storage/rangefeed: per registration buffer #33557
Force-pushed from 792e3b9 to 545bd29.
Aside from a few small questions, this looks really good! Have you tested it with the roachtests yet?
Reviewed 7 of 7 files at r1.
Reviewable status: complete! 0 of 0 LGTMs obtained
pkg/storage/rangefeed/processor.go, line 372 at r1 (raw file):
case p.regC <- r:
case <-p.stoppedC:
	if catchUpIter != nil {
Why did this change? Will this cause us to leak iterators if the Processor is stopped?
pkg/storage/rangefeed/registry.go, line 71 at r1 (raw file):
mu struct {
	*syncutil.Mutex
sync.Locker? Or pass a pointer to a registration everywhere and don't make this a pointer to a mutex.
pkg/storage/rangefeed/registry.go, line 150 at r1 (raw file):
// 1. If a catch-up scan is indicated, run one before beginning the proper
//    output loop.
// 2. After catch-up is complete, Begin reading from the registration buffer
s/Begin/begin/
pkg/storage/rangefeed/registry.go, line 187 at r1 (raw file):
case <-ctx.Done():
	return ctx.Err()
case <-r.stream.Context().Done():
👍 I'm happy that we can do this instead of polling all stream contexts.
pkg/storage/rangefeed/registry.go, line 193 at r1 (raw file):
}

func (r *registration) runOutputLoop(ctx context.Context, doneCb func(r *registration)) {
If this is executing synchronously then why take a callback instead of just running the disconnect code after this returns?
pkg/storage/rangefeed/registry.go, line 206 at r1 (raw file):
// runCatchupScan starts a catchup scan which will output entries for all
// recorded changes in the replica that are newer than the catchupTimeStamp.
// This creates a single engine iterator immediately when this method
Is this sentence true?
pkg/storage/rangefeed/registry.go, line 274 at r1 (raw file):
}

func (r *registration) waitForCaughtUp() error {
This is just used in tests, right? Can we move it to a _test.go file so that it isn't part of the registration type's method set outside of tests?
pkg/storage/rangefeed/registry.go, line 432 at r1 (raw file):
}

func (reg *registry) waitForCaughtUp() error {
Same question about moving this to a _test.go file.
Force-pushed from 545bd29 to 44862d0.
TFTR! Have not run these in roachtests yet, but we've only recently started running roachtests with rangefeed enabled at all.
Reviewable status: complete! 0 of 0 LGTMs obtained
pkg/storage/rangefeed/processor.go, line 372 at r1 (raw file):
Previously, nvanbenschoten (Nathan VanBenschoten) wrote…
Why did this change? Will this cause us to leak iterators if the Processor is stopped?
I'm pretty sure it would. Responsibility for closing the catchupIter was with the registration, so at this point it gets leaked if the processor closes while registration is in progress.
I'm no longer sure whether this is one of the leaked routines I explicitly saw in a test; I think I just caught it visually.
pkg/storage/rangefeed/registry.go, line 71 at r1 (raw file):
Previously, nvanbenschoten (Nathan VanBenschoten) wrote…
sync.Locker? Or pass a pointer to a registration everywhere and don't make this a pointer to a mutex.
Done, used Locker.
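For readers following along, here is a minimal sketch of what embedding a sync.Locker in the mu struct can look like. It is not the code from this PR: the overflowed field, newRegistration, and copiesShareLock are hypothetical names used only for illustration, and the standard library's sync.Mutex stands in for syncutil.Mutex.

```go
package main

import (
	"fmt"
	"sync"
)

// registration is a simplified stand-in for the real type.
type registration struct {
	id int
	mu struct {
		sync.Locker      // holds a *sync.Mutex, so value copies share one lock
		overflowed  bool // hypothetical field guarded by the lock
	}
}

// newRegistration is a hypothetical constructor that installs the lock.
func newRegistration(id int) registration {
	r := registration{id: id}
	r.mu.Locker = &sync.Mutex{}
	return r
}

// setOverflowed uses the Lock and Unlock methods promoted from the Locker.
func (r *registration) setOverflowed() {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.mu.overflowed = true
}

// copiesShareLock reports whether a value copy of a registration still
// refers to the same underlying lock as the original.
func copiesShareLock() bool {
	a := newRegistration(1)
	b := a // value copy; the interface value still points at the same mutex
	return a.mu.Locker == b.mu.Locker
}

func main() {
	r := newRegistration(1)
	r.setOverflowed()
	fmt.Println(r.mu.overflowed, copiesShareLock())
}
```

Note that only the lock is shared between copies in this sketch; plain fields such as overflowed are still copied by value, which is the trade-off behind the alternative of passing a pointer to a registration everywhere.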
pkg/storage/rangefeed/registry.go, line 150 at r1 (raw file):
Previously, nvanbenschoten (Nathan VanBenschoten) wrote…
s/Begin/begin/
Done.
pkg/storage/rangefeed/registry.go, line 187 at r1 (raw file):
Previously, nvanbenschoten (Nathan VanBenschoten) wrote…
👍 I'm happy that we can do this instead of polling all stream contexts.
Agreed, it's quite a bit easier to understand from the perspective of a single registration, even with the elaborate shutdown dance for the dedicated goroutine.
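To illustrate the point, a per-registration loop can watch its own stream's context directly in the same select that drains its buffer. The sketch below is an assumption-laden simplification, not the PR's code: the stream type, its Send method, and demoStreamCancel are hypothetical stand-ins, and events are plain strings.

```go
package main

import (
	"context"
	"fmt"
)

// stream is a hypothetical stand-in for the stream a registration writes to.
type stream struct {
	ctx context.Context
	out chan string
}

func (s *stream) Context() context.Context { return s.ctx }

func (s *stream) Send(ev string) error {
	s.out <- ev
	return nil
}

type registration struct {
	buf    chan string
	stream *stream
}

// outputLoop forwards buffered events until either the caller's context or
// the stream's own context is done. No polling of stream contexts is needed.
func (r *registration) outputLoop(ctx context.Context) error {
	for {
		select {
		case ev := <-r.buf:
			if err := r.stream.Send(ev); err != nil {
				return err
			}
		case <-ctx.Done():
			return ctx.Err()
		case <-r.stream.Context().Done():
			return r.stream.Context().Err()
		}
	}
}

// demoStreamCancel delivers two events and then cancels the stream context.
func demoStreamCancel() (delivered []string, errText string) {
	streamCtx, cancelStream := context.WithCancel(context.Background())
	s := &stream{ctx: streamCtx, out: make(chan string)}
	r := &registration{buf: make(chan string, 2), stream: s}
	r.buf <- "e1"
	r.buf <- "e2"

	errC := make(chan error, 1)
	go func() { errC <- r.outputLoop(context.Background()) }()

	delivered = append(delivered, <-s.out)
	delivered = append(delivered, <-s.out)
	cancelStream() // the consumer goes away; only this registration's loop exits
	return delivered, (<-errC).Error()
}

func main() {
	fmt.Println(demoStreamCancel())
}
```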
pkg/storage/rangefeed/registry.go, line 193 at r1 (raw file):
Previously, nvanbenschoten (Nathan VanBenschoten) wrote…
If this is executing synchronously then why take a callback instead of just running the disconnect code after this returns?
Ah, good eye. I think I originally had this run directly as the async task, but then I created the outer task in the processor, which could do this itself, and I missed the shortcut.
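A small sketch of the two shapes being discussed, under the assumption that the callback only performs cleanup once the loop has returned. The steps slice, cleanup, and demoSteps are hypothetical and exist only to make the ordering observable.

```go
package main

import "fmt"

// registration records the order of operations for illustration only.
type registration struct {
	steps []string
}

func (r *registration) outputLoop() error {
	r.steps = append(r.steps, "output")
	return nil
}

// cleanup is a hypothetical stand-in for the disconnect code.
func cleanup(r *registration) {
	r.steps = append(r.steps, "cleanup")
}

// runOutputLoopWithCallback mirrors the reviewed shape: the loop runs
// synchronously and then invokes a callback supplied by the caller.
func (r *registration) runOutputLoopWithCallback(doneCb func(r *registration)) {
	_ = r.outputLoop()
	doneCb(r)
}

// runOutputLoop is the simplified shape: it just returns, and the caller
// runs the cleanup itself afterwards.
func (r *registration) runOutputLoop() {
	_ = r.outputLoop()
}

// demoSteps runs both shapes and returns the recorded step order for each.
func demoSteps() (withCallback, withoutCallback string) {
	a := &registration{}
	a.runOutputLoopWithCallback(cleanup)

	b := &registration{}
	b.runOutputLoop()
	cleanup(b) // same ordering, no callback parameter needed

	return fmt.Sprint(a.steps), fmt.Sprint(b.steps)
}

func main() {
	fmt.Println(demoSteps())
}
```

Because the call is synchronous, both shapes run the same steps in the same order, so the callback parameter adds nothing.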
pkg/storage/rangefeed/registry.go, line 206 at r1 (raw file):
Previously, nvanbenschoten (Nathan VanBenschoten) wrote…
Is this sentence true?
Nope, made some significant changes here and forgot to update the comment. Fixed.
pkg/storage/rangefeed/registry.go, line 274 at r1 (raw file):
Previously, nvanbenschoten (Nathan VanBenschoten) wrote…
This is just used in tests, right? Can we move it to a _test.go file so that it isn't part of the registration type's method set outside of tests?
Done.
pkg/storage/rangefeed/registry.go, line 432 at r1 (raw file):
Previously, nvanbenschoten (Nathan VanBenschoten) wrote…
Same question about moving this to a _test.go file.
Done.
I still can't say I'm an expert on this code -- I suspect I'll have to make a modification or two myself before I'm 100% comfortable with it -- but everything here makes sense to me. Given that Nathan seems to have signed off on the high level and he's away until after you leave, I'd say feel free to merge this when you're happy with it.
Reviewable status: complete! 1 of 0 LGTMs obtained
pkg/storage/rangefeed/processor.go, line 329 at r1 (raw file):
// provided an error when the registration closes.
//
// The provided engine is used to instantiate "catch-up" iterators for this
nit: it's back to an iterator
pkg/storage/rangefeed/registry.go, line 156 at r2 (raw file):
func (r *registration) outputLoop(ctx context.Context) error {
	// If the registration has a catch-up scan,
	if !r.catchupTimestamp.IsEmpty() {
it seems more natural to me for this to be triggered by a non-nil catchupIter
Force-pushed from 44862d0 to b0c5eea.
Modifies the rangefeed package so that each individual registration maintains an output buffer which is processed by a dedicated goroutine. This eliminates the possibility of a slow consumer blocking the processor from making progress.

Previously, writing events to registrations was synchronous, meaning that a single slow consumer could cause the publishing process to block. If publishing was blocked for too long, the processor along with all registrations were immediately torn down. With this change, a slow consumer will no longer block its siblings; if a single consumer blocks for too long, it will enter an "overflow" state and will be torn down as soon as its existing buffer is fully processed.

The registry/registration interface has been modified fairly significantly to accommodate this; the processor itself has seen modest modification (mostly moving functionality to the registry level and removing no-longer-needed support for the previous functionality). Some of the processor tests have been modified to account for the now asynchronous nature of publishing events to individual streams.

Fixes cockroachdb#32945

Release note: None
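The behavior described in this commit message can be sketched roughly as follows. This is a simplified illustration under stated assumptions, not the implementation in the PR: events are plain strings, errOverflow, publish, and demoOverflow are hypothetical names, and the output loop is run synchronously here rather than on a dedicated goroutine so that the result is deterministic.

```go
package main

import (
	"errors"
	"fmt"
)

// errOverflow is a hypothetical error returned once an overflowed buffer has
// been fully drained.
var errOverflow = errors.New("registration buffer overflowed")

// registration is a simplified stand-in for a rangefeed registration.
type registration struct {
	buf        chan string // per-registration output buffer
	overflowed bool        // written only by the publishing goroutine
}

func newRegistration(size int) *registration {
	return &registration{buf: make(chan string, size)}
}

// publish never blocks. If the buffer is full, the registration is marked as
// overflowed and the buffer is closed so that the output loop can drain what
// is already queued and then exit.
func (r *registration) publish(ev string) {
	if r.overflowed {
		return // events after the overflow are dropped
	}
	select {
	case r.buf <- ev:
	default:
		r.overflowed = true
		close(r.buf)
	}
}

// outputLoop would run on the registration's dedicated goroutine. In this
// sketch the buffer is only ever closed on overflow, so leaving the range
// loop means the backlog has been drained and the registration must go.
func (r *registration) outputLoop(send func(string)) error {
	for ev := range r.buf {
		send(ev)
	}
	return errOverflow
}

// demoOverflow publishes four events to a slow registration (buffer of 2) and
// to a sibling with room to spare (buffer of 8), with no consumer running.
func demoOverflow() (delivered []string, siblingBuffered int, err error) {
	slow, sibling := newRegistration(2), newRegistration(8)
	for _, ev := range []string{"e1", "e2", "e3", "e4"} {
		slow.publish(ev) // returns immediately even once slow is full
		sibling.publish(ev)
	}
	err = slow.outputLoop(func(ev string) { delivered = append(delivered, ev) })
	return delivered, len(sibling.buf), err
}

func main() {
	delivered, siblingBuffered, err := demoOverflow()
	fmt.Println(delivered, siblingBuffered, err)
}
```

In this sketch the slow registration still delivers the two events it had buffered before being torn down, while its sibling receives all four events without ever being blocked.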
Force-pushed from b0c5eea to 6bf35f1.
I had to change the way that "waitForCatchup" works in order to avoid a race condition; that functionality now runs through the processor. As a result, the waitForCatchup() methods cannot be moved to the _test package, so I have clearly marked them for test-only usage in other ways.
Reviewable status: complete! 0 of 0 LGTMs obtained (and 1 stale)
pkg/storage/rangefeed/processor.go, line 329 at r1 (raw file):
Previously, danhhz (Daniel Harrison) wrote…
nit: it's back to an iterator
Done.
pkg/storage/rangefeed/registry.go, line 156 at r2 (raw file):
Previously, danhhz (Daniel Harrison) wrote…
it seems more natural to me for this to be triggered by a non-nil catchupIter
Done.
bors r=danhhz
Build failed (retrying...)
33557: storage/rangefeed: per registration buffer r=danhhz a=mrtracy

Modifies the rangefeed package so that each individual registration maintains an output buffer which is processed by a dedicated goroutine. This eliminates the possibility of a slow consumer blocking the processor from making progress.

Previously, writing events to registrations was synchronous, meaning that a single slow consumer could cause the publishing process to block. If publishing was blocked for too long, the processor along with all registrations were immediately torn down. With this change, a slow consumer will no longer block its siblings; if a single consumer blocks for too long, it will enter an "overflow" state and will be torn down as soon as its existing buffer is fully processed.

The registry/registration interface has been modified fairly significantly to accommodate this; the processor itself has seen modest modification (mostly moving functionality to the registry level and removing no-longer-needed support for the previous functionality). Some of the processor tests have been modified to account for the now asynchronous nature of publishing events to individual streams.

Fixes #32945

Release note: None

Co-authored-by: Matt Tracy <[email protected]>
Build succeeded
This test was flaky because the context used to start output loops was based on the background context, and thus wasn't being cancelled when the processor was stopped. This was caught by bors in cockroachdb#33557, but it retried and merged anyway. Release note: None
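The flakiness described here comes down to which context the output loop is started with. The sketch below illustrates the difference under simplifying assumptions: outputLoop and loopExitsOnStop are hypothetical, the processor's stop is modeled as a context cancellation, and a short timeout is used to detect a loop that never exits.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// outputLoop is a stand-in that runs until its context is cancelled.
func outputLoop(ctx context.Context) error {
	<-ctx.Done()
	return ctx.Err()
}

// loopExitsOnStop reports whether an output loop exits once the processor is
// stopped, depending on whether its context derives from the processor's.
func loopExitsOnStop(deriveFromProcessor bool) bool {
	procCtx, stopProcessor := context.WithCancel(context.Background())
	defer stopProcessor()

	parent := context.Background() // detached: never sees the processor stop
	if deriveFromProcessor {
		parent = procCtx
	}
	loopCtx, cleanup := context.WithCancel(parent)
	defer cleanup() // lets the demo goroutine exit in the detached case

	done := make(chan struct{})
	go func() {
		_ = outputLoop(loopCtx)
		close(done)
	}()

	stopProcessor()
	select {
	case <-done:
		return true
	case <-time.After(200 * time.Millisecond):
		return false
	}
}

func main() {
	fmt.Println(loopExitsOnStop(true), loopExitsOnStop(false))
}
```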
When converting rangefeeds to use a per-registration output loop in issue cockroachdb#33557, I accidentally removed two places where the passed "catchupIter" for a registration was being closed if the processor is terminated before the registration is actually completed. This happened due to some work-in-progress confusion where the catchupIter was removed, but eventually restored. This was caught promptly by our regular stress tests. Fixes cockroachdb#34051 Release note: None
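A rough sketch of the kind of cleanup this fix restores, loosely modeled on the select quoted earlier in the review (p.regC and p.stoppedC). The iterator type, the register method, and demoRegister are hypothetical simplifications: if the processor has already stopped, nobody else will take ownership of the catch-up iterator, so the registering side has to close it.

```go
package main

import "fmt"

// iterator is a hypothetical stand-in for an engine iterator.
type iterator struct{ closed bool }

func (it *iterator) Close() { it.closed = true }

type registration struct {
	catchupIter *iterator
}

type processor struct {
	regC     chan registration
	stoppedC chan struct{}
}

// register hands r to the processor. If the processor has stopped, the
// registration is never delivered, so its catch-up iterator is closed here
// instead of being leaked.
func (p *processor) register(r registration) bool {
	select {
	case p.regC <- r:
		return true // ownership of the iterator moves to the processor
	case <-p.stoppedC:
		if r.catchupIter != nil {
			r.catchupIter.Close()
		}
		return false
	}
}

// demoRegister tries to register with either a stopped or a running
// processor and reports whether it was accepted and whether the iterator
// ended up closed by register.
func demoRegister(stopped bool) (accepted, iterClosed bool) {
	p := &processor{stoppedC: make(chan struct{})}
	if stopped {
		p.regC = make(chan registration) // no receiver: the send can never proceed
		close(p.stoppedC)
	} else {
		p.regC = make(chan registration, 1) // room for one pending registration
	}
	it := &iterator{}
	accepted = p.register(registration{catchupIter: it})
	return accepted, it.closed
}

func main() {
	fmt.Println(demoRegister(true))
	fmt.Println(demoRegister(false))
}
```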
34047: rangefeed: Fix flaky TestProcessorConcurrentStop r=mrtracy a=mrtracy

This test was flaky because the context used to start output loops was based on the background context, and thus wasn't being cancelled when the processor was stopped. This was caught by bors in #33557, but it retried and merged anyway.

Release note: None

Co-authored-by: Matt Tracy <[email protected]>
34066: rangefeed: Fix iterator leak when stopping cluster r=danhhz a=mrtracy

When converting rangefeeds to use a per-registration output loop in issue #33557, I accidentally removed two places where the passed "catchupIter" for a registration was being closed if the processor is terminated before the registration is actually completed. This happened due to some work-in-progress confusion where the catchupIter was removed, but eventually restored. This was caught promptly by our regular stress tests.

Fixes #34051

Release note: None

Co-authored-by: Matt Tracy <[email protected]>
Hint at the reason why a rangefeed checkpoint with an empty resolved timestamp is published. This revives an old comment deleted in cockroachdb#33557. Release note: None
59728: kvserver: add a comment r=andreimatei a=andreimatei

Hint at the reason why a rangefeed checkpoint with an empty resolved timestamp is published. This revives an old comment deleted in #33557.

Release note: None

Co-authored-by: Andrei Matei <[email protected]>