Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

kvflowcontrol: implement kvflowcontrol.Dispatch #97766

Merged
merged 1 commit into from
Mar 7, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions pkg/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,7 @@ ALL_TESTS = [
"//pkg/kv/kvserver/idalloc:idalloc_test",
"//pkg/kv/kvserver/intentresolver:intentresolver_test",
"//pkg/kv/kvserver/kvflowcontrol/kvflowcontroller:kvflowcontroller_test",
"//pkg/kv/kvserver/kvflowcontrol/kvflowdispatch:kvflowdispatch_test",
"//pkg/kv/kvserver/kvflowcontrol/kvflowhandle:kvflowhandle_test",
"//pkg/kv/kvserver/kvflowcontrol/kvflowsimulator:kvflowsimulator_test",
"//pkg/kv/kvserver/kvflowcontrol/kvflowtokentracker:kvflowtokentracker_test",
Expand Down Expand Up @@ -1252,6 +1253,8 @@ GO_TARGETS = [
"//pkg/kv/kvserver/kvflowcontrol/kvflowcontroller:kvflowcontroller",
"//pkg/kv/kvserver/kvflowcontrol/kvflowcontroller:kvflowcontroller_test",
"//pkg/kv/kvserver/kvflowcontrol/kvflowcontrolpb:kvflowcontrolpb",
"//pkg/kv/kvserver/kvflowcontrol/kvflowdispatch:kvflowdispatch",
"//pkg/kv/kvserver/kvflowcontrol/kvflowdispatch:kvflowdispatch_test",
"//pkg/kv/kvserver/kvflowcontrol/kvflowhandle:kvflowhandle",
"//pkg/kv/kvserver/kvflowcontrol/kvflowhandle:kvflowhandle_test",
"//pkg/kv/kvserver/kvflowcontrol/kvflowsimulator:kvflowsimulator_test",
Expand Down Expand Up @@ -2670,6 +2673,7 @@ GET_X_DATA_TARGETS = [
"//pkg/kv/kvserver/kvflowcontrol:get_x_data",
"//pkg/kv/kvserver/kvflowcontrol/kvflowcontroller:get_x_data",
"//pkg/kv/kvserver/kvflowcontrol/kvflowcontrolpb:get_x_data",
"//pkg/kv/kvserver/kvflowcontrol/kvflowdispatch:get_x_data",
"//pkg/kv/kvserver/kvflowcontrol/kvflowhandle:get_x_data",
"//pkg/kv/kvserver/kvflowcontrol/kvflowsimulator:get_x_data",
"//pkg/kv/kvserver/kvflowcontrol/kvflowtokentracker:get_x_data",
Expand Down
36 changes: 36 additions & 0 deletions pkg/kv/kvserver/kvflowcontrol/kvflowdispatch/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
load("//build/bazelutil/unused_checker:unused.bzl", "get_x_data")
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")

go_library(
name = "kvflowdispatch",
srcs = ["kvflowdispatch.go"],
importpath = "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol/kvflowdispatch",
visibility = ["//visibility:public"],
deps = [
"//pkg/kv/kvserver/kvflowcontrol",
"//pkg/kv/kvserver/kvflowcontrol/kvflowcontrolpb",
"//pkg/roachpb",
"//pkg/util/admission/admissionpb",
"//pkg/util/syncutil",
],
)

go_test(
name = "kvflowdispatch_test",
srcs = ["kvflowdispatch_test.go"],
args = ["-test.timeout=295s"],
data = glob(["testdata/**"]),
embed = [":kvflowdispatch"],
deps = [
"//pkg/kv/kvserver/kvflowcontrol/kvflowcontrolpb",
"//pkg/roachpb",
"//pkg/testutils/datapathutils",
"//pkg/util/admission/admissionpb",
"//pkg/util/leaktest",
"//pkg/util/log",
"@com_github_cockroachdb_datadriven//:datadriven",
"@com_github_stretchr_testify//require",
],
)

get_x_data(name = "get_x_data")
107 changes: 107 additions & 0 deletions pkg/kv/kvserver/kvflowcontrol/kvflowdispatch/kvflowdispatch.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
// Copyright 2023 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package kvflowdispatch

import (
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol/kvflowcontrolpb"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/util/admission/admissionpb"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
)

// Dispatch is a concrete implementation of the kvflowcontrol.Dispatch
// interface. It's used to (i) dispatch information about admitted raft log
// entries to specific nodes, and (ii) to read pending dispatches.
type Dispatch struct {
mu struct {
syncutil.Mutex
// outbox maintains pending dispatches on a per-node basis.
outbox map[roachpb.NodeID]dispatches
}
}

// dispatchKey is used to coalesce dispatches bound for a given node. If
// transmitting two kvflowcontrolpb.AdmittedRaftLogEntries with the same
// <RangeID,StoreID,WorkPriority> triple, with UpToRaftLogPositions L1 and L2
// where L1 < L2, we can simply dispatch the one with L2.
type dispatchKey struct {
roachpb.RangeID
roachpb.StoreID
admissionpb.WorkPriority
}

type dispatches map[dispatchKey]kvflowcontrolpb.RaftLogPosition

var _ kvflowcontrol.Dispatch = &Dispatch{}

// New constructs a new Dispatch.
func New() *Dispatch {
d := &Dispatch{}
d.mu.outbox = make(map[roachpb.NodeID]dispatches)
return d
}

// Dispatch is part of the kvflowcontrol.Dispatch interface.
func (d *Dispatch) Dispatch(nodeID roachpb.NodeID, entries kvflowcontrolpb.AdmittedRaftLogEntries) {
d.mu.Lock()
defer d.mu.Unlock()

if _, ok := d.mu.outbox[nodeID]; !ok {
d.mu.outbox[nodeID] = dispatches{}
}

dk := dispatchKey{
entries.RangeID,
entries.StoreID,
admissionpb.WorkPriority(entries.AdmissionPriority),
}
existing, found := d.mu.outbox[nodeID][dk]
if !found || existing.Less(entries.UpToRaftLogPosition) {
d.mu.outbox[nodeID][dk] = entries.UpToRaftLogPosition
}
}

// PendingDispatch is part of the kvflowcontrol.Dispatch interface.
func (d *Dispatch) PendingDispatch() []roachpb.NodeID {
d.mu.Lock()
defer d.mu.Unlock()

nodes := make([]roachpb.NodeID, 0, len(d.mu.outbox))
for node := range d.mu.outbox {
nodes = append(nodes, node)
}
return nodes
}

// PendingDispatchFor is part of the kvflowcontrol.Dispatch interface.
func (d *Dispatch) PendingDispatchFor(
nodeID roachpb.NodeID,
) []kvflowcontrolpb.AdmittedRaftLogEntries {
d.mu.Lock()
defer d.mu.Unlock()

if _, ok := d.mu.outbox[nodeID]; !ok {
return nil
}

var entries []kvflowcontrolpb.AdmittedRaftLogEntries
for key, dispatch := range d.mu.outbox[nodeID] {
entries = append(entries, kvflowcontrolpb.AdmittedRaftLogEntries{
RangeID: key.RangeID,
StoreID: key.StoreID,
AdmissionPriority: int32(key.WorkPriority),
UpToRaftLogPosition: dispatch,
})
}
delete(d.mu.outbox, nodeID)
return entries
}
169 changes: 169 additions & 0 deletions pkg/kv/kvserver/kvflowcontrol/kvflowdispatch/kvflowdispatch_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,169 @@
// Copyright 2023 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package kvflowdispatch

import (
"fmt"
"sort"
"strconv"
"strings"
"testing"

"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol/kvflowcontrolpb"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/testutils/datapathutils"
"github.com/cockroachdb/cockroach/pkg/util/admission/admissionpb"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/datadriven"
"github.com/stretchr/testify/require"
)

func TestDispatch(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

reverseWorkPriorityDict := make(map[string]admissionpb.WorkPriority)
for k, v := range admissionpb.WorkPriorityDict {
reverseWorkPriorityDict[v] = k
}

datadriven.Walk(t, datapathutils.TestDataPath(t), func(t *testing.T, path string) {
var dispatch *Dispatch
datadriven.RunTest(t, path, func(t *testing.T, d *datadriven.TestData) string {
switch d.Cmd {
case "init":
dispatch = New()
return ""

case "dispatch":
require.NotNilf(t, dispatch, "uninitialized dispatch (did you use 'init'?)")

for _, line := range strings.Split(d.Input, "\n") {
parts := strings.Fields(line)
require.Len(t, parts, 5, "expected form 'node=n<int> range=r<int> pri=<string> store=s<int> up-to-log-position=<int>/<int>'")

var (
entries kvflowcontrolpb.AdmittedRaftLogEntries
nodeID roachpb.NodeID
)
for i := range parts {
parts[i] = strings.TrimSpace(parts[i])
inner := strings.Split(parts[i], "=")
require.Len(t, inner, 2)
arg := strings.TrimSpace(inner[1])

switch {
case strings.HasPrefix(parts[i], "node="):
// Parse node=n<int>.
ni, err := strconv.Atoi(strings.TrimPrefix(arg, "n"))
require.NoError(t, err)
nodeID = roachpb.NodeID(ni)

case strings.HasPrefix(parts[i], "range="):
// Parse range=r<int>.
ri, err := strconv.Atoi(strings.TrimPrefix(arg, "r"))
require.NoError(t, err)
entries.RangeID = roachpb.RangeID(ri)

case strings.HasPrefix(parts[i], "store="):
// Parse store=s<int>.
si, err := strconv.Atoi(strings.TrimPrefix(arg, "s"))
require.NoError(t, err)
entries.StoreID = roachpb.StoreID(si)

case strings.HasPrefix(parts[i], "pri="):
// Parse pri=<string>.
pri, found := reverseWorkPriorityDict[arg]
require.True(t, found)
entries.AdmissionPriority = int32(pri)

case strings.HasPrefix(parts[i], "up-to-log-position="):
// Parse up-to-log-position=<int>/<int>.
entries.UpToRaftLogPosition = parseLogPosition(t, arg)

default:
t.Fatalf("unrecognized prefix: %s", parts[i])
}
}
dispatch.Dispatch(nodeID, entries)
}
return ""

case "pending-dispatch":
require.NotNilf(t, dispatch, "uninitialized dispatch (did you use 'init'?)")
var buf strings.Builder
nodes := dispatch.PendingDispatch()
sort.Slice(nodes, func(i, j int) bool { // for determinism
return nodes[i] < nodes[j]
})
for i, node := range nodes {
if i != 0 {
buf.WriteString("\n")
}
buf.WriteString(fmt.Sprintf("node=n%d", node))
}
return buf.String()

case "pending-dispatch-for":
require.NotNilf(t, dispatch, "uninitialized dispatch (did you use 'init'?)")
var arg string
d.ScanArgs(t, "node", &arg)
ni, err := strconv.Atoi(strings.TrimPrefix(arg, "n"))
require.NoError(t, err)
var buf strings.Builder
es := dispatch.PendingDispatchFor(roachpb.NodeID(ni))
sort.Slice(es, func(i, j int) bool { // for determinism
if es[i].RangeID != es[j].RangeID {
return es[i].RangeID < es[j].RangeID
}
if es[i].StoreID != es[j].StoreID {
return es[i].StoreID < es[j].StoreID
}
if es[i].AdmissionPriority != es[j].AdmissionPriority {
return es[i].AdmissionPriority < es[j].AdmissionPriority
}
return es[i].UpToRaftLogPosition.Less(es[j].UpToRaftLogPosition)
})
for i, entries := range es {
if i != 0 {
buf.WriteString("\n")
}
buf.WriteString(
fmt.Sprintf("range=r%d pri=%s store=s%d up-to-log-position=%s",
entries.RangeID,
admissionpb.WorkPriority(entries.AdmissionPriority),
entries.StoreID,
entries.UpToRaftLogPosition,
),
)
}
return buf.String()

default:
return fmt.Sprintf("unknown command: %s", d.Cmd)
}
})
})
}

func parseLogPosition(t *testing.T, input string) kvflowcontrolpb.RaftLogPosition {
inner := strings.Split(input, "/")
require.Len(t, inner, 2)
term, err := strconv.Atoi(inner[0])
require.NoError(t, err)
index, err := strconv.Atoi(inner[1])
require.NoError(t, err)
return kvflowcontrolpb.RaftLogPosition{
Term: uint64(term),
Index: uint64(index),
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
# Verify that dispatches get coalesced correctly. All things equal, if
# dispatching with a higher up-to-log-position, we'll ignore the lower entries.

init
----

dispatch
node=n1 range=r1 pri=normal-pri store=s1 up-to-log-position=4/20
node=n1 range=r1 pri=normal-pri store=s1 up-to-log-position=5/20
----

pending-dispatch-for node=n1
----
range=r1 pri=normal-pri store=s1 up-to-log-position=log-position=5/20

dispatch
node=n1 range=r1 pri=normal-pri store=s1 up-to-log-position=6/20
node=n1 range=r1 pri=normal-pri store=s1 up-to-log-position=6/19
----

pending-dispatch-for node=n1
----
range=r1 pri=normal-pri store=s1 up-to-log-position=log-position=6/20

# vim:ft=sh
Loading