diff --git a/pkg/kv/kvserver/replica_application_result.go b/pkg/kv/kvserver/replica_application_result.go index 4d9907c39e4b..34faae32038b 100644 --- a/pkg/kv/kvserver/replica_application_result.go +++ b/pkg/kv/kvserver/replica_application_result.go @@ -205,8 +205,9 @@ func (r *Replica) tryReproposeWithNewLeaseIndex( minTS, untrack := r.store.cfg.ClosedTimestamp.Tracker.Track(ctx) defer untrack(ctx, 0, 0, 0) // covers all error paths below - // NB: p.Request.Timestamp reflects the action of ba.SetActiveTimestamp. - if p.Request.Timestamp.Less(minTS) { + // The ConsultsTimestampCache condition matches the similar logic for caring + // about the closed timestamp cache in applyTimestampCache(). + if p.Request.ConsultsTimestampCache() && p.Request.WriteTimestamp().LessEq(minTS) { // The tracker wants us to forward the request timestamp, but we can't // do that without re-evaluating, so give up. The error returned here // will go to back to DistSender, so send something it can digest. diff --git a/pkg/roachpb/api.go b/pkg/roachpb/api.go index e00c03a885bb..71a1e9aca0ca 100644 --- a/pkg/roachpb/api.go +++ b/pkg/roachpb/api.go @@ -166,7 +166,7 @@ func IsRange(args Request) bool { return (args.flags() & isRange) != 0 } -// ConsultsTimestampCache returns whether the command must consult +// ConsultsTimestampCache returns whether the request must consult // the timestamp cache to determine whether a mutation is safe at // a proposed timestamp or needs to move to a higher timestamp to // avoid re-writing history. diff --git a/pkg/roachpb/batch.go b/pkg/roachpb/batch.go index 08b78cdb61da..4b563b265bd7 100644 --- a/pkg/roachpb/batch.go +++ b/pkg/roachpb/batch.go @@ -74,6 +74,19 @@ func (ba BatchRequest) EarliestActiveTimestamp() hlc.Timestamp { return ts } +// WriteTimestamp returns the timestamps at which this request is writing. For +// non-transactional requests, this is the same as the read timestamp. For +// transactional requests, the write timestamp can be higher until commit time. +// +// This should only be called after SetActiveTimestamp(). +func (ba *BatchRequest) WriteTimestamp() hlc.Timestamp { + ts := ba.Timestamp + if ba.Txn != nil { + ts.Forward(ba.Txn.WriteTimestamp) + } + return ts +} + // UpdateTxn updates the batch transaction from the supplied one in // a copy-on-write fashion, i.e. without mutating an existing // Transaction struct. @@ -156,6 +169,14 @@ func (ba *BatchRequest) IsUnsplittable() bool { return ba.hasFlag(isUnsplittable) } +// ConsultsTimestampCache returns whether the request must consult +// the timestamp cache to determine whether a mutation is safe at +// a proposed timestamp or needs to move to a higher timestamp to +// avoid re-writing history. +func (ba *BatchRequest) ConsultsTimestampCache() bool { + return ba.hasFlag(consultsTSCache) +} + // IsSingleRequest returns true iff the BatchRequest contains a single request. func (ba *BatchRequest) IsSingleRequest() bool { return len(ba.Requests) == 1