Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

replica_rac2: add admitted state protocol #130074

Merged
merged 4 commits into from
Sep 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions pkg/kv/kvserver/flow_control_raft_transport_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -681,8 +681,11 @@ func TestFlowControlRaftTransportV2(t *testing.T) {
toNodeID := parseNodeID(t, d, "node")
toStoreID := parseStoreID(t, d, "store")
rangeID := parseRangeID(t, d, "range")
control.piggybacker.AddMsgAppRespForLeader(
toNodeID, toStoreID, rangeID, raftpb.Message{})
// TODO(pav-kv): test that these messages are actually sent in
// RaftMessageRequestBatch.
control.piggybacker.Add(toNodeID, kvflowcontrolpb.PiggybackedAdmittedState{
RangeID: rangeID, ToStoreID: toStoreID,
})
return ""

case "fallback-piggyback":
Expand Down
17 changes: 17 additions & 0 deletions pkg/kv/kvserver/kvflowcontrol/kvflowcontrolpb/kvflowcontrol.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,3 +61,20 @@ func (a AdmittedResponseForRange) String() string {
func (a AdmittedResponseForRange) SafeFormat(w redact.SafePrinter, _ rune) {
w.Printf("admitted-response (s%s r%s %s)", a.LeaderStoreID, a.RangeID, a.Msg.String())
}

func (a AdmittedState) String() string {
return redact.StringWithoutMarkers(a)
}

func (a AdmittedState) SafeFormat(w redact.SafePrinter, _ rune) {
w.Printf("admitted=t%d/%s", a.Term, a.Admitted)
}

func (a PiggybackedAdmittedState) String() string {
return redact.StringWithoutMarkers(a)
}

func (a PiggybackedAdmittedState) SafeFormat(w redact.SafePrinter, _ rune) {
w.Printf("[r%s,s%s,%d->%d] %s",
a.RangeID, a.ToStoreID, a.FromReplicaID, a.ToReplicaID, a.Admitted.String())
}
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,8 @@ message RaftLogPosition {
// AdmittedResponseForRange is only used in RACv2. It contains a MsgAppResp
// from a follower to a leader, that was generated to advance the admitted
// vector for that follower, maintained by the leader.
//
// TODO(pav-kv): remove this type and use PiggybackedAdmittedState.
message AdmittedResponseForRange {
option (gogoproto.goproto_stringer) = false;

Expand All @@ -149,3 +151,44 @@ message AdmittedResponseForRange {
// Msg is the MsgAppResp containing the admitted vector.
raftpb.Message msg = 3 [(gogoproto.nullable) = false];
}

// AdmittedState communicates a replica's vector of admitted log indices at
// different priorities to the leader of a range.
//
// Used only in RACv2.
message AdmittedState {
option (gogoproto.goproto_stringer) = false;
// Term is the leader term of the log for which the Admitted indices were
// computed. The indices are consistent with this leader's log.
uint64 term = 1;
// Admitted contains admitted log indices for each priority < NumPriorities.
repeated uint64 admitted = 2;
}

// PiggybackedAdmittedState wraps the AdmittedState with the routing information
// needed to deliver the admitted vector to a particular leader replica, and for
// it to know who sent it.
//
// Used only in RACv2.
message PiggybackedAdmittedState {
option (gogoproto.goproto_stringer) = false;

// RangeID is the ID of the range to which this message is related. Used for
// routing at the leader node.
uint64 range_id = 1 [(gogoproto.customname) = "RangeID",
(gogoproto.casttype) = "github.com/cockroachdb/cockroach/pkg/roachpb.RangeID"];
// ToStoreID is the store at the leader containing the leader replica. Used
// for routing at the leader node.
uint64 to_store_id = 2 [(gogoproto.customname) = "ToStoreID",
(gogoproto.casttype) = "github.com/cockroachdb/cockroach/pkg/roachpb.StoreID"];

// FromReplicaID is the replica sending this message.
uint64 from_replica_id = 3 [(gogoproto.customname) = "FromReplicaID",
(gogoproto.casttype) = "github.com/cockroachdb/cockroach/pkg/roachpb.ReplicaID"];
// ToReplicaID is the leader replica receiving this message.
uint64 to_replica_id = 4 [(gogoproto.customname) = "ToReplicaID",
(gogoproto.casttype) = "github.com/cockroachdb/cockroach/pkg/roachpb.ReplicaID"];

// Admitted is the admitted vector at the sending replica.
AdmittedState admitted = 5 [(gogoproto.nullable) = false];
}
2 changes: 0 additions & 2 deletions pkg/kv/kvserver/kvflowcontrol/node_rac2/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ go_library(
deps = [
"//pkg/kv/kvserver/kvflowcontrol/kvflowcontrolpb",
"//pkg/kv/kvserver/kvflowcontrol/replica_rac2",
"//pkg/raft/raftpb",
"//pkg/roachpb",
"//pkg/util/syncutil",
],
Expand All @@ -24,7 +23,6 @@ go_test(
embed = [":node_rac2"],
deps = [
"//pkg/kv/kvserver/kvflowcontrol/kvflowcontrolpb",
"//pkg/raft/raftpb",
"//pkg/roachpb",
"//pkg/testutils/datapathutils",
"//pkg/util/leaktest",
Expand Down
36 changes: 17 additions & 19 deletions pkg/kv/kvserver/kvflowcontrol/node_rac2/admitted_piggybacker.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ import (

"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol/kvflowcontrolpb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol/replica_rac2"
"github.com/cockroachdb/cockroach/pkg/raft/raftpb"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
)
Expand All @@ -29,7 +28,7 @@ type PiggybackMsgReader interface {
// least one message will be popped.
PopMsgsForNode(
now time.Time, nodeID roachpb.NodeID, maxBytes int64,
) (msgs []kvflowcontrolpb.AdmittedResponseForRange, remainingMsgs int)
) (_ []kvflowcontrolpb.PiggybackedAdmittedState, remainingMsgs int)
// NodesWithMsgs is used to periodically drop msgs from disconnected nodes.
// See RaftTransport.dropFlowTokensForDisconnectedNodes.
NodesWithMsgs(now time.Time) []roachpb.NodeID
Expand All @@ -46,7 +45,7 @@ type AdmittedPiggybacker struct {
}

type rangeMap struct {
rangeMap map[roachpb.RangeID]kvflowcontrolpb.AdmittedResponseForRange
rangeMap map[roachpb.RangeID]kvflowcontrolpb.PiggybackedAdmittedState
transitionToEmptyTime time.Time
}

Expand All @@ -59,32 +58,28 @@ func NewAdmittedPiggybacker() *AdmittedPiggybacker {
var _ PiggybackMsgReader = &AdmittedPiggybacker{}
var _ replica_rac2.AdmittedPiggybacker = &AdmittedPiggybacker{}

// AddMsgAppRespForLeader implements replica_rac2.AdmittedPiggybacker.
func (ap *AdmittedPiggybacker) AddMsgAppRespForLeader(
nodeID roachpb.NodeID, storeID roachpb.StoreID, rangeID roachpb.RangeID, msg raftpb.Message,
// Add implements replica_rac2.AdmittedPiggybacker.
func (ap *AdmittedPiggybacker) Add(
nodeID roachpb.NodeID, msg kvflowcontrolpb.PiggybackedAdmittedState,
) {
ap.mu.Lock()
defer ap.mu.Unlock()
rm, ok := ap.mu.msgsForNode[nodeID]
if !ok {
rm = &rangeMap{rangeMap: map[roachpb.RangeID]kvflowcontrolpb.AdmittedResponseForRange{}}
rm = &rangeMap{rangeMap: map[roachpb.RangeID]kvflowcontrolpb.PiggybackedAdmittedState{}}
ap.mu.msgsForNode[nodeID] = rm
}
rm.rangeMap[rangeID] = kvflowcontrolpb.AdmittedResponseForRange{
LeaderStoreID: storeID,
RangeID: rangeID,
Msg: msg,
}
rm.rangeMap[msg.RangeID] = msg
}

// Made-up number. There are 10+ integers, all varint encoded, many of which
// Made-up number. There are < 10 integers, all varint encoded, many of which
// like nodeID, storeID, replicaIDs etc. will be small.
const admittedForRangeRACv2SizeBytes = 50
const admittedForRangeRACv2SizeBytes = 40

// PopMsgsForNode implements PiggybackMsgReader.
func (ap *AdmittedPiggybacker) PopMsgsForNode(
now time.Time, nodeID roachpb.NodeID, maxBytes int64,
) (msgs []kvflowcontrolpb.AdmittedResponseForRange, remainingMsgs int) {
) (_ []kvflowcontrolpb.PiggybackedAdmittedState, remainingMsgs int) {
if ap == nil {
return nil, 0
}
Expand All @@ -94,13 +89,16 @@ func (ap *AdmittedPiggybacker) PopMsgsForNode(
if !ok || len(rm.rangeMap) == 0 {
return nil, 0
}
maxEntries := maxBytes / admittedForRangeRACv2SizeBytes
// NB: +1 to include at least one entry.
maxEntries := maxBytes/admittedForRangeRACv2SizeBytes + 1
msgs := make([]kvflowcontrolpb.PiggybackedAdmittedState, 0,
min(int64(len(rm.rangeMap)), maxEntries))
for rangeID, msg := range rm.rangeMap {
msgs = append(msgs, msg)
delete(rm.rangeMap, rangeID)
if int64(len(msgs)) > maxEntries {
if len(msgs) == cap(msgs) {
break
}
msgs = append(msgs, msg)
delete(rm.rangeMap, rangeID)
}
n := len(rm.rangeMap)
if n == 0 {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
"time"

"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol/kvflowcontrolpb"
"github.com/cockroachdb/cockroach/pkg/raft/raftpb"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/testutils/datapathutils"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
Expand All @@ -38,14 +37,20 @@ func TestPiggybacker(t *testing.T) {
func(t *testing.T, d *datadriven.TestData) string {
switch d.Cmd {
case "add":
var nodeID, storeID, rangeID, match int
var nodeID, storeID, rangeID, from, to, term int
d.ScanArgs(t, "node-id", &nodeID)
d.ScanArgs(t, "store-id", &storeID)
d.ScanArgs(t, "range-id", &rangeID)
// Match is just a placeholder to differentiate messages in the test.
d.ScanArgs(t, "match", &match)
p.AddMsgAppRespForLeader(roachpb.NodeID(nodeID), roachpb.StoreID(storeID),
roachpb.RangeID(rangeID), raftpb.Message{Match: uint64(match)})
d.ScanArgs(t, "from", &from)
d.ScanArgs(t, "to", &to)
d.ScanArgs(t, "term", &term)
p.Add(roachpb.NodeID(nodeID), kvflowcontrolpb.PiggybackedAdmittedState{
RangeID: roachpb.RangeID(rangeID),
ToStoreID: roachpb.StoreID(storeID),
FromReplicaID: roachpb.ReplicaID(from),
ToReplicaID: roachpb.ReplicaID(to),
Admitted: kvflowcontrolpb.AdmittedState{Term: uint64(term)},
})
return ""

case "nodes-with-msgs":
Expand All @@ -71,13 +76,13 @@ func TestPiggybacker(t *testing.T) {
var nodeID int
d.ScanArgs(t, "node-id", &nodeID)
msgs, remaining := p.PopMsgsForNode(ts, roachpb.NodeID(nodeID), math.MaxInt64)
slices.SortFunc(msgs, func(a, b kvflowcontrolpb.AdmittedResponseForRange) int {
slices.SortFunc(msgs, func(a, b kvflowcontrolpb.PiggybackedAdmittedState) int {
return cmp.Compare(a.RangeID, b.RangeID)
})
var b strings.Builder
fmt.Fprintf(&b, "msgs:\n")
for _, msg := range msgs {
fmt.Fprintf(&b, "s%s, r%s, match=%d\n", msg.LeaderStoreID, msg.RangeID, msg.Msg.Match)
fmt.Fprintf(&b, "%s\n", msg)
}
fmt.Fprintf(&b, "remaining-msgs: %d\n", remaining)
return b.String()
Expand All @@ -100,15 +105,15 @@ func TestPiggybackerMaxBytes(t *testing.T) {
defer log.Scope(t).Close(t)

p := NewAdmittedPiggybacker()
p.AddMsgAppRespForLeader(1, 1, 1, raftpb.Message{})
p.AddMsgAppRespForLeader(1, 1, 2, raftpb.Message{})
p.Add(1, kvflowcontrolpb.PiggybackedAdmittedState{RangeID: 1, ToStoreID: 1})
p.Add(1, kvflowcontrolpb.PiggybackedAdmittedState{RangeID: 2, ToStoreID: 1})
// Both are popped.
msgs, remaining := p.PopMsgsForNode(time.UnixMilli(1), 1, 60)
require.Equal(t, 2, len(msgs))
require.Equal(t, 0, remaining)

p.AddMsgAppRespForLeader(1, 1, 1, raftpb.Message{})
p.AddMsgAppRespForLeader(1, 1, 2, raftpb.Message{})
p.Add(1, kvflowcontrolpb.PiggybackedAdmittedState{RangeID: 1, ToStoreID: 1})
p.Add(1, kvflowcontrolpb.PiggybackedAdmittedState{RangeID: 2, ToStoreID: 1})
// Only one is popped.
msgs, remaining = p.PopMsgsForNode(time.UnixMilli(1), 1, 20)
require.Equal(t, 1, len(msgs))
Expand Down
14 changes: 7 additions & 7 deletions pkg/kv/kvserver/kvflowcontrol/node_rac2/testdata/piggybacker
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,11 @@ msgs:
remaining-msgs: 0

# Add for node 1.
add node-id=1 store-id=2 range-id=3 match=6
add node-id=1 store-id=2 range-id=3 from=2 to=1 term=6
----

# Add for node 11.
add node-id=11 store-id=12 range-id=13 match=14
add node-id=11 store-id=12 range-id=13 from=3 to=1 term=14
----

nodes-with-msgs time-sec=2
Expand All @@ -23,15 +23,15 @@ n1 n11
map len: 2

# Add another for node 11, for a different range.
add node-id=11 store-id=22 range-id=23 match=24
add node-id=11 store-id=22 range-id=23 from=2 to=1 term=24
----

# Pop both for node 11.
pop node-id=11 time-sec=2
----
msgs:
s12, r13, match=14
s22, r23, match=24
[r13,s12,3->1] admitted=t14/[]
[r23,s22,2->1] admitted=t24/[]
remaining-msgs: 0

# There is still an empty map entry for node 11.
Expand All @@ -47,14 +47,14 @@ n1
map len: 1

# Overwrite the msg for the range at node 1.
add node-id=1 store-id=2 range-id=3 match=7
add node-id=1 store-id=2 range-id=3 from=2 to=1 term=25
----

# Pop for node 1. There was only one msg.
pop node-id=1 time-sec=64
----
msgs:
s2, r3, match=7
[r3,s2,2->1] admitted=t25/[]
remaining-msgs: 0

# The map entry for node 1 is garbage collected.
Expand Down
1 change: 1 addition & 0 deletions pkg/kv/kvserver/kvflowcontrol/replica_rac2/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ go_library(
visibility = ["//visibility:public"],
deps = [
"//pkg/kv/kvserver/kvflowcontrol",
"//pkg/kv/kvserver/kvflowcontrol/kvflowcontrolpb",
"//pkg/kv/kvserver/kvflowcontrol/rac2",
"//pkg/kv/kvserver/raftlog",
"//pkg/raft/raftpb",
Expand Down
23 changes: 15 additions & 8 deletions pkg/kv/kvserver/kvflowcontrol/replica_rac2/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"time"

"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol/kvflowcontrolpb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol/rac2"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/raftlog"
"github.com/cockroachdb/cockroach/pkg/raft/raftpb"
Expand Down Expand Up @@ -132,13 +133,13 @@ type RaftNode interface {
StepMsgAppRespForAdmittedLocked(raftpb.Message) error
}

// AdmittedPiggybacker is used to enqueue MsgAppResp messages whose purpose is
// to advance Admitted. For efficiency, these need to be piggybacked on other
// messages being sent to the given leader node. The StoreID and RangeID are
// provided so that the leader node can route the incoming message to the
// relevant range.
// AdmittedPiggybacker is used to enqueue admitted vector messages addressed to
// replicas on a particular node. For efficiency, these need to be piggybacked
// on other messages being sent to the given leader node. The store / range /
// replica IDs are provided so that the leader node can route the incoming
// message to the relevant range.
type AdmittedPiggybacker interface {
AddMsgAppRespForLeader(roachpb.NodeID, roachpb.StoreID, roachpb.RangeID, raftpb.Message)
Add(roachpb.NodeID, kvflowcontrolpb.PiggybackedAdmittedState)
}

// EntryForAdmission is the information provided to the admission control (AC)
Expand Down Expand Up @@ -752,8 +753,14 @@ func (p *processorImpl) HandleRaftReadyRaftMuLocked(ctx context.Context, e rac2.
p.opts.Replica.MuUnlock()
if p.mu.leader.rc == nil && p.mu.leaderNodeID != 0 {
// Follower, and know leaderNodeID, leaderStoreID.
p.opts.AdmittedPiggybacker.AddMsgAppRespForLeader(
p.mu.leaderNodeID, p.mu.leaderStoreID, p.opts.RangeID, msgResp)
// TODO(pav-kv): populate the message correctly.
p.opts.AdmittedPiggybacker.Add(p.mu.leaderNodeID, kvflowcontrolpb.PiggybackedAdmittedState{
RangeID: p.opts.RangeID,
ToStoreID: p.mu.leaderStoreID,
FromReplicaID: p.opts.ReplicaID,
ToReplicaID: roachpb.ReplicaID(msgResp.To),
Admitted: kvflowcontrolpb.AdmittedState{},
})
}
// Else if the local replica is the leader, we have already told it
// about the update by calling SetAdmittedLocked. If the leader is not
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -172,11 +172,10 @@ type testAdmittedPiggybacker struct {
b *strings.Builder
}

func (p *testAdmittedPiggybacker) AddMsgAppRespForLeader(
n roachpb.NodeID, s roachpb.StoreID, r roachpb.RangeID, msg raftpb.Message,
func (p *testAdmittedPiggybacker) Add(
n roachpb.NodeID, m kvflowcontrolpb.PiggybackedAdmittedState,
) {
fmt.Fprintf(p.b, " Piggybacker.AddMsgAppRespForLeader(leader=(n%s,s%s,r%s), msg=%s)\n",
n, s, r, msgString(msg))
fmt.Fprintf(p.b, " Piggybacker.Add(n%s, %s)\n", n, m)
}

type testACWorkQueue struct {
Expand Down
Loading
Loading