diff --git a/pkg/kv/kvpb/api.go b/pkg/kv/kvpb/api.go index 7a0aaeca3ee9..b10d9fa374df 100644 --- a/pkg/kv/kvpb/api.go +++ b/pkg/kv/kvpb/api.go @@ -2554,6 +2554,13 @@ func (writeOptions *WriteOptions) GetOriginID() uint32 { return writeOptions.OriginID } +func (writeOptions *WriteOptions) GetOriginTimestamp() hlc.Timestamp { + if writeOptions == nil { + return hlc.Timestamp{} + } + return writeOptions.OriginTimestamp +} + func (r *ConditionalPutRequest) Validate() error { if !r.OriginTimestamp.IsEmpty() { if r.AllowIfDoesNotExist { diff --git a/pkg/kv/kvpb/api.proto b/pkg/kv/kvpb/api.proto index 3814dfc9af5e..1caf71d0c7f7 100644 --- a/pkg/kv/kvpb/api.proto +++ b/pkg/kv/kvpb/api.proto @@ -3020,6 +3020,10 @@ message Header { message WriteOptions { uint32 origin_id = 1[(gogoproto.customname) = "OriginID"]; + // OriginTimestamp is bound to the MVCCValueHeader of written key in the + // batch. Note that a kv client cannot set this if they use CPut's origin + // timestamp arg. + util.hlc.Timestamp origin_timestamp = 2 [(gogoproto.nullable) = false]; } // BoundedStalenessHeader contains configuration values pertaining to bounded diff --git a/pkg/kv/kvserver/batcheval/cmd_conditional_put.go b/pkg/kv/kvserver/batcheval/cmd_conditional_put.go index 3fff9f82bb2f..3914e71d6424 100644 --- a/pkg/kv/kvserver/batcheval/cmd_conditional_put.go +++ b/pkg/kv/kvserver/batcheval/cmd_conditional_put.go @@ -22,6 +22,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/storage" "github.com/cockroachdb/cockroach/pkg/storage/fs" "github.com/cockroachdb/cockroach/pkg/util/hlc" + "github.com/cockroachdb/errors" ) func init() { @@ -62,6 +63,14 @@ func ConditionalPut( return result.Result{}, err } + originTimestampForValueHeader := h.WriteOptions.GetOriginTimestamp() + if args.OriginTimestamp.IsSet() { + originTimestampForValueHeader = args.OriginTimestamp + } + if args.OriginTimestamp.IsSet() && h.WriteOptions.GetOriginTimestamp().IsSet() { + return result.Result{}, errors.AssertionFailedf("OriginTimestamp cannot be passed via CPut arg and in request header") + } + opts := storage.ConditionalPutWriteOptions{ MVCCWriteOptions: storage.MVCCWriteOptions{ Txn: h.Txn, @@ -70,6 +79,7 @@ func ConditionalPut( ReplayWriteTimestampProtection: h.AmbiguousReplayProtection, OmitInRangefeeds: cArgs.OmitInRangefeeds, OriginID: h.WriteOptions.GetOriginID(), + OriginTimestamp: originTimestampForValueHeader, MaxLockConflicts: storage.MaxConflictsPerLockConflictError.Get(&cArgs.EvalCtx.ClusterSettings().SV), TargetLockConflictBytes: storage.TargetBytesPerLockConflictError.Get(&cArgs.EvalCtx.ClusterSettings().SV), Category: fs.BatchEvalReadCategory, diff --git a/pkg/kv/kvserver/batcheval/cmd_delete.go b/pkg/kv/kvserver/batcheval/cmd_delete.go index 9e4289b5d928..e8cb2b5c0947 100644 --- a/pkg/kv/kvserver/batcheval/cmd_delete.go +++ b/pkg/kv/kvserver/batcheval/cmd_delete.go @@ -40,6 +40,7 @@ func Delete( ReplayWriteTimestampProtection: h.AmbiguousReplayProtection, OmitInRangefeeds: cArgs.OmitInRangefeeds, OriginID: h.WriteOptions.GetOriginID(), + OriginTimestamp: h.WriteOptions.GetOriginTimestamp(), MaxLockConflicts: storage.MaxConflictsPerLockConflictError.Get(&cArgs.EvalCtx.ClusterSettings().SV), TargetLockConflictBytes: storage.TargetBytesPerLockConflictError.Get(&cArgs.EvalCtx.ClusterSettings().SV), Category: fs.BatchEvalReadCategory, diff --git a/pkg/kv/kvserver/batcheval/cmd_delete_range.go b/pkg/kv/kvserver/batcheval/cmd_delete_range.go index cd3f04716b05..1367c988772e 100644 --- a/pkg/kv/kvserver/batcheval/cmd_delete_range.go +++ b/pkg/kv/kvserver/batcheval/cmd_delete_range.go @@ -246,6 +246,7 @@ func DeleteRange( ReplayWriteTimestampProtection: h.AmbiguousReplayProtection, OmitInRangefeeds: cArgs.OmitInRangefeeds, OriginID: h.WriteOptions.GetOriginID(), + OriginTimestamp: h.WriteOptions.GetOriginTimestamp(), MaxLockConflicts: storage.MaxConflictsPerLockConflictError.Get(&cArgs.EvalCtx.ClusterSettings().SV), TargetLockConflictBytes: storage.TargetBytesPerLockConflictError.Get(&cArgs.EvalCtx.ClusterSettings().SV), Category: fs.BatchEvalReadCategory, diff --git a/pkg/kv/kvserver/batcheval/cmd_increment.go b/pkg/kv/kvserver/batcheval/cmd_increment.go index a38dd3dc841f..fdabe6481bdd 100644 --- a/pkg/kv/kvserver/batcheval/cmd_increment.go +++ b/pkg/kv/kvserver/batcheval/cmd_increment.go @@ -41,6 +41,7 @@ func Increment( ReplayWriteTimestampProtection: h.AmbiguousReplayProtection, OmitInRangefeeds: cArgs.OmitInRangefeeds, OriginID: h.WriteOptions.GetOriginID(), + OriginTimestamp: h.WriteOptions.GetOriginTimestamp(), MaxLockConflicts: storage.MaxConflictsPerLockConflictError.Get(&cArgs.EvalCtx.ClusterSettings().SV), TargetLockConflictBytes: storage.TargetBytesPerLockConflictError.Get(&cArgs.EvalCtx.ClusterSettings().SV), Category: fs.BatchEvalReadCategory, diff --git a/pkg/kv/kvserver/batcheval/cmd_init_put.go b/pkg/kv/kvserver/batcheval/cmd_init_put.go index 7078dde6cae6..8352a321e445 100644 --- a/pkg/kv/kvserver/batcheval/cmd_init_put.go +++ b/pkg/kv/kvserver/batcheval/cmd_init_put.go @@ -42,6 +42,7 @@ func InitPut( ReplayWriteTimestampProtection: h.AmbiguousReplayProtection, OmitInRangefeeds: cArgs.OmitInRangefeeds, OriginID: h.WriteOptions.GetOriginID(), + OriginTimestamp: h.WriteOptions.GetOriginTimestamp(), MaxLockConflicts: storage.MaxConflictsPerLockConflictError.Get(&cArgs.EvalCtx.ClusterSettings().SV), TargetLockConflictBytes: storage.TargetBytesPerLockConflictError.Get(&cArgs.EvalCtx.ClusterSettings().SV), Category: fs.BatchEvalReadCategory, diff --git a/pkg/kv/kvserver/batcheval/cmd_put.go b/pkg/kv/kvserver/batcheval/cmd_put.go index 6a55ed8a0d9f..23385d3b0d56 100644 --- a/pkg/kv/kvserver/batcheval/cmd_put.go +++ b/pkg/kv/kvserver/batcheval/cmd_put.go @@ -63,6 +63,7 @@ func Put( ReplayWriteTimestampProtection: h.AmbiguousReplayProtection, OmitInRangefeeds: cArgs.OmitInRangefeeds, OriginID: h.WriteOptions.GetOriginID(), + OriginTimestamp: h.WriteOptions.GetOriginTimestamp(), MaxLockConflicts: storage.MaxConflictsPerLockConflictError.Get(&cArgs.EvalCtx.ClusterSettings().SV), TargetLockConflictBytes: storage.TargetBytesPerLockConflictError.Get(&cArgs.EvalCtx.ClusterSettings().SV), Category: fs.BatchEvalReadCategory, diff --git a/pkg/storage/mvcc.go b/pkg/storage/mvcc.go index f9c778502034..6255126f51b7 100644 --- a/pkg/storage/mvcc.go +++ b/pkg/storage/mvcc.go @@ -4712,8 +4712,12 @@ type MVCCWriteOptions struct { ReplayWriteTimestampProtection bool OmitInRangefeeds bool ImportEpoch uint32 - OriginID uint32 - OriginTimestamp hlc.Timestamp + // OriginID, when set during Logical Data Replication, will bind to the + // putting key's MVCCValueHeader. + OriginID uint32 + // OriginTimestamp, when set during Logical Data Replication, will bind to the + // putting key's MVCCValueHeader. + OriginTimestamp hlc.Timestamp // MaxLockConflicts is a maximum number of conflicting locks collected before // returning LockConflictError. Even single-key writes can encounter multiple // conflicting shared locks, so the limit is important to bound the number of