From 888670c8caf587ad616ea686f91fd452335b2db7 Mon Sep 17 00:00:00 2001 From: Tobias Schottdorf Date: Wed, 31 Jul 2019 11:39:06 +0200 Subject: [PATCH] (do not review - look at #11002 #11003 #11004 instead) --- raft/confchange/confchange.go | 5 +- raft/confchange/datadriven_test.go | 35 +------- raft/confchange/restore.go | 124 +++++++++++++++++++++++++++++ raft/confchange/restore_test.go | 110 +++++++++++++++++++++++++ raft/quorum/majority.go | 6 +- raft/raft.go | 64 ++++++++------- raft/raftpb/confchange.go | 41 ++++++++++ raft/tracker/tracker.go | 39 ++++++++- raft/util.go | 7 -- 9 files changed, 354 insertions(+), 77 deletions(-) create mode 100644 raft/confchange/restore.go create mode 100644 raft/confchange/restore_test.go diff --git a/raft/confchange/confchange.go b/raft/confchange/confchange.go index bfb2033c941d..58c74bfb53c8 100644 --- a/raft/confchange/confchange.go +++ b/raft/confchange/confchange.go @@ -62,10 +62,7 @@ func (c Changer) EnterJoint(autoLeave bool, ccs ...pb.ConfChangeSingle) (tracker return c.err(err) } // Clear the outgoing config. - { - *outgoingPtr(&cfg.Voters) = quorum.MajorityConfig{} - - } + *outgoingPtr(&cfg.Voters) = quorum.MajorityConfig{} // Copy incoming to outgoing. for id := range incoming(cfg.Voters) { outgoing(cfg.Voters)[id] = struct{}{} diff --git a/raft/confchange/datadriven_test.go b/raft/confchange/datadriven_test.go index 063d927dac8c..c20b48c41a87 100644 --- a/raft/confchange/datadriven_test.go +++ b/raft/confchange/datadriven_test.go @@ -17,8 +17,6 @@ package confchange import ( "errors" "fmt" - "strconv" - "strings" "testing" "github.com/cockroachdb/datadriven" @@ -48,39 +46,14 @@ func TestConfChangeDataDriven(t *testing.T) { defer func() { c.LastIndex++ }() - var ccs []pb.ConfChangeSingle - toks := strings.Split(strings.TrimSpace(d.Input), " ") - if toks[0] == "" { - toks = nil - } - for _, tok := range toks { - if len(tok) < 2 { - return fmt.Sprintf("unknown token %s", tok) - } - var cc pb.ConfChangeSingle - switch tok[0] { - case 'v': - cc.Type = pb.ConfChangeAddNode - case 'l': - cc.Type = pb.ConfChangeAddLearnerNode - case 'r': - cc.Type = pb.ConfChangeRemoveNode - case 'u': - cc.Type = pb.ConfChangeUpdateNode - default: - return fmt.Sprintf("unknown input: %s", tok) - } - id, err := strconv.ParseUint(tok[1:], 10, 64) - if err != nil { - return err.Error() - } - cc.NodeID = id - ccs = append(ccs, cc) + + ccs, err := pb.ConfChangeFromString(d.Input) + if err != nil { + t.Fatal(err) } var cfg tracker.Config var prs tracker.ProgressMap - var err error switch d.Cmd { case "simple": cfg, prs, err = c.Simple(ccs...) diff --git a/raft/confchange/restore.go b/raft/confchange/restore.go new file mode 100644 index 000000000000..3b63efcd8038 --- /dev/null +++ b/raft/confchange/restore.go @@ -0,0 +1,124 @@ +// Copyright 2019 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package confchange + +import ( + pb "go.etcd.io/etcd/raft/raftpb" + "go.etcd.io/etcd/raft/tracker" +) + +// toConfChangeSingle translates a conf state into 1) a slice of operations creating +// first the config that will become the outgoing one, and then the incoming one, and +// b) another slice that, when applied to the config resulted from 1), represents the +// ConfState. +func toConfChangeSingle(cs pb.ConfState) (out []pb.ConfChangeSingle, in []pb.ConfChangeSingle) { + for _, id := range cs.VotersOutgoing { + // If there are outgoing voters, first add them one by one so that the + // (non-joint) config has them all. + out = append(out, pb.ConfChangeSingle{ + Type: pb.ConfChangeAddNode, + NodeID: id, + }) + + } + + // NB: we're done constructing the outgoing slice, now on to the incoming + // one (which will apply on top of the config created by the outgoing slice). + + // First, we'll remove all of the outgoing voters. + for _, id := range cs.VotersOutgoing { + in = append(in, pb.ConfChangeSingle{ + Type: pb.ConfChangeRemoveNode, + NodeID: id, + }) + } + // Then we'll add the incoming voters and learners. + for _, id := range cs.Voters { + in = append(in, pb.ConfChangeSingle{ + Type: pb.ConfChangeAddNode, + NodeID: id, + }) + } + for _, id := range cs.Learners { + in = append(in, pb.ConfChangeSingle{ + Type: pb.ConfChangeAddLearnerNode, + NodeID: id, + }) + } + // Same for LearnersNext; these are nodes we want to be learners but which + // are currently voters in the outgoing config. + for _, id := range cs.LearnersNext { + in = append(in, pb.ConfChangeSingle{ + Type: pb.ConfChangeAddLearnerNode, + NodeID: id, + }) + } + return out, in +} + +func chain(chg Changer, ops ...func(Changer) (tracker.Config, tracker.ProgressMap, error)) (tracker.Config, tracker.ProgressMap, error) { + for _, op := range ops { + cfg, prs, err := op(chg) + if err != nil { + return tracker.Config{}, nil, err + } + chg.Tracker.Config = cfg + chg.Tracker.Progress = prs + } + return chg.Tracker.Config, chg.Tracker.Progress, nil +} + +// Restore takes a Changer (which must represent an empty configuration), and +// brings returns the state described by the supplied ConfState. +// +// TODO(tbg) it's silly that this takes a Changer. Unravel this by making sure +// the Changer only needs a ProgressMap (not a whole Tracker) at which point +// this can just take LastIndex and MaxInflight directly instead and cook up +// the results from that alone. +func Restore(chg Changer, cs pb.ConfState) (tracker.Config, tracker.ProgressMap, error) { + outgoing, incoming := toConfChangeSingle(cs) + + var ops []func(Changer) (tracker.Config, tracker.ProgressMap, error) + + // First, apply all of the changes of the outgoing config one by one (so that + // it temporarily becomes the incoming active config. For example, if the + // config is (1 2 3)&(2 3 4), this will establish (2 3 4)&(). + for _, cc := range outgoing { + cc := cc // loop-local copy + ops = append(ops, func(chg Changer) (tracker.Config, tracker.ProgressMap, error) { + return chg.Simple(cc) + }) + } + + if len(outgoing) > 0 { + // If we added anything above, enter the joint state, which rotates the + // above additions into the outgoing config, and adds the incoming config + // in. Continuing the example above, we'd get (1 2 3)&(2 3 4), i.e. the + // incoming operations would be removing 2,3,4 and then adding in 1,2,3. + ops = append(ops, func(chg Changer) (tracker.Config, tracker.ProgressMap, error) { + return chg.EnterJoint(cs.AutoLeave, incoming...) + }) + } else { + // No outgoing config, so just apply the incoming changes one by one. + for _, cc := range incoming { + cc := cc // loop-local copy + ops = append(ops, func(chg Changer) (tracker.Config, tracker.ProgressMap, error) { + return chg.Simple(cc) + }) + } + } + + return chain(chg, ops...) +} diff --git a/raft/confchange/restore_test.go b/raft/confchange/restore_test.go new file mode 100644 index 000000000000..bce489fb41d9 --- /dev/null +++ b/raft/confchange/restore_test.go @@ -0,0 +1,110 @@ +// Copyright 2019 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package confchange + +import ( + "math/rand" + "reflect" + "sort" + "testing" + "testing/quick" + + pb "go.etcd.io/etcd/raft/raftpb" + "go.etcd.io/etcd/raft/tracker" +) + +type rndConfChange pb.ConfState + +// Generate creates a random (valid) ConfState for use with quickcheck. +func (_ rndConfChange) Generate(rand *rand.Rand, _ int) reflect.Value { + conv := func(sl []int) []uint64 { + out := make([]uint64, len(sl)) + for i := range sl { + out[i] = uint64(sl[i] + 1) + } + return out + } + var cs pb.ConfState + nVoters := 1 + rand.Intn(5) // at least one voter + nLearners := rand.Intn(5) + nRemovedVoters := rand.Intn(3) + + ids := conv(rand.Perm(2 * (nVoters + nLearners + nRemovedVoters))) + + cs.Voters = ids[:nVoters] + ids = ids[nVoters:] + + if nLearners > 0 { + cs.Learners = ids[:nLearners] + ids = ids[nLearners:] + } + + nOutgoingRetainedVoters := rand.Intn(nVoters) + if nOutgoingRetainedVoters > 0 || nRemovedVoters > 0 { + cs.VotersOutgoing = append([]uint64(nil), cs.Voters[:nOutgoingRetainedVoters]...) + cs.VotersOutgoing = append(cs.VotersOutgoing, ids[:nRemovedVoters]...) + } + // Next learners must be removed voters (i.e. demotions). + if nRemovedVoters > 0 { + nLearnersNext := rand.Intn(nRemovedVoters) + if nLearnersNext > 0 { + cs.LearnersNext = ids[:nLearnersNext] + } + } + + cs.AutoLeave = len(cs.VotersOutgoing) > 0 && rand.Intn(2) == 1 + return reflect.ValueOf(rndConfChange(cs)) +} + +func TestRestore(t *testing.T) { + cfg := quick.Config{MaxCount: 1000} + + f := func(rndCS rndConfChange) bool { + cs := pb.ConfState(rndCS) + chg := Changer{ + Tracker: tracker.MakeProgressTracker(20), + LastIndex: 10, + } + cfg, prs, err := Restore(chg, cs) + if err != nil { + t.Error(err) + return false + } + chg.Tracker.Config = cfg + chg.Tracker.Progress = prs + + for _, sl := range [][]uint64{ + cs.Voters, + cs.Learners, + cs.VotersOutgoing, + cs.LearnersNext, + } { + sort.Slice(sl, func(i, j int) bool { return sl[i] < sl[j] }) + } + + cs2 := chg.Tracker.ConfState() + if reflect.DeepEqual(cs, cs2) { + return true // success + } + t.Errorf(` +before: %+#v +after: %+#v`, cs, cs2) + return false + } + + if err := quick.Check(f, &cfg); err != nil { + t.Error(err) + } +} diff --git a/raft/quorum/majority.go b/raft/quorum/majority.go index 9b10e95838ec..a53f9a4d8f93 100644 --- a/raft/quorum/majority.go +++ b/raft/quorum/majority.go @@ -112,9 +112,7 @@ func (c MajorityConfig) Slice() []uint64 { return sl } -type uint64Slice []uint64 - -func insertionSort(sl uint64Slice) { +func insertionSort(sl []uint64) { a, b := 0, len(sl) for i := a + 1; i < b; i++ { for j := i; j > a && sl[j] < sl[j-1]; j-- { @@ -141,7 +139,7 @@ func (c MajorityConfig) CommittedIndex(l AckedIndexer) Index { // performance is a lesser concern (additionally the performance // implications of an allocation here are far from drastic). var stk [7]uint64 - srt := uint64Slice(stk[:]) + srt := []uint64(stk[:]) if cap(srt) < n { srt = make([]uint64, n) diff --git a/raft/raft.go b/raft/raft.go index 832bff368ab6..9823b30f10c5 100644 --- a/raft/raft.go +++ b/raft/raft.go @@ -263,7 +263,8 @@ type raft struct { maxMsgSize uint64 maxUncommittedSize uint64 - prs tracker.ProgressTracker + // TODO(tbg): rename to trk. + prs tracker.ProgressTracker state StateType @@ -327,18 +328,18 @@ func newRaft(c *Config) *raft { if err != nil { panic(err) // TODO(bdarnell) } - peers := c.peers - learners := c.learners - if len(cs.Voters) > 0 || len(cs.Learners) > 0 { - if len(peers) > 0 || len(learners) > 0 { + + if len(c.peers) > 0 || len(c.learners) > 0 { + if len(cs.Voters) > 0 || len(cs.Learners) > 0 { // TODO(bdarnell): the peers argument is always nil except in // tests; the argument should be removed and these tests should be // updated to specify their nodes through a snapshot. panic("cannot specify both newRaft(peers, learners) and ConfState.(Voters, Learners)") } - peers = cs.Voters - learners = cs.Learners + cs.Voters = c.peers + cs.Learners = c.learners } + r := &raft{ id: c.ID, lead: None, @@ -355,14 +356,14 @@ func newRaft(c *Config) *raft { readOnly: newReadOnly(c.ReadOnlyOption), disableProposalForwarding: c.DisableProposalForwarding, } - for _, p := range peers { - // Add node to active config. - r.applyConfChange(pb.ConfChange{Type: pb.ConfChangeAddNode, NodeID: p}.AsV2()) - } - for _, p := range learners { - // Add learner to active config. - r.applyConfChange(pb.ConfChange{Type: pb.ConfChangeAddLearnerNode, NodeID: p}.AsV2()) + + if r.prs.Config, r.prs.Progress, err = confchange.Restore(confchange.Changer{ + Tracker: r.prs, + LastIndex: raftlog.lastIndex(), + }, cs); err != nil { + panic(err) } + r.applyConfChangePost() if !isHardStateEqual(hs, emptyState) { r.loadState(hs) @@ -1418,13 +1419,20 @@ func (r *raft) restore(s pb.Snapshot) bool { // Reset the configuration and add the (potentially updated) peers in anew. r.prs = tracker.MakeProgressTracker(r.prs.MaxInflight) - for _, id := range s.Metadata.ConfState.Voters { - r.applyConfChange(pb.ConfChange{NodeID: id, Type: pb.ConfChangeAddNode}.AsV2()) - } - for _, id := range s.Metadata.ConfState.Learners { - r.applyConfChange(pb.ConfChange{NodeID: id, Type: pb.ConfChangeAddLearnerNode}.AsV2()) + cfg, prs, err := confchange.Restore(confchange.Changer{ + Tracker: r.prs, + LastIndex: r.raftLog.lastIndex(), + }, cs) + + if err != nil { + // This should never happen. Either there's a bug in our config change + // handling or the client corrupted the conf change. + panic(fmt.Sprintf("unable to restore config %+v: %s", cs, err)) } + r.prs.Config, r.prs.Progress = cfg, prs + r.applyConfChangePost() + pr := r.prs.Progress[r.id] pr.MaybeUpdate(pr.Next - 1) // TODO(tbg): this is untested and likely unneeded @@ -1461,17 +1469,14 @@ func (r *raft) applyConfChange(cc pb.ConfChangeV2) pb.ConfState { r.prs.Config = cfg r.prs.Progress = prs + return r.applyConfChangePost() +} +// applyConfChangePost is called from restore and applyConfChange to update any +// updates resulting from having changed the active configuration. +func (r *raft) applyConfChangePost() pb.ConfState { r.logger.Infof("%x switched to configuration %s", r.id, r.prs.Config) - // Now that the configuration is updated, handle any side effects. - - cs := pb.ConfState{ - Voters: r.prs.Voters[0].Slice(), - VotersOutgoing: r.prs.Voters[1].Slice(), - Learners: quorum.MajorityConfig(r.prs.Learners).Slice(), - LearnersNext: quorum.MajorityConfig(r.prs.LearnersNext).Slice(), - AutoLeave: r.prs.AutoLeave, - } + cs := r.prs.ConfState() pr, ok := r.prs.Progress[r.id] // Update whether the node itself is a learner, resetting to false when the @@ -1496,6 +1501,9 @@ func (r *raft) applyConfChange(cc pb.ConfChangeV2) pb.ConfState { if r.state != StateLeader || len(cs.Voters) == 0 { return cs } + // TODO(tbg): bcastAppend needs to be called more aggressively. Even if + // nothing new is known committed to the leader, new followers may profit + // from getting the updated index proactively. if r.maybeCommit() { // The quorum size may have been reduced (but not to zero), so see if // any pending entries can be committed. diff --git a/raft/raftpb/confchange.go b/raft/raftpb/confchange.go index a91c18dc12c4..74a73dc1e49f 100644 --- a/raft/raftpb/confchange.go +++ b/raft/raftpb/confchange.go @@ -16,6 +16,8 @@ package raftpb import ( "fmt" + "strconv" + "strings" "github.com/gogo/protobuf/proto" ) @@ -103,3 +105,42 @@ func (c *ConfChangeV2) LeaveJoint() bool { cpy.Context = nil return proto.Equal(&cpy, &ConfChangeV2{}) } + +// ConfChangeFromString parses a Space-delimited sequence of operations into a +// slice of ConfChangeSingle. The supported operations are: +// - vn: make n a voter, +// - ln: make n a learner, +// - rn: remove n, and +// - un: update n. +func ConfChangeFromString(s string) ([]ConfChangeSingle, error) { + var ccs []ConfChangeSingle + toks := strings.Split(strings.TrimSpace(s), " ") + if toks[0] == "" { + toks = nil + } + for _, tok := range toks { + if len(tok) < 2 { + return nil, fmt.Errorf("unknown token %s", tok) + } + var cc ConfChangeSingle + switch tok[0] { + case 'v': + cc.Type = ConfChangeAddNode + case 'l': + cc.Type = ConfChangeAddLearnerNode + case 'r': + cc.Type = ConfChangeRemoveNode + case 'u': + cc.Type = ConfChangeUpdateNode + default: + return nil, fmt.Errorf("unknown input: %s", tok) + } + id, err := strconv.ParseUint(tok[1:], 10, 64) + if err != nil { + return nil, err + } + cc.NodeID = id + ccs = append(ccs, cc) + } + return ccs, nil +} diff --git a/raft/tracker/tracker.go b/raft/tracker/tracker.go index f67f3aa53dab..e5f51ba3fc36 100644 --- a/raft/tracker/tracker.go +++ b/raft/tracker/tracker.go @@ -20,6 +20,7 @@ import ( "strings" "go.etcd.io/etcd/raft/quorum" + pb "go.etcd.io/etcd/raft/raftpb" ) // Config reflects the configuration tracked in a ProgressTracker. @@ -141,6 +142,17 @@ func MakeProgressTracker(maxInflight int) ProgressTracker { return p } +// ConfState returns a ConfState representing the active configuration. +func (p *ProgressTracker) ConfState() pb.ConfState { + return pb.ConfState{ + Voters: p.Voters[0].Slice(), + VotersOutgoing: p.Voters[1].Slice(), + Learners: quorum.MajorityConfig(p.Learners).Slice(), + LearnersNext: quorum.MajorityConfig(p.LearnersNext).Slice(), + AutoLeave: p.AutoLeave, + } +} + // IsSingleton returns true if (and only if) there is only one voting member // (i.e. the leader) in the current configuration. func (p *ProgressTracker) IsSingleton() bool { @@ -166,10 +178,31 @@ func (p *ProgressTracker) Committed() uint64 { return uint64(p.Voters.CommittedIndex(matchAckIndexer(p.Progress))) } -// Visit invokes the supplied closure for all tracked progresses. +func insertionSort(sl []uint64) { + a, b := 0, len(sl) + for i := a + 1; i < b; i++ { + for j := i; j > a && sl[j] < sl[j-1]; j-- { + sl[j], sl[j-1] = sl[j-1], sl[j] + } + } +} + +// Visit invokes the supplied closure for all tracked progresses in stable order. func (p *ProgressTracker) Visit(f func(id uint64, pr *Progress)) { - for id, pr := range p.Progress { - f(id, pr) + n := len(p.Progress) + var sl [7]uint64 + ids := sl[:] + if n > len(sl) { + ids = make([]uint64, n) + } + ids = sl[:n] + for id := range p.Progress { + n-- + ids[n] = id + } + insertionSort(ids) + for _, id := range ids { + f(id, p.Progress[id]) } } diff --git a/raft/util.go b/raft/util.go index c145d26dd7f4..c44e3550a4b1 100644 --- a/raft/util.go +++ b/raft/util.go @@ -25,13 +25,6 @@ func (st StateType) MarshalJSON() ([]byte, error) { return []byte(fmt.Sprintf("%q", st.String())), nil } -// uint64Slice implements sort interface -type uint64Slice []uint64 - -func (p uint64Slice) Len() int { return len(p) } -func (p uint64Slice) Less(i, j int) bool { return p[i] < p[j] } -func (p uint64Slice) Swap(i, j int) { p[i], p[j] = p[j], p[i] } - func min(a, b uint64) uint64 { if a > b { return b