From 67705e6623e84c094d4ef1e8222356cf21557f3f Mon Sep 17 00:00:00 2001
From: Ti Chi Robot
Date: Mon, 16 Jan 2023 15:05:48 +0800
Subject: [PATCH] DDL: Fix bug that TiFlash replica unavailable after add partition with small probability (#39170) (#39195)

close pingcap/tidb#39171
---
 ddl/ddl_tiflash_api.go      | 146 +++++++-----------------------------
 ddl/ddl_tiflash_test.go     |  35 +++++++++
 ddl/partition.go            |   4 +
 store/gcworker/gc_worker.go |   1 -
 4 files changed, 65 insertions(+), 121 deletions(-)

diff --git a/ddl/ddl_tiflash_api.go b/ddl/ddl_tiflash_api.go
index cb149f238a81c..2d2312772894a 100644
--- a/ddl/ddl_tiflash_api.go
+++ b/ddl/ddl_tiflash_api.go
@@ -28,19 +28,13 @@ import (
     "github.com/pingcap/errors"
     "github.com/pingcap/failpoint"
-    "github.com/pingcap/tidb/ddl/placement"
     ddlutil "github.com/pingcap/tidb/ddl/util"
     "github.com/pingcap/tidb/domain/infosync"
     "github.com/pingcap/tidb/infoschema"
-    "github.com/pingcap/tidb/kv"
-    "github.com/pingcap/tidb/meta"
     "github.com/pingcap/tidb/parser/model"
-    "github.com/pingcap/tidb/parser/terror"
     "github.com/pingcap/tidb/sessionctx"
-    "github.com/pingcap/tidb/sessionctx/variable"
     "github.com/pingcap/tidb/store/helper"
     "github.com/pingcap/tidb/util"
-    "github.com/pingcap/tidb/util/gcutil"
     "github.com/pingcap/tidb/util/logutil"
     atomicutil "go.uber.org/atomic"
     "go.uber.org/zap"
@@ -48,12 +42,13 @@ import (

 // TiFlashReplicaStatus records status for each TiFlash replica.
 type TiFlashReplicaStatus struct {
-    ID             int64
-    Count          uint64
-    LocationLabels []string
-    Available      bool
-    HighPriority   bool
-    IsPartition    bool
+    ID                    int64
+    Count                 uint64
+    LocationLabels        []string
+    Available             bool
+    LogicalTableAvailable bool
+    HighPriority          bool
+    IsPartition           bool
 }

 // TiFlashTick is type for backoff threshold.
@@ -274,16 +269,16 @@ func GetTiFlashReplicaInfo(tblInfo *model.TableInfo, tableList *[]TiFlashReplica
         for _, p := range pi.Definitions {
             logutil.BgLogger().Debug(fmt.Sprintf("Table %v has partition %v\n", tblInfo.ID, p.ID))
             *tableList = append(*tableList, TiFlashReplicaStatus{p.ID,
-                tblInfo.TiFlashReplica.Count, tblInfo.TiFlashReplica.LocationLabels, tblInfo.TiFlashReplica.IsPartitionAvailable(p.ID), false, true})
+                tblInfo.TiFlashReplica.Count, tblInfo.TiFlashReplica.LocationLabels, tblInfo.TiFlashReplica.IsPartitionAvailable(p.ID), tblInfo.TiFlashReplica.Available, false, true})
         }
         // partitions that in adding mid-state
         for _, p := range pi.AddingDefinitions {
             logutil.BgLogger().Debug(fmt.Sprintf("Table %v has partition adding %v\n", tblInfo.ID, p.ID))
-            *tableList = append(*tableList, TiFlashReplicaStatus{p.ID, tblInfo.TiFlashReplica.Count, tblInfo.TiFlashReplica.LocationLabels, tblInfo.TiFlashReplica.IsPartitionAvailable(p.ID), true, true})
+            *tableList = append(*tableList, TiFlashReplicaStatus{p.ID, tblInfo.TiFlashReplica.Count, tblInfo.TiFlashReplica.LocationLabels, tblInfo.TiFlashReplica.IsPartitionAvailable(p.ID), tblInfo.TiFlashReplica.Available, true, true})
         }
     } else {
         logutil.BgLogger().Debug(fmt.Sprintf("Table %v has no partition\n", tblInfo.ID))
-        *tableList = append(*tableList, TiFlashReplicaStatus{tblInfo.ID, tblInfo.TiFlashReplica.Count, tblInfo.TiFlashReplica.LocationLabels, tblInfo.TiFlashReplica.Available, false, false})
+        *tableList = append(*tableList, TiFlashReplicaStatus{tblInfo.ID, tblInfo.TiFlashReplica.Count, tblInfo.TiFlashReplica.LocationLabels, tblInfo.TiFlashReplica.Available, tblInfo.TiFlashReplica.Available, false, false})
     }
 }

@@ -388,6 +383,21 @@ func (d *ddl) pollTiFlashReplicaStatus(ctx sessionctx.Context, pollTiFlashContex
         }
     }

+    failpoint.Inject("waitForAddPartition", func(val failpoint.Value) {
+        for _, phyTable := range tableList {
+            is := d.infoCache.GetLatest()
+            _, ok := is.TableByID(phyTable.ID)
+            if !ok {
+                tb, _, _ := is.FindTableByPartitionID(phyTable.ID)
+                if tb == nil {
+                    logutil.BgLogger().Info("[ddl] waitForAddPartition")
+                    sleepSecond := val.(int)
+                    time.Sleep(time.Duration(sleepSecond) * time.Second)
+                }
+            }
+        }
+    })
+
     for _, tb := range tableList {
         // For every region in each table, if it has one replica, we reckon it ready.
         // These request can be batched as an optimization.
@@ -396,7 +406,7 @@ func (d *ddl) pollTiFlashReplicaStatus(ctx sessionctx.Context, pollTiFlashContex
             available = val.(bool)
         })
         // We only check unavailable tables here, so doesn't include blocked add partition case.
-        if !available {
+        if !available && !tb.LogicalTableAvailable {
             allReplicaReady = false
             enabled, inqueue, _ := pollTiFlashContext.Backoff.Tick(tb.ID)
             if inqueue && !enabled {
@@ -463,110 +473,6 @@ func (d *ddl) pollTiFlashReplicaStatus(ctx sessionctx.Context, pollTiFlashContex
     return allReplicaReady, nil
 }

-func getDropOrTruncateTableTiflash(ctx sessionctx.Context, currentSchema infoschema.InfoSchema, tikvHelper *helper.Helper, replicaInfos *[]TiFlashReplicaStatus) error {
-    store := tikvHelper.Store.(kv.Storage)
-
-    txn, err := store.Begin()
-    if err != nil {
-        return errors.Trace(err)
-    }
-    gcSafePoint, err := gcutil.GetGCSafePoint(ctx)
-    if err != nil {
-        return err
-    }
-    uniqueIDMap := make(map[int64]struct{})
-    handleJobAndTableInfo := func(job *model.Job, tblInfo *model.TableInfo) (bool, error) {
-        // Avoid duplicate table ID info.
-        if _, ok := currentSchema.TableByID(tblInfo.ID); ok {
-            return false, nil
-        }
-        if _, ok := uniqueIDMap[tblInfo.ID]; ok {
-            return false, nil
-        }
-        uniqueIDMap[tblInfo.ID] = struct{}{}
-        GetTiFlashReplicaInfo(tblInfo, replicaInfos)
-        return false, nil
-    }
-    fn := func(jobs []*model.Job) (bool, error) {
-        getTable := func(StartTS uint64, SchemaID int64, TableID int64) (*model.TableInfo, error) {
-            snapMeta := meta.NewSnapshotMeta(store.GetSnapshot(kv.NewVersion(StartTS)))
-            if err != nil {
-                return nil, err
-            }
-            tbl, err := snapMeta.GetTable(SchemaID, TableID)
-            return tbl, err
-        }
-        return GetDropOrTruncateTableInfoFromJobsByStore(jobs, gcSafePoint, getTable, handleJobAndTableInfo)
-    }
-
-    err = IterAllDDLJobs(txn, fn)
-    if err != nil {
-        if terror.ErrorEqual(variable.ErrSnapshotTooOld, err) {
-            // The err indicate that current ddl job and remain DDL jobs was been deleted by GC,
-            // just ignore the error and return directly.
-            return nil
-        }
-        return err
-    }
-    return nil
-}
-
-// HandlePlacementRuleRoutine fetch all rules from pd, remove all obsolete rules.
-// It handles rare situation, when we fail to alter pd rules.
-func HandlePlacementRuleRoutine(ctx sessionctx.Context, d *ddl, tableList []TiFlashReplicaStatus) error {
-    c := context.Background()
-    tikvStore, ok := ctx.GetStore().(helper.Storage)
-    if !ok {
-        return errors.New("Can not get Helper")
-    }
-    tikvHelper := &helper.Helper{
-        Store:       tikvStore,
-        RegionCache: tikvStore.GetRegionCache(),
-    }
-
-    allRulesArr, err := infosync.GetTiFlashGroupRules(c, "tiflash")
-    if err != nil {
-        return errors.Trace(err)
-    }
-    allRules := make(map[string]placement.TiFlashRule)
-    for _, r := range allRulesArr {
-        allRules[r.ID] = r
-    }
-
-    start := time.Now()
-    originLen := len(tableList)
-    currentSchema := d.GetInfoSchemaWithInterceptor(ctx)
-    if err := getDropOrTruncateTableTiflash(ctx, currentSchema, tikvHelper, &tableList); err != nil {
-        // may fail when no `tikv_gc_safe_point` available, should return in order to remove valid pd rules.
-        logutil.BgLogger().Error("getDropOrTruncateTableTiflash returns error", zap.Error(err))
-        return errors.Trace(err)
-    }
-    elapsed := time.Since(start)
-    logutil.BgLogger().Info("getDropOrTruncateTableTiflash cost", zap.Duration("time", elapsed), zap.Int("updated", len(tableList)-originLen))
-    for _, tb := range tableList {
-        // For every region in each table, if it has one replica, we reckon it ready.
-        ruleID := fmt.Sprintf("table-%v-r", tb.ID)
-        if _, ok := allRules[ruleID]; !ok {
-            // Mostly because of a previous failure of setting pd rule.
-            logutil.BgLogger().Warn(fmt.Sprintf("Table %v exists, but there are no rule for it", tb.ID))
-            newRule := infosync.MakeNewRule(tb.ID, tb.Count, tb.LocationLabels)
-            _ = infosync.SetTiFlashPlacementRule(context.Background(), *newRule)
-        }
-        // For every existing table, we do not remove their rules.
-        delete(allRules, ruleID)
-    }
-
-    // Remove rules of non-existing table
-    for _, v := range allRules {
-        logutil.BgLogger().Info("Remove TiFlash rule", zap.String("id", v.ID))
-        if err := infosync.DeleteTiFlashPlacementRule(c, "tiflash", v.ID); err != nil {
-            logutil.BgLogger().Warn("delete TiFlash pd rule failed", zap.Error(err), zap.String("ruleID", v.ID))
-        }
-    }
-
-    return nil
-}
-
 func (d *ddl) PollTiFlashRoutine() {
     pollTiflashContext, err := NewTiFlashManagementContext()
     if err != nil {
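The retained hunks above are the substance of the fix: GetTiFlashReplicaInfo copies the logical table's Available flag into every physical entry as LogicalTableAvailable, and pollTiFlashReplicaStatus skips a physical table that is still syncing whenever its logical table is already available, leaving the new partition's progress to the ADD PARTITION job's own replica check. A minimal sketch of that decision, using simplified stand-in types rather than TiDB's actual structs:

```go
package main

import "fmt"

// replicaStatus is a simplified stand-in for ddl.TiFlashReplicaStatus after the
// patch: Available is the physical table's (partition's) own sync state, while
// LogicalTableAvailable is the Available flag of the logical table it belongs to.
type replicaStatus struct {
	ID                    int64
	Available             bool
	LogicalTableAvailable bool
}

// needsSync mirrors the new `!available && !tb.LogicalTableAvailable` condition:
// a partition that is still syncing is ignored when its logical table is already
// available, so ADD PARTITION can no longer flip an available table back to
// unavailable.
func needsSync(tb replicaStatus) bool {
	return !tb.Available && !tb.LogicalTableAvailable
}

func main() {
	// A partition just created by ADD PARTITION on an already-available table.
	adding := replicaStatus{ID: 102, Available: false, LogicalTableAvailable: true}
	// A brand-new table whose replica has not finished syncing anywhere.
	fresh := replicaStatus{ID: 200, Available: false, LogicalTableAvailable: false}

	fmt.Println(needsSync(adding)) // false: leave the available logical table alone
	fmt.Println(needsSync(fresh))  // true: keep polling until the replica is ready
}
```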
diff --git a/ddl/ddl_tiflash_test.go b/ddl/ddl_tiflash_test.go
index b1a05cc73af3a..8f4349bef6f15 100644
--- a/ddl/ddl_tiflash_test.go
+++ b/ddl/ddl_tiflash_test.go
@@ -1014,3 +1014,38 @@ func TestTiFlashGroupIndexWhenStartup(t *testing.T) {
     require.Greater(t, s.tiflash.GroupIndex, placement.RuleIndexTable)
     require.Greater(t, s.tiflash.GroupIndex, placement.RuleIndexPartition)
 }
+
+func TestTiFlashAvailableAfterAddPartition(t *testing.T) {
+    s, teardown := createTiFlashContext(t)
+    defer teardown()
+    tk := testkit.NewTestKit(t, s.store)
+
+    tk.MustExec("use test")
+    tk.MustExec("drop table if exists ddltiflash")
+    tk.MustExec("create table ddltiflash(z int) PARTITION BY RANGE(z) (PARTITION p0 VALUES LESS THAN (10))")
+    tk.MustExec("alter table ddltiflash set tiflash replica 1")
+    time.Sleep(ddl.PollTiFlashInterval * RoundToBeAvailable * 3)
+    CheckTableAvailable(s.dom, t, 1, []string{})
+
+    tb, err := s.dom.InfoSchema().TableByName(model.NewCIStr("test"), model.NewCIStr("ddltiflash"))
+    require.NoError(t, err)
+    require.NotNil(t, tb)
+
+    // still available after adding partition.
+    require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/ddl/sleepBeforeReplicaOnly", `return(2)`))
+    require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/ddl/waitForAddPartition", `return(3)`))
+    require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/ddl/PollTiFlashReplicaStatusReplaceCurAvailableValue", `return(false)`))
+    defer func() {
+        require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/ddl/sleepBeforeReplicaOnly"))
+        require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/ddl/waitForAddPartition"))
+        require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/ddl/PollTiFlashReplicaStatusReplaceCurAvailableValue"))
+    }()
+    tk.MustExec("ALTER TABLE ddltiflash ADD PARTITION (PARTITION pn VALUES LESS THAN (20))")
+    time.Sleep(ddl.PollTiFlashInterval * RoundToBeAvailable * 3)
+    CheckTableAvailable(s.dom, t, 1, []string{})
+    tb, err = s.dom.InfoSchema().TableByName(model.NewCIStr("test"), model.NewCIStr("ddltiflash"))
+    require.NoError(t, err)
+    pi := tb.Meta().GetPartitionInfo()
+    require.NotNil(t, pi)
+    require.Equal(t, len(pi.Definitions), 2)
+}
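The test reproduces the race by enabling three failpoints around ADD PARTITION. In particular, the waitForAddPartition failpoint added to the poller fires only when a physical table ID cannot be resolved through the latest schema, i.e. a partition that ADD PARTITION has created but not yet published. A toy sketch of that visibility check, with a map-based stand-in for the infoschema lookups (TableByID / FindTableByPartitionID):

```go
package main

import "fmt"

// toySchema is a simplified stand-in for the infoschema snapshot the poller
// consults: plain tables indexed by ID, plus a partition-ID -> parent-table-ID map.
type toySchema struct {
	tables            map[int64]string
	partitionToParent map[int64]int64
}

// isVisible mirrors the guard inside the waitForAddPartition failpoint: an ID is
// visible if it is a table itself or resolves to a parent table. An ID that is
// neither belongs to a partition still in the adding state, which is exactly the
// window in which the bug could mark the table unavailable.
func (s toySchema) isVisible(physicalID int64) bool {
	if _, ok := s.tables[physicalID]; ok {
		return true
	}
	_, ok := s.partitionToParent[physicalID]
	return ok
}

func main() {
	s := toySchema{
		tables:            map[int64]string{1: "ddltiflash"},
		partitionToParent: map[int64]int64{101: 1}, // p0 is already public
	}
	fmt.Println(s.isVisible(101)) // true: existing partition p0
	// Partition pn (ID 102) has been created by ADD PARTITION but not published
	// yet; the failpoint sleeps at this point so the poller overlaps the DDL job.
	fmt.Println(s.isVisible(102)) // false
}
```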
diff --git a/ddl/partition.go b/ddl/partition.go
index 1a93cfdfb7f9b..e073af97b8cfb 100644
--- a/ddl/partition.go
+++ b/ddl/partition.go
@@ -165,6 +165,10 @@ func (w *worker) onAddTablePartition(d *ddlCtx, t *meta.Meta, job *model.Job) (v
         job.SchemaState = model.StateReplicaOnly
     case model.StateReplicaOnly:
         // replica only -> public
+        failpoint.Inject("sleepBeforeReplicaOnly", func(val failpoint.Value) {
+            sleepSecond := val.(int)
+            time.Sleep(time.Duration(sleepSecond) * time.Second)
+        })
         // Here need do some tiflash replica complement check.
         // TODO: If a table is with no TiFlashReplica or it is not available, the replica-only state can be eliminated.
         if tblInfo.TiFlashReplica != nil && tblInfo.TiFlashReplica.Available {
diff --git a/store/gcworker/gc_worker.go b/store/gcworker/gc_worker.go
index 243af7509a9fe..bb62aa0dc8c22 100644
--- a/store/gcworker/gc_worker.go
+++ b/store/gcworker/gc_worker.go
@@ -1930,7 +1930,6 @@ func (w *GCWorker) doGCPlacementRules(safePoint uint64, dr util.DelRangeTask, gc
             zap.Int64("tableID", id), zap.String("endKey", string(dr.EndKey)), zap.Uint64("safePoint", safePoint))
         ruleID := fmt.Sprintf("table-%v-r", id)
         if err := infosync.DeleteTiFlashPlacementRule(context.Background(), "tiflash", ruleID); err != nil {
-            // If DeletePlacementRule fails here, the rule will be deleted in `HandlePlacementRuleRoutine`.
             logutil.BgLogger().Error("delete TiFlash pd rule failed when gc", zap.Error(err), zap.String("ruleID", ruleID), zap.Uint64("safePoint", safePoint))
         } else {