diff --git a/pkg/kv/kvserver/api.proto b/pkg/kv/kvserver/api.proto
index 9c50cd5ce62c..18453f87763c 100644
--- a/pkg/kv/kvserver/api.proto
+++ b/pkg/kv/kvserver/api.proto
@@ -36,15 +36,18 @@ message CollectChecksumRequest {
   bytes checksum_id = 3 [(gogoproto.nullable) = false,
       (gogoproto.customname) = "ChecksumID",
       (gogoproto.customtype) = "github.com/cockroachdb/cockroach/pkg/util/uuid.UUID"];
-  bytes checksum = 4;
+  reserved 4;
+  // If true, the response must include the snapshot of the data from which
+  // the checksum is computed.
+  bool with_snapshot = 5;
 }
 
 message CollectChecksumResponse {
   // The checksum is the sha512 hash of the requested computation. It is empty
   // if the computation failed.
   bytes checksum = 1;
-  // snapshot is set if the roachpb.ComputeChecksumRequest had snapshot = true
-  // and the response checksum is different from the request checksum.
+  // snapshot is set if with_snapshot in CollectChecksumRequest is true. For
+  // example, the caller sets it when it has detected an inconsistency.
   //
   // TODO(tschottdorf): with larger ranges, this is no longer tenable.
   // See https://github.com/cockroachdb/cockroach/issues/21128.
diff --git a/pkg/kv/kvserver/consistency_queue.go b/pkg/kv/kvserver/consistency_queue.go
index 771966df1a11..0a40db7aa21c 100644
--- a/pkg/kv/kvserver/consistency_queue.go
+++ b/pkg/kv/kvserver/consistency_queue.go
@@ -66,8 +66,9 @@ const consistencyCheckRateMinWait = 100 * time.Millisecond
 // replication factor of 7 could run 7 concurrent checks on every node.
 //
 // Note that checksum calculations below Raft are not tied to the caller's
-// context (especially on followers), and will continue to run even after the
-// caller has given up on them, which may cause them to build up.
+// context, and may continue to run even after the caller has given up on them,
+// which may cause them to build up, although we make a best-effort attempt to
+// cancel the running task on the receiving end when the request is aborted.
 //
 // CHECK_STATS checks do not count towards this limit, as they are cheap and the
 // DistSender will parallelize them across all ranges (notably when calling
@@ -76,8 +77,8 @@ const consistencyCheckAsyncConcurrency = 7
 
 // consistencyCheckAsyncTimeout is a below-Raft timeout for asynchronous
 // consistency check calculations. These are not tied to the caller's context,
-// and thus will continue to run even after the caller has given up on them, so
-// we give them an upper timeout to prevent them from running forever.
+// and thus may continue to run even if the caller has given up on them, so we
+// give them an upper timeout to prevent them from running forever.
 const consistencyCheckAsyncTimeout = time.Hour
 
 var testingAggressiveConsistencyChecks = envutil.EnvOrDefaultBool("COCKROACH_CONSISTENCY_AGGRESSIVE", false)
diff --git a/pkg/kv/kvserver/replica_consistency.go b/pkg/kv/kvserver/replica_consistency.go
index df02113a8a19..095759e708cd 100644
--- a/pkg/kv/kvserver/replica_consistency.go
+++ b/pkg/kv/kvserver/replica_consistency.go
@@ -15,7 +15,6 @@ import (
 	"crypto/sha512"
 	"encoding/binary"
 	"fmt"
-	"sort"
 	"sync"
 	"time"
 
@@ -47,10 +46,15 @@ import (
 
 // How long to keep consistency checker checksums in-memory for collection.
 // Typically a long-poll waits for the result of the computation, so it's almost
-// immediately collected. However, the consistency checker synchronously
-// collects the first replica's checksum before all others, so if the first one
-// is slow the checksum may not be collected right away, and that first
-// consistency check can take a long time due to rate limiting and range size.
+// immediately collected.
+//
+// Up to 22.1, the consistency check initiator collected the first replica's
+// checksum synchronously, before all others, so checksum collection requests
+// could arrive late if the first one was slow. Since 22.2, all requests are
+// issued in parallel and likely arrive promptly.
+//
+// TODO(pavelkalinnikov): Consider removing this GC behaviour in 23.1+, when
+// all incoming requests are from 22.2+ nodes (and hence arrive in time).
 const replicaChecksumGCInterval = time.Hour
 
 // fatalOnStatsMismatch, if true, turns stats mismatches into fatal errors. A
@@ -87,10 +91,10 @@ type replicaChecksum struct {
 // ComputeChecksum through Raft and then issues CollectChecksum commands to the
 // other replicas. These are inspected and a CheckConsistencyResponse is assembled.
 //
-// When args.Mode is CHECK_VIA_QUEUE and an inconsistency is detected and no
-// diff was requested, the consistency check will be re-run to collect a diff,
-// which is then printed before calling `log.Fatal`. This behavior should be
-// lifted to the consistency checker queue in the future.
+// When req.Mode is CHECK_VIA_QUEUE and an inconsistency is detected, the
+// consistency check will be re-run to collect a diff, which is then printed
+// before calling `log.Fatal`. This behavior should be lifted to the consistency
+// checker queue in the future.
 func (r *Replica) CheckConsistency(
 	ctx context.Context, req roachpb.CheckConsistencyRequest,
 ) (roachpb.CheckConsistencyResponse, *roachpb.Error) {
@@ -328,7 +332,7 @@ type ConsistencyCheckResult struct {
 }
 
 func (r *Replica) collectChecksumFromReplica(
-	ctx context.Context, replica roachpb.ReplicaDescriptor, id uuid.UUID, checksum []byte,
+	ctx context.Context, replica roachpb.ReplicaDescriptor, id uuid.UUID, withSnap bool,
 ) (CollectChecksumResponse, error) {
 	conn, err := r.store.cfg.NodeDialer.Dial(ctx, replica.NodeID, rpc.DefaultClass)
 	if err != nil {
@@ -340,7 +344,7 @@ func (r *Replica) collectChecksumFromReplica(
 		StoreRequestHeader: StoreRequestHeader{NodeID: replica.NodeID, StoreID: replica.StoreID},
 		RangeID:            r.RangeID,
 		ChecksumID:         id,
-		Checksum:           checksum,
+		WithSnapshot:       withSnap,
 	}
 	resp, err := client.CollectChecksum(ctx, req)
 	if err != nil {
@@ -364,70 +368,61 @@ func (r *Replica) runConsistencyCheck(
 	}
 	ccRes := res.(*roachpb.ComputeChecksumResponse)
 
-	var orderedReplicas []roachpb.ReplicaDescriptor
-	{
-		desc := r.Desc()
-		localReplica, err := r.GetReplicaDescriptor()
-		if err != nil {
-			return nil, errors.Wrap(err, "could not get replica descriptor")
-		}
-
-		// Move the local replica to the front (which makes it the "master"
-		// we're comparing against).
-		orderedReplicas = append(orderedReplicas, desc.Replicas().Descriptors()...)
-
-		sort.Slice(orderedReplicas, func(i, j int) bool {
-			return orderedReplicas[i] == localReplica
-		})
+	replSet := r.Desc().Replicas()
+	localReplica, found := replSet.GetReplicaDescriptorByID(r.replicaID)
+	if !found {
+		return nil, errors.New("could not get local replica descriptor")
 	}
+	replicas := replSet.Descriptors()
+
+	resultCh := make(chan ConsistencyCheckResult, len(replicas))
+	results := make([]ConsistencyCheckResult, 0, len(replicas))
 
-	resultCh := make(chan ConsistencyCheckResult, len(orderedReplicas))
-	var results []ConsistencyCheckResult
 	var wg sync.WaitGroup
+	ctx, cancel := context.WithCancel(ctx)
+
+	defer close(resultCh) // close the channel when
+	defer wg.Wait()       // writers have terminated
+	defer cancel()        // but cancel them first
+	// P.S. Have you noticed the Haiku?
 
-	for _, replica := range orderedReplicas {
+	for _, replica := range replicas {
 		wg.Add(1)
 		replica := replica // per-iteration copy for the goroutine
 		if err := r.store.Stopper().RunAsyncTask(ctx, "storage.Replica: checking consistency",
 			func(ctx context.Context) {
 				defer wg.Done()
-
-				var masterChecksum []byte
-				if len(results) > 0 {
-					masterChecksum = results[0].Response.Checksum
-				}
-				resp, err := r.collectChecksumFromReplica(ctx, replica, ccRes.ChecksumID, masterChecksum)
+				resp, err := r.collectChecksumFromReplica(ctx, replica, ccRes.ChecksumID, req.Snapshot)
 				resultCh <- ConsistencyCheckResult{
 					Replica:  replica,
 					Response: resp,
 					Err:      err,
 				}
-			}); err != nil {
+			},
+		); err != nil {
+			// If we can't start tasks, the node is likely draining. Return the error
+			// verbatim, once the already started tasks have stopped.
 			wg.Done()
-			// If we can't start tasks, the node is likely draining. Just return the error verbatim.
 			return nil, err
 		}
+	}
 
-		// Collect the master result eagerly so that we can send a SHA in the
-		// remaining requests (this is used for logging inconsistencies on the
-		// remote nodes only).
-		if len(results) == 0 {
-			wg.Wait()
-			result := <-resultCh
+	// Collect the results from all replicas while the tasks are running.
+	for result := range resultCh {
+		results = append(results, result)
+		if result.Replica.IsSame(localReplica) {
+			// If we can't compute the local checksum, give up. This will cancel all
+			// the outstanding requests, and wait for the tasks above to terminate.
 			if err := result.Err; err != nil {
-				// If we can't compute the local checksum, give up.
 				return nil, errors.Wrap(err, "computing own checksum")
 			}
-			results = append(results, result)
+			// Put the local replica first in the list.
+			results[0], results[len(results)-1] = results[len(results)-1], results[0]
+		}
+		// If it was the last request, don't wait on the channel anymore.
+		if len(results) == len(replicas) {
+			break
 		}
-	}
-
-	wg.Wait()
-	close(resultCh)
-
-	// Collect the remaining results.
-	for result := range resultCh {
-		results = append(results, result)
 	}
 
 	return results, nil
diff --git a/pkg/kv/kvserver/stores_server.go b/pkg/kv/kvserver/stores_server.go
index d49f21f74b2d..f3154aca6842 100644
--- a/pkg/kv/kvserver/stores_server.go
+++ b/pkg/kv/kvserver/stores_server.go
@@ -11,7 +11,6 @@
 package kvserver
 
 import (
-	"bytes"
 	"context"
 	"time"
 
@@ -19,7 +18,6 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/storage"
 	"github.com/cockroachdb/cockroach/pkg/util/log"
 	"github.com/cockroachdb/cockroach/pkg/util/retry"
-	"github.com/cockroachdb/redact"
 )
 
 // Server implements PerReplicaServer.
@@ -53,7 +51,7 @@ func (is Server) execStoreCommand(
 func (is Server) CollectChecksum(
 	ctx context.Context, req *CollectChecksumRequest,
 ) (*CollectChecksumResponse, error) {
-	resp := &CollectChecksumResponse{}
+	var resp *CollectChecksumResponse
 	err := is.execStoreCommand(ctx, req.StoreRequestHeader,
 		func(ctx context.Context, s *Store) error {
 			ctx, cancel := s.stopper.WithCancelOnQuiesce(ctx)
@@ -66,17 +64,7 @@ func (is Server) CollectChecksum(
 			if err != nil {
 				return err
 			}
-			if !bytes.Equal(req.Checksum, ccr.Checksum) {
-				// If this check is false, then this request is the replica carrying out
-				// the consistency check. The message is spurious, but we want to leave the
-				// snapshot (if present) intact.
-				if len(req.Checksum) > 0 {
-					log.Errorf(ctx, "consistency check failed on range r%d: expected checksum %x, got %x",
-						req.RangeID, redact.Safe(req.Checksum), redact.Safe(ccr.Checksum))
-					// Leave resp.Snapshot alone so that the caller will receive what's
-					// in it (if anything).
-				}
-			} else {
+			if !req.WithSnapshot {
 				ccr.Snapshot = nil
 			}
 			resp = &ccr
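
Reviewer note: the heart of this change is the fan-out/collect pattern in runConsistencyCheck. The following is a minimal, self-contained Go sketch of the same shape, for experimenting with the shutdown ordering outside the tree. All names here (collectAll, fakeCollect, result) are hypothetical stand-ins, not CockroachDB APIs; the deferred cancel/wait/close triple mirrors the "Haiku" in the hunk above, and the channel is buffered to capacity so writers never block once the collector returns early.

package main

import (
	"context"
	"errors"
	"fmt"
	"math/rand"
	"sync"
	"time"
)

type result struct {
	id  int
	err error
}

// collectAll fans out one task per id and drains results while the tasks run.
// If the "local" task (id 0) fails, it returns early; the deferred cancel
// stops the outstanding tasks, wg.Wait lets the writers finish, and only then
// is the channel closed.
func collectAll(ctx context.Context, ids []int) ([]result, error) {
	resultCh := make(chan result, len(ids)) // buffered: writers never block
	results := make([]result, 0, len(ids))

	var wg sync.WaitGroup
	ctx, cancel := context.WithCancel(ctx)

	defer close(resultCh) // close the channel when
	defer wg.Wait()       // writers have terminated
	defer cancel()        // but cancel them first

	for _, id := range ids {
		wg.Add(1)
		id := id // per-iteration copy for the goroutine
		go func() {
			defer wg.Done()
			err := fakeCollect(ctx, id) // stand-in for collectChecksumFromReplica
			resultCh <- result{id: id, err: err}
		}()
	}

	for res := range resultCh {
		results = append(results, res)
		if res.id == 0 { // stand-in for "this is the local replica"
			if res.err != nil {
				return nil, res.err // deferred cancel/wait/close clean up
			}
			// Put the "local" result first in the list.
			results[0], results[len(results)-1] = results[len(results)-1], results[0]
		}
		if len(results) == len(ids) {
			break // all writers have reported; stop before the channel closes
		}
	}
	return results, nil
}

// fakeCollect simulates a remote checksum collection with a random delay.
func fakeCollect(ctx context.Context, id int) error {
	select {
	case <-time.After(time.Duration(rand.Intn(50)) * time.Millisecond):
		return nil
	case <-ctx.Done():
		return errors.New("canceled")
	}
}

func main() {
	res, err := collectAll(context.Background(), []int{0, 1, 2, 3})
	fmt.Println(res, err)
}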