Skip to content

Commit

Permalink
fix: when peer state is PeerStateSucceeded, return size scope is small (
Browse files Browse the repository at this point in the history
#1103)

Signed-off-by: Gaius <[email protected]>
  • Loading branch information
gaius-qi authored Feb 28, 2022
1 parent adeb471 commit e78232d
Show file tree
Hide file tree
Showing 2 changed files with 63 additions and 2 deletions.
19 changes: 17 additions & 2 deletions scheduler/service/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,8 +107,7 @@ func (s *Service) RegisterPeerTask(ctx context.Context, req *rpcscheduler.PeerTa
fallthrough
case base.SizeScope_SMALL:
peer.Log.Info("task size scope is small")
// If the file is registered as a small type,
// there is no need to build a tree, just find the parent and return
// There is no need to build a tree, just find the parent and return
parent, ok := s.scheduler.FindParent(ctx, peer, set.NewSafeSet())
if !ok {
peer.Log.Warn("task size scope is small and it can not select parent")
Expand All @@ -124,6 +123,22 @@ func (s *Service) RegisterPeerTask(ctx context.Context, req *rpcscheduler.PeerTa
}, nil
}

// When task size scope is small, parent must be downloaded successfully
// before returning to the parent directly
if !parent.FSM.Is(resource.PeerStateSucceeded) {
peer.Log.Infof("task size scope is small and download state %s is not PeerStateSucceeded", parent.FSM.Current())
if err := peer.FSM.Event(resource.PeerEventRegisterNormal); err != nil {
dferr := dferrors.New(base.Code_SchedError, err.Error())
peer.Log.Errorf("peer %s register is failed: %v", req.PeerId, err)
return nil, dferr
}

return &rpcscheduler.RegisterResult{
TaskId: task.ID,
SizeScope: base.SizeScope_NORMAL,
}, nil
}

firstPiece, ok := task.LoadPiece(0)
if !ok {
peer.Log.Warn("task size scope is small and it can not get first piece")
Expand Down
46 changes: 46 additions & 0 deletions scheduler/service/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -346,6 +346,46 @@ func TestService_RegisterPeerTask(t *testing.T) {
assert.True(peer.FSM.Is(resource.PeerStateReceivedNormal))
},
},
{
name: "task scope size is SizeScope_SMALL and load piece error, parent state is PeerStateRunning",
req: &rpcscheduler.PeerTaskRequest{
PeerHost: &rpcscheduler.PeerHost{
Uuid: mockRawHost.Uuid,
},
},
mock: func(
req *rpcscheduler.PeerTaskRequest, mockPeer *resource.Peer, mockCDNPeer *resource.Peer,
scheduler scheduler.Scheduler, res resource.Resource, hostManager resource.HostManager, taskManager resource.TaskManager, peerManager resource.PeerManager,
ms *mocks.MockSchedulerMockRecorder, mr *resource.MockResourceMockRecorder, mh *resource.MockHostManagerMockRecorder, mt *resource.MockTaskManagerMockRecorder, mp *resource.MockPeerManagerMockRecorder,
) {
mockPeer.Task.FSM.SetState(resource.TaskStateSucceeded)
mockPeer.Task.StorePeer(mockCDNPeer)
mockPeer.Task.ContentLength.Store(129)
mockPeer.Task.StorePiece(&base.PieceInfo{
PieceNum: 0,
})
mockPeer.Task.TotalPieceCount.Store(1)
mockPeer.FSM.SetState(resource.PeerStatePending)
mockCDNPeer.FSM.SetState(resource.PeerStateRunning)

gomock.InOrder(
mr.TaskManager().Return(taskManager).Times(1),
mt.LoadOrStore(gomock.Any()).Return(mockPeer.Task, true).Times(1),
mr.HostManager().Return(hostManager).Times(1),
mh.Load(gomock.Eq(mockPeer.Host.ID)).Return(mockPeer.Host, true).Times(1),
mr.PeerManager().Return(peerManager).Times(1),
mp.LoadOrStore(gomock.Any()).Return(mockPeer, true).Times(1),
ms.FindParent(gomock.Any(), gomock.Any(), gomock.Any()).Return(mockCDNPeer, true).Times(1),
)
},
expect: func(t *testing.T, peer *resource.Peer, result *rpcscheduler.RegisterResult, err error) {
assert := assert.New(t)
assert.NoError(err)
assert.Equal(result.TaskId, peer.Task.ID)
assert.Equal(result.SizeScope, base.SizeScope_NORMAL)
assert.True(peer.FSM.Is(resource.PeerStateReceivedNormal))
},
},
{
name: "task scope size is SizeScope_SMALL and load piece error, peer state is PeerStateFailed",
req: &rpcscheduler.PeerTaskRequest{
Expand All @@ -366,6 +406,8 @@ func TestService_RegisterPeerTask(t *testing.T) {
})
mockPeer.Task.TotalPieceCount.Store(1)
mockPeer.FSM.SetState(resource.PeerStateFailed)
mockCDNPeer.FSM.SetState(resource.PeerStateSucceeded)

gomock.InOrder(
mr.TaskManager().Return(taskManager).Times(1),
mt.LoadOrStore(gomock.Any()).Return(mockPeer.Task, true).Times(1),
Expand Down Expand Up @@ -403,6 +445,8 @@ func TestService_RegisterPeerTask(t *testing.T) {
})
mockPeer.Task.TotalPieceCount.Store(1)
mockPeer.FSM.SetState(resource.PeerStateFailed)
mockCDNPeer.FSM.SetState(resource.PeerStateSucceeded)

gomock.InOrder(
mr.TaskManager().Return(taskManager).Times(1),
mt.LoadOrStore(gomock.Any()).Return(mockPeer.Task, true).Times(1),
Expand Down Expand Up @@ -439,6 +483,8 @@ func TestService_RegisterPeerTask(t *testing.T) {
PieceNum: 0,
})
mockPeer.Task.TotalPieceCount.Store(1)
mockCDNPeer.FSM.SetState(resource.PeerStateSucceeded)

gomock.InOrder(
mr.TaskManager().Return(taskManager).Times(1),
mt.LoadOrStore(gomock.Any()).Return(mockPeer.Task, true).Times(1),
Expand Down

0 comments on commit e78232d

Please sign in to comment.