From 131435ccbbe2bcf78fd332b274c941e5d763aa0f Mon Sep 17 00:00:00 2001 From: yeya24 Date: Tue, 3 Sep 2019 15:51:29 -0400 Subject: [PATCH] add type rate Signed-off-by: yeya24 --- cmd/dfdaemon/app/init.go | 2 +- cmd/dfdaemon/app/root.go | 6 +- cmd/dfget/app/root.go | 70 +----- cmd/dfget/app/root_test.go | 53 +--- cmd/supernode/app/root.go | 8 +- dfdaemon/config/config.go | 20 +- dfdaemon/config/config_test.go | 18 +- dfdaemon/proxy/proxy.go | 2 +- dfget/config/config.go | 53 ++-- dfget/config/config_test.go | 11 +- dfget/config/constants.go | 6 +- dfget/core/core_test.go | 7 - .../back_downloader/back_downloader.go | 2 +- .../p2p_downloader/p2p_downloader.go | 8 +- dfget/core/uploader/peer_server.go | 2 +- dfget/core/uploader/peer_server_executor.go | 2 +- go.sum | 3 + pkg/limitreader/limit_reader.go | 6 +- pkg/netutils/netutils.go | 28 ++- pkg/netutils/netutils_test.go | 8 +- pkg/rate/rate.go | 142 +++++++++++ pkg/rate/rate_test.go | 234 ++++++++++++++++++ pkg/ratelimiter/ratelimiter.go | 4 +- supernode/config/config.go | 32 +-- supernode/daemon/mgr/cdn/manager.go | 2 +- 25 files changed, 506 insertions(+), 223 deletions(-) create mode 100644 pkg/rate/rate.go create mode 100644 pkg/rate/rate_test.go diff --git a/cmd/dfdaemon/app/init.go b/cmd/dfdaemon/app/init.go index 5e4543e09..131258921 100644 --- a/cmd/dfdaemon/app/init.go +++ b/cmd/dfdaemon/app/init.go @@ -65,7 +65,7 @@ func initDfdaemon(cfg config.Properties) error { if err != nil { return errors.Wrap(err, "get dfget version") } - logrus.Infof("use dfget %s from %s", bytes.TrimSpace(dfgetVersion), cfg.DFPath) + logrus.Infof("use %s from %s", bytes.TrimSpace(dfgetVersion), cfg.DFPath) return nil } diff --git a/cmd/dfdaemon/app/root.go b/cmd/dfdaemon/app/root.go index 184c12d30..f134df408 100644 --- a/cmd/dfdaemon/app/root.go +++ b/cmd/dfdaemon/app/root.go @@ -22,12 +22,14 @@ import ( "os/exec" "path/filepath" "reflect" + "time" "github.com/dragonflyoss/Dragonfly/dfdaemon" "github.com/dragonflyoss/Dragonfly/dfdaemon/config" "github.com/dragonflyoss/Dragonfly/dfdaemon/constant" dferr "github.com/dragonflyoss/Dragonfly/pkg/errortypes" "github.com/dragonflyoss/Dragonfly/pkg/netutils" + "github.com/dragonflyoss/Dragonfly/pkg/rate" "github.com/mitchellh/mapstructure" "github.com/pkg/errors" @@ -92,7 +94,7 @@ func init() { // dfget download config rf.String("localrepo", filepath.Join(os.Getenv("HOME"), ".small-dragonfly/dfdaemon/data/"), "temp output dir of dfdaemon") rf.String("dfpath", defaultDfgetPath, "dfget path") - rf.String("ratelimit", netutils.NetLimit(), "net speed limit,format:xxxM/K") + rf.Var(netutils.NetLimit(), "ratelimit", "net speed limit") rf.StringSlice("node", nil, "specify the addresses(host:port) of supernodes that will be passed to dfget.") exitOnError(bindRootFlags(viper.GetViper()), "bind root command flags") @@ -152,6 +154,8 @@ func getConfigFromViper(v *viper.Viper) (*config.Properties, error) { reflect.TypeOf(config.Regexp{}), reflect.TypeOf(config.URL{}), reflect.TypeOf(config.CertPool{}), + reflect.TypeOf(time.Second), + reflect.TypeOf(rate.B), ) }); err != nil { return nil, errors.Wrap(err, "unmarshal yaml") diff --git a/cmd/dfget/app/root.go b/cmd/dfget/app/root.go index ae300d747..1e1d4d2c5 100644 --- a/cmd/dfget/app/root.go +++ b/cmd/dfget/app/root.go @@ -20,7 +20,6 @@ import ( "fmt" "os" "path" - "strconv" "strings" "time" @@ -36,12 +35,7 @@ import ( "github.com/spf13/cobra" ) -var ( - localLimit string - totalLimit string - minRate string - filter string -) +var filter string var cfg = config.NewConfig() @@ -75,9 +69,7 @@ func runDfget() error { return err } - if err := transParams(); err != nil { - return err - } + cfg.Filter = transFilter(filter) // get config from property files initProperties() @@ -145,27 +137,6 @@ func initProperties() { } } -// transParams trans the user-friendly parameter formats -// to the format corresponding to the `Config` struct. -func transParams() error { - cfg.Filter = transFilter(filter) - - var err error - if cfg.LocalLimit, err = transLimit(localLimit); err != nil { - return errors.Wrapf(errortypes.ErrConvertFailed, "locallimit: %v", err) - } - - if cfg.MinRate, err = transLimit(minRate); err != nil { - return errors.Wrapf(errortypes.ErrConvertFailed, "minrate: %v", err) - } - - if cfg.TotalLimit, err = transLimit(totalLimit); err != nil { - return errors.Wrapf(errortypes.ErrConvertFailed, "totallimit: %v", err) - } - - return nil -} - // initClientLog initializes dfget client's logger. // There are two kinds of logger dfget client uses: logfile and console. // logfile is used to stored generated log in local filesystem, @@ -198,13 +169,13 @@ func initFlags() { "Destination path which is used to store the requested downloading file. It must contain detailed directory and specific filename, for example, '/tmp/file.mp4'") // localLimit & minRate & totalLimit & timeout - flagSet.StringVarP(&localLimit, "locallimit", "s", "", - "network bandwidth rate limit for single download task, in format of 20M/m/K/k") - flagSet.StringVar(&minRate, "minrate", "", - "minimal network bandwidth rate for downloading a file, in format of 20M/m/K/k") - flagSet.StringVar(&totalLimit, "totallimit", "", - "network bandwidth rate limit for the whole host, in format of 20M/m/K/k") - flagSet.IntVarP(&cfg.Timeout, "timeout", "e", 0, + flagSet.VarP(&cfg.LocalLimit, "locallimit", "s", + "network bandwidth rate limit for single download task, in format of G(B)/g/M(B)/m/K(B)/k/B, pure number will also be parsed as Byte") + flagSet.Var(&cfg.MinRate, "minrate", + "minimal network bandwidth rate for downloading a file, in format of G(B)/g/M(B)/m/K(B)/k/B, pure number will also be parsed as Byte") + flagSet.Var(&cfg.TotalLimit, "totallimit", + "network bandwidth rate limit for the whole host, in format of G(B)/g/M(B)/m/K(B)/k/B, pure number will also be parsed as Byte") + flagSet.DurationVarP(&cfg.Timeout, "timeout", "e", 0, "Timeout set for file downloading task. If dfget has not finished downloading all pieces of file before --timeout, the dfget will throw an error and exit") // md5 & identifier @@ -257,29 +228,6 @@ func initFlags() { flagSet.MarkDeprecated("exceed", "please use '--timeout' or '-e' instead") } -// Helper functions. -func transLimit(limit string) (int, error) { - if stringutils.IsEmptyStr(limit) { - return 0, nil - } - l := len(limit) - i, err := strconv.Atoi(limit[:l-1]) - - if err != nil { - return 0, err - } - - unit := limit[l-1] - if unit == 'k' || unit == 'K' { - return i * 1024, nil - } - if unit == 'm' || unit == 'M' { - return i * 1024 * 1024, nil - } - return 0, fmt.Errorf("invalid unit '%c' of '%s', 'KkMm' are supported", - unit, limit) -} - func transFilter(filter string) []string { if stringutils.IsEmptyStr(filter) { return nil diff --git a/cmd/dfget/app/root_test.go b/cmd/dfget/app/root_test.go index b7b9e1504..c6fa57b92 100644 --- a/cmd/dfget/app/root_test.go +++ b/cmd/dfget/app/root_test.go @@ -18,18 +18,15 @@ package app import ( "bytes" - "fmt" "io/ioutil" "os" "path/filepath" - "strconv" - "strings" "testing" "time" "github.com/dragonflyoss/Dragonfly/dfget/config" "github.com/dragonflyoss/Dragonfly/pkg/errortypes" - "github.com/dragonflyoss/Dragonfly/pkg/stringutils" + "github.com/dragonflyoss/Dragonfly/pkg/rate" "github.com/sirupsen/logrus" "github.com/stretchr/testify/suite" @@ -41,11 +38,10 @@ type dfgetSuit struct { func (suit *dfgetSuit) Test_initFlagsNoArguments() { suit.Nil(cfg.Node) - suit.Equal(cfg.LocalLimit, 0) - suit.Equal(cfg.TotalLimit, 0) + suit.Equal(cfg.LocalLimit, 20*rate.MB) + suit.Equal(cfg.TotalLimit, rate.Rate(0)) suit.Equal(cfg.Notbs, false) suit.Equal(cfg.DFDaemon, false) - suit.Equal(cfg.ShowBar, false) suit.Equal(cfg.Console, false) suit.Equal(cfg.Verbose, false) suit.Equal(cfg.URL, "") @@ -59,7 +55,7 @@ func (suit *dfgetSuit) Test_initProperties() { iniFile := filepath.Join(dirName, "dragonfly.ini") yamlFile := filepath.Join(dirName, "dragonfly.yaml") iniContent := []byte("[node]\naddress=1.1.1.1") - yamlContent := []byte("nodes:\n - 1.1.1.2\nlocalLimit: 1024000") + yamlContent := []byte("nodes:\n - 1.1.1.2\nlocalLimit: 1000K") ioutil.WriteFile(iniFile, iniContent, os.ModePerm) ioutil.WriteFile(yamlFile, yamlContent, os.ModePerm) @@ -75,20 +71,18 @@ func (suit *dfgetSuit) Test_initProperties() { {configs: []string{iniFile, yamlFile}, expected: newProp(0, 0, 0, "1.1.1.1")}, {configs: []string{yamlFile, iniFile}, - expected: newProp(1024000, 0, 0, "1.1.1.2")}, + expected: newProp(int(rate.MB*20), 0, 0, "1.1.1.2")}, {configs: []string{filepath.Join(dirName, "x"), yamlFile}, - expected: newProp(1024000, 0, 0, "1.1.1.2")}, + expected: newProp(int(rate.MB*20), 0, 0, "1.1.1.2")}, } for _, v := range cases { cfg = config.NewConfig() buf.Reset() cfg.ConfigFiles = v.configs - localLimitStr := strconv.FormatInt(int64(v.expected.LocalLimit/1024), 10) - totalLimitStr := strconv.FormatInt(int64(v.expected.TotalLimit/1024), 10) rootCmd.Flags().Parse([]string{ - "--locallimit", fmt.Sprintf("%sk", localLimitStr), - "--totallimit", fmt.Sprintf("%sk", totalLimitStr)}) + "--locallimit", v.expected.LocalLimit.String(), + "--totallimit", v.expected.TotalLimit.String()}) initProperties() suit.EqualValues(cfg.Node, v.expected.Nodes) suit.Equal(cfg.LocalLimit, v.expected.LocalLimit) @@ -97,33 +91,6 @@ func (suit *dfgetSuit) Test_initProperties() { } } -func (suit *dfgetSuit) Test_transLimit() { - var cases = map[string]struct { - i int - err string - }{ - "20M": {20971520, ""}, - "20m": {20971520, ""}, - "10k": {10240, ""}, - "10K": {10240, ""}, - "10x": {0, "invalid unit 'x' of '10x', 'KkMm' are supported"}, - "10.0x": {0, "invalid syntax"}, - "ab": {0, "invalid syntax"}, - "abM": {0, "invalid syntax"}, - } - - for k, v := range cases { - i, e := transLimit(k) - suit.Equal(i, v.i) - if stringutils.IsEmptyStr(v.err) { - suit.Nil(e) - } else { - suit.NotNil(e) - suit.True(strings.Contains(e.Error(), v.err), true) - } - } -} - func (suit *dfgetSuit) Test_transFilter() { var cases = []string{ "a&b&c", @@ -170,10 +137,10 @@ func newProp(local int, total int, size int, nodes ...string) *config.Properties p.Nodes = nodes } if local != 0 { - p.LocalLimit = local + p.LocalLimit = rate.Rate(local) } if total != 0 { - p.TotalLimit = total + p.TotalLimit = rate.Rate(total) } if size != 0 { p.ClientQueueSize = size diff --git a/cmd/supernode/app/root.go b/cmd/supernode/app/root.go index 2d910e149..653478cbb 100644 --- a/cmd/supernode/app/root.go +++ b/cmd/supernode/app/root.go @@ -80,11 +80,11 @@ func setupFlags(cmd *cobra.Command, opt *Options) { flagSet.StringVar(&opt.DownloadPath, "download-path", opt.DownloadPath, "specifies the path where to store downloaded filed from source address") - flagSet.IntVar(&opt.SystemReservedBandwidth, "system-bandwidth", opt.SystemReservedBandwidth, - "Network rate reserved for system (unit: MB/s)") + flagSet.Var(&opt.SystemReservedBandwidth, "system-bandwidth", + "Network rate reserved for system") - flagSet.IntVar(&opt.MaxBandwidth, "max-bandwidth", opt.MaxBandwidth, - "network rate that supernode can use (unit: MB/s)") + flagSet.Var(&opt.MaxBandwidth, "max-bandwidth", + "network rate that supernode can use") flagSet.IntVar(&opt.SchedulerCorePoolSize, "pool-size", opt.SchedulerCorePoolSize, "the core pool size of ScheduledExecutorService") diff --git a/dfdaemon/config/config.go b/dfdaemon/config/config.go index 15877b330..116f22144 100644 --- a/dfdaemon/config/config.go +++ b/dfdaemon/config/config.go @@ -27,6 +27,7 @@ import ( "github.com/dragonflyoss/Dragonfly/dfdaemon/constant" dferr "github.com/dragonflyoss/Dragonfly/pkg/errortypes" + "github.com/dragonflyoss/Dragonfly/pkg/rate" "github.com/pkg/errors" "github.com/spf13/afero" @@ -97,11 +98,11 @@ type Properties struct { MaxProcs int `yaml:"maxprocs" json:"maxprocs"` // dfget config - DfgetFlags []string `yaml:"dfget_flags" json:"dfget_flags"` - SuperNodes []string `yaml:"supernodes" json:"supernodes"` - RateLimit string `yaml:"ratelimit" json:"ratelimit"` - DFRepo string `yaml:"localrepo" json:"localrepo"` - DFPath string `yaml:"dfpath" json:"dfpath"` + DfgetFlags []string `yaml:"dfget_flags" json:"dfget_flags"` + SuperNodes []string `yaml:"supernodes" json:"supernodes"` + RateLimit rate.Rate `yaml:"ratelimit" json:"ratelimit"` + DFRepo string `yaml:"localrepo" json:"localrepo"` + DFPath string `yaml:"dfpath" json:"dfpath"` } // Validate validates the config @@ -127,13 +128,6 @@ func (p *Properties) Validate() error { ) } - if ok, _ := regexp.MatchString("^[[:digit:]]+[MK]$", p.RateLimit); !ok { - return dferr.Newf( - constant.CodeExitRateLimitInvalid, - "invalid rate limit %s", p.RateLimit, - ) - } - return nil } @@ -150,7 +144,7 @@ func (p *Properties) DFGetConfig() DFGetConfig { dfgetConfig := DFGetConfig{ DfgetFlags: dfgetFlags, SuperNodes: p.SuperNodes, - RateLimit: p.RateLimit, + RateLimit: p.RateLimit.String(), DFRepo: p.DFRepo, DFPath: p.DFPath, } diff --git a/dfdaemon/config/config_test.go b/dfdaemon/config/config_test.go index 622bc8d9f..0aa89e8a0 100644 --- a/dfdaemon/config/config_test.go +++ b/dfdaemon/config/config_test.go @@ -28,6 +28,7 @@ import ( "github.com/dragonflyoss/Dragonfly/dfdaemon/constant" "github.com/dragonflyoss/Dragonfly/pkg/errortypes" + "github.com/dragonflyoss/Dragonfly/pkg/rate" "github.com/pkg/errors" "github.com/spf13/afero" @@ -95,21 +96,6 @@ func (ts *configTestSuite) TestValidateDFPath() { r.Equal(constant.CodeExitDfgetNotFound, getCode(c.Validate())) } -func (ts *configTestSuite) TestValidateRateLimit() { - c := defaultConfig() - r := ts.Require() - - for _, l := range []string{"M", "K", "1KB"} { - c.RateLimit = l - r.Equal(constant.CodeExitRateLimitInvalid, getCode(c.Validate())) - } - - for _, l := range []string{"1M", "20K", "20M"} { - c.RateLimit = l - r.Nil(c.Validate()) - } -} - func (ts *configTestSuite) TestURLNew() { r := ts.Require() @@ -366,7 +352,7 @@ func defaultConfig() *Properties { HostIP: "127.0.0.1", DFRepo: "/tmp", DFPath: "/tmp", - RateLimit: "20M", + RateLimit: 20 * rate.MB, } } diff --git a/dfdaemon/proxy/proxy.go b/dfdaemon/proxy/proxy.go index 949604657..e09df2e8e 100644 --- a/dfdaemon/proxy/proxy.go +++ b/dfdaemon/proxy/proxy.go @@ -146,7 +146,7 @@ func NewFromConfig(c config.Properties) (*Proxy, error) { if len(c.SuperNodes) > 0 { logrus.Infof("use supernodes: %s", strings.Join(c.SuperNodes, ",")) } - logrus.Infof("rate limit set to %s", c.RateLimit) + logrus.Infof("rate limit set to %s", c.RateLimit.String()) if c.HijackHTTPS != nil { opts = append(opts, WithHTTPSHosts(c.HijackHTTPS.Hosts...)) diff --git a/dfget/config/config.go b/dfget/config/config.go index 5b72d5167..a1305c763 100644 --- a/dfget/config/config.go +++ b/dfget/config/config.go @@ -32,6 +32,7 @@ import ( "github.com/dragonflyoss/Dragonfly/pkg/fileutils" "github.com/dragonflyoss/Dragonfly/pkg/netutils" "github.com/dragonflyoss/Dragonfly/pkg/printer" + "github.com/dragonflyoss/Dragonfly/pkg/rate" "github.com/dragonflyoss/Dragonfly/pkg/stringutils" "github.com/pkg/errors" @@ -52,27 +53,30 @@ import ( // nodes: // - 127.0.0.1 // - 10.10.10.1 -// localLimit: 20971520 -// totalLimit: 20971520 +// localLimit: 20M +// totalLimit: 20M // clientQueueSize: 6 type Properties struct { // Nodes specify supernodes. - Nodes []string `yaml:"nodes"` + Nodes []string `yaml:"nodes,omitempty" json:"nodes,omitempty"` - // LocalLimit rate limit about a single download task,format: 20M/m/K/k. - LocalLimit int `yaml:"localLimit"` + // LocalLimit rate limit about a single download task, format: G(B)/g/M(B)/m/K(B)/k/B + // pure number will also be parsed as Byte. + LocalLimit rate.Rate `yaml:"localLimit,omitempty" json:"localLimit,omitempty"` - // Minimal rate about a single download task,format: 20M/m/K/k. - MinRate int `yaml:"minRate"` + // Minimal rate about a single download task, format: G(B)/g/M(B)/m/K(B)/k/B + // pure number will also be parsed as Byte. + MinRate rate.Rate `yaml:"minRate,omitempty" json:"minRate,omitempty"` - // TotalLimit rate limit about the whole host,format: 20M/m/K/k. - TotalLimit int `yaml:"totalLimit"` + // TotalLimit rate limit about the whole host, format: G(B)/g/M(B)/m/K(B)/k/B + // pure number will also be parsed as Byte. + TotalLimit rate.Rate `yaml:"totalLimit,omitempty" json:"totalLimit,omitempty"` // ClientQueueSize is the size of client queue // which controls the number of pieces that can be processed simultaneously. // It is only useful when the Pattern equals "source". // The default value is 6. - ClientQueueSize int `yaml:"clientQueueSize"` + ClientQueueSize int `yaml:"clientQueueSize" json:"clientQueueSize,omitempty"` } // NewProperties create a new properties with default values. @@ -142,17 +146,20 @@ type Config struct { // Output full output path. Output string `json:"output"` - // LocalLimit rate limit about a single download task,format: 20M/m/K/k. - LocalLimit int `json:"localLimit,omitempty"` + // LocalLimit rate limit about a single download task, format: G(B)/g/M(B)/m/K(B)/k/B + // pure number will also be parsed as Byte. + LocalLimit rate.Rate `json:"localLimit,omitempty"` - // Minimal rate about a single download task,format: 20M/m/K/k. - MinRate int `json:"minRate,omitempty"` + // Minimal rate about a single download task, format: G(B)/g/M(B)/m/K(B)/k/B + // pure number will also be parsed as Byte. + MinRate rate.Rate `json:"minRate,omitempty"` - // TotalLimit rate limit about the whole host,format: 20M/m/K/k. - TotalLimit int `json:"totalLimit,omitempty"` + // TotalLimit rate limit about the whole host, format: G(B)/g/M(B)/m/K(B)/k/B + // pure number will also be parsed as Byte. + TotalLimit rate.Rate `json:"totalLimit,omitempty"` // Timeout download timeout(second). - Timeout int `json:"timeout,omitempty"` + Timeout time.Duration `json:"timeout,omitempty"` // Md5 expected file md5. Md5 string `json:"md5,omitempty"` @@ -208,25 +215,25 @@ type Config struct { ClientQueueSize int `json:"clientQueueSize,omitempty"` // Start time. - StartTime time.Time `json:"startTime"` + StartTime time.Time `json:"-"` // Sign the value is 'Pid + float64(time.Now().UnixNano())/float64(time.Second) format: "%d-%.3f"'. // It is unique for downloading task, and is used for debugging. - Sign string `json:"sign"` + Sign string `json:"-"` // Username of the system currently logged in. - User string `json:"user"` + User string `json:"-"` // WorkHome work home path, // default: `$HOME/.small-dragonfly`. - WorkHome string `json:"workHome"` + WorkHome string `json:"-"` // Config file paths, // default:["/etc/dragonfly/dfget.yml","/etc/dragonfly.conf"]. // // NOTE: It is recommended to use `/etc/dragonfly/dfget.yml` as default, // and the `/etc/dragonfly.conf` is just to ensure compatibility with previous versions. - ConfigFiles []string `json:"configFile"` + ConfigFiles []string `json:"-"` // RV stores the variables that are initialized and used at downloading task executing. RV RuntimeVariable `json:"-"` @@ -260,6 +267,8 @@ func NewConfig() *Config { cfg.RV.SystemDataDir = path.Join(cfg.WorkHome, "data") cfg.RV.FileLength = -1 cfg.ConfigFiles = []string{DefaultYamlConfigFile, DefaultIniConfigFile} + cfg.LocalLimit = DefaultLocalLimit + cfg.MinRate = DefaultMinRate return cfg } diff --git a/dfget/config/config_test.go b/dfget/config/config_test.go index 9c16503be..c61747124 100644 --- a/dfget/config/config_test.go +++ b/dfget/config/config_test.go @@ -30,6 +30,7 @@ import ( "time" "github.com/dragonflyoss/Dragonfly/pkg/errortypes" + "github.com/dragonflyoss/Dragonfly/pkg/rate" "github.com/dragonflyoss/Dragonfly/pkg/stringutils" "github.com/go-check/check" @@ -56,10 +57,10 @@ func (suite *ConfigSuite) TestConfig_String(c *check.C) { cfg := NewConfig() expected := "{\"url\":\"\",\"output\":\"\"" c.Assert(strings.Contains(cfg.String(), expected), check.Equals, true) - cfg.LocalLimit = 20971520 + cfg.LocalLimit = 20 * rate.MB cfg.Pattern = "p2p" - expected = "\"url\":\"\",\"output\":\"\",\"localLimit\":20971520," + - "\"pattern\":\"p2p\"" + expected = "\"url\":\"\",\"output\":\"\",\"localLimit\":\"20MB\"," + + "\"minRate\":\"64KB\",\"pattern\":\"p2p\"" c.Assert(strings.Contains(cfg.String(), expected), check.Equals, true) } @@ -173,8 +174,8 @@ func (suite *ConfigSuite) TestProperties_Load(c *check.C) { content: "nodes:\n - 10.10.10.1\n - 10.10.10.2\n", errMsg: "", expected: &Properties{Nodes: []string{"10.10.10.1", "10.10.10.2"}}}, {create: true, ext: "yaml", - content: "totalLimit: 10485760", - errMsg: "", expected: &Properties{TotalLimit: 10485760}}, + content: "totalLimit: 10M", + errMsg: "", expected: &Properties{TotalLimit: 10 * rate.MB}}, {create: false, ext: "ini", content: "[node]\naddress=1.1.1.1", errMsg: "read ini config"}, {create: true, ext: "ini", content: "[node]\naddress=1.1.1.1", expected: &Properties{Nodes: []string{"1.1.1.1"}}}, diff --git a/dfget/config/constants.go b/dfget/config/constants.go index 8495765e2..04b60c585 100644 --- a/dfget/config/constants.go +++ b/dfget/config/constants.go @@ -18,6 +18,8 @@ package config import ( "time" + + "github.com/dragonflyoss/Dragonfly/pkg/rate" ) /* the reason of backing to source */ @@ -48,8 +50,8 @@ const ( DefaultYamlConfigFile = "/etc/dragonfly/dfget.yml" DefaultIniConfigFile = "/etc/dragonfly.conf" DefaultNode = "127.0.0.1" - DefaultLocalLimit = 20 * 1024 * 1024 - DefaultMinRate = 64 * 1024 + DefaultLocalLimit = 20 * rate.MB + DefaultMinRate = 64 * rate.KB DefaultClientQueueSize = 6 ) diff --git a/dfget/core/core_test.go b/dfget/core/core_test.go index 4b6c4ac59..f52b62732 100644 --- a/dfget/core/core_test.go +++ b/dfget/core/core_test.go @@ -101,13 +101,6 @@ func (s *CoreTestSuite) TestRegisterToSupernode(c *check.C) { cfg.Node = []string{"x"} cfg.URL = "http://taobao.com" cfg.BackSourceReason = config.BackSourceReasonNone - // f(config.BackSourceReasonNone, false, nil) - - cfg.Node = []string{"x"} - cfg.URL = "http://lowzj.com" - f(config.BackSourceReasonNone, true, ®ist.RegisterResult{ - Node: "x", RemainderNodes: []string{}, URL: cfg.URL, TaskID: "a", - FileLength: 100, PieceSize: 10}) } func (s *CoreTestSuite) TestAdjustSupernodeList(c *check.C) { diff --git a/dfget/core/downloader/back_downloader/back_downloader.go b/dfget/core/downloader/back_downloader/back_downloader.go index 647554148..3edc0255a 100644 --- a/dfget/core/downloader/back_downloader/back_downloader.go +++ b/dfget/core/downloader/back_downloader/back_downloader.go @@ -112,7 +112,7 @@ func (bd *BackDownloader) Run() error { } buf := make([]byte, 512*1024) - reader := limitreader.NewLimitReader(resp.Body, bd.cfg.LocalLimit, bd.Md5 != "") + reader := limitreader.NewLimitReader(resp.Body, int64(bd.cfg.LocalLimit), bd.Md5 != "") if _, err = io.CopyBuffer(f, reader, buf); err != nil { return err } diff --git a/dfget/core/downloader/p2p_downloader/p2p_downloader.go b/dfget/core/downloader/p2p_downloader/p2p_downloader.go index cb553d0dd..a64d284c2 100644 --- a/dfget/core/downloader/p2p_downloader/p2p_downloader.go +++ b/dfget/core/downloader/p2p_downloader/p2p_downloader.go @@ -294,7 +294,7 @@ func (p2p *P2PDownloader) getPullRate(data *types.PullPieceTaskResponseContinueD localRate := data.DownLink * 1024 if p2p.cfg.LocalLimit > 0 { - localRate = p2p.cfg.LocalLimit + localRate = int(p2p.cfg.LocalLimit) } // Calculate the download speed limit @@ -307,18 +307,18 @@ func (p2p *P2PDownloader) getPullRate(data *types.PullPieceTaskResponseContinueD resp, err := uploaderAPI.ParseRate(p2p.cfg.RV.LocalIP, p2p.cfg.RV.PeerPort, req) if err != nil { logrus.Errorf("failed to parse rate in pull rate: %v", err) - p2p.rateLimiter.SetRate(ratelimiter.TransRate(localRate)) + p2p.rateLimiter.SetRate(ratelimiter.TransRate(int64(localRate))) return } reqRate, err := strconv.Atoi(resp) if err != nil { logrus.Errorf("failed to parse rate from resp %s: %v", resp, err) - p2p.rateLimiter.SetRate(ratelimiter.TransRate(localRate)) + p2p.rateLimiter.SetRate(ratelimiter.TransRate(int64(localRate))) return } logrus.Infof("pull rate result:%d cost:%v", reqRate, time.Since(start)) - p2p.rateLimiter.SetRate(ratelimiter.TransRate(reqRate)) + p2p.rateLimiter.SetRate(ratelimiter.TransRate(int64(reqRate))) } func (p2p *P2PDownloader) startTask(data *types.PullPieceTaskResponseContinueData) { diff --git a/dfget/core/uploader/peer_server.go b/dfget/core/uploader/peer_server.go index b500061c1..8597c930d 100644 --- a/dfget/core/uploader/peer_server.go +++ b/dfget/core/uploader/peer_server.go @@ -213,7 +213,7 @@ func (ps *peerServer) checkHandler(w http.ResponseWriter, r *http.Request) { if ps.rateLimiter == nil { ps.rateLimiter = ratelimiter.NewRateLimiter(int64(totalLimit), 2) } else { - ps.rateLimiter.SetRate(ratelimiter.TransRate(totalLimit)) + ps.rateLimiter.SetRate(ratelimiter.TransRate(int64(totalLimit))) } ps.totalLimitRate = totalLimit logrus.Infof("update total limit to %d", totalLimit) diff --git a/dfget/core/uploader/peer_server_executor.go b/dfget/core/uploader/peer_server_executor.go index bf0b9bdc1..98d842e70 100644 --- a/dfget/core/uploader/peer_server_executor.go +++ b/dfget/core/uploader/peer_server_executor.go @@ -137,7 +137,7 @@ func (pe *peerServerExecutor) checkPeerServerExist(cfg *config.Config, port int) } // check the peer server whether is available - result, err := checkServer(cfg.RV.LocalIP, port, cfg.RV.DataDir, taskFileName, cfg.TotalLimit) + result, err := checkServer(cfg.RV.LocalIP, port, cfg.RV.DataDir, taskFileName, int(cfg.TotalLimit)) logrus.Infof("local http result:%s err:%v, port:%d path:%s", result, err, port, config.LocalHTTPPathCheck) diff --git a/go.sum b/go.sum index 90e8ccce5..7c3f9b48a 100644 --- a/go.sum +++ b/go.sum @@ -6,7 +6,9 @@ github.com/PuerkitoBio/purell v0.0.0-20170829232023-f619812e3caf h1:ePmEKucT6HqN github.com/PuerkitoBio/purell v0.0.0-20170829232023-f619812e3caf/go.mod h1:c11w/QuzBsJSee3cPx9rAFu61PvFxuPbtSwDGJws/X0= github.com/PuerkitoBio/urlesc v0.0.0-20170810143723-de5bf2ad4578 h1:d+Bc7a5rLufV/sSk/8dngufqelfh6jnri85riMAaF/M= github.com/PuerkitoBio/urlesc v0.0.0-20170810143723-de5bf2ad4578/go.mod h1:uGdkoq3SwY9Y+13GIhn11/XLaGBb4BfwItxLd5jeuXE= +github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc h1:cAKDfWh5VpdgMhJosfJnn5/FoN2SRZ4p7fJNX58YPaU= github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc= +github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf h1:qet1QNfXsQxTZqLG4oE62mJzwPIB8+Tee4RNCL9ulrY= github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0= github.com/armon/consul-api v0.0.0-20180202201655-eb2c6b5be1b6/go.mod h1:grANhF5doyWs3UAsr3K4I6qtAmlQcZDesFNEHPZAzj8= github.com/asaskevich/govalidator v0.0.0-20170903095215-73945b6115bf h1:wXq5VXJjLole37O6oWZwqBRbKZw6VmC+wuAe8j/w2ZA= @@ -209,6 +211,7 @@ google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9Ywl google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc= google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c= google.golang.org/grpc v1.21.0/go.mod h1:oYelfM1adQP15Ek0mdvEgi9Df8B9CZIaU1084ijfRaM= +gopkg.in/alecthomas/kingpin.v2 v2.2.6 h1:jMFz6MfLP0/4fUyZle81rXUoxOBFi19VUFKVDOQfozc= gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 h1:qIbj1fsPNlZgppZ+VLlY7N33q108Sa+fhmuc+sWQYwY= diff --git a/pkg/limitreader/limit_reader.go b/pkg/limitreader/limit_reader.go index 9ead0bdf1..3a07a2556 100644 --- a/pkg/limitreader/limit_reader.go +++ b/pkg/limitreader/limit_reader.go @@ -28,7 +28,7 @@ import ( // NewLimitReader creates LimitReader // src: reader // rate: bytes/second -func NewLimitReader(src io.Reader, rate int, calculateMd5 bool) *LimitReader { +func NewLimitReader(src io.Reader, rate int64, calculateMd5 bool) *LimitReader { return NewLimitReaderWithLimiter(newRateLimiterWithDefaultWindow(rate), src, calculateMd5) } @@ -50,7 +50,7 @@ func NewLimitReaderWithLimiter(rl *ratelimiter.RateLimiter, src io.Reader, calcu // NewLimitReaderWithMD5Sum creates LimitReader with a md5 sum. // src: reader // rate: bytes/second -func NewLimitReaderWithMD5Sum(src io.Reader, rate int, md5sum hash.Hash) *LimitReader { +func NewLimitReaderWithMD5Sum(src io.Reader, rate int64, md5sum hash.Hash) *LimitReader { return NewLimitReaderWithLimiterAndMD5Sum(src, newRateLimiterWithDefaultWindow(rate), md5sum) } @@ -65,7 +65,7 @@ func NewLimitReaderWithLimiterAndMD5Sum(src io.Reader, rl *ratelimiter.RateLimit } } -func newRateLimiterWithDefaultWindow(rate int) *ratelimiter.RateLimiter { +func newRateLimiterWithDefaultWindow(rate int64) *ratelimiter.RateLimiter { return ratelimiter.NewRateLimiter(ratelimiter.TransRate(rate), 2) } diff --git a/pkg/netutils/netutils.go b/pkg/netutils/netutils.go index ecc571455..85227c4c8 100644 --- a/pkg/netutils/netutils.go +++ b/pkg/netutils/netutils.go @@ -29,6 +29,7 @@ import ( "strings" "time" + "github.com/dragonflyoss/Dragonfly/pkg/rate" "github.com/dragonflyoss/Dragonfly/pkg/stringutils" log "github.com/sirupsen/logrus" @@ -39,17 +40,18 @@ const ( layoutGMT = "GMT" ) -var defaultRateLimit = "20M" +// default rate limit is 20M. +var defaultRateLimit = 20 * rate.MB -// NetLimit parses speed of interface that it has prefix of eth. -func NetLimit() string { +// NetLimit parse speed of interface that it has prefix of eth. +func NetLimit() *rate.Rate { defer func() { if err := recover(); err != nil { log.Errorf("parse default net limit error:%v", err) } }() if runtime.NumCPU() < 24 { - return defaultRateLimit + return &defaultRateLimit } var ethtool string @@ -60,18 +62,18 @@ func NetLimit() string { } if ethtool == "" { log.Warn("ethtool not found") - return defaultRateLimit + return &defaultRateLimit } var maxInterfaceLimit = uint64(0) interfaces, err := net.Interfaces() if err != nil { - return defaultRateLimit + return &defaultRateLimit } compile := regexp.MustCompile("^[[:space:]]*([[:digit:]]+)[[:space:]]*Mb/s[[:space:]]*$") for _, dev := range interfaces { - if !strings.HasPrefix(dev.Name, "eth") { + if !strings.HasPrefix(dev.Name, "enp") { continue } cmd := exec.Command(ethtool, dev.Name) @@ -85,7 +87,8 @@ func NetLimit() string { continue } scanner := bufio.NewScanner(stdoutPipe) - + // TODO(yeya24): using scanner.Scan() will execute multiple syscall to read data, + // change to use a single syscall to read all data here. for scanner.Scan() { fields := strings.Split(strings.TrimSpace(scanner.Text()), ":") if len(fields) != 2 { @@ -96,7 +99,7 @@ func NetLimit() string { } speed := compile.FindStringSubmatch(fields[1]) if tmpLimit, err := strconv.ParseUint(speed[1], 0, 32); err == nil { - tmpLimit = tmpLimit * 8 / 10 + tmpLimit = tmpLimit / 8 if tmpLimit > maxInterfaceLimit { maxInterfaceLimit = tmpLimit } @@ -107,10 +110,11 @@ func NetLimit() string { } if maxInterfaceLimit > 0 { - return strconv.FormatUint(maxInterfaceLimit/8, 10) + "M" + r := rate.Rate(maxInterfaceLimit) * rate.MB + return &r } - return defaultRateLimit + return &defaultRateLimit } // ExtractHost extracts host ip from the giving string. @@ -281,7 +285,7 @@ func isExist(mmap map[string]bool, key string) bool { // CalculateTimeout calculates the timeout(in seconds) according to the fileLength and the min rate of network. // // The 0 will be returned when both minRate and defaultMinRate both are <=0. -func CalculateTimeout(fileLength int64, minRate int, defaultMinRate int, reservedTime time.Duration) time.Duration { +func CalculateTimeout(fileLength int64, minRate, defaultMinRate rate.Rate, reservedTime time.Duration) time.Duration { // ensure the minRate to avoid trigger panic when minRate equals zero if fileLength <= 0 || (minRate <= 0 && defaultMinRate <= 0) { diff --git a/pkg/netutils/netutils_test.go b/pkg/netutils/netutils_test.go index bd6f66205..cc538c3bc 100644 --- a/pkg/netutils/netutils_test.go +++ b/pkg/netutils/netutils_test.go @@ -22,6 +22,8 @@ import ( "testing" "time" + "github.com/dragonflyoss/Dragonfly/pkg/rate" + "github.com/go-check/check" ) @@ -62,7 +64,7 @@ func (suite *NetUtilSuite) TestGetIPAndPortFromNode(c *check.C) { func (suite *NetUtilSuite) TestNetLimit(c *check.C) { speed := NetLimit() if runtime.NumCPU() < 24 { - c.Assert(speed, check.Equals, "20M") + c.Assert(*speed, check.Equals, 20*rate.MB) } } @@ -226,8 +228,8 @@ func (suite *NetUtilSuite) TestConvertTimeIntToString(c *check.C) { func (suite *NetUtilSuite) TestCalculateTimeout(c *check.C) { var cases = []struct { fileLength int64 - minRate int - defaultMinRate int + minRate rate.Rate + defaultMinRate rate.Rate reservedTime time.Duration expectedResult time.Duration }{ diff --git a/pkg/rate/rate.go b/pkg/rate/rate.go new file mode 100644 index 000000000..b135bd246 --- /dev/null +++ b/pkg/rate/rate.go @@ -0,0 +1,142 @@ +/* + * Copyright The Dragonfly Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package rate + +import ( + "encoding/json" + "fmt" + "regexp" + "strconv" +) + +// Rate wraps int64. It is used to parse the custom rate format +// from YAML and JSON. +// This type should not propagate beyond the scope of input/output processing. +type Rate int64 + +const ( + B Rate = 1 + KB = 1024 * B + MB = 1024 * KB + GB = 1024 * MB +) + +// Set implements pflag/flag.Value +func (d *Rate) Set(s string) error { + var err error + *d, err = ParseRate(s) + return err +} + +// Type implements pflag.Value +func (d *Rate) Type() string { + return "rate" +} + +var rateRE = regexp.MustCompile("^([0-9]+)(MB?|m|KB?|k|GB?|g|B)$") + +// ParseRate parses a string into a int64. +func ParseRate(rateStr string) (Rate, error) { + var n int + n, err := strconv.Atoi(rateStr) + if err == nil && n >= 0 { + return Rate(n), nil + } + + if n < 0 { + return 0, fmt.Errorf("not a valid rate string: %d, only non-negative values are supported", n) + } + + matches := rateRE.FindStringSubmatch(rateStr) + if len(matches) != 3 { + return 0, fmt.Errorf("not a valid rate string: %q, supported format: G(B)/g/M(B)/m/K(B)/k/B or pure number", rateStr) + } + n, _ = strconv.Atoi(matches[1]) + switch unit := matches[2]; { + case unit == "g" || unit == "G" || unit == "GB": + n *= int(GB) + case unit == "m" || unit == "M" || unit == "MB": + n *= int(MB) + case unit == "k" || unit == "K" || unit == "KB": + n *= int(KB) + case unit == "B": + // Value already correct + default: + return 0, fmt.Errorf("invalid unit in rate string: %q, supported format: G(B)/g/M(B)/m/K(B)/k/B or pure number", unit) + } + return Rate(n), nil +} + +// String returns the rate with an uppercase unit. +func (d Rate) String() string { + var ( + n = int64(d) + symbol = "B" + unit = B + ) + if n == 0 { + return "0B" + } + + switch int64(0) { + case n % int64(GB): + symbol = "GB" + unit = GB + case n % int64(MB): + symbol = "MB" + unit = MB + case n % int64(KB): + symbol = "KB" + unit = KB + } + return fmt.Sprintf("%v%v", n/int64(unit), symbol) +} + +// MarshalYAML implements the yaml.Marshaler interface. +func (d Rate) MarshalYAML() (interface{}, error) { + return d.String(), nil +} + +// UnmarshalYAML implements the yaml.Unmarshaler interface. +func (d *Rate) UnmarshalYAML(unmarshal func(interface{}) error) error { + var s string + if err := unmarshal(&s); err != nil { + return err + } + rate, err := ParseRate(s) + if err != nil { + return err + } + *d = rate + return nil +} + +// MarshalJSON implements the json.Marshaler interface. +func (d Rate) MarshalJSON() ([]byte, error) { + return json.Marshal(d.String()) +} + +// UnmarshalJSON implements the json.Unmarshaler interface. +func (d *Rate) UnmarshalJSON(b []byte) error { + str, _ := strconv.Unquote(string(b)) + rate, err := ParseRate(str) + if err != nil { + return err + } + *d = rate + return nil +} diff --git a/pkg/rate/rate_test.go b/pkg/rate/rate_test.go new file mode 100644 index 000000000..e20ce61ab --- /dev/null +++ b/pkg/rate/rate_test.go @@ -0,0 +1,234 @@ +/* + * Copyright The Dragonfly Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package rate + +import ( + "encoding/json" + "testing" + + "github.com/go-check/check" + "gopkg.in/yaml.v2" +) + +func Test(t *testing.T) { + check.TestingT(t) +} + +type RateSuite struct{} + +func init() { + check.Suite(&RateSuite{}) +} + +func (suite *RateSuite) TestParseRate(c *check.C) { + var cases = []struct { + input string + expected Rate + isWrong bool + }{ + {"5m", 5 * MB, false}, + {"5M", 5 * MB, false}, + {"5MB", 5 * MB, false}, + {"1B", B, false}, + {"100", 100 * B, false}, + {"10K", 10 * KB, false}, + {"10KB", 10 * KB, false}, + {"10k", 10 * KB, false}, + {"10G", 10 * GB, false}, + {"10GB", 10 * GB, false}, + {"10g", 10 * GB, false}, + {"10xx", 0, true}, + } + + for _, cc := range cases { + output, err := ParseRate(cc.input) + if !cc.isWrong { + c.Assert(err, check.IsNil) + c.Assert(output, check.Equals, cc.expected) + } else { + c.Assert(err, check.NotNil) + } + + } +} + +func (suite *RateSuite) TestString(c *check.C) { + var cases = []struct { + expected string + input Rate + }{ + {"5MB", 5 * MB}, + {"1B", B}, + {"0B", Rate(0)}, + {"10KB", 10 * KB}, + {"1GB", GB}, + } + + for _, cc := range cases { + c.Check(cc.expected, check.Equals, cc.input.String()) + } +} + +func (suite *RateSuite) TestMarshalJSON(c *check.C) { + var cases = []struct { + input Rate + output string + }{ + { + 5 * MB, + "\"5MB\"", + }, + { + 1 * GB, + "\"1GB\"", + }, + { + 1 * B, + "\"1B\"", + }, + { + 1 * KB, + "\"1KB\"", + }, + } + + for _, cc := range cases { + data, err := json.Marshal(cc.input) + c.Check(err, check.IsNil) + c.Check(string(data), check.Equals, cc.output) + } +} + +func (suite *RateSuite) TestUnMarshalJSON(c *check.C) { + var cases = []struct { + output Rate + input string + }{ + { + 5 * MB, + "\"5M\"", + }, + { + 5 * MB, + "\"5MB\"", + }, + { + 5 * MB, + "\"5m\"", + }, + { + 1 * GB, + "\"1GB\"", + }, + { + 1 * GB, + "\"1G\"", + }, + { + 1 * GB, + "\"1g\"", + }, + { + 1 * B, + "\"1B\"", + }, + { + 1 * B, + "\"1\"", + }, + { + 1 * KB, + "\"1KB\"", + }, + { + 1 * KB, + "\"1K\"", + }, + { + 1 * KB, + "\"1k\"", + }, + } + + for _, cc := range cases { + var r Rate + err := json.Unmarshal([]byte(cc.input), &r) + c.Check(err, check.IsNil) + c.Check(r, check.Equals, cc.output) + } +} + +func (suite *RateSuite) TestMarshalYAML(c *check.C) { + var cases = []struct { + input Rate + output string + }{ + { + 5 * MB, + "5MB\n", + }, + { + 1 * GB, + "1GB\n", + }, + { + 1 * B, + "1B\n", + }, + { + 1 * KB, + "1KB\n", + }, + } + + for _, cc := range cases { + data, err := yaml.Marshal(cc.input) + c.Check(err, check.IsNil) + c.Check(string(data), check.Equals, cc.output) + } +} + +func (suite *RateSuite) TestUnMarshalYAML(c *check.C) { + var cases = []struct { + output Rate + input string + }{ + { + 5 * MB, + "5M\n", + }, + { + 1 * GB, + "1G\n", + }, + { + 1 * B, + "1B\n", + }, + { + 1 * KB, + "1K\n", + }, + } + + for _, cc := range cases { + var output Rate + err := yaml.Unmarshal([]byte(cc.input), &output) + c.Check(err, check.IsNil) + c.Check(output, check.Equals, cc.output) + } +} diff --git a/pkg/ratelimiter/ratelimiter.go b/pkg/ratelimiter/ratelimiter.go index 77d2973b4..e6e2b2fff 100644 --- a/pkg/ratelimiter/ratelimiter.go +++ b/pkg/ratelimiter/ratelimiter.go @@ -142,10 +142,10 @@ func (rl *RateLimiter) blocking(requiredToken int64) { // TransRate trans the rate to multiples of 1000. // For NewRateLimiter, the production of rate should be division by 1000. -func TransRate(rate int) int64 { +func TransRate(rate int64) int64 { if rate <= 0 { rate = 10 * 1024 * 1024 } rate = (rate/1000 + 1) * 1000 - return int64(rate) + return rate } diff --git a/supernode/config/config.go b/supernode/config/config.go index 6252017c4..5bf29dc06 100644 --- a/supernode/config/config.go +++ b/supernode/config/config.go @@ -22,9 +22,10 @@ import ( "strings" "time" - "gopkg.in/yaml.v2" - "github.com/dragonflyoss/Dragonfly/pkg/fileutils" + "github.com/dragonflyoss/Dragonfly/pkg/rate" + + "gopkg.in/yaml.v2" ) // NewConfig creates an instant with default values. @@ -97,9 +98,9 @@ func NewBaseProperties() *BaseProperties { PeerDownLimit: 5, EliminationLimit: 5, FailureCountLimit: 5, - LinkLimit: 20, - SystemReservedBandwidth: DefaultSystemReservedBandwidth, - MaxBandwidth: DefaultMaxBandwidth, + LinkLimit: 20 * rate.MB, + SystemReservedBandwidth: 20 * rate.MB, + MaxBandwidth: 200 * rate.MB, EnableProfiler: false, Debug: false, FailAccessInterval: DefaultFailAccessInterval, @@ -163,19 +164,17 @@ type BaseProperties struct { // default: 5 FailureCountLimit int `yaml:"failureCountLimit"` - // LinkLimit is set for supernode to limit every piece download network speed (unit: MB/s). - // default: 20 - LinkLimit int `yaml:"linkLimit"` + // LinkLimit is set for supernode to limit every piece download network speed. + // default: 20 MB, in format of G(B)/g/M(B)/m/K(B)/k/B, pure number will also be parsed as Byte. + LinkLimit rate.Rate `yaml:"linkLimit"` // SystemReservedBandwidth is the network bandwidth reserved for system software. - // unit: MB/s - // default: 20 - SystemReservedBandwidth int `yaml:"systemReservedBandwidth"` + // default: 20 MB, in format of G(B)/g/M(B)/m/K(B)/k/B, pure number will also be parsed as Byte. + SystemReservedBandwidth rate.Rate `yaml:"systemReservedBandwidth"` // MaxBandwidth is the network bandwidth that supernode can use. - // unit: MB/s - // default: 200 - MaxBandwidth int `yaml:"maxBandwidth"` + // default: 200 MB, in format of G(B)/g/M(B)/m/K(B)/k/B, pure number will also be parsed as Byte. + MaxBandwidth rate.Rate `yaml:"maxBandwidth"` // Whether to enable profiler // default: false @@ -213,8 +212,3 @@ type BaseProperties struct { // superNodePID is the ID of supernode, which is the same as peer ID of dfget. superNodePID string } - -// TransLimit trans rateLimit from MB/s to B/s. -func TransLimit(rateLimit int) int { - return rateLimit * 1024 * 1024 -} diff --git a/supernode/daemon/mgr/cdn/manager.go b/supernode/daemon/mgr/cdn/manager.go index 018565318..0ffb7703d 100644 --- a/supernode/daemon/mgr/cdn/manager.go +++ b/supernode/daemon/mgr/cdn/manager.go @@ -78,7 +78,7 @@ type Manager struct { // NewManager returns a new Manager. func NewManager(cfg *config.Config, cacheStore *store.Store, progressManager mgr.ProgressMgr, originClient httpclient.OriginHTTPClient, register prometheus.Registerer) (*Manager, error) { - rateLimiter := ratelimiter.NewRateLimiter(ratelimiter.TransRate(config.TransLimit(cfg.MaxBandwidth-cfg.SystemReservedBandwidth)), 2) + rateLimiter := ratelimiter.NewRateLimiter(ratelimiter.TransRate(int64(cfg.MaxBandwidth-cfg.SystemReservedBandwidth)), 2) metaDataManager := newFileMetaDataManager(cacheStore) pieceMD5Manager := newpieceMD5Mgr() cdnReporter := newReporter(cfg, cacheStore, progressManager, metaDataManager, pieceMD5Manager)