Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
129547: kv,storage: add new OriginTimestamp option to ConditionalPut r=tbg a=stevendanna

To improve the performance of the logical replication ingestion
processor, we would like to be able to manually generate KV batches
rather than writing via SQL.  However, we also only want to write an
ingested row if it is clearly newer than an existing row.

Two new ConditionaPutRequest parameters allow for this:

- `OriginTimestamp`: When set, any existing value must be older than
  this timestamp. The time comparison is made against the
  OriginTimestamp field in the values MVCCValueHeader if it is set or
  the keys MVCCTimestamp if it isn't.

- `ShouldWinOriginTimestampTie`: When true, indicates that the
  proposed value should be accepted even the inbound OriginTimestamp is
  exactly equal to the existing value's timestamp.

Additionally, two new error states have been added to
ConditionFailedError:

- `OriginTimestampOlderThan`: An error with this value set indicates
  that the ConditionalPutRequest failed because its OriginTimestamp was
  too old.

- `HadNewerOriginTimestamp`: When set, this indicates that while the
  expected value did not match the existing value, the provided origin
  timestamp was newer.

The expectation is that callers who receive an error with
OriginTimestampOlderThan may abort their transaction and not attempt a
retry because their proposed value is too old.

A caller who gets an error with HadNewerOriginTimestamp may choose to
abort their transaction but retry with the value provided in the
ActualValue field. Note a value expectation mismatch is likely for
our proposed caller who is constructing its expected values using
rangefeed events from another table.

Epic: none
Release note: None

130908: rac2: send-queue tracking in push mode r=kvoli a=sumeerbhola

The new raftEventForReplica encapsulates all that is needed to handle the RaftEvent, including (re)creating a replicaSendStream. The complexity is encapsulated in the construction of this event, which also pays attention to regressions, jumps, and other anomalies, all of which cause the existing replicaSendStream (if any) to be recreated. The recreaton is a significant simplification from the prototype, which was doing unnecessarily complicated work in fixing up the state.

The send-queue tracking is simple, [indexToSend, nextRaftIndex). Since there is a send-queue, we separately track the eval.tokensDeducted counts. Tracker.UntrackGE is no longer needed, which is another simplification.

Epic: CRDB-37515

Informs #123509

Release note: None

Co-authored-by: Steven Danna <[email protected]>
Co-authored-by: sumeerbhola <[email protected]>
Co-authored-by: Austen McClernon <[email protected]>
  • Loading branch information
4 people committed Sep 25, 2024
3 parents 1d38876 + dfbe17a + 70ba097 commit 6b5494d
Show file tree
Hide file tree
Showing 31 changed files with 2,432 additions and 789 deletions.
9 changes: 9 additions & 0 deletions pkg/kv/kvpb/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -2553,3 +2553,12 @@ func (writeOptions *WriteOptions) GetOriginID() uint32 {
}
return writeOptions.OriginID
}

func (r *ConditionalPutRequest) Validate() error {
if !r.OriginTimestamp.IsEmpty() {
if r.AllowIfDoesNotExist {
return errors.AssertionFailedf("invalid ConditionalPutRequest: AllowIfDoesNotExist and non-empty OriginTimestamp are incompatible")
}
}
return nil
}
35 changes: 34 additions & 1 deletion pkg/kv/kvpb/api.proto
Original file line number Diff line number Diff line change
Expand Up @@ -382,6 +382,39 @@ message ConditionalPutRequest {
// timestamp. This option should be used with care as it precludes
// the use of this value with transactions.
bool inline = 7;

// OriginTimestamp, if set, indicates that the ConditionalPut should
// only succeed if the provided timestamp is greater than the
// OriginTimestamp field in the MVCCValueHeader of the existing
// value (if the value has a non-zero OriginTimestamp) or the MVCC
// timestamp of the existing value. If there is no existing value,
// the timestamp check succeeds.
//
// The conditional put request will only succeed if this timestamp
// comparison is successful and the expected bytes matches the
// actual bytes.
//
// On success, this timestamp will also be stored in the
// OriginTimestamp field of the the MVCCValueHeader for the newly
// written value.
//
// In the proposed use case, the OriginTimestamp is the MVCC version
// timestamp of the row on the source cluster and the semantics lead
// to idempotent last-write-wins semantics against the "original
// MVCC timestamp" of the value on the destination, which is its
// MVCC timestamp or, if set, the OriginTimestamp (reflecting that
// this value originated from the source cluster).
//
// Used by logical data replication.
util.hlc.Timestamp origin_timestamp = 8 [(gogoproto.nullable) = false];

// ShouldWinOriginTimestampTie, if true, indicates that if the "comparison
// timestamp" (see comment on OriginTimestamp) and OriginTimestamp are equal,
// then the ConditionalPut should succeed (assuming the expected and actual
// bytes also match).
//
// This must only be used in conjunction with OriginTimestamp.
bool should_win_origin_timestamp_tie = 9;
}

