raft: introducing transport package
This package is a separate gRPC transport layer for the raft package. Previously, we
used the membership package plus one very large method in the raft package.

Signed-off-by: Alexander Morozov <[email protected]>
LK4D4 committed Jan 10, 2017
1 parent 62d835f commit acee090
Showing 12 changed files with 1,479 additions and 538 deletions.
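The commit message above describes pulling the gRPC plumbing out of the raft and membership packages into a dedicated transport package. The sketch below is only a rough illustration of what such a transport boundary typically looks like; the type and method names are hypothetical and are not the actual API introduced by this commit.

```go
package transport

import (
	"github.com/coreos/etcd/raft/raftpb"
	"golang.org/x/net/context"
)

// Raft is the surface the transport needs from the raft node: a way to hand
// received messages back and to report peers it cannot reach.
type Raft interface {
	ReceiveMessage(ctx context.Context, m *raftpb.Message) error
	ReportUnreachable(id uint64)
}

// Transport is the peer-management and send surface the raft package would
// use instead of dialing gRPC connections itself.
type Transport interface {
	AddPeer(id uint64, addr string) error
	UpdatePeer(id uint64, addr string) error
	RemovePeer(id uint64) error
	Send(ctx context.Context, msgs ...raftpb.Message) error
	Stop()
}
```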
15 changes: 15 additions & 0 deletions log/context.go
@@ -29,6 +29,21 @@ func WithLogger(ctx context.Context, logger *logrus.Entry) context.Context {
return context.WithValue(ctx, loggerKey{}, logger)
}

// WithFields returns a new context with the given fields added to its logger.
func WithFields(ctx context.Context, fields logrus.Fields) context.Context {
logger := ctx.Value(loggerKey{})

if logger == nil {
logger = L
}
return WithLogger(ctx, logger.(*logrus.Entry).WithFields(fields))
}

// WithField is a convenience wrapper around WithFields.
func WithField(ctx context.Context, key, value string) context.Context {
return WithFields(ctx, logrus.Fields{key: value})
}

// GetLogger retrieves the current logger from the context. If no logger is
// available, the default logger is returned.
func GetLogger(ctx context.Context) *logrus.Entry {
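The new WithFields/WithField helpers layer fields onto whatever logger is carried by the context, falling back to the package default when none is set. A minimal usage sketch, using only the signatures shown above (the field values are made up, and the logrus import path may differ in the vendored tree):

```go
package main

import (
	"github.com/docker/swarmkit/log"
	"github.com/sirupsen/logrus"
	"golang.org/x/net/context"
)

func main() {
	ctx := context.Background()

	// Attach several fields to the logger carried by the context.
	ctx = log.WithFields(ctx, logrus.Fields{"module": "raft", "raft_id": "1"})

	// WithField is shorthand for a single key/value pair.
	ctx = log.WithField(ctx, "method", "ProcessRaftMessage")

	// GetLogger falls back to the default logger if the context has none.
	log.GetLogger(ctx).Info("message received")
}
```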
13 changes: 10 additions & 3 deletions manager/controlapi/node_test.go
@@ -373,9 +373,6 @@ func TestListManagerNodes(t *testing.T) {
return nil
}))

// Switch the raft node used by the server
ts.Server.raft = nodes[2].Node

// Stop node 1 (leader)
nodes[1].Server.Stop()
nodes[1].ShutdownRaft()
@@ -390,6 +387,16 @@ func TestListManagerNodes(t *testing.T) {
// Wait for the re-election to occur
raftutils.WaitForCluster(t, clockSource, newCluster)

var leaderNode *raftutils.TestNode
for _, node := range newCluster {
if node.IsLeader() {
leaderNode = node
}
}

// Switch the raft node used by the server
ts.Server.raft = leaderNode.Node

// Node 1 should not be the leader anymore
assert.NoError(t, raftutils.PollFunc(clockSource, func() error {
r, err = ts.Client.ListNodes(context.Background(), &api.ListNodesRequest{})
219 changes: 32 additions & 187 deletions manager/state/raft/membership/cluster.go
@@ -2,16 +2,12 @@ package membership

import (
"errors"
"fmt"
"sync"

"google.golang.org/grpc"

"github.com/coreos/etcd/raft/raftpb"
"github.com/docker/swarmkit/api"
"github.com/docker/swarmkit/watch"
"github.com/gogo/protobuf/proto"
"golang.org/x/net/context"
)

var (
@@ -25,101 +21,39 @@ var (
ErrConfigChangeInvalid = errors.New("membership: ConfChange type should be either AddNode, RemoveNode or UpdateNode")
// ErrCannotUnmarshalConfig is thrown when a node cannot unmarshal a configuration change
ErrCannotUnmarshalConfig = errors.New("membership: cannot unmarshal configuration change")
// ErrMemberRemoved is thrown when a node was removed from the cluster
ErrMemberRemoved = errors.New("raft: member was removed from the cluster")
)

// deferredConn used to store removed members connection for some time.
// We need this in case if removed node is redirector or endpoint of ControlAPI call.
type deferredConn struct {
tick int
conn *grpc.ClientConn
}

// Cluster represents a set of active
// raft Members
type Cluster struct {
mu sync.RWMutex
members map[uint64]*Member
deferedConns map[*deferredConn]struct{}
mu sync.RWMutex
members map[uint64]*Member

// removed contains the list of removed Members,
// those ids cannot be reused
removed map[uint64]bool
heartbeatTicks int
removed map[uint64]bool

PeersBroadcast *watch.Queue
}

// Member represents a raft Cluster Member
type Member struct {
*api.RaftMember

Conn *grpc.ClientConn
tick int
active bool
lastSeenHost string
}

// HealthCheck sends a health check RPC to the member and returns the response.
func (member *Member) HealthCheck(ctx context.Context) error {
healthClient := api.NewHealthClient(member.Conn)
resp, err := healthClient.Check(ctx, &api.HealthCheckRequest{Service: "Raft"})
if err != nil {
return err
}
if resp.Status != api.HealthCheckResponse_SERVING {
return fmt.Errorf("health check returned status %s", resp.Status.String())
}
return nil
}

// NewCluster creates a new Cluster neighbors list for a raft Member.
// Member marked as inactive if there was no call ReportActive for heartbeatInterval.
func NewCluster(heartbeatTicks int) *Cluster {
func NewCluster() *Cluster {
// TODO(abronan): generate Cluster ID for federation

return &Cluster{
members: make(map[uint64]*Member),
removed: make(map[uint64]bool),
deferedConns: make(map[*deferredConn]struct{}),
heartbeatTicks: heartbeatTicks,
PeersBroadcast: watch.NewQueue(),
}
}

func (c *Cluster) handleInactive() {
for _, m := range c.members {
if !m.active {
continue
}
m.tick++
if m.tick > c.heartbeatTicks {
m.active = false
if m.Conn != nil {
m.Conn.Close()
}
}
}
}

func (c *Cluster) handleDeferredConns() {
for dc := range c.deferedConns {
dc.tick++
if dc.tick > c.heartbeatTicks {
dc.conn.Close()
delete(c.deferedConns, dc)
}
}
}

// Tick increases ticks for all members. After heartbeatTicks node marked as
// inactive.
func (c *Cluster) Tick() {
c.mu.Lock()
defer c.mu.Unlock()
c.handleInactive()
c.handleDeferredConns()
}

// Members returns the list of raft Members in the Cluster.
func (c *Cluster) Members() map[uint64]*Member {
members := make(map[uint64]*Member)
@@ -168,8 +102,6 @@ func (c *Cluster) AddMember(member *Member) error {
if c.removed[member.RaftID] {
return ErrIDRemoved
}
member.active = true
member.tick = 0

c.members[member.RaftID] = member

@@ -187,55 +119,47 @@ func (c *Cluster) RemoveMember(id uint64) error {
return c.clearMember(id)
}

// ClearMember removes a node from the Cluster Memberlist, but does NOT add it
// to the removed list.
func (c *Cluster) ClearMember(id uint64) error {
// UpdateMember updates the member's address.
func (c *Cluster) UpdateMember(id uint64, m *api.RaftMember) error {
c.mu.Lock()
defer c.mu.Unlock()

return c.clearMember(id)
}

func (c *Cluster) clearMember(id uint64) error {
m, ok := c.members[id]
if ok {
if m.Conn != nil {
// defer connection close to after heartbeatTicks
dConn := &deferredConn{conn: m.Conn}
c.deferedConns[dConn] = struct{}{}
}
delete(c.members, id)
if c.removed[id] {
return ErrIDRemoved
}
c.broadcastUpdate()
return nil
}

// ReplaceMemberConnection replaces the member's GRPC connection.
func (c *Cluster) ReplaceMemberConnection(id uint64, oldConn *Member, newConn *Member, newAddr string, force bool) error {
c.mu.Lock()
defer c.mu.Unlock()

oldMember, ok := c.members[id]
if !ok {
return ErrIDNotFound
}

if !force && oldConn.Conn != oldMember.Conn {
// The connection was already replaced. Don't do it again.
newConn.Conn.Close()
return nil
if oldMember.NodeID != m.NodeID {
// Should never happen; this is a sanity check
return errors.New("node ID mismatch match on node update")
}

if oldMember.Conn != nil {
oldMember.Conn.Close()
if oldMember.Addr == m.Addr {
// nothing to do
return nil
}
oldMember.RaftMember = m
return nil
}

// ClearMember removes a node from the Cluster Memberlist, but does NOT add it
// to the removed list.
func (c *Cluster) ClearMember(id uint64) error {
c.mu.Lock()
defer c.mu.Unlock()

newMember := *oldMember
newMember.RaftMember = oldMember.RaftMember.Copy()
newMember.RaftMember.Addr = newAddr
newMember.Conn = newConn.Conn
c.members[id] = &newMember
return c.clearMember(id)
}

func (c *Cluster) clearMember(id uint64) error {
if _, ok := c.members[id]; ok {
delete(c.members, id)
c.broadcastUpdate()
}
return nil
}

@@ -249,60 +173,12 @@ func (c *Cluster) IsIDRemoved(id uint64) bool {
// Clear resets the list of active Members and removed Members.
func (c *Cluster) Clear() {
c.mu.Lock()
for _, member := range c.members {
if member.Conn != nil {
member.Conn.Close()
}
}

for dc := range c.deferedConns {
dc.conn.Close()
}

c.members = make(map[uint64]*Member)
c.removed = make(map[uint64]bool)
c.deferedConns = make(map[*deferredConn]struct{})
c.mu.Unlock()
}

// ReportActive reports that member is active (called ProcessRaftMessage),
func (c *Cluster) ReportActive(id uint64, sourceHost string) {
c.mu.Lock()
defer c.mu.Unlock()
m, ok := c.members[id]
if !ok {
return
}
m.tick = 0
m.active = true
if sourceHost != "" {
m.lastSeenHost = sourceHost
}
}

// Active returns true if node is active.
func (c *Cluster) Active(id uint64) bool {
c.mu.RLock()
defer c.mu.RUnlock()
m, ok := c.members[id]
if !ok {
return false
}
return m.active
}

// LastSeenHost returns the last observed source address that the specified
// member connected from.
func (c *Cluster) LastSeenHost(id uint64) string {
c.mu.RLock()
defer c.mu.RUnlock()
m, ok := c.members[id]
if ok {
return m.lastSeenHost
}
return ""
}

// ValidateConfigurationChange takes a proposed ConfChange and
// ensures that it is valid.
func (c *Cluster) ValidateConfigurationChange(cc raftpb.ConfChange) error {
@@ -334,34 +210,3 @@ func (c *Cluster) ValidateConfigurationChange(cc raftpb.ConfChange) error {
}
return nil
}

// CanRemoveMember checks if removing a Member would not result in a loss
// of quorum, this check is needed before submitting a configuration change
// that might block or harm the Cluster on Member recovery
func (c *Cluster) CanRemoveMember(from uint64, id uint64) bool {
members := c.Members()
nreachable := 0 // reachable managers after removal

for _, m := range members {
if m.RaftID == id {
continue
}

// Local node from where the remove is issued
if m.RaftID == from {
nreachable++
continue
}

if c.Active(m.RaftID) {
nreachable++
}
}

nquorum := (len(members)-1)/2 + 1
if nreachable < nquorum {
return false
}

return true
}
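With connection handling moved to the transport layer, membership.Cluster is reduced to bookkeeping of raft members. Below is a hedged usage sketch based only on the signatures visible in this diff; the member IDs and addresses are made up for illustration.

```go
package main

import (
	"fmt"

	"github.com/docker/swarmkit/api"
	"github.com/docker/swarmkit/manager/state/raft/membership"
)

func main() {
	// NewCluster no longer takes a heartbeatTicks argument.
	cls := membership.NewCluster()
	defer cls.Clear()

	// Members now carry only raft metadata; gRPC connections live elsewhere.
	if err := cls.AddMember(&membership.Member{
		RaftMember: &api.RaftMember{RaftID: 1, NodeID: "node-1", Addr: "10.0.0.1:4242"},
	}); err != nil {
		fmt.Println("add member:", err)
	}

	// UpdateMember swaps in the new RaftMember when the address changes.
	if err := cls.UpdateMember(1, &api.RaftMember{RaftID: 1, NodeID: "node-1", Addr: "10.0.0.2:4242"}); err != nil {
		fmt.Println("update member:", err)
	}

	fmt.Println("members:", len(cls.Members()))
}
```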
4 changes: 2 additions & 2 deletions manager/state/raft/membership/cluster_test.go
@@ -42,7 +42,7 @@ func newTestMember(id uint64) *membership.Member {
}

func newTestCluster(members []*membership.Member, removed []*membership.Member) *membership.Cluster {
c := membership.NewCluster(3)
c := membership.NewCluster()
for _, m := range members {
c.AddMember(m)
}
@@ -79,7 +79,7 @@ func TestClusterMember(t *testing.T) {
}

func TestMembers(t *testing.T) {
cls := membership.NewCluster(1)
cls := membership.NewCluster()
defer cls.Clear()
cls.AddMember(&membership.Member{RaftMember: &api.RaftMember{RaftID: 1}})
cls.AddMember(&membership.Member{RaftMember: &api.RaftMember{RaftID: 5}})