fix: error message (dragonflyoss#1255)
* fix: error message

Signed-off-by: Gaius <[email protected]>
gaius-qi authored Apr 15, 2022
1 parent 14ff3dd commit 8d001b5
Showing 7 changed files with 66 additions and 60 deletions.
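
The change is mechanical: every log call that formatted an error value with the %v verb now formats the string returned by err.Error() with %s. A minimal standalone sketch of the before/after pattern, using Go's standard log package rather than Dragonfly's internal logger that this commit actually touches:

package main

import (
	"errors"
	"fmt"
	"log"
)

func main() {
	err := fmt.Errorf("dial redis: %w", errors.New("connection refused"))

	// Before this commit: the error value is passed directly and formatted with %v.
	log.Printf("create global job queue error: %v", err)

	// After this commit: the message is extracted explicitly with err.Error()
	// and formatted with %s. For a non-nil error the output is identical; the
	// new form just makes the string conversion explicit.
	log.Printf("create global job queue error: %s", err.Error())
}

One difference worth noting: %v tolerates a nil error (printing <nil>), while calling err.Error() on a nil error panics, so the new form relies on the "if err != nil" checks that already guard every call site in the diff.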
24 changes: 12 additions & 12 deletions scheduler/job/job.go
@@ -57,27 +57,27 @@ func New(cfg *config.Config, resource resource.Resource) (Job, error) {

globalJob, err := internaljob.New(redisConfig, internaljob.GlobalQueue)
if err != nil {
logger.Errorf("create global job queue error: %v", err)
logger.Errorf("create global job queue error: %s", err.Error())
return nil, err
}
logger.Infof("create global job queue: %v", globalJob)

schedulerJob, err := internaljob.New(redisConfig, internaljob.SchedulersQueue)
if err != nil {
logger.Errorf("create scheduler job queue error: %v", err)
logger.Errorf("create scheduler job queue error: %s", err.Error())
return nil, err
}
logger.Infof("create scheduler job queue: %v", schedulerJob)

localQueue, err := internaljob.GetSchedulerQueue(cfg.Manager.SchedulerClusterID, cfg.Server.Host)
if err != nil {
logger.Errorf("get local job queue name error: %v", err)
logger.Errorf("get local job queue name error: %s", err.Error())
return nil, err
}

localJob, err := internaljob.New(redisConfig, localQueue)
if err != nil {
logger.Errorf("create local job queue error: %v", err)
logger.Errorf("create local job queue error: %s", err.Error())
return nil, err
}
logger.Infof("create local job queue: %v", localQueue)
@@ -95,7 +95,7 @@ func New(cfg *config.Config, resource resource.Resource) (Job, error) {
}

if err := localJob.RegisterJob(namedJobFuncs); err != nil {
logger.Errorf("register preheat job to local queue error: %v", err)
logger.Errorf("register preheat job to local queue error: %s", err.Error())
return nil, err
}

@@ -106,21 +106,21 @@ func (j *job) Serve() {
go func() {
logger.Infof("ready to launch %d worker(s) on global queue", j.config.Job.GlobalWorkerNum)
if err := j.globalJob.LaunchWorker("global_worker", int(j.config.Job.GlobalWorkerNum)); err != nil {
logger.Fatalf("global queue worker error: %v", err)
logger.Fatalf("global queue worker error: %s", err.Error())
}
}()

go func() {
logger.Infof("ready to launch %d worker(s) on scheduler queue", j.config.Job.SchedulerWorkerNum)
if err := j.schedulerJob.LaunchWorker("scheduler_worker", int(j.config.Job.SchedulerWorkerNum)); err != nil {
logger.Fatalf("scheduler queue worker error: %v", err)
logger.Fatalf("scheduler queue worker error: %s", err.Error())
}
}()

go func() {
logger.Infof("ready to launch %d worker(s) on local queue", j.config.Job.LocalWorkerNum)
if err := j.localJob.LaunchWorker("local_worker", int(j.config.Job.LocalWorkerNum)); err != nil {
logger.Fatalf("scheduler queue worker error: %v", err)
logger.Fatalf("scheduler queue worker error: %s", err.Error())
}
}()
}
@@ -138,12 +138,12 @@ func (j *job) preheat(ctx context.Context, req string) error {

request := &internaljob.PreheatRequest{}
if err := internaljob.UnmarshalRequest(req, request); err != nil {
logger.Errorf("unmarshal request err: %v, request body: %s", err, req)
logger.Errorf("unmarshal request err: %s, request body: %s", err.Error(), req)
return err
}

if err := validator.New().Struct(request); err != nil {
logger.Errorf("url %s validate failed: %v", request.URL, err)
logger.Errorf("url %s validate failed: %s", request.URL, err.Error())
return err
}

@@ -172,14 +172,14 @@ func (j *job) preheat(ctx context.Context, req string) error {
UrlMeta: urlMeta,
})
if err != nil {
log.Errorf("preheat failed: %v", err)
log.Errorf("preheat failed: %s", err.Error())
return err
}

for {
piece, err := stream.Recv()
if err != nil {
log.Errorf("preheat recive piece failed: %v", err)
log.Errorf("preheat recive piece failed: %s", err.Error())
return err
}

4 changes: 2 additions & 2 deletions scheduler/resource/host.go
@@ -170,12 +170,12 @@ func (h *Host) LeavePeers() {
h.Peers.Range(func(_, value interface{}) bool {
if peer, ok := value.(*Peer); ok {
if err := peer.FSM.Event(PeerEventDownloadFailed); err != nil {
peer.Log.Errorf("peer fsm event failed: %v", err)
peer.Log.Errorf("peer fsm event failed: %s", err.Error())
return true
}

if err := peer.FSM.Event(PeerEventLeave); err != nil {
peer.Log.Errorf("peer fsm event failed: %v", err)
peer.Log.Errorf("peer fsm event failed: %s", err.Error())
return true
}

2 changes: 1 addition & 1 deletion scheduler/resource/peer_manager.go
@@ -141,7 +141,7 @@ func (p *peerManager) RunGC() error {
// If the peer is not leave,
// first change the state to PeerEventLeave
if err := peer.FSM.Event(PeerEventLeave); err != nil {
peer.Log.Errorf("peer fsm event failed: %v", err)
peer.Log.Errorf("peer fsm event failed: %s", err.Error())
}

peer.Log.Info("gc causes the peer to leave")
4 changes: 2 additions & 2 deletions scheduler/resource/task.go
@@ -337,13 +337,13 @@ func (t *Task) NotifyPeers(code base.Code, event string) {
}

if err := stream.Send(&rpcscheduler.PeerPacket{Code: code}); err != nil {
t.Log.Errorf("send packet to peer %s failed: %v", peer.ID, err)
t.Log.Errorf("send packet to peer %s failed: %s", peer.ID, err.Error())
return true
}
t.Log.Infof("task notify peer %s code %s", peer.ID, code)

if err := peer.FSM.Event(event); err != nil {
peer.Log.Errorf("peer fsm event failed: %v", err)
peer.Log.Errorf("peer fsm event failed: %s", err.Error())
return true
}
}
16 changes: 8 additions & 8 deletions scheduler/scheduler.go
@@ -90,7 +90,7 @@ func New(ctx context.Context, cfg *config.Config, d dfpath.Dfpath) (*Server, err
Location: s.config.Host.Location,
SchedulerClusterId: uint64(s.config.Manager.SchedulerClusterID),
}); err != nil {
logger.Fatalf("register to manager failed %v", err)
logger.Fatalf("register to manager failed %s", err.Error())
}

// Initialize dynconfig client
@@ -165,7 +165,7 @@ func (s *Server) Serve() error {
// Serve dynConfig
go func() {
if err := s.dynconfig.Serve(); err != nil {
logger.Fatalf("dynconfig start failed %v", err)
logger.Fatalf("dynconfig start failed %s", err.Error())
}
logger.Info("dynconfig start successfully")
}()
@@ -188,7 +188,7 @@ func (s *Server) Serve() error {
if err == http.ErrServerClosed {
return
}
logger.Fatalf("metrics server closed unexpect: %v", err)
logger.Fatalf("metrics server closed unexpect: %s", err.Error())
}
}()
}
@@ -208,14 +208,14 @@ func (s *Server) Serve() error {
// Generate GRPC limit listener
listener, err := net.Listen("tcp", fmt.Sprintf("%s:%d", s.config.Server.Listen, s.config.Server.Port))
if err != nil {
logger.Fatalf("net listener failed to start: %v", err)
logger.Fatalf("net listener failed to start: %s", err.Error())
}
defer listener.Close()

// Started GRPC server
logger.Infof("started grpc server at %s://%s", listener.Addr().Network(), listener.Addr().String())
if err := s.grpcServer.Serve(listener); err != nil {
logger.Errorf("stoped grpc server: %v", err)
logger.Errorf("stoped grpc server: %s", err.Error())
return err
}

@@ -225,14 +225,14 @@ func (s *Server) Stop() {
func (s *Server) Stop() {
// Stop dynconfig server
if err := s.dynconfig.Stop(); err != nil {
logger.Errorf("dynconfig client closed failed %v", err)
logger.Errorf("dynconfig client closed failed %s", err.Error())
}
logger.Info("dynconfig client closed")

// Stop manager client
if s.managerClient != nil {
if err := s.managerClient.Close(); err != nil {
logger.Errorf("manager client failed to stop: %v", err)
logger.Errorf("manager client failed to stop: %s", err.Error())
}
logger.Info("manager client closed")
}
@@ -244,7 +244,7 @@ func (s *Server) Stop() {
// Stop metrics server
if s.metricsServer != nil {
if err := s.metricsServer.Shutdown(context.Background()); err != nil {
logger.Errorf("metrics server failed to stop: %v", err)
logger.Errorf("metrics server failed to stop: %s", err.Error())
}
logger.Info("metrics server closed under request")
}
8 changes: 4 additions & 4 deletions scheduler/scheduler/scheduler.go
@@ -92,22 +92,22 @@ func (s *scheduler) ScheduleParent(ctx context.Context, peer *resource.Peer, blo

// Notify peer back-to-source
if err := stream.Send(&rpcscheduler.PeerPacket{Code: base.Code_SchedNeedBackSource}); err != nil {
peer.Log.Errorf("send packet failed: %v", err)
peer.Log.Errorf("send packet failed: %s", err.Error())
return
}
peer.Log.Infof("peer scheduling %d times, peer downloads back-to-source %d",
n, base.Code_SchedNeedBackSource)

if err := peer.FSM.Event(resource.PeerEventDownloadFromBackToSource); err != nil {
peer.Log.Errorf("peer fsm event failed: %v", err)
peer.Log.Errorf("peer fsm event failed: %s", err.Error())
return
}

// If the task state is TaskStateFailed,
// peer back-to-source and reset task state to TaskStateRunning
if peer.Task.FSM.Is(resource.TaskStateFailed) {
if err := peer.Task.FSM.Event(resource.TaskEventDownload); err != nil {
peer.Task.Log.Errorf("task fsm event failed: %v", err)
peer.Task.Log.Errorf("task fsm event failed: %s", err.Error())
return
}
}
@@ -125,7 +125,7 @@ func (s *scheduler) ScheduleParent(ctx context.Context, peer *resource.Peer, blo

// Notify peer schedule failed
if err := stream.Send(&rpcscheduler.PeerPacket{Code: base.Code_SchedTaskStatusError}); err != nil {
peer.Log.Errorf("send packet failed: %v", err)
peer.Log.Errorf("send packet failed: %s", err.Error())
return
}
peer.Log.Errorf("peer scheduling exceeds the limit %d times and return code %d", s.config.RetryLimit, base.Code_SchedTaskStatusError)