Skip to content

Commit

Permalink
Merge #131055
Browse files Browse the repository at this point in the history
131055: batcheval: add OriginTimestamp to WriteOptions in batch header r=stevendanna a=msbutler

This patch is part of larger project for Logical Data Replication to set an
OriginTimestamp, via a sql session variable, to the MVCCValueHeader of each kv
written in that session. This option will be set on LDR's SQL write path, i.e.
ingestion via sql commands, while on LDRs kv write path, the OriginTimestamp
will get set via a new option specific to the KV API's ConditionalPut request.

Epic: none

Release note: none

Co-authored-by: Michael Butler <[email protected]>
  • Loading branch information
craig[bot] and msbutler committed Sep 26, 2024
2 parents 7e0f3ae + 04e6f80 commit 5dd2562
Show file tree
Hide file tree
Showing 9 changed files with 32 additions and 2 deletions.
7 changes: 7 additions & 0 deletions pkg/kv/kvpb/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -2554,6 +2554,13 @@ func (writeOptions *WriteOptions) GetOriginID() uint32 {
return writeOptions.OriginID
}

func (writeOptions *WriteOptions) GetOriginTimestamp() hlc.Timestamp {
if writeOptions == nil {
return hlc.Timestamp{}
}
return writeOptions.OriginTimestamp
}

