Merge #76841
76841: kvserver: consistency checker cleanups, pt 1 r=erikgrinaker a=tbg

Thinking through our recent problems with the pile-up of sha512 computations again,
I would like to touch up this code somewhat to give the "waiter" the option to
cancel the inflight computation when it gives up waiting. I realized a bit of
preliminary clean-up was in order before I do that. There might be more
clean-up after this PR.

Release note: None


Co-authored-by: Tobias Grieger <[email protected]>
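
(Editorial aside: the follow-up the message describes — letting the waiter cancel the in-flight computation when it gives up — is not implemented in this PR. As a rough, hypothetical sketch of that pattern in plain Go — computeWithCancel and computeResult are invented names, not kvserver APIs — it might look like this:)

package main

import (
	"context"
	"fmt"
	"time"
)

// computeResult carries the outcome of a long-running computation.
type computeResult struct {
	sum []byte
	err error
}

// computeWithCancel runs compute in a goroutine and waits for its result,
// tearing the computation down when the waiter's context expires. The
// computation must observe ctx to actually stop early.
func computeWithCancel(ctx context.Context, compute func(context.Context) ([]byte, error)) ([]byte, error) {
	// Derive a context so that abandoning the wait also cancels the work.
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()

	ch := make(chan computeResult, 1)
	go func() {
		sum, err := compute(ctx)
		ch <- computeResult{sum: sum, err: err}
	}()

	select {
	case res := <-ch:
		return res.sum, res.err
	case <-ctx.Done():
		// The waiter gave up; the deferred cancel stops the computation.
		return nil, ctx.Err()
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
	defer cancel()

	_, err := computeWithCancel(ctx, func(ctx context.Context) ([]byte, error) {
		select {
		case <-time.After(time.Second): // simulate a slow sha512 pass
			return []byte("checksum"), nil
		case <-ctx.Done():
			return nil, ctx.Err()
		}
	})
	fmt.Println(err) // context deadline exceeded
}

(The buffered channel lets the abandoned goroutine complete its send without blocking, while the derived cancel tells the computation itself to stop early.)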
craig[bot] and tbg committed Feb 22, 2022
2 parents a6064fe + d8dd723 commit fba04e9
Showing 6 changed files with 155 additions and 156 deletions.
9 changes: 3 additions & 6 deletions pkg/kv/kvserver/batcheval/cmd_compute_checksum.go
@@ -46,12 +46,9 @@ func declareKeysComputeChecksum(
latchSpans.AddNonMVCC(spanset.SpanReadOnly, roachpb.Span{Key: keys.RangeDescriptorKey(rs.GetStartKey())})
}

// Version numbers for Replica checksum computation. Requests silently no-op
// unless the versions are compatible.
const (
ReplicaChecksumVersion = 4
ReplicaChecksumGCInterval = time.Hour
)
// ReplicaChecksumVersion versions the checksum computation. Requests silently no-op
// unless the versions between the requesting and requested replicas are compatible.
const ReplicaChecksumVersion = 4

// ComputeChecksum starts the process of computing a checksum on the replica at
// a particular snapshot. The checksum is later verified through a
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/replica.go
@@ -557,7 +557,7 @@ type Replica struct {
lastUpdateTimes lastUpdateTimesMap

// Computed checksum at a snapshot UUID.
checksums map[uuid.UUID]ReplicaChecksum
checksums map[uuid.UUID]replicaChecksum

// proposalQuota is the quota pool maintained by the lease holder where
// incoming writes acquire quota from a fixed quota pool before going
155 changes: 147 additions & 8 deletions pkg/kv/kvserver/replica_consistency.go
@@ -19,15 +19,18 @@ import (
"sync"
"time"

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/batcheval"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/rditer"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/stateloader"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/rpc"
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/storage/enginepb"
"github.com/cockroachdb/cockroach/pkg/storage/fs"
"github.com/cockroachdb/cockroach/pkg/util/bufalloc"
"github.com/cockroachdb/cockroach/pkg/util/envutil"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
@@ -40,6 +43,14 @@ import (
"github.com/cockroachdb/redact"
)

// How long to keep consistency checker checksums in-memory for collection.
// Typically a long-poll waits for the result of the computation, so it's almost
// immediately collected. However, the consistency checker synchronously
// collects the first replica's checksum before all others, so if the first one
// is slow the checksum may not be collected right away, and that first
// consistency check can take a long time due to rate limiting and range size.
const replicaChecksumGCInterval = time.Hour

// fatalOnStatsMismatch, if true, turns stats mismatches into fatal errors. A
// stats mismatch is the event in which
// - the consistency checker finds that all replicas are consistent
@@ -54,8 +65,8 @@ import (
// know old CRDB versions (<19.1 at time of writing) were not involved.
var fatalOnStatsMismatch = envutil.EnvOrDefaultBool("COCKROACH_ENFORCE_CONSISTENT_STATS", false)

// ReplicaChecksum contains progress on a replica checksum computation.
type ReplicaChecksum struct {
// replicaChecksum contains progress on a replica checksum computation.
type replicaChecksum struct {
CollectChecksumResponse
// started is true if the checksum computation has started.
started bool
@@ -419,10 +430,19 @@ func (r *Replica) RunConsistencyCheck(
return results, nil
}

func (r *Replica) gcOldChecksumEntriesLocked(now time.Time) {
for id, val := range r.mu.checksums {
// The timestamp is valid only if set.
if !val.gcTimestamp.IsZero() && now.After(val.gcTimestamp) {
delete(r.mu.checksums, id)
}
}
}

// getChecksum waits for the result of ComputeChecksum and returns it.
// It returns false if there is no checksum being computed for the id,
// or it has already been GCed.
func (r *Replica) getChecksum(ctx context.Context, id uuid.UUID) (ReplicaChecksum, error) {
func (r *Replica) getChecksum(ctx context.Context, id uuid.UUID) (replicaChecksum, error) {
now := timeutil.Now()
r.mu.Lock()
r.gcOldChecksumEntriesLocked(now)
@@ -442,14 +462,14 @@ func (r *Replica) getChecksum(ctx context.Context, id uuid.UUID) (ReplicaChecksu
// Wait for the checksum to compute or at least to start.
computed, err := r.checksumInitialWait(ctx, id, c.notify)
if err != nil {
return ReplicaChecksum{}, err
return replicaChecksum{}, err
}
// If the checksum started, but has not completed, commit
// to waiting the full deadline.
if !computed {
_, err = r.checksumWait(ctx, id, c.notify, nil)
if err != nil {
return ReplicaChecksum{}, err
return replicaChecksum{}, err
}
}

@@ -463,7 +483,7 @@ func (r *Replica) getChecksum(ctx context.Context, id uuid.UUID) (ReplicaChecksu
// The latter case can occur when there's a version mismatch or, more generally,
// when the (async) checksum computation fails.
if !ok || c.Checksum == nil {
return ReplicaChecksum{}, errors.Errorf("no checksum found (ID = %s)", id)
return replicaChecksum{}, errors.Errorf("no checksum found (ID = %s)", id)
}
return c, nil
}
@@ -539,7 +559,7 @@ func (r *Replica) computeChecksumDone(
c.Delta = enginepb.MVCCStatsDelta(delta)
c.Persisted = result.PersistedMS
}
c.gcTimestamp = timeutil.Now().Add(batcheval.ReplicaChecksumGCInterval)
c.gcTimestamp = timeutil.Now().Add(replicaChecksumGCInterval)
c.Snapshot = snapshot
r.mu.checksums[id] = c
// Notify
@@ -559,7 +579,7 @@ type replicaHash struct {

// sha512 computes the SHA512 hash of all the replica data at the snapshot.
// It will dump all the kv data into snapshot if it is provided.
func (r *Replica) sha512(
func (*Replica) sha512(
ctx context.Context,
desc roachpb.RangeDescriptor,
snap storage.Reader,
@@ -684,3 +704,122 @@ func (r *Replica) sha512(

return &result, nil
}

func (r *Replica) computeChecksumPostApply(ctx context.Context, cc kvserverpb.ComputeChecksum) {
stopper := r.store.Stopper()
now := timeutil.Now()
r.mu.Lock()
var notify chan struct{}
if c, ok := r.mu.checksums[cc.ChecksumID]; !ok {
// There is no record of this ID. Make a new notification.
notify = make(chan struct{})
} else if !c.started {
// A CollectChecksumRequest is waiting on the existing notification.
notify = c.notify
} else {
log.Fatalf(ctx, "attempted to apply ComputeChecksum command with duplicated checksum ID %s",
cc.ChecksumID)
}

r.gcOldChecksumEntriesLocked(now)

// Create an entry with checksum == nil and gcTimestamp unset.
r.mu.checksums[cc.ChecksumID] = replicaChecksum{started: true, notify: notify}
desc := *r.mu.state.Desc
r.mu.Unlock()

if cc.Version != batcheval.ReplicaChecksumVersion {
r.computeChecksumDone(ctx, cc.ChecksumID, nil, nil)
log.Infof(ctx, "incompatible ComputeChecksum versions (requested: %d, have: %d)",
cc.Version, batcheval.ReplicaChecksumVersion)
return
}

// Caller is holding raftMu, so an engine snapshot is automatically
// Raft-consistent (i.e. not in the middle of an AddSSTable).
snap := r.store.engine.NewSnapshot()
if cc.Checkpoint {
sl := stateloader.Make(r.RangeID)
as, err := sl.LoadRangeAppliedState(ctx, snap)
if err != nil {
log.Warningf(ctx, "unable to load applied index, continuing anyway")
}
// NB: the names here will match on all nodes, which is nice for debugging.
tag := fmt.Sprintf("r%d_at_%d", r.RangeID, as.RaftAppliedIndex)
if dir, err := r.store.checkpoint(ctx, tag); err != nil {
log.Warningf(ctx, "unable to create checkpoint %s: %+v", dir, err)
} else {
log.Warningf(ctx, "created checkpoint %s", dir)
}
}

// Compute SHA asynchronously and store it in a map by UUID.
// Don't use the proposal's context for this, as it is likely to be canceled very
// soon.
if err := stopper.RunAsyncTask(r.AnnotateCtx(context.Background()), "storage.Replica: computing checksum", func(ctx context.Context) {
func() {
defer snap.Close()
var snapshot *roachpb.RaftSnapshotData
if cc.SaveSnapshot {
snapshot = &roachpb.RaftSnapshotData{}
}

result, err := r.sha512(ctx, desc, snap, snapshot, cc.Mode, r.store.consistencyLimiter)
if err != nil {
log.Errorf(ctx, "%v", err)
result = nil
}
r.computeChecksumDone(ctx, cc.ChecksumID, result, snapshot)
}()

var shouldFatal bool
for _, rDesc := range cc.Terminate {
if rDesc.StoreID == r.store.StoreID() && rDesc.ReplicaID == r.replicaID {
shouldFatal = true
}
}

if shouldFatal {
// This node should fatal as a result of a previous consistency
// check (i.e. this round is carried out only to obtain a diff).
// If we fatal too early, the diff won't make it back to the lease-
// holder and thus won't be printed to the logs. Since we're already
// in a goroutine that's about to end, simply sleep for a few seconds
// and then terminate.
auxDir := r.store.engine.GetAuxiliaryDir()
_ = r.store.engine.MkdirAll(auxDir)
path := base.PreventedStartupFile(auxDir)

const attentionFmt = `ATTENTION:
this node is terminating because a replica inconsistency was detected between %s
and its other replicas. Please check your cluster-wide log files for more
information and contact the CockroachDB support team. It is not necessarily safe
to replace this node; cluster data may still be at risk of corruption.
A checkpoints directory to aid (expert) debugging should be present in:
%s
A file preventing this node from restarting was placed at:
%s
`
preventStartupMsg := fmt.Sprintf(attentionFmt, r, auxDir, path)
if err := fs.WriteFile(r.store.engine, path, []byte(preventStartupMsg)); err != nil {
log.Warningf(ctx, "%v", err)
}

if p := r.store.cfg.TestingKnobs.ConsistencyTestingKnobs.OnBadChecksumFatal; p != nil {
p(*r.store.Ident)
} else {
time.Sleep(10 * time.Second)
log.Fatalf(r.AnnotateCtx(context.Background()), attentionFmt, r, auxDir, path)
}
}

}); err != nil {
defer snap.Close()
log.Errorf(ctx, "could not run async checksum computation (ID = %s): %v", cc.ChecksumID, err)
// Set checksum to nil.
r.computeChecksumDone(ctx, cc.ChecksumID, nil, nil)
}
}
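
(Editorial aside: the notify-channel handshake above — computeChecksumPostApply registers an entry and computeChecksumDone closes c.notify, while getChecksum selects on it — boils down to a standard Go pattern. A minimal, self-contained sketch of that pattern with hypothetical registry/entry types, not the kvserver types:)

package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
	"time"
)

// entry tracks one in-flight computation: a result slot and a channel
// that is closed exactly once after the result has been stored.
type entry struct {
	result string
	notify chan struct{}
}

type registry struct {
	mu      sync.Mutex
	entries map[string]*entry
}

// start registers an ID and kicks off the computation asynchronously.
func (reg *registry) start(id string, compute func() string) {
	e := &entry{notify: make(chan struct{})}
	reg.mu.Lock()
	reg.entries[id] = e
	reg.mu.Unlock()

	go func() {
		e.result = compute() // write happens-before close(notify)
		close(e.notify)      // wake up all waiters
	}()
}

// wait blocks until the result for id is ready or the context expires.
func (reg *registry) wait(ctx context.Context, id string) (string, error) {
	reg.mu.Lock()
	e, ok := reg.entries[id]
	reg.mu.Unlock()
	if !ok {
		return "", errors.New("no computation registered")
	}
	select {
	case <-e.notify:
		return e.result, nil
	case <-ctx.Done():
		return "", ctx.Err()
	}
}

func main() {
	reg := &registry{entries: map[string]*entry{}}
	reg.start("id-1", func() string { time.Sleep(10 * time.Millisecond); return "deadbeef" })

	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()
	res, err := reg.wait(ctx, "id-1")
	fmt.Println(res, err)
}

(Closing the channel, rather than sending on it, is what lets any number of waiters — including late arrivals — observe completion.)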
6 changes: 3 additions & 3 deletions pkg/kv/kvserver/replica_consistency_test.go
@@ -77,7 +77,7 @@ func TestGetChecksumNotSuccessfulExitConditions(t *testing.T) {

// Simple condition, the checksum is notified, but not computed.
tc.repl.mu.Lock()
tc.repl.mu.checksums[id] = ReplicaChecksum{notify: notify}
tc.repl.mu.checksums[id] = replicaChecksum{notify: notify}
tc.repl.mu.Unlock()
rc, err := tc.repl.getChecksum(ctx, id)
if !testutils.IsError(err, "no checksum found") {
@@ -88,7 +88,7 @@ func TestGetChecksumNotSuccessfulExitConditions(t *testing.T) {
// this will take 10ms.
id = uuid.FastMakeV4()
tc.repl.mu.Lock()
tc.repl.mu.checksums[id] = ReplicaChecksum{notify: make(chan struct{})}
tc.repl.mu.checksums[id] = replicaChecksum{notify: make(chan struct{})}
tc.repl.mu.Unlock()
rc, err = tc.repl.getChecksum(ctx, id)
if !testutils.IsError(err, "checksum computation did not start") {
@@ -99,7 +99,7 @@ func TestGetChecksumNotSuccessfulExitConditions(t *testing.T) {
// so next step is for context deadline.
id = uuid.FastMakeV4()
tc.repl.mu.Lock()
tc.repl.mu.checksums[id] = ReplicaChecksum{notify: make(chan struct{}), started: true}
tc.repl.mu.checksums[id] = replicaChecksum{notify: make(chan struct{}), started: true}
tc.repl.mu.Unlock()
rc, err = tc.repl.getChecksum(ctx, id)
if !testutils.IsError(err, "context deadline exceeded") {
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/replica_init.go
@@ -100,7 +100,7 @@ func newUnloadedReplica(
return kvserverbase.SplitByLoadMergeDelay.Get(&store.cfg.Settings.SV)
})
r.mu.proposals = map[kvserverbase.CmdIDKey]*ProposalData{}
r.mu.checksums = map[uuid.UUID]ReplicaChecksum{}
r.mu.checksums = map[uuid.UUID]replicaChecksum{}
r.mu.proposalBuf.Init((*replicaProposer)(r), tracker.NewLockfreeTracker(), r.Clock(), r.ClusterSettings())
r.mu.proposalBuf.testing.allowLeaseProposalWhenNotLeader = store.cfg.TestingKnobs.AllowLeaseRequestProposalsWhenNotLeader
r.mu.proposalBuf.testing.dontCloseTimestamps = store.cfg.TestingKnobs.DontCloseTimestamps