diff --git a/pkg/kv/kvserver/BUILD.bazel b/pkg/kv/kvserver/BUILD.bazel
index 0ecbdb630aa5..5be1e5ed03ea 100644
--- a/pkg/kv/kvserver/BUILD.bazel
+++ b/pkg/kv/kvserver/BUILD.bazel
@@ -138,6 +138,7 @@ go_library(
         "//pkg/kv/kvserver/kvserverpb",
         "//pkg/kv/kvserver/liveness",
         "//pkg/kv/kvserver/liveness/livenesspb",
+        "//pkg/kv/kvserver/multiqueue",
         "//pkg/kv/kvserver/raftentry",
         "//pkg/kv/kvserver/raftutil",
         "//pkg/kv/kvserver/rangefeed",
@@ -413,6 +414,7 @@ go_test(
        "//pkg/util/contextutil",
        "//pkg/util/ctxgroup",
        "//pkg/util/encoding",
+        "//pkg/util/envutil",
        "//pkg/util/hlc",
        "//pkg/util/humanizeutil",
        "//pkg/util/leaktest",
diff --git a/pkg/kv/kvserver/helpers_test.go b/pkg/kv/kvserver/helpers_test.go
index c173fbe4a25e..2032d0c0708a 100644
--- a/pkg/kv/kvserver/helpers_test.go
+++ b/pkg/kv/kvserver/helpers_test.go
@@ -199,8 +199,10 @@ func (s *Store) ManualRaftSnapshot(repl *Replica, target roachpb.ReplicaID) erro
 	return err
 }
 
+// ReservationCount counts the number of outstanding reservations that are not
+// running.
 func (s *Store) ReservationCount() int {
-	return len(s.snapshotApplySem)
+	return s.cfg.concurrentSnapshotApplyLimit - s.snapshotApplySem.Len()
 }
 
 // RaftSchedulerPriorityID returns the Raft scheduler's prioritized range.
diff --git a/pkg/kv/kvserver/kvserverpb/raft.proto b/pkg/kv/kvserver/kvserverpb/raft.proto
index f4db78b6a616..c35f9e658151 100644
--- a/pkg/kv/kvserver/kvserverpb/raft.proto
+++ b/pkg/kv/kvserver/kvserverpb/raft.proto
@@ -155,6 +155,16 @@ message SnapshotRequest {
     reserved 2;
   }
 
+  // QueueName indicates the source of the snapshot. Snapshots are prioritized
+  // within a queue and round-robin selected between queues for both the sending
+  // and receiving side.
+  enum QueueName {
+    RAFT_SNAPSHOT_QUEUE = 0;
+    REPLICATE_QUEUE = 1;
+    OTHER = 2;
+  }
+
+
   message Header {
     // The replica state at the time the snapshot was generated. Note
     // that ReplicaState.Desc differs from the above range_descriptor
@@ -170,12 +180,14 @@ message SnapshotRequest {
     int64 range_size = 3;
 
     // The priority of the snapshot.
+    // TODO(abaptist): Deprecated, remove this in v23.1
     Priority priority = 6;
 
     // The strategy of the snapshot.
     Strategy strategy = 7;
 
     // The type of the snapshot.
+    // TODO(abaptist): Deprecated, remove this in v23.1
    Type type = 9;
 
     // Whether the snapshot uses the unreplicated RaftTruncatedState or not.
@@ -190,6 +202,12 @@ message SnapshotRequest {
     // TODO(irfansharif): Remove this in v22.1.
     bool deprecated_unreplicated_truncated_state = 8;
 
+    // The sending queue's name.
+    SnapshotRequest.QueueName sender_queue_name = 10;
+
+    // The sending queue's priority.
+    double sender_queue_priority = 11;
+
     reserved 1, 4;
   }
 
@@ -234,6 +252,12 @@ message DelegateSnapshotRequest {
   // The priority of the snapshot.
   SnapshotRequest.Priority priority = 5;
 
+  // The sending queue's name.
+  SnapshotRequest.QueueName sender_queue_name = 9;
+
+  // The sending queue's priority.
+  double sender_queue_priority = 10;
+
   // The type of the snapshot.
  SnapshotRequest.Type type = 6;
diff --git a/pkg/kv/kvserver/raft_snapshot_queue.go b/pkg/kv/kvserver/raft_snapshot_queue.go
index 61419699ecec..146d3c387215 100644
--- a/pkg/kv/kvserver/raft_snapshot_queue.go
+++ b/pkg/kv/kvserver/raft_snapshot_queue.go
@@ -140,7 +140,8 @@ func (rq *raftSnapshotQueue) processRaftSnapshot(
 		}
 	}
 
-	err := repl.sendSnapshot(ctx, repDesc, snapType, kvserverpb.SnapshotRequest_RECOVERY)
+	err := repl.sendSnapshot(ctx, repDesc, snapType, kvserverpb.SnapshotRequest_RECOVERY,
+		kvserverpb.SnapshotRequest_RAFT_SNAPSHOT_QUEUE, raftSnapshotPriority)
 
 	// NB: if the snapshot fails because of an overlapping replica on the
 	// recipient which is also waiting for a snapshot, the "smart" thing is to
diff --git a/pkg/kv/kvserver/replica_command.go b/pkg/kv/kvserver/replica_command.go
index 4d669222f666..e86a83051fd9 100644
--- a/pkg/kv/kvserver/replica_command.go
+++ b/pkg/kv/kvserver/replica_command.go
@@ -981,13 +981,15 @@ func (r *Replica) ChangeReplicas(
 			return nil, errors.New("must disable replicate queue to use ChangeReplicas manually")
 		}
 	}
-	return r.changeReplicasImpl(ctx, desc, priority, reason, details, chgs)
+	return r.changeReplicasImpl(ctx, desc, priority, kvserverpb.SnapshotRequest_OTHER, 0.0, reason, details, chgs)
 }
 
 func (r *Replica) changeReplicasImpl(
 	ctx context.Context,
 	desc *roachpb.RangeDescriptor,
 	priority kvserverpb.SnapshotRequest_Priority,
+	senderName kvserverpb.SnapshotRequest_QueueName,
+	senderQueuePriority float64,
 	reason kvserverpb.RangeLogEventReason,
 	details string,
 	chgs roachpb.ReplicationChanges,
@@ -1054,7 +1056,7 @@ func (r *Replica) changeReplicasImpl(
 		_ = roachpb.ReplicaSet.LearnerDescriptors
 		var err error
 		desc, err = r.initializeRaftLearners(
-			ctx, desc, priority, reason, details, adds, roachpb.LEARNER,
+			ctx, desc, priority, senderName, senderQueuePriority, reason, details, adds, roachpb.LEARNER,
 		)
 		if err != nil {
 			return nil, err
@@ -1100,7 +1102,7 @@ func (r *Replica) changeReplicasImpl(
 		// disruption to foreground traffic. See
 		// https://github.com/cockroachdb/cockroach/issues/63199 for an example.
 		desc, err = r.initializeRaftLearners(
-			ctx, desc, priority, reason, details, adds, roachpb.NON_VOTER,
+			ctx, desc, priority, senderName, senderQueuePriority, reason, details, adds, roachpb.NON_VOTER,
 		)
 		if err != nil {
 			return nil, err
@@ -1654,6 +1656,8 @@ func (r *Replica) initializeRaftLearners(
 	ctx context.Context,
 	desc *roachpb.RangeDescriptor,
 	priority kvserverpb.SnapshotRequest_Priority,
+	senderName kvserverpb.SnapshotRequest_QueueName,
+	senderQueuePriority float64,
 	reason kvserverpb.RangeLogEventReason,
 	details string,
 	targets []roachpb.ReplicationTarget,
@@ -1799,7 +1803,7 @@ func (r *Replica) initializeRaftLearners(
 		// orphaned learner. Second, this tickled some bugs in etcd/raft around
 		// switching between StateSnapshot and StateProbe. Even if we worked through
 		// these, it would be susceptible to future similar issues.
-		if err := r.sendSnapshot(ctx, rDesc, kvserverpb.SnapshotRequest_INITIAL, priority); err != nil {
+		if err := r.sendSnapshot(ctx, rDesc, kvserverpb.SnapshotRequest_INITIAL, priority, senderName, senderQueuePriority); err != nil {
 			return nil, err
 		}
 	}
@@ -2584,6 +2588,8 @@ func (r *Replica) sendSnapshot(
 	recipient roachpb.ReplicaDescriptor,
 	snapType kvserverpb.SnapshotRequest_Type,
 	priority kvserverpb.SnapshotRequest_Priority,
+	senderQueueName kvserverpb.SnapshotRequest_QueueName,
+	senderQueuePriority float64,
 ) (retErr error) {
 	defer func() {
 		// Report the snapshot status to Raft, which expects us to do this once we
@@ -2631,13 +2637,15 @@ func (r *Replica) sendSnapshot(
 
 	// Create new delegate snapshot request with only required metadata.
 	delegateRequest := &kvserverpb.DelegateSnapshotRequest{
-		RangeID:            r.RangeID,
-		CoordinatorReplica: sender,
-		RecipientReplica:   recipient,
-		Priority:           priority,
-		Type:               snapType,
-		Term:               status.Term,
-		DelegatedSender:    sender,
+		RangeID:             r.RangeID,
+		CoordinatorReplica:  sender,
+		RecipientReplica:    recipient,
+		Priority:            priority,
+		SenderQueueName:     senderQueueName,
+		SenderQueuePriority: senderQueuePriority,
+		Type:                snapType,
+		Term:                status.Term,
+		DelegatedSender:     sender,
 	}
 	err = contextutil.RunWithTimeout(
 		ctx, "delegate-snapshot", sendSnapshotTimeout, func(ctx context.Context) error {
@@ -2777,10 +2785,12 @@ func (r *Replica) followerSendSnapshot(
 				Snapshot: snap.RaftSnap,
 			},
 		},
-		RangeSize: rangeSize,
-		Priority:  req.Priority,
-		Strategy:  kvserverpb.SnapshotRequest_KV_BATCH,
-		Type:      req.Type,
+		RangeSize:           rangeSize,
+		Priority:            req.Priority,
+		SenderQueueName:     req.SenderQueueName,
+		SenderQueuePriority: req.SenderQueuePriority,
+		Strategy:            kvserverpb.SnapshotRequest_KV_BATCH,
+		Type:                req.Type,
 	}
 	newBatchFn := func() storage.Batch {
 		return r.store.Engine().NewUnindexedBatch(true /* writeOnly */)
diff --git a/pkg/kv/kvserver/replica_learner_test.go b/pkg/kv/kvserver/replica_learner_test.go
index c172c9fb4601..622970560994 100644
--- a/pkg/kv/kvserver/replica_learner_test.go
+++ b/pkg/kv/kvserver/replica_learner_test.go
@@ -36,6 +36,7 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
 	"github.com/cockroachdb/cockroach/pkg/testutils/testcluster"
 	"github.com/cockroachdb/cockroach/pkg/util/ctxgroup"
+	"github.com/cockroachdb/cockroach/pkg/util/envutil"
 	"github.com/cockroachdb/cockroach/pkg/util/leaktest"
 	"github.com/cockroachdb/cockroach/pkg/util/log"
 	"github.com/cockroachdb/cockroach/pkg/util/metric"
@@ -240,6 +241,10 @@ func TestAddReplicaWithReceiverThrottling(t *testing.T) {
 	defer leaktest.AfterTest(t)()
 	defer log.Scope(t).Close(t)
 
+	// This test assumes that the snapshot send limit is set to 1 so the second
+	// snapshot send will block waiting for the first to complete.
+	cleanup := envutil.TestSetEnv(t, "COCKROACH_CONCURRENT_SNAPSHOT_SEND_LIMIT", "1")
+	defer cleanup()
 	// blockIncomingSnapshots will block receiving snapshots.
 	blockIncomingSnapshots := make(chan struct{})
 	waitForRebalanceToBlockCh := make(chan struct{})
@@ -991,6 +996,8 @@ func TestLearnerAdminChangeReplicasRace(t *testing.T) {
 func TestLearnerReplicateQueueRace(t *testing.T) {
 	defer leaktest.AfterTest(t)()
 	defer log.Scope(t).Close(t)
+	cleanup := envutil.TestSetEnv(t, "COCKROACH_CONCURRENT_SNAPSHOT_SEND_LIMIT", "2")
+	defer cleanup()
 	var skipReceiveSnapshotKnobAtomic int64 = 1
 	blockUntilSnapshotCh := make(chan struct{}, 2)
diff --git a/pkg/kv/kvserver/replicate_queue.go b/pkg/kv/kvserver/replicate_queue.go
index cdd55b16f6a7..b2a61fe9ca02 100644
--- a/pkg/kv/kvserver/replicate_queue.go
+++ b/pkg/kv/kvserver/replicate_queue.go
@@ -774,7 +774,7 @@ func (rq *replicateQueue) processOneChange(
 	// unavailability; see:
 	_ = execChangeReplicasTxn
 
-	action, _ := rq.allocator.ComputeAction(ctx, conf, desc)
+	action, allocatorPrio := rq.allocator.ComputeAction(ctx, conf, desc)
 	log.VEventf(ctx, 1, "next replica action: %s", action)
 
 	switch action {
@@ -787,13 +787,13 @@ func (rq *replicateQueue) processOneChange(
 	// Add replicas.
 	case allocatorimpl.AllocatorAddVoter:
 		requeue, err := rq.addOrReplaceVoters(
-			ctx, repl, liveVoterReplicas, liveNonVoterReplicas, -1 /* removeIdx */, allocatorimpl.Alive, dryRun,
+			ctx, repl, liveVoterReplicas, liveNonVoterReplicas, -1 /* removeIdx */, allocatorimpl.Alive, allocatorPrio, dryRun,
 		)
 		rq.metrics.trackResultByAllocatorAction(action, err, dryRun)
 		return requeue, err
 	case allocatorimpl.AllocatorAddNonVoter:
 		requeue, err := rq.addOrReplaceNonVoters(
-			ctx, repl, liveVoterReplicas, liveNonVoterReplicas, -1 /* removeIdx */, allocatorimpl.Alive, dryRun,
+			ctx, repl, liveVoterReplicas, liveNonVoterReplicas, -1 /* removeIdx */, allocatorimpl.Alive, allocatorPrio, dryRun,
 		)
 		rq.metrics.trackResultByAllocatorAction(action, err, dryRun)
 		return requeue, err
@@ -821,7 +821,7 @@ func (rq *replicateQueue) processOneChange(
 				deadVoterReplicas[0], voterReplicas)
 		}
 		requeue, err := rq.addOrReplaceVoters(
-			ctx, repl, liveVoterReplicas, liveNonVoterReplicas, removeIdx, allocatorimpl.Dead, dryRun)
+			ctx, repl, liveVoterReplicas, liveNonVoterReplicas, removeIdx, allocatorimpl.Dead, allocatorPrio, dryRun)
 		rq.metrics.trackResultByAllocatorAction(action, err, dryRun)
 		return requeue, err
 	case allocatorimpl.AllocatorReplaceDeadNonVoter:
@@ -836,7 +836,7 @@ func (rq *replicateQueue) processOneChange(
 				deadNonVoterReplicas[0], nonVoterReplicas)
 		}
 		requeue, err := rq.addOrReplaceNonVoters(
-			ctx, repl, liveVoterReplicas, liveNonVoterReplicas, removeIdx, allocatorimpl.Dead, dryRun)
+			ctx, repl, liveVoterReplicas, liveNonVoterReplicas, removeIdx, allocatorimpl.Dead, allocatorPrio, dryRun)
 		rq.metrics.trackResultByAllocatorAction(action, err, dryRun)
 		return requeue, err
 
@@ -854,7 +854,7 @@ func (rq *replicateQueue) processOneChange(
 				decommissioningVoterReplicas[0], voterReplicas)
 		}
 		requeue, err := rq.addOrReplaceVoters(
-			ctx, repl, liveVoterReplicas, liveNonVoterReplicas, removeIdx, allocatorimpl.Decommissioning, dryRun)
+			ctx, repl, liveVoterReplicas, liveNonVoterReplicas, removeIdx, allocatorimpl.Decommissioning, allocatorPrio, dryRun)
 		rq.metrics.trackResultByAllocatorAction(action, err, dryRun)
 		if err != nil {
 			return requeue, decommissionPurgatoryError{err}
 		}
@@ -872,7 +872,7 @@ func (rq *replicateQueue) processOneChange(
 				decommissioningNonVoterReplicas[0], nonVoterReplicas)
 		}
 		requeue, err := rq.addOrReplaceNonVoters(
-			ctx, repl, liveVoterReplicas, liveNonVoterReplicas, removeIdx, allocatorimpl.Decommissioning, dryRun)
+			ctx, repl, liveVoterReplicas, liveNonVoterReplicas, removeIdx, allocatorimpl.Decommissioning, allocatorPrio, dryRun)
 		rq.metrics.trackResultByAllocatorAction(action, err, dryRun)
 		if err != nil {
 			return requeue, decommissionPurgatoryError{err}
 		}
@@ -919,6 +919,7 @@ func (rq *replicateQueue) processOneChange(
 			repl,
 			voterReplicas,
 			nonVoterReplicas,
+			allocatorPrio,
 			canTransferLeaseFrom,
 			scatter,
 			dryRun,
@@ -963,6 +964,7 @@ func (rq *replicateQueue) addOrReplaceVoters(
 	liveVoterReplicas, liveNonVoterReplicas []roachpb.ReplicaDescriptor,
 	removeIdx int,
 	replicaStatus allocatorimpl.ReplicaStatus,
+	allocatorPriority float64,
 	dryRun bool,
 ) (requeue bool, _ error) {
 	desc, conf := repl.DescAndSpanConfig()
@@ -1082,6 +1084,7 @@ func (rq *replicateQueue) addOrReplaceVoters(
 		ops,
 		desc,
 		kvserverpb.SnapshotRequest_RECOVERY,
+		allocatorPriority,
 		kvserverpb.ReasonRangeUnderReplicated,
 		details,
 		dryRun,
@@ -1101,6 +1104,7 @@ func (rq *replicateQueue) addOrReplaceNonVoters(
 	liveVoterReplicas, liveNonVoterReplicas []roachpb.ReplicaDescriptor,
 	removeIdx int,
 	replicaStatus allocatorimpl.ReplicaStatus,
+	allocatorPrio float64,
 	dryRun bool,
 ) (requeue bool, _ error) {
 	desc, conf := repl.DescAndSpanConfig()
@@ -1138,6 +1142,7 @@ func (rq *replicateQueue) addOrReplaceNonVoters(
 		ops,
 		desc,
 		kvserverpb.SnapshotRequest_RECOVERY,
+		allocatorPrio,
 		kvserverpb.ReasonRangeUnderReplicated,
 		details,
 		dryRun,
@@ -1326,6 +1331,7 @@ func (rq *replicateQueue) removeVoter(
 		roachpb.MakeReplicationChanges(roachpb.REMOVE_VOTER, removeVoter),
 		desc,
 		kvserverpb.SnapshotRequest_UNKNOWN, // unused
+		0.0, // unused
 		kvserverpb.ReasonRangeOverReplicated,
 		details,
 		dryRun,
@@ -1369,7 +1375,8 @@ func (rq *replicateQueue) removeNonVoter(
 		repl,
 		roachpb.MakeReplicationChanges(roachpb.REMOVE_NON_VOTER, target),
 		desc,
-		kvserverpb.SnapshotRequest_UNKNOWN,
+		kvserverpb.SnapshotRequest_UNKNOWN, // unused
+		0.0, // unused
 		kvserverpb.ReasonRangeOverReplicated,
 		details,
 		dryRun,
@@ -1429,6 +1436,7 @@ func (rq *replicateQueue) removeDecommissioning(
 		roachpb.MakeReplicationChanges(targetType.RemoveChangeType(), target),
 		desc,
 		kvserverpb.SnapshotRequest_UNKNOWN, // unused
+		0.0, // unused
 		kvserverpb.ReasonStoreDecommissioning, "", dryRun,
 	); err != nil {
 		return false, err
@@ -1475,6 +1483,7 @@ func (rq *replicateQueue) removeDead(
 		roachpb.MakeReplicationChanges(targetType.RemoveChangeType(), target),
 		desc,
 		kvserverpb.SnapshotRequest_UNKNOWN, // unused
+		0.0, // unused
 		kvserverpb.ReasonStoreDead, "", dryRun,
@@ -1488,6 +1497,7 @@ func (rq *replicateQueue) considerRebalance(
 	ctx context.Context,
 	repl *Replica,
 	existingVoters, existingNonVoters []roachpb.ReplicaDescriptor,
+	allocatorPrio float64,
 	canTransferLeaseFrom func(ctx context.Context, repl *Replica) bool,
 	scatter, dryRun bool,
 ) (requeue bool, _ error) {
@@ -1574,6 +1584,7 @@ func (rq *replicateQueue) considerRebalance(
 		chgs,
 		desc,
 		kvserverpb.SnapshotRequest_REBALANCE,
+		allocatorPrio,
 		kvserverpb.ReasonRebalance,
 		details,
 		dryRun,
@@ -1794,6 +1805,7 @@ func (rq *replicateQueue) changeReplicas(
 	chgs roachpb.ReplicationChanges,
 	desc *roachpb.RangeDescriptor,
 	priority kvserverpb.SnapshotRequest_Priority,
+	allocatorPriority float64,
 	reason kvserverpb.RangeLogEventReason,
 	details string,
 	dryRun bool,
@@ -1804,7 +1816,8 @@ func (rq *replicateQueue) changeReplicas(
 	// NB: this calls the impl rather than ChangeReplicas because
 	// the latter traps tests that try to call it while the replication
 	// queue is active.
-	if _, err := repl.changeReplicasImpl(ctx, desc, priority, reason, details, chgs); err != nil {
+	if _, err := repl.changeReplicasImpl(ctx, desc, priority, kvserverpb.SnapshotRequest_REPLICATE_QUEUE,
+		allocatorPriority, reason, details, chgs); err != nil {
 		return err
 	}
 	rangeUsageInfo := rangeUsageInfoForRepl(repl)
diff --git a/pkg/kv/kvserver/store.go b/pkg/kv/kvserver/store.go
index 98d01b712a7d..d0820d99d7f2 100644
--- a/pkg/kv/kvserver/store.go
+++ b/pkg/kv/kvserver/store.go
@@ -43,6 +43,7 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/intentresolver"
 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb"
 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness"
+	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/multiqueue"
 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/raftentry"
 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/rangefeed"
 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/replicastats"
@@ -771,11 +772,10 @@ type Store struct {
 	initComplete sync.WaitGroup // Signaled by async init tasks
 
 	// Semaphore to limit concurrent non-empty snapshot application.
-	snapshotApplySem chan struct{}
-	// Semaphore to limit concurrent non-empty snapshot sending.
-	initialSnapshotSendSem chan struct{}
+	snapshotApplySem *multiqueue.MultiQueue
+
 	// Semaphore to limit concurrent non-empty snapshot sending.
-	raftSnapshotSendSem chan struct{}
+	snapshotSendSem *multiqueue.MultiQueue
 
 	// Track newly-acquired expiration-based leases that we want to proactively
 	// renew. An object is sent on the signal whenever a new entry is added to
@@ -1139,7 +1139,7 @@ func (sc *StoreConfig) SetDefaults() {
 	}
 	if sc.concurrentSnapshotSendLimit == 0 {
 		sc.concurrentSnapshotSendLimit =
-			envutil.EnvOrDefaultInt("COCKROACH_CONCURRENT_SNAPSHOT_SEND_LIMIT", 1)
+			envutil.EnvOrDefaultInt("COCKROACH_CONCURRENT_SNAPSHOT_SEND_LIMIT", 2)
 	}
 
 	if sc.TestingKnobs.GossipWhenCapacityDeltaExceedsFraction == 0 {
@@ -1242,9 +1242,8 @@ func NewStore(
 	s.txnWaitMetrics = txnwait.NewMetrics(cfg.HistogramWindowInterval)
 	s.metrics.registry.AddMetricStruct(s.txnWaitMetrics)
 
-	s.snapshotApplySem = make(chan struct{}, cfg.concurrentSnapshotApplyLimit)
-	s.initialSnapshotSendSem = make(chan struct{}, cfg.concurrentSnapshotSendLimit)
-	s.raftSnapshotSendSem = make(chan struct{}, cfg.concurrentSnapshotSendLimit)
+	s.snapshotApplySem = multiqueue.NewMultiQueue(cfg.concurrentSnapshotApplyLimit)
+	s.snapshotSendSem = multiqueue.NewMultiQueue(cfg.concurrentSnapshotSendLimit)
 	if ch := s.cfg.TestingKnobs.LeaseRenewalSignalChan; ch != nil {
 		s.renewableLeasesSignal = ch
 	} else {
diff --git a/pkg/kv/kvserver/store_snapshot.go b/pkg/kv/kvserver/store_snapshot.go
index 7803070a5f62..331964e187eb 100644
--- a/pkg/kv/kvserver/store_snapshot.go
+++ b/pkg/kv/kvserver/store_snapshot.go
@@ -18,6 +18,7 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/keys"
 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator/storepool"
 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb"
+	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/multiqueue"
 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/raftentry"
 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/rditer"
 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/stateloader"
@@ -659,8 +660,12 @@ func (s *Store) reserveReceiveSnapshot(
 ) (_cleanup func(), _err error) {
 	ctx, sp := tracing.EnsureChildSpan(ctx, s.cfg.Tracer(), "reserveSnapshot")
 	defer sp.Finish()
+	requestSource := int32(header.SenderQueueName)
+	requestPriority := header.SenderQueuePriority
 	return s.throttleSnapshot(
-		ctx, s.snapshotApplySem, header.RangeSize,
+		ctx, s.snapshotApplySem,
+		int(requestSource), requestPriority,
+		header.RangeSize,
 		header.RaftMessageRequest.RangeID, header.RaftMessageRequest.ToReplica.ReplicaID,
 		s.metrics.RangeSnapshotRecvQueueLength,
 		s.metrics.RangeSnapshotRecvInProgress, s.metrics.RangeSnapshotRecvTotalInProgress,
@@ -673,14 +678,14 @@ func (s *Store) reserveSendSnapshot(
 ) (_cleanup func(), _err error) {
 	ctx, sp := tracing.EnsureChildSpan(ctx, s.cfg.Tracer(), "reserveSendSnapshot")
 	defer sp.Finish()
-	sem := s.initialSnapshotSendSem
-	if req.Type == kvserverpb.SnapshotRequest_VIA_SNAPSHOT_QUEUE {
-		sem = s.raftSnapshotSendSem
-	}
 	if fn := s.cfg.TestingKnobs.BeforeSendSnapshotThrottle; fn != nil {
 		fn()
 	}
-	return s.throttleSnapshot(ctx, sem, rangeSize,
+	requestSource := int32(req.SenderQueueName)
+	requestPriority := req.SenderQueuePriority
+	return s.throttleSnapshot(ctx, s.snapshotSendSem,
+		int(requestSource), requestPriority,
+		rangeSize,
 		req.RangeID, req.DelegatedSender.ReplicaID,
 		s.metrics.RangeSnapshotSendQueueLength,
 		s.metrics.RangeSnapshotSendInProgress, s.metrics.RangeSnapshotSendTotalInProgress,
@@ -692,18 +697,22 @@
 // release its resources.
 func (s *Store) throttleSnapshot(
 	ctx context.Context,
-	snapshotSem chan struct{},
+	semaphore *multiqueue.MultiQueue,
+	requestSource int,
+	requestPriority float64,
 	rangeSize int64,
 	rangeID roachpb.RangeID,
 	replicaID roachpb.ReplicaID,
 	waitingSnapshotMetric, inProgressSnapshotMetric, totalInProgressSnapshotMetric *metric.Gauge,
 ) (_cleanup func(), _err error) {
 	tBegin := timeutil.Now()
+	var permit *multiqueue.Permit
 	// Empty snapshots are exempt from rate limits because they're so cheap to
 	// apply. This vastly speeds up rebalancing any empty ranges created by a
 	// RESTORE or manual SPLIT AT, since it prevents these empty snapshots from
 	// getting stuck behind large snapshots managed by the replicate queue.
 	if rangeSize != 0 || s.cfg.TestingKnobs.ThrottleEmptySnapshots {
+		task := semaphore.Add(requestSource, requestPriority)
 		waitingSnapshotMetric.Inc(1)
 		defer waitingSnapshotMetric.Dec(1)
 		queueCtx := ctx
@@ -719,7 +728,7 @@ func (s *Store) throttleSnapshot(
 			defer cancel()
 		}
 		select {
-		case snapshotSem <- struct{}{}:
+		case permit = <-task.GetWaitChan():
 			// Got a spot in the semaphore, continue with sending the snapshot.
 			if fn := s.cfg.TestingKnobs.AfterSendSnapshotThrottle; fn != nil {
 				fn()
@@ -770,7 +779,7 @@ func (s *Store) throttleSnapshot(
 
 		if rangeSize != 0 || s.cfg.TestingKnobs.ThrottleEmptySnapshots {
 			inProgressSnapshotMetric.Dec(1)
-			<-snapshotSem
+			semaphore.Release(permit)
 		}
 	}, nil
 }
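
Note: the new pkg/kv/kvserver/multiqueue package that the store now depends on is not included in this diff, but the call sites above pin down its contract: NewMultiQueue(limit) creates a pool of at most limit permits, Add(queueID, priority) registers a waiter and returns a task, the task's GetWaitChan() yields a *Permit once one is granted, Release(permit) returns capacity, and Len() reports remaining capacity. The sketch below illustrates those semantics (highest priority first within a queue, round-robin rotation across queues, per the QueueName comment in raft.proto). It is an illustration only, not the actual multiqueue implementation; internal names such as taskHeap, order, and cursor are invented for the example, and real concerns such as cancelling a waiter that gives up are omitted.

```go
// Sketch of the multiqueue semantics assumed by the call sites above.
package multiqueue

import (
	"container/heap"
	"sync"
)

// Permit is the token a caller holds while its snapshot is in flight.
type Permit struct{}

// Task is returned by Add; the caller waits on GetWaitChan for its Permit.
type Task struct {
	priority float64
	ch       chan *Permit
}

func (t *Task) GetWaitChan() <-chan *Permit { return t.ch }

// taskHeap is a max-heap: the highest-priority waiter pops first.
type taskHeap []*Task

func (h taskHeap) Len() int            { return len(h) }
func (h taskHeap) Less(i, j int) bool  { return h[i].priority > h[j].priority }
func (h taskHeap) Swap(i, j int)       { h[i], h[j] = h[j], h[i] }
func (h *taskHeap) Push(x interface{}) { *h = append(*h, x.(*Task)) }
func (h *taskHeap) Pop() interface{} {
	old := *h
	t := old[len(old)-1]
	*h = old[:len(old)-1]
	return t
}

// MultiQueue grants up to concurrency permits, serving the highest-priority
// waiter within a queue and rotating between queues so none is starved.
type MultiQueue struct {
	mu          sync.Mutex
	concurrency int
	outstanding int   // permits currently held
	cursor      int   // round-robin position over order
	order       []int // queue IDs in first-seen order
	queues      map[int]*taskHeap
}

func NewMultiQueue(concurrency int) *MultiQueue {
	return &MultiQueue{concurrency: concurrency, queues: map[int]*taskHeap{}}
}

// Add enqueues a waiter for queueID (e.g. a QueueName enum value) with the
// given priority, granting a permit immediately if capacity allows.
func (m *MultiQueue) Add(queueID int, priority float64) *Task {
	m.mu.Lock()
	defer m.mu.Unlock()
	t := &Task{priority: priority, ch: make(chan *Permit, 1)}
	h, ok := m.queues[queueID]
	if !ok {
		h = &taskHeap{}
		m.queues[queueID] = h
		m.order = append(m.order, queueID)
	}
	heap.Push(h, t)
	m.grantLocked()
	return t
}

// Release returns a permit to the pool and wakes the next waiter, if any.
func (m *MultiQueue) Release(_ *Permit) {
	m.mu.Lock()
	defer m.mu.Unlock()
	m.outstanding--
	m.grantLocked()
}

// Len reports remaining capacity, so limit - Len() recovers the number of
// held reservations (which is how ReservationCount above computes it).
func (m *MultiQueue) Len() int {
	m.mu.Lock()
	defer m.mu.Unlock()
	return m.concurrency - m.outstanding
}

// grantLocked hands out permits while capacity remains, advancing the
// round-robin cursor past the queue served last.
func (m *MultiQueue) grantLocked() {
	for m.outstanding < m.concurrency {
		granted := false
		for i := 0; i < len(m.order); i++ {
			m.cursor = (m.cursor + 1) % len(m.order)
			if h := m.queues[m.order[m.cursor]]; h.Len() > 0 {
				heap.Pop(h).(*Task).ch <- &Permit{}
				m.outstanding++
				granted = true
				break
			}
		}
		if !granted {
			return
		}
	}
}
```

Under these semantics, throttleSnapshot above maps directly onto the sketch: Add(int(header.SenderQueueName), header.SenderQueuePriority) registers the waiter, the select on task.GetWaitChan() blocks until a permit rotates to that queue, and the returned cleanup closure's semaphore.Release(permit) hands the slot to the next waiter. This also appears to explain the COCKROACH_CONCURRENT_SNAPSHOT_SEND_LIMIT default moving from 1 to 2: the two former single-slot send semaphores (initialSnapshotSendSem and raftSnapshotSendSem) collapse into one two-permit MultiQueue, keeping total send concurrency the same while letting the sources share capacity fairly instead of each owning a dedicated slot.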