Skip to content

Commit

Permalink
kv: add ability to verify pipelined replicated shared/exclusive locks
Browse files Browse the repository at this point in the history
Previously, QueryIntent requests were only used to verify whether an
intent was successfully evaluated and replicated. This patch extends
QueryIntent request to also be able to verify whether a pipelined
shared or exclusive lock was successfully replicated or not.

Informs cockroachdb#117978

Release note: None
  • Loading branch information
arulajmani committed Mar 27, 2024
1 parent 085b9be commit 5cb2264
Show file tree
Hide file tree
Showing 13 changed files with 528 additions and 79 deletions.
2 changes: 1 addition & 1 deletion docs/generated/settings/settings-for-tenants.txt
Original file line number Diff line number Diff line change
Expand Up @@ -339,4 +339,4 @@ trace.snapshot.rate duration 0s if non-zero, interval at which background trace
trace.span_registry.enabled boolean true if set, ongoing traces can be seen at https://<ui>/#/debug/tracez application
trace.zipkin.collector string the address of a Zipkin instance to receive traces, as <host>:<port>. If no port is specified, 9411 will be used. application
ui.display_timezone enumeration etc/utc the timezone used to format timestamps in the ui [etc/utc = 0, america/new_york = 1] application
version version 1000023.2-upgrading-to-1000024.1-step-022 set the active cluster version in the format '<major>.<minor>' application
version version 1000023.2-upgrading-to-1000024.1-step-024 set the active cluster version in the format '<major>.<minor>' application
2 changes: 1 addition & 1 deletion docs/generated/settings/settings.html
Original file line number Diff line number Diff line change
Expand Up @@ -294,6 +294,6 @@
<tr><td><div id="setting-trace-span-registry-enabled" class="anchored"><code>trace.span_registry.enabled</code></div></td><td>boolean</td><td><code>true</code></td><td>if set, ongoing traces can be seen at https://&lt;ui&gt;/#/debug/tracez</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
<tr><td><div id="setting-trace-zipkin-collector" class="anchored"><code>trace.zipkin.collector</code></div></td><td>string</td><td><code></code></td><td>the address of a Zipkin instance to receive traces, as &lt;host&gt;:&lt;port&gt;. If no port is specified, 9411 will be used.</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
<tr><td><div id="setting-ui-display-timezone" class="anchored"><code>ui.display_timezone</code></div></td><td>enumeration</td><td><code>etc/utc</code></td><td>the timezone used to format timestamps in the ui [etc/utc = 0, america/new_york = 1]</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
<tr><td><div id="setting-version" class="anchored"><code>version</code></div></td><td>version</td><td><code>1000023.2-upgrading-to-1000024.1-step-022</code></td><td>set the active cluster version in the format &#39;&lt;major&gt;.&lt;minor&gt;&#39;</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
<tr><td><div id="setting-version" class="anchored"><code>version</code></div></td><td>version</td><td><code>1000023.2-upgrading-to-1000024.1-step-024</code></td><td>set the active cluster version in the format &#39;&lt;major&gt;.&lt;minor&gt;&#39;</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
</tbody>
</table>
5 changes: 5 additions & 0 deletions pkg/clusterversion/cockroach_versions.go
Original file line number Diff line number Diff line change
Expand Up @@ -305,6 +305,10 @@ const (
// splits.
V24_1_EstimatedMVCCStatsInSplit

// V24_1_ReplicatedLockPipelining allows exclusive and shared replicated locks
// to be pipelined.
V24_1_ReplicatedLockPipelining

numKeys
)

Expand Down Expand Up @@ -372,6 +376,7 @@ var versionTable = [numKeys]roachpb.Version{
V24_1_SystemDatabaseSurvivability: {Major: 23, Minor: 2, Internal: 18},
V24_1_GossipMaximumIOOverload: {Major: 23, Minor: 2, Internal: 20},
V24_1_EstimatedMVCCStatsInSplit: {Major: 23, Minor: 2, Internal: 22},
V24_1_ReplicatedLockPipelining: {Major: 23, Minor: 2, Internal: 24},
}

// Latest is always the highest version key. This is the maximum logical cluster
Expand Down
15 changes: 15 additions & 0 deletions pkg/kv/kvpb/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -1999,6 +1999,21 @@ func (acrr *AdminChangeReplicasRequest) Changes() []ReplicationChange {
return sl
}

