kv: bump timestamp cache when releasing replicated locks synchronously
Fixes cockroachdb#111536.

This commit is the second half of cockroachdb#111546. That commit taught
intent/lock resolution to bump the timestamp cache when replicated locks
held by committed transactions are released asynchronously through
ResolveIntent and ResolveIntentRange requests; this commit does the same
when such locks are released synchronously through EndTxn requests.

The interface changes here look slightly different from those in that
commit. Instead of attaching the commit timestamp to the response proto,
we attach the spans over which replicated locks were released. This is
because the commit timestamp was already present on the response, but
the request may have resolved multiple lock spans.

Release note: None
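
For orientation, the sketch below condenses the consumer side of the change: EndTxn evaluation reports the spans whose replicated locks it released, and the caller bumps the timestamp cache over those spans to the commit timestamp. This is a simplified illustration, not the exact kvserver code; the helper name and the injected addToTSCache callback are invented for the example.

package example

import (
	"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
	"github.com/cockroachdb/cockroach/pkg/roachpb"
	"github.com/cockroachdb/cockroach/pkg/util/hlc"
	"github.com/cockroachdb/cockroach/pkg/util/uuid"
)

// bumpTSCacheForReleasedLocks mirrors, in simplified form, the loop added to
// replica_tscache.go by this commit: for a committed transaction, record the
// commit timestamp in the timestamp cache over every span whose replicated
// Shared or Exclusive locks were just released.
func bumpTSCacheForReleasedLocks(
	resp *kvpb.EndTxnResponse,
	commitTS hlc.Timestamp,
	txnID uuid.UUID,
	addToTSCache func(start, end roachpb.Key, ts hlc.Timestamp, txnID uuid.UUID),
) {
	for _, sp := range resp.ReplicatedLocksReleasedOnCommit {
		// Point spans carry an empty EndKey; the cache treats that as a
		// single-key entry.
		addToTSCache(sp.Key, sp.EndKey, commitTS, txnID)
	}
}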
nvanbenschoten committed Oct 5, 2023
1 parent 6b08842 commit baa1990
Showing 5 changed files with 163 additions and 20 deletions.
12 changes: 12 additions & 0 deletions pkg/kv/kvpb/api.proto
@@ -978,6 +978,18 @@ message EndTxnResponse {
// The commit timestamp of the STAGING transaction record written
// by the request. Only set if the transaction record was staged.
util.hlc.Timestamp staging_timestamp = 5 [(gogoproto.nullable) = false];
// ReplicatedLocksReleasedOnCommit, if non-empty, indicates that replicated
// locks with strength Shared or Exclusive were released in the referenced key
// spans when committing this transaction. Notably, this field is left unset
// if only write intents were resolved. The field is also left unset for
// transactions that aborted.
//
// The caller must bump the timestamp cache across these spans to the
// transaction's commit timestamp. Doing so ensures that the released locks
// (acquired by the now committed transaction) continue to provide protection
// against other writers up to the commit timestamp, even after the locks have
// been released.
repeated Span replicated_locks_released_on_commit = 6 [(gogoproto.nullable) = false];
}

// An AdminSplitRequest is the argument to the AdminSplit() method. The
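As a concrete illustration of the new field's shape (the keys are made up, chosen to mirror the test added below): a committed transaction that held a replicated point lock on "a" and replicated locks over the range ["c", "e") would produce a response like the following.

package example

import (
	"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
	"github.com/cockroachdb/cockroach/pkg/roachpb"
)

// exampleEndTxnResponse shows how the new field is populated for a committed
// transaction that held a replicated point lock on "a" and replicated locks
// over the range ["c", "e"). Keys are illustrative only.
func exampleEndTxnResponse() *kvpb.EndTxnResponse {
	return &kvpb.EndTxnResponse{
		ReplicatedLocksReleasedOnCommit: []roachpb.Span{
			{Key: roachpb.Key("a")},                           // point lock span: EndKey left empty
			{Key: roachpb.Key("c"), EndKey: roachpb.Key("e")}, // ranged lock span
		},
	}
}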
35 changes: 25 additions & 10 deletions pkg/kv/kvserver/batcheval/cmd_end_transaction.go
@@ -289,7 +289,7 @@ func EndTxn(
// The transaction has already been aborted by others.
// Do not return TransactionAbortedError since the client wanted to
// abort the transaction anyway.
resolvedLocks, externalLocks, err := resolveLocalLocks(ctx, readWriter, cArgs.EvalCtx, ms, args, reply.Txn)
resolvedLocks, _, externalLocks, err := resolveLocalLocks(ctx, readWriter, cArgs.EvalCtx, ms, args, reply.Txn)
if err != nil {
return result.Result{}, err
}
@@ -436,7 +436,8 @@ func EndTxn(
// we position the transaction record next to the first write of a transaction.
// This avoids the need for the intentResolver to have to return to this range
// to resolve locks for this transaction in the future.
resolvedLocks, externalLocks, err := resolveLocalLocks(ctx, readWriter, cArgs.EvalCtx, ms, args, reply.Txn)
resolvedLocks, releasedReplLocks, externalLocks, err := resolveLocalLocks(
ctx, readWriter, cArgs.EvalCtx, ms, args, reply.Txn)
if err != nil {
return result.Result{}, err
}
@@ -472,8 +473,16 @@ func EndTxn(
txnResult.Local.UpdatedTxns = []*roachpb.Transaction{reply.Txn}
txnResult.Local.ResolvedLocks = resolvedLocks

// Run the commit triggers if successfully committed.
if reply.Txn.Status == roachpb.COMMITTED {
// Return whether replicated {shared, exclusive} locks were released by
// the committing transaction. If such locks were released, we still
// need to make sure other transactions can't write underneath the
// transaction's commit timestamp to the key spans previously protected
// by the locks. We return the spans on the response and update the
// timestamp cache a few layers above to ensure this.
reply.ReplicatedLocksReleasedOnCommit = releasedReplLocks

// Run the commit triggers if successfully committed.
triggerResult, err := RunCommitTrigger(
ctx, cArgs.EvalCtx, readWriter.(storage.Batch), ms, args, reply.Txn,
)
@@ -539,7 +548,7 @@ func resolveLocalLocks(
ms *enginepb.MVCCStats,
args *kvpb.EndTxnRequest,
txn *roachpb.Transaction,
) (resolvedLocks []roachpb.LockUpdate, externalLocks []roachpb.Span, _ error) {
) (resolvedLocks []roachpb.LockUpdate, releasedReplLocks, externalLocks []roachpb.Span, _ error) {
var resolveAllowance int64 = lockResolutionBatchSize
var targetBytes int64 = lockResolutionBatchByteSize
if args.InternalCommitTrigger != nil {
@@ -563,7 +572,7 @@ func resolveLocalLocksWithPagination(
txn *roachpb.Transaction,
maxKeys int64,
targetBytes int64,
) (resolvedLocks []roachpb.LockUpdate, externalLocks []roachpb.Span, _ error) {
) (resolvedLocks []roachpb.LockUpdate, releasedReplLocks, externalLocks []roachpb.Span, _ error) {
desc := evalCtx.Desc()
if mergeTrigger := args.InternalCommitTrigger.GetMergeTrigger(); mergeTrigger != nil {
// If this is a merge, then use the post-merge descriptor to determine
@@ -595,7 +604,7 @@ func resolveLocalLocksWithPagination(
//
// Note that the underlying pebbleIterator will still be reused
// since readWriter is a pebbleBatch in the typical case.
ok, numBytes, resumeSpan, _, err := storage.MVCCResolveWriteIntent(ctx, readWriter, ms, update,
ok, numBytes, resumeSpan, replLocksReleased, err := storage.MVCCResolveWriteIntent(ctx, readWriter, ms, update,
storage.MVCCResolveWriteIntentOptions{TargetBytes: targetBytes})
if err != nil {
return 0, 0, 0, errors.Wrapf(err, "resolving write intent at %s on end transaction [%s]", span, txn.Status)
@@ -621,6 +630,9 @@
}
}
}
if replLocksReleased {
releasedReplLocks = append(releasedReplLocks, update.Span)
}
return numKeys, numBytes, resumeReason, nil
}
// For update ranges, cut into parts inside and outside our key
@@ -630,7 +642,7 @@
externalLocks = append(externalLocks, outSpans...)
if inSpan != nil {
update.Span = *inSpan
numKeys, numBytes, resumeSpan, resumeReason, _, err :=
numKeys, numBytes, resumeSpan, resumeReason, replLocksReleased, err :=
storage.MVCCResolveWriteIntentRange(ctx, readWriter, ms, update,
storage.MVCCResolveWriteIntentRangeOptions{MaxKeys: maxKeys, TargetBytes: targetBytes},
)
@@ -655,25 +667,28 @@
span, txn.Status)
}
}
if replLocksReleased {
releasedReplLocks = append(releasedReplLocks, update.Span)
}
return numKeys, numBytes, resumeReason, nil
}
return 0, 0, 0, nil
}

numKeys, _, _, err := storage.MVCCPaginate(ctx, maxKeys, targetBytes, false /* allowEmpty */, f)
if err != nil {
return nil, nil, err
return nil, nil, nil, err
}

externalLocks = append(externalLocks, remainingLockSpans...)

removedAny := numKeys > 0
if WriteAbortSpanOnResolve(txn.Status, args.Poison, removedAny) {
if err := UpdateAbortSpan(ctx, evalCtx, readWriter, ms, txn.TxnMeta, args.Poison); err != nil {
return nil, nil, err
return nil, nil, nil, err
}
}
return resolvedLocks, externalLocks, nil
return resolvedLocks, releasedReplLocks, externalLocks, nil
}

// updateStagingTxn persists the STAGING transaction record with updated status
3 changes: 2 additions & 1 deletion pkg/kv/kvserver/batcheval/cmd_end_transaction_test.go
@@ -1651,7 +1651,7 @@ func TestResolveLocalLocks(t *testing.T) {
err := storage.MVCCPut(ctx, batch, intToKey(i), ts, roachpb.MakeValueFromString("a"), storage.MVCCWriteOptions{Txn: &txn})
require.NoError(t, err)
}
resolvedLocks, externalLocks, err := resolveLocalLocksWithPagination(
resolvedLocks, releasedReplLocks, externalLocks, err := resolveLocalLocksWithPagination(
ctx,
batch,
(&MockEvalCtx{
@@ -1674,6 +1674,7 @@
require.Equal(t, tc.expectedResolvedLocks[i].Key, lock.Key)
require.Equal(t, tc.expectedResolvedLocks[i].EndKey, lock.EndKey)
}
require.Len(t, releasedReplLocks, 0)
require.Equal(t, len(tc.expectedExternalLocks), len(externalLocks))
for i, lock := range externalLocks {
require.Equal(t, tc.expectedExternalLocks[i].Key, lock.Key)
125 changes: 116 additions & 9 deletions pkg/kv/kvserver/replica_test.go
@@ -14410,19 +14410,21 @@ func TestResolveIntentReplicatedLocksBumpsTSCache(t *testing.T) {
_, pErr = tc.Sender().Send(ctx, ba)
require.Nil(t, pErr)

expectedRTS := makeTS(t1.UnixNano(), 0) // we scanned at t1 over [a, e)
notBumpedTs := makeTS(t1.UnixNano(), 0) // we scanned at t1 over [a, e)
bumpedTs := makeTS(t2.UnixNano(), 0) // if we committed, it was at t2
expTs := notBumpedTs
if isCommit && isReplicated {
expectedRTS = makeTS(t2.UnixNano(), 0)
expTs = bumpedTs
}

rTS, _ := tc.store.tsCache.GetMax(roachpb.Key("a"), nil)
require.Equal(t, expectedRTS, rTS)
require.Equal(t, expTs, rTS)
rTS, _ = tc.store.tsCache.GetMax(roachpb.Key("b"), nil)
require.Equal(t, expectedRTS, rTS)
require.Equal(t, expTs, rTS)
rTS, _ = tc.store.tsCache.GetMax(roachpb.Key("c"), nil)
require.Equal(t, expectedRTS, rTS)
require.Equal(t, expTs, rTS)
rTS, _ = tc.store.tsCache.GetMax(roachpb.Key("d"), nil)
require.Equal(t, makeTS(t1.UnixNano(), 0), rTS) // we scanned at t1 over [a, e)
require.Equal(t, notBumpedTs, rTS)
}

testutils.RunTrueAndFalse(t, "isCommit", func(t *testing.T, isCommit bool) {
@@ -14523,14 +14525,16 @@ func TestResolveIntentRangeReplicatedLocksBumpsTSCache(t *testing.T) {
_, pErr = tc.Sender().Send(ctx, ba)
require.Nil(t, pErr)

expectedRTS := makeTS(t1.UnixNano(), 0) // we scanned at t1 over [a, e)
notBumpedTs := makeTS(t1.UnixNano(), 0) // we scanned at t1 over [a, e)
bumpedTs := makeTS(t2.UnixNano(), 0) // if we committed, it was at t2
expTs := notBumpedTs
if isCommit && isReplicated {
expectedRTS = makeTS(t2.UnixNano(), 0)
expTs = bumpedTs
}

for _, keyStr := range []string{"a", "b", "c", "d"} {
rTS, _ := tc.store.tsCache.GetMax(roachpb.Key(keyStr), nil)
require.Equal(t, expectedRTS, rTS)
require.Equal(t, expTs, rTS)
}
}

@@ -14544,3 +14548,106 @@ func TestResolveIntentRangeReplicatedLocksBumpsTSCache(t *testing.T) {
})
})
}

// TestEndTxnReplicatedLocksBumpsTSCache is like
// TestResolveIntentReplicatedLocksBumpsTSCache, except it tests EndTxn requests
// (which synchronously resolve local locks) instead of ResolveIntent requests.
func TestEndTxnReplicatedLocksBumpsTSCache(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

newTxn := func(key string, ts hlc.Timestamp) *roachpb.Transaction {
// We're going to use read committed isolation level here, as that's the
// only one which makes sense in the context of this test. That's because
// serializable transactions cannot commit before refreshing their reads,
// and refreshing reads bumps the timestamp cache.
txn := roachpb.MakeTransaction("test", roachpb.Key(key), isolation.ReadCommitted, roachpb.NormalUserPriority, ts, 0, 0, 0)
return &txn
}

run := func(t *testing.T, isCommit, isReplicated bool, str lock.Strength) {
ctx := context.Background()
tc := testContext{}
stopper := stop.NewStopper()
defer stopper.Stop(ctx)
startTime := timeutil.Unix(0, 123)
tc.manualClock = timeutil.NewManualTime(startTime)
sc := TestStoreConfig(hlc.NewClockForTesting(tc.manualClock))
sc.TestingKnobs.DisableCanAckBeforeApplication = true
tc.StartWithStoreConfig(ctx, t, stopper, sc)

// Write some keys at time t0.
t0 := timeutil.Unix(1, 0)
tc.manualClock.MustAdvanceTo(t0)
err := tc.store.DB().Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
for _, keyStr := range []string{"a", "b", "c"} {
err := txn.Put(ctx, roachpb.Key(keyStr), "value")
if err != nil {
return err
}
}
return nil
})
require.NoError(t, err)

// Scan [a, e) at t1, acquiring locks as dictated by the test setup. Verify
// the scan result is correct.
t1 := timeutil.Unix(2, 0)
ba := &kvpb.BatchRequest{}
txn := newTxn("a", makeTS(t1.UnixNano(), 0))
ba.Timestamp = makeTS(t1.UnixNano(), 0)
ba.Txn = txn
span := roachpb.Span{Key: roachpb.Key("a"), EndKey: roachpb.Key("e")}
sArgs := scanArgs(span.Key, span.EndKey)
sArgs.KeyLockingStrength = str
sArgs.KeyLockingDurability = lock.Unreplicated
if isReplicated {
sArgs.KeyLockingDurability = lock.Replicated
}
ba.Add(sArgs)
_, pErr := tc.Sender().Send(ctx, ba)
require.Nil(t, pErr)

// Commit the transaction at t2.
t2 := timeutil.Unix(3, 0)
ba = &kvpb.BatchRequest{}
txn.WriteTimestamp = makeTS(t2.UnixNano(), 0)
ba.Txn = txn
et, _ := endTxnArgs(txn, isCommit)
// Assign lock spans to the EndTxn request: one point lock span and one
// ranged lock span.
et.LockSpans = []roachpb.Span{
{Key: roachpb.Key("a")},
{Key: roachpb.Key("c"), EndKey: roachpb.Key("e")},
}
ba.Add(&et)
_, pErr = tc.Sender().Send(ctx, ba)
require.Nil(t, pErr)

notBumpedTs := makeTS(t1.UnixNano(), 0) // we scanned at t1 over [a, e)
bumpedTs := makeTS(t2.UnixNano(), 0) // if we committed, it was at t2
expTs := notBumpedTs
if isCommit && isReplicated {
expTs = bumpedTs
}

rTS, _ := tc.store.tsCache.GetMax(roachpb.Key("a"), nil)
require.Equal(t, expTs, rTS)
rTS, _ = tc.store.tsCache.GetMax(roachpb.Key("b"), nil)
require.Equal(t, notBumpedTs, rTS)
rTS, _ = tc.store.tsCache.GetMax(roachpb.Key("c"), nil)
require.Equal(t, expTs, rTS)
rTS, _ = tc.store.tsCache.GetMax(roachpb.Key("d"), nil)
require.Equal(t, expTs, rTS)
}

testutils.RunTrueAndFalse(t, "isCommit", func(t *testing.T, isCommit bool) {
testutils.RunTrueAndFalse(t, "isReplicated", func(t *testing.T, isReplicated bool) {
for _, str := range []lock.Strength{lock.Shared, lock.Exclusive} {
t.Run(str.String(), func(t *testing.T) {
run(t, isCommit, isReplicated, str)
})
}
})
})
}
8 changes: 8 additions & 0 deletions pkg/kv/kvserver/replica_tscache.go
@@ -138,6 +138,14 @@ func (r *Replica) updateTimestampCache(
// transaction's MinTimestamp, which is consulted in CanCreateTxnRecord.
key := transactionTombstoneMarker(start, txnID)
addToTSCache(key, nil, ts, txnID)
// Additionally, EndTxn requests that release replicated locks for
// committed transactions bump the timestamp cache over those lock
// spans to the commit timestamp of the transaction to ensure that
// the released locks continue to provide protection against writes
// underneath the transaction's commit timestamp.
for _, sp := range resp.(*kvpb.EndTxnResponse).ReplicatedLocksReleasedOnCommit {
addToTSCache(sp.Key, sp.EndKey, br.Txn.WriteTimestamp, txnID)
}
case *kvpb.HeartbeatTxnRequest:
// HeartbeatTxn requests record a tombstone entry when the record is
// initially written. This is used when considering potential 1PC
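Why the bump is sufficient, in sketch form: the timestamp cache forces any later write over a cached span to a timestamp above the cached value. The helper below is hypothetical and heavily simplified (the real check lives in the replica's request evaluation path), but it shows the interaction this commit relies on.

package example

import "github.com/cockroachdb/cockroach/pkg/util/hlc"

// maybePushWriteTimestamp is a hypothetical, simplified stand-in for the
// server-side check against the timestamp cache. A write that arrives at or
// below the cached timestamp for its span is pushed to just above it. Since
// this commit records the commit timestamp over the spans of released
// replicated locks, a conflicting write can no longer land underneath the
// committed transaction's timestamp.
func maybePushWriteTimestamp(writeTS, cachedTS hlc.Timestamp) hlc.Timestamp {
	if writeTS.LessEq(cachedTS) {
		return cachedTS.Next()
	}
	return writeTS
}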
