Skip to content

Commit

Permalink
roachpb: add appliesTSCache request flag
Browse files Browse the repository at this point in the history
Previously, `isIntentWrite` determined whether a request checked the
timestamp cache and possibly pushed its timestamp. However, some
requests may want to check the timestamp cache without writing intents,
notably an MVCC-compliant `AddSSTable` request.

This patch introduces a new flag `appliesTSCache`, and uses it as a
condition for applying the timestamp cache to the request.

Release note: None
  • Loading branch information
erikgrinaker committed Nov 25, 2021
1 parent 35ceaee commit 529b785
Show file tree
Hide file tree
Showing 10 changed files with 43 additions and 23 deletions.
7 changes: 4 additions & 3 deletions pkg/kv/kvserver/kvserverpb/proposer_kv.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 4 additions & 3 deletions pkg/kv/kvserver/kvserverpb/proposer_kv.proto
Original file line number Diff line number Diff line change
Expand Up @@ -127,9 +127,10 @@ message ReplicatedEvalResult {
bool is_lease_request = 6;
// The timestamp at which this command is writing. Used to verify the validity
// of the command against the GC threshold and to update the followers'
// clocks. If the request that produced this command is not an IntentWrite
// one, then the request's write timestamp is meaningless; for such request's,
// this field is simply a clock reading from the proposer.
// clocks. If the request that produced this command is not a write that cares
// about the timestamp cache, then the request's write timestamp is
// meaningless; for such request's, this field is simply a clock reading from
// the proposer.
util.hlc.Timestamp write_timestamp = 8 [(gogoproto.nullable) = false];
// The stats delta corresponding to the data in this WriteBatch. On
// a split, contains only the contributions to the left-hand side.
Expand Down
4 changes: 1 addition & 3 deletions pkg/kv/kvserver/replica_application_result.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,9 +212,7 @@ func (r *Replica) tryReproposeWithNewLeaseIndex(
defer tok.DoneIfNotMoved(ctx)

// NB: p.Request.Timestamp reflects the action of ba.SetActiveTimestamp.
// The IsIntentWrite condition matches the similar logic for caring
// about the closed timestamp cache in applyTimestampCache().
if p.Request.IsIntentWrite() && p.Request.WriteTimestamp().LessEq(minTS) {
if p.Request.AppliesTimestampCache() && p.Request.WriteTimestamp().LessEq(minTS) {
// The tracker wants us to forward the request timestamp, but we can't
// do that without re-evaluating, so give up. The error returned here
// will go to back to DistSender, so send something it can digest.
Expand Down
6 changes: 3 additions & 3 deletions pkg/kv/kvserver/replica_application_state_machine.go
Original file line number Diff line number Diff line change
Expand Up @@ -955,16 +955,16 @@ func (b *replicaAppBatch) Close() {
var raftClosedTimestampAssertionsEnabled = envutil.EnvOrDefaultBool("COCKROACH_RAFT_CLOSEDTS_ASSERTIONS_ENABLED", true)

// Assert that the current command is not writing under the closed timestamp.
// This check only applies to IntentWrite commands, since others (for example,
// EndTxn) can operate below the closed timestamp.
// This check only applies to certain write commands, mainly IsIntentWrite,
// since others (for example, EndTxn) can operate below the closed timestamp.
//
// Note that we check that we're we're writing under b.state.RaftClosedTimestamp
// (i.e. below the timestamp closed by previous commands), not below
// cmd.raftCmd.ClosedTimestamp. A command is allowed to write below the closed
// timestamp carried by itself; in other words cmd.raftCmd.ClosedTimestamp is a
// promise about future commands, not the command carrying it.
func (b *replicaAppBatch) assertNoWriteBelowClosedTimestamp(cmd *replicatedCmd) error {
if !cmd.IsLocal() || !cmd.proposal.Request.IsIntentWrite() {
if !cmd.IsLocal() || !cmd.proposal.Request.AppliesTimestampCache() {
return nil
}
if !raftClosedTimestampAssertionsEnabled {
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/replica_proposal.go
Original file line number Diff line number Diff line change
Expand Up @@ -841,7 +841,7 @@ func (r *Replica) evaluateProposal(
// Set the proposal's replicated result, which contains metadata and
// side-effects that are to be replicated to all replicas.
res.Replicated.IsLeaseRequest = ba.IsLeaseRequest()
if ba.IsIntentWrite() {
if ba.AppliesTimestampCache() {
res.Replicated.WriteTimestamp = ba.WriteTimestamp()
} else {
// For misc requests, use WriteTimestamp to propagate a clock signal. This
Expand Down
5 changes: 3 additions & 2 deletions pkg/kv/kvserver/replica_proposal_buf.go
Original file line number Diff line number Diff line change
Expand Up @@ -475,7 +475,7 @@ func (b *propBuf) FlushLockedWithRaftGroup(
b.p.registerProposalLocked(p)

// Exit the tracker.
if !reproposal && p.Request.IsIntentWrite() {
if !reproposal && p.Request.AppliesTimestampCache() {
// Sanity check that the request is tracked by the evaluation tracker at
// this point. It's supposed to be tracked until the
// doneIfNotMovedLocked() call below.
Expand Down Expand Up @@ -671,7 +671,8 @@ func (b *propBuf) allocateLAIAndClosedTimestampLocked(
// evaluating (instead, assignedClosedTimestamp was supposed to have bumped
// the write timestamp of any request the began evaluating after it was
// set).
if p.Request.WriteTimestamp().Less(b.assignedClosedTimestamp) && p.Request.IsIntentWrite() {
if p.Request.WriteTimestamp().Less(b.assignedClosedTimestamp) &&
p.Request.AppliesTimestampCache() {
log.Fatalf(ctx, "%v", errorutil.UnexpectedWithIssueErrorf(72428,
"attempting to propose command writing below closed timestamp. "+
"wts: %s < assigned closed: %s; ba: %s; lease: %s.",
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/replica_tscache.go
Original file line number Diff line number Diff line change
Expand Up @@ -300,7 +300,7 @@ func (r *Replica) applyTimestampCache(

for _, union := range ba.Requests {
args := union.GetInner()
if roachpb.IsIntentWrite(args) {
if roachpb.AppliesTimestampCache(args) {
header := args.Header()

// Forward the timestamp if there's been a more recent read (by someone else).
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/replica_write.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ func (r *Replica) executeWriteBatch(
// starts being tracked after we apply the timestamp cache.
var minTS hlc.Timestamp
var tok TrackedRequestToken
if ba.IsIntentWrite() {
if ba.AppliesTimestampCache() {
minTS, tok = r.mu.proposalBuf.TrackEvaluatingRequest(ctx, ba.WriteTimestamp())
}
defer tok.DoneIfNotMoved(ctx)
Expand Down
24 changes: 18 additions & 6 deletions pkg/roachpb/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ const (
isPrefix // requests which, in a batch, must not be split from the following request
isUnsplittable // range command that must not be split during sending
skipsLeaseCheck // commands which skip the check that the evaluating replica has a valid lease
appliesTSCache // commands which apply the timestamp cache and closed timestamp
updatesTSCache // commands which update the timestamp cache
updatesTSCacheOnErr // commands which make read data available on errors
needsRefresh // commands which require refreshes to avoid serializable retries
Expand All @@ -97,6 +98,7 @@ var flagDependencies = map[flag][]flag{
isAdmin: {isAlone},
isLocking: {isTxn},
isIntentWrite: {isWrite, isLocking},
appliesTSCache: {isWrite},
skipsLeaseCheck: {isAlone},
}

Expand Down Expand Up @@ -164,6 +166,13 @@ func IsRange(args Request) bool {
return (args.flags() & isRange) != 0
}

// AppliesTimestampCache returns whether the command is a write that applies the
// timestamp cache (and closed timestamp), possibly pushing its write timestamp
// into the future to avoid re-writing history.
func AppliesTimestampCache(args Request) bool {
return (args.flags() & appliesTSCache) != 0
}

// UpdatesTimestampCache returns whether the command must update
// the timestamp cache in order to set a low water mark for the
// timestamp at which mutations to overlapping key(s) can write
Expand Down Expand Up @@ -1188,7 +1197,7 @@ func (gr *GetRequest) flags() flag {
}

func (*PutRequest) flags() flag {
return isWrite | isTxn | isLocking | isIntentWrite | canBackpressure
return isWrite | isTxn | isLocking | isIntentWrite | appliesTSCache | canBackpressure
}

// ConditionalPut effectively reads without writing if it hits a
Expand All @@ -1197,7 +1206,8 @@ func (*PutRequest) flags() flag {
// they return an error immediately instead of continuing a serializable
// transaction to be retried at end transaction.
func (*ConditionalPutRequest) flags() flag {
return isRead | isWrite | isTxn | isLocking | isIntentWrite | updatesTSCache | updatesTSCacheOnErr | canBackpressure
return isRead | isWrite | isTxn | isLocking | isIntentWrite |
appliesTSCache | updatesTSCache | updatesTSCacheOnErr | canBackpressure
}

// InitPut, like ConditionalPut, effectively reads without writing if it hits a
Expand All @@ -1206,7 +1216,8 @@ func (*ConditionalPutRequest) flags() flag {
// return an error immediately instead of continuing a serializable transaction
// to be retried at end transaction.
func (*InitPutRequest) flags() flag {
return isRead | isWrite | isTxn | isLocking | isIntentWrite | updatesTSCache | updatesTSCacheOnErr | canBackpressure
return isRead | isWrite | isTxn | isLocking | isIntentWrite |
appliesTSCache | updatesTSCache | updatesTSCacheOnErr | canBackpressure
}

// Increment reads the existing value, but always leaves an intent so
Expand All @@ -1215,11 +1226,11 @@ func (*InitPutRequest) flags() flag {
// error immediately instead of continuing a serializable transaction
// to be retried at end transaction.
func (*IncrementRequest) flags() flag {
return isRead | isWrite | isTxn | isLocking | isIntentWrite | canBackpressure
return isRead | isWrite | isTxn | isLocking | isIntentWrite | appliesTSCache | canBackpressure
}

func (*DeleteRequest) flags() flag {
return isWrite | isTxn | isLocking | isIntentWrite | canBackpressure
return isWrite | isTxn | isLocking | isIntentWrite | appliesTSCache | canBackpressure
}

func (drr *DeleteRangeRequest) flags() flag {
Expand All @@ -1246,7 +1257,8 @@ func (drr *DeleteRangeRequest) flags() flag {
// it. Note that, even if we didn't update the ts cache, deletes of keys
// that exist would not be lost (since the DeleteRange leaves intents on
// those keys), but deletes of "empty space" would.
return isRead | isWrite | isTxn | isLocking | isIntentWrite | isRange | updatesTSCache | needsRefresh | canBackpressure
return isRead | isWrite | isTxn | isLocking | isIntentWrite | isRange |
appliesTSCache | updatesTSCache | needsRefresh | canBackpressure
}

// Note that ClearRange commands cannot be part of a transaction as
Expand Down
7 changes: 7 additions & 0 deletions pkg/roachpb/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,13 @@ func (ba *BatchRequest) IsLeaseRequest() bool {
return ok
}

// AppliesTimestampCache returns whether the command is a write that applies the
// timestamp cache (and closed timestamp), possibly pushing its write timestamp
// into the future to avoid re-writing history.
func (ba *BatchRequest) AppliesTimestampCache() bool {
return ba.hasFlag(appliesTSCache)
}

// IsAdmin returns true iff the BatchRequest contains an admin request.
func (ba *BatchRequest) IsAdmin() bool {
return ba.hasFlag(isAdmin)
Expand Down

0 comments on commit 529b785

Please sign in to comment.