Skip to content

Commit

Permalink
Merge #59023
Browse files Browse the repository at this point in the history
59023: kv: support locking Get requests r=nvanbenschoten a=nvanbenschoten

This commit adds support for `Get` requests to acquire unreplicated locks in cases where they find a present key-value. This is supported by a new field on `GetRequest` called `KeyLocking`, which mirrors the field of the same name found on `ScanRequest` and `ReverseScanRequest`.

Much of this commit was modeled after a98be1f, which was the original commit that added locking support to ScanRequest and ReverseScanRequest.

The intention is to use this functionality to fix #46639.

Co-authored-by: Nathan VanBenschoten <[email protected]>
  • Loading branch information
craig[bot] and nvanbenschoten committed Jan 31, 2021
2 parents 3ca562d + c955e88 commit 6b58038
Show file tree
Hide file tree
Showing 20 changed files with 1,016 additions and 781 deletions.
34 changes: 25 additions & 9 deletions pkg/kv/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -335,23 +335,39 @@ func (b *Batch) AddRawRequest(reqs ...roachpb.Request) {
}
}

// Get retrieves the value for a key. A new result will be appended to the
// batch which will contain a single row.
//
// r, err := db.Get("a")
// // string(r.Rows[0].Key) == "a"
//
// key can be either a byte slice or a string.
func (b *Batch) Get(key interface{}) {
func (b *Batch) get(key interface{}, forUpdate bool) {
k, err := marshalKey(key)
if err != nil {
b.initResult(0, 1, notRaw, err)
return
}
b.appendReqs(roachpb.NewGet(k))
b.appendReqs(roachpb.NewGet(k, forUpdate))
b.initResult(1, 1, notRaw, nil)
}

// Get retrieves the value for a key. A new result will be appended to the batch
// which will contain a single row.
//
// r, err := db.Get("a")
// // string(r.Rows[0].Key) == "a"
//
// key can be either a byte slice or a string.
func (b *Batch) Get(key interface{}) {
b.get(key, false /* forUpdate */)
}

// GetForUpdate retrieves the value for a key. An unreplicated, exclusive lock
// is acquired on the key, if it exists. A new result will be appended to the
// batch which will contain a single row.
//
// r, err := db.GetForUpdate("a")
// // string(r.Rows[0].Key) == "a"
//
// key can be either a byte slice or a string.
func (b *Batch) GetForUpdate(key interface{}) {
b.get(key, true /* forUpdate */)
}

func (b *Batch) put(key, value interface{}, inline bool) {
k, err := marshalKey(key)
if err != nil {
Expand Down
14 changes: 14 additions & 0 deletions pkg/kv/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -317,6 +317,20 @@ func (db *DB) Get(ctx context.Context, key interface{}) (KeyValue, error) {
return getOneRow(db.Run(ctx, b), b)
}

// GetForUpdate retrieves the value for a key, returning the retrieved key/value
// or an error. An unreplicated, exclusive lock is acquired on the key, if it
// exists. It is not considered an error for the key not to exist.
//
// r, err := db.GetForUpdate("a")
// // string(r.Key) == "a"
//
// key can be either a byte slice or a string.
func (db *DB) GetForUpdate(ctx context.Context, key interface{}) (KeyValue, error) {
b := &Batch{}
b.GetForUpdate(key)
return getOneRow(db.Run(ctx, b), b)
}

// GetProto retrieves the value for a key and decodes the result as a proto
// message. If the key doesn't exist, the proto will simply be reset.
//
Expand Down
13 changes: 13 additions & 0 deletions pkg/kv/db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,19 @@ func TestDB_Get(t *testing.T) {
checkResult(t, []byte(""), result.ValueBytes())
}

func TestDB_GetForUpdate(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
s, db := setup(t)
defer s.Stopper().Stop(context.Background())

result, err := db.GetForUpdate(context.Background(), "aa")
if err != nil {
t.Fatal(err)
}
checkResult(t, []byte(""), result.ValueBytes())
}

func TestDB_Put(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
Expand Down
12 changes: 6 additions & 6 deletions pkg/kv/kvclient/kvcoord/dist_sender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -887,7 +887,7 @@ func TestDistSenderMovesOnFromReplicaWithStaleLease(t *testing.T) {
Lease: cachedLease,
})

get := roachpb.NewGet(roachpb.Key("a"))
get := roachpb.NewGet(roachpb.Key("a"), false /* forUpdate */)
_, pErr := kv.SendWrapped(ctx, ds, get)
require.Nil(t, pErr)

Expand Down Expand Up @@ -1768,7 +1768,7 @@ func TestDistSenderDescriptorUpdatesOnSuccessfulRPCs(t *testing.T) {

// Send a request that's going to receive a response with a RangeInfo.
k := roachpb.Key("a")
get := roachpb.NewGet(k)
get := roachpb.NewGet(k, false /* forUpdate */)
var ba roachpb.BatchRequest
ba.Add(get)
_, pErr := ds.Send(ctx, ba)
Expand Down Expand Up @@ -1879,7 +1879,7 @@ func TestSendRPCRangeNotFoundError(t *testing.T) {
Settings: cluster.MakeTestingClusterSettings(),
}
ds = NewDistSender(cfg)
get := roachpb.NewGet(roachpb.Key("b"))
get := roachpb.NewGet(roachpb.Key("b"), false /* forUpdate */)
_, err := kv.SendWrapped(ctx, ds, get)
if err != nil {
t.Fatal(err)
Expand Down Expand Up @@ -3488,19 +3488,19 @@ func TestCanSendToFollower(t *testing.T) {
roachpb.Header{
Txn: &roachpb.Transaction{},
},
roachpb.NewGet(roachpb.Key("a")),
roachpb.NewGet(roachpb.Key("a"), false /* forUpdate */),
1,
},
{
true,
roachpb.Header{},
roachpb.NewGet(roachpb.Key("a")),
roachpb.NewGet(roachpb.Key("a"), false /* forUpdate */),
1,
},
{
false,
roachpb.Header{},
roachpb.NewGet(roachpb.Key("a")),
roachpb.NewGet(roachpb.Key("a"), false /* forUpdate */),
2,
},
} {
Expand Down
13 changes: 10 additions & 3 deletions pkg/kv/kvclient/kvcoord/txn_coord_sender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -963,7 +963,7 @@ func TestTxnCoordSenderNoDuplicateLockSpans(t *testing.T) {
db := kv.NewDB(ambient, factory, clock, stopper)
txn := kv.NewTxn(ctx, db, 0 /* gatewayNodeID */)

// Acquire locks on a-b, c, u-w before the final batch.
// Acquire locks on a-b, c, m, u-w before the final batch.
_, pErr := txn.ReverseScanForUpdate(ctx, roachpb.Key("a"), roachpb.Key("b"), 0)
if pErr != nil {
t.Fatal(pErr)
Expand All @@ -972,23 +972,30 @@ func TestTxnCoordSenderNoDuplicateLockSpans(t *testing.T) {
if pErr != nil {
t.Fatal(pErr)
}
_, pErr = txn.GetForUpdate(ctx, roachpb.Key("m"))
if pErr != nil {
t.Fatal(pErr)
}
pErr = txn.DelRange(ctx, roachpb.Key("u"), roachpb.Key("w"))
if pErr != nil {
t.Fatal(pErr)
}

// The final batch overwrites key c and overlaps part of the a-b and u-w ranges.
// The final batch overwrites key c, reads key n, and overlaps part of the a-b and u-w ranges.
b := txn.NewBatch()
b.Put(roachpb.Key("b"), []byte("value"))
b.Put(roachpb.Key("c"), []byte("value"))
b.Put(roachpb.Key("d"), []byte("value"))
b.GetForUpdate(roachpb.Key("n"))
b.ReverseScanForUpdate(roachpb.Key("v"), roachpb.Key("z"))

// The expected locks are a-b, c, and u-z.
// The expected locks are a-b, c, m, n, and u-z.
expectedLockSpans = []roachpb.Span{
{Key: roachpb.Key("a"), EndKey: roachpb.Key("b").Next()},
{Key: roachpb.Key("c"), EndKey: nil},
{Key: roachpb.Key("d"), EndKey: nil},
{Key: roachpb.Key("m"), EndKey: nil},
{Key: roachpb.Key("n"), EndKey: nil},
{Key: roachpb.Key("u"), EndKey: roachpb.Key("z")},
}

Expand Down
13 changes: 11 additions & 2 deletions pkg/kv/kvnemesis/applier.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@ func applyOp(ctx context.Context, db *kv.DB, op *Operation) {

type clientI interface {
Get(context.Context, interface{}) (kv.KeyValue, error)
GetForUpdate(context.Context, interface{}) (kv.KeyValue, error)
Put(context.Context, interface{}, interface{}) error
Scan(context.Context, interface{}, interface{}, int64) ([]kv.KeyValue, error)
ScanForUpdate(context.Context, interface{}, interface{}, int64) ([]kv.KeyValue, error)
Expand All @@ -134,7 +135,11 @@ type clientI interface {
func applyClientOp(ctx context.Context, db clientI, op *Operation) {
switch o := op.GetValue().(type) {
case *GetOperation:
kv, err := db.Get(ctx, o.Key)
fn := db.Get
if o.ForUpdate {
fn = db.GetForUpdate
}
kv, err := fn(ctx, o.Key)
if err != nil {
o.Result = resultError(ctx, err)
} else {
Expand Down Expand Up @@ -178,7 +183,11 @@ func applyBatchOp(
for i := range o.Ops {
switch subO := o.Ops[i].GetValue().(type) {
case *GetOperation:
b.Get(subO.Key)
if subO.ForUpdate {
b.GetForUpdate(subO.Key)
} else {
b.Get(subO.Key)
}
case *PutOperation:
b.Put(subO.Key, subO.Value)
case *ScanOperation:
Expand Down
6 changes: 3 additions & 3 deletions pkg/kv/kvnemesis/applier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ func TestApplier(t *testing.T) {
check(t, step(scan(`a`, `c`)), `db1.Scan(ctx, "a", "c", 0) // ([], nil)`)

check(t, step(put(`a`, `1`)), `db0.Put(ctx, "a", 1) // nil`)
check(t, step(get(`a`)), `db1.Get(ctx, "a") // ("1", nil)`)
check(t, step(getForUpdate(`a`)), `db1.GetForUpdate(ctx, "a") // ("1", nil)`)
check(t, step(scanForUpdate(`a`, `c`)), `db0.ScanForUpdate(ctx, "a", "c", 0) // (["a":"1"], nil)`)

check(t, step(put(`b`, `2`)), `db1.Put(ctx, "b", 2) // nil`)
Expand All @@ -82,11 +82,11 @@ func TestApplier(t *testing.T) {
db1.Run(ctx, b) // nil
}
`)
checkErr(t, step(batch(put(`b`, `2`), get(`a`), scanForUpdate(`a`, `c`))), `
checkErr(t, step(batch(put(`b`, `2`), getForUpdate(`a`), scanForUpdate(`a`, `c`))), `
{
b := &Batch{}
b.Put(ctx, "b", 2) // context canceled
b.Get(ctx, "a") // (nil, context canceled)
b.GetForUpdate(ctx, "a") // (nil, context canceled)
b.ScanForUpdate(ctx, "a", "c") // (nil, context canceled)
db0.Run(ctx, b) // context canceled
}
Expand Down
39 changes: 33 additions & 6 deletions pkg/kv/kvnemesis/generator.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,14 @@ type ClosureTxnConfig struct {
type ClientOperationConfig struct {
// GetMissing is an operation that Gets a key that definitely doesn't exist.
GetMissing int
// GetMissingForUpdate is an operation that Gets a key that definitely
// doesn't exist using a locking read.
GetMissingForUpdate int
// GetExisting is an operation that Gets a key that likely exists.
GetExisting int
// GetExistingForUpdate is an operation that Gets a key that likely exists
// using a locking read.
GetExistingForUpdate int
// PutMissing is an operation that Puts a key that definitely doesn't exist.
PutMissing int
// PutExisting is an operation that Puts a key that likely exists.
Expand Down Expand Up @@ -143,12 +149,14 @@ type ChangeLeaseConfig struct {
// yet pass (for example, if the new operation finds a kv bug or edge case).
func newAllOperationsConfig() GeneratorConfig {
clientOpConfig := ClientOperationConfig{
GetMissing: 1,
GetExisting: 1,
PutMissing: 1,
PutExisting: 1,
Scan: 1,
ScanForUpdate: 1,
GetMissing: 1,
GetMissingForUpdate: 1,
GetExisting: 1,
GetExistingForUpdate: 1,
PutMissing: 1,
PutExisting: 1,
Scan: 1,
ScanForUpdate: 1,
}
batchOpConfig := BatchOperationConfig{
Batch: 4,
Expand Down Expand Up @@ -370,9 +378,11 @@ func (g *generator) selectOp(rng *rand.Rand, contextuallyValid []opGen) Operatio

func (g *generator) registerClientOps(allowed *[]opGen, c *ClientOperationConfig) {
addOpGen(allowed, randGetMissing, c.GetMissing)
addOpGen(allowed, randGetMissingForUpdate, c.GetMissingForUpdate)
addOpGen(allowed, randPutMissing, c.PutMissing)
if len(g.keys) > 0 {
addOpGen(allowed, randGetExisting, c.GetExisting)
addOpGen(allowed, randGetExistingForUpdate, c.GetExistingForUpdate)
addOpGen(allowed, randPutExisting, c.PutExisting)
}
addOpGen(allowed, randScan, c.Scan)
Expand All @@ -387,11 +397,24 @@ func randGetMissing(_ *generator, rng *rand.Rand) Operation {
return get(randKey(rng))
}

func randGetMissingForUpdate(_ *generator, rng *rand.Rand) Operation {
op := get(randKey(rng))
op.Get.ForUpdate = true
return op
}

func randGetExisting(g *generator, rng *rand.Rand) Operation {
key := randMapKey(rng, g.keys)
return get(key)
}

func randGetExistingForUpdate(g *generator, rng *rand.Rand) Operation {
key := randMapKey(rng, g.keys)
op := get(key)
op.Get.ForUpdate = true
return op
}

func randPutMissing(g *generator, rng *rand.Rand) Operation {
value := g.getNextValue()
key := randKey(rng)
Expand Down Expand Up @@ -597,6 +620,10 @@ func get(key string) Operation {
return Operation{Get: &GetOperation{Key: []byte(key)}}
}

func getForUpdate(key string) Operation {
return Operation{Get: &GetOperation{Key: []byte(key), ForUpdate: true}}
}

func put(key, value string) Operation {
return Operation{Put: &PutOperation{Key: []byte(key), Value: []byte(value)}}
}
Expand Down
12 changes: 10 additions & 2 deletions pkg/kv/kvnemesis/generator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,9 +100,17 @@ func TestRandStep(t *testing.T) {
switch o := op.GetValue().(type) {
case *GetOperation:
if _, ok := keys[string(o.Key)]; ok {
client.GetExisting++
if o.ForUpdate {
client.GetExistingForUpdate++
} else {
client.GetExisting++
}
} else {
client.GetMissing++
if o.ForUpdate {
client.GetMissingForUpdate++
} else {
client.GetMissing++
}
}
case *PutOperation:
if _, ok := keys[string(o.Key)]; ok {
Expand Down
6 changes: 5 additions & 1 deletion pkg/kv/kvnemesis/operations.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,11 @@ func (op Operation) format(w *strings.Builder, fctx formatCtx) {
}

func (op GetOperation) format(w *strings.Builder, fctx formatCtx) {
fmt.Fprintf(w, `%s.Get(ctx, %s)`, fctx.receiver, roachpb.Key(op.Key))
methodName := `Get`
if op.ForUpdate {
methodName = `GetForUpdate`
}
fmt.Fprintf(w, `%s.%s(ctx, %s)`, fctx.receiver, methodName, roachpb.Key(op.Key))
switch op.Result.Type {
case ResultType_Error:
err := errors.DecodeError(context.TODO(), *op.Result.Err)
Expand Down
Loading

0 comments on commit 6b58038

Please sign in to comment.