From 7f78c5f1b14037fb4f8350d1c70a0fcf7b517628 Mon Sep 17 00:00:00 2001 From: Arul Ajmani Date: Fri, 8 Sep 2023 14:22:17 -0400 Subject: [PATCH 1/3] kvserver: requests that acquire repl locks should use read-write path Previously, {Get,Scan,ReverseScan}Requests all used the read-only execution path. Things aren't so simple anymore, now that we want these requests to be able to acquire replicated locks, which means they need to go through raft (and therefore the read-write execution path). This patch achieves exactly that. Informs #100193 Release note: None --- .../txn_interceptor_seq_num_allocator.go | 22 +++-- pkg/kv/kvpb/api.go | 20 +++- pkg/kv/kvserver/batcheval/cmd_get.go | 6 +- pkg/kv/kvserver/batcheval/cmd_reverse_scan.go | 5 +- pkg/kv/kvserver/batcheval/cmd_scan.go | 5 +- pkg/kv/kvserver/batcheval/declare.go | 68 +++++++------- pkg/kv/kvserver/batcheval/intent.go | 93 ++++++++++++++++--- 7 files changed, 156 insertions(+), 63 deletions(-) diff --git a/pkg/kv/kvclient/kvcoord/txn_interceptor_seq_num_allocator.go b/pkg/kv/kvclient/kvcoord/txn_interceptor_seq_num_allocator.go index b79354a62c4b..1583f4ac6071 100644 --- a/pkg/kv/kvclient/kvcoord/txn_interceptor_seq_num_allocator.go +++ b/pkg/kv/kvclient/kvcoord/txn_interceptor_seq_num_allocator.go @@ -85,25 +85,29 @@ func (s *txnSeqNumAllocator) SendLocked( ) (*kvpb.BatchResponse, *kvpb.Error) { for _, ru := range ba.Requests { req := ru.GetInner() + oldHeader := req.Header() // Only increment the sequence number generator for requests that // will leave intents or requests that will commit the transaction. // This enables ba.IsCompleteTransaction to work properly. + // + // Note: requests that perform writes using write intents and the EndTxn + // request cannot operate at a past sequence number. This also applies to + // combined read/intent-write requests (e.g. CPuts) -- these always read at + // the latest write sequence number as well. + // + // Requests that do not perform intent writes use the read sequence number. + // Notably, this includes Get/Scan/ReverseScan requests that acquire + // replicated locks, even though they go through raft. if kvpb.IsIntentWrite(req) || req.Method() == kvpb.EndTxn { s.writeSeq++ if err := s.maybeAutoStepReadSeqLocked(ctx); err != nil { return nil, kvpb.NewError(err) } - } - - // Note: only read-only requests can operate at a past seqnum. - // Combined read/write requests (e.g. CPut) always read at the - // latest write seqnum. - oldHeader := req.Header() - if kvpb.IsReadOnly(req) { - oldHeader.Sequence = s.readSeq - } else { oldHeader.Sequence = s.writeSeq + } else { + oldHeader.Sequence = s.readSeq } + req.SetHeader(oldHeader) } diff --git a/pkg/kv/kvpb/api.go b/pkg/kv/kvpb/api.go index 1bf56ed42d2a..9c975999a36b 100644 --- a/pkg/kv/kvpb/api.go +++ b/pkg/kv/kvpb/api.go @@ -93,8 +93,8 @@ var flagExclusions = map[flag][]flag{ // IsReadOnly returns true iff the request is read-only. A request is // read-only if it does not go through raft, meaning that it cannot -// change any replicated state. However, read-only requests may still -// acquire locks with an unreplicated durability level; see IsLocking. +// change any replicated state. However, read-only requests may still acquire +// locks, but only with unreplicated durability. 
func IsReadOnly(args Request) bool { flags := args.flags() return (flags&isRead) != 0 && (flags&isWrite) == 0 @@ -1382,9 +1382,17 @@ func flagForLockStrength(l lock.Strength) flag { return 0 } +func flagForLockDurability(d lock.Durability) flag { + if d == lock.Replicated { + return isWrite + } + return 0 +} + func (gr *GetRequest) flags() flag { maybeLocking := flagForLockStrength(gr.KeyLockingStrength) - return isRead | isTxn | maybeLocking | updatesTSCache | needsRefresh | canSkipLocked + maybeWrite := flagForLockDurability(gr.KeyLockingDurability) + return isRead | maybeWrite | isTxn | maybeLocking | updatesTSCache | needsRefresh | canSkipLocked } func (*PutRequest) flags() flag { @@ -1474,12 +1482,14 @@ func (*RevertRangeRequest) flags() flag { func (sr *ScanRequest) flags() flag { maybeLocking := flagForLockStrength(sr.KeyLockingStrength) - return isRead | isRange | isTxn | maybeLocking | updatesTSCache | needsRefresh | canSkipLocked + maybeWrite := flagForLockDurability(sr.KeyLockingDurability) + return isRead | maybeWrite | isRange | isTxn | maybeLocking | updatesTSCache | needsRefresh | canSkipLocked } func (rsr *ReverseScanRequest) flags() flag { maybeLocking := flagForLockStrength(rsr.KeyLockingStrength) - return isRead | isRange | isReverse | isTxn | maybeLocking | updatesTSCache | needsRefresh | canSkipLocked + maybeWrite := flagForLockDurability(rsr.KeyLockingDurability) + return isRead | maybeWrite | isRange | isReverse | isTxn | maybeLocking | updatesTSCache | needsRefresh | canSkipLocked } // EndTxn updates the timestamp cache to prevent replays. diff --git a/pkg/kv/kvserver/batcheval/cmd_get.go b/pkg/kv/kvserver/batcheval/cmd_get.go index fcb572c4b2e2..92d675520e8a 100644 --- a/pkg/kv/kvserver/batcheval/cmd_get.go +++ b/pkg/kv/kvserver/batcheval/cmd_get.go @@ -84,7 +84,11 @@ func Get( var res result.Result if args.KeyLockingStrength != lock.None && h.Txn != nil && getRes.Value != nil { - acq := roachpb.MakeLockAcquisition(h.Txn, args.Key, lock.Unreplicated, args.KeyLockingStrength) + acq, err := acquireLockOnKey(ctx, readWriter, h.Txn, args.KeyLockingStrength, + args.KeyLockingDurability, args.Key) + if err != nil { + return result.Result{}, err + } res.Local.AcquiredLocks = []roachpb.LockAcquisition{acq} } res.Local.EncounteredIntents = intents diff --git a/pkg/kv/kvserver/batcheval/cmd_reverse_scan.go b/pkg/kv/kvserver/batcheval/cmd_reverse_scan.go index 6deb362cf52c..2ecfb75be871 100644 --- a/pkg/kv/kvserver/batcheval/cmd_reverse_scan.go +++ b/pkg/kv/kvserver/batcheval/cmd_reverse_scan.go @@ -110,11 +110,14 @@ func ReverseScan( } if args.KeyLockingStrength != lock.None && h.Txn != nil { - err = acquireUnreplicatedLocksOnKeys(&res, h.Txn, args.KeyLockingStrength, args.ScanFormat, &scanRes) + acquiredLocks, err := acquireLocksOnKeys(ctx, readWriter, h.Txn, args.KeyLockingStrength, + args.KeyLockingDurability, args.ScanFormat, &scanRes) if err != nil { return result.Result{}, err } + res.Local.AcquiredLocks = acquiredLocks } + res.Local.EncounteredIntents = scanRes.Intents return res, nil } diff --git a/pkg/kv/kvserver/batcheval/cmd_scan.go b/pkg/kv/kvserver/batcheval/cmd_scan.go index 11902ec2129b..6de565175772 100644 --- a/pkg/kv/kvserver/batcheval/cmd_scan.go +++ b/pkg/kv/kvserver/batcheval/cmd_scan.go @@ -110,11 +110,14 @@ func Scan( } if args.KeyLockingStrength != lock.None && h.Txn != nil { - err = acquireUnreplicatedLocksOnKeys(&res, h.Txn, args.KeyLockingStrength, args.ScanFormat, &scanRes) + acquiredLocks, err := acquireLocksOnKeys(ctx, readWriter, h.Txn, 
args.KeyLockingStrength, + args.KeyLockingDurability, args.ScanFormat, &scanRes) if err != nil { return result.Result{}, err } + res.Local.AcquiredLocks = acquiredLocks } + res.Local.EncounteredIntents = scanRes.Intents return res, nil } diff --git a/pkg/kv/kvserver/batcheval/declare.go b/pkg/kv/kvserver/batcheval/declare.go index 879e8e53c2dc..74951f4d2016 100644 --- a/pkg/kv/kvserver/batcheval/declare.go +++ b/pkg/kv/kvserver/batcheval/declare.go @@ -57,39 +57,45 @@ func DefaultDeclareIsolatedKeys( lockSpans *lockspanset.LockSpanSet, maxOffset time.Duration, ) { - access := spanset.SpanReadWrite - str := lock.Intent + var access spanset.SpanAccess + var str lock.Strength timestamp := header.Timestamp - if kvpb.IsReadOnly(req) { - if !kvpb.IsLocking(req) { - access = spanset.SpanReadOnly - str = lock.None - // For non-locking reads, acquire read latches all the way up to the - // request's worst-case (i.e. global) uncertainty limit, because reads may - // observe writes all the way up to this timestamp. - // - // It is critical that reads declare latches up through their uncertainty - // interval so that they are properly synchronized with earlier writes that - // may have a happened-before relationship with the read. These writes could - // not have completed and returned to the client until they were durable in - // the Range's Raft log. However, they may not have been applied to the - // replica's state machine by the time the write was acknowledged, because - // Raft entry application occurs asynchronously with respect to the writer - // (see AckCommittedEntriesBeforeApplication). Latching is the only - // mechanism that ensures that any observers of the write wait for the write - // apply before reading. - // - // NOTE: we pass an empty lease status here, which means that observed - // timestamps collected by transactions will not be used. The actual - // uncertainty interval used by the request may be smaller (i.e. contain a - // local limit), but we can't determine that until after we have declared - // keys, acquired latches, and consulted the replica's lease. - in := uncertainty.ComputeInterval(header, kvserverpb.LeaseStatus{}, maxOffset) - timestamp.Forward(in.GlobalLimit) - } else { - str, _ = req.(kvpb.LockingReadRequest).KeyLocking() + + if kvpb.IsReadOnly(req) && !kvpb.IsLocking(req) { + str = lock.None + access = spanset.SpanReadOnly + // For non-locking reads, acquire read latches all the way up to the + // request's worst-case (i.e. global) uncertainty limit, because reads may + // observe writes all the way up to this timestamp. + // + // It is critical that reads declare latches up through their uncertainty + // interval so that they are properly synchronized with earlier writes that + // may have a happened-before relationship with the read. These writes could + // not have completed and returned to the client until they were durable in + // the Range's Raft log. However, they may not have been applied to the + // replica's state machine by the time the write was acknowledged, because + // Raft entry application occurs asynchronously with respect to the writer + // (see AckCommittedEntriesBeforeApplication). Latching is the only + // mechanism that ensures that any observers of the write wait for the write + // apply before reading. + // + // NOTE: we pass an empty lease status here, which means that observed + // timestamps collected by transactions will not be used. The actual + // uncertainty interval used by the request may be smaller (i.e. 
contain a
+		// local limit), but we can't determine that until after we have declared
+		// keys, acquired latches, and consulted the replica's lease.
+		in := uncertainty.ComputeInterval(header, kvserverpb.LeaseStatus{}, maxOffset)
+		timestamp.Forward(in.GlobalLimit)
+	} else {
+		str = lock.Intent
+		access = spanset.SpanReadWrite
+		// Get the correct lock strength to use for {lock,latch} spans if we're
+		// dealing with locking read requests.
+		if readOnlyReq, ok := req.(kvpb.LockingReadRequest); ok {
+			str, _ = readOnlyReq.KeyLocking()
 			switch str {
-			// The lock.None case has already been handled above.
+			case lock.None:
+				panic(errors.AssertionFailedf("unexpected non-locking read handling"))
 			case lock.Shared:
 				access = spanset.SpanReadOnly
 				// Unlike non-locking reads, shared-locking reads are isolated from
diff --git a/pkg/kv/kvserver/batcheval/intent.go b/pkg/kv/kvserver/batcheval/intent.go
index 4f1a562e4fe2..1b2d31a66768 100644
--- a/pkg/kv/kvserver/batcheval/intent.go
+++ b/pkg/kv/kvserver/batcheval/intent.go
@@ -14,7 +14,6 @@ import (
 	"context"
 
 	"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
-	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/batcheval/result"
 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency/lock"
 	"github.com/cockroachdb/cockroach/pkg/roachpb"
 	"github.com/cockroachdb/cockroach/pkg/storage"
@@ -104,44 +103,108 @@ func readProvisionalVal(
 }
 
-// acquireUnreplicatedLocksOnKeys adds an unreplicated lock acquisition by the
-// transaction to the provided result.Result for each key in the scan result.
-func acquireUnreplicatedLocksOnKeys(
-	res *result.Result,
+// acquireLocksOnKeys acquires locks on each of the keys in the result of a
+// {,Reverse}ScanRequest. The locks are held by the specified transaction with
+// the supplied lock strength and durability. The list of LockAcquisitions is
+// returned to the caller, which the caller must accumulate in its result set.
+//
+// It is possible to run into a lock conflict error when trying to acquire a
+// lock on one of the keys. In such cases, a LockConflictError is returned to
+// the caller.
+func acquireLocksOnKeys(
+	ctx context.Context,
+	readWriter storage.ReadWriter,
 	txn *roachpb.Transaction,
 	str lock.Strength,
+	dur lock.Durability,
 	scanFmt kvpb.ScanFormat,
 	scanRes *storage.MVCCScanResult,
-) error {
-	res.Local.AcquiredLocks = make([]roachpb.LockAcquisition, scanRes.NumKeys)
+) ([]roachpb.LockAcquisition, error) {
+	acquiredLocks := make([]roachpb.LockAcquisition, scanRes.NumKeys)
 	switch scanFmt {
 	case kvpb.BATCH_RESPONSE:
 		var i int
-		return storage.MVCCScanDecodeKeyValues(scanRes.KVData, func(key storage.MVCCKey, _ []byte) error {
-			res.Local.AcquiredLocks[i] = roachpb.MakeLockAcquisition(txn, copyKey(key.Key), lock.Unreplicated, str)
+		err := storage.MVCCScanDecodeKeyValues(scanRes.KVData, func(key storage.MVCCKey, _ []byte) error {
+			k := copyKey(key.Key)
+			acq, err := acquireLockOnKey(ctx, readWriter, txn, str, dur, k)
+			if err != nil {
+				return err
+			}
+			acquiredLocks[i] = acq
 			i++
 			return nil
 		})
+		if err != nil {
+			return nil, err
+		}
+		return acquiredLocks, nil
 	case kvpb.KEY_VALUES:
 		for i, row := range scanRes.KVs {
-			res.Local.AcquiredLocks[i] = roachpb.MakeLockAcquisition(txn, copyKey(row.Key), lock.Unreplicated, str)
+			k := copyKey(row.Key)
+			acq, err := acquireLockOnKey(ctx, readWriter, txn, str, dur, k)
+			if err != nil {
+				return nil, err
+			}
+			acquiredLocks[i] = acq
 		}
-		return nil
+		return acquiredLocks, nil
 	case kvpb.COL_BATCH_RESPONSE:
-		return errors.AssertionFailedf("unexpectedly acquiring unreplicated locks with COL_BATCH_RESPONSE scan format")
+		return nil, errors.AssertionFailedf("unexpectedly acquiring unreplicated locks with COL_BATCH_RESPONSE scan format")
 	default:
 		panic("unexpected scanFormat")
 	}
 }
 
+// acquireLockOnKey acquires a lock on the specified key. The lock is acquired
+// by the specified transaction with the supplied lock strength and durability.
+// The resultant lock acquisition struct is returned, which the caller must
+// accumulate in its result set.
+//
+// It is possible for lock acquisition to run into a lock conflict error, in
+// which case a LockConflictError is returned to the caller.
+func acquireLockOnKey(
+	ctx context.Context,
+	readWriter storage.ReadWriter,
+	txn *roachpb.Transaction,
+	str lock.Strength,
+	dur lock.Durability,
+	key roachpb.Key,
+) (roachpb.LockAcquisition, error) {
+	// TODO(arul,nvanbenschoten): For now, we're only checking whether we have
+	// access to a legit pebble.Writer for replicated lock acquisition. We're not
+	// actually acquiring a replicated lock -- we can only do so once they're
+	// fully supported in the storage package. Until then, we grab an unreplicated
+	// lock regardless of what the caller asked us to do.
+	if dur == lock.Replicated {
+		// ShouldWriteLocalTimestamps is only implemented by a pebble.Writer; it'll
+		// panic if we're on the read-only evaluation path and only have access to
+		// a pebble.ReadOnly.
+		readWriter.ShouldWriteLocalTimestamps(ctx)
+		// Regardless of what the caller asked for, we'll give it an unreplicated
+		// lock.
+		dur = lock.Unreplicated
+	}
+	switch dur {
+	case lock.Unreplicated:
+		// TODO(arul,nvanbenschoten): Call into MVCCCheckForAcquireLock here.
+	case lock.Replicated:
+		// TODO(arul,nvanbenschoten): Call into MVCCAcquireLock here.
+	default:
+		panic("unexpected lock durability")
+	}
+	acq := roachpb.MakeLockAcquisition(txn, key, dur, str)
+	return acq, nil
+}
+
 // copyKey copies the provided roachpb.Key into a new byte slice, returning the
-// copy.
It is used in acquireLocksOnKeys for two reasons: // 1. the keys in an MVCCScanResult, regardless of the scan format used, point // to a small number of large, contiguous byte slices. These "MVCCScan // batches" contain keys and their associated values in the same backing // array. To avoid holding these entire backing arrays in memory and -// preventing them from being garbage collected indefinitely, we copy the key -// slices before coupling their lifetimes to those of unreplicated locks. +// preventing them from being garbage collected indefinitely, we copy the +// key slices before coupling their lifetimes to those of the constructed +// lock acquisitions. // 2. the KV API has a contract that byte slices returned from KV will not be // mutated by higher levels. However, we have seen cases (e.g.#64228) where // this contract is broken due to bugs. To defensively guard against this From c926799fced26385d2bd2804e7fcdb7ff8b5b7a6 Mon Sep 17 00:00:00 2001 From: Arul Ajmani Date: Sat, 16 Sep 2023 15:24:43 -0400 Subject: [PATCH 2/3] kv: add lock durability options to the KV client API This patch adds lock durability flags to GetFor{Update,Share}, ScanFor{Update,Share}, and ReverseScanFor{Update,Share}, using which users of the KV client API can express lock durability goals. These then map to either replicated or unreplicated locks. This patch then modifies existing tests to make use of replicated locks in a few places. Epic: none Release note: None --- pkg/ccl/changefeedccl/kvfeed/scanner.go | 2 +- pkg/cli/debug_send_kv_batch_test.go | 4 +- pkg/kv/batch.go | 100 +++++--- pkg/kv/db.go | 72 +++--- pkg/kv/db_test.go | 226 +++++++++++------- .../kvcoord/dist_sender_ambiguous_test.go | 2 +- .../kvcoord/dist_sender_server_test.go | 28 +-- pkg/kv/kvclient/kvcoord/dist_sender_test.go | 53 ++-- pkg/kv/kvclient/kvcoord/transport_test.go | 2 +- .../txn_coord_sender_savepoints_test.go | 2 +- .../kvclient/kvcoord/txn_coord_sender_test.go | 56 +++-- pkg/kv/kvclient/kvstreamer/streamer_test.go | 2 +- pkg/kv/kvnemesis/applier.go | 74 +++--- pkg/kv/kvpb/api.go | 119 +++++++-- pkg/kv/kvpb/batch_test.go | 6 +- .../client_replica_circuit_breaker_test.go | 4 +- pkg/kv/kvserver/client_replica_test.go | 6 +- pkg/kv/kvserver/client_split_test.go | 4 +- pkg/kv/kvserver/replica_command.go | 11 +- pkg/kv/kvserver/replica_rankings_test.go | 2 +- pkg/kv/kvserver/replica_test.go | 8 +- pkg/kv/txn.go | 64 ++--- pkg/kv/txn_test.go | 6 +- pkg/server/node_test.go | 4 +- .../system_config_watcher_test.go | 1 - pkg/sql/crdb_internal.go | 2 +- pkg/sql/tests/kv_test.go | 3 +- pkg/ts/server.go | 2 +- pkg/upgrade/upgrades/BUILD.bazel | 1 + .../desc_id_sequence_for_system_tenant.go | 3 +- 30 files changed, 540 insertions(+), 329 deletions(-) diff --git a/pkg/ccl/changefeedccl/kvfeed/scanner.go b/pkg/ccl/changefeedccl/kvfeed/scanner.go index 0d42dde9e25b..db904e2a0f24 100644 --- a/pkg/ccl/changefeedccl/kvfeed/scanner.go +++ b/pkg/ccl/changefeedccl/kvfeed/scanner.go @@ -188,7 +188,7 @@ func (p *scanRequestScanner) exportSpan( for remaining := &span; remaining != nil; { start := timeutil.Now() b := txn.NewBatch() - r := kvpb.NewScan(remaining.Key, remaining.EndKey, kvpb.NonLocking).(*kvpb.ScanRequest) + r := kvpb.NewScan(remaining.Key, remaining.EndKey).(*kvpb.ScanRequest) r.ScanFormat = kvpb.BATCH_RESPONSE b.Header.TargetBytes = targetBytesPerScan b.AdmissionHeader = kvpb.AdmissionHeader{ diff --git a/pkg/cli/debug_send_kv_batch_test.go b/pkg/cli/debug_send_kv_batch_test.go index dfb50e71dd05..3f79d62f8463 100644 --- 
a/pkg/cli/debug_send_kv_batch_test.go
+++ b/pkg/cli/debug_send_kv_batch_test.go
@@ -37,7 +37,7 @@ func TestSendKVBatchExample(t *testing.T) {
 
 	var ba kvpb.BatchRequest
 	ba.Add(kvpb.NewPut(roachpb.Key("foo"), roachpb.MakeValueFromString("bar")))
-	ba.Add(kvpb.NewGet(roachpb.Key("foo"), kvpb.NonLocking))
+	ba.Add(kvpb.NewGet(roachpb.Key("foo")))
 
 	// NOTE: This cannot be marshaled using the standard Go JSON marshaler,
 	// since it does not correctly (un)marshal the JSON as mandated by the
@@ -72,7 +72,7 @@ func TestSendKVBatch(t *testing.T) {
 	// Protobuf spec. Instead, use the JSON marshaler shipped with Protobuf.
 	var ba kvpb.BatchRequest
 	ba.Add(kvpb.NewPut(roachpb.Key("foo"), roachpb.MakeValueFromString("bar")))
-	ba.Add(kvpb.NewGet(roachpb.Key("foo"), kvpb.NonLocking))
+	ba.Add(kvpb.NewGet(roachpb.Key("foo")))
 
 	jsonpb := protoutil.JSONPb{}
 	jsonProto, err := jsonpb.Marshal(&ba)
diff --git a/pkg/kv/batch.go b/pkg/kv/batch.go
index 76ff420426a4..92080093f422 100644
--- a/pkg/kv/batch.go
+++ b/pkg/kv/batch.go
@@ -379,13 +379,22 @@ func (b *Batch) AddRawRequest(reqs ...kvpb.Request) {
 	}
 }
 
-func (b *Batch) get(key interface{}, str kvpb.KeyLockingStrengthType) {
+func (b *Batch) get(
+	key interface{}, str kvpb.KeyLockingStrengthType, dur kvpb.KeyLockingDurabilityType,
+) {
 	k, err := marshalKey(key)
 	if err != nil {
 		b.initResult(0, 1, notRaw, err)
 		return
 	}
-	b.appendReqs(kvpb.NewGet(k, str))
+	switch str {
+	case kvpb.NonLocking:
+		b.appendReqs(kvpb.NewGet(k))
+	case kvpb.ForShare, kvpb.ForUpdate:
+		b.appendReqs(kvpb.NewLockingGet(k, str, dur))
+	default:
+		panic(errors.AssertionFailedf("unknown str %d", str))
+	}
 	b.initResult(1, 1, notRaw, nil)
 }
 
@@ -397,31 +406,32 @@ func (b *Batch) get(key interface{}, str kvpb.KeyLockingStrengthType) {
 //
 // key can be either a byte slice or a string.
 func (b *Batch) Get(key interface{}) {
-	b.get(key, kvpb.NonLocking)
+	b.get(key, kvpb.NonLocking, kvpb.Invalid)
 }
 
-// GetForUpdate retrieves the value for a key. An unreplicated, exclusive lock
-// is acquired on the key, if it exists. A new result will be appended to the
-// batch which will contain a single row.
+// GetForUpdate retrieves the value for a key. An Exclusive lock with the
+// supplied durability is acquired on the key, if it exists. A new result will
+// be appended to the batch which will contain a single row. It is not
+// considered an error for the key not to exist.
 //
 //	r, err := db.GetForUpdate("a")
 //	// string(r.Rows[0].Key) == "a"
 //
 // key can be either a byte slice or a string.
-func (b *Batch) GetForUpdate(key interface{}) {
-	b.get(key, kvpb.ForUpdate)
+func (b *Batch) GetForUpdate(key interface{}, dur kvpb.KeyLockingDurabilityType) {
+	b.get(key, kvpb.ForUpdate, dur)
 }
 
-// GetForShare retrieves the value for a key. An unreplicated, shared lock
-// is acquired on the key, if it exists. A new result will be appended to the
-// batch which will contain a single row.
+// GetForShare retrieves the value for a key. A shared lock with the supplied
+// durability is acquired on the key, if it exists. A new result will be
+// appended to the batch which will contain a single row.
 //
 //	r, err := db.GetForShare("a")
 //	// string(r.Rows[0].Key) == "a"
 //
 // key can be either a byte slice or a string.
-func (b *Batch) GetForShare(key interface{}) { - b.get(key, kvpb.ForShare) +func (b *Batch) GetForShare(key interface{}, dur kvpb.KeyLockingDurabilityType) { + b.get(key, kvpb.ForShare, dur) } func (b *Batch) put(key, value interface{}, inline bool) { @@ -728,7 +738,12 @@ func (b *Batch) Inc(key interface{}, value int64) { b.initResult(1, 1, notRaw, nil) } -func (b *Batch) scan(s, e interface{}, isReverse bool, str kvpb.KeyLockingStrengthType) { +func (b *Batch) scan( + s, e interface{}, + isReverse bool, + str kvpb.KeyLockingStrengthType, + dur kvpb.KeyLockingDurabilityType, +) { begin, err := marshalKey(s) if err != nil { b.initResult(0, 0, notRaw, err) @@ -739,10 +754,21 @@ func (b *Batch) scan(s, e interface{}, isReverse bool, str kvpb.KeyLockingStreng b.initResult(0, 0, notRaw, err) return } - if !isReverse { - b.appendReqs(kvpb.NewScan(begin, end, str)) - } else { - b.appendReqs(kvpb.NewReverseScan(begin, end, str)) + switch str { + case kvpb.NonLocking: + if !isReverse { + b.appendReqs(kvpb.NewScan(begin, end)) + } else { + b.appendReqs(kvpb.NewReverseScan(begin, end)) + } + case kvpb.ForShare, kvpb.ForUpdate: + if !isReverse { + b.appendReqs(kvpb.NewLockingScan(begin, end, str, dur)) + } else { + b.appendReqs(kvpb.NewLockingReverseScan(begin, end, str, dur)) + } + default: + panic(errors.AssertionFailedf("unknown str %d", str)) } b.initResult(1, 0, notRaw, nil) } @@ -755,31 +781,31 @@ func (b *Batch) scan(s, e interface{}, isReverse bool, str kvpb.KeyLockingStreng // // key can be either a byte slice or a string. func (b *Batch) Scan(s, e interface{}) { - b.scan(s, e, false /* isReverse */, kvpb.NonLocking) + b.scan(s, e, false /* isReverse */, kvpb.NonLocking, kvpb.Invalid) } -// ScanForUpdate retrieves the key/values between begin (inclusive) and end -// (exclusive) in ascending order. Unreplicated, exclusive locks are acquired on -// each of the returned keys. +// ScanForUpdate retrieves the rows between begin (inclusive) and end +// (exclusive) in ascending order. Exclusive locks with the supplied durability +// are acquired on each of the returned keys. // // A new result will be appended to the batch which will contain "rows" (each // row is a key/value pair) and Result.Err will indicate success or failure. // // key can be either a byte slice or a string. -func (b *Batch) ScanForUpdate(s, e interface{}) { - b.scan(s, e, false /* isReverse */, kvpb.ForUpdate) +func (b *Batch) ScanForUpdate(s, e interface{}, dur kvpb.KeyLockingDurabilityType) { + b.scan(s, e, false /* isReverse */, kvpb.ForUpdate, dur) } // ScanForShare retrieves the key/values between begin (inclusive) and end -// (exclusive) in ascending order. Unreplicated, shared locks are acquired on -// each of the returned keys. +// (exclusive) in ascending order. Shared locks with the supplied durability are +// acquired on each of the returned keys. // // A new result will be appended to the batch which will contain "rows" (each // row is a key/value pair) and Result.Err will indicate success or failure. // // key can be either a byte slice or a string. -func (b *Batch) ScanForShare(s, e interface{}) { - b.scan(s, e, false /* isReverse */, kvpb.ForShare) +func (b *Batch) ScanForShare(s, e interface{}, dur kvpb.KeyLockingDurabilityType) { + b.scan(s, e, false /* isReverse */, kvpb.ForShare, dur) } // ReverseScan retrieves the rows between begin (inclusive) and end (exclusive) @@ -790,31 +816,31 @@ func (b *Batch) ScanForShare(s, e interface{}) { // // key can be either a byte slice or a string. 
 func (b *Batch) ReverseScan(s, e interface{}) {
-	b.scan(s, e, true /* isReverse */, kvpb.NonLocking)
+	b.scan(s, e, true /* isReverse */, kvpb.NonLocking, kvpb.Invalid)
 }
 
 // ReverseScanForUpdate retrieves the rows between begin (inclusive) and end
-// (exclusive) in descending order. Unreplicated, exclusive locks are acquired
-// on each of the returned keys.
+// (exclusive) in descending order. Exclusive locks with the supplied durability
+// are acquired on each of the returned keys.
 //
 // A new result will be appended to the batch which will contain "rows" (each
 // "row" is a key/value pair) and Result.Err will indicate success or failure.
 //
 // key can be either a byte slice or a string.
-func (b *Batch) ReverseScanForUpdate(s, e interface{}) {
-	b.scan(s, e, true /* isReverse */, kvpb.ForUpdate)
+func (b *Batch) ReverseScanForUpdate(s, e interface{}, dur kvpb.KeyLockingDurabilityType) {
+	b.scan(s, e, true /* isReverse */, kvpb.ForUpdate, dur)
 }
 
 // ReverseScanForShare retrieves the rows between begin (inclusive) and end
-// (exclusive) in descending order. Unreplicated, shared locks are acquired
-// on each of the returned keys.
+// (exclusive) in descending order. Shared locks with the supplied durability
+// are acquired on each of the returned keys.
 //
 // A new result will be appended to the batch which will contain "rows" (each
 // "row" is a key/value pair) and Result.Err will indicate success or failure.
 //
 // key can be either a byte slice or a string.
-func (b *Batch) ReverseScanForShare(s, e interface{}) {
-	b.scan(s, e, true /* isReverse */, kvpb.ForShare)
+func (b *Batch) ReverseScanForShare(s, e interface{}, dur kvpb.KeyLockingDurabilityType) {
+	b.scan(s, e, true /* isReverse */, kvpb.ForShare, dur)
 }
 
 // Del deletes one or more keys.
diff --git a/pkg/kv/db.go b/pkg/kv/db.go
index 2e887e378582..d9e2c74f2679 100644
--- a/pkg/kv/db.go
+++ b/pkg/kv/db.go
@@ -343,30 +343,35 @@ func (db *DB) Get(ctx context.Context, key interface{}) (KeyValue, error) {
 }
 
 // GetForUpdate retrieves the value for a key, returning the retrieved key/value
-// or an error. An unreplicated, exclusive lock is acquired on the key, if it
-// exists. It is not considered an error for the key not to exist.
+// or an error. An Exclusive lock with the supplied durability is acquired on
+// the key, if it exists. It is not considered an error for the key not to
+// exist.
 //
 //	r, err := db.GetForUpdate("a")
 //	// string(r.Key) == "a"
 //
 // key can be either a byte slice or a string.
-func (db *DB) GetForUpdate(ctx context.Context, key interface{}) (KeyValue, error) {
+func (db *DB) GetForUpdate(
+	ctx context.Context, key interface{}, dur kvpb.KeyLockingDurabilityType,
+) (KeyValue, error) {
 	b := &Batch{}
-	b.GetForUpdate(key)
+	b.GetForUpdate(key, dur)
 	return getOneRow(db.Run(ctx, b), b)
 }
 
-// GetForShare retrieves the value for a key, returning the retrieved key/value
-// or an error. An unreplicated, shared lock is acquired on the key, if it
-// exists. It is not considered an error for the key not to exist.
+// GetForShare retrieves the value for a key, returning the retrieved key/value
+// or an error. A shared lock with the supplied durability is acquired on the
+// key, if it exists. It is not considered an error for the key not to exist.
 //
 //	r, err := db.GetForShare("a")
 //	// string(r.Key) == "a"
 //
 // key can be either a byte slice or a string.
-func (db *DB) GetForShare(ctx context.Context, key interface{}) (KeyValue, error) { +func (db *DB) GetForShare( + ctx context.Context, key interface{}, dur kvpb.KeyLockingDurabilityType, +) (KeyValue, error) { b := &Batch{} - b.GetForShare(key) + b.GetForShare(key, dur) return getOneRow(db.Run(ctx, b), b) } @@ -494,13 +499,14 @@ func (db *DB) scan( isReverse bool, str kvpb.KeyLockingStrengthType, readConsistency kvpb.ReadConsistencyType, + dur kvpb.KeyLockingDurabilityType, ) ([]KeyValue, error) { b := &Batch{} b.Header.ReadConsistency = readConsistency if maxRows > 0 { b.Header.MaxSpanRequestKeys = maxRows } - b.scan(begin, end, isReverse, str) + b.scan(begin, end, isReverse, str, dur) r, err := getOneResult(db.Run(ctx, b), b) return r.Rows, err } @@ -512,33 +518,37 @@ func (db *DB) scan( // // key can be either a byte slice or a string. func (db *DB) Scan(ctx context.Context, begin, end interface{}, maxRows int64) ([]KeyValue, error) { - return db.scan(ctx, begin, end, maxRows, false /* isReverse */, kvpb.NonLocking, kvpb.CONSISTENT) + return db.scan(ctx, begin, end, maxRows, false /* isReverse */, kvpb.NonLocking, kvpb.CONSISTENT, kvpb.Invalid) } // ScanForUpdate retrieves the rows between begin (inclusive) and end -// (exclusive) in ascending order. Unreplicated, exclusive locks are -// acquired on each of the returned keys. +// (exclusive) in ascending order. Exclusive locks with the supplied durability +// are acquired on each of the returned keys. // // The returned []KeyValue will contain up to maxRows elements. // // key can be either a byte slice or a string. func (db *DB) ScanForUpdate( - ctx context.Context, begin, end interface{}, maxRows int64, + ctx context.Context, begin, end interface{}, maxRows int64, dur kvpb.KeyLockingDurabilityType, ) ([]KeyValue, error) { - return db.scan(ctx, begin, end, maxRows, false /* isReverse */, kvpb.ForUpdate, kvpb.CONSISTENT) + return db.scan( + ctx, begin, end, maxRows, false /* isReverse */, kvpb.ForUpdate, kvpb.CONSISTENT, dur, + ) } -// ScanForShare retrieves the rows between begin (inclusive) and end -// (exclusive) in ascending order. Unreplicated, shared locks are -// acquired on each of the returned keys. +// ScanForShare retrieves the rows between begin (inclusive) and end (exclusive) +// in ascending order. Shared locks with the supplied durability are acquired on +// each of the returned keys. // // The returned []KeyValue will contain up to maxRows elements. // // key can be either a byte slice or a string. func (db *DB) ScanForShare( - ctx context.Context, begin, end interface{}, maxRows int64, + ctx context.Context, begin, end interface{}, maxRows int64, dur kvpb.KeyLockingDurabilityType, ) ([]KeyValue, error) { - return db.scan(ctx, begin, end, maxRows, false /* isReverse */, kvpb.ForShare, kvpb.CONSISTENT) + return db.scan( + ctx, begin, end, maxRows, false /* isReverse */, kvpb.ForShare, kvpb.CONSISTENT, dur, + ) } // ReverseScan retrieves the rows between begin (inclusive) and end (exclusive) @@ -550,33 +560,37 @@ func (db *DB) ScanForShare( func (db *DB) ReverseScan( ctx context.Context, begin, end interface{}, maxRows int64, ) ([]KeyValue, error) { - return db.scan(ctx, begin, end, maxRows, true /* isReverse */, kvpb.NonLocking, kvpb.CONSISTENT) + return db.scan(ctx, begin, end, maxRows, true /* isReverse */, kvpb.NonLocking, kvpb.CONSISTENT, kvpb.Invalid) } // ReverseScanForUpdate retrieves the rows between begin (inclusive) and end -// (exclusive) in descending order. 
Unreplicated, exclusive locks are acquired -// on each of the returned keys. +// (exclusive) in descending order. Exclusive locks with the supplied durability +// are acquired on each of the returned keys. // // The returned []KeyValue will contain up to maxRows elements. // // key can be either a byte slice or a string. func (db *DB) ReverseScanForUpdate( - ctx context.Context, begin, end interface{}, maxRows int64, + ctx context.Context, begin, end interface{}, maxRows int64, dur kvpb.KeyLockingDurabilityType, ) ([]KeyValue, error) { - return db.scan(ctx, begin, end, maxRows, true /* isReverse */, kvpb.ForUpdate, kvpb.CONSISTENT) + return db.scan( + ctx, begin, end, maxRows, true /* isReverse */, kvpb.ForUpdate, kvpb.CONSISTENT, dur, + ) } // ReverseScanForShare retrieves the rows between begin (inclusive) and end -// (exclusive) in descending order. Unreplicated, shared locks are acquired -// on each of the returned keys. +// (exclusive) in descending order. Shared locks with the supplied durability +// are acquired on each of the returned keys. // // The returned []KeyValue will contain up to maxRows elements. // // key can be either a byte slice or a string. func (db *DB) ReverseScanForShare( - ctx context.Context, begin, end interface{}, maxRows int64, + ctx context.Context, begin, end interface{}, maxRows int64, dur kvpb.KeyLockingDurabilityType, ) ([]KeyValue, error) { - return db.scan(ctx, begin, end, maxRows, true /* isReverse */, kvpb.ForShare, kvpb.CONSISTENT) + return db.scan( + ctx, begin, end, maxRows, true /* isReverse */, kvpb.ForShare, kvpb.CONSISTENT, dur, + ) } // Del deletes one or more keys. diff --git a/pkg/kv/db_test.go b/pkg/kv/db_test.go index 2f6617e9c216..99859532c961 100644 --- a/pkg/kv/db_test.go +++ b/pkg/kv/db_test.go @@ -111,26 +111,44 @@ func TestDB_GetForUpdate(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) s, db := setup(t) - defer s.Stopper().Stop(context.Background()) + ctx := context.Background() + defer s.Stopper().Stop(ctx) - result, err := db.GetForUpdate(context.Background(), "aa") - if err != nil { - t.Fatal(err) - } - checkResult(t, []byte(""), result.ValueBytes()) + testutils.RunTrueAndFalse(t, "durability-guaranteed", func(t *testing.T, durabilityGuaranteed bool) { + var result kv.KeyValue + var err error + if durabilityGuaranteed { + result, err = db.GetForUpdate(ctx, "aa", kvpb.GuaranteedDurability) + } else { + result, err = db.GetForUpdate(ctx, "aa", kvpb.BestEffort) + } + if err != nil { + t.Fatal(err) + } + checkResult(t, []byte(""), result.ValueBytes()) + }) } func TestDB_GetForShare(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) s, db := setup(t) - defer s.Stopper().Stop(context.Background()) + ctx := context.Background() + defer s.Stopper().Stop(ctx) - result, err := db.GetForShare(context.Background(), "aa") - if err != nil { - t.Fatal(err) - } - checkResult(t, []byte(""), result.ValueBytes()) + testutils.RunTrueAndFalse(t, "durability-guaranteed", func(t *testing.T, durabilityGuaranteed bool) { + var result kv.KeyValue + var err error + if durabilityGuaranteed { + result, err = db.GetForShare(ctx, "aa", kvpb.GuaranteedDurability) + } else { + result, err = db.GetForShare(ctx, "aa", kvpb.BestEffort) + } + if err != nil { + t.Fatal(err) + } + checkResult(t, []byte(""), result.ValueBytes()) + }) } func TestDB_Put(t *testing.T) { @@ -363,52 +381,70 @@ func TestDB_ScanForUpdate(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) s, db := setup(t) - defer 
s.Stopper().Stop(context.Background()) + ctx := context.Background() + defer s.Stopper().Stop(ctx) - b := &kv.Batch{} - b.Put("aa", "1") - b.Put("ab", "2") - b.Put("bb", "3") - if err := db.Run(context.Background(), b); err != nil { - t.Fatal(err) - } - rows, err := db.ScanForUpdate(context.Background(), "a", "b", 100) - if err != nil { - t.Fatal(err) - } - expected := map[string][]byte{ - "aa": []byte("1"), - "ab": []byte("2"), - } + testutils.RunTrueAndFalse(t, "durability-guaranteed", func(t *testing.T, durabilityGuaranteed bool) { + b := &kv.Batch{} + b.Put("aa", "1") + b.Put("ab", "2") + b.Put("bb", "3") + if err := db.Run(context.Background(), b); err != nil { + t.Fatal(err) + } + var rows []kv.KeyValue + var err error + if durabilityGuaranteed { + rows, err = db.ScanForUpdate(ctx, "a", "b", 100, kvpb.GuaranteedDurability) + } else { + rows, err = db.ScanForUpdate(ctx, "a", "b", 100, kvpb.BestEffort) + } + if err != nil { + t.Fatal(err) + } + expected := map[string][]byte{ + "aa": []byte("1"), + "ab": []byte("2"), + } - checkRows(t, expected, rows) - checkLen(t, len(expected), len(rows)) + checkRows(t, expected, rows) + checkLen(t, len(expected), len(rows)) + }) } func TestDB_ScanForShare(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) s, db := setup(t) - defer s.Stopper().Stop(context.Background()) + ctx := context.Background() + defer s.Stopper().Stop(ctx) - b := &kv.Batch{} - b.Put("aa", "1") - b.Put("ab", "2") - b.Put("bb", "3") - if err := db.Run(context.Background(), b); err != nil { - t.Fatal(err) - } - rows, err := db.ScanForShare(context.Background(), "a", "b", 100) - if err != nil { - t.Fatal(err) - } - expected := map[string][]byte{ - "aa": []byte("1"), - "ab": []byte("2"), - } + testutils.RunTrueAndFalse(t, "durability-guaranteed", func(t *testing.T, durabilityGuaranteed bool) { + b := &kv.Batch{} + b.Put("aa", "1") + b.Put("ab", "2") + b.Put("bb", "3") + if err := db.Run(ctx, b); err != nil { + t.Fatal(err) + } + var rows []kv.KeyValue + var err error + if durabilityGuaranteed { + rows, err = db.ScanForShare(ctx, "a", "b", 100, kvpb.GuaranteedDurability) + } else { + rows, err = db.ScanForShare(ctx, "a", "b", 100, kvpb.BestEffort) + } + if err != nil { + t.Fatal(err) + } + expected := map[string][]byte{ + "aa": []byte("1"), + "ab": []byte("2"), + } - checkRows(t, expected, rows) - checkLen(t, len(expected), len(rows)) + checkRows(t, expected, rows) + checkLen(t, len(expected), len(rows)) + }) } func TestDB_ReverseScan(t *testing.T) { @@ -441,52 +477,72 @@ func TestDB_ReverseScanForUpdate(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) s, db := setup(t) - defer s.Stopper().Stop(context.Background()) + ctx := context.Background() + defer s.Stopper().Stop(ctx) - b := &kv.Batch{} - b.Put("aa", "1") - b.Put("ab", "2") - b.Put("bb", "3") - if err := db.Run(context.Background(), b); err != nil { - t.Fatal(err) - } - rows, err := db.ReverseScanForUpdate(context.Background(), "ab", "c", 100) - if err != nil { - t.Fatal(err) - } - expected := map[string][]byte{ - "bb": []byte("3"), - "ab": []byte("2"), - } + testutils.RunTrueAndFalse(t, "durability-guaranteed", func(t *testing.T, durabilityGuaranteed bool) { + b := &kv.Batch{} + b.Put("aa", "1") + b.Put("ab", "2") + b.Put("bb", "3") + if err := db.Run(ctx, b); err != nil { + t.Fatal(err) + } + var rows []kv.KeyValue + var err error - checkRows(t, expected, rows) - checkLen(t, len(expected), len(rows)) + if durabilityGuaranteed { + rows, err = db.ReverseScanForUpdate(ctx, "ab", 
"c", 100, kvpb.GuaranteedDurability) + } else { + rows, err = db.ReverseScanForUpdate(ctx, "ab", "c", 100, kvpb.BestEffort) + } + if err != nil { + t.Fatal(err) + } + expected := map[string][]byte{ + "bb": []byte("3"), + "ab": []byte("2"), + } + + checkRows(t, expected, rows) + checkLen(t, len(expected), len(rows)) + }) } func TestDB_ReverseScanForShare(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) s, db := setup(t) - defer s.Stopper().Stop(context.Background()) + ctx := context.Background() + defer s.Stopper().Stop(ctx) - b := &kv.Batch{} - b.Put("aa", "1") - b.Put("ab", "2") - b.Put("bb", "3") - if err := db.Run(context.Background(), b); err != nil { - t.Fatal(err) - } - rows, err := db.ReverseScanForShare(context.Background(), "ab", "c", 100) - if err != nil { - t.Fatal(err) - } - expected := map[string][]byte{ - "bb": []byte("3"), - "ab": []byte("2"), - } + testutils.RunTrueAndFalse(t, "durability-guaranteed", func(t *testing.T, durabilityGuaranteed bool) { + b := &kv.Batch{} + b.Put("aa", "1") + b.Put("ab", "2") + b.Put("bb", "3") + if err := db.Run(ctx, b); err != nil { + t.Fatal(err) + } + var rows []kv.KeyValue + var err error - checkRows(t, expected, rows) - checkLen(t, len(expected), len(rows)) + if durabilityGuaranteed { + rows, err = db.ReverseScanForShare(ctx, "ab", "c", 100, kvpb.GuaranteedDurability) + } else { + rows, err = db.ReverseScanForShare(ctx, "ab", "c", 100, kvpb.BestEffort) + } + if err != nil { + t.Fatal(err) + } + expected := map[string][]byte{ + "bb": []byte("3"), + "ab": []byte("2"), + } + + checkRows(t, expected, rows) + checkLen(t, len(expected), len(rows)) + }) } func TestDB_TxnIterate(t *testing.T) { @@ -762,7 +818,7 @@ func TestDBDecommissionedOperations(t *testing.T) { return err }}, {"GetForUpdate", func() error { - _, err := db.GetForUpdate(ctx, key) + _, err := db.GetForUpdate(ctx, key, kvpb.BestEffort) return err }}, {"Put", func() error { diff --git a/pkg/kv/kvclient/kvcoord/dist_sender_ambiguous_test.go b/pkg/kv/kvclient/kvcoord/dist_sender_ambiguous_test.go index 0c3b6041ce7e..781ade85153c 100644 --- a/pkg/kv/kvclient/kvcoord/dist_sender_ambiguous_test.go +++ b/pkg/kv/kvclient/kvcoord/dist_sender_ambiguous_test.go @@ -402,7 +402,7 @@ func TestTransactionUnexpectedlyCommitted(t *testing.T) { getInBatch := func(t *testing.T, ctx context.Context, txn *kv.Txn, keys ...roachpb.Key) []int64 { batch := txn.NewBatch() for _, key := range keys { - batch.GetForUpdate(key) + batch.GetForUpdate(key, kvpb.BestEffort) } assert.NoError(t, txn.Run(ctx, batch)) assert.Len(t, batch.Results, len(keys)) diff --git a/pkg/kv/kvclient/kvcoord/dist_sender_server_test.go b/pkg/kv/kvclient/kvcoord/dist_sender_server_test.go index 714c316f8fa0..ef9baa236ef4 100644 --- a/pkg/kv/kvclient/kvcoord/dist_sender_server_test.go +++ b/pkg/kv/kvclient/kvcoord/dist_sender_server_test.go @@ -1113,8 +1113,8 @@ func TestMultiRangeScanReverseScanInconsistent(t *testing.T) { // OpRequiresTxnError. We set the local clock to the timestamp of // just above the first key to verify it's used to read only key "a". for i, request := range []kvpb.Request{ - kvpb.NewScan(roachpb.Key("a"), roachpb.Key("c"), kvpb.NonLocking), - kvpb.NewReverseScan(roachpb.Key("a"), roachpb.Key("c"), kvpb.NonLocking), + kvpb.NewScan(roachpb.Key("a"), roachpb.Key("c")), + kvpb.NewReverseScan(roachpb.Key("a"), roachpb.Key("c")), } { // The looping is necessary since the Put above of a may not have been // applied by time we execute the scan. 
If it has not run, then try the @@ -1194,7 +1194,7 @@ func TestMultiRangeScanDeleteRange(t *testing.T) { if _, err := kv.SendWrapped(ctx, tds, put); err != nil { t.Fatal(err) } - scan := kvpb.NewScan(writes[0], writes[len(writes)-1].Next(), kvpb.NonLocking) + scan := kvpb.NewScan(writes[0], writes[len(writes)-1].Next()) reply, err := kv.SendWrapped(ctx, tds, scan) if err != nil { t.Fatal(err) @@ -1233,7 +1233,7 @@ func TestMultiRangeScanDeleteRange(t *testing.T) { txnProto := roachpb.MakeTransaction("MyTxn", nil, isolation.Serializable, 0, now.ToTimestamp(), 0, int32(s.SQLInstanceID()), 0) txn := kv.NewTxnFromProto(ctx, db, s.NodeID(), now, kv.RootTxn, &txnProto) - scan := kvpb.NewScan(writes[0], writes[len(writes)-1].Next(), kvpb.NonLocking) + scan := kvpb.NewScan(writes[0], writes[len(writes)-1].Next()) ba := &kvpb.BatchRequest{} ba.Header = kvpb.Header{Txn: &txnProto} ba.Add(scan) @@ -1351,7 +1351,7 @@ func TestMultiRangeScanWithPagination(t *testing.T) { // happens above this. var maxTargetBytes int64 { - scan := kvpb.NewScan(tc.keys[0], tc.keys[len(tc.keys)-1].Next(), kvpb.NonLocking) + scan := kvpb.NewScan(tc.keys[0], tc.keys[len(tc.keys)-1].Next()) resp, pErr := kv.SendWrapped(ctx, tds, scan) require.Nil(t, pErr) require.Nil(t, resp.Header().ResumeSpan) @@ -1392,11 +1392,11 @@ func TestMultiRangeScanWithPagination(t *testing.T) { var req kvpb.Request switch { case span.EndKey == nil: - req = kvpb.NewGet(span.Key, kvpb.NonLocking) + req = kvpb.NewGet(span.Key) case reverse: - req = kvpb.NewReverseScan(span.Key, span.EndKey, kvpb.NonLocking) + req = kvpb.NewReverseScan(span.Key, span.EndKey) default: - req = kvpb.NewScan(span.Key, span.EndKey, kvpb.NonLocking) + req = kvpb.NewScan(span.Key, span.EndKey) } ba.Add(req) } @@ -3036,7 +3036,7 @@ func TestTxnCoordSenderRetries(t *testing.T) { return db.Put(ctx, "a", "put") }, retryable: func(ctx context.Context, txn *kv.Txn) error { - _, err := txn.ScanForUpdate(ctx, "a", "a\x00", 0) + _, err := txn.ScanForUpdate(ctx, "a", "a\x00", 0, kvpb.BestEffort) return err }, allIsoLevels: &expect{ @@ -3050,7 +3050,7 @@ func TestTxnCoordSenderRetries(t *testing.T) { return db.Put(ctx, "a", "put") }, retryable: func(ctx context.Context, txn *kv.Txn) error { - _, err := txn.ScanForUpdate(ctx, "a", "a\x00", 0) + _, err := txn.ScanForUpdate(ctx, "a", "a\x00", 0, kvpb.BestEffort) return err }, priorReads: true, @@ -3081,7 +3081,7 @@ func TestTxnCoordSenderRetries(t *testing.T) { return db.Put(ctx, "a", "put") }, retryable: func(ctx context.Context, txn *kv.Txn) error { - _, err := txn.ScanForUpdate(ctx, "a", "c", 0) + _, err := txn.ScanForUpdate(ctx, "a", "c", 0, kvpb.BestEffort) return err }, allIsoLevels: &expect{ @@ -3096,7 +3096,7 @@ func TestTxnCoordSenderRetries(t *testing.T) { return db.Put(ctx, "b", "put") }, retryable: func(ctx context.Context, txn *kv.Txn) error { - _, err := txn.ScanForUpdate(ctx, "a", "c", 0) + _, err := txn.ScanForUpdate(ctx, "a", "c", 0, kvpb.BestEffort) return err }, allIsoLevels: &expect{ @@ -3112,8 +3112,8 @@ func TestTxnCoordSenderRetries(t *testing.T) { }, retryable: func(ctx context.Context, txn *kv.Txn) error { b := txn.NewBatch() - b.ScanForUpdate("a", "a\x00") - b.ScanForUpdate("b", "b\x00") + b.ScanForUpdate("a", "a\x00", kvpb.BestEffort) + b.ScanForUpdate("b", "b\x00", kvpb.BestEffort) return txn.Run(ctx, b) }, allIsoLevels: &expect{ diff --git a/pkg/kv/kvclient/kvcoord/dist_sender_test.go b/pkg/kv/kvclient/kvcoord/dist_sender_test.go index cd41e176982f..27ee10f1a8dd 100644 --- 
a/pkg/kv/kvclient/kvcoord/dist_sender_test.go +++ b/pkg/kv/kvclient/kvcoord/dist_sender_test.go @@ -489,7 +489,7 @@ func TestSendRPCOrder(t *testing.T) { RangeID: rangeID, // Not used in this test, but why not. RoutingPolicy: tc.routingPolicy, } - req := kvpb.NewScan(roachpb.Key("a"), roachpb.Key("b"), kvpb.NonLocking) + req := kvpb.NewScan(roachpb.Key("a"), roachpb.Key("b")) _, pErr := kv.SendWrappedWith(ctx, ds, header, req) require.Nil(t, pErr) }) @@ -950,7 +950,7 @@ func TestNoBackoffOnNotLeaseHolderErrorFromFollowerRead(t *testing.T) { Lease: lease, }) - get := kvpb.NewGet(roachpb.Key("a"), kvpb.NonLocking) + get := kvpb.NewGet(roachpb.Key("a")) _, pErr := kv.SendWrapped(ctx, ds, get) require.Nil(t, pErr) require.Equal(t, []roachpb.NodeID{1, 2}, sentTo) @@ -1022,7 +1022,7 @@ func TestNoBackoffOnNotLeaseHolderErrorWithoutLease(t *testing.T) { // Send a request. It should try all three replicas once: the first two fail // with NLHE, the third one succeeds. None of them should trigger backoffs. - _, pErr := kv.SendWrapped(ctx, ds, kvpb.NewGet(roachpb.Key("a"), kvpb.NonLocking)) + _, pErr := kv.SendWrapped(ctx, ds, kvpb.NewGet(roachpb.Key("a"))) require.NoError(t, pErr.GoError()) require.Equal(t, []roachpb.NodeID{1, 2, 3}, sentTo) require.Equal(t, int64(0), ds.Metrics().InLeaseTransferBackoffs.Count()) @@ -1118,7 +1118,7 @@ func TestDistSenderMovesOnFromReplicaWithStaleLease(t *testing.T) { Lease: cachedLease, }) - get := kvpb.NewGet(roachpb.Key("a"), kvpb.NonLocking) + get := kvpb.NewGet(roachpb.Key("a")) _, pErr := kv.SendWrapped(ctx, ds, get) require.Nil(t, pErr) @@ -1240,7 +1240,7 @@ func TestDistSenderIgnoresNLHEBasedOnOldRangeGeneration(t *testing.T) { Lease: cachedLease, }) - get := kvpb.NewGet(roachpb.Key("a"), kvpb.NonLocking) + get := kvpb.NewGet(roachpb.Key("a")) _, pErr := kv.SendWrapped(ctx, ds, get) require.Nil(t, pErr) @@ -1349,7 +1349,7 @@ func TestDistSenderRetryOnTransportErrors(t *testing.T) { Lease: cachedLease, }) - get := kvpb.NewGet(roachpb.Key("a"), kvpb.NonLocking) + get := kvpb.NewGet(roachpb.Key("a")) _, pErr := kv.SendWrapped(ctx, ds, get) if spec.shouldRetry { require.True(t, secondReplicaTried, "Second replica was not retried") @@ -1903,7 +1903,7 @@ func TestRetryOnWrongReplicaError(t *testing.T) { Settings: cluster.MakeTestingClusterSettings(), } ds := NewDistSender(cfg) - scan := kvpb.NewScan(roachpb.Key("a"), roachpb.Key("d"), kvpb.NonLocking) + scan := kvpb.NewScan(roachpb.Key("a"), roachpb.Key("d")) if _, err := kv.SendWrapped(context.Background(), ds, scan); err != nil { t.Errorf("scan encountered error: %s", err) } @@ -2007,7 +2007,7 @@ func TestRetryOnWrongReplicaErrorWithSuggestion(t *testing.T) { RPCRetryOptions: &retry.Options{MaxRetries: 1}, } ds := NewDistSender(cfg) - scan := kvpb.NewScan(roachpb.Key("a"), roachpb.Key("d"), kvpb.NonLocking) + scan := kvpb.NewScan(roachpb.Key("a"), roachpb.Key("d")) if _, err := kv.SendWrapped(context.Background(), ds, scan); err != nil { t.Errorf("scan encountered error: %s", err) } @@ -2127,7 +2127,7 @@ func TestSendRPCRetry(t *testing.T) { Settings: cluster.MakeTestingClusterSettings(), } ds := NewDistSender(cfg) - scan := kvpb.NewScan(roachpb.Key("a"), roachpb.Key("d"), kvpb.NonLocking) + scan := kvpb.NewScan(roachpb.Key("a"), roachpb.Key("d")) sr, err := kv.SendWrappedWith(ctx, ds, kvpb.Header{MaxSpanRequestKeys: 1}, scan) if err != nil { t.Fatal(err) @@ -2253,7 +2253,7 @@ func TestDistSenderDescriptorUpdatesOnSuccessfulRPCs(t *testing.T) { // Send a request that's going to receive a response with a 
RangeInfo. k := roachpb.Key("a") - get := kvpb.NewGet(k, kvpb.NonLocking) + get := kvpb.NewGet(k) ba := &kvpb.BatchRequest{} ba.Add(get) _, pErr := ds.Send(ctx, ba) @@ -2366,7 +2366,7 @@ func TestSendRPCRangeNotFoundError(t *testing.T) { Settings: cluster.MakeTestingClusterSettings(), } ds = NewDistSender(cfg) - get := kvpb.NewGet(roachpb.Key("b"), kvpb.NonLocking) + get := kvpb.NewGet(roachpb.Key("b")) _, err := kv.SendWrapped(ctx, ds, get) if err != nil { t.Fatal(err) @@ -2462,8 +2462,8 @@ func TestMultiRangeGapReverse(t *testing.T) { ba := &kvpb.BatchRequest{} ba.Txn = &txn - ba.Add(kvpb.NewReverseScan(splits[0], splits[1], kvpb.NonLocking)) - ba.Add(kvpb.NewReverseScan(splits[2], splits[3], kvpb.NonLocking)) + ba.Add(kvpb.NewReverseScan(splits[0], splits[1])) + ba.Add(kvpb.NewReverseScan(splits[2], splits[3])) // Before fixing https://github.com/cockroachdb/cockroach/issues/18174, this // would error with: @@ -2564,7 +2564,7 @@ func TestMultiRangeMergeStaleDescriptor(t *testing.T) { Settings: cluster.MakeTestingClusterSettings(), } ds := NewDistSender(cfg) - scan := kvpb.NewScan(roachpb.Key("a"), roachpb.Key("d"), kvpb.NonLocking) + scan := kvpb.NewScan(roachpb.Key("a"), roachpb.Key("d")) // Set the Txn info to avoid an OpRequiresTxnError. reply, err := kv.SendWrappedWith(ctx, ds, kvpb.Header{ MaxSpanRequestKeys: 10, @@ -2918,7 +2918,6 @@ func TestTruncateWithLocalSpanAndDescriptor(t *testing.T) { ba.Add(kvpb.NewScan( keys.RangeDescriptorKey(roachpb.RKey("a")), keys.RangeDescriptorKey(roachpb.RKey("c")), - kvpb.NonLocking, )) if _, pErr := ds.Send(ctx, ba); pErr != nil { @@ -4038,19 +4037,19 @@ func TestCanSendToFollower(t *testing.T) { kvpb.Header{ Txn: &roachpb.Transaction{}, }, - kvpb.NewGet(roachpb.Key("a"), kvpb.NonLocking), + kvpb.NewGet(roachpb.Key("a")), 1, }, { true, kvpb.Header{}, - kvpb.NewGet(roachpb.Key("a"), kvpb.NonLocking), + kvpb.NewGet(roachpb.Key("a")), 1, }, { false, kvpb.Header{}, - kvpb.NewGet(roachpb.Key("a"), kvpb.NonLocking), + kvpb.NewGet(roachpb.Key("a")), 2, }, } { @@ -4241,7 +4240,7 @@ func TestEvictMetaRange(t *testing.T) { } ds := NewDistSender(cfg) - scan := kvpb.NewScan(roachpb.Key("a"), roachpb.Key("b"), kvpb.NonLocking) + scan := kvpb.NewScan(roachpb.Key("a"), roachpb.Key("b")) if _, pErr := kv.SendWrapped(ctx, ds, scan); pErr != nil { t.Fatalf("scan encountered error: %s", pErr) } @@ -4256,7 +4255,7 @@ func TestEvictMetaRange(t *testing.T) { // Simulate a split on the meta2 range and mark it as stale. 
isStale = true - scan = kvpb.NewScan(roachpb.Key("b"), roachpb.Key("c"), kvpb.NonLocking) + scan = kvpb.NewScan(roachpb.Key("b"), roachpb.Key("c")) if _, pErr := kv.SendWrapped(ctx, ds, scan); pErr != nil { t.Fatalf("scan encountered error: %s", pErr) } @@ -4570,13 +4569,13 @@ func TestRequestSubdivisionAfterDescriptorChange(t *testing.T) { splitKey := keys.MustAddr(keyB) get := func(k roachpb.Key) kvpb.Request { - return kvpb.NewGet(k, kvpb.NonLocking) + return kvpb.NewGet(k) } scan := func(k roachpb.Key) kvpb.Request { - return kvpb.NewScan(k, k.Next(), kvpb.NonLocking) + return kvpb.NewScan(k, k.Next()) } revScan := func(k roachpb.Key) kvpb.Request { - return kvpb.NewReverseScan(k, k.Next(), kvpb.NonLocking) + return kvpb.NewReverseScan(k, k.Next()) } for _, tc := range []struct { @@ -4709,7 +4708,7 @@ func TestRequestSubdivisionAfterDescriptorChangeWithUnavailableReplicasTerminate splitKey := keys.MustAddr(keyB) get := func(k roachpb.Key) kvpb.Request { - return kvpb.NewGet(k, kvpb.NonLocking) + return kvpb.NewGet(k) } ctx := context.Background() @@ -4802,13 +4801,13 @@ func TestDescriptorChangeAfterRequestSubdivision(t *testing.T) { laterSplitKey2 := keys.MustAddr(keyD) get := func(k roachpb.Key) kvpb.Request { - return kvpb.NewGet(k, kvpb.NonLocking) + return kvpb.NewGet(k) } scan := func(k roachpb.Key) kvpb.Request { - return kvpb.NewScan(k, k.Next(), kvpb.NonLocking) + return kvpb.NewScan(k, k.Next()) } revScan := func(k roachpb.Key) kvpb.Request { - return kvpb.NewReverseScan(k, k.Next(), kvpb.NonLocking) + return kvpb.NewReverseScan(k, k.Next()) } for _, tc := range []struct { diff --git a/pkg/kv/kvclient/kvcoord/transport_test.go b/pkg/kv/kvclient/kvcoord/transport_test.go index 4e5e1e3528d0..afec40d4a1f0 100644 --- a/pkg/kv/kvclient/kvcoord/transport_test.go +++ b/pkg/kv/kvclient/kvcoord/transport_test.go @@ -154,7 +154,7 @@ func TestResponseVerifyFailure(t *testing.T) { } ba := &kvpb.BatchRequest{} - req := kvpb.NewScan(roachpb.KeyMin, roachpb.KeyMax, kvpb.NonLocking) + req := kvpb.NewScan(roachpb.KeyMin, roachpb.KeyMax) ba.Add(req) br := ba.CreateReply() resp := br.Responses[0].GetInner().(*kvpb.ScanResponse) diff --git a/pkg/kv/kvclient/kvcoord/txn_coord_sender_savepoints_test.go b/pkg/kv/kvclient/kvcoord/txn_coord_sender_savepoints_test.go index 3c0ccce64341..a4ebf555c1e9 100644 --- a/pkg/kv/kvclient/kvcoord/txn_coord_sender_savepoints_test.go +++ b/pkg/kv/kvclient/kvcoord/txn_coord_sender_savepoints_test.go @@ -174,7 +174,7 @@ func TestSavepoints(t *testing.T) { case "get": b := txn.NewBatch() if td.HasArg("locking") { - b.GetForUpdate(td.CmdArgs[0].Key) + b.GetForUpdate(td.CmdArgs[0].Key, kvpb.BestEffort) } else { b.Get(td.CmdArgs[0].Key) } diff --git a/pkg/kv/kvclient/kvcoord/txn_coord_sender_test.go b/pkg/kv/kvclient/kvcoord/txn_coord_sender_test.go index 09f318436d38..86c0d8aed0ad 100644 --- a/pkg/kv/kvclient/kvcoord/txn_coord_sender_test.go +++ b/pkg/kv/kvclient/kvcoord/txn_coord_sender_test.go @@ -1139,7 +1139,9 @@ func TestTxnCoordSenderNoDuplicateLockSpans(t *testing.T) { txn := kv.NewTxn(ctx, db, 0 /* gatewayNodeID */) // Acquire locks on a-b, c, m, u-w before the final batch. 
- _, pErr := txn.ScanForShare(ctx, roachpb.Key("a"), roachpb.Key("b"), 0) + _, pErr := txn.ScanForShare( + ctx, roachpb.Key("a"), roachpb.Key("b"), 0, kvpb.GuaranteedDurability, + ) if pErr != nil { t.Fatal(pErr) } @@ -1147,7 +1149,7 @@ func TestTxnCoordSenderNoDuplicateLockSpans(t *testing.T) { if pErr != nil { t.Fatal(pErr) } - _, pErr = txn.GetForUpdate(ctx, roachpb.Key("m")) + _, pErr = txn.GetForUpdate(ctx, roachpb.Key("m"), kvpb.GuaranteedDurability) if pErr != nil { t.Fatal(pErr) } @@ -1161,8 +1163,8 @@ func TestTxnCoordSenderNoDuplicateLockSpans(t *testing.T) { b.Put(roachpb.Key("b"), []byte("value")) b.Put(roachpb.Key("c"), []byte("value")) b.Put(roachpb.Key("d"), []byte("value")) - b.GetForUpdate(roachpb.Key("n")) - b.ReverseScanForShare(roachpb.Key("v"), roachpb.Key("z")) + b.GetForUpdate(roachpb.Key("n"), kvpb.GuaranteedDurability) + b.ReverseScanForShare(roachpb.Key("v"), roachpb.Key("z"), kvpb.GuaranteedDurability) // The expected locks are a-b, c, m, n, and u-z. expectedLockSpans = []roachpb.Span{ @@ -1872,7 +1874,7 @@ func TestOnePCErrorTracking(t *testing.T) { } b := txn.NewBatch() b.Put(keyA, "test value") - b.ScanForUpdate(keyB, keyC) + b.ScanForUpdate(keyB, keyC, kvpb.BestEffort) if err := txn.CommitInBatch(ctx, b); !testutils.IsError(err, "injected err") { t.Fatal(err) } @@ -2150,7 +2152,7 @@ func TestEndWriteRestartReadOnlyTransaction(t *testing.T) { if write { err = txn.Put(ctx, "consider", "phlebas") } else /* locking read */ { - _, err = txn.ScanForUpdate(ctx, "a", "b", 0) + _, err = txn.ScanForUpdate(ctx, "a", "b", 0, kvpb.BestEffort) } if err == nil { t.Fatal("missing injected retriable error") @@ -2879,24 +2881,32 @@ func TestTxnTypeCompatibleWithBatchRequest(t *testing.T) { // A LeafTxn is not compatible with locking requests. // 1. Locking Get requests. - _, err = leafTxn.GetForUpdate(ctx, roachpb.Key("a")) + _, err = leafTxn.GetForUpdate(ctx, roachpb.Key("a"), kvpb.GuaranteedDurability) require.Error(t, err) require.Regexp(t, "LeafTxn .* incompatible with locking request .*", err) - _, err = leafTxn.GetForShare(ctx, roachpb.Key("a")) + _, err = leafTxn.GetForShare(ctx, roachpb.Key("a"), kvpb.GuaranteedDurability) require.Error(t, err) require.Regexp(t, "LeafTxn .* incompatible with locking request .*", err) // 2. Locking Scan requests. - _, err = leafTxn.ScanForUpdate(ctx, roachpb.Key("a"), roachpb.Key("d"), 0 /* maxRows */) + _, err = leafTxn.ScanForUpdate( + ctx, roachpb.Key("a"), roachpb.Key("d"), 0 /* maxRows */, kvpb.GuaranteedDurability, + ) require.Error(t, err) require.Regexp(t, "LeafTxn .* incompatible with locking request .*", err) - _, err = leafTxn.ScanForShare(ctx, roachpb.Key("a"), roachpb.Key("d"), 0 /* maxRows */) + _, err = leafTxn.ScanForShare( + ctx, roachpb.Key("a"), roachpb.Key("d"), 0 /* maxRows */, kvpb.GuaranteedDurability, + ) require.Error(t, err) require.Regexp(t, "LeafTxn .* incompatible with locking request .*", err) // 3. Locking ReverseScan requests. 
- _, err = leafTxn.ReverseScanForUpdate(ctx, roachpb.Key("a"), roachpb.Key("d"), 0 /* maxRows */) + _, err = leafTxn.ReverseScanForUpdate( + ctx, roachpb.Key("a"), roachpb.Key("d"), 0 /* maxRows */, kvpb.GuaranteedDurability, + ) require.Error(t, err) require.Regexp(t, "LeafTxn .* incompatible with locking request .*", err) - _, err = leafTxn.ReverseScanForShare(ctx, roachpb.Key("a"), roachpb.Key("d"), 0 /* maxRows */) + _, err = leafTxn.ReverseScanForShare( + ctx, roachpb.Key("a"), roachpb.Key("d"), 0 /* maxRows */, kvpb.GuaranteedDurability, + ) require.Error(t, err) require.Regexp(t, "LeafTxn .* incompatible with locking request .*", err) // 4. Writes. @@ -2913,23 +2923,31 @@ func TestTxnTypeCompatibleWithBatchRequest(t *testing.T) { // A RootTxn is compatible with all requests. // 1. All types of Get requests. - _, err = rootTxn.GetForUpdate(ctx, roachpb.Key("a")) + _, err = rootTxn.GetForUpdate(ctx, roachpb.Key("a"), kvpb.GuaranteedDurability) require.NoError(t, err) - _, err = rootTxn.GetForShare(ctx, roachpb.Key("a")) + _, err = rootTxn.GetForShare(ctx, roachpb.Key("a"), kvpb.GuaranteedDurability) require.NoError(t, err) _, err = rootTxn.Get(ctx, roachpb.Key("a")) require.NoError(t, err) // 2. All types of Scan requests. - _, err = rootTxn.ScanForUpdate(ctx, roachpb.Key("a"), roachpb.Key("d"), 0 /* maxRows */) + _, err = rootTxn.ScanForUpdate( + ctx, roachpb.Key("a"), roachpb.Key("d"), 0 /* maxRows */, kvpb.GuaranteedDurability, + ) require.NoError(t, err) - _, err = rootTxn.ScanForShare(ctx, roachpb.Key("a"), roachpb.Key("d"), 0 /* maxRows */) + _, err = rootTxn.ScanForShare( + ctx, roachpb.Key("a"), roachpb.Key("d"), 0 /* maxRows */, kvpb.GuaranteedDurability, + ) require.NoError(t, err) _, err = rootTxn.Scan(ctx, roachpb.Key("a"), roachpb.Key("d"), 0 /* maxRows */) require.NoError(t, err) - // 3. All types of ReverseScan requets. - _, err = rootTxn.ReverseScanForUpdate(ctx, roachpb.Key("a"), roachpb.Key("d"), 0 /* maxRows */) + // 3. All types of ReverseScan requests. + _, err = rootTxn.ReverseScanForUpdate( + ctx, roachpb.Key("a"), roachpb.Key("d"), 0 /* maxRows */, kvpb.GuaranteedDurability, + ) require.NoError(t, err) - _, err = rootTxn.ReverseScanForShare(ctx, roachpb.Key("a"), roachpb.Key("d"), 0 /* maxRows */) + _, err = rootTxn.ReverseScanForShare( + ctx, roachpb.Key("a"), roachpb.Key("d"), 0 /* maxRows */, kvpb.GuaranteedDurability, + ) require.NoError(t, err) _, err = rootTxn.ReverseScan(ctx, roachpb.Key("a"), roachpb.Key("d"), 0 /* maxRows */) require.NoError(t, err) diff --git a/pkg/kv/kvclient/kvstreamer/streamer_test.go b/pkg/kv/kvclient/kvstreamer/streamer_test.go index 4889fbf5b44d..d479e51297cf 100644 --- a/pkg/kv/kvclient/kvstreamer/streamer_test.go +++ b/pkg/kv/kvclient/kvstreamer/streamer_test.go @@ -90,7 +90,7 @@ func TestStreamerLimitations(t *testing.T) { defer streamer.Close(ctx) streamer.Init(kvstreamer.OutOfOrder, kvstreamer.Hints{UniqueRequests: true}, 1 /* maxKeysPerRow */, nil /* diskBuffer */) k := append(s.Codec().TenantPrefix(), roachpb.Key("key")...) 
- get := kvpb.NewGet(k, kvpb.NonLocking) + get := kvpb.NewGet(k) reqs := []kvpb.RequestUnion{{ Value: &kvpb.RequestUnion_Get{ Get: get.(*kvpb.GetRequest), diff --git a/pkg/kv/kvnemesis/applier.go b/pkg/kv/kvnemesis/applier.go index 56799bd7a34c..ba7326bb393b 100644 --- a/pkg/kv/kvnemesis/applier.go +++ b/pkg/kv/kvnemesis/applier.go @@ -237,15 +237,15 @@ type dbRunI interface { type clientI interface { dbRunI Get(context.Context, interface{}) (kv.KeyValue, error) - GetForUpdate(context.Context, interface{}) (kv.KeyValue, error) - GetForShare(context.Context, interface{}) (kv.KeyValue, error) + GetForUpdate(context.Context, interface{}, kvpb.KeyLockingDurabilityType) (kv.KeyValue, error) + GetForShare(context.Context, interface{}, kvpb.KeyLockingDurabilityType) (kv.KeyValue, error) Put(context.Context, interface{}, interface{}) error Scan(context.Context, interface{}, interface{}, int64) ([]kv.KeyValue, error) - ScanForUpdate(context.Context, interface{}, interface{}, int64) ([]kv.KeyValue, error) - ScanForShare(context.Context, interface{}, interface{}, int64) ([]kv.KeyValue, error) + ScanForUpdate(context.Context, interface{}, interface{}, int64, kvpb.KeyLockingDurabilityType) ([]kv.KeyValue, error) + ScanForShare(context.Context, interface{}, interface{}, int64, kvpb.KeyLockingDurabilityType) ([]kv.KeyValue, error) ReverseScan(context.Context, interface{}, interface{}, int64) ([]kv.KeyValue, error) - ReverseScanForUpdate(context.Context, interface{}, interface{}, int64) ([]kv.KeyValue, error) - ReverseScanForShare(context.Context, interface{}, interface{}, int64) ([]kv.KeyValue, error) + ReverseScanForUpdate(context.Context, interface{}, interface{}, int64, kvpb.KeyLockingDurabilityType) ([]kv.KeyValue, error) + ReverseScanForShare(context.Context, interface{}, interface{}, int64, kvpb.KeyLockingDurabilityType) ([]kv.KeyValue, error) Del(context.Context, ...interface{}) ([]roachpb.Key, error) DelRange(context.Context, interface{}, interface{}, bool) ([]roachpb.Key, error) } @@ -280,18 +280,18 @@ func batchRun( func applyClientOp(ctx context.Context, db clientI, op *Operation, inTxn bool) { switch o := op.GetValue().(type) { case *GetOperation: - fn := (*kv.Batch).Get - if o.ForUpdate { - fn = (*kv.Batch).GetForUpdate - } - if o.ForShare { - fn = (*kv.Batch).GetForShare - } res, ts, err := dbRunWithResultAndTimestamp(ctx, db, func(b *kv.Batch) { if o.SkipLocked { b.Header.WaitPolicy = lock.WaitPolicy_SkipLocked } - fn(b, o.Key) + dur := kvpb.BestEffort + if o.ForUpdate { + b.GetForUpdate(o.Key, dur) + } else if o.ForShare { + b.GetForShare(o.Key, dur) + } else { + b.Get(o.Key) + } }) o.Result = resultInit(ctx, err) if err != nil { @@ -316,29 +316,28 @@ func applyClientOp(ctx context.Context, db clientI, op *Operation, inTxn bool) { } o.Result.OptionalTimestamp = ts case *ScanOperation: - fn := (*kv.Batch).Scan - if o.Reverse { - if o.ForUpdate { - fn = (*kv.Batch).ReverseScanForUpdate - } else if o.ForShare { - fn = (*kv.Batch).ReverseScanForShare - } else { - fn = (*kv.Batch).ReverseScan - } - } else { - if o.ForUpdate { - fn = (*kv.Batch).ScanForUpdate - } else if o.ForShare { - fn = (*kv.Batch).ScanForShare - } else { - fn = (*kv.Batch).Scan - } - } res, ts, err := dbRunWithResultAndTimestamp(ctx, db, func(b *kv.Batch) { if o.SkipLocked { b.Header.WaitPolicy = lock.WaitPolicy_SkipLocked } - fn(b, o.Key, o.EndKey) + dur := kvpb.BestEffort + if o.Reverse { + if o.ForUpdate { + b.ReverseScanForUpdate(o.Key, o.EndKey, dur) + } else if o.ForShare { + b.ReverseScanForShare(o.Key, o.EndKey, 
dur) + } else { + b.ReverseScan(o.Key, o.EndKey) + } + } else { + if o.ForUpdate { + b.ScanForUpdate(o.Key, o.EndKey, dur) + } else if o.ForShare { + b.ScanForShare(o.Key, o.EndKey, dur) + } else { + b.Scan(o.Key, o.EndKey) + } + } }) o.Result = resultInit(ctx, err) if err != nil { @@ -443,8 +442,10 @@ func applyBatchOp( for i := range o.Ops { switch subO := o.Ops[i].GetValue().(type) { case *GetOperation: + // TODO(arul): Looks like I forgot to add shared locks here. + dur := kvpb.BestEffort if subO.ForUpdate { - b.GetForUpdate(subO.Key) + b.GetForUpdate(subO.Key, dur) } else { b.Get(subO.Key) } @@ -452,12 +453,13 @@ func applyBatchOp( b.Put(subO.Key, subO.Value()) setLastReqSeq(b, subO.Seq) case *ScanOperation: + dur := kvpb.BestEffort if subO.Reverse && subO.ForUpdate { - b.ReverseScanForUpdate(subO.Key, subO.EndKey) + b.ReverseScanForUpdate(subO.Key, subO.EndKey, dur) } else if subO.Reverse { b.ReverseScan(subO.Key, subO.EndKey) } else if subO.ForUpdate { - b.ScanForUpdate(subO.Key, subO.EndKey) + b.ScanForUpdate(subO.Key, subO.EndKey, dur) } else { b.Scan(subO.Key, subO.EndKey) } diff --git a/pkg/kv/kvpb/api.go b/pkg/kv/kvpb/api.go index 9c975999a36b..f8efbc409895 100644 --- a/pkg/kv/kvpb/api.go +++ b/pkg/kv/kvpb/api.go @@ -1189,15 +1189,29 @@ func (r *IsSpanEmptyRequest) ShallowCopy() Request { return &shallowCopy } -// NewGet returns a Request initialized to get the value at key. If -// forUpdate is true, an unreplicated, exclusive lock is acquired on on -// the key, if it exists. -func NewGet(key roachpb.Key, str KeyLockingStrengthType) Request { +// NewLockingGet returns a Request initialized to get the value at key. A lock +// corresponding to the supplied lock strength and durability is acquired on the +// key, if it exists. +func NewLockingGet( + key roachpb.Key, str KeyLockingStrengthType, dur KeyLockingDurabilityType, +) Request { return &GetRequest{ RequestHeader: RequestHeader{ Key: key, }, - KeyLockingStrength: scanLockStrength(str), + KeyLockingStrength: scanLockStrength(str), + KeyLockingDurability: scanLockDurability(dur), + } +} + +// NewGet returns a Request initialized to get the value at key. No lock is +// acquired on the key, even if it exists. +func NewGet(key roachpb.Key) Request { + return &GetRequest{ + RequestHeader: RequestHeader{ + Key: key, + }, + KeyLockingStrength: lock.None, } } @@ -1314,29 +1328,59 @@ func NewDeleteRange(startKey, endKey roachpb.Key, returnKeys bool) Request { } } -// NewScan returns a Request initialized to scan from start to end keys. -// If forUpdate is true, unreplicated, exclusive locks are acquired on -// each of the resulting keys. -func NewScan(key, endKey roachpb.Key, str KeyLockingStrengthType) Request { +// NewLockingScan returns a Request initialized to scan from start to end keys. +// A lock corresponding to the supplied lock strength and durability will be +// acquired on each of the resulting keys. +func NewLockingScan( + key, endKey roachpb.Key, str KeyLockingStrengthType, dur KeyLockingDurabilityType, +) Request { + return &ScanRequest{ + RequestHeader: RequestHeader{ + Key: key, + EndKey: endKey, + }, + KeyLockingStrength: scanLockStrength(str), + KeyLockingDurability: scanLockDurability(dur), + } +} + +// NewScan returns a Request initialized to scan from start to end keys. No +// locks will be acquired on the resulting keys. 
+func NewScan(key, endKey roachpb.Key) Request { return &ScanRequest{ RequestHeader: RequestHeader{ Key: key, EndKey: endKey, }, - KeyLockingStrength: scanLockStrength(str), + KeyLockingStrength: lock.None, + } +} + +// NewLockingReverseScan returns a Request initialized to reverse scan from end. +// A lock corresponding to the supplied lock strength and durability will be +// acquired on each of the resulting keys. +func NewLockingReverseScan( + key, endKey roachpb.Key, str KeyLockingStrengthType, dur KeyLockingDurabilityType, +) Request { + return &ReverseScanRequest{ + RequestHeader: RequestHeader{ + Key: key, + EndKey: endKey, + }, + KeyLockingStrength: scanLockStrength(str), + KeyLockingDurability: scanLockDurability(dur), } } -// NewReverseScan returns a Request initialized to reverse scan from end. -// If forUpdate is true, unreplicated, exclusive locks are acquired on -// each of the resulting keys. -func NewReverseScan(key, endKey roachpb.Key, str KeyLockingStrengthType) Request { +// NewReverseScan returns a Request initialized to reverse scan from end. No +// locks will be acquired on each of the resulting keys. +func NewReverseScan(key, endKey roachpb.Key) Request { return &ReverseScanRequest{ RequestHeader: RequestHeader{ Key: key, EndKey: endKey, }, - KeyLockingStrength: scanLockStrength(str), + KeyLockingStrength: lock.None, } } @@ -1362,10 +1406,40 @@ const ( ForUpdate ) +// KeyLockingDurabilityType is used to describe the durability goals of per-key +// locks acquired by locking Get, Scan, and ReverseScan requests. +type KeyLockingDurabilityType int8 + +const ( + // Invalid is meant to be used in places where supplying lock durability does + // not make sense. Notably, it's used by non-locking requests. + Invalid KeyLockingDurabilityType = iota + // BestEffort makes a best-effort attempt to hold any locks acquired until + // commit time. Locks are held in-memory on the leaseholder of the locked key; + // locks may be lost because of things like lease transfers, node restarts, + // range splits, and range merges. However, the unreplicated nature of these + // locks makes lock acquisition fast, making this a great choice for + // transactions that do not require locks for correctness -- serializable + // transactions. + BestEffort + // GuaranteedDurability guarantees that if the transaction commits then any + // locks acquired by it will be held until commit time. On commit, once the + // locks are released, no subsequent writers will be able to write at or below + // the transaction's commit timestamp -- regardless of the timestamp at which + // the lock was acquired. Simply put, once acquired, locks are guaranteed to + // provide protection until commit time. + // + // To provide this guarantee, locks are replicated -- which means they come + // with the performance penalties of doing so. They're attractive choices for + // transactions that require locks for correctness (read: read-committed, + // snapshot isolation). 
+ GuaranteedDurability +) + func scanLockStrength(str KeyLockingStrengthType) lock.Strength { switch str { case NonLocking: - return lock.None + panic(fmt.Sprintf("unexpected strength %d", str)) case ForShare: return lock.Shared case ForUpdate: @@ -1375,6 +1449,19 @@ func scanLockStrength(str KeyLockingStrengthType) lock.Strength { } } +func scanLockDurability(dur KeyLockingDurabilityType) lock.Durability { + switch dur { + case Invalid: + panic("invalid lock durability") + case BestEffort: + return lock.Unreplicated + case GuaranteedDurability: + return lock.Replicated + default: + panic(fmt.Sprintf("unknown durability type: %d", dur)) + } +} + func flagForLockStrength(l lock.Strength) flag { if l != lock.None { return isLocking diff --git a/pkg/kv/kvpb/batch_test.go b/pkg/kv/kvpb/batch_test.go index 13b7d34e6770..a835f01cc4f5 100644 --- a/pkg/kv/kvpb/batch_test.go +++ b/pkg/kv/kvpb/batch_test.go @@ -344,9 +344,9 @@ func TestRefreshSpanIterate(t *testing.T) { func TestRefreshSpanIterateSkipLocked(t *testing.T) { ba := BatchRequest{} - ba.Add(NewGet(roachpb.Key("a"), NonLocking)) - ba.Add(NewScan(roachpb.Key("b"), roachpb.Key("d"), NonLocking)) - ba.Add(NewReverseScan(roachpb.Key("e"), roachpb.Key("g"), NonLocking)) + ba.Add(NewGet(roachpb.Key("a"))) + ba.Add(NewScan(roachpb.Key("b"), roachpb.Key("d"))) + ba.Add(NewReverseScan(roachpb.Key("e"), roachpb.Key("g"))) br := ba.CreateReply() // Without a SkipLocked wait policy. diff --git a/pkg/kv/kvserver/client_replica_circuit_breaker_test.go b/pkg/kv/kvserver/client_replica_circuit_breaker_test.go index 32572389bb01..ae8e526f7235 100644 --- a/pkg/kv/kvserver/client_replica_circuit_breaker_test.go +++ b/pkg/kv/kvserver/client_replica_circuit_breaker_test.go @@ -1069,14 +1069,14 @@ func (cbt *circuitBreakerTest) Write(idx int) error { func (cbt *circuitBreakerTest) Read(idx int) error { cbt.t.Helper() repl := cbt.repls[idx] - get := kvpb.NewGet(repl.Desc().StartKey.AsRawKey(), kvpb.NonLocking) + get := kvpb.NewGet(repl.Desc().StartKey.AsRawKey()) return cbt.Send(idx, get) } func (cbt *circuitBreakerTest) FollowerRead(idx int) error { cbt.t.Helper() repl := cbt.repls[idx] - get := kvpb.NewGet(repl.Desc().StartKey.AsRawKey(), kvpb.NonLocking) + get := kvpb.NewGet(repl.Desc().StartKey.AsRawKey()) ctx := context.Background() ts := repl.GetCurrentClosedTimestamp(ctx) return cbt.SendCtxTS(ctx, idx, get, ts) diff --git a/pkg/kv/kvserver/client_replica_test.go b/pkg/kv/kvserver/client_replica_test.go index 5911155898e0..f474930c7df0 100644 --- a/pkg/kv/kvserver/client_replica_test.go +++ b/pkg/kv/kvserver/client_replica_test.go @@ -4983,7 +4983,7 @@ func TestOptimisticEvalRetry(t *testing.T) { defer s.Stopper().Stop(ctx) txn1 := db.NewTxn(ctx, "locking txn") - _, err := txn1.ScanForUpdate(ctx, "a", "c", 0) + _, err := txn1.ScanForUpdate(ctx, "a", "c", 0, kvpb.BestEffort) require.NoError(t, err) readDone := make(chan error) @@ -5038,7 +5038,7 @@ func TestOptimisticEvalNoContention(t *testing.T) { defer s.Stopper().Stop(ctx) txn1 := db.NewTxn(ctx, "locking txn") - _, err := txn1.ScanForUpdate(ctx, "b", "c", 0) + _, err := txn1.ScanForUpdate(ctx, "b", "c", 0, kvpb.BestEffort) require.NoError(t, err) readDone := make(chan error) @@ -5165,7 +5165,7 @@ func BenchmarkOptimisticEvalForLocks(b *testing.B) { go func() { for { txn := db.NewTxn(ctx, "locking txn") - _, err = txn.ScanForUpdate(ctx, lockStart, "c", 0) + _, err = txn.ScanForUpdate(ctx, lockStart, "c", 0, kvpb.BestEffort) require.NoError(b, err) time.Sleep(5 * time.Millisecond) // Normally, it would 
do a write here, but we don't bother. diff --git a/pkg/kv/kvserver/client_split_test.go b/pkg/kv/kvserver/client_split_test.go index b2f440781278..5bff9fb4d475 100644 --- a/pkg/kv/kvserver/client_split_test.go +++ b/pkg/kv/kvserver/client_split_test.go @@ -448,12 +448,12 @@ func TestQueryLocksAcrossRanges(t *testing.T) { txn1 := kv.NewTxn(ctx, db, s.NodeID()) err = txn1.Put(ctx, roachpb.Key("c"), []byte("baz")) require.NoError(t, err) - _, err = txn1.GetForUpdate(ctx, roachpb.Key("x")) + _, err = txn1.GetForUpdate(ctx, roachpb.Key("x"), kvpb.BestEffort) require.NoError(t, err) // Use txn2 to get an unreplicated lock on "p". txn2 := kv.NewTxn(ctx, db, s.NodeID()) - _, err = txn2.GetForUpdate(ctx, roachpb.Key("p")) + _, err = txn2.GetForUpdate(ctx, roachpb.Key("p"), kvpb.BestEffort) require.NoError(t, err) now := s.Clock().NowAsClockTimestamp() diff --git a/pkg/kv/kvserver/replica_command.go b/pkg/kv/kvserver/replica_command.go index c45c41f1bf90..0dea06bc0066 100644 --- a/pkg/kv/kvserver/replica_command.go +++ b/pkg/kv/kvserver/replica_command.go @@ -640,7 +640,7 @@ func (r *Replica) AdminMerge( // shortly. var rightDesc roachpb.RangeDescriptor rightDescKey := keys.RangeDescriptorKey(origLeftDesc.EndKey) - dbRightDescKV, err := txn.GetForUpdate(ctx, rightDescKey) + dbRightDescKV, err := txn.GetForUpdate(ctx, rightDescKey, kvpb.BestEffort) if err != nil { return err } @@ -3290,12 +3290,13 @@ func conditionalGetDescValueFromDB( forUpdate bool, check func(*roachpb.RangeDescriptor) (matched, skip bool), ) (kvDesc *roachpb.RangeDescriptor, kvDescBytes []byte, skip bool, err error) { - get := txn.Get + descKey := keys.RangeDescriptorKey(startKey) + var existingDescKV kv.KeyValue if forUpdate { - get = txn.GetForUpdate + existingDescKV, err = txn.GetForUpdate(ctx, descKey, kvpb.BestEffort) + } else { + existingDescKV, err = txn.Get(ctx, descKey) } - descKey := keys.RangeDescriptorKey(startKey) - existingDescKV, err := get(ctx, descKey) if err != nil { return nil, nil, false /* skip */, errors.Wrap(err, "fetching current range descriptor value") } diff --git a/pkg/kv/kvserver/replica_rankings_test.go b/pkg/kv/kvserver/replica_rankings_test.go index 4562742cc2d8..c99e0a7e1ed2 100644 --- a/pkg/kv/kvserver/replica_rankings_test.go +++ b/pkg/kv/kvserver/replica_rankings_test.go @@ -199,7 +199,7 @@ func TestAddSSTQPSStat(t *testing.T) { // genVariableRead returns a batch request containing, start-end sequential key reads. func genVariableRead(ctx context.Context, start, end roachpb.Key) *kvpb.BatchRequest { - scan := kvpb.NewScan(start, end, kvpb.NonLocking) + scan := kvpb.NewScan(start, end) readBa := &kvpb.BatchRequest{} readBa.Add(scan) return readBa diff --git a/pkg/kv/kvserver/replica_test.go b/pkg/kv/kvserver/replica_test.go index 0ed77816bd38..4c202827b389 100644 --- a/pkg/kv/kvserver/replica_test.go +++ b/pkg/kv/kvserver/replica_test.go @@ -11085,7 +11085,7 @@ func TestReplicaNotifyLockTableOn1PC(t *testing.T) { txn := newTransaction("test", key, 1, tc.Clock()) ba := &kvpb.BatchRequest{} ba.Header = kvpb.Header{Txn: txn} - ba.Add(kvpb.NewScan(key, key.Next(), kvpb.ForUpdate)) + ba.Add(kvpb.NewLockingScan(key, key.Next(), kvpb.ForUpdate, kvpb.BestEffort)) if _, pErr := tc.Sender().Send(ctx, ba); pErr != nil { t.Fatalf("unexpected error: %s", pErr) } @@ -11182,9 +11182,9 @@ func TestReplicaAsyncIntentResolutionOn1PC(t *testing.T) { // Perform one or more "for update" gets. This should acquire unreplicated, // exclusive locks on the keys. 
 	b := txn.NewBatch()
-	b.GetForUpdate(keyA)
+	b.GetForUpdate(keyA, kvpb.BestEffort)
 	if external {
-		b.GetForUpdate(keyB)
+		b.GetForUpdate(keyB, kvpb.BestEffort)
 	}
 	err = txn.Run(ctx, b)
 	require.NoError(t, err)
@@ -11244,7 +11244,7 @@ func TestReplicaQueryLocks(t *testing.T) {
 	txn := newTransaction("test", keyA, 1, tc.Clock())
 	ba := &kvpb.BatchRequest{}
 	ba.Header = kvpb.Header{Txn: txn}
-	ba.Add(kvpb.NewScan(keyA, keyB.Next(), kvpb.ForUpdate))
+	ba.Add(kvpb.NewLockingScan(keyA, keyB.Next(), kvpb.ForUpdate, kvpb.BestEffort))
 	if _, pErr := tc.Sender().Send(ctx, ba); pErr != nil {
 		t.Fatalf("unexpected error: %s", pErr)
 	}
diff --git a/pkg/kv/txn.go b/pkg/kv/txn.go
index d9c3551adf32..50bd3bd45de0 100644
--- a/pkg/kv/txn.go
+++ b/pkg/kv/txn.go
@@ -473,30 +473,35 @@ func (txn *Txn) Get(ctx context.Context, key interface{}) (KeyValue, error) {
 }
 
 // GetForUpdate retrieves the value for a key, returning the retrieved key/value
-// or an error. An unreplicated, exclusive lock is acquired on the key, if it
-// exists. It is not considered an error for the key to not exist.
+// or an error. An Exclusive lock with the supplied durability is acquired on
+// the key, if it exists. It is not considered an error for the key not to
+// exist.
 //
 //	r, err := txn.GetForUpdate("a")
 //	// string(r.Key) == "a"
 //
 // key can be either a byte slice or a string.
-func (txn *Txn) GetForUpdate(ctx context.Context, key interface{}) (KeyValue, error) {
+func (txn *Txn) GetForUpdate(
+	ctx context.Context, key interface{}, dur kvpb.KeyLockingDurabilityType,
+) (KeyValue, error) {
 	b := txn.NewBatch()
-	b.GetForUpdate(key)
+	b.GetForUpdate(key, dur)
 	return getOneRow(txn.Run(ctx, b), b)
 }
 
-// GetForShare retrieves the value for a key, returning the retrieved key/value
-// or an error. An unreplicated, shared lock is acquired on the key, if it
-// exists. It is not considered an error for the key to not exist.
+// GetForShare retrieves the value for a key. A shared lock with the supplied
+// durability is acquired on the key, if it exists. It is not considered an
+// error for the key not to exist.
 //
 //	r, err := txn.GetForShare("a")
 //	// string(r.Key) == "a"
 //
 // key can be either a byte slice or a string.
-func (txn *Txn) GetForShare(ctx context.Context, key interface{}) (KeyValue, error) {
+func (txn *Txn) GetForShare(
+	ctx context.Context, key interface{}, dur kvpb.KeyLockingDurabilityType,
+) (KeyValue, error) {
 	b := txn.NewBatch()
-	b.GetForShare(key)
+	b.GetForShare(key, dur)
 	return getOneRow(txn.Run(ctx, b), b)
 }
 
@@ -595,12 +600,13 @@ func (txn *Txn) scan(
 	maxRows int64,
 	isReverse bool,
 	str kvpb.KeyLockingStrengthType,
+	dur kvpb.KeyLockingDurabilityType,
 ) ([]KeyValue, error) {
 	b := txn.NewBatch()
 	if maxRows > 0 {
 		b.Header.MaxSpanRequestKeys = maxRows
 	}
-	b.scan(begin, end, isReverse, str)
+	b.scan(begin, end, isReverse, str, dur)
 	r, err := getOneResult(txn.Run(ctx, b), b)
 	return r.Rows, err
 }
@@ -615,35 +621,35 @@ func (txn *Txn) scan(
 func (txn *Txn) Scan(
 	ctx context.Context, begin, end interface{}, maxRows int64,
 ) ([]KeyValue, error) {
-	return txn.scan(ctx, begin, end, maxRows, false /* isReverse */, kvpb.NonLocking)
+	return txn.scan(ctx, begin, end, maxRows, false /* isReverse */, kvpb.NonLocking, kvpb.Invalid)
 }
 
 // ScanForUpdate retrieves the rows between begin (inclusive) and end
-// (exclusive) in ascending order. Unreplicated, exclusive locks are acquired on
-// each of the returned keys.
+// (exclusive) in ascending order. Exclusive locks with the supplied durability
+// are acquired on each of the returned keys.
 //
 // The returned []KeyValue will contain up to maxRows elements (or all results
 // when zero is supplied).
 //
 // key can be either a byte slice or a string.
 func (txn *Txn) ScanForUpdate(
-	ctx context.Context, begin, end interface{}, maxRows int64,
+	ctx context.Context, begin, end interface{}, maxRows int64, dur kvpb.KeyLockingDurabilityType,
 ) ([]KeyValue, error) {
-	return txn.scan(ctx, begin, end, maxRows, false /* isReverse */, kvpb.ForUpdate)
+	return txn.scan(ctx, begin, end, maxRows, false /* isReverse */, kvpb.ForUpdate, dur)
 }
 
-// ScanForShare retrieves the rows between begin (inclusive) and end
-// (exclusive) in ascending order. Unreplicated, shared locks are acquired on
-// each of the returned keys.
+// ScanForShare retrieves the rows between begin (inclusive) and end (exclusive)
+// in ascending order. Shared locks with the supplied durability are acquired
+// on each of the returned keys.
 //
 // The returned []KeyValue will contain up to maxRows elements (or all results
 // when zero is supplied).
 //
 // key can be either a byte slice or a string.
 func (txn *Txn) ScanForShare(
-	ctx context.Context, begin, end interface{}, maxRows int64,
+	ctx context.Context, begin, end interface{}, maxRows int64, dur kvpb.KeyLockingDurabilityType,
 ) ([]KeyValue, error) {
-	return txn.scan(ctx, begin, end, maxRows, false /* isReverse */, kvpb.ForShare)
+	return txn.scan(ctx, begin, end, maxRows, false /* isReverse */, kvpb.ForShare, dur)
 }
 
 // ReverseScan retrieves the rows between begin (inclusive) and end (exclusive)
@@ -656,35 +662,35 @@ func (txn *Txn) ScanForShare(
 func (txn *Txn) ReverseScan(
 	ctx context.Context, begin, end interface{}, maxRows int64,
 ) ([]KeyValue, error) {
-	return txn.scan(ctx, begin, end, maxRows, true /* isReverse */, kvpb.NonLocking)
+	return txn.scan(ctx, begin, end, maxRows, true /* isReverse */, kvpb.NonLocking, kvpb.Invalid)
 }
 
 // ReverseScanForUpdate retrieves the rows between begin (inclusive) and end
-// (exclusive) in descending order. Unreplicated, exclusive locks are acquired
-// on each of the returned keys.
+// (exclusive) in descending order. Exclusive locks with the supplied durability
+// are acquired on each of the returned keys.
 //
 // The returned []KeyValue will contain up to maxRows elements (or all results
 // when zero is supplied).
 //
 // key can be either a byte slice or a string.
 func (txn *Txn) ReverseScanForUpdate(
-	ctx context.Context, begin, end interface{}, maxRows int64,
+	ctx context.Context, begin, end interface{}, maxRows int64, dur kvpb.KeyLockingDurabilityType,
 ) ([]KeyValue, error) {
-	return txn.scan(ctx, begin, end, maxRows, true /* isReverse */, kvpb.ForUpdate)
+	return txn.scan(ctx, begin, end, maxRows, true /* isReverse */, kvpb.ForUpdate, dur)
 }
 
 // ReverseScanForShare retrieves the rows between begin (inclusive) and end
-// (exclusive) in descending order. Unreplicated, shared locks are acquired
-// on each of the returned keys.
+// (exclusive) in descending order. Shared locks with the supplied durability
+// are acquired on each of the returned keys.
 //
 // The returned []KeyValue will contain up to maxRows elements (or all results
 // when zero is supplied).
 //
 // key can be either a byte slice or a string.
func (txn *Txn) ReverseScanForShare( - ctx context.Context, begin, end interface{}, maxRows int64, + ctx context.Context, begin, end interface{}, maxRows int64, dur kvpb.KeyLockingDurabilityType, ) ([]KeyValue, error) { - return txn.scan(ctx, begin, end, maxRows, true /* isReverse */, kvpb.ForShare) + return txn.scan(ctx, begin, end, maxRows, true /* isReverse */, kvpb.ForShare, dur) } // Iterate performs a paginated scan and applying the function f to every page. diff --git a/pkg/kv/txn_test.go b/pkg/kv/txn_test.go index c28d5fad939a..2566d31ad815 100644 --- a/pkg/kv/txn_test.go +++ b/pkg/kv/txn_test.go @@ -574,7 +574,7 @@ func TestTxnNegotiateAndSend(t *testing.T) { MinTimestampBound: ts10, } ba.RoutingPolicy = kvpb.RoutingPolicy_NEAREST - ba.Add(kvpb.NewGet(roachpb.Key("a"), kvpb.NonLocking)) + ba.Add(kvpb.NewGet(roachpb.Key("a"))) br, pErr := txn.NegotiateAndSend(ctx, ba) if fastPath { @@ -685,7 +685,7 @@ func TestTxnNegotiateAndSendWithDeadline(t *testing.T) { MaxTimestampBound: test.maxTSBound, } ba.RoutingPolicy = kvpb.RoutingPolicy_NEAREST - ba.Add(kvpb.NewGet(roachpb.Key("a"), kvpb.NonLocking)) + ba.Add(kvpb.NewGet(roachpb.Key("a"))) br, pErr := txn.NegotiateAndSend(ctx, ba) if test.expErr == "" { @@ -755,7 +755,7 @@ func TestTxnNegotiateAndSendWithResumeSpan(t *testing.T) { } ba.RoutingPolicy = kvpb.RoutingPolicy_NEAREST ba.MaxSpanRequestKeys = 2 - ba.Add(kvpb.NewScan(roachpb.Key("a"), roachpb.Key("d"), kvpb.NonLocking)) + ba.Add(kvpb.NewScan(roachpb.Key("a"), roachpb.Key("d"))) br, pErr := txn.NegotiateAndSend(ctx, ba) if fastPath { diff --git a/pkg/server/node_test.go b/pkg/server/node_test.go index 840a425d46ce..7cad18cbe5ed 100644 --- a/pkg/server/node_test.go +++ b/pkg/server/node_test.go @@ -713,7 +713,7 @@ func TestNodeBatchRequestPProfLabels(t *testing.T) { return labels }() - gr := kvpb.NewGet(roachpb.Key("a"), kvpb.NonLocking) + gr := kvpb.NewGet(roachpb.Key("a")) pr := kvpb.NewPut(gr.Header().Key, roachpb.Value{}) ba.Add(gr, pr) @@ -748,7 +748,7 @@ func TestNodeBatchRequestMetricsInc(t *testing.T) { ba.RangeID = 1 ba.Replica.StoreID = 1 - gr := kvpb.NewGet(roachpb.Key("a"), kvpb.NonLocking) + gr := kvpb.NewGet(roachpb.Key("a")) pr := kvpb.NewPut(gr.Header().Key, roachpb.Value{}) ba.Add(gr, pr) diff --git a/pkg/server/systemconfigwatcher/system_config_watcher_test.go b/pkg/server/systemconfigwatcher/system_config_watcher_test.go index 0b0e056a801e..41e1c7c68703 100644 --- a/pkg/server/systemconfigwatcher/system_config_watcher_test.go +++ b/pkg/server/systemconfigwatcher/system_config_watcher_test.go @@ -131,7 +131,6 @@ func getSystemDescriptorAndZonesSpans( kvpb.NewScan( append(codec.TenantPrefix(), startKey...), append(codec.TenantPrefix(), endKey...), - kvpb.NonLocking, ), ) br, pErr := kvDB.NonTransactionalSender().Send(ctx, ba) diff --git a/pkg/sql/crdb_internal.go b/pkg/sql/crdb_internal.go index c4a43e632482..121874b457a7 100644 --- a/pkg/sql/crdb_internal.go +++ b/pkg/sql/crdb_internal.go @@ -6394,7 +6394,7 @@ CREATE TABLE crdb_internal.lost_descriptors_with_data ( endPrefix := p.extendedEvalCtx.Codec.TablePrefix(uint32(endID - 1)).PrefixEnd() b := p.Txn().NewBatch() b.Header.MaxSpanRequestKeys = 1 - scanRequest := kvpb.NewScan(startPrefix, endPrefix, kvpb.NonLocking).(*kvpb.ScanRequest) + scanRequest := kvpb.NewScan(startPrefix, endPrefix).(*kvpb.ScanRequest) scanRequest.ScanFormat = kvpb.BATCH_RESPONSE b.AddRawRequest(scanRequest) err = p.execCfg.DB.Run(ctx, b) diff --git a/pkg/sql/tests/kv_test.go b/pkg/sql/tests/kv_test.go index ebc1573d58f7..822306b3015e 
100644 --- a/pkg/sql/tests/kv_test.go +++ b/pkg/sql/tests/kv_test.go @@ -24,6 +24,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/base" "github.com/cockroachdb/cockroach/pkg/clusterversion" kv2 "github.com/cockroachdb/cockroach/pkg/kv" + "github.com/cockroachdb/cockroach/pkg/kv/kvpb" "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/storage" "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" @@ -105,7 +106,7 @@ func (kv *kvNative) Update(rows, run int) error { // Don't permute the rows, to be similar to SQL which sorts the spans in a // batch. for i := 0; i < rows; i++ { - b.GetForUpdate(fmt.Sprintf("%s%08d", kv.prefix, i)) + b.GetForUpdate(fmt.Sprintf("%s%08d", kv.prefix, i), kvpb.BestEffort) } if err := txn.Run(ctx, b); err != nil { return err diff --git a/pkg/ts/server.go b/pkg/ts/server.go index 5fdb5d18513d..42d72971d4a1 100644 --- a/pkg/ts/server.go +++ b/pkg/ts/server.go @@ -489,7 +489,7 @@ func dumpTimeseriesAllSources( for span != nil { b := &kv.Batch{} - scan := kvpb.NewScan(span.Key, span.EndKey, kvpb.NonLocking) + scan := kvpb.NewScan(span.Key, span.EndKey) b.AddRawRequest(scan) b.Header.MaxSpanRequestKeys = dumpBatchSize err := db.Run(ctx, b) diff --git a/pkg/upgrade/upgrades/BUILD.bazel b/pkg/upgrade/upgrades/BUILD.bazel index 894554d7df28..490542269c39 100644 --- a/pkg/upgrade/upgrades/BUILD.bazel +++ b/pkg/upgrade/upgrades/BUILD.bazel @@ -51,6 +51,7 @@ go_library( "//pkg/keys", "//pkg/keyvisualizer/keyvisjob", "//pkg/kv", + "//pkg/kv/kvpb", "//pkg/multitenant/mtinfopb", "//pkg/roachpb", "//pkg/security/username", diff --git a/pkg/upgrade/upgrades/desc_id_sequence_for_system_tenant.go b/pkg/upgrade/upgrades/desc_id_sequence_for_system_tenant.go index 58248c5ede82..b3abc037788f 100644 --- a/pkg/upgrade/upgrades/desc_id_sequence_for_system_tenant.go +++ b/pkg/upgrade/upgrades/desc_id_sequence_for_system_tenant.go @@ -16,6 +16,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/clusterversion" "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/kv" + "github.com/cockroachdb/cockroach/pkg/kv/kvpb" "github.com/cockroachdb/cockroach/pkg/sql/catalog/systemschema" "github.com/cockroachdb/cockroach/pkg/upgrade" ) @@ -27,7 +28,7 @@ func descIDSequenceForSystemTenant( return nil } return d.DB.KV().Txn(ctx, func(ctx context.Context, txn *kv.Txn) error { - oldEntry, err := txn.GetForUpdate(ctx, keys.LegacyDescIDGenerator) + oldEntry, err := txn.GetForUpdate(ctx, keys.LegacyDescIDGenerator, kvpb.BestEffort) if err != nil { return err } From e55210818ec22fc0bfad96c39b009770c00f3907 Mon Sep 17 00:00:00 2001 From: Arul Ajmani Date: Sat, 16 Sep 2023 15:59:13 -0400 Subject: [PATCH 3/3] kvserver: add a basic shared locks test This patch adds a very basic shared locks test using the KV client API. While here, we also take the opportunity to extend this test for replicated locks. Epic: none Release note: None --- pkg/kv/kvserver/client_replica_test.go | 61 ++++++++++++++++++++++++++ 1 file changed, 61 insertions(+) diff --git a/pkg/kv/kvserver/client_replica_test.go b/pkg/kv/kvserver/client_replica_test.go index f474930c7df0..52bafae59e25 100644 --- a/pkg/kv/kvserver/client_replica_test.go +++ b/pkg/kv/kvserver/client_replica_test.go @@ -4970,6 +4970,67 @@ func setupDBAndWriteAAndB(t *testing.T) (serverutils.TestServerInterface, *kv.DB return s, db } +// TestSharedLocksBasic tests basic shared lock semantics. 
In particular, it
+// tests that multiple shared locks are compatible with each other, but
+// exclusive locks aren't.
+func TestSharedLocksBasic(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+	defer log.Scope(t).Close(t)
+
+	ctx := context.Background()
+	s, db := setupDBAndWriteAAndB(t)
+	defer s.Stopper().Stop(ctx)
+
+	testutils.RunTrueAndFalse(t, "guaranteed-durability", func(t *testing.T, guaranteedDurability bool) {
+		txn1 := db.NewTxn(ctx, "txn1")
+		txn2 := db.NewTxn(ctx, "txn2")
+
+		dur := kvpb.BestEffort
+		if guaranteedDurability {
+			dur = kvpb.GuaranteedDurability
+		}
+
+		res, err := txn1.ScanForShare(ctx, "a", "c", 0, dur)
+		require.NoError(t, err)
+		require.Equal(t, 2, len(res))
+
+		res, err = txn2.ReverseScanForShare(ctx, "a", "c", 0, dur)
+		require.NoError(t, err)
+		require.Equal(t, 2, len(res))
+
+		ch := make(chan struct{}, 1) // we won't pull off the channel
+		var wg sync.WaitGroup
+		wg.Add(1)
+
+		go func() {
+			defer wg.Done()
+			txn3 := db.NewTxn(ctx, "txn3")
+			res, err := txn3.GetForUpdate(ctx, "a", dur)
+			require.NoError(t, err)
+			ch <- struct{}{}
+			require.NotNil(t, res.Value)
+			require.NoError(t, txn3.Commit(ctx))
+		}()
+
+		ensureGetForUpdateIsBlocked := func() {
+			select {
+			case <-ch:
+				t.Fatal("expected GetForUpdate request to block")
+			case <-time.After(10 * time.Millisecond):
+				// sleep for a bit to allow the GetForUpdate to block.
+			}
+		}
+		ensureGetForUpdateIsBlocked()
+		require.NoError(t, txn1.Commit(ctx))
+		// Finalizing just one of the shared locking transactions shouldn't unblock
+		// the GetForUpdate.
+		ensureGetForUpdateIsBlocked()
+		require.NoError(t, txn2.Rollback(ctx))
+
+		wg.Wait()
+	})
+}
+
 // TestOptimisticEvalRetry tests the case where an optimistically evaluated
 // scan encounters contention from a concurrent txn holding unreplicated
 // exclusive locks, and therefore re-evaluates pessimistically, and eventually