diff --git a/app/app.go b/app/app.go index 17252892..77d975b6 100644 --- a/app/app.go +++ b/app/app.go @@ -365,7 +365,7 @@ func (a *App) Handover(ctx context.Context) error { return fmt.Errorf("cluster servers: %w", err) } changes := a.makeRolesChanges(nodes) - voters := changes.list(client.Voter, true) + voters := changes.list(client.Voter, true, nil) for i, voter := range voters { if voter.Address == a.address { diff --git a/app/app_test.go b/app/app_test.go index d45a194c..d773b866 100644 --- a/app/app_test.go +++ b/app/app_test.go @@ -632,6 +632,78 @@ func TestRolesAdjustment_ReplaceVoterHonorFailureDomain(t *testing.T) { assert.Equal(t, client.Voter, cluster[5].Role) } +// If cluster is imbalanced (all voters in one failure domain), roles get re-shuffled. +func TestRolesAdjustment_ImbalancedFailureDomain(t *testing.T) { + n := 8 + apps := make([]*app.App, n) + cleanups := make([]func(), n) + + for i := 0; i < n; i++ { + addr := fmt.Sprintf("127.0.0.1:900%d", i+1) + // Half of the nodes will go to failure domain 0 and half on failure domain 1 + fd := 0 + if i > n/2 { + fd = 1 + } + options := []app.Option{ + app.WithAddress(addr), + app.WithRolesAdjustmentFrequency(4 * time.Second), + app.WithFailureDomain(uint64(fd)), + } + if i > 0 { + options = append(options, app.WithCluster([]string{"127.0.0.1:9001"})) + } + + // Nodes on failure domain 0 are started first so all voters are initially there. + app, cleanup := newApp(t, options...) 
+ + require.NoError(t, app.Ready(context.Background())) + + apps[i] = app + cleanups[i] = cleanup + } + + for i := 0; i < n; i++ { + defer cleanups[i]() + } + + for i := 0; i < n; i++ { + cli, err := apps[i].Client(context.Background()) + require.NoError(t, err) + require.NoError(t, cli.Weight(context.Background(), uint64(n-i))) + defer cli.Close() + } + + time.Sleep(18 * time.Second) + + cli, err := apps[0].Leader(context.Background()) + require.NoError(t, err) + defer cli.Close() + + cluster, err := cli.Cluster(context.Background()) + require.NoError(t, err) + + domain := map[int]bool{ + 0: false, + 1: false, + } + for i := 0; i < n; i++ { + // Mirror the startup assignment: nodes with index > n/2 are in failure domain 1, the rest in failure domain 0 + fd := 0 + if i > n/2 { + fd = 1 + } + if cluster[i].Role == client.Voter { + domain[fd] = true + } + } + + // All domains must have a voter + for _, voters := range domain { + assert.True(t, voters) + } +} + // If a voter goes offline, another node takes its place. Preference will be // given to candidates with lower weights. func TestRolesAdjustment_ReplaceVoterHonorWeight(t *testing.T) { @@ -1042,7 +1114,6 @@ func TestExternalConnWithTCP(t *testing.T) { assert.Equal(t, client.Voter, cluster[2].Role) } - // TestExternalPipe creates a 3-member cluster using net.Pipe // and ensures the cluster is successfully created, and that the connection is // handled manually. 
@@ -1062,7 +1133,7 @@ func TestExternalConnWithPipe(t *testing.T) { dialFunc := func(_ context.Context, addr string) (net.Conn, error) { client, server := net.Pipe() - + dialChannels[addr] <- server return client, nil diff --git a/app/roles.go b/app/roles.go index 5b24bdf3..15cc6bb1 100644 --- a/app/roles.go +++ b/app/roles.go @@ -56,8 +56,8 @@ func (c *RolesChanges) Assume(id uint64) client.NodeRole { return -1 } - onlineVoters := c.list(client.Voter, true) - onlineStandbys := c.list(client.StandBy, true) + onlineVoters := c.list(client.Voter, true, nil) + onlineStandbys := c.list(client.StandBy, true, nil) // If we have already the desired number of online voters and // stand-bys, there's nothing to do. @@ -94,7 +94,7 @@ func (c *RolesChanges) Handover(id uint64) (client.NodeRole, []client.NodeInfo) // Make a list of all online nodes with the same role and get their // failure domains. - peers := c.list(node.Role, true) + peers := c.list(node.Role, true, nil) for i := range peers { if peers[i].ID == node.ID { peers = append(peers[:i], peers[i+1:]...) @@ -104,12 +104,12 @@ func (c *RolesChanges) Handover(id uint64) (client.NodeRole, []client.NodeInfo) domains := c.failureDomains(peers) // Online spare nodes are always candidates. - candidates := c.list(client.Spare, true) + candidates := c.list(client.Spare, true, nil) // Stand-by nodes are candidates if we need to transfer voting // rights, and they are preferred over spares. if node.Role == client.Voter { - candidates = append(c.list(client.StandBy, true), candidates...) + candidates = append(c.list(client.StandBy, true, nil), candidates...) 
} if len(candidates) == 0 { @@ -142,10 +142,28 @@ func (c *RolesChanges) Adjust(leader uint64) (client.NodeRole, []client.NodeInfo return -1, nil } - onlineVoters := c.list(client.Voter, true) - onlineStandbys := c.list(client.StandBy, true) - offlineVoters := c.list(client.Voter, false) - offlineStandbys := c.list(client.StandBy, false) + onlineVoters := c.list(client.Voter, true, nil) + onlineStandbys := c.list(client.StandBy, true, nil) + offlineVoters := c.list(client.Voter, false, nil) + offlineStandbys := c.list(client.StandBy, false, nil) + + domainsWithVoters := c.failureDomains(onlineVoters) + allDomains := c.allFailureDomains() + + // If some failure domain has no online voter while another one has more than one voter, + // try to promote an online stand-by or spare node from a domain that has no voter. + if len(domainsWithVoters) < len(allDomains) && len(domainsWithVoters) < len(onlineVoters) { + // Find the domains we need to populate with voters + domainsWithoutVoters := c.domainsSubtract(allDomains, domainsWithVoters) + // Find nodes in the domains we need to populate + candidates := c.list(client.StandBy, true, domainsWithoutVoters) + candidates = append(candidates, c.list(client.Spare, true, domainsWithoutVoters)...) + + if len(candidates) > 0 { + c.sortCandidates(candidates, domainsWithoutVoters) + return client.Voter, candidates + } + } // If we have exactly the desired number of voters and stand-bys, and they are all // online, we're good. @@ -156,8 +174,8 @@ func (c *RolesChanges) Adjust(leader uint64) (client.NodeRole, []client.NodeInfo // If we have less online voters than desired, let's try to promote // some other node. if n := len(onlineVoters); n < c.Config.Voters { - candidates := c.list(client.StandBy, true) - candidates = append(candidates, c.list(client.Spare, true)...) + candidates := c.list(client.StandBy, true, nil) + candidates = append(candidates, c.list(client.Spare, true, nil)...) 
if len(candidates) == 0 { return -1, nil @@ -165,7 +183,6 @@ func (c *RolesChanges) Adjust(leader uint64) (client.NodeRole, []client.NodeInfo domains := c.failureDomains(onlineVoters) c.sortCandidates(candidates, domains) - return client.Voter, candidates } @@ -181,7 +198,7 @@ func (c *RolesChanges) Adjust(leader uint64) (client.NodeRole, []client.NodeInfo nodes = append(nodes, node) } - return client.Spare, nodes + return client.Spare, c.sortVoterCandidatesToDemote(nodes) } // If we have offline voters, let's demote one of them. @@ -192,7 +209,7 @@ func (c *RolesChanges) Adjust(leader uint64) (client.NodeRole, []client.NodeInfo // If we have less online stand-bys than desired, let's try to promote // some other node. if n := len(onlineStandbys); n < c.Config.StandBys { - candidates := c.list(client.Spare, true) + candidates := c.list(client.Spare, true, nil) if len(candidates) == 0 { return -1, nil @@ -243,12 +260,14 @@ func (c *RolesChanges) get(id uint64) *client.NodeInfo { return nil } -// Return the online or offline nodes with the given role. -func (c *RolesChanges) list(role client.NodeRole, online bool) []client.NodeInfo { +// Return the online or offline nodes with the given role (optionally) in specific domains. +func (c *RolesChanges) list(role client.NodeRole, online bool, domains map[uint64]bool) []client.NodeInfo { nodes := []client.NodeInfo{} for node, metadata := range c.State { if node.Role == role && metadata != nil == online { - nodes = append(nodes, node) + if domains == nil || (domains != nil && domains[metadata.FailureDomain]) { + nodes = append(nodes, node) + } } } return nodes @@ -256,7 +275,7 @@ func (c *RolesChanges) list(role client.NodeRole, online bool) []client.NodeInfo // Return the number of online or offline nodes with the given role. 
func (c *RolesChanges) count(role client.NodeRole, online bool) int { - return len(c.list(role, online)) + return len(c.list(role, online, nil)) } // Return a map of the failure domains associated with the @@ -273,6 +292,30 @@ func (c *RolesChanges) failureDomains(nodes []client.NodeInfo) map[uint64]bool { return domains } +// Return a map of all failure domains with online nodes. +func (c *RolesChanges) allFailureDomains() map[uint64]bool { + domains := map[uint64]bool{} + for _, metadata := range c.State { + if metadata == nil { + continue + } + domains[metadata.FailureDomain] = true + } + return domains +} + +// Return a map of domains that is the "from" minus the "subtract". +func (c *RolesChanges) domainsSubtract(from map[uint64]bool, subtract map[uint64]bool) map[uint64]bool { + domains := map[uint64]bool{} + for fd, val := range from { + _, common := subtract[fd] + if !common { + domains[fd] = val + } + } + return domains +} + // Sort the given candidates according to their failure domain and // weight. Candidates belonging to a failure domain different from the given // domains take precedence. @@ -299,6 +342,49 @@ func (c *RolesChanges) sortCandidates(candidates []client.NodeInfo, domains map[ sort.Slice(candidates, less) } +// Sort the given candidates according to demotion priority and return the sorted list. +// Candidates from failure domains holding more candidates come first. +// Within a failure domain, candidates with higher weight come first. 
+func (c *RolesChanges) sortVoterCandidatesToDemote(candidates []client.NodeInfo) []client.NodeInfo { + domainsMap := make(map[uint64][]client.NodeInfo) + for _, node := range candidates { + id := c.metadata(node).FailureDomain + domain, exists := domainsMap[id] + if !exists { + domain = []client.NodeInfo{node} + } else { + domain = append(domain, node) + } + domainsMap[id] = domain + } + + domains := make([][]client.NodeInfo, 0, len(domainsMap)) + for _, domain := range domainsMap { + domains = append(domains, domain) + } + + sort.Slice(domains, func(i, j int) bool { + return len(domains[i]) > len(domains[j]) + }) + + for _, domain := range domains { + sort.Slice(domain, func(i, j int) bool { + metadata1 := c.metadata(domain[i]) + metadata2 := c.metadata(domain[j]) + + return metadata1.Weight > metadata2.Weight + }) + } + + sortedCandidates := make([]client.NodeInfo, 0, len(candidates)) + for _, domain := range domains { + sortedCandidates = append(sortedCandidates, domain...) + } + + return sortedCandidates + +} + // Return the metadata of the given node, if any. func (c *RolesChanges) metadata(node client.NodeInfo) *client.NodeMetadata { return c.State[node]