Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

admission: move CreateTime-sequencing below-raft #102819

Merged
merged 3 commits into from
May 11, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 0 additions & 4 deletions pkg/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,6 @@ ALL_TESTS = [
"//pkg/kv/kvserver/kvflowcontrol/kvflowcontroller:kvflowcontroller_test",
"//pkg/kv/kvserver/kvflowcontrol/kvflowdispatch:kvflowdispatch_test",
"//pkg/kv/kvserver/kvflowcontrol/kvflowhandle:kvflowhandle_test",
"//pkg/kv/kvserver/kvflowcontrol/kvflowsequencer:kvflowsequencer_test",
"//pkg/kv/kvserver/kvflowcontrol/kvflowsimulator:kvflowsimulator_test",
"//pkg/kv/kvserver/kvflowcontrol/kvflowtokentracker:kvflowtokentracker_test",
"//pkg/kv/kvserver/kvstorage:kvstorage_test",
Expand Down Expand Up @@ -1303,8 +1302,6 @@ GO_TARGETS = [
"//pkg/kv/kvserver/kvflowcontrol/kvflowdispatch:kvflowdispatch_test",
"//pkg/kv/kvserver/kvflowcontrol/kvflowhandle:kvflowhandle",
"//pkg/kv/kvserver/kvflowcontrol/kvflowhandle:kvflowhandle_test",
"//pkg/kv/kvserver/kvflowcontrol/kvflowsequencer:kvflowsequencer",
"//pkg/kv/kvserver/kvflowcontrol/kvflowsequencer:kvflowsequencer_test",
"//pkg/kv/kvserver/kvflowcontrol/kvflowsimulator:kvflowsimulator_test",
"//pkg/kv/kvserver/kvflowcontrol/kvflowtokentracker:kvflowtokentracker",
"//pkg/kv/kvserver/kvflowcontrol/kvflowtokentracker:kvflowtokentracker_test",
Expand Down Expand Up @@ -2770,7 +2767,6 @@ GET_X_DATA_TARGETS = [
"//pkg/kv/kvserver/kvflowcontrol/kvflowcontrolpb:get_x_data",
"//pkg/kv/kvserver/kvflowcontrol/kvflowdispatch:get_x_data",
"//pkg/kv/kvserver/kvflowcontrol/kvflowhandle:get_x_data",
"//pkg/kv/kvserver/kvflowcontrol/kvflowsequencer:get_x_data",
"//pkg/kv/kvserver/kvflowcontrol/kvflowsimulator:get_x_data",
"//pkg/kv/kvserver/kvflowcontrol/kvflowtokentracker:get_x_data",
"//pkg/kv/kvserver/kvserverbase:get_x_data",
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/kvflowcontrol/doc.go
Original file line number Diff line number Diff line change
Expand Up @@ -483,7 +483,7 @@ package kvflowcontrol
// still queued after ~100ms, will trigger epoch-LIFO everywhere.
// [^11]: See the implementation for kvflowcontrol.Dispatch.
// [^12]: See UpToRaftLogPosition in AdmittedRaftLogEntries.
// [^13]: See kvflowsequencer.Sequencer and its use in kvflowhandle.Handle.
// [^13]: See admission.sequencer and its use in admission.StoreWorkQueue.
// [^14]: See the high_create_time_low_position_different_range test case for
// TestReplicatedWriteAdmission.
//
Expand Down
8 changes: 3 additions & 5 deletions pkg/kv/kvserver/kvflowcontrol/kvflowcontrol.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,13 +98,11 @@ type Handle interface {
// work with given priority along connected streams. The deduction is
// tracked with respect to the specific raft log position it's expecting it
// to end up in, log positions that monotonically increase. Requests are
// assumed to have been Admit()-ed first. The returned time.Time parameter
// is to be used as the work item's CreateTime when enqueueing in IO
// admission queues.
// assumed to have been Admit()-ed first.
DeductTokensFor(
context.Context, admissionpb.WorkPriority, time.Time,
context.Context, admissionpb.WorkPriority,
kvflowcontrolpb.RaftLogPosition, Tokens,
) time.Time
)
// ReturnTokensUpto returns all previously deducted tokens of a given
// priority for all log positions less than or equal to the one specified.
// It does for the specific stream. Once returned, subsequent attempts to
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -183,8 +183,9 @@ func (c *Controller) Admit(
}
}

// TODO(irfansharif): Use the create time for ordering among waiting
// requests. Integrate it with epoch-LIFO.
// TODO(irfansharif): Use CreateTime for ordering among waiting
// requests, integrate it with epoch-LIFO. See I12 from
// kvflowcontrol/doc.go.
}

// DeductTokens is part of the kvflowcontrol.Controller interface.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,6 @@ func TestDispatch(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

reverseWorkPriorityDict := make(map[string]admissionpb.WorkPriority)
for k, v := range admissionpb.WorkPriorityDict {
reverseWorkPriorityDict[v] = k
}

datadriven.Walk(t, datapathutils.TestDataPath(t), func(t *testing.T, path string) {
var dispatch *Dispatch
datadriven.RunTest(t, path, func(t *testing.T, d *datadriven.TestData) string {
Expand Down Expand Up @@ -82,7 +77,7 @@ func TestDispatch(t *testing.T) {

case strings.HasPrefix(parts[i], "pri="):
// Parse pri=<string>.
pri, found := reverseWorkPriorityDict[arg]
pri, found := admissionpb.TestingReverseWorkPriorityDict[arg]
require.True(t, found)
entries.AdmissionPriority = int32(pri)

Expand Down
2 changes: 0 additions & 2 deletions pkg/kv/kvserver/kvflowcontrol/kvflowhandle/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ go_library(
"//pkg/base",
"//pkg/kv/kvserver/kvflowcontrol",
"//pkg/kv/kvserver/kvflowcontrol/kvflowcontrolpb",
"//pkg/kv/kvserver/kvflowcontrol/kvflowsequencer",
"//pkg/kv/kvserver/kvflowcontrol/kvflowtokentracker",
"//pkg/util/admission/admissionpb",
"//pkg/util/hlc",
Expand All @@ -40,7 +39,6 @@ go_test(
"//pkg/util/leaktest",
"//pkg/util/log",
"//pkg/util/metric",
"//pkg/util/timeutil",
"@com_github_stretchr_testify//require",
],
)
Expand Down
23 changes: 8 additions & 15 deletions pkg/kv/kvserver/kvflowcontrol/kvflowhandle/kvflowhandle.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ import (

"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol/kvflowcontrolpb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol/kvflowsequencer"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol/kvflowtokentracker"
"github.com/cockroachdb/cockroach/pkg/util/admission/admissionpb"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
Expand All @@ -41,7 +40,6 @@ type Handle struct {
// (identified by their log positions) have been admitted below-raft,
// streams disconnect, or the handle closed entirely.
perStreamTokenTracker map[kvflowcontrol.Stream]*kvflowtokentracker.Tracker
sequencer *kvflowsequencer.Sequencer
closed bool
}
}
Expand All @@ -54,7 +52,6 @@ func New(controller kvflowcontrol.Controller, metrics *Metrics, clock *hlc.Clock
clock: clock,
}
h.mu.perStreamTokenTracker = map[kvflowcontrol.Stream]*kvflowtokentracker.Tracker{}
h.mu.sequencer = kvflowsequencer.New()
return h
}

Expand Down Expand Up @@ -104,39 +101,36 @@ func (h *Handle) Admit(ctx context.Context, pri admissionpb.WorkPriority, ct tim
func (h *Handle) DeductTokensFor(
ctx context.Context,
pri admissionpb.WorkPriority,
ct time.Time,
pos kvflowcontrolpb.RaftLogPosition,
tokens kvflowcontrol.Tokens,
) time.Time {
) {
if h == nil {
// TODO(irfansharif): See TODO around nil receiver check in Admit().
return ct
return
}

ct, _ = h.deductTokensForInner(ctx, pri, ct, pos, tokens)
return ct
_ = h.deductTokensForInner(ctx, pri, pos, tokens)
}

func (h *Handle) deductTokensForInner(
ctx context.Context,
pri admissionpb.WorkPriority,
ct time.Time,
pos kvflowcontrolpb.RaftLogPosition,
tokens kvflowcontrol.Tokens,
) (sequence time.Time, streams []kvflowcontrol.Stream) {
) (streams []kvflowcontrol.Stream) {
h.mu.Lock()
defer h.mu.Unlock()
if h.mu.closed {
log.Errorf(ctx, "operating on a closed handle")
return ct, nil // unused return value in production code
return nil // unused return value in production code
}

for _, c := range h.mu.connections {
h.controller.DeductTokens(ctx, pri, tokens, c.Stream())
h.mu.perStreamTokenTracker[c.Stream()].Track(ctx, pri, tokens, pos)
streams = append(streams, c.Stream())
}
return h.mu.sequencer.Sequence(ct), streams
return streams
}

// ReturnTokensUpto is part of the kvflowcontrol.Handle interface.
Expand Down Expand Up @@ -322,9 +316,8 @@ func (h *Handle) TestingNonBlockingAdmit(
func (h *Handle) TestingDeductTokensForInner(
ctx context.Context,
pri admissionpb.WorkPriority,
ct time.Time,
pos kvflowcontrolpb.RaftLogPosition,
tokens kvflowcontrol.Tokens,
) (time.Time, []kvflowcontrol.Stream) {
return h.deductTokensForInner(ctx, pri, ct, pos, tokens)
) []kvflowcontrol.Stream {
return h.deductTokensForInner(ctx, pri, pos, tokens)
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/metric"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/stretchr/testify/require"
)

Expand Down Expand Up @@ -80,7 +79,7 @@ func TestHandleAdmit(t *testing.T) {
// Connect a single stream at pos=0 and deplete all 16MiB of regular
// tokens at pos=1.
handle.ConnectStream(ctx, pos(0), stream)
handle.DeductTokensFor(ctx, admissionpb.NormalPri, time.Time{}, pos(1), kvflowcontrol.Tokens(16<<20 /* 16MiB */))
handle.DeductTokensFor(ctx, admissionpb.NormalPri, pos(1), kvflowcontrol.Tokens(16<<20 /* 16MiB */))

// Invoke .Admit() in a separate goroutine, and test below whether
// the goroutine is blocked.
Expand All @@ -106,67 +105,3 @@ func TestHandleAdmit(t *testing.T) {
})
}
}

// TestHandleSequencing tests the sequencing behavior of
// Handle.DeductTokensFor(), namely that we:
// - advance sequencing timestamps when the create-time advances;
// - advance sequencing timestamps when the log position advances.
func TestHandleSequencing(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

// tzero represents the t=0, the earliest possible time. All other
// create-time=<duration> is relative to this time.
var tzero = timeutil.Unix(0, 0)

ctx := context.Background()
stream := kvflowcontrol.Stream{
TenantID: roachpb.MustMakeTenantID(42),
StoreID: roachpb.StoreID(42),
}
pos := func(t, i uint64) kvflowcontrolpb.RaftLogPosition {
return kvflowcontrolpb.RaftLogPosition{Term: t, Index: i}
}
ct := func(d int64) time.Time {
return tzero.Add(time.Nanosecond * time.Duration(d))
}

const tokens = kvflowcontrol.Tokens(1 << 20 /* MiB */)
const normal = admissionpb.NormalPri

registry := metric.NewRegistry()
clock := hlc.NewClockForTesting(nil)
controller := kvflowcontroller.New(registry, cluster.MakeTestingClusterSettings(), clock)
handle := kvflowhandle.New(controller, kvflowhandle.NewMetrics(registry), clock)

// Test setup: handle is connected to a single stream at pos=1/0 and has
// deducted 1MiB of regular tokens at pos=1 ct=1.
handle.ConnectStream(ctx, pos(1, 0), stream)
seq0 := handle.DeductTokensFor(ctx, normal, ct(1), pos(1, 1), tokens)

// If create-time advances, so does the sequencing timestamp.
seq1 := handle.DeductTokensFor(ctx, normal, ct(2), pos(1, 1), tokens)
require.Greater(t, seq1, seq0)

// If <create-time,log-position> stays static, the sequencing timestamp
// still advances.
seq2 := handle.DeductTokensFor(ctx, normal, ct(2), pos(1, 1), tokens)
require.Greater(t, seq2, seq1)

// If the log index advances, so does the sequencing timestamp.
seq3 := handle.DeductTokensFor(ctx, normal, ct(3), pos(1, 2), tokens)
require.Greater(t, seq3, seq2)

// If the log term advances, so does the sequencing timestamp.
seq4 := handle.DeductTokensFor(ctx, normal, ct(3), pos(2, 2), tokens)
require.Greater(t, seq4, seq3)

// If both the create-time and log-position advance, so does the sequencing
// timestamp.
seq5 := handle.DeductTokensFor(ctx, normal, ct(1000), pos(4, 20), tokens)
require.Greater(t, seq5, seq4)

// Verify that the sequencing timestamp is kept close to the maximum
// observed create-time.
require.LessOrEqual(t, seq5.Sub(ct(1000)), time.Nanosecond)
}
29 changes: 0 additions & 29 deletions pkg/kv/kvserver/kvflowcontrol/kvflowsequencer/BUILD.bazel

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -630,7 +630,7 @@ func (h *replicaHandle) deductTokens(
// Increment the quorum log position -- all token deductions are bound to
// incrementing log positions.
h.quorumLogPosition.Index += 1
_, streams := h.handle.TestingDeductTokensForInner(ctx, pri, time.Time{}, h.quorumLogPosition, tokens)
streams := h.handle.TestingDeductTokensForInner(ctx, pri, h.quorumLogPosition, tokens)
for _, stream := range streams {
h.deductionTracker[stream].Track(ctx, pri, tokens, h.quorumLogPosition)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,19 +32,10 @@ func TestTracker(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

reverseWorkPriorityDict := make(map[string]admissionpb.WorkPriority)
for k, v := range admissionpb.WorkPriorityDict {
reverseWorkPriorityDict[v] = k
}

ctx := context.Background()
datadriven.Walk(t, datapathutils.TestDataPath(t), func(t *testing.T, path string) {
var tracker *Tracker
knobs := &kvflowcontrol.TestingKnobs{
UntrackTokensInterceptor: func(tokens kvflowcontrol.Tokens, pos kvflowcontrolpb.RaftLogPosition) {

},
}
knobs := &kvflowcontrol.TestingKnobs{}
datadriven.RunTest(t, path, func(t *testing.T, d *datadriven.TestData) string {
switch d.Cmd {
case "init":
Expand Down Expand Up @@ -73,7 +64,7 @@ func TestTracker(t *testing.T) {
switch {
case strings.HasPrefix(parts[i], "pri="):
var found bool
pri, found = reverseWorkPriorityDict[arg]
pri, found = admissionpb.TestingReverseWorkPriorityDict[arg]
require.True(t, found)

case strings.HasPrefix(parts[i], "tokens="):
Expand Down Expand Up @@ -103,7 +94,7 @@ func TestTracker(t *testing.T) {
var priStr, logPositionStr string
d.ScanArgs(t, "pri", &priStr)
d.ScanArgs(t, "up-to-log-position", &logPositionStr)
pri, found := reverseWorkPriorityDict[priStr]
pri, found := admissionpb.TestingReverseWorkPriorityDict[priStr]
require.True(t, found)
logPosition := parseLogPosition(t, logPositionStr)

Expand Down
3 changes: 1 addition & 2 deletions pkg/rpc/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -2298,8 +2298,7 @@ func (rpcCtx *Context) grpcDialNodeInternal(
// Run the heartbeat; this will block until the connection breaks for
// whatever reason. We don't actually have to do anything with the error,
// so we ignore it.
err := rpcCtx.runHeartbeat(ctx, conn, target)
log.Infof(ctx, "connection heartbeat loop ended with err: %v", err)
_ = rpcCtx.runHeartbeat(ctx, conn, target)
maybeFatal(ctx, rpcCtx.m.Remove(k, conn))

// Context gets canceled on server shutdown, and if that's likely why
Expand Down
2 changes: 1 addition & 1 deletion pkg/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -269,7 +269,7 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) {
if opts, ok := cfg.TestingKnobs.AdmissionControl.(*admission.Options); ok {
admissionOptions.Override(opts)
}
gcoords := admission.NewGrantCoordinators(cfg.AmbientCtx, st, admissionOptions, registry)
gcoords := admission.NewGrantCoordinators(cfg.AmbientCtx, st, admissionOptions, registry, &admission.NoopOnLogEntryAdmitted{})

engines, err := cfg.CreateEngines(ctx)
if err != nil {
Expand Down
2 changes: 2 additions & 0 deletions pkg/util/admission/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ go_library(
"kv_slot_adjuster.go",
"pacer.go",
"scheduler_latency_listener.go",
"sequencer.go",
"sql_cpu_overload_indicator.go",
"store_token_estimation.go",
"testing_knobs.go",
Expand Down Expand Up @@ -56,6 +57,7 @@ go_test(
"io_load_listener_test.go",
"replicated_write_admission_test.go",
"scheduler_latency_listener_test.go",
"sequencer_test.go",
"store_token_estimation_test.go",
"tokens_linear_model_test.go",
"work_queue_test.go",
Expand Down
Loading