Skip to content

Commit

Permalink
cherry pick tikv#2853 to release-4.0
Browse files Browse the repository at this point in the history
Signed-off-by: ti-srebot <[email protected]>
  • Loading branch information
rleungx committed Sep 7, 2020
1 parent 30e7167 commit 6756173
Show file tree
Hide file tree
Showing 31 changed files with 563 additions and 129 deletions.
14 changes: 7 additions & 7 deletions client/base_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ func (c *baseClient) leaderLoop() {
}

if err := c.updateLeader(); err != nil {
log.Error("[pd] failed updateLeader", errs.ZapError(errs.ErrUpdateLeader, err))
log.Error("[pd] failed updateLeader", errs.ZapError(err))
}
}
}
Expand Down Expand Up @@ -178,7 +178,7 @@ func (c *baseClient) initClusterID() error {
members, err := c.getMembers(timeoutCtx, u)
timeoutCancel()
if err != nil || members.GetHeader() == nil {
log.Warn("[pd] failed to get cluster id", zap.String("url", u), errs.ZapError(errs.ErrGetClusterID, err))
log.Warn("[pd] failed to get cluster id", zap.String("url", u), errs.ZapError(err))
continue
}
c.clusterID = members.GetHeader().GetClusterId()
Expand All @@ -192,7 +192,7 @@ func (c *baseClient) updateLeader() error {
ctx, cancel := context.WithTimeout(c.ctx, updateLeaderTimeout)
members, err := c.getMembers(ctx, u)
if err != nil {
log.Warn("[pd] cannot update leader", zap.String("address", u), errs.ZapError(errs.ErrUpdateLeader, err))
log.Warn("[pd] cannot update leader", zap.String("address", u), errs.ZapError(err))
}
cancel()
if err != nil || members.GetLeader() == nil || len(members.GetLeader().GetClientUrls()) == 0 {
Expand All @@ -206,7 +206,7 @@ func (c *baseClient) updateLeader() error {
c.updateURLs(members.GetMembers())
return c.switchLeader(members.GetLeader().GetClientUrls())
}
return errors.Errorf("failed to get leader from %v", c.urls)
return errs.ErrClientGetLeader.FastGenByArgs(c.urls)
}

func (c *baseClient) getMembers(ctx context.Context, url string) (*pdpb.GetMembersResponse, error) {
Expand All @@ -217,7 +217,7 @@ func (c *baseClient) getMembers(ctx context.Context, url string) (*pdpb.GetMembe
members, err := pdpb.NewPDClient(cc).GetMembers(ctx, &pdpb.GetMembersRequest{})
if err != nil {
attachErr := errors.Errorf("error:%s target:%s status:%s", err, cc.Target(), cc.GetState().String())
return nil, errors.WithStack(attachErr)
return nil, errs.ErrClientGetMember.Wrap(attachErr).GenWithStackByCause()
}
return members, nil
}
Expand Down Expand Up @@ -274,13 +274,13 @@ func (c *baseClient) getOrCreateGRPCConn(addr string) (*grpc.ClientConn, error)
KeyPath: c.security.KeyPath,
}.ToTLSConfig()
if err != nil {
return nil, errors.WithStack(err)
return nil, err
}
dctx, cancel := context.WithTimeout(c.ctx, dialTimeout)
defer cancel()
cc, err := grpcutil.GetClientConn(dctx, addr, tlsCfg, c.gRPCDialOptions...)
if err != nil {
return nil, errors.WithStack(err)
return nil, err
}
c.connMu.Lock()
defer c.connMu.Unlock()
Expand Down
6 changes: 3 additions & 3 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ func (c *client) tsCancelLoop() {
case d := <-c.tsDeadlineCh:
select {
case <-d.timer:
log.Error("tso request is canceled due to timeout")
log.Error("tso request is canceled due to timeout", errs.ZapError(errs.ErrClientGetTSOTimeout))
d.cancel()
case <-d.done:
case <-ctx.Done():
Expand Down Expand Up @@ -233,7 +233,7 @@ func (c *client) tsLoop() {
return
default:
}
log.Error("[pd] create tso stream error", errs.ZapError(errs.ErrCreateTSOStream, err))
log.Error("[pd] create tso stream error", errs.ZapError(errs.ErrClientCreateTSOStream, err))
c.ScheduleCheckLeader()
cancel()
c.revokeTSORequest(errors.WithStack(err))
Expand Down Expand Up @@ -281,7 +281,7 @@ func (c *client) tsLoop() {
return
default:
}
log.Error("[pd] getTS error", errs.ZapError(errs.ErrGetTSO, err))
log.Error("[pd] getTS error", errs.ZapError(errs.ErrClientGetTSO, err))
c.ScheduleCheckLeader()
cancel()
stream, cancel = nil, nil
Expand Down
4 changes: 2 additions & 2 deletions pkg/apiutil/serverapi/middleware.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,14 +145,14 @@ func (p *customReverseProxies) ServeHTTP(w http.ResponseWriter, r *http.Request)

resp, err := p.client.Do(r)
if err != nil {
log.Error("request failed", zap.Error(err))
log.Error("request failed", errs.ZapError(errs.ErrSendRequest, err))
continue
}

b, err := ioutil.ReadAll(resp.Body)
resp.Body.Close()
if err != nil {
log.Error("request failed", zap.Error(err))
log.Error("read failed", errs.ZapError(errs.ErrIORead, err))
continue
}

Expand Down
7 changes: 3 additions & 4 deletions pkg/dashboard/adapter/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (

"github.com/pingcap-incubator/tidb-dashboard/pkg/apiserver"
"github.com/pingcap/kvproto/pkg/pdpb"
"go.uber.org/zap"

"github.com/pingcap/log"
"github.com/tikv/pd/pkg/errs"
Expand Down Expand Up @@ -106,7 +105,7 @@ func (m *Manager) updateInfo() {

var err error
if m.members, err = cluster.GetMembers(m.srv.GetClient()); err != nil {
log.Warn("failed to get members", zap.Error(err))
log.Warn("failed to get members", errs.ZapError(err))
m.members = nil
return
}
Expand Down Expand Up @@ -196,7 +195,7 @@ func (m *Manager) startService() {
return
}
if err := m.service.Start(m.ctx); err != nil {
log.Error("Can not start dashboard server", errs.ZapError(errs.ErrStartDashboard, err))
log.Error("Can not start dashboard server", errs.ZapError(errs.ErrDashboardStart, err))
} else {
log.Info("Dashboard server is started")
}
Expand All @@ -207,7 +206,7 @@ func (m *Manager) stopService() {
return
}
if err := m.service.Stop(context.Background()); err != nil {
log.Error("Stop dashboard server error", errs.ZapError(errs.ErrStopDashboard, err))
log.Error("Stop dashboard server error", errs.ZapError(errs.ErrDashboardStop, err))
} else {
log.Info("Dashboard server is stopped")
}
Expand Down
123 changes: 92 additions & 31 deletions pkg/errs/errno.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,38 +15,38 @@ package errs

import "github.com/pingcap/errors"

// The internal error which is generated in PD project.
// tso errors
var (
ErrInvalidTimestamp = errors.Normalize("invalid timestamp", errors.RFCCodeText("PD:tso:ErrInvalidTimestamp"))
ErrLogicOverflow = errors.Normalize("logic part overflow", errors.RFCCodeText("PD:tso:ErrLogicOverflow"))
ErrIncorrectSystemTime = errors.Normalize("incorrect system time", errors.RFCCodeText("PD:tso:ErrIncorrectSystemTime"))
)

// adapter errors
// member errors
var (
ErrStartDashboard = errors.Normalize("start dashboard failed", errors.RFCCodeText("PD:adapter:ErrStartDashboard"))
ErrStopDashboard = errors.Normalize("stop dashboard failed", errors.RFCCodeText("PD:adapter:ErrStopDashboard"))
ErrEtcdLeaderNotFound = errors.Normalize("etcd leader not found", errors.RFCCodeText("PD:member:ErrEtcdLeaderNotFound"))
ErrMarshalLeader = errors.Normalize("marshal leader failed", errors.RFCCodeText("PD:member:ErrMarshalLeader"))
)

// member errors
// client errors
var (
ErretcdLeaderNotFound = errors.Normalize("etcd leader not found", errors.RFCCodeText("PD:member:ErretcdLeaderNotFound"))
ErrGetLeader = errors.Normalize("get leader failed", errors.RFCCodeText("PD:member:ErrGetLeader"))
ErrDeleteLeaderKey = errors.Normalize("delete leader key failed", errors.RFCCodeText("PD:member:ErrDeleteLeaderKey"))
ErrLoadLeaderPriority = errors.Normalize("load leader priority failed", errors.RFCCodeText("PD:member:ErrLoadLeaderPriority"))
ErrLoadetcdLeaderPriority = errors.Normalize("load etcd leader priority failed", errors.RFCCodeText("PD:member:ErrLoadetcdLeaderPriority"))
ErrTransferetcdLeader = errors.Normalize("transfer etcd leader failed", errors.RFCCodeText("PD:member:ErrTransferetcdLeader"))
ErrWatcherCancel = errors.Normalize("watcher canceled", errors.RFCCodeText("PD:member:ErrWatcherCancel"))
ErrMarshalLeader = errors.Normalize("marshal leader failed", errors.RFCCodeText("PD:member:ErrMarshalLeader"))
ErrClientCreateTSOStream = errors.Normalize("create TSO stream failed", errors.RFCCodeText("PD:client:ErrClientCreateTSOStream"))
ErrClientGetTSOTimeout = errors.Normalize("get TSO timeout", errors.RFCCodeText("PD:client:ErrClientGetTSOTimeout"))
ErrClientGetTSO = errors.Normalize("get TSO failed", errors.RFCCodeText("PD:client:ErrClientGetTSO"))
ErrClientGetLeader = errors.Normalize("get leader from %v error", errors.RFCCodeText("PD:client:ErrClientGetLeader"))
ErrClientGetMember = errors.Normalize("get member failed", errors.RFCCodeText("PD:client:ErrClientGetMember"))
)

// client errors
// scheduler errors
var (
ErrCloseGRPCConn = errors.Normalize("close gRPC connection failed", errors.RFCCodeText("PD:client:ErrCloseGRPCConn"))
ErrUpdateLeader = errors.Normalize("update leader failed", errors.RFCCodeText("PD:client:ErrUpdateLeader"))
ErrCreateTSOStream = errors.Normalize("create TSO stream failed", errors.RFCCodeText("PD:client:ErrCreateTSOStream"))
ErrGetTSO = errors.Normalize("get TSO failed", errors.RFCCodeText("PD:client:ErrGetTSO"))
ErrGetClusterID = errors.Normalize("get cluster ID failed", errors.RFCCodeText("PD:client:ErrGetClusterID"))
ErrGetSourceStore = errors.Normalize("failed to get the source store", errors.RFCCodeText("PD:scheduler:ErrGetSourceStore"))
ErrSchedulerExisted = errors.Normalize("scheduler existed", errors.RFCCodeText("PD:scheduler:ErrSchedulerExisted"))
ErrSchedulerNotFound = errors.Normalize("scheduler not found", errors.RFCCodeText("PD:scheduler:ErrSchedulerNotFound"))
ErrScheduleConfigNotExist = errors.Normalize("the config does not exist", errors.RFCCodeText("PD:scheduler:ErrScheduleConfigNotExist"))
ErrSchedulerConfig = errors.Normalize("wrong scheduler config %s", errors.RFCCodeText("PD:scheduler:ErrSchedulerConfig"))
ErrCacheOverflow = errors.Normalize("cache overflow", errors.RFCCodeText("PD:scheduler:ErrCacheOverflow"))
ErrInternalGrowth = errors.Normalize("unknown interval growth type error", errors.RFCCodeText("PD:scheduler:ErrInternalGrowth"))
)

// placement errors
Expand All @@ -57,29 +57,90 @@ var (
ErrBuildRuleList = errors.Normalize("build rule list failed, %s", errors.RFCCodeText("PD:placement:ErrBuildRuleList"))
)

// kv errors
// cluster errors
var (
ErrEtcdKVSave = errors.Normalize("etcd KV save failed", errors.RFCCodeText("PD:kv:ErrEtcdKVSave"))
ErrEtcdKVRemove = errors.Normalize("etcd KV remove failed", errors.RFCCodeText("PD:kv:ErrEtcdKVRemove"))
ErrPersistStore = errors.Normalize("failed to persist store", errors.RFCCodeText("PD:cluster:ErrPersistStore"))
ErrDeleteRegion = errors.Normalize("failed to delete region from storage", errors.RFCCodeText("PD:cluster:ErrDeleteRegion"))
ErrSaveRegion = errors.Normalize("failed to save region from storage", errors.RFCCodeText("PD:cluster:ErrSaveRegion"))
ErrBuryStore = errors.Normalize("failed to bury store", errors.RFCCodeText("PD:cluster:ErrBuryStore"))
ErrDeleteStore = errors.Normalize("failed to delete store", errors.RFCCodeText("PD:cluster:ErrDeleteStore"))
ErrPersistClusterVersion = errors.Normalize("persist cluster version meet error", errors.RFCCodeText("PD:cluster:ErrPersistClusterVersion"))
ErrGetMembers = errors.Normalize("get members failed", errors.RFCCodeText("PD:cluster:ErrGetMembers"))
)

// scheduler errors
// grpcutil errors
var (
ErrGetSourceStore = errors.Normalize("failed to get the source store", errors.RFCCodeText("PD:scheduler:ErrGetSourceStore"))
ErrSchedulerExisted = errors.Normalize("scheduler existed", errors.RFCCodeText("PD:scheduler:ErrSchedulerExisted"))
ErrSchedulerNotFound = errors.Normalize("scheduler not found", errors.RFCCodeText("PD:scheduler:ErrSchedulerNotFound"))
ErrScheduleConfigNotExist = errors.Normalize("the config does not exist", errors.RFCCodeText("PD:scheduler:ErrScheduleConfigNotExist"))
ErrSchedulerConfig = errors.Normalize("wrong scheduler config %s", errors.RFCCodeText("PD:scheduler:ErrSchedulerConfig"))
ErrCacheOverflow = errors.Normalize("cache overflow", errors.RFCCodeText("PD:scheduler:ErrCacheOverflow"))
ErrInternalGrowth = errors.Normalize("unknown interval growth type error", errors.RFCCodeText("PD:scheduler:ErrInternalGrowth"))
ErrSecurityConfig = errors.Normalize("security config error: %s", errors.RFCCodeText("PD:grpcutil:ErrSecurityConfig"))
)

// The third-party project error.
// url errors
var (
ErrURLParse = errors.Normalize("parse url error", errors.RFCCodeText("PD:url:ErrURLParse"))
ErrQueryUnescape = errors.Normalize("inverse transformation of QueryEscape error", errors.RFCCodeText("PD:url:ErrQueryUnescape"))
)

// grpc errors
var (
ErrGRPCDial = errors.Normalize("dial error", errors.RFCCodeText("PD:grpc:ErrGRPCDial"))
ErrCloseGRPCConn = errors.Normalize("close gRPC connection failed", errors.RFCCodeText("PD:grpc:ErrCloseGRPCConn"))
)

// proto errors
var (
ErrProtoUnmarshal = errors.Normalize("failed to unmarshal proto", errors.RFCCodeText("PD:proto:ErrProtoUnmarshal"))
)

// etcd errors
var (
ErrEtcdTxn = errors.Normalize("etcd Txn failed", errors.RFCCodeText("PD:etcd:ErrEtcdTxn"))
ErrEtcdKVPut = errors.Normalize("etcd KV put failed", errors.RFCCodeText("PD:etcd:ErrEtcdKVPut"))
ErrEtcdKVDelete = errors.Normalize("etcd KV delete failed", errors.RFCCodeText("PD:etcd:ErrEtcdKVDelete"))
ErrEtcdKVGet = errors.Normalize("etcd KV get failed", errors.RFCCodeText("PD:etcd:ErrEtcdKVGet"))
ErrEtcdKVGetResponse = errors.Normalize("etcd invalid get value response %v, must only one", errors.RFCCodeText("PD:etcd:ErrEtcdKVGetResponse"))
ErrEtcdGetCluster = errors.Normalize("etcd get cluster from remote peer failed", errors.RFCCodeText("PD:etcd:ErrEtcdGetCluster"))
ErrEtcdMoveLeader = errors.Normalize("etcd move leader error", errors.RFCCodeText("PD:etcd:ErrEtcdMoveLeader"))
ErrEtcdTLSConfig = errors.Normalize("etcd TLS config error", errors.RFCCodeText("PD:etcd:ErrEtcdTLSConfig"))
ErrEtcdGetProtoMsgWithModRev = errors.Normalize("etcd get proto message with mod rev error", errors.RFCCodeText("PD:etcd:ErrEtcdGetProtoMsgWithModRev"))
ErrEtcdWatcherCancel = errors.Normalize("watcher canceled", errors.RFCCodeText("PD:etcd:ErrEtcdWatcherCancel"))
ErrCloseEtcdClient = errors.Normalize("close etcd client failed", errors.RFCCodeText("PD:etcd:ErrCloseEtcdClient"))
)

// dashboard errors
var (
ErrDashboardStart = errors.Normalize("start dashboard failed", errors.RFCCodeText("PD:dashboard:ErrDashboardStart"))
ErrDashboardStop = errors.Normalize("stop dashboard failed", errors.RFCCodeText("PD:dashboard:ErrDashboardStop"))
)

// strconv errors
var (
ErrStrconvParseInt = errors.Normalize("parse int error", errors.RFCCodeText("PD:strconv:ErrStrconvParseInt"))
ErrStrconvParseUint = errors.Normalize("parse uint error", errors.RFCCodeText("PD:strconv:ErrStrconvParseUint"))
)

// url errors
// prometheus errors
var (
ErrQueryUnescape = errors.Normalize("inverse transformation of QueryEscape error", errors.RFCCodeText("PD:url:ErrQueryUnescape"))
ErrPrometheusPushMetrics = errors.Normalize("push metrics to gateway failed", errors.RFCCodeText("PD:prometheus:ErrPrometheusPushMetrics"))
)

// http errors
var (
ErrSendRequest = errors.Normalize("send HTTP request failed", errors.RFCCodeText("PD:http:ErrSendRequest"))
ErrWriteHTTPBody = errors.Normalize("write HTTP body failed", errors.RFCCodeText("PD:http:ErrWriteHTTPBody"))
ErrNewHTTPRequest = errors.Normalize("new HTTP request failed", errors.RFCCodeText("PD:http:ErrNewHTTPRequest"))
)

// ioutil error
var (
ErrIORead = errors.Normalize("IO read error", errors.RFCCodeText("PD:ioutil:ErrIORead"))
)

// netstat error
var (
ErrNetstatTCPSocks = errors.Normalize("TCP socks error", errors.RFCCodeText("PD:netstat:ErrNetstatTCPSocks"))
)

// hex error
var (
ErrHexDecodingString = errors.Normalize("decode string %s error", errors.RFCCodeText("PD:hex:ErrHexDecodingString"))
)
3 changes: 3 additions & 0 deletions pkg/errs/errs.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@ import (

// ZapError is used to make the log output eaiser.
func ZapError(err error, causeError ...error) zap.Field {
if err == nil {
return zap.Skip()
}
if e, ok := err.(*errors.Error); ok {
if len(causeError) >= 1 {
err = e.Wrap(causeError[0]).FastGenWithCause()
Expand Down
16 changes: 16 additions & 0 deletions pkg/errs/errs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package errs
import (
"bytes"
"fmt"
"strconv"
"strings"
"testing"

Expand Down Expand Up @@ -127,3 +128,18 @@ func (s *testErrorSuite) TestZapError(c *C) {
log.Info("test", ZapError(err1))
log.Info("test", ZapError(err1, err))
}

func (s *testErrorSuite) TestErrorWithStack(c *C) {
conf := &log.Config{Level: "debug", File: log.FileLogConfig{}, DisableTimestamp: true}
lg := newZapTestLogger(conf)
log.ReplaceGlobals(lg.Logger, nil)

_, err := strconv.ParseUint("-42", 10, 64)
log.Error("test", ZapError(ErrStrconvParseInt.Wrap(err).GenWithStackByCause()))
m1 := lg.Message()
log.Error("test", zap.Error(errors.WithStack(err)))
m2 := lg.Message()
// This test is based on line number and the first log is in line 141, the second is in line 142.
// So they have the same length stack. Move this test to another place need to change the corresponding length.
c.Assert(len(m1[strings.Index(m1, "[stack="):]), Equals, len(m2[strings.Index(m2, "[stack="):]))
}
18 changes: 10 additions & 8 deletions pkg/etcdutil/etcdutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ func CheckClusterID(localClusterID types.ID, um types.URLsMap, tlsConfig *tls.Co
trp.CloseIdleConnections()
if gerr != nil {
// Do not return error, because other members may be not ready.
log.Error("failed to get cluster from remote", zap.Error(gerr))
log.Error("failed to get cluster from remote", errs.ZapError(errs.ErrEtcdGetCluster, gerr))
continue
}

Expand Down Expand Up @@ -104,14 +104,16 @@ func EtcdKVGet(c *clientv3.Client, key string, opts ...clientv3.OpOption) (*clie

start := time.Now()
resp, err := clientv3.NewKV(c).Get(ctx, key, opts...)
if err != nil {
log.Error("load from etcd meet error", zap.Error(err))
}
if cost := time.Since(start); cost > DefaultSlowRequestTime {
log.Warn("kv gets too slow", zap.String("request-key", key), zap.Duration("cost", cost), zap.Error(err))
log.Warn("kv gets too slow", zap.String("request-key", key), zap.Duration("cost", cost), errs.ZapError(err))
}

return resp, errors.WithStack(err)
if err != nil {
e := errs.ErrEtcdKVGet.Wrap(err).GenWithStackByCause()
log.Error("load from etcd meet error", zap.String("key", key), errs.ZapError(e))
return resp, e
}
return resp, nil
}

// GetValue gets value with key from etcd.
Expand All @@ -135,7 +137,7 @@ func get(c *clientv3.Client, key string, opts ...clientv3.OpOption) (*clientv3.G
if n := len(resp.Kvs); n == 0 {
return nil, nil
} else if n > 1 {
return nil, errors.Errorf("invalid get value resp %v, must only one", resp.Kvs)
return nil, errs.ErrEtcdKVGetResponse.FastGenByArgs(resp.Kvs)
}
return resp, nil
}
Expand All @@ -151,7 +153,7 @@ func GetProtoMsgWithModRev(c *clientv3.Client, key string, msg proto.Message, op
}
value := resp.Kvs[0].Value
if err = proto.Unmarshal(value, msg); err != nil {
return false, 0, errors.WithStack(err)
return false, 0, errs.ErrProtoUnmarshal.Wrap(err).GenWithStackByCause()
}
return true, resp.Kvs[0].ModRevision, nil
}
Loading

0 comments on commit 6756173

Please sign in to comment.