61275: kvprober: rate limit the planner r=joshimhoff a=joshimhoff

cockroachdb#61255

**kvprober: rate limit the planner**

This commit introduces a planning rate limit, which protects CRDB from
planning running too often, whether due to issues within CRDB (e.g.
meta2 unavailability) or bugs in kvprober. When planning does execute,
kvprober scans kv.prober.planner.num_steps_to_plan_at_once rows' worth
of meta2 and unmarshals the resulting range descriptors.

Release justification: Auxiliary system that is off by default.
Release note: None.

Co-authored-by: Josh Imhoff <[email protected]>
Co-authored-by: Tobias Grieger <[email protected]>
3 people committed Mar 4, 2021
2 parents 4989436 + 56c54a4 commit c0d8340
Showing 6 changed files with 211 additions and 14 deletions.
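
At its core, the new guard in (*meta2Planner).next is a timestamp comparison
against an injectable clock. A condensed sketch of the shape, pulled together
from the planner.go diff below (not a drop-in excerpt):

	if len(p.plan) == 0 {
		// Condensed from the planner.go diff below.
		timeSinceLastPlan := p.now().Sub(p.lastPlanTime)
		if limit := p.getRateLimit(p.settings); timeSinceLastPlan < limit {
			return Step{}, errors.Newf("planner rate limit hit: "+
				"timeSinceLastPlan=%v, limit=%v", timeSinceLastPlan, limit)
		}
		p.lastPlanTime = p.now() // updated whether or not planning then succeeds
		// ... scan meta2 and rebuild p.plan ...
	}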
2 changes: 2 additions & 0 deletions pkg/kv/kvprober/BUILD.bazel
@@ -32,6 +32,7 @@ go_test(
"kvprober_integration_test.go",
"kvprober_test.go",
"main_test.go",
"planner_test.go",
],
embed = [":kvprober"],
deps = [
@@ -52,6 +53,7 @@
"//pkg/util/log",
"//pkg/util/randutil",
"//pkg/util/syncutil",
"//pkg/util/timeutil",
"//pkg/util/tracing",
"@com_github_cockroachdb_errors//:errors",
"@com_github_stretchr_testify//require",
13 changes: 12 additions & 1 deletion pkg/kv/kvprober/helpers_test.go
@@ -10,7 +10,12 @@

package kvprober

import "context"
import (
"context"
"time"

"github.com/cockroachdb/cockroach/pkg/settings/cluster"
)

// Below are exported to enable testing from kvprober_test.

@@ -27,3 +32,9 @@ func (p *Prober) Probe(ctx context.Context, db dbGet) {
func (p *Prober) PlannerNext(ctx context.Context) (Step, error) {
return p.planner.next(ctx)
}

func (p *Prober) SetPlanningRateLimit(d time.Duration) {
p.planner.(*meta2Planner).getRateLimit = func(settings *cluster.Settings) time.Duration {
return d
}
}
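
Tests that want planning to run on every call can zero the limit through this
helper; kvprober_integration_test.go below does exactly that:

	p.SetPlanningRateLimit(0) // disable the planning rate limit entirely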
16 changes: 10 additions & 6 deletions pkg/kv/kvprober/kvprober.go
@@ -136,19 +136,23 @@ func (p *Prober) Start(ctx context.Context, stopper *stop.Stopper) error {
ctx, cancel := stopper.WithCancelOnQuiesce(ctx)
defer cancel()

ticker := time.NewTicker(withJitter(readInterval.Get(&p.settings.SV), rand.Int63n))
defer ticker.Stop()
d := func() time.Duration {
return withJitter(readInterval.Get(&p.settings.SV), rand.Int63n)
}
t := timeutil.NewTimer()
t.Reset(d())
defer t.Stop()

for {
select {
case <-ticker.C:
case <-t.C:
t.Read = true
// Jitter added to de-synchronize different nodes' probe loops.
t.Reset(d())
case <-stopper.ShouldQuiesce():
return
}

// Jitter added to de-synchronize different nodes' probe loops.
ticker.Reset(withJitter(readInterval.Get(&p.settings.SV), rand.Int63n))

p.probe(ctx, p.db)
}
})
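
For readers unfamiliar with CRDB's timeutil.Timer: unlike time.Ticker it is
one-shot and must be re-armed, and its contract requires setting t.Read = true
after receiving from t.C before the next Reset. Re-arming on every iteration
means each sleep gets a freshly computed jitter and picks up changes to the
readInterval setting. A minimal sketch of the loop shape, with doProbe and
jitteredInterval as hypothetical stand-ins:

	t := timeutil.NewTimer()
	t.Reset(jitteredInterval()) // stand-in for d() in the diff above
	defer t.Stop()
	for {
		select {
		case <-t.C:
			t.Read = true               // required by timeutil.Timer before Reset
			t.Reset(jitteredInterval()) // re-arm with a fresh jittered duration
		case <-stopper.ShouldQuiesce():
			return
		}
		doProbe() // stand-in for p.probe(ctx, p.db)
	}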
3 changes: 3 additions & 0 deletions pkg/kv/kvprober/kvprober_integration_test.go
@@ -221,6 +221,9 @@ func initTestProber(

// Given the small test cluster, this better exercises the planning logic.
kvprober.NumStepsToPlanAtOnce.Override(&s.ClusterSettings().SV, 10)
// Want these tests to run as fast as possible; see planner_test.go for a
// unit test of the rate limiting.
p.SetPlanningRateLimit(0)

return s, sqlDB, p, func() {
s.Stopper().Stop(context.Background())
66 changes: 59 additions & 7 deletions pkg/kv/kvprober/planner.go
@@ -20,6 +20,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/util/contextutil"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/errors"
)

@@ -53,15 +54,40 @@ type meta2Planner struct {
// cursor points to a key in meta2 at which scanning should resume when new
// plans are needed.
cursor roachpb.Key
// meta2Planner makes plans of size NumPrefetchedPlan as per below.
// meta2Planner makes plans of size numStepsToPlanAtOnce as per below.
plan []Step
// lastPlanTime records the last time the meta2Planner made a plan of size
// numStepsToPlanAtOnce; this is recorded in order to implement a rate limit.
//
// Note that crashes clear this field, so the rate limit is not enforced in
// case of a crash loop.
lastPlanTime time.Time

// Swappable for testing.
now func() time.Time
getRateLimit func(settings *cluster.Settings) time.Duration
getNMeta2KVs func(
ctx context.Context,
db dbScan,
n int64,
cursor roachpb.Key,
timeout time.Duration) ([]kv.KeyValue, roachpb.Key, error)
meta2KVsToPlan func(kvs []kv.KeyValue) ([]Step, error)
}

func newMeta2Planner(db *kv.DB, settings *cluster.Settings) *meta2Planner {
return &meta2Planner{
db: db,
settings: settings,
cursor: keys.Meta2Prefix,
// At kvprober start time, this field is set to the unix epoch, implying
// that planning will be allowed on the first call to next no matter what.
// After that, the field will be set correctly.
lastPlanTime: timeutil.Unix(0, 0),
now: timeutil.Now,
getRateLimit: getRateLimitImpl,
getNMeta2KVs: getNMeta2KVsImpl,
meta2KVsToPlan: meta2KVsToPlanImpl,
}
}

@@ -87,7 +113,7 @@ func newMeta2Planner(db *kv.DB, settings *cluster.Settings) *meta2Planner {
// Note that though we scan meta2 here, we also randomize the order of
// ranges in the plan. This is to avoid all nodes probing the same ranges at
// the same time. Jitter is also added to the sleep between probe times
// to de-synchronize different nodes' probe loops.
//
// What about resource usage?
//
@@ -104,22 +130,31 @@ func newMeta2Planner(db *kv.DB, settings *cluster.Settings) *meta2Planner {
// kv.prober.planner.n_probes_at_a_time cluster setting.
//
// CPU:
// - Again scales with the the kv.prober.planner.n_probes_at_a_time cluster
// - Again scales with the kv.prober.planner.n_probes_at_a_time cluster
// setting. Note the proto unmarshalling. We also shuffle a slice of size
// kv.prober.planner.n_probes_at_a_time. If the setting is set to a high
// number, we pay a higher CPU cost less often; if it's set to a low number,
// we pay a smaller CPU cost more often.
func (p *meta2Planner) next(ctx context.Context) (Step, error) {
if len(p.plan) == 0 {
// Protect CRDB from planning running too often, due to either issues
// with CRDB (e.g. meta2 unavailability) or bugs in kvprober.
timeSinceLastPlan := p.now().Sub(p.lastPlanTime) // i.e. Since(p.lastPlanTime), via the injectable clock
if limit := p.getRateLimit(p.settings); timeSinceLastPlan < limit {
return Step{}, errors.Newf("planner rate limit hit: "+
"timeSinceLastPlan=%v, limit=%v", timeSinceLastPlan, limit)
}
p.lastPlanTime = p.now()

timeout := scanMeta2Timeout.Get(&p.settings.SV)
kvs, cursor, err := getNMeta2KVs(
kvs, cursor, err := p.getNMeta2KVs(
ctx, p.db, numStepsToPlanAtOnce.Get(&p.settings.SV), p.cursor, timeout)
if err != nil {
return Step{}, errors.Wrapf(err, "failed to get meta2 rows")
}
p.cursor = cursor

plan, err := meta2KVsToPlan(kvs)
plan, err := p.meta2KVsToPlan(kvs)
if err != nil {
return Step{}, errors.Wrapf(err, "failed to make plan from meta2 rows")
}
@@ -138,11 +173,28 @@ func (p *meta2Planner) next(ctx context.Context) (Step, error) {
return step, nil
}
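
The shuffle mentioned in the comments above lives in unchanged code and so
does not appear in this diff; presumably it is the standard math/rand pattern,
sketched here as an assumption rather than quoted from the source:

	// Hypothetical sketch of randomizing the plan order; not from this diff.
	rand.Shuffle(len(plan), func(i, j int) {
		plan[i], plan[j] = plan[j], plan[i]
	})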

// Consider the following configuration:
//
// 1. Read probes are sent every 1s.
// 2. Planning is done 60 steps (ranges) at a time.
//
// In the happy path, planning is done once a minute.
//
// The rate limit calculation below implies that planning can be done at most
// once every 30 seconds (since 60s / 2 -> 30s).
func getRateLimitImpl(settings *cluster.Settings) time.Duration {
sv := &settings.SV
const happyPathIntervalToRateLimitIntervalRatio = 2
return time.Duration(
readInterval.Get(sv).Nanoseconds()*
numStepsToPlanAtOnce.Get(sv)/happyPathIntervalToRateLimitIntervalRatio) * time.Nanosecond
}

type dbScan interface {
Scan(ctx context.Context, begin, end interface{}, maxRows int64) ([]kv.KeyValue, error)
}

func getNMeta2KVs(
func getNMeta2KVsImpl(
ctx context.Context, db dbScan, n int64, cursor roachpb.Key, timeout time.Duration,
) ([]kv.KeyValue, roachpb.Key, error) {
var kvs []kv.KeyValue
@@ -175,7 +227,7 @@ func getNMeta2KVs(
return kvs, cursor, nil
}

func meta2KVsToPlan(kvs []kv.KeyValue) ([]Step, error) {
func meta2KVsToPlanImpl(kvs []kv.KeyValue) ([]Step, error) {
plans := make([]Step, len(kvs))

var rangeDesc roachpb.RangeDescriptor
125 changes: 125 additions & 0 deletions pkg/kv/kvprober/planner_test.go
@@ -0,0 +1,125 @@
// Copyright 2021 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package kvprober

import (
"context"
"testing"
"time"

"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/errors"
"github.com/stretchr/testify/require"
)

func TestPlannerEnforcesRateLimit(t *testing.T) {
p := newMeta2Planner(nil, cluster.MakeTestingClusterSettings())
p.getRateLimit = func(settings *cluster.Settings) time.Duration {
return 1 * time.Second
}

now := timeutil.Now()
fakeNow := func() time.Time {
return now
}
p.now = fakeNow

p.getNMeta2KVs = func(context.Context, dbScan, int64, roachpb.Key, time.Duration) (values []kv.KeyValue, keys roachpb.Key, e error) {
return nil, nil, nil
}
p.meta2KVsToPlan = func([]kv.KeyValue) (steps []Step, e error) {
return []Step{{
RangeID: 3,
}}, nil
}

// Rate limit not hit since first call to next.
ctx := context.Background()
_, err := p.next(ctx)
require.NoError(t, err)

// Rate limit hit since time not moved forward.
_, err = p.next(ctx)
require.Error(t, err)
require.Regexp(t, "planner rate limit hit", err.Error())

// Rate limit not hit since time moved forward enough.
now = now.Add(2 * time.Second)
_, err = p.next(ctx)
require.NoError(t, err)

// Rate limit hit since time not moved forward enough.
now = now.Add(600 * time.Millisecond)
_, err = p.next(ctx)
require.Error(t, err)
require.Regexp(t, "planner rate limit hit", err.Error())

// Rate limit not hit since time moved forward enough. 600ms + 600ms
// is enough wait time to not hit the rate limit.
now = now.Add(600 * time.Millisecond)
_, err = p.next(ctx)
require.NoError(t, err)

// Rate limit hit since time not moved forward enough.
now = now.Add(400 * time.Millisecond)
_, err = p.next(ctx)
require.Error(t, err)
require.Regexp(t, "planner rate limit hit", err.Error())

// Rate limit hit since time not moved forward enough. 400ms + 400ms
// is not enough wait time to not hit the rate limit.
now = now.Add(400 * time.Millisecond)
_, err = p.next(ctx)
require.Error(t, err)
require.Regexp(t, "planner rate limit hit", err.Error())

// Rate limit not hit since time moved forward enough. 400ms + 400ms +
// 400ms is enough wait time to not hit the rate limit.
now = now.Add(400 * time.Millisecond)
_, err = p.next(ctx)
require.NoError(t, err)

// Whether planning succeeds or fails shouldn't affect the rate limiting!
p.meta2KVsToPlan = func([]kv.KeyValue) (steps []Step, e error) {
return nil, errors.New("boom")
}

// Rate limit hit since time not moved forward enough.
_, err = p.next(ctx)
require.Error(t, err)
require.Regexp(t, "planner rate limit hit", err.Error())

// Rate limit hit since time not moved forward enough.
now = now.Add(600 * time.Millisecond)
_, err = p.next(ctx)
require.Error(t, err)
require.Regexp(t, "planner rate limit hit", err.Error())

// Rate limit not hit since time moved forward enough. 600ms + 600ms
// is enough wait time to not hit the rate limit.
now = now.Add(600 * time.Millisecond)
_, err = p.next(ctx)
require.Error(t, err)
require.Regexp(t, "boom", err.Error()) // plan failure instead of rate limit!
}

func TestGetRateLimit(t *testing.T) {
s := cluster.MakeTestingClusterSettings()

readInterval.Override(&s.SV, time.Second)
numStepsToPlanAtOnce.Override(&s.SV, 60)

got := getRateLimitImpl(s)
require.Equal(t, 30*time.Second, got)
}
