Merge #129640
129640: kvserver: deflake, fix, speed up TestTenantRateLimiter r=tbg a=tbg

This test would occasionally (rarely) hang for 120s and then fail with
an opaque AmbiguousResultError when requests that the test doesn't
expect to block ended up blocking anyway.

I'm fairly sure this test can sometimes deadlock because enough extra
writes sneak in to cause the test's requests to block.

Seeing how this is a tarpit test (once you start looking, your work day
is consumed), I addressed the following problems:

- it claimed to verify metrics, but the regexp had rotted
  and so it wasn't verifying anything.
- it ran into throttling in the client-side tenant rate limiter,
  so the test took 10s-15s. Now it runs in ~1s (see the sketch
  after this list).
- it also did >3k requests, so it was quite expensive. Now
  it does a couple hundred bigger ones, which should be a
  smidge cheaper.
- it was flaky due to trying to make very strong claims about
  when requests would block, despite dozens of background requests
  that are hard to avoid sneaking in on every run.
- it would be difficult to debug when hanging due to unexpected
  blocking.
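
The client-side throttling is sidestepped with a single call. As a minimal
sketch (not part of this commit; `exemptCtx` is a hypothetical helper used
purely for illustration, while `multitenant.WithTenantCostControlExemption`
is the call the diff below adds to the test):

```go
package kvserver_test

import (
	"context"

	"github.com/cockroachdb/cockroach/pkg/multitenant"
)

// exemptCtx returns a context that the tenant-side cost controller treats as
// exempt. Requests issued under it skip client-side throttling and are only
// subject to the server-side (per-store) tenant rate limiter, which is what
// this test is actually exercising. exemptCtx is a hypothetical wrapper; the
// test calls WithTenantCostControlExemption on its context directly.
func exemptCtx() context.Context {
	return multitenant.WithTenantCostControlExemption(context.Background())
}
```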

Now it's a somewhat weaker but still meaningful test for which we will
be paying a much smaller tax going forward. Should it block again, the
test will fail gracefully; it will still be difficult to root-cause, but
it is also now wholly unexpected that this should ever occur.
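
For the curious, here is a minimal sketch of how the graceful failure works.
It reuses names from the diff (`timeutil.NewManualTime`,
`quotapool.WithTimeSource`, `Timers`, `Advance`) but elides the server wiring,
so treat it as an outline of the technique rather than the test itself:

```go
package kvserver_test

import (
	"testing"
	"time"

	"github.com/cockroachdb/cockroach/pkg/util/quotapool"
	"github.com/cockroachdb/cockroach/pkg/util/timeutil"
)

func sketchBlockingDetection(t *testing.T) {
	// A manual clock handed only to the tenant rate limiter's quota pool
	// (via StoreTestingKnobs in the real test, elided here).
	t0 := time.Date(2000, time.January, 1, 0, 0, 0, 0, time.UTC)
	timeSource := timeutil.NewManualTime(t0)
	opts := []quotapool.Option{quotapool.WithTimeSource(timeSource)}
	_ = opts // plumbed into tenantrate.TestingKnobs.QuotaPoolOptions in the test

	// ... writes are issued on another goroutine ...

	// Because only the rate limiter uses this clock, any timer registered on
	// it means a request is blocked waiting for quota.
	if len(timeSource.Timers()) > 0 {
		t.Log("a write is blocked in the tenant rate limiter")
		// Advancing the manual clock lets the rate limiter refill and
		// releases the blocked request.
		timeSource.Advance(time.Second)
	}
}
```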

See: #129612 (comment)

Fixes #129612.

Release note: None
Epic: None

Co-authored-by: Tobias Grieger <[email protected]>
craig[bot] and tbg committed Sep 6, 2024
2 parents 50c5280 + ba875d5 commit 9b76aad
Showing 6 changed files with 132 additions and 49 deletions.
1 change: 1 addition & 0 deletions pkg/kv/kvserver/BUILD.bazel
@@ -448,6 +448,7 @@ go_test(
"//pkg/kv/kvserver/tscache",
"//pkg/kv/kvserver/txnwait",
"//pkg/kv/kvserver/uncertainty",
"//pkg/multitenant",
"//pkg/multitenant/tenantcapabilities/tenantcapabilitiesauthorizer",
"//pkg/raft",
"//pkg/raft/confchange",
165 changes: 124 additions & 41 deletions pkg/kv/kvserver/client_tenant_test.go
@@ -20,6 +20,7 @@ import (
"regexp"
"strconv"
"strings"
"sync/atomic"
"testing"
"time"

@@ -31,6 +32,7 @@
"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/tenantrate"
"github.com/cockroachdb/cockroach/pkg/multitenant"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/spanconfig"
"github.com/cockroachdb/cockroach/pkg/storage/enginepb"
@@ -42,6 +44,7 @@
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/metric"
"github.com/cockroachdb/cockroach/pkg/util/quotapool"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/cockroach/pkg/util/uuid"
"github.com/cockroachdb/errors"
@@ -168,12 +171,29 @@ func TestTenantRateLimiter(t *testing.T) {
t0 := time.Date(2000, time.January, 1, 0, 0, 0, 0, time.UTC)
timeSource := timeutil.NewManualTime(t0)

// This test shouldn't take forever. If we're going to fail, better to
// do it in minutes than in an hour.
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Minute)
defer cancel()
// We're testing the server-side tenant rate limiter, but there is also a tenant-side one.
// That one actually throttles too in this test (making it take 10+s) unless we work around that.
ctx = multitenant.WithTenantCostControlExemption(ctx)

var numAcquired atomic.Int32
acqFunc := func(
ctx context.Context, poolName string, r quotapool.Request, start time.Time,
) {
numAcquired.Add(1)
}
s, sqlDB, db := serverutils.StartServer(t, base.TestServerArgs{
DefaultTestTenant: base.TestIsSpecificToStorageLayerAndNeedsASystemTenant,
Knobs: base.TestingKnobs{
Store: &kvserver.StoreTestingKnobs{
TenantRateKnobs: tenantrate.TestingKnobs{
TimeSource: timeSource,
QuotaPoolOptions: []quotapool.Option{
quotapool.WithTimeSource(timeSource),
quotapool.OnAcquisition(acqFunc),
},
},
},
KeyVisualizer: &keyvisualizer.TestingKnobs{SkipJobBootstrap: true},
@@ -186,7 +206,6 @@
},
},
})
ctx := context.Background()
tenantID := serverutils.TestTenantID()
ts, err := s.TenantController().StartTenant(ctx, base.TestTenantArgs{
TenantID: tenantID,
@@ -230,54 +249,120 @@
cfg := tenantrate.ConfigFromSettings(&s.ClusterSettings().SV)

// We don't know the exact size of the write, but we can set lower and upper
// bounds.
writeCostLower := cfg.WriteBatchUnits + cfg.WriteRequestUnits
writeCostUpper := cfg.WriteBatchUnits + cfg.WriteRequestUnits + float64(32)*cfg.WriteUnitsPerByte
tolerance := 50.0 // Leave space for a couple of other background requests.
// burstWrites is a number of writes that don't exceed the burst limit.
burstWrites := int((cfg.Burst - tolerance) / writeCostUpper)
// tooManyWrites is a number of writes which definitely exceed the burst
// limit.
tooManyWrites := int(cfg.Burst/writeCostLower) + 2

// This test shouldn't take forever. If we're going to fail, better to
// do it in minutes than in an hour.
ctx, cancel := context.WithTimeout(ctx, 2*time.Minute)
defer cancel()
// bounds for a "small" (up to 32 bytes) request.

// If we put at least that many bytes into a single put, it should consume at
// least one percent of the configured burst, meaning if we send 100 such requests
// we should expect blocking. (We may block earlier due to background writes
// sneaking in).
//
// The below is based on the equation
//
// cost = WriteBatchUnits + WriteRequestUnits + bytes * WriteUnitsPerByte.
//
// and solving for the case in which cost is a percent of Burst.
numBytesForAtLeastOnePercentOfBurst := int((cfg.Burst/100 - cfg.WriteBatchUnits - cfg.WriteRequestUnits) / cfg.WriteUnitsPerByte)
require.NotZero(t, numBytesForAtLeastOnePercentOfBurst)
t.Logf("bytes for one percent of burst: %d", numBytesForAtLeastOnePercentOfBurst)
atLeastOnePercentValue := strings.Repeat("x", numBytesForAtLeastOnePercentOfBurst)

// Spawn a helper that will detect if the rate limiter is blocking us.
// This prevents cases where the test would stall and fail opaquely instead
// of making it clear that writes on the main goroutine blocked unexpectedly.
watcher := func(ctx context.Context, t *testing.T, msg string) (_ context.Context, cancel func()) {
t.Helper()
t.Logf("testing: %v", msg)
ctx, cancel = context.WithCancel(ctx)
go func() {
for {
select {
case <-ctx.Done():
return
case <-time.After(time.Second):
t.Logf("total acquisitions: %d", numAcquired.Load())
}
if !assert.Len(t, timeSource.Timers(), 0, msg) {
cancel()
}
}
}()
return ctx, cancel
}

// Make sure that writes to the system tenant don't block, even if we
// definitely exceed the burst rate.
for i := 0; i < tooManyWrites; i++ {
require.NoError(t, db.Put(ctx, mkKey(), 0))
{
ctx, cancel := watcher(ctx, t, "system tenant should not be rate limited")
defer cancel()

for i := 0; i < 100; i++ {
require.NoError(t, db.Put(ctx, mkKey(), atLeastOnePercentValue))
}
cancel()
}
timeSource.Advance(time.Second)
// Now ensure that in the same instant the write QPS limit does affect the
// tenant. First issue requests that can happen without blocking.
for i := 0; i < burstWrites; i++ {
require.NoError(t, ts.DB().Put(ctx, mkKey(), 0))
// tenant. First issue a handful of small requests, which should not block
// as we can send ~100 larger requests before running out of burst budget.
//
// In the past, this test was trying to get very close to cfg.Burst without
// blocking, but that is quite brittle because there are dozens of background
// writes that sneak in during an average test run. So we are now intentionally
// staying very far from running out of burst.
{
ctx, cancel := watcher(ctx, t, "first writes should not experience blocking")
defer cancel()

for i := 0; i < 5; i++ {
require.NoError(t, ts.DB().Put(ctx, mkKey(), 0))
}
cancel()
}
// Attempt to issue another request, make sure that it gets blocked by
// observing a timer.
// Now intentionally break through the burst and make sure this blocks.
// We observe blocking by noticing that a timer was created via the custom
// timeSource which we only handed to the tenant rate limiter.
t.Logf("testing: requests should eventually block")
errCh := make(chan error, 1)
// doneCh is closed once we've verified blocking and have unblocked.
// This prevents the additional writes from potentially running into
// blocking again.
doneCh := make(chan struct{})
go func() {
// Issue enough requests so that one has to block.
for i := burstWrites; i < tooManyWrites; i++ {
if err := ts.DB().Put(ctx, mkKey(), 0); err != nil {
ev := log.Every(100 * time.Millisecond)
for i := 0; i < 100; i++ {
if ev.ShouldLog() {
t.Logf("put %d", i+1)
}
if err := ts.DB().Put(ctx, mkKey(), atLeastOnePercentValue); err != nil {
errCh <- err
return
}
select {
default:
case <-doneCh:
errCh <- nil
return
}
}
errCh <- nil
t.Error("never blocked")
errCh <- errors.New("never blocked")
}()

testutils.SucceedsSoon(t, func() error {
timers := timeSource.Timers()
if len(timers) != 1 {
return errors.Errorf("seeing %d timers: %v", len(timers), timers)
if len(timeSource.Timers()) == 0 {
return errors.Errorf("not seeing any timers")
}
return nil
})
t.Log("blocking confirmed")

// Allow the blocked request to proceed.
close(doneCh) // close first so that goroutine terminates once unblocked
timeSource.Advance(time.Second)
require.NoError(t, <-errCh)

t.Log("checking metrics")
// Create some tooling to read and verify metrics off of the prometheus
// endpoint.
runner.Exec(t, `SET CLUSTER SETTING server.child_metrics.enabled = true`)
@@ -292,30 +377,28 @@ func TestTenantRateLimiter(t *testing.T) {
return string(read)
}

// Allow the blocked request to proceed.
timeSource.Advance(time.Second)
require.NoError(t, <-errCh)

// Ensure that the metric for the admitted requests reflects the number of
// admitted requests.
// TODO(radu): this is fragile because a background write could sneak in and
// the count wouldn't match exactly.
// admitted requests. We run only shallow checks here due to background writes.
m := getMetrics()
lines := strings.Split(m, "\n")
tenantMetricStr := fmt.Sprintf(`kv_tenant_rate_limit_write_requests_admitted{store="1",tenant_id="%d"}`, tenantID.ToUint64())
tenantMetricStr := fmt.Sprintf(`kv_tenant_rate_limit_write_requests_admitted{store="1",node_id="1",tenant_id="%d"}`, tenantID.ToUint64())
re := regexp.MustCompile(tenantMetricStr + ` (\d*)`)
var matched bool
for _, line := range lines {
match := re.FindStringSubmatch(line)
if match != nil {
matched = true
admittedMetricVal, err := strconv.Atoi(match[1])
require.NoError(t, err)
require.GreaterOrEqual(t, admittedMetricVal, tooManyWrites)
// Allow a tolerance for other requests performed while starting the
// tenant server.
require.Less(t, admittedMetricVal, tooManyWrites+400)
// The background chatter alone tends to put north of 100 writes in.
// But let's be conservative and not rely on that. Our blocking writes should
// get at least 50 in (we know 100 result in blocking). We can't and shouldn't
// add tighter checks here because they're a nightmare to debug should they fail.
require.GreaterOrEqual(t, admittedMetricVal, 50)
break
}
}
require.True(t, matched, "did not match %s:\n\n%s", tenantMetricStr, m)
}

// Test that KV requests made by a tenant get a context annotated with the tenant ID.
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/replica_test.go
@@ -14039,7 +14039,7 @@ func TestReplicaRateLimit(t *testing.T) {
cfg.TestingKnobs.DisableMergeWaitForReplicasInit = true
// Use time travel to control the rate limiter in this test. Set authorizer to
// engage the rate limiter, overriding the default allow-all policy in tests.
cfg.TestingKnobs.TenantRateKnobs.TimeSource = tc.manualClock
cfg.TestingKnobs.TenantRateKnobs.QuotaPoolOptions = []quotapool.Option{quotapool.WithTimeSource(tc.manualClock)}
cfg.TestingKnobs.TenantRateKnobs.Authorizer = tenantcapabilitiesauthorizer.New(cfg.Settings, nil)
tc.StartWithStoreConfig(ctx, t, stopper, cfg)

1 change: 1 addition & 0 deletions pkg/kv/kvserver/tenantrate/BUILD.bazel
@@ -47,6 +47,7 @@ go_test(
"//pkg/testutils/metrictestutils",
"//pkg/util/leaktest",
"//pkg/util/metric",
"//pkg/util/quotapool",
"//pkg/util/stop",
"//pkg/util/timeutil",
"@com_github_cockroachdb_datadriven//:datadriven",
7 changes: 2 additions & 5 deletions pkg/kv/kvserver/tenantrate/factory.go
@@ -19,13 +19,12 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/quotapool"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/errors"
)

// TestingKnobs configures a LimiterFactory for testing.
type TestingKnobs struct {
TimeSource timeutil.TimeSource
QuotaPoolOptions []quotapool.Option

// Authorizer, if set, replaces the authorizer in the RPCContext.
Authorizer tenantcapabilities.Authorizer
@@ -95,9 +94,7 @@ func (rl *LimiterFactory) GetTenant(
rcLim, ok := rl.mu.tenants[tenantID]
if !ok {
var options []quotapool.Option
if rl.knobs.TimeSource != nil {
options = append(options, quotapool.WithTimeSource(rl.knobs.TimeSource))
}
options = append(options, rl.knobs.QuotaPoolOptions...)
if closer != nil {
options = append(options, quotapool.WithCloser(closer))
}
5 changes: 3 additions & 2 deletions pkg/kv/kvserver/tenantrate/limiter_test.go
@@ -33,6 +33,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/testutils/metrictestutils"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/metric"
"github.com/cockroachdb/cockroach/pkg/util/quotapool"
"github.com/cockroachdb/cockroach/pkg/util/stop"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/datadriven"
@@ -52,7 +53,7 @@ func TestCloser(t *testing.T) {
start := timeutil.Now()
timeSource := timeutil.NewManualTime(start)
factory := tenantrate.NewLimiterFactory(&st.SV, &tenantrate.TestingKnobs{
TimeSource: timeSource,
QuotaPoolOptions: []quotapool.Option{quotapool.WithTimeSource(timeSource)},
}, fakeAuthorizer{})
tenant := roachpb.MustMakeTenantID(2)
closer := make(chan struct{})
@@ -220,7 +221,7 @@ func (ts *testState) init(t *testing.T, d *datadriven.TestData) string {

parseSettings(t, d, &ts.config, ts.capabilities)
ts.rl = tenantrate.NewLimiterFactory(&ts.settings.SV, &tenantrate.TestingKnobs{
TimeSource: ts.clock,
QuotaPoolOptions: []quotapool.Option{quotapool.WithTimeSource(ts.clock)},
}, ts)
ts.rl.UpdateConfig(ts.config)
ts.m = metric.NewRegistry()
