Skip to content
This repository has been archived by the owner on Feb 27, 2023. It is now read-only.

Commit

Permalink
Merge pull request #739 from Starnop/supernode-not-found
Browse files Browse the repository at this point in the history
bugfix: return error when the url is unreachable
  • Loading branch information
lowzj authored Aug 2, 2019
2 parents 4b2a3fc + fc4099b commit 5fc9635
Show file tree
Hide file tree
Showing 8 changed files with 85 additions and 49 deletions.
3 changes: 2 additions & 1 deletion dfget/core/regist/register.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,8 @@ func (s *supernodeRegister) Register(peerPort int) (*RegisterResult, *errortypes
logrus.Errorf("register to node:%s error:%v", nodes[i], e)
continue
}
if resp.Code == constants.Success || resp.Code == constants.CodeNeedAuth {
if resp.Code == constants.Success || resp.Code == constants.CodeNeedAuth ||
resp.Code == constants.CodeURLNotReachable {
break
}
if resp.Code == constants.CodeWaitAuth && retryTimes < 3 {
Expand Down
2 changes: 1 addition & 1 deletion pkg/time/time.go → pkg/timeutils/time_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
* limitations under the License.
*/

package time
package timeutils

import (
"time"
Expand Down
7 changes: 7 additions & 0 deletions supernode/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"fmt"
"path/filepath"
"strings"
"time"

"gopkg.in/yaml.v2"

Expand Down Expand Up @@ -85,6 +86,7 @@ func NewBaseProperties() *BaseProperties {
MaxBandwidth: 200,
EnableProfiler: false,
Debug: false,
FailAccessInterval: 3,
}
}

Expand Down Expand Up @@ -167,6 +169,11 @@ type BaseProperties struct {
// By default, the first non-loop address is advertised.
AdvertiseIP string `yaml:"advertiseIP"`

// FailAccessInterval is the interval time after failed to access the URL.
// unit: minutes
// default: 3
FailAccessInterval time.Duration `yaml:"failAccessInterval"`

// cIDPrefix s a prefix string used to indicate that the CID is supernode.
cIDPrefix string

Expand Down
4 changes: 2 additions & 2 deletions supernode/daemon/mgr/cdn/cdn_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,10 @@ import (
"fmt"

"github.com/dragonflyoss/Dragonfly/apis/types"
"github.com/dragonflyoss/Dragonfly/pkg/time"
"github.com/dragonflyoss/Dragonfly/pkg/timeutils"
)

var getCurrentTimeMillisFunc = time.GetCurrentTimeMillis
var getCurrentTimeMillisFunc = timeutils.GetCurrentTimeMillis

// getContentLengthByHeader calculates the piece content length by piece header.
func getContentLengthByHeader(pieceHeader uint32) int32 {
Expand Down
36 changes: 20 additions & 16 deletions supernode/daemon/mgr/task/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,14 @@ package task

import (
"context"
"time"

"github.com/dragonflyoss/Dragonfly/apis/types"
"github.com/dragonflyoss/Dragonfly/pkg/errortypes"
"github.com/dragonflyoss/Dragonfly/pkg/httputils"
"github.com/dragonflyoss/Dragonfly/pkg/stringutils"
"github.com/dragonflyoss/Dragonfly/pkg/syncmap"
"github.com/dragonflyoss/Dragonfly/pkg/time"
"github.com/dragonflyoss/Dragonfly/pkg/timeutils"
"github.com/dragonflyoss/Dragonfly/supernode/config"
"github.com/dragonflyoss/Dragonfly/supernode/daemon/mgr"
dutil "github.com/dragonflyoss/Dragonfly/supernode/daemon/util"
Expand All @@ -32,9 +33,10 @@ var getContentLength = httputils.GetContentLength
type Manager struct {
cfg *config.Config

taskStore *dutil.Store
taskLocker *util.LockerPool
accessTimeMap *syncmap.SyncMap
taskStore *dutil.Store
taskLocker *util.LockerPool
accessTimeMap *syncmap.SyncMap
taskURLUnReachableStore *syncmap.SyncMap

peerMgr mgr.PeerMgr
dfgetTaskMgr mgr.DfgetTaskMgr
Expand All @@ -47,15 +49,16 @@ type Manager struct {
func NewManager(cfg *config.Config, peerMgr mgr.PeerMgr, dfgetTaskMgr mgr.DfgetTaskMgr,
progressMgr mgr.ProgressMgr, cdnMgr mgr.CDNMgr, schedulerMgr mgr.SchedulerMgr) (*Manager, error) {
return &Manager{
cfg: cfg,
taskStore: dutil.NewStore(),
taskLocker: util.NewLockerPool(),
peerMgr: peerMgr,
dfgetTaskMgr: dfgetTaskMgr,
progressMgr: progressMgr,
cdnMgr: cdnMgr,
schedulerMgr: schedulerMgr,
accessTimeMap: syncmap.NewSyncMap(),
cfg: cfg,
taskStore: dutil.NewStore(),
taskLocker: util.NewLockerPool(),
peerMgr: peerMgr,
dfgetTaskMgr: dfgetTaskMgr,
progressMgr: progressMgr,
cdnMgr: cdnMgr,
schedulerMgr: schedulerMgr,
accessTimeMap: syncmap.NewSyncMap(),
taskURLUnReachableStore: syncmap.NewSyncMap(),
}, nil
}

Expand All @@ -67,7 +70,8 @@ func (tm *Manager) Register(ctx context.Context, req *types.TaskCreateRequest) (
}

// Step2: add a new Task or update the exist task
task, err := tm.addOrUpdateTask(ctx, req)
failAccessInterval := tm.cfg.FailAccessInterval * time.Minute
task, err := tm.addOrUpdateTask(ctx, req, failAccessInterval)
if err != nil {
logrus.Infof("failed to add or update task with req %+v: %v", req, err)
return nil, err
Expand All @@ -76,7 +80,7 @@ func (tm *Manager) Register(ctx context.Context, req *types.TaskCreateRequest) (
// TODO: defer rollback the task update

// update accessTime for taskID
if err := tm.accessTimeMap.Add(task.ID, time.GetCurrentTimeMillis()); err != nil {
if err := tm.accessTimeMap.Add(task.ID, timeutils.GetCurrentTimeMillis()); err != nil {
logrus.Warnf("failed to update accessTime for taskID(%s): %v", task.ID, err)
}

Expand Down Expand Up @@ -181,7 +185,7 @@ func (tm *Manager) GetPieces(ctx context.Context, taskID, clientID string, req *
logrus.Debugf("success to get task: %+v", task)

// update accessTime for taskID
if err := tm.accessTimeMap.Add(task.ID, time.GetCurrentTimeMillis()); err != nil {
if err := tm.accessTimeMap.Add(task.ID, timeutils.GetCurrentTimeMillis()); err != nil {
logrus.Warnf("failed to update accessTime for taskID(%s): %v", task.ID, err)
}

Expand Down
21 changes: 19 additions & 2 deletions supernode/daemon/mgr/task/manager_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"context"
"fmt"
"net/http"
"time"

"github.com/dragonflyoss/Dragonfly/apis/types"
"github.com/dragonflyoss/Dragonfly/pkg/digest"
Expand All @@ -35,13 +36,22 @@ import (
)

// addOrUpdateTask adds a new task or update the exist task to taskStore.
func (tm *Manager) addOrUpdateTask(ctx context.Context, req *types.TaskCreateRequest) (*types.TaskInfo, error) {
func (tm *Manager) addOrUpdateTask(ctx context.Context, req *types.TaskCreateRequest, failAccessInterval time.Duration) (*types.TaskInfo, error) {
taskURL := req.TaskURL
if stringutils.IsEmptyStr(req.TaskURL) {
taskURL = netutils.FilterURLParam(req.RawURL, req.Filter)
}
taskID := generateTaskID(taskURL, req.Md5, req.Identifier)

if key, err := tm.taskURLUnReachableStore.Get(taskID); err == nil {
if unReachableStartTime, ok := key.(time.Time); ok &&
time.Since(unReachableStartTime) < failAccessInterval {
return nil, errors.Wrapf(errortypes.ErrURLNotReachable, "cache taskID: %s, url: %s", taskID, req.RawURL)
}

tm.taskURLUnReachableStore.Delete(taskID)
}

// using the existing task if it already exists corresponding to taskID
var task *types.TaskInfo
newTask := &types.TaskInfo{
Expand Down Expand Up @@ -74,7 +84,11 @@ func (tm *Manager) addOrUpdateTask(ctx context.Context, req *types.TaskCreateReq
// get fileLength with req.Headers
fileLength, err := getHTTPFileLength(taskID, task.RawURL, req.Headers)
if err != nil {
if errortypes.IsURLNotReachable(err) {
tm.taskURLUnReachableStore.Add(taskID, time.Now())
}
logrus.Errorf("failed to get file length from http client for taskID(%s): %v", taskID, err)
return nil, err
}
task.HTTPFileLength = fileLength
logrus.Infof("get file length %d from http client for taskID(%s)", fileLength, taskID)
Expand Down Expand Up @@ -485,14 +499,17 @@ func isWait(CDNStatus string) bool {
func getHTTPFileLength(taskID, url string, headers map[string]string) (int64, error) {
fileLength, code, err := getContentLength(url, headers)
if err != nil {
return -1, err
return -1, errors.Wrapf(errortypes.ErrUnknowError, "failed to get http file Length: %v", err)
}

if code == http.StatusUnauthorized || code == http.StatusProxyAuthRequired {
return -1, errors.Wrapf(errortypes.ErrAuthenticationRequired, "taskID: %s,code: %d", taskID, code)
}
if code != http.StatusOK {
logrus.Warnf("failed to get http file length with unexpected code: %d", code)
if code == http.StatusNotFound {
return -1, errors.Wrapf(errortypes.ErrURLNotReachable, "taskID: %s, url: %s", taskID, url)
}
return -1, nil
}

Expand Down
57 changes: 30 additions & 27 deletions supernode/daemon/mgr/task/manager_util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,7 @@
package task

import (
"context"

"github.com/dragonflyoss/Dragonfly/apis/types"
"github.com/dragonflyoss/Dragonfly/pkg/util"
"github.com/dragonflyoss/Dragonfly/supernode/config"
"github.com/dragonflyoss/Dragonfly/supernode/daemon/mgr/mock"

Expand Down Expand Up @@ -65,19 +62,22 @@ func (s *TaskUtilTestSuite) TearDownSuite(c *check.C) {
s.contentLengthStub.Reset()
}

func (s *TaskUtilTestSuite) TestAddOrUpdateTask(c *check.C) {
func (s *TaskUtilTestSuite) TestEqualsTask(c *check.C) {
var cases = []struct {
req *types.TaskCreateRequest
task *types.TaskInfo
errNil bool
existTask *types.TaskInfo
task *types.TaskInfo
result bool
}{
{
req: &types.TaskCreateRequest{
CID: "cid",
CallSystem: "foo",
Dfdaemon: true,
Path: "/peer/file/foo",
RawURL: "http://aa.bb.com",
existTask: &types.TaskInfo{
ID: generateTaskID("http://aa.bb.com", "", ""),
CdnStatus: types.TaskInfoCdnStatusRUNNING,
HTTPFileLength: 1000,
PieceSize: config.DefaultPieceSize,
PieceTotal: 1,
RawURL: "http://aa.bb.com?page=1",
TaskURL: "http://aa.bb.com",
Md5: "fooMD5",
},
task: &types.TaskInfo{
ID: generateTaskID("http://aa.bb.com", "", ""),
Expand All @@ -87,17 +87,22 @@ func (s *TaskUtilTestSuite) TestAddOrUpdateTask(c *check.C) {
PieceTotal: 1,
RawURL: "http://aa.bb.com",
TaskURL: "http://aa.bb.com",
Md5: "fooMD5",
},
errNil: true,
result: true,
},
{
req: &types.TaskCreateRequest{
CID: "cid2",
CallSystem: "foo2",
Dfdaemon: false,
Path: "/peer/file/foo2",
RawURL: "http://aa.bb.com",
Headers: map[string]string{"aaa": "bbb"},

existTask: &types.TaskInfo{
ID: generateTaskID("http://aa.bb.com", "", ""),
CdnStatus: types.TaskInfoCdnStatusWAITING,
HTTPFileLength: 1000,
PieceSize: config.DefaultPieceSize,
PieceTotal: 1,
RawURL: "http://aa.bb.com",
TaskURL: "http://aa.bb.com",
Headers: map[string]string{"aaa": "bbb"},
Md5: "fooMD5",
},
task: &types.TaskInfo{
ID: generateTaskID("http://aa.bb.com", "", ""),
Expand All @@ -108,16 +113,14 @@ func (s *TaskUtilTestSuite) TestAddOrUpdateTask(c *check.C) {
RawURL: "http://aa.bb.com",
TaskURL: "http://aa.bb.com",
Headers: map[string]string{"aaa": "bbb"},
Md5: "otherMD5",
},
errNil: true,
result: false,
},
}

for _, v := range cases {
task, err := s.taskManager.addOrUpdateTask(context.Background(), v.req)
c.Check(util.IsNil(err), check.Equals, v.errNil)
taskInfo, err := s.taskManager.getTask(task.ID)
c.Check(err, check.IsNil)
c.Check(taskInfo, check.DeepEquals, v.task)
result := equalsTask(v.existTask, v.task)
c.Check(result, check.DeepEquals, v.result)
}
}
4 changes: 4 additions & 0 deletions supernode/server/result_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,10 @@ func NewResultInfoWithError(err error) ResultInfo {
return NewResultInfoWithCodeError(constants.CodePeerContinue, err)
}

if errortypes.IsURLNotReachable(err) {
return NewResultInfoWithCodeError(constants.CodeURLNotReachable, err)
}

// IsConvertFailed
return NewResultInfoWithCodeError(constants.CodeSystemError, err)
}
Expand Down

0 comments on commit 5fc9635

Please sign in to comment.