kvserver: send all consistency requests in parallel
Currently, the replica initiating the consistency check sends a collection
request to itself first, and only then to the other replicas in parallel. This
results in substantial asynchrony on the receiving replicas between the
incoming CollectChecksum request and the checksum computation task started by
the ComputeChecksum message. The current workaround is to keep the checksum
computation results in memory for replicaChecksumGCInterval, so that they can
be returned to late-arriving requests.

The reason the first checksum collection blocks the others is that it computes
the "master checksum", which is then attached to all the other requests.
However, this field is only used by the receiving end to log an inconsistency
error. The actual termination of an inconsistent replica happens in the second
phase of the protocol, after the initiating replica commits another Raft
message with the Terminate field populated. So there is no strong reason to
keep this blocking behaviour.
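
For orientation, here is a minimal, self-contained Go sketch of the two-phase
shape described above. The names (replicaChecksum, collectChecksums,
divergentReplicas) and the integer replica IDs are made up for illustration;
they are not the kvserver or roachpb APIs:

```go
package main

import (
	"bytes"
	"fmt"
)

// replicaChecksum is a stand-in for the checksum one replica reports.
type replicaChecksum struct {
	replicaID int
	checksum  []byte
}

// Phase 1: every replica computes a checksum of its data, and the initiator
// collects them. In the real protocol the computation is triggered by a
// replicated ComputeChecksum command, and collection happens via
// CollectChecksum RPCs.
func collectChecksums(replicas []int, compute func(id int) []byte) []replicaChecksum {
	results := make([]replicaChecksum, 0, len(replicas))
	for _, id := range replicas {
		results = append(results, replicaChecksum{replicaID: id, checksum: compute(id)})
	}
	return results
}

// Phase 2: the initiator compares the collected checksums against its own and
// decides which replicas diverged. In the real protocol this decision is then
// replicated via a second Raft message with the Terminate field populated; the
// per-request "master checksum" removed by this commit was only used for
// logging on the receiving side.
func divergentReplicas(local []byte, results []replicaChecksum) []int {
	var diverged []int
	for _, r := range results {
		if !bytes.Equal(local, r.checksum) {
			diverged = append(diverged, r.replicaID)
		}
	}
	return diverged
}

func main() {
	compute := func(id int) []byte {
		if id == 3 {
			return []byte{0xde, 0xad} // pretend replica 3 diverged
		}
		return []byte{0xbe, 0xef}
	}
	results := collectChecksums([]int{1, 2, 3}, compute)
	fmt.Println("replicas to terminate:", divergentReplicas([]byte{0xbe, 0xef}, results))
}
```

The takeaway is that the divergence decision, and the resulting termination,
are driven by the initiator in the second phase, so the "master checksum"
attached to first-phase requests only ever affected logging on the receivers.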

If the initiating replica fails to compute its local checksum, it does not send
requests to the other replicas. This is problematic because the checksum
computation tasks still run on all replicas, which opens the possibility of
accumulating many such dangling checks.

This commit makes all the checksum collection requests parallel. Benefits:

- There is less asynchrony between the sender and receiver, so we can drop the
  GC (in follow-up commits), and require an incoming request before starting
  the checksum computation task.

- All the outgoing collection requests are now explicitly canceled if the local
  computation fails. This way, the cancelation signal has a better chance of
  propagating to all replicas and canceling the tasks that were started anyway
  (see the sketch below).
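
Below is a minimal, self-contained Go sketch of that fan-out-and-cancel
pattern. It assumes each collection is just a callback; the result type, the
integer replica IDs, and the collect function are placeholders rather than the
actual runConsistencyCheck code shown in the diff further down:

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
)

// result is a placeholder for the per-replica collection outcome.
type result struct {
	replica int
	err     error
}

// fanOutCollect mirrors the shape of the new collection logic: all requests are
// issued in parallel, and a failure of the local collection cancels the
// requests that are still in flight to the other replicas.
func fanOutCollect(
	ctx context.Context, local int, replicas []int,
	collect func(ctx context.Context, replica int) error,
) ([]result, error) {
	ctx, cancel := context.WithCancel(ctx)
	resultCh := make(chan result, len(replicas)) // buffered: senders never block
	var wg sync.WaitGroup

	defer close(resultCh) // close the channel when
	defer wg.Wait()       // writers have terminated
	defer cancel()        // but cancel them first

	for _, r := range replicas {
		r := r // per-iteration copy for the goroutine
		wg.Add(1)
		go func() {
			defer wg.Done()
			resultCh <- result{replica: r, err: collect(ctx, r)}
		}()
	}

	results := make([]result, 0, len(replicas))
	for res := range resultCh {
		results = append(results, res)
		if res.replica == local && res.err != nil {
			// Give up early: the deferred cancel() aborts the outstanding requests.
			return nil, res.err
		}
		if len(results) == len(replicas) {
			break
		}
	}
	return results, nil
}

func main() {
	collect := func(ctx context.Context, replica int) error {
		if replica == 1 {
			return errors.New("local checksum computation failed")
		}
		<-ctx.Done() // remote requests observe the cancelation
		return ctx.Err()
	}
	results, err := fanOutCollect(context.Background(), 1 /* local */, []int{1, 2, 3}, collect)
	fmt.Println(results, err)
}
```

The buffered channel together with the defer ordering (cancel, then wait, then
close) lets the early return on a local failure abort the in-flight requests
without leaking goroutines; the real change uses the same structure.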

Release justification: performance and stability improvement

Release note (bug fix): Consistency checks are now sent to all replicas in
parallel; previously the check was blocked on processing the local replica
first. This (a) roughly halves the latency of a single check, and (b) allows
better propagation of the cancelation signal, which results in fewer abandoned
tasks on remote replicas and more resources spent on useful checks.
pav-kv committed Sep 7, 2022
1 parent 9877729 commit f72d785
Showing 4 changed files with 61 additions and 74 deletions.
9 changes: 6 additions & 3 deletions pkg/kv/kvserver/api.proto
@@ -36,15 +36,18 @@ message CollectChecksumRequest {
bytes checksum_id = 3 [(gogoproto.nullable) = false,
(gogoproto.customname) = "ChecksumID",
(gogoproto.customtype) = "github.com/cockroachdb/cockroach/pkg/util/uuid.UUID"];
bytes checksum = 4;
reserved 4;
// If true then the response must include the snapshot of the data from which
// the checksum is computed.
bool with_snapshot = 5;
}

message CollectChecksumResponse {
// The checksum is the sha512 hash of the requested computation. It is empty
// if the computation failed.
bytes checksum = 1;
// snapshot is set if the roachpb.ComputeChecksumRequest had snapshot = true
// and the response checksum is different from the request checksum.
// snapshot is set if with_snapshot in CollectChecksumRequest is true. For example,
// the caller can set with_snapshot when it has detected an inconsistency.
//
// TODO(tschottdorf): with larger ranges, this is no longer tenable.
// See https://github.com/cockroachdb/cockroach/issues/21128.
9 changes: 5 additions & 4 deletions pkg/kv/kvserver/consistency_queue.go
@@ -66,8 +66,9 @@ const consistencyCheckRateMinWait = 100 * time.Millisecond
// replication factor of 7 could run 7 concurrent checks on every node.
//
// Note that checksum calculations below Raft are not tied to the caller's
// context (especially on followers), and will continue to run even after the
// caller has given up on them, which may cause them to build up.
// context, and may continue to run even after the caller has given up on them,
// which may cause them to build up. However, we make a best-effort attempt to cancel
// the running task on the receiving end when the incoming request is aborted.
//
// CHECK_STATS checks do not count towards this limit, as they are cheap and the
// DistSender will parallelize them across all ranges (notably when calling
@@ -76,8 +77,8 @@ const consistencyCheckAsyncConcurrency = 7

// consistencyCheckAsyncTimeout is a below-Raft timeout for asynchronous
// consistency check calculations. These are not tied to the caller's context,
// and thus will continue to run even after the caller has given up on them, so
// we give them an upper timeout to prevent them from running forever.
// and thus may continue to run even if the caller has given up on them, so we
// give them an upper timeout to prevent them from running forever.
const consistencyCheckAsyncTimeout = time.Hour

var testingAggressiveConsistencyChecks = envutil.EnvOrDefaultBool("COCKROACH_CONSISTENCY_AGGRESSIVE", false)
101 changes: 48 additions & 53 deletions pkg/kv/kvserver/replica_consistency.go
@@ -15,7 +15,6 @@ import (
"crypto/sha512"
"encoding/binary"
"fmt"
"sort"
"sync"
"time"

@@ -47,10 +46,15 @@ import (

// How long to keep consistency checker checksums in-memory for collection.
// Typically a long-poll waits for the result of the computation, so it's almost
// immediately collected. However, the consistency checker synchronously
// collects the first replica's checksum before all others, so if the first one
// is slow the checksum may not be collected right away, and that first
// consistency check can take a long time due to rate limiting and range size.
// immediately collected.
//
// Up to 22.1, the consistency check initiator used to synchronously collect the
// first replica's checksum before all others, so checksum collection requests
// could arrive late if the first one was slow. Since 22.2, all requests are
// parallel and likely arrive quickly.
//
// TODO(pavelkalinnikov): Consider removing the GC behaviour in 23.1+, when all
// the incoming requests are from 22.2+ nodes (and hence arrive in a timely manner).
const replicaChecksumGCInterval = time.Hour

// fatalOnStatsMismatch, if true, turns stats mismatches into fatal errors. A
@@ -87,10 +91,10 @@ type replicaChecksum struct {
// ComputeChecksum through Raft and then issues CollectChecksum commands to the
// other replicas. These are inspected and a CheckConsistencyResponse is assembled.
//
// When args.Mode is CHECK_VIA_QUEUE and an inconsistency is detected and no
// diff was requested, the consistency check will be re-run to collect a diff,
// which is then printed before calling `log.Fatal`. This behavior should be
// lifted to the consistency checker queue in the future.
// When req.Mode is CHECK_VIA_QUEUE and an inconsistency is detected, the
// consistency check will be re-run to collect a diff, which is then printed
// before calling `log.Fatal`. This behavior should be lifted to the consistency
// checker queue in the future.
func (r *Replica) CheckConsistency(
ctx context.Context, req roachpb.CheckConsistencyRequest,
) (roachpb.CheckConsistencyResponse, *roachpb.Error) {
@@ -328,7 +332,7 @@ type ConsistencyCheckResult struct {
}

func (r *Replica) collectChecksumFromReplica(
ctx context.Context, replica roachpb.ReplicaDescriptor, id uuid.UUID, checksum []byte,
ctx context.Context, replica roachpb.ReplicaDescriptor, id uuid.UUID, withSnap bool,
) (CollectChecksumResponse, error) {
conn, err := r.store.cfg.NodeDialer.Dial(ctx, replica.NodeID, rpc.DefaultClass)
if err != nil {
@@ -340,7 +344,7 @@ func (r *Replica) collectChecksumFromReplica(
StoreRequestHeader: StoreRequestHeader{NodeID: replica.NodeID, StoreID: replica.StoreID},
RangeID: r.RangeID,
ChecksumID: id,
Checksum: checksum,
WithSnapshot: withSnap,
}
resp, err := client.CollectChecksum(ctx, req)
if err != nil {
@@ -364,70 +368,61 @@ func (r *Replica) runConsistencyCheck(
}
ccRes := res.(*roachpb.ComputeChecksumResponse)

var orderedReplicas []roachpb.ReplicaDescriptor
{
desc := r.Desc()
localReplica, err := r.GetReplicaDescriptor()
if err != nil {
return nil, errors.Wrap(err, "could not get replica descriptor")
}

// Move the local replica to the front (which makes it the "master"
// we're comparing against).
orderedReplicas = append(orderedReplicas, desc.Replicas().Descriptors()...)

sort.Slice(orderedReplicas, func(i, j int) bool {
return orderedReplicas[i] == localReplica
})
replSet := r.Desc().Replicas()
localReplica, found := replSet.GetReplicaDescriptorByID(r.replicaID)
if !found {
return nil, errors.New("could not get local replica descriptor")
}
replicas := replSet.Descriptors()

resultCh := make(chan ConsistencyCheckResult, len(replicas))
results := make([]ConsistencyCheckResult, 0, len(replicas))

resultCh := make(chan ConsistencyCheckResult, len(orderedReplicas))
var results []ConsistencyCheckResult
var wg sync.WaitGroup
ctx, cancel := context.WithCancel(ctx)

defer close(resultCh) // close the channel when
defer wg.Wait() // writers have terminated
defer cancel() // but cancel them first
// P.S. Have you noticed the Haiku?

for _, replica := range orderedReplicas {
for _, replica := range replicas {
wg.Add(1)
replica := replica // per-iteration copy for the goroutine
if err := r.store.Stopper().RunAsyncTask(ctx, "storage.Replica: checking consistency",
func(ctx context.Context) {
defer wg.Done()

var masterChecksum []byte
if len(results) > 0 {
masterChecksum = results[0].Response.Checksum
}
resp, err := r.collectChecksumFromReplica(ctx, replica, ccRes.ChecksumID, masterChecksum)
resp, err := r.collectChecksumFromReplica(ctx, replica, ccRes.ChecksumID, req.Snapshot)
resultCh <- ConsistencyCheckResult{
Replica: replica,
Response: resp,
Err: err,
}
}); err != nil {
},
); err != nil {
// If we can't start tasks, the node is likely draining. Return the error
// verbatim, after all the started tasks are stopped.
wg.Done()
// If we can't start tasks, the node is likely draining. Just return the error verbatim.
return nil, err
}
}

// Collect the master result eagerly so that we can send a SHA in the
// remaining requests (this is used for logging inconsistencies on the
// remote nodes only).
if len(results) == 0 {
wg.Wait()
result := <-resultCh
// Collect the results from all replicas, while the tasks are running.
for result := range resultCh {
results = append(results, result)
if result.Replica.IsSame(localReplica) {
// If we can't compute the local checksum, give up. This will cancel all
// the outstanding requests, and wait for the tasks above to terminate.
if err := result.Err; err != nil {
// If we can't compute the local checksum, give up.
return nil, errors.Wrap(err, "computing own checksum")
}
results = append(results, result)
// Put the local replica first in the list.
results[0], results[len(results)-1] = results[len(results)-1], results[0]
}
// If it was the last request, don't wait on the channel anymore.
if len(results) == len(replicas) {
break
}
}

wg.Wait()
close(resultCh)

// Collect the remaining results.
for result := range resultCh {
results = append(results, result)
}

return results, nil
16 changes: 2 additions & 14 deletions pkg/kv/kvserver/stores_server.go
@@ -11,15 +11,13 @@
package kvserver

import (
"bytes"
"context"
"time"

"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/retry"
"github.com/cockroachdb/redact"
)

// Server implements PerReplicaServer.
@@ -53,7 +51,7 @@ func (is Server) execStoreCommand(
func (is Server) CollectChecksum(
ctx context.Context, req *CollectChecksumRequest,
) (*CollectChecksumResponse, error) {
resp := &CollectChecksumResponse{}
var resp *CollectChecksumResponse
err := is.execStoreCommand(ctx, req.StoreRequestHeader,
func(ctx context.Context, s *Store) error {
ctx, cancel := s.stopper.WithCancelOnQuiesce(ctx)
@@ -66,17 +64,7 @@
if err != nil {
return err
}
if !bytes.Equal(req.Checksum, ccr.Checksum) {
// If this check is false, then this request is the replica carrying out
// the consistency check. The message is spurious, but we want to leave the
// snapshot (if present) intact.
if len(req.Checksum) > 0 {
log.Errorf(ctx, "consistency check failed on range r%d: expected checksum %x, got %x",
req.RangeID, redact.Safe(req.Checksum), redact.Safe(ccr.Checksum))
// Leave resp.Snapshot alone so that the caller will receive what's
// in it (if anything).
}
} else {
if !req.WithSnapshot {
ccr.Snapshot = nil
}
resp = &ccr
