diff --git a/scheduler/service/service.go b/scheduler/service/service.go index 0a159067660..cadb4189f60 100644 --- a/scheduler/service/service.go +++ b/scheduler/service/service.go @@ -107,8 +107,7 @@ func (s *Service) RegisterPeerTask(ctx context.Context, req *rpcscheduler.PeerTa fallthrough case base.SizeScope_SMALL: peer.Log.Info("task size scope is small") - // If the file is registered as a small type, - // there is no need to build a tree, just find the parent and return + // There is no need to build a tree, just find the parent and return parent, ok := s.scheduler.FindParent(ctx, peer, set.NewSafeSet()) if !ok { peer.Log.Warn("task size scope is small and it can not select parent") @@ -124,6 +123,22 @@ func (s *Service) RegisterPeerTask(ctx context.Context, req *rpcscheduler.PeerTa }, nil } + // When task size scope is small, parent must be downloaded successfully + // before returning to the parent directly + if !parent.FSM.Is(resource.PeerStateSucceeded) { + peer.Log.Infof("task size scope is small and download state %s is not PeerStateSucceeded", parent.FSM.Current()) + if err := peer.FSM.Event(resource.PeerEventRegisterNormal); err != nil { + dferr := dferrors.New(base.Code_SchedError, err.Error()) + peer.Log.Errorf("peer %s register is failed: %v", req.PeerId, err) + return nil, dferr + } + + return &rpcscheduler.RegisterResult{ + TaskId: task.ID, + SizeScope: base.SizeScope_NORMAL, + }, nil + } + firstPiece, ok := task.LoadPiece(0) if !ok { peer.Log.Warn("task size scope is small and it can not get first piece") diff --git a/scheduler/service/service_test.go b/scheduler/service/service_test.go index 0959b9394c9..0f2deff96a4 100644 --- a/scheduler/service/service_test.go +++ b/scheduler/service/service_test.go @@ -346,6 +346,46 @@ func TestService_RegisterPeerTask(t *testing.T) { assert.True(peer.FSM.Is(resource.PeerStateReceivedNormal)) }, }, + { + name: "task scope size is SizeScope_SMALL and load piece error, parent state is PeerStateRunning", + req: &rpcscheduler.PeerTaskRequest{ + PeerHost: &rpcscheduler.PeerHost{ + Uuid: mockRawHost.Uuid, + }, + }, + mock: func( + req *rpcscheduler.PeerTaskRequest, mockPeer *resource.Peer, mockCDNPeer *resource.Peer, + scheduler scheduler.Scheduler, res resource.Resource, hostManager resource.HostManager, taskManager resource.TaskManager, peerManager resource.PeerManager, + ms *mocks.MockSchedulerMockRecorder, mr *resource.MockResourceMockRecorder, mh *resource.MockHostManagerMockRecorder, mt *resource.MockTaskManagerMockRecorder, mp *resource.MockPeerManagerMockRecorder, + ) { + mockPeer.Task.FSM.SetState(resource.TaskStateSucceeded) + mockPeer.Task.StorePeer(mockCDNPeer) + mockPeer.Task.ContentLength.Store(129) + mockPeer.Task.StorePiece(&base.PieceInfo{ + PieceNum: 0, + }) + mockPeer.Task.TotalPieceCount.Store(1) + mockPeer.FSM.SetState(resource.PeerStatePending) + mockCDNPeer.FSM.SetState(resource.PeerStateRunning) + + gomock.InOrder( + mr.TaskManager().Return(taskManager).Times(1), + mt.LoadOrStore(gomock.Any()).Return(mockPeer.Task, true).Times(1), + mr.HostManager().Return(hostManager).Times(1), + mh.Load(gomock.Eq(mockPeer.Host.ID)).Return(mockPeer.Host, true).Times(1), + mr.PeerManager().Return(peerManager).Times(1), + mp.LoadOrStore(gomock.Any()).Return(mockPeer, true).Times(1), + ms.FindParent(gomock.Any(), gomock.Any(), gomock.Any()).Return(mockCDNPeer, true).Times(1), + ) + }, + expect: func(t *testing.T, peer *resource.Peer, result *rpcscheduler.RegisterResult, err error) { + assert := assert.New(t) + assert.NoError(err) + assert.Equal(result.TaskId, peer.Task.ID) + assert.Equal(result.SizeScope, base.SizeScope_NORMAL) + assert.True(peer.FSM.Is(resource.PeerStateReceivedNormal)) + }, + }, { name: "task scope size is SizeScope_SMALL and load piece error, peer state is PeerStateFailed", req: &rpcscheduler.PeerTaskRequest{ @@ -366,6 +406,8 @@ func TestService_RegisterPeerTask(t *testing.T) { }) mockPeer.Task.TotalPieceCount.Store(1) mockPeer.FSM.SetState(resource.PeerStateFailed) + mockCDNPeer.FSM.SetState(resource.PeerStateSucceeded) + gomock.InOrder( mr.TaskManager().Return(taskManager).Times(1), mt.LoadOrStore(gomock.Any()).Return(mockPeer.Task, true).Times(1), @@ -403,6 +445,8 @@ func TestService_RegisterPeerTask(t *testing.T) { }) mockPeer.Task.TotalPieceCount.Store(1) mockPeer.FSM.SetState(resource.PeerStateFailed) + mockCDNPeer.FSM.SetState(resource.PeerStateSucceeded) + gomock.InOrder( mr.TaskManager().Return(taskManager).Times(1), mt.LoadOrStore(gomock.Any()).Return(mockPeer.Task, true).Times(1), @@ -439,6 +483,8 @@ func TestService_RegisterPeerTask(t *testing.T) { PieceNum: 0, }) mockPeer.Task.TotalPieceCount.Store(1) + mockCDNPeer.FSM.SetState(resource.PeerStateSucceeded) + gomock.InOrder( mr.TaskManager().Return(taskManager).Times(1), mt.LoadOrStore(gomock.Any()).Return(mockPeer.Task, true).Times(1),