diff --git a/pkg/kv/batch.go b/pkg/kv/batch.go index 06c21ca2a1d5..1ec5493d27f9 100644 --- a/pkg/kv/batch.go +++ b/pkg/kv/batch.go @@ -54,7 +54,7 @@ type Batch struct { reqs []roachpb.RequestUnion // approxMutationReqBytes tracks the approximate size of keys and values in - // mutations added to this batch via Put, CPut, InitPut, Del, etc. + // mutations added to this batch via Put, CPut, Del, etc. approxMutationReqBytes int // Set when AddRawRequest is used, in which case using the "other" // operations renders the batch unusable. @@ -72,8 +72,8 @@ type Batch struct { } // ApproximateMutationBytes returns the approximate byte size of the mutations -// added to this batch via Put, CPut, InitPut, Del, etc methods. Mutations added -// via AddRawRequest are not tracked. +// added to this batch via Put, CPut, Del, etc methods. Mutations added via +// AddRawRequest are not tracked. func (b *Batch) ApproximateMutationBytes() int { return b.approxMutationReqBytes } @@ -204,12 +204,6 @@ func (b *Batch) fillResults(ctx context.Context) { if result.Err == nil { row.Value = &req.Value } - case *roachpb.InitPutRequest: - row := &result.Rows[k] - row.Key = []byte(req.Key) - if result.Err == nil { - row.Value = &req.Value - } case *roachpb.IncrementRequest: row := &result.Rows[k] row.Key = []byte(req.Key) @@ -506,30 +500,6 @@ func (b *Batch) cputInternal( b.initResult(1, 1, notRaw, nil) } -// InitPut sets the first value for a key to value. An ConditionFailedError is -// reported if a value already exists for the key and it's not equal to the -// value passed in. If failOnTombstones is set to true, tombstones will return -// a ConditionFailedError just like a mismatched value. -// -// key can be either a byte slice or a string. value can be any key type, a -// protoutil.Message or any Go primitive type (bool, int, etc). It is illegal -// to set value to nil. -func (b *Batch) InitPut(key, value interface{}, failOnTombstones bool) { - k, err := marshalKey(key) - if err != nil { - b.initResult(0, 1, notRaw, err) - return - } - v, err := marshalValue(value) - if err != nil { - b.initResult(0, 1, notRaw, err) - return - } - b.appendReqs(roachpb.NewInitPut(k, v, failOnTombstones)) - b.approxMutationReqBytes += len(k) + len(v.RawBytes) - b.initResult(1, 1, notRaw, nil) -} - // Inc increments the integer value at key. If the key does not exist it will // be created with an initial value of 0 which will then be incremented. If the // key exists but was set using Put or CPut an error will be returned. diff --git a/pkg/kv/db.go b/pkg/kv/db.go index c45d71fa99bd..09601a6dc087 100644 --- a/pkg/kv/db.go +++ b/pkg/kv/db.go @@ -412,6 +412,17 @@ func (db *DB) CPut(ctx context.Context, key, value interface{}, expValue []byte) return getOneErr(db.Run(ctx, b), b) } +// CPutAllowingIfNotExists is like CPut except it also allows the Put when the +// existing entry does not exist -- i.e. it succeeds if there is no existing +// entry or the existing entry has the expected value. +func (db *DB) CPutAllowingIfNotExists( + ctx context.Context, key, value interface{}, expValue []byte, +) error { + b := &Batch{} + b.CPutAllowingIfNotExists(key, value, expValue) + return getOneErr(db.Run(ctx, b), b) +} + // CPutInline conditionally sets the value for a key if the existing value is // equal to expValue, but does not maintain multi-version values. To // conditionally set a value only if the key doesn't currently exist, pass an @@ -433,20 +444,6 @@ func (db *DB) CPutInline(ctx context.Context, key, value interface{}, expValue [ return getOneErr(db.Run(ctx, b), b) } -// InitPut sets the first value for a key to value. A ConditionFailedError is -// reported if a value already exists for the key and it's not equal to the -// value passed in. If failOnTombstones is set to true, tombstones count as -// mismatched values and will cause a ConditionFailedError. -// -// key can be either a byte slice or a string. value can be any key type, a -// protoutil.Message or any Go primitive type (bool, int, etc). It is illegal to -// set value to nil. -func (db *DB) InitPut(ctx context.Context, key, value interface{}, failOnTombstones bool) error { - b := &Batch{} - b.InitPut(key, value, failOnTombstones) - return getOneErr(db.Run(ctx, b), b) -} - // Inc increments the integer value at key. If the key does not exist it will // be created with an initial value of 0 which will then be incremented. If the // key exists but was set using Put or CPut an error will be returned. diff --git a/pkg/kv/db_test.go b/pkg/kv/db_test.go index 867c50adaf9b..a12cd592c840 100644 --- a/pkg/kv/db_test.go +++ b/pkg/kv/db_test.go @@ -180,6 +180,56 @@ func TestDB_CPut(t *testing.T) { checkResult(t, []byte("4"), result.ValueBytes()) } +func TestDB_CPutAllowingIfNotExists(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + s, db := setup(t) + defer s.Stopper().Stop(context.Background()) + ctx := context.Background() + + if err := db.Put(ctx, "aa", "1"); err != nil { + t.Fatal(err) + } + if err := db.CPutAllowingIfNotExists(ctx, "aa", "2", kvclientutils.StrToCPutExistingValue("1")); err != nil { + t.Fatal(err) + } + result, err := db.Get(ctx, "aa") + if err != nil { + t.Fatal(err) + } + checkResult(t, []byte("2"), result.ValueBytes()) + + if err = db.CPutAllowingIfNotExists(ctx, "aa", "3", kvclientutils.StrToCPutExistingValue("1")); err == nil { + t.Fatal("expected error from conditional put") + } + result, err = db.Get(ctx, "aa") + if err != nil { + t.Fatal(err) + } + checkResult(t, []byte("2"), result.ValueBytes()) + + // NOTE: this demonstrates the difference between CPut and + // CPutAllowingIfNotExists. A normal CPut fails when the entry + // does not already exist. CPutAllowingIfNotExists does not. + if err = db.CPutAllowingIfNotExists(ctx, "bb", "4", kvclientutils.StrToCPutExistingValue("1")); err != nil { + t.Fatal(err) + } + result, err = db.Get(ctx, "bb") + if err != nil { + t.Fatal(err) + } + checkResult(t, []byte("4"), result.ValueBytes()) + + if err = db.CPutAllowingIfNotExists(ctx, "bb", "4", kvclientutils.StrToCPutExistingValue("4")); err != nil { + t.Fatal(err) + } + result, err = db.Get(ctx, "bb") + if err != nil { + t.Fatal(err) + } + checkResult(t, []byte("4"), result.ValueBytes()) +} + func TestDB_CPutInline(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) @@ -238,38 +288,6 @@ func TestDB_CPutInline(t *testing.T) { } } -func TestDB_InitPut(t *testing.T) { - defer leaktest.AfterTest(t)() - defer log.Scope(t).Close(t) - s, db := setup(t) - defer s.Stopper().Stop(context.Background()) - ctx := context.Background() - - if err := db.InitPut(ctx, "aa", "1", false); err != nil { - t.Fatal(err) - } - if err := db.InitPut(ctx, "aa", "1", false); err != nil { - t.Fatal(err) - } - if err := db.InitPut(ctx, "aa", "2", false); err == nil { - t.Fatal("expected error from init put") - } - if _, err := db.Del(ctx, "aa"); err != nil { - t.Fatal(err) - } - if err := db.InitPut(ctx, "aa", "2", true); err == nil { - t.Fatal("expected error from init put") - } - if err := db.InitPut(ctx, "aa", "1", false); err != nil { - t.Fatal(err) - } - result, err := db.Get(ctx, "aa") - if err != nil { - t.Fatal(err) - } - checkResult(t, []byte("1"), result.ValueBytes()) -} - func TestDB_Inc(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) diff --git a/pkg/kv/kvclient/kvcoord/dist_sender_server_test.go b/pkg/kv/kvclient/kvcoord/dist_sender_server_test.go index c1d1ef4a0a58..99fd503f50f7 100644 --- a/pkg/kv/kvclient/kvcoord/dist_sender_server_test.go +++ b/pkg/kv/kvclient/kvcoord/dist_sender_server_test.go @@ -2064,16 +2064,6 @@ func TestTxnCoordSenderRetries(t *testing.T) { tsLeaked: true, clientRetry: true, }, - { - name: "forwarded timestamp with get and initput", - afterTxnStart: func(ctx context.Context, db *kv.DB) error { - _, err := db.Get(ctx, "a") // read key to set ts cache - return err - }, - retryable: func(ctx context.Context, txn *kv.Txn) error { - return txn.InitPut(ctx, "a", "put", false /* failOnTombstones */) // put to advance txn ts - }, - }, { name: "forwarded timestamp with get and cput", afterTxnStart: func(ctx context.Context, db *kv.DB) error { @@ -2529,111 +2519,6 @@ func TestTxnCoordSenderRetries(t *testing.T) { priorReads: true, txnCoordRetry: true, }, - { - name: "write too old with initput", - afterTxnStart: func(ctx context.Context, db *kv.DB) error { - return db.Put(ctx, "iput", "put") - }, - retryable: func(ctx context.Context, txn *kv.Txn) error { - return txn.InitPut(ctx, "iput", "put", false) - }, - }, - { - name: "write too old with initput after prior read", - afterTxnStart: func(ctx context.Context, db *kv.DB) error { - return db.Put(ctx, "iput", "put") - }, - retryable: func(ctx context.Context, txn *kv.Txn) error { - return txn.InitPut(ctx, "iput", "put", false) - }, - priorReads: true, - txnCoordRetry: true, // fails on first attempt at cput with write too old - // Succeeds on second attempt. - }, - { - name: "write too old with initput matching older and newer values", - beforeTxnStart: func(ctx context.Context, db *kv.DB) error { - return db.Put(ctx, "iput", "put") - }, - afterTxnStart: func(ctx context.Context, db *kv.DB) error { - return db.Put(ctx, "iput", "put") - }, - retryable: func(ctx context.Context, txn *kv.Txn) error { - return txn.InitPut(ctx, "iput", "put", false) - }, - }, - { - name: "write too old with initput matching older and newer values after prior read", - beforeTxnStart: func(ctx context.Context, db *kv.DB) error { - return db.Put(ctx, "iput", "put") - }, - afterTxnStart: func(ctx context.Context, db *kv.DB) error { - return db.Put(ctx, "iput", "put") - }, - retryable: func(ctx context.Context, txn *kv.Txn) error { - return txn.InitPut(ctx, "iput", "put", false) - }, - priorReads: true, - // Expect a transaction coord retry, which should succeed. - txnCoordRetry: true, - }, - { - name: "write too old with initput matching older value", - beforeTxnStart: func(ctx context.Context, db *kv.DB) error { - return db.Put(ctx, "iput", "put1") - }, - afterTxnStart: func(ctx context.Context, db *kv.DB) error { - return db.Put(ctx, "iput", "put2") - }, - retryable: func(ctx context.Context, txn *kv.Txn) error { - return txn.InitPut(ctx, "iput", "put1", false) - }, - txnCoordRetry: false, // non-matching value means we fail txn coord retry - expFailure: "unexpected value", // the failure we get is a condition failed error - }, - { - name: "write too old with initput matching newer value", - beforeTxnStart: func(ctx context.Context, db *kv.DB) error { - return db.Put(ctx, "iput", "put1") - }, - afterTxnStart: func(ctx context.Context, db *kv.DB) error { - return db.Put(ctx, "iput", "put2") - }, - retryable: func(ctx context.Context, txn *kv.Txn) error { - return txn.InitPut(ctx, "iput", "put2", false) - }, - // No txn coord retry as we get condition failed error. - expFailure: "unexpected value", // the failure we get is a condition failed error - }, - { - name: "write too old with initput failing on tombstone before", - beforeTxnStart: func(ctx context.Context, db *kv.DB) error { - _, err := db.Del(ctx, "iput") - return err - }, - afterTxnStart: func(ctx context.Context, db *kv.DB) error { - return db.Put(ctx, "iput", "put2") - }, - retryable: func(ctx context.Context, txn *kv.Txn) error { - return txn.InitPut(ctx, "iput", "put2", true) - }, - expFailure: "unexpected value", // condition failed error when failing on tombstones - }, - { - name: "write too old with initput failing on tombstone after", - beforeTxnStart: func(ctx context.Context, db *kv.DB) error { - return db.Put(ctx, "iput", "put") - }, - afterTxnStart: func(ctx context.Context, db *kv.DB) error { - _, err := db.Del(ctx, "iput") - return err - }, - retryable: func(ctx context.Context, txn *kv.Txn) error { - return txn.InitPut(ctx, "iput", "put", true) - }, - txnCoordRetry: false, // non-matching value means we fail txn coord retry - expFailure: "unexpected value", // condition failed error when failing on tombstones - }, { name: "write too old with locking read", afterTxnStart: func(ctx context.Context, db *kv.DB) error { diff --git a/pkg/kv/kvclient/kvcoord/txn_interceptor_pipeliner_test.go b/pkg/kv/kvclient/kvcoord/txn_interceptor_pipeliner_test.go index 933826e2017f..2bd40a7ab04a 100644 --- a/pkg/kv/kvclient/kvcoord/txn_interceptor_pipeliner_test.go +++ b/pkg/kv/kvclient/kvcoord/txn_interceptor_pipeliner_test.go @@ -209,26 +209,22 @@ func TestTxnPipelinerTrackInFlightWrites(t *testing.T) { cputArgs := roachpb.ConditionalPutRequest{RequestHeader: roachpb.RequestHeader{Key: keyA}} cputArgs.Sequence = 2 ba.Add(&cputArgs) - initPutArgs := roachpb.InitPutRequest{RequestHeader: roachpb.RequestHeader{Key: keyB}} - initPutArgs.Sequence = 3 - ba.Add(&initPutArgs) - incArgs := roachpb.IncrementRequest{RequestHeader: roachpb.RequestHeader{Key: keyC}} - incArgs.Sequence = 4 + incArgs := roachpb.IncrementRequest{RequestHeader: roachpb.RequestHeader{Key: keyB}} + incArgs.Sequence = 3 ba.Add(&incArgs) // Write at the same key as another write in the same batch. Will only // result in a single in-flight write, at the larger sequence number. - delArgs := roachpb.DeleteRequest{RequestHeader: roachpb.RequestHeader{Key: keyC}} - delArgs.Sequence = 5 + delArgs := roachpb.DeleteRequest{RequestHeader: roachpb.RequestHeader{Key: keyB}} + delArgs.Sequence = 4 ba.Add(&delArgs) mockSender.MockSend(func(ba roachpb.BatchRequest) (*roachpb.BatchResponse, *roachpb.Error) { - require.Len(t, ba.Requests, 5) + require.Len(t, ba.Requests, 4) require.True(t, ba.AsyncConsensus) require.IsType(t, &roachpb.QueryIntentRequest{}, ba.Requests[0].GetInner()) require.IsType(t, &roachpb.ConditionalPutRequest{}, ba.Requests[1].GetInner()) - require.IsType(t, &roachpb.InitPutRequest{}, ba.Requests[2].GetInner()) - require.IsType(t, &roachpb.IncrementRequest{}, ba.Requests[3].GetInner()) - require.IsType(t, &roachpb.DeleteRequest{}, ba.Requests[4].GetInner()) + require.IsType(t, &roachpb.IncrementRequest{}, ba.Requests[2].GetInner()) + require.IsType(t, &roachpb.DeleteRequest{}, ba.Requests[3].GetInner()) qiReq := ba.Requests[0].GetQueryIntent() require.Equal(t, keyA, qiReq.Key) @@ -248,13 +244,12 @@ func TestTxnPipelinerTrackInFlightWrites(t *testing.T) { br, pErr = tp.SendLocked(ctx, ba) require.NotNil(t, br) - require.Len(t, br.Responses, 4) // QueryIntent response stripped + require.Len(t, br.Responses, 3) // QueryIntent response stripped require.IsType(t, &roachpb.ConditionalPutResponse{}, br.Responses[0].GetInner()) - require.IsType(t, &roachpb.InitPutResponse{}, br.Responses[1].GetInner()) - require.IsType(t, &roachpb.IncrementResponse{}, br.Responses[2].GetInner()) - require.IsType(t, &roachpb.DeleteResponse{}, br.Responses[3].GetInner()) + require.IsType(t, &roachpb.IncrementResponse{}, br.Responses[1].GetInner()) + require.IsType(t, &roachpb.DeleteResponse{}, br.Responses[2].GetInner()) require.Nil(t, pErr) - require.Equal(t, 3, tp.ifWrites.len()) + require.Equal(t, 2, tp.ifWrites.len()) wMin := tp.ifWrites.t.Min().(*inFlightWrite) require.Equal(t, cputArgs.Key, wMin.Key) @@ -265,41 +260,35 @@ func TestTxnPipelinerTrackInFlightWrites(t *testing.T) { // Send a final write, along with an EndTxn request. Should attempt to prove // all in-flight writes. Should NOT use async consensus. - keyD := roachpb.Key("d") ba.Requests = nil - putArgs2 := roachpb.PutRequest{RequestHeader: roachpb.RequestHeader{Key: keyD}} - putArgs2.Sequence = 6 + putArgs2 := roachpb.PutRequest{RequestHeader: roachpb.RequestHeader{Key: keyC}} + putArgs2.Sequence = 5 ba.Add(&putArgs2) etArgs := roachpb.EndTxnRequest{Commit: true} - etArgs.Sequence = 7 + etArgs.Sequence = 6 ba.Add(&etArgs) mockSender.MockSend(func(ba roachpb.BatchRequest) (*roachpb.BatchResponse, *roachpb.Error) { - require.Len(t, ba.Requests, 5) + require.Len(t, ba.Requests, 4) require.False(t, ba.AsyncConsensus) require.IsType(t, &roachpb.PutRequest{}, ba.Requests[0].GetInner()) require.IsType(t, &roachpb.QueryIntentRequest{}, ba.Requests[1].GetInner()) require.IsType(t, &roachpb.QueryIntentRequest{}, ba.Requests[2].GetInner()) - require.IsType(t, &roachpb.QueryIntentRequest{}, ba.Requests[3].GetInner()) - require.IsType(t, &roachpb.EndTxnRequest{}, ba.Requests[4].GetInner()) + require.IsType(t, &roachpb.EndTxnRequest{}, ba.Requests[3].GetInner()) qiReq1 := ba.Requests[1].GetQueryIntent() qiReq2 := ba.Requests[2].GetQueryIntent() - qiReq3 := ba.Requests[3].GetQueryIntent() require.Equal(t, keyA, qiReq1.Key) require.Equal(t, keyB, qiReq2.Key) - require.Equal(t, keyC, qiReq3.Key) require.Equal(t, enginepb.TxnSeq(2), qiReq1.Txn.Sequence) - require.Equal(t, enginepb.TxnSeq(3), qiReq2.Txn.Sequence) - require.Equal(t, enginepb.TxnSeq(5), qiReq3.Txn.Sequence) + require.Equal(t, enginepb.TxnSeq(4), qiReq2.Txn.Sequence) - etReq := ba.Requests[4].GetEndTxn() + etReq := ba.Requests[3].GetEndTxn() require.Equal(t, []roachpb.Span{{Key: keyA}}, etReq.LockSpans) expInFlight := []roachpb.SequencedWrite{ {Key: keyA, Sequence: 2}, - {Key: keyB, Sequence: 3}, + {Key: keyB, Sequence: 4}, {Key: keyC, Sequence: 5}, - {Key: keyD, Sequence: 6}, } require.Equal(t, expInFlight, etReq.InFlightWrites) @@ -308,7 +297,6 @@ func TestTxnPipelinerTrackInFlightWrites(t *testing.T) { br.Txn.Status = roachpb.COMMITTED br.Responses[1].GetQueryIntent().FoundIntent = true br.Responses[2].GetQueryIntent().FoundIntent = true - br.Responses[3].GetQueryIntent().FoundIntent = true return br, nil }) diff --git a/pkg/kv/kvclient/kvcoord/txn_interceptor_seq_num_allocator_test.go b/pkg/kv/kvclient/kvcoord/txn_interceptor_seq_num_allocator_test.go index 0beda8bb2bfb..2fa2ac595dd4 100644 --- a/pkg/kv/kvclient/kvcoord/txn_interceptor_seq_num_allocator_test.go +++ b/pkg/kv/kvclient/kvcoord/txn_interceptor_seq_num_allocator_test.go @@ -66,7 +66,7 @@ func TestSequenceNumberAllocation(t *testing.T) { ba.Requests = nil ba.Add(&roachpb.ConditionalPutRequest{RequestHeader: roachpb.RequestHeader{Key: keyA}}) ba.Add(&roachpb.GetRequest{RequestHeader: roachpb.RequestHeader{Key: keyA}}) - ba.Add(&roachpb.InitPutRequest{RequestHeader: roachpb.RequestHeader{Key: keyA}}) + ba.Add(&roachpb.IncrementRequest{RequestHeader: roachpb.RequestHeader{Key: keyA}}) ba.Add(&roachpb.ScanRequest{RequestHeader: roachpb.RequestHeader{Key: keyA, EndKey: keyB}}) mockSender.MockSend(func(ba roachpb.BatchRequest) (*roachpb.BatchResponse, *roachpb.Error) { @@ -157,7 +157,7 @@ func TestSequenceNumberAllocationWithStep(t *testing.T) { ba.Requests = nil ba.Add(&roachpb.ConditionalPutRequest{RequestHeader: roachpb.RequestHeader{Key: keyA}}) ba.Add(&roachpb.GetRequest{RequestHeader: roachpb.RequestHeader{Key: keyA}}) - ba.Add(&roachpb.InitPutRequest{RequestHeader: roachpb.RequestHeader{Key: keyA}}) + ba.Add(&roachpb.IncrementRequest{RequestHeader: roachpb.RequestHeader{Key: keyA}}) ba.Add(&roachpb.ScanRequest{RequestHeader: roachpb.RequestHeader{Key: keyA, EndKey: keyB}}) mockSender.MockSend(func(ba roachpb.BatchRequest) (*roachpb.BatchResponse, *roachpb.Error) { @@ -208,7 +208,7 @@ func TestSequenceNumberAllocationWithStep(t *testing.T) { ba.Requests = nil ba.Add(&roachpb.ConditionalPutRequest{RequestHeader: roachpb.RequestHeader{Key: keyA}}) ba.Add(&roachpb.GetRequest{RequestHeader: roachpb.RequestHeader{Key: keyA}}) - ba.Add(&roachpb.InitPutRequest{RequestHeader: roachpb.RequestHeader{Key: keyA}}) + ba.Add(&roachpb.IncrementRequest{RequestHeader: roachpb.RequestHeader{Key: keyA}}) ba.Add(&roachpb.ScanRequest{RequestHeader: roachpb.RequestHeader{Key: keyA, EndKey: keyB}}) mockSender.MockSend(func(ba roachpb.BatchRequest) (*roachpb.BatchResponse, *roachpb.Error) { @@ -413,7 +413,7 @@ func TestSequenceNumberAllocationAfterEpochBump(t *testing.T) { ba.Header = roachpb.Header{Txn: &txn} ba.Add(&roachpb.ConditionalPutRequest{RequestHeader: roachpb.RequestHeader{Key: keyA}}) ba.Add(&roachpb.GetRequest{RequestHeader: roachpb.RequestHeader{Key: keyA}}) - ba.Add(&roachpb.InitPutRequest{RequestHeader: roachpb.RequestHeader{Key: keyA}}) + ba.Add(&roachpb.IncrementRequest{RequestHeader: roachpb.RequestHeader{Key: keyA}}) mockSender.MockSend(func(ba roachpb.BatchRequest) (*roachpb.BatchResponse, *roachpb.Error) { require.Len(t, ba.Requests, 3) @@ -439,7 +439,7 @@ func TestSequenceNumberAllocationAfterEpochBump(t *testing.T) { ba.Add(&roachpb.GetRequest{RequestHeader: roachpb.RequestHeader{Key: keyA}}) ba.Add(&roachpb.ConditionalPutRequest{RequestHeader: roachpb.RequestHeader{Key: keyA}}) ba.Add(&roachpb.ScanRequest{RequestHeader: roachpb.RequestHeader{Key: keyA, EndKey: keyB}}) - ba.Add(&roachpb.InitPutRequest{RequestHeader: roachpb.RequestHeader{Key: keyA}}) + ba.Add(&roachpb.IncrementRequest{RequestHeader: roachpb.RequestHeader{Key: keyA}}) mockSender.MockSend(func(ba roachpb.BatchRequest) (*roachpb.BatchResponse, *roachpb.Error) { require.Len(t, ba.Requests, 4) diff --git a/pkg/kv/kvnemesis/doc.go b/pkg/kv/kvnemesis/doc.go index 6ed0e4fdbc42..b11b2818953d 100644 --- a/pkg/kv/kvnemesis/doc.go +++ b/pkg/kv/kvnemesis/doc.go @@ -23,7 +23,7 @@ // guarantees. // // TODO -// - CPut/InitPut/Increment +// - CPut/Increment // - ClearRange/RevertRange // - AdminRelocateRange // - AdminUnsplit diff --git a/pkg/kv/txn.go b/pkg/kv/txn.go index 990cc8eb0185..2e23a758a51e 100644 --- a/pkg/kv/txn.go +++ b/pkg/kv/txn.go @@ -508,20 +508,6 @@ func (txn *Txn) CPut(ctx context.Context, key, value interface{}, expValue []byt return getOneErr(txn.Run(ctx, b), b) } -// InitPut sets the first value for a key to value. An error is reported if a -// value already exists for the key and it's not equal to the value passed in. -// If failOnTombstones is set to true, tombstones count as mismatched values -// and will cause a ConditionFailedError. -// -// key can be either a byte slice or a string. value can be any key type, a -// protoutil.Message or any Go primitive type (bool, int, etc). It is illegal to -// set value to nil. -func (txn *Txn) InitPut(ctx context.Context, key, value interface{}, failOnTombstones bool) error { - b := txn.NewBatch() - b.InitPut(key, value, failOnTombstones) - return getOneErr(txn.Run(ctx, b), b) -} - // Inc increments the integer value at key. If the key does not exist it will // be created with an initial value of 0 which will then be incremented. If the // key exists but was set using Put or CPut an error will be returned. diff --git a/pkg/kv/txn_test.go b/pkg/kv/txn_test.go index 9934ee252a95..9b2f7ba3c7e1 100644 --- a/pkg/kv/txn_test.go +++ b/pkg/kv/txn_test.go @@ -133,27 +133,6 @@ func newTestTxnFactory( }) } -func TestInitPut(t *testing.T) { - defer leaktest.AfterTest(t)() - defer log.Scope(t).Close(t) - ctx := context.Background() - stopper := stop.NewStopper() - defer stopper.Stop(ctx) - // This test is mostly an excuse to exercise otherwise unused code. - // TODO(vivekmenezes): update test or remove when InitPut is being - // considered sufficiently tested and this path exercised. - clock := hlc.NewClockWithSystemTimeSource(time.Nanosecond /* maxOffset */) - db := NewDB(log.MakeTestingAmbientCtxWithNewTracer(), newTestTxnFactory(func(ba roachpb.BatchRequest) (*roachpb.BatchResponse, *roachpb.Error) { - br := ba.CreateReply() - return br, nil - }), clock, stopper) - - txn := NewTxn(ctx, db, 0 /* gatewayNodeID */) - if pErr := txn.InitPut(ctx, "a", "b", false); pErr != nil { - t.Fatal(pErr) - } -} - // TestTransactionConfig verifies the proper unwrapping and re-wrapping of the // client's sender when starting a transaction. Also verifies that the // UserPriority is propagated to the transactional client and that the admission diff --git a/pkg/roachpb/api.go b/pkg/roachpb/api.go index 4aec679f5751..b7e2b2557fea 100644 --- a/pkg/roachpb/api.go +++ b/pkg/roachpb/api.go @@ -1206,22 +1206,6 @@ func NewConditionalPutInline(key Key, value Value, expValue []byte, allowNotExis } } -// NewInitPut returns a Request initialized to put the value at key, as long as -// the key doesn't exist, returning a ConditionFailedError if the key exists and -// the existing value is different from value. If failOnTombstones is set to -// true, tombstones count as mismatched values and will cause a -// ConditionFailedError. -func NewInitPut(key Key, value Value, failOnTombstones bool) Request { - value.InitChecksum(key) - return &InitPutRequest{ - RequestHeader: RequestHeader{ - Key: key, - }, - Value: value, - FailOnTombstones: failOnTombstones, - } -} - // NewDelete returns a Request initialized to delete the value at key. func NewDelete(key Key) Request { return &DeleteRequest{ diff --git a/pkg/roachpb/api.proto b/pkg/roachpb/api.proto index a0d6b29b9abc..3fa213f7a8d7 100644 --- a/pkg/roachpb/api.proto +++ b/pkg/roachpb/api.proto @@ -300,6 +300,11 @@ message ConditionalPutResponse { // - If key exists, returns a ConditionFailedError if value != existing value // If failOnTombstones is set to true, tombstone values count as mismatched // values and will cause a ConditionFailedError. +// +// NOTE: InitPutRequest is now deprecated and should no longer be used. See +// #71074. The request type still exists for compatibility with v22.2 and +// earlier releases. The server-side handling code can be deleted once +// compatibility with v22.2 is no longer needed. message InitPutRequest { RequestHeader header = 1 [(gogoproto.nullable) = false, (gogoproto.embed) = true]; Value value = 2 [(gogoproto.nullable) = false]; diff --git a/pkg/sql/backfill/backfill.go b/pkg/sql/backfill/backfill.go index 5a290dd2ff59..34bef0a1b8b6 100644 --- a/pkg/sql/backfill/backfill.go +++ b/pkg/sql/backfill/backfill.go @@ -1021,9 +1021,9 @@ func (ib *IndexBackfiller) RunIndexBackfillChunk( for _, entry := range entries { if traceKV { - log.VEventf(ctx, 2, "InitPut %s -> %s", entry.Key, entry.Value.PrettyPrint()) + log.VEventf(ctx, 2, "CPut %s -> %s", entry.Key, entry.Value.PrettyPrint()) } - batch.InitPut(entry.Key, &entry.Value, false /* failOnTombstones */) + batch.CPutAllowingIfNotExists(entry.Key, &entry.Value, entry.Value.TagAndDataBytes()) } writeBatch := txn.Run if alsoCommit { diff --git a/pkg/sql/catalog/ingesting/write_descs.go b/pkg/sql/catalog/ingesting/write_descs.go index 3e80ea2e9d68..5284a1bf6469 100644 --- a/pkg/sql/catalog/ingesting/write_descs.go +++ b/pkg/sql/catalog/ingesting/write_descs.go @@ -215,7 +215,9 @@ func WriteDescriptors( } for _, kv := range extra { - b.InitPut(kv.Key, &kv.Value, false) + // TODO DURING REVIEW: @dt, why was this an InitPut instead of a + // CPut(expect=nil)? This was added in 270d9c8. + b.CPutAllowingIfNotExists(kv.Key, &kv.Value, kv.Value.TagAndDataBytes()) } if err := txn.Run(ctx, b); err != nil { if errors.HasType(err, (*roachpb.ConditionFailedError)(nil)) { diff --git a/pkg/sql/catalog/table_elements.go b/pkg/sql/catalog/table_elements.go index 3badbc683640..e273032fe08c 100644 --- a/pkg/sql/catalog/table_elements.go +++ b/pkg/sql/catalog/table_elements.go @@ -205,7 +205,7 @@ type Index interface { NumCompositeColumns() int GetCompositeColumnID(compositeColumnOrdinal int) descpb.ColumnID UseDeletePreservingEncoding() bool - // ForcePut forces all writes to use Put rather than CPut or InitPut. + // ForcePut forces all writes to use Put rather than CPut. // // Users of this options should take great care as it // effectively mean unique constraints are not respected. diff --git a/pkg/sql/catalog/tabledesc/index.go b/pkg/sql/catalog/tabledesc/index.go index aece5c9b65e5..9615990cccd7 100644 --- a/pkg/sql/catalog/tabledesc/index.go +++ b/pkg/sql/catalog/tabledesc/index.go @@ -393,7 +393,7 @@ func (w index) UseDeletePreservingEncoding() bool { } // ForcePut returns true if writes to the index should only use Put (rather than -// CPut or InitPut). This is used by: +// CPut). This is used by: // // - indexes currently being built by the MVCC-compliant index backfiller, and // - the temporary indexes that support that process, and diff --git a/pkg/sql/catalog/tabledesc/index_test.go b/pkg/sql/catalog/tabledesc/index_test.go index d453436128bb..6b8dc729065a 100644 --- a/pkg/sql/catalog/tabledesc/index_test.go +++ b/pkg/sql/catalog/tabledesc/index_test.go @@ -381,20 +381,25 @@ func TestIndexStrictColumnIDs(t *testing.T) { require.NoError(t, err) // Retrieve KV trace and check for redundant values. - rows, err := conn.Query(`SELECT message FROM [SHOW KV TRACE FOR SESSION] WHERE message LIKE 'InitPut%'`) + rows, err := conn.Query(`SELECT message FROM [SHOW KV TRACE FOR SESSION] WHERE message LIKE 'CPut%'`) require.NoError(t, err) defer rows.Close() - require.True(t, rows.Next()) - var msg string - err = rows.Scan(&msg) - require.NoError(t, err) - expected := fmt.Sprintf(`InitPut /Table/%d/2/0/0/0/0/0/0 -> /BYTES/0x2300030003000300`, mut.GetID()) - require.Equal(t, expected, msg) + var msgs []string + for rows.Next() { + var msg string + require.NoError(t, rows.Scan(&msg)) + msgs = append(msgs, msg) + } + expectedMsgs := []string{ + fmt.Sprintf(`CPut /Table/%d/1/0/0 -> /TUPLE/2:2:Int/0`, mut.GetID()), + fmt.Sprintf(`CPut /Table/%d/2/0/0/0/0/0/0 -> /BYTES/0x2300030003000300`, mut.GetID()), + } + require.Equal(t, expectedMsgs, msgs) // Test that with the strict guarantees, this table descriptor would have been // considered invalid. idx.Version = descpb.StrictIndexColumnIDGuaranteesVersion - expected = fmt.Sprintf(`relation "t" (%d): index "sec" has duplicates in KeySuffixColumnIDs: [2 2 2 2]`, mut.GetID()) + expected := fmt.Sprintf(`relation "t" (%d): index "sec" has duplicates in KeySuffixColumnIDs: [2 2 2 2]`, mut.GetID()) require.EqualError(t, validate.Self(clusterversion.TestingClusterVersion, mut), expected) } diff --git a/pkg/sql/index_mutation_test.go b/pkg/sql/index_mutation_test.go index 195f319c148e..f85201c8c780 100644 --- a/pkg/sql/index_mutation_test.go +++ b/pkg/sql/index_mutation_test.go @@ -112,7 +112,6 @@ func getKVTrace(t *testing.T, db *gosql.DB) string { allowedKVOpTypes := []string{ "CPut", "Put", - "InitPut", "Del", "DelRange", "ClearRange", diff --git a/pkg/sql/logictest/logic.go b/pkg/sql/logictest/logic.go index 1e78677b2458..0abd24180b1d 100644 --- a/pkg/sql/logictest/logic.go +++ b/pkg/sql/logictest/logic.go @@ -898,7 +898,6 @@ type logicQuery struct { var allowedKVOpTypes = []string{ "CPut", "Put", - "InitPut", "Del", "DelRange", "ClearRange", diff --git a/pkg/sql/row/inserter.go b/pkg/sql/row/inserter.go index 9a120cfd9f69..9eb59e3e1505 100644 --- a/pkg/sql/row/inserter.go +++ b/pkg/sql/row/inserter.go @@ -102,20 +102,20 @@ func insertDelFn(ctx context.Context, b putter, key *roachpb.Key, traceKV bool) b.Del(key) } -// insertPutFn is used by insertRow when conflicts should be ignored. +// insertInvertedPutFn is used by insertRow when conflicts should be ignored. func insertInvertedPutFn( ctx context.Context, b putter, key *roachpb.Key, value *roachpb.Value, traceKV bool, ) { if traceKV { - log.VEventfDepth(ctx, 1, 2, "InitPut %s -> %s", *key, value.PrettyPrint()) + log.VEventfDepth(ctx, 1, 2, "CPut %s -> %s", *key, value.PrettyPrint()) } - b.InitPut(key, value, false) + b.CPutAllowingIfNotExists(key, value, value.TagAndDataBytes()) } type putter interface { - CPut(key, value interface{}, expValue []byte) Put(key, value interface{}) - InitPut(key, value interface{}, failOnTombstones bool) + CPut(key, value interface{}, expValue []byte) + CPutAllowingIfNotExists(key, value interface{}, expValue []byte) Del(key ...interface{}) } diff --git a/pkg/sql/row/row_converter.go b/pkg/sql/row/row_converter.go index 4a990acee0ee..f4a13dcae454 100644 --- a/pkg/sql/row/row_converter.go +++ b/pkg/sql/row/row_converter.go @@ -35,11 +35,27 @@ import ( // KVInserter implements the putter interface. type KVInserter func(roachpb.KeyValue) +// Put method of the putter interface. +func (i KVInserter) Put(key, value interface{}) { + i(roachpb.KeyValue{ + Key: *key.(*roachpb.Key), + Value: *value.(*roachpb.Value), + }) +} + // CPut is not implmented. func (i KVInserter) CPut(key, value interface{}, expValue []byte) { panic("unimplemented") } +// CPutAllowingIfNotExists method of the putter interface. +func (i KVInserter) CPutAllowingIfNotExists(key, value interface{}, expValue []byte) { + i(roachpb.KeyValue{ + Key: *key.(*roachpb.Key), + Value: *value.(*roachpb.Value), + }) +} + // Del is not implemented. func (i KVInserter) Del(key ...interface{}) { // This is called when there are multiple column families to ensure that @@ -56,22 +72,6 @@ func (i KVInserter) Del(key ...interface{}) { // empty). } -// Put method of the putter interface. -func (i KVInserter) Put(key, value interface{}) { - i(roachpb.KeyValue{ - Key: *key.(*roachpb.Key), - Value: *value.(*roachpb.Value), - }) -} - -// InitPut method of the putter interface. -func (i KVInserter) InitPut(key, value interface{}, failOnTombstones bool) { - i(roachpb.KeyValue{ - Key: *key.(*roachpb.Key), - Value: *value.(*roachpb.Value), - }) -} - // GenerateInsertRow prepares a row tuple for insertion. It fills in default // expressions, verifies non-nullable columns, and checks column widths. // diff --git a/pkg/sql/show_trace.go b/pkg/sql/show_trace.go index eedab704d8b2..2e8d9d63733e 100644 --- a/pkg/sql/show_trace.go +++ b/pkg/sql/show_trace.go @@ -160,7 +160,6 @@ var kvMsgRegexp = regexp.MustCompile( "^fetched: ", "^CPut ", "^Put ", - "^InitPut ", "^DelRange ", "^ClearRange ", "^Del ", diff --git a/pkg/sql/sqlliveness/slstorage/slstorage.go b/pkg/sql/sqlliveness/slstorage/slstorage.go index 83e85529e01c..7df5ba4e1c6c 100644 --- a/pkg/sql/sqlliveness/slstorage/slstorage.go +++ b/pkg/sql/sqlliveness/slstorage/slstorage.go @@ -400,7 +400,7 @@ func (s *Storage) Insert( k := s.makeSessionKey(sid) v := encodeValue(expiration) ctx = multitenant.WithTenantCostControlExemption(ctx) - if err := s.db.InitPut(ctx, k, &v, true); err != nil { + if err := s.db.CPutAllowingIfNotExists(ctx, k, &v, v.TagAndDataBytes()); err != nil { s.metrics.WriteFailures.Inc(1) return errors.Wrapf(err, "could not insert session %s", sid) } diff --git a/pkg/upgrade/upgrades/BUILD.bazel b/pkg/upgrade/upgrades/BUILD.bazel index 744aa9f84a73..8053b3df0632 100644 --- a/pkg/upgrade/upgrades/BUILD.bazel +++ b/pkg/upgrade/upgrades/BUILD.bazel @@ -31,6 +31,7 @@ go_library( "//pkg/jobs/jobspb", "//pkg/keys", "//pkg/kv", + "//pkg/roachpb", "//pkg/security/username", "//pkg/sql", "//pkg/sql/catalog", diff --git a/pkg/upgrade/upgrades/descriptor_utils.go b/pkg/upgrade/upgrades/descriptor_utils.go index 77abcc951787..cb7cb001ed56 100644 --- a/pkg/upgrade/upgrades/descriptor_utils.go +++ b/pkg/upgrade/upgrades/descriptor_utils.go @@ -15,6 +15,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/kv" + "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/sql/catalog" "github.com/cockroachdb/cockroach/pkg/sql/catalog/catalogkeys" "github.com/cockroachdb/cockroach/pkg/sql/catalog/descidgen" @@ -69,7 +70,11 @@ func CreateSystemTableInTxn( b.CPut(tKey, desc.GetID(), nil) b.CPut(catalogkeys.MakeDescMetadataKey(codec, desc.GetID()), desc.DescriptorProto(), nil) if desc.IsSequence() { - b.InitPut(codec.SequenceKey(uint32(desc.GetID())), desc.GetSequenceOpts().Start, false /* failOnTombstones */) + // TODO DURING REVIEW: @ajwerner, why was this an InitPut instead of a + // CPut(expect=nil)? This was added in 2548cb7. + var v roachpb.Value + v.SetInt(desc.GetSequenceOpts().Start) + b.CPutAllowingIfNotExists(codec.SequenceKey(uint32(desc.GetID())), &v, v.TagAndDataBytes()) } if err := txn.Run(ctx, b); err != nil { return descpb.InvalidID, false, err