func (r *ConditionalPutRequest) Validate() error {
if !r.OriginTimestamp.IsEmpty() {
if r.AllowIfDoesNotExist {
Expand Down
4 changes: 4 additions & 0 deletions pkg/kv/kvpb/api.proto
Original file line number Diff line number Diff line change
Expand Up @@ -3020,6 +3020,10 @@ message Header {

message WriteOptions {
uint32 origin_id = 1[(gogoproto.customname) = "OriginID"];
// OriginTimestamp is bound to the MVCCValueHeader of written key in the
// batch. Note that a kv client cannot set this if they use CPut's origin
// timestamp arg.
util.hlc.Timestamp origin_timestamp = 2 [(gogoproto.nullable) = false];
}

// BoundedStalenessHeader contains configuration values pertaining to bounded
Expand Down
10 changes: 10 additions & 0 deletions pkg/kv/kvserver/batcheval/cmd_conditional_put.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/storage/fs"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/errors"
)

func init() {
Expand Down Expand Up @@ -62,6 +63,14 @@ func ConditionalPut(
return result.Result{}, err
}

originTimestampForValueHeader := h.WriteOptions.GetOriginTimestamp()
if args.OriginTimestamp.IsSet() {
originTimestampForValueHeader = args.OriginTimestamp
}
if args.OriginTimestamp.IsSet() && h.WriteOptions.GetOriginTimestamp().IsSet() {
return result.Result{}, errors.AssertionFailedf("OriginTimestamp cannot be passed via CPut arg and in request header")
}

opts := storage.ConditionalPutWriteOptions{
MVCCWriteOptions: storage.MVCCWriteOptions{
Txn: h.Txn,
Expand All @@ -70,6 +79,7 @@ func ConditionalPut(
ReplayWriteTimestampProtection: h.AmbiguousReplayProtection,
OmitInRangefeeds: cArgs.OmitInRangefeeds,
OriginID: h.WriteOptions.GetOriginID(),
OriginTimestamp: originTimestampForValueHeader,
MaxLockConflicts: storage.MaxConflictsPerLockConflictError.Get(&cArgs.EvalCtx.ClusterSettings().SV),
TargetLockConflictBytes: storage.TargetBytesPerLockConflictError.Get(&cArgs.EvalCtx.ClusterSettings().SV),
Category: fs.BatchEvalReadCategory,
Expand Down
1 change: 1 addition & 0 deletions pkg/kv/kvserver/batcheval/cmd_delete.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ func Delete(
ReplayWriteTimestampProtection: h.AmbiguousReplayProtection,
OmitInRangefeeds: cArgs.OmitInRangefeeds,
OriginID: h.WriteOptions.GetOriginID(),
OriginTimestamp: h.WriteOptions.GetOriginTimestamp(),
MaxLockConflicts: storage.MaxConflictsPerLockConflictError.Get(&cArgs.EvalCtx.ClusterSettings().SV),
TargetLockConflictBytes: storage.TargetBytesPerLockConflictError.Get(&cArgs.EvalCtx.ClusterSettings().SV),
Category: fs.BatchEvalReadCategory,
Expand Down
1 change: 1 addition & 0 deletions pkg/kv/kvserver/batcheval/cmd_delete_range.go
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,7 @@ func DeleteRange(
ReplayWriteTimestampProtection: h.AmbiguousReplayProtection,
OmitInRangefeeds: cArgs.OmitInRangefeeds,
OriginID: h.WriteOptions.GetOriginID(),
OriginTimestamp: h.WriteOptions.GetOriginTimestamp(),
MaxLockConflicts: storage.MaxConflictsPerLockConflictError.Get(&cArgs.EvalCtx.ClusterSettings().SV),
TargetLockConflictBytes: storage.TargetBytesPerLockConflictError.Get(&cArgs.EvalCtx.ClusterSettings().SV),
Category: fs.BatchEvalReadCategory,
Expand Down
1 change: 1 addition & 0 deletions pkg/kv/kvserver/batcheval/cmd_increment.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ func Increment(
ReplayWriteTimestampProtection: h.AmbiguousReplayProtection,
OmitInRangefeeds: cArgs.OmitInRangefeeds,
OriginID: h.WriteOptions.GetOriginID(),
OriginTimestamp: h.WriteOptions.GetOriginTimestamp(),
MaxLockConflicts: storage.MaxConflictsPerLockConflictError.Get(&cArgs.EvalCtx.ClusterSettings().SV),
TargetLockConflictBytes: storage.TargetBytesPerLockConflictError.Get(&cArgs.EvalCtx.ClusterSettings().SV),
Category: fs.BatchEvalReadCategory,
Expand Down
1 change: 1 addition & 0 deletions pkg/kv/kvserver/batcheval/cmd_init_put.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ func InitPut(
ReplayWriteTimestampProtection: h.AmbiguousReplayProtection,
OmitInRangefeeds: cArgs.OmitInRangefeeds,
OriginID: h.WriteOptions.GetOriginID(),
OriginTimestamp: h.WriteOptions.GetOriginTimestamp(),
MaxLockConflicts: storage.MaxConflictsPerLockConflictError.Get(&cArgs.EvalCtx.ClusterSettings().SV),
TargetLockConflictBytes: storage.TargetBytesPerLockConflictError.Get(&cArgs.EvalCtx.ClusterSettings().SV),
Category: fs.BatchEvalReadCategory,
Expand Down
1 change: 1 addition & 0 deletions pkg/kv/kvserver/batcheval/cmd_put.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ func Put(
ReplayWriteTimestampProtection: h.AmbiguousReplayProtection,
OmitInRangefeeds: cArgs.OmitInRangefeeds,
OriginID: h.WriteOptions.GetOriginID(),
OriginTimestamp: h.WriteOptions.GetOriginTimestamp(),
MaxLockConflicts: storage.MaxConflictsPerLockConflictError.Get(&cArgs.EvalCtx.ClusterSettings().SV),
TargetLockConflictBytes: storage.TargetBytesPerLockConflictError.Get(&cArgs.EvalCtx.ClusterSettings().SV),
Category: fs.BatchEvalReadCategory,
Expand Down
8 changes: 6 additions & 2 deletions pkg/storage/mvcc.go
Original file line number Diff line number Diff line change
Expand Up @@ -4712,8 +4712,12 @@ type MVCCWriteOptions struct {
ReplayWriteTimestampProtection bool
OmitInRangefeeds bool
ImportEpoch uint32
OriginID uint32
OriginTimestamp hlc.Timestamp
// OriginID, when set during Logical Data Replication, will bind to the
// putting key's MVCCValueHeader.
OriginID uint32
// OriginTimestamp, when set during Logical Data Replication, will bind to the
// putting key's MVCCValueHeader.
OriginTimestamp hlc.Timestamp
// MaxLockConflicts is a maximum number of conflicting locks collected before
// returning LockConflictError. Even single-key writes can encounter multiple
// conflicting shared locks, so the limit is important to bound the number of
Expand Down

0 comments on commit 5dd2562

Please sign in to comment.