From 1b01e63f81861a0e6a8aa4892ae105e3aa38c039 Mon Sep 17 00:00:00 2001 From: irfan sharif Date: Mon, 27 Feb 2023 13:15:56 -0500 Subject: [PATCH] kvflowcontrol: implement kvflowcontrol.Dispatch Part of #95563. Dispatch is a concrete implementation of the kvflowcontrol.Dispatch interface. It's used to dispatch information about admitted raft log entries to the specific nodes where (i) said entries originated, (ii) flow tokens were deducted and (iii) are waiting to be returned. This type is also used to read pending dispatches, which will be used in the raft transport layer when looking to piggyback information on traffic already bound to specific nodes. Since timely dispatching (read: piggybacking) is not guaranteed, we allow querying for all long-overdue dispatches. Internally it's able to coalesce dispatches bound for a given node. If dispatching admission information for two log entries with the same triple, with log positions L1 and L2 where L1 < L2, we can simply dispatch the one with L2. We leave the integration of this type with the {Store,}WorkQueue (#97599) + raft transport to future PRs. Release note: None --- pkg/BUILD.bazel | 4 + .../kvflowcontrol/kvflowdispatch/BUILD.bazel | 36 ++++ .../kvflowdispatch/kvflowdispatch.go | 107 +++++++++++ .../kvflowdispatch/kvflowdispatch_test.go | 169 ++++++++++++++++++ .../testdata/log_position_ordering | 25 +++ .../testdata/multiple_nodes_priorities_stores | 40 +++++ .../kvflowdispatch/testdata/single_dispatch | 25 +++ 7 files changed, 406 insertions(+) create mode 100644 pkg/kv/kvserver/kvflowcontrol/kvflowdispatch/BUILD.bazel create mode 100644 pkg/kv/kvserver/kvflowcontrol/kvflowdispatch/kvflowdispatch.go create mode 100644 pkg/kv/kvserver/kvflowcontrol/kvflowdispatch/kvflowdispatch_test.go create mode 100644 pkg/kv/kvserver/kvflowcontrol/kvflowdispatch/testdata/log_position_ordering create mode 100644 pkg/kv/kvserver/kvflowcontrol/kvflowdispatch/testdata/multiple_nodes_priorities_stores create mode 100644 pkg/kv/kvserver/kvflowcontrol/kvflowdispatch/testdata/single_dispatch diff --git a/pkg/BUILD.bazel b/pkg/BUILD.bazel index 61609a1a933a..8a860ae412aa 100644 --- a/pkg/BUILD.bazel +++ b/pkg/BUILD.bazel @@ -213,6 +213,7 @@ ALL_TESTS = [ "//pkg/kv/kvserver/idalloc:idalloc_test", "//pkg/kv/kvserver/intentresolver:intentresolver_test", "//pkg/kv/kvserver/kvflowcontrol/kvflowcontroller:kvflowcontroller_test", + "//pkg/kv/kvserver/kvflowcontrol/kvflowdispatch:kvflowdispatch_test", "//pkg/kv/kvserver/kvflowcontrol/kvflowhandle:kvflowhandle_test", "//pkg/kv/kvserver/kvflowcontrol/kvflowsimulator:kvflowsimulator_test", "//pkg/kv/kvserver/kvflowcontrol/kvflowtokentracker:kvflowtokentracker_test", @@ -1252,6 +1253,8 @@ GO_TARGETS = [ "//pkg/kv/kvserver/kvflowcontrol/kvflowcontroller:kvflowcontroller", "//pkg/kv/kvserver/kvflowcontrol/kvflowcontroller:kvflowcontroller_test", "//pkg/kv/kvserver/kvflowcontrol/kvflowcontrolpb:kvflowcontrolpb", + "//pkg/kv/kvserver/kvflowcontrol/kvflowdispatch:kvflowdispatch", + "//pkg/kv/kvserver/kvflowcontrol/kvflowdispatch:kvflowdispatch_test", "//pkg/kv/kvserver/kvflowcontrol/kvflowhandle:kvflowhandle", "//pkg/kv/kvserver/kvflowcontrol/kvflowhandle:kvflowhandle_test", "//pkg/kv/kvserver/kvflowcontrol/kvflowsimulator:kvflowsimulator_test", @@ -2670,6 +2673,7 @@ GET_X_DATA_TARGETS = [ "//pkg/kv/kvserver/kvflowcontrol:get_x_data", "//pkg/kv/kvserver/kvflowcontrol/kvflowcontroller:get_x_data", "//pkg/kv/kvserver/kvflowcontrol/kvflowcontrolpb:get_x_data", + "//pkg/kv/kvserver/kvflowcontrol/kvflowdispatch:get_x_data", "//pkg/kv/kvserver/kvflowcontrol/kvflowhandle:get_x_data", "//pkg/kv/kvserver/kvflowcontrol/kvflowsimulator:get_x_data", "//pkg/kv/kvserver/kvflowcontrol/kvflowtokentracker:get_x_data", diff --git a/pkg/kv/kvserver/kvflowcontrol/kvflowdispatch/BUILD.bazel b/pkg/kv/kvserver/kvflowcontrol/kvflowdispatch/BUILD.bazel new file mode 100644 index 000000000000..a286451bb7e5 --- /dev/null +++ b/pkg/kv/kvserver/kvflowcontrol/kvflowdispatch/BUILD.bazel @@ -0,0 +1,36 @@ +load("//build/bazelutil/unused_checker:unused.bzl", "get_x_data") +load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") + +go_library( + name = "kvflowdispatch", + srcs = ["kvflowdispatch.go"], + importpath = "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol/kvflowdispatch", + visibility = ["//visibility:public"], + deps = [ + "//pkg/kv/kvserver/kvflowcontrol", + "//pkg/kv/kvserver/kvflowcontrol/kvflowcontrolpb", + "//pkg/roachpb", + "//pkg/util/admission/admissionpb", + "//pkg/util/syncutil", + ], +) + +go_test( + name = "kvflowdispatch_test", + srcs = ["kvflowdispatch_test.go"], + args = ["-test.timeout=295s"], + data = glob(["testdata/**"]), + embed = [":kvflowdispatch"], + deps = [ + "//pkg/kv/kvserver/kvflowcontrol/kvflowcontrolpb", + "//pkg/roachpb", + "//pkg/testutils/datapathutils", + "//pkg/util/admission/admissionpb", + "//pkg/util/leaktest", + "//pkg/util/log", + "@com_github_cockroachdb_datadriven//:datadriven", + "@com_github_stretchr_testify//require", + ], +) + +get_x_data(name = "get_x_data") diff --git a/pkg/kv/kvserver/kvflowcontrol/kvflowdispatch/kvflowdispatch.go b/pkg/kv/kvserver/kvflowcontrol/kvflowdispatch/kvflowdispatch.go new file mode 100644 index 000000000000..fddb450425cb --- /dev/null +++ b/pkg/kv/kvserver/kvflowcontrol/kvflowdispatch/kvflowdispatch.go @@ -0,0 +1,107 @@ +// Copyright 2023 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package kvflowdispatch + +import ( + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol/kvflowcontrolpb" + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/util/admission/admissionpb" + "github.com/cockroachdb/cockroach/pkg/util/syncutil" +) + +// Dispatch is a concrete implementation of the kvflowcontrol.Dispatch +// interface. It's used to (i) dispatch information about admitted raft log +// entries to specific nodes, and (ii) to read pending dispatches. +type Dispatch struct { + mu struct { + syncutil.Mutex + // outbox maintains pending dispatches on a per-node basis. + outbox map[roachpb.NodeID]dispatches + } +} + +// dispatchKey is used to coalesce dispatches bound for a given node. If +// transmitting two kvflowcontrolpb.AdmittedRaftLogEntries with the same +// triple, with UpToRaftLogPositions L1 and L2 +// where L1 < L2, we can simply dispatch the one with L2. +type dispatchKey struct { + roachpb.RangeID + roachpb.StoreID + admissionpb.WorkPriority +} + +type dispatches map[dispatchKey]kvflowcontrolpb.RaftLogPosition + +var _ kvflowcontrol.Dispatch = &Dispatch{} + +// New constructs a new Dispatch. +func New() *Dispatch { + d := &Dispatch{} + d.mu.outbox = make(map[roachpb.NodeID]dispatches) + return d +} + +// Dispatch is part of the kvflowcontrol.Dispatch interface. +func (d *Dispatch) Dispatch(nodeID roachpb.NodeID, entries kvflowcontrolpb.AdmittedRaftLogEntries) { + d.mu.Lock() + defer d.mu.Unlock() + + if _, ok := d.mu.outbox[nodeID]; !ok { + d.mu.outbox[nodeID] = dispatches{} + } + + dk := dispatchKey{ + entries.RangeID, + entries.StoreID, + admissionpb.WorkPriority(entries.AdmissionPriority), + } + existing, found := d.mu.outbox[nodeID][dk] + if !found || existing.Less(entries.UpToRaftLogPosition) { + d.mu.outbox[nodeID][dk] = entries.UpToRaftLogPosition + } +} + +// PendingDispatch is part of the kvflowcontrol.Dispatch interface. +func (d *Dispatch) PendingDispatch() []roachpb.NodeID { + d.mu.Lock() + defer d.mu.Unlock() + + nodes := make([]roachpb.NodeID, 0, len(d.mu.outbox)) + for node := range d.mu.outbox { + nodes = append(nodes, node) + } + return nodes +} + +// PendingDispatchFor is part of the kvflowcontrol.Dispatch interface. +func (d *Dispatch) PendingDispatchFor( + nodeID roachpb.NodeID, +) []kvflowcontrolpb.AdmittedRaftLogEntries { + d.mu.Lock() + defer d.mu.Unlock() + + if _, ok := d.mu.outbox[nodeID]; !ok { + return nil + } + + var entries []kvflowcontrolpb.AdmittedRaftLogEntries + for key, dispatch := range d.mu.outbox[nodeID] { + entries = append(entries, kvflowcontrolpb.AdmittedRaftLogEntries{ + RangeID: key.RangeID, + StoreID: key.StoreID, + AdmissionPriority: int32(key.WorkPriority), + UpToRaftLogPosition: dispatch, + }) + } + delete(d.mu.outbox, nodeID) + return entries +} diff --git a/pkg/kv/kvserver/kvflowcontrol/kvflowdispatch/kvflowdispatch_test.go b/pkg/kv/kvserver/kvflowcontrol/kvflowdispatch/kvflowdispatch_test.go new file mode 100644 index 000000000000..5caf3f67343a --- /dev/null +++ b/pkg/kv/kvserver/kvflowcontrol/kvflowdispatch/kvflowdispatch_test.go @@ -0,0 +1,169 @@ +// Copyright 2023 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package kvflowdispatch + +import ( + "fmt" + "sort" + "strconv" + "strings" + "testing" + + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol/kvflowcontrolpb" + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/testutils/datapathutils" + "github.com/cockroachdb/cockroach/pkg/util/admission/admissionpb" + "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/datadriven" + "github.com/stretchr/testify/require" +) + +func TestDispatch(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + reverseWorkPriorityDict := make(map[string]admissionpb.WorkPriority) + for k, v := range admissionpb.WorkPriorityDict { + reverseWorkPriorityDict[v] = k + } + + datadriven.Walk(t, datapathutils.TestDataPath(t), func(t *testing.T, path string) { + var dispatch *Dispatch + datadriven.RunTest(t, path, func(t *testing.T, d *datadriven.TestData) string { + switch d.Cmd { + case "init": + dispatch = New() + return "" + + case "dispatch": + require.NotNilf(t, dispatch, "uninitialized dispatch (did you use 'init'?)") + + for _, line := range strings.Split(d.Input, "\n") { + parts := strings.Fields(line) + require.Len(t, parts, 5, "expected form 'node=n range=r pri= store=s up-to-log-position=/'") + + var ( + entries kvflowcontrolpb.AdmittedRaftLogEntries + nodeID roachpb.NodeID + ) + for i := range parts { + parts[i] = strings.TrimSpace(parts[i]) + inner := strings.Split(parts[i], "=") + require.Len(t, inner, 2) + arg := strings.TrimSpace(inner[1]) + + switch { + case strings.HasPrefix(parts[i], "node="): + // Parse node=n. + ni, err := strconv.Atoi(strings.TrimPrefix(arg, "n")) + require.NoError(t, err) + nodeID = roachpb.NodeID(ni) + + case strings.HasPrefix(parts[i], "range="): + // Parse range=r. + ri, err := strconv.Atoi(strings.TrimPrefix(arg, "r")) + require.NoError(t, err) + entries.RangeID = roachpb.RangeID(ri) + + case strings.HasPrefix(parts[i], "store="): + // Parse store=s. + si, err := strconv.Atoi(strings.TrimPrefix(arg, "s")) + require.NoError(t, err) + entries.StoreID = roachpb.StoreID(si) + + case strings.HasPrefix(parts[i], "pri="): + // Parse pri=. + pri, found := reverseWorkPriorityDict[arg] + require.True(t, found) + entries.AdmissionPriority = int32(pri) + + case strings.HasPrefix(parts[i], "up-to-log-position="): + // Parse up-to-log-position=/. + entries.UpToRaftLogPosition = parseLogPosition(t, arg) + + default: + t.Fatalf("unrecognized prefix: %s", parts[i]) + } + } + dispatch.Dispatch(nodeID, entries) + } + return "" + + case "pending-dispatch": + require.NotNilf(t, dispatch, "uninitialized dispatch (did you use 'init'?)") + var buf strings.Builder + nodes := dispatch.PendingDispatch() + sort.Slice(nodes, func(i, j int) bool { // for determinism + return nodes[i] < nodes[j] + }) + for i, node := range nodes { + if i != 0 { + buf.WriteString("\n") + } + buf.WriteString(fmt.Sprintf("node=n%d", node)) + } + return buf.String() + + case "pending-dispatch-for": + require.NotNilf(t, dispatch, "uninitialized dispatch (did you use 'init'?)") + var arg string + d.ScanArgs(t, "node", &arg) + ni, err := strconv.Atoi(strings.TrimPrefix(arg, "n")) + require.NoError(t, err) + var buf strings.Builder + es := dispatch.PendingDispatchFor(roachpb.NodeID(ni)) + sort.Slice(es, func(i, j int) bool { // for determinism + if es[i].RangeID != es[j].RangeID { + return es[i].RangeID < es[j].RangeID + } + if es[i].StoreID != es[j].StoreID { + return es[i].StoreID < es[j].StoreID + } + if es[i].AdmissionPriority != es[j].AdmissionPriority { + return es[i].AdmissionPriority < es[j].AdmissionPriority + } + return es[i].UpToRaftLogPosition.Less(es[j].UpToRaftLogPosition) + }) + for i, entries := range es { + if i != 0 { + buf.WriteString("\n") + } + buf.WriteString( + fmt.Sprintf("range=r%d pri=%s store=s%d up-to-log-position=%s", + entries.RangeID, + admissionpb.WorkPriority(entries.AdmissionPriority), + entries.StoreID, + entries.UpToRaftLogPosition, + ), + ) + } + return buf.String() + + default: + return fmt.Sprintf("unknown command: %s", d.Cmd) + } + }) + }) +} + +func parseLogPosition(t *testing.T, input string) kvflowcontrolpb.RaftLogPosition { + inner := strings.Split(input, "/") + require.Len(t, inner, 2) + term, err := strconv.Atoi(inner[0]) + require.NoError(t, err) + index, err := strconv.Atoi(inner[1]) + require.NoError(t, err) + return kvflowcontrolpb.RaftLogPosition{ + Term: uint64(term), + Index: uint64(index), + } +} diff --git a/pkg/kv/kvserver/kvflowcontrol/kvflowdispatch/testdata/log_position_ordering b/pkg/kv/kvserver/kvflowcontrol/kvflowdispatch/testdata/log_position_ordering new file mode 100644 index 000000000000..947588448e74 --- /dev/null +++ b/pkg/kv/kvserver/kvflowcontrol/kvflowdispatch/testdata/log_position_ordering @@ -0,0 +1,25 @@ +# Verify that dispatches get coalesced correctly. All things equal, if +# dispatching with a higher up-to-log-position, we'll ignore the lower entries. + +init +---- + +dispatch +node=n1 range=r1 pri=normal-pri store=s1 up-to-log-position=4/20 +node=n1 range=r1 pri=normal-pri store=s1 up-to-log-position=5/20 +---- + +pending-dispatch-for node=n1 +---- +range=r1 pri=normal-pri store=s1 up-to-log-position=log-position=5/20 + +dispatch +node=n1 range=r1 pri=normal-pri store=s1 up-to-log-position=6/20 +node=n1 range=r1 pri=normal-pri store=s1 up-to-log-position=6/19 +---- + +pending-dispatch-for node=n1 +---- +range=r1 pri=normal-pri store=s1 up-to-log-position=log-position=6/20 + +# vim:ft=sh diff --git a/pkg/kv/kvserver/kvflowcontrol/kvflowdispatch/testdata/multiple_nodes_priorities_stores b/pkg/kv/kvserver/kvflowcontrol/kvflowdispatch/testdata/multiple_nodes_priorities_stores new file mode 100644 index 000000000000..629b4be08ca3 --- /dev/null +++ b/pkg/kv/kvserver/kvflowcontrol/kvflowdispatch/testdata/multiple_nodes_priorities_stores @@ -0,0 +1,40 @@ +# Verify that we can maintain dispatches for: +# - multiple nodes (n1 and n2 below, reading from one doesn't affect the +# other); +# - multiple stores (s2 and s3 from n2 below, where dispatches are not +# coalesced); +# - multiple priorities (high-pri and normal-pri for below, where +# dispatches are not coalesced); +init +---- + +dispatch +node=n1 range=r1 pri=normal-pri store=s1 up-to-log-position=4/20 +node=n2 range=r2 pri=normal-pri store=s2 up-to-log-position=5/20 +node=n2 range=r3 pri=normal-pri store=s3 up-to-log-position=5/21 +node=n2 range=r3 pri=high-pri store=s3 up-to-log-position=5/22 +---- + +pending-dispatch +---- +node=n1 +node=n2 + +pending-dispatch-for node=n1 +---- +range=r1 pri=normal-pri store=s1 up-to-log-position=log-position=4/20 + +pending-dispatch +---- +node=n2 + +pending-dispatch-for node=n2 +---- +range=r2 pri=normal-pri store=s2 up-to-log-position=log-position=5/20 +range=r3 pri=normal-pri store=s3 up-to-log-position=log-position=5/21 +range=r3 pri=high-pri store=s3 up-to-log-position=log-position=5/22 + +pending-dispatch +---- + +# vim:ft=sh diff --git a/pkg/kv/kvserver/kvflowcontrol/kvflowdispatch/testdata/single_dispatch b/pkg/kv/kvserver/kvflowcontrol/kvflowdispatch/testdata/single_dispatch new file mode 100644 index 000000000000..c74a021ecce3 --- /dev/null +++ b/pkg/kv/kvserver/kvflowcontrol/kvflowdispatch/testdata/single_dispatch @@ -0,0 +1,25 @@ +# Verify that we can issue a single dispatch, and that it gets removed +# appropriately. + +init +---- + +dispatch +node=n1 range=r1 pri=normal-pri store=s1 up-to-log-position=4/20 +---- + +pending-dispatch +---- +node=n1 + +pending-dispatch-for node=n1 +---- +range=r1 pri=normal-pri store=s1 up-to-log-position=log-position=4/20 + +pending-dispatch +---- + +pending-dispatch-for node=n1 +---- + +# vim:ft=sh