diff --git a/server/coordinator/scheduler/nodepicker/hash/affinity.go b/server/coordinator/scheduler/nodepicker/hash/affinity.go index a701e56b..1d651ab2 100644 --- a/server/coordinator/scheduler/nodepicker/hash/affinity.go +++ b/server/coordinator/scheduler/nodepicker/hash/affinity.go @@ -2,10 +2,11 @@ package hash -type Affinity struct { +type PartitionAffinity struct { + PartitionID int NumAllowedOtherPartitions uint } type AffinityRule struct { - PartitionAffinities map[int]Affinity + PartitionAffinities []PartitionAffinity } diff --git a/server/coordinator/scheduler/nodepicker/hash/consistent_uniform.go b/server/coordinator/scheduler/nodepicker/hash/consistent_uniform.go index ffa52440..b391b58d 100644 --- a/server/coordinator/scheduler/nodepicker/hash/consistent_uniform.go +++ b/server/coordinator/scheduler/nodepicker/hash/consistent_uniform.go @@ -40,6 +40,7 @@ import ( "github.com/CeresDB/ceresmeta/pkg/assert" "github.com/CeresDB/ceresmeta/pkg/log" "go.uber.org/zap" + "golang.org/x/exp/slices" ) // TODO: Modify these error definitions to coderr. @@ -141,6 +142,11 @@ func BuildConsistentUniformHash(numPartitions int, members []Member, config Conf for _, mem := range members { memPartitions[mem.String()] = make(map[int]struct{}, maxLoad) } + + // Sort the affinity rule to ensure consistency. + sort.Slice(config.AffinityRule.PartitionAffinities, func(i, j int) bool { + return config.AffinityRule.PartitionAffinities[i].PartitionID < config.AffinityRule.PartitionAffinities[j].PartitionID + }) c := &ConsistentUniformHash{ config: config, minLoad: minLoad, @@ -309,6 +315,7 @@ func (c *ConsistentUniformHash) offloadMember(mem Member, memPartitions map[int] partIDsToOffload = append(partIDsToOffload, partID) } + slices.Sort(partIDsToOffload) for _, partID := range partIDsToOffload { c.offloadPartition(partID, mem, offloadedMems) } diff --git a/server/coordinator/scheduler/nodepicker/hash/consistent_uniform_test.go b/server/coordinator/scheduler/nodepicker/hash/consistent_uniform_test.go index c2250d71..a9c4ab97 100644 --- a/server/coordinator/scheduler/nodepicker/hash/consistent_uniform_test.go +++ b/server/coordinator/scheduler/nodepicker/hash/consistent_uniform_test.go @@ -237,23 +237,57 @@ func checkAffinity(t *testing.T, numPartitions, numMembers int, rule AffinityRul allowedMaxLoad := affinity.NumAllowedOtherPartitions + 1 assert.LessOrEqual(t, load, allowedMaxLoad) } + + distribution := make(map[int]string, numPartitions) + for partID := 0; partID < numPartitions; partID++ { + distribution[partID] = c.GetPartitionOwner(partID).String() + } + { + newMembers := make([]Member, 0, numMembers) + for i := numMembers - 1; i >= 0; i-- { + newMembers = append(newMembers, members[i]) + } + c, err := BuildConsistentUniformHash(numPartitions, newMembers, cfg) + assert.NoError(t, err) + + newDistribution := make(map[int]string, numPartitions) + for partID := 0; partID < numPartitions; partID++ { + newDistribution[partID] = c.GetPartitionOwner(partID).String() + } + numDiffs := computeDiffBetweenDist(t, distribution, newDistribution) + assert.Equal(t, numDiffs, 0) + } } func TestAffinity(t *testing.T) { rule := AffinityRule{ - PartitionAffinities: map[int]Affinity{}, + PartitionAffinities: []PartitionAffinity{}, } checkAffinity(t, 120, 72, rule, 0) checkAffinity(t, 0, 72, rule, 0) rule = AffinityRule{ - PartitionAffinities: map[int]Affinity{ - 0: {0}, - 1: {0}, - 2: {120}, + PartitionAffinities: []PartitionAffinity{ + {0, 0}, + {1, 0}, + {2, 120}, }, } - checkAffinity(t, 120, 72, rule, 0) checkAffinity(t, 3, 72, rule, 0) checkAffinity(t, 72, 72, rule, 0) + + rule = AffinityRule{ + PartitionAffinities: []PartitionAffinity{ + {7, 0}, + {31, 0}, + {41, 0}, + {45, 0}, + {58, 0}, + {81, 0}, + {87, 0}, + {88, 0}, + {89, 0}, + }, + } + checkAffinity(t, 128, 72, rule, 0) } diff --git a/server/coordinator/scheduler/nodepicker/node_picker.go b/server/coordinator/scheduler/nodepicker/node_picker.go index 4e956a12..c5a44e75 100644 --- a/server/coordinator/scheduler/nodepicker/node_picker.go +++ b/server/coordinator/scheduler/nodepicker/node_picker.go @@ -24,11 +24,13 @@ type Config struct { } func (c Config) hashAffinityRule() hash.AffinityRule { - affinities := make(map[int]hash.Affinity, len(c.ShardAffinityRule)) + affinities := make([]hash.PartitionAffinity, 0, len(c.ShardAffinityRule)) for shardID, affinity := range c.ShardAffinityRule { - affinities[int(shardID)] = hash.Affinity{ + partitionID := int(shardID) + affinities = append(affinities, hash.PartitionAffinity{ + PartitionID: partitionID, NumAllowedOtherPartitions: affinity.NumAllowedOtherShards, - } + }) } return hash.AffinityRule{