Skip to content

Commit

Permalink
admission: move CreateTime-sequencing below-raft
Browse files Browse the repository at this point in the history
We move kvflowsequencer.Sequencer and its use in kvflowhandle.Handle
(above-raft) to admission.sequencer, now used by
admission.StoreWorkQueue (below-raft). This variant appeared in an
earlier revision of cockroachdb#97599 where we first introduced monotonically
increasing CreateTimes for a given raft group.

In a subsequent commit, when integrating kvflowcontrol into the critical
path for replication traffic, we'll observe that it's quite difficult to
create sequencing CreateTimes[^1] above raft. This is because these sequence
numbers are encoded as part of the raft proposal[^2], and at
encode-time, we don't actually know what log position the proposal is
going to end up in. It's hard to explicitly guarantee that a proposal
with log-position P1 will get encoded before another with log position
P2, where P1 < P2.

Naively sequencing CreateTimes at proposal-encode-time could result in
over-admission. This is because of how we return flow tokens -- up to
some log index[^3], and how use these sequence numbers in below-raft
WorkQueues. If P2 ends up with a lower sequence number/CreateTime, it
would get admitted first, and when returning flow tokens by log
position, in specifying up-to-P2, we'll early return P1's flow tokens
despite it not being admitted. So we'd over-admit at the sender. This
is all within a <tenant,priority> pair.

[^1]: We use CreateTimes as "sequence numbers" in replication admission
      control. We want to assign each AC-queued work below-raft a
      "sequence number" for FIFO ordering within a <tenant,priority>.
      We ensure these timestamps are roughly monotonic with respect to
      log positions of replicated work by sequencing work in log
      position order.
[^2]: In kvflowcontrolpb.RaftAdmissionMeta.
[^3]: See kvflowcontrolpb.AdmittedRaftLogEntries.

Release note: None
  • Loading branch information
