From 5e81548c3c1a1adab056d977e7767307a39ecb70 Mon Sep 17 00:00:00 2001 From: ShuNing Date: Tue, 19 Feb 2019 19:14:03 +0800 Subject: [PATCH] *: log format for server (#1431) * *: log format for server Signed-off-by: nolouch --- server/cluster.go | 37 +++++++++++++++------- server/cluster_info.go | 56 ++++++++++++++++++++++++++------- server/cluster_worker.go | 23 ++++++++++---- server/coordinator.go | 17 +++++----- server/etcd_kv.go | 11 ++++--- server/grpc_service.go | 14 ++++++--- server/handler.go | 13 ++++---- server/heartbeat_stream_test.go | 7 +++-- server/heartbeat_streams.go | 12 ++++--- server/id.go | 5 +-- server/join.go | 9 +++--- server/leader.go | 55 +++++++++++++++++--------------- server/systime_mon.go | 5 +-- server/tso.go | 17 +++++----- server/version.go | 7 +++-- 15 files changed, 186 insertions(+), 102 deletions(-) diff --git a/server/cluster.go b/server/cluster.go index 51492225b38..39076050e16 100644 --- a/server/cluster.go +++ b/server/cluster.go @@ -23,12 +23,13 @@ import ( "github.com/pingcap/errcode" "github.com/pingcap/kvproto/pkg/metapb" "github.com/pingcap/kvproto/pkg/pdpb" + log "github.com/pingcap/log" "github.com/pingcap/pd/pkg/logutil" "github.com/pingcap/pd/server/core" "github.com/pingcap/pd/server/namespace" syncer "github.com/pingcap/pd/server/region_syncer" "github.com/pkg/errors" - log "github.com/sirupsen/logrus" + "go.uber.org/zap" ) var backgroundJobInterval = time.Minute @@ -400,7 +401,9 @@ func (c *RaftCluster) putStore(store *metapb.Store) error { // Check location labels. for _, k := range c.cachedCluster.GetLocationLabels() { if v := s.GetLabelValue(k); len(v) == 0 { - log.Warnf("missing location label %q in store %v", k, s) + log.Warn("missing location label", + zap.Stringer("store", s.GetMeta()), + zap.String("label-key", k)) } } return cluster.putStore(s) @@ -430,7 +433,9 @@ func (c *RaftCluster) RemoveStore(storeID uint64) error { } newStore := store.Clone(core.SetStoreState(metapb.StoreState_Offline)) - log.Warnf("[store %d] store %s has been Offline", newStore.GetID(), newStore.GetAddress()) + log.Warn("store has been offline", + zap.Uint64("store-id", newStore.GetID()), + zap.String("store-address", newStore.GetAddress())) return cluster.putStore(newStore) } @@ -458,11 +463,13 @@ func (c *RaftCluster) BuryStore(storeID uint64, force bool) error { // revive:di if !force { return errors.New("store is still up, please remove store gracefully") } - log.Warnf("forcedly bury store %v", store) + log.Warn("forcedly bury store", zap.Stringer("store", store.GetMeta())) } newStore := store.Clone(core.SetStoreState(metapb.StoreState_Tombstone)) - log.Warnf("[store %d] store %s has been Tombstone", newStore.GetID(), newStore.GetAddress()) + log.Warn("store has been Tombstone", + zap.Uint64("store-id", newStore.GetID()), + zap.String("store-address", newStore.GetAddress())) return cluster.putStore(newStore) } @@ -479,7 +486,9 @@ func (c *RaftCluster) SetStoreState(storeID uint64, state metapb.StoreState) err } newStore := store.Clone(core.SetStoreState(state)) - log.Warnf("[store %d] set state to %v", storeID, state.String()) + log.Warn("store update state", + zap.Uint64("store-id", storeID), + zap.Stringer("new-state", state)) return cluster.putStore(newStore) } @@ -526,7 +535,9 @@ func (c *RaftCluster) checkStores() { // If the store is empty, it can be buried. 
if cluster.getStoreRegionCount(offlineStore.GetId()) == 0 { if err := c.BuryStore(offlineStore.GetId(), false); err != nil { - log.Errorf("bury store %v failed: %v", offlineStore, err) + log.Error("bury store failed", + zap.Stringer("store", offlineStore), + zap.Error(err)) } } else { offlineStores = append(offlineStores, offlineStore) @@ -539,7 +550,7 @@ func (c *RaftCluster) checkStores() { if upStoreCount < cluster.GetMaxReplicas() { for _, offlineStore := range offlineStores { - log.Warnf("store %v may not turn into Tombstone, there are no extra up node has enough space to accommodate the extra replica", offlineStore) + log.Warn("store may not turn into Tombstone, there are no extra up node has enough space to accommodate the extra replica", zap.Stringer("store", offlineStore)) } } } @@ -550,13 +561,17 @@ func (c *RaftCluster) checkOperators() { // after region is merged, it will not heartbeat anymore // the operator of merged region will not timeout actively if c.cachedCluster.GetRegion(op.RegionID()) == nil { - log.Debugf("remove operator %v cause region %d is merged", op, op.RegionID()) + log.Debug("remove operator cause region is merged", + zap.Uint64("region-id", op.RegionID()), + zap.Stringer("operator", op)) opController.RemoveOperator(op) continue } if op.IsTimeout() { - log.Infof("[region %v] operator timeout: %s", op.RegionID(), op) + log.Info("operator timeout", + zap.Uint64("region-id", op.RegionID()), + zap.Stringer("operator", op)) opController.RemoveOperator(op) } } @@ -580,7 +595,7 @@ func (c *RaftCluster) collectHealthStatus() { client := c.s.GetClient() members, err := GetMembers(client) if err != nil { - log.Info("get members error:", err) + log.Error("get members error", zap.Error(err)) } unhealth := c.s.CheckHealth(members) for _, member := range members { diff --git a/server/cluster_info.go b/server/cluster_info.go index 70c1949549f..58a701301ac 100644 --- a/server/cluster_info.go +++ b/server/cluster_info.go @@ -21,10 +21,11 @@ import ( "github.com/gogo/protobuf/proto" "github.com/pingcap/kvproto/pkg/metapb" "github.com/pingcap/kvproto/pkg/pdpb" + log "github.com/pingcap/log" "github.com/pingcap/pd/server/core" "github.com/pingcap/pd/server/namespace" "github.com/pingcap/pd/server/schedule" - log "github.com/sirupsen/logrus" + "go.uber.org/zap" ) type clusterInfo struct { @@ -72,13 +73,19 @@ func loadClusterInfo(id core.IDAllocator, kv *core.KV, opt *scheduleOption) (*cl if err := kv.LoadStores(c.core.Stores); err != nil { return nil, err } - log.Infof("load %v stores cost %v", c.core.Stores.GetStoreCount(), time.Since(start)) + log.Info("load stores", + zap.Int("count", c.core.Stores.GetStoreCount()), + zap.Duration("cost", time.Since(start)), + ) start = time.Now() if err := kv.LoadRegions(c.core.Regions); err != nil { return nil, err } - log.Infof("load %v regions cost %v", c.core.Regions.GetRegionCount(), time.Since(start)) + log.Info("load regions", + zap.Int("count", c.core.Regions.GetRegionCount()), + zap.Duration("cost", time.Since(start)), + ) return c, nil } @@ -107,9 +114,11 @@ func (c *clusterInfo) OnStoreVersionChange() { c.opt.SetClusterVersion(*minVersion) err := c.opt.persist(c.kv) if err != nil { - log.Infof("persist cluster version meet error: %s", err) + log.Error("persist cluster version meet error", zap.Error(err)) } - log.Infof("cluster version changed from %s to %s", clusterVersion, minVersion) + log.Info("cluster version changed", + zap.Stringer("old-cluster-version", clusterVersion), + zap.Stringer("new-cluster-version", minVersion)) 
CheckPDVersion(c.opt) } } @@ -133,7 +142,7 @@ func (c *clusterInfo) allocID() (uint64, error) { func (c *clusterInfo) AllocPeer(storeID uint64) (*metapb.Peer, error) { peerID, err := c.allocID() if err != nil { - log.Errorf("failed to alloc peer: %v", err) + log.Error("failed to alloc peer", zap.Error(err)) return nil, err } peer := &metapb.Peer{ @@ -482,7 +491,10 @@ func (c *clusterInfo) handleRegionHeartbeat(region *core.RegionInfo) error { // Mark isNew if the region in cache does not have leader. var saveKV, saveCache, isNew bool if origin == nil { - log.Debugf("[region %d] Insert new region {%v}", region.GetID(), core.HexRegionMeta(region.GetMeta())) + log.Debug("insert new region", + zap.Uint64("region-id", region.GetID()), + zap.Reflect("meta-region", core.HexRegionMeta(region.GetMeta())), + ) saveKV, saveCache, isNew = true, true, true } else { r := region.GetRegionEpoch() @@ -492,18 +504,32 @@ func (c *clusterInfo) handleRegionHeartbeat(region *core.RegionInfo) error { return ErrRegionIsStale(region.GetMeta(), origin.GetMeta()) } if r.GetVersion() > o.GetVersion() { - log.Infof("[region %d] %s, Version changed from {%d} to {%d}", region.GetID(), core.DiffRegionKeyInfo(origin, region), o.GetVersion(), r.GetVersion()) + log.Info("region Version changed", + zap.Uint64("region-id", region.GetID()), + zap.String("detail", core.DiffRegionKeyInfo(origin, region)), + zap.Uint64("old-version", o.GetVersion()), + zap.Uint64("new-version", r.GetVersion()), + ) saveKV, saveCache = true, true } if r.GetConfVer() > o.GetConfVer() { - log.Infof("[region %d] %s, ConfVer changed from {%d} to {%d}", region.GetID(), core.DiffRegionPeersInfo(origin, region), o.GetConfVer(), r.GetConfVer()) + log.Info("region ConfVer changed", + zap.Uint64("region-id", region.GetID()), + zap.String("detail", core.DiffRegionPeersInfo(origin, region)), + zap.Uint64("old-confver", o.GetConfVer()), + zap.Uint64("new-confver", r.GetConfVer()), + ) saveKV, saveCache = true, true } if region.GetLeader().GetId() != origin.GetLeader().GetId() { if origin.GetLeader().GetId() == 0 { isNew = true } else { - log.Infof("[region %d] Leader changed from store {%d} to {%d}", region.GetID(), origin.GetLeader().GetStoreId(), region.GetLeader().GetStoreId()) + log.Info("leader changed", + zap.Uint64("region-id", region.GetID()), + zap.Uint64("from", origin.GetLeader().GetStoreId()), + zap.Uint64("to", region.GetLeader().GetStoreId()), + ) } saveCache = true } @@ -528,7 +554,10 @@ func (c *clusterInfo) handleRegionHeartbeat(region *core.RegionInfo) error { if err := c.kv.SaveRegion(region.GetMeta()); err != nil { // Not successfully saved to kv is not fatal, it only leads to longer warm-up // after restart. Here we only log the error then go on updating cache. 
- log.Errorf("[region %d] fail to save region %v: %v", region.GetID(), core.HexRegionMeta(region.GetMeta()), err) + log.Error("fail to save region to kv", + zap.Uint64("region-id", region.GetID()), + zap.Reflect("region-meta", core.HexRegionMeta(region.GetMeta())), + zap.Error(err)) } select { case c.changedRegions <- region: @@ -550,7 +579,10 @@ func (c *clusterInfo) handleRegionHeartbeat(region *core.RegionInfo) error { if c.kv != nil { for _, item := range overlaps { if err := c.kv.DeleteRegion(item); err != nil { - log.Errorf("[region %d] fail to delete region %v: %v", item.GetId(), core.HexRegionMeta(item), err) + log.Error("fail to delete region from kv", + zap.Uint64("region-id", item.GetId()), + zap.Reflect("region-meta", core.HexRegionMeta(item)), + zap.Error(err)) } } } diff --git a/server/cluster_worker.go b/server/cluster_worker.go index 1cbf2ce40e7..b1b5ba22c1f 100644 --- a/server/cluster_worker.go +++ b/server/cluster_worker.go @@ -19,9 +19,10 @@ import ( "github.com/gogo/protobuf/proto" "github.com/pingcap/kvproto/pkg/metapb" "github.com/pingcap/kvproto/pkg/pdpb" + log "github.com/pingcap/log" "github.com/pingcap/pd/server/core" "github.com/pkg/errors" - log "github.com/sirupsen/logrus" + "go.uber.org/zap" ) // HandleRegionHeartbeat processes RegionInfo reports from client. @@ -34,7 +35,7 @@ func (c *RaftCluster) HandleRegionHeartbeat(region *core.RegionInfo) error { // If the region peer count is 0, then we should not handle this. if len(region.GetPeers()) == 0 { - log.Warnf("invalid region, zero region peer count: %v", core.HexRegionMeta(region.GetMeta())) + log.Warn("invalid region, zero region peer count", zap.Reflect("region-meta", core.HexRegionMeta(region.GetMeta()))) return errors.Errorf("invalid region, zero region peer count: %v", core.HexRegionMeta(region.GetMeta())) } @@ -169,7 +170,10 @@ func (c *RaftCluster) handleReportSplit(request *pdpb.ReportSplitRequest) (*pdpb err := c.checkSplitRegion(left, right) if err != nil { - log.Warnf("report split region is invalid: %v, %v, %v", core.HexRegionMeta(left), core.HexRegionMeta(right), err) + log.Warn("report split region is invalid", + zap.Reflect("left-region", core.HexRegionMeta(left)), + zap.Reflect("right-region", core.HexRegionMeta(right)), + zap.Error(err)) return nil, err } @@ -177,7 +181,9 @@ func (c *RaftCluster) handleReportSplit(request *pdpb.ReportSplitRequest) (*pdpb originRegion := proto.Clone(right).(*metapb.Region) originRegion.RegionEpoch = nil originRegion.StartKey = left.GetStartKey() - log.Infof("[region %d] region split, generate new region: %v", originRegion.GetId(), core.HexRegionMeta(left)) + log.Info("region split, generate new region", + zap.Uint64("region-id", originRegion.GetId()), + zap.Reflect("region-meta", core.HexRegionMeta(left))) return &pdpb.ReportSplitResponse{}, nil } @@ -190,11 +196,16 @@ func (c *RaftCluster) handleBatchReportSplit(request *pdpb.ReportBatchSplitReque err := c.checkSplitRegions(regions) if err != nil { - log.Warnf("report batch split region is invalid: %v, %v", hexRegionMetas, err) + log.Warn("report batch split region is invalid", + zap.Reflect("region-meta", hexRegionMetas), + zap.Error(err)) return nil, err } last := len(regions) - 1 originRegion := proto.Clone(regions[last]).(*metapb.Region) - log.Infof("[region %d] region split, generate %d new regions: %v", originRegion.GetId(), last, hexRegionMetas[:last]) + log.Info("region batch split, generate new regions", + zap.Uint64("region-id", originRegion.GetId()), + zap.Reflect("origin", hexRegionMetas[:last]), 
+ zap.Int("total", last)) return &pdpb.ReportBatchSplitResponse{}, nil } diff --git a/server/coordinator.go b/server/coordinator.go index 9a8e72313eb..9f5f0647af0 100644 --- a/server/coordinator.go +++ b/server/coordinator.go @@ -19,12 +19,13 @@ import ( "sync" "time" + log "github.com/pingcap/log" "github.com/pingcap/pd/pkg/logutil" "github.com/pingcap/pd/server/core" "github.com/pingcap/pd/server/namespace" "github.com/pingcap/pd/server/schedule" "github.com/pkg/errors" - log "github.com/sirupsen/logrus" + "go.uber.org/zap" ) const ( @@ -198,16 +199,16 @@ func (c *coordinator) run() { if schedulerCfg.Disable { scheduleCfg.Schedulers[k] = schedulerCfg k++ - log.Info("skip create ", schedulerCfg.Type) + log.Info("skip create scheduler", zap.String("scheduler-type", schedulerCfg.Type)) continue } s, err := schedule.CreateScheduler(schedulerCfg.Type, c.opController, schedulerCfg.Args...) if err != nil { - log.Fatalf("can not create scheduler %s: %v", schedulerCfg.Type, err) + log.Fatal("can not create scheduler", zap.String("scheduler-type", schedulerCfg.Type), zap.Error(err)) } - log.Infof("create scheduler %s", s.GetName()) + log.Info("create scheduler", zap.String("scheduler-name", s.GetName())) if err = c.addScheduler(s, schedulerCfg.Args...); err != nil { - log.Errorf("can not add scheduler %s: %v", s.GetName(), err) + log.Error("can not add scheduler", zap.String("scheduler-name", s.GetName()), zap.Error(err)) } // Only records the valid scheduler config. @@ -221,7 +222,7 @@ func (c *coordinator) run() { scheduleCfg.Schedulers = scheduleCfg.Schedulers[:k] c.cluster.opt.store(scheduleCfg) if err := c.cluster.opt.persist(c.cluster.kv); err != nil { - log.Errorf("can't persist schedule config: %v", err) + log.Error("cannot persist schedule config", zap.Error(err)) } c.wg.Add(1) @@ -408,7 +409,9 @@ func (c *coordinator) runScheduler(s *scheduleController) { } case <-s.Ctx().Done(): - log.Infof("%v stopped: %v", s.GetName(), s.Ctx().Err()) + log.Info("stopped scheduler", + zap.String("scheduler-name", s.GetName()), + zap.Error(s.Ctx().Err())) return } } diff --git a/server/etcd_kv.go b/server/etcd_kv.go index 8553ec308b4..55a67baf027 100644 --- a/server/etcd_kv.go +++ b/server/etcd_kv.go @@ -19,8 +19,9 @@ import ( "time" "github.com/coreos/etcd/clientv3" + log "github.com/pingcap/log" "github.com/pkg/errors" - log "github.com/sirupsen/logrus" + "go.uber.org/zap" ) const ( @@ -83,7 +84,7 @@ func (kv *etcdKVBase) Save(key, value string) error { resp, err := kv.server.leaderTxn().Then(clientv3.OpPut(key, value)).Commit() if err != nil { - log.Errorf("save to etcd error: %v", err) + log.Error("save to etcd meet error", zap.Error(err)) return errors.WithStack(err) } if !resp.Succeeded { @@ -97,7 +98,7 @@ func (kv *etcdKVBase) Delete(key string) error { resp, err := kv.server.leaderTxn().Then(clientv3.OpDelete(key)).Commit() if err != nil { - log.Errorf("delete from etcd error: %v", err) + log.Error("delete from etcd meet error", zap.Error(err)) return errors.WithStack(err) } if !resp.Succeeded { @@ -113,10 +114,10 @@ func kvGet(c *clientv3.Client, key string, opts ...clientv3.OpOption) (*clientv3 start := time.Now() resp, err := clientv3.NewKV(c).Get(ctx, key, opts...) 
if err != nil { - log.Errorf("load from etcd error: %v", err) + log.Error("load from etcd meet error", zap.Error(err)) } if cost := time.Since(start); cost > kvSlowRequestTime { - log.Warnf("kv gets too slow: key %v cost %v err %v", key, cost, err) + log.Warn("kv gets too slow", zap.String("request-key", key), zap.Duration("cost", cost), zap.Error(err)) } return resp, errors.WithStack(err) diff --git a/server/grpc_service.go b/server/grpc_service.go index f4bf516b364..9b15936d368 100644 --- a/server/grpc_service.go +++ b/server/grpc_service.go @@ -23,9 +23,10 @@ import ( "github.com/pingcap/kvproto/pkg/metapb" "github.com/pingcap/kvproto/pkg/pdpb" + log "github.com/pingcap/log" "github.com/pingcap/pd/server/core" "github.com/pkg/errors" - log "github.com/sirupsen/logrus" + "go.uber.org/zap" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" ) @@ -207,7 +208,7 @@ func (s *Server) PutStore(ctx context.Context, request *pdpb.PutStoreRequest) (* return nil, status.Errorf(codes.Unknown, err.Error()) } - log.Infof("put store ok - %v", store) + log.Info("put store ok", zap.Stringer("store", store)) cluster.RLock() defer cluster.RUnlock() cluster.cachedCluster.OnStoreVersionChange() @@ -561,7 +562,7 @@ func (s *Server) PutClusterConfig(ctx context.Context, request *pdpb.PutClusterC return nil, status.Errorf(codes.Unknown, err.Error()) } - log.Infof("put cluster config ok - %v", conf) + log.Info("put cluster config ok", zap.Reflect("config", conf)) return &pdpb.PutClusterConfigResponse{ Header: s.header(), @@ -652,9 +653,12 @@ func (s *Server) UpdateGCSafePoint(ctx context.Context, request *pdpb.UpdateGCSa if err := s.kv.SaveGCSafePoint(newSafePoint); err != nil { return nil, err } - log.Infof("updated gc safe point to %d", newSafePoint) + log.Info("updated gc safe point", + zap.Uint64("safe-point", newSafePoint)) } else if newSafePoint < oldSafePoint { - log.Warnf("trying to update gc safe point from %d to %d", oldSafePoint, newSafePoint) + log.Warn("trying to update gc safe point", + zap.Uint64("old-safe-point", oldSafePoint), + zap.Uint64("new-safe-point", newSafePoint)) newSafePoint = oldSafePoint } diff --git a/server/handler.go b/server/handler.go index e5518d31165..6036a5d5a3b 100644 --- a/server/handler.go +++ b/server/handler.go @@ -22,10 +22,11 @@ import ( "github.com/pingcap/errcode" "github.com/pingcap/kvproto/pkg/metapb" "github.com/pingcap/kvproto/pkg/pdpb" + log "github.com/pingcap/log" "github.com/pingcap/pd/server/core" "github.com/pingcap/pd/server/schedule" "github.com/pkg/errors" - log "github.com/sirupsen/logrus" + "go.uber.org/zap" ) var ( @@ -170,11 +171,11 @@ func (h *Handler) AddScheduler(name string, args ...string) error { if err != nil { return err } - log.Infof("create scheduler %s", s.GetName()) + log.Info("create scheduler", zap.String("scheduler-name", s.GetName())) if err = c.addScheduler(s, args...); err != nil { - log.Errorf("can not add scheduler %v: %v", s.GetName(), err) + log.Error("can not add scheduler", zap.String("scheduler-name", s.GetName()), zap.Error(err)) } else if err = h.opt.persist(c.cluster.kv); err != nil { - log.Errorf("can not persist scheduler config: %v", err) + log.Error("can not persist scheduler config", zap.Error(err)) } return err } @@ -186,9 +187,9 @@ func (h *Handler) RemoveScheduler(name string) error { return err } if err = c.removeScheduler(name); err != nil { - log.Errorf("can not remove scheduler %v: %v", name, err) + log.Error("can not remove scheduler", zap.String("scheduler-name", name), zap.Error(err)) } else if 
err = h.opt.persist(c.cluster.kv); err != nil { - log.Errorf("can not persist scheduler config: %v", err) + log.Error("can not persist scheduler config", zap.Error(err)) } return err } diff --git a/server/heartbeat_stream_test.go b/server/heartbeat_stream_test.go index c517cee3c11..48784180c6c 100644 --- a/server/heartbeat_stream_test.go +++ b/server/heartbeat_stream_test.go @@ -20,9 +20,10 @@ import ( . "github.com/pingcap/check" "github.com/pingcap/kvproto/pkg/metapb" "github.com/pingcap/kvproto/pkg/pdpb" + log "github.com/pingcap/log" "github.com/pingcap/pd/pkg/testutil" "github.com/pingcap/pd/pkg/typeutil" - log "github.com/sirupsen/logrus" + "go.uber.org/zap" ) var _ = Suite(&testHeartbeatStreamSuite{}) @@ -113,13 +114,13 @@ func newRegionheartbeatClient(c *C, grpcClient pdpb.PDClient) *regionHeartbeatCl func (c *regionHeartbeatClient) close() { if err := c.stream.CloseSend(); err != nil { - log.Errorf("Failed to terminate client stream: %v", err) + log.Error("failed to terminate client stream", zap.Error(err)) } } func (c *regionHeartbeatClient) SendRecv(msg *pdpb.RegionHeartbeatRequest, timeout time.Duration) *pdpb.RegionHeartbeatResponse { if err := c.stream.Send(msg); err != nil { - log.Errorf("send heartbeat message fail: %v", err) + log.Error("send heartbeat message fail", zap.Error(err)) } select { case <-time.After(timeout): diff --git a/server/heartbeat_streams.go b/server/heartbeat_streams.go index a267b7cf59b..c1bcd2ad835 100644 --- a/server/heartbeat_streams.go +++ b/server/heartbeat_streams.go @@ -20,9 +20,10 @@ import ( "time" "github.com/pingcap/kvproto/pkg/pdpb" + log "github.com/pingcap/log" "github.com/pingcap/pd/pkg/logutil" "github.com/pingcap/pd/server/core" - log "github.com/sirupsen/logrus" + "go.uber.org/zap" ) const heartbeatStreamKeepAliveInterval = time.Minute @@ -80,21 +81,24 @@ func (s *heartbeatStreams) run() { storeLabel := strconv.FormatUint(storeID, 10) if stream, ok := s.streams[storeID]; ok { if err := stream.Send(msg); err != nil { - log.Errorf("[region %v] send heartbeat message fail: %v", msg.RegionId, err) + log.Error("send heartbeat message fail", + zap.Uint64("region-id", msg.RegionId), zap.Error(err)) delete(s.streams, storeID) regionHeartbeatCounter.WithLabelValues(storeLabel, "push", "err").Inc() } else { regionHeartbeatCounter.WithLabelValues(storeLabel, "push", "ok").Inc() } } else { - log.Debugf("[region %v] heartbeat stream not found for store %v, skip send message", msg.RegionId, storeID) + log.Debug("heartbeat stream not found, skip send message", zap.Uint64("region-id", msg.RegionId), zap.Uint64("store-id", storeID)) regionHeartbeatCounter.WithLabelValues(storeLabel, "push", "skip").Inc() } case <-keepAliveTicker.C: for storeID, stream := range s.streams { storeLabel := strconv.FormatUint(storeID, 10) if err := stream.Send(keepAlive); err != nil { - log.Errorf("[store %v] send keepalive message fail: %v", storeID, err) + log.Error("send keepalive message fail", + zap.Uint64("target-store-id", storeID), + zap.Error(err)) delete(s.streams, storeID) regionHeartbeatCounter.WithLabelValues(storeLabel, "keepalive", "err").Inc() } else { diff --git a/server/id.go b/server/id.go index 17b3d3d8ec0..3c6f7b4ff17 100644 --- a/server/id.go +++ b/server/id.go @@ -17,8 +17,9 @@ import ( "sync" "github.com/coreos/etcd/clientv3" + log "github.com/pingcap/log" "github.com/pkg/errors" - log "github.com/sirupsen/logrus" + "go.uber.org/zap" ) const ( @@ -87,7 +88,7 @@ func (alloc *idAllocator) generate() (uint64, error) { return 0, errors.New("generate id 
failed, we may not leader") } - log.Infof("idAllocator allocates a new id: %d", end) + log.Info("idAllocator allocates a new id", zap.Uint64("alloc-id", end)) metadataGauge.WithLabelValues("idalloc").Set(float64(end)) return end, nil } diff --git a/server/join.go b/server/join.go index 3de0d21c94d..57eebf4f141 100644 --- a/server/join.go +++ b/server/join.go @@ -22,9 +22,10 @@ import ( "github.com/coreos/etcd/clientv3" "github.com/coreos/etcd/embed" + log "github.com/pingcap/log" "github.com/pingcap/pd/pkg/etcdutil" "github.com/pkg/errors" - log "github.com/sirupsen/logrus" + "go.uber.org/zap" ) const ( @@ -86,7 +87,7 @@ func PrepareJoinCluster(cfg *Config) error { if _, err := os.Stat(filePath); !os.IsNotExist(err) { s, err := ioutil.ReadFile(filePath) if err != nil { - log.Fatal("read the join config meet error: ", err) + log.Fatal("read the join config meet error", zap.Error(err)) } cfg.InitialCluster = strings.TrimSpace(string(s)) cfg.InitialClusterState = embed.ClusterStateFlagExisting @@ -176,14 +177,14 @@ func PrepareJoinCluster(cfg *Config) error { func isDataExist(d string) bool { dir, err := os.Open(d) if err != nil { - log.Error("failed to open:", err) + log.Error("failed to open directory", zap.Error(err)) return false } defer dir.Close() names, err := dir.Readdirnames(-1) if err != nil { - log.Error("failed to list:", err) + log.Error("failed to list directory", zap.Error(err)) return false } return len(names) != 0 diff --git a/server/leader.go b/server/leader.go index 81fb2e875e4..3253b8a2241 100644 --- a/server/leader.go +++ b/server/leader.go @@ -15,7 +15,6 @@ package server import ( "context" - "fmt" "math/rand" "path" "strings" @@ -24,10 +23,11 @@ import ( "github.com/coreos/etcd/clientv3" "github.com/coreos/etcd/mvcc/mvccpb" "github.com/pingcap/kvproto/pkg/pdpb" + log "github.com/pingcap/log" "github.com/pingcap/pd/pkg/etcdutil" "github.com/pingcap/pd/pkg/logutil" "github.com/pkg/errors" - log "github.com/sirupsen/logrus" + "go.uber.org/zap" ) // IsLeader returns whether the server is leader or not. @@ -72,7 +72,7 @@ func (s *Server) leaderLoop() { for { if s.isClosed() { - log.Infof("server is closed, return leader loop") + log.Info("server is closed, return leader loop") return } @@ -84,7 +84,7 @@ func (s *Server) leaderLoop() { leader, rev, err := getLeader(s.client, s.getLeaderPath()) if err != nil { - log.Errorf("get leader err %v", err) + log.Error("get leader meet error", zap.Error(err)) time.Sleep(200 * time.Millisecond) continue } @@ -92,14 +92,14 @@ func (s *Server) leaderLoop() { if s.isSameLeader(leader) { // oh, we are already leader, we may meet something wrong // in previous campaignLeader. we can delete and campaign again. 
- log.Warnf("leader is still %s, delete and campaign again", leader) + log.Warn("the leader has not changed, delete and campaign again", zap.Stringer("old-leader", leader)) if err = s.deleteLeaderKey(); err != nil { - log.Errorf("delete leader key err %s", err) + log.Error("delete leader key meet error", zap.Error(err)) time.Sleep(200 * time.Millisecond) continue } } else { - log.Infof("leader is %s, watch it", leader) + log.Info("start watch leader", zap.Stringer("leader", leader)) s.watchLeader(leader, rev) log.Info("leader changed, try to campaign leader") } @@ -107,13 +107,15 @@ func (s *Server) leaderLoop() { etcdLeader := s.GetEtcdLeader() if etcdLeader != s.ID() { - log.Infof("%v is not etcd leader, skip campaign leader and check later", s.Name()) + log.Info("skip campaign leader and check later", + zap.String("server-name", s.Name()), + zap.Uint64("etcd-leader-id", etcdLeader)) time.Sleep(200 * time.Millisecond) continue } if err = s.campaignLeader(); err != nil { - log.Errorf("campaign leader err %s", fmt.Sprintf("%+v", err)) + log.Error("campaign leader meet error", zap.Error(err)) } } } @@ -133,20 +135,22 @@ func (s *Server) etcdLeaderLoop() { } myPriority, err := s.GetMemberLeaderPriority(s.ID()) if err != nil { - log.Errorf("failed to load leader priority: %v", err) + log.Error("failed to load leader priority", zap.Error(err)) break } leaderPriority, err := s.GetMemberLeaderPriority(etcdLeader) if err != nil { - log.Errorf("failed to load leader priority: %v", err) + log.Error("failed to load leader priority", zap.Error(err)) break } if myPriority > leaderPriority { err := s.etcd.Server.MoveLeader(ctx, etcdLeader, s.ID()) if err != nil { - log.Errorf("failed to transfer etcd leader: %v", err) + log.Error("failed to transfer etcd leader", zap.Error(err)) } else { - log.Infof("etcd leader moved from %v to %v", etcdLeader, s.ID()) + log.Info("transfer etcd leader", + zap.Uint64("from", etcdLeader), + zap.Uint64("to", s.ID())) } } case <-ctx.Done(): @@ -190,14 +194,14 @@ func (s *Server) memberInfo() (member *pdpb.Member, marshalStr string) { data, err := leader.Marshal() if err != nil { // can't fail, so panic here. 
- log.Fatalf("marshal leader %s err %v", leader, err) + log.Fatal("marshal leader meet error", zap.Stringer("leader", leader), zap.Error(err)) } return leader, string(data) } func (s *Server) campaignLeader() error { - log.Debugf("begin to campaign leader %s", s.Name()) + log.Debug("begin to campaign leader", zap.String("campaign-leader-name", s.Name())) lessor := clientv3.NewLease(s.client) defer lessor.Close() @@ -208,7 +212,7 @@ func (s *Server) campaignLeader() error { cancel() if cost := time.Since(start); cost > slowRequestTime { - log.Warnf("lessor grants too slow, cost %s", cost) + log.Warn("lessor grants too slow", zap.Duration("cost", cost)) } if err != nil { @@ -236,7 +240,7 @@ func (s *Server) campaignLeader() error { if err != nil { return errors.WithStack(err) } - log.Debugf("campaign leader ok %s", s.Name()) + log.Debug("campaign leader ok", zap.String("campaign-leader-name", s.Name())) err = s.reloadConfigFromKV() if err != nil { @@ -260,8 +264,8 @@ func (s *Server) campaignLeader() error { s.enableLeader() defer s.disableLeader() - log.Infof("cluster version is %s", s.scheduleOpt.loadClusterVersion()) - log.Infof("PD cluster leader %s is ready to serve", s.Name()) + log.Info("load cluster version", zap.Stringer("cluster-version", s.scheduleOpt.loadClusterVersion())) + log.Info("PD cluster leader is ready to serve", zap.String("leader-name", s.Name())) CheckPDVersion(s.scheduleOpt) tsTicker := time.NewTicker(updateTimestampStep) @@ -280,7 +284,7 @@ func (s *Server) campaignLeader() error { } etcdLeader := s.GetEtcdLeader() if etcdLeader != s.ID() { - log.Infof("etcd leader changed, %s resigns leadership", s.Name()) + log.Info("etcd leader changed, resigns leadership", zap.String("old-leader-name", s.Name())) return nil } case <-ctx.Done(): @@ -302,7 +306,7 @@ func (s *Server) watchLeader(leader *pdpb.Member, revision int64) { defer cancel() err := s.reloadConfigFromKV() if err != nil { - log.Error("reload config failed:", err) + log.Error("reload config failed", zap.Error(err)) return } if s.scheduleOpt.loadPDServerConfig().UseRegionStorage { @@ -319,12 +323,14 @@ func (s *Server) watchLeader(leader *pdpb.Member, revision int64) { for wresp := range rch { // meet compacted error, use the compact revision. if wresp.CompactRevision != 0 { - log.Warnf("required revision %d has been compacted, use the compact revision %d", revision, wresp.CompactRevision) + log.Warn("required revision has been compacted, use the compact revision", + zap.Int64("required-revision", revision), + zap.Int64("compact-revision", wresp.CompactRevision)) revision = wresp.CompactRevision break } if wresp.Canceled { - log.Errorf("leader watcher is canceled with revision: %d, error: %s", revision, wresp.Err()) + log.Error("leader watcher is canceled with", zap.Int64("revision", revision), zap.Error(wresp.Err())) return } @@ -348,7 +354,7 @@ func (s *Server) watchLeader(leader *pdpb.Member, revision int64) { // ResignLeader resigns current PD's leadership. If nextLeader is empty, all // other pd-servers can campaign. func (s *Server) ResignLeader(nextLeader string) error { - log.Infof("%s tries to resign leader with next leader directive: %v", s.Name(), nextLeader) + log.Info("try to resign leader to next leader", zap.String("from", s.Name()), zap.String("to", nextLeader)) // Determine next leaders. 
var leaderIDs []uint64 res, err := etcdutil.ListEtcdMembers(s.client) @@ -364,7 +370,6 @@ func (s *Server) ResignLeader(nextLeader string) error { return errors.New("no valid pd to transfer leader") } nextLeaderID := leaderIDs[rand.Intn(len(leaderIDs))] - log.Infof("%s ready to resign leader, next leader: %v", s.Name(), nextLeaderID) err = s.etcd.Server.MoveLeader(s.serverLoopCtx, s.ID(), nextLeaderID) return errors.WithStack(err) } diff --git a/server/systime_mon.go b/server/systime_mon.go index d1e8fd09085..f8664be8aac 100644 --- a/server/systime_mon.go +++ b/server/systime_mon.go @@ -16,7 +16,8 @@ package server import ( "time" - log "github.com/sirupsen/logrus" + log "github.com/pingcap/log" + "go.uber.org/zap" ) // StartMonitor calls systimeErrHandler if system time jump backward. @@ -28,7 +29,7 @@ func StartMonitor(now func() time.Time, systimeErrHandler func()) { last := now().UnixNano() <-tick.C if now().UnixNano() < last { - log.Errorf("system time jump backward, last:%v", last) + log.Error("system time jump backward", zap.Int64("last", last)) systimeErrHandler() } } diff --git a/server/tso.go b/server/tso.go index 58a0f2b0e0d..4c51cca41c9 100644 --- a/server/tso.go +++ b/server/tso.go @@ -20,8 +20,9 @@ import ( "github.com/coreos/etcd/clientv3" "github.com/pingcap/kvproto/pkg/pdpb" + log "github.com/pingcap/log" "github.com/pkg/errors" - log "github.com/sirupsen/logrus" + "go.uber.org/zap" ) const ( @@ -91,7 +92,7 @@ func (s *Server) syncTimestamp() error { // If the current system time minus the saved etcd timestamp is less than `updateTimestampGuard`, // the timestamp allocation will start from the saved etcd timestamp temporarily. if subTimeByWallClock(next, last) < updateTimestampGuard { - log.Errorf("system time may be incorrect: last: %v next %v", last, next) + log.Error("system time may be incorrect", zap.Time("last", last), zap.Time("next", next)) next = last.Add(updateTimestampGuard) } @@ -101,7 +102,7 @@ func (s *Server) syncTimestamp() error { } tsoCounter.WithLabelValues("sync_ok").Inc() - log.Infof("sync and save timestamp: last %v save %v next %v", last, save, next) + log.Info("sync and save timestamp", zap.Time("last", last), zap.Time("save", save), zap.Time("next", next)) current := &atomicObject{ physical: next, @@ -134,7 +135,7 @@ func (s *Server) updateTimestamp() error { jetLag := subTimeByWallClock(now, prev.physical) if jetLag > 3*updateTimestampStep { - log.Warnf("clock offset: %v, prev: %v, now: %v", jetLag, prev.physical, now) + log.Warn("clock offset", zap.Duration("jet-lag", jetLag), zap.Time("prev-physical", prev.physical), zap.Time("now", now)) tsoCounter.WithLabelValues("slow_save").Inc() } @@ -150,7 +151,7 @@ func (s *Server) updateTimestamp() error { } else if prevLogical > maxLogical/2 { // The reason choosing maxLogical/2 here is that it's big enough for common cases. // Because there is enough timestamp can be allocated before next update. - log.Warnf("the logical time may be not enough, prevLogical: %v", prevLogical) + log.Warn("the logical time may be not enough", zap.Int64("prev-logical", prevLogical)) next = prev.physical.Add(time.Millisecond) } else { // It will still use the previous physical time to alloc the timestamp. 
@@ -190,7 +191,7 @@ func (s *Server) getRespTS(count uint32) (pdpb.Timestamp, error) { for i := 0; i < maxRetryCount; i++ { current, ok := s.ts.Load().(*atomicObject) if !ok || current.physical == zeroTime { - log.Errorf("we haven't synced timestamp ok, wait and retry, retry count %d", i) + log.Error("we haven't synced timestamp ok, wait and retry", zap.Int("retry-count", i)) time.Sleep(200 * time.Millisecond) continue } @@ -198,7 +199,9 @@ func (s *Server) getRespTS(count uint32) (pdpb.Timestamp, error) { resp.Physical = current.physical.UnixNano() / int64(time.Millisecond) resp.Logical = atomic.AddInt64(¤t.logical, int64(count)) if resp.Logical >= maxLogical { - log.Errorf("logical part outside of max logical interval %v, please check ntp time, retry count %d", resp, i) + log.Error("logical part outside of max logical interval, please check ntp time", + zap.Reflect("response", resp), + zap.Int("retry-count", i)) tsoCounter.WithLabelValues("logical_overflow").Inc() time.Sleep(updateTimestampStep) continue diff --git a/server/version.go b/server/version.go index 6b853fb88aa..485b3c8647b 100644 --- a/server/version.go +++ b/server/version.go @@ -15,8 +15,9 @@ package server import ( "github.com/coreos/go-semver/semver" + log "github.com/pingcap/log" "github.com/pkg/errors" - log "github.com/sirupsen/logrus" + "go.uber.org/zap" ) // Feature supported features. @@ -53,7 +54,7 @@ var featuresDict = map[Feature]string{ func MinSupportedVersion(v Feature) semver.Version { target, ok := featuresDict[v] if !ok { - log.Fatalf("the corresponding version of the feature %d doesn't exist", v) + log.Fatal("the corresponding version of the feature doesn't exist", zap.Int("feature-number", int(v))) } version := MustParseVersion(target) return *version @@ -76,7 +77,7 @@ func ParseVersion(v string) (*semver.Version, error) { func MustParseVersion(v string) *semver.Version { ver, err := ParseVersion(v) if err != nil { - log.Fatalf("version string is illegal: %s", err) + log.Fatal("version string is illegal", zap.Error(err)) } return ver }
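For reference, the conversion applied throughout this patch follows a single pattern: a printf-style logrus call (`log.Infof`, `log.Warnf`, `log.Errorf`) becomes a constant message plus typed zap fields, emitted through the `github.com/pingcap/log` wrapper. Below is a minimal, self-contained sketch of that pattern; it is not part of the patch itself, and the `doSomething` helper is hypothetical, added only to produce an error value for the example.

```go
package main

import (
	"errors"
	"time"

	log "github.com/pingcap/log"
	"go.uber.org/zap"
)

// doSomething is a hypothetical helper used only to produce an error value.
func doSomething() error { return errors.New("example failure") }

func main() {
	storeID := uint64(1)
	start := time.Now()

	// Before (logrus): log.Infof("[store %d] load cost %v", storeID, time.Since(start))
	// After (pingcap/log + zap): keep the message constant, move variables into typed fields.
	log.Info("load store",
		zap.Uint64("store-id", storeID),
		zap.Duration("cost", time.Since(start)))

	if err := doSomething(); err != nil {
		// Errors are attached with zap.Error instead of %v formatting.
		log.Error("do something failed",
			zap.Uint64("store-id", storeID),
			zap.Error(err))
	}
}
```

Keeping the message constant and pushing variable data into fields is what makes the new logs machine-parseable and easy to filter, which is the point of migrating from `sirupsen/logrus` to the zap-based `pingcap/log`.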