feat: stream send error code (#986)
* feat: change task and peer ttl (#984)

Signed-off-by: Gaius <[email protected]>

* feat: send error code

Signed-off-by: Gaius <[email protected]>

Signed-off-by: Gaius <[email protected]>

* feat: add retry interval

Signed-off-by: Gaius <[email protected]>

* feat: dfdaemon report successful piece before end of piece

Signed-off-by: Gaius <[email protected]>

* feat: update submodule version

Signed-off-by: Gaius <[email protected]>
gaius-qi committed Jun 28, 2023
1 parent 0c03af1 commit 5479b3a
Showing 18 changed files with 285 additions and 290 deletions.
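The headline change: the scheduler no longer parks a *dferrors.DfError on a per-peer StopChannel; the error code now travels in-band on the peer's ReportPieceResult stream (see the StopChannel and StopStream removals in scheduler/resource/peer.go below). The new call site is not part of the hunks shown on this page, so the following is only a minimal sketch of the pattern, assuming PeerPacket carries a base.Code field as in the v2 scheduler proto; sendSchedError is an illustrative name, not a function added by this commit.

package example

import (
	"d7y.io/dragonfly/v2/pkg/rpc/base"
	"d7y.io/dragonfly/v2/pkg/rpc/scheduler"
	"d7y.io/dragonfly/v2/scheduler/resource"
)

// sendSchedError sketches the new approach: instead of pushing an error onto
// peer.StopChannel, send the code directly on the bidirectional
// ReportPieceResult stream stored on the peer.
func sendSchedError(peer *resource.Peer) {
	stream, ok := peer.LoadStream()
	if !ok {
		peer.Log.Error("send error code failed: can not find peer stream")
		return
	}

	// Assumption: PeerPacket exposes a Code field of type base.Code.
	if err := stream.Send(&scheduler.PeerPacket{Code: base.Code_SchedError}); err != nil {
		peer.Log.Errorf("send peer packet failed: %s", err.Error())
	}
}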
4 changes: 2 additions & 2 deletions client/daemon/peer/peertask_conductor.go
@@ -649,8 +649,8 @@ func (pt *peerTaskConductor) pullSinglePiece() {
}

if result, err := pt.pieceManager.DownloadPiece(ctx, request); err == nil {
pt.PublishPieceInfo(request.piece.PieceNum, request.piece.RangeSize)
pt.reportSuccessResult(request, result)
pt.PublishPieceInfo(request.piece.PieceNum, request.piece.RangeSize)

span.SetAttributes(config.AttributePieceSuccess.Bool(true))
span.End()
@@ -943,8 +943,8 @@ func (pt *peerTaskConductor) downloadPieceWorker(id int32, requests chan *Downlo
continue
} else {
// broadcast success piece
pt.PublishPieceInfo(request.piece.PieceNum, request.piece.RangeSize)
pt.reportSuccessResult(request, result)
pt.PublishPieceInfo(request.piece.PieceNum, request.piece.RangeSize)
}

span.SetAttributes(config.AttributePieceSuccess.Bool(true))
5 changes: 3 additions & 2 deletions client/daemon/peer/piece_manager.go
@@ -371,8 +371,8 @@ func (pm *pieceManager) downloadKnownLengthSource(ctx context.Context, pt Task,
return err
}
}
pt.PublishPieceInfo(pieceNum, uint32(result.Size))
pt.ReportPieceResult(request, result, nil)
pt.PublishPieceInfo(pieceNum, uint32(result.Size))
}

log.Infof("download from source ok")
@@ -417,6 +417,7 @@ func (pm *pieceManager) downloadUnknownLengthSource(ctx context.Context, pt Task
return err
}
if result.Size == int64(size) {
pt.ReportPieceResult(request, result, nil)
pt.PublishPieceInfo(pieceNum, uint32(result.Size))
continue
} else if result.Size > int64(size) {
@@ -447,8 +448,8 @@ func (pm *pieceManager) downloadUnknownLengthSource(ctx context.Context, pt Task
if result.Size == 0 {
break
}
pt.PublishPieceInfo(pieceNum, uint32(result.Size))
pt.ReportPieceResult(request, result, nil)
pt.PublishPieceInfo(pieceNum, uint32(result.Size))
break
}

2 changes: 1 addition & 1 deletion deploy/helm-charts
6 changes: 3 additions & 3 deletions docs/en/deployment/configuration/scheduler.yaml
@@ -7,7 +7,7 @@ server:
# port is the ip and port scheduler server listens on.
port: 8002
# limit the number of requests
listenLimit: 1000
listenLimit: 10000
# cacheDir is dynconfig cache storage directory
# in linux, default value is /var/cache/dragonfly
# in macos(just for testing), default value is /Users/$USER/.dragonfly/cache
@@ -29,9 +29,9 @@ scheduler:
# backSourceCount is the number of backsource clients when the CDN is unavailable
backSourceCount: 3
# retry scheduling limit times
retryLimit: 10
retryLimit: 20
# retry scheduling interval
retryInterval: 1s
retryInterval: 500ms
# gc metadata configuration
gc:
# peerGCInterval is peer's gc interval
6 changes: 3 additions & 3 deletions docs/zh-CN/deployment/configuration/scheduler.yaml
@@ -7,7 +7,7 @@ server:
# 服务监听端口
port:
# 限制请求并发数
listenLimit: 1000
listenLimit: 10000
# daemon 动态配置缓存目录
# linux 上默认目录 /var/cache/dragonfly
# macos(仅开发、测试), 默认目录是 /Users/$USER/.dragonfly/cache
@@ -27,9 +27,9 @@ scheduler:
# 单个任务允许客户端回源的数量
backSourceCount: 3
# 调度重试次数限制
retryLimit: 10
retryLimit: 20
# 调度重试时间间隔
retryInterval: 1s
retryInterval: 500ms
# 数据回收策略
gc:
# peer 的回收间隔
4 changes: 2 additions & 2 deletions scheduler/config/config.go
@@ -64,8 +64,8 @@ func New() *Config {
Scheduler: &SchedulerConfig{
Algorithm: "default",
BackSourceCount: 3,
RetryLimit: 10,
RetryInterval: 1 * time.Second,
RetryLimit: 20,
RetryInterval: 500 * time.Millisecond,
GC: &GCConfig{
PeerGCInterval: 10 * time.Minute,
PeerTTL: 24 * time.Hour,
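The new defaults line up with the documented scheduler.yaml values above (retryLimit 20, retryInterval 500ms). As a rough illustration of what the two knobs control, here is a hypothetical retry loop; scheduleWithRetry and trySchedule are illustrative names, and this is not the scheduler's actual retry code.

package example

import (
	"time"

	"d7y.io/dragonfly/v2/scheduler/config"
)

// scheduleWithRetry retries a scheduling attempt up to RetryLimit times,
// sleeping RetryInterval between attempts. Purely illustrative.
func scheduleWithRetry(cfg *config.SchedulerConfig, trySchedule func() error) error {
	var err error
	for i := 0; i < int(cfg.RetryLimit); i++ {
		if err = trySchedule(); err == nil {
			return nil
		}
		time.Sleep(cfg.RetryInterval)
	}
	return err
}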
78 changes: 35 additions & 43 deletions scheduler/resource/peer.go
@@ -29,7 +29,6 @@ import (
"github.com/looplab/fsm"
"go.uber.org/atomic"

"d7y.io/dragonfly/v2/internal/dferrors"
logger "d7y.io/dragonfly/v2/internal/dflog"
"d7y.io/dragonfly/v2/pkg/rpc/scheduler"
)
@@ -38,6 +37,9 @@ const (
// Peer has been created but did not start running
PeerStatePending = "Pending"

// Peer successfully registered as tiny scope size
PeerStateReceivedTiny = "ReceivedTiny"

// Peer successfully registered as small scope size
PeerStateReceivedSmall = "ReceivedSmall"

@@ -64,6 +66,9 @@ const (
// Peer is downloading
PeerEventDownload = "Download"

// Peer is registered as tiny scope size
PeerEventRegisterTiny = "RegisterTiny"

// Peer is registered as small scope size
PeerEventRegisterSmall = "RegisterSmall"

@@ -79,6 +84,9 @@ const (
// Peer downloaded failed
PeerEventDownloadFailed = "DownloadFailed"

// Peer back to initial pending state
PeerEventRestart = "Restart"

// Peer leaves
PeerEventLeave = "Leave"
)
@@ -96,9 +104,6 @@ type Peer struct {
// Stream is grpc stream instance
Stream *atomic.Value

// Record peer report piece grpc interface stop code
StopChannel chan *dferrors.DfError

// Task state machine
FSM *fsm.FSM

@@ -130,38 +135,39 @@ type Peer struct {
// New Peer instance
func NewPeer(id string, task *Task, host *Host) *Peer {
p := &Peer{
ID: id,
Pieces: &bitset.BitSet{},
pieceCosts: []int64{},
Stream: &atomic.Value{},
StopChannel: make(chan *dferrors.DfError, 1),
Task: task,
Host: host,
Parent: &atomic.Value{},
Children: &sync.Map{},
CreateAt: atomic.NewTime(time.Now()),
UpdateAt: atomic.NewTime(time.Now()),
mu: &sync.RWMutex{},
Log: logger.WithTaskAndPeerID(task.ID, id),
ID: id,
Pieces: &bitset.BitSet{},
pieceCosts: []int64{},
Stream: &atomic.Value{},
Task: task,
Host: host,
Parent: &atomic.Value{},
Children: &sync.Map{},
CreateAt: atomic.NewTime(time.Now()),
UpdateAt: atomic.NewTime(time.Now()),
mu: &sync.RWMutex{},
Log: logger.WithTaskAndPeerID(task.ID, id),
}

// Initialize state machine
p.FSM = fsm.NewFSM(
PeerStatePending,
fsm.Events{
{Name: PeerEventRegisterTiny, Src: []string{PeerStatePending}, Dst: PeerStateReceivedTiny},
{Name: PeerEventRegisterSmall, Src: []string{PeerStatePending}, Dst: PeerStateReceivedSmall},
{Name: PeerEventRegisterNormal, Src: []string{PeerStatePending}, Dst: PeerStateReceivedNormal},
{Name: PeerEventDownload, Src: []string{PeerStateReceivedSmall, PeerStateReceivedNormal}, Dst: PeerStateRunning},
{Name: PeerEventDownloadFromBackToSource, Src: []string{PeerStateRunning}, Dst: PeerStateBackToSource},
{Name: PeerEventDownload, Src: []string{PeerStateReceivedTiny, PeerStateReceivedSmall, PeerStateReceivedNormal}, Dst: PeerStateRunning},
{Name: PeerEventDownloadFromBackToSource, Src: []string{PeerStateReceivedTiny, PeerStateReceivedSmall, PeerStateReceivedNormal, PeerStateRunning}, Dst: PeerStateBackToSource},
{Name: PeerEventDownloadSucceeded, Src: []string{PeerStateRunning, PeerStateBackToSource}, Dst: PeerStateSucceeded},
{Name: PeerEventDownloadFailed, Src: []string{
PeerStatePending, PeerStateReceivedSmall, PeerStateReceivedNormal,
PeerStatePending, PeerStateReceivedTiny, PeerStateReceivedSmall, PeerStateReceivedNormal,
PeerStateRunning, PeerStateBackToSource, PeerStateSucceeded,
}, Dst: PeerStateFailed},
{Name: PeerEventRestart, Src: []string{PeerStateSucceeded}, Dst: PeerStatePending},
{Name: PeerEventLeave, Src: []string{PeerStateFailed, PeerStateSucceeded}, Dst: PeerEventLeave},
},
fsm.Callbacks{
PeerEventDownload: func(e *fsm.Event) {
PeerEventRegisterTiny: func(e *fsm.Event) {
p.UpdateAt.Store(time.Now())
p.Log.Infof("peer state is %s", e.FSM.Current())
},
@@ -173,6 +179,10 @@ func NewPeer(id string, task *Task, host *Host) *Peer {
p.UpdateAt.Store(time.Now())
p.Log.Infof("peer state is %s", e.FSM.Current())
},
PeerEventDownload: func(e *fsm.Event) {
p.UpdateAt.Store(time.Now())
p.Log.Infof("peer state is %s", e.FSM.Current())
},
PeerEventDownloadFromBackToSource: func(e *fsm.Event) {
p.Task.BackToSourcePeers.Add(p)
p.UpdateAt.Store(time.Now())
@@ -194,6 +204,10 @@ func NewPeer(id string, task *Task, host *Host) *Peer {
p.UpdateAt.Store(time.Now())
p.Log.Infof("peer state is %s", e.FSM.Current())
},
PeerEventRestart: func(e *fsm.Event) {
p.UpdateAt.Store(time.Now())
p.Log.Infof("peer state is %s", e.FSM.Current())
},
PeerEventLeave: func(e *fsm.Event) {
p.Log.Infof("peer state is %s", e.FSM.Current())
},
@@ -373,28 +387,6 @@ func (p *Peer) DeleteStream() {
p.Stream = &atomic.Value{}
}

// StopStream stops grpc stream with error code
func (p *Peer) StopStream(dferr *dferrors.DfError) bool {
p.mu.Lock()
defer p.mu.Unlock()

if _, ok := p.LoadStream(); !ok {
p.Log.Error("stop stream failed: can not find peer stream")
return false
}
p.DeleteStream()

select {
case p.StopChannel <- dferr:
p.Log.Infof("send stop channel %#v", dferr)
default:
p.Log.Error("stop channel busy")
return false
}

return true
}

// Download tiny file from peer
func (p *Peer) DownloadTinyFile(ctx context.Context) ([]byte, error) {
// Download url: http://${host}:${port}/download/${taskIndex}/${taskID}?peerId=scheduler;
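Besides dropping StopStream and StopChannel, this file adds a tiny scope state (PeerStateReceivedTiny via PeerEventRegisterTiny), a PeerEventRestart event that takes a succeeded peer back to Pending, and lets registered peers fall back to source without first entering Running. Below is a minimal sketch of one transition path that is now legal, assuming a peer built with resource.NewPeer and the pre-v1 looplab/fsm API used here (FSM.Event takes just the event name); driveTinyPeer is an illustrative helper, not part of this commit.

package example

import (
	"d7y.io/dragonfly/v2/scheduler/resource"
)

// driveTinyPeer walks a peer through transitions enabled by this change:
// Pending -> ReceivedTiny -> BackToSource -> Succeeded -> Pending.
func driveTinyPeer(peer *resource.Peer) error {
	// New event: register as tiny scope size.
	if err := peer.FSM.Event(resource.PeerEventRegisterTiny); err != nil {
		return err
	}

	// Received* states may now go straight back to source without Running.
	if err := peer.FSM.Event(resource.PeerEventDownloadFromBackToSource); err != nil {
		return err
	}

	if err := peer.FSM.Event(resource.PeerEventDownloadSucceeded); err != nil {
		return err
	}

	// New event: a succeeded peer can be restarted back to Pending.
	return peer.FSM.Event(resource.PeerEventRestart)
}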
63 changes: 0 additions & 63 deletions scheduler/resource/peer_test.go
@@ -29,9 +29,7 @@ import (
"github.com/golang/mock/gomock"
"github.com/stretchr/testify/assert"

"d7y.io/dragonfly/v2/internal/dferrors"
"d7y.io/dragonfly/v2/pkg/idgen"
"d7y.io/dragonfly/v2/pkg/rpc/base"
"d7y.io/dragonfly/v2/pkg/rpc/scheduler"
"d7y.io/dragonfly/v2/pkg/rpc/scheduler/mocks"
)
@@ -55,7 +53,6 @@ func TestPeer_NewPeer(t *testing.T) {
assert.Empty(peer.Pieces)
assert.Equal(len(peer.PieceCosts()), 0)
assert.Empty(peer.Stream)
assert.Empty(peer.StopChannel)
assert.Equal(peer.FSM.Current(), PeerStatePending)
assert.EqualValues(peer.Task, mockTask)
assert.EqualValues(peer.Host, mockHost)
@@ -867,66 +864,6 @@ func TestPeer_DeleteStream(t *testing.T) {
}
}

func TestPeer_StopStream(t *testing.T) {
tests := []struct {
name string
expect func(t *testing.T, peer *Peer, stream scheduler.Scheduler_ReportPieceResultServer)
}{
{
name: "stop stream with scheduling error",
expect: func(t *testing.T, peer *Peer, stream scheduler.Scheduler_ReportPieceResultServer) {
assert := assert.New(t)
peer.StoreStream(stream)
ok := peer.StopStream(dferrors.New(base.Code_SchedError, ""))
assert.Equal(ok, true)
_, ok = peer.LoadStream()
assert.Equal(ok, false)

select {
case dferr := <-peer.StopChannel:
assert.Equal(dferr.Code, base.Code_SchedError)
assert.Equal(dferr.Message, "")
default:
assert.Fail("stop channel can not receive error")
}
},
},
{
name: "stop stream with empty stream",
expect: func(t *testing.T, peer *Peer, stream scheduler.Scheduler_ReportPieceResultServer) {
assert := assert.New(t)
ok := peer.StopStream(dferrors.New(base.Code_SchedError, ""))
assert.Equal(ok, false)
},
},
{
name: "stop stream with channel busy",
expect: func(t *testing.T, peer *Peer, stream scheduler.Scheduler_ReportPieceResultServer) {
assert := assert.New(t)
peer.StoreStream(stream)
peer.StopChannel <- dferrors.New(base.Code_SchedError, "")
ok := peer.StopStream(dferrors.New(base.Code_SchedError, ""))
assert.Equal(ok, false)
_, ok = peer.LoadStream()
assert.Equal(ok, false)
},
},
}

for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
ctl := gomock.NewController(t)
defer ctl.Finish()
stream := mocks.NewMockScheduler_ReportPieceResultServer(ctl)

mockHost := NewHost(mockRawHost)
mockTask := NewTask(mockTaskID, mockTaskURL, mockTaskBackToSourceLimit, mockTaskURLMeta)
peer := NewPeer(mockPeerID, mockTask, mockHost)
tc.expect(t, peer, stream)
})
}
}

func TestPeer_DownloadTinyFile(t *testing.T) {
s := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)