diff --git a/docs/generated/settings/settings-for-tenants.txt b/docs/generated/settings/settings-for-tenants.txt
index d9dd8678053b..e2688fcb04b7 100644
--- a/docs/generated/settings/settings-for-tenants.txt
+++ b/docs/generated/settings/settings-for-tenants.txt
@@ -295,4 +295,4 @@ trace.jaeger.agent string the address of a Jaeger agent to receive traces using
 trace.opentelemetry.collector string address of an OpenTelemetry trace collector to receive traces using the otel gRPC protocol, as <host>:<port>. If no port is specified, 4317 will be used.
 trace.span_registry.enabled boolean true if set, ongoing traces can be seen at https://<ui>/#/debug/tracez
 trace.zipkin.collector string the address of a Zipkin instance to receive traces, as <host>:<port>. If no port is specified, 9411 will be used.
-version version 1000022.1-68 set the active cluster version in the format '<major>.<minor>'
+version version 1000022.1-70 set the active cluster version in the format '<major>.<minor>'
diff --git a/docs/generated/settings/settings.html b/docs/generated/settings/settings.html
index 523a18dbb92c..c68be48ee994 100644
--- a/docs/generated/settings/settings.html
+++ b/docs/generated/settings/settings.html
@@ -229,6 +229,6 @@
 <tr><td><code>trace.opentelemetry.collector</code></td><td>string</td><td><code></code></td><td>address of an OpenTelemetry trace collector to receive traces using the otel gRPC protocol, as <host>:<port>. If no port is specified, 4317 will be used.</td></tr>
 <tr><td><code>trace.span_registry.enabled</code></td><td>boolean</td><td><code>true</code></td><td>if set, ongoing traces can be seen at https://<ui>/#/debug/tracez</td></tr>
 <tr><td><code>trace.zipkin.collector</code></td><td>string</td><td><code></code></td><td>the address of a Zipkin instance to receive traces, as <host>:<port>. If no port is specified, 9411 will be used.</td></tr>
-<tr><td><code>version</code></td><td>version</td><td><code>1000022.1-68</code></td><td>set the active cluster version in the format '<major>.<minor>'</td></tr>
+<tr><td><code>version</code></td><td>version</td><td><code>1000022.1-70</code></td><td>set the active cluster version in the format '<major>.<minor>'</td></tr>
diff --git a/pkg/base/test_server_args.go b/pkg/base/test_server_args.go
index d9ea00d3c6a3..9b6a98d57840 100644
--- a/pkg/base/test_server_args.go
+++ b/pkg/base/test_server_args.go
@@ -102,6 +102,8 @@ type TestServerArgs struct {
 	TimeSeriesQueryMemoryBudget int64
 	SQLMemoryPoolSize           int64
 	CacheSize                   int64
+	SnapshotSendLimit           int64
+	SnapshotApplyLimit          int64
 
 	// By default, test servers have AutoInitializeCluster=true set in
 	// their config. If NoAutoInitializeCluster is set, that behavior is disabled
diff --git a/pkg/clusterversion/cockroach_versions.go b/pkg/clusterversion/cockroach_versions.go
index 64115cc04239..a8b4440acf6d 100644
--- a/pkg/clusterversion/cockroach_versions.go
+++ b/pkg/clusterversion/cockroach_versions.go
@@ -278,6 +278,10 @@ const (
 	// TTLDistSQL uses DistSQL to distribute TTL SELECT/DELETE statements to
 	// leaseholder nodes.
 	TTLDistSQL
+	// PrioritizeSnapshots adds prioritization to sender snapshots. When this
+	// version is enabled, the receiver will look at the priority of snapshots
+	// using the fields added in 22.2.
+	PrioritizeSnapshots
 
 	// *************************************************
 	// Step (1): Add new versions here.
@@ -453,6 +457,10 @@ var rawVersionsSingleton = keyedVersions{
 		Key:     TTLDistSQL,
 		Version: roachpb.Version{Major: 22, Minor: 1, Internal: 68},
 	},
+	{
+		Key:     PrioritizeSnapshots,
+		Version: roachpb.Version{Major: 22, Minor: 1, Internal: 70},
+	},
 	// *************************************************
 	// Step (2): Add new versions here.
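Note on the gate above: the new PrioritizeSnapshots version follows the usual cluster-version pattern, in which new wire fields are only populated once the whole cluster is known to run a binary that understands them (see the replica_command.go hunk below). As a rough illustration of the ordering check involved, here is a minimal standalone Go sketch; the version struct and isActive helper are simplified stand-ins for roachpb.Version and the clusterversion IsActive check, not the real API.

package main

import "fmt"

// version is a simplified stand-in for roachpb.Version. Internal versions
// (like 22.1-68 and 22.1-70 above) order by Major, then Minor, then Internal.
type version struct{ major, minor, internal int }

// less reports whether v orders strictly before o.
func (v version) less(o version) bool {
	if v.major != o.major {
		return v.major < o.major
	}
	if v.minor != o.minor {
		return v.minor < o.minor
	}
	return v.internal < o.internal
}

// isActive mirrors the shape of the IsActive check: a gate is active once
// the cluster's active version has reached it.
func isActive(active, gate version) bool {
	return !active.less(gate)
}

func main() {
	prioritizeSnapshots := version{22, 1, 70} // the gate added in this diff
	active := version{22, 1, 68}              // cluster still on TTLDistSQL

	// Not yet active: the sender must zero out the new priority fields so
	// that not-yet-upgraded receivers never see them.
	fmt.Println(isActive(active, prioritizeSnapshots)) // false

	active = version{22, 1, 70}
	fmt.Println(isActive(active, prioritizeSnapshots)) // true
}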
diff --git a/pkg/clusterversion/key_string.go b/pkg/clusterversion/key_string.go
index 9de85aa54e5b..6c345cc1cb9a 100644
--- a/pkg/clusterversion/key_string.go
+++ b/pkg/clusterversion/key_string.go
@@ -44,11 +44,12 @@ func _() {
 	_ = x[GCHintInReplicaState-32]
 	_ = x[UpdateInvalidColumnIDsInSequenceBackReferences-33]
 	_ = x[TTLDistSQL-34]
+	_ = x[PrioritizeSnapshots-35]
 }
 
-const _Key_name = "invalidVersionKeyV22_1Start22_2LocalTimestampsPebbleFormatSplitUserKeysMarkedCompactedEnsurePebbleFormatVersionRangeKeysEnablePebbleFormatVersionRangeKeysTrigramInvertedIndexesRemoveGrantPrivilegeMVCCRangeTombstonesUpgradeSequenceToBeReferencedByIDSampledStmtDiagReqsAddSSTableTombstonesSystemPrivilegesTableEnablePredicateProjectionChangefeedAlterSystemSQLInstancesAddLocalitySystemExternalConnectionsTableAlterSystemStatementStatisticsAddIndexRecommendationsRoleIDSequenceAddSystemUserIDColumnSystemUsersIDColumnIsBackfilledSetSystemUsersUserIDColumnNotNullSQLSchemaTelemetryScheduledJobsSchemaChangeSupportsCreateFunctionDeleteRequestReturnKeyPebbleFormatPrePebblev1MarkedRoleOptionsTableHasIDColumnRoleOptionsIDColumnIsBackfilledSetRoleOptionsUserIDColumnNotNullUseDelRangeInGCJobWaitedForDelRangeInGCJobRangefeedUseOneStreamPerNodeNoNonMVCCAddSSTableGCHintInReplicaStateUpdateInvalidColumnIDsInSequenceBackReferencesTTLDistSQL"
+const _Key_name = "invalidVersionKeyV22_1Start22_2LocalTimestampsPebbleFormatSplitUserKeysMarkedCompactedEnsurePebbleFormatVersionRangeKeysEnablePebbleFormatVersionRangeKeysTrigramInvertedIndexesRemoveGrantPrivilegeMVCCRangeTombstonesUpgradeSequenceToBeReferencedByIDSampledStmtDiagReqsAddSSTableTombstonesSystemPrivilegesTableEnablePredicateProjectionChangefeedAlterSystemSQLInstancesAddLocalitySystemExternalConnectionsTableAlterSystemStatementStatisticsAddIndexRecommendationsRoleIDSequenceAddSystemUserIDColumnSystemUsersIDColumnIsBackfilledSetSystemUsersUserIDColumnNotNullSQLSchemaTelemetryScheduledJobsSchemaChangeSupportsCreateFunctionDeleteRequestReturnKeyPebbleFormatPrePebblev1MarkedRoleOptionsTableHasIDColumnRoleOptionsIDColumnIsBackfilledSetRoleOptionsUserIDColumnNotNullUseDelRangeInGCJobWaitedForDelRangeInGCJobRangefeedUseOneStreamPerNodeNoNonMVCCAddSSTableGCHintInReplicaStateUpdateInvalidColumnIDsInSequenceBackReferencesTTLDistSQLPrioritizeSnapshots"
 
-var _Key_index = [...]uint16{0, 17, 22, 31, 46, 86, 120, 154, 176, 196, 215, 248, 267, 287, 308, 343, 377, 407, 460, 474, 495, 526, 559, 590, 624, 646, 675, 702, 733, 766, 784, 808, 836, 855, 875, 921, 931}
+var _Key_index = [...]uint16{0, 17, 22, 31, 46, 86, 120, 154, 176, 196, 215, 248, 267, 287, 308, 343, 377, 407, 460, 474, 495, 526, 559, 590, 624, 646, 675, 702, 733, 766, 784, 808, 836, 855, 875, 921, 931, 950}
 
 func (i Key) String() string {
 	i -= -1
diff --git a/pkg/kv/kvserver/BUILD.bazel b/pkg/kv/kvserver/BUILD.bazel
index e24c30c8e1f2..bec1040c5d1d 100644
--- a/pkg/kv/kvserver/BUILD.bazel
+++ b/pkg/kv/kvserver/BUILD.bazel
@@ -136,6 +136,7 @@ go_library(
         "//pkg/kv/kvserver/kvserverpb",
         "//pkg/kv/kvserver/liveness",
         "//pkg/kv/kvserver/liveness/livenesspb",
+        "//pkg/kv/kvserver/multiqueue",
         "//pkg/kv/kvserver/raftentry",
         "//pkg/kv/kvserver/raftutil",
         "//pkg/kv/kvserver/rangefeed",
diff --git a/pkg/kv/kvserver/helpers_test.go b/pkg/kv/kvserver/helpers_test.go
index c173fbe4a25e..8eda8f267882 100644
--- a/pkg/kv/kvserver/helpers_test.go
+++ b/pkg/kv/kvserver/helpers_test.go
@@ -199,8 +199,10 @@ func (s *Store) ManualRaftSnapshot(repl *Replica, target roachpb.ReplicaID) erro
 	return err
 }
 
+// ReservationCount counts the number of outstanding snapshot reservations,
+// i.e., the number of permits currently in use.
 func (s *Store) ReservationCount() int {
-	return len(s.snapshotApplySem)
+	return int(s.cfg.SnapshotApplyLimit) - s.snapshotApplyQueue.Len()
 }
 
 // RaftSchedulerPriorityID returns the Raft scheduler's prioritized range.
diff --git a/pkg/kv/kvserver/kvserverpb/raft.proto b/pkg/kv/kvserver/kvserverpb/raft.proto
index f5cbd37383a6..4923d96ffecd 100644
--- a/pkg/kv/kvserver/kvserverpb/raft.proto
+++ b/pkg/kv/kvserver/kvserverpb/raft.proto
@@ -204,7 +204,11 @@ message SnapshotRequest {
   bool deprecated_unreplicated_truncated_state = 8;
 
   // The sending queue's name, to be utilized to ensure fairness across
-  // different snapshot sending sources.
+  // different snapshot sending sources. The default queue name, OTHER, is
+  // reserved for any uncategorized and unprioritized snapshots, and requests
+  // with sender queue name OTHER may not specify a non-zero
+  // sender_queue_priority. To prioritize snapshots categorized as OTHER,
+  // first move them to a different queue name.
   SnapshotRequest.QueueName sender_queue_name = 10;
 
   // The sending queue's priority, to be utilized to prioritize snapshots
diff --git a/pkg/kv/kvserver/replica_command.go b/pkg/kv/kvserver/replica_command.go
index eb7411f00e77..eb7b06f3620f 100644
--- a/pkg/kv/kvserver/replica_command.go
+++ b/pkg/kv/kvserver/replica_command.go
@@ -2665,6 +2665,16 @@ func (r *Replica) sendSnapshot(
 		}
 	}
 
+	// Don't send a queue name or priority if the receiver may not understand
+	// them or the setting is disabled. TODO(baptist): Remove the version flag in
+	// v23.1. Consider removing the cluster setting once we have verified this
+	// works as expected in all cases.
+	if !r.store.ClusterSettings().Version.IsActive(ctx, clusterversion.PrioritizeSnapshots) ||
+		!snapshotPrioritizationEnabled.Get(&r.store.ClusterSettings().SV) {
+		senderQueueName = 0
+		senderQueuePriority = 0
+	}
+
 	log.VEventf(
 		ctx, 2, "delegating snapshot transmission for %v to %v", recipient, sender,
 	)
diff --git a/pkg/kv/kvserver/replica_learner_test.go b/pkg/kv/kvserver/replica_learner_test.go
index 79f1344da700..06933e69aeec 100644
--- a/pkg/kv/kvserver/replica_learner_test.go
+++ b/pkg/kv/kvserver/replica_learner_test.go
@@ -265,7 +265,7 @@ func TestAddReplicaWithReceiverThrottling(t *testing.T) {
 	ctx := context.Background()
 	tc := testcluster.StartTestCluster(
 		t, 3, base.TestClusterArgs{
-			ServerArgs:      base.TestServerArgs{Knobs: knobs},
+			ServerArgs:      base.TestServerArgs{Knobs: knobs, SnapshotSendLimit: 1},
 			ReplicationMode: base.ReplicationManual,
 		},
 	)
@@ -1066,7 +1066,7 @@ func TestLearnerReplicateQueueRace(t *testing.T) {
 	}
 	ctx := context.Background()
 	tc := testcluster.StartTestCluster(t, 3, base.TestClusterArgs{
-		ServerArgs:      base.TestServerArgs{Knobs: knobs},
+		ServerArgs:      base.TestServerArgs{Knobs: knobs, SnapshotSendLimit: 1},
 		ReplicationMode: base.ReplicationManual,
 	})
 	defer tc.Stopper().Stop(ctx)
diff --git a/pkg/kv/kvserver/store.go b/pkg/kv/kvserver/store.go
index 343503f58aa3..6c88f25e7e79 100644
--- a/pkg/kv/kvserver/store.go
+++ b/pkg/kv/kvserver/store.go
@@ -42,6 +42,7 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/intentresolver"
 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb"
 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness"
+	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/multiqueue"
 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/raftentry"
 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/rangefeed"
 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/replicastats"
@@ -253,6 +254,8 @@ func testStoreConfig(clock *hlc.Clock, version roachpb.Version) StoreConfig {
 		ScanInterval:             10 * time.Minute,
 		HistogramWindowInterval:  metric.TestSampleInterval,
 		ProtectedTimestampReader: spanconfig.EmptyProtectedTSReader(clock),
+		SnapshotSendLimit:        DefaultSnapshotSendLimit,
+		SnapshotApplyLimit:       DefaultSnapshotApplyLimit,
 
 		// Use a constant empty system config, which mirrors the previously
 		// existing logic to install an empty system config in gossip.
@@ -769,12 +772,11 @@ type Store struct {
 	nodeDesc     *roachpb.NodeDescriptor
 	initComplete sync.WaitGroup // Signaled by async init tasks
 
-	// Semaphore to limit concurrent non-empty snapshot application.
-	snapshotApplySem chan struct{}
-	// Semaphore to limit concurrent non-empty snapshot sending.
-	initialSnapshotSendSem chan struct{}
-	// Semaphore to limit concurrent non-empty snapshot sending.
-	raftSnapshotSendSem chan struct{}
+	// Queue to limit concurrent non-empty snapshot application.
+	snapshotApplyQueue *multiqueue.MultiQueue
+
+	// Queue to limit concurrent non-empty snapshot sending.
+	snapshotSendQueue *multiqueue.MultiQueue
 
 	// Track newly-acquired expiration-based leases that we want to proactively
 	// renew. An object is sent on the signal whenever a new entry is added to
@@ -1053,14 +1055,14 @@ type StoreConfig struct {
 
 	TestingKnobs StoreTestingKnobs
 
-	// concurrentSnapshotApplyLimit specifies the maximum number of empty
+	// SnapshotApplyLimit specifies the maximum number of empty
 	// snapshots and the maximum number of non-empty snapshots that are permitted
 	// to be applied concurrently.
-	concurrentSnapshotApplyLimit int
+	SnapshotApplyLimit int64
 
-	// concurrentSnapshotSendLimit specifies the maximum number of each type of
+	// SnapshotSendLimit specifies the maximum number of each type of
 	// snapshot that are permitted to be sent concurrently.
-	concurrentSnapshotSendLimit int
+	SnapshotSendLimit int64
 
 	// HistogramWindowInterval is (server.Config).HistogramWindowInterval
 	HistogramWindowInterval time.Duration
@@ -1145,16 +1147,6 @@ func (sc *StoreConfig) SetDefaults() {
 	if sc.RaftEntryCacheSize == 0 {
 		sc.RaftEntryCacheSize = defaultRaftEntryCacheSize
 	}
-	if sc.concurrentSnapshotApplyLimit == 0 {
-		// NB: setting this value higher than 1 is likely to degrade client
-		// throughput.
-		sc.concurrentSnapshotApplyLimit =
-			envutil.EnvOrDefaultInt("COCKROACH_CONCURRENT_SNAPSHOT_APPLY_LIMIT", 1)
-	}
-	if sc.concurrentSnapshotSendLimit == 0 {
-		sc.concurrentSnapshotSendLimit =
-			envutil.EnvOrDefaultInt("COCKROACH_CONCURRENT_SNAPSHOT_SEND_LIMIT", 1)
-	}
 	if sc.TestingKnobs.GossipWhenCapacityDeltaExceedsFraction == 0 {
 		sc.TestingKnobs.GossipWhenCapacityDeltaExceedsFraction = defaultGossipWhenCapacityDeltaExceedsFraction
@@ -1256,9 +1248,8 @@ func NewStore(
 	s.txnWaitMetrics = txnwait.NewMetrics(cfg.HistogramWindowInterval)
 	s.metrics.registry.AddMetricStruct(s.txnWaitMetrics)
 
-	s.snapshotApplySem = make(chan struct{}, cfg.concurrentSnapshotApplyLimit)
-	s.initialSnapshotSendSem = make(chan struct{}, cfg.concurrentSnapshotSendLimit)
-	s.raftSnapshotSendSem = make(chan struct{}, cfg.concurrentSnapshotSendLimit)
+	s.snapshotApplyQueue = multiqueue.NewMultiQueue(int(cfg.SnapshotApplyLimit))
+	s.snapshotSendQueue = multiqueue.NewMultiQueue(int(cfg.SnapshotSendLimit))
 	if ch := s.cfg.TestingKnobs.LeaseRenewalSignalChan; ch != nil {
 		s.renewableLeasesSignal = ch
 	} else {
diff --git a/pkg/kv/kvserver/store_snapshot.go b/pkg/kv/kvserver/store_snapshot.go
index 6827584041e1..7ab6ab1bf307 100644
--- a/pkg/kv/kvserver/store_snapshot.go
+++ b/pkg/kv/kvserver/store_snapshot.go
@@ -19,6 +19,7 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/keys"
 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator/storepool"
 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb"
+	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/multiqueue"
 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/raftentry"
 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/rditer"
 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/stateloader"
@@ -56,6 +57,24 @@ const (
 	// tagSnapshotTiming is the tracing span tag that the *snapshotTimingTag
 	// lives under.
 	tagSnapshotTiming = "snapshot_timing_tag"
+
+	// DefaultSnapshotSendLimit is the max number of snapshots concurrently sent.
+	// See server.KVConfig for more info.
+	DefaultSnapshotSendLimit = 2
+
+	// DefaultSnapshotApplyLimit is the number of snapshots concurrently applied.
+	// See server.KVConfig for more info.
+	DefaultSnapshotApplyLimit = 1
+)
+
+// snapshotPrioritizationEnabled will allow the sender and receiver of snapshots
+// to prioritize the snapshots. If disabled, the behavior will be FIFO on both
+// send and receive sides.
+var snapshotPrioritizationEnabled = settings.RegisterBoolSetting(
+	settings.SystemOnly,
+	"kv.snapshot_prioritization.enabled",
+	"if true, then prioritize enqueued snapshots on both the send and receive sides",
+	true,
 )
 
 // incomingSnapshotStream is the minimal interface on a GRPC stream required
@@ -659,10 +678,12 @@ func (kvSS *kvBatchSnapshotStrategy) Close(ctx context.Context) {
 func (s *Store) reserveReceiveSnapshot(
 	ctx context.Context, header *kvserverpb.SnapshotRequest_Header,
 ) (_cleanup func(), _err error) {
-	ctx, sp := tracing.EnsureChildSpan(ctx, s.cfg.Tracer(), "reserveSnapshot")
+	ctx, sp := tracing.EnsureChildSpan(ctx, s.cfg.Tracer(), "reserveReceiveSnapshot")
 	defer sp.Finish()
-	return s.throttleSnapshot(
-		ctx, s.snapshotApplySem, header.RangeSize,
+
+	return s.throttleSnapshot(ctx, s.snapshotApplyQueue,
+		int(header.SenderQueueName), header.SenderQueuePriority,
+		header.RangeSize,
 		header.RaftMessageRequest.RangeID, header.RaftMessageRequest.ToReplica.ReplicaID,
 		s.metrics.RangeSnapshotRecvQueueLength,
 		s.metrics.RangeSnapshotRecvInProgress, s.metrics.RangeSnapshotRecvTotalInProgress,
@@ -675,14 +696,13 @@ func (s *Store) reserveSendSnapshot(
 ) (_cleanup func(), _err error) {
 	ctx, sp := tracing.EnsureChildSpan(ctx, s.cfg.Tracer(), "reserveSendSnapshot")
 	defer sp.Finish()
-	sem := s.initialSnapshotSendSem
-	if req.Type == kvserverpb.SnapshotRequest_VIA_SNAPSHOT_QUEUE {
-		sem = s.raftSnapshotSendSem
-	}
 	if fn := s.cfg.TestingKnobs.BeforeSendSnapshotThrottle; fn != nil {
 		fn()
 	}
-	return s.throttleSnapshot(ctx, sem, rangeSize,
+
+	return s.throttleSnapshot(ctx, s.snapshotSendQueue,
+		int(req.SenderQueueName), req.SenderQueuePriority,
+		rangeSize,
 		req.RangeID, req.DelegatedSender.ReplicaID,
 		s.metrics.RangeSnapshotSendQueueLength,
 		s.metrics.RangeSnapshotSendInProgress, s.metrics.RangeSnapshotSendTotalInProgress,
@@ -694,18 +714,28 @@ func (s *Store) reserveSendSnapshot(
 // release its resources.
 func (s *Store) throttleSnapshot(
 	ctx context.Context,
-	snapshotSem chan struct{},
+	snapshotQueue *multiqueue.MultiQueue,
+	requestSource int,
+	requestPriority float64,
 	rangeSize int64,
 	rangeID roachpb.RangeID,
 	replicaID roachpb.ReplicaID,
 	waitingSnapshotMetric, inProgressSnapshotMetric, totalInProgressSnapshotMetric *metric.Gauge,
-) (_cleanup func(), _err error) {
+) (cleanup func(), err error) {
 	tBegin := timeutil.Now()
+	var permit *multiqueue.Permit
 	// Empty snapshots are exempt from rate limits because they're so cheap to
 	// apply. This vastly speeds up rebalancing any empty ranges created by a
 	// RESTORE or manual SPLIT AT, since it prevents these empty snapshots from
 	// getting stuck behind large snapshots managed by the replicate queue.
 	if rangeSize != 0 || s.cfg.TestingKnobs.ThrottleEmptySnapshots {
+		task := snapshotQueue.Add(requestSource, requestPriority)
+		defer func() {
+			if err != nil {
+				snapshotQueue.Cancel(task)
+			}
+		}()
+
 		waitingSnapshotMetric.Inc(1)
 		defer waitingSnapshotMetric.Dec(1)
 		queueCtx := ctx
@@ -721,13 +751,14 @@ func (s *Store) throttleSnapshot(
 			defer cancel()
 		}
 		select {
-		case snapshotSem <- struct{}{}:
-			// Got a spot in the semaphore, continue with sending the snapshot.
+		case permit = <-task.GetWaitChan():
+			// Got a spot in the snapshot queue; continue with sending the snapshot.
 			if fn := s.cfg.TestingKnobs.AfterSendSnapshotThrottle; fn != nil {
 				fn()
 			}
-			log.Event(ctx, "acquired spot in the snapshot semaphore")
+			log.Event(ctx, "acquired spot in the snapshot queue")
 		case <-queueCtx.Done():
+			// We need to cancel the task so that it doesn't ever get a permit.
 			if err := ctx.Err(); err != nil {
 				return nil, errors.Wrap(err, "acquiring snapshot reservation")
 			}
@@ -772,7 +803,7 @@ func (s *Store) throttleSnapshot(
 
 		if rangeSize != 0 || s.cfg.TestingKnobs.ThrottleEmptySnapshots {
 			inProgressSnapshotMetric.Dec(1)
-			<-snapshotSem
+			snapshotQueue.Release(permit)
 		}
 	}, nil
 }
@@ -1145,7 +1176,7 @@ var snapshotSenderBatchSize = settings.RegisterByteSizeSetting(
 // snapshot's total timeout that it is allowed to spend queued on the receiver
 // waiting for a reservation.
 //
-// Enforcement of this snapshotApplySem-scoped timeout is intended to prevent
+// Enforcement of this snapshotApplyQueue-scoped timeout is intended to prevent
 // starvation of snapshots in cases where a queue of snapshots waiting for
 // reservations builds and no single snapshot acquires the semaphore with
 // sufficient time to complete, but each holds the semaphore long enough to
diff --git a/pkg/kv/kvserver/store_test.go b/pkg/kv/kvserver/store_test.go
index b868d803c672..7a9f5766c978 100644
--- a/pkg/kv/kvserver/store_test.go
+++ b/pkg/kv/kvserver/store_test.go
@@ -2926,6 +2926,78 @@ func TestSendSnapshotThrottling(t *testing.T) {
 	}
 }
 
+// TestSendSnapshotConcurrency tests the sending of concurrent snapshots and
+// verifies they are only sent "2 at a time". This is not intended to test the
+// prioritization of the snapshots, as that is covered by the multi-queue
+// testing.
+func TestSendSnapshotConcurrency(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+	defer log.Scope(t).Close(t)
+
+	ctx := context.Background()
+	stopper := stop.NewStopper()
+	defer stopper.Stop(ctx)
+	tc := testContext{}
+	tc.Start(ctx, t, stopper)
+	s := tc.store
+
+	// Checking this now makes sure that if the defaults change, this test fails too.
+	require.Equal(t, 2, s.snapshotSendQueue.Len())
+	cleanup1, err := s.reserveSendSnapshot(ctx, &kvserverpb.DelegateSnapshotRequest{
+		SenderQueueName:     kvserverpb.SnapshotRequest_REPLICATE_QUEUE,
+		SenderQueuePriority: 1,
+	}, 1)
+	require.Nil(t, err)
+	require.Equal(t, 1, s.snapshotSendQueue.Len())
+	cleanup2, err := s.reserveSendSnapshot(ctx, &kvserverpb.DelegateSnapshotRequest{
+		SenderQueueName:     kvserverpb.SnapshotRequest_REPLICATE_QUEUE,
+		SenderQueuePriority: 1,
+	}, 1)
+	require.Nil(t, err)
+	require.Equal(t, 0, s.snapshotSendQueue.Len())
+	// At this point both of the first two tasks are holding reservations and
+	// waiting for cleanup; a third task will block.
+	var wg sync.WaitGroup
+	wg.Add(2)
+	go func() {
+		before := timeutil.Now()
+		cleanup3, err := s.reserveSendSnapshot(ctx, &kvserverpb.DelegateSnapshotRequest{
+			SenderQueueName:     kvserverpb.SnapshotRequest_REPLICATE_QUEUE,
+			SenderQueuePriority: 1,
+		}, 1)
+		after := timeutil.Now()
+		cleanup3()
+		wg.Done()
+		require.Nil(t, err)
+		require.GreaterOrEqual(t, after.Sub(before), 10*time.Millisecond)
+	}()
+
+	// This task will not block for more than a few milliseconds, but we want
+	// to wait for it to complete to make sure it frees the permit.
+	go func() {
+		deadlineCtx, cancel := context.WithTimeout(ctx, 1*time.Millisecond)
+		defer cancel()
+
+		// This will time out since the deadline is set artificially low. Make sure
+		// the permit is released.
+		_, err := s.reserveSendSnapshot(deadlineCtx, &kvserverpb.DelegateSnapshotRequest{
+			SenderQueueName:     kvserverpb.SnapshotRequest_REPLICATE_QUEUE,
+			SenderQueuePriority: 1,
+		}, 1)
+		wg.Done()
+		require.NotNil(t, err)
+	}()
+
+	// Wait a little while before signaling the first two as complete.
+	time.Sleep(100 * time.Millisecond)
+	cleanup1()
+	cleanup2()
+
+	// Wait until all cleanups have run before checking the number of permits.
+	wg.Wait()
+	require.Equal(t, 2, s.snapshotSendQueue.Len())
+}
+
 func TestReserveSnapshotThrottling(t *testing.T) {
 	defer leaktest.AfterTest(t)()
 	defer log.Scope(t).Close(t)
@@ -3108,7 +3180,7 @@ func TestReserveSnapshotQueueTimeoutAvoidsStarvation(t *testing.T) {
 	defer stopper.Stop(ctx)
 	tsc := TestStoreConfig(nil)
 	// Set the concurrency to 1 explicitly, in case the default ever changes.
-	tsc.concurrentSnapshotApplyLimit = 1
+	tsc.SnapshotApplyLimit = 1
 	tc := testContext{}
 	tc.StartWithStoreConfig(ctx, t, stopper, tsc)
 	s := tc.store
diff --git a/pkg/server/config.go b/pkg/server/config.go
index f8ff1467366a..04be5c71dbab 100644
--- a/pkg/server/config.go
+++ b/pkg/server/config.go
@@ -364,6 +364,19 @@ type KVConfig struct {
 	DelayedBootstrapFn func()
 
 	enginesCreated bool
+
+	// SnapshotSendLimit is the number of concurrent snapshots a store will send.
+	SnapshotSendLimit int64
+
+	// SnapshotApplyLimit is the number of concurrent snapshots a store will
+	// apply. The send limit is typically higher than the apply limit for a few
+	// reasons. One is that it keeps requests "pipelined" when there is only a
+	// single sender and a single receiver: as soon as the receiver finishes a
+	// request, another is ready to start. Another is that the performance
+	// impact of sending snapshots is lower than that of applying them.
+	// Finally, snapshots are not sent until the receiver is ready to apply, so
+	// the cost of sending is low until the receiver is ready.
+	SnapshotApplyLimit int64
 }
 
 // MakeKVConfig returns a KVConfig with default values.
@@ -375,6 +388,8 @@ func MakeKVConfig(storeSpec base.StoreSpec) KVConfig {
 		ScanMinIdleTime:    defaultScanMinIdleTime,
 		ScanMaxIdleTime:    defaultScanMaxIdleTime,
 		EventLogEnabled:    defaultEventLogEnabled,
+		SnapshotSendLimit:  kvserver.DefaultSnapshotSendLimit,
+		SnapshotApplyLimit: kvserver.DefaultSnapshotApplyLimit,
 		Stores: base.StoreSpecList{
 			Specs: []base.StoreSpec{storeSpec},
 		},
@@ -770,6 +785,8 @@ func (cfg *Config) readEnvironmentVariables() {
 	cfg.ScanInterval = envutil.EnvOrDefaultDuration("COCKROACH_SCAN_INTERVAL", cfg.ScanInterval)
 	cfg.ScanMinIdleTime = envutil.EnvOrDefaultDuration("COCKROACH_SCAN_MIN_IDLE_TIME", cfg.ScanMinIdleTime)
 	cfg.ScanMaxIdleTime = envutil.EnvOrDefaultDuration("COCKROACH_SCAN_MAX_IDLE_TIME", cfg.ScanMaxIdleTime)
+	cfg.SnapshotSendLimit = envutil.EnvOrDefaultInt64("COCKROACH_CONCURRENT_SNAPSHOT_SEND_LIMIT", cfg.SnapshotSendLimit)
+	cfg.SnapshotApplyLimit = envutil.EnvOrDefaultInt64("COCKROACH_CONCURRENT_SNAPSHOT_APPLY_LIMIT", cfg.SnapshotApplyLimit)
 }
 
 // parseGossipBootstrapAddresses parses list of gossip bootstrap addresses.
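Note on the hunk above: the two COCKROACH_CONCURRENT_SNAPSHOT_* variables, which previously fed the unexported StoreConfig fields in SetDefaults, are now read once at server-config time and simply override the KVConfig defaults. A minimal sketch of the assumed lookup semantics follows; envOrDefaultInt64 is a hypothetical stand-in for envutil.EnvOrDefaultInt64 (parse the variable when set, keep the default otherwise), not the real helper.

package main

import (
	"fmt"
	"os"
	"strconv"
)

// envOrDefaultInt64 is a stand-in for the behavior assumed of
// envutil.EnvOrDefaultInt64: return the parsed value of the environment
// variable when it is set, and the provided default otherwise.
func envOrDefaultInt64(name string, def int64) int64 {
	s, ok := os.LookupEnv(name)
	if !ok {
		return def
	}
	v, err := strconv.ParseInt(s, 10, 64)
	if err != nil {
		return def // the real helper reports the error instead of ignoring it
	}
	return v
}

func main() {
	// Defaults come from MakeKVConfig via DefaultSnapshotSendLimit (2) and
	// DefaultSnapshotApplyLimit (1); the env vars, if set, win.
	sendLimit := envOrDefaultInt64("COCKROACH_CONCURRENT_SNAPSHOT_SEND_LIMIT", 2)
	applyLimit := envOrDefaultInt64("COCKROACH_CONCURRENT_SNAPSHOT_APPLY_LIMIT", 1)
	fmt.Printf("send=%d apply=%d\n", sendLimit, applyLimit)
}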
diff --git a/pkg/server/server.go b/pkg/server/server.go
index 3e48119b1e32..3f24182238d2 100644
--- a/pkg/server/server.go
+++ b/pkg/server/server.go
@@ -667,6 +667,8 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) {
 		SystemConfigProvider: systemConfigWatcher,
 		SpanConfigSubscriber: spanConfig.subscriber,
 		SpanConfigsDisabled:  cfg.SpanConfigsDisabled,
+		SnapshotApplyLimit:   cfg.SnapshotApplyLimit,
+		SnapshotSendLimit:    cfg.SnapshotSendLimit,
 	}
 
 	if storeTestingKnobs := cfg.TestingKnobs.Store; storeTestingKnobs != nil {
diff --git a/pkg/server/testserver.go b/pkg/server/testserver.go
index 58ba49162180..b945143300b6 100644
--- a/pkg/server/testserver.go
+++ b/pkg/server/testserver.go
@@ -235,6 +235,12 @@ func makeTestConfigFromParams(params base.TestServerArgs) Config {
 	if params.DisableSpanConfigs {
 		cfg.SpanConfigsDisabled = true
 	}
+	if params.SnapshotApplyLimit != 0 {
+		cfg.SnapshotApplyLimit = params.SnapshotApplyLimit
+	}
+	if params.SnapshotSendLimit != 0 {
+		cfg.SnapshotSendLimit = params.SnapshotSendLimit
+	}
 
 	// Ensure we have the correct number of engines. Add in-memory ones where
 	// needed. There must be at least one store/engine.
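For readers unfamiliar with the multiqueue package that replaces the semaphores: throttleSnapshot relies on a four-step permit lifecycle. Add a task (with a queue name and priority), wait on GetWaitChan for a Permit, Cancel if the wait is abandoned, and Release the permit when done. The sketch below is a deliberately simplified single-queue approximation of that lifecycle, not the CockroachDB implementation; in particular, it ignores the round-robin fairness across named queues that the real MultiQueue provides, and its Add takes only a priority.

package main

import (
	"fmt"
	"sort"
	"sync"
)

// Permit represents permission to run one snapshot operation.
type Permit struct{}

// Task is a queued request; it receives a *Permit on its wait channel once it
// is granted a slot.
type Task struct {
	priority float64
	waitCh   chan *Permit
}

func (t *Task) GetWaitChan() <-chan *Permit { return t.waitCh }

// MultiQueue hands out up to a fixed number of permits, preferring
// higher-priority waiters.
type MultiQueue struct {
	mu        sync.Mutex
	remaining int // free permits; this is what Len() reports in the test above
	waiting   []*Task
}

func NewMultiQueue(maxConcurrency int) *MultiQueue {
	return &MultiQueue{remaining: maxConcurrency}
}

// Len returns the number of permits still available.
func (q *MultiQueue) Len() int {
	q.mu.Lock()
	defer q.mu.Unlock()
	return q.remaining
}

// Add enqueues a request and immediately grants it a permit if one is free.
func (q *MultiQueue) Add(priority float64) *Task {
	q.mu.Lock()
	defer q.mu.Unlock()
	t := &Task{priority: priority, waitCh: make(chan *Permit, 1)}
	q.waiting = append(q.waiting, t)
	q.grantLocked()
	return t
}

// Cancel removes a task that gave up before receiving a permit, as
// throttleSnapshot does when its context expires.
func (q *MultiQueue) Cancel(t *Task) {
	q.mu.Lock()
	defer q.mu.Unlock()
	for i, w := range q.waiting {
		if w == t {
			q.waiting = append(q.waiting[:i], q.waiting[i+1:]...)
			return
		}
	}
}

// Release returns a permit and wakes the best remaining waiter.
func (q *MultiQueue) Release(*Permit) {
	q.mu.Lock()
	defer q.mu.Unlock()
	q.remaining++
	q.grantLocked()
}

// grantLocked hands free permits to the highest-priority waiters. The wait
// channels are buffered, so the sends below never block.
func (q *MultiQueue) grantLocked() {
	sort.SliceStable(q.waiting, func(i, j int) bool {
		return q.waiting[i].priority > q.waiting[j].priority
	})
	for q.remaining > 0 && len(q.waiting) > 0 {
		next := q.waiting[0]
		q.waiting = q.waiting[1:]
		q.remaining--
		next.waitCh <- &Permit{}
	}
}

func main() {
	q := NewMultiQueue(1)
	a := q.Add(1) // grabs the only permit immediately
	b := q.Add(5) // queued
	c := q.Add(2) // queued; lower priority than b

	q.Release(<-a.GetWaitChan()) // frees the permit; b (priority 5) wins
	q.Release(<-b.GetWaitChan()) // now c is granted
	<-c.GetWaitChan()
	fmt.Println("granted in priority order: a, b, c")
}

This also explains the arithmetic in the reworked ReservationCount helper: with Len() reporting free permits, SnapshotApplyLimit minus Len() is exactly the number of permits currently held.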