Skip to content

Commit

Permalink
Merge pull request cockroachdb#111870 from AlexTalks/fix_backport_107…
Browse files Browse the repository at this point in the history
…680_refactor

release-23.1: storage: refactor mvcc write parameters
  • Loading branch information
AlexTalks authored Oct 6, 2023
2 parents 5dac51a + 91d85f7 commit 580cd51
Show file tree
Hide file tree
Showing 81 changed files with 678 additions and 624 deletions.
2 changes: 1 addition & 1 deletion pkg/ccl/storageccl/engineccl/bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ func loadTestData(
timestamp := hlc.Timestamp{WallTime: minWallTime + rand.Int63n(int64(batchTimeSpan))}
value := roachpb.MakeValueFromBytes(randutil.RandBytes(rng, valueBytes))
value.InitChecksum(key)
if err := storage.MVCCPut(ctx, batch, nil, key, timestamp, hlc.ClockTimestamp{}, value, nil); err != nil {
if err := storage.MVCCPut(ctx, batch, key, timestamp, value, storage.MVCCWriteOptions{}); err != nil {
return nil, err
}
}
Expand Down
4 changes: 1 addition & 3 deletions pkg/ccl/storageccl/engineccl/encrypted_fs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -397,12 +397,10 @@ func TestPebbleEncryption2(t *testing.T) {
err = storage.MVCCPut(
context.Background(),
db,
nil, /* ms */
roachpb.Key(key),
hlc.Timestamp{},
hlc.ClockTimestamp{},
roachpb.MakeValueFromBytes([]byte(val)),
nil, /* txn */
storage.MVCCWriteOptions{},
)
require.NoError(t, err)
require.NoError(t, db.Flush())
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvclient/kvcoord/dist_sender_server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ func TestRangeLookupWithOpenTransaction(t *testing.T) {
key := testutils.MakeKey(keys.Meta1Prefix, roachpb.KeyMax)
now := s.Clock().Now()
txn := roachpb.MakeTransaction("txn", roachpb.Key("foobar"), 0, now, 0, int32(s.SQLInstanceID()))
if err := storage.MVCCPutProto(context.Background(), s.Engines()[0], nil, key, now, hlc.ClockTimestamp{}, &txn, &roachpb.RangeDescriptor{}); err != nil {
if err := storage.MVCCPutProto(context.Background(), s.Engines()[0], key, now, &roachpb.RangeDescriptor{}, storage.MVCCWriteOptions{Txn: &txn}); err != nil {
t.Fatal(err)
}

Expand Down
8 changes: 4 additions & 4 deletions pkg/kv/kvserver/abortspan/abortspan.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ func (sc *AbortSpan) Del(
ctx context.Context, reader storage.ReadWriter, ms *enginepb.MVCCStats, txnID uuid.UUID,
) error {
key := keys.AbortSpanKey(sc.rangeID, txnID)
_, err := storage.MVCCDelete(ctx, reader, ms, key, hlc.Timestamp{}, hlc.ClockTimestamp{}, nil /* txn */)
_, err := storage.MVCCDelete(ctx, reader, key, hlc.Timestamp{}, storage.MVCCWriteOptions{Stats: ms})
return err
}

Expand All @@ -134,7 +134,7 @@ func (sc *AbortSpan) Put(
) error {
log.VEventf(ctx, 2, "writing abort span entry for %s", txnID.Short())
key := keys.AbortSpanKey(sc.rangeID, txnID)
return storage.MVCCPutProto(ctx, readWriter, ms, key, hlc.Timestamp{}, hlc.ClockTimestamp{}, nil /* txn */, entry)
return storage.MVCCPutProto(ctx, readWriter, key, hlc.Timestamp{}, entry, storage.MVCCWriteOptions{Stats: ms})
}

// CopyTo copies the abort span entries to the abort span for the range
Expand Down Expand Up @@ -178,9 +178,9 @@ func (sc *AbortSpan) CopyTo(
if err != nil {
return err
}
return storage.MVCCPutProto(ctx, w, ms,
return storage.MVCCPutProto(ctx, w,
keys.AbortSpanKey(newRangeID, txnID),
hlc.Timestamp{}, hlc.ClockTimestamp{}, nil, &entry,
hlc.Timestamp{}, &entry, storage.MVCCWriteOptions{Stats: ms},
)
}); err != nil {
return errors.Wrap(err, "AbortSpan.CopyTo")
Expand Down
4 changes: 1 addition & 3 deletions pkg/kv/kvserver/batch_spanset_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -545,12 +545,10 @@ func TestSpanSetMVCCResolveWriteIntentRange(t *testing.T) {
if err := storage.MVCCPut(
ctx,
eng,
nil, // ms
roachpb.Key("b"),
hlc.Timestamp{WallTime: 10}, // irrelevant
hlc.ClockTimestamp{}, // irrelevant
value,
nil, // txn
storage.MVCCWriteOptions{}, // irrelevant
); err != nil {
t.Fatal(err)
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/batcheval/cmd_add_sstable_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1142,7 +1142,7 @@ func TestEvalAddSSTable(t *testing.T) {
kv.Key.Timestamp.WallTime *= 1e9
v, err := storage.DecodeMVCCValue(kv.Value)
require.NoError(t, err)
require.NoError(t, storage.MVCCPut(ctx, b, nil, kv.Key.Key, kv.Key.Timestamp, hlc.ClockTimestamp{}, v.Value, txn))
require.NoError(t, storage.MVCCPut(ctx, b, kv.Key.Key, kv.Key.Timestamp, v.Value, storage.MVCCWriteOptions{Txn: txn}))
case storage.MVCCRangeKeyValue:
v, err := storage.DecodeMVCCValue(kv.Value)
require.NoError(t, err)
Expand Down
4 changes: 2 additions & 2 deletions pkg/kv/kvserver/batcheval/cmd_clear_range_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,8 +151,8 @@ func TestCmdClearRange(t *testing.T) {
// Write some random point keys within the cleared span, above the range tombstones.
for i := 0; i < tc.keyCount; i++ {
key := roachpb.Key(fmt.Sprintf("%04d", i))
require.NoError(t, storage.MVCCPut(ctx, eng, nil, key,
hlc.Timestamp{WallTime: int64(4+i%2) * 1e9}, hlc.ClockTimestamp{}, value, nil))
require.NoError(t, storage.MVCCPut(ctx, eng, key,
hlc.Timestamp{WallTime: int64(4+i%2) * 1e9}, value, storage.MVCCWriteOptions{}))
}

// Calculate the range stats.
Expand Down
11 changes: 9 additions & 2 deletions pkg/kv/kvserver/batcheval/cmd_conditional_put.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,13 +55,20 @@ func ConditionalPut(
}

handleMissing := storage.CPutMissingBehavior(args.AllowIfDoesNotExist)

opts := storage.MVCCWriteOptions{
Txn: h.Txn,
LocalTimestamp: cArgs.Now,
Stats: cArgs.Stats,
}

var err error
if args.Blind {
err = storage.MVCCBlindConditionalPut(
ctx, readWriter, cArgs.Stats, args.Key, ts, cArgs.Now, args.Value, args.ExpBytes, handleMissing, h.Txn)
ctx, readWriter, args.Key, ts, args.Value, args.ExpBytes, handleMissing, opts)
} else {
err = storage.MVCCConditionalPut(
ctx, readWriter, cArgs.Stats, args.Key, ts, cArgs.Now, args.Value, args.ExpBytes, handleMissing, h.Txn)
ctx, readWriter, args.Key, ts, args.Value, args.ExpBytes, handleMissing, opts)
}
// NB: even if MVCC returns an error, it may still have written an intent
// into the batch. This allows callers to consume errors like WriteTooOld
Expand Down
8 changes: 7 additions & 1 deletion pkg/kv/kvserver/batcheval/cmd_delete.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,15 @@ func Delete(
h := cArgs.Header
reply := resp.(*kvpb.DeleteResponse)

opts := storage.MVCCWriteOptions{
Txn: h.Txn,
LocalTimestamp: cArgs.Now,
Stats: cArgs.Stats,
}

var err error
reply.FoundKey, err = storage.MVCCDelete(
ctx, readWriter, cArgs.Stats, args.Key, h.Timestamp, cArgs.Now, h.Txn,
ctx, readWriter, args.Key, h.Timestamp, opts,
)

// If requested, replace point tombstones with range tombstones.
Expand Down
6 changes: 4 additions & 2 deletions pkg/kv/kvserver/batcheval/cmd_delete_range.go
Original file line number Diff line number Diff line change
Expand Up @@ -223,8 +223,10 @@ func DeleteRange(
// can update the Result's AcquiredLocks field.
returnKeys := args.ReturnKeys || h.Txn != nil
deleted, resumeSpan, num, err := storage.MVCCDeleteRange(
ctx, readWriter, cArgs.Stats, args.Key, args.EndKey,
h.MaxSpanRequestKeys, timestamp, cArgs.Now, h.Txn, returnKeys)
ctx, readWriter, args.Key, args.EndKey,
h.MaxSpanRequestKeys, timestamp,
storage.MVCCWriteOptions{Txn: h.Txn, LocalTimestamp: cArgs.Now, Stats: cArgs.Stats},
returnKeys)
if err == nil && args.ReturnKeys {
reply.Keys = deleted
}
Expand Down
10 changes: 5 additions & 5 deletions pkg/kv/kvserver/batcheval/cmd_delete_range_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,12 +58,12 @@ func TestDeleteRangeTombstone(t *testing.T) {
var localTS hlc.ClockTimestamp

txn := roachpb.MakeTransaction("test", nil /* baseKey */, roachpb.NormalUserPriority, hlc.Timestamp{WallTime: 5e9}, 0, 0)
require.NoError(t, storage.MVCCPut(ctx, rw, nil, roachpb.Key("b"), hlc.Timestamp{WallTime: 2e9}, localTS, roachpb.MakeValueFromString("b2"), nil))
require.NoError(t, storage.MVCCPut(ctx, rw, nil, roachpb.Key("c"), hlc.Timestamp{WallTime: 4e9}, localTS, roachpb.MakeValueFromString("c4"), nil))
require.NoError(t, storage.MVCCPut(ctx, rw, nil, roachpb.Key("d"), hlc.Timestamp{WallTime: 2e9}, localTS, roachpb.MakeValueFromString("d2"), nil))
_, err := storage.MVCCDelete(ctx, rw, nil, roachpb.Key("d"), hlc.Timestamp{WallTime: 3e9}, localTS, nil)
require.NoError(t, storage.MVCCPut(ctx, rw, roachpb.Key("b"), hlc.Timestamp{WallTime: 2e9}, roachpb.MakeValueFromString("b2"), storage.MVCCWriteOptions{LocalTimestamp: localTS}))
require.NoError(t, storage.MVCCPut(ctx, rw, roachpb.Key("c"), hlc.Timestamp{WallTime: 4e9}, roachpb.MakeValueFromString("c4"), storage.MVCCWriteOptions{LocalTimestamp: localTS}))
require.NoError(t, storage.MVCCPut(ctx, rw, roachpb.Key("d"), hlc.Timestamp{WallTime: 2e9}, roachpb.MakeValueFromString("d2"), storage.MVCCWriteOptions{LocalTimestamp: localTS}))
_, err := storage.MVCCDelete(ctx, rw, roachpb.Key("d"), hlc.Timestamp{WallTime: 3e9}, storage.MVCCWriteOptions{LocalTimestamp: localTS})
require.NoError(t, err)
require.NoError(t, storage.MVCCPut(ctx, rw, nil, roachpb.Key("i"), hlc.Timestamp{WallTime: 5e9}, localTS, roachpb.MakeValueFromString("i5"), &txn))
require.NoError(t, storage.MVCCPut(ctx, rw, roachpb.Key("i"), hlc.Timestamp{WallTime: 5e9}, roachpb.MakeValueFromString("i5"), storage.MVCCWriteOptions{Txn: &txn, LocalTimestamp: localTS}))
require.NoError(t, storage.MVCCDeleteRangeUsingTombstone(ctx, rw, nil, roachpb.Key("f"), roachpb.Key("h"), hlc.Timestamp{WallTime: 3e9}, localTS, nil, nil, false, 0, nil))
require.NoError(t, storage.MVCCDeleteRangeUsingTombstone(ctx, rw, nil, roachpb.Key("Z"), roachpb.Key("a"), hlc.Timestamp{WallTime: 100e9}, localTS, nil, nil, false, 0, nil))
require.NoError(t, storage.MVCCDeleteRangeUsingTombstone(ctx, rw, nil, roachpb.Key("z"), roachpb.Key("|"), hlc.Timestamp{WallTime: 100e9}, localTS, nil, nil, false, 0, nil))
Expand Down
10 changes: 6 additions & 4 deletions pkg/kv/kvserver/batcheval/cmd_end_transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -690,7 +690,8 @@ func updateStagingTxn(
txn.LockSpans = args.LockSpans
txn.InFlightWrites = args.InFlightWrites
txnRecord := txn.AsRecord()
return storage.MVCCPutProto(ctx, readWriter, ms, key, hlc.Timestamp{}, hlc.ClockTimestamp{}, nil, &txnRecord)

return storage.MVCCPutProto(ctx, readWriter, key, hlc.Timestamp{}, &txnRecord, storage.MVCCWriteOptions{Stats: ms})
}

// updateFinalizedTxn persists the COMMITTED or ABORTED transaction record with
Expand All @@ -707,6 +708,7 @@ func updateFinalizedTxn(
recordAlreadyExisted bool,
externalLocks []roachpb.Span,
) error {
opts := storage.MVCCWriteOptions{Stats: ms}
if txnAutoGC && len(externalLocks) == 0 {
if log.V(2) {
log.Infof(ctx, "auto-gc'ed %s (%d locks)", txn.Short(), len(args.LockSpans))
Expand All @@ -717,13 +719,13 @@ func updateFinalizedTxn(
// BatchRequest writes.
return nil
}
_, err := storage.MVCCDelete(ctx, readWriter, ms, key, hlc.Timestamp{}, hlc.ClockTimestamp{}, nil)
_, err := storage.MVCCDelete(ctx, readWriter, key, hlc.Timestamp{}, opts)
return err
}
txn.LockSpans = externalLocks
txn.InFlightWrites = nil
txnRecord := txn.AsRecord()
return storage.MVCCPutProto(ctx, readWriter, ms, key, hlc.Timestamp{}, hlc.ClockTimestamp{}, nil, &txnRecord)
return storage.MVCCPutProto(ctx, readWriter, key, hlc.Timestamp{}, &txnRecord, opts)
}

// RunCommitTrigger runs the commit trigger from an end transaction request.
Expand Down Expand Up @@ -1080,7 +1082,7 @@ func splitTriggerHelper(
if err != nil {
return enginepb.MVCCStats{}, result.Result{}, errors.Wrap(err, "unable to fetch last replica GC timestamp")
}
if err := storage.MVCCPutProto(ctx, batch, nil, keys.RangeLastReplicaGCTimestampKey(split.RightDesc.RangeID), hlc.Timestamp{}, hlc.ClockTimestamp{}, nil, &replicaGCTS); err != nil {
if err := storage.MVCCPutProto(ctx, batch, keys.RangeLastReplicaGCTimestampKey(split.RightDesc.RangeID), hlc.Timestamp{}, &replicaGCTS, storage.MVCCWriteOptions{}); err != nil {
return enginepb.MVCCStats{}, result.Result{}, errors.Wrap(err, "unable to copy last replica GC timestamp")
}

Expand Down
10 changes: 5 additions & 5 deletions pkg/kv/kvserver/batcheval/cmd_end_transaction_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1011,7 +1011,7 @@ func TestEndTxnUpdatesTransactionRecord(t *testing.T) {
// Write the existing transaction record, if necessary.
txnKey := keys.TransactionKey(txn.Key, txn.ID)
if c.existingTxn != nil {
if err := storage.MVCCPutProto(ctx, batch, nil, txnKey, hlc.Timestamp{}, hlc.ClockTimestamp{}, nil, c.existingTxn); err != nil {
if err := storage.MVCCPutProto(ctx, batch, txnKey, hlc.Timestamp{}, c.existingTxn, storage.MVCCWriteOptions{}); err != nil {
t.Fatal(err)
}
}
Expand Down Expand Up @@ -1117,13 +1117,13 @@ func TestPartialRollbackOnEndTransaction(t *testing.T) {
// Write a first value at key.
v.SetString("a")
txn.Sequence = 1
if err := storage.MVCCPut(ctx, batch, nil, k, ts, hlc.ClockTimestamp{}, v, &txn); err != nil {
if err := storage.MVCCPut(ctx, batch, k, ts, v, storage.MVCCWriteOptions{Txn: &txn}); err != nil {
t.Fatal(err)
}
// Write another value.
v.SetString("b")
txn.Sequence = 2
if err := storage.MVCCPut(ctx, batch, nil, k, ts, hlc.ClockTimestamp{}, v, &txn); err != nil {
if err := storage.MVCCPut(ctx, batch, k, ts, v, storage.MVCCWriteOptions{Txn: &txn}); err != nil {
t.Fatal(err)
}

Expand All @@ -1136,7 +1136,7 @@ func TestPartialRollbackOnEndTransaction(t *testing.T) {
txnKey := keys.TransactionKey(txn.Key, txn.ID)
if storeTxnBeforeEndTxn {
txnRec := txn.AsRecord()
if err := storage.MVCCPutProto(ctx, batch, nil, txnKey, hlc.Timestamp{}, hlc.ClockTimestamp{}, nil, &txnRec); err != nil {
if err := storage.MVCCPutProto(ctx, batch, txnKey, hlc.Timestamp{}, &txnRec, storage.MVCCWriteOptions{}); err != nil {
t.Fatal(err)
}
}
Expand Down Expand Up @@ -1562,7 +1562,7 @@ func TestResolveLocalLocks(t *testing.T) {
txn.Status = roachpb.COMMITTED

for i := 0; i < numKeys; i++ {
err := storage.MVCCPut(ctx, batch, nil, intToKey(i), ts, hlc.ClockTimestamp{}, roachpb.MakeValueFromString("a"), &txn)
err := storage.MVCCPut(ctx, batch, intToKey(i), ts, roachpb.MakeValueFromString("a"), storage.MVCCWriteOptions{Txn: &txn})
require.NoError(t, err)
}
resolvedLocks, externalLocks, err := resolveLocalLocksWithPagination(
Expand Down
4 changes: 2 additions & 2 deletions pkg/kv/kvserver/batcheval/cmd_export_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -808,7 +808,7 @@ func TestRandomKeyAndTimestampExport(t *testing.T) {
valueSize := randutil.RandIntInRange(rnd, averageValueSize-100, averageValueSize+100)
value := roachpb.MakeValueFromBytes(randutil.RandBytes(rnd, valueSize))
value.InitChecksum(key)
if err := storage.MVCCPut(ctx, batch, nil, key, ts, hlc.ClockTimestamp{}, value, nil); err != nil {
if err := storage.MVCCPut(ctx, batch, key, ts, value, storage.MVCCWriteOptions{}); err != nil {
t.Fatal(err)
}

Expand All @@ -819,7 +819,7 @@ func TestRandomKeyAndTimestampExport(t *testing.T) {
ts = hlc.Timestamp{WallTime: int64(curWallTime), Logical: int32(curLogical)}
value = roachpb.MakeValueFromBytes(randutil.RandBytes(rnd, 200))
value.InitChecksum(key)
if err := storage.MVCCPut(ctx, batch, nil, key, ts, hlc.ClockTimestamp{}, value, nil); err != nil {
if err := storage.MVCCPut(ctx, batch, key, ts, value, storage.MVCCWriteOptions{}); err != nil {
t.Fatal(err)
}
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/batcheval/cmd_heartbeat_txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ func HeartbeatTxn(
// is up for debate.
txn.LastHeartbeat.Forward(args.Now)
txnRecord := txn.AsRecord()
if err := storage.MVCCPutProto(ctx, readWriter, cArgs.Stats, key, hlc.Timestamp{}, hlc.ClockTimestamp{}, nil, &txnRecord); err != nil {
if err := storage.MVCCPutProto(ctx, readWriter, key, hlc.Timestamp{}, &txnRecord, storage.MVCCWriteOptions{Stats: cArgs.Stats}); err != nil {
return result.Result{}, err
}
}
Expand Down
8 changes: 7 additions & 1 deletion pkg/kv/kvserver/batcheval/cmd_increment.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,14 @@ func Increment(
h := cArgs.Header
reply := resp.(*kvpb.IncrementResponse)

opts := storage.MVCCWriteOptions{
Txn: h.Txn,
LocalTimestamp: cArgs.Now,
Stats: cArgs.Stats,
}

newVal, err := storage.MVCCIncrement(
ctx, readWriter, cArgs.Stats, args.Key, h.Timestamp, cArgs.Now, h.Txn, args.Increment)
ctx, readWriter, args.Key, h.Timestamp, opts, args.Increment)
reply.NewValue = newVal
// NB: even if MVCC returns an error, it may still have written an intent
// into the batch. This allows callers to consume errors like WriteTooOld
Expand Down
10 changes: 8 additions & 2 deletions pkg/kv/kvserver/batcheval/cmd_init_put.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,13 +36,19 @@ func InitPut(
args.FailOnTombstones = false
}

opts := storage.MVCCWriteOptions{
Txn: h.Txn,
LocalTimestamp: cArgs.Now,
Stats: cArgs.Stats,
}

var err error
if args.Blind {
err = storage.MVCCBlindInitPut(
ctx, readWriter, cArgs.Stats, args.Key, h.Timestamp, cArgs.Now, args.Value, args.FailOnTombstones, h.Txn)
ctx, readWriter, args.Key, h.Timestamp, args.Value, args.FailOnTombstones, opts)
} else {
err = storage.MVCCInitPut(
ctx, readWriter, cArgs.Stats, args.Key, h.Timestamp, cArgs.Now, args.Value, args.FailOnTombstones, h.Txn)
ctx, readWriter, args.Key, h.Timestamp, args.Value, args.FailOnTombstones, opts)
}
// NB: even if MVCC returns an error, it may still have written an intent
// into the batch. This allows callers to consume errors like WriteTooOld
Expand Down
4 changes: 2 additions & 2 deletions pkg/kv/kvserver/batcheval/cmd_push_txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -312,7 +312,7 @@ func PushTxn(
// in the timestamp cache.
if ok {
txnRecord := reply.PusheeTxn.AsRecord()
if err := storage.MVCCPutProto(ctx, readWriter, cArgs.Stats, key, hlc.Timestamp{}, hlc.ClockTimestamp{}, nil, &txnRecord); err != nil {
if err := storage.MVCCPutProto(ctx, readWriter, key, hlc.Timestamp{}, &txnRecord, storage.MVCCWriteOptions{Stats: cArgs.Stats}); err != nil {
return result.Result{}, err
}
}
Expand All @@ -333,7 +333,7 @@ func PushTxn(
// TODO(nvanbenschoten): remove this logic in v23.2.
if ok && !cArgs.EvalCtx.ClusterSettings().Version.IsActive(ctx, clusterversion.V23_1) {
txnRecord := reply.PusheeTxn.AsRecord()
if err := storage.MVCCPutProto(ctx, readWriter, cArgs.Stats, key, hlc.Timestamp{}, hlc.ClockTimestamp{}, nil, &txnRecord); err != nil {
if err := storage.MVCCPutProto(ctx, readWriter, key, hlc.Timestamp{}, &txnRecord, storage.MVCCWriteOptions{Stats: cArgs.Stats}); err != nil {
return result.Result{}, err
}
}
Expand Down
12 changes: 9 additions & 3 deletions pkg/kv/kvserver/batcheval/cmd_put.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,17 +46,23 @@ func Put(
) (result.Result, error) {
args := cArgs.Args.(*kvpb.PutRequest)
h := cArgs.Header
ms := cArgs.Stats

var ts hlc.Timestamp
if !args.Inline {
ts = h.Timestamp
}

opts := storage.MVCCWriteOptions{
Txn: h.Txn,
LocalTimestamp: cArgs.Now,
Stats: cArgs.Stats,
}

var err error
if args.Blind {
err = storage.MVCCBlindPut(ctx, readWriter, ms, args.Key, ts, cArgs.Now, args.Value, h.Txn)
err = storage.MVCCBlindPut(ctx, readWriter, args.Key, ts, args.Value, opts)
} else {
err = storage.MVCCPut(ctx, readWriter, ms, args.Key, ts, cArgs.Now, args.Value, h.Txn)
err = storage.MVCCPut(ctx, readWriter, args.Key, ts, args.Value, opts)
}
// NB: even if MVCC returns an error, it may still have written an intent
// into the batch. This allows callers to consume errors like WriteTooOld
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/batcheval/cmd_query_intent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ func TestQueryIntent(t *testing.T) {

writeIntent := func(k roachpb.Key, ts int64) roachpb.Transaction {
txn := roachpb.MakeTransaction("test", k, 0, makeTS(ts), 0, 1)
_, err := storage.MVCCDelete(ctx, db, nil, k, makeTS(ts), hlc.ClockTimestamp{}, &txn)
_, err := storage.MVCCDelete(ctx, db, k, makeTS(ts), storage.MVCCWriteOptions{Txn: &txn})
require.NoError(t, err)
return txn
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,16 +44,16 @@ func TestQueryResolvedTimestamp(t *testing.T) {
return hlc.Timestamp{WallTime: ts}
}
writeValue := func(k string, ts int64) {
_, err := storage.MVCCDelete(ctx, db, nil, roachpb.Key(k), makeTS(ts), hlc.ClockTimestamp{}, nil)
_, err := storage.MVCCDelete(ctx, db, roachpb.Key(k), makeTS(ts), storage.MVCCWriteOptions{})
require.NoError(t, err)
}
writeIntent := func(k string, ts int64) {
txn := roachpb.MakeTransaction("test", roachpb.Key(k), 0, makeTS(ts), 0, 1)
_, err := storage.MVCCDelete(ctx, db, nil, roachpb.Key(k), makeTS(ts), hlc.ClockTimestamp{}, &txn)
_, err := storage.MVCCDelete(ctx, db, roachpb.Key(k), makeTS(ts), storage.MVCCWriteOptions{Txn: &txn})
require.NoError(t, err)
}
writeInline := func(k string) {
_, err := storage.MVCCDelete(ctx, db, nil, roachpb.Key(k), hlc.Timestamp{}, hlc.ClockTimestamp{}, nil)
_, err := storage.MVCCDelete(ctx, db, roachpb.Key(k), hlc.Timestamp{}, storage.MVCCWriteOptions{})
require.NoError(t, err)
}

Expand Down
Loading

0 comments on commit 580cd51

Please sign in to comment.