diff --git a/pkg/BUILD.bazel b/pkg/BUILD.bazel index a5ce75721742..27a0f33defc4 100644 --- a/pkg/BUILD.bazel +++ b/pkg/BUILD.bazel @@ -227,7 +227,6 @@ ALL_TESTS = [ "//pkg/kv/kvserver/kvflowcontrol/kvflowcontroller:kvflowcontroller_test", "//pkg/kv/kvserver/kvflowcontrol/kvflowdispatch:kvflowdispatch_test", "//pkg/kv/kvserver/kvflowcontrol/kvflowhandle:kvflowhandle_test", - "//pkg/kv/kvserver/kvflowcontrol/kvflowsequencer:kvflowsequencer_test", "//pkg/kv/kvserver/kvflowcontrol/kvflowsimulator:kvflowsimulator_test", "//pkg/kv/kvserver/kvflowcontrol/kvflowtokentracker:kvflowtokentracker_test", "//pkg/kv/kvserver/kvstorage:kvstorage_test", @@ -1303,8 +1302,6 @@ GO_TARGETS = [ "//pkg/kv/kvserver/kvflowcontrol/kvflowdispatch:kvflowdispatch_test", "//pkg/kv/kvserver/kvflowcontrol/kvflowhandle:kvflowhandle", "//pkg/kv/kvserver/kvflowcontrol/kvflowhandle:kvflowhandle_test", - "//pkg/kv/kvserver/kvflowcontrol/kvflowsequencer:kvflowsequencer", - "//pkg/kv/kvserver/kvflowcontrol/kvflowsequencer:kvflowsequencer_test", "//pkg/kv/kvserver/kvflowcontrol/kvflowsimulator:kvflowsimulator_test", "//pkg/kv/kvserver/kvflowcontrol/kvflowtokentracker:kvflowtokentracker", "//pkg/kv/kvserver/kvflowcontrol/kvflowtokentracker:kvflowtokentracker_test", @@ -2770,7 +2767,6 @@ GET_X_DATA_TARGETS = [ "//pkg/kv/kvserver/kvflowcontrol/kvflowcontrolpb:get_x_data", "//pkg/kv/kvserver/kvflowcontrol/kvflowdispatch:get_x_data", "//pkg/kv/kvserver/kvflowcontrol/kvflowhandle:get_x_data", - "//pkg/kv/kvserver/kvflowcontrol/kvflowsequencer:get_x_data", "//pkg/kv/kvserver/kvflowcontrol/kvflowsimulator:get_x_data", "//pkg/kv/kvserver/kvflowcontrol/kvflowtokentracker:get_x_data", "//pkg/kv/kvserver/kvserverbase:get_x_data", diff --git a/pkg/kv/kvserver/kvflowcontrol/doc.go b/pkg/kv/kvserver/kvflowcontrol/doc.go index ec784c1e1cd7..49910b2790d5 100644 --- a/pkg/kv/kvserver/kvflowcontrol/doc.go +++ b/pkg/kv/kvserver/kvflowcontrol/doc.go @@ -483,7 +483,7 @@ package kvflowcontrol // still queued after ~100ms, will trigger epoch-LIFO everywhere. // [^11]: See the implementation for kvflowcontrol.Dispatch. // [^12]: See UpToRaftLogPosition in AdmittedRaftLogEntries. -// [^13]: See kvflowsequencer.Sequencer and its use in kvflowhandle.Handle. +// [^13]: See admission.sequencer and its use in admission.StoreWorkQueue. // [^14]: See the high_create_time_low_position_different_range test case for // TestReplicatedWriteAdmission. // diff --git a/pkg/kv/kvserver/kvflowcontrol/kvflowcontrol.go b/pkg/kv/kvserver/kvflowcontrol/kvflowcontrol.go index e55cb2acc4c7..de6641205a58 100644 --- a/pkg/kv/kvserver/kvflowcontrol/kvflowcontrol.go +++ b/pkg/kv/kvserver/kvflowcontrol/kvflowcontrol.go @@ -98,13 +98,11 @@ type Handle interface { // work with given priority along connected streams. The deduction is // tracked with respect to the specific raft log position it's expecting it // to end up in, log positions that monotonically increase. Requests are - // assumed to have been Admit()-ed first. The returned time.Time parameter - // is to be used as the work item's CreateTime when enqueueing in IO - // admission queues. + // assumed to have been Admit()-ed first. DeductTokensFor( - context.Context, admissionpb.WorkPriority, time.Time, + context.Context, admissionpb.WorkPriority, kvflowcontrolpb.RaftLogPosition, Tokens, - ) time.Time + ) // ReturnTokensUpto returns all previously deducted tokens of a given // priority for all log positions less than or equal to the one specified. // It does for the specific stream. Once returned, subsequent attempts to diff --git a/pkg/kv/kvserver/kvflowcontrol/kvflowcontroller/kvflowcontroller.go b/pkg/kv/kvserver/kvflowcontrol/kvflowcontroller/kvflowcontroller.go index 46c8e6b0cfb5..a18472414715 100644 --- a/pkg/kv/kvserver/kvflowcontrol/kvflowcontroller/kvflowcontroller.go +++ b/pkg/kv/kvserver/kvflowcontrol/kvflowcontroller/kvflowcontroller.go @@ -183,8 +183,9 @@ func (c *Controller) Admit( } } - // TODO(irfansharif): Use the create time for ordering among waiting - // requests. Integrate it with epoch-LIFO. + // TODO(irfansharif): Use CreateTime for ordering among waiting + // requests, integrate it with epoch-LIFO. See I12 from + // kvflowcontrol/doc.go. } // DeductTokens is part of the kvflowcontrol.Controller interface. diff --git a/pkg/kv/kvserver/kvflowcontrol/kvflowhandle/BUILD.bazel b/pkg/kv/kvserver/kvflowcontrol/kvflowhandle/BUILD.bazel index ff07b3cc5958..0b4379c2a72b 100644 --- a/pkg/kv/kvserver/kvflowcontrol/kvflowhandle/BUILD.bazel +++ b/pkg/kv/kvserver/kvflowcontrol/kvflowhandle/BUILD.bazel @@ -14,7 +14,6 @@ go_library( "//pkg/base", "//pkg/kv/kvserver/kvflowcontrol", "//pkg/kv/kvserver/kvflowcontrol/kvflowcontrolpb", - "//pkg/kv/kvserver/kvflowcontrol/kvflowsequencer", "//pkg/kv/kvserver/kvflowcontrol/kvflowtokentracker", "//pkg/util/admission/admissionpb", "//pkg/util/hlc", @@ -40,7 +39,6 @@ go_test( "//pkg/util/leaktest", "//pkg/util/log", "//pkg/util/metric", - "//pkg/util/timeutil", "@com_github_stretchr_testify//require", ], ) diff --git a/pkg/kv/kvserver/kvflowcontrol/kvflowhandle/kvflowhandle.go b/pkg/kv/kvserver/kvflowcontrol/kvflowhandle/kvflowhandle.go index 1c43ffeb2ab7..0ff5ebf18a10 100644 --- a/pkg/kv/kvserver/kvflowcontrol/kvflowhandle/kvflowhandle.go +++ b/pkg/kv/kvserver/kvflowcontrol/kvflowhandle/kvflowhandle.go @@ -17,7 +17,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol/kvflowcontrolpb" - "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol/kvflowsequencer" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol/kvflowtokentracker" "github.com/cockroachdb/cockroach/pkg/util/admission/admissionpb" "github.com/cockroachdb/cockroach/pkg/util/hlc" @@ -41,7 +40,6 @@ type Handle struct { // (identified by their log positions) have been admitted below-raft, // streams disconnect, or the handle closed entirely. perStreamTokenTracker map[kvflowcontrol.Stream]*kvflowtokentracker.Tracker - sequencer *kvflowsequencer.Sequencer closed bool } } @@ -54,7 +52,6 @@ func New(controller kvflowcontrol.Controller, metrics *Metrics, clock *hlc.Clock clock: clock, } h.mu.perStreamTokenTracker = map[kvflowcontrol.Stream]*kvflowtokentracker.Tracker{} - h.mu.sequencer = kvflowsequencer.New() return h } @@ -104,31 +101,28 @@ func (h *Handle) Admit(ctx context.Context, pri admissionpb.WorkPriority, ct tim func (h *Handle) DeductTokensFor( ctx context.Context, pri admissionpb.WorkPriority, - ct time.Time, pos kvflowcontrolpb.RaftLogPosition, tokens kvflowcontrol.Tokens, -) time.Time { +) { if h == nil { // TODO(irfansharif): See TODO around nil receiver check in Admit(). - return ct + return } - ct, _ = h.deductTokensForInner(ctx, pri, ct, pos, tokens) - return ct + _ = h.deductTokensForInner(ctx, pri, pos, tokens) } func (h *Handle) deductTokensForInner( ctx context.Context, pri admissionpb.WorkPriority, - ct time.Time, pos kvflowcontrolpb.RaftLogPosition, tokens kvflowcontrol.Tokens, -) (sequence time.Time, streams []kvflowcontrol.Stream) { +) (streams []kvflowcontrol.Stream) { h.mu.Lock() defer h.mu.Unlock() if h.mu.closed { log.Errorf(ctx, "operating on a closed handle") - return ct, nil // unused return value in production code + return nil // unused return value in production code } for _, c := range h.mu.connections { @@ -136,7 +130,7 @@ func (h *Handle) deductTokensForInner( h.mu.perStreamTokenTracker[c.Stream()].Track(ctx, pri, tokens, pos) streams = append(streams, c.Stream()) } - return h.mu.sequencer.Sequence(ct), streams + return streams } // ReturnTokensUpto is part of the kvflowcontrol.Handle interface. @@ -322,9 +316,8 @@ func (h *Handle) TestingNonBlockingAdmit( func (h *Handle) TestingDeductTokensForInner( ctx context.Context, pri admissionpb.WorkPriority, - ct time.Time, pos kvflowcontrolpb.RaftLogPosition, tokens kvflowcontrol.Tokens, -) (time.Time, []kvflowcontrol.Stream) { - return h.deductTokensForInner(ctx, pri, ct, pos, tokens) +) []kvflowcontrol.Stream { + return h.deductTokensForInner(ctx, pri, pos, tokens) } diff --git a/pkg/kv/kvserver/kvflowcontrol/kvflowhandle/kvflowhandle_test.go b/pkg/kv/kvserver/kvflowcontrol/kvflowhandle/kvflowhandle_test.go index 72f5d7e05724..a4f1182ac19f 100644 --- a/pkg/kv/kvserver/kvflowcontrol/kvflowhandle/kvflowhandle_test.go +++ b/pkg/kv/kvserver/kvflowcontrol/kvflowhandle/kvflowhandle_test.go @@ -26,7 +26,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/metric" - "github.com/cockroachdb/cockroach/pkg/util/timeutil" "github.com/stretchr/testify/require" ) @@ -80,7 +79,7 @@ func TestHandleAdmit(t *testing.T) { // Connect a single stream at pos=0 and deplete all 16MiB of regular // tokens at pos=1. handle.ConnectStream(ctx, pos(0), stream) - handle.DeductTokensFor(ctx, admissionpb.NormalPri, time.Time{}, pos(1), kvflowcontrol.Tokens(16<<20 /* 16MiB */)) + handle.DeductTokensFor(ctx, admissionpb.NormalPri, pos(1), kvflowcontrol.Tokens(16<<20 /* 16MiB */)) // Invoke .Admit() in a separate goroutine, and test below whether // the goroutine is blocked. @@ -106,67 +105,3 @@ func TestHandleAdmit(t *testing.T) { }) } } - -// TestHandleSequencing tests the sequencing behavior of -// Handle.DeductTokensFor(), namely that we: -// - advance sequencing timestamps when the create-time advances; -// - advance sequencing timestamps when the log position advances. -func TestHandleSequencing(t *testing.T) { - defer leaktest.AfterTest(t)() - defer log.Scope(t).Close(t) - - // tzero represents the t=0, the earliest possible time. All other - // create-time= is relative to this time. - var tzero = timeutil.Unix(0, 0) - - ctx := context.Background() - stream := kvflowcontrol.Stream{ - TenantID: roachpb.MustMakeTenantID(42), - StoreID: roachpb.StoreID(42), - } - pos := func(t, i uint64) kvflowcontrolpb.RaftLogPosition { - return kvflowcontrolpb.RaftLogPosition{Term: t, Index: i} - } - ct := func(d int64) time.Time { - return tzero.Add(time.Nanosecond * time.Duration(d)) - } - - const tokens = kvflowcontrol.Tokens(1 << 20 /* MiB */) - const normal = admissionpb.NormalPri - - registry := metric.NewRegistry() - clock := hlc.NewClockForTesting(nil) - controller := kvflowcontroller.New(registry, cluster.MakeTestingClusterSettings(), clock) - handle := kvflowhandle.New(controller, kvflowhandle.NewMetrics(registry), clock) - - // Test setup: handle is connected to a single stream at pos=1/0 and has - // deducted 1MiB of regular tokens at pos=1 ct=1. - handle.ConnectStream(ctx, pos(1, 0), stream) - seq0 := handle.DeductTokensFor(ctx, normal, ct(1), pos(1, 1), tokens) - - // If create-time advances, so does the sequencing timestamp. - seq1 := handle.DeductTokensFor(ctx, normal, ct(2), pos(1, 1), tokens) - require.Greater(t, seq1, seq0) - - // If stays static, the sequencing timestamp - // still advances. - seq2 := handle.DeductTokensFor(ctx, normal, ct(2), pos(1, 1), tokens) - require.Greater(t, seq2, seq1) - - // If the log index advances, so does the sequencing timestamp. - seq3 := handle.DeductTokensFor(ctx, normal, ct(3), pos(1, 2), tokens) - require.Greater(t, seq3, seq2) - - // If the log term advances, so does the sequencing timestamp. - seq4 := handle.DeductTokensFor(ctx, normal, ct(3), pos(2, 2), tokens) - require.Greater(t, seq4, seq3) - - // If both the create-time and log-position advance, so does the sequencing - // timestamp. - seq5 := handle.DeductTokensFor(ctx, normal, ct(1000), pos(4, 20), tokens) - require.Greater(t, seq5, seq4) - - // Verify that the sequencing timestamp is kept close to the maximum - // observed create-time. - require.LessOrEqual(t, seq5.Sub(ct(1000)), time.Nanosecond) -} diff --git a/pkg/kv/kvserver/kvflowcontrol/kvflowsequencer/BUILD.bazel b/pkg/kv/kvserver/kvflowcontrol/kvflowsequencer/BUILD.bazel deleted file mode 100644 index 4b7b66091b3e..000000000000 --- a/pkg/kv/kvserver/kvflowcontrol/kvflowsequencer/BUILD.bazel +++ /dev/null @@ -1,29 +0,0 @@ -load("//build/bazelutil/unused_checker:unused.bzl", "get_x_data") -load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") - -go_library( - name = "kvflowsequencer", - srcs = ["sequencer.go"], - importpath = "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol/kvflowsequencer", - visibility = ["//visibility:public"], - deps = ["//pkg/util/timeutil"], -) - -go_test( - name = "kvflowsequencer_test", - srcs = ["sequencer_test.go"], - args = ["-test.timeout=295s"], - data = glob(["testdata/**"]), - embed = [":kvflowsequencer"], - deps = [ - "//pkg/kv/kvserver/kvflowcontrol/kvflowcontrolpb", - "//pkg/testutils/datapathutils", - "//pkg/util/leaktest", - "//pkg/util/log", - "//pkg/util/timeutil", - "@com_github_cockroachdb_datadriven//:datadriven", - "@com_github_stretchr_testify//require", - ], -) - -get_x_data(name = "get_x_data") diff --git a/pkg/kv/kvserver/kvflowcontrol/kvflowsimulator/simulation_test.go b/pkg/kv/kvserver/kvflowcontrol/kvflowsimulator/simulation_test.go index 6cdc943cfc32..d898c39ce43a 100644 --- a/pkg/kv/kvserver/kvflowcontrol/kvflowsimulator/simulation_test.go +++ b/pkg/kv/kvserver/kvflowcontrol/kvflowsimulator/simulation_test.go @@ -630,7 +630,7 @@ func (h *replicaHandle) deductTokens( // Increment the quorum log position -- all token deductions are bound to // incrementing log positions. h.quorumLogPosition.Index += 1 - _, streams := h.handle.TestingDeductTokensForInner(ctx, pri, time.Time{}, h.quorumLogPosition, tokens) + streams := h.handle.TestingDeductTokensForInner(ctx, pri, h.quorumLogPosition, tokens) for _, stream := range streams { h.deductionTracker[stream].Track(ctx, pri, tokens, h.quorumLogPosition) } diff --git a/pkg/util/admission/BUILD.bazel b/pkg/util/admission/BUILD.bazel index 3ed968ba72fa..e2dbecf8603b 100644 --- a/pkg/util/admission/BUILD.bazel +++ b/pkg/util/admission/BUILD.bazel @@ -15,6 +15,7 @@ go_library( "kv_slot_adjuster.go", "pacer.go", "scheduler_latency_listener.go", + "sequencer.go", "sql_cpu_overload_indicator.go", "store_token_estimation.go", "testing_knobs.go", @@ -56,6 +57,7 @@ go_test( "io_load_listener_test.go", "replicated_write_admission_test.go", "scheduler_latency_listener_test.go", + "sequencer_test.go", "store_token_estimation_test.go", "tokens_linear_model_test.go", "work_queue_test.go", diff --git a/pkg/kv/kvserver/kvflowcontrol/kvflowsequencer/sequencer.go b/pkg/util/admission/sequencer.go similarity index 52% rename from pkg/kv/kvserver/kvflowcontrol/kvflowsequencer/sequencer.go rename to pkg/util/admission/sequencer.go index 9cc0271f4257..05b389cc9310 100644 --- a/pkg/kv/kvserver/kvflowcontrol/kvflowsequencer/sequencer.go +++ b/pkg/util/admission/sequencer.go @@ -8,15 +8,9 @@ // by the Apache License, Version 2.0, included in the file // licenses/APL.txt. -package kvflowsequencer +package admission -import ( - "time" - - "github.com/cockroachdb/cockroach/pkg/util/timeutil" -) - -// Sequencer issues monotonic sequencing timestamps derived from observed +// sequencer issues monotonic sequencing timestamps derived from observed // CreateTimes. This is a purpose-built data structure for replication admission // control where we want to assign each AC-queued work below-raft a "sequence // number" for FIFO ordering within a . We ensure timestamps @@ -27,9 +21,29 @@ import ( // // It's not safe for concurrent access. // +// ---- +// +// Aside: Why not do this CreateTime-generation above raft? This is because these +// sequence numbers are encoded as part of the raft proposal[3], and at +// encode-time, we don't actually know what log position the proposal is going +// to end up in. It's hard to explicitly guarantee that a proposal with +// log-position P1 will get encoded before another with log position P2, where +// P1 < P2. +// +// If we tried to "approximate" CreateTimes at proposal-encode-time, +// approximating log position order, it could result in over-admission. This is +// because of how we return flow tokens -- up to some log index[4], and how use +// these sequence numbers in below-raft WorkQueues. If P2 ends up with a lower +// sequence number/CreateTime, it would get admitted first, and when returning +// flow tokens by log position, in specifying up-to-P2, we'll early return P1's +// flow tokens despite it not being admitted. So we'd over-admit at the sender. +// This is all within a pair. +// // [1]: See I12 from kvflowcontrol/doc.go. -// [2]: See kvflowhandle.Handle. -type Sequencer struct { +// [2]: See kvadmission.AdmitRaftEntry. +// [3]: In kvflowcontrolpb.RaftAdmissionMeta. +// [4]: See kvflowcontrolpb.AdmittedRaftLogEntries. +type sequencer struct { // maxCreateTime ratchets to the highest observed CreateTime. If sequencing // work with lower CreateTimes, we continue generating monotonic sequence // numbers by incrementing it for every such sequencing attempt. Provided @@ -38,18 +52,12 @@ type Sequencer struct { maxCreateTime int64 } -// New returns a new Sequencer. -func New() *Sequencer { - return &Sequencer{} -} - -// Sequence returns a monotonically increasing timestamps derived from the +// sequence returns a monotonically increasing timestamps derived from the // provided CreateTime. -func (s *Sequencer) Sequence(ct time.Time) time.Time { - createTime := ct.UnixNano() +func (s *sequencer) sequence(createTime int64) int64 { if createTime <= s.maxCreateTime { createTime = s.maxCreateTime + 1 } s.maxCreateTime = createTime - return timeutil.FromUnixNanos(createTime) + return createTime } diff --git a/pkg/kv/kvserver/kvflowcontrol/kvflowsequencer/sequencer_test.go b/pkg/util/admission/sequencer_test.go similarity index 64% rename from pkg/kv/kvserver/kvflowcontrol/kvflowsequencer/sequencer_test.go rename to pkg/util/admission/sequencer_test.go index cc97d1a86047..968c47ae2629 100644 --- a/pkg/kv/kvserver/kvflowcontrol/kvflowsequencer/sequencer_test.go +++ b/pkg/util/admission/sequencer_test.go @@ -8,16 +8,13 @@ // by the Apache License, Version 2.0, included in the file // licenses/APL.txt. -package kvflowsequencer +package admission import ( "fmt" - "strconv" - "strings" "testing" "time" - "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol/kvflowcontrolpb" "github.com/cockroachdb/cockroach/pkg/testutils/datapathutils" "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/log" @@ -30,13 +27,13 @@ func TestSequencer(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) - var sequencer *Sequencer + var seq *sequencer var lastSeqNum int64 datadriven.RunTest(t, datapathutils.TestDataPath(t, "sequencer"), func(t *testing.T, d *datadriven.TestData) string { switch d.Cmd { case "init": - sequencer = New() + seq = &sequencer{} return "" case "sequence": @@ -49,8 +46,8 @@ func TestSequencer(t *testing.T) { // Parse log-position=/. logPosition := parseLogPosition(t, d) - _ = logPosition - sequenceNum := sequencer.Sequence(tzero.Add(dur)).UnixNano() + _ = logPosition // unused + sequenceNum := seq.sequence(tzero.Add(dur).UnixNano()) if lastSeqNum < sequenceNum { movement = " (advanced)" } @@ -67,23 +64,3 @@ func TestSequencer(t *testing.T) { }, ) } - -// tzero represents the t=0, the earliest possible time. All other -// create-time= is relative to this time. -var tzero = timeutil.Unix(0, 0) - -func parseLogPosition(t *testing.T, d *datadriven.TestData) kvflowcontrolpb.RaftLogPosition { - // Parse log-position=/. - var arg string - d.ScanArgs(t, "log-position", &arg) - inner := strings.Split(arg, "/") - require.Len(t, inner, 2) - term, err := strconv.Atoi(inner[0]) - require.NoError(t, err) - index, err := strconv.Atoi(inner[1]) - require.NoError(t, err) - return kvflowcontrolpb.RaftLogPosition{ - Term: uint64(term), - Index: uint64(index), - } -} diff --git a/pkg/util/admission/testdata/replicated_write_admission/high_create_time_low_position_different_range b/pkg/util/admission/testdata/replicated_write_admission/high_create_time_low_position_different_range index d36a0455822e..d7a28c2ff8bc 100644 --- a/pkg/util/admission/testdata/replicated_write_admission/high_create_time_low_position_different_range +++ b/pkg/util/admission/testdata/replicated_write_admission/high_create_time_low_position_different_range @@ -19,7 +19,7 @@ admit tenant=t1 pri=normal-pri create-time=1us size=1B range=r2 origin=n1 log-po # request with the lower create time sorts first despite having the higher log # position. Admission work queues order work based entirely on create-times, # and the assignment of monotonic create-times (WRT log positions) happens only -# within a range and by higher-level components -- kvflowcontrol.Handle. +# within a range by the StoreWorkQueue. print ---- physical-stats: work-count=2 written-bytes=2B ingested-bytes=0B diff --git a/pkg/util/admission/testdata/replicated_write_admission/high_create_time_low_position_same_range b/pkg/util/admission/testdata/replicated_write_admission/high_create_time_low_position_same_range new file mode 100644 index 000000000000..3de81d4eb0b4 --- /dev/null +++ b/pkg/util/admission/testdata/replicated_write_admission/high_create_time_low_position_same_range @@ -0,0 +1,52 @@ +# Verify that we ignore create-time based ordering for replicated write +# admission when writes happen within the same range. + +init +---- +[regular] 0B tokens available +[elastic] 0B tokens available + +# Admit two requests, one created at t=5us but with a lower log position. +admit tenant=t1 pri=normal-pri create-time=5us size=1B range=r1 origin=n1 log-position=4/20 +---- +[regular] try-get=1B available=0B => insufficient tokens + +# And one created at t=1us but but higher log position. +admit tenant=t1 pri=normal-pri create-time=1us size=1B range=r1 origin=n1 log-position=4/21 +---- + +# Observe both waiting requests and physical admission stats. Note how the +# request with the lower log position sorts first despite having the higher +# create-time. The StoreWorkQueue sequences them by (ab)using the create-time +# parameter to get this log position ordering. +print +---- +physical-stats: work-count=2 written-bytes=2B ingested-bytes=0B +[regular work queue]: len(tenant-heap)=1 top-tenant=t1 + tenant=t1 weight=1 fifo-threshold=low-pri used=0B + [0: pri=normal-pri create-time=5µs size=1B range=r1 origin=n1 log-position=4/20] + [1: pri=normal-pri create-time=5.001µs size=1B range=r1 origin=n1 log-position=4/21] +[elastic work queue]: len(tenant-heap)=0 + +# Produce 1B worth of regular tokens. +granter class=regular adjust-tokens=+1B +---- +[regular] 1B tokens available +[elastic] 0B tokens available + +# Grant admission to requests. Since we have 1B worth of tokens, and 2 waiting +# requests wanting 1B each, we're only able to admit one. Verify that it's the +# request with the lower log position despite the higher original create-time. +grant class=regular +---- +admitted [tenant=t1 pri=normal-pri create-time=5µs size=1B range=r1 origin=n1 log-position=4/20] + +print +---- +physical-stats: work-count=2 written-bytes=2B ingested-bytes=0B +[regular work queue]: len(tenant-heap)=1 top-tenant=t1 + tenant=t1 weight=1 fifo-threshold=low-pri used=1B + [0: pri=normal-pri create-time=5.001µs size=1B range=r1 origin=n1 log-position=4/21] +[elastic work queue]: len(tenant-heap)=0 + +# vim:ft=sh diff --git a/pkg/util/admission/testdata/replicated_write_admission/tenant_fairness b/pkg/util/admission/testdata/replicated_write_admission/tenant_fairness index c00240b461f7..3f0d04dbd18a 100644 --- a/pkg/util/admission/testdata/replicated_write_admission/tenant_fairness +++ b/pkg/util/admission/testdata/replicated_write_admission/tenant_fairness @@ -7,8 +7,7 @@ init # For two tenants t1 and t2, try to admit two requests of 1B each at # incrementing log positions. We specify create-times in log-position order for -# work within a given range, similar to what we do at the issuing client -# (kvflowcontrol.Handle). +# work within a given range, similar to what we do at the StoreWorkQueue level. admit tenant=t1 pri=normal-pri create-time=1.001us size=1B range=r1 origin=n1 log-position=4/20 ---- [regular] try-get=1B available=0B => insufficient tokens diff --git a/pkg/kv/kvserver/kvflowcontrol/kvflowsequencer/testdata/sequencer b/pkg/util/admission/testdata/sequencer similarity index 91% rename from pkg/kv/kvserver/kvflowcontrol/kvflowsequencer/testdata/sequencer rename to pkg/util/admission/testdata/sequencer index ea335538f940..97716928fec2 100644 --- a/pkg/kv/kvserver/kvflowcontrol/kvflowsequencer/testdata/sequencer +++ b/pkg/util/admission/testdata/sequencer @@ -1,7 +1,8 @@ -# Walk through the basics of how the per-handle sequencer works. The -# log-position= parameter is not actually used in the implementation, but in -# typical usage we'd be sequencing work in log position order, and it's -# instructive to understand how sequencing timestamps are generated. +# Walk through the basics of how the below-raft replicated work sequencer +# works. The log-position= parameter is not actually used in the +# implementation, but in typical usage we'd be sequencing work in log position +# order, and it's instructive to understand how sequencing timestamps are +# generated. # # ----------------------------------------------------------------------------- # 1. Observe how the sequence numbers change with changing log positions (and diff --git a/pkg/util/admission/work_queue.go b/pkg/util/admission/work_queue.go index 8a97cfa900ca..1284f8bb7b1e 100644 --- a/pkg/util/admission/work_queue.go +++ b/pkg/util/admission/work_queue.go @@ -156,6 +156,14 @@ var epochLIFOQueueDelayThresholdToSwitchToLIFO = settings.RegisterDurationSettin return nil }).WithPublic() +var rangeSequencerGCThreshold = settings.RegisterDurationSetting( + settings.TenantWritable, + "admission.replication_control.range_sequencer_gc_threshold", + "the inactive duration for a range sequencer after it's garbage collected", + 5*time.Minute, + settings.NonNegativeDuration, +) + // WorkInfo provides information that is used to order work within an WorkQueue. // The WorkKind is not included as a field since an WorkQueue deals with a // single WorkKind. @@ -1773,9 +1781,13 @@ type StoreWorkQueue struct { // and observed L0 growth (which factors in state machine application). stats storeAdmissionStats } - stopCh chan struct{} - timeSource timeutil.TimeSource - settings *cluster.Settings + sequencersMu struct { + syncutil.Mutex + s map[roachpb.RangeID]*sequencer // cleaned up periodically + } + stopCh chan struct{} + timeSource timeutil.TimeSource + settings *cluster.Settings knobs *TestingKnobs } @@ -1824,6 +1836,9 @@ func (q *StoreWorkQueue) Admit( info.RequestedCount = q.mu.estimates.writeTokens q.mu.RUnlock() } + if info.ReplicatedWorkInfo.Enabled { + info.CreateTime = q.sequenceReplicatedWork(info.CreateTime, info.ReplicatedWorkInfo) + } enabled, err := q.q[wc].Admit(ctx, info.WorkInfo) if err != nil { @@ -2042,5 +2057,43 @@ func makeStoreWorkQueue( q.mu.estimates = storeRequestEstimates{ writeTokens: 1, } + + q.sequencersMu.s = make(map[roachpb.RangeID]*sequencer) + go func() { + ticker := time.NewTicker(30 * time.Second) + for { + select { + case <-ticker.C: + q.gcSequencers() + case <-q.stopCh: + return + } + } + }() return q } + +func (q *StoreWorkQueue) gcSequencers() { + q.sequencersMu.Lock() + defer q.sequencersMu.Unlock() + + for rangeID, seq := range q.sequencersMu.s { + maxCreateTime := timeutil.FromUnixNanos(seq.maxCreateTime) + if q.timeSource.Now().Sub(maxCreateTime) > rangeSequencerGCThreshold.Get(&q.settings.SV) { + delete(q.sequencersMu.s, rangeID) + } + } +} + +func (q *StoreWorkQueue) sequenceReplicatedWork(createTime int64, info ReplicatedWorkInfo) int64 { + q.sequencersMu.Lock() + seq, ok := q.sequencersMu.s[info.RangeID] + if !ok { + seq = &sequencer{} + q.sequencersMu.s[info.RangeID] = seq + } + q.sequencersMu.Unlock() + // We're assuming sequenceReplicatedWork is never invoked concurrently for a + // given RangeID. + return seq.sequence(createTime) +}