Skip to content

Commit

Permalink
kv: add lock durability options to the KV client API
Browse files Browse the repository at this point in the history
This patch adds lock durability flags to GetFor{Update,Share},
ScanFor{Update,Share}, and ReverseScanFor{Update,Share}, using which
users of the KV client API can express lock durability goals. These
then map to either replicated or unreplicated locks.

This patch then modifies existing tests to make use of replicated locks
in a few places.

Epic: none

Release note: None
  • Loading branch information
arulajmani committed Sep 21, 2023
1 parent 242dd69 commit 639b8c8
Show file tree
Hide file tree
Showing 29 changed files with 521 additions and 318 deletions.
2 changes: 1 addition & 1 deletion pkg/ccl/changefeedccl/kvfeed/scanner.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ func (p *scanRequestScanner) exportSpan(
for remaining := &span; remaining != nil; {
start := timeutil.Now()
b := txn.NewBatch()
r := kvpb.NewScan(remaining.Key, remaining.EndKey, kvpb.NonLocking).(*kvpb.ScanRequest)
r := kvpb.NewScan(remaining.Key, remaining.EndKey).(*kvpb.ScanRequest)
r.ScanFormat = kvpb.BATCH_RESPONSE
b.Header.TargetBytes = targetBytesPerScan
b.AdmissionHeader = kvpb.AdmissionHeader{
Expand Down
4 changes: 2 additions & 2 deletions pkg/cli/debug_send_kv_batch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ func TestSendKVBatchExample(t *testing.T) {

var ba kvpb.BatchRequest
ba.Add(kvpb.NewPut(roachpb.Key("foo"), roachpb.MakeValueFromString("bar")))
ba.Add(kvpb.NewGet(roachpb.Key("foo"), kvpb.NonLocking))
ba.Add(kvpb.NewGet(roachpb.Key("foo")))

// NOTE: This cannot be marshaled using the standard Go JSON marshaler,
// since it does not correctly (un)marshal the JSON as mandated by the
Expand Down Expand Up @@ -72,7 +72,7 @@ func TestSendKVBatch(t *testing.T) {
// Protobuf spec. Instead, use the JSON marshaler shipped with Protobuf.
var ba kvpb.BatchRequest
ba.Add(kvpb.NewPut(roachpb.Key("foo"), roachpb.MakeValueFromString("bar")))
ba.Add(kvpb.NewGet(roachpb.Key("foo"), kvpb.NonLocking))
ba.Add(kvpb.NewGet(roachpb.Key("foo"))

jsonpb := protoutil.JSONPb{}
jsonProto, err := jsonpb.Marshal(&ba)
Expand Down
100 changes: 63 additions & 37 deletions pkg/kv/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -379,13 +379,22 @@ func (b *Batch) AddRawRequest(reqs ...kvpb.Request) {
}
}

func (b *Batch) get(key interface{}, str kvpb.KeyLockingStrengthType) {
func (b *Batch) get(
key interface{}, str kvpb.KeyLockingStrengthType, dur kvpb.KeyLockingDurabilityType,
) {
k, err := marshalKey(key)
if err != nil {
b.initResult(0, 1, notRaw, err)
return
}
b.appendReqs(kvpb.NewGet(k, str))
switch str {
case kvpb.NonLocking:
b.appendReqs(kvpb.NewGet(k))
case kvpb.ForShare, kvpb.ForUpdate:
b.appendReqs(kvpb.NewLockingGet(k, str, dur))
default:
panic(errors.AssertionFailedf("unknown str %d", str))
}
b.initResult(1, 1, notRaw, nil)
}

Expand All @@ -397,31 +406,32 @@ func (b *Batch) get(key interface{}, str kvpb.KeyLockingStrengthType) {
//
// key can be either a byte slice or a string.
func (b *Batch) Get(key interface{}) {
b.get(key, kvpb.NonLocking)
b.get(key, kvpb.NonLocking, kvpb.Invalid)
}

// GetForUpdate retrieves the value for a key. An unreplicated, exclusive lock
// is acquired on the key, if it exists. A new result will be appended to the
// batch which will contain a single row.
// GetForUpdate retrieves the value for a key, returning the retrieved key/value
// or an error. An Exclusive lock with the supplied durability is acquired on
// the key, if it exists. It is not considered an error for the key not to
// exist.
//
// r, err := db.GetForUpdate("a")
// // string(r.Rows[0].Key) == "a"
//
// key can be either a byte slice or a string.
func (b *Batch) GetForUpdate(key interface{}) {
b.get(key, kvpb.ForUpdate)
func (b *Batch) GetForUpdate(key interface{}, dur kvpb.KeyLockingDurabilityType) {
b.get(key, kvpb.ForUpdate, dur)
}

// GetForShare retrieves the value for a key. An unreplicated, shared lock
// is acquired on the key, if it exists. A new result will be appended to the
// batch which will contain a single row.
// GetForShare retrieves the value for a key. A shared lock with the supplied
// durability is acquired on the key, if it exists. A new result will be
// appended to the batch which will contain a single row.
//
// r, err := db.GetForShare("a")
// // string(r.Rows[0].Key) == "a"
//
// key can be either a byte slice or a string.
func (b *Batch) GetForShare(key interface{}) {
b.get(key, kvpb.ForShare)
func (b *Batch) GetForShare(key interface{}, dur kvpb.KeyLockingDurabilityType) {
b.get(key, kvpb.ForShare, dur)
}

func (b *Batch) put(key, value interface{}, inline bool) {
Expand Down Expand Up @@ -728,7 +738,12 @@ func (b *Batch) Inc(key interface{}, value int64) {
b.initResult(1, 1, notRaw, nil)
}

func (b *Batch) scan(s, e interface{}, isReverse bool, str kvpb.KeyLockingStrengthType) {
func (b *Batch) scan(
s, e interface{},
isReverse bool,
str kvpb.KeyLockingStrengthType,
dur kvpb.KeyLockingDurabilityType,
) {
begin, err := marshalKey(s)
if err != nil {
b.initResult(0, 0, notRaw, err)
Expand All @@ -739,10 +754,21 @@ func (b *Batch) scan(s, e interface{}, isReverse bool, str kvpb.KeyLockingStreng
b.initResult(0, 0, notRaw, err)
return
}
if !isReverse {
b.appendReqs(kvpb.NewScan(begin, end, str))
} else {
b.appendReqs(kvpb.NewReverseScan(begin, end, str))
switch str {
case kvpb.NonLocking:
if !isReverse {
b.appendReqs(kvpb.NewScan(begin, end))
} else {
b.appendReqs(kvpb.NewReverseScan(begin, end))
}
case kvpb.ForShare, kvpb.ForUpdate:
if !isReverse {
b.appendReqs(kvpb.NewLockingScan(begin, end, str, dur))
} else {
b.appendReqs(kvpb.NewLockingReverseScan(begin, end, str, dur))
}
default:
panic(errors.AssertionFailedf("unknown str %d", str))
}
b.initResult(1, 0, notRaw, nil)
}
Expand All @@ -755,31 +781,31 @@ func (b *Batch) scan(s, e interface{}, isReverse bool, str kvpb.KeyLockingStreng
//
// key can be either a byte slice or a string.
func (b *Batch) Scan(s, e interface{}) {
b.scan(s, e, false /* isReverse */, kvpb.NonLocking)
b.scan(s, e, false /* isReverse */, kvpb.NonLocking, kvpb.Invalid)
}

// ScanForUpdate retrieves the key/values between begin (inclusive) and end
// (exclusive) in ascending order. Unreplicated, exclusive locks are acquired on
// each of the returned keys.
// ScanForUpdate retrieves the rows between begin (inclusive) and end
// (exclusive) in ascending order. Exclusive locks with the supplied durability
// are acquired on each of the returned keys.
//
// A new result will be appended to the batch which will contain "rows" (each
// row is a key/value pair) and Result.Err will indicate success or failure.
//
// key can be either a byte slice or a string.
func (b *Batch) ScanForUpdate(s, e interface{}) {
b.scan(s, e, false /* isReverse */, kvpb.ForUpdate)
func (b *Batch) ScanForUpdate(s, e interface{}, dur kvpb.KeyLockingDurabilityType) {
b.scan(s, e, false /* isReverse */, kvpb.ForUpdate, dur)
}

// ScanForShare retrieves the key/values between begin (inclusive) and end
// (exclusive) in ascending order. Unreplicated, shared locks are acquired on
// each of the returned keys.
// (exclusive) in ascending order. Shared locks with the supplied durability are
// acquired on each of the returned keys.
//
// A new result will be appended to the batch which will contain "rows" (each
// row is a key/value pair) and Result.Err will indicate success or failure.
//
// key can be either a byte slice or a string.
func (b *Batch) ScanForShare(s, e interface{}) {
b.scan(s, e, false /* isReverse */, kvpb.ForShare)
func (b *Batch) ScanForShare(s, e interface{}, dur kvpb.KeyLockingDurabilityType) {
b.scan(s, e, false /* isReverse */, kvpb.ForShare, dur)
}

// ReverseScan retrieves the rows between begin (inclusive) and end (exclusive)
Expand All @@ -790,31 +816,31 @@ func (b *Batch) ScanForShare(s, e interface{}) {
//
// key can be either a byte slice or a string.
func (b *Batch) ReverseScan(s, e interface{}) {
b.scan(s, e, true /* isReverse */, kvpb.NonLocking)
b.scan(s, e, true /* isReverse */, kvpb.NonLocking, kvpb.Invalid)
}

// ReverseScanForUpdate retrieves the rows between begin (inclusive) and end
// (exclusive) in descending order. Unreplicated, exclusive locks are acquired
// on each of the returned keys.
// (exclusive) in descending order. Exclusive locks with the supplied durability
// are acquired on each of the returned keys.
//
// A new result will be appended to the batch which will contain "rows" (each
// "row" is a key/value pair) and Result.Err will indicate success or failure.
//
// key can be either a byte slice or a string.
func (b *Batch) ReverseScanForUpdate(s, e interface{}) {
b.scan(s, e, true /* isReverse */, kvpb.ForUpdate)
func (b *Batch) ReverseScanForUpdate(s, e interface{}, dur kvpb.KeyLockingDurabilityType) {
b.scan(s, e, true /* isReverse */, kvpb.ForUpdate, dur)
}

// ReverseScanForShare retrieves the rows between begin (inclusive) and end
// (exclusive) in descending order. Unreplicated, shared locks are acquired
// on each of the returned keys.
// (exclusive) in descending order. Shared locks with the supplied durability
// are acquired on each of the returned keys.
//
// A new result will be appended to the batch which will contain "rows" (each
// "row" is a key/value pair) and Result.Err will indicate success or failure.
//
// key can be either a byte slice or a string.
func (b *Batch) ReverseScanForShare(s, e interface{}) {
b.scan(s, e, true /* isReverse */, kvpb.ForShare)
func (b *Batch) ReverseScanForShare(s, e interface{}, dur kvpb.KeyLockingDurabilityType) {
b.scan(s, e, true /* isReverse */, kvpb.ForShare, dur)
}

// Del deletes one or more keys.
Expand Down
72 changes: 43 additions & 29 deletions pkg/kv/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -343,30 +343,35 @@ func (db *DB) Get(ctx context.Context, key interface{}) (KeyValue, error) {
}

// GetForUpdate retrieves the value for a key, returning the retrieved key/value
// or an error. An unreplicated, exclusive lock is acquired on the key, if it
// exists. It is not considered an error for the key not to exist.
// or an error. An Exclusive lock with the supplied durability is acquired on
// the key, if it exists. It is not considered an error for the key not to
// exist.
//
// r, err := db.GetForUpdate("a")
// // string(r.Key) == "a"
//
// key can be either a byte slice or a string.
func (db *DB) GetForUpdate(ctx context.Context, key interface{}) (KeyValue, error) {
func (db *DB) GetForUpdate(
ctx context.Context, key interface{}, dur kvpb.KeyLockingDurabilityType,
) (KeyValue, error) {
b := &Batch{}
b.GetForUpdate(key)
b.GetForUpdate(key, dur)
return getOneRow(db.Run(ctx, b), b)
}

// GetForShare retrieves the value for a key, returning the retrieved key/value
// or an error. An unreplicated, shared lock is acquired on the key, if it
// exists. It is not considered an error for the key not to exist.
// GetForShare retrieves the value for a key. A shared lock with the supplied
// durability is acquired on the key, if it exists. A new result will be
// appended to the batch which will contain a single row.
//
// r, err := db.GetForShare("a")
// // string(r.Key) == "a"
//
// key can be either a byte slice or a string.
func (db *DB) GetForShare(ctx context.Context, key interface{}) (KeyValue, error) {
func (db *DB) GetForShare(
ctx context.Context, key interface{}, dur kvpb.KeyLockingDurabilityType,
) (KeyValue, error) {
b := &Batch{}
b.GetForShare(key)
b.GetForShare(key, dur)
return getOneRow(db.Run(ctx, b), b)
}

Expand Down Expand Up @@ -494,13 +499,14 @@ func (db *DB) scan(
isReverse bool,
str kvpb.KeyLockingStrengthType,
readConsistency kvpb.ReadConsistencyType,
dur kvpb.KeyLockingDurabilityType,
) ([]KeyValue, error) {
b := &Batch{}
b.Header.ReadConsistency = readConsistency
if maxRows > 0 {
b.Header.MaxSpanRequestKeys = maxRows
}
b.scan(begin, end, isReverse, str)
b.scan(begin, end, isReverse, str, dur)
r, err := getOneResult(db.Run(ctx, b), b)
return r.Rows, err
}
Expand All @@ -512,33 +518,37 @@ func (db *DB) scan(
//
// key can be either a byte slice or a string.
func (db *DB) Scan(ctx context.Context, begin, end interface{}, maxRows int64) ([]KeyValue, error) {
return db.scan(ctx, begin, end, maxRows, false /* isReverse */, kvpb.NonLocking, kvpb.CONSISTENT)
return db.scan(ctx, begin, end, maxRows, false /* isReverse */, kvpb.NonLocking, kvpb.CONSISTENT, kvpb.Invalid)
}

// ScanForUpdate retrieves the rows between begin (inclusive) and end
// (exclusive) in ascending order. Unreplicated, exclusive locks are
// acquired on each of the returned keys.
// (exclusive) in ascending order. Exclusive locks with the supplied durability
// are acquired on each of the returned keys.
//
// The returned []KeyValue will contain up to maxRows elements.
//
// key can be either a byte slice or a string.
func (db *DB) ScanForUpdate(
ctx context.Context, begin, end interface{}, maxRows int64,
ctx context.Context, begin, end interface{}, maxRows int64, dur kvpb.KeyLockingDurabilityType,
) ([]KeyValue, error) {
return db.scan(ctx, begin, end, maxRows, false /* isReverse */, kvpb.ForUpdate, kvpb.CONSISTENT)
return db.scan(
ctx, begin, end, maxRows, false /* isReverse */, kvpb.ForUpdate, kvpb.CONSISTENT, dur,
)
}

// ScanForShare retrieves the rows between begin (inclusive) and end
// (exclusive) in ascending order. Unreplicated, shared locks are
// acquired on each of the returned keys.
// ScanForShare retrieves the rows between begin (inclusive) and end (exclusive)
// in ascending order. Shared locks with the supplied durability are acquired on
// each of the returned keys.
//
// The returned []KeyValue will contain up to maxRows elements.
//
// key can be either a byte slice or a string.
func (db *DB) ScanForShare(
ctx context.Context, begin, end interface{}, maxRows int64,
ctx context.Context, begin, end interface{}, maxRows int64, dur kvpb.KeyLockingDurabilityType,
) ([]KeyValue, error) {
return db.scan(ctx, begin, end, maxRows, false /* isReverse */, kvpb.ForShare, kvpb.CONSISTENT)
return db.scan(
ctx, begin, end, maxRows, false /* isReverse */, kvpb.ForShare, kvpb.CONSISTENT, dur,
)
}

// ReverseScan retrieves the rows between begin (inclusive) and end (exclusive)
Expand All @@ -550,33 +560,37 @@ func (db *DB) ScanForShare(
func (db *DB) ReverseScan(
ctx context.Context, begin, end interface{}, maxRows int64,
) ([]KeyValue, error) {
return db.scan(ctx, begin, end, maxRows, true /* isReverse */, kvpb.NonLocking, kvpb.CONSISTENT)
return db.scan(ctx, begin, end, maxRows, true /* isReverse */, kvpb.NonLocking, kvpb.CONSISTENT, kvpb.Invalid)
}

// ReverseScanForUpdate retrieves the rows between begin (inclusive) and end
// (exclusive) in descending order. Unreplicated, exclusive locks are acquired
// on each of the returned keys.
// (exclusive) in descending order. Exclusive locks with the supplied durability
// are acquired on each of the returned keys.
//
// The returned []KeyValue will contain up to maxRows elements.
//
// key can be either a byte slice or a string.
func (db *DB) ReverseScanForUpdate(
ctx context.Context, begin, end interface{}, maxRows int64,
ctx context.Context, begin, end interface{}, maxRows int64, dur kvpb.KeyLockingDurabilityType,
) ([]KeyValue, error) {
return db.scan(ctx, begin, end, maxRows, true /* isReverse */, kvpb.ForUpdate, kvpb.CONSISTENT)
return db.scan(
ctx, begin, end, maxRows, true /* isReverse */, kvpb.ForUpdate, kvpb.CONSISTENT, dur,
)
}

// ReverseScanForShare retrieves the rows between begin (inclusive) and end
// (exclusive) in descending order. Unreplicated, shared locks are acquired
// on each of the returned keys.
// (exclusive) in descending order. Shared locks with the supplied durability
// are acquired on each of the returned keys.
//
// The returned []KeyValue will contain up to maxRows elements.
//
// key can be either a byte slice or a string.
func (db *DB) ReverseScanForShare(
ctx context.Context, begin, end interface{}, maxRows int64,
ctx context.Context, begin, end interface{}, maxRows int64, dur kvpb.KeyLockingDurabilityType,
) ([]KeyValue, error) {
return db.scan(ctx, begin, end, maxRows, true /* isReverse */, kvpb.ForShare, kvpb.CONSISTENT)
return db.scan(
ctx, begin, end, maxRows, true /* isReverse */, kvpb.ForShare, kvpb.CONSISTENT, dur,
)
}

// Del deletes one or more keys.
Expand Down
Loading

0 comments on commit 639b8c8

Please sign in to comment.