From 9e720b628b8acb906756dde19721ecf8b5df91ce Mon Sep 17 00:00:00 2001
From: Bilal Akhtar
Date: Mon, 30 Aug 2021 16:06:18 -0400
Subject: [PATCH] kvserver: Run one ScanInterleavedIntents per store concurrently

Currently, lock table migration-related requests (Barrier,
ScanInterleavedIntents, PushTxn, ResolveIntent) are only run with a
concurrency of 4 regardless of the size of the cluster. This change
increases that to 4 * numNodes, and adds a new mechanism to limit
ScanInterleavedIntents requests on the receiver side to 1 per store.
This throttle is applied before latches are grabbed. Only
ScanInterleavedIntents needs to be rate-limited, as it's by far the
heaviest component in the separated intents migrations.

Release justification: Category 2, low-risk update to new functionality

Release note: None.
---
 pkg/kv/kvserver/batcheval/eval_context.go     |  3 +-
 pkg/kv/kvserver/store.go                      | 15 +++++++++
 pkg/kv/kvserver/store_send.go                 | 13 ++++++++
 pkg/migration/migrationcluster/cluster.go     |  9 +++++
 .../migrationcluster/tenant_cluster.go        |  5 +++
 pkg/migration/migrations/separated_intents.go | 33 +++++++++++--------
 .../migrations/separated_intents_test.go      |  2 +-
 pkg/migration/system_migration.go             |  5 +++
 8 files changed, 69 insertions(+), 16 deletions(-)

diff --git a/pkg/kv/kvserver/batcheval/eval_context.go b/pkg/kv/kvserver/batcheval/eval_context.go
index 8e17a018359e..e30278c48f30 100644
--- a/pkg/kv/kvserver/batcheval/eval_context.go
+++ b/pkg/kv/kvserver/batcheval/eval_context.go
@@ -39,7 +39,8 @@ type Limiters struct {
 	// rangefeeds in the "catch-up" state across the store. The "catch-up" state
 	// is a temporary state at the beginning of a rangefeed which is expensive
 	// because it uses an engine iterator.
-	ConcurrentRangefeedIters limit.ConcurrentRequestLimiter
+	ConcurrentRangefeedIters         limit.ConcurrentRequestLimiter
+	ConcurrentScanInterleavedIntents limit.ConcurrentRequestLimiter
 }

 // EvalContext is the interface through which command evaluation accesses the
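To make the receiver-side mechanism concrete, the sketch below illustrates the pre-evaluation throttling pattern this patch wires up: a per-store counting semaphore (capacity 1 by default) that a ScanInterleavedIntents request must acquire before latches are grabbed, plus a log line when admission is slow. This is a minimal, self-contained illustration; storeThrottle and its methods are hypothetical names, not the limit.ConcurrentRequestLimiter API used in the patch.

package main

import (
	"context"
	"log"
	"time"
)

// storeThrottle is a hypothetical stand-in for a per-store concurrent
// request limiter: a buffered channel used as a counting semaphore.
type storeThrottle struct {
	sem chan struct{}
}

func newStoreThrottle(limit int) *storeThrottle {
	return &storeThrottle{sem: make(chan struct{}, limit)}
}

// begin blocks until a slot is free (or ctx is canceled). The caller must
// invoke the returned release function once evaluation finishes.
func (t *storeThrottle) begin(ctx context.Context) (release func(), err error) {
	before := time.Now()
	select {
	case t.sem <- struct{}{}:
	case <-ctx.Done():
		return nil, ctx.Err()
	}
	if waited := time.Since(before); waited > time.Second {
		log.Printf("ScanInterleavedIntents-style request was delayed by %v", waited)
	}
	return func() { <-t.sem }, nil
}

func main() {
	throttle := newStoreThrottle(1) // one in-flight request per store
	release, err := throttle.begin(context.Background())
	if err != nil {
		log.Fatal(err)
	}
	defer release()
	// ... evaluate the request while holding the slot ...
}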
diff --git a/pkg/kv/kvserver/store.go b/pkg/kv/kvserver/store.go
index 05928d5a4143..21707249512b 100644
--- a/pkg/kv/kvserver/store.go
+++ b/pkg/kv/kvserver/store.go
@@ -138,6 +138,16 @@ var concurrentRangefeedItersLimit = settings.RegisterIntSetting(
 	settings.PositiveInt,
 )

+// concurrentscanInterleavedIntentsLimit is the number of concurrent
+// ScanInterleavedIntents requests that will be run on a store. Used as part
+// of pre-evaluation throttling.
+var concurrentscanInterleavedIntentsLimit = settings.RegisterIntSetting(
+	"kv.migration.concurrent_scan_interleaved_intents",
+	"number of scan interleaved intents requests a store will handle concurrently before queueing",
+	1,
+	settings.PositiveInt,
+)
+
 // Minimum time interval between system config updates which will lead to
 // enqueuing replicas.
 var queueAdditionOnSystemConfigUpdateRate = settings.RegisterFloatSetting(
@@ -860,6 +870,11 @@ func NewStore(
 	s.limiters.ConcurrentExportRequests = limit.MakeConcurrentRequestLimiter(
 		"exportRequestLimiter", int(ExportRequestsLimit.Get(&cfg.Settings.SV)),
 	)
+	s.limiters.ConcurrentScanInterleavedIntents = limit.MakeConcurrentRequestLimiter(
+		"scanInterleavedIntentsLimiter", int(concurrentscanInterleavedIntentsLimit.Get(&cfg.Settings.SV)))
+	concurrentscanInterleavedIntentsLimit.SetOnChange(&cfg.Settings.SV, func(ctx context.Context) {
+		s.limiters.ConcurrentScanInterleavedIntents.SetLimit(int(concurrentscanInterleavedIntentsLimit.Get(&cfg.Settings.SV)))
+	})

 	// The snapshot storage is usually empty at this point since it is cleared
 	// after each snapshot application, except when the node crashed right before
diff --git a/pkg/kv/kvserver/store_send.go b/pkg/kv/kvserver/store_send.go
index b8f149d4e934..6e787d183075 100644
--- a/pkg/kv/kvserver/store_send.go
+++ b/pkg/kv/kvserver/store_send.go
@@ -315,6 +315,19 @@ func (s *Store) maybeThrottleBatch(
 		}
 		return res, nil

+	case *roachpb.ScanInterleavedIntentsRequest:
+		before := timeutil.Now()
+		res, err := s.limiters.ConcurrentScanInterleavedIntents.Begin(ctx)
+		if err != nil {
+			return nil, err
+		}
+
+		waited := timeutil.Since(before)
+		if waited > time.Second {
+			log.Infof(ctx, "ScanInterleavedIntents request was delayed by %v", waited)
+		}
+		return res, nil
+
 	default:
 		return nil, nil
 	}
diff --git a/pkg/migration/migrationcluster/cluster.go b/pkg/migration/migrationcluster/cluster.go
index e4208d848020..5435fae9e4ce 100644
--- a/pkg/migration/migrationcluster/cluster.go
+++ b/pkg/migration/migrationcluster/cluster.go
@@ -94,6 +94,15 @@ func (c *Cluster) UntilClusterStable(ctx context.Context, fn func() error) error
 	return nil
 }

+// NumNodes is part of the migration.Cluster interface.
+func (c *Cluster) NumNodes(ctx context.Context) (int, error) {
+	ns, err := NodesFromNodeLiveness(ctx, c.c.NodeLiveness)
+	if err != nil {
+		return 0, err
+	}
+	return len(ns), nil
+}
+
 // ForEveryNode is part of the migration.Cluster interface.
 func (c *Cluster) ForEveryNode(
 	ctx context.Context, op string, fn func(context.Context, serverpb.MigrationClient) error,
diff --git a/pkg/migration/migrationcluster/tenant_cluster.go b/pkg/migration/migrationcluster/tenant_cluster.go
index 66ca0dd22d84..455ebea6b9c9 100644
--- a/pkg/migration/migrationcluster/tenant_cluster.go
+++ b/pkg/migration/migrationcluster/tenant_cluster.go
@@ -127,6 +127,11 @@ func NewTenantCluster(db *kv.DB) *TenantCluster {
 	return &TenantCluster{db: db}
 }

+// NumNodes is part of the migration.Cluster interface.
+func (t *TenantCluster) NumNodes(ctx context.Context) (int, error) {
+	return 0, errors.AssertionFailedf("non-system tenants cannot iterate nodes")
+}
+
 // ForEveryNode is part of the migration.Cluster interface.
 func (t *TenantCluster) ForEveryNode(
 	ctx context.Context, op string, fn func(context.Context, serverpb.MigrationClient) error,
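The store.go hunk above registers the kv.migration.concurrent_scan_interleaved_intents setting and re-applies its value to the limiter whenever the setting changes. The sketch below shows one way such a dynamically resizable limiter could be built; resizableLimiter is a hypothetical type and does not mirror CockroachDB's actual settings or limit packages.

package main

import (
	"fmt"
	"sync"
)

// resizableLimiter is a hypothetical counting semaphore whose capacity can be
// changed at runtime, mimicking a limiter that tracks a cluster setting.
type resizableLimiter struct {
	mu      sync.Mutex
	cond    *sync.Cond
	limit   int
	current int
}

func newResizableLimiter(limit int) *resizableLimiter {
	l := &resizableLimiter{limit: limit}
	l.cond = sync.NewCond(&l.mu)
	return l
}

// SetLimit installs a new capacity and wakes up waiters, since a larger
// capacity may admit requests that are currently blocked.
func (l *resizableLimiter) SetLimit(n int) {
	l.mu.Lock()
	defer l.mu.Unlock()
	l.limit = n
	l.cond.Broadcast()
}

// Acquire blocks until the number of in-flight requests is below the limit.
func (l *resizableLimiter) Acquire() {
	l.mu.Lock()
	defer l.mu.Unlock()
	for l.current >= l.limit {
		l.cond.Wait()
	}
	l.current++
}

// Release returns a slot and wakes up one waiter.
func (l *resizableLimiter) Release() {
	l.mu.Lock()
	defer l.mu.Unlock()
	l.current--
	l.cond.Signal()
}

func main() {
	lim := newResizableLimiter(1)
	lim.Acquire()
	// A settings on-change callback would call SetLimit with the new value,
	// much like SetOnChange does above for concurrentscanInterleavedIntentsLimit.
	lim.SetLimit(2)
	lim.Acquire() // admitted immediately after the limit was raised
	fmt.Println("two requests in flight")
	lim.Release()
	lim.Release()
}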
diff --git a/pkg/migration/migrations/separated_intents.go b/pkg/migration/migrations/separated_intents.go
index e2d61d88df2e..cc7f8395c8f4 100644
--- a/pkg/migration/migrations/separated_intents.go
+++ b/pkg/migration/migrations/separated_intents.go
@@ -35,16 +35,14 @@ import (
 	"github.com/cockroachdb/errors"
 )

-// The number of concurrent migrateLockTableRequests requests to run. This
-// is effectively a cluster-wide setting as the actual legwork of the migration
-// happens when the destination replica(s) are sending replies back to the
-// original node.
+// The number of concurrent lock table migration related requests (eg. barrier,
+// ScanInterleavedIntents) to run, as a multiplier of the number of nodes. If
+// this cluster has 15 nodes, 15 * concurrentMigrateLockTableRequests requests
+// will be executed at once.
 //
-// TODO(bilal): Add logic to make this concurrency limit a per-leaseholder limit
-// as opposed to a cluster-wide limit. That way, we could limit
-// migrateLockTableRequests to 1 per leaseholder as opposed to 4 for the entire
-// cluster, avoiding the case where all 4 ranges at a time could have the same node
-// as their leaseholder.
+// Note that some of the requests (eg. ScanInterleavedIntents) do throttling on
+// the receiving end, to reduce the chances of a large number of requests
+// being directed at a few nodes.
 const concurrentMigrateLockTableRequests = 4

 // The maximum number of times to retry a migrateLockTableRequest before failing
@@ -87,7 +85,7 @@ type migrateLockTablePool struct {
 	db       *kv.DB
 	clock    *hlc.Clock
 	done     chan bool
-	status   [concurrentMigrateLockTableRequests]int64
+	status   []int64
 	finished uint64

 	mu struct {
@@ -334,7 +332,7 @@ func (m *migrateLockTablePool) runStatusLogger(ctx context.Context) {
 		select {
 		case <-ticker.C:
 			var ranges strings.Builder
-			for i := 0; i < concurrentMigrateLockTableRequests; i++ {
+			for i := 0; i < len(m.status); i++ {
 				rangeID := atomic.LoadInt64(&m.status[i])
 				if rangeID == 0 {
 					continue
@@ -384,15 +382,18 @@ func runSeparatedIntentsMigration(
 	db *kv.DB,
 	ri rangeIterator,
 	ir intentResolver,
+	numNodes int,
 ) error {
+	concurrentRequests := concurrentMigrateLockTableRequests * numNodes
 	workerPool := migrateLockTablePool{
-		requests: make(chan migrateLockTableRequest, concurrentMigrateLockTableRequests),
+		requests: make(chan migrateLockTableRequest, concurrentRequests),
 		stopper:  stopper,
 		db:       db,
 		ir:       ir,
 		clock:    clock,
+		status:   make([]int64, concurrentRequests),
 	}
-	migratedRanges, err := workerPool.runMigrateRequestsForRanges(ctx, ri, concurrentMigrateLockTableRequests)
+	migratedRanges, err := workerPool.runMigrateRequestsForRanges(ctx, ri, concurrentRequests)
 	if err != nil {
 		return err
 	}
@@ -416,7 +417,11 @@ func separatedIntentsMigration(
 	})
 	ri := kvcoord.NewRangeIterator(deps.DistSender)

-	return runSeparatedIntentsMigration(ctx, deps.DB.Clock(), deps.Stopper, deps.DB, ri, ir)
+	numNodes, err := deps.Cluster.NumNodes(ctx)
+	if err != nil {
+		return err
+	}
+	return runSeparatedIntentsMigration(ctx, deps.DB.Clock(), deps.Stopper, deps.DB, ri, ir, numNodes)
 }

 func postSeparatedIntentsMigration(
diff --git a/pkg/migration/migrations/separated_intents_test.go b/pkg/migration/migrations/separated_intents_test.go
index 3930d7a3dea2..d5c60779af42 100644
--- a/pkg/migration/migrations/separated_intents_test.go
+++ b/pkg/migration/migrations/separated_intents_test.go
@@ -262,7 +262,7 @@ func TestRunSeparatedIntentsMigration(t *testing.T) {
 				errorPerNCalls, err = strconv.Atoi(d.Input)
 				require.NoError(t, err)
 			case "run-migration":
-				err := runSeparatedIntentsMigration(ctx, hlcClock, stopper, db, ri, ir)
+				err := runSeparatedIntentsMigration(ctx, hlcClock, stopper, db, ri, ir, 1)
 				if err == nil {
 					return "ok"
 				}
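To illustrate the sender-side change in runSeparatedIntentsMigration, the toy program below sizes a worker pool and its per-worker status slice to concurrentMigrateLockTableRequests * numNodes, the same pattern used above. The migrateRequest type and the work performed per range are simplified stand-ins, not the real migration code.

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

const concurrentMigrateLockTableRequests = 4

// migrateRequest is a simplified stand-in for a per-range migration request.
type migrateRequest struct {
	rangeID int64
}

func main() {
	numNodes := 3 // in the real migration this comes from Cluster.NumNodes
	concurrentRequests := concurrentMigrateLockTableRequests * numNodes

	requests := make(chan migrateRequest, concurrentRequests)
	status := make([]int64, concurrentRequests) // one slot per worker, read by a status logger

	var wg sync.WaitGroup
	for w := 0; w < concurrentRequests; w++ {
		wg.Add(1)
		go func(worker int) {
			defer wg.Done()
			for req := range requests {
				atomic.StoreInt64(&status[worker], req.rangeID) // expose progress
				// ... issue Barrier / ScanInterleavedIntents for this range ...
				atomic.StoreInt64(&status[worker], 0)
			}
		}(w)
	}

	for r := int64(1); r <= 100; r++ {
		requests <- migrateRequest{rangeID: r}
	}
	close(requests)
	wg.Wait()
	fmt.Printf("migrated 100 ranges with %d workers\n", concurrentRequests)
}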
diff --git a/pkg/migration/system_migration.go b/pkg/migration/system_migration.go
index 4b0935c7352d..aa072c0a4018 100644
--- a/pkg/migration/system_migration.go
+++ b/pkg/migration/system_migration.go
@@ -26,6 +26,11 @@ import (
 // Cluster abstracts a physical KV cluster and can be utilized by a long-running
 // migration.
 type Cluster interface {
+	// NumNodes returns the number of nodes in the cluster. This is merely a
+	// convenience method and is not meant to be used to infer cluster stability;
+	// for that, use UntilClusterStable.
+	NumNodes(ctx context.Context) (int, error)
+
 	// ForEveryNode is a short hand to execute the given closure (named by the
 	// informational parameter op) against every node in the cluster at a given
 	// point in time. Given it's possible for nodes to join or leave the cluster
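As a closing illustration of the NumNodes addition to the Cluster interface, the sketch below derives the migration's request parallelism from the node count and shows the tenant case, where NumNodes fails and the error is simply propagated. The fake cluster types and the senderConcurrency helper are hypothetical and exist only for this example.

package main

import (
	"context"
	"errors"
	"fmt"
)

// nodeCounter captures just the NumNodes part of the migration.Cluster
// interface for illustration.
type nodeCounter interface {
	NumNodes(ctx context.Context) (int, error)
}

type fakeSystemCluster struct{ nodes int }

func (c fakeSystemCluster) NumNodes(ctx context.Context) (int, error) { return c.nodes, nil }

type fakeTenantCluster struct{}

func (fakeTenantCluster) NumNodes(ctx context.Context) (int, error) {
	return 0, errors.New("non-system tenants cannot iterate nodes")
}

// senderConcurrency derives the migration's request parallelism from the
// node count, mirroring the 4 * numNodes sizing used above.
func senderConcurrency(ctx context.Context, c nodeCounter) (int, error) {
	numNodes, err := c.NumNodes(ctx)
	if err != nil {
		return 0, err
	}
	return 4 * numNodes, nil
}

func main() {
	ctx := context.Background()
	n, _ := senderConcurrency(ctx, fakeSystemCluster{nodes: 15})
	fmt.Println(n) // 60
	if _, err := senderConcurrency(ctx, fakeTenantCluster{}); err != nil {
		fmt.Println("tenant:", err)
	}
}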