diff --git a/client/client.go b/client/client.go index c8ffa6b703e2..ebb6536f8611 100644 --- a/client/client.go +++ b/client/client.go @@ -59,7 +59,7 @@ type Client interface { // GetAllStores gets all stores from pd. // The store may expire later. Caller is responsible for caching and taking care // of store change. - GetAllStores(ctx context.Context) ([]*metapb.Store, error) + GetAllStores(ctx context.Context, opts ...GetStoreOption) ([]*metapb.Store, error) // Update GC safe point. TiKV will check it and do GC themselves if necessary. // If the given safePoint is less than the current one, it will not be updated. // Returns the new safePoint after updating. @@ -68,6 +68,19 @@ type Client interface { Close() } +// GetStoreOp represents available options when getting stores. +type GetStoreOp struct { + excludeTombstone bool +} + +// GetStoreOption configures GetStoreOp. +type GetStoreOption func(*GetStoreOp) + +// WithExcludeTombstone excludes tombstone stores from the result. +func WithExcludeTombstone() GetStoreOption { + return func(op *GetStoreOp) { op.excludeTombstone = true } +} + type tsoRequest struct { start time.Time ctx context.Context @@ -686,7 +699,13 @@ func (c *client) GetStore(ctx context.Context, storeID uint64) (*metapb.Store, e return store, nil } -func (c *client) GetAllStores(ctx context.Context) ([]*metapb.Store, error) { +func (c *client) GetAllStores(ctx context.Context, opts ...GetStoreOption) ([]*metapb.Store, error) { + // Applies options + options := &GetStoreOp{} + for _, opt := range opts { + opt(options) + } + if span := opentracing.SpanFromContext(ctx); span != nil { span = opentracing.StartSpan("pdclient.GetAllStores", opentracing.ChildOf(span.Context())) defer span.Finish() @@ -696,7 +715,8 @@ func (c *client) GetAllStores(ctx context.Context) ([]*metapb.Store, error) { ctx, cancel := context.WithTimeout(ctx, pdTimeout) resp, err := c.leaderClient().GetAllStores(ctx, &pdpb.GetAllStoresRequest{ - Header: c.requestHeader(), + Header: c.requestHeader(), + ExcludeTombstoneStores: options.excludeTombstone, }) cancel() diff --git a/client/client_test.go b/client/client_test.go index 6bd5dfa9117b..027c65f3fe6e 100644 --- a/client/client_test.go +++ b/client/client_test.go @@ -21,6 +21,7 @@ import ( "testing" "time" + "github.com/gogo/protobuf/proto" . "github.com/pingcap/check" "github.com/pingcap/kvproto/pkg/metapb" "github.com/pingcap/kvproto/pkg/pdpb" @@ -263,31 +264,46 @@ func (s *testClientSuite) TestGetStore(c *C) { c.Assert(err, IsNil) c.Assert(n, DeepEquals, store) - // Get a removed store should return error. + stores, err := s.client.GetAllStores(context.Background()) + c.Assert(err, IsNil) + c.Assert(stores, DeepEquals, []*metapb.Store{store}) + + // Mark the store as offline. err = cluster.RemoveStore(store.GetId()) c.Assert(err, IsNil) + offlineStore := proto.Clone(store).(*metapb.Store) + offlineStore.State = metapb.StoreState_Offline // Get an offline store should be OK. n, err = s.client.GetStore(context.Background(), store.GetId()) c.Assert(err, IsNil) - c.Assert(n.GetState(), Equals, metapb.StoreState_Offline) + c.Assert(n, DeepEquals, offlineStore) + + // Should return offline stores. + stores, err = s.client.GetAllStores(context.Background()) + c.Assert(err, IsNil) + c.Assert(stores, DeepEquals, []*metapb.Store{offlineStore}) + // Mark the store as tombstone. err = cluster.BuryStore(store.GetId(), true) c.Assert(err, IsNil) + tombstoneStore := proto.Clone(store).(*metapb.Store) + tombstoneStore.State = metapb.StoreState_Tombstone // Get a tombstone store should fail. n, err = s.client.GetStore(context.Background(), store.GetId()) c.Assert(err, IsNil) c.Assert(n, IsNil) -} -func (s *testClientSuite) TestGetAllStores(c *C) { - cluster := s.srv.GetRaftCluster() - c.Assert(cluster, NotNil) + // Should return tombstone stores. + stores, err = s.client.GetAllStores(context.Background()) + c.Assert(err, IsNil) + c.Assert(stores, DeepEquals, []*metapb.Store{tombstoneStore}) - stores, err := s.client.GetAllStores(context.Background()) + // Should not return tombstone stores. + stores, err = s.client.GetAllStores(context.Background(), WithExcludeTombstone()) c.Assert(err, IsNil) - c.Assert(stores, DeepEquals, []*metapb.Store{store}) + c.Assert(stores, IsNil) } func (s *testClientSuite) checkGCSafePoint(c *C, expectedSafePoint uint64) { diff --git a/go.mod b/go.mod index 18227c52aff9..321f9121e754 100644 --- a/go.mod +++ b/go.mod @@ -17,7 +17,6 @@ require ( github.com/eknkc/amber v0.0.0-20171010120322-cdade1c07385 // indirect github.com/ghodss/yaml v1.0.0 github.com/gogo/protobuf v1.0.0 - github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b // indirect github.com/golang/groupcache v0.0.0-20181024230925-c65c006176ff // indirect github.com/golang/protobuf v1.2.0 github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db // indirect @@ -40,7 +39,7 @@ require ( github.com/pingcap/errcode v0.0.0-20180921232412-a1a7271709d9 github.com/pingcap/errors v0.10.1 // indirect github.com/pingcap/gofail v0.0.0-20181217135706-6a951c1e42c3 - github.com/pingcap/kvproto v0.0.0-20181123124450-d48563486f61 + github.com/pingcap/kvproto v0.0.0-20190225084405-84f2c621d8e8 github.com/pingcap/log v0.0.0-20190214045112-b37da76f67a7 github.com/pkg/errors v0.8.1 github.com/prometheus/client_golang v0.8.0 @@ -59,9 +58,7 @@ require ( github.com/xiang90/probing v0.0.0-20160813154853-07dd2e8dfe18 // indirect go.uber.org/zap v1.9.1 golang.org/x/crypto v0.0.0-20180503215945-1f94bef427e3 // indirect - golang.org/x/sync v0.0.0-20181108010431-42b317875d0f // indirect golang.org/x/time v0.0.0-20180412165947-fbb02b2291d2 // indirect - google.golang.org/genproto v0.0.0-20180427144745-86e600f69ee4 // indirect google.golang.org/grpc v1.12.2 gopkg.in/airbrake/gobrake.v2 v2.0.9 // indirect gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 // indirect diff --git a/go.sum b/go.sum index 96c706e6fc5e..33d5a1ab8861 100644 --- a/go.sum +++ b/go.sum @@ -31,12 +31,14 @@ github.com/fsnotify/fsnotify v1.4.7 h1:IXs+QLmnXW2CcXuY+8Mzv/fWEsPGWxqefPtCP5CnV github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= github.com/ghodss/yaml v1.0.0 h1:wQHKEahhL6wmXdzwWG11gIVCkOv05bNOh+Rxn0yngAk= github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04= +github.com/gogo/protobuf v0.0.0-20180717141946-636bf0302bc9/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ= github.com/gogo/protobuf v1.0.0 h1:2jyBKDKU/8v3v2xVR2PtiWQviFUyiaGk2rpfyFT8rTM= github.com/gogo/protobuf v1.0.0/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ= github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b h1:VKtxabqXZkF25pY9ekfRL6a582T4P37/31XEstQ5p58= github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q= github.com/golang/groupcache v0.0.0-20181024230925-c65c006176ff h1:kOkM9whyQYodu09SJ6W3NCsHG7crFaJILQ22Gozp3lg= github.com/golang/groupcache v0.0.0-20181024230925-c65c006176ff/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc= +github.com/golang/protobuf v0.0.0-20180814211427-aa810b61a9c7/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= github.com/golang/protobuf v1.2.0 h1:P3YflyNX/ehuJFLhxviNdFxQPkGK5cDcApsge1SqnvM= github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db h1:woRePGFeVFfLKN/pOkfl+p/TAqKOfFu+7KPlMVpok/w= @@ -86,8 +88,8 @@ github.com/pingcap/errors v0.10.1 h1:fGVuPMtwNcxbzQ3aoRyyi6kxvXKMkEsceP81f3b8wsk github.com/pingcap/errors v0.10.1/go.mod h1:Oi8TUi2kEtXXLMJk9l1cGmz20kV3TaQ0usTwv5KuLY8= github.com/pingcap/gofail v0.0.0-20181217135706-6a951c1e42c3 h1:04yuCf5NMvLU8rB2m4Qs3rynH7EYpMno3lHkewIOdMo= github.com/pingcap/gofail v0.0.0-20181217135706-6a951c1e42c3/go.mod h1:DazNTg0PTldtpsQiT9I5tVJwV1onHMKBBgXzmJUlMns= -github.com/pingcap/kvproto v0.0.0-20181123124450-d48563486f61 h1:Yms1MiO/ezhE9ozwEOnlh/HrEFHX/r3fPCV6vNThGDM= -github.com/pingcap/kvproto v0.0.0-20181123124450-d48563486f61/go.mod h1:0gwbe1F2iBIjuQ9AH0DbQhL+Dpr5GofU8fgYyXk+ykk= +github.com/pingcap/kvproto v0.0.0-20190225084405-84f2c621d8e8 h1:ZoP49RWRjlmvXUWAySYZD1tV8BIVVEJ7xrbCg1B7/fw= +github.com/pingcap/kvproto v0.0.0-20190225084405-84f2c621d8e8/go.mod h1:QMdbTAXCHzzygQzqcG9uVUgU2fKeSN1GmfMiykdSzzY= github.com/pingcap/log v0.0.0-20190214045112-b37da76f67a7 h1:kOHAMalwF69bJrtWrOdVaCSvZjLucrJhP4NQKIu6uM4= github.com/pingcap/log v0.0.0-20190214045112-b37da76f67a7/go.mod h1:xsfkWVaFVV5B8e1K9seWfyJWFrIhbtUTAD8NV1Pq3+w= github.com/pkg/errors v0.8.1 h1:iURUrRGxPUNPdy5/HRSm+Yj6okJ6UtLINN0Q9M4+h3I= @@ -135,17 +137,20 @@ golang.org/x/crypto v0.0.0-20180503215945-1f94bef427e3 h1:+U/hI4i24Enhs+2BEMCN7w golang.org/x/crypto v0.0.0-20180503215945-1f94bef427e3/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= golang.org/x/net v0.0.0-20180906233101-161cd47e91fd h1:nTDtHvHSdCn1m6ITfMRqtOd/9+7a3s8RBNOZ3eYZzJA= golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= +golang.org/x/net v0.0.0-20181005035420-146acd28ed58 h1:otZG8yDCO4LVps5+9bxOeNiCvgmOyt96J3roHTYs7oE= +golang.org/x/net v0.0.0-20181005035420-146acd28ed58/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= -golang.org/x/sync v0.0.0-20181108010431-42b317875d0f h1:Bl/8QSvNqXvPGPGXa2z5xUTmV7VDcZyvRZ+QQXkXTZQ= -golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4 h1:YUO/7uOKsKeq9UokNS62b8FYywz3ker1l1vDZRCRefw= +golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e h1:o3PsSEY8E4eXWkXrIP9YJALUkVZqzHJT5DOasTyn8Vs= golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/text v0.3.0 h1:g61tztE5qeGQ89tm6NTjjM9VPIm088od1l6aSorWRWg= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/time v0.0.0-20180412165947-fbb02b2291d2 h1:+DCIGbF/swA92ohVg0//6X2IVY3KZs6p9mix0ziNYJM= golang.org/x/time v0.0.0-20180412165947-fbb02b2291d2/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= -google.golang.org/genproto v0.0.0-20180427144745-86e600f69ee4 h1:0rk3/gV3HbvCeUzVMhdxV3TEVKMVPDnayjN7sYRmcxY= -google.golang.org/genproto v0.0.0-20180427144745-86e600f69ee4/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc= +google.golang.org/genproto v0.0.0-20181004005441-af9cb2a35e7f h1:FU37niK8AQ59mHcskRyQL7H0ErSeNh650vdcj8HqdSI= +google.golang.org/genproto v0.0.0-20181004005441-af9cb2a35e7f/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc= +google.golang.org/grpc v0.0.0-20180607172857-7a6a684ca69e/go.mod h1:yo6s7OP7yaDglbqo1J04qKzAhqBH6lvTonzMVmEdcZw= google.golang.org/grpc v1.12.2 h1:FDcj+1t3wSAWho63301gD11L6ysvOl7XPJ0r/ClqNm0= google.golang.org/grpc v1.12.2/go.mod h1:yo6s7OP7yaDglbqo1J04qKzAhqBH6lvTonzMVmEdcZw= gopkg.in/airbrake/gobrake.v2 v2.0.9 h1:7z2uVWwn7oVeeugY1DtlPAy5H+KYgB1KeKTnqjNatLo= diff --git a/pkg/etcdutil/etcdutil.go b/pkg/etcdutil/etcdutil.go index 762898acc5a6..84ffc9fd3a1b 100644 --- a/pkg/etcdutil/etcdutil.go +++ b/pkg/etcdutil/etcdutil.go @@ -23,8 +23,8 @@ import ( "github.com/coreos/etcd/clientv3" "github.com/coreos/etcd/etcdserver" "github.com/coreos/etcd/pkg/types" + log "github.com/pingcap/log" "github.com/pkg/errors" - log "github.com/sirupsen/logrus" ) const ( diff --git a/pkg/logutil/log.go b/pkg/logutil/log.go index 1f7c2b33f0d4..7ffb02017a49 100644 --- a/pkg/logutil/log.go +++ b/pkg/logutil/log.go @@ -19,7 +19,6 @@ import ( "os" "path" "runtime" - "runtime/debug" "strings" "sync" @@ -28,6 +27,8 @@ import ( zaplog "github.com/pingcap/log" "github.com/pkg/errors" log "github.com/sirupsen/logrus" + "go.uber.org/zap" + "go.uber.org/zap/zapcore" "google.golang.org/grpc/grpclog" lumberjack "gopkg.in/natefinch/lumberjack.v2" ) @@ -145,6 +146,23 @@ func StringToLogLevel(level string) log.Level { return defaultLogLevel } +// StringToZapLogLevel translates log level string to log level. +func StringToZapLogLevel(level string) zapcore.Level { + switch strings.ToLower(level) { + case "fatal": + return zapcore.FatalLevel + case "error": + return zapcore.ErrorLevel + case "warn", "warning": + return zapcore.WarnLevel + case "debug": + return zapcore.DebugLevel + case "info": + return zapcore.InfoLevel + } + return zapcore.InfoLevel +} + // textFormatter is for compatibility with ngaut/log type textFormatter struct { DisableTimestamp bool @@ -273,6 +291,6 @@ func InitLogger(cfg *zaplog.Config) error { // Commonly used with a `defer`. func LogPanic() { if e := recover(); e != nil { - log.Fatalf("panic: %v, stack: %s", e, string(debug.Stack())) + zaplog.Fatal("panic", zap.Reflect("recover", e)) } } diff --git a/pkg/logutil/log_test.go b/pkg/logutil/log_test.go index 258a884227a0..7d397f5356a8 100644 --- a/pkg/logutil/log_test.go +++ b/pkg/logutil/log_test.go @@ -22,6 +22,7 @@ import ( . "github.com/pingcap/check" zaplog "github.com/pingcap/log" log "github.com/sirupsen/logrus" + "go.uber.org/zap/zapcore" ) const ( @@ -52,6 +53,16 @@ func (s *testLogSuite) TestStringToLogLevel(c *C) { c.Assert(StringToLogLevel("whatever"), Equals, log.InfoLevel) } +func (s *testLogSuite) TestStringToZapLogLevel(c *C) { + c.Assert(StringToZapLogLevel("fatal"), Equals, zapcore.FatalLevel) + c.Assert(StringToZapLogLevel("ERROR"), Equals, zapcore.ErrorLevel) + c.Assert(StringToZapLogLevel("warn"), Equals, zapcore.WarnLevel) + c.Assert(StringToZapLogLevel("warning"), Equals, zapcore.WarnLevel) + c.Assert(StringToZapLogLevel("debug"), Equals, zapcore.DebugLevel) + c.Assert(StringToZapLogLevel("info"), Equals, zapcore.InfoLevel) + c.Assert(StringToZapLogLevel("whatever"), Equals, zapcore.InfoLevel) +} + func (s *testLogSuite) TestStringToLogFormatter(c *C) { c.Assert(StringToLogFormatter("text", true), DeepEquals, &textFormatter{ DisableTimestamp: true, diff --git a/server/api/log.go b/server/api/log.go index 714909f9f183..461e330cdd6d 100644 --- a/server/api/log.go +++ b/server/api/log.go @@ -18,9 +18,9 @@ import ( "io/ioutil" "net/http" + log "github.com/pingcap/log" "github.com/pingcap/pd/pkg/logutil" "github.com/pingcap/pd/server" - log "github.com/sirupsen/logrus" "github.com/unrolled/render" ) @@ -51,7 +51,7 @@ func (h *logHandler) Handle(w http.ResponseWriter, r *http.Request) { } h.svr.SetLogLevel(level) - log.SetLevel(logutil.StringToLogLevel(level)) + log.SetLevel(logutil.StringToZapLogLevel(level)) h.rd.JSON(w, http.StatusOK, nil) } diff --git a/server/cache/ttl.go b/server/cache/ttl.go index e0dd4d325e18..afb2823f3a40 100644 --- a/server/cache/ttl.go +++ b/server/cache/ttl.go @@ -17,7 +17,8 @@ import ( "sync" "time" - log "github.com/sirupsen/logrus" + log "github.com/pingcap/log" + "go.uber.org/zap" ) type ttlCacheItem struct { @@ -124,7 +125,7 @@ func (c *TTL) doGC() { } c.Unlock() - log.Debugf("GC %d items", count) + log.Debug("TTL GC items", zap.Int("count", count)) } } diff --git a/server/config.go b/server/config.go index e59f6001cac6..2d5a6810928c 100644 --- a/server/config.go +++ b/server/config.go @@ -152,6 +152,8 @@ func NewConfig() *Config { fs.StringVar(&cfg.InitialCluster, "initial-cluster", "", "initial cluster configuration for bootstrapping, e,g. pd=http://127.0.0.1:2380") fs.StringVar(&cfg.Join, "join", "", "join to an existing cluster (usage: cluster's '${advertise-client-urls}'") + fs.StringVar(&cfg.Metric.PushAddress, "metrics-addr", "", "prometheus pushgateway address, leaves it empty will disable prometheus push.") + fs.StringVar(&cfg.Log.Level, "L", "", "log level: debug, info, warn, error, fatal (default 'info')") fs.StringVar(&cfg.Log.File.Filename, "log-file", "", "log file path") fs.BoolVar(&cfg.Log.File.LogRotate, "log-rotate", true, "rotate log") @@ -185,6 +187,8 @@ const ( // embed etcd has a check that `5 * tick > election` defaultElectionInterval = 3000 * time.Millisecond + defaultMetricsPushInterval = 15 * time.Second + defaultHeartbeatStreamRebindInterval = time.Minute defaultLeaderPriorityCheckInterval = time.Minute @@ -359,6 +363,7 @@ func (c *Config) Adjust(meta *toml.MetaData) error { adjustString(&c.AdvertiseClientUrls, c.ClientUrls) adjustString(&c.PeerUrls, defaultPeerUrls) adjustString(&c.AdvertisePeerUrls, c.PeerUrls) + adjustDuration(&c.Metric.PushInterval, defaultMetricsPushInterval) if len(c.InitialCluster) == 0 { // The advertise peer urls may be http://127.0.0.1:2380,http://127.0.0.1:2381 diff --git a/server/config_test.go b/server/config_test.go index b90a075c40bf..e06eb8005896 100644 --- a/server/config_test.go +++ b/server/config_test.go @@ -17,6 +17,7 @@ import ( "fmt" "os" "path" + "time" "github.com/BurntSushi/toml" @@ -131,4 +132,18 @@ type = "random-merge" c.Assert(err, IsNil) err = cfg.Adjust(&meta) c.Assert(err, NotNil) + + cfgData = ` +[metric] +interval = "35s" +address = "localhost:9090" +` + cfg = NewConfig() + meta, err = toml.Decode(cfgData, &cfg) + c.Assert(err, IsNil) + err = cfg.Adjust(&meta) + c.Assert(err, IsNil) + + c.Assert(cfg.Metric.PushInterval.Duration, Equals, 35*time.Second) + c.Assert(cfg.Metric.PushAddress, Equals, "localhost:9090") } diff --git a/server/coordinator_test.go b/server/coordinator_test.go index a58766acfb67..bfdedae5fdfe 100644 --- a/server/coordinator_test.go +++ b/server/coordinator_test.go @@ -509,6 +509,53 @@ func (s *testCoordinatorSuite) TestShouldRun(c *C) { c.Assert(co.cluster.prepareChecker.sum, Equals, 7) } +func (s *testCoordinatorSuite) TestShouldRunWithNonLeaderRegions(c *C) { + _, opt, err := newTestScheduleConfig() + c.Assert(err, IsNil) + tc := newTestClusterInfo(opt) + hbStreams := newHeartbeatStreams(tc.getClusterID()) + defer hbStreams.Close() + + co := newCoordinator(tc.clusterInfo, hbStreams, namespace.DefaultClassifier) + + c.Assert(tc.addLeaderStore(1, 10), IsNil) + c.Assert(tc.addLeaderStore(2, 0), IsNil) + c.Assert(tc.addLeaderStore(3, 0), IsNil) + for i := 0; i < 10; i++ { + c.Assert(tc.LoadRegion(uint64(i+1), 1, 2, 3), IsNil) + } + c.Assert(co.shouldRun(), IsFalse) + c.Assert(tc.core.Regions.GetStoreRegionCount(1), Equals, 10) + + tbl := []struct { + regionID uint64 + shouldRun bool + }{ + {1, false}, + {2, false}, + {3, false}, + {4, false}, + {5, false}, + {6, false}, + {7, false}, + {8, true}, + } + + for _, t := range tbl { + r := tc.GetRegion(t.regionID) + nr := r.Clone(core.WithLeader(r.GetPeers()[0])) + c.Assert(tc.handleRegionHeartbeat(nr), IsNil) + c.Assert(co.shouldRun(), Equals, t.shouldRun) + } + nr := &metapb.Region{Id: 8, Peers: []*metapb.Peer{}} + newRegion := core.NewRegionInfo(nr, nil) + c.Assert(tc.handleRegionHeartbeat(newRegion), NotNil) + c.Assert(co.cluster.prepareChecker.sum, Equals, 8) + + // Now, after server is prepared, there exist some regions with no leader. + c.Assert(tc.GetRegion(9).GetLeader().GetStoreId(), Equals, uint64(0)) + c.Assert(tc.GetRegion(10).GetLeader().GetStoreId(), Equals, uint64(0)) +} func (s *testCoordinatorSuite) TestAddScheduler(c *C) { cfg, opt, err := newTestScheduleConfig() diff --git a/server/core/region_kv.go b/server/core/region_kv.go index be1ad86a80a5..eed06e4ec7cb 100644 --- a/server/core/region_kv.go +++ b/server/core/region_kv.go @@ -20,8 +20,9 @@ import ( "time" "github.com/pingcap/kvproto/pkg/metapb" + log "github.com/pingcap/log" "github.com/pkg/errors" - log "github.com/sirupsen/logrus" + "go.uber.org/zap" ) var dirtyFlushTick = time.Second @@ -84,7 +85,7 @@ func (kv *RegionKV) backgroundFlush() { continue } if err = kv.FlushRegion(); err != nil { - log.Error("flush regions error: ", err) + log.Error("flush regions meet error", zap.Error(err)) } case <-kv.ctx.Done(): return @@ -176,7 +177,7 @@ func (kv *RegionKV) flush() error { func (kv *RegionKV) Close() error { err := kv.FlushRegion() if err != nil { - log.Error("meet error before close the region storage: ", err) + log.Error("meet error before close the region storage", zap.Error(err)) } kv.cancel() return kv.db.Close() diff --git a/server/core/region_tree.go b/server/core/region_tree.go index ba6ac83c2129..2fc96a1cc6ee 100644 --- a/server/core/region_tree.go +++ b/server/core/region_tree.go @@ -17,7 +17,8 @@ import ( "github.com/google/btree" "github.com/pingcap/kvproto/pkg/metapb" - log "github.com/sirupsen/logrus" + log "github.com/pingcap/log" + "go.uber.org/zap" ) var _ btree.Item = ®ionItem{} @@ -89,7 +90,10 @@ func (t *regionTree) getOverlaps(region *metapb.Region) []*metapb.Region { func (t *regionTree) update(region *metapb.Region) []*metapb.Region { overlaps := t.getOverlaps(region) for _, item := range overlaps { - log.Debugf("[region %d] delete region %v, cause overlapping with region %v", item.GetId(), HexRegionMeta(item), HexRegionMeta(region)) + log.Debug("overlapping region", + zap.Uint64("region-id", item.GetId()), + zap.Reflect("delete-region", HexRegionMeta(item)), + zap.Reflect("update-region", HexRegionMeta(region))) t.tree.Delete(®ionItem{item}) } diff --git a/server/core/store.go b/server/core/store.go index 75819bf438ac..8e7156d8624c 100644 --- a/server/core/store.go +++ b/server/core/store.go @@ -23,7 +23,8 @@ import ( "github.com/pingcap/errcode" "github.com/pingcap/kvproto/pkg/metapb" "github.com/pingcap/kvproto/pkg/pdpb" - log "github.com/sirupsen/logrus" + log "github.com/pingcap/log" + "go.uber.org/zap" ) // StoreInfo contains information about a store. @@ -529,7 +530,8 @@ func (s *StoresInfo) BlockStore(storeID uint64) errcode.ErrorCode { func (s *StoresInfo) UnblockStore(storeID uint64) { store, ok := s.stores[storeID] if !ok { - log.Fatalf("store %d is unblocked, but it is not found", storeID) + log.Fatal("store is unblocked, but it is not found", + zap.Uint64("store-id", storeID)) } s.stores[storeID] = store.Clone(SetStoreUnBlock()) } diff --git a/server/grpc_service.go b/server/grpc_service.go index fa20c14e1dff..b0e343e5398d 100644 --- a/server/grpc_service.go +++ b/server/grpc_service.go @@ -228,9 +228,21 @@ func (s *Server) GetAllStores(ctx context.Context, request *pdpb.GetAllStoresReq return &pdpb.GetAllStoresResponse{Header: s.notBootstrappedHeader()}, nil } + // Don't return tombstone stores. + var stores []*metapb.Store + if request.GetExcludeTombstoneStores() { + for _, store := range cluster.GetStores() { + if store.GetState() != metapb.StoreState_Tombstone { + stores = append(stores, store) + } + } + } else { + stores = cluster.GetStores() + } + return &pdpb.GetAllStoresResponse{ Header: s.header(), - Stores: cluster.GetStores(), + Stores: stores, }, nil } diff --git a/server/schedule/range_cluster.go b/server/schedule/range_cluster.go index cdb924141d94..e4db455c9e19 100644 --- a/server/schedule/range_cluster.go +++ b/server/schedule/range_cluster.go @@ -91,7 +91,9 @@ func (r *RangeCluster) updateStoreInfo(s *core.StoreInfo) *core.StoreInfo { // GetStore searches for a store by ID. func (r *RangeCluster) GetStore(id uint64) *core.StoreInfo { s := r.Cluster.GetStore(id) - r.updateStoreInfo(s) + if s != nil { + r.updateStoreInfo(s) + } return s } @@ -154,6 +156,8 @@ func (r *RangeCluster) GetFollowerStores(region *core.RegionInfo) []*core.StoreI // GetLeaderStore returns all stores that contains the region's leader peer. func (r *RangeCluster) GetLeaderStore(region *core.RegionInfo) *core.StoreInfo { s := r.Cluster.GetLeaderStore(region) - r.updateStoreInfo(s) + if s != nil { + r.updateStoreInfo(s) + } return s } diff --git a/server/schedulers/balance_test.go b/server/schedulers/balance_test.go index 882f03db60bf..e6846115f0d0 100644 --- a/server/schedulers/balance_test.go +++ b/server/schedulers/balance_test.go @@ -1306,3 +1306,69 @@ func (s *testScatterRangeLeaderSuite) TestBalance(c *C) { c.Check(regionCount, LessEqual, 32) } } + +func (s *testScatterRangeLeaderSuite) TestBalanceWhenRegionNotHeartbeat(c *C) { + opt := schedule.NewMockSchedulerOptions() + tc := schedule.NewMockCluster(opt) + // Add stores 1,2,3. + tc.AddRegionStore(1, 0) + tc.AddRegionStore(2, 0) + tc.AddRegionStore(3, 0) + var ( + id uint64 + regions []*metapb.Region + ) + for i := 0; i < 10; i++ { + peers := []*metapb.Peer{ + {Id: id + 1, StoreId: 1}, + {Id: id + 2, StoreId: 2}, + {Id: id + 3, StoreId: 3}, + } + regions = append(regions, &metapb.Region{ + Id: id + 4, + Peers: peers, + StartKey: []byte(fmt.Sprintf("s_%02d", i)), + EndKey: []byte(fmt.Sprintf("s_%02d", i+1)), + }) + id += 4 + } + // empty case + regions[9].EndKey = []byte("") + + // To simulate server prepared, + // store 1 contains 8 leader region peers and leaders of 2 regions are unknown yet. + for _, meta := range regions { + var leader *metapb.Peer + if meta.Id < 8 { + leader = meta.Peers[0] + } + regionInfo := core.NewRegionInfo( + meta, + leader, + core.SetApproximateKeys(96), + core.SetApproximateSize(96), + ) + + tc.Regions.SetRegion(regionInfo) + } + + for i := 1; i <= 3; i++ { + tc.UpdateStoreStatus(uint64(i)) + } + + oc := schedule.NewOperatorController(nil, nil) + hb := newScatterRangeScheduler(oc, []string{"s_00", "s_09", "t"}) + + limit := 0 + for { + if limit > 100 { + break + } + ops := hb.Schedule(tc) + if ops == nil { + limit++ + continue + } + tc.ApplyOperator(ops[0]) + } +} diff --git a/server/server.go b/server/server.go index 87871a3ef00a..8a32e5ca0623 100644 --- a/server/server.go +++ b/server/server.go @@ -779,6 +779,8 @@ func (s *Server) GetMemberLeaderPriority(id uint64) (int, error) { // SetLogLevel sets log level. func (s *Server) SetLogLevel(level string) { s.cfg.Log.Level = level + log.SetLevel(logutil.StringToZapLogLevel(level)) + log.Warn("log level changed", zap.String("level", log.GetLevel().String())) } var healthURL = "/pd/ping"