Skip to content

Commit

Permalink
Merge #76854
Browse files Browse the repository at this point in the history
76854: kvserver: configure and provide memory budget for rangefeeds r=erikgrinaker a=aliher1911

This PR wires in RangeFeed budget to use KV pool which is downstream
from SQL pool. Budget is using shared pool for montioring purposes
and limits each individual pool at 20% of total allocation for tenant
ranges and 128MB for system ranges.

Release note: None


Co-authored-by: Oleg Afanasyev <[email protected]>
  • Loading branch information
craig[bot] and aliher1911 committed Feb 27, 2022
2 parents eee8ef0 + 3afb369 commit 1cd61cf
Show file tree
Hide file tree
Showing 10 changed files with 184 additions and 8 deletions.
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/rangefeed/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,10 @@ go_library(
"//pkg/storage",
"//pkg/storage/enginepb",
"//pkg/util/bufalloc",
"//pkg/util/envutil",
"//pkg/util/hlc",
"//pkg/util/interval",
"//pkg/util/log",
"//pkg/util/log/logcrash",
"//pkg/util/metric",
"//pkg/util/mon",
"//pkg/util/protoutil",
Expand Down
110 changes: 107 additions & 3 deletions pkg/kv/kvserver/rangefeed/budget.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,41 @@ import (
"context"
"sync"
"sync/atomic"
"time"

"github.com/cockroachdb/cockroach/pkg/util/log/logcrash"
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/util/envutil"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/mon"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/errors"
)

// useBudgets controls if RangeFeed memory budgets are enabled. Overridable by
// environment variable.
var useBudgets = envutil.EnvOrDefaultBool("COCKROACH_USE_RANGEFEED_MEM_BUDGETS", true)

// totalSharedFeedBudgetFraction is maximum percentage of SQL memory pool that
// could be used by all feed budgets together. Overridable by environment
// variable.
var totalSharedFeedBudgetFraction = envutil.EnvOrDefaultFloat64("COCKROACH_RANGEFEED_FEED_MEM_FRACTION",
0.5)

// maxFeedFraction is maximum percentage of feed memory pool that could be
// allocated to a single feed budget. Overridable by environment variable.
// With 32 GB node and 0.25 sql memory budget (8 GB) each range would get max of
// 8 * 0.5 * 0.05 = 200 MB budget limit.
// With 8 GB node and 0.25 sql memory budget (2 GB) each range would get max of
// 2 * 0.5 * 0.05 = 50 MB budget limit.
var maxFeedFraction = envutil.EnvOrDefaultFloat64("COCKROACH_RANGEFEED_TOTAL_MEM_FRACTION", 0.05)

// Pre allocated memory limit for system RangeFeeds. Each event should never
// exceed 64 MB as it would fail to write to raft log. We don't expect system
// ranges to have such objects, but we'll have a multiple of those just in case.
var systemRangeFeedBudget = envutil.EnvOrDefaultInt64("COCKROACH_RANGEFEED_SYSTEM_BUDGET",
2*64*1024*1024 /* 128MB */)

var budgetAllocationSyncPool = sync.Pool{
New: func() interface{} {
return new(SharedBudgetAllocation)
Expand Down Expand Up @@ -103,8 +131,8 @@ func (f *FeedBudget) TryGet(ctx context.Context, amount int64) (*SharedBudgetAll
f.mu.Lock()
defer f.mu.Unlock()
if f.mu.closed {
logcrash.ReportOrPanic(ctx, nil, "budget unexpectedly closed")
return nil, errors.AssertionFailedf("budget unexpectedly closed")
log.Info(ctx, "trying to get allocation from already closed budget")
return nil, errors.Errorf("budget unexpectedly closed")
}
var err error
if f.mu.memBudget.Used()+amount > f.limit {
Expand Down Expand Up @@ -202,3 +230,79 @@ func (a *SharedBudgetAllocation) Release(ctx context.Context) {
putPooledBudgetAllocation(a)
}
}

// BudgetFactory creates memory budget for rangefeed according to system
// settings.
type BudgetFactory struct {
limit int64
feedBytesMon *mon.BytesMonitor
systemFeedBytesMon *mon.BytesMonitor

metrics *FeedBudgetPoolMetrics
}

// NewBudgetFactory creates a factory callback that would create RangeFeed
// memory budget according to system policy.
func NewBudgetFactory(
ctx context.Context,
rootMon *mon.BytesMonitor,
memoryPoolSize int64,
histogramWindowInterval time.Duration,
) *BudgetFactory {
if !useBudgets || rootMon == nil {
return nil
}
totalRangeReedBudget := int64(float64(memoryPoolSize) * totalSharedFeedBudgetFraction)
feedSizeLimit := int64(float64(totalRangeReedBudget) * maxFeedFraction)

metrics := NewFeedBudgetMetrics(histogramWindowInterval)
systemRangeMonitor := mon.NewMonitorInheritWithLimit("rangefeed-system-monitor",
systemRangeFeedBudget, rootMon)
systemRangeMonitor.SetMetrics(metrics.SystemBytesCount, nil /* maxHist */)
systemRangeMonitor.Start(ctx, rootMon,
mon.MakeStandaloneBudget(systemRangeFeedBudget))

rangeFeedPoolMonitor := mon.NewMonitorInheritWithLimit("rangefeed-monitor", totalRangeReedBudget,
rootMon)
rangeFeedPoolMonitor.SetMetrics(metrics.SharedBytesCount, nil /* maxHist */)
rangeFeedPoolMonitor.Start(ctx, rootMon, mon.BoundAccount{})

return &BudgetFactory{
limit: feedSizeLimit,
feedBytesMon: rangeFeedPoolMonitor,
systemFeedBytesMon: systemRangeMonitor,
metrics: metrics,
}
}

// Stop stops underlying memory monitors used by factory.
// Safe to call on nil factory.
func (f *BudgetFactory) Stop(ctx context.Context) {
if f == nil {
return
}
f.systemFeedBytesMon.Stop(ctx)
f.feedBytesMon.Stop(ctx)
}

// CreateBudget creates feed budget using memory pools configured in the
// factory. It is safe to call on nil factory as it will produce nil budget
// which in turn disables memory accounting on range feed.
func (f *BudgetFactory) CreateBudget(key roachpb.RKey) *FeedBudget {
if f == nil {
return nil
}
// We use any table with reserved ID in system tenant as system case.
if key.Less(roachpb.RKey(keys.SystemSQLCodec.TablePrefix(keys.MaxReservedDescID + 1))) {
acc := f.systemFeedBytesMon.MakeBoundAccount()
return NewFeedBudget(&acc, 0)
}
acc := f.feedBytesMon.MakeBoundAccount()
return NewFeedBudget(&acc, f.limit)
}

// Metrics exposes Metrics for BudgetFactory so that they could be registered
// in the metric registry.
func (f *BudgetFactory) Metrics() *FeedBudgetPoolMetrics {
return f.metrics
}
21 changes: 21 additions & 0 deletions pkg/kv/kvserver/rangefeed/budget_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"testing"
"time"

"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/util/mon"
"github.com/stretchr/testify/require"
)
Expand Down Expand Up @@ -170,3 +171,23 @@ func TestFeedBudget(t *testing.T) {
require.Error(t, err)
})
}

func TestBudgetFactory(t *testing.T) {
rootMon := mon.NewMonitor("rangefeed", mon.MemoryResource, nil, nil, 1, math.MaxInt64, nil)
rootMon.Start(context.Background(), nil, mon.MakeStandaloneBudget(10000000))
bf := NewBudgetFactory(context.Background(), rootMon, 10000, time.Second*5)

// Verify system ranges use own budget.
bSys := bf.CreateBudget(keys.MustAddr(keys.Meta1Prefix))
_, e := bSys.TryGet(context.Background(), 199)
require.NoError(t, e, "failed to obtain system range budget")
require.Equal(t, int64(0), rootMon.AllocBytes(), "System feeds should borrow from own budget")
require.Equal(t, int64(199), bf.Metrics().SystemBytesCount.Value(), "Metric was not updated")

// Verify user feeds use shared root budget.
bUsr := bf.CreateBudget(keys.MustAddr(keys.SystemSQLCodec.TablePrefix(keys.MaxReservedDescID + 1)))
_, e = bUsr.TryGet(context.Background(), 99)
require.NoError(t, e, "failed to obtain non-system budget")
require.Equal(t, int64(99), rootMon.AllocBytes(), "Non-system feeds should borrow from shared budget")
require.Equal(t, int64(99), bf.Metrics().SharedBytesCount.Value(), "Metric was not updated")
}
29 changes: 29 additions & 0 deletions pkg/kv/kvserver/rangefeed/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,3 +67,32 @@ func NewMetrics() *Metrics {
RangeFeedSlowClosedTimestampNudgeSem: make(chan struct{}, 1024),
}
}

