feat: stream send error code #986

Merged · 14 commits · Jan 17, 2022

4 changes: 2 additions & 2 deletions client/daemon/peer/peertask_conductor.go

```diff
@@ -648,8 +648,8 @@ func (pt *peerTaskConductor) pullSinglePiece() {
 	}
 
 	if result, err := pt.pieceManager.DownloadPiece(ctx, request); err == nil {
-		pt.PublishPieceInfo(request.piece.PieceNum, request.piece.RangeSize)
 		pt.reportSuccessResult(request, result)
+		pt.PublishPieceInfo(request.piece.PieceNum, request.piece.RangeSize)
 
 		span.SetAttributes(config.AttributePieceSuccess.Bool(true))
 		span.End()
@@ -942,8 +942,8 @@ func (pt *peerTaskConductor) downloadPieceWorker(id int32, requests chan *Downlo
 			continue
 		} else {
 			// broadcast success piece
-			pt.PublishPieceInfo(request.piece.PieceNum, request.piece.RangeSize)
 			pt.reportSuccessResult(request, result)
+			pt.PublishPieceInfo(request.piece.PieceNum, request.piece.RangeSize)
 		}
 
 		span.SetAttributes(config.AttributePieceSuccess.Bool(true))
```

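Both hunks above make the same small change: reportSuccessResult now runs before PublishPieceInfo, so the piece result reaches the scheduler before the piece is broadcast to local subscribers. A minimal, runnable sketch of that ordering; the types and function bodies below are illustrative stand-ins, not the real Dragonfly APIs:

```go
package main

import "fmt"

// pieceResult stands in for the real download result; it is a placeholder
// type, not the Dragonfly DownloadPieceResult.
type pieceResult struct{ pieceNum int32 }

// reportSuccessResult mimics reporting the finished piece to the scheduler.
func reportSuccessResult(r pieceResult) { fmt.Println("report piece to scheduler:", r.pieceNum) }

// publishPieceInfo mimics broadcasting the piece to local subscribers.
func publishPieceInfo(r pieceResult) { fmt.Println("broadcast piece to subscribers:", r.pieceNum) }

func main() {
	r := pieceResult{pieceNum: 0}
	// Order after this PR: report to the scheduler first, then broadcast.
	reportSuccessResult(r)
	publishPieceInfo(r)
}
```
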
5 changes: 3 additions & 2 deletions client/daemon/peer/piece_manager.go

```diff
@@ -371,8 +371,8 @@ func (pm *pieceManager) downloadKnownLengthSource(ctx context.Context, pt Task,
 				return err
 			}
 		}
-		pt.PublishPieceInfo(pieceNum, uint32(result.Size))
 		pt.ReportPieceResult(request, result, nil)
+		pt.PublishPieceInfo(pieceNum, uint32(result.Size))
 	}
 
 	log.Infof("download from source ok")
@@ -417,6 +417,7 @@ func (pm *pieceManager) downloadUnknownLengthSource(ctx context.Context, pt Task
 			return err
 		}
 		if result.Size == int64(size) {
+			pt.ReportPieceResult(request, result, nil)
 			pt.PublishPieceInfo(pieceNum, uint32(result.Size))
 			continue
 		} else if result.Size > int64(size) {
@@ -447,8 +448,8 @@ func (pm *pieceManager) downloadUnknownLengthSource(ctx context.Context, pt Task
 		if result.Size == 0 {
 			break
 		}
-		pt.PublishPieceInfo(pieceNum, uint32(result.Size))
 		pt.ReportPieceResult(request, result, nil)
+		pt.PublishPieceInfo(pieceNum, uint32(result.Size))
 		break
 	}
 
```

6 changes: 3 additions & 3 deletions docs/en/deployment/configuration/scheduler.yaml

```diff
@@ -7,7 +7,7 @@ server:
   # port is the ip and port scheduler server listens on.
   port: 8002
   # limit the number of requests
-  listenLimit: 1000
+  listenLimit: 10000
   # cacheDir is dynconfig cache storage directory
   # in linux, default value is /var/cache/dragonfly
   # in macos(just for testing), default value is /Users/$USER/.dragonfly/cache
@@ -29,9 +29,9 @@ scheduler:
   # backSourceCount is the number of backsource clients when the CDN is unavailable
   backSourceCount: 3
   # retry scheduling limit times
-  retryLimit: 10
+  retryLimit: 20
   # retry scheduling interval
-  retryInterval: 1s
+  retryInterval: 500ms
   # gc metadata configuration
   gc:
     # peerGCInterval is peer's gc interval
```

6 changes: 3 additions & 3 deletions docs/zh-CN/deployment/configuration/scheduler.yaml

```diff
@@ -7,7 +7,7 @@ server:
   # server listening port
   port:
   # limit the number of concurrent requests
-  listenLimit: 1000
+  listenLimit: 10000
   # daemon dynconfig cache directory
   # on linux, the default directory is /var/cache/dragonfly
   # on macos (development and testing only), the default directory is /Users/$USER/.dragonfly/cache
@@ -27,9 +27,9 @@ scheduler:
   # number of clients allowed to back-source for a single task
   backSourceCount: 3
   # retry scheduling limit
-  retryLimit: 10
+  retryLimit: 20
   # retry scheduling interval
-  retryInterval: 1s
+  retryInterval: 500ms
   # gc policy for metadata
   gc:
     # peer gc interval
```

4 changes: 2 additions & 2 deletions scheduler/config/config.go

```diff
@@ -64,8 +64,8 @@ func New() *Config {
 		Scheduler: &SchedulerConfig{
 			Algorithm:       "default",
 			BackSourceCount: 3,
-			RetryLimit:      10,
-			RetryInterval:   1 * time.Second,
+			RetryLimit:      20,
+			RetryInterval:   500 * time.Millisecond,
 			GC: &GCConfig{
 				PeerGCInterval: 10 * time.Minute,
 				PeerTTL:        24 * time.Hour,
```

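For context on the new defaults: 20 retries at 500ms keep the total retry window at roughly 10s, the same budget as the previous 10 retries at 1s, but the scheduler now retries twice as often. A small sketch to confirm the arithmetic:

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	// New scheduler defaults introduced in this PR.
	retryLimit := 20
	retryInterval := 500 * time.Millisecond

	// Total retry window: 20 * 500ms = 10s, the same budget as the old
	// 10 * 1s defaults, with retries happening twice as often.
	fmt.Println(time.Duration(retryLimit) * retryInterval) // prints 10s
}
```
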
78 changes: 35 additions & 43 deletions scheduler/resource/peer.go

```diff
@@ -29,7 +29,6 @@ import (
 	"github.com/looplab/fsm"
 	"go.uber.org/atomic"
 
-	"d7y.io/dragonfly/v2/internal/dferrors"
 	logger "d7y.io/dragonfly/v2/internal/dflog"
 	"d7y.io/dragonfly/v2/pkg/rpc/scheduler"
 )
@@ -38,6 +37,9 @@ const (
 	// Peer has been created but did not start running
 	PeerStatePending = "Pending"
 
+	// Peer successfully registered as tiny scope size
+	PeerStateReceivedTiny = "ReceivedTiny"
+
 	// Peer successfully registered as small scope size
 	PeerStateReceivedSmall = "ReceivedSmall"
 
@@ -64,6 +66,9 @@ const (
 	// Peer is downloading
 	PeerEventDownload = "Download"
 
+	// Peer is registered as tiny scope size
+	PeerEventRegisterTiny = "RegisterTiny"
+
 	// Peer is registered as small scope size
 	PeerEventRegisterSmall = "RegisterSmall"
 
@@ -79,6 +84,9 @@ const (
 	// Peer downloaded failed
 	PeerEventDownloadFailed = "DownloadFailed"
 
+	// Peer back to initial pending state
+	PeerEventRestart = "Restart"
+
 	// Peer leaves
 	PeerEventLeave = "Leave"
 )
@@ -96,9 +104,6 @@ type Peer struct {
 	// Stream is grpc stream instance
 	Stream *atomic.Value
 
-	// Record peer report piece grpc interface stop code
-	StopChannel chan *dferrors.DfError
-
 	// Task state machine
 	FSM *fsm.FSM
 
@@ -130,38 +135,39 @@ type Peer struct {
 // New Peer instance
 func NewPeer(id string, task *Task, host *Host) *Peer {
 	p := &Peer{
-		ID:          id,
-		Pieces:      &bitset.BitSet{},
-		pieceCosts:  []int64{},
-		Stream:      &atomic.Value{},
-		StopChannel: make(chan *dferrors.DfError, 1),
-		Task:        task,
-		Host:        host,
-		Parent:      &atomic.Value{},
-		Children:    &sync.Map{},
-		CreateAt:    atomic.NewTime(time.Now()),
-		UpdateAt:    atomic.NewTime(time.Now()),
-		mu:          &sync.RWMutex{},
-		Log:         logger.WithTaskAndPeerID(task.ID, id),
+		ID:         id,
+		Pieces:     &bitset.BitSet{},
+		pieceCosts: []int64{},
+		Stream:     &atomic.Value{},
+		Task:       task,
+		Host:       host,
+		Parent:     &atomic.Value{},
+		Children:   &sync.Map{},
+		CreateAt:   atomic.NewTime(time.Now()),
+		UpdateAt:   atomic.NewTime(time.Now()),
+		mu:         &sync.RWMutex{},
+		Log:        logger.WithTaskAndPeerID(task.ID, id),
 	}
 
 	// Initialize state machine
 	p.FSM = fsm.NewFSM(
 		PeerStatePending,
 		fsm.Events{
+			{Name: PeerEventRegisterTiny, Src: []string{PeerStatePending}, Dst: PeerStateReceivedTiny},
 			{Name: PeerEventRegisterSmall, Src: []string{PeerStatePending}, Dst: PeerStateReceivedSmall},
 			{Name: PeerEventRegisterNormal, Src: []string{PeerStatePending}, Dst: PeerStateReceivedNormal},
-			{Name: PeerEventDownload, Src: []string{PeerStateReceivedSmall, PeerStateReceivedNormal}, Dst: PeerStateRunning},
-			{Name: PeerEventDownloadFromBackToSource, Src: []string{PeerStateRunning}, Dst: PeerStateBackToSource},
+			{Name: PeerEventDownload, Src: []string{PeerStateReceivedTiny, PeerStateReceivedSmall, PeerStateReceivedNormal}, Dst: PeerStateRunning},
+			{Name: PeerEventDownloadFromBackToSource, Src: []string{PeerStateReceivedTiny, PeerStateReceivedSmall, PeerStateReceivedNormal, PeerStateRunning}, Dst: PeerStateBackToSource},
 			{Name: PeerEventDownloadSucceeded, Src: []string{PeerStateRunning, PeerStateBackToSource}, Dst: PeerStateSucceeded},
 			{Name: PeerEventDownloadFailed, Src: []string{
-				PeerStatePending, PeerStateReceivedSmall, PeerStateReceivedNormal,
+				PeerStatePending, PeerStateReceivedTiny, PeerStateReceivedSmall, PeerStateReceivedNormal,
 				PeerStateRunning, PeerStateBackToSource, PeerStateSucceeded,
 			}, Dst: PeerStateFailed},
+			{Name: PeerEventRestart, Src: []string{PeerStateSucceeded}, Dst: PeerStatePending},
 			{Name: PeerEventLeave, Src: []string{PeerStateFailed, PeerStateSucceeded}, Dst: PeerEventLeave},
 		},
 		fsm.Callbacks{
-			PeerEventDownload: func(e *fsm.Event) {
+			PeerEventRegisterTiny: func(e *fsm.Event) {
 				p.UpdateAt.Store(time.Now())
 				p.Log.Infof("peer state is %s", e.FSM.Current())
 			},
@@ -173,6 +179,10 @@ func NewPeer(id string, task *Task, host *Host) *Peer {
 				p.UpdateAt.Store(time.Now())
 				p.Log.Infof("peer state is %s", e.FSM.Current())
 			},
+			PeerEventDownload: func(e *fsm.Event) {
+				p.UpdateAt.Store(time.Now())
+				p.Log.Infof("peer state is %s", e.FSM.Current())
+			},
 			PeerEventDownloadFromBackToSource: func(e *fsm.Event) {
 				p.Task.BackToSourcePeers.Add(p)
 				p.UpdateAt.Store(time.Now())
@@ -194,6 +204,10 @@ func NewPeer(id string, task *Task, host *Host) *Peer {
 				p.UpdateAt.Store(time.Now())
 				p.Log.Infof("peer state is %s", e.FSM.Current())
 			},
+			PeerEventRestart: func(e *fsm.Event) {
+				p.UpdateAt.Store(time.Now())
+				p.Log.Infof("peer state is %s", e.FSM.Current())
+			},
 			PeerEventLeave: func(e *fsm.Event) {
 				p.Log.Infof("peer state is %s", e.FSM.Current())
 			},
@@ -373,28 +387,6 @@ func (p *Peer) DeleteStream() {
 	p.Stream = &atomic.Value{}
 }
 
-// StopStream stops grpc stream with error code
-func (p *Peer) StopStream(dferr *dferrors.DfError) bool {
-	p.mu.Lock()
-	defer p.mu.Unlock()
-
-	if _, ok := p.LoadStream(); !ok {
-		p.Log.Error("stop stream failed: can not find peer stream")
-		return false
-	}
-	p.DeleteStream()
-
-	select {
-	case p.StopChannel <- dferr:
-		p.Log.Infof("send stop channel %#v", dferr)
-	default:
-		p.Log.Error("stop channel busy")
-		return false
-	}
-
-	return true
-}
-
 // Download tiny file from peer
 func (p *Peer) DownloadTinyFile(ctx context.Context) ([]byte, error) {
 	// Download url: http://${host}:${port}/download/${taskIndex}/${taskID}?peerId=scheduler;
```

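Taken together, the peer.go changes give the tiny scope its own ReceivedTiny state, widen the Download and back-to-source transitions, add a Restart event that returns a succeeded peer to Pending, and drop the StopChannel/StopStream path in line with the PR's goal of sending error codes over the stream. Below is a self-contained sketch of part of the resulting transition table; it is plain Go for illustration, not the looplab/fsm wiring the scheduler actually uses:

```go
package main

import "fmt"

// transition is an (event, source state) pair; this models a subset of the
// peer FSM built in NewPeer above, not the real scheduler code.
type transition struct{ event, src string }

var transitions = map[transition]string{
	{"RegisterTiny", "Pending"}:      "ReceivedTiny", // added in this PR
	{"RegisterSmall", "Pending"}:     "ReceivedSmall",
	{"RegisterNormal", "Pending"}:    "ReceivedNormal",
	{"Download", "ReceivedTiny"}:     "Running", // tiny peers can now start downloading
	{"Download", "ReceivedSmall"}:    "Running",
	{"Download", "ReceivedNormal"}:   "Running",
	{"DownloadSucceeded", "Running"}: "Succeeded",
	{"Restart", "Succeeded"}:         "Pending", // added in this PR
}

// fire applies an event; invalid transitions keep the current state in this
// sketch, whereas the real FSM reports an error.
func fire(state, event string) string {
	if dst, ok := transitions[transition{event, state}]; ok {
		return dst
	}
	return state
}

func main() {
	state := "Pending"
	for _, event := range []string{"RegisterTiny", "Download", "DownloadSucceeded", "Restart"} {
		state = fire(state, event)
		fmt.Printf("after %s: %s\n", event, state)
	}
}
```

Running the sketch walks a tiny-scope peer through register, download, and success, then restarts it back to Pending, which is the new cycle the Restart event enables.
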
63 changes: 0 additions & 63 deletions scheduler/resource/peer_test.go

```diff
@@ -29,9 +29,7 @@ import (
 	"github.com/golang/mock/gomock"
 	"github.com/stretchr/testify/assert"
 
-	"d7y.io/dragonfly/v2/internal/dferrors"
 	"d7y.io/dragonfly/v2/pkg/idgen"
-	"d7y.io/dragonfly/v2/pkg/rpc/base"
 	"d7y.io/dragonfly/v2/pkg/rpc/scheduler"
 	"d7y.io/dragonfly/v2/pkg/rpc/scheduler/mocks"
 )
@@ -55,7 +53,6 @@ func TestPeer_NewPeer(t *testing.T) {
 	assert.Empty(peer.Pieces)
 	assert.Equal(len(peer.PieceCosts()), 0)
 	assert.Empty(peer.Stream)
-	assert.Empty(peer.StopChannel)
 	assert.Equal(peer.FSM.Current(), PeerStatePending)
 	assert.EqualValues(peer.Task, mockTask)
 	assert.EqualValues(peer.Host, mockHost)
@@ -867,66 +864,6 @@ func TestPeer_DeleteStream(t *testing.T) {
 	}
 }
 
-func TestPeer_StopStream(t *testing.T) {
-	tests := []struct {
-		name   string
-		expect func(t *testing.T, peer *Peer, stream scheduler.Scheduler_ReportPieceResultServer)
-	}{
-		{
-			name: "stop stream with scheduling error",
-			expect: func(t *testing.T, peer *Peer, stream scheduler.Scheduler_ReportPieceResultServer) {
-				assert := assert.New(t)
-				peer.StoreStream(stream)
-				ok := peer.StopStream(dferrors.New(base.Code_SchedError, ""))
-				assert.Equal(ok, true)
-				_, ok = peer.LoadStream()
-				assert.Equal(ok, false)
-
-				select {
-				case dferr := <-peer.StopChannel:
-					assert.Equal(dferr.Code, base.Code_SchedError)
-					assert.Equal(dferr.Message, "")
-				default:
-					assert.Fail("stop channel can not receive error")
-				}
-			},
-		},
-		{
-			name: "stop stream with empty stream",
-			expect: func(t *testing.T, peer *Peer, stream scheduler.Scheduler_ReportPieceResultServer) {
-				assert := assert.New(t)
-				ok := peer.StopStream(dferrors.New(base.Code_SchedError, ""))
-				assert.Equal(ok, false)
-			},
-		},
-		{
-			name: "stop stream with channel busy",
-			expect: func(t *testing.T, peer *Peer, stream scheduler.Scheduler_ReportPieceResultServer) {
-				assert := assert.New(t)
-				peer.StoreStream(stream)
-				peer.StopChannel <- dferrors.New(base.Code_SchedError, "")
-				ok := peer.StopStream(dferrors.New(base.Code_SchedError, ""))
-				assert.Equal(ok, false)
-				_, ok = peer.LoadStream()
-				assert.Equal(ok, false)
-			},
-		},
-	}
-
-	for _, tc := range tests {
-		t.Run(tc.name, func(t *testing.T) {
-			ctl := gomock.NewController(t)
-			defer ctl.Finish()
-			stream := mocks.NewMockScheduler_ReportPieceResultServer(ctl)
-
-			mockHost := NewHost(mockRawHost)
-			mockTask := NewTask(mockTaskID, mockTaskURL, mockTaskBackToSourceLimit, mockTaskURLMeta)
-			peer := NewPeer(mockPeerID, mockTask, mockHost)
-			tc.expect(t, peer, stream)
-		})
-	}
-}
-
 func TestPeer_DownloadTinyFile(t *testing.T) {
 	s := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
 		w.WriteHeader(http.StatusOK)
```
