Skip to content

Commit

Permalink
bugfix: add timeout for download piece
Browse files Browse the repository at this point in the history
Signed-off-by: Starnop <[email protected]>
  • Loading branch information
starnop committed Sep 1, 2019
1 parent d8738f3 commit 79d3e3c
Show file tree
Hide file tree
Showing 24 changed files with 184 additions and 53 deletions.
5 changes: 4 additions & 1 deletion cmd/dfget/app/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,10 @@ func runServer() error {
if err != nil {
return err
}
printer.Printf("dfget uploader server port is %d.", port)

// NOTE: Please update the dfget.PeerServerExecutor.readPort
// because it will get the port from the stdout after call the `dfget server`.
printer.Printf("dfget uploader server port is %d", port)
uploader.WaitForShutdown()
return nil
}
Expand Down
5 changes: 3 additions & 2 deletions dfget/config/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,8 +112,9 @@ const (
LocalHTTPPathRate = "/rate/"
LocalHTTPPing = "/server/ping"

DataExpireTime = 3 * time.Minute
ServerAliveTime = 5 * time.Minute
DataExpireTime = 3 * time.Minute
ServerAliveTime = 5 * time.Minute
DefaultDownlodTimeout = 5 * time.Minute

DefaultSupernodePort = 8002
)
Expand Down
7 changes: 4 additions & 3 deletions dfget/core/api/download_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"fmt"
"net/http"
"strconv"
"time"

