diff --git a/pkg/kv/kvclient/kvcoord/txn_interceptor_seq_num_allocator.go b/pkg/kv/kvclient/kvcoord/txn_interceptor_seq_num_allocator.go index b79354a62c4b..1583f4ac6071 100644 --- a/pkg/kv/kvclient/kvcoord/txn_interceptor_seq_num_allocator.go +++ b/pkg/kv/kvclient/kvcoord/txn_interceptor_seq_num_allocator.go @@ -85,25 +85,29 @@ func (s *txnSeqNumAllocator) SendLocked( ) (*kvpb.BatchResponse, *kvpb.Error) { for _, ru := range ba.Requests { req := ru.GetInner() + oldHeader := req.Header() // Only increment the sequence number generator for requests that // will leave intents or requests that will commit the transaction. // This enables ba.IsCompleteTransaction to work properly. + // + // Note: requests that perform writes using write intents and the EndTxn + // request cannot operate at a past sequence number. This also applies to + // combined read/intent-write requests (e.g. CPuts) -- these always read at + // the latest write sequence number as well. + // + // Requests that do not perform intent writes use the read sequence number. + // Notably, this includes Get/Scan/ReverseScan requests that acquire + // replicated locks, even though they go through raft. if kvpb.IsIntentWrite(req) || req.Method() == kvpb.EndTxn { s.writeSeq++ if err := s.maybeAutoStepReadSeqLocked(ctx); err != nil { return nil, kvpb.NewError(err) } - } - - // Note: only read-only requests can operate at a past seqnum. - // Combined read/write requests (e.g. CPut) always read at the - // latest write seqnum. - oldHeader := req.Header() - if kvpb.IsReadOnly(req) { - oldHeader.Sequence = s.readSeq - } else { oldHeader.Sequence = s.writeSeq + } else { + oldHeader.Sequence = s.readSeq } + req.SetHeader(oldHeader) } diff --git a/pkg/kv/kvpb/api.go b/pkg/kv/kvpb/api.go index 1bf56ed42d2a..9c975999a36b 100644 --- a/pkg/kv/kvpb/api.go +++ b/pkg/kv/kvpb/api.go @@ -93,8 +93,8 @@ var flagExclusions = map[flag][]flag{ // IsReadOnly returns true iff the request is read-only. A request is // read-only if it does not go through raft, meaning that it cannot -// change any replicated state. However, read-only requests may still -// acquire locks with an unreplicated durability level; see IsLocking. +// change any replicated state. However, read-only requests may still acquire +// locks, but only with unreplicated durability. func IsReadOnly(args Request) bool { flags := args.flags() return (flags&isRead) != 0 && (flags&isWrite) == 0 @@ -1382,9 +1382,17 @@ func flagForLockStrength(l lock.Strength) flag { return 0 } +func flagForLockDurability(d lock.Durability) flag { + if d == lock.Replicated { + return isWrite + } + return 0 +} + func (gr *GetRequest) flags() flag { maybeLocking := flagForLockStrength(gr.KeyLockingStrength) - return isRead | isTxn | maybeLocking | updatesTSCache | needsRefresh | canSkipLocked + maybeWrite := flagForLockDurability(gr.KeyLockingDurability) + return isRead | maybeWrite | isTxn | maybeLocking | updatesTSCache | needsRefresh | canSkipLocked } func (*PutRequest) flags() flag { @@ -1474,12 +1482,14 @@ func (*RevertRangeRequest) flags() flag { func (sr *ScanRequest) flags() flag { maybeLocking := flagForLockStrength(sr.KeyLockingStrength) - return isRead | isRange | isTxn | maybeLocking | updatesTSCache | needsRefresh | canSkipLocked + maybeWrite := flagForLockDurability(sr.KeyLockingDurability) + return isRead | maybeWrite | isRange | isTxn | maybeLocking | updatesTSCache | needsRefresh | canSkipLocked } func (rsr *ReverseScanRequest) flags() flag { maybeLocking := flagForLockStrength(rsr.KeyLockingStrength) - return isRead | isRange | isReverse | isTxn | maybeLocking | updatesTSCache | needsRefresh | canSkipLocked + maybeWrite := flagForLockDurability(rsr.KeyLockingDurability) + return isRead | maybeWrite | isRange | isReverse | isTxn | maybeLocking | updatesTSCache | needsRefresh | canSkipLocked } // EndTxn updates the timestamp cache to prevent replays. diff --git a/pkg/kv/kvserver/batcheval/cmd_get.go b/pkg/kv/kvserver/batcheval/cmd_get.go index fcb572c4b2e2..92d675520e8a 100644 --- a/pkg/kv/kvserver/batcheval/cmd_get.go +++ b/pkg/kv/kvserver/batcheval/cmd_get.go @@ -84,7 +84,11 @@ func Get( var res result.Result if args.KeyLockingStrength != lock.None && h.Txn != nil && getRes.Value != nil { - acq := roachpb.MakeLockAcquisition(h.Txn, args.Key, lock.Unreplicated, args.KeyLockingStrength) + acq, err := acquireLockOnKey(ctx, readWriter, h.Txn, args.KeyLockingStrength, + args.KeyLockingDurability, args.Key) + if err != nil { + return result.Result{}, err + } res.Local.AcquiredLocks = []roachpb.LockAcquisition{acq} } res.Local.EncounteredIntents = intents diff --git a/pkg/kv/kvserver/batcheval/cmd_reverse_scan.go b/pkg/kv/kvserver/batcheval/cmd_reverse_scan.go index 6deb362cf52c..2ecfb75be871 100644 --- a/pkg/kv/kvserver/batcheval/cmd_reverse_scan.go +++ b/pkg/kv/kvserver/batcheval/cmd_reverse_scan.go @@ -110,11 +110,14 @@ func ReverseScan( } if args.KeyLockingStrength != lock.None && h.Txn != nil { - err = acquireUnreplicatedLocksOnKeys(&res, h.Txn, args.KeyLockingStrength, args.ScanFormat, &scanRes) + acquiredLocks, err := acquireLocksOnKeys(ctx, readWriter, h.Txn, args.KeyLockingStrength, + args.KeyLockingDurability, args.ScanFormat, &scanRes) if err != nil { return result.Result{}, err } + res.Local.AcquiredLocks = acquiredLocks } + res.Local.EncounteredIntents = scanRes.Intents return res, nil } diff --git a/pkg/kv/kvserver/batcheval/cmd_scan.go b/pkg/kv/kvserver/batcheval/cmd_scan.go index 11902ec2129b..6de565175772 100644 --- a/pkg/kv/kvserver/batcheval/cmd_scan.go +++ b/pkg/kv/kvserver/batcheval/cmd_scan.go @@ -110,11 +110,14 @@ func Scan( } if args.KeyLockingStrength != lock.None && h.Txn != nil { - err = acquireUnreplicatedLocksOnKeys(&res, h.Txn, args.KeyLockingStrength, args.ScanFormat, &scanRes) + acquiredLocks, err := acquireLocksOnKeys(ctx, readWriter, h.Txn, args.KeyLockingStrength, + args.KeyLockingDurability, args.ScanFormat, &scanRes) if err != nil { return result.Result{}, err } + res.Local.AcquiredLocks = acquiredLocks } + res.Local.EncounteredIntents = scanRes.Intents return res, nil } diff --git a/pkg/kv/kvserver/batcheval/declare.go b/pkg/kv/kvserver/batcheval/declare.go index 879e8e53c2dc..74951f4d2016 100644 --- a/pkg/kv/kvserver/batcheval/declare.go +++ b/pkg/kv/kvserver/batcheval/declare.go @@ -57,39 +57,45 @@ func DefaultDeclareIsolatedKeys( lockSpans *lockspanset.LockSpanSet, maxOffset time.Duration, ) { - access := spanset.SpanReadWrite - str := lock.Intent + var access spanset.SpanAccess + var str lock.Strength timestamp := header.Timestamp - if kvpb.IsReadOnly(req) { - if !kvpb.IsLocking(req) { - access = spanset.SpanReadOnly - str = lock.None - // For non-locking reads, acquire read latches all the way up to the - // request's worst-case (i.e. global) uncertainty limit, because reads may - // observe writes all the way up to this timestamp. - // - // It is critical that reads declare latches up through their uncertainty - // interval so that they are properly synchronized with earlier writes that - // may have a happened-before relationship with the read. These writes could - // not have completed and returned to the client until they were durable in - // the Range's Raft log. However, they may not have been applied to the - // replica's state machine by the time the write was acknowledged, because - // Raft entry application occurs asynchronously with respect to the writer - // (see AckCommittedEntriesBeforeApplication). Latching is the only - // mechanism that ensures that any observers of the write wait for the write - // apply before reading. - // - // NOTE: we pass an empty lease status here, which means that observed - // timestamps collected by transactions will not be used. The actual - // uncertainty interval used by the request may be smaller (i.e. contain a - // local limit), but we can't determine that until after we have declared - // keys, acquired latches, and consulted the replica's lease. - in := uncertainty.ComputeInterval(header, kvserverpb.LeaseStatus{}, maxOffset) - timestamp.Forward(in.GlobalLimit) - } else { - str, _ = req.(kvpb.LockingReadRequest).KeyLocking() + + if kvpb.IsReadOnly(req) && !kvpb.IsLocking(req) { + str = lock.None + access = spanset.SpanReadOnly + // For non-locking reads, acquire read latches all the way up to the + // request's worst-case (i.e. global) uncertainty limit, because reads may + // observe writes all the way up to this timestamp. + // + // It is critical that reads declare latches up through their uncertainty + // interval so that they are properly synchronized with earlier writes that + // may have a happened-before relationship with the read. These writes could + // not have completed and returned to the client until they were durable in + // the Range's Raft log. However, they may not have been applied to the + // replica's state machine by the time the write was acknowledged, because + // Raft entry application occurs asynchronously with respect to the writer + // (see AckCommittedEntriesBeforeApplication). Latching is the only + // mechanism that ensures that any observers of the write wait for the write + // apply before reading. + // + // NOTE: we pass an empty lease status here, which means that observed + // timestamps collected by transactions will not be used. The actual + // uncertainty interval used by the request may be smaller (i.e. contain a + // local limit), but we can't determine that until after we have declared + // keys, acquired latches, and consulted the replica's lease. + in := uncertainty.ComputeInterval(header, kvserverpb.LeaseStatus{}, maxOffset) + timestamp.Forward(in.GlobalLimit) + } else { + str = lock.Intent + access = spanset.SpanReadWrite + // Get the correct lock strength to use for {lock,latch} spans if we're + // dealing with locking read requests. + if readOnlyReq, ok := req.(kvpb.LockingReadRequest); ok { + str, _ = readOnlyReq.KeyLocking() switch str { - // The lock.None case has already been handled above. + case lock.None: + panic(errors.AssertionFailedf("unexpected non-locking read handling")) case lock.Shared: access = spanset.SpanReadOnly // Unlike non-locking reads, shared-locking reads are isolated from diff --git a/pkg/kv/kvserver/batcheval/intent.go b/pkg/kv/kvserver/batcheval/intent.go index 4f1a562e4fe2..1b2d31a66768 100644 --- a/pkg/kv/kvserver/batcheval/intent.go +++ b/pkg/kv/kvserver/batcheval/intent.go @@ -14,7 +14,6 @@ import ( "context" "github.com/cockroachdb/cockroach/pkg/kv/kvpb" - "github.com/cockroachdb/cockroach/pkg/kv/kvserver/batcheval/result" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency/lock" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/storage" @@ -104,44 +103,108 @@ func readProvisionalVal( } -// acquireUnreplicatedLocksOnKeys adds an unreplicated lock acquisition by the -// transaction to the provided result.Result for each key in the scan result. -func acquireUnreplicatedLocksOnKeys( - res *result.Result, +// acquireLocksOnKeys acquires locks on each of the keys in the result of a +// {,Reverse}ScanRequest. The locks are held by the specified transaction with +// the supplied locks strength and durability. The list of LockAcquisitions is +// returned to the caller, which the caller must accumulate in its result set. +// +// It is possible to run into a lock conflict error when trying to acquire a +// lock on one of the keys. In such cases, a LockConflictError is returned to +// the caller. +func acquireLocksOnKeys( + ctx context.Context, + readWriter storage.ReadWriter, txn *roachpb.Transaction, str lock.Strength, + dur lock.Durability, scanFmt kvpb.ScanFormat, scanRes *storage.MVCCScanResult, -) error { - res.Local.AcquiredLocks = make([]roachpb.LockAcquisition, scanRes.NumKeys) +) ([]roachpb.LockAcquisition, error) { + acquiredLocks := make([]roachpb.LockAcquisition, scanRes.NumKeys) switch scanFmt { case kvpb.BATCH_RESPONSE: var i int - return storage.MVCCScanDecodeKeyValues(scanRes.KVData, func(key storage.MVCCKey, _ []byte) error { - res.Local.AcquiredLocks[i] = roachpb.MakeLockAcquisition(txn, copyKey(key.Key), lock.Unreplicated, str) + err := storage.MVCCScanDecodeKeyValues(scanRes.KVData, func(key storage.MVCCKey, _ []byte) error { + k := copyKey(key.Key) + acq, err := acquireLockOnKey(ctx, readWriter, txn, str, dur, k) + if err != nil { + return err + } + acquiredLocks[i] = acq i++ return nil }) + if err != nil { + return nil, err + } + return acquiredLocks, nil case kvpb.KEY_VALUES: for i, row := range scanRes.KVs { - res.Local.AcquiredLocks[i] = roachpb.MakeLockAcquisition(txn, copyKey(row.Key), lock.Unreplicated, str) + k := copyKey(row.Key) + acq, err := acquireLockOnKey(ctx, readWriter, txn, str, dur, k) + if err != nil { + return nil, err + } + acquiredLocks[i] = acq } - return nil + return acquiredLocks, nil case kvpb.COL_BATCH_RESPONSE: - return errors.AssertionFailedf("unexpectedly acquiring unreplicated locks with COL_BATCH_RESPONSE scan format") + return nil, errors.AssertionFailedf("unexpectedly acquiring unreplicated locks with COL_BATCH_RESPONSE scan format") default: panic("unexpected scanFormat") } } +// acquireLockOnKey acquires a lock on the specified key. The lock is acquired +// by the specified transaction with the supplied lock strength and durability. +// The resultant lock acquisition struct is returned, which the caller must +// accumulate in its result set. +// +// It is possible for lock acquisition to run into a lock conflict error, in +// which case a LockConflictError is returned to the caller. +func acquireLockOnKey( + ctx context.Context, + readWriter storage.ReadWriter, + txn *roachpb.Transaction, + str lock.Strength, + dur lock.Durability, + key roachpb.Key, +) (roachpb.LockAcquisition, error) { + // TODO(arul,nvanbenschoten): For now, we're only checking whether we have + // access to a legit pebble.Writer for replicated lock acquisition. We're not + // actually acquiring a replicated lock -- we can only do so once they're + // fully supported in the storage package. Until then, we grab an unreplicated + // lock regardless of what the caller asked us to do. + if dur == lock.Replicated { + // ShouldWriteLocalTimestamp is only implemented by a pebble.Writer; it'll + // panic if we were on the read-only evaluation path, and only had access to + // a pebble.ReadOnly. + readWriter.ShouldWriteLocalTimestamps(ctx) + // Regardless of what the caller asked for, we'll give it an unreplicated + // lock. + dur = lock.Unreplicated + } + switch dur { + case lock.Unreplicated: + // TODO(arul,nvanbenschoten): Call into MVCCCheckForAcquireLockHere. + case lock.Replicated: + // TODO(arul,nvanbenschoten): Call into MVCCAcquireLock here. + default: + panic("unexpected lock durability") + } + acq := roachpb.MakeLockAcquisition(txn, key, dur, str) + return acq, nil +} + // copyKey copies the provided roachpb.Key into a new byte slice, returning the -// copy. It is used in acquireUnreplicatedLocksOnKeys for two reasons: +// copy. It is used in acquireLocksOnKeys for two reasons: // 1. the keys in an MVCCScanResult, regardless of the scan format used, point // to a small number of large, contiguous byte slices. These "MVCCScan // batches" contain keys and their associated values in the same backing // array. To avoid holding these entire backing arrays in memory and -// preventing them from being garbage collected indefinitely, we copy the key -// slices before coupling their lifetimes to those of unreplicated locks. +// preventing them from being garbage collected indefinitely, we copy the +// key slices before coupling their lifetimes to those of the constructed +// lock acquisitions. // 2. the KV API has a contract that byte slices returned from KV will not be // mutated by higher levels. However, we have seen cases (e.g.#64228) where // this contract is broken due to bugs. To defensively guard against this