Skip to content

Commit

Permalink
rule_checker: fix the issue of not being able to achieve the better R…
Browse files Browse the repository at this point in the history
…egionFit (#7219) (#7244)

close #7185

fix the issue of not being able to achieve the better RegionFit
- try to replace the existing peer in another rulefit

Signed-off-by: nolouch <[email protected]>

Co-authored-by: nolouch <[email protected]>
Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com>
  • Loading branch information
3 people authored Oct 26, 2023
1 parent 026ac3c commit 190d5e2
Show file tree
Hide file tree
Showing 4 changed files with 194 additions and 2 deletions.
24 changes: 24 additions & 0 deletions pkg/core/store_option.go
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,30 @@ func SetLastPersistTime(lastPersist time.Time) StoreCreateOption {
}
}

// SetStoreState sets the state for the store.
func SetStoreState(state metapb.StoreState, physicallyDestroyed ...bool) StoreCreateOption {
return func(store *StoreInfo) {
meta := typeutil.DeepClone(store.meta, StoreFactory)
switch state {
case metapb.StoreState_Up:
meta.State = metapb.StoreState_Up
meta.NodeState = metapb.NodeState_Serving
case metapb.StoreState_Offline:
if len(physicallyDestroyed) != 0 {
meta.State = metapb.StoreState_Offline
meta.NodeState = metapb.NodeState_Removing
meta.PhysicallyDestroyed = physicallyDestroyed[0]
} else {
panic("physicallyDestroyed should be set when set store state to offline")
}
case metapb.StoreState_Tombstone:
meta.State = metapb.StoreState_Tombstone
meta.NodeState = metapb.NodeState_Removed
}
store.meta = meta
}
}

