Skip to content
This repository has been archived by the owner on Feb 27, 2023. It is now read-only.

refactor: refactor dfget/core with SupernodeLocator #1325

Merged
merged 1 commit into from
May 13, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cmd/dfget/app/root_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ type dfgetSuit struct {

func (suit *dfgetSuit) Test_initFlagsNoArguments() {
initProperties()
suit.Equal(cfg.Nodes, []string{"127.0.0.1:8002"})
suit.Equal(cfg.Nodes, []string(nil))
suit.Equal(cfg.LocalLimit, 20*rate.MB)
suit.Equal(cfg.TotalLimit, rate.Rate(0))
suit.Equal(cfg.Notbs, false)
Expand Down
3 changes: 2 additions & 1 deletion dfget/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,8 +91,9 @@ type Properties struct {

// NewProperties creates a new properties with default values.
func NewProperties() *Properties {
// don't set Supernodes as default value, the SupernodeLocator will
// do this in a better way.
return &Properties{
Supernodes: GetDefaultSupernodesValue(),
LocalLimit: DefaultLocalLimit,
MinRate: DefaultMinRate,
ClientQueueSize: DefaultClientQueueSize,
Expand Down
60 changes: 36 additions & 24 deletions dfget/core/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
p2pDown "github.com/dragonflyoss/Dragonfly/dfget/core/downloader/p2p_downloader"
"github.com/dragonflyoss/Dragonfly/dfget/core/regist"
"github.com/dragonflyoss/Dragonfly/dfget/core/uploader"
"github.com/dragonflyoss/Dragonfly/dfget/locator"
"github.com/dragonflyoss/Dragonfly/pkg/algorithm"
"github.com/dragonflyoss/Dragonfly/pkg/constants"
"github.com/dragonflyoss/Dragonfly/pkg/errortypes"
Expand All @@ -50,32 +51,33 @@ import (
// Start function creates a new task and starts it to download file.
func Start(cfg *config.Config) *errortypes.DfError {
var (
supernodeAPI = api.NewSupernodeAPI()
register = regist.NewSupernodeRegister(cfg, supernodeAPI)
err error
result *regist.RegisterResult
supernodeAPI = api.NewSupernodeAPI()
supernodeLocator = locator.CreateLocator(cfg)
register = regist.NewSupernodeRegister(cfg, supernodeAPI, supernodeLocator)
err error
result *regist.RegisterResult
)

printer.Println(fmt.Sprintf("--%s-- %s",
cfg.StartTime.Format(config.DefaultTimestampFormat), cfg.URL))

if err = prepare(cfg); err != nil {
if err = prepare(cfg, supernodeLocator); err != nil {
return errortypes.New(config.CodePrepareError, err.Error())
}

if result, err = registerToSuperNode(cfg, register); err != nil {
if result, err = registerToSuperNode(cfg, register, supernodeLocator); err != nil {
return errortypes.New(config.CodeRegisterError, err.Error())
}

if err = downloadFile(cfg, supernodeAPI, register, result); err != nil {
if err = downloadFile(cfg, supernodeAPI, supernodeLocator, register, result); err != nil {
return errortypes.New(config.CodeDownloadError, err.Error())
}

return nil
}

// prepare the RV-related information and create the corresponding files.
func prepare(cfg *config.Config) (err error) {
func prepare(cfg *config.Config, locator locator.SupernodeLocator) (err error) {
printer.Printf("dfget version:%s", version.DFGetVersion)
printer.Printf("workspace:%s", cfg.WorkHome)
printer.Printf("sign:%s", cfg.Sign)
Expand Down Expand Up @@ -104,9 +106,8 @@ func prepare(cfg *config.Config) (err error) {
}
rv.DataDir = cfg.RV.SystemDataDir

cfg.Nodes = adjustSupernodeList(cfg.Nodes)
if stringutils.IsEmptyStr(rv.LocalIP) {
rv.LocalIP = checkConnectSupernode(cfg.Nodes)
rv.LocalIP = checkConnectSupernode(locator)
}
rv.Cid = getCid(rv.LocalIP, cfg.Sign)
rv.TaskFileName = getTaskFileName(rv.RealTarget, cfg.Sign)
Expand All @@ -124,7 +125,7 @@ func launchPeerServer(cfg *config.Config) error {
return err
}

func registerToSuperNode(cfg *config.Config, register regist.SupernodeRegister) (
func registerToSuperNode(cfg *config.Config, register regist.SupernodeRegister, supernodeLocator locator.SupernodeLocator) (
*regist.RegisterResult, error) {
defer func() {
if r := recover(); r != nil {
Expand All @@ -137,7 +138,7 @@ func registerToSuperNode(cfg *config.Config, register regist.SupernodeRegister)
panic("user specified")
}

if len(cfg.Nodes) == 0 {
if supernodeLocator == nil || supernodeLocator.Size() == 0 {
cfg.BackSourceReason = config.BackSourceReasonNodeEmpty
panic("supernode empty")
}
Expand All @@ -162,7 +163,7 @@ func registerToSuperNode(cfg *config.Config, register regist.SupernodeRegister)
return result, nil
}

func downloadFile(cfg *config.Config, supernodeAPI api.SupernodeAPI,
func downloadFile(cfg *config.Config, supernodeAPI api.SupernodeAPI, locator locator.SupernodeLocator,
register regist.SupernodeRegister, result *regist.RegisterResult) error {
timeout := calculateTimeout(cfg)

Expand All @@ -180,7 +181,7 @@ func downloadFile(cfg *config.Config, supernodeAPI api.SupernodeAPI,
downloadTime := time.Since(cfg.StartTime).Seconds()
// upload metrics to supernode only if pattern is p2p or cdn and result is not nil
if cfg.Pattern != config.PatternSource && result != nil {
reportMetrics(cfg, supernodeAPI, downloadTime, result.TaskID, success)
reportMetrics(cfg, supernodeAPI, locator, downloadTime, result.TaskID, success)
}

if success {
Expand Down Expand Up @@ -285,21 +286,26 @@ func adjustSupernodeList(nodes []string) []string {
}
}

func checkConnectSupernode(nodes []string) (localIP string) {
func checkConnectSupernode(locator locator.SupernodeLocator) (localIP string) {
var (
e error
)
for _, n := range nodes {
ip, port := netutils.GetIPAndPortFromNode(n, config.DefaultSupernodePort)
if localIP, e = httputils.CheckConnect(ip, port, 1000); e == nil {
return localIP
if locator == nil {
return ""
}
for _, group := range locator.All() {
for _, n := range group.Nodes {
if localIP, e = httputils.CheckConnect(n.IP, n.Port, 1000); e == nil {
return localIP
}
logrus.Errorf("Connect to node:%s error: %v", n, e)
}
logrus.Errorf("Connect to node:%s error: %v", n, e)
}
return ""
}

func reportMetrics(cfg *config.Config, supernodeAPI api.SupernodeAPI, downloadTime float64, taskID string, success bool) {
func reportMetrics(cfg *config.Config, supernodeAPI api.SupernodeAPI, locator locator.SupernodeLocator,
downloadTime float64, taskID string, success bool) {
req := &types.TaskMetricsRequest{
BacksourceReason: strconv.Itoa(cfg.BackSourceReason),
IP: cfg.RV.LocalIP,
Expand All @@ -311,10 +317,16 @@ func reportMetrics(cfg *config.Config, supernodeAPI api.SupernodeAPI, downloadTi
Success: success,
TaskID: taskID,
}
for _, node := range cfg.Nodes {
resp, err := supernodeAPI.ReportMetrics(node, req)
node := locator.Get()
if node == nil {
return
}
nodeStr := node.String()
// retry twice
for i := 0; i < 2; i++ {
resp, err := supernodeAPI.ReportMetrics(nodeStr, req)
if err != nil {
logrus.Errorf("failed to report metrics to supernode %s: %v", node, err)
logrus.Errorf("failed to report metrics to supernode %s: %v", nodeStr, err)
}
if resp != nil && resp.IsSuccess() {
return
Expand Down
22 changes: 15 additions & 7 deletions dfget/core/core_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
. "github.com/dragonflyoss/Dragonfly/dfget/core/helper"
"github.com/dragonflyoss/Dragonfly/dfget/core/regist"
"github.com/dragonflyoss/Dragonfly/dfget/core/uploader"
"github.com/dragonflyoss/Dragonfly/dfget/locator"
"github.com/dragonflyoss/Dragonfly/pkg/algorithm"

"github.com/go-check/check"
Expand Down Expand Up @@ -68,18 +69,20 @@ func (s *CoreTestSuite) TestPrepare(c *check.C) {
cfg := s.createConfig(buf)
cfg.Output = filepath.Join(s.workHome, "test.output")

err := prepare(cfg)
err := prepare(cfg, nil)
fmt.Printf("%s\nerror:%v", buf.String(), err)
}

func (s *CoreTestSuite) TestRegisterToSupernode(c *check.C) {
cfg := s.createConfig(&bytes.Buffer{})
m := new(MockSupernodeAPI)
m.RegisterFunc = CreateRegisterFunc()
register := regist.NewSupernodeRegister(cfg, m)
nodeStr := "127.0.0.1:8002"
snLocator, _ := locator.NewStaticLocatorFromStr("test", []string{nodeStr})
register := regist.NewSupernodeRegister(cfg, m, snLocator)

var f = func(bc int, errIsNil bool, data *regist.RegisterResult) {
res, e := registerToSuperNode(cfg, register)
res, e := registerToSuperNode(cfg, register, snLocator)
c.Assert(res == nil, check.Equals, data == nil)
c.Assert(e == nil, check.Equals, errIsNil)
c.Assert(cfg.BackSourceReason, check.Equals, bc)
Expand All @@ -88,18 +91,21 @@ func (s *CoreTestSuite) TestRegisterToSupernode(c *check.C) {
}
}

tmpGroup := snLocator.Group
snLocator.Group = nil
f(config.BackSourceReasonNodeEmpty, true, nil)

cfg.Pattern = config.PatternSource
f(config.BackSourceReasonUserSpecified, true, nil)

uploader.SetupPeerServerExecutor(nil)
cfg.Pattern = config.PatternP2P
cfg.Nodes = []string{"x"}
snLocator.Group = tmpGroup
snLocator.Refresh()
cfg.URL = "http://x.com"
f(config.BackSourceReasonRegisterFail, true, nil)

cfg.Nodes = []string{"x"}
snLocator.Refresh()
cfg.URL = "http://taobao.com"
cfg.BackSourceReason = config.BackSourceReasonNone
}
Expand Down Expand Up @@ -131,11 +137,13 @@ func (s *CoreTestSuite) TestCheckConnectSupernode(c *check.C) {
s.createConfig(buf)

nodes := []string{host}
ip := checkConnectSupernode(nodes)
l, _ := locator.NewStaticLocatorFromStr("test", nodes)
ip := checkConnectSupernode(l)
c.Assert(ip, check.Equals, "127.0.0.1")

buf.Reset()
ip = checkConnectSupernode([]string{"127.0.0.2"})
l, _ = locator.NewStaticLocatorFromStr("test", []string{"127.0.0.2"})
ip = checkConnectSupernode(l)
c.Assert(strings.Index(buf.String(), "Connect") > 0, check.Equals, true)
c.Assert(ip, check.Equals, "")
}
Expand Down
Loading