diff --git a/dm/command/argument.go b/dm/command/argument.go index 4d4e3ccb7e..9a6cdf0166 100644 --- a/dm/command/argument.go +++ b/dm/command/argument.go @@ -50,7 +50,7 @@ func VerifySQLOperateArgs(binlogPosStr, sqlPattern string, sharding bool) (*mysq if len(binlogPosStr) > 0 { pos2, err := binlog.PositionFromStr(binlogPosStr) if err != nil { - return nil, nil, terror.ErrVerifySQLOperateArgs.Generatef("invalid --binlog-pos %s in sql operation: %s", binlogPosStr, terror.Message(err)) + return nil, nil, terror.ErrVerifySQLOperateArgs.Generatef("invalid --binlog-pos %s in sql operation: %v", binlogPosStr, err) } pos = &pos2 } else if len(sqlPattern) > 0 { diff --git a/dm/master/server.go b/dm/master/server.go index 1f43d25778..7ed5063316 100644 --- a/dm/master/server.go +++ b/dm/master/server.go @@ -281,7 +281,7 @@ func (s *Server) RegisterWorker(ctx context.Context, req *pb.RegisterWorkerReque if err != nil { return &pb.RegisterWorkerResponse{ Result: false, - Msg: terror.Message(err), + Msg: err.Error(), }, nil } log.L().Info("register worker successfully", zap.String("name", req.Name), zap.String("address", req.Address)) @@ -310,7 +310,7 @@ func (s *Server) OfflineMember(ctx context.Context, req *pb.OfflineMemberRequest if err != nil { return &pb.OfflineMemberResponse{ Result: false, - Msg: terror.Message(err), + Msg: err.Error(), }, nil } } else if req.Type == common.Master { @@ -318,7 +318,7 @@ func (s *Server) OfflineMember(ctx context.Context, req *pb.OfflineMemberRequest if err != nil { return &pb.OfflineMemberResponse{ Result: false, - Msg: terror.Message(err), + Msg: err.Error(), }, nil } } else { @@ -383,7 +383,7 @@ func (s *Server) StartTask(ctx context.Context, req *pb.StartTaskRequest) (*pb.S resp := &pb.StartTaskResponse{} cfg, stCfgs, err := s.generateSubTask(ctx, req.Task) if err != nil { - resp.Msg = terror.Message(err) + resp.Msg = err.Error() return resp, nil } log.L().Info("", zap.String("task name", cfg.Name), zap.Stringer("task", cfg), zap.String("request", "StartTask")) @@ -432,7 +432,7 @@ func (s *Server) StartTask(ctx context.Context, req *pb.StartTaskRequest) (*pb.S err = s.scheduler.AddSubTasks(subtaskCfgPointersToInstances(stCfgs...)...) s.removeMetaLock.Unlock() if err != nil { - resp.Msg = terror.Message(err) + resp.Msg = err.Error() return resp, nil } resp.Result = true @@ -487,7 +487,7 @@ func (s *Server) OperateTask(ctx context.Context, req *pb.OperateTaskRequest) (* err = s.scheduler.UpdateExpectSubTaskStage(expect, req.Name, sources...) } if err != nil { - resp.Msg = terror.Message(err) + resp.Msg = err.Error() return resp, nil } @@ -513,7 +513,7 @@ func (s *Server) UpdateTask(ctx context.Context, req *pb.UpdateTaskRequest) (*pb if err != nil { return &pb.UpdateTaskResponse{ Result: false, - Msg: terror.Message(err), + Msg: err.Error(), }, nil } log.L().Info("update task", zap.String("task name", cfg.Name), zap.Stringer("task", cfg)) @@ -753,7 +753,7 @@ func (s *Server) UnlockDDLLock(ctx context.Context, req *pb.UnlockDDLLockRequest err := s.pessimist.UnlockLock(ctx, req.ID, req.ReplaceOwner, req.ForceRemove) if err != nil { resp.Result = false - resp.Msg = terror.Message(err) + resp.Msg = err.Error() } return resp, nil @@ -777,7 +777,7 @@ func (s *Server) HandleSQLs(ctx context.Context, req *pb.HandleSQLsRequest) (*pb if err != nil { return &pb.HandleSQLsResponse{ Result: false, - Msg: fmt.Sprintf("save request with --sharding error:\n%s", terror.Message(err)), + Msg: fmt.Sprintf("save request with --sharding error:\n%v", err), }, nil } log.L().Info("handle sqls request was saved", zap.String("task name", req.Name), zap.String("request", "HandleSQLs")) @@ -816,7 +816,7 @@ func (s *Server) HandleSQLs(ctx context.Context, req *pb.HandleSQLsRequest) (*pb response, err := worker.SendRequest(ctx, subReq, s.cfg.RPCTimeout) workerResp := &pb.CommonWorkerResponse{} if err != nil { - workerResp = errorCommonWorkerResponse(terror.Message(err), req.Source, worker.BaseInfo().Name) + workerResp = errorCommonWorkerResponse(err.Error(), req.Source, worker.BaseInfo().Name) } else { workerResp = response.HandleSubTaskSQLs } @@ -861,7 +861,7 @@ func (s *Server) PurgeWorkerRelay(ctx context.Context, req *pb.PurgeWorkerRelayR resp, err := worker.SendRequest(ctx, workerReq, s.cfg.RPCTimeout) workerResp := &pb.CommonWorkerResponse{} if err != nil { - workerResp = errorCommonWorkerResponse(terror.Message(err), source, worker.BaseInfo().Name) + workerResp = errorCommonWorkerResponse(err.Error(), source, worker.BaseInfo().Name) } else { workerResp = resp.PurgeRelay } @@ -905,7 +905,7 @@ func (s *Server) SwitchWorkerRelayMaster(ctx context.Context, req *pb.SwitchWork handleErr := func(err error, source string) { log.L().Error("response error", zap.Error(err)) - resp := errorCommonWorkerResponse(terror.Message(err), source, "") + resp := errorCommonWorkerResponse(err.Error(), source, "") workerRespCh <- resp } @@ -928,7 +928,7 @@ func (s *Server) SwitchWorkerRelayMaster(ctx context.Context, req *pb.SwitchWork resp, err := worker.SendRequest(ctx, request, s.cfg.RPCTimeout) workerResp := &pb.CommonWorkerResponse{} if err != nil { - workerResp = errorCommonWorkerResponse(terror.Message(err), sourceID, worker.BaseInfo().Name) + workerResp = errorCommonWorkerResponse(err.Error(), sourceID, worker.BaseInfo().Name) } else { workerResp = resp.SwitchRelayMaster } @@ -987,7 +987,7 @@ func (s *Server) OperateWorkerRelayTask(ctx context.Context, req *pb.OperateWork } err := s.scheduler.UpdateExpectRelayStage(expect, req.Sources...) if err != nil { - resp.Msg = terror.Message(err) + resp.Msg = err.Error() return resp, nil } resp.Result = true @@ -1020,7 +1020,7 @@ func (s *Server) getStatusFromWorkers(ctx context.Context, sources []string, tas log.L().Error("response error", zap.Error(err)) resp := &pb.QueryStatusResponse{ Result: false, - Msg: terror.Message(err), + Msg: err.Error(), SourceStatus: &pb.SourceStatus{ Source: source, }, @@ -1046,7 +1046,7 @@ func (s *Server) getStatusFromWorkers(ctx context.Context, sources []string, tas if err != nil { workerStatus = &pb.QueryStatusResponse{ Result: false, - Msg: terror.Message(err), + Msg: err.Error(), SourceStatus: &pb.SourceStatus{}, } } else { @@ -1076,7 +1076,7 @@ func (s *Server) getErrorFromWorkers(ctx context.Context, sources []string, task log.L().Error("response error", zap.Error(err)) resp := &pb.QueryErrorResponse{ Result: false, - Msg: terror.Message(err), + Msg: err.Error(), SourceError: &pb.SourceError{ Source: source, }, @@ -1103,7 +1103,7 @@ func (s *Server) getErrorFromWorkers(ctx context.Context, sources []string, task if err != nil { workerError = &pb.QueryErrorResponse{ Result: false, - Msg: terror.Message(err), + Msg: err.Error(), SourceError: &pb.SourceError{}, } } else { @@ -1154,7 +1154,7 @@ func (s *Server) UpdateMasterConfig(ctx context.Context, req *pb.UpdateMasterCon s.Unlock() return &pb.UpdateMasterConfigResponse{ Result: false, - Msg: "Failed to write config to local file. detail: " + terror.Message(err), + Msg: "Failed to write config to local file. detail: " + err.Error(), }, nil } log.L().Info("saved dm-master config file", zap.String("config file", s.cfg.ConfigFile), zap.String("request", "UpdateMasterConfig")) @@ -1166,7 +1166,7 @@ func (s *Server) UpdateMasterConfig(ctx context.Context, req *pb.UpdateMasterCon s.Unlock() return &pb.UpdateMasterConfigResponse{ Result: false, - Msg: fmt.Sprintf("Failed to parse configure from file %s, detail: ", cfg.ConfigFile) + terror.Message(err), + Msg: fmt.Sprintf("Failed to parse configure from file %s, detail: ", cfg.ConfigFile) + err.Error(), }, nil } log.L().Info("update dm-master config", zap.Stringer("config", cfg), zap.String("request", "UpdateMasterConfig")) @@ -1205,7 +1205,7 @@ func (s *Server) UpdateWorkerRelayConfig(ctx context.Context, req *pb.UpdateWork } resp, err := worker.SendRequest(ctx, request, s.cfg.RPCTimeout) if err != nil { - return errorCommonWorkerResponse(terror.Message(err), source, worker.BaseInfo().Name), nil + return errorCommonWorkerResponse(err.Error(), source, worker.BaseInfo().Name), nil } return resp.UpdateRelay, nil } @@ -1249,7 +1249,7 @@ func (s *Server) MigrateWorkerRelay(ctx context.Context, req *pb.MigrateWorkerRe } resp, err := worker.SendRequest(ctx, request, s.cfg.RPCTimeout) if err != nil { - return errorCommonWorkerResponse(terror.Message(err), source, worker.BaseInfo().Name), nil + return errorCommonWorkerResponse(err.Error(), source, worker.BaseInfo().Name), nil } return resp.MigrateRelay, nil } @@ -1270,7 +1270,7 @@ func (s *Server) CheckTask(ctx context.Context, req *pb.CheckTaskRequest) (*pb.C if err != nil { return &pb.CheckTaskResponse{ Result: false, - Msg: terror.Message(err), + Msg: err.Error(), }, nil } @@ -1317,7 +1317,7 @@ func (s *Server) OperateSource(ctx context.Context, req *pb.OperateSourceRequest Result: false, } if err != nil { - resp.Msg = terror.Message(err) + resp.Msg = err.Error() return resp, nil } var w *scheduler.Worker @@ -1325,7 +1325,7 @@ func (s *Server) OperateSource(ctx context.Context, req *pb.OperateSourceRequest case pb.SourceOp_StartSource: err := s.scheduler.AddSourceCfg(*cfg) if err != nil { - resp.Msg = terror.Message(err) + resp.Msg = err.Error() return resp, nil } // for start source, we should get worker after start source @@ -1338,7 +1338,7 @@ func (s *Server) OperateSource(ctx context.Context, req *pb.OperateSourceRequest w = s.scheduler.GetWorkerBySource(cfg.SourceID) err := s.scheduler.RemoveSourceCfg(cfg.SourceID) if err != nil { - resp.Msg = terror.Message(err) + resp.Msg = err.Error() return resp, nil } default: @@ -1632,7 +1632,7 @@ func (s *Server) handleOperationResult(ctx context.Context, cli *scheduler.Worke var response *pb.CommonWorkerResponse queryResp, err := s.waitOperationOk(ctx, cli, taskName, sourceID, req) if err != nil { - response = errorCommonWorkerResponse(terror.Message(err), sourceID, cli.BaseInfo().Name) + response = errorCommonWorkerResponse(err.Error(), sourceID, cli.BaseInfo().Name) } else { response = &pb.CommonWorkerResponse{ Result: true, @@ -1699,7 +1699,7 @@ func (s *Server) listMemberMaster(ctx context.Context, names []string) (*pb.Memb memberList, err := s.etcdClient.MemberList(ctx) if err != nil { - resp.Master.Msg = terror.Message(err) + resp.Master.Msg = err.Error() return resp, nil } @@ -1749,7 +1749,7 @@ func (s *Server) listMemberWorker(ctx context.Context, names []string) (*pb.Memb workerAgents, err := s.scheduler.GetAllWorkers() if err != nil { - resp.Worker.Msg = terror.Message(err) + resp.Worker.Msg = err.Error() return resp, nil } @@ -1794,7 +1794,7 @@ func (s *Server) listMemberLeader(ctx context.Context, names []string) (*pb.Memb _, name, addr, err := s.election.LeaderInfo(ctx) if err != nil { - resp.Leader.Msg = terror.Message(err) + resp.Leader.Msg = err.Error() return resp, nil } @@ -1831,7 +1831,7 @@ func (s *Server) ListMember(ctx context.Context, req *pb.ListMemberRequest) (*pb if req.Leader { res, err := s.listMemberLeader(ctx, req.Names) if err != nil { - resp.Msg = terror.Message(err) + resp.Msg = err.Error() return resp, nil } members = append(members, &pb.Members{ @@ -1842,7 +1842,7 @@ func (s *Server) ListMember(ctx context.Context, req *pb.ListMemberRequest) (*pb if req.Master { res, err := s.listMemberMaster(ctx, req.Names) if err != nil { - resp.Msg = terror.Message(err) + resp.Msg = err.Error() return resp, nil } members = append(members, &pb.Members{ @@ -1853,7 +1853,7 @@ func (s *Server) ListMember(ctx context.Context, req *pb.ListMemberRequest) (*pb if req.Worker { res, err := s.listMemberWorker(ctx, req.Names) if err != nil { - resp.Msg = terror.Message(err) + resp.Msg = err.Error() return resp, nil } members = append(members, &pb.Members{ diff --git a/dm/tracer/server.go b/dm/tracer/server.go index a8f2ac69b8..62f768bbfe 100644 --- a/dm/tracer/server.go +++ b/dm/tracer/server.go @@ -19,7 +19,6 @@ import ( "sync" "time" - "github.com/pingcap/errors" "github.com/siddontang/go/sync2" "github.com/soheilhy/cmux" "go.uber.org/zap" @@ -139,7 +138,7 @@ func (s *Server) UploadSyncerBinlogEvent(ctx context.Context, req *pb.UploadSync if err != nil { return &pb.CommonUploadResponse{ Result: false, - Msg: errors.ErrorStack(err), + Msg: err.Error(), }, nil } } @@ -157,7 +156,7 @@ func (s *Server) UploadSyncerJobEvent(ctx context.Context, req *pb.UploadSyncerJ if err != nil { return &pb.CommonUploadResponse{ Result: false, - Msg: errors.ErrorStack(err), + Msg: err.Error(), }, nil } } diff --git a/dm/worker/server.go b/dm/worker/server.go index 604f5a8e39..73afe8b9ec 100644 --- a/dm/worker/server.go +++ b/dm/worker/server.go @@ -600,7 +600,7 @@ func (s *Server) OperateRelay(ctx context.Context, req *pb.OperateRelayRequest) err := w.OperateRelay(ctx, req) if err != nil { log.L().Error("fail to operate relay", zap.String("request", "OperateRelay"), zap.Stringer("payload", req), zap.Error(err)) - resp.Msg = errors.ErrorStack(err) + resp.Msg = err.Error() return resp, nil } @@ -663,7 +663,7 @@ func (s *Server) QueryWorkerConfig(ctx context.Context, req *pb.QueryWorkerConfi workerCfg, err := w.QueryConfig(ctx) if err != nil { resp.Result = false - resp.Msg = errors.ErrorStack(err) + resp.Msg = err.Error() log.L().Error("fail to query worker config", zap.String("request", "QueryWorkerConfig"), zap.Stringer("payload", req), zap.Error(err)) return resp, nil } @@ -671,7 +671,7 @@ func (s *Server) QueryWorkerConfig(ctx context.Context, req *pb.QueryWorkerConfi rawConfig, err := workerCfg.From.Toml() if err != nil { resp.Result = false - resp.Msg = errors.ErrorStack(err) + resp.Msg = err.Error() log.L().Error("fail to marshal worker config", zap.String("request", "QueryWorkerConfig"), zap.Stringer("worker from config", &workerCfg.From), zap.Error(err)) } @@ -808,7 +808,7 @@ func makeCommonWorkerResponse(reqErr error) *pb.CommonWorkerResponse { } if reqErr != nil { resp.Result = false - resp.Msg = errors.ErrorStack(reqErr) + resp.Msg = reqErr.Error() } return resp }