// SetStoreStats sets the statistics information for the store.
func SetStoreStats(stats *pdpb.StoreStats) StoreCreateOption {
return func(store *StoreInfo) {
Expand Down
24 changes: 22 additions & 2 deletions pkg/schedule/checker/rule_checker.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ var (
ruleCheckerReplaceOfflineCounter = checkerCounter.WithLabelValues(ruleChecker, "replace-offline")
ruleCheckerAddRulePeerCounter = checkerCounter.WithLabelValues(ruleChecker, "add-rule-peer")
ruleCheckerNoStoreAddCounter = checkerCounter.WithLabelValues(ruleChecker, "no-store-add")
ruleCheckerNoStoreThenTryReplace = checkerCounter.WithLabelValues(ruleChecker, "no-store-then-try-replace")
ruleCheckerNoStoreReplaceCounter = checkerCounter.WithLabelValues(ruleChecker, "no-store-replace")
ruleCheckerFixPeerRoleCounter = checkerCounter.WithLabelValues(ruleChecker, "fix-peer-role")
ruleCheckerFixLeaderRoleCounter = checkerCounter.WithLabelValues(ruleChecker, "fix-leader-role")
Expand Down Expand Up @@ -185,7 +186,7 @@ func (c *RuleChecker) isWitnessEnabled() bool {
func (c *RuleChecker) fixRulePeer(region *core.RegionInfo, fit *placement.RegionFit, rf *placement.RuleFit) (*operator.Operator, error) {
// make up peers.
if len(rf.Peers) < rf.Rule.Count {
return c.addRulePeer(region, rf)
return c.addRulePeer(region, fit, rf)
}
// fix down/offline peers.
for _, peer := range rf.Peers {
Expand Down Expand Up @@ -220,7 +221,7 @@ func (c *RuleChecker) fixRulePeer(region *core.RegionInfo, fit *placement.Region
return c.fixBetterLocation(region, rf)
}

func (c *RuleChecker) addRulePeer(region *core.RegionInfo, rf *placement.RuleFit) (*operator.Operator, error) {
func (c *RuleChecker) addRulePeer(region *core.RegionInfo, fit *placement.RegionFit, rf *placement.RuleFit) (*operator.Operator, error) {
ruleCheckerAddRulePeerCounter.Inc()
ruleStores := c.getRuleFitStores(rf)
isWitness := rf.Rule.IsWitness && c.isWitnessEnabled()
Expand All @@ -229,6 +230,25 @@ func (c *RuleChecker) addRulePeer(region *core.RegionInfo, rf *placement.RuleFit
if store == 0 {
ruleCheckerNoStoreAddCounter.Inc()
c.handleFilterState(region, filterByTempState)
// try to replace an existing peer that matches the label constraints.
// issue: https://github.com/tikv/pd/issues/7185
for _, p := range region.GetPeers() {
s := c.cluster.GetStore(p.GetStoreId())
if placement.MatchLabelConstraints(s, rf.Rule.LabelConstraints) {
oldPeerRuleFit := fit.GetRuleFit(p.GetId())
if oldPeerRuleFit == nil || !oldPeerRuleFit.IsSatisfied() || oldPeerRuleFit == rf {
continue
}
ruleCheckerNoStoreThenTryReplace.Inc()
op, err := c.replaceUnexpectRulePeer(region, oldPeerRuleFit, fit, p, "swap-fit")
if err != nil {
return nil, err
}
if op != nil {
return op, nil
}
}
}
return nil, errNoStoreToAdd
}
peer := &metapb.Peer{StoreId: store, Role: rf.Rule.Role.MetaPeerRole(), IsWitness: isWitness}
Expand Down
142 changes: 142 additions & 0 deletions pkg/schedule/checker/rule_checker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ package checker
import (
"context"
"fmt"
"strconv"
"strings"
"testing"
"time"

Expand All @@ -38,6 +40,7 @@ import (

func TestRuleCheckerTestSuite(t *testing.T) {
suite.Run(t, new(ruleCheckerTestSuite))
suite.Run(t, new(ruleCheckerTestAdvancedSuite))
}

type ruleCheckerTestSuite struct {
Expand Down Expand Up @@ -1583,3 +1586,142 @@ func (suite *ruleCheckerTestSuite) TestTiFlashLocationLabels() {
op := suite.rc.Check(suite.cluster.GetRegion(1))
suite.Nil(op)
}

type ruleCheckerTestAdvancedSuite struct {
suite.Suite
cluster *mockcluster.Cluster
ruleManager *placement.RuleManager
rc *RuleChecker
ctx context.Context
cancel context.CancelFunc
}

func (suite *ruleCheckerTestAdvancedSuite) SetupTest() {
cfg := mockconfig.NewTestOptions()
suite.ctx, suite.cancel = context.WithCancel(context.Background())
suite.cluster = mockcluster.NewCluster(suite.ctx, cfg)
suite.cluster.SetClusterVersion(versioninfo.MinSupportedVersion(versioninfo.SwitchWitness))
suite.cluster.SetEnablePlacementRules(true)
suite.cluster.SetEnableWitness(true)
suite.cluster.SetEnableUseJointConsensus(true)
suite.ruleManager = suite.cluster.RuleManager
suite.rc = NewRuleChecker(suite.ctx, suite.cluster, suite.ruleManager, cache.NewDefaultCache(10))
}

func (suite *ruleCheckerTestAdvancedSuite) TearDownTest() {
suite.cancel()
}

func makeStores() placement.StoreSet {
stores := core.NewStoresInfo()
now := time.Now()
for region := 1; region <= 3; region++ {
for zone := 1; zone <= 5; zone++ {
for host := 1; host <= 5; host++ {
id := uint64(region*100 + zone*10 + host)
labels := map[string]string{
"region": fmt.Sprintf("region%d", region),
"zone": fmt.Sprintf("zone%d", zone),
"host": fmt.Sprintf("host%d", host),
}
if host == 5 {
labels["engine"] = "tiflash"
}
if zone == 1 && host == 1 {
labels["type"] = "read"
}
stores.SetStore(core.NewStoreInfoWithLabel(id, labels).Clone(core.SetLastHeartbeatTS(now), core.SetStoreState(metapb.StoreState_Up)))
}
}
}
return stores
}

// example: "1111_leader,1234,2111_learner"
func makeRegion(def string) *core.RegionInfo {
var regionMeta metapb.Region
var leader *metapb.Peer
for _, peerDef := range strings.Split(def, ",") {
role, idStr := placement.Follower, peerDef
if strings.Contains(peerDef, "_") {
splits := strings.Split(peerDef, "_")
idStr, role = splits[0], placement.PeerRoleType(splits[1])
}
id, _ := strconv.Atoi(idStr)
peer := &metapb.Peer{Id: uint64(id), StoreId: uint64(id), Role: role.MetaPeerRole()}
regionMeta.Peers = append(regionMeta.Peers, peer)
if role == placement.Leader {
leader = peer
regionMeta.Id = peer.Id - 1
}
}
return core.NewRegionInfo(&regionMeta, leader)
}

// example: "3/voter/zone=zone1+zone2,rack=rack2/zone,rack,host"
// count role constraints location_labels
func makeRule(def string) *placement.Rule {
var rule placement.Rule
splits := strings.Split(def, "/")
rule.Count, _ = strconv.Atoi(splits[0])
rule.Role = placement.PeerRoleType(splits[1])
// only support k=v type constraint
for _, c := range strings.Split(splits[2], ",") {
if c == "" {
break
}
kv := strings.Split(c, "=")
rule.LabelConstraints = append(rule.LabelConstraints, placement.LabelConstraint{
Key: kv[0],
Op: "in",
Values: strings.Split(kv[1], "+"),
})
}
rule.LocationLabels = strings.Split(splits[3], ",")
return &rule
}

// TestReplaceAnExistingPeerCases address issue: https://github.com/tikv/pd/issues/7185
func (suite *ruleCheckerTestAdvancedSuite) TestReplaceAnExistingPeerCases() {
stores := makeStores()
for _, store := range stores.GetStores() {
suite.cluster.PutStore(store)
}

testCases := []struct {
region string
rules []string
opStr string
}{
{"111_leader,211,311", []string{"3/voter//", "3/learner/type=read/"}, "replace-rule-swap-fit-peer {mv peer: store [111] to"},
{"211,311_leader,151", []string{"3/voter//", "3/learner/type=read/"}, "add-rule-peer {add peer: store [111]}"},
{"111_learner,211,311_leader,151", []string{"3/voter//", "3/learner/type=read/"}, "replace-rule-swap-fit-peer {mv peer: store [211] to"},
{"111_learner,311_leader,151,351", []string{"3/voter//", "3/learner/type=read/"}, "add-rule-peer {add peer: store [211]}"},
{"111_learner,211_learner,311_leader,151,351", []string{"3/voter//", "3/learner/type=read/"}, "replace-rule-swap-fit-peer {mv peer: store [311] to"},
{"111_learner,211_learner,151_leader,252,351", []string{"3/voter//", "3/learner/type=read/"}, "add-rule-peer {add peer: store [311]}"},
{"111_learner,211_learner,311_learner,151_leader,252,351", []string{"3/voter//", "3/learner/type=read/"}, ""},
}
groupName := "a_test"
for i, cas := range testCases {
bundle := placement.GroupBundle{
ID: groupName,
Index: 1000,
Override: true,
Rules: make([]*placement.Rule, 0, len(cas.rules)),
}
for id, r := range cas.rules {
rule := makeRule(r)
rule.ID = fmt.Sprintf("r%d", id)
bundle.Rules = append(bundle.Rules, rule)
}
err := suite.ruleManager.SetGroupBundle(bundle)
suite.NoError(err)
region := makeRegion(cas.region)
suite.cluster.PutRegion(region)
op := suite.rc.Check(region)
if len(cas.opStr) > 0 {
suite.Contains(op.String(), cas.opStr, i, cas.opStr)
}
suite.ruleManager.DeleteGroupBundle(groupName, false)
}
}
6 changes: 6 additions & 0 deletions pkg/schedule/placement/fit_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,9 @@ func makeStores() StoreSet {
if x == 5 {
labels["engine"] = "tiflash"
}
if id == 1111 || id == 2111 || id == 3111 {
labels["disk"] = "ssd"
}
stores.SetStore(core.NewStoreInfoWithLabel(id, labels).Clone(core.SetLastHeartbeatTS(now)))
}
}
Expand Down Expand Up @@ -186,6 +189,9 @@ func TestFitRegion(t *testing.T) {
{"1111,1112,1113,1114", []string{"3/voter//", "1/voter/id=id1/"}, "1112,1113,1114/1111"},
{"1111,2211,3111,3112", []string{"3/voter//zone", "1/voter/rack=rack2/"}, "1111,2211,3111//3112"},
{"1111,2211,3111,3112", []string{"1/voter/rack=rack2/", "3/voter//zone"}, "2211/1111,3111,3112"},
{"1111_leader,2111,3111", []string{"3/voter//", "3/learner/disk=ssd/"}, "1111,2111,3111/"},
{"1111_leader,2111,3111,4111", []string{"3/voter//", "3/learner/disk=ssd/"}, "1111,2111,4111/3111"},
{"1111_leader,2111,3111,4111_learner", []string{"3/voter//", "3/learner/disk=ssd/"}, "1111,2111,3111//4111"},
}

for _, testCase := range testCases {
Expand Down

0 comments on commit 190d5e2

Please sign in to comment.