-
Notifications
You must be signed in to change notification settings - Fork 3.8k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
streamproducer: rewrite event stream to remove ch/loop #120888
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Overall I like this idea, but I have one correctness concern. Don't block on me convincing myself if y'all are confident as I'll be out for a few days.
return nil | ||
s.seb.addKV(roachpb.KeyValue{Key: value.Key, Value: value.Value}) | ||
if err := s.maybeFlushBatch(ctx); err != nil { | ||
s.errCh <- err |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm not 100% convinced this is correct. I nearly convinced myself because the errCh channel is buffered and the callbacks aren't called concurrently.
But, then I realized that last bit isn't true during the initial scan which will call the onValue callback concurrently. That poses a problem not just for this errCh, but it also makes me question whether the initial scan will be a problem for the stream_event_batcher which has no mutex.
If I'm right that this is a problem, it's a little concerning that we don't have a single unit test that failed here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah, yeah, I took the comment in rangefeed.go that said callbacks... are called in said async task in a single thread.
at face value, but looks like initial scans violate that comment if concurrency is enabled. I added a mutex to add (that is only used during initial scan). The rest of them I think are safe, as long as they select
to discard errors if one is already in the ch.
224ca2a
to
a47d82d
Compare
It looks like your PR touches production code but doesn't add or edit any test code. Did you consider adding tests to your PR? 🦉 Hoot! I am a Blathers, a bot for CockroachDB. My owner is dev-inf. |
224ca2a
to
28d9271
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I can't believe we hadn't done this sooner.
@@ -112,7 +112,11 @@ func (f *RangeFeed) getSpansToScan(ctx context.Context) (func() []roachpb.Span, | |||
return err | |||
} | |||
} | |||
_, err := frontier.Forward(sp, f.initialTimestamp) | |||
advanced, err := frontier.Forward(sp, f.initialTimestamp) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why do we call f.fontierVisitor()
here as well? I would think we only call it once on checkpoint.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What do you mean "once on checkpoint" ? this is us calling it on a (potential) checkpoint. We've just finished moving a span from time zero to time initial, and we're informing the frontier visitor that it could elect to visit now if it wants to see the updated frontier.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
At the time I wrote this comment, I was reading the docstring for FrontierSpanVisitor
too literally:
// FrontierSpanVisitor is called when the FrontierSpanVisitTrigger requests the
// frontier be visited after a checkpoint.
The comment could be revised to "requests the frontier to be visited after it is advanced". But CI is green.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'll get it in a followup (I have a diff stacked here to improve initial scan perf by passing []row instead of row from the scanner to the event stream)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Also, before merging, I think we should spin up one of the tpcc roachtests and the node shutdown roachtests
Release note: none. Epic: none.
This switches eventStream to just do its work on the rangefeed client callback handlers directly, without an extra channel handoff and loop with its own duplicate frontier. Release note: none. Epic: none.
|
TFTR! bors r+ |
We already have a ch/loop/frontier in the range feed client so just use that one and do our work in the handler callbacks.