// FeedBudgetPoolMetrics holds metrics for RangeFeed budgets for the purpose
// or registration in a metric registry.
type FeedBudgetPoolMetrics struct {
SystemBytesCount *metric.Gauge
SharedBytesCount *metric.Gauge
}

// MetricStruct implements metrics.Struct interface.
func (FeedBudgetPoolMetrics) MetricStruct() {}

// NewFeedBudgetMetrics creates new metrics for RangeFeed budgets.
func NewFeedBudgetMetrics(histogramWindow time.Duration) *FeedBudgetPoolMetrics {
makeMemMetricMetadata := func(name, help string) metric.Metadata {
return metric.Metadata{
Name: "kv.rangefeed.mem_" + name,
Help: help,
Measurement: "Memory",
Unit: metric.Unit_BYTES,
}
}

return &FeedBudgetPoolMetrics{
SystemBytesCount: metric.NewGauge(makeMemMetricMetadata("system",
"Memory usage by rangefeeds on system ranges")),
SharedBytesCount: metric.NewGauge(makeMemMetricMetadata("shared",
"Memory usage by rangefeeds")),
}
}
2 changes: 0 additions & 2 deletions pkg/kv/kvserver/rangefeed/processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1101,7 +1101,6 @@ func TestBudgetReleaseOnProcessorStop(t *testing.T) {
PushTxnsAge: pushTxnAge,
EventChanCap: channelCapacity,
CheckStreamsInterval: 10 * time.Millisecond,
Metrics: NewMetrics(),
MemBudget: fb,
})
require.NoError(t, p.Start(stopper, nil))
Expand Down Expand Up @@ -1264,7 +1263,6 @@ func TestBudgetReleaseOnOneStreamError(t *testing.T) {
PushTxnsAge: pushTxnAge,
EventChanCap: channelCapacity,
CheckStreamsInterval: 10 * time.Millisecond,
Metrics: NewMetrics(),
MemBudget: fb,
})
require.NoError(t, p.Start(stopper, nil))
Expand Down
3 changes: 3 additions & 0 deletions pkg/kv/kvserver/replica_rangefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -364,6 +364,8 @@ func (r *Replica) registerWithRangefeedRaftMuLocked(
}
r.rangefeedMu.Unlock()

