Skip to content

Commit

Permalink
raft: allow use of joint quorums
Browse files Browse the repository at this point in the history
This change introduces joint quorums by changing the Node and RawNode
API to accept pb.ConfChangeV2 (on top of pb.ConfChange).

pb.ConfChange continues to work as today: it allows carrying out a
single configuration change. A pb.ConfChange proposal gets added to
the Raft log as such and is thus also observed by the app during Ready
handling, and fed back to ApplyConfChange.

ConfChangeV2 allows joint configuration changes but will continue to
carry out configuration changes in "one phase" (i.e. without ever
entering a joint config) when this is possible.
  • Loading branch information
tbg committed Jul 22, 2019
1 parent a5b5e61 commit 2ebe0f3
Show file tree
Hide file tree
Showing 16 changed files with 544 additions and 140 deletions.
4 changes: 2 additions & 2 deletions etcdserver/raft_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,8 +178,8 @@ func TestStopRaftWhenWaitingForApplyDone(t *testing.T) {
}
}

// TestConfgChangeBlocksApply ensures apply blocks if committed entries contain config-change.
func TestConfgChangeBlocksApply(t *testing.T) {
// TestConfigChangeBlocksApply ensures apply blocks if committed entries contain config-change.
func TestConfigChangeBlocksApply(t *testing.T) {
n := newNopReadyNode()

r := newRaftNode(raftNodeConfig{
Expand Down
2 changes: 1 addition & 1 deletion raft/bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ func (rn *RawNode) Bootstrap(peers []Peer) error {
// the invariant that committed < unstable?
rn.raft.raftLog.committed = uint64(len(ents))
for _, peer := range peers {
rn.raft.applyConfChange(pb.ConfChange{NodeID: peer.ID, Type: pb.ConfChangeAddNode})
rn.raft.applyConfChange(pb.ConfChange{NodeID: peer.ID, Type: pb.ConfChangeAddNode}.AsV2())
}
return nil
}
10 changes: 7 additions & 3 deletions raft/confchange/confchange.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ type Changer struct {
// (Section 4.3) corresponds to `C_{new,old}`.
//
// [1]: https://github.com/ongardie/dissertation/blob/master/online-trim.pdf
func (c Changer) EnterJoint(ccs ...pb.ConfChangeSingle) (tracker.Config, tracker.ProgressMap, error) {
func (c Changer) EnterJoint(autoLeave bool, ccs ...pb.ConfChangeSingle) (tracker.Config, tracker.ProgressMap, error) {
cfg, prs, err := c.checkAndCopy()
if err != nil {
return c.err(err)
Expand Down Expand Up @@ -74,7 +74,7 @@ func (c Changer) EnterJoint(ccs ...pb.ConfChangeSingle) (tracker.Config, tracker
if err := c.apply(&cfg, prs, ccs...); err != nil {
return c.err(err)
}

cfg.AutoLeave = autoLeave
return checkAndReturn(cfg, prs)
}

Expand Down Expand Up @@ -120,6 +120,7 @@ func (c Changer) LeaveJoint() (tracker.Config, tracker.ProgressMap, error) {
}
}
*outgoingPtr(&cfg.Voters) = nil
cfg.AutoLeave = false

return checkAndReturn(cfg, prs)
}
Expand All @@ -142,7 +143,7 @@ func (c Changer) Simple(ccs ...pb.ConfChangeSingle) (tracker.Config, tracker.Pro
return c.err(err)
}
if n := symdiff(incoming(c.Tracker.Voters), incoming(cfg.Voters)); n > 1 {
return tracker.Config{}, nil, errors.New("more than voter changed without entering joint config")
return tracker.Config{}, nil, errors.New("more than one voter changed without entering joint config")
}
if err := checkInvariants(cfg, prs); err != nil {
return tracker.Config{}, tracker.ProgressMap{}, nil
Expand Down Expand Up @@ -327,6 +328,9 @@ func checkInvariants(cfg tracker.Config, prs tracker.ProgressMap) error {
if cfg.LearnersNext != nil {
return fmt.Errorf("LearnersNext must be nil when not joint")
}
if cfg.AutoLeave {
return fmt.Errorf("AutoLeave must be false when not joint")
}
}

return nil
Expand Down
6 changes: 5 additions & 1 deletion raft/confchange/datadriven_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,11 @@ func TestConfChangeDataDriven(t *testing.T) {
case "simple":
cfg, prs, err = c.Simple(ccs...)
case "enter-joint":
cfg, prs, err = c.EnterJoint(ccs...)
var autoLeave bool
if len(d.CmdArgs) > 0 {
d.ScanArgs(t, "autoleave", &autoLeave)
}
cfg, prs, err = c.EnterJoint(autoLeave, ccs...)
case "leave-joint":
if len(ccs) > 0 {
err = errors.New("this command takes no input")
Expand Down
2 changes: 1 addition & 1 deletion raft/confchange/quick_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ func TestConfChangeQuick(t *testing.T) {
const infoCount = 5

runWithJoint := func(c *Changer, ccs []pb.ConfChangeSingle) error {
cfg, prs, err := c.EnterJoint(ccs...)
cfg, prs, err := c.EnterJoint(false /* autoLeave */, ccs...)
if err != nil {
return err
}
Expand Down
29 changes: 29 additions & 0 deletions raft/confchange/testdata/joint_autoleave.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
# Test the autoleave argument to EnterJoint. It defaults to false in the
# datadriven tests. The flag has no associated semantics in this package,
# it is simply passed through.
simple
v1
----
voters=(1)
1: StateProbe match=0 next=1

# Autoleave is reflected in the config.
enter-joint autoleave=true
v2 v3
----
voters=(1 2 3)&&(1) autoleave
1: StateProbe match=0 next=1
2: StateProbe match=0 next=2
3: StateProbe match=0 next=2

# Can't enter-joint twice, even if autoleave changes.
enter-joint autoleave=false
----
config is already joint

leave-joint
----
voters=(1 2 3)
1: StateProbe match=0 next=1
2: StateProbe match=0 next=2
3: StateProbe match=0 next=2
6 changes: 3 additions & 3 deletions raft/confchange/testdata/simple_safety.txt
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ voters=(1 2) learners=(3)
simple
r1 v5
----
more than voter changed without entering joint config
more than one voter changed without entering joint config

simple
r1 r2
Expand All @@ -30,12 +30,12 @@ removed all voters
simple
v3 v4
----
more than voter changed without entering joint config
more than one voter changed without entering joint config

simple
l1 v5
----
more than voter changed without entering joint config
more than one voter changed without entering joint config

simple
l1 l2
Expand Down
75 changes: 57 additions & 18 deletions raft/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,10 +132,20 @@ type Node interface {
// Propose proposes that data be appended to the log. Note that proposals can be lost without
// notice, therefore it is user's job to ensure proposal retries.
Propose(ctx context.Context, data []byte) error
// ProposeConfChange proposes config change.
// At most one ConfChange can be in the process of going through consensus.
// Application needs to call ApplyConfChange when applying EntryConfChange type entry.
ProposeConfChange(ctx context.Context, cc pb.ConfChange) error
// ProposeConfChange proposes a configuration change. Like any proposal, the
// configuration change may be dropped with or without an error being
// returned. In particular, configuration changes are dropped unless the
// leader has certainty that there is no prior unapplied configuration
// change in its log.
//
// The method accepts either a pb.ConfChange (deprecated) or pb.ConfChangeV2
// message. The latter allows arbitrary configuration changes via joint
// consensus, notably including replacing a voter. Passing a ConfChangeV2
// message is only allowed if all Nodes participating in the cluster run a
// version of this library aware of the V2 API. See pb.ConfChangeV2 for
// usage details and semantics.
ProposeConfChange(ctx context.Context, cc pb.ConfChangeI) error

// Step advances the state machine using the given message. ctx.Err() will be returned, if any.
Step(ctx context.Context, msg pb.Message) error

Expand All @@ -156,11 +166,13 @@ type Node interface {
// a long time to apply the snapshot data. To continue receiving Ready without blocking raft
// progress, it can call Advance before finishing applying the last ready.
Advance()
// ApplyConfChange applies config change to the local node.
// Returns an opaque ConfState protobuf which must be recorded
// in snapshots. Will never return nil; it returns a pointer only
// to match MemoryStorage.Compact.
ApplyConfChange(cc pb.ConfChange) *pb.ConfState
// ApplyConfChange applies a config change (previously passed to
// ProposeConfChange) to the node. This must be called whenever a config
// change is observed in Ready.CommittedEntries.
//
// Returns an opaque non-nil ConfState protobuf which must be recorded in
// snapshots.
ApplyConfChange(cc pb.ConfChangeI) *pb.ConfState

// TransferLeadership attempts to transfer leadership to the given transferee.
TransferLeadership(ctx context.Context, lead, transferee uint64)
Expand Down Expand Up @@ -240,7 +252,7 @@ type msgWithResult struct {
type node struct {
propc chan msgWithResult
recvc chan pb.Message
confc chan pb.ConfChange
confc chan pb.ConfChangeV2
confstatec chan pb.ConfState
readyc chan Ready
advancec chan struct{}
Expand All @@ -256,7 +268,7 @@ func newNode() node {
return node{
propc: make(chan msgWithResult),
recvc: make(chan pb.Message),
confc: make(chan pb.ConfChange),
confc: make(chan pb.ConfChangeV2),
confstatec: make(chan pb.ConfState),
readyc: make(chan Ready),
advancec: make(chan struct{}),
Expand Down Expand Up @@ -344,8 +356,16 @@ func (n *node) run(rn *RawNode) {
cs := r.applyConfChange(cc)
if _, ok := r.prs.Progress[r.id]; !ok {
// block incoming proposal when local node is
// removed
if cc.NodeID == r.id {
// removed.
all := append([]uint64(nil), cs.Nodes...)
all = append(all, cs.NodesJoint...)
var found bool
for _, id := range all {
if id == r.id {
found = true
}
}
if !found {
propc = nil
}
}
Expand Down Expand Up @@ -397,12 +417,31 @@ func (n *node) Step(ctx context.Context, m pb.Message) error {
return n.step(ctx, m)
}

func (n *node) ProposeConfChange(ctx context.Context, cc pb.ConfChange) error {
data, err := cc.Marshal()
func confChangeToMsg(cc pb.ConfChangeI) (pb.Message, error) {
var typ pb.EntryType
var data []byte
var err error
if ccc, legacy := cc.AsV1(); legacy {
typ = pb.EntryConfChange
data, err = ccc.Marshal()
} else {
ccc := cc.AsV2()

typ = pb.EntryConfChangeV2
data, err = ccc.Marshal()
}
if err != nil {
return pb.Message{}, err
}
return pb.Message{Type: pb.MsgProp, Entries: []pb.Entry{{Type: typ, Data: data}}}, nil
}

func (n *node) ProposeConfChange(ctx context.Context, cc pb.ConfChangeI) error {
msg, err := confChangeToMsg(cc)
if err != nil {
return err
}
return n.Step(ctx, pb.Message{Type: pb.MsgProp, Entries: []pb.Entry{{Type: pb.EntryConfChange, Data: data}}})
return n.Step(ctx, msg)
}

func (n *node) step(ctx context.Context, m pb.Message) error {
Expand Down Expand Up @@ -463,10 +502,10 @@ func (n *node) Advance() {
}
}

func (n *node) ApplyConfChange(cc pb.ConfChange) *pb.ConfState {
func (n *node) ApplyConfChange(cc pb.ConfChangeI) *pb.ConfState {
var cs pb.ConfState
select {
case n.confc <- cc:
case n.confc <- cc.AsV2():
case <-n.done:
}
select {
Expand Down
10 changes: 10 additions & 0 deletions raft/quorum/majority.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,16 @@ func (c MajorityConfig) Describe(l AckedIndexer) string {
return buf.String()
}

// Slice returns the MajorityConfig as a sorted slice.
func (c MajorityConfig) Slice() []uint64 {
var sl []uint64
for id := range c {
sl = append(sl, id)
}
sort.Slice(sl, func(i, j int) bool { return sl[i] < sl[j] })
return sl
}

type uint64Slice []uint64

func insertionSort(sl uint64Slice) {
Expand Down
Loading

0 comments on commit 2ebe0f3

Please sign in to comment.