kv: Priority sending and receiving snapshots
Previously, on both the send and receive sides, the various
places that sent snapshots were uncoordinated, and the choice
of which snapshot was sent next was somewhat arbitrary. This
PR uses the new multiqueue to prioritize snapshots correctly.

Release note (performance improvement): Snapshots now use a fair
round-robin approach to choose which one to send next.

Release Justification: A large number of customer support
cases have occurred due to incorrect receive-side snapshot
prioritization. Decommissioning will complete faster with this
change.

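For context, the queueing discipline described above works in two dimensions: a priority orders snapshots within a named queue, and the dispatcher round-robins across queues so no queue starves another. Below is a minimal, illustrative Go sketch of that idea; it is not the actual pkg/kv/kvserver/multiqueue API, and names such as MultiQueue, Enqueue, and Dequeue are assumptions.

```go
package main

import (
	"container/heap"
	"fmt"
)

// item is one queued snapshot request with a priority.
type item struct {
	name     string
	priority float64
}

// pq is a max-heap of items ordered by priority.
type pq []item

func (p pq) Len() int            { return len(p) }
func (p pq) Less(i, j int) bool  { return p[i].priority > p[j].priority }
func (p pq) Swap(i, j int)       { p[i], p[j] = p[j], p[i] }
func (p *pq) Push(x interface{}) { *p = append(*p, x.(item)) }
func (p *pq) Pop() interface{} {
	old := *p
	it := old[len(old)-1]
	*p = old[:len(old)-1]
	return it
}

// MultiQueue round-robins across named queues; within a queue, the
// highest-priority item is dequeued first. Illustrative sketch only.
type MultiQueue struct {
	order  []string       // round-robin order of queue names
	queues map[string]*pq // one priority heap per queue
	cursor int            // next queue to try
}

func NewMultiQueue() *MultiQueue {
	return &MultiQueue{queues: map[string]*pq{}}
}

func (m *MultiQueue) Enqueue(queue string, priority float64) {
	q, ok := m.queues[queue]
	if !ok {
		q = &pq{}
		m.queues[queue] = q
		m.order = append(m.order, queue)
	}
	heap.Push(q, item{name: queue, priority: priority})
}

// Dequeue takes the best item from each non-empty queue in turn,
// so a busy queue cannot starve the others.
func (m *MultiQueue) Dequeue() (item, bool) {
	for i := 0; i < len(m.order); i++ {
		name := m.order[m.cursor]
		m.cursor = (m.cursor + 1) % len(m.order)
		if q := m.queues[name]; q.Len() > 0 {
			return heap.Pop(q).(item), true
		}
	}
	return item{}, false
}

func main() {
	m := NewMultiQueue()
	m.Enqueue("RAFT_SNAPSHOT_QUEUE", 1)
	m.Enqueue("REPLICATE_QUEUE", 10)
	m.Enqueue("REPLICATE_QUEUE", 5)
	// Queues alternate; within a queue, higher priority goes first.
	for it, ok := m.Dequeue(); ok; it, ok = m.Dequeue() {
		fmt.Println(it.name, it.priority)
	}
}
```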
andrewbaptist committed Aug 22, 2022
1 parent 9c7db33 commit 9c338eb
Showing 9 changed files with 110 additions and 43 deletions.
2 changes: 2 additions & 0 deletions pkg/kv/kvserver/BUILD.bazel
@@ -138,6 +138,7 @@ go_library(
"//pkg/kv/kvserver/kvserverpb",
"//pkg/kv/kvserver/liveness",
"//pkg/kv/kvserver/liveness/livenesspb",
"//pkg/kv/kvserver/multiqueue",
"//pkg/kv/kvserver/raftentry",
"//pkg/kv/kvserver/raftutil",
"//pkg/kv/kvserver/rangefeed",
@@ -413,6 +414,7 @@ go_test(
"//pkg/util/contextutil",
"//pkg/util/ctxgroup",
"//pkg/util/encoding",
"//pkg/util/envutil",
"//pkg/util/hlc",
"//pkg/util/humanizeutil",
"//pkg/util/leaktest",
4 changes: 3 additions & 1 deletion pkg/kv/kvserver/helpers_test.go
@@ -199,8 +199,10 @@ func (s *Store) ManualRaftSnapshot(repl *Replica, target roachpb.ReplicaID) error
return err
}

// ReservationCount counts the number of outstanding reservations that are not
// running.
func (s *Store) ReservationCount() int {
return len(s.snapshotApplySem)
return s.cfg.concurrentSnapshotApplyLimit - s.snapshotApplySem.Len()
}

// RaftSchedulerPriorityID returns the Raft scheduler's prioritized range.
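The new ReservationCount above leans on a standard counting-semaphore identity: reservations in use = limit − available permits. A toy, self-contained sketch of that arithmetic, assuming a channel-backed semaphore rather than the real snapshotApplySem (whose Len() is assumed here to report available permits):

```go
package main

import "fmt"

// sem is a toy counting semaphore backed by a buffered channel:
// a send acquires a permit, a receive releases one.
type sem chan struct{}

func (s sem) Acquire() { s <- struct{}{} }
func (s sem) Release() { <-s }

// Available reports unused permits.
func (s sem) Available() int { return cap(s) - len(s) }

// InUse mirrors the ReservationCount arithmetic above:
// outstanding reservations = limit - available permits.
func (s sem) InUse() int { return cap(s) - s.Available() }

func main() {
	s := make(sem, 3) // a concurrentSnapshotApplyLimit of 3
	s.Acquire()
	s.Acquire()
	fmt.Println(s.InUse(), s.Available()) // prints: 2 1
}
```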
24 changes: 24 additions & 0 deletions pkg/kv/kvserver/kvserverpb/raft.proto
@@ -155,6 +155,16 @@ message SnapshotRequest {
reserved 2;
}

// QueueName indicates the source of the snapshot. Snapshots are prioritized
// within a queue and round-robin selected between queues for both the sending
// and receiving side.
enum QueueName {
RAFT_SNAPSHOT_QUEUE = 0;
REPLICATE_QUEUE = 1;
OTHER = 2;
}


message Header {
// The replica state at the time the snapshot was generated. Note
// that ReplicaState.Desc differs from the above range_descriptor
@@ -170,12 +180,14 @@
int64 range_size = 3;

// The priority of the snapshot.
// TODO(abaptist): Deprecated, remove this in v23.1
Priority priority = 6;

// The strategy of the snapshot.
Strategy strategy = 7;

// The type of the snapshot.
// TODO(abaptist): Deprecated, remove this in v23.1
Type type = 9;

// Whether the snapshot uses the unreplicated RaftTruncatedState or not.
@@ -190,6 +202,12 @@
// TODO(irfansharif): Remove this in v22.1.
bool deprecated_unreplicated_truncated_state = 8;

// The sending queue's name.
SnapshotRequest.QueueName sender_queue_name = 10;

// The sending queue's priority.
double sender_queue_priority = 11;

reserved 1, 4;
}

@@ -234,6 +252,12 @@ message DelegateSnapshotRequest {
// The priority of the snapshot.
SnapshotRequest.Priority priority = 5;

// The sending queue's name.
SnapshotRequest.QueueName sender_queue_name = 9;

// The sending queue's priority.
double sender_queue_priority = 10;

// The type of the snapshot.
SnapshotRequest.Type type = 6;

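The two new header fields give the receiver the same coordinates the sender used: the queue name picks the round-robin bucket and the priority orders requests within it, so both sides agree on relative importance. A hedged, self-contained Go sketch with stand-in types (the receive-side plumbing is not part of this diff):

```go
package main

import "fmt"

// Stand-ins for the generated proto types; illustrative only.
type QueueName int32

const (
	RaftSnapshotQueue QueueName = iota // RAFT_SNAPSHOT_QUEUE
	ReplicateQueue                     // REPLICATE_QUEUE
	Other                              // OTHER
)

// Header mirrors the two new fields in SnapshotRequest.Header.
type Header struct {
	SenderQueueName     QueueName
	SenderQueuePriority float64
}

// On the send path, each caller stamps its queue and priority.
func makeHeader(q QueueName, prio float64) Header {
	return Header{SenderQueueName: q, SenderQueuePriority: prio}
}

// On the receive path, the same pair decides where the snapshot
// waits: the queue name picks the round-robin bucket and the
// priority orders requests within that bucket.
func admissionKey(h Header) (QueueName, float64) {
	return h.SenderQueueName, h.SenderQueuePriority
}

func main() {
	h := makeHeader(ReplicateQueue, 12000) // e.g. an allocator-derived priority
	q, p := admissionKey(h)
	fmt.Printf("bucket=%d priority=%.0f\n", q, p) // bucket=1 priority=12000
}
```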
3 changes: 2 additions & 1 deletion pkg/kv/kvserver/raft_snapshot_queue.go
@@ -140,7 +140,8 @@ func (rq *raftSnapshotQueue) processRaftSnapshot(
}
}

err := repl.sendSnapshot(ctx, repDesc, snapType, kvserverpb.SnapshotRequest_RECOVERY)
err := repl.sendSnapshot(ctx, repDesc, snapType, kvserverpb.SnapshotRequest_RECOVERY,
kvserverpb.SnapshotRequest_RAFT_SNAPSHOT_QUEUE, raftSnapshotPriority)

// NB: if the snapshot fails because of an overlapping replica on the
// recipient which is also waiting for a snapshot, the "smart" thing is to
40 changes: 25 additions & 15 deletions pkg/kv/kvserver/replica_command.go
@@ -981,13 +981,15 @@ func (r *Replica) ChangeReplicas(
return nil, errors.New("must disable replicate queue to use ChangeReplicas manually")
}
}
return r.changeReplicasImpl(ctx, desc, priority, reason, details, chgs)
return r.changeReplicasImpl(ctx, desc, priority, kvserverpb.SnapshotRequest_OTHER, 0.0, reason, details, chgs)
}

func (r *Replica) changeReplicasImpl(
ctx context.Context,
desc *roachpb.RangeDescriptor,
priority kvserverpb.SnapshotRequest_Priority,
senderName kvserverpb.SnapshotRequest_QueueName,
senderQueuePriority float64,
reason kvserverpb.RangeLogEventReason,
details string,
chgs roachpb.ReplicationChanges,
@@ -1054,7 +1056,7 @@ func (r *Replica) changeReplicasImpl(
_ = roachpb.ReplicaSet.LearnerDescriptors
var err error
desc, err = r.initializeRaftLearners(
ctx, desc, priority, reason, details, adds, roachpb.LEARNER,
ctx, desc, priority, senderName, senderQueuePriority, reason, details, adds, roachpb.LEARNER,
)
if err != nil {
return nil, err
@@ -1100,7 +1102,7 @@
// disruption to foreground traffic. See
// https://github.com/cockroachdb/cockroach/issues/63199 for an example.
desc, err = r.initializeRaftLearners(
ctx, desc, priority, reason, details, adds, roachpb.NON_VOTER,
ctx, desc, priority, senderName, senderQueuePriority, reason, details, adds, roachpb.NON_VOTER,
)
if err != nil {
return nil, err
@@ -1654,6 +1656,8 @@ func (r *Replica) initializeRaftLearners(
ctx context.Context,
desc *roachpb.RangeDescriptor,
priority kvserverpb.SnapshotRequest_Priority,
senderName kvserverpb.SnapshotRequest_QueueName,
senderQueuePriority float64,
reason kvserverpb.RangeLogEventReason,
details string,
targets []roachpb.ReplicationTarget,
@@ -1799,7 +1803,7 @@
// orphaned learner. Second, this tickled some bugs in etcd/raft around
// switching between StateSnapshot and StateProbe. Even if we worked through
// these, it would be susceptible to future similar issues.
if err := r.sendSnapshot(ctx, rDesc, kvserverpb.SnapshotRequest_INITIAL, priority); err != nil {
if err := r.sendSnapshot(ctx, rDesc, kvserverpb.SnapshotRequest_INITIAL, priority, senderName, senderQueuePriority); err != nil {
return nil, err
}
}
@@ -2584,6 +2588,8 @@ func (r *Replica) sendSnapshot(
recipient roachpb.ReplicaDescriptor,
snapType kvserverpb.SnapshotRequest_Type,
priority kvserverpb.SnapshotRequest_Priority,
senderQueueName kvserverpb.SnapshotRequest_QueueName,
senderQueuePriority float64,
) (retErr error) {
defer func() {
// Report the snapshot status to Raft, which expects us to do this once we
@@ -2631,13 +2637,15 @@

// Create new delegate snapshot request with only required metadata.
delegateRequest := &kvserverpb.DelegateSnapshotRequest{
RangeID: r.RangeID,
CoordinatorReplica: sender,
RecipientReplica: recipient,
Priority: priority,
Type: snapType,
Term: status.Term,
DelegatedSender: sender,
RangeID: r.RangeID,
CoordinatorReplica: sender,
RecipientReplica: recipient,
Priority: priority,
SenderQueueName: senderQueueName,
SenderQueuePriority: senderQueuePriority,
Type: snapType,
Term: status.Term,
DelegatedSender: sender,
}
err = contextutil.RunWithTimeout(
ctx, "delegate-snapshot", sendSnapshotTimeout, func(ctx context.Context) error {
@@ -2777,10 +2785,12 @@ func (r *Replica) followerSendSnapshot(
Snapshot: snap.RaftSnap,
},
},
RangeSize: rangeSize,
Priority: req.Priority,
Strategy: kvserverpb.SnapshotRequest_KV_BATCH,
Type: req.Type,
RangeSize: rangeSize,
Priority: req.Priority,
SenderQueueName: req.SenderQueueName,
SenderQueuePriority: req.SenderQueuePriority,
Strategy: kvserverpb.SnapshotRequest_KV_BATCH,
Type: req.Type,
}
newBatchFn := func() storage.Batch {
return r.store.Engine().NewUnindexedBatch(true /* writeOnly */)
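Beyond stamping the request, the point of carrying (senderQueueName, senderQueuePriority) is to gate concurrent snapshot transfers: a sender asks for a slot, waits until it is granted, performs the transfer, and releases the slot. The sketch below shows that permit-passing pattern under a limit of one; it illustrates the usage pattern only and is not the actual pkg/kv/kvserver/multiqueue API (the FIFO wait list here stands in for the real per-queue priority ordering):

```go
package main

import (
	"fmt"
	"sync"
)

// permitQueue grants up to max concurrent permits, parking extra
// requests until a permit frees up. Hedged sketch of the pattern.
type permitQueue struct {
	mu      sync.Mutex
	max     int
	inUse   int
	waiters []chan struct{} // FIFO; the real queue orders by (queue, priority)
}

func (p *permitQueue) Acquire() {
	p.mu.Lock()
	if p.inUse < p.max {
		p.inUse++
		p.mu.Unlock()
		return
	}
	ch := make(chan struct{})
	p.waiters = append(p.waiters, ch)
	p.mu.Unlock()
	<-ch // block until Release hands us the permit
}

func (p *permitQueue) Release() {
	p.mu.Lock()
	defer p.mu.Unlock()
	if len(p.waiters) > 0 {
		ch := p.waiters[0]
		p.waiters = p.waiters[1:]
		close(ch) // the permit passes directly to the next waiter
		return
	}
	p.inUse--
}

func main() {
	pq := &permitQueue{max: 1}
	pq.Acquire() // first snapshot send holds the only slot
	done := make(chan struct{})
	go func() {
		pq.Acquire() // second send waits for the slot
		fmt.Println("second send admitted")
		pq.Release()
		close(done)
	}()
	pq.Release() // first send finishes; the second is admitted
	<-done
}
```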
7 changes: 7 additions & 0 deletions pkg/kv/kvserver/replica_learner_test.go
@@ -36,6 +36,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
"github.com/cockroachdb/cockroach/pkg/testutils/testcluster"
"github.com/cockroachdb/cockroach/pkg/util/ctxgroup"
"github.com/cockroachdb/cockroach/pkg/util/envutil"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/metric"
@@ -240,6 +241,10 @@ func TestAddReplicaWithReceiverThrottling(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

// This test assumes that the snapshot send limit is set to 1 so the second
// snapshot send will block waiting for the first to complete.
cleanup := envutil.TestSetEnv(t, "COCKROACH_CONCURRENT_SNAPSHOT_SEND_LIMIT", "1")
defer cleanup()
// blockIncomingSnapshots will block receiving snapshots.
blockIncomingSnapshots := make(chan struct{})
waitForRebalanceToBlockCh := make(chan struct{})
@@ -991,6 +996,8 @@ func TestLearnerAdminChangeReplicasRace(t *testing.T) {
func TestLearnerReplicateQueueRace(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
cleanup := envutil.TestSetEnv(t, "COCKROACH_CONCURRENT_SNAPSHOT_SEND_LIMIT", "2")
defer cleanup()

var skipReceiveSnapshotKnobAtomic int64 = 1
blockUntilSnapshotCh := make(chan struct{}, 2)
31 changes: 22 additions & 9 deletions pkg/kv/kvserver/replicate_queue.go
@@ -774,7 +774,7 @@ func (rq *replicateQueue) processOneChange(
// unavailability; see:
_ = execChangeReplicasTxn

action, _ := rq.allocator.ComputeAction(ctx, conf, desc)
action, allocatorPrio := rq.allocator.ComputeAction(ctx, conf, desc)
log.VEventf(ctx, 1, "next replica action: %s", action)

switch action {
@@ -787,13 +787,13 @@
// Add replicas.
case allocatorimpl.AllocatorAddVoter:
requeue, err := rq.addOrReplaceVoters(
ctx, repl, liveVoterReplicas, liveNonVoterReplicas, -1 /* removeIdx */, allocatorimpl.Alive, dryRun,
ctx, repl, liveVoterReplicas, liveNonVoterReplicas, -1 /* removeIdx */, allocatorimpl.Alive, allocatorPrio, dryRun,
)
rq.metrics.trackResultByAllocatorAction(action, err, dryRun)
return requeue, err
case allocatorimpl.AllocatorAddNonVoter:
requeue, err := rq.addOrReplaceNonVoters(
ctx, repl, liveVoterReplicas, liveNonVoterReplicas, -1 /* removeIdx */, allocatorimpl.Alive, dryRun,
ctx, repl, liveVoterReplicas, liveNonVoterReplicas, -1 /* removeIdx */, allocatorimpl.Alive, allocatorPrio, dryRun,
)
rq.metrics.trackResultByAllocatorAction(action, err, dryRun)
return requeue, err
@@ -821,7 +821,7 @@ func (rq *replicateQueue) processOneChange(
deadVoterReplicas[0], voterReplicas)
}
requeue, err := rq.addOrReplaceVoters(
ctx, repl, liveVoterReplicas, liveNonVoterReplicas, removeIdx, allocatorimpl.Dead, dryRun)
ctx, repl, liveVoterReplicas, liveNonVoterReplicas, removeIdx, allocatorimpl.Dead, allocatorPrio, dryRun)
rq.metrics.trackResultByAllocatorAction(action, err, dryRun)
return requeue, err
case allocatorimpl.AllocatorReplaceDeadNonVoter:
@@ -836,7 +836,7 @@
deadNonVoterReplicas[0], nonVoterReplicas)
}
requeue, err := rq.addOrReplaceNonVoters(
ctx, repl, liveVoterReplicas, liveNonVoterReplicas, removeIdx, allocatorimpl.Dead, dryRun)
ctx, repl, liveVoterReplicas, liveNonVoterReplicas, removeIdx, allocatorimpl.Dead, allocatorPrio, dryRun)
rq.metrics.trackResultByAllocatorAction(action, err, dryRun)
return requeue, err

@@ -854,7 +854,7 @@ func (rq *replicateQueue) processOneChange(
decommissioningVoterReplicas[0], voterReplicas)
}
requeue, err := rq.addOrReplaceVoters(
ctx, repl, liveVoterReplicas, liveNonVoterReplicas, removeIdx, allocatorimpl.Decommissioning, dryRun)
ctx, repl, liveVoterReplicas, liveNonVoterReplicas, removeIdx, allocatorimpl.Decommissioning, allocatorPrio, dryRun)
rq.metrics.trackResultByAllocatorAction(action, err, dryRun)
if err != nil {
return requeue, decommissionPurgatoryError{err}
@@ -872,7 +872,7 @@ func (rq *replicateQueue) processOneChange(
decommissioningNonVoterReplicas[0], nonVoterReplicas)
}
requeue, err := rq.addOrReplaceNonVoters(
ctx, repl, liveVoterReplicas, liveNonVoterReplicas, removeIdx, allocatorimpl.Decommissioning, dryRun)
ctx, repl, liveVoterReplicas, liveNonVoterReplicas, removeIdx, allocatorimpl.Decommissioning, allocatorPrio, dryRun)
rq.metrics.trackResultByAllocatorAction(action, err, dryRun)
if err != nil {
return requeue, decommissionPurgatoryError{err}
@@ -919,6 +919,7 @@ func (rq *replicateQueue) processOneChange(
repl,
voterReplicas,
nonVoterReplicas,
allocatorPrio,
canTransferLeaseFrom,
scatter,
dryRun,
@@ -963,6 +964,7 @@ func (rq *replicateQueue) addOrReplaceVoters(
liveVoterReplicas, liveNonVoterReplicas []roachpb.ReplicaDescriptor,
removeIdx int,
replicaStatus allocatorimpl.ReplicaStatus,
allocatorPriority float64,
dryRun bool,
) (requeue bool, _ error) {
desc, conf := repl.DescAndSpanConfig()
@@ -1082,6 +1084,7 @@
ops,
desc,
kvserverpb.SnapshotRequest_RECOVERY,
allocatorPriority,
kvserverpb.ReasonRangeUnderReplicated,
details,
dryRun,
@@ -1101,6 +1104,7 @@ func (rq *replicateQueue) addOrReplaceNonVoters(
liveVoterReplicas, liveNonVoterReplicas []roachpb.ReplicaDescriptor,
removeIdx int,
replicaStatus allocatorimpl.ReplicaStatus,
allocatorPrio float64,
dryRun bool,
) (requeue bool, _ error) {
desc, conf := repl.DescAndSpanConfig()
@@ -1138,6 +1142,7 @@
ops,
desc,
kvserverpb.SnapshotRequest_RECOVERY,
allocatorPrio,
kvserverpb.ReasonRangeUnderReplicated,
details,
dryRun,
@@ -1326,6 +1331,7 @@ func (rq *replicateQueue) removeVoter(
roachpb.MakeReplicationChanges(roachpb.REMOVE_VOTER, removeVoter),
desc,
kvserverpb.SnapshotRequest_UNKNOWN, // unused
0.0, // unused
kvserverpb.ReasonRangeOverReplicated,
details,
dryRun,
@@ -1369,7 +1375,8 @@ func (rq *replicateQueue) removeNonVoter(
repl,
roachpb.MakeReplicationChanges(roachpb.REMOVE_NON_VOTER, target),
desc,
kvserverpb.SnapshotRequest_UNKNOWN,
kvserverpb.SnapshotRequest_UNKNOWN, // unused
0.0, // unused
kvserverpb.ReasonRangeOverReplicated,
details,
dryRun,
@@ -1429,6 +1436,7 @@ func (rq *replicateQueue) removeDecommissioning(
roachpb.MakeReplicationChanges(targetType.RemoveChangeType(), target),
desc,
kvserverpb.SnapshotRequest_UNKNOWN, // unused
0.0, // unused
kvserverpb.ReasonStoreDecommissioning, "", dryRun,
); err != nil {
return false, err
@@ -1475,6 +1483,7 @@ func (rq *replicateQueue) removeDead(
roachpb.MakeReplicationChanges(targetType.RemoveChangeType(), target),
desc,
kvserverpb.SnapshotRequest_UNKNOWN, // unused
0.0, // unused
kvserverpb.ReasonStoreDead,
"",
dryRun,
@@ -1488,6 +1497,7 @@ func (rq *replicateQueue) considerRebalance(
ctx context.Context,
repl *Replica,
existingVoters, existingNonVoters []roachpb.ReplicaDescriptor,
allocatorPrio float64,
canTransferLeaseFrom func(ctx context.Context, repl *Replica) bool,
scatter, dryRun bool,
) (requeue bool, _ error) {
@@ -1574,6 +1584,7 @@
chgs,
desc,
kvserverpb.SnapshotRequest_REBALANCE,
allocatorPrio,
kvserverpb.ReasonRebalance,
details,
dryRun,
@@ -1794,6 +1805,7 @@ func (rq *replicateQueue) changeReplicas(
chgs roachpb.ReplicationChanges,
desc *roachpb.RangeDescriptor,
priority kvserverpb.SnapshotRequest_Priority,
allocatorPriority float64,
reason kvserverpb.RangeLogEventReason,
details string,
dryRun bool,
@@ -1804,7 +1816,8 @@
// NB: this calls the impl rather than ChangeReplicas because
// the latter traps tests that try to call it while the replication
// queue is active.
if _, err := repl.changeReplicasImpl(ctx, desc, priority, reason, details, chgs); err != nil {
if _, err := repl.changeReplicasImpl(ctx, desc, priority, kvserverpb.SnapshotRequest_REPLICATE_QUEUE,
allocatorPriority, reason, details, chgs); err != nil {
return err
}
rangeUsageInfo := rangeUsageInfoForRepl(repl)
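Taken together, the replicate-queue changes thread one priority value from the allocator's decision all the way into the snapshot header. A simplified, hedged sketch of that flow; all helper names and the priority value are stand-ins:

```go
// Simplified flow of allocatorPrio through the replicate queue,
// mirroring the diff above. All types and helpers are stand-ins.
package main

import "fmt"

type QueueName int32

const ReplicateQueue QueueName = 1 // mirrors SnapshotRequest.REPLICATE_QUEUE

// computeAction stands in for allocator.ComputeAction; its priority
// result is now consumed (`action, _ :=` became `action, allocatorPrio :=`).
func computeAction() (action string, priority float64) {
	return "add voter", 10000 // illustrative priority value
}

// changeReplicas stands in for rq.changeReplicas: it forwards the
// allocator priority and tags the work as replicate-queue traffic.
func changeReplicas(allocatorPriority float64) {
	sendSnapshot(ReplicateQueue, allocatorPriority)
}

// sendSnapshot stands in for Replica.sendSnapshot, which now carries
// (senderQueueName, senderQueuePriority) into the delegate request.
func sendSnapshot(name QueueName, priority float64) {
	fmt.Printf("queue=%d priority=%.0f\n", name, priority)
}

func main() {
	_, prio := computeAction()
	changeReplicas(prio) // prints: queue=1 priority=10000
}
```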