Feature: prefetch ranged requests #1053

Merged: 10 commits, Feb 10, 2022
4 changes: 4 additions & 0 deletions client/clientutil/range.go
@@ -29,6 +29,10 @@ type Range struct {
Start, Length int64
}

func (r Range) String() string {
return fmt.Sprintf("bytes=%d-%d", r.Start, r.Start+r.Length-1)
}

// ErrNoOverlap is returned by ParseRange if first-byte-pos of
// all of the byte-range-spec values is greater than the content size.
var ErrNoOverlap = errors.New("invalid range: failed to overlap")
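A minimal usage sketch for the new String() method: it renders a Range as an HTTP Range header value covering Start through Start+Length-1. The request below is illustrative; only clientutil.Range comes from this diff.

package main

import (
	"fmt"
	"net/http"

	"d7y.io/dragonfly/v2/client/clientutil"
)

func main() {
	// Request the first 1 KiB of an object; Length is a byte count,
	// so the header spans Start through Start+Length-1.
	rg := clientutil.Range{Start: 0, Length: 1024}

	req, _ := http.NewRequest(http.MethodGet, "http://example.com/blob", nil)
	req.Header.Set("Range", rg.String())
	fmt.Println(req.Header.Get("Range")) // bytes=0-1023
}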
1 change: 1 addition & 0 deletions client/config/constants_otel.go
@@ -25,6 +25,7 @@ const (
AttributePeerID = attribute.Key("d7y.peer.id")
AttributeTargetPeerID = attribute.Key("d7y.peer.target.id")
AttributeReusePeerID = attribute.Key("d7y.peer.reuse.id")
AttributeReuseRange = attribute.Key("d7y.peer.reuse.range")
AttributeTargetPeerAddr = attribute.Key("d7y.peer.target.addr")
AttributeMainPeer = attribute.Key("d7y.peer.task.main_peer")
AttributePeerPacketCode = attribute.Key("d7y.peer.packet.code")
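For context, a hedged sketch of attaching the new key to a span when a reused peer task serves a ranged request; the helper, its call site, and the package placement are assumptions, only the attribute key comes from this diff.

package peer // package placement is illustrative

import (
	"go.opentelemetry.io/otel/trace"

	"d7y.io/dragonfly/v2/client/clientutil"
	"d7y.io/dragonfly/v2/client/config"
)

// recordReuseRange is a hypothetical helper: it tags the current span with
// the byte range served from a reused peer task.
func recordReuseRange(span trace.Span, rg clientutil.Range) {
	span.SetAttributes(config.AttributeReuseRange.String(rg.String()))
}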
1 change: 1 addition & 0 deletions client/config/headers.go
@@ -20,6 +20,7 @@ const (
HeaderDragonflyFilter = "X-Dragonfly-Filter"
HeaderDragonflyPeer = "X-Dragonfly-Peer"
HeaderDragonflyTask = "X-Dragonfly-Task"
HeaderDragonflyRange = "X-Dragonfly-Range"
HeaderDragonflyBiz = "X-Dragonfly-Biz"
// HeaderDragonflyRegistry is used for dynamic registry mirrors
HeaderDragonflyRegistry = "X-Dragonfly-Registry"
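This hunk only defines the header name; how it is produced and consumed is not shown here. As an assumption, a sketch of a caller forwarding its original byte range to the daemon through the new header (address, path, and value format are hypothetical):

package main

import (
	"net/http"

	"d7y.io/dragonfly/v2/client/config"
)

func main() {
	// Hypothetical: only the header name comes from this diff.
	req, _ := http.NewRequest(http.MethodGet, "http://dragonfly-daemon.local/blob", nil)
	req.Header.Set(config.HeaderDragonflyRange, "bytes=0-1023")
	_ = req
}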
1 change: 1 addition & 0 deletions client/config/peerhost.go
@@ -175,6 +175,7 @@ type DownloadOption struct {
CalculateDigest bool `mapstructure:"calculateDigest" yaml:"calculateDigest"`
TransportOption *TransportOption `mapstructure:"transportOption" yaml:"transportOption"`
GetPiecesMaxRetry int `mapstructure:"getPiecesMaxRetry" yaml:"getPiecesMaxRetry"`
Prefetch bool `mapstructure:"prefetch" yaml:"prefetch"`
}

type TransportOption struct {
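Going by the yaml/mapstructure tags, the new switch sits with the other download options. A hedged daemon config sketch, assuming DownloadOption is bound to the download section of the daemon config; neighbouring values are examples only:

download:
  calculateDigest: true
  getPiecesMaxRetry: 100
  prefetch: true   # new: also fetch the whole file when a ranged request arrives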
2 changes: 1 addition & 1 deletion client/daemon/daemon.go
@@ -169,7 +169,7 @@ func New(opt *config.DaemonOption, d dfpath.Dfpath) (Daemon, error) {
return nil, err
}
peerTaskManager, err := peer.NewPeerTaskManager(host, pieceManager, storageManager, sched, opt.Scheduler,
opt.Download.PerPeerRateLimit.Limit, opt.Storage.Multiplex, opt.Download.CalculateDigest, opt.Download.GetPiecesMaxRetry)
opt.Download.PerPeerRateLimit.Limit, opt.Storage.Multiplex, opt.Download.Prefetch, opt.Download.CalculateDigest, opt.Download.GetPiecesMaxRetry)
if err != nil {
return nil, err
}
21 changes: 14 additions & 7 deletions client/daemon/peer/peertask_conductor.go
@@ -414,7 +414,7 @@ func (pt *peerTaskConductor) storeTinyPeerTask() {
pt.SetTotalPieces(1)
ctx := pt.ctx
var err error
pt.storage, err = pt.peerTaskManager.storageManager.RegisterTask(ctx,
storageDriver, err := pt.peerTaskManager.storageManager.RegisterTask(ctx,
storage.RegisterTaskRequest{
CommonTaskRequest: storage.CommonTaskRequest{
PeerID: pt.tinyData.PeerID,
@@ -424,12 +424,13 @@
TotalPieces: 1,
// TODO check digest
})
pt.storage = storageDriver
if err != nil {
logger.Errorf("register tiny data storage failed: %s", err)
pt.cancel(base.Code_ClientError, err.Error())
return
}
n, err := pt.storage.WritePiece(ctx,
n, err := pt.GetStorage().WritePiece(ctx,
&storage.WritePieceRequest{
PeerTaskMetadata: storage.PeerTaskMetadata{
PeerID: pt.tinyData.PeerID,
@@ -653,7 +654,7 @@ func (pt *peerTaskConductor) pullSinglePiece() {
}

request := &DownloadPieceRequest{
storage: pt.storage,
storage: pt.GetStorage(),
piece: pt.singlePiece.PieceInfo,
log: pt.Log(),
TaskID: pt.GetTaskID(),
@@ -892,7 +893,7 @@ func (pt *peerTaskConductor) dispatchPieceRequest(pieceRequestCh chan *DownloadP
pt.requestedPieces.Set(piece.PieceNum)
}
req := &DownloadPieceRequest{
storage: pt.storage,
storage: pt.GetStorage(),
piece: piece,
log: pt.Log(),
TaskID: pt.GetTaskID(),
@@ -1106,6 +1107,12 @@ func (pt *peerTaskConductor) reportFailResult(request *DownloadPieceRequest, res
}

func (pt *peerTaskConductor) InitStorage() (err error) {
pt.lock.Lock()
defer pt.lock.Unlock()
// check storage for partial back source cases.
if pt.storage != nil {
return nil
}
// prepare storage
pt.storage, err = pt.storageManager.RegisterTask(pt.ctx,
storage.RegisterTaskRequest{
@@ -1125,7 +1132,7 @@

func (pt *peerTaskConductor) UpdateStorage() error {
// update storage
err := pt.storage.UpdateTask(pt.ctx,
err := pt.GetStorage().UpdateTask(pt.ctx,
&storage.UpdateTaskRequest{
PeerTaskMetadata: storage.PeerTaskMetadata{
PeerID: pt.GetPeerID(),
@@ -1278,7 +1285,7 @@ func (pt *peerTaskConductor) fail() {

// Validate stores metadata and validates digest
func (pt *peerTaskConductor) Validate() error {
err := pt.storage.Store(pt.ctx,
err := pt.GetStorage().Store(pt.ctx,
&storage.StoreRequest{
CommonTaskRequest: storage.CommonTaskRequest{
PeerID: pt.peerID,
@@ -1295,7 +1302,7 @@
if !pt.peerTaskManager.calculateDigest {
return nil
}
err = pt.storage.ValidateDigest(
err = pt.GetStorage().ValidateDigest(
&storage.PeerTaskMetadata{
PeerID: pt.GetPeerID(),
TaskID: pt.GetTaskID(),
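The nil check added to InitStorage makes storage registration idempotent under the conductor's lock, since a partial back-source path may have registered it already. A generic sketch of that lazy-init-under-lock pattern, with illustrative names rather than the conductor's real fields:

package main

import (
	"fmt"
	"sync"
)

// driver and register stand in for the task storage driver and
// storageManager.RegisterTask; both are illustrative.
type driver struct{}

func register() (*driver, error) { return &driver{}, nil }

type holder struct {
	mu sync.Mutex
	d  *driver
}

// init mirrors the pattern: take the lock, return early if the resource is
// already registered, otherwise register it exactly once.
func (h *holder) init() error {
	h.mu.Lock()
	defer h.mu.Unlock()
	if h.d != nil {
		return nil
	}
	d, err := register()
	if err != nil {
		return err
	}
	h.d = d
	return nil
}

func main() {
	h := &holder{}
	fmt.Println(h.init(), h.init()) // second call is a no-op: <nil> <nil>
}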
4 changes: 4 additions & 0 deletions client/daemon/peer/peertask_file.go
@@ -88,6 +88,10 @@ func (ptm *peerTaskManager) newFileTask(
if err != nil {
return nil, nil, err
}
// prefetch parent request
if ptm.enablePrefetch && request.UrlMeta.Range != "" {
go ptm.prefetch(&request.PeerTaskRequest)
}
ctx, span := tracer.Start(ctx, config.SpanFileTask, trace.WithSpanKind(trace.SpanKindClient))

pt := &fileTask{
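The prefetch runs on its own goroutine and only fires when the incoming file task actually carries a range. A hedged sketch of a request shape that would take that branch; only Url, UrlMeta, and the Range field come from the diff, the values are assumptions:

package main

import (
	"d7y.io/dragonfly/v2/pkg/rpc/base"
	"d7y.io/dragonfly/v2/pkg/rpc/scheduler"
)

func main() {
	// With download.prefetch enabled, a non-empty UrlMeta.Range makes
	// newFileTask also spawn a background whole-file prefetch task.
	req := &scheduler.PeerTaskRequest{
		Url: "http://example.com/blob",
		UrlMeta: &base.UrlMeta{
			Range: "0-1023", // exact on-the-wire format is an assumption
		},
	}
	_ = req
}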
48 changes: 44 additions & 4 deletions client/daemon/peer/peertask_manager.go
@@ -21,6 +21,7 @@ import (
"io"
"sync"

"github.com/go-http-utils/headers"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/trace"
"golang.org/x/time/rate"
@@ -29,6 +30,8 @@ import (
"d7y.io/dragonfly/v2/client/daemon/metrics"
"d7y.io/dragonfly/v2/client/daemon/storage"
logger "d7y.io/dragonfly/v2/internal/dflog"
"d7y.io/dragonfly/v2/pkg/idgen"
"d7y.io/dragonfly/v2/pkg/rpc/base"
"d7y.io/dragonfly/v2/pkg/rpc/scheduler"
schedulerclient "d7y.io/dragonfly/v2/pkg/rpc/scheduler/client"
)
@@ -108,10 +111,10 @@ type peerTaskManager struct {

perPeerRateLimit rate.Limit

// enableMultiplex indicates reusing completed peer task storage
// currently, only check completed peer task after register to scheduler
// TODO multiplex the running peer task
// enableMultiplex indicates to reuse the data of completed peer tasks
enableMultiplex bool
// enablePrefetch indicates whether to prefetch the whole file for ranged requests
enablePrefetch bool

calculateDigest bool

@@ -126,6 +129,7 @@ func NewPeerTaskManager(
schedulerOption config.SchedulerOption,
perPeerRateLimit rate.Limit,
multiplex bool,
prefetch bool,
calculateDigest bool,
getPiecesMaxRetry int) (TaskManager, error) {

@@ -139,6 +143,7 @@
schedulerOption: schedulerOption,
perPeerRateLimit: perPeerRateLimit,
enableMultiplex: multiplex,
enablePrefetch: prefetch,
calculateDigest: calculateDigest,
getPiecesMaxRetry: getPiecesMaxRetry,
}
@@ -198,6 +203,41 @@ func (ptm *peerTaskManager) getOrCreatePeerTaskConductor(
return ptc, true, nil
}

func (ptm *peerTaskManager) prefetch(request *scheduler.PeerTaskRequest) {
req := &scheduler.PeerTaskRequest{
Url: request.Url,
PeerId: request.PeerId,
PeerHost: ptm.host,
HostLoad: request.HostLoad,
IsMigrating: request.IsMigrating,
UrlMeta: &base.UrlMeta{
Digest: request.UrlMeta.Digest,
Tag: request.UrlMeta.Tag,
Filter: request.UrlMeta.Filter,
Header: map[string]string{},
},
}
for k, v := range request.UrlMeta.Header {
if k == headers.Range {
continue
}
req.UrlMeta.Header[k] = v
}
taskID := idgen.TaskID(req.Url, req.UrlMeta)
req.PeerId = idgen.PeerID(req.PeerHost.Ip)

var limit = rate.Inf
if ptm.perPeerRateLimit > 0 {
limit = ptm.perPeerRateLimit
}

logger.Infof("prefetch peer task %s/%s", taskID, req.PeerId)
prefetch, err := ptm.getPeerTaskConductor(context.Background(), taskID, req, limit)
if err != nil {
// log with the request identifiers: prefetch may be nil when err != nil
logger.Errorf("prefetch peer task %s/%s error: %s", taskID, req.PeerId, err)
return
}
logger.Debugf("prefetch peer task %s/%s started", prefetch.taskID, prefetch.peerID)
}

func (ptm *peerTaskManager) StartFileTask(ctx context.Context, req *FileTaskRequest) (chan *FileTaskProgress, *TinyData, error) {
if ptm.enableMultiplex {
progress, ok := ptm.tryReuseFilePeerTask(ctx, req)
@@ -235,7 +275,7 @@ func (ptm *peerTaskManager) StartStreamTask(ctx context.Context, req *StreamTask
}

if ptm.enableMultiplex {
r, attr, ok := ptm.tryReuseStreamPeerTask(ctx, peerTaskRequest)
r, attr, ok := ptm.tryReuseStreamPeerTask(ctx, req)
if ok {
metrics.PeerTaskCacheHitCount.Add(1)
return r, attr, nil
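The core of prefetch() above is that the cloned request drops the range (the Range header is skipped and UrlMeta.Range is not copied) before the task ID is recomputed, so the whole-file download registers as its own task. A sketch of that effect, assuming idgen.TaskID factors the range into the hash:

package main

import (
	"fmt"

	"d7y.io/dragonfly/v2/pkg/idgen"
	"d7y.io/dragonfly/v2/pkg/rpc/base"
)

func main() {
	url := "http://example.com/blob"
	rangedMeta := &base.UrlMeta{Tag: "demo", Range: "0-1023"} // range value format is an assumption
	wholeMeta := &base.UrlMeta{Tag: "demo"}                   // what prefetch() builds: no range

	// Under that assumption, the two requests resolve to different tasks,
	// so the whole file is cached independently of the ranged download.
	fmt.Println(idgen.TaskID(url, rangedMeta) != idgen.TaskID(url, wholeMeta)) // expected: true
}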