From c926799fced26385d2bd2804e7fcdb7ff8b5b7a6 Mon Sep 17 00:00:00 2001 From: Arul Ajmani Date: Sat, 16 Sep 2023 15:24:43 -0400 Subject: [PATCH] kv: add lock durability options to the KV client API This patch adds lock durability flags to GetFor{Update,Share}, ScanFor{Update,Share}, and ReverseScanFor{Update,Share}, using which users of the KV client API can express lock durability goals. These then map to either replicated or unreplicated locks. This patch then modifies existing tests to make use of replicated locks in a few places. Epic: none Release note: None --- pkg/ccl/changefeedccl/kvfeed/scanner.go | 2 +- pkg/cli/debug_send_kv_batch_test.go | 4 +- pkg/kv/batch.go | 100 +++++--- pkg/kv/db.go | 72 +++--- pkg/kv/db_test.go | 226 +++++++++++------- .../kvcoord/dist_sender_ambiguous_test.go | 2 +- .../kvcoord/dist_sender_server_test.go | 28 +-- pkg/kv/kvclient/kvcoord/dist_sender_test.go | 53 ++-- pkg/kv/kvclient/kvcoord/transport_test.go | 2 +- .../txn_coord_sender_savepoints_test.go | 2 +- .../kvclient/kvcoord/txn_coord_sender_test.go | 56 +++-- pkg/kv/kvclient/kvstreamer/streamer_test.go | 2 +- pkg/kv/kvnemesis/applier.go | 74 +++--- pkg/kv/kvpb/api.go | 119 +++++++-- pkg/kv/kvpb/batch_test.go | 6 +- .../client_replica_circuit_breaker_test.go | 4 +- pkg/kv/kvserver/client_replica_test.go | 6 +- pkg/kv/kvserver/client_split_test.go | 4 +- pkg/kv/kvserver/replica_command.go | 11 +- pkg/kv/kvserver/replica_rankings_test.go | 2 +- pkg/kv/kvserver/replica_test.go | 8 +- pkg/kv/txn.go | 64 ++--- pkg/kv/txn_test.go | 6 +- pkg/server/node_test.go | 4 +- .../system_config_watcher_test.go | 1 - pkg/sql/crdb_internal.go | 2 +- pkg/sql/tests/kv_test.go | 3 +- pkg/ts/server.go | 2 +- pkg/upgrade/upgrades/BUILD.bazel | 1 + .../desc_id_sequence_for_system_tenant.go | 3 +- 30 files changed, 540 insertions(+), 329 deletions(-) diff --git a/pkg/ccl/changefeedccl/kvfeed/scanner.go b/pkg/ccl/changefeedccl/kvfeed/scanner.go index 0d42dde9e25b..db904e2a0f24 100644 --- a/pkg/ccl/changefeedccl/kvfeed/scanner.go +++ b/pkg/ccl/changefeedccl/kvfeed/scanner.go @@ -188,7 +188,7 @@ func (p *scanRequestScanner) exportSpan( for remaining := &span; remaining != nil; { start := timeutil.Now() b := txn.NewBatch() - r := kvpb.NewScan(remaining.Key, remaining.EndKey, kvpb.NonLocking).(*kvpb.ScanRequest) + r := kvpb.NewScan(remaining.Key, remaining.EndKey).(*kvpb.ScanRequest) r.ScanFormat = kvpb.BATCH_RESPONSE b.Header.TargetBytes = targetBytesPerScan b.AdmissionHeader = kvpb.AdmissionHeader{ diff --git a/pkg/cli/debug_send_kv_batch_test.go b/pkg/cli/debug_send_kv_batch_test.go index dfb50e71dd05..3f79d62f8463 100644 --- a/pkg/cli/debug_send_kv_batch_test.go +++ b/pkg/cli/debug_send_kv_batch_test.go @@ -37,7 +37,7 @@ func TestSendKVBatchExample(t *testing.T) { var ba kvpb.BatchRequest ba.Add(kvpb.NewPut(roachpb.Key("foo"), roachpb.MakeValueFromString("bar"))) - ba.Add(kvpb.NewGet(roachpb.Key("foo"), kvpb.NonLocking)) + ba.Add(kvpb.NewGet(roachpb.Key("foo"))) // NOTE: This cannot be marshaled using the standard Go JSON marshaler, // since it does not correctly (un)marshal the JSON as mandated by the @@ -72,7 +72,7 @@ func TestSendKVBatch(t *testing.T) { // Protobuf spec. Instead, use the JSON marshaler shipped with Protobuf. 
var ba kvpb.BatchRequest ba.Add(kvpb.NewPut(roachpb.Key("foo"), roachpb.MakeValueFromString("bar"))) - ba.Add(kvpb.NewGet(roachpb.Key("foo"), kvpb.NonLocking)) + ba.Add(kvpb.NewGet(roachpb.Key("foo"))) jsonpb := protoutil.JSONPb{} jsonProto, err := jsonpb.Marshal(&ba) diff --git a/pkg/kv/batch.go b/pkg/kv/batch.go index 76ff420426a4..92080093f422 100644 --- a/pkg/kv/batch.go +++ b/pkg/kv/batch.go @@ -379,13 +379,22 @@ func (b *Batch) AddRawRequest(reqs ...kvpb.Request) { } } -func (b *Batch) get(key interface{}, str kvpb.KeyLockingStrengthType) { +func (b *Batch) get( + key interface{}, str kvpb.KeyLockingStrengthType, dur kvpb.KeyLockingDurabilityType, +) { k, err := marshalKey(key) if err != nil { b.initResult(0, 1, notRaw, err) return } - b.appendReqs(kvpb.NewGet(k, str)) + switch str { + case kvpb.NonLocking: + b.appendReqs(kvpb.NewGet(k)) + case kvpb.ForShare, kvpb.ForUpdate: + b.appendReqs(kvpb.NewLockingGet(k, str, dur)) + default: + panic(errors.AssertionFailedf("unknown str %d", str)) + } b.initResult(1, 1, notRaw, nil) } @@ -397,31 +406,32 @@ func (b *Batch) get(key interface{}, str kvpb.KeyLockingStrengthType) { // // key can be either a byte slice or a string. func (b *Batch) Get(key interface{}) { - b.get(key, kvpb.NonLocking) + b.get(key, kvpb.NonLocking, kvpb.Invalid) } -// GetForUpdate retrieves the value for a key. An unreplicated, exclusive lock -// is acquired on the key, if it exists. A new result will be appended to the -// batch which will contain a single row. +// GetForUpdate retrieves the value for a key. An Exclusive lock with the +// supplied durability is acquired on the key, if it exists. A new result +// will be appended to the batch which will contain a single row. It is not +// considered an error for the key not to exist. // // r, err := db.GetForUpdate("a") // // string(r.Rows[0].Key) == "a" // // key can be either a byte slice or a string. -func (b *Batch) GetForUpdate(key interface{}) { - b.get(key, kvpb.ForUpdate) +func (b *Batch) GetForUpdate(key interface{}, dur kvpb.KeyLockingDurabilityType) { + b.get(key, kvpb.ForUpdate, dur) } -// GetForShare retrieves the value for a key. An unreplicated, shared lock -// is acquired on the key, if it exists. A new result will be appended to the -// batch which will contain a single row. +// GetForShare retrieves the value for a key. A shared lock with the supplied +// durability is acquired on the key, if it exists. A new result will be +// appended to the batch which will contain a single row. // // r, err := db.GetForShare("a") // // string(r.Rows[0].Key) == "a" // // key can be either a byte slice or a string. 
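+//
+// A hedged usage sketch of the new durability parameter (db and ctx are
+// assumed to already exist; they are not introduced by this patch):
+//
+//	b := &kv.Batch{}
+//	b.GetForUpdate("k1", kvpb.BestEffort)          // unreplicated exclusive lock
+//	b.GetForShare("k2", kvpb.GuaranteedDurability) // replicated shared lock
+//	err := db.Run(ctx, b)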
-func (b *Batch) GetForShare(key interface{}) { - b.get(key, kvpb.ForShare) +func (b *Batch) GetForShare(key interface{}, dur kvpb.KeyLockingDurabilityType) { + b.get(key, kvpb.ForShare, dur) } func (b *Batch) put(key, value interface{}, inline bool) { @@ -728,7 +738,12 @@ func (b *Batch) Inc(key interface{}, value int64) { b.initResult(1, 1, notRaw, nil) } -func (b *Batch) scan(s, e interface{}, isReverse bool, str kvpb.KeyLockingStrengthType) { +func (b *Batch) scan( + s, e interface{}, + isReverse bool, + str kvpb.KeyLockingStrengthType, + dur kvpb.KeyLockingDurabilityType, +) { begin, err := marshalKey(s) if err != nil { b.initResult(0, 0, notRaw, err) @@ -739,10 +754,21 @@ func (b *Batch) scan(s, e interface{}, isReverse bool, str kvpb.KeyLockingStreng b.initResult(0, 0, notRaw, err) return } - if !isReverse { - b.appendReqs(kvpb.NewScan(begin, end, str)) - } else { - b.appendReqs(kvpb.NewReverseScan(begin, end, str)) + switch str { + case kvpb.NonLocking: + if !isReverse { + b.appendReqs(kvpb.NewScan(begin, end)) + } else { + b.appendReqs(kvpb.NewReverseScan(begin, end)) + } + case kvpb.ForShare, kvpb.ForUpdate: + if !isReverse { + b.appendReqs(kvpb.NewLockingScan(begin, end, str, dur)) + } else { + b.appendReqs(kvpb.NewLockingReverseScan(begin, end, str, dur)) + } + default: + panic(errors.AssertionFailedf("unknown str %d", str)) } b.initResult(1, 0, notRaw, nil) } @@ -755,31 +781,31 @@ func (b *Batch) scan(s, e interface{}, isReverse bool, str kvpb.KeyLockingStreng // // key can be either a byte slice or a string. func (b *Batch) Scan(s, e interface{}) { - b.scan(s, e, false /* isReverse */, kvpb.NonLocking) + b.scan(s, e, false /* isReverse */, kvpb.NonLocking, kvpb.Invalid) } -// ScanForUpdate retrieves the key/values between begin (inclusive) and end -// (exclusive) in ascending order. Unreplicated, exclusive locks are acquired on -// each of the returned keys. +// ScanForUpdate retrieves the rows between begin (inclusive) and end +// (exclusive) in ascending order. Exclusive locks with the supplied durability +// are acquired on each of the returned keys. // // A new result will be appended to the batch which will contain "rows" (each // row is a key/value pair) and Result.Err will indicate success or failure. // // key can be either a byte slice or a string. -func (b *Batch) ScanForUpdate(s, e interface{}) { - b.scan(s, e, false /* isReverse */, kvpb.ForUpdate) +func (b *Batch) ScanForUpdate(s, e interface{}, dur kvpb.KeyLockingDurabilityType) { + b.scan(s, e, false /* isReverse */, kvpb.ForUpdate, dur) } // ScanForShare retrieves the key/values between begin (inclusive) and end -// (exclusive) in ascending order. Unreplicated, shared locks are acquired on -// each of the returned keys. +// (exclusive) in ascending order. Shared locks with the supplied durability are +// acquired on each of the returned keys. // // A new result will be appended to the batch which will contain "rows" (each // row is a key/value pair) and Result.Err will indicate success or failure. // // key can be either a byte slice or a string. -func (b *Batch) ScanForShare(s, e interface{}) { - b.scan(s, e, false /* isReverse */, kvpb.ForShare) +func (b *Batch) ScanForShare(s, e interface{}, dur kvpb.KeyLockingDurabilityType) { + b.scan(s, e, false /* isReverse */, kvpb.ForShare, dur) } // ReverseScan retrieves the rows between begin (inclusive) and end (exclusive) @@ -790,31 +816,31 @@ func (b *Batch) ScanForShare(s, e interface{}) { // // key can be either a byte slice or a string. 
func (b *Batch) ReverseScan(s, e interface{}) { - b.scan(s, e, true /* isReverse */, kvpb.NonLocking) + b.scan(s, e, true /* isReverse */, kvpb.NonLocking, kvpb.Invalid) } // ReverseScanForUpdate retrieves the rows between begin (inclusive) and end -// (exclusive) in descending order. Unreplicated, exclusive locks are acquired -// on each of the returned keys. +// (exclusive) in descending order. Exclusive locks with the supplied durability +// are acquired on each of the returned keys. // // A new result will be appended to the batch which will contain "rows" (each // "row" is a key/value pair) and Result.Err will indicate success or failure. // // key can be either a byte slice or a string. -func (b *Batch) ReverseScanForUpdate(s, e interface{}) { - b.scan(s, e, true /* isReverse */, kvpb.ForUpdate) +func (b *Batch) ReverseScanForUpdate(s, e interface{}, dur kvpb.KeyLockingDurabilityType) { + b.scan(s, e, true /* isReverse */, kvpb.ForUpdate, dur) } // ReverseScanForShare retrieves the rows between begin (inclusive) and end -// (exclusive) in descending order. Unreplicated, shared locks are acquired -// on each of the returned keys. +// (exclusive) in descending order. Shared locks with the supplied durability +// are acquired on each of the returned keys. // // A new result will be appended to the batch which will contain "rows" (each // "row" is a key/value pair) and Result.Err will indicate success or failure. // // key can be either a byte slice or a string. -func (b *Batch) ReverseScanForShare(s, e interface{}) { - b.scan(s, e, true /* isReverse */, kvpb.ForShare) +func (b *Batch) ReverseScanForShare(s, e interface{}, dur kvpb.KeyLockingDurabilityType) { + b.scan(s, e, true /* isReverse */, kvpb.ForShare, dur) } // Del deletes one or more keys. diff --git a/pkg/kv/db.go b/pkg/kv/db.go index 2e887e378582..d9e2c74f2679 100644 --- a/pkg/kv/db.go +++ b/pkg/kv/db.go @@ -343,30 +343,35 @@ func (db *DB) Get(ctx context.Context, key interface{}) (KeyValue, error) { } // GetForUpdate retrieves the value for a key, returning the retrieved key/value -// or an error. An unreplicated, exclusive lock is acquired on the key, if it -// exists. It is not considered an error for the key not to exist. +// or an error. An Exclusive lock with the supplied durability is acquired on +// the key, if it exists. It is not considered an error for the key not to +// exist. // // r, err := db.GetForUpdate("a") // // string(r.Key) == "a" // // key can be either a byte slice or a string. -func (db *DB) GetForUpdate(ctx context.Context, key interface{}) (KeyValue, error) { +func (db *DB) GetForUpdate( + ctx context.Context, key interface{}, dur kvpb.KeyLockingDurabilityType, +) (KeyValue, error) { b := &Batch{} - b.GetForUpdate(key) + b.GetForUpdate(key, dur) return getOneRow(db.Run(ctx, b), b) } -// GetForShare retrieves the value for a key, returning the retrieved key/value -// or an error. An unreplicated, shared lock is acquired on the key, if it -// exists. It is not considered an error for the key not to exist. +// GetForShare retrieves the value for a key, returning the retrieved key/value +// or an error. A shared lock with the supplied durability is acquired on the +// key, if it exists. It is not considered an error for the key not to exist. // // r, err := db.GetForShare("a") // // string(r.Key) == "a" // // key can be either a byte slice or a string. 
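+//
+// A hedged rule of thumb, per the KeyLockingDurabilityType comments added in
+// kvpb below: serializable transactions that lock only to reduce retries can
+// pass kvpb.BestEffort, while transactions that need the lock for correctness
+// should pass kvpb.GuaranteedDurability, e.g.
+//
+//	res, err := db.GetForShare(ctx, "a", kvpb.GuaranteedDurability)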
-func (db *DB) GetForShare(ctx context.Context, key interface{}) (KeyValue, error) { +func (db *DB) GetForShare( + ctx context.Context, key interface{}, dur kvpb.KeyLockingDurabilityType, +) (KeyValue, error) { b := &Batch{} - b.GetForShare(key) + b.GetForShare(key, dur) return getOneRow(db.Run(ctx, b), b) } @@ -494,13 +499,14 @@ func (db *DB) scan( isReverse bool, str kvpb.KeyLockingStrengthType, readConsistency kvpb.ReadConsistencyType, + dur kvpb.KeyLockingDurabilityType, ) ([]KeyValue, error) { b := &Batch{} b.Header.ReadConsistency = readConsistency if maxRows > 0 { b.Header.MaxSpanRequestKeys = maxRows } - b.scan(begin, end, isReverse, str) + b.scan(begin, end, isReverse, str, dur) r, err := getOneResult(db.Run(ctx, b), b) return r.Rows, err } @@ -512,33 +518,37 @@ func (db *DB) scan( // // key can be either a byte slice or a string. func (db *DB) Scan(ctx context.Context, begin, end interface{}, maxRows int64) ([]KeyValue, error) { - return db.scan(ctx, begin, end, maxRows, false /* isReverse */, kvpb.NonLocking, kvpb.CONSISTENT) + return db.scan(ctx, begin, end, maxRows, false /* isReverse */, kvpb.NonLocking, kvpb.CONSISTENT, kvpb.Invalid) } // ScanForUpdate retrieves the rows between begin (inclusive) and end -// (exclusive) in ascending order. Unreplicated, exclusive locks are -// acquired on each of the returned keys. +// (exclusive) in ascending order. Exclusive locks with the supplied durability +// are acquired on each of the returned keys. // // The returned []KeyValue will contain up to maxRows elements. // // key can be either a byte slice or a string. func (db *DB) ScanForUpdate( - ctx context.Context, begin, end interface{}, maxRows int64, + ctx context.Context, begin, end interface{}, maxRows int64, dur kvpb.KeyLockingDurabilityType, ) ([]KeyValue, error) { - return db.scan(ctx, begin, end, maxRows, false /* isReverse */, kvpb.ForUpdate, kvpb.CONSISTENT) + return db.scan( + ctx, begin, end, maxRows, false /* isReverse */, kvpb.ForUpdate, kvpb.CONSISTENT, dur, + ) } -// ScanForShare retrieves the rows between begin (inclusive) and end -// (exclusive) in ascending order. Unreplicated, shared locks are -// acquired on each of the returned keys. +// ScanForShare retrieves the rows between begin (inclusive) and end (exclusive) +// in ascending order. Shared locks with the supplied durability are acquired on +// each of the returned keys. // // The returned []KeyValue will contain up to maxRows elements. // // key can be either a byte slice or a string. func (db *DB) ScanForShare( - ctx context.Context, begin, end interface{}, maxRows int64, + ctx context.Context, begin, end interface{}, maxRows int64, dur kvpb.KeyLockingDurabilityType, ) ([]KeyValue, error) { - return db.scan(ctx, begin, end, maxRows, false /* isReverse */, kvpb.ForShare, kvpb.CONSISTENT) + return db.scan( + ctx, begin, end, maxRows, false /* isReverse */, kvpb.ForShare, kvpb.CONSISTENT, dur, + ) } // ReverseScan retrieves the rows between begin (inclusive) and end (exclusive) @@ -550,33 +560,37 @@ func (db *DB) ScanForShare( func (db *DB) ReverseScan( ctx context.Context, begin, end interface{}, maxRows int64, ) ([]KeyValue, error) { - return db.scan(ctx, begin, end, maxRows, true /* isReverse */, kvpb.NonLocking, kvpb.CONSISTENT) + return db.scan(ctx, begin, end, maxRows, true /* isReverse */, kvpb.NonLocking, kvpb.CONSISTENT, kvpb.Invalid) } // ReverseScanForUpdate retrieves the rows between begin (inclusive) and end -// (exclusive) in descending order. 
Unreplicated, exclusive locks are acquired -// on each of the returned keys. +// (exclusive) in descending order. Exclusive locks with the supplied durability +// are acquired on each of the returned keys. // // The returned []KeyValue will contain up to maxRows elements. // // key can be either a byte slice or a string. func (db *DB) ReverseScanForUpdate( - ctx context.Context, begin, end interface{}, maxRows int64, + ctx context.Context, begin, end interface{}, maxRows int64, dur kvpb.KeyLockingDurabilityType, ) ([]KeyValue, error) { - return db.scan(ctx, begin, end, maxRows, true /* isReverse */, kvpb.ForUpdate, kvpb.CONSISTENT) + return db.scan( + ctx, begin, end, maxRows, true /* isReverse */, kvpb.ForUpdate, kvpb.CONSISTENT, dur, + ) } // ReverseScanForShare retrieves the rows between begin (inclusive) and end -// (exclusive) in descending order. Unreplicated, shared locks are acquired -// on each of the returned keys. +// (exclusive) in descending order. Shared locks with the supplied durability +// are acquired on each of the returned keys. // // The returned []KeyValue will contain up to maxRows elements. // // key can be either a byte slice or a string. func (db *DB) ReverseScanForShare( - ctx context.Context, begin, end interface{}, maxRows int64, + ctx context.Context, begin, end interface{}, maxRows int64, dur kvpb.KeyLockingDurabilityType, ) ([]KeyValue, error) { - return db.scan(ctx, begin, end, maxRows, true /* isReverse */, kvpb.ForShare, kvpb.CONSISTENT) + return db.scan( + ctx, begin, end, maxRows, true /* isReverse */, kvpb.ForShare, kvpb.CONSISTENT, dur, + ) } // Del deletes one or more keys. diff --git a/pkg/kv/db_test.go b/pkg/kv/db_test.go index 2f6617e9c216..99859532c961 100644 --- a/pkg/kv/db_test.go +++ b/pkg/kv/db_test.go @@ -111,26 +111,44 @@ func TestDB_GetForUpdate(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) s, db := setup(t) - defer s.Stopper().Stop(context.Background()) + ctx := context.Background() + defer s.Stopper().Stop(ctx) - result, err := db.GetForUpdate(context.Background(), "aa") - if err != nil { - t.Fatal(err) - } - checkResult(t, []byte(""), result.ValueBytes()) + testutils.RunTrueAndFalse(t, "durability-guaranteed", func(t *testing.T, durabilityGuaranteed bool) { + var result kv.KeyValue + var err error + if durabilityGuaranteed { + result, err = db.GetForUpdate(ctx, "aa", kvpb.GuaranteedDurability) + } else { + result, err = db.GetForUpdate(ctx, "aa", kvpb.BestEffort) + } + if err != nil { + t.Fatal(err) + } + checkResult(t, []byte(""), result.ValueBytes()) + }) } func TestDB_GetForShare(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) s, db := setup(t) - defer s.Stopper().Stop(context.Background()) + ctx := context.Background() + defer s.Stopper().Stop(ctx) - result, err := db.GetForShare(context.Background(), "aa") - if err != nil { - t.Fatal(err) - } - checkResult(t, []byte(""), result.ValueBytes()) + testutils.RunTrueAndFalse(t, "durability-guaranteed", func(t *testing.T, durabilityGuaranteed bool) { + var result kv.KeyValue + var err error + if durabilityGuaranteed { + result, err = db.GetForShare(ctx, "aa", kvpb.GuaranteedDurability) + } else { + result, err = db.GetForShare(ctx, "aa", kvpb.BestEffort) + } + if err != nil { + t.Fatal(err) + } + checkResult(t, []byte(""), result.ValueBytes()) + }) } func TestDB_Put(t *testing.T) { @@ -363,52 +381,70 @@ func TestDB_ScanForUpdate(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) s, db := setup(t) - defer 
s.Stopper().Stop(context.Background()) + ctx := context.Background() + defer s.Stopper().Stop(ctx) - b := &kv.Batch{} - b.Put("aa", "1") - b.Put("ab", "2") - b.Put("bb", "3") - if err := db.Run(context.Background(), b); err != nil { - t.Fatal(err) - } - rows, err := db.ScanForUpdate(context.Background(), "a", "b", 100) - if err != nil { - t.Fatal(err) - } - expected := map[string][]byte{ - "aa": []byte("1"), - "ab": []byte("2"), - } + testutils.RunTrueAndFalse(t, "durability-guaranteed", func(t *testing.T, durabilityGuaranteed bool) { + b := &kv.Batch{} + b.Put("aa", "1") + b.Put("ab", "2") + b.Put("bb", "3") + if err := db.Run(context.Background(), b); err != nil { + t.Fatal(err) + } + var rows []kv.KeyValue + var err error + if durabilityGuaranteed { + rows, err = db.ScanForUpdate(ctx, "a", "b", 100, kvpb.GuaranteedDurability) + } else { + rows, err = db.ScanForUpdate(ctx, "a", "b", 100, kvpb.BestEffort) + } + if err != nil { + t.Fatal(err) + } + expected := map[string][]byte{ + "aa": []byte("1"), + "ab": []byte("2"), + } - checkRows(t, expected, rows) - checkLen(t, len(expected), len(rows)) + checkRows(t, expected, rows) + checkLen(t, len(expected), len(rows)) + }) } func TestDB_ScanForShare(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) s, db := setup(t) - defer s.Stopper().Stop(context.Background()) + ctx := context.Background() + defer s.Stopper().Stop(ctx) - b := &kv.Batch{} - b.Put("aa", "1") - b.Put("ab", "2") - b.Put("bb", "3") - if err := db.Run(context.Background(), b); err != nil { - t.Fatal(err) - } - rows, err := db.ScanForShare(context.Background(), "a", "b", 100) - if err != nil { - t.Fatal(err) - } - expected := map[string][]byte{ - "aa": []byte("1"), - "ab": []byte("2"), - } + testutils.RunTrueAndFalse(t, "durability-guaranteed", func(t *testing.T, durabilityGuaranteed bool) { + b := &kv.Batch{} + b.Put("aa", "1") + b.Put("ab", "2") + b.Put("bb", "3") + if err := db.Run(ctx, b); err != nil { + t.Fatal(err) + } + var rows []kv.KeyValue + var err error + if durabilityGuaranteed { + rows, err = db.ScanForShare(ctx, "a", "b", 100, kvpb.GuaranteedDurability) + } else { + rows, err = db.ScanForShare(ctx, "a", "b", 100, kvpb.BestEffort) + } + if err != nil { + t.Fatal(err) + } + expected := map[string][]byte{ + "aa": []byte("1"), + "ab": []byte("2"), + } - checkRows(t, expected, rows) - checkLen(t, len(expected), len(rows)) + checkRows(t, expected, rows) + checkLen(t, len(expected), len(rows)) + }) } func TestDB_ReverseScan(t *testing.T) { @@ -441,52 +477,72 @@ func TestDB_ReverseScanForUpdate(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) s, db := setup(t) - defer s.Stopper().Stop(context.Background()) + ctx := context.Background() + defer s.Stopper().Stop(ctx) - b := &kv.Batch{} - b.Put("aa", "1") - b.Put("ab", "2") - b.Put("bb", "3") - if err := db.Run(context.Background(), b); err != nil { - t.Fatal(err) - } - rows, err := db.ReverseScanForUpdate(context.Background(), "ab", "c", 100) - if err != nil { - t.Fatal(err) - } - expected := map[string][]byte{ - "bb": []byte("3"), - "ab": []byte("2"), - } + testutils.RunTrueAndFalse(t, "durability-guaranteed", func(t *testing.T, durabilityGuaranteed bool) { + b := &kv.Batch{} + b.Put("aa", "1") + b.Put("ab", "2") + b.Put("bb", "3") + if err := db.Run(ctx, b); err != nil { + t.Fatal(err) + } + var rows []kv.KeyValue + var err error - checkRows(t, expected, rows) - checkLen(t, len(expected), len(rows)) + if durabilityGuaranteed { + rows, err = db.ReverseScanForUpdate(ctx, "ab", 
"c", 100, kvpb.GuaranteedDurability) + } else { + rows, err = db.ReverseScanForUpdate(ctx, "ab", "c", 100, kvpb.BestEffort) + } + if err != nil { + t.Fatal(err) + } + expected := map[string][]byte{ + "bb": []byte("3"), + "ab": []byte("2"), + } + + checkRows(t, expected, rows) + checkLen(t, len(expected), len(rows)) + }) } func TestDB_ReverseScanForShare(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) s, db := setup(t) - defer s.Stopper().Stop(context.Background()) + ctx := context.Background() + defer s.Stopper().Stop(ctx) - b := &kv.Batch{} - b.Put("aa", "1") - b.Put("ab", "2") - b.Put("bb", "3") - if err := db.Run(context.Background(), b); err != nil { - t.Fatal(err) - } - rows, err := db.ReverseScanForShare(context.Background(), "ab", "c", 100) - if err != nil { - t.Fatal(err) - } - expected := map[string][]byte{ - "bb": []byte("3"), - "ab": []byte("2"), - } + testutils.RunTrueAndFalse(t, "durability-guaranteed", func(t *testing.T, durabilityGuaranteed bool) { + b := &kv.Batch{} + b.Put("aa", "1") + b.Put("ab", "2") + b.Put("bb", "3") + if err := db.Run(ctx, b); err != nil { + t.Fatal(err) + } + var rows []kv.KeyValue + var err error - checkRows(t, expected, rows) - checkLen(t, len(expected), len(rows)) + if durabilityGuaranteed { + rows, err = db.ReverseScanForShare(ctx, "ab", "c", 100, kvpb.GuaranteedDurability) + } else { + rows, err = db.ReverseScanForShare(ctx, "ab", "c", 100, kvpb.BestEffort) + } + if err != nil { + t.Fatal(err) + } + expected := map[string][]byte{ + "bb": []byte("3"), + "ab": []byte("2"), + } + + checkRows(t, expected, rows) + checkLen(t, len(expected), len(rows)) + }) } func TestDB_TxnIterate(t *testing.T) { @@ -762,7 +818,7 @@ func TestDBDecommissionedOperations(t *testing.T) { return err }}, {"GetForUpdate", func() error { - _, err := db.GetForUpdate(ctx, key) + _, err := db.GetForUpdate(ctx, key, kvpb.BestEffort) return err }}, {"Put", func() error { diff --git a/pkg/kv/kvclient/kvcoord/dist_sender_ambiguous_test.go b/pkg/kv/kvclient/kvcoord/dist_sender_ambiguous_test.go index 0c3b6041ce7e..781ade85153c 100644 --- a/pkg/kv/kvclient/kvcoord/dist_sender_ambiguous_test.go +++ b/pkg/kv/kvclient/kvcoord/dist_sender_ambiguous_test.go @@ -402,7 +402,7 @@ func TestTransactionUnexpectedlyCommitted(t *testing.T) { getInBatch := func(t *testing.T, ctx context.Context, txn *kv.Txn, keys ...roachpb.Key) []int64 { batch := txn.NewBatch() for _, key := range keys { - batch.GetForUpdate(key) + batch.GetForUpdate(key, kvpb.BestEffort) } assert.NoError(t, txn.Run(ctx, batch)) assert.Len(t, batch.Results, len(keys)) diff --git a/pkg/kv/kvclient/kvcoord/dist_sender_server_test.go b/pkg/kv/kvclient/kvcoord/dist_sender_server_test.go index 714c316f8fa0..ef9baa236ef4 100644 --- a/pkg/kv/kvclient/kvcoord/dist_sender_server_test.go +++ b/pkg/kv/kvclient/kvcoord/dist_sender_server_test.go @@ -1113,8 +1113,8 @@ func TestMultiRangeScanReverseScanInconsistent(t *testing.T) { // OpRequiresTxnError. We set the local clock to the timestamp of // just above the first key to verify it's used to read only key "a". for i, request := range []kvpb.Request{ - kvpb.NewScan(roachpb.Key("a"), roachpb.Key("c"), kvpb.NonLocking), - kvpb.NewReverseScan(roachpb.Key("a"), roachpb.Key("c"), kvpb.NonLocking), + kvpb.NewScan(roachpb.Key("a"), roachpb.Key("c")), + kvpb.NewReverseScan(roachpb.Key("a"), roachpb.Key("c")), } { // The looping is necessary since the Put above of a may not have been // applied by time we execute the scan. 
If it has not run, then try the @@ -1194,7 +1194,7 @@ func TestMultiRangeScanDeleteRange(t *testing.T) { if _, err := kv.SendWrapped(ctx, tds, put); err != nil { t.Fatal(err) } - scan := kvpb.NewScan(writes[0], writes[len(writes)-1].Next(), kvpb.NonLocking) + scan := kvpb.NewScan(writes[0], writes[len(writes)-1].Next()) reply, err := kv.SendWrapped(ctx, tds, scan) if err != nil { t.Fatal(err) @@ -1233,7 +1233,7 @@ func TestMultiRangeScanDeleteRange(t *testing.T) { txnProto := roachpb.MakeTransaction("MyTxn", nil, isolation.Serializable, 0, now.ToTimestamp(), 0, int32(s.SQLInstanceID()), 0) txn := kv.NewTxnFromProto(ctx, db, s.NodeID(), now, kv.RootTxn, &txnProto) - scan := kvpb.NewScan(writes[0], writes[len(writes)-1].Next(), kvpb.NonLocking) + scan := kvpb.NewScan(writes[0], writes[len(writes)-1].Next()) ba := &kvpb.BatchRequest{} ba.Header = kvpb.Header{Txn: &txnProto} ba.Add(scan) @@ -1351,7 +1351,7 @@ func TestMultiRangeScanWithPagination(t *testing.T) { // happens above this. var maxTargetBytes int64 { - scan := kvpb.NewScan(tc.keys[0], tc.keys[len(tc.keys)-1].Next(), kvpb.NonLocking) + scan := kvpb.NewScan(tc.keys[0], tc.keys[len(tc.keys)-1].Next()) resp, pErr := kv.SendWrapped(ctx, tds, scan) require.Nil(t, pErr) require.Nil(t, resp.Header().ResumeSpan) @@ -1392,11 +1392,11 @@ func TestMultiRangeScanWithPagination(t *testing.T) { var req kvpb.Request switch { case span.EndKey == nil: - req = kvpb.NewGet(span.Key, kvpb.NonLocking) + req = kvpb.NewGet(span.Key) case reverse: - req = kvpb.NewReverseScan(span.Key, span.EndKey, kvpb.NonLocking) + req = kvpb.NewReverseScan(span.Key, span.EndKey) default: - req = kvpb.NewScan(span.Key, span.EndKey, kvpb.NonLocking) + req = kvpb.NewScan(span.Key, span.EndKey) } ba.Add(req) } @@ -3036,7 +3036,7 @@ func TestTxnCoordSenderRetries(t *testing.T) { return db.Put(ctx, "a", "put") }, retryable: func(ctx context.Context, txn *kv.Txn) error { - _, err := txn.ScanForUpdate(ctx, "a", "a\x00", 0) + _, err := txn.ScanForUpdate(ctx, "a", "a\x00", 0, kvpb.BestEffort) return err }, allIsoLevels: &expect{ @@ -3050,7 +3050,7 @@ func TestTxnCoordSenderRetries(t *testing.T) { return db.Put(ctx, "a", "put") }, retryable: func(ctx context.Context, txn *kv.Txn) error { - _, err := txn.ScanForUpdate(ctx, "a", "a\x00", 0) + _, err := txn.ScanForUpdate(ctx, "a", "a\x00", 0, kvpb.BestEffort) return err }, priorReads: true, @@ -3081,7 +3081,7 @@ func TestTxnCoordSenderRetries(t *testing.T) { return db.Put(ctx, "a", "put") }, retryable: func(ctx context.Context, txn *kv.Txn) error { - _, err := txn.ScanForUpdate(ctx, "a", "c", 0) + _, err := txn.ScanForUpdate(ctx, "a", "c", 0, kvpb.BestEffort) return err }, allIsoLevels: &expect{ @@ -3096,7 +3096,7 @@ func TestTxnCoordSenderRetries(t *testing.T) { return db.Put(ctx, "b", "put") }, retryable: func(ctx context.Context, txn *kv.Txn) error { - _, err := txn.ScanForUpdate(ctx, "a", "c", 0) + _, err := txn.ScanForUpdate(ctx, "a", "c", 0, kvpb.BestEffort) return err }, allIsoLevels: &expect{ @@ -3112,8 +3112,8 @@ func TestTxnCoordSenderRetries(t *testing.T) { }, retryable: func(ctx context.Context, txn *kv.Txn) error { b := txn.NewBatch() - b.ScanForUpdate("a", "a\x00") - b.ScanForUpdate("b", "b\x00") + b.ScanForUpdate("a", "a\x00", kvpb.BestEffort) + b.ScanForUpdate("b", "b\x00", kvpb.BestEffort) return txn.Run(ctx, b) }, allIsoLevels: &expect{ diff --git a/pkg/kv/kvclient/kvcoord/dist_sender_test.go b/pkg/kv/kvclient/kvcoord/dist_sender_test.go index cd41e176982f..27ee10f1a8dd 100644 --- 
a/pkg/kv/kvclient/kvcoord/dist_sender_test.go +++ b/pkg/kv/kvclient/kvcoord/dist_sender_test.go @@ -489,7 +489,7 @@ func TestSendRPCOrder(t *testing.T) { RangeID: rangeID, // Not used in this test, but why not. RoutingPolicy: tc.routingPolicy, } - req := kvpb.NewScan(roachpb.Key("a"), roachpb.Key("b"), kvpb.NonLocking) + req := kvpb.NewScan(roachpb.Key("a"), roachpb.Key("b")) _, pErr := kv.SendWrappedWith(ctx, ds, header, req) require.Nil(t, pErr) }) @@ -950,7 +950,7 @@ func TestNoBackoffOnNotLeaseHolderErrorFromFollowerRead(t *testing.T) { Lease: lease, }) - get := kvpb.NewGet(roachpb.Key("a"), kvpb.NonLocking) + get := kvpb.NewGet(roachpb.Key("a")) _, pErr := kv.SendWrapped(ctx, ds, get) require.Nil(t, pErr) require.Equal(t, []roachpb.NodeID{1, 2}, sentTo) @@ -1022,7 +1022,7 @@ func TestNoBackoffOnNotLeaseHolderErrorWithoutLease(t *testing.T) { // Send a request. It should try all three replicas once: the first two fail // with NLHE, the third one succeeds. None of them should trigger backoffs. - _, pErr := kv.SendWrapped(ctx, ds, kvpb.NewGet(roachpb.Key("a"), kvpb.NonLocking)) + _, pErr := kv.SendWrapped(ctx, ds, kvpb.NewGet(roachpb.Key("a"))) require.NoError(t, pErr.GoError()) require.Equal(t, []roachpb.NodeID{1, 2, 3}, sentTo) require.Equal(t, int64(0), ds.Metrics().InLeaseTransferBackoffs.Count()) @@ -1118,7 +1118,7 @@ func TestDistSenderMovesOnFromReplicaWithStaleLease(t *testing.T) { Lease: cachedLease, }) - get := kvpb.NewGet(roachpb.Key("a"), kvpb.NonLocking) + get := kvpb.NewGet(roachpb.Key("a")) _, pErr := kv.SendWrapped(ctx, ds, get) require.Nil(t, pErr) @@ -1240,7 +1240,7 @@ func TestDistSenderIgnoresNLHEBasedOnOldRangeGeneration(t *testing.T) { Lease: cachedLease, }) - get := kvpb.NewGet(roachpb.Key("a"), kvpb.NonLocking) + get := kvpb.NewGet(roachpb.Key("a")) _, pErr := kv.SendWrapped(ctx, ds, get) require.Nil(t, pErr) @@ -1349,7 +1349,7 @@ func TestDistSenderRetryOnTransportErrors(t *testing.T) { Lease: cachedLease, }) - get := kvpb.NewGet(roachpb.Key("a"), kvpb.NonLocking) + get := kvpb.NewGet(roachpb.Key("a")) _, pErr := kv.SendWrapped(ctx, ds, get) if spec.shouldRetry { require.True(t, secondReplicaTried, "Second replica was not retried") @@ -1903,7 +1903,7 @@ func TestRetryOnWrongReplicaError(t *testing.T) { Settings: cluster.MakeTestingClusterSettings(), } ds := NewDistSender(cfg) - scan := kvpb.NewScan(roachpb.Key("a"), roachpb.Key("d"), kvpb.NonLocking) + scan := kvpb.NewScan(roachpb.Key("a"), roachpb.Key("d")) if _, err := kv.SendWrapped(context.Background(), ds, scan); err != nil { t.Errorf("scan encountered error: %s", err) } @@ -2007,7 +2007,7 @@ func TestRetryOnWrongReplicaErrorWithSuggestion(t *testing.T) { RPCRetryOptions: &retry.Options{MaxRetries: 1}, } ds := NewDistSender(cfg) - scan := kvpb.NewScan(roachpb.Key("a"), roachpb.Key("d"), kvpb.NonLocking) + scan := kvpb.NewScan(roachpb.Key("a"), roachpb.Key("d")) if _, err := kv.SendWrapped(context.Background(), ds, scan); err != nil { t.Errorf("scan encountered error: %s", err) } @@ -2127,7 +2127,7 @@ func TestSendRPCRetry(t *testing.T) { Settings: cluster.MakeTestingClusterSettings(), } ds := NewDistSender(cfg) - scan := kvpb.NewScan(roachpb.Key("a"), roachpb.Key("d"), kvpb.NonLocking) + scan := kvpb.NewScan(roachpb.Key("a"), roachpb.Key("d")) sr, err := kv.SendWrappedWith(ctx, ds, kvpb.Header{MaxSpanRequestKeys: 1}, scan) if err != nil { t.Fatal(err) @@ -2253,7 +2253,7 @@ func TestDistSenderDescriptorUpdatesOnSuccessfulRPCs(t *testing.T) { // Send a request that's going to receive a response with a 
RangeInfo. k := roachpb.Key("a") - get := kvpb.NewGet(k, kvpb.NonLocking) + get := kvpb.NewGet(k) ba := &kvpb.BatchRequest{} ba.Add(get) _, pErr := ds.Send(ctx, ba) @@ -2366,7 +2366,7 @@ func TestSendRPCRangeNotFoundError(t *testing.T) { Settings: cluster.MakeTestingClusterSettings(), } ds = NewDistSender(cfg) - get := kvpb.NewGet(roachpb.Key("b"), kvpb.NonLocking) + get := kvpb.NewGet(roachpb.Key("b")) _, err := kv.SendWrapped(ctx, ds, get) if err != nil { t.Fatal(err) @@ -2462,8 +2462,8 @@ func TestMultiRangeGapReverse(t *testing.T) { ba := &kvpb.BatchRequest{} ba.Txn = &txn - ba.Add(kvpb.NewReverseScan(splits[0], splits[1], kvpb.NonLocking)) - ba.Add(kvpb.NewReverseScan(splits[2], splits[3], kvpb.NonLocking)) + ba.Add(kvpb.NewReverseScan(splits[0], splits[1])) + ba.Add(kvpb.NewReverseScan(splits[2], splits[3])) // Before fixing https://github.com/cockroachdb/cockroach/issues/18174, this // would error with: @@ -2564,7 +2564,7 @@ func TestMultiRangeMergeStaleDescriptor(t *testing.T) { Settings: cluster.MakeTestingClusterSettings(), } ds := NewDistSender(cfg) - scan := kvpb.NewScan(roachpb.Key("a"), roachpb.Key("d"), kvpb.NonLocking) + scan := kvpb.NewScan(roachpb.Key("a"), roachpb.Key("d")) // Set the Txn info to avoid an OpRequiresTxnError. reply, err := kv.SendWrappedWith(ctx, ds, kvpb.Header{ MaxSpanRequestKeys: 10, @@ -2918,7 +2918,6 @@ func TestTruncateWithLocalSpanAndDescriptor(t *testing.T) { ba.Add(kvpb.NewScan( keys.RangeDescriptorKey(roachpb.RKey("a")), keys.RangeDescriptorKey(roachpb.RKey("c")), - kvpb.NonLocking, )) if _, pErr := ds.Send(ctx, ba); pErr != nil { @@ -4038,19 +4037,19 @@ func TestCanSendToFollower(t *testing.T) { kvpb.Header{ Txn: &roachpb.Transaction{}, }, - kvpb.NewGet(roachpb.Key("a"), kvpb.NonLocking), + kvpb.NewGet(roachpb.Key("a")), 1, }, { true, kvpb.Header{}, - kvpb.NewGet(roachpb.Key("a"), kvpb.NonLocking), + kvpb.NewGet(roachpb.Key("a")), 1, }, { false, kvpb.Header{}, - kvpb.NewGet(roachpb.Key("a"), kvpb.NonLocking), + kvpb.NewGet(roachpb.Key("a")), 2, }, } { @@ -4241,7 +4240,7 @@ func TestEvictMetaRange(t *testing.T) { } ds := NewDistSender(cfg) - scan := kvpb.NewScan(roachpb.Key("a"), roachpb.Key("b"), kvpb.NonLocking) + scan := kvpb.NewScan(roachpb.Key("a"), roachpb.Key("b")) if _, pErr := kv.SendWrapped(ctx, ds, scan); pErr != nil { t.Fatalf("scan encountered error: %s", pErr) } @@ -4256,7 +4255,7 @@ func TestEvictMetaRange(t *testing.T) { // Simulate a split on the meta2 range and mark it as stale. 
isStale = true - scan = kvpb.NewScan(roachpb.Key("b"), roachpb.Key("c"), kvpb.NonLocking) + scan = kvpb.NewScan(roachpb.Key("b"), roachpb.Key("c")) if _, pErr := kv.SendWrapped(ctx, ds, scan); pErr != nil { t.Fatalf("scan encountered error: %s", pErr) } @@ -4570,13 +4569,13 @@ func TestRequestSubdivisionAfterDescriptorChange(t *testing.T) { splitKey := keys.MustAddr(keyB) get := func(k roachpb.Key) kvpb.Request { - return kvpb.NewGet(k, kvpb.NonLocking) + return kvpb.NewGet(k) } scan := func(k roachpb.Key) kvpb.Request { - return kvpb.NewScan(k, k.Next(), kvpb.NonLocking) + return kvpb.NewScan(k, k.Next()) } revScan := func(k roachpb.Key) kvpb.Request { - return kvpb.NewReverseScan(k, k.Next(), kvpb.NonLocking) + return kvpb.NewReverseScan(k, k.Next()) } for _, tc := range []struct { @@ -4709,7 +4708,7 @@ func TestRequestSubdivisionAfterDescriptorChangeWithUnavailableReplicasTerminate splitKey := keys.MustAddr(keyB) get := func(k roachpb.Key) kvpb.Request { - return kvpb.NewGet(k, kvpb.NonLocking) + return kvpb.NewGet(k) } ctx := context.Background() @@ -4802,13 +4801,13 @@ func TestDescriptorChangeAfterRequestSubdivision(t *testing.T) { laterSplitKey2 := keys.MustAddr(keyD) get := func(k roachpb.Key) kvpb.Request { - return kvpb.NewGet(k, kvpb.NonLocking) + return kvpb.NewGet(k) } scan := func(k roachpb.Key) kvpb.Request { - return kvpb.NewScan(k, k.Next(), kvpb.NonLocking) + return kvpb.NewScan(k, k.Next()) } revScan := func(k roachpb.Key) kvpb.Request { - return kvpb.NewReverseScan(k, k.Next(), kvpb.NonLocking) + return kvpb.NewReverseScan(k, k.Next()) } for _, tc := range []struct { diff --git a/pkg/kv/kvclient/kvcoord/transport_test.go b/pkg/kv/kvclient/kvcoord/transport_test.go index 4e5e1e3528d0..afec40d4a1f0 100644 --- a/pkg/kv/kvclient/kvcoord/transport_test.go +++ b/pkg/kv/kvclient/kvcoord/transport_test.go @@ -154,7 +154,7 @@ func TestResponseVerifyFailure(t *testing.T) { } ba := &kvpb.BatchRequest{} - req := kvpb.NewScan(roachpb.KeyMin, roachpb.KeyMax, kvpb.NonLocking) + req := kvpb.NewScan(roachpb.KeyMin, roachpb.KeyMax) ba.Add(req) br := ba.CreateReply() resp := br.Responses[0].GetInner().(*kvpb.ScanResponse) diff --git a/pkg/kv/kvclient/kvcoord/txn_coord_sender_savepoints_test.go b/pkg/kv/kvclient/kvcoord/txn_coord_sender_savepoints_test.go index 3c0ccce64341..a4ebf555c1e9 100644 --- a/pkg/kv/kvclient/kvcoord/txn_coord_sender_savepoints_test.go +++ b/pkg/kv/kvclient/kvcoord/txn_coord_sender_savepoints_test.go @@ -174,7 +174,7 @@ func TestSavepoints(t *testing.T) { case "get": b := txn.NewBatch() if td.HasArg("locking") { - b.GetForUpdate(td.CmdArgs[0].Key) + b.GetForUpdate(td.CmdArgs[0].Key, kvpb.BestEffort) } else { b.Get(td.CmdArgs[0].Key) } diff --git a/pkg/kv/kvclient/kvcoord/txn_coord_sender_test.go b/pkg/kv/kvclient/kvcoord/txn_coord_sender_test.go index 09f318436d38..86c0d8aed0ad 100644 --- a/pkg/kv/kvclient/kvcoord/txn_coord_sender_test.go +++ b/pkg/kv/kvclient/kvcoord/txn_coord_sender_test.go @@ -1139,7 +1139,9 @@ func TestTxnCoordSenderNoDuplicateLockSpans(t *testing.T) { txn := kv.NewTxn(ctx, db, 0 /* gatewayNodeID */) // Acquire locks on a-b, c, m, u-w before the final batch. 
- _, pErr := txn.ScanForShare(ctx, roachpb.Key("a"), roachpb.Key("b"), 0) + _, pErr := txn.ScanForShare( + ctx, roachpb.Key("a"), roachpb.Key("b"), 0, kvpb.GuaranteedDurability, + ) if pErr != nil { t.Fatal(pErr) } @@ -1147,7 +1149,7 @@ func TestTxnCoordSenderNoDuplicateLockSpans(t *testing.T) { if pErr != nil { t.Fatal(pErr) } - _, pErr = txn.GetForUpdate(ctx, roachpb.Key("m")) + _, pErr = txn.GetForUpdate(ctx, roachpb.Key("m"), kvpb.GuaranteedDurability) if pErr != nil { t.Fatal(pErr) } @@ -1161,8 +1163,8 @@ func TestTxnCoordSenderNoDuplicateLockSpans(t *testing.T) { b.Put(roachpb.Key("b"), []byte("value")) b.Put(roachpb.Key("c"), []byte("value")) b.Put(roachpb.Key("d"), []byte("value")) - b.GetForUpdate(roachpb.Key("n")) - b.ReverseScanForShare(roachpb.Key("v"), roachpb.Key("z")) + b.GetForUpdate(roachpb.Key("n"), kvpb.GuaranteedDurability) + b.ReverseScanForShare(roachpb.Key("v"), roachpb.Key("z"), kvpb.GuaranteedDurability) // The expected locks are a-b, c, m, n, and u-z. expectedLockSpans = []roachpb.Span{ @@ -1872,7 +1874,7 @@ func TestOnePCErrorTracking(t *testing.T) { } b := txn.NewBatch() b.Put(keyA, "test value") - b.ScanForUpdate(keyB, keyC) + b.ScanForUpdate(keyB, keyC, kvpb.BestEffort) if err := txn.CommitInBatch(ctx, b); !testutils.IsError(err, "injected err") { t.Fatal(err) } @@ -2150,7 +2152,7 @@ func TestEndWriteRestartReadOnlyTransaction(t *testing.T) { if write { err = txn.Put(ctx, "consider", "phlebas") } else /* locking read */ { - _, err = txn.ScanForUpdate(ctx, "a", "b", 0) + _, err = txn.ScanForUpdate(ctx, "a", "b", 0, kvpb.BestEffort) } if err == nil { t.Fatal("missing injected retriable error") @@ -2879,24 +2881,32 @@ func TestTxnTypeCompatibleWithBatchRequest(t *testing.T) { // A LeafTxn is not compatible with locking requests. // 1. Locking Get requests. - _, err = leafTxn.GetForUpdate(ctx, roachpb.Key("a")) + _, err = leafTxn.GetForUpdate(ctx, roachpb.Key("a"), kvpb.GuaranteedDurability) require.Error(t, err) require.Regexp(t, "LeafTxn .* incompatible with locking request .*", err) - _, err = leafTxn.GetForShare(ctx, roachpb.Key("a")) + _, err = leafTxn.GetForShare(ctx, roachpb.Key("a"), kvpb.GuaranteedDurability) require.Error(t, err) require.Regexp(t, "LeafTxn .* incompatible with locking request .*", err) // 2. Locking Scan requests. - _, err = leafTxn.ScanForUpdate(ctx, roachpb.Key("a"), roachpb.Key("d"), 0 /* maxRows */) + _, err = leafTxn.ScanForUpdate( + ctx, roachpb.Key("a"), roachpb.Key("d"), 0 /* maxRows */, kvpb.GuaranteedDurability, + ) require.Error(t, err) require.Regexp(t, "LeafTxn .* incompatible with locking request .*", err) - _, err = leafTxn.ScanForShare(ctx, roachpb.Key("a"), roachpb.Key("d"), 0 /* maxRows */) + _, err = leafTxn.ScanForShare( + ctx, roachpb.Key("a"), roachpb.Key("d"), 0 /* maxRows */, kvpb.GuaranteedDurability, + ) require.Error(t, err) require.Regexp(t, "LeafTxn .* incompatible with locking request .*", err) // 3. Locking ReverseScan requests. 
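+	// (As with the Get and Scan variants above, these acquire locks, so a
+	// LeafTxn rejects them regardless of the durability supplied.)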
- _, err = leafTxn.ReverseScanForUpdate(ctx, roachpb.Key("a"), roachpb.Key("d"), 0 /* maxRows */) + _, err = leafTxn.ReverseScanForUpdate( + ctx, roachpb.Key("a"), roachpb.Key("d"), 0 /* maxRows */, kvpb.GuaranteedDurability, + ) require.Error(t, err) require.Regexp(t, "LeafTxn .* incompatible with locking request .*", err) - _, err = leafTxn.ReverseScanForShare(ctx, roachpb.Key("a"), roachpb.Key("d"), 0 /* maxRows */) + _, err = leafTxn.ReverseScanForShare( + ctx, roachpb.Key("a"), roachpb.Key("d"), 0 /* maxRows */, kvpb.GuaranteedDurability, + ) require.Error(t, err) require.Regexp(t, "LeafTxn .* incompatible with locking request .*", err) // 4. Writes. @@ -2913,23 +2923,31 @@ func TestTxnTypeCompatibleWithBatchRequest(t *testing.T) { // A RootTxn is compatible with all requests. // 1. All types of Get requests. - _, err = rootTxn.GetForUpdate(ctx, roachpb.Key("a")) + _, err = rootTxn.GetForUpdate(ctx, roachpb.Key("a"), kvpb.GuaranteedDurability) require.NoError(t, err) - _, err = rootTxn.GetForShare(ctx, roachpb.Key("a")) + _, err = rootTxn.GetForShare(ctx, roachpb.Key("a"), kvpb.GuaranteedDurability) require.NoError(t, err) _, err = rootTxn.Get(ctx, roachpb.Key("a")) require.NoError(t, err) // 2. All types of Scan requests. - _, err = rootTxn.ScanForUpdate(ctx, roachpb.Key("a"), roachpb.Key("d"), 0 /* maxRows */) + _, err = rootTxn.ScanForUpdate( + ctx, roachpb.Key("a"), roachpb.Key("d"), 0 /* maxRows */, kvpb.GuaranteedDurability, + ) require.NoError(t, err) - _, err = rootTxn.ScanForShare(ctx, roachpb.Key("a"), roachpb.Key("d"), 0 /* maxRows */) + _, err = rootTxn.ScanForShare( + ctx, roachpb.Key("a"), roachpb.Key("d"), 0 /* maxRows */, kvpb.GuaranteedDurability, + ) require.NoError(t, err) _, err = rootTxn.Scan(ctx, roachpb.Key("a"), roachpb.Key("d"), 0 /* maxRows */) require.NoError(t, err) - // 3. All types of ReverseScan requets. - _, err = rootTxn.ReverseScanForUpdate(ctx, roachpb.Key("a"), roachpb.Key("d"), 0 /* maxRows */) + // 3. All types of ReverseScan requests. + _, err = rootTxn.ReverseScanForUpdate( + ctx, roachpb.Key("a"), roachpb.Key("d"), 0 /* maxRows */, kvpb.GuaranteedDurability, + ) require.NoError(t, err) - _, err = rootTxn.ReverseScanForShare(ctx, roachpb.Key("a"), roachpb.Key("d"), 0 /* maxRows */) + _, err = rootTxn.ReverseScanForShare( + ctx, roachpb.Key("a"), roachpb.Key("d"), 0 /* maxRows */, kvpb.GuaranteedDurability, + ) require.NoError(t, err) _, err = rootTxn.ReverseScan(ctx, roachpb.Key("a"), roachpb.Key("d"), 0 /* maxRows */) require.NoError(t, err) diff --git a/pkg/kv/kvclient/kvstreamer/streamer_test.go b/pkg/kv/kvclient/kvstreamer/streamer_test.go index 4889fbf5b44d..d479e51297cf 100644 --- a/pkg/kv/kvclient/kvstreamer/streamer_test.go +++ b/pkg/kv/kvclient/kvstreamer/streamer_test.go @@ -90,7 +90,7 @@ func TestStreamerLimitations(t *testing.T) { defer streamer.Close(ctx) streamer.Init(kvstreamer.OutOfOrder, kvstreamer.Hints{UniqueRequests: true}, 1 /* maxKeysPerRow */, nil /* diskBuffer */) k := append(s.Codec().TenantPrefix(), roachpb.Key("key")...) 
- get := kvpb.NewGet(k, kvpb.NonLocking) + get := kvpb.NewGet(k) reqs := []kvpb.RequestUnion{{ Value: &kvpb.RequestUnion_Get{ Get: get.(*kvpb.GetRequest), diff --git a/pkg/kv/kvnemesis/applier.go b/pkg/kv/kvnemesis/applier.go index 56799bd7a34c..ba7326bb393b 100644 --- a/pkg/kv/kvnemesis/applier.go +++ b/pkg/kv/kvnemesis/applier.go @@ -237,15 +237,15 @@ type dbRunI interface { type clientI interface { dbRunI Get(context.Context, interface{}) (kv.KeyValue, error) - GetForUpdate(context.Context, interface{}) (kv.KeyValue, error) - GetForShare(context.Context, interface{}) (kv.KeyValue, error) + GetForUpdate(context.Context, interface{}, kvpb.KeyLockingDurabilityType) (kv.KeyValue, error) + GetForShare(context.Context, interface{}, kvpb.KeyLockingDurabilityType) (kv.KeyValue, error) Put(context.Context, interface{}, interface{}) error Scan(context.Context, interface{}, interface{}, int64) ([]kv.KeyValue, error) - ScanForUpdate(context.Context, interface{}, interface{}, int64) ([]kv.KeyValue, error) - ScanForShare(context.Context, interface{}, interface{}, int64) ([]kv.KeyValue, error) + ScanForUpdate(context.Context, interface{}, interface{}, int64, kvpb.KeyLockingDurabilityType) ([]kv.KeyValue, error) + ScanForShare(context.Context, interface{}, interface{}, int64, kvpb.KeyLockingDurabilityType) ([]kv.KeyValue, error) ReverseScan(context.Context, interface{}, interface{}, int64) ([]kv.KeyValue, error) - ReverseScanForUpdate(context.Context, interface{}, interface{}, int64) ([]kv.KeyValue, error) - ReverseScanForShare(context.Context, interface{}, interface{}, int64) ([]kv.KeyValue, error) + ReverseScanForUpdate(context.Context, interface{}, interface{}, int64, kvpb.KeyLockingDurabilityType) ([]kv.KeyValue, error) + ReverseScanForShare(context.Context, interface{}, interface{}, int64, kvpb.KeyLockingDurabilityType) ([]kv.KeyValue, error) Del(context.Context, ...interface{}) ([]roachpb.Key, error) DelRange(context.Context, interface{}, interface{}, bool) ([]roachpb.Key, error) } @@ -280,18 +280,18 @@ func batchRun( func applyClientOp(ctx context.Context, db clientI, op *Operation, inTxn bool) { switch o := op.GetValue().(type) { case *GetOperation: - fn := (*kv.Batch).Get - if o.ForUpdate { - fn = (*kv.Batch).GetForUpdate - } - if o.ForShare { - fn = (*kv.Batch).GetForShare - } res, ts, err := dbRunWithResultAndTimestamp(ctx, db, func(b *kv.Batch) { if o.SkipLocked { b.Header.WaitPolicy = lock.WaitPolicy_SkipLocked } - fn(b, o.Key) + dur := kvpb.BestEffort + if o.ForUpdate { + b.GetForUpdate(o.Key, dur) + } else if o.ForShare { + b.GetForShare(o.Key, dur) + } else { + b.Get(o.Key) + } }) o.Result = resultInit(ctx, err) if err != nil { @@ -316,29 +316,28 @@ func applyClientOp(ctx context.Context, db clientI, op *Operation, inTxn bool) { } o.Result.OptionalTimestamp = ts case *ScanOperation: - fn := (*kv.Batch).Scan - if o.Reverse { - if o.ForUpdate { - fn = (*kv.Batch).ReverseScanForUpdate - } else if o.ForShare { - fn = (*kv.Batch).ReverseScanForShare - } else { - fn = (*kv.Batch).ReverseScan - } - } else { - if o.ForUpdate { - fn = (*kv.Batch).ScanForUpdate - } else if o.ForShare { - fn = (*kv.Batch).ScanForShare - } else { - fn = (*kv.Batch).Scan - } - } res, ts, err := dbRunWithResultAndTimestamp(ctx, db, func(b *kv.Batch) { if o.SkipLocked { b.Header.WaitPolicy = lock.WaitPolicy_SkipLocked } - fn(b, o.Key, o.EndKey) + dur := kvpb.BestEffort + if o.Reverse { + if o.ForUpdate { + b.ReverseScanForUpdate(o.Key, o.EndKey, dur) + } else if o.ForShare { + b.ReverseScanForShare(o.Key, o.EndKey, 
dur) + } else { + b.ReverseScan(o.Key, o.EndKey) + } + } else { + if o.ForUpdate { + b.ScanForUpdate(o.Key, o.EndKey, dur) + } else if o.ForShare { + b.ScanForShare(o.Key, o.EndKey, dur) + } else { + b.Scan(o.Key, o.EndKey) + } + } }) o.Result = resultInit(ctx, err) if err != nil { @@ -443,8 +442,10 @@ func applyBatchOp( for i := range o.Ops { switch subO := o.Ops[i].GetValue().(type) { case *GetOperation: + // TODO(arul): Looks like I forgot to add shared locks here. + dur := kvpb.BestEffort if subO.ForUpdate { - b.GetForUpdate(subO.Key) + b.GetForUpdate(subO.Key, dur) } else { b.Get(subO.Key) } @@ -452,12 +453,13 @@ func applyBatchOp( b.Put(subO.Key, subO.Value()) setLastReqSeq(b, subO.Seq) case *ScanOperation: + dur := kvpb.BestEffort if subO.Reverse && subO.ForUpdate { - b.ReverseScanForUpdate(subO.Key, subO.EndKey) + b.ReverseScanForUpdate(subO.Key, subO.EndKey, dur) } else if subO.Reverse { b.ReverseScan(subO.Key, subO.EndKey) } else if subO.ForUpdate { - b.ScanForUpdate(subO.Key, subO.EndKey) + b.ScanForUpdate(subO.Key, subO.EndKey, dur) } else { b.Scan(subO.Key, subO.EndKey) } diff --git a/pkg/kv/kvpb/api.go b/pkg/kv/kvpb/api.go index 9c975999a36b..f8efbc409895 100644 --- a/pkg/kv/kvpb/api.go +++ b/pkg/kv/kvpb/api.go @@ -1189,15 +1189,29 @@ func (r *IsSpanEmptyRequest) ShallowCopy() Request { return &shallowCopy } -// NewGet returns a Request initialized to get the value at key. If -// forUpdate is true, an unreplicated, exclusive lock is acquired on on -// the key, if it exists. -func NewGet(key roachpb.Key, str KeyLockingStrengthType) Request { +// NewLockingGet returns a Request initialized to get the value at key. A lock +// corresponding to the supplied lock strength and durability is acquired on the +// key, if it exists. +func NewLockingGet( + key roachpb.Key, str KeyLockingStrengthType, dur KeyLockingDurabilityType, +) Request { return &GetRequest{ RequestHeader: RequestHeader{ Key: key, }, - KeyLockingStrength: scanLockStrength(str), + KeyLockingStrength: scanLockStrength(str), + KeyLockingDurability: scanLockDurability(dur), + } +} + +// NewGet returns a Request initialized to get the value at key. No lock is +// acquired on the key, even if it exists. +func NewGet(key roachpb.Key) Request { + return &GetRequest{ + RequestHeader: RequestHeader{ + Key: key, + }, + KeyLockingStrength: lock.None, } } @@ -1314,29 +1328,59 @@ func NewDeleteRange(startKey, endKey roachpb.Key, returnKeys bool) Request { } } -// NewScan returns a Request initialized to scan from start to end keys. -// If forUpdate is true, unreplicated, exclusive locks are acquired on -// each of the resulting keys. -func NewScan(key, endKey roachpb.Key, str KeyLockingStrengthType) Request { +// NewLockingScan returns a Request initialized to scan from start to end keys. +// A lock corresponding to the supplied lock strength and durability will be +// acquired on each of the resulting keys. +func NewLockingScan( + key, endKey roachpb.Key, str KeyLockingStrengthType, dur KeyLockingDurabilityType, +) Request { + return &ScanRequest{ + RequestHeader: RequestHeader{ + Key: key, + EndKey: endKey, + }, + KeyLockingStrength: scanLockStrength(str), + KeyLockingDurability: scanLockDurability(dur), + } +} + +// NewScan returns a Request initialized to scan from start to end keys. No +// locks will be acquired on the resulting keys. 
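+//
+// For example (an illustrative sketch; the keys are arbitrary):
+//
+//	s := kvpb.NewScan(roachpb.Key("a"), roachpb.Key("c"))
+//	ls := kvpb.NewLockingScan(roachpb.Key("a"), roachpb.Key("c"),
+//		kvpb.ForShare, kvpb.BestEffort)
+//
+// s reads without acquiring locks, while ls acquires unreplicated shared
+// locks on each key it returns.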
+func NewScan(key, endKey roachpb.Key) Request { return &ScanRequest{ RequestHeader: RequestHeader{ Key: key, EndKey: endKey, }, - KeyLockingStrength: scanLockStrength(str), + KeyLockingStrength: lock.None, + } +} + +// NewLockingReverseScan returns a Request initialized to reverse scan from end. +// A lock corresponding to the supplied lock strength and durability will be +// acquired on each of the resulting keys. +func NewLockingReverseScan( + key, endKey roachpb.Key, str KeyLockingStrengthType, dur KeyLockingDurabilityType, +) Request { + return &ReverseScanRequest{ + RequestHeader: RequestHeader{ + Key: key, + EndKey: endKey, + }, + KeyLockingStrength: scanLockStrength(str), + KeyLockingDurability: scanLockDurability(dur), } } -// NewReverseScan returns a Request initialized to reverse scan from end. -// If forUpdate is true, unreplicated, exclusive locks are acquired on -// each of the resulting keys. -func NewReverseScan(key, endKey roachpb.Key, str KeyLockingStrengthType) Request { +// NewReverseScan returns a Request initialized to reverse scan from end. No +// locks will be acquired on each of the resulting keys. +func NewReverseScan(key, endKey roachpb.Key) Request { return &ReverseScanRequest{ RequestHeader: RequestHeader{ Key: key, EndKey: endKey, }, - KeyLockingStrength: scanLockStrength(str), + KeyLockingStrength: lock.None, } } @@ -1362,10 +1406,40 @@ const ( ForUpdate ) +// KeyLockingDurabilityType is used to describe the durability goals of per-key +// locks acquired by locking Get, Scan, and ReverseScan requests. +type KeyLockingDurabilityType int8 + +const ( + // Invalid is meant to be used in places where supplying lock durability does + // not make sense. Notably, it's used by non-locking requests. + Invalid KeyLockingDurabilityType = iota + // BestEffort makes a best-effort attempt to hold any locks acquired until + // commit time. Locks are held in-memory on the leaseholder of the locked key; + // locks may be lost because of things like lease transfers, node restarts, + // range splits, and range merges. However, the unreplicated nature of these + // locks makes lock acquisition fast, making this a great choice for + // transactions that do not require locks for correctness -- serializable + // transactions. + BestEffort + // GuaranteedDurability guarantees that if the transaction commits then any + // locks acquired by it will be held until commit time. On commit, once the + // locks are released, no subsequent writers will be able to write at or below + // the transaction's commit timestamp -- regardless of the timestamp at which + // the lock was acquired. Simply put, once acquired, locks are guaranteed to + // provide protection until commit time. + // + // To provide this guarantee, locks are replicated -- which means they come + // with the performance penalties of doing so. They're attractive choices for + // transactions that require locks for correctness (read: read-committed, + // snapshot isolation). 
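+	//
+	// In lock-table terms, scanLockDurability below maps BestEffort to
+	// lock.Unreplicated and GuaranteedDurability to lock.Replicated.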
+ GuaranteedDurability +) + func scanLockStrength(str KeyLockingStrengthType) lock.Strength { switch str { case NonLocking: - return lock.None + panic(fmt.Sprintf("unexpected strength %d", str)) case ForShare: return lock.Shared case ForUpdate: @@ -1375,6 +1449,19 @@ func scanLockStrength(str KeyLockingStrengthType) lock.Strength { } } +func scanLockDurability(dur KeyLockingDurabilityType) lock.Durability { + switch dur { + case Invalid: + panic("invalid lock durability") + case BestEffort: + return lock.Unreplicated + case GuaranteedDurability: + return lock.Replicated + default: + panic(fmt.Sprintf("unknown durability type: %d", dur)) + } +} + func flagForLockStrength(l lock.Strength) flag { if l != lock.None { return isLocking diff --git a/pkg/kv/kvpb/batch_test.go b/pkg/kv/kvpb/batch_test.go index 13b7d34e6770..a835f01cc4f5 100644 --- a/pkg/kv/kvpb/batch_test.go +++ b/pkg/kv/kvpb/batch_test.go @@ -344,9 +344,9 @@ func TestRefreshSpanIterate(t *testing.T) { func TestRefreshSpanIterateSkipLocked(t *testing.T) { ba := BatchRequest{} - ba.Add(NewGet(roachpb.Key("a"), NonLocking)) - ba.Add(NewScan(roachpb.Key("b"), roachpb.Key("d"), NonLocking)) - ba.Add(NewReverseScan(roachpb.Key("e"), roachpb.Key("g"), NonLocking)) + ba.Add(NewGet(roachpb.Key("a"))) + ba.Add(NewScan(roachpb.Key("b"), roachpb.Key("d"))) + ba.Add(NewReverseScan(roachpb.Key("e"), roachpb.Key("g"))) br := ba.CreateReply() // Without a SkipLocked wait policy. diff --git a/pkg/kv/kvserver/client_replica_circuit_breaker_test.go b/pkg/kv/kvserver/client_replica_circuit_breaker_test.go index 32572389bb01..ae8e526f7235 100644 --- a/pkg/kv/kvserver/client_replica_circuit_breaker_test.go +++ b/pkg/kv/kvserver/client_replica_circuit_breaker_test.go @@ -1069,14 +1069,14 @@ func (cbt *circuitBreakerTest) Write(idx int) error { func (cbt *circuitBreakerTest) Read(idx int) error { cbt.t.Helper() repl := cbt.repls[idx] - get := kvpb.NewGet(repl.Desc().StartKey.AsRawKey(), kvpb.NonLocking) + get := kvpb.NewGet(repl.Desc().StartKey.AsRawKey()) return cbt.Send(idx, get) } func (cbt *circuitBreakerTest) FollowerRead(idx int) error { cbt.t.Helper() repl := cbt.repls[idx] - get := kvpb.NewGet(repl.Desc().StartKey.AsRawKey(), kvpb.NonLocking) + get := kvpb.NewGet(repl.Desc().StartKey.AsRawKey()) ctx := context.Background() ts := repl.GetCurrentClosedTimestamp(ctx) return cbt.SendCtxTS(ctx, idx, get, ts) diff --git a/pkg/kv/kvserver/client_replica_test.go b/pkg/kv/kvserver/client_replica_test.go index 5911155898e0..f474930c7df0 100644 --- a/pkg/kv/kvserver/client_replica_test.go +++ b/pkg/kv/kvserver/client_replica_test.go @@ -4983,7 +4983,7 @@ func TestOptimisticEvalRetry(t *testing.T) { defer s.Stopper().Stop(ctx) txn1 := db.NewTxn(ctx, "locking txn") - _, err := txn1.ScanForUpdate(ctx, "a", "c", 0) + _, err := txn1.ScanForUpdate(ctx, "a", "c", 0, kvpb.BestEffort) require.NoError(t, err) readDone := make(chan error) @@ -5038,7 +5038,7 @@ func TestOptimisticEvalNoContention(t *testing.T) { defer s.Stopper().Stop(ctx) txn1 := db.NewTxn(ctx, "locking txn") - _, err := txn1.ScanForUpdate(ctx, "b", "c", 0) + _, err := txn1.ScanForUpdate(ctx, "b", "c", 0, kvpb.BestEffort) require.NoError(t, err) readDone := make(chan error) @@ -5165,7 +5165,7 @@ func BenchmarkOptimisticEvalForLocks(b *testing.B) { go func() { for { txn := db.NewTxn(ctx, "locking txn") - _, err = txn.ScanForUpdate(ctx, lockStart, "c", 0) + _, err = txn.ScanForUpdate(ctx, lockStart, "c", 0, kvpb.BestEffort) require.NoError(b, err) time.Sleep(5 * time.Millisecond) // Normally, it would 
do a write here, but we don't bother. diff --git a/pkg/kv/kvserver/client_split_test.go b/pkg/kv/kvserver/client_split_test.go index b2f440781278..5bff9fb4d475 100644 --- a/pkg/kv/kvserver/client_split_test.go +++ b/pkg/kv/kvserver/client_split_test.go @@ -448,12 +448,12 @@ func TestQueryLocksAcrossRanges(t *testing.T) { txn1 := kv.NewTxn(ctx, db, s.NodeID()) err = txn1.Put(ctx, roachpb.Key("c"), []byte("baz")) require.NoError(t, err) - _, err = txn1.GetForUpdate(ctx, roachpb.Key("x")) + _, err = txn1.GetForUpdate(ctx, roachpb.Key("x"), kvpb.BestEffort) require.NoError(t, err) // Use txn2 to get an unreplicated lock on "p". txn2 := kv.NewTxn(ctx, db, s.NodeID()) - _, err = txn2.GetForUpdate(ctx, roachpb.Key("p")) + _, err = txn2.GetForUpdate(ctx, roachpb.Key("p"), kvpb.BestEffort) require.NoError(t, err) now := s.Clock().NowAsClockTimestamp() diff --git a/pkg/kv/kvserver/replica_command.go b/pkg/kv/kvserver/replica_command.go index c45c41f1bf90..0dea06bc0066 100644 --- a/pkg/kv/kvserver/replica_command.go +++ b/pkg/kv/kvserver/replica_command.go @@ -640,7 +640,7 @@ func (r *Replica) AdminMerge( // shortly. var rightDesc roachpb.RangeDescriptor rightDescKey := keys.RangeDescriptorKey(origLeftDesc.EndKey) - dbRightDescKV, err := txn.GetForUpdate(ctx, rightDescKey) + dbRightDescKV, err := txn.GetForUpdate(ctx, rightDescKey, kvpb.BestEffort) if err != nil { return err } @@ -3290,12 +3290,13 @@ func conditionalGetDescValueFromDB( forUpdate bool, check func(*roachpb.RangeDescriptor) (matched, skip bool), ) (kvDesc *roachpb.RangeDescriptor, kvDescBytes []byte, skip bool, err error) { - get := txn.Get + descKey := keys.RangeDescriptorKey(startKey) + var existingDescKV kv.KeyValue if forUpdate { - get = txn.GetForUpdate + existingDescKV, err = txn.GetForUpdate(ctx, descKey, kvpb.BestEffort) + } else { + existingDescKV, err = txn.Get(ctx, descKey) } - descKey := keys.RangeDescriptorKey(startKey) - existingDescKV, err := get(ctx, descKey) if err != nil { return nil, nil, false /* skip */, errors.Wrap(err, "fetching current range descriptor value") } diff --git a/pkg/kv/kvserver/replica_rankings_test.go b/pkg/kv/kvserver/replica_rankings_test.go index 4562742cc2d8..c99e0a7e1ed2 100644 --- a/pkg/kv/kvserver/replica_rankings_test.go +++ b/pkg/kv/kvserver/replica_rankings_test.go @@ -199,7 +199,7 @@ func TestAddSSTQPSStat(t *testing.T) { // genVariableRead returns a batch request containing, start-end sequential key reads. func genVariableRead(ctx context.Context, start, end roachpb.Key) *kvpb.BatchRequest { - scan := kvpb.NewScan(start, end, kvpb.NonLocking) + scan := kvpb.NewScan(start, end) readBa := &kvpb.BatchRequest{} readBa.Add(scan) return readBa diff --git a/pkg/kv/kvserver/replica_test.go b/pkg/kv/kvserver/replica_test.go index 0ed77816bd38..4c202827b389 100644 --- a/pkg/kv/kvserver/replica_test.go +++ b/pkg/kv/kvserver/replica_test.go @@ -11085,7 +11085,7 @@ func TestReplicaNotifyLockTableOn1PC(t *testing.T) { txn := newTransaction("test", key, 1, tc.Clock()) ba := &kvpb.BatchRequest{} ba.Header = kvpb.Header{Txn: txn} - ba.Add(kvpb.NewScan(key, key.Next(), kvpb.ForUpdate)) + ba.Add(kvpb.NewLockingScan(key, key.Next(), kvpb.ForUpdate, kvpb.BestEffort)) if _, pErr := tc.Sender().Send(ctx, ba); pErr != nil { t.Fatalf("unexpected error: %s", pErr) } @@ -11182,9 +11182,9 @@ func TestReplicaAsyncIntentResolutionOn1PC(t *testing.T) { // Perform one or more "for update" gets. This should acquire unreplicated, // exclusive locks on the keys. 
 	b := txn.NewBatch()
-	b.GetForUpdate(keyA)
+	b.GetForUpdate(keyA, kvpb.BestEffort)
 	if external {
-		b.GetForUpdate(keyB)
+		b.GetForUpdate(keyB, kvpb.BestEffort)
 	}
 	err = txn.Run(ctx, b)
 	require.NoError(t, err)
@@ -11244,7 +11244,7 @@ func TestReplicaQueryLocks(t *testing.T) {
 	txn := newTransaction("test", keyA, 1, tc.Clock())
 	ba := &kvpb.BatchRequest{}
 	ba.Header = kvpb.Header{Txn: txn}
-	ba.Add(kvpb.NewScan(keyA, keyB.Next(), kvpb.ForUpdate))
+	ba.Add(kvpb.NewLockingScan(keyA, keyB.Next(), kvpb.ForUpdate, kvpb.BestEffort))
 	if _, pErr := tc.Sender().Send(ctx, ba); pErr != nil {
 		t.Fatalf("unexpected error: %s", pErr)
 	}
diff --git a/pkg/kv/txn.go b/pkg/kv/txn.go
index d9c3551adf32..50bd3bd45de0 100644
--- a/pkg/kv/txn.go
+++ b/pkg/kv/txn.go
@@ -473,30 +473,35 @@ func (txn *Txn) Get(ctx context.Context, key interface{}) (KeyValue, error) {
 }
 
 // GetForUpdate retrieves the value for a key, returning the retrieved key/value
-// or an error. An unreplicated, exclusive lock is acquired on the key, if it
-// exists. It is not considered an error for the key to not exist.
+// or an error. An Exclusive lock with the supplied durability is acquired on
+// the key, if it exists. It is not considered an error for the key not to
+// exist.
 //
 //	r, err := txn.GetForUpdate("a")
 //	// string(r.Key) == "a"
 //
 // key can be either a byte slice or a string.
-func (txn *Txn) GetForUpdate(ctx context.Context, key interface{}) (KeyValue, error) {
+func (txn *Txn) GetForUpdate(
+	ctx context.Context, key interface{}, dur kvpb.KeyLockingDurabilityType,
+) (KeyValue, error) {
 	b := txn.NewBatch()
-	b.GetForUpdate(key)
+	b.GetForUpdate(key, dur)
 	return getOneRow(txn.Run(ctx, b), b)
 }
 
-// GetForShare retrieves the value for a key, returning the retrieved key/value
-// or an error. An unreplicated, shared lock is acquired on the key, if it
-// exists. It is not considered an error for the key to not exist.
+// GetForShare retrieves the value for a key, returning the retrieved key/value
+// or an error. A shared lock with the supplied durability is acquired on the
+// key, if it exists. It is not considered an error for the key not to exist.
 //
 //	r, err := txn.GetForShare("a")
 //	// string(r.Key) == "a"
 //
 // key can be either a byte slice or a string.
-func (txn *Txn) GetForShare(ctx context.Context, key interface{}) (KeyValue, error) {
+func (txn *Txn) GetForShare(
+	ctx context.Context, key interface{}, dur kvpb.KeyLockingDurabilityType,
+) (KeyValue, error) {
 	b := txn.NewBatch()
-	b.GetForShare(key)
+	b.GetForShare(key, dur)
 	return getOneRow(txn.Run(ctx, b), b)
 }
 
@@ -595,12 +600,13 @@ func (txn *Txn) scan(
 	maxRows int64,
 	isReverse bool,
 	str kvpb.KeyLockingStrengthType,
+	dur kvpb.KeyLockingDurabilityType,
 ) ([]KeyValue, error) {
 	b := txn.NewBatch()
 	if maxRows > 0 {
 		b.Header.MaxSpanRequestKeys = maxRows
 	}
-	b.scan(begin, end, isReverse, str)
+	b.scan(begin, end, isReverse, str, dur)
 	r, err := getOneResult(txn.Run(ctx, b), b)
 	return r.Rows, err
 }
@@ -615,35 +621,35 @@ func (txn *Txn) scan(
 func (txn *Txn) Scan(
 	ctx context.Context, begin, end interface{}, maxRows int64,
 ) ([]KeyValue, error) {
-	return txn.scan(ctx, begin, end, maxRows, false /* isReverse */, kvpb.NonLocking)
+	return txn.scan(ctx, begin, end, maxRows, false /* isReverse */, kvpb.NonLocking, kvpb.Invalid)
 }
 
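[Illustration, not part of the patch: a minimal sketch of how a caller might combine the new point-read signatures with the two durability levels defined in kvpb above. The keys, values, and function name are made up; imports are the standard kv and kvpb packages.]

	import (
		"context"

		"github.com/cockroachdb/cockroach/pkg/kv"
		"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
	)

	func exampleLockingGets(ctx context.Context, db *kv.DB) error {
		return db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
			// Replicated lock: survives lease transfers, node restarts, range
			// splits, and range merges; the right choice when the transaction
			// needs the lock for correctness.
			if _, err := txn.GetForUpdate(ctx, "k1", kvpb.GuaranteedDurability); err != nil {
				return err
			}
			// Unreplicated lock: held in memory on the leaseholder; cheaper to
			// acquire, and sufficient for serializable transactions.
			if _, err := txn.GetForShare(ctx, "k2", kvpb.BestEffort); err != nil {
				return err
			}
			return txn.Put(ctx, "k1", "v1")
		})
	}
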
 // ScanForUpdate retrieves the rows between begin (inclusive) and end
-// (exclusive) in ascending order. Unreplicated, exclusive locks are acquired on
-// each of the returned keys.
+// (exclusive) in ascending order. Exclusive locks with the supplied durability
+// are acquired on each of the returned keys.
 //
 // The returned []KeyValue will contain up to maxRows elements (or all results
 // when zero is supplied).
 //
 // key can be either a byte slice or a string.
 func (txn *Txn) ScanForUpdate(
-	ctx context.Context, begin, end interface{}, maxRows int64,
+	ctx context.Context, begin, end interface{}, maxRows int64, dur kvpb.KeyLockingDurabilityType,
 ) ([]KeyValue, error) {
-	return txn.scan(ctx, begin, end, maxRows, false /* isReverse */, kvpb.ForUpdate)
+	return txn.scan(ctx, begin, end, maxRows, false /* isReverse */, kvpb.ForUpdate, dur)
 }
 
-// ScanForShare retrieves the rows between begin (inclusive) and end
-// (exclusive) in ascending order. Unreplicated, shared locks are acquired on
-// each of the returned keys.
+// ScanForShare retrieves the rows between begin (inclusive) and end (exclusive)
+// in ascending order. Shared locks with the supplied durability are acquired on
+// each of the returned keys.
 //
 // The returned []KeyValue will contain up to maxRows elements (or all results
 // when zero is supplied).
 //
 // key can be either a byte slice or a string.
 func (txn *Txn) ScanForShare(
-	ctx context.Context, begin, end interface{}, maxRows int64,
+	ctx context.Context, begin, end interface{}, maxRows int64, dur kvpb.KeyLockingDurabilityType,
 ) ([]KeyValue, error) {
-	return txn.scan(ctx, begin, end, maxRows, false /* isReverse */, kvpb.ForShare)
+	return txn.scan(ctx, begin, end, maxRows, false /* isReverse */, kvpb.ForShare, dur)
 }
 
 // ReverseScan retrieves the rows between begin (inclusive) and end (exclusive)
@@ -656,35 +662,35 @@ func (txn *Txn) ScanForShare(
 func (txn *Txn) ReverseScan(
 	ctx context.Context, begin, end interface{}, maxRows int64,
 ) ([]KeyValue, error) {
-	return txn.scan(ctx, begin, end, maxRows, true /* isReverse */, kvpb.NonLocking)
+	return txn.scan(ctx, begin, end, maxRows, true /* isReverse */, kvpb.NonLocking, kvpb.Invalid)
 }
 
 // ReverseScanForUpdate retrieves the rows between begin (inclusive) and end
-// (exclusive) in descending order. Unreplicated, exclusive locks are acquired
-// on each of the returned keys.
+// (exclusive) in descending order. Exclusive locks with the supplied durability
+// are acquired on each of the returned keys.
 //
 // The returned []KeyValue will contain up to maxRows elements (or all results
 // when zero is supplied).
 //
 // key can be either a byte slice or a string.
 func (txn *Txn) ReverseScanForUpdate(
-	ctx context.Context, begin, end interface{}, maxRows int64,
+	ctx context.Context, begin, end interface{}, maxRows int64, dur kvpb.KeyLockingDurabilityType,
 ) ([]KeyValue, error) {
-	return txn.scan(ctx, begin, end, maxRows, true /* isReverse */, kvpb.ForUpdate)
+	return txn.scan(ctx, begin, end, maxRows, true /* isReverse */, kvpb.ForUpdate, dur)
 }
 
 // ReverseScanForShare retrieves the rows between begin (inclusive) and end
-// (exclusive) in descending order. Unreplicated, shared locks are acquired
-// on each of the returned keys.
+// (exclusive) in descending order. Shared locks with the supplied durability
+// are acquired on each of the returned keys.
 //
 // The returned []KeyValue will contain up to maxRows elements (or all results
 // when zero is supplied).
 //
 // key can be either a byte slice or a string.
 func (txn *Txn) ReverseScanForShare(
-	ctx context.Context, begin, end interface{}, maxRows int64,
+	ctx context.Context, begin, end interface{}, maxRows int64, dur kvpb.KeyLockingDurabilityType,
 ) ([]KeyValue, error) {
-	return txn.scan(ctx, begin, end, maxRows, true /* isReverse */, kvpb.ForShare)
+	return txn.scan(ctx, begin, end, maxRows, true /* isReverse */, kvpb.ForShare, dur)
 }
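[Likewise illustrative, not from the patch: the scan variants follow the same shape. The keys, bounds, and function name are placeholders.]

	func exampleLockingScans(ctx context.Context, txn *kv.Txn) error {
		// Exclusive, replicated locks on up to 10 rows in ["a", "c").
		if _, err := txn.ScanForUpdate(ctx, "a", "c", 10, kvpb.GuaranteedDurability); err != nil {
			return err
		}
		// Shared, best-effort locks on all rows in ["a", "c"), visited in
		// descending order; a maxRows of 0 means "no limit".
		_, err := txn.ReverseScanForShare(ctx, "a", "c", 0, kvpb.BestEffort)
		return err
	}
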
 
 // Iterate performs a paginated scan, applying the function f to every page.
diff --git a/pkg/kv/txn_test.go b/pkg/kv/txn_test.go
index c28d5fad939a..2566d31ad815 100644
--- a/pkg/kv/txn_test.go
+++ b/pkg/kv/txn_test.go
@@ -574,7 +574,7 @@ func TestTxnNegotiateAndSend(t *testing.T) {
 			MinTimestampBound: ts10,
 		}
 		ba.RoutingPolicy = kvpb.RoutingPolicy_NEAREST
-		ba.Add(kvpb.NewGet(roachpb.Key("a"), kvpb.NonLocking))
+		ba.Add(kvpb.NewGet(roachpb.Key("a")))
 
 		br, pErr := txn.NegotiateAndSend(ctx, ba)
 		if fastPath {
@@ -685,7 +685,7 @@ func TestTxnNegotiateAndSendWithDeadline(t *testing.T) {
 				MaxTimestampBound: test.maxTSBound,
 			}
 			ba.RoutingPolicy = kvpb.RoutingPolicy_NEAREST
-			ba.Add(kvpb.NewGet(roachpb.Key("a"), kvpb.NonLocking))
+			ba.Add(kvpb.NewGet(roachpb.Key("a")))
 
 			br, pErr := txn.NegotiateAndSend(ctx, ba)
 			if test.expErr == "" {
@@ -755,7 +755,7 @@ func TestTxnNegotiateAndSendWithResumeSpan(t *testing.T) {
 		}
 		ba.RoutingPolicy = kvpb.RoutingPolicy_NEAREST
 		ba.MaxSpanRequestKeys = 2
-		ba.Add(kvpb.NewScan(roachpb.Key("a"), roachpb.Key("d"), kvpb.NonLocking))
+		ba.Add(kvpb.NewScan(roachpb.Key("a"), roachpb.Key("d")))
 
 		br, pErr := txn.NegotiateAndSend(ctx, ba)
 		if fastPath {
diff --git a/pkg/server/node_test.go b/pkg/server/node_test.go
index 840a425d46ce..7cad18cbe5ed 100644
--- a/pkg/server/node_test.go
+++ b/pkg/server/node_test.go
@@ -713,7 +713,7 @@ func TestNodeBatchRequestPProfLabels(t *testing.T) {
 		return labels
 	}()
 
-	gr := kvpb.NewGet(roachpb.Key("a"), kvpb.NonLocking)
+	gr := kvpb.NewGet(roachpb.Key("a"))
 	pr := kvpb.NewPut(gr.Header().Key, roachpb.Value{})
 	ba.Add(gr, pr)
 
@@ -748,7 +748,7 @@ func TestNodeBatchRequestMetricsInc(t *testing.T) {
 	ba.RangeID = 1
 	ba.Replica.StoreID = 1
 
-	gr := kvpb.NewGet(roachpb.Key("a"), kvpb.NonLocking)
+	gr := kvpb.NewGet(roachpb.Key("a"))
 	pr := kvpb.NewPut(gr.Header().Key, roachpb.Value{})
 	ba.Add(gr, pr)
 
diff --git a/pkg/server/systemconfigwatcher/system_config_watcher_test.go b/pkg/server/systemconfigwatcher/system_config_watcher_test.go
index 0b0e056a801e..41e1c7c68703 100644
--- a/pkg/server/systemconfigwatcher/system_config_watcher_test.go
+++ b/pkg/server/systemconfigwatcher/system_config_watcher_test.go
@@ -131,7 +131,6 @@ func getSystemDescriptorAndZonesSpans(
 			kvpb.NewScan(
 				append(codec.TenantPrefix(), startKey...),
 				append(codec.TenantPrefix(), endKey...),
-				kvpb.NonLocking,
 			),
 		)
 	br, pErr := kvDB.NonTransactionalSender().Send(ctx, ba)
diff --git a/pkg/sql/crdb_internal.go b/pkg/sql/crdb_internal.go
index c4a43e632482..121874b457a7 100644
--- a/pkg/sql/crdb_internal.go
+++ b/pkg/sql/crdb_internal.go
@@ -6394,7 +6394,7 @@ CREATE TABLE crdb_internal.lost_descriptors_with_data (
 			endPrefix := p.extendedEvalCtx.Codec.TablePrefix(uint32(endID - 1)).PrefixEnd()
 			b := p.Txn().NewBatch()
 			b.Header.MaxSpanRequestKeys = 1
-			scanRequest := kvpb.NewScan(startPrefix, endPrefix, kvpb.NonLocking).(*kvpb.ScanRequest)
+			scanRequest := kvpb.NewScan(startPrefix, endPrefix).(*kvpb.ScanRequest)
 			scanRequest.ScanFormat = kvpb.BATCH_RESPONSE
 			b.AddRawRequest(scanRequest)
 			err = p.execCfg.DB.Run(ctx, b)
diff --git a/pkg/sql/tests/kv_test.go b/pkg/sql/tests/kv_test.go
index ebc1573d58f7..822306b3015e
100644 --- a/pkg/sql/tests/kv_test.go +++ b/pkg/sql/tests/kv_test.go @@ -24,6 +24,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/base" "github.com/cockroachdb/cockroach/pkg/clusterversion" kv2 "github.com/cockroachdb/cockroach/pkg/kv" + "github.com/cockroachdb/cockroach/pkg/kv/kvpb" "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/storage" "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" @@ -105,7 +106,7 @@ func (kv *kvNative) Update(rows, run int) error { // Don't permute the rows, to be similar to SQL which sorts the spans in a // batch. for i := 0; i < rows; i++ { - b.GetForUpdate(fmt.Sprintf("%s%08d", kv.prefix, i)) + b.GetForUpdate(fmt.Sprintf("%s%08d", kv.prefix, i), kvpb.BestEffort) } if err := txn.Run(ctx, b); err != nil { return err diff --git a/pkg/ts/server.go b/pkg/ts/server.go index 5fdb5d18513d..42d72971d4a1 100644 --- a/pkg/ts/server.go +++ b/pkg/ts/server.go @@ -489,7 +489,7 @@ func dumpTimeseriesAllSources( for span != nil { b := &kv.Batch{} - scan := kvpb.NewScan(span.Key, span.EndKey, kvpb.NonLocking) + scan := kvpb.NewScan(span.Key, span.EndKey) b.AddRawRequest(scan) b.Header.MaxSpanRequestKeys = dumpBatchSize err := db.Run(ctx, b) diff --git a/pkg/upgrade/upgrades/BUILD.bazel b/pkg/upgrade/upgrades/BUILD.bazel index 894554d7df28..490542269c39 100644 --- a/pkg/upgrade/upgrades/BUILD.bazel +++ b/pkg/upgrade/upgrades/BUILD.bazel @@ -51,6 +51,7 @@ go_library( "//pkg/keys", "//pkg/keyvisualizer/keyvisjob", "//pkg/kv", + "//pkg/kv/kvpb", "//pkg/multitenant/mtinfopb", "//pkg/roachpb", "//pkg/security/username", diff --git a/pkg/upgrade/upgrades/desc_id_sequence_for_system_tenant.go b/pkg/upgrade/upgrades/desc_id_sequence_for_system_tenant.go index 58248c5ede82..b3abc037788f 100644 --- a/pkg/upgrade/upgrades/desc_id_sequence_for_system_tenant.go +++ b/pkg/upgrade/upgrades/desc_id_sequence_for_system_tenant.go @@ -16,6 +16,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/clusterversion" "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/kv" + "github.com/cockroachdb/cockroach/pkg/kv/kvpb" "github.com/cockroachdb/cockroach/pkg/sql/catalog/systemschema" "github.com/cockroachdb/cockroach/pkg/upgrade" ) @@ -27,7 +28,7 @@ func descIDSequenceForSystemTenant( return nil } return d.DB.KV().Txn(ctx, func(ctx context.Context, txn *kv.Txn) error { - oldEntry, err := txn.GetForUpdate(ctx, keys.LegacyDescIDGenerator) + oldEntry, err := txn.GetForUpdate(ctx, keys.LegacyDescIDGenerator, kvpb.BestEffort) if err != nil { return err }
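To summarize the pattern the call-site updates above all follow: the non-locking constructors (NewGet, NewScan, NewReverseScan) lose their strength argument, while the locking constructors (NewLockingGet, NewLockingScan, NewLockingReverseScan) pair an explicit strength with an explicit durability. A brief sketch with placeholder keys, for illustration only:

	ba := &kvpb.BatchRequest{}
	// Non-locking read: neither strength nor durability applies.
	ba.Add(kvpb.NewGet(roachpb.Key("a")))
	// Shared, best-effort (unreplicated) point lock.
	ba.Add(kvpb.NewLockingGet(roachpb.Key("b"), kvpb.ForShare, kvpb.BestEffort))
	// Exclusive, guaranteed-durable (replicated) locks over a span.
	ba.Add(kvpb.NewLockingScan(roachpb.Key("c"), roachpb.Key("e"), kvpb.ForUpdate, kvpb.GuaranteedDurability))
	ba.Add(kvpb.NewLockingReverseScan(roachpb.Key("e"), roachpb.Key("g"), kvpb.ForUpdate, kvpb.GuaranteedDurability))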