*: simplify raft cluster. (#78)
siddontang committed Apr 19, 2016
1 parent 264ba60 commit f467c7d
Showing 17 changed files with 174 additions and 149 deletions.
7 changes: 4 additions & 3 deletions pd-client/client.go
@@ -2,6 +2,7 @@ package pd

import (
"path"
"strconv"
"sync"
"time"

@@ -42,8 +43,8 @@ type client struct {
quit chan struct{}
}

func getLeaderPath(rootPath string) string {
return path.Join(rootPath, "leader")
func getLeaderPath(clusterID uint64, rootPath string) string {
return path.Join(rootPath, strconv.FormatUint(clusterID, 10), "leader")
}

// NewClient creates a PD client.
@@ -56,7 +57,7 @@ func NewClient(etcdAddrs []string, rootPath string, clusterID uint64) (Client, e
if err != nil {
return nil, errors.Trace(err)
}
leaderPath := getLeaderPath(rootPath)
leaderPath := getLeaderPath(clusterID, rootPath)
leaderAddr, revision, err := getLeader(etcdClient, leaderPath)
if err != nil {
return nil, errors.Trace(err)
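
For reference, a minimal standalone sketch (not part of the commit) of how the updated getLeaderPath composes the leader key: the cluster ID is joined in between the root path and "leader", so each cluster gets its own leader key. The root path value "/pd" below is hypothetical.

package main

import (
	"fmt"
	"path"
	"strconv"
)

// getLeaderPath mirrors the helper changed above: the cluster ID becomes a
// path component between the root path and the "leader" key.
func getLeaderPath(clusterID uint64, rootPath string) string {
	return path.Join(rootPath, strconv.FormatUint(clusterID, 10), "leader")
}

func main() {
	// With a hypothetical root path "/pd" and cluster ID 1,
	// this prints "/pd/1/leader".
	fmt.Println(getLeaderPath(1, "/pd"))
}
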
5 changes: 3 additions & 2 deletions pd-client/client_test.go
@@ -56,7 +56,7 @@ type testClientSuite struct {
}

func (s *testClientSuite) SetUpSuite(c *C) {
s.srv = newServer(c, 1234, "/pd-test")
s.srv = newServer(c, 1234, "/pd-test", clusterID)

// wait for srv to become leader
time.Sleep(time.Second * 3)
@@ -73,12 +73,13 @@ func (s *testClientSuite) TearDownSuite(c *C) {
s.srv.Close()
}

func newServer(c *C, port int, root string) *server.Server {
func newServer(c *C, port int, root string, clusterID uint64) *server.Server {
cfg := &server.Config{
Addr: fmt.Sprintf("127.0.0.1:%d", port),
EtcdAddrs: strings.Split(*testEtcd, ","),
RootPath: root,
LeaderLease: 1,
ClusterID: clusterID,
}
s, err := server.NewServer(cfg)
c.Assert(err, IsNil)
4 changes: 2 additions & 2 deletions pd-client/leader_change_test.go
@@ -12,7 +12,7 @@ var _ = Suite(&testLeaderChangeSuite{})
type testLeaderChangeSuite struct{}

func (s *testLeaderChangeSuite) TestLeaderChange(c *C) {
srv1 := newServer(c, 1235, "/pd-leader-change")
srv1 := newServer(c, 1235, "/pd-leader-change", 1)

// wait for srv1 to become leader
time.Sleep(time.Second * 3)
@@ -24,7 +24,7 @@ func (s *testLeaderChangeSuite) TestLeaderChange(c *C) {
p1, l1, err := client.GetTS()
c.Assert(err, IsNil)

srv2 := newServer(c, 1236, "/pd-leader-change")
srv2 := newServer(c, 1236, "/pd-leader-change", 1)
defer srv2.Close()

// stop srv1, srv2 will become leader
6 changes: 6 additions & 0 deletions pd-server/main.go
@@ -21,11 +21,16 @@ var (
leaderLease = flag.Int64("lease", 3, "Leader lease time (second)")
logLevel = flag.String("L", "debug", "log level: info, debug, warn, error, fatal")
pprofAddr = flag.String("pprof", ":6060", "pprof HTTP listening address")
clusterID = flag.Uint64("cluster-id", 0, "Cluster ID")
)

func main() {
flag.Parse()

if *clusterID == 0 {
log.Warn("cluster id is 0, don't use it in production")
}

log.SetLevelByString(*logLevel)

go func() {
@@ -38,6 +43,7 @@ func main() {
EtcdAddrs: strings.Split(*etcdAddrs, ","),
RootPath: *rootPath,
LeaderLease: *leaderLease,
ClusterID: *clusterID,
}

svr, err := server.NewServer(cfg)
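
With this change a cluster ID can be supplied on the command line; it defaults to 0, which only triggers a warning rather than an error. A hypothetical invocation using only the flags visible in this hunk (the etcd and root-path flag names are not shown here, so they are omitted):

	./pd-server -cluster-id=1 -L=info -lease=3 -pprof=:6060
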
134 changes: 55 additions & 79 deletions server/cluster.go
@@ -28,24 +28,28 @@ var (
)

// Raft cluster key format:
// cluster 1 -> /raft/1, value is metapb.Cluster
// cluster 2 -> /raft/2
// cluster 1 -> /1/raft, value is metapb.Cluster
// cluster 2 -> /2/raft
// For cluster 1
// store 1 -> /raft/1/s/1, value is metapb.Store
// region 1 -> /raft/1/r/1, value is encoded_region_key
// region search key map -> /raft/1/k/encoded_region_key, value is metapb.Region
// store 1 -> /1/raft/s/1, value is metapb.Store
// region 1 -> /1/raft/r/1, value is encoded_region_key
// region search key map -> /1/raft/k/encoded_region_key, value is metapb.Region
//
// Operation queue, pd can only handle operations like auto-balance, split,
// merge sequentially, and every operation will be assigned a unique incremental ID.
// pending queue -> /raft/1/j/1, /raft/1/j/2, value is operation job.
// pending queue -> /1/raft/j/1, /1/raft/j/2, value is operation job.
//
// Encode region search key:
// 1, the maximum end region key is empty, so the encode key is \xFF
// 2, other region end key is not empty, the encode key is \z end_key

type raftCluster struct {
sync.RWMutex

s *Server

running bool

clusterID uint64
clusterRoot string

@@ -69,16 +73,21 @@ type raftCluster struct {
storeConns *storeConns
}

func (s *Server) newCluster(clusterID uint64, meta metapb.Cluster) (*raftCluster, error) {
c := &raftCluster{
s: s,
clusterID: clusterID,
clusterRoot: s.getClusterRootPath(clusterID),
askJobCh: make(chan struct{}, askJobChannelSize),
quitCh: make(chan struct{}),
storeConns: newStoreConns(defaultConnFunc),
func (c *raftCluster) Start(meta metapb.Cluster) error {
c.Lock()
defer c.Unlock()

if c.running {
log.Warn("raft cluster has already been started")
return nil
}

c.running = true

c.askJobCh = make(chan struct{}, askJobChannelSize)
c.quitCh = make(chan struct{})
c.storeConns = newStoreConns(defaultConnFunc)

c.storeConns.SetIdleTimeout(idleTimeout)

// Force checking the pending job.
@@ -91,37 +100,49 @@ func (s *Server) newCluster(clusterID uint64, meta metapb.Cluster) (*raftCluster
// many stores, so it is OK to cache them all.
// And we should use these cache for later ChangePeer too.
if err := c.cacheAllStores(); err != nil {
return nil, errors.Trace(err)
return errors.Trace(err)
}

c.wg.Add(1)
go c.onJobWorker()

return c, nil
return nil
}

func (c *raftCluster) Close() {
func (c *raftCluster) Stop() {
c.Lock()
defer c.Unlock()

if !c.running {
return
}

close(c.quitCh)
c.wg.Wait()

c.storeConns.Close()

c.running = false
}

func (s *Server) getClusterRootPath(clusterID uint64) string {
return path.Join(s.cfg.RootPath, "raft", strconv.FormatUint(clusterID, 10))
func (c *raftCluster) IsRunning() bool {
c.RLock()
defer c.RUnlock()

return c.running
}

func (s *Server) getCluster(clusterID uint64) (*raftCluster, error) {
s.clusterLock.RLock()
c, ok := s.clusters[clusterID]
s.clusterLock.RUnlock()
func (s *Server) getClusterRootPath() string {
return path.Join(s.rootPath, "raft")
}

if ok {
return c, nil
func (s *Server) getRaftCluster() (*raftCluster, error) {
if s.cluster.IsRunning() {
return s.cluster, nil
}

// Find in etcd
value, err := getValue(s.client, s.getClusterRootPath(clusterID))
value, err := getValue(s.client, s.getClusterRootPath())
if err != nil {
return nil, errors.Trace(err)
}
@@ -134,36 +155,11 @@ func (s *Server) getCluster(clusterID uint64) (*raftCluster, error) {
return nil, errors.Trace(err)
}

s.clusterLock.Lock()
defer s.clusterLock.Unlock()

// check again, other goroutine may create it already.
c, ok = s.clusters[clusterID]
if ok {
return c, nil
}

if c, err = s.newCluster(clusterID, m); err != nil {
if err = s.cluster.Start(m); err != nil {
return nil, errors.Trace(err)
}

s.clusters[clusterID] = c
return c, nil
}

func (s *Server) closeClusters() {
s.clusterLock.Lock()
defer s.clusterLock.Unlock()

if len(s.clusters) == 0 {
return
}

for _, cluster := range s.clusters {
cluster.Close()
}

s.clusters = make(map[uint64]*raftCluster)
return s.cluster, nil
}

func encodeRegionSearchKey(endKey []byte) string {
Expand Down Expand Up @@ -233,8 +229,10 @@ func checkBootstrapRequest(clusterID uint64, req *pdpb.BootstrapRequest) error {
return nil
}

func (s *Server) bootstrapCluster(clusterID uint64, req *pdpb.BootstrapRequest) (*pdpb.Response, error) {
log.Infof("try to bootstrap cluster %d with %v", clusterID, req)
func (s *Server) bootstrapCluster(req *pdpb.BootstrapRequest) (*pdpb.Response, error) {
clusterID := s.cfg.ClusterID

log.Infof("try to bootstrap raft cluster %d with %v", clusterID, req)

if err := checkBootstrapRequest(clusterID, req); err != nil {
return nil, errors.Trace(err)
@@ -250,7 +248,7 @@ func (s *Server) bootstrapCluster(clusterID uint64, req *pdpb.BootstrapRequest)
if err != nil {
return nil, errors.Trace(err)
}
clusterRootPath := s.getClusterRootPath(clusterID)
clusterRootPath := s.getClusterRootPath()

var ops []clientv3.Op
ops = append(ops, clientv3.OpPut(clusterRootPath, string(clusterValue)))
@@ -294,32 +292,10 @@ func (s *Server) bootstrapCluster(clusterID uint64, req *pdpb.BootstrapRequest)

log.Infof("bootstrap cluster %d ok", clusterID)

s.clusterLock.Lock()
defer s.clusterLock.Unlock()

if _, ok := s.clusters[clusterID]; ok {
// We have bootstrapped cluster ok, and another goroutine quickly requests to
// use this cluster and we create the cluster object for it.
// But can this really happen?
log.Warnf("cluster object %d already exists", clusterID)
return &pdpb.Response{
Bootstrap: &pdpb.BootstrapResponse{},
}, nil
}

c, err := s.newCluster(clusterID, clusterMeta)
if err != nil {
if err = s.cluster.Start(clusterMeta); err != nil {
return nil, errors.Trace(err)
}

mu := &c.mu
mu.Lock()
defer mu.Unlock()

mu.stores[storeMeta.GetId()] = *storeMeta

s.clusters[clusterID] = c

return &pdpb.Response{
Bootstrap: &pdpb.BootstrapResponse{},
}, nil
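
The core of the simplification in this file: the per-server map of clusters and its clusterLock are gone, and the server holds a single raftCluster whose lifecycle is tracked by a running flag under the embedded RWMutex, with Start and Stop made idempotent. A minimal standalone sketch of that pattern, for illustration only (details beyond running, Start, Stop, and IsRunning are simplified or hypothetical):

package main

import (
	"fmt"
	"sync"
)

// cluster is a stripped-down sketch of the running-flag pattern introduced
// above: Start and Stop are idempotent and guarded by the embedded mutex.
type cluster struct {
	sync.RWMutex
	running bool
	quitCh  chan struct{}
	wg      sync.WaitGroup
}

func (c *cluster) Start() {
	c.Lock()
	defer c.Unlock()
	if c.running {
		return // already started, nothing to do
	}
	c.running = true
	c.quitCh = make(chan struct{})
	c.wg.Add(1)
	go func() {
		defer c.wg.Done()
		<-c.quitCh // stand-in for the background job worker
	}()
}

func (c *cluster) Stop() {
	c.Lock()
	defer c.Unlock()
	if !c.running {
		return
	}
	close(c.quitCh)
	c.wg.Wait()
	c.running = false
}

func (c *cluster) IsRunning() bool {
	c.RLock()
	defer c.RUnlock()
	return c.running
}

func main() {
	c := &cluster{}
	c.Start()
	c.Start() // second call is a no-op
	fmt.Println(c.IsRunning()) // true
	c.Stop()
	fmt.Println(c.IsRunning()) // false
}

Because Stop waits for the worker through the WaitGroup before clearing the flag, a later Start can safely recreate the channel and relaunch the worker.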