Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

storage: Check for context cancellation in more places #10487

Merged
merged 2 commits into from
Nov 7, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions pkg/storage/replica.go
Original file line number Diff line number Diff line change
Expand Up @@ -1243,6 +1243,22 @@ func (r *Replica) beginCmds(
// a large read-only span but also a write (see #10084).
readOnly := ba.IsReadOnly()

// Check for context cancellation before inserting into the
// command queue (and check again afterward). Once we're in the
// command queue we're committed to waiting on our prerequisites
// (which costs a goroutine, and slightly increases the cost of
// other commands that might wait on our keys), so it's good to
// bail out early if we can.
select {
case <-ctx.Done():
err := ctx.Err()
errStr := fmt.Sprintf("%s before command queue: %s", err, ba)
log.Warning(ctx, errStr)
log.ErrEvent(ctx, errStr)
return nil, err
default:
}

r.cmdQMu.Lock()
chans := r.cmdQMu.global.getWait(readOnly, spansGlobal...)
chans = append(chans, r.cmdQMu.global.getWait(readOnly, spansLocal...)...)
Expand Down
31 changes: 20 additions & 11 deletions pkg/storage/replica_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2069,11 +2069,18 @@ func TestReplicaCommandQueueCancellation(t *testing.T) {
cmd1Done := startBlockingCmd(context.Background(), key1)
<-blockingStart

// Put a cancelled blocking command in the command queue. This command will
// block on the previous command, but will not itself reach the filter since
// its context is cancelled.
// Start a command that is already cancelled. It will return immediately.
{
ctx, cancel := context.WithCancel(context.Background())
cancel()
done := startBlockingCmd(ctx, key1, key2)
if pErr := <-done; !testutils.IsPError(pErr, context.Canceled.Error()) {
t.Fatalf("unexpected error %v", pErr)
}
}

// Start a third command which will wait for the first.
ctx, cancel := context.WithCancel(context.Background())
cancel()
cmd2Done := startBlockingCmd(ctx, key1, key2)

// Wait until both commands are in the command queue.
Expand All @@ -2087,16 +2094,18 @@ func TestReplicaCommandQueueCancellation(t *testing.T) {
return nil
})

// If this deadlocks, the command has unexpectedly begun executing and was
// trapped in the command filter. Indeed, the absence of such a deadlock is
// what's being tested here.
if pErr := <-cmd2Done; !testutils.IsPError(pErr, context.Canceled.Error()) {
// Cancel the third command, then finish the first to unblock it.
cancel()
blockingDone <- struct{}{}
if pErr := <-cmd1Done; pErr != nil {
t.Fatal(pErr)
}

// Finish the previous command, allowing the test to shut down.
blockingDone <- struct{}{}
if pErr := <-cmd1Done; pErr != nil {
// The third command should finish with a context cancellation
// error instead of executing. If it had started executing, it would
// be caught in the command filter and we'd time out reading from
// the channel.
if pErr := <-cmd2Done; !testutils.IsPError(pErr, context.Canceled.Error()) {
t.Fatal(pErr)
}
}
Expand Down
12 changes: 8 additions & 4 deletions pkg/storage/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -2318,7 +2318,7 @@ func (s *Store) Send(
s.mu.Lock()
retryOpts := s.cfg.RangeRetryOptions
s.mu.Unlock()
for r := retry.Start(retryOpts); next(&r); {
for r := retry.StartWithCtx(ctx, retryOpts); next(&r); {
// Get range and add command to the range for execution.
var err error
repl, err = s.GetReplica(ba.RangeID)
Expand Down Expand Up @@ -2423,9 +2423,13 @@ func (s *Store) Send(
return nil, pErr
}

// By default, retries are indefinite. However, some unittests set a
// maximum retry count; return txn retry error for transactional cases
// and the original error otherwise.
// By default, retries are infinite and we'll only get here if the
// context was canceled or timed out. However, some unittests set a
// maximum retry count; return txn retry error for transactional
// cases and the original error otherwise.
if err := ctx.Err(); err != nil {
return nil, roachpb.NewError(err)
}
log.Event(ctx, "store retry limit exceeded") // good to check for if tests fail
if ba.Txn != nil {
pErr = roachpb.NewErrorWithTxn(roachpb.NewTransactionRetryError(), ba.Txn)
Expand Down