From 1a2dd2cd8961bd98e5a78f0458b4fdb6cdd8d203 Mon Sep 17 00:00:00 2001 From: Pavel Kalinnikov Date: Tue, 23 Aug 2022 12:40:59 +0100 Subject: [PATCH] kvserver: send all consistency requests in parallel Currently, the replica initiating the consistency check sends a collection request to itself first, and only then to other replicas in parallel. This results in substantial asynchrony on the receiving replica, between the incoming CollectChecksum request and the checksum computation task started by the ComputeChecksum message. The current solution to that is keeping the checksum computation results in memory for replicaChecksumGCInterval to return them to late arriving requests. The reason why the first checksum collection blocks the others is that it computes the "master checksum", which is then added to all other requests. However, this field is only used by the receiving end to log an inconsistency error. The actual killing of this replica happens on the second phase of the protocol, after the initiating replica commits another Raft message with the Terminate field populated. So, there is no strong reason to keep this blocking behaviour. If the initiating replica fails to compute its local checksum, it does not send requests to other replicas. This is problematic because the checksum tasks will be run on all replicas, which opens the possibility for accumulating many such dangling checks. This commit makes all the checksum collection requests parallel. Benefits: - There is less asynchrony between the sender and receiver, so we can drop the GC (in follow-up commits), and require an incoming request before starting the checksum computation task. - All the outgoing collection requests are now explicitly canceled if the local computation fails. This way, the cancelation signal has more chance to propagate to all replicas and cancel the tasks that were started anyway. Release justification: performance and stability improvement Release note (bug fix): Consistency checks are now sent to all replicas in parallel, previously it would be blocked on processing the local replica first. This a) reduces the latency of one check 2x, and b) allows better propagation of the cancelation signal which results in fewer abandoned tasks on remote replicas, and more resources spent on useful checks. --- pkg/kv/kvserver/consistency_queue.go | 9 +-- pkg/kv/kvserver/replica_consistency.go | 90 ++++++++++++-------------- pkg/kv/kvserver/stores_server.go | 4 ++ 3 files changed, 50 insertions(+), 53 deletions(-) diff --git a/pkg/kv/kvserver/consistency_queue.go b/pkg/kv/kvserver/consistency_queue.go index 771966df1a11..0a40db7aa21c 100644 --- a/pkg/kv/kvserver/consistency_queue.go +++ b/pkg/kv/kvserver/consistency_queue.go @@ -66,8 +66,9 @@ const consistencyCheckRateMinWait = 100 * time.Millisecond // replication factor of 7 could run 7 concurrent checks on every node. // // Note that checksum calculations below Raft are not tied to the caller's -// context (especially on followers), and will continue to run even after the -// caller has given up on them, which may cause them to build up. +// context, and may continue to run even after the caller has given up on them, +// which may cause them to build up. Although we do best effort to cancel the +// running task on the receiving end when the incoming request is aborted. // // CHECK_STATS checks do not count towards this limit, as they are cheap and the // DistSender will parallelize them across all ranges (notably when calling @@ -76,8 +77,8 @@ const consistencyCheckAsyncConcurrency = 7 // consistencyCheckAsyncTimeout is a below-Raft timeout for asynchronous // consistency check calculations. These are not tied to the caller's context, -// and thus will continue to run even after the caller has given up on them, so -// we give them an upper timeout to prevent them from running forever. +// and thus may continue to run even if the caller has given up on them, so we +// give them an upper timeout to prevent them from running forever. const consistencyCheckAsyncTimeout = time.Hour var testingAggressiveConsistencyChecks = envutil.EnvOrDefaultBool("COCKROACH_CONSISTENCY_AGGRESSIVE", false) diff --git a/pkg/kv/kvserver/replica_consistency.go b/pkg/kv/kvserver/replica_consistency.go index bcf97947c357..f60354225df9 100644 --- a/pkg/kv/kvserver/replica_consistency.go +++ b/pkg/kv/kvserver/replica_consistency.go @@ -15,7 +15,6 @@ import ( "crypto/sha512" "encoding/binary" "fmt" - "sort" "sync" "time" @@ -47,10 +46,14 @@ import ( // How long to keep consistency checker checksums in-memory for collection. // Typically a long-poll waits for the result of the computation, so it's almost -// immediately collected. However, the consistency checker synchronously -// collects the first replica's checksum before all others, so if the first one -// is slow the checksum may not be collected right away, and that first -// consistency check can take a long time due to rate limiting and range size. +// immediately collected. +// +// TODO(pavelkalinnikov): The consistency checker used to synchronously collect +// the first replica's checksum before all others, so if the first one was slow +// the checksum could not be collected right away, and that first consistency +// check could take a long time due to rate limiting and range size. Now, all +// requests are sent immediately in parallel, so we can remove the GC altogether +// and synchronize the request with the task. const replicaChecksumGCInterval = time.Hour // fatalOnStatsMismatch, if true, turns stats mismatches into fatal errors. A @@ -334,7 +337,7 @@ type ConsistencyCheckResult struct { } func (r *Replica) collectChecksumFromReplica( - ctx context.Context, replica roachpb.ReplicaDescriptor, id uuid.UUID, checksum []byte, + ctx context.Context, replica roachpb.ReplicaDescriptor, id uuid.UUID, ) (CollectChecksumResponse, error) { conn, err := r.store.cfg.NodeDialer.Dial(ctx, replica.NodeID, rpc.DefaultClass) if err != nil { @@ -346,7 +349,7 @@ func (r *Replica) collectChecksumFromReplica( StoreRequestHeader: StoreRequestHeader{NodeID: replica.NodeID, StoreID: replica.StoreID}, RangeID: r.RangeID, ChecksumID: id, - Checksum: checksum, + Checksum: nil, // TODO(pavelkalinnikov): Deprecate this field. } resp, err := client.CollectChecksum(ctx, req) if err != nil { @@ -370,70 +373,59 @@ func (r *Replica) RunConsistencyCheck( } ccRes := res.(*roachpb.ComputeChecksumResponse) - var orderedReplicas []roachpb.ReplicaDescriptor - { - desc := r.Desc() - localReplica, err := r.GetReplicaDescriptor() - if err != nil { - return nil, errors.Wrap(err, "could not get replica descriptor") - } - - // Move the local replica to the front (which makes it the "master" - // we're comparing against). - orderedReplicas = append(orderedReplicas, desc.Replicas().Descriptors()...) - - sort.Slice(orderedReplicas, func(i, j int) bool { - return orderedReplicas[i] == localReplica - }) + localReplica, err := r.GetReplicaDescriptor() + if err != nil { + return nil, errors.Wrap(err, "could not get replica descriptor") } - resultCh := make(chan ConsistencyCheckResult, len(orderedReplicas)) - var results []ConsistencyCheckResult + replicas := r.Desc().Replicas().Descriptors() + resultCh := make(chan ConsistencyCheckResult, len(replicas)) + results := make([]ConsistencyCheckResult, 0, len(replicas)) + var wg sync.WaitGroup + ctx, cancel := context.WithCancel(ctx) + + defer close(resultCh) // close the channel only after + defer wg.Wait() // all producers have terminated + defer cancel() // which happens after we cancel them - for _, replica := range orderedReplicas { + for _, replica := range replicas { wg.Add(1) replica := replica // per-iteration copy for the goroutine if err := r.store.Stopper().RunAsyncTask(ctx, "storage.Replica: checking consistency", func(ctx context.Context) { defer wg.Done() - - var masterChecksum []byte - if len(results) > 0 { - masterChecksum = results[0].Response.Checksum - } - resp, err := r.collectChecksumFromReplica(ctx, replica, ccRes.ChecksumID, masterChecksum) + resp, err := r.collectChecksumFromReplica(ctx, replica, ccRes.ChecksumID) resultCh <- ConsistencyCheckResult{ Replica: replica, Response: resp, Err: err, } - }); err != nil { + }, + ); err != nil { + // If we can't start tasks, the node is likely draining. Return the error + // verbatim, after all the started tasks are stopped. wg.Done() - // If we can't start tasks, the node is likely draining. Just return the error verbatim. return nil, err } + } - // Collect the master result eagerly so that we can send a SHA in the - // remaining requests (this is used for logging inconsistencies on the - // remote nodes only). - if len(results) == 0 { - wg.Wait() - result := <-resultCh + // Collect the results from all replicas, while the tasks are running. + for result := range resultCh { + results = append(results, result) + if result.Replica.IsSame(localReplica) { + // If we can't compute the local checksum, give up. This will cancel all + // the outstanding requests, and wait for the tasks above to terminate. if err := result.Err; err != nil { - // If we can't compute the local checksum, give up. return nil, errors.Wrap(err, "computing own checksum") } - results = append(results, result) + // Put the local replica first in the list. + results[0], results[len(results)-1] = results[len(results)-1], results[0] + } + // If it was the last request, don't wait on the channel anymore. + if len(results) == len(replicas) { + break } - } - - wg.Wait() - close(resultCh) - - // Collect the remaining results. - for result := range resultCh { - results = append(results, result) } return results, nil diff --git a/pkg/kv/kvserver/stores_server.go b/pkg/kv/kvserver/stores_server.go index d49f21f74b2d..93ddaf0885c9 100644 --- a/pkg/kv/kvserver/stores_server.go +++ b/pkg/kv/kvserver/stores_server.go @@ -70,6 +70,10 @@ func (is Server) CollectChecksum( // If this check is false, then this request is the replica carrying out // the consistency check. The message is spurious, but we want to leave the // snapshot (if present) intact. + // + // TODO(pavelkalinnikov): The Checksum field is no longer populated. + // Drop this check, or move it to where the ComputeChecksum.Terminate + // field takes effect. if len(req.Checksum) > 0 { log.Errorf(ctx, "consistency check failed on range r%d: expected checksum %x, got %x", req.RangeID, redact.Safe(req.Checksum), redact.Safe(ccr.Checksum))