Skip to content

Commit

Permalink
kv: stop sending InitPut, replace with CPut
Browse files Browse the repository at this point in the history
First half of #71074.

This commit replaces all client-side use of InitPut request with the
equivalent use of CPut. It uses the following equivalence to perform
this mapping:
```
InitPut(key, value) -> CPutAllowingIfNotExists(key, value, value)
```

A future change in v23.2 (or do we now allow skipping one major
version?) can remove the server-side handling of InitPut and remove the
proto message entirely, once we no longer need to ensure mixed-version
compatibility with v22.2.

This is primarily a clean-up that reduces the KV API surface area.
However, it's also a useful simplification for #72614.

Release justification: None.

Release note: None.
  • Loading branch information
nvanbenschoten committed Sep 23, 2022
1 parent e8b2d03 commit f7f014d
Show file tree
Hide file tree
Showing 32 changed files with 284 additions and 458 deletions.
36 changes: 3 additions & 33 deletions pkg/kv/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ type Batch struct {
reqs []roachpb.RequestUnion

// approxMutationReqBytes tracks the approximate size of keys and values in
// mutations added to this batch via Put, CPut, InitPut, Del, etc.
// mutations added to this batch via Put, CPut, Del, etc.
approxMutationReqBytes int
// Set when AddRawRequest is used, in which case using the "other"
// operations renders the batch unusable.
Expand All @@ -72,8 +72,8 @@ type Batch struct {
}

// ApproximateMutationBytes returns the approximate byte size of the mutations
// added to this batch via Put, CPut, InitPut, Del, etc methods. Mutations added
// via AddRawRequest are not tracked.
// added to this batch via Put, CPut, Del, etc methods. Mutations added via
// AddRawRequest are not tracked.
func (b *Batch) ApproximateMutationBytes() int {
return b.approxMutationReqBytes
}
Expand Down Expand Up @@ -204,12 +204,6 @@ func (b *Batch) fillResults(ctx context.Context) {
if result.Err == nil {
row.Value = &req.Value
}
case *roachpb.InitPutRequest:
row := &result.Rows[k]
row.Key = []byte(req.Key)
if result.Err == nil {
row.Value = &req.Value
}
case *roachpb.IncrementRequest:
row := &result.Rows[k]
row.Key = []byte(req.Key)
Expand Down Expand Up @@ -506,30 +500,6 @@ func (b *Batch) cputInternal(
b.initResult(1, 1, notRaw, nil)
}

// InitPut sets the first value for a key to value. An ConditionFailedError is
// reported if a value already exists for the key and it's not equal to the
// value passed in. If failOnTombstones is set to true, tombstones will return
// a ConditionFailedError just like a mismatched value.
//
// key can be either a byte slice or a string. value can be any key type, a
// protoutil.Message or any Go primitive type (bool, int, etc). It is illegal
// to set value to nil.
func (b *Batch) InitPut(key, value interface{}, failOnTombstones bool) {
k, err := marshalKey(key)
if err != nil {
b.initResult(0, 1, notRaw, err)
return
}
v, err := marshalValue(value)
if err != nil {
b.initResult(0, 1, notRaw, err)
return
}
b.appendReqs(roachpb.NewInitPut(k, v, failOnTombstones))
b.approxMutationReqBytes += len(k) + len(v.RawBytes)
b.initResult(1, 1, notRaw, nil)
}

// Inc increments the integer value at key. If the key does not exist it will
// be created with an initial value of 0 which will then be incremented. If the
// key exists but was set using Put or CPut an error will be returned.
Expand Down
25 changes: 11 additions & 14 deletions pkg/kv/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -412,6 +412,17 @@ func (db *DB) CPut(ctx context.Context, key, value interface{}, expValue []byte)
return getOneErr(db.Run(ctx, b), b)
}

// CPutAllowingIfNotExists is like CPut except it also allows the Put when the
// existing entry does not exist -- i.e. it succeeds if there is no existing
// entry or the existing entry has the expected value.
func (db *DB) CPutAllowingIfNotExists(
ctx context.Context, key, value interface{}, expValue []byte,
) error {
b := &Batch{}
b.CPutAllowingIfNotExists(key, value, expValue)
return getOneErr(db.Run(ctx, b), b)
}

// CPutInline conditionally sets the value for a key if the existing value is
// equal to expValue, but does not maintain multi-version values. To
// conditionally set a value only if the key doesn't currently exist, pass an
Expand All @@ -433,20 +444,6 @@ func (db *DB) CPutInline(ctx context.Context, key, value interface{}, expValue [
return getOneErr(db.Run(ctx, b), b)
}

// InitPut sets the first value for a key to value. A ConditionFailedError is
// reported if a value already exists for the key and it's not equal to the
// value passed in. If failOnTombstones is set to true, tombstones count as
// mismatched values and will cause a ConditionFailedError.
//
// key can be either a byte slice or a string. value can be any key type, a
// protoutil.Message or any Go primitive type (bool, int, etc). It is illegal to
// set value to nil.
func (db *DB) InitPut(ctx context.Context, key, value interface{}, failOnTombstones bool) error {
b := &Batch{}
b.InitPut(key, value, failOnTombstones)
return getOneErr(db.Run(ctx, b), b)
}

// Inc increments the integer value at key. If the key does not exist it will
// be created with an initial value of 0 which will then be incremented. If the
// key exists but was set using Put or CPut an error will be returned.
Expand Down
82 changes: 50 additions & 32 deletions pkg/kv/db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,56 @@ func TestDB_CPut(t *testing.T) {
checkResult(t, []byte("4"), result.ValueBytes())
}

func TestDB_CPutAllowingIfNotExists(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
s, db := setup(t)
defer s.Stopper().Stop(context.Background())
ctx := context.Background()

if err := db.Put(ctx, "aa", "1"); err != nil {
t.Fatal(err)
}
if err := db.CPutAllowingIfNotExists(ctx, "aa", "2", kvclientutils.StrToCPutExistingValue("1")); err != nil {
t.Fatal(err)
}
result, err := db.Get(ctx, "aa")
if err != nil {
t.Fatal(err)
}
checkResult(t, []byte("2"), result.ValueBytes())

if err = db.CPutAllowingIfNotExists(ctx, "aa", "3", kvclientutils.StrToCPutExistingValue("1")); err == nil {
t.Fatal("expected error from conditional put")
}
result, err = db.Get(ctx, "aa")
if err != nil {
t.Fatal(err)
}
checkResult(t, []byte("2"), result.ValueBytes())

// NOTE: this demonstrates the difference between CPut and
// CPutAllowingIfNotExists. A normal CPut fails when the entry
// does not already exist. CPutAllowingIfNotExists does not.
if err = db.CPutAllowingIfNotExists(ctx, "bb", "4", kvclientutils.StrToCPutExistingValue("1")); err != nil {
t.Fatal(err)
}
result, err = db.Get(ctx, "bb")
if err != nil {
t.Fatal(err)
}
checkResult(t, []byte("4"), result.ValueBytes())

if err = db.CPutAllowingIfNotExists(ctx, "bb", "4", kvclientutils.StrToCPutExistingValue("4")); err != nil {
t.Fatal(err)
}
result, err = db.Get(ctx, "bb")
if err != nil {
t.Fatal(err)
}
checkResult(t, []byte("4"), result.ValueBytes())
}

func TestDB_CPutInline(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
Expand Down Expand Up @@ -238,38 +288,6 @@ func TestDB_CPutInline(t *testing.T) {
}
}

func TestDB_InitPut(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
s, db := setup(t)
defer s.Stopper().Stop(context.Background())
ctx := context.Background()

if err := db.InitPut(ctx, "aa", "1", false); err != nil {
t.Fatal(err)
}
if err := db.InitPut(ctx, "aa", "1", false); err != nil {
t.Fatal(err)
}
if err := db.InitPut(ctx, "aa", "2", false); err == nil {
t.Fatal("expected error from init put")
}
if _, err := db.Del(ctx, "aa"); err != nil {
t.Fatal(err)
}
if err := db.InitPut(ctx, "aa", "2", true); err == nil {
t.Fatal("expected error from init put")
}
if err := db.InitPut(ctx, "aa", "1", false); err != nil {
t.Fatal(err)
}
result, err := db.Get(ctx, "aa")
if err != nil {
t.Fatal(err)
}
checkResult(t, []byte("1"), result.ValueBytes())
}

func TestDB_Inc(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
Expand Down
115 changes: 0 additions & 115 deletions pkg/kv/kvclient/kvcoord/dist_sender_server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2064,16 +2064,6 @@ func TestTxnCoordSenderRetries(t *testing.T) {
tsLeaked: true,
clientRetry: true,
},
{
name: "forwarded timestamp with get and initput",
afterTxnStart: func(ctx context.Context, db *kv.DB) error {
_, err := db.Get(ctx, "a") // read key to set ts cache
return err
},
retryable: func(ctx context.Context, txn *kv.Txn) error {
return txn.InitPut(ctx, "a", "put", false /* failOnTombstones */) // put to advance txn ts
},
},
{
name: "forwarded timestamp with get and cput",
afterTxnStart: func(ctx context.Context, db *kv.DB) error {
Expand Down Expand Up @@ -2529,111 +2519,6 @@ func TestTxnCoordSenderRetries(t *testing.T) {
priorReads: true,
txnCoordRetry: true,
},
{
name: "write too old with initput",
afterTxnStart: func(ctx context.Context, db *kv.DB) error {
return db.Put(ctx, "iput", "put")
},
retryable: func(ctx context.Context, txn *kv.Txn) error {
return txn.InitPut(ctx, "iput", "put", false)
},
},
{
name: "write too old with initput after prior read",
afterTxnStart: func(ctx context.Context, db *kv.DB) error {
return db.Put(ctx, "iput", "put")
},
retryable: func(ctx context.Context, txn *kv.Txn) error {
return txn.InitPut(ctx, "iput", "put", false)
},
priorReads: true,
txnCoordRetry: true, // fails on first attempt at cput with write too old
// Succeeds on second attempt.
},
{
name: "write too old with initput matching older and newer values",
beforeTxnStart: func(ctx context.Context, db *kv.DB) error {
return db.Put(ctx, "iput", "put")
},
afterTxnStart: func(ctx context.Context, db *kv.DB) error {
return db.Put(ctx, "iput", "put")
},
retryable: func(ctx context.Context, txn *kv.Txn) error {
return txn.InitPut(ctx, "iput", "put", false)
},
},
{
name: "write too old with initput matching older and newer values after prior read",
beforeTxnStart: func(ctx context.Context, db *kv.DB) error {
return db.Put(ctx, "iput", "put")
},
afterTxnStart: func(ctx context.Context, db *kv.DB) error {
return db.Put(ctx, "iput", "put")
},
retryable: func(ctx context.Context, txn *kv.Txn) error {
return txn.InitPut(ctx, "iput", "put", false)
},
priorReads: true,
// Expect a transaction coord retry, which should succeed.
txnCoordRetry: true,
},
{
name: "write too old with initput matching older value",
beforeTxnStart: func(ctx context.Context, db *kv.DB) error {
return db.Put(ctx, "iput", "put1")
},
afterTxnStart: func(ctx context.Context, db *kv.DB) error {
return db.Put(ctx, "iput", "put2")
},
retryable: func(ctx context.Context, txn *kv.Txn) error {
return txn.InitPut(ctx, "iput", "put1", false)
},
txnCoordRetry: false, // non-matching value means we fail txn coord retry
expFailure: "unexpected value", // the failure we get is a condition failed error
},
{
name: "write too old with initput matching newer value",
beforeTxnStart: func(ctx context.Context, db *kv.DB) error {
return db.Put(ctx, "iput", "put1")
},
afterTxnStart: func(ctx context.Context, db *kv.DB) error {
return db.Put(ctx, "iput", "put2")
},
retryable: func(ctx context.Context, txn *kv.Txn) error {
return txn.InitPut(ctx, "iput", "put2", false)
},
// No txn coord retry as we get condition failed error.
expFailure: "unexpected value", // the failure we get is a condition failed error
},
{
name: "write too old with initput failing on tombstone before",
beforeTxnStart: func(ctx context.Context, db *kv.DB) error {
_, err := db.Del(ctx, "iput")
return err
},
afterTxnStart: func(ctx context.Context, db *kv.DB) error {
return db.Put(ctx, "iput", "put2")
},
retryable: func(ctx context.Context, txn *kv.Txn) error {
return txn.InitPut(ctx, "iput", "put2", true)
},
expFailure: "unexpected value", // condition failed error when failing on tombstones
},
{
name: "write too old with initput failing on tombstone after",
beforeTxnStart: func(ctx context.Context, db *kv.DB) error {
return db.Put(ctx, "iput", "put")
},
afterTxnStart: func(ctx context.Context, db *kv.DB) error {
_, err := db.Del(ctx, "iput")
return err
},
retryable: func(ctx context.Context, txn *kv.Txn) error {
return txn.InitPut(ctx, "iput", "put", true)
},
txnCoordRetry: false, // non-matching value means we fail txn coord retry
expFailure: "unexpected value", // condition failed error when failing on tombstones
},
{
name: "write too old with locking read",
afterTxnStart: func(ctx context.Context, db *kv.DB) error {
Expand Down
Loading

0 comments on commit f7f014d

Please sign in to comment.