feat: dfdaemon report successful piece before end of piece
Signed-off-by: Gaius <[email protected]>
gaius-qi committed Jan 14, 2022
1 parent dd0df3f commit 363b100
Showing 10 changed files with 65 additions and 111 deletions.
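
Reviewer note: taken together, the client-side changes below make dfdaemon report a successfully downloaded piece to the scheduler (reportSuccessResult / ReportPieceResult) before broadcasting it to local subscribers (PublishPieceInfo), and the scheduler-side changes make Code_ClientWaitPieceReady a benign "wait and retry" signal instead of a failure. A minimal Go sketch of the new client-side ordering; the types and the pieceReporter interface are illustrative stand-ins, not the real dfdaemon API:

    package sketch

    // Illustrative stand-ins for dfdaemon's piece request/result types.
    type DownloadPieceRequest struct{ PieceNum int32 }
    type DownloadPieceResult struct{ Size int64 }

    // pieceReporter mirrors the two calls whose order this commit swaps.
    type pieceReporter interface {
        ReportPieceResult(req *DownloadPieceRequest, res *DownloadPieceResult, err error) // report to the scheduler
        PublishPieceInfo(pieceNum int32, size uint32)                                     // broadcast to local waiters
    }

    // onPieceDownloaded shows the post-commit ordering: the scheduler hears about
    // the piece first, and only then is it published to peers waiting locally.
    func onPieceDownloaded(pt pieceReporter, req *DownloadPieceRequest, res *DownloadPieceResult) {
        pt.ReportPieceResult(req, res, nil)
        pt.PublishPieceInfo(req.PieceNum, uint32(res.Size))
    }
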
4 changes: 2 additions & 2 deletions client/daemon/peer/peertask_conductor.go
@@ -630,8 +630,8 @@ func (pt *peerTaskConductor) pullSinglePiece() {
}

if result, err := pt.pieceManager.DownloadPiece(ctx, request); err == nil {
- pt.PublishPieceInfo(request.piece.PieceNum, request.piece.RangeSize)
pt.reportSuccessResult(request, result)
+ pt.PublishPieceInfo(request.piece.PieceNum, request.piece.RangeSize)

span.SetAttributes(config.AttributePieceSuccess.Bool(true))
span.End()
@@ -924,8 +924,8 @@ func (pt *peerTaskConductor) downloadPieceWorker(id int32, requests chan *Downlo
continue
} else {
// broadcast success piece
- pt.PublishPieceInfo(request.piece.PieceNum, request.piece.RangeSize)
pt.reportSuccessResult(request, result)
+ pt.PublishPieceInfo(request.piece.PieceNum, request.piece.RangeSize)
}

span.SetAttributes(config.AttributePieceSuccess.Bool(true))
5 changes: 3 additions & 2 deletions client/daemon/peer/piece_manager.go
@@ -371,8 +371,8 @@ func (pm *pieceManager) downloadKnownLengthSource(ctx context.Context, pt Task,
return err
}
}
- pt.PublishPieceInfo(pieceNum, uint32(result.Size))
pt.ReportPieceResult(request, result, nil)
+ pt.PublishPieceInfo(pieceNum, uint32(result.Size))
}

log.Infof("download from source ok")
@@ -417,6 +417,7 @@ func (pm *pieceManager) downloadUnknownLengthSource(ctx context.Context, pt Task
return err
}
if result.Size == int64(size) {
+ pt.ReportPieceResult(request, result, nil)
pt.PublishPieceInfo(pieceNum, uint32(result.Size))
continue
} else if result.Size > int64(size) {
@@ -443,8 +444,8 @@
pt.ReportPieceResult(request, result, err)
return err
}
- pt.PublishPieceInfo(pieceNum, uint32(result.Size))
pt.ReportPieceResult(request, result, nil)
+ pt.PublishPieceInfo(pieceNum, uint32(result.Size))
break
}

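
Reviewer note: besides the reordering, downloadUnknownLengthSource gains a ReportPieceResult call for full-size pieces (the single added line in the second hunk above); previously those pieces were only published locally and never reported to the scheduler. A condensed sketch of the loop after this change, with simplified stand-in signatures (the real methods take the request/result structs shown in the diff):

    package sketch

    // Task stands in for the piece manager's task interface; the real
    // ReportPieceResult/PublishPieceInfo take richer request/result types.
    type Task interface {
        ReportPieceResult(pieceNum int32, size int64, err error)
        PublishPieceInfo(pieceNum int32, size uint32)
    }

    // downloadUnknownLength pulls fixed-size pieces until the source returns a
    // short (final) piece. downloadPiece is a stand-in for the per-piece download.
    func downloadUnknownLength(pt Task, pieceSize int64, downloadPiece func(pieceNum int32) (int64, error)) error {
        for pieceNum := int32(0); ; pieceNum++ {
            n, err := downloadPiece(pieceNum)
            if err != nil {
                pt.ReportPieceResult(pieceNum, n, err)
                return err
            }
            // Report to the scheduler first, then broadcast locally (the real
            // code also rejects n > pieceSize as an error; omitted here).
            pt.ReportPieceResult(pieceNum, n, nil)
            pt.PublishPieceInfo(pieceNum, uint32(n))
            if n < pieceSize {
                return nil // short read: this was the final piece
            }
        }
    }
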
2 changes: 1 addition & 1 deletion deploy/helm-charts
17 changes: 11 additions & 6 deletions scheduler/scheduler/evaluator/evaluator_base.go
@@ -86,7 +86,7 @@ func (eb *evaluatorBase) Evaluate(parent *resource.Peer, child *resource.Peer, t

return finishedPieceWeight*calculatePieceScore(parent, child, totalPieceCount) +
freeLoadWeight*calculateFreeLoadScore(parent.Host) +
- hostTypeAffinityWeight*calculateHostTypeAffinityScore(parent.Host) +
+ hostTypeAffinityWeight*calculateHostTypeAffinityScore(parent) +
idcAffinityWeight*calculateIDCAffinityScore(parent.Host, child.Host) +
netTopologyAffinityWeight*calculateMultiElementAffinityScore(parent.Host.NetTopology, child.Host.NetTopology) +
locationAffinityWeight*calculateMultiElementAffinityScore(parent.Host.Location, child.Host.Location)
@@ -116,14 +116,19 @@ func calculateFreeLoadScore(host *resource.Host) float64 {
}

// calculateHostTypeAffinityScore 0.0~1.0 larger and better
- func calculateHostTypeAffinityScore(host *resource.Host) float64 {
- // The selected priority of CDN is lower,
- // because CDN download resources are reserved for the first download as much as possible
- if host.IsCDN {
+ func calculateHostTypeAffinityScore(peer *resource.Peer) float64 {
+ // When the task is downloaded for the first time,
+ // peer will be scheduled to cdn first,
+ // otherwise it will be scheduled to dfdaemon first
+ if peer.Host.IsCDN {
+ if peer.FSM.Is(resource.PeerStateRunning) {
+ return maxScore
+ }
+
return minScore
}

- return maxScore
+ return maxScore * 0.5
}

// calculateIDCAffinityScore 0.0~1.0 larger and better
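
Reviewer note: the host-type affinity term now looks at the peer rather than just its host. A CDN parent scores maxScore only while its peer is still running (the first download of the task), and minScore once it has finished; an ordinary dfdaemon parent now scores maxScore * 0.5. That halved term is what moves the expected totals in the tests below from 1 to 0.925: with every other term still at 1.0, the drop of 0.075 implies a host-type affinity weight of 0.15 (the weight value is inferred from the test change, not shown in this diff). A quick arithmetic check:

    package sketch

    // Assumed default weight for the host-type affinity term; inferred from the
    // test expectations changing from 1.0 to 0.925, not stated in this diff.
    const hostTypeAffinityWeight = 0.15

    // expectedEvaluateScore reproduces the new expectation: every term is still
    // 1.0 except host-type affinity, which is now 0.5 for a non-CDN parent.
    func expectedEvaluateScore() float64 {
        const oldScore = 1.0
        return oldScore - hostTypeAffinityWeight*(1.0-0.5) // = 0.925
    }
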
37 changes: 25 additions & 12 deletions scheduler/scheduler/evaluator/evaluator_base_test.go
@@ -116,7 +116,7 @@ func TestEvaluatorBase_Evaluate(t *testing.T) {
},
expect: func(t *testing.T, score float64) {
assert := assert.New(t)
- assert.Equal(score, float64(1))
+ assert.Equal(score, float64(0.925))
},
},
{
@@ -131,7 +131,7 @@ func TestEvaluatorBase_Evaluate(t *testing.T) {
},
expect: func(t *testing.T, score float64) {
assert := assert.New(t)
- assert.Equal(score, float64(1))
+ assert.Equal(score, float64(0.925))
},
},
{
@@ -146,7 +146,7 @@ func TestEvaluatorBase_Evaluate(t *testing.T) {
},
expect: func(t *testing.T, score float64) {
assert := assert.New(t)
- assert.Equal(score, float64(1))
+ assert.Equal(score, float64(0.925))
},
},
}
@@ -317,34 +317,47 @@ func TestEvaluatorBase_calculateFreeLoadScore(t *testing.T) {
func TestEvaluatorBase_calculateHostTypeAffinityScore(t *testing.T) {
tests := []struct {
name string
- mock func(host *resource.Host)
+ mock func(peer *resource.Peer)
expect func(t *testing.T, score float64)
}{
{
name: "host is normal peer",
- mock: func(host *resource.Host) {},
+ mock: func(peer *resource.Peer) {},
expect: func(t *testing.T, score float64) {
assert := assert.New(t)
- assert.Equal(score, float64(1))
+ assert.Equal(score, float64(0.5))
},
},
{
name: "host is cdn",
mock: func(host *resource.Host) {
host.IsCDN = true
name: "host is cdn but peer state is not PeerStateRunning",
mock: func(peer *resource.Peer) {
peer.Host.IsCDN = true
},
expect: func(t *testing.T, score float64) {
assert := assert.New(t)
assert.Equal(score, float64(0))
},
},
+ {
+ name: "host is cdn but peer state is PeerStateRunning",
+ mock: func(peer *resource.Peer) {
+ peer.Host.IsCDN = true
+ peer.FSM.SetState(resource.PeerStateRunning)
+ },
+ expect: func(t *testing.T, score float64) {
+ assert := assert.New(t)
+ assert.Equal(score, float64(1))
+ },
+ },
}

for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
- host := resource.NewHost(mockRawHost)
- tc.mock(host)
- tc.expect(t, calculateHostTypeAffinityScore(host))
+ mockHost := resource.NewHost(mockRawHost)
+ mockTask := resource.NewTask(mockTaskID, mockTaskURL, mockTaskBackToSourceLimit, mockTaskURLMeta)
+ peer := resource.NewPeer(mockPeerID, mockTask, mockHost)
+ tc.mock(peer)
+ tc.expect(t, calculateHostTypeAffinityScore(peer))
})
}
}
2 changes: 1 addition & 1 deletion scheduler/scheduler/scheduler.go
@@ -144,10 +144,10 @@ func (s *scheduler) filterParents(peer *resource.Peer, blocklist set.SafeSet) []
}

parents = append(parents, parent)
peer.Log.Infof("parent %s is selected", parent.ID)
return true
})

peer.Log.Infof("candidate parents include %#v", parents)
return parents
}

2 changes: 0 additions & 2 deletions scheduler/service/callback.go
@@ -197,8 +197,6 @@ func (c *callback) PieceFail(ctx context.Context, peer *resource.Peer, piece *rp
// It’s not a case of back-to-source downloading failed,
// to help peer to reschedule the parent node
switch piece.Code {
- case base.Code_ClientWaitPieceReady:
- return
case base.Code_ClientPieceDownloadFail, base.Code_PeerTaskNotFound, base.Code_CDNError, base.Code_CDNTaskDownloadFail:
if err := parent.FSM.Event(resource.PeerEventDownloadFailed); err != nil {
peer.Log.Errorf("peer fsm event failed: %v", err)
84 changes: 0 additions & 84 deletions scheduler/service/callback_test.go
@@ -24,7 +24,6 @@ import (
"net/url"
"reflect"
"strconv"
"sync"
"testing"
"time"

@@ -361,26 +360,6 @@ func TestCallback_PieceFail(t *testing.T) {
assert.True(peer.FSM.Is(resource.PeerStateBackToSource))
},
},
{
name: "piece result code is Code_ClientWaitPieceReady",
piece: &rpcscheduler.PieceResult{
Code: base.Code_ClientWaitPieceReady,
DstPid: mockCDNPeerID,
},
peer: resource.NewPeer(mockPeerID, mockTask, mockHost),
parent: resource.NewPeer(mockCDNPeerID, mockTask, mockHost),
run: func(t *testing.T, callback Callback, peer *resource.Peer, parent *resource.Peer, piece *rpcscheduler.PieceResult, peerManager resource.PeerManager, cdn resource.CDN, ms *mocks.MockSchedulerMockRecorder, mr *resource.MockResourceMockRecorder, mp *resource.MockPeerManagerMockRecorder, mc *resource.MockCDNMockRecorder) {
peer.FSM.SetState(resource.PeerStateRunning)
gomock.InOrder(
mr.PeerManager().Return(peerManager).Times(1),
mp.Load(gomock.Eq(parent.ID)).Return(parent, true).Times(1),
)

callback.PieceFail(context.Background(), peer, piece)
assert := assert.New(t)
assert.True(peer.FSM.Is(resource.PeerStateRunning))
},
},
{
name: "can not found parent",
piece: &rpcscheduler.PieceResult{
@@ -452,69 +431,6 @@
assert.True(parent.FSM.Is(resource.PeerStateFailed))
},
},
{
name: "piece result code is Code_CDNTaskNotFound",
piece: &rpcscheduler.PieceResult{
Code: base.Code_CDNTaskNotFound,
DstPid: mockCDNPeerID,
},
peer: resource.NewPeer(mockPeerID, mockTask, mockHost),
parent: resource.NewPeer(mockCDNPeerID, mockTask, mockHost),
run: func(t *testing.T, callback Callback, peer *resource.Peer, parent *resource.Peer, piece *rpcscheduler.PieceResult, peerManager resource.PeerManager, cdn resource.CDN, ms *mocks.MockSchedulerMockRecorder, mr *resource.MockResourceMockRecorder, mp *resource.MockPeerManagerMockRecorder, mc *resource.MockCDNMockRecorder) {
peer.FSM.SetState(resource.PeerStateRunning)
parent.FSM.SetState(resource.PeerStateSucceeded)
blocklist := set.NewSafeSet()
blocklist.Add(parent.ID)
var wg sync.WaitGroup
wg.Add(2)
defer wg.Wait()

gomock.InOrder(
mr.PeerManager().Return(peerManager).Times(1),
mp.Load(gomock.Eq(parent.ID)).Return(parent, true).Times(1),
ms.ScheduleParent(gomock.Any(), gomock.Eq(peer), gomock.Eq(blocklist)).Return(nil, true).Times(1),
mr.CDN().Do(func() { wg.Done() }).Return(cdn).Times(1),
mc.TriggerTask(gomock.Any(), gomock.Any()).Do(func(ctx context.Context, task *resource.Task) { wg.Done() }).Return(parent, &rpcscheduler.PeerResult{}, nil).Times(1),
)

callback.PieceFail(context.Background(), peer, piece)
assert := assert.New(t)
assert.True(peer.FSM.Is(resource.PeerStateRunning))
assert.True(parent.FSM.Is(resource.PeerStatePending))
},
},
{
name: "piece result code is Code_ClientPieceNotFound and parent is CDN",
piece: &rpcscheduler.PieceResult{
Code: base.Code_ClientPieceNotFound,
DstPid: mockCDNPeerID,
},
peer: resource.NewPeer(mockPeerID, mockTask, mockHost),
parent: resource.NewPeer(mockCDNPeerID, mockTask, mockHost),
run: func(t *testing.T, callback Callback, peer *resource.Peer, parent *resource.Peer, piece *rpcscheduler.PieceResult, peerManager resource.PeerManager, cdn resource.CDN, ms *mocks.MockSchedulerMockRecorder, mr *resource.MockResourceMockRecorder, mp *resource.MockPeerManagerMockRecorder, mc *resource.MockCDNMockRecorder) {
peer.FSM.SetState(resource.PeerStateRunning)
parent.FSM.SetState(resource.PeerStateSucceeded)
peer.Host.IsCDN = true
blocklist := set.NewSafeSet()
blocklist.Add(parent.ID)
var wg sync.WaitGroup
wg.Add(2)
defer wg.Wait()

gomock.InOrder(
mr.PeerManager().Return(peerManager).Times(1),
mp.Load(gomock.Eq(parent.ID)).Return(parent, true).Times(1),
ms.ScheduleParent(gomock.Any(), gomock.Eq(peer), gomock.Eq(blocklist)).Return(nil, true).Times(1),
mr.CDN().Do(func() { wg.Done() }).Return(cdn).Times(1),
mc.TriggerTask(gomock.Any(), gomock.Any()).Do(func(ctx context.Context, task *resource.Task) { wg.Done() }).Return(parent, &rpcscheduler.PeerResult{}, nil).Times(1),
)

callback.PieceFail(context.Background(), peer, piece)
assert := assert.New(t)
assert.True(peer.FSM.Is(resource.PeerStateRunning))
assert.True(parent.FSM.Is(resource.PeerStatePending))
},
},
{
name: "piece result code is Code_ClientPieceNotFound and parent is not CDN",
piece: &rpcscheduler.PieceResult{
9 changes: 8 additions & 1 deletion scheduler/service/service.go
@@ -194,8 +194,15 @@ func (s *service) HandlePiece(ctx context.Context, peer *resource.Peer, piece *r
}
}

- // Handle piece download failed
+ // Handle piece download code
if piece.Code != base.Code_Success {
+ // FIXME(244372610) When dfdaemon download peer return empty, retry later.
+ if piece.Code == base.Code_ClientWaitPieceReady {
+ peer.Log.Infof("receive piece code %d and wait for dfdaemon piece ready", piece.Code)
+ return
+ }
+
+ // Handle piece download failed
peer.Log.Errorf("receive failed piece: %#v %#v", piece, piece.PieceInfo)
s.callback.PieceFail(ctx, peer, piece)
return
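
Reviewer note: this is the scheduler-side half of the change. HandlePiece now intercepts Code_ClientWaitPieceReady before the failure path, so the Code_ClientWaitPieceReady case removed from PieceFail above (and its test) is no longer needed, and the peer simply keeps its current state until dfdaemon has a piece ready to serve. A condensed sketch of the resulting control flow, with simplified stand-in types:

    package sketch

    type Code int

    const (
        CodeSuccess Code = iota
        CodeClientWaitPieceReady
        CodeClientPieceDownloadFail
    )

    type PieceResult struct{ Code Code }

    // shouldFailPiece mirrors the new HandlePiece branch: success and
    // "wait for dfdaemon piece ready" never reach the PieceFail callback.
    func shouldFailPiece(piece PieceResult) bool {
        switch piece.Code {
        case CodeSuccess:
            return false
        case CodeClientWaitPieceReady:
            return false // benign: dfdaemon has no piece ready yet, retry later
        default:
            return true // hand off to callback.PieceFail
        }
    }
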
14 changes: 14 additions & 0 deletions scheduler/service/service_test.go
@@ -590,6 +590,20 @@ func TestService_HandlePiece(t *testing.T) {
assert.True(peer.FSM.Is(resource.PeerStatePending))
},
},
{
name: "receive code is Code_ClientWaitPieceReady",
piece: &rpcscheduler.PieceResult{
Code: base.Code_ClientWaitPieceReady,
},
mock: func(mockPeer *resource.Peer, peerManager resource.PeerManager, mr *resource.MockResourceMockRecorder, mp *resource.MockPeerManagerMockRecorder) {
mockPeer.FSM.SetState(resource.PeerStateBackToSource)
},
expect: func(t *testing.T, peer *resource.Peer) {
assert := assert.New(t)
assert.Equal(peer.ID, mockPeerID)
assert.True(peer.FSM.Is(resource.PeerStateBackToSource))
},
},
{
name: "receive failed piece",
piece: &rpcscheduler.PieceResult{
