From 0da63b008c83a18af360640a5034e5cd5e89c0c7 Mon Sep 17 00:00:00 2001 From: "allen.wq" Date: Wed, 27 May 2020 11:53:58 +0800 Subject: [PATCH 1/2] add hash circler locator Signed-off-by: allen.wq --- dfget/locator/hashcircler_locator.go | 191 ++++++++++++++++++++++ dfget/locator/hashcircler_locator_test.go | 125 ++++++++++++++ pkg/algorithm/algorithm.go | 22 +++ pkg/algorithm/algorithm_test.go | 36 ++++ pkg/hashcircler/hash_circler.go | 167 +++++++++++++++++++ pkg/hashcircler/hash_circler_test.go | 174 ++++++++++++++++++++ 6 files changed, 715 insertions(+) create mode 100644 dfget/locator/hashcircler_locator.go create mode 100644 dfget/locator/hashcircler_locator_test.go create mode 100644 pkg/hashcircler/hash_circler.go create mode 100644 pkg/hashcircler/hash_circler_test.go diff --git a/dfget/locator/hashcircler_locator.go b/dfget/locator/hashcircler_locator.go new file mode 100644 index 000000000..94ad9a738 --- /dev/null +++ b/dfget/locator/hashcircler_locator.go @@ -0,0 +1,191 @@ +/* + * Copyright The Dragonfly Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package locator + +import ( + "context" + "fmt" + "sort" + "time" + + "github.com/dragonflyoss/Dragonfly/dfget/config" + "github.com/dragonflyoss/Dragonfly/pkg/algorithm" + "github.com/dragonflyoss/Dragonfly/pkg/hashcircler" + "github.com/dragonflyoss/Dragonfly/pkg/netutils" + "github.com/dragonflyoss/Dragonfly/pkg/queue" + + "github.com/sirupsen/logrus" +) + +const ( + addEv = "add" + deleteEv = "delete" +) + +type SuperNodeEvent struct { + evType string + node string +} + +func NewEnableEvent(node string) *SuperNodeEvent { + return &SuperNodeEvent{ + evType: addEv, + node: node, + } +} + +func NewDisableEvent(node string) *SuperNodeEvent { + return &SuperNodeEvent{ + evType: deleteEv, + node: node, + } +} + +// hashCirclerLocator is an implementation of SupernodeLocator. And it provides ability to select a supernode +// by input key. It allows some supernodes disabled, on this condition the disable supernode will not be selected. +type hashCirclerLocator struct { + hc hashcircler.HashCircler + nodes []string + groupName string + group *SupernodeGroup + + // evQueue will puts/polls SuperNodeEvent to disable/enable supernode. 
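+	// Events are handled asynchronously by eventLoop, so enabling or disabling a supernode takes effect shortly after the event is polled from the queue, not at Put time.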
+ evQueue queue.Queue +} + +func NewHashCirclerLocator(groupName string, nodes []string, eventQueue queue.Queue) (SupernodeLocator, error) { + nodes = algorithm.DedupStringArr(nodes) + if len(nodes) == 0 { + return nil, fmt.Errorf("nodes should not be nil") + } + + sort.Strings(nodes) + + group := &SupernodeGroup{ + Name: groupName, + Nodes: []*Supernode{}, + Infos: make(map[string]string), + } + keys := []string{} + for _, node := range nodes { + ip, port := netutils.GetIPAndPortFromNode(node, config.DefaultSupernodePort) + if ip == "" { + continue + } + supernode := &Supernode{ + Schema: config.DefaultSupernodeSchema, + IP: ip, + Port: port, + GroupName: groupName, + } + + group.Nodes = append(group.Nodes, supernode) + keys = append(keys, supernode.String()) + } + + hc, err := hashcircler.NewConsistentHashCircler(keys, nil) + if err != nil { + return nil, err + } + + h := &hashCirclerLocator{ + hc: hc, + evQueue: eventQueue, + groupName: groupName, + group: group, + } + + go h.eventLoop(context.Background()) + + return h, nil +} + +func (h *hashCirclerLocator) Get() *Supernode { + // not implementation + return nil +} + +func (h *hashCirclerLocator) Next() *Supernode { + // not implementation + return nil +} + +func (h *hashCirclerLocator) Select(key interface{}) *Supernode { + s, err := h.hc.Hash(key.(string)) + if err != nil { + logrus.Errorf("failed to get supernode: %v", err) + return nil + } + + for _, sp := range h.group.Nodes { + if s == sp.String() { + return sp + } + } + + return nil +} + +func (h *hashCirclerLocator) GetGroup(name string) *SupernodeGroup { + if h.group == nil || h.group.Name != name { + return nil + } + + return h.group +} + +func (h *hashCirclerLocator) All() []*SupernodeGroup { + return []*SupernodeGroup{h.group} +} + +func (h *hashCirclerLocator) Size() int { + return len(h.group.Nodes) +} + +func (h *hashCirclerLocator) Report(node string, metrics *SupernodeMetrics) { + return +} + +func (h *hashCirclerLocator) Refresh() bool { + return true +} + +func (h *hashCirclerLocator) eventLoop(ctx context.Context) { + for { + select { + case <-ctx.Done(): + return + default: + } + + if ev, ok := h.evQueue.PollTimeout(time.Second); ok { + h.handleEvent(ev.(*SuperNodeEvent)) + } + } +} + +func (h *hashCirclerLocator) handleEvent(ev *SuperNodeEvent) { + switch ev.evType { + case addEv: + h.hc.Add(ev.node) + case deleteEv: + h.hc.Delete(ev.node) + default: + } + + return +} diff --git a/dfget/locator/hashcircler_locator_test.go b/dfget/locator/hashcircler_locator_test.go new file mode 100644 index 000000000..4c2d57082 --- /dev/null +++ b/dfget/locator/hashcircler_locator_test.go @@ -0,0 +1,125 @@ +/* + * Copyright The Dragonfly Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package locator + +import ( + "time" + + "github.com/dragonflyoss/Dragonfly/pkg/queue" + + "github.com/go-check/check" +) + +type hashCirclerLocatorTestSuite struct { +} + +func init() { + check.Suite(&hashCirclerLocatorTestSuite{}) +} + +var testGroupName1 = "test-group1" + +func (s *hashCirclerLocatorTestSuite) TestHashCirclerLocator(c *check.C) { + evQ := queue.NewQueue(0) + nodes := []string{"1.1.1.1:8002", "2.2.2.2:8002", "3.3.3.3:8002"} + hl, err := NewHashCirclerLocator(testGroupName1, nodes, evQ) + c.Assert(err, check.IsNil) + + c.Assert(hl.Get(), check.IsNil) + c.Assert(hl.Next(), check.IsNil) + + groups := hl.All() + c.Assert(len(groups), check.Equals, 1) + c.Assert(len(groups[0].Nodes), check.Equals, 3) + c.Assert(groups[0].Nodes[0].String(), check.Equals, nodes[0]) + c.Assert(groups[0].Nodes[1].String(), check.Equals, nodes[1]) + c.Assert(groups[0].Nodes[2].String(), check.Equals, nodes[2]) + + keys := []string{"x", "y", "z", "a", "b", "c", "m", "n", "p", "q", "j", "k", "i", "e", "f", "g"} + originSp := make([]string, len(keys)) + + for i, k := range keys { + sp := hl.Select(k) + c.Assert(sp, check.NotNil) + originSp[i] = sp.String() + } + + // select again, the supernode should be equal + for i, k := range keys { + sp := hl.Select(k) + c.Assert(sp, check.NotNil) + c.Assert(originSp[i], check.Equals, sp.String()) + } + + // disable nodes[0] + evQ.Put(NewDisableEvent(nodes[0])) + time.Sleep(time.Second * 2) + // select again, if originSp is not nodes[0], it should not be changed. + for i, k := range keys { + sp := hl.Select(k) + c.Assert(sp, check.NotNil) + if originSp[i] == nodes[0] { + c.Assert(originSp[i], check.Not(check.Equals), sp.String()) + continue + } + + c.Assert(originSp[i], check.Equals, sp.String()) + } + + // disable nodes[1] + evQ.Put(NewDisableEvent(nodes[1])) + time.Sleep(time.Second * 2) + // select again, all select node should be nodes[2] + for _, k := range keys { + sp := hl.Select(k) + c.Assert(sp, check.NotNil) + c.Assert(nodes[2], check.Equals, sp.String()) + } + + // enable nodes[0], disable nodes[2] + evQ.Put(NewDisableEvent(nodes[2])) + evQ.Put(NewEnableEvent(nodes[0])) + time.Sleep(time.Second * 2) + for _, k := range keys { + sp := hl.Select(k) + c.Assert(sp, check.NotNil) + c.Assert(nodes[0], check.Equals, sp.String()) + } + + // enable nodes[1] + evQ.Put(NewEnableEvent(nodes[1])) + time.Sleep(time.Second * 2) + for i, k := range keys { + sp := hl.Select(k) + c.Assert(sp, check.NotNil) + if originSp[i] == nodes[2] { + c.Assert(originSp[i], check.Not(check.Equals), sp.String()) + continue + } + + c.Assert(originSp[i], check.Equals, sp.String()) + } + + // enable nodes[2], select node should be equal with origin one + evQ.Put(NewEnableEvent(nodes[2])) + time.Sleep(time.Second * 2) + for i, k := range keys { + sp := hl.Select(k) + c.Assert(sp, check.NotNil) + c.Assert(originSp[i], check.Equals, sp.String()) + } +} diff --git a/pkg/algorithm/algorithm.go b/pkg/algorithm/algorithm.go index 25445ed3e..18ee28412 100644 --- a/pkg/algorithm/algorithm.go +++ b/pkg/algorithm/algorithm.go @@ -18,6 +18,7 @@ package algorithm import ( "math/rand" + "sort" "time" ) @@ -100,3 +101,24 @@ func GCD(x, y int) int { } return x } + +// DedupStringArr removes duplicate string in array. 
+func DedupStringArr(input []string) []string { + if len(input) == 0 { + return []string{} + } + + out := make([]string, len(input)) + copy(out, input) + sort.Strings(out) + + idx := 0 + for i := 1; i < len(input); i++ { + if out[idx] != out[i] { + idx++ + out[idx] = out[i] + } + } + + return out[:idx+1] +} diff --git a/pkg/algorithm/algorithm_test.go b/pkg/algorithm/algorithm_test.go index 1fa807ea0..ba296fbee 100644 --- a/pkg/algorithm/algorithm_test.go +++ b/pkg/algorithm/algorithm_test.go @@ -18,6 +18,7 @@ package algorithm import ( "math/rand" + "sort" "testing" "github.com/stretchr/testify/suite" @@ -90,3 +91,38 @@ func (suit *AlgorithmSuite) TestShuffle() { suit.Equal(isRun, n-1) } } + +func (suit *AlgorithmSuite) TestDedup() { + cases := []struct { + input []string + expect []string + }{ + { + input: []string{}, + expect: []string{}, + }, + { + input: []string{ + "abc", "bbc", "abc", + }, + expect: []string{ + "abc", "bbc", + }, + }, + { + input: []string{ + "abc", "bbc", "abc", "bbc", "ddc", "abc", + }, + expect: []string{ + "abc", "bbc", "ddc", + }, + }, + } + + for _, t := range cases { + out := DedupStringArr(t.input) + sort.Strings(out) + sort.Strings(t.expect) + suit.Equal(t.expect, out) + } +} diff --git a/pkg/hashcircler/hash_circler.go b/pkg/hashcircler/hash_circler.go new file mode 100644 index 000000000..615bd7c65 --- /dev/null +++ b/pkg/hashcircler/hash_circler.go @@ -0,0 +1,167 @@ +/* + * Copyright The Dragonfly Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package hashcircler + +import ( + "fmt" + "hash/fnv" + "sort" + "sync" + + "github.com/pkg/errors" +) + +// HashCircler hashes input string to target key, and the key could be enabled or disabled. +// And the keys array is preset, only the keys could be enable or disable. +type HashCircler interface { + // Add adds the target key in hash circle. + Add(key string) + + // Hash hashes the input and output the target key which hashes. + Hash(input string) (key string, err error) + + // Delete deletes the target key + Delete(key string) +} + +var ( + ErrKeyNotPresent = errors.New("key is not present") +) + +// consistentHashCircler is an implementation of HashCircler. And the keys is preset. +type consistentHashCircler struct { + sync.RWMutex + hashFunc func(string) uint64 + + keysMap map[uint64]string + sortedSet []uint64 + replicationPerKey int +} + +// NewConsistentHashCircler constructs an instance of HashCircler from keys. And this is thread safety. +// if hashFunc is nil, it will be set to default hash func. 
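+// Each key is placed on the circle as replicationPerKey (16) virtual points, produced by hashing "key-0" .. "key-15", so inputs spread more evenly across the keys.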
+func NewConsistentHashCircler(keys []string, hashFunc func(string) uint64) (HashCircler, error) { + if hashFunc == nil { + hashFunc = fnvHashFunc + } + + if len(keys) == 0 { + return nil, fmt.Errorf("empty keys") + } + + hc := &consistentHashCircler{ + hashFunc: hashFunc, + keysMap: make(map[uint64]string), + sortedSet: []uint64{}, + replicationPerKey: 16, + } + + for _, k := range keys { + hc.Add(k) + } + + return hc, nil +} + +func (h *consistentHashCircler) Add(key string) { + h.Lock() + defer h.Unlock() + + for i := 0; i < h.replicationPerKey; i++ { + m := h.hashFunc(fmt.Sprintf("%s-%d", key, i)) + if _, exist := h.keysMap[m]; exist { + continue + } + h.keysMap[m] = key + h.sortedSet = append(h.sortedSet, m) + } + + // sort hashes ascendingly + sort.Slice(h.sortedSet, func(i int, j int) bool { + if h.sortedSet[i] < h.sortedSet[j] { + return true + } + return false + }) + + return +} + +func (h *consistentHashCircler) Hash(input string) (key string, err error) { + h.RLock() + defer h.RUnlock() + + if len(h.keysMap) == 0 { + return "", ErrKeyNotPresent + } + + hashN := h.hashFunc(input) + index := h.search(hashN) + + return h.keysMap[h.sortedSet[index]], nil +} + +func (h *consistentHashCircler) Delete(key string) { + h.Lock() + defer h.Unlock() + + for i := 0; i < h.replicationPerKey; i++ { + m := h.hashFunc(fmt.Sprintf("%s-%d", key, i)) + delete(h.keysMap, m) + h.delSlice(m) + } + + return +} + +func (h *consistentHashCircler) search(key uint64) int { + idx := sort.Search(len(h.sortedSet), func(i int) bool { + return h.sortedSet[i] >= key + }) + + if idx >= len(h.sortedSet) { + idx = 0 + } + return idx +} + +func (h *consistentHashCircler) delSlice(val uint64) { + idx := -1 + l := 0 + r := len(h.sortedSet) - 1 + for l <= r { + m := (l + r) / 2 + if h.sortedSet[m] == val { + idx = m + break + } else if h.sortedSet[m] < val { + l = m + 1 + } else if h.sortedSet[m] > val { + r = m - 1 + } + } + + if idx != -1 { + h.sortedSet = append(h.sortedSet[:idx], h.sortedSet[idx+1:]...) + } +} + +func fnvHashFunc(input string) uint64 { + h := fnv.New64a() + h.Write([]byte(input)) + return h.Sum64() +} diff --git a/pkg/hashcircler/hash_circler_test.go b/pkg/hashcircler/hash_circler_test.go new file mode 100644 index 000000000..36b4ca817 --- /dev/null +++ b/pkg/hashcircler/hash_circler_test.go @@ -0,0 +1,174 @@ +/* + * Copyright The Dragonfly Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package hashcircler + +import ( + "math" + "math/rand" + "testing" + + "github.com/go-check/check" +) + +func Test(t *testing.T) { + check.TestingT(t) +} + +type hashCirclerSuite struct { + hashMap map[string]uint64 +} + +func init() { + check.Suite(&hashCirclerSuite{ + hashMap: make(map[string]uint64), + }) +} + +func (suite *hashCirclerSuite) registerHashKV(key string, value uint64) { + suite.hashMap[key] = value +} + +func (suite *hashCirclerSuite) unRegisterHashKV(key string) { + delete(suite.hashMap, key) +} + +func (suite *hashCirclerSuite) cleanHashMap() { + suite.hashMap = make(map[string]uint64) +} + +func (suite *hashCirclerSuite) hash(input string) uint64 { + v, ok := suite.hashMap[input] + if ok { + return v + } + + return 0 +} + +func (suite *hashCirclerSuite) TestHashCircler(c *check.C) { + defer suite.cleanHashMap() + + rangeSize := uint64(math.MaxUint64 / 5) + suite.registerHashKV("v1", rand.Uint64()%rangeSize) + suite.registerHashKV("v2", rand.Uint64()%rangeSize) + suite.registerHashKV("v3", rand.Uint64()%rangeSize+rangeSize) + suite.registerHashKV("v4", rand.Uint64()%rangeSize+rangeSize) + suite.registerHashKV("v5", rand.Uint64()%rangeSize+rangeSize*2) + suite.registerHashKV("v6", rand.Uint64()%rangeSize+rangeSize*2) + suite.registerHashKV("v7", rand.Uint64()%rangeSize+rangeSize*3) + suite.registerHashKV("v8", rand.Uint64()%rangeSize+rangeSize*3) + suite.registerHashKV("v9", rand.Uint64()%rangeSize+rangeSize*4) + suite.registerHashKV("v10", rand.Uint64()%rangeSize+rangeSize*4) + + arr := []string{ + "key1", "key2", "key3", "key4", "key5", + } + + inputStrs := []string{ + "v1", "v2", "v3", "v4", "v5", "v6", "v7", "v8", "v9", "v10", + } + + hasher, err := NewConsistentHashCircler(arr, nil) + c.Assert(err, check.IsNil) + + originKeys := make([]string, len(inputStrs)) + + for i := 0; i < 10; i++ { + k, err := hasher.Hash(inputStrs[i]) + c.Assert(err, check.IsNil) + originKeys[i] = k + } + + // disable arr[0] + hasher.Delete(arr[0]) + for i := 0; i < 10; i++ { + k, err := hasher.Hash(inputStrs[i]) + c.Assert(err, check.IsNil) + c.Assert(k, check.Not(check.Equals), arr[0]) + if originKeys[i] != arr[0] { + c.Assert(k, check.Equals, originKeys[i]) + } + } + + hasher.Delete(arr[1]) + hasher.Delete(arr[2]) + hasher.Delete(arr[4]) + + for i := 0; i < 10; i++ { + k, err := hasher.Hash(inputStrs[i]) + c.Assert(err, check.IsNil) + c.Assert(k, check.Equals, arr[3]) + } + + hasher.Add(arr[1]) + + for i := 0; i < 10; i++ { + k, err := hasher.Hash(inputStrs[i]) + c.Assert(err, check.IsNil) + if originKeys[i] == arr[1] || originKeys[i] == arr[3] { + c.Assert(k, check.Equals, originKeys[i]) + } + c.Assert(true, check.Equals, k == arr[3] || k == arr[1]) + } + + hasher.Add(arr[1]) + hasher.Add(arr[2]) + + for i := 0; i < 10; i++ { + k, err := hasher.Hash(inputStrs[i]) + c.Assert(err, check.IsNil) + + if originKeys[i] == arr[1] || originKeys[i] == arr[2] || originKeys[i] == arr[3] { + c.Assert(k, check.Equals, originKeys[i]) + } + + c.Assert(true, check.Equals, k != arr[0] && k != arr[4]) + } + + hasher.Delete(arr[0]) + hasher.Delete(arr[1]) + hasher.Delete(arr[2]) + for i := 0; i < 10; i++ { + k, err := hasher.Hash(inputStrs[i]) + c.Assert(err, check.IsNil) + c.Assert(k, check.Equals, arr[3]) + } + + hasher.Delete(arr[3]) + for i := 0; i < 10; i++ { + _, err = hasher.Hash(inputStrs[i]) + c.Assert(err, check.NotNil) + } + + hasher.Add(arr[0]) + for i := 0; i < 10; i++ { + k, err := hasher.Hash(inputStrs[i]) + c.Assert(err, check.IsNil) + c.Assert(k, check.Equals, arr[0]) + } + + 
hasher.Add(arr[1]) + hasher.Add(arr[2]) + hasher.Add(arr[3]) + hasher.Add(arr[4]) + + for i := 0; i < 10; i++ { + k, err := hasher.Hash(inputStrs[i]) + c.Assert(err, check.IsNil) + c.Assert(k, check.Equals, originKeys[i]) + } +} From 0fdd9e725d32d64cbad1475089fa3fe7e41679c0 Mon Sep 17 00:00:00 2001 From: "allen.wq" Date: Mon, 8 Jun 2020 11:38:26 +0800 Subject: [PATCH 2/2] optimize the hash circle, the algorithm of insert/delete/find is updated to rbtree instead of array. Signed-off-by: allen.wq --- go.mod | 1 + go.sum | 2 + pkg/hashcircler/hash_circler.go | 95 ++++++++++++++++++--------------- 3 files changed, 55 insertions(+), 43 deletions(-) diff --git a/go.mod b/go.mod index 38103eaf4..75f737232 100644 --- a/go.mod +++ b/go.mod @@ -3,6 +3,7 @@ module github.com/dragonflyoss/Dragonfly go 1.12 require ( + github.com/HuKeping/rbtree v0.0.0-20200208030951-29f0b79e84ed github.com/PuerkitoBio/purell v0.0.0-20170829232023-f619812e3caf // indirect github.com/PuerkitoBio/urlesc v0.0.0-20170810143723-de5bf2ad4578 // indirect github.com/asaskevich/govalidator v0.0.0-20170903095215-73945b6115bf // indirect diff --git a/go.sum b/go.sum index e61e7a04d..a6533804a 100644 --- a/go.sum +++ b/go.sum @@ -1,6 +1,8 @@ cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw= github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= +github.com/HuKeping/rbtree v0.0.0-20200208030951-29f0b79e84ed h1:YKqpA6qf8Bh73vj8Rv9SBB5OU558f2c1A889nCVUSLE= +github.com/HuKeping/rbtree v0.0.0-20200208030951-29f0b79e84ed/go.mod h1:bODsl3NElqKlgf1UkBLj67fYmY5DsqkKrrYm/kMT/6Y= github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU= github.com/PuerkitoBio/purell v0.0.0-20170829232023-f619812e3caf h1:ePmEKucT6HqNzbxw/yeyfoHplmyGDQUW76ppv4igW7Q= github.com/PuerkitoBio/purell v0.0.0-20170829232023-f619812e3caf/go.mod h1:c11w/QuzBsJSee3cPx9rAFu61PvFxuPbtSwDGJws/X0= diff --git a/pkg/hashcircler/hash_circler.go b/pkg/hashcircler/hash_circler.go index 615bd7c65..1c35810d0 100644 --- a/pkg/hashcircler/hash_circler.go +++ b/pkg/hashcircler/hash_circler.go @@ -19,9 +19,9 @@ package hashcircler import ( "fmt" "hash/fnv" - "sort" "sync" + "github.com/HuKeping/rbtree" "github.com/pkg/errors" ) @@ -48,8 +48,8 @@ type consistentHashCircler struct { hashFunc func(string) uint64 keysMap map[uint64]string - sortedSet []uint64 replicationPerKey int + rb *rbtree.Rbtree } // NewConsistentHashCircler constructs an instance of HashCircler from keys. And this is thread safety. 
@@ -66,8 +66,8 @@ func NewConsistentHashCircler(keys []string, hashFunc func(string) uint64) (Hash hc := &consistentHashCircler{ hashFunc: hashFunc, keysMap: make(map[uint64]string), - sortedSet: []uint64{}, replicationPerKey: 16, + rb: rbtree.New(), } for _, k := range keys { @@ -87,17 +87,9 @@ func (h *consistentHashCircler) Add(key string) { continue } h.keysMap[m] = key - h.sortedSet = append(h.sortedSet, m) + h.addToRbTree(m, key) } - // sort hashes ascendingly - sort.Slice(h.sortedSet, func(i int, j int) bool { - if h.sortedSet[i] < h.sortedSet[j] { - return true - } - return false - }) - return } @@ -109,10 +101,9 @@ func (h *consistentHashCircler) Hash(input string) (key string, err error) { return "", ErrKeyNotPresent } - hashN := h.hashFunc(input) - index := h.search(hashN) + index := h.hashFunc(input) - return h.keysMap[h.sortedSet[index]], nil + return h.searchFromRbTree(index), nil } func (h *consistentHashCircler) Delete(key string) { @@ -122,46 +113,64 @@ func (h *consistentHashCircler) Delete(key string) { for i := 0; i < h.replicationPerKey; i++ { m := h.hashFunc(fmt.Sprintf("%s-%d", key, i)) delete(h.keysMap, m) - h.delSlice(m) + h.deleteFromRbTree(m) } return } -func (h *consistentHashCircler) search(key uint64) int { - idx := sort.Search(len(h.sortedSet), func(i int) bool { - return h.sortedSet[i] >= key - }) +func fnvHashFunc(input string) uint64 { + h := fnv.New64a() + h.Write([]byte(input)) + return h.Sum64() +} - if idx >= len(h.sortedSet) { - idx = 0 +func (h *consistentHashCircler) addToRbTree(index uint64, key string) { + i := &item{ + index: index, + key: key, } - return idx + + h.rb.Insert(i) } -func (h *consistentHashCircler) delSlice(val uint64) { - idx := -1 - l := 0 - r := len(h.sortedSet) - 1 - for l <= r { - m := (l + r) / 2 - if h.sortedSet[m] == val { - idx = m - break - } else if h.sortedSet[m] < val { - l = m + 1 - } else if h.sortedSet[m] > val { - r = m - 1 - } +func (h *consistentHashCircler) deleteFromRbTree(index uint64) { + i := &item{ + index: index, } - if idx != -1 { - h.sortedSet = append(h.sortedSet[:idx], h.sortedSet[idx+1:]...) + h.rb.Delete(i) +} + +func (h *consistentHashCircler) searchFromRbTree(index uint64) string { + comp := &item{ + index: index, } + + target := "" + + // find the key which index of item greater or equal than input index. + h.rb.Ascend(comp, func(i rbtree.Item) bool { + o := i.(*item) + target = o.key + return false + }) + + // if not found the target, return the max item. + if target == "" { + i := h.rb.Max() + target = i.(*item).key + } + + return target } -func fnvHashFunc(input string) uint64 { - h := fnv.New64a() - h.Write([]byte(input)) - return h.Sum64() +type item struct { + index uint64 + key string +} + +func (i *item) Less(than rbtree.Item) bool { + other := than.(*item) + return i.index < other.index }
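
A minimal usage sketch of the API added above, assuming the import paths and signatures shown in the diffs; the group name, node addresses, and task key are illustrative placeholders, not values from the patch.

package main

import (
	"fmt"

	"github.com/dragonflyoss/Dragonfly/dfget/locator"
	"github.com/dragonflyoss/Dragonfly/pkg/hashcircler"
	"github.com/dragonflyoss/Dragonfly/pkg/queue"
)

func main() {
	// Consistent hash circle on its own: map an input string onto one of a fixed set of keys.
	hc, err := hashcircler.NewConsistentHashCircler([]string{"node-a", "node-b", "node-c"}, nil)
	if err != nil {
		panic(err)
	}
	key, _ := hc.Hash("task-0001") // "task-0001" is a placeholder input
	fmt.Println("hashed to:", key)

	// Keys can be taken out of (and put back into) the circle at runtime.
	hc.Delete("node-b")
	hc.Add("node-b")

	// Locator built on the same circle: supernodes are enabled/disabled by
	// sending SuperNodeEvents through the queue; eventLoop applies them asynchronously.
	evQ := queue.NewQueue(0)
	nodes := []string{"1.1.1.1:8002", "2.2.2.2:8002"} // placeholder supernode addresses
	loc, err := locator.NewHashCirclerLocator("example-group", nodes, evQ)
	if err != nil {
		panic(err)
	}

	if sp := loc.Select("task-0001"); sp != nil {
		fmt.Println("selected supernode:", sp.String())
	}

	// Disable nodes[0]: once the event is handled, keys that mapped to it move to the
	// remaining supernode, while all other keys keep their original mapping.
	evQ.Put(locator.NewDisableEvent(nodes[0]))
	evQ.Put(locator.NewEnableEvent(nodes[0]))
}

The second patch only swaps the internal index (the sorted slice is replaced by HuKeping/rbtree for insert/delete/lookup of the virtual points), so the calls shown above are unchanged.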