From 04e6f809007775f2aeab9391165479af686a9641 Mon Sep 17 00:00:00 2001 From: Michael Butler Date: Mon, 26 Aug 2024 20:20:12 -0400 Subject: [PATCH] batcheval: add OriginTimestamp to WriteOptions in batch header This patch is part of larger project for Logical Data Replication to set an OriginTimestamp, via a sql session variable, to the MVCCValueHeader of each kv written in that session. This option will be set on LDR's SQL write path, i.e. ingestion via sql commands, while on LDRs kv write path, the OriginTimestamp will get set via a new option specific to the KV API's ConditionalPut request. Epic: none Release note: none --- pkg/kv/kvpb/api.go | 7 +++++++ pkg/kv/kvpb/api.proto | 4 ++++ pkg/kv/kvserver/batcheval/cmd_conditional_put.go | 10 ++++++++++ pkg/kv/kvserver/batcheval/cmd_delete.go | 1 + pkg/kv/kvserver/batcheval/cmd_delete_range.go | 1 + pkg/kv/kvserver/batcheval/cmd_increment.go | 1 + pkg/kv/kvserver/batcheval/cmd_init_put.go | 1 + pkg/kv/kvserver/batcheval/cmd_put.go | 1 + pkg/storage/mvcc.go | 8 ++++++-- 9 files changed, 32 insertions(+), 2 deletions(-) diff --git a/pkg/kv/kvpb/api.go b/pkg/kv/kvpb/api.go index 7a0aaeca3ee9..b10d9fa374df 100644 --- a/pkg/kv/kvpb/api.go +++ b/pkg/kv/kvpb/api.go @@ -2554,6 +2554,13 @@ func (writeOptions *WriteOptions) GetOriginID() uint32 { return writeOptions.OriginID } +func (writeOptions *WriteOptions) GetOriginTimestamp() hlc.Timestamp { + if writeOptions == nil { + return hlc.Timestamp{} + } + return writeOptions.OriginTimestamp +} + func (r *ConditionalPutRequest) Validate() error { if !r.OriginTimestamp.IsEmpty() { if r.AllowIfDoesNotExist { diff --git a/pkg/kv/kvpb/api.proto b/pkg/kv/kvpb/api.proto index 804aafd192d4..0bcf215ac4ce 100644 --- a/pkg/kv/kvpb/api.proto +++ b/pkg/kv/kvpb/api.proto @@ -3014,6 +3014,10 @@ message Header { message WriteOptions { uint32 origin_id = 1[(gogoproto.customname) = "OriginID"]; + // OriginTimestamp is bound to the MVCCValueHeader of written key in the + // batch. Note that a kv client cannot set this if they use CPut's origin + // timestamp arg. + util.hlc.Timestamp origin_timestamp = 2 [(gogoproto.nullable) = false]; } // BoundedStalenessHeader contains configuration values pertaining to bounded diff --git a/pkg/kv/kvserver/batcheval/cmd_conditional_put.go b/pkg/kv/kvserver/batcheval/cmd_conditional_put.go index 3fff9f82bb2f..3914e71d6424 100644 --- a/pkg/kv/kvserver/batcheval/cmd_conditional_put.go +++ b/pkg/kv/kvserver/batcheval/cmd_conditional_put.go @@ -22,6 +22,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/storage" "github.com/cockroachdb/cockroach/pkg/storage/fs" "github.com/cockroachdb/cockroach/pkg/util/hlc" + "github.com/cockroachdb/errors" ) func init() { @@ -62,6 +63,14 @@ func ConditionalPut( return result.Result{}, err } + originTimestampForValueHeader := h.WriteOptions.GetOriginTimestamp() + if args.OriginTimestamp.IsSet() { + originTimestampForValueHeader = args.OriginTimestamp + } + if args.OriginTimestamp.IsSet() && h.WriteOptions.GetOriginTimestamp().IsSet() { + return result.Result{}, errors.AssertionFailedf("OriginTimestamp cannot be passed via CPut arg and in request header") + } + opts := storage.ConditionalPutWriteOptions{ MVCCWriteOptions: storage.MVCCWriteOptions{ Txn: h.Txn, @@ -70,6 +79,7 @@ func ConditionalPut( ReplayWriteTimestampProtection: h.AmbiguousReplayProtection, OmitInRangefeeds: cArgs.OmitInRangefeeds, OriginID: h.WriteOptions.GetOriginID(), + OriginTimestamp: originTimestampForValueHeader, MaxLockConflicts: storage.MaxConflictsPerLockConflictError.Get(&cArgs.EvalCtx.ClusterSettings().SV), TargetLockConflictBytes: storage.TargetBytesPerLockConflictError.Get(&cArgs.EvalCtx.ClusterSettings().SV), Category: fs.BatchEvalReadCategory, diff --git a/pkg/kv/kvserver/batcheval/cmd_delete.go b/pkg/kv/kvserver/batcheval/cmd_delete.go index 9e4289b5d928..e8cb2b5c0947 100644 --- a/pkg/kv/kvserver/batcheval/cmd_delete.go +++ b/pkg/kv/kvserver/batcheval/cmd_delete.go @@ -40,6 +40,7 @@ func Delete( ReplayWriteTimestampProtection: h.AmbiguousReplayProtection, OmitInRangefeeds: cArgs.OmitInRangefeeds, OriginID: h.WriteOptions.GetOriginID(), + OriginTimestamp: h.WriteOptions.GetOriginTimestamp(), MaxLockConflicts: storage.MaxConflictsPerLockConflictError.Get(&cArgs.EvalCtx.ClusterSettings().SV), TargetLockConflictBytes: storage.TargetBytesPerLockConflictError.Get(&cArgs.EvalCtx.ClusterSettings().SV), Category: fs.BatchEvalReadCategory, diff --git a/pkg/kv/kvserver/batcheval/cmd_delete_range.go b/pkg/kv/kvserver/batcheval/cmd_delete_range.go index cd3f04716b05..1367c988772e 100644 --- a/pkg/kv/kvserver/batcheval/cmd_delete_range.go +++ b/pkg/kv/kvserver/batcheval/cmd_delete_range.go @@ -246,6 +246,7 @@ func DeleteRange( ReplayWriteTimestampProtection: h.AmbiguousReplayProtection, OmitInRangefeeds: cArgs.OmitInRangefeeds, OriginID: h.WriteOptions.GetOriginID(), + OriginTimestamp: h.WriteOptions.GetOriginTimestamp(), MaxLockConflicts: storage.MaxConflictsPerLockConflictError.Get(&cArgs.EvalCtx.ClusterSettings().SV), TargetLockConflictBytes: storage.TargetBytesPerLockConflictError.Get(&cArgs.EvalCtx.ClusterSettings().SV), Category: fs.BatchEvalReadCategory, diff --git a/pkg/kv/kvserver/batcheval/cmd_increment.go b/pkg/kv/kvserver/batcheval/cmd_increment.go index a38dd3dc841f..fdabe6481bdd 100644 --- a/pkg/kv/kvserver/batcheval/cmd_increment.go +++ b/pkg/kv/kvserver/batcheval/cmd_increment.go @@ -41,6 +41,7 @@ func Increment( ReplayWriteTimestampProtection: h.AmbiguousReplayProtection, OmitInRangefeeds: cArgs.OmitInRangefeeds, OriginID: h.WriteOptions.GetOriginID(), + OriginTimestamp: h.WriteOptions.GetOriginTimestamp(), MaxLockConflicts: storage.MaxConflictsPerLockConflictError.Get(&cArgs.EvalCtx.ClusterSettings().SV), TargetLockConflictBytes: storage.TargetBytesPerLockConflictError.Get(&cArgs.EvalCtx.ClusterSettings().SV), Category: fs.BatchEvalReadCategory, diff --git a/pkg/kv/kvserver/batcheval/cmd_init_put.go b/pkg/kv/kvserver/batcheval/cmd_init_put.go index 7078dde6cae6..8352a321e445 100644 --- a/pkg/kv/kvserver/batcheval/cmd_init_put.go +++ b/pkg/kv/kvserver/batcheval/cmd_init_put.go @@ -42,6 +42,7 @@ func InitPut( ReplayWriteTimestampProtection: h.AmbiguousReplayProtection, OmitInRangefeeds: cArgs.OmitInRangefeeds, OriginID: h.WriteOptions.GetOriginID(), + OriginTimestamp: h.WriteOptions.GetOriginTimestamp(), MaxLockConflicts: storage.MaxConflictsPerLockConflictError.Get(&cArgs.EvalCtx.ClusterSettings().SV), TargetLockConflictBytes: storage.TargetBytesPerLockConflictError.Get(&cArgs.EvalCtx.ClusterSettings().SV), Category: fs.BatchEvalReadCategory, diff --git a/pkg/kv/kvserver/batcheval/cmd_put.go b/pkg/kv/kvserver/batcheval/cmd_put.go index 6a55ed8a0d9f..23385d3b0d56 100644 --- a/pkg/kv/kvserver/batcheval/cmd_put.go +++ b/pkg/kv/kvserver/batcheval/cmd_put.go @@ -63,6 +63,7 @@ func Put( ReplayWriteTimestampProtection: h.AmbiguousReplayProtection, OmitInRangefeeds: cArgs.OmitInRangefeeds, OriginID: h.WriteOptions.GetOriginID(), + OriginTimestamp: h.WriteOptions.GetOriginTimestamp(), MaxLockConflicts: storage.MaxConflictsPerLockConflictError.Get(&cArgs.EvalCtx.ClusterSettings().SV), TargetLockConflictBytes: storage.TargetBytesPerLockConflictError.Get(&cArgs.EvalCtx.ClusterSettings().SV), Category: fs.BatchEvalReadCategory, diff --git a/pkg/storage/mvcc.go b/pkg/storage/mvcc.go index f9c778502034..6255126f51b7 100644 --- a/pkg/storage/mvcc.go +++ b/pkg/storage/mvcc.go @@ -4712,8 +4712,12 @@ type MVCCWriteOptions struct { ReplayWriteTimestampProtection bool OmitInRangefeeds bool ImportEpoch uint32 - OriginID uint32 - OriginTimestamp hlc.Timestamp + // OriginID, when set during Logical Data Replication, will bind to the + // putting key's MVCCValueHeader. + OriginID uint32 + // OriginTimestamp, when set during Logical Data Replication, will bind to the + // putting key's MVCCValueHeader. + OriginTimestamp hlc.Timestamp // MaxLockConflicts is a maximum number of conflicting locks collected before // returning LockConflictError. Even single-key writes can encounter multiple // conflicting shared locks, so the limit is important to bound the number of