kvserver: Run one ScanInterleavedIntents per store concurrently #69607

Merged
merged 1 commit into from Sep 2, 2021
3 changes: 2 additions & 1 deletion pkg/kv/kvserver/batcheval/eval_context.go
@@ -39,7 +39,8 @@ type Limiters struct {
// rangefeeds in the "catch-up" state across the store. The "catch-up" state
// is a temporary state at the beginning of a rangefeed which is expensive
// because it uses an engine iterator.
ConcurrentRangefeedIters limit.ConcurrentRequestLimiter
ConcurrentRangefeedIters limit.ConcurrentRequestLimiter
ConcurrentScanInterleavedIntents limit.ConcurrentRequestLimiter
}

// EvalContext is the interface through which command evaluation accesses the
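The new field gives each store a dedicated limiter for ScanInterleavedIntents, alongside the existing rangefeed-iterator limiter. For readers unfamiliar with `limit.ConcurrentRequestLimiter`, the following is a minimal, self-contained sketch of the behavior it provides — a counting semaphore whose capacity can be adjusted at runtime. It is an illustration only, not the actual implementation; the real limiter also honors context cancellation while callers wait.

```go
package main

import (
	"fmt"
	"sync"
)

// reqLimiter is a toy stand-in for limit.ConcurrentRequestLimiter: a counting
// semaphore whose capacity can be changed while requests are waiting.
type reqLimiter struct {
	mu       sync.Mutex
	cond     *sync.Cond
	limit    int
	inFlight int
}

func newReqLimiter(limit int) *reqLimiter {
	l := &reqLimiter{limit: limit}
	l.cond = sync.NewCond(&l.mu)
	return l
}

// Begin blocks until a slot is free, then records the caller as in-flight.
func (l *reqLimiter) Begin() {
	l.mu.Lock()
	defer l.mu.Unlock()
	for l.inFlight >= l.limit {
		l.cond.Wait()
	}
	l.inFlight++
}

// Done returns the caller's slot and wakes any waiters.
func (l *reqLimiter) Done() {
	l.mu.Lock()
	defer l.mu.Unlock()
	l.inFlight--
	l.cond.Broadcast()
}

// SetLimit changes the capacity; waiters re-check against the new value.
func (l *reqLimiter) SetLimit(n int) {
	l.mu.Lock()
	defer l.mu.Unlock()
	l.limit = n
	l.cond.Broadcast()
}

func main() {
	l := newReqLimiter(1) // the new cluster setting defaults to 1 per store
	var wg sync.WaitGroup
	for i := 0; i < 3; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			l.Begin()
			defer l.Done()
			fmt.Println("request", i, "running")
		}(i)
	}
	wg.Wait()
}
```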
15 changes: 15 additions & 0 deletions pkg/kv/kvserver/store.go
@@ -138,6 +138,16 @@ var concurrentRangefeedItersLimit = settings.RegisterIntSetting(
settings.PositiveInt,
)

// concurrentscanInterleavedIntentsLimit is the number of concurrent
// ScanInterleavedIntents requests that will be run on a store. Used as part
// of pre-evaluation throttling.
var concurrentscanInterleavedIntentsLimit = settings.RegisterIntSetting(
"kv.migration.concurrent_scan_interleaved_intents",
"number of scan interleaved intents requests a store will handle concurrently before queueing",
1,
settings.PositiveInt,
)

// Minimum time interval between system config updates which will lead to
// enqueuing replicas.
var queueAdditionOnSystemConfigUpdateRate = settings.RegisterFloatSetting(
@@ -860,6 +870,11 @@ func NewStore(
s.limiters.ConcurrentExportRequests = limit.MakeConcurrentRequestLimiter(
"exportRequestLimiter", int(ExportRequestsLimit.Get(&cfg.Settings.SV)),
)
s.limiters.ConcurrentScanInterleavedIntents = limit.MakeConcurrentRequestLimiter(
"scanInterleavedIntentsLimiter", int(concurrentscanInterleavedIntentsLimit.Get(&cfg.Settings.SV)))
concurrentscanInterleavedIntentsLimit.SetOnChange(&cfg.Settings.SV, func(ctx context.Context) {
s.limiters.ConcurrentScanInterleavedIntents.SetLimit(int(concurrentscanInterleavedIntentsLimit.Get(&cfg.Settings.SV)))
})

// The snapshot storage is usually empty at this point since it is cleared
// after each snapshot application, except when the node crashed right before
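Because the limiter's capacity is wired to the cluster setting through the SetOnChange callback, the concurrency can be tuned on a live cluster: changing `kv.migration.concurrent_scan_interleaved_intents` (e.g. with `SET CLUSTER SETTING`, assuming the setting is operator-visible, as is usual for settings registered this way) updates every store's limiter without a node restart. Requests that have already acquired a slot are not interrupted by a lower value; only new arrivals queue against the updated limit.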
13 changes: 13 additions & 0 deletions pkg/kv/kvserver/store_send.go
@@ -315,6 +315,19 @@ func (s *Store) maybeThrottleBatch(
}
return res, nil

case *roachpb.ScanInterleavedIntentsRequest:
before := timeutil.Now()
res, err := s.limiters.ConcurrentScanInterleavedIntents.Begin(ctx)
if err != nil {
return nil, err
}

waited := timeutil.Since(before)
if waited > time.Second {
log.Infof(ctx, "ScanInterleavedIntents request was delayed by %v", waited)
}
return res, nil

default:
return nil, nil
}
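This case follows the same pattern as the other throttled request types in this switch: time how long the request waits for a slot, log if the wait exceeds a second, and return the reservation so the caller can release it once evaluation finishes. Below is a self-contained sketch of that acquire/measure/release bracket, using only illustrative names (a plain buffered channel stands in for the limiter); it is not the PR's code.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// throttled runs eval only after acquiring one of a fixed number of slots,
// logging when acquisition itself took a long time. It mirrors the shape of
// the pre-evaluation throttling above, as a standalone illustration.
func throttled(ctx context.Context, slots chan struct{}, eval func(context.Context) error) error {
	before := time.Now()
	select {
	case slots <- struct{}{}: // acquire a slot
	case <-ctx.Done():
		return ctx.Err()
	}
	defer func() { <-slots }() // release the slot once evaluation finishes

	if waited := time.Since(before); waited > time.Second {
		fmt.Printf("request was delayed by %v waiting for a slot\n", waited)
	}
	return eval(ctx)
}

func main() {
	slots := make(chan struct{}, 1) // capacity 1, matching the setting's default
	_ = throttled(context.Background(), slots, func(ctx context.Context) error {
		fmt.Println("evaluating ScanInterleavedIntents")
		return nil
	})
}
```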
9 changes: 9 additions & 0 deletions pkg/migration/migrationcluster/cluster.go
@@ -94,6 +94,15 @@ func (c *Cluster) UntilClusterStable(ctx context.Context, fn func() error) error
return nil
}

// NumNodes is part of the migration.Cluster interface.
func (c *Cluster) NumNodes(ctx context.Context) (int, error) {
ns, err := NodesFromNodeLiveness(ctx, c.c.NodeLiveness)
if err != nil {
return 0, err
}
return len(ns), nil
}

// ForEveryNode is part of the migration.Cluster interface.
func (c *Cluster) ForEveryNode(
ctx context.Context, op string, fn func(context.Context, serverpb.MigrationClient) error,
5 changes: 5 additions & 0 deletions pkg/migration/migrationcluster/tenant_cluster.go
@@ -127,6 +127,11 @@ func NewTenantCluster(db *kv.DB) *TenantCluster {
return &TenantCluster{db: db}
}

// NumNodes is part of the migration.Cluster interface.
func (t *TenantCluster) NumNodes(ctx context.Context) (int, error) {
return 0, errors.AssertionFailedf("non-system tenants cannot iterate nodes")
}

// ForEveryNode is part of the migration.Cluster interface.
func (t *TenantCluster) ForEveryNode(
ctx context.Context, op string, fn func(context.Context, serverpb.MigrationClient) error,
33 changes: 19 additions & 14 deletions pkg/migration/migrations/separated_intents.go
@@ -35,16 +35,14 @@ import (
"github.com/cockroachdb/errors"
)

// The number of concurrent migrateLockTableRequests requests to run. This
// is effectively a cluster-wide setting as the actual legwork of the migration
// happens when the destination replica(s) are sending replies back to the
// original node.
// The number of concurrent lock table migration related requests (eg. barrier,
// ScanInterleavedIntents) to run, as a multiplier of the number of nodes. If
// this cluster has 15 nodes, 15 * concurrentMigrateLockTableRequests requests
// will be executed at once.
//
// TODO(bilal): Add logic to make this concurrency limit a per-leaseholder limit
// as opposed to a cluster-wide limit. That way, we could limit
// migrateLockTableRequests to 1 per leaseholder as opposed to 4 for the entire
// cluster, avoiding the case where all 4 ranges at a time could have the same node
// as their leaseholder.
// Note that some of the requests (eg. ScanInterleavedIntents) do throttling on
// the receiving end, to reduce the chances of a large number of requests
// being directed at a few nodes.
const concurrentMigrateLockTableRequests = 4

// The maximum number of times to retry a migrateLockTableRequest before failing
@@ -87,7 +85,7 @@ type migrateLockTablePool struct {
db *kv.DB
clock *hlc.Clock
done chan bool
status [concurrentMigrateLockTableRequests]int64
status []int64
finished uint64

mu struct {
@@ -334,7 +332,7 @@ func (m *migrateLockTablePool) runStatusLogger(ctx context.Context) {
select {
case <-ticker.C:
var ranges strings.Builder
for i := 0; i < concurrentMigrateLockTableRequests; i++ {
for i := 0; i < len(m.status); i++ {
rangeID := atomic.LoadInt64(&m.status[i])
if rangeID == 0 {
continue
@@ -384,15 +382,18 @@ func runSeparatedIntentsMigration(
db *kv.DB,
ri rangeIterator,
ir intentResolver,
numNodes int,
) error {
concurrentRequests := concurrentMigrateLockTableRequests * numNodes
workerPool := migrateLockTablePool{
requests: make(chan migrateLockTableRequest, concurrentMigrateLockTableRequests),
requests: make(chan migrateLockTableRequest, concurrentRequests),
stopper: stopper,
db: db,
ir: ir,
clock: clock,
status: make([]int64, concurrentRequests),
}
migratedRanges, err := workerPool.runMigrateRequestsForRanges(ctx, ri, concurrentMigrateLockTableRequests)
migratedRanges, err := workerPool.runMigrateRequestsForRanges(ctx, ri, concurrentRequests)
if err != nil {
return err
}
@@ -416,7 +417,11 @@ func separatedIntentsMigration(
})
ri := kvcoord.NewRangeIterator(deps.DistSender)

return runSeparatedIntentsMigration(ctx, deps.DB.Clock(), deps.Stopper, deps.DB, ri, ir)
numNodes, err := deps.Cluster.NumNodes(ctx)
if err != nil {
return err
}
return runSeparatedIntentsMigration(ctx, deps.DB.Clock(), deps.Stopper, deps.DB, ri, ir, numNodes)
}

func postSeparatedIntentsMigration(
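To make the new sizing concrete, here is a small worked example. The 15-node figure comes from the comment above and the multiplier of 4 from the constant; everything else is illustrative: a 15-node cluster gets a pool of 60 in-flight requests, and both the request channel and the per-worker status slice are sized to that number.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// Same constant as above: the per-node multiplier for lock-table migration requests.
const concurrentMigrateLockTableRequests = 4

func main() {
	numNodes := 15 // e.g. what deps.Cluster.NumNodes(ctx) would report
	concurrentRequests := concurrentMigrateLockTableRequests * numNodes
	fmt.Println("in-flight requests:", concurrentRequests) // 60

	// The pool's status slice is sized to match: one slot per worker. Workers
	// atomically publish the range they are migrating, and the status logger
	// skips slots holding zero, as in runStatusLogger above.
	status := make([]int64, concurrentRequests)
	atomic.StoreInt64(&status[0], 42) // worker 0 is on r42
	for i := range status {
		if id := atomic.LoadInt64(&status[i]); id != 0 {
			fmt.Printf("worker %d: r%d\n", i, id)
		}
	}
}
```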
2 changes: 1 addition & 1 deletion pkg/migration/migrations/separated_intents_test.go
@@ -262,7 +262,7 @@ func TestRunSeparatedIntentsMigration(t *testing.T) {
errorPerNCalls, err = strconv.Atoi(d.Input)
require.NoError(t, err)
case "run-migration":
err := runSeparatedIntentsMigration(ctx, hlcClock, stopper, db, ri, ir)
err := runSeparatedIntentsMigration(ctx, hlcClock, stopper, db, ri, ir, 1)
if err == nil {
return "ok"
}
5 changes: 5 additions & 0 deletions pkg/migration/system_migration.go
@@ -26,6 +26,11 @@ import (
// Cluster abstracts a physical KV cluster and can be utilized by a long-running
// migration.
type Cluster interface {
// NumNodes returns the number of nodes in the cluster. This is merely a
// convenience method and is not meant to be used to infer cluster stability;
// for that, use UntilClusterStable.
NumNodes(ctx context.Context) (int, error)

// ForEveryNode is a short hand to execute the given closure (named by the
// informational parameter op) against every node in the cluster at a given
// point in time. Given it's possible for nodes to join or leave the cluster
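A sketch of how the two methods are meant to combine, under the assumption (suggested by the comment and by UntilClusterStable's name) that UntilClusterStable re-runs its closure when membership changes while it executes. The narrowed interface and the doWork helper below are illustrative, not part of this PR.

```go
package migrationexample

import "context"

// cluster is a narrowed, illustrative view of migration.Cluster containing only
// the two methods discussed here.
type cluster interface {
	NumNodes(ctx context.Context) (int, error)
	UntilClusterStable(ctx context.Context, fn func() error) error
}

// doWork stands in for whatever node-count-dependent work a migration performs.
func doWork(ctx context.Context, numNodes int) error { return nil }

// sizeAndRun reads the node count inside UntilClusterStable so the work is
// redone with a fresh count if nodes join or leave mid-run; NumNodes by itself
// makes no stability guarantee.
func sizeAndRun(ctx context.Context, c cluster) error {
	return c.UntilClusterStable(ctx, func() error {
		n, err := c.NumNodes(ctx)
		if err != nil {
			return err
		}
		return doWork(ctx, n)
	})
}
```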