Skip to content

Commit

Permalink
kv: detect if missing intent is due to intent resolution during paral…
Browse files Browse the repository at this point in the history
…lel commit

Fixes cockroachdb#37866.

This commit implements the medium-term solution to cockroachdb#37866 proposed and modeled
in cockroachdb#37900. The solution is to catch `IntentMissingErrors` in `DistSender`'s
`divideAndSendParallelCommit` method coming from a parallel commit's pre-commit
QueryIntent batch. When we see one of these errors, we immediately send a
`QueryTxn` request to the transaction record. This will result in one of the
four statuses:
1. PENDING: Unexpected because the parallel commit `EndTransactionRequest` succeeded. Ignore.
2. STAGING: Unambiguously not the issue from cockroachdb#37866. Ignore.
3. COMMITTED: Unambiguously the issue from cockroachdb#37866. Strip the error and return the updated proto.
4. ABORTED: Still ambiguous. Transform error into an AmbiguousCommitError and return.

This solution isolates the ambiguity caused by the loss of information during
intent resolution to just the case where the result of the QueryTxn is ABORTED.
This is because an ABORTED record can mean either 1) the transaction was ABORTED
and the missing intent was removed or 2) the transaction was COMMITTED, all
intents were resolved, and the transaction record was GCed.

Release note: None
  • Loading branch information
nvanbenschoten committed Jun 4, 2019
1 parent 5a17b3d commit a83d408
Show file tree
Hide file tree
Showing 2 changed files with 230 additions and 11 deletions.
107 changes: 99 additions & 8 deletions pkg/kv/dist_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -914,14 +914,31 @@ func (ds *DistSender) divideAndSendParallelCommit(
}
if qiPErr := qiReply.pErr; qiPErr != nil {
// The batch with the pre-commit QueryIntent requests returned an error.
// Wrap this in a MixedSuccessError, as we know that the EndTransaction
// batch succeeded. It is not possible for qiPErr to be a MixedSuccessError
// itself, so we don't need to handle that case like we do down below.
qiPErr.UpdateTxn(ba.Txn)
maybeSwapErrorIndex(qiPErr, swapIdx, lastIdx)
pErr := roachpb.NewError(&roachpb.MixedSuccessError{Wrapped: qiPErr})
pErr.Index = qiPErr.Index
return nil, pErr
ignoreMissing := false
if _, ok := qiPErr.GetDetail().(*roachpb.IntentMissingError); ok {
// If the error is an IntentMissingError, detect whether this is due
// to intent resolution and can be safely ignored.
ignoreMissing, err = ds.detectIntentMissingDueToIntentResolution(ctx, br.Txn)
if err != nil {
return nil, roachpb.NewError(err)
}
}
if !ignoreMissing {
// Wrap this in a MixedSuccessError, as we know that the EndTransaction
// batch succeeded. It is not possible for qiPErr to be a MixedSuccessError
// itself, so we don't need to handle that case like we do down below.
qiPErr.UpdateTxn(br.Txn)
maybeSwapErrorIndex(qiPErr, swapIdx, lastIdx)
pErr := roachpb.NewError(&roachpb.MixedSuccessError{Wrapped: qiPErr})
pErr.Index = qiPErr.Index
return nil, pErr
}
// Populate the pre-commit QueryIntent batch response. If we made it
// here then we know we can ignore intent missing errors.
qiReply.reply = qiBa.CreateReply()
for _, ru := range qiReply.reply.Responses {
ru.GetQueryIntent().FoundIntent = true
}
}

// Both halves of the split batch succeeded. Piece them back together.
Expand All @@ -935,6 +952,80 @@ func (ds *DistSender) divideAndSendParallelCommit(
return br, nil
}

