diff --git a/pkg/kv/kvserver/rangefeed/budget.go b/pkg/kv/kvserver/rangefeed/budget.go index 088ffea5283e..5f4b9d915516 100644 --- a/pkg/kv/kvserver/rangefeed/budget.go +++ b/pkg/kv/kvserver/rangefeed/budget.go @@ -121,8 +121,13 @@ type FeedBudget struct { limit int64 // Channel to notify that memory was returned to the budget. replenishC chan interface{} - // Budget cancellation request + // Budget cancellation request. stopC chan interface{} + // Cluster settings to be able to check RangefeedBudgetsEnabled on the fly. + // When budgets are disabled, all new messages should pass through, all + // previous allocations should still be returned as memory pool is shared with + // SQL. + settings *settings.Values closed sync.Once } @@ -130,7 +135,7 @@ type FeedBudget struct { // NewFeedBudget creates a FeedBudget to be used with RangeFeed. If nil account // is passed, function will return nil which is safe to use with RangeFeed as // it effectively disables memory accounting for that feed. -func NewFeedBudget(budget *mon.BoundAccount, limit int64) *FeedBudget { +func NewFeedBudget(budget *mon.BoundAccount, limit int64, settings *settings.Values) *FeedBudget { if budget == nil { return nil } @@ -142,6 +147,7 @@ func NewFeedBudget(budget *mon.BoundAccount, limit int64) *FeedBudget { replenishC: make(chan interface{}, 1), stopC: make(chan interface{}), limit: limit, + settings: settings, } f.mu.memBudget = budget return f @@ -151,6 +157,9 @@ func NewFeedBudget(budget *mon.BoundAccount, limit int64) *FeedBudget { // returns error immediately. // Returned allocation has its use counter set to 1. func (f *FeedBudget) TryGet(ctx context.Context, amount int64) (*SharedBudgetAllocation, error) { + if !RangefeedBudgetsEnabled.Get(f.settings) { + return nil, nil + } f.mu.Lock() defer f.mu.Unlock() if f.mu.closed { @@ -262,6 +271,8 @@ type BudgetFactory struct { feedBytesMon *mon.BytesMonitor systemFeedBytesMon *mon.BytesMonitor + settings *settings.Values + metrics *FeedBudgetPoolMetrics } @@ -274,6 +285,7 @@ type BudgetFactoryConfig struct { adjustLimit func(int64) int64 totalRangeReedBudget int64 histogramWindowInterval time.Duration + settings *settings.Values } func (b BudgetFactoryConfig) empty() bool { @@ -288,6 +300,7 @@ func CreateBudgetFactoryConfig( memoryPoolSize int64, histogramWindowInterval time.Duration, adjustLimit func(int64) int64, + settings *settings.Values, ) BudgetFactoryConfig { if rootMon == nil || !useBudgets { return BudgetFactoryConfig{} @@ -300,6 +313,7 @@ func CreateBudgetFactoryConfig( adjustLimit: adjustLimit, totalRangeReedBudget: totalRangeReedBudget, histogramWindowInterval: histogramWindowInterval, + settings: settings, } } @@ -328,6 +342,7 @@ func NewBudgetFactory(ctx context.Context, config BudgetFactoryConfig) *BudgetFa adjustLimit: config.adjustLimit, feedBytesMon: rangeFeedPoolMonitor, systemFeedBytesMon: systemRangeMonitor, + settings: config.settings, metrics: metrics, } } @@ -356,10 +371,10 @@ func (f *BudgetFactory) CreateBudget(key roachpb.RKey) *FeedBudget { // We use any table with reserved ID in system tenant as system case. if key.Less(roachpb.RKey(keys.SystemSQLCodec.TablePrefix(keys.MaxReservedDescID + 1))) { acc := f.systemFeedBytesMon.MakeBoundAccount() - return NewFeedBudget(&acc, 0) + return NewFeedBudget(&acc, 0, f.settings) } acc := f.feedBytesMon.MakeBoundAccount() - return NewFeedBudget(&acc, rangeLimit) + return NewFeedBudget(&acc, rangeLimit, f.settings) } // Metrics exposes Metrics for BudgetFactory so that they could be registered diff --git a/pkg/kv/kvserver/rangefeed/budget_test.go b/pkg/kv/kvserver/rangefeed/budget_test.go index 482add0d5df3..ffdb650a2f83 100644 --- a/pkg/kv/kvserver/rangefeed/budget_test.go +++ b/pkg/kv/kvserver/rangefeed/budget_test.go @@ -31,7 +31,8 @@ func TestFeedBudget(t *testing.T) { m.Start(context.Background(), nil, mon.MakeStandaloneBudget(poolSize)) b := m.MakeBoundAccount() - f := NewFeedBudget(&b, budgetSize) + s := cluster.MakeTestingClusterSettings() + f := NewFeedBudget(&b, budgetSize, &s.SV) return f, m, &b } ctx := context.Background() @@ -191,7 +192,7 @@ func TestBudgetFactory(t *testing.T) { rootMon := mon.NewMonitor("rangefeed", mon.MemoryResource, nil, nil, 1, math.MaxInt64, s) rootMon.Start(context.Background(), nil, mon.MakeStandaloneBudget(10000000)) bf := NewBudgetFactory(context.Background(), - CreateBudgetFactoryConfig(rootMon, 10000, time.Second*5, budgetLowThresholdFn(10000))) + CreateBudgetFactoryConfig(rootMon, 10000, time.Second*5, budgetLowThresholdFn(10000), &s.SV)) // Verify system ranges use own budget. bSys := bf.CreateBudget(keys.MustAddr(keys.Meta1Prefix)) @@ -217,18 +218,58 @@ func TestDisableBudget(t *testing.T) { bf := NewBudgetFactory(context.Background(), CreateBudgetFactoryConfig(rootMon, 10000, time.Second*5, func(_ int64) int64 { return 0 - })) + }, &s.SV)) bUsr := bf.CreateBudget(keys.MustAddr(keys.SystemSQLCodec.TablePrefix(keys.MaxReservedDescID + 1))) require.Nil(t, bUsr, "Range budget when budgets are disabled.") } +func TestDisableBudgetOnTheFly(t *testing.T) { + s := cluster.MakeTestingClusterSettings() + + m := mon.NewMonitor("rangefeed", mon.MemoryResource, nil, nil, 1, math.MaxInt64, nil) + m.Start(context.Background(), nil, mon.MakeStandaloneBudget(100000)) + bf := NewBudgetFactory(context.Background(), + CreateBudgetFactoryConfig( + m, + 10000000, + time.Second*5, + func(l int64) int64 { + return l + }, + &s.SV)) + + f := bf.CreateBudget(keys.MustAddr(keys.SystemSQLCodec.TablePrefix(keys.MaxReservedDescID + 1))) + + objectSize := int64(1000) + alloc, err := f.TryGet(context.Background(), objectSize) + require.NoError(t, err) + require.NotNil(t, alloc, "can't get budget") + // Disable budget using settings and verify that budget will stop creating new + // allocations. + RangefeedBudgetsEnabled.Override(context.Background(), &s.SV, false) + alloc2, err := f.TryGet(context.Background(), 1000) + require.NoError(t, err) + require.Nil(t, alloc2, "budget was not disabled") + + // Release should not crash or cause any anomalies after budget is disabled. + alloc.Release(context.Background()) + // When budget is released it keeps as much as budget increment amount + // allocated for caching purposes which we can't release until the factory + // is destroyed, but we can check that it is less than object size (because + // allocation increment is low). + require.Less(t, bf.Metrics().SharedBytesCount.Value(), objectSize, + "budget was not released") +} + func TestConfigFactory(t *testing.T) { + s := cluster.MakeTestingClusterSettings() rootMon := mon.NewMonitor("rangefeed", mon.MemoryResource, nil, nil, 1, math.MaxInt64, nil) rootMon.Start(context.Background(), nil, mon.MakeStandaloneBudget(10000000)) // Check provisionalFeedLimit is computed. - config := CreateBudgetFactoryConfig(rootMon, 100000, time.Second*5, budgetLowThresholdFn(10000)) + config := CreateBudgetFactoryConfig(rootMon, 100000, time.Second*5, budgetLowThresholdFn(10000), + &s.SV) require.Less(t, config.provisionalFeedLimit, int64(100000), "provisional range limit should be lower than whole memory pool") require.NotZerof(t, config.provisionalFeedLimit, "provisional range feed limit must not be zero") @@ -236,11 +277,13 @@ func TestConfigFactory(t *testing.T) { // Check if global disable switch works. useBudgets = false defer func() { useBudgets = true }() - config = CreateBudgetFactoryConfig(rootMon, 100000, time.Second*5, budgetLowThresholdFn(10000)) + config = CreateBudgetFactoryConfig(rootMon, 100000, time.Second*5, budgetLowThresholdFn(10000), + &s.SV) require.True(t, config.empty(), "config not empty despite disabled factory") } func TestBudgetLimits(t *testing.T) { + s := cluster.MakeTestingClusterSettings() rootMon := mon.NewMonitor("rangefeed", mon.MemoryResource, nil, nil, 1, math.MaxInt64, nil) rootMon.Start(context.Background(), nil, mon.MakeStandaloneBudget(10000000)) @@ -256,6 +299,7 @@ func TestBudgetLimits(t *testing.T) { }, totalRangeReedBudget: 100000, histogramWindowInterval: time.Second * 5, + settings: &s.SV, }) userKey := roachpb.RKey(keys.ScratchRangeMin) @@ -272,6 +316,7 @@ func TestBudgetLimits(t *testing.T) { }, totalRangeReedBudget: 100000, histogramWindowInterval: time.Second * 5, + settings: &s.SV, }) b = bf.CreateBudget(userKey) require.Nil(t, b, "budget is disabled") diff --git a/pkg/kv/kvserver/rangefeed/processor_test.go b/pkg/kv/kvserver/rangefeed/processor_test.go index 22a914c6cff5..edf0720ab433 100644 --- a/pkg/kv/kvserver/rangefeed/processor_test.go +++ b/pkg/kv/kvserver/rangefeed/processor_test.go @@ -23,6 +23,7 @@ import ( "time" "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/storage" "github.com/cockroachdb/cockroach/pkg/storage/enginepb" "github.com/cockroachdb/cockroach/pkg/testutils" @@ -537,10 +538,7 @@ func TestProcessorSlowConsumer(t *testing.T) { func TestProcessorMemoryBudgetExceeded(t *testing.T) { defer leaktest.AfterTest(t)() - m := mon.NewMonitor("rangefeed", mon.MemoryResource, nil, nil, 1, math.MaxInt64, nil) - m.Start(context.Background(), nil, mon.MakeStandaloneBudget(40)) - b := m.MakeBoundAccount() - fb := NewFeedBudget(&b, 0) + fb := newTestBudget(40) stopper := stop.NewStopper() var pushTxnInterval, pushTxnAge time.Duration = 0, 0 // disable @@ -608,10 +606,7 @@ func TestProcessorMemoryBudgetExceeded(t *testing.T) { func TestProcessorMemoryBudgetReleased(t *testing.T) { defer leaktest.AfterTest(t)() - m := mon.NewMonitor("rangefeed", mon.MemoryResource, nil, nil, 1, math.MaxInt64, nil) - m.Start(context.Background(), nil, mon.MakeStandaloneBudget(40)) - b := m.MakeBoundAccount() - fb := NewFeedBudget(&b, 0) + fb := newTestBudget(40) stopper := stop.NewStopper() var pushTxnInterval, pushTxnAge time.Duration = 0, 0 // disable @@ -1086,10 +1081,12 @@ func TestBudgetReleaseOnProcessorStop(t *testing.T) { // as sync events used to flush queues. const channelCapacity = totalEvents/2 + 10 + s := cluster.MakeTestingClusterSettings() m := mon.NewMonitor("rangefeed", mon.MemoryResource, nil, nil, 1, math.MaxInt64, nil) m.Start(context.Background(), nil, mon.MakeStandaloneBudget(math.MaxInt64)) + //budgetEnabled := int32(1) b := m.MakeBoundAccount() - fb := NewFeedBudget(&b, 0) + fb := NewFeedBudget(&b, 0, &s.SV) stopper := stop.NewStopper() var pushTxnInterval, pushTxnAge time.Duration = 0, 0 // disable @@ -1177,10 +1174,7 @@ func TestBudgetReleaseOnLastStreamError(t *testing.T) { // objects. Ideally it would be nice to have const channelCapacity = totalEvents + 5 - m := mon.NewMonitor("rangefeed", mon.MemoryResource, nil, nil, 1, math.MaxInt64, nil) - m.Start(context.Background(), nil, mon.MakeStandaloneBudget(math.MaxInt64)) - b := m.MakeBoundAccount() - fb := NewFeedBudget(&b, 0) + fb := newTestBudget(math.MaxInt64) stopper := stop.NewStopper() var pushTxnInterval, pushTxnAge time.Duration = 0, 0 // disable @@ -1234,6 +1228,15 @@ func TestBudgetReleaseOnLastStreamError(t *testing.T) { requireBudgetDrainedSoon(t, p, rStream) } +func newTestBudget(limit int64) *FeedBudget { + s := cluster.MakeTestingClusterSettings() + m := mon.NewMonitor("rangefeed", mon.MemoryResource, nil, nil, 1, math.MaxInt64, nil) + m.Start(context.Background(), nil, mon.MakeStandaloneBudget(limit)) + b := m.MakeBoundAccount() + fb := NewFeedBudget(&b, 0, &s.SV) + return fb +} + // TestBudgetReleaseOnOneStreamError verifies that if one stream fails while // other keeps running, accounting correctly releases memory budget for shared // events. @@ -1248,10 +1251,7 @@ func TestBudgetReleaseOnOneStreamError(t *testing.T) { // as sync events used to flush queues. const channelCapacity = totalEvents/2 + 10 - m := mon.NewMonitor("rangefeed", mon.MemoryResource, nil, nil, 1, math.MaxInt64, nil) - m.Start(context.Background(), nil, mon.MakeStandaloneBudget(math.MaxInt64)) - b := m.MakeBoundAccount() - fb := NewFeedBudget(&b, 0) + fb := newTestBudget(math.MaxInt64) stopper := stop.NewStopper() var pushTxnInterval, pushTxnAge time.Duration = 0, 0 // disable @@ -1417,10 +1417,7 @@ func BenchmarkProcessorWithBudget(b *testing.B) { var budget *FeedBudget if false { - m := mon.NewMonitor("rangefeed", mon.MemoryResource, nil, nil, 1, math.MaxInt64, nil) - m.Start(context.Background(), nil, mon.MakeStandaloneBudget(math.MaxInt64)) - acc := m.MakeBoundAccount() - budget = NewFeedBudget(&acc, 0) + budget = newTestBudget(math.MaxInt64) } stopper := stop.NewStopper() diff --git a/pkg/server/server.go b/pkg/server/server.go index 387429c9a9c2..5591c7538e22 100644 --- a/pkg/server/server.go +++ b/pkg/server/server.go @@ -495,7 +495,8 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) { return raftCmdLimit } return limit - })) + }, + &st.SV)) if rangeReedBudgetFactory != nil { registry.AddMetricStruct(rangeReedBudgetFactory.Metrics()) }