Skip to content

Commit

Permalink
Merge pull request #268 from ktsakalozos/MK-1304/failure-domains
Browse files Browse the repository at this point in the history
Implement role re-assignment based on failure domains
  • Loading branch information
cole-miller authored Sep 7, 2023
2 parents f54a152 + 64d79b3 commit 64c24c7
Show file tree
Hide file tree
Showing 3 changed files with 178 additions and 21 deletions.
2 changes: 1 addition & 1 deletion app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -365,7 +365,7 @@ func (a *App) Handover(ctx context.Context) error {
return fmt.Errorf("cluster servers: %w", err)
}
changes := a.makeRolesChanges(nodes)
voters := changes.list(client.Voter, true)
voters := changes.list(client.Voter, true, nil)

for i, voter := range voters {
if voter.Address == a.address {
Expand Down
75 changes: 73 additions & 2 deletions app/app_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -632,6 +632,78 @@ func TestRolesAdjustment_ReplaceVoterHonorFailureDomain(t *testing.T) {
assert.Equal(t, client.Voter, cluster[5].Role)
}

// If the cluster is imbalanced (all voters in one failure domain), roles get
// re-shuffled so that every failure domain ends up with a voter.
func TestRolesAdjustment_ImbalancedFailureDomain(t *testing.T) {
	n := 8
	apps := make([]*app.App, n)

	// The first half of the nodes goes to failure domain 0 and the second
	// half to failure domain 1. The same rule is used both when starting
	// the nodes and when checking the final roles.
	failureDomain := func(i int) int {
		if i >= n/2 {
			return 1
		}
		return 0
	}

	for i := 0; i < n; i++ {
		addr := fmt.Sprintf("127.0.0.1:900%d", i+1)
		options := []app.Option{
			app.WithAddress(addr),
			app.WithRolesAdjustmentFrequency(4 * time.Second),
			app.WithFailureDomain(uint64(failureDomain(i))),
		}
		if i > 0 {
			options = append(options, app.WithCluster([]string{"127.0.0.1:9001"}))
		}

		// Nodes on failure domain 0 are started first so all voters are initially there.
		node, cleanup := newApp(t, options...)
		// Deferred inside the loop on purpose: every node must stay up
		// until the test returns, and must be cleaned up even if a later
		// node fails to become ready.
		defer cleanup()

		require.NoError(t, node.Ready(context.Background()))

		apps[i] = node
	}

	for i := 0; i < n; i++ {
		cli, err := apps[i].Client(context.Background())
		require.NoError(t, err)
		defer cli.Close()
		require.NoError(t, cli.Weight(context.Background(), uint64(n-i)))
	}

	// Give the roles adjustment loop enough rounds to rebalance the voters.
	time.Sleep(18 * time.Second)

	cli, err := apps[0].Leader(context.Background())
	require.NoError(t, err)
	defer cli.Close()

	cluster, err := cli.Cluster(context.Background())
	require.NoError(t, err)
	require.Len(t, cluster, n)

	domain := map[int]bool{
		0: false,
		1: false,
	}
	for i := 0; i < n; i++ {
		if cluster[i].Role == client.Voter {
			domain[failureDomain(i)] = true
		}
	}

	// Every failure domain must have a voter.
	for fd, hasVoter := range domain {
		assert.True(t, hasVoter, "failure domain %d has no voter", fd)
	}
}

// If a voter goes offline, another node takes its place. Preference will be
// given to candidates with lower weights.
func TestRolesAdjustment_ReplaceVoterHonorWeight(t *testing.T) {
Expand Down Expand Up @@ -1042,7 +1114,6 @@ func TestExternalConnWithTCP(t *testing.T) {
assert.Equal(t, client.Voter, cluster[2].Role)
}


// TestExternalConnWithPipe creates a 3-member cluster using net.Pipe
// and ensures the cluster is successfully created, and that the connection is
// handled manually.
Expand All @@ -1062,7 +1133,7 @@ func TestExternalConnWithPipe(t *testing.T) {

dialFunc := func(_ context.Context, addr string) (net.Conn, error) {
client, server := net.Pipe()

dialChannels[addr] <- server

return client, nil
Expand Down
122 changes: 104 additions & 18 deletions app/roles.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,8 @@ func (c *RolesChanges) Assume(id uint64) client.NodeRole {
return -1
}

onlineVoters := c.list(client.Voter, true)
onlineStandbys := c.list(client.StandBy, true)
onlineVoters := c.list(client.Voter, true, nil)
onlineStandbys := c.list(client.StandBy, true, nil)

// If we have already the desired number of online voters and
// stand-bys, there's nothing to do.
Expand Down Expand Up @@ -94,7 +94,7 @@ func (c *RolesChanges) Handover(id uint64) (client.NodeRole, []client.NodeInfo)

// Make a list of all online nodes with the same role and get their
// failure domains.
peers := c.list(node.Role, true)
peers := c.list(node.Role, true, nil)
for i := range peers {
if peers[i].ID == node.ID {
peers = append(peers[:i], peers[i+1:]...)
Expand All @@ -104,12 +104,12 @@ func (c *RolesChanges) Handover(id uint64) (client.NodeRole, []client.NodeInfo)
domains := c.failureDomains(peers)

// Online spare nodes are always candidates.
candidates := c.list(client.Spare, true)
candidates := c.list(client.Spare, true, nil)

// Stand-by nodes are candidates if we need to transfer voting
// rights, and they are preferred over spares.
if node.Role == client.Voter {
candidates = append(c.list(client.StandBy, true), candidates...)
candidates = append(c.list(client.StandBy, true, nil), candidates...)
}

if len(candidates) == 0 {
Expand Down Expand Up @@ -142,10 +142,28 @@ func (c *RolesChanges) Adjust(leader uint64) (client.NodeRole, []client.NodeInfo
return -1, nil
}

onlineVoters := c.list(client.Voter, true)
onlineStandbys := c.list(client.StandBy, true)
offlineVoters := c.list(client.Voter, false)
offlineStandbys := c.list(client.StandBy, false)
onlineVoters := c.list(client.Voter, true, nil)
onlineStandbys := c.list(client.StandBy, true, nil)
offlineVoters := c.list(client.Voter, false, nil)
offlineStandbys := c.list(client.StandBy, false, nil)

domainsWithVoters := c.failureDomains(onlineVoters)
allDomains := c.allFailureDomains()

// If we do not have voters on all failure domains and some domain has more than one voter,
// we may need to send voters to the domains without voters.
if len(domainsWithVoters) < len(allDomains) && len(domainsWithVoters) < len(onlineVoters) {
// Find the domains we need to populate with voters
domainsWithoutVoters := c.domainsSubtract(allDomains, domainsWithVoters)
// Find nodes in the domains we need to populate
candidates := c.list(client.StandBy, true, domainsWithoutVoters)
candidates = append(candidates, c.list(client.Spare, true, domainsWithoutVoters)...)

if len(candidates) > 0 {
c.sortCandidates(candidates, domainsWithoutVoters)
return client.Voter, candidates
}
}

// If we have exactly the desired number of voters and stand-bys, and they are all
// online, we're good.
Expand All @@ -156,16 +174,15 @@ func (c *RolesChanges) Adjust(leader uint64) (client.NodeRole, []client.NodeInfo
// If we have less online voters than desired, let's try to promote
// some other node.
if n := len(onlineVoters); n < c.Config.Voters {
candidates := c.list(client.StandBy, true)
candidates = append(candidates, c.list(client.Spare, true)...)
candidates := c.list(client.StandBy, true, nil)
candidates = append(candidates, c.list(client.Spare, true, nil)...)

if len(candidates) == 0 {
return -1, nil
}

domains := c.failureDomains(onlineVoters)
c.sortCandidates(candidates, domains)

return client.Voter, candidates
}

Expand All @@ -181,7 +198,7 @@ func (c *RolesChanges) Adjust(leader uint64) (client.NodeRole, []client.NodeInfo
nodes = append(nodes, node)
}

return client.Spare, nodes
return client.Spare, c.sortVoterCandidatesToDemote(nodes)
}

// If we have offline voters, let's demote one of them.
Expand All @@ -192,7 +209,7 @@ func (c *RolesChanges) Adjust(leader uint64) (client.NodeRole, []client.NodeInfo
// If we have less online stand-bys than desired, let's try to promote
// some other node.
if n := len(onlineStandbys); n < c.Config.StandBys {
candidates := c.list(client.Spare, true)
candidates := c.list(client.Spare, true, nil)

if len(candidates) == 0 {
return -1, nil
Expand Down Expand Up @@ -243,20 +260,22 @@ func (c *RolesChanges) get(id uint64) *client.NodeInfo {
return nil
}

// Return the online or offline nodes with the given role.
func (c *RolesChanges) list(role client.NodeRole, online bool) []client.NodeInfo {
// Return the online or offline nodes with the given role (optionally) in specific domains.
func (c *RolesChanges) list(role client.NodeRole, online bool, domains map[uint64]bool) []client.NodeInfo {
nodes := []client.NodeInfo{}
for node, metadata := range c.State {
if node.Role == role && metadata != nil == online {
nodes = append(nodes, node)
if domains == nil || (domains != nil && domains[metadata.FailureDomain]) {
nodes = append(nodes, node)
}
}
}
return nodes
}

// Return the number of online or offline nodes with the given role,
// without any failure domain filtering.
func (c *RolesChanges) count(role client.NodeRole, online bool) int {
	return len(c.list(role, online, nil))
}

// Return a map of the failure domains associated with the
Expand All @@ -273,6 +292,30 @@ func (c *RolesChanges) failureDomains(nodes []client.NodeInfo) map[uint64]bool {
return domains
}

// Return the set of failure domains that host at least one online node,
// i.e. a node that has metadata in c.State. Every key maps to true.
func (c *RolesChanges) allFailureDomains() map[uint64]bool {
	online := make(map[uint64]bool)
	for _, md := range c.State {
		if md != nil {
			online[md.FailureDomain] = true
		}
	}
	return online
}

// Return the entries of "from" whose key does not appear in "subtract".
// A key present in "subtract" is excluded whatever value it maps to; the
// values of the remaining entries are copied over from "from" unchanged.
func (c *RolesChanges) domainsSubtract(from map[uint64]bool, subtract map[uint64]bool) map[uint64]bool {
	remaining := make(map[uint64]bool)
	for fd, val := range from {
		if _, excluded := subtract[fd]; excluded {
			continue
		}
		remaining[fd] = val
	}
	return remaining
}

// Sort the given candidates according to their failure domain and
// weight. Candidates belonging to a failure domain different from the given
// domains take precedence.
Expand All @@ -299,6 +342,49 @@ func (c *RolesChanges) sortCandidates(candidates []client.NodeInfo, domains map[
sort.Slice(candidates, less)
}

// Sort the given candidates according to demotion priority and return the
// sorted list, most preferred candidate first. The given slice is not
// modified.
//
// We prefer to select a candidate from a failure domain with multiple
// candidates, so domains holding more candidates come first. Within a domain
// we prefer to select the candidate with the highest weight.
//
// Ties are broken by failure domain and then by node ID, so that the result
// does not depend on map iteration order.
//
// Every candidate must have metadata, since its failure domain and weight are
// read from it.
func (c *RolesChanges) sortVoterCandidatesToDemote(candidates []client.NodeInfo) []client.NodeInfo {
	// Group the candidates by failure domain.
	byDomain := make(map[uint64][]client.NodeInfo)
	for _, node := range candidates {
		fd := c.metadata(node).FailureDomain
		byDomain[fd] = append(byDomain[fd], node)
	}

	// Order the domains from most to least populated.
	domainIDs := make([]uint64, 0, len(byDomain))
	for fd := range byDomain {
		domainIDs = append(domainIDs, fd)
	}
	sort.Slice(domainIDs, func(i, j int) bool {
		a, b := domainIDs[i], domainIDs[j]
		if len(byDomain[a]) != len(byDomain[b]) {
			return len(byDomain[a]) > len(byDomain[b])
		}
		return a < b
	})

	// Within each domain, order the nodes from heaviest to lightest and
	// concatenate the domains in order.
	sortedCandidates := make([]client.NodeInfo, 0, len(candidates))
	for _, fd := range domainIDs {
		domain := byDomain[fd]
		sort.Slice(domain, func(i, j int) bool {
			weight1 := c.metadata(domain[i]).Weight
			weight2 := c.metadata(domain[j]).Weight
			if weight1 != weight2 {
				return weight1 > weight2
			}
			return domain[i].ID < domain[j].ID
		})
		sortedCandidates = append(sortedCandidates, domain...)
	}

	return sortedCandidates
}

// Return the metadata of the given node, if any.
func (c *RolesChanges) metadata(node client.NodeInfo) *client.NodeMetadata {
return c.State[node]
Expand Down

0 comments on commit 64c24c7

Please sign in to comment.