Skip to content

Commit

Permalink
fix: send piece result error not handled (dragonflyoss#987)
Browse files Browse the repository at this point in the history
* fix: send piece result error not handled
* chore: optimize daemon cancel logic
* chore: remove redundant log and ctx.Done check
* chore: handle piece download with 404 status

Signed-off-by: Jim Ma <[email protected]>
  • Loading branch information
jim3ma authored Jan 13, 2022
1 parent c8205ed commit 0b1b803
Show file tree
Hide file tree
Showing 8 changed files with 184 additions and 127 deletions.
246 changes: 135 additions & 111 deletions client/daemon/peer/peertask_conductor.go

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion client/daemon/peer/peertask_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ type Task interface {
GetPieceMd5Sign() string

PublishPieceInfo(pieceNum int32, size uint32)
ReportPieceResult(request *DownloadPieceRequest, result *DownloadPieceResult, success bool)
ReportPieceResult(request *DownloadPieceRequest, result *DownloadPieceResult, err error)
}

type Logger interface {
Expand Down
8 changes: 4 additions & 4 deletions client/daemon/peer/peertask_manager_mock_test.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

33 changes: 31 additions & 2 deletions client/daemon/peer/piece_downloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,35 @@ type pieceDownloader struct {
httpClient *http.Client
}

type pieceDownloadError struct {
connectionError bool
status string
statusCode int
target string
err error
}

func isConnectionError(err error) bool {
if e, ok := err.(*pieceDownloadError); ok {
return e.connectionError
}
return false
}

func isPieceNotFound(err error) bool {
if e, ok := err.(*pieceDownloadError); ok {
return e.statusCode == http.StatusNotFound
}
return false
}

func (e *pieceDownloadError) Error() string {
if e.connectionError {
return fmt.Sprintf("connect with %s with error: %s", e.target, e.err)
}
return fmt.Sprintf("download %s with error status: %s", e.target, e.status)
}

var _ PieceDownloader = (*pieceDownloader)(nil)

var defaultTransport http.RoundTripper = &http.Transport{
Expand Down Expand Up @@ -107,12 +136,12 @@ func (p *pieceDownloader) DownloadPiece(ctx context.Context, d *DownloadPieceReq
if err != nil {
logger.Errorf("task id: %s, piece num: %d, dst: %s, download piece failed: %s",
d.TaskID, d.piece.PieceNum, d.DstAddr, err)
return nil, nil, err
return nil, nil, &pieceDownloadError{err: err, connectionError: true}
}
if resp.StatusCode > 299 {
_, _ = io.Copy(io.Discard, resp.Body)
_ = resp.Body.Close()
return nil, nil, fmt.Errorf("download piece failed with http code: %s", resp.Status)
return nil, nil, &pieceDownloadError{err: err, connectionError: false, status: resp.Status, statusCode: resp.StatusCode}
}
r := resp.Body.(io.Reader)
c := resp.Body.(io.Closer)
Expand Down
16 changes: 8 additions & 8 deletions client/daemon/peer/piece_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -392,13 +392,13 @@ func (pm *pieceManager) downloadKnownLengthSource(ctx context.Context, pt Task,
}
if err != nil {
log.Errorf("download piece %d error: %s", pieceNum, err)
pt.ReportPieceResult(request, result, false)
pt.ReportPieceResult(request, result, err)
return err
}

if result.Size != int64(size) {
log.Errorf("download piece %d size not match, desired: %d, actual: %d", pieceNum, size, result.Size)
pt.ReportPieceResult(request, result, false)
pt.ReportPieceResult(request, result, err)
return storage.ErrShortRead
}

Expand All @@ -415,12 +415,12 @@ func (pm *pieceManager) downloadKnownLengthSource(ctx context.Context, pt Task,
})
if err != nil {
log.Errorf("update task failed %s", err)
pt.ReportPieceResult(request, result, false)
pt.ReportPieceResult(request, result, err)
return err
}
}
pt.PublishPieceInfo(pieceNum, uint32(result.Size))
pt.ReportPieceResult(request, result, true)
pt.ReportPieceResult(request, result, nil)
}

log.Infof("download from source ok")
Expand Down Expand Up @@ -452,7 +452,7 @@ func (pm *pieceManager) downloadUnknownLengthSource(ctx context.Context, pt Task
},
}
if err != nil {
pt.ReportPieceResult(request, result, false)
pt.ReportPieceResult(request, result, err)
log.Errorf("download piece %d error: %s", pieceNum, err)
return err
}
Expand All @@ -462,7 +462,7 @@ func (pm *pieceManager) downloadUnknownLengthSource(ctx context.Context, pt Task
} else if result.Size > int64(size) {
err = fmt.Errorf("piece %d size %d should not great than %d", pieceNum, result.Size, size)
log.Errorf(err.Error())
pt.ReportPieceResult(request, result, false)
pt.ReportPieceResult(request, result, err)
return err
}

Expand All @@ -480,11 +480,11 @@ func (pm *pieceManager) downloadUnknownLengthSource(ctx context.Context, pt Task
})
if err != nil {
log.Errorf("update task failed %s", err)
pt.ReportPieceResult(request, result, false)
pt.ReportPieceResult(request, result, err)
return err
}
pt.PublishPieceInfo(pieceNum, uint32(result.Size))
pt.ReportPieceResult(request, result, true)
pt.ReportPieceResult(request, result, nil)
break
}

Expand Down
2 changes: 1 addition & 1 deletion client/daemon/peer/piece_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ func TestPieceManager_DownloadSource(t *testing.T) {
})
mockPeerTask.EXPECT().AddTraffic(gomock.Any()).AnyTimes().DoAndReturn(func(int642 uint64) {})
mockPeerTask.EXPECT().ReportPieceResult(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes().DoAndReturn(
func(request *DownloadPieceRequest, result *DownloadPieceResult, success bool) {
func(*DownloadPieceRequest, *DownloadPieceResult, error) {

})
mockPeerTask.EXPECT().PublishPieceInfo(gomock.Any(), gomock.Any()).AnyTimes().DoAndReturn(
Expand Down
2 changes: 2 additions & 0 deletions pkg/rpc/base/base.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions pkg/rpc/base/base.proto
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ enum Code{
ClientWaitPieceReady = 4004; // when target peer downloads from source slowly, should wait
ClientPieceDownloadFail = 4005;
ClientRequestLimitFail = 4006;
ClientConnectionError = 4007;
ClientPieceNotFound = 4404;

// scheduler response error 5000-5999
SchedError = 5000;
Expand Down

0 comments on commit 0b1b803

Please sign in to comment.