kv: bump timestamp cache when resolving replicated locks
This patch teaches ResolveIntent and ResolveIntentRange requests to
bump the timestamp cache if they resolved any replicated
shared/exclusive locks held by a transaction that went on to commit. In
all other cases (only unreplicated locks, no shared or exclusive locks,
or an aborted lock-holder transaction) the timestamp cache is not
bumped.
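Distilled to its core, the evaluation-side condition added by this
patch looks roughly as follows (a condensed sketch of the
cmd_resolve_intent.go change further down in this diff;
`replLocksReleased`, `update`, and `reply` are the variables used
there):

// Only bump when a replicated shared/exclusive lock was actually released
// and the lock holder committed; aborted transactions need no protection.
if replLocksReleased && update.Status == roachpb.COMMITTED {
	reply.ReplicatedLocksReleasedCommitTimestamp = update.Txn.WriteTimestamp
}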

The handling of ResolveIntentRange requests deserves some words -- for
these, we choose to bump the timestamp cache over the entire keyspan
they operated over if even a single replicated {shared, exclusive} lock
was resolved. This means we lose fidelity about the specific keys that
had point locks on them; we choose this approach, instead of trying to
plumb high-fidelity information back up, for simplicity and to avoid
cases where the response message to a ResolveIntentRange request is
significantly larger than the request itself. The downside is that we
bump the timestamp cache over keys that may never have been locked. In
practice, we don't expect this to be any more impactful than the other
downsides of ranged intent resolution, like grabbing a write latch
across keys that weren't locked.
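For illustration, after a ranged resolution over [a, e) releases
replicated locks held by a transaction that committed at t2, the
timestamp cache reads back t2 even for a key that never held a lock.
The new TestResolveIntentRangeReplicatedLocksBumpsTSCache added in this
patch asserts exactly this (a sketch of its assertions; `tc`, `makeTS`,
and `t2` come from the test harness in replica_test.go):

// Keys a, b, and c held replicated locks; key d did not, but it lies
// inside the resolved span [a, e), so its timestamp cache entry is
// advanced to the commit timestamp all the same.
for _, keyStr := range []string{"a", "b", "c", "d"} {
	rTS, _ := tc.store.tsCache.GetMax(roachpb.Key(keyStr), nil)
	require.Equal(t, makeTS(t2.UnixNano(), 0), rTS)
}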

Lastly, it's worth noting that `EndTxn` requests also resolve local
locks. As such, any replicated {shared, exclusive} locks resolved by an
EndTxn request need to be handled in a similar fashion. This patch does
not do that -- we leave it to a subsequent patch, at which point the
linked issue can be closed.

Informs cockroachdb#111536

Release note: None
arulajmani authored and nvanbenschoten committed Oct 4, 2023
1 parent 0fbb976 commit 7702ff8
Showing 6 changed files with 313 additions and 5 deletions.
4 changes: 2 additions & 2 deletions pkg/kv/kvpb/api.go
@@ -1622,8 +1622,8 @@ func (*QueryIntentRequest) flags() flag {
return isRead | isPrefix | updatesTSCache | updatesTSCacheOnErr
}
func (*QueryLocksRequest) flags() flag { return isRead | isRange }
func (*ResolveIntentRequest) flags() flag { return isWrite }
func (*ResolveIntentRangeRequest) flags() flag { return isWrite | isRange }
func (*ResolveIntentRequest) flags() flag { return isWrite | updatesTSCache }
func (*ResolveIntentRangeRequest) flags() flag { return isWrite | isRange | updatesTSCache }
func (*TruncateLogRequest) flags() flag { return isWrite }
func (*MergeRequest) flags() flag { return isWrite | canBackpressure }
func (*RequestLeaseRequest) flags() flag {
22 changes: 22 additions & 0 deletions pkg/kv/kvpb/api.proto
@@ -1479,6 +1479,17 @@ message ResolveIntentRequest {
// ResolveIntent() method.
message ResolveIntentResponse {
ResponseHeader header = 1 [(gogoproto.nullable) = false, (gogoproto.embed) = true];
// ReplicatedLocksReleasedCommitTimestamp, if non-empty, indicates that a
// replicated lock with strength Shared or Exclusive was released by a
// transaction that committed at this timestamp. Notably, this field is left
// unset if only a write intent was resolved. The field is also left unset for
// transactions that aborted.
//
// The caller must bump the timestamp cache across the resolution span to this
// commit timestamp. Doing so ensures that the released lock (acquired by a
// now committed transaction) continues to provide protection against other
// writers up to the commit timestamp, even after the lock has been released.
util.hlc.Timestamp replicated_locks_released_commit_timestamp = 2 [(gogoproto.nullable) = false];
}

// A ResolveIntentRangeRequest is arguments to the ResolveIntentRange() method.
@@ -1523,6 +1534,17 @@ message ResolveIntentRangeRequest {
// ResolveIntent() method.
message ResolveIntentRangeResponse {
ResponseHeader header = 1 [(gogoproto.nullable) = false, (gogoproto.embed) = true];
// ReplicatedLocksReleasedCommitTimestamp, if non-empty, indicates that at
// least one replicated lock with strength Shared or Exclusive was released by
// a transaction that committed at this timestamp. Notably, this field is left
// unset if only write intents were resolved. The field is also left unset for
// transactions that aborted.
//
// The caller must bump the timestamp cache across the resolution span to this
// commit timestamp. Doing so ensures that the released lock (acquired by a
// now committed transaction) continues to provide protection against other
// writers up to the commit timestamp, even after the lock has been released.
util.hlc.Timestamp replicated_locks_released_commit_timestamp = 2 [(gogoproto.nullable) = false];
}

// A MergeRequest contains arguments to the Merge() method. It
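On the read side, the replica consults the new response field when
updating its timestamp cache and only acts on it when it is set. The
sketch below condenses the replica_tscache.go change at the end of this
diff; the ResolveIntentRange case is analogous, using the request's
full span as [start, end):

case *kvpb.ResolveIntentRequest:
	// A non-empty timestamp means a replicated shared/exclusive lock was
	// released on behalf of a committed transaction; bumping the timestamp
	// cache keeps its commit timestamp protected after the lock is gone.
	releasedTs := resp.(*kvpb.ResolveIntentResponse).ReplicatedLocksReleasedCommitTimestamp
	if !releasedTs.IsEmpty() {
		addToTSCache(start, end, releasedTs, txnID)
	}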
15 changes: 13 additions & 2 deletions pkg/kv/kvserver/batcheval/cmd_resolve_intent.go
@@ -97,8 +97,8 @@ func ResolveIntent(
// The observation was from the wrong node. Ignore.
update.ClockWhilePending = roachpb.ObservedTimestamp{}
}
ok, numBytes, resumeSpan, _, err := storage.MVCCResolveWriteIntent(ctx, readWriter, ms, update,
storage.MVCCResolveWriteIntentOptions{TargetBytes: h.TargetBytes})
ok, numBytes, resumeSpan, replLocksReleased, err := storage.MVCCResolveWriteIntent(
ctx, readWriter, ms, update, storage.MVCCResolveWriteIntentOptions{TargetBytes: h.TargetBytes})
if err != nil {
return result.Result{}, err
}
@@ -114,6 +114,17 @@ func ResolveIntent(
res.Local.ResolvedLocks = []roachpb.LockUpdate{update}
res.Local.Metrics = resolveToMetricType(args.Status, args.Poison)

// Handle replicated lock releases.
if replLocksReleased && update.Status == roachpb.COMMITTED {
// A replicated {shared, exclusive} lock was released for a committed
// transaction. Now that the lock is no longer there, we still need to make
// sure other transactions can't write underneath the transaction's commit
// timestamp to the key. We return the transaction's commit timestamp on the
// response and update the timestamp cache a few layers above to ensure
// this.
reply.ReplicatedLocksReleasedCommitTimestamp = update.Txn.WriteTimestamp
}

if WriteAbortSpanOnResolve(args.Status, args.Poison, ok) {
if err := UpdateAbortSpan(ctx, cArgs.EvalCtx, readWriter, ms, args.IntentTxn, args.Poison); err != nil {
return result.Result{}, err
18 changes: 17 additions & 1 deletion pkg/kv/kvserver/batcheval/cmd_resolve_intent_range.go
@@ -55,7 +55,7 @@ func ResolveIntentRange(
// The observation was from the wrong node. Ignore.
update.ClockWhilePending = roachpb.ObservedTimestamp{}
}
numKeys, numBytes, resumeSpan, resumeReason, _, err :=
numKeys, numBytes, resumeSpan, resumeReason, replLocksReleased, err :=
storage.MVCCResolveWriteIntentRange(ctx, readWriter, ms, update,
storage.MVCCResolveWriteIntentRangeOptions{MaxKeys: h.MaxSpanRequestKeys, TargetBytes: h.TargetBytes},
)
@@ -79,6 +79,22 @@ func ResolveIntentRange(
res.Local.ResolvedLocks = []roachpb.LockUpdate{update}
res.Local.Metrics = resolveToMetricType(args.Status, args.Poison)

// Handle replicated lock releases.
if replLocksReleased && update.Status == roachpb.COMMITTED {
// A replicated {shared, exclusive} lock was released for a committed
// transaction. Now that the lock is no longer there, we still need to make
// sure other transactions can't write underneath the transaction's commit
// timestamp to the key. We return the transaction's commit timestamp on the
// response and update the timestamp cache a few layers above to ensure
// this.
//
// NB: Doing so will update the timestamp cache over the entire key span the
// request operated over -- we're losing fidelity about which key(s) had
// locks. We could do a better job tracking and plumbing this information
// up, but we choose not to.
reply.ReplicatedLocksReleasedCommitTimestamp = update.Txn.WriteTimestamp
}

if WriteAbortSpanOnResolve(args.Status, args.Poison, numKeys > 0) {
if err := UpdateAbortSpan(ctx, cArgs.EvalCtx, readWriter, ms, args.IntentTxn, args.Poison); err != nil {
return result.Result{}, err
233 changes: 233 additions & 0 deletions pkg/kv/kvserver/replica_test.go
@@ -1536,6 +1536,16 @@ func queryLocksArgs(key, endKey []byte, includeUncontended bool) kvpb.QueryLocks
}
}

func resolveIntentArgsString(
s string, txn enginepb.TxnMeta, status roachpb.TransactionStatus,
) kvpb.ResolveIntentRequest {
return kvpb.ResolveIntentRequest{
RequestHeader: kvpb.RequestHeader{Key: roachpb.Key(s)},
IntentTxn: txn,
Status: status,
}
}

func resolveIntentRangeArgsString(
s, e string, txn enginepb.TxnMeta, status roachpb.TransactionStatus,
) *kvpb.ResolveIntentRangeRequest {
@@ -14311,3 +14321,226 @@ func TestRangeSplitAndRHSRemovalRacesWithFollowerRead(t *testing.T) {
go func() { defer wg.Done(); read() }()
wg.Wait()
}

// TestResolveIntentReplicatedLocksBumpsTSCache ensures that performing point
// intent resolution over keys on which we have acquired replicated shared or
// exclusive locks bumps the timestamp cache for those keys if the transaction
// has successfully committed. Otherwise, if the transaction is aborted, intent
// resolution does not bump the timestamp cache.
func TestResolveIntentReplicatedLocksBumpsTSCache(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

newTxn := func(key string, ts hlc.Timestamp) *roachpb.Transaction {
// We're going to use read committed isolation level here, as that's the
// only one which makes sense in the context of this test. That's because
// serializable transactions cannot commit before refreshing their reads,
// and refreshing reads bumps the timestamp cache.
txn := roachpb.MakeTransaction("test", roachpb.Key(key), isolation.ReadCommitted, roachpb.NormalUserPriority, ts, 0, 0, 0)
return &txn
}

run := func(t *testing.T, isCommit, isReplicated bool, str lock.Strength) {
ctx := context.Background()
tc := testContext{}
stopper := stop.NewStopper()
defer stopper.Stop(ctx)
startTime := timeutil.Unix(0, 123)
tc.manualClock = timeutil.NewManualTime(startTime)
sc := TestStoreConfig(hlc.NewClockForTesting(tc.manualClock))
sc.TestingKnobs.DisableCanAckBeforeApplication = true
tc.StartWithStoreConfig(ctx, t, stopper, sc)

// Write some keys at time t0.
t0 := timeutil.Unix(1, 0)
tc.manualClock.MustAdvanceTo(t0)
err := tc.store.DB().Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
for _, keyStr := range []string{"a", "b", "c"} {
err := txn.Put(ctx, roachpb.Key(keyStr), "value")
if err != nil {
return err
}
}
return nil
})
require.NoError(t, err)

// Scan [a, e) at t1, acquiring locks as dictated by the test setup. Verify
// the scan result is correct.
t1 := timeutil.Unix(2, 0)
ba := &kvpb.BatchRequest{}
txn := newTxn("a", makeTS(t1.UnixNano(), 0))
ba.Timestamp = makeTS(t1.UnixNano(), 0)
ba.Txn = txn
span := roachpb.Span{Key: roachpb.Key("a"), EndKey: roachpb.Key("e")}
sArgs := scanArgs(span.Key, span.EndKey)
sArgs.KeyLockingStrength = str
sArgs.KeyLockingDurability = lock.Unreplicated
if isReplicated {
sArgs.KeyLockingDurability = lock.Replicated
}
ba.Add(sArgs)
_, pErr := tc.Sender().Send(ctx, ba)
require.Nil(t, pErr)

// Commit the transaction at t2.
t2 := timeutil.Unix(3, 0)
ba = &kvpb.BatchRequest{}
txn.WriteTimestamp = makeTS(t2.UnixNano(), 0)
ba.Txn = txn
et, _ := endTxnArgs(txn, isCommit)
// Omit lock spans so that we can release locks asynchronously, as if
// the locks were on a different range than the txn record.
et.LockSpans = nil
ba.Add(&et)
_, pErr = tc.Sender().Send(ctx, ba)
require.Nil(t, pErr)

status := roachpb.ABORTED
if isCommit {
status = roachpb.COMMITTED
}

// Perform point intent resolution.
ba = &kvpb.BatchRequest{}
for _, keyStr := range []string{"a", "b", "c"} {
resolveIntentArgs := resolveIntentArgsString(keyStr, txn.TxnMeta, status)
ba.Add(&resolveIntentArgs)
}
_, pErr = tc.Sender().Send(ctx, ba)
require.Nil(t, pErr)

expectedRTS := makeTS(t1.UnixNano(), 0) // we scanned at t1 over [a, e)
if isCommit && isReplicated {
expectedRTS = makeTS(t2.UnixNano(), 0)
}

rTS, _ := tc.store.tsCache.GetMax(roachpb.Key("a"), nil)
require.Equal(t, expectedRTS, rTS)
rTS, _ = tc.store.tsCache.GetMax(roachpb.Key("b"), nil)
require.Equal(t, expectedRTS, rTS)
rTS, _ = tc.store.tsCache.GetMax(roachpb.Key("c"), nil)
require.Equal(t, expectedRTS, rTS)
rTS, _ = tc.store.tsCache.GetMax(roachpb.Key("d"), nil)
require.Equal(t, makeTS(t1.UnixNano(), 0), rTS) // we scanned at t1 over [a, e)
}

testutils.RunTrueAndFalse(t, "isCommit", func(t *testing.T, isCommit bool) {
testutils.RunTrueAndFalse(t, "isReplicated", func(t *testing.T, isReplicated bool) {
for _, str := range []lock.Strength{lock.Shared, lock.Exclusive} {
t.Run(str.String(), func(t *testing.T) {
run(t, isCommit, isReplicated, str)
})
}
})
})
}

// TestResolveIntentRangeReplicatedLocksBumpsTSCache is like
// TestResolveIntentReplicatedLocksBumpsTSCache, except it tests
// ResolveIntentRange requests instead of ResolveIntent requests. As a result,
// we assert that the timestamp cache is bumped over the entire range specified
// in the ResolveIntentRangeRequest, not just the point keys that had locks on
// them.
func TestResolveIntentRangeReplicatedLocksBumpsTSCache(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

newTxn := func(key string, ts hlc.Timestamp) *roachpb.Transaction {
// We're going to use read committed isolation level here, as that's the
// only one which makes sense in the context of this test. That's because
// serializable transactions cannot commit before refreshing their reads,
// and refreshing reads bumps the timestamp cache.
txn := roachpb.MakeTransaction("test", roachpb.Key(key), isolation.ReadCommitted, roachpb.NormalUserPriority, ts, 0, 0, 0)
return &txn
}

run := func(t *testing.T, isCommit, isReplicated bool, str lock.Strength) {
ctx := context.Background()
tc := testContext{}
stopper := stop.NewStopper()
defer stopper.Stop(ctx)
startTime := timeutil.Unix(0, 123)
tc.manualClock = timeutil.NewManualTime(startTime)
sc := TestStoreConfig(hlc.NewClockForTesting(tc.manualClock))
sc.TestingKnobs.DisableCanAckBeforeApplication = true
tc.StartWithStoreConfig(ctx, t, stopper, sc)

// Write some keys at time t0.
t0 := timeutil.Unix(1, 0)
tc.manualClock.MustAdvanceTo(t0)
err := tc.store.DB().Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
for _, keyStr := range []string{"a", "b", "c"} {
err := txn.Put(ctx, roachpb.Key(keyStr), "value")
if err != nil {
return err
}
}
return nil
})
require.NoError(t, err)

// Scan [a, e) at t1, acquiring locks as dictated by the test setup. Verify
// the scan result is correct.
t1 := timeutil.Unix(2, 0)
ba := &kvpb.BatchRequest{}
txn := newTxn("a", makeTS(t1.UnixNano(), 0))
ba.Timestamp = makeTS(t1.UnixNano(), 0)
ba.Txn = txn
span := roachpb.Span{Key: roachpb.Key("a"), EndKey: roachpb.Key("e")}
sArgs := scanArgs(span.Key, span.EndKey)
sArgs.KeyLockingStrength = str
sArgs.KeyLockingDurability = lock.Unreplicated
if isReplicated {
sArgs.KeyLockingDurability = lock.Replicated
}
ba.Add(sArgs)
_, pErr := tc.Sender().Send(ctx, ba)
require.Nil(t, pErr)

// Commit the transaction at t2.
t2 := timeutil.Unix(3, 0)
ba = &kvpb.BatchRequest{}
txn.WriteTimestamp = makeTS(t2.UnixNano(), 0)
ba.Txn = txn
et, _ := endTxnArgs(txn, isCommit)
// Omit lock spans so that we can release locks asynchronously, as if
// the locks were on a different range than the txn record.
et.LockSpans = nil
ba.Add(&et)
_, pErr = tc.Sender().Send(ctx, ba)
require.Nil(t, pErr)

status := roachpb.ABORTED
if isCommit {
status = roachpb.COMMITTED
}

// Perform ranged intent resolution.
ba = &kvpb.BatchRequest{}
resolveIntentRangeArgs := resolveIntentRangeArgsString("a", "e", txn.TxnMeta, status)
ba.Add(resolveIntentRangeArgs)
_, pErr = tc.Sender().Send(ctx, ba)
require.Nil(t, pErr)

expectedRTS := makeTS(t1.UnixNano(), 0) // we scanned at t1 over [a, e)
if isCommit && isReplicated {
expectedRTS = makeTS(t2.UnixNano(), 0)
}

for _, keyStr := range []string{"a", "b", "c", "d"} {
rTS, _ := tc.store.tsCache.GetMax(roachpb.Key(keyStr), nil)
require.Equal(t, expectedRTS, rTS)
}
}

testutils.RunTrueAndFalse(t, "isCommit", func(t *testing.T, isCommit bool) {
testutils.RunTrueAndFalse(t, "isReplicated", func(t *testing.T, isReplicated bool) {
for _, str := range []lock.Strength{lock.Shared, lock.Exclusive} {
t.Run(str.String(), func(t *testing.T) {
run(t, isCommit, isReplicated, str)
})
}
})
})
}
26 changes: 26 additions & 0 deletions pkg/kv/kvserver/replica_tscache.go
@@ -277,6 +277,32 @@ func (r *Replica) updateTimestampCache(
// transaction or not.
addToTSCache(start, end, t.Txn.WriteTimestamp, uuid.UUID{})
}
case *kvpb.ResolveIntentRequest:
// Update the timestamp cache on the key the request resolved if a
// replicated {shared, exclusive} lock on that key was released and the
// transaction that had acquired it committed successfully[1].
//
// [1] This is indicated by releasedTs being non-empty.
releasedTs := resp.(*kvpb.ResolveIntentResponse).ReplicatedLocksReleasedCommitTimestamp
if !releasedTs.IsEmpty() {
addToTSCache(start, end, releasedTs, txnID)
}
case *kvpb.ResolveIntentRangeRequest:
// Update the timestamp cache over the entire span the request operated
// over if there was at least one replicated {shared,exclusive} lock
// that was released as part of committing the transaction[1].
//
// NB: It's not strictly required that we bump the timestamp cache over
// the entire span on which the request operated; we could instead return
// information about specific point {shared, exclusive} replicated locks
// and only bump the timestamp cache over those keys -- we choose not to.
//
// [1] Indicated by releasedTs being non-empty.
releasedTs := resp.(*kvpb.ResolveIntentRangeResponse).ReplicatedLocksReleasedCommitTimestamp
if !releasedTs.IsEmpty() {
addToTSCache(start, end, releasedTs, txnID)
}
default:
addToTSCache(start, end, ts, txnID)
}
