Skip to content

Commit

Permalink
storage: Check context cancellation before the command queue
Browse files Browse the repository at this point in the history
  • Loading branch information
bdarnell committed Nov 7, 2016
1 parent 6996001 commit 27bdb07
Show file tree
Hide file tree
Showing 3 changed files with 38 additions and 13 deletions.
16 changes: 16 additions & 0 deletions pkg/storage/replica.go
Original file line number Diff line number Diff line change
Expand Up @@ -1243,6 +1243,22 @@ func (r *Replica) beginCmds(
// a large read-only span but also a write (see #10084).
readOnly := ba.IsReadOnly()

// Check for context cancellation before inserting into the
// command queue (and check again afterward). Once we're in the
// command queue we're committed to waiting on our prerequisites
// (which costs a goroutine, and slightly increases the cost of
// other commands that might wait on our keys), so it's good to
// bail out early if we can.
select {
case <-ctx.Done():
err := ctx.Err()
errStr := fmt.Sprintf("%s before command queue: %s", err, ba)
log.Warning(ctx, errStr)
log.ErrEvent(ctx, errStr)
return nil, err
default:
}

r.cmdQMu.Lock()
chans := r.cmdQMu.global.getWait(readOnly, spansGlobal...)
chans = append(chans, r.cmdQMu.global.getWait(readOnly, spansLocal...)...)
Expand Down
31 changes: 20 additions & 11 deletions pkg/storage/replica_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2069,11 +2069,18 @@ func TestReplicaCommandQueueCancellation(t *testing.T) {
cmd1Done := startBlockingCmd(context.Background(), key1)
<-blockingStart

// Put a cancelled blocking command in the command queue. This command will
// block on the previous command, but will not itself reach the filter since
// its context is cancelled.
// Start a command that is already cancelled. It will return immediately.
{
ctx, cancel := context.WithCancel(context.Background())
cancel()
done := startBlockingCmd(ctx, key1, key2)
if pErr := <-done; !testutils.IsPError(pErr, context.Canceled.Error()) {
t.Fatalf("unexpected error %v", pErr)
}
}

// Start a third command which will wait for the first.
ctx, cancel := context.WithCancel(context.Background())
cancel()
cmd2Done := startBlockingCmd(ctx, key1, key2)

// Wait until both commands are in the command queue.
Expand All @@ -2087,16 +2094,18 @@ func TestReplicaCommandQueueCancellation(t *testing.T) {
return nil
})

// If this deadlocks, the command has unexpectedly begun executing and was
// trapped in the command filter. Indeed, the absence of such a deadlock is
// what's being tested here.
if pErr := <-cmd2Done; !testutils.IsPError(pErr, context.Canceled.Error()) {
// Cancel the third command, then finish the first to unblock it.
cancel()
blockingDone <- struct{}{}
if pErr := <-cmd1Done; pErr != nil {
t.Fatal(pErr)
}

// Finish the previous command, allowing the test to shut down.
blockingDone <- struct{}{}
if pErr := <-cmd1Done; pErr != nil {
// The third command should finish with a context cancellation
// error instead of executing. If it had started executing, it would
// be caught in the command filter and we'd time out reading from
// the channel.
if pErr := <-cmd2Done; !testutils.IsPError(pErr, context.Canceled.Error()) {
t.Fatal(pErr)
}
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/storage/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -2427,8 +2427,8 @@ func (s *Store) Send(
// context was canceled or timed out. However, some unittests set a
// maximum retry count; return txn retry error for transactional
// cases and the original error otherwise.
if ctx.Err() != nil {
return nil, roachpb.NewError(ctx.Err())
if err := ctx.Err(); err != nil {
return nil, roachpb.NewError(err)
}
log.Event(ctx, "store retry limit exceeded") // good to check for if tests fail
if ba.Txn != nil {
Expand Down

0 comments on commit 27bdb07

Please sign in to comment.