diff --git a/pkg/kv/kvserver/kvadmission/BUILD.bazel b/pkg/kv/kvserver/kvadmission/BUILD.bazel index 189dfd08f6cf..c0446fd040ed 100644 --- a/pkg/kv/kvserver/kvadmission/BUILD.bazel +++ b/pkg/kv/kvserver/kvadmission/BUILD.bazel @@ -8,6 +8,7 @@ go_library( visibility = ["//visibility:public"], deps = [ "//pkg/kv/kvpb", + "//pkg/kv/kvserver/raftlog", "//pkg/roachpb", "//pkg/settings", "//pkg/settings/cluster", diff --git a/pkg/kv/kvserver/kvadmission/kvadmission.go b/pkg/kv/kvserver/kvadmission/kvadmission.go index d67e6b7d509a..def2d71c9507 100644 --- a/pkg/kv/kvserver/kvadmission/kvadmission.go +++ b/pkg/kv/kvserver/kvadmission/kvadmission.go @@ -19,6 +19,7 @@ import ( "time" "github.com/cockroachdb/cockroach/pkg/kv/kvpb" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/raftlog" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/settings" "github.com/cockroachdb/cockroach/pkg/settings/cluster" @@ -120,7 +121,7 @@ type Controller interface { FollowerStoreWriteBytes(roachpb.StoreID, FollowerStoreWriteBytes) // AdmitRaftEntry informs admission control of a raft log entry being // written to storage. - AdmitRaftEntry(roachpb.TenantID, roachpb.StoreID, roachpb.RangeID, raftpb.Entry) + AdmitRaftEntry(context.Context, roachpb.TenantID, roachpb.StoreID, roachpb.RangeID, raftpb.Entry) } // TenantWeightProvider can be periodically asked to provide the tenant @@ -244,7 +245,7 @@ func (n *controllerImpl) AdmitKVWork( if err != nil { return Handle{}, err } - admissionEnabled = storeWorkHandle.AdmissionEnabled() + admissionEnabled = storeWorkHandle.UseAdmittedWorkDone() if admissionEnabled { defer func() { if retErr != nil { @@ -409,9 +410,63 @@ func (n *controllerImpl) FollowerStoreWriteBytes( // AdmitRaftEntry implements the Controller interface. 
func (n *controllerImpl) AdmitRaftEntry( - roachpb.TenantID, roachpb.StoreID, roachpb.RangeID, raftpb.Entry, + ctx context.Context, + tenantID roachpb.TenantID, + storeID roachpb.StoreID, + rangeID roachpb.RangeID, + entry raftpb.Entry, ) { - panic("unimplemented") + typ, err := raftlog.EncodingOf(entry) + if err != nil { + log.Errorf(ctx, "unable to determine raft command encoding: %v", err) + return + } + if !typ.UsesAdmissionControl() { + return // nothing to do + } + meta, err := raftlog.DecodeRaftAdmissionMeta(entry.Data) + if err != nil { + log.Errorf(ctx, "unable to decode raft command admission data: %v", err) + return + } + + storeAdmissionQ := n.storeGrantCoords.TryGetQueueForStore(int32(storeID)) + if storeAdmissionQ == nil { + log.Errorf(ctx, "unable to find queue for store: %s", storeID) + return // nothing to do + } + + if len(entry.Data) == 0 { + log.Fatal(ctx, "found (unexpected) empty raft command for below-raft admission") + } + wi := admission.WorkInfo{ + TenantID: tenantID, + Priority: admissionpb.WorkPriority(meta.AdmissionPriority), + CreateTime: meta.AdmissionCreateTime, + BypassAdmission: false, + RequestedCount: int64(len(entry.Data)), + } + wi.ReplicatedWorkInfo = admission.ReplicatedWorkInfo{ + Enabled: true, + RangeID: rangeID, + Origin: meta.AdmissionOriginNode, + LogPosition: admission.LogPosition{ + Term: entry.Term, + Index: entry.Index, + }, + Ingested: typ.IsSideloaded(), + } + + handle, err := storeAdmissionQ.Admit(ctx, admission.StoreWriteWorkInfo{ + WorkInfo: wi, + }) + if err != nil { + log.Errorf(ctx, "error while admitting to store admission queue: %v", err) + return + } + if handle.UseAdmittedWorkDone() { + log.Fatalf(ctx, "unexpected handle.UseAdmittedWorkDone") + } } // FollowerStoreWriteBytes captures stats about writes done to a store by a diff --git a/pkg/kv/kvserver/kvflowcontrol/doc.go b/pkg/kv/kvserver/kvflowcontrol/doc.go index 60abec7421fe..2e12a6a478f5 100644 --- a/pkg/kv/kvserver/kvflowcontrol/doc.go +++ 
b/pkg/kv/kvserver/kvflowcontrol/doc.go @@ -327,6 +327,105 @@ package kvflowcontrol // it can transition into the mode described in I3a where we deduct/block for // flow tokens for subsequent quorum writes. // +// I12. How does this interact with epoch-LIFO? Or CreateTime ordering +// generally? +// - Background: Epoch-LIFO tries to reduce lower percentile admission queueing +// delay (at the expense of higher percentile delay) by switching from the +// standard CreateTime-based FIFO ordering to LIFO-within-an-epoch. Work +// submitted by a transaction with CreateTime T is slotted into the epoch +// number with time interval [E, E+100ms) where T is contained in this +// interval. The work will start executing after the epoch is closed, at +// ~E+100ms. This increases transaction latency by at most ~100ms. When the +// epoch closes, all nodes start executing the transactions in that epoch in +// LIFO order, and will implicitly have the same set of competing +// transactions[^10], a set that stays unchanged until the next epoch closes. +// And by the time the next epoch closes, and the current epoch's transactions +// are deprioritized, 100ms will have elapsed, which is selected to be long +// enough for most of these now-admitted transactions to have finished their work. +// - We switch to LIFO-within-an-epoch once we start observing that the +// maximum queuing delay for work within a <tenant, priority> starts +// exceeding the ~100ms we'd add with epoch-LIFO. +// - The idea is we have bottleneck resources that cause delays without +// bound if work keeps accumulating, and other kinds of bottlenecks +// where delays aren't increasing without bound. We're also relying on work +// bottlenecked on epoch-LIFO not being able to issue more work. +// - For below-raft work queue ordering, we effectively ignore CreateTime when +// ordering work. Within a given <tenant, priority, range>, admission takes +// place in raft log order (i.e. entries with lower terms get admitted first, +// or lower indexes within the same term). 
This simplifies token returns which +// happen by specifying a prefix up to which we want to release flow tokens +// for a given priority[^11]. +// - NB: Regarding "admission takes place in raft log order", we could +// implement this differently. We introduced log-position based ordering to +// simplify the implementation of token returns where we release tokens by +// specifying the log position up-to-which we want to release held +// tokens[^12]. But with additional tracking in the below-raft work queues, +// if we know that work W2 with log position L2 got admitted, and +// corresponded to F flow token deductions at the origin, and we also know +// that work W1 with log position L1 is currently queued, also corresponding +// to F flow token deductions at the origin, we could inform the origin node +// to return flow tokens up to L1 and still get what we want -- a return of +// F flow tokens when each work gets admitted. +// - We're effectively ignoring CreateTime since flow token deductions at the +// sender aren't "tied" to CreateTime in the way they're tied to the issuing +// tenant or the work class. So while we still want priority-based ordering +// to release regular flow tokens before elastic ones, releasing flow tokens +// for work with lower CreateTimes does not actually promote doing older +// work first since the physical work below-raft is already done before +// (asynchronous) admission, and the token returns don't unblock work from +// some given epoch. Below-raft ordering based on CreateTime is moot. +// - Note that for WorkQueue orderings, we have (i) fair sharing through +// tenantID+weight, (ii) strict prioritization, and (iii) sequencing of work +// within a <tenant, priority>, using CreateTime. 
For below-raft work +// queue ordering where we want to admit in roughly log position order, we +// then (ab)use the CreateTime sequencing by combining each work's true +// CreateTime with its log position[^13], to get a monotonic "sequence number" +// that tracks observed log positions. This sequence number is kept close to +// the maximum observed CreateTime within a replication stream, which also +// lets us generate cluster-wide FIFO ordering as follows. +// - We re-assign CreateTime in a way that, with a high probability, matches +// log position order. We can be forgetful about this tracking (for internal +// GC reasons) since at worst we might over-admit slightly. +// - To operate within cluster-wide FIFO ordering, we want to order by +// "true" CreateTime when comparing work across different ranges. Writes for +// a single range, as observed by a given store below-raft (follower or +// otherwise) travel along a single stream. Consider the case where a single +// store S3 receives replication traffic for two ranges R1 and R2, +// originating from two separate nodes N1 and N2. If N1 is issuing writes +// with strictly older CreateTimes, when returning flow tokens we should +// prefer N1. By keeping these sequence numbers close to "true" CreateTimes, +// we'd be favoring N1 without introducing bias for replication streams with +// shorter/longer raft logs[^12][^14]. +// - We could improve cluster-wide FIFO properties by introducing a +// WorkQueue-like data structure that simply orders by CreateTime when +// acquiring flow tokens above raft. +// - Could we then use this above-raft ordering to implement epoch-LIFO? +// Cluster-wide we want to admit work within a given epoch, which here +// entails issuing replication traffic for work slotted in a given epoch. +// - Fan-in effects to consider for epoch-LIFO, assuming below-raft orderings +// are as described above (i.e. log-position based). 
+// - When there's no epoch-LIFO happening, we have cluster-wide FIFO +// ordering within a <tenant, priority> pair as described above. +// - When epoch-LIFO is happening across all senders issuing replication +// traffic to a given receiver store, we're seeing work within the same +// epoch, but we'll be returning flow tokens to nodes issuing work with +// older CreateTimes. So we're defeating the LIFO in epoch-LIFO, but we're still +// completing work slotted into some given epoch. +// - When epoch-LIFO is happening only on a subset of the senders issuing +// replication traffic, we'll again be returning flow tokens to nodes +// issuing work with older CreateTimes. This is undefined. +// - It's strange that different nodes can admit work from "different +// epochs"[^10]. What are we to do below-raft, when deciding where to +// return flow tokens back to, since it's all for the same +// <tenant, priority>? Maybe we need to pass down whether the work was +// admitted at the sender with LIFO/FIFO, and return flow tokens in +// {LIFO,FIFO} order across all nodes that issued {LIFO,FIFO} work? Or +// also pass down the enqueuing timestamp, so we have a good sense +// below-raft on whether this work is past the epoch expiration and +// should be deprioritized. +// - Because the fan-in effects of epoch-LIFO are not well understood (by this +// author at least), we just disable it below-raft. +// // --- // // [^1]: kvserverpb.RaftMessageRequest is the unit of what's sent @@ -373,6 +472,20 @@ package kvflowcontrol // machine application get significantly behind due to local scheduling // reasons by using the same goroutine to do both async raft log writes // and state machine application. +// [^10]: This relies on partially synchronized clocks without requiring +// explicit coordination. +// - Isn't the decision to start using epoch-LIFO a node-local one, based +// on node-local max queuing latency for a <tenant, priority>? 
So what +// happens when work for a transaction is operating across multiple +// nodes, some using epoch-LIFO and some not? +// Is this why we use the max queuing latency as the trigger to switch +// into epoch-LIFO? All queued work for an epoch E across all nodes, if +// still queued after ~100ms, will trigger epoch-LIFO everywhere. +// [^11]: See the implementation for kvflowcontrol.Dispatch. +// [^12]: See UpToRaftLogPosition in AdmittedRaftLogEntries. +// [^13]: See the sequencer in pkg/util/admission. +// [^14]: See the high_create_time_low_position_different_range test case for +// TestReplicatedWriteAdmission. // // TODO(irfansharif): These descriptions are too high-level, imprecise and // possibly wrong. Fix that. After implementing these interfaces and integrating diff --git a/pkg/server/node.go b/pkg/server/node.go index f0df2e141f5c..2c05eed96d0c 100644 --- a/pkg/server/node.go +++ b/pkg/server/node.go @@ -919,7 +919,7 @@ func (n *Node) GetPebbleMetrics() []admission.StoreMetrics { diskStats = s } metrics = append(metrics, admission.StoreMetrics{ - StoreID: int32(store.StoreID()), + StoreID: store.StoreID(), Metrics: m.Metrics, WriteStallCount: m.WriteStallCount, DiskStats: diskStats}) diff --git a/pkg/util/admission/BUILD.bazel b/pkg/util/admission/BUILD.bazel index b288a3fe3b14..e2dbecf8603b 100644 --- a/pkg/util/admission/BUILD.bazel +++ b/pkg/util/admission/BUILD.bazel @@ -15,8 +15,10 @@ go_library( "kv_slot_adjuster.go", "pacer.go", "scheduler_latency_listener.go", + "sequencer.go", "sql_cpu_overload_indicator.go", "store_token_estimation.go", + "testing_knobs.go", "tokens_linear_model.go", "work_queue.go", ], @@ -53,7 +55,9 @@ go_test( "elastic_cpu_work_queue_test.go", "granter_test.go", "io_load_listener_test.go", + "replicated_write_admission_test.go", "scheduler_latency_listener_test.go", + "sequencer_test.go", "store_token_estimation_test.go", "tokens_linear_model_test.go", "work_queue_test.go", @@ -67,6 +71,7 @@ go_test( 
"//pkg/testutils/datapathutils", "//pkg/testutils/echotest", "//pkg/util/admission/admissionpb", + "//pkg/util/humanizeutil", "//pkg/util/leaktest", "//pkg/util/log", "//pkg/util/metric", diff --git a/pkg/util/admission/admission.go b/pkg/util/admission/admission.go index ccbf4a2de2bc..e59d16927952 100644 --- a/pkg/util/admission/admission.go +++ b/pkg/util/admission/admission.go @@ -522,8 +522,9 @@ func workKindString(workKind WorkKind) string { // all of these when StoreWorkQueue.AdmittedWorkDone is called, so that these // cumulative values are mutually consistent. type storeAdmissionStats struct { - // Total requests that called AdmittedWorkDone or BypassedWorkDone. - admittedCount uint64 + // Total requests that called {Admitted,Bypassed}WorkDone, or in the case of + // replicated writes, the requests that called Admit. + workCount uint64 // Sum of StoreWorkDoneInfo.WriteBytes. // // TODO(sumeer): writeAccountedBytes and ingestedAccountedBytes are not @@ -548,7 +549,7 @@ type storeAdmissionStats struct { // (e.g. for logging). aux struct { // These bypassed numbers are already included in the corresponding - // {admittedCount, writeAccountedBytes, ingestedAccountedBytes}. + // {workCount, writeAccountedBytes, ingestedAccountedBytes}. 
bypassedCount uint64 writeBypassedAccountedBytes uint64 ingestedBypassedAccountedBytes uint64 diff --git a/pkg/util/admission/elastic_cpu_work_queue.go b/pkg/util/admission/elastic_cpu_work_queue.go index 04e05f2a5158..3efcc20c178a 100644 --- a/pkg/util/admission/elastic_cpu_work_queue.go +++ b/pkg/util/admission/elastic_cpu_work_queue.go @@ -81,7 +81,7 @@ func (e *ElasticCPUWorkQueue) Admit( if duration > MaxElasticCPUDuration { duration = MaxElasticCPUDuration } - info.requestedCount = duration.Nanoseconds() + info.RequestedCount = duration.Nanoseconds() enabled, err := e.workQueue.Admit(ctx, info) if err != nil { return nil, err diff --git a/pkg/util/admission/elastic_cpu_work_queue_test.go b/pkg/util/admission/elastic_cpu_work_queue_test.go index f044f984aeb1..291791b43c13 100644 --- a/pkg/util/admission/elastic_cpu_work_queue_test.go +++ b/pkg/util/admission/elastic_cpu_work_queue_test.go @@ -161,7 +161,7 @@ func (t *testElasticCPUInternalWorkQueue) Admit( _ context.Context, info WorkInfo, ) (enabled bool, err error) { if !t.disabled { - t.buf.WriteString(fmt.Sprintf("admitted=%s ", time.Duration(info.requestedCount))) + t.buf.WriteString(fmt.Sprintf("admitted=%s ", time.Duration(info.RequestedCount))) } return !t.disabled, nil } diff --git a/pkg/util/admission/grant_coordinator.go b/pkg/util/admission/grant_coordinator.go index 1a7a2005e60c..edf2b24b28e9 100644 --- a/pkg/util/admission/grant_coordinator.go +++ b/pkg/util/admission/grant_coordinator.go @@ -113,7 +113,7 @@ func (sgc *StoreGrantCoordinators) SetPebbleMetricsProvider( if unsafeGc, ok := sgc.gcMap.Load(int64(m.StoreID)); ok { gc := (*GrantCoordinator)(unsafeGc) gc.pebbleMetricsTick(ctx, m) - iotc.UpdateIOThreshold(roachpb.StoreID(m.StoreID), gc.ioLoadListener.ioThreshold) + iotc.UpdateIOThreshold(m.StoreID, gc.ioLoadListener.ioThreshold) } else { log.Warningf(ctx, "seeing metrics for unknown storeID %d", m.StoreID) @@ -135,7 +135,7 @@ func (sgc *StoreGrantCoordinators) SetPebbleMetricsProvider( 
}() } -func (sgc *StoreGrantCoordinators) initGrantCoordinator(storeID int32) *GrantCoordinator { +func (sgc *StoreGrantCoordinators) initGrantCoordinator(storeID roachpb.StoreID) *GrantCoordinator { coord := &GrantCoordinator{ settings: sgc.settings, useGrantChains: false, @@ -168,7 +168,7 @@ func (sgc *StoreGrantCoordinators) initGrantCoordinator(storeID int32) *GrantCoo }, } - storeReq := sgc.makeStoreRequesterFunc(sgc.ambientCtx, granters, sgc.settings, sgc.workQueueMetrics, opts) + storeReq := sgc.makeStoreRequesterFunc(sgc.ambientCtx, storeID, granters, sgc.settings, sgc.workQueueMetrics, opts, nil) coord.queues[KVWork] = storeReq requesters := storeReq.getRequesters() kvg.regularRequester = requesters[admissionpb.RegularWorkClass] @@ -336,8 +336,9 @@ type makeRequesterFunc func( metrics *WorkQueueMetrics, opts workQueueOptions) requester type makeStoreRequesterFunc func( - _ log.AmbientContext, granters [admissionpb.NumWorkClasses]granterWithStoreWriteDone, - settings *cluster.Settings, metrics *WorkQueueMetrics, opts workQueueOptions) storeRequester + _ log.AmbientContext, storeID roachpb.StoreID, granters [admissionpb.NumWorkClasses]granterWithStoreWriteDone, + settings *cluster.Settings, metrics *WorkQueueMetrics, opts workQueueOptions, knobs *TestingKnobs, +) storeRequester // NewGrantCoordinators constructs GrantCoordinators and WorkQueues for a // regular cluster node. Caller is responsible for: diff --git a/pkg/util/admission/granter.go b/pkg/util/admission/granter.go index a14a2fb9e63d..8787020d0886 100644 --- a/pkg/util/admission/granter.go +++ b/pkg/util/admission/granter.go @@ -594,7 +594,7 @@ type IOThresholdConsumer interface { // StoreMetrics are the metrics for a store. type StoreMetrics struct { - StoreID int32 + StoreID roachpb.StoreID *pebble.Metrics WriteStallCount int64 // Optional. 
diff --git a/pkg/util/admission/granter_test.go b/pkg/util/admission/granter_test.go index e5635fa2c3f1..c085950b12ce 100644 --- a/pkg/util/admission/granter_test.go +++ b/pkg/util/admission/granter_test.go @@ -109,8 +109,9 @@ func TestGranterBasic(t *testing.T) { storeCoordinators := &StoreGrantCoordinators{ settings: settings, makeStoreRequesterFunc: func( - ambientCtx log.AmbientContext, granters [admissionpb.NumWorkClasses]granterWithStoreWriteDone, - settings *cluster.Settings, metrics *WorkQueueMetrics, opts workQueueOptions) storeRequester { + ambientCtx log.AmbientContext, _ roachpb.StoreID, granters [admissionpb.NumWorkClasses]granterWithStoreWriteDone, + settings *cluster.Settings, metrics *WorkQueueMetrics, opts workQueueOptions, knobs *TestingKnobs, + ) storeRequester { makeTestRequester := func(wc admissionpb.WorkClass) *testRequester { req := &testRequester{ workKind: KVWork, @@ -273,8 +274,8 @@ func TestStoreCoordinators(t *testing.T) { opts := Options{ makeRequesterFunc: makeRequesterFunc, makeStoreRequesterFunc: func( - ctx log.AmbientContext, granters [admissionpb.NumWorkClasses]granterWithStoreWriteDone, - settings *cluster.Settings, metrics *WorkQueueMetrics, opts workQueueOptions) storeRequester { + ctx log.AmbientContext, _ roachpb.StoreID, granters [admissionpb.NumWorkClasses]granterWithStoreWriteDone, + settings *cluster.Settings, metrics *WorkQueueMetrics, opts workQueueOptions, _ *TestingKnobs) storeRequester { reqReg := makeRequesterFunc(ctx, KVWork, granters[admissionpb.RegularWorkClass], settings, metrics, opts) reqElastic := makeRequesterFunc(ctx, KVWork, granters[admissionpb.ElasticWorkClass], settings, metrics, opts) str := &storeTestRequester{} @@ -444,7 +445,7 @@ func (m *testMetricsProvider) setMetricsForStores(stores []int32, metrics pebble m.metrics = m.metrics[:0] for _, s := range stores { m.metrics = append(m.metrics, StoreMetrics{ - StoreID: s, + StoreID: roachpb.StoreID(s), Metrics: &metrics, }) } diff --git 
a/pkg/util/admission/io_load_listener.go b/pkg/util/admission/io_load_listener.go index 0c39b22c339c..f7b65d20ca21 100644 --- a/pkg/util/admission/io_load_listener.go +++ b/pkg/util/admission/io_load_listener.go @@ -15,6 +15,7 @@ import ( "math" "time" + "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/settings" "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/util/admission/admissionpb" @@ -151,7 +152,7 @@ const l0SubLevelCountOverloadThreshold = 20 // storeWriteDone), the tokens are adjusted differently for the // flush/compaction L0 tokens and for the "disk bandwidth" tokens. type ioLoadListener struct { - storeID int32 + storeID roachpb.StoreID settings *cluster.Settings kvRequester storeRequester kvGranter granterWithIOTokens diff --git a/pkg/util/admission/io_load_listener_test.go b/pkg/util/admission/io_load_listener_test.go index 44a98b616bc0..b10507d9cd7d 100644 --- a/pkg/util/admission/io_load_listener_test.go +++ b/pkg/util/admission/io_load_listener_test.go @@ -67,11 +67,11 @@ func TestIOLoadListener(t *testing.T) { case "prep-admission-stats": req.stats = storeAdmissionStats{ - admittedCount: 0, + workCount: 0, writeAccountedBytes: 0, ingestedAccountedBytes: 0, } - d.ScanArgs(t, "admitted", &req.stats.admittedCount) + d.ScanArgs(t, "admitted", &req.stats.workCount) if d.HasArg("write-bytes") { d.ScanArgs(t, "write-bytes", &req.stats.writeAccountedBytes) } @@ -290,7 +290,7 @@ func TestBadIOLoadListenerStats(t *testing.T) { d.BytesRead = rand.Uint64() d.BytesWritten = rand.Uint64() d.ProvisionedBandwidth = 1 << 20 - req.stats.admittedCount = rand.Uint64() + req.stats.workCount = rand.Uint64() req.stats.writeAccountedBytes = rand.Uint64() req.stats.ingestedAccountedBytes = rand.Uint64() req.stats.statsToIgnore.Bytes = rand.Uint64() diff --git a/pkg/util/admission/replicated_write_admission_test.go b/pkg/util/admission/replicated_write_admission_test.go new file mode 100644 
index 000000000000..d25adb9895bb --- /dev/null +++ b/pkg/util/admission/replicated_write_admission_test.go @@ -0,0 +1,447 @@ +// Copyright 2023 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package admission + +import ( + "context" + "fmt" + "sort" + "strconv" + "strings" + "testing" + "time" + + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/settings/cluster" + "github.com/cockroachdb/cockroach/pkg/testutils/datapathutils" + "github.com/cockroachdb/cockroach/pkg/util/admission/admissionpb" + "github.com/cockroachdb/cockroach/pkg/util/humanizeutil" + "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/cockroach/pkg/util/metric" + "github.com/cockroachdb/cockroach/pkg/util/timeutil" + "github.com/cockroachdb/cockroach/pkg/util/tracing" + "github.com/cockroachdb/datadriven" + "github.com/stretchr/testify/require" +) + +// TestReplicatedWriteAdmission is a data-driven test for admission control of +// replicated writes. We provide the following syntax: +// +// - "init" +// Initializes the store work queues and test granters with 0B of +// {regular,elastic} tokens. +// +// - "admit" tenant=t<int> pri=<string> create-time=<duration> \ +// size=<bytes> range=r<int> log-position=<int>/<int> origin=n<int> \ +// [ingested=<bool>] +// Admit a replicated write request from the given tenant, of the given +// priority/size/create-time, writing to the given log position for the +// specified raft group. Also specified is the node where this request +// originated and whether it was ingested (i.e. as sstables). 
+// +// - "granter" [class={regular,elastic}] adjust-tokens={-,+}<bytes> +// Adjust the available {regular,elastic} tokens. If no class is specified, +// the adjustment applies across both work classes. +// +// - "grant" [class={regular,elastic}] +// Grant waiting requests until tokens are depleted or there are no more +// requests waiting. If no class is specified, we grant admission across +// both classes. +// +// - "tenant-weights" [t<int>=<int>]... +// Set weights for each tenant. +// +// - "print" +// Pretty-print work queue internal state (waiting requests, consumed tokens +// per-tenant, physical admission statistics, tenant weights, etc). +func TestReplicatedWriteAdmission(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + dir := datapathutils.TestDataPath(t, "replicated_write_admission") + datadriven.Walk(t, dir, func(t *testing.T, path string) { + var ( + storeWorkQueue *StoreWorkQueue + buf builderWithMu + tg [admissionpb.NumWorkClasses]*testReplicatedWriteGranter + ) + defer func() { + if storeWorkQueue != nil { + storeWorkQueue.close() + } + }() + + ctx := context.Background() + st := cluster.MakeTestingClusterSettings() + datadriven.RunTest(t, path, func(t *testing.T, d *datadriven.TestData) string { + switch d.Cmd { + case "init": + tg = [admissionpb.NumWorkClasses]*testReplicatedWriteGranter{ + admissionpb.RegularWorkClass: newTestReplicatedWriteGranter(t, admissionpb.RegularWorkClass, &buf), + admissionpb.ElasticWorkClass: newTestReplicatedWriteGranter(t, admissionpb.ElasticWorkClass, &buf), + } + registry := metric.NewRegistry() + metrics := makeWorkQueueMetrics("", registry) + opts := makeWorkQueueOptions(KVWork) + opts.usesTokens = true + opts.timeSource = timeutil.NewManualTime(tzero) + opts.disableEpochClosingGoroutine = true + knobs := &TestingKnobs{ + AdmittedReplicatedWorkInterceptor: func( + tenantID roachpb.TenantID, + pri admissionpb.WorkPriority, + rwi ReplicatedWorkInfo, + originalTokens int64, + createTime int64, + ) { 
+ ingested := "" + if rwi.Ingested { + ingested = " ingested" + } + buf.printf("admitted [tenant=t%d pri=%s create-time=%s size=%s range=r%s origin=n%s log-position=%s%s]", + tenantID.ToUint64(), pri, timeutil.FromUnixNanos(createTime).Sub(tzero), + printTrimmedBytes(originalTokens), rwi.RangeID, rwi.Origin, rwi.LogPosition, ingested) + }, + } + storeWorkQueue = makeStoreWorkQueue( + log.MakeTestingAmbientContext(tracing.NewTracer()), + roachpb.StoreID(1), + [admissionpb.NumWorkClasses]granterWithStoreWriteDone{ + tg[admissionpb.RegularWorkClass], + tg[admissionpb.ElasticWorkClass], + }, + st, metrics, opts, knobs, + ).(*StoreWorkQueue) + tg[admissionpb.RegularWorkClass].r = storeWorkQueue.getRequesters()[admissionpb.RegularWorkClass] + tg[admissionpb.ElasticWorkClass].r = storeWorkQueue.getRequesters()[admissionpb.ElasticWorkClass] + return printTestGranterTokens(tg) + + case "admit": + require.NotNilf(t, storeWorkQueue, "uninitialized store work queue (did you use 'init'?)") + var arg string + + // Parse tenant=t. + d.ScanArgs(t, "tenant", &arg) + ti, err := strconv.Atoi(strings.TrimPrefix(arg, "t")) + require.NoError(t, err) + tenantID := roachpb.MustMakeTenantID(uint64(ti)) + + // Parse pri=. + d.ScanArgs(t, "pri", &arg) + pri, found := reverseWorkPriorityDict[arg] + require.True(t, found) + + // Parse size=. + d.ScanArgs(t, "size", &arg) + bytes, err := humanizeutil.ParseBytes(arg) + require.NoError(t, err) + + // Parse range=r. + d.ScanArgs(t, "range", &arg) + ri, err := strconv.Atoi(strings.TrimPrefix(arg, "r")) + require.NoError(t, err) + rangeID := roachpb.RangeID(ri) + + // Parse origin=n. + d.ScanArgs(t, "origin", &arg) + ni, err := strconv.Atoi(strings.TrimPrefix(arg, "n")) + require.NoError(t, err) + nodeID := roachpb.NodeID(ni) + + // Parse log-position=/. + logPosition := parseLogPosition(t, d) + + // Parse create-time=. 
+ d.ScanArgs(t, "create-time", &arg) + dur, err := time.ParseDuration(arg) + require.NoError(t, err) + createTime := tzero.Add(dur) + + // Parse ingested=. + var ingested bool + if d.HasArg("ingested") { + d.ScanArgs(t, "ingested", &arg) + ingested, err = strconv.ParseBool(arg) + require.NoError(t, err) + } + + info := StoreWriteWorkInfo{ + WorkInfo: WorkInfo{ + TenantID: tenantID, + Priority: pri, + CreateTime: createTime.UnixNano(), + RequestedCount: bytes, + ReplicatedWorkInfo: ReplicatedWorkInfo{ + Enabled: true, + RangeID: rangeID, + Origin: nodeID, + LogPosition: logPosition, + Ingested: ingested, + }, + }, + } + + handle, err := storeWorkQueue.Admit(ctx, info) + require.NoError(t, err) + require.False(t, handle.UseAdmittedWorkDone()) + return buf.stringAndReset() + + case "granter": + // Parse adjust-tokens={+,-}. + var arg string + d.ScanArgs(t, "adjust-tokens", &arg) + isPositive := strings.Contains(arg, "+") + arg = strings.TrimPrefix(arg, "+") + arg = strings.TrimPrefix(arg, "-") + delta, err := humanizeutil.ParseBytes(arg) + require.NoError(t, err) + if !isPositive { + delta = -delta + } + + var wcs []admissionpb.WorkClass + if d.HasArg("class") { + wcs = append(wcs, parseWorkClass(t, d)) + } else { + wcs = append(wcs, + admissionpb.RegularWorkClass, + admissionpb.ElasticWorkClass) + } + for _, wc := range wcs { + tg[wc].tokens += delta + } + return printTestGranterTokens(tg) + + case "tenant-weights": + weightMap := make(map[uint64]uint32) + for _, cmdArg := range d.CmdArgs { + tenantID, err := strconv.Atoi(strings.TrimPrefix(cmdArg.Key, "t")) + require.NoError(t, err) + weight, err := strconv.Atoi(cmdArg.Vals[0]) + require.NoError(t, err) + weightMap[uint64(tenantID)] = uint32(weight) + } + storeWorkQueue.SetTenantWeights(weightMap) + return "" + + case "grant": + var wcs []admissionpb.WorkClass + if d.HasArg("class") { + wcs = append(wcs, parseWorkClass(t, d)) + } else { + wcs = append(wcs, + admissionpb.RegularWorkClass, + 
admissionpb.ElasticWorkClass) + } + for _, wc := range wcs { + tg[wc].grant() + } + return buf.stringAndReset() + + case "print": + storeWorkQueue.mu.Lock() + defer storeWorkQueue.mu.Unlock() + return fmt.Sprintf("physical-stats: work-count=%d written-bytes=%s ingested-bytes=%s\n[regular work queue]: %s\n[elastic work queue]: %s\n", + storeWorkQueue.mu.stats.workCount, + printTrimmedBytes(int64(storeWorkQueue.mu.stats.writeAccountedBytes)), + printTrimmedBytes(int64(storeWorkQueue.mu.stats.ingestedAccountedBytes)), + printWorkQueue(&storeWorkQueue.q[admissionpb.RegularWorkClass]), + printWorkQueue(&storeWorkQueue.q[admissionpb.ElasticWorkClass]), + ) + + default: + return fmt.Sprintf("unknown command: %s", d.Cmd) + } + }) + }) +} + +func parseLogPosition(t *testing.T, d *datadriven.TestData) LogPosition { + // Parse log-position=/. + var arg string + d.ScanArgs(t, "log-position", &arg) + inner := strings.Split(arg, "/") + require.Len(t, inner, 2) + term, err := strconv.Atoi(inner[0]) + require.NoError(t, err) + index, err := strconv.Atoi(inner[1]) + require.NoError(t, err) + return LogPosition{ + Term: uint64(term), + Index: uint64(index), + } +} + +func parseWorkClass(t *testing.T, d *datadriven.TestData) admissionpb.WorkClass { + // Parse class={regular,elastic}. 
+ var arg string + d.ScanArgs(t, "class", &arg) + var wc admissionpb.WorkClass + switch arg { + case "regular": + wc = admissionpb.RegularWorkClass + case "elastic": + wc = admissionpb.ElasticWorkClass + default: + t.Fatalf("unexpected class: %s", arg) + } + return wc +} + +func printTrimmedBytes(bytes int64) string { + return strings.ReplaceAll(string(humanizeutil.IBytes(bytes)), " ", "") +} + +func printTestGranterTokens(tg [admissionpb.NumWorkClasses]*testReplicatedWriteGranter) string { + var buf strings.Builder + for i, wc := range []admissionpb.WorkClass{ + admissionpb.RegularWorkClass, + admissionpb.ElasticWorkClass, + } { + if i != 0 { + buf.WriteString("\n") + } + buf.WriteString(fmt.Sprintf("[%s] %s tokens available", wc, printTrimmedBytes(tg[wc].tokens))) + } + return buf.String() +} + +func printWorkQueue(q *WorkQueue) string { + var buf strings.Builder + q.mu.Lock() + defer q.mu.Unlock() + buf.WriteString(fmt.Sprintf("len(tenant-heap)=%d", len(q.mu.tenantHeap))) + if len(q.mu.tenantHeap) > 0 { + buf.WriteString(fmt.Sprintf(" top-tenant=t%d", q.mu.tenantHeap[0].id)) + } + var ids []uint64 + for id := range q.mu.tenants { + ids = append(ids, id) + } + sort.Slice(ids, func(i, j int) bool { return ids[i] < ids[j] }) + for _, id := range ids { + tenant := q.mu.tenants[id] + buf.WriteString(fmt.Sprintf("\n tenant=t%d weight=%d fifo-threshold=%s used=%s", + tenant.id, + tenant.weight, + admissionpb.WorkPriority(tenant.fifoPriorityThreshold), + printTrimmedBytes(int64(tenant.used)), + )) + if len(tenant.waitingWorkHeap) > 0 { + buf.WriteString("\n") + + for i := range tenant.waitingWorkHeap { + w := tenant.waitingWorkHeap[i] + if i != 0 { + buf.WriteString("\n") + } + + ingested := "" + if w.replicated.Ingested { + ingested = " ingested " + } + + buf.WriteString(fmt.Sprintf(" [%d: pri=%s create-time=%s size=%s range=r%d origin=n%d log-position=%s%s]", i, + w.priority, + timeutil.FromUnixNanos(w.createTime).Sub(tzero), + printTrimmedBytes(w.requestedCount), + 
+ w.replicated.RangeID, + w.replicated.Origin, + w.replicated.LogPosition, + ingested, + )) + } + } + } + return buf.String() +} + +// tzero represents t=0, the earliest possible time. All other +// create-time=<duration> values are relative to this time. +var tzero = timeutil.Unix(0, 0) + +var reverseWorkPriorityDict map[string]admissionpb.WorkPriority + +func init() { + reverseWorkPriorityDict = make(map[string]admissionpb.WorkPriority) + for k, v := range admissionpb.WorkPriorityDict { + reverseWorkPriorityDict[v] = k + } +} + +type testReplicatedWriteGranter struct { + t *testing.T + wc admissionpb.WorkClass + buf *builderWithMu + r requester + + tokens int64 +} + +var _ granterWithStoreWriteDone = &testReplicatedWriteGranter{} + +func newTestReplicatedWriteGranter( + t *testing.T, wc admissionpb.WorkClass, buf *builderWithMu, +) *testReplicatedWriteGranter { + return &testReplicatedWriteGranter{ + t: t, + wc: wc, + buf: buf, + } +} +func (tg *testReplicatedWriteGranter) grantKind() grantKind { + return token +} + +func (tg *testReplicatedWriteGranter) tryGet(count int64) bool { + if count > tg.tokens { + tg.buf.printf("[%s] try-get=%s available=%s => insufficient tokens", + tg.wc, printTrimmedBytes(count), printTrimmedBytes(tg.tokens)) + return false + } + + tg.buf.printf("[%s] try-get=%s available=%s => sufficient tokens", + tg.wc, printTrimmedBytes(count), printTrimmedBytes(tg.tokens)) + tg.tokens -= count + return true +} + +func (tg *testReplicatedWriteGranter) returnGrant(count int64) { + tg.t.Fatalf("unimplemented") +} + +func (tg *testReplicatedWriteGranter) tookWithoutPermission(count int64) { + tg.t.Fatalf("unimplemented") +} + +func (tg *testReplicatedWriteGranter) continueGrantChain(grantChainID grantChainID) { + tg.t.Fatalf("unimplemented") +} + +func (tg *testReplicatedWriteGranter) grant() { + for { + if tg.tokens <= 0 { + return // nothing left to do + } + if !tg.r.hasWaitingRequests() { + return // nothing left to do + } + _ = tg.r.granted(0 /* unused */) + } 
+} + +func (tg *testReplicatedWriteGranter) storeWriteDone( + originalTokens int64, doneInfo StoreWorkDoneInfo, +) (additionalTokens int64) { + tg.tokens -= originalTokens + return 0 +} diff --git a/pkg/util/admission/sequencer.go b/pkg/util/admission/sequencer.go new file mode 100644 index 000000000000..cbcdaccff910 --- /dev/null +++ b/pkg/util/admission/sequencer.go @@ -0,0 +1,71 @@ +// Copyright 2023 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package admission + +// sequencer issues monotonic sequence numbers derived from observed +// <create-time,raft-term,raft-index> tuples. The sequence numbers are roughly +// monotonic with respect to log positions -- if log positions advance, so do +// the issued sequence numbers. The sequence numbers are also kept close to the +// maximum observed create time (similar to an HLC). +// +// This is a purpose-built data structure for below-raft admission control where +// we want to assign each AC-queued work a "sequence number" for FIFO ordering +// within a <tenant,priority>. It's not safe for concurrent access. +type sequencer struct { + // maxCreateTime ratchets to the highest observed create time. We generate + // sequence numbers by adding this to sequenceDelta, where the delta term is + // incremented whenever the raft term or index within the raft term + // advances. This provides monotonicity with respect to log positions. The + // sequence numbers we issue are kept close to the highest observed create + // time (similar to HLC), so when we observe jumps in maxCreateTime, we + // reduce the accumulated sequenceDelta by as much as possible. 
+ maxCreateTime int64 + // maxRaftTerm and maxEntryIndexInTerm ratchet to the highest observed raft + // term and highest observed index within that term. The sequencer observes + // term and index transitions to ensure that subsequently issued sequence + // numbers continue to be monotonic. + maxRaftTerm, maxEntryIndexInTerm uint64 + // sequenceDelta is added to the ever-ratcheting maxCreateTime to generate + // monotonic sequence numbers as raft terms/indexes and maxCreateTime + // advance. + sequenceDelta int64 +} + +func (s *sequencer) sequence(createTime int64, raftTerm, raftIndex uint64) int64 { + if createTime > s.maxCreateTime { + delta := createTime - s.maxCreateTime + // We advance the maxCreateTime by delta and simultaneously reduce + // sequenceDelta by just that much to keep issuing sequence numbers + // close to maxCreateTime. We'll increment the sequenceDelta to + // guarantee monotonicity (consider the case where maxCreateTime and + // sequenceDelta were incremented/decremented by N respectively). + s.maxCreateTime += delta + s.sequenceDelta -= delta + s.sequenceDelta += 1 // to ensure monotonicity + if s.sequenceDelta < 0 { + // Enforce a floor. We've advanced maxCreateTime by more than what + // we had in sequenceDelta so this still gives us monotonicity and + // lets us issue sequence numbers close to maxCreateTime. + s.sequenceDelta = 0 + } + } + + if s.maxRaftTerm < raftTerm { + // The raft term advanced. Reset the tracking state for term/index. + s.maxRaftTerm, s.maxEntryIndexInTerm = raftTerm, raftIndex + s.sequenceDelta += 1 // to ensure monotonicity + } else if raftTerm == s.maxRaftTerm && s.maxEntryIndexInTerm < raftIndex { + // The raft index advanced within the same term. Set tracking state. 
+ s.maxEntryIndexInTerm = raftIndex + s.sequenceDelta += 1 // to ensure monotonicity + } + return s.maxCreateTime + s.sequenceDelta +} diff --git a/pkg/util/admission/sequencer_test.go b/pkg/util/admission/sequencer_test.go new file mode 100644 index 000000000000..21b14d8caa0e --- /dev/null +++ b/pkg/util/admission/sequencer_test.go @@ -0,0 +1,65 @@ +// Copyright 2023 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package admission + +import ( + "fmt" + "testing" + "time" + + "github.com/cockroachdb/cockroach/pkg/testutils/datapathutils" + "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/cockroach/pkg/util/timeutil" + "github.com/cockroachdb/datadriven" + "github.com/stretchr/testify/require" +) + +func TestSequencer(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + var s *sequencer + var lastSeqNum int64 + datadriven.RunTest(t, datapathutils.TestDataPath(t, "sequencer"), + func(t *testing.T, d *datadriven.TestData) string { + switch d.Cmd { + case "init": + s = &sequencer{} + return "" + + case "sequence": + var arg, movement string + + // Parse create-time=<duration>. + d.ScanArgs(t, "create-time", &arg) + dur, err := time.ParseDuration(arg) + require.NoError(t, err) + + // Parse log-position=<term>/<index>. 
+ logPosition := parseLogPosition(t, d) + sequenceNum := s.sequence(dur.Nanoseconds(), logPosition.Term, logPosition.Index) + if lastSeqNum < sequenceNum { + movement = " (advanced)" + } + lastSeqNum = sequenceNum + return fmt.Sprintf("seq=%d ≈%s%s", + sequenceNum, + timeutil.FromUnixNanos(sequenceNum).Sub(tzero), + movement, + ) + + default: + return fmt.Sprintf("unknown command: %s", d.Cmd) + } + }, + ) +} diff --git a/pkg/util/admission/store_token_estimation.go b/pkg/util/admission/store_token_estimation.go index be66b50452d1..2e2eca842b8e 100644 --- a/pkg/util/admission/store_token_estimation.go +++ b/pkg/util/admission/store_token_estimation.go @@ -177,8 +177,8 @@ func (e *storePerWorkTokenEstimator) updateEstimates( if adjustedIntL0IngestedBytes < 0 { adjustedIntL0IngestedBytes = 0 } - intWorkCount := int64(admissionStats.admittedCount) - - int64(e.cumStoreAdmissionStats.admittedCount) + intWorkCount := int64(admissionStats.workCount) - + int64(e.cumStoreAdmissionStats.workCount) intL0WriteAccountedBytes := int64(admissionStats.writeAccountedBytes) - int64(e.cumStoreAdmissionStats.writeAccountedBytes) // Note that these are not L0 ingested bytes, since we don't know how diff --git a/pkg/util/admission/store_token_estimation_test.go b/pkg/util/admission/store_token_estimation_test.go index faf2aa46836e..b2898f203b46 100644 --- a/pkg/util/admission/store_token_estimation_test.go +++ b/pkg/util/admission/store_token_estimation_test.go @@ -58,7 +58,7 @@ func TestStorePerWorkTokenEstimator(t *testing.T) { d.ScanArgs(t, "admitted", &admitted) d.ScanArgs(t, "write-accounted", &writeAccounted) d.ScanArgs(t, "ingested-accounted", &ingestedAccounted) - admissionStats.admittedCount += admitted + admissionStats.workCount += admitted admissionStats.writeAccountedBytes += writeAccounted admissionStats.ingestedAccountedBytes += ingestedAccounted if d.HasArg("bypassed-count") { diff --git a/pkg/util/admission/testdata/io_load_listener 
b/pkg/util/admission/testdata/io_load_listener index 1f64c463641e..5575a7fd5f55 100644 --- a/pkg/util/admission/testdata/io_load_listener +++ b/pkg/util/admission/testdata/io_load_listener @@ -6,7 +6,7 @@ init prep-admission-stats admitted=0 ---- -{admittedCount:0 writeAccountedBytes:0 ingestedAccountedBytes:0 statsToIgnore:{IngestOperationStats:{Bytes:0 ApproxIngestedIntoL0Bytes:0}} aux:{bypassedCount:0 writeBypassedAccountedBytes:0 ingestedBypassedAccountedBytes:0}} +{workCount:0 writeAccountedBytes:0 ingestedAccountedBytes:0 statsToIgnore:{IngestOperationStats:{Bytes:0 ApproxIngestedIntoL0Bytes:0}} aux:{bypassedCount:0 writeBypassedAccountedBytes:0 ingestedBypassedAccountedBytes:0}} # Even though above the threshold, the first 60 ticks don't limit the tokens. set-state l0-bytes=10000 l0-added-write=1000 l0-files=21 l0-sublevels=21 @@ -76,7 +76,7 @@ tick: 59, setAvailableTokens: io-tokens=unlimited elastic-disk-bw-tokens=unlimit prep-admission-stats admitted=10000 write-bytes=40000 ---- -{admittedCount:10000 writeAccountedBytes:40000 ingestedAccountedBytes:0 statsToIgnore:{IngestOperationStats:{Bytes:0 ApproxIngestedIntoL0Bytes:0}} aux:{bypassedCount:0 writeBypassedAccountedBytes:0 ingestedBypassedAccountedBytes:0}} +{workCount:10000 writeAccountedBytes:40000 ingestedAccountedBytes:0 statsToIgnore:{IngestOperationStats:{Bytes:0 ApproxIngestedIntoL0Bytes:0}} aux:{bypassedCount:0 writeBypassedAccountedBytes:0 ingestedBypassedAccountedBytes:0}} # Delta added is 100,000. The l0-bytes are the same, so compactions removed # 100,000 bytes. Smoothed removed by compactions is 50,000. 
Each admitted is @@ -151,7 +151,7 @@ tick: 59, setAvailableTokens: io-tokens=169 elastic-disk-bw-tokens=unlimited prep-admission-stats admitted=20000 write-bytes=80000 ---- -{admittedCount:20000 writeAccountedBytes:80000 ingestedAccountedBytes:0 statsToIgnore:{IngestOperationStats:{Bytes:0 ApproxIngestedIntoL0Bytes:0}} aux:{bypassedCount:0 writeBypassedAccountedBytes:0 ingestedBypassedAccountedBytes:0}} +{workCount:20000 writeAccountedBytes:80000 ingestedAccountedBytes:0 statsToIgnore:{IngestOperationStats:{Bytes:0 ApproxIngestedIntoL0Bytes:0}} aux:{bypassedCount:0 writeBypassedAccountedBytes:0 ingestedBypassedAccountedBytes:0}} # Same delta as previous but smoothing bumps up the tokens to 25,000. set-state l0-bytes=10000 l0-added-write=201000 l0-files=21 l0-sublevels=21 @@ -232,7 +232,7 @@ setAvailableTokens: io-tokens=365 elastic-disk-bw-tokens=unlimited prep-admission-stats admitted=30000 write-bytes=120000 ---- -{admittedCount:30000 writeAccountedBytes:120000 ingestedAccountedBytes:0 statsToIgnore:{IngestOperationStats:{Bytes:0 ApproxIngestedIntoL0Bytes:0}} aux:{bypassedCount:0 writeBypassedAccountedBytes:0 ingestedBypassedAccountedBytes:0}} +{workCount:30000 writeAccountedBytes:120000 ingestedAccountedBytes:0 statsToIgnore:{IngestOperationStats:{Bytes:0 ApproxIngestedIntoL0Bytes:0}} aux:{bypassedCount:0 writeBypassedAccountedBytes:0 ingestedBypassedAccountedBytes:0}} # l0-sublevels drops below threshold. We calculate the smoothed values, but # don't limit the tokens. 
@@ -250,7 +250,7 @@ init prep-admission-stats admitted=0 ---- -{admittedCount:0 writeAccountedBytes:0 ingestedAccountedBytes:0 statsToIgnore:{IngestOperationStats:{Bytes:0 ApproxIngestedIntoL0Bytes:0}} aux:{bypassedCount:0 writeBypassedAccountedBytes:0 ingestedBypassedAccountedBytes:0}} +{workCount:0 writeAccountedBytes:0 ingestedAccountedBytes:0 statsToIgnore:{IngestOperationStats:{Bytes:0 ApproxIngestedIntoL0Bytes:0}} aux:{bypassedCount:0 writeBypassedAccountedBytes:0 ingestedBypassedAccountedBytes:0}} set-state l0-bytes=1000 l0-added-write=1000 l0-added-ingested=0 l0-files=21 l0-sublevels=21 print-only-first-tick=true ---- @@ -262,7 +262,7 @@ tick: 0, setAvailableTokens: io-tokens=unlimited elastic-disk-bw-tokens=unlimite # the admitted requests. prep-admission-stats admitted=10 write-bytes=130000 ingested-bytes=20000 ---- -{admittedCount:10 writeAccountedBytes:130000 ingestedAccountedBytes:20000 statsToIgnore:{IngestOperationStats:{Bytes:0 ApproxIngestedIntoL0Bytes:0}} aux:{bypassedCount:0 writeBypassedAccountedBytes:0 ingestedBypassedAccountedBytes:0}} +{workCount:10 writeAccountedBytes:130000 ingestedAccountedBytes:20000 statsToIgnore:{IngestOperationStats:{Bytes:0 ApproxIngestedIntoL0Bytes:0}} aux:{bypassedCount:0 writeBypassedAccountedBytes:0 ingestedBypassedAccountedBytes:0}} # The ingested model can be fit with a multiplier of ~1.5 for the interval, # but since the l0-ingest-lm model had a previous multiplier of 0.75 and the @@ -282,7 +282,7 @@ setAvailableTokens: io-tokens=417 elastic-disk-bw-tokens=unlimited # ingested model is decayed by a factor of 2. 
prep-admission-stats admitted=20 write-bytes=150000 ingested-bytes=20000 ---- -{admittedCount:20 writeAccountedBytes:150000 ingestedAccountedBytes:20000 statsToIgnore:{IngestOperationStats:{Bytes:0 ApproxIngestedIntoL0Bytes:0}} aux:{bypassedCount:0 writeBypassedAccountedBytes:0 ingestedBypassedAccountedBytes:0}} +{workCount:20 writeAccountedBytes:150000 ingestedAccountedBytes:20000 statsToIgnore:{IngestOperationStats:{Bytes:0 ApproxIngestedIntoL0Bytes:0}} aux:{bypassedCount:0 writeBypassedAccountedBytes:0 ingestedBypassedAccountedBytes:0}} set-state l0-bytes=1000 l0-added-write=191000 l0-added-ingested=30000 l0-files=21 l0-sublevels=21 print-only-first-tick=true ---- @@ -296,7 +296,7 @@ setAvailableTokens: io-tokens=459 elastic-disk-bw-tokens=unlimited # bytes to L0. We don't let unaccounted bytes become negative. prep-admission-stats admitted=30 write-bytes=250000 ingested-bytes=20000 ingested-into-l0=20000 ---- -{admittedCount:30 writeAccountedBytes:250000 ingestedAccountedBytes:20000 statsToIgnore:{IngestOperationStats:{Bytes:0 ApproxIngestedIntoL0Bytes:0}} aux:{bypassedCount:0 writeBypassedAccountedBytes:0 ingestedBypassedAccountedBytes:0}} +{workCount:30 writeAccountedBytes:250000 ingestedAccountedBytes:20000 statsToIgnore:{IngestOperationStats:{Bytes:0 ApproxIngestedIntoL0Bytes:0}} aux:{bypassedCount:0 writeBypassedAccountedBytes:0 ingestedBypassedAccountedBytes:0}} set-state l0-bytes=1000 l0-added-write=211000 l0-added-ingested=30000 l0-files=21 l0-sublevels=21 print-only-first-tick=true ---- @@ -312,7 +312,7 @@ init prep-admission-stats admitted=0 ---- -{admittedCount:0 writeAccountedBytes:0 ingestedAccountedBytes:0 statsToIgnore:{IngestOperationStats:{Bytes:0 ApproxIngestedIntoL0Bytes:0}} aux:{bypassedCount:0 writeBypassedAccountedBytes:0 ingestedBypassedAccountedBytes:0}} +{workCount:0 writeAccountedBytes:0 ingestedAccountedBytes:0 statsToIgnore:{IngestOperationStats:{Bytes:0 ApproxIngestedIntoL0Bytes:0}} aux:{bypassedCount:0 writeBypassedAccountedBytes:0 
ingestedBypassedAccountedBytes:0}} set-state l0-bytes=10000 l0-added-write=1000 l0-files=1 l0-sublevels=1 print-only-first-tick=true ---- diff --git a/pkg/util/admission/testdata/replicated_write_admission/class_segmentation b/pkg/util/admission/testdata/replicated_write_admission/class_segmentation new file mode 100644 index 000000000000..e4d8f12d26a4 --- /dev/null +++ b/pkg/util/admission/testdata/replicated_write_admission/class_segmentation @@ -0,0 +1,63 @@ +# Verify that we'll admit requests based on the {regular,elastic} tokens being +# generated. + +init +---- +[regular] 0B tokens available +[elastic] 0B tokens available + +# Admit two requests, one at low-pri but lower log position and lower +# create-time. It gets classified as an elastic request. +admit tenant=t1 pri=low-pri create-time=1us size=1B range=r1 origin=n1 log-position=4/20 +---- +[elastic] try-get=1B available=0B => insufficient tokens + +# And one at high-pri but higher log position and higher create-time. It gets +# classified as a regular request. +admit tenant=t1 pri=high-pri create-time=2us size=1B range=r1 origin=n1 log-position=4/21 +---- +[regular] try-get=1B available=0B => insufficient tokens + +# Observe both waiting requests and physical admission stats. Note that the two +# requests sit within their own work queues (segmented by class). +print +---- +physical-stats: work-count=2 written-bytes=2B ingested-bytes=0B +[regular work queue]: len(tenant-heap)=1 top-tenant=t1 + tenant=t1 weight=1 fifo-threshold=low-pri used=0B + [0: pri=high-pri create-time=2.001µs size=1B range=r1 origin=n1 log-position=4/21] +[elastic work queue]: len(tenant-heap)=1 top-tenant=t1 + tenant=t1 weight=1 fifo-threshold=low-pri used=0B + [0: pri=low-pri create-time=1.001µs size=1B range=r1 origin=n1 log-position=4/20] + +# Produce 1B worth of regular tokens and verify that it only admits work +# waiting in the regular work queue. 
+granter class=regular adjust-tokens=+1B +---- +[regular] 1B tokens available +[elastic] 0B tokens available + +grant +---- +admitted [tenant=t1 pri=high-pri create-time=2.001µs size=1B range=r1 origin=n1 log-position=4/21] + +print +---- +physical-stats: work-count=2 written-bytes=2B ingested-bytes=0B +[regular work queue]: len(tenant-heap)=0 + tenant=t1 weight=1 fifo-threshold=low-pri used=1B +[elastic work queue]: len(tenant-heap)=1 top-tenant=t1 + tenant=t1 weight=1 fifo-threshold=low-pri used=0B + [0: pri=low-pri create-time=1.001µs size=1B range=r1 origin=n1 log-position=4/20] + +# Do the same for elastic tokens. +granter class=elastic adjust-tokens=+1B +---- +[regular] 0B tokens available +[elastic] 1B tokens available + +grant +---- +admitted [tenant=t1 pri=low-pri create-time=1.001µs size=1B range=r1 origin=n1 log-position=4/20] + +# vim:ft=sh diff --git a/pkg/util/admission/testdata/replicated_write_admission/high_create_time_low_position_different_range b/pkg/util/admission/testdata/replicated_write_admission/high_create_time_low_position_different_range new file mode 100644 index 000000000000..d7bbb57fe9ae --- /dev/null +++ b/pkg/util/admission/testdata/replicated_write_admission/high_create_time_low_position_different_range @@ -0,0 +1,52 @@ +# Verify that we use create-time based ordering for replicated write admission +# when done so across different ranges. + +init +---- +[regular] 0B tokens available +[elastic] 0B tokens available + +# Admit two requests, one created at t=5us with a lower log position on r1. +admit tenant=t1 pri=normal-pri create-time=5us size=1B range=r1 origin=n1 log-position=4/20 +---- +[regular] try-get=1B available=0B => insufficient tokens + +# And one created at t=1us with a higher log position on r2. +admit tenant=t1 pri=normal-pri create-time=1us size=1B range=r2 origin=n1 log-position=4/21 +---- + +# Observe both waiting requests and physical admission stats. 
Note how the +# request with the lower create time sorts first despite having the higher +# log position. This is because the requests are from different ranges, and we +# don't compare log positions. +print +---- +physical-stats: work-count=2 written-bytes=2B ingested-bytes=0B +[regular work queue]: len(tenant-heap)=1 top-tenant=t1 + tenant=t1 weight=1 fifo-threshold=low-pri used=0B + [0: pri=normal-pri create-time=1.001µs size=1B range=r2 origin=n1 log-position=4/21] + [1: pri=normal-pri create-time=5.001µs size=1B range=r1 origin=n1 log-position=4/20] +[elastic work queue]: len(tenant-heap)=0 + +# Produce 1B worth of regular tokens. +granter class=regular adjust-tokens=+1B +---- +[regular] 1B tokens available +[elastic] 0B tokens available + +# Grant admission to requests. Since we have 1B worth of tokens, and 2 waiting +# requests wanting 1B each, we're only able to admit one. Verify that it's the +# request with the lower create-time. +grant class=regular +---- +admitted [tenant=t1 pri=normal-pri create-time=1.001µs size=1B range=r2 origin=n1 log-position=4/21] + +print +---- +physical-stats: work-count=2 written-bytes=2B ingested-bytes=0B +[regular work queue]: len(tenant-heap)=1 top-tenant=t1 + tenant=t1 weight=1 fifo-threshold=low-pri used=1B + [0: pri=normal-pri create-time=5.001µs size=1B range=r1 origin=n1 log-position=4/20] +[elastic work queue]: len(tenant-heap)=0 + +# vim:ft=sh diff --git a/pkg/util/admission/testdata/replicated_write_admission/high_create_time_low_position_same_range b/pkg/util/admission/testdata/replicated_write_admission/high_create_time_low_position_same_range new file mode 100644 index 000000000000..13ed41a7e357 --- /dev/null +++ b/pkg/util/admission/testdata/replicated_write_admission/high_create_time_low_position_same_range @@ -0,0 +1,51 @@ +# Verify that we ignore create-time based ordering for replicated write +# admission when writes happen within the same range. 
+ +init +---- +[regular] 0B tokens available +[elastic] 0B tokens available + +# Admit two requests, one created at t=5us but with a lower log position. +admit tenant=t1 pri=normal-pri create-time=5us size=1B range=r1 origin=n1 log-position=4/20 +---- +[regular] try-get=1B available=0B => insufficient tokens + +# And one created at t=1us but with a higher log position. +admit tenant=t1 pri=normal-pri create-time=1us size=1B range=r1 origin=n1 log-position=4/21 +---- + +# Observe both waiting requests and physical admission stats. Note how the +# request with the lower log position sorts first despite having the higher +# create-time. +print +---- +physical-stats: work-count=2 written-bytes=2B ingested-bytes=0B +[regular work queue]: len(tenant-heap)=1 top-tenant=t1 + tenant=t1 weight=1 fifo-threshold=low-pri used=0B + [0: pri=normal-pri create-time=5.001µs size=1B range=r1 origin=n1 log-position=4/20] + [1: pri=normal-pri create-time=5.002µs size=1B range=r1 origin=n1 log-position=4/21] +[elastic work queue]: len(tenant-heap)=0 + +# Produce 1B worth of regular tokens. +granter class=regular adjust-tokens=+1B +---- +[regular] 1B tokens available +[elastic] 0B tokens available + +# Grant admission to requests. Since we have 1B worth of tokens, and 2 waiting +# requests wanting 1B each, we're only able to admit one. Verify that it's the +# request with the lower log position despite the higher create-time. 
+grant class=regular +---- +admitted [tenant=t1 pri=normal-pri create-time=5.001µs size=1B range=r1 origin=n1 log-position=4/20] + +print +---- +physical-stats: work-count=2 written-bytes=2B ingested-bytes=0B +[regular work queue]: len(tenant-heap)=1 top-tenant=t1 + tenant=t1 weight=1 fifo-threshold=low-pri used=1B + [0: pri=normal-pri create-time=5.002µs size=1B range=r1 origin=n1 log-position=4/21] +[elastic work queue]: len(tenant-heap)=0 + +# vim:ft=sh diff --git a/pkg/util/admission/testdata/replicated_write_admission/high_pri_low_position b/pkg/util/admission/testdata/replicated_write_admission/high_pri_low_position new file mode 100644 index 000000000000..d0f1ab40f6db --- /dev/null +++ b/pkg/util/admission/testdata/replicated_write_admission/high_pri_low_position @@ -0,0 +1,50 @@ +# Verify that we'll admit in priority order, even if the higher priority work +# has a higher log position. + +init +---- +[regular] 0B tokens available +[elastic] 0B tokens available + +# Admit two requests, one at normal-pri but lower log position. +admit tenant=t1 pri=normal-pri create-time=1us size=1B range=r1 origin=n1 log-position=4/20 +---- +[regular] try-get=1B available=0B => insufficient tokens + +# And one at high-pri but higher log position. +admit tenant=t1 pri=high-pri create-time=1us size=1B range=r1 origin=n1 log-position=4/21 +---- + +# Observe both waiting requests and physical admission stats. Note that the +# high-pri request sorts first, despite having a higher log position. +print +---- +physical-stats: work-count=2 written-bytes=2B ingested-bytes=0B +[regular work queue]: len(tenant-heap)=1 top-tenant=t1 + tenant=t1 weight=1 fifo-threshold=low-pri used=0B + [0: pri=high-pri create-time=1.002µs size=1B range=r1 origin=n1 log-position=4/21] + [1: pri=normal-pri create-time=1.001µs size=1B range=r1 origin=n1 log-position=4/20] +[elastic work queue]: len(tenant-heap)=0 + +# Produce 1B worth of regular tokens. 
+granter class=regular adjust-tokens=+1B +---- +[regular] 1B tokens available +[elastic] 0B tokens available + +# Grant admission to requests. Since we have 1B worth of tokens, and 2 waiting +# requests wanting 1B each, we're only able to admit one. Verify that it's the +# high-pri request that gets through. +grant class=regular +---- +admitted [tenant=t1 pri=high-pri create-time=1.002µs size=1B range=r1 origin=n1 log-position=4/21] + +print +---- +physical-stats: work-count=2 written-bytes=2B ingested-bytes=0B +[regular work queue]: len(tenant-heap)=1 top-tenant=t1 + tenant=t1 weight=1 fifo-threshold=low-pri used=1B + [0: pri=normal-pri create-time=1.001µs size=1B range=r1 origin=n1 log-position=4/20] +[elastic work queue]: len(tenant-heap)=0 + +# vim:ft=sh diff --git a/pkg/util/admission/testdata/replicated_write_admission/overview b/pkg/util/admission/testdata/replicated_write_admission/overview new file mode 100644 index 000000000000..1880f287decb --- /dev/null +++ b/pkg/util/admission/testdata/replicated_write_admission/overview @@ -0,0 +1,70 @@ +# Walk through the basics of the datadriven syntax. + +init +---- +[regular] 0B tokens available +[elastic] 0B tokens available + +# Try to admit two requests of 1B each, at incrementing log positions. The +# first request tries the fast path and fails admission, and gets added to the +# work queue's internal heap. +admit tenant=t1 pri=normal-pri create-time=1us size=1B range=r1 origin=n1 log-position=4/20 +---- +[regular] try-get=1B available=0B => insufficient tokens + +# Observe that the physical stats show that the actual work was done, but the +# work is virtually enqueued in the work queue for deferred admission. 
+print +---- +physical-stats: work-count=1 written-bytes=1B ingested-bytes=0B +[regular work queue]: len(tenant-heap)=1 top-tenant=t1 + tenant=t1 weight=1 fifo-threshold=low-pri used=0B + [0: pri=normal-pri create-time=1.001µs size=1B range=r1 origin=n1 log-position=4/20] +[elastic work queue]: len(tenant-heap)=0 + +# Admit a second request. Since there's already a request waiting, we don't get +# the fast path. +admit tenant=t1 pri=normal-pri create-time=1us size=1B range=r1 origin=n1 log-position=4/21 ingested=true +---- + +# Observe both waiting requests. +print +---- +physical-stats: work-count=2 written-bytes=1B ingested-bytes=1B +[regular work queue]: len(tenant-heap)=1 top-tenant=t1 + tenant=t1 weight=1 fifo-threshold=low-pri used=0B + [0: pri=normal-pri create-time=1.001µs size=1B range=r1 origin=n1 log-position=4/20] + [1: pri=normal-pri create-time=1.002µs size=1B range=r1 origin=n1 log-position=4/21 ingested ] +[elastic work queue]: len(tenant-heap)=0 + +# Produce 2B worth of regular tokens. +granter class=regular adjust-tokens=+2B +---- +[regular] 2B tokens available +[elastic] 0B tokens available + +# Grant admission requests. Since we have 2B worth of tokens, and 2 waiting +# requests wanting 1B each, we're able to admit both. We do so in log-position +# order. +grant class=regular +---- +admitted [tenant=t1 pri=normal-pri create-time=1.001µs size=1B range=r1 origin=n1 log-position=4/20] +admitted [tenant=t1 pri=normal-pri create-time=1.002µs size=1B range=r1 origin=n1 log-position=4/21 ingested] + +# Pretty print granter state to show no more available tokens. We've consumed +# the 2B above. +granter adjust-tokens=+0B +---- +[regular] 0B tokens available +[elastic] 0B tokens available + +# Observe the empty tenant heaps (all work was admitted) and 2B worth of used +# tokens for t1. 
+print +---- +physical-stats: work-count=2 written-bytes=1B ingested-bytes=1B +[regular work queue]: len(tenant-heap)=0 + tenant=t1 weight=1 fifo-threshold=low-pri used=2B +[elastic work queue]: len(tenant-heap)=0 + +# vim:ft=sh diff --git a/pkg/util/admission/testdata/replicated_write_admission/tenant_fairness b/pkg/util/admission/testdata/replicated_write_admission/tenant_fairness new file mode 100644 index 000000000000..059afdb9d8d6 --- /dev/null +++ b/pkg/util/admission/testdata/replicated_write_admission/tenant_fairness @@ -0,0 +1,68 @@ +# Observe how tokens are consumed fairly across tenants. + +init +---- +[regular] 0B tokens available +[elastic] 0B tokens available + +# For two tenants t1 and t2, try to admit two requests of 1B each at +# incrementing log positions. +admit tenant=t1 pri=normal-pri create-time=1us size=1B range=r1 origin=n1 log-position=4/20 +---- +[regular] try-get=1B available=0B => insufficient tokens + +admit tenant=t1 pri=normal-pri create-time=1us size=1B range=r1 origin=n1 log-position=4/21 +---- + +admit tenant=t2 pri=normal-pri create-time=1us size=1B range=r2 origin=n1 log-position=5/20 +---- + +admit tenant=t2 pri=normal-pri create-time=1us size=1B range=r2 origin=n1 log-position=5/21 +---- + +# Observe all waiting requests. +print +---- +physical-stats: work-count=4 written-bytes=4B ingested-bytes=0B +[regular work queue]: len(tenant-heap)=2 top-tenant=t1 + tenant=t1 weight=1 fifo-threshold=low-pri used=0B + [0: pri=normal-pri create-time=1.001µs size=1B range=r1 origin=n1 log-position=4/20] + [1: pri=normal-pri create-time=1.002µs size=1B range=r1 origin=n1 log-position=4/21] + tenant=t2 weight=1 fifo-threshold=low-pri used=0B + [0: pri=normal-pri create-time=1.001µs size=1B range=r2 origin=n1 log-position=5/20] + [1: pri=normal-pri create-time=1.002µs size=1B range=r2 origin=n1 log-position=5/21] +[elastic work queue]: len(tenant-heap)=0 + +# Produce 2B worth of regular tokens. 
+granter class=regular adjust-tokens=+2B +---- +[regular] 2B tokens available +[elastic] 0B tokens available + +# Grant admission requests. Since we have 2B worth of tokens, and 4 waiting +# requests wanting 1B each from 2 tenants, we're able to admit 1 request from each +# tenant. +grant class=regular +---- +admitted [tenant=t1 pri=normal-pri create-time=1.001µs size=1B range=r1 origin=n1 log-position=4/20] +admitted [tenant=t2 pri=normal-pri create-time=1.001µs size=1B range=r2 origin=n1 log-position=5/20] + +# Pretty print granter state to show no more available tokens. We've consumed +# the 2B above. +granter adjust-tokens=+0B +---- +[regular] 0B tokens available +[elastic] 0B tokens available + +# Observe that each tenant still has one waiting request. +print +---- +physical-stats: work-count=4 written-bytes=4B ingested-bytes=0B +[regular work queue]: len(tenant-heap)=2 top-tenant=t2 + tenant=t1 weight=1 fifo-threshold=low-pri used=1B + [0: pri=normal-pri create-time=1.002µs size=1B range=r1 origin=n1 log-position=4/21] + tenant=t2 weight=1 fifo-threshold=low-pri used=1B + [0: pri=normal-pri create-time=1.002µs size=1B range=r2 origin=n1 log-position=5/21] +[elastic work queue]: len(tenant-heap)=0 + +# vim:ft=sh diff --git a/pkg/util/admission/testdata/replicated_write_admission/tenant_weights b/pkg/util/admission/testdata/replicated_write_admission/tenant_weights new file mode 100644 index 000000000000..1c7f56b8c254 --- /dev/null +++ b/pkg/util/admission/testdata/replicated_write_admission/tenant_weights @@ -0,0 +1,104 @@ +# Observe how tokens are consumed proportionally across tenants, as configured +# by tenant weights. + +init +---- +[regular] 0B tokens available +[elastic] 0B tokens available + +tenant-weights t1=2 t2=5 +---- + +# For two tenants t1 and t2, try to admit 5 requests of 1B each at +# incrementing log positions. 
+admit tenant=t1 pri=normal-pri create-time=1us size=1B range=r1 origin=n1 log-position=4/20 +---- +[regular] try-get=1B available=0B => insufficient tokens + +admit tenant=t1 pri=normal-pri create-time=1us size=1B range=r1 origin=n1 log-position=4/21 +---- + +admit tenant=t1 pri=normal-pri create-time=1us size=1B range=r1 origin=n1 log-position=4/22 +---- + +admit tenant=t1 pri=normal-pri create-time=1us size=1B range=r1 origin=n1 log-position=4/23 +---- + +admit tenant=t1 pri=normal-pri create-time=1us size=1B range=r1 origin=n1 log-position=4/24 +---- + +admit tenant=t2 pri=normal-pri create-time=1us size=1B range=r2 origin=n1 log-position=5/20 +---- + +admit tenant=t2 pri=normal-pri create-time=1us size=1B range=r2 origin=n1 log-position=5/21 +---- + +admit tenant=t2 pri=normal-pri create-time=1us size=1B range=r2 origin=n1 log-position=5/22 +---- + +admit tenant=t2 pri=normal-pri create-time=1us size=1B range=r2 origin=n1 log-position=5/23 +---- + +admit tenant=t2 pri=normal-pri create-time=1us size=1B range=r2 origin=n1 log-position=5/24 +---- + +# Observe all waiting requests. 
+print +---- +physical-stats: work-count=10 written-bytes=10B ingested-bytes=0B +[regular work queue]: len(tenant-heap)=2 top-tenant=t1 + tenant=t1 weight=2 fifo-threshold=low-pri used=0B + [0: pri=normal-pri create-time=1.001µs size=1B range=r1 origin=n1 log-position=4/20] + [1: pri=normal-pri create-time=1.002µs size=1B range=r1 origin=n1 log-position=4/21] + [2: pri=normal-pri create-time=1.003µs size=1B range=r1 origin=n1 log-position=4/22] + [3: pri=normal-pri create-time=1.004µs size=1B range=r1 origin=n1 log-position=4/23] + [4: pri=normal-pri create-time=1.005µs size=1B range=r1 origin=n1 log-position=4/24] + tenant=t2 weight=5 fifo-threshold=low-pri used=0B + [0: pri=normal-pri create-time=1.001µs size=1B range=r2 origin=n1 log-position=5/20] + [1: pri=normal-pri create-time=1.002µs size=1B range=r2 origin=n1 log-position=5/21] + [2: pri=normal-pri create-time=1.003µs size=1B range=r2 origin=n1 log-position=5/22] + [3: pri=normal-pri create-time=1.004µs size=1B range=r2 origin=n1 log-position=5/23] + [4: pri=normal-pri create-time=1.005µs size=1B range=r2 origin=n1 log-position=5/24] +[elastic work queue]: len(tenant-heap)=0 + +# Produce 7B worth of regular tokens. +granter class=regular adjust-tokens=+7B +---- +[regular] 7B tokens available +[elastic] 0B tokens available + +# Grant admission requests. Since we have 7B worth of tokens, and 10 waiting +# requests wanting 1B each, we'll be able to admit 7 requests. We'll bias +# towards the tenant with the higher weight (t2). 
+grant class=regular +---- +admitted [tenant=t1 pri=normal-pri create-time=1.001µs size=1B range=r1 origin=n1 log-position=4/20] +admitted [tenant=t2 pri=normal-pri create-time=1.001µs size=1B range=r2 origin=n1 log-position=5/20] +admitted [tenant=t2 pri=normal-pri create-time=1.002µs size=1B range=r2 origin=n1 log-position=5/21] +admitted [tenant=t2 pri=normal-pri create-time=1.003µs size=1B range=r2 origin=n1 log-position=5/22] +admitted [tenant=t1 pri=normal-pri create-time=1.002µs size=1B range=r1 origin=n1 log-position=4/21] +admitted [tenant=t2 pri=normal-pri create-time=1.004µs size=1B range=r2 origin=n1 log-position=5/23] +admitted [tenant=t2 pri=normal-pri create-time=1.005µs size=1B range=r2 origin=n1 log-position=5/24] + +# Pretty print granter state to show no more available tokens. We've consumed +# the 7B above. +granter adjust-tokens=+0B +---- +[regular] 0B tokens available +[elastic] 0B tokens available + +# Observe that t2 has no waiting requests, but t1 still has 3. So we've +# processed 5 t2 requests for every 2 t1 requests, exactly what we'd expect for +# a 2:5 weight ratio between t1:t2. +print +---- +physical-stats: work-count=10 written-bytes=10B ingested-bytes=0B +[regular work queue]: len(tenant-heap)=1 top-tenant=t1 + tenant=t1 weight=2 fifo-threshold=low-pri used=2B + [0: pri=normal-pri create-time=1.003µs size=1B range=r1 origin=n1 log-position=4/22] + [1: pri=normal-pri create-time=1.004µs size=1B range=r1 origin=n1 log-position=4/23] + [2: pri=normal-pri create-time=1.005µs size=1B range=r1 origin=n1 log-position=4/24] + tenant=t2 weight=5 fifo-threshold=low-pri used=5B +[elastic work queue]: len(tenant-heap)=0 + +# vim:ft=sh diff --git a/pkg/util/admission/testdata/sequencer b/pkg/util/admission/testdata/sequencer new file mode 100644 index 000000000000..34929f5f5e83 --- /dev/null +++ b/pkg/util/admission/testdata/sequencer @@ -0,0 +1,142 @@ +# Walk through the basics of how the per-range sequencer works. 
+# +# ----------------------------------------------------------------------------- +# 1. Observe how the sequence numbers change with changing log positions (and +# static create-times). +init +---- + +sequence create-time=0ms log-position=1/0 +---- +seq=1 ≈1ns (advanced) + +# If the log index is incremented, so is the sequence number. So we're +# issuing "create times" that are higher than ones issued before for lower log +# indexes. +sequence create-time=0ms log-position=1/1 +---- +seq=2 ≈2ns (advanced) + +# Bump the log index by 19, and observe a higher sequence number. Since the +# create-time is static (0ms now and earlier), and we try to stay as close to +# the max observed create-time as possible, we increment the sequence number by +# the smallest possible amount -- 1ns. +sequence create-time=0ms log-position=1/20 +---- +seq=3 ≈3ns (advanced) + +# Regressions in log indexes (indicating buggy usage) don't cause regressions +# in the sequence number. +sequence create-time=0ms log-position=1/10 +---- +seq=3 ≈3ns + +# Try again with a large bump in the log index. +sequence create-time=0ms log-position=1/500 +---- +seq=4 ≈4ns (advanced) + +# Increases in the raft term also increase the sequence number. +sequence create-time=0ms log-position=2/0 +---- +seq=5 ≈5ns (advanced) + +# Regressions in the raft term (indicating buggy usage) don't cause +# regressions in the sequence number. +sequence create-time=0ms log-position=1/0 +---- +seq=5 ≈5ns + +# Try another (large) bump in the raft term, observing a delta in the sequence +# number. +sequence create-time=0ms log-position=5/0 +---- +seq=6 ≈6ns (advanced) + + +# ----------------------------------------------------------------------------- +# 2. Observe how the sequence numbers change with changing create-times (and +# static log positions, which would typically indicate buggy usage). 
+init +---- + +sequence create-time=0ms log-position=1/1 +---- +seq=1 ≈1ns + +# Create time advancing to 1us also advances the sequence numbers -- they're +# kept closely tied to the largest observed create time. +sequence create-time=1us log-position=1/1 +---- +seq=1000 ≈1µs (advanced) + +# Ditto for subsequent increases in max observed create times. +sequence create-time=2us log-position=1/1 +---- +seq=2000 ≈2µs (advanced) + +# Regressions in create-time don't cause regressions in the sequence number. +sequence create-time=1us log-position=1/1 +---- +seq=2000 ≈2µs + + +# ----------------------------------------------------------------------------- +# 3. Observe how the sequence numbers change with both changing create-times +# and log positions. +init +---- + +# Advance either create-time or log position or both. We should see the +# sequence numbers ratchet up accordingly. +sequence create-time=0ns log-position=1/1 +---- +seq=1 ≈1ns + +# We see the sequence number increase by 2, because of an increase in both +# create-time and log-position. +sequence create-time=1ns log-position=1/2 +---- +seq=3 ≈3ns (advanced) + +# We see the sequence number increase by 1, because of an increase in just the +# log-position. +sequence create-time=1ns log-position=1/3 +---- +seq=4 ≈4ns (advanced) + +# We see the sequence number increase by 1, because of an increase in just +# the create-time. +sequence create-time=2ns log-position=1/3 +---- +seq=5 ≈5ns (advanced) + +# Advance the log position by a few indexes to accumulate into the internally +# tracked "sequence delta". +sequence create-time=2ns log-position=1/4 +---- +seq=6 ≈6ns (advanced) + +sequence create-time=2ns log-position=1/5 +---- +seq=7 ≈7ns (advanced) + +sequence create-time=2ns log-position=1/6 +---- +seq=8 ≈8ns (advanced) + +# We see the sequence number increase by 1, because of an increase in just the +# log-position (the term increase). 
+sequence create-time=2ns log-position=2/1 +---- +seq=9 ≈9ns (advanced) + +# We see the sequence number increase to 1+create-time -- internally we're +# resetting any accumulated sequence deltas since the jump in max-observed +# create-time is larger than it. The +1 is coming from an increase in the log +# index. +sequence create-time=20ns log-position=2/2 +---- +seq=21 ≈21ns (advanced) + +# vim:ft=sh diff --git a/pkg/util/admission/testdata/store_work_queue b/pkg/util/admission/testdata/store_work_queue index 26fba8dc46e4..ca4ebdfbccd6 100644 --- a/pkg/util/admission/testdata/store_work_queue +++ b/pkg/util/admission/testdata/store_work_queue @@ -5,7 +5,7 @@ print ---- regular workqueue: closed epoch: 0 tenantHeap len: 0 elastic workqueue: closed epoch: 0 tenantHeap len: 0 -stats:{admittedCount:0 writeAccountedBytes:0 ingestedAccountedBytes:0 statsToIgnore:{IngestOperationStats:{Bytes:0 ApproxIngestedIntoL0Bytes:0}} aux:{bypassedCount:0 writeBypassedAccountedBytes:0 ingestedBypassedAccountedBytes:0}} +stats:{workCount:0 writeAccountedBytes:0 ingestedAccountedBytes:0 statsToIgnore:{IngestOperationStats:{Bytes:0 ApproxIngestedIntoL0Bytes:0}} aux:{bypassedCount:0 writeBypassedAccountedBytes:0 ingestedBypassedAccountedBytes:0}} estimates:{writeTokens:1} set-try-get-return-value v=true @@ -14,7 +14,7 @@ set-try-get-return-value v=true admit id=1 tenant=53 priority=0 create-time-millis=1 bypass=false ---- tryGet regular: returning true -id 1: admit succeeded with handle {tenantID:{InternalValue:53} writeTokens:1 workClass:0 admissionEnabled:true} +id 1: admit succeeded with handle {tenantID:{InternalValue:53} writeTokens:1 workClass:0 useAdmittedWorkDone:true} work-done id=1 ---- @@ -25,18 +25,18 @@ set-store-request-estimates write-tokens=100 regular workqueue: closed epoch: 0 tenantHeap len: 0 tenant-id: 53 used: 1, w: 1, fifo: -128 elastic workqueue: closed epoch: 0 tenantHeap len: 0 -stats:{admittedCount:1 writeAccountedBytes:0 ingestedAccountedBytes:0 
statsToIgnore:{IngestOperationStats:{Bytes:0 ApproxIngestedIntoL0Bytes:0}} aux:{bypassedCount:0 writeBypassedAccountedBytes:0 ingestedBypassedAccountedBytes:0}} +stats:{workCount:1 writeAccountedBytes:0 ingestedAccountedBytes:0 statsToIgnore:{IngestOperationStats:{Bytes:0 ApproxIngestedIntoL0Bytes:0}} aux:{bypassedCount:0 writeBypassedAccountedBytes:0 ingestedBypassedAccountedBytes:0}} estimates:{writeTokens:100} admit id=2 tenant=55 priority=0 create-time-millis=1 bypass=false ---- tryGet regular: returning true -id 2: admit succeeded with handle {tenantID:{InternalValue:55} writeTokens:100 workClass:0 admissionEnabled:true} +id 2: admit succeeded with handle {tenantID:{InternalValue:55} writeTokens:100 workClass:0 useAdmittedWorkDone:true} admit id=3 tenant=53 priority=0 create-time-millis=1 bypass=false ---- tryGet regular: returning true -id 3: admit succeeded with handle {tenantID:{InternalValue:53} writeTokens:100 workClass:0 admissionEnabled:true} +id 3: admit succeeded with handle {tenantID:{InternalValue:53} writeTokens:100 workClass:0 useAdmittedWorkDone:true} print ---- @@ -44,7 +44,7 @@ regular workqueue: closed epoch: 0 tenantHeap len: 0 tenant-id: 53 used: 101, w: 1, fifo: -128 tenant-id: 55 used: 100, w: 1, fifo: -128 elastic workqueue: closed epoch: 0 tenantHeap len: 0 -stats:{admittedCount:1 writeAccountedBytes:0 ingestedAccountedBytes:0 statsToIgnore:{IngestOperationStats:{Bytes:0 ApproxIngestedIntoL0Bytes:0}} aux:{bypassedCount:0 writeBypassedAccountedBytes:0 ingestedBypassedAccountedBytes:0}} +stats:{workCount:1 writeAccountedBytes:0 ingestedAccountedBytes:0 statsToIgnore:{IngestOperationStats:{Bytes:0 ApproxIngestedIntoL0Bytes:0}} aux:{bypassedCount:0 writeBypassedAccountedBytes:0 ingestedBypassedAccountedBytes:0}} estimates:{writeTokens:100} set-try-get-return-value v=false @@ -65,13 +65,13 @@ regular workqueue: closed epoch: 0 tenantHeap len: 1 top tenant: 57 tenant-id: 55 used: 600, w: 1, fifo: -128 tenant-id: 57 used: 0, w: 1, fifo: -128 
waiting work heap: [0: pri: normal-pri, ct: 1, epoch: 0, qt: 0] elastic workqueue: closed epoch: 0 tenantHeap len: 0 -stats:{admittedCount:2 writeAccountedBytes:0 ingestedAccountedBytes:0 statsToIgnore:{IngestOperationStats:{Bytes:0 ApproxIngestedIntoL0Bytes:0}} aux:{bypassedCount:0 writeBypassedAccountedBytes:0 ingestedBypassedAccountedBytes:0}} +stats:{workCount:2 writeAccountedBytes:0 ingestedAccountedBytes:0 statsToIgnore:{IngestOperationStats:{Bytes:0 ApproxIngestedIntoL0Bytes:0}} aux:{bypassedCount:0 writeBypassedAccountedBytes:0 ingestedBypassedAccountedBytes:0}} estimates:{writeTokens:100} granted ---- continueGrantChain regular 0 -id 4: admit succeeded with handle {tenantID:{InternalValue:57} writeTokens:100 workClass:0 admissionEnabled:true} +id 4: admit succeeded with handle {tenantID:{InternalValue:57} writeTokens:100 workClass:0 useAdmittedWorkDone:true} granted regular: returned 100 print @@ -81,7 +81,7 @@ regular workqueue: closed epoch: 0 tenantHeap len: 0 tenant-id: 55 used: 600, w: 1, fifo: -128 tenant-id: 57 used: 100, w: 1, fifo: -128 elastic workqueue: closed epoch: 0 tenantHeap len: 0 -stats:{admittedCount:2 writeAccountedBytes:0 ingestedAccountedBytes:0 statsToIgnore:{IngestOperationStats:{Bytes:0 ApproxIngestedIntoL0Bytes:0}} aux:{bypassedCount:0 writeBypassedAccountedBytes:0 ingestedBypassedAccountedBytes:0}} +stats:{workCount:2 writeAccountedBytes:0 ingestedAccountedBytes:0 statsToIgnore:{IngestOperationStats:{Bytes:0 ApproxIngestedIntoL0Bytes:0}} aux:{bypassedCount:0 writeBypassedAccountedBytes:0 ingestedBypassedAccountedBytes:0}} estimates:{writeTokens:100} work-done id=3 ingested-bytes=1000000 additional-tokens=50000 @@ -95,7 +95,7 @@ regular workqueue: closed epoch: 0 tenantHeap len: 0 tenant-id: 55 used: 600, w: 1, fifo: -128 tenant-id: 57 used: 100, w: 1, fifo: -128 elastic workqueue: closed epoch: 0 tenantHeap len: 0 -stats:{admittedCount:3 writeAccountedBytes:0 ingestedAccountedBytes:1000000 
statsToIgnore:{IngestOperationStats:{Bytes:0 ApproxIngestedIntoL0Bytes:0}} aux:{bypassedCount:0 writeBypassedAccountedBytes:0 ingestedBypassedAccountedBytes:0}} +stats:{workCount:3 writeAccountedBytes:0 ingestedAccountedBytes:1000000 statsToIgnore:{IngestOperationStats:{Bytes:0 ApproxIngestedIntoL0Bytes:0}} aux:{bypassedCount:0 writeBypassedAccountedBytes:0 ingestedBypassedAccountedBytes:0}} estimates:{writeTokens:100} set-store-request-estimates write-tokens=10000 @@ -105,7 +105,7 @@ regular workqueue: closed epoch: 0 tenantHeap len: 0 tenant-id: 55 used: 600, w: 1, fifo: -128 tenant-id: 57 used: 100, w: 1, fifo: -128 elastic workqueue: closed epoch: 0 tenantHeap len: 0 -stats:{admittedCount:3 writeAccountedBytes:0 ingestedAccountedBytes:1000000 statsToIgnore:{IngestOperationStats:{Bytes:0 ApproxIngestedIntoL0Bytes:0}} aux:{bypassedCount:0 writeBypassedAccountedBytes:0 ingestedBypassedAccountedBytes:0}} +stats:{workCount:3 writeAccountedBytes:0 ingestedAccountedBytes:1000000 statsToIgnore:{IngestOperationStats:{Bytes:0 ApproxIngestedIntoL0Bytes:0}} aux:{bypassedCount:0 writeBypassedAccountedBytes:0 ingestedBypassedAccountedBytes:0}} estimates:{writeTokens:10000} work-done id=4 write-bytes=2000 ingested-bytes=1000 additional-tokens=2000 @@ -119,7 +119,7 @@ regular workqueue: closed epoch: 0 tenantHeap len: 0 tenant-id: 55 used: 600, w: 1, fifo: -128 tenant-id: 57 used: 2100, w: 1, fifo: -128 elastic workqueue: closed epoch: 0 tenantHeap len: 0 -stats:{admittedCount:4 writeAccountedBytes:2000 ingestedAccountedBytes:1001000 statsToIgnore:{IngestOperationStats:{Bytes:0 ApproxIngestedIntoL0Bytes:0}} aux:{bypassedCount:0 writeBypassedAccountedBytes:0 ingestedBypassedAccountedBytes:0}} +stats:{workCount:4 writeAccountedBytes:2000 ingestedAccountedBytes:1001000 statsToIgnore:{IngestOperationStats:{Bytes:0 ApproxIngestedIntoL0Bytes:0}} aux:{bypassedCount:0 writeBypassedAccountedBytes:0 ingestedBypassedAccountedBytes:0}} estimates:{writeTokens:10000} bypassed-work-done 
work-count=10 write-bytes=1000 ingested-bytes=1000000 @@ -133,7 +133,7 @@ regular workqueue: closed epoch: 0 tenantHeap len: 0 tenant-id: 55 used: 600, w: 1, fifo: -128 tenant-id: 57 used: 2100, w: 1, fifo: -128 elastic workqueue: closed epoch: 0 tenantHeap len: 0 -stats:{admittedCount:14 writeAccountedBytes:3000 ingestedAccountedBytes:2001000 statsToIgnore:{IngestOperationStats:{Bytes:0 ApproxIngestedIntoL0Bytes:0}} aux:{bypassedCount:10 writeBypassedAccountedBytes:1000 ingestedBypassedAccountedBytes:1000000}} +stats:{workCount:14 writeAccountedBytes:3000 ingestedAccountedBytes:2001000 statsToIgnore:{IngestOperationStats:{Bytes:0 ApproxIngestedIntoL0Bytes:0}} aux:{bypassedCount:10 writeBypassedAccountedBytes:1000 ingestedBypassedAccountedBytes:1000000}} estimates:{writeTokens:10000} stats-to-ignore ingested-bytes=12000 ingested-into-L0-bytes=9000 @@ -143,7 +143,7 @@ regular workqueue: closed epoch: 0 tenantHeap len: 0 tenant-id: 55 used: 600, w: 1, fifo: -128 tenant-id: 57 used: 2100, w: 1, fifo: -128 elastic workqueue: closed epoch: 0 tenantHeap len: 0 -stats:{admittedCount:14 writeAccountedBytes:3000 ingestedAccountedBytes:2001000 statsToIgnore:{IngestOperationStats:{Bytes:12000 ApproxIngestedIntoL0Bytes:9000}} aux:{bypassedCount:10 writeBypassedAccountedBytes:1000 ingestedBypassedAccountedBytes:1000000}} +stats:{workCount:14 writeAccountedBytes:3000 ingestedAccountedBytes:2001000 statsToIgnore:{IngestOperationStats:{Bytes:12000 ApproxIngestedIntoL0Bytes:9000}} aux:{bypassedCount:10 writeBypassedAccountedBytes:1000 ingestedBypassedAccountedBytes:1000000}} estimates:{writeTokens:10000} # Elastic work. 
@@ -160,7 +160,7 @@ granted regular: returned 0 granted elastic=true ---- continueGrantChain elastic 0 -id 5: admit succeeded with handle {tenantID:{InternalValue:53} writeTokens:10000 workClass:1 admissionEnabled:true} +id 5: admit succeeded with handle {tenantID:{InternalValue:53} writeTokens:10000 workClass:1 useAdmittedWorkDone:true} granted elastic: returned 10000 print @@ -171,7 +171,7 @@ regular workqueue: closed epoch: 0 tenantHeap len: 0 tenant-id: 57 used: 2100, w: 1, fifo: -128 elastic workqueue: closed epoch: 0 tenantHeap len: 0 tenant-id: 53 used: 10000, w: 1, fifo: -128 -stats:{admittedCount:14 writeAccountedBytes:3000 ingestedAccountedBytes:2001000 statsToIgnore:{IngestOperationStats:{Bytes:12000 ApproxIngestedIntoL0Bytes:9000}} aux:{bypassedCount:10 writeBypassedAccountedBytes:1000 ingestedBypassedAccountedBytes:1000000}} +stats:{workCount:14 writeAccountedBytes:3000 ingestedAccountedBytes:2001000 statsToIgnore:{IngestOperationStats:{Bytes:12000 ApproxIngestedIntoL0Bytes:9000}} aux:{bypassedCount:10 writeBypassedAccountedBytes:1000 ingestedBypassedAccountedBytes:1000000}} estimates:{writeTokens:10000} set-try-get-return-value v=true elastic=true @@ -180,7 +180,7 @@ set-try-get-return-value v=true elastic=true admit id=6 tenant=54 priority=-40 create-time-millis=3 bypass=false ---- tryGet elastic: returning true -id 6: admit succeeded with handle {tenantID:{InternalValue:54} writeTokens:10000 workClass:1 admissionEnabled:true} +id 6: admit succeeded with handle {tenantID:{InternalValue:54} writeTokens:10000 workClass:1 useAdmittedWorkDone:true} print ---- @@ -191,7 +191,7 @@ regular workqueue: closed epoch: 0 tenantHeap len: 0 elastic workqueue: closed epoch: 0 tenantHeap len: 0 tenant-id: 53 used: 10000, w: 1, fifo: -128 tenant-id: 54 used: 10000, w: 1, fifo: -128 -stats:{admittedCount:14 writeAccountedBytes:3000 ingestedAccountedBytes:2001000 statsToIgnore:{IngestOperationStats:{Bytes:12000 ApproxIngestedIntoL0Bytes:9000}} aux:{bypassedCount:10 
writeBypassedAccountedBytes:1000 ingestedBypassedAccountedBytes:1000000}} +stats:{workCount:14 writeAccountedBytes:3000 ingestedAccountedBytes:2001000 statsToIgnore:{IngestOperationStats:{Bytes:12000 ApproxIngestedIntoL0Bytes:9000}} aux:{bypassedCount:10 writeBypassedAccountedBytes:1000 ingestedBypassedAccountedBytes:1000000}} estimates:{writeTokens:10000} work-done id=5 write-bytes=1000 additional-tokens=200 @@ -207,7 +207,7 @@ regular workqueue: closed epoch: 0 tenantHeap len: 0 elastic workqueue: closed epoch: 0 tenantHeap len: 0 tenant-id: 53 used: 10200, w: 1, fifo: -128 tenant-id: 54 used: 10000, w: 1, fifo: -128 -stats:{admittedCount:15 writeAccountedBytes:4000 ingestedAccountedBytes:2001000 statsToIgnore:{IngestOperationStats:{Bytes:12000 ApproxIngestedIntoL0Bytes:9000}} aux:{bypassedCount:10 writeBypassedAccountedBytes:1000 ingestedBypassedAccountedBytes:1000000}} +stats:{workCount:15 writeAccountedBytes:4000 ingestedAccountedBytes:2001000 statsToIgnore:{IngestOperationStats:{Bytes:12000 ApproxIngestedIntoL0Bytes:9000}} aux:{bypassedCount:10 writeBypassedAccountedBytes:1000 ingestedBypassedAccountedBytes:1000000}} estimates:{writeTokens:10000} work-done id=6 ingested-bytes=500 additional-tokens=500 @@ -223,5 +223,5 @@ regular workqueue: closed epoch: 0 tenantHeap len: 0 elastic workqueue: closed epoch: 0 tenantHeap len: 0 tenant-id: 53 used: 10200, w: 1, fifo: -128 tenant-id: 54 used: 10500, w: 1, fifo: -128 -stats:{admittedCount:16 writeAccountedBytes:4000 ingestedAccountedBytes:2001500 statsToIgnore:{IngestOperationStats:{Bytes:12000 ApproxIngestedIntoL0Bytes:9000}} aux:{bypassedCount:10 writeBypassedAccountedBytes:1000 ingestedBypassedAccountedBytes:1000000}} +stats:{workCount:16 writeAccountedBytes:4000 ingestedAccountedBytes:2001500 statsToIgnore:{IngestOperationStats:{Bytes:12000 ApproxIngestedIntoL0Bytes:9000}} aux:{bypassedCount:10 writeBypassedAccountedBytes:1000 ingestedBypassedAccountedBytes:1000000}} estimates:{writeTokens:10000} diff --git 
a/pkg/util/admission/testing_knobs.go b/pkg/util/admission/testing_knobs.go new file mode 100644 index 000000000000..659552c84dc5 --- /dev/null +++ b/pkg/util/admission/testing_knobs.go @@ -0,0 +1,36 @@ +// Copyright 2022 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package admission + +import ( + "github.com/cockroachdb/cockroach/pkg/base" + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/util/admission/admissionpb" +) + +// TestingKnobs provide fine-grained control over the various admission control +// components for testing. +type TestingKnobs struct { + // AdmittedReplicatedWorkInterceptor is invoked whenever replicated work is + // admitted. + AdmittedReplicatedWorkInterceptor func( + tenantID roachpb.TenantID, + pri admissionpb.WorkPriority, + rwi ReplicatedWorkInfo, + originalTokens int64, + createTime int64, + ) +} + +// ModuleTestingKnobs is part of the base.ModuleTestingKnobs interface. +func (t *TestingKnobs) ModuleTestingKnobs() {} + +var _ base.ModuleTestingKnobs = (*TestingKnobs)(nil) diff --git a/pkg/util/admission/work_queue.go b/pkg/util/admission/work_queue.go index 3ec037c171be..788d1cb7ee4d 100644 --- a/pkg/util/admission/work_queue.go +++ b/pkg/util/admission/work_queue.go @@ -156,9 +156,17 @@ var epochLIFOQueueDelayThresholdToSwitchToLIFO = settings.RegisterDurationSettin return nil }).WithPublic() -// WorkInfo provides information that is used to order work within an -// WorkQueue. The WorkKind is not included as a field since an WorkQueue deals -// with a single WorkKind. 
+var rangeSequencerGCThreshold = settings.RegisterDurationSetting( + settings.TenantWritable, + "admission.replication_control.range_sequencer_gc_threshold", + "the inactive duration for a range sequencer after it's garbage collected", + 5*time.Minute, + settings.NonNegativeDuration, +) + +// WorkInfo provides information that is used to order work within an WorkQueue. +// The WorkKind is not included as a field since an WorkQueue deals with a +// single WorkKind. type WorkInfo struct { // TenantID is the id of the tenant. For single-tenant clusters, this will // always be the SystemTenantID. @@ -177,22 +185,58 @@ type WorkInfo struct { // when KV work generates other KV work (to avoid deadlock). Ignored // otherwise. BypassAdmission bool + // RequestedCount is the requested number of tokens or slots. If unset: + // - For slot-based queues we treat it as an implicit request of 1; + // - For the store work queue, we use per-request estimates to deduct some + // number of tokens at-admit time. Note that this only applies to the + // legacy above-raft admission control. With admission control for + // replicated writes (done so asynchronously, below-raft; see + // ReplicatedWrite below), we do know the size of the write being + // admitted, so RequestedCount is set accordingly. + RequestedCount int64 + // ReplicatedWorkInfo groups everything needed to admit replicated writes, done + // so asynchronously below-raft as part of replication admission control. + ReplicatedWorkInfo ReplicatedWorkInfo +} + +// ReplicatedWorkInfo groups everything needed to admit replicated writes, done +// so asynchronously below-raft as part of replication admission control. +type ReplicatedWorkInfo struct { + // Enabled captures whether this work represents a replicated write, + // subject to below-raft asynchronous admission control. + Enabled bool + // RangeID identifies the raft group on behalf of which work is being + // admitted. 
+ RangeID roachpb.RangeID + // Origin is the node at which this work originated. It's used for + // replication admission control to inform the origin of admitted work + // (after which flow tokens are released, permitting more replicated + // writes). + Origin roachpb.NodeID + // LogPosition is the point on the raft log where the write was replicated. + LogPosition LogPosition + // Ingested captures whether the write work corresponds to an ingest + // (for sstables, for example). This is used alongside RequestedCount to + // maintain accurate linear models for L0 growth due to ingests and + // regular write batches. + Ingested bool +} - // Optional information specified only for WorkQueues where the work is tied - // to a range. This allows queued work to return early as soon as the range - // is no longer in a relevant state at this node. Currently only KVWork is - // tied to a range. - // TODO(sumeer): use these in the WorkQueue implementation. +// LogPosition is a point on the raft log, identified by a term and an index. +type LogPosition struct { + Term uint64 + Index uint64 +} - // RangeID is the range at which this work must be performed. Optional (see - // comment above). - RangeID roachpb.RangeID - // RequiresLeaseholder is true iff the work requires the leaseholder. - // Optional (see comment above). - RequiresLeaseholder bool +func (r LogPosition) String() string { + return fmt.Sprintf("%d/%d", r.Term, r.Index) +} - // For internal use by wrapper classes. The requested tokens or slots. - requestedCount int64 +func (r LogPosition) Less(o LogPosition) bool { + if r.Term != o.Term { + return r.Term < o.Term + } + return r.Index < o.Index } // WorkQueue maintains a queue of work waiting to be admitted. 
Ordering of @@ -222,12 +266,15 @@ type WorkInfo struct { // kvQueue.AdmittedWorkDone(tid) // } type WorkQueue struct { - ambientCtx context.Context - workKind WorkKind - granter granter - usesTokens bool - tiedToRange bool - settings *cluster.Settings + ambientCtx context.Context + workKind WorkKind + granter granter + usesTokens bool + tiedToRange bool + usesAsyncAdmit bool + settings *cluster.Settings + + onAdmittedReplicatedWork onAdmittedReplicatedWork // Prevents more than one caller to be in Admit and calling tryGet or adding // to the queue. It allows WorkQueue to release mu before calling tryGet and @@ -269,8 +316,9 @@ type WorkQueue struct { var _ requester = &WorkQueue{} type workQueueOptions struct { - usesTokens bool - tiedToRange bool + usesTokens bool + tiedToRange bool + usesAsyncAdmit bool // timeSource can be set to non-nil for tests. If nil, // the timeutil.DefaultTimeSource will be used. @@ -329,6 +377,7 @@ func initWorkQueue( q.granter = granter q.usesTokens = opts.usesTokens q.tiedToRange = opts.tiedToRange + q.usesAsyncAdmit = opts.usesAsyncAdmit q.settings = settings q.logThreshold = log.Every(5 * time.Minute) q.metrics = metrics @@ -369,7 +418,9 @@ func (q *WorkQueue) timeNow() time.Time { } func (q *WorkQueue) epochLIFOEnabled() bool { - return EpochLIFOEnabled.Get(&q.settings.SV) + // We don't use epoch LIFO for below-raft admission control. See I12 from + // kvflowcontrol/doc.go. + return EpochLIFOEnabled.Get(&q.settings.SV) && !q.usesAsyncAdmit } // Samples the latest cluster settings for epoch-LIFO. @@ -492,17 +543,24 @@ func (q *WorkQueue) tryCloseEpoch(timeNow time.Time) { // admission control is enabled. AdmittedWorkDone must be called iff // enabled=true && err!=nil, and the WorkKind for this queue uses slots. 
func (q *WorkQueue) Admit(ctx context.Context, info WorkInfo) (enabled bool, err error) { - enabledSetting := admissionControlEnabledSettings[q.workKind] - if enabledSetting != nil && !enabledSetting.Get(&q.settings.SV) { - return false, nil + if !info.ReplicatedWorkInfo.Enabled { + enabledSetting := admissionControlEnabledSettings[q.workKind] + if enabledSetting != nil && !enabledSetting.Get(&q.settings.SV) { + return false, nil + } } - if info.requestedCount == 0 { - // Callers from outside the admission package don't set requestedCount -- - // these are implicitly requesting a count of 1. - info.requestedCount = 1 + + // TODO(irfansharif): When enabling replication admission control for + // regular writes with arbitrary concurrency (part of #95563), measure + // the memory overhead of enqueueing each raft command to see whether we + // need to do some coalescing at this level. + + if info.RequestedCount == 0 { + // We treat unset RequestedCounts as an implicit request of 1. + info.RequestedCount = 1 } - if !q.usesTokens && info.requestedCount != 1 { - panic(errors.AssertionFailedf("unexpected requestedCount: %d", info.requestedCount)) + if !q.usesTokens && info.RequestedCount != 1 { + panic(errors.AssertionFailedf("unexpected RequestedCount: %d", info.RequestedCount)) } q.metrics.incRequested(info.Priority) tenantID := info.TenantID.ToUint64() @@ -518,14 +576,29 @@ func (q *WorkQueue) Admit(ctx context.Context, info WorkInfo) (enabled bool, err tenant = newTenantInfo(tenantID, q.getTenantWeightLocked(tenantID)) q.mu.tenants[tenantID] = tenant } + if info.ReplicatedWorkInfo.Enabled { + if info.BypassAdmission { + // TODO(irfansharif): "Admin" work (like splits, scatters, lease + // transfers, etc.), and work originating from AdmissionHeader_OTHER, + // don't use flow control tokens above-raft. So there's nothing to + // virtually enqueue below-raft, since we have nothing to return.
That + // said, it might still be useful to physically admit these proposals + // for correct token modeling. To do that, we'd have to pass down + // information about it being bypassed above-raft. + panic("unexpected BypassAdmission bit set for below raft admission") + } + if !q.usesTokens { + panic("unexpected ReplicatedWrite.Enabled on slot-based queue") + } + } if info.BypassAdmission && roachpb.IsSystemTenantID(tenantID) && q.workKind == KVWork { - tenant.used += uint64(info.requestedCount) + tenant.used += uint64(info.RequestedCount) if isInTenantHeap(tenant) { q.mu.tenantHeap.fix(tenant) } q.mu.Unlock() q.admitMu.Unlock() - q.granter.tookWithoutPermission(info.requestedCount) + q.granter.tookWithoutPermission(info.RequestedCount) q.metrics.incAdmitted(info.Priority) return true, nil } @@ -539,11 +612,29 @@ func (q *WorkQueue) Admit(ctx context.Context, info WorkInfo) (enabled bool, err if len(q.mu.tenantHeap) == 0 { // Fast-path. Try to grab token/slot. // Optimistically update used to avoid locking again. - tenant.used += uint64(info.requestedCount) + tenant.used += uint64(info.RequestedCount) q.mu.Unlock() - if q.granter.tryGet(info.requestedCount) { + if q.granter.tryGet(info.RequestedCount) { q.admitMu.Unlock() q.metrics.incAdmitted(info.Priority) + if info.ReplicatedWorkInfo.Enabled { + // TODO(irfansharif): There's a race here that could lead to + // over-admission. It's possible that there are enqueued work + // items with lower log positions than the request that just got + // through using the fast-path, and since we're returning flow + // tokens by specifying a log prefix, we'd be returning more + // flow tokens than actually admitted. Fix it as part of #95563, + // by either adding more synchronization, getting rid of this + // fast path, or swapping this entry with the top-most one in + // the waiting heap (and fixing the heap).
+ q.onAdmittedReplicatedWork.admittedReplicatedWork( + roachpb.MustMakeTenantID(tenant.id), + info.Priority, + info.ReplicatedWorkInfo, + info.RequestedCount, + info.CreateTime, + ) + } return true, nil } // Did not get token/slot. @@ -571,11 +662,11 @@ func (q *WorkQueue) Admit(ctx context.Context, info WorkInfo) (enabled bool, err if !ok || prevTenant != tenant { panic("prev tenantInfo no longer in map") } - if tenant.used < uint64(info.requestedCount) { - panic(errors.AssertionFailedf("tenant.used %d < info.requestedCount %d", - tenant.used, info.requestedCount)) + if tenant.used < uint64(info.RequestedCount) { + panic(errors.AssertionFailedf("tenant.used %d < info.RequestedCount %d", + tenant.used, info.RequestedCount)) } - tenant.used -= uint64(info.requestedCount) + tenant.used -= uint64(info.RequestedCount) } else { if !ok { tenant = newTenantInfo(tenantID, q.getTenantWeightLocked(tenantID)) @@ -583,14 +674,17 @@ func (q *WorkQueue) Admit(ctx context.Context, info WorkInfo) (enabled bool, err } // Don't want to overflow tenant.used if it is already 0 because of // being reset to 0 by the GC goroutine. - if tenant.used >= uint64(info.requestedCount) { - tenant.used -= uint64(info.requestedCount) + if tenant.used >= uint64(info.RequestedCount) { + tenant.used -= uint64(info.RequestedCount) } } } // Check for cancellation. startTime := q.timeNow() if ctx.Err() != nil { + if info.ReplicatedWorkInfo.Enabled { + panic("not equipped to deal with cancelable contexts below raft") + } // Already canceled. More likely to happen if cpu starvation is // causing entering into the work queue to be delayed. 
q.mu.Unlock() @@ -606,7 +700,9 @@ func (q *WorkQueue) Admit(ctx context.Context, info WorkInfo) (enabled bool, err if int(info.Priority) < tenant.fifoPriorityThreshold { ordering = lifoWorkOrdering } - work := newWaitingWork(info.Priority, ordering, info.CreateTime, info.requestedCount, startTime, q.mu.epochLengthNanos) + work := newWaitingWork(info.Priority, ordering, info.CreateTime, info.RequestedCount, startTime, q.mu.epochLengthNanos) + work.replicated = info.ReplicatedWorkInfo + inTenantHeap := isInTenantHeap(tenant) if work.epoch <= q.mu.closedEpochThreshold || ordering == fifoWorkOrdering { heap.Push(&tenant.waitingWorkHeap, work) @@ -618,11 +714,16 @@ func (q *WorkQueue) Admit(ctx context.Context, info WorkInfo) (enabled bool, err } // Else already in tenantHeap. - // Release all locks and start waiting. + // Release all locks. q.mu.Unlock() q.admitMu.Unlock() q.metrics.recordStartWait(info.Priority) + if info.ReplicatedWorkInfo.Enabled { + return // return without waiting (admission is asynchronous) + } + + // Start waiting for admission. defer releaseWaitingWork(work) select { case <-ctx.Done(): @@ -638,16 +739,16 @@ func (q *WorkQueue) Admit(ctx context.Context, info WorkInfo) (enabled bool, err if work.heapIndex == -1 { // No longer in heap. Raced with token/slot grant. if !q.usesTokens { - if tenant.used < uint64(info.requestedCount) { - panic(errors.AssertionFailedf("tenant.used %d < info.requestedCount %d", - tenant.used, info.requestedCount)) + if tenant.used < uint64(info.RequestedCount) { + panic(errors.AssertionFailedf("tenant.used %d < info.RequestedCount %d", + tenant.used, info.RequestedCount)) } - tenant.used -= uint64(info.requestedCount) + tenant.used -= uint64(info.RequestedCount) } // Else, we don't decrement tenant.used since we don't want to race with // the gc goroutine that will set used=0. 
q.mu.Unlock() - q.granter.returnGrant(info.requestedCount) + q.granter.returnGrant(info.RequestedCount) // The channel is sent to after releasing mu, so we don't need to hold // mu when receiving from it. Additionally, we've already called // returnGrant so we're not holding back future grant chains if this one @@ -745,8 +846,30 @@ func (q *WorkQueue) granted(grantChainID grantChainID) int64 { // releaseWaitingWork to return item to the waitingWorkPool. requestedCount := item.requestedCount q.mu.Unlock() - // Reduce critical section by sending on channel after releasing mutex. - item.ch <- grantChainID + + if !item.replicated.Enabled { + // Reduce critical section by sending on channel after releasing mutex. + item.ch <- grantChainID + } else { + // NB: We don't use grant chains for store tokens, so they don't apply + // to replicated writes. + + defer releaseWaitingWork(item) + q.onAdmittedReplicatedWork.admittedReplicatedWork( + roachpb.MustMakeTenantID(tenant.id), + item.priority, + item.replicated, + item.requestedCount, + item.createTime, + ) + + q.metrics.incAdmitted(item.priority) + waitDur := q.timeNow().Sub(item.enqueueingTime) + q.metrics.recordFinishWait(item.priority, waitDur) + if item.heapIndex != -1 { + panic(errors.AssertionFailedf("grantee should be removed from heap")) + } + } return requestedCount } @@ -769,24 +892,24 @@ func (q *WorkQueue) gcTenantsAndResetTokens() { } // adjustTenantTokens is used internally by StoreWorkQueue. The -// additionalTokens count can be negative, in which case it is returning +// additionalTokensNeeded count can be negative, in which case it is returning // tokens. This is only for WorkQueue's own accounting -- it should not call // into granter. 
-func (q *WorkQueue) adjustTenantTokens(tenantID roachpb.TenantID, additionalTokens int64) { +func (q *WorkQueue) adjustTenantTokens(tenantID roachpb.TenantID, additionalTokensNeeded int64) { tid := tenantID.ToUint64() q.mu.Lock() defer q.mu.Unlock() tenant, ok := q.mu.tenants[tid] if ok { - if additionalTokens < 0 { - toReturn := uint64(-additionalTokens) + if additionalTokensNeeded < 0 { + toReturn := uint64(-additionalTokensNeeded) if tenant.used < toReturn { tenant.used = 0 } else { tenant.used -= toReturn } } else { - tenant.used += uint64(additionalTokens) + tenant.used += uint64(additionalTokensNeeded) } } } @@ -1243,6 +1366,7 @@ type waitingWork struct { // to false. inWaitingWorkHeap bool enqueueingTime time.Time + replicated ReplicatedWorkInfo } var waitingWorkPool = sync.Pool{ @@ -1636,68 +1760,115 @@ func makeWorkQueueMetricsSingle(name string) workQueueMetricsSingle { // seeking admission from a StoreWorkQueue. type StoreWriteWorkInfo struct { WorkInfo - // NB: no information about the size of the work is provided at admission - // time. The token subtraction at admission time is completely based on past - // estimates. This estimation is improved at work completion time via size - // information provided in StoreWorkDoneInfo. - // - // TODO(sumeer): in some cases, like AddSSTable requests, we do have size - // information at proposal time, and may be able to use it fruitfully. } // StoreWorkQueue is responsible for admission to a store. type StoreWorkQueue struct { - q [admissionpb.NumWorkClasses]WorkQueue + storeID roachpb.StoreID + q [admissionpb.NumWorkClasses]WorkQueue // Only calls storeWriteDone. The rest of the interface is used by // WorkQueue. granters [admissionpb.NumWorkClasses]granterWithStoreWriteDone mu struct { syncutil.RWMutex + // estimates is used to determine how many tokens are deducted at-admit + // time for each request. 
It's not used for replication admission + // control (below-raft) where we do know the size of the write being + // admitted. estimates storeRequestEstimates - stats storeAdmissionStats + // stats are used to maintain L0 {write,ingest} linear models, modeling + // the relation between accounted for "physical" {write,ingest} bytes + // and observed L0 growth (which factors in state machine application). + stats storeAdmissionStats + } + sequencersMu struct { + syncutil.Mutex + s map[roachpb.RangeID]*sequencer // cleaned up periodically } + stopCh chan struct{} + timeSource timeutil.TimeSource + settings *cluster.Settings + + knobs *TestingKnobs } // StoreWorkHandle is returned by StoreWorkQueue.Admit, and contains state -// needed by the caller (see StoreWorkHandle.AdmissionEnabled) and by +// needed by the caller (see StoreWorkHandle.UseAdmittedWorkDone) and by // StoreWorkQueue.AdmittedWorkDone. type StoreWorkHandle struct { tenantID roachpb.TenantID // The writeTokens acquired by this request. Must be > 0. - writeTokens int64 - workClass admissionpb.WorkClass - admissionEnabled bool + writeTokens int64 + workClass admissionpb.WorkClass + useAdmittedWorkDone bool } -// AdmissionEnabled indicates whether admission control is enabled. If it -// returns false, there is no need to call StoreWorkQueue.AdmittedWorkDone. -func (h StoreWorkHandle) AdmissionEnabled() bool { - return h.admissionEnabled +// UseAdmittedWorkDone indicates whether we need to invoke +// StoreWorkQueue.AdmittedWorkDone. It's false if AC is disabled or if we're +// using below-raft admission control. +func (h StoreWorkHandle) UseAdmittedWorkDone() bool { + return h.useAdmittedWorkDone } // Admit is called when requesting admission for store work. If err!=nil, the // request was not admitted, potentially due to a deadline being exceeded. 
If -// err=nil and handle.AdmissionEnabled() is true, AdmittedWorkDone must be +// err=nil and handle.UseAdmittedWorkDone() is true, AdmittedWorkDone must be // called when the admitted work is done. func (q *StoreWorkQueue) Admit( ctx context.Context, info StoreWriteWorkInfo, ) (handle StoreWorkHandle, err error) { - // For now, we compute a workClass based on priority. wc := admissionpb.WorkClassFromPri(info.Priority) - h := StoreWorkHandle{ - tenantID: info.TenantID, - workClass: wc, + if info.RequestedCount == 0 { + // We use a per-request estimate only when no requested count is + // provided. It's always provided for below-raft admission where we know + // the size of the work being admitted. For below-raft admission when + // work is admitted[1], we first deduct the requested number of tokens. + // This just corresponds to the known size of the write/ingest, but + // could be insufficient since we haven't applied the granter's linear + // models. This is accounted for in + // StoreWorkQueue.admittedReplicatedWork(), which is invoked right after + // admission. There is no risk of over-admission since this adjustment + // is being done in the same goroutine that did the granting. + // + // [1]: This happens asynchronously -- i.e. we may have already returned + // from StoreWorkQueue.Admit(). 
+ q.mu.RLock() + info.RequestedCount = q.mu.estimates.writeTokens + q.mu.RUnlock() } - q.mu.RLock() - estimates := q.mu.estimates - q.mu.RUnlock() - h.writeTokens = estimates.writeTokens - info.WorkInfo.requestedCount = h.writeTokens + + if info.ReplicatedWorkInfo.Enabled { + info.CreateTime = q.sequenceReplicatedWork(info.CreateTime, info.ReplicatedWorkInfo) + } + enabled, err := q.q[wc].Admit(ctx, info.WorkInfo) if err != nil { return StoreWorkHandle{}, err } - h.admissionEnabled = enabled + + h := StoreWorkHandle{ + tenantID: info.TenantID, + workClass: wc, + writeTokens: info.RequestedCount, + useAdmittedWorkDone: enabled, + } + if !info.ReplicatedWorkInfo.Enabled { + return h, nil + } + + h.useAdmittedWorkDone = false + var storeWorkDoneInfo StoreWorkDoneInfo + if info.ReplicatedWorkInfo.Ingested { + storeWorkDoneInfo.IngestedBytes = info.RequestedCount + } else { + storeWorkDoneInfo.WriteBytes = info.RequestedCount + } + + // Update store admission stats, because the write is happening ~this + // point. These statistics are used to maintain the underlying linear + // models (modeling relation between physical log writes and total L0 + // growth, which includes the state machine application). + q.updateStoreStatsAfterWorkDone(1, storeWorkDoneInfo, false) return h, nil } @@ -1713,13 +1884,72 @@ type StoreWorkDoneInfo struct { IngestedBytes int64 } -// AdmittedWorkDone indicates to the queue that the admitted work has -// completed. +type onAdmittedReplicatedWork interface { + admittedReplicatedWork( + tenantID roachpb.TenantID, + pri admissionpb.WorkPriority, + rwi ReplicatedWorkInfo, + requestedTokens int64, + createTime int64, + ) +} + +var _ onAdmittedReplicatedWork = &StoreWorkQueue{} + +// admittedReplicatedWork indicates to the queue that replicated write work was +// admitted. 
+func (q *StoreWorkQueue) admittedReplicatedWork( + tenantID roachpb.TenantID, + pri admissionpb.WorkPriority, + rwi ReplicatedWorkInfo, + originalTokens int64, + createTime int64, // only used in tests +) { + if !rwi.Enabled { + panic("unexpected call to admittedReplicatedWork for work that's not a replicated write") + } + if fn := q.knobs.AdmittedReplicatedWorkInterceptor; fn != nil { + fn(tenantID, pri, rwi, originalTokens, createTime) + } + + var storeWorkDoneInfo StoreWorkDoneInfo + if rwi.Ingested { + storeWorkDoneInfo.IngestedBytes = originalTokens + } else { + storeWorkDoneInfo.WriteBytes = originalTokens + } + + // We've already used RequestedCount for replicated writes to deduct tokens + // in the granter. RequestedCount corresponded to the size of the + // write/ingest, which we knew when enqueuing the write in the WorkQueue for + // (asynchronous) admission. That token deduction however did not use the + // underlying linear models, and we may have under-deducted -- we account + // for this below. + wc := admissionpb.WorkClassFromPri(pri) + additionalTokensNeeded := q.granters[wc].storeWriteDone(originalTokens, storeWorkDoneInfo) + q.q[wc].adjustTenantTokens(tenantID, additionalTokensNeeded) + + // TODO(irfansharif): Dispatch flow token returns here. We want to + // inform (a) the origin node of writes at (b) a given priority, to + // (c) the given range, at (d) the given log position on (e) the + // local store. Part of #95563. + // + _ = rwi.Origin // (a) + _ = pri // (b) + _ = rwi.RangeID // (c) + _ = rwi.LogPosition // (d) + _ = q.storeID // (e) +} + +// AdmittedWorkDone indicates to the queue that the admitted work has completed. +// It's used for the legacy above-raft admission control where we Admit() +// upfront, with just an estimate of the write size, and after the write is +// done, invoke AdmittedWorkDone with the now-known size. 
func (q *StoreWorkQueue) AdmittedWorkDone(h StoreWorkHandle, doneInfo StoreWorkDoneInfo) error { - if !h.admissionEnabled { - return nil + if !h.UseAdmittedWorkDone() { + return nil // nothing to do } - q.updateStoreAdmissionStats(1, doneInfo, false) + q.updateStoreStatsAfterWorkDone(1, doneInfo, false) additionalTokens := q.granters[h.workClass].storeWriteDone(h.writeTokens, doneInfo) q.q[h.workClass].adjustTenantTokens(h.tenantID, additionalTokens) return nil @@ -1729,7 +1959,7 @@ func (q *StoreWorkQueue) AdmittedWorkDone(h StoreWorkHandle, doneInfo StoreWorkD // can (a) adjust remaining tokens, (b) account for this in the per-work token // estimation model. func (q *StoreWorkQueue) BypassedWorkDone(workCount int64, doneInfo StoreWorkDoneInfo) { - q.updateStoreAdmissionStats(uint64(workCount), doneInfo, true) + q.updateStoreStatsAfterWorkDone(uint64(workCount), doneInfo, true) // Since we have no control over such work, we choose to count it as // regularWorkClass. _ = q.granters[admissionpb.RegularWorkClass].storeWriteDone(0, doneInfo) @@ -1744,11 +1974,11 @@ func (q *StoreWorkQueue) StatsToIgnore(ingestStats pebble.IngestOperationStats) q.mu.Unlock() } -func (q *StoreWorkQueue) updateStoreAdmissionStats( +func (q *StoreWorkQueue) updateStoreStatsAfterWorkDone( workCount uint64, doneInfo StoreWorkDoneInfo, bypassed bool, ) { q.mu.Lock() - q.mu.stats.admittedCount += workCount + q.mu.stats.workCount += workCount q.mu.stats.writeAccountedBytes += uint64(doneInfo.WriteBytes) q.mu.stats.ingestedAccountedBytes += uint64(doneInfo.IngestedBytes) if bypassed { @@ -1779,6 +2009,7 @@ func (q *StoreWorkQueue) close() { for i := range q.q { q.q[i].close() } + close(q.stopCh) } func (q *StoreWorkQueue) getStoreAdmissionStats() storeAdmissionStats { @@ -1795,21 +2026,77 @@ func (q *StoreWorkQueue) setStoreRequestEstimates(estimates storeRequestEstimate func makeStoreWorkQueue( ambientCtx log.AmbientContext, + storeID roachpb.StoreID, granters 
[admissionpb.NumWorkClasses]granterWithStoreWriteDone, settings *cluster.Settings, metrics *WorkQueueMetrics, opts workQueueOptions, + knobs *TestingKnobs, ) storeRequester { + if knobs == nil { + knobs = &TestingKnobs{} + } + if opts.timeSource == nil { + opts.timeSource = timeutil.DefaultTimeSource{} + } q := &StoreWorkQueue{ - granters: granters, + storeID: storeID, + granters: granters, + knobs: knobs, + stopCh: make(chan struct{}), + timeSource: opts.timeSource, + settings: settings, } + + opts.usesAsyncAdmit = true for i := range q.q { initWorkQueue(&q.q[i], ambientCtx, KVWork, granters[i], settings, metrics, opts) + q.q[i].onAdmittedReplicatedWork = q } // Arbitrary initial value. This will be replaced before any meaningful // token constraints are enforced. q.mu.estimates = storeRequestEstimates{ writeTokens: 1, } + + q.sequencersMu.s = make(map[roachpb.RangeID]*sequencer) + go func() { + ticker := time.NewTicker(30 * time.Second) + for { + select { + case <-ticker.C: + q.gcSequencers() + case <-q.stopCh: + return + } + } + }() return q } + +func (q *StoreWorkQueue) gcSequencers() { + q.sequencersMu.Lock() + defer q.sequencersMu.Unlock() + + for rangeID, seq := range q.sequencersMu.s { + maxCreateTime := timeutil.FromUnixNanos(seq.maxCreateTime) + if q.timeSource.Now().Sub(maxCreateTime) > rangeSequencerGCThreshold.Get(&q.settings.SV) { + delete(q.sequencersMu.s, rangeID) + } + } +} + +func (q *StoreWorkQueue) sequenceReplicatedWork(createTime int64, info ReplicatedWorkInfo) int64 { + q.sequencersMu.Lock() + seq, ok := q.sequencersMu.s[info.RangeID] + if !ok { + seq = &sequencer{} + q.sequencersMu.s[info.RangeID] = seq + } + q.sequencersMu.Unlock() + return seq.sequence( + createTime, + info.LogPosition.Term, + info.LogPosition.Index, + ) +} diff --git a/pkg/util/admission/work_queue_test.go b/pkg/util/admission/work_queue_test.go index 0296c442a7ad..f4902930be32 100644 --- a/pkg/util/admission/work_queue_test.go +++ b/pkg/util/admission/work_queue_test.go 
@@ -57,9 +57,12 @@ func (b *builderWithMu) stringAndReset() string { } type testGranter struct { - name string - buf *builderWithMu - r requester + gk grantKind + name string + buf *builderWithMu + r requester + + // Configurable knobs for tests. returnValueFromTryGet bool additionalTokens int64 } @@ -67,21 +70,26 @@ type testGranter struct { var _ granterWithStoreWriteDone = &testGranter{} func (tg *testGranter) grantKind() grantKind { - return slot + return tg.gk } + func (tg *testGranter) tryGet(count int64) bool { tg.buf.printf("tryGet%s: returning %t", tg.name, tg.returnValueFromTryGet) return tg.returnValueFromTryGet } + func (tg *testGranter) returnGrant(count int64) { tg.buf.printf("returnGrant%s %d", tg.name, count) } + func (tg *testGranter) tookWithoutPermission(count int64) { tg.buf.printf("tookWithoutPermission%s %d", tg.name, count) } + func (tg *testGranter) continueGrantChain(grantChainID grantChainID) { tg.buf.printf("continueGrantChain%s %d", tg.name, grantChainID) } + func (tg *testGranter) grant(grantChainID grantChainID) { rv := tg.r.granted(grantChainID) if rv > 0 { @@ -93,6 +101,7 @@ func (tg *testGranter) grant(grantChainID grantChainID) { } tg.buf.printf("granted%s: returned %d", tg.name, rv) } + func (tg *testGranter) storeWriteDone( originalTokens int64, doneInfo StoreWorkDoneInfo, ) (additionalTokens int64) { @@ -186,7 +195,7 @@ func TestWorkQueueBasic(t *testing.T) { switch d.Cmd { case "init": closeFn() - tg = &testGranter{buf: &buf} + tg = &testGranter{gk: slot, buf: &buf} opts := makeWorkQueueOptions(KVWork) timeSource = timeutil.NewManualTime(initialTime) opts.timeSource = timeSource @@ -329,7 +338,7 @@ func TestWorkQueueTokenResetRace(t *testing.T) { defer log.Scope(t).Close(t) var buf builderWithMu - tg := &testGranter{buf: &buf} + tg := &testGranter{gk: slot, buf: &buf} st := cluster.MakeTestingClusterSettings() registry := metric.NewRegistry() metrics := makeWorkQueueMetrics("", registry) @@ -506,16 +515,16 @@ func 
TestStoreWorkQueueBasic(t *testing.T) { switch d.Cmd { case "init": closeFn() - tg[admissionpb.RegularWorkClass] = &testGranter{name: " regular", buf: &buf} - tg[admissionpb.ElasticWorkClass] = &testGranter{name: " elastic", buf: &buf} + tg[admissionpb.RegularWorkClass] = &testGranter{gk: token, name: " regular", buf: &buf} + tg[admissionpb.ElasticWorkClass] = &testGranter{gk: token, name: " elastic", buf: &buf} opts := makeWorkQueueOptions(KVWork) opts.usesTokens = true opts.timeSource = timeutil.NewManualTime(timeutil.FromUnixMicros(0)) opts.disableEpochClosingGoroutine = true st = cluster.MakeTestingClusterSettings() - q = makeStoreWorkQueue(log.MakeTestingAmbientContext(tracing.NewTracer()), + q = makeStoreWorkQueue(log.MakeTestingAmbientContext(tracing.NewTracer()), roachpb.StoreID(1), [admissionpb.NumWorkClasses]granterWithStoreWriteDone{tg[admissionpb.RegularWorkClass], tg[admissionpb.ElasticWorkClass]}, - st, metrics, opts).(*StoreWorkQueue) + st, metrics, opts, nil /* testing knobs */).(*StoreWorkQueue) tg[admissionpb.RegularWorkClass].r = q.getRequesters()[admissionpb.RegularWorkClass] tg[admissionpb.ElasticWorkClass].r = q.getRequesters()[admissionpb.ElasticWorkClass] wrkMap.resetMap() diff --git a/pkg/util/timeutil/time.go b/pkg/util/timeutil/time.go index 811f63b72bff..42f37be43486 100644 --- a/pkg/util/timeutil/time.go +++ b/pkg/util/timeutil/time.go @@ -90,12 +90,20 @@ func Until(t time.Time) time.Duration { var UnixEpoch = time.Unix(0, 0).UTC() // FromUnixMicros returns the UTC time.Time corresponding to the given Unix -// time, usec microseconds since UnixEpoch. In Go's current time.Time +// time, usec microseconds since UnixEpoch. +// In Go's current time.Time // implementation, all possible values for us can be represented as a time.Time. func FromUnixMicros(us int64) time.Time { return time.Unix(us/1e6, (us%1e6)*1e3).UTC() } +// FromUnixNanos returns the UTC time.Time corresponding to the given Unix +// time, ns nanoseconds since UnixEpoch. 
In Go's current time.Time +// implementation, all possible values for ns can be represented as a time.Time. +func FromUnixNanos(ns int64) time.Time { + return time.Unix(ns/1e9, ns%1e9).UTC() +} + // ToUnixMicros returns t as the number of microseconds elapsed since UnixEpoch. // Fractional microseconds are rounded, half up, using time.Round. Similar to // time.Time.UnixNano, the result is undefined if the Unix time in microseconds