diff --git a/pkg/kv/kvserver/kvserverpb/proposer_kv.pb.go b/pkg/kv/kvserver/kvserverpb/proposer_kv.pb.go index 7aa9b3ff1d27..1c58660d9f3c 100644 --- a/pkg/kv/kvserver/kvserverpb/proposer_kv.pb.go +++ b/pkg/kv/kvserver/kvserverpb/proposer_kv.pb.go @@ -287,9 +287,10 @@ type ReplicatedEvalResult struct { IsLeaseRequest bool `protobuf:"varint,6,opt,name=is_lease_request,json=isLeaseRequest,proto3" json:"is_lease_request,omitempty"` // The timestamp at which this command is writing. Used to verify the validity // of the command against the GC threshold and to update the followers' - // clocks. If the request that produced this command is not an IntentWrite - // one, then the request's write timestamp is meaningless; for such request's, - // this field is simply a clock reading from the proposer. + // clocks. If the request that produced this command is not a write that cares + // about the timestamp cache, then the request's write timestamp is + // meaningless; for such request's, this field is simply a clock reading from + // the proposer. WriteTimestamp hlc.Timestamp `protobuf:"bytes,8,opt,name=write_timestamp,json=writeTimestamp,proto3" json:"write_timestamp"` // The stats delta corresponding to the data in this WriteBatch. On // a split, contains only the contributions to the left-hand side. diff --git a/pkg/kv/kvserver/kvserverpb/proposer_kv.proto b/pkg/kv/kvserver/kvserverpb/proposer_kv.proto index 66979e208f89..91c8633d7008 100644 --- a/pkg/kv/kvserver/kvserverpb/proposer_kv.proto +++ b/pkg/kv/kvserver/kvserverpb/proposer_kv.proto @@ -127,9 +127,10 @@ message ReplicatedEvalResult { bool is_lease_request = 6; // The timestamp at which this command is writing. Used to verify the validity // of the command against the GC threshold and to update the followers' - // clocks. If the request that produced this command is not an IntentWrite - // one, then the request's write timestamp is meaningless; for such request's, - // this field is simply a clock reading from the proposer. + // clocks. If the request that produced this command is not a write that cares + // about the timestamp cache, then the request's write timestamp is + // meaningless; for such request's, this field is simply a clock reading from + // the proposer. util.hlc.Timestamp write_timestamp = 8 [(gogoproto.nullable) = false]; // The stats delta corresponding to the data in this WriteBatch. On // a split, contains only the contributions to the left-hand side. diff --git a/pkg/kv/kvserver/replica_application_result.go b/pkg/kv/kvserver/replica_application_result.go index 5512c3116af0..b97199232051 100644 --- a/pkg/kv/kvserver/replica_application_result.go +++ b/pkg/kv/kvserver/replica_application_result.go @@ -212,9 +212,7 @@ func (r *Replica) tryReproposeWithNewLeaseIndex( defer tok.DoneIfNotMoved(ctx) // NB: p.Request.Timestamp reflects the action of ba.SetActiveTimestamp. - // The IsIntentWrite condition matches the similar logic for caring - // about the closed timestamp cache in applyTimestampCache(). - if p.Request.IsIntentWrite() && p.Request.WriteTimestamp().LessEq(minTS) { + if p.Request.AppliesTimestampCache() && p.Request.WriteTimestamp().LessEq(minTS) { // The tracker wants us to forward the request timestamp, but we can't // do that without re-evaluating, so give up. The error returned here // will go to back to DistSender, so send something it can digest. diff --git a/pkg/kv/kvserver/replica_application_state_machine.go b/pkg/kv/kvserver/replica_application_state_machine.go index 369177418abf..2570f95fff3c 100644 --- a/pkg/kv/kvserver/replica_application_state_machine.go +++ b/pkg/kv/kvserver/replica_application_state_machine.go @@ -955,8 +955,8 @@ func (b *replicaAppBatch) Close() { var raftClosedTimestampAssertionsEnabled = envutil.EnvOrDefaultBool("COCKROACH_RAFT_CLOSEDTS_ASSERTIONS_ENABLED", true) // Assert that the current command is not writing under the closed timestamp. -// This check only applies to IntentWrite commands, since others (for example, -// EndTxn) can operate below the closed timestamp. +// This check only applies to certain write commands, mainly IsIntentWrite, +// since others (for example, EndTxn) can operate below the closed timestamp. // // Note that we check that we're we're writing under b.state.RaftClosedTimestamp // (i.e. below the timestamp closed by previous commands), not below @@ -964,7 +964,7 @@ var raftClosedTimestampAssertionsEnabled = envutil.EnvOrDefaultBool("COCKROACH_R // timestamp carried by itself; in other words cmd.raftCmd.ClosedTimestamp is a // promise about future commands, not the command carrying it. func (b *replicaAppBatch) assertNoWriteBelowClosedTimestamp(cmd *replicatedCmd) error { - if !cmd.IsLocal() || !cmd.proposal.Request.IsIntentWrite() { + if !cmd.IsLocal() || !cmd.proposal.Request.AppliesTimestampCache() { return nil } if !raftClosedTimestampAssertionsEnabled { diff --git a/pkg/kv/kvserver/replica_proposal.go b/pkg/kv/kvserver/replica_proposal.go index bc1ecfcce31f..866747662253 100644 --- a/pkg/kv/kvserver/replica_proposal.go +++ b/pkg/kv/kvserver/replica_proposal.go @@ -841,7 +841,7 @@ func (r *Replica) evaluateProposal( // Set the proposal's replicated result, which contains metadata and // side-effects that are to be replicated to all replicas. res.Replicated.IsLeaseRequest = ba.IsLeaseRequest() - if ba.IsIntentWrite() { + if ba.AppliesTimestampCache() { res.Replicated.WriteTimestamp = ba.WriteTimestamp() } else { // For misc requests, use WriteTimestamp to propagate a clock signal. This diff --git a/pkg/kv/kvserver/replica_proposal_buf.go b/pkg/kv/kvserver/replica_proposal_buf.go index 964cbc067ce2..28c989a9ac35 100644 --- a/pkg/kv/kvserver/replica_proposal_buf.go +++ b/pkg/kv/kvserver/replica_proposal_buf.go @@ -475,7 +475,7 @@ func (b *propBuf) FlushLockedWithRaftGroup( b.p.registerProposalLocked(p) // Exit the tracker. - if !reproposal && p.Request.IsIntentWrite() { + if !reproposal && p.Request.AppliesTimestampCache() { // Sanity check that the request is tracked by the evaluation tracker at // this point. It's supposed to be tracked until the // doneIfNotMovedLocked() call below. @@ -671,7 +671,8 @@ func (b *propBuf) allocateLAIAndClosedTimestampLocked( // evaluating (instead, assignedClosedTimestamp was supposed to have bumped // the write timestamp of any request the began evaluating after it was // set). - if p.Request.WriteTimestamp().Less(b.assignedClosedTimestamp) && p.Request.IsIntentWrite() { + if p.Request.WriteTimestamp().Less(b.assignedClosedTimestamp) && + p.Request.AppliesTimestampCache() { log.Fatalf(ctx, "%v", errorutil.UnexpectedWithIssueErrorf(72428, "attempting to propose command writing below closed timestamp. "+ "wts: %s < assigned closed: %s; ba: %s; lease: %s.", diff --git a/pkg/kv/kvserver/replica_tscache.go b/pkg/kv/kvserver/replica_tscache.go index 920281098309..9bd13f4f60f9 100644 --- a/pkg/kv/kvserver/replica_tscache.go +++ b/pkg/kv/kvserver/replica_tscache.go @@ -300,7 +300,7 @@ func (r *Replica) applyTimestampCache( for _, union := range ba.Requests { args := union.GetInner() - if roachpb.IsIntentWrite(args) { + if roachpb.AppliesTimestampCache(args) { header := args.Header() // Forward the timestamp if there's been a more recent read (by someone else). diff --git a/pkg/kv/kvserver/replica_write.go b/pkg/kv/kvserver/replica_write.go index 14b62aba6f44..6b1953bedd4c 100644 --- a/pkg/kv/kvserver/replica_write.go +++ b/pkg/kv/kvserver/replica_write.go @@ -115,7 +115,7 @@ func (r *Replica) executeWriteBatch( // starts being tracked after we apply the timestamp cache. var minTS hlc.Timestamp var tok TrackedRequestToken - if ba.IsIntentWrite() { + if ba.AppliesTimestampCache() { minTS, tok = r.mu.proposalBuf.TrackEvaluatingRequest(ctx, ba.WriteTimestamp()) } defer tok.DoneIfNotMoved(ctx) diff --git a/pkg/roachpb/api.go b/pkg/roachpb/api.go index 712a5a8f8a0f..8a0d7f601776 100644 --- a/pkg/roachpb/api.go +++ b/pkg/roachpb/api.go @@ -86,6 +86,7 @@ const ( isPrefix // requests which, in a batch, must not be split from the following request isUnsplittable // range command that must not be split during sending skipsLeaseCheck // commands which skip the check that the evaluating replica has a valid lease + appliesTSCache // commands which apply the timestamp cache and closed timestamp updatesTSCache // commands which update the timestamp cache updatesTSCacheOnErr // commands which make read data available on errors needsRefresh // commands which require refreshes to avoid serializable retries @@ -97,6 +98,7 @@ var flagDependencies = map[flag][]flag{ isAdmin: {isAlone}, isLocking: {isTxn}, isIntentWrite: {isWrite, isLocking}, + appliesTSCache: {isWrite}, skipsLeaseCheck: {isAlone}, } @@ -164,6 +166,13 @@ func IsRange(args Request) bool { return (args.flags() & isRange) != 0 } +// AppliesTimestampCache returns whether the command is a write that applies the +// timestamp cache (and closed timestamp), possibly pushing its write timestamp +// into the future to avoid re-writing history. +func AppliesTimestampCache(args Request) bool { + return (args.flags() & appliesTSCache) != 0 +} + // UpdatesTimestampCache returns whether the command must update // the timestamp cache in order to set a low water mark for the // timestamp at which mutations to overlapping key(s) can write @@ -1188,7 +1197,7 @@ func (gr *GetRequest) flags() flag { } func (*PutRequest) flags() flag { - return isWrite | isTxn | isLocking | isIntentWrite | canBackpressure + return isWrite | isTxn | isLocking | isIntentWrite | appliesTSCache | canBackpressure } // ConditionalPut effectively reads without writing if it hits a @@ -1197,7 +1206,8 @@ func (*PutRequest) flags() flag { // they return an error immediately instead of continuing a serializable // transaction to be retried at end transaction. func (*ConditionalPutRequest) flags() flag { - return isRead | isWrite | isTxn | isLocking | isIntentWrite | updatesTSCache | updatesTSCacheOnErr | canBackpressure + return isRead | isWrite | isTxn | isLocking | isIntentWrite | + appliesTSCache | updatesTSCache | updatesTSCacheOnErr | canBackpressure } // InitPut, like ConditionalPut, effectively reads without writing if it hits a @@ -1206,7 +1216,8 @@ func (*ConditionalPutRequest) flags() flag { // return an error immediately instead of continuing a serializable transaction // to be retried at end transaction. func (*InitPutRequest) flags() flag { - return isRead | isWrite | isTxn | isLocking | isIntentWrite | updatesTSCache | updatesTSCacheOnErr | canBackpressure + return isRead | isWrite | isTxn | isLocking | isIntentWrite | + appliesTSCache | updatesTSCache | updatesTSCacheOnErr | canBackpressure } // Increment reads the existing value, but always leaves an intent so @@ -1215,11 +1226,11 @@ func (*InitPutRequest) flags() flag { // error immediately instead of continuing a serializable transaction // to be retried at end transaction. func (*IncrementRequest) flags() flag { - return isRead | isWrite | isTxn | isLocking | isIntentWrite | canBackpressure + return isRead | isWrite | isTxn | isLocking | isIntentWrite | appliesTSCache | canBackpressure } func (*DeleteRequest) flags() flag { - return isWrite | isTxn | isLocking | isIntentWrite | canBackpressure + return isWrite | isTxn | isLocking | isIntentWrite | appliesTSCache | canBackpressure } func (drr *DeleteRangeRequest) flags() flag { @@ -1246,7 +1257,8 @@ func (drr *DeleteRangeRequest) flags() flag { // it. Note that, even if we didn't update the ts cache, deletes of keys // that exist would not be lost (since the DeleteRange leaves intents on // those keys), but deletes of "empty space" would. - return isRead | isWrite | isTxn | isLocking | isIntentWrite | isRange | updatesTSCache | needsRefresh | canBackpressure + return isRead | isWrite | isTxn | isLocking | isIntentWrite | isRange | + appliesTSCache | updatesTSCache | needsRefresh | canBackpressure } // Note that ClearRange commands cannot be part of a transaction as diff --git a/pkg/roachpb/batch.go b/pkg/roachpb/batch.go index b31ac9203e6e..4a4a5aa8073f 100644 --- a/pkg/roachpb/batch.go +++ b/pkg/roachpb/batch.go @@ -133,6 +133,13 @@ func (ba *BatchRequest) IsLeaseRequest() bool { return ok } +// AppliesTimestampCache returns whether the command is a write that applies the +// timestamp cache (and closed timestamp), possibly pushing its write timestamp +// into the future to avoid re-writing history. +func (ba *BatchRequest) AppliesTimestampCache() bool { + return ba.hasFlag(appliesTSCache) +} + // IsAdmin returns true iff the BatchRequest contains an admin request. func (ba *BatchRequest) IsAdmin() bool { return ba.hasFlag(isAdmin)