// detectIntentMissingDueToIntentResolution attempts to detect whether a missing
// intent error thrown by a pre-commit QueryIntent request was due to intent
// resolution after the transaction was already finalized instead of due to a
// failure of the corresponding pipelined write. It is possible for these two
// situations to be confused because the pre-commit QueryIntent requests are
// issued in parallel with the staging EndTransaction request and may evaluate
// after the transaction becomes implicitly committed. If this happens and a
// concurrent transaction observes the implicit commit and makes the commit
// explicit, it is allowed to begin resolving the transactions intents.
//
// MVCC values don't remember their transaction once they have been resolved.
// This loss of information means that QueryIntent returns an intent missing
// error if it finds the resolved value that correspond to its desired intent.
// Because of this, the race discussed above can result in intent missing errors
// during a parallel commit even when the transaction successfully committed.
//
// This method queries the transaction record to determine whether an intent
// missing error was caused by this race or whether the intent missing error
// is real and guarantees that the transaction is not implicitly committed.
//
// See #37866 (issue) and #37900 (corresponding tla+ update).
func (ds *DistSender) detectIntentMissingDueToIntentResolution(
ctx context.Context, txn *roachpb.Transaction,
) (bool, error) {
ba := roachpb.BatchRequest{}
ba.Timestamp = ds.clock.Now()
ba.Add(&roachpb.QueryTxnRequest{
RequestHeader: roachpb.RequestHeader{
Key: txn.TxnMeta.Key,
},
Txn: txn.TxnMeta,
})
log.VEvent(ctx, 1, "detecting whether missing intent is due to intent resolution")
br, pErr := ds.Send(ctx, ba)
if pErr != nil {
// We weren't able to determine whether the intent missing error is
// due to intent resolution or not, so it is still ambiguous whether
// the commit succeeded.
return false, roachpb.NewAmbiguousResultError(fmt.Sprintf("error=%s [intent missing]", pErr))
}
respTxn := &br.Responses[0].GetQueryTxn().QueriedTxn
switch respTxn.Status {
case roachpb.COMMITTED:
// The transaction has already been finalized as committed. The missing
// intent error must have been a result of a concurrent transaction
// recovery finding the transaction in the implicit commit state and
// resolving one of its intents before the pre-commit QueryIntent
// queried that intent. We know that the transaction was committed
// successfully, so ignore the error.
return true, nil
case roachpb.ABORTED:
// The transaction has either already been finalized as aborted or has
// been finalized as committed and already had its transaction record
// GCed. We can't distinguish between these two conditions with full
// certainty, so we're forced to return an ambiguous commit error.
// TODO(nvanbenschoten): QueryTxn will materialize an ABORTED transaction
// record if one does not already exist. If we are certain that no actor
// will ever persist an ABORTED transaction record after a COMMIT record is
// GCed and we returned whether the record was synthesized in the QueryTxn
// response then we could use the existence of an ABORTED transaction record
// to further isolates the ambiguity caused by the loss of information
// during intent resolution. If this error becomes a problem, we can explore
// this option.
return false, roachpb.NewAmbiguousResultError("intent missing and record aborted")
default:
// The transaction has not been finalized yet, so the missing intent
// error must have been caused by a real missing intent. Propagate the
// missing intent error.
// NB: we don't expect the record to be PENDING at this point, but it's
// not worth making any hard assertions about what we get back here.
return false, nil
}
}

