From 57ae2fdf83bdf6030cb5d5d9fb0585933f1ad95f Mon Sep 17 00:00:00 2001 From: Konstantinos Tsakalozos Date: Thu, 24 Aug 2023 12:33:22 +0300 Subject: [PATCH 1/3] Implement role re-assignmest based on failure domains --- app/app.go | 2 +- app/app_test.go | 68 ++++++++++++++++++++++++++++++++++++++++-- app/roles.go | 78 +++++++++++++++++++++++++++++++++++++++---------- 3 files changed, 129 insertions(+), 19 deletions(-) diff --git a/app/app.go b/app/app.go index 45a274d6..f18e1d84 100644 --- a/app/app.go +++ b/app/app.go @@ -364,7 +364,7 @@ func (a *App) Handover(ctx context.Context) error { return fmt.Errorf("cluster servers: %w", err) } changes := a.makeRolesChanges(nodes) - voters := changes.list(client.Voter, true) + voters := changes.list(client.Voter, true, nil) for i, voter := range voters { if voter.Address == a.address { diff --git a/app/app_test.go b/app/app_test.go index e3e298b3..5e49e1f0 100644 --- a/app/app_test.go +++ b/app/app_test.go @@ -632,6 +632,71 @@ func TestRolesAdjustment_ReplaceVoterHonorFailureDomain(t *testing.T) { assert.Equal(t, client.Voter, cluster[5].Role) } +// If cluster is imbalanced (all voters in one failure domain), roles get re-shuffled. +func TestRolesAdjustment_ImbalancedFailureDomain(t *testing.T) { + n := 8 + apps := make([]*app.App, n) + cleanups := make([]func(), n) + + for i := 0; i < n; i++ { + addr := fmt.Sprintf("127.0.0.1:900%d", i+1) + // Half of the nodes will go to failure domain 0 and half on failure domain 1 + fd := 0 + if i > n/2 { + fd = 1 + } + options := []app.Option{ + app.WithAddress(addr), + app.WithRolesAdjustmentFrequency(4 * time.Second), + app.WithFailureDomain(uint64(fd)), + } + if i > 0 { + options = append(options, app.WithCluster([]string{"127.0.0.1:9001"})) + } + + // Nodes on failure domain 0 are started first so all voters are initially there. + app, cleanup := newApp(t, options...) 
+
+		require.NoError(t, app.Ready(context.Background()))
+
+		apps[i] = app
+		cleanups[i] = cleanup
+	}
+
+	for i := 0; i < n; i++ {
+		defer cleanups[i]()
+	}
+
+	time.Sleep(18 * time.Second)
+
+	cli, err := apps[0].Leader(context.Background())
+	require.NoError(t, err)
+	defer cli.Close()
+
+	cluster, err := cli.Cluster(context.Background())
+	require.NoError(t, err)
+
+	domain := map[int]bool{
+		0: false,
+		1: false,
+	}
+	for i := 0; i < n; i++ {
+		// Recompute each node's failure domain exactly as it was assigned at startup: index > n/2 means domain 1, otherwise domain 0
+		fd := 0
+		if i > n/2 {
+			fd = 1
+		}
+		if cluster[i].Role == client.Voter {
+			domain[fd] = true
+		}
+	}
+
+	// All domains must have a voter
+	for _, voters := range domain {
+		assert.True(t, voters)
+	}
+}
+
 // If a voter goes offline, another node takes its place. Preference will be
 // given to candidates with lower weights.
 func TestRolesAdjustment_ReplaceVoterHonorWeight(t *testing.T) {
@@ -1042,7 +1107,6 @@ func TestExternalConnWithTCP(t *testing.T) {
 	assert.Equal(t, client.Voter, cluster[2].Role)
 }
 
-
 // TestExternalPipe creates a 3-member cluster using net.Pipe
 // and ensures the cluster is successfully created, and that the connection is
 // handled manually.
@@ -1062,7 +1126,7 @@ func TestExternalConnWithPipe(t *testing.T) {
 
 	dialFunc := func(_ context.Context, addr string) (net.Conn, error) {
 		client, server := net.Pipe()
-		
+
 		dialChannels[addr] <- server
 
 		return client, nil
diff --git a/app/roles.go b/app/roles.go
index 5b24bdf3..c9979ca9 100644
--- a/app/roles.go
+++ b/app/roles.go
@@ -56,8 +56,8 @@ func (c *RolesChanges) Assume(id uint64) client.NodeRole {
 		return -1
 	}
 
-	onlineVoters := c.list(client.Voter, true)
-	onlineStandbys := c.list(client.StandBy, true)
+	onlineVoters := c.list(client.Voter, true, nil)
+	onlineStandbys := c.list(client.StandBy, true, nil)
 
 	// If we have already the desired number of online voters and
 	// stand-bys, there's nothing to do.
@@ -94,7 +94,7 @@ func (c *RolesChanges) Handover(id uint64) (client.NodeRole, []client.NodeInfo)
 
 	// Make a list of all online nodes with the same role and get their
 	// failure domains.
-	peers := c.list(node.Role, true)
+	peers := c.list(node.Role, true, nil)
 	for i := range peers {
 		if peers[i].ID == node.ID {
 			peers = append(peers[:i], peers[i+1:]...)
@@ -104,12 +104,12 @@ func (c *RolesChanges) Handover(id uint64) (client.NodeRole, []client.NodeInfo)
 	domains := c.failureDomains(peers)
 
 	// Online spare nodes are always candidates.
-	candidates := c.list(client.Spare, true)
+	candidates := c.list(client.Spare, true, nil)
 
 	// Stand-by nodes are candidates if we need to transfer voting
 	// rights, and they are preferred over spares.
 	if node.Role == client.Voter {
-		candidates = append(c.list(client.StandBy, true), candidates...)
+		candidates = append(c.list(client.StandBy, true, nil), candidates...)
 	}
 
 	if len(candidates) == 0 {
@@ -142,10 +142,30 @@ func (c *RolesChanges) Adjust(leader uint64) (client.NodeRole, []client.NodeInfo
 		return -1, nil
 	}
 
-	onlineVoters := c.list(client.Voter, true)
-	onlineStandbys := c.list(client.StandBy, true)
-	offlineVoters := c.list(client.Voter, false)
-	offlineStandbys := c.list(client.StandBy, false)
+	onlineVoters := c.list(client.Voter, true, nil)
+	onlineStandbys := c.list(client.StandBy, true, nil)
+	offlineVoters := c.list(client.Voter, false, nil)
+	offlineStandbys := c.list(client.StandBy, false, nil)
+
+	domainsWithVoters := c.failureDomains(onlineVoters)
+	allDomains := c.allFailureDomains()
+
+	// If we do not have voters on all failure domains and we have a domain with more than one voter
+	// we may need to send voters to domains without voters.
+ if len(domainsWithVoters) < len(allDomains) && len(domainsWithVoters) < len(onlineVoters) { + // Find the domains we need to populate with voters + domainsWithoutVoters := c.domainsSubtract(allDomains, domainsWithVoters) + // Find StandBys in the domains we need to populate + candidates := c.list(client.StandBy, true, domainsWithoutVoters) + // If we do not have StandBys to promote see if we have Spares + if len(candidates) <= 0 { + candidates = c.list(client.Spare, true, domainsWithoutVoters) + } + if len(candidates) > 0 { + c.sortCandidates(candidates, domainsWithoutVoters) + return client.Voter, candidates + } + } // If we have exactly the desired number of voters and stand-bys, and they are all // online, we're good. @@ -156,8 +176,8 @@ func (c *RolesChanges) Adjust(leader uint64) (client.NodeRole, []client.NodeInfo // If we have less online voters than desired, let's try to promote // some other node. if n := len(onlineVoters); n < c.Config.Voters { - candidates := c.list(client.StandBy, true) - candidates = append(candidates, c.list(client.Spare, true)...) + candidates := c.list(client.StandBy, true, nil) + candidates = append(candidates, c.list(client.Spare, true, nil)...) if len(candidates) == 0 { return -1, nil @@ -192,7 +212,7 @@ func (c *RolesChanges) Adjust(leader uint64) (client.NodeRole, []client.NodeInfo // If we have less online stand-bys than desired, let's try to promote // some other node. if n := len(onlineStandbys); n < c.Config.StandBys { - candidates := c.list(client.Spare, true) + candidates := c.list(client.Spare, true, nil) if len(candidates) == 0 { return -1, nil @@ -243,12 +263,14 @@ func (c *RolesChanges) get(id uint64) *client.NodeInfo { return nil } -// Return the online or offline nodes with the given role. -func (c *RolesChanges) list(role client.NodeRole, online bool) []client.NodeInfo { +// Return the online or offline nodes with the given role (optionally) in specific domains. 
+func (c *RolesChanges) list(role client.NodeRole, online bool, domains map[uint64]bool) []client.NodeInfo { nodes := []client.NodeInfo{} for node, metadata := range c.State { if node.Role == role && metadata != nil == online { - nodes = append(nodes, node) + if domains == nil || (domains != nil && domains[metadata.FailureDomain]) { + nodes = append(nodes, node) + } } } return nodes @@ -256,7 +278,7 @@ func (c *RolesChanges) list(role client.NodeRole, online bool) []client.NodeInfo // Return the number of online or offline nodes with the given role. func (c *RolesChanges) count(role client.NodeRole, online bool) int { - return len(c.list(role, online)) + return len(c.list(role, online, nil)) } // Return a map of the failure domains associated with the @@ -273,6 +295,30 @@ func (c *RolesChanges) failureDomains(nodes []client.NodeInfo) map[uint64]bool { return domains } +// Return a map of all failureDomains with online nodes. +func (c *RolesChanges) allFailureDomains() map[uint64]bool { + domains := map[uint64]bool{} + for _, metadata := range c.State { + if metadata == nil { + continue + } + domains[metadata.FailureDomain] = true + } + return domains +} + +// Return a map of domains that is the "from" minus the "subtract". +func (c *RolesChanges) domainsSubtract(from map[uint64]bool, subtract map[uint64]bool) map[uint64]bool { + domains := map[uint64]bool{} + for fd, val := range from { + _, common := subtract[fd] + if !common { + domains[fd] = val + } + } + return domains +} + // Sort the given candidates according to their failure domain and // weight. Candidates belonging to a failure domain different from the given // domains take precedence. 
From 3c6fa6649299f9551399570166b9ff6282093d6a Mon Sep 17 00:00:00 2001 From: Konstantinos Tsakalozos Date: Sun, 27 Aug 2023 08:07:14 +0300 Subject: [PATCH 2/3] Domain aware demotion policy --- app/roles.go | 37 ++++++++++++++++++++++++++++++++++++- 1 file changed, 36 insertions(+), 1 deletion(-) diff --git a/app/roles.go b/app/roles.go index c9979ca9..cfebbd7c 100644 --- a/app/roles.go +++ b/app/roles.go @@ -185,7 +185,6 @@ func (c *RolesChanges) Adjust(leader uint64) (client.NodeRole, []client.NodeInfo domains := c.failureDomains(onlineVoters) c.sortCandidates(candidates, domains) - return client.Voter, candidates } @@ -201,6 +200,7 @@ func (c *RolesChanges) Adjust(leader uint64) (client.NodeRole, []client.NodeInfo nodes = append(nodes, node) } + c.sortVoterCandidatesToDemote(nodes) return client.Spare, nodes } @@ -345,6 +345,41 @@ func (c *RolesChanges) sortCandidates(candidates []client.NodeInfo, domains map[ sort.Slice(candidates, less) } +// Sort the given candidates according demotion priority. +// We prefer to select a candidate from a domain with multiple candidates. +// We prefer to select the candidate with highest weight. +func (c *RolesChanges) sortVoterCandidatesToDemote(candidates []client.NodeInfo) { + less := func(i, j int) bool { + metadata1 := c.metadata(candidates[i]) + domain1 := map[uint64]bool{ + metadata1.FailureDomain: true, + } + sameDomainAs1 := len(c.list(client.Voter, true, domain1)) + + metadata2 := c.metadata(candidates[j]) + domain2 := map[uint64]bool{ + metadata2.FailureDomain: true, + } + sameDomainAs2 := len(c.list(client.Voter, true, domain2)) + + // If i has more voters on the same domain and j does not, + // then i takes precedence. + if sameDomainAs1 > 1 && sameDomainAs2 <= 1 { + return true + } + + // If j has more voters on the same domain and i does not, + // then j takes precedence. 
+ if sameDomainAs2 > 1 && sameDomainAs1 <= 1 { + return false + } + + return metadata1.Weight > metadata2.Weight + } + + sort.Slice(candidates, less) +} + // Return the metadata of the given node, if any. func (c *RolesChanges) metadata(node client.NodeInfo) *client.NodeMetadata { return c.State[node] From 64d79b35d1bbac96a5e960ea6cb5ea3eabaf1747 Mon Sep 17 00:00:00 2001 From: Konstantinos Tsakalozos Date: Thu, 7 Sep 2023 13:23:33 +0300 Subject: [PATCH 3/3] Better sorting for demotion. Use all candidate nodes. --- app/app_test.go | 7 ++++++ app/roles.go | 67 ++++++++++++++++++++++++++----------------------- 2 files changed, 43 insertions(+), 31 deletions(-) diff --git a/app/app_test.go b/app/app_test.go index 5e49e1f0..da20105d 100644 --- a/app/app_test.go +++ b/app/app_test.go @@ -667,6 +667,13 @@ func TestRolesAdjustment_ImbalancedFailureDomain(t *testing.T) { defer cleanups[i]() } + for i := 0; i < n; i++ { + cli, err := apps[i].Client(context.Background()) + require.NoError(t, err) + require.NoError(t, cli.Weight(context.Background(), uint64(n-i))) + defer cli.Close() + } + time.Sleep(18 * time.Second) cli, err := apps[0].Leader(context.Background()) diff --git a/app/roles.go b/app/roles.go index cfebbd7c..15cc6bb1 100644 --- a/app/roles.go +++ b/app/roles.go @@ -155,12 +155,10 @@ func (c *RolesChanges) Adjust(leader uint64) (client.NodeRole, []client.NodeInfo if len(domainsWithVoters) < len(allDomains) && len(domainsWithVoters) < len(onlineVoters) { // Find the domains we need to populate with voters domainsWithoutVoters := c.domainsSubtract(allDomains, domainsWithVoters) - // Find StandBys in the domains we need to populate + // Find nodes in the domains we need to populate candidates := c.list(client.StandBy, true, domainsWithoutVoters) - // If we do not have StandBys to promote see if we have Spares - if len(candidates) <= 0 { - candidates = c.list(client.Spare, true, domainsWithoutVoters) - } + candidates = append(candidates, c.list(client.Spare, true, 
domainsWithoutVoters)...) + if len(candidates) > 0 { c.sortCandidates(candidates, domainsWithoutVoters) return client.Voter, candidates @@ -200,8 +198,7 @@ func (c *RolesChanges) Adjust(leader uint64) (client.NodeRole, []client.NodeInfo nodes = append(nodes, node) } - c.sortVoterCandidatesToDemote(nodes) - return client.Spare, nodes + return client.Spare, c.sortVoterCandidatesToDemote(nodes) } // If we have offline voters, let's demote one of them. @@ -345,39 +342,47 @@ func (c *RolesChanges) sortCandidates(candidates []client.NodeInfo, domains map[ sort.Slice(candidates, less) } -// Sort the given candidates according demotion priority. +// Sort the given candidates according demotion priority. Return the sorted // We prefer to select a candidate from a domain with multiple candidates. // We prefer to select the candidate with highest weight. -func (c *RolesChanges) sortVoterCandidatesToDemote(candidates []client.NodeInfo) { - less := func(i, j int) bool { - metadata1 := c.metadata(candidates[i]) - domain1 := map[uint64]bool{ - metadata1.FailureDomain: true, +func (c *RolesChanges) sortVoterCandidatesToDemote(candidates []client.NodeInfo) []client.NodeInfo { + domainsMap := make(map[uint64][]client.NodeInfo) + for _, node := range candidates { + id := c.metadata(node).FailureDomain + domain, exists := domainsMap[id] + if !exists { + domain = []client.NodeInfo{node} + } else { + domain = append(domain, node) } - sameDomainAs1 := len(c.list(client.Voter, true, domain1)) + domainsMap[id] = domain + } - metadata2 := c.metadata(candidates[j]) - domain2 := map[uint64]bool{ - metadata2.FailureDomain: true, - } - sameDomainAs2 := len(c.list(client.Voter, true, domain2)) + domains := make([][]client.NodeInfo, 0, len(domainsMap)) + for _, domain := range domainsMap { + domains = append(domains, domain) + } - // If i has more voters on the same domain and j does not, - // then i takes precedence. 
- if sameDomainAs1 > 1 && sameDomainAs2 <= 1 { - return true - } + sort.Slice(domains, func(i, j int) bool { + return len(domains[i]) > len(domains[j]) + }) - // If j has more voters on the same domain and i does not, - // then j takes precedence. - if sameDomainAs2 > 1 && sameDomainAs1 <= 1 { - return false - } + for _, domain := range domains { + sort.Slice(domain, func(i, j int) bool { + metadata1 := c.metadata(domain[i]) + metadata2 := c.metadata(domain[j]) - return metadata1.Weight > metadata2.Weight + return metadata1.Weight > metadata2.Weight + }) } - sort.Slice(candidates, less) + sortedCandidates := make([]client.NodeInfo, 0, len(candidates)) + for _, domain := range domains { + sortedCandidates = append(sortedCandidates, domain...) + } + + return sortedCandidates + } // Return the metadata of the given node, if any.