kvserver: send all consistency requests in parallel
Currently, the replica initiating the consistency check sends a checksum
collection request to itself first, and only then to the other replicas in
parallel. This results in substantial asynchrony on the receiving replica
between the incoming CollectChecksum request and the checksum computation task
started by the ComputeChecksum message. The current workaround is to keep the
checksum computation results in memory for replicaChecksumGCInterval, so that
they can be returned to late-arriving requests.

The reason the first checksum collection blocks the others is that it computes
the "master checksum", which is then attached to all the other requests.
However, this field is only used by the receiving end to log an inconsistency
error. The actual killing of the inconsistent replica happens in the second
phase of the protocol, after the initiating replica commits another Raft
message with the Terminate field populated. So there is no strong reason to
keep this blocking behaviour.

If the initiating replica fails to compute its local checksum, it does not send
requests to the other replicas at all. This is problematic because the checksum
computation tasks are started on all replicas regardless, which opens the
possibility of accumulating many such dangling checks.

This commit makes all the checksum collection requests parallel. Benefits:

- There is less asynchrony between the sender and receiver, so we can drop the
  GC (in follow-up commits), and require an incoming request before starting
  the checksum computation task.

- All the outgoing collection requests are now explicitly canceled if the local
  computation fails. This gives the cancelation signal a better chance to
  propagate to all replicas and stop the tasks that were started anyway.

Release justification: performance and stability improvement

Release note (bug fix): Consistency checks are now sent to all replicas in
parallel; previously they were blocked on processing the local replica first.
This a) reduces the latency of a single check by about 2x, and b) improves
propagation of the cancelation signal, which results in fewer abandoned tasks
on remote replicas and more resources spent on useful checks.
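
For illustration, here is a minimal, self-contained Go sketch of the fan-out
shape described above. It is not the CockroachDB code: result, collect, and
runCheck are toy stand-ins, the channel is buffered to the number of replicas
so producers never block, and the deferred cancel/wait/close handle cleanup
when the local result carries an error.

// Minimal sketch (not the CockroachDB code): fan out one collection request
// per replica, consume results as they arrive, and bail out early if the
// local replica's result carries an error. The deferred cancel/wait/close
// run in LIFO order on early return, aborting the in-flight requests.
package main

import (
	"context"
	"fmt"
	"sync"
)

type result struct {
	replica string // hypothetical replica identifier
	err     error
}

// collect is a stand-in for the per-replica CollectChecksum RPC.
func collect(ctx context.Context, replica string) result {
	select {
	case <-ctx.Done():
		return result{replica: replica, err: ctx.Err()}
	default:
		return result{replica: replica}
	}
}

func runCheck(ctx context.Context, local string, replicas []string) ([]result, error) {
	ctx, cancel := context.WithCancel(ctx)
	resultCh := make(chan result, len(replicas)) // buffered: senders never block
	var wg sync.WaitGroup

	defer close(resultCh) // close the channel only after
	defer wg.Wait()       // all producers have terminated
	defer cancel()        // which happens after we cancel them

	for _, rep := range replicas {
		wg.Add(1)
		rep := rep // per-iteration copy for the goroutine
		go func() {
			defer wg.Done()
			resultCh <- collect(ctx, rep)
		}()
	}

	results := make([]result, 0, len(replicas))
	for res := range resultCh {
		results = append(results, res)
		if res.replica == local && res.err != nil {
			// Local computation failed: give up. The deferred cancel above
			// propagates the cancelation to the remaining requests.
			return nil, fmt.Errorf("computing own checksum: %w", res.err)
		}
		if len(results) == len(replicas) {
			break // all results received; stop waiting on the channel
		}
	}
	return results, nil
}

func main() {
	res, err := runCheck(context.Background(), "r1", []string{"r1", "r2", "r3"})
	fmt.Println(len(res), err) // "3 <nil>" in this toy run
}

The important property is that an early return cancels the shared context
before waiting, so outstanding collection requests are abandoned promptly
instead of running to completion.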
pav-kv committed Aug 26, 2022
1 parent ded127f commit 1a2dd2c
Showing 3 changed files with 50 additions and 53 deletions.
9 changes: 5 additions & 4 deletions pkg/kv/kvserver/consistency_queue.go
@@ -66,8 +66,9 @@ const consistencyCheckRateMinWait = 100 * time.Millisecond
 // replication factor of 7 could run 7 concurrent checks on every node.
 //
 // Note that checksum calculations below Raft are not tied to the caller's
-// context (especially on followers), and will continue to run even after the
-// caller has given up on them, which may cause them to build up.
+// context, and may continue to run even after the caller has given up on them,
+// which may cause them to build up. Although we do best effort to cancel the
+// running task on the receiving end when the incoming request is aborted.
 //
 // CHECK_STATS checks do not count towards this limit, as they are cheap and the
 // DistSender will parallelize them across all ranges (notably when calling
@@ -76,8 +77,8 @@ const consistencyCheckAsyncConcurrency = 7
 
 // consistencyCheckAsyncTimeout is a below-Raft timeout for asynchronous
 // consistency check calculations. These are not tied to the caller's context,
-// and thus will continue to run even after the caller has given up on them, so
-// we give them an upper timeout to prevent them from running forever.
+// and thus may continue to run even if the caller has given up on them, so we
+// give them an upper timeout to prevent them from running forever.
 const consistencyCheckAsyncTimeout = time.Hour
 
 var testingAggressiveConsistencyChecks = envutil.EnvOrDefaultBool("COCKROACH_CONSISTENCY_AGGRESSIVE", false)
90 changes: 41 additions & 49 deletions pkg/kv/kvserver/replica_consistency.go
@@ -15,7 +15,6 @@ import (
 	"crypto/sha512"
 	"encoding/binary"
 	"fmt"
-	"sort"
 	"sync"
 	"time"
 
@@ -47,10 +46,14 @@ import (
 
 // How long to keep consistency checker checksums in-memory for collection.
 // Typically a long-poll waits for the result of the computation, so it's almost
-// immediately collected. However, the consistency checker synchronously
-// collects the first replica's checksum before all others, so if the first one
-// is slow the checksum may not be collected right away, and that first
-// consistency check can take a long time due to rate limiting and range size.
+// immediately collected.
+//
+// TODO(pavelkalinnikov): The consistency checker used to synchronously collect
+// the first replica's checksum before all others, so if the first one was slow
+// the checksum could not be collected right away, and that first consistency
+// check could take a long time due to rate limiting and range size. Now, all
+// requests are sent immediately in parallel, so we can remove the GC altogether
+// and synchronize the request with the task.
 const replicaChecksumGCInterval = time.Hour
 
 // fatalOnStatsMismatch, if true, turns stats mismatches into fatal errors. A
@@ -334,7 +337,7 @@ type ConsistencyCheckResult struct {
 }
 
 func (r *Replica) collectChecksumFromReplica(
-	ctx context.Context, replica roachpb.ReplicaDescriptor, id uuid.UUID, checksum []byte,
+	ctx context.Context, replica roachpb.ReplicaDescriptor, id uuid.UUID,
 ) (CollectChecksumResponse, error) {
 	conn, err := r.store.cfg.NodeDialer.Dial(ctx, replica.NodeID, rpc.DefaultClass)
 	if err != nil {
@@ -346,7 +349,7 @@ func (r *Replica) collectChecksumFromReplica(
 		StoreRequestHeader: StoreRequestHeader{NodeID: replica.NodeID, StoreID: replica.StoreID},
 		RangeID:            r.RangeID,
 		ChecksumID:         id,
-		Checksum:           checksum,
+		Checksum:           nil, // TODO(pavelkalinnikov): Deprecate this field.
 	}
 	resp, err := client.CollectChecksum(ctx, req)
 	if err != nil {
@@ -370,70 +373,59 @@ func (r *Replica) RunConsistencyCheck(
 	}
 	ccRes := res.(*roachpb.ComputeChecksumResponse)
 
-	var orderedReplicas []roachpb.ReplicaDescriptor
-	{
-		desc := r.Desc()
-		localReplica, err := r.GetReplicaDescriptor()
-		if err != nil {
-			return nil, errors.Wrap(err, "could not get replica descriptor")
-		}
-
-		// Move the local replica to the front (which makes it the "master"
-		// we're comparing against).
-		orderedReplicas = append(orderedReplicas, desc.Replicas().Descriptors()...)
-
-		sort.Slice(orderedReplicas, func(i, j int) bool {
-			return orderedReplicas[i] == localReplica
-		})
+	localReplica, err := r.GetReplicaDescriptor()
+	if err != nil {
+		return nil, errors.Wrap(err, "could not get replica descriptor")
 	}
 
-	resultCh := make(chan ConsistencyCheckResult, len(orderedReplicas))
-	var results []ConsistencyCheckResult
+	replicas := r.Desc().Replicas().Descriptors()
+	resultCh := make(chan ConsistencyCheckResult, len(replicas))
+	results := make([]ConsistencyCheckResult, 0, len(replicas))
+
 	var wg sync.WaitGroup
+	ctx, cancel := context.WithCancel(ctx)
+
+	defer close(resultCh) // close the channel only after
+	defer wg.Wait()       // all producers have terminated
+	defer cancel()        // which happens after we cancel them
 
-	for _, replica := range orderedReplicas {
+	for _, replica := range replicas {
 		wg.Add(1)
 		replica := replica // per-iteration copy for the goroutine
 		if err := r.store.Stopper().RunAsyncTask(ctx, "storage.Replica: checking consistency",
 			func(ctx context.Context) {
 				defer wg.Done()
-
-				var masterChecksum []byte
-				if len(results) > 0 {
-					masterChecksum = results[0].Response.Checksum
-				}
-				resp, err := r.collectChecksumFromReplica(ctx, replica, ccRes.ChecksumID, masterChecksum)
+				resp, err := r.collectChecksumFromReplica(ctx, replica, ccRes.ChecksumID)
				resultCh <- ConsistencyCheckResult{
 					Replica:  replica,
 					Response: resp,
 					Err:      err,
 				}
-			}); err != nil {
+			},
+		); err != nil {
+			// If we can't start tasks, the node is likely draining. Return the error
+			// verbatim, after all the started tasks are stopped.
 			wg.Done()
-			// If we can't start tasks, the node is likely draining. Just return the error verbatim.
 			return nil, err
 		}
+	}
 
-		// Collect the master result eagerly so that we can send a SHA in the
-		// remaining requests (this is used for logging inconsistencies on the
-		// remote nodes only).
-		if len(results) == 0 {
-			wg.Wait()
-			result := <-resultCh
+	// Collect the results from all replicas, while the tasks are running.
+	for result := range resultCh {
+		results = append(results, result)
+		if result.Replica.IsSame(localReplica) {
+			// If we can't compute the local checksum, give up. This will cancel all
+			// the outstanding requests, and wait for the tasks above to terminate.
 			if err := result.Err; err != nil {
-				// If we can't compute the local checksum, give up.
 				return nil, errors.Wrap(err, "computing own checksum")
 			}
-			results = append(results, result)
+			// Put the local replica first in the list.
+			results[0], results[len(results)-1] = results[len(results)-1], results[0]
 		}
+		// If it was the last request, don't wait on the channel anymore.
+		if len(results) == len(replicas) {
+			break
+		}
 	}
 
-	wg.Wait()
-	close(resultCh)
-
-	// Collect the remaining results.
-	for result := range resultCh {
-		results = append(results, result)
-	}
-
 	return results, nil
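An aside on the three stacked defers introduced above: they rely on Go's LIFO
defer order, so on any return the context is canceled first, then the WaitGroup
is drained, and only then is the results channel closed, which keeps producers
from ever sending on a closed channel. A tiny standalone illustration of that
ordering (plain Go, not CockroachDB code):

package main

import "fmt"

func main() {
	// Deferred calls run last-in, first-out, mirroring the order used above:
	// cancel the workers, wait for them to exit, then close their channel.
	defer fmt.Println("3. close the results channel") // registered first, runs last
	defer fmt.Println("2. wait for all producers")    // runs second
	defer fmt.Println("1. cancel the context")        // registered last, runs first
}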
4 changes: 4 additions & 0 deletions pkg/kv/kvserver/stores_server.go
@@ -70,6 +70,10 @@ func (is Server) CollectChecksum(
 				// If this check is false, then this request is the replica carrying out
 				// the consistency check. The message is spurious, but we want to leave the
 				// snapshot (if present) intact.
+				//
+				// TODO(pavelkalinnikov): The Checksum field is no longer populated.
+				// Drop this check, or move it to where the ComputeChecksum.Terminate
+				// field takes effect.
 				if len(req.Checksum) > 0 {
 					log.Errorf(ctx, "consistency check failed on range r%d: expected checksum %x, got %x",
 						req.RangeID, redact.Safe(req.Checksum), redact.Safe(ccr.Checksum))