From cec84fb2195d050950eea215067145746dfdb131 Mon Sep 17 00:00:00 2001 From: irfan sharif Date: Thu, 6 Apr 2023 17:49:32 -0400 Subject: [PATCH] admission: add intercept points for when replicated work gets admitted In a subsequent commit, when integrating kvflowcontrol into the critical path for replication traffic, we'll set up the return of flow tokens from the receiver node back to the sender once log entries get (asynchronously) admitted[^1]. So we need to intercept the exact points at which the virtually enqueued work items get admitted, since it all happens asynchronously[^2]. To that end we introduce the following interface: // OnLogEntryAdmitted is used to observe the specific entries // (identified by rangeID + log position) that were admitted. Since // admission control for log entries is asynchronous/non-blocking, // this allows callers to do requisite post-admission // bookkeeping. type OnLogEntryAdmitted interface { AdmittedLogEntry( origin roachpb.NodeID, /* node where the entry originated */ pri admissionpb.WorkPriority, /* admission priority of the entry */ storeID roachpb.StoreID, /* store on which the entry was admitted */ rangeID roachpb.RangeID, /* identifying range for the log entry */ pos LogPosition, /* log position of the entry that was admitted*/ ) } For now we pass in a no-op implementation in production code, but this will change shortly. Seeing as how the asynchronous admit interface is going to be the primary once once we enable replication admission control by default, for IO control, we no longer need the storeWriteDone interfaces and corresponding types. It's being used by our current (and soon-to-be legacy) above-raft IO admission control to inform granters of when the write was actually done, post-admission. For above-raft IO control, at admit-time we do not have sizing info for the writes, so by intercepting these writes at write-done time we're able to make any outstanding token adjustments in the granter. To reflect this new world, we: - Rename setAdmittedDoneModels to setLinearModels. - Introduce a storeReplicatedWorkAdmittedInfo[^3]. It provides information about the size of replicated work once it's admitted (which happens asynchronously from the work itself). This lets us use the underlying linear models for L0 {writes,ingests} to deduct an appropriate number of tokens from the granter, for the admitted work size[^4]. - Rename the granterWithStoreWriteDone interface to granterWithStoreReplicatedWorkAdmitted. We'll still intercept the actual point of admission for some token adjustments, through the the storeReplicatedWorkAdmittedLocked API shown below. There are two callstacks through which this API gets invoked, one where the coord.mu is already held, and one where it isn't. We plumb this information through so the lock is acquired if not already held. The locking structure is unfortunate, but this was a minimally invasive diff. storeReplicatedWorkAdmittedLocked( originalTokens int64, admittedInfo storeReplicatedWorkAdmittedInfo, ) (additionalTokens int64) While here, we also export an admission.TestingReverseWorkPriorityDict. There are at least three tests that have re-invented the wheel. [^1]: This will happen through the kvflowcontrol.Dispatch interface introduced back in #97766, after integrating it with the RaftTransport layer. [^2]: Introduced in #97599, for replicated write work. [^3]: Identical to the previous StoreWorkDoneInfo. [^4]: There's a peculiarity here in that at enqueuing-time we actually know the size of the write, so we could have deducted the right number of tokens upfront and avoid this post-admit granter token adjustment. We inherit this structure from earlier, and just leave a TODO for now. Release note: None --- .../kvflowdispatch/kvflowdispatch_test.go | 7 +- .../kvflowtokentracker/tracker_test.go | 15 +-- pkg/server/server.go | 2 +- pkg/util/admission/admission.go | 37 ++++--- pkg/util/admission/admissionpb/admissionpb.go | 13 ++- pkg/util/admission/elastic_cpu_work_handle.go | 5 +- pkg/util/admission/grant_coordinator.go | 28 ++++- pkg/util/admission/granter.go | 47 ++++---- pkg/util/admission/granter_test.go | 16 +-- pkg/util/admission/io_load_listener.go | 2 +- pkg/util/admission/io_load_listener_test.go | 4 +- .../replicated_write_admission_test.go | 26 ++--- pkg/util/admission/store_token_estimation.go | 6 + pkg/util/admission/work_queue.go | 103 ++++++++++++++---- pkg/util/admission/work_queue_test.go | 18 ++- 15 files changed, 210 insertions(+), 119 deletions(-) diff --git a/pkg/kv/kvserver/kvflowcontrol/kvflowdispatch/kvflowdispatch_test.go b/pkg/kv/kvserver/kvflowcontrol/kvflowdispatch/kvflowdispatch_test.go index 5caf3f67343a..6610f902ba15 100644 --- a/pkg/kv/kvserver/kvflowcontrol/kvflowdispatch/kvflowdispatch_test.go +++ b/pkg/kv/kvserver/kvflowcontrol/kvflowdispatch/kvflowdispatch_test.go @@ -31,11 +31,6 @@ func TestDispatch(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) - reverseWorkPriorityDict := make(map[string]admissionpb.WorkPriority) - for k, v := range admissionpb.WorkPriorityDict { - reverseWorkPriorityDict[v] = k - } - datadriven.Walk(t, datapathutils.TestDataPath(t), func(t *testing.T, path string) { var dispatch *Dispatch datadriven.RunTest(t, path, func(t *testing.T, d *datadriven.TestData) string { @@ -82,7 +77,7 @@ func TestDispatch(t *testing.T) { case strings.HasPrefix(parts[i], "pri="): // Parse pri=. - pri, found := reverseWorkPriorityDict[arg] + pri, found := admissionpb.TestingReverseWorkPriorityDict[arg] require.True(t, found) entries.AdmissionPriority = int32(pri) diff --git a/pkg/kv/kvserver/kvflowcontrol/kvflowtokentracker/tracker_test.go b/pkg/kv/kvserver/kvflowcontrol/kvflowtokentracker/tracker_test.go index 858bda480797..7793501e0b75 100644 --- a/pkg/kv/kvserver/kvflowcontrol/kvflowtokentracker/tracker_test.go +++ b/pkg/kv/kvserver/kvflowcontrol/kvflowtokentracker/tracker_test.go @@ -32,19 +32,10 @@ func TestTracker(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) - reverseWorkPriorityDict := make(map[string]admissionpb.WorkPriority) - for k, v := range admissionpb.WorkPriorityDict { - reverseWorkPriorityDict[v] = k - } - ctx := context.Background() datadriven.Walk(t, datapathutils.TestDataPath(t), func(t *testing.T, path string) { var tracker *Tracker - knobs := &kvflowcontrol.TestingKnobs{ - UntrackTokensInterceptor: func(tokens kvflowcontrol.Tokens, pos kvflowcontrolpb.RaftLogPosition) { - - }, - } + knobs := &kvflowcontrol.TestingKnobs{} datadriven.RunTest(t, path, func(t *testing.T, d *datadriven.TestData) string { switch d.Cmd { case "init": @@ -73,7 +64,7 @@ func TestTracker(t *testing.T) { switch { case strings.HasPrefix(parts[i], "pri="): var found bool - pri, found = reverseWorkPriorityDict[arg] + pri, found = admissionpb.TestingReverseWorkPriorityDict[arg] require.True(t, found) case strings.HasPrefix(parts[i], "tokens="): @@ -103,7 +94,7 @@ func TestTracker(t *testing.T) { var priStr, logPositionStr string d.ScanArgs(t, "pri", &priStr) d.ScanArgs(t, "up-to-log-position", &logPositionStr) - pri, found := reverseWorkPriorityDict[priStr] + pri, found := admissionpb.TestingReverseWorkPriorityDict[priStr] require.True(t, found) logPosition := parseLogPosition(t, logPositionStr) diff --git a/pkg/server/server.go b/pkg/server/server.go index bd640e6d4940..797efa152566 100644 --- a/pkg/server/server.go +++ b/pkg/server/server.go @@ -269,7 +269,7 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) { if opts, ok := cfg.TestingKnobs.AdmissionControl.(*admission.Options); ok { admissionOptions.Override(opts) } - gcoords := admission.NewGrantCoordinators(cfg.AmbientCtx, st, admissionOptions, registry) + gcoords := admission.NewGrantCoordinators(cfg.AmbientCtx, st, admissionOptions, registry, &admission.NoopOnLogEntryAdmitted{}) engines, err := cfg.CreateEngines(ctx) if err != nil { diff --git a/pkg/util/admission/admission.go b/pkg/util/admission/admission.go index e59d16927952..4e285c42ee3d 100644 --- a/pkg/util/admission/admission.go +++ b/pkg/util/admission/admission.go @@ -178,7 +178,7 @@ type granter interface { // is a possibility that that raced with cancellation. // // Do not use this for doing store IO-related token adjustments when work is - // done -- that should be done via granterWithStoreWriteDone.storeWriteDone. + // done -- that should be done via granterWithStoreReplicatedWorkAdmitted.storeWriteDone. // // REQUIRES: count > 0. count == 1 for slots. returnGrant(count int64) @@ -195,7 +195,7 @@ type granter interface { // work turned out to be an underestimate. // // Do not use this for doing store IO-related token adjustments when work is - // done -- that should be done via granterWithStoreWriteDone.storeWriteDone. + // done -- that should be done via granterWithStoreReplicatedWorkAdmitted.storeWriteDone. // // REQUIRES: count > 0. count == 1 for slots. tookWithoutPermission(count int64) @@ -274,23 +274,30 @@ type granterWithIOTokens interface { // getDiskTokensUsedAndReset returns the disk bandwidth tokens used // since the last such call. getDiskTokensUsedAndReset() [admissionpb.NumWorkClasses]int64 - // setAdmittedDoneModelsLocked supplies the models to use when - // storeWriteDone is called, to adjust token consumption. Note that these - // models are not used for token adjustment at admission time -- that is - // handled by StoreWorkQueue and is not in scope of this granter. This - // asymmetry is due to the need to use all the functionality of WorkQueue at - // admission time. See the long explanatory comment at the beginning of - // store_token_estimation.go, regarding token estimation. - setAdmittedDoneModels(l0WriteLM tokensLinearModel, l0IngestLM tokensLinearModel, - ingestLM tokensLinearModel) + // setLinearModels supplies the models to use when storeWriteDone or + // storeReplicatedWorkAdmittedLocked is called, to adjust token consumption. + // Note that these models are not used for token adjustment at admission + // time -- that is handled by StoreWorkQueue and is not in scope of this + // granter. This asymmetry is due to the need to use all the functionality + // of WorkQueue at admission time. See the long explanatory comment at the + // beginning of store_token_estimation.go, regarding token estimation. + setLinearModels(l0WriteLM, l0IngestLM, ingestLM tokensLinearModel) } -// granterWithStoreWriteDone is used to abstract kvStoreTokenGranter for -// testing. The interface is used by StoreWorkQueue to pass on sizing -// information provided when the work was completed. -type granterWithStoreWriteDone interface { +// granterWithStoreReplicatedWorkAdmitted is used to abstract +// kvStoreTokenGranter for testing. The interface is used by StoreWorkQueue to +// pass on sizing information provided when the work is either done (for legacy, +// above-raft IO admission) or admitted (for below-raft, asynchronous admission +// control. +type granterWithStoreReplicatedWorkAdmitted interface { granter + // storeWriteDone is used by legacy, above-raft IO admission control to + // inform granters of when the write was actually done, post-admission. At + // admit-time we did now have sizing info for these writes, so by + // intercepting these writes at admit time we're able to make any + // outstanding token adjustments in the granter. storeWriteDone(originalTokens int64, doneInfo StoreWorkDoneInfo) (additionalTokens int64) + storeReplicatedWorkAdmittedLocked(originalTokens int64, admittedInfo storeReplicatedWorkAdmittedInfo) (additionalTokens int64) } // cpuOverloadIndicator is meant to be an instantaneous indicator of cpu diff --git a/pkg/util/admission/admissionpb/admissionpb.go b/pkg/util/admission/admissionpb/admissionpb.go index 57dc9911080f..c230a5d11c38 100644 --- a/pkg/util/admission/admissionpb/admissionpb.go +++ b/pkg/util/admission/admissionpb/admissionpb.go @@ -53,7 +53,7 @@ func (w WorkPriority) SafeFormat(p redact.SafePrinter, verb rune) { p.Print(s) return } - p.Printf("custom-pri=%d", w) + p.Printf("custom-pri=%d", int8(w)) } // WorkPriorityDict is a mapping of the priorities to a short string name. The @@ -69,6 +69,17 @@ var WorkPriorityDict = map[WorkPriority]string{ HighPri: "high-pri", } +// TestingReverseWorkPriorityDict is the reverse-lookup dictionary for +// WorkPriorityDict, for use in tests. +var TestingReverseWorkPriorityDict map[string]WorkPriority + +func init() { + TestingReverseWorkPriorityDict = make(map[string]WorkPriority) + for k, v := range WorkPriorityDict { + TestingReverseWorkPriorityDict[v] = k + } +} + // WorkClass represents the class of work, which is defined entirely by its // WorkPriority. Namely, everything less than NormalPri is defined to be // "Elastic", while everything above and including NormalPri is considered diff --git a/pkg/util/admission/elastic_cpu_work_handle.go b/pkg/util/admission/elastic_cpu_work_handle.go index 85c5561304b5..42e60594419c 100644 --- a/pkg/util/admission/elastic_cpu_work_handle.go +++ b/pkg/util/admission/elastic_cpu_work_handle.go @@ -151,9 +151,8 @@ func TestingNewElasticCPUHandle() *ElasticCPUWorkHandle { return newElasticCPUWorkHandle(420 * time.Hour) // use a very high allotment } -// TestingNewElasticCPUWithCallback constructs an -// ElascticCPUWorkHandle with a testing override for the behaviour of -// OverLimit(). +// TestingNewElasticCPUHandleWithCallback constructs an ElasticCPUWorkHandle +// with a testing override for the behaviour of OverLimit(). func TestingNewElasticCPUHandleWithCallback(cb func() (bool, time.Duration)) *ElasticCPUWorkHandle { h := TestingNewElasticCPUHandle() h.testingOverrideOverLimit = cb diff --git a/pkg/util/admission/grant_coordinator.go b/pkg/util/admission/grant_coordinator.go index edf2b24b28e9..582bc5d6c7d0 100644 --- a/pkg/util/admission/grant_coordinator.go +++ b/pkg/util/admission/grant_coordinator.go @@ -61,6 +61,7 @@ type StoreGrantCoordinators struct { // api. numStores int pebbleMetricsProvider PebbleMetricsProvider + onLogEntryAdmitted OnLogEntryAdmitted closeCh chan struct{} disableTickerForTesting bool @@ -157,7 +158,7 @@ func (sgc *StoreGrantCoordinators) initGrantCoordinator(storeID roachpb.StoreID) // This is IO work, so override the usesTokens value. opts.usesTokens = true // TODO(sumeer): add per-store WorkQueue state for debug.zip and db console. - granters := [admissionpb.NumWorkClasses]granterWithStoreWriteDone{ + granters := [admissionpb.NumWorkClasses]granterWithStoreReplicatedWorkAdmitted{ &kvStoreTokenChildGranter{ workClass: admissionpb.RegularWorkClass, parent: kvg, @@ -168,7 +169,17 @@ func (sgc *StoreGrantCoordinators) initGrantCoordinator(storeID roachpb.StoreID) }, } - storeReq := sgc.makeStoreRequesterFunc(sgc.ambientCtx, storeID, granters, sgc.settings, sgc.workQueueMetrics, opts, nil) + storeReq := sgc.makeStoreRequesterFunc( + sgc.ambientCtx, + storeID, + granters, + sgc.settings, + sgc.workQueueMetrics, + opts, + nil, /* knobs */ + sgc.onLogEntryAdmitted, + &coord.mu.Mutex, + ) coord.queues[KVWork] = storeReq requesters := storeReq.getRequesters() kvg.regularRequester = requesters[admissionpb.RegularWorkClass] @@ -336,8 +347,9 @@ type makeRequesterFunc func( metrics *WorkQueueMetrics, opts workQueueOptions) requester type makeStoreRequesterFunc func( - _ log.AmbientContext, storeID roachpb.StoreID, granters [admissionpb.NumWorkClasses]granterWithStoreWriteDone, + _ log.AmbientContext, storeID roachpb.StoreID, granters [admissionpb.NumWorkClasses]granterWithStoreReplicatedWorkAdmitted, settings *cluster.Settings, metrics *WorkQueueMetrics, opts workQueueOptions, knobs *TestingKnobs, + onLogEntryAdmitted OnLogEntryAdmitted, coordMu *syncutil.Mutex, ) storeRequester // NewGrantCoordinators constructs GrantCoordinators and WorkQueues for a @@ -356,13 +368,17 @@ type makeStoreRequesterFunc func( // GrantCoordinators since they are not trying to control CPU usage, so we turn // off grant chaining in those coordinators. func NewGrantCoordinators( - ambientCtx log.AmbientContext, st *cluster.Settings, opts Options, registry *metric.Registry, + ambientCtx log.AmbientContext, + st *cluster.Settings, + opts Options, + registry *metric.Registry, + onLogEntryAdmitted OnLogEntryAdmitted, ) GrantCoordinators { metrics := makeGrantCoordinatorMetrics() registry.AddMetricStruct(metrics) return GrantCoordinators{ - Stores: makeStoresGrantCoordinators(ambientCtx, opts, st, metrics, registry), + Stores: makeStoresGrantCoordinators(ambientCtx, opts, st, metrics, registry, onLogEntryAdmitted), Regular: makeRegularGrantCoordinator(ambientCtx, opts, st, metrics, registry), Elastic: makeElasticGrantCoordinator(ambientCtx, st, registry), } @@ -399,6 +415,7 @@ func makeStoresGrantCoordinators( st *cluster.Settings, metrics GrantCoordinatorMetrics, registry *metric.Registry, + onLogEntryAdmitted OnLogEntryAdmitted, ) *StoreGrantCoordinators { // These metrics are shared across all stores and broken down by priority for // the common priorities. @@ -417,6 +434,7 @@ func makeStoresGrantCoordinators( makeStoreRequesterFunc: makeStoreRequester, kvIOTokensExhaustedDuration: metrics.KVIOTokensExhaustedDuration, workQueueMetrics: storeWorkQueueMetrics, + onLogEntryAdmitted: onLogEntryAdmitted, } return storeCoordinators } diff --git a/pkg/util/admission/granter.go b/pkg/util/admission/granter.go index 8787020d0886..a117e6141861 100644 --- a/pkg/util/admission/granter.go +++ b/pkg/util/admission/granter.go @@ -324,7 +324,7 @@ type kvStoreTokenChildGranter struct { parent *kvStoreTokenGranter } -var _ granterWithStoreWriteDone = &kvStoreTokenChildGranter{} +var _ granterWithStoreReplicatedWorkAdmitted = &kvStoreTokenChildGranter{} var _ granter = &kvStoreTokenChildGranter{} // grantKind implements granter. @@ -352,11 +352,23 @@ func (cg *kvStoreTokenChildGranter) continueGrantChain(grantChainID grantChainID // Ignore since grant chains are not used for store tokens. } -// storeWriteDone implements granterWithStoreWriteDone. +// storeWriteDone implements granterWithStoreReplicatedWorkAdmitted. func (cg *kvStoreTokenChildGranter) storeWriteDone( originalTokens int64, doneInfo StoreWorkDoneInfo, ) (additionalTokens int64) { - return cg.parent.storeWriteDone(cg.workClass, originalTokens, doneInfo) + cg.parent.coord.mu.Lock() + defer cg.parent.coord.mu.Unlock() + // NB: the token/metric adjustments we want to make here are the same as we + // want to make through the storeReplicatedWorkAdmittedLocked, so we (ab)use it. + return cg.parent.storeReplicatedWorkAdmittedLocked( + cg.workClass, originalTokens, storeReplicatedWorkAdmittedInfo(doneInfo)) +} + +// storeReplicatedWorkAdmitted implements granterWithStoreReplicatedWorkAdmitted. +func (cg *kvStoreTokenChildGranter) storeReplicatedWorkAdmittedLocked( + originalTokens int64, admittedInfo storeReplicatedWorkAdmittedInfo, +) (additionalTokens int64) { + return cg.parent.storeReplicatedWorkAdmittedLocked(cg.workClass, originalTokens, admittedInfo) } func (sg *kvStoreTokenGranter) tryGet(workClass admissionpb.WorkClass, count int64) bool { @@ -522,7 +534,7 @@ func (sg *kvStoreTokenGranter) getDiskTokensUsedAndReset() [admissionpb.NumWorkC } // setAdmittedModelsLocked implements granterWithIOTokens. -func (sg *kvStoreTokenGranter) setAdmittedDoneModels( +func (sg *kvStoreTokenGranter) setLinearModels( l0WriteLM tokensLinearModel, l0IngestLM tokensLinearModel, ingestLM tokensLinearModel, ) { sg.coord.mu.Lock() @@ -532,37 +544,21 @@ func (sg *kvStoreTokenGranter) setAdmittedDoneModels( sg.ingestLM = ingestLM } -// storeWriteDone implements granterWithStoreWriteDone. -func (sg *kvStoreTokenGranter) storeWriteDone( - wc admissionpb.WorkClass, originalTokens int64, doneInfo StoreWorkDoneInfo, +func (sg *kvStoreTokenGranter) storeReplicatedWorkAdmittedLocked( + wc admissionpb.WorkClass, originalTokens int64, admittedInfo storeReplicatedWorkAdmittedInfo, ) (additionalTokens int64) { - // Normally, we follow the structure of a foo() method calling into a foo() - // method on the GrantCoordinator, which then calls fooLocked() on the - // kvStoreTokenGranter. For example, returnGrant follows this structure. - // This allows the GrantCoordinator to do two things (a) acquire the mu - // before calling into kvStoreTokenGranter, (b) do side-effects, like - // terminating grant chains and doing more grants after the call into the - // fooLocked() method. - // For storeWriteDone we don't bother with this structure involving the - // GrantCoordinator (which has served us well across various methods and - // various granter implementations), since the decision on when the - // GrantCoordinator should call tryGrantLocked is more complicated. And since this - // storeWriteDone is unique to the kvStoreTokenGranter (and not implemented - // by other granters) this approach seems acceptable. - // Reminder: coord.mu protects the state in the kvStoreTokenGranter. - sg.coord.mu.Lock() exhaustedFunc := func() bool { return sg.coordMu.availableIOTokens <= 0 || (wc == admissionpb.ElasticWorkClass && sg.coordMu.elasticDiskBWTokensAvailable <= 0) } wasExhausted := exhaustedFunc() - actualL0WriteTokens := sg.l0WriteLM.applyLinearModel(doneInfo.WriteBytes) - actualL0IngestTokens := sg.l0IngestLM.applyLinearModel(doneInfo.IngestedBytes) + actualL0WriteTokens := sg.l0WriteLM.applyLinearModel(admittedInfo.WriteBytes) + actualL0IngestTokens := sg.l0IngestLM.applyLinearModel(admittedInfo.IngestedBytes) actualL0Tokens := actualL0WriteTokens + actualL0IngestTokens additionalL0TokensNeeded := actualL0Tokens - originalTokens sg.subtractTokensLocked(additionalL0TokensNeeded, false) - actualIngestTokens := sg.ingestLM.applyLinearModel(doneInfo.IngestedBytes) + actualIngestTokens := sg.ingestLM.applyLinearModel(admittedInfo.IngestedBytes) additionalDiskBWTokensNeeded := (actualL0WriteTokens + actualIngestTokens) - originalTokens if wc == admissionpb.ElasticWorkClass { sg.coordMu.elasticDiskBWTokensAvailable -= additionalDiskBWTokensNeeded @@ -574,7 +570,6 @@ func (sg *kvStoreTokenGranter) storeWriteDone( sg.coord.tryGrantLocked() } } - sg.coord.mu.Unlock() // For multi-tenant fairness accounting, we choose to ignore disk bandwidth // tokens. Ideally, we'd have multiple resource dimensions for the fairness // decisions, but we don't necessarily need something more sophisticated diff --git a/pkg/util/admission/granter_test.go b/pkg/util/admission/granter_test.go index c085950b12ce..9fc6555dfdd9 100644 --- a/pkg/util/admission/granter_test.go +++ b/pkg/util/admission/granter_test.go @@ -26,6 +26,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/metric" + "github.com/cockroachdb/cockroach/pkg/util/syncutil" "github.com/cockroachdb/datadriven" "github.com/cockroachdb/pebble" "github.com/stretchr/testify/require" @@ -97,7 +98,7 @@ func TestGranterBasic(t *testing.T) { return req } delayForGrantChainTermination = 0 - coords := NewGrantCoordinators(ambientCtx, settings, opts, registry) + coords := NewGrantCoordinators(ambientCtx, settings, opts, registry, &NoopOnLogEntryAdmitted{}) defer coords.Close() coord = coords.Regular return flushAndReset() @@ -109,8 +110,9 @@ func TestGranterBasic(t *testing.T) { storeCoordinators := &StoreGrantCoordinators{ settings: settings, makeStoreRequesterFunc: func( - ambientCtx log.AmbientContext, _ roachpb.StoreID, granters [admissionpb.NumWorkClasses]granterWithStoreWriteDone, + ambientCtx log.AmbientContext, _ roachpb.StoreID, granters [admissionpb.NumWorkClasses]granterWithStoreReplicatedWorkAdmitted, settings *cluster.Settings, metrics *WorkQueueMetrics, opts workQueueOptions, knobs *TestingKnobs, + _ OnLogEntryAdmitted, _ *syncutil.Mutex, ) storeRequester { makeTestRequester := func(wc admissionpb.WorkClass) *testRequester { req := &testRequester{ @@ -148,7 +150,7 @@ func TestGranterBasic(t *testing.T) { kvStoreGranter := coord.granters[KVWork].(*kvStoreTokenGranter) // Use the same model for all 3 kinds of models. tlm := tokensLinearModel{multiplier: 0.5, constant: 50} - kvStoreGranter.setAdmittedDoneModels(tlm, tlm, tlm) + kvStoreGranter.setLinearModels(tlm, tlm, tlm) return flushAndReset() case "set-has-waiting-requests": @@ -232,7 +234,7 @@ func TestGranterBasic(t *testing.T) { var origTokens, writeBytes int d.ScanArgs(t, "orig-tokens", &origTokens) d.ScanArgs(t, "write-bytes", &writeBytes) - requesters[scanWorkKind(t, d)].granter.(granterWithStoreWriteDone).storeWriteDone( + requesters[scanWorkKind(t, d)].granter.(granterWithStoreReplicatedWorkAdmitted).storeWriteDone( int64(origTokens), StoreWorkDoneInfo{WriteBytes: int64(writeBytes)}) coord.testingTryGrant() return flushAndReset() @@ -274,8 +276,8 @@ func TestStoreCoordinators(t *testing.T) { opts := Options{ makeRequesterFunc: makeRequesterFunc, makeStoreRequesterFunc: func( - ctx log.AmbientContext, _ roachpb.StoreID, granters [admissionpb.NumWorkClasses]granterWithStoreWriteDone, - settings *cluster.Settings, metrics *WorkQueueMetrics, opts workQueueOptions, _ *TestingKnobs) storeRequester { + ctx log.AmbientContext, _ roachpb.StoreID, granters [admissionpb.NumWorkClasses]granterWithStoreReplicatedWorkAdmitted, + settings *cluster.Settings, metrics *WorkQueueMetrics, opts workQueueOptions, _ *TestingKnobs, _ OnLogEntryAdmitted, _ *syncutil.Mutex) storeRequester { reqReg := makeRequesterFunc(ctx, KVWork, granters[admissionpb.RegularWorkClass], settings, metrics, opts) reqElastic := makeRequesterFunc(ctx, KVWork, granters[admissionpb.ElasticWorkClass], settings, metrics, opts) str := &storeTestRequester{} @@ -286,7 +288,7 @@ func TestStoreCoordinators(t *testing.T) { return str }, } - coords := NewGrantCoordinators(ambientCtx, settings, opts, registry) + coords := NewGrantCoordinators(ambientCtx, settings, opts, registry, &NoopOnLogEntryAdmitted{}) // There is only 1 KVWork requester at this point in initialization, for the // Regular GrantCoordinator. require.Equal(t, 1, len(requesters)) diff --git a/pkg/util/admission/io_load_listener.go b/pkg/util/admission/io_load_listener.go index f7b65d20ca21..badb05edeca9 100644 --- a/pkg/util/admission/io_load_listener.go +++ b/pkg/util/admission/io_load_listener.go @@ -417,7 +417,7 @@ func (io *ioLoadListener) adjustTokens(ctx context.Context, metrics StoreMetrics requestEstimates := io.perWorkTokenEstimator.getStoreRequestEstimatesAtAdmission() io.kvRequester.setStoreRequestEstimates(requestEstimates) l0WriteLM, l0IngestLM, ingestLM := io.perWorkTokenEstimator.getModelsAtAdmittedDone() - io.kvGranter.setAdmittedDoneModels(l0WriteLM, l0IngestLM, ingestLM) + io.kvGranter.setLinearModels(l0WriteLM, l0IngestLM, ingestLM) if _, overloaded := io.ioThreshold.Score(); overloaded || io.aux.doLogFlush || io.elasticDiskBWTokens != unlimitedTokens { log.Infof(ctx, "IO overload: %s", io.adjustTokensResult) diff --git a/pkg/util/admission/io_load_listener_test.go b/pkg/util/admission/io_load_listener_test.go index b10507d9cd7d..2f2bfc0cc425 100644 --- a/pkg/util/admission/io_load_listener_test.go +++ b/pkg/util/admission/io_load_listener_test.go @@ -370,7 +370,7 @@ func (g *testGranterWithIOTokens) getDiskTokensUsedAndReset() [admissionpb.NumWo return g.diskBandwidthTokensUsed } -func (g *testGranterWithIOTokens) setAdmittedDoneModels( +func (g *testGranterWithIOTokens) setLinearModels( l0WriteLM tokensLinearModel, l0IngestLM tokensLinearModel, ingestLM tokensLinearModel, ) { fmt.Fprintf(&g.buf, "setAdmittedDoneModelsLocked: l0-write-lm: ") @@ -409,7 +409,7 @@ func (g *testGranterNonNegativeTokens) getDiskTokensUsedAndReset() [admissionpb. return [admissionpb.NumWorkClasses]int64{} } -func (g *testGranterNonNegativeTokens) setAdmittedDoneModels( +func (g *testGranterNonNegativeTokens) setLinearModels( l0WriteLM tokensLinearModel, l0IngestLM tokensLinearModel, ingestLM tokensLinearModel, ) { require.LessOrEqual(g.t, 0.5, l0WriteLM.multiplier) diff --git a/pkg/util/admission/replicated_write_admission_test.go b/pkg/util/admission/replicated_write_admission_test.go index d25adb9895bb..4d1e368922dd 100644 --- a/pkg/util/admission/replicated_write_admission_test.go +++ b/pkg/util/admission/replicated_write_admission_test.go @@ -27,6 +27,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/metric" + "github.com/cockroachdb/cockroach/pkg/util/syncutil" "github.com/cockroachdb/cockroach/pkg/util/timeutil" "github.com/cockroachdb/cockroach/pkg/util/tracing" "github.com/cockroachdb/datadriven" @@ -112,14 +113,15 @@ func TestReplicatedWriteAdmission(t *testing.T) { printTrimmedBytes(originalTokens), rwi.RangeID, rwi.Origin, rwi.LogPosition, ingested) }, } + var mockCoordMu syncutil.Mutex storeWorkQueue = makeStoreWorkQueue( log.MakeTestingAmbientContext(tracing.NewTracer()), roachpb.StoreID(1), - [admissionpb.NumWorkClasses]granterWithStoreWriteDone{ + [admissionpb.NumWorkClasses]granterWithStoreReplicatedWorkAdmitted{ tg[admissionpb.RegularWorkClass], tg[admissionpb.ElasticWorkClass], }, - st, metrics, opts, knobs, + st, metrics, opts, knobs, &NoopOnLogEntryAdmitted{}, &mockCoordMu, ).(*StoreWorkQueue) tg[admissionpb.RegularWorkClass].r = storeWorkQueue.getRequesters()[admissionpb.RegularWorkClass] tg[admissionpb.ElasticWorkClass].r = storeWorkQueue.getRequesters()[admissionpb.ElasticWorkClass] @@ -137,7 +139,7 @@ func TestReplicatedWriteAdmission(t *testing.T) { // Parse pri=. d.ScanArgs(t, "pri", &arg) - pri, found := reverseWorkPriorityDict[arg] + pri, found := admissionpb.TestingReverseWorkPriorityDict[arg] require.True(t, found) // Parse size=. @@ -369,15 +371,6 @@ func printWorkQueue(q *WorkQueue) string { // create-time= is relative to this time. var tzero = timeutil.Unix(0, 0) -var reverseWorkPriorityDict map[string]admissionpb.WorkPriority - -func init() { - reverseWorkPriorityDict = make(map[string]admissionpb.WorkPriority) - for k, v := range admissionpb.WorkPriorityDict { - reverseWorkPriorityDict[v] = k - } -} - type testReplicatedWriteGranter struct { t *testing.T wc admissionpb.WorkClass @@ -387,7 +380,7 @@ type testReplicatedWriteGranter struct { tokens int64 } -var _ granterWithStoreWriteDone = &testReplicatedWriteGranter{} +var _ granterWithStoreReplicatedWorkAdmitted = &testReplicatedWriteGranter{} func newTestReplicatedWriteGranter( t *testing.T, wc admissionpb.WorkClass, buf *builderWithMu, @@ -445,3 +438,10 @@ func (tg *testReplicatedWriteGranter) storeWriteDone( tg.tokens -= originalTokens return 0 } + +func (tg *testReplicatedWriteGranter) storeReplicatedWorkAdmittedLocked( + originalTokens int64, admittedInfo storeReplicatedWorkAdmittedInfo, +) (additionalTokens int64) { + tg.tokens -= originalTokens + return 0 +} diff --git a/pkg/util/admission/store_token_estimation.go b/pkg/util/admission/store_token_estimation.go index 2e2eca842b8e..c1f92ff1b69f 100644 --- a/pkg/util/admission/store_token_estimation.go +++ b/pkg/util/admission/store_token_estimation.go @@ -12,6 +12,12 @@ package admission import "github.com/cockroachdb/pebble" +// TODO(irfansharif): This comment is a bit stale with replication admission +// control where admission is asynchronous. AC is informed of the write when +// it's being physically done, so we know its size then. We don't need upfront +// estimates anymore. The AdmittedWorkDone interface and surrounding types +// (StoreWorkDoneInfo for ex.) are no longer central. +// // The logic in this file deals with token estimation for a store write in two // situations: (a) at admission time, (b) when the admitted work is done. At // (a) we have no information provided about the work size (NB: this choice is diff --git a/pkg/util/admission/work_queue.go b/pkg/util/admission/work_queue.go index 07f035664a6e..c63ec70ffa59 100644 --- a/pkg/util/admission/work_queue.go +++ b/pkg/util/admission/work_queue.go @@ -633,6 +633,7 @@ func (q *WorkQueue) Admit(ctx context.Context, info WorkInfo) (enabled bool, err info.ReplicatedWorkInfo, info.RequestedCount, info.CreateTime, + false, /* coordMuLocked */ ) } return true, nil @@ -679,6 +680,7 @@ func (q *WorkQueue) Admit(ctx context.Context, info WorkInfo) (enabled bool, err } } } + // Check for cancellation. startTime := q.timeNow() if ctx.Err() != nil { @@ -861,6 +863,7 @@ func (q *WorkQueue) granted(grantChainID grantChainID) int64 { item.replicated, item.requestedCount, item.createTime, + true, /* coordMuLocked */ ) q.metrics.incAdmitted(item.priority) @@ -1672,7 +1675,7 @@ func (m *WorkQueueMetrics) getOrCreate(priority admissionpb.WorkPriority) workQu // necessary to call LoadOrStore here as this could be called concurrently. // It is not called the first Load so that we don't have to unnecessarily // create the metrics. - statPrefix := fmt.Sprintf("%v.%v", m.name, admissionpb.WorkPriorityDict[priority]) + statPrefix := fmt.Sprintf("%v.%v", m.name, priority.String()) val, ok = m.byPriority.LoadOrStore(priority, makeWorkQueueMetricsSingle(statPrefix)) if !ok { m.registry.AddMetricStruct(val) @@ -1766,9 +1769,10 @@ type StoreWriteWorkInfo struct { type StoreWorkQueue struct { storeID roachpb.StoreID q [admissionpb.NumWorkClasses]WorkQueue - // Only calls storeWriteDone. The rest of the interface is used by + // Only calls storeReplicatedWorkAdmittedLocked. The rest of the interface is used by // WorkQueue. - granters [admissionpb.NumWorkClasses]granterWithStoreWriteDone + granters [admissionpb.NumWorkClasses]granterWithStoreReplicatedWorkAdmitted + coordMu *syncutil.Mutex mu struct { syncutil.RWMutex // estimates is used to determine how many tokens are deducted at-admit @@ -1788,6 +1792,7 @@ type StoreWorkQueue struct { stopCh chan struct{} timeSource timeutil.TimeSource settings *cluster.Settings + onLogEntryAdmitted OnLogEntryAdmitted knobs *TestingKnobs } @@ -1883,6 +1888,20 @@ type StoreWorkDoneInfo struct { IngestedBytes int64 } +// storeReplicatedWorkAdmittedInfo provides information about the size of +// replicated work once it's admitted (which happens asynchronously from the +// work itself). This lets us use the underlying linear models for L0 +// {writes,ingests} to deduct an appropriate number of tokens from the granter, +// for the admitted work size. +// +// TODO(irfansharif): This post-admission adjustment of tokens is odd -- when +// the replicated work is being enqueued, we already know its size, so we could +// have applied the linear models upfront and determine what the right # of +// tokens to deduct all at once. We're doing it this way because we've written +// the WorkQueue and granter interactions to be very general, but it can be hard +// to follow. See review discussions over at #97599. +type storeReplicatedWorkAdmittedInfo StoreWorkDoneInfo + type onAdmittedReplicatedWork interface { admittedReplicatedWork( tenantID roachpb.TenantID, @@ -1890,7 +1909,10 @@ type onAdmittedReplicatedWork interface { rwi ReplicatedWorkInfo, requestedTokens int64, createTime int64, + coordMuLocked bool, ) + + // TODO(irfansharif): This coordMuLocked parameter is gross. } var _ onAdmittedReplicatedWork = &StoreWorkQueue{} @@ -1903,6 +1925,7 @@ func (q *StoreWorkQueue) admittedReplicatedWork( rwi ReplicatedWorkInfo, originalTokens int64, createTime int64, // only used in tests + coordMuLocked bool, ) { if !rwi.Enabled { panic("unexpected call to admittedReplicatedWork for work that's not a replicated write") @@ -1911,11 +1934,11 @@ func (q *StoreWorkQueue) admittedReplicatedWork( fn(tenantID, pri, rwi, originalTokens, createTime) } - var storeWorkDoneInfo StoreWorkDoneInfo + var replicatedWorkAdmittedInfo storeReplicatedWorkAdmittedInfo if rwi.Ingested { - storeWorkDoneInfo.IngestedBytes = originalTokens + replicatedWorkAdmittedInfo.IngestedBytes = originalTokens } else { - storeWorkDoneInfo.WriteBytes = originalTokens + replicatedWorkAdmittedInfo.WriteBytes = originalTokens } // We've already used RequestedCount for replicated writes to deduct tokens @@ -1925,19 +1948,47 @@ func (q *StoreWorkQueue) admittedReplicatedWork( // underlying linear models, and we may have under-deducted -- we account // for this below. wc := admissionpb.WorkClassFromPri(pri) - additionalTokensNeeded := q.granters[wc].storeWriteDone(originalTokens, storeWorkDoneInfo) + if !coordMuLocked { + q.coordMu.Lock() + } + additionalTokensNeeded := q.granters[wc].storeReplicatedWorkAdmittedLocked(originalTokens, replicatedWorkAdmittedInfo) + if !coordMuLocked { + q.coordMu.Unlock() + } q.q[wc].adjustTenantTokens(tenantID, additionalTokensNeeded) + // Inform callers of the entry we just admitted. + q.onLogEntryAdmitted.AdmittedLogEntry( + rwi.Origin, + pri, + q.storeID, + rwi.RangeID, + rwi.LogPosition, + ) +} - // TODO(irfansharif): Dispatch flow token returns here. We want to - // inform (a) the origin node of writes at (b) a given priority, to - // (c) the given range, at (d) the given log position on (e) the - // local store. Part of #95563. - // - _ = rwi.Origin // (a) - _ = pri // (b) - _ = rwi.RangeID // (c) - _ = rwi.LogPosition // (d) - _ = q.storeID // (e) +// OnLogEntryAdmitted is used to observe the specific entries (identified by +// rangeID + log position) that were admitted. Since admission control for log +// entries is asynchronous/non-blocking, this allows callers to do requisite +// post-admission bookkeeping. +type OnLogEntryAdmitted interface { + AdmittedLogEntry( + origin roachpb.NodeID, /* node where the entry originated */ + pri admissionpb.WorkPriority, /* admission priority of the entry */ + storeID roachpb.StoreID, /* store on which the entry was admitted */ + rangeID roachpb.RangeID, /* identifying range for the log entry */ + pos LogPosition, /* log position of the entry that was admitted*/ + ) +} + +// NoopOnLogEntryAdmitted is a no-op implementation of the OnLogEntryAdmitted +// interface. +type NoopOnLogEntryAdmitted struct{} + +var _ OnLogEntryAdmitted = &NoopOnLogEntryAdmitted{} + +func (n *NoopOnLogEntryAdmitted) AdmittedLogEntry( + roachpb.NodeID, admissionpb.WorkPriority, roachpb.StoreID, roachpb.RangeID, LogPosition, +) { } // AdmittedWorkDone indicates to the queue that the admitted work has completed. @@ -2026,11 +2077,13 @@ func (q *StoreWorkQueue) setStoreRequestEstimates(estimates storeRequestEstimate func makeStoreWorkQueue( ambientCtx log.AmbientContext, storeID roachpb.StoreID, - granters [admissionpb.NumWorkClasses]granterWithStoreWriteDone, + granters [admissionpb.NumWorkClasses]granterWithStoreReplicatedWorkAdmitted, settings *cluster.Settings, metrics *WorkQueueMetrics, opts workQueueOptions, knobs *TestingKnobs, + onLogEntryAdmitted OnLogEntryAdmitted, + coordMu *syncutil.Mutex, ) storeRequester { if knobs == nil { knobs = &TestingKnobs{} @@ -2039,12 +2092,14 @@ func makeStoreWorkQueue( opts.timeSource = timeutil.DefaultTimeSource{} } q := &StoreWorkQueue{ - storeID: storeID, - granters: granters, - knobs: knobs, - stopCh: make(chan struct{}), - timeSource: opts.timeSource, - settings: settings, + coordMu: coordMu, + storeID: storeID, + granters: granters, + knobs: knobs, + stopCh: make(chan struct{}), + timeSource: opts.timeSource, + settings: settings, + onLogEntryAdmitted: onLogEntryAdmitted, } opts.usesAsyncAdmit = true diff --git a/pkg/util/admission/work_queue_test.go b/pkg/util/admission/work_queue_test.go index f4902930be32..66da1387e973 100644 --- a/pkg/util/admission/work_queue_test.go +++ b/pkg/util/admission/work_queue_test.go @@ -67,7 +67,7 @@ type testGranter struct { additionalTokens int64 } -var _ granterWithStoreWriteDone = &testGranter{} +var _ granterWithStoreReplicatedWorkAdmitted = &testGranter{} func (tg *testGranter) grantKind() grantKind { return tg.gk @@ -110,6 +110,14 @@ func (tg *testGranter) storeWriteDone( return tg.additionalTokens } +func (tg *testGranter) storeReplicatedWorkAdmittedLocked( + originalTokens int64, admittedInfo storeReplicatedWorkAdmittedInfo, +) (additionalTokens int64) { + tg.buf.printf("storeReplicatedWorkAdmittedLocked%s: originalTokens %d, admittedBytes(write %d,ingested %d) returning %d", + tg.name, originalTokens, admittedInfo.WriteBytes, admittedInfo.IngestedBytes, tg.additionalTokens) + return tg.additionalTokens +} + type testWork struct { tenantID roachpb.TenantID cancel context.CancelFunc @@ -522,9 +530,13 @@ func TestStoreWorkQueueBasic(t *testing.T) { opts.timeSource = timeutil.NewManualTime(timeutil.FromUnixMicros(0)) opts.disableEpochClosingGoroutine = true st = cluster.MakeTestingClusterSettings() + var mockCoordMu syncutil.Mutex q = makeStoreWorkQueue(log.MakeTestingAmbientContext(tracing.NewTracer()), roachpb.StoreID(1), - [admissionpb.NumWorkClasses]granterWithStoreWriteDone{tg[admissionpb.RegularWorkClass], tg[admissionpb.ElasticWorkClass]}, - st, metrics, opts, nil /* testing knobs */).(*StoreWorkQueue) + [admissionpb.NumWorkClasses]granterWithStoreReplicatedWorkAdmitted{ + tg[admissionpb.RegularWorkClass], + tg[admissionpb.ElasticWorkClass], + }, + st, metrics, opts, nil /* testing knobs */, &NoopOnLogEntryAdmitted{}, &mockCoordMu).(*StoreWorkQueue) tg[admissionpb.RegularWorkClass].r = q.getRequesters()[admissionpb.RegularWorkClass] tg[admissionpb.ElasticWorkClass].r = q.getRequesters()[admissionpb.ElasticWorkClass] wrkMap.resetMap()