diff --git a/pkg/kv/integration_test.go b/pkg/kv/integration_test.go index fe7f72c51097..338b17e5821a 100644 --- a/pkg/kv/integration_test.go +++ b/pkg/kv/integration_test.go @@ -113,7 +113,7 @@ func TestWaiterOnRejectedCommit(t *testing.T) { } return 0, nil }, - TxnWait: txnwait.TestingKnobs{ + TxnWaitKnobs: txnwait.TestingKnobs{ OnPusherBlocked: func(ctx context.Context, push *roachpb.PushTxnRequest) { // We'll trap a reader entering the wait queue for our txn. v := txnID.Load() diff --git a/pkg/storage/batcheval/cmd_resolve_intent.go b/pkg/storage/batcheval/cmd_resolve_intent.go index 4d865a06b3f7..96d16184e4e6 100644 --- a/pkg/storage/batcheval/cmd_resolve_intent.go +++ b/pkg/storage/batcheval/cmd_resolve_intent.go @@ -23,7 +23,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/storage/engine" "github.com/cockroachdb/cockroach/pkg/storage/spanset" "github.com/cockroachdb/cockroach/pkg/util/uuid" - "github.com/pkg/errors" ) func init() { @@ -81,9 +80,6 @@ func ResolveIntent( if h.Txn != nil { return result.Result{}, ErrTransactionUnsupported } - if args.Status == roachpb.STAGING { - return result.Result{}, errors.Errorf("cannot resolve intent with STAGING status") - } intent := roachpb.Intent{ Span: args.Span(), diff --git a/pkg/storage/batcheval/cmd_resolve_intent_range.go b/pkg/storage/batcheval/cmd_resolve_intent_range.go index 1385fc3fcada..9ff395861bc2 100644 --- a/pkg/storage/batcheval/cmd_resolve_intent_range.go +++ b/pkg/storage/batcheval/cmd_resolve_intent_range.go @@ -21,7 +21,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/storage/batcheval/result" "github.com/cockroachdb/cockroach/pkg/storage/engine" "github.com/cockroachdb/cockroach/pkg/storage/spanset" - "github.com/pkg/errors" ) func init() { @@ -46,9 +45,6 @@ func ResolveIntentRange( if h.Txn != nil { return result.Result{}, ErrTransactionUnsupported } - if args.Status == roachpb.STAGING { - return result.Result{}, errors.Errorf("cannot resolve intent with STAGING status") - } intent := 
roachpb.Intent{ Span: args.Span(), diff --git a/pkg/storage/engine/mvcc.go b/pkg/storage/engine/mvcc.go index d3eecbf07180..4d3e31ae56c9 100644 --- a/pkg/storage/engine/mvcc.go +++ b/pkg/storage/engine/mvcc.go @@ -2312,7 +2312,7 @@ func mvccResolveWriteIntent( // | restart | | // | write@2 | | // | | resolve@1 | - // ============================ + // ============================= // // In this case, if we required the epochs to match, we would not push the // intent forward, and client B would upon retrying after its successful @@ -2323,9 +2323,15 @@ func mvccResolveWriteIntent( // used for resolving), but that costs latency. // TODO(tschottdorf): various epoch-related scenarios here deserve more // testing. - pushed := intent.Status == roachpb.PENDING && - hlc.Timestamp(meta.Timestamp).Less(intent.Txn.Timestamp) && - meta.Txn.Epoch >= intent.Txn.Epoch + inProgress := !intent.Status.IsFinalized() && meta.Txn.Epoch >= intent.Txn.Epoch + pushed := inProgress && hlc.Timestamp(meta.Timestamp).Less(intent.Txn.Timestamp) + + // There's nothing to do if meta's epoch is greater than or equal to txn's + // epoch and the state is still in progress but the intent was not pushed + // to a larger timestamp. + if inProgress && !pushed { + return false, nil + } // If we're committing, or if the commit timestamp of the intent has been moved forward, and if // the proposed epoch matches the existing epoch: update the meta.Txn. For commit, it's set to @@ -2420,12 +2426,6 @@ func mvccResolveWriteIntent( // - writer2 dispatches ResolveIntent to key0 (with epoch 0) // - ResolveIntent with epoch 0 aborts intent from epoch 1. - // There's nothing to do if meta's epoch is greater than or equal txn's epoch - // and the state is still PENDING. - if intent.Status == roachpb.PENDING && meta.Txn.Epoch >= intent.Txn.Epoch { - return false, nil - } - // First clear the intent value. 
latestKey := MVCCKey{Key: intent.Key, Timestamp: hlc.Timestamp(meta.Timestamp)} if err := engine.Clear(latestKey); err != nil { diff --git a/pkg/storage/rangefeed/task.go b/pkg/storage/rangefeed/task.go index 51e9d131fd7b..e9afb4c4e76b 100644 --- a/pkg/storage/rangefeed/task.go +++ b/pkg/storage/rangefeed/task.go @@ -182,16 +182,14 @@ func (a *txnPushAttempt) pushOldTxns(ctx context.Context) error { var toCleanup []roachpb.Transaction for i, txn := range pushedTxns { switch txn.Status { - case roachpb.PENDING: - // The transaction is still pending but its timestamp was moved + case roachpb.PENDING, roachpb.STAGING: + // The transaction is still in progress but its timestamp was moved // forward to the current time. Inform the Processor that it can // forward the txn's timestamp in its unresolvedIntentQueue. ops[i].SetValue(&enginepb.MVCCUpdateIntentOp{ TxnID: txn.ID, Timestamp: txn.Timestamp, }) - case roachpb.STAGING: - log.Fatalf(ctx, "unexpected pushed txn with STAGING status: %v", txn) case roachpb.COMMITTED: // The transaction is committed and its timestamp may have moved // forward since we last saw an intent. Inform the Processor diff --git a/pkg/storage/store.go b/pkg/storage/store.go index 28645acd55a8..b9ad7442b335 100644 --- a/pkg/storage/store.go +++ b/pkg/storage/store.go @@ -2949,6 +2949,9 @@ func (s *Store) Send( pErr = nil case *roachpb.IndeterminateCommitError: + if s.cfg.TestingKnobs.DontRecoverIndeterminateCommits { + return nil, pErr + } // On an indeterminate commit error, attempt to recover and finalize // the stuck transaction. Retry immediately if successful. if _, err := s.recoveryMgr.ResolveIndeterminateCommit(ctx, t); err != nil { @@ -2980,18 +2983,24 @@ func (s *Store) Send( // Make a copy of the header for the upcoming push; we will update // the timestamp. h := ba.Header - // We must push at least to h.Timestamp, but in fact we want to - // go all the way up to a timestamp which was taken off the HLC - // after our operation started. 
This allows us to not have to - // restart for uncertainty as we come back and read. - h.Timestamp.Forward(now) - // We are going to hand the header (and thus the transaction proto) - // to the RPC framework, after which it must not be changed (since - // that could race). Since the subsequent execution of the original - // request might mutate the transaction, make a copy here. - // - // See #9130. if h.Txn != nil { + // We must push at least to h.Timestamp, but in fact we want to + // go all the way up to a timestamp which was taken off the HLC + // after our operation started. This allows us to not have to + // restart for uncertainty as we come back and read. + obsTS, ok := h.Txn.GetObservedTimestamp(ba.Replica.NodeID) + if !ok { + // This was set earlier in this method, so it's + // completely unexpected to not be found now. + log.Fatalf(ctx, "missing observed timestamp: %+v", h.Txn) + } + h.Timestamp.Forward(obsTS) + // We are going to hand the header (and thus the transaction proto) + // to the RPC framework, after which it must not be changed (since + // that could race). Since the subsequent execution of the original + // request might mutate the transaction, make a copy here. + // + // See #9130. h.Txn = h.Txn.Clone() } // Handle the case where we get more than one write intent error; @@ -4389,7 +4398,7 @@ func (s *Store) setScannerActive(active bool) { // GetTxnWaitKnobs is part of txnwait.StoreInterface. 
func (s *Store) GetTxnWaitKnobs() txnwait.TestingKnobs { - return s.TestingKnobs().TxnWait + return s.TestingKnobs().TxnWaitKnobs } // GetTxnWaitMetrics is called by txnwait.Queue instances to get a reference to diff --git a/pkg/storage/store_test.go b/pkg/storage/store_test.go index f14b2225911c..e4fc24834c14 100644 --- a/pkg/storage/store_test.go +++ b/pkg/storage/store_test.go @@ -68,10 +68,10 @@ var testIdent = roachpb.StoreIdent{ func (s *Store) TestSender() client.Sender { return client.Wrap(s, func(ba roachpb.BatchRequest) roachpb.BatchRequest { - rangeID := roachpb.RangeID(1) if ba.RangeID != 0 { return ba } + // If the client hasn't set ba.Range, we do it a favor and figure out the // range to which the request needs to go. // @@ -83,15 +83,18 @@ func (s *Store) TestSender() client.Sender { log.Fatal(context.TODO(), err) } - visitor := newStoreReplicaVisitor(s) - visitor.Visit(func(repl *Replica) bool { - if repl.Desc().ContainsKeyRange(key, key) { - rangeID = repl.RangeID - return false + ba.RangeID = roachpb.RangeID(1) + if repl := s.LookupReplica(key); repl != nil { + ba.RangeID = repl.RangeID + + // Attempt to assign a Replica descriptor to the batch if + // necessary, but don't throw an error if this fails. + if ba.Replica == (roachpb.ReplicaDescriptor{}) { + if desc, err := repl.GetReplicaDescriptor(); err == nil { + ba.Replica = desc + } } - return true - }) - ba.RangeID = rangeID + } return ba }) } @@ -1646,83 +1649,197 @@ func TestStoreResolveWriteIntentRollback(t *testing.T) { } // TestStoreResolveWriteIntentPushOnRead verifies that resolving a write intent -// for a read will push the timestamp. On failure to push, verify a write -// intent error is returned with !Resolvable. +// for a read will push the timestamp. It tests this along a few dimensions: +// - high-priority pushes vs. low-priority pushes +// - already pushed pushee txns vs. not already pushed pushee txns +// - PENDING pushee txn records vs. 
STAGING pushee txn records func TestStoreResolveWriteIntentPushOnRead(t *testing.T) { defer leaktest.AfterTest(t)() storeCfg := TestStoreConfig(nil) storeCfg.TestingKnobs.DontRetryPushTxnFailures = true + storeCfg.TestingKnobs.DontRecoverIndeterminateCommits = true stopper := stop.NewStopper() defer stopper.Stop(context.TODO()) store := createTestStoreWithConfig(t, stopper, testStoreOpts{createSystemRanges: true}, &storeCfg) - for i, resolvable := range []bool{true, false} { - key := roachpb.Key(fmt.Sprintf("key-%d", i)) - pusher := newTransaction("test", key, 1, store.cfg.Clock) - pushee := newTransaction("test", key, 1, store.cfg.Clock) - - if resolvable { - pushee.Priority = enginepb.MinTxnPriority - pusher.Priority = enginepb.MaxTxnPriority // Pusher will win. - } else { - pushee.Priority = enginepb.MaxTxnPriority - pusher.Priority = enginepb.MinTxnPriority // Pusher will lose. - } - // First, write original value. + testCases := []struct { + pusherWillWin bool // if true, pusher will have a high enough priority to push the pushee + pusheeAlreadyPushed bool // if true, pushee's timestamp will be set above pusher's target timestamp + pusheeStagingRecord bool // if true, pushee's record is STAGING, otherwise PENDING + expPushError string // regexp pattern to match on run error, if not empty + expPusheeRetry bool // do we expect the pushee to hit a retry error when committing? + }{ { - args := putArgs(key, []byte("value1")) - if _, pErr := client.SendWrapped(context.Background(), store.TestSender(), &args); pErr != nil { - t.Fatal(pErr) + // Insufficient priority to push. + pusherWillWin: false, + pusheeAlreadyPushed: false, + pusheeStagingRecord: false, + expPushError: "failed to push", + expPusheeRetry: false, + }, + { + // Successful push. + pusherWillWin: true, + pusheeAlreadyPushed: false, + pusheeStagingRecord: false, + expPushError: "", + expPusheeRetry: true, + }, + { + // Already pushed, no-op. 
+ pusherWillWin: false, + pusheeAlreadyPushed: true, + pusheeStagingRecord: false, + expPushError: "", + expPusheeRetry: false, + }, + { + // Already pushed, no-op. + pusherWillWin: true, + pusheeAlreadyPushed: true, + pusheeStagingRecord: false, + expPushError: "", + expPusheeRetry: false, + }, + { + // Insufficient priority to push. + pusherWillWin: false, + pusheeAlreadyPushed: false, + pusheeStagingRecord: true, + expPushError: "failed to push", + expPusheeRetry: false, + }, + { + // Cannot push STAGING txn record. + pusherWillWin: true, + pusheeAlreadyPushed: false, + pusheeStagingRecord: true, + expPushError: "found txn in indeterminate STAGING state", + expPusheeRetry: false, + }, + { + // Already pushed the STAGING record, no-op. + pusherWillWin: false, + pusheeAlreadyPushed: true, + pusheeStagingRecord: true, + expPushError: "", + expPusheeRetry: false, + }, + { + // Already pushed the STAGING record, no-op. + pusherWillWin: true, + pusheeAlreadyPushed: true, + pusheeStagingRecord: true, + expPushError: "", + expPusheeRetry: false, + }, + } + for _, tc := range testCases { + name := fmt.Sprintf("pusherWillWin=%t,pusheePushed=%t,pusheeStaging=%t", + tc.pusherWillWin, tc.pusheeAlreadyPushed, tc.pusheeStagingRecord) + t.Run(name, func(t *testing.T) { + ctx := context.Background() + key := roachpb.Key(fmt.Sprintf("key-%s", name)) + pusher := newTransaction("pusher", key, 1, store.cfg.Clock) + pushee := newTransaction("pushee", key, 1, store.cfg.Clock) + + // Set transaction priorities. + if tc.pusherWillWin { + pushee.Priority = enginepb.MinTxnPriority + pusher.Priority = enginepb.MaxTxnPriority // Pusher will win. + } else { + pushee.Priority = enginepb.MaxTxnPriority + pusher.Priority = enginepb.MinTxnPriority // Pusher will lose. } - } - // Second, lay down intent using the pushee's txn. 
- { - args := putArgs(key, []byte("value2")) - assignSeqNumsForReqs(pushee, &args) - if _, pErr := client.SendWrappedWith( - context.Background(), store.TestSender(), roachpb.Header{Txn: pushee}, &args, - ); pErr != nil { - t.Fatal(pErr) + // First, write original value. + { + args := putArgs(key, []byte("value1")) + if _, pErr := client.SendWrapped(ctx, store.TestSender(), &args); pErr != nil { + t.Fatal(pErr) + } } - } - // Now, try to read value using the pusher's txn. - now := store.Clock().Now() - pusher.OrigTimestamp.Forward(now) - pusher.Timestamp.Forward(now) - gArgs := getArgs(key) - assignSeqNumsForReqs(pusher, &gArgs) - firstReply, pErr := client.SendWrappedWith(context.Background(), store.TestSender(), roachpb.Header{Txn: pusher}, &gArgs) - if resolvable { - if pErr != nil { - t.Errorf("%d: expected read to succeed: %s", i, pErr) - } else if replyBytes, err := firstReply.(*roachpb.GetResponse).Value.GetBytes(); err != nil { - t.Fatal(err) - } else if !bytes.Equal(replyBytes, []byte("value1")) { - t.Errorf("%d: expected bytes to be %q, got %q", i, "value1", replyBytes) + // Second, lay down intent using the pushee's txn. + { + args := putArgs(key, []byte("value2")) + assignSeqNumsForReqs(pushee, &args) + if _, pErr := client.SendWrappedWith(ctx, store.TestSender(), roachpb.Header{Txn: pushee}, &args); pErr != nil { + t.Fatal(pErr) + } } - // Finally, try to end the pushee's transaction; if we have - // SNAPSHOT isolation, the commit should work: verify the txn - // commit timestamp is greater than pusher's Timestamp. - // Otherwise, verify commit fails with TransactionRetryError. - etArgs, h := endTxnArgs(pushee, true) - assignSeqNumsForReqs(pushee, &etArgs) - _, cErr := client.SendWrappedWith(context.Background(), store.TestSender(), h, &etArgs) - if _, ok := cErr.GetDetail().(*roachpb.TransactionRetryError); !ok { - t.Errorf("expected transaction retry error; got %s", cErr) + // Determine the timestamp to read at. 
+ readTs := store.cfg.Clock.Now() + // Give the pusher a previous observed timestamp equal to this read + // timestamp. This ensures that the pusher doesn't need to push the + // intent any higher just to push it out of its uncertainty window. + pusher.UpdateObservedTimestamp(store.Ident.NodeID, readTs) + + // If the pushee is already pushed, update the transaction record. + if tc.pusheeAlreadyPushed { + pushedTs := store.cfg.Clock.Now() + pushee.Timestamp.Forward(pushedTs) + pushee.RefreshedTimestamp.Forward(pushedTs) + hb, hbH := heartbeatArgs(pushee, store.cfg.Clock.Now()) + if _, pErr := client.SendWrappedWith(ctx, store.TestSender(), hbH, &hb); pErr != nil { + t.Fatal(pErr) + } } - } else { - // Verify we receive a transaction retry error (because we max out - // retries). - if pErr == nil { - t.Errorf("expected read to fail") + + // If the pushee is staging, update the transaction record. + if tc.pusheeStagingRecord { + // TODO(nvanbenschoten): Avoid writing directly to the engine once + // there's a way to create a STAGING transaction record. + txnKey := keys.TransactionKey(pushee.Key, pushee.ID) + txnRecord := pushee.AsRecord() + txnRecord.Status = roachpb.STAGING + if err := engine.MVCCPutProto(ctx, store.Engine(), nil, txnKey, hlc.Timestamp{}, nil, &txnRecord); err != nil { + t.Fatal(err) + } } - if _, ok := pErr.GetDetail().(*roachpb.TransactionPushError); !ok { - t.Errorf("expected transaction push error; got %T", pErr.GetDetail()) + + // Now, try to read value using the pusher's txn. 
+ pusher.OrigTimestamp.Forward(readTs) + pusher.Timestamp.Forward(readTs) + gArgs := getArgs(key) + assignSeqNumsForReqs(pusher, &gArgs) + repl, pErr := client.SendWrappedWith(ctx, store.TestSender(), roachpb.Header{Txn: pusher}, &gArgs) + if tc.expPushError == "" { + if pErr != nil { + t.Errorf("expected read to succeed: %s", pErr) + } else if replyBytes, err := repl.(*roachpb.GetResponse).Value.GetBytes(); err != nil { + t.Fatal(err) + } else if !bytes.Equal(replyBytes, []byte("value1")) { + t.Errorf("expected bytes to be %q, got %q", "value1", replyBytes) + } + } else { + if !testutils.IsPError(pErr, tc.expPushError) { + t.Fatalf("expected error %q, found %v", tc.expPushError, pErr) + } } - } + + // Finally, try to end the pushee's transaction. Check whether + // the commit succeeds or fails. + etArgs, etH := endTxnArgs(pushee, true) + assignSeqNumsForReqs(pushee, &etArgs) + _, pErr = client.SendWrappedWith(ctx, store.TestSender(), etH, &etArgs) + if tc.pusheeStagingRecord { + // TODO(nvanbenschoten): We don't support committing STAGING + // transaction records yet. This will need to change once we do. + if !testutils.IsPError(pErr, "TransactionStatusError: bad txn status") { + t.Fatal(pErr) + } + } else if tc.expPusheeRetry { + if _, ok := pErr.GetDetail().(*roachpb.TransactionRetryError); !ok { + t.Errorf("expected transaction retry error; got %s", pErr) + } + } else { + if pErr != nil { + t.Fatalf("expected no commit error; got %s", pErr) + } + } + }) } } diff --git a/pkg/storage/testing_knobs.go b/pkg/storage/testing_knobs.go index 6259b0caf075..9fb9fc4ffd2a 100644 --- a/pkg/storage/testing_knobs.go +++ b/pkg/storage/testing_knobs.go @@ -31,8 +31,10 @@ import ( // particular point is reached) or to change the behavior by returning // an error (which aborts all further processing for the command). 
type StoreTestingKnobs struct { - EvalKnobs storagebase.BatchEvalTestingKnobs - IntentResolverKnobs storagebase.IntentResolverTestingKnobs + EvalKnobs storagebase.BatchEvalTestingKnobs + IntentResolverKnobs storagebase.IntentResolverTestingKnobs + TxnWaitKnobs txnwait.TestingKnobs + ConsistencyTestingKnobs ConsistencyTestingKnobs // TestingRequestFilter is called before evaluating each command on a // replica. The filter is run before the request acquires latches, so @@ -172,14 +174,14 @@ type StoreTestingKnobs struct { SystemLogsGCPeriod time.Duration // SystemLogsGCGCDone is used to notify when system logs GC is done. SystemLogsGCGCDone chan<- struct{} - // TxnWait contains knobs for txnwait.Queue instances. - TxnWait txnwait.TestingKnobs // DontRetryPushTxnFailures will propagate a push txn failure immediately // instead of utilizing the txn wait queue to wait for the transaction to // finish or be pushed by a higher priority contender. DontRetryPushTxnFailures bool - - ConsistencyTestingKnobs ConsistencyTestingKnobs + // DontRecoverIndeterminateCommits will propagate indeterminate commit + // errors from failed txn pushes immediately instead of utilizing the txn + // recovery manager to recover from the indeterminate state. + DontRecoverIndeterminateCommits bool } // ModuleTestingKnobs is part of the base.ModuleTestingKnobs interface.