From b7615239f48531a6a381f8410ba9be52523605c5 Mon Sep 17 00:00:00 2001
From: Gaius
Date: Fri, 15 Apr 2022 12:19:06 +0800
Subject: [PATCH 1/2] test: AnnounceTask and StatTask

Signed-off-by: Gaius
---
 scheduler/job/job.go               |  24 +-
 scheduler/resource/host.go         |   4 +-
 scheduler/resource/peer_manager.go |   2 +-
 scheduler/resource/task.go         |   4 +-
 scheduler/rpcserver/rpcserver.go   |  12 +-
 scheduler/scheduler.go             |  16 +-
 scheduler/scheduler/scheduler.go   |   8 +-
 scheduler/service/service.go       | 161 +++++++-------
 scheduler/service/service_test.go  | 338 +++++++++++++++++++++++++++++
 9 files changed, 455 insertions(+), 114 deletions(-)

diff --git a/scheduler/job/job.go b/scheduler/job/job.go
index 4514927ae2d..4013d703404 100644
--- a/scheduler/job/job.go
+++ b/scheduler/job/job.go
@@ -57,27 +57,27 @@ func New(cfg *config.Config, resource resource.Resource) (Job, error) {
 
 	globalJob, err := internaljob.New(redisConfig, internaljob.GlobalQueue)
 	if err != nil {
-		logger.Errorf("create global job queue error: %v", err)
+		logger.Errorf("create global job queue error: %s", err.Error())
 		return nil, err
 	}
 	logger.Infof("create global job queue: %v", globalJob)
 
 	schedulerJob, err := internaljob.New(redisConfig, internaljob.SchedulersQueue)
 	if err != nil {
-		logger.Errorf("create scheduler job queue error: %v", err)
+		logger.Errorf("create scheduler job queue error: %s", err.Error())
 		return nil, err
 	}
 	logger.Infof("create scheduler job queue: %v", schedulerJob)
 
 	localQueue, err := internaljob.GetSchedulerQueue(cfg.Manager.SchedulerClusterID, cfg.Server.Host)
 	if err != nil {
-		logger.Errorf("get local job queue name error: %v", err)
+		logger.Errorf("get local job queue name error: %s", err.Error())
 		return nil, err
 	}
 
 	localJob, err := internaljob.New(redisConfig, localQueue)
 	if err != nil {
-		logger.Errorf("create local job queue error: %v", err)
+		logger.Errorf("create local job queue error: %s", err.Error())
 		return nil, err
 	}
 	logger.Infof("create local job queue: %v", localQueue)
@@ -95,7 +95,7 @@ func New(cfg *config.Config, resource resource.Resource) (Job, error) {
 	}
 
 	if err := localJob.RegisterJob(namedJobFuncs); err != nil {
-		logger.Errorf("register preheat job to local queue error: %v", err)
+		logger.Errorf("register preheat job to local queue error: %s", err.Error())
 		return nil, err
 	}
 
@@ -106,21 +106,21 @@ func (j *job) Serve() {
 	go func() {
 		logger.Infof("ready to launch %d worker(s) on global queue", j.config.Job.GlobalWorkerNum)
 		if err := j.globalJob.LaunchWorker("global_worker", int(j.config.Job.GlobalWorkerNum)); err != nil {
-			logger.Fatalf("global queue worker error: %v", err)
+			logger.Fatalf("global queue worker error: %s", err.Error())
 		}
 	}()
 
 	go func() {
 		logger.Infof("ready to launch %d worker(s) on scheduler queue", j.config.Job.SchedulerWorkerNum)
 		if err := j.schedulerJob.LaunchWorker("scheduler_worker", int(j.config.Job.SchedulerWorkerNum)); err != nil {
-			logger.Fatalf("scheduler queue worker error: %v", err)
+			logger.Fatalf("scheduler queue worker error: %s", err.Error())
 		}
 	}()
 
 	go func() {
 		logger.Infof("ready to launch %d worker(s) on local queue", j.config.Job.LocalWorkerNum)
 		if err := j.localJob.LaunchWorker("local_worker", int(j.config.Job.LocalWorkerNum)); err != nil {
-			logger.Fatalf("scheduler queue worker error: %v", err)
+			logger.Fatalf("scheduler queue worker error: %s", err.Error())
 		}
 	}()
 }
@@ -138,12 +138,12 @@ func (j *job) preheat(ctx context.Context, req string) error {
 
 	request := &internaljob.PreheatRequest{}
 	if err := internaljob.UnmarshalRequest(req, request); err != nil {
-		logger.Errorf("unmarshal request err: %v, request body: %s", err, req)
+		logger.Errorf("unmarshal request err: %s, request body: %s", err.Error(), req)
 		return err
 	}
 
 	if err := validator.New().Struct(request); err != nil {
-		logger.Errorf("url %s validate failed: %v", request.URL, err)
+		logger.Errorf("url %s validate failed: %s", request.URL, err.Error())
 		return err
 	}
 
@@ -172,14 +172,14 @@ func (j *job) preheat(ctx context.Context, req string) error {
 		UrlMeta: urlMeta,
 	})
 	if err != nil {
-		log.Errorf("preheat failed: %v", err)
+		log.Errorf("preheat failed: %s", err.Error())
 		return err
 	}
 
 	for {
 		piece, err := stream.Recv()
 		if err != nil {
-			log.Errorf("preheat recive piece failed: %v", err)
+			log.Errorf("preheat receive piece failed: %s", err.Error())
 			return err
 		}
diff --git a/scheduler/resource/host.go b/scheduler/resource/host.go
index 6f9b581d6db..62c78540a91 100644
--- a/scheduler/resource/host.go
+++ b/scheduler/resource/host.go
@@ -170,12 +170,12 @@ func (h *Host) LeavePeers() {
 	h.Peers.Range(func(_, value interface{}) bool {
 		if peer, ok := value.(*Peer); ok {
 			if err := peer.FSM.Event(PeerEventDownloadFailed); err != nil {
-				peer.Log.Errorf("peer fsm event failed: %v", err)
+				peer.Log.Errorf("peer fsm event failed: %s", err.Error())
 				return true
 			}
 
 			if err := peer.FSM.Event(PeerEventLeave); err != nil {
-				peer.Log.Errorf("peer fsm event failed: %v", err)
+				peer.Log.Errorf("peer fsm event failed: %s", err.Error())
 				return true
 			}
diff --git a/scheduler/resource/peer_manager.go b/scheduler/resource/peer_manager.go
index cb157d4c103..ad8e3666bc3 100644
--- a/scheduler/resource/peer_manager.go
+++ b/scheduler/resource/peer_manager.go
@@ -141,7 +141,7 @@ func (p *peerManager) RunGC() error {
 			// If the peer is not leave,
 			// first change the state to PeerEventLeave
 			if err := peer.FSM.Event(PeerEventLeave); err != nil {
-				peer.Log.Errorf("peer fsm event failed: %v", err)
+				peer.Log.Errorf("peer fsm event failed: %s", err.Error())
 			}
 
 			peer.Log.Info("gc causes the peer to leave")
diff --git a/scheduler/resource/task.go b/scheduler/resource/task.go
index 24a9d1bff28..0579ba3b9e9 100644
--- a/scheduler/resource/task.go
+++ b/scheduler/resource/task.go
@@ -337,13 +337,13 @@ func (t *Task) NotifyPeers(code base.Code, event string) {
 			}
 
 			if err := stream.Send(&rpcscheduler.PeerPacket{Code: code}); err != nil {
-				t.Log.Errorf("send packet to peer %s failed: %v", peer.ID, err)
+				t.Log.Errorf("send packet to peer %s failed: %s", peer.ID, err.Error())
 				return true
 			}
 			t.Log.Infof("task notify peer %s code %s", peer.ID, code)
 
 			if err := peer.FSM.Event(event); err != nil {
-				peer.Log.Errorf("peer fsm event failed: %v", err)
+				peer.Log.Errorf("peer fsm event failed: %s", err.Error())
 				return true
 			}
 		}
diff --git a/scheduler/rpcserver/rpcserver.go b/scheduler/rpcserver/rpcserver.go
index 63b2301f627..fbae58b117f 100644
--- a/scheduler/rpcserver/rpcserver.go
+++ b/scheduler/rpcserver/rpcserver.go
@@ -83,12 +83,7 @@ func (s *Server) ReportPeerResult(ctx context.Context, req *scheduler.PeerResult
 	return new(empty.Empty), s.service.ReportPeerResult(ctx, req)
 }
 
-// LeaveTask makes the peer unschedulable
-func (s *Server) LeaveTask(ctx context.Context, req *scheduler.PeerTarget) (*empty.Empty, error) {
-	return new(empty.Empty), s.service.LeaveTask(ctx, req)
-}
-
-// StatTask checks if the given task exists in P2P network
+// StatTask checks if the given task exists
 func (s *Server) StatTask(ctx context.Context, req *scheduler.StatTaskRequest) (*scheduler.Task, error) {
 	// TODO: add metrics
 	return s.service.StatTask(ctx, req)
@@ -99,3 +94,8 @@ func (s *Server) AnnounceTask(ctx context.Context, req *scheduler.AnnounceTaskRe
 	// TODO: add metrics
 	return new(empty.Empty), s.service.AnnounceTask(ctx, req)
 }
+
+// LeaveTask makes the peer unschedulable
+func (s *Server) LeaveTask(ctx context.Context, req *scheduler.PeerTarget) (*empty.Empty, error) {
+	return new(empty.Empty), s.service.LeaveTask(ctx, req)
+}
diff --git a/scheduler/scheduler.go b/scheduler/scheduler.go
index 0bf1b4b134b..6d7f21b54b2 100644
--- a/scheduler/scheduler.go
+++ b/scheduler/scheduler.go
@@ -90,7 +90,7 @@ func New(ctx context.Context, cfg *config.Config, d dfpath.Dfpath) (*Server, err
 			Location:           s.config.Host.Location,
 			SchedulerClusterId: uint64(s.config.Manager.SchedulerClusterID),
 		}); err != nil {
-			logger.Fatalf("register to manager failed %v", err)
+			logger.Fatalf("register to manager failed %s", err.Error())
 		}
 
 	// Initialize dynconfig client
@@ -165,7 +165,7 @@ func (s *Server) Serve() error {
 	// Serve dynConfig
 	go func() {
 		if err := s.dynconfig.Serve(); err != nil {
-			logger.Fatalf("dynconfig start failed %v", err)
+			logger.Fatalf("dynconfig start failed %s", err.Error())
 		}
 		logger.Info("dynconfig start successfully")
 	}()
@@ -188,7 +188,7 @@ func (s *Server) Serve() error {
 				if err == http.ErrServerClosed {
 					return
 				}
-				logger.Fatalf("metrics server closed unexpect: %v", err)
+				logger.Fatalf("metrics server closed unexpectedly: %s", err.Error())
 			}
 		}()
 	}
@@ -208,14 +208,14 @@ func (s *Server) Serve() error {
 	// Generate GRPC limit listener
 	listener, err := net.Listen("tcp", fmt.Sprintf("%s:%d", s.config.Server.Listen, s.config.Server.Port))
 	if err != nil {
-		logger.Fatalf("net listener failed to start: %v", err)
+		logger.Fatalf("net listener failed to start: %s", err.Error())
 	}
 	defer listener.Close()
 
 	// Started GRPC server
 	logger.Infof("started grpc server at %s://%s", listener.Addr().Network(), listener.Addr().String())
 	if err := s.grpcServer.Serve(listener); err != nil {
-		logger.Errorf("stoped grpc server: %v", err)
+		logger.Errorf("stopped grpc server: %s", err.Error())
 		return err
 	}
 
@@ -225,14 +225,14 @@ func (s *Server) Stop() {
 	// Stop dynconfig server
 	if err := s.dynconfig.Stop(); err != nil {
-		logger.Errorf("dynconfig client closed failed %v", err)
+		logger.Errorf("dynconfig client closed failed %s", err.Error())
 	}
 	logger.Info("dynconfig client closed")
 
 	// Stop manager client
 	if s.managerClient != nil {
 		if err := s.managerClient.Close(); err != nil {
-			logger.Errorf("manager client failed to stop: %v", err)
+			logger.Errorf("manager client failed to stop: %s", err.Error())
 		}
 		logger.Info("manager client closed")
 	}
@@ -244,7 +244,7 @@ func (s *Server) Stop() {
 	// Stop metrics server
 	if s.metricsServer != nil {
 		if err := s.metricsServer.Shutdown(context.Background()); err != nil {
-			logger.Errorf("metrics server failed to stop: %v", err)
+			logger.Errorf("metrics server failed to stop: %s", err.Error())
 		}
 		logger.Info("metrics server closed under request")
 	}
diff --git a/scheduler/scheduler/scheduler.go b/scheduler/scheduler/scheduler.go
index 9f432870503..c4e4005a8ea 100644
--- a/scheduler/scheduler/scheduler.go
+++ b/scheduler/scheduler/scheduler.go
@@ -92,14 +92,14 @@ func (s *scheduler) ScheduleParent(ctx context.Context, peer *resource.Peer, blo
 
 			// Notify peer back-to-source
 			if err := stream.Send(&rpcscheduler.PeerPacket{Code: base.Code_SchedNeedBackSource}); err != nil {
-				peer.Log.Errorf("send packet failed: %v", err)
+				peer.Log.Errorf("send packet failed: %s", err.Error())
 				return
 			}
 			peer.Log.Infof("peer scheduling %d times, peer downloads back-to-source %d", n, base.Code_SchedNeedBackSource)
 
 			if err := peer.FSM.Event(resource.PeerEventDownloadFromBackToSource); err != nil {
-				peer.Log.Errorf("peer fsm event failed: %v", err)
+				peer.Log.Errorf("peer fsm event failed: %s", err.Error())
 				return
 			}
 
@@ -107,7 +107,7 @@ func (s *scheduler) ScheduleParent(ctx context.Context, peer *resource.Peer, blo
 			// peer back-to-source and reset task state to TaskStateRunning
 			if peer.Task.FSM.Is(resource.TaskStateFailed) {
 				if err := peer.Task.FSM.Event(resource.TaskEventDownload); err != nil {
-					peer.Task.Log.Errorf("task fsm event failed: %v", err)
+					peer.Task.Log.Errorf("task fsm event failed: %s", err.Error())
 					return
 				}
 			}
@@ -125,7 +125,7 @@ func (s *scheduler) ScheduleParent(ctx context.Context, peer *resource.Peer, blo
 
 		// Notify peer schedule failed
 		if err := stream.Send(&rpcscheduler.PeerPacket{Code: base.Code_SchedTaskStatusError}); err != nil {
-			peer.Log.Errorf("send packet failed: %v", err)
+			peer.Log.Errorf("send packet failed: %s", err.Error())
 			return
 		}
 		peer.Log.Errorf("peer scheduling exceeds the limit %d times and return code %d", s.config.RetryLimit, base.Code_SchedTaskStatusError)
diff --git a/scheduler/service/service.go b/scheduler/service/service.go
index a0118450987..8a5b7133ee1 100644
--- a/scheduler/service/service.go
+++ b/scheduler/service/service.go
@@ -79,9 +79,9 @@ func (s *Service) RegisterPeerTask(ctx context.Context, req *rpcscheduler.PeerTa
 	// Register task and trigger cdn download task
 	task, err := s.registerTask(ctx, req)
 	if err != nil {
-		dferr := dferrors.New(base.Code_SchedTaskStatusError, "register task is fail")
-		logger.Errorf("peer %s register is failed: %v", req.PeerId, err)
-		return nil, dferr
+		msg := fmt.Sprintf("peer %s register is failed: %s", req.PeerId, err.Error())
+		logger.Error(msg)
+		return nil, dferrors.New(base.Code_SchedTaskStatusError, msg)
 	}
 	host := s.registerHost(ctx, req.PeerHost)
 	peer := s.registerPeer(ctx, req.PeerId, task, host, req.UrlMeta.Tag)
@@ -96,9 +96,9 @@ func (s *Service) RegisterPeerTask(ctx context.Context, req *rpcscheduler.PeerTa
 		peer.Log.Info("task size scope is tiny and return piece content directly")
 		if len(task.DirectPiece) > 0 && int64(len(task.DirectPiece)) == task.ContentLength.Load() {
 			if err := peer.FSM.Event(resource.PeerEventRegisterTiny); err != nil {
-				dferr := dferrors.New(base.Code_SchedError, err.Error())
-				peer.Log.Errorf("peer %s register is failed: %v", req.PeerId, err)
-				return nil, dferr
+				msg := fmt.Sprintf("peer %s register is failed: %s", req.PeerId, err.Error())
+				peer.Log.Error(msg)
+				return nil, dferrors.New(base.Code_SchedError, msg)
 			}
 
 			return &rpcscheduler.RegisterResult{
@@ -121,9 +121,9 @@ func (s *Service) RegisterPeerTask(ctx context.Context, req *rpcscheduler.PeerTa
 		if !ok {
 			peer.Log.Warn("task size scope is small and it can not select parent")
 			if err := peer.FSM.Event(resource.PeerEventRegisterNormal); err != nil {
-				dferr := dferrors.New(base.Code_SchedError, err.Error())
-				peer.Log.Errorf("peer %s register is failed: %v", req.PeerId, err)
-				return nil, dferr
+				msg := fmt.Sprintf("peer %s register is failed: %s", req.PeerId, err.Error())
+				peer.Log.Error(msg)
+				return nil, dferrors.New(base.Code_SchedError, msg)
 			}
 
 			return &rpcscheduler.RegisterResult{
@@ -137,9 +137,9 @@ func (s *Service) RegisterPeerTask(ctx context.Context, req *rpcscheduler.PeerTa
 		if !parent.FSM.Is(resource.PeerStateSucceeded) {
 			peer.Log.Infof("task size scope is small and download state %s is not PeerStateSucceeded", parent.FSM.Current())
 			if err := peer.FSM.Event(resource.PeerEventRegisterNormal); err != nil {
-				dferr := dferrors.New(base.Code_SchedError, err.Error())
-				peer.Log.Errorf("peer %s register is failed: %v", req.PeerId, err)
-				return nil, dferr
+				msg := fmt.Sprintf("peer %s register is failed: %s", req.PeerId, err.Error())
+				peer.Log.Error(msg)
+				return nil, dferrors.New(base.Code_SchedError, msg)
 			}
 
 			return &rpcscheduler.RegisterResult{
@@ -152,9 +152,9 @@ func (s *Service) RegisterPeerTask(ctx context.Context, req *rpcscheduler.PeerTa
 		if !ok {
 			peer.Log.Warn("task size scope is small and it can not get first piece")
 			if err := peer.FSM.Event(resource.PeerEventRegisterNormal); err != nil {
-				dferr := dferrors.New(base.Code_SchedError, err.Error())
-				peer.Log.Errorf("peer %s register is failed: %v", req.PeerId, err)
-				return nil, dferr
+				msg := fmt.Sprintf("peer %s register is failed: %s", req.PeerId, err.Error())
+				peer.Log.Error(msg)
+				return nil, dferrors.New(base.Code_SchedError, msg)
 			}
 
 			return &rpcscheduler.RegisterResult{
@@ -165,9 +165,9 @@ func (s *Service) RegisterPeerTask(ctx context.Context, req *rpcscheduler.PeerTa
 		peer.ReplaceParent(parent)
 		if err := peer.FSM.Event(resource.PeerEventRegisterSmall); err != nil {
-			dferr := dferrors.New(base.Code_SchedError, err.Error())
-			peer.Log.Errorf("peer %s register is failed: %v", req.PeerId, err)
-			return nil, dferr
+			msg := fmt.Sprintf("peer %s register is failed: %s", req.PeerId, err.Error())
+			peer.Log.Error(msg)
+			return nil, dferrors.New(base.Code_SchedError, msg)
 		}
 
 		singlePiece := &rpcscheduler.SinglePiece{
@@ -194,9 +194,9 @@ func (s *Service) RegisterPeerTask(ctx context.Context, req *rpcscheduler.PeerTa
 	default:
 		peer.Log.Info("task size scope is normal and needs to be register")
 		if err := peer.FSM.Event(resource.PeerEventRegisterNormal); err != nil {
-			dferr := dferrors.New(base.Code_SchedError, err.Error())
-			peer.Log.Errorf("peer %s register is failed: %v", req.PeerId, err)
-			return nil, dferr
+			msg := fmt.Sprintf("peer %s register is failed: %s", req.PeerId, err.Error())
+			peer.Log.Error(msg)
+			return nil, dferrors.New(base.Code_SchedError, msg)
 		}
 
 		return &rpcscheduler.RegisterResult{
@@ -209,9 +209,9 @@ func (s *Service) RegisterPeerTask(ctx context.Context, req *rpcscheduler.PeerTa
 	// Task is unsuccessful
 	peer.Log.Infof("task state is %s and needs to be register", task.FSM.Current())
 	if err := peer.FSM.Event(resource.PeerEventRegisterNormal); err != nil {
-		dferr := dferrors.New(base.Code_SchedError, err.Error())
-		peer.Log.Errorf("peer %s register is failed: %v", req.PeerId, err)
-		return nil, dferr
+		msg := fmt.Sprintf("peer %s register is failed: %s", req.PeerId, err.Error())
+		peer.Log.Error(msg)
+		return nil, dferrors.New(base.Code_SchedError, msg)
 	}
 
 	return &rpcscheduler.RegisterResult{
@@ -242,7 +242,7 @@ func (s *Service) ReportPieceResult(stream rpcscheduler.Scheduler_ReportPieceRes
 			if err == io.EOF {
 				return nil
 			}
-			logger.Errorf("receive piece %#v error: %v", piece, err)
+			logger.Errorf("receive piece %#v error: %s", piece, err.Error())
 			return err
 		}
 
@@ -252,9 +252,9 @@ func (s *Service) ReportPieceResult(stream rpcscheduler.Scheduler_ReportPieceRes
 			// Get peer from peer manager
 			peer, ok = s.resource.PeerManager().Load(piece.SrcPid)
 			if !ok {
-				dferr := dferrors.Newf(base.Code_SchedPeerNotFound, "peer %s not found", piece.SrcPid)
-				logger.Errorf("peer %s not found", piece.SrcPid)
-				return dferr
+				msg := fmt.Sprintf("peer %s not found", piece.SrcPid)
+				logger.Error(msg)
+				return dferrors.New(base.Code_SchedPeerNotFound, msg)
 			}
 
 			// Peer setting stream
@@ -323,8 +323,9 @@ func (s *Service) ReportPieceResult(stream rpcscheduler.Scheduler_ReportPieceRes
 func (s *Service) ReportPeerResult(ctx context.Context, req *rpcscheduler.PeerResult) error {
 	peer, ok := s.resource.PeerManager().Load(req.PeerId)
 	if !ok {
-		logger.Errorf("report peer result and peer %s is not exists", req.PeerId)
-		return dferrors.Newf(base.Code_SchedPeerNotFound, "peer %s not found", req.PeerId)
+		msg := fmt.Sprintf("report peer result and peer %s is not exists", req.PeerId)
+		logger.Error(msg)
+		return dferrors.New(base.Code_SchedPeerNotFound, msg)
 	}
 
 	metrics.DownloadCount.WithLabelValues(peer.BizTag).Inc()
@@ -361,40 +362,6 @@ func (s *Service) ReportPeerResult(ctx context.Context, req *rpcscheduler.PeerRe
 	return nil
 }
 
-// LeaveTask makes the peer unschedulable
-func (s *Service) LeaveTask(ctx context.Context, req *rpcscheduler.PeerTarget) error {
-	peer, ok := s.resource.PeerManager().Load(req.PeerId)
-	if !ok {
-		logger.Errorf("leave task and peer %s is not exists", req.PeerId)
-		return dferrors.Newf(base.Code_SchedPeerNotFound, "peer %s not found", req.PeerId)
-	}
-
-	metrics.LeaveTaskCount.WithLabelValues(peer.BizTag).Inc()
-
-	peer.Log.Infof("leave task: %#v", req)
-	if err := peer.FSM.Event(resource.PeerEventLeave); err != nil {
-		peer.Log.Errorf("peer fsm event failed: %v", err)
-
-		metrics.LeaveTaskFailureCount.WithLabelValues(peer.BizTag).Inc()
-		return dferrors.Newf(base.Code_SchedTaskStatusError, err.Error())
-	}
-
-	peer.Children.Range(func(_, value interface{}) bool {
-		child, ok := value.(*resource.Peer)
-		if !ok {
-			return true
-		}
-
-		// Reschedule a new parent to children of peer to exclude the current leave peer
-		child.Log.Infof("schedule parent because of parent peer %s is leaving", peer.ID)
-		s.scheduler.ScheduleParent(ctx, child, child.BlockPeers)
-		return true
-	})
-
-	s.resource.PeerManager().Delete(peer.ID)
-	return nil
-}
-
 // StatTask checks the current state of the task
 func (s *Service) StatTask(ctx context.Context, req *rpcscheduler.StatTaskRequest) (*rpcscheduler.Task, error) {
 	task, loaded := s.resource.TaskManager().Load(req.TaskId)
@@ -404,7 +371,7 @@ func (s *Service) StatTask(ctx context.Context, req *rpcscheduler.StatTaskReques
 		return nil, dferrors.New(base.Code_PeerTaskNotFound, msg)
 	}
 
-	task.Log.Debug("task has been found in P2P network")
+	task.Log.Debug("task has been found")
 	return &rpcscheduler.Task{
 		Id:   task.ID,
 		Type: int32(task.Type),
@@ -439,7 +406,7 @@ func (s *Service) AnnounceTask(ctx context.Context, req *rpcscheduler.AnnounceTa
 	if !task.FSM.Is(resource.TaskStateSucceeded) {
 		if task.FSM.Is(resource.TaskStatePending) {
 			if err := task.FSM.Event(resource.TaskEventDownload); err != nil {
-				msg := fmt.Sprintf("task fsm Pending -> Download event failed: %s", err)
+				msg := fmt.Sprintf("task fsm event failed: %s", err.Error())
 				task.Log.Error(msg)
 				return dferrors.New(base.Code_SchedError, msg)
 			}
@@ -447,7 +414,7 @@ func (s *Service) AnnounceTask(ctx context.Context, req *rpcscheduler.AnnounceTa
 
 		if task.FSM.Is(resource.TaskStateFailed) {
 			if err := task.FSM.Event(resource.TaskEventDownload); err != nil {
-				msg := fmt.Sprintf("task fsm Failed -> Download event failed: %s", err)
+				msg := fmt.Sprintf("task fsm event failed: %s", err.Error())
 				task.Log.Error(msg)
 				return dferrors.New(base.Code_SchedError, msg)
 			}
@@ -477,7 +444,7 @@ func (s *Service) AnnounceTask(ctx context.Context, req *rpcscheduler.AnnounceTa
 	if !peer.FSM.Is(resource.PeerStateSucceeded) {
 		if peer.FSM.Is(resource.PeerStatePending) {
 			if err := peer.FSM.Event(resource.PeerEventRegisterNormal); err != nil {
-				msg := fmt.Sprintf("peer fsm Pending -> Normal event failed: %s", err)
+				msg := fmt.Sprintf("peer fsm event failed: %s", err.Error())
 				task.Log.Error(msg)
 				return dferrors.New(base.Code_SchedError, msg)
 			}
@@ -487,7 +454,7 @@ func (s *Service) AnnounceTask(ctx context.Context, req *rpcscheduler.AnnounceTa
 			peer.FSM.Is(resource.PeerStateReceivedSmall) ||
 			peer.FSM.Is(resource.PeerStateReceivedNormal) {
 			if err := peer.FSM.Event(resource.PeerEventDownload); err != nil {
-				msg := fmt.Sprintf("peer fsm Normal -> Download event failed: %s", err)
+				msg := fmt.Sprintf("peer fsm event failed: %s", err.Error())
 				task.Log.Error(msg)
 				return dferrors.New(base.Code_SchedError, msg)
 			}
@@ -499,6 +466,42 @@ func (s *Service) AnnounceTask(ctx context.Context, req *rpcscheduler.AnnounceTa
 	return nil
 }
 
+// LeaveTask makes the peer unschedulable
+func (s *Service) LeaveTask(ctx context.Context, req *rpcscheduler.PeerTarget) error {
+	peer, ok := s.resource.PeerManager().Load(req.PeerId)
+	if !ok {
+		msg := fmt.Sprintf("leave task and peer %s is not exists", req.PeerId)
+		logger.Error(msg)
+		return dferrors.New(base.Code_SchedPeerNotFound, msg)
+	}
+
+	metrics.LeaveTaskCount.WithLabelValues(peer.BizTag).Inc()
+
+	peer.Log.Infof("leave task: %#v", req)
+	if err := peer.FSM.Event(resource.PeerEventLeave); err != nil {
+		metrics.LeaveTaskFailureCount.WithLabelValues(peer.BizTag).Inc()
+
+		msg := fmt.Sprintf("peer fsm event failed: %s", err.Error())
+		peer.Log.Error(msg)
+		return dferrors.New(base.Code_SchedTaskStatusError, msg)
+	}
+
+	peer.Children.Range(func(_, value interface{}) bool {
+		child, ok := value.(*resource.Peer)
+		if !ok {
+			return true
+		}
+
+		// Reschedule a new parent to children of peer to exclude the current leave peer
+		child.Log.Infof("schedule parent because of parent peer %s is leaving", peer.ID)
+		s.scheduler.ScheduleParent(ctx, child, child.BlockPeers)
+		return true
+	})
+
+	s.resource.PeerManager().Delete(peer.ID)
+	return nil
+}
+
 // registerTask creates a new task or reuses a previous task
 func (s *Service) registerTask(ctx context.Context, req *rpcscheduler.PeerTaskRequest) (*resource.Task, error) {
 	task := resource.NewTask(idgen.TaskID(req.Url, req.UrlMeta), req.Url, resource.TaskTypeNormal, req.UrlMeta, resource.WithBackToSourceLimit(int32(s.config.Scheduler.BackSourceCount)))
@@ -564,7 +567,7 @@ func (s *Service) triggerCDNTask(ctx context.Context, task *resource.Task) {
 	peer, endOfPiece, err := s.resource.CDN().TriggerTask(
 		trace.ContextWithSpanContext(context.Background(), trace.SpanContextFromContext(ctx)), task)
 	if err != nil {
-		task.Log.Errorf("trigger cdn download task failed: %v", err)
+		task.Log.Errorf("trigger cdn download task failed: %s", err.Error())
 		s.handleTaskFail(ctx, task)
 		return
 	}
@@ -587,7 +590,7 @@ func (s *Service) handleBeginOfPiece(ctx context.Context, peer *resource.Peer) {
 		// the peer has already returned to piece data when registering
 		peer.Log.Info("file type is tiny, peer has already returned to piece data when registering")
 		if err := peer.FSM.Event(resource.PeerEventDownload); err != nil {
-			peer.Log.Errorf("peer fsm event failed: %v", err)
+			peer.Log.Errorf("peer fsm event failed: %s", err.Error())
 			return
 		}
 	case resource.PeerStateReceivedSmall:
@@ -595,12 +598,12 @@ func (s *Service) handleBeginOfPiece(ctx context.Context, peer *resource.Peer) {
 		// the peer has already returned to the parent when registering
 		peer.Log.Info("file type is small, peer has already returned to the parent when registering")
 		if err := peer.FSM.Event(resource.PeerEventDownload); err != nil {
-			peer.Log.Errorf("peer fsm event failed: %v", err)
+			peer.Log.Errorf("peer fsm event failed: %s", err.Error())
 			return
 		}
 	case resource.PeerStateReceivedNormal:
 		if err := peer.FSM.Event(resource.PeerEventDownload); err != nil {
-			peer.Log.Errorf("peer fsm event failed: %v", err)
+			peer.Log.Errorf("peer fsm event failed: %s", err.Error())
 			return
 		}
 
@@ -648,7 +651,7 @@ func (s *Service) handlePieceFail(ctx context.Context, peer *resource.Peer, piec
 	switch piece.Code {
 	case base.Code_PeerTaskNotFound, base.Code_CDNError, base.Code_CDNTaskDownloadFail:
 		if err := parent.FSM.Event(resource.PeerEventDownloadFailed); err != nil {
-			peer.Log.Errorf("peer fsm event failed: %v", err)
+			peer.Log.Errorf("peer fsm event failed: %s", err.Error())
 			break
 		}
 	case base.Code_ClientPieceNotFound:
@@ -693,12 +696,12 @@ func (s *Service) handlePeerSuccess(ctx context.Context, peer *resource.Peer) {
 			// Tiny file downloaded successfully
 			peer.Task.DirectPiece = data
 		} else {
-			peer.Log.Warnf("download tiny file length is %d, task content length is %d, downloading is failed: %v", len(data), peer.Task.ContentLength.Load(), err)
+			peer.Log.Warnf("download tiny file length is %d, task content length is %d, downloading is failed: %s", len(data), peer.Task.ContentLength.Load(), err.Error())
 		}
 	}
 
 	if err := peer.FSM.Event(resource.PeerEventDownloadSucceeded); err != nil {
-		peer.Log.Errorf("peer fsm event failed: %v", err)
+		peer.Log.Errorf("peer fsm event failed: %s", err.Error())
 		return
 	}
 }
@@ -706,7 +709,7 @@ func (s *Service) handlePeerSuccess(ctx context.Context, peer *resource.Peer) {
 // handlePeerFail handles failed peer
 func (s *Service) handlePeerFail(ctx context.Context, peer *resource.Peer) {
 	if err := peer.FSM.Event(resource.PeerEventDownloadFailed); err != nil {
-		peer.Log.Errorf("peer fsm event failed: %v", err)
+		peer.Log.Errorf("peer fsm event failed: %s", err.Error())
 		return
 	}
 
@@ -733,7 +736,7 @@ func (s *Service) handleTaskSuccess(ctx context.Context, task *resource.Task, re
 	}
 
 	if err := task.FSM.Event(resource.TaskEventDownloadSucceeded); err != nil {
-		task.Log.Errorf("task fsm event failed: %v", err)
+		task.Log.Errorf("task fsm event failed: %s", err.Error())
 		return
 	}
 
@@ -758,7 +761,7 @@ func (s *Service) handleTaskFail(ctx context.Context, task *resource.Task) {
 	}
 
 	if err := task.FSM.Event(resource.TaskEventDownloadFailed); err != nil {
-		task.Log.Errorf("task fsm event failed: %v", err)
+		task.Log.Errorf("task fsm event failed: %s", err.Error())
 		return
 	}
 }
diff --git a/scheduler/service/service_test.go b/scheduler/service/service_test.go
index 6bc8e775516..0910ad9dcc7 100644
--- a/scheduler/service/service_test.go
+++ b/scheduler/service/service_test.go
@@ -92,6 +92,7 @@ var (
 	mockTaskID    = idgen.TaskID(mockTaskURL, mockTaskURLMeta)
 	mockPeerID    = idgen.PeerID("127.0.0.1")
 	mockCDNPeerID = idgen.CDNPeerID("127.0.0.1")
+	mockCID       = "d7y://foo"
 )
 
 func TestService_New(t *testing.T) {
@@ -1064,6 +1065,343 @@ func TestService_ReportPeerResult(t *testing.T) {
 	}
 }
 
+func TestService_StatTask(t *testing.T) {
+	tests := []struct {
+		name   string
+		mock   func(mockTask *resource.Task, taskManager resource.TaskManager, mr *resource.MockResourceMockRecorder, mt *resource.MockTaskManagerMockRecorder)
+		expect func(t *testing.T, task *rpcscheduler.Task, err error)
+	}{
+		{
+			name: "task not found",
+			mock: func(mockTask *resource.Task, taskManager resource.TaskManager, mr *resource.MockResourceMockRecorder, mt *resource.MockTaskManagerMockRecorder) {
+				gomock.InOrder(
+					mr.TaskManager().Return(taskManager).Times(1),
+					mt.Load(gomock.Any()).Return(nil, false).Times(1),
+				)
+			},
+			expect: func(t *testing.T, task *rpcscheduler.Task, err error) {
+				assert := assert.New(t)
+				assert.Error(err)
+			},
+		},
+		{
+			name: "stat task",
+			mock: func(mockTask *resource.Task, taskManager resource.TaskManager, mr *resource.MockResourceMockRecorder, mt *resource.MockTaskManagerMockRecorder) {
+				gomock.InOrder(
+					mr.TaskManager().Return(taskManager).Times(1),
+					mt.Load(gomock.Any()).Return(mockTask, true).Times(1),
+				)
+			},
+			expect: func(t *testing.T, task *rpcscheduler.Task, err error) {
+				assert := assert.New(t)
+				assert.NoError(err)
+				assert.EqualValues(task, &rpcscheduler.Task{
+					Id:               mockTaskID,
+					Type:             resource.TaskTypeNormal,
+					ContentLength:    0,
+					TotalPieceCount:  0,
+					State:            resource.TaskStatePending,
+					PeerCount:        0,
+					HasAvailablePeer: false,
+				})
+			},
+		},
+	}
+
+	for _, tc := range tests {
+		t.Run(tc.name, func(t *testing.T) {
+			ctl := gomock.NewController(t)
+			defer ctl.Finish()
+			scheduler := mocks.NewMockScheduler(ctl)
+			res := resource.NewMockResource(ctl)
+			dynconfig := configmocks.NewMockDynconfigInterface(ctl)
+			storage := storagemocks.NewMockStorage(ctl)
+			taskManager := resource.NewMockTaskManager(ctl)
+			svc := New(&config.Config{Scheduler: mockSchedulerConfig, Metrics: &config.MetricsConfig{EnablePeerHost: true}}, res, scheduler, dynconfig, storage)
+			mockTask := resource.NewTask(mockTaskID, mockTaskURL, resource.TaskTypeNormal, mockTaskURLMeta, resource.WithBackToSourceLimit(mockTaskBackToSourceLimit))
+
+			tc.mock(mockTask, taskManager, res.EXPECT(), taskManager.EXPECT())
+			task, err := svc.StatTask(context.Background(), &rpcscheduler.StatTaskRequest{TaskId: mockTaskID})
+			tc.expect(t, task, err)
+		})
+	}
+}
+
+func TestService_AnnounceTask(t *testing.T) {
+	tests := []struct {
+		name string
+		req  *rpcscheduler.AnnounceTaskRequest
+		mock func(mockHost *resource.Host, mockTask *resource.Task, mockPeer *resource.Peer,
+			hostManager resource.HostManager, taskManager resource.TaskManager, peerManager resource.PeerManager,
+			mr *resource.MockResourceMockRecorder, mh *resource.MockHostManagerMockRecorder, mt *resource.MockTaskManagerMockRecorder, mp *resource.MockPeerManagerMockRecorder)
+		expect func(t *testing.T, mockTask *resource.Task, mockPeer *resource.Peer, err error)
+	}{
+		{
+			name: "PieceInfos is empty",
+			req: &rpcscheduler.AnnounceTaskRequest{
+				PiecePacket: &base.PiecePacket{},
+			},
+			mock: func(mockHost *resource.Host, mockTask *resource.Task, mockPeer *resource.Peer,
+				hostManager resource.HostManager, taskManager resource.TaskManager, peerManager resource.PeerManager,
+				mr *resource.MockResourceMockRecorder, mh *resource.MockHostManagerMockRecorder, mt *resource.MockTaskManagerMockRecorder, mp *resource.MockPeerManagerMockRecorder) {
+			},
+			expect: func(t *testing.T, mockTask *resource.Task, mockPeer *resource.Peer, err error) {
+				assert := assert.New(t)
+				dferr, ok := err.(*dferrors.DfError)
+				assert.True(ok)
+				assert.Equal(dferr.Code, base.Code_BadRequest)
+			},
+		},
+		{
+			name: "PieceInfos length not equal to TotalPiece",
+			req: &rpcscheduler.AnnounceTaskRequest{
+				PiecePacket: &base.PiecePacket{
+					TotalPiece: 1,
+				},
+			},
+			mock: func(mockHost *resource.Host, mockTask *resource.Task, mockPeer *resource.Peer,
+				hostManager resource.HostManager, taskManager resource.TaskManager, peerManager resource.PeerManager,
+				mr *resource.MockResourceMockRecorder, mh *resource.MockHostManagerMockRecorder, mt *resource.MockTaskManagerMockRecorder, mp *resource.MockPeerManagerMockRecorder) {
+			},
+			expect: func(t *testing.T, mockTask *resource.Task, mockPeer *resource.Peer, err error) {
+				assert := assert.New(t)
+				dferr, ok := err.(*dferrors.DfError)
+				assert.True(ok)
+				assert.Equal(dferr.Code, base.Code_BadRequest)
+			},
+		},
+		{
+			name: "task state is TaskStateSucceeded and peer state is PeerStateSucceeded",
+			req: &rpcscheduler.AnnounceTaskRequest{
+				TaskId:  mockTaskID,
+				Cid:     mockCID,
+				UrlMeta: &base.UrlMeta{},
+				PeerHost: &rpcscheduler.PeerHost{
+					Uuid: mockRawHost.Uuid,
+				},
+				PiecePacket: &base.PiecePacket{
+					PieceInfos: []*base.PieceInfo{{PieceNum: 1}},
+					TotalPiece: 1,
+				},
+			},
+			mock: func(mockHost *resource.Host, mockTask *resource.Task, mockPeer *resource.Peer,
+				hostManager resource.HostManager, taskManager resource.TaskManager, peerManager resource.PeerManager,
+				mr *resource.MockResourceMockRecorder, mh *resource.MockHostManagerMockRecorder, mt *resource.MockTaskManagerMockRecorder, mp *resource.MockPeerManagerMockRecorder) {
+				mockTask.FSM.SetState(resource.TaskStateSucceeded)
+				mockPeer.FSM.SetState(resource.PeerStateSucceeded)
+
+				gomock.InOrder(
+					mr.TaskManager().Return(taskManager).Times(1),
+					mt.LoadOrStore(gomock.Any()).Return(mockTask, true).Times(1),
+					mr.HostManager().Return(hostManager).Times(1),
+					mh.Load(gomock.Any()).Return(mockHost, true).Times(1),
+					mr.PeerManager().Return(peerManager).Times(1),
+					mp.LoadOrStore(gomock.Any()).Return(mockPeer, true).Times(1),
+				)
+			},
+			expect: func(t *testing.T, mockTask *resource.Task, mockPeer *resource.Peer, err error) {
+				assert := assert.New(t)
+				assert.NoError(err)
+				assert.Equal(mockTask.FSM.Current(), resource.TaskStateSucceeded)
+				assert.Equal(mockPeer.FSM.Current(), resource.PeerStateSucceeded)
+			},
+		},
+		{
+			name: "task state is TaskStatePending and peer state is PeerStateSucceeded",
+			req: &rpcscheduler.AnnounceTaskRequest{
+				TaskId:   mockTaskID,
+				Cid:      mockCID,
+				UrlMeta:  &base.UrlMeta{},
+				PeerHost: mockRawHost,
+				PiecePacket: &base.PiecePacket{
+					PieceInfos:    []*base.PieceInfo{{PieceNum: 1, DownloadCost: 1}},
+					TotalPiece:    1,
+					ContentLength: 1000,
+				},
+			},
+			mock: func(mockHost *resource.Host, mockTask *resource.Task, mockPeer *resource.Peer,
+				hostManager resource.HostManager, taskManager resource.TaskManager, peerManager resource.PeerManager,
+				mr *resource.MockResourceMockRecorder, mh *resource.MockHostManagerMockRecorder, mt *resource.MockTaskManagerMockRecorder, mp *resource.MockPeerManagerMockRecorder) {
+				mockTask.FSM.SetState(resource.TaskStatePending)
+				mockPeer.FSM.SetState(resource.PeerStateSucceeded)
+
+				gomock.InOrder(
+					mr.TaskManager().Return(taskManager).Times(1),
+					mt.LoadOrStore(gomock.Any()).Return(mockTask, true).Times(1),
+					mr.HostManager().Return(hostManager).Times(1),
+					mh.Load(gomock.Any()).Return(mockHost, true).Times(1),
+					mr.PeerManager().Return(peerManager).Times(1),
+					mp.LoadOrStore(gomock.Any()).Return(mockPeer, true).Times(1),
+				)
+			},
+			expect: func(t *testing.T, mockTask *resource.Task, mockPeer *resource.Peer, err error) {
+				assert := assert.New(t)
+				assert.NoError(err)
+				assert.Equal(mockTask.FSM.Current(), resource.TaskStateSucceeded)
+				assert.Equal(mockTask.TotalPieceCount.Load(), int32(1))
+				assert.Equal(mockTask.ContentLength.Load(), int64(1000))
+				piece, ok := mockTask.LoadPiece(1)
+				assert.True(ok)
+				assert.EqualValues(piece, &base.PieceInfo{PieceNum: 1, DownloadCost: 1})
+
+				assert.Equal(mockPeer.Pieces.Count(), uint(1))
+				assert.Equal(mockPeer.PieceCosts()[0], int64(1*time.Millisecond))
+				assert.Equal(mockPeer.FSM.Current(), resource.PeerStateSucceeded)
+			},
+		},
+		{
+			name: "task state is TaskStateFailed and peer state is PeerStateSucceeded",
+			req: &rpcscheduler.AnnounceTaskRequest{
+				TaskId:   mockTaskID,
+				Cid:      mockCID,
+				UrlMeta:  &base.UrlMeta{},
+				PeerHost: mockRawHost,
+				PiecePacket: &base.PiecePacket{
+					PieceInfos:    []*base.PieceInfo{{PieceNum: 1, DownloadCost: 1}},
+					TotalPiece:    1,
+					ContentLength: 1000,
+				},
+			},
+			mock: func(mockHost *resource.Host, mockTask *resource.Task, mockPeer *resource.Peer,
+				hostManager resource.HostManager, taskManager resource.TaskManager, peerManager resource.PeerManager,
+				mr *resource.MockResourceMockRecorder, mh *resource.MockHostManagerMockRecorder, mt *resource.MockTaskManagerMockRecorder, mp *resource.MockPeerManagerMockRecorder) {
+				mockTask.FSM.SetState(resource.TaskStateFailed)
+				mockPeer.FSM.SetState(resource.PeerStateSucceeded)
+
+				gomock.InOrder(
+					mr.TaskManager().Return(taskManager).Times(1),
+					mt.LoadOrStore(gomock.Any()).Return(mockTask, true).Times(1),
+					mr.HostManager().Return(hostManager).Times(1),
+					mh.Load(gomock.Any()).Return(mockHost, true).Times(1),
+					mr.PeerManager().Return(peerManager).Times(1),
+					mp.LoadOrStore(gomock.Any()).Return(mockPeer, true).Times(1),
+				)
+			},
+			expect: func(t *testing.T, mockTask *resource.Task, mockPeer *resource.Peer, err error) {
+				assert := assert.New(t)
+				assert.NoError(err)
+				assert.Equal(mockTask.FSM.Current(), resource.TaskStateSucceeded)
+				assert.Equal(mockTask.TotalPieceCount.Load(), int32(1))
+				assert.Equal(mockTask.ContentLength.Load(), int64(1000))
+				piece, ok := mockTask.LoadPiece(1)
+				assert.True(ok)
+
+				assert.EqualValues(piece, &base.PieceInfo{PieceNum: 1, DownloadCost: 1})
+				assert.Equal(mockPeer.Pieces.Count(), uint(1))
+				assert.Equal(mockPeer.PieceCosts()[0], int64(1*time.Millisecond))
+				assert.Equal(mockPeer.FSM.Current(), resource.PeerStateSucceeded)
+			},
+		},
+		{
+			name: "task state is TaskStatePending and peer state is PeerStatePending",
+			req: &rpcscheduler.AnnounceTaskRequest{
+				TaskId:   mockTaskID,
+				Cid:      mockCID,
+				UrlMeta:  &base.UrlMeta{},
+				PeerHost: mockRawHost,
+				PiecePacket: &base.PiecePacket{
+					PieceInfos:    []*base.PieceInfo{{PieceNum: 1, DownloadCost: 1}},
+					TotalPiece:    1,
+					ContentLength: 1000,
+				},
+			},
+			mock: func(mockHost *resource.Host, mockTask *resource.Task, mockPeer *resource.Peer,
+				hostManager resource.HostManager, taskManager resource.TaskManager, peerManager resource.PeerManager,
+				mr *resource.MockResourceMockRecorder, mh *resource.MockHostManagerMockRecorder, mt *resource.MockTaskManagerMockRecorder, mp *resource.MockPeerManagerMockRecorder) {
+				mockTask.FSM.SetState(resource.TaskStatePending)
+				mockPeer.FSM.SetState(resource.PeerStatePending)
+
+				gomock.InOrder(
+					mr.TaskManager().Return(taskManager).Times(1),
+					mt.LoadOrStore(gomock.Any()).Return(mockTask, true).Times(1),
+					mr.HostManager().Return(hostManager).Times(1),
+					mh.Load(gomock.Any()).Return(mockHost, true).Times(1),
+					mr.PeerManager().Return(peerManager).Times(1),
+					mp.LoadOrStore(gomock.Any()).Return(mockPeer, true).Times(1),
+				)
+			},
+			expect: func(t *testing.T, mockTask *resource.Task, mockPeer *resource.Peer, err error) {
+				assert := assert.New(t)
+				assert.NoError(err)
+				assert.Equal(mockTask.FSM.Current(), resource.TaskStateSucceeded)
+				assert.Equal(mockTask.TotalPieceCount.Load(), int32(1))
+				assert.Equal(mockTask.ContentLength.Load(), int64(1000))
+				piece, ok := mockTask.LoadPiece(1)
+				assert.True(ok)
+
+				assert.EqualValues(piece, &base.PieceInfo{PieceNum: 1, DownloadCost: 1})
+				assert.Equal(mockPeer.Pieces.Count(), uint(1))
+				assert.Equal(mockPeer.PieceCosts()[0], int64(1*time.Millisecond))
+				assert.Equal(mockPeer.FSM.Current(), resource.PeerStateSucceeded)
+			},
+		},
+		{
+			name: "task state is TaskStatePending and peer state is PeerStateReceivedNormal",
+			req: &rpcscheduler.AnnounceTaskRequest{
+				TaskId:   mockTaskID,
+				Cid:      mockCID,
+				UrlMeta:  &base.UrlMeta{},
+				PeerHost: mockRawHost,
+				PiecePacket: &base.PiecePacket{
+					PieceInfos:    []*base.PieceInfo{{PieceNum: 1, DownloadCost: 1}},
+					TotalPiece:    1,
+					ContentLength: 1000,
+				},
+			},
+			mock: func(mockHost *resource.Host, mockTask *resource.Task, mockPeer *resource.Peer,
+				hostManager resource.HostManager, taskManager resource.TaskManager, peerManager resource.PeerManager,
+				mr *resource.MockResourceMockRecorder, mh *resource.MockHostManagerMockRecorder, mt *resource.MockTaskManagerMockRecorder, mp *resource.MockPeerManagerMockRecorder) {
+				mockTask.FSM.SetState(resource.TaskStatePending)
+				mockPeer.FSM.SetState(resource.PeerStateReceivedNormal)
+
+				gomock.InOrder(
+					mr.TaskManager().Return(taskManager).Times(1),
+					mt.LoadOrStore(gomock.Any()).Return(mockTask, true).Times(1),
+					mr.HostManager().Return(hostManager).Times(1),
+					mh.Load(gomock.Any()).Return(mockHost, true).Times(1),
+					mr.PeerManager().Return(peerManager).Times(1),
+					mp.LoadOrStore(gomock.Any()).Return(mockPeer, true).Times(1),
+				)
+			},
+			expect: func(t *testing.T, mockTask *resource.Task, mockPeer *resource.Peer, err error) {
+				assert := assert.New(t)
+				assert.NoError(err)
+				assert.Equal(mockTask.FSM.Current(), resource.TaskStateSucceeded)
+				assert.Equal(mockTask.TotalPieceCount.Load(), int32(1))
+				assert.Equal(mockTask.ContentLength.Load(), int64(1000))
+				piece, ok := mockTask.LoadPiece(1)
+				assert.True(ok)
+
+				assert.EqualValues(piece, &base.PieceInfo{PieceNum: 1, DownloadCost: 1})
+				assert.Equal(mockPeer.Pieces.Count(), uint(1))
+				assert.Equal(mockPeer.PieceCosts()[0], int64(1*time.Millisecond))
+				assert.Equal(mockPeer.FSM.Current(), resource.PeerStateSucceeded)
+			},
+		},
+	}
+
+	for _, tc := range tests {
+		t.Run(tc.name, func(t *testing.T) {
+			ctl := gomock.NewController(t)
+			defer ctl.Finish()
+			scheduler := mocks.NewMockScheduler(ctl)
+			res := resource.NewMockResource(ctl)
+			dynconfig := configmocks.NewMockDynconfigInterface(ctl)
+			storage := storagemocks.NewMockStorage(ctl)
+			hostManager := resource.NewMockHostManager(ctl)
+			taskManager := resource.NewMockTaskManager(ctl)
+			peerManager := resource.NewMockPeerManager(ctl)
+			svc := New(&config.Config{Scheduler: mockSchedulerConfig, Metrics: &config.MetricsConfig{EnablePeerHost: true}}, res, scheduler, dynconfig, storage)
+			mockHost := resource.NewHost(mockRawHost)
+			mockTask := resource.NewTask(mockTaskID, mockTaskURL, resource.TaskTypeNormal, mockTaskURLMeta, resource.WithBackToSourceLimit(mockTaskBackToSourceLimit))
+			mockPeer := resource.NewPeer(mockPeerID, mockTask, mockHost)
+
+			tc.mock(mockHost, mockTask, mockPeer, hostManager, taskManager, peerManager, res.EXPECT(), hostManager.EXPECT(), taskManager.EXPECT(), peerManager.EXPECT())
+			tc.expect(t, mockTask, mockPeer, svc.AnnounceTask(context.Background(), tc.req))
+		})
+	}
+}
+
 func TestService_LeaveTask(t *testing.T) {
 	mockHost := resource.NewHost(mockRawHost)
 	mockTask := resource.NewTask(mockTaskID, mockTaskURL, resource.TaskTypeNormal, mockTaskURLMeta, resource.WithBackToSourceLimit(mockTaskBackToSourceLimit))

From 44c9e7cc6c96bbaadebc4f3864a8d509f6012209 Mon Sep 17 00:00:00 2001
From: Gaius
Date: Fri, 15 Apr 2022 14:13:37 +0800
Subject: [PATCH 2/2] fix: error message

Signed-off-by: Gaius
---
 scheduler/service/service.go | 168 ++++------------------------------
 1 file changed, 15 insertions(+), 153 deletions(-)

diff --git a/scheduler/service/service.go b/scheduler/service/service.go
index ef4acabfadf..62811c50bfb 100644
--- a/scheduler/service/service.go
+++ b/scheduler/service/service.go
@@ -79,19 +79,9 @@ func (s *Service) RegisterPeerTask(ctx context.Context, req *rpcscheduler.PeerTa
 	// Register task and trigger cdn download task
 	task, err := s.registerTask(ctx, req)
 	if err != nil {
-<<<<<<< HEAD
 		msg := fmt.Sprintf("peer %s register is failed: %s", req.PeerId, err.Error())
 		logger.Error(msg)
 		return nil, dferrors.New(base.Code_SchedTaskStatusError, msg)
-||||||| 06e8bc76
-		dferr := dferrors.New(base.Code_SchedTaskStatusError, "register task is fail")
-		logger.Errorf("peer %s register is failed: %v", req.PeerId, err)
-		return nil, dferr
-=======
-		msg := fmt.Sprintf("peer %s register is failed: %v", req.PeerId, err)
-		logger.Error(msg)
-		return nil, dferrors.New(base.Code_SchedTaskStatusError, msg)
->>>>>>> 14ff3ddc6a5282fd4042ba0c1b1c03c0f4007312
 	}
 	host := s.registerHost(ctx, req.PeerHost)
 	peer := s.registerPeer(ctx, req.PeerId, task, host, req.UrlMeta.Tag)
@@ -106,19 +96,9 @@ func (s *Service) RegisterPeerTask(ctx context.Context, req *rpcscheduler.PeerTa
 		peer.Log.Info("task size scope is tiny and return piece content directly")
 		if len(task.DirectPiece) > 0 && int64(len(task.DirectPiece)) == task.ContentLength.Load() {
 			if err := peer.FSM.Event(resource.PeerEventRegisterTiny); err != nil {
-<<<<<<< HEAD
 				msg := fmt.Sprintf("peer %s register is failed: %s", req.PeerId, err.Error())
 				peer.Log.Error(msg)
 				return nil, dferrors.New(base.Code_SchedError, msg)
-||||||| 06e8bc76
-				dferr := dferrors.New(base.Code_SchedError, err.Error())
-				peer.Log.Errorf("peer %s register is failed: %v", req.PeerId, err)
-				return nil, dferr
-=======
-				msg := fmt.Sprintf("peer %s register is failed: %v", req.PeerId, err)
-				peer.Log.Error(msg)
-				return nil, dferrors.New(base.Code_SchedError, msg)
->>>>>>> 14ff3ddc6a5282fd4042ba0c1b1c03c0f4007312
 			}
 
 			return &rpcscheduler.RegisterResult{
@@ -141,19 +121,9 @@ func (s *Service) RegisterPeerTask(ctx context.Context, req *rpcscheduler.PeerTa
 		if !ok {
 			peer.Log.Warn("task size scope is small and it can not select parent")
 			if err := peer.FSM.Event(resource.PeerEventRegisterNormal); err != nil {
-<<<<<<< HEAD
 				msg := fmt.Sprintf("peer %s register is failed: %s", req.PeerId, err.Error())
 				peer.Log.Error(msg)
 				return nil, dferrors.New(base.Code_SchedError, msg)
-||||||| 06e8bc76
-				dferr := dferrors.New(base.Code_SchedError, err.Error())
-				peer.Log.Errorf("peer %s register is failed: %v", req.PeerId, err)
-				return nil, dferr
-=======
-				msg := fmt.Sprintf("peer %s register is failed: %v", req.PeerId, err)
-				peer.Log.Error(msg)
-				return nil, dferrors.New(base.Code_SchedError, msg)
->>>>>>> 14ff3ddc6a5282fd4042ba0c1b1c03c0f4007312
 			}
 
 			return &rpcscheduler.RegisterResult{
@@ -167,19 +137,9 @@ func (s *Service) RegisterPeerTask(ctx context.Context, req *rpcscheduler.PeerTa
 		if !parent.FSM.Is(resource.PeerStateSucceeded) {
 			peer.Log.Infof("task size scope is small and download state %s is not PeerStateSucceeded", parent.FSM.Current())
 			if err := peer.FSM.Event(resource.PeerEventRegisterNormal); err != nil {
-<<<<<<< HEAD
 				msg := fmt.Sprintf("peer %s register is failed: %s", req.PeerId, err.Error())
 				peer.Log.Error(msg)
 				return nil, dferrors.New(base.Code_SchedError, msg)
-||||||| 06e8bc76
-				dferr := dferrors.New(base.Code_SchedError, err.Error())
-				peer.Log.Errorf("peer %s register is failed: %v", req.PeerId, err)
-				return nil, dferr
-=======
-				msg := fmt.Sprintf("peer %s register is failed: %v", req.PeerId, err)
-				peer.Log.Error(msg)
-				return nil, dferrors.New(base.Code_SchedError, msg)
->>>>>>> 14ff3ddc6a5282fd4042ba0c1b1c03c0f4007312
 			}
 
 			return &rpcscheduler.RegisterResult{
@@ -192,19 +152,9 @@ func (s *Service) RegisterPeerTask(ctx context.Context, req *rpcscheduler.PeerTa
 		if !ok {
 			peer.Log.Warn("task size scope is small and it can not get first piece")
 			if err := peer.FSM.Event(resource.PeerEventRegisterNormal); err != nil {
-<<<<<<< HEAD
 				msg := fmt.Sprintf("peer %s register is failed: %s", req.PeerId, err.Error())
 				peer.Log.Error(msg)
 				return nil, dferrors.New(base.Code_SchedError, msg)
-||||||| 06e8bc76
-				dferr := dferrors.New(base.Code_SchedError, err.Error())
-				peer.Log.Errorf("peer %s register is failed: %v", req.PeerId, err)
-				return nil, dferr
-=======
-				msg := fmt.Sprintf("peer %s register is failed: %v", req.PeerId, err)
-				peer.Log.Error(msg)
-				return nil, dferrors.New(base.Code_SchedError, msg)
->>>>>>> 14ff3ddc6a5282fd4042ba0c1b1c03c0f4007312
 			}
 
 			return &rpcscheduler.RegisterResult{
@@ -215,19 +165,9 @@ func (s *Service) RegisterPeerTask(ctx context.Context, req *rpcscheduler.PeerTa
 		peer.ReplaceParent(parent)
 		if err := peer.FSM.Event(resource.PeerEventRegisterSmall); err != nil {
-<<<<<<< HEAD
 			msg := fmt.Sprintf("peer %s register is failed: %s", req.PeerId, err.Error())
 			peer.Log.Error(msg)
 			return nil, dferrors.New(base.Code_SchedError, msg)
-||||||| 06e8bc76
-			dferr := dferrors.New(base.Code_SchedError, err.Error())
-			peer.Log.Errorf("peer %s register is failed: %v", req.PeerId, err)
-			return nil, dferr
-=======
-			msg := fmt.Sprintf("peer %s register is failed: %v", req.PeerId, err)
-			peer.Log.Error(msg)
-			return nil, dferrors.New(base.Code_SchedError, msg)
->>>>>>> 14ff3ddc6a5282fd4042ba0c1b1c03c0f4007312
 		}
 
 		singlePiece := &rpcscheduler.SinglePiece{
@@ -254,19 +194,9 @@ func (s *Service) RegisterPeerTask(ctx context.Context, req *rpcscheduler.PeerTa
 	default:
 		peer.Log.Info("task size scope is normal and needs to be register")
 		if err := peer.FSM.Event(resource.PeerEventRegisterNormal); err != nil {
-<<<<<<< HEAD
 			msg := fmt.Sprintf("peer %s register is failed: %s", req.PeerId, err.Error())
 			peer.Log.Error(msg)
 			return nil, dferrors.New(base.Code_SchedError, msg)
-||||||| 06e8bc76
-			dferr := dferrors.New(base.Code_SchedError, err.Error())
-			peer.Log.Errorf("peer %s register is failed: %v", req.PeerId, err)
-			return nil, dferr
-=======
-			msg := fmt.Sprintf("peer %s register is failed: %v", req.PeerId, err)
-			peer.Log.Error(msg)
-			return nil, dferrors.New(base.Code_SchedError, msg)
->>>>>>> 14ff3ddc6a5282fd4042ba0c1b1c03c0f4007312
 		}
 
 		return &rpcscheduler.RegisterResult{
@@ -279,19 +209,9 @@ func (s *Service) RegisterPeerTask(ctx context.Context, req *rpcscheduler.PeerTa
 	// Task is unsuccessful
 	peer.Log.Infof("task state is %s and needs to be register", task.FSM.Current())
 	if err := peer.FSM.Event(resource.PeerEventRegisterNormal); err != nil {
-<<<<<<< HEAD
 		msg := fmt.Sprintf("peer %s register is failed: %s", req.PeerId, err.Error())
 		peer.Log.Error(msg)
 		return nil, dferrors.New(base.Code_SchedError, msg)
-||||||| 06e8bc76
-		dferr := dferrors.New(base.Code_SchedError, err.Error())
-		peer.Log.Errorf("peer %s register is failed: %v", req.PeerId, err)
-		return nil, dferr
-=======
-		msg := fmt.Sprintf("peer %s register is failed: %v", req.PeerId, err)
-		peer.Log.Error(msg)
-		return nil, dferrors.New(base.Code_SchedError, msg)
->>>>>>> 14ff3ddc6a5282fd4042ba0c1b1c03c0f4007312
 	}
 
 	return &rpcscheduler.RegisterResult{
@@ -486,13 +406,7 @@ func (s *Service) AnnounceTask(ctx context.Context, req *rpcscheduler.AnnounceTa
 	if !task.FSM.Is(resource.TaskStateSucceeded) {
 		if task.FSM.Is(resource.TaskStatePending) {
 			if err := task.FSM.Event(resource.TaskEventDownload); err != nil {
-<<<<<<< HEAD
 				msg := fmt.Sprintf("task fsm event failed: %s", err.Error())
-||||||| 06e8bc76
-				msg := fmt.Sprintf("task fsm Pending -> Download event failed: %s", err)
-=======
-				msg := fmt.Sprintf("task fsm event failed: %v", err)
->>>>>>> 14ff3ddc6a5282fd4042ba0c1b1c03c0f4007312
 				task.Log.Error(msg)
 				return dferrors.New(base.Code_SchedError, msg)
 			}
@@ -500,13 +414,7 @@ func (s *Service) AnnounceTask(ctx context.Context, req *rpcscheduler.AnnounceTa
 
 		if task.FSM.Is(resource.TaskStateFailed) {
 			if err := task.FSM.Event(resource.TaskEventDownload); err != nil {
-<<<<<<< HEAD
 				msg := fmt.Sprintf("task fsm event failed: %s", err.Error())
-||||||| 06e8bc76
-				msg := fmt.Sprintf("task fsm Failed -> Download event failed: %s", err)
-=======
-				msg := fmt.Sprintf("task fsm event failed: %v", err)
->>>>>>> 14ff3ddc6a5282fd4042ba0c1b1c03c0f4007312
 				task.Log.Error(msg)
 				return dferrors.New(base.Code_SchedError, msg)
 			}
@@ -536,13 +444,7 @@ func (s *Service) AnnounceTask(ctx context.Context, req *rpcscheduler.AnnounceTa
 	if !peer.FSM.Is(resource.PeerStateSucceeded) {
 		if peer.FSM.Is(resource.PeerStatePending) {
 			if err := peer.FSM.Event(resource.PeerEventRegisterNormal); err != nil {
-<<<<<<< HEAD
 				msg := fmt.Sprintf("peer fsm event failed: %s", err.Error())
-||||||| 06e8bc76
-				msg := fmt.Sprintf("peer fsm Pending -> Normal event failed: %s", err)
-=======
-				msg := fmt.Sprintf("peer fsm event failed: %v", err)
->>>>>>> 14ff3ddc6a5282fd4042ba0c1b1c03c0f4007312
 				task.Log.Error(msg)
 				return dferrors.New(base.Code_SchedError, msg)
 			}
@@ -552,13 +454,7 @@ func (s *Service) AnnounceTask(ctx context.Context, req *rpcscheduler.AnnounceTa
 			peer.FSM.Is(resource.PeerStateReceivedSmall) ||
 			peer.FSM.Is(resource.PeerStateReceivedNormal) {
 			if err := peer.FSM.Event(resource.PeerEventDownload); err != nil {
-<<<<<<< HEAD
 				msg := fmt.Sprintf("peer fsm event failed: %s", err.Error())
-||||||| 06e8bc76
-				msg := fmt.Sprintf("peer fsm Normal -> Download event failed: %s", err)
-=======
-				msg := fmt.Sprintf("peer fsm event failed: %v", err)
->>>>>>> 14ff3ddc6a5282fd4042ba0c1b1c03c0f4007312
 				task.Log.Error(msg)
 				return dferrors.New(base.Code_SchedError, msg)
 			}
@@ -570,7 +466,6 @@ func (s *Service) AnnounceTask(ctx context.Context, req *rpcscheduler.AnnounceTa
 	return nil
 }
 
-<<<<<<< HEAD
 // LeaveTask makes the peer unschedulable
 func (s *Service) LeaveTask(ctx context.Context, req *rpcscheduler.PeerTarget) error {
 	peer, ok := s.resource.PeerManager().Load(req.PeerId)
@@ -607,45 +502,6 @@ func (s *Service) LeaveTask(ctx context.Context, req *rpcscheduler.PeerTarget) e
 	return nil
 }
 
-||||||| 06e8bc76
-=======
-// LeaveTask makes the peer unschedulable
-func (s *Service) LeaveTask(ctx context.Context, req *rpcscheduler.PeerTarget) error {
-	peer, ok := s.resource.PeerManager().Load(req.PeerId)
-	if !ok {
-		msg := fmt.Sprintf("leave task and peer %s is not exists", req.PeerId)
-		logger.Error(msg)
-		return dferrors.New(base.Code_SchedPeerNotFound, msg)
-	}
-
-	metrics.LeaveTaskCount.WithLabelValues(peer.BizTag).Inc()
-
-	peer.Log.Infof("leave task: %#v", req)
-	if err := peer.FSM.Event(resource.PeerEventLeave); err != nil {
-		metrics.LeaveTaskFailureCount.WithLabelValues(peer.BizTag).Inc()
-
-		msg := fmt.Sprintf("peer fsm event failed: %v", err)
-		peer.Log.Error(msg)
-		return dferrors.New(base.Code_SchedTaskStatusError, msg)
-	}
-
-	peer.Children.Range(func(_, value interface{}) bool {
-		child, ok := value.(*resource.Peer)
-		if !ok {
-			return true
-		}
-
-		// Reschedule a new parent to children of peer to exclude the current leave peer
-		child.Log.Infof("schedule parent because of parent peer %s is leaving", peer.ID)
-		s.scheduler.ScheduleParent(ctx, child, child.BlockPeers)
-		return true
-	})
-
-	s.resource.PeerManager().Delete(peer.ID)
-	return nil
-}
-
->>>>>>> 14ff3ddc6a5282fd4042ba0c1b1c03c0f4007312
 // registerTask creates a new task or reuses a previous task
 func (s *Service) registerTask(ctx context.Context, req *rpcscheduler.PeerTaskRequest) (*resource.Task, error) {
 	task := resource.NewTask(idgen.TaskID(req.Url, req.UrlMeta), req.Url, resource.TaskTypeNormal, req.UrlMeta, resource.WithBackToSourceLimit(int32(s.config.Scheduler.BackSourceCount)))
@@ -832,21 +688,27 @@ func (s *Service) handlePieceFail(ctx context.Context, peer *resource.Peer, piec
 
 // handlePeerSuccess handles successful peer
 func (s *Service) handlePeerSuccess(ctx context.Context, peer *resource.Peer) {
+	if err := peer.FSM.Event(resource.PeerEventDownloadSucceeded); err != nil {
+		peer.Log.Errorf("peer fsm event failed: %s", err.Error())
+		return
+	}
+
 	// If the peer type is tiny and back-to-source,
 	// it need to directly download the tiny file and store the data in task DirectPiece
 	if peer.Task.SizeScope() == base.SizeScope_TINY && len(peer.Task.DirectPiece) == 0 {
 		data, err := peer.DownloadTinyFile()
-		if err == nil && len(data) == int(peer.Task.ContentLength.Load()) {
-			// Tiny file downloaded successfully
-			peer.Task.DirectPiece = data
-		} else {
-			peer.Log.Warnf("download tiny file length is %d, task content length is %d, downloading is failed: %s", len(data), peer.Task.ContentLength.Load(), err.Error())
+		if err != nil {
+			peer.Log.Errorf("download tiny task failed: %s", err.Error())
+			return
 		}
-	}
-
-	if err := peer.FSM.Event(resource.PeerEventDownloadSucceeded); err != nil {
-		peer.Log.Errorf("peer fsm event failed: %s", err.Error())
-		return
+
+		if len(data) != int(peer.Task.ContentLength.Load()) {
+			peer.Log.Errorf("download tiny task length of data is %d, task content length is %d", len(data), peer.Task.ContentLength.Load())
+			return
+		}
+
+		// Tiny file downloaded successfully
+		peer.Task.DirectPiece = data
 	}
 }