// maybeSwapErrorIndex swaps the error index from a to b or b to a if the
// error's index is set and is equal to one of these to values.
func maybeSwapErrorIndex(pErr *roachpb.Error, a, b int) {
Expand Down
134 changes: 131 additions & 3 deletions pkg/kv/dist_sender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"context"
"fmt"
"reflect"
"regexp"
"sort"
"strconv"
"sync/atomic"
Expand Down Expand Up @@ -2191,7 +2192,8 @@ func TestMultiRangeWithEndTransaction(t *testing.T) {
var testFn simpleSendFn = func(
_ context.Context,
_ SendOptions,
_ ReplicaSlice, ba roachpb.BatchRequest,
_ ReplicaSlice,
ba roachpb.BatchRequest,
) (*roachpb.BatchResponse, error) {
var cur []roachpb.Method
for _, union := range ba.Requests {
Expand Down Expand Up @@ -2324,7 +2326,8 @@ func TestParallelCommitSplitFromQueryIntents(t *testing.T) {
var testFn simpleSendFn = func(
_ context.Context,
_ SendOptions,
_ ReplicaSlice, ba roachpb.BatchRequest,
_ ReplicaSlice,
ba roachpb.BatchRequest,
) (*roachpb.BatchResponse, error) {
var cur []roachpb.Method
for _, union := range ba.Requests {
Expand All @@ -2346,7 +2349,7 @@ func TestParallelCommitSplitFromQueryIntents(t *testing.T) {
ds := NewDistSender(cfg, g)
ds.DisableParallelBatches()

// Send a batch request containing two puts.
// Send a batch request containing the requests.
var ba roachpb.BatchRequest
ba.Txn = &roachpb.Transaction{Name: "test"}
ba.Add(test.reqs...)
Expand All @@ -2364,6 +2367,131 @@ func TestParallelCommitSplitFromQueryIntents(t *testing.T) {
}
}

// TestParallelCommitsDetectIntentMissingCause tests the functionality in
// DistSender.detectIntentMissingDueToIntentResolution.
func TestParallelCommitsDetectIntentMissingCause(t *testing.T) {
defer leaktest.AfterTest(t)()
stopper := stop.NewStopper()
defer stopper.Stop(context.TODO())

clock := hlc.NewClock(hlc.UnixNano, time.Nanosecond)
rpcContext := rpc.NewInsecureTestingContext(clock, stopper)
g := makeGossip(t, stopper, rpcContext)

key := roachpb.Key("a")
txn := roachpb.MakeTransaction(
"test", key, roachpb.NormalUserPriority,
clock.Now(), clock.MaxOffset().Nanoseconds(),
)

testCases := []struct {
name string
queryTxnFn func() (roachpb.TransactionStatus, error)
expErr string
}{
{
name: "transaction record PENDING, real intent missing error",
queryTxnFn: func() (roachpb.TransactionStatus, error) {
return roachpb.PENDING, nil
},
expErr: "the batch experienced mixed success and failure: intent missing",
},
{
name: "transaction record STAGING, real intent missing error",
queryTxnFn: func() (roachpb.TransactionStatus, error) {
return roachpb.STAGING, nil
},
expErr: "the batch experienced mixed success and failure: intent missing",
},
{
name: "transaction record COMMITTED, intent missing error caused by intent resolution",
queryTxnFn: func() (roachpb.TransactionStatus, error) {
return roachpb.COMMITTED, nil
},
},
{
name: "transaction record ABORTED, ambiguous intent missing error",
queryTxnFn: func() (roachpb.TransactionStatus, error) {
return roachpb.ABORTED, nil
},
expErr: "result is ambiguous (intent missing and record aborted)",
},
{
name: "QueryTxn error, unresolved ambiguity",
queryTxnFn: func() (roachpb.TransactionStatus, error) {
return 0, errors.New("unable to query txn")
},
expErr: "result is ambiguous (error=unable to query txn [intent missing])",
},
}
for _, test := range testCases {
t.Run(test.name, func(t *testing.T) {
var testFn simpleSendFn = func(
_ context.Context,
_ SendOptions,
_ ReplicaSlice,
ba roachpb.BatchRequest,
) (*roachpb.BatchResponse, error) {
br := ba.CreateReply()
switch ba.Requests[0].GetInner().Method() {
case roachpb.QueryIntent:
br.Error = roachpb.NewError(roachpb.NewIntentMissingError(key, nil))
case roachpb.QueryTxn:
status, err := test.queryTxnFn()
if err != nil {
br.Error = roachpb.NewError(err)
} else {
respTxn := txn
respTxn.Status = status
br.Responses[0].GetQueryTxn().QueriedTxn = respTxn
}
case roachpb.EndTransaction:
br.Txn = ba.Txn.Clone()
br.Txn.Status = roachpb.STAGING
}
return br, nil
}

cfg := DistSenderConfig{
AmbientCtx: log.AmbientContext{Tracer: tracing.NewTracer()},
Clock: clock,
RPCContext: rpcContext,
TestingKnobs: ClientTestingKnobs{
TransportFactory: adaptSimpleTransport(testFn),
},
RangeDescriptorDB: defaultMockRangeDescriptorDB,
}
ds := NewDistSender(cfg, g)

// Send a parallel commit batch request.
var ba roachpb.BatchRequest
ba.Txn = txn.Clone()
ba.Add(&roachpb.QueryIntentRequest{
RequestHeader: roachpb.RequestHeader{Key: key},
Txn: txn.TxnMeta,
ErrorIfMissing: true,
})
ba.Add(&roachpb.EndTransactionRequest{
RequestHeader: roachpb.RequestHeader{Key: key},
Commit: true,
InFlightWrites: []roachpb.SequencedWrite{{Key: key, Sequence: 1}},
})

// Verify that the response is expected.
_, pErr := ds.Send(context.Background(), ba)
if test.expErr == "" {
if pErr != nil {
t.Fatalf("unexpected error %v", pErr)
}
} else {
if !testutils.IsPError(pErr, regexp.QuoteMeta(test.expErr)) {
t.Fatalf("expected error %q; found %v", test.expErr, pErr)
}
}
})
}
}

func TestCountRanges(t *testing.T) {
defer leaktest.AfterTest(t)()
stopper := stop.NewStopper()
Expand Down

0 comments on commit a83d408

Please sign in to comment.