kvserver: Run one ScanInterleavedIntents per store concurrently
Currently, lock table migration related requests (Barrier,
ScanInterleavedIntents, PushTxn, ResolveIntent) are run with
a concurrency of only 4, regardless of the size of the cluster.

This change increases that to 4 * numNodes, and adds a new
mechanism on the receiver side that limits ScanInterleavedIntents
to 1 per store. The throttle is applied before latches are
grabbed. Only ScanInterleavedIntents needs to be rate-limited,
as it is by far the heaviest component of the separated
intents migration.

Release justification: Category 2, low-risk update to new functionality
Release note: None.
itsbilal committed Aug 31, 2021
1 parent c4778c7 commit 94d7ddf
Showing 9 changed files with 83 additions and 16 deletions.
3 changes: 2 additions & 1 deletion pkg/kv/kvserver/batcheval/eval_context.go
@@ -39,7 +39,8 @@ type Limiters struct {
// rangefeeds in the "catch-up" state across the store. The "catch-up" state
// is a temporary state at the beginning of a rangefeed which is expensive
// because it uses an engine iterator.
-ConcurrentRangefeedIters limit.ConcurrentRequestLimiter
+ConcurrentRangefeedIters         limit.ConcurrentRequestLimiter
+ConcurrentScanInterleavedIntents limit.ConcurrentRequestLimiter
}

// EvalContext is the interface through which command evaluation accesses the
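For readers who haven't used pkg/util/limit: the self-contained sketch below illustrates the Begin/Release contract that this new field relies on. toyLimiter and toyReservation are hypothetical stand-ins for limit.ConcurrentRequestLimiter and limit.Reservation (the real types carry more machinery); only the (reservation, error) shape of Begin, matching its use in replica_send.go below, is taken from this diff.

package main

import (
	"context"
	"fmt"
)

// toyLimiter: a counting semaphore admitting at most cap(sem) concurrent
// holders. Hypothetical stand-in for limit.ConcurrentRequestLimiter.
type toyLimiter struct {
	sem chan struct{}
}

// toyReservation mirrors limit.Reservation: releasing it frees the slot.
type toyReservation struct{ l *toyLimiter }

func (r toyReservation) Release() { <-r.l.sem }

func makeToyLimiter(n int) *toyLimiter {
	return &toyLimiter{sem: make(chan struct{}, n)}
}

// Begin blocks until a slot frees up or the context is canceled.
func (l *toyLimiter) Begin(ctx context.Context) (toyReservation, error) {
	select {
	case l.sem <- struct{}{}:
		return toyReservation{l: l}, nil
	case <-ctx.Done():
		return toyReservation{}, ctx.Err()
	}
}

func main() {
	lim := makeToyLimiter(1) // one ScanInterleavedIntents per store
	res, err := lim.Begin(context.Background())
	if err != nil {
		return // canceled while queued
	}
	defer res.Release()
	fmt.Println("admitted")
}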
21 changes: 21 additions & 0 deletions pkg/kv/kvserver/replica_send.go
@@ -21,6 +21,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/txnwait"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings"
"github.com/cockroachdb/cockroach/pkg/util/limit"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
"github.com/cockroachdb/errors"
@@ -277,6 +278,21 @@ func (r *Replica) maybeAddRangeInfoToResponse(
}
}

// maybeThrottleRequestPreLatch applies store-level admission throttles to
// requests that carry one, before any latches are acquired. It returns a
// non-nil Reservation that the caller must Release once the request is done.
func (r *Replica) maybeThrottleRequestPreLatch(
ctx context.Context, ba *roachpb.BatchRequest,
) limit.Reservation {
switch {
case ba.IsSingleScanInterleavedIntentsRequest():
reservation, err := r.store.limiters.ConcurrentScanInterleavedIntents.Begin(ctx)
if err != nil {
// Begin fails if, e.g., ctx is canceled while queued; proceed
// without a reservation and let evaluation observe the error.
return nil
}
return reservation
default:
return nil
}
}

// batchExecutionFn is a method on Replica that executes a BatchRequest. It is
// called with the batch, along with a guard for the latches protecting the
// request.
@@ -335,6 +351,11 @@ func (r *Replica) executeBatchWithConcurrencyRetries(
// Handle load-based splitting.
r.recordBatchForLoadBasedSplitting(ctx, ba, latchSpans)

// Handle any request-specific pre-latch throttles.
if res := r.maybeThrottleRequestPreLatch(ctx, ba); res != nil {
defer res.Release()
}

// Try to execute command; exit retry loop on success.
var g *concurrency.Guard
defer func() {
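The hook's placement is the design point: the reservation is taken before latches, so a queued ScanInterleavedIntents waits without holding latches that would block foreground traffic on overlapping keys. Below is a hedged sketch of that ordering, reusing the toy limiter types from the earlier sketch (toyThrottlePreLatch and executeToyBatch are illustrative names, not kvserver functions).

// toyThrottlePreLatch mirrors maybeThrottleRequestPreLatch: requests not
// subject to a limit, or whose context is canceled while queued, get nil.
func toyThrottlePreLatch(ctx context.Context, lim *toyLimiter, isScanIntents bool) *toyReservation {
	if !isScanIntents {
		return nil
	}
	res, err := lim.Begin(ctx)
	if err != nil {
		return nil // evaluation will surface ctx.Err() shortly anyway
	}
	return &res
}

// Caller side, as in executeBatchWithConcurrencyRetries: reserve first,
// release when the batch is done; latches are acquired only after admission.
func executeToyBatch(ctx context.Context, lim *toyLimiter) {
	if res := toyThrottlePreLatch(ctx, lim, true); res != nil {
		defer res.Release()
	}
	// ... acquire latches and evaluate ...
}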
15 changes: 15 additions & 0 deletions pkg/kv/kvserver/store.go
@@ -138,6 +138,16 @@ var concurrentRangefeedItersLimit = settings.RegisterIntSetting(
settings.PositiveInt,
)

// concurrentScanInterleavedIntentsLimit is the number of concurrent
// ScanInterleavedIntents requests that will be run on a store. Used as part
// of pre-evaluation throttling.
var concurrentScanInterleavedIntentsLimit = settings.RegisterIntSetting(
"kv.migration.concurrent_scan_interleaved_intents",
"number of scan interleaved intents requests a store will handle concurrently before queueing",
1,
settings.PositiveInt,
)

// Minimum time interval between system config updates which will lead to
// enqueuing replicas.
var queueAdditionOnSystemConfigUpdateRate = settings.RegisterFloatSetting(
@@ -860,6 +870,11 @@ func NewStore(
s.limiters.ConcurrentExportRequests = limit.MakeConcurrentRequestLimiter(
"exportRequestLimiter", int(ExportRequestsLimit.Get(&cfg.Settings.SV)),
)
s.limiters.ConcurrentScanInterleavedIntents = limit.MakeConcurrentRequestLimiter(
"scanInterleavedIntentsLimiter", int(concurrentScanInterleavedIntentsLimit.Get(&cfg.Settings.SV)))
concurrentScanInterleavedIntentsLimit.SetOnChange(&cfg.Settings.SV, func(ctx context.Context) {
s.limiters.ConcurrentScanInterleavedIntents.SetLimit(int(concurrentScanInterleavedIntentsLimit.Get(&cfg.Settings.SV)))
})

// The snapshot storage is usually empty at this point since it is cleared
// after each snapshot application, except when the node crashed right before
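Because the setting is registered with SetOnChange, operators can raise kv.migration.concurrent_scan_interleaved_intents at runtime and the limiter picks up the new value without a restart. The standalone sketch below (a plain buffered channel standing in for the limiter; nothing here is CockroachDB API) shows the observable effect of the default limit of 1: simulated requests against one store complete one at a time rather than together.

package main

import (
	"fmt"
	"sync"
	"time"
)

func main() {
	// A plain buffered channel stands in for the store's
	// ConcurrentScanInterleavedIntents limiter at its default limit of 1.
	sem := make(chan struct{}, 1)
	start := time.Now()
	var wg sync.WaitGroup
	for i := 0; i < 3; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			sem <- struct{}{}        // Begin: queue here, before latches
			defer func() { <-sem }() // Release
			time.Sleep(50 * time.Millisecond) // pretend to scan intents
			fmt.Printf("request %d done after %v\n", id, time.Since(start).Round(10*time.Millisecond))
		}(i)
	}
	wg.Wait() // completions land ~50ms apart: one scan at a time per store
}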
9 changes: 9 additions & 0 deletions pkg/migration/migrationcluster/cluster.go
@@ -94,6 +94,15 @@ func (c *Cluster) UntilClusterStable(ctx context.Context, fn func() error) error
return nil
}

// NumNodes is part of the migration.Cluster interface.
func (c *Cluster) NumNodes(ctx context.Context) (int, error) {
ns, err := NodesFromNodeLiveness(ctx, c.c.NodeLiveness)
if err != nil {
return 0, err
}
return len(ns), nil
}

// ForEveryNode is part of the migration.Cluster interface.
func (c *Cluster) ForEveryNode(
ctx context.Context, op string, fn func(context.Context, serverpb.MigrationClient) error,
5 changes: 5 additions & 0 deletions pkg/migration/migrationcluster/tenant_cluster.go
@@ -127,6 +127,11 @@ func NewTenantCluster(db *kv.DB) *TenantCluster {
return &TenantCluster{db: db}
}

// NumNodes is part of the migration.Cluster interface.
func (t *TenantCluster) NumNodes(ctx context.Context) (int, error) {
return 0, errors.AssertionFailedf("non-system tenants cannot iterate nodes")
}

// ForEveryNode is part of the migration.Cluster interface.
func (t *TenantCluster) ForEveryNode(
ctx context.Context, op string, fn func(context.Context, serverpb.MigrationClient) error,
33 changes: 19 additions & 14 deletions pkg/migration/migrations/separated_intents.go
@@ -35,16 +35,14 @@ import (
"github.com/cockroachdb/errors"
)

-// The number of concurrent migrateLockTableRequests requests to run. This
-// is effectively a cluster-wide setting as the actual legwork of the migration
-// happens when the destination replica(s) are sending replies back to the
-// original node.
+// The number of concurrent lock table migration related requests (e.g. Barrier,
+// ScanInterleavedIntents) to run, as a multiplier of the number of nodes. If
+// this cluster has 15 nodes, 15 * concurrentMigrateLockTableRequests requests
+// will be executed at once.
//
-// TODO(bilal): Add logic to make this concurrency limit a per-leaseholder limit
-// as opposed to a cluster-wide limit. That way, we could limit
-// migrateLockTableRequests to 1 per leaseholder as opposed to 4 for the entire
-// cluster, avoiding the case where all 4 ranges at a time could have the same node
-// as their leaseholder.
+// Note that some of the requests (e.g. ScanInterleavedIntents) do throttling on
+// the receiving end, to reduce the chances of a large number of requests
+// being directed at a few nodes.
const concurrentMigrateLockTableRequests = 4

// The maximum number of times to retry a migrateLockTableRequest before failing
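A worked example of the sizing math in the comment above; the constant comes from this diff, and the 15-node figure is just the comment's illustration:

package main

import "fmt"

// Constant from the diff above; 15 nodes is the comment's illustrative figure.
const concurrentMigrateLockTableRequests = 4

func main() {
	numNodes := 15
	concurrentRequests := concurrentMigrateLockTableRequests * numNodes
	// The worker pool sizes its request channel and status slice to this
	// number: up to 60 in-flight migration requests cluster-wide, while each
	// receiving store still admits only one ScanInterleavedIntents at a time.
	fmt.Println(concurrentRequests) // 60
}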
@@ -87,7 +85,7 @@ type migrateLockTablePool struct {
db *kv.DB
clock *hlc.Clock
done chan bool
-status   [concurrentMigrateLockTableRequests]int64
+status   []int64
finished uint64

mu struct {
@@ -334,7 +332,7 @@ func (m *migrateLockTablePool) runStatusLogger(ctx context.Context) {
select {
case <-ticker.C:
var ranges strings.Builder
-for i := 0; i < concurrentMigrateLockTableRequests; i++ {
+for i := 0; i < len(m.status); i++ {
rangeID := atomic.LoadInt64(&m.status[i])
if rangeID == 0 {
continue
@@ -384,15 +382,18 @@ func runSeparatedIntentsMigration(
db *kv.DB,
ri rangeIterator,
ir intentResolver,
+numNodes int,
) error {
+concurrentRequests := concurrentMigrateLockTableRequests * numNodes
workerPool := migrateLockTablePool{
-requests: make(chan migrateLockTableRequest, concurrentMigrateLockTableRequests),
+requests: make(chan migrateLockTableRequest, concurrentRequests),
stopper: stopper,
db: db,
ir: ir,
clock: clock,
+status: make([]int64, concurrentRequests),
}
-migratedRanges, err := workerPool.runMigrateRequestsForRanges(ctx, ri, concurrentMigrateLockTableRequests)
+migratedRanges, err := workerPool.runMigrateRequestsForRanges(ctx, ri, concurrentRequests)
if err != nil {
return err
}
@@ -416,7 +417,11 @@ func separatedIntentsMigration(
})
ri := kvcoord.NewRangeIterator(deps.DistSender)

-return runSeparatedIntentsMigration(ctx, deps.DB.Clock(), deps.Stopper, deps.DB, ri, ir)
+numNodes, err := deps.Cluster.NumNodes(ctx)
+if err != nil {
+return err
+}
+return runSeparatedIntentsMigration(ctx, deps.DB.Clock(), deps.Stopper, deps.DB, ri, ir, numNodes)
}

func postSeparatedIntentsMigration(
2 changes: 1 addition & 1 deletion pkg/migration/migrations/separated_intents_test.go
@@ -262,7 +262,7 @@ func TestRunSeparatedIntentsMigration(t *testing.T) {
errorPerNCalls, err = strconv.Atoi(d.Input)
require.NoError(t, err)
case "run-migration":
-err := runSeparatedIntentsMigration(ctx, hlcClock, stopper, db, ri, ir)
+err := runSeparatedIntentsMigration(ctx, hlcClock, stopper, db, ri, ir, 1)
if err == nil {
return "ok"
}
5 changes: 5 additions & 0 deletions pkg/migration/system_migration.go
@@ -26,6 +26,11 @@ import (
// Cluster abstracts a physical KV cluster and can be utilized by a long-running
// migration.
type Cluster interface {
// NumNodes returns the number of nodes in the cluster. This is merely a
// convenience method and is not meant to be used to infer cluster stability;
// for that, use UntilClusterStable.
NumNodes(ctx context.Context) (int, error)

// ForEveryNode is a short hand to execute the given closure (named by the
// informational parameter op) against every node in the cluster at a given
// point in time. Given it's possible for nodes to join or leave the cluster
6 changes: 6 additions & 0 deletions pkg/roachpb/batch.go
@@ -267,6 +267,12 @@ func (ba *BatchRequest) IsSingleCheckConsistencyRequest() bool {
return ba.isSingleRequestWithMethod(CheckConsistency)
}

// IsSingleScanInterleavedIntentsRequest returns true iff the batch contains a
// single ScanInterleavedIntents request.
func (ba *BatchRequest) IsSingleScanInterleavedIntentsRequest() bool {
return ba.isSingleRequestWithMethod(ScanInterleavedIntents)
}

// RequiresConsensus returns true iff the batch contains a request that should
// always force replication and proposal through raft, even if evaluation is
// a no-op. The Barrier request requires consensus even though its evaluation
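A hedged sketch of the single-request predicate this helper delegates to (all toy types; the real isSingleRequestWithMethod is an unexported roachpb helper): it holds only for a batch of exactly one request with the given method.

package main

import "fmt"

// Toy types standing in for roachpb; the real predicate is the unexported
// isSingleRequestWithMethod helper that the new method delegates to.
type method int

const scanInterleavedIntents method = iota

type request struct{ m method }

type batchRequest struct{ reqs []request }

// isSingleRequestWithMethod: exactly one request, with the given method.
func (ba batchRequest) isSingleRequestWithMethod(m method) bool {
	return len(ba.reqs) == 1 && ba.reqs[0].m == m
}

func main() {
	single := batchRequest{reqs: []request{{m: scanInterleavedIntents}}}
	fmt.Println(single.isSingleRequestWithMethod(scanInterleavedIntents)) // true
	fmt.Println(batchRequest{}.isSingleRequestWithMethod(scanInterleavedIntents)) // false
}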
