Skip to content

Commit

Permalink
raft: use ProgressMap instead of ProgressTracker for config changes
Browse files Browse the repository at this point in the history
This patch pulls MaxInflight and MaxInflightBytes up a level, on to the
raft struct. This then allows us to only copy over the ProgressMap,
instead of the entire ProgressTracker, when constructing a Changer for
config changes. This way, there's only one Config on the changer (which
is updated during a configuration change), lest we access the wrong one.

Informs #125265

Release note: None
  • Loading branch information
arulajmani committed Jul 19, 2024
1 parent 636ebaf commit ec7a9ce
Show file tree
Hide file tree
Showing 11 changed files with 86 additions and 73 deletions.
8 changes: 5 additions & 3 deletions pkg/kv/kvserver/client_atomic_membership_change_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,9 +89,11 @@ func TestAtomicReplicationChange(t *testing.T) {
// a hair earlier.
cfg := quorum.MakeEmptyConfig()
cfg, _, err = confchange.Restore(confchange.Changer{
Tracker: tracker.MakeProgressTracker(&cfg, 1, 0),
Config: cfg,
LastIndex: 1,
ProgressMap: tracker.MakeProgressTracker(&cfg).Progress,
Config: cfg,
MaxInflight: 1,
MaxInflightBytes: 0,
LastIndex: 1,
}, desc.Replicas().ConfState())
require.NoError(t, err)
act := r.RaftStatus().Config.Voters
Expand Down
12 changes: 7 additions & 5 deletions pkg/raft/confchange/confchange.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,11 @@ import (
// refusing invalid configuration changes before they affect the active
// configuration.
type Changer struct {
Config quorum.Config
Tracker tracker.ProgressTracker
LastIndex uint64
Config quorum.Config
ProgressMap tracker.ProgressMap
MaxInflight int
MaxInflightBytes uint64
LastIndex uint64
}

// EnterJoint verifies that the outgoing (=right) majority config of the joint
Expand Down Expand Up @@ -271,7 +273,7 @@ func (c Changer) initProgress(
// making the first index the better choice).
Match: 0,
Next: max(c.LastIndex, 1), // invariant: Match < Next
Inflights: tracker.NewInflights(c.Tracker.MaxInflight, c.Tracker.MaxInflightBytes),
Inflights: tracker.NewInflights(c.MaxInflight, c.MaxInflightBytes),
IsLearner: isLearner,
// When a node is first added, we should mark it as recently active.
// Otherwise, CheckQuorum may cause us to step down if it is invoked
Expand Down Expand Up @@ -348,7 +350,7 @@ func (c Changer) checkAndCopy() (quorum.Config, tracker.ProgressMap, error) {
cfg := c.Config.Clone()
trk := tracker.ProgressMap{}

for id, pr := range c.Tracker.Progress {
for id, pr := range c.ProgressMap {
// A shallow copy is enough because we only mutate the Learner field.
ppr := *pr
trk[id] = &ppr
Expand Down
21 changes: 11 additions & 10 deletions pkg/raft/confchange/datadriven_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,12 @@ import (
func TestConfChangeDataDriven(t *testing.T) {
datadriven.Walk(t, "testdata", func(t *testing.T, path string) {
cfg := quorum.MakeEmptyConfig()
tr := tracker.MakeProgressTracker(&cfg, 10, 0)
c := Changer{
Config: cfg,
Tracker: tr,
LastIndex: 0, // incremented in this test with each cmd
Config: cfg,
ProgressMap: tracker.MakeProgressTracker(&cfg).Progress,
MaxInflight: 10,
MaxInflightBytes: 0,
LastIndex: 0, // incremented in this test with each cmd
}

// The test files use the commands
Expand Down Expand Up @@ -85,31 +86,31 @@ func TestConfChangeDataDriven(t *testing.T) {
}

var cfg quorum.Config
var trk tracker.ProgressMap
var progressMap tracker.ProgressMap
var err error
switch d.Cmd {
case "simple":
cfg, trk, err = c.Simple(ccs...)
cfg, progressMap, err = c.Simple(ccs...)
case "enter-joint":
var autoLeave bool
if len(d.CmdArgs) > 0 {
d.ScanArgs(t, "autoleave", &autoLeave)
}
cfg, trk, err = c.EnterJoint(autoLeave, ccs...)
cfg, progressMap, err = c.EnterJoint(autoLeave, ccs...)
case "leave-joint":
if len(ccs) > 0 {
err = errors.New("this command takes no input")
} else {
cfg, trk, err = c.LeaveJoint()
cfg, progressMap, err = c.LeaveJoint()
}
default:
return "unknown command"
}
if err != nil {
return err.Error() + "\n"
}
c.Config, c.Tracker.Progress = cfg, trk
return fmt.Sprintf("%s\n%s", c.Config, c.Tracker.Progress)
c.Config, c.ProgressMap = cfg, progressMap
return fmt.Sprintf("%s\n%s", c.Config, c.ProgressMap)
})
})
}
35 changes: 18 additions & 17 deletions pkg/raft/confchange/quick_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ func TestConfChangeQuick(t *testing.T) {
const infoCount = 5

runWithJoint := func(c *Changer, ccs []pb.ConfChangeSingle) error {
cfg, trk, err := c.EnterJoint(false /* autoLeave */, ccs...)
cfg, progressMap, err := c.EnterJoint(false /* autoLeave */, ccs...)
if err != nil {
return err
}
Expand All @@ -52,29 +52,29 @@ func TestConfChangeQuick(t *testing.T) {
return err
}
cfg2a.AutoLeave = false
if !reflect.DeepEqual(cfg, cfg2a) || !reflect.DeepEqual(trk, trk2a) {
return fmt.Errorf("cfg: %+v\ncfg2a: %+v\ntrk: %+v\ntrk2a: %+v",
cfg, cfg2a, trk, trk2a)
if !reflect.DeepEqual(cfg, cfg2a) || !reflect.DeepEqual(progressMap, trk2a) {
return fmt.Errorf("cfg: %+v\ncfg2a: %+v\nprogressMap: %+v\ntrk2a: %+v",
cfg, cfg2a, progressMap, trk2a)
}
c.Config = cfg
c.Tracker.Progress = trk
c.ProgressMap = progressMap
cfg2b, trk2b, err := c.LeaveJoint()
if err != nil {
return err
}
// Reset back to the main branch with autoLeave=false.
c.Config = cfg
c.Tracker.Progress = trk
cfg, trk, err = c.LeaveJoint()
c.ProgressMap = progressMap
cfg, progressMap, err = c.LeaveJoint()
if err != nil {
return err
}
if !reflect.DeepEqual(cfg, cfg2b) || !reflect.DeepEqual(trk, trk2b) {
return fmt.Errorf("cfg: %+v\ncfg2b: %+v\ntrk: %+v\ntrk2b: %+v",
cfg, cfg2b, trk, trk2b)
if !reflect.DeepEqual(cfg, cfg2b) || !reflect.DeepEqual(progressMap, trk2b) {
return fmt.Errorf("cfg: %+v\ncfg2b: %+v\nprogressMap: %+v\ntrk2b: %+v",
cfg, cfg2b, progressMap, trk2b)
}
c.Config = cfg
c.Tracker.Progress = trk
c.ProgressMap = progressMap
return nil
}

Expand All @@ -84,7 +84,7 @@ func TestConfChangeQuick(t *testing.T) {
if err != nil {
return err
}
c.Config, c.Tracker.Progress = cfg, trk
c.Config, c.ProgressMap = cfg, trk
}
return nil
}
Expand All @@ -94,11 +94,12 @@ func TestConfChangeQuick(t *testing.T) {
wrapper := func(invoke testFunc) func(setup initialChanges, ccs confChanges) (*Changer, error) {
return func(setup initialChanges, ccs confChanges) (*Changer, error) {
cfg := quorum.MakeEmptyConfig()
tr := tracker.MakeProgressTracker(&cfg, 10, 0)
c := &Changer{
Config: cfg,
Tracker: tr,
LastIndex: 10,
Config: cfg,
ProgressMap: tracker.MakeProgressTracker(&cfg).Progress,
MaxInflight: 10,
MaxInflightBytes: 0,
LastIndex: 10,
}

if err := runWithSimple(c, setup); err != nil {
Expand All @@ -120,7 +121,7 @@ func TestConfChangeQuick(t *testing.T) {
t.Log("initial setup:", Describe(setup...))
t.Log("changes:", Describe(ccs...))
t.Log(c.Config)
t.Log(c.Tracker.Progress)
t.Log(c.ProgressMap)
}
n++
return c
Expand Down
6 changes: 3 additions & 3 deletions pkg/raft/confchange/restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,14 +104,14 @@ func chain(
chg Changer, ops ...func(Changer) (quorum.Config, tracker.ProgressMap, error),
) (quorum.Config, tracker.ProgressMap, error) {
for _, op := range ops {
cfg, trk, err := op(chg)
cfg, progressMap, err := op(chg)
if err != nil {
return quorum.Config{}, nil, err
}
chg.Config = cfg
chg.Tracker.Progress = trk
chg.ProgressMap = progressMap
}
return chg.Config, chg.Tracker.Progress, nil
return chg.Config, chg.ProgressMap, nil
}

// Restore takes a Changer (which must represent an empty configuration), and
Expand Down
10 changes: 5 additions & 5 deletions pkg/raft/confchange/restore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,17 +91,17 @@ func TestRestore(t *testing.T) {
f := func(cs pb.ConfState) bool {
cfg := quorum.MakeEmptyConfig()
chg := Changer{
Config: cfg,
Tracker: tracker.MakeProgressTracker(&cfg, 20, 0),
LastIndex: 10,
Config: cfg,
ProgressMap: tracker.MakeProgressTracker(&cfg).Progress,
LastIndex: 10,
}
cfg, trk, err := Restore(chg, cs)
cfg, progressMap, err := Restore(chg, cs)
if err != nil {
t.Error(err)
return false
}
chg.Config = cfg
chg.Tracker.Progress = trk
chg.ProgressMap = progressMap

for _, sl := range [][]pb.PeerID{
cs.Voters,
Expand Down
38 changes: 24 additions & 14 deletions pkg/raft/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -382,8 +382,10 @@ type raft struct {
// only leader keeps heartbeatElapsed.
heartbeatElapsed int

checkQuorum bool
preVote bool
maxInflight int
maxInflightBytes uint64
checkQuorum bool
preVote bool

heartbeatTimeout int
electionTimeout int
Expand Down Expand Up @@ -422,6 +424,8 @@ func newRaft(c *Config) *raft {
electionTimeout: c.ElectionTick,
heartbeatTimeout: c.HeartbeatTick,
logger: c.Logger,
maxInflight: c.MaxInflightMsgs,
maxInflightBytes: c.MaxInflightBytes,
checkQuorum: c.CheckQuorum,
preVote: c.PreVote,
disableProposalForwarding: c.DisableProposalForwarding,
Expand All @@ -431,12 +435,14 @@ func newRaft(c *Config) *raft {
}
lastID := r.raftLog.lastEntryID()

r.trk = tracker.MakeProgressTracker(&r.config, c.MaxInflightMsgs, c.MaxInflightBytes)
r.trk = tracker.MakeProgressTracker(&r.config)

cfg, trk, err := confchange.Restore(confchange.Changer{
Config: r.config,
Tracker: r.trk,
LastIndex: lastID.index,
Config: r.config,
ProgressMap: r.trk.Progress,
MaxInflight: r.maxInflight,
MaxInflightBytes: r.maxInflightBytes,
LastIndex: lastID.index,
}, cs)
if err != nil {
panic(err)
Expand Down Expand Up @@ -762,7 +768,7 @@ func (r *raft) reset(term uint64) {
*pr = tracker.Progress{
Match: 0,
Next: r.raftLog.lastIndex() + 1,
Inflights: tracker.NewInflights(r.trk.MaxInflight, r.trk.MaxInflightBytes),
Inflights: tracker.NewInflights(r.maxInflight, r.maxInflightBytes),
IsLearner: pr.IsLearner,
}
if id == r.id {
Expand Down Expand Up @@ -1904,11 +1910,13 @@ func (r *raft) restore(s snapshot) bool {

// Reset the configuration and add the (potentially updated) peers in anew.
r.config = quorum.MakeEmptyConfig()
r.trk = tracker.MakeProgressTracker(&r.config, r.trk.MaxInflight, r.trk.MaxInflightBytes)
r.trk = tracker.MakeProgressTracker(&r.config)
cfg, trk, err := confchange.Restore(confchange.Changer{
Config: r.config,
Tracker: r.trk,
LastIndex: r.raftLog.lastIndex(),
Config: r.config,
ProgressMap: r.trk.Progress,
MaxInflight: r.maxInflight,
MaxInflightBytes: r.maxInflightBytes,
LastIndex: r.raftLog.lastIndex(),
}, cs)

if err != nil {
Expand All @@ -1935,9 +1943,11 @@ func (r *raft) promotable() bool {
func (r *raft) applyConfChange(cc pb.ConfChangeV2) pb.ConfState {
cfg, trk, err := func() (quorum.Config, tracker.ProgressMap, error) {
changer := confchange.Changer{
Config: r.config,
Tracker: r.trk,
LastIndex: r.raftLog.lastIndex(),
Config: r.config,
ProgressMap: r.trk.Progress,
MaxInflight: r.maxInflight,
MaxInflightBytes: r.maxInflightBytes,
LastIndex: r.raftLog.lastIndex(),
}
if cc.LeaveJoint() {
return changer.LeaveJoint()
Expand Down
8 changes: 4 additions & 4 deletions pkg/raft/raft_flow_control_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ func TestMsgAppFlowControlFull(t *testing.T) {
// force the progress to be in replicate state
pr2.BecomeReplicate()
// fill in the inflights window
for i := 0; i < r.trk.MaxInflight; i++ {
for i := 0; i < r.maxInflight; i++ {
r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("somedata")}}})
ms := r.readMessages()
if len(ms) != 1 || ms[0].Type != pb.MsgApp {
Expand Down Expand Up @@ -72,14 +72,14 @@ func TestMsgAppFlowControlMoveForward(t *testing.T) {
// force the progress to be in replicate state
pr2.BecomeReplicate()
// fill in the inflights window
for i := 0; i < r.trk.MaxInflight; i++ {
for i := 0; i < r.maxInflight; i++ {
r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("somedata")}}})
r.readMessages()
}

// 1 is noop, 2 is the first proposal we just sent.
// so we start with 2.
for tt := 2; tt < r.trk.MaxInflight; tt++ {
for tt := 2; tt < r.maxInflight; tt++ {
// move forward the window
r.Step(pb.Message{From: 2, To: 1, Type: pb.MsgAppResp, Index: uint64(tt)})
r.readMessages()
Expand Down Expand Up @@ -117,7 +117,7 @@ func TestMsgAppFlowControlRecvHeartbeat(t *testing.T) {
// force the progress to be in replicate state
pr2.BecomeReplicate()
// fill in the inflights window
for i := 0; i < r.trk.MaxInflight; i++ {
for i := 0; i < r.maxInflight; i++ {
r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("somedata")}}})
r.readMessages()
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/raft/raft_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3750,7 +3750,7 @@ func newNetworkWithConfig(configFunc func(*Config), peers ...stateMachine) *netw
learners[i] = true
}
v.id = id
v.trk = tracker.MakeProgressTracker(&v.config, v.trk.MaxInflight, v.trk.MaxInflightBytes)
v.trk = tracker.MakeProgressTracker(&v.config)
if len(learners) > 0 {
v.config.Learners = map[pb.PeerID]struct{}{}
}
Expand Down
13 changes: 4 additions & 9 deletions pkg/raft/tracker/tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,19 +34,14 @@ type ProgressTracker struct {
Progress ProgressMap

Votes map[pb.PeerID]bool

MaxInflight int
MaxInflightBytes uint64
}

// MakeProgressTracker initializes a ProgressTracker.
func MakeProgressTracker(config *quorum.Config, maxInflight int, maxBytes uint64) ProgressTracker {
func MakeProgressTracker(config *quorum.Config) ProgressTracker {
p := ProgressTracker{
MaxInflight: maxInflight,
MaxInflightBytes: maxBytes,
Config: config,
Votes: map[pb.PeerID]bool{},
Progress: map[pb.PeerID]*Progress{},
Config: config,
Votes: map[pb.PeerID]bool{},
Progress: map[pb.PeerID]*Progress{},
}
return p
}
Expand Down
6 changes: 4 additions & 2 deletions pkg/roachpb/metadata_replicas_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -340,8 +340,10 @@ func TestReplicaDescriptorsCanMakeProgressRandom(t *testing.T) {
emptyCfg := quorum.MakeEmptyConfig()
cfg, _, err := confchange.Restore(
confchange.Changer{
Config: emptyCfg,
Tracker: tracker.MakeProgressTracker(&emptyCfg, 1, 0),
Config: emptyCfg,
ProgressMap: tracker.MakeProgressTracker(&emptyCfg).Progress,
MaxInflight: 1,
MaxInflightBytes: 0,
},
rng.ConfState(),
)
Expand Down

0 comments on commit ec7a9ce

Please sign in to comment.