From a1a34837ee49ca35ce273e92437412c96005ce0c Mon Sep 17 00:00:00 2001
From: "xikai.wxk"
Date: Tue, 3 Oct 2023 17:39:01 +0800
Subject: [PATCH] feat: support shard affinity

---
 server/hash/affinity.go                |  11 ++
 server/hash/consistent_uniform.go      | 174 ++++++++++++++++++++-----
 server/hash/consistent_uniform_test.go |  92 ++++++++++---
 3 files changed, 222 insertions(+), 55 deletions(-)
 create mode 100644 server/hash/affinity.go

diff --git a/server/hash/affinity.go b/server/hash/affinity.go
new file mode 100644
index 00000000..a701e56b
--- /dev/null
+++ b/server/hash/affinity.go
@@ -0,0 +1,11 @@
+// Copyright 2023 CeresDB Project Authors. Licensed under Apache-2.0.
+
+package hash
+
+type Affinity struct {
+	NumAllowedOtherPartitions uint
+}
+
+type AffinityRule struct {
+	PartitionAffinities map[int]Affinity
+}
diff --git a/server/hash/consistent_uniform.go b/server/hash/consistent_uniform.go
index d629b4a7..09c9137a 100644
--- a/server/hash/consistent_uniform.go
+++ b/server/hash/consistent_uniform.go
@@ -5,7 +5,7 @@
 //
 // This code is licensed under the MIT License.
 //
-// Permission is hereby granted, free G charge, to any person obtaining a copy
+// Permission is hereby granted, free of charge, to any person obtaining a copy
 // of this software and associated documentation files(the "Software"), to deal
 // in the Software without restriction, including without limitation the rights
 // to use, copy, modify, merge, publish, distribute, sublicense, and / or sell
@@ -78,6 +78,9 @@ type Config struct {
 	// distribute keys uniformly. Select a big PartitionCount if you have
 	// too many keys.
 	ReplicationFactor int
+
+	// AffinityRule describes the affinity requirements of specific partitions.
+	AffinityRule AffinityRule
 }
 
 type virtualNode uint64
 
@@ -86,14 +89,14 @@
 // consistent as possible while the members have some tiny changes.
 type ConsistentUniformHash struct {
 	config        Config
-	minLoad       float64
-	maxLoad       float64
+	minLoad       int
+	maxLoad       int
 	numPartitions uint32
 	// Member name => Member
 	members map[string]Member
-	// Member name => Member's load
-	memLoads map[string]float64
-	// partition id => index of the virtualNode in the sortedRing
+	// Member name => IDs of the partitions assigned to the member
+	memPartitions map[string]map[int]struct{}
+	// Partition ID => index of the virtualNode in the sortedRing
 	partitionDist map[int]int
 	// The nodeToMems contains all the virtual nodes
 	nodeToMems map[virtualNode]Member
@@ -126,24 +129,32 @@ func BuildConsistentUniformHash(numPartitions int, members []Member, config Conf
 	numReplicatedNodes := len(members) * config.ReplicationFactor
 	avgLoad := float64(numPartitions) / float64(len(members))
+	minLoad := int(math.Floor(avgLoad))
+	maxLoad := int(math.Ceil(avgLoad))
+
+	memPartitions := make(map[string]map[int]struct{}, len(members))
+	for _, mem := range members {
+		memPartitions[mem.String()] = make(map[int]struct{}, maxLoad)
+	}
 	c := &ConsistentUniformHash{
 		config:        config,
-		minLoad:       math.Floor(avgLoad),
-		maxLoad:       math.Ceil(avgLoad),
+		minLoad:       minLoad,
+		maxLoad:       maxLoad,
 		numPartitions: uint32(numPartitions),
 		sortedRing:    make([]virtualNode, 0, numReplicatedNodes),
+		memPartitions: memPartitions,
 		members:       make(map[string]Member, len(members)),
-		memLoads:      make(map[string]float64, len(members)),
 		partitionDist: make(map[int]int, numPartitions),
 		nodeToMems:    make(map[virtualNode]Member, numReplicatedNodes),
 	}
 
 	c.initializeVirtualNodes(members)
 	c.distributePartitions()
+	c.ensureAffinity()
 	return c, nil
 }
 
-func (c *ConsistentUniformHash) distributePartitionWithLoad(partID, virtualNodeIdx int, allowedLoad float64) bool {
+func (c *ConsistentUniformHash) distributePartitionWithLoad(partID, virtualNodeIdx int, allowedLoad int) bool {
 	// A fast path to avoid unnecessary loop.
 	if allowedLoad == 0 {
 		return false
@@ -157,10 +168,12 @@ func (c *ConsistentUniformHash) distributePartitionWithLoad(partID, virtualNodeI
 	}
 	i := c.sortedRing[virtualNodeIdx]
 	member := c.nodeToMems[i]
-	load := c.memLoads[member.String()]
-	if load+1 <= allowedLoad {
+	partitions, ok := c.memPartitions[member.String()]
+	assert.Assert(ok)
+
+	if len(partitions)+1 <= allowedLoad {
 		c.partitionDist[partID] = virtualNodeIdx
-		c.memLoads[member.String()]++
+		partitions[partID] = struct{}{}
 		return true
 	}
 	virtualNodeIdx++
@@ -171,12 +184,12 @@ func (c *ConsistentUniformHash) distributePartitionWithLoad(partID, virtualNodeI
 }
 
 func (c *ConsistentUniformHash) distributePartition(partID, virtualNodeIdx int) {
-	ok := c.distributePartitionWithLoad(partID, virtualNodeIdx, c.MinLoad())
+	ok := c.distributePartitionWithLoad(partID, virtualNodeIdx, c.minLoad)
 	if ok {
 		return
 	}
 
-	ok = c.distributePartitionWithLoad(partID, virtualNodeIdx, c.MaxLoad())
+	ok = c.distributePartitionWithLoad(partID, virtualNodeIdx, c.maxLoad)
 	assert.Assertf(ok, "not enough room to distribute partitions")
 }
@@ -195,12 +208,33 @@ func (c *ConsistentUniformHash) distributePartitions() {
 	}
 }
 
-func (c *ConsistentUniformHash) MinLoad() float64 {
-	return c.minLoad
+func (c *ConsistentUniformHash) MinLoad() uint {
+	return uint(c.minLoad)
+}
+
+func (c *ConsistentUniformHash) MaxLoad() uint {
+	return uint(c.maxLoad)
 }
 
-func (c *ConsistentUniformHash) MaxLoad() float64 {
-	return c.maxLoad
+// LoadDistribution exposes load distribution of members.
+func (c *ConsistentUniformHash) LoadDistribution() map[string]uint {
+	loads := make(map[string]uint, len(c.memPartitions))
+	for member, partitions := range c.memPartitions {
+		loads[member] = uint(len(partitions))
+	}
+	return loads
+}
+
+// GetPartitionOwner returns the owner of the given partition.
+func (c *ConsistentUniformHash) GetPartitionOwner(partID int) Member {
+	virtualNodeIdx, ok := c.partitionDist[partID]
+	if !ok {
+		return nil
+	}
+	virtualNode := c.sortedRing[virtualNodeIdx]
+	mem, ok := c.nodeToMems[virtualNode]
+	assert.Assertf(ok, "member must exist for the virtual node")
+	return mem
 }
 
 func (c *ConsistentUniformHash) initializeVirtualNodes(members []Member) {
@@ -220,24 +254,94 @@ func (c *ConsistentUniformHash) initializeVirtualNodes(members []Member) {
 	})
 }
 
-// LoadDistribution exposes load distribution of members.
-func (c *ConsistentUniformHash) LoadDistribution() map[string]float64 {
-	// Create a thread-safe copy
-	res := make(map[string]float64)
-	for member, load := range c.memLoads {
-		res[member] = load
+func (c *ConsistentUniformHash) ensureAffinity() {
+	offloadedMems := make(map[string]struct{}, len(c.config.AffinityRule.PartitionAffinities))
+
+	for partID, affinity := range c.config.AffinityRule.PartitionAffinities {
+		vNodeIdx := c.partitionDist[partID]
+		vNode := c.sortedRing[vNodeIdx]
+		mem, ok := c.nodeToMems[vNode]
+		assert.Assert(ok)
+		offloadedMems[mem.String()] = struct{}{}
+
+		allowedLoad := int(affinity.NumAllowedOtherPartitions) + 1
+		memPartIDs, ok := c.memPartitions[mem.String()]
+		assert.Assert(ok)
+		memLoad := len(memPartIDs)
+		if memLoad > allowedLoad {
+			c.offloadMember(mem, memPartIDs, partID, allowedLoad, offloadedMems)
+		}
 	}
-	return res
 }
 
-// GetPartitionOwner returns the owner of the given partition.
-func (c *ConsistentUniformHash) GetPartitionOwner(partID int) Member {
-	virtualNodeIdx, ok := c.partitionDist[partID]
-	if !ok {
-		return nil
+// offloadMember tries to offload the given member by moving its partitions to other members.
+func (c *ConsistentUniformHash) offloadMember(mem Member, memPartitions map[int]struct{}, retainedPartID, numAllowedParts int, offloadedMems map[string]struct{}) {
+	assert.Assertf(numAllowedParts >= 1, "At least the partition itself should be allowed")
+	partIDsToOffload := make([]int, 0, len(memPartitions)-numAllowedParts)
+	// The `retainedPartID` must be retained.
+	numRetainedParts := 1
+	for partID := range memPartitions {
+		if partID == retainedPartID {
+			continue
+		}
+
+		if numRetainedParts < numAllowedParts {
+			numRetainedParts++
+			continue
+		}
+
+		partIDsToOffload = append(partIDsToOffload, partID)
 	}
-	virtualNode := c.sortedRing[virtualNodeIdx]
-	mem, ok := c.nodeToMems[virtualNode]
-	assert.Assertf(ok, "member must exist for the virtual node")
-	return mem
+
+	for _, partID := range partIDsToOffload {
+		c.offloadPartition(partID, mem, offloadedMems)
+	}
+}
+
+func (c *ConsistentUniformHash) offloadPartition(sourcePartID int, sourceMem Member, blackedMembers map[string]struct{}) {
+	// Try to keep every member's load within the max load as much as possible.
+	loadUpperBound := c.numPartitions
+	for load := c.maxLoad; load < int(loadUpperBound); load++ {
+		if done := c.offloadPartitionWithAllowedLoad(sourcePartID, sourceMem, load, blackedMembers); done {
+			break
+		}
+	}
+}
+
+func (c *ConsistentUniformHash) offloadPartitionWithAllowedLoad(sourcePartID int, sourceMem Member, allowedMaxLoad int, blackedMembers map[string]struct{}) bool {
+	vNodeIdx := c.partitionDist[sourcePartID]
+	// Skip the first virtual node, whose member is the current owner and must not be the offload target.
+	for loopCnt := 1; loopCnt < len(c.sortedRing); loopCnt++ {
+		vNodeIdx++
+		if vNodeIdx == len(c.sortedRing) {
+			vNodeIdx = 0
+		}
+
+		vNode := c.sortedRing[vNodeIdx]
+		mem, ok := c.nodeToMems[vNode]
+		assert.Assert(ok)
+
+		// Check whether this member is blacked, that is, already claimed by an affinity rule.
+		if _, blacked := blackedMembers[mem.String()]; blacked {
+			continue
+		}
+
+		memPartitions, ok := c.memPartitions[mem.String()]
+		assert.Assert(ok)
+		memLoad := len(memPartitions)
+		// Check whether the member's load would exceed the allowed max load.
+		if memLoad+1 > allowedMaxLoad {
+			continue
+		}
+
+		// The member meets the requirement, so move the `sourcePartID` to this member.
+		memPartitions[sourcePartID] = struct{}{}
+		c.partitionDist[sourcePartID] = vNodeIdx
+		sourceMemPartitions, ok := c.memPartitions[sourceMem.String()]
+		assert.Assert(ok)
+		delete(sourceMemPartitions, sourcePartID)
+		return true
+	}
+
+	return false
 }
diff --git a/server/hash/consistent_uniform_test.go b/server/hash/consistent_uniform_test.go
index d4ffd569..f2323e9d 100644
--- a/server/hash/consistent_uniform_test.go
+++ b/server/hash/consistent_uniform_test.go
@@ -32,34 +32,36 @@ import (
 	"github.com/stretchr/testify/assert"
 )
 
-func newConfig() Config {
-	return Config{
-		ReplicationFactor: 127,
-		Hasher:            hasher{},
-	}
-}
-
 type testMember string
 
 func (tm testMember) String() string {
 	return string(tm)
 }
 
-type hasher struct{}
+type testHasher struct{}
 
-func (hs hasher) Sum64(data []byte) uint64 {
+func (hs testHasher) Sum64(data []byte) uint64 {
 	h := fnv.New64()
 	_, _ = h.Write(data)
 	return h.Sum64()
 }
 
-func checkUniform(t *testing.T, numPartitions, numMembers int) {
+func buildTestMembers(n int) []Member {
 	members := []Member{}
-	for i := 0; i < numMembers; i++ {
+	for i := 0; i < n; i++ {
 		member := testMember(fmt.Sprintf("node-%d", i))
 		members = append(members, member)
 	}
-	cfg := newConfig()
+
+	return members
+}
+
+func checkUniform(t *testing.T, numPartitions, numMembers int) {
+	members := buildTestMembers(numMembers)
+	cfg := Config{
+		ReplicationFactor: 127,
+		Hasher:            testHasher{},
+	}
 	c, err := BuildConsistentUniformHash(numPartitions, members, cfg)
 	assert.NoError(t, err)
 
@@ -80,7 +82,7 @@ func TestZeroReplicationFactor(t *testing.T) {
 	cfg := Config{
 		ReplicationFactor: 0,
-		Hasher:            hasher{},
+		Hasher:            testHasher{},
 	}
 	_, err := BuildConsistentUniformHash(0, []Member{testMember("")}, cfg)
 	assert.Error(t, err)
 }
@@ -98,7 +100,7 @@ func TestEmptyMembers(t *testing.T) {
 	cfg := Config{
 		ReplicationFactor: 127,
-		Hasher:            hasher{},
+		Hasher:            testHasher{},
 	}
 	_, err := BuildConsistentUniformHash(0, []Member{}, cfg)
 	assert.Error(t, err)
 }
@@ -107,7 +109,7 @@ func TestNegativeNumPartitions(t *testing.T) {
 	cfg := Config{
 		ReplicationFactor: 127,
-		Hasher:            hasher{},
+		Hasher:            testHasher{},
 	}
 	_, err := BuildConsistentUniformHash(-1, []Member{testMember("")}, cfg)
 	assert.Error(t, err)
 }
@@ -137,12 +139,11 @@ func computeDiffBetweenDist(t *testing.T, oldDist, newDist map[int]string) int {
 }
 
 func checkConsistent(t *testing.T, numPartitions, numMembers, maxDiff int) {
-	members := make([]Member, 0, numMembers)
-	for i := 0; i < numMembers; i++ {
-		member := testMember(fmt.Sprintf("node-%d", i))
-		members = append(members, member)
+	members := buildTestMembers(numMembers)
+	cfg := Config{
+		ReplicationFactor: 127,
+		Hasher:            testHasher{},
 	}
-	cfg := newConfig()
 	c, err := BuildConsistentUniformHash(numPartitions, members, cfg)
 	assert.NoError(t, err)
 
@@ -192,3 +193,54 @@ func TestConsistency(t *testing.T) {
 	checkConsistent(t, 256, 30, 70)
 	checkConsistent(t, 17, 5, 7)
 }
+
+func checkAffinity(t *testing.T, numPartitions, numMembers int, rule AffinityRule, revisedMaxLoad uint) {
+	members := buildTestMembers(numMembers)
+	cfg := Config{
+		ReplicationFactor: 127,
+		Hasher:            testHasher{},
+		AffinityRule:      rule,
+	}
+	c, err := BuildConsistentUniformHash(numPartitions, members, cfg)
+	assert.NoError(t, err)
+
+	minLoad := c.MinLoad()
+	maxLoad := c.MaxLoad()
+	if maxLoad < revisedMaxLoad {
+		maxLoad = revisedMaxLoad
+	}
+	loadDistribution := c.LoadDistribution()
+	for _, mem := range members {
+		load, ok := loadDistribution[mem.String()]
+		if !ok {
+			assert.Equal(t, uint(0), minLoad)
+		}
+		assert.LessOrEqual(t, load, maxLoad)
+	}
+
+	for partID, affinity := range rule.PartitionAffinities {
+		mem := c.GetPartitionOwner(partID)
+		load := loadDistribution[mem.String()]
+		allowedMaxLoad := affinity.NumAllowedOtherPartitions + 1
+		assert.LessOrEqual(t, load, allowedMaxLoad)
+	}
+}
+
+func TestAffinity(t *testing.T) {
+	rule := AffinityRule{
+		PartitionAffinities: map[int]Affinity{},
+	}
+	checkAffinity(t, 120, 72, rule, 0)
+	checkAffinity(t, 0, 72, rule, 0)
+
+	rule = AffinityRule{
+		PartitionAffinities: map[int]Affinity{
+			0: {NumAllowedOtherPartitions: 0},
+			1: {NumAllowedOtherPartitions: 0},
+			2: {NumAllowedOtherPartitions: 120},
+		},
+	}
+	checkAffinity(t, 120, 72, rule, 0)
+	checkAffinity(t, 3, 72, rule, 0)
+	checkAffinity(t, 72, 72, rule, 0)
+}
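
A minimal usage sketch of the affinity support added above, for illustration only. The module path, the nodeMember type, and the fnvHasher helper are assumptions mirroring the test helpers in this patch, not part of the change itself:

package main

import (
	"fmt"
	"hash/fnv"

	// Assumed module path; adjust to wherever the patched package lives.
	"github.com/CeresDB/ceresmeta/server/hash"
)

// nodeMember is a hypothetical Member implementation; only String() is required.
type nodeMember string

func (m nodeMember) String() string { return string(m) }

// fnvHasher mirrors the testHasher helper from the test file.
type fnvHasher struct{}

func (fnvHasher) Sum64(data []byte) uint64 {
	h := fnv.New64()
	_, _ = h.Write(data)
	return h.Sum64()
}

func main() {
	members := []hash.Member{nodeMember("node-0"), nodeMember("node-1"), nodeMember("node-2")}

	cfg := hash.Config{
		ReplicationFactor: 127,
		Hasher:            fnvHasher{},
		// Pin partition 0: its owner may hold at most one other partition.
		AffinityRule: hash.AffinityRule{
			PartitionAffinities: map[int]hash.Affinity{
				0: {NumAllowedOtherPartitions: 1},
			},
		},
	}

	c, err := hash.BuildConsistentUniformHash(16, members, cfg)
	if err != nil {
		panic(err)
	}

	// The owner of partition 0 now holds at most 2 partitions in total;
	// the partitions offloaded from it are spread over the other members.
	fmt.Println("owner of partition 0:", c.GetPartitionOwner(0).String())
	fmt.Println("load distribution:", c.LoadDistribution())
}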
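
And a hypothetical test sketching the strongest form of the guarantee, in the style of checkAffinity and relying on the buildTestMembers/testHasher helpers above (the test name and constants are illustrative, not part of the patch): with NumAllowedOtherPartitions set to 0 and spare capacity on the ring, the pinned partition's owner ends up holding that partition alone, because ensureAffinity offloads everything else to members whose load may still grow.

func TestAffinityDedicatedOwner(t *testing.T) {
	cfg := Config{
		ReplicationFactor: 127,
		Hasher:            testHasher{},
		// Partition 0 tolerates no neighbours on its owner.
		AffinityRule: AffinityRule{
			PartitionAffinities: map[int]Affinity{
				0: {NumAllowedOtherPartitions: 0},
			},
		},
	}
	// 16 partitions over 8 members: every member starts with exactly 2.
	c, err := BuildConsistentUniformHash(16, buildTestMembers(8), cfg)
	assert.NoError(t, err)

	// After ensureAffinity, the owner of partition 0 keeps only partition 0;
	// its other partition has been offloaded to some other member.
	owner := c.GetPartitionOwner(0)
	assert.Equal(t, uint(1), c.LoadDistribution()[owner.String()])
}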