Skip to content

Commit

Permalink
rangefeed: disable rangefeed memory budgets dynamically
Browse files Browse the repository at this point in the history
Previously, memory budgets could only be disabled for new
rangefeeds whose processor was not already running. Existing
rangefeeds would continue to limit used memory.
If budgets need to be urgently disabled on a big cluster,
you would have to restart all system rangefeeds, which means you
would likely end up restarting all nodes. This is disruptive and
should be avoided.
This PR adds an atomic flag that existing budgets can check;
when it is reset to false, they stop performing allocation checks.

Release note (ops change): Rangefeed memory budgets can now be
disabled on the fly by changing the cluster setting, without
the need to restart the feed.
  • Loading branch information
aliher1911 committed Apr 1, 2022
1 parent 3272fce commit 0085eeb
Show file tree
Hide file tree
Showing 4 changed files with 89 additions and 31 deletions.
23 changes: 19 additions & 4 deletions pkg/kv/kvserver/rangefeed/budget.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,16 +121,21 @@ type FeedBudget struct {
limit int64
// Channel to notify that memory was returned to the budget.
replenishC chan interface{}
// Budget cancellation request
// Budget cancellation request.
stopC chan interface{}
// Cluster settings to be able to check RangefeedBudgetsEnabled on the fly.
// When budgets are disabled, all new messages should pass through, all
// previous allocations should still be returned as memory pool is shared with
// SQL.
settings *settings.Values

closed sync.Once
}

