Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement role re-assignment based on failure domains #268

Merged
merged 3 commits into from
Sep 7, 2023
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -364,7 +364,7 @@ func (a *App) Handover(ctx context.Context) error {
return fmt.Errorf("cluster servers: %w", err)
}
changes := a.makeRolesChanges(nodes)
voters := changes.list(client.Voter, true)
voters := changes.list(client.Voter, true, nil)

for i, voter := range voters {
if voter.Address == a.address {
Expand Down
68 changes: 66 additions & 2 deletions app/app_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -632,6 +632,71 @@ func TestRolesAdjustment_ReplaceVoterHonorFailureDomain(t *testing.T) {
assert.Equal(t, client.Voter, cluster[5].Role)
}

// If cluster is imbalanced (all voters in one failure domain), roles get re-shuffled.
func TestRolesAdjustment_ImbalancedFailureDomain(t *testing.T) {
n := 8
apps := make([]*app.App, n)
cleanups := make([]func(), n)

for i := 0; i < n; i++ {
addr := fmt.Sprintf("127.0.0.1:900%d", i+1)
// Half of the nodes will go to failure domain 0 and half on failure domain 1
fd := 0
if i > n/2 {
fd = 1
}
options := []app.Option{
app.WithAddress(addr),
app.WithRolesAdjustmentFrequency(4 * time.Second),
app.WithFailureDomain(uint64(fd)),
}
if i > 0 {
options = append(options, app.WithCluster([]string{"127.0.0.1:9001"}))
}

// Nodes on failure domain 0 are started first so all voters are initially there.
app, cleanup := newApp(t, options...)

require.NoError(t, app.Ready(context.Background()))

apps[i] = app
cleanups[i] = cleanup
}

for i := 0; i < n; i++ {
defer cleanups[i]()
}

time.Sleep(18 * time.Second)

cli, err := apps[0].Leader(context.Background())
require.NoError(t, err)
defer cli.Close()

cluster, err := cli.Cluster(context.Background())
require.NoError(t, err)

domain := map[int]bool{
0: false,
1: false,
}
for i := 0; i < n; i++ {
// We know we have started half of the nodes in failure domain 0 and the other half on failure domain 1
fd := 0
if i > n/2 {
fd = 1
}
if cluster[i].Role == client.Voter {
domain[fd] = true
}
}

// All domain must have a voter
for _, voters := range domain {
assert.True(t, voters)
}
}

// If a voter goes offline, another node takes its place. Preference will be
// given to candidates with lower weights.
func TestRolesAdjustment_ReplaceVoterHonorWeight(t *testing.T) {
Expand Down Expand Up @@ -1042,7 +1107,6 @@ func TestExternalConnWithTCP(t *testing.T) {
assert.Equal(t, client.Voter, cluster[2].Role)
}


// TestExternalPipe creates a 3-member cluster using net.Pipe
// and ensures the cluster is successfully created, and that the connection is
// handled manually.
Expand All @@ -1062,7 +1126,7 @@ func TestExternalConnWithPipe(t *testing.T) {

dialFunc := func(_ context.Context, addr string) (net.Conn, error) {
client, server := net.Pipe()

dialChannels[addr] <- server

return client, nil
Expand Down
115 changes: 98 additions & 17 deletions app/roles.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,8 @@ func (c *RolesChanges) Assume(id uint64) client.NodeRole {
return -1
}

onlineVoters := c.list(client.Voter, true)
onlineStandbys := c.list(client.StandBy, true)
onlineVoters := c.list(client.Voter, true, nil)
onlineStandbys := c.list(client.StandBy, true, nil)

// If we have already the desired number of online voters and
// stand-bys, there's nothing to do.
Expand Down Expand Up @@ -94,7 +94,7 @@ func (c *RolesChanges) Handover(id uint64) (client.NodeRole, []client.NodeInfo)

// Make a list of all online nodes with the same role and get their
// failure domains.
peers := c.list(node.Role, true)
peers := c.list(node.Role, true, nil)
for i := range peers {
if peers[i].ID == node.ID {
peers = append(peers[:i], peers[i+1:]...)
Expand All @@ -104,12 +104,12 @@ func (c *RolesChanges) Handover(id uint64) (client.NodeRole, []client.NodeInfo)
domains := c.failureDomains(peers)

// Online spare nodes are always candidates.
candidates := c.list(client.Spare, true)
candidates := c.list(client.Spare, true, nil)

// Stand-by nodes are candidates if we need to transfer voting
// rights, and they are preferred over spares.
if node.Role == client.Voter {
candidates = append(c.list(client.StandBy, true), candidates...)
candidates = append(c.list(client.StandBy, true, nil), candidates...)
}

if len(candidates) == 0 {
Expand Down Expand Up @@ -142,10 +142,30 @@ func (c *RolesChanges) Adjust(leader uint64) (client.NodeRole, []client.NodeInfo
return -1, nil
}

onlineVoters := c.list(client.Voter, true)
onlineStandbys := c.list(client.StandBy, true)
offlineVoters := c.list(client.Voter, false)
offlineStandbys := c.list(client.StandBy, false)
onlineVoters := c.list(client.Voter, true, nil)
onlineStandbys := c.list(client.StandBy, true, nil)
offlineVoters := c.list(client.Voter, false, nil)
offlineStandbys := c.list(client.StandBy, false, nil)

domainsWithVoters := c.failureDomains(onlineVoters)
allDomains := c.allFailureDomains()

// If we do not have voters on all failure domains and we have a domain with more than one voters
// we may need to send voters to domains without voters.
if len(domainsWithVoters) < len(allDomains) && len(domainsWithVoters) < len(onlineVoters) {
// Find the domains we need to populate with voters
domainsWithoutVoters := c.domainsSubtract(allDomains, domainsWithVoters)
// Find StandBys in the domains we need to populate
candidates := c.list(client.StandBy, true, domainsWithoutVoters)
// If we do not have StandBys to promote see if we have Spares
if len(candidates) <= 0 {
candidates = c.list(client.Spare, true, domainsWithoutVoters)
}
if len(candidates) > 0 {
c.sortCandidates(candidates, domainsWithoutVoters)
return client.Voter, candidates
}
cole-miller marked this conversation as resolved.
Show resolved Hide resolved
}

// If we have exactly the desired number of voters and stand-bys, and they are all
// online, we're good.
Expand All @@ -156,16 +176,15 @@ func (c *RolesChanges) Adjust(leader uint64) (client.NodeRole, []client.NodeInfo
// If we have less online voters than desired, let's try to promote
// some other node.
if n := len(onlineVoters); n < c.Config.Voters {
candidates := c.list(client.StandBy, true)
candidates = append(candidates, c.list(client.Spare, true)...)
candidates := c.list(client.StandBy, true, nil)
candidates = append(candidates, c.list(client.Spare, true, nil)...)

if len(candidates) == 0 {
return -1, nil
}

domains := c.failureDomains(onlineVoters)
c.sortCandidates(candidates, domains)

return client.Voter, candidates
}

Expand All @@ -181,6 +200,7 @@ func (c *RolesChanges) Adjust(leader uint64) (client.NodeRole, []client.NodeInfo
nodes = append(nodes, node)
}

c.sortVoterCandidatesToDemote(nodes)
return client.Spare, nodes
}

Expand All @@ -192,7 +212,7 @@ func (c *RolesChanges) Adjust(leader uint64) (client.NodeRole, []client.NodeInfo
// If we have less online stand-bys than desired, let's try to promote
// some other node.
if n := len(onlineStandbys); n < c.Config.StandBys {
candidates := c.list(client.Spare, true)
candidates := c.list(client.Spare, true, nil)

if len(candidates) == 0 {
return -1, nil
Expand Down Expand Up @@ -243,20 +263,22 @@ func (c *RolesChanges) get(id uint64) *client.NodeInfo {
return nil
}

// Return the online or offline nodes with the given role.
func (c *RolesChanges) list(role client.NodeRole, online bool) []client.NodeInfo {
// Return the online or offline nodes with the given role (optionally) in specific domains.
func (c *RolesChanges) list(role client.NodeRole, online bool, domains map[uint64]bool) []client.NodeInfo {
nodes := []client.NodeInfo{}
for node, metadata := range c.State {
if node.Role == role && metadata != nil == online {
nodes = append(nodes, node)
if domains == nil || (domains != nil && domains[metadata.FailureDomain]) {
nodes = append(nodes, node)
}
}
}
return nodes
}

// Return the number of online or offline nodes with the given role.
func (c *RolesChanges) count(role client.NodeRole, online bool) int {
return len(c.list(role, online))
return len(c.list(role, online, nil))
}

// Return a map of the failure domains associated with the
Expand All @@ -273,6 +295,30 @@ func (c *RolesChanges) failureDomains(nodes []client.NodeInfo) map[uint64]bool {
return domains
}

// Return a map of all failureDomains with online nodes.
func (c *RolesChanges) allFailureDomains() map[uint64]bool {
domains := map[uint64]bool{}
for _, metadata := range c.State {
if metadata == nil {
continue
}
domains[metadata.FailureDomain] = true
}
return domains
}

// Return a map of domains that is the "from" minus the "subtract".
func (c *RolesChanges) domainsSubtract(from map[uint64]bool, subtract map[uint64]bool) map[uint64]bool {
domains := map[uint64]bool{}
for fd, val := range from {
_, common := subtract[fd]
if !common {
domains[fd] = val
}
}
return domains
}

// Sort the given candidates according to their failure domain and
// weight. Candidates belonging to a failure domain different from the given
// domains take precedence.
Expand All @@ -299,6 +345,41 @@ func (c *RolesChanges) sortCandidates(candidates []client.NodeInfo, domains map[
sort.Slice(candidates, less)
}

// Sort the given candidates according demotion priority.
// We prefer to select a candidate from a domain with multiple candidates.
// We prefer to select the candidate with highest weight.
func (c *RolesChanges) sortVoterCandidatesToDemote(candidates []client.NodeInfo) {
less := func(i, j int) bool {
metadata1 := c.metadata(candidates[i])
domain1 := map[uint64]bool{
metadata1.FailureDomain: true,
}
sameDomainAs1 := len(c.list(client.Voter, true, domain1))

metadata2 := c.metadata(candidates[j])
domain2 := map[uint64]bool{
metadata2.FailureDomain: true,
}
sameDomainAs2 := len(c.list(client.Voter, true, domain2))

// If i has more voters on the same domain and j does not,
// then i takes precedence.
if sameDomainAs1 > 1 && sameDomainAs2 <= 1 {
return true
}

// If j has more voters on the same domain and i does not,
// then j takes precedence.
if sameDomainAs2 > 1 && sameDomainAs1 <= 1 {
return false
}

return metadata1.Weight > metadata2.Weight
}

sort.Slice(candidates, less)
cole-miller marked this conversation as resolved.
Show resolved Hide resolved
}

// Return the metadata of the given node, if any.
func (c *RolesChanges) metadata(node client.NodeInfo) *client.NodeMetadata {
return c.State[node]
Expand Down
Loading