Skip to content

Commit

Permalink
DDL: Fix bug that TiFlash replica unavailable after add partition wit…
Browse files Browse the repository at this point in the history
…h small probability (#39170) (#39195)

close #39171
  • Loading branch information
ti-chi-bot authored Jan 16, 2023
1 parent cfdd74f commit 67705e6
Show file tree
Hide file tree
Showing 4 changed files with 65 additions and 121 deletions.
146 changes: 26 additions & 120 deletions ddl/ddl_tiflash_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,32 +28,27 @@ import (

"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/ddl/placement"
ddlutil "github.com/pingcap/tidb/ddl/util"
"github.com/pingcap/tidb/domain/infosync"
"github.com/pingcap/tidb/infoschema"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/meta"
"github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/parser/terror"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/store/helper"
"github.com/pingcap/tidb/util"
"github.com/pingcap/tidb/util/gcutil"
"github.com/pingcap/tidb/util/logutil"
atomicutil "go.uber.org/atomic"
"go.uber.org/zap"
)

// TiFlashReplicaStatus records status for each TiFlash replica.
type TiFlashReplicaStatus struct {
ID int64
Count uint64
LocationLabels []string
Available bool
HighPriority bool
IsPartition bool
ID int64
Count uint64
LocationLabels []string
Available bool
LogicalTableAvailable bool
HighPriority bool
IsPartition bool
}

// TiFlashTick is type for backoff threshold.
Expand Down Expand Up @@ -274,16 +269,16 @@ func GetTiFlashReplicaInfo(tblInfo *model.TableInfo, tableList *[]TiFlashReplica
for _, p := range pi.Definitions {
logutil.BgLogger().Debug(fmt.Sprintf("Table %v has partition %v\n", tblInfo.ID, p.ID))
*tableList = append(*tableList, TiFlashReplicaStatus{p.ID,
tblInfo.TiFlashReplica.Count, tblInfo.TiFlashReplica.LocationLabels, tblInfo.TiFlashReplica.IsPartitionAvailable(p.ID), false, true})
tblInfo.TiFlashReplica.Count, tblInfo.TiFlashReplica.LocationLabels, tblInfo.TiFlashReplica.IsPartitionAvailable(p.ID), tblInfo.TiFlashReplica.Available, false, true})
}
// partitions that in adding mid-state
for _, p := range pi.AddingDefinitions {
logutil.BgLogger().Debug(fmt.Sprintf("Table %v has partition adding %v\n", tblInfo.ID, p.ID))
*tableList = append(*tableList, TiFlashReplicaStatus{p.ID, tblInfo.TiFlashReplica.Count, tblInfo.TiFlashReplica.LocationLabels, tblInfo.TiFlashReplica.IsPartitionAvailable(p.ID), true, true})
*tableList = append(*tableList, TiFlashReplicaStatus{p.ID, tblInfo.TiFlashReplica.Count, tblInfo.TiFlashReplica.LocationLabels, tblInfo.TiFlashReplica.IsPartitionAvailable(p.ID), tblInfo.TiFlashReplica.Available, true, true})
}
} else {
logutil.BgLogger().Debug(fmt.Sprintf("Table %v has no partition\n", tblInfo.ID))
*tableList = append(*tableList, TiFlashReplicaStatus{tblInfo.ID, tblInfo.TiFlashReplica.Count, tblInfo.TiFlashReplica.LocationLabels, tblInfo.TiFlashReplica.Available, false, false})
*tableList = append(*tableList, TiFlashReplicaStatus{tblInfo.ID, tblInfo.TiFlashReplica.Count, tblInfo.TiFlashReplica.LocationLabels, tblInfo.TiFlashReplica.Available, tblInfo.TiFlashReplica.Available, false, false})
}
}

Expand Down Expand Up @@ -388,6 +383,21 @@ func (d *ddl) pollTiFlashReplicaStatus(ctx sessionctx.Context, pollTiFlashContex
}
}

failpoint.Inject("waitForAddPartition", func(val failpoint.Value) {
for _, phyTable := range tableList {
is := d.infoCache.GetLatest()
_, ok := is.TableByID(phyTable.ID)
if !ok {
tb, _, _ := is.FindTableByPartitionID(phyTable.ID)
if tb == nil {
logutil.BgLogger().Info("[ddl] waitForAddPartition")
sleepSecond := val.(int)
time.Sleep(time.Duration(sleepSecond) * time.Second)
}
}
}
})

