From 6e795c666cc4df53fbcb484c77be0d98fee2b088 Mon Sep 17 00:00:00 2001 From: "allen.wq" Date: Wed, 27 May 2020 11:53:58 +0800 Subject: [PATCH] add hash circler locator Signed-off-by: allen.wq --- dfget/locator/hashcircler_locator.go | 191 ++++++++++++++++ dfget/locator/hashcircler_locator_test.go | 125 +++++++++++ pkg/algorithm/algorithm.go | 22 ++ pkg/algorithm/algorithm_test.go | 36 +++ pkg/hashcircler/hash_circler.go | 253 ++++++++++++++++++++++ pkg/hashcircler/hash_circler_test.go | 193 +++++++++++++++++ 6 files changed, 820 insertions(+) create mode 100644 dfget/locator/hashcircler_locator.go create mode 100644 dfget/locator/hashcircler_locator_test.go create mode 100644 pkg/hashcircler/hash_circler.go create mode 100644 pkg/hashcircler/hash_circler_test.go diff --git a/dfget/locator/hashcircler_locator.go b/dfget/locator/hashcircler_locator.go new file mode 100644 index 000000000..b2536ce84 --- /dev/null +++ b/dfget/locator/hashcircler_locator.go @@ -0,0 +1,191 @@ +/* + * Copyright The Dragonfly Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package locator + +import ( + "context" + "fmt" + "sort" + "time" + + "github.com/dragonflyoss/Dragonfly/dfget/config" + "github.com/dragonflyoss/Dragonfly/pkg/algorithm" + "github.com/dragonflyoss/Dragonfly/pkg/hashcircler" + "github.com/dragonflyoss/Dragonfly/pkg/netutils" + "github.com/dragonflyoss/Dragonfly/pkg/queue" + + "github.com/sirupsen/logrus" +) + +const ( + enableEv = "enable" + disableEv = "disable" +) + +type SuperNodeEvent struct { + evType string + node string +} + +func NewEnableEvent(node string) *SuperNodeEvent { + return &SuperNodeEvent{ + evType: enableEv, + node: node, + } +} + +func NewDisableEvent(node string) *SuperNodeEvent { + return &SuperNodeEvent{ + evType: disableEv, + node: node, + } +} + +// hashCirclerLocator is an implementation of SupernodeLocator. And it provides ability to select a supernode +// by input key. It allows some supernodes disabled, on this condition the disable supernode will not be selected. +type hashCirclerLocator struct { + hc hashcircler.HashCircler + nodes []string + groupName string + group *SupernodeGroup + + // evQueue will puts/polls SuperNodeEvent to disable/enable supernode. 
+ evQueue queue.Queue +} + +func NewHashCirclerLocator(groupName string, nodes []string, eventQueue queue.Queue) (SupernodeLocator, error) { + nodes = algorithm.DedupStringArr(nodes) + if len(nodes) == 0 { + return nil, fmt.Errorf("nodes should not be nil") + } + + sort.Strings(nodes) + + group := &SupernodeGroup{ + Name: groupName, + Nodes: []*Supernode{}, + Infos: make(map[string]string), + } + keys := []string{} + for _, node := range nodes { + ip, port := netutils.GetIPAndPortFromNode(node, config.DefaultSupernodePort) + if ip == "" { + continue + } + supernode := &Supernode{ + Schema: config.DefaultSupernodeSchema, + IP: ip, + Port: port, + GroupName: groupName, + } + + group.Nodes = append(group.Nodes, supernode) + keys = append(keys, supernode.String()) + } + + hc, err := hashcircler.NewConsistentHashCircler(keys, nil) + if err != nil { + return nil, err + } + + h := &hashCirclerLocator{ + hc: hc, + evQueue: eventQueue, + groupName: groupName, + group: group, + } + + go h.eventLoop(context.Background()) + + return h, nil +} + +func (h *hashCirclerLocator) Get() *Supernode { + // not implementation + return nil +} + +func (h *hashCirclerLocator) Next() *Supernode { + // not implementation + return nil +} + +func (h *hashCirclerLocator) Select(key interface{}) *Supernode { + s, err := h.hc.Hash(key.(string)) + if err != nil { + logrus.Errorf("failed to get supernode: %v", err) + return nil + } + + for _, sp := range h.group.Nodes { + if s == sp.String() { + return sp + } + } + + return nil +} + +func (h *hashCirclerLocator) GetGroup(name string) *SupernodeGroup { + if h.group == nil || h.group.Name != name { + return nil + } + + return h.group +} + +func (h *hashCirclerLocator) All() []*SupernodeGroup { + return []*SupernodeGroup{h.group} +} + +func (h *hashCirclerLocator) Size() int { + return len(h.group.Nodes) +} + +func (h *hashCirclerLocator) Report(node string, metrics *SupernodeMetrics) { + return +} + +func (h *hashCirclerLocator) Refresh() bool { + 
return true +} + +func (h *hashCirclerLocator) eventLoop(ctx context.Context) { + for { + select { + case <-ctx.Done(): + return + default: + } + + if ev, ok := h.evQueue.PollTimeout(time.Second); ok { + h.handleEvent(ev.(*SuperNodeEvent)) + } + } +} + +func (h *hashCirclerLocator) handleEvent(ev *SuperNodeEvent) { + switch ev.evType { + case enableEv: + h.hc.Enable(ev.node) + case disableEv: + h.hc.Disable(ev.node) + default: + } + + return +} diff --git a/dfget/locator/hashcircler_locator_test.go b/dfget/locator/hashcircler_locator_test.go new file mode 100644 index 000000000..4c2d57082 --- /dev/null +++ b/dfget/locator/hashcircler_locator_test.go @@ -0,0 +1,125 @@ +/* + * Copyright The Dragonfly Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
// hashCirclerLocatorTestSuite groups the go-check tests of hashCirclerLocator.
type hashCirclerLocatorTestSuite struct {
}

// init registers the suite with go-check.
func init() {
	check.Suite(&hashCirclerLocatorTestSuite{})
}

// testGroupName1 is the group name given to the locator under test.
var testGroupName1 = "test-group1"

// TestHashCirclerLocator drives a three-node locator through a series of
// disable/enable events and checks after every step that
//   - a key whose original supernode is still enabled keeps that supernode,
//   - a key whose original supernode is disabled is moved to another one,
//   - once every supernode is enabled again the original mapping is restored.
//
// Events are applied asynchronously by the locator's event loop, so the test
// sleeps two seconds after each batch of events before selecting again.
func (s *hashCirclerLocatorTestSuite) TestHashCirclerLocator(c *check.C) {
	evQ := queue.NewQueue(0)
	nodes := []string{"1.1.1.1:8002", "2.2.2.2:8002", "3.3.3.3:8002"}
	hl, err := NewHashCirclerLocator(testGroupName1, nodes, evQ)
	c.Assert(err, check.IsNil)

	// Get and Next are not implemented by this locator.
	c.Assert(hl.Get(), check.IsNil)
	c.Assert(hl.Next(), check.IsNil)

	// The locator exposes exactly one group holding the nodes in input order
	// (the input is already sorted).
	groups := hl.All()
	c.Assert(len(groups), check.Equals, 1)
	c.Assert(len(groups[0].Nodes), check.Equals, 3)
	c.Assert(groups[0].Nodes[0].String(), check.Equals, nodes[0])
	c.Assert(groups[0].Nodes[1].String(), check.Equals, nodes[1])
	c.Assert(groups[0].Nodes[2].String(), check.Equals, nodes[2])

	keys := []string{"x", "y", "z", "a", "b", "c", "m", "n", "p", "q", "j", "k", "i", "e", "f", "g"}
	originSp := make([]string, len(keys))

	// Record the supernode every key maps to while all nodes are enabled.
	for i, k := range keys {
		sp := hl.Select(k)
		c.Assert(sp, check.NotNil)
		originSp[i] = sp.String()
	}

	// Selecting again without any event must give the same supernodes.
	for i, k := range keys {
		sp := hl.Select(k)
		c.Assert(sp, check.NotNil)
		c.Assert(originSp[i], check.Equals, sp.String())
	}

	// Disable nodes[0].
	evQ.Put(NewDisableEvent(nodes[0]))
	time.Sleep(time.Second * 2)
	// Keys that mapped to nodes[0] must move; all other keys must stay put.
	for i, k := range keys {
		sp := hl.Select(k)
		c.Assert(sp, check.NotNil)
		if originSp[i] == nodes[0] {
			c.Assert(originSp[i], check.Not(check.Equals), sp.String())
			continue
		}

		c.Assert(originSp[i], check.Equals, sp.String())
	}

	// Disable nodes[1] as well.
	evQ.Put(NewDisableEvent(nodes[1]))
	time.Sleep(time.Second * 2)
	// Only nodes[2] is left, so every key must select it.
	for _, k := range keys {
		sp := hl.Select(k)
		c.Assert(sp, check.NotNil)
		c.Assert(nodes[2], check.Equals, sp.String())
	}

	// Disable nodes[2] and enable nodes[0]: nodes[0] becomes the only
	// enabled supernode.
	evQ.Put(NewDisableEvent(nodes[2]))
	evQ.Put(NewEnableEvent(nodes[0]))
	time.Sleep(time.Second * 2)
	for _, k := range keys {
		sp := hl.Select(k)
		c.Assert(sp, check.NotNil)
		c.Assert(nodes[0], check.Equals, sp.String())
	}

	// Enable nodes[1]; only nodes[2] remains disabled.
	evQ.Put(NewEnableEvent(nodes[1]))
	time.Sleep(time.Second * 2)
	// Keys that mapped to nodes[2] must differ from the original; the rest
	// must be back on their original supernode.
	for i, k := range keys {
		sp := hl.Select(k)
		c.Assert(sp, check.NotNil)
		if originSp[i] == nodes[2] {
			c.Assert(originSp[i], check.Not(check.Equals), sp.String())
			continue
		}

		c.Assert(originSp[i], check.Equals, sp.String())
	}

	// Enable nodes[2]: every key must be back on its original supernode.
	evQ.Put(NewEnableEvent(nodes[2]))
	time.Sleep(time.Second * 2)
	for i, k := range keys {
		sp := hl.Select(k)
		c.Assert(sp, check.NotNil)
		c.Assert(originSp[i], check.Equals, sp.String())
	}
}
+func DedupStringArr(input []string) []string { + if len(input) == 0 { + return []string{} + } + + out := make([]string, len(input)) + copy(out, input) + sort.Strings(out) + + idx := 0 + for i := 1; i < len(input); i++ { + if out[idx] != out[i] { + idx++ + out[idx] = out[i] + } + } + + return out[:idx+1] +} diff --git a/pkg/algorithm/algorithm_test.go b/pkg/algorithm/algorithm_test.go index 1fa807ea0..ba296fbee 100644 --- a/pkg/algorithm/algorithm_test.go +++ b/pkg/algorithm/algorithm_test.go @@ -18,6 +18,7 @@ package algorithm import ( "math/rand" + "sort" "testing" "github.com/stretchr/testify/suite" @@ -90,3 +91,38 @@ func (suit *AlgorithmSuite) TestShuffle() { suit.Equal(isRun, n-1) } } + +func (suit *AlgorithmSuite) TestDedup() { + cases := []struct { + input []string + expect []string + }{ + { + input: []string{}, + expect: []string{}, + }, + { + input: []string{ + "abc", "bbc", "abc", + }, + expect: []string{ + "abc", "bbc", + }, + }, + { + input: []string{ + "abc", "bbc", "abc", "bbc", "ddc", "abc", + }, + expect: []string{ + "abc", "bbc", "ddc", + }, + }, + } + + for _, t := range cases { + out := DedupStringArr(t.input) + sort.Strings(out) + sort.Strings(t.expect) + suit.Equal(t.expect, out) + } +} diff --git a/pkg/hashcircler/hash_circler.go b/pkg/hashcircler/hash_circler.go new file mode 100644 index 000000000..92590cdc8 --- /dev/null +++ b/pkg/hashcircler/hash_circler.go @@ -0,0 +1,253 @@ +/* + * Copyright The Dragonfly Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
// HashCircler hashes an input string onto one key of a preset key set.
// Keys can be disabled and enabled again; the key set itself never changes.
type HashCircler interface {
	// Enable enables the target key in the hash circle. It returns
	// ErrKeyNotPresent when key is not part of the preset key set.
	Enable(key string) error

	// Hash returns the enabled key the input is mapped to. It returns
	// ErrKeyNotPresent when no key is enabled.
	Hash(input string) (key string, err error)

	// Disable disables the target key; unknown keys are ignored.
	Disable(key string)
}

var (
	// ErrKeyNotPresent is returned when a key is not part of the preset key
	// set, or when no enabled key is left to hash onto.
	ErrKeyNotPresent = errors.New("key is not present")
)

// defaultVirtualNodesPerKey is the number of ring positions created per key.
const defaultVirtualNodesPerKey = 32

// init seeds the global math/rand source. The hash circle does not draw from
// the global source; the call is kept because it is an existing
// package-level side effect that other code in the process may rely on.
func init() {
	rand.Seed(time.Now().UnixNano())
}

// virtualNode is one segment [start, end] of the uint64 hash space together
// with the index (into keyArr) of the key that owns the segment.
type virtualNode struct {
	start    uint64
	end      uint64
	keyIndex int
}

// keyStatus records whether a preset key is currently enabled.
type keyStatus struct {
	key    string
	enable bool
}

// preSetHashCircler is an implementation of HashCircler whose key set is
// fixed at construction time. vNodes is sorted by start and its segments
// cover the whole uint64 space without gaps. The embedded RWMutex guards
// validKeyCount and the enable flags in keyArr.
type preSetHashCircler struct {
	sync.RWMutex
	validKeyCount int
	vNodes        []*virtualNode
	keyArr        []*keyStatus
	hashFunc      func(string) uint64
}

// NewConsistentHashCircler constructs a HashCircler from keys; the result is
// safe for concurrent use. If hashFunc is nil the default FNV-1a hash is used
// to hash inputs. An error is returned when keys is empty.
//
// The ring layout depends only on the key strings, so circlers built from the
// same keys (in any process) map an input to the same key.
func NewConsistentHashCircler(keys []string, hashFunc func(string) uint64) (HashCircler, error) {
	if hashFunc == nil {
		hashFunc = fnvHashFunc
	}

	if len(keys) == 0 {
		return nil, fmt.Errorf("empty keys")
	}

	keyArr := make([]*keyStatus, len(keys))
	for i, key := range keys {
		keyArr[i] = &keyStatus{
			key:    key,
			enable: true,
		}
	}

	hc := &preSetHashCircler{
		keyArr:        keyArr,
		hashFunc:      hashFunc,
		validKeyCount: len(keyArr),
	}
	hc.vNodes = hc.generateVirtualNodes(keys, defaultVirtualNodesPerKey)

	return hc, nil
}

// Enable marks key as enabled. Enabling an already enabled key is a no-op;
// an unknown key yields ErrKeyNotPresent.
func (h *preSetHashCircler) Enable(key string) error {
	return h.setEnabled(key, true)
}

// Hash maps input onto an enabled key. It locates the virtual node whose
// segment contains hashFunc(input); when that node's key is disabled it walks
// backwards around the ring until it meets a node with an enabled key.
func (h *preSetHashCircler) Hash(input string) (key string, err error) {
	h.RLock()
	defer h.RUnlock()

	if h.validKeyCount == 0 || len(h.vNodes) == 0 {
		return "", ErrKeyNotPresent
	}

	hashIndex := h.hashFunc(input)
	// The last segment ends at math.MaxUint64, so the search always hits.
	first := sort.Search(len(h.vNodes), func(i int) bool {
		return hashIndex <= h.vNodes[i].end
	})

	idx := first
	for {
		owner := h.keyArr[h.vNodes[idx].keyIndex]
		if owner.enable {
			return owner.key, nil
		}

		idx = h.circlePrvIndex(idx, len(h.vNodes))
		if idx == first {
			// Walked the full circle without meeting an enabled key.
			return "", ErrKeyNotPresent
		}
	}
}

// Disable marks key as disabled. Disabling an already disabled or unknown
// key is a no-op.
func (h *preSetHashCircler) Disable(key string) {
	// The interface gives Disable no error result, so an unknown key is
	// deliberately ignored.
	_ = h.setEnabled(key, false)
}

// setEnabled switches the first preset entry equal to key to the requested
// state and keeps validKeyCount in sync. It returns ErrKeyNotPresent when key
// is not preset.
func (h *preSetHashCircler) setEnabled(key string, enable bool) error {
	h.Lock()
	defer h.Unlock()

	for _, k := range h.keyArr {
		if k.key != key {
			continue
		}

		if k.enable != enable {
			k.enable = enable
			if enable {
				h.validKeyCount++
			} else {
				h.validKeyCount--
			}
		}
		return nil
	}

	return ErrKeyNotPresent
}

// circlePrvIndex returns the index before index on a ring of arrCount slots.
func (h *preSetHashCircler) circlePrvIndex(index int, arrCount int) int {
	if index > 0 {
		return index - 1
	}

	return arrCount - 1
}

// circleNextIndex returns the index after index on a ring of arrCount slots.
func (h *preSetHashCircler) circleNextIndex(index int, arrCount int) int {
	if index < arrCount-1 {
		return index + 1
	}

	return 0
}

// generateVirtualNodes creates virtualNodesPerKey ring positions for every
// key and turns them into contiguous segments that cover the whole uint64
// space. It returns nil when there is nothing to generate.
//
// Positions are drawn from a generator seeded with the FNV-1a hash of the
// key, so they are a pure function of the key string.
func (h *preSetHashCircler) generateVirtualNodes(keys []string, virtualNodesPerKey int) []*virtualNode {
	if len(keys) == 0 || virtualNodesPerKey <= 0 {
		return nil
	}

	total := len(keys) * virtualNodesPerKey
	used := make(map[uint64]struct{}, total)
	vNodesArr := make([]*virtualNode, 0, total)
	for i, key := range keys {
		rnd := rand.New(rand.NewSource(int64(fnvHashFunc(key))))
		for _, pos := range h.randomVirtualNodeKeys(rnd, used, virtualNodesPerKey) {
			vNodesArr = append(vNodesArr, &virtualNode{
				start:    pos,
				keyIndex: i,
			})
		}
	}

	sort.Slice(vNodesArr, func(i, j int) bool {
		return vNodesArr[i].start < vNodesArr[j].start
	})

	// Each segment ends right before the next one starts; positions are
	// unique, so the subtraction cannot underflow.
	for i := 0; i < len(vNodesArr)-1; i++ {
		vNodesArr[i].end = vNodesArr[i+1].start - 1
	}

	// Stretch the outermost segments so the ring has no uncovered range.
	vNodesArr[0].start = 0
	vNodesArr[len(vNodesArr)-1].end = math.MaxUint64

	return vNodesArr
}

// randomVirtualNodeKeys draws count positions from rnd that are not yet in
// dup and records them in dup.
func (h *preSetHashCircler) randomVirtualNodeKeys(rnd *rand.Rand, dup map[uint64]struct{}, count int) []uint64 {
	res := make([]uint64, 0, count)
	for len(res) < count {
		v := rnd.Uint64()
		if _, exist := dup[v]; exist {
			continue
		}

		dup[v] = struct{}{}
		res = append(res, v)
	}

	return res
}

// fnvHashFunc is the default hash function: 64-bit FNV-1a of input.
func fnvHashFunc(input string) uint64 {
	h := fnv.New64a()
	// hash.Hash documents that Write never returns an error.
	_, _ = h.Write([]byte(input))
	return h.Sum64()
}
// Test hooks the go-check suites of this package into "go test".
func Test(t *testing.T) {
	check.TestingT(t)
}

// hashCirclerSuite tests the hash circler with a stub hash function whose
// results are looked up in hashMap.
type hashCirclerSuite struct {
	hashMap map[string]uint64
}

// init registers the suite with go-check.
func init() {
	check.Suite(&hashCirclerSuite{
		hashMap: make(map[string]uint64),
	})
}

// registerHashKV makes the stub hash function return value for key.
func (suite *hashCirclerSuite) registerHashKV(key string, value uint64) {
	suite.hashMap[key] = value
}

// unRegisterHashKV removes key from the stub hash table.
func (suite *hashCirclerSuite) unRegisterHashKV(key string) {
	delete(suite.hashMap, key)
}

// cleanHashMap resets the stub hash table.
func (suite *hashCirclerSuite) cleanHashMap() {
	suite.hashMap = make(map[string]uint64)
}

// hash is the stub hash function handed to the circler under test: it
// returns the registered value for input, or 0 when input is not registered.
func (suite *hashCirclerSuite) hash(input string) uint64 {
	v, ok := suite.hashMap[input]
	if ok {
		return v
	}

	return 0
}

// TestHashCircler builds a circler over five keys and walks it through a
// series of Disable/Enable calls. After every step it checks that an input
// whose original key is enabled still maps to that key, that no input maps
// to a disabled key, and that Hash fails once every key is disabled.
func (suite *hashCirclerSuite) TestHashCircler(c *check.C) {
	defer suite.cleanHashMap()

	// Spread the ten inputs over the hash space: two random hash values in
	// each fifth of the uint64 range.
	rangeSize := uint64(math.MaxUint64 / 5)
	suite.registerHashKV("v1", rand.Uint64()%rangeSize)
	suite.registerHashKV("v2", rand.Uint64()%rangeSize)
	suite.registerHashKV("v3", rand.Uint64()%rangeSize+rangeSize)
	suite.registerHashKV("v4", rand.Uint64()%rangeSize+rangeSize)
	suite.registerHashKV("v5", rand.Uint64()%rangeSize+rangeSize*2)
	suite.registerHashKV("v6", rand.Uint64()%rangeSize+rangeSize*2)
	suite.registerHashKV("v7", rand.Uint64()%rangeSize+rangeSize*3)
	suite.registerHashKV("v8", rand.Uint64()%rangeSize+rangeSize*3)
	suite.registerHashKV("v9", rand.Uint64()%rangeSize+rangeSize*4)
	suite.registerHashKV("v10", rand.Uint64()%rangeSize+rangeSize*4)

	arr := []string{
		"key1", "key2", "key3", "key4", "key5",
	}

	inputStrs := []string{
		"v1", "v2", "v3", "v4", "v5", "v6", "v7", "v8", "v9", "v10",
	}

	hasher, err := NewConsistentHashCircler(arr, suite.hash)
	c.Assert(err, check.IsNil)

	// Record the mapping while every key is enabled.
	originKeys := make([]string, len(inputStrs))

	for i := 0; i < 10; i++ {
		k, err := hasher.Hash(inputStrs[i])
		c.Assert(err, check.IsNil)
		originKeys[i] = k
	}

	// Disable arr[0]: nothing may map to it, everything else stays put.
	hasher.Disable(arr[0])
	for i := 0; i < 10; i++ {
		k, err := hasher.Hash(inputStrs[i])
		c.Assert(err, check.IsNil)
		c.Assert(k, check.Not(check.Equals), arr[0])
		if originKeys[i] != arr[0] {
			c.Assert(k, check.Equals, originKeys[i])
		}
	}

	// Leave arr[3] as the only enabled key.
	hasher.Disable(arr[1])
	hasher.Disable(arr[2])
	hasher.Disable(arr[4])

	for i := 0; i < 10; i++ {
		k, err := hasher.Hash(inputStrs[i])
		c.Assert(err, check.IsNil)
		c.Assert(k, check.Equals, arr[3])
	}

	// Enabled keys are now arr[1] and arr[3].
	err = hasher.Enable(arr[1])
	c.Assert(err, check.IsNil)

	for i := 0; i < 10; i++ {
		k, err := hasher.Hash(inputStrs[i])
		c.Assert(err, check.IsNil)
		if originKeys[i] == arr[1] || originKeys[i] == arr[3] {
			c.Assert(k, check.Equals, originKeys[i])
		}
		c.Assert(true, check.Equals, k == arr[3] || k == arr[1])
	}

	// Enabling an already enabled key must succeed; then add arr[2], so the
	// enabled keys are arr[1], arr[2] and arr[3].
	err = hasher.Enable(arr[1])
	c.Assert(err, check.IsNil)
	err = hasher.Enable(arr[2])
	c.Assert(err, check.IsNil)

	for i := 0; i < 10; i++ {
		k, err := hasher.Hash(inputStrs[i])
		c.Assert(err, check.IsNil)

		if originKeys[i] == arr[1] || originKeys[i] == arr[2] || originKeys[i] == arr[3] {
			c.Assert(k, check.Equals, originKeys[i])
		}

		c.Assert(true, check.Equals, k != arr[0] && k != arr[4])
	}

	// Back to arr[3] only (arr[0] is already disabled at this point).
	hasher.Disable(arr[0])
	hasher.Disable(arr[1])
	hasher.Disable(arr[2])
	for i := 0; i < 10; i++ {
		k, err := hasher.Hash(inputStrs[i])
		c.Assert(err, check.IsNil)
		c.Assert(k, check.Equals, arr[3])
	}

	// With every key disabled Hash must fail.
	hasher.Disable(arr[3])
	for i := 0; i < 10; i++ {
		_, err = hasher.Hash(inputStrs[i])
		c.Assert(err, check.NotNil)
	}

	// arr[0] is the only enabled key.
	err = hasher.Enable(arr[0])
	c.Assert(err, check.IsNil)
	for i := 0; i < 10; i++ {
		k, err := hasher.Hash(inputStrs[i])
		c.Assert(err, check.IsNil)
		c.Assert(k, check.Equals, arr[0])
	}

	// Enabled keys are arr[0] and arr[1].
	err = hasher.Enable(arr[1])
	c.Assert(err, check.IsNil)
	for i := 0; i < 10; i++ {
		k, err := hasher.Hash(inputStrs[i])
		c.Assert(err, check.IsNil)

		if originKeys[i] == arr[0] || originKeys[i] == arr[1] {
			c.Assert(k, check.Equals, originKeys[i])
		}

		c.Assert(k == arr[0] || k == arr[1], check.Equals, true)
	}

	// Enable the remaining keys: the original mapping must be restored.
	err = hasher.Enable(arr[2])
	c.Assert(err, check.IsNil)
	err = hasher.Enable(arr[3])
	c.Assert(err, check.IsNil)
	err = hasher.Enable(arr[4])
	c.Assert(err, check.IsNil)

	for i := 0; i < 10; i++ {
		k, err := hasher.Hash(inputStrs[i])
		c.Assert(err, check.IsNil)
		c.Assert(k, check.Equals, originKeys[i])
	}
}