kvserver: sync checksum computation with long poll #86591
Issues to address (and test?):

@tbg @erikgrinaker The first commit partially addresses the issue with minimal code touches. Do you think we could merge or even backport it? Then I could address other things in a follow-up PR. We can decide after I add more commits to this PR, and it becomes more apparent how we can split it.
I think the main case in which the lack of cancellation caused issues was when the caller's context expired before it called CollectChecksum, i.e. the RPC never happens, and so the computation at the caller will still be left dangling. So if we merge a partial fix, it should encompass that. The current code doesn't.
I think a decent order of things is
- avoid special-casing of the first checksum (i.e. run all in parallel); hopefully a small change? Let's explore
- add code that aborts the inflight computation unless the long-poll arrives within (say) 5s
- backport the above two
- remove the GC of old checksums, since it's no longer necessary (either the long poll creates the entry and removes it when the computation doesn't start within 5s, or the computation creates it and removes it when the long poll doesn't show up within 5s, or the result of the computation gets consumed by the long poll, which removes it)
This could look something like this:
type bridge struct {
    mu struct {
        syncutil.Mutex
        cancelAll func()        // cancels both producer and consumer (if they're there)
        consuming chan struct{} // closed when long poll arrives
        producing chan struct{} // closed when computation starts
        produced  chan struct{} // closed when computation ends (regardless of outcome)
        result    interface{}   // populated with close(produced)
    }
}
func (br *bridge) setProducing() error {
    br.mu.Lock()
    defer br.mu.Unlock()
    select {
    case <-br.mu.producing:
        return errors.AssertionFailedf("already producing")
    default:
    }
    close(br.mu.producing)
    return nil
}

func (br *bridge) setProduced(result interface{}) error {
    br.mu.Lock()
    defer br.mu.Unlock()
    select {
    case <-br.mu.produced:
        return errors.AssertionFailedf("already produced")
    default:
    }
    br.mu.result = result
    close(br.mu.produced)
    return nil
}

func (br *bridge) setConsuming() error {
    br.mu.Lock()
    defer br.mu.Unlock()
    select {
    case <-br.mu.consuming:
        return errors.New("already consuming")
    default:
    }
    close(br.mu.consuming) // NB: mutex makes this non-racy
    return nil
}

func (br *bridge) waitProducerStart(ctx context.Context) error {
    br.mu.Lock()
    ch := br.mu.producing
    br.mu.Unlock()
    select {
    case <-ctx.Done():
        return errors.Wrapf(ctx.Err(), "producer didn't start in time")
    case <-ch:
    }
    return nil
}

func (br *bridge) waitProducerDone(ctx context.Context) (interface{}, error) {
    br.mu.Lock()
    ch := br.mu.produced
    br.mu.Unlock()
    select {
    case <-ctx.Done():
        return nil, errors.Wrapf(ctx.Err(), "while waiting for result")
    case <-ch:
    }
    return br.mu.result, nil // NB: don't need lock here
}

func (br *bridge) isConsuming() bool {
    br.mu.Lock()
    defer br.mu.Unlock()
    select {
    case <-br.mu.consuming:
        return true
    default:
        return false
    }
}
type inflightChecksumMap map[string]*bridge // TODO needs a lock

func (m inflightChecksumMap) getOrCreate(id string, cancelMe func()) *bridge {
    // TODO m.mu.Lock()
    // TODO defer m.mu.Unlock()
    if br, ok := m[id]; ok {
        old := br.mu.cancelAll
        br.mu.cancelAll = func() {
            old()
            cancelMe()
        }
        return br
    }
    br := &bridge{}
    br.mu.cancelAll = cancelMe
    br.mu.consuming = make(chan struct{})
    br.mu.producing = make(chan struct{})
    br.mu.produced = make(chan struct{})
    m[id] = br
    return br
}

func (m inflightChecksumMap) delete(id string, ptr *bridge) {
    // TODO m.mu.Lock()
    // TODO defer m.mu.Unlock()
    if m[id] != ptr {
        // Shouldn't happen, but if for some reason there's an object
        // here that's not the one we worked on, don't do anything,
        // as there'll be someone else who's in charge of deleting
        // it and it's simpler not to have to consider this case.
        return
    }
    m[id].mu.cancelAll()
    delete(m, id)
}
var m inflightChecksumMap

func computeChecksum(ctx context.Context, id string) error {
    ctx, cancel := context.WithCancel(ctx)
    br := m.getOrCreate(id, cancel)
    defer m.delete(id, br) // see corresponding line in getChecksum for rationale
    if err := br.setProducing(); err != nil {
        return err
    }
    defer time.AfterFunc(5*time.Second, func() { // TODO use stopper instead
        // NB: if we blocked here, we could delay the computation until the
        // long poll has actually shown up. But we will expect it to usually
        // do that, so probably fine as is? But if it doesn't once then it
        // might happen all of the time. So I would lean towards blocking here.
        if !br.isConsuming() {
            cancel()
        }
    }).Stop()
    var result interface{}
    for i := 0; i < 100; i++ {
        // Big compute! But sensitive to ctx cancellation.
        if err := ctx.Err(); err != nil {
            return err
        }
        _ = result
    }
    return br.setProduced(result)
}
func getChecksum(ctx context.Context, id string) (interface{}, error) {
    ctx, cancel := context.WithCancel(ctx)
    br := m.getOrCreate(id, cancel)
    // No matter what happens, at the end of this method this ID isn't tracked
    // anymore (though it might be recreated if the computation starts, but then
    // it will time out waiting for getChecksum and will again disappear).
    defer m.delete(id, br)
    if err := br.setConsuming(); err != nil {
        return nil, err
    }
    {
        const startTimeout = 5 * time.Second
        timeoutCtx, cancel := context.WithTimeout(ctx, startTimeout)
        defer cancel()
        if err := br.waitProducerStart(timeoutCtx); err != nil {
            return nil, err
        }
    }
    return br.waitProducerDone(ctx)
}
Oops, wrong button.
> - avoid special-casing of the first checksum (i.e. run all in parallel); hopefully a small change? Let's explore
> - add code that aborts the inflight computation unless the long-poll arrives within (say) 5s
> - backport the above two
It's unclear to me how we can make this safe to backport -- users can presumably run different minor versions in the same cluster, so wouldn't this effectively prevent all consistency checks initiated by older nodes? I was thinking we'd need a major version gate for this, and tolerate slow callers until all nodes have upgraded.
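For concreteness, a gate like that could look roughly as follows. This is only a sketch: the version key name is made up, and the exact wiring (where the timeout is chosen) is assumed rather than taken from this PR.

// Sketch only: V23_1ChecksumSyncProtocol is a hypothetical cluster version key.
package kvserver

import (
    "context"
    "time"

    "github.com/cockroachdb/cockroach/pkg/clusterversion"
    "github.com/cockroachdb/cockroach/pkg/settings/cluster"
)

// checksumWaitTimeout (hypothetical helper) picks how long to wait for the
// other party of the consistency check to join.
func checksumWaitTimeout(ctx context.Context, st *cluster.Settings) time.Duration {
    if st.Version.IsActive(ctx, clusterversion.V23_1ChecksumSyncProtocol) {
        // Every node understands the synchronized protocol: fail fast.
        return 5 * time.Second
    }
    // Mixed-version cluster: tolerate slow callers from older nodes, as before.
    return consistencyCheckAsyncTimeout
}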
Do you mean the following scenario?
- An old node blocks on the first request, so it will typically be more than 5s until other requests arrive.
- A new node expects a timely arrival of a request, so it times out the consistency check.

Possibly we could backport the sequential->parallel change first, coupled with cancel propagation (for the case when the RPC made it through). It's backwards compatible in this regard. Then we could tackle the timeout behaviour of the task (using the gate etc.).

Another backwards-compatible way (sketched below):
- still don't start the task right away, put it in an in-memory queue;
- allow a longer timeout (same as before);
- let CollectChecksum start it when it arrives.

The unclear part here is: how long is it okay to hold the snapshot that the post-apply handler grabs.
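Very roughly, the queue idea could be sketched like this (all names here are hypothetical; only the general shape is intended):

package sketch

import (
    "context"
    "errors"
    "sync"
    "time"
)

// pendingComputation is what the post-apply handler enqueues: the snapshot has
// already been grabbed, but the computation has not started.
type pendingComputation struct {
    queuedAt time.Time
    start    func(context.Context) error // runs the checksum computation over the snapshot
    cleanup  func()                      // releases the snapshot if nobody ever collects
}

type checksumQueue struct {
    mu      sync.Mutex
    pending map[string]pendingComputation
}

// enqueue is called from the post-apply handler instead of starting the task.
func (q *checksumQueue) enqueue(id string, pc pendingComputation) {
    q.mu.Lock()
    defer q.mu.Unlock()
    q.pending[id] = pc
}

// startOnCollect is called from the CollectChecksum handler; the computation
// only runs once there is a requestor waiting for its result.
func (q *checksumQueue) startOnCollect(ctx context.Context, id string) error {
    q.mu.Lock()
    pc, ok := q.pending[id]
    delete(q.pending, id)
    q.mu.Unlock()
    if !ok {
        return errors.New("no pending checksum computation for this ID")
    }
    return pc.start(ctx)
}

The open question above remains visible here: the snapshot stays pinned from enqueue until either startOnCollect runs or some GC pass calls cleanup.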
Yeah, as we discussed in the meeting earlier today, we should backport this to 22.2 at least, so that we can rely on the behavior when we add timeouts in 23.1. This would also be a strict improvement, in that we'll at least cancel when there is an RPC, and in the common case there will be an RPC when we run them all in parallel.
@tbg @erikgrinaker Ready for review now. Disregard all the previous versions of this PR; that work was factored out and submitted.
Nice! Can we close out #77432 now?
nit: in the release notes, I wouldn't refer to requests or computations, since end users don't know what these internals are. I would reword it to something along the lines of "consistency checks are now properly cancelled on timeout, preventing them from piling up."
Marking as fixed by this PR.
Done.
        },
    ); err != nil {
        log.Errorf(ctx, "checksum collection did not join: %v", err)
    } else if err := contextutil.RunWithTimeout(ctx, taskName, consistencyCheckAsyncTimeout,
        func(ctx context.Context) error {
            defer snap.Close()
bug: need to close this `snap` even if there was an error above
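One possible shape for the fix, as a sketch only: contextutil.RunWithTimeout, log.Errorf, taskName, consistencyCheckAsyncTimeout, and snap come from the excerpt above, while the helper closures, the join timeout, and the function signature are assumptions.

package kvserver

import (
    "context"
    "time"

    "github.com/cockroachdb/cockroach/pkg/storage"
    "github.com/cockroachdb/cockroach/pkg/util/contextutil"
    "github.com/cockroachdb/cockroach/pkg/util/log"
)

// runChecksumTask (hypothetical) restructures the task body so that the
// snapshot is closed on every path, not only when the collection joined.
func runChecksumTask(
    ctx context.Context,
    snap storage.Reader, // the engine snapshot grabbed by the post-apply handler
    taskName string,
    waitForJoin func(context.Context) error, // assumed: blocks until CollectChecksum arrives
    compute func(context.Context) error, // assumed: the checksum computation itself
) {
    // Close the snapshot unconditionally, regardless of errors below.
    defer snap.Close()

    if err := contextutil.RunWithTimeout(ctx, taskName, 5*time.Second, waitForJoin); err != nil {
        log.Errorf(ctx, "checksum collection did not join: %v", err)
        return
    }
    if err := contextutil.RunWithTimeout(ctx, taskName, consistencyCheckAsyncTimeout, compute); err != nil {
        log.Errorf(ctx, "checksum computation failed: %v", err)
    }
}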
@erikgrinaker Is there a way to assert that all snapshots were closed in tests?
Maybe something like:
- At the end of the test, take a new snapshot `s`.
- Wait until the engine's "protected" sequence number (if there is such a thing) reaches that of `s`. If we did not release snapshots, this will time out. (One possible variant is sketched below.)
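A variant that simply asserts at the end of the test that no snapshots remain open. This is a sketch under an assumption: that Pebble's Metrics report an open-snapshot count (the exact field should be checked against the Pebble version in use).

package sketch

import (
    "testing"

    "github.com/cockroachdb/pebble"
    "github.com/cockroachdb/pebble/vfs"
)

func TestNoLeakedSnapshots(t *testing.T) {
    db, err := pebble.Open("", &pebble.Options{FS: vfs.NewMem()})
    if err != nil {
        t.Fatal(err)
    }
    defer db.Close()

    // ... exercise the code path that grabs and is supposed to release snapshots ...
    snap := db.NewSnapshot()
    snap.Close()

    // Assumption: Metrics().Snapshots.Count reflects still-open snapshots.
    if n := db.Metrics().Snapshots.Count; n != 0 {
        t.Fatalf("%d snapshot(s) left open at the end of the test", n)
    }
}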
Ran a quick test, doesn't look like we do. We should. Opened cockroachdb/pebble#1986.
Of course, we'll need a test that actually exercises the code path that leaks the snapshot.
For consistency checks? This bug is fixed, so we could only do it manually (to make sure retrospectively that this bug would have been caught). But there could be tests in Pebble/CRDB for the unhappy case.
Yeah, Pebble should check that the detection works.
Previously, the checksum computation would run until completion unconditionally (unless the collection request comes before it). This is not the best spend of the limited pool capacity, because the result of this computation may never be requested.

After this commit, the checksum computation task is synchronized with the checksum collection request. Both wait at most 5 seconds until the other party has joined. Once joined, the computation starts; otherwise it is skipped.

If any party abandons the request, then the replicaChecksum record is preserved in the state, and is scheduled for a GC later. This is to help the other party to fail fast, instead of waiting, if it arrives late.

Release note (performance improvement): consistency checks are now properly cancelled on timeout, preventing them from piling up.
Consistency checks require synchronization between the checksum computation task and the collection request; the task does not start otherwise. The task is canceled if the collection request is abandoned. These factors removed the possibility of an uncontrolled build-up of abandoned checksum computation tasks competing for resources. All in-flight tasks are waited on by the requestor.

This commit removes the no-longer-needed limit on the number of running tasks, because boundedness is now automatically guaranteed by the way consistency checks are run.

Release note: None
The consistency queue usually has a much lower timeout (depending on the consistency checker rate limit, etc.). This was a fallback for the follower checks that weren't connected to the queue's context and could run forever. Now the task runs iff there is a requestor waiting for its result.

Release note: None
Release note: None
bors r=erikgrinaker
Build failed (retrying...):
Build failed (retrying...):
Build succeeded:
Previously, the checksum computation would run until completion unconditionally
(unless the collection request comes before it). This is not the best spend of
the limited pool capacity, because the result of this computation may never be
requested.
After this commit, the checksum computation task is synchronized with the
checksum collection request. Both wait at most 5 seconds until the other party
has joined. Once joined, the computation starts, otherwise skips.
If any party abandons the request, then the `replicaChecksum` record is preserved in the state, and is scheduled for a GC later. This is to help the other party to fail fast, instead of waiting, if it arrives late.
This change also removes the no longer needed concurrency limit for the tasks,
because tasks are canceled reliably and will not pile up.
Fixes #77432
Release note (performance improvement): consistency checks are now properly
cancelled on timeout, preventing them from piling up.
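To illustrate the "record is preserved for a later GC so the other party fails fast" part of the description above, here is a minimal sketch; the names (checksumTracker, markAbandoned, checkJoin) are hypothetical and do not match the actual replicaChecksum plumbing.

package sketch

import (
    "errors"
    "sync"
    "time"
)

// checksumEntry is kept around after one party gives up, so that a late
// arrival can fail immediately instead of waiting out its own timeout.
type checksumEntry struct {
    abandoned bool
    gcAt      time.Time // when the record itself may be garbage collected
}

type checksumTracker struct {
    mu      sync.Mutex
    entries map[string]*checksumEntry
}

// markAbandoned is called by whichever party times out waiting for the other.
func (t *checksumTracker) markAbandoned(id string, keepFor time.Duration) {
    t.mu.Lock()
    defer t.mu.Unlock()
    t.entries[id] = &checksumEntry{abandoned: true, gcAt: time.Now().Add(keepFor)}
}

// checkJoin is called by a party arriving for checksum id; it fails fast if
// the exchange was already abandoned.
func (t *checksumTracker) checkJoin(id string) error {
    t.mu.Lock()
    defer t.mu.Unlock()
    if e, ok := t.entries[id]; ok && e.abandoned {
        return errors.New("checksum computation was abandoned by the other party")
    }
    return nil
}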