Skip to content
This repository has been archived by the owner on Feb 27, 2023. It is now read-only.

Commit

Permalink
refactor: refactor dfget/core with SupernodeLocator
Browse files Browse the repository at this point in the history
Signed-off-by: lowzj <[email protected]>
  • Loading branch information
lowzj committed May 7, 2020
1 parent 0d3e9b2 commit f7ae939
Show file tree
Hide file tree
Showing 5 changed files with 127 additions and 78 deletions.
15 changes: 9 additions & 6 deletions dfget/core/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
p2pDown "github.com/dragonflyoss/Dragonfly/dfget/core/downloader/p2p_downloader"
"github.com/dragonflyoss/Dragonfly/dfget/core/regist"
"github.com/dragonflyoss/Dragonfly/dfget/core/uploader"
"github.com/dragonflyoss/Dragonfly/dfget/locator"
"github.com/dragonflyoss/Dragonfly/pkg/algorithm"
"github.com/dragonflyoss/Dragonfly/pkg/constants"
"github.com/dragonflyoss/Dragonfly/pkg/errortypes"
Expand All @@ -51,9 +52,11 @@ import (
func Start(cfg *config.Config) *errortypes.DfError {
var (
supernodeAPI = api.NewSupernodeAPI()
register = regist.NewSupernodeRegister(cfg, supernodeAPI)
err error
result *regist.RegisterResult
// TODO make it pluggable
supernodeLocator = locator.GetLocator(cfg)
register = regist.NewSupernodeRegister(cfg, supernodeAPI, supernodeLocator)
err error
result *regist.RegisterResult
)

printer.Println(fmt.Sprintf("--%s-- %s",
Expand All @@ -63,7 +66,7 @@ func Start(cfg *config.Config) *errortypes.DfError {
return errortypes.New(config.CodePrepareError, err.Error())
}

if result, err = registerToSuperNode(cfg, register); err != nil {
if result, err = registerToSuperNode(cfg, register, supernodeLocator); err != nil {
return errortypes.New(config.CodeRegisterError, err.Error())
}

Expand Down Expand Up @@ -124,7 +127,7 @@ func launchPeerServer(cfg *config.Config) error {
return err
}

func registerToSuperNode(cfg *config.Config, register regist.SupernodeRegister) (
func registerToSuperNode(cfg *config.Config, register regist.SupernodeRegister, supernodeLocator locator.SupernodeLocator) (
*regist.RegisterResult, error) {
defer func() {
if r := recover(); r != nil {
Expand All @@ -137,7 +140,7 @@ func registerToSuperNode(cfg *config.Config, register regist.SupernodeRegister)
panic("user specified")
}

if len(cfg.Nodes) == 0 {
if supernodeLocator == nil || supernodeLocator.Size() == 0 {
cfg.BackSourceReason = config.BackSourceReasonNodeEmpty
panic("supernode empty")
}
Expand Down
14 changes: 10 additions & 4 deletions dfget/core/core_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
. "github.com/dragonflyoss/Dragonfly/dfget/core/helper"
"github.com/dragonflyoss/Dragonfly/dfget/core/regist"
"github.com/dragonflyoss/Dragonfly/dfget/core/uploader"
"github.com/dragonflyoss/Dragonfly/dfget/locator"
"github.com/dragonflyoss/Dragonfly/pkg/algorithm"

"github.com/go-check/check"
Expand Down Expand Up @@ -76,10 +77,12 @@ func (s *CoreTestSuite) TestRegisterToSupernode(c *check.C) {
cfg := s.createConfig(&bytes.Buffer{})
m := new(MockSupernodeAPI)
m.RegisterFunc = CreateRegisterFunc()
register := regist.NewSupernodeRegister(cfg, m)
nodeStr := "127.0.0.1:8002"
snLocator, _ := locator.NewStaticLocatorFromStr([]string{nodeStr})
register := regist.NewSupernodeRegister(cfg, m, snLocator)

var f = func(bc int, errIsNil bool, data *regist.RegisterResult) {
res, e := registerToSuperNode(cfg, register)
res, e := registerToSuperNode(cfg, register, snLocator)
c.Assert(res == nil, check.Equals, data == nil)
c.Assert(e == nil, check.Equals, errIsNil)
c.Assert(cfg.BackSourceReason, check.Equals, bc)
Expand All @@ -88,18 +91,21 @@ func (s *CoreTestSuite) TestRegisterToSupernode(c *check.C) {
}
}

tmpGroup := snLocator.Group
snLocator.Group = nil
f(config.BackSourceReasonNodeEmpty, true, nil)

cfg.Pattern = config.PatternSource
f(config.BackSourceReasonUserSpecified, true, nil)

uploader.SetupPeerServerExecutor(nil)
cfg.Pattern = config.PatternP2P
cfg.Nodes = []string{"x"}
snLocator.Group = tmpGroup
snLocator.Refresh()
cfg.URL = "http://x.com"
f(config.BackSourceReasonRegisterFail, true, nil)

cfg.Nodes = []string{"x"}
snLocator.Refresh()
cfg.URL = "http://taobao.com"
cfg.BackSourceReason = config.BackSourceReasonNone
}
Expand Down
106 changes: 48 additions & 58 deletions dfget/core/regist/register.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,18 @@
package regist

import (
"fmt"
"io/ioutil"
"os"
"time"

apiTypes "github.com/dragonflyoss/Dragonfly/apis/types"
"github.com/dragonflyoss/Dragonfly/dfget/config"
"github.com/dragonflyoss/Dragonfly/dfget/core/api"
"github.com/dragonflyoss/Dragonfly/dfget/locator"
"github.com/dragonflyoss/Dragonfly/dfget/types"
"github.com/dragonflyoss/Dragonfly/pkg/constants"
"github.com/dragonflyoss/Dragonfly/pkg/errortypes"
"github.com/dragonflyoss/Dragonfly/pkg/netutils"
"github.com/dragonflyoss/Dragonfly/pkg/stringutils"
"github.com/dragonflyoss/Dragonfly/pkg/util"
"github.com/dragonflyoss/Dragonfly/version"
Expand All @@ -42,17 +43,19 @@ type SupernodeRegister interface {

type supernodeRegister struct {
api api.SupernodeAPI
locator locator.SupernodeLocator
cfg *config.Config
lastRegisteredNode string
lastRegisteredNode *locator.Supernode
}

var _ SupernodeRegister = &supernodeRegister{}

// NewSupernodeRegister creates an instance of supernodeRegister.
func NewSupernodeRegister(cfg *config.Config, api api.SupernodeAPI) SupernodeRegister {
func NewSupernodeRegister(cfg *config.Config, api api.SupernodeAPI, locator locator.SupernodeLocator) SupernodeRegister {
return &supernodeRegister{
api: api,
cfg: cfg,
api: api,
locator: locator,
cfg: cfg,
}
}

Expand All @@ -61,45 +64,48 @@ func (s *supernodeRegister) Register(peerPort int) (*RegisterResult, *errortypes
var (
resp *types.RegisterResponse
e error
i int
node *locator.Supernode
retryTimes = 0
start = time.Now()
)

nextOrRetry := func() *locator.Supernode {
if resp != nil && resp.Code == constants.CodeWaitAuth && retryTimes < 3 {
retryTimes++
logrus.Infof("sleep 1.0 s to wait auth(%d/3)...", retryTimes)
time.Sleep(1000 * time.Millisecond)
return s.locator.Get()
}
return s.locator.Next()
}

logrus.Infof("do register to one of %v", s.cfg.Nodes)
nodes, nLen := s.cfg.Nodes, len(s.cfg.Nodes)
req := s.constructRegisterRequest(peerPort)
for i = 0; i < nLen; i++ {
if s.lastRegisteredNode == nodes[i] {
logrus.Warnf("the last registered node is the same(%s)", nodes[i])
for node = s.locator.Next(); node != nil; node = nextOrRetry() {
if s.lastRegisteredNode == node {
logrus.Warnf("the last registered node is the same(%v)", s.lastRegisteredNode)
continue
}
req.SupernodeIP = netutils.ExtractHost(nodes[i])
resp, e = s.api.Register(nodes[i], req)
logrus.Infof("do register to %s, res:%s error:%v", nodes[i], resp, e)
req.SupernodeIP = node.IP
nodeHost := s.nodeHostStr(node)
resp, e = s.api.Register(nodeHost, req)
logrus.Infof("do register to %s, res:%s error:%v", nodeHost, resp, e)
if e != nil {
logrus.Errorf("register to node:%s error:%v", nodes[i], e)
continue
}
if resp.Code == constants.Success || resp.Code == constants.CodeNeedAuth ||
resp.Code == constants.CodeURLNotReachable {
break
}
if resp.Code == constants.CodeWaitAuth && retryTimes < 3 {
i--
retryTimes++
logrus.Infof("sleep 2.5s to wait auth(%d/3)...", retryTimes)
time.Sleep(2500 * time.Millisecond)
}
}
s.setLastRegisteredNode(i)
s.setRemainderNodes(i)

s.setLastRegisteredNode(node)
if err := s.checkResponse(resp, e); err != nil {
logrus.Errorf("register fail:%v", err)
return nil, err
}

result := NewRegisterResult(nodes[i], s.cfg.Nodes, s.cfg.URL,
result := NewRegisterResult(s.nodeHostStr(node), s.cfg.URL,
resp.Data.TaskID, resp.Data.FileLength, resp.Data.PieceSize, resp.Data.CDNSource)

logrus.Infof("do register result:%s and cost:%.3fs", resp,
Expand All @@ -120,29 +126,15 @@ func (s *supernodeRegister) checkResponse(resp *types.RegisterResponse, e error)
return nil
}

func (s *supernodeRegister) setLastRegisteredNode(idx int) {
nLen := len(s.cfg.Nodes)
if nLen <= 0 {
return
}
if idx >= nLen {
s.lastRegisteredNode = ""
return
}
s.lastRegisteredNode = s.cfg.Nodes[idx]
func (s *supernodeRegister) setLastRegisteredNode(node *locator.Supernode) {
s.lastRegisteredNode = node
}

func (s *supernodeRegister) setRemainderNodes(idx int) {
nLen := len(s.cfg.Nodes)
if nLen <= 0 {
return
func (s *supernodeRegister) nodeHostStr(node *locator.Supernode) string {
if node == nil {
return ""
}
if idx >= nLen {
s.cfg.Nodes = []string{}
return
}

s.cfg.Nodes = s.cfg.Nodes[idx+1:]
return fmt.Sprintf("%s:%d", node.IP, node.Port)
}

func (s *supernodeRegister) constructRegisterRequest(port int) *types.RegisterRequest {
Expand Down Expand Up @@ -188,28 +180,26 @@ func getTaskPath(taskFileName string) string {
}

// NewRegisterResult creates an instance of RegisterResult.
func NewRegisterResult(node string, remainder []string, url string,
func NewRegisterResult(node string, url string,
taskID string, fileLen int64, pieceSize int32, cdnSource apiTypes.CdnSource) *RegisterResult {
return &RegisterResult{
Node: node,
RemainderNodes: remainder,
URL: url,
TaskID: taskID,
FileLength: fileLen,
PieceSize: pieceSize,
CDNSource: cdnSource,
Node: node,
URL: url,
TaskID: taskID,
FileLength: fileLen,
PieceSize: pieceSize,
CDNSource: cdnSource,
}
}

// RegisterResult is the register result set.
type RegisterResult struct {
Node string
RemainderNodes []string
URL string
TaskID string
FileLength int64
PieceSize int32
CDNSource apiTypes.CdnSource
Node string
URL string
TaskID string
FileLength int64
PieceSize int32
CDNSource apiTypes.CdnSource
}

func (r *RegisterResult) String() string {
Expand Down
22 changes: 12 additions & 10 deletions dfget/core/regist/register_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/dragonflyoss/Dragonfly/apis/types"
"github.com/dragonflyoss/Dragonfly/dfget/config"
. "github.com/dragonflyoss/Dragonfly/dfget/core/helper"
"github.com/dragonflyoss/Dragonfly/dfget/locator"
"github.com/dragonflyoss/Dragonfly/pkg/constants"

"github.com/go-check/check"
Expand Down Expand Up @@ -58,10 +59,9 @@ func (s *RegistTestSuite) TearDownSuite(c *check.C) {
}

func (s *RegistTestSuite) TestNewRegisterResult(c *check.C) {
result := NewRegisterResult("node", []string{"1"}, "url", "taskID",
result := NewRegisterResult("node", "url", "taskID",
10, 1, "supernode")
c.Assert(result.Node, check.Equals, "node")
c.Assert(result.RemainderNodes, check.DeepEquals, []string{"1"})
c.Assert(result.URL, check.Equals, "url")
c.Assert(result.TaskID, check.Equals, "taskID")
c.Assert(result.FileLength, check.Equals, int64(10))
Expand All @@ -78,8 +78,10 @@ func (s *RegistTestSuite) TestSupernodeRegister_Register(c *check.C) {
m := new(MockSupernodeAPI)
m.RegisterFunc = CreateRegisterFunc()

nodeStr := "127.0.0.1:8002"
snLocator, _ := locator.NewStaticLocatorFromStr([]string{nodeStr})
var f = func(ec int, msg string, data *RegisterResult) {
register := NewSupernodeRegister(cfg, m)
register := NewSupernodeRegister(cfg, m, snLocator)
resp, e := register.Register(0)
if msg == "" {
c.Assert(e, check.IsNil)
Expand All @@ -92,24 +94,24 @@ func (s *RegistTestSuite) TestSupernodeRegister_Register(c *check.C) {
}
}

cfg.Nodes = []string{""}
snLocator.Next()
f(constants.HTTPError, "empty response, unknown error", nil)

cfg.Nodes = []string{"x"}
snLocator.Refresh()
f(501, "invalid source url", nil)

cfg.Nodes = []string{"x"}
snLocator.Refresh()
cfg.URL = "http://taobao.com"
f(constants.CodeNeedAuth, "need auth", nil)

cfg.Nodes = []string{"x"}
snLocator.Refresh()
cfg.URL = "http://github.com"
f(constants.CodeWaitAuth, "wait auth", nil)

cfg.Nodes = []string{"x"}
snLocator.Refresh()
cfg.URL = "http://lowzj.com"
f(constants.Success, "", &RegisterResult{
Node: "x", RemainderNodes: []string{}, URL: cfg.URL, TaskID: "a",
Node: nodeStr, URL: cfg.URL, TaskID: "a",
FileLength: 100, PieceSize: 10})

f(constants.HTTPError, "empty response, unknown error", nil)
Expand All @@ -118,7 +120,7 @@ func (s *RegistTestSuite) TestSupernodeRegister_Register(c *check.C) {
func (s *RegistTestSuite) TestSupernodeRegister_constructRegisterRequest(c *check.C) {
buf := &bytes.Buffer{}
cfg := s.createConfig(buf)
register := &supernodeRegister{nil, cfg, ""}
register := &supernodeRegister{nil, nil, cfg, nil}

cfg.Identifier = "id"
req := register.constructRegisterRequest(0)
Expand Down
Loading

0 comments on commit f7ae939

Please sign in to comment.