Skip to content

Commit

Permalink
kvserver: Run one ScanInterleavedIntents per store concurrently
Browse files Browse the repository at this point in the history
Currently, lock table migration-related requests (Barrier,
ScanInterleavedIntents, PushTxn, ResolveIntent) are only run
with a concurrency of 4, regardless of the size of the cluster.

This change increases that to 4 * numNodes, and adds a new
mechanism that limits ScanInterleavedIntents requests on the
receiver side to 1 concurrent request per store by default.
This throttle is applied before latches are grabbed. Only
ScanInterleavedIntents needs to be rate-limited, as it is by far
the heaviest component of the separated intents migration.

Release justification: Category 2, low-risk update to new functionality
Release note: None.
  • Loading branch information
itsbilal committed Aug 31, 2021
1 parent c4778c7 commit 9e720b6
Show file tree
Hide file tree
Showing 8 changed files with 69 additions and 16 deletions.
3 changes: 2 additions & 1 deletion pkg/kv/kvserver/batcheval/eval_context.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,8 @@ type Limiters struct {
// rangefeeds in the "catch-up" state across the store. The "catch-up" state
// is a temporary state at the beginning of a rangefeed which is expensive
// because it uses an engine iterator.
ConcurrentRangefeedIters limit.ConcurrentRequestLimiter
ConcurrentRangefeedIters limit.ConcurrentRequestLimiter
ConcurrentScanInterleavedIntents limit.ConcurrentRequestLimiter
}

// EvalContext is the interface through which command evaluation accesses the
Expand Down
15 changes: 15 additions & 0 deletions pkg/kv/kvserver/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,16 @@ var concurrentRangefeedItersLimit = settings.RegisterIntSetting(
settings.PositiveInt,
)

// concurrentscanInterleavedIntentsLimit is the setting that controls how many
// ScanInterleavedIntents requests a single store will run concurrently. It is
// used as part of pre-evaluation throttling: NewStore sizes the store's
// ConcurrentScanInterleavedIntents limiter from this value and updates the
// limiter whenever the setting changes. Requests beyond the limit wait for a
// slot rather than being rejected.
var concurrentscanInterleavedIntentsLimit = settings.RegisterIntSetting(
"kv.migration.concurrent_scan_interleaved_intents",
"number of scan interleaved intents requests a store will handle concurrently before queueing",
// Default: one ScanInterleavedIntents request per store at a time.
1,
// NOTE(review): presumably rejects non-positive values — confirm against
// the settings package.
settings.PositiveInt,
)

// Minimum time interval between system config updates which will lead to
// enqueuing replicas.
var queueAdditionOnSystemConfigUpdateRate = settings.RegisterFloatSetting(
Expand Down Expand Up @@ -860,6 +870,11 @@ func NewStore(
s.limiters.ConcurrentExportRequests = limit.MakeConcurrentRequestLimiter(
"exportRequestLimiter", int(ExportRequestsLimit.Get(&cfg.Settings.SV)),
)
s.limiters.ConcurrentScanInterleavedIntents = limit.MakeConcurrentRequestLimiter(
"scanInterleavedIntentsLimiter", int(concurrentscanInterleavedIntentsLimit.Get(&cfg.Settings.SV)))
concurrentscanInterleavedIntentsLimit.SetOnChange(&cfg.Settings.SV, func(ctx context.Context) {
s.limiters.ConcurrentScanInterleavedIntents.SetLimit(int(concurrentscanInterleavedIntentsLimit.Get(&cfg.Settings.SV)))
})

// The snapshot storage is usually empty at this point since it is cleared
// after each snapshot application, except when the node crashed right before
Expand Down
13 changes: 13 additions & 0 deletions pkg/kv/kvserver/store_send.go
Original file line number Diff line number Diff line change
Expand Up @@ -315,6 +315,19 @@ func (s *Store) maybeThrottleBatch(
}
return res, nil

case *roachpb.ScanInterleavedIntentsRequest:
before := timeutil.Now()
res, err := s.limiters.ConcurrentScanInterleavedIntents.Begin(ctx)
if err != nil {
return nil, err
}

waited := timeutil.Since(before)
if waited > time.Second {
log.Infof(ctx, "ScanInterleavedIntents request was delayed by %v", waited)
}
return res, nil

default:
return nil, nil
}
Expand Down
9 changes: 9 additions & 0 deletions pkg/migration/migrationcluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,15 @@ func (c *Cluster) UntilClusterStable(ctx context.Context, fn func() error) error
return nil
}

// NumNodes is part of the migration.Cluster interface. It reports how many
// nodes NodesFromNodeLiveness currently returns for this cluster's node
// liveness; any error from that lookup is returned unchanged alongside a zero
// count.
func (c *Cluster) NumNodes(ctx context.Context) (int, error) {
	nodes, err := NodesFromNodeLiveness(ctx, c.c.NodeLiveness)
	if err != nil {
		return 0, err
	}
	count := len(nodes)
	return count, nil
}

