diff --git a/docs/generated/settings/settings-for-tenants.txt b/docs/generated/settings/settings-for-tenants.txt index bab6c42b5651..8b7d703f52fd 100644 --- a/docs/generated/settings/settings-for-tenants.txt +++ b/docs/generated/settings/settings-for-tenants.txt @@ -339,4 +339,4 @@ trace.snapshot.rate duration 0s if non-zero, interval at which background trace trace.span_registry.enabled boolean true if set, ongoing traces can be seen at https:///#/debug/tracez application trace.zipkin.collector string the address of a Zipkin instance to receive traces, as :. If no port is specified, 9411 will be used. application ui.display_timezone enumeration etc/utc the timezone used to format timestamps in the ui [etc/utc = 0, america/new_york = 1] application -version version 1000023.2-upgrading-to-1000024.1-step-022 set the active cluster version in the format '.' application +version version 1000023.2-upgrading-to-1000024.1-step-024 set the active cluster version in the format '.' application diff --git a/docs/generated/settings/settings.html b/docs/generated/settings/settings.html index 7e22eef3ad9a..e5628f9fbb49 100644 --- a/docs/generated/settings/settings.html +++ b/docs/generated/settings/settings.html @@ -294,6 +294,6 @@
trace.span_registry.enabled
booleantrueif set, ongoing traces can be seen at https://<ui>/#/debug/tracezServerless/Dedicated/Self-Hosted
trace.zipkin.collector
stringthe address of a Zipkin instance to receive traces, as <host>:<port>. If no port is specified, 9411 will be used.Serverless/Dedicated/Self-Hosted
ui.display_timezone
enumerationetc/utcthe timezone used to format timestamps in the ui [etc/utc = 0, america/new_york = 1]Serverless/Dedicated/Self-Hosted -
version
version1000023.2-upgrading-to-1000024.1-step-022set the active cluster version in the format '<major>.<minor>'Serverless/Dedicated/Self-Hosted +
version
version1000023.2-upgrading-to-1000024.1-step-024set the active cluster version in the format '<major>.<minor>'Serverless/Dedicated/Self-Hosted diff --git a/pkg/clusterversion/cockroach_versions.go b/pkg/clusterversion/cockroach_versions.go index 74bf10d12be3..88f613407934 100644 --- a/pkg/clusterversion/cockroach_versions.go +++ b/pkg/clusterversion/cockroach_versions.go @@ -305,6 +305,10 @@ const ( // splits. V24_1_EstimatedMVCCStatsInSplit + // V24_1_ReplicatedLockPipelining allows exclusive and shared replicated locks + // to be pipelined. + V24_1_ReplicatedLockPipelining + numKeys ) @@ -372,6 +376,7 @@ var versionTable = [numKeys]roachpb.Version{ V24_1_SystemDatabaseSurvivability: {Major: 23, Minor: 2, Internal: 18}, V24_1_GossipMaximumIOOverload: {Major: 23, Minor: 2, Internal: 20}, V24_1_EstimatedMVCCStatsInSplit: {Major: 23, Minor: 2, Internal: 22}, + V24_1_ReplicatedLockPipelining: {Major: 23, Minor: 2, Internal: 24}, } // Latest is always the highest version key. This is the maximum logical cluster diff --git a/pkg/kv/kvpb/api.go b/pkg/kv/kvpb/api.go index daae1b60b120..c4c6e7f315ec 100644 --- a/pkg/kv/kvpb/api.go +++ b/pkg/kv/kvpb/api.go @@ -1999,6 +1999,21 @@ func (acrr *AdminChangeReplicasRequest) Changes() []ReplicationChange { return sl } +// StrengthOrDefault returns the strength of the lock being queried by the +// QueryIntentRequest. +func (qir *QueryIntentRequest) StrengthOrDefault() lock.Strength { + // TODO(arul): the Strength field on QueryIntentRequest was introduced in + // v24.1. Prior to that, rather unsurprisingly, QueryIntentRequest would only + // query replicated locks with strength. To maintain compatibility between + // v23.2 <-> v24.1 nodes, if this field is unset, we assume it's lock.Intent. + // In the future, once compatibility with v23.2 is no longer a concern, we + // should be able to get rid of this logic. + if qir.Strength == lock.None { + return lock.Intent + } + return qir.Strength +} + // AsLockUpdate creates a lock update message corresponding to the given resolve // intent request. func (rir *ResolveIntentRequest) AsLockUpdate() roachpb.LockUpdate { diff --git a/pkg/kv/kvpb/api.proto b/pkg/kv/kvpb/api.proto index 9906dd49cec3..f8519150b0f2 100644 --- a/pkg/kv/kvpb/api.proto +++ b/pkg/kv/kvpb/api.proto @@ -1436,6 +1436,19 @@ message QueryIntentRequest { // If true, return an IntentMissingError if no matching intent (neither a // "partial match" nor a "full match") is found. bool error_if_missing = 3; + + // The strength with which the lock being queried was acquired at. To ensure + // the supplied protection was provided, we check whether the lock was held + // with the supplied lock strength or something stronger at the sequence + // number. + kv.kvserver.concurrency.lock.Strength strength = 4; + + // The list of sequence numbers that have been ignored by the transaction that + // acquired the lock. Any locks found at sequence numbers which are considered + // ignored will be treated as "not found"; that's because they can be removed + // at any time. + repeated storage.enginepb.IgnoredSeqNumRange ignored_seqnums = 5 + [(gogoproto.nullable) = false, (gogoproto.customname) = "IgnoredSeqNums"]; } // A QueryIntentResponse is the return value from the QueryIntent() method. diff --git a/pkg/kv/kvserver/batcheval/cmd_query_intent.go b/pkg/kv/kvserver/batcheval/cmd_query_intent.go index a68f545488e6..6c7f7a30db6e 100644 --- a/pkg/kv/kvserver/batcheval/cmd_query_intent.go +++ b/pkg/kv/kvserver/batcheval/cmd_query_intent.go @@ -14,11 +14,15 @@ import ( "context" "time" + "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/kv/kvpb" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/batcheval/result" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency/lock" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/lockspanset" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/spanset" + "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/storage" + "github.com/cockroachdb/cockroach/pkg/storage/enginepb" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/errors" ) @@ -28,17 +32,27 @@ func init() { } func declareKeysQueryIntent( - _ ImmutableRangeState, + rs ImmutableRangeState, _ *kvpb.Header, req kvpb.Request, latchSpans *spanset.SpanSet, _ *lockspanset.LockSpanSet, _ time.Duration, ) error { - // QueryIntent requests read the specified keys at the maximum timestamp in - // order to read any intent present, if one exists, regardless of the - // timestamp it was written at. + // QueryIntent requests acquire a non-MVCC latch in order to read the queried + // lock, if one exists, regardless of the time it was written at. This + // isolates them from in-flight intent writes and exclusive lock acquisitions + // they're trying to query. latchSpans.AddNonMVCC(spanset.SpanReadOnly, req.Header().Span()) + // They also acquire a read latch on the per-transaction local key that all + // replicated shared lock acquisitions acquire latches on. This isolates them + // from the in-flight shared lock acquisition they're trying to query. + // + // TODO(arul): add a test. + txnID := req.(*kvpb.QueryIntentRequest).Txn.ID + latchSpans.AddNonMVCC(spanset.SpanReadOnly, roachpb.Span{ + Key: keys.ReplicatedSharedLocksTransactionLatchingKey(rs.GetRangeID(), txnID), + }) return nil } @@ -74,53 +88,78 @@ func QueryIntent( h.Timestamp, args.Txn.WriteTimestamp) } - // Read from the lock table to see if an intent exists. - intent, err := storage.GetIntent(ctx, reader, args.Key, storage.BatchEvalReadCategory) - if err != nil { - return result.Result{}, err + if enginepb.TxnSeqIsIgnored(args.Txn.Sequence, args.IgnoredSeqNums) { + return result.Result{}, errors.AssertionFailedf( + "QueryIntent request for lock at sequence number %d but sequence number is ignored %v", + args.Txn.Sequence, args.IgnoredSeqNums, + ) } - reply.FoundIntent = false - reply.FoundUnpushedIntent = false - if intent != nil { - // See comment on QueryIntentRequest.Txn for an explanation of this - // comparison. - // TODO(nvanbenschoten): Now that we have a full intent history, - // we can look at the exact sequence! That won't serve as much more - // than an assertion that QueryIntent is being used correctly. - reply.FoundIntent = (args.Txn.ID == intent.Txn.ID) && - (args.Txn.Epoch == intent.Txn.Epoch) && - (args.Txn.Sequence <= intent.Txn.Sequence) + var intent *roachpb.Intent - if !reply.FoundIntent { - log.VEventf(ctx, 2, "intent mismatch requires - %v == %v and %v == %v and %v <= %v", - args.Txn.ID, intent.Txn.ID, args.Txn.Epoch, intent.Txn.Epoch, args.Txn.Sequence, intent.Txn.Sequence) - } else { - // If we found a matching intent, check whether the intent was pushed past - // its expected timestamp. - cmpTS := args.Txn.WriteTimestamp - if ownTxn { - // If the request is querying an intent for its own transaction, forward - // the timestamp we compare against to the provisional commit timestamp - // in the batch header. - cmpTS.Forward(h.Txn.WriteTimestamp) - } - reply.FoundUnpushedIntent = intent.Txn.WriteTimestamp.LessEq(cmpTS) + // Intents have special handling because there's an associated timestamp + // component with them. + if args.StrengthOrDefault() == lock.Intent { + // Read from the lock table to see if an intent exists. + var err error + intent, err = storage.GetIntent(ctx, reader, args.Key, storage.BatchEvalReadCategory) + if err != nil { + return result.Result{}, err + } - if !reply.FoundUnpushedIntent { - log.VEventf(ctx, 2, "found pushed intent") - // If the request was querying an intent in its own transaction, update - // the response transaction. - // TODO(nvanbenschoten): if this is necessary for correctness, say so. - // And then add a test to demonstrate that. + reply.FoundIntent = false + reply.FoundUnpushedIntent = false + if intent != nil { + // See comment on QueryIntentRequest.Txn for an explanation of this + // comparison. + // TODO(nvanbenschoten): Now that we have a full intent history, + // we can look at the exact sequence! That won't serve as much more + // than an assertion that QueryIntent is being used correctly. + reply.FoundIntent = (args.Txn.ID == intent.Txn.ID) && + (args.Txn.Epoch == intent.Txn.Epoch) && + (args.Txn.Sequence <= intent.Txn.Sequence) + + if !reply.FoundIntent { + log.VEventf(ctx, 2, "intent mismatch requires - %v == %v and %v == %v and %v <= %v", + args.Txn.ID, intent.Txn.ID, args.Txn.Epoch, intent.Txn.Epoch, args.Txn.Sequence, intent.Txn.Sequence) + } else { + // If we found a matching intent, check whether the intent was pushed past + // its expected timestamp. + cmpTS := args.Txn.WriteTimestamp if ownTxn { - reply.Txn = h.Txn.Clone() - reply.Txn.WriteTimestamp.Forward(intent.Txn.WriteTimestamp) + // If the request is querying an intent for its own transaction, forward + // the timestamp we compare against to the provisional commit timestamp + // in the batch header. + cmpTS.Forward(h.Txn.WriteTimestamp) + } + reply.FoundUnpushedIntent = intent.Txn.WriteTimestamp.LessEq(cmpTS) + + if !reply.FoundUnpushedIntent { + log.VEventf(ctx, 2, "found pushed intent") + // If the request was querying an intent in its own transaction, update + // the response transaction. + // TODO(nvanbenschoten): if this is necessary for correctness, say so. + // And then add a test to demonstrate that. + if ownTxn { + reply.Txn = h.Txn.Clone() + reply.Txn.WriteTimestamp.Forward(intent.Txn.WriteTimestamp) + } } } + } else { + log.VEventf(ctx, 2, "found no intent") } } else { - log.VEventf(ctx, 2, "found no intent") + found, err := storage.VerifyLock( + ctx, reader, &args.Txn, args.Strength, args.Key, args.IgnoredSeqNums, + ) + if err != nil { + return result.Result{}, err + } + if found { + reply.FoundIntent = true + reply.FoundUnpushedIntent = true + } } if !reply.FoundIntent && args.ErrorIfMissing { diff --git a/pkg/kv/kvserver/replica_tscache.go b/pkg/kv/kvserver/replica_tscache.go index 46543b482a20..f973f0a5796f 100644 --- a/pkg/kv/kvserver/replica_tscache.go +++ b/pkg/kv/kvserver/replica_tscache.go @@ -296,21 +296,36 @@ func (r *Replica) updateTimestampCache( } addToTSCache(start, end, ts, txnID) case *kvpb.QueryIntentRequest: - missing := false - if pErr != nil { - _, missing = pErr.GetDetail().(*kvpb.IntentMissingError) - } else { - missing = !resp.(*kvpb.QueryIntentResponse).FoundUnpushedIntent - } - if missing { - // If the QueryIntent determined that the intent is missing - // then we update the timestamp cache at the intent's key to - // the intent's transactional timestamp. This will prevent - // the intent from ever being written in the future. We use - // an empty transaction ID so that we block the intent - // regardless of whether it is part of the current batch's - // transaction or not. - addToTSCache(start, end, t.Txn.WriteTimestamp, uuid.UUID{}) + // NB: We only need to bump the timestamp cache if the QueryIntentRequest + // was querying a write intent and if it wasn't found. This prevents the + // intent from ever being written in the future. This is done for the + // benefit of txn recovery, where we don't want an intent to land after a + // QueryIntent request has already evaluated and determined the fate of + // the transaction being recovered. Letting the intent land would cause us + // to commit a transaction that we've determined was aborted. + // + // However, for other replicated locks (shared, exclusive), we know that + // they'll never be pipelined if they belong to a batch that's being + // committed in parallel. This means that any QueryIntent request for a + // replicated shared or exclusive lock is doing so with the knowledge that + // the request evaluated successfully (so it can't land later) -- it's + // only checking whether the replication succeeded or not. + if t.StrengthOrDefault() == lock.Intent { + missing := false + if pErr != nil { + _, missing = pErr.GetDetail().(*kvpb.IntentMissingError) + } else { + missing = !resp.(*kvpb.QueryIntentResponse).FoundUnpushedIntent + } + if missing { + // If the QueryIntent determined that the intent is missing then we + // update the timestamp cache at the intent's key to the intent's + // transactional timestamp. This will prevent the intent from ever + // being written in the future. We use an empty transaction ID so that + // we block the intent regardless of whether it is part of the current + // batch's transaction or not. + addToTSCache(start, end, t.Txn.WriteTimestamp, uuid.UUID{}) + } } case *kvpb.ResolveIntentRequest: // Update the timestamp cache on the key the request resolved if there diff --git a/pkg/kv/kvserver/txnrecovery/BUILD.bazel b/pkg/kv/kvserver/txnrecovery/BUILD.bazel index 18e61a00a8d1..91f27ec67d87 100644 --- a/pkg/kv/kvserver/txnrecovery/BUILD.bazel +++ b/pkg/kv/kvserver/txnrecovery/BUILD.bazel @@ -11,6 +11,7 @@ go_library( deps = [ "//pkg/kv", "//pkg/kv/kvpb", + "//pkg/kv/kvserver/concurrency/lock", "//pkg/roachpb", "//pkg/util/hlc", "//pkg/util/log", diff --git a/pkg/kv/kvserver/txnrecovery/manager.go b/pkg/kv/kvserver/txnrecovery/manager.go index ea88e29f3a87..e3a527fab019 100644 --- a/pkg/kv/kvserver/txnrecovery/manager.go +++ b/pkg/kv/kvserver/txnrecovery/manager.go @@ -16,6 +16,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv" "github.com/cockroachdb/cockroach/pkg/kv/kvpb" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency/lock" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/log" @@ -206,6 +207,9 @@ func (m *manager) resolveIndeterminateCommitForTxnProbe( Key: w.Key, }, Txn: meta, + // TODO(nvanbenschoten): pass in the correct lock strength here. + Strength: lock.Intent, + IgnoredSeqNums: txn.IgnoredSeqNums, }) } diff --git a/pkg/storage/lock_table_key_scanner.go b/pkg/storage/lock_table_key_scanner.go index e7516f863103..d9d7ea0950eb 100644 --- a/pkg/storage/lock_table_key_scanner.go +++ b/pkg/storage/lock_table_key_scanner.go @@ -62,6 +62,9 @@ func strongerOrEqualStrengths(str lock.Strength) []lock.Strength { // the provided lock strength. func minConflictLockStrength(str lock.Strength) (lock.Strength, error) { switch str { + case lock.None: + // Don't conflict with any locks held by other transactions. + return lock.None, nil case lock.Shared: return lock.Exclusive, nil case lock.Exclusive, lock.Intent: @@ -113,31 +116,32 @@ var lockTableKeyScannerPool = sync.Pool{ // newLockTableKeyScanner creates a new lockTableKeyScanner. // -// txn is the transaction attempting to acquire locks. If txn is not nil, locks -// held by the transaction with any strength will be accumulated into the -// ownLocks array. Otherwise, if txn is nil, the request is non-transactional -// and no locks will be accumulated into the ownLocks array. +// txnID corresponds to the ID of the transaction attempting to acquire locks. +// If txnID is valid (non-empty), locks held by the transaction with any +// strength will be accumulated into the ownLocks array. Otherwise, if txnID is +// empty, the request is non-transactional and no locks will be accumulated into +// the ownLocks array. // // str is the strength of the lock that the transaction (or non-transactional // request) is attempting to acquire. The scanner will search for locks held by -// other transactions that conflict with this strength. +// other transactions that conflict with this strength[1]. // // maxConflicts is the maximum number of conflicting locks that the scanner // should accumulate before returning an error. If maxConflicts is zero, the // scanner will accumulate all conflicting locks. +// +// [1] It's valid to pass in lock.None for str. lock.None doesn't conflict with +// any other replicated locks; as such, passing lock.None configures the scanner +// to only return locks from the supplied txnID. func newLockTableKeyScanner( ctx context.Context, reader Reader, - txn *roachpb.Transaction, + txnID uuid.UUID, str lock.Strength, maxConflicts int64, targetBytesPerConflict int64, readCategory ReadCategory, ) (*lockTableKeyScanner, error) { - var txnID uuid.UUID - if txn != nil { - txnID = txn.ID - } minConflictStr, err := minConflictLockStrength(str) if err != nil { return nil, err diff --git a/pkg/storage/mvcc.go b/pkg/storage/mvcc.go index a76c9e21804c..6f0e0cf401cf 100644 --- a/pkg/storage/mvcc.go +++ b/pkg/storage/mvcc.go @@ -1928,7 +1928,7 @@ func MVCCPut( inlinePut := timestamp.IsEmpty() if !inlinePut { ltScanner, err = newLockTableKeyScanner( - ctx, rw, opts.Txn, lock.Intent, opts.MaxLockConflicts, opts.TargetLockConflictBytes, opts.Category) + ctx, rw, opts.TxnID(), lock.Intent, opts.MaxLockConflicts, opts.TargetLockConflictBytes, opts.Category) if err != nil { return roachpb.LockAcquisition{}, err } @@ -1992,7 +1992,7 @@ func MVCCDelete( var ltScanner *lockTableKeyScanner if !inlineDelete { ltScanner, err = newLockTableKeyScanner( - ctx, rw, opts.Txn, lock.Intent, opts.MaxLockConflicts, opts.TargetLockConflictBytes, opts.Category) + ctx, rw, opts.TxnID(), lock.Intent, opts.MaxLockConflicts, opts.TargetLockConflictBytes, opts.Category) if err != nil { return false, roachpb.LockAcquisition{}, err } @@ -2806,7 +2806,7 @@ func MVCCIncrement( var ltScanner *lockTableKeyScanner if !inlineIncrement { ltScanner, err = newLockTableKeyScanner( - ctx, rw, opts.Txn, lock.Intent, opts.MaxLockConflicts, opts.TargetLockConflictBytes, opts.Category) + ctx, rw, opts.TxnID(), lock.Intent, opts.MaxLockConflicts, opts.TargetLockConflictBytes, opts.Category) if err != nil { return 0, roachpb.LockAcquisition{}, err } @@ -2899,7 +2899,7 @@ func MVCCConditionalPut( var ltScanner *lockTableKeyScanner if !inlinePut { ltScanner, err = newLockTableKeyScanner( - ctx, rw, opts.Txn, lock.Intent, opts.MaxLockConflicts, opts.TargetLockConflictBytes, opts.Category) + ctx, rw, opts.TxnID(), lock.Intent, opts.MaxLockConflicts, opts.TargetLockConflictBytes, opts.Category) if err != nil { return roachpb.LockAcquisition{}, err } @@ -2996,7 +2996,7 @@ func MVCCInitPut( var ltScanner *lockTableKeyScanner if !inlinePut { ltScanner, err = newLockTableKeyScanner( - ctx, rw, opts.Txn, lock.Intent, opts.MaxLockConflicts, opts.TargetLockConflictBytes, opts.Category) + ctx, rw, opts.TxnID(), lock.Intent, opts.MaxLockConflicts, opts.TargetLockConflictBytes, opts.Category) if err != nil { return roachpb.LockAcquisition{}, err } @@ -3667,7 +3667,7 @@ func MVCCDeleteRange( var ltScanner *lockTableKeyScanner if !inlineDelete { ltScanner, err = newLockTableKeyScanner( - ctx, rw, opts.Txn, lock.Intent, opts.MaxLockConflicts, opts.TargetLockConflictBytes, opts.Category) + ctx, rw, opts.TxnID(), lock.Intent, opts.MaxLockConflicts, opts.TargetLockConflictBytes, opts.Category) if err != nil { return nil, nil, 0, nil, err } @@ -3886,7 +3886,9 @@ func MVCCPredicateDeleteRange( defer pointTombstoneIter.Close() ltScanner, err := newLockTableKeyScanner( - ctx, rw, nil /* txn */, lock.Intent, maxLockConflicts, targetLockConflictBytes, BatchEvalReadCategory) + ctx, rw, uuid.UUID{} /* txnID */, lock.Intent, + maxLockConflicts, targetLockConflictBytes, BatchEvalReadCategory, + ) if err != nil { return nil, err } @@ -4597,7 +4599,8 @@ func buildScanIntents(data []byte) ([]roachpb.Intent, error) { return intents, nil } -// MVCCWriteOptions bundles options for the MVCCPut and MVCCDelete families of functions. +// MVCCWriteOptions bundles options for the MVCCPut and MVCCDelete families of +// functions. type MVCCWriteOptions struct { // See the comment on mvccPutInternal for details on these parameters. Txn *roachpb.Transaction @@ -4629,6 +4632,16 @@ func (opts *MVCCWriteOptions) validate() error { return nil } +// TxnID returns the transaction ID if the write corresponds to a transactional +// write. Otherwise, if it corresponds to a non-transactional write, an empty ID +// is returned. +func (opts *MVCCWriteOptions) TxnID() uuid.UUID { + if opts.Txn != nil { + return opts.Txn.ID + } + return uuid.UUID{} +} + // MVCCScanOptions bundles options for the MVCCScan family of functions. type MVCCScanOptions struct { // See the documentation for MVCCScan for information on these parameters. @@ -5914,8 +5927,12 @@ func MVCCCheckForAcquireLock( if err := validateLockAcquisitionStrength(str); err != nil { return err } + var txnID uuid.UUID + if txn != nil { + txnID = txn.ID + } ltScanner, err := newLockTableKeyScanner( - ctx, reader, txn, str, maxLockConflicts, targetLockConflictBytes, BatchEvalReadCategory) + ctx, reader, txnID, str, maxLockConflicts, targetLockConflictBytes, BatchEvalReadCategory) if err != nil { return err } @@ -5948,7 +5965,7 @@ func MVCCAcquireLock( return err } ltScanner, err := newLockTableKeyScanner( - ctx, rw, txn, str, maxLockConflicts, targetLockConflictBytes, BatchEvalReadCategory) + ctx, rw, txn.ID, str, maxLockConflicts, targetLockConflictBytes, BatchEvalReadCategory) if err != nil { return err } @@ -6090,6 +6107,69 @@ func validateLockAcquisitionStrength(str lock.Strength) error { return nil } +// VerifyLock returns true if the supplied transaction holds a lock that offers +// equal to or greater protection[1] than the supplied lock strength. +// +// [1] Locks that were acquired at sequence numbers that have since been ignored +// aren't considered, as they may be rolled back in the future. +func VerifyLock( + ctx context.Context, + reader Reader, + txn *enginepb.TxnMeta, + str lock.Strength, + key roachpb.Key, + ignoredSeqNums []enginepb.IgnoredSeqNumRange, +) (bool, error) { + if txn == nil { + // Non-transactional requests cannot acquire locks that outlive their + // lifespan. Nothing to verify. + return false, errors.Errorf("txn must be non-nil to verify replicated lock") + } + if str == lock.None { + return false, errors.Errorf("querying a lock with strength %s is nonsensical", lock.None) + } + // NB: Pass in lock.None when configuring the lockTableKeyScanner to only + // return locks held by the our transaction. + ltScanner, err := newLockTableKeyScanner( + ctx, reader, txn.ID, lock.None, 0, 0, BatchEvalReadCategory, + ) + if err != nil { + return false, err + } + + defer ltScanner.close() + err = ltScanner.scan(key) + if err != nil { + return false, err + } + + for _, iterStr := range strongerOrEqualStrengths(str) { + foundLock := ltScanner.foundOwn(iterStr) + if foundLock == nil { + // Proceed to check weaker strengths... + continue + } + + if foundLock.Txn.Epoch != txn.Epoch { + continue // the lock belongs to a different epoch + } + + // We don't keep a full history of all sequence numbers a replicated lock + // was acquired at. As long as there exists a lock at some (non-rolled back) + // sequence number with sufficient lock strength, we have the desired mutual + // exclusion guarantees. We need to make sure the lock we found was written + // at a sequence number that hasn't been rolled back; otherwise, there's + // nothing stopping another request from rolling back the lock even though + // it exists right now. + if enginepb.TxnSeqIsIgnored(foundLock.Txn.Sequence, ignoredSeqNums) { + continue // the lock is ignored, proceed to check weaker lock strengths... + } + + return true, nil + } + return false, nil +} + // mvccReleaseLockInternal releases a lock at the specified key and strength and // by the specified transaction. The function accepts the instructions for how // to release the lock (encoded in the LockUpdate), and the current value of the diff --git a/pkg/storage/mvcc_history_test.go b/pkg/storage/mvcc_history_test.go index dd788983aa34..d81f4eebc689 100644 --- a/pkg/storage/mvcc_history_test.go +++ b/pkg/storage/mvcc_history_test.go @@ -840,6 +840,7 @@ var commands = map[string]cmd{ "add_unreplicated_lock": {typLocksUpdate, cmdAddUnreplicatedLock}, "check_for_acquire_lock": {typReadOnly, cmdCheckForAcquireLock}, "acquire_lock": {typLocksUpdate, cmdAcquireLock}, + "verify_lock": {typReadOnly, cmdVerifyLock}, "clear": {typDataUpdate, cmdClear}, "clear_range": {typDataUpdate, cmdClearRange}, @@ -1184,6 +1185,20 @@ func cmdAcquireLock(e *evalCtx) error { }) } +func cmdVerifyLock(e *evalCtx) error { + return e.withReader(func(r storage.Reader) error { + txn := e.getTxn(optional) + key := e.getKey() + str := e.getStrength() + found, err := storage.VerifyLock(e.ctx, r, &txn.TxnMeta, str, key, txn.IgnoredSeqNums) + if err != nil { + return err + } + e.results.buf.Printf("found: %v\n", found) + return nil + }) +} + func cmdClear(e *evalCtx) error { key := e.getKey() ts := e.getTs(nil) diff --git a/pkg/storage/testdata/mvcc_histories/verify_locks b/pkg/storage/testdata/mvcc_histories/verify_locks new file mode 100644 index 000000000000..9f0f90afea7c --- /dev/null +++ b/pkg/storage/testdata/mvcc_histories/verify_locks @@ -0,0 +1,258 @@ +run stats ok +put k=k1 v=v1 ts=5,0 +put k=k2 v=v2 ts=5,0 +put k=k3 v=v3 ts=5,0 +put k=k4 v=v4 ts=5,0 +put k=k5 v=v5 ts=5,0 # no lock +put k=k6 v=v6 ts=5,0 +put k=k7 v=v7 ts=5,0 +---- +>> put k=k1 v=v1 ts=5,0 +stats: key_count=+1 key_bytes=+15 val_count=+1 val_bytes=+7 live_count=+1 live_bytes=+22 +>> put k=k2 v=v2 ts=5,0 +stats: key_count=+1 key_bytes=+15 val_count=+1 val_bytes=+7 live_count=+1 live_bytes=+22 +>> put k=k3 v=v3 ts=5,0 +stats: key_count=+1 key_bytes=+15 val_count=+1 val_bytes=+7 live_count=+1 live_bytes=+22 +>> put k=k4 v=v4 ts=5,0 +stats: key_count=+1 key_bytes=+15 val_count=+1 val_bytes=+7 live_count=+1 live_bytes=+22 +>> put k=k5 v=v5 ts=5,0 # no lock +stats: key_count=+1 key_bytes=+15 val_count=+1 val_bytes=+7 live_count=+1 live_bytes=+22 +>> put k=k6 v=v6 ts=5,0 +stats: key_count=+1 key_bytes=+15 val_count=+1 val_bytes=+7 live_count=+1 live_bytes=+22 +>> put k=k7 v=v7 ts=5,0 +stats: key_count=+1 key_bytes=+15 val_count=+1 val_bytes=+7 live_count=+1 live_bytes=+22 +>> at end: +data: "k1"/5.000000000,0 -> /BYTES/v1 +data: "k2"/5.000000000,0 -> /BYTES/v2 +data: "k3"/5.000000000,0 -> /BYTES/v3 +data: "k4"/5.000000000,0 -> /BYTES/v4 +data: "k5"/5.000000000,0 -> /BYTES/v5 +data: "k6"/5.000000000,0 -> /BYTES/v6 +data: "k7"/5.000000000,0 -> /BYTES/v7 +stats: key_count=7 key_bytes=105 val_count=7 val_bytes=49 live_count=7 live_bytes=154 + +run stats ok +txn_begin t=A ts=10,0 +txn_begin t=B ts=11,0 +---- +>> at end: +txn: "B" meta={id=00000002 key=/Min iso=Serializable pri=0.00000000 epo=0 ts=11.000000000,0 min=0,0 seq=0} lock=true stat=PENDING rts=11.000000000,0 wto=false gul=0,0 + +run stats ok +with t=A + txn_step seq=10 + check_for_acquire_lock k=k1 str=shared + check_for_acquire_lock k=k2 str=shared + check_for_acquire_lock k=k3 str=exclusive + acquire_lock k=k1 str=shared + acquire_lock k=k2 str=shared + acquire_lock k=k3 str=exclusive + put k=k4 v=v_new +---- +>> acquire_lock k=k1 str=shared t=A +stats: lock_count=+1 lock_bytes=+69 lock_age=+90 +>> acquire_lock k=k2 str=shared t=A +stats: lock_count=+1 lock_bytes=+69 lock_age=+90 +>> acquire_lock k=k3 str=exclusive t=A +stats: lock_count=+1 lock_bytes=+69 lock_age=+90 +>> put k=k4 v=v_new t=A +put: lock acquisition = {k4 id=00000001 key=/Min iso=Serializable pri=0.00000000 epo=0 ts=10.000000000,0 min=0,0 seq=10 Replicated Intent []} +stats: key_bytes=+12 val_count=+1 val_bytes=+60 live_bytes=+53 gc_bytes_age=+1710 intent_count=+1 intent_bytes=+22 lock_count=+1 lock_age=+90 +>> at end: +txn: "A" meta={id=00000001 key=/Min iso=Serializable pri=0.00000000 epo=0 ts=10.000000000,0 min=0,0 seq=10} lock=true stat=PENDING rts=10.000000000,0 wto=false gul=0,0 +data: "k1"/5.000000000,0 -> /BYTES/v1 +data: "k2"/5.000000000,0 -> /BYTES/v2 +data: "k3"/5.000000000,0 -> /BYTES/v3 +meta: "k4"/0,0 -> txn={id=00000001 key=/Min iso=Serializable pri=0.00000000 epo=0 ts=10.000000000,0 min=0,0 seq=10} ts=10.000000000,0 del=false klen=12 vlen=10 mergeTs= txnDidNotUpdateMeta=true +data: "k4"/10.000000000,0 -> /BYTES/v_new +data: "k4"/5.000000000,0 -> /BYTES/v4 +data: "k5"/5.000000000,0 -> /BYTES/v5 +data: "k6"/5.000000000,0 -> /BYTES/v6 +data: "k7"/5.000000000,0 -> /BYTES/v7 +lock (Replicated): "k1"/Shared -> txn={id=00000001 key=/Min iso=Serializable pri=0.00000000 epo=0 ts=10.000000000,0 min=0,0 seq=10} ts=10.000000000,0 del=false klen=0 vlen=0 mergeTs= txnDidNotUpdateMeta=true +lock (Replicated): "k2"/Shared -> txn={id=00000001 key=/Min iso=Serializable pri=0.00000000 epo=0 ts=10.000000000,0 min=0,0 seq=10} ts=10.000000000,0 del=false klen=0 vlen=0 mergeTs= txnDidNotUpdateMeta=true +lock (Replicated): "k3"/Exclusive -> txn={id=00000001 key=/Min iso=Serializable pri=0.00000000 epo=0 ts=10.000000000,0 min=0,0 seq=10} ts=10.000000000,0 del=false klen=0 vlen=0 mergeTs= txnDidNotUpdateMeta=true +stats: key_count=7 key_bytes=117 val_count=8 val_bytes=109 live_count=7 live_bytes=207 gc_bytes_age=1710 intent_count=1 intent_bytes=22 lock_count=4 lock_bytes=207 lock_age=360 + +run stats ok +with t=A + txn_step seq=10 + verify_lock k=k1 str=shared + verify_lock k=k1 str=exclusive + verify_lock k=k2 str=shared + verify_lock k=k2 str=exclusive + verify_lock k=k3 str=shared + verify_lock k=k3 str=exclusive + verify_lock k=k4 str=shared + verify_lock k=k4 str=exclusive + verify_lock k=k5 str=shared + verify_lock k=k5 str=exclusive +---- +found: true +found: false +found: true +found: false +found: true +found: true +found: true +found: true +found: false +found: false +>> at end: +txn: "A" meta={id=00000001 key=/Min iso=Serializable pri=0.00000000 epo=0 ts=10.000000000,0 min=0,0 seq=10} lock=true stat=PENDING rts=10.000000000,0 wto=false gul=0,0 + +# A different transaction shouldn't be able to verify TxnA's locks. +run stats ok +with t=B + verify_lock k=k1 str=shared + verify_lock k=k1 str=exclusive + verify_lock k=k2 str=shared + verify_lock k=k2 str=exclusive + verify_lock k=k3 str=shared + verify_lock k=k3 str=exclusive + verify_lock k=k4 str=shared + verify_lock k=k4 str=exclusive + verify_lock k=k5 str=shared + verify_lock k=k5 str=exclusive +---- +found: false +found: false +found: false +found: false +found: false +found: false +found: false +found: false +found: false +found: false + +# Sequence numbers are not considered when verifying locks. Test a sequence +# numbers both higher and lower than the sequence number at which the lock was +# acquired. Higher: +run stats ok +with t=A + txn_step seq=15 + verify_lock k=k1 str=shared + verify_lock k=k1 str=exclusive + verify_lock k=k2 str=shared + verify_lock k=k2 str=exclusive + verify_lock k=k3 str=shared + verify_lock k=k3 str=exclusive + verify_lock k=k4 str=shared + verify_lock k=k4 str=exclusive + verify_lock k=k5 str=shared + verify_lock k=k5 str=exclusive +---- +found: true +found: false +found: true +found: false +found: true +found: true +found: true +found: true +found: false +found: false +>> at end: +txn: "A" meta={id=00000001 key=/Min iso=Serializable pri=0.00000000 epo=0 ts=10.000000000,0 min=0,0 seq=15} lock=true stat=PENDING rts=10.000000000,0 wto=false gul=0,0 + +# Lower: +run stats ok +with t=A + txn_step seq=5 + verify_lock k=k1 str=shared + verify_lock k=k1 str=exclusive + verify_lock k=k2 str=shared + verify_lock k=k2 str=exclusive + verify_lock k=k3 str=shared + verify_lock k=k3 str=exclusive + verify_lock k=k4 str=shared + verify_lock k=k4 str=exclusive + verify_lock k=k5 str=shared + verify_lock k=k5 str=exclusive +---- +found: true +found: false +found: true +found: false +found: true +found: true +found: true +found: true +found: false +found: false +>> at end: +txn: "A" meta={id=00000001 key=/Min iso=Serializable pri=0.00000000 epo=0 ts=10.000000000,0 min=0,0 seq=5} lock=true stat=PENDING rts=10.000000000,0 wto=false gul=0,0 + +# Ensure if a lock is held at a sequence number that's ignored it'll be +# considered not found. + +run ok +with t=A + txn_ignore_seqs seqs=(5-15) + verify_lock k=k1 str=shared + verify_lock k=k1 str=exclusive + verify_lock k=k2 str=shared + verify_lock k=k2 str=exclusive + verify_lock k=k3 str=shared + verify_lock k=k3 str=exclusive + verify_lock k=k4 str=shared + verify_lock k=k4 str=exclusive + verify_lock k=k5 str=shared + verify_lock k=k5 str=exclusive +---- +found: false +found: false +found: false +found: false +found: false +found: false +found: false +found: false +found: false +found: false +>> at end: +txn: "A" meta={id=00000001 key=/Min iso=Serializable pri=0.00000000 epo=0 ts=10.000000000,0 min=0,0 seq=5} lock=true stat=PENDING rts=10.000000000,0 wto=false gul=0,0 isn=1 + +# Test that if a lock is held at a sequence number that's ignored, but it's also +# held at a lock strength that's stronger at a sequence number that's not +# ignored, we consider the lock found. However, the opposite isn't true -- if +# the lock at the non-ignored sequence number is weaker in strength, then the +# stronger lock shouldn't be considered found. +run ok +with t=A + txn_step seq=10 + check_for_acquire_lock k=k6 str=shared + acquire_lock k=k6 str=shared + check_for_acquire_lock k=k7 str=exclusive + acquire_lock k=k7 str=exclusive + txn_step seq=20 + check_for_acquire_lock k=k6 str=exclusive + acquire_lock k=k6 str=exclusive + check_for_acquire_lock k=k7 str=shared + acquire_lock k=k7 str=shared +---- +>> at end: +txn: "A" meta={id=00000001 key=/Min iso=Serializable pri=0.00000000 epo=0 ts=10.000000000,0 min=0,0 seq=20} lock=true stat=PENDING rts=10.000000000,0 wto=false gul=0,0 isn=1 +lock (Replicated): "k1"/Shared -> txn={id=00000001 key=/Min iso=Serializable pri=0.00000000 epo=0 ts=10.000000000,0 min=0,0 seq=10} ts=10.000000000,0 del=false klen=0 vlen=0 mergeTs= txnDidNotUpdateMeta=true +lock (Replicated): "k2"/Shared -> txn={id=00000001 key=/Min iso=Serializable pri=0.00000000 epo=0 ts=10.000000000,0 min=0,0 seq=10} ts=10.000000000,0 del=false klen=0 vlen=0 mergeTs= txnDidNotUpdateMeta=true +lock (Replicated): "k3"/Exclusive -> txn={id=00000001 key=/Min iso=Serializable pri=0.00000000 epo=0 ts=10.000000000,0 min=0,0 seq=10} ts=10.000000000,0 del=false klen=0 vlen=0 mergeTs= txnDidNotUpdateMeta=true +lock (Replicated): "k6"/Exclusive -> txn={id=00000001 key=/Min iso=Serializable pri=0.00000000 epo=0 ts=10.000000000,0 min=0,0 seq=20} ts=10.000000000,0 del=false klen=0 vlen=0 mergeTs= txnDidNotUpdateMeta=true +lock (Replicated): "k6"/Shared -> txn={id=00000001 key=/Min iso=Serializable pri=0.00000000 epo=0 ts=10.000000000,0 min=0,0 seq=10} ts=10.000000000,0 del=false klen=0 vlen=0 mergeTs= txnDidNotUpdateMeta=true +lock (Replicated): "k7"/Exclusive -> txn={id=00000001 key=/Min iso=Serializable pri=0.00000000 epo=0 ts=10.000000000,0 min=0,0 seq=10} ts=10.000000000,0 del=false klen=0 vlen=0 mergeTs= txnDidNotUpdateMeta=true +lock (Replicated): "k7"/Shared -> txn={id=00000001 key=/Min iso=Serializable pri=0.00000000 epo=0 ts=10.000000000,0 min=0,0 seq=20} ts=10.000000000,0 del=false klen=0 vlen=0 mergeTs= txnDidNotUpdateMeta=true + +run ok +with t=A + txn_ignore_seqs seqs=(5-15) + verify_lock k=k6 str=shared + verify_lock k=k6 str=exclusive + verify_lock k=k7 str=shared + verify_lock k=k7 str=exclusive +---- +found: true +found: true +found: true +found: false +>> at end: +txn: "A" meta={id=00000001 key=/Min iso=Serializable pri=0.00000000 epo=0 ts=10.000000000,0 min=0,0 seq=20} lock=true stat=PENDING rts=10.000000000,0 wto=false gul=0,0 isn=1