From 5bf56969ac7294f893b1b3b8a2d08204026c854c Mon Sep 17 00:00:00 2001
From: Alex Sarkesian
Date: Wed, 2 Aug 2023 02:46:00 -0400
Subject: [PATCH] kvcoord: refactor ambiguous commit tests

In #107323, a test for the ambiguous write case that leads to the
"transaction unexpectedly committed" bug was introduced; however, to
increase test coverage of the fix, multiple schedules of operations
need to be tested. This change refactors the framework of the existing
test in order to enable the addition of multiple subtests. The subtests
are included in a separate patch.

Part of: #103817

Release note: None
---
 .../kvcoord/dist_sender_ambiguous_test.go     | 464 ++++++++++--------
 1 file changed, 254 insertions(+), 210 deletions(-)

diff --git a/pkg/kv/kvclient/kvcoord/dist_sender_ambiguous_test.go b/pkg/kv/kvclient/kvcoord/dist_sender_ambiguous_test.go
index f0abcd437c65..799bbc5fd92c 100644
--- a/pkg/kv/kvclient/kvcoord/dist_sender_ambiguous_test.go
+++ b/pkg/kv/kvclient/kvcoord/dist_sender_ambiguous_test.go
@@ -60,11 +60,28 @@ type interceptedReq struct {
 }
 
 func (req *interceptedReq) String() string {
-	return fmt.Sprintf("(%s) n%d->n%d:r%d/%d",
-		req.txnName, req.fromNodeID, req.toNodeID, req.toRangeID, req.toReplicaID,
+	return fmt.Sprintf("%s(%s) n%d->n%d:r%d/%d",
+		req.prefix, req.txnName, req.fromNodeID, req.toNodeID, req.toRangeID, req.toReplicaID,
 	)
 }
 
+// pauseUntil blocks on untilCh, logging before and after.
+func (req *interceptedReq) pauseUntil(t *testing.T, untilCh chan struct{}, cp InterceptPoint) {
+	t.Logf("%s ‹%s› paused %s", req, req.ba.Summary(), cp)
+	<-untilCh
+	t.Logf("%s ‹%s› unpaused", req, req.ba.Summary())
+}
+
+// pauseUntilFirst blocks until the first of c1/c2 is ready, logging before and after.
+func (req *interceptedReq) pauseUntilFirst(t *testing.T, c1, c2 chan struct{}, cp InterceptPoint) {
+	t.Logf("%s ‹%s› paused %s", req, req.ba.Summary(), cp)
+	select {
+	case <-c1:
+	case <-c2:
+	}
+	t.Logf("%s ‹%s› unpaused", req, req.ba.Summary())
+}
+
 type interceptedResp struct {
 	br  *kvpb.BatchResponse
 	err error
@@ -157,6 +174,17 @@ const (
 	AfterSending
 )
 
+func (cp InterceptPoint) String() string {
+	switch cp {
+	case BeforeSending:
+		return "before send"
+	case AfterSending:
+		return "after send"
+	default:
+		panic(fmt.Sprintf("unknown InterceptPoint: %d", cp))
+	}
+}
+
 // interceptorHelperMutex represents a convenience structure so that tests or
 // subtests using the interceptingTransport can organize the interception and
 // request/response sequencing logic alongside the logic of the test itself.
@@ -166,16 +194,23 @@ const (
 // modified after the test cluster has started.
 type interceptorHelperMutex struct {
 	syncutil.RWMutex
+	interceptorTestConfig
+}
+
+// interceptorTestConfig is the inner shared state of interceptorHelperMutex.
+type interceptorTestConfig struct {
+	*testing.T
+
+	// lastInterceptedOpID is a counter that is incremented for each request r
+	// for which filter(r) returns true, for the purposes of debugging.
+	// Incremented atomically by concurrent operations holding the read lock.
+	lastInterceptedOpID int64
 
 	// filter defines a function that should return true for requests the test
 	// cares about - this includes logging, blocking, or overriding responses. All
 	// requests that return true should be logged.
 	filter func(req *interceptedReq) (isObservedReq bool)
 
-	// willPause defines a function that should return true for the requests that
-	// will block before or after sending. Intended for logging purposes only.
- willPause func(req *interceptedReq) (willBlock bool) - // maybeWait defines a function within which the actual sequencing of requests // and responses can occur, by blocking until conditions are met. If a non-nil // error is returned, it will be returned to the DistSender, otherwise the @@ -183,6 +218,26 @@ type interceptorHelperMutex struct { maybeWait func(cp InterceptPoint, req *interceptedReq, resp *interceptedResp) (override error) } +// configureSubTest is a utility for the interceptorHelperMutex so that all +// test logging and assertions performed by the interceptor can be tied to the +// particular active subtest. Returns a function to restore the original +// configuration on teardown. +func (tMu *interceptorHelperMutex) configureSubTest(t *testing.T) (restore func()) { + tMu.Lock() + defer tMu.Unlock() + + origConfig := tMu.interceptorTestConfig + restore = func() { + tMu.Lock() + defer tMu.Unlock() + tMu.interceptorTestConfig = origConfig + } + + tMu.T = t + tMu.lastInterceptedOpID = 0 + return restore +} + // TestTransactionUnexpectedlyCommitted validates the handling of the case where // a parallel commit transaction with an ambiguous error on a write races with // a contending transaction's recovery attempt. In the case that the recovery @@ -211,13 +266,17 @@ func TestTransactionUnexpectedlyCommitted(t *testing.T) { // Key constants. tablePrefix := bootstrap.TestingUserTableDataMin() + tableSpan := roachpb.Span{Key: tablePrefix, EndKey: tablePrefix.PrefixEnd()} keyA := roachpb.Key(encoding.EncodeBytesAscending(tablePrefix.Clone(), []byte("a"))) keyB := roachpb.Key(encoding.EncodeBytesAscending(tablePrefix.Clone(), []byte("b"))) // Test synchronization helpers. // Handles all synchronization of KV operations at the transport level. - var tMu interceptorHelperMutex - var interceptedOpID int64 + tMu := interceptorHelperMutex{ + interceptorTestConfig: interceptorTestConfig{ + T: t, + }, + } getInterceptingTransportFactory := func(nID roachpb.NodeID) kvcoord.TransportFactory { return func(options kvcoord.SendOptions, dialer *nodedialer.Dialer, slice kvcoord.ReplicaSlice) (kvcoord.Transport, error) { transport, tErr := kvcoord.GRPCTransportFactory(options, dialer, slice) @@ -229,11 +288,9 @@ func TestTransactionUnexpectedlyCommitted(t *testing.T) { defer tMu.RUnlock() if tMu.filter != nil && tMu.filter(req) { - opID := atomic.AddInt64(&interceptedOpID, 1) - if tMu.willPause != nil && tMu.willPause(req) { - req.prefix = fmt.Sprintf("[paused %d] ", opID) - } - t.Logf("%s%s batchReq={%s}, meta={%s}", req.prefix, req, req.ba, req.txnMeta) + opID := atomic.AddInt64(&tMu.lastInterceptedOpID, 1) + req.prefix = fmt.Sprintf("[op %d] ", opID) + tMu.Logf("%s batchReq={%s}, meta={%s}", req, req.ba, req.txnMeta) if tMu.maybeWait != nil { err := tMu.maybeWait(BeforeSending, req, nil) @@ -295,7 +352,7 @@ func TestTransactionUnexpectedlyCommitted(t *testing.T) { }) defer tc.Stopper().Stop(ctx) - requireRangeLease := func(desc roachpb.RangeDescriptor, serverIdx int) { + requireRangeLease := func(t *testing.T, desc roachpb.RangeDescriptor, serverIdx int) { t.Helper() testutils.SucceedsSoon(t, func() error { hint := tc.Target(serverIdx) @@ -342,32 +399,40 @@ func TestTransactionUnexpectedlyCommitted(t *testing.T) { db := tc.Server(0).DB() - initTest := func() { - // Write initial values, split ranges, and separate leases. - require.NoError(t, db.Put(ctx, keyA, 50)) - require.NoError(t, db.Put(ctx, keyB, 50)) - + // Perform initial range split. 
+ { tc.SplitRangeOrFatal(t, keyA) firstRange, secondRange := tc.SplitRangeOrFatal(t, keyB) t.Logf("first range: %s", firstRange) t.Logf("second range: %s", secondRange) + } - // Separate the leases for each range so they are not on the same node. + initSubTest := func(t *testing.T) (finishSubTest func()) { + restoreAfterSubTest := tMu.configureSubTest(t) + + // Write initial values and separate leases. + require.NoError(t, db.Put(ctx, keyA, 50)) + require.NoError(t, db.Put(ctx, keyB, 50)) + + firstRange := tc.LookupRangeOrFatal(t, keyA) + secondRange := tc.LookupRangeOrFatal(t, keyB) tc.TransferRangeLeaseOrFatal(t, firstRange, tc.Target(0)) - requireRangeLease(firstRange, 0) + requireRangeLease(t, firstRange, 0) tc.TransferRangeLeaseOrFatal(t, secondRange, tc.Target(1)) - requireRangeLease(secondRange, 1) - } + requireRangeLease(t, secondRange, 1) + + return func() { + defer restoreAfterSubTest() - finishTest := func() { - // Dump KVs at end of test for debugging purposes on failure. - if t.Failed() { - scannedKVs, err := db.Scan(ctx, tablePrefix, tablePrefix.PrefixEnd(), 0) - require.NoError(t, err) - for _, scannedKV := range scannedKVs { - mvccValue, err := storage.DecodeMVCCValue(scannedKV.Value.RawBytes) + // Dump KVs at end of test for debugging purposes on failure. + if t.Failed() { + scannedKVs, err := db.Scan(ctx, tablePrefix, tablePrefix.PrefixEnd(), 0) require.NoError(t, err) - t.Logf("key: %s, value: %s", scannedKV.Key, mvccValue) + for _, scannedKV := range scannedKVs { + mvccValue, err := storage.DecodeMVCCValue(scannedKV.Value.RawBytes) + require.NoError(t, err) + t.Logf("key: %s, value: %s", scannedKV.Key, mvccValue) + } } } } @@ -390,23 +455,8 @@ func TestTransactionUnexpectedlyCommitted(t *testing.T) { return true } - return false - } - tMu.willPause = func(req *interceptedReq) bool { - _, hasPut := req.ba.GetArg(kvpb.Put) - - // txn1's writes to n2 will be paused. - if req.txnName == "txn1" && hasPut && req.toNodeID == tc.Server(1).NodeID() { - return true - } - - // txn1's retried EndTxn will be paused. - if req.txnName == "txn1" && req.ba.IsSingleEndTxnRequest() { - return true - } - - // The recovery operation on txn1 needs to be sequenced specifically. - if req.ba.IsSingleRecoverTxnRequest() { + // Log intent resolution on the key span used in the test. + if riReq, ok := req.ba.GetArg(kvpb.ResolveIntent); ok && tableSpan.ContainsKey(riReq.Header().Key) { return true } @@ -427,68 +477,47 @@ func TestTransactionUnexpectedlyCommitted(t *testing.T) { WHERE id IN ($1, $2)` const xferAmount = 10 - initTest() - defer finishTest() - - // Checkpoints in test. - txn1Ready := make(chan struct{}) - txn2Ready := make(chan struct{}) - leaseMoveReady := make(chan struct{}) - leaseMoveComplete := make(chan struct{}) - receivedETRetry := make(chan struct{}) - recoverComplete := make(chan struct{}) - txn1Done := make(chan struct{}) - - // Final result. - txn1ResultCh := make(chan error, 1) - - // Concurrent transactions. - var wg sync.WaitGroup - wg.Add(3) - go func() { - defer wg.Done() - defer close(txn1Done) - // Wait until txn1 is ready to start. 
- select { - case <-txn1Ready: - case <-time.After(succeedsSoonDuration): - t.Logf("txn1 timed out before start") - return - } + execWorkloadTxn := func(t *testing.T, name string) error { tCtx := context.Background() - txn := db.NewTxn(tCtx, "txn1") + txn := db.NewTxn(tCtx, name) vals := getInBatch(tCtx, txn, keyA, keyB) batch := txn.NewBatch() batch.Put(keyA, vals[0]-xferAmount) batch.Put(keyB, vals[1]+xferAmount) - txn1ResultCh <- txn.CommitInBatch(tCtx, batch) - }() - go func() { - defer wg.Done() - // Wait until txn2 is ready to start. + return txn.CommitInBatch(tCtx, batch) + } + + waitUntilReady := func(t *testing.T, name string, readyCh chan struct{}) (finishedWithoutTimeout bool) { select { - case <-txn2Ready: + case <-readyCh: case <-time.After(succeedsSoonDuration): - t.Logf("txn2 timed out before start") + t.Logf("%s timed out before start", name) + return false + } + + return true + } + + runTxn := func(t *testing.T, name string, wg *sync.WaitGroup, readyCh, doneCh chan struct{}, resultCh chan error) { + defer wg.Done() + if doneCh != nil { + defer close(doneCh) + } + if !waitUntilReady(t, name, readyCh) { return } - tCtx := context.Background() - txn := db.NewTxn(tCtx, "txn2") - vals := getInBatch(tCtx, txn, keyA, keyB) + err := execWorkloadTxn(t, name) + if resultCh != nil { + resultCh <- err + } else { + assert.NoError(t, err) + } + } - batch := txn.NewBatch() - batch.Put(keyA, vals[0]-xferAmount) - batch.Put(keyB, vals[1]+xferAmount) - assert.NoError(t, txn.CommitInBatch(tCtx, batch)) - }() - go func() { + runLeaseMover := func(t *testing.T, wg *sync.WaitGroup, readyCh chan struct{}, doneCh chan struct{}) { defer wg.Done() - // Wait until lease move is ready. - select { - case <-leaseMoveReady: - case <-time.After(succeedsSoonDuration): - t.Logf("lease mover timed out before start") + if !waitUntilReady(t, "lease mover", readyCh) { return } @@ -497,125 +526,140 @@ func TestTransactionUnexpectedlyCommitted(t *testing.T) { t.Logf("Transferring r%d lease to n%d", desc.RangeID, tc.Target(0).NodeID) assert.NoError(t, tc.TransferRangeLease(desc, tc.Target(0))) - close(leaseMoveComplete) - }() - - // KV Request sequencing. - tMu.Lock() - tMu.maybeWait = func(cp InterceptPoint, req *interceptedReq, resp *interceptedResp) (override error) { - _, hasPut := req.ba.GetArg(kvpb.Put) - - // These conditions are checked in order of expected operations of the test. - - // 1. txn1->n1: Get(a) - // 2. txn1->n2: Get(b) - // 3. txn1->n1: Put(a), EndTxn(parallel commit) -- Puts txn1 in STAGING. - // 4. txn1->n2: Put(b) -- Send the request, but pause before returning the - // response so we can inject network failure. - if req.txnName == "txn1" && hasPut && req.toNodeID == tc.Server(1).NodeID() && cp == AfterSending { - // Once we have seen the write on txn1 to n2 that we will fail, txn2 can - // start. - close(txn2Ready) - } + close(doneCh) + } - // 5. txn2->n1: Get(a) -- Discovers txn1's locks, issues push request. - // 6. txn2->n2: Get(b) - // 7. _->n1: PushTxn(txn2->txn1) -- Discovers txn1 in STAGING and starts - // recovery. - // 8. _->n1: RecoverTxn(txn1) -- Before sending, pause the request so we - // can ensure it gets evaluated after txn1 retries (and refreshes), but - // before its final EndTxn. - if req.ba.IsSingleRecoverTxnRequest() && cp == BeforeSending { - // Once the RecoverTxn request is issued, as part of txn2's PushTxn - // request, the lease can be moved. 
- close(leaseMoveReady) - } + t.Run("workload conflict", func(t *testing.T) { + defer initSubTest(t)() + + // Checkpoints in test. + txn1Ready := make(chan struct{}) + txn2Ready := make(chan struct{}) + leaseMoveReady := make(chan struct{}) + leaseMoveComplete := make(chan struct{}) + receivedFinalET := make(chan struct{}) + recoverComplete := make(chan struct{}) + txn1Done := make(chan struct{}) + + // Final result. + txn1ResultCh := make(chan error, 1) + + // Concurrent transactions. + var wg sync.WaitGroup + wg.Add(3) + go runTxn(t, "txn1", &wg, txn1Ready, txn1Done, txn1ResultCh) + go runTxn(t, "txn2", &wg, txn2Ready, nil /* doneCh */, nil /* resultCh */) + go runLeaseMover(t, &wg, leaseMoveReady, leaseMoveComplete) + + // KV Request sequencing. + tMu.Lock() + tMu.maybeWait = func(cp InterceptPoint, req *interceptedReq, resp *interceptedResp) (override error) { + _, hasPut := req.ba.GetArg(kvpb.Put) + + // These conditions are checked in order of expected operations of the + // test. + + // 1. txn1->n1: Get(a) + // 2. txn1->n2: Get(b) + // 3. txn1->n1: Put(a), EndTxn(parallel commit) -- Puts txn1 in STAGING. + // 4. txn1->n2: Put(b) -- Send the request, but pause before returning + // the response so we can inject network failure. + if req.txnName == "txn1" && hasPut && req.toNodeID == tc.Server(1).NodeID() && cp == AfterSending { + // Once we have seen the write on txn1 to n2 that we will fail, txn2 + // can start. + close(txn2Ready) + } - // - // n2: Put(b) to - // return with error> - if req.txnName == "txn1" && hasPut && req.toNodeID == tc.Server(1).NodeID() && cp == AfterSending { - // Hold the operation open until we are ready to retry on the new - // replica, after which we will return the injected failure. - <-leaseMoveComplete - t.Logf("%s%s Put op unpaused (injected RPC error)", req.prefix, req) - return grpcstatus.Errorf(codes.Unavailable, "response jammed on n%d<-n%d", req.fromNodeID, req.toNodeID) - } + // 5. txn2->n1: Get(a) -- Discovers txn1's locks, issues push request. + // 6. txn2->n2: Get(b) + // 7. _->n1: PushTxn(txn2->txn1) -- Discovers txn1 in STAGING and starts + // recovery. + // 8. _->n1: RecoverTxn(txn1) -- Before sending, pause the request so we + // can ensure it gets evaluated after txn1 retries (and refreshes), but + // before its final EndTxn. + if req.ba.IsSingleRecoverTxnRequest() && cp == BeforeSending { + // Once the RecoverTxn request is issued, as part of txn2's PushTxn + // request, the lease can be moved. + close(leaseMoveReady) + } - // -- NB: If ambiguous errors were propagated, txn1 would end here. - // -- NB: When ambiguous errors are not propagated, txn1 continues with: - // - // 9. txn1->n1: Put(b) -- Retry on new leaseholder sees new lease start - // timestamp, and attempts to evaluate it as an idempotent replay, but at a - // higher timestamp, which breaks idempotency due to being on commit. - // 10. txn1->n1: Refresh(a) - // 11. txn1->n1: Refresh(b) - // Note that if these refreshes fail, the transaction coordinator - // would return a retriable error, although the transaction could be - // actually committed during recovery - this is highly problematic. - - // 12. txn1->n1: EndTxn(commit) -- Before sending, pause the request so that - // we can allow (8) RecoverTxn(txn1) to proceed, simulating a race in which - // the recovery wins. 
- if req.txnName == "txn1" && req.ba.IsSingleEndTxnRequest() && cp == BeforeSending { - close(receivedETRetry) - } + // + // n2: Put(b) to + // return with error> + if req.txnName == "txn1" && hasPut && req.toNodeID == tc.Server(1).NodeID() && cp == AfterSending { + // Hold the operation open until we are ready to retry on the new + // replica, after which we will return the injected failure. + req.pauseUntil(t, leaseMoveComplete, cp) + t.Logf("%s - injected RPC error", req.prefix) + return grpcstatus.Errorf(codes.Unavailable, "response jammed on n%d<-n%d", req.fromNodeID, req.toNodeID) + } - // -- because txn1 - // is in STAGING and has all of its writes, it is implicitly committed, - // so the recovery will succeed in marking it explicitly committed. - if req.ba.IsSingleRecoverTxnRequest() && cp == BeforeSending { - // The RecoverTxn operation must be evaluated after txn1's Refreshes, - // or after txn1 completes with error. - select { - case <-receivedETRetry: - case <-txn1Done: + // -- NB: If ambiguous errors were propagated, txn1 would end here. + // -- NB: When ambiguous errors are not propagated, txn1 continues with: + // + // 9. txn1->n1: Put(b) -- Retry on new leaseholder sees new lease start + // timestamp, and attempts to evaluate it as an idempotent replay, but at + // a higher timestamp, which breaks idempotency due to being on commit. + // 10. txn1->n1: Refresh(a) + // 11. txn1->n1: Refresh(b) + // 12. txn1->n1: EndTxn(commit) -- Before sending, pause the request so + // that we can allow (8) RecoverTxn(txn1) to proceed, simulating a race + // in which the recovery wins. + if req.txnName == "txn1" && req.ba.IsSingleEndTxnRequest() && cp == BeforeSending { + close(receivedFinalET) } - t.Logf("%s%s RecoverTxn op unpaused", req.prefix, req) - } - if req.ba.IsSingleRecoverTxnRequest() && cp == AfterSending { - t.Logf("%s RecoverTxn op complete, resp={%s}", req, resp) - close(recoverComplete) - } - // -- Results in - // "transaction unexpectedly committed" due to the recovery completing - // first. - if req.txnName == "txn1" && req.ba.IsSingleEndTxnRequest() && cp == BeforeSending { - <-recoverComplete - t.Logf("%s%s EndTxn op unpaused", req.prefix, req) - } + // -- because txn1 + // is in STAGING and has all of its writes, it is implicitly committed, + // so the recovery will succeed in marking it explicitly committed. + if req.ba.IsSingleRecoverTxnRequest() && cp == BeforeSending { + // The RecoverTxn operation is evaluated after txn1's Refreshes, + // or after txn1 completes with error. + req.pauseUntilFirst(t, receivedFinalET, txn1Done, cp) + } + if req.ba.IsSingleRecoverTxnRequest() && cp == AfterSending { + t.Logf("%s - complete, resp={%s}", req.prefix, resp) + close(recoverComplete) + } - // - if req.txnName == "txn2" && hasPut && cp == BeforeSending { - // While txn2's Puts can only occur after txn1 is marked as explicitly - // committed, if the Recovery and the subsequent txn2 Put(b) operations - // happen before txn1 retries its Put(b) on n1, it will encounter txn2's - // intent and get a WriteTooOld error instead of potentially being an - // idempotent replay. - <-txn1Done - } + // -- Results in + // "transaction unexpectedly committed" due to the recovery completing + // first. 
+ if req.txnName == "txn1" && req.ba.IsSingleEndTxnRequest() && cp == BeforeSending { + req.pauseUntil(t, recoverComplete, cp) + } - return nil - } - tMu.Unlock() + // + if req.txnName == "txn2" && hasPut && cp == BeforeSending { + // While txn2's Puts can only occur after txn1 is marked as explicitly + // committed, if the Recovery and the subsequent txn2 Put(b) operations + // happen before txn1 retries its Put(b) on n1, it will encounter txn2's + // intent and get a WriteTooOld error instead of potentially being an + // idempotent replay. + <-txn1Done + } - // Start test, await concurrent operations and validate results. - close(txn1Ready) - err := <-txn1ResultCh - t.Logf("txn1 completed with err: %+v", err) - wg.Wait() - - // TODO(sarkesian): While we expect an AmbiguousResultError once the - // immediate changes outlined in #103817 are implemented, right now this is - // essentially validating the existence of the bug. This needs to be fixed, - // and we should not see an assertion failure from the transaction coordinator - // once fixed. - tErr := (*kvpb.TransactionStatusError)(nil) - require.ErrorAsf(t, err, &tErr, - "expected TransactionStatusError due to being already committed") - require.Equalf(t, kvpb.TransactionStatusError_REASON_TXN_COMMITTED, tErr.Reason, - "expected TransactionStatusError due to being already committed") - require.Truef(t, errors.HasAssertionFailure(err), - "expected AssertionFailedError due to sanity check on transaction already committed") + return nil + } + tMu.Unlock() + + // Start test, await concurrent operations and validate results. + close(txn1Ready) + err := <-txn1ResultCh + t.Logf("txn1 completed with err: %+v", err) + wg.Wait() + + // TODO(sarkesian): While we expect an AmbiguousResultError once the + // immediate changes outlined in #103817 are implemented, right now this is + // essentially validating the existence of the bug. This needs to be fixed, + // and we should not see an assertion failure from the transaction + // coordinator once fixed. + tErr := (*kvpb.TransactionStatusError)(nil) + require.ErrorAsf(t, err, &tErr, + "expected TransactionStatusError due to being already committed") + require.Equalf(t, kvpb.TransactionStatusError_REASON_TXN_COMMITTED, tErr.Reason, + "expected TransactionStatusError due to being already committed") + require.Truef(t, errors.HasAssertionFailure(err), + "expected AssertionFailedError due to sanity check on transaction already committed") + }) }
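
The subtests enabled by this refactor land in a separate patch, but as a rough illustration of what the new scaffolding supports, an additional schedule could plug in roughly as sketched below. This is a minimal sketch only: it assumes it sits inside TestTransactionUnexpectedlyCommitted alongside the "workload conflict" subtest and reuses the helpers introduced above (initSubTest, runTxn, tMu.maybeWait); the subtest name, channels, and assertions are illustrative placeholders, not the actual follow-up subtests.

	// Hypothetical additional subtest skeleton (illustrative; not part of this patch).
	t.Run("illustrative second schedule", func(t *testing.T) {
		defer initSubTest(t)()

		// Checkpoints and result channel for this schedule.
		txn1Ready := make(chan struct{})
		txn1Done := make(chan struct{})
		txn1ResultCh := make(chan error, 1)

		var wg sync.WaitGroup
		wg.Add(1)
		go runTxn(t, "txn1", &wg, txn1Ready, txn1Done, txn1ResultCh)

		// Each subtest installs its own request sequencing. initSubTest (via
		// configureSubTest) has already pointed the interceptor's logging and
		// op counter at this subtest's *testing.T.
		tMu.Lock()
		tMu.maybeWait = func(cp InterceptPoint, req *interceptedReq, resp *interceptedResp) (override error) {
			// Schedule-specific blocking or response overriding goes here,
			// e.g. req.pauseUntil(t, someCheckpoint, cp), or returning an
			// injected gRPC error for a chosen request.
			return nil
		}
		tMu.Unlock()

		// Start the workload, await completion, and validate results.
		close(txn1Ready)
		err := <-txn1ResultCh
		wg.Wait()
		// With no blocking installed above, txn1 commits cleanly; a real
		// subtest asserts on the outcome of the schedule it exercises.
		require.NoError(t, err)
	})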
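
As the TODO in the final assertions notes, the test currently validates the existence of the bug: txn1 surfaces a TransactionStatusError along with an assertion failure from the transaction coordinator. Once the changes outlined in #103817 land, the expectation flips to an ambiguous result. A hedged sketch of what the validation might then look like (not part of this patch; it assumes the error surfaces as a kvpb.AmbiguousResultError):

	// Hypothetical post-#103817 validation (sketch only).
	aErr := (*kvpb.AmbiguousResultError)(nil)
	require.ErrorAsf(t, err, &aErr,
		"expected AmbiguousResultError due to the injected RPC failure on txn1's write")
	require.Falsef(t, errors.HasAssertionFailure(err),
		"should no longer trip the transaction coordinator's sanity-check assertion")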