From 68ef2c50296dec9afe0061ceefe46904b0afbe4e Mon Sep 17 00:00:00 2001 From: lowzj Date: Wed, 6 May 2020 20:47:35 +0800 Subject: [PATCH] refactor: refactor dfget/core with SupernodeLocator Signed-off-by: lowzj --- cmd/dfget/app/root_test.go | 2 +- dfget/config/config.go | 3 +- dfget/core/core.go | 67 ++++++++++------- dfget/core/core_test.go | 22 ++++-- dfget/core/regist/register.go | 108 ++++++++++++--------------- dfget/core/regist/register_test.go | 22 +++--- dfget/locator/locator.go | 8 ++ dfget/locator/manager.go | 64 ++++++++++++++++ dfget/locator/static_locator.go | 27 +++++-- dfget/locator/static_locator_test.go | 30 +++++--- 10 files changed, 229 insertions(+), 124 deletions(-) create mode 100644 dfget/locator/manager.go diff --git a/cmd/dfget/app/root_test.go b/cmd/dfget/app/root_test.go index e7dcfdfab..73b48afde 100644 --- a/cmd/dfget/app/root_test.go +++ b/cmd/dfget/app/root_test.go @@ -38,7 +38,7 @@ type dfgetSuit struct { func (suit *dfgetSuit) Test_initFlagsNoArguments() { initProperties() - suit.Equal(cfg.Nodes, []string{"127.0.0.1:8002"}) + suit.Equal(cfg.Nodes, []string(nil)) suit.Equal(cfg.LocalLimit, 20*rate.MB) suit.Equal(cfg.TotalLimit, rate.Rate(0)) suit.Equal(cfg.Notbs, false) diff --git a/dfget/config/config.go b/dfget/config/config.go index bd0e21e13..d5b0b36ec 100644 --- a/dfget/config/config.go +++ b/dfget/config/config.go @@ -91,8 +91,9 @@ type Properties struct { // NewProperties creates a new properties with default values. func NewProperties() *Properties { + // don't set Supernodes as default value, the SupernodeLocator will + // do this in a better way. return &Properties{ - Supernodes: GetDefaultSupernodesValue(), LocalLimit: DefaultLocalLimit, MinRate: DefaultMinRate, ClientQueueSize: DefaultClientQueueSize, diff --git a/dfget/core/core.go b/dfget/core/core.go index 49ee14b6a..1b07faf59 100644 --- a/dfget/core/core.go +++ b/dfget/core/core.go @@ -34,6 +34,7 @@ import ( p2pDown "github.com/dragonflyoss/Dragonfly/dfget/core/downloader/p2p_downloader" "github.com/dragonflyoss/Dragonfly/dfget/core/regist" "github.com/dragonflyoss/Dragonfly/dfget/core/uploader" + "github.com/dragonflyoss/Dragonfly/dfget/locator" "github.com/dragonflyoss/Dragonfly/pkg/algorithm" "github.com/dragonflyoss/Dragonfly/pkg/constants" "github.com/dragonflyoss/Dragonfly/pkg/errortypes" @@ -51,23 +52,25 @@ import ( func Start(cfg *config.Config) *errortypes.DfError { var ( supernodeAPI = api.NewSupernodeAPI() - register = regist.NewSupernodeRegister(cfg, supernodeAPI) - err error - result *regist.RegisterResult + // TODO make it pluggable + supernodeLocator = locator.CreateLocator(cfg) + register = regist.NewSupernodeRegister(cfg, supernodeAPI, supernodeLocator) + err error + result *regist.RegisterResult ) printer.Println(fmt.Sprintf("--%s-- %s", cfg.StartTime.Format(config.DefaultTimestampFormat), cfg.URL)) - if err = prepare(cfg); err != nil { + if err = prepare(cfg, supernodeLocator); err != nil { return errortypes.New(config.CodePrepareError, err.Error()) } - if result, err = registerToSuperNode(cfg, register); err != nil { + if result, err = registerToSuperNode(cfg, register, supernodeLocator); err != nil { return errortypes.New(config.CodeRegisterError, err.Error()) } - if err = downloadFile(cfg, supernodeAPI, register, result); err != nil { + if err = downloadFile(cfg, supernodeAPI, supernodeLocator, register, result); err != nil { return errortypes.New(config.CodeDownloadError, err.Error()) } @@ -75,7 +78,7 @@ func Start(cfg *config.Config) *errortypes.DfError { } // prepare the RV-related information and create the corresponding files. -func prepare(cfg *config.Config) (err error) { +func prepare(cfg *config.Config, locator locator.SupernodeLocator) (err error) { printer.Printf("dfget version:%s", version.DFGetVersion) printer.Printf("workspace:%s", cfg.WorkHome) printer.Printf("sign:%s", cfg.Sign) @@ -104,9 +107,8 @@ func prepare(cfg *config.Config) (err error) { } rv.DataDir = cfg.RV.SystemDataDir - cfg.Nodes = adjustSupernodeList(cfg.Nodes) if stringutils.IsEmptyStr(rv.LocalIP) { - rv.LocalIP = checkConnectSupernode(cfg.Nodes) + rv.LocalIP = checkConnectSupernode(locator) } rv.Cid = getCid(rv.LocalIP, cfg.Sign) rv.TaskFileName = getTaskFileName(rv.RealTarget, cfg.Sign) @@ -124,7 +126,7 @@ func launchPeerServer(cfg *config.Config) error { return err } -func registerToSuperNode(cfg *config.Config, register regist.SupernodeRegister) ( +func registerToSuperNode(cfg *config.Config, register regist.SupernodeRegister, supernodeLocator locator.SupernodeLocator) ( *regist.RegisterResult, error) { defer func() { if r := recover(); r != nil { @@ -137,7 +139,7 @@ func registerToSuperNode(cfg *config.Config, register regist.SupernodeRegister) panic("user specified") } - if len(cfg.Nodes) == 0 { + if supernodeLocator == nil || supernodeLocator.Size() == 0 { cfg.BackSourceReason = config.BackSourceReasonNodeEmpty panic("supernode empty") } @@ -162,7 +164,7 @@ func registerToSuperNode(cfg *config.Config, register regist.SupernodeRegister) return result, nil } -func downloadFile(cfg *config.Config, supernodeAPI api.SupernodeAPI, +func downloadFile(cfg *config.Config, supernodeAPI api.SupernodeAPI, locator locator.SupernodeLocator, register regist.SupernodeRegister, result *regist.RegisterResult) error { timeout := calculateTimeout(cfg) @@ -180,7 +182,7 @@ func downloadFile(cfg *config.Config, supernodeAPI api.SupernodeAPI, downloadTime := time.Since(cfg.StartTime).Seconds() // upload metrics to supernode only if pattern is p2p or cdn and result is not nil if cfg.Pattern != config.PatternSource && result != nil { - reportMetrics(cfg, supernodeAPI, downloadTime, result.TaskID, success) + reportMetrics(cfg, supernodeAPI, locator, downloadTime, result.TaskID, success) } if success { @@ -285,21 +287,26 @@ func adjustSupernodeList(nodes []string) []string { } } -func checkConnectSupernode(nodes []string) (localIP string) { +func checkConnectSupernode(locator locator.SupernodeLocator) (localIP string) { var ( e error ) - for _, n := range nodes { - ip, port := netutils.GetIPAndPortFromNode(n, config.DefaultSupernodePort) - if localIP, e = httputils.CheckConnect(ip, port, 1000); e == nil { - return localIP + if locator == nil { + return "" + } + for _, group := range locator.All() { + for _, n := range group.Nodes { + if localIP, e = httputils.CheckConnect(n.IP, n.Port, 1000); e == nil { + return localIP + } + logrus.Errorf("Connect to node:%s error: %v", n, e) } - logrus.Errorf("Connect to node:%s error: %v", n, e) } return "" } -func reportMetrics(cfg *config.Config, supernodeAPI api.SupernodeAPI, downloadTime float64, taskID string, success bool) { +func reportMetrics(cfg *config.Config, supernodeAPI api.SupernodeAPI, locator locator.SupernodeLocator, + downloadTime float64, taskID string, success bool) { req := &types.TaskMetricsRequest{ BacksourceReason: strconv.Itoa(cfg.BackSourceReason), IP: cfg.RV.LocalIP, @@ -311,15 +318,21 @@ func reportMetrics(cfg *config.Config, supernodeAPI api.SupernodeAPI, downloadTi Success: success, TaskID: taskID, } - for _, node := range cfg.Nodes { - resp, err := supernodeAPI.ReportMetrics(node, req) - if err != nil { - logrus.Errorf("failed to report metrics to supernode %s: %v", node, err) - } - if resp != nil && resp.IsSuccess() { - return + node := locator.Get() + if node != nil { + nodeStr := fmt.Sprintf("%s:%d", node.IP, node.Port) + // retry twice + for i := 0; i < 2; i++ { + resp, err := supernodeAPI.ReportMetrics(nodeStr, req) + if err != nil { + logrus.Errorf("failed to report metrics to supernode %s: %v", nodeStr, err) + } + if resp != nil && resp.IsSuccess() { + return + } } } + } func calculateTimeout(cfg *config.Config) time.Duration { diff --git a/dfget/core/core_test.go b/dfget/core/core_test.go index 876aa9444..cb4f2ec56 100644 --- a/dfget/core/core_test.go +++ b/dfget/core/core_test.go @@ -33,6 +33,7 @@ import ( . "github.com/dragonflyoss/Dragonfly/dfget/core/helper" "github.com/dragonflyoss/Dragonfly/dfget/core/regist" "github.com/dragonflyoss/Dragonfly/dfget/core/uploader" + "github.com/dragonflyoss/Dragonfly/dfget/locator" "github.com/dragonflyoss/Dragonfly/pkg/algorithm" "github.com/go-check/check" @@ -68,7 +69,7 @@ func (s *CoreTestSuite) TestPrepare(c *check.C) { cfg := s.createConfig(buf) cfg.Output = filepath.Join(s.workHome, "test.output") - err := prepare(cfg) + err := prepare(cfg, nil) fmt.Printf("%s\nerror:%v", buf.String(), err) } @@ -76,10 +77,12 @@ func (s *CoreTestSuite) TestRegisterToSupernode(c *check.C) { cfg := s.createConfig(&bytes.Buffer{}) m := new(MockSupernodeAPI) m.RegisterFunc = CreateRegisterFunc() - register := regist.NewSupernodeRegister(cfg, m) + nodeStr := "127.0.0.1:8002" + snLocator, _ := locator.NewStaticLocatorFromStr("test", []string{nodeStr}) + register := regist.NewSupernodeRegister(cfg, m, snLocator) var f = func(bc int, errIsNil bool, data *regist.RegisterResult) { - res, e := registerToSuperNode(cfg, register) + res, e := registerToSuperNode(cfg, register, snLocator) c.Assert(res == nil, check.Equals, data == nil) c.Assert(e == nil, check.Equals, errIsNil) c.Assert(cfg.BackSourceReason, check.Equals, bc) @@ -88,6 +91,8 @@ func (s *CoreTestSuite) TestRegisterToSupernode(c *check.C) { } } + tmpGroup := snLocator.Group + snLocator.Group = nil f(config.BackSourceReasonNodeEmpty, true, nil) cfg.Pattern = config.PatternSource @@ -95,11 +100,12 @@ func (s *CoreTestSuite) TestRegisterToSupernode(c *check.C) { uploader.SetupPeerServerExecutor(nil) cfg.Pattern = config.PatternP2P - cfg.Nodes = []string{"x"} + snLocator.Group = tmpGroup + snLocator.Refresh() cfg.URL = "http://x.com" f(config.BackSourceReasonRegisterFail, true, nil) - cfg.Nodes = []string{"x"} + snLocator.Refresh() cfg.URL = "http://taobao.com" cfg.BackSourceReason = config.BackSourceReasonNone } @@ -131,11 +137,13 @@ func (s *CoreTestSuite) TestCheckConnectSupernode(c *check.C) { s.createConfig(buf) nodes := []string{host} - ip := checkConnectSupernode(nodes) + l, _ := locator.NewStaticLocatorFromStr("test", nodes) + ip := checkConnectSupernode(l) c.Assert(ip, check.Equals, "127.0.0.1") buf.Reset() - ip = checkConnectSupernode([]string{"127.0.0.2"}) + l, _ = locator.NewStaticLocatorFromStr("test", []string{"127.0.0.2"}) + ip = checkConnectSupernode(l) c.Assert(strings.Index(buf.String(), "Connect") > 0, check.Equals, true) c.Assert(ip, check.Equals, "") } diff --git a/dfget/core/regist/register.go b/dfget/core/regist/register.go index 359f72395..7e6fac37d 100644 --- a/dfget/core/regist/register.go +++ b/dfget/core/regist/register.go @@ -17,6 +17,7 @@ package regist import ( + "fmt" "io/ioutil" "os" "time" @@ -24,10 +25,10 @@ import ( apiTypes "github.com/dragonflyoss/Dragonfly/apis/types" "github.com/dragonflyoss/Dragonfly/dfget/config" "github.com/dragonflyoss/Dragonfly/dfget/core/api" + "github.com/dragonflyoss/Dragonfly/dfget/locator" "github.com/dragonflyoss/Dragonfly/dfget/types" "github.com/dragonflyoss/Dragonfly/pkg/constants" "github.com/dragonflyoss/Dragonfly/pkg/errortypes" - "github.com/dragonflyoss/Dragonfly/pkg/netutils" "github.com/dragonflyoss/Dragonfly/pkg/stringutils" "github.com/dragonflyoss/Dragonfly/pkg/util" "github.com/dragonflyoss/Dragonfly/version" @@ -42,17 +43,19 @@ type SupernodeRegister interface { type supernodeRegister struct { api api.SupernodeAPI + locator locator.SupernodeLocator cfg *config.Config - lastRegisteredNode string + lastRegisteredNode *locator.Supernode } var _ SupernodeRegister = &supernodeRegister{} // NewSupernodeRegister creates an instance of supernodeRegister. -func NewSupernodeRegister(cfg *config.Config, api api.SupernodeAPI) SupernodeRegister { +func NewSupernodeRegister(cfg *config.Config, api api.SupernodeAPI, locator locator.SupernodeLocator) SupernodeRegister { return &supernodeRegister{ - api: api, - cfg: cfg, + api: api, + locator: locator, + cfg: cfg, } } @@ -61,45 +64,48 @@ func (s *supernodeRegister) Register(peerPort int) (*RegisterResult, *errortypes var ( resp *types.RegisterResponse e error - i int + node *locator.Supernode retryTimes = 0 start = time.Now() ) - logrus.Infof("do register to one of %v", s.cfg.Nodes) - nodes, nLen := s.cfg.Nodes, len(s.cfg.Nodes) + nextOrRetry := func() *locator.Supernode { + if resp != nil && resp.Code == constants.CodeWaitAuth && retryTimes < 3 { + retryTimes++ + logrus.Infof("sleep 1.0 s to wait auth(%d/3)...", retryTimes) + time.Sleep(1000 * time.Millisecond) + return s.locator.Get() + } + return s.locator.Next() + } + + logrus.Infof("do register to one of %v", s.locator) req := s.constructRegisterRequest(peerPort) - for i = 0; i < nLen; i++ { - if s.lastRegisteredNode == nodes[i] { - logrus.Warnf("the last registered node is the same(%s)", nodes[i]) + for node = s.locator.Next(); node != nil; node = nextOrRetry() { + if s.lastRegisteredNode == node { + logrus.Warnf("the last registered node is the same(%v)", s.lastRegisteredNode) continue } - req.SupernodeIP = netutils.ExtractHost(nodes[i]) - resp, e = s.api.Register(nodes[i], req) - logrus.Infof("do register to %s, res:%s error:%v", nodes[i], resp, e) + req.SupernodeIP = node.IP + nodeHost := s.nodeHostStr(node) + resp, e = s.api.Register(nodeHost, req) + logrus.Infof("do register to %s, res:%s error:%v", nodeHost, resp, e) if e != nil { - logrus.Errorf("register to node:%s error:%v", nodes[i], e) continue } if resp.Code == constants.Success || resp.Code == constants.CodeNeedAuth || resp.Code == constants.CodeURLNotReachable { break } - if resp.Code == constants.CodeWaitAuth && retryTimes < 3 { - i-- - retryTimes++ - logrus.Infof("sleep 2.5s to wait auth(%d/3)...", retryTimes) - time.Sleep(2500 * time.Millisecond) - } } - s.setLastRegisteredNode(i) - s.setRemainderNodes(i) + + s.setLastRegisteredNode(node) if err := s.checkResponse(resp, e); err != nil { logrus.Errorf("register fail:%v", err) return nil, err } - result := NewRegisterResult(nodes[i], s.cfg.Nodes, s.cfg.URL, + result := NewRegisterResult(s.nodeHostStr(node), s.cfg.URL, resp.Data.TaskID, resp.Data.FileLength, resp.Data.PieceSize, resp.Data.CDNSource) logrus.Infof("do register result:%s and cost:%.3fs", resp, @@ -120,29 +126,15 @@ func (s *supernodeRegister) checkResponse(resp *types.RegisterResponse, e error) return nil } -func (s *supernodeRegister) setLastRegisteredNode(idx int) { - nLen := len(s.cfg.Nodes) - if nLen <= 0 { - return - } - if idx >= nLen { - s.lastRegisteredNode = "" - return - } - s.lastRegisteredNode = s.cfg.Nodes[idx] +func (s *supernodeRegister) setLastRegisteredNode(node *locator.Supernode) { + s.lastRegisteredNode = node } -func (s *supernodeRegister) setRemainderNodes(idx int) { - nLen := len(s.cfg.Nodes) - if nLen <= 0 { - return +func (s *supernodeRegister) nodeHostStr(node *locator.Supernode) string { + if node == nil { + return "" } - if idx >= nLen { - s.cfg.Nodes = []string{} - return - } - - s.cfg.Nodes = s.cfg.Nodes[idx+1:] + return fmt.Sprintf("%s:%d", node.IP, node.Port) } func (s *supernodeRegister) constructRegisterRequest(port int) *types.RegisterRequest { @@ -188,28 +180,26 @@ func getTaskPath(taskFileName string) string { } // NewRegisterResult creates an instance of RegisterResult. -func NewRegisterResult(node string, remainder []string, url string, +func NewRegisterResult(node string, url string, taskID string, fileLen int64, pieceSize int32, cdnSource apiTypes.CdnSource) *RegisterResult { return &RegisterResult{ - Node: node, - RemainderNodes: remainder, - URL: url, - TaskID: taskID, - FileLength: fileLen, - PieceSize: pieceSize, - CDNSource: cdnSource, + Node: node, + URL: url, + TaskID: taskID, + FileLength: fileLen, + PieceSize: pieceSize, + CDNSource: cdnSource, } } // RegisterResult is the register result set. type RegisterResult struct { - Node string - RemainderNodes []string - URL string - TaskID string - FileLength int64 - PieceSize int32 - CDNSource apiTypes.CdnSource + Node string + URL string + TaskID string + FileLength int64 + PieceSize int32 + CDNSource apiTypes.CdnSource } func (r *RegisterResult) String() string { diff --git a/dfget/core/regist/register_test.go b/dfget/core/regist/register_test.go index 9e7b61562..e0b578071 100644 --- a/dfget/core/regist/register_test.go +++ b/dfget/core/regist/register_test.go @@ -28,6 +28,7 @@ import ( "github.com/dragonflyoss/Dragonfly/apis/types" "github.com/dragonflyoss/Dragonfly/dfget/config" . "github.com/dragonflyoss/Dragonfly/dfget/core/helper" + "github.com/dragonflyoss/Dragonfly/dfget/locator" "github.com/dragonflyoss/Dragonfly/pkg/constants" "github.com/go-check/check" @@ -58,10 +59,9 @@ func (s *RegistTestSuite) TearDownSuite(c *check.C) { } func (s *RegistTestSuite) TestNewRegisterResult(c *check.C) { - result := NewRegisterResult("node", []string{"1"}, "url", "taskID", + result := NewRegisterResult("node", "url", "taskID", 10, 1, "supernode") c.Assert(result.Node, check.Equals, "node") - c.Assert(result.RemainderNodes, check.DeepEquals, []string{"1"}) c.Assert(result.URL, check.Equals, "url") c.Assert(result.TaskID, check.Equals, "taskID") c.Assert(result.FileLength, check.Equals, int64(10)) @@ -78,8 +78,10 @@ func (s *RegistTestSuite) TestSupernodeRegister_Register(c *check.C) { m := new(MockSupernodeAPI) m.RegisterFunc = CreateRegisterFunc() + nodeStr := "127.0.0.1:8002" + snLocator, _ := locator.NewStaticLocatorFromStr("test", []string{nodeStr}) var f = func(ec int, msg string, data *RegisterResult) { - register := NewSupernodeRegister(cfg, m) + register := NewSupernodeRegister(cfg, m, snLocator) resp, e := register.Register(0) if msg == "" { c.Assert(e, check.IsNil) @@ -92,24 +94,24 @@ func (s *RegistTestSuite) TestSupernodeRegister_Register(c *check.C) { } } - cfg.Nodes = []string{""} + snLocator.Next() f(constants.HTTPError, "empty response, unknown error", nil) - cfg.Nodes = []string{"x"} + snLocator.Refresh() f(501, "invalid source url", nil) - cfg.Nodes = []string{"x"} + snLocator.Refresh() cfg.URL = "http://taobao.com" f(constants.CodeNeedAuth, "need auth", nil) - cfg.Nodes = []string{"x"} + snLocator.Refresh() cfg.URL = "http://github.com" f(constants.CodeWaitAuth, "wait auth", nil) - cfg.Nodes = []string{"x"} + snLocator.Refresh() cfg.URL = "http://lowzj.com" f(constants.Success, "", &RegisterResult{ - Node: "x", RemainderNodes: []string{}, URL: cfg.URL, TaskID: "a", + Node: nodeStr, URL: cfg.URL, TaskID: "a", FileLength: 100, PieceSize: 10}) f(constants.HTTPError, "empty response, unknown error", nil) @@ -118,7 +120,7 @@ func (s *RegistTestSuite) TestSupernodeRegister_Register(c *check.C) { func (s *RegistTestSuite) TestSupernodeRegister_constructRegisterRequest(c *check.C) { buf := &bytes.Buffer{} cfg := s.createConfig(buf) - register := &supernodeRegister{nil, cfg, ""} + register := &supernodeRegister{nil, nil, cfg, nil} cfg.Identifier = "id" req := register.constructRegisterRequest(0) diff --git a/dfget/locator/locator.go b/dfget/locator/locator.go index 2627ea93d..49ee1fffa 100644 --- a/dfget/locator/locator.go +++ b/dfget/locator/locator.go @@ -16,6 +16,10 @@ package locator +import ( + "fmt" +) + // SupernodeLocator defines the way how to get available supernodes. // Developers can implement their own locator more flexibly , not just get the // supernode list from configuration or CLI. @@ -75,6 +79,10 @@ type Supernode struct { Metrics *SupernodeMetrics } +func (s *Supernode) String() string { + return fmt.Sprintf("%s:%d", s.IP, s.Port) +} + // SupernodeMetrics holds metrics used for the locator to choose supernode. type SupernodeMetrics struct { Metrics map[string]interface{} diff --git a/dfget/locator/manager.go b/dfget/locator/manager.go new file mode 100644 index 000000000..81a60bdc8 --- /dev/null +++ b/dfget/locator/manager.go @@ -0,0 +1,64 @@ +/* + * Copyright The Dragonfly Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package locator + +import ( + "sync" + + "github.com/dragonflyoss/Dragonfly/dfget/config" +) + +const ( + // GroupDefaultName indicates all the supernodes in this group are set as + // default value. It's only used when the supernode list from configuration + // or CLI is empty. + GroupDefaultName = "default" + + // GroupConfigName indicates all the supernodes in this group come from + // configuration or CLI. + GroupConfigName = "config" +) + +var mutex sync.Mutex + +// DefaultBuilder returns a supernode locator with default value when supernodes +// in the config file is empty. +var DefaultBuilder Builder = func(cfg *config.Config) SupernodeLocator { + if cfg == nil || len(cfg.Nodes) == 0 { + return NewStaticLocator(GroupDefaultName, config.GetDefaultSupernodesValue()) + } + locator, _ := NewStaticLocatorFromStr(GroupConfigName, cfg.Nodes) + return locator +} + +// Builder defines the constructor of SupernodeLocator. +type Builder func(cfg *config.Config) SupernodeLocator + +// CreateLocator creates a supernode locator with the giving config. +func CreateLocator(cfg *config.Config) SupernodeLocator { + mutex.Lock() + defer mutex.Unlock() + return DefaultBuilder(cfg) +} + +// RegisterLocator provides a way for users to customize SupernodeLocator. +// This function should be invoked before CreateLocator. +func RegisterLocator(builder Builder) { + mutex.Lock() + defer mutex.Unlock() + DefaultBuilder = builder +} diff --git a/dfget/locator/static_locator.go b/dfget/locator/static_locator.go index ec3369ccc..18dd75976 100644 --- a/dfget/locator/static_locator.go +++ b/dfget/locator/static_locator.go @@ -17,6 +17,7 @@ package locator import ( + "fmt" "sync/atomic" "github.com/dragonflyoss/Dragonfly/dfget/config" @@ -24,8 +25,6 @@ import ( "github.com/dragonflyoss/Dragonfly/pkg/netutils" ) -const staticLocatorGroupName = "config" - var _ SupernodeLocator = &StaticLocator{} // StaticLocator uses the nodes passed from configuration or CLI. @@ -39,7 +38,7 @@ type StaticLocator struct { // NewStaticLocator constructs StaticLocator which uses the nodes passed from // configuration or CLI. -func NewStaticLocator(nodes []*config.NodeWeight) *StaticLocator { +func NewStaticLocator(groupName string, nodes []*config.NodeWeight) *StaticLocator { locator := &StaticLocator{ idx: -1, } @@ -47,7 +46,7 @@ func NewStaticLocator(nodes []*config.NodeWeight) *StaticLocator { return locator } group := &SupernodeGroup{ - Name: staticLocatorGroupName, + Name: groupName, } for _, node := range nodes { ip, port := netutils.GetIPAndPortFromNode(node.Node, config.DefaultSupernodePort) @@ -59,7 +58,7 @@ func NewStaticLocator(nodes []*config.NodeWeight) *StaticLocator { IP: ip, Port: port, Weight: node.Weight, - GroupName: staticLocatorGroupName, + GroupName: groupName, } for i := 0; i < supernode.Weight; i++ { group.Nodes = append(group.Nodes, supernode) @@ -72,12 +71,12 @@ func NewStaticLocator(nodes []*config.NodeWeight) *StaticLocator { // NewStaticLocatorFromStr constructs StaticLocator from string list. // The format of nodes is: ip:port=weight -func NewStaticLocatorFromStr(nodes []string) (*StaticLocator, error) { +func NewStaticLocatorFromStr(groupName string, nodes []string) (*StaticLocator, error) { nodeWeight, err := config.ParseNodesSlice(nodes) if err != nil { return nil, err } - return NewStaticLocator(nodeWeight), nil + return NewStaticLocator(groupName, nodeWeight), nil } // ---------------------------------------------------------------------------- @@ -128,6 +127,20 @@ func (s *StaticLocator) Refresh() bool { return true } +func (s *StaticLocator) String() string { + idx := s.load() + if s.Group == nil || idx >= len(s.Group.Nodes) { + return "empty" + } + + nodes := make([]string, len(s.Group.Nodes)-idx-1) + for i := idx + 1; i < len(s.Group.Nodes); i++ { + n := s.Group.GetNode(i) + nodes[i-idx-1] = fmt.Sprintf("%s:%d=%d", n.IP, n.Port, n.Weight) + } + return s.Group.Name + ":" + fmt.Sprintf("%v", nodes) +} + // ---------------------------------------------------------------------------- // private methods of StaticLocator diff --git a/dfget/locator/static_locator_test.go b/dfget/locator/static_locator_test.go index ef18d9a43..4289aa7df 100644 --- a/dfget/locator/static_locator_test.go +++ b/dfget/locator/static_locator_test.go @@ -37,25 +37,27 @@ func init() { check.Suite(&StaticLocatorTestSuite{}) } +var testGroupName = "test-group" + func (s *StaticLocatorTestSuite) Test_NewStaticLocator(c *check.C) { rand.Seed(0) - l := NewStaticLocator(nil) + l := NewStaticLocator(testGroupName, nil) c.Assert(l, check.NotNil) c.Assert(l.idx, check.Equals, int32(-1)) c.Assert(l.Group, check.IsNil) - l = NewStaticLocator([]*config.NodeWeight{}) + l = NewStaticLocator(testGroupName, []*config.NodeWeight{}) c.Assert(l, check.NotNil) c.Assert(l.idx, check.Equals, int32(-1)) c.Assert(l.Group, check.IsNil) - l = NewStaticLocator([]*config.NodeWeight{ + l = NewStaticLocator(testGroupName, []*config.NodeWeight{ {Node: "a:80", Weight: 1}, {Node: "a:81", Weight: 2}, }) c.Assert(l, check.NotNil) c.Assert(l.Group, check.DeepEquals, &SupernodeGroup{ - Name: staticLocatorGroupName, + Name: testGroupName, Nodes: shuffleNodes([]*Supernode{ create("a", 80, 1), create("a", 81, 2), @@ -77,7 +79,7 @@ func (s *StaticLocatorTestSuite) Test_NewStaticLocatorFromString(c *check.C) { } for _, v := range cases { - l, err := NewStaticLocatorFromStr(strings.Split(v.nodes, ",")) + l, err := NewStaticLocatorFromStr(testGroupName, strings.Split(v.nodes, ",")) if v.err { c.Assert(err, check.NotNil) c.Assert(l, check.IsNil) @@ -98,7 +100,7 @@ func (s *StaticLocatorTestSuite) Test_Get(c *check.C) { {"a:80=1", create("a", 80, 1)}, } for _, v := range cases { - l, _ := NewStaticLocatorFromStr(strings.Split(v.nodes, ",")) + l := createLocator(strings.Split(v.nodes, ",")...) sn := l.Get() c.Assert(sn, check.IsNil) l.Next() @@ -129,7 +131,7 @@ func (s *StaticLocatorTestSuite) Test_Next(c *check.C) { var sn *Supernode for _, v := range cases { - l, _ := NewStaticLocatorFromStr(strings.Split(v.nodes, ",")) + l := createLocator(strings.Split(v.nodes, ",")...) for i := 0; i < v.cnt; i++ { sn = l.Next() } @@ -143,8 +145,8 @@ func (s *StaticLocatorTestSuite) Test_Next(c *check.C) { } func (s *StaticLocatorTestSuite) Test_GetGroup(c *check.C) { - l, _ := NewStaticLocatorFromStr([]string{"a:80=1"}) - group := l.GetGroup(staticLocatorGroupName) + l := createLocator("a:80=1") + group := l.GetGroup(testGroupName) c.Assert(group, check.NotNil) c.Assert(group.Nodes[0], check.DeepEquals, create("a", 80, 1)) @@ -153,14 +155,14 @@ func (s *StaticLocatorTestSuite) Test_GetGroup(c *check.C) { } func (s *StaticLocatorTestSuite) Test_All(c *check.C) { - l, _ := NewStaticLocatorFromStr([]string{"a:80=1"}) + l := createLocator("a:80=1") groups := l.All() c.Assert(groups, check.NotNil) c.Assert(len(groups), check.Equals, 1) } func (s *StaticLocatorTestSuite) Test_Refresh(c *check.C) { - l, _ := NewStaticLocatorFromStr([]string{"a:80=1"}) + l := createLocator("a:80=1") _ = l.Next() c.Assert(l.load(), check.Equals, 0) @@ -174,6 +176,10 @@ func create(ip string, port, weight int) *Supernode { IP: ip, Port: port, Weight: weight, - GroupName: staticLocatorGroupName, + GroupName: testGroupName, } } +func createLocator(nodes ...string) *StaticLocator { + l, _ := NewStaticLocatorFromStr(testGroupName, nodes) + return l +}