Skip to content

Commit

Permalink
Merge #127312
Browse files Browse the repository at this point in the history
127312: raft: move Config onto the raft struct  r=nvanbenschoten a=arulajmani

See individual commits for details.

Co-authored-by: Arul Ajmani <[email protected]>
  • Loading branch information
craig[bot] and arulajmani committed Jul 19, 2024
2 parents 727ee2f + 6268e37 commit 05cbf0f
Show file tree
Hide file tree
Showing 18 changed files with 306 additions and 252 deletions.
1 change: 1 addition & 0 deletions pkg/kv/kvserver/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -443,6 +443,7 @@ go_test(
"//pkg/multitenant/tenantcapabilities/tenantcapabilitiesauthorizer",
"//pkg/raft",
"//pkg/raft/confchange",
"//pkg/raft/quorum",
"//pkg/raft/raftpb",
"//pkg/raft/raftstoreliveness",
"//pkg/raft/tracker",
Expand Down
11 changes: 8 additions & 3 deletions pkg/kv/kvserver/client_atomic_membership_change_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver"
"github.com/cockroachdb/cockroach/pkg/raft/confchange"
"github.com/cockroachdb/cockroach/pkg/raft/quorum"
"github.com/cockroachdb/cockroach/pkg/raft/tracker"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/testutils"
Expand Down Expand Up @@ -86,9 +87,13 @@ func TestAtomicReplicationChange(t *testing.T) {
// Check that conf state is up to date. This can fail even though
// the descriptor already matches since the descriptor is updated
// a hair earlier.
cfg, _, err := confchange.Restore(confchange.Changer{
Tracker: tracker.MakeProgressTracker(1, 0),
LastIndex: 1,
cfg := quorum.MakeEmptyConfig()
cfg, _, err = confchange.Restore(confchange.Changer{
ProgressMap: tracker.MakeEmptyProgressMap(),
Config: cfg,
MaxInflight: 1,
MaxInflightBytes: 0,
LastIndex: 1,
}, desc.Replicas().ConfState())
require.NoError(t, err)
act := r.RaftStatus().Config.Voters
Expand Down
1 change: 1 addition & 0 deletions pkg/raft/confchange/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ go_test(
data = glob(["testdata/**"]),
embed = [":confchange"],
deps = [
"//pkg/raft/quorum",
"//pkg/raft/raftpb",
"//pkg/raft/tracker",
"@com_github_cockroachdb_datadriven//:datadriven",
Expand Down
49 changes: 26 additions & 23 deletions pkg/raft/confchange/confchange.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,11 @@ import (
// refusing invalid configuration changes before they affect the active
// configuration.
type Changer struct {
Tracker tracker.ProgressTracker
LastIndex uint64
Config quorum.Config
ProgressMap tracker.ProgressMap
MaxInflight int
MaxInflightBytes uint64
LastIndex uint64
}

// EnterJoint verifies that the outgoing (=right) majority config of the joint
Expand All @@ -53,7 +56,7 @@ type Changer struct {
// [1]: https://github.com/ongardie/dissertation/blob/master/online-trim.pdf
func (c Changer) EnterJoint(
autoLeave bool, ccs ...pb.ConfChangeSingle,
) (tracker.Config, tracker.ProgressMap, error) {
) (quorum.Config, tracker.ProgressMap, error) {
cfg, trk, err := c.checkAndCopy()
if err != nil {
return c.err(err)
Expand Down Expand Up @@ -96,7 +99,7 @@ func (c Changer) EnterJoint(
// inserted into Learners.
//
// [1]: https://github.com/ongardie/dissertation/blob/master/online-trim.pdf
func (c Changer) LeaveJoint() (tracker.Config, tracker.ProgressMap, error) {
func (c Changer) LeaveJoint() (quorum.Config, tracker.ProgressMap, error) {
cfg, trk, err := c.checkAndCopy()
if err != nil {
return c.err(err)
Expand Down Expand Up @@ -130,7 +133,7 @@ func (c Changer) LeaveJoint() (tracker.Config, tracker.ProgressMap, error) {
// will return an error if that is not the case, if the resulting quorum is
// zero, or if the configuration is in a joint state (i.e. if there is an
// outgoing configuration).
func (c Changer) Simple(ccs ...pb.ConfChangeSingle) (tracker.Config, tracker.ProgressMap, error) {
func (c Changer) Simple(ccs ...pb.ConfChangeSingle) (quorum.Config, tracker.ProgressMap, error) {
cfg, trk, err := c.checkAndCopy()
if err != nil {
return c.err(err)
Expand All @@ -142,8 +145,8 @@ func (c Changer) Simple(ccs ...pb.ConfChangeSingle) (tracker.Config, tracker.Pro
if err := c.apply(&cfg, trk, ccs...); err != nil {
return c.err(err)
}
if n := symdiff(incoming(c.Tracker.Voters), incoming(cfg.Voters)); n > 1 {
return tracker.Config{}, nil, errors.New("more than one voter changed without entering joint config")
if n := symdiff(incoming(c.Config.Voters), incoming(cfg.Voters)); n > 1 {
return quorum.Config{}, nil, errors.New("more than one voter changed without entering joint config")
}

return checkAndReturn(cfg, trk)
Expand All @@ -153,7 +156,7 @@ func (c Changer) Simple(ccs ...pb.ConfChangeSingle) (tracker.Config, tracker.Pro
// always made to the incoming majority config Voters[0]. Voters[1] is either
// empty or preserves the outgoing majority configuration while in a joint state.
func (c Changer) apply(
cfg *tracker.Config, trk tracker.ProgressMap, ccs ...pb.ConfChangeSingle,
cfg *quorum.Config, trk tracker.ProgressMap, ccs ...pb.ConfChangeSingle,
) error {
for _, cc := range ccs {
if cc.NodeID == 0 {
Expand Down Expand Up @@ -182,7 +185,7 @@ func (c Changer) apply(

// makeVoter adds or promotes the given ID to be a voter in the incoming
// majority config.
func (c Changer) makeVoter(cfg *tracker.Config, trk tracker.ProgressMap, id pb.PeerID) {
func (c Changer) makeVoter(cfg *quorum.Config, trk tracker.ProgressMap, id pb.PeerID) {
pr := trk[id]
if pr == nil {
c.initProgress(cfg, trk, id, false /* isLearner */)
Expand All @@ -208,7 +211,7 @@ func (c Changer) makeVoter(cfg *tracker.Config, trk tracker.ProgressMap, id pb.P
// simultaneously. Instead, we add the learner to LearnersNext, so that it will
// be added to Learners the moment the outgoing config is removed by
// LeaveJoint().
func (c Changer) makeLearner(cfg *tracker.Config, trk tracker.ProgressMap, id pb.PeerID) {
func (c Changer) makeLearner(cfg *quorum.Config, trk tracker.ProgressMap, id pb.PeerID) {
pr := trk[id]
if pr == nil {
c.initProgress(cfg, trk, id, true /* isLearner */)
Expand All @@ -235,7 +238,7 @@ func (c Changer) makeLearner(cfg *tracker.Config, trk tracker.ProgressMap, id pb
}

// remove this peer as a voter or learner from the incoming config.
func (c Changer) remove(cfg *tracker.Config, trk tracker.ProgressMap, id pb.PeerID) {
func (c Changer) remove(cfg *quorum.Config, trk tracker.ProgressMap, id pb.PeerID) {
if _, ok := trk[id]; !ok {
return
}
Expand All @@ -252,7 +255,7 @@ func (c Changer) remove(cfg *tracker.Config, trk tracker.ProgressMap, id pb.Peer

// initProgress initializes a new progress for the given node or learner.
func (c Changer) initProgress(
cfg *tracker.Config, trk tracker.ProgressMap, id pb.PeerID, isLearner bool,
cfg *quorum.Config, trk tracker.ProgressMap, id pb.PeerID, isLearner bool,
) {
if !isLearner {
incoming(cfg.Voters)[id] = struct{}{}
Expand All @@ -270,7 +273,7 @@ func (c Changer) initProgress(
// making the first index the better choice).
Match: 0,
Next: max(c.LastIndex, 1), // invariant: Match < Next
Inflights: tracker.NewInflights(c.Tracker.MaxInflight, c.Tracker.MaxInflightBytes),
Inflights: tracker.NewInflights(c.MaxInflight, c.MaxInflightBytes),
IsLearner: isLearner,
// When a node is first added, we should mark it as recently active.
// Otherwise, CheckQuorum may cause us to step down if it is invoked
Expand All @@ -282,7 +285,7 @@ func (c Changer) initProgress(
// checkInvariants makes sure that the config and progress are compatible with
// each other. This is used to check both what the Changer is initialized with,
// as well as what it returns.
func checkInvariants(cfg tracker.Config, trk tracker.ProgressMap) error {
func checkInvariants(cfg quorum.Config, trk tracker.ProgressMap) error {
// NB: intentionally allow the empty config. In production we'll never see a
// non-empty config (we prevent it from being created) but we will need to
// be able to *create* an initial config, for example during bootstrap (or
Expand Down Expand Up @@ -343,11 +346,11 @@ func checkInvariants(cfg tracker.Config, trk tracker.ProgressMap) error {
// checkAndCopy copies the tracker's config and progress map (deeply enough for
// the purposes of the Changer) and returns those copies. It returns an error
// if checkInvariants does.
func (c Changer) checkAndCopy() (tracker.Config, tracker.ProgressMap, error) {
cfg := c.Tracker.Config.Clone()
func (c Changer) checkAndCopy() (quorum.Config, tracker.ProgressMap, error) {
cfg := c.Config.Clone()
trk := tracker.ProgressMap{}

for id, pr := range c.Tracker.Progress {
for id, pr := range c.ProgressMap {
// A shallow copy is enough because we only mutate the Learner field.
ppr := *pr
trk[id] = &ppr
Expand All @@ -358,17 +361,17 @@ func (c Changer) checkAndCopy() (tracker.Config, tracker.ProgressMap, error) {
// checkAndReturn calls checkInvariants on the input and returns either the
// resulting error or the input.
func checkAndReturn(
cfg tracker.Config, trk tracker.ProgressMap,
) (tracker.Config, tracker.ProgressMap, error) {
cfg quorum.Config, trk tracker.ProgressMap,
) (quorum.Config, tracker.ProgressMap, error) {
if err := checkInvariants(cfg, trk); err != nil {
return tracker.Config{}, tracker.ProgressMap{}, err
return quorum.Config{}, tracker.ProgressMap{}, err
}
return cfg, trk, nil
}

// err returns zero values and an error.
func (c Changer) err(err error) (tracker.Config, tracker.ProgressMap, error) {
return tracker.Config{}, nil, err
func (c Changer) err(err error) (quorum.Config, tracker.ProgressMap, error) {
return quorum.Config{}, nil, err
}

// nilAwareAdd populates a map entry, creating the map if necessary.
Expand Down Expand Up @@ -408,7 +411,7 @@ func symdiff(l, r map[pb.PeerID]struct{}) int {
return n
}

func joint(cfg tracker.Config) bool {
func joint(cfg quorum.Config) bool {
return len(outgoing(cfg.Voters)) > 0
}

Expand Down
23 changes: 13 additions & 10 deletions pkg/raft/confchange/datadriven_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,17 +24,20 @@ import (
"strings"
"testing"

"github.com/cockroachdb/cockroach/pkg/raft/quorum"
pb "github.com/cockroachdb/cockroach/pkg/raft/raftpb"
"github.com/cockroachdb/cockroach/pkg/raft/tracker"
"github.com/cockroachdb/datadriven"
)

func TestConfChangeDataDriven(t *testing.T) {
datadriven.Walk(t, "testdata", func(t *testing.T, path string) {
tr := tracker.MakeProgressTracker(10, 0)
c := Changer{
Tracker: tr,
LastIndex: 0, // incremented in this test with each cmd
Config: quorum.MakeEmptyConfig(),
ProgressMap: tracker.MakeEmptyProgressMap(),
MaxInflight: 10,
MaxInflightBytes: 0,
LastIndex: 0, // incremented in this test with each cmd
}

// The test files use the commands
Expand Down Expand Up @@ -81,32 +84,32 @@ func TestConfChangeDataDriven(t *testing.T) {
ccs = append(ccs, cc)
}

var cfg tracker.Config
var trk tracker.ProgressMap
var cfg quorum.Config
var progressMap tracker.ProgressMap
var err error
switch d.Cmd {
case "simple":
cfg, trk, err = c.Simple(ccs...)
cfg, progressMap, err = c.Simple(ccs...)
case "enter-joint":
var autoLeave bool
if len(d.CmdArgs) > 0 {
d.ScanArgs(t, "autoleave", &autoLeave)
}
cfg, trk, err = c.EnterJoint(autoLeave, ccs...)
cfg, progressMap, err = c.EnterJoint(autoLeave, ccs...)
case "leave-joint":
if len(ccs) > 0 {
err = errors.New("this command takes no input")
} else {
cfg, trk, err = c.LeaveJoint()
cfg, progressMap, err = c.LeaveJoint()
}
default:
return "unknown command"
}
if err != nil {
return err.Error() + "\n"
}
c.Tracker.Config, c.Tracker.Progress = cfg, trk
return fmt.Sprintf("%s\n%s", c.Tracker.Config, c.Tracker.Progress)
c.Config, c.ProgressMap = cfg, progressMap
return fmt.Sprintf("%s\n%s", c.Config, c.ProgressMap)
})
})
}
43 changes: 23 additions & 20 deletions pkg/raft/confchange/quick_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"testing"
"testing/quick"

"github.com/cockroachdb/cockroach/pkg/raft/quorum"
pb "github.com/cockroachdb/cockroach/pkg/raft/raftpb"
"github.com/cockroachdb/cockroach/pkg/raft/tracker"
)
Expand All @@ -40,7 +41,7 @@ func TestConfChangeQuick(t *testing.T) {
const infoCount = 5

runWithJoint := func(c *Changer, ccs []pb.ConfChangeSingle) error {
cfg, trk, err := c.EnterJoint(false /* autoLeave */, ccs...)
cfg, progressMap, err := c.EnterJoint(false /* autoLeave */, ccs...)
if err != nil {
return err
}
Expand All @@ -51,29 +52,29 @@ func TestConfChangeQuick(t *testing.T) {
return err
}
cfg2a.AutoLeave = false
if !reflect.DeepEqual(cfg, cfg2a) || !reflect.DeepEqual(trk, trk2a) {
return fmt.Errorf("cfg: %+v\ncfg2a: %+v\ntrk: %+v\ntrk2a: %+v",
cfg, cfg2a, trk, trk2a)
if !reflect.DeepEqual(cfg, cfg2a) || !reflect.DeepEqual(progressMap, trk2a) {
return fmt.Errorf("cfg: %+v\ncfg2a: %+v\nprogressMap: %+v\ntrk2a: %+v",
cfg, cfg2a, progressMap, trk2a)
}
c.Tracker.Config = cfg
c.Tracker.Progress = trk
c.Config = cfg
c.ProgressMap = progressMap
cfg2b, trk2b, err := c.LeaveJoint()
if err != nil {
return err
}
// Reset back to the main branch with autoLeave=false.
c.Tracker.Config = cfg
c.Tracker.Progress = trk
cfg, trk, err = c.LeaveJoint()
c.Config = cfg
c.ProgressMap = progressMap
cfg, progressMap, err = c.LeaveJoint()
if err != nil {
return err
}
if !reflect.DeepEqual(cfg, cfg2b) || !reflect.DeepEqual(trk, trk2b) {
return fmt.Errorf("cfg: %+v\ncfg2b: %+v\ntrk: %+v\ntrk2b: %+v",
cfg, cfg2b, trk, trk2b)
if !reflect.DeepEqual(cfg, cfg2b) || !reflect.DeepEqual(progressMap, trk2b) {
return fmt.Errorf("cfg: %+v\ncfg2b: %+v\nprogressMap: %+v\ntrk2b: %+v",
cfg, cfg2b, progressMap, trk2b)
}
c.Tracker.Config = cfg
c.Tracker.Progress = trk
c.Config = cfg
c.ProgressMap = progressMap
return nil
}

Expand All @@ -83,7 +84,7 @@ func TestConfChangeQuick(t *testing.T) {
if err != nil {
return err
}
c.Tracker.Config, c.Tracker.Progress = cfg, trk
c.Config, c.ProgressMap = cfg, trk
}
return nil
}
Expand All @@ -92,10 +93,12 @@ func TestConfChangeQuick(t *testing.T) {

wrapper := func(invoke testFunc) func(setup initialChanges, ccs confChanges) (*Changer, error) {
return func(setup initialChanges, ccs confChanges) (*Changer, error) {
tr := tracker.MakeProgressTracker(10, 0)
c := &Changer{
Tracker: tr,
LastIndex: 10,
Config: quorum.MakeEmptyConfig(),
ProgressMap: tracker.MakeEmptyProgressMap(),
MaxInflight: 10,
MaxInflightBytes: 0,
LastIndex: 10,
}

if err := runWithSimple(c, setup); err != nil {
Expand All @@ -116,8 +119,8 @@ func TestConfChangeQuick(t *testing.T) {
if n < infoCount {
t.Log("initial setup:", Describe(setup...))
t.Log("changes:", Describe(ccs...))
t.Log(c.Tracker.Config)
t.Log(c.Tracker.Progress)
t.Log(c.Config)
t.Log(c.ProgressMap)
}
n++
return c
Expand Down
Loading

0 comments on commit 05cbf0f

Please sign in to comment.