From 344a7a989fcd540bc1f0a0dc854dbb0bbcdcc0f0 Mon Sep 17 00:00:00 2001 From: disksing Date: Mon, 16 Oct 2023 14:27:27 +0800 Subject: [PATCH] This is an automated cherry-pick of #7202 close tikv/pd#7201 Signed-off-by: ti-chi-bot --- server/replication/replication_mode.go | 115 ++++++++---- server/replication/replication_mode_test.go | 198 ++++++++++++++++++-- 2 files changed, 268 insertions(+), 45 deletions(-) diff --git a/server/replication/replication_mode.go b/server/replication/replication_mode.go index a75ef2bfb0d..698cf854f56 100644 --- a/server/replication/replication_mode.go +++ b/server/replication/replication_mode.go @@ -28,8 +28,16 @@ import ( pb "github.com/pingcap/kvproto/pkg/replication_modepb" "github.com/pingcap/log" "github.com/tikv/pd/pkg/errs" +<<<<<<< HEAD:server/replication/replication_mode.go "github.com/tikv/pd/pkg/logutil" "github.com/tikv/pd/pkg/syncutil" +======= + sche "github.com/tikv/pd/pkg/schedule/core" + "github.com/tikv/pd/pkg/schedule/placement" + "github.com/tikv/pd/pkg/storage/endpoint" + "github.com/tikv/pd/pkg/utils/logutil" + "github.com/tikv/pd/pkg/utils/syncutil" +>>>>>>> 4176c1daa (replication_mode: use placement to determin canSync and hasMajority (#7202)):pkg/replication/replication_mode.go "github.com/tikv/pd/server/config" "github.com/tikv/pd/server/core" "github.com/tikv/pd/server/schedule" @@ -364,6 +372,39 @@ func (m *ModeManager) Run(ctx context.Context) { wg.Wait() } +func storeIDs(stores []*core.StoreInfo) []uint64 { + ids := make([]uint64, len(stores)) + for i, s := range stores { + ids[i] = s.GetID() + } + return ids +} + +func minimalUpVoters(rule *placement.Rule, upStores, downStores []*core.StoreInfo) int { + if rule.Role == placement.Learner { + return 0 + } + var up, down int + for _, s := range upStores { + if placement.MatchLabelConstraints(s, rule.LabelConstraints) { + up++ + } + } + for _, s := range downStores { + if placement.MatchLabelConstraints(s, rule.LabelConstraints) { + down++ + } + } + minimalUp := rule.Count - down + if minimalUp < 0 { + minimalUp = 0 + } + if minimalUp > up { + minimalUp = up + } + return minimalUp +} + func (m *ModeManager) tickUpdateState() { if m.getModeName() != modeDRAutoSync { return @@ -371,30 +412,40 @@ func (m *ModeManager) tickUpdateState() { drTickCounter.Inc() - totalPrimaryPeers, totalDrPeers := m.config.DRAutoSync.PrimaryReplicas, m.config.DRAutoSync.DRReplicas stores := m.checkStoreStatus() - // canSync is true when every region has at least 1 replica in each DC. - canSync := len(stores[primaryDown]) < totalPrimaryPeers && len(stores[drDown]) < totalDrPeers && - len(stores[primaryUp]) > 0 && len(stores[drUp]) > 0 + var primaryHasVoter, drHasVoter bool + var totalVoter, totalUpVoter int + for _, r := range m.cluster.GetRuleManager().GetAllRules() { + if len(r.StartKey) > 0 || len(r.EndKey) > 0 { + // All rules should be global rules. If not, skip it. + continue + } + if r.Role != placement.Learner { + totalVoter += r.Count + } + minimalUpPrimary := minimalUpVoters(r, stores[primaryUp], stores[primaryDown]) + minimalUpDr := minimalUpVoters(r, stores[drUp], stores[drDown]) + primaryHasVoter = primaryHasVoter || minimalUpPrimary > 0 + drHasVoter = drHasVoter || minimalUpDr > 0 + upVoters := minimalUpPrimary + minimalUpDr + if upVoters > r.Count { + upVoters = r.Count + } + totalUpVoter += upVoters + } + // canSync is true when every region has at least 1 voter replica in each DC. // hasMajority is true when every region has majority peer online. - var upPeers int - if len(stores[primaryDown]) < totalPrimaryPeers { - upPeers += totalPrimaryPeers - len(stores[primaryDown]) - } - if len(stores[drDown]) < totalDrPeers { - upPeers += totalDrPeers - len(stores[drDown]) - } - hasMajority := upPeers*2 > totalPrimaryPeers+totalDrPeers + canSync := primaryHasVoter && drHasVoter + hasMajority := totalUpVoter*2 > totalVoter log.Debug("replication store status", - zap.Uint64s("up-primary", stores[primaryUp]), - zap.Uint64s("up-dr", stores[drUp]), - zap.Uint64s("down-primary", stores[primaryDown]), - zap.Uint64s("down-dr", stores[drDown]), + zap.Uint64s("up-primary", storeIDs(stores[primaryUp])), + zap.Uint64s("up-dr", storeIDs(stores[drUp])), + zap.Uint64s("down-primary", storeIDs(stores[primaryDown])), + zap.Uint64s("down-dr", storeIDs(stores[drDown])), zap.Bool("can-sync", canSync), - zap.Int("up-peers", upPeers), zap.Bool("has-majority", hasMajority), ) @@ -420,31 +471,31 @@ func (m *ModeManager) tickUpdateState() { case drStateSync: // If hasMajority is false, the cluster is always unavailable. Switch to async won't help. if !canSync && hasMajority { - m.drSwitchToAsyncWait(stores[primaryUp]) + m.drSwitchToAsyncWait(storeIDs(stores[primaryUp])) } case drStateAsyncWait: if canSync { m.drSwitchToSync() break } - if oldAvailableStores := m.drGetAvailableStores(); !reflect.DeepEqual(oldAvailableStores, stores[primaryUp]) { - m.drSwitchToAsyncWait(stores[primaryUp]) + if oldAvailableStores := m.drGetAvailableStores(); !reflect.DeepEqual(oldAvailableStores, storeIDs(stores[primaryUp])) { + m.drSwitchToAsyncWait(storeIDs(stores[primaryUp])) break } - if m.drCheckStoreStateUpdated(stores[primaryUp]) { - m.drSwitchToAsync(stores[primaryUp]) + if m.drCheckStoreStateUpdated(storeIDs(stores[primaryUp])) { + m.drSwitchToAsync(storeIDs(stores[primaryUp])) } case drStateAsync: if canSync { m.drSwitchToSyncRecover() break } - if !reflect.DeepEqual(m.drGetAvailableStores(), stores[primaryUp]) && m.drCheckStoreStateUpdated(stores[primaryUp]) { - m.drSwitchToAsync(stores[primaryUp]) + if !reflect.DeepEqual(m.drGetAvailableStores(), stores[primaryUp]) && m.drCheckStoreStateUpdated(storeIDs(stores[primaryUp])) { + m.drSwitchToAsync(storeIDs(stores[primaryUp])) } case drStateSyncRecover: if !canSync && hasMajority { - m.drSwitchToAsync(stores[primaryUp]) + m.drSwitchToAsync(storeIDs(stores[primaryUp])) } else { m.updateProgress() progress := m.estimateProgress() @@ -519,10 +570,10 @@ const ( storeStatusTypeCount ) -func (m *ModeManager) checkStoreStatus() [][]uint64 { +func (m *ModeManager) checkStoreStatus() [][]*core.StoreInfo { m.RLock() defer m.RUnlock() - stores := make([][]uint64, storeStatusTypeCount) + stores := make([][]*core.StoreInfo, storeStatusTypeCount) for _, s := range m.cluster.GetStores() { if s.IsRemoved() { continue @@ -535,21 +586,21 @@ func (m *ModeManager) checkStoreStatus() [][]uint64 { labelValue := s.GetLabelValue(m.config.DRAutoSync.LabelKey) if labelValue == m.config.DRAutoSync.Primary { if down { - stores[primaryDown] = append(stores[primaryDown], s.GetID()) + stores[primaryDown] = append(stores[primaryDown], s) } else { - stores[primaryUp] = append(stores[primaryUp], s.GetID()) + stores[primaryUp] = append(stores[primaryUp], s) } } if labelValue == m.config.DRAutoSync.DR { if down { - stores[drDown] = append(stores[drDown], s.GetID()) + stores[drDown] = append(stores[drDown], s) } else { - stores[drUp] = append(stores[drUp], s.GetID()) + stores[drUp] = append(stores[drUp], s) } } } for i := range stores { - sort.Slice(stores[i], func(a, b int) bool { return stores[i][a] < stores[i][b] }) + sort.Slice(stores[i], func(a, b int) bool { return stores[i][a].GetID() < stores[i][b].GetID() }) } return stores } diff --git a/server/replication/replication_mode_test.go b/server/replication/replication_mode_test.go index 7ddd4880011..f5669e965a4 100644 --- a/server/replication/replication_mode_test.go +++ b/server/replication/replication_mode_test.go @@ -25,7 +25,14 @@ import ( pb "github.com/pingcap/kvproto/pkg/replication_modepb" "github.com/stretchr/testify/require" "github.com/tikv/pd/pkg/mock/mockcluster" +<<<<<<< HEAD:server/replication/replication_mode_test.go "github.com/tikv/pd/pkg/typeutil" +======= + "github.com/tikv/pd/pkg/mock/mockconfig" + "github.com/tikv/pd/pkg/schedule/placement" + "github.com/tikv/pd/pkg/storage" + "github.com/tikv/pd/pkg/utils/typeutil" +>>>>>>> 4176c1daa (replication_mode: use placement to determin canSync and hasMajority (#7202)):pkg/replication/replication_mode_test.go "github.com/tikv/pd/server/config" "github.com/tikv/pd/server/core" "github.com/tikv/pd/server/storage" @@ -166,14 +173,17 @@ func TestStateSwitch(t *testing.T) { LabelKey: "zone", Primary: "zone1", DR: "zone2", - PrimaryReplicas: 4, - DRReplicas: 2, WaitStoreTimeout: typeutil.Duration{Duration: time.Minute}, }} cluster := mockcluster.NewCluster(ctx, config.NewTestOptions()) replicator := newMockReplicator([]uint64{1}) rep, err := NewReplicationModeManager(conf, store, cluster, replicator) re.NoError(err) + cluster.GetRuleManager().SetAllGroupBundles( + genPlacementRuleConfig([]ruleConfig{ + {key: "zone", value: "zone1", role: placement.Voter, count: 4}, + {key: "zone", value: "zone2", role: placement.Voter, count: 2}, + }), true) cluster.AddLabelsStore(1, 1, map[string]string{"zone": "zone1"}) cluster.AddLabelsStore(2, 1, map[string]string{"zone": "zone1"}) @@ -365,14 +375,23 @@ func TestReplicateState(t *testing.T) { LabelKey: "zone", Primary: "zone1", DR: "zone2", - PrimaryReplicas: 2, - DRReplicas: 1, WaitStoreTimeout: typeutil.Duration{Duration: time.Minute}, }} +<<<<<<< HEAD:server/replication/replication_mode_test.go cluster := mockcluster.NewCluster(ctx, config.NewTestOptions()) +======= + cluster := mockcluster.NewCluster(ctx, mockconfig.NewTestOptions()) + cluster.GetRuleManager().SetAllGroupBundles( + genPlacementRuleConfig([]ruleConfig{ + {key: "zone", value: "zone1", role: placement.Voter, count: 2}, + {key: "zone", value: "zone2", role: placement.Voter, count: 1}, + }), true) +>>>>>>> 4176c1daa (replication_mode: use placement to determin canSync and hasMajority (#7202)):pkg/replication/replication_mode_test.go replicator := newMockReplicator([]uint64{1}) rep, err := NewReplicationModeManager(conf, store, cluster, replicator) re.NoError(err) + cluster.AddLabelsStore(1, 1, map[string]string{"zone": "zone1"}) + cluster.AddLabelsStore(2, 1, map[string]string{"zone": "zone1"}) stateID := rep.drAutoSync.StateID // replicate after initialized @@ -390,14 +409,14 @@ func TestReplicateState(t *testing.T) { rep.tickUpdateState() // switch async_wait since there is only one zone newStateID := rep.drAutoSync.StateID rep.tickReplicateStatus() - re.Equal(fmt.Sprintf(`{"state":"async_wait","state_id":%d}`, newStateID), replicator.lastData[1]) + re.Equal(fmt.Sprintf(`{"state":"async_wait","state_id":%d,"available_stores":[1,2]}`, newStateID), replicator.lastData[1]) re.Equal(fmt.Sprintf(`{"state":"sync","state_id":%d}`, stateID), replicator.lastData[2]) - re.Equal(fmt.Sprintf(`{"state":"async_wait","state_id":%d}`, newStateID), replicator.lastData[3]) + re.Equal(fmt.Sprintf(`{"state":"async_wait","state_id":%d,"available_stores":[1,2]}`, newStateID), replicator.lastData[3]) // clear error, replicate to node 2 next time delete(replicator.errors, 2) rep.tickReplicateStatus() - re.Equal(fmt.Sprintf(`{"state":"async_wait","state_id":%d}`, newStateID), replicator.lastData[2]) + re.Equal(fmt.Sprintf(`{"state":"async_wait","state_id":%d,"available_stores":[1,2]}`, newStateID), replicator.lastData[2]) } func TestAsynctimeout(t *testing.T) { @@ -409,11 +428,18 @@ func TestAsynctimeout(t *testing.T) { LabelKey: "zone", Primary: "zone1", DR: "zone2", - PrimaryReplicas: 2, - DRReplicas: 1, WaitStoreTimeout: typeutil.Duration{Duration: time.Minute}, }} +<<<<<<< HEAD:server/replication/replication_mode_test.go cluster := mockcluster.NewCluster(ctx, config.NewTestOptions()) +======= + cluster := mockcluster.NewCluster(ctx, mockconfig.NewTestOptions()) + cluster.GetRuleManager().SetAllGroupBundles( + genPlacementRuleConfig([]ruleConfig{ + {key: "zone", value: "zone1", role: placement.Voter, count: 2}, + {key: "zone", value: "zone2", role: placement.Voter, count: 1}, + }), true) +>>>>>>> 4176c1daa (replication_mode: use placement to determin canSync and hasMajority (#7202)):pkg/replication/replication_mode_test.go var replicator mockFileReplicator rep, err := NewReplicationModeManager(conf, store, cluster, &replicator) re.NoError(err) @@ -452,11 +478,18 @@ func TestRecoverProgress(t *testing.T) { LabelKey: "zone", Primary: "zone1", DR: "zone2", - PrimaryReplicas: 2, - DRReplicas: 1, WaitStoreTimeout: typeutil.Duration{Duration: time.Minute}, }} +<<<<<<< HEAD:server/replication/replication_mode_test.go cluster := mockcluster.NewCluster(ctx, config.NewTestOptions()) +======= + cluster := mockcluster.NewCluster(ctx, mockconfig.NewTestOptions()) + cluster.GetRuleManager().SetAllGroupBundles( + genPlacementRuleConfig([]ruleConfig{ + {key: "zone", value: "zone1", role: placement.Voter, count: 2}, + {key: "zone", value: "zone2", role: placement.Voter, count: 1}, + }), true) +>>>>>>> 4176c1daa (replication_mode: use placement to determin canSync and hasMajority (#7202)):pkg/replication/replication_mode_test.go cluster.AddLabelsStore(1, 1, map[string]string{}) rep, err := NewReplicationModeManager(conf, store, cluster, newMockReplicator([]uint64{1})) re.NoError(err) @@ -514,11 +547,18 @@ func TestRecoverProgressWithSplitAndMerge(t *testing.T) { LabelKey: "zone", Primary: "zone1", DR: "zone2", - PrimaryReplicas: 2, - DRReplicas: 1, WaitStoreTimeout: typeutil.Duration{Duration: time.Minute}, }} +<<<<<<< HEAD:server/replication/replication_mode_test.go cluster := mockcluster.NewCluster(ctx, config.NewTestOptions()) +======= + cluster := mockcluster.NewCluster(ctx, mockconfig.NewTestOptions()) + cluster.GetRuleManager().SetAllGroupBundles( + genPlacementRuleConfig([]ruleConfig{ + {key: "zone", value: "zone1", role: placement.Voter, count: 2}, + {key: "zone", value: "zone2", role: placement.Voter, count: 1}, + }), true) +>>>>>>> 4176c1daa (replication_mode: use placement to determin canSync and hasMajority (#7202)):pkg/replication/replication_mode_test.go cluster.AddLabelsStore(1, 1, map[string]string{}) rep, err := NewReplicationModeManager(conf, store, cluster, newMockReplicator([]uint64{1})) re.NoError(err) @@ -570,6 +610,114 @@ func TestRecoverProgressWithSplitAndMerge(t *testing.T) { re.Equal(float32(1.0), rep.estimateProgress()) } +func TestComplexPlacementRules(t *testing.T) { + re := require.New(t) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + store := storage.NewStorageWithMemoryBackend() + conf := config.ReplicationModeConfig{ReplicationMode: modeDRAutoSync, DRAutoSync: config.DRAutoSyncReplicationConfig{ + LabelKey: "zone", + Primary: "zone1", + DR: "zone2", + WaitStoreTimeout: typeutil.Duration{Duration: time.Minute}, + }} + cluster := mockcluster.NewCluster(ctx, mockconfig.NewTestOptions()) + replicator := newMockReplicator([]uint64{1}) + rep, err := NewReplicationModeManager(conf, store, cluster, replicator) + re.NoError(err) + cluster.GetRuleManager().SetAllGroupBundles( + genPlacementRuleConfig([]ruleConfig{ + {key: "logic", value: "logic1", role: placement.Voter, count: 1}, + {key: "logic", value: "logic2", role: placement.Voter, count: 1}, + {key: "logic", value: "logic3", role: placement.Voter, count: 1}, + {key: "logic", value: "logic4", role: placement.Voter, count: 1}, + {key: "logic", value: "logic5", role: placement.Voter, count: 1}, + }), true) + + cluster.AddLabelsStore(1, 1, map[string]string{"zone": "zone1", "logic": "logic1"}) + cluster.AddLabelsStore(2, 1, map[string]string{"zone": "zone1", "logic": "logic1"}) + cluster.AddLabelsStore(3, 1, map[string]string{"zone": "zone1", "logic": "logic2"}) + cluster.AddLabelsStore(4, 1, map[string]string{"zone": "zone1", "logic": "logic2"}) + cluster.AddLabelsStore(5, 1, map[string]string{"zone": "zone1", "logic": "logic3"}) + cluster.AddLabelsStore(6, 1, map[string]string{"zone": "zone1", "logic": "logic3"}) + cluster.AddLabelsStore(7, 1, map[string]string{"zone": "zone2", "logic": "logic4"}) + cluster.AddLabelsStore(8, 1, map[string]string{"zone": "zone2", "logic": "logic4"}) + cluster.AddLabelsStore(9, 1, map[string]string{"zone": "zone2", "logic": "logic5"}) + cluster.AddLabelsStore(10, 1, map[string]string{"zone": "zone2", "logic": "logic5"}) + + // initial state is sync + re.Equal(drStateSync, rep.drGetState()) + + // down logic3 + logic5, can remain sync + setStoreState(cluster, "up", "up", "up", "up", "down", "down", "up", "up", "down", "down") + rep.tickUpdateState() + re.Equal(drStateSync, rep.drGetState()) + + // down 1 tikv from logic4 + 1 tikv from logic5, cannot sync + setStoreState(cluster, "up", "up", "up", "up", "up", "up", "up", "down", "up", "down") + rep.tickUpdateState() + re.Equal(drStateAsyncWait, rep.drGetState()) + + // reset to sync + setStoreState(cluster, "up", "up", "up", "up", "up", "up", "up", "up", "up", "up") + rep.tickUpdateState() + re.Equal(drStateSync, rep.drGetState()) + + // lost majority, down 1 tikv from logic2 + 1 tikv from logic3 + 1tikv from logic5, remain sync state + setStoreState(cluster, "up", "up", "up", "down", "up", "down", "up", "up", "up", "down") + rep.tickUpdateState() + re.Equal(drStateSync, rep.drGetState()) +} + +func TestComplexPlacementRules2(t *testing.T) { + re := require.New(t) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + store := storage.NewStorageWithMemoryBackend() + conf := config.ReplicationModeConfig{ReplicationMode: modeDRAutoSync, DRAutoSync: config.DRAutoSyncReplicationConfig{ + LabelKey: "zone", + Primary: "zone1", + DR: "zone2", + WaitStoreTimeout: typeutil.Duration{Duration: time.Minute}, + }} + cluster := mockcluster.NewCluster(ctx, mockconfig.NewTestOptions()) + replicator := newMockReplicator([]uint64{1}) + rep, err := NewReplicationModeManager(conf, store, cluster, replicator) + re.NoError(err) + cluster.GetRuleManager().SetAllGroupBundles( + genPlacementRuleConfig([]ruleConfig{ + {key: "logic", value: "logic1", role: placement.Voter, count: 2}, + {key: "logic", value: "logic2", role: placement.Voter, count: 1}, + {key: "logic", value: "logic3", role: placement.Voter, count: 2}, + }), true) + + cluster.AddLabelsStore(1, 1, map[string]string{"zone": "zone1", "logic": "logic1"}) + cluster.AddLabelsStore(2, 1, map[string]string{"zone": "zone1", "logic": "logic1"}) + cluster.AddLabelsStore(3, 1, map[string]string{"zone": "zone1", "logic": "logic2"}) + cluster.AddLabelsStore(4, 1, map[string]string{"zone": "zone1", "logic": "logic2"}) + cluster.AddLabelsStore(5, 1, map[string]string{"zone": "zone2", "logic": "logic3"}) + cluster.AddLabelsStore(6, 1, map[string]string{"zone": "zone2", "logic": "logic3"}) + cluster.AddLabelsStore(7, 1, map[string]string{"zone": "zone2", "logic": "logic3"}) + + // initial state is sync + re.Equal(drStateSync, rep.drGetState()) + + // down 1 from logic3, can remain sync + setStoreState(cluster, "up", "up", "up", "up", "up", "down", "up") + rep.tickUpdateState() + re.Equal(drStateSync, rep.drGetState()) + + // down 1 from logic1, 1 from logic2, can remain sync + setStoreState(cluster, "up", "down", "up", "down", "up", "up", "up") + rep.tickUpdateState() + re.Equal(drStateSync, rep.drGetState()) + + // down another from logic3, cannot sync + setStoreState(cluster, "up", "up", "up", "up", "down", "down", "up") + rep.tickUpdateState() + re.Equal(drStateAsyncWait, rep.drGetState()) +} + func genRegions(cluster *mockcluster.Cluster, stateID uint64, n int) []*core.RegionInfo { var regions []*core.RegionInfo for i := 1; i <= n; i++ { @@ -589,3 +737,27 @@ func genRegions(cluster *mockcluster.Cluster, stateID uint64, n int) []*core.Reg } return regions } + +type ruleConfig struct { + key string + value string + role placement.PeerRoleType + count int +} + +func genPlacementRuleConfig(rules []ruleConfig) []placement.GroupBundle { + group := placement.GroupBundle{ + ID: "group1", + } + for i, r := range rules { + group.Rules = append(group.Rules, &placement.Rule{ + ID: fmt.Sprintf("rule%d", i), + Role: r.role, + LabelConstraints: []placement.LabelConstraint{ + {Key: r.key, Op: placement.In, Values: []string{r.value}}, + }, + Count: r.count, + }) + } + return []placement.GroupBundle{group} +}