for _, tb := range tableList {
// For every region in each table, if it has one replica, we reckon it ready.
// These request can be batched as an optimization.
Expand All @@ -396,7 +406,7 @@ func (d *ddl) pollTiFlashReplicaStatus(ctx sessionctx.Context, pollTiFlashContex
available = val.(bool)
})
// We only check unavailable tables here, so doesn't include blocked add partition case.
if !available {
if !available && !tb.LogicalTableAvailable {
allReplicaReady = false
enabled, inqueue, _ := pollTiFlashContext.Backoff.Tick(tb.ID)
if inqueue && !enabled {
Expand Down Expand Up @@ -463,110 +473,6 @@ func (d *ddl) pollTiFlashReplicaStatus(ctx sessionctx.Context, pollTiFlashContex
return allReplicaReady, nil
}

func getDropOrTruncateTableTiflash(ctx sessionctx.Context, currentSchema infoschema.InfoSchema, tikvHelper *helper.Helper, replicaInfos *[]TiFlashReplicaStatus) error {
store := tikvHelper.Store.(kv.Storage)

txn, err := store.Begin()
if err != nil {
return errors.Trace(err)
}
gcSafePoint, err := gcutil.GetGCSafePoint(ctx)
if err != nil {
return err
}
uniqueIDMap := make(map[int64]struct{})
handleJobAndTableInfo := func(job *model.Job, tblInfo *model.TableInfo) (bool, error) {
// Avoid duplicate table ID info.
if _, ok := currentSchema.TableByID(tblInfo.ID); ok {
return false, nil
}
if _, ok := uniqueIDMap[tblInfo.ID]; ok {
return false, nil
}
uniqueIDMap[tblInfo.ID] = struct{}{}
GetTiFlashReplicaInfo(tblInfo, replicaInfos)
return false, nil
}
fn := func(jobs []*model.Job) (bool, error) {
getTable := func(StartTS uint64, SchemaID int64, TableID int64) (*model.TableInfo, error) {
snapMeta := meta.NewSnapshotMeta(store.GetSnapshot(kv.NewVersion(StartTS)))
if err != nil {
return nil, err
}
tbl, err := snapMeta.GetTable(SchemaID, TableID)
return tbl, err
}
return GetDropOrTruncateTableInfoFromJobsByStore(jobs, gcSafePoint, getTable, handleJobAndTableInfo)
}

err = IterAllDDLJobs(txn, fn)
if err != nil {
if terror.ErrorEqual(variable.ErrSnapshotTooOld, err) {
// The err indicate that current ddl job and remain DDL jobs was been deleted by GC,
// just ignore the error and return directly.
return nil
}
return err
}
return nil
}

// HandlePlacementRuleRoutine fetch all rules from pd, remove all obsolete rules.
// It handles rare situation, when we fail to alter pd rules.
func HandlePlacementRuleRoutine(ctx sessionctx.Context, d *ddl, tableList []TiFlashReplicaStatus) error {
c := context.Background()
tikvStore, ok := ctx.GetStore().(helper.Storage)
if !ok {
return errors.New("Can not get Helper")
}
tikvHelper := &helper.Helper{
Store: tikvStore,
RegionCache: tikvStore.GetRegionCache(),
}

allRulesArr, err := infosync.GetTiFlashGroupRules(c, "tiflash")
if err != nil {
return errors.Trace(err)
}
allRules := make(map[string]placement.TiFlashRule)
for _, r := range allRulesArr {
allRules[r.ID] = r
}

start := time.Now()
originLen := len(tableList)
currentSchema := d.GetInfoSchemaWithInterceptor(ctx)
if err := getDropOrTruncateTableTiflash(ctx, currentSchema, tikvHelper, &tableList); err != nil {
// may fail when no `tikv_gc_safe_point` available, should return in order to remove valid pd rules.
logutil.BgLogger().Error("getDropOrTruncateTableTiflash returns error", zap.Error(err))
return errors.Trace(err)
}
elapsed := time.Since(start)
logutil.BgLogger().Info("getDropOrTruncateTableTiflash cost", zap.Duration("time", elapsed), zap.Int("updated", len(tableList)-originLen))
for _, tb := range tableList {
// For every region in each table, if it has one replica, we reckon it ready.
ruleID := fmt.Sprintf("table-%v-r", tb.ID)
if _, ok := allRules[ruleID]; !ok {
// Mostly because of a previous failure of setting pd rule.
logutil.BgLogger().Warn(fmt.Sprintf("Table %v exists, but there are no rule for it", tb.ID))
newRule := infosync.MakeNewRule(tb.ID, tb.Count, tb.LocationLabels)
_ = infosync.SetTiFlashPlacementRule(context.Background(), *newRule)
}
// For every existing table, we do not remove their rules.
delete(allRules, ruleID)
}

// Remove rules of non-existing table
for _, v := range allRules {
logutil.BgLogger().Info("Remove TiFlash rule", zap.String("id", v.ID))
if err := infosync.DeleteTiFlashPlacementRule(c, "tiflash", v.ID); err != nil {
logutil.BgLogger().Warn("delete TiFlash pd rule failed", zap.Error(err), zap.String("ruleID", v.ID))
}
}

return nil
}

func (d *ddl) PollTiFlashRoutine() {
pollTiflashContext, err := NewTiFlashManagementContext()
if err != nil {
Expand Down
35 changes: 35 additions & 0 deletions ddl/ddl_tiflash_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1014,3 +1014,38 @@ func TestTiFlashGroupIndexWhenStartup(t *testing.T) {
require.Greater(t, s.tiflash.GroupIndex, placement.RuleIndexTable)
require.Greater(t, s.tiflash.GroupIndex, placement.RuleIndexPartition)
}

func TestTiFlashAvailableAfterAddPartition(t *testing.T) {
s, teardown := createTiFlashContext(t)
defer teardown()
tk := testkit.NewTestKit(t, s.store)

tk.MustExec("use test")
tk.MustExec("drop table if exists ddltiflash")
tk.MustExec("create table ddltiflash(z int) PARTITION BY RANGE(z) (PARTITION p0 VALUES LESS THAN (10))")
tk.MustExec("alter table ddltiflash set tiflash replica 1")
time.Sleep(ddl.PollTiFlashInterval * RoundToBeAvailable * 3)
CheckTableAvailable(s.dom, t, 1, []string{})

tb, err := s.dom.InfoSchema().TableByName(model.NewCIStr("test"), model.NewCIStr("ddltiflash"))
require.NoError(t, err)
require.NotNil(t, tb)

// still available after adding partition.
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/ddl/sleepBeforeReplicaOnly", `return(2)`))
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/ddl/waitForAddPartition", `return(3)`))
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/ddl/PollTiFlashReplicaStatusReplaceCurAvailableValue", `return(false)`))
defer func() {
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/ddl/sleepBeforeReplicaOnly"))
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/ddl/waitForAddPartition"))
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/ddl/PollTiFlashReplicaStatusReplaceCurAvailableValue"))
}()
tk.MustExec("ALTER TABLE ddltiflash ADD PARTITION (PARTITION pn VALUES LESS THAN (20))")
time.Sleep(ddl.PollTiFlashInterval * RoundToBeAvailable * 3)
CheckTableAvailable(s.dom, t, 1, []string{})
tb, err = s.dom.InfoSchema().TableByName(model.NewCIStr("test"), model.NewCIStr("ddltiflash"))
require.NoError(t, err)
pi := tb.Meta().GetPartitionInfo()
require.NotNil(t, pi)
require.Equal(t, len(pi.Definitions), 2)
}
4 changes: 4 additions & 0 deletions ddl/partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,10 @@ func (w *worker) onAddTablePartition(d *ddlCtx, t *meta.Meta, job *model.Job) (v
job.SchemaState = model.StateReplicaOnly
case model.StateReplicaOnly:
// replica only -> public
failpoint.Inject("sleepBeforeReplicaOnly", func(val failpoint.Value) {
sleepSecond := val.(int)
time.Sleep(time.Duration(sleepSecond) * time.Second)
})
// Here need do some tiflash replica complement check.
// TODO: If a table is with no TiFlashReplica or it is not available, the replica-only state can be eliminated.
if tblInfo.TiFlashReplica != nil && tblInfo.TiFlashReplica.Available {
Expand Down
1 change: 0 additions & 1 deletion store/gcworker/gc_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -1930,7 +1930,6 @@ func (w *GCWorker) doGCPlacementRules(safePoint uint64, dr util.DelRangeTask, gc
zap.Int64("tableID", id), zap.String("endKey", string(dr.EndKey)), zap.Uint64("safePoint", safePoint))
ruleID := fmt.Sprintf("table-%v-r", id)
if err := infosync.DeleteTiFlashPlacementRule(context.Background(), "tiflash", ruleID); err != nil {
// If DeletePlacementRule fails here, the rule will be deleted in `HandlePlacementRuleRoutine`.
logutil.BgLogger().Error("delete TiFlash pd rule failed when gc",
zap.Error(err), zap.String("ruleID", ruleID), zap.Uint64("safePoint", safePoint))
} else {
Expand Down

0 comments on commit 67705e6

Please sign in to comment.