irfansharif committed Apr 28, 2023
1 parent 0529a97 commit e59abeb
Show file tree
Hide file tree
Showing 17 changed files with 146 additions and 182 deletions.
4 changes: 0 additions & 4 deletions pkg/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,6 @@ ALL_TESTS = [
"//pkg/kv/kvserver/kvflowcontrol/kvflowcontroller:kvflowcontroller_test",
"//pkg/kv/kvserver/kvflowcontrol/kvflowdispatch:kvflowdispatch_test",
"//pkg/kv/kvserver/kvflowcontrol/kvflowhandle:kvflowhandle_test",
"//pkg/kv/kvserver/kvflowcontrol/kvflowsequencer:kvflowsequencer_test",
"//pkg/kv/kvserver/kvflowcontrol/kvflowsimulator:kvflowsimulator_test",
"//pkg/kv/kvserver/kvflowcontrol/kvflowtokentracker:kvflowtokentracker_test",
"//pkg/kv/kvserver/kvstorage:kvstorage_test",
Expand Down Expand Up @@ -1303,8 +1302,6 @@ GO_TARGETS = [
"//pkg/kv/kvserver/kvflowcontrol/kvflowdispatch:kvflowdispatch_test",
"//pkg/kv/kvserver/kvflowcontrol/kvflowhandle:kvflowhandle",
"//pkg/kv/kvserver/kvflowcontrol/kvflowhandle:kvflowhandle_test",
"//pkg/kv/kvserver/kvflowcontrol/kvflowsequencer:kvflowsequencer",
"//pkg/kv/kvserver/kvflowcontrol/kvflowsequencer:kvflowsequencer_test",
"//pkg/kv/kvserver/kvflowcontrol/kvflowsimulator:kvflowsimulator_test",
"//pkg/kv/kvserver/kvflowcontrol/kvflowtokentracker:kvflowtokentracker",
"//pkg/kv/kvserver/kvflowcontrol/kvflowtokentracker:kvflowtokentracker_test",
Expand Down Expand Up @@ -2769,7 +2766,6 @@ GET_X_DATA_TARGETS = [
"//pkg/kv/kvserver/kvflowcontrol/kvflowcontrolpb:get_x_data",
"//pkg/kv/kvserver/kvflowcontrol/kvflowdispatch:get_x_data",
"//pkg/kv/kvserver/kvflowcontrol/kvflowhandle:get_x_data",
"//pkg/kv/kvserver/kvflowcontrol/kvflowsequencer:get_x_data",
"//pkg/kv/kvserver/kvflowcontrol/kvflowsimulator:get_x_data",
"//pkg/kv/kvserver/kvflowcontrol/kvflowtokentracker:get_x_data",
"//pkg/kv/kvserver/kvserverbase:get_x_data",
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/kvflowcontrol/doc.go
Original file line number Diff line number Diff line change
Expand Up @@ -483,7 +483,7 @@ package kvflowcontrol
// still queued after ~100ms, will trigger epoch-LIFO everywhere.
// [^11]: See the implementation for kvflowcontrol.Dispatch.
// [^12]: See UpToRaftLogPosition in AdmittedRaftLogEntries.
// [^13]: See kvflowsequencer.Sequencer and its use in kvflowhandle.Handle.
// [^13]: See admission.sequencer and its use in admission.StoreWorkQueue.
// [^14]: See the high_create_time_low_position_different_range test case for
// TestReplicatedWriteAdmission.
//
Expand Down
8 changes: 3 additions & 5 deletions pkg/kv/kvserver/kvflowcontrol/kvflowcontrol.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,13 +98,11 @@ type Handle interface {
// work with given priority along connected streams. The deduction is
// tracked with respect to the specific raft log position it's expecting it
// to end up in, log positions that monotonically increase. Requests are
// assumed to have been Admit()-ed first. The returned time.Time parameter
// is to be used as the work item's CreateTime when enqueueing in IO
// admission queues.
// assumed to have been Admit()-ed first.
DeductTokensFor(
context.Context, admissionpb.WorkPriority, time.Time,
context.Context, admissionpb.WorkPriority,
kvflowcontrolpb.RaftLogPosition, Tokens,
) time.Time
)
// ReturnTokensUpto returns all previously deducted tokens of a given
// priority for all log positions less than or equal to the one specified.
// It does for the specific stream. Once returned, subsequent attempts to
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -183,8 +183,9 @@ func (c *Controller) Admit(
}
}

// TODO(irfansharif): Use the create time for ordering among waiting
// requests. Integrate it with epoch-LIFO.
// TODO(irfansharif): Use CreateTime for ordering among waiting
// requests, integrate it with epoch-LIFO. See I12 from
// kvflowcontrol/doc.go.
}

// DeductTokens is part of the kvflowcontrol.Controller interface.
Expand Down
2 changes: 0 additions & 2 deletions pkg/kv/kvserver/kvflowcontrol/kvflowhandle/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ go_library(
"//pkg/base",
"//pkg/kv/kvserver/kvflowcontrol",
"//pkg/kv/kvserver/kvflowcontrol/kvflowcontrolpb",
"//pkg/kv/kvserver/kvflowcontrol/kvflowsequencer",
"//pkg/kv/kvserver/kvflowcontrol/kvflowtokentracker",
"//pkg/util/admission/admissionpb",
"//pkg/util/hlc",
Expand All @@ -40,7 +39,6 @@ go_test(
"//pkg/util/leaktest",
"//pkg/util/log",
"//pkg/util/metric",
"//pkg/util/timeutil",
"@com_github_stretchr_testify//require",
],
)
Expand Down
23 changes: 8 additions & 15 deletions pkg/kv/kvserver/kvflowcontrol/kvflowhandle/kvflowhandle.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ import (

"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol/kvflowcontrolpb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol/kvflowsequencer"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol/kvflowtokentracker"
"github.com/cockroachdb/cockroach/pkg/util/admission/admissionpb"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
Expand All @@ -41,7 +40,6 @@ type Handle struct {
// (identified by their log positions) have been admitted below-raft,
// streams disconnect, or the handle closed entirely.
perStreamTokenTracker map[kvflowcontrol.Stream]*kvflowtokentracker.Tracker
sequencer *kvflowsequencer.Sequencer
closed bool
}
}
Expand All @@ -54,7 +52,6 @@ func New(controller kvflowcontrol.Controller, metrics *Metrics, clock *hlc.Clock
clock: clock,
}
h.mu.perStreamTokenTracker = map[kvflowcontrol.Stream]*kvflowtokentracker.Tracker{}
h.mu.sequencer = kvflowsequencer.New()
return h
}

