Skip to content

Commit

Permalink
Merge pull request #10487 from bdarnell/check-context
Browse files Browse the repository at this point in the history
storage: Check for context cancellation in more places
  • Loading branch information
bdarnell authored Nov 7, 2016
2 parents 09c0f88 + 27bdb07 commit 278e8b1
Show file tree
Hide file tree
Showing 3 changed files with 44 additions and 15 deletions.
16 changes: 16 additions & 0 deletions pkg/storage/replica.go
Original file line number Diff line number Diff line change
Expand Up @@ -1243,6 +1243,22 @@ func (r *Replica) beginCmds(
// a large read-only span but also a write (see #10084).
readOnly := ba.IsReadOnly()

// Check for context cancellation before inserting into the
// command queue (and check again afterward). Once we're in the
// command queue we're committed to waiting on our prerequisites
// (which costs a goroutine, and slightly increases the cost of
// other commands that might wait on our keys), so it's good to
// bail out early if we can.
select {
case <-ctx.Done():
err := ctx.Err()
errStr := fmt.Sprintf("%s before command queue: %s", err, ba)
log.Warning(ctx, errStr)
log.ErrEvent(ctx, errStr)
return nil, err
default:
}

r.cmdQMu.Lock()
chans := r.cmdQMu.global.getWait(readOnly, spansGlobal...)
chans = append(chans, r.cmdQMu.global.getWait(readOnly, spansLocal...)...)
Expand Down
31 changes: 20 additions & 11 deletions pkg/storage/replica_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2069,11 +2069,18 @@ func TestReplicaCommandQueueCancellation(t *testing.T) {
cmd1Done := startBlockingCmd(context.Background(), key1)
<-blockingStart

// Put a cancelled blocking command in the command queue. This command will
// block on the previous command, but will not itself reach the filter since
// its context is cancelled.
// Start a command that is already cancelled. It will return immediately.
{
ctx, cancel := context.WithCancel(context.Background())
cancel()
done := startBlockingCmd(ctx, key1, key2)
if pErr := <-done; !testutils.IsPError(pErr, context.Canceled.Error()) {
t.Fatalf("unexpected error %v", pErr)
}
}

// Start a third command which will wait for the first.
ctx, cancel := context.WithCancel(context.Background())
cancel()
cmd2Done := startBlockingCmd(ctx, key1, key2)

// Wait until both commands are in the command queue.
Expand All @@ -2087,16 +2094,18 @@ func TestReplicaCommandQueueCancellation(t *testing.T) {
return nil
})

// If this deadlocks, the command has unexpectedly begun executing and was
// trapped in the command filter. Indeed, the absence of such a deadlock is
// what's being tested here.
if pErr := <-cmd2Done; !testutils.IsPError(pErr, context.Canceled.Error()) {
// Cancel the third command, then finish the first to unblock it.
cancel()
blockingDone <- struct{}{}
if pErr := <-cmd1Done; pErr != nil {
t.Fatal(pErr)
}

// Finish the previous command, allowing the test to shut down.
blockingDone <- struct{}{}
if pErr := <-cmd1Done; pErr != nil {
// The third command should finish with a context cancellation
// error instead of executing. If it had started executing, it would
// be caught in the command filter and we'd time out reading from
// the channel.
if pErr := <-cmd2Done; !testutils.IsPError(pErr, context.Canceled.Error()) {
t.Fatal(pErr)
}
}
Expand Down
12 changes: 8 additions & 4 deletions pkg/storage/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -2318,7 +2318,7 @@ func (s *Store) Send(
s.mu.Lock()
retryOpts := s.cfg.RangeRetryOptions
s.mu.Unlock()
for r := retry.Start(retryOpts); next(&r); {
for r := retry.StartWithCtx(ctx, retryOpts); next(&r); {
// Get range and add command to the range for execution.
var err error
repl, err = s.GetReplica(ba.RangeID)
Expand Down Expand Up @@ -2423,9 +2423,13 @@ func (s *Store) Send(
return nil, pErr
}

// By default, retries are indefinite. However, some unittests set a
// maximum retry count; return txn retry error for transactional cases
// and the original error otherwise.
// By default, retries are infinite and we'll only get here if the
// context was canceled or timed out. However, some unittests set a
// maximum retry count; return txn retry error for transactional
// cases and the original error otherwise.
if err := ctx.Err(); err != nil {
return nil, roachpb.NewError(err)
}
log.Event(ctx, "store retry limit exceeded") // good to check for if tests fail
if ba.Txn != nil {
pErr = roachpb.NewErrorWithTxn(roachpb.NewTransactionRetryError(), ba.Txn)
Expand Down

0 comments on commit 278e8b1

Please sign in to comment.