Skip to content

Commit

Permalink
region-syncer: full synchronization of new member (#1349)
Browse files Browse the repository at this point in the history
* region-syncer: full synchronization of new member

Signed-off-by: nolouch <[email protected]>
  • Loading branch information
nolouch authored and huachaohuang committed Dec 29, 2018
1 parent 7fac1b2 commit 38221ce
Show file tree
Hide file tree
Showing 6 changed files with 117 additions and 2 deletions.
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ require (
github.com/grpc-ecosystem/grpc-gateway v1.4.1 // indirect
github.com/inconshreveable/mousetrap v1.0.0 // indirect
github.com/jonboulle/clockwork v0.1.0 // indirect
github.com/juju/ratelimit v1.0.1
github.com/mattn/go-shellwords v1.0.3
github.com/matttproud/golang_protobuf_extensions v1.0.0 // indirect
github.com/montanaflynn/stats v0.0.0-20151014174947-eeaced052adb
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@ github.com/inconshreveable/mousetrap v1.0.0 h1:Z8tu5sraLXCXIcARxBp/8cbvlwVa7Z1NH
github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8=
github.com/jonboulle/clockwork v0.1.0 h1:VKV+ZcuP6l3yW9doeqz6ziZGgcynBVQO+obU0+0hcPo=
github.com/jonboulle/clockwork v0.1.0/go.mod h1:Ii8DK3G1RaLaWxj9trq07+26W01tbo22gdxWY5EU2bo=
github.com/juju/ratelimit v1.0.1 h1:+7AIFJVQ0EQgq/K9+0Krm7m530Du7tIz0METWzN0RgY=
github.com/juju/ratelimit v1.0.1/go.mod h1:qapgC/Gy+xNh9UxzV13HGGl/6UXNN+ct+vwSgWNm/qk=
github.com/mattn/go-shellwords v1.0.3 h1:K/VxK7SZ+cvuPgFSLKi5QPI9Vr/ipOf4C1gN+ntueUk=
github.com/mattn/go-shellwords v1.0.3/go.mod h1:3xCvwCdWdlDJUrvuMn7Wuy9eWs4pE8vqg+NOMyg4B2o=
github.com/matttproud/golang_protobuf_extensions v1.0.0 h1:YNOwxxSJzSUARoD9KRZLzM9Y858MNGCOACTvCW9TSAc=
Expand Down
32 changes: 30 additions & 2 deletions server/region_syncer/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"sync"
"time"

"github.com/juju/ratelimit"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/kvproto/pkg/pdpb"
"github.com/pingcap/pd/server/core"
Expand All @@ -30,6 +31,8 @@ import (

const (
msgSize = 8 * 1024 * 1024
defaultBucketRate = 20 * 1024 * 1024 // 20MB/s
defaultBucketCapacity = 20 * 1024 * 1024 // 20MB
maxSyncRegionBatchSize = 100
syncerKeepAliveInterval = 10 * time.Second
defaultHistoryBufferSize = 10000
Expand All @@ -54,6 +57,7 @@ type Server interface {
GetLeader() *pdpb.Member
GetStorage() *core.KV
Name() string
GetMetaRegions() []*metapb.Region
}

// RegionSyncer is used to sync the region information without raft.
Expand All @@ -66,6 +70,7 @@ type RegionSyncer struct {
closed chan struct{}
wg sync.WaitGroup
history *historyBuffer
limit *ratelimit.Bucket
}

// NewRegionSyncer returns a region syncer.
Expand All @@ -79,6 +84,7 @@ func NewRegionSyncer(s Server) *RegionSyncer {
server: s,
closed: make(chan struct{}),
history: newHistoryBuffer(defaultHistoryBufferSize, s.GetStorage().GetRegionKV()),
limit: ratelimit.NewBucketWithRate(defaultBucketRate, defaultBucketCapacity),
}
}

Expand Down Expand Up @@ -153,9 +159,31 @@ func (s *RegionSyncer) syncHistoryRegion(request *pdpb.SyncRegionRequest, stream
log.Infof("%s already in sync with %s, the last index is %d", name, s.server.Name(), startIndex)
return nil
}
// do full synchronization
if startIndex == 0 {
regions := s.server.GetMetaRegions()
lastIndex := 0
start := time.Now()
res := make([]*metapb.Region, 0, maxSyncRegionBatchSize)
for syncedIndex, r := range regions {
res = append(res, r)
if len(res) < maxSyncRegionBatchSize && syncedIndex < len(regions)-1 {
continue
}
resp := &pdpb.SyncRegionResponse{
Header: &pdpb.ResponseHeader{ClusterId: s.server.ClusterID()},
Regions: res,
StartIndex: uint64(lastIndex),
}
s.limit.Wait(int64(resp.Size()))
lastIndex += len(res)
stream.Send(resp)
res = res[:0]
}
log.Infof("%s has completed full synchronization with %s, spend %v", name, s.server.Name(), time.Since(start))
return nil
}
log.Warnf("no history regions from index %d, the leader maybe restarted", startIndex)
// TODO: Full synchronization
// if startIndex == 0 {}
return nil
}
log.Infof("sync the history regions with %s from index: %d, own last index: %d, got records length: %d",
Expand Down
9 changes: 9 additions & 0 deletions server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -690,6 +690,15 @@ func (s *Server) GetCluster() *metapb.Cluster {
}
}

// GetMetaRegions gets meta regions from cluster.
func (s *Server) GetMetaRegions() []*metapb.Region {
cluster := s.GetRaftCluster()
if cluster != nil {
return cluster.GetMetaRegions()
}
return nil
}

// GetClusterStatus gets cluster status.
func (s *Server) GetClusterStatus() (*ClusterStatus, error) {
s.cluster.Lock()
Expand Down
16 changes: 16 additions & 0 deletions tests/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,13 @@ func (s *TestServer) Destroy() error {
return nil
}

// ResignLeader resigns the leader of the server.
func (s *TestServer) ResignLeader() error {
s.Lock()
defer s.Unlock()
return s.server.ResignLeader("")
}

// State returns the current TestServer's state.
func (s *TestServer) State() int32 {
s.RLock()
Expand Down Expand Up @@ -362,6 +369,15 @@ func (c *TestCluster) WaitLeader() string {
return ""
}

// ResignLeader resigns the leader of the cluster.
func (c *TestCluster) ResignLeader() error {
leader := c.GetLeader()
if len(leader) != 0 {
return c.servers[leader].ResignLeader()
}
return errors.New("no leader")
}

// GetCluster returns PD cluster.
func (c *TestCluster) GetCluster() *metapb.Cluster {
leader := c.GetLeader()
Expand Down
59 changes: 59 additions & 0 deletions tests/server/region_syncer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package server_test

import (
"context"
"time"

. "github.com/pingcap/check"
Expand Down Expand Up @@ -76,3 +77,61 @@ func (s *serverTestSuite) TestRegionSyncer(c *C) {
loadRegions := leaderServer.GetServer().GetRaftCluster().GetRegions()
c.Assert(len(loadRegions), Equals, regionLen)
}

func (s *serverTestSuite) TestFullSyncWithAddMember(c *C) {
c.Parallel()
cluster, err := tests.NewTestCluster(1, func(conf *server.Config) { conf.PDServerCfg.UseRegionStorage = true })

c.Assert(err, IsNil)
defer cluster.Destroy()

err = cluster.RunInitialServers()
c.Assert(err, IsNil)
cluster.WaitLeader()
leaderServer := cluster.GetServer(cluster.GetLeader())
c.Assert(leaderServer.BootstrapCluster(), IsNil)
rc := leaderServer.GetServer().GetRaftCluster()
c.Assert(rc, NotNil)
regionLen := 110
id := &idAllocator{}
regions := make([]*core.RegionInfo, 0, regionLen)
for i := 0; i < regionLen; i++ {
r := &metapb.Region{
Id: id.Alloc(),
RegionEpoch: &metapb.RegionEpoch{
ConfVer: 1,
Version: 1,
},
StartKey: []byte{byte(i)},
EndKey: []byte{byte(i + 1)},
Peers: []*metapb.Peer{{Id: id.Alloc(), StoreId: uint64(0)}},
}
regions = append(regions, core.NewRegionInfo(r, r.Peers[0]))
}
for _, region := range regions {
err = rc.HandleRegionHeartbeat(region)
c.Assert(err, IsNil)
}
// ensure flush to region kv
time.Sleep(3 * time.Second)
// restart pd1
err = leaderServer.Stop()
c.Assert(err, IsNil)
err = leaderServer.Run(context.TODO())
c.Assert(err, IsNil)
c.Assert(cluster.WaitLeader(), Equals, "pd1")

// join new PD
pd2, err := cluster.Join()
c.Assert(err, IsNil)
err = pd2.Run(context.TODO())
c.Assert(err, IsNil)
c.Assert(cluster.WaitLeader(), Equals, "pd1")
// waiting for synchronization to complete
time.Sleep(3 * time.Second)
err = cluster.ResignLeader()
c.Assert(err, IsNil)
c.Assert(cluster.WaitLeader(), Equals, "pd2")
loadRegions := pd2.GetServer().GetRaftCluster().GetRegions()
c.Assert(len(loadRegions), Equals, regionLen)
}

0 comments on commit 38221ce

Please sign in to comment.