From f72d785cfd6af677a4c0a902cd9f47645217368f Mon Sep 17 00:00:00 2001 From: Pavel Kalinnikov Date: Tue, 23 Aug 2022 12:40:59 +0100 Subject: [PATCH] kvserver: send all consistency requests in parallel Currently, the replica initiating the consistency check sends a collection request to itself first, and only then to other replicas in parallel. This results in substantial asynchrony on the receiving replica, between the incoming CollectChecksum request and the checksum computation task started by the ComputeChecksum message. The current solution to that is keeping the checksum computation results in memory for replicaChecksumGCInterval to return them to late arriving requests. The reason why the first checksum collection blocks the others is that it computes the "master checksum", which is then added to all other requests. However, this field is only used by the receiving end to log an inconsistency error. The actual killing of this replica happens on the second phase of the protocol, after the initiating replica commits another Raft message with the Terminate field populated. So, there is no strong reason to keep this blocking behaviour. If the initiating replica fails to compute its local checksum, it does not send requests to other replicas. This is problematic because the checksum tasks will be run on all replicas, which opens the possibility for accumulating many such dangling checks. This commit makes all the checksum collection requests parallel. Benefits: - There is less asynchrony between the sender and receiver, so we can drop the GC (in follow-up commits), and require an incoming request before starting the checksum computation task. - All the outgoing collection requests are now explicitly canceled if the local computation fails. This way, the cancelation signal has more chance to propagate to all replicas and cancel the tasks that were started anyway. Release justification: performance and stability improvement Release note (bug fix): Consistency checks are now sent to all replicas in parallel, previously it would be blocked on processing the local replica first. This a) reduces the latency of one check 2x, and b) allows better propagation of the cancelation signal which results in fewer abandoned tasks on remote replicas, and more resources spent on useful checks. --- pkg/kv/kvserver/api.proto | 9 ++- pkg/kv/kvserver/consistency_queue.go | 9 ++- pkg/kv/kvserver/replica_consistency.go | 101 ++++++++++++------------- pkg/kv/kvserver/stores_server.go | 16 +--- 4 files changed, 61 insertions(+), 74 deletions(-) diff --git a/pkg/kv/kvserver/api.proto b/pkg/kv/kvserver/api.proto index 9c50cd5ce62c..18453f87763c 100644 --- a/pkg/kv/kvserver/api.proto +++ b/pkg/kv/kvserver/api.proto @@ -36,15 +36,18 @@ message CollectChecksumRequest { bytes checksum_id = 3 [(gogoproto.nullable) = false, (gogoproto.customname) = "ChecksumID", (gogoproto.customtype) = "github.com/cockroachdb/cockroach/pkg/util/uuid.UUID"]; - bytes checksum = 4; + reserved 4; + // If true then the response must include the snapshot of the data from which + // the checksum is computed. + bool with_snapshot = 5; } message CollectChecksumResponse { // The checksum is the sha512 hash of the requested computation. It is empty // if the computation failed. bytes checksum = 1; - // snapshot is set if the roachpb.ComputeChecksumRequest had snapshot = true - // and the response checksum is different from the request checksum. + // snapshot is set if the with_snapshot in CollectChecksumRequest is true. For + // example, it can be set by the caller when it has detected an inconsistency. // // TODO(tschottdorf): with larger ranges, this is no longer tenable. // See https://github.com/cockroachdb/cockroach/issues/21128. diff --git a/pkg/kv/kvserver/consistency_queue.go b/pkg/kv/kvserver/consistency_queue.go index 771966df1a11..0a40db7aa21c 100644 --- a/pkg/kv/kvserver/consistency_queue.go +++ b/pkg/kv/kvserver/consistency_queue.go @@ -66,8 +66,9 @@ const consistencyCheckRateMinWait = 100 * time.Millisecond // replication factor of 7 could run 7 concurrent checks on every node. // // Note that checksum calculations below Raft are not tied to the caller's -// context (especially on followers), and will continue to run even after the -// caller has given up on them, which may cause them to build up. +// context, and may continue to run even after the caller has given up on them, +// which may cause them to build up. Although we do best effort to cancel the +// running task on the receiving end when the incoming request is aborted. // // CHECK_STATS checks do not count towards this limit, as they are cheap and the // DistSender will parallelize them across all ranges (notably when calling @@ -76,8 +77,8 @@ const consistencyCheckAsyncConcurrency = 7 // consistencyCheckAsyncTimeout is a below-Raft timeout for asynchronous // consistency check calculations. These are not tied to the caller's context, -// and thus will continue to run even after the caller has given up on them, so -// we give them an upper timeout to prevent them from running forever. +// and thus may continue to run even if the caller has given up on them, so we +// give them an upper timeout to prevent them from running forever. const consistencyCheckAsyncTimeout = time.Hour var testingAggressiveConsistencyChecks = envutil.EnvOrDefaultBool("COCKROACH_CONSISTENCY_AGGRESSIVE", false) diff --git a/pkg/kv/kvserver/replica_consistency.go b/pkg/kv/kvserver/replica_consistency.go index df02113a8a19..095759e708cd 100644 --- a/pkg/kv/kvserver/replica_consistency.go +++ b/pkg/kv/kvserver/replica_consistency.go @@ -15,7 +15,6 @@ import ( "crypto/sha512" "encoding/binary" "fmt" - "sort" "sync" "time" @@ -47,10 +46,15 @@ import ( // How long to keep consistency checker checksums in-memory for collection. // Typically a long-poll waits for the result of the computation, so it's almost -// immediately collected. However, the consistency checker synchronously -// collects the first replica's checksum before all others, so if the first one -// is slow the checksum may not be collected right away, and that first -// consistency check can take a long time due to rate limiting and range size. +// immediately collected. +// +// Up to 22.1, the consistency check initiator used to synchronously collect the +// first replica's checksum before all others, so checksum collection requests +// could arrive late if the first one was slow. Since 22.2, all requests are +// parallel and likely arrive quickly. +// +// TODO(pavelkalinnikov): Consider removing GC behaviour in 23.1+, when all the +// incoming requests are from 22.2+ nodes (hence arrive timely). const replicaChecksumGCInterval = time.Hour // fatalOnStatsMismatch, if true, turns stats mismatches into fatal errors. A @@ -87,10 +91,10 @@ type replicaChecksum struct { // ComputeChecksum through Raft and then issues CollectChecksum commands to the // other replicas. These are inspected and a CheckConsistencyResponse is assembled. // -// When args.Mode is CHECK_VIA_QUEUE and an inconsistency is detected and no -// diff was requested, the consistency check will be re-run to collect a diff, -// which is then printed before calling `log.Fatal`. This behavior should be -// lifted to the consistency checker queue in the future. +// When req.Mode is CHECK_VIA_QUEUE and an inconsistency is detected, the +// consistency check will be re-run to collect a diff, which is then printed +// before calling `log.Fatal`. This behavior should be lifted to the consistency +// checker queue in the future. func (r *Replica) CheckConsistency( ctx context.Context, req roachpb.CheckConsistencyRequest, ) (roachpb.CheckConsistencyResponse, *roachpb.Error) { @@ -328,7 +332,7 @@ type ConsistencyCheckResult struct { } func (r *Replica) collectChecksumFromReplica( - ctx context.Context, replica roachpb.ReplicaDescriptor, id uuid.UUID, checksum []byte, + ctx context.Context, replica roachpb.ReplicaDescriptor, id uuid.UUID, withSnap bool, ) (CollectChecksumResponse, error) { conn, err := r.store.cfg.NodeDialer.Dial(ctx, replica.NodeID, rpc.DefaultClass) if err != nil { @@ -340,7 +344,7 @@ func (r *Replica) collectChecksumFromReplica( StoreRequestHeader: StoreRequestHeader{NodeID: replica.NodeID, StoreID: replica.StoreID}, RangeID: r.RangeID, ChecksumID: id, - Checksum: checksum, + WithSnapshot: withSnap, } resp, err := client.CollectChecksum(ctx, req) if err != nil { @@ -364,70 +368,61 @@ func (r *Replica) runConsistencyCheck( } ccRes := res.(*roachpb.ComputeChecksumResponse) - var orderedReplicas []roachpb.ReplicaDescriptor - { - desc := r.Desc() - localReplica, err := r.GetReplicaDescriptor() - if err != nil { - return nil, errors.Wrap(err, "could not get replica descriptor") - } - - // Move the local replica to the front (which makes it the "master" - // we're comparing against). - orderedReplicas = append(orderedReplicas, desc.Replicas().Descriptors()...) - - sort.Slice(orderedReplicas, func(i, j int) bool { - return orderedReplicas[i] == localReplica - }) + replSet := r.Desc().Replicas() + localReplica, found := replSet.GetReplicaDescriptorByID(r.replicaID) + if !found { + return nil, errors.New("could not get local replica descriptor") } + replicas := replSet.Descriptors() + + resultCh := make(chan ConsistencyCheckResult, len(replicas)) + results := make([]ConsistencyCheckResult, 0, len(replicas)) - resultCh := make(chan ConsistencyCheckResult, len(orderedReplicas)) - var results []ConsistencyCheckResult var wg sync.WaitGroup + ctx, cancel := context.WithCancel(ctx) + + defer close(resultCh) // close the channel when + defer wg.Wait() // writers have terminated + defer cancel() // but cancel them first + // P.S. Have you noticed the Haiku? - for _, replica := range orderedReplicas { + for _, replica := range replicas { wg.Add(1) replica := replica // per-iteration copy for the goroutine if err := r.store.Stopper().RunAsyncTask(ctx, "storage.Replica: checking consistency", func(ctx context.Context) { defer wg.Done() - - var masterChecksum []byte - if len(results) > 0 { - masterChecksum = results[0].Response.Checksum - } - resp, err := r.collectChecksumFromReplica(ctx, replica, ccRes.ChecksumID, masterChecksum) + resp, err := r.collectChecksumFromReplica(ctx, replica, ccRes.ChecksumID, req.Snapshot) resultCh <- ConsistencyCheckResult{ Replica: replica, Response: resp, Err: err, } - }); err != nil { + }, + ); err != nil { + // If we can't start tasks, the node is likely draining. Return the error + // verbatim, after all the started tasks are stopped. wg.Done() - // If we can't start tasks, the node is likely draining. Just return the error verbatim. return nil, err } + } - // Collect the master result eagerly so that we can send a SHA in the - // remaining requests (this is used for logging inconsistencies on the - // remote nodes only). - if len(results) == 0 { - wg.Wait() - result := <-resultCh + // Collect the results from all replicas, while the tasks are running. + for result := range resultCh { + results = append(results, result) + if result.Replica.IsSame(localReplica) { + // If we can't compute the local checksum, give up. This will cancel all + // the outstanding requests, and wait for the tasks above to terminate. if err := result.Err; err != nil { - // If we can't compute the local checksum, give up. return nil, errors.Wrap(err, "computing own checksum") } - results = append(results, result) + // Put the local replica first in the list. + results[0], results[len(results)-1] = results[len(results)-1], results[0] + } + // If it was the last request, don't wait on the channel anymore. + if len(results) == len(replicas) { + break } - } - - wg.Wait() - close(resultCh) - - // Collect the remaining results. - for result := range resultCh { - results = append(results, result) } return results, nil diff --git a/pkg/kv/kvserver/stores_server.go b/pkg/kv/kvserver/stores_server.go index d49f21f74b2d..f3154aca6842 100644 --- a/pkg/kv/kvserver/stores_server.go +++ b/pkg/kv/kvserver/stores_server.go @@ -11,7 +11,6 @@ package kvserver import ( - "bytes" "context" "time" @@ -19,7 +18,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/storage" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/retry" - "github.com/cockroachdb/redact" ) // Server implements PerReplicaServer. @@ -53,7 +51,7 @@ func (is Server) execStoreCommand( func (is Server) CollectChecksum( ctx context.Context, req *CollectChecksumRequest, ) (*CollectChecksumResponse, error) { - resp := &CollectChecksumResponse{} + var resp *CollectChecksumResponse err := is.execStoreCommand(ctx, req.StoreRequestHeader, func(ctx context.Context, s *Store) error { ctx, cancel := s.stopper.WithCancelOnQuiesce(ctx) @@ -66,17 +64,7 @@ func (is Server) CollectChecksum( if err != nil { return err } - if !bytes.Equal(req.Checksum, ccr.Checksum) { - // If this check is false, then this request is the replica carrying out - // the consistency check. The message is spurious, but we want to leave the - // snapshot (if present) intact. - if len(req.Checksum) > 0 { - log.Errorf(ctx, "consistency check failed on range r%d: expected checksum %x, got %x", - req.RangeID, redact.Safe(req.Checksum), redact.Safe(ccr.Checksum)) - // Leave resp.Snapshot alone so that the caller will receive what's - // in it (if anything). - } - } else { + if !req.WithSnapshot { ccr.Snapshot = nil } resp = &ccr