Expand Down Expand Up @@ -104,39 +101,36 @@ func (h *Handle) Admit(ctx context.Context, pri admissionpb.WorkPriority, ct tim
func (h *Handle) DeductTokensFor(
ctx context.Context,
pri admissionpb.WorkPriority,
ct time.Time,
pos kvflowcontrolpb.RaftLogPosition,
tokens kvflowcontrol.Tokens,
) time.Time {
) {
if h == nil {
// TODO(irfansharif): See TODO around nil receiver check in Admit().
return ct
return
}

ct, _ = h.deductTokensForInner(ctx, pri, ct, pos, tokens)
return ct
_ = h.deductTokensForInner(ctx, pri, pos, tokens)
}

func (h *Handle) deductTokensForInner(
ctx context.Context,
pri admissionpb.WorkPriority,
ct time.Time,
pos kvflowcontrolpb.RaftLogPosition,
tokens kvflowcontrol.Tokens,
) (sequence time.Time, streams []kvflowcontrol.Stream) {
) (streams []kvflowcontrol.Stream) {
h.mu.Lock()
defer h.mu.Unlock()
if h.mu.closed {
log.Errorf(ctx, "operating on a closed handle")
return ct, nil // unused return value in production code
return nil // unused return value in production code
}

for _, c := range h.mu.connections {
h.controller.DeductTokens(ctx, pri, tokens, c.Stream())
h.mu.perStreamTokenTracker[c.Stream()].Track(ctx, pri, tokens, pos)
streams = append(streams, c.Stream())
}
return h.mu.sequencer.Sequence(ct), streams
return streams
}

// ReturnTokensUpto is part of the kvflowcontrol.Handle interface.
Expand Down Expand Up @@ -322,9 +316,8 @@ func (h *Handle) TestingNonBlockingAdmit(
func (h *Handle) TestingDeductTokensForInner(
ctx context.Context,
pri admissionpb.WorkPriority,
ct time.Time,
pos kvflowcontrolpb.RaftLogPosition,
tokens kvflowcontrol.Tokens,
) (time.Time, []kvflowcontrol.Stream) {
return h.deductTokensForInner(ctx, pri, ct, pos, tokens)
) []kvflowcontrol.Stream {
return h.deductTokensForInner(ctx, pri, pos, tokens)
}
67 changes: 1 addition & 66 deletions pkg/kv/kvserver/kvflowcontrol/kvflowhandle/kvflowhandle_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/metric"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/stretchr/testify/require"
)

Expand Down Expand Up @@ -80,7 +79,7 @@ func TestHandleAdmit(t *testing.T) {
// Connect a single stream at pos=0 and deplete all 16MiB of regular
// tokens at pos=1.
handle.ConnectStream(ctx, pos(0), stream)
handle.DeductTokensFor(ctx, admissionpb.NormalPri, time.Time{}, pos(1), kvflowcontrol.Tokens(16<<20 /* 16MiB */))
handle.DeductTokensFor(ctx, admissionpb.NormalPri, pos(1), kvflowcontrol.Tokens(16<<20 /* 16MiB */))

// Invoke .Admit() in a separate goroutine, and test below whether
// the goroutine is blocked.
Expand All @@ -106,67 +105,3 @@ func TestHandleAdmit(t *testing.T) {
})
}
}

