server,admission: ensure admission control uses annotated contexts
This ensures the activity is present in traces, logs, etc.,
and that the server IDs are properly reported.

Release note: None
knz committed Nov 11, 2021
1 parent c14ece7 commit 465c8ea
Showing 3 changed files with 58 additions and 33 deletions.
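Before the file-by-file diff, here is a minimal, self-contained sketch of the pattern this commit adopts: a long-lived admission component keeps an ambient set of tags (node/store IDs, tracer) and derives annotated contexts from it for its background work, instead of logging against a bare context.Background(). The ambientTags, annotateCtx, and infof names below are illustrative stand-ins for CockroachDB's log.AmbientContext, AnnotateCtx, and log.Infof, not the real API.

```go
package main

import (
	"context"
	"fmt"
	"sort"
)

type tagsKey struct{}

// ambientTags mimics the role of log.AmbientContext: a long-lived component
// stores the tags (e.g. node and store IDs) that every log line and trace
// emitted by its background work should carry.
type ambientTags struct {
	tags map[string]string
}

// annotateCtx derives a context carrying the ambient tags, analogous to
// coord.ambientCtx.AnnotateCtx(context.Background()) in the diff below.
func (a ambientTags) annotateCtx(ctx context.Context) context.Context {
	return context.WithValue(ctx, tagsKey{}, a.tags)
}

// infof is a stand-in for log.Infof: it prefixes the message with whatever
// tags the context carries, so a background goroutine reports which
// node/store it belongs to.
func infof(ctx context.Context, format string, args ...interface{}) {
	prefix := ""
	if tags, ok := ctx.Value(tagsKey{}).(map[string]string); ok {
		keys := make([]string, 0, len(tags))
		for k := range tags {
			keys = append(keys, k)
		}
		sort.Strings(keys) // deterministic prefix order
		for _, k := range keys {
			prefix += fmt.Sprintf("[%s=%s]", k, tags[k])
		}
	}
	fmt.Printf(prefix+" "+format+"\n", args...)
}

func main() {
	amb := ambientTags{tags: map[string]string{"n": "1", "s": "10"}}
	// Background work (e.g. the per-store ticker started in
	// SetPebbleMetricsProvider) derives its context from the ambient tags
	// rather than from a bare context.Background().
	ctx := amb.annotateCtx(context.Background())
	infof(ctx, "IO overload on store %d", 10) // prints: [n=1][s=10] IO overload on store 10
}
```

With this pattern, the per-store ticker goroutine in SetPebbleMetricsProvider and the CPULoad callback emit log lines that carry the server's tags, which is what the commit message means by the server IDs being properly reported.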
22 changes: 12 additions & 10 deletions pkg/server/server.go
@@ -450,15 +450,17 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) {
}
tcsFactory := kvcoord.NewTxnCoordSenderFactory(txnCoordSenderFactoryCfg, distSender)

gcoords, metrics := admission.NewGrantCoordinators(admission.Options{
MinCPUSlots: 1,
MaxCPUSlots: 100000, /* TODO(sumeer): add cluster setting */
SQLKVResponseBurstTokens: 100000, /* TODO(sumeer): add cluster setting */
SQLSQLResponseBurstTokens: 100000, /* arbitrary, and unused */
SQLStatementLeafStartWorkSlots: 100, /* arbitrary, and unused */
SQLStatementRootStartWorkSlots: 100, /* arbitrary, and unused */
Settings: st,
})
gcoords, metrics := admission.NewGrantCoordinators(
cfg.AmbientCtx,
admission.Options{
MinCPUSlots: 1,
MaxCPUSlots: 100000, /* TODO(sumeer): add cluster setting */
SQLKVResponseBurstTokens: 100000, /* TODO(sumeer): add cluster setting */
SQLSQLResponseBurstTokens: 100000, /* arbitrary, and unused */
SQLStatementLeafStartWorkSlots: 100, /* arbitrary, and unused */
SQLStatementRootStartWorkSlots: 100, /* arbitrary, and unused */
Settings: st,
})
for i := range metrics {
registry.AddMetricStruct(metrics[i])
}
@@ -1635,7 +1637,7 @@ func (s *Server) PreStart(ctx context.Context) error {
return err
}
// Stores have been initialized, so Node can now provide Pebble metrics.
s.storeGrantCoords.SetPebbleMetricsProvider(s.node)
s.storeGrantCoords.SetPebbleMetricsProvider(ctx, s.node)

log.Event(ctx, "started node")
if err := s.startPersistingHLCUpperBound(
50 changes: 34 additions & 16 deletions pkg/util/admission/granter.go
@@ -559,6 +559,8 @@ func (sg *kvGranter) setAvailableIOTokensLocked(tokens int64) {
// StoreGrantCoordinators) for KVWork that uses that store. See the
// NewGrantCoordinators and NewGrantCoordinatorSQL functions.
type GrantCoordinator struct {
ambientCtx log.AmbientContext

settings *cluster.Settings
lastCPULoadSamplePeriod time.Duration

@@ -630,7 +632,9 @@ type makeRequesterFunc func(
// chains to delay admission through the per store GrantCoordinators since
// they are not trying to control CPU usage, so we turn off grant chaining in
// those coordinators.
func NewGrantCoordinators(opts Options) (GrantCoordinators, []metric.Struct) {
func NewGrantCoordinators(
ambientCtx log.AmbientContext, opts Options,
) (GrantCoordinators, []metric.Struct) {
makeRequester := makeWorkQueue
if opts.makeRequesterFunc != nil {
makeRequester = opts.makeRequesterFunc
@@ -646,6 +650,7 @@ func NewGrantCoordinators(opts Options) (GrantCoordinators, []metric.Struct) {
totalSlotsMetric: metrics.KVTotalSlots,
}
coord := &GrantCoordinator{
ambientCtx: ambientCtx,
settings: st,
cpuOverloadIndicator: kvSlotAdjuster,
cpuLoadListener: kvSlotAdjuster,
@@ -729,7 +734,9 @@ func NewGrantCoordinators(opts Options) (GrantCoordinators, []metric.Struct) {
// NewGrantCoordinatorSQL constructs a GrantCoordinator and WorkQueues for a
// single-tenant SQL node in a multi-tenant cluster. Caller is responsible for
// hooking this up to receive calls to CPULoad.
func NewGrantCoordinatorSQL(opts Options) (*GrantCoordinator, []metric.Struct) {
func NewGrantCoordinatorSQL(
ambientCtx log.AmbientContext, opts Options,
) (*GrantCoordinator, []metric.Struct) {
makeRequester := makeWorkQueue
if opts.makeRequesterFunc != nil {
makeRequester = opts.makeRequesterFunc
@@ -740,6 +747,7 @@ func NewGrantCoordinatorSQL(opts Options) (*GrantCoordinator, []metric.Struct) {
metricStructs := append([]metric.Struct(nil), metrics)
sqlNodeCPU := &sqlNodeCPUOverloadIndicator{}
coord := &GrantCoordinator{
ambientCtx: ambientCtx,
settings: st,
cpuOverloadIndicator: sqlNodeCPU,
cpuLoadListener: sqlNodeCPU,
@@ -814,8 +822,8 @@ func appendMetricStructsForQueues(ms []metric.Struct, coord *GrantCoordinator) [
// pebbleMetricsTick is called every adjustmentInterval seconds and passes
// through to the ioLoadListener, so that it can adjust the plan for future IO
// token allocations.
func (coord *GrantCoordinator) pebbleMetricsTick(m pebble.Metrics) {
coord.ioLoadListener.pebbleMetricsTick(m)
func (coord *GrantCoordinator) pebbleMetricsTick(ctx context.Context, m pebble.Metrics) {
coord.ioLoadListener.pebbleMetricsTick(ctx, m)
}

// allocateIOTokensTick tells the ioLoadListener to allocate tokens.
@@ -855,9 +863,11 @@ func (coord *GrantCoordinator) GetWorkQueue(workKind WorkKind) *WorkQueue {
// take into account the latest schedulers stats (indirectly, via the
// implementation of cpuOverloadIndicator).
func (coord *GrantCoordinator) CPULoad(runnable int, procs int, samplePeriod time.Duration) {
ctx := coord.ambientCtx.AnnotateCtx(context.Background())

if coord.lastCPULoadSamplePeriod != 0 && coord.lastCPULoadSamplePeriod != samplePeriod &&
KVAdmissionControlEnabled.Get(&coord.settings.SV) {
log.Infof(context.Background(), "CPULoad switching to period %s", samplePeriod.String())
log.Infof(ctx, "CPULoad switching to period %s", samplePeriod.String())
}
coord.lastCPULoadSamplePeriod = samplePeriod

@@ -1130,6 +1140,8 @@ func (coord *GrantCoordinator) SafeFormat(s redact.SafePrinter, verb rune) {
// that is used for KV work admission that takes into account store health.
// Currently it is intended only for writes to stores.
type StoreGrantCoordinators struct {
ambientCtx log.AmbientContext

settings *cluster.Settings
makeRequesterFunc makeRequesterFunc
kvIOTokensExhaustedDuration *metric.Counter
@@ -1143,7 +1155,9 @@ type StoreGrantCoordinators struct {

// SetPebbleMetricsProvider sets a PebbleMetricsProvider and causes the load
// on the various storage engines to be used for admission control.
func (sgc *StoreGrantCoordinators) SetPebbleMetricsProvider(pmp PebbleMetricsProvider) {
func (sgc *StoreGrantCoordinators) SetPebbleMetricsProvider(
startupCtx context.Context, pmp PebbleMetricsProvider,
) {
if sgc.pebbleMetricsProvider != nil {
panic(errors.AssertionFailedf("SetPebbleMetricsProvider called more than once"))
}
@@ -1154,9 +1168,13 @@ func (sgc *StoreGrantCoordinators) SetPebbleMetricsProvider(pmp PebbleMetricsProvider) {
for _, m := range metrics {
gc := sgc.initGrantCoordinator(m.StoreID)
sgc.gcMap[m.StoreID] = gc
gc.pebbleMetricsTick(*m.Metrics)
gc.pebbleMetricsTick(startupCtx, *m.Metrics)
gc.allocateIOTokensTick()
}

// Attach tracer and log tags.
ctx := sgc.ambientCtx.AnnotateCtx(context.Background())

go func() {
var ticks int64
ticker := time.NewTicker(time.Second)
@@ -1168,14 +1186,14 @@ func (sgc *StoreGrantCoordinators) SetPebbleMetricsProvider(pmp PebbleMetricsProvider) {
if ticks%adjustmentInterval == 0 {
metrics := sgc.pebbleMetricsProvider.GetPebbleMetrics()
if len(metrics) != len(sgc.gcMap) {
log.Warningf(context.Background(),
log.Warningf(ctx,
"expected %d store metrics and found %d metrics", len(sgc.gcMap), len(metrics))
}
for _, m := range metrics {
if gc, ok := sgc.gcMap[m.StoreID]; ok {
gc.pebbleMetricsTick(*m.Metrics)
gc.pebbleMetricsTick(ctx, *m.Metrics)
} else {
log.Warningf(context.Background(),
log.Warningf(ctx,
"seeing metrics for unknown storeID %d", m.StoreID)
}
}
@@ -1470,7 +1488,7 @@ const adjustmentInterval = 15

// pebbleMetricsTicks is called every adjustmentInterval seconds, and decides
// the token allocations until the next call.
func (io *ioLoadListener) pebbleMetricsTick(m pebble.Metrics) {
func (io *ioLoadListener) pebbleMetricsTick(ctx context.Context, m pebble.Metrics) {
if !io.statsInitialized {
io.statsInitialized = true
// Initialize cumulative stats.
@@ -1481,7 +1499,7 @@ func (io *ioLoadListener) pebbleMetricsTick(m pebble.Metrics) {
io.totalTokens = unlimitedTokens
return
}
io.adjustTokens(m)
io.adjustTokens(ctx, m)
}

// allocateTokensTick gives out 1/adjustmentInterval of the totalTokens every
@@ -1519,7 +1537,7 @@ func (io *ioLoadListener) allocateTokensTick() {
// many bytes are being moved out of L0 via compactions with the average
// number of bytes being added to L0 per KV work. We want the former to be
// (significantly) larger so that L0 returns to a healthy state.
func (io *ioLoadListener) adjustTokens(m pebble.Metrics) {
func (io *ioLoadListener) adjustTokens(ctx context.Context, m pebble.Metrics) {
io.tokensAllocated = 0
// Grab the cumulative stats.
admittedCount := io.kvRequester.getAdmittedCount()
@@ -1530,7 +1548,7 @@ func (io *ioLoadListener) adjustTokens(m pebble.Metrics) {
if bytesAdded < 0 {
// bytesAdded is a simple delta computation over individually cumulative
// stats, so should not be negative.
log.Warningf(context.Background(), "bytesAdded %d is negative", bytesAdded)
log.Warningf(ctx, "bytesAdded %d is negative", bytesAdded)
bytesAdded = 0
}
// bytesRemoved are due to finished compactions.
@@ -1549,7 +1567,7 @@ func (io *ioLoadListener) adjustTokens(m pebble.Metrics) {
var admitted uint64
doLog := true
if admittedCount < io.admittedCount {
log.Warningf(context.Background(), "admitted count decreased from %d to %d",
log.Warningf(ctx, "admitted count decreased from %d to %d",
io.admittedCount, admittedCount)
} else {
admitted = admittedCount - io.admittedCount
@@ -1588,7 +1606,7 @@ func (io *ioLoadListener) adjustTokens(m pebble.Metrics) {
io.totalTokens = int64(io.smoothedNumAdmit)
}
if doLog {
log.Infof(context.Background(),
log.Infof(ctx,
"IO overload on store %d (files %d, sub-levels %d): admitted: %d, added: %d, "+
"removed (%d, %d), admit: (%f, %d)",
io.storeID, m.Levels[0].NumFiles, m.Levels[0].Sublevels, admitted, bytesAdded,
19 changes: 12 additions & 7 deletions pkg/util/admission/granter_test.go
@@ -92,6 +92,7 @@ func TestGranterBasic(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

var ambientCtx log.AmbientContext
var requesters [numWorkKinds]*testRequester
var coord *GrantCoordinator
var buf strings.Builder
@@ -126,7 +127,7 @@ func TestGranterBasic(t *testing.T) {
return req
}
delayForGrantChainTermination = 0
coords, _ := NewGrantCoordinators(opts)
coords, _ := NewGrantCoordinators(ambientCtx, opts)
coord = coords.Regular
return flushAndReset()

@@ -232,6 +233,7 @@ func TestStoreCoordinators(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

var ambientCtx log.AmbientContext
var buf strings.Builder
settings := cluster.MakeTestingClusterSettings()
// All the KVWork requesters. The first one is for all KVWork and the
@@ -253,7 +255,7 @@ func TestStoreCoordinators(t *testing.T) {
return req
},
}
coords, _ := NewGrantCoordinators(opts)
coords, _ := NewGrantCoordinators(ambientCtx, opts)
// There is only 1 KVWork requester at this point in initialization, for the
// Regular GrantCoordinator.
require.Equal(t, 1, len(requesters))
@@ -263,7 +265,7 @@ func TestStoreCoordinators(t *testing.T) {
mp.setMetricsForStores([]int32{10, 20}, metrics)
// Setting the metrics provider will cause the initialization of two
// GrantCoordinators for the two stores.
storeCoords.SetPebbleMetricsProvider(&mp)
storeCoords.SetPebbleMetricsProvider(context.Background(), &mp)
// Now we have 1+2 = 3 KVWork requesters.
require.Equal(t, 3, len(requesters))
// Confirm that the store IDs are as expected.
@@ -332,6 +334,7 @@ func TestIOLoadListener(t *testing.T) {
req := &testRequesterForIOLL{}
kvGranter := &testGranterWithIOTokens{}
var ioll *ioLoadListener
ctx := context.Background()
st := cluster.MakeTestingClusterSettings()
datadriven.RunTest(t, "testdata/io_load_listener",
func(t *testing.T, d *datadriven.TestData) string {
@@ -367,7 +370,7 @@ func TestIOLoadListener(t *testing.T) {
ioll.mu.Mutex = &syncutil.Mutex{}
ioll.mu.kvGranter = kvGranter
}
ioll.pebbleMetricsTick(metrics)
ioll.pebbleMetricsTick(ctx, metrics)
// Do the ticks until just before next adjustment.
var buf strings.Builder
fmt.Fprintf(&buf, "admitted: %d, bytes: %d, added-bytes: %d,\nsmoothed-removed: %d, "+
@@ -390,6 +393,7 @@ func TestIOLoadListener(t *testing.T) {
func TestIOLoadListenerOverflow(t *testing.T) {
req := &testRequesterForIOLL{}
kvGranter := &testGranterWithIOTokens{}
ctx := context.Background()
st := cluster.MakeTestingClusterSettings()
ioll := ioLoadListener{
settings: st,
@@ -412,8 +416,8 @@ func TestIOLoadListenerOverflow(t *testing.T) {
Sublevels: 100,
NumFiles: 10000,
}
ioll.pebbleMetricsTick(m)
ioll.pebbleMetricsTick(m)
ioll.pebbleMetricsTick(ctx, m)
ioll.pebbleMetricsTick(ctx, m)
ioll.allocateTokensTick()
}

@@ -430,6 +434,7 @@ func (g *testGranterNonNegativeTokens) setAvailableIOTokensLocked(tokens int64)
func TestBadIOLoadListenerStats(t *testing.T) {
var m pebble.Metrics
req := &testRequesterForIOLL{}
ctx := context.Background()

randomValues := func() {
// Use uints, and cast so that we get bad negative values.
@@ -450,7 +455,7 @@ func TestBadIOLoadListenerStats(t *testing.T) {
ioll.mu.kvGranter = kvGranter
for i := 0; i < 100; i++ {
randomValues()
ioll.pebbleMetricsTick(m)
ioll.pebbleMetricsTick(ctx, m)
for j := 0; j < adjustmentInterval; j++ {
ioll.allocateTokensTick()
require.LessOrEqual(t, int64(0), ioll.totalTokens)
