*: simplify raft cluster. #78

Merged 5 commits on Apr 19, 2016

Changes from 2 commits
7 changes: 4 additions & 3 deletions pd-client/client.go
@@ -2,6 +2,7 @@ package pd

import (
"path"
"strconv"
"sync"
"time"

@@ -42,8 +43,8 @@ type client struct {
quit chan struct{}
}

func getLeaderPath(rootPath string) string {
return path.Join(rootPath, "leader")
func getLeaderPath(clusterID uint64, rootPath string) string {
return path.Join(rootPath, strconv.FormatUint(clusterID, 10), "leader")
}

// NewClient creates a PD client.
@@ -56,7 +57,7 @@ func NewClient(etcdAddrs []string, rootPath string, clusterID uint64) (Client, e
if err != nil {
return nil, errors.Trace(err)
}
leaderPath := getLeaderPath(rootPath)
leaderPath := getLeaderPath(clusterID, rootPath)
leaderAddr, revision, err := getLeader(etcdClient, leaderPath)
if err != nil {
return nil, errors.Trace(err)
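For context, a minimal sketch (not part of this change) of what the new client-side leader path resolves to, assuming rootPath "/pd" and clusterID 1:

```go
package main

import (
	"fmt"
	"path"
	"strconv"
)

// Mirrors the updated getLeaderPath above: the leader key is now scoped
// by cluster ID, so clients of different clusters watch different keys.
func getLeaderPath(clusterID uint64, rootPath string) string {
	return path.Join(rootPath, strconv.FormatUint(clusterID, 10), "leader")
}

func main() {
	fmt.Println(getLeaderPath(1, "/pd")) // prints "/pd/1/leader"
}
```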
5 changes: 3 additions & 2 deletions pd-client/client_test.go
@@ -56,7 +56,7 @@ type testClientSuite struct {
}

func (s *testClientSuite) SetUpSuite(c *C) {
s.srv = newServer(c, 1234, "/pd-test")
s.srv = newServer(c, 1234, "/pd-test", clusterID)

// wait for srv to become leader
time.Sleep(time.Second * 3)
@@ -73,12 +73,13 @@ func (s *testClientSuite) TearDownSuite(c *C) {
s.srv.Close()
}

func newServer(c *C, port int, root string) *server.Server {
func newServer(c *C, port int, root string, clusterID uint64) *server.Server {
cfg := &server.Config{
Addr: fmt.Sprintf("127.0.0.1:%d", port),
EtcdAddrs: strings.Split(*testEtcd, ","),
RootPath: root,
LeaderLease: 1,
ClusterID: clusterID,
}
s, err := server.NewServer(cfg)
c.Assert(err, IsNil)
4 changes: 2 additions & 2 deletions pd-client/leader_change_test.go
@@ -12,7 +12,7 @@ var _ = Suite(&testLeaderChangeSuite{})
type testLeaderChangeSuite struct{}

func (s *testLeaderChangeSuite) TestLeaderChange(c *C) {
srv1 := newServer(c, 1235, "/pd-leader-change")
srv1 := newServer(c, 1235, "/pd-leader-change", 1)

// wait for srv1 to become leader
time.Sleep(time.Second * 3)
@@ -24,7 +24,7 @@ func (s *testLeaderChangeSuite) TestLeaderChange(c *C) {
p1, l1, err := client.GetTS()
c.Assert(err, IsNil)

srv2 := newServer(c, 1236, "/pd-leader-change")
srv2 := newServer(c, 1236, "/pd-leader-change", 1)
defer srv2.Close()

// stop srv1, srv2 will become leader
2 changes: 2 additions & 0 deletions pd-server/main.go
@@ -21,6 +21,7 @@ var (
leaderLease = flag.Int64("lease", 3, "Leader lease time (second)")
logLevel = flag.String("L", "debug", "log level: info, debug, warn, error, fatal")
pprofAddr = flag.String("pprof", ":6060", "pprof HTTP listening address")
clusterID = flag.Uint64("cluster-id", 0, "Cluster ID")
Contributor: Maybe use another default clusterID, since 0 is for tests.

Contributor (author): Check for 0 and panic?

Contributor: I think making the default value 1 is enough.

Contributor (author): Hmm, 1 seems strange; I prefer 0 to 1. We can log a warning when 0 is used. Besides, in most cases we use a root path other than "/pd", so it is OK.

Contributor: Fine.

)

func main() {
@@ -38,6 +39,7 @@ func main() {
EtcdAddrs: strings.Split(*etcdAddrs, ","),
RootPath: *rootPath,
LeaderLease: *leaderLease,
ClusterID: *clusterID,
}

svr, err := server.NewServer(cfg)
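Following up on the thread above, a minimal sketch of the warning-on-zero check that was discussed. It is hypothetical, not part of this PR, and uses the standard library logger in place of the project's log package:

```go
package main

import (
	"flag"
	"log"
)

// Hypothetical flag mirroring the one added above.
var clusterID = flag.Uint64("cluster-id", 0, "Cluster ID")

func main() {
	flag.Parse()

	// Keep 0 as the default, but warn, since 0 is meant for tests only.
	if *clusterID == 0 {
		log.Println("warning: cluster-id is 0, which is intended for testing; set -cluster-id for production use")
	}
}
```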
134 changes: 55 additions & 79 deletions server/cluster.go
@@ -28,24 +28,28 @@ var (
)

// Raft cluster key format:
// cluster 1 -> /raft/1, value is metapb.Cluster
// cluster 2 -> /raft/2
// cluster 1 -> /1/raft, value is metapb.Cluster
// cluster 2 -> /1/raft
Contributor: The comment should be /2/raft?

Contributor (author): Yep.

// For cluster 1
// store 1 -> /raft/1/s/1, value is metapb.Store
// region 1 -> /raft/1/r/1, value is encoded_region_key
// region search key map -> /raft/1/k/encoded_region_key, value is metapb.Region
// store 1 -> /1/raft/s/1, value is metapb.Store
// region 1 -> /1/raft/r/1, value is encoded_region_key
// region search key map -> /1/raft/k/encoded_region_key, value is metapb.Region
//
// Operation queue, pd can only handle operations like auto-balance, split,
// merge sequentially, and every operation will be assigned a unique incremental ID.
// pending queue -> /raft/1/j/1, /raft/1/j/2, value is operation job.
// pending queue -> /1/raft/j/1, /raft/1/j/2, value is operation job.
Contributor: /1/raft/j/2

//
// Encode region search key:
// 1, the maximum end region key is empty, so the encode key is \xFF
// 2, other region end key is not empty, the encode key is \z end_key
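
To make the key layout above concrete, here is a small illustrative sketch. It is not part of this diff; the helper below is an assumption based only on the comment, and "\z" is read as a literal 'z' prefix:

```go
package main

import (
	"fmt"
	"path"
	"strconv"
)

// encodeRegionSearchKey restates the encoding rule from the comment above;
// the real implementation lives in server/cluster.go and is not shown in
// this diff.
func encodeRegionSearchKey(endKey []byte) string {
	if len(endKey) == 0 {
		return "\xFF" // the last region has an empty end key
	}
	return "z" + string(endKey)
}

func main() {
	clusterID := uint64(1)

	// Keys for cluster 1, relative to the server's configured root path
	// (omitted here), as laid out in the comment above.
	clusterRoot := path.Join("/", strconv.FormatUint(clusterID, 10), "raft")

	fmt.Println(clusterRoot)                       // /1/raft      -> metapb.Cluster
	fmt.Println(path.Join(clusterRoot, "s", "1"))  // /1/raft/s/1  -> metapb.Store
	fmt.Println(path.Join(clusterRoot, "r", "1"))  // /1/raft/r/1  -> encoded region key
	fmt.Println(path.Join(clusterRoot, "k", "zb")) // /1/raft/k/zb -> metapb.Region
	fmt.Println(path.Join(clusterRoot, "j", "1"))  // /1/raft/j/1  -> pending job

	fmt.Printf("%q %q\n", encodeRegionSearchKey(nil), encodeRegionSearchKey([]byte("b")))
}
```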

type raftCluster struct {
sync.RWMutex

s *Server

running bool

clusterID uint64
clusterRoot string

@@ -69,16 +73,21 @@ type raftCluster struct {
storeConns *storeConns
}

func (s *Server) newCluster(clusterID uint64, meta metapb.Cluster) (*raftCluster, error) {
c := &raftCluster{
s: s,
clusterID: clusterID,
clusterRoot: s.getClusterRootPath(clusterID),
askJobCh: make(chan struct{}, askJobChannelSize),
quitCh: make(chan struct{}),
storeConns: newStoreConns(defaultConnFunc),
func (c *raftCluster) Start(meta metapb.Cluster) error {
c.Lock()
defer c.Unlock()

if c.running {
log.Warn("raft cluster is already stared")
Contributor: "raft cluster has already started"

return nil
}

c.running = true

c.askJobCh = make(chan struct{}, askJobChannelSize)
c.quitCh = make(chan struct{})
c.storeConns = newStoreConns(defaultConnFunc)

c.storeConns.SetIdleTimeout(idleTimeout)

// Force checking the pending job.
@@ -91,37 +100,49 @@ func (s *Server) newCluster(clusterID uint64, meta metapb.Cluster
// many stores, so it is OK to cache them all.
// And we should use these cache for later ChangePeer too.
if err := c.cacheAllStores(); err != nil {
return nil, errors.Trace(err)
return errors.Trace(err)
}

c.wg.Add(1)
go c.onJobWorker()

return c, nil
return nil
}

func (c *raftCluster) Close() {
func (c *raftCluster) Stop() {
c.Lock()
defer c.Unlock()

if !c.running {
return
}

close(c.quitCh)
c.wg.Wait()

c.storeConns.Close()

c.running = false
}

func (s *Server) getClusterRootPath(clusterID uint64) string {
return path.Join(s.cfg.RootPath, "raft", strconv.FormatUint(clusterID, 10))
func (c *raftCluster) IsRunning() bool {
c.RLock()
defer c.RUnlock()

return c.running
}

func (s *Server) getCluster(clusterID uint64) (*raftCluster, error) {
s.clusterLock.RLock()
c, ok := s.clusters[clusterID]
s.clusterLock.RUnlock()
func (s *Server) getClusterRootPath() string {
return path.Join(s.rootPath, "raft")
}

if ok {
return c, nil
func (s *Server) getRaftCluster() (*raftCluster, error) {
if s.cluster.IsRunning() {
return s.cluster, nil
}

// Find in etcd
value, err := getValue(s.client, s.getClusterRootPath(clusterID))
value, err := getValue(s.client, s.getClusterRootPath())
if err != nil {
return nil, errors.Trace(err)
}
@@ -134,36 +155,11 @@ func (s *Server) getCluster(clusterID uint64) (*raftCluster, error) {
return nil, errors.Trace(err)
}

s.clusterLock.Lock()
defer s.clusterLock.Unlock()

// check again, other goroutine may create it already.
c, ok = s.clusters[clusterID]
if ok {
return c, nil
}

if c, err = s.newCluster(clusterID, m); err != nil {
if err = s.cluster.Start(m); err != nil {
return nil, errors.Trace(err)
}

s.clusters[clusterID] = c
return c, nil
}

func (s *Server) closeClusters() {
s.clusterLock.Lock()
defer s.clusterLock.Unlock()

if len(s.clusters) == 0 {
return
}

for _, cluster := range s.clusters {
cluster.Close()
}

s.clusters = make(map[uint64]*raftCluster)
return s.cluster, nil
}

func encodeRegionSearchKey(endKey []byte) string {
@@ -233,8 +229,10 @@ func checkBootstrapRequest(clusterID uint64, req *pdpb.BootstrapRequest) error {
return nil
}

func (s *Server) bootstrapCluster(clusterID uint64, req *pdpb.BootstrapRequest) (*pdpb.Response, error) {
log.Infof("try to bootstrap cluster %d with %v", clusterID, req)
func (s *Server) bootstrapCluster(req *pdpb.BootstrapRequest) (*pdpb.Response, error) {
clusterID := s.cfg.ClusterID

log.Infof("try to bootstrap raft cluster %d with %v", clusterID, req)

if err := checkBootstrapRequest(clusterID, req); err != nil {
return nil, errors.Trace(err)
@@ -250,7 +248,7 @@ func (s *Server) bootstrapCluster(clusterID uint64, req *pdpb.BootstrapRequest)
if err != nil {
return nil, errors.Trace(err)
}
clusterRootPath := s.getClusterRootPath(clusterID)
clusterRootPath := s.getClusterRootPath()

var ops []clientv3.Op
ops = append(ops, clientv3.OpPut(clusterRootPath, string(clusterValue)))
@@ -294,32 +292,10 @@ func (s *Server) bootstrapCluster(clusterID uint64, req *pdpb.BootstrapRequest)

log.Infof("bootstrap cluster %d ok", clusterID)

s.clusterLock.Lock()
defer s.clusterLock.Unlock()

if _, ok := s.clusters[clusterID]; ok {
// We have bootstrapped cluster ok, and another goroutine quickly requests to
// use this cluster and we create the cluster object for it.
// But can this really happen?
log.Warnf("cluster object %d already exists", clusterID)
return &pdpb.Response{
Bootstrap: &pdpb.BootstrapResponse{},
}, nil
}

c, err := s.newCluster(clusterID, clusterMeta)
if err != nil {
if err = s.cluster.Start(clusterMeta); err != nil {
return nil, errors.Trace(err)
}

mu := &c.mu
mu.Lock()
defer mu.Unlock()

mu.stores[storeMeta.GetId()] = *storeMeta

s.clusters[clusterID] = c

return &pdpb.Response{
Bootstrap: &pdpb.BootstrapResponse{},
}, nil