// TestHandleSequencing tests the sequencing behavior of
// Handle.DeductTokensFor(), namely that we:
// - advance sequencing timestamps when the create-time advances;
// - advance sequencing timestamps when the log position advances.
func TestHandleSequencing(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

// tzero represents the t=0, the earliest possible time. All other
// create-time=<duration> is relative to this time.
var tzero = timeutil.Unix(0, 0)

ctx := context.Background()
stream := kvflowcontrol.Stream{
TenantID: roachpb.MustMakeTenantID(42),
StoreID: roachpb.StoreID(42),
}
pos := func(t, i uint64) kvflowcontrolpb.RaftLogPosition {
return kvflowcontrolpb.RaftLogPosition{Term: t, Index: i}
}
ct := func(d int64) time.Time {
return tzero.Add(time.Nanosecond * time.Duration(d))
}

const tokens = kvflowcontrol.Tokens(1 << 20 /* MiB */)
const normal = admissionpb.NormalPri

registry := metric.NewRegistry()
clock := hlc.NewClockForTesting(nil)
controller := kvflowcontroller.New(registry, cluster.MakeTestingClusterSettings(), clock)
handle := kvflowhandle.New(controller, kvflowhandle.NewMetrics(registry), clock)

// Test setup: handle is connected to a single stream at pos=1/0 and has
// deducted 1MiB of regular tokens at pos=1 ct=1.
handle.ConnectStream(ctx, pos(1, 0), stream)
seq0 := handle.DeductTokensFor(ctx, normal, ct(1), pos(1, 1), tokens)

// If create-time advances, so does the sequencing timestamp.
seq1 := handle.DeductTokensFor(ctx, normal, ct(2), pos(1, 1), tokens)
require.Greater(t, seq1, seq0)

// If <create-time,log-position> stays static, the sequencing timestamp
// still advances.
seq2 := handle.DeductTokensFor(ctx, normal, ct(2), pos(1, 1), tokens)
require.Greater(t, seq2, seq1)

// If the log index advances, so does the sequencing timestamp.
seq3 := handle.DeductTokensFor(ctx, normal, ct(3), pos(1, 2), tokens)
require.Greater(t, seq3, seq2)

// If the log term advances, so does the sequencing timestamp.
seq4 := handle.DeductTokensFor(ctx, normal, ct(3), pos(2, 2), tokens)
require.Greater(t, seq4, seq3)

// If both the create-time and log-position advance, so does the sequencing
// timestamp.
seq5 := handle.DeductTokensFor(ctx, normal, ct(1000), pos(4, 20), tokens)
require.Greater(t, seq5, seq4)

// Verify that the sequencing timestamp is kept close to the maximum
// observed create-time.
require.LessOrEqual(t, seq5.Sub(ct(1000)), time.Nanosecond)
}
29 changes: 0 additions & 29 deletions pkg/kv/kvserver/kvflowcontrol/kvflowsequencer/BUILD.bazel

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -630,7 +630,7 @@ func (h *replicaHandle) deductTokens(
// Increment the quorum log position -- all token deductions are bound to
// incrementing log positions.
h.quorumLogPosition.Index += 1
_, streams := h.handle.TestingDeductTokensForInner(ctx, pri, time.Time{}, h.quorumLogPosition, tokens)
streams := h.handle.TestingDeductTokensForInner(ctx, pri, h.quorumLogPosition, tokens)
for _, stream := range streams {
h.deductionTracker[stream].Track(ctx, pri, tokens, h.quorumLogPosition)
}
Expand Down
2 changes: 2 additions & 0 deletions pkg/util/admission/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ go_library(
"kv_slot_adjuster.go",
"pacer.go",
"scheduler_latency_listener.go",
"sequencer.go",
"sql_cpu_overload_indicator.go",
"store_token_estimation.go",
"testing_knobs.go",
Expand Down Expand Up @@ -56,6 +57,7 @@ go_test(
"io_load_listener_test.go",
"replicated_write_admission_test.go",
"scheduler_latency_listener_test.go",
"sequencer_test.go",
"store_token_estimation_test.go",
"tokens_linear_model_test.go",
"work_queue_test.go",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,9 @@
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package kvflowsequencer
package admission

import (
"time"

"github.com/cockroachdb/cockroach/pkg/util/timeutil"
)

// Sequencer issues monotonic sequencing timestamps derived from observed
// sequencer issues monotonic sequencing timestamps derived from observed
// CreateTimes. This is a purpose-built data structure for replication admission
// control where we want to assign each AC-queued work below-raft a "sequence
// number" for FIFO ordering within a <tenant,priority>. We ensure timestamps
Expand All @@ -28,8 +22,8 @@ import (
// It's not safe for concurrent access.
//
// [1]: See I12 from kvflowcontrol/doc.go.
// [2]: See kvflowhandle.Handle.
type Sequencer struct {
// [2]: See kvadmission.AdmitRaftEntry.
type sequencer struct {
// maxCreateTime ratchets to the highest observed CreateTime. If sequencing
// work with lower CreateTimes, we continue generating monotonic sequence
// numbers by incrementing it for every such sequencing attempt. Provided
Expand All @@ -38,18 +32,12 @@ type Sequencer struct {
maxCreateTime int64
}

// New returns a new Sequencer.
func New() *Sequencer {
return &Sequencer{}
}

// Sequence returns a monotonically increasing timestamps derived from the
// sequence returns a monotonically increasing timestamps derived from the
// provided CreateTime.
func (s *Sequencer) Sequence(ct time.Time) time.Time {
createTime := ct.UnixNano()
func (s *sequencer) sequence(createTime int64) int64 {
if createTime <= s.maxCreateTime {
createTime = s.maxCreateTime + 1
}
s.maxCreateTime = createTime
return timeutil.FromUnixNanos(createTime)
return createTime
}
Loading

0 comments on commit e59abeb

Please sign in to comment.