Skip to content

Commit

Permalink
fix the conflict
Browse files Browse the repository at this point in the history
Signed-off-by: Ryan Leung <[email protected]>
  • Loading branch information
rleungx committed May 30, 2022
1 parent c9eaba5 commit da6ab99
Show file tree
Hide file tree
Showing 8 changed files with 34 additions and 821 deletions.
47 changes: 23 additions & 24 deletions server/cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -539,7 +539,7 @@ func (c *RaftCluster) HandleStoreHeartbeat(stats *pdpb.StoreStats) error {

// IsPrepared return true if the prepare checker is ready.
func (c *RaftCluster) IsPrepared() bool {
return c.coordinator.prepareChecker.isPrepared()
return c.prepareChecker.isPrepared()
}

var regionGuide = core.GenerateRegionGuideFunc(true)
Expand Down Expand Up @@ -612,13 +612,8 @@ func (c *RaftCluster) processRegionHeartbeat(region *core.RegionInfo) error {
regionEventCounter.WithLabelValues("update_cache").Inc()
}

<<<<<<< HEAD
if isNew {
c.prepareChecker.collect(region)
=======
if !c.IsPrepared() && isNew {
c.coordinator.prepareChecker.collect(region)
>>>>>>> 429b49283 (*: fix scheduling can not immediately start after transfer leader (#4875))
c.prepareChecker.collect(region)
}

if c.regionStats != nil {
Expand Down Expand Up @@ -665,16 +660,12 @@ func (c *RaftCluster) updateStoreStatusLocked(id uint64) {
c.core.UpdateStoreStatus(id, leaderCount, regionCount, pendingPeerCount, leaderRegionSize, regionSize)
}

<<<<<<< HEAD
//nolint:unused
func (c *RaftCluster) getClusterID() uint64 {
c.RLock()
defer c.RUnlock()
return c.meta.GetId()
}

=======
>>>>>>> 429b49283 (*: fix scheduling can not immediately start after transfer leader (#4875))
func (c *RaftCluster) putMetaLocked(meta *metapb.Cluster) error {
if c.storage != nil {
if err := c.storage.SaveMeta(meta); err != nil {
Expand Down Expand Up @@ -1393,13 +1384,6 @@ func (c *RaftCluster) GetComponentManager() *component.Manager {
return c.componentManager
}

// isPrepared if the cluster information is collected
func (c *RaftCluster) isPrepared() bool {
c.RLock()
defer c.RUnlock()
return c.prepareChecker.check(c)
}

// GetStoresLoads returns load stats of all stores.
func (c *RaftCluster) GetStoresLoads() map[uint64][]float64 {
c.RLock()
Expand Down Expand Up @@ -1459,10 +1443,11 @@ func (c *RaftCluster) FitRegion(region *core.RegionInfo) *placement.RegionFit {
}

type prepareChecker struct {
sync.RWMutex
reactiveRegions map[uint64]int
start time.Time
sum int
isPrepared bool
prepared bool
}

func newPrepareChecker() *prepareChecker {
Expand All @@ -1473,12 +1458,18 @@ func newPrepareChecker() *prepareChecker {
}

// Before starting up the scheduler, we need to take the proportion of the regions on each store into consideration.
func (checker *prepareChecker) check(c *RaftCluster) bool {
if checker.isPrepared || time.Since(checker.start) > collectTimeout {
func (checker *prepareChecker) check(c *core.BasicCluster) bool {
checker.Lock()
defer checker.Unlock()
if checker.prepared {
return true
}
if time.Since(checker.start) > collectTimeout {
checker.prepared = true
return true
}
// The number of active regions should be more than total region of all stores * collectFactor
if float64(c.core.GetRegionCount())*collectFactor > float64(checker.sum) {
if float64(c.GetRegionCount())*collectFactor > float64(checker.sum) {
return false
}
for _, store := range c.GetStores() {
Expand All @@ -1487,21 +1478,29 @@ func (checker *prepareChecker) check(c *RaftCluster) bool {
}
storeID := store.GetID()
// For each store, the number of active regions should be more than total region of the store * collectFactor
if float64(c.core.GetStoreRegionCount(storeID))*collectFactor > float64(checker.reactiveRegions[storeID]) {
if float64(c.GetStoreRegionCount(storeID))*collectFactor > float64(checker.reactiveRegions[storeID]) {
return false
}
}
checker.isPrepared = true
checker.prepared = true
return true
}

func (checker *prepareChecker) collect(region *core.RegionInfo) {
checker.Lock()
defer checker.Unlock()
for _, p := range region.GetPeers() {
checker.reactiveRegions[p.GetStoreId()]++
}
checker.sum++
}

func (checker *prepareChecker) isPrepared() bool {
checker.RLock()
defer checker.RUnlock()
return checker.prepared
}

// GetHotWriteRegions gets hot write regions' info.
func (c *RaftCluster) GetHotWriteRegions() *statistics.StoreHotPeersInfos {
c.RLock()
Expand Down
2 changes: 1 addition & 1 deletion server/cluster/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -572,7 +572,7 @@ func (c *coordinator) resetHotSpotMetrics() {
}

func (c *coordinator) shouldRun() bool {
return c.cluster.isPrepared()
return c.cluster.prepareChecker.check(c.cluster.core)
}

func (c *coordinator) addScheduler(scheduler schedule.Scheduler, args ...string) error {
Expand Down
12 changes: 1 addition & 11 deletions server/cluster/coordinator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,13 +189,8 @@ func (s *testCoordinatorSuite) TestBasic(c *C) {
}

func (s *testCoordinatorSuite) TestDispatch(c *C) {
tc, co, cleanup := prepare(nil, func(tc *testCluster) { tc.prepareChecker.isPrepared = true }, nil, c)
tc, co, cleanup := prepare(nil, func(tc *testCluster) { tc.prepareChecker.prepared = true }, nil, c)
defer cleanup()
<<<<<<< HEAD

=======
co.prepareChecker.prepared = true
>>>>>>> 429b49283 (*: fix scheduling can not immediately start after transfer leader (#4875))
// Transfer peer from store 4 to store 1.
c.Assert(tc.addRegionStore(4, 40), IsNil)
c.Assert(tc.addRegionStore(3, 30), IsNil)
Expand Down Expand Up @@ -287,13 +282,8 @@ func prepare(setCfg func(*config.ScheduleConfig), setTc func(*testCluster), run
if setCfg != nil {
setCfg(cfg)
}
<<<<<<< HEAD
tc := newTestCluster(opt)
hbStreams := hbstream.NewTestHeartbeatStreams(ctx, tc.getClusterID(), tc, true /* need to run */)
=======
tc := newTestCluster(ctx, opt)
hbStreams := hbstream.NewTestHeartbeatStreams(ctx, tc.meta.GetId(), tc, true /* need to run */)
>>>>>>> 429b49283 (*: fix scheduling can not immediately start after transfer leader (#4875))
if setTc != nil {
setTc(tc)
}
Expand Down
81 changes: 0 additions & 81 deletions server/cluster/prepare_checker.go

This file was deleted.

Loading

0 comments on commit da6ab99

Please sign in to comment.