Skip to content
This repository has been archived by the owner on Feb 6, 2024. It is now read-only.

Commit

Permalink
feat: support shard affinity
Browse files Browse the repository at this point in the history
  • Loading branch information
ShiKaiWi committed Oct 3, 2023
1 parent 7095a6a commit a1a3483
Show file tree
Hide file tree
Showing 3 changed files with 222 additions and 55 deletions.
11 changes: 11 additions & 0 deletions server/hash/affinity.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
// Copyright 2023 CeresDB Project Authors. Licensed under Apache-2.0.

package hash

// Affinity describes the placement constraint attached to a single partition.
type Affinity struct {
// NumAllowedOtherPartitions is the number of other partitions that are
// allowed to live on the same member as the partition this affinity
// applies to. The member's allowed load is this value plus one (the
// partition itself).
NumAllowedOtherPartitions uint
}

// AffinityRule groups the affinities configured for individual partitions.
type AffinityRule struct {
// PartitionAffinities maps a partition ID to the affinity of that partition.
PartitionAffinities map[int]Affinity
}
174 changes: 139 additions & 35 deletions server/hash/consistent_uniform.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
//
// This code is licensed under the MIT License.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files(the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and / or sell
Expand Down Expand Up @@ -78,6 +78,9 @@ type Config struct {
// distribute keys uniformly. Select a big PartitionCount if you have
// too many keys.
ReplicationFactor int

// The rule describes the partition affinity.
AffinityRule AffinityRule
}

