From 1d5d7b66e7b03d8d259cc8d24d758234185938f8 Mon Sep 17 00:00:00 2001 From: lowzj Date: Wed, 6 May 2020 20:47:35 +0800 Subject: [PATCH] refactor: refactor dfget/core with SupernodeLocator Signed-off-by: lowzj --- dfget/core/core.go | 11 +-- dfget/core/core_test.go | 2 +- dfget/core/helper/test_helper.go | 37 ++++++++++ dfget/core/regist/register.go | 106 +++++++++++++---------------- dfget/core/regist/register_test.go | 7 +- 5 files changed, 96 insertions(+), 67 deletions(-) diff --git a/dfget/core/core.go b/dfget/core/core.go index 22965d9cb..794c799b3 100644 --- a/dfget/core/core.go +++ b/dfget/core/core.go @@ -34,6 +34,7 @@ import ( p2pDown "github.com/dragonflyoss/Dragonfly/dfget/core/downloader/p2p_downloader" "github.com/dragonflyoss/Dragonfly/dfget/core/regist" "github.com/dragonflyoss/Dragonfly/dfget/core/uploader" + "github.com/dragonflyoss/Dragonfly/dfget/locator" "github.com/dragonflyoss/Dragonfly/pkg/algorithm" "github.com/dragonflyoss/Dragonfly/pkg/constants" "github.com/dragonflyoss/Dragonfly/pkg/errortypes" @@ -54,10 +55,12 @@ func init() { // Start function creates a new task and starts it to download file. func Start(cfg *config.Config) *errortypes.DfError { var ( - supernodeAPI = api.NewSupernodeAPI() - register = regist.NewSupernodeRegister(cfg, supernodeAPI) - err error - result *regist.RegisterResult + supernodeAPI = api.NewSupernodeAPI() + // TODO make it pluggable + supernodeLocator, _ = locator.NewStaticLocatorFromStr(append(cfg.Nodes, cfg.Nodes...)) + register = regist.NewSupernodeRegister(cfg, supernodeAPI, supernodeLocator) + err error + result *regist.RegisterResult ) printer.Println(fmt.Sprintf("--%s-- %s", diff --git a/dfget/core/core_test.go b/dfget/core/core_test.go index 876aa9444..556d489a9 100644 --- a/dfget/core/core_test.go +++ b/dfget/core/core_test.go @@ -76,7 +76,7 @@ func (s *CoreTestSuite) TestRegisterToSupernode(c *check.C) { cfg := s.createConfig(&bytes.Buffer{}) m := new(MockSupernodeAPI) m.RegisterFunc = CreateRegisterFunc() - register := regist.NewSupernodeRegister(cfg, m) + register := regist.NewSupernodeRegister(cfg, m, &MockSupernodeLocator{}) var f = func(bc int, errIsNil bool, data *regist.RegisterResult) { res, e := registerToSuperNode(cfg, register) diff --git a/dfget/core/helper/test_helper.go b/dfget/core/helper/test_helper.go index 8ccd8acb9..e265b56fe 100644 --- a/dfget/core/helper/test_helper.go +++ b/dfget/core/helper/test_helper.go @@ -33,6 +33,7 @@ import ( api_types "github.com/dragonflyoss/Dragonfly/apis/types" "github.com/dragonflyoss/Dragonfly/dfget/config" "github.com/dragonflyoss/Dragonfly/dfget/core/api" + "github.com/dragonflyoss/Dragonfly/dfget/locator" "github.com/dragonflyoss/Dragonfly/dfget/types" "github.com/dragonflyoss/Dragonfly/pkg/constants" "github.com/dragonflyoss/Dragonfly/pkg/fileutils" @@ -394,3 +395,39 @@ type mockFile struct { size int64 repeatStr []byte } + +// ---------------------------------------------------------------------------- +// MockSupernodeLocator + +var _ locator.SupernodeLocator = &MockSupernodeLocator{} + +type MockSupernodeLocator struct { +} + +func (m *MockSupernodeLocator) Get() *locator.Supernode { + panic("implement me") +} + +func (m *MockSupernodeLocator) Next() *locator.Supernode { + return &locator.Supernode{ + IP: "127.0.0.1", + Port: config.DefaultSupernodePort, + } +} + +func (m *MockSupernodeLocator) GetGroup(name string) *locator.SupernodeGroup { + panic("implement me") +} + +func (m *MockSupernodeLocator) All() []*locator.SupernodeGroup { + panic("implement me") +} + +func (m *MockSupernodeLocator) Report(node string, metrics *locator.SupernodeMetrics) { + panic("implement me") +} + +func (m *MockSupernodeLocator) Refresh() bool { + panic("implement me") +} + diff --git a/dfget/core/regist/register.go b/dfget/core/regist/register.go index 359f72395..e24010cd8 100644 --- a/dfget/core/regist/register.go +++ b/dfget/core/regist/register.go @@ -17,6 +17,7 @@ package regist import ( + "fmt" "io/ioutil" "os" "time" @@ -24,10 +25,10 @@ import ( apiTypes "github.com/dragonflyoss/Dragonfly/apis/types" "github.com/dragonflyoss/Dragonfly/dfget/config" "github.com/dragonflyoss/Dragonfly/dfget/core/api" + "github.com/dragonflyoss/Dragonfly/dfget/locator" "github.com/dragonflyoss/Dragonfly/dfget/types" "github.com/dragonflyoss/Dragonfly/pkg/constants" "github.com/dragonflyoss/Dragonfly/pkg/errortypes" - "github.com/dragonflyoss/Dragonfly/pkg/netutils" "github.com/dragonflyoss/Dragonfly/pkg/stringutils" "github.com/dragonflyoss/Dragonfly/pkg/util" "github.com/dragonflyoss/Dragonfly/version" @@ -42,17 +43,19 @@ type SupernodeRegister interface { type supernodeRegister struct { api api.SupernodeAPI + locator locator.SupernodeLocator cfg *config.Config - lastRegisteredNode string + lastRegisteredNode *locator.Supernode } var _ SupernodeRegister = &supernodeRegister{} // NewSupernodeRegister creates an instance of supernodeRegister. -func NewSupernodeRegister(cfg *config.Config, api api.SupernodeAPI) SupernodeRegister { +func NewSupernodeRegister(cfg *config.Config, api api.SupernodeAPI, locator locator.SupernodeLocator) SupernodeRegister { return &supernodeRegister{ - api: api, - cfg: cfg, + api: api, + locator: locator, + cfg: cfg, } } @@ -61,45 +64,48 @@ func (s *supernodeRegister) Register(peerPort int) (*RegisterResult, *errortypes var ( resp *types.RegisterResponse e error - i int + node *locator.Supernode retryTimes = 0 start = time.Now() ) + nextOrRetry := func() *locator.Supernode { + if resp != nil && resp.Code == constants.CodeWaitAuth && retryTimes < 3 { + retryTimes++ + logrus.Infof("sleep 1.0 s to wait auth(%d/3)...", retryTimes) + time.Sleep(1000 * time.Millisecond) + return s.locator.Get() + } + return s.locator.Next() + } + logrus.Infof("do register to one of %v", s.cfg.Nodes) - nodes, nLen := s.cfg.Nodes, len(s.cfg.Nodes) req := s.constructRegisterRequest(peerPort) - for i = 0; i < nLen; i++ { - if s.lastRegisteredNode == nodes[i] { - logrus.Warnf("the last registered node is the same(%s)", nodes[i]) + for node = s.locator.Next(); node != nil; node = nextOrRetry() { + if s.lastRegisteredNode == node { + logrus.Warnf("the last registered node is the same(%s)", s.lastRegisteredNode) continue } - req.SupernodeIP = netutils.ExtractHost(nodes[i]) - resp, e = s.api.Register(nodes[i], req) - logrus.Infof("do register to %s, res:%s error:%v", nodes[i], resp, e) + req.SupernodeIP = node.IP + nodeHost := s.nodeHostStr(node) + resp, e = s.api.Register(nodeHost, req) + logrus.Infof("do register to %s, res:%s error:%v", nodeHost, resp, e) if e != nil { - logrus.Errorf("register to node:%s error:%v", nodes[i], e) continue } if resp.Code == constants.Success || resp.Code == constants.CodeNeedAuth || resp.Code == constants.CodeURLNotReachable { break } - if resp.Code == constants.CodeWaitAuth && retryTimes < 3 { - i-- - retryTimes++ - logrus.Infof("sleep 2.5s to wait auth(%d/3)...", retryTimes) - time.Sleep(2500 * time.Millisecond) - } } - s.setLastRegisteredNode(i) - s.setRemainderNodes(i) + + s.setLastRegisteredNode(node) if err := s.checkResponse(resp, e); err != nil { logrus.Errorf("register fail:%v", err) return nil, err } - result := NewRegisterResult(nodes[i], s.cfg.Nodes, s.cfg.URL, + result := NewRegisterResult(s.nodeHostStr(node), s.cfg.URL, resp.Data.TaskID, resp.Data.FileLength, resp.Data.PieceSize, resp.Data.CDNSource) logrus.Infof("do register result:%s and cost:%.3fs", resp, @@ -120,29 +126,15 @@ func (s *supernodeRegister) checkResponse(resp *types.RegisterResponse, e error) return nil } -func (s *supernodeRegister) setLastRegisteredNode(idx int) { - nLen := len(s.cfg.Nodes) - if nLen <= 0 { - return - } - if idx >= nLen { - s.lastRegisteredNode = "" - return - } - s.lastRegisteredNode = s.cfg.Nodes[idx] +func (s *supernodeRegister) setLastRegisteredNode(node *locator.Supernode) { + s.lastRegisteredNode = node } -func (s *supernodeRegister) setRemainderNodes(idx int) { - nLen := len(s.cfg.Nodes) - if nLen <= 0 { - return +func (s *supernodeRegister) nodeHostStr(node *locator.Supernode) string { + if node == nil { + return "" } - if idx >= nLen { - s.cfg.Nodes = []string{} - return - } - - s.cfg.Nodes = s.cfg.Nodes[idx+1:] + return fmt.Sprintf("%s:%d", node.IP, node.Port) } func (s *supernodeRegister) constructRegisterRequest(port int) *types.RegisterRequest { @@ -188,28 +180,26 @@ func getTaskPath(taskFileName string) string { } // NewRegisterResult creates an instance of RegisterResult. -func NewRegisterResult(node string, remainder []string, url string, +func NewRegisterResult(node string, url string, taskID string, fileLen int64, pieceSize int32, cdnSource apiTypes.CdnSource) *RegisterResult { return &RegisterResult{ - Node: node, - RemainderNodes: remainder, - URL: url, - TaskID: taskID, - FileLength: fileLen, - PieceSize: pieceSize, - CDNSource: cdnSource, + Node: node, + URL: url, + TaskID: taskID, + FileLength: fileLen, + PieceSize: pieceSize, + CDNSource: cdnSource, } } // RegisterResult is the register result set. type RegisterResult struct { - Node string - RemainderNodes []string - URL string - TaskID string - FileLength int64 - PieceSize int32 - CDNSource apiTypes.CdnSource + Node string + URL string + TaskID string + FileLength int64 + PieceSize int32 + CDNSource apiTypes.CdnSource } func (r *RegisterResult) String() string { diff --git a/dfget/core/regist/register_test.go b/dfget/core/regist/register_test.go index 9e7b61562..c1007bffd 100644 --- a/dfget/core/regist/register_test.go +++ b/dfget/core/regist/register_test.go @@ -58,10 +58,9 @@ func (s *RegistTestSuite) TearDownSuite(c *check.C) { } func (s *RegistTestSuite) TestNewRegisterResult(c *check.C) { - result := NewRegisterResult("node", []string{"1"}, "url", "taskID", + result := NewRegisterResult("node", "url", "taskID", 10, 1, "supernode") c.Assert(result.Node, check.Equals, "node") - c.Assert(result.RemainderNodes, check.DeepEquals, []string{"1"}) c.Assert(result.URL, check.Equals, "url") c.Assert(result.TaskID, check.Equals, "taskID") c.Assert(result.FileLength, check.Equals, int64(10)) @@ -79,7 +78,7 @@ func (s *RegistTestSuite) TestSupernodeRegister_Register(c *check.C) { m.RegisterFunc = CreateRegisterFunc() var f = func(ec int, msg string, data *RegisterResult) { - register := NewSupernodeRegister(cfg, m) + register := NewSupernodeRegister(cfg, m, &MockSupernodeLocator{}) resp, e := register.Register(0) if msg == "" { c.Assert(e, check.IsNil) @@ -109,7 +108,7 @@ func (s *RegistTestSuite) TestSupernodeRegister_Register(c *check.C) { cfg.Nodes = []string{"x"} cfg.URL = "http://lowzj.com" f(constants.Success, "", &RegisterResult{ - Node: "x", RemainderNodes: []string{}, URL: cfg.URL, TaskID: "a", + Node: "x", URL: cfg.URL, TaskID: "a", FileLength: 100, PieceSize: 10}) f(constants.HTTPError, "empty response, unknown error", nil)