Skip to content

Commit

Permalink
This is an automated cherry-pick of tikv#7202
Browse files Browse the repository at this point in the history
close tikv#7201

Signed-off-by: ti-chi-bot <[email protected]>
  • Loading branch information
disksing authored and ti-chi-bot committed Oct 16, 2023
1 parent 77d6f5b commit 344a7a9
Show file tree
Hide file tree
Showing 2 changed files with 268 additions and 45 deletions.
115 changes: 83 additions & 32 deletions server/replication/replication_mode.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,16 @@ import (
pb "github.com/pingcap/kvproto/pkg/replication_modepb"
"github.com/pingcap/log"
"github.com/tikv/pd/pkg/errs"
<<<<<<< HEAD:server/replication/replication_mode.go
"github.com/tikv/pd/pkg/logutil"
"github.com/tikv/pd/pkg/syncutil"
=======
sche "github.com/tikv/pd/pkg/schedule/core"
"github.com/tikv/pd/pkg/schedule/placement"
"github.com/tikv/pd/pkg/storage/endpoint"
"github.com/tikv/pd/pkg/utils/logutil"
"github.com/tikv/pd/pkg/utils/syncutil"
>>>>>>> 4176c1daa (replication_mode: use placement to determin canSync and hasMajority (#7202)):pkg/replication/replication_mode.go
"github.com/tikv/pd/server/config"
"github.com/tikv/pd/server/core"
"github.com/tikv/pd/server/schedule"
Expand Down Expand Up @@ -364,37 +372,80 @@ func (m *ModeManager) Run(ctx context.Context) {
wg.Wait()
}

func storeIDs(stores []*core.StoreInfo) []uint64 {
ids := make([]uint64, len(stores))
for i, s := range stores {
ids[i] = s.GetID()
}
return ids
}

func minimalUpVoters(rule *placement.Rule, upStores, downStores []*core.StoreInfo) int {
if rule.Role == placement.Learner {
return 0
}
var up, down int
for _, s := range upStores {
if placement.MatchLabelConstraints(s, rule.LabelConstraints) {
up++
}
}
for _, s := range downStores {
if placement.MatchLabelConstraints(s, rule.LabelConstraints) {
down++
}
}
minimalUp := rule.Count - down
if minimalUp < 0 {
minimalUp = 0
}
if minimalUp > up {
minimalUp = up
}
return minimalUp
}

func (m *ModeManager) tickUpdateState() {
if m.getModeName() != modeDRAutoSync {
return
}

drTickCounter.Inc()

totalPrimaryPeers, totalDrPeers := m.config.DRAutoSync.PrimaryReplicas, m.config.DRAutoSync.DRReplicas
stores := m.checkStoreStatus()

// canSync is true when every region has at least 1 replica in each DC.
canSync := len(stores[primaryDown]) < totalPrimaryPeers && len(stores[drDown]) < totalDrPeers &&
len(stores[primaryUp]) > 0 && len(stores[drUp]) > 0
var primaryHasVoter, drHasVoter bool
var totalVoter, totalUpVoter int
for _, r := range m.cluster.GetRuleManager().GetAllRules() {
if len(r.StartKey) > 0 || len(r.EndKey) > 0 {
// All rules should be global rules. If not, skip it.
continue
}
if r.Role != placement.Learner {
totalVoter += r.Count
}
minimalUpPrimary := minimalUpVoters(r, stores[primaryUp], stores[primaryDown])
minimalUpDr := minimalUpVoters(r, stores[drUp], stores[drDown])
primaryHasVoter = primaryHasVoter || minimalUpPrimary > 0
drHasVoter = drHasVoter || minimalUpDr > 0
upVoters := minimalUpPrimary + minimalUpDr
if upVoters > r.Count {
upVoters = r.Count
}
totalUpVoter += upVoters
}

// canSync is true when every region has at least 1 voter replica in each DC.
// hasMajority is true when every region has majority peer online.
var upPeers int
if len(stores[primaryDown]) < totalPrimaryPeers {
upPeers += totalPrimaryPeers - len(stores[primaryDown])
}
if len(stores[drDown]) < totalDrPeers {
upPeers += totalDrPeers - len(stores[drDown])
}
hasMajority := upPeers*2 > totalPrimaryPeers+totalDrPeers
canSync := primaryHasVoter && drHasVoter
hasMajority := totalUpVoter*2 > totalVoter

log.Debug("replication store status",
zap.Uint64s("up-primary", stores[primaryUp]),
zap.Uint64s("up-dr", stores[drUp]),
zap.Uint64s("down-primary", stores[primaryDown]),
zap.Uint64s("down-dr", stores[drDown]),
zap.Uint64s("up-primary", storeIDs(stores[primaryUp])),
zap.Uint64s("up-dr", storeIDs(stores[drUp])),
zap.Uint64s("down-primary", storeIDs(stores[primaryDown])),
zap.Uint64s("down-dr", storeIDs(stores[drDown])),
zap.Bool("can-sync", canSync),
zap.Int("up-peers", upPeers),
zap.Bool("has-majority", hasMajority),
)

Expand All @@ -420,31 +471,31 @@ func (m *ModeManager) tickUpdateState() {
case drStateSync:
// If hasMajority is false, the cluster is always unavailable. Switch to async won't help.
if !canSync && hasMajority {
m.drSwitchToAsyncWait(stores[primaryUp])
m.drSwitchToAsyncWait(storeIDs(stores[primaryUp]))
}
case drStateAsyncWait:
if canSync {
m.drSwitchToSync()
break
}
if oldAvailableStores := m.drGetAvailableStores(); !reflect.DeepEqual(oldAvailableStores, stores[primaryUp]) {
m.drSwitchToAsyncWait(stores[primaryUp])
if oldAvailableStores := m.drGetAvailableStores(); !reflect.DeepEqual(oldAvailableStores, storeIDs(stores[primaryUp])) {
m.drSwitchToAsyncWait(storeIDs(stores[primaryUp]))
break
}
if m.drCheckStoreStateUpdated(stores[primaryUp]) {
m.drSwitchToAsync(stores[primaryUp])
if m.drCheckStoreStateUpdated(storeIDs(stores[primaryUp])) {
m.drSwitchToAsync(storeIDs(stores[primaryUp]))
}
case drStateAsync:
if canSync {
m.drSwitchToSyncRecover()
break
}
if !reflect.DeepEqual(m.drGetAvailableStores(), stores[primaryUp]) && m.drCheckStoreStateUpdated(stores[primaryUp]) {
m.drSwitchToAsync(stores[primaryUp])
if !reflect.DeepEqual(m.drGetAvailableStores(), stores[primaryUp]) && m.drCheckStoreStateUpdated(storeIDs(stores[primaryUp])) {
m.drSwitchToAsync(storeIDs(stores[primaryUp]))
}
case drStateSyncRecover:
if !canSync && hasMajority {
m.drSwitchToAsync(stores[primaryUp])
m.drSwitchToAsync(storeIDs(stores[primaryUp]))
} else {
m.updateProgress()
progress := m.estimateProgress()
Expand Down Expand Up @@ -519,10 +570,10 @@ const (
storeStatusTypeCount
)

func (m *ModeManager) checkStoreStatus() [][]uint64 {
func (m *ModeManager) checkStoreStatus() [][]*core.StoreInfo {
m.RLock()
defer m.RUnlock()
stores := make([][]uint64, storeStatusTypeCount)
stores := make([][]*core.StoreInfo, storeStatusTypeCount)
for _, s := range m.cluster.GetStores() {
if s.IsRemoved() {
continue
Expand All @@ -535,21 +586,21 @@ func (m *ModeManager) checkStoreStatus() [][]uint64 {
labelValue := s.GetLabelValue(m.config.DRAutoSync.LabelKey)
if labelValue == m.config.DRAutoSync.Primary {
if down {
stores[primaryDown] = append(stores[primaryDown], s.GetID())
stores[primaryDown] = append(stores[primaryDown], s)
} else {
stores[primaryUp] = append(stores[primaryUp], s.GetID())
stores[primaryUp] = append(stores[primaryUp], s)
}
}
if labelValue == m.config.DRAutoSync.DR {
if down {
stores[drDown] = append(stores[drDown], s.GetID())
stores[drDown] = append(stores[drDown], s)
} else {
stores[drUp] = append(stores[drUp], s.GetID())
stores[drUp] = append(stores[drUp], s)
}
}
}
for i := range stores {
sort.Slice(stores[i], func(a, b int) bool { return stores[i][a] < stores[i][b] })
sort.Slice(stores[i], func(a, b int) bool { return stores[i][a].GetID() < stores[i][b].GetID() })
}
return stores
}
Expand Down
Loading

0 comments on commit 344a7a9

Please sign in to comment.