type virtualNode uint64
Expand All @@ -86,14 +89,14 @@ type virtualNode uint64
// consistent as possible while the members have some tiny changes.
type ConsistentUniformHash struct {
config Config
minLoad float64
maxLoad float64
minLoad int
maxLoad int
numPartitions uint32
// Member name => Member
members map[string]Member
// Member name => Member's load
memLoads map[string]float64
// partition id => index of the virtualNode in the sortedRing
// Member name => Partition ID
memPartitions map[string]map[int]struct{}
// Partition ID => index of the virtualNode in the sortedRing
partitionDist map[int]int
// The nodeToMems contains all the virtual nodes
nodeToMems map[virtualNode]Member
Expand Down Expand Up @@ -126,24 +129,32 @@ func BuildConsistentUniformHash(numPartitions int, members []Member, config Conf

numReplicatedNodes := len(members) * config.ReplicationFactor
avgLoad := float64(numPartitions) / float64(len(members))
minLoad := int(math.Floor(avgLoad))
maxLoad := int(math.Ceil(avgLoad))

memPartitions := make(map[string]map[int]struct{}, len(members))
for _, mem := range members {
memPartitions[mem.String()] = make(map[int]struct{}, maxLoad)
}
c := &ConsistentUniformHash{
config: config,
minLoad: math.Floor(avgLoad),
maxLoad: math.Ceil(avgLoad),
minLoad: minLoad,
maxLoad: maxLoad,
numPartitions: uint32(numPartitions),
sortedRing: make([]virtualNode, 0, numReplicatedNodes),
memPartitions: memPartitions,
members: make(map[string]Member, len(members)),
memLoads: make(map[string]float64, len(members)),
partitionDist: make(map[int]int, numPartitions),
nodeToMems: make(map[virtualNode]Member, numReplicatedNodes),
}

c.initializeVirtualNodes(members)
c.distributePartitions()
c.ensureAffinity()
return c, nil
}

func (c *ConsistentUniformHash) distributePartitionWithLoad(partID, virtualNodeIdx int, allowedLoad float64) bool {
func (c *ConsistentUniformHash) distributePartitionWithLoad(partID, virtualNodeIdx int, allowedLoad int) bool {
// A fast path to avoid unnecessary loop.
if allowedLoad == 0 {
return false
Expand All @@ -157,10 +168,12 @@ func (c *ConsistentUniformHash) distributePartitionWithLoad(partID, virtualNodeI
}
i := c.sortedRing[virtualNodeIdx]
member := c.nodeToMems[i]
load := c.memLoads[member.String()]
if load+1 <= allowedLoad {
partitions, ok := c.memPartitions[member.String()]
assert.Assert(ok)

if len(partitions)+1 <= allowedLoad {
c.partitionDist[partID] = virtualNodeIdx
c.memLoads[member.String()]++
partitions[partID] = struct{}{}
return true
}
virtualNodeIdx++
Expand All @@ -171,12 +184,12 @@ func (c *ConsistentUniformHash) distributePartitionWithLoad(partID, virtualNodeI
}

func (c *ConsistentUniformHash) distributePartition(partID, virtualNodeIdx int) {
ok := c.distributePartitionWithLoad(partID, virtualNodeIdx, c.MinLoad())
ok := c.distributePartitionWithLoad(partID, virtualNodeIdx, c.minLoad)
if ok {
return
}

ok = c.distributePartitionWithLoad(partID, virtualNodeIdx, c.MaxLoad())
ok = c.distributePartitionWithLoad(partID, virtualNodeIdx, c.maxLoad)
assert.Assertf(ok, "not enough room to distribute partitions")
}

Expand All @@ -195,12 +208,33 @@ func (c *ConsistentUniformHash) distributePartitions() {
}
}

func (c *ConsistentUniformHash) MinLoad() float64 {
return c.minLoad
// MinLoad returns the lower bound of the per-member load, i.e. the floor of
// the average number of partitions per member, as an unsigned integer.
func (c *ConsistentUniformHash) MinLoad() uint {
	lowerBound := c.minLoad
	return uint(lowerBound)
}

// MaxLoad returns the upper bound of the per-member load, i.e. the ceiling of
// the average number of partitions per member, as an unsigned integer.
func (c *ConsistentUniformHash) MaxLoad() uint {
	upperBound := c.maxLoad
	return uint(upperBound)
}

func (c *ConsistentUniformHash) MaxLoad() float64 {
return c.maxLoad
// LoadDistribution reports, for every member name, how many partitions are
// currently assigned to that member. The result is a freshly built map, so
// callers may modify it without touching the hash's internal state.
func (c *ConsistentUniformHash) LoadDistribution() map[string]uint {
	distribution := make(map[string]uint, len(c.memPartitions))
	for name := range c.memPartitions {
		owned := c.memPartitions[name]
		distribution[name] = uint(len(owned))
	}
	return distribution
}

// GetPartitionOwner looks up the member that the given partition is assigned to.
//
// It returns nil when no assignment is recorded for `partID`. A recorded
// assignment always points at a virtual node on the ring, and that virtual
// node is asserted to belong to a member.
func (c *ConsistentUniformHash) GetPartitionOwner(partID int) Member {
	ringIdx, assigned := c.partitionDist[partID]
	if !assigned {
		return nil
	}

	owner, found := c.nodeToMems[c.sortedRing[ringIdx]]
	assert.Assertf(found, "member must exist for the virtual node")
	return owner
}

func (c *ConsistentUniformHash) initializeVirtualNodes(members []Member) {
Expand All @@ -220,24 +254,94 @@ func (c *ConsistentUniformHash) initializeVirtualNodes(members []Member) {
})
}

// LoadDistribution exposes load distribution of members.
func (c *ConsistentUniformHash) LoadDistribution() map[string]float64 {
// Create a thread-safe copy
res := make(map[string]float64)
for member, load := range c.memLoads {
res[member] = load
func (c *ConsistentUniformHash) ensureAffinity() {
offloadedMems := make(map[string]struct{}, len(c.config.AffinityRule.PartitionAffinities))

for partID, affinity := range c.config.AffinityRule.PartitionAffinities {
vNodeIdx := c.partitionDist[partID]
vNode := c.sortedRing[vNodeIdx]
mem, ok := c.nodeToMems[vNode]
assert.Assert(ok)
offloadedMems[mem.String()] = struct{}{}

allowedLoad := int(affinity.NumAllowedOtherPartitions) + 1
memPartIDs, ok := c.memPartitions[mem.String()]
assert.Assert(ok)
memLoad := len(memPartIDs)
if memLoad > allowedLoad {
c.offloadMember(mem, memPartIDs, partID, allowedLoad, offloadedMems)
}
}
return res
}

// GetPartitionOwner returns the owner of the given partition.
func (c *ConsistentUniformHash) GetPartitionOwner(partID int) Member {
virtualNodeIdx, ok := c.partitionDist[partID]
if !ok {
return nil
// offloadMember tries to offload the given member by moving its partitions to other members.
func (c *ConsistentUniformHash) offloadMember(mem Member, memPartitions map[int]struct{}, retainedPartID, numAllowedParts int, offloadedMems map[string]struct{}) {
assert.Assertf(numAllowedParts >= 1, "At least the partition itself should be allowed")
partIDsToOffload := make([]int, 0, len(memPartitions)-numAllowedParts)
// The `retainedPartID` must be retained.
numRetainedParts := 1
for partID := range memPartitions {
if partID == retainedPartID {
continue
}

if numRetainedParts < numAllowedParts {
numRetainedParts++
continue
}

partIDsToOffload = append(partIDsToOffload, partID)
}
virtualNode := c.sortedRing[virtualNodeIdx]
mem, ok := c.nodeToMems[virtualNode]
assert.Assertf(ok, "member must exist for the virtual node")
return mem

for _, partID := range partIDsToOffload {
c.offloadPartition(partID, mem, offloadedMems)
}
}

// offloadPartition moves `sourcePartID` away from `sourceMem` to a member that
// is not contained in `blackedMembers`.
//
// The allowed load of the target starts at the regular max load and is raised
// by one after every failed attempt, so that members are kept below the max
// load whenever possible. The attempts stop at the first success, or once the
// allowed load reaches the total number of partitions, in which case the
// partition stays where it is.
func (c *ConsistentUniformHash) offloadPartition(sourcePartID int, sourceMem Member, blackedMembers map[string]struct{}) {
	limit := int(c.numPartitions)
	allowed := c.maxLoad
	for allowed < limit {
		if c.offloadPartitionWithAllowedLoad(sourcePartID, sourceMem, allowed, blackedMembers) {
			return
		}
		allowed++
	}
}

// offloadPartitionWithAllowedLoad tries to move `sourcePartID` away from
// `sourceMem` to the first suitable member found by walking the ring forward
// from the virtual node the partition is currently placed on.
//
// A member is suitable if it is not `sourceMem` itself, is not contained in
// `blackedMembers`, and would own at most `allowedMaxLoad` partitions after
// receiving the partition.
//
// It returns true if the partition has been moved, in which case both
// `partitionDist` and `memPartitions` are updated. Otherwise nothing is
// changed and false is returned.
func (c *ConsistentUniformHash) offloadPartitionWithAllowedLoad(sourcePartID int, sourceMem Member, allowedMaxLoad int, blackedMembers map[string]struct{}) bool {
	sourceName := sourceMem.String()

	vNodeIdx := c.partitionDist[sourcePartID]
	// Start from the virtual node next to the current one and visit every
	// other virtual node on the ring exactly once.
	for loopCnt := 1; loopCnt < len(c.sortedRing); loopCnt++ {
		vNodeIdx++
		if vNodeIdx == len(c.sortedRing) {
			vNodeIdx = 0
		}

		vNode := c.sortedRing[vNodeIdx]
		mem, ok := c.nodeToMems[vNode]
		assert.Assert(ok)
		memName := mem.String()

		// The source member owns more than one virtual node on the ring, so it
		// may show up again during the walk. Choosing it as the target would
		// add the partition to, and then delete it from, the very same
		// partition set, leaving `memPartitions` out of sync with
		// `partitionDist`.
		if memName == sourceName {
			continue
		}

		// Check whether this member is blacked.
		if _, blacked := blackedMembers[memName]; blacked {
			continue
		}

		memPartitions, ok := c.memPartitions[memName]
		assert.Assert(ok)
		// Check whether the member's load would exceed the allowed max load.
		if len(memPartitions)+1 > allowedMaxLoad {
			continue
		}

		// The member meets the requirement, let's move the `sourcePartID` to this member.
		memPartitions[sourcePartID] = struct{}{}
		c.partitionDist[sourcePartID] = vNodeIdx
		sourceMemPartitions, ok := c.memPartitions[sourceName]
		assert.Assert(ok)
		delete(sourceMemPartitions, sourcePartID)
		return true
	}

	return false
}
Loading

0 comments on commit a1a3483

Please sign in to comment.