Skip to content

Commit

Permalink
kvserver: even more closed ts assertions
Browse files Browse the repository at this point in the history
This patch adds historical information to the assertion against closed
timestamp regressions. We've seen that assertion fire in cockroachdb#61981.
The replica now maintains info about what command last bumped the
ClosedTimestamp.

Release note: None
  • Loading branch information
andreimatei committed Apr 21, 2021
1 parent a175614 commit cf7875b
Show file tree
Hide file tree
Showing 2 changed files with 71 additions and 2 deletions.
3 changes: 3 additions & 0 deletions pkg/kv/kvserver/replica.go
Original file line number Diff line number Diff line change
Expand Up @@ -568,6 +568,9 @@ type Replica struct {
failureToGossipSystemConfig bool

tenantID roachpb.TenantID // Set when first initialized, not modified after

// Historical information about the command that set the closed timestamp.
closedTimestampSetter closedTimestampSetterInfo
}

rangefeedMu struct {
Expand Down
70 changes: 68 additions & 2 deletions pkg/kv/kvserver/replica_application_state_machine.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (

"github.com/cockroachdb/cockroach/pkg/clusterversion"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/apply"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/closedts/ctpb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb"
"github.com/cockroachdb/cockroach/pkg/roachpb"
Expand All @@ -25,8 +26,10 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/envutil"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/protoutil"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/errors"
"github.com/cockroachdb/redact"
"github.com/kr/pretty"
"go.etcd.io/etcd/raft/v3"
"go.etcd.io/etcd/raft/v3/raftpb"
Expand Down Expand Up @@ -357,6 +360,7 @@ func (sm *replicaStateMachine) NewBatch(ephemeral bool) apply.Batch {
b.state = r.mu.state
b.state.Stats = &b.stats
*b.state.Stats = *r.mu.state.Stats
b.closedTimestampSetter = r.mu.closedTimestampSetter
r.mu.RUnlock()
b.start = timeutil.Now()
return b
Expand All @@ -379,6 +383,9 @@ type replicaAppBatch struct {
// under the Replica.mu when the batch is initialized and is updated in
// stageTrivialReplicatedEvalResult.
state kvserverpb.ReplicaState
// closedTimestampSetter maintains historical information about the
// advancement of the closed timestamp.
closedTimestampSetter closedTimestampSetterInfo
// stats is stored on the application batch to avoid an allocation in
// tracking the batch's view of replicaState. All pointer fields in
// replicaState other than Stats are overwritten completely rather than
Expand Down Expand Up @@ -827,6 +834,7 @@ func (b *replicaAppBatch) stageTrivialReplicatedEvalResult(
}
if cts := cmd.raftCmd.ClosedTimestamp; cts != nil && !cts.IsEmpty() {
b.state.RaftClosedTimestamp = *cts
b.closedTimestampSetter.record(cmd, b.state.Lease)
if clockTS, ok := cts.TryToClockTimestamp(); ok {
b.maxTS.Forward(clockTS)
}
Expand Down Expand Up @@ -898,6 +906,7 @@ func (b *replicaAppBatch) ApplyToStateMachine(ctx context.Context) error {
"raft closed timestamp regression; replica has: %s, new batch has: %s.",
existingClosed.String(), newClosed.String())
}
r.mu.closedTimestampSetter = b.closedTimestampSetter

closedTimestampUpdated := r.mu.state.RaftClosedTimestamp.Forward(b.state.RaftClosedTimestamp)
prevStats := *r.mu.state.Stats
Expand Down Expand Up @@ -1067,9 +1076,18 @@ func (b *replicaAppBatch) assertNoCmdClosedTimestampRegression(cmd *replicatedCm
} else {
req = "<unknown; not leaseholder>"
}
var prevReq redact.StringBuilder
if req := b.closedTimestampSetter.leaseReq; req != nil {
prevReq.Printf("lease acquisition: %s (prev: %s)", req.Lease, req.PrevLease)
} else {
prevReq.SafeString("<unknown; not leaseholder or not lease request>")
}

return errors.AssertionFailedf(
"raft closed timestamp regression in cmd: %x; batch state: %s, command: %s, lease: %s, req: %s",
cmd.idKey, existingClosed.String(), newClosed.String(), b.state.Lease, req)
"raft closed timestamp regression in cmd: %x; batch state: %s, command: %s, lease: %s, req: %s, applying at LAI: %d.\n"+
"Closed timestamp was set by req: %s under lease: %s; applied at LAI: %d. Batch idx: %d.",
cmd.idKey, existingClosed.String(), newClosed.String(), b.state.Lease, req, cmd.leaseIndex,
prevReq, b.closedTimestampSetter.lease, b.closedTimestampSetter.leaseIdx, b.entries)
}
return nil
}
Expand Down Expand Up @@ -1322,3 +1340,51 @@ func (sm *replicaStateMachine) moveStats() applyCommittedEntriesStats {
sm.stats = applyCommittedEntriesStats{}
return stats
}

// closedTimestampSetterInfo contains information about the command that last
// bumped the closed timestamp.
type closedTimestampSetterInfo struct {
// lease represents the lease under which the command is being applied.
lease *roachpb.Lease
// leaseIdx is the LAI of the command.
leaseIdx ctpb.LAI
// leaseReq is set if the request that generated this command was a
// RequestLeaseRequest. This is only ever set on the leaseholder replica since
// only the leaseholder has information about the request corresponding to a
// command.
// NOTE: We only keep track of lease requests because keeping track of all
// requests would be too expensive: cloning the request is expensive and also
// requests can be large in memory.
leaseReq *roachpb.RequestLeaseRequest
// split and merge are set if the request was an EndTxn with the respective
// commit trigger set.
split, merge bool
}

// record saves information about the command that update's the replica's closed
// timestamp.
func (s *closedTimestampSetterInfo) record(cmd *replicatedCmd, lease *roachpb.Lease) {
*s = closedTimestampSetterInfo{}
s.leaseIdx = ctpb.LAI(cmd.leaseIndex)
s.lease = lease
if !cmd.IsLocal() {
return
}
req := cmd.proposal.Request
et, ok := req.GetArg(roachpb.EndTxn)
if ok {
endTxn := et.(*roachpb.EndTxnRequest)
if trig := endTxn.InternalCommitTrigger; trig != nil {
if trig.SplitTrigger != nil {
s.split = true
} else if trig.MergeTrigger != nil {
s.merge = true
}
}
} else if req.IsLeaseRequest() {
// Make a deep copy since we're not allowed to hold on to request
// memory.
lr, _ := req.GetArg(roachpb.RequestLease)
s.leaseReq = protoutil.Clone(lr).(*roachpb.RequestLeaseRequest)
}
}

0 comments on commit cf7875b

Please sign in to comment.