diff --git a/pkg/kv/kvserver/kvflowcontrol/kvflowdispatch/kvflowdispatch_test.go b/pkg/kv/kvserver/kvflowcontrol/kvflowdispatch/kvflowdispatch_test.go index 5caf3f67343a..6610f902ba15 100644 --- a/pkg/kv/kvserver/kvflowcontrol/kvflowdispatch/kvflowdispatch_test.go +++ b/pkg/kv/kvserver/kvflowcontrol/kvflowdispatch/kvflowdispatch_test.go @@ -31,11 +31,6 @@ func TestDispatch(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) - reverseWorkPriorityDict := make(map[string]admissionpb.WorkPriority) - for k, v := range admissionpb.WorkPriorityDict { - reverseWorkPriorityDict[v] = k - } - datadriven.Walk(t, datapathutils.TestDataPath(t), func(t *testing.T, path string) { var dispatch *Dispatch datadriven.RunTest(t, path, func(t *testing.T, d *datadriven.TestData) string { @@ -82,7 +77,7 @@ func TestDispatch(t *testing.T) { case strings.HasPrefix(parts[i], "pri="): // Parse pri=. - pri, found := reverseWorkPriorityDict[arg] + pri, found := admissionpb.TestingReverseWorkPriorityDict[arg] require.True(t, found) entries.AdmissionPriority = int32(pri) diff --git a/pkg/kv/kvserver/kvflowcontrol/kvflowtokentracker/tracker_test.go b/pkg/kv/kvserver/kvflowcontrol/kvflowtokentracker/tracker_test.go index 858bda480797..7793501e0b75 100644 --- a/pkg/kv/kvserver/kvflowcontrol/kvflowtokentracker/tracker_test.go +++ b/pkg/kv/kvserver/kvflowcontrol/kvflowtokentracker/tracker_test.go @@ -32,19 +32,10 @@ func TestTracker(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) - reverseWorkPriorityDict := make(map[string]admissionpb.WorkPriority) - for k, v := range admissionpb.WorkPriorityDict { - reverseWorkPriorityDict[v] = k - } - ctx := context.Background() datadriven.Walk(t, datapathutils.TestDataPath(t), func(t *testing.T, path string) { var tracker *Tracker - knobs := &kvflowcontrol.TestingKnobs{ - UntrackTokensInterceptor: func(tokens kvflowcontrol.Tokens, pos kvflowcontrolpb.RaftLogPosition) { - - }, - } + knobs := &kvflowcontrol.TestingKnobs{} datadriven.RunTest(t, path, func(t *testing.T, d *datadriven.TestData) string { switch d.Cmd { case "init": @@ -73,7 +64,7 @@ func TestTracker(t *testing.T) { switch { case strings.HasPrefix(parts[i], "pri="): var found bool - pri, found = reverseWorkPriorityDict[arg] + pri, found = admissionpb.TestingReverseWorkPriorityDict[arg] require.True(t, found) case strings.HasPrefix(parts[i], "tokens="): @@ -103,7 +94,7 @@ func TestTracker(t *testing.T) { var priStr, logPositionStr string d.ScanArgs(t, "pri", &priStr) d.ScanArgs(t, "up-to-log-position", &logPositionStr) - pri, found := reverseWorkPriorityDict[priStr] + pri, found := admissionpb.TestingReverseWorkPriorityDict[priStr] require.True(t, found) logPosition := parseLogPosition(t, logPositionStr) diff --git a/pkg/server/server.go b/pkg/server/server.go index bd640e6d4940..797efa152566 100644 --- a/pkg/server/server.go +++ b/pkg/server/server.go @@ -269,7 +269,7 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) { if opts, ok := cfg.TestingKnobs.AdmissionControl.(*admission.Options); ok { admissionOptions.Override(opts) } - gcoords := admission.NewGrantCoordinators(cfg.AmbientCtx, st, admissionOptions, registry) + gcoords := admission.NewGrantCoordinators(cfg.AmbientCtx, st, admissionOptions, registry, &admission.NoopOnLogEntryAdmitted{}) engines, err := cfg.CreateEngines(ctx) if err != nil { diff --git a/pkg/util/admission/admission.go b/pkg/util/admission/admission.go index e59d16927952..4e285c42ee3d 100644 --- a/pkg/util/admission/admission.go +++ b/pkg/util/admission/admission.go @@ -178,7 +178,7 @@ type granter interface { // is a possibility that that raced with cancellation. // // Do not use this for doing store IO-related token adjustments when work is - // done -- that should be done via granterWithStoreWriteDone.storeWriteDone. + // done -- that should be done via granterWithStoreReplicatedWorkAdmitted.storeWriteDone. // // REQUIRES: count > 0. count == 1 for slots. returnGrant(count int64) @@ -195,7 +195,7 @@ type granter interface { // work turned out to be an underestimate. // // Do not use this for doing store IO-related token adjustments when work is - // done -- that should be done via granterWithStoreWriteDone.storeWriteDone. + // done -- that should be done via granterWithStoreReplicatedWorkAdmitted.storeWriteDone. // // REQUIRES: count > 0. count == 1 for slots. tookWithoutPermission(count int64) @@ -274,23 +274,30 @@ type granterWithIOTokens interface { // getDiskTokensUsedAndReset returns the disk bandwidth tokens used // since the last such call. getDiskTokensUsedAndReset() [admissionpb.NumWorkClasses]int64 - // setAdmittedDoneModelsLocked supplies the models to use when - // storeWriteDone is called, to adjust token consumption. Note that these - // models are not used for token adjustment at admission time -- that is - // handled by StoreWorkQueue and is not in scope of this granter. This - // asymmetry is due to the need to use all the functionality of WorkQueue at - // admission time. See the long explanatory comment at the beginning of - // store_token_estimation.go, regarding token estimation. - setAdmittedDoneModels(l0WriteLM tokensLinearModel, l0IngestLM tokensLinearModel, - ingestLM tokensLinearModel) + // setLinearModels supplies the models to use when storeWriteDone or + // storeReplicatedWorkAdmittedLocked is called, to adjust token consumption. + // Note that these models are not used for token adjustment at admission + // time -- that is handled by StoreWorkQueue and is not in scope of this + // granter. This asymmetry is due to the need to use all the functionality + // of WorkQueue at admission time. See the long explanatory comment at the + // beginning of store_token_estimation.go, regarding token estimation. + setLinearModels(l0WriteLM, l0IngestLM, ingestLM tokensLinearModel) } -// granterWithStoreWriteDone is used to abstract kvStoreTokenGranter for -// testing. The interface is used by StoreWorkQueue to pass on sizing -// information provided when the work was completed. -type granterWithStoreWriteDone interface { +// granterWithStoreReplicatedWorkAdmitted is used to abstract +// kvStoreTokenGranter for testing. The interface is used by StoreWorkQueue to +// pass on sizing information provided when the work is either done (for legacy, +// above-raft IO admission) or admitted (for below-raft, asynchronous admission +// control. +type granterWithStoreReplicatedWorkAdmitted interface { granter + // storeWriteDone is used by legacy, above-raft IO admission control to + // inform granters of when the write was actually done, post-admission. At + // admit-time we did now have sizing info for these writes, so by + // intercepting these writes at admit time we're able to make any + // outstanding token adjustments in the granter. storeWriteDone(originalTokens int64, doneInfo StoreWorkDoneInfo) (additionalTokens int64) + storeReplicatedWorkAdmittedLocked(originalTokens int64, admittedInfo storeReplicatedWorkAdmittedInfo) (additionalTokens int64) } // cpuOverloadIndicator is meant to be an instantaneous indicator of cpu diff --git a/pkg/util/admission/admissionpb/admissionpb.go b/pkg/util/admission/admissionpb/admissionpb.go index 57dc9911080f..c230a5d11c38 100644 --- a/pkg/util/admission/admissionpb/admissionpb.go +++ b/pkg/util/admission/admissionpb/admissionpb.go @@ -53,7 +53,7 @@ func (w WorkPriority) SafeFormat(p redact.SafePrinter, verb rune) { p.Print(s) return } - p.Printf("custom-pri=%d", w) + p.Printf("custom-pri=%d", int8(w)) } // WorkPriorityDict is a mapping of the priorities to a short string name. The @@ -69,6 +69,17 @@ var WorkPriorityDict = map[WorkPriority]string{ HighPri: "high-pri", } +// TestingReverseWorkPriorityDict is the reverse-lookup dictionary for +// WorkPriorityDict, for use in tests. +var TestingReverseWorkPriorityDict map[string]WorkPriority + +func init() { + TestingReverseWorkPriorityDict = make(map[string]WorkPriority) + for k, v := range WorkPriorityDict { + TestingReverseWorkPriorityDict[v] = k + } +} + // WorkClass represents the class of work, which is defined entirely by its // WorkPriority. Namely, everything less than NormalPri is defined to be // "Elastic", while everything above and including NormalPri is considered diff --git a/pkg/util/admission/elastic_cpu_work_handle.go b/pkg/util/admission/elastic_cpu_work_handle.go index 85c5561304b5..42e60594419c 100644 --- a/pkg/util/admission/elastic_cpu_work_handle.go +++ b/pkg/util/admission/elastic_cpu_work_handle.go @@ -151,9 +151,8 @@ func TestingNewElasticCPUHandle() *ElasticCPUWorkHandle { return newElasticCPUWorkHandle(420 * time.Hour) // use a very high allotment } -// TestingNewElasticCPUWithCallback constructs an -// ElascticCPUWorkHandle with a testing override for the behaviour of -// OverLimit(). +// TestingNewElasticCPUHandleWithCallback constructs an ElasticCPUWorkHandle +// with a testing override for the behaviour of OverLimit(). func TestingNewElasticCPUHandleWithCallback(cb func() (bool, time.Duration)) *ElasticCPUWorkHandle { h := TestingNewElasticCPUHandle() h.testingOverrideOverLimit = cb diff --git a/pkg/util/admission/grant_coordinator.go b/pkg/util/admission/grant_coordinator.go index edf2b24b28e9..582bc5d6c7d0 100644 --- a/pkg/util/admission/grant_coordinator.go +++ b/pkg/util/admission/grant_coordinator.go @@ -61,6 +61,7 @@ type StoreGrantCoordinators struct { // api. numStores int pebbleMetricsProvider PebbleMetricsProvider + onLogEntryAdmitted OnLogEntryAdmitted closeCh chan struct{} disableTickerForTesting bool @@ -157,7 +158,7 @@ func (sgc *StoreGrantCoordinators) initGrantCoordinator(storeID roachpb.StoreID) // This is IO work, so override the usesTokens value. opts.usesTokens = true // TODO(sumeer): add per-store WorkQueue state for debug.zip and db console. - granters := [admissionpb.NumWorkClasses]granterWithStoreWriteDone{ + granters := [admissionpb.NumWorkClasses]granterWithStoreReplicatedWorkAdmitted{ &kvStoreTokenChildGranter{ workClass: admissionpb.RegularWorkClass, parent: kvg, @@ -168,7 +169,17 @@ func (sgc *StoreGrantCoordinators) initGrantCoordinator(storeID roachpb.StoreID) }, } - storeReq := sgc.makeStoreRequesterFunc(sgc.ambientCtx, storeID, granters, sgc.settings, sgc.workQueueMetrics, opts, nil) + storeReq := sgc.makeStoreRequesterFunc( + sgc.ambientCtx, + storeID, + granters, + sgc.settings, + sgc.workQueueMetrics, + opts, + nil, /* knobs */ + sgc.onLogEntryAdmitted, + &coord.mu.Mutex, + ) coord.queues[KVWork] = storeReq requesters := storeReq.getRequesters() kvg.regularRequester = requesters[admissionpb.RegularWorkClass] @@ -336,8 +347,9 @@ type makeRequesterFunc func( metrics *WorkQueueMetrics, opts workQueueOptions) requester type makeStoreRequesterFunc func( - _ log.AmbientContext, storeID roachpb.StoreID, granters [admissionpb.NumWorkClasses]granterWithStoreWriteDone, + _ log.AmbientContext, storeID roachpb.StoreID, granters [admissionpb.NumWorkClasses]granterWithStoreReplicatedWorkAdmitted, settings *cluster.Settings, metrics *WorkQueueMetrics, opts workQueueOptions, knobs *TestingKnobs, + onLogEntryAdmitted OnLogEntryAdmitted, coordMu *syncutil.Mutex, ) storeRequester // NewGrantCoordinators constructs GrantCoordinators and WorkQueues for a @@ -356,13 +368,17 @@ type makeStoreRequesterFunc func( // GrantCoordinators since they are not trying to control CPU usage, so we turn // off grant chaining in those coordinators. func NewGrantCoordinators( - ambientCtx log.AmbientContext, st *cluster.Settings, opts Options, registry *metric.Registry, + ambientCtx log.AmbientContext, + st *cluster.Settings, + opts Options, + registry *metric.Registry, + onLogEntryAdmitted OnLogEntryAdmitted, ) GrantCoordinators { metrics := makeGrantCoordinatorMetrics() registry.AddMetricStruct(metrics) return GrantCoordinators{ - Stores: makeStoresGrantCoordinators(ambientCtx, opts, st, metrics, registry), + Stores: makeStoresGrantCoordinators(ambientCtx, opts, st, metrics, registry, onLogEntryAdmitted), Regular: makeRegularGrantCoordinator(ambientCtx, opts, st, metrics, registry), Elastic: makeElasticGrantCoordinator(ambientCtx, st, registry), } @@ -399,6 +415,7 @@ func makeStoresGrantCoordinators( st *cluster.Settings, metrics GrantCoordinatorMetrics, registry *metric.Registry, + onLogEntryAdmitted OnLogEntryAdmitted, ) *StoreGrantCoordinators { // These metrics are shared across all stores and broken down by priority for // the common priorities. @@ -417,6 +434,7 @@ func makeStoresGrantCoordinators( makeStoreRequesterFunc: makeStoreRequester, kvIOTokensExhaustedDuration: metrics.KVIOTokensExhaustedDuration, workQueueMetrics: storeWorkQueueMetrics, + onLogEntryAdmitted: onLogEntryAdmitted, } return storeCoordinators } diff --git a/pkg/util/admission/granter.go b/pkg/util/admission/granter.go index 8787020d0886..a117e6141861 100644 --- a/pkg/util/admission/granter.go +++ b/pkg/util/admission/granter.go @@ -324,7 +324,7 @@ type kvStoreTokenChildGranter struct { parent *kvStoreTokenGranter } -var _ granterWithStoreWriteDone = &kvStoreTokenChildGranter{} +var _ granterWithStoreReplicatedWorkAdmitted = &kvStoreTokenChildGranter{} var _ granter = &kvStoreTokenChildGranter{} // grantKind implements granter. @@ -352,11 +352,23 @@ func (cg *kvStoreTokenChildGranter) continueGrantChain(grantChainID grantChainID // Ignore since grant chains are not used for store tokens. } -// storeWriteDone implements granterWithStoreWriteDone. +// storeWriteDone implements granterWithStoreReplicatedWorkAdmitted. func (cg *kvStoreTokenChildGranter) storeWriteDone( originalTokens int64, doneInfo StoreWorkDoneInfo, ) (additionalTokens int64) { - return cg.parent.storeWriteDone(cg.workClass, originalTokens, doneInfo) + cg.parent.coord.mu.Lock() + defer cg.parent.coord.mu.Unlock() + // NB: the token/metric adjustments we want to make here are the same as we + // want to make through the storeReplicatedWorkAdmittedLocked, so we (ab)use it. + return cg.parent.storeReplicatedWorkAdmittedLocked( + cg.workClass, originalTokens, storeReplicatedWorkAdmittedInfo(doneInfo)) +} + +// storeReplicatedWorkAdmitted implements granterWithStoreReplicatedWorkAdmitted. +func (cg *kvStoreTokenChildGranter) storeReplicatedWorkAdmittedLocked( + originalTokens int64, admittedInfo storeReplicatedWorkAdmittedInfo, +) (additionalTokens int64) { + return cg.parent.storeReplicatedWorkAdmittedLocked(cg.workClass, originalTokens, admittedInfo) } func (sg *kvStoreTokenGranter) tryGet(workClass admissionpb.WorkClass, count int64) bool { @@ -522,7 +534,7 @@ func (sg *kvStoreTokenGranter) getDiskTokensUsedAndReset() [admissionpb.NumWorkC } // setAdmittedModelsLocked implements granterWithIOTokens. -func (sg *kvStoreTokenGranter) setAdmittedDoneModels( +func (sg *kvStoreTokenGranter) setLinearModels( l0WriteLM tokensLinearModel, l0IngestLM tokensLinearModel, ingestLM tokensLinearModel, ) { sg.coord.mu.Lock() @@ -532,37 +544,21 @@ func (sg *kvStoreTokenGranter) setAdmittedDoneModels( sg.ingestLM = ingestLM } -// storeWriteDone implements granterWithStoreWriteDone. -func (sg *kvStoreTokenGranter) storeWriteDone( - wc admissionpb.WorkClass, originalTokens int64, doneInfo StoreWorkDoneInfo, +func (sg *kvStoreTokenGranter) storeReplicatedWorkAdmittedLocked( + wc admissionpb.WorkClass, originalTokens int64, admittedInfo storeReplicatedWorkAdmittedInfo, ) (additionalTokens int64) { - // Normally, we follow the structure of a foo() method calling into a foo() - // method on the GrantCoordinator, which then calls fooLocked() on the - // kvStoreTokenGranter. For example, returnGrant follows this structure. - // This allows the GrantCoordinator to do two things (a) acquire the mu - // before calling into kvStoreTokenGranter, (b) do side-effects, like - // terminating grant chains and doing more grants after the call into the - // fooLocked() method. - // For storeWriteDone we don't bother with this structure involving the - // GrantCoordinator (which has served us well across various methods and - // various granter implementations), since the decision on when the - // GrantCoordinator should call tryGrantLocked is more complicated. And since this - // storeWriteDone is unique to the kvStoreTokenGranter (and not implemented - // by other granters) this approach seems acceptable. - // Reminder: coord.mu protects the state in the kvStoreTokenGranter. - sg.coord.mu.Lock() exhaustedFunc := func() bool { return sg.coordMu.availableIOTokens <= 0 || (wc == admissionpb.ElasticWorkClass && sg.coordMu.elasticDiskBWTokensAvailable <= 0) } wasExhausted := exhaustedFunc() - actualL0WriteTokens := sg.l0WriteLM.applyLinearModel(doneInfo.WriteBytes) - actualL0IngestTokens := sg.l0IngestLM.applyLinearModel(doneInfo.IngestedBytes) + actualL0WriteTokens := sg.l0WriteLM.applyLinearModel(admittedInfo.WriteBytes) + actualL0IngestTokens := sg.l0IngestLM.applyLinearModel(admittedInfo.IngestedBytes) actualL0Tokens := actualL0WriteTokens + actualL0IngestTokens additionalL0TokensNeeded := actualL0Tokens - originalTokens sg.subtractTokensLocked(additionalL0TokensNeeded, false) - actualIngestTokens := sg.ingestLM.applyLinearModel(doneInfo.IngestedBytes) + actualIngestTokens := sg.ingestLM.applyLinearModel(admittedInfo.IngestedBytes) additionalDiskBWTokensNeeded := (actualL0WriteTokens + actualIngestTokens) - originalTokens if wc == admissionpb.ElasticWorkClass { sg.coordMu.elasticDiskBWTokensAvailable -= additionalDiskBWTokensNeeded @@ -574,7 +570,6 @@ func (sg *kvStoreTokenGranter) storeWriteDone( sg.coord.tryGrantLocked() } } - sg.coord.mu.Unlock() // For multi-tenant fairness accounting, we choose to ignore disk bandwidth // tokens. Ideally, we'd have multiple resource dimensions for the fairness // decisions, but we don't necessarily need something more sophisticated diff --git a/pkg/util/admission/granter_test.go b/pkg/util/admission/granter_test.go index c085950b12ce..9fc6555dfdd9 100644 --- a/pkg/util/admission/granter_test.go +++ b/pkg/util/admission/granter_test.go @@ -26,6 +26,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/metric" + "github.com/cockroachdb/cockroach/pkg/util/syncutil" "github.com/cockroachdb/datadriven" "github.com/cockroachdb/pebble" "github.com/stretchr/testify/require" @@ -97,7 +98,7 @@ func TestGranterBasic(t *testing.T) { return req } delayForGrantChainTermination = 0 - coords := NewGrantCoordinators(ambientCtx, settings, opts, registry) + coords := NewGrantCoordinators(ambientCtx, settings, opts, registry, &NoopOnLogEntryAdmitted{}) defer coords.Close() coord = coords.Regular return flushAndReset() @@ -109,8 +110,9 @@ func TestGranterBasic(t *testing.T) { storeCoordinators := &StoreGrantCoordinators{ settings: settings, makeStoreRequesterFunc: func( - ambientCtx log.AmbientContext, _ roachpb.StoreID, granters [admissionpb.NumWorkClasses]granterWithStoreWriteDone, + ambientCtx log.AmbientContext, _ roachpb.StoreID, granters [admissionpb.NumWorkClasses]granterWithStoreReplicatedWorkAdmitted, settings *cluster.Settings, metrics *WorkQueueMetrics, opts workQueueOptions, knobs *TestingKnobs, + _ OnLogEntryAdmitted, _ *syncutil.Mutex, ) storeRequester { makeTestRequester := func(wc admissionpb.WorkClass) *testRequester { req := &testRequester{ @@ -148,7 +150,7 @@ func TestGranterBasic(t *testing.T) { kvStoreGranter := coord.granters[KVWork].(*kvStoreTokenGranter) // Use the same model for all 3 kinds of models. tlm := tokensLinearModel{multiplier: 0.5, constant: 50} - kvStoreGranter.setAdmittedDoneModels(tlm, tlm, tlm) + kvStoreGranter.setLinearModels(tlm, tlm, tlm) return flushAndReset() case "set-has-waiting-requests": @@ -232,7 +234,7 @@ func TestGranterBasic(t *testing.T) { var origTokens, writeBytes int d.ScanArgs(t, "orig-tokens", &origTokens) d.ScanArgs(t, "write-bytes", &writeBytes) - requesters[scanWorkKind(t, d)].granter.(granterWithStoreWriteDone).storeWriteDone( + requesters[scanWorkKind(t, d)].granter.(granterWithStoreReplicatedWorkAdmitted).storeWriteDone( int64(origTokens), StoreWorkDoneInfo{WriteBytes: int64(writeBytes)}) coord.testingTryGrant() return flushAndReset() @@ -274,8 +276,8 @@ func TestStoreCoordinators(t *testing.T) { opts := Options{ makeRequesterFunc: makeRequesterFunc, makeStoreRequesterFunc: func( - ctx log.AmbientContext, _ roachpb.StoreID, granters [admissionpb.NumWorkClasses]granterWithStoreWriteDone, - settings *cluster.Settings, metrics *WorkQueueMetrics, opts workQueueOptions, _ *TestingKnobs) storeRequester { + ctx log.AmbientContext, _ roachpb.StoreID, granters [admissionpb.NumWorkClasses]granterWithStoreReplicatedWorkAdmitted, + settings *cluster.Settings, metrics *WorkQueueMetrics, opts workQueueOptions, _ *TestingKnobs, _ OnLogEntryAdmitted, _ *syncutil.Mutex) storeRequester { reqReg := makeRequesterFunc(ctx, KVWork, granters[admissionpb.RegularWorkClass], settings, metrics, opts) reqElastic := makeRequesterFunc(ctx, KVWork, granters[admissionpb.ElasticWorkClass], settings, metrics, opts) str := &storeTestRequester{} @@ -286,7 +288,7 @@ func TestStoreCoordinators(t *testing.T) { return str }, } - coords := NewGrantCoordinators(ambientCtx, settings, opts, registry) + coords := NewGrantCoordinators(ambientCtx, settings, opts, registry, &NoopOnLogEntryAdmitted{}) // There is only 1 KVWork requester at this point in initialization, for the // Regular GrantCoordinator. require.Equal(t, 1, len(requesters)) diff --git a/pkg/util/admission/io_load_listener.go b/pkg/util/admission/io_load_listener.go index f7b65d20ca21..badb05edeca9 100644 --- a/pkg/util/admission/io_load_listener.go +++ b/pkg/util/admission/io_load_listener.go @@ -417,7 +417,7 @@ func (io *ioLoadListener) adjustTokens(ctx context.Context, metrics StoreMetrics requestEstimates := io.perWorkTokenEstimator.getStoreRequestEstimatesAtAdmission() io.kvRequester.setStoreRequestEstimates(requestEstimates) l0WriteLM, l0IngestLM, ingestLM := io.perWorkTokenEstimator.getModelsAtAdmittedDone() - io.kvGranter.setAdmittedDoneModels(l0WriteLM, l0IngestLM, ingestLM) + io.kvGranter.setLinearModels(l0WriteLM, l0IngestLM, ingestLM) if _, overloaded := io.ioThreshold.Score(); overloaded || io.aux.doLogFlush || io.elasticDiskBWTokens != unlimitedTokens { log.Infof(ctx, "IO overload: %s", io.adjustTokensResult) diff --git a/pkg/util/admission/io_load_listener_test.go b/pkg/util/admission/io_load_listener_test.go index b10507d9cd7d..2f2bfc0cc425 100644 --- a/pkg/util/admission/io_load_listener_test.go +++ b/pkg/util/admission/io_load_listener_test.go @@ -370,7 +370,7 @@ func (g *testGranterWithIOTokens) getDiskTokensUsedAndReset() [admissionpb.NumWo return g.diskBandwidthTokensUsed } -func (g *testGranterWithIOTokens) setAdmittedDoneModels( +func (g *testGranterWithIOTokens) setLinearModels( l0WriteLM tokensLinearModel, l0IngestLM tokensLinearModel, ingestLM tokensLinearModel, ) { fmt.Fprintf(&g.buf, "setAdmittedDoneModelsLocked: l0-write-lm: ") @@ -409,7 +409,7 @@ func (g *testGranterNonNegativeTokens) getDiskTokensUsedAndReset() [admissionpb. return [admissionpb.NumWorkClasses]int64{} } -func (g *testGranterNonNegativeTokens) setAdmittedDoneModels( +func (g *testGranterNonNegativeTokens) setLinearModels( l0WriteLM tokensLinearModel, l0IngestLM tokensLinearModel, ingestLM tokensLinearModel, ) { require.LessOrEqual(g.t, 0.5, l0WriteLM.multiplier) diff --git a/pkg/util/admission/replicated_write_admission_test.go b/pkg/util/admission/replicated_write_admission_test.go index d25adb9895bb..4d1e368922dd 100644 --- a/pkg/util/admission/replicated_write_admission_test.go +++ b/pkg/util/admission/replicated_write_admission_test.go @@ -27,6 +27,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/metric" + "github.com/cockroachdb/cockroach/pkg/util/syncutil" "github.com/cockroachdb/cockroach/pkg/util/timeutil" "github.com/cockroachdb/cockroach/pkg/util/tracing" "github.com/cockroachdb/datadriven" @@ -112,14 +113,15 @@ func TestReplicatedWriteAdmission(t *testing.T) { printTrimmedBytes(originalTokens), rwi.RangeID, rwi.Origin, rwi.LogPosition, ingested) }, } + var mockCoordMu syncutil.Mutex storeWorkQueue = makeStoreWorkQueue( log.MakeTestingAmbientContext(tracing.NewTracer()), roachpb.StoreID(1), - [admissionpb.NumWorkClasses]granterWithStoreWriteDone{ + [admissionpb.NumWorkClasses]granterWithStoreReplicatedWorkAdmitted{ tg[admissionpb.RegularWorkClass], tg[admissionpb.ElasticWorkClass], }, - st, metrics, opts, knobs, + st, metrics, opts, knobs, &NoopOnLogEntryAdmitted{}, &mockCoordMu, ).(*StoreWorkQueue) tg[admissionpb.RegularWorkClass].r = storeWorkQueue.getRequesters()[admissionpb.RegularWorkClass] tg[admissionpb.ElasticWorkClass].r = storeWorkQueue.getRequesters()[admissionpb.ElasticWorkClass] @@ -137,7 +139,7 @@ func TestReplicatedWriteAdmission(t *testing.T) { // Parse pri=. d.ScanArgs(t, "pri", &arg) - pri, found := reverseWorkPriorityDict[arg] + pri, found := admissionpb.TestingReverseWorkPriorityDict[arg] require.True(t, found) // Parse size=. @@ -369,15 +371,6 @@ func printWorkQueue(q *WorkQueue) string { // create-time= is relative to this time. var tzero = timeutil.Unix(0, 0) -var reverseWorkPriorityDict map[string]admissionpb.WorkPriority - -func init() { - reverseWorkPriorityDict = make(map[string]admissionpb.WorkPriority) - for k, v := range admissionpb.WorkPriorityDict { - reverseWorkPriorityDict[v] = k - } -} - type testReplicatedWriteGranter struct { t *testing.T wc admissionpb.WorkClass @@ -387,7 +380,7 @@ type testReplicatedWriteGranter struct { tokens int64 } -var _ granterWithStoreWriteDone = &testReplicatedWriteGranter{} +var _ granterWithStoreReplicatedWorkAdmitted = &testReplicatedWriteGranter{} func newTestReplicatedWriteGranter( t *testing.T, wc admissionpb.WorkClass, buf *builderWithMu, @@ -445,3 +438,10 @@ func (tg *testReplicatedWriteGranter) storeWriteDone( tg.tokens -= originalTokens return 0 } + +func (tg *testReplicatedWriteGranter) storeReplicatedWorkAdmittedLocked( + originalTokens int64, admittedInfo storeReplicatedWorkAdmittedInfo, +) (additionalTokens int64) { + tg.tokens -= originalTokens + return 0 +} diff --git a/pkg/util/admission/store_token_estimation.go b/pkg/util/admission/store_token_estimation.go index 2e2eca842b8e..c1f92ff1b69f 100644 --- a/pkg/util/admission/store_token_estimation.go +++ b/pkg/util/admission/store_token_estimation.go @@ -12,6 +12,12 @@ package admission import "github.com/cockroachdb/pebble" +// TODO(irfansharif): This comment is a bit stale with replication admission +// control where admission is asynchronous. AC is informed of the write when +// it's being physically done, so we know its size then. We don't need upfront +// estimates anymore. The AdmittedWorkDone interface and surrounding types +// (StoreWorkDoneInfo for ex.) are no longer central. +// // The logic in this file deals with token estimation for a store write in two // situations: (a) at admission time, (b) when the admitted work is done. At // (a) we have no information provided about the work size (NB: this choice is diff --git a/pkg/util/admission/work_queue.go b/pkg/util/admission/work_queue.go index 07f035664a6e..c63ec70ffa59 100644 --- a/pkg/util/admission/work_queue.go +++ b/pkg/util/admission/work_queue.go @@ -633,6 +633,7 @@ func (q *WorkQueue) Admit(ctx context.Context, info WorkInfo) (enabled bool, err info.ReplicatedWorkInfo, info.RequestedCount, info.CreateTime, + false, /* coordMuLocked */ ) } return true, nil @@ -679,6 +680,7 @@ func (q *WorkQueue) Admit(ctx context.Context, info WorkInfo) (enabled bool, err } } } + // Check for cancellation. startTime := q.timeNow() if ctx.Err() != nil { @@ -861,6 +863,7 @@ func (q *WorkQueue) granted(grantChainID grantChainID) int64 { item.replicated, item.requestedCount, item.createTime, + true, /* coordMuLocked */ ) q.metrics.incAdmitted(item.priority) @@ -1672,7 +1675,7 @@ func (m *WorkQueueMetrics) getOrCreate(priority admissionpb.WorkPriority) workQu // necessary to call LoadOrStore here as this could be called concurrently. // It is not called the first Load so that we don't have to unnecessarily // create the metrics. - statPrefix := fmt.Sprintf("%v.%v", m.name, admissionpb.WorkPriorityDict[priority]) + statPrefix := fmt.Sprintf("%v.%v", m.name, priority.String()) val, ok = m.byPriority.LoadOrStore(priority, makeWorkQueueMetricsSingle(statPrefix)) if !ok { m.registry.AddMetricStruct(val) @@ -1766,9 +1769,10 @@ type StoreWriteWorkInfo struct { type StoreWorkQueue struct { storeID roachpb.StoreID q [admissionpb.NumWorkClasses]WorkQueue - // Only calls storeWriteDone. The rest of the interface is used by + // Only calls storeReplicatedWorkAdmittedLocked. The rest of the interface is used by // WorkQueue. - granters [admissionpb.NumWorkClasses]granterWithStoreWriteDone + granters [admissionpb.NumWorkClasses]granterWithStoreReplicatedWorkAdmitted + coordMu *syncutil.Mutex mu struct { syncutil.RWMutex // estimates is used to determine how many tokens are deducted at-admit @@ -1788,6 +1792,7 @@ type StoreWorkQueue struct { stopCh chan struct{} timeSource timeutil.TimeSource settings *cluster.Settings + onLogEntryAdmitted OnLogEntryAdmitted knobs *TestingKnobs } @@ -1883,6 +1888,20 @@ type StoreWorkDoneInfo struct { IngestedBytes int64 } +// storeReplicatedWorkAdmittedInfo provides information about the size of +// replicated work once it's admitted (which happens asynchronously from the +// work itself). This lets us use the underlying linear models for L0 +// {writes,ingests} to deduct an appropriate number of tokens from the granter, +// for the admitted work size. +// +// TODO(irfansharif): This post-admission adjustment of tokens is odd -- when +// the replicated work is being enqueued, we already know its size, so we could +// have applied the linear models upfront and determine what the right # of +// tokens to deduct all at once. We're doing it this way because we've written +// the WorkQueue and granter interactions to be very general, but it can be hard +// to follow. See review discussions over at #97599. +type storeReplicatedWorkAdmittedInfo StoreWorkDoneInfo + type onAdmittedReplicatedWork interface { admittedReplicatedWork( tenantID roachpb.TenantID, @@ -1890,7 +1909,10 @@ type onAdmittedReplicatedWork interface { rwi ReplicatedWorkInfo, requestedTokens int64, createTime int64, + coordMuLocked bool, ) + + // TODO(irfansharif): This coordMuLocked parameter is gross. } var _ onAdmittedReplicatedWork = &StoreWorkQueue{} @@ -1903,6 +1925,7 @@ func (q *StoreWorkQueue) admittedReplicatedWork( rwi ReplicatedWorkInfo, originalTokens int64, createTime int64, // only used in tests + coordMuLocked bool, ) { if !rwi.Enabled { panic("unexpected call to admittedReplicatedWork for work that's not a replicated write") @@ -1911,11 +1934,11 @@ func (q *StoreWorkQueue) admittedReplicatedWork( fn(tenantID, pri, rwi, originalTokens, createTime) } - var storeWorkDoneInfo StoreWorkDoneInfo + var replicatedWorkAdmittedInfo storeReplicatedWorkAdmittedInfo if rwi.Ingested { - storeWorkDoneInfo.IngestedBytes = originalTokens + replicatedWorkAdmittedInfo.IngestedBytes = originalTokens } else { - storeWorkDoneInfo.WriteBytes = originalTokens + replicatedWorkAdmittedInfo.WriteBytes = originalTokens } // We've already used RequestedCount for replicated writes to deduct tokens @@ -1925,19 +1948,47 @@ func (q *StoreWorkQueue) admittedReplicatedWork( // underlying linear models, and we may have under-deducted -- we account // for this below. wc := admissionpb.WorkClassFromPri(pri) - additionalTokensNeeded := q.granters[wc].storeWriteDone(originalTokens, storeWorkDoneInfo) + if !coordMuLocked { + q.coordMu.Lock() + } + additionalTokensNeeded := q.granters[wc].storeReplicatedWorkAdmittedLocked(originalTokens, replicatedWorkAdmittedInfo) + if !coordMuLocked { + q.coordMu.Unlock() + } q.q[wc].adjustTenantTokens(tenantID, additionalTokensNeeded) + // Inform callers of the entry we just admitted. + q.onLogEntryAdmitted.AdmittedLogEntry( + rwi.Origin, + pri, + q.storeID, + rwi.RangeID, + rwi.LogPosition, + ) +} - // TODO(irfansharif): Dispatch flow token returns here. We want to - // inform (a) the origin node of writes at (b) a given priority, to - // (c) the given range, at (d) the given log position on (e) the - // local store. Part of #95563. - // - _ = rwi.Origin // (a) - _ = pri // (b) - _ = rwi.RangeID // (c) - _ = rwi.LogPosition // (d) - _ = q.storeID // (e) +// OnLogEntryAdmitted is used to observe the specific entries (identified by +// rangeID + log position) that were admitted. Since admission control for log +// entries is asynchronous/non-blocking, this allows callers to do requisite +// post-admission bookkeeping. +type OnLogEntryAdmitted interface { + AdmittedLogEntry( + origin roachpb.NodeID, /* node where the entry originated */ + pri admissionpb.WorkPriority, /* admission priority of the entry */ + storeID roachpb.StoreID, /* store on which the entry was admitted */ + rangeID roachpb.RangeID, /* identifying range for the log entry */ + pos LogPosition, /* log position of the entry that was admitted*/ + ) +} + +// NoopOnLogEntryAdmitted is a no-op implementation of the OnLogEntryAdmitted +// interface. +type NoopOnLogEntryAdmitted struct{} + +var _ OnLogEntryAdmitted = &NoopOnLogEntryAdmitted{} + +func (n *NoopOnLogEntryAdmitted) AdmittedLogEntry( + roachpb.NodeID, admissionpb.WorkPriority, roachpb.StoreID, roachpb.RangeID, LogPosition, +) { } // AdmittedWorkDone indicates to the queue that the admitted work has completed. @@ -2026,11 +2077,13 @@ func (q *StoreWorkQueue) setStoreRequestEstimates(estimates storeRequestEstimate func makeStoreWorkQueue( ambientCtx log.AmbientContext, storeID roachpb.StoreID, - granters [admissionpb.NumWorkClasses]granterWithStoreWriteDone, + granters [admissionpb.NumWorkClasses]granterWithStoreReplicatedWorkAdmitted, settings *cluster.Settings, metrics *WorkQueueMetrics, opts workQueueOptions, knobs *TestingKnobs, + onLogEntryAdmitted OnLogEntryAdmitted, + coordMu *syncutil.Mutex, ) storeRequester { if knobs == nil { knobs = &TestingKnobs{} @@ -2039,12 +2092,14 @@ func makeStoreWorkQueue( opts.timeSource = timeutil.DefaultTimeSource{} } q := &StoreWorkQueue{ - storeID: storeID, - granters: granters, - knobs: knobs, - stopCh: make(chan struct{}), - timeSource: opts.timeSource, - settings: settings, + coordMu: coordMu, + storeID: storeID, + granters: granters, + knobs: knobs, + stopCh: make(chan struct{}), + timeSource: opts.timeSource, + settings: settings, + onLogEntryAdmitted: onLogEntryAdmitted, } opts.usesAsyncAdmit = true diff --git a/pkg/util/admission/work_queue_test.go b/pkg/util/admission/work_queue_test.go index f4902930be32..66da1387e973 100644 --- a/pkg/util/admission/work_queue_test.go +++ b/pkg/util/admission/work_queue_test.go @@ -67,7 +67,7 @@ type testGranter struct { additionalTokens int64 } -var _ granterWithStoreWriteDone = &testGranter{} +var _ granterWithStoreReplicatedWorkAdmitted = &testGranter{} func (tg *testGranter) grantKind() grantKind { return tg.gk @@ -110,6 +110,14 @@ func (tg *testGranter) storeWriteDone( return tg.additionalTokens } +func (tg *testGranter) storeReplicatedWorkAdmittedLocked( + originalTokens int64, admittedInfo storeReplicatedWorkAdmittedInfo, +) (additionalTokens int64) { + tg.buf.printf("storeReplicatedWorkAdmittedLocked%s: originalTokens %d, admittedBytes(write %d,ingested %d) returning %d", + tg.name, originalTokens, admittedInfo.WriteBytes, admittedInfo.IngestedBytes, tg.additionalTokens) + return tg.additionalTokens +} + type testWork struct { tenantID roachpb.TenantID cancel context.CancelFunc @@ -522,9 +530,13 @@ func TestStoreWorkQueueBasic(t *testing.T) { opts.timeSource = timeutil.NewManualTime(timeutil.FromUnixMicros(0)) opts.disableEpochClosingGoroutine = true st = cluster.MakeTestingClusterSettings() + var mockCoordMu syncutil.Mutex q = makeStoreWorkQueue(log.MakeTestingAmbientContext(tracing.NewTracer()), roachpb.StoreID(1), - [admissionpb.NumWorkClasses]granterWithStoreWriteDone{tg[admissionpb.RegularWorkClass], tg[admissionpb.ElasticWorkClass]}, - st, metrics, opts, nil /* testing knobs */).(*StoreWorkQueue) + [admissionpb.NumWorkClasses]granterWithStoreReplicatedWorkAdmitted{ + tg[admissionpb.RegularWorkClass], + tg[admissionpb.ElasticWorkClass], + }, + st, metrics, opts, nil /* testing knobs */, &NoopOnLogEntryAdmitted{}, &mockCoordMu).(*StoreWorkQueue) tg[admissionpb.RegularWorkClass].r = q.getRequesters()[admissionpb.RegularWorkClass] tg[admissionpb.ElasticWorkClass].r = q.getRequesters()[admissionpb.ElasticWorkClass] wrkMap.resetMap()