Skip to content

Commit

Permalink
[WIP, DNM] kvserver: requests that acquire repl locks should use read…
Browse files Browse the repository at this point in the history
…-write path

Previously, {Get,Scan,ReverseScan}Requests all used the read-only
execution path. Things aren't so simple anymore, now that we want these
requests to be able to acquire replicated locks, which means they need
to go through raft (and therefore the read-write execution path). This
patch achieves exactly that.

Informs #100193

Release note: None
  • Loading branch information
arulajmani committed Sep 8, 2023
1 parent 943e2f5 commit 21738f0
Show file tree
Hide file tree
Showing 6 changed files with 132 additions and 19 deletions.
36 changes: 31 additions & 5 deletions pkg/kv/kvpb/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,8 +93,8 @@ var flagExclusions = map[flag][]flag{

// IsReadOnly returns true iff the request is read-only. A request is
// read-only if it does not go through raft, meaning that it cannot
// change any replicated state. However, read-only requests may still
// acquire locks with an unreplicated durability level; see IsLocking.
// change any replicated state. However, read-only requests may still acquire
// locks, but only with unreplicated durability.
func IsReadOnly(args Request) bool {
flags := args.flags()
return (flags&isRead) != 0 && (flags&isWrite) == 0
Expand Down Expand Up @@ -199,6 +199,7 @@ type Request interface {
type LockingReadRequest interface {
Request
KeyLockingStrength() lock.Strength
KeyLockingDurabilityType() DurabilityType
}

var _ LockingReadRequest = (*GetRequest)(nil)
Expand All @@ -208,20 +209,35 @@ func (gr *GetRequest) KeyLockingStrength() lock.Strength {
return gr.KeyLocking
}

// KeyLockingDurabilityType implements the LockingReadRequest interface.
func (gr *GetRequest) KeyLockingDurabilityType() DurabilityType {
return gr.DurabilityType
}

var _ LockingReadRequest = (*ScanRequest)(nil)

// KeyLockingStrength implements the LockingReadRequest interface.
func (sr *ScanRequest) KeyLockingStrength() lock.Strength {
return sr.KeyLocking
}

// KeyLockingDurabilityType implements the LockingReadRequest interface.
func (sr *ScanRequest) KeyLockingDurabilityType() DurabilityType {
return sr.DurabilityType
}

var _ LockingReadRequest = (*ReverseScanRequest)(nil)

// KeyLockingStrength implements the LockingReadRequest interface.
func (rsr *ReverseScanRequest) KeyLockingStrength() lock.Strength {
return rsr.KeyLocking
}

// KeyLockingDurabilityType implements the LockingReadRequest interface.
func (rsr *ReverseScanRequest) KeyLockingDurabilityType() DurabilityType {
return rsr.DurabilityType
}

// SizedWriteRequest is an interface used to expose the number of bytes a
// request might write.
type SizedWriteRequest interface {
Expand Down Expand Up @@ -1354,9 +1370,17 @@ func flagForLockStrength(l lock.Strength) flag {
return 0
}

func flagForIsReadOrIsWrite(l lock.Strength, durabilityType DurabilityType) flag {
if l == lock.None {
return isRead
}
return isWrite
}

func (gr *GetRequest) flags() flag {
maybeLocking := flagForLockStrength(gr.KeyLocking)
return isRead | isTxn | maybeLocking | updatesTSCache | needsRefresh | canSkipLocked
maybeRead := flagForIsReadOrIsWrite(gr.KeyLocking, gr.DurabilityType)
return maybeRead | isTxn | maybeLocking | updatesTSCache | needsRefresh | canSkipLocked
}

func (*PutRequest) flags() flag {
Expand Down Expand Up @@ -1446,12 +1470,14 @@ func (*RevertRangeRequest) flags() flag {

func (sr *ScanRequest) flags() flag {
maybeLocking := flagForLockStrength(sr.KeyLocking)
return isRead | isRange | isTxn | maybeLocking | updatesTSCache | needsRefresh | canSkipLocked
maybeRead := flagForIsReadOrIsWrite(sr.KeyLocking, sr.DurabilityType)
return maybeRead | isRange | isTxn | maybeLocking | updatesTSCache | needsRefresh | canSkipLocked
}

func (rsr *ReverseScanRequest) flags() flag {
maybeLocking := flagForLockStrength(rsr.KeyLocking)
return isRead | isRange | isReverse | isTxn | maybeLocking | updatesTSCache | needsRefresh | canSkipLocked
maybeRead := flagForIsReadOrIsWrite(rsr.KeyLocking, rsr.DurabilityType)
return maybeRead | isRange | isReverse | isTxn | maybeLocking | updatesTSCache | needsRefresh | canSkipLocked
}

// EndTxn updates the timestamp cache to prevent replays.
Expand Down
5 changes: 4 additions & 1 deletion pkg/kv/kvserver/batcheval/cmd_get.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,10 @@ func Get(

var res result.Result
if args.KeyLocking != lock.None && h.Txn != nil && getRes.Value != nil {
acq := roachpb.MakeLockAcquisition(h.Txn, args.Key, lock.Unreplicated, args.KeyLocking)
acq, err := acquireLockOnKey(ctx, readWriter, h.Txn, args.KeyLocking, args.DurabilityType, args.Key)
if err != nil {
return result.Result{}, err
}
res.Local.AcquiredLocks = []roachpb.LockAcquisition{acq}
}
res.Local.EncounteredIntents = intents
Expand Down
5 changes: 4 additions & 1 deletion pkg/kv/kvserver/batcheval/cmd_reverse_scan.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,11 +110,14 @@ func ReverseScan(
}

if args.KeyLocking != lock.None && h.Txn != nil {
err = acquireUnreplicatedLocksOnKeys(&res, h.Txn, args.KeyLocking, args.ScanFormat, &scanRes)
err = acquireLocksOnKeys(
ctx, readWriter, &res, h.Txn, args.KeyLocking, args.DurabilityType, args.ScanFormat, &scanRes,
)
if err != nil {
return result.Result{}, err
}
}

res.Local.EncounteredIntents = scanRes.Intents
return res, nil
}
5 changes: 4 additions & 1 deletion pkg/kv/kvserver/batcheval/cmd_scan.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,11 +110,14 @@ func Scan(
}

if args.KeyLocking != lock.None && h.Txn != nil {
err = acquireUnreplicatedLocksOnKeys(&res, h.Txn, args.KeyLocking, args.ScanFormat, &scanRes)
err = acquireLocksOnKeys(
ctx, readWriter, &res, h.Txn, args.KeyLocking, args.DurabilityType, args.ScanFormat, &scanRes,
)
if err != nil {
return result.Result{}, err
}
}

res.Local.EncounteredIntents = scanRes.Intents
return res, nil
}
82 changes: 71 additions & 11 deletions pkg/kv/kvserver/batcheval/intent.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,28 +104,55 @@ func readProvisionalVal(

}

// acquireUnreplicatedLocksOnKeys adds an unreplicated lock acquisition by the
// transaction to the provided result.Result for each key in the scan result.
func acquireUnreplicatedLocksOnKeys(
// acquireLocksOnKeys acquires locks on each of the keys in the result of a
// {,Reverse}ScanRequest. The locks are held by the specified transaction with
// the supplied locks strength. Best-effort locks are held in unreplicated
// fashion; locks that need guaranteed durability are replicated.
//
// It is possible to run into a lock conflict error when trying to acquire a
// lock on one of the keys. In such cases, a LockConflictError is returned to
// the caller. However, if locks have been successfully acquired on each of the
// keys, the provided result.Result is mutated accordingly.
func acquireLocksOnKeys(
ctx context.Context,
readWriter storage.ReadWriter,
res *result.Result,
txn *roachpb.Transaction,
str lock.Strength,
keyLocking lock.Strength,
durabilityType kvpb.DurabilityType,
scanFmt kvpb.ScanFormat,
scanRes *storage.MVCCScanResult,
) error {
res.Local.AcquiredLocks = make([]roachpb.LockAcquisition, scanRes.NumKeys)
acquiredLocks := make([]roachpb.LockAcquisition, scanRes.NumKeys)
switch scanFmt {
case kvpb.BATCH_RESPONSE:
var i int
return storage.MVCCScanDecodeKeyValues(scanRes.KVData, func(key storage.MVCCKey, _ []byte) error {
res.Local.AcquiredLocks[i] = roachpb.MakeLockAcquisition(txn, copyKey(key.Key), lock.Unreplicated, str)
err := storage.MVCCScanDecodeKeyValues(scanRes.KVData, func(key storage.MVCCKey, _ []byte) error {
k := copyKey(key.Key)
acq, err := acquireLockOnKey(ctx, readWriter, txn, keyLocking, durabilityType, k)
if err != nil {
return err
}
acquiredLocks[i] = acq
i++
return nil
})
if err != nil {
return err
}
res.Local.AcquiredLocks = acquiredLocks
return nil
case kvpb.KEY_VALUES:
for i, row := range scanRes.KVs {
res.Local.AcquiredLocks[i] = roachpb.MakeLockAcquisition(txn, copyKey(row.Key), lock.Unreplicated, str)
k := copyKey(row.Key)
acq, err := acquireLockOnKey(ctx, readWriter, txn, keyLocking, durabilityType, k)
if err != nil {
res.Local.AcquiredLocks = res.Local.AcquiredLocks[:]
return err
}
acquiredLocks[i] = acq
}
res.Local.AcquiredLocks = acquiredLocks
return nil
case kvpb.COL_BATCH_RESPONSE:
return errors.AssertionFailedf("unexpectedly acquiring unreplicated locks with COL_BATCH_RESPONSE scan format")
Expand All @@ -134,14 +161,47 @@ func acquireUnreplicatedLocksOnKeys(
}
}

// acquireLockOnKey acquires a lock on the specified key. The lock is acquired
// with the supplied lock strength and are held by the specified transaction.
// Best-effort locks are held in unreplicated fashion; locks that need
// guaranteed durability are replicated. The resultant lock acquisition struct
// is returned, which the caller may accumulate in its result set.
//
// It is possible for lock acquisition to run into a lock conflict error, in
// which case a LockConflictError is returned to the caller.
func acquireLockOnKey(
ctx context.Context,
readWriter storage.ReadWriter,
txn *roachpb.Transaction,
str lock.Strength,
durabilityType kvpb.DurabilityType,
key roachpb.Key,
) (roachpb.LockAcquisition, error) {
switch durabilityType {
case kvpb.LockDurabilityBestEffort:
// TODO(arul): call into storage.MVCCCheckLock() here.
acq := roachpb.MakeLockAcquisition(txn, key, lock.Unreplicated, str)
return acq, nil
case kvpb.LockDurabilityGuaranteed:
if err := storage.MVCCAcquireLock(ctx, readWriter, txn, str, key); err != nil {
return roachpb.LockAcquisition{}, err
}
acq := roachpb.MakeLockAcquisition(txn, key, lock.Replicated, str)
return acq, nil
default:
panic("unexpected lock durability")
}
}

// copyKey copies the provided roachpb.Key into a new byte slice, returning the
// copy. It is used in acquireUnreplicatedLocksOnKeys for two reasons:
// copy. It is used in acquireLocksOnKeys for two reasons:
// 1. the keys in an MVCCScanResult, regardless of the scan format used, point
// to a small number of large, contiguous byte slices. These "MVCCScan
// batches" contain keys and their associated values in the same backing
// array. To avoid holding these entire backing arrays in memory and
// preventing them from being garbage collected indefinitely, we copy the key
// slices before coupling their lifetimes to those of unreplicated locks.
// preventing them from being garbage collected indefinitely, we copy the
// key slices before coupling their lifetimes to those of the constructed
// lock acquisitions.
// 2. the KV API has a contract that byte slices returned from KV will not be
// mutated by higher levels. However, we have seen cases (e.g.#64228) where
// this contract is broken due to bugs. To defensively guard against this
Expand Down
18 changes: 18 additions & 0 deletions pkg/storage/mvcc.go
Original file line number Diff line number Diff line change
Expand Up @@ -7388,3 +7388,21 @@ func MVCCLookupRangeKeyValue(
// Made it!
return val, nil
}

// MVCCAcquireLock acquires acquires a lock, held by the specified transaction
// with the supplied strength, on the specific key. An error is returned if
// there is a locking conflict.
//
// TODO(arul,nvanbeschoten): this is just a crude stub.
func MVCCAcquireLock(
ctx context.Context,
readWriter ReadWriter,
txn *roachpb.Transaction,
str lock.Strength,
key roachpb.Key,
) error {
// TODO(arul, nvanbenschoten): this is currently configured just to make sure
// we have access to a legit writer; it's not actually doing anything.
readWriter.ShouldWriteLocalTimestamps(ctx)
return nil
}

0 comments on commit 21738f0

Please sign in to comment.