// NewFeedBudget creates a FeedBudget to be used with RangeFeed. If nil account
// is passed, function will return nil which is safe to use with RangeFeed as
// it effectively disables memory accounting for that feed.
func NewFeedBudget(budget *mon.BoundAccount, limit int64) *FeedBudget {
func NewFeedBudget(budget *mon.BoundAccount, limit int64, settings *settings.Values) *FeedBudget {
if budget == nil {
return nil
}
Expand All @@ -142,6 +147,7 @@ func NewFeedBudget(budget *mon.BoundAccount, limit int64) *FeedBudget {
replenishC: make(chan interface{}, 1),
stopC: make(chan interface{}),
limit: limit,
settings: settings,
}
f.mu.memBudget = budget
return f
Expand All @@ -151,6 +157,9 @@ func NewFeedBudget(budget *mon.BoundAccount, limit int64) *FeedBudget {
// returns error immediately.
// Returned allocation has its use counter set to 1.
func (f *FeedBudget) TryGet(ctx context.Context, amount int64) (*SharedBudgetAllocation, error) {
if !RangefeedBudgetsEnabled.Get(f.settings) {
return nil, nil
}
f.mu.Lock()
defer f.mu.Unlock()
if f.mu.closed {
Expand Down Expand Up @@ -262,6 +271,8 @@ type BudgetFactory struct {
feedBytesMon *mon.BytesMonitor
systemFeedBytesMon *mon.BytesMonitor

settings *settings.Values

metrics *FeedBudgetPoolMetrics
}

Expand All @@ -274,6 +285,7 @@ type BudgetFactoryConfig struct {
adjustLimit func(int64) int64
totalRangeReedBudget int64
histogramWindowInterval time.Duration
settings *settings.Values
}

func (b BudgetFactoryConfig) empty() bool {
Expand All @@ -288,6 +300,7 @@ func CreateBudgetFactoryConfig(
memoryPoolSize int64,
histogramWindowInterval time.Duration,
adjustLimit func(int64) int64,
settings *settings.Values,
) BudgetFactoryConfig {
if rootMon == nil || !useBudgets {
return BudgetFactoryConfig{}
Expand All @@ -300,6 +313,7 @@ func CreateBudgetFactoryConfig(
adjustLimit: adjustLimit,
totalRangeReedBudget: totalRangeReedBudget,
histogramWindowInterval: histogramWindowInterval,
settings: settings,
}
}

Expand Down Expand Up @@ -328,6 +342,7 @@ func NewBudgetFactory(ctx context.Context, config BudgetFactoryConfig) *BudgetFa
adjustLimit: config.adjustLimit,
feedBytesMon: rangeFeedPoolMonitor,
systemFeedBytesMon: systemRangeMonitor,
settings: config.settings,
metrics: metrics,
}
}
Expand Down Expand Up @@ -356,10 +371,10 @@ func (f *BudgetFactory) CreateBudget(key roachpb.RKey) *FeedBudget {
// We use any table with reserved ID in system tenant as system case.
if key.Less(roachpb.RKey(keys.SystemSQLCodec.TablePrefix(keys.MaxReservedDescID + 1))) {
acc := f.systemFeedBytesMon.MakeBoundAccount()
return NewFeedBudget(&acc, 0)
return NewFeedBudget(&acc, 0, f.settings)
}
acc := f.feedBytesMon.MakeBoundAccount()
return NewFeedBudget(&acc, rangeLimit)
return NewFeedBudget(&acc, rangeLimit, f.settings)
}

// Metrics exposes Metrics for BudgetFactory so that they could be registered
Expand Down
55 changes: 50 additions & 5 deletions pkg/kv/kvserver/rangefeed/budget_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,8 @@ func TestFeedBudget(t *testing.T) {
m.Start(context.Background(), nil, mon.MakeStandaloneBudget(poolSize))
b := m.MakeBoundAccount()

f := NewFeedBudget(&b, budgetSize)
s := cluster.MakeTestingClusterSettings()
f := NewFeedBudget(&b, budgetSize, &s.SV)
return f, m, &b
}
ctx := context.Background()
Expand Down Expand Up @@ -191,7 +192,7 @@ func TestBudgetFactory(t *testing.T) {
rootMon := mon.NewMonitor("rangefeed", mon.MemoryResource, nil, nil, 1, math.MaxInt64, s)
rootMon.Start(context.Background(), nil, mon.MakeStandaloneBudget(10000000))
bf := NewBudgetFactory(context.Background(),
CreateBudgetFactoryConfig(rootMon, 10000, time.Second*5, budgetLowThresholdFn(10000)))
CreateBudgetFactoryConfig(rootMon, 10000, time.Second*5, budgetLowThresholdFn(10000), &s.SV))

// Verify system ranges use own budget.
bSys := bf.CreateBudget(keys.MustAddr(keys.Meta1Prefix))
Expand All @@ -217,30 +218,72 @@ func TestDisableBudget(t *testing.T) {
bf := NewBudgetFactory(context.Background(),
CreateBudgetFactoryConfig(rootMon, 10000, time.Second*5, func(_ int64) int64 {
return 0
}))
}, &s.SV))

bUsr := bf.CreateBudget(keys.MustAddr(keys.SystemSQLCodec.TablePrefix(keys.MaxReservedDescID + 1)))
require.Nil(t, bUsr, "Range budget when budgets are disabled.")
}

// TestDisableBudgetOnTheFly checks that overriding RangefeedBudgetsEnabled to
// false makes an already created budget stop handing out new allocations,
// while an allocation obtained earlier can still be released.
func TestDisableBudgetOnTheFly(t *testing.T) {
	ctx := context.Background()
	st := cluster.MakeTestingClusterSettings()

	monitor := mon.NewMonitor("rangefeed", mon.MemoryResource, nil, nil, 1, math.MaxInt64, nil)
	monitor.Start(ctx, nil, mon.MakeStandaloneBudget(100000))

	// The limit adjustment is the identity function, so the limit computed by
	// the config is used as is.
	keepLimit := func(l int64) int64 { return l }
	config := CreateBudgetFactoryConfig(monitor, 10000000, time.Second*5, keepLimit, &st.SV)
	factory := NewBudgetFactory(ctx, config)

	// The key is the prefix of the first table past the reserved descriptor IDs.
	userKey := keys.MustAddr(keys.SystemSQLCodec.TablePrefix(keys.MaxReservedDescID + 1))
	budget := factory.CreateBudget(userKey)

	var objectSize int64 = 1000

	// With budgets enabled the first request must produce an allocation.
	first, err := budget.TryGet(ctx, objectSize)
	require.NoError(t, err)
	require.NotNil(t, first, "can't get budget")

	// Flip the setting off and make sure the very same budget no longer creates
	// allocations, without reporting an error either.
	RangefeedBudgetsEnabled.Override(ctx, &st.SV, false)
	second, err := budget.TryGet(ctx, objectSize)
	require.NoError(t, err)
	require.Nil(t, second, "budget was not disabled")

	// Releasing the earlier allocation after the budget is disabled must not
	// crash or cause any anomalies.
	first.Release(ctx)

	// A released budget keeps up to one allocation increment reserved for
	// caching, and that cannot be returned until the factory is destroyed.
	// Because the increment is small, it is enough to check that what remains
	// accounted for is less than the object size.
	remaining := factory.Metrics().SharedBytesCount.Value()
	require.Less(t, remaining, objectSize,
		"budget was not released")
}

// TestConfigFactory exercises CreateBudgetFactoryConfig: it checks that a
// non-zero provisional feed limit smaller than the pool size is computed, and
// that setting the package-level useBudgets switch to false yields an empty
// config.
func TestConfigFactory(t *testing.T) {
s := cluster.MakeTestingClusterSettings()
rootMon := mon.NewMonitor("rangefeed", mon.MemoryResource, nil, nil, 1, math.MaxInt64, nil)
rootMon.Start(context.Background(), nil, mon.MakeStandaloneBudget(10000000))

// Check provisionalFeedLimit is computed.
// NOTE(review): the next two statements look like the removed and added sides
// of the same diff line; only the call passing &s.SV has the extra settings
// argument — confirm which one belongs in the final file.
config := CreateBudgetFactoryConfig(rootMon, 100000, time.Second*5, budgetLowThresholdFn(10000))
config := CreateBudgetFactoryConfig(rootMon, 100000, time.Second*5, budgetLowThresholdFn(10000),
&s.SV)
require.Less(t, config.provisionalFeedLimit, int64(100000),
"provisional range limit should be lower than whole memory pool")
require.NotZerof(t, config.provisionalFeedLimit, "provisional range feed limit must not be zero")

// Check if global disable switch works.
useBudgets = false
// Restore the switch to true when the test returns.
defer func() { useBudgets = true }()
// NOTE(review): same old/new pair as above — confirm that only the call
// passing &s.SV should remain.
config = CreateBudgetFactoryConfig(rootMon, 100000, time.Second*5, budgetLowThresholdFn(10000))
config = CreateBudgetFactoryConfig(rootMon, 100000, time.Second*5, budgetLowThresholdFn(10000),
&s.SV)
require.True(t, config.empty(), "config not empty despite disabled factory")
}

func TestBudgetLimits(t *testing.T) {
s := cluster.MakeTestingClusterSettings()
rootMon := mon.NewMonitor("rangefeed", mon.MemoryResource, nil, nil, 1, math.MaxInt64, nil)
rootMon.Start(context.Background(), nil, mon.MakeStandaloneBudget(10000000))

Expand All @@ -256,6 +299,7 @@ func TestBudgetLimits(t *testing.T) {
},
totalRangeReedBudget: 100000,
histogramWindowInterval: time.Second * 5,
settings: &s.SV,
})

userKey := roachpb.RKey(keys.ScratchRangeMin)
Expand All @@ -272,6 +316,7 @@ func TestBudgetLimits(t *testing.T) {
},
totalRangeReedBudget: 100000,
histogramWindowInterval: time.Second * 5,
settings: &s.SV,
})
b = bf.CreateBudget(userKey)
require.Nil(t, b, "budget is disabled")
Expand Down
39 changes: 18 additions & 21 deletions pkg/kv/kvserver/rangefeed/processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"time"

