Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

kvserver: requests that acquire repl locks should use read-write path #110279

Merged
merged 3 commits into from
Sep 26, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pkg/ccl/changefeedccl/kvfeed/scanner.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ func (p *scanRequestScanner) exportSpan(
for remaining := &span; remaining != nil; {
start := timeutil.Now()
b := txn.NewBatch()
r := kvpb.NewScan(remaining.Key, remaining.EndKey, kvpb.NonLocking).(*kvpb.ScanRequest)
r := kvpb.NewScan(remaining.Key, remaining.EndKey).(*kvpb.ScanRequest)
r.ScanFormat = kvpb.BATCH_RESPONSE
b.Header.TargetBytes = targetBytesPerScan
b.AdmissionHeader = kvpb.AdmissionHeader{
Expand Down
4 changes: 2 additions & 2 deletions pkg/cli/debug_send_kv_batch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ func TestSendKVBatchExample(t *testing.T) {

var ba kvpb.BatchRequest
ba.Add(kvpb.NewPut(roachpb.Key("foo"), roachpb.MakeValueFromString("bar")))
ba.Add(kvpb.NewGet(roachpb.Key("foo"), kvpb.NonLocking))
ba.Add(kvpb.NewGet(roachpb.Key("foo")))

// NOTE: This cannot be marshaled using the standard Go JSON marshaler,
// since it does not correctly (un)marshal the JSON as mandated by the
Expand Down Expand Up @@ -72,7 +72,7 @@ func TestSendKVBatch(t *testing.T) {
// Protobuf spec. Instead, use the JSON marshaler shipped with Protobuf.
var ba kvpb.BatchRequest
ba.Add(kvpb.NewPut(roachpb.Key("foo"), roachpb.MakeValueFromString("bar")))
ba.Add(kvpb.NewGet(roachpb.Key("foo"), kvpb.NonLocking))
ba.Add(kvpb.NewGet(roachpb.Key("foo")))

jsonpb := protoutil.JSONPb{}
jsonProto, err := jsonpb.Marshal(&ba)
Expand Down
100 changes: 63 additions & 37 deletions pkg/kv/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -379,13 +379,22 @@ func (b *Batch) AddRawRequest(reqs ...kvpb.Request) {
}
}

func (b *Batch) get(key interface{}, str kvpb.KeyLockingStrengthType) {
func (b *Batch) get(
key interface{}, str kvpb.KeyLockingStrengthType, dur kvpb.KeyLockingDurabilityType,
) {
k, err := marshalKey(key)
if err != nil {
b.initResult(0, 1, notRaw, err)
return
}
b.appendReqs(kvpb.NewGet(k, str))
switch str {
case kvpb.NonLocking:
b.appendReqs(kvpb.NewGet(k))
case kvpb.ForShare, kvpb.ForUpdate:
b.appendReqs(kvpb.NewLockingGet(k, str, dur))
default:
panic(errors.AssertionFailedf("unknown str %d", str))
}
b.initResult(1, 1, notRaw, nil)
}

Expand All @@ -397,31 +406,32 @@ func (b *Batch) get(key interface{}, str kvpb.KeyLockingStrengthType) {
//
// key can be either a byte slice or a string.
func (b *Batch) Get(key interface{}) {
b.get(key, kvpb.NonLocking)
b.get(key, kvpb.NonLocking, kvpb.Invalid)
}

// GetForUpdate retrieves the value for a key. An unreplicated, exclusive lock
// is acquired on the key, if it exists. A new result will be appended to the
// batch which will contain a single row.
// GetForUpdate retrieves the value for a key, returning the retrieved key/value
// or an error. An Exclusive lock with the supplied durability is acquired on
// the key, if it exists. It is not considered an error for the key not to
// exist.
//
// r, err := db.GetForUpdate("a")
// // string(r.Rows[0].Key) == "a"
//
// key can be either a byte slice or a string.
func (b *Batch) GetForUpdate(key interface{}) {
b.get(key, kvpb.ForUpdate)
func (b *Batch) GetForUpdate(key interface{}, dur kvpb.KeyLockingDurabilityType) {
b.get(key, kvpb.ForUpdate, dur)
}

// GetForShare retrieves the value for a key. An unreplicated, shared lock
// is acquired on the key, if it exists. A new result will be appended to the
// batch which will contain a single row.
// GetForShare retrieves the value for a key. A shared lock with the supplied
// durability is acquired on the key, if it exists. A new result will be
// appended to the batch which will contain a single row.
//
// r, err := db.GetForShare("a")
// // string(r.Rows[0].Key) == "a"
//
// key can be either a byte slice or a string.
func (b *Batch) GetForShare(key interface{}) {
b.get(key, kvpb.ForShare)
func (b *Batch) GetForShare(key interface{}, dur kvpb.KeyLockingDurabilityType) {
b.get(key, kvpb.ForShare, dur)
}

func (b *Batch) put(key, value interface{}, inline bool) {
Expand Down Expand Up @@ -728,7 +738,12 @@ func (b *Batch) Inc(key interface{}, value int64) {
b.initResult(1, 1, notRaw, nil)
}

func (b *Batch) scan(s, e interface{}, isReverse bool, str kvpb.KeyLockingStrengthType) {
func (b *Batch) scan(
s, e interface{},
isReverse bool,
str kvpb.KeyLockingStrengthType,
dur kvpb.KeyLockingDurabilityType,
) {
begin, err := marshalKey(s)
if err != nil {
b.initResult(0, 0, notRaw, err)
Expand All @@ -739,10 +754,21 @@ func (b *Batch) scan(s, e interface{}, isReverse bool, str kvpb.KeyLockingStreng
b.initResult(0, 0, notRaw, err)
return
}
if !isReverse {
b.appendReqs(kvpb.NewScan(begin, end, str))
} else {
b.appendReqs(kvpb.NewReverseScan(begin, end, str))
switch str {
case kvpb.NonLocking:
if !isReverse {
b.appendReqs(kvpb.NewScan(begin, end))
} else {
b.appendReqs(kvpb.NewReverseScan(begin, end))
}
case kvpb.ForShare, kvpb.ForUpdate:
if !isReverse {
b.appendReqs(kvpb.NewLockingScan(begin, end, str, dur))
} else {
b.appendReqs(kvpb.NewLockingReverseScan(begin, end, str, dur))
}
default:
panic(errors.AssertionFailedf("unknown str %d", str))
}
b.initResult(1, 0, notRaw, nil)
}
Expand All @@ -755,31 +781,31 @@ func (b *Batch) scan(s, e interface{}, isReverse bool, str kvpb.KeyLockingStreng
//
// key can be either a byte slice or a string.
func (b *Batch) Scan(s, e interface{}) {
b.scan(s, e, false /* isReverse */, kvpb.NonLocking)
b.scan(s, e, false /* isReverse */, kvpb.NonLocking, kvpb.Invalid)
}

// ScanForUpdate retrieves the key/values between begin (inclusive) and end
// (exclusive) in ascending order. Unreplicated, exclusive locks are acquired on
// each of the returned keys.
// ScanForUpdate retrieves the rows between begin (inclusive) and end
// (exclusive) in ascending order. Exclusive locks with the supplied durability
// are acquired on each of the returned keys.
//
// A new result will be appended to the batch which will contain "rows" (each
// row is a key/value pair) and Result.Err will indicate success or failure.
//
// key can be either a byte slice or a string.
func (b *Batch) ScanForUpdate(s, e interface{}) {
b.scan(s, e, false /* isReverse */, kvpb.ForUpdate)
func (b *Batch) ScanForUpdate(s, e interface{}, dur kvpb.KeyLockingDurabilityType) {
b.scan(s, e, false /* isReverse */, kvpb.ForUpdate, dur)
}

// ScanForShare retrieves the key/values between begin (inclusive) and end
// (exclusive) in ascending order. Unreplicated, shared locks are acquired on
// each of the returned keys.
// (exclusive) in ascending order. Shared locks with the supplied durability are
// acquired on each of the returned keys.
//
// A new result will be appended to the batch which will contain "rows" (each
// row is a key/value pair) and Result.Err will indicate success or failure.
//
// key can be either a byte slice or a string.
func (b *Batch) ScanForShare(s, e interface{}) {
b.scan(s, e, false /* isReverse */, kvpb.ForShare)
func (b *Batch) ScanForShare(s, e interface{}, dur kvpb.KeyLockingDurabilityType) {
b.scan(s, e, false /* isReverse */, kvpb.ForShare, dur)
}

// ReverseScan retrieves the rows between begin (inclusive) and end (exclusive)
Expand All @@ -790,31 +816,31 @@ func (b *Batch) ScanForShare(s, e interface{}) {
//
// key can be either a byte slice or a string.
func (b *Batch) ReverseScan(s, e interface{}) {
b.scan(s, e, true /* isReverse */, kvpb.NonLocking)
b.scan(s, e, true /* isReverse */, kvpb.NonLocking, kvpb.Invalid)
}

// ReverseScanForUpdate retrieves the rows between begin (inclusive) and end
// (exclusive) in descending order. Unreplicated, exclusive locks are acquired
// on each of the returned keys.
// (exclusive) in descending order. Exclusive locks with the supplied durability
// are acquired on each of the returned keys.
//
// A new result will be appended to the batch which will contain "rows" (each
// "row" is a key/value pair) and Result.Err will indicate success or failure.
//
// key can be either a byte slice or a string.
func (b *Batch) ReverseScanForUpdate(s, e interface{}) {
b.scan(s, e, true /* isReverse */, kvpb.ForUpdate)
func (b *Batch) ReverseScanForUpdate(s, e interface{}, dur kvpb.KeyLockingDurabilityType) {
b.scan(s, e, true /* isReverse */, kvpb.ForUpdate, dur)
}

// ReverseScanForShare retrieves the rows between begin (inclusive) and end
// (exclusive) in descending order. Unreplicated, shared locks are acquired
// on each of the returned keys.
// (exclusive) in descending order. Shared locks with the supplied durability
// are acquired on each of the returned keys.
//
// A new result will be appended to the batch which will contain "rows" (each
// "row" is a key/value pair) and Result.Err will indicate success or failure.
//
// key can be either a byte slice or a string.
func (b *Batch) ReverseScanForShare(s, e interface{}) {
b.scan(s, e, true /* isReverse */, kvpb.ForShare)
func (b *Batch) ReverseScanForShare(s, e interface{}, dur kvpb.KeyLockingDurabilityType) {
b.scan(s, e, true /* isReverse */, kvpb.ForShare, dur)
}

// Del deletes one or more keys.
Expand Down
72 changes: 43 additions & 29 deletions pkg/kv/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -343,30 +343,35 @@ func (db *DB) Get(ctx context.Context, key interface{}) (KeyValue, error) {
}

// GetForUpdate retrieves the value for a key, returning the retrieved key/value
// or an error. An unreplicated, exclusive lock is acquired on the key, if it
// exists. It is not considered an error for the key not to exist.
// or an error. An Exclusive lock with the supplied durability is acquired on
// the key, if it exists. It is not considered an error for the key not to
// exist.
//
// r, err := db.GetForUpdate("a")
// // string(r.Key) == "a"
//
// key can be either a byte slice or a string.
func (db *DB) GetForUpdate(ctx context.Context, key interface{}) (KeyValue, error) {
func (db *DB) GetForUpdate(
ctx context.Context, key interface{}, dur kvpb.KeyLockingDurabilityType,
) (KeyValue, error) {
b := &Batch{}
b.GetForUpdate(key)
b.GetForUpdate(key, dur)
return getOneRow(db.Run(ctx, b), b)
}

// GetForShare retrieves the value for a key, returning the retrieved key/value
// or an error. An unreplicated, shared lock is acquired on the key, if it
// exists. It is not considered an error for the key not to exist.
// GetForShare retrieves the value for a key. A shared lock with the supplied
// durability is acquired on the key, if it exists. A new result will be
// appended to the batch which will contain a single row.
//
// r, err := db.GetForShare("a")
// // string(r.Key) == "a"
//
// key can be either a byte slice or a string.
func (db *DB) GetForShare(ctx context.Context, key interface{}) (KeyValue, error) {
func (db *DB) GetForShare(
ctx context.Context, key interface{}, dur kvpb.KeyLockingDurabilityType,
) (KeyValue, error) {
b := &Batch{}
b.GetForShare(key)
b.GetForShare(key, dur)
return getOneRow(db.Run(ctx, b), b)
}

Expand Down Expand Up @@ -494,13 +499,14 @@ func (db *DB) scan(
isReverse bool,
str kvpb.KeyLockingStrengthType,
readConsistency kvpb.ReadConsistencyType,
dur kvpb.KeyLockingDurabilityType,
) ([]KeyValue, error) {
b := &Batch{}
b.Header.ReadConsistency = readConsistency
if maxRows > 0 {
b.Header.MaxSpanRequestKeys = maxRows
}
b.scan(begin, end, isReverse, str)
b.scan(begin, end, isReverse, str, dur)
r, err := getOneResult(db.Run(ctx, b), b)
return r.Rows, err
}
Expand All @@ -512,33 +518,37 @@ func (db *DB) scan(
//
// key can be either a byte slice or a string.
func (db *DB) Scan(ctx context.Context, begin, end interface{}, maxRows int64) ([]KeyValue, error) {
return db.scan(ctx, begin, end, maxRows, false /* isReverse */, kvpb.NonLocking, kvpb.CONSISTENT)
return db.scan(ctx, begin, end, maxRows, false /* isReverse */, kvpb.NonLocking, kvpb.CONSISTENT, kvpb.Invalid)
}

// ScanForUpdate retrieves the rows between begin (inclusive) and end
// (exclusive) in ascending order. Unreplicated, exclusive locks are
// acquired on each of the returned keys.
// (exclusive) in ascending order. Exclusive locks with the supplied durability
// are acquired on each of the returned keys.
//
// The returned []KeyValue will contain up to maxRows elements.
//
// key can be either a byte slice or a string.
func (db *DB) ScanForUpdate(
ctx context.Context, begin, end interface{}, maxRows int64,
ctx context.Context, begin, end interface{}, maxRows int64, dur kvpb.KeyLockingDurabilityType,
) ([]KeyValue, error) {
return db.scan(ctx, begin, end, maxRows, false /* isReverse */, kvpb.ForUpdate, kvpb.CONSISTENT)
return db.scan(
ctx, begin, end, maxRows, false /* isReverse */, kvpb.ForUpdate, kvpb.CONSISTENT, dur,
)
}

// ScanForShare retrieves the rows between begin (inclusive) and end
// (exclusive) in ascending order. Unreplicated, shared locks are
// acquired on each of the returned keys.
// ScanForShare retrieves the rows between begin (inclusive) and end (exclusive)
// in ascending order. Shared locks with the supplied durability are acquired on
// each of the returned keys.
//
// The returned []KeyValue will contain up to maxRows elements.
//
// key can be either a byte slice or a string.
func (db *DB) ScanForShare(
ctx context.Context, begin, end interface{}, maxRows int64,
ctx context.Context, begin, end interface{}, maxRows int64, dur kvpb.KeyLockingDurabilityType,
) ([]KeyValue, error) {
return db.scan(ctx, begin, end, maxRows, false /* isReverse */, kvpb.ForShare, kvpb.CONSISTENT)
return db.scan(
ctx, begin, end, maxRows, false /* isReverse */, kvpb.ForShare, kvpb.CONSISTENT, dur,
)
}

// ReverseScan retrieves the rows between begin (inclusive) and end (exclusive)
Expand All @@ -550,33 +560,37 @@ func (db *DB) ScanForShare(
func (db *DB) ReverseScan(
ctx context.Context, begin, end interface{}, maxRows int64,
) ([]KeyValue, error) {
return db.scan(ctx, begin, end, maxRows, true /* isReverse */, kvpb.NonLocking, kvpb.CONSISTENT)
return db.scan(ctx, begin, end, maxRows, true /* isReverse */, kvpb.NonLocking, kvpb.CONSISTENT, kvpb.Invalid)
}

// ReverseScanForUpdate retrieves the rows between begin (inclusive) and end
// (exclusive) in descending order. Unreplicated, exclusive locks are acquired
// on each of the returned keys.
// (exclusive) in descending order. Exclusive locks with the supplied durability
// are acquired on each of the returned keys.
//
// The returned []KeyValue will contain up to maxRows elements.
//
// key can be either a byte slice or a string.
func (db *DB) ReverseScanForUpdate(
ctx context.Context, begin, end interface{}, maxRows int64,
ctx context.Context, begin, end interface{}, maxRows int64, dur kvpb.KeyLockingDurabilityType,
) ([]KeyValue, error) {
return db.scan(ctx, begin, end, maxRows, true /* isReverse */, kvpb.ForUpdate, kvpb.CONSISTENT)
return db.scan(
ctx, begin, end, maxRows, true /* isReverse */, kvpb.ForUpdate, kvpb.CONSISTENT, dur,
)
}

// ReverseScanForShare retrieves the rows between begin (inclusive) and end
// (exclusive) in descending order. Unreplicated, shared locks are acquired
// on each of the returned keys.
// (exclusive) in descending order. Shared locks with the supplied durability
// are acquired on each of the returned keys.
//
// The returned []KeyValue will contain up to maxRows elements.
//
// key can be either a byte slice or a string.
func (db *DB) ReverseScanForShare(
ctx context.Context, begin, end interface{}, maxRows int64,
ctx context.Context, begin, end interface{}, maxRows int64, dur kvpb.KeyLockingDurabilityType,
) ([]KeyValue, error) {
return db.scan(ctx, begin, end, maxRows, true /* isReverse */, kvpb.ForShare, kvpb.CONSISTENT)
return db.scan(
ctx, begin, end, maxRows, true /* isReverse */, kvpb.ForShare, kvpb.CONSISTENT, dur,
)
}

// Del deletes one or more keys.
Expand Down
Loading
Loading