Skip to content
This repository has been archived by the owner on Feb 6, 2024. It is now read-only.

Commit

Permalink
fix: inconsistency caused by the affinity adjustion
Browse files Browse the repository at this point in the history
  • Loading branch information
ShiKaiWi committed Oct 8, 2023
1 parent 7a3fe18 commit e902ca5
Show file tree
Hide file tree
Showing 4 changed files with 55 additions and 11 deletions.
5 changes: 3 additions & 2 deletions server/coordinator/scheduler/nodepicker/hash/affinity.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,11 @@

package hash

type Affinity struct {
type PartitionAffinity struct {
PartitionID int
NumAllowedOtherPartitions uint
}

type AffinityRule struct {
PartitionAffinities map[int]Affinity
PartitionAffinities []PartitionAffinity
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ import (
"github.com/CeresDB/ceresmeta/pkg/assert"
"github.com/CeresDB/ceresmeta/pkg/log"
"go.uber.org/zap"
"golang.org/x/exp/slices"
)

// TODO: Modify these error definitions to coderr.
Expand Down Expand Up @@ -141,6 +142,11 @@ func BuildConsistentUniformHash(numPartitions int, members []Member, config Conf
for _, mem := range members {
memPartitions[mem.String()] = make(map[int]struct{}, maxLoad)
}

// Sort the affinity rule to ensure consistency.
sort.Slice(config.AffinityRule.PartitionAffinities, func(i, j int) bool {
return config.AffinityRule.PartitionAffinities[i].PartitionID < config.AffinityRule.PartitionAffinities[j].PartitionID
})
c := &ConsistentUniformHash{
config: config,
minLoad: minLoad,
Expand Down Expand Up @@ -309,6 +315,7 @@ func (c *ConsistentUniformHash) offloadMember(mem Member, memPartitions map[int]
partIDsToOffload = append(partIDsToOffload, partID)
}

slices.Sort(partIDsToOffload)
for _, partID := range partIDsToOffload {
c.offloadPartition(partID, mem, offloadedMems)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -237,23 +237,57 @@ func checkAffinity(t *testing.T, numPartitions, numMembers int, rule AffinityRul
allowedMaxLoad := affinity.NumAllowedOtherPartitions + 1
assert.LessOrEqual(t, load, allowedMaxLoad)
}

distribution := make(map[int]string, numPartitions)
for partID := 0; partID < numPartitions; partID++ {
distribution[partID] = c.GetPartitionOwner(partID).String()
}
{
newMembers := make([]Member, 0, numMembers)
for i := numMembers - 1; i >= 0; i-- {
newMembers = append(newMembers, members[i])
}
c, err := BuildConsistentUniformHash(numPartitions, newMembers, cfg)
assert.NoError(t, err)

newDistribution := make(map[int]string, numPartitions)
for partID := 0; partID < numPartitions; partID++ {
newDistribution[partID] = c.GetPartitionOwner(partID).String()
}
numDiffs := computeDiffBetweenDist(t, distribution, newDistribution)
assert.Equal(t, numDiffs, 0)
}
}

func TestAffinity(t *testing.T) {
rule := AffinityRule{
PartitionAffinities: map[int]Affinity{},
PartitionAffinities: []PartitionAffinity{},
}
checkAffinity(t, 120, 72, rule, 0)
checkAffinity(t, 0, 72, rule, 0)

rule = AffinityRule{
PartitionAffinities: map[int]Affinity{
0: {0},
1: {0},
2: {120},
PartitionAffinities: []PartitionAffinity{
{0, 0},
{1, 0},
{2, 120},
},
}
checkAffinity(t, 120, 72, rule, 0)
checkAffinity(t, 3, 72, rule, 0)
checkAffinity(t, 72, 72, rule, 0)

rule = AffinityRule{
PartitionAffinities: []PartitionAffinity{
{7, 0},
{31, 0},
{41, 0},
{45, 0},
{58, 0},
{81, 0},
{87, 0},
{88, 0},
{89, 0},
},
}
checkAffinity(t, 128, 72, rule, 0)
}
8 changes: 5 additions & 3 deletions server/coordinator/scheduler/nodepicker/node_picker.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,13 @@ type Config struct {
}

func (c Config) hashAffinityRule() hash.AffinityRule {
affinities := make(map[int]hash.Affinity, len(c.ShardAffinityRule))
affinities := make([]hash.PartitionAffinity, 0, len(c.ShardAffinityRule))
for shardID, affinity := range c.ShardAffinityRule {
affinities[int(shardID)] = hash.Affinity{
partitionID := int(shardID)
affinities = append(affinities, hash.PartitionAffinity{
PartitionID: partitionID,
NumAllowedOtherPartitions: affinity.NumAllowedOtherShards,
}
})
}

return hash.AffinityRule{
Expand Down

0 comments on commit e902ca5

Please sign in to comment.