From fc4099bca3e18dd4e158ac8b1b9f9e034f368523 Mon Sep 17 00:00:00 2001 From: Starnop Date: Mon, 29 Jul 2019 22:09:46 +0800 Subject: [PATCH] bugfix: return error when the url is unreachable Signed-off-by: Starnop --- dfget/core/regist/register.go | 3 +- pkg/{time/time.go => timeutils/time_util.go} | 2 +- supernode/config/config.go | 7 +++ supernode/daemon/mgr/cdn/cdn_util.go | 4 +- supernode/daemon/mgr/task/manager.go | 36 ++++++------ supernode/daemon/mgr/task/manager_util.go | 21 ++++++- .../daemon/mgr/task/manager_util_test.go | 57 ++++++++++--------- supernode/server/result_info.go | 4 ++ 8 files changed, 85 insertions(+), 49 deletions(-) rename pkg/{time/time.go => timeutils/time_util.go} (97%) diff --git a/dfget/core/regist/register.go b/dfget/core/regist/register.go index 963adc2a5..e21248289 100644 --- a/dfget/core/regist/register.go +++ b/dfget/core/regist/register.go @@ -74,7 +74,8 @@ func (s *supernodeRegister) Register(peerPort int) (*RegisterResult, *errortypes logrus.Errorf("register to node:%s error:%v", nodes[i], e) continue } - if resp.Code == constants.Success || resp.Code == constants.CodeNeedAuth { + if resp.Code == constants.Success || resp.Code == constants.CodeNeedAuth || + resp.Code == constants.CodeURLNotReachable { break } if resp.Code == constants.CodeWaitAuth && retryTimes < 3 { diff --git a/pkg/time/time.go b/pkg/timeutils/time_util.go similarity index 97% rename from pkg/time/time.go rename to pkg/timeutils/time_util.go index 425a79e5d..53a6886bd 100644 --- a/pkg/time/time.go +++ b/pkg/timeutils/time_util.go @@ -14,7 +14,7 @@ * limitations under the License. */ -package time +package timeutils import ( "time" diff --git a/supernode/config/config.go b/supernode/config/config.go index b4e47a437..3210ac512 100644 --- a/supernode/config/config.go +++ b/supernode/config/config.go @@ -4,6 +4,7 @@ import ( "fmt" "path/filepath" "strings" + "time" "gopkg.in/yaml.v2" @@ -85,6 +86,7 @@ func NewBaseProperties() *BaseProperties { MaxBandwidth: 200, EnableProfiler: false, Debug: false, + FailAccessInterval: 3, } } @@ -167,6 +169,11 @@ type BaseProperties struct { // By default, the first non-loop address is advertised. AdvertiseIP string `yaml:"advertiseIP"` + // FailAccessInterval is the interval time after failed to access the URL. + // unit: minutes + // default: 3 + FailAccessInterval time.Duration `yaml:"failAccessInterval"` + // cIDPrefix s a prefix string used to indicate that the CID is supernode. cIDPrefix string diff --git a/supernode/daemon/mgr/cdn/cdn_util.go b/supernode/daemon/mgr/cdn/cdn_util.go index f8ad9e229..10fd587f8 100644 --- a/supernode/daemon/mgr/cdn/cdn_util.go +++ b/supernode/daemon/mgr/cdn/cdn_util.go @@ -4,10 +4,10 @@ import ( "fmt" "github.com/dragonflyoss/Dragonfly/apis/types" - "github.com/dragonflyoss/Dragonfly/pkg/time" + "github.com/dragonflyoss/Dragonfly/pkg/timeutils" ) -var getCurrentTimeMillisFunc = time.GetCurrentTimeMillis +var getCurrentTimeMillisFunc = timeutils.GetCurrentTimeMillis // getContentLengthByHeader calculates the piece content length by piece header. func getContentLengthByHeader(pieceHeader uint32) int32 { diff --git a/supernode/daemon/mgr/task/manager.go b/supernode/daemon/mgr/task/manager.go index f05ceba96..faf602a08 100644 --- a/supernode/daemon/mgr/task/manager.go +++ b/supernode/daemon/mgr/task/manager.go @@ -2,13 +2,14 @@ package task import ( "context" + "time" "github.com/dragonflyoss/Dragonfly/apis/types" "github.com/dragonflyoss/Dragonfly/pkg/errortypes" "github.com/dragonflyoss/Dragonfly/pkg/httputils" "github.com/dragonflyoss/Dragonfly/pkg/stringutils" "github.com/dragonflyoss/Dragonfly/pkg/syncmap" - "github.com/dragonflyoss/Dragonfly/pkg/time" + "github.com/dragonflyoss/Dragonfly/pkg/timeutils" "github.com/dragonflyoss/Dragonfly/supernode/config" "github.com/dragonflyoss/Dragonfly/supernode/daemon/mgr" dutil "github.com/dragonflyoss/Dragonfly/supernode/daemon/util" @@ -32,9 +33,10 @@ var getContentLength = httputils.GetContentLength type Manager struct { cfg *config.Config - taskStore *dutil.Store - taskLocker *util.LockerPool - accessTimeMap *syncmap.SyncMap + taskStore *dutil.Store + taskLocker *util.LockerPool + accessTimeMap *syncmap.SyncMap + taskURLUnReachableStore *syncmap.SyncMap peerMgr mgr.PeerMgr dfgetTaskMgr mgr.DfgetTaskMgr @@ -47,15 +49,16 @@ type Manager struct { func NewManager(cfg *config.Config, peerMgr mgr.PeerMgr, dfgetTaskMgr mgr.DfgetTaskMgr, progressMgr mgr.ProgressMgr, cdnMgr mgr.CDNMgr, schedulerMgr mgr.SchedulerMgr) (*Manager, error) { return &Manager{ - cfg: cfg, - taskStore: dutil.NewStore(), - taskLocker: util.NewLockerPool(), - peerMgr: peerMgr, - dfgetTaskMgr: dfgetTaskMgr, - progressMgr: progressMgr, - cdnMgr: cdnMgr, - schedulerMgr: schedulerMgr, - accessTimeMap: syncmap.NewSyncMap(), + cfg: cfg, + taskStore: dutil.NewStore(), + taskLocker: util.NewLockerPool(), + peerMgr: peerMgr, + dfgetTaskMgr: dfgetTaskMgr, + progressMgr: progressMgr, + cdnMgr: cdnMgr, + schedulerMgr: schedulerMgr, + accessTimeMap: syncmap.NewSyncMap(), + taskURLUnReachableStore: syncmap.NewSyncMap(), }, nil } @@ -67,7 +70,8 @@ func (tm *Manager) Register(ctx context.Context, req *types.TaskCreateRequest) ( } // Step2: add a new Task or update the exist task - task, err := tm.addOrUpdateTask(ctx, req) + failAccessInterval := tm.cfg.FailAccessInterval * time.Minute + task, err := tm.addOrUpdateTask(ctx, req, failAccessInterval) if err != nil { logrus.Infof("failed to add or update task with req %+v: %v", req, err) return nil, err @@ -76,7 +80,7 @@ func (tm *Manager) Register(ctx context.Context, req *types.TaskCreateRequest) ( // TODO: defer rollback the task update // update accessTime for taskID - if err := tm.accessTimeMap.Add(task.ID, time.GetCurrentTimeMillis()); err != nil { + if err := tm.accessTimeMap.Add(task.ID, timeutils.GetCurrentTimeMillis()); err != nil { logrus.Warnf("failed to update accessTime for taskID(%s): %v", task.ID, err) } @@ -181,7 +185,7 @@ func (tm *Manager) GetPieces(ctx context.Context, taskID, clientID string, req * logrus.Debugf("success to get task: %+v", task) // update accessTime for taskID - if err := tm.accessTimeMap.Add(task.ID, time.GetCurrentTimeMillis()); err != nil { + if err := tm.accessTimeMap.Add(task.ID, timeutils.GetCurrentTimeMillis()); err != nil { logrus.Warnf("failed to update accessTime for taskID(%s): %v", task.ID, err) } diff --git a/supernode/daemon/mgr/task/manager_util.go b/supernode/daemon/mgr/task/manager_util.go index 0c4ea7008..7b63d7c1c 100644 --- a/supernode/daemon/mgr/task/manager_util.go +++ b/supernode/daemon/mgr/task/manager_util.go @@ -20,6 +20,7 @@ import ( "context" "fmt" "net/http" + "time" "github.com/dragonflyoss/Dragonfly/apis/types" "github.com/dragonflyoss/Dragonfly/pkg/digest" @@ -35,13 +36,22 @@ import ( ) // addOrUpdateTask adds a new task or update the exist task to taskStore. -func (tm *Manager) addOrUpdateTask(ctx context.Context, req *types.TaskCreateRequest) (*types.TaskInfo, error) { +func (tm *Manager) addOrUpdateTask(ctx context.Context, req *types.TaskCreateRequest, failAccessInterval time.Duration) (*types.TaskInfo, error) { taskURL := req.TaskURL if stringutils.IsEmptyStr(req.TaskURL) { taskURL = netutils.FilterURLParam(req.RawURL, req.Filter) } taskID := generateTaskID(taskURL, req.Md5, req.Identifier) + if key, err := tm.taskURLUnReachableStore.Get(taskID); err == nil { + if unReachableStartTime, ok := key.(time.Time); ok && + time.Since(unReachableStartTime) < failAccessInterval { + return nil, errors.Wrapf(errortypes.ErrURLNotReachable, "cache taskID: %s, url: %s", taskID, req.RawURL) + } + + tm.taskURLUnReachableStore.Delete(taskID) + } + // using the existing task if it already exists corresponding to taskID var task *types.TaskInfo newTask := &types.TaskInfo{ @@ -74,7 +84,11 @@ func (tm *Manager) addOrUpdateTask(ctx context.Context, req *types.TaskCreateReq // get fileLength with req.Headers fileLength, err := getHTTPFileLength(taskID, task.RawURL, req.Headers) if err != nil { + if errortypes.IsURLNotReachable(err) { + tm.taskURLUnReachableStore.Add(taskID, time.Now()) + } logrus.Errorf("failed to get file length from http client for taskID(%s): %v", taskID, err) + return nil, err } task.HTTPFileLength = fileLength logrus.Infof("get file length %d from http client for taskID(%s)", fileLength, taskID) @@ -485,7 +499,7 @@ func isWait(CDNStatus string) bool { func getHTTPFileLength(taskID, url string, headers map[string]string) (int64, error) { fileLength, code, err := getContentLength(url, headers) if err != nil { - return -1, err + return -1, errors.Wrapf(errortypes.ErrUnknowError, "failed to get http file Length: %v", err) } if code == http.StatusUnauthorized || code == http.StatusProxyAuthRequired { @@ -493,6 +507,9 @@ func getHTTPFileLength(taskID, url string, headers map[string]string) (int64, er } if code != http.StatusOK { logrus.Warnf("failed to get http file length with unexpected code: %d", code) + if code == http.StatusNotFound { + return -1, errors.Wrapf(errortypes.ErrURLNotReachable, "taskID: %s, url: %s", taskID, url) + } return -1, nil } diff --git a/supernode/daemon/mgr/task/manager_util_test.go b/supernode/daemon/mgr/task/manager_util_test.go index 0c17a307f..3fb7cf944 100644 --- a/supernode/daemon/mgr/task/manager_util_test.go +++ b/supernode/daemon/mgr/task/manager_util_test.go @@ -17,10 +17,7 @@ package task import ( - "context" - "github.com/dragonflyoss/Dragonfly/apis/types" - "github.com/dragonflyoss/Dragonfly/pkg/util" "github.com/dragonflyoss/Dragonfly/supernode/config" "github.com/dragonflyoss/Dragonfly/supernode/daemon/mgr/mock" @@ -65,19 +62,22 @@ func (s *TaskUtilTestSuite) TearDownSuite(c *check.C) { s.contentLengthStub.Reset() } -func (s *TaskUtilTestSuite) TestAddOrUpdateTask(c *check.C) { +func (s *TaskUtilTestSuite) TestEqualsTask(c *check.C) { var cases = []struct { - req *types.TaskCreateRequest - task *types.TaskInfo - errNil bool + existTask *types.TaskInfo + task *types.TaskInfo + result bool }{ { - req: &types.TaskCreateRequest{ - CID: "cid", - CallSystem: "foo", - Dfdaemon: true, - Path: "/peer/file/foo", - RawURL: "http://aa.bb.com", + existTask: &types.TaskInfo{ + ID: generateTaskID("http://aa.bb.com", "", ""), + CdnStatus: types.TaskInfoCdnStatusRUNNING, + HTTPFileLength: 1000, + PieceSize: config.DefaultPieceSize, + PieceTotal: 1, + RawURL: "http://aa.bb.com?page=1", + TaskURL: "http://aa.bb.com", + Md5: "fooMD5", }, task: &types.TaskInfo{ ID: generateTaskID("http://aa.bb.com", "", ""), @@ -87,17 +87,22 @@ func (s *TaskUtilTestSuite) TestAddOrUpdateTask(c *check.C) { PieceTotal: 1, RawURL: "http://aa.bb.com", TaskURL: "http://aa.bb.com", + Md5: "fooMD5", }, - errNil: true, + result: true, }, { - req: &types.TaskCreateRequest{ - CID: "cid2", - CallSystem: "foo2", - Dfdaemon: false, - Path: "/peer/file/foo2", - RawURL: "http://aa.bb.com", - Headers: map[string]string{"aaa": "bbb"}, + + existTask: &types.TaskInfo{ + ID: generateTaskID("http://aa.bb.com", "", ""), + CdnStatus: types.TaskInfoCdnStatusWAITING, + HTTPFileLength: 1000, + PieceSize: config.DefaultPieceSize, + PieceTotal: 1, + RawURL: "http://aa.bb.com", + TaskURL: "http://aa.bb.com", + Headers: map[string]string{"aaa": "bbb"}, + Md5: "fooMD5", }, task: &types.TaskInfo{ ID: generateTaskID("http://aa.bb.com", "", ""), @@ -108,16 +113,14 @@ func (s *TaskUtilTestSuite) TestAddOrUpdateTask(c *check.C) { RawURL: "http://aa.bb.com", TaskURL: "http://aa.bb.com", Headers: map[string]string{"aaa": "bbb"}, + Md5: "otherMD5", }, - errNil: true, + result: false, }, } for _, v := range cases { - task, err := s.taskManager.addOrUpdateTask(context.Background(), v.req) - c.Check(util.IsNil(err), check.Equals, v.errNil) - taskInfo, err := s.taskManager.getTask(task.ID) - c.Check(err, check.IsNil) - c.Check(taskInfo, check.DeepEquals, v.task) + result := equalsTask(v.existTask, v.task) + c.Check(result, check.DeepEquals, v.result) } } diff --git a/supernode/server/result_info.go b/supernode/server/result_info.go index 65cc779de..37f4c63b5 100644 --- a/supernode/server/result_info.go +++ b/supernode/server/result_info.go @@ -34,6 +34,10 @@ func NewResultInfoWithError(err error) ResultInfo { return NewResultInfoWithCodeError(constants.CodePeerContinue, err) } + if errortypes.IsURLNotReachable(err) { + return NewResultInfoWithCodeError(constants.CodeURLNotReachable, err) + } + // IsConvertFailed return NewResultInfoWithCodeError(constants.CodeSystemError, err) }