// StrengthOrDefault returns the strength of the lock being queried by the
// QueryIntentRequest.
func (qir *QueryIntentRequest) StrengthOrDefault() lock.Strength {
// TODO(arul): the Strength field on QueryIntentRequest was introduced in
// v24.1. Prior to that, rather unsurprisingly, QueryIntentRequest would only
// query replicated locks with strength. To maintain compatibility between
// v23.2 <-> v24.1 nodes, if this field is unset, we assume it's lock.Intent.
// In the future, once compatibility with v23.2 is no longer a concern, we
// should be able to get rid of this logic.
if qir.Strength == lock.None {
return lock.Intent
}
return qir.Strength
}

// AsLockUpdate creates a lock update message corresponding to the given resolve
// intent request.
func (rir *ResolveIntentRequest) AsLockUpdate() roachpb.LockUpdate {
Expand Down
13 changes: 13 additions & 0 deletions pkg/kv/kvpb/api.proto
Original file line number Diff line number Diff line change
Expand Up @@ -1436,6 +1436,19 @@ message QueryIntentRequest {
// If true, return an IntentMissingError if no matching intent (neither a
// "partial match" nor a "full match") is found.
bool error_if_missing = 3;

// The strength with which the lock being queried was acquired at. To ensure
// the supplied protection was provided, we check whether the lock was held
// with the supplied lock strength or something stronger at the sequence
// number.
kv.kvserver.concurrency.lock.Strength strength = 4;

// The list of sequence numbers that have been ignored by the transaction that
// acquired the lock. Any locks found at sequence numbers which are considered
// ignored will be treated as "not found"; that's because they can be removed
// at any time.
repeated storage.enginepb.IgnoredSeqNumRange ignored_seqnums = 5
[(gogoproto.nullable) = false, (gogoproto.customname) = "IgnoredSeqNums"];
}

// A QueryIntentResponse is the return value from the QueryIntent() method.
Expand Down
123 changes: 81 additions & 42 deletions pkg/kv/kvserver/batcheval/cmd_query_intent.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,15 @@ import (
"context"
"time"

"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/batcheval/result"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency/lock"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/lockspanset"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/spanset"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/storage/enginepb"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/errors"
)
Expand All @@ -28,17 +32,27 @@ func init() {
}

func declareKeysQueryIntent(
_ ImmutableRangeState,
rs ImmutableRangeState,
_ *kvpb.Header,
req kvpb.Request,
latchSpans *spanset.SpanSet,
_ *lockspanset.LockSpanSet,
_ time.Duration,
) error {
// QueryIntent requests read the specified keys at the maximum timestamp in
// order to read any intent present, if one exists, regardless of the
// timestamp it was written at.
// QueryIntent requests acquire a non-MVCC latch in order to read the queried
// lock, if one exists, regardless of the time it was written at. This
// isolates them from in-flight intent writes and exclusive lock acquisitions
// they're trying to query.
latchSpans.AddNonMVCC(spanset.SpanReadOnly, req.Header().Span())
// They also acquire a read latch on the per-transaction local key that all
// replicated shared lock acquisitions acquire latches on. This isolates them
// from the in-flight shared lock acquisition they're trying to query.
//
// TODO(arul): add a test.
txnID := req.(*kvpb.QueryIntentRequest).Txn.ID
latchSpans.AddNonMVCC(spanset.SpanReadOnly, roachpb.Span{
Key: keys.ReplicatedSharedLocksTransactionLatchingKey(rs.GetRangeID(), txnID),
})
return nil
}

Expand Down Expand Up @@ -74,53 +88,78 @@ func QueryIntent(
h.Timestamp, args.Txn.WriteTimestamp)
}

