From 3d3910b15061844ff809bcb6fa8c0ca52f8fcf95 Mon Sep 17 00:00:00 2001 From: irfan sharif Date: Tue, 4 Apr 2023 11:03:07 -0400 Subject: [PATCH 1/3] rpc: remove debug logging This was added recently, in #94778, and contributes to log spam of the following sort: I230404 15:00:33.826337 2400 rpc/context.go:2249 [T1,n1,rnode=2,raddr=127.0.0.1:55941,class=default,rpc] 268 connection heartbeat loop ended with err: I230404 15:00:33.826338 3986 rpc/context.go:2249 [T1,n2,rnode=3,raddr=127.0.0.1:55955,class=system,rpc] 269 connection heartbeat loop ended with err: I230404 15:00:33.826367 3455 rpc/context.go:2249 [T1,n2,rnode=3,raddr=127.0.0.1:55955,class=default,rpc] 270 connection heartbeat loop ended with err: I230404 15:00:33.826394 3354 rpc/context.go:2249 [T1,n2,rnode=2,raddr=127.0.0.1:55941,class=default,rpc] 271 connection heartbeat loop ended with err: Release note: None --- pkg/rpc/context.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/pkg/rpc/context.go b/pkg/rpc/context.go index 097cb297c42e..72cea1abf667 100644 --- a/pkg/rpc/context.go +++ b/pkg/rpc/context.go @@ -2298,8 +2298,7 @@ func (rpcCtx *Context) grpcDialNodeInternal( // Run the heartbeat; this will block until the connection breaks for // whatever reason. We don't actually have to do anything with the error, // so we ignore it. - err := rpcCtx.runHeartbeat(ctx, conn, target) - log.Infof(ctx, "connection heartbeat loop ended with err: %v", err) + _ = rpcCtx.runHeartbeat(ctx, conn, target) maybeFatal(ctx, rpcCtx.m.Remove(k, conn)) // Context gets canceled on server shutdown, and if that's likely why From aa4ab6a0c5cb7ae7e90b5f5035626172b3885b20 Mon Sep 17 00:00:00 2001 From: irfan sharif Date: Thu, 6 Apr 2023 16:55:46 -0400 Subject: [PATCH 2/3] admission: move CreateTime-sequencing below-raft We move kvflowsequencer.Sequencer and its use in kvflowhandle.Handle (above-raft) to admission.sequencer, now used by admission.StoreWorkQueue (below-raft). This variant appeared in an earlier revision of #97599 where we first introduced monotonically increasing CreateTimes for a given raft group. In a subsequent commit, when integrating kvflowcontrol into the critical path for replication traffic, we'll observe that it's quite difficult to create sequencing CreateTimes[^1] above raft. This is because these sequence numbers are encoded as part of the raft proposal[^2], and at encode-time, we don't actually know what log position the proposal is going to end up in. It's hard to explicitly guarantee that a proposal with log-position P1 will get encoded before another with log position P2, where P1 < P2. Naively sequencing CreateTimes at proposal-encode-time could result in over-admission. This is because of how we return flow tokens -- up to some log index[^3], and how use these sequence numbers in below-raft WorkQueues. If P2 ends up with a lower sequence number/CreateTime, it would get admitted first, and when returning flow tokens by log position, in specifying up-to-P2, we'll early return P1's flow tokens despite it not being admitted. So we'd over-admit at the sender. This is all within a pair. [^1]: We use CreateTimes as "sequence numbers" in replication admission control. We want to assign each AC-queued work below-raft a "sequence number" for FIFO ordering within a . We ensure these timestamps are roughly monotonic with respect to log positions of replicated work by sequencing work in log position order. [^2]: In kvflowcontrolpb.RaftAdmissionMeta. [^3]: See kvflowcontrolpb.AdmittedRaftLogEntries. 
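To make the over-admission discussion concrete, here is a condensed, self-contained sketch of the ratcheting scheme this patch moves into pkg/util/admission. The sequencer mirrors the code in the diff below; the standalone wrapper and the example values are only for illustration.

package main

import "fmt"

// sequencer issues monotonic sequencing timestamps derived from observed
// CreateTimes, mirroring the ratcheting structure in the diff below.
type sequencer struct {
	maxCreateTime int64 // highest CreateTime observed or issued so far
}

func (s *sequencer) sequence(createTime int64) int64 {
	if createTime <= s.maxCreateTime {
		// A lower (or equal) CreateTime still gets a strictly higher
		// sequence number, preserving FIFO ordering for work sequenced later.
		createTime = s.maxCreateTime + 1
	}
	s.maxCreateTime = createTime
	return createTime
}

func main() {
	var s sequencer
	// Work sequenced in log-position order: even though the second proposal
	// carries an older CreateTime (5 < 10), it is assigned a later sequence
	// number, so below-raft admission cannot admit it ahead of the first and
	// prematurely release the first proposal's flow tokens.
	fmt.Println(s.sequence(10)) // 10
	fmt.Println(s.sequence(5))  // 11
	fmt.Println(s.sequence(20)) // 20
}

Per-range monotonicity is what matters here: as the work_queue.go diff below shows, the StoreWorkQueue keeps one such sequencer per RangeID and feeds it CreateTimes in log-position order, so a proposal at a higher log position is never assigned an earlier sequencing CreateTime.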
Release note: None --- pkg/BUILD.bazel | 4 -- pkg/kv/kvserver/kvflowcontrol/doc.go | 2 +- .../kvserver/kvflowcontrol/kvflowcontrol.go | 8 +-- .../kvflowcontroller/kvflowcontroller.go | 5 +- .../kvflowcontrol/kvflowhandle/BUILD.bazel | 2 - .../kvflowhandle/kvflowhandle.go | 23 +++---- .../kvflowhandle/kvflowhandle_test.go | 67 +------------------ .../kvflowcontrol/kvflowsequencer/BUILD.bazel | 29 -------- .../kvflowsimulator/simulation_test.go | 2 +- pkg/util/admission/BUILD.bazel | 2 + .../admission}/sequencer.go | 46 +++++++------ .../admission}/sequencer_test.go | 33 ++------- ...h_create_time_low_position_different_range | 2 +- .../high_create_time_low_position_same_range | 52 ++++++++++++++ .../tenant_fairness | 3 +- .../admission}/testdata/sequencer | 9 +-- pkg/util/admission/work_queue.go | 59 +++++++++++++++- 17 files changed, 166 insertions(+), 182 deletions(-) delete mode 100644 pkg/kv/kvserver/kvflowcontrol/kvflowsequencer/BUILD.bazel rename pkg/{kv/kvserver/kvflowcontrol/kvflowsequencer => util/admission}/sequencer.go (52%) rename pkg/{kv/kvserver/kvflowcontrol/kvflowsequencer => util/admission}/sequencer_test.go (64%) create mode 100644 pkg/util/admission/testdata/replicated_write_admission/high_create_time_low_position_same_range rename pkg/{kv/kvserver/kvflowcontrol/kvflowsequencer => util/admission}/testdata/sequencer (91%) diff --git a/pkg/BUILD.bazel b/pkg/BUILD.bazel index a5ce75721742..27a0f33defc4 100644 --- a/pkg/BUILD.bazel +++ b/pkg/BUILD.bazel @@ -227,7 +227,6 @@ ALL_TESTS = [ "//pkg/kv/kvserver/kvflowcontrol/kvflowcontroller:kvflowcontroller_test", "//pkg/kv/kvserver/kvflowcontrol/kvflowdispatch:kvflowdispatch_test", "//pkg/kv/kvserver/kvflowcontrol/kvflowhandle:kvflowhandle_test", - "//pkg/kv/kvserver/kvflowcontrol/kvflowsequencer:kvflowsequencer_test", "//pkg/kv/kvserver/kvflowcontrol/kvflowsimulator:kvflowsimulator_test", "//pkg/kv/kvserver/kvflowcontrol/kvflowtokentracker:kvflowtokentracker_test", "//pkg/kv/kvserver/kvstorage:kvstorage_test", @@ -1303,8 +1302,6 @@ GO_TARGETS = [ "//pkg/kv/kvserver/kvflowcontrol/kvflowdispatch:kvflowdispatch_test", "//pkg/kv/kvserver/kvflowcontrol/kvflowhandle:kvflowhandle", "//pkg/kv/kvserver/kvflowcontrol/kvflowhandle:kvflowhandle_test", - "//pkg/kv/kvserver/kvflowcontrol/kvflowsequencer:kvflowsequencer", - "//pkg/kv/kvserver/kvflowcontrol/kvflowsequencer:kvflowsequencer_test", "//pkg/kv/kvserver/kvflowcontrol/kvflowsimulator:kvflowsimulator_test", "//pkg/kv/kvserver/kvflowcontrol/kvflowtokentracker:kvflowtokentracker", "//pkg/kv/kvserver/kvflowcontrol/kvflowtokentracker:kvflowtokentracker_test", @@ -2770,7 +2767,6 @@ GET_X_DATA_TARGETS = [ "//pkg/kv/kvserver/kvflowcontrol/kvflowcontrolpb:get_x_data", "//pkg/kv/kvserver/kvflowcontrol/kvflowdispatch:get_x_data", "//pkg/kv/kvserver/kvflowcontrol/kvflowhandle:get_x_data", - "//pkg/kv/kvserver/kvflowcontrol/kvflowsequencer:get_x_data", "//pkg/kv/kvserver/kvflowcontrol/kvflowsimulator:get_x_data", "//pkg/kv/kvserver/kvflowcontrol/kvflowtokentracker:get_x_data", "//pkg/kv/kvserver/kvserverbase:get_x_data", diff --git a/pkg/kv/kvserver/kvflowcontrol/doc.go b/pkg/kv/kvserver/kvflowcontrol/doc.go index ec784c1e1cd7..49910b2790d5 100644 --- a/pkg/kv/kvserver/kvflowcontrol/doc.go +++ b/pkg/kv/kvserver/kvflowcontrol/doc.go @@ -483,7 +483,7 @@ package kvflowcontrol // still queued after ~100ms, will trigger epoch-LIFO everywhere. // [^11]: See the implementation for kvflowcontrol.Dispatch. // [^12]: See UpToRaftLogPosition in AdmittedRaftLogEntries. 
-// [^13]: See kvflowsequencer.Sequencer and its use in kvflowhandle.Handle. +// [^13]: See admission.sequencer and its use in admission.StoreWorkQueue. // [^14]: See the high_create_time_low_position_different_range test case for // TestReplicatedWriteAdmission. // diff --git a/pkg/kv/kvserver/kvflowcontrol/kvflowcontrol.go b/pkg/kv/kvserver/kvflowcontrol/kvflowcontrol.go index e55cb2acc4c7..de6641205a58 100644 --- a/pkg/kv/kvserver/kvflowcontrol/kvflowcontrol.go +++ b/pkg/kv/kvserver/kvflowcontrol/kvflowcontrol.go @@ -98,13 +98,11 @@ type Handle interface { // work with given priority along connected streams. The deduction is // tracked with respect to the specific raft log position it's expecting it // to end up in, log positions that monotonically increase. Requests are - // assumed to have been Admit()-ed first. The returned time.Time parameter - // is to be used as the work item's CreateTime when enqueueing in IO - // admission queues. + // assumed to have been Admit()-ed first. DeductTokensFor( - context.Context, admissionpb.WorkPriority, time.Time, + context.Context, admissionpb.WorkPriority, kvflowcontrolpb.RaftLogPosition, Tokens, - ) time.Time + ) // ReturnTokensUpto returns all previously deducted tokens of a given // priority for all log positions less than or equal to the one specified. // It does for the specific stream. Once returned, subsequent attempts to diff --git a/pkg/kv/kvserver/kvflowcontrol/kvflowcontroller/kvflowcontroller.go b/pkg/kv/kvserver/kvflowcontrol/kvflowcontroller/kvflowcontroller.go index 46c8e6b0cfb5..a18472414715 100644 --- a/pkg/kv/kvserver/kvflowcontrol/kvflowcontroller/kvflowcontroller.go +++ b/pkg/kv/kvserver/kvflowcontrol/kvflowcontroller/kvflowcontroller.go @@ -183,8 +183,9 @@ func (c *Controller) Admit( } } - // TODO(irfansharif): Use the create time for ordering among waiting - // requests. Integrate it with epoch-LIFO. + // TODO(irfansharif): Use CreateTime for ordering among waiting + // requests, integrate it with epoch-LIFO. See I12 from + // kvflowcontrol/doc.go. } // DeductTokens is part of the kvflowcontrol.Controller interface. 
diff --git a/pkg/kv/kvserver/kvflowcontrol/kvflowhandle/BUILD.bazel b/pkg/kv/kvserver/kvflowcontrol/kvflowhandle/BUILD.bazel index ff07b3cc5958..0b4379c2a72b 100644 --- a/pkg/kv/kvserver/kvflowcontrol/kvflowhandle/BUILD.bazel +++ b/pkg/kv/kvserver/kvflowcontrol/kvflowhandle/BUILD.bazel @@ -14,7 +14,6 @@ go_library( "//pkg/base", "//pkg/kv/kvserver/kvflowcontrol", "//pkg/kv/kvserver/kvflowcontrol/kvflowcontrolpb", - "//pkg/kv/kvserver/kvflowcontrol/kvflowsequencer", "//pkg/kv/kvserver/kvflowcontrol/kvflowtokentracker", "//pkg/util/admission/admissionpb", "//pkg/util/hlc", @@ -40,7 +39,6 @@ go_test( "//pkg/util/leaktest", "//pkg/util/log", "//pkg/util/metric", - "//pkg/util/timeutil", "@com_github_stretchr_testify//require", ], ) diff --git a/pkg/kv/kvserver/kvflowcontrol/kvflowhandle/kvflowhandle.go b/pkg/kv/kvserver/kvflowcontrol/kvflowhandle/kvflowhandle.go index 1c43ffeb2ab7..0ff5ebf18a10 100644 --- a/pkg/kv/kvserver/kvflowcontrol/kvflowhandle/kvflowhandle.go +++ b/pkg/kv/kvserver/kvflowcontrol/kvflowhandle/kvflowhandle.go @@ -17,7 +17,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol/kvflowcontrolpb" - "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol/kvflowsequencer" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol/kvflowtokentracker" "github.com/cockroachdb/cockroach/pkg/util/admission/admissionpb" "github.com/cockroachdb/cockroach/pkg/util/hlc" @@ -41,7 +40,6 @@ type Handle struct { // (identified by their log positions) have been admitted below-raft, // streams disconnect, or the handle closed entirely. perStreamTokenTracker map[kvflowcontrol.Stream]*kvflowtokentracker.Tracker - sequencer *kvflowsequencer.Sequencer closed bool } } @@ -54,7 +52,6 @@ func New(controller kvflowcontrol.Controller, metrics *Metrics, clock *hlc.Clock clock: clock, } h.mu.perStreamTokenTracker = map[kvflowcontrol.Stream]*kvflowtokentracker.Tracker{} - h.mu.sequencer = kvflowsequencer.New() return h } @@ -104,31 +101,28 @@ func (h *Handle) Admit(ctx context.Context, pri admissionpb.WorkPriority, ct tim func (h *Handle) DeductTokensFor( ctx context.Context, pri admissionpb.WorkPriority, - ct time.Time, pos kvflowcontrolpb.RaftLogPosition, tokens kvflowcontrol.Tokens, -) time.Time { +) { if h == nil { // TODO(irfansharif): See TODO around nil receiver check in Admit(). - return ct + return } - ct, _ = h.deductTokensForInner(ctx, pri, ct, pos, tokens) - return ct + _ = h.deductTokensForInner(ctx, pri, pos, tokens) } func (h *Handle) deductTokensForInner( ctx context.Context, pri admissionpb.WorkPriority, - ct time.Time, pos kvflowcontrolpb.RaftLogPosition, tokens kvflowcontrol.Tokens, -) (sequence time.Time, streams []kvflowcontrol.Stream) { +) (streams []kvflowcontrol.Stream) { h.mu.Lock() defer h.mu.Unlock() if h.mu.closed { log.Errorf(ctx, "operating on a closed handle") - return ct, nil // unused return value in production code + return nil // unused return value in production code } for _, c := range h.mu.connections { @@ -136,7 +130,7 @@ func (h *Handle) deductTokensForInner( h.mu.perStreamTokenTracker[c.Stream()].Track(ctx, pri, tokens, pos) streams = append(streams, c.Stream()) } - return h.mu.sequencer.Sequence(ct), streams + return streams } // ReturnTokensUpto is part of the kvflowcontrol.Handle interface. 
@@ -322,9 +316,8 @@ func (h *Handle) TestingNonBlockingAdmit( func (h *Handle) TestingDeductTokensForInner( ctx context.Context, pri admissionpb.WorkPriority, - ct time.Time, pos kvflowcontrolpb.RaftLogPosition, tokens kvflowcontrol.Tokens, -) (time.Time, []kvflowcontrol.Stream) { - return h.deductTokensForInner(ctx, pri, ct, pos, tokens) +) []kvflowcontrol.Stream { + return h.deductTokensForInner(ctx, pri, pos, tokens) } diff --git a/pkg/kv/kvserver/kvflowcontrol/kvflowhandle/kvflowhandle_test.go b/pkg/kv/kvserver/kvflowcontrol/kvflowhandle/kvflowhandle_test.go index 72f5d7e05724..a4f1182ac19f 100644 --- a/pkg/kv/kvserver/kvflowcontrol/kvflowhandle/kvflowhandle_test.go +++ b/pkg/kv/kvserver/kvflowcontrol/kvflowhandle/kvflowhandle_test.go @@ -26,7 +26,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/metric" - "github.com/cockroachdb/cockroach/pkg/util/timeutil" "github.com/stretchr/testify/require" ) @@ -80,7 +79,7 @@ func TestHandleAdmit(t *testing.T) { // Connect a single stream at pos=0 and deplete all 16MiB of regular // tokens at pos=1. handle.ConnectStream(ctx, pos(0), stream) - handle.DeductTokensFor(ctx, admissionpb.NormalPri, time.Time{}, pos(1), kvflowcontrol.Tokens(16<<20 /* 16MiB */)) + handle.DeductTokensFor(ctx, admissionpb.NormalPri, pos(1), kvflowcontrol.Tokens(16<<20 /* 16MiB */)) // Invoke .Admit() in a separate goroutine, and test below whether // the goroutine is blocked. @@ -106,67 +105,3 @@ func TestHandleAdmit(t *testing.T) { }) } } - -// TestHandleSequencing tests the sequencing behavior of -// Handle.DeductTokensFor(), namely that we: -// - advance sequencing timestamps when the create-time advances; -// - advance sequencing timestamps when the log position advances. -func TestHandleSequencing(t *testing.T) { - defer leaktest.AfterTest(t)() - defer log.Scope(t).Close(t) - - // tzero represents the t=0, the earliest possible time. All other - // create-time= is relative to this time. - var tzero = timeutil.Unix(0, 0) - - ctx := context.Background() - stream := kvflowcontrol.Stream{ - TenantID: roachpb.MustMakeTenantID(42), - StoreID: roachpb.StoreID(42), - } - pos := func(t, i uint64) kvflowcontrolpb.RaftLogPosition { - return kvflowcontrolpb.RaftLogPosition{Term: t, Index: i} - } - ct := func(d int64) time.Time { - return tzero.Add(time.Nanosecond * time.Duration(d)) - } - - const tokens = kvflowcontrol.Tokens(1 << 20 /* MiB */) - const normal = admissionpb.NormalPri - - registry := metric.NewRegistry() - clock := hlc.NewClockForTesting(nil) - controller := kvflowcontroller.New(registry, cluster.MakeTestingClusterSettings(), clock) - handle := kvflowhandle.New(controller, kvflowhandle.NewMetrics(registry), clock) - - // Test setup: handle is connected to a single stream at pos=1/0 and has - // deducted 1MiB of regular tokens at pos=1 ct=1. - handle.ConnectStream(ctx, pos(1, 0), stream) - seq0 := handle.DeductTokensFor(ctx, normal, ct(1), pos(1, 1), tokens) - - // If create-time advances, so does the sequencing timestamp. - seq1 := handle.DeductTokensFor(ctx, normal, ct(2), pos(1, 1), tokens) - require.Greater(t, seq1, seq0) - - // If stays static, the sequencing timestamp - // still advances. - seq2 := handle.DeductTokensFor(ctx, normal, ct(2), pos(1, 1), tokens) - require.Greater(t, seq2, seq1) - - // If the log index advances, so does the sequencing timestamp. 
- seq3 := handle.DeductTokensFor(ctx, normal, ct(3), pos(1, 2), tokens) - require.Greater(t, seq3, seq2) - - // If the log term advances, so does the sequencing timestamp. - seq4 := handle.DeductTokensFor(ctx, normal, ct(3), pos(2, 2), tokens) - require.Greater(t, seq4, seq3) - - // If both the create-time and log-position advance, so does the sequencing - // timestamp. - seq5 := handle.DeductTokensFor(ctx, normal, ct(1000), pos(4, 20), tokens) - require.Greater(t, seq5, seq4) - - // Verify that the sequencing timestamp is kept close to the maximum - // observed create-time. - require.LessOrEqual(t, seq5.Sub(ct(1000)), time.Nanosecond) -} diff --git a/pkg/kv/kvserver/kvflowcontrol/kvflowsequencer/BUILD.bazel b/pkg/kv/kvserver/kvflowcontrol/kvflowsequencer/BUILD.bazel deleted file mode 100644 index 4b7b66091b3e..000000000000 --- a/pkg/kv/kvserver/kvflowcontrol/kvflowsequencer/BUILD.bazel +++ /dev/null @@ -1,29 +0,0 @@ -load("//build/bazelutil/unused_checker:unused.bzl", "get_x_data") -load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") - -go_library( - name = "kvflowsequencer", - srcs = ["sequencer.go"], - importpath = "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol/kvflowsequencer", - visibility = ["//visibility:public"], - deps = ["//pkg/util/timeutil"], -) - -go_test( - name = "kvflowsequencer_test", - srcs = ["sequencer_test.go"], - args = ["-test.timeout=295s"], - data = glob(["testdata/**"]), - embed = [":kvflowsequencer"], - deps = [ - "//pkg/kv/kvserver/kvflowcontrol/kvflowcontrolpb", - "//pkg/testutils/datapathutils", - "//pkg/util/leaktest", - "//pkg/util/log", - "//pkg/util/timeutil", - "@com_github_cockroachdb_datadriven//:datadriven", - "@com_github_stretchr_testify//require", - ], -) - -get_x_data(name = "get_x_data") diff --git a/pkg/kv/kvserver/kvflowcontrol/kvflowsimulator/simulation_test.go b/pkg/kv/kvserver/kvflowcontrol/kvflowsimulator/simulation_test.go index 6cdc943cfc32..d898c39ce43a 100644 --- a/pkg/kv/kvserver/kvflowcontrol/kvflowsimulator/simulation_test.go +++ b/pkg/kv/kvserver/kvflowcontrol/kvflowsimulator/simulation_test.go @@ -630,7 +630,7 @@ func (h *replicaHandle) deductTokens( // Increment the quorum log position -- all token deductions are bound to // incrementing log positions. 
h.quorumLogPosition.Index += 1 - _, streams := h.handle.TestingDeductTokensForInner(ctx, pri, time.Time{}, h.quorumLogPosition, tokens) + streams := h.handle.TestingDeductTokensForInner(ctx, pri, h.quorumLogPosition, tokens) for _, stream := range streams { h.deductionTracker[stream].Track(ctx, pri, tokens, h.quorumLogPosition) } diff --git a/pkg/util/admission/BUILD.bazel b/pkg/util/admission/BUILD.bazel index 3ed968ba72fa..e2dbecf8603b 100644 --- a/pkg/util/admission/BUILD.bazel +++ b/pkg/util/admission/BUILD.bazel @@ -15,6 +15,7 @@ go_library( "kv_slot_adjuster.go", "pacer.go", "scheduler_latency_listener.go", + "sequencer.go", "sql_cpu_overload_indicator.go", "store_token_estimation.go", "testing_knobs.go", @@ -56,6 +57,7 @@ go_test( "io_load_listener_test.go", "replicated_write_admission_test.go", "scheduler_latency_listener_test.go", + "sequencer_test.go", "store_token_estimation_test.go", "tokens_linear_model_test.go", "work_queue_test.go", diff --git a/pkg/kv/kvserver/kvflowcontrol/kvflowsequencer/sequencer.go b/pkg/util/admission/sequencer.go similarity index 52% rename from pkg/kv/kvserver/kvflowcontrol/kvflowsequencer/sequencer.go rename to pkg/util/admission/sequencer.go index 9cc0271f4257..05b389cc9310 100644 --- a/pkg/kv/kvserver/kvflowcontrol/kvflowsequencer/sequencer.go +++ b/pkg/util/admission/sequencer.go @@ -8,15 +8,9 @@ // by the Apache License, Version 2.0, included in the file // licenses/APL.txt. -package kvflowsequencer +package admission -import ( - "time" - - "github.com/cockroachdb/cockroach/pkg/util/timeutil" -) - -// Sequencer issues monotonic sequencing timestamps derived from observed +// sequencer issues monotonic sequencing timestamps derived from observed // CreateTimes. This is a purpose-built data structure for replication admission // control where we want to assign each AC-queued work below-raft a "sequence // number" for FIFO ordering within a . We ensure timestamps @@ -27,9 +21,29 @@ import ( // // It's not safe for concurrent access. // +// ---- +// +// Aside: Why not do this CreateTime-generation above raft? This is because these +// sequence numbers are encoded as part of the raft proposal[3], and at +// encode-time, we don't actually know what log position the proposal is going +// to end up in. It's hard to explicitly guarantee that a proposal with +// log-position P1 will get encoded before another with log position P2, where +// P1 < P2. +// +// If we tried to "approximate" CreateTimes at proposal-encode-time, +// approximating log position order, it could result in over-admission. This is +// because of how we return flow tokens -- up to some log index[4], and how use +// these sequence numbers in below-raft WorkQueues. If P2 ends up with a lower +// sequence number/CreateTime, it would get admitted first, and when returning +// flow tokens by log position, in specifying up-to-P2, we'll early return P1's +// flow tokens despite it not being admitted. So we'd over-admit at the sender. +// This is all within a pair. +// // [1]: See I12 from kvflowcontrol/doc.go. -// [2]: See kvflowhandle.Handle. -type Sequencer struct { +// [2]: See kvadmission.AdmitRaftEntry. +// [3]: In kvflowcontrolpb.RaftAdmissionMeta. +// [4]: See kvflowcontrolpb.AdmittedRaftLogEntries. +type sequencer struct { // maxCreateTime ratchets to the highest observed CreateTime. If sequencing // work with lower CreateTimes, we continue generating monotonic sequence // numbers by incrementing it for every such sequencing attempt. 
Provided @@ -38,18 +52,12 @@ type Sequencer struct { maxCreateTime int64 } -// New returns a new Sequencer. -func New() *Sequencer { - return &Sequencer{} -} - -// Sequence returns a monotonically increasing timestamps derived from the +// sequence returns a monotonically increasing timestamps derived from the // provided CreateTime. -func (s *Sequencer) Sequence(ct time.Time) time.Time { - createTime := ct.UnixNano() +func (s *sequencer) sequence(createTime int64) int64 { if createTime <= s.maxCreateTime { createTime = s.maxCreateTime + 1 } s.maxCreateTime = createTime - return timeutil.FromUnixNanos(createTime) + return createTime } diff --git a/pkg/kv/kvserver/kvflowcontrol/kvflowsequencer/sequencer_test.go b/pkg/util/admission/sequencer_test.go similarity index 64% rename from pkg/kv/kvserver/kvflowcontrol/kvflowsequencer/sequencer_test.go rename to pkg/util/admission/sequencer_test.go index cc97d1a86047..968c47ae2629 100644 --- a/pkg/kv/kvserver/kvflowcontrol/kvflowsequencer/sequencer_test.go +++ b/pkg/util/admission/sequencer_test.go @@ -8,16 +8,13 @@ // by the Apache License, Version 2.0, included in the file // licenses/APL.txt. -package kvflowsequencer +package admission import ( "fmt" - "strconv" - "strings" "testing" "time" - "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol/kvflowcontrolpb" "github.com/cockroachdb/cockroach/pkg/testutils/datapathutils" "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/log" @@ -30,13 +27,13 @@ func TestSequencer(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) - var sequencer *Sequencer + var seq *sequencer var lastSeqNum int64 datadriven.RunTest(t, datapathutils.TestDataPath(t, "sequencer"), func(t *testing.T, d *datadriven.TestData) string { switch d.Cmd { case "init": - sequencer = New() + seq = &sequencer{} return "" case "sequence": @@ -49,8 +46,8 @@ func TestSequencer(t *testing.T) { // Parse log-position=/. logPosition := parseLogPosition(t, d) - _ = logPosition - sequenceNum := sequencer.Sequence(tzero.Add(dur)).UnixNano() + _ = logPosition // unused + sequenceNum := seq.sequence(tzero.Add(dur).UnixNano()) if lastSeqNum < sequenceNum { movement = " (advanced)" } @@ -67,23 +64,3 @@ func TestSequencer(t *testing.T) { }, ) } - -// tzero represents the t=0, the earliest possible time. All other -// create-time= is relative to this time. -var tzero = timeutil.Unix(0, 0) - -func parseLogPosition(t *testing.T, d *datadriven.TestData) kvflowcontrolpb.RaftLogPosition { - // Parse log-position=/. 
- var arg string - d.ScanArgs(t, "log-position", &arg) - inner := strings.Split(arg, "/") - require.Len(t, inner, 2) - term, err := strconv.Atoi(inner[0]) - require.NoError(t, err) - index, err := strconv.Atoi(inner[1]) - require.NoError(t, err) - return kvflowcontrolpb.RaftLogPosition{ - Term: uint64(term), - Index: uint64(index), - } -} diff --git a/pkg/util/admission/testdata/replicated_write_admission/high_create_time_low_position_different_range b/pkg/util/admission/testdata/replicated_write_admission/high_create_time_low_position_different_range index d36a0455822e..d7a28c2ff8bc 100644 --- a/pkg/util/admission/testdata/replicated_write_admission/high_create_time_low_position_different_range +++ b/pkg/util/admission/testdata/replicated_write_admission/high_create_time_low_position_different_range @@ -19,7 +19,7 @@ admit tenant=t1 pri=normal-pri create-time=1us size=1B range=r2 origin=n1 log-po # request with the lower create time sorts first despite having the higher log # position. Admission work queues order work based entirely on create-times, # and the assignment of monotonic create-times (WRT log positions) happens only -# within a range and by higher-level components -- kvflowcontrol.Handle. +# within a range by the StoreWorkQueue. print ---- physical-stats: work-count=2 written-bytes=2B ingested-bytes=0B diff --git a/pkg/util/admission/testdata/replicated_write_admission/high_create_time_low_position_same_range b/pkg/util/admission/testdata/replicated_write_admission/high_create_time_low_position_same_range new file mode 100644 index 000000000000..3de81d4eb0b4 --- /dev/null +++ b/pkg/util/admission/testdata/replicated_write_admission/high_create_time_low_position_same_range @@ -0,0 +1,52 @@ +# Verify that we ignore create-time based ordering for replicated write +# admission when writes happen within the same range. + +init +---- +[regular] 0B tokens available +[elastic] 0B tokens available + +# Admit two requests, one created at t=5us but with a lower log position. +admit tenant=t1 pri=normal-pri create-time=5us size=1B range=r1 origin=n1 log-position=4/20 +---- +[regular] try-get=1B available=0B => insufficient tokens + +# And one created at t=1us but but higher log position. +admit tenant=t1 pri=normal-pri create-time=1us size=1B range=r1 origin=n1 log-position=4/21 +---- + +# Observe both waiting requests and physical admission stats. Note how the +# request with the lower log position sorts first despite having the higher +# create-time. The StoreWorkQueue sequences them by (ab)using the create-time +# parameter to get this log position ordering. +print +---- +physical-stats: work-count=2 written-bytes=2B ingested-bytes=0B +[regular work queue]: len(tenant-heap)=1 top-tenant=t1 + tenant=t1 weight=1 fifo-threshold=low-pri used=0B + [0: pri=normal-pri create-time=5µs size=1B range=r1 origin=n1 log-position=4/20] + [1: pri=normal-pri create-time=5.001µs size=1B range=r1 origin=n1 log-position=4/21] +[elastic work queue]: len(tenant-heap)=0 + +# Produce 1B worth of regular tokens. +granter class=regular adjust-tokens=+1B +---- +[regular] 1B tokens available +[elastic] 0B tokens available + +# Grant admission to requests. Since we have 1B worth of tokens, and 2 waiting +# requests wanting 1B each, we're only able to admit one. Verify that it's the +# request with the lower log position despite the higher original create-time. 
+grant class=regular +---- +admitted [tenant=t1 pri=normal-pri create-time=5µs size=1B range=r1 origin=n1 log-position=4/20] + +print +---- +physical-stats: work-count=2 written-bytes=2B ingested-bytes=0B +[regular work queue]: len(tenant-heap)=1 top-tenant=t1 + tenant=t1 weight=1 fifo-threshold=low-pri used=1B + [0: pri=normal-pri create-time=5.001µs size=1B range=r1 origin=n1 log-position=4/21] +[elastic work queue]: len(tenant-heap)=0 + +# vim:ft=sh diff --git a/pkg/util/admission/testdata/replicated_write_admission/tenant_fairness b/pkg/util/admission/testdata/replicated_write_admission/tenant_fairness index c00240b461f7..3f0d04dbd18a 100644 --- a/pkg/util/admission/testdata/replicated_write_admission/tenant_fairness +++ b/pkg/util/admission/testdata/replicated_write_admission/tenant_fairness @@ -7,8 +7,7 @@ init # For two tenants t1 and t2, try to admit two requests of 1B each at # incrementing log positions. We specify create-times in log-position order for -# work within a given range, similar to what we do at the issuing client -# (kvflowcontrol.Handle). +# work within a given range, similar to what we do at the StoreWorkQueue level. admit tenant=t1 pri=normal-pri create-time=1.001us size=1B range=r1 origin=n1 log-position=4/20 ---- [regular] try-get=1B available=0B => insufficient tokens diff --git a/pkg/kv/kvserver/kvflowcontrol/kvflowsequencer/testdata/sequencer b/pkg/util/admission/testdata/sequencer similarity index 91% rename from pkg/kv/kvserver/kvflowcontrol/kvflowsequencer/testdata/sequencer rename to pkg/util/admission/testdata/sequencer index ea335538f940..97716928fec2 100644 --- a/pkg/kv/kvserver/kvflowcontrol/kvflowsequencer/testdata/sequencer +++ b/pkg/util/admission/testdata/sequencer @@ -1,7 +1,8 @@ -# Walk through the basics of how the per-handle sequencer works. The -# log-position= parameter is not actually used in the implementation, but in -# typical usage we'd be sequencing work in log position order, and it's -# instructive to understand how sequencing timestamps are generated. +# Walk through the basics of how the below-raft replicated work sequencer +# works. The log-position= parameter is not actually used in the +# implementation, but in typical usage we'd be sequencing work in log position +# order, and it's instructive to understand how sequencing timestamps are +# generated. # # ----------------------------------------------------------------------------- # 1. Observe how the sequence numbers change with changing log positions (and diff --git a/pkg/util/admission/work_queue.go b/pkg/util/admission/work_queue.go index 8a97cfa900ca..1284f8bb7b1e 100644 --- a/pkg/util/admission/work_queue.go +++ b/pkg/util/admission/work_queue.go @@ -156,6 +156,14 @@ var epochLIFOQueueDelayThresholdToSwitchToLIFO = settings.RegisterDurationSettin return nil }).WithPublic() +var rangeSequencerGCThreshold = settings.RegisterDurationSetting( + settings.TenantWritable, + "admission.replication_control.range_sequencer_gc_threshold", + "the inactive duration for a range sequencer after it's garbage collected", + 5*time.Minute, + settings.NonNegativeDuration, +) + // WorkInfo provides information that is used to order work within an WorkQueue. // The WorkKind is not included as a field since an WorkQueue deals with a // single WorkKind. @@ -1773,9 +1781,13 @@ type StoreWorkQueue struct { // and observed L0 growth (which factors in state machine application). 
stats storeAdmissionStats } - stopCh chan struct{} - timeSource timeutil.TimeSource - settings *cluster.Settings + sequencersMu struct { + syncutil.Mutex + s map[roachpb.RangeID]*sequencer // cleaned up periodically + } + stopCh chan struct{} + timeSource timeutil.TimeSource + settings *cluster.Settings knobs *TestingKnobs } @@ -1824,6 +1836,9 @@ func (q *StoreWorkQueue) Admit( info.RequestedCount = q.mu.estimates.writeTokens q.mu.RUnlock() } + if info.ReplicatedWorkInfo.Enabled { + info.CreateTime = q.sequenceReplicatedWork(info.CreateTime, info.ReplicatedWorkInfo) + } enabled, err := q.q[wc].Admit(ctx, info.WorkInfo) if err != nil { @@ -2042,5 +2057,43 @@ func makeStoreWorkQueue( q.mu.estimates = storeRequestEstimates{ writeTokens: 1, } + + q.sequencersMu.s = make(map[roachpb.RangeID]*sequencer) + go func() { + ticker := time.NewTicker(30 * time.Second) + for { + select { + case <-ticker.C: + q.gcSequencers() + case <-q.stopCh: + return + } + } + }() return q } + +func (q *StoreWorkQueue) gcSequencers() { + q.sequencersMu.Lock() + defer q.sequencersMu.Unlock() + + for rangeID, seq := range q.sequencersMu.s { + maxCreateTime := timeutil.FromUnixNanos(seq.maxCreateTime) + if q.timeSource.Now().Sub(maxCreateTime) > rangeSequencerGCThreshold.Get(&q.settings.SV) { + delete(q.sequencersMu.s, rangeID) + } + } +} + +func (q *StoreWorkQueue) sequenceReplicatedWork(createTime int64, info ReplicatedWorkInfo) int64 { + q.sequencersMu.Lock() + seq, ok := q.sequencersMu.s[info.RangeID] + if !ok { + seq = &sequencer{} + q.sequencersMu.s[info.RangeID] = seq + } + q.sequencersMu.Unlock() + // We're assuming sequenceReplicatedWork is never invoked concurrently for a + // given RangeID. + return seq.sequence(createTime) +} From 05c6ae3da6fb56ae2c12b6c807d3f2a7b802504f Mon Sep 17 00:00:00 2001 From: irfan sharif Date: Thu, 6 Apr 2023 17:49:32 -0400 Subject: [PATCH 3/3] admission: add intercept points for when replicated work gets admitted In a subsequent commit, when integrating kvflowcontrol into the critical path for replication traffic, we'll set up the return of flow tokens from the receiver node back to the sender once log entries get (asynchronously) admitted[^1]. So we need to intercept the exact points at which the virtually enqueued work items get admitted, since it all happens asynchronously[^2]. To that end we introduce the following interface: // OnLogEntryAdmitted is used to observe the specific entries // (identified by rangeID + log position) that were admitted. Since // admission control for log entries is asynchronous/non-blocking, // this allows callers to do requisite post-admission // bookkeeping. type OnLogEntryAdmitted interface { AdmittedLogEntry( origin roachpb.NodeID, /* node where the entry originated */ pri admissionpb.WorkPriority, /* admission priority of the entry */ storeID roachpb.StoreID, /* store on which the entry was admitted */ rangeID roachpb.RangeID, /* identifying range for the log entry */ pos LogPosition, /* log position of the entry that was admitted*/ ) } For now we pass in a no-op implementation in production code, but this will change shortly. Seeing as how the asynchronous admit interface is going to be the primary once once we enable replication admission control by default, for IO control, we no longer need the storeWriteDone interfaces and corresponding types. It's being used by our current (and soon-to-be legacy) above-raft IO admission control to inform granters of when the write was actually done, post-admission. 
For above-raft IO control, at admit-time we do not have sizing info for the writes, so by intercepting these writes at write-done time we're able to make any outstanding token adjustments in the granter. To reflect this new world, we: - Rename setAdmittedDoneModels to setLinearModels. - Introduce a storeReplicatedWorkAdmittedInfo[^3]. It provides information about the size of replicated work once it's admitted (which happens asynchronously from the work itself). This lets us use the underlying linear models for L0 {writes,ingests} to deduct an appropriate number of tokens from the granter, for the admitted work size[^4]. - Rename the granterWithStoreWriteDone interface to granterWithStoreReplicatedWorkAdmitted. We'll still intercept the actual point of admission for some token adjustments, through the storeReplicatedWorkAdmittedLocked API shown below. There are two callstacks through which this API gets invoked, one where the coord.mu is already held, and one where it isn't. We plumb this information through so the lock is acquired if not already held. The locking structure is unfortunate, but this was a minimally invasive diff. storeReplicatedWorkAdmittedLocked( originalTokens int64, admittedInfo storeReplicatedWorkAdmittedInfo, ) (additionalTokens int64) While here, we also export an admission.TestingReverseWorkPriorityDict. There are at least three tests that have re-invented the wheel. [^1]: This will happen through the kvflowcontrol.Dispatch interface introduced back in #97766, after integrating it with the RaftTransport layer. [^2]: Introduced in #97599, for replicated write work. [^3]: Identical to the previous StoreWorkDoneInfo. [^4]: There's a peculiarity here in that at enqueuing-time we actually know the size of the write, so we could have deducted the right number of tokens upfront and avoid this post-admit granter token adjustment. We inherit this structure from earlier, and just leave a TODO for now.
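For reference, a minimal sketch of the no-op hook mentioned above, assembled from the interface listed earlier in this commit message and the NoopOnLogEntryAdmitted value wired into NewGrantCoordinators in the diff below. The LogPosition shape is an assumption (its definition is not part of this diff), so treat this as an illustration rather than the exact production code.

package admission

import (
	"github.com/cockroachdb/cockroach/pkg/roachpb"
	"github.com/cockroachdb/cockroach/pkg/util/admission/admissionpb"
)

// LogPosition is assumed here to be a raft term/index pair; the actual
// definition lives elsewhere in the admission package.
type LogPosition struct {
	Term  uint64
	Index uint64
}

// OnLogEntryAdmitted is used to observe the specific entries (identified by
// rangeID + log position) that were admitted. Since admission control for log
// entries is asynchronous/non-blocking, this allows callers to do requisite
// post-admission bookkeeping.
type OnLogEntryAdmitted interface {
	AdmittedLogEntry(
		origin roachpb.NodeID, // node where the entry originated
		pri admissionpb.WorkPriority, // admission priority of the entry
		storeID roachpb.StoreID, // store on which the entry was admitted
		rangeID roachpb.RangeID, // identifying range for the log entry
		pos LogPosition, // log position of the entry that was admitted
	)
}

// NoopOnLogEntryAdmitted is the placeholder implementation passed to
// NewGrantCoordinators in production code for now; the flow-token return
// integration replaces it in a later change.
type NoopOnLogEntryAdmitted struct{}

var _ OnLogEntryAdmitted = (*NoopOnLogEntryAdmitted)(nil)

func (n *NoopOnLogEntryAdmitted) AdmittedLogEntry(
	roachpb.NodeID, admissionpb.WorkPriority, roachpb.StoreID, roachpb.RangeID, LogPosition,
) {
}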
Release note: None --- .../kvflowdispatch/kvflowdispatch_test.go | 7 +- .../kvflowtokentracker/tracker_test.go | 15 +-- pkg/server/server.go | 2 +- pkg/util/admission/admission.go | 40 ++++--- pkg/util/admission/admissionpb/admissionpb.go | 13 +- pkg/util/admission/elastic_cpu_work_handle.go | 5 +- pkg/util/admission/grant_coordinator.go | 28 ++++- pkg/util/admission/granter.go | 47 ++++---- pkg/util/admission/granter_test.go | 16 +-- pkg/util/admission/io_load_listener.go | 6 +- pkg/util/admission/io_load_listener_test.go | 4 +- .../replicated_write_admission_test.go | 26 ++-- pkg/util/admission/store_token_estimation.go | 16 ++- .../admission/store_token_estimation_test.go | 2 +- pkg/util/admission/work_queue.go | 111 ++++++++++++++---- pkg/util/admission/work_queue_test.go | 18 ++- 16 files changed, 233 insertions(+), 123 deletions(-) diff --git a/pkg/kv/kvserver/kvflowcontrol/kvflowdispatch/kvflowdispatch_test.go b/pkg/kv/kvserver/kvflowcontrol/kvflowdispatch/kvflowdispatch_test.go index 5caf3f67343a..6610f902ba15 100644 --- a/pkg/kv/kvserver/kvflowcontrol/kvflowdispatch/kvflowdispatch_test.go +++ b/pkg/kv/kvserver/kvflowcontrol/kvflowdispatch/kvflowdispatch_test.go @@ -31,11 +31,6 @@ func TestDispatch(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) - reverseWorkPriorityDict := make(map[string]admissionpb.WorkPriority) - for k, v := range admissionpb.WorkPriorityDict { - reverseWorkPriorityDict[v] = k - } - datadriven.Walk(t, datapathutils.TestDataPath(t), func(t *testing.T, path string) { var dispatch *Dispatch datadriven.RunTest(t, path, func(t *testing.T, d *datadriven.TestData) string { @@ -82,7 +77,7 @@ func TestDispatch(t *testing.T) { case strings.HasPrefix(parts[i], "pri="): // Parse pri=. - pri, found := reverseWorkPriorityDict[arg] + pri, found := admissionpb.TestingReverseWorkPriorityDict[arg] require.True(t, found) entries.AdmissionPriority = int32(pri) diff --git a/pkg/kv/kvserver/kvflowcontrol/kvflowtokentracker/tracker_test.go b/pkg/kv/kvserver/kvflowcontrol/kvflowtokentracker/tracker_test.go index 858bda480797..7793501e0b75 100644 --- a/pkg/kv/kvserver/kvflowcontrol/kvflowtokentracker/tracker_test.go +++ b/pkg/kv/kvserver/kvflowcontrol/kvflowtokentracker/tracker_test.go @@ -32,19 +32,10 @@ func TestTracker(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) - reverseWorkPriorityDict := make(map[string]admissionpb.WorkPriority) - for k, v := range admissionpb.WorkPriorityDict { - reverseWorkPriorityDict[v] = k - } - ctx := context.Background() datadriven.Walk(t, datapathutils.TestDataPath(t), func(t *testing.T, path string) { var tracker *Tracker - knobs := &kvflowcontrol.TestingKnobs{ - UntrackTokensInterceptor: func(tokens kvflowcontrol.Tokens, pos kvflowcontrolpb.RaftLogPosition) { - - }, - } + knobs := &kvflowcontrol.TestingKnobs{} datadriven.RunTest(t, path, func(t *testing.T, d *datadriven.TestData) string { switch d.Cmd { case "init": @@ -73,7 +64,7 @@ func TestTracker(t *testing.T) { switch { case strings.HasPrefix(parts[i], "pri="): var found bool - pri, found = reverseWorkPriorityDict[arg] + pri, found = admissionpb.TestingReverseWorkPriorityDict[arg] require.True(t, found) case strings.HasPrefix(parts[i], "tokens="): @@ -103,7 +94,7 @@ func TestTracker(t *testing.T) { var priStr, logPositionStr string d.ScanArgs(t, "pri", &priStr) d.ScanArgs(t, "up-to-log-position", &logPositionStr) - pri, found := reverseWorkPriorityDict[priStr] + pri, found := admissionpb.TestingReverseWorkPriorityDict[priStr] require.True(t, 
found) logPosition := parseLogPosition(t, logPositionStr) diff --git a/pkg/server/server.go b/pkg/server/server.go index c4f6a4f8469b..d422e07be84f 100644 --- a/pkg/server/server.go +++ b/pkg/server/server.go @@ -269,7 +269,7 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) { if opts, ok := cfg.TestingKnobs.AdmissionControl.(*admission.Options); ok { admissionOptions.Override(opts) } - gcoords := admission.NewGrantCoordinators(cfg.AmbientCtx, st, admissionOptions, registry) + gcoords := admission.NewGrantCoordinators(cfg.AmbientCtx, st, admissionOptions, registry, &admission.NoopOnLogEntryAdmitted{}) engines, err := cfg.CreateEngines(ctx) if err != nil { diff --git a/pkg/util/admission/admission.go b/pkg/util/admission/admission.go index e59d16927952..5a79d6d358c8 100644 --- a/pkg/util/admission/admission.go +++ b/pkg/util/admission/admission.go @@ -178,7 +178,7 @@ type granter interface { // is a possibility that that raced with cancellation. // // Do not use this for doing store IO-related token adjustments when work is - // done -- that should be done via granterWithStoreWriteDone.storeWriteDone. + // done -- that should be done via granterWithStoreReplicatedWorkAdmitted.storeWriteDone. // // REQUIRES: count > 0. count == 1 for slots. returnGrant(count int64) @@ -195,7 +195,7 @@ type granter interface { // work turned out to be an underestimate. // // Do not use this for doing store IO-related token adjustments when work is - // done -- that should be done via granterWithStoreWriteDone.storeWriteDone. + // done -- that should be done via granterWithStoreReplicatedWorkAdmitted.storeWriteDone. // // REQUIRES: count > 0. count == 1 for slots. tookWithoutPermission(count int64) @@ -274,23 +274,33 @@ type granterWithIOTokens interface { // getDiskTokensUsedAndReset returns the disk bandwidth tokens used // since the last such call. getDiskTokensUsedAndReset() [admissionpb.NumWorkClasses]int64 - // setAdmittedDoneModelsLocked supplies the models to use when - // storeWriteDone is called, to adjust token consumption. Note that these - // models are not used for token adjustment at admission time -- that is - // handled by StoreWorkQueue and is not in scope of this granter. This - // asymmetry is due to the need to use all the functionality of WorkQueue at - // admission time. See the long explanatory comment at the beginning of - // store_token_estimation.go, regarding token estimation. - setAdmittedDoneModels(l0WriteLM tokensLinearModel, l0IngestLM tokensLinearModel, - ingestLM tokensLinearModel) + // setLinearModels supplies the models to use when storeWriteDone or + // storeReplicatedWorkAdmittedLocked is called, to adjust token consumption. + // Note that these models are not used for token adjustment at admission + // time -- that is handled by StoreWorkQueue and is not in scope of this + // granter. This asymmetry is due to the need to use all the functionality + // of WorkQueue at admission time. See the long explanatory comment at the + // beginning of store_token_estimation.go, regarding token estimation. + setLinearModels(l0WriteLM, l0IngestLM, ingestLM tokensLinearModel) } -// granterWithStoreWriteDone is used to abstract kvStoreTokenGranter for -// testing. The interface is used by StoreWorkQueue to pass on sizing -// information provided when the work was completed. -type granterWithStoreWriteDone interface { +// granterWithStoreReplicatedWorkAdmitted is used to abstract +// kvStoreTokenGranter for testing. 
The interface is used by StoreWorkQueue to +// pass on sizing information provided when the work is either done (for legacy, +// above-raft IO admission) or admitted (for below-raft, asynchronous admission +// control. +type granterWithStoreReplicatedWorkAdmitted interface { granter + // storeWriteDone is used by legacy, above-raft IO admission control to + // inform granters of when the write was actually done, post-admission. At + // admit-time we did not have sizing info for these writes, so by + // intercepting these writes at admit time we're able to make any + // outstanding token adjustments in the granter. storeWriteDone(originalTokens int64, doneInfo StoreWorkDoneInfo) (additionalTokens int64) + // storeReplicatedWorkAdmittedLocked is used by below-raft admission control + // to inform granters of work being admitted in order for them to make any + // outstanding token adjustments. It's invoked with the coord.mu held. + storeReplicatedWorkAdmittedLocked(originalTokens int64, admittedInfo storeReplicatedWorkAdmittedInfo) (additionalTokens int64) } // cpuOverloadIndicator is meant to be an instantaneous indicator of cpu diff --git a/pkg/util/admission/admissionpb/admissionpb.go b/pkg/util/admission/admissionpb/admissionpb.go index 57dc9911080f..c230a5d11c38 100644 --- a/pkg/util/admission/admissionpb/admissionpb.go +++ b/pkg/util/admission/admissionpb/admissionpb.go @@ -53,7 +53,7 @@ func (w WorkPriority) SafeFormat(p redact.SafePrinter, verb rune) { p.Print(s) return } - p.Printf("custom-pri=%d", w) + p.Printf("custom-pri=%d", int8(w)) } // WorkPriorityDict is a mapping of the priorities to a short string name. The @@ -69,6 +69,17 @@ var WorkPriorityDict = map[WorkPriority]string{ HighPri: "high-pri", } +// TestingReverseWorkPriorityDict is the reverse-lookup dictionary for +// WorkPriorityDict, for use in tests. +var TestingReverseWorkPriorityDict map[string]WorkPriority + +func init() { + TestingReverseWorkPriorityDict = make(map[string]WorkPriority) + for k, v := range WorkPriorityDict { + TestingReverseWorkPriorityDict[v] = k + } +} + // WorkClass represents the class of work, which is defined entirely by its // WorkPriority. Namely, everything less than NormalPri is defined to be // "Elastic", while everything above and including NormalPri is considered diff --git a/pkg/util/admission/elastic_cpu_work_handle.go b/pkg/util/admission/elastic_cpu_work_handle.go index 85c5561304b5..42e60594419c 100644 --- a/pkg/util/admission/elastic_cpu_work_handle.go +++ b/pkg/util/admission/elastic_cpu_work_handle.go @@ -151,9 +151,8 @@ func TestingNewElasticCPUHandle() *ElasticCPUWorkHandle { return newElasticCPUWorkHandle(420 * time.Hour) // use a very high allotment } -// TestingNewElasticCPUWithCallback constructs an -// ElascticCPUWorkHandle with a testing override for the behaviour of -// OverLimit(). +// TestingNewElasticCPUHandleWithCallback constructs an ElasticCPUWorkHandle +// with a testing override for the behaviour of OverLimit(). func TestingNewElasticCPUHandleWithCallback(cb func() (bool, time.Duration)) *ElasticCPUWorkHandle { h := TestingNewElasticCPUHandle() h.testingOverrideOverLimit = cb diff --git a/pkg/util/admission/grant_coordinator.go b/pkg/util/admission/grant_coordinator.go index edf2b24b28e9..582bc5d6c7d0 100644 --- a/pkg/util/admission/grant_coordinator.go +++ b/pkg/util/admission/grant_coordinator.go @@ -61,6 +61,7 @@ type StoreGrantCoordinators struct { // api. 
numStores int pebbleMetricsProvider PebbleMetricsProvider + onLogEntryAdmitted OnLogEntryAdmitted closeCh chan struct{} disableTickerForTesting bool @@ -157,7 +158,7 @@ func (sgc *StoreGrantCoordinators) initGrantCoordinator(storeID roachpb.StoreID) // This is IO work, so override the usesTokens value. opts.usesTokens = true // TODO(sumeer): add per-store WorkQueue state for debug.zip and db console. - granters := [admissionpb.NumWorkClasses]granterWithStoreWriteDone{ + granters := [admissionpb.NumWorkClasses]granterWithStoreReplicatedWorkAdmitted{ &kvStoreTokenChildGranter{ workClass: admissionpb.RegularWorkClass, parent: kvg, @@ -168,7 +169,17 @@ func (sgc *StoreGrantCoordinators) initGrantCoordinator(storeID roachpb.StoreID) }, } - storeReq := sgc.makeStoreRequesterFunc(sgc.ambientCtx, storeID, granters, sgc.settings, sgc.workQueueMetrics, opts, nil) + storeReq := sgc.makeStoreRequesterFunc( + sgc.ambientCtx, + storeID, + granters, + sgc.settings, + sgc.workQueueMetrics, + opts, + nil, /* knobs */ + sgc.onLogEntryAdmitted, + &coord.mu.Mutex, + ) coord.queues[KVWork] = storeReq requesters := storeReq.getRequesters() kvg.regularRequester = requesters[admissionpb.RegularWorkClass] @@ -336,8 +347,9 @@ type makeRequesterFunc func( metrics *WorkQueueMetrics, opts workQueueOptions) requester type makeStoreRequesterFunc func( - _ log.AmbientContext, storeID roachpb.StoreID, granters [admissionpb.NumWorkClasses]granterWithStoreWriteDone, + _ log.AmbientContext, storeID roachpb.StoreID, granters [admissionpb.NumWorkClasses]granterWithStoreReplicatedWorkAdmitted, settings *cluster.Settings, metrics *WorkQueueMetrics, opts workQueueOptions, knobs *TestingKnobs, + onLogEntryAdmitted OnLogEntryAdmitted, coordMu *syncutil.Mutex, ) storeRequester // NewGrantCoordinators constructs GrantCoordinators and WorkQueues for a @@ -356,13 +368,17 @@ type makeStoreRequesterFunc func( // GrantCoordinators since they are not trying to control CPU usage, so we turn // off grant chaining in those coordinators. func NewGrantCoordinators( - ambientCtx log.AmbientContext, st *cluster.Settings, opts Options, registry *metric.Registry, + ambientCtx log.AmbientContext, + st *cluster.Settings, + opts Options, + registry *metric.Registry, + onLogEntryAdmitted OnLogEntryAdmitted, ) GrantCoordinators { metrics := makeGrantCoordinatorMetrics() registry.AddMetricStruct(metrics) return GrantCoordinators{ - Stores: makeStoresGrantCoordinators(ambientCtx, opts, st, metrics, registry), + Stores: makeStoresGrantCoordinators(ambientCtx, opts, st, metrics, registry, onLogEntryAdmitted), Regular: makeRegularGrantCoordinator(ambientCtx, opts, st, metrics, registry), Elastic: makeElasticGrantCoordinator(ambientCtx, st, registry), } @@ -399,6 +415,7 @@ func makeStoresGrantCoordinators( st *cluster.Settings, metrics GrantCoordinatorMetrics, registry *metric.Registry, + onLogEntryAdmitted OnLogEntryAdmitted, ) *StoreGrantCoordinators { // These metrics are shared across all stores and broken down by priority for // the common priorities. 
@@ -417,6 +434,7 @@ func makeStoresGrantCoordinators( makeStoreRequesterFunc: makeStoreRequester, kvIOTokensExhaustedDuration: metrics.KVIOTokensExhaustedDuration, workQueueMetrics: storeWorkQueueMetrics, + onLogEntryAdmitted: onLogEntryAdmitted, } return storeCoordinators } diff --git a/pkg/util/admission/granter.go b/pkg/util/admission/granter.go index 8787020d0886..a117e6141861 100644 --- a/pkg/util/admission/granter.go +++ b/pkg/util/admission/granter.go @@ -324,7 +324,7 @@ type kvStoreTokenChildGranter struct { parent *kvStoreTokenGranter } -var _ granterWithStoreWriteDone = &kvStoreTokenChildGranter{} +var _ granterWithStoreReplicatedWorkAdmitted = &kvStoreTokenChildGranter{} var _ granter = &kvStoreTokenChildGranter{} // grantKind implements granter. @@ -352,11 +352,23 @@ func (cg *kvStoreTokenChildGranter) continueGrantChain(grantChainID grantChainID // Ignore since grant chains are not used for store tokens. } -// storeWriteDone implements granterWithStoreWriteDone. +// storeWriteDone implements granterWithStoreReplicatedWorkAdmitted. func (cg *kvStoreTokenChildGranter) storeWriteDone( originalTokens int64, doneInfo StoreWorkDoneInfo, ) (additionalTokens int64) { - return cg.parent.storeWriteDone(cg.workClass, originalTokens, doneInfo) + cg.parent.coord.mu.Lock() + defer cg.parent.coord.mu.Unlock() + // NB: the token/metric adjustments we want to make here are the same as we + // want to make through the storeReplicatedWorkAdmittedLocked, so we (ab)use it. + return cg.parent.storeReplicatedWorkAdmittedLocked( + cg.workClass, originalTokens, storeReplicatedWorkAdmittedInfo(doneInfo)) +} + +// storeReplicatedWorkAdmitted implements granterWithStoreReplicatedWorkAdmitted. +func (cg *kvStoreTokenChildGranter) storeReplicatedWorkAdmittedLocked( + originalTokens int64, admittedInfo storeReplicatedWorkAdmittedInfo, +) (additionalTokens int64) { + return cg.parent.storeReplicatedWorkAdmittedLocked(cg.workClass, originalTokens, admittedInfo) } func (sg *kvStoreTokenGranter) tryGet(workClass admissionpb.WorkClass, count int64) bool { @@ -522,7 +534,7 @@ func (sg *kvStoreTokenGranter) getDiskTokensUsedAndReset() [admissionpb.NumWorkC } // setAdmittedModelsLocked implements granterWithIOTokens. -func (sg *kvStoreTokenGranter) setAdmittedDoneModels( +func (sg *kvStoreTokenGranter) setLinearModels( l0WriteLM tokensLinearModel, l0IngestLM tokensLinearModel, ingestLM tokensLinearModel, ) { sg.coord.mu.Lock() @@ -532,37 +544,21 @@ func (sg *kvStoreTokenGranter) setAdmittedDoneModels( sg.ingestLM = ingestLM } -// storeWriteDone implements granterWithStoreWriteDone. -func (sg *kvStoreTokenGranter) storeWriteDone( - wc admissionpb.WorkClass, originalTokens int64, doneInfo StoreWorkDoneInfo, +func (sg *kvStoreTokenGranter) storeReplicatedWorkAdmittedLocked( + wc admissionpb.WorkClass, originalTokens int64, admittedInfo storeReplicatedWorkAdmittedInfo, ) (additionalTokens int64) { - // Normally, we follow the structure of a foo() method calling into a foo() - // method on the GrantCoordinator, which then calls fooLocked() on the - // kvStoreTokenGranter. For example, returnGrant follows this structure. - // This allows the GrantCoordinator to do two things (a) acquire the mu - // before calling into kvStoreTokenGranter, (b) do side-effects, like - // terminating grant chains and doing more grants after the call into the - // fooLocked() method. 
- // For storeWriteDone we don't bother with this structure involving the - // GrantCoordinator (which has served us well across various methods and - // various granter implementations), since the decision on when the - // GrantCoordinator should call tryGrantLocked is more complicated. And since this - // storeWriteDone is unique to the kvStoreTokenGranter (and not implemented - // by other granters) this approach seems acceptable. - // Reminder: coord.mu protects the state in the kvStoreTokenGranter. - sg.coord.mu.Lock() exhaustedFunc := func() bool { return sg.coordMu.availableIOTokens <= 0 || (wc == admissionpb.ElasticWorkClass && sg.coordMu.elasticDiskBWTokensAvailable <= 0) } wasExhausted := exhaustedFunc() - actualL0WriteTokens := sg.l0WriteLM.applyLinearModel(doneInfo.WriteBytes) - actualL0IngestTokens := sg.l0IngestLM.applyLinearModel(doneInfo.IngestedBytes) + actualL0WriteTokens := sg.l0WriteLM.applyLinearModel(admittedInfo.WriteBytes) + actualL0IngestTokens := sg.l0IngestLM.applyLinearModel(admittedInfo.IngestedBytes) actualL0Tokens := actualL0WriteTokens + actualL0IngestTokens additionalL0TokensNeeded := actualL0Tokens - originalTokens sg.subtractTokensLocked(additionalL0TokensNeeded, false) - actualIngestTokens := sg.ingestLM.applyLinearModel(doneInfo.IngestedBytes) + actualIngestTokens := sg.ingestLM.applyLinearModel(admittedInfo.IngestedBytes) additionalDiskBWTokensNeeded := (actualL0WriteTokens + actualIngestTokens) - originalTokens if wc == admissionpb.ElasticWorkClass { sg.coordMu.elasticDiskBWTokensAvailable -= additionalDiskBWTokensNeeded @@ -574,7 +570,6 @@ func (sg *kvStoreTokenGranter) storeWriteDone( sg.coord.tryGrantLocked() } } - sg.coord.mu.Unlock() // For multi-tenant fairness accounting, we choose to ignore disk bandwidth // tokens. 
Ideally, we'd have multiple resource dimensions for the fairness // decisions, but we don't necessarily need something more sophisticated diff --git a/pkg/util/admission/granter_test.go b/pkg/util/admission/granter_test.go index c085950b12ce..9fc6555dfdd9 100644 --- a/pkg/util/admission/granter_test.go +++ b/pkg/util/admission/granter_test.go @@ -26,6 +26,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/metric" + "github.com/cockroachdb/cockroach/pkg/util/syncutil" "github.com/cockroachdb/datadriven" "github.com/cockroachdb/pebble" "github.com/stretchr/testify/require" @@ -97,7 +98,7 @@ func TestGranterBasic(t *testing.T) { return req } delayForGrantChainTermination = 0 - coords := NewGrantCoordinators(ambientCtx, settings, opts, registry) + coords := NewGrantCoordinators(ambientCtx, settings, opts, registry, &NoopOnLogEntryAdmitted{}) defer coords.Close() coord = coords.Regular return flushAndReset() @@ -109,8 +110,9 @@ func TestGranterBasic(t *testing.T) { storeCoordinators := &StoreGrantCoordinators{ settings: settings, makeStoreRequesterFunc: func( - ambientCtx log.AmbientContext, _ roachpb.StoreID, granters [admissionpb.NumWorkClasses]granterWithStoreWriteDone, + ambientCtx log.AmbientContext, _ roachpb.StoreID, granters [admissionpb.NumWorkClasses]granterWithStoreReplicatedWorkAdmitted, settings *cluster.Settings, metrics *WorkQueueMetrics, opts workQueueOptions, knobs *TestingKnobs, + _ OnLogEntryAdmitted, _ *syncutil.Mutex, ) storeRequester { makeTestRequester := func(wc admissionpb.WorkClass) *testRequester { req := &testRequester{ @@ -148,7 +150,7 @@ func TestGranterBasic(t *testing.T) { kvStoreGranter := coord.granters[KVWork].(*kvStoreTokenGranter) // Use the same model for all 3 kinds of models. 
tlm := tokensLinearModel{multiplier: 0.5, constant: 50} - kvStoreGranter.setAdmittedDoneModels(tlm, tlm, tlm) + kvStoreGranter.setLinearModels(tlm, tlm, tlm) return flushAndReset() case "set-has-waiting-requests": @@ -232,7 +234,7 @@ func TestGranterBasic(t *testing.T) { var origTokens, writeBytes int d.ScanArgs(t, "orig-tokens", &origTokens) d.ScanArgs(t, "write-bytes", &writeBytes) - requesters[scanWorkKind(t, d)].granter.(granterWithStoreWriteDone).storeWriteDone( + requesters[scanWorkKind(t, d)].granter.(granterWithStoreReplicatedWorkAdmitted).storeWriteDone( int64(origTokens), StoreWorkDoneInfo{WriteBytes: int64(writeBytes)}) coord.testingTryGrant() return flushAndReset() @@ -274,8 +276,8 @@ func TestStoreCoordinators(t *testing.T) { opts := Options{ makeRequesterFunc: makeRequesterFunc, makeStoreRequesterFunc: func( - ctx log.AmbientContext, _ roachpb.StoreID, granters [admissionpb.NumWorkClasses]granterWithStoreWriteDone, - settings *cluster.Settings, metrics *WorkQueueMetrics, opts workQueueOptions, _ *TestingKnobs) storeRequester { + ctx log.AmbientContext, _ roachpb.StoreID, granters [admissionpb.NumWorkClasses]granterWithStoreReplicatedWorkAdmitted, + settings *cluster.Settings, metrics *WorkQueueMetrics, opts workQueueOptions, _ *TestingKnobs, _ OnLogEntryAdmitted, _ *syncutil.Mutex) storeRequester { reqReg := makeRequesterFunc(ctx, KVWork, granters[admissionpb.RegularWorkClass], settings, metrics, opts) reqElastic := makeRequesterFunc(ctx, KVWork, granters[admissionpb.ElasticWorkClass], settings, metrics, opts) str := &storeTestRequester{} @@ -286,7 +288,7 @@ func TestStoreCoordinators(t *testing.T) { return str }, } - coords := NewGrantCoordinators(ambientCtx, settings, opts, registry) + coords := NewGrantCoordinators(ambientCtx, settings, opts, registry, &NoopOnLogEntryAdmitted{}) // There is only 1 KVWork requester at this point in initialization, for the // Regular GrantCoordinator. 
require.Equal(t, 1, len(requesters)) diff --git a/pkg/util/admission/io_load_listener.go b/pkg/util/admission/io_load_listener.go index f7b65d20ca21..028ad32f2e38 100644 --- a/pkg/util/admission/io_load_listener.go +++ b/pkg/util/admission/io_load_listener.go @@ -416,8 +416,8 @@ func (io *ioLoadListener) adjustTokens(ctx context.Context, metrics StoreMetrics io.copyAuxEtcFromPerWorkEstimator() requestEstimates := io.perWorkTokenEstimator.getStoreRequestEstimatesAtAdmission() io.kvRequester.setStoreRequestEstimates(requestEstimates) - l0WriteLM, l0IngestLM, ingestLM := io.perWorkTokenEstimator.getModelsAtAdmittedDone() - io.kvGranter.setAdmittedDoneModels(l0WriteLM, l0IngestLM, ingestLM) + l0WriteLM, l0IngestLM, ingestLM := io.perWorkTokenEstimator.getModelsAtDone() + io.kvGranter.setLinearModels(l0WriteLM, l0IngestLM, ingestLM) if _, overloaded := io.ioThreshold.Score(); overloaded || io.aux.doLogFlush || io.elasticDiskBWTokens != unlimitedTokens { log.Infof(ctx, "IO overload: %s", io.adjustTokensResult) @@ -433,7 +433,7 @@ func (io *ioLoadListener) copyAuxEtcFromPerWorkEstimator() { io.adjustTokensResult.aux.perWorkTokensAux = io.perWorkTokenEstimator.aux requestEstimates := io.perWorkTokenEstimator.getStoreRequestEstimatesAtAdmission() io.adjustTokensResult.requestEstimates = requestEstimates - l0WriteLM, l0IngestLM, ingestLM := io.perWorkTokenEstimator.getModelsAtAdmittedDone() + l0WriteLM, l0IngestLM, ingestLM := io.perWorkTokenEstimator.getModelsAtDone() io.adjustTokensResult.l0WriteLM = l0WriteLM io.adjustTokensResult.l0IngestLM = l0IngestLM io.adjustTokensResult.ingestLM = ingestLM diff --git a/pkg/util/admission/io_load_listener_test.go b/pkg/util/admission/io_load_listener_test.go index b10507d9cd7d..2f2bfc0cc425 100644 --- a/pkg/util/admission/io_load_listener_test.go +++ b/pkg/util/admission/io_load_listener_test.go @@ -370,7 +370,7 @@ func (g *testGranterWithIOTokens) getDiskTokensUsedAndReset() [admissionpb.NumWo return g.diskBandwidthTokensUsed } -func (g *testGranterWithIOTokens) setAdmittedDoneModels( +func (g *testGranterWithIOTokens) setLinearModels( l0WriteLM tokensLinearModel, l0IngestLM tokensLinearModel, ingestLM tokensLinearModel, ) { fmt.Fprintf(&g.buf, "setAdmittedDoneModelsLocked: l0-write-lm: ") @@ -409,7 +409,7 @@ func (g *testGranterNonNegativeTokens) getDiskTokensUsedAndReset() [admissionpb. 
return [admissionpb.NumWorkClasses]int64{} } -func (g *testGranterNonNegativeTokens) setAdmittedDoneModels( +func (g *testGranterNonNegativeTokens) setLinearModels( l0WriteLM tokensLinearModel, l0IngestLM tokensLinearModel, ingestLM tokensLinearModel, ) { require.LessOrEqual(g.t, 0.5, l0WriteLM.multiplier) diff --git a/pkg/util/admission/replicated_write_admission_test.go b/pkg/util/admission/replicated_write_admission_test.go index d25adb9895bb..4d1e368922dd 100644 --- a/pkg/util/admission/replicated_write_admission_test.go +++ b/pkg/util/admission/replicated_write_admission_test.go @@ -27,6 +27,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/metric" + "github.com/cockroachdb/cockroach/pkg/util/syncutil" "github.com/cockroachdb/cockroach/pkg/util/timeutil" "github.com/cockroachdb/cockroach/pkg/util/tracing" "github.com/cockroachdb/datadriven" @@ -112,14 +113,15 @@ func TestReplicatedWriteAdmission(t *testing.T) { printTrimmedBytes(originalTokens), rwi.RangeID, rwi.Origin, rwi.LogPosition, ingested) }, } + var mockCoordMu syncutil.Mutex storeWorkQueue = makeStoreWorkQueue( log.MakeTestingAmbientContext(tracing.NewTracer()), roachpb.StoreID(1), - [admissionpb.NumWorkClasses]granterWithStoreWriteDone{ + [admissionpb.NumWorkClasses]granterWithStoreReplicatedWorkAdmitted{ tg[admissionpb.RegularWorkClass], tg[admissionpb.ElasticWorkClass], }, - st, metrics, opts, knobs, + st, metrics, opts, knobs, &NoopOnLogEntryAdmitted{}, &mockCoordMu, ).(*StoreWorkQueue) tg[admissionpb.RegularWorkClass].r = storeWorkQueue.getRequesters()[admissionpb.RegularWorkClass] tg[admissionpb.ElasticWorkClass].r = storeWorkQueue.getRequesters()[admissionpb.ElasticWorkClass] @@ -137,7 +139,7 @@ func TestReplicatedWriteAdmission(t *testing.T) { // Parse pri=. d.ScanArgs(t, "pri", &arg) - pri, found := reverseWorkPriorityDict[arg] + pri, found := admissionpb.TestingReverseWorkPriorityDict[arg] require.True(t, found) // Parse size=. @@ -369,15 +371,6 @@ func printWorkQueue(q *WorkQueue) string { // create-time= is relative to this time. 
var tzero = timeutil.Unix(0, 0) -var reverseWorkPriorityDict map[string]admissionpb.WorkPriority - -func init() { - reverseWorkPriorityDict = make(map[string]admissionpb.WorkPriority) - for k, v := range admissionpb.WorkPriorityDict { - reverseWorkPriorityDict[v] = k - } -} - type testReplicatedWriteGranter struct { t *testing.T wc admissionpb.WorkClass @@ -387,7 +380,7 @@ type testReplicatedWriteGranter struct { tokens int64 } -var _ granterWithStoreWriteDone = &testReplicatedWriteGranter{} +var _ granterWithStoreReplicatedWorkAdmitted = &testReplicatedWriteGranter{} func newTestReplicatedWriteGranter( t *testing.T, wc admissionpb.WorkClass, buf *builderWithMu, @@ -445,3 +438,10 @@ func (tg *testReplicatedWriteGranter) storeWriteDone( tg.tokens -= originalTokens return 0 } + +func (tg *testReplicatedWriteGranter) storeReplicatedWorkAdmittedLocked( + originalTokens int64, admittedInfo storeReplicatedWorkAdmittedInfo, +) (additionalTokens int64) { + tg.tokens -= originalTokens + return 0 +} diff --git a/pkg/util/admission/store_token_estimation.go b/pkg/util/admission/store_token_estimation.go index 2e2eca842b8e..ea3b83edb863 100644 --- a/pkg/util/admission/store_token_estimation.go +++ b/pkg/util/admission/store_token_estimation.go @@ -12,6 +12,12 @@ package admission import "github.com/cockroachdb/pebble" +// TODO(irfansharif): This comment is a bit stale with replication admission +// control where admission is asynchronous. AC is informed of the write when +// it's being physically done, so we know its size then. We don't need upfront +// estimates anymore. The AdmittedWorkDone interface and surrounding types +// (StoreWorkDoneInfo for ex.) are no longer central. +// // The logic in this file deals with token estimation for a store write in two // situations: (a) at admission time, (b) when the admitted work is done. At // (a) we have no information provided about the work size (NB: this choice is @@ -105,7 +111,13 @@ const ingestMultiplierMin = 0.5 const ingestMultiplierMax = 1.5 type storePerWorkTokenEstimator struct { - atAdmissionWorkTokens int64 + atAdmissionWorkTokens int64 + + // TODO(irfansharif): The linear model fitters below are actually not used + // for upfront per-work token estimation. They're used in the granter to + // figure out the rate of tokens to produce. This code organization is + // confusing -- rename the type? 
+ atDoneL0WriteTokensLinearModel tokensLinearModelFitter atDoneL0IngestTokensLinearModel tokensLinearModelFitter // Unlike the models above that model bytes into L0, this model computes all @@ -238,7 +250,7 @@ func (e *storePerWorkTokenEstimator) getStoreRequestEstimatesAtAdmission() store return storeRequestEstimates{writeTokens: e.atAdmissionWorkTokens} } -func (e *storePerWorkTokenEstimator) getModelsAtAdmittedDone() ( +func (e *storePerWorkTokenEstimator) getModelsAtDone() ( l0WriteLM tokensLinearModel, l0IngestLM tokensLinearModel, ingestLM tokensLinearModel, diff --git a/pkg/util/admission/store_token_estimation_test.go b/pkg/util/admission/store_token_estimation_test.go index b2898f203b46..6f65c3da8894 100644 --- a/pkg/util/admission/store_token_estimation_test.go +++ b/pkg/util/admission/store_token_estimation_test.go @@ -77,7 +77,7 @@ func TestStorePerWorkTokenEstimator(t *testing.T) { admissionStats.statsToIgnore.Bytes += uint64(ignoreIngestedIntoL0) } estimator.updateEstimates(l0Metrics, cumLSMIngestedBytes, admissionStats) - wL0lm, iL0lm, ilm := estimator.getModelsAtAdmittedDone() + wL0lm, iL0lm, ilm := estimator.getModelsAtDone() require.Equal(t, wL0lm, estimator.atDoneL0WriteTokensLinearModel.smoothedLinearModel) require.Equal(t, iL0lm, estimator.atDoneL0IngestTokensLinearModel.smoothedLinearModel) require.Equal(t, ilm, estimator.atDoneIngestTokensLinearModel.smoothedLinearModel) diff --git a/pkg/util/admission/work_queue.go b/pkg/util/admission/work_queue.go index 1284f8bb7b1e..44c5c0a8cb77 100644 --- a/pkg/util/admission/work_queue.go +++ b/pkg/util/admission/work_queue.go @@ -633,6 +633,7 @@ func (q *WorkQueue) Admit(ctx context.Context, info WorkInfo) (enabled bool, err info.ReplicatedWorkInfo, info.RequestedCount, info.CreateTime, + false, /* coordMuLocked */ ) } return true, nil @@ -679,6 +680,7 @@ func (q *WorkQueue) Admit(ctx context.Context, info WorkInfo) (enabled bool, err } } } + // Check for cancellation. startTime := q.timeNow() if ctx.Err() != nil { @@ -861,6 +863,7 @@ func (q *WorkQueue) granted(grantChainID grantChainID) int64 { item.replicated, item.requestedCount, item.createTime, + true, /* coordMuLocked */ ) q.metrics.incAdmitted(item.priority) @@ -1672,7 +1675,7 @@ func (m *WorkQueueMetrics) getOrCreate(priority admissionpb.WorkPriority) workQu // necessary to call LoadOrStore here as this could be called concurrently. // It is not called the first Load so that we don't have to unnecessarily // create the metrics. - statPrefix := fmt.Sprintf("%v.%v", m.name, admissionpb.WorkPriorityDict[priority]) + statPrefix := fmt.Sprintf("%v.%v", m.name, priority.String()) val, ok = m.byPriority.LoadOrStore(priority, makeWorkQueueMetricsSingle(statPrefix)) if !ok { m.registry.AddMetricStruct(val) @@ -1766,9 +1769,10 @@ type StoreWriteWorkInfo struct { type StoreWorkQueue struct { storeID roachpb.StoreID q [admissionpb.NumWorkClasses]WorkQueue - // Only calls storeWriteDone. The rest of the interface is used by + // Only calls storeReplicatedWorkAdmittedLocked. The rest of the interface is used by // WorkQueue. 
- granters [admissionpb.NumWorkClasses]granterWithStoreWriteDone + granters [admissionpb.NumWorkClasses]granterWithStoreReplicatedWorkAdmitted + coordMu *syncutil.Mutex mu struct { syncutil.RWMutex // estimates is used to determine how many tokens are deducted at-admit @@ -1788,6 +1792,7 @@ type StoreWorkQueue struct { stopCh chan struct{} timeSource timeutil.TimeSource settings *cluster.Settings + onLogEntryAdmitted OnLogEntryAdmitted knobs *TestingKnobs } @@ -1883,6 +1888,22 @@ type StoreWorkDoneInfo struct { IngestedBytes int64 } +// storeReplicatedWorkAdmittedInfo provides information about the size of +// replicated work once it's admitted (which happens asynchronously from the +// work itself). This lets us use the underlying linear models for L0 +// {writes,ingests} to deduct an appropriate number of tokens from the granter, +// for the admitted work size. +// +// TODO(irfansharif): This post-admission adjustment of tokens is odd -- when +// the replicated work is being enqueued, we already know its size, so we could +// have applied the linear models upfront and determine what the right # of +// tokens to deduct all at once. We're doing it this way because we've written +// the WorkQueue and granter interactions to be very general, but it can be hard +// to follow. See review discussions over at #97599. It's worth noting that +// there isn't really a lag in the adjustment, so it is harmless from an +// operational perspective of admission control. +type storeReplicatedWorkAdmittedInfo StoreWorkDoneInfo + type onAdmittedReplicatedWork interface { admittedReplicatedWork( tenantID roachpb.TenantID, @@ -1890,7 +1911,10 @@ type onAdmittedReplicatedWork interface { rwi ReplicatedWorkInfo, requestedTokens int64, createTime int64, + coordMuLocked bool, ) + + // TODO(irfansharif): This coordMuLocked parameter is gross. } var _ onAdmittedReplicatedWork = &StoreWorkQueue{} @@ -1903,6 +1927,7 @@ func (q *StoreWorkQueue) admittedReplicatedWork( rwi ReplicatedWorkInfo, originalTokens int64, createTime int64, // only used in tests + coordMuLocked bool, ) { if !rwi.Enabled { panic("unexpected call to admittedReplicatedWork for work that's not a replicated write") @@ -1911,11 +1936,11 @@ func (q *StoreWorkQueue) admittedReplicatedWork( fn(tenantID, pri, rwi, originalTokens, createTime) } - var storeWorkDoneInfo StoreWorkDoneInfo + var replicatedWorkAdmittedInfo storeReplicatedWorkAdmittedInfo if rwi.Ingested { - storeWorkDoneInfo.IngestedBytes = originalTokens + replicatedWorkAdmittedInfo.IngestedBytes = originalTokens } else { - storeWorkDoneInfo.WriteBytes = originalTokens + replicatedWorkAdmittedInfo.WriteBytes = originalTokens } // We've already used RequestedCount for replicated writes to deduct tokens @@ -1925,19 +1950,55 @@ func (q *StoreWorkQueue) admittedReplicatedWork( // underlying linear models, and we may have under-deducted -- we account // for this below. wc := admissionpb.WorkClassFromPri(pri) - additionalTokensNeeded := q.granters[wc].storeWriteDone(originalTokens, storeWorkDoneInfo) + if !coordMuLocked { + q.coordMu.Lock() + } + additionalTokensNeeded := q.granters[wc].storeReplicatedWorkAdmittedLocked(originalTokens, replicatedWorkAdmittedInfo) + if !coordMuLocked { + q.coordMu.Unlock() + } q.q[wc].adjustTenantTokens(tenantID, additionalTokensNeeded) - // TODO(irfansharif): Dispatch flow token returns here. We want to - // inform (a) the origin node of writes at (b) a given priority, to - // (c) the given range, at (d) the given log position on (e) the - // local store. 
Part of #95563. + // Inform callers of the entry we just admitted. // - _ = rwi.Origin // (a) - _ = pri // (b) - _ = rwi.RangeID // (c) - _ = rwi.LogPosition // (d) - _ = q.storeID // (e) + // TODO(irfansharif): It's bad that we're extending coord.mu's critical + // section to this callback. We can't prevent it when this is happening via + // WorkQueue.granted since it was called while holding coord.mu. We should + // revisit -- one possibility is to add this to a notification queue and + // have a separate goroutine invoke these callbacks (without holding + // coord.mu). We could directly invoke here too if not holding the lock. + q.onLogEntryAdmitted.AdmittedLogEntry( + rwi.Origin, + pri, + q.storeID, + rwi.RangeID, + rwi.LogPosition, + ) +} + +// OnLogEntryAdmitted is used to observe the specific entries (identified by +// rangeID + log position) that were admitted. Since admission control for log +// entries is asynchronous/non-blocking, this allows callers to do requisite +// post-admission bookkeeping. +type OnLogEntryAdmitted interface { + AdmittedLogEntry( + origin roachpb.NodeID, /* node where the entry originated */ + pri admissionpb.WorkPriority, /* admission priority of the entry */ + storeID roachpb.StoreID, /* store on which the entry was admitted */ + rangeID roachpb.RangeID, /* identifying range for the log entry */ + pos LogPosition, /* log position of the entry that was admitted*/ + ) +} + +// NoopOnLogEntryAdmitted is a no-op implementation of the OnLogEntryAdmitted +// interface. +type NoopOnLogEntryAdmitted struct{} + +var _ OnLogEntryAdmitted = &NoopOnLogEntryAdmitted{} + +func (n *NoopOnLogEntryAdmitted) AdmittedLogEntry( + roachpb.NodeID, admissionpb.WorkPriority, roachpb.StoreID, roachpb.RangeID, LogPosition, +) { } // AdmittedWorkDone indicates to the queue that the admitted work has completed. 
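The net effect of the storeReplicatedWorkAdmittedLocked / admittedReplicatedWork path above is: once an entry is admitted, re-estimate its cost through the fitted linear models and charge (or refund) the difference relative to the tokens that were deducted at admission time, then adjust per-tenant accounting by the same delta. The standalone Go sketch below illustrates only the L0 write/ingest portion of that arithmetic (the disk-bandwidth adjustment in the granter follows the same pattern); the helper names (admittedSize, additionalL0Tokens), the zero-bytes guard, and the exact rounding inside applyLinearModel are assumptions for illustration rather than the package's actual definitions.

// Hedged sketch: not the package's actual code. Stand-in types; the
// zero-bytes behavior and rounding of applyLinearModel are assumptions.
package main

import "fmt"

// tokensLinearModel mirrors the multiplier/constant shape used in the tests
// above (e.g. tokensLinearModel{multiplier: 0.5, constant: 50}).
type tokensLinearModel struct {
	multiplier float64
	constant   int64
}

// applyLinearModel estimates tokens as multiplier*bytes + constant. Returning
// 0 for zero bytes is a simplification for this sketch.
func (m tokensLinearModel) applyLinearModel(bytes int64) int64 {
	if bytes == 0 {
		return 0
	}
	return int64(float64(bytes)*m.multiplier) + m.constant
}

// admittedSize is a stand-in for storeReplicatedWorkAdmittedInfo.
type admittedSize struct {
	writeBytes    int64
	ingestedBytes int64
}

// additionalL0Tokens re-estimates the admitted work's L0 cost via the fitted
// models and returns how many tokens still need to be deducted on top of what
// was taken at admission time (negative means a refund).
func additionalL0Tokens(
	l0WriteLM, l0IngestLM tokensLinearModel, info admittedSize, originalTokens int64,
) int64 {
	actual := l0WriteLM.applyLinearModel(info.writeBytes) +
		l0IngestLM.applyLinearModel(info.ingestedBytes)
	return actual - originalTokens
}

func main() {
	lm := tokensLinearModel{multiplier: 0.5, constant: 50}
	// A 4000-byte replicated write for which 1000 tokens were deducted up
	// front: 0.5*4000+50 = 2050 actual tokens, so 1050 more are deducted.
	fmt.Println(additionalL0Tokens(lm, lm, admittedSize{writeBytes: 4000}, 1000))
}

In the patch itself this adjustment runs under coord.mu via storeReplicatedWorkAdmittedLocked, which is why the coordMuLocked plumbing is threaded through admittedReplicatedWork.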
@@ -2026,11 +2087,13 @@ func (q *StoreWorkQueue) setStoreRequestEstimates(estimates storeRequestEstimate func makeStoreWorkQueue( ambientCtx log.AmbientContext, storeID roachpb.StoreID, - granters [admissionpb.NumWorkClasses]granterWithStoreWriteDone, + granters [admissionpb.NumWorkClasses]granterWithStoreReplicatedWorkAdmitted, settings *cluster.Settings, metrics *WorkQueueMetrics, opts workQueueOptions, knobs *TestingKnobs, + onLogEntryAdmitted OnLogEntryAdmitted, + coordMu *syncutil.Mutex, ) storeRequester { if knobs == nil { knobs = &TestingKnobs{} @@ -2039,12 +2102,14 @@ func makeStoreWorkQueue( opts.timeSource = timeutil.DefaultTimeSource{} } q := &StoreWorkQueue{ - storeID: storeID, - granters: granters, - knobs: knobs, - stopCh: make(chan struct{}), - timeSource: opts.timeSource, - settings: settings, + coordMu: coordMu, + storeID: storeID, + granters: granters, + knobs: knobs, + stopCh: make(chan struct{}), + timeSource: opts.timeSource, + settings: settings, + onLogEntryAdmitted: onLogEntryAdmitted, } opts.usesAsyncAdmit = true diff --git a/pkg/util/admission/work_queue_test.go b/pkg/util/admission/work_queue_test.go index f4902930be32..66da1387e973 100644 --- a/pkg/util/admission/work_queue_test.go +++ b/pkg/util/admission/work_queue_test.go @@ -67,7 +67,7 @@ type testGranter struct { additionalTokens int64 } -var _ granterWithStoreWriteDone = &testGranter{} +var _ granterWithStoreReplicatedWorkAdmitted = &testGranter{} func (tg *testGranter) grantKind() grantKind { return tg.gk @@ -110,6 +110,14 @@ func (tg *testGranter) storeWriteDone( return tg.additionalTokens } +func (tg *testGranter) storeReplicatedWorkAdmittedLocked( + originalTokens int64, admittedInfo storeReplicatedWorkAdmittedInfo, +) (additionalTokens int64) { + tg.buf.printf("storeReplicatedWorkAdmittedLocked%s: originalTokens %d, admittedBytes(write %d,ingested %d) returning %d", + tg.name, originalTokens, admittedInfo.WriteBytes, admittedInfo.IngestedBytes, tg.additionalTokens) + return tg.additionalTokens +} + type testWork struct { tenantID roachpb.TenantID cancel context.CancelFunc @@ -522,9 +530,13 @@ func TestStoreWorkQueueBasic(t *testing.T) { opts.timeSource = timeutil.NewManualTime(timeutil.FromUnixMicros(0)) opts.disableEpochClosingGoroutine = true st = cluster.MakeTestingClusterSettings() + var mockCoordMu syncutil.Mutex q = makeStoreWorkQueue(log.MakeTestingAmbientContext(tracing.NewTracer()), roachpb.StoreID(1), - [admissionpb.NumWorkClasses]granterWithStoreWriteDone{tg[admissionpb.RegularWorkClass], tg[admissionpb.ElasticWorkClass]}, - st, metrics, opts, nil /* testing knobs */).(*StoreWorkQueue) + [admissionpb.NumWorkClasses]granterWithStoreReplicatedWorkAdmitted{ + tg[admissionpb.RegularWorkClass], + tg[admissionpb.ElasticWorkClass], + }, + st, metrics, opts, nil /* testing knobs */, &NoopOnLogEntryAdmitted{}, &mockCoordMu).(*StoreWorkQueue) tg[admissionpb.RegularWorkClass].r = q.getRequesters()[admissionpb.RegularWorkClass] tg[admissionpb.ElasticWorkClass].r = q.getRequesters()[admissionpb.ElasticWorkClass] wrkMap.resetMap()