kvserver: incorporate sending queue priority into snapshot requests
This change modifies the `(Delegated)SnapshotRequest` Raft RPCs to
incorporate the name of the sending queue and the sending queue's
priority, so that a receiving store can use them to prioritize queued
snapshots.

Release justification: Low-risk change to existing functionality.
Release note: None
AlexTalks committed Aug 25, 2022
1 parent 6b3cd4b commit 9913ba5
Showing 4 changed files with 79 additions and 25 deletions.
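
Before the per-file diffs, a note on the intended consumer of the new fields: a receiving store can order incoming snapshots by `sender_queue_priority` within each `sender_queue_name`, round-robining across the named queues so no single source starves the others. The receive-side queueing itself is not part of this commit; the sketch below is illustrative only, and every identifier in it is hypothetical.

```go
package main

import (
	"container/heap"
	"fmt"
)

// queuedSnapshot is a hypothetical receive-side record of a pending
// snapshot, tagged with the new header fields introduced in this commit.
type queuedSnapshot struct {
	rangeID  int64
	priority float64 // sender_queue_priority
}

// snapshotHeap orders snapshots by descending sender_queue_priority.
type snapshotHeap []queuedSnapshot

func (h snapshotHeap) Len() int           { return len(h) }
func (h snapshotHeap) Less(i, j int) bool { return h[i].priority > h[j].priority }
func (h snapshotHeap) Swap(i, j int)      { h[i], h[j] = h[j], h[i] }
func (h *snapshotHeap) Push(x interface{}) {
	*h = append(*h, x.(queuedSnapshot))
}
func (h *snapshotHeap) Pop() interface{} {
	old := *h
	x := old[len(old)-1]
	*h = old[:len(old)-1]
	return x
}

// scheduler round-robins across sending queues (keyed by
// sender_queue_name) and pops the highest-priority snapshot within the
// chosen queue.
type scheduler struct {
	queues map[string]*snapshotHeap
	order  []string // round-robin order of queue names
	next   int
}

func (s *scheduler) add(queueName string, snap queuedSnapshot) {
	q, ok := s.queues[queueName]
	if !ok {
		q = &snapshotHeap{}
		s.queues[queueName] = q
		s.order = append(s.order, queueName)
	}
	heap.Push(q, snap)
}

func (s *scheduler) pick() (queuedSnapshot, bool) {
	for range s.order {
		name := s.order[s.next%len(s.order)]
		s.next++
		if q := s.queues[name]; q.Len() > 0 {
			return heap.Pop(q).(queuedSnapshot), true
		}
	}
	return queuedSnapshot{}, false
}

func main() {
	s := &scheduler{queues: map[string]*snapshotHeap{}}
	s.add("REPLICATE_QUEUE", queuedSnapshot{rangeID: 1, priority: 42})
	s.add("REPLICATE_QUEUE", queuedSnapshot{rangeID: 2, priority: 7})
	s.add("RAFT_SNAPSHOT_QUEUE", queuedSnapshot{rangeID: 3, priority: 100})
	for snap, ok := s.pick(); ok; snap, ok = s.pick() {
		fmt.Printf("admit range %d snapshot (priority %.0f)\n", snap.rangeID, snap.priority)
	}
}
```

Round-robin across queue names keeps a busy replicate queue from starving recovery snapshots sent by the Raft snapshot queue, while the in-queue priority preserves urgency ordering within a single source.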
27 changes: 27 additions & 0 deletions pkg/kv/kvserver/kvserverpb/raft.proto
@@ -155,6 +155,15 @@ message SnapshotRequest {
reserved 2;
}

+ // QueueName indicates the source of the snapshot. Snapshots are prioritized
+ // within a queue and round-robin selected between queues for both the sending
+ // and receiving side.
+ enum QueueName {
+ OTHER = 0;
+ REPLICATE_QUEUE = 1;
+ RAFT_SNAPSHOT_QUEUE = 2;
+ }
+
message Header {
// The replica state at the time the snapshot was generated. Note
// that ReplicaState.Desc differs from the above range_descriptor
@@ -170,12 +179,16 @@
int64 range_size = 3;

// The priority of the snapshot.
+ // Deprecated, prefer sender_queue_priority.
+ // TODO(abaptist): Remove this field for v23.1.
Priority priority = 6;

// The strategy of the snapshot.
Strategy strategy = 7;

// The type of the snapshot.
+ // Deprecated, prefer sender_queue_name.
+ // TODO(abaptist): Remove this field for v23.1.
Type type = 9;

// Whether the snapshot uses the unreplicated RaftTruncatedState or not.
@@ -190,6 +203,14 @@
// TODO(irfansharif): Remove this in v22.1.
bool deprecated_unreplicated_truncated_state = 8;

+ // The sending queue's name, to be utilized to ensure fairness across
+ // different snapshot sending sources.
+ SnapshotRequest.QueueName sender_queue_name = 10;
+
+ // The sending queue's priority, to be utilized to prioritize snapshots
+ // from a particular sending source.
+ double sender_queue_priority = 11;
+
reserved 1, 4;
}

@@ -237,6 +258,12 @@ message DelegateSnapshotRequest {
// The priority of the snapshot.
SnapshotRequest.Priority priority = 5;

+ // The sending queue's name.
+ SnapshotRequest.QueueName sender_queue_name = 9;
+
+ // The sending queue's priority.
+ double sender_queue_priority = 10;
+
// The type of the snapshot.
SnapshotRequest.Type type = 6;

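The deprecated `priority` and `type` fields remain populated until their removal in v23.1, so a receiver in a mixed-version cluster may see headers from older senders that never set the new fields. A minimal sketch of a possible receive-side fallback, assuming the gogoproto-generated Go types; `effectivePriority` and its numeric mapping are hypothetical, not part of this commit:

```go
// effectivePriority prefers the fine-grained sender_queue_priority and
// falls back to the deprecated coarse Priority enum for headers sent by
// older-version nodes. A real implementation would also need a way to
// distinguish "unset" from a legitimate zero priority.
func effectivePriority(h *kvserverpb.SnapshotRequest_Header) float64 {
	if h.SenderQueuePriority > 0 {
		return h.SenderQueuePriority
	}
	switch h.Priority {
	case kvserverpb.SnapshotRequest_RECOVERY:
		return 2 // recovery outranks rebalance, mirroring the old enum's intent
	case kvserverpb.SnapshotRequest_REBALANCE:
		return 1
	default: // SnapshotRequest_UNKNOWN
		return 0
	}
}
```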
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/raft_snapshot_queue.go
@@ -140,7 +140,7 @@ func (rq *raftSnapshotQueue) processRaftSnapshot(
}
}

- err := repl.sendSnapshot(ctx, repDesc, snapType, kvserverpb.SnapshotRequest_RECOVERY)
+ err := repl.sendSnapshot(ctx, repDesc, snapType, kvserverpb.SnapshotRequest_RECOVERY, kvserverpb.SnapshotRequest_RAFT_SNAPSHOT_QUEUE, raftSnapshotPriority)

// NB: if the snapshot fails because of an overlapping replica on the
// recipient which is also waiting for a snapshot, the "smart" thing is to
42 changes: 27 additions & 15 deletions pkg/kv/kvserver/replica_command.go
@@ -981,13 +981,15 @@ func (r *Replica) ChangeReplicas(
return nil, errors.New("must disable replicate queue to use ChangeReplicas manually")
}
}
- return r.changeReplicasImpl(ctx, desc, priority, reason, details, chgs)
+ return r.changeReplicasImpl(ctx, desc, priority, kvserverpb.SnapshotRequest_OTHER, 0.0, reason, details, chgs)
}

func (r *Replica) changeReplicasImpl(
ctx context.Context,
desc *roachpb.RangeDescriptor,
priority kvserverpb.SnapshotRequest_Priority,
+ senderName kvserverpb.SnapshotRequest_QueueName,
+ senderQueuePriority float64,
reason kvserverpb.RangeLogEventReason,
details string,
chgs roachpb.ReplicationChanges,
@@ -1054,7 +1056,7 @@
_ = roachpb.ReplicaSet.LearnerDescriptors
var err error
desc, err = r.initializeRaftLearners(
- ctx, desc, priority, reason, details, adds, roachpb.LEARNER,
+ ctx, desc, priority, senderName, senderQueuePriority, reason, details, adds, roachpb.LEARNER,
)
if err != nil {
return nil, err
@@ -1100,7 +1102,7 @@
// disruption to foreground traffic. See
// https://github.com/cockroachdb/cockroach/issues/63199 for an example.
desc, err = r.initializeRaftLearners(
- ctx, desc, priority, reason, details, adds, roachpb.NON_VOTER,
+ ctx, desc, priority, senderName, senderQueuePriority, reason, details, adds, roachpb.NON_VOTER,
)
if err != nil {
return nil, err
@@ -1654,6 +1656,8 @@ func (r *Replica) initializeRaftLearners(
ctx context.Context,
desc *roachpb.RangeDescriptor,
priority kvserverpb.SnapshotRequest_Priority,
+ senderName kvserverpb.SnapshotRequest_QueueName,
+ senderQueuePriority float64,
reason kvserverpb.RangeLogEventReason,
details string,
targets []roachpb.ReplicationTarget,
@@ -1799,7 +1803,9 @@
// orphaned learner. Second, this tickled some bugs in etcd/raft around
// switching between StateSnapshot and StateProbe. Even if we worked through
// these, it would be susceptible to future similar issues.
- if err := r.sendSnapshot(ctx, rDesc, kvserverpb.SnapshotRequest_INITIAL, priority); err != nil {
+ if err := r.sendSnapshot(
+ ctx, rDesc, kvserverpb.SnapshotRequest_INITIAL, priority, senderName, senderQueuePriority,
+ ); err != nil {
return nil, err
}
}
@@ -2584,6 +2590,8 @@ func (r *Replica) sendSnapshot(
recipient roachpb.ReplicaDescriptor,
snapType kvserverpb.SnapshotRequest_Type,
priority kvserverpb.SnapshotRequest_Priority,
+ senderQueueName kvserverpb.SnapshotRequest_QueueName,
+ senderQueuePriority float64,
) (retErr error) {
defer func() {
// Report the snapshot status to Raft, which expects us to do this once we
@@ -2631,13 +2639,15 @@

// Create new delegate snapshot request with only required metadata.
delegateRequest := &kvserverpb.DelegateSnapshotRequest{
- RangeID: r.RangeID,
- CoordinatorReplica: sender,
- RecipientReplica: recipient,
- Priority: priority,
- Type: snapType,
- Term: status.Term,
- DelegatedSender: sender,
+ RangeID: r.RangeID,
+ CoordinatorReplica: sender,
+ RecipientReplica: recipient,
+ Priority: priority,
+ SenderQueueName: senderQueueName,
+ SenderQueuePriority: senderQueuePriority,
+ Type: snapType,
+ Term: status.Term,
+ DelegatedSender: sender,
}
err = contextutil.RunWithTimeout(
ctx, "delegate-snapshot", sendSnapshotTimeout, func(ctx context.Context) error {
@@ -2777,10 +2787,12 @@ func (r *Replica) followerSendSnapshot(
Snapshot: snap.RaftSnap,
},
},
- RangeSize: rangeSize,
- Priority: req.Priority,
- Strategy: kvserverpb.SnapshotRequest_KV_BATCH,
- Type: req.Type,
+ RangeSize: rangeSize,
+ Priority: req.Priority,
+ SenderQueueName: req.SenderQueueName,
+ SenderQueuePriority: req.SenderQueuePriority,
+ Strategy: kvserverpb.SnapshotRequest_KV_BATCH,
+ Type: req.Type,
}
newBatchFn := func() storage.Batch {
return r.store.Engine().NewUnindexedBatch(true /* writeOnly */)
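To summarize the plumbing in this file: the coordinator stamps the two new fields onto the `DelegateSnapshotRequest`, and the (possibly delegated) sender copies them into the `SnapshotRequest_Header` it streams to the recipient. Condensed from the hunks above, with unrelated fields elided:

```go
// sendSnapshot (coordinator): tag the delegate request with the caller's
// queue identity and priority.
delegateRequest := &kvserverpb.DelegateSnapshotRequest{
	Priority:            priority,            // deprecated coarse priority, kept for compatibility
	SenderQueueName:     senderQueueName,     // e.g. SnapshotRequest_RAFT_SNAPSHOT_QUEUE
	SenderQueuePriority: senderQueuePriority, // e.g. raftSnapshotPriority
	// ... RangeID, replicas, Type, Term as before ...
}

// followerSendSnapshot (sender): propagate the tags into the header the
// recipient can use for queueing decisions.
header := kvserverpb.SnapshotRequest_Header{
	Priority:            req.Priority,
	SenderQueueName:     req.SenderQueueName,
	SenderQueuePriority: req.SenderQueuePriority,
	// ... RangeSize, Strategy, Type as before ...
}
```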
33 changes: 24 additions & 9 deletions pkg/kv/kvserver/replicate_queue.go
@@ -774,7 +774,7 @@ func (rq *replicateQueue) processOneChange(
// unavailability; see:
_ = execChangeReplicasTxn

- action, _ := rq.allocator.ComputeAction(ctx, conf, desc)
+ action, allocatorPrio := rq.allocator.ComputeAction(ctx, conf, desc)
log.VEventf(ctx, 1, "next replica action: %s", action)

switch action {
@@ -787,13 +787,13 @@
// Add replicas.
case allocatorimpl.AllocatorAddVoter:
requeue, err := rq.addOrReplaceVoters(
- ctx, repl, liveVoterReplicas, liveNonVoterReplicas, -1 /* removeIdx */, allocatorimpl.Alive, dryRun,
+ ctx, repl, liveVoterReplicas, liveNonVoterReplicas, -1 /* removeIdx */, allocatorimpl.Alive, allocatorPrio, dryRun,
)
rq.metrics.trackResultByAllocatorAction(action, err, dryRun)
return requeue, err
case allocatorimpl.AllocatorAddNonVoter:
requeue, err := rq.addOrReplaceNonVoters(
- ctx, repl, liveVoterReplicas, liveNonVoterReplicas, -1 /* removeIdx */, allocatorimpl.Alive, dryRun,
+ ctx, repl, liveVoterReplicas, liveNonVoterReplicas, -1 /* removeIdx */, allocatorimpl.Alive, allocatorPrio, dryRun,
)
rq.metrics.trackResultByAllocatorAction(action, err, dryRun)
return requeue, err
@@ -821,7 +821,7 @@
deadVoterReplicas[0], voterReplicas)
}
requeue, err := rq.addOrReplaceVoters(
- ctx, repl, liveVoterReplicas, liveNonVoterReplicas, removeIdx, allocatorimpl.Dead, dryRun)
+ ctx, repl, liveVoterReplicas, liveNonVoterReplicas, removeIdx, allocatorimpl.Dead, allocatorPrio, dryRun)
rq.metrics.trackResultByAllocatorAction(action, err, dryRun)
return requeue, err
case allocatorimpl.AllocatorReplaceDeadNonVoter:
@@ -836,7 +836,7 @@
deadNonVoterReplicas[0], nonVoterReplicas)
}
requeue, err := rq.addOrReplaceNonVoters(
- ctx, repl, liveVoterReplicas, liveNonVoterReplicas, removeIdx, allocatorimpl.Dead, dryRun)
+ ctx, repl, liveVoterReplicas, liveNonVoterReplicas, removeIdx, allocatorimpl.Dead, allocatorPrio, dryRun)
rq.metrics.trackResultByAllocatorAction(action, err, dryRun)
return requeue, err

@@ -854,7 +854,7 @@
decommissioningVoterReplicas[0], voterReplicas)
}
requeue, err := rq.addOrReplaceVoters(
- ctx, repl, liveVoterReplicas, liveNonVoterReplicas, removeIdx, allocatorimpl.Decommissioning, dryRun)
+ ctx, repl, liveVoterReplicas, liveNonVoterReplicas, removeIdx, allocatorimpl.Decommissioning, allocatorPrio, dryRun)
rq.metrics.trackResultByAllocatorAction(action, err, dryRun)
if err != nil {
return requeue, decommissionPurgatoryError{err}
@@ -872,7 +872,7 @@
decommissioningNonVoterReplicas[0], nonVoterReplicas)
}
requeue, err := rq.addOrReplaceNonVoters(
- ctx, repl, liveVoterReplicas, liveNonVoterReplicas, removeIdx, allocatorimpl.Decommissioning, dryRun)
+ ctx, repl, liveVoterReplicas, liveNonVoterReplicas, removeIdx, allocatorimpl.Decommissioning, allocatorPrio, dryRun)
rq.metrics.trackResultByAllocatorAction(action, err, dryRun)
if err != nil {
return requeue, decommissionPurgatoryError{err}
@@ -919,6 +919,7 @@
repl,
voterReplicas,
nonVoterReplicas,
+ allocatorPrio,
canTransferLeaseFrom,
scatter,
dryRun,
@@ -963,6 +964,7 @@
liveVoterReplicas, liveNonVoterReplicas []roachpb.ReplicaDescriptor,
removeIdx int,
replicaStatus allocatorimpl.ReplicaStatus,
+ allocatorPriority float64,
dryRun bool,
) (requeue bool, _ error) {
desc, conf := repl.DescAndSpanConfig()
@@ -1082,6 +1084,7 @@
ops,
desc,
kvserverpb.SnapshotRequest_RECOVERY,
+ allocatorPriority,
kvserverpb.ReasonRangeUnderReplicated,
details,
dryRun,
@@ -1101,6 +1104,7 @@
liveVoterReplicas, liveNonVoterReplicas []roachpb.ReplicaDescriptor,
removeIdx int,
replicaStatus allocatorimpl.ReplicaStatus,
+ allocatorPrio float64,
dryRun bool,
) (requeue bool, _ error) {
desc, conf := repl.DescAndSpanConfig()
Expand Down Expand Up @@ -1138,6 +1142,7 @@ func (rq *replicateQueue) addOrReplaceNonVoters(
ops,
desc,
kvserverpb.SnapshotRequest_RECOVERY,
+ allocatorPrio,
kvserverpb.ReasonRangeUnderReplicated,
details,
dryRun,
@@ -1326,6 +1331,7 @@
roachpb.MakeReplicationChanges(roachpb.REMOVE_VOTER, removeVoter),
desc,
kvserverpb.SnapshotRequest_UNKNOWN, // unused
+ 0.0, // unused
kvserverpb.ReasonRangeOverReplicated,
details,
dryRun,
@@ -1369,7 +1375,8 @@
repl,
roachpb.MakeReplicationChanges(roachpb.REMOVE_NON_VOTER, target),
desc,
- kvserverpb.SnapshotRequest_UNKNOWN,
+ kvserverpb.SnapshotRequest_UNKNOWN, // unused
+ 0.0, // unused
kvserverpb.ReasonRangeOverReplicated,
details,
dryRun,
@@ -1429,6 +1436,7 @@
roachpb.MakeReplicationChanges(targetType.RemoveChangeType(), target),
desc,
kvserverpb.SnapshotRequest_UNKNOWN, // unused
+ 0.0, // unused
kvserverpb.ReasonStoreDecommissioning, "", dryRun,
); err != nil {
return false, err
@@ -1475,6 +1483,7 @@
roachpb.MakeReplicationChanges(targetType.RemoveChangeType(), target),
desc,
kvserverpb.SnapshotRequest_UNKNOWN, // unused
+ 0.0, // unused
kvserverpb.ReasonStoreDead,
"",
dryRun,
@@ -1488,6 +1497,7 @@
ctx context.Context,
repl *Replica,
existingVoters, existingNonVoters []roachpb.ReplicaDescriptor,
+ allocatorPrio float64,
canTransferLeaseFrom func(ctx context.Context, repl *Replica) bool,
scatter, dryRun bool,
) (requeue bool, _ error) {
@@ -1574,6 +1584,7 @@
chgs,
desc,
kvserverpb.SnapshotRequest_REBALANCE,
+ allocatorPrio,
kvserverpb.ReasonRebalance,
details,
dryRun,
@@ -1794,6 +1805,7 @@
chgs roachpb.ReplicationChanges,
desc *roachpb.RangeDescriptor,
priority kvserverpb.SnapshotRequest_Priority,
+ allocatorPriority float64,
reason kvserverpb.RangeLogEventReason,
details string,
dryRun bool,
@@ -1804,7 +1816,10 @@
// NB: this calls the impl rather than ChangeReplicas because
// the latter traps tests that try to call it while the replication
// queue is active.
- if _, err := repl.changeReplicasImpl(ctx, desc, priority, reason, details, chgs); err != nil {
+ if _, err := repl.changeReplicasImpl(
+ ctx, desc, priority, kvserverpb.SnapshotRequest_REPLICATE_QUEUE, allocatorPriority, reason,
+ details, chgs,
+ ); err != nil {
return err
}
rangeUsageInfo := rangeUsageInfoForRepl(repl)
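Net effect in the replicate queue: the priority the allocator computes for an action is reused as the snapshot's sending-queue priority, while pure removals pass an unused placeholder since they send no snapshot. A condensed trace of the flow introduced above, with some arguments elided:

```go
// processOneChange: capture the allocator's priority instead of discarding it.
action, allocatorPrio := rq.allocator.ComputeAction(ctx, conf, desc)

// Additions and replacements thread it through changeReplicas, which tags
// the eventual snapshot with the replicate queue's identity:
_, err := repl.changeReplicasImpl(
	ctx, desc, priority,
	kvserverpb.SnapshotRequest_REPLICATE_QUEUE, allocatorPrio,
	reason, details, chgs,
)

// Removals send no snapshot, so they pass SnapshotRequest_UNKNOWN and 0.0.
```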