"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/storage/enginepb"
"github.com/cockroachdb/cockroach/pkg/testutils"
Expand Down Expand Up @@ -537,10 +538,7 @@ func TestProcessorSlowConsumer(t *testing.T) {
func TestProcessorMemoryBudgetExceeded(t *testing.T) {
defer leaktest.AfterTest(t)()

m := mon.NewMonitor("rangefeed", mon.MemoryResource, nil, nil, 1, math.MaxInt64, nil)
m.Start(context.Background(), nil, mon.MakeStandaloneBudget(40))
b := m.MakeBoundAccount()
fb := NewFeedBudget(&b, 0)
fb := newTestBudget(40)

stopper := stop.NewStopper()
var pushTxnInterval, pushTxnAge time.Duration = 0, 0 // disable
Expand Down Expand Up @@ -608,10 +606,7 @@ func TestProcessorMemoryBudgetExceeded(t *testing.T) {
func TestProcessorMemoryBudgetReleased(t *testing.T) {
defer leaktest.AfterTest(t)()

m := mon.NewMonitor("rangefeed", mon.MemoryResource, nil, nil, 1, math.MaxInt64, nil)
m.Start(context.Background(), nil, mon.MakeStandaloneBudget(40))
b := m.MakeBoundAccount()
fb := NewFeedBudget(&b, 0)
fb := newTestBudget(40)

stopper := stop.NewStopper()
var pushTxnInterval, pushTxnAge time.Duration = 0, 0 // disable
Expand Down Expand Up @@ -1086,10 +1081,12 @@ func TestBudgetReleaseOnProcessorStop(t *testing.T) {
// as sync events used to flush queues.
const channelCapacity = totalEvents/2 + 10

s := cluster.MakeTestingClusterSettings()
m := mon.NewMonitor("rangefeed", mon.MemoryResource, nil, nil, 1, math.MaxInt64, nil)
m.Start(context.Background(), nil, mon.MakeStandaloneBudget(math.MaxInt64))
//budgetEnabled := int32(1)
b := m.MakeBoundAccount()
fb := NewFeedBudget(&b, 0)
fb := NewFeedBudget(&b, 0, &s.SV)

stopper := stop.NewStopper()
var pushTxnInterval, pushTxnAge time.Duration = 0, 0 // disable
Expand Down Expand Up @@ -1177,10 +1174,7 @@ func TestBudgetReleaseOnLastStreamError(t *testing.T) {
// objects. Ideally it would be nice to have
const channelCapacity = totalEvents + 5

m := mon.NewMonitor("rangefeed", mon.MemoryResource, nil, nil, 1, math.MaxInt64, nil)
m.Start(context.Background(), nil, mon.MakeStandaloneBudget(math.MaxInt64))
b := m.MakeBoundAccount()
fb := NewFeedBudget(&b, 0)
fb := newTestBudget(math.MaxInt64)

stopper := stop.NewStopper()
var pushTxnInterval, pushTxnAge time.Duration = 0, 0 // disable
Expand Down Expand Up @@ -1234,6 +1228,15 @@ func TestBudgetReleaseOnLastStreamError(t *testing.T) {
requireBudgetDrainedSoon(t, p, rStream)
}

// newTestBudget builds a FeedBudget for use in tests. It starts a dedicated
// "rangefeed" memory monitor with a standalone budget of limit, binds an
// account to it and wraps that account in a FeedBudget. Note that limit sizes
// the monitor's standalone budget; the feed limit handed to NewFeedBudget is
// always 0. Fresh testing cluster settings are attached to the budget.
func newTestBudget(limit int64) *FeedBudget {
	st := cluster.MakeTestingClusterSettings()

	monitor := mon.NewMonitor("rangefeed", mon.MemoryResource, nil, nil, 1, math.MaxInt64, nil)
	monitor.Start(context.Background(), nil, mon.MakeStandaloneBudget(limit))

	acc := monitor.MakeBoundAccount()
	return NewFeedBudget(&acc, 0, &st.SV)
}

// TestBudgetReleaseOnOneStreamError verifies that if one stream fails while
// other keeps running, accounting correctly releases memory budget for shared
// events.
Expand All @@ -1248,10 +1251,7 @@ func TestBudgetReleaseOnOneStreamError(t *testing.T) {
// as sync events used to flush queues.
const channelCapacity = totalEvents/2 + 10

m := mon.NewMonitor("rangefeed", mon.MemoryResource, nil, nil, 1, math.MaxInt64, nil)
m.Start(context.Background(), nil, mon.MakeStandaloneBudget(math.MaxInt64))
b := m.MakeBoundAccount()
fb := NewFeedBudget(&b, 0)
fb := newTestBudget(math.MaxInt64)

stopper := stop.NewStopper()
var pushTxnInterval, pushTxnAge time.Duration = 0, 0 // disable
Expand Down Expand Up @@ -1417,10 +1417,7 @@ func BenchmarkProcessorWithBudget(b *testing.B) {

var budget *FeedBudget
if false {
m := mon.NewMonitor("rangefeed", mon.MemoryResource, nil, nil, 1, math.MaxInt64, nil)
m.Start(context.Background(), nil, mon.MakeStandaloneBudget(math.MaxInt64))
acc := m.MakeBoundAccount()
budget = NewFeedBudget(&acc, 0)
budget = newTestBudget(math.MaxInt64)
}

stopper := stop.NewStopper()
Expand Down
3 changes: 2 additions & 1 deletion pkg/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -495,7 +495,8 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) {
return raftCmdLimit
}
return limit
}))
},
&st.SV))
if rangeReedBudgetFactory != nil {
registry.AddMetricStruct(rangeReedBudgetFactory.Metrics())
}
Expand Down

0 comments on commit 0085eeb

Please sign in to comment.