// Read from the lock table to see if an intent exists.
intent, err := storage.GetIntent(ctx, reader, args.Key, storage.BatchEvalReadCategory)
if err != nil {
return result.Result{}, err
if enginepb.TxnSeqIsIgnored(args.Txn.Sequence, args.IgnoredSeqNums) {
return result.Result{}, errors.AssertionFailedf(
"QueryIntent request for lock at sequence number %d but sequence number is ignored %v",
args.Txn.Sequence, args.IgnoredSeqNums,
)
}

reply.FoundIntent = false
reply.FoundUnpushedIntent = false
if intent != nil {
// See comment on QueryIntentRequest.Txn for an explanation of this
// comparison.
// TODO(nvanbenschoten): Now that we have a full intent history,
// we can look at the exact sequence! That won't serve as much more
// than an assertion that QueryIntent is being used correctly.
reply.FoundIntent = (args.Txn.ID == intent.Txn.ID) &&
(args.Txn.Epoch == intent.Txn.Epoch) &&
(args.Txn.Sequence <= intent.Txn.Sequence)
var intent *roachpb.Intent

if !reply.FoundIntent {
log.VEventf(ctx, 2, "intent mismatch requires - %v == %v and %v == %v and %v <= %v",
args.Txn.ID, intent.Txn.ID, args.Txn.Epoch, intent.Txn.Epoch, args.Txn.Sequence, intent.Txn.Sequence)
} else {
// If we found a matching intent, check whether the intent was pushed past
// its expected timestamp.
cmpTS := args.Txn.WriteTimestamp
if ownTxn {
// If the request is querying an intent for its own transaction, forward
// the timestamp we compare against to the provisional commit timestamp
// in the batch header.
cmpTS.Forward(h.Txn.WriteTimestamp)
}
reply.FoundUnpushedIntent = intent.Txn.WriteTimestamp.LessEq(cmpTS)
// Intents have special handling because there's an associated timestamp
// component with them.
if args.StrengthOrDefault() == lock.Intent {
// Read from the lock table to see if an intent exists.
var err error
intent, err = storage.GetIntent(ctx, reader, args.Key, storage.BatchEvalReadCategory)
if err != nil {
return result.Result{}, err
}

if !reply.FoundUnpushedIntent {
log.VEventf(ctx, 2, "found pushed intent")
// If the request was querying an intent in its own transaction, update
// the response transaction.
// TODO(nvanbenschoten): if this is necessary for correctness, say so.
// And then add a test to demonstrate that.
reply.FoundIntent = false
reply.FoundUnpushedIntent = false
if intent != nil {
// See comment on QueryIntentRequest.Txn for an explanation of this
// comparison.
// TODO(nvanbenschoten): Now that we have a full intent history,
// we can look at the exact sequence! That won't serve as much more
// than an assertion that QueryIntent is being used correctly.
reply.FoundIntent = (args.Txn.ID == intent.Txn.ID) &&
(args.Txn.Epoch == intent.Txn.Epoch) &&
(args.Txn.Sequence <= intent.Txn.Sequence)

if !reply.FoundIntent {
log.VEventf(ctx, 2, "intent mismatch requires - %v == %v and %v == %v and %v <= %v",
args.Txn.ID, intent.Txn.ID, args.Txn.Epoch, intent.Txn.Epoch, args.Txn.Sequence, intent.Txn.Sequence)
} else {
// If we found a matching intent, check whether the intent was pushed past
// its expected timestamp.
cmpTS := args.Txn.WriteTimestamp
if ownTxn {
reply.Txn = h.Txn.Clone()
reply.Txn.WriteTimestamp.Forward(intent.Txn.WriteTimestamp)
// If the request is querying an intent for its own transaction, forward
// the timestamp we compare against to the provisional commit timestamp
// in the batch header.
cmpTS.Forward(h.Txn.WriteTimestamp)
}
reply.FoundUnpushedIntent = intent.Txn.WriteTimestamp.LessEq(cmpTS)

if !reply.FoundUnpushedIntent {
log.VEventf(ctx, 2, "found pushed intent")
// If the request was querying an intent in its own transaction, update
// the response transaction.
// TODO(nvanbenschoten): if this is necessary for correctness, say so.
// And then add a test to demonstrate that.
if ownTxn {
reply.Txn = h.Txn.Clone()
reply.Txn.WriteTimestamp.Forward(intent.Txn.WriteTimestamp)
}
}
}
} else {
log.VEventf(ctx, 2, "found no intent")
}
} else {
log.VEventf(ctx, 2, "found no intent")
found, err := storage.VerifyLock(
ctx, reader, &args.Txn, args.Strength, args.Key, args.IgnoredSeqNums,
)
if err != nil {
return result.Result{}, err
}
if found {
reply.FoundIntent = true
reply.FoundUnpushedIntent = true
}
}

if !reply.FoundIntent && args.ErrorIfMissing {
Expand Down
45 changes: 30 additions & 15 deletions pkg/kv/kvserver/replica_tscache.go
Original file line number Diff line number Diff line change
Expand Up @@ -296,21 +296,36 @@ func (r *Replica) updateTimestampCache(
}
addToTSCache(start, end, ts, txnID)
case *kvpb.QueryIntentRequest:
missing := false
if pErr != nil {
_, missing = pErr.GetDetail().(*kvpb.IntentMissingError)
} else {
missing = !resp.(*kvpb.QueryIntentResponse).FoundUnpushedIntent
}
if missing {
// If the QueryIntent determined that the intent is missing
// then we update the timestamp cache at the intent's key to
// the intent's transactional timestamp. This will prevent
// the intent from ever being written in the future. We use
// an empty transaction ID so that we block the intent
// regardless of whether it is part of the current batch's
// transaction or not.
addToTSCache(start, end, t.Txn.WriteTimestamp, uuid.UUID{})
// NB: We only need to bump the timestamp cache if the QueryIntentRequest
// was querying a write intent and if it wasn't found. This prevents the
// intent from ever being written in the future. This is done for the
// benefit of txn recovery, where we don't want an intent to land after a
// QueryIntent request has already evaluated and determined the fate of
// the transaction being recovered. Letting the intent land would cause us
// to commit a transaction that we've determined was aborted.
//
// However, for other replicated locks (shared, exclusive), we know that
// they'll never be pipelined if they belong to a batch that's being
// committed in parallel. This means that any QueryIntent request for a
// replicated shared or exclusive lock is doing so with the knowledge that
// the request evaluated successfully (so it can't land later) -- it's
// only checking whether the replication succeeded or not.
if t.StrengthOrDefault() == lock.Intent {
missing := false
if pErr != nil {
_, missing = pErr.GetDetail().(*kvpb.IntentMissingError)
} else {
missing = !resp.(*kvpb.QueryIntentResponse).FoundUnpushedIntent
}
if missing {
// If the QueryIntent determined that the intent is missing then we
// update the timestamp cache at the intent's key to the intent's
// transactional timestamp. This will prevent the intent from ever
// being written in the future. We use an empty transaction ID so that
// we block the intent regardless of whether it is part of the current
// batch's transaction or not.
addToTSCache(start, end, t.Txn.WriteTimestamp, uuid.UUID{})
}
}
case *kvpb.ResolveIntentRequest:
// Update the timestamp cache on the key the request resolved if there
Expand Down
1 change: 1 addition & 0 deletions pkg/kv/kvserver/txnrecovery/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ go_library(
deps = [
"//pkg/kv",
"//pkg/kv/kvpb",
"//pkg/kv/kvserver/concurrency/lock",
"//pkg/roachpb",
"//pkg/util/hlc",
"//pkg/util/log",
Expand Down
4 changes: 4 additions & 0 deletions pkg/kv/kvserver/txnrecovery/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (

"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency/lock"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
Expand Down Expand Up @@ -206,6 +207,9 @@ func (m *manager) resolveIndeterminateCommitForTxnProbe(
Key: w.Key,
},
Txn: meta,
// TODO(nvanbenschoten): pass in the correct lock strength here.
Strength: lock.Intent,
IgnoredSeqNums: txn.IgnoredSeqNums,
})
}

Expand Down
24 changes: 14 additions & 10 deletions pkg/storage/lock_table_key_scanner.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,9 @@ func strongerOrEqualStrengths(str lock.Strength) []lock.Strength {
// the provided lock strength.
func minConflictLockStrength(str lock.Strength) (lock.Strength, error) {
switch str {
case lock.None:
// Don't conflict with any locks held by other transactions.
return lock.None, nil
case lock.Shared:
return lock.Exclusive, nil
case lock.Exclusive, lock.Intent:
Expand Down Expand Up @@ -113,31 +116,32 @@ var lockTableKeyScannerPool = sync.Pool{

// newLockTableKeyScanner creates a new lockTableKeyScanner.
//
// txn is the transaction attempting to acquire locks. If txn is not nil, locks
// held by the transaction with any strength will be accumulated into the
// ownLocks array. Otherwise, if txn is nil, the request is non-transactional
// and no locks will be accumulated into the ownLocks array.
// txnID corresponds to the ID of the transaction attempting to acquire locks.
// If txnID is valid (non-empty), locks held by the transaction with any
// strength will be accumulated into the ownLocks array. Otherwise, if txnID is
// empty, the request is non-transactional and no locks will be accumulated into
// the ownLocks array.
//
// str is the strength of the lock that the transaction (or non-transactional
// request) is attempting to acquire. The scanner will search for locks held by
// other transactions that conflict with this strength.
// other transactions that conflict with this strength[1].
//
// maxConflicts is the maximum number of conflicting locks that the scanner
// should accumulate before returning an error. If maxConflicts is zero, the
// scanner will accumulate all conflicting locks.
//
// [1] It's valid to pass in lock.None for str. lock.None doesn't conflict with
// any other replicated locks; as such, passing lock.None configures the scanner
// to only return locks from the supplied txnID.
func newLockTableKeyScanner(
ctx context.Context,
reader Reader,
txn *roachpb.Transaction,
txnID uuid.UUID,
str lock.Strength,
maxConflicts int64,
targetBytesPerConflict int64,
readCategory ReadCategory,
) (*lockTableKeyScanner, error) {
var txnID uuid.UUID
if txn != nil {
txnID = txn.ID
}
minConflictStr, err := minConflictLockStrength(str)
if err != nil {
return nil, err
Expand Down
Loading

0 comments on commit 5cb2264

Please sign in to comment.