Skip to content

Commit

Permalink
kvserver: prioritize system spans in rangefeed scheduler
Browse files Browse the repository at this point in the history
This patch marks system spans as having priority in the rangefeed
scheduler. We currently do this based on the descriptor ID, although
this no longer strictly holds with dynamic system table IDs.

Epic: none
Release note: None
  • Loading branch information
erikgrinaker committed Sep 22, 2023
1 parent b066349 commit 0f0fb8c
Show file tree
Hide file tree
Showing 3 changed files with 19 additions and 14 deletions.
6 changes: 2 additions & 4 deletions pkg/kv/kvserver/rangefeed/budget.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,6 @@ import (
"sync/atomic"
"time"

"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings"
"github.com/cockroachdb/cockroach/pkg/util/envutil"
"github.com/cockroachdb/cockroach/pkg/util/mon"
Expand Down Expand Up @@ -364,7 +362,7 @@ func (f *BudgetFactory) Stop(ctx context.Context) {
// CreateBudget creates feed budget using memory pools configured in the
// factory. It is safe to call on nil factory as it will produce nil budget
// which in turn disables memory accounting on range feed.
func (f *BudgetFactory) CreateBudget(key roachpb.RKey) *FeedBudget {
func (f *BudgetFactory) CreateBudget(isSystem bool) *FeedBudget {
if f == nil {
return nil
}
Expand All @@ -373,7 +371,7 @@ func (f *BudgetFactory) CreateBudget(key roachpb.RKey) *FeedBudget {
return nil
}
// We use any table with reserved ID in system tenant as system case.
if key.Less(roachpb.RKey(keys.SystemSQLCodec.TablePrefix(keys.MaxReservedDescID + 1))) {
if isSystem {
acc := f.systemFeedBytesMon.MakeBoundAccount()
return NewFeedBudget(&acc, 0, f.settings)
}
Expand Down
15 changes: 6 additions & 9 deletions pkg/kv/kvserver/rangefeed/budget_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,6 @@ import (
"testing"
"time"

"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/util/mon"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -195,14 +193,14 @@ func TestBudgetFactory(t *testing.T) {
CreateBudgetFactoryConfig(rootMon, 10000, time.Second*5, budgetLowThresholdFn(10000), &s.SV))

// Verify system ranges use own budget.
bSys := bf.CreateBudget(keys.MustAddr(keys.Meta1Prefix))
bSys := bf.CreateBudget(true)
_, e := bSys.TryGet(context.Background(), 199)
require.NoError(t, e, "failed to obtain system range budget")
require.Equal(t, int64(0), rootMon.AllocBytes(), "System feeds should borrow from own budget")
require.Equal(t, int64(199), bf.Metrics().SystemBytesCount.Value(), "Metric was not updated")

// Verify user feeds use shared root budget.
bUsr := bf.CreateBudget(keys.MustAddr(keys.SystemSQLCodec.TablePrefix(keys.MaxReservedDescID + 1)))
bUsr := bf.CreateBudget(false)
_, e = bUsr.TryGet(context.Background(), 99)
require.NoError(t, e, "failed to obtain non-system budget")
require.Equal(t, int64(99), rootMon.AllocBytes(),
Expand All @@ -220,7 +218,7 @@ func TestDisableBudget(t *testing.T) {
return 0
}, &s.SV))

bUsr := bf.CreateBudget(keys.MustAddr(keys.SystemSQLCodec.TablePrefix(keys.MaxReservedDescID + 1)))
bUsr := bf.CreateBudget(false)
require.Nil(t, bUsr, "Range budget when budgets are disabled.")
}

Expand All @@ -239,7 +237,7 @@ func TestDisableBudgetOnTheFly(t *testing.T) {
},
&s.SV))

f := bf.CreateBudget(keys.MustAddr(keys.SystemSQLCodec.TablePrefix(keys.MaxReservedDescID + 1)))
f := bf.CreateBudget(false)

objectSize := int64(1000)
alloc, err := f.TryGet(context.Background(), objectSize)
Expand Down Expand Up @@ -302,8 +300,7 @@ func TestBudgetLimits(t *testing.T) {
settings: &s.SV,
})

userKey := roachpb.RKey(keys.ScratchRangeMin)
b := bf.CreateBudget(userKey)
b := bf.CreateBudget(false)
require.NotNil(t, b, "budget is disabled")
require.Equal(t, b.limit, adjustedSize, "budget limit is not adjusted")

Expand All @@ -318,6 +315,6 @@ func TestBudgetLimits(t *testing.T) {
histogramWindowInterval: time.Second * 5,
settings: &s.SV,
})
b = bf.CreateBudget(userKey)
b = bf.CreateBudget(false)
require.Nil(t, b, "budget is disabled")
}
12 changes: 11 additions & 1 deletion pkg/kv/kvserver/replica_rangefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -409,8 +409,17 @@ func (r *Replica) registerWithRangefeedRaftMuLocked(
}
r.rangefeedMu.Unlock()

// Determine if this is a system span, which should get priority.
//
// TODO(erikgrinaker): With dynamic system tables, this should really check
// catalog.IsSystemDescriptor() for the table descriptor, but we don't have
// easy access to it here. Consider plumbing this down from the client
// instead. See: https://github.com/cockroachdb/cockroach/issues/110883
isSystemSpan := span.EndKey.Compare(
roachpb.RKey(keys.SystemSQLCodec.TablePrefix(keys.MaxReservedDescID+1))) <= 0

// Create a new rangefeed.
feedBudget := r.store.GetStoreConfig().RangefeedBudgetFactory.CreateBudget(r.startKey)
feedBudget := r.store.GetStoreConfig().RangefeedBudgetFactory.CreateBudget(isSystemSpan)

var sched *rangefeed.Scheduler
if shouldUseRangefeedScheduler(&r.ClusterSettings().SV) {
Expand All @@ -433,6 +442,7 @@ func (r *Replica) registerWithRangefeedRaftMuLocked(
Metrics: r.store.metrics.RangeFeedMetrics,
MemBudget: feedBudget,
Scheduler: sched,
Priority: isSystemSpan, // only takes effect when Scheduler != nil
}
p = rangefeed.NewProcessor(cfg)

Expand Down

0 comments on commit 0f0fb8c

Please sign in to comment.