From 21738f08ca3c442100eb958eecff7179201b10a0 Mon Sep 17 00:00:00 2001 From: Arul Ajmani Date: Fri, 8 Sep 2023 14:22:17 -0400 Subject: [PATCH] [WIP, DNM] kvserver: requests that acquire repl locks should use read-write path Previously, {Get,Scan,ReverseScan}Requests all used the read-only execution path. Things aren't so simple anymore, now that we want these requests to be able to acquire replicated locks, which means they need to go through raft (and therefore the read-write execution path). This patch achieves exactly that. Informs #100193 Release note: None --- pkg/kv/kvpb/api.go | 36 ++++++-- pkg/kv/kvserver/batcheval/cmd_get.go | 5 +- pkg/kv/kvserver/batcheval/cmd_reverse_scan.go | 5 +- pkg/kv/kvserver/batcheval/cmd_scan.go | 5 +- pkg/kv/kvserver/batcheval/intent.go | 82 ++++++++++++++++--- pkg/storage/mvcc.go | 18 ++++ 6 files changed, 132 insertions(+), 19 deletions(-) diff --git a/pkg/kv/kvpb/api.go b/pkg/kv/kvpb/api.go index b7c23b9e788d..75e1e2a2a8b7 100644 --- a/pkg/kv/kvpb/api.go +++ b/pkg/kv/kvpb/api.go @@ -93,8 +93,8 @@ var flagExclusions = map[flag][]flag{ // IsReadOnly returns true iff the request is read-only. A request is // read-only if it does not go through raft, meaning that it cannot -// change any replicated state. However, read-only requests may still -// acquire locks with an unreplicated durability level; see IsLocking. +// change any replicated state. However, read-only requests may still acquire +// locks, but only with unreplicated durability. 
func IsReadOnly(args Request) bool { flags := args.flags() return (flags&isRead) != 0 && (flags&isWrite) == 0 @@ -199,6 +199,7 @@ type Request interface { type LockingReadRequest interface { Request KeyLockingStrength() lock.Strength + KeyLockingDurabilityType() DurabilityType } var _ LockingReadRequest = (*GetRequest)(nil) @@ -208,6 +209,11 @@ func (gr *GetRequest) KeyLockingStrength() lock.Strength { return gr.KeyLocking } +// KeyLockingDurabilityType implements the LockingReadRequest interface. +func (gr *GetRequest) KeyLockingDurabilityType() DurabilityType { + return gr.DurabilityType +} + var _ LockingReadRequest = (*ScanRequest)(nil) // KeyLockingStrength implements the LockingReadRequest interface. @@ -215,6 +221,11 @@ func (sr *ScanRequest) KeyLockingStrength() lock.Strength { return sr.KeyLocking } +// KeyLockingDurabilityType implements the LockingReadRequest interface. +func (sr *ScanRequest) KeyLockingDurabilityType() DurabilityType { + return sr.DurabilityType +} + var _ LockingReadRequest = (*ReverseScanRequest)(nil) // KeyLockingStrength implements the LockingReadRequest interface. @@ -222,6 +233,11 @@ func (rsr *ReverseScanRequest) KeyLockingStrength() lock.Strength { return rsr.KeyLocking } +// KeyLockingDurabilityType implements the LockingReadRequest interface. +func (rsr *ReverseScanRequest) KeyLockingDurabilityType() DurabilityType { + return rsr.DurabilityType +} + // SizedWriteRequest is an interface used to expose the number of bytes a // request might write. 
type SizedWriteRequest interface { @@ -1354,9 +1370,17 @@ func flagForLockStrength(l lock.Strength) flag { return 0 } +func flagForIsReadOrIsWrite(l lock.Strength, durabilityType DurabilityType) flag { + if l == lock.None || durabilityType != LockDurabilityGuaranteed { + return isRead + } + return isWrite +} + func (gr *GetRequest) flags() flag { maybeLocking := flagForLockStrength(gr.KeyLocking) - return isRead | isTxn | maybeLocking | updatesTSCache | needsRefresh | canSkipLocked + maybeRead := flagForIsReadOrIsWrite(gr.KeyLocking, gr.DurabilityType) + return maybeRead | isTxn | maybeLocking | updatesTSCache | needsRefresh | canSkipLocked } func (*PutRequest) flags() flag { @@ -1446,12 +1470,14 @@ func (*RevertRangeRequest) flags() flag { func (sr *ScanRequest) flags() flag { maybeLocking := flagForLockStrength(sr.KeyLocking) - return isRead | isRange | isTxn | maybeLocking | updatesTSCache | needsRefresh | canSkipLocked + maybeRead := flagForIsReadOrIsWrite(sr.KeyLocking, sr.DurabilityType) + return maybeRead | isRange | isTxn | maybeLocking | updatesTSCache | needsRefresh | canSkipLocked } func (rsr *ReverseScanRequest) flags() flag { maybeLocking := flagForLockStrength(rsr.KeyLocking) - return isRead | isRange | isReverse | isTxn | maybeLocking | updatesTSCache | needsRefresh | canSkipLocked + maybeRead := flagForIsReadOrIsWrite(rsr.KeyLocking, rsr.DurabilityType) + return maybeRead | isRange | isReverse | isTxn | maybeLocking | updatesTSCache | needsRefresh | canSkipLocked } // EndTxn updates the timestamp cache to prevent replays. 
diff --git a/pkg/kv/kvserver/batcheval/cmd_get.go b/pkg/kv/kvserver/batcheval/cmd_get.go index fd4330d2abb1..9e9d5a10695c 100644 --- a/pkg/kv/kvserver/batcheval/cmd_get.go +++ b/pkg/kv/kvserver/batcheval/cmd_get.go @@ -84,7 +84,10 @@ func Get( var res result.Result if args.KeyLocking != lock.None && h.Txn != nil && getRes.Value != nil { - acq := roachpb.MakeLockAcquisition(h.Txn, args.Key, lock.Unreplicated, args.KeyLocking) + acq, err := acquireLockOnKey(ctx, readWriter, h.Txn, args.KeyLocking, args.DurabilityType, args.Key) + if err != nil { + return result.Result{}, err + } res.Local.AcquiredLocks = []roachpb.LockAcquisition{acq} } res.Local.EncounteredIntents = intents diff --git a/pkg/kv/kvserver/batcheval/cmd_reverse_scan.go b/pkg/kv/kvserver/batcheval/cmd_reverse_scan.go index 2aa45a5ca0e4..92257e83a2d1 100644 --- a/pkg/kv/kvserver/batcheval/cmd_reverse_scan.go +++ b/pkg/kv/kvserver/batcheval/cmd_reverse_scan.go @@ -110,11 +110,14 @@ func ReverseScan( } if args.KeyLocking != lock.None && h.Txn != nil { - err = acquireUnreplicatedLocksOnKeys(&res, h.Txn, args.KeyLocking, args.ScanFormat, &scanRes) + err = acquireLocksOnKeys( + ctx, readWriter, &res, h.Txn, args.KeyLocking, args.DurabilityType, args.ScanFormat, &scanRes, + ) if err != nil { return result.Result{}, err } } + res.Local.EncounteredIntents = scanRes.Intents return res, nil } diff --git a/pkg/kv/kvserver/batcheval/cmd_scan.go b/pkg/kv/kvserver/batcheval/cmd_scan.go index 13d4d3b63774..14207085c60b 100644 --- a/pkg/kv/kvserver/batcheval/cmd_scan.go +++ b/pkg/kv/kvserver/batcheval/cmd_scan.go @@ -110,11 +110,14 @@ func Scan( } if args.KeyLocking != lock.None && h.Txn != nil { - err = acquireUnreplicatedLocksOnKeys(&res, h.Txn, args.KeyLocking, args.ScanFormat, &scanRes) + err = acquireLocksOnKeys( + ctx, readWriter, &res, h.Txn, args.KeyLocking, args.DurabilityType, args.ScanFormat, &scanRes, + ) if err != nil { return result.Result{}, err } } + res.Local.EncounteredIntents = scanRes.Intents return 
res, nil } diff --git a/pkg/kv/kvserver/batcheval/intent.go b/pkg/kv/kvserver/batcheval/intent.go index 4f1a562e4fe2..a65e705a539d 100644 --- a/pkg/kv/kvserver/batcheval/intent.go +++ b/pkg/kv/kvserver/batcheval/intent.go @@ -104,28 +104,55 @@ func readProvisionalVal( } -// acquireUnreplicatedLocksOnKeys adds an unreplicated lock acquisition by the -// transaction to the provided result.Result for each key in the scan result. -func acquireUnreplicatedLocksOnKeys( +// acquireLocksOnKeys acquires locks on each of the keys in the result of a +// {,Reverse}ScanRequest. The locks are held by the specified transaction with +// the supplied lock strength. Best-effort locks are held in unreplicated +// fashion; locks that need guaranteed durability are replicated. +// +// It is possible to run into a lock conflict error when trying to acquire a +// lock on one of the keys. In such cases, a LockConflictError is returned to +// the caller. However, if locks have been successfully acquired on each of the +// keys, the provided result.Result is mutated accordingly. 
+func acquireLocksOnKeys( + ctx context.Context, + readWriter storage.ReadWriter, res *result.Result, txn *roachpb.Transaction, - str lock.Strength, + keyLocking lock.Strength, + durabilityType kvpb.DurabilityType, scanFmt kvpb.ScanFormat, scanRes *storage.MVCCScanResult, ) error { - res.Local.AcquiredLocks = make([]roachpb.LockAcquisition, scanRes.NumKeys) + acquiredLocks := make([]roachpb.LockAcquisition, scanRes.NumKeys) switch scanFmt { case kvpb.BATCH_RESPONSE: var i int - return storage.MVCCScanDecodeKeyValues(scanRes.KVData, func(key storage.MVCCKey, _ []byte) error { - res.Local.AcquiredLocks[i] = roachpb.MakeLockAcquisition(txn, copyKey(key.Key), lock.Unreplicated, str) + err := storage.MVCCScanDecodeKeyValues(scanRes.KVData, func(key storage.MVCCKey, _ []byte) error { + k := copyKey(key.Key) + acq, err := acquireLockOnKey(ctx, readWriter, txn, keyLocking, durabilityType, k) + if err != nil { + return err + } + acquiredLocks[i] = acq i++ return nil }) + if err != nil { + return err + } + res.Local.AcquiredLocks = acquiredLocks + return nil case kvpb.KEY_VALUES: for i, row := range scanRes.KVs { - res.Local.AcquiredLocks[i] = roachpb.MakeLockAcquisition(txn, copyKey(row.Key), lock.Unreplicated, str) + k := copyKey(row.Key) + acq, err := acquireLockOnKey(ctx, readWriter, txn, keyLocking, durabilityType, k) + if err != nil { + // Leave res untouched so that no lock acquisitions are reported on error. + return err + } + acquiredLocks[i] = acq } + res.Local.AcquiredLocks = acquiredLocks return nil case kvpb.COL_BATCH_RESPONSE: return errors.AssertionFailedf("unexpectedly acquiring unreplicated locks with COL_BATCH_RESPONSE scan format") @@ -134,14 +161,47 @@ func acquireUnreplicatedLocksOnKeys( } } +// acquireLockOnKey acquires a lock on the specified key. The lock is acquired +// with the supplied lock strength and is held by the specified transaction. +// Best-effort locks are held in unreplicated fashion; locks that need +// guaranteed durability are replicated. 
The resultant lock acquisition struct +// is returned, which the caller may accumulate in its result set. +// +// A lock conflict is reported to the caller as a LockConflictError, and an +// unrecognized durability type as an assertion failure error. +func acquireLockOnKey( + ctx context.Context, + readWriter storage.ReadWriter, + txn *roachpb.Transaction, + str lock.Strength, + durabilityType kvpb.DurabilityType, + key roachpb.Key, +) (roachpb.LockAcquisition, error) { + switch durabilityType { + case kvpb.LockDurabilityBestEffort: + // TODO(arul): call into storage.MVCCCheckLock() here. + acq := roachpb.MakeLockAcquisition(txn, key, lock.Unreplicated, str) + return acq, nil + case kvpb.LockDurabilityGuaranteed: + if err := storage.MVCCAcquireLock(ctx, readWriter, txn, str, key); err != nil { + return roachpb.LockAcquisition{}, err + } + acq := roachpb.MakeLockAcquisition(txn, key, lock.Replicated, str) + return acq, nil + default: + return roachpb.LockAcquisition{}, errors.AssertionFailedf("unexpected lock durability: %v", durabilityType) + } +} + // copyKey copies the provided roachpb.Key into a new byte slice, returning the -// copy. It is used in acquireUnreplicatedLocksOnKeys for two reasons: +// copy. It is used in acquireLocksOnKeys for two reasons: // 1. the keys in an MVCCScanResult, regardless of the scan format used, point // to a small number of large, contiguous byte slices. These "MVCCScan // batches" contain keys and their associated values in the same backing // array. To avoid holding these entire backing arrays in memory and -// preventing them from being garbage collected indefinitely, we copy the key -// slices before coupling their lifetimes to those of unreplicated locks. +// preventing them from being garbage collected indefinitely, we copy the +// key slices before coupling their lifetimes to those of the constructed +// lock acquisitions. // 2. the KV API has a contract that byte slices returned from KV will not be // mutated by higher levels. 
However, we have seen cases (e.g.#64228) where // this contract is broken due to bugs. To defensively guard against this diff --git a/pkg/storage/mvcc.go b/pkg/storage/mvcc.go index 1467c60d05ca..1f8efa1bb676 100644 --- a/pkg/storage/mvcc.go +++ b/pkg/storage/mvcc.go @@ -7388,3 +7388,21 @@ func MVCCLookupRangeKeyValue( // Made it! return val, nil } + +// MVCCAcquireLock acquires a lock, held by the specified transaction +// with the supplied strength, on the specified key. An error is returned if +// there is a locking conflict. +// +// TODO(arul, nvanbenschoten): this is just a crude stub that always succeeds. +func MVCCAcquireLock( + ctx context.Context, + readWriter ReadWriter, + txn *roachpb.Transaction, + str lock.Strength, + key roachpb.Key, +) error { + // TODO(arul, nvanbenschoten): this is currently configured just to make sure + // we have access to a legit writer; it's not actually doing anything. + readWriter.ShouldWriteLocalTimestamps(ctx) + return nil +}