// ForEveryNode is part of the migration.Cluster interface.
func (c *Cluster) ForEveryNode(
ctx context.Context, op string, fn func(context.Context, serverpb.MigrationClient) error,
Expand Down
5 changes: 5 additions & 0 deletions pkg/migration/migrationcluster/tenant_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,11 @@ func NewTenantCluster(db *kv.DB) *TenantCluster {
return &TenantCluster{db: db}
}

// NumNodes is part of the migration.Cluster interface. For a TenantCluster it
// always fails: it returns a zero count together with an assertion-failure
// error, because non-system tenants cannot iterate nodes.
func (t *TenantCluster) NumNodes(ctx context.Context) (int, error) {
return 0, errors.AssertionFailedf("non-system tenants cannot iterate nodes")
}

// ForEveryNode is part of the migration.Cluster interface.
func (t *TenantCluster) ForEveryNode(
ctx context.Context, op string, fn func(context.Context, serverpb.MigrationClient) error,
Expand Down
33 changes: 19 additions & 14 deletions pkg/migration/migrations/separated_intents.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,16 +35,14 @@ import (
"github.com/cockroachdb/errors"
)

// The number of concurrent migrateLockTableRequests requests to run. This
// is effectively a cluster-wide setting as the actual legwork of the migration
// happens when the destination replica(s) are sending replies back to the
// original node.
// The number of concurrent lock table migration-related requests (e.g. Barrier,
// ScanInterleavedIntents) to run, as a multiplier of the number of nodes. For
// example, if the cluster has 15 nodes, 15 * concurrentMigrateLockTableRequests
// requests will be executed at once.
//
// TODO(bilal): Add logic to make this concurrency limit a per-leaseholder limit
// as opposed to a cluster-wide limit. That way, we could limit
// migrateLockTableRequests to 1 per leaseholder as opposed to 4 for the entire
// cluster, avoiding the case where all 4 ranges at a time could have the same node
// as their leaseholder.
// Note that some of the requests (e.g. ScanInterleavedIntents) are also
// throttled on the receiving end, to limit the impact of a large number of
// requests being directed at a few nodes.
const concurrentMigrateLockTableRequests = 4

// The maximum number of times to retry a migrateLockTableRequest before failing
Expand Down Expand Up @@ -87,7 +85,7 @@ type migrateLockTablePool struct {
db *kv.DB
clock *hlc.Clock
done chan bool
status [concurrentMigrateLockTableRequests]int64
status []int64
finished uint64

mu struct {
Expand Down Expand Up @@ -334,7 +332,7 @@ func (m *migrateLockTablePool) runStatusLogger(ctx context.Context) {
select {
case <-ticker.C:
var ranges strings.Builder
for i := 0; i < concurrentMigrateLockTableRequests; i++ {
for i := 0; i < len(m.status); i++ {
rangeID := atomic.LoadInt64(&m.status[i])
if rangeID == 0 {
continue
Expand Down Expand Up @@ -384,15 +382,18 @@ func runSeparatedIntentsMigration(
db *kv.DB,
ri rangeIterator,
ir intentResolver,
numNodes int,
) error {
concurrentRequests := concurrentMigrateLockTableRequests * numNodes
workerPool := migrateLockTablePool{
requests: make(chan migrateLockTableRequest, concurrentMigrateLockTableRequests),
requests: make(chan migrateLockTableRequest, concurrentRequests),
stopper: stopper,
db: db,
ir: ir,
clock: clock,
status: make([]int64, concurrentRequests),
}
migratedRanges, err := workerPool.runMigrateRequestsForRanges(ctx, ri, concurrentMigrateLockTableRequests)
migratedRanges, err := workerPool.runMigrateRequestsForRanges(ctx, ri, concurrentRequests)
if err != nil {
return err
}
Expand All @@ -416,7 +417,11 @@ func separatedIntentsMigration(
})
ri := kvcoord.NewRangeIterator(deps.DistSender)

return runSeparatedIntentsMigration(ctx, deps.DB.Clock(), deps.Stopper, deps.DB, ri, ir)
numNodes, err := deps.Cluster.NumNodes(ctx)
if err != nil {
return err
}
return runSeparatedIntentsMigration(ctx, deps.DB.Clock(), deps.Stopper, deps.DB, ri, ir, numNodes)
}

func postSeparatedIntentsMigration(
Expand Down
2 changes: 1 addition & 1 deletion pkg/migration/migrations/separated_intents_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -262,7 +262,7 @@ func TestRunSeparatedIntentsMigration(t *testing.T) {
errorPerNCalls, err = strconv.Atoi(d.Input)
require.NoError(t, err)
case "run-migration":
err := runSeparatedIntentsMigration(ctx, hlcClock, stopper, db, ri, ir)
err := runSeparatedIntentsMigration(ctx, hlcClock, stopper, db, ri, ir, 1)
if err == nil {
return "ok"
}
Expand Down
5 changes: 5 additions & 0 deletions pkg/migration/system_migration.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,11 @@ import (
// Cluster abstracts a physical KV cluster and can be utilized by a long-running
// migration.
type Cluster interface {
// NumNodes returns the number of nodes in the cluster. This is merely a
// convenience method and is not meant to be used to infer cluster stability;
// for that, use UntilClusterStable.
NumNodes(ctx context.Context) (int, error)

// ForEveryNode is a short hand to execute the given closure (named by the
// informational parameter op) against every node in the cluster at a given
// point in time. Given it's possible for nodes to join or leave the cluster
Expand Down

0 comments on commit 9e720b6

Please sign in to comment.