diff --git a/pkg/kv/kvserver/batcheval/eval_context.go b/pkg/kv/kvserver/batcheval/eval_context.go
index 8e17a018359e..e30278c48f30 100644
--- a/pkg/kv/kvserver/batcheval/eval_context.go
+++ b/pkg/kv/kvserver/batcheval/eval_context.go
@@ -39,7 +39,8 @@ type Limiters struct {
 	// rangefeeds in the "catch-up" state across the store. The "catch-up" state
 	// is a temporary state at the beginning of a rangefeed which is expensive
 	// because it uses an engine iterator.
-	ConcurrentRangefeedIters limit.ConcurrentRequestLimiter
+	ConcurrentRangefeedIters         limit.ConcurrentRequestLimiter
+	ConcurrentScanInterleavedIntents limit.ConcurrentRequestLimiter
 }
 
 // EvalContext is the interface through which command evaluation accesses the
diff --git a/pkg/kv/kvserver/replica_send.go b/pkg/kv/kvserver/replica_send.go
index 068b4618b68f..f923ba6d3571 100644
--- a/pkg/kv/kvserver/replica_send.go
+++ b/pkg/kv/kvserver/replica_send.go
@@ -21,6 +21,7 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/txnwait"
 	"github.com/cockroachdb/cockroach/pkg/roachpb"
 	"github.com/cockroachdb/cockroach/pkg/settings"
+	"github.com/cockroachdb/cockroach/pkg/util/limit"
 	"github.com/cockroachdb/cockroach/pkg/util/log"
 	"github.com/cockroachdb/cockroach/pkg/util/tracing"
 	"github.com/cockroachdb/errors"
@@ -277,6 +278,21 @@ func (r *Replica) maybeAddRangeInfoToResponse(
 	}
 }
 
+func (r *Replica) maybeThrottleRequestPreLatch(
+	ctx context.Context, ba *roachpb.BatchRequest,
+) limit.Reservation {
+	switch {
+	case ba.IsSingleScanInterleavedIntentsRequest():
+		reservation, err := r.store.limiters.ConcurrentScanInterleavedIntents.Begin(ctx)
+		if err != nil {
+			return nil
+		}
+		return reservation
+	default:
+		return nil
+	}
+}
+
 // batchExecutionFn is a method on Replica that executes a BatchRequest. It is
 // called with the batch, along with a guard for the latches protecting the
 // request.
@@ -335,6 +351,11 @@ func (r *Replica) executeBatchWithConcurrencyRetries(
 	// Handle load-based splitting.
 	r.recordBatchForLoadBasedSplitting(ctx, ba, latchSpans)
 
+	// Handle any request-specific pre-latch throttles.
+	if res := r.maybeThrottleRequestPreLatch(ctx, ba); res != nil {
+		defer res.Release()
+	}
+
 	// Try to execute command; exit retry loop on success.
 	var g *concurrency.Guard
 	defer func() {
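[Reviewer note] The pre-latch throttle added above is an acquire/release gate around request execution: Begin blocks (or fails when the context is done) until a slot frees up, and the reservation is released once the batch finishes. Below is a minimal, self-contained sketch of that shape using a channel-based semaphore; it only stands in for pkg/util/limit's ConcurrentRequestLimiter, and the names requestLimiter and reservation are invented for illustration.

package main

import (
	"context"
	"fmt"
	"time"
)

// reservation returns one slot to the limiter when released.
type reservation struct{ sem chan struct{} }

func (r reservation) Release() { <-r.sem }

// requestLimiter caps the number of in-flight requests.
type requestLimiter struct{ sem chan struct{} }

func newRequestLimiter(n int) requestLimiter {
	return requestLimiter{sem: make(chan struct{}, n)}
}

// Begin blocks until a slot is available or the context is canceled.
func (l requestLimiter) Begin(ctx context.Context) (reservation, error) {
	select {
	case l.sem <- struct{}{}:
		return reservation{sem: l.sem}, nil
	case <-ctx.Done():
		return reservation{}, ctx.Err()
	}
}

func main() {
	lim := newRequestLimiter(1) // mirrors a default of one request per store
	ctx := context.Background()

	res, err := lim.Begin(ctx)
	if err != nil {
		return
	}
	// Hold the only slot for a while, as a long ScanInterleavedIntents would.
	go func() {
		time.Sleep(50 * time.Millisecond)
		res.Release()
	}()

	// A second caller with a short deadline gets throttled instead of piling on.
	ctx2, cancel := context.WithTimeout(ctx, 10*time.Millisecond)
	defer cancel()
	if _, err := lim.Begin(ctx2); err != nil {
		fmt.Println("second request throttled:", err)
	}
}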
diff --git a/pkg/kv/kvserver/store.go b/pkg/kv/kvserver/store.go
index 05928d5a4143..21707249512b 100644
--- a/pkg/kv/kvserver/store.go
+++ b/pkg/kv/kvserver/store.go
@@ -138,6 +138,16 @@ var concurrentRangefeedItersLimit = settings.RegisterIntSetting(
 	settings.PositiveInt,
 )
 
+// concurrentscanInterleavedIntentsLimit is the number of concurrent
+// ScanInterleavedIntents requests that will be run on a store. Used as part
+// of pre-evaluation throttling.
+var concurrentscanInterleavedIntentsLimit = settings.RegisterIntSetting(
+	"kv.migration.concurrent_scan_interleaved_intents",
+	"number of scan interleaved intents requests a store will handle concurrently before queueing",
+	1,
+	settings.PositiveInt,
+)
+
 // Minimum time interval between system config updates which will lead to
 // enqueuing replicas.
 var queueAdditionOnSystemConfigUpdateRate = settings.RegisterFloatSetting(
@@ -860,6 +870,11 @@ func NewStore(
 	s.limiters.ConcurrentExportRequests = limit.MakeConcurrentRequestLimiter(
 		"exportRequestLimiter", int(ExportRequestsLimit.Get(&cfg.Settings.SV)),
 	)
+	s.limiters.ConcurrentScanInterleavedIntents = limit.MakeConcurrentRequestLimiter(
+		"scanInterleavedIntentsLimiter", int(concurrentscanInterleavedIntentsLimit.Get(&cfg.Settings.SV)))
+	concurrentscanInterleavedIntentsLimit.SetOnChange(&cfg.Settings.SV, func(ctx context.Context) {
+		s.limiters.ConcurrentScanInterleavedIntents.SetLimit(int(concurrentscanInterleavedIntentsLimit.Get(&cfg.Settings.SV)))
+	})
 
 	// The snapshot storage is usually empty at this point since it is cleared
 	// after each snapshot application, except when the node crashed right before
diff --git a/pkg/migration/migrationcluster/cluster.go b/pkg/migration/migrationcluster/cluster.go
index e4208d848020..5435fae9e4ce 100644
--- a/pkg/migration/migrationcluster/cluster.go
+++ b/pkg/migration/migrationcluster/cluster.go
@@ -94,6 +94,15 @@ func (c *Cluster) UntilClusterStable(ctx context.Context, fn func() error) error
 	return nil
 }
 
+// NumNodes is part of the migration.Cluster interface.
+func (c *Cluster) NumNodes(ctx context.Context) (int, error) {
+	ns, err := NodesFromNodeLiveness(ctx, c.c.NodeLiveness)
+	if err != nil {
+		return 0, err
+	}
+	return len(ns), nil
+}
+
 // ForEveryNode is part of the migration.Cluster interface.
 func (c *Cluster) ForEveryNode(
 	ctx context.Context, op string, fn func(context.Context, serverpb.MigrationClient) error,
diff --git a/pkg/migration/migrationcluster/tenant_cluster.go b/pkg/migration/migrationcluster/tenant_cluster.go
index 66ca0dd22d84..455ebea6b9c9 100644
--- a/pkg/migration/migrationcluster/tenant_cluster.go
+++ b/pkg/migration/migrationcluster/tenant_cluster.go
@@ -127,6 +127,11 @@ func NewTenantCluster(db *kv.DB) *TenantCluster {
 	return &TenantCluster{db: db}
 }
 
+// NumNodes is part of the migration.Cluster interface.
+func (t *TenantCluster) NumNodes(ctx context.Context) (int, error) {
+	return 0, errors.AssertionFailedf("non-system tenants cannot iterate nodes")
+}
+
 // ForEveryNode is part of the migration.Cluster interface.
 func (t *TenantCluster) ForEveryNode(
 	ctx context.Context, op string, fn func(context.Context, serverpb.MigrationClient) error,
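[Reviewer note] A rough illustration of the NumNodes contract added in the two files above: the system-cluster implementation counts whatever node liveness reports, while the tenant implementation refuses to answer because secondary tenants cannot enumerate KV nodes. The sketch below is self-contained and hypothetical; nodeCounter, systemCluster, and tenantCluster are stand-ins, not the migrationcluster types.

package main

import (
	"context"
	"errors"
	"fmt"
)

// nodeCounter mirrors just the NumNodes portion of migration.Cluster.
type nodeCounter interface {
	NumNodes(ctx context.Context) (int, error)
}

// systemCluster counts whatever the liveness layer reports as live nodes.
type systemCluster struct {
	liveNodeIDs []int // stand-in for node liveness
}

func (c systemCluster) NumNodes(ctx context.Context) (int, error) {
	return len(c.liveNodeIDs), nil
}

// tenantCluster cannot enumerate KV nodes, so it always errors.
type tenantCluster struct{}

func (tenantCluster) NumNodes(ctx context.Context) (int, error) {
	return 0, errors.New("non-system tenants cannot iterate nodes")
}

func main() {
	ctx := context.Background()
	clusters := []nodeCounter{
		systemCluster{liveNodeIDs: []int{1, 2, 3}},
		tenantCluster{},
	}
	for _, c := range clusters {
		n, err := c.NumNodes(ctx)
		fmt.Println(n, err)
	}
}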
diff --git a/pkg/migration/migrations/separated_intents.go b/pkg/migration/migrations/separated_intents.go
index e2d61d88df2e..cc7f8395c8f4 100644
--- a/pkg/migration/migrations/separated_intents.go
+++ b/pkg/migration/migrations/separated_intents.go
@@ -35,16 +35,14 @@ import (
 	"github.com/cockroachdb/errors"
)
 
-// The number of concurrent migrateLockTableRequests requests to run. This
-// is effectively a cluster-wide setting as the actual legwork of the migration
-// happens when the destination replica(s) are sending replies back to the
-// original node.
+// The number of concurrent lock table migration related requests (e.g. barrier,
+// ScanInterleavedIntents) to run, as a multiplier of the number of nodes. If
+// this cluster has 15 nodes, 15 * concurrentMigrateLockTableRequests requests
+// will be executed at once.
 //
-// TODO(bilal): Add logic to make this concurrency limit a per-leaseholder limit
-// as opposed to a cluster-wide limit. That way, we could limit
-// migrateLockTableRequests to 1 per leaseholder as opposed to 4 for the entire
-// cluster, avoiding the case where all 4 ranges at a time could have the same node
-// as their leaseholder.
+// Note that some of the requests (e.g. ScanInterleavedIntents) do throttling on
+// the receiving end, to reduce the chances of a large number of requests
+// being directed at a few nodes.
 const concurrentMigrateLockTableRequests = 4
 
 // The maximum number of times to retry a migrateLockTableRequest before failing
@@ -87,7 +85,7 @@ type migrateLockTablePool struct {
 	db       *kv.DB
 	clock    *hlc.Clock
 	done     chan bool
-	status   [concurrentMigrateLockTableRequests]int64
+	status   []int64
 	finished uint64
 
 	mu struct {
@@ -334,7 +332,7 @@ func (m *migrateLockTablePool) runStatusLogger(ctx context.Context) {
 		select {
 		case <-ticker.C:
 			var ranges strings.Builder
-			for i := 0; i < concurrentMigrateLockTableRequests; i++ {
+			for i := 0; i < len(m.status); i++ {
 				rangeID := atomic.LoadInt64(&m.status[i])
 				if rangeID == 0 {
 					continue
@@ -384,15 +382,18 @@ func runSeparatedIntentsMigration(
 	db *kv.DB,
 	ri rangeIterator,
 	ir intentResolver,
+	numNodes int,
 ) error {
+	concurrentRequests := concurrentMigrateLockTableRequests * numNodes
 	workerPool := migrateLockTablePool{
-		requests: make(chan migrateLockTableRequest, concurrentMigrateLockTableRequests),
+		requests: make(chan migrateLockTableRequest, concurrentRequests),
 		stopper:  stopper,
 		db:       db,
 		ir:       ir,
 		clock:    clock,
+		status:   make([]int64, concurrentRequests),
 	}
-	migratedRanges, err := workerPool.runMigrateRequestsForRanges(ctx, ri, concurrentMigrateLockTableRequests)
+	migratedRanges, err := workerPool.runMigrateRequestsForRanges(ctx, ri, concurrentRequests)
 	if err != nil {
 		return err
 	}
@@ -416,7 +417,11 @@ func separatedIntentsMigration(
 	})
 	ri := kvcoord.NewRangeIterator(deps.DistSender)
 
-	return runSeparatedIntentsMigration(ctx, deps.DB.Clock(), deps.Stopper, deps.DB, ri, ir)
+	numNodes, err := deps.Cluster.NumNodes(ctx)
+	if err != nil {
+		return err
+	}
+	return runSeparatedIntentsMigration(ctx, deps.DB.Clock(), deps.Stopper, deps.DB, ri, ir, numNodes)
 }
 
 func postSeparatedIntentsMigration(
diff --git a/pkg/migration/migrations/separated_intents_test.go b/pkg/migration/migrations/separated_intents_test.go
index 3930d7a3dea2..d5c60779af42 100644
--- a/pkg/migration/migrations/separated_intents_test.go
+++ b/pkg/migration/migrations/separated_intents_test.go
@@ -262,7 +262,7 @@ func TestRunSeparatedIntentsMigration(t *testing.T) {
 				errorPerNCalls, err = strconv.Atoi(d.Input)
 				require.NoError(t, err)
 			case "run-migration":
-				err := runSeparatedIntentsMigration(ctx, hlcClock, stopper, db, ri, ir)
+				err := runSeparatedIntentsMigration(ctx, hlcClock, stopper, db, ri, ir, 1)
 				if err == nil {
 					return "ok"
 				}
diff --git a/pkg/migration/system_migration.go b/pkg/migration/system_migration.go
index 4b0935c7352d..aa072c0a4018 100644
--- a/pkg/migration/system_migration.go
+++ b/pkg/migration/system_migration.go
@@ -26,6 +26,11 @@ import (
 // Cluster abstracts a physical KV cluster and can be utilized by a long-running
 // migration.
 type Cluster interface {
+	// NumNodes returns the number of nodes in the cluster. This is merely a
+	// convenience method and is not meant to be used to infer cluster stability;
+	// for that, use UntilClusterStable.
+	NumNodes(ctx context.Context) (int, error)
+
 	// ForEveryNode is a short hand to execute the given closure (named by the
 	// informational parameter op) against every node in the cluster at a given
 	// point in time. Given it's possible for nodes to join or leave the cluster
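[Reviewer note] The sizing change in separated_intents.go above boils down to: total worker concurrency = concurrentMigrateLockTableRequests * numNodes, with the request channel and the per-worker status slice both sized to that product so the status logger can report what each worker is working on. A hedged, self-contained sketch of that worker-pool shape follows; requestsPerNode, work, and status are illustrative names, not the migration code itself.

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

const requestsPerNode = 4 // plays the role of concurrentMigrateLockTableRequests

func main() {
	numNodes := 3
	concurrent := requestsPerNode * numNodes

	work := make(chan int64, concurrent) // e.g. range IDs to migrate
	status := make([]int64, concurrent)  // 0 means the worker is idle

	var wg sync.WaitGroup
	for i := 0; i < concurrent; i++ {
		wg.Add(1)
		go func(slot int) {
			defer wg.Done()
			for rangeID := range work {
				// Publish what this worker is doing so a status logger can read it.
				atomic.StoreInt64(&status[slot], rangeID)
				// ... issue Barrier / ScanInterleavedIntents for this range ...
				atomic.StoreInt64(&status[slot], 0)
			}
		}(i)
	}

	for r := int64(1); r <= 100; r++ {
		work <- r
	}
	close(work)
	wg.Wait()
	fmt.Println("processed 100 ranges with", concurrent, "workers")
}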
diff --git a/pkg/roachpb/batch.go b/pkg/roachpb/batch.go
index bafbae3a6fcd..6e1c2405c3a2 100644
--- a/pkg/roachpb/batch.go
+++ b/pkg/roachpb/batch.go
@@ -267,6 +267,12 @@ func (ba *BatchRequest) IsSingleCheckConsistencyRequest() bool {
 	return ba.isSingleRequestWithMethod(CheckConsistency)
 }
 
+// IsSingleScanInterleavedIntentsRequest returns true iff the batch contains a
+// single ScanInterleavedIntents request.
+func (ba *BatchRequest) IsSingleScanInterleavedIntentsRequest() bool {
+	return ba.isSingleRequestWithMethod(ScanInterleavedIntents)
+}
+
 // RequiresConsensus returns true iff the batch contains a request that should
 // always force replication and proposal through raft, even if evaluation is
 // a no-op. The Barrier request requires consensus even though its evaluation
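[Reviewer note] For context on the new predicate: the isSingleRequestWithMethod helper is not shown in this diff, but presumably it checks that the batch holds exactly one request and that its method matches. A toy version with stand-in types is sketched below; Method, RequestUnion, and BatchRequest here are not the roachpb definitions.

package main

import "fmt"

type Method int

const (
	Scan Method = iota
	CheckConsistency
	ScanInterleavedIntents
)

type RequestUnion struct{ Method Method }

type BatchRequest struct{ Requests []RequestUnion }

// isSingleRequestWithMethod reports whether the batch has exactly one request
// of the given method.
func (ba *BatchRequest) isSingleRequestWithMethod(m Method) bool {
	return len(ba.Requests) == 1 && ba.Requests[0].Method == m
}

func (ba *BatchRequest) IsSingleScanInterleavedIntentsRequest() bool {
	return ba.isSingleRequestWithMethod(ScanInterleavedIntents)
}

func main() {
	ba := &BatchRequest{Requests: []RequestUnion{{Method: ScanInterleavedIntents}}}
	fmt.Println(ba.IsSingleScanInterleavedIntentsRequest()) // true
	ba.Requests = append(ba.Requests, RequestUnion{Method: Scan})
	fmt.Println(ba.IsSingleScanInterleavedIntentsRequest()) // false
}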