"github.com/dragonflyoss/Dragonfly/dfget/config"
"github.com/dragonflyoss/Dragonfly/pkg/httputils"
Expand All @@ -38,7 +39,7 @@ type DownloadRequest struct {
// DownloadAPI defines the download method between dfget and peer server.
type DownloadAPI interface {
// Download downloads a piece and returns an HTTP response.
Download(ip string, port int, req *DownloadRequest) (*http.Response, error)
Download(ip string, port int, req *DownloadRequest, timeout time.Duration) (*http.Response, error)
}

// downloadAPI is an implementation of interface DownloadAPI.
Expand All @@ -52,13 +53,13 @@ func NewDownloadAPI() DownloadAPI {
return &downloadAPI{}
}

func (d *downloadAPI) Download(ip string, port int, req *DownloadRequest) (*http.Response, error) {
func (d *downloadAPI) Download(ip string, port int, req *DownloadRequest, timeout time.Duration) (*http.Response, error) {
headers := make(map[string]string)
headers[config.StrRange] = config.StrBytes + "=" + req.PieceRange
headers[config.StrPieceNum] = strconv.Itoa(req.PieceNum)
headers[config.StrPieceSize] = fmt.Sprint(req.PieceSize)
headers[config.StrUserAgent] = "dfget/" + version.DFGetVersion

url := fmt.Sprintf("http://%s:%d%s", ip, port, req.Path)
return httputils.HTTPGet(url, headers)
return httputils.HTTPGetTimeout(url, headers, timeout)
}
20 changes: 4 additions & 16 deletions dfget/core/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,10 @@ func downloadFile(cfg *config.Config, supernodeAPI api.SupernodeAPI,
getter = p2pDown.NewP2PDownloader(cfg, supernodeAPI, register, result)
}

timeout := calculateTimeout(cfg.RV.FileLength, cfg.Timeout, cfg.MinRate)
timeout := netutils.CalculateTimeout(cfg.RV.FileLength, cfg.MinRate, config.DefaultMinRate, 10*time.Second)
if timeout == 0 && cfg.Timeout > 0 {
timeout = time.Duration(cfg.Timeout) * time.Second
}
err := downloader.DoDownloadTimeout(getter, timeout)
success := "SUCCESS"
if err != nil {
Expand Down Expand Up @@ -266,18 +269,3 @@ func checkConnectSupernode(nodes []string) (localIP string) {
}
return ""
}

func calculateTimeout(fileLength int64, defaultTimeoutSecond int, minRate int) time.Duration {
timeout := 5 * 60
// avoid trigger panic when minRate equals zero
if minRate <= 0 {
minRate = config.DefaultMinRate
}

if defaultTimeoutSecond > 0 {
timeout = defaultTimeoutSecond
} else if fileLength > 0 {
timeout = int(fileLength/int64(minRate) + 10)
}
return time.Duration(timeout) * time.Second
}
4 changes: 3 additions & 1 deletion dfget/core/downloader/downloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"fmt"
"time"

"github.com/dragonflyoss/Dragonfly/dfget/config"
"github.com/dragonflyoss/Dragonfly/pkg/fileutils"

"github.com/sirupsen/logrus"
Expand All @@ -40,7 +41,8 @@ type Downloader interface {
// the given timeout duration.
func DoDownloadTimeout(downloader Downloader, timeout time.Duration) error {
if timeout <= 0 {
return fmt.Errorf("download timeout(%.3fs)", timeout.Seconds())
logrus.Warnf("invalid download timeout(%.3fs)", timeout.Seconds())
timeout = config.DefaultDownlodTimeout
}

var ch = make(chan error)
Expand Down
2 changes: 1 addition & 1 deletion dfget/core/downloader/downloader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func (s *DownloaderTestSuite) TestDoDownloadTimeout(c *check.C) {
md := &MockDownloader{100}

err := DoDownloadTimeout(md, 0*time.Millisecond)
c.Assert(err, check.NotNil)
c.Assert(err, check.IsNil)

err = DoDownloadTimeout(md, 50*time.Millisecond)
c.Assert(err, check.NotNil)
Expand Down
16 changes: 15 additions & 1 deletion dfget/core/downloader/p2p_downloader/p2p_downloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,11 @@ type P2PDownloader struct {
// pullRateTime the time when the pull rate API is called to
// control the time interval between two calls to the API.
pullRateTime time.Time

// dfget will sleep some time which between minTimeout and maxTimeout
// unit: Millisecond
minTimeout int
maxTimeout int
}

var _ downloader.Downloader = &P2PDownloader{}
Expand All @@ -113,6 +118,8 @@ func NewP2PDownloader(cfg *config.Config,
API: api,
Register: register,
RegisterResult: result,
minTimeout: 50,
maxTimeout: 100,
}
p2p.init()
return p2p
Expand Down Expand Up @@ -241,9 +248,16 @@ func (p2p *P2PDownloader) pullPieceTask(item *Piece) (
if p2p.queue.Len() > 0 {
break
}
sleepTime := time.Duration(rand.Intn(1400)+600) * time.Millisecond

sleepTime := time.Duration(rand.Intn(p2p.maxTimeout-p2p.minTimeout)+p2p.minTimeout) * time.Millisecond
logrus.Infof("pull piece task(%+v) result:%s and sleep %.3fs", item, res, sleepTime.Seconds())
time.Sleep(sleepTime)

// gradually increase the sleep time, up to [800-1600]
if p2p.minTimeout < 800 {
p2p.minTimeout *= 2
p2p.maxTimeout *= 2
}
}

// FIXME: try to abstract the judgement to make it more readable.
Expand Down
5 changes: 4 additions & 1 deletion dfget/core/downloader/p2p_downloader/power_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"github.com/dragonflyoss/Dragonfly/pkg/errortypes"
"github.com/dragonflyoss/Dragonfly/pkg/httputils"
"github.com/dragonflyoss/Dragonfly/pkg/limitreader"
"github.com/dragonflyoss/Dragonfly/pkg/netutils"
"github.com/dragonflyoss/Dragonfly/pkg/queue"
"github.com/dragonflyoss/Dragonfly/pkg/ratelimiter"

Expand Down Expand Up @@ -115,10 +116,12 @@ func (pc *PowerClient) downloadPiece() (content *bytes.Buffer, e error) {

// send download request
startTime := time.Now()
resp, err := pc.downloadAPI.Download(dstIP, peerPort, pc.createDownloadRequest())
timeout := netutils.CalculateTimeout(int64(pc.pieceTask.PieceSize), pc.cfg.MinRate, config.DefaultMinRate, 10*time.Second)
resp, err := pc.downloadAPI.Download(dstIP, peerPort, pc.createDownloadRequest(), timeout)
if err != nil {
return nil, err
}
logrus.Debugf("success to get resp timeSince(%v)", time.Since(startTime))
defer resp.Body.Close()
if resp.StatusCode == http.StatusRequestedRangeNotSatisfiable {
return nil, errortypes.ErrRangeNotSatisfiable
Expand Down
3 changes: 2 additions & 1 deletion dfget/core/downloader/p2p_downloader/power_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"io/ioutil"
"net"
"net/http"
"time"

"github.com/dragonflyoss/Dragonfly/dfget/config"
"github.com/dragonflyoss/Dragonfly/dfget/core/api"
Expand Down Expand Up @@ -188,6 +189,6 @@ func NewMockDownloadAPI() api.DownloadAPI {
return &downloadMockAPI{}
}

func (d *downloadMockAPI) Download(ip string, port int, req *api.DownloadRequest) (*http.Response, error) {
func (d *downloadMockAPI) Download(ip string, port int, req *api.DownloadRequest, timeout time.Duration) (*http.Response, error) {
return downloadMock()
}
9 changes: 5 additions & 4 deletions dfget/core/uploader/peer_server_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ func (pe *peerServerExecutor) StartPeerServerProcess(cfg *config.Config) (port i
return 0, err
}
if err = cmd.Start(); err == nil {
port, err = pe.readPort(stdout)
port, err = readPort(stdout)
}
if err == nil && pe.checkPeerServerExist(cfg, port) <= 0 {
err = fmt.Errorf("invalid server on port:%d", port)
Expand All @@ -100,7 +100,7 @@ func (pe *peerServerExecutor) StartPeerServerProcess(cfg *config.Config) (port i
return
}

func (pe *peerServerExecutor) readPort(r io.Reader) (int, error) {
func readPort(r io.Reader) (int, error) {
done := make(chan error)
var port int32

Expand All @@ -112,8 +112,9 @@ func (pe *peerServerExecutor) readPort(r io.Reader) (int, error) {
done <- err
}

content := strings.TrimSpace(string(buf[:n]))
portValue, err := strconv.Atoi(content)
content := string(buf[:n])
contentSlice := strings.Split(content, " ")
portValue, err := strconv.Atoi(strings.TrimSpace(contentSlice[len(contentSlice)-1]))
// avoid data race
atomic.StoreInt32(&port, int32(portValue))
done <- err
Expand Down
9 changes: 9 additions & 0 deletions dfget/core/uploader/peer_server_executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"net/http"
"os"
"path"
"strconv"
"strings"

"github.com/dragonflyoss/Dragonfly/dfget/core/helper"
Expand Down Expand Up @@ -114,6 +115,14 @@ func (s *PeerServerExecutorTestSuite) TestStartPeerServerProcess(c *check.C) {
c.Assert(e, check.IsNil)
}

func (s *PeerServerExecutorTestSuite) TestReadPort(c *check.C) {
port := 39480
reader := strings.NewReader("dfget uploader server port is " + strconv.Itoa(port) + "\n")
result, err := readPort(reader)
c.Check(err, check.IsNil)
c.Check(result, check.Equals, port)
}

// ---------------------------------------------------------------------------
// helper functions

Expand Down
5 changes: 5 additions & 0 deletions pkg/fileutils/fileutils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,15 @@ import (
"os/user"
"path"
"path/filepath"
"testing"

"github.com/go-check/check"
)

func Test(t *testing.T) {
check.TestingT(t)
}

type FileUtilTestSuite struct {
tmpDir string
username string
Expand Down
14 changes: 6 additions & 8 deletions pkg/httputils/http_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ package httputils

import (
"bytes"
"context"
"encoding/json"
"fmt"
"net"
Expand Down Expand Up @@ -226,17 +225,16 @@ func HTTPWithHeaders(method, url string, headers map[string]string, timeout time
return nil, err
}

if timeout > 0 {
ctx, cancel := context.WithTimeout(context.Background(), timeout)
req = req.WithContext(ctx)
defer cancel()
}

for k, v := range headers {
req.Header.Add(k, v)
}

return http.DefaultClient.Do(req)
c := &http.Client{}
if timeout > 0 {
c.Timeout = timeout
}

return c.Do(req)
}

// HTTPStatusOk reports whether the http response code is 200.
Expand Down
5 changes: 5 additions & 0 deletions pkg/httputils/http_util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"fmt"
"math/rand"
"net"
"testing"
"time"

"github.com/dragonflyoss/Dragonfly/pkg/errortypes"
Expand All @@ -29,6 +30,10 @@ import (
"github.com/valyala/fasthttp"
)

func Test(t *testing.T) {
check.TestingT(t)
}

type HTTPUtilTestSuite struct {
port int
host string
Expand Down
16 changes: 16 additions & 0 deletions pkg/netutils/netutils.go
Original file line number Diff line number Diff line change
Expand Up @@ -277,3 +277,19 @@ func isExist(mmap map[string]bool, key string) bool {
}
return false
}

// CalculateTimeout calculate the timeout(in seconds) according to the fileLength and the min rate of network.
//
// The 0 will be returned when both minRate and defaultMinRate both are <=0.
func CalculateTimeout(fileLength int64, minRate int, defaultMinRate int, reservedTime time.Duration) time.Duration {
// ensure the minRate to avoid trigger panic when minRate equals zero
if fileLength <= 0 ||
(minRate <= 0 && defaultMinRate <= 0) {
return 0
}
if minRate <= 0 {
minRate = defaultMinRate
}

return time.Duration(fileLength/int64(minRate))*time.Second + reservedTime
}
Loading

0 comments on commit 79d3e3c

Please sign in to comment.