Skip to content
This repository has been archived by the owner on Feb 27, 2023. It is now read-only.

Commit

Permalink
refactor: refactor dfget/core with SupernodeLocator
Browse files Browse the repository at this point in the history
Signed-off-by: lowzj <[email protected]>
  • Loading branch information
lowzj committed May 6, 2020
1 parent 44506bf commit 1d5d7b6
Show file tree
Hide file tree
Showing 5 changed files with 96 additions and 67 deletions.
11 changes: 7 additions & 4 deletions dfget/core/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
p2pDown "github.com/dragonflyoss/Dragonfly/dfget/core/downloader/p2p_downloader"
"github.com/dragonflyoss/Dragonfly/dfget/core/regist"
"github.com/dragonflyoss/Dragonfly/dfget/core/uploader"
"github.com/dragonflyoss/Dragonfly/dfget/locator"
"github.com/dragonflyoss/Dragonfly/pkg/algorithm"
"github.com/dragonflyoss/Dragonfly/pkg/constants"
"github.com/dragonflyoss/Dragonfly/pkg/errortypes"
Expand All @@ -54,10 +55,12 @@ func init() {
// Start function creates a new task and starts it to download file.
func Start(cfg *config.Config) *errortypes.DfError {
var (
supernodeAPI = api.NewSupernodeAPI()
register = regist.NewSupernodeRegister(cfg, supernodeAPI)
err error
result *regist.RegisterResult
supernodeAPI = api.NewSupernodeAPI()
// TODO make it pluggable
supernodeLocator, _ = locator.NewStaticLocatorFromStr(append(cfg.Nodes, cfg.Nodes...))
register = regist.NewSupernodeRegister(cfg, supernodeAPI, supernodeLocator)
err error
result *regist.RegisterResult
)

printer.Println(fmt.Sprintf("--%s-- %s",
Expand Down
2 changes: 1 addition & 1 deletion dfget/core/core_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ func (s *CoreTestSuite) TestRegisterToSupernode(c *check.C) {
cfg := s.createConfig(&bytes.Buffer{})
m := new(MockSupernodeAPI)
m.RegisterFunc = CreateRegisterFunc()
register := regist.NewSupernodeRegister(cfg, m)
register := regist.NewSupernodeRegister(cfg, m, &MockSupernodeLocator{})

var f = func(bc int, errIsNil bool, data *regist.RegisterResult) {
res, e := registerToSuperNode(cfg, register)
Expand Down
37 changes: 37 additions & 0 deletions dfget/core/helper/test_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
api_types "github.com/dragonflyoss/Dragonfly/apis/types"
"github.com/dragonflyoss/Dragonfly/dfget/config"
"github.com/dragonflyoss/Dragonfly/dfget/core/api"
"github.com/dragonflyoss/Dragonfly/dfget/locator"
"github.com/dragonflyoss/Dragonfly/dfget/types"
"github.com/dragonflyoss/Dragonfly/pkg/constants"
"github.com/dragonflyoss/Dragonfly/pkg/fileutils"
Expand Down Expand Up @@ -394,3 +395,39 @@ type mockFile struct {
size int64
repeatStr []byte
}

// ----------------------------------------------------------------------------
// MockSupernodeLocator

var _ locator.SupernodeLocator = &MockSupernodeLocator{}

type MockSupernodeLocator struct {
}

func (m *MockSupernodeLocator) Get() *locator.Supernode {
panic("implement me")
}

func (m *MockSupernodeLocator) Next() *locator.Supernode {
return &locator.Supernode{
IP: "127.0.0.1",
Port: config.DefaultSupernodePort,
}
}

func (m *MockSupernodeLocator) GetGroup(name string) *locator.SupernodeGroup {
panic("implement me")
}

func (m *MockSupernodeLocator) All() []*locator.SupernodeGroup {
panic("implement me")
}

func (m *MockSupernodeLocator) Report(node string, metrics *locator.SupernodeMetrics) {
panic("implement me")
}

func (m *MockSupernodeLocator) Refresh() bool {
panic("implement me")
}

106 changes: 48 additions & 58 deletions dfget/core/regist/register.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,18 @@
package regist

import (
"fmt"
"io/ioutil"
"os"
"time"

apiTypes "github.com/dragonflyoss/Dragonfly/apis/types"
"github.com/dragonflyoss/Dragonfly/dfget/config"
"github.com/dragonflyoss/Dragonfly/dfget/core/api"
"github.com/dragonflyoss/Dragonfly/dfget/locator"
"github.com/dragonflyoss/Dragonfly/dfget/types"
"github.com/dragonflyoss/Dragonfly/pkg/constants"
"github.com/dragonflyoss/Dragonfly/pkg/errortypes"
"github.com/dragonflyoss/Dragonfly/pkg/netutils"
"github.com/dragonflyoss/Dragonfly/pkg/stringutils"
"github.com/dragonflyoss/Dragonfly/pkg/util"
"github.com/dragonflyoss/Dragonfly/version"
Expand All @@ -42,17 +43,19 @@ type SupernodeRegister interface {

type supernodeRegister struct {
api api.SupernodeAPI
locator locator.SupernodeLocator
cfg *config.Config
lastRegisteredNode string
lastRegisteredNode *locator.Supernode
}

var _ SupernodeRegister = &supernodeRegister{}

// NewSupernodeRegister creates an instance of supernodeRegister.
func NewSupernodeRegister(cfg *config.Config, api api.SupernodeAPI) SupernodeRegister {
func NewSupernodeRegister(cfg *config.Config, api api.SupernodeAPI, locator locator.SupernodeLocator) SupernodeRegister {
return &supernodeRegister{
api: api,
cfg: cfg,
api: api,
locator: locator,
cfg: cfg,
}
}

Expand All @@ -61,45 +64,48 @@ func (s *supernodeRegister) Register(peerPort int) (*RegisterResult, *errortypes
var (
resp *types.RegisterResponse
e error
i int
node *locator.Supernode
retryTimes = 0
start = time.Now()
)

nextOrRetry := func() *locator.Supernode {
if resp != nil && resp.Code == constants.CodeWaitAuth && retryTimes < 3 {
retryTimes++
logrus.Infof("sleep 1.0 s to wait auth(%d/3)...", retryTimes)
time.Sleep(1000 * time.Millisecond)
return s.locator.Get()
}
return s.locator.Next()
}

logrus.Infof("do register to one of %v", s.cfg.Nodes)
nodes, nLen := s.cfg.Nodes, len(s.cfg.Nodes)
req := s.constructRegisterRequest(peerPort)
for i = 0; i < nLen; i++ {
if s.lastRegisteredNode == nodes[i] {
logrus.Warnf("the last registered node is the same(%s)", nodes[i])
for node = s.locator.Next(); node != nil; node = nextOrRetry() {
if s.lastRegisteredNode == node {
logrus.Warnf("the last registered node is the same(%s)", s.lastRegisteredNode)
continue
}
req.SupernodeIP = netutils.ExtractHost(nodes[i])
resp, e = s.api.Register(nodes[i], req)
logrus.Infof("do register to %s, res:%s error:%v", nodes[i], resp, e)
req.SupernodeIP = node.IP
nodeHost := s.nodeHostStr(node)
resp, e = s.api.Register(nodeHost, req)
logrus.Infof("do register to %s, res:%s error:%v", nodeHost, resp, e)
if e != nil {
logrus.Errorf("register to node:%s error:%v", nodes[i], e)
continue
}
if resp.Code == constants.Success || resp.Code == constants.CodeNeedAuth ||
resp.Code == constants.CodeURLNotReachable {
break
}
if resp.Code == constants.CodeWaitAuth && retryTimes < 3 {
i--
retryTimes++
logrus.Infof("sleep 2.5s to wait auth(%d/3)...", retryTimes)
time.Sleep(2500 * time.Millisecond)
}
}
s.setLastRegisteredNode(i)
s.setRemainderNodes(i)

s.setLastRegisteredNode(node)
if err := s.checkResponse(resp, e); err != nil {
logrus.Errorf("register fail:%v", err)
return nil, err
}

result := NewRegisterResult(nodes[i], s.cfg.Nodes, s.cfg.URL,
result := NewRegisterResult(s.nodeHostStr(node), s.cfg.URL,
resp.Data.TaskID, resp.Data.FileLength, resp.Data.PieceSize, resp.Data.CDNSource)

logrus.Infof("do register result:%s and cost:%.3fs", resp,
Expand All @@ -120,29 +126,15 @@ func (s *supernodeRegister) checkResponse(resp *types.RegisterResponse, e error)
return nil
}

func (s *supernodeRegister) setLastRegisteredNode(idx int) {
nLen := len(s.cfg.Nodes)
if nLen <= 0 {
return
}
if idx >= nLen {
s.lastRegisteredNode = ""
return
}
s.lastRegisteredNode = s.cfg.Nodes[idx]
func (s *supernodeRegister) setLastRegisteredNode(node *locator.Supernode) {
s.lastRegisteredNode = node
}

func (s *supernodeRegister) setRemainderNodes(idx int) {
nLen := len(s.cfg.Nodes)
if nLen <= 0 {
return
func (s *supernodeRegister) nodeHostStr(node *locator.Supernode) string {
if node == nil {
return ""
}
if idx >= nLen {
s.cfg.Nodes = []string{}
return
}

s.cfg.Nodes = s.cfg.Nodes[idx+1:]
return fmt.Sprintf("%s:%d", node.IP, node.Port)
}

func (s *supernodeRegister) constructRegisterRequest(port int) *types.RegisterRequest {
Expand Down Expand Up @@ -188,28 +180,26 @@ func getTaskPath(taskFileName string) string {
}

// NewRegisterResult creates an instance of RegisterResult.
func NewRegisterResult(node string, remainder []string, url string,
func NewRegisterResult(node string, url string,
taskID string, fileLen int64, pieceSize int32, cdnSource apiTypes.CdnSource) *RegisterResult {
return &RegisterResult{
Node: node,
RemainderNodes: remainder,
URL: url,
TaskID: taskID,
FileLength: fileLen,
PieceSize: pieceSize,
CDNSource: cdnSource,
Node: node,
URL: url,
TaskID: taskID,
FileLength: fileLen,
PieceSize: pieceSize,
CDNSource: cdnSource,
}
}

// RegisterResult is the register result set.
type RegisterResult struct {
Node string
RemainderNodes []string
URL string
TaskID string
FileLength int64
PieceSize int32
CDNSource apiTypes.CdnSource
Node string
URL string
TaskID string
FileLength int64
PieceSize int32
CDNSource apiTypes.CdnSource
}

func (r *RegisterResult) String() string {
Expand Down
7 changes: 3 additions & 4 deletions dfget/core/regist/register_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,10 +58,9 @@ func (s *RegistTestSuite) TearDownSuite(c *check.C) {
}

func (s *RegistTestSuite) TestNewRegisterResult(c *check.C) {
result := NewRegisterResult("node", []string{"1"}, "url", "taskID",
result := NewRegisterResult("node", "url", "taskID",
10, 1, "supernode")
c.Assert(result.Node, check.Equals, "node")
c.Assert(result.RemainderNodes, check.DeepEquals, []string{"1"})
c.Assert(result.URL, check.Equals, "url")
c.Assert(result.TaskID, check.Equals, "taskID")
c.Assert(result.FileLength, check.Equals, int64(10))
Expand All @@ -79,7 +78,7 @@ func (s *RegistTestSuite) TestSupernodeRegister_Register(c *check.C) {
m.RegisterFunc = CreateRegisterFunc()

var f = func(ec int, msg string, data *RegisterResult) {
register := NewSupernodeRegister(cfg, m)
register := NewSupernodeRegister(cfg, m, &MockSupernodeLocator{})
resp, e := register.Register(0)
if msg == "" {
c.Assert(e, check.IsNil)
Expand Down Expand Up @@ -109,7 +108,7 @@ func (s *RegistTestSuite) TestSupernodeRegister_Register(c *check.C) {
cfg.Nodes = []string{"x"}
cfg.URL = "http://lowzj.com"
f(constants.Success, "", &RegisterResult{
Node: "x", RemainderNodes: []string{}, URL: cfg.URL, TaskID: "a",
Node: "x", URL: cfg.URL, TaskID: "a",
FileLength: 100, PieceSize: 10})

f(constants.HTTPError, "empty response, unknown error", nil)
Expand Down

0 comments on commit 1d5d7b6

Please sign in to comment.