kv: add ability to verify pipelined replicated shared/exclusive locks
Previously, QueryIntent requests were only used to verify whether an
intent was successfully evaluated and replicated. This patch extends
the QueryIntent request so that it can also verify whether a pipelined
shared or exclusive lock was successfully replicated.

Informs #117978

Release note: None
arulajmani committed Mar 27, 2024
1 parent 6e8d2cd commit 70b7031
Showing 9 changed files with 437 additions and 67 deletions.
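
Before the per-file changes, here is a rough sketch of how a caller might use the extended request (illustrative only; the helper name and its placement are assumptions, not part of this commit). It populates the new LockStrength and IgnoredSeqNums fields for a pipelined replicated shared lock, mirroring how the txn recovery probe below does so for intents:

package example

import (
	"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency/lock"
	"github.com/cockroachdb/cockroach/pkg/roachpb"
)

// makeQueryLockRequest is a hypothetical helper (not added by this commit)
// that builds a QueryIntent request asking whether txn's pipelined replicated
// shared lock on key was successfully replicated.
func makeQueryLockRequest(txn *roachpb.Transaction, key roachpb.Key) *kvpb.QueryIntentRequest {
	return &kvpb.QueryIntentRequest{
		RequestHeader: kvpb.RequestHeader{Key: key},
		Txn:           txn.TxnMeta,
		// Return an IntentMissingError if no matching lock is found.
		ErrorIfMissing: true,
		// Verify a replicated shared lock; lock.Exclusive works the same way.
		LockStrength: lock.Shared,
		// Locks held only at ignored (e.g. rolled back) sequence numbers are
		// treated as not found.
		IgnoredSeqNums: txn.IgnoredSeqNums,
	}
}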
13 changes: 13 additions & 0 deletions pkg/kv/kvpb/api.proto
@@ -1436,6 +1436,19 @@ message QueryIntentRequest {
// If true, return an IntentMissingError if no matching intent (neither a
// "partial match" nor a "full match") is found.
bool error_if_missing = 3;

// The strength with which the lock being queried was acquired. To ensure the
// supplied protection was provided, we check whether the lock was held with
// the supplied lock strength, or something stronger, at the given sequence
// number.
kv.kvserver.concurrency.lock.Strength lock_strength = 4;

// The list of sequence numbers that have been ignored by the transaction that
// acquired the lock. Any locks found at sequence numbers which are considered
// ignored will be treated as "not found"; that's because they can be removed
// at any time.
repeated storage.enginepb.IgnoredSeqNumRange ignored_seqnums = 5
[(gogoproto.nullable) = false, (gogoproto.customname) = "IgnoredSeqNums"];
}

// A QueryIntentResponse is the return value from the QueryIntent() method.
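
To make the "lock strength or something stronger" wording in the new lock_strength comment concrete, a minimal sketch of the comparison follows (illustrative only; it assumes lock.Strength values are ordered from weakest to strongest, and heldSatisfiesQueried is not a helper introduced by this commit):

package example

import "github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency/lock"

// heldSatisfiesQueried reports whether a lock held at strength held provides
// at least the protection requested by a query at strength queried. For
// example, a query for lock.Shared is satisfied by a lock held at Shared,
// Exclusive, or Intent strength.
func heldSatisfiesQueried(held, queried lock.Strength) bool {
	return held >= queried
}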
133 changes: 86 additions & 47 deletions pkg/kv/kvserver/batcheval/cmd_query_intent.go
@@ -14,10 +14,13 @@ import (
"context"
"time"

"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/batcheval/result"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency/lock"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/lockspanset"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/spanset"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/errors"
@@ -28,17 +31,32 @@ func init() {
}

func declareKeysQueryIntent(
_ ImmutableRangeState,
rs ImmutableRangeState,
_ *kvpb.Header,
req kvpb.Request,
latchSpans *spanset.SpanSet,
_ *lockspanset.LockSpanSet,
_ time.Duration,
) error {
// QueryIntent requests read the specified keys at the maximum timestamp in
// order to read any intent present, if one exists, regardless of the
// timestamp it was written at.
// QueryIntent requests acquire a non-MVCC latch in order to read the queried
// lock, if one exists, regardless of the time it was written at. This
// isolates them from in-flight intent writes and exclusive lock acquisitions.
latchSpans.AddNonMVCC(spanset.SpanReadOnly, req.Header().Span())
// They also acquire a latch on the per-transaction local key that all
// replicated shared lock acquisitions latch, to isolate themselves from any
// in-flight shared locking requests by the same transaction that may still be
// replicating pipelined locks.
// TODO(arul): add a test.
//
// TODO(XXX): Do we really need this? We're saying that we don't need to
// prevent replicated locks from landing after the QueryIntent request
// because pipelined replicated locks can never be part of a batch that's
// being committed in parallel. As such, there can't be any in-flight requests
// (that haven't evaluated yet) that acquire replicated locks and pipeline
// them. Does that mean this latching isn't required?
txnID := req.(*kvpb.QueryIntentRequest).Txn.ID
latchSpans.AddNonMVCC(spanset.SpanReadWrite, roachpb.Span{
Key: keys.ReplicatedSharedLocksTransactionLatchingKey(rs.GetRangeID(), txnID),
})
return nil
}

@@ -74,57 +92,78 @@ func QueryIntent(
h.Timestamp, args.Txn.WriteTimestamp)
}

	// Intents have special handling because there's an associated timestamp
	// component with them.
	//
	// TODO(arul): We should be able to remove the lock.None case once
	// compatibility with 24.1 is no longer an issue.
	if args.LockStrength == lock.Intent || args.LockStrength == lock.None {
		// Read from the lock table to see if an intent exists.
		intent, err := storage.GetIntent(ctx, reader, args.Key, storage.BatchEvalReadCategory)
		if err != nil {
			return result.Result{}, err
		}

		reply.FoundIntent = false
		reply.FoundUnpushedIntent = false
		if intent != nil {
			// See comment on QueryIntentRequest.Txn for an explanation of this
			// comparison.
			// TODO(nvanbenschoten): Now that we have a full intent history,
			// we can look at the exact sequence! That won't serve as much more
			// than an assertion that QueryIntent is being used correctly.
			reply.FoundIntent = (args.Txn.ID == intent.Txn.ID) &&
				(args.Txn.Epoch == intent.Txn.Epoch) &&
				(args.Txn.Sequence <= intent.Txn.Sequence)

			if !reply.FoundIntent {
				log.VEventf(ctx, 2, "intent mismatch requires - %v == %v and %v == %v and %v <= %v",
					args.Txn.ID, intent.Txn.ID, args.Txn.Epoch, intent.Txn.Epoch, args.Txn.Sequence, intent.Txn.Sequence)
			} else {
				// If we found a matching intent, check whether the intent was pushed past
				// its expected timestamp.
				cmpTS := args.Txn.WriteTimestamp
				if ownTxn {
					// If the request is querying an intent for its own transaction, forward
					// the timestamp we compare against to the provisional commit timestamp
					// in the batch header.
					cmpTS.Forward(h.Txn.WriteTimestamp)
				}
				reply.FoundUnpushedIntent = intent.Txn.WriteTimestamp.LessEq(cmpTS)

				if !reply.FoundUnpushedIntent {
					log.VEventf(ctx, 2, "found pushed intent")
					// If the request was querying an intent in its own transaction, update
					// the response transaction.
					// TODO(nvanbenschoten): if this is necessary for correctness, say so.
					// And then add a test to demonstrate that.
					if ownTxn {
						reply.Txn = h.Txn.Clone()
						reply.Txn.WriteTimestamp.Forward(intent.Txn.WriteTimestamp)
					}
				}
			}
		} else {
			log.VEventf(ctx, 2, "found no intent")
		}
		if !reply.FoundIntent && args.ErrorIfMissing {
			return result.Result{}, kvpb.NewIntentMissingError(args.Key, intent)
		}
	} else {
		found, err := storage.VerifyLock(
			ctx, reader, &args.Txn, args.LockStrength, args.Key, args.IgnoredSeqNums,
		)
		if err != nil {
			return result.Result{}, err
		}
		if found {
			reply.FoundIntent = true
			reply.FoundUnpushedIntent = true
		}
		if !reply.FoundIntent && args.ErrorIfMissing {
			return result.Result{}, kvpb.NewIntentMissingError(args.Key, nil /* intent */)
		}
	}

return result.Result{}, nil
}
12 changes: 12 additions & 0 deletions pkg/kv/kvserver/replica_tscache.go
@@ -312,6 +312,18 @@ func (r *Replica) updateTimestampCache(
// transaction or not.
addToTSCache(start, end, t.Txn.WriteTimestamp, uuid.UUID{})
}
// NB: If this QueryIntentRequest was querying a replicated lock instead
// of a write intent, we don't need to worry about updating the timestamp
// cache to prevent the replicated lock from ever being acquired after the
// QueryIntentRequest has evaluated. This is unlike write intents, where
// we prevent them from ever being written in the future. This is done
// for the benefit of txn recovery, where we don't want an intent to land
// after a QueryTxn request has evaluated. However, for replicated locks,
// we know that they'll never be pipelined if they're part of a batch being
// committed in parallel. This means any QueryIntent request for a replicated
// shared or exclusive lock is doing so with the knowledge that the request
// evaluated successfully (so it can't land later) -- it's only checking
// whether replication succeeded or not.
case *kvpb.ResolveIntentRequest:
// Update the timestamp cache on the key the request resolved if there
// was a replicated {shared, exclusive} lock on that key which was
1 change: 1 addition & 0 deletions pkg/kv/kvserver/txnrecovery/BUILD.bazel
@@ -11,6 +11,7 @@ go_library(
deps = [
"//pkg/kv",
"//pkg/kv/kvpb",
"//pkg/kv/kvserver/concurrency/lock",
"//pkg/roachpb",
"//pkg/util/hlc",
"//pkg/util/log",
5 changes: 4 additions & 1 deletion pkg/kv/kvserver/txnrecovery/manager.go
@@ -16,6 +16,7 @@ import (

"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency/lock"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
@@ -205,7 +206,9 @@ func (m *manager) resolveIndeterminateCommitForTxnProbe(
RequestHeader: kvpb.RequestHeader{
Key: w.Key,
},
Txn:            meta,
LockStrength:   lock.Intent,
IgnoredSeqNums: txn.IgnoredSeqNums,
})
}

27 changes: 18 additions & 9 deletions pkg/storage/lock_table_key_scanner.go
@@ -62,6 +62,9 @@ func strongerOrEqualStrengths(str lock.Strength) []lock.Strength {
// the provided lock strength.
func minConflictLockStrength(str lock.Strength) (lock.Strength, error) {
switch str {
case lock.None:
// Don't conflict with any locks held by other transactions.
return lock.None, nil
case lock.Shared:
return lock.Exclusive, nil
case lock.Exclusive, lock.Intent:
@@ -113,30 +116,36 @@ var lockTableKeyScannerPool = sync.Pool{

// newLockTableKeyScanner creates a new lockTableKeyScanner.
//
// txnID corresponds to the ID of the transaction attempting to acquire locks.
// If txnID is valid (non-empty), locks held by the transaction with any
// strength will be accumulated into the ownLocks array. Otherwise, if txnID is
// empty, the request is non-transactional and no locks will be accumulated into
// the ownLocks array.
//
// str is the strength of the lock that the transaction (or non-transactional
// request) is attempting to acquire. The scanner will search for locks held by
// other transactions that conflict with this strength.
// other transactions that conflict with this strength[1].
//
// maxConflicts is the maximum number of conflicting locks that the scanner
// should accumulate before returning an error. If maxConflicts is zero, the
// scanner will accumulate all conflicting locks.
//
// [1] It's valid to pass in lock.None for str. lock.None doesn't conflict with
// any other replicated locks; as such, passing lock.None configures the scanner
// to only return locks from the supplied txnID.
func newLockTableKeyScanner(
ctx context.Context,
reader Reader,
txn *roachpb.Transaction,
txnID uuid.UUID,
str lock.Strength,
maxConflicts int64,
targetBytesPerConflict int64,
readCategory ReadCategory,
) (*lockTableKeyScanner, error) {
if txnID.Equal(uuid.UUID{}) && str == lock.None {
	return nil, errors.AssertionFailedf(
		"configuring the scanner with an empty transaction ID and no locking strength is nonsensical",
	)
}
minConflictStr, err := minConflictLockStrength(str)
if err != nil {