From 363b100a03e66c8d2c40c11758f7abe62f672b3a Mon Sep 17 00:00:00 2001 From: Gaius Date: Fri, 14 Jan 2022 13:51:07 +0800 Subject: [PATCH] feat: dfdaemon report successful piece before end of piece Signed-off-by: Gaius --- client/daemon/peer/peertask_conductor.go | 4 +- client/daemon/peer/piece_manager.go | 5 +- deploy/helm-charts | 2 +- .../scheduler/evaluator/evaluator_base.go | 17 ++-- .../evaluator/evaluator_base_test.go | 37 +++++--- scheduler/scheduler/scheduler.go | 2 +- scheduler/service/callback.go | 2 - scheduler/service/callback_test.go | 84 ------------------- scheduler/service/service.go | 9 +- scheduler/service/service_test.go | 14 ++++ 10 files changed, 65 insertions(+), 111 deletions(-) diff --git a/client/daemon/peer/peertask_conductor.go b/client/daemon/peer/peertask_conductor.go index 0283f0f827f..690612fcd51 100644 --- a/client/daemon/peer/peertask_conductor.go +++ b/client/daemon/peer/peertask_conductor.go @@ -630,8 +630,8 @@ func (pt *peerTaskConductor) pullSinglePiece() { } if result, err := pt.pieceManager.DownloadPiece(ctx, request); err == nil { - pt.PublishPieceInfo(request.piece.PieceNum, request.piece.RangeSize) pt.reportSuccessResult(request, result) + pt.PublishPieceInfo(request.piece.PieceNum, request.piece.RangeSize) span.SetAttributes(config.AttributePieceSuccess.Bool(true)) span.End() @@ -924,8 +924,8 @@ func (pt *peerTaskConductor) downloadPieceWorker(id int32, requests chan *Downlo continue } else { // broadcast success piece - pt.PublishPieceInfo(request.piece.PieceNum, request.piece.RangeSize) pt.reportSuccessResult(request, result) + pt.PublishPieceInfo(request.piece.PieceNum, request.piece.RangeSize) } span.SetAttributes(config.AttributePieceSuccess.Bool(true)) diff --git a/client/daemon/peer/piece_manager.go b/client/daemon/peer/piece_manager.go index 891eafb2bf1..dda13b6da1b 100644 --- a/client/daemon/peer/piece_manager.go +++ b/client/daemon/peer/piece_manager.go @@ -371,8 +371,8 @@ func (pm *pieceManager) downloadKnownLengthSource(ctx context.Context, pt Task, return err } } - pt.PublishPieceInfo(pieceNum, uint32(result.Size)) pt.ReportPieceResult(request, result, nil) + pt.PublishPieceInfo(pieceNum, uint32(result.Size)) } log.Infof("download from source ok") @@ -417,6 +417,7 @@ func (pm *pieceManager) downloadUnknownLengthSource(ctx context.Context, pt Task return err } if result.Size == int64(size) { + pt.ReportPieceResult(request, result, nil) pt.PublishPieceInfo(pieceNum, uint32(result.Size)) continue } else if result.Size > int64(size) { @@ -443,8 +444,8 @@ func (pm *pieceManager) downloadUnknownLengthSource(ctx context.Context, pt Task pt.ReportPieceResult(request, result, err) return err } - pt.PublishPieceInfo(pieceNum, uint32(result.Size)) pt.ReportPieceResult(request, result, nil) + pt.PublishPieceInfo(pieceNum, uint32(result.Size)) break } diff --git a/deploy/helm-charts b/deploy/helm-charts index da3833f7e23..89fe4019f4b 160000 --- a/deploy/helm-charts +++ b/deploy/helm-charts @@ -1 +1 @@ -Subproject commit da3833f7e236ab07e0abcefd02ad34d8681cb58e +Subproject commit 89fe4019f4bada07237a6b120425d6f3369eccb1 diff --git a/scheduler/scheduler/evaluator/evaluator_base.go b/scheduler/scheduler/evaluator/evaluator_base.go index 604284b19c8..d2e94c1f926 100644 --- a/scheduler/scheduler/evaluator/evaluator_base.go +++ b/scheduler/scheduler/evaluator/evaluator_base.go @@ -86,7 +86,7 @@ func (eb *evaluatorBase) Evaluate(parent *resource.Peer, child *resource.Peer, t return finishedPieceWeight*calculatePieceScore(parent, child, totalPieceCount) + freeLoadWeight*calculateFreeLoadScore(parent.Host) + - hostTypeAffinityWeight*calculateHostTypeAffinityScore(parent.Host) + + hostTypeAffinityWeight*calculateHostTypeAffinityScore(parent) + idcAffinityWeight*calculateIDCAffinityScore(parent.Host, child.Host) + netTopologyAffinityWeight*calculateMultiElementAffinityScore(parent.Host.NetTopology, child.Host.NetTopology) + locationAffinityWeight*calculateMultiElementAffinityScore(parent.Host.Location, child.Host.Location) @@ -116,14 +116,19 @@ func calculateFreeLoadScore(host *resource.Host) float64 { } // calculateHostTypeAffinityScore 0.0~1.0 larger and better -func calculateHostTypeAffinityScore(host *resource.Host) float64 { - // The selected priority of CDN is lower, - // because CDN download resources are reserved for the first download as much as possible - if host.IsCDN { +func calculateHostTypeAffinityScore(peer *resource.Peer) float64 { + // When the task is downloaded for the first time, + // peer will be scheduled to cdn first, + // otherwise it will be scheduled to dfdaemon first + if peer.Host.IsCDN { + if peer.FSM.Is(resource.PeerStateRunning) { + return maxScore + } + return minScore } - return maxScore + return maxScore * 0.5 } // calculateIDCAffinityScore 0.0~1.0 larger and better diff --git a/scheduler/scheduler/evaluator/evaluator_base_test.go b/scheduler/scheduler/evaluator/evaluator_base_test.go index 01e8c943fe3..e171b32477e 100644 --- a/scheduler/scheduler/evaluator/evaluator_base_test.go +++ b/scheduler/scheduler/evaluator/evaluator_base_test.go @@ -116,7 +116,7 @@ func TestEvaluatorBase_Evaluate(t *testing.T) { }, expect: func(t *testing.T, score float64) { assert := assert.New(t) - assert.Equal(score, float64(1)) + assert.Equal(score, float64(0.925)) }, }, { @@ -131,7 +131,7 @@ func TestEvaluatorBase_Evaluate(t *testing.T) { }, expect: func(t *testing.T, score float64) { assert := assert.New(t) - assert.Equal(score, float64(1)) + assert.Equal(score, float64(0.925)) }, }, { @@ -146,7 +146,7 @@ func TestEvaluatorBase_Evaluate(t *testing.T) { }, expect: func(t *testing.T, score float64) { assert := assert.New(t) - assert.Equal(score, float64(1)) + assert.Equal(score, float64(0.925)) }, }, } @@ -317,34 +317,47 @@ func TestEvaluatorBase_calculateFreeLoadScore(t *testing.T) { func TestEvaluatorBase_calculateHostTypeAffinityScore(t *testing.T) { tests := []struct { name string - mock func(host *resource.Host) + mock func(peer *resource.Peer) expect func(t *testing.T, score float64) }{ { name: "host is normal peer", - mock: func(host *resource.Host) {}, + mock: func(peer *resource.Peer) {}, expect: func(t *testing.T, score float64) { assert := assert.New(t) - assert.Equal(score, float64(1)) + assert.Equal(score, float64(0.5)) }, }, { - name: "host is cdn", - mock: func(host *resource.Host) { - host.IsCDN = true + name: "host is cdn but peer state is not PeerStateRunning", + mock: func(peer *resource.Peer) { + peer.Host.IsCDN = true }, expect: func(t *testing.T, score float64) { assert := assert.New(t) assert.Equal(score, float64(0)) }, }, + { + name: "host is cdn but peer state is PeerStateRunning", + mock: func(peer *resource.Peer) { + peer.Host.IsCDN = true + peer.FSM.SetState(resource.PeerStateRunning) + }, + expect: func(t *testing.T, score float64) { + assert := assert.New(t) + assert.Equal(score, float64(1)) + }, + }, } for _, tc := range tests { t.Run(tc.name, func(t *testing.T) { - host := resource.NewHost(mockRawHost) - tc.mock(host) - tc.expect(t, calculateHostTypeAffinityScore(host)) + mockHost := resource.NewHost(mockRawHost) + mockTask := resource.NewTask(mockTaskID, mockTaskURL, mockTaskBackToSourceLimit, mockTaskURLMeta) + peer := resource.NewPeer(mockPeerID, mockTask, mockHost) + tc.mock(peer) + tc.expect(t, calculateHostTypeAffinityScore(peer)) }) } } diff --git a/scheduler/scheduler/scheduler.go b/scheduler/scheduler/scheduler.go index a8c81b3f130..04a62312623 100644 --- a/scheduler/scheduler/scheduler.go +++ b/scheduler/scheduler/scheduler.go @@ -144,10 +144,10 @@ func (s *scheduler) filterParents(peer *resource.Peer, blocklist set.SafeSet) [] } parents = append(parents, parent) - peer.Log.Infof("parent %s is selected", parent.ID) return true }) + peer.Log.Infof("candidate parents include %#v", parents) return parents } diff --git a/scheduler/service/callback.go b/scheduler/service/callback.go index 45771f7bd8b..3328ca10b2c 100644 --- a/scheduler/service/callback.go +++ b/scheduler/service/callback.go @@ -197,8 +197,6 @@ func (c *callback) PieceFail(ctx context.Context, peer *resource.Peer, piece *rp // It’s not a case of back-to-source downloading failed, // to help peer to reschedule the parent node switch piece.Code { - case base.Code_ClientWaitPieceReady: - return case base.Code_ClientPieceDownloadFail, base.Code_PeerTaskNotFound, base.Code_CDNError, base.Code_CDNTaskDownloadFail: if err := parent.FSM.Event(resource.PeerEventDownloadFailed); err != nil { peer.Log.Errorf("peer fsm event failed: %v", err) diff --git a/scheduler/service/callback_test.go b/scheduler/service/callback_test.go index bc3fe1949f5..d234c1dd221 100644 --- a/scheduler/service/callback_test.go +++ b/scheduler/service/callback_test.go @@ -24,7 +24,6 @@ import ( "net/url" "reflect" "strconv" - "sync" "testing" "time" @@ -361,26 +360,6 @@ func TestCallback_PieceFail(t *testing.T) { assert.True(peer.FSM.Is(resource.PeerStateBackToSource)) }, }, - { - name: "piece result code is Code_ClientWaitPieceReady", - piece: &rpcscheduler.PieceResult{ - Code: base.Code_ClientWaitPieceReady, - DstPid: mockCDNPeerID, - }, - peer: resource.NewPeer(mockPeerID, mockTask, mockHost), - parent: resource.NewPeer(mockCDNPeerID, mockTask, mockHost), - run: func(t *testing.T, callback Callback, peer *resource.Peer, parent *resource.Peer, piece *rpcscheduler.PieceResult, peerManager resource.PeerManager, cdn resource.CDN, ms *mocks.MockSchedulerMockRecorder, mr *resource.MockResourceMockRecorder, mp *resource.MockPeerManagerMockRecorder, mc *resource.MockCDNMockRecorder) { - peer.FSM.SetState(resource.PeerStateRunning) - gomock.InOrder( - mr.PeerManager().Return(peerManager).Times(1), - mp.Load(gomock.Eq(parent.ID)).Return(parent, true).Times(1), - ) - - callback.PieceFail(context.Background(), peer, piece) - assert := assert.New(t) - assert.True(peer.FSM.Is(resource.PeerStateRunning)) - }, - }, { name: "can not found parent", piece: &rpcscheduler.PieceResult{ @@ -452,69 +431,6 @@ func TestCallback_PieceFail(t *testing.T) { assert.True(parent.FSM.Is(resource.PeerStateFailed)) }, }, - { - name: "piece result code is Code_CDNTaskNotFound", - piece: &rpcscheduler.PieceResult{ - Code: base.Code_CDNTaskNotFound, - DstPid: mockCDNPeerID, - }, - peer: resource.NewPeer(mockPeerID, mockTask, mockHost), - parent: resource.NewPeer(mockCDNPeerID, mockTask, mockHost), - run: func(t *testing.T, callback Callback, peer *resource.Peer, parent *resource.Peer, piece *rpcscheduler.PieceResult, peerManager resource.PeerManager, cdn resource.CDN, ms *mocks.MockSchedulerMockRecorder, mr *resource.MockResourceMockRecorder, mp *resource.MockPeerManagerMockRecorder, mc *resource.MockCDNMockRecorder) { - peer.FSM.SetState(resource.PeerStateRunning) - parent.FSM.SetState(resource.PeerStateSucceeded) - blocklist := set.NewSafeSet() - blocklist.Add(parent.ID) - var wg sync.WaitGroup - wg.Add(2) - defer wg.Wait() - - gomock.InOrder( - mr.PeerManager().Return(peerManager).Times(1), - mp.Load(gomock.Eq(parent.ID)).Return(parent, true).Times(1), - ms.ScheduleParent(gomock.Any(), gomock.Eq(peer), gomock.Eq(blocklist)).Return(nil, true).Times(1), - mr.CDN().Do(func() { wg.Done() }).Return(cdn).Times(1), - mc.TriggerTask(gomock.Any(), gomock.Any()).Do(func(ctx context.Context, task *resource.Task) { wg.Done() }).Return(parent, &rpcscheduler.PeerResult{}, nil).Times(1), - ) - - callback.PieceFail(context.Background(), peer, piece) - assert := assert.New(t) - assert.True(peer.FSM.Is(resource.PeerStateRunning)) - assert.True(parent.FSM.Is(resource.PeerStatePending)) - }, - }, - { - name: "piece result code is Code_ClientPieceNotFound and parent is CDN", - piece: &rpcscheduler.PieceResult{ - Code: base.Code_ClientPieceNotFound, - DstPid: mockCDNPeerID, - }, - peer: resource.NewPeer(mockPeerID, mockTask, mockHost), - parent: resource.NewPeer(mockCDNPeerID, mockTask, mockHost), - run: func(t *testing.T, callback Callback, peer *resource.Peer, parent *resource.Peer, piece *rpcscheduler.PieceResult, peerManager resource.PeerManager, cdn resource.CDN, ms *mocks.MockSchedulerMockRecorder, mr *resource.MockResourceMockRecorder, mp *resource.MockPeerManagerMockRecorder, mc *resource.MockCDNMockRecorder) { - peer.FSM.SetState(resource.PeerStateRunning) - parent.FSM.SetState(resource.PeerStateSucceeded) - peer.Host.IsCDN = true - blocklist := set.NewSafeSet() - blocklist.Add(parent.ID) - var wg sync.WaitGroup - wg.Add(2) - defer wg.Wait() - - gomock.InOrder( - mr.PeerManager().Return(peerManager).Times(1), - mp.Load(gomock.Eq(parent.ID)).Return(parent, true).Times(1), - ms.ScheduleParent(gomock.Any(), gomock.Eq(peer), gomock.Eq(blocklist)).Return(nil, true).Times(1), - mr.CDN().Do(func() { wg.Done() }).Return(cdn).Times(1), - mc.TriggerTask(gomock.Any(), gomock.Any()).Do(func(ctx context.Context, task *resource.Task) { wg.Done() }).Return(parent, &rpcscheduler.PeerResult{}, nil).Times(1), - ) - - callback.PieceFail(context.Background(), peer, piece) - assert := assert.New(t) - assert.True(peer.FSM.Is(resource.PeerStateRunning)) - assert.True(parent.FSM.Is(resource.PeerStatePending)) - }, - }, { name: "piece result code is Code_ClientPieceNotFound and parent is not CDN", piece: &rpcscheduler.PieceResult{ diff --git a/scheduler/service/service.go b/scheduler/service/service.go index 4fd0fed4191..0bcb15ced4a 100644 --- a/scheduler/service/service.go +++ b/scheduler/service/service.go @@ -194,8 +194,15 @@ func (s *service) HandlePiece(ctx context.Context, peer *resource.Peer, piece *r } } - // Handle piece download failed + // Handle piece download code if piece.Code != base.Code_Success { + // FIXME(244372610) When dfdaemon download peer return empty, retry later. + if piece.Code == base.Code_ClientWaitPieceReady { + peer.Log.Infof("receive piece code %d and wait for dfdaemon piece ready", piece.Code) + return + } + + // Handle piece download failed peer.Log.Errorf("receive failed piece: %#v %#v", piece, piece.PieceInfo) s.callback.PieceFail(ctx, peer, piece) return diff --git a/scheduler/service/service_test.go b/scheduler/service/service_test.go index 5b9bd40ff8d..4475732d0f8 100644 --- a/scheduler/service/service_test.go +++ b/scheduler/service/service_test.go @@ -590,6 +590,20 @@ func TestService_HandlePiece(t *testing.T) { assert.True(peer.FSM.Is(resource.PeerStatePending)) }, }, + { + name: "receive code is Code_ClientWaitPieceReady", + piece: &rpcscheduler.PieceResult{ + Code: base.Code_ClientWaitPieceReady, + }, + mock: func(mockPeer *resource.Peer, peerManager resource.PeerManager, mr *resource.MockResourceMockRecorder, mp *resource.MockPeerManagerMockRecorder) { + mockPeer.FSM.SetState(resource.PeerStateBackToSource) + }, + expect: func(t *testing.T, peer *resource.Peer) { + assert := assert.New(t) + assert.Equal(peer.ID, mockPeerID) + assert.True(peer.FSM.Is(resource.PeerStateBackToSource)) + }, + }, { name: "receive failed piece", piece: &rpcscheduler.PieceResult{