diff --git a/ddl/ddl_tiflash_api.go b/ddl/ddl_tiflash_api.go
index 7e456815acd8c..4b8fca2a91c0f 100644
--- a/ddl/ddl_tiflash_api.go
+++ b/ddl/ddl_tiflash_api.go
@@ -29,20 +29,14 @@ import (
 
 	"github.com/pingcap/errors"
 	"github.com/pingcap/failpoint"
-	"github.com/pingcap/tidb/ddl/placement"
 	ddlutil "github.com/pingcap/tidb/ddl/util"
 	"github.com/pingcap/tidb/domain/infosync"
 	"github.com/pingcap/tidb/infoschema"
-	"github.com/pingcap/tidb/kv"
-	"github.com/pingcap/tidb/meta"
 	"github.com/pingcap/tidb/parser/model"
-	"github.com/pingcap/tidb/parser/terror"
 	"github.com/pingcap/tidb/sessionctx"
-	"github.com/pingcap/tidb/sessionctx/variable"
 	"github.com/pingcap/tidb/store/helper"
 	"github.com/pingcap/tidb/table"
 	"github.com/pingcap/tidb/util"
-	"github.com/pingcap/tidb/util/gcutil"
 	"github.com/pingcap/tidb/util/logutil"
 	atomicutil "go.uber.org/atomic"
 	"go.uber.org/zap"
@@ -50,12 +44,13 @@ import (
 
 // TiFlashReplicaStatus records status for each TiFlash replica.
 type TiFlashReplicaStatus struct {
-	ID             int64
-	Count          uint64
-	LocationLabels []string
-	Available      bool
-	HighPriority   bool
-	IsPartition    bool
+	ID                    int64
+	Count                 uint64
+	LocationLabels        []string
+	Available             bool
+	LogicalTableAvailable bool
+	HighPriority          bool
+	IsPartition           bool
 }
 
 // TiFlashTick is type for backoff threshold.
@@ -283,16 +278,16 @@ func LoadTiFlashReplicaInfo(tblInfo *model.TableInfo, tableList *[]TiFlashReplic
 		for _, p := range pi.Definitions {
 			logutil.BgLogger().Debug(fmt.Sprintf("Table %v has partition %v\n", tblInfo.ID, p.ID))
 			*tableList = append(*tableList, TiFlashReplicaStatus{p.ID,
-				tblInfo.TiFlashReplica.Count, tblInfo.TiFlashReplica.LocationLabels, tblInfo.TiFlashReplica.IsPartitionAvailable(p.ID), false, true})
+				tblInfo.TiFlashReplica.Count, tblInfo.TiFlashReplica.LocationLabels, tblInfo.TiFlashReplica.IsPartitionAvailable(p.ID), tblInfo.TiFlashReplica.Available, false, true})
 		}
 		// partitions that in adding mid-state
 		for _, p := range pi.AddingDefinitions {
 			logutil.BgLogger().Debug(fmt.Sprintf("Table %v has partition adding %v\n", tblInfo.ID, p.ID))
-			*tableList = append(*tableList, TiFlashReplicaStatus{p.ID, tblInfo.TiFlashReplica.Count, tblInfo.TiFlashReplica.LocationLabels, tblInfo.TiFlashReplica.IsPartitionAvailable(p.ID), true, true})
+			*tableList = append(*tableList, TiFlashReplicaStatus{p.ID, tblInfo.TiFlashReplica.Count, tblInfo.TiFlashReplica.LocationLabels, tblInfo.TiFlashReplica.IsPartitionAvailable(p.ID), tblInfo.TiFlashReplica.Available, true, true})
 		}
 	} else {
 		logutil.BgLogger().Debug(fmt.Sprintf("Table %v has no partition\n", tblInfo.ID))
-		*tableList = append(*tableList, TiFlashReplicaStatus{tblInfo.ID, tblInfo.TiFlashReplica.Count, tblInfo.TiFlashReplica.LocationLabels, tblInfo.TiFlashReplica.Available, false, false})
+		*tableList = append(*tableList, TiFlashReplicaStatus{tblInfo.ID, tblInfo.TiFlashReplica.Count, tblInfo.TiFlashReplica.LocationLabels, tblInfo.TiFlashReplica.Available, tblInfo.TiFlashReplica.Available, false, false})
 	}
 }
 
@@ -355,22 +350,6 @@ func updateTiFlashStores(pollTiFlashContext *TiFlashManagementContext) error {
 	return nil
 }
 
