From 9913ba58c622e32f968f79c3877872e242706371 Mon Sep 17 00:00:00 2001 From: Alex Sarkesian Date: Thu, 18 Aug 2022 14:13:01 -0400 Subject: [PATCH] kvserver: incorporate sending queue priority into snapshot requests This change modifies the `SnapshotRequest` and `DelegateSnapshotRequest` Raft RPCs to carry the name of the sending queue and that queue's priority, so that a receiving store can use them to prioritize its queued snapshots. Release justification: Low-risk change to existing functionality. Release note: None --- pkg/kv/kvserver/kvserverpb/raft.proto | 27 +++++++++++++++++ pkg/kv/kvserver/raft_snapshot_queue.go | 2 +- pkg/kv/kvserver/replica_command.go | 42 +++++++++++++++++--------- pkg/kv/kvserver/replicate_queue.go | 33 ++++++++++++++------ 4 files changed, 79 insertions(+), 25 deletions(-) diff --git a/pkg/kv/kvserver/kvserverpb/raft.proto b/pkg/kv/kvserver/kvserverpb/raft.proto index 1b02b41d473b..f5cbd37383a6 100644 --- a/pkg/kv/kvserver/kvserverpb/raft.proto +++ b/pkg/kv/kvserver/kvserverpb/raft.proto @@ -155,6 +155,15 @@ message SnapshotRequest { reserved 2; } + // QueueName indicates the source of the snapshot. Snapshots are prioritized + // within a queue and round-robin selected between queues for both the sending + // and receiving side. + enum QueueName { + OTHER = 0; + REPLICATE_QUEUE = 1; + RAFT_SNAPSHOT_QUEUE = 2; + } + message Header { // The replica state at the time the snapshot was generated. Note // that ReplicaState.Desc differs from the above range_descriptor @@ -170,12 +179,16 @@ message SnapshotRequest { int64 range_size = 3; // The priority of the snapshot. + // Deprecated, prefer sender_queue_priority. + // TODO(abaptist): Remove this field for v23.1. Priority priority = 6; // The strategy of the snapshot. Strategy strategy = 7; // The type of the snapshot. + // Deprecated, prefer sender_queue_name. + // TODO(abaptist): Remove this field for v23.1.
Type type = 9; // Whether the snapshot uses the unreplicated RaftTruncatedState or not. @@ -190,6 +203,14 @@ message SnapshotRequest { // TODO(irfansharif): Remove this in v22.1. bool deprecated_unreplicated_truncated_state = 8; + // The sending queue's name, to be utilized to ensure fairness across + // different snapshot sending sources. + SnapshotRequest.QueueName sender_queue_name = 10; + + // The sending queue's priority, to be utilized to prioritize snapshots + // from a particular sending source. + double sender_queue_priority = 11; + reserved 1, 4; } @@ -237,6 +258,12 @@ message DelegateSnapshotRequest { // The priority of the snapshot. SnapshotRequest.Priority priority = 5; + // The sending queue's name. + SnapshotRequest.QueueName sender_queue_name = 9; + + // The sending queue's priority. + double sender_queue_priority = 10; + // The type of the snapshot. SnapshotRequest.Type type = 6; diff --git a/pkg/kv/kvserver/raft_snapshot_queue.go b/pkg/kv/kvserver/raft_snapshot_queue.go index 61419699ecec..c3f1d848325a 100644 --- a/pkg/kv/kvserver/raft_snapshot_queue.go +++ b/pkg/kv/kvserver/raft_snapshot_queue.go @@ -140,7 +140,7 @@ func (rq *raftSnapshotQueue) processRaftSnapshot( } } - err := repl.sendSnapshot(ctx, repDesc, snapType, kvserverpb.SnapshotRequest_RECOVERY) + err := repl.sendSnapshot(ctx, repDesc, snapType, kvserverpb.SnapshotRequest_RECOVERY, kvserverpb.SnapshotRequest_RAFT_SNAPSHOT_QUEUE, raftSnapshotPriority) // NB: if the snapshot fails because of an overlapping replica on the // recipient which is also waiting for a snapshot, the "smart" thing is to diff --git a/pkg/kv/kvserver/replica_command.go b/pkg/kv/kvserver/replica_command.go index 4d669222f666..db180daaf455 100644 --- a/pkg/kv/kvserver/replica_command.go +++ b/pkg/kv/kvserver/replica_command.go @@ -981,13 +981,15 @@ func (r *Replica) ChangeReplicas( return nil, errors.New("must disable replicate queue to use ChangeReplicas manually") } } - return r.changeReplicasImpl(ctx, desc, 
priority, reason, details, chgs) + return r.changeReplicasImpl(ctx, desc, priority, kvserverpb.SnapshotRequest_OTHER, 0.0, reason, details, chgs) } func (r *Replica) changeReplicasImpl( ctx context.Context, desc *roachpb.RangeDescriptor, priority kvserverpb.SnapshotRequest_Priority, + senderName kvserverpb.SnapshotRequest_QueueName, + senderQueuePriority float64, reason kvserverpb.RangeLogEventReason, details string, chgs roachpb.ReplicationChanges, @@ -1054,7 +1056,7 @@ func (r *Replica) changeReplicasImpl( _ = roachpb.ReplicaSet.LearnerDescriptors var err error desc, err = r.initializeRaftLearners( - ctx, desc, priority, reason, details, adds, roachpb.LEARNER, + ctx, desc, priority, senderName, senderQueuePriority, reason, details, adds, roachpb.LEARNER, ) if err != nil { return nil, err @@ -1100,7 +1102,7 @@ func (r *Replica) changeReplicasImpl( // disruption to foreground traffic. See // https://github.com/cockroachdb/cockroach/issues/63199 for an example. desc, err = r.initializeRaftLearners( - ctx, desc, priority, reason, details, adds, roachpb.NON_VOTER, + ctx, desc, priority, senderName, senderQueuePriority, reason, details, adds, roachpb.NON_VOTER, ) if err != nil { return nil, err @@ -1654,6 +1656,8 @@ func (r *Replica) initializeRaftLearners( ctx context.Context, desc *roachpb.RangeDescriptor, priority kvserverpb.SnapshotRequest_Priority, + senderName kvserverpb.SnapshotRequest_QueueName, + senderQueuePriority float64, reason kvserverpb.RangeLogEventReason, details string, targets []roachpb.ReplicationTarget, @@ -1799,7 +1803,9 @@ func (r *Replica) initializeRaftLearners( // orphaned learner. Second, this tickled some bugs in etcd/raft around // switching between StateSnapshot and StateProbe. Even if we worked through // these, it would be susceptible to future similar issues. 
- if err := r.sendSnapshot(ctx, rDesc, kvserverpb.SnapshotRequest_INITIAL, priority); err != nil { + if err := r.sendSnapshot( + ctx, rDesc, kvserverpb.SnapshotRequest_INITIAL, priority, senderName, senderQueuePriority, + ); err != nil { return nil, err } } @@ -2584,6 +2590,8 @@ func (r *Replica) sendSnapshot( recipient roachpb.ReplicaDescriptor, snapType kvserverpb.SnapshotRequest_Type, priority kvserverpb.SnapshotRequest_Priority, + senderQueueName kvserverpb.SnapshotRequest_QueueName, + senderQueuePriority float64, ) (retErr error) { defer func() { // Report the snapshot status to Raft, which expects us to do this once we @@ -2631,13 +2639,15 @@ func (r *Replica) sendSnapshot( // Create new delegate snapshot request with only required metadata. delegateRequest := &kvserverpb.DelegateSnapshotRequest{ - RangeID: r.RangeID, - CoordinatorReplica: sender, - RecipientReplica: recipient, - Priority: priority, - Type: snapType, - Term: status.Term, - DelegatedSender: sender, + RangeID: r.RangeID, + CoordinatorReplica: sender, + RecipientReplica: recipient, + Priority: priority, + SenderQueueName: senderQueueName, + SenderQueuePriority: senderQueuePriority, + Type: snapType, + Term: status.Term, + DelegatedSender: sender, } err = contextutil.RunWithTimeout( ctx, "delegate-snapshot", sendSnapshotTimeout, func(ctx context.Context) error { @@ -2777,10 +2787,12 @@ func (r *Replica) followerSendSnapshot( Snapshot: snap.RaftSnap, }, }, - RangeSize: rangeSize, - Priority: req.Priority, - Strategy: kvserverpb.SnapshotRequest_KV_BATCH, - Type: req.Type, + RangeSize: rangeSize, + Priority: req.Priority, + SenderQueueName: req.SenderQueueName, + SenderQueuePriority: req.SenderQueuePriority, + Strategy: kvserverpb.SnapshotRequest_KV_BATCH, + Type: req.Type, } newBatchFn := func() storage.Batch { return r.store.Engine().NewUnindexedBatch(true /* writeOnly */) diff --git a/pkg/kv/kvserver/replicate_queue.go b/pkg/kv/kvserver/replicate_queue.go index cdd55b16f6a7..81c79fdbf271 100644 
--- a/pkg/kv/kvserver/replicate_queue.go +++ b/pkg/kv/kvserver/replicate_queue.go @@ -774,7 +774,7 @@ func (rq *replicateQueue) processOneChange( // unavailability; see: _ = execChangeReplicasTxn - action, _ := rq.allocator.ComputeAction(ctx, conf, desc) + action, allocatorPrio := rq.allocator.ComputeAction(ctx, conf, desc) log.VEventf(ctx, 1, "next replica action: %s", action) switch action { @@ -787,13 +787,13 @@ func (rq *replicateQueue) processOneChange( // Add replicas. case allocatorimpl.AllocatorAddVoter: requeue, err := rq.addOrReplaceVoters( - ctx, repl, liveVoterReplicas, liveNonVoterReplicas, -1 /* removeIdx */, allocatorimpl.Alive, dryRun, + ctx, repl, liveVoterReplicas, liveNonVoterReplicas, -1 /* removeIdx */, allocatorimpl.Alive, allocatorPrio, dryRun, ) rq.metrics.trackResultByAllocatorAction(action, err, dryRun) return requeue, err case allocatorimpl.AllocatorAddNonVoter: requeue, err := rq.addOrReplaceNonVoters( - ctx, repl, liveVoterReplicas, liveNonVoterReplicas, -1 /* removeIdx */, allocatorimpl.Alive, dryRun, + ctx, repl, liveVoterReplicas, liveNonVoterReplicas, -1 /* removeIdx */, allocatorimpl.Alive, allocatorPrio, dryRun, ) rq.metrics.trackResultByAllocatorAction(action, err, dryRun) return requeue, err @@ -821,7 +821,7 @@ func (rq *replicateQueue) processOneChange( deadVoterReplicas[0], voterReplicas) } requeue, err := rq.addOrReplaceVoters( - ctx, repl, liveVoterReplicas, liveNonVoterReplicas, removeIdx, allocatorimpl.Dead, dryRun) + ctx, repl, liveVoterReplicas, liveNonVoterReplicas, removeIdx, allocatorimpl.Dead, allocatorPrio, dryRun) rq.metrics.trackResultByAllocatorAction(action, err, dryRun) return requeue, err case allocatorimpl.AllocatorReplaceDeadNonVoter: @@ -836,7 +836,7 @@ func (rq *replicateQueue) processOneChange( deadNonVoterReplicas[0], nonVoterReplicas) } requeue, err := rq.addOrReplaceNonVoters( - ctx, repl, liveVoterReplicas, liveNonVoterReplicas, removeIdx, allocatorimpl.Dead, dryRun) + ctx, repl, liveVoterReplicas, 
liveNonVoterReplicas, removeIdx, allocatorimpl.Dead, allocatorPrio, dryRun) rq.metrics.trackResultByAllocatorAction(action, err, dryRun) return requeue, err @@ -854,7 +854,7 @@ func (rq *replicateQueue) processOneChange( decommissioningVoterReplicas[0], voterReplicas) } requeue, err := rq.addOrReplaceVoters( - ctx, repl, liveVoterReplicas, liveNonVoterReplicas, removeIdx, allocatorimpl.Decommissioning, dryRun) + ctx, repl, liveVoterReplicas, liveNonVoterReplicas, removeIdx, allocatorimpl.Decommissioning, allocatorPrio, dryRun) rq.metrics.trackResultByAllocatorAction(action, err, dryRun) if err != nil { return requeue, decommissionPurgatoryError{err} @@ -872,7 +872,7 @@ func (rq *replicateQueue) processOneChange( decommissioningNonVoterReplicas[0], nonVoterReplicas) } requeue, err := rq.addOrReplaceNonVoters( - ctx, repl, liveVoterReplicas, liveNonVoterReplicas, removeIdx, allocatorimpl.Decommissioning, dryRun) + ctx, repl, liveVoterReplicas, liveNonVoterReplicas, removeIdx, allocatorimpl.Decommissioning, allocatorPrio, dryRun) rq.metrics.trackResultByAllocatorAction(action, err, dryRun) if err != nil { return requeue, decommissionPurgatoryError{err} @@ -919,6 +919,7 @@ func (rq *replicateQueue) processOneChange( repl, voterReplicas, nonVoterReplicas, + allocatorPrio, canTransferLeaseFrom, scatter, dryRun, @@ -963,6 +964,7 @@ func (rq *replicateQueue) addOrReplaceVoters( liveVoterReplicas, liveNonVoterReplicas []roachpb.ReplicaDescriptor, removeIdx int, replicaStatus allocatorimpl.ReplicaStatus, + allocatorPriority float64, dryRun bool, ) (requeue bool, _ error) { desc, conf := repl.DescAndSpanConfig() @@ -1082,6 +1084,7 @@ func (rq *replicateQueue) addOrReplaceVoters( ops, desc, kvserverpb.SnapshotRequest_RECOVERY, + allocatorPriority, kvserverpb.ReasonRangeUnderReplicated, details, dryRun, @@ -1101,6 +1104,7 @@ func (rq *replicateQueue) addOrReplaceNonVoters( liveVoterReplicas, liveNonVoterReplicas []roachpb.ReplicaDescriptor, removeIdx int, replicaStatus 
allocatorimpl.ReplicaStatus, + allocatorPrio float64, dryRun bool, ) (requeue bool, _ error) { desc, conf := repl.DescAndSpanConfig() @@ -1138,6 +1142,7 @@ func (rq *replicateQueue) addOrReplaceNonVoters( ops, desc, kvserverpb.SnapshotRequest_RECOVERY, + allocatorPrio, kvserverpb.ReasonRangeUnderReplicated, details, dryRun, @@ -1326,6 +1331,7 @@ func (rq *replicateQueue) removeVoter( roachpb.MakeReplicationChanges(roachpb.REMOVE_VOTER, removeVoter), desc, kvserverpb.SnapshotRequest_UNKNOWN, // unused + 0.0, // unused kvserverpb.ReasonRangeOverReplicated, details, dryRun, @@ -1369,7 +1375,8 @@ func (rq *replicateQueue) removeNonVoter( repl, roachpb.MakeReplicationChanges(roachpb.REMOVE_NON_VOTER, target), desc, - kvserverpb.SnapshotRequest_UNKNOWN, + kvserverpb.SnapshotRequest_UNKNOWN, // unused + 0.0, // unused kvserverpb.ReasonRangeOverReplicated, details, dryRun, @@ -1429,6 +1436,7 @@ func (rq *replicateQueue) removeDecommissioning( roachpb.MakeReplicationChanges(targetType.RemoveChangeType(), target), desc, kvserverpb.SnapshotRequest_UNKNOWN, // unused + 0.0, // unused kvserverpb.ReasonStoreDecommissioning, "", dryRun, ); err != nil { return false, err @@ -1475,6 +1483,7 @@ func (rq *replicateQueue) removeDead( roachpb.MakeReplicationChanges(targetType.RemoveChangeType(), target), desc, kvserverpb.SnapshotRequest_UNKNOWN, // unused + 0.0, // unused kvserverpb.ReasonStoreDead, "", dryRun, @@ -1488,6 +1497,7 @@ func (rq *replicateQueue) considerRebalance( ctx context.Context, repl *Replica, existingVoters, existingNonVoters []roachpb.ReplicaDescriptor, + allocatorPrio float64, canTransferLeaseFrom func(ctx context.Context, repl *Replica) bool, scatter, dryRun bool, ) (requeue bool, _ error) { @@ -1574,6 +1584,7 @@ func (rq *replicateQueue) considerRebalance( chgs, desc, kvserverpb.SnapshotRequest_REBALANCE, + allocatorPrio, kvserverpb.ReasonRebalance, details, dryRun, @@ -1794,6 +1805,7 @@ func (rq *replicateQueue) changeReplicas( chgs roachpb.ReplicationChanges, 
desc *roachpb.RangeDescriptor, priority kvserverpb.SnapshotRequest_Priority, + allocatorPriority float64, reason kvserverpb.RangeLogEventReason, details string, dryRun bool, @@ -1804,7 +1816,10 @@ func (rq *replicateQueue) changeReplicas( // NB: this calls the impl rather than ChangeReplicas because // the latter traps tests that try to call it while the replication // queue is active. - if _, err := repl.changeReplicasImpl(ctx, desc, priority, reason, details, chgs); err != nil { + if _, err := repl.changeReplicasImpl( + ctx, desc, priority, kvserverpb.SnapshotRequest_REPLICATE_QUEUE, allocatorPriority, reason, + details, chgs, + ); err != nil { return err } rangeUsageInfo := rangeUsageInfoForRepl(repl)