Skip to content

Commit

Permalink
kv: track intents when response contains ABORTED transaction
Browse files Browse the repository at this point in the history
Previously, we were clearing the in-flight write set as soon as a
COMMITTED transaction or an ABORTED transaction was seen in a batch
response. This seems ok to do for COMMITTED transactions, but we really
shouldn't risk leaking intents when an ABORTED transaction is observed,
especially because we may want to go back and clean up its intents
later.

I don't know when this would come up because I don't think any requests
return ABORTED txs instead of returning a TransactionAbortedError, but
the commit adds a unit test around the behavior anyway.

Release note: None
  • Loading branch information
nvanbenschoten committed Aug 16, 2019
1 parent 7f869e5 commit 5beb210
Show file tree
Hide file tree
Showing 2 changed files with 72 additions and 6 deletions.
23 changes: 18 additions & 5 deletions pkg/kv/txn_interceptor_pipeliner.go
Original file line number Diff line number Diff line change
Expand Up @@ -465,11 +465,24 @@ func (tp *txnPipeliner) updateWriteTracking(
return
}

// If the transaction is no longer pending, clear the in-flight writes set
// and immediately return.
// TODO(nvanbenschoten): Do we have to handle missing Txn's anymore?
if br.Txn.Status != roachpb.PENDING {
tp.ifWrites.clear(false /* reuse */)
// Similarly, if the transaction is now finalized, we don't need to
// accurately updated the write tracking.
if br.Txn.Status.IsFinalized() {
switch br.Txn.Status {
case roachpb.ABORTED:
// If the transaction is now ABORTED, add all intent writes from
// the batch directly to the write footprint. We don't know which
// of these succeeded.
ba.IntentSpanIterate(nil, tp.footprint.insert)
case roachpb.COMMITTED:
// If the transaction is now COMMITTED, it must not have any more
// in-flight writes, so clear them. Technically we should move all
// of these to the write footprint, but since the transaction is
// already committed, there's no reason to.
tp.ifWrites.clear(false /* reuse */)
default:
panic("unexpected")
}
return
}

Expand Down
55 changes: 54 additions & 1 deletion pkg/kv/txn_interceptor_pipeliner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -683,7 +683,7 @@ func TestTxnPipelinerTransactionAbort(t *testing.T) {
br, pErr = tp.SendLocked(ctx, ba)
require.Nil(t, pErr)
require.NotNil(t, br)
require.Equal(t, 0, tp.ifWrites.len())
require.Equal(t, 1, tp.ifWrites.len()) // nothing proven
}

// TestTxnPipelinerEpochIncrement tests that a txnPipeliner's in-flight write
Expand Down Expand Up @@ -1142,3 +1142,56 @@ func TestTxnPipelinerMaxBatchSize(t *testing.T) {
require.NotNil(t, br)
require.Equal(t, 2, tp.ifWrites.len())
}

// TestTxnPipelinerRecordsWritesOnFailure tests that even when a request returns
// with an ABORTED transaction status or an error, the intent writes it attempted
// to perform are added to the write footprint.
func TestTxnPipelinerRecordsWritesOnFailure(t *testing.T) {
defer leaktest.AfterTest(t)()
ctx := context.Background()
tp, mockSender := makeMockTxnPipeliner()

txn := makeTxnProto()
keyA, keyB := roachpb.Key("a"), roachpb.Key("b")

// Return an error for a write.
var ba roachpb.BatchRequest
ba.Header = roachpb.Header{Txn: &txn}
ba.Add(&roachpb.PutRequest{RequestHeader: roachpb.RequestHeader{Key: keyA}})

mockPErr := roachpb.NewErrorf("boom")
mockSender.MockSend(func(ba roachpb.BatchRequest) (*roachpb.BatchResponse, *roachpb.Error) {
require.Len(t, ba.Requests, 1)
require.True(t, ba.AsyncConsensus)
require.IsType(t, &roachpb.PutRequest{}, ba.Requests[0].GetInner())

return nil, mockPErr
})

br, pErr := tp.SendLocked(ctx, ba)
require.Nil(t, br)
require.Equal(t, mockPErr, pErr)
require.Equal(t, 0, tp.ifWrites.len())
require.Len(t, tp.footprint.asSlice(), 1)

// Return an ABORTED transaction record for a write.
ba.Requests = nil
ba.Add(&roachpb.PutRequest{RequestHeader: roachpb.RequestHeader{Key: keyB}})

mockSender.MockSend(func(ba roachpb.BatchRequest) (*roachpb.BatchResponse, *roachpb.Error) {
require.Len(t, ba.Requests, 1)
require.True(t, ba.AsyncConsensus)
require.IsType(t, &roachpb.PutRequest{}, ba.Requests[0].GetInner())

br = ba.CreateReply()
br.Txn = ba.Txn
br.Txn.Status = roachpb.ABORTED
return br, nil
})

br, pErr = tp.SendLocked(ctx, ba)
require.Nil(t, pErr)
require.NotNil(t, br)
require.Equal(t, 0, tp.ifWrites.len())
require.Len(t, tp.footprint.asSlice(), 2)
}

0 comments on commit 5beb210

Please sign in to comment.