Skip to content

Commit

Permalink
mcs: fix duplicate start of RaftCluster. (#6358)
Browse files Browse the repository at this point in the history
* Using double-checked locking to avoid duplicate start of RaftCluster.

Signed-off-by: Bin Shi <[email protected]>

* Handle feedback

Signed-off-by: Bin Shi <[email protected]>

* improve locking

Signed-off-by: Bin Shi <[email protected]>

* handle feedback

Signed-off-by: Bin Shi <[email protected]>

---------

Signed-off-by: Bin Shi <[email protected]>
Co-authored-by: Ryan Leung <[email protected]>
  • Loading branch information
binshi-bing and rleungx authored Apr 24, 2023
1 parent 23611ff commit 7350cd2
Showing 1 changed file with 15 additions and 11 deletions.
26 changes: 15 additions & 11 deletions server/cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (
"strconv"
"strings"
"sync"
"sync/atomic"
"time"

"github.com/coreos/go-semver/semver"
Expand Down Expand Up @@ -134,7 +133,7 @@ type RaftCluster struct {
etcdClient *clientv3.Client
httpClient *http.Client

running atomic.Bool
running bool
meta *metapb.Cluster
storeConfigManager *config.StoreConfigManager
storage storage.Storage
Expand Down Expand Up @@ -258,14 +257,14 @@ func (c *RaftCluster) InitCluster(

// Start starts a cluster.
func (c *RaftCluster) Start(s Server) error {
if c.IsRunning() {
c.Lock()
defer c.Unlock()

if c.running {
log.Warn("raft cluster has already been started")
return nil
}

c.Lock()
defer c.Unlock()

c.InitCluster(s.GetAllocator(), s.GetPersistOptions(), s.GetStorage(), s.GetBasicCluster(), s.GetKeyspaceGroupManager())
cluster, err := c.LoadClusterInfo()
if err != nil {
Expand Down Expand Up @@ -317,7 +316,7 @@ func (c *RaftCluster) Start(s Server) error {
go c.runUpdateStoreStats()
go c.startGCTuner()

c.running.Store(true)
c.running = true
return nil
}

Expand Down Expand Up @@ -605,26 +604,31 @@ func (c *RaftCluster) runReplicationMode() {
// Stop stops the cluster.
func (c *RaftCluster) Stop() {
c.Lock()
if !c.running.CompareAndSwap(true, false) {
if !c.running {
c.Unlock()
return
}

c.running = false
c.coordinator.stop()
c.cancel()
c.Unlock()

c.wg.Wait()
log.Info("raftcluster is stopped")
}

// IsRunning return if the cluster is running.
func (c *RaftCluster) IsRunning() bool {
return c.running.Load()
c.RLock()
defer c.RUnlock()
return c.running
}

// Context returns the context of RaftCluster.
func (c *RaftCluster) Context() context.Context {
if c.running.Load() {
c.RLock()
defer c.RUnlock()
if c.running {
return c.ctx
}
return nil
Expand Down

0 comments on commit 7350cd2

Please sign in to comment.