Skip to content

Commit

Permalink
kv: collect and propogate all transaction state in DistSender on error
Browse files Browse the repository at this point in the history
Fixes “Bug 2” from #36089 (comment).

This commit fixes the bug described in the referenced issue where part of a
transaction proto update was lost when combining partial batches with some
successes and some failures. This was leading to a state where a transaction
restarted with a read timestamp below its maximum write timestamp from the
previous epoch.

Fixing this bug resolves the current issue in #36089.

Release note: None
  • Loading branch information
nvanbenschoten committed Jun 25, 2019
1 parent a0a6db3 commit 8b5bafb
Show file tree
Hide file tree
Showing 2 changed files with 44 additions and 29 deletions.
20 changes: 15 additions & 5 deletions pkg/kv/dist_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -1125,6 +1125,15 @@ func (ds *DistSender) divideAndSendBatchToRanges(
if resp.pErr != nil {
if pErr == nil {
pErr = resp.pErr
// Update the error's transaction with any new information from
// the batch response. This may contain interesting updates if
// the batch was parallelized and part of it succeeded.
pErr.UpdateTxn(br.Txn)
} else {
// Even though we ignore the second error, update the first
// error's transaction with any new information from the
// second error. This may contain interesting updates.
pErr.UpdateTxn(resp.pErr.GetTxn())
}
continue
}
Expand All @@ -1139,21 +1148,22 @@ func (ds *DistSender) divideAndSendBatchToRanges(
}

// Combine the new response with the existing one (including updating
// the headers).
// the headers) if we haven't yet seen an error.
if pErr == nil {
if err := br.Combine(resp.reply, resp.positions); err != nil {
pErr = roachpb.NewError(err)
}
} else {
// Update the error's transaction with any new information from
// the batch response. This may contain interesting updates if
// the batch was parallelized and part of it succeeded.
pErr.UpdateTxn(resp.reply.Txn)
}
}

// If we experienced an error, don't neglect to update the error's
// attached transaction with any responses which were received.
if pErr != nil {
// Update the error's transaction with any new information from
// the batch response. This may contain interesting updates if
// the batch was parallelized and part of it succeeded.
pErr.UpdateTxn(br.Txn)
// If this is a write batch with any successful responses, but
// we're ultimately returning an error, wrap the error with a
// MixedSuccessError.
Expand Down
53 changes: 29 additions & 24 deletions pkg/kv/dist_sender_server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1638,31 +1638,34 @@ func TestBadRequest(t *testing.T) {
}

// TestPropagateTxnOnError verifies that DistSender.Send properly propagates the
// txn data to a next iteration. Use the txn.ObservedTimestamps field to verify
// that.
// txn data to a next iteration. The test uses the txn.ObservedTimestamps field
// to verify that.
func TestPropagateTxnOnError(t *testing.T) {
defer leaktest.AfterTest(t)()

// Inject this observed timestamp into the part of the batch's response that
// does not result in an error. Even though the batch as a whole results in
// an error, the transaction should still propagate this information.
observedTS := roachpb.ObservedTimestamp{
NodeID: 7, Timestamp: hlc.Timestamp{WallTime: 15},
}
containsObservedTS := func(txn *roachpb.Transaction) bool {
for _, ts := range txn.ObservedTimestamps {
if ts.Equal(observedTS) {
return true
// Inject these two observed timestamps into the parts of the batch's
// response that does not result in an error. Even though the batch as a
// whole results in an error, the transaction should still propagate this
// information.
ot1 := roachpb.ObservedTimestamp{NodeID: 7, Timestamp: hlc.Timestamp{WallTime: 15}}
ot2 := roachpb.ObservedTimestamp{NodeID: 8, Timestamp: hlc.Timestamp{WallTime: 16}}
containsObservedTSs := func(txn *roachpb.Transaction) bool {
contains := func(ot roachpb.ObservedTimestamp) bool {
for _, ts := range txn.ObservedTimestamps {
if ts.Equal(ot) {
return true
}
}
return false
}
return false
return contains(ot1) && contains(ot2)
}

// Set up a filter to so that the first CPut operation will
// get a ReadWithinUncertaintyIntervalError and so that the
// Put operation will return with the new observed timestamp.
keyA := roachpb.Key("a")
keyB := roachpb.Key("b")
// Put operations on either side of the CPut will each return
// with the new observed timestamp.
keyA, keyB, keyC := roachpb.Key("a"), roachpb.Key("b"), roachpb.Key("c")
var numCPuts int32
var storeKnobs storage.StoreTestingKnobs
storeKnobs.EvalKnobs.TestingEvalFilter =
Expand All @@ -1671,7 +1674,9 @@ func TestPropagateTxnOnError(t *testing.T) {
switch fArgs.Req.(type) {
case *roachpb.PutRequest:
if k.Equal(keyA) {
fArgs.Hdr.Txn.ObservedTimestamps = append(fArgs.Hdr.Txn.ObservedTimestamps, observedTS)
fArgs.Hdr.Txn.UpdateObservedTimestamp(ot1.NodeID, ot1.Timestamp)
} else if k.Equal(keyC) {
fArgs.Hdr.Txn.UpdateObservedTimestamp(ot2.NodeID, ot2.Timestamp)
}
case *roachpb.ConditionalPutRequest:
if k.Equal(keyB) {
Expand All @@ -1692,7 +1697,7 @@ func TestPropagateTxnOnError(t *testing.T) {
defer s.Stopper().Stop(ctx)

db := s.DB()
if err := setupMultipleRanges(ctx, db, "b"); err != nil {
if err := setupMultipleRanges(ctx, db, "b", "c"); err != nil {
t.Fatal(err)
}

Expand All @@ -1702,10 +1707,9 @@ func TestPropagateTxnOnError(t *testing.T) {
t.Fatal(err)
}

// The following txn creates a batch request that is split
// into two requests: Put and CPut. The CPut operation will
// get a ReadWithinUncertaintyIntervalError and the txn will be
// retried.
// The following txn creates a batch request that is split into three
// requests: Put, CPut, and Put. The CPut operation will get a
// ReadWithinUncertaintyIntervalError and the txn will be retried.
epoch := 0
if err := db.Txn(ctx, func(ctx context.Context, txn *client.Txn) error {
// Observe the commit timestamp to prevent refreshes.
Expand All @@ -1716,20 +1720,21 @@ func TestPropagateTxnOnError(t *testing.T) {
if epoch >= 2 {
// ObservedTimestamps must contain the timestamp returned from the
// Put operation.
if !containsObservedTS(proto) {
if !containsObservedTSs(proto) {
t.Errorf("expected observed timestamp, found: %v", proto.ObservedTimestamps)
}
} else {
// ObservedTimestamps must not contain the timestamp returned from
// the Put operation.
if containsObservedTS(proto) {
if containsObservedTSs(proto) {
t.Errorf("unexpected observed timestamp, found: %v", proto.ObservedTimestamps)
}
}

b := txn.NewBatch()
b.Put(keyA, "val")
b.CPut(keyB, "new_val", origVal)
b.Put(keyC, "val2")
err := txn.CommitInBatch(ctx, b)
if epoch == 1 {
if retErr, ok := err.(*roachpb.TransactionRetryWithProtoRefreshError); ok {
Expand Down

0 comments on commit 8b5bafb

Please sign in to comment.