// A ConditionalPutResponse is the return value from the
Expand Down Expand Up @@ -3213,7 +3246,7 @@ message RangeFeedRequest {
// OmitInRangefeeds = true, the write will not be emitted on the rangefeed.
// WithFiltering should NOT be set for system-table rangefeeds.
bool with_filtering = 7;

// WithMatchingOriginIDs specifies if the rangefeed server should emit events
// originating from specific clusters during Logical Data Replication. If this
// field is empty, all events are emitted.
Expand Down
8 changes: 7 additions & 1 deletion pkg/kv/kvpb/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -1186,7 +1186,13 @@ func (e *ConditionFailedError) Error() string {
}

func (e *ConditionFailedError) SafeFormatError(p errors.Printer) (next error) {
p.Printf("unexpected value: %s", e.ActualValue)
if e.HadNewerOriginTimestamp {
p.Printf("higher OriginTimestamp but unexpected value: %s", e.ActualValue)
} else if e.OriginTimestampOlderThan.IsSet() {
p.Printf("OriginTimestamp older than %s", e.OriginTimestampOlderThan)
} else {
p.Printf("unexpected value: %s", e.ActualValue)
}
return nil
}

Expand Down
16 changes: 15 additions & 1 deletion pkg/kv/kvpb/errors.proto
Original file line number Diff line number Diff line change
Expand Up @@ -400,6 +400,20 @@ message OpRequiresTxnError {
// contain the actual value found.
message ConditionFailedError {
optional roachpb.Value actual_value = 1;
// HadNewerOriginTimestamp is returned when an OriginTimestamp was
// sent in the conditional put, the origin timestamp was newer than
// the existing value's "comparison timestamp" (see comment for
// OriginTimestampOlderThan) but the expected value was
// mismatched. The caller may choose to try again using the returned
// ActualValue as the expected value.
optional bool had_newer_origin_timestamp = 2 [(gogoproto.nullable) = false];
// OriginTimestampOlderThan is returned when an OriginTimestamp was
// sent in the conditional put and it was older than the existing
// value's OriginTimestamp (if it exists) or MVCC timestamp.
//
// See the comment on the OriginTimestamp field of
// kvpb.ConditionalPutRequest for more details.
optional util.hlc.Timestamp origin_timestamp_older_than = 3 [(gogoproto.nullable) = false];
}

// A LeaseRejectedError indicates that the requested replica could
Expand Down Expand Up @@ -692,7 +706,7 @@ message RefreshFailedError {

// The timestamp the key was last updated.
optional util.hlc.Timestamp timestamp = 3 [(gogoproto.nullable) = false];

// The conflicting transaction's TxnMeta, if available. This field is used
// when the refresh fails with REASON_INTENT and is bubbled up for
// observability purposes to track the conflicting transaction. This field is
Expand Down
33 changes: 20 additions & 13 deletions pkg/kv/kvserver/batcheval/cmd_conditional_put.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,28 +58,35 @@ func ConditionalPut(
ts = h.Timestamp
}

handleMissing := storage.CPutMissingBehavior(args.AllowIfDoesNotExist)
if err := args.Validate(); err != nil {
return result.Result{}, err
}

opts := storage.MVCCWriteOptions{
Txn: h.Txn,
LocalTimestamp: cArgs.Now,
Stats: cArgs.Stats,
ReplayWriteTimestampProtection: h.AmbiguousReplayProtection,
OmitInRangefeeds: cArgs.OmitInRangefeeds,
OriginID: h.WriteOptions.GetOriginID(),
MaxLockConflicts: storage.MaxConflictsPerLockConflictError.Get(&cArgs.EvalCtx.ClusterSettings().SV),
TargetLockConflictBytes: storage.TargetBytesPerLockConflictError.Get(&cArgs.EvalCtx.ClusterSettings().SV),
Category: fs.BatchEvalReadCategory,
opts := storage.ConditionalPutWriteOptions{
MVCCWriteOptions: storage.MVCCWriteOptions{
Txn: h.Txn,
LocalTimestamp: cArgs.Now,
Stats: cArgs.Stats,
ReplayWriteTimestampProtection: h.AmbiguousReplayProtection,
OmitInRangefeeds: cArgs.OmitInRangefeeds,
OriginID: h.WriteOptions.GetOriginID(),
MaxLockConflicts: storage.MaxConflictsPerLockConflictError.Get(&cArgs.EvalCtx.ClusterSettings().SV),
TargetLockConflictBytes: storage.TargetBytesPerLockConflictError.Get(&cArgs.EvalCtx.ClusterSettings().SV),
Category: fs.BatchEvalReadCategory,
},
AllowIfDoesNotExist: storage.CPutMissingBehavior(args.AllowIfDoesNotExist),
OriginTimestamp: args.OriginTimestamp,
ShouldWinOriginTimestampTie: args.ShouldWinOriginTimestampTie,
}

var err error
var acq roachpb.LockAcquisition
if args.Blind {
acq, err = storage.MVCCBlindConditionalPut(
ctx, readWriter, args.Key, ts, args.Value, args.ExpBytes, handleMissing, opts)
ctx, readWriter, args.Key, ts, args.Value, args.ExpBytes, opts)
} else {
acq, err = storage.MVCCConditionalPut(
ctx, readWriter, args.Key, ts, args.Value, args.ExpBytes, handleMissing, opts)
ctx, readWriter, args.Key, ts, args.Value, args.ExpBytes, opts)
}
if err != nil {
return result.Result{}, err
Expand Down
Loading

0 comments on commit 6b5494d

Please sign in to comment.