diff --git a/dfget/core/core.go b/dfget/core/core.go index 49ee14b6a..080bb6217 100644 --- a/dfget/core/core.go +++ b/dfget/core/core.go @@ -34,6 +34,7 @@ import ( p2pDown "github.com/dragonflyoss/Dragonfly/dfget/core/downloader/p2p_downloader" "github.com/dragonflyoss/Dragonfly/dfget/core/regist" "github.com/dragonflyoss/Dragonfly/dfget/core/uploader" + "github.com/dragonflyoss/Dragonfly/dfget/locator" "github.com/dragonflyoss/Dragonfly/pkg/algorithm" "github.com/dragonflyoss/Dragonfly/pkg/constants" "github.com/dragonflyoss/Dragonfly/pkg/errortypes" @@ -51,9 +52,11 @@ import ( func Start(cfg *config.Config) *errortypes.DfError { var ( supernodeAPI = api.NewSupernodeAPI() - register = regist.NewSupernodeRegister(cfg, supernodeAPI) - err error - result *regist.RegisterResult + // TODO make it pluggable + supernodeLocator = locator.GetLocator(cfg) + register = regist.NewSupernodeRegister(cfg, supernodeAPI, supernodeLocator) + err error + result *regist.RegisterResult ) printer.Println(fmt.Sprintf("--%s-- %s", @@ -63,7 +66,7 @@ func Start(cfg *config.Config) *errortypes.DfError { return errortypes.New(config.CodePrepareError, err.Error()) } - if result, err = registerToSuperNode(cfg, register); err != nil { + if result, err = registerToSuperNode(cfg, register, supernodeLocator); err != nil { return errortypes.New(config.CodeRegisterError, err.Error()) } @@ -124,7 +127,7 @@ func launchPeerServer(cfg *config.Config) error { return err } -func registerToSuperNode(cfg *config.Config, register regist.SupernodeRegister) ( +func registerToSuperNode(cfg *config.Config, register regist.SupernodeRegister, supernodeLocator locator.SupernodeLocator) ( *regist.RegisterResult, error) { defer func() { if r := recover(); r != nil { @@ -137,7 +140,7 @@ func registerToSuperNode(cfg *config.Config, register regist.SupernodeRegister) panic("user specified") } - if len(cfg.Nodes) == 0 { + if supernodeLocator == nil || supernodeLocator.Size() == 0 { cfg.BackSourceReason = config.BackSourceReasonNodeEmpty panic("supernode empty") } diff --git a/dfget/core/core_test.go b/dfget/core/core_test.go index 876aa9444..24817a39c 100644 --- a/dfget/core/core_test.go +++ b/dfget/core/core_test.go @@ -33,6 +33,7 @@ import ( . "github.com/dragonflyoss/Dragonfly/dfget/core/helper" "github.com/dragonflyoss/Dragonfly/dfget/core/regist" "github.com/dragonflyoss/Dragonfly/dfget/core/uploader" + "github.com/dragonflyoss/Dragonfly/dfget/locator" "github.com/dragonflyoss/Dragonfly/pkg/algorithm" "github.com/go-check/check" @@ -76,10 +77,12 @@ func (s *CoreTestSuite) TestRegisterToSupernode(c *check.C) { cfg := s.createConfig(&bytes.Buffer{}) m := new(MockSupernodeAPI) m.RegisterFunc = CreateRegisterFunc() - register := regist.NewSupernodeRegister(cfg, m) + nodeStr := "127.0.0.1:8002" + snLocator, _ := locator.NewStaticLocatorFromStr([]string{nodeStr}) + register := regist.NewSupernodeRegister(cfg, m, snLocator) var f = func(bc int, errIsNil bool, data *regist.RegisterResult) { - res, e := registerToSuperNode(cfg, register) + res, e := registerToSuperNode(cfg, register, snLocator) c.Assert(res == nil, check.Equals, data == nil) c.Assert(e == nil, check.Equals, errIsNil) c.Assert(cfg.BackSourceReason, check.Equals, bc) @@ -88,6 +91,8 @@ func (s *CoreTestSuite) TestRegisterToSupernode(c *check.C) { } } + tmpGroup := snLocator.Group + snLocator.Group = nil f(config.BackSourceReasonNodeEmpty, true, nil) cfg.Pattern = config.PatternSource @@ -95,11 +100,12 @@ func (s *CoreTestSuite) TestRegisterToSupernode(c *check.C) { uploader.SetupPeerServerExecutor(nil) cfg.Pattern = config.PatternP2P - cfg.Nodes = []string{"x"} + snLocator.Group = tmpGroup + snLocator.Refresh() cfg.URL = "http://x.com" f(config.BackSourceReasonRegisterFail, true, nil) - cfg.Nodes = []string{"x"} + snLocator.Refresh() cfg.URL = "http://taobao.com" cfg.BackSourceReason = config.BackSourceReasonNone } diff --git a/dfget/core/regist/register.go b/dfget/core/regist/register.go index 359f72395..cd63dffe6 100644 --- a/dfget/core/regist/register.go +++ b/dfget/core/regist/register.go @@ -17,6 +17,7 @@ package regist import ( + "fmt" "io/ioutil" "os" "time" @@ -24,10 +25,10 @@ import ( apiTypes "github.com/dragonflyoss/Dragonfly/apis/types" "github.com/dragonflyoss/Dragonfly/dfget/config" "github.com/dragonflyoss/Dragonfly/dfget/core/api" + "github.com/dragonflyoss/Dragonfly/dfget/locator" "github.com/dragonflyoss/Dragonfly/dfget/types" "github.com/dragonflyoss/Dragonfly/pkg/constants" "github.com/dragonflyoss/Dragonfly/pkg/errortypes" - "github.com/dragonflyoss/Dragonfly/pkg/netutils" "github.com/dragonflyoss/Dragonfly/pkg/stringutils" "github.com/dragonflyoss/Dragonfly/pkg/util" "github.com/dragonflyoss/Dragonfly/version" @@ -42,17 +43,19 @@ type SupernodeRegister interface { type supernodeRegister struct { api api.SupernodeAPI + locator locator.SupernodeLocator cfg *config.Config - lastRegisteredNode string + lastRegisteredNode *locator.Supernode } var _ SupernodeRegister = &supernodeRegister{} // NewSupernodeRegister creates an instance of supernodeRegister. -func NewSupernodeRegister(cfg *config.Config, api api.SupernodeAPI) SupernodeRegister { +func NewSupernodeRegister(cfg *config.Config, api api.SupernodeAPI, locator locator.SupernodeLocator) SupernodeRegister { return &supernodeRegister{ - api: api, - cfg: cfg, + api: api, + locator: locator, + cfg: cfg, } } @@ -61,45 +64,48 @@ func (s *supernodeRegister) Register(peerPort int) (*RegisterResult, *errortypes var ( resp *types.RegisterResponse e error - i int + node *locator.Supernode retryTimes = 0 start = time.Now() ) + nextOrRetry := func() *locator.Supernode { + if resp != nil && resp.Code == constants.CodeWaitAuth && retryTimes < 3 { + retryTimes++ + logrus.Infof("sleep 1.0 s to wait auth(%d/3)...", retryTimes) + time.Sleep(1000 * time.Millisecond) + return s.locator.Get() + } + return s.locator.Next() + } + logrus.Infof("do register to one of %v", s.cfg.Nodes) - nodes, nLen := s.cfg.Nodes, len(s.cfg.Nodes) req := s.constructRegisterRequest(peerPort) - for i = 0; i < nLen; i++ { - if s.lastRegisteredNode == nodes[i] { - logrus.Warnf("the last registered node is the same(%s)", nodes[i]) + for node = s.locator.Next(); node != nil; node = nextOrRetry() { + if s.lastRegisteredNode == node { + logrus.Warnf("the last registered node is the same(%v)", s.lastRegisteredNode) continue } - req.SupernodeIP = netutils.ExtractHost(nodes[i]) - resp, e = s.api.Register(nodes[i], req) - logrus.Infof("do register to %s, res:%s error:%v", nodes[i], resp, e) + req.SupernodeIP = node.IP + nodeHost := s.nodeHostStr(node) + resp, e = s.api.Register(nodeHost, req) + logrus.Infof("do register to %s, res:%s error:%v", nodeHost, resp, e) if e != nil { - logrus.Errorf("register to node:%s error:%v", nodes[i], e) continue } if resp.Code == constants.Success || resp.Code == constants.CodeNeedAuth || resp.Code == constants.CodeURLNotReachable { break } - if resp.Code == constants.CodeWaitAuth && retryTimes < 3 { - i-- - retryTimes++ - logrus.Infof("sleep 2.5s to wait auth(%d/3)...", retryTimes) - time.Sleep(2500 * time.Millisecond) - } } - s.setLastRegisteredNode(i) - s.setRemainderNodes(i) + + s.setLastRegisteredNode(node) if err := s.checkResponse(resp, e); err != nil { logrus.Errorf("register fail:%v", err) return nil, err } - result := NewRegisterResult(nodes[i], s.cfg.Nodes, s.cfg.URL, + result := NewRegisterResult(s.nodeHostStr(node), s.cfg.URL, resp.Data.TaskID, resp.Data.FileLength, resp.Data.PieceSize, resp.Data.CDNSource) logrus.Infof("do register result:%s and cost:%.3fs", resp, @@ -120,29 +126,15 @@ func (s *supernodeRegister) checkResponse(resp *types.RegisterResponse, e error) return nil } -func (s *supernodeRegister) setLastRegisteredNode(idx int) { - nLen := len(s.cfg.Nodes) - if nLen <= 0 { - return - } - if idx >= nLen { - s.lastRegisteredNode = "" - return - } - s.lastRegisteredNode = s.cfg.Nodes[idx] +func (s *supernodeRegister) setLastRegisteredNode(node *locator.Supernode) { + s.lastRegisteredNode = node } -func (s *supernodeRegister) setRemainderNodes(idx int) { - nLen := len(s.cfg.Nodes) - if nLen <= 0 { - return +func (s *supernodeRegister) nodeHostStr(node *locator.Supernode) string { + if node == nil { + return "" } - if idx >= nLen { - s.cfg.Nodes = []string{} - return - } - - s.cfg.Nodes = s.cfg.Nodes[idx+1:] + return fmt.Sprintf("%s:%d", node.IP, node.Port) } func (s *supernodeRegister) constructRegisterRequest(port int) *types.RegisterRequest { @@ -188,28 +180,26 @@ func getTaskPath(taskFileName string) string { } // NewRegisterResult creates an instance of RegisterResult. -func NewRegisterResult(node string, remainder []string, url string, +func NewRegisterResult(node string, url string, taskID string, fileLen int64, pieceSize int32, cdnSource apiTypes.CdnSource) *RegisterResult { return &RegisterResult{ - Node: node, - RemainderNodes: remainder, - URL: url, - TaskID: taskID, - FileLength: fileLen, - PieceSize: pieceSize, - CDNSource: cdnSource, + Node: node, + URL: url, + TaskID: taskID, + FileLength: fileLen, + PieceSize: pieceSize, + CDNSource: cdnSource, } } // RegisterResult is the register result set. type RegisterResult struct { - Node string - RemainderNodes []string - URL string - TaskID string - FileLength int64 - PieceSize int32 - CDNSource apiTypes.CdnSource + Node string + URL string + TaskID string + FileLength int64 + PieceSize int32 + CDNSource apiTypes.CdnSource } func (r *RegisterResult) String() string { diff --git a/dfget/core/regist/register_test.go b/dfget/core/regist/register_test.go index 9e7b61562..fb09ff995 100644 --- a/dfget/core/regist/register_test.go +++ b/dfget/core/regist/register_test.go @@ -28,6 +28,7 @@ import ( "github.com/dragonflyoss/Dragonfly/apis/types" "github.com/dragonflyoss/Dragonfly/dfget/config" . "github.com/dragonflyoss/Dragonfly/dfget/core/helper" + "github.com/dragonflyoss/Dragonfly/dfget/locator" "github.com/dragonflyoss/Dragonfly/pkg/constants" "github.com/go-check/check" @@ -58,10 +59,9 @@ func (s *RegistTestSuite) TearDownSuite(c *check.C) { } func (s *RegistTestSuite) TestNewRegisterResult(c *check.C) { - result := NewRegisterResult("node", []string{"1"}, "url", "taskID", + result := NewRegisterResult("node", "url", "taskID", 10, 1, "supernode") c.Assert(result.Node, check.Equals, "node") - c.Assert(result.RemainderNodes, check.DeepEquals, []string{"1"}) c.Assert(result.URL, check.Equals, "url") c.Assert(result.TaskID, check.Equals, "taskID") c.Assert(result.FileLength, check.Equals, int64(10)) @@ -78,8 +78,10 @@ func (s *RegistTestSuite) TestSupernodeRegister_Register(c *check.C) { m := new(MockSupernodeAPI) m.RegisterFunc = CreateRegisterFunc() + nodeStr := "127.0.0.1:8002" + snLocator, _ := locator.NewStaticLocatorFromStr([]string{nodeStr}) var f = func(ec int, msg string, data *RegisterResult) { - register := NewSupernodeRegister(cfg, m) + register := NewSupernodeRegister(cfg, m, snLocator) resp, e := register.Register(0) if msg == "" { c.Assert(e, check.IsNil) @@ -92,24 +94,24 @@ func (s *RegistTestSuite) TestSupernodeRegister_Register(c *check.C) { } } - cfg.Nodes = []string{""} + snLocator.Next() f(constants.HTTPError, "empty response, unknown error", nil) - cfg.Nodes = []string{"x"} + snLocator.Refresh() f(501, "invalid source url", nil) - cfg.Nodes = []string{"x"} + snLocator.Refresh() cfg.URL = "http://taobao.com" f(constants.CodeNeedAuth, "need auth", nil) - cfg.Nodes = []string{"x"} + snLocator.Refresh() cfg.URL = "http://github.com" f(constants.CodeWaitAuth, "wait auth", nil) - cfg.Nodes = []string{"x"} + snLocator.Refresh() cfg.URL = "http://lowzj.com" f(constants.Success, "", &RegisterResult{ - Node: "x", RemainderNodes: []string{}, URL: cfg.URL, TaskID: "a", + Node: nodeStr, URL: cfg.URL, TaskID: "a", FileLength: 100, PieceSize: 10}) f(constants.HTTPError, "empty response, unknown error", nil) @@ -118,7 +120,7 @@ func (s *RegistTestSuite) TestSupernodeRegister_Register(c *check.C) { func (s *RegistTestSuite) TestSupernodeRegister_constructRegisterRequest(c *check.C) { buf := &bytes.Buffer{} cfg := s.createConfig(buf) - register := &supernodeRegister{nil, cfg, ""} + register := &supernodeRegister{nil, nil, cfg, nil} cfg.Identifier = "id" req := register.constructRegisterRequest(0) diff --git a/dfget/locator/manager.go b/dfget/locator/manager.go new file mode 100644 index 000000000..12d9b3030 --- /dev/null +++ b/dfget/locator/manager.go @@ -0,0 +1,48 @@ +/* + * Copyright The Dragonfly Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package locator + +import ( + "sync" + + "github.com/dragonflyoss/Dragonfly/dfget/config" +) + +var DefaultBuilder Builder = func(cfg *config.Config) SupernodeLocator { + return NewStaticLocator(cfg.Supernodes) +} + +var DefaultLocator SupernodeLocator + +var mutex sync.Mutex + +type Builder func(cfg *config.Config) SupernodeLocator + +func GetLocator(cfg *config.Config) SupernodeLocator { + mutex.Lock() + defer mutex.Unlock() + if DefaultLocator == nil { + DefaultLocator = DefaultBuilder(cfg) + } + return DefaultLocator +} + +func RegisterLocator(builder Builder) { + mutex.Lock() + defer mutex.Unlock() + DefaultBuilder = builder +}