Skip to content

Commit

Permalink
storage: refactor mvcc write parameters
Browse files Browse the repository at this point in the history
This change introduces `MVCCWriteOptions`, a structure for bundling
parameters for `MVCCPut`, `MVCCDelete`, and their many variants, and
refactors usages of these functions across the code base in order to
move the existing function arguments into this structure. In addition to
allowing the code to eliminate specifying default values in many
callers, this enables the ability to pass new flags to write operations
such as the replay protection needed to address cockroachdb#103817.

Part of: cockroachdb#103817

Release note: None
  • Loading branch information
AlexTalks committed Sep 19, 2023
1 parent 79bc0fc commit eff2a5d
Show file tree
Hide file tree
Showing 81 changed files with 678 additions and 624 deletions.
2 changes: 1 addition & 1 deletion pkg/ccl/storageccl/engineccl/bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ func loadTestData(
timestamp := hlc.Timestamp{WallTime: minWallTime + rand.Int63n(int64(batchTimeSpan))}
value := roachpb.MakeValueFromBytes(randutil.RandBytes(rng, valueBytes))
value.InitChecksum(key)
if err := storage.MVCCPut(ctx, batch, nil, key, timestamp, hlc.ClockTimestamp{}, value, nil); err != nil {
if err := storage.MVCCPut(ctx, batch, key, timestamp, value, storage.MVCCWriteOptions{}); err != nil {
return nil, err
}
}
Expand Down
4 changes: 1 addition & 3 deletions pkg/ccl/storageccl/engineccl/encrypted_fs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -397,12 +397,10 @@ func TestPebbleEncryption2(t *testing.T) {
err = storage.MVCCPut(
context.Background(),
db,
nil, /* ms */
roachpb.Key(key),
hlc.Timestamp{},
hlc.ClockTimestamp{},
roachpb.MakeValueFromBytes([]byte(val)),
nil, /* txn */
storage.MVCCWriteOptions{},
)
require.NoError(t, err)
require.NoError(t, db.Flush())
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvclient/kvcoord/dist_sender_server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ func TestRangeLookupWithOpenTransaction(t *testing.T) {
key := testutils.MakeKey(keys.Meta1Prefix, roachpb.KeyMax)
now := s.Clock().Now()
txn := roachpb.MakeTransaction("txn", roachpb.Key("foobar"), 0, now, 0, int32(s.SQLInstanceID()))
if err := storage.MVCCPutProto(context.Background(), s.Engines()[0], nil, key, now, hlc.ClockTimestamp{}, &txn, &roachpb.RangeDescriptor{}); err != nil {
if err := storage.MVCCPutProto(context.Background(), s.Engines()[0], key, now, &roachpb.RangeDescriptor{}, storage.MVCCWriteOptions{Txn: &txn}); err != nil {
t.Fatal(err)
}

Expand Down
8 changes: 4 additions & 4 deletions pkg/kv/kvserver/abortspan/abortspan.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ func (sc *AbortSpan) Del(
ctx context.Context, reader storage.ReadWriter, ms *enginepb.MVCCStats, txnID uuid.UUID,
) error {
key := keys.AbortSpanKey(sc.rangeID, txnID)
_, err := storage.MVCCDelete(ctx, reader, ms, key, hlc.Timestamp{}, hlc.ClockTimestamp{}, nil /* txn */)
_, err := storage.MVCCDelete(ctx, reader, key, hlc.Timestamp{}, storage.MVCCWriteOptions{Stats: ms})
return err
}

Expand All @@ -134,7 +134,7 @@ func (sc *AbortSpan) Put(
) error {
log.VEventf(ctx, 2, "writing abort span entry for %s", txnID.Short())
key := keys.AbortSpanKey(sc.rangeID, txnID)
return storage.MVCCPutProto(ctx, readWriter, ms, key, hlc.Timestamp{}, hlc.ClockTimestamp{}, nil /* txn */, entry)
return storage.MVCCPutProto(ctx, readWriter, key, hlc.Timestamp{}, entry, storage.MVCCWriteOptions{Stats: ms})
}

// CopyTo copies the abort span entries to the abort span for the range
Expand Down Expand Up @@ -178,9 +178,9 @@ func (sc *AbortSpan) CopyTo(
if err != nil {
return err
}
return storage.MVCCPutProto(ctx, w, ms,
return storage.MVCCPutProto(ctx, w,
keys.AbortSpanKey(newRangeID, txnID),
hlc.Timestamp{}, hlc.ClockTimestamp{}, nil, &entry,
hlc.Timestamp{}, &entry, storage.MVCCWriteOptions{Stats: ms},
)
}); err != nil {
return errors.Wrap(err, "AbortSpan.CopyTo")
Expand Down
4 changes: 1 addition & 3 deletions pkg/kv/kvserver/batch_spanset_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -545,12 +545,10 @@ func TestSpanSetMVCCResolveWriteIntentRange(t *testing.T) {
if err := storage.MVCCPut(
ctx,
eng,
nil, // ms
roachpb.Key("b"),
hlc.Timestamp{WallTime: 10}, // irrelevant
hlc.ClockTimestamp{}, // irrelevant
value,
nil, // txn
storage.MVCCWriteOptions{}, // irrelevant
); err != nil {
t.Fatal(err)
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/batcheval/cmd_add_sstable_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1142,7 +1142,7 @@ func TestEvalAddSSTable(t *testing.T) {
kv.Key.Timestamp.WallTime *= 1e9
v, err := storage.DecodeMVCCValue(kv.Value)
require.NoError(t, err)
require.NoError(t, storage.MVCCPut(ctx, b, nil, kv.Key.Key, kv.Key.Timestamp, hlc.ClockTimestamp{}, v.Value, txn))
require.NoError(t, storage.MVCCPut(ctx, b, kv.Key.Key, kv.Key.Timestamp, v.Value, storage.MVCCWriteOptions{Txn: txn}))
case storage.MVCCRangeKeyValue:
v, err := storage.DecodeMVCCValue(kv.Value)
require.NoError(t, err)
Expand Down
4 changes: 2 additions & 2 deletions pkg/kv/kvserver/batcheval/cmd_clear_range_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,8 +151,8 @@ func TestCmdClearRange(t *testing.T) {
// Write some random point keys within the cleared span, above the range tombstones.
for i := 0; i < tc.keyCount; i++ {
key := roachpb.Key(fmt.Sprintf("%04d", i))
require.NoError(t, storage.MVCCPut(ctx, eng, nil, key,
hlc.Timestamp{WallTime: int64(4+i%2) * 1e9}, hlc.ClockTimestamp{}, value, nil))
require.NoError(t, storage.MVCCPut(ctx, eng, key,
hlc.Timestamp{WallTime: int64(4+i%2) * 1e9}, value, storage.MVCCWriteOptions{}))
}

// Calculate the range stats.
Expand Down
11 changes: 9 additions & 2 deletions pkg/kv/kvserver/batcheval/cmd_conditional_put.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,13 +55,20 @@ func ConditionalPut(
}

handleMissing := storage.CPutMissingBehavior(args.AllowIfDoesNotExist)

opts := storage.MVCCWriteOptions{
Txn: h.Txn,
LocalTimestamp: cArgs.Now,
Stats: cArgs.Stats,
}

var err error
if args.Blind {
err = storage.MVCCBlindConditionalPut(
ctx, readWriter, cArgs.Stats, args.Key, ts, cArgs.Now, args.Value, args.ExpBytes, handleMissing, h.Txn)
ctx, readWriter, args.Key, ts, args.Value, args.ExpBytes, handleMissing, opts)
} else {
err = storage.MVCCConditionalPut(
ctx, readWriter, cArgs.Stats, args.Key, ts, cArgs.Now, args.Value, args.ExpBytes, handleMissing, h.Txn)
ctx, readWriter, args.Key, ts, args.Value, args.ExpBytes, handleMissing, opts)
}
// NB: even if MVCC returns an error, it may still have written an intent
// into the batch. This allows callers to consume errors like WriteTooOld
Expand Down
8 changes: 7 additions & 1 deletion pkg/kv/kvserver/batcheval/cmd_delete.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,15 @@ func Delete(
h := cArgs.Header
reply := resp.(*kvpb.DeleteResponse)

opts := storage.MVCCWriteOptions{
Txn: h.Txn,
LocalTimestamp: cArgs.Now,
Stats: cArgs.Stats,
}

var err error
reply.FoundKey, err = storage.MVCCDelete(
ctx, readWriter, cArgs.Stats, args.Key, h.Timestamp, cArgs.Now, h.Txn,
ctx, readWriter, args.Key, h.Timestamp, opts,
)

// If requested, replace point tombstones with range tombstones.
Expand Down
6 changes: 4 additions & 2 deletions pkg/kv/kvserver/batcheval/cmd_delete_range.go
Original file line number Diff line number Diff line change
Expand Up @@ -223,8 +223,10 @@ func DeleteRange(
// can update the Result's AcquiredLocks field.
returnKeys := args.ReturnKeys || h.Txn != nil
deleted, resumeSpan, num, err := storage.MVCCDeleteRange(
ctx, readWriter, cArgs.Stats, args.Key, args.EndKey,
h.MaxSpanRequestKeys, timestamp, cArgs.Now, h.Txn, returnKeys)
ctx, readWriter, args.Key, args.EndKey,
h.MaxSpanRequestKeys, timestamp,
storage.MVCCWriteOptions{Txn: h.Txn, LocalTimestamp: cArgs.Now, Stats: cArgs.Stats},
returnKeys)
if err == nil && args.ReturnKeys {
reply.Keys = deleted
}
Expand Down
10 changes: 5 additions & 5 deletions pkg/kv/kvserver/batcheval/cmd_delete_range_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,12 +58,12 @@ func TestDeleteRangeTombstone(t *testing.T) {
var localTS hlc.ClockTimestamp

txn := roachpb.MakeTransaction("test", nil /* baseKey */, roachpb.NormalUserPriority, hlc.Timestamp{WallTime: 5e9}, 0, 0)
require.NoError(t, storage.MVCCPut(ctx, rw, nil, roachpb.Key("b"), hlc.Timestamp{WallTime: 2e9}, localTS, roachpb.MakeValueFromString("b2"), nil))
require.NoError(t, storage.MVCCPut(ctx, rw, nil, roachpb.Key("c"), hlc.Timestamp{WallTime: 4e9}, localTS, roachpb.MakeValueFromString("c4"), nil))
require.NoError(t, storage.MVCCPut(ctx, rw, nil, roachpb.Key("d"), hlc.Timestamp{WallTime: 2e9}, localTS, roachpb.MakeValueFromString("d2"), nil))
_, err := storage.MVCCDelete(ctx, rw, nil, roachpb.Key("d"), hlc.Timestamp{WallTime: 3e9}, localTS, nil)
require.NoError(t, storage.MVCCPut(ctx, rw, roachpb.Key("b"), hlc.Timestamp{WallTime: 2e9}, roachpb.MakeValueFromString("b2"), storage.MVCCWriteOptions{LocalTimestamp: localTS}))
require.NoError(t, storage.MVCCPut(ctx, rw, roachpb.Key("c"), hlc.Timestamp{WallTime: 4e9}, roachpb.MakeValueFromString("c4"), storage.MVCCWriteOptions{LocalTimestamp: localTS}))
require.NoError(t, storage.MVCCPut(ctx, rw, roachpb.Key("d"), hlc.Timestamp{WallTime: 2e9}, roachpb.MakeValueFromString("d2"), storage.MVCCWriteOptions{LocalTimestamp: localTS}))
_, err := storage.MVCCDelete(ctx, rw, roachpb.Key("d"), hlc.Timestamp{WallTime: 3e9}, storage.MVCCWriteOptions{LocalTimestamp: localTS})
require.NoError(t, err)
require.NoError(t, storage.MVCCPut(ctx, rw, nil, roachpb.Key("i"), hlc.Timestamp{WallTime: 5e9}, localTS, roachpb.MakeValueFromString("i5"), &txn))
require.NoError(t, storage.MVCCPut(ctx, rw, roachpb.Key("i"), hlc.Timestamp{WallTime: 5e9}, roachpb.MakeValueFromString("i5"), storage.MVCCWriteOptions{Txn: &txn, LocalTimestamp: localTS}))
require.NoError(t, storage.MVCCDeleteRangeUsingTombstone(ctx, rw, nil, roachpb.Key("f"), roachpb.Key("h"), hlc.Timestamp{WallTime: 3e9}, localTS, nil, nil, false, 0, nil))
require.NoError(t, storage.MVCCDeleteRangeUsingTombstone(ctx, rw, nil, roachpb.Key("Z"), roachpb.Key("a"), hlc.Timestamp{WallTime: 100e9}, localTS, nil, nil, false, 0, nil))
require.NoError(t, storage.MVCCDeleteRangeUsingTombstone(ctx, rw, nil, roachpb.Key("z"), roachpb.Key("|"), hlc.Timestamp{WallTime: 100e9}, localTS, nil, nil, false, 0, nil))
Expand Down
10 changes: 6 additions & 4 deletions pkg/kv/kvserver/batcheval/cmd_end_transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -690,7 +690,8 @@ func updateStagingTxn(
txn.LockSpans = args.LockSpans
txn.InFlightWrites = args.InFlightWrites
txnRecord := txn.AsRecord()
return storage.MVCCPutProto(ctx, readWriter, ms, key, hlc.Timestamp{}, hlc.ClockTimestamp{}, nil, &txnRecord)

return storage.MVCCPutProto(ctx, readWriter, key, hlc.Timestamp{}, &txnRecord, storage.MVCCWriteOptions{Stats: ms})
}

// updateFinalizedTxn persists the COMMITTED or ABORTED transaction record with
Expand All @@ -707,6 +708,7 @@ func updateFinalizedTxn(
recordAlreadyExisted bool,
externalLocks []roachpb.Span,
) error {
opts := storage.MVCCWriteOptions{Stats: ms}
if txnAutoGC && len(externalLocks) == 0 {
if log.V(2) {
log.Infof(ctx, "auto-gc'ed %s (%d locks)", txn.Short(), len(args.LockSpans))
Expand All @@ -717,13 +719,13 @@ func updateFinalizedTxn(
// BatchRequest writes.
return nil
}
_, err := storage.MVCCDelete(ctx, readWriter, ms, key, hlc.Timestamp{}, hlc.ClockTimestamp{}, nil)
_, err := storage.MVCCDelete(ctx, readWriter, key, hlc.Timestamp{}, opts)
return err
}
txn.LockSpans = externalLocks
txn.InFlightWrites = nil
txnRecord := txn.AsRecord()
return storage.MVCCPutProto(ctx, readWriter, ms, key, hlc.Timestamp{}, hlc.ClockTimestamp{}, nil, &txnRecord)
return storage.MVCCPutProto(ctx, readWriter, key, hlc.Timestamp{}, &txnRecord, opts)
}

// RunCommitTrigger runs the commit trigger from an end transaction request.
Expand Down Expand Up @@ -1080,7 +1082,7 @@ func splitTriggerHelper(
if err != nil {
return enginepb.MVCCStats{}, result.Result{}, errors.Wrap(err, "unable to fetch last replica GC timestamp")
}
if err := storage.MVCCPutProto(ctx, batch, nil, keys.RangeLastReplicaGCTimestampKey(split.RightDesc.RangeID), hlc.Timestamp{}, hlc.ClockTimestamp{}, nil, &replicaGCTS); err != nil {
if err := storage.MVCCPutProto(ctx, batch, keys.RangeLastReplicaGCTimestampKey(split.RightDesc.RangeID), hlc.Timestamp{}, &replicaGCTS, storage.MVCCWriteOptions{}); err != nil {
return enginepb.MVCCStats{}, result.Result{}, errors.Wrap(err, "unable to copy last replica GC timestamp")
}

Expand Down
10 changes: 5 additions & 5 deletions pkg/kv/kvserver/batcheval/cmd_end_transaction_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1011,7 +1011,7 @@ func TestEndTxnUpdatesTransactionRecord(t *testing.T) {
// Write the existing transaction record, if necessary.
txnKey := keys.TransactionKey(txn.Key, txn.ID)
if c.existingTxn != nil {
if err := storage.MVCCPutProto(ctx, batch, nil, txnKey, hlc.Timestamp{}, hlc.ClockTimestamp{}, nil, c.existingTxn); err != nil {
if err := storage.MVCCPutProto(ctx, batch, txnKey, hlc.Timestamp{}, c.existingTxn, storage.MVCCWriteOptions{}); err != nil {
t.Fatal(err)
}
}
Expand Down Expand Up @@ -1117,13 +1117,13 @@ func TestPartialRollbackOnEndTransaction(t *testing.T) {
// Write a first value at key.
v.SetString("a")
txn.Sequence = 1
if err := storage.MVCCPut(ctx, batch, nil, k, ts, hlc.ClockTimestamp{}, v, &txn); err != nil {
if err := storage.MVCCPut(ctx, batch, k, ts, v, storage.MVCCWriteOptions{Txn: &txn}); err != nil {
t.Fatal(err)
}
// Write another value.
v.SetString("b")
txn.Sequence = 2
if err := storage.MVCCPut(ctx, batch, nil, k, ts, hlc.ClockTimestamp{}, v, &txn); err != nil {
if err := storage.MVCCPut(ctx, batch, k, ts, v, storage.MVCCWriteOptions{Txn: &txn}); err != nil {
t.Fatal(err)
}

Expand All @@ -1136,7 +1136,7 @@ func TestPartialRollbackOnEndTransaction(t *testing.T) {
txnKey := keys.TransactionKey(txn.Key, txn.ID)
if storeTxnBeforeEndTxn {
txnRec := txn.AsRecord()
if err := storage.MVCCPutProto(ctx, batch, nil, txnKey, hlc.Timestamp{}, hlc.ClockTimestamp{}, nil, &txnRec); err != nil {
if err := storage.MVCCPutProto(ctx, batch, txnKey, hlc.Timestamp{}, &txnRec, storage.MVCCWriteOptions{}); err != nil {
t.Fatal(err)
}
}
Expand Down Expand Up @@ -1562,7 +1562,7 @@ func TestResolveLocalLocks(t *testing.T) {
txn.Status = roachpb.COMMITTED

for i := 0; i < numKeys; i++ {
err := storage.MVCCPut(ctx, batch, nil, intToKey(i), ts, hlc.ClockTimestamp{}, roachpb.MakeValueFromString("a"), &txn)
err := storage.MVCCPut(ctx, batch, intToKey(i), ts, roachpb.MakeValueFromString("a"), storage.MVCCWriteOptions{Txn: &txn})
require.NoError(t, err)
}
resolvedLocks, externalLocks, err := resolveLocalLocksWithPagination(
Expand Down
4 changes: 2 additions & 2 deletions pkg/kv/kvserver/batcheval/cmd_export_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -808,7 +808,7 @@ func TestRandomKeyAndTimestampExport(t *testing.T) {
valueSize := randutil.RandIntInRange(rnd, averageValueSize-100, averageValueSize+100)
value := roachpb.MakeValueFromBytes(randutil.RandBytes(rnd, valueSize))
value.InitChecksum(key)
if err := storage.MVCCPut(ctx, batch, nil, key, ts, hlc.ClockTimestamp{}, value, nil); err != nil {
if err := storage.MVCCPut(ctx, batch, key, ts, value, storage.MVCCWriteOptions{}); err != nil {
t.Fatal(err)
}

Expand All @@ -819,7 +819,7 @@ func TestRandomKeyAndTimestampExport(t *testing.T) {
ts = hlc.Timestamp{WallTime: int64(curWallTime), Logical: int32(curLogical)}
value = roachpb.MakeValueFromBytes(randutil.RandBytes(rnd, 200))
value.InitChecksum(key)
if err := storage.MVCCPut(ctx, batch, nil, key, ts, hlc.ClockTimestamp{}, value, nil); err != nil {
if err := storage.MVCCPut(ctx, batch, key, ts, value, storage.MVCCWriteOptions{}); err != nil {
t.Fatal(err)
}
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/batcheval/cmd_heartbeat_txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ func HeartbeatTxn(
// is up for debate.
txn.LastHeartbeat.Forward(args.Now)
txnRecord := txn.AsRecord()
if err := storage.MVCCPutProto(ctx, readWriter, cArgs.Stats, key, hlc.Timestamp{}, hlc.ClockTimestamp{}, nil, &txnRecord); err != nil {
if err := storage.MVCCPutProto(ctx, readWriter, key, hlc.Timestamp{}, &txnRecord, storage.MVCCWriteOptions{Stats: cArgs.Stats}); err != nil {
return result.Result{}, err
}
}
Expand Down
8 changes: 7 additions & 1 deletion pkg/kv/kvserver/batcheval/cmd_increment.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,14 @@ func Increment(
h := cArgs.Header
reply := resp.(*kvpb.IncrementResponse)

opts := storage.MVCCWriteOptions{
Txn: h.Txn,
LocalTimestamp: cArgs.Now,
Stats: cArgs.Stats,
}

newVal, err := storage.MVCCIncrement(
ctx, readWriter, cArgs.Stats, args.Key, h.Timestamp, cArgs.Now, h.Txn, args.Increment)
ctx, readWriter, args.Key, h.Timestamp, opts, args.Increment)
reply.NewValue = newVal
// NB: even if MVCC returns an error, it may still have written an intent
// into the batch. This allows callers to consume errors like WriteTooOld
Expand Down
10 changes: 8 additions & 2 deletions pkg/kv/kvserver/batcheval/cmd_init_put.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,13 +36,19 @@ func InitPut(
args.FailOnTombstones = false
}

opts := storage.MVCCWriteOptions{
Txn: h.Txn,
LocalTimestamp: cArgs.Now,
Stats: cArgs.Stats,
}

var err error
if args.Blind {
err = storage.MVCCBlindInitPut(
ctx, readWriter, cArgs.Stats, args.Key, h.Timestamp, cArgs.Now, args.Value, args.FailOnTombstones, h.Txn)
ctx, readWriter, args.Key, h.Timestamp, args.Value, args.FailOnTombstones, opts)
} else {
err = storage.MVCCInitPut(
ctx, readWriter, cArgs.Stats, args.Key, h.Timestamp, cArgs.Now, args.Value, args.FailOnTombstones, h.Txn)
ctx, readWriter, args.Key, h.Timestamp, args.Value, args.FailOnTombstones, opts)
}
// NB: even if MVCC returns an error, it may still have written an intent
// into the batch. This allows callers to consume errors like WriteTooOld
Expand Down
4 changes: 2 additions & 2 deletions pkg/kv/kvserver/batcheval/cmd_push_txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -312,7 +312,7 @@ func PushTxn(
// in the timestamp cache.
if ok {
txnRecord := reply.PusheeTxn.AsRecord()
if err := storage.MVCCPutProto(ctx, readWriter, cArgs.Stats, key, hlc.Timestamp{}, hlc.ClockTimestamp{}, nil, &txnRecord); err != nil {
if err := storage.MVCCPutProto(ctx, readWriter, key, hlc.Timestamp{}, &txnRecord, storage.MVCCWriteOptions{Stats: cArgs.Stats}); err != nil {
return result.Result{}, err
}
}
Expand All @@ -333,7 +333,7 @@ func PushTxn(
// TODO(nvanbenschoten): remove this logic in v23.2.
if ok && !cArgs.EvalCtx.ClusterSettings().Version.IsActive(ctx, clusterversion.V23_1) {
txnRecord := reply.PusheeTxn.AsRecord()
if err := storage.MVCCPutProto(ctx, readWriter, cArgs.Stats, key, hlc.Timestamp{}, hlc.ClockTimestamp{}, nil, &txnRecord); err != nil {
if err := storage.MVCCPutProto(ctx, readWriter, key, hlc.Timestamp{}, &txnRecord, storage.MVCCWriteOptions{Stats: cArgs.Stats}); err != nil {
return result.Result{}, err
}
}
Expand Down
12 changes: 9 additions & 3 deletions pkg/kv/kvserver/batcheval/cmd_put.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,17 +46,23 @@ func Put(
) (result.Result, error) {
args := cArgs.Args.(*kvpb.PutRequest)
h := cArgs.Header
ms := cArgs.Stats

var ts hlc.Timestamp
if !args.Inline {
ts = h.Timestamp
}

opts := storage.MVCCWriteOptions{
Txn: h.Txn,
LocalTimestamp: cArgs.Now,
Stats: cArgs.Stats,
}

var err error
if args.Blind {
err = storage.MVCCBlindPut(ctx, readWriter, ms, args.Key, ts, cArgs.Now, args.Value, h.Txn)
err = storage.MVCCBlindPut(ctx, readWriter, args.Key, ts, args.Value, opts)
} else {
err = storage.MVCCPut(ctx, readWriter, ms, args.Key, ts, cArgs.Now, args.Value, h.Txn)
err = storage.MVCCPut(ctx, readWriter, args.Key, ts, args.Value, opts)
}
// NB: even if MVCC returns an error, it may still have written an intent
// into the batch. This allows callers to consume errors like WriteTooOld
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/batcheval/cmd_query_intent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ func TestQueryIntent(t *testing.T) {

writeIntent := func(k roachpb.Key, ts int64) roachpb.Transaction {
txn := roachpb.MakeTransaction("test", k, 0, makeTS(ts), 0, 1)
_, err := storage.MVCCDelete(ctx, db, nil, k, makeTS(ts), hlc.ClockTimestamp{}, &txn)
_, err := storage.MVCCDelete(ctx, db, k, makeTS(ts), storage.MVCCWriteOptions{Txn: &txn})
require.NoError(t, err)
return txn
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,16 +44,16 @@ func TestQueryResolvedTimestamp(t *testing.T) {
return hlc.Timestamp{WallTime: ts}
}
writeValue := func(k string, ts int64) {
_, err := storage.MVCCDelete(ctx, db, nil, roachpb.Key(k), makeTS(ts), hlc.ClockTimestamp{}, nil)
_, err := storage.MVCCDelete(ctx, db, roachpb.Key(k), makeTS(ts), storage.MVCCWriteOptions{})
require.NoError(t, err)
}
writeIntent := func(k string, ts int64) {
txn := roachpb.MakeTransaction("test", roachpb.Key(k), 0, makeTS(ts), 0, 1)
_, err := storage.MVCCDelete(ctx, db, nil, roachpb.Key(k), makeTS(ts), hlc.ClockTimestamp{}, &txn)
_, err := storage.MVCCDelete(ctx, db, roachpb.Key(k), makeTS(ts), storage.MVCCWriteOptions{Txn: &txn})
require.NoError(t, err)
}
writeInline := func(k string) {
_, err := storage.MVCCDelete(ctx, db, nil, roachpb.Key(k), hlc.Timestamp{}, hlc.ClockTimestamp{}, nil)
_, err := storage.MVCCDelete(ctx, db, roachpb.Key(k), hlc.Timestamp{}, storage.MVCCWriteOptions{})
require.NoError(t, err)
}

Expand Down
Loading

0 comments on commit eff2a5d

Please sign in to comment.