Skip to content

Commit

Permalink
replication: check the up stores to switch to async (#3991)
Browse files Browse the repository at this point in the history
Signed-off-by: nolouch <[email protected]>

Co-authored-by: Ti Chi Robot <[email protected]>
  • Loading branch information
nolouch and ti-chi-bot authored Aug 16, 2021
1 parent ab0125b commit 7a2ab50
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 17 deletions.
39 changes: 24 additions & 15 deletions server/replication/replication_mode.go
Original file line number Diff line number Diff line change
Expand Up @@ -369,21 +369,22 @@ func (m *ModeManager) tickDR() {

drTickCounter.Inc()

totalPrimary, totalDr := m.config.DRAutoSync.PrimaryReplicas, m.config.DRAutoSync.DRReplicas
downPrimary, downDr := m.checkStoreStatus()
totalPrimaryPeers, totalDrPeers := m.config.DRAutoSync.PrimaryReplicas, m.config.DRAutoSync.DRReplicas
downPrimaryStores, downDrStores, upPrimayStores, upDrStores := m.checkStoreStatus()

// canSync is true when every region has at least 1 replica in each DC.
canSync := downPrimary < totalPrimary && downDr < totalDr
canSync := downPrimaryStores < totalPrimaryPeers && downDrStores < totalDrPeers &&
upPrimayStores > 0 && upDrStores > 0

// hasMajority is true when every region has a majority of its peers online.
var upPeers int
if downPrimary < totalPrimary {
upPeers += totalPrimary - downPrimary
if downPrimaryStores < totalPrimaryPeers {
upPeers += totalPrimaryPeers - downPrimaryStores
}
if downDr < totalDr {
upPeers += totalDr - downDr
if downDrStores < totalDrPeers {
upPeers += totalDrPeers - downDrStores
}
hasMajority := upPeers*2 > totalPrimary+totalDr
hasMajority := upPeers*2 > totalPrimaryPeers+totalDrPeers

// If hasMajority is false, the cluster is unavailable regardless of the replication mode, so switching to async won't help.
if !canSync && hasMajority && m.drGetState() != drStateAsync && m.drCheckAsyncTimeout() {
Expand All @@ -407,17 +408,25 @@ func (m *ModeManager) tickDR() {
}
}

func (m *ModeManager) checkStoreStatus() (primaryFailCount, drFailCount int) {
func (m *ModeManager) checkStoreStatus() (primaryDownCount, drDownCount, primaryUpCount, drUpCount int) {
m.RLock()
defer m.RUnlock()
for _, s := range m.cluster.GetStores() {
if !s.IsTombstone() && s.DownTime() >= m.config.DRAutoSync.WaitStoreTimeout.Duration {
labelValue := s.GetLabelValue(m.config.DRAutoSync.LabelKey)
if labelValue == m.config.DRAutoSync.Primary {
primaryFailCount++
down := !s.IsTombstone() && s.DownTime() >= m.config.DRAutoSync.WaitStoreTimeout.Duration
labelValue := s.GetLabelValue(m.config.DRAutoSync.LabelKey)
if labelValue == m.config.DRAutoSync.Primary {
if down {
primaryDownCount++
} else {
primaryUpCount++
}
if labelValue == m.config.DRAutoSync.DR {
drFailCount++

}
if labelValue == m.config.DRAutoSync.DR {
if down {
drDownCount++
} else {
drUpCount++
}
}
}
Expand Down
17 changes: 15 additions & 2 deletions server/replication/replication_mode_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,8 +166,6 @@ func (s *testReplicationMode) TestStateSwitch(c *C) {
cluster.AddLabelsStore(1, 1, map[string]string{"zone": "zone1"})
cluster.AddLabelsStore(2, 1, map[string]string{"zone": "zone1"})
cluster.AddLabelsStore(3, 1, map[string]string{"zone": "zone1"})
cluster.AddLabelsStore(4, 1, map[string]string{"zone": "zone2"})
cluster.AddLabelsStore(5, 1, map[string]string{"zone": "zone2"})

// initial state is sync
c.Assert(rep.drGetState(), Equals, drStateSync)
Expand All @@ -178,6 +176,21 @@ func (s *testReplicationMode) TestStateSwitch(c *C) {
stateID = rep.drAutoSync.StateID
}

// only one zone, sync -> async
rep.tickDR()
c.Assert(rep.drGetState(), Equals, drStateAsync)
assertStateIDUpdate()

// add new stores in the DR zone.
cluster.AddLabelsStore(4, 1, map[string]string{"zone": "zone2"})
cluster.AddLabelsStore(5, 1, map[string]string{"zone": "zone2"})
// async -> sync
rep.tickDR()
c.Assert(rep.drGetState(), Equals, drStateSyncRecover)
rep.drSwitchToSync()
c.Assert(rep.drGetState(), Equals, drStateSync)
assertStateIDUpdate()

// sync -> async
rep.tickDR()
c.Assert(rep.drGetState(), Equals, drStateSync)
Expand Down

0 comments on commit 7a2ab50

Please sign in to comment.