From ec7a9ce02f16aacd965618e4ae38c48847e36604 Mon Sep 17 00:00:00 2001 From: Arul Ajmani Date: Thu, 18 Jul 2024 18:46:06 -0400 Subject: [PATCH] raft: use ProgressMap instead of ProgressTracker for config changes This patch pulls MaxInflight and MaxInflightBytes up a level, onto the raft struct. This then allows us to only copy over the ProgressMap, instead of the entire ProgressTracker, when constructing a Changer for config changes. This way, there's only one Config on the changer (which is updated during a configuration change), so we can no longer accidentally access the wrong one. Informs https://github.com/cockroachdb/cockroach/issues/125265 Release note: None --- .../client_atomic_membership_change_test.go | 8 ++-- pkg/raft/confchange/confchange.go | 12 +++--- pkg/raft/confchange/datadriven_test.go | 21 +++++----- pkg/raft/confchange/quick_test.go | 35 ++++++++--------- pkg/raft/confchange/restore.go | 6 +-- pkg/raft/confchange/restore_test.go | 10 ++--- pkg/raft/raft.go | 38 ++++++++++++------- pkg/raft/raft_flow_control_test.go | 8 ++-- pkg/raft/raft_test.go | 2 +- pkg/raft/tracker/tracker.go | 13 ++----- pkg/roachpb/metadata_replicas_test.go | 6 ++- 11 files changed, 86 insertions(+), 73 deletions(-) diff --git a/pkg/kv/kvserver/client_atomic_membership_change_test.go b/pkg/kv/kvserver/client_atomic_membership_change_test.go index 0e1348c1389e..6410ec78ef3f 100644 --- a/pkg/kv/kvserver/client_atomic_membership_change_test.go +++ b/pkg/kv/kvserver/client_atomic_membership_change_test.go @@ -89,9 +89,11 @@ func TestAtomicReplicationChange(t *testing.T) { // a hair earlier. 
cfg := quorum.MakeEmptyConfig() cfg, _, err = confchange.Restore(confchange.Changer{ - Tracker: tracker.MakeProgressTracker(&cfg, 1, 0), - Config: cfg, - LastIndex: 1, + ProgressMap: tracker.MakeProgressTracker(&cfg).Progress, + Config: cfg, + MaxInflight: 1, + MaxInflightBytes: 0, + LastIndex: 1, }, desc.Replicas().ConfState()) require.NoError(t, err) act := r.RaftStatus().Config.Voters diff --git a/pkg/raft/confchange/confchange.go b/pkg/raft/confchange/confchange.go index 06378db6671a..8e204b4ffc3d 100644 --- a/pkg/raft/confchange/confchange.go +++ b/pkg/raft/confchange/confchange.go @@ -32,9 +32,11 @@ import ( // refusing invalid configuration changes before they affect the active // configuration. type Changer struct { - Config quorum.Config - Tracker tracker.ProgressTracker - LastIndex uint64 + Config quorum.Config + ProgressMap tracker.ProgressMap + MaxInflight int + MaxInflightBytes uint64 + LastIndex uint64 } // EnterJoint verifies that the outgoing (=right) majority config of the joint @@ -271,7 +273,7 @@ func (c Changer) initProgress( // making the first index the better choice). Match: 0, Next: max(c.LastIndex, 1), // invariant: Match < Next - Inflights: tracker.NewInflights(c.Tracker.MaxInflight, c.Tracker.MaxInflightBytes), + Inflights: tracker.NewInflights(c.MaxInflight, c.MaxInflightBytes), IsLearner: isLearner, // When a node is first added, we should mark it as recently active. // Otherwise, CheckQuorum may cause us to step down if it is invoked @@ -348,7 +350,7 @@ func (c Changer) checkAndCopy() (quorum.Config, tracker.ProgressMap, error) { cfg := c.Config.Clone() trk := tracker.ProgressMap{} - for id, pr := range c.Tracker.Progress { + for id, pr := range c.ProgressMap { // A shallow copy is enough because we only mutate the Learner field. 
ppr := *pr trk[id] = &ppr diff --git a/pkg/raft/confchange/datadriven_test.go b/pkg/raft/confchange/datadriven_test.go index dc983f64fb2a..9bf2cd0d151a 100644 --- a/pkg/raft/confchange/datadriven_test.go +++ b/pkg/raft/confchange/datadriven_test.go @@ -33,11 +33,12 @@ import ( func TestConfChangeDataDriven(t *testing.T) { datadriven.Walk(t, "testdata", func(t *testing.T, path string) { cfg := quorum.MakeEmptyConfig() - tr := tracker.MakeProgressTracker(&cfg, 10, 0) c := Changer{ - Config: cfg, - Tracker: tr, - LastIndex: 0, // incremented in this test with each cmd + Config: cfg, + ProgressMap: tracker.MakeProgressTracker(&cfg).Progress, + MaxInflight: 10, + MaxInflightBytes: 0, + LastIndex: 0, // incremented in this test with each cmd } // The test files use the commands @@ -85,22 +86,22 @@ func TestConfChangeDataDriven(t *testing.T) { } var cfg quorum.Config - var trk tracker.ProgressMap + var progressMap tracker.ProgressMap var err error switch d.Cmd { case "simple": - cfg, trk, err = c.Simple(ccs...) + cfg, progressMap, err = c.Simple(ccs...) case "enter-joint": var autoLeave bool if len(d.CmdArgs) > 0 { d.ScanArgs(t, "autoleave", &autoLeave) } - cfg, trk, err = c.EnterJoint(autoLeave, ccs...) + cfg, progressMap, err = c.EnterJoint(autoLeave, ccs...) 
case "leave-joint": if len(ccs) > 0 { err = errors.New("this command takes no input") } else { - cfg, trk, err = c.LeaveJoint() + cfg, progressMap, err = c.LeaveJoint() } default: return "unknown command" @@ -108,8 +109,8 @@ func TestConfChangeDataDriven(t *testing.T) { if err != nil { return err.Error() + "\n" } - c.Config, c.Tracker.Progress = cfg, trk - return fmt.Sprintf("%s\n%s", c.Config, c.Tracker.Progress) + c.Config, c.ProgressMap = cfg, progressMap + return fmt.Sprintf("%s\n%s", c.Config, c.ProgressMap) }) }) } diff --git a/pkg/raft/confchange/quick_test.go b/pkg/raft/confchange/quick_test.go index 021fb1006a89..72ca0b4aea28 100644 --- a/pkg/raft/confchange/quick_test.go +++ b/pkg/raft/confchange/quick_test.go @@ -41,7 +41,7 @@ func TestConfChangeQuick(t *testing.T) { const infoCount = 5 runWithJoint := func(c *Changer, ccs []pb.ConfChangeSingle) error { - cfg, trk, err := c.EnterJoint(false /* autoLeave */, ccs...) + cfg, progressMap, err := c.EnterJoint(false /* autoLeave */, ccs...) if err != nil { return err } @@ -52,29 +52,29 @@ func TestConfChangeQuick(t *testing.T) { return err } cfg2a.AutoLeave = false - if !reflect.DeepEqual(cfg, cfg2a) || !reflect.DeepEqual(trk, trk2a) { - return fmt.Errorf("cfg: %+v\ncfg2a: %+v\ntrk: %+v\ntrk2a: %+v", - cfg, cfg2a, trk, trk2a) + if !reflect.DeepEqual(cfg, cfg2a) || !reflect.DeepEqual(progressMap, trk2a) { + return fmt.Errorf("cfg: %+v\ncfg2a: %+v\nprogressMap: %+v\ntrk2a: %+v", + cfg, cfg2a, progressMap, trk2a) } c.Config = cfg - c.Tracker.Progress = trk + c.ProgressMap = progressMap cfg2b, trk2b, err := c.LeaveJoint() if err != nil { return err } // Reset back to the main branch with autoLeave=false. 
c.Config = cfg - c.Tracker.Progress = trk - cfg, trk, err = c.LeaveJoint() + c.ProgressMap = progressMap + cfg, progressMap, err = c.LeaveJoint() if err != nil { return err } - if !reflect.DeepEqual(cfg, cfg2b) || !reflect.DeepEqual(trk, trk2b) { - return fmt.Errorf("cfg: %+v\ncfg2b: %+v\ntrk: %+v\ntrk2b: %+v", - cfg, cfg2b, trk, trk2b) + if !reflect.DeepEqual(cfg, cfg2b) || !reflect.DeepEqual(progressMap, trk2b) { + return fmt.Errorf("cfg: %+v\ncfg2b: %+v\nprogressMap: %+v\ntrk2b: %+v", + cfg, cfg2b, progressMap, trk2b) } c.Config = cfg - c.Tracker.Progress = trk + c.ProgressMap = progressMap return nil } @@ -84,7 +84,7 @@ func TestConfChangeQuick(t *testing.T) { if err != nil { return err } - c.Config, c.Tracker.Progress = cfg, trk + c.Config, c.ProgressMap = cfg, trk } return nil } @@ -94,11 +94,12 @@ func TestConfChangeQuick(t *testing.T) { wrapper := func(invoke testFunc) func(setup initialChanges, ccs confChanges) (*Changer, error) { return func(setup initialChanges, ccs confChanges) (*Changer, error) { cfg := quorum.MakeEmptyConfig() - tr := tracker.MakeProgressTracker(&cfg, 10, 0) c := &Changer{ - Config: cfg, - Tracker: tr, - LastIndex: 10, + Config: cfg, + ProgressMap: tracker.MakeProgressTracker(&cfg).Progress, + MaxInflight: 10, + MaxInflightBytes: 0, + LastIndex: 10, } if err := runWithSimple(c, setup); err != nil { @@ -120,7 +121,7 @@ func TestConfChangeQuick(t *testing.T) { t.Log("initial setup:", Describe(setup...)) t.Log("changes:", Describe(ccs...)) t.Log(c.Config) - t.Log(c.Tracker.Progress) + t.Log(c.ProgressMap) } n++ return c diff --git a/pkg/raft/confchange/restore.go b/pkg/raft/confchange/restore.go index 5d85a35466ec..5ac80a3c25b4 100644 --- a/pkg/raft/confchange/restore.go +++ b/pkg/raft/confchange/restore.go @@ -104,14 +104,14 @@ func chain( chg Changer, ops ...func(Changer) (quorum.Config, tracker.ProgressMap, error), ) (quorum.Config, tracker.ProgressMap, error) { for _, op := range ops { - cfg, trk, err := op(chg) + cfg, progressMap, 
err := op(chg) if err != nil { return quorum.Config{}, nil, err } chg.Config = cfg - chg.Tracker.Progress = trk + chg.ProgressMap = progressMap } - return chg.Config, chg.Tracker.Progress, nil + return chg.Config, chg.ProgressMap, nil } // Restore takes a Changer (which must represent an empty configuration), and diff --git a/pkg/raft/confchange/restore_test.go b/pkg/raft/confchange/restore_test.go index ca18adb4400c..58e4dbc30ceb 100644 --- a/pkg/raft/confchange/restore_test.go +++ b/pkg/raft/confchange/restore_test.go @@ -91,17 +91,17 @@ func TestRestore(t *testing.T) { f := func(cs pb.ConfState) bool { cfg := quorum.MakeEmptyConfig() chg := Changer{ - Config: cfg, - Tracker: tracker.MakeProgressTracker(&cfg, 20, 0), - LastIndex: 10, + Config: cfg, + ProgressMap: tracker.MakeProgressTracker(&cfg).Progress, + LastIndex: 10, } - cfg, trk, err := Restore(chg, cs) + cfg, progressMap, err := Restore(chg, cs) if err != nil { t.Error(err) return false } chg.Config = cfg - chg.Tracker.Progress = trk + chg.ProgressMap = progressMap for _, sl := range [][]pb.PeerID{ cs.Voters, diff --git a/pkg/raft/raft.go b/pkg/raft/raft.go index 200d2acc3474..348c8ba1311c 100644 --- a/pkg/raft/raft.go +++ b/pkg/raft/raft.go @@ -382,8 +382,10 @@ type raft struct { // only leader keeps heartbeatElapsed. 
heartbeatElapsed int - checkQuorum bool - preVote bool + maxInflight int + maxInflightBytes uint64 + checkQuorum bool + preVote bool heartbeatTimeout int electionTimeout int @@ -422,6 +424,8 @@ func newRaft(c *Config) *raft { electionTimeout: c.ElectionTick, heartbeatTimeout: c.HeartbeatTick, logger: c.Logger, + maxInflight: c.MaxInflightMsgs, + maxInflightBytes: c.MaxInflightBytes, checkQuorum: c.CheckQuorum, preVote: c.PreVote, disableProposalForwarding: c.DisableProposalForwarding, @@ -431,12 +435,14 @@ func newRaft(c *Config) *raft { } lastID := r.raftLog.lastEntryID() - r.trk = tracker.MakeProgressTracker(&r.config, c.MaxInflightMsgs, c.MaxInflightBytes) + r.trk = tracker.MakeProgressTracker(&r.config) cfg, trk, err := confchange.Restore(confchange.Changer{ - Config: r.config, - Tracker: r.trk, - LastIndex: lastID.index, + Config: r.config, + ProgressMap: r.trk.Progress, + MaxInflight: r.maxInflight, + MaxInflightBytes: r.maxInflightBytes, + LastIndex: lastID.index, }, cs) if err != nil { panic(err) @@ -762,7 +768,7 @@ func (r *raft) reset(term uint64) { *pr = tracker.Progress{ Match: 0, Next: r.raftLog.lastIndex() + 1, - Inflights: tracker.NewInflights(r.trk.MaxInflight, r.trk.MaxInflightBytes), + Inflights: tracker.NewInflights(r.maxInflight, r.maxInflightBytes), IsLearner: pr.IsLearner, } if id == r.id { @@ -1904,11 +1910,13 @@ func (r *raft) restore(s snapshot) bool { // Reset the configuration and add the (potentially updated) peers in anew. 
r.config = quorum.MakeEmptyConfig() - r.trk = tracker.MakeProgressTracker(&r.config, r.trk.MaxInflight, r.trk.MaxInflightBytes) + r.trk = tracker.MakeProgressTracker(&r.config) cfg, trk, err := confchange.Restore(confchange.Changer{ - Config: r.config, - Tracker: r.trk, - LastIndex: r.raftLog.lastIndex(), + Config: r.config, + ProgressMap: r.trk.Progress, + MaxInflight: r.maxInflight, + MaxInflightBytes: r.maxInflightBytes, + LastIndex: r.raftLog.lastIndex(), }, cs) if err != nil { @@ -1935,9 +1943,11 @@ func (r *raft) promotable() bool { func (r *raft) applyConfChange(cc pb.ConfChangeV2) pb.ConfState { cfg, trk, err := func() (quorum.Config, tracker.ProgressMap, error) { changer := confchange.Changer{ - Config: r.config, - Tracker: r.trk, - LastIndex: r.raftLog.lastIndex(), + Config: r.config, + ProgressMap: r.trk.Progress, + MaxInflight: r.maxInflight, + MaxInflightBytes: r.maxInflightBytes, + LastIndex: r.raftLog.lastIndex(), } if cc.LeaveJoint() { return changer.LeaveJoint() diff --git a/pkg/raft/raft_flow_control_test.go b/pkg/raft/raft_flow_control_test.go index 191431c7055d..acd6aa6c858f 100644 --- a/pkg/raft/raft_flow_control_test.go +++ b/pkg/raft/raft_flow_control_test.go @@ -36,7 +36,7 @@ func TestMsgAppFlowControlFull(t *testing.T) { // force the progress to be in replicate state pr2.BecomeReplicate() // fill in the inflights window - for i := 0; i < r.trk.MaxInflight; i++ { + for i := 0; i < r.maxInflight; i++ { r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("somedata")}}}) ms := r.readMessages() if len(ms) != 1 || ms[0].Type != pb.MsgApp { @@ -72,14 +72,14 @@ func TestMsgAppFlowControlMoveForward(t *testing.T) { // force the progress to be in replicate state pr2.BecomeReplicate() // fill in the inflights window - for i := 0; i < r.trk.MaxInflight; i++ { + for i := 0; i < r.maxInflight; i++ { r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("somedata")}}}) r.readMessages() } // 
1 is noop, 2 is the first proposal we just sent. // so we start with 2. - for tt := 2; tt < r.trk.MaxInflight; tt++ { + for tt := 2; tt < r.maxInflight; tt++ { // move forward the window r.Step(pb.Message{From: 2, To: 1, Type: pb.MsgAppResp, Index: uint64(tt)}) r.readMessages() @@ -117,7 +117,7 @@ func TestMsgAppFlowControlRecvHeartbeat(t *testing.T) { // force the progress to be in replicate state pr2.BecomeReplicate() // fill in the inflights window - for i := 0; i < r.trk.MaxInflight; i++ { + for i := 0; i < r.maxInflight; i++ { r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("somedata")}}}) r.readMessages() } diff --git a/pkg/raft/raft_test.go b/pkg/raft/raft_test.go index 47eed59e5f95..f1ab34cc93bd 100644 --- a/pkg/raft/raft_test.go +++ b/pkg/raft/raft_test.go @@ -3750,7 +3750,7 @@ func newNetworkWithConfig(configFunc func(*Config), peers ...stateMachine) *netw learners[i] = true } v.id = id - v.trk = tracker.MakeProgressTracker(&v.config, v.trk.MaxInflight, v.trk.MaxInflightBytes) + v.trk = tracker.MakeProgressTracker(&v.config) if len(learners) > 0 { v.config.Learners = map[pb.PeerID]struct{}{} } diff --git a/pkg/raft/tracker/tracker.go b/pkg/raft/tracker/tracker.go index 551e6c9515fc..4393e65a2f06 100644 --- a/pkg/raft/tracker/tracker.go +++ b/pkg/raft/tracker/tracker.go @@ -34,19 +34,14 @@ type ProgressTracker struct { Progress ProgressMap Votes map[pb.PeerID]bool - - MaxInflight int - MaxInflightBytes uint64 } // MakeProgressTracker initializes a ProgressTracker. 
-func MakeProgressTracker(config *quorum.Config, maxInflight int, maxBytes uint64) ProgressTracker { +func MakeProgressTracker(config *quorum.Config) ProgressTracker { p := ProgressTracker{ - MaxInflight: maxInflight, - MaxInflightBytes: maxBytes, - Config: config, - Votes: map[pb.PeerID]bool{}, - Progress: map[pb.PeerID]*Progress{}, + Config: config, + Votes: map[pb.PeerID]bool{}, + Progress: map[pb.PeerID]*Progress{}, } return p } diff --git a/pkg/roachpb/metadata_replicas_test.go b/pkg/roachpb/metadata_replicas_test.go index f135838996a8..2a950629fe7d 100644 --- a/pkg/roachpb/metadata_replicas_test.go +++ b/pkg/roachpb/metadata_replicas_test.go @@ -340,8 +340,10 @@ func TestReplicaDescriptorsCanMakeProgressRandom(t *testing.T) { emptyCfg := quorum.MakeEmptyConfig() cfg, _, err := confchange.Restore( confchange.Changer{ - Config: emptyCfg, - Tracker: tracker.MakeProgressTracker(&emptyCfg, 1, 0), + Config: emptyCfg, + ProgressMap: tracker.MakeProgressTracker(&emptyCfg).Progress, + MaxInflight: 1, + MaxInflightBytes: 0, }, rng.ConfState(), )