fix: infinitely get pieces when piece num is invalid (dragonflyoss#926)
Signed-off-by: Jim Ma <[email protected]>
jim3ma authored Dec 15, 2021
1 parent 464da9f commit 1d8b63c
Showing 13 changed files with 147 additions and 127 deletions.
1 change: 1 addition & 0 deletions client/config/peerhost.go
@@ -173,6 +173,7 @@ type DownloadOption struct {
PeerGRPC ListenOption `mapstructure:"peerGRPC" yaml:"peerGRPC"`
CalculateDigest bool `mapstructure:"calculateDigest" yaml:"calculateDigest"`
TransportOption *TransportOption `mapstructure:"transportOption" yaml:"transportOption"`
GetPiecesMaxRetry int `mapstructure:"getPiecesMaxRetry" yaml:"getPiecesMaxRetry"`
}

type TransportOption struct {
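The new field is wired through the same tag-based config loading as its neighbours: a getPiecesMaxRetry key in the daemon's download section maps onto DownloadOption.GetPiecesMaxRetry via the yaml/mapstructure tags above. A minimal decoding sketch, using a trimmed stand-in struct and gopkg.in/yaml.v3 rather than the repository's real loader:

package main

import (
	"fmt"

	"gopkg.in/yaml.v3"
)

// downloadOption is a trimmed stand-in for config.DownloadOption above;
// only the field added by this commit is kept.
type downloadOption struct {
	GetPiecesMaxRetry int `yaml:"getPiecesMaxRetry"`
}

func main() {
	// Hypothetical fragment of the daemon download config; the key name
	// comes from the yaml tag shown in the diff.
	raw := []byte("getPiecesMaxRetry: 100\n")

	var opt downloadOption
	if err := yaml.Unmarshal(raw, &opt); err != nil {
		panic(err)
	}
	fmt.Println("getPiecesMaxRetry =", opt.GetPiecesMaxRetry) // 100
}

The darwin and linux defaults below set the same value, 100, out of the box.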
1 change: 1 addition & 0 deletions client/config/peerhost_darwin.go
@@ -60,6 +60,7 @@ var peerHostConfig = DaemonOption{
Download: DownloadOption{
CalculateDigest: true,
PieceDownloadTimeout: 30 * time.Second,
GetPiecesMaxRetry: 100,
TotalRateLimit: clientutil.RateLimit{
Limit: rate.Limit(DefaultTotalDownloadLimit),
},
1 change: 1 addition & 0 deletions client/config/peerhost_linux.go
@@ -60,6 +60,7 @@ var peerHostConfig = DaemonOption{
Download: DownloadOption{
CalculateDigest: true,
PieceDownloadTimeout: 30 * time.Second,
GetPiecesMaxRetry: 100,
TotalRateLimit: clientutil.RateLimit{
Limit: rate.Limit(DefaultTotalDownloadLimit),
},
2 changes: 1 addition & 1 deletion client/daemon/daemon.go
@@ -168,7 +168,7 @@ func New(opt *config.DaemonOption, d dfpath.Dfpath) (Daemon, error) {
return nil, err
}
peerTaskManager, err := peer.NewPeerTaskManager(host, pieceManager, storageManager, sched, opt.Scheduler,
opt.Download.PerPeerRateLimit.Limit, opt.Storage.Multiplex, opt.Download.CalculateDigest)
opt.Download.PerPeerRateLimit.Limit, opt.Storage.Multiplex, opt.Download.CalculateDigest, opt.Download.GetPiecesMaxRetry)
if err != nil {
return nil, err
}
118 changes: 78 additions & 40 deletions client/daemon/peer/peertask_base.go
@@ -103,6 +103,8 @@ type peerTask struct {
peerPacketReady chan bool
// pieceParallelCount stands the piece parallel count from peerPacket
pieceParallelCount *atomic.Int32
// getPiecesMaxRetry stands for the max retry count to get pieces from one peer packet
getPiecesMaxRetry int

// done channel will be close when peer task is finished
done chan struct{}
@@ -502,8 +504,8 @@ loop:

func (pt *peerTask) init(piecePacket *base.PiecePacket, pieceBufferSize uint32) (chan *DownloadPieceRequest, bool) {
pt.contentLength.Store(piecePacket.ContentLength)
if pt.contentLength.Load() > 0 {
pt.span.SetAttributes(config.AttributeTaskContentLength.Int64(pt.contentLength.Load()))
if piecePacket.ContentLength > 0 {
pt.span.SetAttributes(config.AttributeTaskContentLength.Int64(piecePacket.ContentLength))
}
if err := pt.callback.Init(pt); err != nil {
pt.span.RecordError(err)
@@ -579,7 +581,7 @@ func (pt *peerTask) waitAvailablePeerPacket() (int32, bool) {
// preparePieceTasksByPeer func already sends piece result with error
pt.Infof("new peer client ready, main peer: %s", pt.peerPacket.Load().(*scheduler.PeerPacket).MainPeer)
// re-search from piece 0
return pt.getNextPieceNum(0), true
return 0, true
}
// when scheduler says base.Code_SchedNeedBackSource, receivePeerPacket will close pt.peerPacketReady
pt.Infof("start download from source due to base.Code_SchedNeedBackSource")
@@ -605,6 +607,7 @@ func (pt *peerTask) waitAvailablePeerPacket() (int32, bool) {
}

func (pt *peerTask) dispatchPieceRequest(pieceRequestCh chan *DownloadPieceRequest, piecePacket *base.PiecePacket) {
pt.Debugf("dispatch piece request, piece count: %d", len(piecePacket.PieceInfos))
for _, piece := range piecePacket.PieceInfos {
pt.Infof("get piece %d from %s/%s, md5: %s, start: %d, size: %d",
piece.PieceNum, piecePacket.DstAddr, piecePacket.DstPid, piece.PieceMd5, piece.RangeStart, piece.RangeSize)
@@ -721,7 +724,9 @@ func (pt *peerTask) isCompleted() bool {

func (pt *peerTask) preparePieceTasks(request *base.PieceTaskRequest) (p *base.PiecePacket, err error) {
defer pt.recoverFromPanic()
var retryCount int
prepare:
retryCount++
peerPacket := pt.peerPacket.Load().(*scheduler.PeerPacket)
pt.pieceParallelCount.Store(peerPacket.ParallelCount)
request.DstPid = peerPacket.MainPeer.PeerId
@@ -730,6 +735,10 @@ prepare:
return
}
if err == errPeerPacketChanged {
if pt.getPiecesMaxRetry > 0 && retryCount > pt.getPiecesMaxRetry {
err = fmt.Errorf("get pieces max retry count reached")
return
}
goto prepare
}
for _, peer := range peerPacket.StealPeers {
@@ -739,6 +748,10 @@ prepare:
return
}
if err == errPeerPacketChanged {
if pt.getPiecesMaxRetry > 0 && retryCount > pt.getPiecesMaxRetry {
err = fmt.Errorf("get pieces max retry count reached")
return
}
goto prepare
}
}
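The bounded retry added here keeps the goto-based prepare loop but counts attempts and aborts once the configured cap is exceeded; a cap of zero or less preserves the previous unbounded behavior. Below is a condensed, self-contained sketch of just that counting pattern — getPieces stands in for preparePieceTasksByPeer, and only the retryCount/maxRetry logic mirrors the diff:

package main

import (
	"errors"
	"fmt"
)

// errPeerPacketChanged stands in for the sentinel error of the same name
// in peertask_base.go.
var errPeerPacketChanged = errors.New("peer packet changed")

// getPieces is a placeholder for preparePieceTasksByPeer; here it always
// reports a changed peer packet so the retry cap is exercised.
func getPieces() error { return errPeerPacketChanged }

// preparePieces mirrors the counting logic of the diff: retry while the peer
// packet keeps changing, but give up once retryCount exceeds maxRetry.
// A maxRetry of zero or less keeps the previous unbounded behavior.
func preparePieces(maxRetry int) error {
	var retryCount int
prepare:
	retryCount++
	err := getPieces()
	if err == nil {
		return nil
	}
	if err == errPeerPacketChanged {
		if maxRetry > 0 && retryCount > maxRetry {
			return fmt.Errorf("get pieces max retry count reached")
		}
		goto prepare
	}
	return err
}

func main() {
	// With a cap of 3, the fourth failed attempt aborts the loop.
	fmt.Println(preparePieces(3)) // get pieces max retry count reached
}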
@@ -816,51 +829,75 @@ func (pt *peerTask) getPieceTasks(span trace.Span, curPeerPacket *scheduler.Peer
count int
)
p, _, err := retry.Run(pt.ctx, func() (interface{}, bool, error) {
pp, getErr := dfclient.GetPieceTasks(pt.ctx, peer, request)
piecePacket, getError := dfclient.GetPieceTasks(pt.ctx, peer, request)
// when GetPieceTasks returns err, exit retry
if getErr != nil {
span.RecordError(getErr)
// fast way to exit retry
lastPeerPacket := pt.peerPacket.Load().(*scheduler.PeerPacket)
if curPeerPacket.MainPeer.PeerId != lastPeerPacket.MainPeer.PeerId {
pt.Warnf("get piece tasks with error: %s, but peer packet changed, switch to new peer packet, current destPeer %s, new destPeer %s", getErr,
curPeerPacket.MainPeer.PeerId, lastPeerPacket.MainPeer.PeerId)
peerPacketChanged = true
return nil, true, nil
}
return nil, true, getErr
}
// by santong: when peer return empty, retry later
if len(pp.PieceInfos) == 0 {
count++
er := pt.peerPacketStream.Send(&scheduler.PieceResult{
TaskId: pt.taskID,
SrcPid: pt.peerID,
DstPid: peer.PeerId,
PieceInfo: &base.PieceInfo{},
Success: false,
Code: base.Code_ClientWaitPieceReady,
HostLoad: nil,
FinishedCount: pt.readyPieces.Settled(),
})
if er != nil {
span.RecordError(er)
pt.Errorf("send piece result with base.Code_ClientWaitPieceReady error: %s", er)
if getError != nil {
pt.Errorf("get piece tasks with error: %s", getError)
span.RecordError(getError)

// fast way 1 to exit retry
if de, ok := getError.(*dferrors.DfError); ok {
pt.Debugf("get piece task with grpc error, code: %d", de.Code)
// bad request, like invalid piece num, just exit
if de.Code == base.Code_BadRequest {
span.AddEvent("bad request")
pt.Warnf("get piece task from peer %s canceled: %s", peer.PeerId, getError)
return nil, true, getError
}
}
// fast way to exit retry

// fast way 2 to exit retry
lastPeerPacket := pt.peerPacket.Load().(*scheduler.PeerPacket)
if curPeerPacket.MainPeer.PeerId != lastPeerPacket.MainPeer.PeerId {
pt.Warnf("get empty pieces and peer packet changed, switch to new peer packet, current destPeer %s, new destPeer %s",
pt.Warnf("get piece tasks with error: %s, but peer packet changed, switch to new peer packet, current destPeer %s, new destPeer %s", getError,
curPeerPacket.MainPeer.PeerId, lastPeerPacket.MainPeer.PeerId)
peerPacketChanged = true
return nil, true, nil
}
span.AddEvent("retry due to empty pieces",
trace.WithAttributes(config.AttributeGetPieceRetry.Int(count)))
pt.Infof("peer %s returns success but with empty pieces, retry later", peer.PeerId)
return nil, false, dferrors.ErrEmptyValue
return nil, true, getError
}
return pp, false, nil
// got any pieces
if len(piecePacket.PieceInfos) > 0 {
return piecePacket, false, nil
}
// need update metadata
if piecePacket.ContentLength > pt.contentLength.Load() || piecePacket.TotalPiece > pt.totalPiece {
return piecePacket, false, nil
}
// invalid request num
if piecePacket.TotalPiece > -1 && uint32(piecePacket.TotalPiece) <= request.StartNum {
pt.Warnf("invalid start num: %d, total piece: %d", request.StartNum, piecePacket.TotalPiece)
return piecePacket, false, nil
}

// by santong: when peer return empty, retry later
sendError := pt.peerPacketStream.Send(&scheduler.PieceResult{
TaskId: pt.taskID,
SrcPid: pt.peerID,
DstPid: peer.PeerId,
PieceInfo: &base.PieceInfo{},
Success: false,
Code: base.Code_ClientWaitPieceReady,
HostLoad: nil,
FinishedCount: pt.readyPieces.Settled(),
})
if sendError != nil {
span.RecordError(sendError)
pt.Errorf("send piece result with base.Code_ClientWaitPieceReady error: %s", sendError)
}
// fast way to exit retry
lastPeerPacket := pt.peerPacket.Load().(*scheduler.PeerPacket)
if curPeerPacket.MainPeer.PeerId != lastPeerPacket.MainPeer.PeerId {
pt.Warnf("get empty pieces and peer packet changed, switch to new peer packet, current destPeer %s, new destPeer %s",
curPeerPacket.MainPeer.PeerId, lastPeerPacket.MainPeer.PeerId)
peerPacketChanged = true
return nil, true, nil
}
count++
span.AddEvent("retry due to empty pieces",
trace.WithAttributes(config.AttributeGetPieceRetry.Int(count)))
pt.Infof("peer %s returns success but with empty pieces, retry later", peer.PeerId)
return nil, false, dferrors.ErrEmptyValue
}, 0.05, 0.2, 40, nil)
if peerPacketChanged {
return nil, errPeerPacketChanged
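The reworked retry callback above now orders its exits explicitly: a dferrors.DfError carrying base.Code_BadRequest (for example, an invalid piece num) leaves the retry immediately, a packet that brings pieces, newer metadata (larger ContentLength or TotalPiece), or proof that the requested StartNum is already past the end is returned as-is, and only a genuinely empty, unchanged answer falls through to the Code_ClientWaitPieceReady wait-and-retry path. A simplified decision-order sketch with stand-in types — the real base, dferrors, and scheduler packages are not reproduced, the peer-packet-changed branch is omitted, and the bad-request code value is a placeholder:

package main

import "fmt"

// dfError is a stand-in for dferrors.DfError.
type dfError struct{ Code int }

func (e *dfError) Error() string { return fmt.Sprintf("dferror code %d", e.Code) }

// codeBadRequest is a placeholder value, not the real base.Code_BadRequest.
const codeBadRequest = 400

// piecePacket is a stand-in for base.PiecePacket with just the fields used here.
type piecePacket struct {
	PieceInfos    []int
	ContentLength int64
	TotalPiece    int32
}

// classify mirrors the ordering of checks inside the retry callback: exit on
// bad request or transport error, keep packets that carry pieces, newer
// metadata, or an out-of-range start num, and otherwise retry later.
func classify(pp *piecePacket, getErr error, knownLength int64, knownTotal int32, startNum uint32) string {
	if getErr != nil {
		if de, ok := getErr.(*dfError); ok && de.Code == codeBadRequest {
			return "exit retry: bad request (e.g. invalid piece num)"
		}
		return "exit retry: transport error"
	}
	if len(pp.PieceInfos) > 0 {
		return "use packet: got pieces"
	}
	if pp.ContentLength > knownLength || pp.TotalPiece > knownTotal {
		return "use packet: metadata update"
	}
	if pp.TotalPiece > -1 && uint32(pp.TotalPiece) <= startNum {
		return "use packet: start num beyond total piece"
	}
	return "retry later: empty pieces"
}

func main() {
	fmt.Println(classify(nil, &dfError{Code: codeBadRequest}, -1, -1, 8)) // exit retry: bad request
	fmt.Println(classify(&piecePacket{TotalPiece: 4}, nil, 0, 4, 8))      // use packet: start num beyond total piece
	fmt.Println(classify(&piecePacket{TotalPiece: -1}, nil, 0, -1, 0))    // retry later: empty pieces
}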
@@ -877,13 +914,14 @@ func (pt *peerTask) getNextPieceNum(cur int32) int32 {
return -1
}
i := cur
// try to find next not requested piece
for ; pt.requestedPieces.IsSet(i); i++ {
}
if pt.totalPiece > 0 && i >= pt.totalPiece {
// double check, re-search not success or not requested pieces
for i = int32(0); pt.requestedPieces.IsSet(i); i++ {
}
if pt.totalPiece > 0 && i >= int32(pt.totalPiece) {
if pt.totalPiece > 0 && i >= pt.totalPiece {
return -1
}
}
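At the end of this file, getNextPieceNum scans forward from cur for the next unrequested piece and, when that runs past totalPiece, re-scans from piece 0 before reporting that nothing is left. A compact stand-in that swaps the requestedPieces bitmap for a plain map, for illustration only:

package main

import "fmt"

// nextPieceNum is a simplified stand-in for peerTask.getNextPieceNum:
// requested marks pieces already dispatched; totalPiece < 0 means unknown.
func nextPieceNum(cur, totalPiece int32, requested map[int32]bool) int32 {
	i := cur
	// try to find the next not-yet-requested piece
	for ; requested[i]; i++ {
	}
	if totalPiece > 0 && i >= totalPiece {
		// double check: re-scan from 0 for pieces that were never requested
		for i = 0; requested[i]; i++ {
		}
		if totalPiece > 0 && i >= totalPiece {
			return -1
		}
	}
	return i
}

func main() {
	requested := map[int32]bool{0: true, 1: true, 3: true}
	fmt.Println(nextPieceNum(2, 4, requested)) // 2: next free piece at cur
	fmt.Println(nextPieceNum(4, 4, requested)) // 2: found by the re-scan from 0
	requested[2] = true
	fmt.Println(nextPieceNum(4, 4, requested)) // -1: every piece already requested
}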
4 changes: 3 additions & 1 deletion client/daemon/peer/peertask_file.go
@@ -86,7 +86,8 @@ func newFilePeerTask(ctx context.Context,
request *FilePeerTaskRequest,
schedulerClient schedulerclient.SchedulerClient,
schedulerOption config.SchedulerOption,
perPeerRateLimit rate.Limit) (context.Context, *filePeerTask, *TinyData, error) {
perPeerRateLimit rate.Limit,
getPiecesMaxRetry int) (context.Context, *filePeerTask, *TinyData, error) {
ctx, span := tracer.Start(ctx, config.SpanFilePeerTask, trace.WithSpanKind(trace.SpanKindClient))
span.SetAttributes(config.AttributePeerHost.String(host.Uuid))
span.SetAttributes(semconv.NetHostIPKey.String(host.Ip))
@@ -196,6 +197,7 @@ func newFilePeerTask(ctx context.Context,
contentLength: atomic.NewInt64(-1),
pieceParallelCount: atomic.NewInt32(0),
totalPiece: -1,
getPiecesMaxRetry: getPiecesMaxRetry,
schedulerOption: schedulerOption,
schedulerClient: schedulerClient,
limiter: limiter,
4 changes: 2 additions & 2 deletions client/daemon/peer/peertask_file_test.go
@@ -136,7 +136,7 @@ func TestFilePeerTask_BackSource_WithContentLength(t *testing.T) {
req,
ptm.schedulerClient,
ptm.schedulerOption,
0)
0, 10)
assert.Nil(err, "new file peer task")
pt.needBackSource = true

@@ -261,7 +261,7 @@ func TestFilePeerTask_BackSource_WithoutContentLength(t *testing.T) {
req,
ptm.schedulerClient,
ptm.schedulerOption,
0)
0, 10)
assert.Nil(err, "new file peer task")
pt.needBackSource = true

29 changes: 16 additions & 13 deletions client/daemon/peer/peertask_manager.go
@@ -117,6 +117,8 @@ type peerTaskManager struct {
enableMultiplex bool

calculateDigest bool

getPiecesMaxRetry int
}

func NewPeerTaskManager(
@@ -127,18 +129,20 @@
schedulerOption config.SchedulerOption,
perPeerRateLimit rate.Limit,
multiplex bool,
calculateDigest bool) (TaskManager, error) {
calculateDigest bool,
getPiecesMaxRetry int) (TaskManager, error) {

ptm := &peerTaskManager{
host: host,
runningPeerTasks: sync.Map{},
pieceManager: pieceManager,
storageManager: storageManager,
schedulerClient: schedulerClient,
schedulerOption: schedulerOption,
perPeerRateLimit: perPeerRateLimit,
enableMultiplex: multiplex,
calculateDigest: calculateDigest,
host: host,
runningPeerTasks: sync.Map{},
pieceManager: pieceManager,
storageManager: storageManager,
schedulerClient: schedulerClient,
schedulerOption: schedulerOption,
perPeerRateLimit: perPeerRateLimit,
enableMultiplex: multiplex,
calculateDigest: calculateDigest,
getPiecesMaxRetry: getPiecesMaxRetry,
}
return ptm, nil
}
@@ -159,7 +163,7 @@ func (ptm *peerTaskManager) StartFilePeerTask(ctx context.Context, req *FilePeer
limit = rate.Limit(req.Limit)
}
ctx, pt, tiny, err := newFilePeerTask(ctx, ptm.host, ptm.pieceManager,
req, ptm.schedulerClient, ptm.schedulerOption, limit)
req, ptm.schedulerClient, ptm.schedulerOption, limit, ptm.getPiecesMaxRetry)
if err != nil {
return nil, nil, err
}
@@ -217,8 +221,7 @@ func (ptm *peerTaskManager) StartStreamPeerTask(ctx context.Context, req *schedu
}

start := time.Now()
ctx, pt, tiny, err := newStreamPeerTask(ctx, ptm.host, ptm.pieceManager,
req, ptm.schedulerClient, ptm.schedulerOption, ptm.perPeerRateLimit)
ctx, pt, tiny, err := newStreamPeerTask(ctx, ptm, req)
if err != nil {
return nil, nil, err
}