kvserver: requests that acquire repl locks should use read-write path
Previously, {Get,Scan,ReverseScan}Requests always used the read-only
execution path. That no longer suffices: these requests can now acquire
replicated locks, which means they need to go through raft (and
therefore the read-write execution path). This patch routes them
accordingly.

Informs #100193

Release note: None
arulajmani committed Sep 26, 2023
1 parent c704759 commit 7f78c5f
Showing 7 changed files with 156 additions and 63 deletions.
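
The crux of the change is that a request's execution path is derived from its flags: anything carrying the isWrite flag is proposed through Raft. The following is a minimal, self-contained Go sketch of that dispatch; isReadOnly and route are illustrative stand-ins, not CockroachDB's actual functions.

// Minimal sketch of flag-based routing, with invented names; not the
// repository's actual dispatch code.
package main

import "fmt"

type flag int

const (
	isRead flag = 1 << iota
	isWrite
)

// isReadOnly mirrors kvpb.IsReadOnly: a request is read-only iff it reads
// and never writes, i.e. it never needs to go through raft.
func isReadOnly(f flag) bool {
	return f&isRead != 0 && f&isWrite == 0
}

func route(f flag) string {
	if isReadOnly(f) {
		return "read-only execution path"
	}
	return "read-write execution path (through raft)"
}

func main() {
	plainGet := isRead
	replLockingGet := isRead | isWrite // Get acquiring a replicated lock
	fmt.Println(route(plainGet))       // read-only execution path
	fmt.Println(route(replLockingGet)) // read-write execution path (through raft)
}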
22 changes: 13 additions & 9 deletions pkg/kv/kvclient/kvcoord/txn_interceptor_seq_num_allocator.go
@@ -85,25 +85,29 @@ func (s *txnSeqNumAllocator) SendLocked(
 ) (*kvpb.BatchResponse, *kvpb.Error) {
   for _, ru := range ba.Requests {
     req := ru.GetInner()
+    oldHeader := req.Header()
     // Only increment the sequence number generator for requests that
     // will leave intents or requests that will commit the transaction.
     // This enables ba.IsCompleteTransaction to work properly.
+    //
+    // Note: requests that perform writes using write intents and the EndTxn
+    // request cannot operate at a past sequence number. This also applies to
+    // combined read/intent-write requests (e.g. CPuts) -- these always read at
+    // the latest write sequence number as well.
+    //
+    // Requests that do not perform intent writes use the read sequence number.
+    // Notably, this includes Get/Scan/ReverseScan requests that acquire
+    // replicated locks, even though they go through raft.
     if kvpb.IsIntentWrite(req) || req.Method() == kvpb.EndTxn {
       s.writeSeq++
       if err := s.maybeAutoStepReadSeqLocked(ctx); err != nil {
         return nil, kvpb.NewError(err)
       }
-    }
-
-    // Note: only read-only requests can operate at a past seqnum.
-    // Combined read/write requests (e.g. CPut) always read at the
-    // latest write seqnum.
-    oldHeader := req.Header()
-    if kvpb.IsReadOnly(req) {
-      oldHeader.Sequence = s.readSeq
-    } else {
       oldHeader.Sequence = s.writeSeq
+    } else {
+      oldHeader.Sequence = s.readSeq
     }

     req.SetHeader(oldHeader)
   }

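The hunk above turns on two counters: writeSeq advances only for intent writes and EndTxn, while every other request, including a read that acquires a replicated lock, operates at readSeq. Below is a toy model of that assignment, with invented request kinds rather than the kvpb types.

// Toy model of read/write sequence assignment; names are invented for
// illustration and do not match the kvpb package.
package main

import "fmt"

type reqKind int

const (
	plainGet   reqKind = iota // non-locking read
	lockingGet                // read acquiring a replicated lock
	put                       // intent write
	endTxn
)

type seqAllocator struct {
	writeSeq, readSeq int
}

func (s *seqAllocator) assign(k reqKind) int {
	// Mirrors the diff: only intent writes and EndTxn bump writeSeq and
	// operate at it; all reads, locking or not, use readSeq.
	if k == put || k == endTxn {
		s.writeSeq++
		return s.writeSeq
	}
	return s.readSeq
}

func main() {
	var s seqAllocator
	fmt.Println(s.assign(put))        // 1
	fmt.Println(s.assign(lockingGet)) // 0 (read seq, despite going through raft)
	fmt.Println(s.assign(endTxn))     // 2
}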
20 changes: 15 additions & 5 deletions pkg/kv/kvpb/api.go
Expand Up @@ -93,8 +93,8 @@ var flagExclusions = map[flag][]flag{

// IsReadOnly returns true iff the request is read-only. A request is
// read-only if it does not go through raft, meaning that it cannot
// change any replicated state. However, read-only requests may still
// acquire locks with an unreplicated durability level; see IsLocking.
// change any replicated state. However, read-only requests may still acquire
// locks, but only with unreplicated durability.
func IsReadOnly(args Request) bool {
flags := args.flags()
return (flags&isRead) != 0 && (flags&isWrite) == 0
@@ -1382,9 +1382,17 @@ func flagForLockStrength(l lock.Strength) flag {
   return 0
 }

+func flagForLockDurability(d lock.Durability) flag {
+  if d == lock.Replicated {
+    return isWrite
+  }
+  return 0
+}
+
 func (gr *GetRequest) flags() flag {
   maybeLocking := flagForLockStrength(gr.KeyLockingStrength)
-  return isRead | isTxn | maybeLocking | updatesTSCache | needsRefresh | canSkipLocked
+  maybeWrite := flagForLockDurability(gr.KeyLockingDurability)
+  return isRead | maybeWrite | isTxn | maybeLocking | updatesTSCache | needsRefresh | canSkipLocked
 }

 func (*PutRequest) flags() flag {
@@ -1474,12 +1482,14 @@ func (*RevertRangeRequest) flags() flag {

 func (sr *ScanRequest) flags() flag {
   maybeLocking := flagForLockStrength(sr.KeyLockingStrength)
-  return isRead | isRange | isTxn | maybeLocking | updatesTSCache | needsRefresh | canSkipLocked
+  maybeWrite := flagForLockDurability(sr.KeyLockingDurability)
+  return isRead | maybeWrite | isRange | isTxn | maybeLocking | updatesTSCache | needsRefresh | canSkipLocked
 }

 func (rsr *ReverseScanRequest) flags() flag {
   maybeLocking := flagForLockStrength(rsr.KeyLockingStrength)
-  return isRead | isRange | isReverse | isTxn | maybeLocking | updatesTSCache | needsRefresh | canSkipLocked
+  maybeWrite := flagForLockDurability(rsr.KeyLockingDurability)
+  return isRead | maybeWrite | isRange | isReverse | isTxn | maybeLocking | updatesTSCache | needsRefresh | canSkipLocked
 }

 // EndTxn updates the timestamp cache to prevent replays.
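The net effect of flagForLockDurability is that requesting replicated durability on a locking read sets isWrite and therefore flips IsReadOnly to false. Here is a self-contained sketch of the bit arithmetic, with local stand-in constants rather than kvpb's definitions.

// Illustrative reimplementation of the flag arithmetic from api.go; the
// constants and helper are local stand-ins, not kvpb's definitions.
package main

import "fmt"

type flag int

const (
	isRead flag = 1 << iota
	isWrite
	isTxn
)

type durability int

const (
	unreplicated durability = iota
	replicated
)

func flagForLockDurability(d durability) flag {
	if d == replicated {
		return isWrite // forces the request onto the read-write path
	}
	return 0
}

func main() {
	unreplGet := isRead | isTxn | flagForLockDurability(unreplicated)
	replGet := isRead | isTxn | flagForLockDurability(replicated)
	fmt.Println(unreplGet&isWrite == 0) // true: still read-only
	fmt.Println(replGet&isWrite == 0)   // false: goes through raft
}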
6 changes: 5 additions & 1 deletion pkg/kv/kvserver/batcheval/cmd_get.go
@@ -84,7 +84,11 @@ func Get(

   var res result.Result
   if args.KeyLockingStrength != lock.None && h.Txn != nil && getRes.Value != nil {
-    acq := roachpb.MakeLockAcquisition(h.Txn, args.Key, lock.Unreplicated, args.KeyLockingStrength)
+    acq, err := acquireLockOnKey(ctx, readWriter, h.Txn, args.KeyLockingStrength,
+      args.KeyLockingDurability, args.Key)
+    if err != nil {
+      return result.Result{}, err
+    }
     res.Local.AcquiredLocks = []roachpb.LockAcquisition{acq}
   }
   res.Local.EncounteredIntents = intents
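Get only acquires a lock when the read is locking, transactional, and actually found a value; a hypothetical shouldLock helper makes the guard explicit (types simplified, not the batcheval signatures).

// Sketch of Get's locking condition; strength values are local stand-ins.
package main

import "fmt"

type strength int

const (
	none      strength = iota // non-locking
	exclusive                 // locking
)

func shouldLock(str strength, inTxn bool, valueFound bool) bool {
	// Mirrors cmd_get.go's guard: args.KeyLockingStrength != lock.None &&
	// h.Txn != nil && getRes.Value != nil.
	return str != none && inTxn && valueFound
}

func main() {
	fmt.Println(shouldLock(exclusive, true, true))  // true
	fmt.Println(shouldLock(exclusive, true, false)) // false: no value to lock
	fmt.Println(shouldLock(none, true, true))       // false: non-locking read
}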
5 changes: 4 additions & 1 deletion pkg/kv/kvserver/batcheval/cmd_reverse_scan.go
@@ -110,11 +110,14 @@ func ReverseScan(
   }

   if args.KeyLockingStrength != lock.None && h.Txn != nil {
-    err = acquireUnreplicatedLocksOnKeys(&res, h.Txn, args.KeyLockingStrength, args.ScanFormat, &scanRes)
+    acquiredLocks, err := acquireLocksOnKeys(ctx, readWriter, h.Txn, args.KeyLockingStrength,
+      args.KeyLockingDurability, args.ScanFormat, &scanRes)
     if err != nil {
       return result.Result{}, err
     }
+    res.Local.AcquiredLocks = acquiredLocks
   }

   res.Local.EncounteredIntents = scanRes.Intents
   return res, nil
 }
5 changes: 4 additions & 1 deletion pkg/kv/kvserver/batcheval/cmd_scan.go
@@ -110,11 +110,14 @@ func Scan(
   }

   if args.KeyLockingStrength != lock.None && h.Txn != nil {
-    err = acquireUnreplicatedLocksOnKeys(&res, h.Txn, args.KeyLockingStrength, args.ScanFormat, &scanRes)
+    acquiredLocks, err := acquireLocksOnKeys(ctx, readWriter, h.Txn, args.KeyLockingStrength,
+      args.KeyLockingDurability, args.ScanFormat, &scanRes)
     if err != nil {
       return result.Result{}, err
     }
+    res.Local.AcquiredLocks = acquiredLocks
   }

   res.Local.EncounteredIntents = scanRes.Intents
   return res, nil
 }
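
Get, Scan, and ReverseScan now share one calling shape: evaluate, convert each returned key into a LockAcquisition, and propagate any conflict error rather than recording partial results. Below is a condensed sketch of that pattern, with simplified stand-in types for the batcheval ones.

// Condensed caller pattern shared by Get/Scan/ReverseScan evaluation;
// all types here are simplified stand-ins for the batcheval versions.
package main

import (
	"errors"
	"fmt"
)

type lockAcquisition struct{ key string }

// acquireLockOnKey is a stand-in for the helper added in intent.go below:
// it may fail with a conflict error if another txn holds an incompatible lock.
func acquireLockOnKey(key string) (lockAcquisition, error) {
	if key == "contended" {
		return lockAcquisition{}, errors.New("lock conflict on " + key)
	}
	return lockAcquisition{key: key}, nil
}

func acquireLocksOnKeys(keys []string) ([]lockAcquisition, error) {
	acquired := make([]lockAcquisition, 0, len(keys))
	for _, k := range keys {
		acq, err := acquireLockOnKey(k)
		if err != nil {
			// Surface the conflict to the caller instead of partially
			// recording acquisitions in the result.
			return nil, err
		}
		acquired = append(acquired, acq)
	}
	return acquired, nil
}

func main() {
	locks, err := acquireLocksOnKeys([]string{"a", "b"})
	fmt.Println(locks, err) // [{a} {b}] <nil>
	_, err = acquireLocksOnKeys([]string{"a", "contended"})
	fmt.Println(err) // lock conflict on contended
}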
68 changes: 37 additions & 31 deletions pkg/kv/kvserver/batcheval/declare.go
@@ -57,39 +57,45 @@ func DefaultDeclareIsolatedKeys(
   lockSpans *lockspanset.LockSpanSet,
   maxOffset time.Duration,
 ) {
-  access := spanset.SpanReadWrite
-  str := lock.Intent
+  var access spanset.SpanAccess
+  var str lock.Strength
   timestamp := header.Timestamp
-  if kvpb.IsReadOnly(req) {
-    if !kvpb.IsLocking(req) {
-      access = spanset.SpanReadOnly
-      str = lock.None
-      // For non-locking reads, acquire read latches all the way up to the
-      // request's worst-case (i.e. global) uncertainty limit, because reads may
-      // observe writes all the way up to this timestamp.
-      //
-      // It is critical that reads declare latches up through their uncertainty
-      // interval so that they are properly synchronized with earlier writes that
-      // may have a happened-before relationship with the read. These writes could
-      // not have completed and returned to the client until they were durable in
-      // the Range's Raft log. However, they may not have been applied to the
-      // replica's state machine by the time the write was acknowledged, because
-      // Raft entry application occurs asynchronously with respect to the writer
-      // (see AckCommittedEntriesBeforeApplication). Latching is the only
-      // mechanism that ensures that any observers of the write wait for the write
-      // apply before reading.
-      //
-      // NOTE: we pass an empty lease status here, which means that observed
-      // timestamps collected by transactions will not be used. The actual
-      // uncertainty interval used by the request may be smaller (i.e. contain a
-      // local limit), but we can't determine that until after we have declared
-      // keys, acquired latches, and consulted the replica's lease.
-      in := uncertainty.ComputeInterval(header, kvserverpb.LeaseStatus{}, maxOffset)
-      timestamp.Forward(in.GlobalLimit)
-    } else {
-      str, _ = req.(kvpb.LockingReadRequest).KeyLocking()
+
+  if kvpb.IsReadOnly(req) && !kvpb.IsLocking(req) {
+    str = lock.None
+    access = spanset.SpanReadOnly
+    // For non-locking reads, acquire read latches all the way up to the
+    // request's worst-case (i.e. global) uncertainty limit, because reads may
+    // observe writes all the way up to this timestamp.
+    //
+    // It is critical that reads declare latches up through their uncertainty
+    // interval so that they are properly synchronized with earlier writes that
+    // may have a happened-before relationship with the read. These writes could
+    // not have completed and returned to the client until they were durable in
+    // the Range's Raft log. However, they may not have been applied to the
+    // replica's state machine by the time the write was acknowledged, because
+    // Raft entry application occurs asynchronously with respect to the writer
+    // (see AckCommittedEntriesBeforeApplication). Latching is the only
+    // mechanism that ensures that any observers of the write wait for the write
+    // apply before reading.
+    //
+    // NOTE: we pass an empty lease status here, which means that observed
+    // timestamps collected by transactions will not be used. The actual
+    // uncertainty interval used by the request may be smaller (i.e. contain a
+    // local limit), but we can't determine that until after we have declared
+    // keys, acquired latches, and consulted the replica's lease.
+    in := uncertainty.ComputeInterval(header, kvserverpb.LeaseStatus{}, maxOffset)
+    timestamp.Forward(in.GlobalLimit)
+  } else {
+    str = lock.Intent
+    access = spanset.SpanReadWrite
+    // Get the correct lock strength to use for {lock,latch} spans if we're
+    // dealing with locking read requests.
+    if readOnlyReq, ok := req.(kvpb.LockingReadRequest); ok {
+      str, _ = readOnlyReq.KeyLocking()
       switch str {
+      // The lock.None case has already been handled above.
       case lock.None:
         panic(errors.AssertionFailedf("unexpected non-locking read handling"))
       case lock.Shared:
         access = spanset.SpanReadOnly
         // Unlike non-locking reads, shared-locking reads are isolated from
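DefaultDeclareIsolatedKeys thus derives both the latch access and the lock strength from the request: non-locking reads take read latches up to the global uncertainty limit, shared-locking reads keep read latches, and stronger strengths fall through to write latches. The following sketch of that decision table uses local enums in place of the spanset and lock constants.

// Sketch of the latch-access decision in DefaultDeclareIsolatedKeys;
// enums are local stand-ins for the spanset/lock constants.
package main

import "fmt"

type strength int

const (
	noneStr strength = iota
	sharedStr
	exclusiveStr
	intentStr
)

type access int

const (
	readAccess access = iota
	writeAccess
)

// latchAccess distills the branch structure above: a non-locking read takes
// read latches; everything else starts from write access, with shared-lock
// strength relaxed back to read access (isolation then comes from the lock
// table rather than from latching).
func latchAccess(nonLockingRead bool, str strength) access {
	if nonLockingRead {
		return readAccess
	}
	if str == sharedStr {
		return readAccess
	}
	return writeAccess
}

func main() {
	fmt.Println(latchAccess(true, noneStr))    // 0 (read latch)
	fmt.Println(latchAccess(false, sharedStr)) // 0 (read latch)
	fmt.Println(latchAccess(false, intentStr)) // 1 (write latch)
}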
93 changes: 78 additions & 15 deletions pkg/kv/kvserver/batcheval/intent.go
@@ -14,7 +14,6 @@ import (
   "context"

   "github.com/cockroachdb/cockroach/pkg/kv/kvpb"
-  "github.com/cockroachdb/cockroach/pkg/kv/kvserver/batcheval/result"
   "github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency/lock"
   "github.com/cockroachdb/cockroach/pkg/roachpb"
   "github.com/cockroachdb/cockroach/pkg/storage"
@@ -104,44 +103,108 @@ func readProvisionalVal(

 }

-// acquireUnreplicatedLocksOnKeys adds an unreplicated lock acquisition by the
-// transaction to the provided result.Result for each key in the scan result.
-func acquireUnreplicatedLocksOnKeys(
-  res *result.Result,
+// acquireLocksOnKeys acquires locks on each of the keys in the result of a
+// {,Reverse}ScanRequest. The locks are held by the specified transaction with
+// the supplied lock strength and durability. The list of LockAcquisitions is
+// returned to the caller, which the caller must accumulate in its result set.
+//
+// It is possible to run into a lock conflict error when trying to acquire a
+// lock on one of the keys. In such cases, a LockConflictError is returned to
+// the caller.
+func acquireLocksOnKeys(
+  ctx context.Context,
+  readWriter storage.ReadWriter,
   txn *roachpb.Transaction,
   str lock.Strength,
+  dur lock.Durability,
   scanFmt kvpb.ScanFormat,
   scanRes *storage.MVCCScanResult,
-) error {
-  res.Local.AcquiredLocks = make([]roachpb.LockAcquisition, scanRes.NumKeys)
+) ([]roachpb.LockAcquisition, error) {
+  acquiredLocks := make([]roachpb.LockAcquisition, scanRes.NumKeys)
   switch scanFmt {
   case kvpb.BATCH_RESPONSE:
     var i int
-    return storage.MVCCScanDecodeKeyValues(scanRes.KVData, func(key storage.MVCCKey, _ []byte) error {
-      res.Local.AcquiredLocks[i] = roachpb.MakeLockAcquisition(txn, copyKey(key.Key), lock.Unreplicated, str)
+    err := storage.MVCCScanDecodeKeyValues(scanRes.KVData, func(key storage.MVCCKey, _ []byte) error {
+      k := copyKey(key.Key)
+      acq, err := acquireLockOnKey(ctx, readWriter, txn, str, dur, k)
+      if err != nil {
+        return err
+      }
+      acquiredLocks[i] = acq
       i++
       return nil
     })
+    if err != nil {
+      return nil, err
+    }
+    return acquiredLocks, nil
   case kvpb.KEY_VALUES:
     for i, row := range scanRes.KVs {
-      res.Local.AcquiredLocks[i] = roachpb.MakeLockAcquisition(txn, copyKey(row.Key), lock.Unreplicated, str)
+      k := copyKey(row.Key)
+      acq, err := acquireLockOnKey(ctx, readWriter, txn, str, dur, k)
+      if err != nil {
+        return nil, err
+      }
+      acquiredLocks[i] = acq
     }
-    return nil
+    return acquiredLocks, nil
   case kvpb.COL_BATCH_RESPONSE:
-    return errors.AssertionFailedf("unexpectedly acquiring unreplicated locks with COL_BATCH_RESPONSE scan format")
+    return nil, errors.AssertionFailedf("unexpectedly acquiring unreplicated locks with COL_BATCH_RESPONSE scan format")
   default:
     panic("unexpected scanFormat")
   }
 }
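
The two scan formats drive the same per-key work through different iteration shapes: BATCH_RESPONSE visits keys via a decode callback with a hand-tracked index, while KEY_VALUES walks a materialized slice. Here is a minimal sketch of that structure, where decodeBatch stands in for storage.MVCCScanDecodeKeyValues.

// Minimal shape of the two branches in acquireLocksOnKeys; decodeBatch is a
// stand-in for storage.MVCCScanDecodeKeyValues.
package main

import "fmt"

// decodeBatch invokes fn for each key packed into a flat batch.
func decodeBatch(keys []string, fn func(key string) error) error {
	for _, k := range keys {
		if err := fn(k); err != nil {
			return err
		}
	}
	return nil
}

func collect(batchFormat bool, keys []string) ([]string, error) {
	out := make([]string, len(keys))
	if batchFormat {
		// BATCH_RESPONSE: keys arrive through a callback while decoding.
		var i int
		if err := decodeBatch(keys, func(k string) error {
			out[i] = k // mirrors acquiredLocks[i] = acq in the diff
			i++
			return nil
		}); err != nil {
			return nil, err
		}
		return out, nil
	}
	// KEY_VALUES: keys are already materialized as a slice.
	for i, k := range keys {
		out[i] = k
	}
	return out, nil
}

func main() {
	a, _ := collect(true, []string{"a", "b"})
	b, _ := collect(false, []string{"a", "b"})
	fmt.Println(a, b) // [a b] [a b]
}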

+// acquireLockOnKey acquires a lock on the specified key. The lock is acquired
+// by the specified transaction with the supplied lock strength and durability.
+// The resultant lock acquisition struct is returned, which the caller must
+// accumulate in its result set.
+//
+// It is possible for lock acquisition to run into a lock conflict error, in
+// which case a LockConflictError is returned to the caller.
+func acquireLockOnKey(
+  ctx context.Context,
+  readWriter storage.ReadWriter,
+  txn *roachpb.Transaction,
+  str lock.Strength,
+  dur lock.Durability,
+  key roachpb.Key,
+) (roachpb.LockAcquisition, error) {
+  // TODO(arul,nvanbenschoten): For now, we're only checking whether we have
+  // access to a legit pebble.Writer for replicated lock acquisition. We're not
+  // actually acquiring a replicated lock -- we can only do so once they're
+  // fully supported in the storage package. Until then, we grab an unreplicated
+  // lock regardless of what the caller asked us to do.
+  if dur == lock.Replicated {
+    // ShouldWriteLocalTimestamps is only implemented by a pebble.Writer; it'll
+    // panic if we were on the read-only evaluation path, and only had access to
+    // a pebble.ReadOnly.
+    readWriter.ShouldWriteLocalTimestamps(ctx)
+    // Regardless of what the caller asked for, we'll give it an unreplicated
+    // lock.
+    dur = lock.Unreplicated
+  }
+  switch dur {
+  case lock.Unreplicated:
+    // TODO(arul,nvanbenschoten): Call into MVCCCheckForAcquireLock here.
+  case lock.Replicated:
+    // TODO(arul,nvanbenschoten): Call into MVCCAcquireLock here.
+  default:
+    panic("unexpected lock durability")
+  }
+  acq := roachpb.MakeLockAcquisition(txn, key, dur, str)
+  return acq, nil
+}
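
Note the interim behavior above: a replicated-durability request is only used to assert that a real pebble.Writer is available, then downgraded to unreplicated until replicated locks land in the storage layer. Below is a sketch of that downgrade under simplified stand-in types, not the batcheval/storage APIs.

// Sketch of the interim durability downgrade in acquireLockOnKey; all names
// here are illustrative stand-ins.
package main

import "fmt"

type durability int

const (
	unreplicated durability = iota
	replicated
)

func (d durability) String() string {
	if d == replicated {
		return "replicated"
	}
	return "unreplicated"
}

type lockAcquisition struct {
	key string
	dur durability
}

func acquireLockOnKey(key string, dur durability) lockAcquisition {
	if dur == replicated {
		// Placeholder for the real writer check (the diff calls
		// ShouldWriteLocalTimestamps for this); then the requested
		// durability is downgraded for now.
		dur = unreplicated
	}
	return lockAcquisition{key: key, dur: dur}
}

func main() {
	acq := acquireLockOnKey("k", replicated)
	fmt.Println(acq.key, acq.dur) // k unreplicated
}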

 // copyKey copies the provided roachpb.Key into a new byte slice, returning the
-// copy. It is used in acquireUnreplicatedLocksOnKeys for two reasons:
+// copy. It is used in acquireLocksOnKeys for two reasons:
 //  1. the keys in an MVCCScanResult, regardless of the scan format used, point
 //     to a small number of large, contiguous byte slices. These "MVCCScan
 //     batches" contain keys and their associated values in the same backing
 //     array. To avoid holding these entire backing arrays in memory and
-//     preventing them from being garbage collected indefinitely, we copy the key
-//     slices before coupling their lifetimes to those of unreplicated locks.
+//     preventing them from being garbage collected indefinitely, we copy the
+//     key slices before coupling their lifetimes to those of the constructed
+//     lock acquisitions.
 //  2. the KV API has a contract that byte slices returned from KV will not be
 //     mutated by higher levels. However, we have seen cases (e.g. #64228) where
 //     this contract is broken due to bugs. To defensively guard against this
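The aliasing hazard that copyKey guards against is easy to demonstrate: a small sub-slice of a scan batch keeps the entire backing array reachable. A runnable illustration:

// Demonstrates the aliasing hazard copyKey guards against: a sub-slice of a
// large scan batch pins the whole backing array until the lock acquisition
// referencing it is released.
package main

import "fmt"

func copyKey(k []byte) []byte {
	c := make([]byte, len(k))
	copy(c, k)
	return c
}

func main() {
	batch := make([]byte, 1<<20) // one large backing array from a scan
	copy(batch, "key1")
	aliased := batch[:4]      // retains all 1 MiB via the shared backing array
	owned := copyKey(aliased) // 4-byte allocation; batch can be GC'd
	fmt.Println(len(aliased), len(owned), cap(aliased), cap(owned)) // 4 4 1048576 4
}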
