Skip to content

Commit

Permalink
*: log format for server (#1431)
Browse files Browse the repository at this point in the history
* *: log format for server

Signed-off-by: nolouch <[email protected]>
  • Loading branch information
nolouch authored Feb 19, 2019
1 parent 376c933 commit 5e81548
Show file tree
Hide file tree
Showing 15 changed files with 186 additions and 102 deletions.
37 changes: 26 additions & 11 deletions server/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,13 @@ import (
"github.com/pingcap/errcode"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/kvproto/pkg/pdpb"
log "github.com/pingcap/log"
"github.com/pingcap/pd/pkg/logutil"
"github.com/pingcap/pd/server/core"
"github.com/pingcap/pd/server/namespace"
syncer "github.com/pingcap/pd/server/region_syncer"
"github.com/pkg/errors"
log "github.com/sirupsen/logrus"
"go.uber.org/zap"
)

var backgroundJobInterval = time.Minute
Expand Down Expand Up @@ -400,7 +401,9 @@ func (c *RaftCluster) putStore(store *metapb.Store) error {
// Check location labels.
for _, k := range c.cachedCluster.GetLocationLabels() {
if v := s.GetLabelValue(k); len(v) == 0 {
log.Warnf("missing location label %q in store %v", k, s)
log.Warn("missing location label",
zap.Stringer("store", s.GetMeta()),
zap.String("label-key", k))
}
}
return cluster.putStore(s)
Expand Down Expand Up @@ -430,7 +433,9 @@ func (c *RaftCluster) RemoveStore(storeID uint64) error {
}

newStore := store.Clone(core.SetStoreState(metapb.StoreState_Offline))
log.Warnf("[store %d] store %s has been Offline", newStore.GetID(), newStore.GetAddress())
log.Warn("store has been offline",
zap.Uint64("store-id", newStore.GetID()),
zap.String("store-address", newStore.GetAddress()))
return cluster.putStore(newStore)
}

Expand Down Expand Up @@ -458,11 +463,13 @@ func (c *RaftCluster) BuryStore(storeID uint64, force bool) error { // revive:di
if !force {
return errors.New("store is still up, please remove store gracefully")
}
log.Warnf("forcedly bury store %v", store)
log.Warn("forcedly bury store", zap.Stringer("store", store.GetMeta()))
}

newStore := store.Clone(core.SetStoreState(metapb.StoreState_Tombstone))
log.Warnf("[store %d] store %s has been Tombstone", newStore.GetID(), newStore.GetAddress())
log.Warn("store has been Tombstone",
zap.Uint64("store-id", newStore.GetID()),
zap.String("store-address", newStore.GetAddress()))
return cluster.putStore(newStore)
}

Expand All @@ -479,7 +486,9 @@ func (c *RaftCluster) SetStoreState(storeID uint64, state metapb.StoreState) err
}

newStore := store.Clone(core.SetStoreState(state))
log.Warnf("[store %d] set state to %v", storeID, state.String())
log.Warn("store update state",
zap.Uint64("store-id", storeID),
zap.Stringer("new-state", state))
return cluster.putStore(newStore)
}

