diff --git a/pkg/mcs/scheduling/server/cluster.go b/pkg/mcs/scheduling/server/cluster.go new file mode 100644 index 000000000000..b2986f722df7 --- /dev/null +++ b/pkg/mcs/scheduling/server/cluster.go @@ -0,0 +1,466 @@ +package server + +import ( + "context" + "sync" + "sync/atomic" + "time" + + "github.com/pingcap/errors" + "github.com/pingcap/kvproto/pkg/pdpb" + "github.com/pingcap/kvproto/pkg/schedulingpb" + "github.com/pingcap/log" + "github.com/tikv/pd/pkg/cluster" + "github.com/tikv/pd/pkg/core" + "github.com/tikv/pd/pkg/errs" + "github.com/tikv/pd/pkg/mcs/scheduling/server/config" + "github.com/tikv/pd/pkg/schedule" + sc "github.com/tikv/pd/pkg/schedule/config" + "github.com/tikv/pd/pkg/schedule/hbstream" + "github.com/tikv/pd/pkg/schedule/labeler" + "github.com/tikv/pd/pkg/schedule/operator" + "github.com/tikv/pd/pkg/schedule/placement" + "github.com/tikv/pd/pkg/schedule/schedulers" + "github.com/tikv/pd/pkg/slice" + "github.com/tikv/pd/pkg/statistics" + "github.com/tikv/pd/pkg/statistics/buckets" + "github.com/tikv/pd/pkg/statistics/utils" + "github.com/tikv/pd/pkg/storage" + "github.com/tikv/pd/pkg/utils/logutil" + "go.uber.org/zap" +) + +// Cluster is used to manage all information for scheduling purpose. +type Cluster struct { + ctx context.Context + cancel context.CancelFunc + wg sync.WaitGroup + *core.BasicCluster + persistConfig *config.PersistConfig + ruleManager *placement.RuleManager + labelerManager *labeler.RegionLabeler + regionStats *statistics.RegionStatistics + labelStats *statistics.LabelStatistics + hotStat *statistics.HotStat + storage storage.Storage + coordinator *schedule.Coordinator + checkMembershipCh chan struct{} + apiServerLeader atomic.Value + clusterID uint64 +} + +const regionLabelGCInterval = time.Hour + +// NewCluster creates a new cluster. +func NewCluster(parentCtx context.Context, persistConfig *config.PersistConfig, storage storage.Storage, basicCluster *core.BasicCluster, hbStreams *hbstream.HeartbeatStreams, clusterID uint64, checkMembershipCh chan struct{}) (*Cluster, error) { + ctx, cancel := context.WithCancel(parentCtx) + labelerManager, err := labeler.NewRegionLabeler(ctx, storage, regionLabelGCInterval) + if err != nil { + cancel() + return nil, err + } + ruleManager := placement.NewRuleManager(storage, basicCluster, persistConfig) + c := &Cluster{ + ctx: ctx, + cancel: cancel, + BasicCluster: basicCluster, + ruleManager: ruleManager, + labelerManager: labelerManager, + persistConfig: persistConfig, + hotStat: statistics.NewHotStat(ctx), + labelStats: statistics.NewLabelStatistics(), + regionStats: statistics.NewRegionStatistics(basicCluster, persistConfig, ruleManager), + storage: storage, + clusterID: clusterID, + checkMembershipCh: checkMembershipCh, + } + c.coordinator = schedule.NewCoordinator(ctx, c, hbStreams) + err = c.ruleManager.Initialize(persistConfig.GetMaxReplicas(), persistConfig.GetLocationLabels(), persistConfig.GetIsolationLevel()) + if err != nil { + cancel() + return nil, err + } + return c, nil +} + +// GetCoordinator returns the coordinator +func (c *Cluster) GetCoordinator() *schedule.Coordinator { + return c.coordinator +} + +// GetHotStat gets hot stat. +func (c *Cluster) GetHotStat() *statistics.HotStat { + return c.hotStat +} + +// GetRegionStats gets region statistics. +func (c *Cluster) GetRegionStats() *statistics.RegionStatistics { + return c.regionStats +} + +// GetLabelStats gets label statistics. 
+func (c *Cluster) GetLabelStats() *statistics.LabelStatistics { + return c.labelStats +} + +// GetBasicCluster returns the basic cluster. +func (c *Cluster) GetBasicCluster() *core.BasicCluster { + return c.BasicCluster +} + +// GetSharedConfig returns the shared config. +func (c *Cluster) GetSharedConfig() sc.SharedConfigProvider { + return c.persistConfig +} + +// GetRuleManager returns the rule manager. +func (c *Cluster) GetRuleManager() *placement.RuleManager { + return c.ruleManager +} + +// GetRegionLabeler returns the region labeler. +func (c *Cluster) GetRegionLabeler() *labeler.RegionLabeler { + return c.labelerManager +} + +// GetStoresLoads returns load stats of all stores. +func (c *Cluster) GetStoresLoads() map[uint64][]float64 { + return c.hotStat.GetStoresLoads() +} + +// IsRegionHot checks if a region is in hot state. +func (c *Cluster) IsRegionHot(region *core.RegionInfo) bool { + return c.hotStat.IsRegionHot(region, c.persistConfig.GetHotRegionCacheHitsThreshold()) +} + +// GetHotPeerStat returns hot peer stat with specified regionID and storeID. +func (c *Cluster) GetHotPeerStat(rw utils.RWType, regionID, storeID uint64) *statistics.HotPeerStat { + return c.hotStat.GetHotPeerStat(rw, regionID, storeID) +} + +// RegionReadStats returns hot region's read stats. +// The result only includes peers that are hot enough. +// RegionStats is a thread-safe method +func (c *Cluster) RegionReadStats() map[uint64][]*statistics.HotPeerStat { + // As read stats are reported by store heartbeat, the threshold needs to be adjusted. + threshold := c.persistConfig.GetHotRegionCacheHitsThreshold() * + (utils.RegionHeartBeatReportInterval / utils.StoreHeartBeatReportInterval) + return c.hotStat.RegionStats(utils.Read, threshold) +} + +// RegionWriteStats returns hot region's write stats. +// The result only includes peers that are hot enough. +func (c *Cluster) RegionWriteStats() map[uint64][]*statistics.HotPeerStat { + // RegionStats is a thread-safe method + return c.hotStat.RegionStats(utils.Write, c.persistConfig.GetHotRegionCacheHitsThreshold()) +} + +// BucketsStats returns hot region's buckets stats. +func (c *Cluster) BucketsStats(degree int, regionIDs ...uint64) map[uint64][]*buckets.BucketStat { + return c.hotStat.BucketsStats(degree, regionIDs...) +} + +// GetStorage returns the storage. +func (c *Cluster) GetStorage() storage.Storage { + return c.storage +} + +// GetCheckerConfig returns the checker config. +func (c *Cluster) GetCheckerConfig() sc.CheckerConfigProvider { return c.persistConfig } + +// GetSchedulerConfig returns the scheduler config. +func (c *Cluster) GetSchedulerConfig() sc.SchedulerConfigProvider { return c.persistConfig } + +// GetStoreConfig returns the store config. +func (c *Cluster) GetStoreConfig() sc.StoreConfigProvider { return c.persistConfig } + +// AllocID allocates a new ID. +func (c *Cluster) AllocID() (uint64, error) { + client, err := c.getAPIServerLeaderClient() + if err != nil { + return 0, err + } + resp, err := client.AllocID(c.ctx, &pdpb.AllocIDRequest{Header: &pdpb.RequestHeader{ClusterId: c.clusterID}}) + if err != nil { + c.checkMembershipCh <- struct{}{} + return 0, err + } + return resp.GetId(), nil +} + +func (c *Cluster) getAPIServerLeaderClient() (pdpb.PDClient, error) { + cli := c.apiServerLeader.Load() + if cli == nil { + c.checkMembershipCh <- struct{}{} + return nil, errors.New("API server leader is not found") + } + return cli.(pdpb.PDClient), nil +} + +// SwitchAPIServerLeader switches the API server leader. 
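+// Only one of several concurrent callers can win the swap, because the value
+// is replaced with a single load-then-CompareAndSwap on an atomic.Value. A
+// minimal usage sketch (hypothetical plumbing; `cli` is assumed to be a
+// pdpb.PDClient connected to the newly elected leader):
+//
+//	if c.SwitchAPIServerLeader(cli) {
+//		log.Info("switched to the new API server leader client")
+//	}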
+func (c *Cluster) SwitchAPIServerLeader(new pdpb.PDClient) bool {
+	old := c.apiServerLeader.Load()
+	return c.apiServerLeader.CompareAndSwap(old, new)
+}
+
+// updateScheduler listens on the schedulers updating notifier and manages scheduler creation and deletion.
+func (c *Cluster) updateScheduler() {
+	defer logutil.LogPanic()
+	defer c.wg.Done()
+
+	// Make sure the coordinator has initialized all the existing schedulers.
+	c.waitSchedulersInitialized()
+	// Establish a notifier to listen for scheduler updates.
+	notifier := make(chan struct{}, 1)
+	// Make sure the check will be triggered once later.
+	notifier <- struct{}{}
+	c.persistConfig.SetSchedulersUpdatingNotifier(notifier)
+	for {
+		select {
+		case <-c.ctx.Done():
+			log.Info("cluster is closing, stop listening the schedulers updating notifier")
+			return
+		case <-notifier:
+			// This is triggered by the watcher when the schedulers are updated.
+		}
+
+		log.Info("schedulers updating notifier is triggered, try to update the scheduler")
+		var (
+			schedulersController   = c.coordinator.GetSchedulersController()
+			latestSchedulersConfig = c.persistConfig.GetScheduleConfig().Schedulers
+		)
+		// Create the newly added schedulers.
+		for _, scheduler := range latestSchedulersConfig {
+			s, err := schedulers.CreateScheduler(
+				scheduler.Type,
+				c.coordinator.GetOperatorController(),
+				c.storage,
+				schedulers.ConfigSliceDecoder(scheduler.Type, scheduler.Args),
+				schedulersController.RemoveScheduler,
+			)
+			if err != nil {
+				log.Error("failed to create scheduler",
+					zap.String("scheduler-type", scheduler.Type),
+					zap.Strings("scheduler-args", scheduler.Args),
+					errs.ZapError(err))
+				continue
+			}
+			name := s.GetName()
+			if existed, _ := schedulersController.IsSchedulerExisted(name); existed {
+				log.Info("scheduler has already existed, skip adding it",
+					zap.String("scheduler-name", name),
+					zap.Strings("scheduler-args", scheduler.Args))
+				continue
+			}
+			if err := schedulersController.AddScheduler(s, scheduler.Args...); err != nil {
+				log.Error("failed to add scheduler",
+					zap.String("scheduler-name", name),
+					zap.Strings("scheduler-args", scheduler.Args),
+					errs.ZapError(err))
+				continue
+			}
+			log.Info("add scheduler successfully",
+				zap.String("scheduler-name", name),
+				zap.Strings("scheduler-args", scheduler.Args))
+		}
+		// Remove the deleted schedulers.
+		for _, name := range schedulersController.GetSchedulerNames() {
+			scheduler := schedulersController.GetScheduler(name)
+			if slice.AnyOf(latestSchedulersConfig, func(i int) bool {
+				return latestSchedulersConfig[i].Type == scheduler.GetType()
+			}) {
+				continue
+			}
+			if err := schedulersController.RemoveScheduler(name); err != nil {
+				log.Error("failed to remove scheduler",
+					zap.String("scheduler-name", name),
+					errs.ZapError(err))
+				continue
+			}
+			log.Info("remove scheduler successfully",
+				zap.String("scheduler-name", name))
+		}
+	}
+}
+
+func (c *Cluster) waitSchedulersInitialized() {
+	ticker := time.NewTicker(time.Millisecond * 100)
+	defer ticker.Stop()
+	for {
+		if c.coordinator.AreSchedulersInitialized() {
+			return
+		}
+		select {
+		case <-c.ctx.Done():
+			log.Info("cluster is closing, stop waiting the schedulers initialization")
+			return
+		case <-ticker.C:
+		}
+	}
+}
+
+// TODO: implement the following methods
+
+// UpdateRegionsLabelLevelStats updates the status of the region label level by types.
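+// A rough illustration (a sketch of the intent, not the precise counting
+// rules): with location labels ["zone", "host"], each region is bucketed by
+// the widest label level at which its replicas are still isolated from each
+// other; stores carrying the engine=tiflash label are filtered out before the
+// check, as getStoresWithoutLabelLocked below shows.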
+func (c *Cluster) UpdateRegionsLabelLevelStats(regions []*core.RegionInfo) { + for _, region := range regions { + c.labelStats.Observe(region, c.getStoresWithoutLabelLocked(region, core.EngineKey, core.EngineTiFlash), c.persistConfig.GetLocationLabels()) + } +} + +func (c *Cluster) getStoresWithoutLabelLocked(region *core.RegionInfo, key, value string) []*core.StoreInfo { + stores := make([]*core.StoreInfo, 0, len(region.GetPeers())) + for _, p := range region.GetPeers() { + if store := c.GetStore(p.GetStoreId()); store != nil && !core.IsStoreContainLabel(store.GetMeta(), key, value) { + stores = append(stores, store) + } + } + return stores +} + +// HandleStoreHeartbeat updates the store status. +func (c *Cluster) HandleStoreHeartbeat(heartbeat *schedulingpb.StoreHeartbeatRequest) error { + stats := heartbeat.GetStats() + storeID := stats.GetStoreId() + store := c.GetStore(storeID) + if store == nil { + return errors.Errorf("store %v not found", storeID) + } + + nowTime := time.Now() + newStore := store.Clone(core.SetStoreStats(stats), core.SetLastHeartbeatTS(nowTime)) + + if store := c.GetStore(storeID); store != nil { + statistics.UpdateStoreHeartbeatMetrics(store) + } + c.PutStore(newStore) + c.hotStat.Observe(storeID, newStore.GetStoreStats()) + c.hotStat.FilterUnhealthyStore(c) + reportInterval := stats.GetInterval() + interval := reportInterval.GetEndTimestamp() - reportInterval.GetStartTimestamp() + + regions := make(map[uint64]*core.RegionInfo, len(stats.GetPeerStats())) + for _, peerStat := range stats.GetPeerStats() { + regionID := peerStat.GetRegionId() + region := c.GetRegion(regionID) + regions[regionID] = region + if region == nil { + log.Warn("discard hot peer stat for unknown region", + zap.Uint64("region-id", regionID), + zap.Uint64("store-id", storeID)) + continue + } + peer := region.GetStorePeer(storeID) + if peer == nil { + log.Warn("discard hot peer stat for unknown region peer", + zap.Uint64("region-id", regionID), + zap.Uint64("store-id", storeID)) + continue + } + readQueryNum := core.GetReadQueryNum(peerStat.GetQueryStats()) + loads := []float64{ + utils.RegionReadBytes: float64(peerStat.GetReadBytes()), + utils.RegionReadKeys: float64(peerStat.GetReadKeys()), + utils.RegionReadQueryNum: float64(readQueryNum), + utils.RegionWriteBytes: 0, + utils.RegionWriteKeys: 0, + utils.RegionWriteQueryNum: 0, + } + peerInfo := core.NewPeerInfo(peer, loads, interval) + c.hotStat.CheckReadAsync(statistics.NewCheckPeerTask(peerInfo, region)) + } + + // Here we will compare the reported regions with the previous hot peers to decide if it is still hot. + c.hotStat.CheckReadAsync(statistics.NewCollectUnReportedPeerTask(storeID, regions, interval)) + return nil +} + +// runUpdateStoreStats updates store stats periodically. +func (c *Cluster) runUpdateStoreStats() { + defer logutil.LogPanic() + defer c.wg.Done() + + ticker := time.NewTicker(9 * time.Millisecond) + defer ticker.Stop() + + for { + select { + case <-c.ctx.Done(): + log.Info("update store stats background jobs has been stopped") + return + case <-ticker.C: + c.UpdateAllStoreStatus() + } + } +} + +// StartBackgroundJobs starts background jobs. +func (c *Cluster) StartBackgroundJobs() { + c.wg.Add(2) + go c.updateScheduler() + go c.runUpdateStoreStats() +} + +// StopBackgroundJobs stops background jobs. +func (c *Cluster) StopBackgroundJobs() { + c.cancel() + c.wg.Wait() +} + +// HandleRegionHeartbeat processes RegionInfo reports from client. 
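+// The flow is two-phase: processRegionHeartbeat refreshes the cached region
+// (and its statistics), then the operator controller gets a chance to drive
+// any pending operator step for that region. A minimal caller sketch
+// (hypothetical plumbing around the gRPC stream):
+//
+//	for _, region := range reportedRegions {
+//		if err := c.HandleRegionHeartbeat(region); err != nil {
+//			log.Warn("failed to handle region heartbeat", errs.ZapError(err))
+//		}
+//	}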
+func (c *Cluster) HandleRegionHeartbeat(region *core.RegionInfo) error {
+	if err := c.processRegionHeartbeat(region); err != nil {
+		return err
+	}
+
+	c.coordinator.GetOperatorController().Dispatch(region, operator.DispatchFromHeartBeat, c.coordinator.RecordOpStepWithTTL)
+	return nil
+}
+
+// processRegionHeartbeat updates the region information.
+func (c *Cluster) processRegionHeartbeat(region *core.RegionInfo) error {
+	origin, _, err := c.PreCheckPutRegion(region)
+	if err != nil {
+		return err
+	}
+	if c.GetStoreConfig().IsEnableRegionBucket() {
+		region.InheritBuckets(origin)
+	}
+
+	cluster.HandleStatsAsync(c, region)
+
+	hasRegionStats := c.regionStats != nil
+	// Save to storage if meta is updated, except for flashback.
+	// Save to cache if meta or leader is updated, or contains any down/pending peer.
+	// Mark isNew if the region in cache does not have leader.
+	changed := core.GenerateRegionGuideFunc(true)(region, origin)
+	if !changed.SaveCache && !changed.IsNew {
+		// Due to some config changes need to update the region stats as well,
+		// so we do some extra checks here.
+		if hasRegionStats && c.regionStats.RegionStatsNeedUpdate(region) {
+			c.regionStats.Observe(region, c.GetRegionStores(region))
+		}
+		return nil
+	}
+
+	var overlaps []*core.RegionInfo
+	if changed.SaveCache {
+		// To prevent a concurrent heartbeat of another region from overriding the up-to-date region info by a stale one,
+		// check its validation again here.
+		//
+		// However, it can't solve the race condition of concurrent heartbeats from the same region.
+		if overlaps, err = c.AtomicCheckAndPutRegion(region); err != nil {
+			return err
+		}
+
+		cluster.HandleOverlaps(c, overlaps)
+	}
+
+	cluster.Collect(c, region, c.GetRegionStores(region), hasRegionStats, changed.IsNew, c.IsPrepared())
+	return nil
+}
+
+// IsPrepared returns true if the prepare checker is ready.
+func (c *Cluster) IsPrepared() bool {
+	return c.coordinator.GetPrepareChecker().IsPrepared()
+}
diff --git a/pkg/mock/mockcluster/mockcluster.go b/pkg/mock/mockcluster/mockcluster.go
index c1318868aef7..93fbc0a93f33 100644
--- a/pkg/mock/mockcluster/mockcluster.go
+++ b/pkg/mock/mockcluster/mockcluster.go
@@ -165,8 +165,8 @@ func (mc *Cluster) AllocPeer(storeID uint64) (*metapb.Peer, error) {
 
 func (mc *Cluster) initRuleManager() {
 	if mc.RuleManager == nil {
-		mc.RuleManager = placement.NewRuleManager(core.NewStorage(kv.NewMemoryKV()), mc, mc.GetOpts())
-		mc.RuleManager.Initialize(int(mc.GetReplicationConfig().MaxReplicas), mc.GetReplicationConfig().LocationLabels)
+		mc.RuleManager = placement.NewRuleManager(mc.GetStorage(), mc, mc.GetSharedConfig())
+		mc.RuleManager.Initialize(int(mc.GetReplicationConfig().MaxReplicas), mc.GetReplicationConfig().LocationLabels, mc.GetReplicationConfig().IsolationLevel)
 	}
 }
 
diff --git a/pkg/schedule/checker/rule_checker_test.go b/pkg/schedule/checker/rule_checker_test.go
new file mode 100644
index 000000000000..ad140e91606b
--- /dev/null
+++ b/pkg/schedule/checker/rule_checker_test.go
@@ -0,0 +1,1585 @@
+// Copyright 2019 TiKV Project Authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package checker + +import ( + "context" + "fmt" + "testing" + "time" + + "github.com/pingcap/failpoint" + "github.com/pingcap/kvproto/pkg/metapb" + "github.com/pingcap/kvproto/pkg/pdpb" + "github.com/stretchr/testify/suite" + "github.com/tikv/pd/pkg/cache" + "github.com/tikv/pd/pkg/core" + "github.com/tikv/pd/pkg/core/constant" + "github.com/tikv/pd/pkg/errs" + "github.com/tikv/pd/pkg/mock/mockcluster" + "github.com/tikv/pd/pkg/mock/mockconfig" + "github.com/tikv/pd/pkg/schedule/operator" + "github.com/tikv/pd/pkg/schedule/placement" + "github.com/tikv/pd/pkg/utils/operatorutil" + "github.com/tikv/pd/pkg/versioninfo" +) + +func TestRuleCheckerTestSuite(t *testing.T) { + suite.Run(t, new(ruleCheckerTestSuite)) +} + +type ruleCheckerTestSuite struct { + suite.Suite + cluster *mockcluster.Cluster + ruleManager *placement.RuleManager + rc *RuleChecker + ctx context.Context + cancel context.CancelFunc +} + +func (suite *ruleCheckerTestSuite) SetupTest() { + cfg := mockconfig.NewTestOptions() + suite.ctx, suite.cancel = context.WithCancel(context.Background()) + suite.cluster = mockcluster.NewCluster(suite.ctx, cfg) + suite.cluster.SetClusterVersion(versioninfo.MinSupportedVersion(versioninfo.SwitchWitness)) + suite.cluster.SetEnablePlacementRules(true) + suite.cluster.SetEnableWitness(true) + suite.cluster.SetEnableUseJointConsensus(false) + suite.ruleManager = suite.cluster.RuleManager + suite.rc = NewRuleChecker(suite.ctx, suite.cluster, suite.ruleManager, cache.NewDefaultCache(10)) +} + +func (suite *ruleCheckerTestSuite) TearDownTest() { + suite.cancel() +} + +func (suite *ruleCheckerTestSuite) TestAddRulePeer() { + suite.cluster.AddLeaderStore(1, 1) + suite.cluster.AddLeaderStore(2, 1) + suite.cluster.AddLeaderStore(3, 1) + suite.cluster.AddLeaderRegionWithRange(1, "", "", 1, 2) + op := suite.rc.Check(suite.cluster.GetRegion(1)) + suite.NotNil(op) + suite.Equal("add-rule-peer", op.Desc()) + suite.Equal(constant.High, op.GetPriorityLevel()) + suite.Equal(uint64(3), op.Step(0).(operator.AddLearner).ToStore) +} + +func (suite *ruleCheckerTestSuite) TestAddRulePeerWithIsolationLevel() { + suite.cluster.AddLabelsStore(1, 1, map[string]string{"zone": "z1", "rack": "r1", "host": "h1"}) + suite.cluster.AddLabelsStore(2, 1, map[string]string{"zone": "z1", "rack": "r1", "host": "h2"}) + suite.cluster.AddLabelsStore(3, 1, map[string]string{"zone": "z1", "rack": "r2", "host": "h1"}) + suite.cluster.AddLabelsStore(4, 1, map[string]string{"zone": "z1", "rack": "r3", "host": "h1"}) + suite.cluster.AddLeaderRegionWithRange(1, "", "", 1, 2) + suite.ruleManager.SetRule(&placement.Rule{ + GroupID: "pd", + ID: "test", + Index: 100, + Override: true, + Role: placement.Voter, + Count: 3, + LocationLabels: []string{"zone", "rack", "host"}, + IsolationLevel: "zone", + }) + op := suite.rc.Check(suite.cluster.GetRegion(1)) + suite.Nil(op) + suite.cluster.AddLeaderRegionWithRange(1, "", "", 1, 3) + suite.ruleManager.SetRule(&placement.Rule{ + GroupID: "pd", + ID: "test", + Index: 100, + Override: true, + Role: placement.Voter, + Count: 3, + LocationLabels: []string{"zone", "rack", "host"}, + 
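+		// IsolationLevel is the label under which no two replicas may share a
+		// value; with peers already on racks r1 and r2, raising it from "zone"
+		// to "rack" leaves store 4 (rack r3) as the only legal target below.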
IsolationLevel: "rack", + }) + op = suite.rc.Check(suite.cluster.GetRegion(1)) + suite.NotNil(op) + suite.Equal("add-rule-peer", op.Desc()) + suite.Equal(uint64(4), op.Step(0).(operator.AddLearner).ToStore) +} + +func (suite *ruleCheckerTestSuite) TestReplaceDownPeerWithIsolationLevel() { + suite.cluster.SetMaxStoreDownTime(100 * time.Millisecond) + suite.cluster.AddLabelsStore(1, 1, map[string]string{"zone": "z1", "host": "h1"}) + suite.cluster.AddLabelsStore(2, 1, map[string]string{"zone": "z1", "host": "h2"}) + suite.cluster.AddLabelsStore(3, 1, map[string]string{"zone": "z2", "host": "h3"}) + suite.cluster.AddLabelsStore(4, 1, map[string]string{"zone": "z2", "host": "h4"}) + suite.cluster.AddLabelsStore(5, 1, map[string]string{"zone": "z3", "host": "h5"}) + suite.cluster.AddLabelsStore(6, 1, map[string]string{"zone": "z3", "host": "h6"}) + suite.cluster.AddLeaderRegionWithRange(1, "", "", 1, 3, 5) + suite.ruleManager.DeleteRule("pd", "default") + suite.ruleManager.SetRule(&placement.Rule{ + GroupID: "pd", + ID: "test", + Index: 100, + Override: true, + Role: placement.Voter, + Count: 3, + LocationLabels: []string{"zone", "host"}, + IsolationLevel: "zone", + }) + op := suite.rc.Check(suite.cluster.GetRegion(1)) + suite.Nil(op) + region := suite.cluster.GetRegion(1) + downPeer := []*pdpb.PeerStats{ + {Peer: region.GetStorePeer(5), DownSeconds: 6000}, + } + region = region.Clone(core.WithDownPeers(downPeer)) + suite.cluster.PutRegion(region) + suite.cluster.SetStoreDown(5) + suite.cluster.SetStoreDown(6) + time.Sleep(200 * time.Millisecond) + op = suite.rc.Check(suite.cluster.GetRegion(1)) + suite.Nil(op) +} + +func (suite *ruleCheckerTestSuite) TestFixPeer() { + suite.cluster.AddLeaderStore(1, 1) + suite.cluster.AddLeaderStore(2, 1) + suite.cluster.AddLeaderStore(3, 1) + suite.cluster.AddLeaderStore(4, 1) + suite.cluster.AddLeaderRegionWithRange(1, "", "", 1, 2, 3) + op := suite.rc.Check(suite.cluster.GetRegion(1)) + suite.Nil(op) + suite.cluster.SetStoreDown(2) + r := suite.cluster.GetRegion(1) + r = r.Clone(core.WithDownPeers([]*pdpb.PeerStats{{Peer: r.GetStorePeer(2), DownSeconds: 60000}})) + op = suite.rc.Check(r) + suite.NotNil(op) + suite.Equal("fast-replace-rule-down-peer", op.Desc()) + suite.Equal(constant.Urgent, op.GetPriorityLevel()) + var add operator.AddLearner + suite.IsType(add, op.Step(0)) + suite.cluster.SetStoreUp(2) + suite.cluster.SetStoreOffline(2) + op = suite.rc.Check(suite.cluster.GetRegion(1)) + suite.NotNil(op) + suite.Equal("replace-rule-offline-peer", op.Desc()) + suite.Equal(constant.High, op.GetPriorityLevel()) + suite.IsType(add, op.Step(0)) + + suite.cluster.SetStoreUp(2) + // leader store offline + suite.cluster.SetStoreOffline(1) + r1 := suite.cluster.GetRegion(1) + nr1 := r1.Clone(core.WithPendingPeers([]*metapb.Peer{r1.GetStorePeer(3)})) + suite.cluster.PutRegion(nr1) + hasTransferLeader := false + for i := 0; i < 100; i++ { + op = suite.rc.Check(suite.cluster.GetRegion(1)) + suite.NotNil(op) + if step, ok := op.Step(0).(operator.TransferLeader); ok { + suite.Equal(uint64(1), step.FromStore) + suite.NotEqual(uint64(3), step.ToStore) + hasTransferLeader = true + } + } + suite.True(hasTransferLeader) +} + +func (suite *ruleCheckerTestSuite) TestFixOrphanPeers() { + suite.cluster.AddLeaderStore(1, 1) + suite.cluster.AddLeaderStore(2, 1) + suite.cluster.AddLeaderStore(3, 1) + suite.cluster.AddLeaderStore(4, 1) + suite.cluster.AddLeaderRegionWithRange(1, "", "", 1, 2, 3, 4) + op := suite.rc.Check(suite.cluster.GetRegion(1)) + suite.NotNil(op) + 
suite.Equal("remove-orphan-peer", op.Desc()) + suite.Equal(uint64(4), op.Step(0).(operator.RemovePeer).FromStore) +} + +func (suite *ruleCheckerTestSuite) TestFixToManyOrphanPeers() { + suite.cluster.AddLeaderStore(1, 1) + suite.cluster.AddLeaderStore(2, 1) + suite.cluster.AddLeaderStore(3, 1) + suite.cluster.AddLeaderStore(4, 1) + suite.cluster.AddLeaderStore(5, 1) + suite.cluster.AddLeaderStore(6, 1) + suite.cluster.AddRegionWithLearner(1, 1, []uint64{2, 3}, []uint64{4, 5, 6}) + // Case1: + // store 4, 5, 6 are orphan peers, and peer on store 3 is pending and down peer. + region := suite.cluster.GetRegion(1) + region = region.Clone( + core.WithDownPeers([]*pdpb.PeerStats{{Peer: region.GetStorePeer(3), DownSeconds: 60000}}), + core.WithPendingPeers([]*metapb.Peer{region.GetStorePeer(3)})) + suite.cluster.PutRegion(region) + op := suite.rc.Check(suite.cluster.GetRegion(1)) + suite.NotNil(op) + suite.Equal("remove-orphan-peer", op.Desc()) + suite.Equal(uint64(5), op.Step(0).(operator.RemovePeer).FromStore) + + // Case2: + // store 4, 5, 6 are orphan peers, and peer on store 3 is down peer. and peer on store 4, 5 are pending. + region = suite.cluster.GetRegion(1) + region = region.Clone( + core.WithDownPeers([]*pdpb.PeerStats{{Peer: region.GetStorePeer(3), DownSeconds: 60000}}), + core.WithPendingPeers([]*metapb.Peer{region.GetStorePeer(4), region.GetStorePeer(5)})) + suite.cluster.PutRegion(region) + op = suite.rc.Check(suite.cluster.GetRegion(1)) + suite.NotNil(op) + suite.Equal("remove-orphan-peer", op.Desc()) + suite.Equal(uint64(4), op.Step(0).(operator.RemovePeer).FromStore) +} + +func (suite *ruleCheckerTestSuite) TestFixOrphanPeers2() { + // check orphan peers can only be handled when all rules are satisfied. + suite.cluster.AddLabelsStore(1, 1, map[string]string{"foo": "bar"}) + suite.cluster.AddLabelsStore(2, 1, map[string]string{"foo": "bar"}) + suite.cluster.AddLabelsStore(3, 1, map[string]string{"foo": "baz"}) + suite.cluster.AddLeaderRegionWithRange(1, "", "", 1, 3) + suite.ruleManager.SetRule(&placement.Rule{ + GroupID: "pd", + ID: "r1", + Index: 100, + Override: true, + Role: placement.Leader, + Count: 2, + LabelConstraints: []placement.LabelConstraint{ + {Key: "foo", Op: "in", Values: []string{"baz"}}, + }, + }) + suite.cluster.SetStoreDown(2) + op := suite.rc.Check(suite.cluster.GetRegion(1)) + suite.Nil(op) +} + +func (suite *ruleCheckerTestSuite) TestFixRole() { + suite.cluster.AddLeaderStore(1, 1) + suite.cluster.AddLeaderStore(2, 1) + suite.cluster.AddLeaderStore(3, 1) + suite.cluster.AddLeaderRegionWithRange(1, "", "", 2, 1, 3) + r := suite.cluster.GetRegion(1) + p := r.GetStorePeer(1) + p.Role = metapb.PeerRole_Learner + r = r.Clone(core.WithLearners([]*metapb.Peer{p})) + op := suite.rc.Check(r) + suite.NotNil(op) + suite.Equal("fix-peer-role", op.Desc()) + suite.Equal(uint64(1), op.Step(0).(operator.PromoteLearner).ToStore) +} + +func (suite *ruleCheckerTestSuite) TestFixRoleLeader() { + suite.cluster.AddLabelsStore(1, 1, map[string]string{"role": "follower"}) + suite.cluster.AddLabelsStore(2, 1, map[string]string{"role": "follower"}) + suite.cluster.AddLabelsStore(3, 1, map[string]string{"role": "voter"}) + suite.cluster.AddLeaderRegionWithRange(1, "", "", 1, 2, 3) + suite.ruleManager.SetRule(&placement.Rule{ + GroupID: "pd", + ID: "r1", + Index: 100, + Override: true, + Role: placement.Voter, + Count: 1, + LabelConstraints: []placement.LabelConstraint{ + {Key: "role", Op: "in", Values: []string{"voter"}}, + }, + }) + suite.ruleManager.SetRule(&placement.Rule{ + 
GroupID: "pd", + ID: "r2", + Index: 101, + Role: placement.Follower, + Count: 2, + LabelConstraints: []placement.LabelConstraint{ + {Key: "role", Op: "in", Values: []string{"follower"}}, + }, + }) + op := suite.rc.Check(suite.cluster.GetRegion(1)) + suite.NotNil(op) + suite.Equal("fix-follower-role", op.Desc()) + suite.Equal(uint64(3), op.Step(0).(operator.TransferLeader).ToStore) +} + +func (suite *ruleCheckerTestSuite) TestFixRoleLeaderIssue3130() { + suite.cluster.AddLabelsStore(1, 1, map[string]string{"role": "follower"}) + suite.cluster.AddLabelsStore(2, 1, map[string]string{"role": "leader"}) + suite.cluster.AddLeaderRegion(1, 1, 2) + suite.ruleManager.SetRule(&placement.Rule{ + GroupID: "pd", + ID: "r1", + Index: 100, + Override: true, + Role: placement.Leader, + Count: 1, + LabelConstraints: []placement.LabelConstraint{ + {Key: "role", Op: "in", Values: []string{"leader"}}, + }, + }) + op := suite.rc.Check(suite.cluster.GetRegion(1)) + suite.NotNil(op) + suite.Equal("fix-leader-role", op.Desc()) + suite.Equal(uint64(2), op.Step(0).(operator.TransferLeader).ToStore) + + suite.cluster.SetStoreBusy(2, true) + op = suite.rc.Check(suite.cluster.GetRegion(1)) + suite.Nil(op) + suite.cluster.SetStoreBusy(2, false) + + suite.cluster.AddLeaderRegion(1, 2, 1) + op = suite.rc.Check(suite.cluster.GetRegion(1)) + suite.NotNil(op) + suite.Equal("remove-orphan-peer", op.Desc()) + suite.Equal(uint64(1), op.Step(0).(operator.RemovePeer).FromStore) +} + +func (suite *ruleCheckerTestSuite) TestFixLeaderRoleWithUnhealthyRegion() { + suite.cluster.AddLabelsStore(1, 1, map[string]string{"rule": "follower"}) + suite.cluster.AddLabelsStore(2, 1, map[string]string{"rule": "follower"}) + suite.cluster.AddLabelsStore(3, 1, map[string]string{"rule": "leader"}) + suite.ruleManager.SetRuleGroup(&placement.RuleGroup{ + ID: "cluster", + Index: 2, + Override: true, + }) + err := suite.ruleManager.SetRules([]*placement.Rule{ + { + GroupID: "cluster", + ID: "r1", + Index: 100, + Role: placement.Follower, + Count: 2, + LabelConstraints: []placement.LabelConstraint{ + {Key: "rule", Op: "in", Values: []string{"follower"}}, + }, + }, + { + GroupID: "cluster", + ID: "r2", + Index: 100, + Role: placement.Leader, + Count: 1, + LabelConstraints: []placement.LabelConstraint{ + {Key: "rule", Op: "in", Values: []string{"leader"}}, + }, + }, + }) + suite.NoError(err) + // no Leader + suite.cluster.AddNoLeaderRegion(1, 1, 2, 3) + r := suite.cluster.GetRegion(1) + op := suite.rc.Check(r) + suite.Nil(op) +} + +func (suite *ruleCheckerTestSuite) TestFixRuleWitness() { + suite.cluster.AddLabelsStore(1, 1, map[string]string{"A": "leader"}) + suite.cluster.AddLabelsStore(2, 1, map[string]string{"B": "follower"}) + suite.cluster.AddLabelsStore(3, 1, map[string]string{"C": "voter"}) + suite.cluster.AddLeaderRegion(1, 1) + + suite.ruleManager.SetRule(&placement.Rule{ + GroupID: "pd", + ID: "r1", + Index: 100, + Override: true, + Role: placement.Voter, + Count: 1, + IsWitness: true, + LabelConstraints: []placement.LabelConstraint{ + {Key: "C", Op: "in", Values: []string{"voter"}}, + }, + }) + op := suite.rc.Check(suite.cluster.GetRegion(1)) + suite.NotNil(op) + suite.Equal("add-rule-peer", op.Desc()) + suite.Equal(uint64(3), op.Step(0).(operator.AddLearner).ToStore) + suite.True(op.Step(0).(operator.AddLearner).IsWitness) +} + +func (suite *ruleCheckerTestSuite) TestFixRuleWitness2() { + suite.cluster.AddLabelsStore(1, 1, map[string]string{"A": "leader"}) + suite.cluster.AddLabelsStore(2, 1, map[string]string{"B": "voter"}) + 
suite.cluster.AddLabelsStore(3, 1, map[string]string{"C": "voter"}) + suite.cluster.AddLabelsStore(4, 1, map[string]string{"D": "voter"}) + suite.cluster.AddLeaderRegion(1, 1, 2, 3, 4) + + suite.ruleManager.SetRule(&placement.Rule{ + GroupID: "pd", + ID: "r1", + Index: 100, + Override: false, + Role: placement.Voter, + Count: 1, + IsWitness: true, + LabelConstraints: []placement.LabelConstraint{ + {Key: "D", Op: "in", Values: []string{"voter"}}, + }, + }) + op := suite.rc.Check(suite.cluster.GetRegion(1)) + suite.NotNil(op) + suite.Equal("fix-witness-peer", op.Desc()) + suite.Equal(uint64(4), op.Step(0).(operator.BecomeWitness).StoreID) +} + +func (suite *ruleCheckerTestSuite) TestFixRuleWitness3() { + suite.cluster.AddLabelsStore(1, 1, map[string]string{"A": "leader"}) + suite.cluster.AddLabelsStore(2, 1, map[string]string{"B": "voter"}) + suite.cluster.AddLabelsStore(3, 1, map[string]string{"C": "voter"}) + suite.cluster.AddLeaderRegion(1, 1, 2, 3) + + r := suite.cluster.GetRegion(1) + // set peer3 to witness + r = r.Clone(core.WithWitnesses([]*metapb.Peer{r.GetPeer(3)})) + suite.cluster.PutRegion(r) + op := suite.rc.Check(r) + suite.NotNil(op) + suite.Equal("fix-non-witness-peer", op.Desc()) + suite.Equal(uint64(3), op.Step(0).(operator.RemovePeer).FromStore) + suite.Equal(uint64(3), op.Step(1).(operator.AddLearner).ToStore) +} + +func (suite *ruleCheckerTestSuite) TestFixRuleWitness4() { + suite.cluster.AddLabelsStore(1, 1, map[string]string{"A": "leader"}) + suite.cluster.AddLabelsStore(2, 1, map[string]string{"B": "voter"}) + suite.cluster.AddLabelsStore(3, 1, map[string]string{"C": "learner"}) + suite.cluster.AddLeaderRegion(1, 1, 2, 3) + + r := suite.cluster.GetRegion(1) + // set peer3 to witness learner + r = r.Clone(core.WithLearners([]*metapb.Peer{r.GetPeer(3)})) + r = r.Clone(core.WithWitnesses([]*metapb.Peer{r.GetPeer(3)})) + + err := suite.ruleManager.SetRules([]*placement.Rule{ + { + GroupID: "pd", + ID: "default", + Index: 100, + Override: true, + Role: placement.Voter, + Count: 2, + IsWitness: false, + }, + { + GroupID: "pd", + ID: "r1", + Index: 100, + Override: false, + Role: placement.Learner, + Count: 1, + IsWitness: false, + LabelConstraints: []placement.LabelConstraint{ + {Key: "C", Op: "in", Values: []string{"learner"}}, + }, + }, + }) + suite.NoError(err) + + op := suite.rc.Check(r) + suite.NotNil(op) + suite.Equal("fix-non-witness-peer", op.Desc()) + suite.Equal(uint64(3), op.Step(0).(operator.BecomeNonWitness).StoreID) +} + +func (suite *ruleCheckerTestSuite) TestFixRuleWitness5() { + suite.cluster.AddLabelsStore(1, 1, map[string]string{"A": "leader"}) + suite.cluster.AddLabelsStore(2, 1, map[string]string{"B": "voter"}) + suite.cluster.AddLabelsStore(3, 1, map[string]string{"C": "voter"}) + suite.cluster.AddLeaderRegion(1, 1, 2, 3) + + err := suite.ruleManager.SetRule(&placement.Rule{ + GroupID: "pd", + ID: "r1", + Index: 100, + Override: true, + Role: placement.Voter, + Count: 2, + IsWitness: true, + LabelConstraints: []placement.LabelConstraint{ + {Key: "A", Op: "In", Values: []string{"leader"}}, + }, + }) + suite.Error(err) + suite.Equal(errs.ErrRuleContent.FastGenByArgs(fmt.Sprintf("define too many witness by count %d", 2)).Error(), err.Error()) +} + +func (suite *ruleCheckerTestSuite) TestFixRuleWitness6() { + suite.cluster.AddLabelsStore(1, 1, map[string]string{"A": "leader"}) + suite.cluster.AddLabelsStore(2, 1, map[string]string{"B": "voter"}) + suite.cluster.AddLabelsStore(3, 1, map[string]string{"C": "voter"}) + suite.cluster.AddLeaderRegion(1, 1, 2, 
3) + + err := suite.ruleManager.SetRules([]*placement.Rule{ + { + GroupID: "pd", + ID: "default", + Index: 100, + Role: placement.Voter, + IsWitness: false, + Count: 2, + }, + { + GroupID: "pd", + ID: "r1", + Index: 100, + Role: placement.Voter, + Count: 1, + IsWitness: true, + LabelConstraints: []placement.LabelConstraint{ + {Key: "C", Op: "in", Values: []string{"voter"}}, + }, + }, + }) + suite.NoError(err) + + suite.rc.RecordRegionPromoteToNonWitness(1) + op := suite.rc.Check(suite.cluster.GetRegion(1)) + suite.Nil(op) + + suite.rc.switchWitnessCache.Remove(1) + op = suite.rc.Check(suite.cluster.GetRegion(1)) + suite.NotNil(op) +} + +func (suite *ruleCheckerTestSuite) TestDisableWitness() { + suite.cluster.AddLabelsStore(1, 1, map[string]string{"A": "leader"}) + suite.cluster.AddLabelsStore(2, 1, map[string]string{"B": "voter"}) + suite.cluster.AddLabelsStore(3, 1, map[string]string{"C": "voter"}) + suite.cluster.AddLeaderRegion(1, 1, 2, 3) + + err := suite.ruleManager.SetRules([]*placement.Rule{ + { + GroupID: "pd", + ID: "default", + Index: 100, + Role: placement.Voter, + IsWitness: false, + Count: 2, + }, + { + GroupID: "pd", + ID: "r1", + Index: 100, + Role: placement.Voter, + Count: 1, + IsWitness: true, + LabelConstraints: []placement.LabelConstraint{ + {Key: "C", Op: "in", Values: []string{"voter"}}, + }, + }, + }) + suite.NoError(err) + + r := suite.cluster.GetRegion(1) + r = r.Clone(core.WithWitnesses([]*metapb.Peer{r.GetPeer(3)})) + + op := suite.rc.Check(r) + suite.Nil(op) + + suite.cluster.SetEnableWitness(false) + op = suite.rc.Check(r) + suite.NotNil(op) +} + +func (suite *ruleCheckerTestSuite) TestBetterReplacement() { + suite.cluster.AddLabelsStore(1, 1, map[string]string{"host": "host1"}) + suite.cluster.AddLabelsStore(2, 1, map[string]string{"host": "host1"}) + suite.cluster.AddLabelsStore(3, 1, map[string]string{"host": "host2"}) + suite.cluster.AddLabelsStore(4, 1, map[string]string{"host": "host3"}) + suite.cluster.AddLeaderRegionWithRange(1, "", "", 1, 2, 3) + suite.ruleManager.SetRule(&placement.Rule{ + GroupID: "pd", + ID: "test", + Index: 100, + Override: true, + Role: placement.Voter, + Count: 3, + LocationLabels: []string{"host"}, + }) + op := suite.rc.Check(suite.cluster.GetRegion(1)) + suite.NotNil(op) + suite.Equal("move-to-better-location", op.Desc()) + suite.Equal(uint64(4), op.Step(0).(operator.AddLearner).ToStore) + suite.cluster.AddLeaderRegionWithRange(1, "", "", 1, 3, 4) + op = suite.rc.Check(suite.cluster.GetRegion(1)) + suite.Nil(op) +} + +func (suite *ruleCheckerTestSuite) TestBetterReplacement2() { + suite.cluster.AddLabelsStore(1, 1, map[string]string{"zone": "z1", "host": "host1"}) + suite.cluster.AddLabelsStore(2, 1, map[string]string{"zone": "z1", "host": "host2"}) + suite.cluster.AddLabelsStore(3, 1, map[string]string{"zone": "z1", "host": "host3"}) + suite.cluster.AddLabelsStore(4, 1, map[string]string{"zone": "z2", "host": "host1"}) + suite.cluster.AddLeaderRegionWithRange(1, "", "", 1, 2, 3) + suite.ruleManager.SetRule(&placement.Rule{ + GroupID: "pd", + ID: "test", + Index: 100, + Override: true, + Role: placement.Voter, + Count: 3, + LocationLabels: []string{"zone", "host"}, + }) + op := suite.rc.Check(suite.cluster.GetRegion(1)) + suite.NotNil(op) + suite.Equal("move-to-better-location", op.Desc()) + suite.Equal(uint64(4), op.Step(0).(operator.AddLearner).ToStore) + suite.cluster.AddLeaderRegionWithRange(1, "", "", 1, 3, 4) + op = suite.rc.Check(suite.cluster.GetRegion(1)) + suite.Nil(op) +} + +func (suite *ruleCheckerTestSuite) 
TestNoBetterReplacement() {
+	suite.cluster.AddLabelsStore(1, 1, map[string]string{"host": "host1"})
+	suite.cluster.AddLabelsStore(2, 1, map[string]string{"host": "host1"})
+	suite.cluster.AddLabelsStore(3, 1, map[string]string{"host": "host2"})
+	suite.cluster.AddLeaderRegionWithRange(1, "", "", 1, 2, 3)
+	suite.ruleManager.SetRule(&placement.Rule{
+		GroupID:        "pd",
+		ID:             "test",
+		Index:          100,
+		Override:       true,
+		Role:           placement.Voter,
+		Count:          3,
+		LocationLabels: []string{"host"},
+	})
+	op := suite.rc.Check(suite.cluster.GetRegion(1))
+	suite.Nil(op)
+}
+
+func (suite *ruleCheckerTestSuite) TestIssue2419() {
+	suite.cluster.AddLeaderStore(1, 1)
+	suite.cluster.AddLeaderStore(2, 1)
+	suite.cluster.AddLeaderStore(3, 1)
+	suite.cluster.AddLeaderStore(4, 1)
+	suite.cluster.SetStoreOffline(3)
+	suite.cluster.AddLeaderRegionWithRange(1, "", "", 1, 2, 3)
+	r := suite.cluster.GetRegion(1)
+	r = r.Clone(core.WithAddPeer(&metapb.Peer{Id: 5, StoreId: 4, Role: metapb.PeerRole_Learner}))
+	op := suite.rc.Check(r)
+	suite.NotNil(op)
+	suite.Equal("remove-orphan-peer", op.Desc())
+	suite.Equal(uint64(4), op.Step(0).(operator.RemovePeer).FromStore)
+
+	r = r.Clone(core.WithRemoveStorePeer(4))
+	op = suite.rc.Check(r)
+	suite.NotNil(op)
+	suite.Equal("replace-rule-offline-peer", op.Desc())
+	suite.Equal(uint64(4), op.Step(0).(operator.AddLearner).ToStore)
+	suite.Equal(uint64(4), op.Step(1).(operator.PromoteLearner).ToStore)
+	suite.Equal(uint64(3), op.Step(2).(operator.RemovePeer).FromStore)
+}
+
+// Ref https://github.com/tikv/pd/issues/3521 and https://github.com/tikv/pd/issues/5786
+// The problem: when taking a store offline, we may add a learner multiple times
+// if the operator times out.
+func (suite *ruleCheckerTestSuite) TestPriorityFixOrphanPeer() {
+	suite.cluster.AddLabelsStore(1, 1, map[string]string{"host": "host1"})
+	suite.cluster.AddLabelsStore(2, 1, map[string]string{"host": "host1"})
+	suite.cluster.AddLabelsStore(3, 1, map[string]string{"host": "host2"})
+	suite.cluster.AddLabelsStore(4, 1, map[string]string{"host": "host4"})
+	suite.cluster.AddLabelsStore(5, 1, map[string]string{"host": "host5"})
+	suite.cluster.AddLeaderRegionWithRange(1, "", "", 1, 2, 3)
+	op := suite.rc.Check(suite.cluster.GetRegion(1))
+	suite.Nil(op)
+	var add operator.AddLearner
+	var remove operator.RemovePeer
+	// Ref 5786
+	originRegion := suite.cluster.GetRegion(1)
+	learner4 := &metapb.Peer{Id: 114, StoreId: 4, Role: metapb.PeerRole_Learner}
+	testRegion := originRegion.Clone(
+		core.WithAddPeer(learner4),
+		core.WithAddPeer(&metapb.Peer{Id: 115, StoreId: 5, Role: metapb.PeerRole_Learner}),
+		core.WithPendingPeers([]*metapb.Peer{originRegion.GetStorePeer(2), learner4}),
+	)
+	suite.cluster.PutRegion(testRegion)
+	op = suite.rc.Check(suite.cluster.GetRegion(1))
+	suite.NotNil(op)
+	suite.Equal("remove-orphan-peer", op.Desc())
+	suite.IsType(remove, op.Step(0))
+	// Ref #3521
+	suite.cluster.SetStoreOffline(2)
+	suite.cluster.PutRegion(originRegion)
+	op = suite.rc.Check(suite.cluster.GetRegion(1))
+	suite.NotNil(op)
+	suite.IsType(add, op.Step(0))
+	suite.Equal("replace-rule-offline-peer", op.Desc())
+	testRegion = suite.cluster.GetRegion(1).Clone(core.WithAddPeer(
+		&metapb.Peer{
+			Id:      125,
+			StoreId: 4,
+			Role:    metapb.PeerRole_Learner,
+		}))
+	suite.cluster.PutRegion(testRegion)
+	op = suite.rc.Check(suite.cluster.GetRegion(1))
+	suite.IsType(remove, op.Step(0))
+	suite.Equal("remove-orphan-peer", op.Desc())
+}
+
+func (suite *ruleCheckerTestSuite) TestPriorityFitHealthWithDifferentRole1() {
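+	// With joint consensus enabled, demoting the unhealthy voter and promoting
+	// the orphan learner can be folded into single ChangePeerV2 enter/leave
+	// steps; the assertions below rely on that combined step shape.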
suite.cluster.SetEnableUseJointConsensus(true) + suite.cluster.AddLabelsStore(1, 1, map[string]string{"host": "host1"}) + suite.cluster.AddLabelsStore(2, 1, map[string]string{"host": "host2"}) + suite.cluster.AddLabelsStore(3, 1, map[string]string{"host": "host3"}) + suite.cluster.AddLabelsStore(4, 1, map[string]string{"host": "host4"}) + suite.cluster.AddRegionWithLearner(1, 1, []uint64{2, 3}, []uint64{4}) + r1 := suite.cluster.GetRegion(1) + suite.cluster.GetStore(3).GetMeta().LastHeartbeat = time.Now().Add(-31 * time.Minute).UnixNano() + + // set peer3 to pending and down + r1 = r1.Clone(core.WithPendingPeers([]*metapb.Peer{r1.GetPeer(3)})) + r1 = r1.Clone(core.WithDownPeers([]*pdpb.PeerStats{ + { + Peer: r1.GetStorePeer(3), + DownSeconds: 30000, + }, + })) + suite.cluster.PutRegion(r1) + + op := suite.rc.Check(suite.cluster.GetRegion(1)) + suite.Equal(uint64(3), op.Step(0).(operator.ChangePeerV2Enter).DemoteVoters[0].ToStore) + suite.Equal(uint64(4), op.Step(0).(operator.ChangePeerV2Enter).PromoteLearners[0].ToStore) + suite.Equal(uint64(3), op.Step(1).(operator.ChangePeerV2Leave).DemoteVoters[0].ToStore) + suite.Equal(uint64(4), op.Step(1).(operator.ChangePeerV2Leave).PromoteLearners[0].ToStore) + suite.Equal("replace-down-peer-with-orphan-peer", op.Desc()) + + // set peer3 only pending + r1 = r1.Clone(core.WithDownPeers(nil)) + suite.cluster.PutRegion(r1) + op = suite.rc.Check(suite.cluster.GetRegion(1)) + suite.Nil(op) +} + +func (suite *ruleCheckerTestSuite) TestPriorityFitHealthWithDifferentRole2() { + suite.cluster.SetEnableUseJointConsensus(true) + suite.cluster.AddLabelsStore(1, 1, map[string]string{"host": "host1"}) + suite.cluster.AddLabelsStore(2, 1, map[string]string{"host": "host2"}) + suite.cluster.AddLabelsStore(3, 1, map[string]string{"host": "host3"}) + suite.cluster.AddLabelsStore(4, 1, map[string]string{"host": "host4"}) + suite.cluster.AddLabelsStore(5, 1, map[string]string{"host": "host5"}) + suite.cluster.AddLeaderRegion(1, 1, 2, 3, 4, 5) + r1 := suite.cluster.GetRegion(1) + + // set peer3 to pending and down, and peer 3 to learner, and store 3 is down + suite.cluster.GetStore(3).GetMeta().LastHeartbeat = time.Now().Add(-31 * time.Minute).UnixNano() + r1 = r1.Clone(core.WithLearners([]*metapb.Peer{r1.GetPeer(3)})) + r1 = r1.Clone( + core.WithPendingPeers([]*metapb.Peer{r1.GetPeer(3)}), + core.WithDownPeers([]*pdpb.PeerStats{ + { + Peer: r1.GetStorePeer(3), + DownSeconds: 30000, + }, + }), + ) + suite.cluster.PutRegion(r1) + + // default and test group => 3 voter + 1 learner + err := suite.ruleManager.SetRule(&placement.Rule{ + GroupID: "test", + ID: "10", + Role: placement.Learner, + Count: 1, + }) + suite.NoError(err) + + op := suite.rc.Check(suite.cluster.GetRegion(1)) + suite.Equal(uint64(5), op.Step(0).(operator.ChangePeerV2Enter).DemoteVoters[0].ToStore) + suite.Equal(uint64(3), op.Step(1).(operator.RemovePeer).FromStore) + suite.Equal("replace-down-peer-with-orphan-peer", op.Desc()) +} + +func (suite *ruleCheckerTestSuite) TestPriorityFitHealthPeersAndTiFlash() { + suite.cluster.SetEnableUseJointConsensus(true) + suite.cluster.AddLabelsStore(1, 1, map[string]string{"host": "host1"}) + suite.cluster.AddLabelsStore(2, 1, map[string]string{"host": "host2"}) + suite.cluster.AddLabelsStore(3, 1, map[string]string{"host": "host3"}) + suite.cluster.AddLabelsStore(4, 1, map[string]string{"host": "host4", "engine": "tiflash"}) + suite.cluster.AddRegionWithLearner(1, 1, []uint64{2, 3}, []uint64{4}) + rule := &placement.Rule{ + GroupID: "pd", + ID: "test", + Role: 
placement.Voter, + Count: 3, + } + rule2 := &placement.Rule{ + GroupID: "pd", + ID: "test2", + Role: placement.Learner, + Count: 1, + LabelConstraints: []placement.LabelConstraint{ + { + Key: "engine", + Op: placement.In, + Values: []string{"tiflash"}, + }, + }, + } + suite.ruleManager.SetRule(rule) + suite.ruleManager.SetRule(rule2) + suite.ruleManager.DeleteRule("pd", "default") + + r1 := suite.cluster.GetRegion(1) + // set peer3 to pending and down + r1 = r1.Clone(core.WithPendingPeers([]*metapb.Peer{r1.GetPeer(3)})) + r1 = r1.Clone(core.WithDownPeers([]*pdpb.PeerStats{ + { + Peer: r1.GetStorePeer(3), + DownSeconds: 30000, + }, + })) + suite.cluster.PutRegion(r1) + suite.cluster.GetStore(3).GetMeta().LastHeartbeat = time.Now().Add(-31 * time.Minute).UnixNano() + + op := suite.rc.Check(suite.cluster.GetRegion(1)) + // should not promote tiflash peer + suite.Nil(op) + + // scale a node, can replace the down peer + suite.cluster.AddLabelsStore(5, 1, map[string]string{"host": "host5"}) + op = suite.rc.Check(suite.cluster.GetRegion(1)) + suite.NotNil(op) + suite.Equal("fast-replace-rule-down-peer", op.Desc()) +} + +func (suite *ruleCheckerTestSuite) TestIssue3293() { + suite.cluster.AddLabelsStore(1, 1, map[string]string{"host": "host1"}) + suite.cluster.AddLabelsStore(2, 1, map[string]string{"host": "host1"}) + suite.cluster.AddLabelsStore(3, 1, map[string]string{"host": "host2"}) + suite.cluster.AddLabelsStore(4, 1, map[string]string{"host": "host4"}) + suite.cluster.AddLabelsStore(5, 1, map[string]string{"host": "host5"}) + suite.cluster.AddLeaderRegionWithRange(1, "", "", 1, 2) + err := suite.ruleManager.SetRule(&placement.Rule{ + GroupID: "TiDB_DDL_51", + ID: "0", + Role: placement.Follower, + Count: 1, + LabelConstraints: []placement.LabelConstraint{ + { + Key: "host", + Values: []string{ + "host5", + }, + Op: placement.In, + }, + }, + }) + suite.NoError(err) + suite.cluster.DeleteStore(suite.cluster.GetStore(5)) + err = suite.ruleManager.SetRule(&placement.Rule{ + GroupID: "TiDB_DDL_51", + ID: "default", + Role: placement.Voter, + Count: 3, + }) + suite.NoError(err) + err = suite.ruleManager.DeleteRule("pd", "default") + suite.NoError(err) + op := suite.rc.Check(suite.cluster.GetRegion(1)) + suite.NotNil(op) + suite.Equal("add-rule-peer", op.Desc()) +} + +func (suite *ruleCheckerTestSuite) TestIssue3299() { + suite.cluster.AddLabelsStore(1, 1, map[string]string{"host": "host1"}) + suite.cluster.AddLabelsStore(2, 1, map[string]string{"dc": "sh"}) + suite.cluster.AddLeaderRegionWithRange(1, "", "", 1, 2) + + testCases := []struct { + constraints []placement.LabelConstraint + err string + }{ + { + constraints: []placement.LabelConstraint{ + { + Key: "host", + Values: []string{"host5"}, + Op: placement.In, + }, + }, + err: ".*can not match any store", + }, + { + constraints: []placement.LabelConstraint{ + { + Key: "ho", + Values: []string{"sh"}, + Op: placement.In, + }, + }, + err: ".*can not match any store", + }, + { + constraints: []placement.LabelConstraint{ + { + Key: "host", + Values: []string{"host1"}, + Op: placement.In, + }, + { + Key: "host", + Values: []string{"host1"}, + Op: placement.NotIn, + }, + }, + err: ".*can not match any store", + }, + { + constraints: []placement.LabelConstraint{ + { + Key: "host", + Values: []string{"host1"}, + Op: placement.In, + }, + { + Key: "host", + Values: []string{"host3"}, + Op: placement.In, + }, + }, + err: ".*can not match any store", + }, + { + constraints: []placement.LabelConstraint{ + { + Key: "host", + Values: []string{"host1"}, + Op: 
placement.In, + }, + { + Key: "host", + Values: []string{"host1"}, + Op: placement.In, + }, + }, + err: "", + }, + } + + for _, testCase := range testCases { + err := suite.ruleManager.SetRule(&placement.Rule{ + GroupID: "p", + ID: "0", + Role: placement.Follower, + Count: 1, + LabelConstraints: testCase.constraints, + }) + if testCase.err != "" { + suite.Regexp(testCase.err, err.Error()) + } else { + suite.NoError(err) + } + } +} + +// See issue: https://github.com/tikv/pd/issues/3705 +func (suite *ruleCheckerTestSuite) TestFixDownPeer() { + suite.cluster.AddLabelsStore(1, 1, map[string]string{"zone": "z1"}) + suite.cluster.AddLabelsStore(2, 1, map[string]string{"zone": "z1"}) + suite.cluster.AddLabelsStore(3, 1, map[string]string{"zone": "z2"}) + suite.cluster.AddLabelsStore(4, 1, map[string]string{"zone": "z3"}) + suite.cluster.AddLabelsStore(5, 1, map[string]string{"zone": "z3"}) + suite.cluster.AddLeaderRegion(1, 1, 3, 4) + rule := &placement.Rule{ + GroupID: "pd", + ID: "test", + Index: 100, + Override: true, + Role: placement.Voter, + Count: 3, + LocationLabels: []string{"zone"}, + } + suite.ruleManager.SetRule(rule) + + region := suite.cluster.GetRegion(1) + suite.Nil(suite.rc.Check(region)) + + suite.cluster.SetStoreDown(4) + region = region.Clone(core.WithDownPeers([]*pdpb.PeerStats{ + {Peer: region.GetStorePeer(4), DownSeconds: 6000}, + })) + operatorutil.CheckTransferPeer(suite.Require(), suite.rc.Check(region), operator.OpRegion, 4, 5) + + suite.cluster.SetStoreDown(5) + operatorutil.CheckTransferPeer(suite.Require(), suite.rc.Check(region), operator.OpRegion, 4, 2) + + rule.IsolationLevel = "zone" + suite.ruleManager.SetRule(rule) + suite.Nil(suite.rc.Check(region)) +} + +func (suite *ruleCheckerTestSuite) TestFixDownPeerWithNoWitness() { + suite.cluster.AddLabelsStore(1, 1, map[string]string{"zone": "z1"}) + suite.cluster.AddLabelsStore(2, 1, map[string]string{"zone": "z2"}) + suite.cluster.AddLabelsStore(3, 1, map[string]string{"zone": "z3"}) + suite.cluster.AddLeaderRegion(1, 1, 2, 3) + + suite.cluster.SetStoreDown(2) + suite.cluster.GetStore(2).GetMeta().LastHeartbeat = time.Now().Add(-11 * time.Minute).UnixNano() + r := suite.cluster.GetRegion(1) + // set peer2 to down + r = r.Clone(core.WithDownPeers([]*pdpb.PeerStats{{Peer: r.GetStorePeer(2), DownSeconds: 600}})) + suite.Nil(suite.rc.Check(r)) +} + +func (suite *ruleCheckerTestSuite) TestFixDownWitnessPeer() { + suite.cluster.AddLabelsStore(1, 1, map[string]string{"zone": "z1"}) + suite.cluster.AddLabelsStore(2, 1, map[string]string{"zone": "z2"}) + suite.cluster.AddLabelsStore(3, 1, map[string]string{"zone": "z3"}) + suite.cluster.AddLeaderRegion(1, 1, 2, 3) + + suite.cluster.SetStoreDown(2) + suite.cluster.GetStore(2).GetMeta().LastHeartbeat = time.Now().Add(-11 * time.Minute).UnixNano() + r := suite.cluster.GetRegion(1) + // set peer2 to down + r = r.Clone(core.WithDownPeers([]*pdpb.PeerStats{{Peer: r.GetStorePeer(2), DownSeconds: 600}})) + // set peer2 to witness + r = r.Clone(core.WithWitnesses([]*metapb.Peer{r.GetPeer(2)})) + + suite.ruleManager.SetRule(&placement.Rule{ + GroupID: "pd", + ID: "default", + Role: placement.Voter, + Count: 2, + }) + suite.ruleManager.SetRule(&placement.Rule{ + GroupID: "pd", + ID: "r1", + Role: placement.Voter, + Count: 1, + IsWitness: true, + }) + suite.Nil(suite.rc.Check(r)) + + suite.cluster.GetStore(2).GetMeta().LastHeartbeat = time.Now().Add(-31 * time.Minute).UnixNano() + suite.Nil(suite.rc.Check(r)) +} + +func (suite *ruleCheckerTestSuite) 
TestFixDownPeerWithAvailableWitness() { + suite.cluster.AddLabelsStore(1, 1, map[string]string{"zone": "z1"}) + suite.cluster.AddLabelsStore(2, 1, map[string]string{"zone": "z2"}) + suite.cluster.AddLabelsStore(3, 1, map[string]string{"zone": "z3"}) + suite.cluster.AddLeaderRegion(1, 1, 2, 3) + + suite.cluster.SetStoreDown(2) + suite.cluster.GetStore(2).GetMeta().LastHeartbeat = time.Now().Add(-11 * time.Minute).UnixNano() + r := suite.cluster.GetRegion(1) + // set peer2 to down + r = r.Clone(core.WithDownPeers([]*pdpb.PeerStats{{Peer: r.GetStorePeer(2), DownSeconds: 600}})) + // set peer3 to witness + r = r.Clone(core.WithWitnesses([]*metapb.Peer{r.GetPeer(3)})) + + suite.ruleManager.SetRule(&placement.Rule{ + GroupID: "pd", + ID: "default", + Role: placement.Voter, + Count: 2, + }) + suite.ruleManager.SetRule(&placement.Rule{ + GroupID: "pd", + ID: "r1", + Role: placement.Voter, + Count: 1, + IsWitness: true, + }) + + op := suite.rc.Check(r) + + suite.NotNil(op) + suite.Equal("promote-witness-for-down", op.Desc()) + suite.Equal(uint64(3), op.Step(0).(operator.RemovePeer).FromStore) + suite.Equal(uint64(3), op.Step(1).(operator.AddLearner).ToStore) + suite.Equal(uint64(3), op.Step(2).(operator.BecomeNonWitness).StoreID) + suite.Equal(uint64(3), op.Step(3).(operator.PromoteLearner).ToStore) +} + +func (suite *ruleCheckerTestSuite) TestFixDownPeerWithAvailableWitness2() { + suite.cluster.AddLabelsStore(1, 1, map[string]string{"zone": "z1"}) + suite.cluster.AddLabelsStore(2, 1, map[string]string{"zone": "z2"}) + suite.cluster.AddLabelsStore(3, 1, map[string]string{"zone": "z3"}) + suite.cluster.AddLeaderRegion(1, 1, 2, 3) + + suite.cluster.SetStoreDown(2) + suite.cluster.GetStore(2).GetMeta().LastHeartbeat = time.Now().Add(-31 * time.Minute).UnixNano() + r := suite.cluster.GetRegion(1) + // set peer2 to down + r = r.Clone(core.WithDownPeers([]*pdpb.PeerStats{{Peer: r.GetStorePeer(2), DownSeconds: 6000}})) + // set peer3 to witness + r = r.Clone(core.WithWitnesses([]*metapb.Peer{r.GetPeer(3)})) + + suite.ruleManager.SetRule(&placement.Rule{ + GroupID: "pd", + ID: "default", + Role: placement.Voter, + Count: 2, + }) + suite.ruleManager.SetRule(&placement.Rule{ + GroupID: "pd", + ID: "r1", + Role: placement.Voter, + Count: 1, + IsWitness: true, + }) + + op := suite.rc.Check(r) + + suite.Nil(op) +} + +func (suite *ruleCheckerTestSuite) TestFixDownPeerWithAvailableWitness3() { + suite.cluster.AddLabelsStore(1, 1, map[string]string{"zone": "z1"}) + suite.cluster.AddLabelsStore(2, 1, map[string]string{"zone": "z2"}) + suite.cluster.AddLabelsStore(3, 1, map[string]string{"zone": "z3"}) + suite.cluster.AddLabelsStore(4, 1, map[string]string{"zone": "z3"}) + suite.cluster.AddLeaderRegion(1, 1, 2, 3) + + suite.cluster.SetStoreDown(2) + suite.cluster.GetStore(2).GetMeta().LastHeartbeat = time.Now().Add(-31 * time.Minute).UnixNano() + r := suite.cluster.GetRegion(1) + // set peer2 to down + r = r.Clone(core.WithDownPeers([]*pdpb.PeerStats{{Peer: r.GetStorePeer(2), DownSeconds: 6000}})) + // set peer3 to witness + r = r.Clone(core.WithWitnesses([]*metapb.Peer{r.GetPeer(3)})) + + suite.ruleManager.SetRule(&placement.Rule{ + GroupID: "pd", + ID: "default", + Role: placement.Voter, + Count: 2, + }) + suite.ruleManager.SetRule(&placement.Rule{ + GroupID: "pd", + ID: "r1", + Role: placement.Voter, + Count: 1, + IsWitness: true, + }) + + op := suite.rc.Check(r) + + suite.NotNil(op) + suite.Equal("fast-replace-rule-down-peer", op.Desc()) + suite.Equal(uint64(4), op.Step(0).(operator.AddLearner).ToStore) + 
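+	// The replacement peer is created as a witness first, so it can join
+	// without waiting for a full data snapshot; that is why both the learner
+	// and its promotion below carry the witness flag.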
+	suite.True(op.Step(0).(operator.AddLearner).IsWitness)
+	suite.Equal(uint64(4), op.Step(1).(operator.PromoteLearner).ToStore)
+	suite.True(op.Step(1).(operator.PromoteLearner).IsWitness)
+	suite.Equal(uint64(2), op.Step(2).(operator.RemovePeer).FromStore)
+}
+
+func (suite *ruleCheckerTestSuite) TestFixDownPeerWithAvailableWitness4() {
+	suite.cluster.AddLabelsStore(1, 1, map[string]string{"zone": "z1"})
+	suite.cluster.AddLabelsStore(2, 1, map[string]string{"zone": "z2"})
+	suite.cluster.AddLabelsStore(3, 1, map[string]string{"zone": "z3"})
+	suite.cluster.AddLabelsStore(4, 1, map[string]string{"zone": "z3"})
+	suite.cluster.AddLeaderRegion(1, 1, 2, 3)
+
+	suite.cluster.SetStoreDown(2)
+	suite.cluster.GetStore(2).GetMeta().LastHeartbeat = time.Now().Add(-31 * time.Minute).UnixNano()
+	r := suite.cluster.GetRegion(1)
+	// set peer2 to down
+	r = r.Clone(core.WithDownPeers([]*pdpb.PeerStats{{Peer: r.GetStorePeer(2), DownSeconds: 6000}}))
+
+	op := suite.rc.Check(r)
+
+	suite.NotNil(op)
+	suite.Equal("fast-replace-rule-down-peer", op.Desc())
+	suite.Equal(uint64(4), op.Step(0).(operator.AddLearner).ToStore)
+	suite.True(op.Step(0).(operator.AddLearner).IsWitness)
+	suite.Equal(uint64(4), op.Step(1).(operator.PromoteLearner).ToStore)
+	suite.True(op.Step(1).(operator.PromoteLearner).IsWitness)
+	suite.Equal(uint64(2), op.Step(2).(operator.RemovePeer).FromStore)
+}
+
+// See issue: https://github.com/tikv/pd/issues/3705
+func (suite *ruleCheckerTestSuite) TestFixOfflinePeer() {
+	suite.cluster.AddLabelsStore(1, 1, map[string]string{"zone": "z1"})
+	suite.cluster.AddLabelsStore(2, 1, map[string]string{"zone": "z1"})
+	suite.cluster.AddLabelsStore(3, 1, map[string]string{"zone": "z2"})
+	suite.cluster.AddLabelsStore(4, 1, map[string]string{"zone": "z3"})
+	suite.cluster.AddLabelsStore(5, 1, map[string]string{"zone": "z3"})
+	suite.cluster.AddLeaderRegion(1, 1, 3, 4)
+	rule := &placement.Rule{
+		GroupID:        "pd",
+		ID:             "test",
+		Index:          100,
+		Override:       true,
+		Role:           placement.Voter,
+		Count:          3,
+		LocationLabels: []string{"zone"},
+	}
+	suite.ruleManager.SetRule(rule)
+
+	region := suite.cluster.GetRegion(1)
+	suite.Nil(suite.rc.Check(region))
+
+	suite.cluster.SetStoreOffline(4)
+	operatorutil.CheckTransferPeer(suite.Require(), suite.rc.Check(region), operator.OpRegion, 4, 5)
+
+	suite.cluster.SetStoreOffline(5)
+	operatorutil.CheckTransferPeer(suite.Require(), suite.rc.Check(region), operator.OpRegion, 4, 2)
+
+	rule.IsolationLevel = "zone"
+	suite.ruleManager.SetRule(rule)
+	suite.Nil(suite.rc.Check(region))
+}
+
+func (suite *ruleCheckerTestSuite) TestFixOfflinePeerWithAvailableWitness() {
+	suite.cluster.AddLabelsStore(1, 1, map[string]string{"zone": "z1"})
+	suite.cluster.AddLabelsStore(2, 1, map[string]string{"zone": "z1"})
+	suite.cluster.AddLabelsStore(3, 1, map[string]string{"zone": "z2"})
+	suite.cluster.AddLabelsStore(4, 1, map[string]string{"zone": "z3"})
+	suite.cluster.AddLabelsStore(5, 1, map[string]string{"zone": "z3"})
+	suite.cluster.AddLeaderRegion(1, 1, 3, 4)
+
+	r := suite.cluster.GetRegion(1)
+	r = r.Clone(core.WithWitnesses([]*metapb.Peer{r.GetPeer(2)}))
+	suite.ruleManager.SetRule(&placement.Rule{
+		GroupID: "pd",
+		ID:      "default",
+		Role:    placement.Voter,
+		Count:   2,
+	})
+	suite.ruleManager.SetRule(&placement.Rule{
+		GroupID:   "pd",
+		ID:        "r1",
+		Role:      placement.Voter,
+		Count:     1,
+		IsWitness: true,
+	})
+	suite.Nil(suite.rc.Check(r))
+
+	suite.cluster.SetStoreOffline(4)
+	op := suite.rc.Check(r)
+	suite.NotNil(op)
+	suite.Equal("replace-rule-offline-peer", op.Desc())
+}
+
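+// TestRuleCache below exercises the placement-rule fit cache: a cached fit
+// may be reused only while the region's epoch, leader, and peer health stay
+// unchanged, so each table case perturbs one of those inputs and asserts the
+// expected cache behavior via failpoints.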
+func (suite *ruleCheckerTestSuite) TestRuleCache() { + suite.cluster.PersistOptions.SetPlacementRulesCacheEnabled(true) + suite.cluster.AddLabelsStore(1, 1, map[string]string{"zone": "z1"}) + suite.cluster.AddLabelsStore(2, 1, map[string]string{"zone": "z1"}) + suite.cluster.AddLabelsStore(3, 1, map[string]string{"zone": "z2"}) + suite.cluster.AddLabelsStore(4, 1, map[string]string{"zone": "z3"}) + suite.cluster.AddLabelsStore(5, 1, map[string]string{"zone": "z3"}) + suite.cluster.AddRegionStore(999, 1) + suite.cluster.AddLeaderRegion(1, 1, 3, 4) + rule := &placement.Rule{ + GroupID: "pd", + ID: "test", + Index: 100, + Override: true, + Role: placement.Voter, + Count: 3, + LocationLabels: []string{"zone"}, + } + suite.ruleManager.SetRule(rule) + region := suite.cluster.GetRegion(1) + region = region.Clone(core.WithIncConfVer(), core.WithIncVersion()) + suite.Nil(suite.rc.Check(region)) + + testCases := []struct { + name string + region *core.RegionInfo + stillCached bool + }{ + { + name: "default", + region: region, + stillCached: true, + }, + { + name: "region topo changed", + region: func() *core.RegionInfo { + return region.Clone(core.WithAddPeer(&metapb.Peer{ + Id: 999, + StoreId: 999, + Role: metapb.PeerRole_Voter, + }), core.WithIncConfVer()) + }(), + stillCached: false, + }, + { + name: "region leader changed", + region: region.Clone( + core.WithLeader(&metapb.Peer{Role: metapb.PeerRole_Voter, Id: 2, StoreId: 3})), + stillCached: false, + }, + { + name: "region have down peers", + region: region.Clone(core.WithDownPeers([]*pdpb.PeerStats{ + { + Peer: region.GetPeer(3), + DownSeconds: 42, + }, + })), + stillCached: false, + }, + } + for _, testCase := range testCases { + suite.T().Log(testCase.name) + if testCase.stillCached { + suite.NoError(failpoint.Enable("github.com/tikv/pd/pkg/schedule/checker/assertShouldCache", "return(true)")) + suite.rc.Check(testCase.region) + suite.NoError(failpoint.Disable("github.com/tikv/pd/pkg/schedule/checker/assertShouldCache")) + } else { + suite.NoError(failpoint.Enable("github.com/tikv/pd/pkg/schedule/checker/assertShouldNotCache", "return(true)")) + suite.rc.Check(testCase.region) + suite.NoError(failpoint.Disable("github.com/tikv/pd/pkg/schedule/checker/assertShouldNotCache")) + } + } +} + +// Ref https://github.com/tikv/pd/issues/4045 +func (suite *ruleCheckerTestSuite) TestSkipFixOrphanPeerIfSelectedPeerisPendingOrDown() { + suite.cluster.AddLabelsStore(1, 1, map[string]string{"host": "host1"}) + suite.cluster.AddLabelsStore(2, 1, map[string]string{"host": "host1"}) + suite.cluster.AddLabelsStore(3, 1, map[string]string{"host": "host2"}) + suite.cluster.AddLabelsStore(4, 1, map[string]string{"host": "host4"}) + suite.cluster.AddLeaderRegionWithRange(1, "", "", 1, 2, 3, 4) + + // set peer3 and peer4 to pending + r1 := suite.cluster.GetRegion(1) + r1 = r1.Clone(core.WithPendingPeers([]*metapb.Peer{r1.GetStorePeer(3), r1.GetStorePeer(4)})) + suite.cluster.PutRegion(r1) + + // should not remove extra peer + op := suite.rc.Check(suite.cluster.GetRegion(1)) + suite.Nil(op) + + // set peer3 to down-peer + r1 = r1.Clone(core.WithPendingPeers([]*metapb.Peer{r1.GetStorePeer(4)})) + r1 = r1.Clone(core.WithDownPeers([]*pdpb.PeerStats{ + { + Peer: r1.GetStorePeer(3), + DownSeconds: 42, + }, + })) + suite.cluster.PutRegion(r1) + + // should not remove extra peer + op = suite.rc.Check(suite.cluster.GetRegion(1)) + suite.Nil(op) + + // set peer3 to normal + r1 = r1.Clone(core.WithDownPeers(nil)) + suite.cluster.PutRegion(r1) + + // should remove extra peer 
now + var remove operator.RemovePeer + op = suite.rc.Check(suite.cluster.GetRegion(1)) + suite.IsType(remove, op.Step(0)) + suite.Equal("remove-orphan-peer", op.Desc()) +} + +func (suite *ruleCheckerTestSuite) TestPriorityFitHealthPeers() { + suite.cluster.AddLabelsStore(1, 1, map[string]string{"host": "host1"}) + suite.cluster.AddLabelsStore(2, 1, map[string]string{"host": "host2"}) + suite.cluster.AddLabelsStore(3, 1, map[string]string{"host": "host3"}) + suite.cluster.AddLabelsStore(4, 1, map[string]string{"host": "host4"}) + suite.cluster.AddLeaderRegionWithRange(1, "", "", 1, 2, 3, 4) + r1 := suite.cluster.GetRegion(1) + + // set peer3 to pending + r1 = r1.Clone(core.WithPendingPeers([]*metapb.Peer{r1.GetPeer(3)})) + suite.cluster.PutRegion(r1) + + var remove operator.RemovePeer + op := suite.rc.Check(suite.cluster.GetRegion(1)) + suite.IsType(remove, op.Step(0)) + suite.Equal("remove-orphan-peer", op.Desc()) + + // set peer3 to down + r1 = r1.Clone(core.WithDownPeers([]*pdpb.PeerStats{ + { + Peer: r1.GetStorePeer(3), + DownSeconds: 42, + }, + })) + r1 = r1.Clone(core.WithPendingPeers(nil)) + suite.cluster.PutRegion(r1) + + op = suite.rc.Check(suite.cluster.GetRegion(1)) + suite.IsType(remove, op.Step(0)) + suite.Equal("remove-orphan-peer", op.Desc()) +} + +// Ref https://github.com/tikv/pd/issues/4140 +func (suite *ruleCheckerTestSuite) TestDemoteVoter() { + suite.cluster.AddLabelsStore(1, 1, map[string]string{"zone": "z1"}) + suite.cluster.AddLabelsStore(4, 1, map[string]string{"zone": "z4"}) + region := suite.cluster.AddLeaderRegion(1, 1, 4) + rule := &placement.Rule{ + GroupID: "pd", + ID: "test", + Role: placement.Voter, + Count: 1, + LabelConstraints: []placement.LabelConstraint{ + { + Key: "zone", + Op: placement.In, + Values: []string{"z1"}, + }, + }, + } + rule2 := &placement.Rule{ + GroupID: "pd", + ID: "test2", + Role: placement.Learner, + Count: 1, + LabelConstraints: []placement.LabelConstraint{ + { + Key: "zone", + Op: placement.In, + Values: []string{"z4"}, + }, + }, + } + suite.ruleManager.SetRule(rule) + suite.ruleManager.SetRule(rule2) + suite.ruleManager.DeleteRule("pd", "default") + op := suite.rc.Check(region) + suite.NotNil(op) + suite.Equal("fix-demote-voter", op.Desc()) +} + +func (suite *ruleCheckerTestSuite) TestOfflineAndDownStore() { + suite.cluster.AddLabelsStore(1, 1, map[string]string{"zone": "z1"}) + suite.cluster.AddLabelsStore(2, 1, map[string]string{"zone": "z4"}) + suite.cluster.AddLabelsStore(3, 1, map[string]string{"zone": "z1"}) + suite.cluster.AddLabelsStore(4, 1, map[string]string{"zone": "z4"}) + region := suite.cluster.AddLeaderRegion(1, 1, 2, 3) + op := suite.rc.Check(region) + suite.Nil(op) + // assert that the rule checker generates a replace-offline-peer operator even after the region fit has been cached + suite.cluster.SetStoreOffline(1) + op = suite.rc.Check(region) + suite.NotNil(op) + suite.Equal("replace-rule-offline-peer", op.Desc()) + // re-cache the regionFit + suite.cluster.SetStoreUp(1) + op = suite.rc.Check(region) + suite.Nil(op) + + // assert that the rule checker generates a replace-down-peer operator even after the region fit has been cached + suite.cluster.SetStoreDown(2) + region = region.Clone(core.WithDownPeers([]*pdpb.PeerStats{{Peer: region.GetStorePeer(2), DownSeconds: 60000}})) + op = suite.rc.Check(region) + suite.NotNil(op) + suite.Equal("fast-replace-rule-down-peer", op.Desc()) +} + +func (suite *ruleCheckerTestSuite) TestPendingList() { + // not enough stores + suite.cluster.AddLeaderStore(1, 1) + suite.cluster.AddLeaderRegionWithRange(1, "", "", 1, 2) + op := 
suite.rc.Check(suite.cluster.GetRegion(1)) + suite.Nil(op) + _, exist := suite.rc.pendingList.Get(1) + suite.True(exist) + + // add more stores + suite.cluster.AddLeaderStore(2, 1) + suite.cluster.AddLeaderStore(3, 1) + op = suite.rc.Check(suite.cluster.GetRegion(1)) + suite.NotNil(op) + suite.Equal("add-rule-peer", op.Desc()) + suite.Equal(constant.High, op.GetPriorityLevel()) + suite.Equal(uint64(3), op.Step(0).(operator.AddLearner).ToStore) + _, exist = suite.rc.pendingList.Get(1) + suite.False(exist) +} + +func (suite *ruleCheckerTestSuite) TestLocationLabels() { + suite.cluster.AddLabelsStore(1, 1, map[string]string{"zone": "z1", "rack": "r1", "host": "h1"}) + suite.cluster.AddLabelsStore(2, 1, map[string]string{"zone": "z1", "rack": "r1", "host": "h1"}) + suite.cluster.AddLabelsStore(3, 1, map[string]string{"zone": "z1", "rack": "r2", "host": "h1"}) + suite.cluster.AddLabelsStore(4, 1, map[string]string{"zone": "z1", "rack": "r2", "host": "h1"}) + suite.cluster.AddLabelsStore(5, 1, map[string]string{"zone": "z2", "rack": "r3", "host": "h2"}) + suite.cluster.AddLabelsStore(6, 1, map[string]string{"zone": "z2", "rack": "r3", "host": "h2"}) + suite.cluster.AddLeaderRegionWithRange(1, "", "", 1, 2, 5) + rule1 := &placement.Rule{ + GroupID: "pd", + ID: "test1", + Role: placement.Leader, + Count: 1, + LabelConstraints: []placement.LabelConstraint{ + { + Key: "zone", + Op: placement.In, + Values: []string{"z1"}, + }, + }, + LocationLabels: []string{"rack"}, + } + rule2 := &placement.Rule{ + GroupID: "pd", + ID: "test2", + Role: placement.Voter, + Count: 1, + LabelConstraints: []placement.LabelConstraint{ + { + Key: "zone", + Op: placement.In, + Values: []string{"z1"}, + }, + }, + LocationLabels: []string{"rack"}, + } + rule3 := &placement.Rule{ + GroupID: "pd", + ID: "test3", + Role: placement.Voter, + Count: 1, + LabelConstraints: []placement.LabelConstraint{ + { + Key: "zone", + Op: placement.In, + Values: []string{"z2"}, + }, + }, + LocationLabels: []string{"rack"}, + } + suite.ruleManager.SetRule(rule1) + suite.ruleManager.SetRule(rule2) + suite.ruleManager.SetRule(rule3) + suite.ruleManager.DeleteRule("pd", "default") + op := suite.rc.Check(suite.cluster.GetRegion(1)) + suite.NotNil(op) + suite.Equal("move-to-better-location", op.Desc()) +} + +func (suite *ruleCheckerTestSuite) TestTiFlashLocationLabels() { + suite.cluster.SetEnableUseJointConsensus(true) + suite.cluster.AddLabelsStore(1, 1, map[string]string{"zone": "z1", "rack": "r1", "host": "h1"}) + suite.cluster.AddLabelsStore(2, 1, map[string]string{"zone": "z1", "rack": "r1", "host": "h1"}) + suite.cluster.AddLabelsStore(3, 1, map[string]string{"zone": "z1", "rack": "r2", "host": "h1"}) + suite.cluster.AddLabelsStore(4, 1, map[string]string{"zone": "z1", "rack": "r2", "host": "h1"}) + suite.cluster.AddLabelsStore(5, 1, map[string]string{"zone": "z2", "rack": "r3", "host": "h2"}) + suite.cluster.AddLabelsStore(6, 1, map[string]string{"zone": "z2", "rack": "r3", "host": "h2"}) + suite.cluster.AddLabelsStore(7, 1, map[string]string{"engine": "tiflash"}) + suite.cluster.AddRegionWithLearner(1, 1, []uint64{3, 5}, []uint64{7}) + + rule1 := &placement.Rule{ + GroupID: "tiflash", + ID: "test1", + Role: placement.Learner, + Count: 1, + LabelConstraints: []placement.LabelConstraint{ + { + Key: "engine", + Op: placement.In, + Values: []string{"tiflash"}, + }, + }, + } + suite.ruleManager.SetRule(rule1) + rule := suite.ruleManager.GetRule("pd", "default") + rule.LocationLabels = []string{"zone", "rack", "host"} + 
suite.ruleManager.SetRule(rule) + op := suite.rc.Check(suite.cluster.GetRegion(1)) + suite.Nil(op) +} diff --git a/pkg/schedule/placement/rule_manager_test.go b/pkg/schedule/placement/rule_manager_test.go new file mode 100644 index 000000000000..a6454337aa84 --- /dev/null +++ b/pkg/schedule/placement/rule_manager_test.go @@ -0,0 +1,498 @@ +// Copyright 2019 TiKV Project Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package placement + +import ( + "encoding/hex" + "testing" + + "github.com/pingcap/kvproto/pkg/metapb" + "github.com/stretchr/testify/require" + "github.com/tikv/pd/pkg/codec" + "github.com/tikv/pd/pkg/core" + "github.com/tikv/pd/pkg/core/constant" + "github.com/tikv/pd/pkg/mock/mockconfig" + "github.com/tikv/pd/pkg/storage/endpoint" + "github.com/tikv/pd/pkg/storage/kv" +) + +func newTestManager(t *testing.T, enableWitness bool) (endpoint.RuleStorage, *RuleManager) { + re := require.New(t) + store := endpoint.NewStorageEndpoint(kv.NewMemoryKV(), nil) + var err error + manager := NewRuleManager(store, nil, mockconfig.NewTestOptions()) + manager.conf.SetEnableWitness(enableWitness) + err = manager.Initialize(3, []string{"zone", "rack", "host"}, "") + re.NoError(err) + return store, manager +} + +func TestDefault(t *testing.T) { + re := require.New(t) + _, manager := newTestManager(t, false) + rules := manager.GetAllRules() + re.Len(rules, 1) + re.Equal("pd", rules[0].GroupID) + re.Equal("default", rules[0].ID) + re.Equal(0, rules[0].Index) + re.Empty(rules[0].StartKey) + re.Empty(rules[0].EndKey) + re.Equal(Voter, rules[0].Role) + re.Equal([]string{"zone", "rack", "host"}, rules[0].LocationLabels) +} + +func TestDefault2(t *testing.T) { + re := require.New(t) + _, manager := newTestManager(t, true) + rules := manager.GetAllRules() + re.Len(rules, 2) + re.Equal("pd", rules[0].GroupID) + re.Equal("default", rules[0].ID) + re.Equal(0, rules[0].Index) + re.Empty(rules[0].StartKey) + re.Empty(rules[0].EndKey) + re.Equal(Voter, rules[0].Role) + re.Equal([]string{"zone", "rack", "host"}, rules[0].LocationLabels) + re.Equal("pd", rules[1].GroupID) + re.Equal("witness", rules[1].ID) + re.Equal(0, rules[1].Index) + re.Empty(rules[1].StartKey) + re.Empty(rules[1].EndKey) + re.Equal(Voter, rules[1].Role) + re.True(rules[1].IsWitness) + re.Equal([]string{"zone", "rack", "host"}, rules[1].LocationLabels) +} + +func TestAdjustRule(t *testing.T) { + re := require.New(t) + _, manager := newTestManager(t, false) + rules := []Rule{ + {GroupID: "group", ID: "id", StartKeyHex: "123abc", EndKeyHex: "123abf", Role: "voter", Count: 3}, + {GroupID: "", ID: "id", StartKeyHex: "123abc", EndKeyHex: "123abf", Role: "voter", Count: 3}, + {GroupID: "group", ID: "", StartKeyHex: "123abc", EndKeyHex: "123abf", Role: "voter", Count: 3}, + {GroupID: "group", ID: "id", StartKeyHex: "123ab", EndKeyHex: "123abf", Role: "voter", Count: 3}, + {GroupID: "group", ID: "id", StartKeyHex: "123abc", EndKeyHex: "1123abf", Role: "voter", Count: 3}, + {GroupID: "group", ID: "id", StartKeyHex: "123abc", 
EndKeyHex: "123aaa", Role: "voter", Count: 3}, + {GroupID: "group", ID: "id", StartKeyHex: "123abc", EndKeyHex: "123abf", Role: "master", Count: 3}, + {GroupID: "group", ID: "id", StartKeyHex: "123abc", EndKeyHex: "123abf", Role: "voter", Count: 0}, + {GroupID: "group", ID: "id", StartKeyHex: "123abc", EndKeyHex: "123abf", Role: "voter", Count: -1}, + {GroupID: "group", ID: "id", StartKeyHex: "123abc", EndKeyHex: "123abf", Role: "voter", Count: 3, LabelConstraints: []LabelConstraint{{Op: "foo"}}}, + } + re.NoError(manager.adjustRule(&rules[0], "group")) + + re.Equal([]byte{0x12, 0x3a, 0xbc}, rules[0].StartKey) + re.Equal([]byte{0x12, 0x3a, 0xbf}, rules[0].EndKey) + re.Error(manager.adjustRule(&rules[1], "")) + + for i := 2; i < len(rules); i++ { + re.Error(manager.adjustRule(&rules[i], "group")) + } + + manager.SetKeyType(constant.Table.String()) + re.Error(manager.adjustRule(&Rule{GroupID: "group", ID: "id", StartKeyHex: "123abc", EndKeyHex: "123abf", Role: "voter", Count: 3}, "group")) + + manager.SetKeyType(constant.Txn.String()) + re.Error(manager.adjustRule(&Rule{GroupID: "group", ID: "id", StartKeyHex: "123abc", EndKeyHex: "123abf", Role: "voter", Count: 3}, "group")) + + re.Error(manager.adjustRule(&Rule{ + GroupID: "group", + ID: "id", + StartKeyHex: hex.EncodeToString(codec.EncodeBytes([]byte{0})), + EndKeyHex: "123abf", + Role: "voter", + Count: 3, + }, "group")) + + re.Error(manager.adjustRule(&Rule{ + GroupID: "tiflash", + ID: "id", + StartKeyHex: hex.EncodeToString(codec.EncodeBytes([]byte{0})), + EndKeyHex: hex.EncodeToString(codec.EncodeBytes([]byte{1})), + Role: "learner", + Count: 1, + IsWitness: true, + LabelConstraints: []LabelConstraint{{Key: "engine", Op: "in", Values: []string{"tiflash"}}}, + }, "tiflash")) +} + +func TestLeaderCheck(t *testing.T) { + re := require.New(t) + _, manager := newTestManager(t, false) + re.Regexp(".*needs at least one leader or voter.*", manager.SetRule(&Rule{GroupID: "pd", ID: "default", Role: "learner", Count: 3}).Error()) + re.Regexp(".*define multiple leaders by count 2.*", manager.SetRule(&Rule{GroupID: "g2", ID: "33", Role: "leader", Count: 2}).Error()) + re.Regexp(".*multiple leader replicas.*", manager.Batch([]RuleOp{ + { + Rule: &Rule{GroupID: "g2", ID: "foo1", Role: "leader", Count: 1}, + Action: RuleOpAdd, + }, + { + Rule: &Rule{GroupID: "g2", ID: "foo2", Role: "leader", Count: 1}, + Action: RuleOpAdd, + }, + }).Error()) +} + +func TestSaveLoad(t *testing.T) { + re := require.New(t) + store, manager := newTestManager(t, false) + rules := []*Rule{ + {GroupID: "pd", ID: "default", Role: "voter", Count: 5}, + {GroupID: "foo", ID: "baz", StartKeyHex: "", EndKeyHex: "abcd", Role: "voter", Count: 1}, + {GroupID: "foo", ID: "bar", Role: "learner", Count: 1}, + } + for _, r := range rules { + re.NoError(manager.SetRule(r.Clone())) + } + + m2 := NewRuleManager(store, nil, nil) + err := m2.Initialize(3, []string{"no", "labels"}, "") + re.NoError(err) + re.Len(m2.GetAllRules(), 3) + re.Equal(rules[0].String(), m2.GetRule("pd", "default").String()) + re.Equal(rules[1].String(), m2.GetRule("foo", "baz").String()) + re.Equal(rules[2].String(), m2.GetRule("foo", "bar").String()) +} + +func TestSetAfterGet(t *testing.T) { + re := require.New(t) + store, manager := newTestManager(t, false) + rule := manager.GetRule("pd", "default") + rule.Count = 1 + manager.SetRule(rule) + + m2 := NewRuleManager(store, nil, nil) + err := m2.Initialize(100, []string{}, "") + re.NoError(err) + rule = m2.GetRule("pd", "default") + re.Equal(1, rule.Count) +} + +func 
checkRules(t *testing.T, rules []*Rule, expect [][2]string) { + re := require.New(t) + re.Len(rules, len(expect)) + for i := range rules { + re.Equal(expect[i], rules[i].Key()) + } +} + +func TestKeys(t *testing.T) { + re := require.New(t) + _, manager := newTestManager(t, false) + rules := []*Rule{ + {GroupID: "1", ID: "1", Role: "voter", Count: 1, StartKeyHex: "", EndKeyHex: ""}, + {GroupID: "2", ID: "2", Role: "voter", Count: 1, StartKeyHex: "11", EndKeyHex: "ff"}, + {GroupID: "2", ID: "3", Role: "voter", Count: 1, StartKeyHex: "22", EndKeyHex: "dd"}, + } + + toDelete := []RuleOp{} + for _, r := range rules { + manager.SetRule(r) + toDelete = append(toDelete, RuleOp{ + Rule: r, + Action: RuleOpDel, + DeleteByIDPrefix: false, + }) + } + checkRules(t, manager.GetAllRules(), [][2]string{{"1", "1"}, {"2", "2"}, {"2", "3"}, {"pd", "default"}}) + manager.Batch(toDelete) + checkRules(t, manager.GetAllRules(), [][2]string{{"pd", "default"}}) + + rules = append(rules, &Rule{GroupID: "3", ID: "4", Role: "voter", Count: 1, StartKeyHex: "44", EndKeyHex: "ee"}, + &Rule{GroupID: "3", ID: "5", Role: "voter", Count: 1, StartKeyHex: "44", EndKeyHex: "dd"}) + manager.SetRules(rules) + checkRules(t, manager.GetAllRules(), [][2]string{{"1", "1"}, {"2", "2"}, {"2", "3"}, {"3", "4"}, {"3", "5"}, {"pd", "default"}}) + + manager.DeleteRule("pd", "default") + checkRules(t, manager.GetAllRules(), [][2]string{{"1", "1"}, {"2", "2"}, {"2", "3"}, {"3", "4"}, {"3", "5"}}) + + splitKeys := [][]string{ + {"", "", "11", "22", "44", "dd", "ee", "ff"}, + {"44", "", "dd", "ee", "ff"}, + {"44", "dd"}, + {"22", "ef", "44", "dd", "ee"}, + } + for _, keys := range splitKeys { + splits := manager.GetSplitKeys(dhex(keys[0]), dhex(keys[1])) + re.Len(splits, len(keys)-2) + for i := range splits { + re.Equal(dhex(keys[i+2]), splits[i]) + } + } + + regionKeys := [][][2]string{ + {{"", ""}}, + {{"aa", "bb"}, {"", ""}, {"11", "ff"}, {"22", "dd"}, {"44", "ee"}, {"44", "dd"}}, + {{"11", "22"}, {"", ""}, {"11", "ff"}}, + {{"11", "33"}}, + } + for _, keys := range regionKeys { + region := core.NewRegionInfo(&metapb.Region{StartKey: dhex(keys[0][0]), EndKey: dhex(keys[0][1])}, nil) + rules := manager.GetRulesForApplyRegion(region) + re.Len(rules, len(keys)-1) + for i := range rules { + re.Equal(keys[i+1][0], rules[i].StartKeyHex) + re.Equal(keys[i+1][1], rules[i].EndKeyHex) + } + } + + ruleByKeys := [][]string{ // first is query key, rests are rule keys. + {"", "", ""}, + {"11", "", "", "11", "ff"}, + {"33", "", "", "11", "ff", "22", "dd"}, + } + for _, keys := range ruleByKeys { + rules := manager.GetRulesByKey(dhex(keys[0])) + re.Len(rules, (len(keys)-1)/2) + for i := range rules { + re.Equal(keys[i*2+1], rules[i].StartKeyHex) + re.Equal(keys[i*2+2], rules[i].EndKeyHex) + } + } + + rulesByGroup := [][]string{ // first is group, rests are rule keys. 
+ {"1", "", ""}, + {"2", "11", "ff", "22", "dd"}, + {"3", "44", "ee", "44", "dd"}, + {"4"}, + } + for _, keys := range rulesByGroup { + rules := manager.GetRulesByGroup(keys[0]) + re.Len(rules, (len(keys)-1)/2) + for i := range rules { + re.Equal(keys[i*2+1], rules[i].StartKeyHex) + re.Equal(keys[i*2+2], rules[i].EndKeyHex) + } + } +} + +func TestDeleteByIDPrefix(t *testing.T) { + _, manager := newTestManager(t, false) + manager.SetRules([]*Rule{ + {GroupID: "g1", ID: "foo1", Role: "voter", Count: 1}, + {GroupID: "g2", ID: "foo1", Role: "voter", Count: 1}, + {GroupID: "g2", ID: "foobar", Role: "voter", Count: 1}, + {GroupID: "g2", ID: "baz2", Role: "voter", Count: 1}, + }) + manager.DeleteRule("pd", "default") + checkRules(t, manager.GetAllRules(), [][2]string{{"g1", "foo1"}, {"g2", "baz2"}, {"g2", "foo1"}, {"g2", "foobar"}}) + + manager.Batch([]RuleOp{{ + Rule: &Rule{GroupID: "g2", ID: "foo"}, + Action: RuleOpDel, + DeleteByIDPrefix: true, + }}) + checkRules(t, manager.GetAllRules(), [][2]string{{"g1", "foo1"}, {"g2", "baz2"}}) +} + +func TestRangeGap(t *testing.T) { + re := require.New(t) + _, manager := newTestManager(t, false) + err := manager.DeleteRule("pd", "default") + re.Error(err) + + err = manager.SetRule(&Rule{GroupID: "pd", ID: "foo", StartKeyHex: "", EndKeyHex: "abcd", Role: "voter", Count: 1}) + re.NoError(err) + // |-- default --| + // |-- foo --| + // still cannot delete default since it will cause ("abcd", "") has no rules inside. + err = manager.DeleteRule("pd", "default") + re.Error(err) + err = manager.SetRule(&Rule{GroupID: "pd", ID: "bar", StartKeyHex: "abcd", EndKeyHex: "", Role: "voter", Count: 1}) + re.NoError(err) + // now default can be deleted. + err = manager.DeleteRule("pd", "default") + re.NoError(err) + // cannot change range since it will cause ("abaa", "abcd") has no rules inside. 
+ err = manager.SetRule(&Rule{GroupID: "pd", ID: "foo", StartKeyHex: "", EndKeyHex: "abaa", Role: "voter", Count: 1}) + re.Error(err) +} + +func TestGroupConfig(t *testing.T) { + re := require.New(t) + _, manager := newTestManager(t, false) + pd1 := &RuleGroup{ID: "pd"} + re.Equal(pd1, manager.GetRuleGroup("pd")) + + // update group pd + pd2 := &RuleGroup{ID: "pd", Index: 100, Override: true} + err := manager.SetRuleGroup(pd2) + re.NoError(err) + re.Equal(pd2, manager.GetRuleGroup("pd")) + + // new group g without config + err = manager.SetRule(&Rule{GroupID: "g", ID: "1", Role: "voter", Count: 1}) + re.NoError(err) + g1 := &RuleGroup{ID: "g"} + re.Equal(g1, manager.GetRuleGroup("g")) + re.Equal([]*RuleGroup{g1, pd2}, manager.GetRuleGroups()) + + // update group g + g2 := &RuleGroup{ID: "g", Index: 2, Override: true} + err = manager.SetRuleGroup(g2) + re.NoError(err) + re.Equal([]*RuleGroup{g2, pd2}, manager.GetRuleGroups()) + + // delete pd group, restore to default config + err = manager.DeleteRuleGroup("pd") + re.NoError(err) + re.Equal([]*RuleGroup{pd1, g2}, manager.GetRuleGroups()) + + // delete rule, the group is removed too + err = manager.DeleteRule("pd", "default") + re.NoError(err) + re.Equal([]*RuleGroup{g2}, manager.GetRuleGroups()) +} + +func TestRuleVersion(t *testing.T) { + re := require.New(t) + _, manager := newTestManager(t, false) + rule1 := manager.GetRule("pd", "default") + re.Equal(uint64(0), rule1.Version) + // create new rule + newRule := &Rule{GroupID: "g1", ID: "id", StartKeyHex: "123abc", EndKeyHex: "123abf", Role: "voter", Count: 3} + err := manager.SetRule(newRule) + re.NoError(err) + newRule = manager.GetRule("g1", "id") + re.Equal(uint64(0), newRule.Version) + // update rule + newRule = &Rule{GroupID: "g1", ID: "id", StartKeyHex: "123abc", EndKeyHex: "123abf", Role: "voter", Count: 2} + err = manager.SetRule(newRule) + re.NoError(err) + newRule = manager.GetRule("g1", "id") + re.Equal(uint64(1), newRule.Version) + // delete rule + err = manager.DeleteRule("g1", "id") + re.NoError(err) + // recreate new rule + err = manager.SetRule(newRule) + re.NoError(err) + // assert version should be 0 again + newRule = manager.GetRule("g1", "id") + re.Equal(uint64(0), newRule.Version) +} + +func TestCheckApplyRules(t *testing.T) { + re := require.New(t) + err := checkApplyRules([]*Rule{ + { + Role: Leader, + Count: 1, + }, + }) + re.NoError(err) + + err = checkApplyRules([]*Rule{ + { + Role: Voter, + Count: 1, + }, + }) + re.NoError(err) + + err = checkApplyRules([]*Rule{ + { + Role: Leader, + Count: 1, + }, + { + Role: Voter, + Count: 1, + }, + }) + re.NoError(err) + + err = checkApplyRules([]*Rule{ + { + Role: Leader, + Count: 3, + }, + }) + re.Regexp("multiple leader replicas", err.Error()) + + err = checkApplyRules([]*Rule{ + { + Role: Leader, + Count: 1, + }, + { + Role: Leader, + Count: 1, + }, + }) + re.Regexp("multiple leader replicas", err.Error()) + + err = checkApplyRules([]*Rule{ + { + Role: Learner, + Count: 1, + }, + { + Role: Follower, + Count: 1, + }, + }) + re.Regexp("needs at least one leader or voter", err.Error()) +} + +func TestCacheManager(t *testing.T) { + re := require.New(t) + _, manager := newTestManager(t, false) + manager.conf.SetPlacementRulesCacheEnabled(true) + rules := addExtraRules(0) + re.NoError(manager.SetRules(rules)) + stores := makeStores() + + regionMeta := &metapb.Region{ + Id: 1, + StartKey: []byte(""), + EndKey: []byte(""), + RegionEpoch: &metapb.RegionEpoch{ConfVer: 0, Version: 0}, + Peers: []*metapb.Peer{ + {Id: 11, StoreId: 
1111, Role: metapb.PeerRole_Voter}, + {Id: 12, StoreId: 2111, Role: metapb.PeerRole_Voter}, + {Id: 13, StoreId: 3111, Role: metapb.PeerRole_Voter}, + }, + } + region := core.NewRegionInfo(regionMeta, regionMeta.Peers[0]) + fit := manager.FitRegion(stores, region) + manager.SetRegionFitCache(region, fit) + // bestFit is not stored when the total number of hits is insufficient. + for i := 1; i < minHitCountToCacheHit/2; i++ { + manager.FitRegion(stores, region) + re.True(manager.IsRegionFitCached(stores, region)) + cache := manager.cache.regionCaches[1] + re.Equal(uint32(i), cache.hitCount) + re.Nil(cache.bestFit) + } + // Store bestFit when the total number of hits is sufficient. + for i := 0; i < minHitCountToCacheHit; i++ { + manager.FitRegion(stores, region) + } + cache := manager.cache.regionCaches[1] + re.Equal(uint32(minHitCountToCacheHit), cache.hitCount) + re.NotNil(cache.bestFit) + // Cache invalidation after change + regionMeta.Peers[2] = &metapb.Peer{Id: 14, StoreId: 4111, Role: metapb.PeerRole_Voter} + region = core.NewRegionInfo(regionMeta, regionMeta.Peers[0]) + re.False(manager.IsRegionFitCached(stores, region)) +} + +func dhex(hk string) []byte { + k, err := hex.DecodeString(hk) + if err != nil { + panic("decode fail") + } + return k +} diff --git a/server/api/operator_test.go b/server/api/operator_test.go index f05b7e2ede23..c13bc8909fa9 100644 --- a/server/api/operator_test.go +++ b/server/api/operator_test.go @@ -352,7 +352,8 @@ func (s *testTransferRegionOperatorSuite) TestTransferRegionWithPlacementRule(c for _, tc := range tt { c.Log(tc.name) s.svr.GetRaftCluster().GetOpts().SetPlacementRuleEnabled(tc.placementRuleEnable) if tc.placementRuleEnable { err := s.svr.GetRaftCluster().GetRuleManager().Initialize( s.svr.GetRaftCluster().GetOpts().GetMaxReplicas(), - s.svr.GetRaftCluster().GetOpts().GetLocationLabels()) + s.svr.GetRaftCluster().GetOpts().GetLocationLabels(), + s.svr.GetRaftCluster().GetOpts().GetIsolationLevel()) c.Assert(err, IsNil) } if len(tc.rules) > 0 { // add customized rule first and then remove default rule diff --git a/server/cluster/cluster.go b/server/cluster/cluster.go index 57963a48d9be..68cc8dbe7587 100644 --- a/server/cluster/cluster.go +++ b/server/cluster/cluster.go @@ -240,7 +240,7 @@ func (c *RaftCluster) Start(s Server) error { c.ruleManager = placement.NewRuleManager(c.storage, c, c.GetOpts()) if c.opt.IsPlacementRulesEnabled() { - err = c.ruleManager.Initialize(c.opt.GetMaxReplicas(), c.opt.GetLocationLabels()) + err = c.ruleManager.Initialize(c.opt.GetMaxReplicas(), c.opt.GetLocationLabels(), c.opt.GetIsolationLevel()) if err != nil { return err } diff --git a/server/cluster/cluster_test.go b/server/cluster/cluster_test.go index d2ffea4931e4..ff90ee5c76ec 100644 --- a/server/cluster/cluster_test.go +++ b/server/cluster/cluster_test.go @@ -213,11 +213,18 @@ func (s *testClusterInfoSuite) TestFilterUnhealthyStore(c *C) { func (s *testClusterInfoSuite) TestSetOfflineStore(c *C) { _, opt, err := newTestScheduleConfig() c.Assert(err, IsNil) cluster := newTestRaftCluster(s.ctx, mockid.NewIDAllocator(), opt, core.NewStorage(kv.NewMemoryKV()), core.NewBasicCluster()) + cluster.ruleManager = placement.NewRuleManager(core.NewStorage(kv.NewMemoryKV()), cluster, cluster.GetOpts()) + if opt.IsPlacementRulesEnabled() { + err := cluster.ruleManager.Initialize(opt.GetMaxReplicas(), opt.GetLocationLabels(), opt.GetIsolationLevel()) + if err != nil { + panic(err) + } + } // Put 4 stores. for _, store := range newTestStores(4, "2.0.0") { c.Assert(cluster.PutStore(store.GetMeta()), IsNil) } // store 1: up -> offline c.Assert(cluster.RemoveStore(1, false), IsNil) @@ -315,8 +322,15 @@ func getTestDeployPath(storeID uint64) string { func (s *testClusterInfoSuite) TestUpStore(c *C) { _, opt, err := newTestScheduleConfig() c.Assert(err, IsNil) cluster := newTestRaftCluster(s.ctx, mockid.NewIDAllocator(), opt, core.NewStorage(kv.NewMemoryKV()), core.NewBasicCluster()) + cluster.ruleManager = placement.NewRuleManager(core.NewStorage(kv.NewMemoryKV()), cluster, cluster.GetOpts()) + if opt.IsPlacementRulesEnabled() { + err := cluster.ruleManager.Initialize(opt.GetMaxReplicas(), opt.GetLocationLabels(), opt.GetIsolationLevel()) + if err != nil { + panic(err) + } + } // Put 3 stores. for _, store := range newTestStores(3, "2.0.0") { @@ -348,8 +362,15 @@ func (s *testClusterInfoSuite) TestDeleteStoreUpdatesClusterVersion(c *C) { _, opt, err := newTestScheduleConfig() c.Assert(err, IsNil) cluster := newTestRaftCluster(s.ctx, mockid.NewIDAllocator(), opt, core.NewStorage(kv.NewMemoryKV()), core.NewBasicCluster()) + cluster.ruleManager = placement.NewRuleManager(core.NewStorage(kv.NewMemoryKV()), cluster, cluster.GetOpts()) + if opt.IsPlacementRulesEnabled() { + err := cluster.ruleManager.Initialize(opt.GetMaxReplicas(), opt.GetLocationLabels(), opt.GetIsolationLevel()) + if err != nil { + panic(err) + } + } // Put 3 new 4.0.9 stores. 
for _, store := range newTestStores(3, "4.0.9") { @@ -850,7 +871,7 @@ func (s *testClusterInfoSuite) TestOfflineAndMerge(c *C) { storage := core.NewStorage(kv.NewMemoryKV()) cluster.ruleManager = placement.NewRuleManager(storage, cluster, cluster.GetOpts()) if opt.IsPlacementRulesEnabled() { - err := cluster.ruleManager.Initialize(opt.GetMaxReplicas(), opt.GetLocationLabels()) + err := cluster.ruleManager.Initialize(opt.GetMaxReplicas(), opt.GetLocationLabels(), opt.GetIsolationLevel()) if err != nil { panic(err) } @@ -1146,7 +1167,7 @@ func newTestCluster(ctx context.Context, opt *config.PersistOptions) *testCluste rc := newTestRaftCluster(ctx, mockid.NewIDAllocator(), opt, storage, core.NewBasicCluster()) rc.ruleManager = placement.NewRuleManager(storage, rc, rc.GetOpts()) if opt.IsPlacementRulesEnabled() { - err := rc.ruleManager.Initialize(opt.GetMaxReplicas(), opt.GetLocationLabels()) + err := rc.ruleManager.Initialize(opt.GetMaxReplicas(), opt.GetLocationLabels(), opt.GetIsolationLevel()) if err != nil { panic(err) } diff --git a/server/schedule/placement/rule_manager.go b/server/schedule/placement/rule_manager.go index 66ec8f877542..555e98ed21f4 100644 --- a/server/schedule/placement/rule_manager.go +++ b/server/schedule/placement/rule_manager.go @@ -62,7 +62,7 @@ func NewRuleManager(storage *core.Storage, storeSetInformer core.StoreSetInforme // Initialize loads rules from storage. If Placement Rules feature is never enabled, it creates default rule that is // compatible with previous configuration. -func (m *RuleManager) Initialize(maxReplica int, locationLabels []string) error { +func (m *RuleManager) Initialize(maxReplica int, locationLabels []string, isolationLevel string) error { m.Lock() defer m.Unlock() if m.initialized { @@ -77,12 +77,13 @@ func (m *RuleManager) Initialize(maxReplica int, locationLabels []string, isolat } if len(m.ruleConfig.rules) == 0 { // migrate from old config. defaultRule := &Rule{ GroupID: "pd", ID: "default", Role: Voter, Count: maxReplica, LocationLabels: locationLabels, + IsolationLevel: isolationLevel, } if err := m.storage.SaveRule(defaultRule.StoreKey(), defaultRule); err != nil { return err diff --git a/server/server.go b/server/server.go index 88647b89dd30..8b0bd4350212 100644 --- a/server/server.go +++ b/server/server.go @@ -859,7 +859,7 @@ func (s *Server) SetReplicationConfig(cfg config.ReplicationConfig) error { } if cfg.EnablePlacementRules { // initialize rule manager. 
+<<<<<<< HEAD if err := raftCluster.GetRuleManager().Initialize(int(cfg.MaxReplicas), cfg.LocationLabels); err != nil { +======= + if err := rc.GetRuleManager().Initialize(int(cfg.MaxReplicas), cfg.LocationLabels, cfg.IsolationLevel); err != nil { +>>>>>>> 5b3d0172b (*: fix sync isolation level to default placement rule (#7122)) return err } } else { @@ -878,19 +882,27 @@ func (s *Server) SetReplicationConfig(cfg config.ReplicationConfig) error { defaultRule := s.GetRaftCluster().GetRuleManager().GetRule("pd", "default") CheckInDefaultRule := func() error { - // replication config won't work when placement rule is enabled and exceeds one default rule + // replication config won't work when placement rule is enabled and exceeds one default rule if !(defaultRule != nil && len(defaultRule.StartKey) == 0 && len(defaultRule.EndKey) == 0) { - return errors.New("cannot update MaxReplicas or LocationLabels when placement rules feature is enabled and not only default rule exists, please update rule instead") + return errors.New("cannot update MaxReplicas, LocationLabels or IsolationLevel when placement rules feature is enabled and not only default rule exists, please update rule instead") } +<<<<<<< HEAD if !(defaultRule.Count == int(old.MaxReplicas) && typeutil.StringsEqual(defaultRule.LocationLabels, []string(old.LocationLabels))) { +======= + if !(defaultRule.Count == int(old.MaxReplicas) && typeutil.AreStringSlicesEqual(defaultRule.LocationLabels, []string(old.LocationLabels)) && defaultRule.IsolationLevel == old.IsolationLevel) { +>>>>>>> 5b3d0172b (*: fix sync isolation level to default placement rule (#7122)) return errors.New("cannot to update replication config, the default rules do not consistent with replication config, please update rule instead") } return nil } +<<<<<<< HEAD if !(cfg.MaxReplicas == old.MaxReplicas && typeutil.StringsEqual(cfg.LocationLabels, old.LocationLabels)) { +======= + if !(cfg.MaxReplicas == old.MaxReplicas && typeutil.AreStringSlicesEqual(cfg.LocationLabels, old.LocationLabels) && cfg.IsolationLevel == old.IsolationLevel) { +>>>>>>> 5b3d0172b (*: fix sync isolation level to default placement rule (#7122)) if err := CheckInDefaultRule(); err != nil { return err } @@ -901,7 +913,16 @@ func (s *Server) SetReplicationConfig(cfg config.ReplicationConfig) error { if rule != nil { rule.Count = int(cfg.MaxReplicas) rule.LocationLabels = cfg.LocationLabels +<<<<<<< HEAD if err := s.GetRaftCluster().GetRuleManager().SetRule(rule); err != nil { +======= + rule.IsolationLevel = cfg.IsolationLevel + rc := s.GetRaftCluster() + if rc == nil { + return errs.ErrNotBootstrapped.GenWithStackByArgs() + } + if err := rc.GetRuleManager().SetRule(rule); err != nil { +>>>>>>> 5b3d0172b (*: fix sync isolation level to default placement rule (#7122)) log.Error("failed to update rule count", errs.ZapError(err)) return err diff --git a/server/statistics/region_collection_test.go b/server/statistics/region_collection_test.go index 7ec069e810d8..fd6309f0bfc8 100644 --- a/server/statistics/region_collection_test.go +++ b/server/statistics/region_collection_test.go @@ -26,6 +26,7 @@ import ( "github.com/tikv/pd/server/schedule/placement" ) +<<<<<<< HEAD:server/statistics/region_collection_test.go func TestStatistics(t *testing.T) { TestingT(t) } @@ -47,6 +48,15 @@ func (t *testRegionStatisticsSuite) SetUpTest(c *C) { func (t *testRegionStatisticsSuite) TestRegionStatistics(c *C) { opt := config.NewTestOptions() +======= +func TestRegionStatistics(t *testing.T) { + re := require.New(t) + store 
:= storage.NewStorageWithMemoryBackend() + manager := placement.NewRuleManager(store, nil, nil) + err := manager.Initialize(3, []string{"zone", "rack", "host"}, "") + re.NoError(err) + opt := mockconfig.NewTestOptions() +>>>>>>> 5b3d0172b (*: fix sync isolation level to default placement rule (#7122)):pkg/statistics/region_collection_test.go opt.SetPlacementRuleEnabled(false) peers := []*metapb.Peer{ {Id: 5, StoreId: 1}, @@ -143,8 +153,18 @@ func (t *testRegionStatisticsSuite) TestRegionStatistics(c *C) { c.Assert(regionStats.stats[OfflinePeer], HasLen, 0) } +<<<<<<< HEAD:server/statistics/region_collection_test.go func (t *testRegionStatisticsSuite) TestRegionStatisticsWithPlacementRule(c *C) { opt := config.NewTestOptions() +======= +func TestRegionStatisticsWithPlacementRule(t *testing.T) { + re := require.New(t) + store := storage.NewStorageWithMemoryBackend() + manager := placement.NewRuleManager(store, nil, nil) + err := manager.Initialize(3, []string{"zone", "rack", "host"}, "") + re.NoError(err) + opt := mockconfig.NewTestOptions() +>>>>>>> 5b3d0172b (*: fix sync isolation level to default placement rule (#7122)):pkg/statistics/region_collection_test.go opt.SetPlacementRuleEnabled(true) peers := []*metapb.Peer{ {Id: 5, StoreId: 1}, diff --git a/tests/pdctl/config/config_test.go b/tests/pdctl/config/config_test.go index 799a9e47151d..0691e453c24f 100644 --- a/tests/pdctl/config/config_test.go +++ b/tests/pdctl/config/config_test.go @@ -620,7 +620,7 @@ func (s *configTestSuite) TestUpdateDefaultReplicaConfig(c *C) { c.Assert(replicationCfg.MaxReplicas, Equals, expect) } - checkLocaltionLabels := func(expect int) { + checkLocationLabels := func(expect int) { args := []string{"-u", pdAddr, "config", "show", "replication"} output, err := pdctl.ExecuteCommand(cmd, args...) c.Assert(err, IsNil) @@ -629,6 +629,15 @@ func (s *configTestSuite) TestUpdateDefaultReplicaConfig(c *C) { c.Assert(replicationCfg.LocationLabels, HasLen, expect) } + checkIsolationLevel := func(expect string) { + args := []string{"-u", pdAddr, "config", "show", "replication"} + output, err := pdctl.ExecuteCommand(cmd, args...) + re.NoError(err) + replicationCfg := sc.ReplicationConfig{} + re.NoError(json.Unmarshal(output, &replicationCfg)) + re.Equal(replicationCfg.IsolationLevel, expect) + } + checkRuleCount := func(expect int) { args := []string{"-u", pdAddr, "config", "placement-rules", "show", "--group", "pd", "--id", "default"} output, err := pdctl.ExecuteCommand(cmd, args...) @@ -647,16 +656,36 @@ func (s *configTestSuite) TestUpdateDefaultReplicaConfig(c *C) { c.Assert(rule.LocationLabels, HasLen, expect) } + checkRuleIsolationLevel := func(expect string) { + args := []string{"-u", pdAddr, "config", "placement-rules", "show", "--group", "pd", "--id", "default"} + output, err := pdctl.ExecuteCommand(cmd, args...) + re.NoError(err) + rule := placement.Rule{} + re.NoError(json.Unmarshal(output, &rule)) + re.Equal(rule.IsolationLevel, expect) + } + // update successfully when placement rules is not enabled. 
output, err := pdctl.ExecuteCommand(cmd, "-u", pdAddr, "config", "set", "max-replicas", "2") c.Assert(err, IsNil) c.Assert(strings.Contains(string(output), "Success!"), IsTrue) checkMaxReplicas(2) output, err = pdctl.ExecuteCommand(cmd, "-u", pdAddr, "config", "set", "location-labels", "zone,host") c.Assert(err, IsNil) c.Assert(strings.Contains(string(output), "Success!"), IsTrue) + output, err = pdctl.ExecuteCommand(cmd, "-u", pdAddr, "config", "set", "isolation-level", "zone") + c.Assert(err, IsNil) + c.Assert(strings.Contains(string(output), "Success!"), IsTrue) - checkLocaltionLabels(2) + checkLocationLabels(2) checkRuleLocationLabels(2) + checkIsolationLevel("zone") + checkRuleIsolationLevel("zone") // update successfully when only one default rule exists. output, err = pdctl.ExecuteCommand(cmd, "-u", pdAddr, "config", "placement-rules", "enable") @@ -669,11 +687,18 @@ func (s *configTestSuite) TestUpdateDefaultReplicaConfig(c *C) { checkMaxReplicas(3) checkRuleCount(3) + // We need to change the isolation level first because, when setting location labels, + // we validate that they contain the isolation level. + output, err = pdctl.ExecuteCommand(cmd, "-u", pdAddr, "config", "set", "isolation-level", "host") + c.Assert(err, IsNil) + c.Assert(strings.Contains(string(output), "Success!"), IsTrue) output, err = pdctl.ExecuteCommand(cmd, "-u", pdAddr, "config", "set", "location-labels", "host") c.Assert(err, IsNil) c.Assert(strings.Contains(string(output), "Success!"), IsTrue) - checkLocaltionLabels(1) + checkLocationLabels(1) checkRuleLocationLabels(1) + checkIsolationLevel("host") + checkRuleIsolationLevel("host") // update unsuccessfully when many rule exists. f, _ := os.CreateTemp("/tmp", "pd_tests") @@ -703,8 +721,10 @@ func (s *configTestSuite) TestUpdateDefaultReplicaConfig(c *C) { c.Assert(err, IsNil) checkMaxReplicas(4) checkRuleCount(4) - checkLocaltionLabels(1) + checkLocationLabels(1) checkRuleLocationLabels(1) + checkIsolationLevel("host") + checkRuleIsolationLevel("host") } func (s *configTestSuite) TestPDServerConfig(c *C) {
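Taken together, the patch threads the isolation level through every path that creates or validates the default placement rule. The following standalone sketch (simplified types, illustrative only; the real RuleManager also handles storage, locking, and validation) shows the behavior the change is after: the isolation level configured in the replication config is no longer dropped when the default rule is built.

package main

import "fmt"

// Rule mirrors only the placement.Rule fields this patch touches.
type Rule struct {
	GroupID, ID    string
	Role           string
	Count          int
	LocationLabels []string
	IsolationLevel string
}

// buildDefaultRule mimics what RuleManager.Initialize does after this patch:
// the isolation level is carried into the migrated default rule.
func buildDefaultRule(maxReplicas int, locationLabels []string, isolationLevel string) *Rule {
	return &Rule{
		GroupID:        "pd",
		ID:             "default",
		Role:           "voter",
		Count:          maxReplicas,
		LocationLabels: locationLabels,
		IsolationLevel: isolationLevel, // previously lost, now kept in sync
	}
}

func main() {
	r := buildDefaultRule(3, []string{"zone", "rack", "host"}, "zone")
	fmt.Printf("default rule: %+v\n", r)
}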