diff --git a/go.mod b/go.mod index 71d06ced09b..2af66030a57 100644 --- a/go.mod +++ b/go.mod @@ -29,6 +29,7 @@ require ( github.com/grpc-ecosystem/grpc-gateway v1.4.1 // indirect github.com/inconshreveable/mousetrap v1.0.0 // indirect github.com/jonboulle/clockwork v0.1.0 // indirect + github.com/juju/ratelimit v1.0.1 github.com/mattn/go-shellwords v1.0.3 github.com/matttproud/golang_protobuf_extensions v1.0.0 // indirect github.com/montanaflynn/stats v0.0.0-20151014174947-eeaced052adb diff --git a/go.sum b/go.sum index fe4b00ad5e2..531753ce32a 100644 --- a/go.sum +++ b/go.sum @@ -58,6 +58,8 @@ github.com/inconshreveable/mousetrap v1.0.0 h1:Z8tu5sraLXCXIcARxBp/8cbvlwVa7Z1NH github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8= github.com/jonboulle/clockwork v0.1.0 h1:VKV+ZcuP6l3yW9doeqz6ziZGgcynBVQO+obU0+0hcPo= github.com/jonboulle/clockwork v0.1.0/go.mod h1:Ii8DK3G1RaLaWxj9trq07+26W01tbo22gdxWY5EU2bo= +github.com/juju/ratelimit v1.0.1 h1:+7AIFJVQ0EQgq/K9+0Krm7m530Du7tIz0METWzN0RgY= +github.com/juju/ratelimit v1.0.1/go.mod h1:qapgC/Gy+xNh9UxzV13HGGl/6UXNN+ct+vwSgWNm/qk= github.com/mattn/go-shellwords v1.0.3 h1:K/VxK7SZ+cvuPgFSLKi5QPI9Vr/ipOf4C1gN+ntueUk= github.com/mattn/go-shellwords v1.0.3/go.mod h1:3xCvwCdWdlDJUrvuMn7Wuy9eWs4pE8vqg+NOMyg4B2o= github.com/matttproud/golang_protobuf_extensions v1.0.0 h1:YNOwxxSJzSUARoD9KRZLzM9Y858MNGCOACTvCW9TSAc= diff --git a/server/region_syncer/server.go b/server/region_syncer/server.go index c09bbe9c814..a9bd74f5856 100644 --- a/server/region_syncer/server.go +++ b/server/region_syncer/server.go @@ -19,6 +19,7 @@ import ( "sync" "time" + "github.com/juju/ratelimit" "github.com/pingcap/kvproto/pkg/metapb" "github.com/pingcap/kvproto/pkg/pdpb" "github.com/pingcap/pd/server/core" @@ -30,6 +31,8 @@ import ( const ( msgSize = 8 * 1024 * 1024 + defaultBucketRate = 20 * 1024 * 1024 // 20MB/s + defaultBucketCapacity = 20 * 1024 * 1024 // 20MB maxSyncRegionBatchSize = 100 syncerKeepAliveInterval = 10 * time.Second defaultHistoryBufferSize = 10000 @@ -54,6 +57,7 @@ type Server interface { GetLeader() *pdpb.Member GetStorage() *core.KV Name() string + GetMetaRegions() []*metapb.Region } // RegionSyncer is used to sync the region information without raft. @@ -66,6 +70,7 @@ type RegionSyncer struct { closed chan struct{} wg sync.WaitGroup history *historyBuffer + limit *ratelimit.Bucket } // NewRegionSyncer returns a region syncer. @@ -79,6 +84,7 @@ func NewRegionSyncer(s Server) *RegionSyncer { server: s, closed: make(chan struct{}), history: newHistoryBuffer(defaultHistoryBufferSize, s.GetStorage().GetRegionKV()), + limit: ratelimit.NewBucketWithRate(defaultBucketRate, defaultBucketCapacity), } } @@ -153,9 +159,31 @@ func (s *RegionSyncer) syncHistoryRegion(request *pdpb.SyncRegionRequest, stream log.Infof("%s already in sync with %s, the last index is %d", name, s.server.Name(), startIndex) return nil } + // do full synchronization + if startIndex == 0 { + regions := s.server.GetMetaRegions() + lastIndex := 0 + start := time.Now() + res := make([]*metapb.Region, 0, maxSyncRegionBatchSize) + for syncedIndex, r := range regions { + res = append(res, r) + if len(res) < maxSyncRegionBatchSize && syncedIndex < len(regions)-1 { + continue + } + resp := &pdpb.SyncRegionResponse{ + Header: &pdpb.ResponseHeader{ClusterId: s.server.ClusterID()}, + Regions: res, + StartIndex: uint64(lastIndex), + } + s.limit.Wait(int64(resp.Size())) + lastIndex += len(res) + stream.Send(resp) + res = res[:0] + } + log.Infof("%s has completed full synchronization with %s, spend %v", name, s.server.Name(), time.Since(start)) + return nil + } log.Warnf("no history regions from index %d, the leader maybe restarted", startIndex) - // TODO: Full synchronization - // if startIndex == 0 {} return nil } log.Infof("sync the history regions with %s from index: %d, own last index: %d, got records length: %d", diff --git a/server/server.go b/server/server.go index 245daede72f..701cba49fa2 100644 --- a/server/server.go +++ b/server/server.go @@ -690,6 +690,15 @@ func (s *Server) GetCluster() *metapb.Cluster { } } +// GetMetaRegions gets meta regions from cluster. +func (s *Server) GetMetaRegions() []*metapb.Region { + cluster := s.GetRaftCluster() + if cluster != nil { + return cluster.GetMetaRegions() + } + return nil +} + // GetClusterStatus gets cluster status. func (s *Server) GetClusterStatus() (*ClusterStatus, error) { s.cluster.Lock() diff --git a/tests/cluster.go b/tests/cluster.go index e420dfb59e0..37191c31d9e 100644 --- a/tests/cluster.go +++ b/tests/cluster.go @@ -106,6 +106,13 @@ func (s *TestServer) Destroy() error { return nil } +// ResignLeader resigns the leader of the server. +func (s *TestServer) ResignLeader() error { + s.Lock() + defer s.Unlock() + return s.server.ResignLeader("") +} + // State returns the current TestServer's state. func (s *TestServer) State() int32 { s.RLock() @@ -362,6 +369,15 @@ func (c *TestCluster) WaitLeader() string { return "" } +// ResignLeader resigns the leader of the cluster. +func (c *TestCluster) ResignLeader() error { + leader := c.GetLeader() + if len(leader) != 0 { + return c.servers[leader].ResignLeader() + } + return errors.New("no leader") +} + // GetCluster returns PD cluster. func (c *TestCluster) GetCluster() *metapb.Cluster { leader := c.GetLeader() diff --git a/tests/server/region_syncer_test.go b/tests/server/region_syncer_test.go index 7ff17fe53b3..a5ff7727b90 100644 --- a/tests/server/region_syncer_test.go +++ b/tests/server/region_syncer_test.go @@ -14,6 +14,7 @@ package server_test import ( + "context" "time" . "github.com/pingcap/check" @@ -76,3 +77,61 @@ func (s *serverTestSuite) TestRegionSyncer(c *C) { loadRegions := leaderServer.GetServer().GetRaftCluster().GetRegions() c.Assert(len(loadRegions), Equals, regionLen) } + +func (s *serverTestSuite) TestFullSyncWithAddMember(c *C) { + c.Parallel() + cluster, err := tests.NewTestCluster(1, func(conf *server.Config) { conf.PDServerCfg.UseRegionStorage = true }) + + c.Assert(err, IsNil) + defer cluster.Destroy() + + err = cluster.RunInitialServers() + c.Assert(err, IsNil) + cluster.WaitLeader() + leaderServer := cluster.GetServer(cluster.GetLeader()) + c.Assert(leaderServer.BootstrapCluster(), IsNil) + rc := leaderServer.GetServer().GetRaftCluster() + c.Assert(rc, NotNil) + regionLen := 110 + id := &idAllocator{} + regions := make([]*core.RegionInfo, 0, regionLen) + for i := 0; i < regionLen; i++ { + r := &metapb.Region{ + Id: id.Alloc(), + RegionEpoch: &metapb.RegionEpoch{ + ConfVer: 1, + Version: 1, + }, + StartKey: []byte{byte(i)}, + EndKey: []byte{byte(i + 1)}, + Peers: []*metapb.Peer{{Id: id.Alloc(), StoreId: uint64(0)}}, + } + regions = append(regions, core.NewRegionInfo(r, r.Peers[0])) + } + for _, region := range regions { + err = rc.HandleRegionHeartbeat(region) + c.Assert(err, IsNil) + } + // ensure flush to region kv + time.Sleep(3 * time.Second) + // restart pd1 + err = leaderServer.Stop() + c.Assert(err, IsNil) + err = leaderServer.Run(context.TODO()) + c.Assert(err, IsNil) + c.Assert(cluster.WaitLeader(), Equals, "pd1") + + // join new PD + pd2, err := cluster.Join() + c.Assert(err, IsNil) + err = pd2.Run(context.TODO()) + c.Assert(err, IsNil) + c.Assert(cluster.WaitLeader(), Equals, "pd1") + // waiting for synchronization to complete + time.Sleep(3 * time.Second) + err = cluster.ResignLeader() + c.Assert(err, IsNil) + c.Assert(cluster.WaitLeader(), Equals, "pd2") + loadRegions := pd2.GetServer().GetRaftCluster().GetRegions() + c.Assert(len(loadRegions), Equals, regionLen) +}