diff --git a/pkg/kv/kvprober/BUILD.bazel b/pkg/kv/kvprober/BUILD.bazel
index 273a61e67dfe..40961688d8fc 100644
--- a/pkg/kv/kvprober/BUILD.bazel
+++ b/pkg/kv/kvprober/BUILD.bazel
@@ -32,6 +32,7 @@ go_test(
         "kvprober_integration_test.go",
         "kvprober_test.go",
         "main_test.go",
+        "planner_test.go",
     ],
     embed = [":kvprober"],
     deps = [
@@ -52,6 +53,7 @@ go_test(
         "//pkg/util/log",
         "//pkg/util/randutil",
         "//pkg/util/syncutil",
+        "//pkg/util/timeutil",
         "//pkg/util/tracing",
         "@com_github_cockroachdb_errors//:errors",
         "@com_github_stretchr_testify//require",
diff --git a/pkg/kv/kvprober/helpers_test.go b/pkg/kv/kvprober/helpers_test.go
index 241d08c9c70d..24b63f048884 100644
--- a/pkg/kv/kvprober/helpers_test.go
+++ b/pkg/kv/kvprober/helpers_test.go
@@ -10,7 +10,12 @@
 
 package kvprober
 
-import "context"
+import (
+	"context"
+	"time"
+
+	"github.com/cockroachdb/cockroach/pkg/settings/cluster"
+)
 
 // Below are exported to enable testing from kvprober_test.
 
@@ -27,3 +32,9 @@ func (p *Prober) Probe(ctx context.Context, db dbGet) {
 func (p *Prober) PlannerNext(ctx context.Context) (Step, error) {
 	return p.planner.next(ctx)
 }
+
+func (p *Prober) SetPlanningRateLimit(d time.Duration) {
+	p.planner.(*meta2Planner).getRateLimit = func(settings *cluster.Settings) time.Duration {
+		return d
+	}
+}
diff --git a/pkg/kv/kvprober/kvprober.go b/pkg/kv/kvprober/kvprober.go
index aa7680065399..c8a126d22939 100644
--- a/pkg/kv/kvprober/kvprober.go
+++ b/pkg/kv/kvprober/kvprober.go
@@ -136,19 +136,23 @@ func (p *Prober) Start(ctx context.Context, stopper *stop.Stopper) error {
 		ctx, cancel := stopper.WithCancelOnQuiesce(ctx)
 		defer cancel()
 
-		ticker := time.NewTicker(withJitter(readInterval.Get(&p.settings.SV), rand.Int63n))
-		defer ticker.Stop()
+		d := func() time.Duration {
+			return withJitter(readInterval.Get(&p.settings.SV), rand.Int63n)
+		}
+		t := timeutil.NewTimer()
+		t.Reset(d())
+		defer t.Stop()
 
 		for {
 			select {
-			case <-ticker.C:
+			case <-t.C:
+				t.Read = true
+				// Jitter added to de-synchronize different nodes' probe loops.
+				t.Reset(d())
 			case <-stopper.ShouldQuiesce():
 				return
 			}
 
-			// Jitter added to de-synchronize different nodes' probe loops.
-			ticker.Reset(withJitter(readInterval.Get(&p.settings.SV), rand.Int63n))
-
 			p.probe(ctx, p.db)
 		}
 	})
diff --git a/pkg/kv/kvprober/kvprober_integration_test.go b/pkg/kv/kvprober/kvprober_integration_test.go
index f74b2758726a..4341fd24216e 100644
--- a/pkg/kv/kvprober/kvprober_integration_test.go
+++ b/pkg/kv/kvprober/kvprober_integration_test.go
@@ -221,6 +221,9 @@ func initTestProber(
 
 	// Given small test cluster, this better exercises the planning logic.
 	kvprober.NumStepsToPlanAtOnce.Override(&s.ClusterSettings().SV, 10)
+	// Want these tests to run as fast as possible; see planner_test.go for a
+	// unit test of the rate limiting.
+	p.SetPlanningRateLimit(0)
 
 	return s, sqlDB, p, func() {
 		s.Stopper().Stop(context.Background())
diff --git a/pkg/kv/kvprober/planner.go b/pkg/kv/kvprober/planner.go
index effd2fdd89c5..3693ddb9a5c0 100644
--- a/pkg/kv/kvprober/planner.go
+++ b/pkg/kv/kvprober/planner.go
@@ -20,6 +20,7 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/roachpb"
 	"github.com/cockroachdb/cockroach/pkg/settings/cluster"
 	"github.com/cockroachdb/cockroach/pkg/util/contextutil"
+	"github.com/cockroachdb/cockroach/pkg/util/timeutil"
 	"github.com/cockroachdb/errors"
 )
 
@@ -53,8 +54,25 @@ type meta2Planner struct {
 	// cursor points to a key in meta2 at which scanning should resume when new
 	// plans are needed.
 	cursor roachpb.Key
-	// meta2Planner makes plans of size NumPrefetchedPlan as per below.
+	// meta2Planner makes plans of size numStepsToPlanAtOnce as per below.
 	plan []Step
+	// lastPlanTime records the last time the meta2Planner made a plan of size
+	// numStepsToPlanAtOnce; this is recorded in order to implement a rate limit.
+	//
+	// Note that crashes clear this field, so the rate limit is not enforced in
+	// case of a crash loop.
+	lastPlanTime time.Time
+
+	// Swappable for testing.
+	now          func() time.Time
+	getRateLimit func(settings *cluster.Settings) time.Duration
+	getNMeta2KVs func(
+		ctx context.Context,
+		db dbScan,
+		n int64,
+		cursor roachpb.Key,
+		timeout time.Duration) ([]kv.KeyValue, roachpb.Key, error)
+	meta2KVsToPlan func(kvs []kv.KeyValue) ([]Step, error)
 }
 
 func newMeta2Planner(db *kv.DB, settings *cluster.Settings) *meta2Planner {
@@ -62,6 +80,14 @@ func newMeta2Planner(db *kv.DB, settings *cluster.Settings) *meta2Planner {
 		db:       db,
 		settings: settings,
 		cursor:   keys.Meta2Prefix,
+		// At kvprober start time, this field is set to the unix epoch, implying
+		// that planning will be allowed on the first call to next no matter what.
+		// After that, the field will be set correctly.
+		lastPlanTime:   timeutil.Unix(0, 0),
+		now:            timeutil.Now,
+		getRateLimit:   getRateLimitImpl,
+		getNMeta2KVs:   getNMeta2KVsImpl,
+		meta2KVsToPlan: meta2KVsToPlanImpl,
 	}
 }
 
@@ -87,7 +113,7 @@ func newMeta2Planner(db *kv.DB, settings *cluster.Settings) *meta2Planner {
 // Note that though we scan meta2 here, we also randomize the order of
 // ranges in the plan. This is avoid all nodes probing the same ranges at
 // the same time. Jitter is also added to the sleep between probe time
-// to de-synchronize different nodes' probe loops.
+// to de-synchronize different nodes' probe loops.
 //
 // What about resource usage?
 //
@@ -104,22 +130,31 @@
 // kv.prober.planner.n_probes_at_a_time cluster setting.
 //
 // CPU:
-// - Again scales with the the kv.prober.planner.n_probes_at_a_time cluster
+// - Again scales with the kv.prober.planner.n_probes_at_a_time cluster
 //   setting. Note the proto unmarshalling. We also shuffle a slice of size
 //   kv.prober.planner.n_probes_at_a_time. If the setting is set to a high
 //   number, we pay a higher CPU cost less often; if it's set to a low number,
 //   we pay a smaller CPU cost more often.
 func (p *meta2Planner) next(ctx context.Context) (Step, error) {
 	if len(p.plan) == 0 {
+		// Protect CRDB from planning running too often, due to either issues
+		// with CRDB (meta2 unavailability) or bugs in kvprober.
+		timeSinceLastPlan := p.now().Sub(p.lastPlanTime) // i.e. timeutil.Since, but p.now is swappable for testing
+		if limit := p.getRateLimit(p.settings); timeSinceLastPlan < limit {
+			return Step{}, errors.Newf("planner rate limit hit: "+
+				"timeSinceLastPlan=%v, limit=%v", timeSinceLastPlan, limit)
+		}
+		p.lastPlanTime = p.now()
+
 		timeout := scanMeta2Timeout.Get(&p.settings.SV)
-		kvs, cursor, err := getNMeta2KVs(
+		kvs, cursor, err := p.getNMeta2KVs(
 			ctx, p.db, numStepsToPlanAtOnce.Get(&p.settings.SV), p.cursor, timeout)
 		if err != nil {
 			return Step{}, errors.Wrapf(err, "failed to get meta2 rows")
 		}
 		p.cursor = cursor
 
-		plan, err := meta2KVsToPlan(kvs)
+		plan, err := p.meta2KVsToPlan(kvs)
 		if err != nil {
 			return Step{}, errors.Wrapf(err, "failed to make plan from meta2 rows")
 		}
@@ -138,11 +173,28 @@ func (p *meta2Planner) next(ctx context.Context) (Step, error) {
 	return step, nil
 }
 
+// Consider the following configuration:
+//
+// 1. Read probes are sent every 1s.
+// 2. Planning is done 60 steps (ranges) at a time.
+//
+// In the happy path, planning is done once a minute.
+//
+// The rate limit calculation below implies that planning can be done at most
+// once every 30 seconds (since 60s / 2 -> 30s).
+func getRateLimitImpl(settings *cluster.Settings) time.Duration {
+	sv := &settings.SV
+	const happyPathIntervalToRateLimitIntervalRatio = 2
+	return time.Duration(
+		readInterval.Get(sv).Nanoseconds()*
+			numStepsToPlanAtOnce.Get(sv)/happyPathIntervalToRateLimitIntervalRatio) * time.Nanosecond
+}
+
 type dbScan interface {
 	Scan(ctx context.Context, begin, end interface{}, maxRows int64) ([]kv.KeyValue, error)
 }
 
-func getNMeta2KVs(
+func getNMeta2KVsImpl(
 	ctx context.Context, db dbScan, n int64, cursor roachpb.Key, timeout time.Duration,
 ) ([]kv.KeyValue, roachpb.Key, error) {
 	var kvs []kv.KeyValue
@@ -175,7 +227,7 @@
 	return kvs, cursor, nil
 }
 
-func meta2KVsToPlan(kvs []kv.KeyValue) ([]Step, error) {
+func meta2KVsToPlanImpl(kvs []kv.KeyValue) ([]Step, error) {
 	plans := make([]Step, len(kvs))
 
 	var rangeDesc roachpb.RangeDescriptor
diff --git a/pkg/kv/kvprober/planner_test.go b/pkg/kv/kvprober/planner_test.go
new file mode 100644
index 000000000000..40384efa3804
--- /dev/null
+++ b/pkg/kv/kvprober/planner_test.go
@@ -0,0 +1,125 @@
+// Copyright 2021 The Cockroach Authors.
+//
+// Use of this software is governed by the Business Source License
+// included in the file licenses/BSL.txt.
+//
+// As of the Change Date specified in that file, in accordance with
+// the Business Source License, use of this software will be governed
+// by the Apache License, Version 2.0, included in the file
+// licenses/APL.txt.
+
+package kvprober
+
+import (
+	"context"
+	"testing"
+	"time"
+
+	"github.com/cockroachdb/cockroach/pkg/kv"
+	"github.com/cockroachdb/cockroach/pkg/roachpb"
+	"github.com/cockroachdb/cockroach/pkg/settings/cluster"
+	"github.com/cockroachdb/cockroach/pkg/util/timeutil"
+	"github.com/cockroachdb/errors"
+	"github.com/stretchr/testify/require"
+)
+
+func TestPlannerEnforcesRateLimit(t *testing.T) {
+	p := newMeta2Planner(nil, cluster.MakeTestingClusterSettings())
+	p.getRateLimit = func(settings *cluster.Settings) time.Duration {
+		return 1 * time.Second
+	}
+
+	now := timeutil.Now()
+	fakeNow := func() time.Time {
+		return now
+	}
+	p.now = fakeNow
+
+	p.getNMeta2KVs = func(context.Context, dbScan, int64, roachpb.Key, time.Duration) (values []kv.KeyValue, keys roachpb.Key, e error) {
+		return nil, nil, nil
+	}
+	p.meta2KVsToPlan = func([]kv.KeyValue) (steps []Step, e error) {
+		return []Step{{
+			RangeID: 3,
+		}}, nil
+	}
+
+	// Rate limit not hit since first call to next.
+	ctx := context.Background()
+	_, err := p.next(ctx)
+	require.NoError(t, err)
+
+	// Rate limit hit since time not moved forward.
+	_, err = p.next(ctx)
+	require.Error(t, err)
+	require.Regexp(t, "planner rate limit hit", err.Error())
+
+	// Rate limit not hit since time moved forward enough.
+	now = now.Add(2 * time.Second)
+	_, err = p.next(ctx)
+	require.NoError(t, err)
+
+	// Rate limit hit since time not moved forward enough.
+	now = now.Add(600 * time.Millisecond)
+	_, err = p.next(ctx)
+	require.Error(t, err)
+	require.Regexp(t, "planner rate limit hit", err.Error())
+
+	// Rate limit not hit since time moved forward enough. 600ms + 600ms
+	// is enough wait time to avoid hitting the rate limit.
+	now = now.Add(600 * time.Millisecond)
+	_, err = p.next(ctx)
+	require.NoError(t, err)
+
+	// Rate limit hit since time not moved forward enough.
+	now = now.Add(400 * time.Millisecond)
+	_, err = p.next(ctx)
+	require.Error(t, err)
+	require.Regexp(t, "planner rate limit hit", err.Error())
+
+	// Rate limit hit since time not moved forward enough. 400ms + 400ms
+	// is not enough wait time to avoid hitting the rate limit.
+	now = now.Add(400 * time.Millisecond)
+	_, err = p.next(ctx)
+	require.Error(t, err)
+	require.Regexp(t, "planner rate limit hit", err.Error())
+
+	// Rate limit not hit since time moved forward enough. 400ms + 400ms +
+	// 400ms is enough wait time to avoid hitting the rate limit.
+	now = now.Add(400 * time.Millisecond)
+	_, err = p.next(ctx)
+	require.NoError(t, err)
+
+	// Whether planning succeeds or fails shouldn't affect the rate limiting!
+	p.meta2KVsToPlan = func([]kv.KeyValue) (steps []Step, e error) {
+		return nil, errors.New("boom")
+	}
+
+	// Rate limit hit since time not moved forward enough.
+	_, err = p.next(ctx)
+	require.Error(t, err)
+	require.Regexp(t, "planner rate limit hit", err.Error())
+
+	// Rate limit hit since time not moved forward enough.
+	now = now.Add(600 * time.Millisecond)
+	_, err = p.next(ctx)
+	require.Error(t, err)
+	require.Regexp(t, "planner rate limit hit", err.Error())
+
+	// Rate limit not hit since time moved forward enough (600ms + 600ms), but
+	// planning itself now fails, so next still returns an error.
+	now = now.Add(600 * time.Millisecond)
+	_, err = p.next(ctx)
+	require.Error(t, err)
+	require.Regexp(t, "boom", err.Error()) // plan failure instead of rate limit!
+}
+
+func TestGetRateLimit(t *testing.T) {
+	s := cluster.MakeTestingClusterSettings()
+
+	readInterval.Override(&s.SV, time.Second)
+	numStepsToPlanAtOnce.Override(&s.SV, 60)
+
+	got := getRateLimitImpl(s)
+	require.Equal(t, 30*time.Second, got)
+}
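
For illustration only (not part of the patch): a minimal, standalone sketch of the jittered probe-loop pattern that Prober.Start uses above, written against the Go standard library. Here, jittered, probeLoop, and main are hypothetical stand-ins for kvprober's withJitter helper, the timeutil.Timer-based loop, and the prober's startup; the point is simply that the timer is re-armed with a freshly jittered (and freshly read) interval on every iteration, mirroring the t.Reset(d()) call in the diff.

package main

import (
	"context"
	"fmt"
	"math/rand"
	"time"
)

// jittered is a stand-in for kvprober's withJitter: it pads the base interval
// with up to ~25% of random extra time so that different nodes' loops drift
// apart rather than probing in lockstep.
func jittered(base time.Duration) time.Duration {
	return base + time.Duration(rand.Int63n(int64(base)/4+1))
}

// probeLoop approximates the Prober.Start loop with the standard library's
// time.Timer in place of CockroachDB's timeutil.Timer: wait for the timer,
// re-arm it with a newly jittered interval, then run one probe.
func probeLoop(ctx context.Context, interval func() time.Duration, probe func(context.Context)) {
	t := time.NewTimer(jittered(interval()))
	defer t.Stop()
	for {
		select {
		case <-t.C:
			t.Reset(jittered(interval()))
		case <-ctx.Done():
			return
		}
		probe(ctx)
	}
}

func main() {
	// Run a few probes at roughly 500ms intervals, then stop.
	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
	defer cancel()
	probeLoop(ctx,
		func() time.Duration { return 500 * time.Millisecond },
		func(context.Context) { fmt.Println("probe") })
}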