diff --git a/pkg/kv/kvpb/api.go b/pkg/kv/kvpb/api.go
index f8efbc409895..2fd1b9cf1060 100644
--- a/pkg/kv/kvpb/api.go
+++ b/pkg/kv/kvpb/api.go
@@ -1622,8 +1622,8 @@ func (*QueryIntentRequest) flags() flag {
 	return isRead | isPrefix | updatesTSCache | updatesTSCacheOnErr
 }
 func (*QueryLocksRequest) flags() flag { return isRead | isRange }
-func (*ResolveIntentRequest) flags() flag { return isWrite }
-func (*ResolveIntentRangeRequest) flags() flag { return isWrite | isRange }
+func (*ResolveIntentRequest) flags() flag { return isWrite | updatesTSCache }
+func (*ResolveIntentRangeRequest) flags() flag { return isWrite | isRange | updatesTSCache }
 func (*TruncateLogRequest) flags() flag { return isWrite }
 func (*MergeRequest) flags() flag { return isWrite | canBackpressure }
 func (*RequestLeaseRequest) flags() flag {
diff --git a/pkg/kv/kvpb/api.proto b/pkg/kv/kvpb/api.proto
index 48badf5e9595..695f9e102375 100644
--- a/pkg/kv/kvpb/api.proto
+++ b/pkg/kv/kvpb/api.proto
@@ -1479,6 +1479,17 @@ message ResolveIntentRequest {
 // ResolveIntent() method.
 message ResolveIntentResponse {
   ResponseHeader header = 1 [(gogoproto.nullable) = false, (gogoproto.embed) = true];
+  // ReplicatedLocksReleasedCommitTimestamp, if non-empty, indicates that a
+  // replicated lock with strength Shared or Exclusive was released by a
+  // transaction that committed at this timestamp. Notably, this field is left
+  // unset if only a write intent was resolved. The field is also left unset for
+  // transactions that aborted.
+  //
+  // The caller must bump the timestamp cache across the resolution span to this
+  // commit timestamp. Doing so ensures that the released lock (acquired by a
+  // now-committed transaction) continues to provide protection against other
+  // writers up to the commit timestamp, even after the lock has been released.
+  util.hlc.Timestamp replicated_locks_released_commit_timestamp = 2 [(gogoproto.nullable) = false];
 }
 
 // A ResolveIntentRangeRequest is arguments to the ResolveIntentRange() method.
@@ -1523,6 +1534,17 @@ message ResolveIntentRangeRequest {
 // ResolveIntent() method.
 message ResolveIntentRangeResponse {
   ResponseHeader header = 1 [(gogoproto.nullable) = false, (gogoproto.embed) = true];
+  // ReplicatedLocksReleasedCommitTimestamp, if non-empty, indicates that at
+  // least one replicated lock with strength Shared or Exclusive was released by
+  // a transaction that committed at this timestamp. Notably, this field is left
+  // unset if only a write intent was resolved. The field is also left unset for
+  // transactions that aborted.
+  //
+  // The caller must bump the timestamp cache across the resolution span to this
+  // commit timestamp. Doing so ensures that the released lock (acquired by a
+  // now-committed transaction) continues to provide protection against other
+  // writers up to the commit timestamp, even after the lock has been released.
+  util.hlc.Timestamp replicated_locks_released_commit_timestamp = 2 [(gogoproto.nullable) = false];
 }
 
 // A MergeRequest contains arguments to the Merge() method. It
diff --git a/pkg/kv/kvserver/batcheval/cmd_resolve_intent.go b/pkg/kv/kvserver/batcheval/cmd_resolve_intent.go
index 76e968f778aa..42a17f8d543c 100644
--- a/pkg/kv/kvserver/batcheval/cmd_resolve_intent.go
+++ b/pkg/kv/kvserver/batcheval/cmd_resolve_intent.go
@@ -97,8 +97,8 @@ func ResolveIntent(
 		// The observation was from the wrong node. Ignore.
 		update.ClockWhilePending = roachpb.ObservedTimestamp{}
 	}
-	ok, numBytes, resumeSpan, _, err := storage.MVCCResolveWriteIntent(ctx, readWriter, ms, update,
-		storage.MVCCResolveWriteIntentOptions{TargetBytes: h.TargetBytes})
+	ok, numBytes, resumeSpan, replLocksReleased, err := storage.MVCCResolveWriteIntent(
+		ctx, readWriter, ms, update, storage.MVCCResolveWriteIntentOptions{TargetBytes: h.TargetBytes})
 	if err != nil {
 		return result.Result{}, err
 	}
@@ -114,6 +114,17 @@ func ResolveIntent(
 	res.Local.ResolvedLocks = []roachpb.LockUpdate{update}
 	res.Local.Metrics = resolveToMetricType(args.Status, args.Poison)
 
+	// Handle replicated lock releases.
+	if replLocksReleased && update.Status == roachpb.COMMITTED {
+		// A replicated {shared, exclusive} lock was released for a committed
+		// transaction. Now that the lock is no longer there, we still need to make
+		// sure other transactions can't write underneath the transaction's commit
+		// timestamp to the key. We return the transaction's commit timestamp on the
+		// response and update the timestamp cache a few layers above to ensure
+		// this.
+		reply.ReplicatedLocksReleasedCommitTimestamp = update.Txn.WriteTimestamp
+	}
+
 	if WriteAbortSpanOnResolve(args.Status, args.Poison, ok) {
 		if err := UpdateAbortSpan(ctx, cArgs.EvalCtx, readWriter, ms, args.IntentTxn, args.Poison); err != nil {
 			return result.Result{}, err
diff --git a/pkg/kv/kvserver/batcheval/cmd_resolve_intent_range.go b/pkg/kv/kvserver/batcheval/cmd_resolve_intent_range.go
index 35383123e6cb..0e3eafb3fa93 100644
--- a/pkg/kv/kvserver/batcheval/cmd_resolve_intent_range.go
+++ b/pkg/kv/kvserver/batcheval/cmd_resolve_intent_range.go
@@ -55,7 +55,7 @@ func ResolveIntentRange(
 		// The observation was from the wrong node. Ignore.
 		update.ClockWhilePending = roachpb.ObservedTimestamp{}
 	}
-	numKeys, numBytes, resumeSpan, resumeReason, _, err :=
+	numKeys, numBytes, resumeSpan, resumeReason, replLocksReleased, err :=
 		storage.MVCCResolveWriteIntentRange(ctx, readWriter, ms, update,
 			storage.MVCCResolveWriteIntentRangeOptions{MaxKeys: h.MaxSpanRequestKeys, TargetBytes: h.TargetBytes},
 		)
@@ -79,6 +79,22 @@ func ResolveIntentRange(
 	res.Local.ResolvedLocks = []roachpb.LockUpdate{update}
 	res.Local.Metrics = resolveToMetricType(args.Status, args.Poison)
 
+	// Handle replicated lock releases.
+	if replLocksReleased && update.Status == roachpb.COMMITTED {
+		// A replicated {shared, exclusive} lock was released for a committed
+		// transaction. Now that the lock is no longer there, we still need to make
+		// sure other transactions can't write underneath the transaction's commit
+		// timestamp to the keys. We return the transaction's commit timestamp on
+		// the response and update the timestamp cache a few layers above to ensure
+		// this.
+		//
+		// NB: Doing so will update the timestamp cache over the entire key span the
+		// request operated over -- we're losing fidelity about which key(s) had
+		// locks. We could do a better job tracking and plumbing this information
+		// up, but we choose not to.
+		reply.ReplicatedLocksReleasedCommitTimestamp = update.Txn.WriteTimestamp
+	}
+
 	if WriteAbortSpanOnResolve(args.Status, args.Poison, numKeys > 0) {
 		if err := UpdateAbortSpan(ctx, cArgs.EvalCtx, readWriter, ms, args.IntentTxn, args.Poison); err != nil {
 			return result.Result{}, err
diff --git a/pkg/kv/kvserver/replica_test.go b/pkg/kv/kvserver/replica_test.go
index d71d5216ac7f..d821615fe4e3 100644
--- a/pkg/kv/kvserver/replica_test.go
+++ b/pkg/kv/kvserver/replica_test.go
@@ -1536,6 +1536,16 @@ func queryLocksArgs(key, endKey []byte, includeUncontended bool) kvpb.QueryLocks
 	}
 }
 
+func resolveIntentArgsString(
+	s string, txn enginepb.TxnMeta, status roachpb.TransactionStatus,
+) kvpb.ResolveIntentRequest {
+	return kvpb.ResolveIntentRequest{
+		RequestHeader: kvpb.RequestHeader{Key: roachpb.Key(s)},
+		IntentTxn:     txn,
+		Status:        status,
+	}
+}
+
 func resolveIntentRangeArgsString(
 	s, e string, txn enginepb.TxnMeta, status roachpb.TransactionStatus,
 ) *kvpb.ResolveIntentRangeRequest {
@@ -14311,3 +14321,226 @@ func TestRangeSplitAndRHSRemovalRacesWithFollowerRead(t *testing.T) {
 	go func() { defer wg.Done(); read() }()
 	wg.Wait()
 }
+
+// TestResolveIntentReplicatedLocksBumpsTSCache ensures that performing point
+// intent resolution over keys on which we have acquired replicated shared or
+// exclusive locks bumps the timestamp cache for those keys if the transaction
+// has successfully committed. Otherwise, if the transaction is aborted, intent
+// resolution does not bump the timestamp cache.
+func TestResolveIntentReplicatedLocksBumpsTSCache(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+	defer log.Scope(t).Close(t)
+
+	newTxn := func(key string, ts hlc.Timestamp) *roachpb.Transaction {
+		// We're going to use read committed isolation level here, as that's the
+		// only one that makes sense in the context of this test. That's because
+		// serializable transactions cannot commit before refreshing their reads,
+		// and refreshing reads bumps the timestamp cache.
+		txn := roachpb.MakeTransaction("test", roachpb.Key(key), isolation.ReadCommitted, roachpb.NormalUserPriority, ts, 0, 0, 0)
+		return &txn
+	}
+
+	run := func(t *testing.T, isCommit, isReplicated bool, str lock.Strength) {
+		ctx := context.Background()
+		tc := testContext{}
+		stopper := stop.NewStopper()
+		defer stopper.Stop(ctx)
+		startTime := timeutil.Unix(0, 123)
+		tc.manualClock = timeutil.NewManualTime(startTime)
+		sc := TestStoreConfig(hlc.NewClockForTesting(tc.manualClock))
+		sc.TestingKnobs.DisableCanAckBeforeApplication = true
+		tc.StartWithStoreConfig(ctx, t, stopper, sc)
+
+		// Write some keys at time t0.
+		t0 := timeutil.Unix(1, 0)
+		tc.manualClock.MustAdvanceTo(t0)
+		err := tc.store.DB().Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
+			for _, keyStr := range []string{"a", "b", "c"} {
+				err := txn.Put(ctx, roachpb.Key(keyStr), "value")
+				if err != nil {
+					return err
+				}
+			}
+			return nil
+		})
+		require.NoError(t, err)
+
+		// Scan [a, e) at t1, acquiring locks as dictated by the test setup. The
+		// scan bumps the timestamp cache over [a, e) to t1.
+		t1 := timeutil.Unix(2, 0)
+		ba := &kvpb.BatchRequest{}
+		txn := newTxn("a", makeTS(t1.UnixNano(), 0))
+		ba.Timestamp = makeTS(t1.UnixNano(), 0)
+		ba.Txn = txn
+		span := roachpb.Span{Key: roachpb.Key("a"), EndKey: roachpb.Key("e")}
+		sArgs := scanArgs(span.Key, span.EndKey)
+		sArgs.KeyLockingStrength = str
+		sArgs.KeyLockingDurability = lock.Unreplicated
+		if isReplicated {
+			sArgs.KeyLockingDurability = lock.Replicated
+		}
+		ba.Add(sArgs)
+		_, pErr := tc.Sender().Send(ctx, ba)
+		require.Nil(t, pErr)
+
+		// Commit the transaction at t2.
+		t2 := timeutil.Unix(3, 0)
+		ba = &kvpb.BatchRequest{}
+		txn.WriteTimestamp = makeTS(t2.UnixNano(), 0)
+		ba.Txn = txn
+		et, _ := endTxnArgs(txn, isCommit)
+		// Omit lock spans so that we can release locks asynchronously, as if
+		// the locks were on a different range than the txn record.
+		et.LockSpans = nil
+		ba.Add(&et)
+		_, pErr = tc.Sender().Send(ctx, ba)
+		require.Nil(t, pErr)
+
+		status := roachpb.ABORTED
+		if isCommit {
+			status = roachpb.COMMITTED
+		}
+
+		// Perform point intent resolution.
+		ba = &kvpb.BatchRequest{}
+		for _, keyStr := range []string{"a", "b", "c"} {
+			resolveIntentArgs := resolveIntentArgsString(keyStr, txn.TxnMeta, status)
+			ba.Add(&resolveIntentArgs)
+		}
+		_, pErr = tc.Sender().Send(ctx, ba)
+		require.Nil(t, pErr)
+
+		expectedRTS := makeTS(t1.UnixNano(), 0) // we scanned at t1 over [a, e)
+		if isCommit && isReplicated {
+			expectedRTS = makeTS(t2.UnixNano(), 0)
+		}
+
+		rTS, _ := tc.store.tsCache.GetMax(roachpb.Key("a"), nil)
+		require.Equal(t, expectedRTS, rTS)
+		rTS, _ = tc.store.tsCache.GetMax(roachpb.Key("b"), nil)
+		require.Equal(t, expectedRTS, rTS)
+		rTS, _ = tc.store.tsCache.GetMax(roachpb.Key("c"), nil)
+		require.Equal(t, expectedRTS, rTS)
+		rTS, _ = tc.store.tsCache.GetMax(roachpb.Key("d"), nil)
+		require.Equal(t, makeTS(t1.UnixNano(), 0), rTS) // we scanned at t1 over [a, e)
+	}
+
+	testutils.RunTrueAndFalse(t, "isCommit", func(t *testing.T, isCommit bool) {
+		testutils.RunTrueAndFalse(t, "isReplicated", func(t *testing.T, isReplicated bool) {
+			for _, str := range []lock.Strength{lock.Shared, lock.Exclusive} {
+				t.Run(str.String(), func(t *testing.T) {
+					run(t, isCommit, isReplicated, str)
+				})
+			}
+		})
+	})
+}
+
+// TestResolveIntentRangeReplicatedLocksBumpsTSCache is like
+// TestResolveIntentReplicatedLocksBumpsTSCache, except it tests
+// ResolveIntentRange requests instead of ResolveIntent requests. As a result,
+// we assert that the timestamp cache is bumped over the entire range specified
+// in the ResolveIntentRangeRequest, not just the point keys that had locks on
+// them.
+func TestResolveIntentRangeReplicatedLocksBumpsTSCache(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+	defer log.Scope(t).Close(t)
+
+	newTxn := func(key string, ts hlc.Timestamp) *roachpb.Transaction {
+		// We're going to use read committed isolation level here, as that's the
+		// only one that makes sense in the context of this test. That's because
+		// serializable transactions cannot commit before refreshing their reads,
+		// and refreshing reads bumps the timestamp cache.
+		txn := roachpb.MakeTransaction("test", roachpb.Key(key), isolation.ReadCommitted, roachpb.NormalUserPriority, ts, 0, 0, 0)
+		return &txn
+	}
+
+	run := func(t *testing.T, isCommit, isReplicated bool, str lock.Strength) {
+		ctx := context.Background()
+		tc := testContext{}
+		stopper := stop.NewStopper()
+		defer stopper.Stop(ctx)
+		startTime := timeutil.Unix(0, 123)
+		tc.manualClock = timeutil.NewManualTime(startTime)
+		sc := TestStoreConfig(hlc.NewClockForTesting(tc.manualClock))
+		sc.TestingKnobs.DisableCanAckBeforeApplication = true
+		tc.StartWithStoreConfig(ctx, t, stopper, sc)
+
+		// Write some keys at time t0.
+		t0 := timeutil.Unix(1, 0)
+		tc.manualClock.MustAdvanceTo(t0)
+		err := tc.store.DB().Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
+			for _, keyStr := range []string{"a", "b", "c"} {
+				err := txn.Put(ctx, roachpb.Key(keyStr), "value")
+				if err != nil {
+					return err
+				}
+			}
+			return nil
+		})
+		require.NoError(t, err)
+
+		// Scan [a, e) at t1, acquiring locks as dictated by the test setup. The
+		// scan bumps the timestamp cache over [a, e) to t1.
+		t1 := timeutil.Unix(2, 0)
+		ba := &kvpb.BatchRequest{}
+		txn := newTxn("a", makeTS(t1.UnixNano(), 0))
+		ba.Timestamp = makeTS(t1.UnixNano(), 0)
+		ba.Txn = txn
+		span := roachpb.Span{Key: roachpb.Key("a"), EndKey: roachpb.Key("e")}
+		sArgs := scanArgs(span.Key, span.EndKey)
+		sArgs.KeyLockingStrength = str
+		sArgs.KeyLockingDurability = lock.Unreplicated
+		if isReplicated {
+			sArgs.KeyLockingDurability = lock.Replicated
+		}
+		ba.Add(sArgs)
+		_, pErr := tc.Sender().Send(ctx, ba)
+		require.Nil(t, pErr)
+
+		// Commit the transaction at t2.
+		t2 := timeutil.Unix(3, 0)
+		ba = &kvpb.BatchRequest{}
+		txn.WriteTimestamp = makeTS(t2.UnixNano(), 0)
+		ba.Txn = txn
+		et, _ := endTxnArgs(txn, isCommit)
+		// Omit lock spans so that we can release locks asynchronously, as if
+		// the locks were on a different range than the txn record.
+		et.LockSpans = nil
+		ba.Add(&et)
+		_, pErr = tc.Sender().Send(ctx, ba)
+		require.Nil(t, pErr)
+
+		status := roachpb.ABORTED
+		if isCommit {
+			status = roachpb.COMMITTED
+		}
+
+		// Perform ranged intent resolution.
+		ba = &kvpb.BatchRequest{}
+		resolveIntentRangeArgs := resolveIntentRangeArgsString("a", "e", txn.TxnMeta, status)
+		ba.Add(resolveIntentRangeArgs)
+		_, pErr = tc.Sender().Send(ctx, ba)
+		require.Nil(t, pErr)
+
+		expectedRTS := makeTS(t1.UnixNano(), 0) // we scanned at t1 over [a, e)
+		if isCommit && isReplicated {
+			expectedRTS = makeTS(t2.UnixNano(), 0)
+		}
+
+		for _, keyStr := range []string{"a", "b", "c", "d"} {
+			rTS, _ := tc.store.tsCache.GetMax(roachpb.Key(keyStr), nil)
+			require.Equal(t, expectedRTS, rTS)
+		}
+	}
+
+	testutils.RunTrueAndFalse(t, "isCommit", func(t *testing.T, isCommit bool) {
+		testutils.RunTrueAndFalse(t, "isReplicated", func(t *testing.T, isReplicated bool) {
+			for _, str := range []lock.Strength{lock.Shared, lock.Exclusive} {
+				t.Run(str.String(), func(t *testing.T) {
+					run(t, isCommit, isReplicated, str)
+				})
+			}
+		})
+	})
+}
diff --git a/pkg/kv/kvserver/replica_tscache.go b/pkg/kv/kvserver/replica_tscache.go
index 7f755e3c11bd..1bbde283e32d 100644
--- a/pkg/kv/kvserver/replica_tscache.go
+++ b/pkg/kv/kvserver/replica_tscache.go
@@ -277,6 +277,32 @@ func (r *Replica) updateTimestampCache(
 			// transaction or not.
 			addToTSCache(start, end, t.Txn.WriteTimestamp, uuid.UUID{})
 		}
+	case *kvpb.ResolveIntentRequest:
+		// Update the timestamp cache on the key the request resolved if a
+		// replicated {shared, exclusive} lock on that key was released, and
+		// the transaction that acquired that lock was successfully
+		// committed[1].
+		//
+		// [1] This is indicated by releasedTs being non-empty.
+		releasedTs := resp.(*kvpb.ResolveIntentResponse).ReplicatedLocksReleasedCommitTimestamp
+		if !releasedTs.IsEmpty() {
+			addToTSCache(start, end, releasedTs, txnID)
+		}
+	case *kvpb.ResolveIntentRangeRequest:
+		// Update the timestamp cache over the entire span the request operated
+		// over if at least one replicated {shared, exclusive} lock was released
+		// as part of committing the transaction[1].
+		//
+		// NB: It's not strictly required that we bump the timestamp cache over
+		// the entire span on which the request operated; we could instead return
+		// information about specific point {shared, exclusive} replicated locks
+		// and only bump the timestamp cache over those keys -- we choose not to.
+		//
+		// [1] Indicated by releasedTs being non-empty.
+		releasedTs := resp.(*kvpb.ResolveIntentRangeResponse).ReplicatedLocksReleasedCommitTimestamp
+		if !releasedTs.IsEmpty() {
+			addToTSCache(start, end, releasedTs, txnID)
+		}
 	default:
 		addToTSCache(start, end, ts, txnID)
 	}
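
Not part of the patch: a minimal sketch of the contract introduced by the new response field, for readers skimming the diff. The helper name maybeBumpTSCache and the add callback are hypothetical stand-ins for the replica's timestamp cache plumbing shown in replica_tscache.go above; only the field name and types come from the patch.

package example

import (
	"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
	"github.com/cockroachdb/cockroach/pkg/roachpb"
	"github.com/cockroachdb/cockroach/pkg/util/hlc"
)

// maybeBumpTSCache is a hypothetical helper illustrating how a caller is
// expected to consume ReplicatedLocksReleasedCommitTimestamp: when the field
// is set, the timestamp cache must be bumped over the resolution span to the
// releasing transaction's commit timestamp.
func maybeBumpTSCache(
	resp *kvpb.ResolveIntentRangeResponse,
	span roachpb.Span,
	add func(start, end roachpb.Key, ts hlc.Timestamp),
) {
	// The field is only populated when a committed transaction released a
	// replicated shared or exclusive lock; aborted transactions and plain
	// write intents leave it empty, in which case no extra bump is needed.
	if ts := resp.ReplicatedLocksReleasedCommitTimestamp; !ts.IsEmpty() {
		add(span.Key, span.EndKey, ts)
	}
}

This mirrors the ResolveIntentRangeRequest case added to Replica.updateTimestampCache; the point ResolveIntentRequest case behaves the same way over its single key.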