Skip to content
This repository has been archived by the owner on Feb 27, 2023. It is now read-only.

Commit

Permalink
add hash circler locator
Browse files Browse the repository at this point in the history
Signed-off-by: allen.wq <[email protected]>
  • Loading branch information
wangforthinker committed May 29, 2020
1 parent 3945cdb commit 6e795c6
Show file tree
Hide file tree
Showing 6 changed files with 820 additions and 0 deletions.
191 changes: 191 additions & 0 deletions dfget/locator/hashcircler_locator.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,191 @@
/*
* Copyright The Dragonfly Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package locator

import (
"context"
"fmt"
"sort"
"time"

"github.com/dragonflyoss/Dragonfly/dfget/config"
"github.com/dragonflyoss/Dragonfly/pkg/algorithm"
"github.com/dragonflyoss/Dragonfly/pkg/hashcircler"
"github.com/dragonflyoss/Dragonfly/pkg/netutils"
"github.com/dragonflyoss/Dragonfly/pkg/queue"

"github.com/sirupsen/logrus"
)

const (
enableEv = "enable"
disableEv = "disable"
)

type SuperNodeEvent struct {
evType string
node string
}

func NewEnableEvent(node string) *SuperNodeEvent {
return &SuperNodeEvent{
evType: enableEv,
node: node,
}
}

func NewDisableEvent(node string) *SuperNodeEvent {
return &SuperNodeEvent{
evType: disableEv,
node: node,
}
}

// hashCirclerLocator is an implementation of SupernodeLocator. And it provides ability to select a supernode
// by input key. It allows some supernodes disabled, on this condition the disable supernode will not be selected.
type hashCirclerLocator struct {
hc hashcircler.HashCircler
nodes []string
groupName string
group *SupernodeGroup

// evQueue will puts/polls SuperNodeEvent to disable/enable supernode.
evQueue queue.Queue
}

func NewHashCirclerLocator(groupName string, nodes []string, eventQueue queue.Queue) (SupernodeLocator, error) {
nodes = algorithm.DedupStringArr(nodes)
if len(nodes) == 0 {
return nil, fmt.Errorf("nodes should not be nil")
}

sort.Strings(nodes)

group := &SupernodeGroup{
Name: groupName,
Nodes: []*Supernode{},
Infos: make(map[string]string),
}
keys := []string{}
for _, node := range nodes {
ip, port := netutils.GetIPAndPortFromNode(node, config.DefaultSupernodePort)
if ip == "" {
continue
}
supernode := &Supernode{
Schema: config.DefaultSupernodeSchema,
IP: ip,
Port: port,
GroupName: groupName,
}

group.Nodes = append(group.Nodes, supernode)
keys = append(keys, supernode.String())
}

hc, err := hashcircler.NewConsistentHashCircler(keys, nil)
if err != nil {
return nil, err
}

h := &hashCirclerLocator{
hc: hc,
evQueue: eventQueue,
groupName: groupName,
group: group,
}

go h.eventLoop(context.Background())

return h, nil
}

func (h *hashCirclerLocator) Get() *Supernode {
// not implementation
return nil
}

func (h *hashCirclerLocator) Next() *Supernode {
// not implementation
return nil
}

func (h *hashCirclerLocator) Select(key interface{}) *Supernode {
s, err := h.hc.Hash(key.(string))
if err != nil {
logrus.Errorf("failed to get supernode: %v", err)
return nil
}

for _, sp := range h.group.Nodes {
if s == sp.String() {
return sp
}
}

return nil
}

func (h *hashCirclerLocator) GetGroup(name string) *SupernodeGroup {
if h.group == nil || h.group.Name != name {
return nil
}

return h.group
}

func (h *hashCirclerLocator) All() []*SupernodeGroup {
return []*SupernodeGroup{h.group}
}

func (h *hashCirclerLocator) Size() int {
return len(h.group.Nodes)
}

func (h *hashCirclerLocator) Report(node string, metrics *SupernodeMetrics) {
return
}

func (h *hashCirclerLocator) Refresh() bool {
return true
}

func (h *hashCirclerLocator) eventLoop(ctx context.Context) {
for {
select {
case <-ctx.Done():
return
default:
}

if ev, ok := h.evQueue.PollTimeout(time.Second); ok {
h.handleEvent(ev.(*SuperNodeEvent))
}
}
}

func (h *hashCirclerLocator) handleEvent(ev *SuperNodeEvent) {
switch ev.evType {
case enableEv:
h.hc.Enable(ev.node)
case disableEv:
h.hc.Disable(ev.node)
default:
}

return
}
125 changes: 125 additions & 0 deletions dfget/locator/hashcircler_locator_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
/*
* Copyright The Dragonfly Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package locator

import (
"time"

"github.com/dragonflyoss/Dragonfly/pkg/queue"

"github.com/go-check/check"
)

type hashCirclerLocatorTestSuite struct {
}

func init() {
check.Suite(&hashCirclerLocatorTestSuite{})
}

var testGroupName1 = "test-group1"

func (s *hashCirclerLocatorTestSuite) TestHashCirclerLocator(c *check.C) {
evQ := queue.NewQueue(0)
nodes := []string{"1.1.1.1:8002", "2.2.2.2:8002", "3.3.3.3:8002"}
hl, err := NewHashCirclerLocator(testGroupName1, nodes, evQ)
c.Assert(err, check.IsNil)

c.Assert(hl.Get(), check.IsNil)
c.Assert(hl.Next(), check.IsNil)

groups := hl.All()
c.Assert(len(groups), check.Equals, 1)
c.Assert(len(groups[0].Nodes), check.Equals, 3)
c.Assert(groups[0].Nodes[0].String(), check.Equals, nodes[0])
c.Assert(groups[0].Nodes[1].String(), check.Equals, nodes[1])
c.Assert(groups[0].Nodes[2].String(), check.Equals, nodes[2])

keys := []string{"x", "y", "z", "a", "b", "c", "m", "n", "p", "q", "j", "k", "i", "e", "f", "g"}
originSp := make([]string, len(keys))

for i, k := range keys {
sp := hl.Select(k)
c.Assert(sp, check.NotNil)
originSp[i] = sp.String()
}

// select again, the supernode should be equal
for i, k := range keys {
sp := hl.Select(k)
c.Assert(sp, check.NotNil)
c.Assert(originSp[i], check.Equals, sp.String())
}

// disable nodes[0]
evQ.Put(NewDisableEvent(nodes[0]))
time.Sleep(time.Second * 2)
// select again, if originSp is not nodes[0], it should not be changed.
for i, k := range keys {
sp := hl.Select(k)
c.Assert(sp, check.NotNil)
if originSp[i] == nodes[0] {
c.Assert(originSp[i], check.Not(check.Equals), sp.String())
continue
}

c.Assert(originSp[i], check.Equals, sp.String())
}

// disable nodes[1]
evQ.Put(NewDisableEvent(nodes[1]))
time.Sleep(time.Second * 2)
// select again, all select node should be nodes[2]
for _, k := range keys {
sp := hl.Select(k)
c.Assert(sp, check.NotNil)
c.Assert(nodes[2], check.Equals, sp.String())
}

// enable nodes[0], disable nodes[2]
evQ.Put(NewDisableEvent(nodes[2]))
evQ.Put(NewEnableEvent(nodes[0]))
time.Sleep(time.Second * 2)
for _, k := range keys {
sp := hl.Select(k)
c.Assert(sp, check.NotNil)
c.Assert(nodes[0], check.Equals, sp.String())
}

// enable nodes[1]
evQ.Put(NewEnableEvent(nodes[1]))
time.Sleep(time.Second * 2)
for i, k := range keys {
sp := hl.Select(k)
c.Assert(sp, check.NotNil)
if originSp[i] == nodes[2] {
c.Assert(originSp[i], check.Not(check.Equals), sp.String())
continue
}

c.Assert(originSp[i], check.Equals, sp.String())
}

// enable nodes[2], select node should be equal with origin one
evQ.Put(NewEnableEvent(nodes[2]))
time.Sleep(time.Second * 2)
for i, k := range keys {
sp := hl.Select(k)
c.Assert(sp, check.NotNil)
c.Assert(originSp[i], check.Equals, sp.String())
}
}
22 changes: 22 additions & 0 deletions pkg/algorithm/algorithm.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package algorithm

import (
"math/rand"
"sort"
"time"
)

Expand Down Expand Up @@ -100,3 +101,24 @@ func GCD(x, y int) int {
}
return x
}

// DedupStringArr removes duplicate string in array.
func DedupStringArr(input []string) []string {
if len(input) == 0 {
return []string{}
}

out := make([]string, len(input))
copy(out, input)
sort.Strings(out)

idx := 0
for i := 1; i < len(input); i++ {
if out[idx] != out[i] {
idx++
out[idx] = out[i]
}
}

return out[:idx+1]
}
36 changes: 36 additions & 0 deletions pkg/algorithm/algorithm_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package algorithm

import (
"math/rand"
"sort"
"testing"

"github.com/stretchr/testify/suite"
Expand Down Expand Up @@ -90,3 +91,38 @@ func (suit *AlgorithmSuite) TestShuffle() {
suit.Equal(isRun, n-1)
}
}

func (suit *AlgorithmSuite) TestDedup() {
cases := []struct {
input []string
expect []string
}{
{
input: []string{},
expect: []string{},
},
{
input: []string{
"abc", "bbc", "abc",
},
expect: []string{
"abc", "bbc",
},
},
{
input: []string{
"abc", "bbc", "abc", "bbc", "ddc", "abc",
},
expect: []string{
"abc", "bbc", "ddc",
},
},
}

for _, t := range cases {
out := DedupStringArr(t.input)
sort.Strings(out)
sort.Strings(t.expect)
suit.Equal(t.expect, out)
}
}
Loading

0 comments on commit 6e795c6

Please sign in to comment.