feedBudget := r.store.GetStoreConfig().RangefeedBudgetFactory.CreateBudget(r.startKey)

// Create a new rangefeed.
desc := r.Desc()
tp := rangefeedTxnPusher{ir: r.store.intentResolver, r: r}
Expand All @@ -378,6 +380,7 @@ func (r *Replica) registerWithRangefeedRaftMuLocked(
EventChanCap: defaultEventChanCap,
EventChanTimeout: 50 * time.Millisecond,
Metrics: r.store.metrics.RangeFeedMetrics,
MemBudget: feedBudget,
}
p = rangefeed.NewProcessor(cfg)

Expand Down
4 changes: 3 additions & 1 deletion pkg/kv/kvserver/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/protectedts"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/raftentry"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/rangefeed"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/tenantrate"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/tscache"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/txnrecovery"
Expand Down Expand Up @@ -1050,7 +1051,8 @@ type StoreConfig struct {

// KV Memory Monitor. Must be non-nil for production, and can be nil in some
// tests.
KVMemoryMonitor *mon.BytesMonitor
KVMemoryMonitor *mon.BytesMonitor
RangefeedBudgetFactory *rangefeed.BudgetFactory

// SpanConfigsDisabled determines whether we're able to use the span configs
// infrastructure or not.
Expand Down
1 change: 1 addition & 0 deletions pkg/server/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ go_library(
"//pkg/kv/kvserver/protectedts/ptpb",
"//pkg/kv/kvserver/protectedts/ptprovider",
"//pkg/kv/kvserver/protectedts/ptreconcile",
"//pkg/kv/kvserver/rangefeed",
"//pkg/kv/kvserver/reports",
"//pkg/migration",
"//pkg/migration/migrationcluster",
Expand Down
11 changes: 11 additions & 0 deletions pkg/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/protectedts"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/protectedts/ptprovider"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/protectedts/ptreconcile"
serverrangefeed "github.com/cockroachdb/cockroach/pkg/kv/kvserver/rangefeed"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/reports"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/rpc"
Expand Down Expand Up @@ -484,6 +485,15 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) {
kvMemoryMonitor := mon.NewMonitorInheritWithLimit(
"kv-mem", 0 /* limit */, sqlMonitorAndMetrics.rootSQLMemoryMonitor)
kvMemoryMonitor.Start(ctx, sqlMonitorAndMetrics.rootSQLMemoryMonitor, mon.BoundAccount{})
rangeReedBudgetFactory := serverrangefeed.NewBudgetFactory(ctx, kvMemoryMonitor, cfg.MemoryPoolSize,
cfg.HistogramWindowInterval())
if rangeReedBudgetFactory != nil {
registry.AddMetricStruct(rangeReedBudgetFactory.Metrics())
}
// Closer order is important with BytesMonitor.
stopper.AddCloser(stop.CloserFn(func() {
rangeReedBudgetFactory.Stop(ctx)
}))
stopper.AddCloser(stop.CloserFn(func() {
kvMemoryMonitor.Stop(ctx)
}))
Expand Down Expand Up @@ -529,6 +539,7 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) {
ExternalStorageFromURI: externalStorageFromURI,
ProtectedTimestampCache: protectedtsProvider,
KVMemoryMonitor: kvMemoryMonitor,
RangefeedBudgetFactory: rangeReedBudgetFactory,
SystemConfigProvider: systemConfigWatcher,
}

Expand Down
9 changes: 8 additions & 1 deletion pkg/ts/catalog/chart_catalog.go
Original file line number Diff line number Diff line change
Expand Up @@ -567,12 +567,19 @@ var charts = []sectionDescription{
},
},
{
Title: "Rangefeed Memory Budgeting",
Title: "Rangefeed Memory Allocations",
Metrics: []string{
"kv.rangefeed.budget_allocation_failed",
"kv.rangefeed.budget_allocation_blocked",
},
},
{
Title: "Memory Usage",
Metrics: []string{
"kv.rangefeed.mem_shared",
"kv.rangefeed.mem_system",
},
},
{
Title: "Snapshots",
Metrics: []string{
Expand Down

0 comments on commit 1cd61cf

Please sign in to comment.