kv: bump timestamp cache when releasing replicated locks synchronously
Fixes #111536.

This commit is the second half of #111546. Whereas that commit taught
intent/lock resolution to bump the timestamp cache when releasing
replicated locks for committed transactions asynchronously through
ResolveIntent and ResolveIntentRange requests, this commit teaches
intent/lock resolution to bump the timestamp cache when releasing
replicated locks for committed transactions synchronously through EndTxn
requests.

The interface changes here look slightly different than the ones in that
commit. Instead of attaching the commit timestamp to the response proto,
we attach the spans over which replicated locks were released. This is
because the commit timestamp was already present on the response, but
the request may have resolved multiple lock spans.

Release note: None
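
As background for this change, here is a minimal toy sketch (ordinary Go, not CockroachDB code; all names are illustrative) of the invariant being preserved: once a committed transaction's replicated locks are released, the timestamp cache has to remember the commit timestamp over the released spans so that later writers cannot write underneath it.

package main

import "fmt"

type timestamp int64

// tsCache stands in for the per-range timestamp cache; the real cache stores
// key intervals, this toy version stores single keys.
type tsCache map[string]timestamp

func (c tsCache) bump(key string, ts timestamp) {
	if ts > c[key] {
		c[key] = ts
	}
}

// onCommit mimics what EndTxn now reports back: the keys whose replicated
// locks were released, which the caller records in the cache at the
// transaction's commit timestamp.
func onCommit(c tsCache, releasedKeys []string, commitTS timestamp) {
	for _, k := range releasedKeys {
		c.bump(k, commitTS)
	}
}

// tryWrite pushes a writer above any timestamp recorded in the cache.
func tryWrite(c tsCache, key string, writeTS timestamp) timestamp {
	if floor := c[key]; writeTS <= floor {
		return floor + 1 // pushed above the released lock's commit timestamp
	}
	return writeTS
}

func main() {
	cache := tsCache{}
	onCommit(cache, []string{"a", "c"}, 20) // locks on a and c released at commit ts=20
	fmt.Println(tryWrite(cache, "a", 15))   // 21: cannot write under the commit timestamp
	fmt.Println(tryWrite(cache, "b", 15))   // 15: key b was never locked
}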
nvanbenschoten committed Oct 6, 2023
1 parent b28fa0d commit 0e6de2b
Showing 5 changed files with 165 additions and 22 deletions.
16 changes: 14 additions & 2 deletions pkg/kv/kvpb/api.proto
@@ -978,6 +978,18 @@ message EndTxnResponse {
// The commit timestamp of the STAGING transaction record written
// by the request. Only set if the transaction record was staged.
util.hlc.Timestamp staging_timestamp = 5 [(gogoproto.nullable) = false];
// ReplicatedLocksReleasedOnCommit, if non-empty, indicates that replicated
// locks with strength Shared or Exclusive were released in the referenced key
// spans when committing this transaction. Notably, this field is left unset
// if only write intents were resolved. The field is also left unset for
// transactions that aborted.
//
// The caller must bump the timestamp cache across these spans to the
// transaction's commit timestamp. Doing so ensures that the released locks
// (acquired by the now committed transaction) continue to provide protection
// against other writers up to the commit timestamp, even after the locks have
// been released.
repeated Span replicated_locks_released_on_commit = 6 [(gogoproto.nullable) = false];
}

// An AdminSplitRequest is the argument to the AdminSplit() method. The
@@ -1483,7 +1495,7 @@ message ResolveIntentResponse {
// replicated lock with strength Shared or Exclusive was released by a
// transaction who committed at this timestamp. Notably, this field is left
// unset if only a write intent was resolved. The field is also left unset for
// transactions who aborted.
// transactions that aborted.
//
// The caller must bump the timestamp cache across the resolution span to this
// commit timestamp. Doing so ensures that the released lock (acquired by a
@@ -1538,7 +1550,7 @@ message ResolveIntentRangeResponse {
// least one replicated lock with strength Shared or Exclusive was released by
// a transaction who committed at this timestamp. Notably, this field is left
// unset if only a write intent was resolved. The field is also left unset for
// transactions who aborted.
// transactions that aborted.
//
// The caller must bump the timestamp cache across the resolution span to this
// commit timestamp. Doing so ensures that the released lock (acquired by a
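
To make the contract documented above concrete, the following sketch shows how a consumer of the new EndTxnResponse field is expected to feed the spans into the timestamp cache. It mirrors the replica_tscache.go hunk later in this commit; addToTSCache is a simplified stand-in for the replica's timestamp-cache hook.

// Sketch of consuming ReplicatedLocksReleasedOnCommit (mirrors the
// replica_tscache.go change below).
func bumpTSCacheOnEndTxn(
	resp *kvpb.EndTxnResponse,
	commitTS hlc.Timestamp, // the committed transaction's WriteTimestamp
	txnID uuid.UUID,
	addToTSCache func(start, end roachpb.Key, ts hlc.Timestamp, txnID uuid.UUID),
) {
	// Empty unless the transaction committed and released replicated
	// Shared/Exclusive locks; resolving plain write intents does not
	// populate the field.
	for _, sp := range resp.ReplicatedLocksReleasedOnCommit {
		addToTSCache(sp.Key, sp.EndKey, commitTS, txnID)
	}
}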
35 changes: 25 additions & 10 deletions pkg/kv/kvserver/batcheval/cmd_end_transaction.go
@@ -289,7 +289,7 @@ func EndTxn(
// The transaction has already been aborted by other.
// Do not return TransactionAbortedError since the client anyway
// wanted to abort the transaction.
resolvedLocks, externalLocks, err := resolveLocalLocks(ctx, readWriter, cArgs.EvalCtx, ms, args, reply.Txn)
resolvedLocks, _, externalLocks, err := resolveLocalLocks(ctx, readWriter, cArgs.EvalCtx, ms, args, reply.Txn)
if err != nil {
return result.Result{}, err
}
@@ -436,7 +436,8 @@ func EndTxn(
// we position the transaction record next to the first write of a transaction.
// This avoids the need for the intentResolver to have to return to this range
// to resolve locks for this transaction in the future.
resolvedLocks, externalLocks, err := resolveLocalLocks(ctx, readWriter, cArgs.EvalCtx, ms, args, reply.Txn)
resolvedLocks, releasedReplLocks, externalLocks, err := resolveLocalLocks(
ctx, readWriter, cArgs.EvalCtx, ms, args, reply.Txn)
if err != nil {
return result.Result{}, err
}
@@ -472,8 +473,16 @@
txnResult.Local.UpdatedTxns = []*roachpb.Transaction{reply.Txn}
txnResult.Local.ResolvedLocks = resolvedLocks

// Run the commit triggers if successfully committed.
if reply.Txn.Status == roachpb.COMMITTED {
// Return whether replicated {shared, exclusive} locks were released by
// the committing transaction. If such locks were released, we still
// need to make sure other transactions can't write underneath the
// transaction's commit timestamp to the key spans previously protected
// by the locks. We return the spans on the response and update the
// timestamp cache a few layers above to ensure this.
reply.ReplicatedLocksReleasedOnCommit = releasedReplLocks

// Run the commit triggers if successfully committed.
triggerResult, err := RunCommitTrigger(
ctx, cArgs.EvalCtx, readWriter.(storage.Batch), ms, args, reply.Txn,
)
@@ -539,7 +548,7 @@ func resolveLocalLocks(
ms *enginepb.MVCCStats,
args *kvpb.EndTxnRequest,
txn *roachpb.Transaction,
) (resolvedLocks []roachpb.LockUpdate, externalLocks []roachpb.Span, _ error) {
) (resolvedLocks []roachpb.LockUpdate, releasedReplLocks, externalLocks []roachpb.Span, _ error) {
var resolveAllowance int64 = lockResolutionBatchSize
var targetBytes int64 = lockResolutionBatchByteSize
if args.InternalCommitTrigger != nil {
@@ -563,7 +572,7 @@ func resolveLocalLocksWithPagination(
txn *roachpb.Transaction,
maxKeys int64,
targetBytes int64,
) (resolvedLocks []roachpb.LockUpdate, externalLocks []roachpb.Span, _ error) {
) (resolvedLocks []roachpb.LockUpdate, releasedReplLocks, externalLocks []roachpb.Span, _ error) {
desc := evalCtx.Desc()
if mergeTrigger := args.InternalCommitTrigger.GetMergeTrigger(); mergeTrigger != nil {
// If this is a merge, then use the post-merge descriptor to determine
@@ -595,7 +604,7 @@ func resolveLocalLocksWithPagination(
//
// Note that the underlying pebbleIterator will still be reused
// since readWriter is a pebbleBatch in the typical case.
ok, numBytes, resumeSpan, _, err := storage.MVCCResolveWriteIntent(ctx, readWriter, ms, update,
ok, numBytes, resumeSpan, wereReplLocksReleased, err := storage.MVCCResolveWriteIntent(ctx, readWriter, ms, update,
storage.MVCCResolveWriteIntentOptions{TargetBytes: targetBytes})
if err != nil {
return 0, 0, 0, errors.Wrapf(err, "resolving write intent at %s on end transaction [%s]", span, txn.Status)
@@ -621,6 +630,9 @@
}
}
}
if wereReplLocksReleased {
releasedReplLocks = append(releasedReplLocks, update.Span)
}
return numKeys, numBytes, resumeReason, nil
}
// For update ranges, cut into parts inside and outside our key
@@ -630,7 +642,7 @@
externalLocks = append(externalLocks, outSpans...)
if inSpan != nil {
update.Span = *inSpan
numKeys, numBytes, resumeSpan, resumeReason, _, err :=
numKeys, numBytes, resumeSpan, resumeReason, wereReplLocksReleased, err :=
storage.MVCCResolveWriteIntentRange(ctx, readWriter, ms, update,
storage.MVCCResolveWriteIntentRangeOptions{MaxKeys: maxKeys, TargetBytes: targetBytes},
)
@@ -655,25 +667,28 @@
span, txn.Status)
}
}
if wereReplLocksReleased {
releasedReplLocks = append(releasedReplLocks, update.Span)
}
return numKeys, numBytes, resumeReason, nil
}
return 0, 0, 0, nil
}

numKeys, _, _, err := storage.MVCCPaginate(ctx, maxKeys, targetBytes, false /* allowEmpty */, f)
if err != nil {
return nil, nil, err
return nil, nil, nil, err
}

externalLocks = append(externalLocks, remainingLockSpans...)

removedAny := numKeys > 0
if WriteAbortSpanOnResolve(txn.Status, args.Poison, removedAny) {
if err := UpdateAbortSpan(ctx, evalCtx, readWriter, ms, txn.TxnMeta, args.Poison); err != nil {
return nil, nil, err
return nil, nil, nil, err
}
}
return resolvedLocks, externalLocks, nil
return resolvedLocks, releasedReplLocks, externalLocks, nil
}

// updateStagingTxn persists the STAGING transaction record with updated status
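
Condensed, the change to resolveLocalLocksWithPagination above follows this accumulation pattern: while resolving each local lock span, remember the spans on which replicated Shared or Exclusive locks were released so EndTxn can return them. The helper below is a simplification (the real function also handles pagination budgets, resume spans, and external locks), and resolveOne stands in for MVCCResolveWriteIntent / MVCCResolveWriteIntentRange, whose extra boolean return reports whether replicated locks were released.

// Simplified accumulation pattern from resolveLocalLocksWithPagination.
func collectReleasedReplLocks(
	updates []roachpb.LockUpdate,
	resolveOne func(roachpb.LockUpdate) (wereReplLocksReleased bool),
) (resolvedLocks []roachpb.LockUpdate, releasedReplLocks []roachpb.Span) {
	for _, update := range updates {
		wereReplLocksReleased := resolveOne(update)
		resolvedLocks = append(resolvedLocks, update)
		if wereReplLocksReleased {
			// Remember the span so the EndTxn response can carry it and the
			// timestamp cache can later be bumped over it at the commit
			// timestamp.
			releasedReplLocks = append(releasedReplLocks, update.Span)
		}
	}
	return resolvedLocks, releasedReplLocks
}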
3 changes: 2 additions & 1 deletion pkg/kv/kvserver/batcheval/cmd_end_transaction_test.go
@@ -1651,7 +1651,7 @@ func TestResolveLocalLocks(t *testing.T) {
err := storage.MVCCPut(ctx, batch, intToKey(i), ts, roachpb.MakeValueFromString("a"), storage.MVCCWriteOptions{Txn: &txn})
require.NoError(t, err)
}
resolvedLocks, externalLocks, err := resolveLocalLocksWithPagination(
resolvedLocks, releasedReplLocks, externalLocks, err := resolveLocalLocksWithPagination(
ctx,
batch,
(&MockEvalCtx{
@@ -1674,6 +1674,7 @@
require.Equal(t, tc.expectedResolvedLocks[i].Key, lock.Key)
require.Equal(t, tc.expectedResolvedLocks[i].EndKey, lock.EndKey)
}
require.Len(t, releasedReplLocks, 0)
require.Equal(t, len(tc.expectedExternalLocks), len(externalLocks))
for i, lock := range externalLocks {
require.Equal(t, tc.expectedExternalLocks[i].Key, lock.Key)
125 changes: 116 additions & 9 deletions pkg/kv/kvserver/replica_test.go
@@ -14410,19 +14410,21 @@ func TestResolveIntentReplicatedLocksBumpsTSCache(t *testing.T) {
_, pErr = tc.Sender().Send(ctx, ba)
require.Nil(t, pErr)

expectedRTS := makeTS(t1.UnixNano(), 0) // we scanned at t1 over [a, e)
notBumpedTs := makeTS(t1.UnixNano(), 0) // we scanned at t1 over [a, e)
bumpedTs := makeTS(t2.UnixNano(), 0) // if we committed, it was at t2
expTs := notBumpedTs
if isCommit && isReplicated {
expectedRTS = makeTS(t2.UnixNano(), 0)
expTs = bumpedTs
}

rTS, _ := tc.store.tsCache.GetMax(roachpb.Key("a"), nil)
require.Equal(t, expectedRTS, rTS)
require.Equal(t, expTs, rTS)
rTS, _ = tc.store.tsCache.GetMax(roachpb.Key("b"), nil)
require.Equal(t, expectedRTS, rTS)
require.Equal(t, expTs, rTS)
rTS, _ = tc.store.tsCache.GetMax(roachpb.Key("c"), nil)
require.Equal(t, expectedRTS, rTS)
require.Equal(t, expTs, rTS)
rTS, _ = tc.store.tsCache.GetMax(roachpb.Key("d"), nil)
require.Equal(t, makeTS(t1.UnixNano(), 0), rTS) // we scanned at t1 over [a, e)
require.Equal(t, notBumpedTs, rTS)
}

testutils.RunTrueAndFalse(t, "isCommit", func(t *testing.T, isCommit bool) {
@@ -14523,14 +14525,16 @@ func TestResolveIntentRangeReplicatedLocksBumpsTSCache(t *testing.T) {
_, pErr = tc.Sender().Send(ctx, ba)
require.Nil(t, pErr)

expectedRTS := makeTS(t1.UnixNano(), 0) // we scanned at t1 over [a, e)
notBumpedTs := makeTS(t1.UnixNano(), 0) // we scanned at t1 over [a, e)
bumpedTs := makeTS(t2.UnixNano(), 0) // if we committed, it was at t2
expTs := notBumpedTs
if isCommit && isReplicated {
expectedRTS = makeTS(t2.UnixNano(), 0)
expTs = bumpedTs
}

for _, keyStr := range []string{"a", "b", "c", "d"} {
rTS, _ := tc.store.tsCache.GetMax(roachpb.Key(keyStr), nil)
require.Equal(t, expectedRTS, rTS)
require.Equal(t, expTs, rTS)
}
}

@@ -14544,3 +14548,106 @@
})
})
}

// TestEndTxnReplicatedLocksBumpsTSCache is like
// TestResolveIntentReplicatedLocksBumpsTSCache, except it tests EndTxn requests
// (which synchronously resolve local locks) instead of ResolveIntent requests.
func TestEndTxnReplicatedLocksBumpsTSCache(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

newTxn := func(key string, ts hlc.Timestamp) *roachpb.Transaction {
// We're going to use read committed isolation level here, as that's the
// only one which makes sense in the context of this test. That's because
// serializable transactions cannot commit before refreshing their reads,
// and refreshing reads bumps the timestamp cache.
txn := roachpb.MakeTransaction("test", roachpb.Key(key), isolation.ReadCommitted, roachpb.NormalUserPriority, ts, 0, 0, 0)
return &txn
}

run := func(t *testing.T, isCommit, isReplicated bool, str lock.Strength) {
ctx := context.Background()
tc := testContext{}
stopper := stop.NewStopper()
defer stopper.Stop(ctx)
startTime := timeutil.Unix(0, 123)
tc.manualClock = timeutil.NewManualTime(startTime)
sc := TestStoreConfig(hlc.NewClockForTesting(tc.manualClock))
sc.TestingKnobs.DisableCanAckBeforeApplication = true
tc.StartWithStoreConfig(ctx, t, stopper, sc)

// Write some keys at time t0.
t0 := timeutil.Unix(1, 0)
tc.manualClock.MustAdvanceTo(t0)
err := tc.store.DB().Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
for _, keyStr := range []string{"a", "b", "c"} {
err := txn.Put(ctx, roachpb.Key(keyStr), "value")
if err != nil {
return err
}
}
return nil
})
require.NoError(t, err)

// Scan [a, e) at t1, acquiring locks as dictated by the test setup. Verify
// the scan result is correct.
t1 := timeutil.Unix(2, 0)
ba := &kvpb.BatchRequest{}
txn := newTxn("a", makeTS(t1.UnixNano(), 0))
ba.Timestamp = makeTS(t1.UnixNano(), 0)
ba.Txn = txn
span := roachpb.Span{Key: roachpb.Key("a"), EndKey: roachpb.Key("e")}
sArgs := scanArgs(span.Key, span.EndKey)
sArgs.KeyLockingStrength = str
sArgs.KeyLockingDurability = lock.Unreplicated
if isReplicated {
sArgs.KeyLockingDurability = lock.Replicated
}
ba.Add(sArgs)
_, pErr := tc.Sender().Send(ctx, ba)
require.Nil(t, pErr)

// Commit the transaction at t2.
t2 := timeutil.Unix(3, 0)
ba = &kvpb.BatchRequest{}
txn.WriteTimestamp = makeTS(t2.UnixNano(), 0)
ba.Txn = txn
et, _ := endTxnArgs(txn, isCommit)
// Assign lock spans to the EndTxn request. Assign one point lock span
// and one range lock span.
et.LockSpans = []roachpb.Span{
{Key: roachpb.Key("a")},
{Key: roachpb.Key("c"), EndKey: roachpb.Key("e")},
}
ba.Add(&et)
_, pErr = tc.Sender().Send(ctx, ba)
require.Nil(t, pErr)

notBumpedTs := makeTS(t1.UnixNano(), 0) // we scanned at t1 over [a, e)
bumpedTs := makeTS(t2.UnixNano(), 0) // if we committed, it was at t2
expTs := notBumpedTs
if isCommit && isReplicated {
expTs = bumpedTs
}

rTS, _ := tc.store.tsCache.GetMax(roachpb.Key("a"), nil)
require.Equal(t, expTs, rTS)
rTS, _ = tc.store.tsCache.GetMax(roachpb.Key("b"), nil)
require.Equal(t, notBumpedTs, rTS)
rTS, _ = tc.store.tsCache.GetMax(roachpb.Key("c"), nil)
require.Equal(t, expTs, rTS)
rTS, _ = tc.store.tsCache.GetMax(roachpb.Key("d"), nil)
require.Equal(t, expTs, rTS)
}

testutils.RunTrueAndFalse(t, "isCommit", func(t *testing.T, isCommit bool) {
testutils.RunTrueAndFalse(t, "isReplicated", func(t *testing.T, isReplicated bool) {
for _, str := range []lock.Strength{lock.Shared, lock.Exclusive} {
t.Run(str.String(), func(t *testing.T) {
run(t, isCommit, isReplicated, str)
})
}
})
})
}
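
The expectations asserted by this test can be restated compactly: a key's timestamp cache entry rises to the commit timestamp only if the key was covered by one of the EndTxn's lock spans ("a" and [c, e)), the transaction committed, and the locks were replicated; otherwise it stays at the scan timestamp. A hypothetical helper (not part of the commit) expressing that rule:

// Hypothetical restatement of the expected timestamp-cache level per key.
func expectedTSCacheTS(
	coveredByLockSpan, isCommit, isReplicated bool,
	scanTS, commitTS hlc.Timestamp,
) hlc.Timestamp {
	if coveredByLockSpan && isCommit && isReplicated {
		return commitTS // e.g. keys "a", "c", and "d" in the committed, replicated case
	}
	return scanTS // e.g. key "b", which no lock span covers
}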
8 changes: 8 additions & 0 deletions pkg/kv/kvserver/replica_tscache.go
@@ -138,6 +138,14 @@ func (r *Replica) updateTimestampCache(
// transaction's MinTimestamp, which is consulted in CanCreateTxnRecord.
key := transactionTombstoneMarker(start, txnID)
addToTSCache(key, nil, ts, txnID)
// Additionally, EndTxn requests that release replicated locks for
// committed transactions bump the timestamp cache over those lock
// spans to the commit timestamp of the transaction to ensure that
// the released locks continue to provide protection against writes
// underneath the transaction's commit timestamp.
for _, sp := range resp.(*kvpb.EndTxnResponse).ReplicatedLocksReleasedOnCommit {
addToTSCache(sp.Key, sp.EndKey, br.Txn.WriteTimestamp, txnID)
}
case *kvpb.HeartbeatTxnRequest:
// HeartbeatTxn requests record a tombstone entry when the record is
// initially written. This is used when considering potential 1PC
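
One property the api.proto contract implies, written out as a hypothetical assertion (not part of this commit): the spans are only populated for committed transactions, which is what makes the response transaction's WriteTimestamp the correct timestamp to feed into the cache here.

// Hypothetical sanity check expressing the contract from api.proto: the spans
// must only be set when the transaction committed.
func assertReleasedOnCommitInvariant(resp *kvpb.EndTxnResponse, txn *roachpb.Transaction) error {
	if len(resp.ReplicatedLocksReleasedOnCommit) > 0 && txn.Status != roachpb.COMMITTED {
		return errors.AssertionFailedf(
			"replicated locks released on commit reported for non-committed txn %s", txn)
	}
	return nil
}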