Expand Down Expand Up @@ -526,7 +535,9 @@ func (c *RaftCluster) checkStores() {
// If the store is empty, it can be buried.
if cluster.getStoreRegionCount(offlineStore.GetId()) == 0 {
if err := c.BuryStore(offlineStore.GetId(), false); err != nil {
log.Errorf("bury store %v failed: %v", offlineStore, err)
log.Error("bury store failed",
zap.Stringer("store", offlineStore),
zap.Error(err))
}
} else {
offlineStores = append(offlineStores, offlineStore)
Expand All @@ -539,7 +550,7 @@ func (c *RaftCluster) checkStores() {

if upStoreCount < cluster.GetMaxReplicas() {
for _, offlineStore := range offlineStores {
log.Warnf("store %v may not turn into Tombstone, there are no extra up node has enough space to accommodate the extra replica", offlineStore)
log.Warn("store may not turn into Tombstone, there are no extra up node has enough space to accommodate the extra replica", zap.Stringer("store", offlineStore))
}
}
}
Expand All @@ -550,13 +561,17 @@ func (c *RaftCluster) checkOperators() {
// after region is merged, it will not heartbeat anymore
// the operator of merged region will not timeout actively
if c.cachedCluster.GetRegion(op.RegionID()) == nil {
log.Debugf("remove operator %v cause region %d is merged", op, op.RegionID())
log.Debug("remove operator cause region is merged",
zap.Uint64("region-id", op.RegionID()),
zap.Stringer("operator", op))
opController.RemoveOperator(op)
continue
}

if op.IsTimeout() {
log.Infof("[region %v] operator timeout: %s", op.RegionID(), op)
log.Info("operator timeout",
zap.Uint64("region-id", op.RegionID()),
zap.Stringer("operator", op))
opController.RemoveOperator(op)
}
}
Expand All @@ -580,7 +595,7 @@ func (c *RaftCluster) collectHealthStatus() {
client := c.s.GetClient()
members, err := GetMembers(client)
if err != nil {
log.Info("get members error:", err)
log.Error("get members error", zap.Error(err))
}
unhealth := c.s.CheckHealth(members)
for _, member := range members {
Expand Down
56 changes: 44 additions & 12 deletions server/cluster_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,11 @@ import (
"github.com/gogo/protobuf/proto"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/kvproto/pkg/pdpb"
log "github.com/pingcap/log"
"github.com/pingcap/pd/server/core"
"github.com/pingcap/pd/server/namespace"
"github.com/pingcap/pd/server/schedule"
log "github.com/sirupsen/logrus"
"go.uber.org/zap"
)

type clusterInfo struct {
Expand Down Expand Up @@ -72,13 +73,19 @@ func loadClusterInfo(id core.IDAllocator, kv *core.KV, opt *scheduleOption) (*cl
if err := kv.LoadStores(c.core.Stores); err != nil {
return nil, err
}
log.Infof("load %v stores cost %v", c.core.Stores.GetStoreCount(), time.Since(start))
log.Info("load stores",
zap.Int("count", c.core.Stores.GetStoreCount()),
zap.Duration("cost", time.Since(start)),
)

start = time.Now()
if err := kv.LoadRegions(c.core.Regions); err != nil {
return nil, err
}
log.Infof("load %v regions cost %v", c.core.Regions.GetRegionCount(), time.Since(start))
log.Info("load regions",
zap.Int("count", c.core.Regions.GetRegionCount()),
zap.Duration("cost", time.Since(start)),
)

return c, nil
}
Expand Down Expand Up @@ -107,9 +114,11 @@ func (c *clusterInfo) OnStoreVersionChange() {
c.opt.SetClusterVersion(*minVersion)
err := c.opt.persist(c.kv)
if err != nil {
log.Infof("persist cluster version meet error: %s", err)
log.Error("persist cluster version meet error", zap.Error(err))
}
log.Infof("cluster version changed from %s to %s", clusterVersion, minVersion)
log.Info("cluster version changed",
zap.Stringer("old-cluster-version", clusterVersion),
zap.Stringer("new-cluster-version", minVersion))
CheckPDVersion(c.opt)
}
}
Expand All @@ -133,7 +142,7 @@ func (c *clusterInfo) allocID() (uint64, error) {
func (c *clusterInfo) AllocPeer(storeID uint64) (*metapb.Peer, error) {
peerID, err := c.allocID()
if err != nil {
log.Errorf("failed to alloc peer: %v", err)
log.Error("failed to alloc peer", zap.Error(err))
return nil, err
}
peer := &metapb.Peer{
Expand Down Expand Up @@ -482,7 +491,10 @@ func (c *clusterInfo) handleRegionHeartbeat(region *core.RegionInfo) error {
// Mark isNew if the region in cache does not have leader.
var saveKV, saveCache, isNew bool
if origin == nil {
log.Debugf("[region %d] Insert new region {%v}", region.GetID(), core.HexRegionMeta(region.GetMeta()))
log.Debug("insert new region",
zap.Uint64("region-id", region.GetID()),
zap.Reflect("meta-region", core.HexRegionMeta(region.GetMeta())),
)
saveKV, saveCache, isNew = true, true, true
} else {
r := region.GetRegionEpoch()
Expand All @@ -492,18 +504,32 @@ func (c *clusterInfo) handleRegionHeartbeat(region *core.RegionInfo) error {
return ErrRegionIsStale(region.GetMeta(), origin.GetMeta())
}
if r.GetVersion() > o.GetVersion() {
log.Infof("[region %d] %s, Version changed from {%d} to {%d}", region.GetID(), core.DiffRegionKeyInfo(origin, region), o.GetVersion(), r.GetVersion())
log.Info("region Version changed",
zap.Uint64("region-id", region.GetID()),
zap.String("detail", core.DiffRegionKeyInfo(origin, region)),
zap.Uint64("old-version", o.GetVersion()),
zap.Uint64("new-version", r.GetVersion()),
)
saveKV, saveCache = true, true
}
if r.GetConfVer() > o.GetConfVer() {
log.Infof("[region %d] %s, ConfVer changed from {%d} to {%d}", region.GetID(), core.DiffRegionPeersInfo(origin, region), o.GetConfVer(), r.GetConfVer())
log.Info("region ConfVer changed",
zap.Uint64("region-id", region.GetID()),
zap.String("detail", core.DiffRegionPeersInfo(origin, region)),
zap.Uint64("old-confver", o.GetConfVer()),
zap.Uint64("new-confver", r.GetConfVer()),
)
saveKV, saveCache = true, true
}
if region.GetLeader().GetId() != origin.GetLeader().GetId() {
if origin.GetLeader().GetId() == 0 {
isNew = true
} else {
log.Infof("[region %d] Leader changed from store {%d} to {%d}", region.GetID(), origin.GetLeader().GetStoreId(), region.GetLeader().GetStoreId())
log.Info("leader changed",
zap.Uint64("region-id", region.GetID()),
zap.Uint64("from", origin.GetLeader().GetStoreId()),
zap.Uint64("to", region.GetLeader().GetStoreId()),
)
}
saveCache = true
}
Expand All @@ -528,7 +554,10 @@ func (c *clusterInfo) handleRegionHeartbeat(region *core.RegionInfo) error {
if err := c.kv.SaveRegion(region.GetMeta()); err != nil {
// Not successfully saved to kv is not fatal, it only leads to longer warm-up
// after restart. Here we only log the error then go on updating cache.
log.Errorf("[region %d] fail to save region %v: %v", region.GetID(), core.HexRegionMeta(region.GetMeta()), err)
log.Error("fail to save region to kv",
zap.Uint64("region-id", region.GetID()),
zap.Reflect("region-meta", core.HexRegionMeta(region.GetMeta())),
zap.Error(err))
}
select {
case c.changedRegions <- region:
Expand All @@ -550,7 +579,10 @@ func (c *clusterInfo) handleRegionHeartbeat(region *core.RegionInfo) error {
if c.kv != nil {
for _, item := range overlaps {
if err := c.kv.DeleteRegion(item); err != nil {
log.Errorf("[region %d] fail to delete region %v: %v", item.GetId(), core.HexRegionMeta(item), err)
log.Error("fail to delete region from kv",
zap.Uint64("region-id", item.GetId()),
zap.Reflect("region-meta", core.HexRegionMeta(item)),
zap.Error(err))
}
}
}
Expand Down
23 changes: 17 additions & 6 deletions server/cluster_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,10 @@ import (
"github.com/gogo/protobuf/proto"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/kvproto/pkg/pdpb"
log "github.com/pingcap/log"
"github.com/pingcap/pd/server/core"
"github.com/pkg/errors"
log "github.com/sirupsen/logrus"
"go.uber.org/zap"
)

// HandleRegionHeartbeat processes RegionInfo reports from client.
Expand All @@ -34,7 +35,7 @@ func (c *RaftCluster) HandleRegionHeartbeat(region *core.RegionInfo) error {

// If the region peer count is 0, then we should not handle this.
if len(region.GetPeers()) == 0 {
log.Warnf("invalid region, zero region peer count: %v", core.HexRegionMeta(region.GetMeta()))
log.Warn("invalid region, zero region peer count", zap.Reflect("region-meta", core.HexRegionMeta(region.GetMeta())))
return errors.Errorf("invalid region, zero region peer count: %v", core.HexRegionMeta(region.GetMeta()))
}

Expand Down Expand Up @@ -169,15 +170,20 @@ func (c *RaftCluster) handleReportSplit(request *pdpb.ReportSplitRequest) (*pdpb

err := c.checkSplitRegion(left, right)
if err != nil {
log.Warnf("report split region is invalid: %v, %v, %v", core.HexRegionMeta(left), core.HexRegionMeta(right), err)
log.Warn("report split region is invalid",
zap.Reflect("left-region", core.HexRegionMeta(left)),
zap.Reflect("right-region", core.HexRegionMeta(right)),
zap.Error(err))
return nil, err
}

// Build origin region by using left and right.
originRegion := proto.Clone(right).(*metapb.Region)
originRegion.RegionEpoch = nil
originRegion.StartKey = left.GetStartKey()
log.Infof("[region %d] region split, generate new region: %v", originRegion.GetId(), core.HexRegionMeta(left))
log.Info("region split, generate new region",
zap.Uint64("region-id", originRegion.GetId()),
zap.Reflect("region-meta", core.HexRegionMeta(left)))
return &pdpb.ReportSplitResponse{}, nil
}

Expand All @@ -190,11 +196,16 @@ func (c *RaftCluster) handleBatchReportSplit(request *pdpb.ReportBatchSplitReque

err := c.checkSplitRegions(regions)
if err != nil {
log.Warnf("report batch split region is invalid: %v, %v", hexRegionMetas, err)
log.Warn("report batch split region is invalid",
zap.Reflect("region-meta", hexRegionMetas),
zap.Error(err))
return nil, err
}
last := len(regions) - 1
originRegion := proto.Clone(regions[last]).(*metapb.Region)
log.Infof("[region %d] region split, generate %d new regions: %v", originRegion.GetId(), last, hexRegionMetas[:last])
log.Info("region batch split, generate new regions",
zap.Uint64("region-id", originRegion.GetId()),
zap.Reflect("origin", hexRegionMetas[:last]),
zap.Int("total", last))
return &pdpb.ReportBatchSplitResponse{}, nil
}
17 changes: 10 additions & 7 deletions server/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,13 @@ import (
"sync"
"time"

log "github.com/pingcap/log"
"github.com/pingcap/pd/pkg/logutil"
"github.com/pingcap/pd/server/core"
"github.com/pingcap/pd/server/namespace"
"github.com/pingcap/pd/server/schedule"
"github.com/pkg/errors"
log "github.com/sirupsen/logrus"
"go.uber.org/zap"
)

const (
Expand Down Expand Up @@ -198,16 +199,16 @@ func (c *coordinator) run() {
if schedulerCfg.Disable {
scheduleCfg.Schedulers[k] = schedulerCfg
k++
log.Info("skip create ", schedulerCfg.Type)
log.Info("skip create scheduler", zap.String("scheduler-type", schedulerCfg.Type))
continue
}
s, err := schedule.CreateScheduler(schedulerCfg.Type, c.opController, schedulerCfg.Args...)
if err != nil {
log.Fatalf("can not create scheduler %s: %v", schedulerCfg.Type, err)
log.Fatal("can not create scheduler", zap.String("scheduler-type", schedulerCfg.Type), zap.Error(err))
}
log.Infof("create scheduler %s", s.GetName())
log.Info("create scheduler", zap.String("scheduler-name", s.GetName()))
if err = c.addScheduler(s, schedulerCfg.Args...); err != nil {
log.Errorf("can not add scheduler %s: %v", s.GetName(), err)
log.Error("can not add scheduler", zap.String("scheduler-name", s.GetName()), zap.Error(err))
}

// Only records the valid scheduler config.
Expand All @@ -221,7 +222,7 @@ func (c *coordinator) run() {
scheduleCfg.Schedulers = scheduleCfg.Schedulers[:k]
c.cluster.opt.store(scheduleCfg)
if err := c.cluster.opt.persist(c.cluster.kv); err != nil {
log.Errorf("can't persist schedule config: %v", err)
log.Error("cannot persist schedule config", zap.Error(err))
}

c.wg.Add(1)
Expand Down Expand Up @@ -408,7 +409,9 @@ func (c *coordinator) runScheduler(s *scheduleController) {
}

case <-s.Ctx().Done():
log.Infof("%v stopped: %v", s.GetName(), s.Ctx().Err())
log.Info("stopped scheduler",
zap.String("scheduler-name", s.GetName()),
zap.Error(s.Ctx().Err()))
return
}
}
Expand Down
Loading

0 comments on commit 5e81548

Please sign in to comment.