Commit aa54820

kvserver: switch meaning of RER.Timestamp
The ReplicatedEvalResult.Timestamp used to carry the read timestamp of
the request that proposed the respective command. This patch renames it
to WriteTimestamp and sets it accordingly.

The field is used for two purposes: for bumping the clock on followers
so that they don't end up with values above their clock, and for checking
that we're not writing below the GC threshold. Both of these use cases
actually want the write timestamp, so they were broken. The second use
seems dubious to me (I don't think it's actually needed), but this patch
does not take a position on it beyond adding a TODO.
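
As an illustration of the clock-bumping use, a minimal sketch of what a
follower does with this field on application (variable names like cmd and
clock are hypothetical stand-ins; the real logic is in the replicaAppBatch
diff below):

    // Forward the local HLC clock to the command's write timestamp so that
    // this node never ends up holding values above its clock.
    // TryToClockTimestamp fails for synthetic timestamps, which don't need
    // to interact with the clock.
    if clockTS, ok := cmd.ReplicatedEvalResult.WriteTimestamp.TryToClockTimestamp(); ok {
        clock.Update(clockTS) // forward-only; never moves the clock backwards
    }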

Beyond fixing the existing uses of this field, putting the write
timestamp in every Raft command has the added benefit that we can use it
to assert below Raft that nobody tries to write below the closed
timestamp. Before this patch, only the leaseholder was able to do this
assertion (because it was the only replica that had access to the write
timestamp) and so the leaseholder had to do it as a non-deterministic
error (and non-deterministic errors are nasty, even when they're
assertion failures). The assertion is changed in the next commit.
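
A sketch of the shape such a deterministic assertion can take (hypothetical
code; the actual assertion lands in the next commit):

    // Every replica applies the same commands against the same state, so this
    // check fails either on all replicas or on none: a deterministic
    // assertion failure instead of a one-off error on the leaseholder.
    wts := cmd.ReplicatedEvalResult.WriteTimestamp
    if wts.LessEq(closedTS) {
        log.Fatalf(ctx, "command writing at %s below closed timestamp %s", wts, closedTS)
    }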

In addition, by clarifying the meaning of this field, this patch opens
the door for cockroachdb#62569 to muddy the field back and give it a special
meaning for lease transfers: the lease start time.

Also mentioning cockroachdb#55293 because this patch lightly touches on the
interaction between requests and the GC threshold.

Release note: None
andreimatei committed Apr 2, 2021
1 parent 832bc80 commit aa54820
Showing 9 changed files with 147 additions and 140 deletions.
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/client_raft_test.go
@@ -4702,7 +4702,7 @@ func TestAckWriteBeforeApplication(t *testing.T) {
 	blockPreApplication, blockPostApplication := make(chan struct{}), make(chan struct{})
 	applyFilterFn := func(ch chan struct{}) kvserverbase.ReplicaApplyFilter {
 		return func(filterArgs kvserverbase.ApplyFilterArgs) (int, *roachpb.Error) {
-			if atomic.LoadInt32(&filterActive) == 1 && filterArgs.Timestamp == magicTS {
+			if atomic.LoadInt32(&filterActive) == 1 && filterArgs.WriteTimestamp == magicTS {
 				<-ch
 			}
 			return 0, nil
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/client_replica_test.go
@@ -3341,7 +3341,7 @@ func TestProposalOverhead(t *testing.T) {
 		}
 		// Sometimes the logical portion of the timestamp can be non-zero, which makes
 		// the overhead non-deterministic.
-		args.Cmd.ReplicatedEvalResult.Timestamp.Logical = 0
+		args.Cmd.ReplicatedEvalResult.WriteTimestamp.Logical = 0
 		atomic.StoreUint32(&overhead, uint32(args.Cmd.Size()-args.Cmd.WriteBatch.Size()))
 		// We don't want to print the WriteBatch because it's explicitly
 		// excluded from the size computation. Nil'ing it out does not
244 changes: 122 additions & 122 deletions pkg/kv/kvserver/kvserverpb/proposer_kv.pb.go

Large diffs are not rendered by default.

8 changes: 4 additions & 4 deletions pkg/kv/kvserver/kvserverpb/proposer_kv.proto
@@ -125,10 +125,10 @@ message ReplicatedEvalResult {
   Merge merge = 4;
   ComputeChecksum compute_checksum = 21;
   bool is_lease_request = 6;
-  // The BatchRequest.Timestamp of the request that produced this command. Used
-  // to verify the validity of the command against the GC threshold and to
-  // update the followers' clocks.
-  util.hlc.Timestamp timestamp = 8 [(gogoproto.nullable) = false];
+  // The timestamp at which this command is writing. Used to verify the validity
+  // of the command against the GC threshold and to update the followers'
+  // clocks.
+  util.hlc.Timestamp write_timestamp = 8 [(gogoproto.nullable) = false];
   // The stats delta corresponding to the data in this WriteBatch. On
   // a split, contains only the contributions to the left-hand side.
   storage.enginepb.MVCCStats deprecated_delta = 10; // See #18828
4 changes: 2 additions & 2 deletions pkg/kv/kvserver/replica.go
@@ -1361,8 +1361,8 @@ func (r *Replica) checkSpanInRangeRLocked(ctx context.Context, rspan roachpb.RSpan
 	)
 }
 
-// checkTSAboveGCThresholdRLocked returns an error if a request (identified
-// by its MVCC timestamp) can be run on the replica.
+// checkTSAboveGCThresholdRLocked returns an error if a request (identified by
+// its read timestamp) wants to read below the range's GC threshold.
 func (r *Replica) checkTSAboveGCThresholdRLocked(
 	ts hlc.Timestamp, st kvserverpb.LeaseStatus, isAdmin bool,
 ) error {
4 changes: 2 additions & 2 deletions pkg/kv/kvserver/replica_application_result.go
@@ -67,7 +67,7 @@ func isTrivial(r *kvserverpb.ReplicatedEvalResult) bool {
 	// it is trivial.
 	allowlist := *r
 	allowlist.Delta = enginepb.MVCCStatsDelta{}
-	allowlist.Timestamp = hlc.Timestamp{}
+	allowlist.WriteTimestamp = hlc.Timestamp{}
 	allowlist.DeprecatedDelta = nil
 	allowlist.PrevLeaseProposal = nil
 	allowlist.State = nil
@@ -85,7 +85,7 @@ func clearTrivialReplicatedEvalResultFields(r *kvserverpb.ReplicatedEvalResult) {
 	// they don't trigger an assertion at the end of the application process
 	// (which checks that all fields were handled).
 	r.IsLeaseRequest = false
-	r.Timestamp = hlc.Timestamp{}
+	r.WriteTimestamp = hlc.Timestamp{}
 	r.PrevLeaseProposal = nil
 	// The state fields cleared here were already applied to the in-memory view of
 	// replica state for this batch.
15 changes: 9 additions & 6 deletions pkg/kv/kvserver/replica_application_state_machine.go
@@ -318,16 +318,19 @@ func checkForcedErr(
 		)
 	}
 
-	// Verify that the batch timestamp is after the GC threshold. This is
+	// Verify that the command is not trying to write below the GC threshold. This is
 	// necessary because not all commands declare read access on the GC
 	// threshold key, even though they implicitly depend on it. This means
 	// that access to this state will not be serialized by latching,
 	// so we must perform this check upstream and downstream of raft.
 	// See #14833.
-	ts := raftCmd.ReplicatedEvalResult.Timestamp
-	if ts.LessEq(*replicaState.GCThreshold) {
+	// TODO(andrei,nvanbenschoten,bdarnell): Is this check below-Raft actually
+	// necessary, given that we've checked at evaluation time that the request
+	// evaluates at a timestamp above the GC threshold? Does it actually matter if
+	// the GC threshold has advanced since then?
+	wts := raftCmd.ReplicatedEvalResult.WriteTimestamp
+	if wts.LessEq(*replicaState.GCThreshold) {
 		return leaseIndex, proposalNoReevaluation, roachpb.NewError(&roachpb.BatchTimestampBeforeGCError{
-			Timestamp: ts,
+			Timestamp: wts,
 			Threshold: *replicaState.GCThreshold,
 		})
 	}
@@ -484,7 +487,7 @@ func (b *replicaAppBatch) Stage(cmdI apply.Command) (apply.CheckedCommand, error) {
 	}
 
 	// Update the batch's max timestamp.
-	if clockTS, ok := cmd.replicatedResult().Timestamp.TryToClockTimestamp(); ok {
+	if clockTS, ok := cmd.replicatedResult().WriteTimestamp.TryToClockTimestamp(); ok {
 		b.maxTS.Forward(clockTS)
 	}
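
For context, b.maxTS accumulates the maximum clock timestamp across the
commands staged in the batch; once the batch is applied, the node's clock is
forwarded to it in a single step, roughly (a one-line sketch, not the full
method):

    // Bump the node's HLC once per applied batch rather than once per command.
    b.r.store.Clock().Update(b.maxTS)
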
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/replica_application_state_machine_test.go
@@ -123,7 +123,7 @@ func TestReplicaStateMachineChangeReplicas(t *testing.T) {
 			ReplicatedEvalResult: kvserverpb.ReplicatedEvalResult{
 				State:          &kvserverpb.ReplicaState{Desc: &newDesc},
 				ChangeReplicas: &kvserverpb.ChangeReplicas{ChangeReplicasTrigger: trigger},
-				Timestamp:      r.mu.state.GCThreshold.Add(1, 0),
+				WriteTimestamp: r.mu.state.GCThreshold.Add(1, 0),
 			},
 		},
 		confChange: &decodedConfChange{
6 changes: 5 additions & 1 deletion pkg/kv/kvserver/replica_proposal.go
@@ -748,6 +748,10 @@ func (r *Replica) evaluateProposal(
 	// Evaluate the commands. If this returns without an error, the batch should
 	// be committed. Note that we don't hold any locks at this point. This is
 	// important since evaluating a proposal is expensive.
+	//
+	// Note that, during evaluation, ba's read and write timestamps might get
+	// bumped (see evaluateWriteBatchWithServersideRefreshes).
+	//
 	// TODO(tschottdorf): absorb all returned values in `res` below this point
 	// in the call stack as well.
 	batch, ms, br, res, pErr := r.evaluateWriteBatch(ctx, idKey, ba, lul, latchSpans)
@@ -804,7 +808,7 @@ func (r *Replica) evaluateProposal(
 	// Set the proposal's replicated result, which contains metadata and
 	// side-effects that are to be replicated to all replicas.
 	res.Replicated.IsLeaseRequest = ba.IsLeaseRequest()
-	res.Replicated.Timestamp = ba.Timestamp
+	res.Replicated.WriteTimestamp = ba.WriteTimestamp()
 	res.Replicated.Delta = ms.ToStatsDelta()
 
 	// This is the result of a migration. See the field for more details.
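
For reference, ba.WriteTimestamp() resolves to the transaction's write
timestamp when the batch carries a transaction, and to the batch's own
timestamp otherwise. A sketch of the accessor's likely shape (assumed from
the roachpb.Header fields, not copied from the actual definition):

    // WriteTimestamp returns the timestamp at which the batch will write.
    func (h Header) WriteTimestamp() hlc.Timestamp {
        ts := h.Timestamp
        if h.Txn != nil {
            ts.Forward(h.Txn.WriteTimestamp)
        }
        return ts
    }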