-func getTiFlashPeerWithoutLagCount(pollTiFlashContext *TiFlashManagementContext, tableID int64) (int, error) {
-	// storeIDs -> regionID, PD will not create two peer on the same store
-	var flashPeerCount int
-	for _, store := range pollTiFlashContext.TiFlashStores {
-		regionReplica := make(map[int64]int)
-		err := helper.CollectTiFlashStatus(store.Store.StatusAddress, tableID, &regionReplica)
-		if err != nil {
-			logutil.BgLogger().Error("Fail to get peer status from TiFlash.",
-				zap.Int64("tableID", tableID))
-			return 0, err
-		}
-		flashPeerCount += len(regionReplica)
-	}
-	return flashPeerCount, nil
-}
-
 func pollAvailableTableProgress(schemas infoschema.InfoSchema, ctx sessionctx.Context, pollTiFlashContext *TiFlashManagementContext) {
 	pollMaxCount := RefreshProgressMaxTableCount
 	failpoint.Inject("PollAvailableTableProgressMaxCount", func(val failpoint.Value) {
@@ -466,6 +445,21 @@ func (d *ddl) refreshTiFlashTicker(ctx sessionctx.Context, pollTiFlashContext *T
 		}
 	}
 
+	failpoint.Inject("waitForAddPartition", func(val failpoint.Value) {
+		for _, phyTable := range tableList {
+			is := d.infoCache.GetLatest()
+			_, ok := is.TableByID(phyTable.ID)
+			if !ok {
+				tb, _, _ := is.FindTableByPartitionID(phyTable.ID)
+				if tb == nil {
+					logutil.BgLogger().Info("[ddl] waitForAddPartition")
+					sleepSecond := val.(int)
+					time.Sleep(time.Duration(sleepSecond) * time.Second)
+				}
+			}
+		}
+	})
+
 	needPushPending := false
 	if pollTiFlashContext.UpdatingProgressTables.Len() == 0 {
 		needPushPending = true
@@ -479,7 +473,7 @@ func (d *ddl) refreshTiFlashTicker(ctx sessionctx.Context, pollTiFlashContext *T
 			available = val.(bool)
 		})
 		// We only check unavailable tables here, so doesn't include blocked add partition case.
-		if !available {
+		if !available && !tb.LogicalTableAvailable {
 			enabled, inqueue, _ := pollTiFlashContext.Backoff.Tick(tb.ID)
 			if inqueue && !enabled {
 				logutil.BgLogger().Info("Escape checking available status due to backoff", zap.Int64("tableId", tb.ID))
@@ -540,110 +534,6 @@ func (d *ddl) refreshTiFlashTicker(ctx sessionctx.Context, pollTiFlashContext *T
 	return nil
 }
 
-func getDropOrTruncateTableTiflash(ctx sessionctx.Context, currentSchema infoschema.InfoSchema, tikvHelper *helper.Helper, replicaInfos *[]TiFlashReplicaStatus) error {
-	store := tikvHelper.Store.(kv.Storage)
-
-	txn, err := store.Begin()
-	if err != nil {
-		return errors.Trace(err)
-	}
-	gcSafePoint, err := gcutil.GetGCSafePoint(ctx)
-	if err != nil {
-		return err
-	}
-	uniqueIDMap := make(map[int64]struct{})
-	handleJobAndTableInfo := func(job *model.Job, tblInfo *model.TableInfo) (bool, error) {
-		// Avoid duplicate table ID info.
-		if _, ok := currentSchema.TableByID(tblInfo.ID); ok {
-			return false, nil
-		}
-		if _, ok := uniqueIDMap[tblInfo.ID]; ok {
-			return false, nil
-		}
-		uniqueIDMap[tblInfo.ID] = struct{}{}
-		LoadTiFlashReplicaInfo(tblInfo, replicaInfos)
-		return false, nil
-	}
-	fn := func(jobs []*model.Job) (bool, error) {
-		getTable := func(StartTS uint64, SchemaID int64, TableID int64) (*model.TableInfo, error) {
-			snapMeta := meta.NewSnapshotMeta(store.GetSnapshot(kv.NewVersion(StartTS)))
-			if err != nil {
-				return nil, err
-			}
-			tbl, err := snapMeta.GetTable(SchemaID, TableID)
-			return tbl, err
-		}
-		return GetDropOrTruncateTableInfoFromJobsByStore(jobs, gcSafePoint, getTable, handleJobAndTableInfo)
-	}
-
-	err = IterAllDDLJobs(ctx, txn, fn)
-	if err != nil {
-		if terror.ErrorEqual(variable.ErrSnapshotTooOld, err) {
-			// The err indicate that current ddl job and remain DDL jobs was been deleted by GC,
-			// just ignore the error and return directly.
-			return nil
-		}
-		return err
-	}
-	return nil
-}
-
-// HandlePlacementRuleRoutine fetch all rules from pd, remove all obsolete rules.
-// It handles rare situation, when we fail to alter pd rules.
-func HandlePlacementRuleRoutine(ctx sessionctx.Context, d *ddl, tableList []TiFlashReplicaStatus) error {
-	c := context.Background()
-	tikvStore, ok := ctx.GetStore().(helper.Storage)
-	if !ok {
-		return errors.New("Can not get Helper")
-	}
-	tikvHelper := &helper.Helper{
-		Store:       tikvStore,
-		RegionCache: tikvStore.GetRegionCache(),
-	}
-
-	allRulesArr, err := infosync.GetTiFlashGroupRules(c, "tiflash")
-	if err != nil {
-		return errors.Trace(err)
-	}
-	allRules := make(map[string]placement.TiFlashRule)
-	for _, r := range allRulesArr {
-		allRules[r.ID] = r
-	}
-
-	start := time.Now()
-	originLen := len(tableList)
-	currentSchema := d.GetInfoSchemaWithInterceptor(ctx)
-	if err := getDropOrTruncateTableTiflash(ctx, currentSchema, tikvHelper, &tableList); err != nil {
-		// may fail when no `tikv_gc_safe_point` available, should return in order to remove valid pd rules.
-		logutil.BgLogger().Error("getDropOrTruncateTableTiflash returns error", zap.Error(err))
-		return errors.Trace(err)
-	}
-	elapsed := time.Since(start)
-	logutil.BgLogger().Info("getDropOrTruncateTableTiflash cost", zap.Duration("time", elapsed), zap.Int("updated", len(tableList)-originLen))
-	for _, tb := range tableList {
-		// For every region in each table, if it has one replica, we reckon it ready.
-		ruleID := fmt.Sprintf("table-%v-r", tb.ID)
-		if _, ok := allRules[ruleID]; !ok {
-			// Mostly because of a previous failure of setting pd rule.
-			logutil.BgLogger().Warn(fmt.Sprintf("Table %v exists, but there are no rule for it", tb.ID))
-			newRule := infosync.MakeNewRule(tb.ID, tb.Count, tb.LocationLabels)
-			_ = infosync.SetTiFlashPlacementRule(context.Background(), *newRule)
-		}
-		// For every existing table, we do not remove their rules.
-		delete(allRules, ruleID)
-	}
-
-	// Remove rules of non-existing table
-	for _, v := range allRules {
-		logutil.BgLogger().Info("Remove TiFlash rule", zap.String("id", v.ID))
-		if err := infosync.DeleteTiFlashPlacementRule(c, "tiflash", v.ID); err != nil {
-			logutil.BgLogger().Warn("delete TiFlash pd rule failed", zap.Error(err), zap.String("ruleID", v.ID))
-		}
-	}
-
-	return nil
-}
-
 func (d *ddl) PollTiFlashRoutine() {
 	pollTiflashContext, err := NewTiFlashManagementContext()
 	if err != nil {
diff --git a/ddl/ddl_tiflash_test.go b/ddl/ddl_tiflash_test.go
index 27d05112df483..accf7cc038ebd 100644
--- a/ddl/ddl_tiflash_test.go
+++ b/ddl/ddl_tiflash_test.go
@@ -1264,3 +1264,38 @@ func TestTiFlashPartitionNotAvailable(t *testing.T) {
 	require.NotNil(t, replica)
 	require.True(t, replica.Available)
 }
+
+func TestTiFlashAvailableAfterAddPartition(t *testing.T) {
+	s, teardown := createTiFlashContext(t)
+	defer teardown()
+	tk := testkit.NewTestKit(t, s.store)
+
+	tk.MustExec("use test")
+	tk.MustExec("drop table if exists ddltiflash")
+	tk.MustExec("create table ddltiflash(z int) PARTITION BY RANGE(z) (PARTITION p0 VALUES LESS THAN (10))")
+	tk.MustExec("alter table ddltiflash set tiflash replica 1")
+	time.Sleep(ddl.PollTiFlashInterval * RoundToBeAvailable * 3)
+	CheckTableAvailable(s.dom, t, 1, []string{})
+
+	tb, err := s.dom.InfoSchema().TableByName(model.NewCIStr("test"), model.NewCIStr("ddltiflash"))
+	require.NoError(t, err)
+	require.NotNil(t, tb)
+
+	// still available after adding partition.
+	require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/ddl/sleepBeforeReplicaOnly", `return(2)`))
+	require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/ddl/waitForAddPartition", `return(3)`))
+	require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/ddl/PollTiFlashReplicaStatusReplaceCurAvailableValue", `return(false)`))
+	defer func() {
+		require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/ddl/sleepBeforeReplicaOnly"))
+		require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/ddl/waitForAddPartition"))
+		require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/ddl/PollTiFlashReplicaStatusReplaceCurAvailableValue"))
+	}()
+	tk.MustExec("ALTER TABLE ddltiflash ADD PARTITION (PARTITION pn VALUES LESS THAN (20))")
+	time.Sleep(ddl.PollTiFlashInterval * RoundToBeAvailable * 3)
+	CheckTableAvailable(s.dom, t, 1, []string{})
+	tb, err = s.dom.InfoSchema().TableByName(model.NewCIStr("test"), model.NewCIStr("ddltiflash"))
+	require.NoError(t, err)
+	pi := tb.Meta().GetPartitionInfo()
+	require.NotNil(t, pi)
+	require.Equal(t, len(pi.Definitions), 2)
+}
diff --git a/ddl/partition.go b/ddl/partition.go
index 7bba0b1006332..a8947d091bfc5 100644
--- a/ddl/partition.go
+++ b/ddl/partition.go
@@ -170,6 +170,10 @@ func (w *worker) onAddTablePartition(d *ddlCtx, t *meta.Meta, job *model.Job) (v
 		job.SchemaState = model.StateReplicaOnly
 	case model.StateReplicaOnly:
 		// replica only -> public
+		failpoint.Inject("sleepBeforeReplicaOnly", func(val failpoint.Value) {
+			sleepSecond := val.(int)
+			time.Sleep(time.Duration(sleepSecond) * time.Second)
+		})
 		// Here need do some tiflash replica complement check.
 		// TODO: If a table is with no TiFlashReplica or it is not available, the replica-only state can be eliminated.
 		if tblInfo.TiFlashReplica != nil && tblInfo.TiFlashReplica.Available {
@@ -193,6 +197,15 @@ func (w *worker) onAddTablePartition(d *ddlCtx, t *meta.Meta, job *model.Job) (v
 		if tblInfo.TiFlashReplica != nil && tblInfo.TiFlashReplica.Available {
 			for _, d := range partInfo.Definitions {
 				tblInfo.TiFlashReplica.AvailablePartitionIDs = append(tblInfo.TiFlashReplica.AvailablePartitionIDs, d.ID)
+				err = infosync.UpdateTiFlashProgressCache(d.ID, 1)
+				if err != nil {
+					// just print log, progress will be updated in `refreshTiFlashTicker`
+					logutil.BgLogger().Error("update tiflash sync progress cache failed",
+						zap.Error(err),
+						zap.Int64("tableID", tblInfo.ID),
+						zap.Int64("partitionID", d.ID),
+					)
+				}
 			}
 		}
 		// For normal and replica finished table, move the `addingDefinitions` into `Definitions`.
diff --git a/store/gcworker/gc_worker.go b/store/gcworker/gc_worker.go
index 92c3b535ba5d3..054ec83ee7e8a 100644
--- a/store/gcworker/gc_worker.go
+++ b/store/gcworker/gc_worker.go
@@ -2028,7 +2028,6 @@ func (w *GCWorker) doGCPlacementRules(se session.Session, safePoint uint64, dr u
 			zap.Int64("tableID", id), zap.String("endKey", string(dr.EndKey)), zap.Uint64("safePoint", safePoint))
 		ruleID := fmt.Sprintf("table-%v-r", id)
 		if err := infosync.DeleteTiFlashPlacementRule(context.Background(), "tiflash", ruleID); err != nil {
-			// If DeletePlacementRule fails here, the rule will be deleted in `HandlePlacementRuleRoutine`.
 			logutil.BgLogger().Error("delete TiFlash pd rule failed when gc", zap.Error(err), zap.String("ruleID", ruleID), zap.Uint64("safePoint", safePoint))
 		} else {