kvserver: honor priority on send and receive snapshots #86701

Merged 1 commit on Sep 2, 2022
2 changes: 1 addition & 1 deletion docs/generated/settings/settings-for-tenants.txt
@@ -295,4 +295,4 @@ trace.jaeger.agent string the address of a Jaeger agent to receive traces using
trace.opentelemetry.collector string address of an OpenTelemetry trace collector to receive traces using the otel gRPC protocol, as <host>:<port>. If no port is specified, 4317 will be used.
trace.span_registry.enabled boolean true if set, ongoing traces can be seen at https://<ui>/#/debug/tracez
trace.zipkin.collector string the address of a Zipkin instance to receive traces, as <host>:<port>. If no port is specified, 9411 will be used.
-version version 1000022.1-68 set the active cluster version in the format '<major>.<minor>'
+version version 1000022.1-70 set the active cluster version in the format '<major>.<minor>'
2 changes: 1 addition & 1 deletion docs/generated/settings/settings.html
@@ -229,6 +229,6 @@
<tr><td><code>trace.opentelemetry.collector</code></td><td>string</td><td><code></code></td><td>address of an OpenTelemetry trace collector to receive traces using the otel gRPC protocol, as <host>:<port>. If no port is specified, 4317 will be used.</td></tr>
<tr><td><code>trace.span_registry.enabled</code></td><td>boolean</td><td><code>true</code></td><td>if set, ongoing traces can be seen at https://<ui>/#/debug/tracez</td></tr>
<tr><td><code>trace.zipkin.collector</code></td><td>string</td><td><code></code></td><td>the address of a Zipkin instance to receive traces, as <host>:<port>. If no port is specified, 9411 will be used.</td></tr>
-<tr><td><code>version</code></td><td>version</td><td><code>1000022.1-68</code></td><td>set the active cluster version in the format '<major>.<minor>'</td></tr>
+<tr><td><code>version</code></td><td>version</td><td><code>1000022.1-70</code></td><td>set the active cluster version in the format '<major>.<minor>'</td></tr>
</tbody>
</table>
2 changes: 2 additions & 0 deletions pkg/base/test_server_args.go
@@ -102,6 +102,8 @@ type TestServerArgs struct {
TimeSeriesQueryMemoryBudget int64
SQLMemoryPoolSize int64
CacheSize int64
+SnapshotSendLimit int64
+SnapshotApplyLimit int64

// By default, test servers have AutoInitializeCluster=true set in
// their config. If NoAutoInitializeCluster is set, that behavior is disabled
8 changes: 8 additions & 0 deletions pkg/clusterversion/cockroach_versions.go
@@ -278,6 +278,10 @@ const (
// TTLDistSQL uses DistSQL to distribute TTL SELECT/DELETE statements to
// leaseholder nodes.
TTLDistSQL
+// PrioritizeSnapshots adds prioritization to sender snapshots. When this
+// version is enabled, the receiver will look at the priority of snapshots
+// using the fields added in 22.2.
+PrioritizeSnapshots

// *************************************************
// Step (1): Add new versions here.
@@ -453,6 +457,10 @@ var rawVersionsSingleton = keyedVersions{
Key: TTLDistSQL,
Version: roachpb.Version{Major: 22, Minor: 1, Internal: 68},
},
+{
+Key: PrioritizeSnapshots,
+Version: roachpb.Version{Major: 22, Minor: 1, Internal: 70},
+},

// *************************************************
// Step (2): Add new versions here.
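
Note: the gate added above is consulted at runtime through the cluster's version handle. A minimal sketch of that check (the helper name and wiring are hypothetical; the real call site is in Replica.sendSnapshot, shown later in this diff):

package kvserver

import (
	"context"

	"github.com/cockroachdb/cockroach/pkg/clusterversion"
	"github.com/cockroachdb/cockroach/pkg/settings/cluster"
)

// shouldSendPriorityFields is a hypothetical helper: it reports whether every
// node is known to run a binary that understands the new snapshot priority
// fields, so the sender may safely populate them.
func shouldSendPriorityFields(ctx context.Context, st *cluster.Settings) bool {
	return st.Version.IsActive(ctx, clusterversion.PrioritizeSnapshots)
}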
5 changes: 3 additions & 2 deletions pkg/clusterversion/key_string.go

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions pkg/kv/kvserver/BUILD.bazel
@@ -136,6 +136,7 @@ go_library(
"//pkg/kv/kvserver/kvserverpb",
"//pkg/kv/kvserver/liveness",
"//pkg/kv/kvserver/liveness/livenesspb",
"//pkg/kv/kvserver/multiqueue",
"//pkg/kv/kvserver/raftentry",
"//pkg/kv/kvserver/raftutil",
"//pkg/kv/kvserver/rangefeed",
4 changes: 3 additions & 1 deletion pkg/kv/kvserver/helpers_test.go
@@ -199,8 +199,10 @@ func (s *Store) ManualRaftSnapshot(repl *Replica, target roachpb.ReplicaID) error
return err
}

+// ReservationCount counts the number of outstanding reservations that are not
+// running.
func (s *Store) ReservationCount() int {
-return len(s.snapshotApplySem)
+return int(s.cfg.SnapshotApplyLimit) - s.snapshotApplyQueue.Len()
}

// RaftSchedulerPriorityID returns the Raft scheduler's prioritized range.
6 changes: 5 additions & 1 deletion pkg/kv/kvserver/kvserverpb/raft.proto
@@ -204,7 +204,11 @@ message SnapshotRequest {
bool deprecated_unreplicated_truncated_state = 8;

// The sending queue's name, to be utilized to ensure fairness across
-// different snapshot sending sources.
+// different snapshot sending sources. The default queue name, OTHER, is
+// reserved for any uncategorized and unprioritized snapshots, and requests
+// with sender queue name OTHER may not specify a non-zero
+// sender_queue_priority. To prioritize snapshots categorized as OTHER,
+// first move them to a different queue name.
SnapshotRequest.QueueName sender_queue_name = 10;

// The sending queue's priority, to be utilized to prioritize snapshots
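
Note: a hedged sketch of the contract this comment describes, from the Go side. REPLICATE_QUEUE is an assumed enum value (only OTHER is named in the comment above); the field names match the generated code used later in this diff:

package kvserver

import "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb"

// buildPrioritizedHeader is a hypothetical helper showing the rule above:
// only a non-OTHER queue name may carry a non-zero priority.
func buildPrioritizedHeader(prio float64) kvserverpb.SnapshotRequest_Header {
	h := kvserverpb.SnapshotRequest_Header{
		SenderQueueName:     kvserverpb.SnapshotRequest_REPLICATE_QUEUE, // assumed enum value
		SenderQueuePriority: prio,
	}
	if h.SenderQueueName == kvserverpb.SnapshotRequest_OTHER {
		// OTHER is reserved for unprioritized snapshots.
		h.SenderQueuePriority = 0
	}
	return h
}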
10 changes: 10 additions & 0 deletions pkg/kv/kvserver/replica_command.go
@@ -2665,6 +2665,16 @@ func (r *Replica) sendSnapshot(
}
}

+// Don't send a queue name or priority if the receiver may not understand
+// them or the setting is disabled. TODO(baptist): Remove the version flag in
+// v23.1. Consider removing the cluster setting once we have verified this
+// works as expected in all cases.
+if !r.store.ClusterSettings().Version.IsActive(ctx, clusterversion.PrioritizeSnapshots) ||
+!snapshotPrioritizationEnabled.Get(&r.store.ClusterSettings().SV) {
+senderQueueName = 0
+senderQueuePriority = 0
+}
+
log.VEventf(
ctx, 2, "delegating snapshot transmission for %v to %v", recipient, sender,
)
4 changes: 2 additions & 2 deletions pkg/kv/kvserver/replica_learner_test.go
@@ -265,7 +265,7 @@ func TestAddReplicaWithReceiverThrottling(t *testing.T) {
ctx := context.Background()
tc := testcluster.StartTestCluster(
t, 3, base.TestClusterArgs{
-ServerArgs: base.TestServerArgs{Knobs: knobs},
+ServerArgs: base.TestServerArgs{Knobs: knobs, SnapshotSendLimit: 1},
ReplicationMode: base.ReplicationManual,
},
)
@@ -1066,7 +1066,7 @@ func TestLearnerReplicateQueueRace(t *testing.T) {
}
ctx := context.Background()
tc := testcluster.StartTestCluster(t, 3, base.TestClusterArgs{
-ServerArgs: base.TestServerArgs{Knobs: knobs},
+ServerArgs: base.TestServerArgs{Knobs: knobs, SnapshotSendLimit: 1},
ReplicationMode: base.ReplicationManual,
})
defer tc.Stopper().Stop(ctx)
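
Note: these tests pin SnapshotSendLimit to 1 so that one snapshot can be reliably queued behind another. A self-contained sketch of that setup (the test name is illustrative; it assumes the new TestServerArgs fields added by this PR):

package kvserver_test

import (
	"context"
	"testing"

	"github.com/cockroachdb/cockroach/pkg/base"
	"github.com/cockroachdb/cockroach/pkg/testutils/testcluster"
)

func TestSnapshotThrottlingSketch(t *testing.T) {
	ctx := context.Background()
	tc := testcluster.StartTestCluster(t, 3, base.TestClusterArgs{
		ServerArgs: base.TestServerArgs{
			SnapshotSendLimit:  1, // serialize outgoing snapshots
			SnapshotApplyLimit: 1, // serialize incoming snapshots
		},
		ReplicationMode: base.ReplicationManual,
	})
	defer tc.Stopper().Stop(ctx)
	// With both limits at 1, a second snapshot must wait for the first,
	// which makes ordering and priority assertions deterministic.
}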
37 changes: 14 additions & 23 deletions pkg/kv/kvserver/store.go
@@ -42,6 +42,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/intentresolver"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/multiqueue"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/raftentry"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/rangefeed"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/replicastats"
@@ -253,6 +254,8 @@ func testStoreConfig(clock *hlc.Clock, version roachpb.Version) StoreConfig {
ScanInterval: 10 * time.Minute,
HistogramWindowInterval: metric.TestSampleInterval,
ProtectedTimestampReader: spanconfig.EmptyProtectedTSReader(clock),
+SnapshotSendLimit: DefaultSnapshotSendLimit,
+SnapshotApplyLimit: DefaultSnapshotApplyLimit,

// Use a constant empty system config, which mirrors the previously
// existing logic to install an empty system config in gossip.
@@ -769,12 +772,11 @@ type Store struct {
nodeDesc *roachpb.NodeDescriptor
initComplete sync.WaitGroup // Signaled by async init tasks

-// Semaphore to limit concurrent non-empty snapshot application.
-snapshotApplySem chan struct{}
-// Semaphore to limit concurrent non-empty snapshot sending.
-initialSnapshotSendSem chan struct{}
-// Semaphore to limit concurrent non-empty snapshot sending.
-raftSnapshotSendSem chan struct{}
+// Queue to limit concurrent non-empty snapshot application.
+snapshotApplyQueue *multiqueue.MultiQueue
+
+// Queue to limit concurrent non-empty snapshot sending.
+snapshotSendQueue *multiqueue.MultiQueue

// Track newly-acquired expiration-based leases that we want to proactively
// renew. An object is sent on the signal whenever a new entry is added to
@@ -1053,14 +1055,14 @@ type StoreConfig struct {

TestingKnobs StoreTestingKnobs

-// concurrentSnapshotApplyLimit specifies the maximum number of empty
+// SnapshotApplyLimit specifies the maximum number of empty
// snapshots and the maximum number of non-empty snapshots that are permitted
// to be applied concurrently.
-concurrentSnapshotApplyLimit int
+SnapshotApplyLimit int64

-// concurrentSnapshotSendLimit specifies the maximum number of each type of
+// SnapshotSendLimit specifies the maximum number of each type of
// snapshot that are permitted to be sent concurrently.
-concurrentSnapshotSendLimit int
+SnapshotSendLimit int64

// HistogramWindowInterval is (server.Config).HistogramWindowInterval
HistogramWindowInterval time.Duration
@@ -1145,16 +1147,6 @@ func (sc *StoreConfig) SetDefaults() {
if sc.RaftEntryCacheSize == 0 {
sc.RaftEntryCacheSize = defaultRaftEntryCacheSize
}
-if sc.concurrentSnapshotApplyLimit == 0 {
-// NB: setting this value higher than 1 is likely to degrade client
-// throughput.
-sc.concurrentSnapshotApplyLimit =
-envutil.EnvOrDefaultInt("COCKROACH_CONCURRENT_SNAPSHOT_APPLY_LIMIT", 1)
-}
-if sc.concurrentSnapshotSendLimit == 0 {
-sc.concurrentSnapshotSendLimit =
-envutil.EnvOrDefaultInt("COCKROACH_CONCURRENT_SNAPSHOT_SEND_LIMIT", 1)
-}

if sc.TestingKnobs.GossipWhenCapacityDeltaExceedsFraction == 0 {
sc.TestingKnobs.GossipWhenCapacityDeltaExceedsFraction = defaultGossipWhenCapacityDeltaExceedsFraction
@@ -1256,9 +1248,8 @@ func NewStore(

s.txnWaitMetrics = txnwait.NewMetrics(cfg.HistogramWindowInterval)
s.metrics.registry.AddMetricStruct(s.txnWaitMetrics)
-s.snapshotApplySem = make(chan struct{}, cfg.concurrentSnapshotApplyLimit)
-s.initialSnapshotSendSem = make(chan struct{}, cfg.concurrentSnapshotSendLimit)
-s.raftSnapshotSendSem = make(chan struct{}, cfg.concurrentSnapshotSendLimit)
+s.snapshotApplyQueue = multiqueue.NewMultiQueue(int(cfg.SnapshotApplyLimit))
+s.snapshotSendQueue = multiqueue.NewMultiQueue(int(cfg.SnapshotSendLimit))
if ch := s.cfg.TestingKnobs.LeaseRenewalSignalChan; ch != nil {
s.renewableLeasesSignal = ch
} else {
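
Note: the three fixed-size channels are replaced by two MultiQueues. A condensed sketch of the acquire/cancel/release lifecycle, using only the calls that appear in this diff (Add, GetWaitChan, Cancel, Release); the queue ID and priority values are illustrative:

package kvserver

import (
	"context"

	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/multiqueue"
)

// acquireSnapshotPermit is a hypothetical helper: it enqueues a task under a
// queue ID and priority, waits for a permit, and cancels the task if the
// caller gives up first.
func acquireSnapshotPermit(
	ctx context.Context, q *multiqueue.MultiQueue, queueID int, priority float64,
) (*multiqueue.Permit, error) {
	task := q.Add(queueID, priority)
	select {
	case permit := <-task.GetWaitChan():
		// The caller must eventually call q.Release(permit).
		return permit, nil
	case <-ctx.Done():
		q.Cancel(task) // ensure the task never receives a permit
		return nil, ctx.Err()
	}
}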
61 changes: 46 additions & 15 deletions pkg/kv/kvserver/store_snapshot.go
@@ -19,6 +19,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator/storepool"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/multiqueue"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/raftentry"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/rditer"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/stateloader"
@@ -56,6 +57,24 @@ const (
// tagSnapshotTiming is the tracing span tag that the *snapshotTimingTag
// lives under.
tagSnapshotTiming = "snapshot_timing_tag"

+// DefaultSnapshotSendLimit is the max number of snapshots concurrently sent.
+// See server.KVConfig for more info.
+DefaultSnapshotSendLimit = 2
+
+// DefaultSnapshotApplyLimit is the number of snapshots concurrently applied.
+// See server.KVConfig for more info.
+DefaultSnapshotApplyLimit = 1
)

+// snapshotPrioritizationEnabled will allow the sender and receiver of snapshots
+// to prioritize the snapshots. If disabled, the behavior will be FIFO on both
+// send and receive sides.
+var snapshotPrioritizationEnabled = settings.RegisterBoolSetting(
+settings.SystemOnly,
+"kv.snapshot_prioritization.enabled",
+"if true, then prioritize enqueued snapshots on both the send or receive sides",
+true,
+)

// incomingSnapshotStream is the minimal interface on a GRPC stream required
@@ -659,10 +678,12 @@ func (kvSS *kvBatchSnapshotStrategy) Close(ctx context.Context) {
func (s *Store) reserveReceiveSnapshot(
ctx context.Context, header *kvserverpb.SnapshotRequest_Header,
) (_cleanup func(), _err error) {
-ctx, sp := tracing.EnsureChildSpan(ctx, s.cfg.Tracer(), "reserveSnapshot")
+ctx, sp := tracing.EnsureChildSpan(ctx, s.cfg.Tracer(), "reserveReceiveSnapshot")
defer sp.Finish()
-return s.throttleSnapshot(
-ctx, s.snapshotApplySem, header.RangeSize,
+
+return s.throttleSnapshot(ctx, s.snapshotApplyQueue,
+int(header.SenderQueueName), header.SenderQueuePriority,
+header.RangeSize,
header.RaftMessageRequest.RangeID, header.RaftMessageRequest.ToReplica.ReplicaID,
s.metrics.RangeSnapshotRecvQueueLength,
s.metrics.RangeSnapshotRecvInProgress, s.metrics.RangeSnapshotRecvTotalInProgress,
@@ -675,14 +696,13 @@ func (s *Store) reserveSendSnapshot(
) (_cleanup func(), _err error) {
ctx, sp := tracing.EnsureChildSpan(ctx, s.cfg.Tracer(), "reserveSendSnapshot")
defer sp.Finish()
-sem := s.initialSnapshotSendSem
-if req.Type == kvserverpb.SnapshotRequest_VIA_SNAPSHOT_QUEUE {
-sem = s.raftSnapshotSendSem
-}
if fn := s.cfg.TestingKnobs.BeforeSendSnapshotThrottle; fn != nil {
fn()
}
-return s.throttleSnapshot(ctx, sem, rangeSize,
+
+return s.throttleSnapshot(ctx, s.snapshotSendQueue,
+int(req.SenderQueueName), req.SenderQueuePriority,
+rangeSize,
req.RangeID, req.DelegatedSender.ReplicaID,
s.metrics.RangeSnapshotSendQueueLength,
s.metrics.RangeSnapshotSendInProgress, s.metrics.RangeSnapshotSendTotalInProgress,
@@ -694,18 +714,28 @@ func (s *Store) throttleSnapshot(
// release its resources.
func (s *Store) throttleSnapshot(
ctx context.Context,
-snapshotSem chan struct{},
+snapshotQueue *multiqueue.MultiQueue,
+requestSource int,
+requestPriority float64,
rangeSize int64,
rangeID roachpb.RangeID,
replicaID roachpb.ReplicaID,
waitingSnapshotMetric, inProgressSnapshotMetric, totalInProgressSnapshotMetric *metric.Gauge,
-) (_cleanup func(), _err error) {
+) (cleanup func(), err error) {
tBegin := timeutil.Now()
+var permit *multiqueue.Permit
// Empty snapshots are exempt from rate limits because they're so cheap to
// apply. This vastly speeds up rebalancing any empty ranges created by a
// RESTORE or manual SPLIT AT, since it prevents these empty snapshots from
// getting stuck behind large snapshots managed by the replicate queue.
if rangeSize != 0 || s.cfg.TestingKnobs.ThrottleEmptySnapshots {
+task := snapshotQueue.Add(requestSource, requestPriority)
+defer func() {
+if err != nil {
+snapshotQueue.Cancel(task)
+}
+}()
+
waitingSnapshotMetric.Inc(1)
defer waitingSnapshotMetric.Dec(1)
queueCtx := ctx
@@ -721,13 +751,14 @@
defer cancel()
}
select {
-case snapshotSem <- struct{}{}:
-// Got a spot in the semaphore, continue with sending the snapshot.
+case permit = <-task.GetWaitChan():
+// Got a spot in the snapshotQueue, continue with sending the snapshot.
if fn := s.cfg.TestingKnobs.AfterSendSnapshotThrottle; fn != nil {
fn()
}
log.Event(ctx, "acquired spot in the snapshot semaphore")
log.Event(ctx, "acquired spot in the snapshot snapshotQueue")
case <-queueCtx.Done():
+// We need to cancel the task so that it doesn't ever get a permit.
if err := ctx.Err(); err != nil {
return nil, errors.Wrap(err, "acquiring snapshot reservation")
}
@@ -772,7 +803,7 @@

if rangeSize != 0 || s.cfg.TestingKnobs.ThrottleEmptySnapshots {
inProgressSnapshotMetric.Dec(1)
-<-snapshotSem
+snapshotQueue.Release(permit)
}
}, nil
}
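
Note: the signature change above from (_cleanup, _err) to named (cleanup, err) is load-bearing: a deferred closure can only observe a named return value, which is what lets throttleSnapshot cancel the queued task on any error path. A standalone sketch of that Go pattern (names and the error are illustrative):

package main

import (
	"errors"
	"fmt"
)

// reserve mirrors throttleSnapshot's shape: the deferred closure reads the
// named return value err, so every failing return path undoes the pending
// reservation without repeating cleanup code at each return site.
func reserve(fail bool) (cleanup func(), err error) {
	undo := func() { fmt.Println("canceled pending task") } // e.g. snapshotQueue.Cancel(task)
	defer func() {
		if err != nil {
			undo()
		}
	}()
	if fail {
		return nil, errors.New("reservation timed out")
	}
	return func() { fmt.Println("released permit") }, nil // e.g. snapshotQueue.Release(permit)
}

func main() {
	if _, err := reserve(true); err != nil {
		fmt.Println("reserve failed:", err) // the deferred undo already ran
	}
}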
@@ -1145,7 +1176,7 @@ var snapshotSenderBatchSize = settings.RegisterByteSizeSetting(
// snapshot's total timeout that it is allowed to spend queued on the receiver
// waiting for a reservation.
//
-// Enforcement of this snapshotApplySem-scoped timeout is intended to prevent
+// Enforcement of this snapshotApplyQueue-scoped timeout is intended to prevent
// starvation of snapshots in cases where a queue of snapshots waiting for
// reservations builds and no single snapshot acquires the semaphore with
// sufficient time to complete, but each holds the semaphore long enough to
Expand Down
Loading