Skip to content

Commit

Permalink
DDL: delete partition table tiflash replica progress from ETCD when t…
Browse files Browse the repository at this point in the history
…runcate table, drop table, set tiflash replica 0 (#38551)

close #38550
  • Loading branch information
hehechen authored Oct 20, 2022
1 parent 431a9d8 commit c52b5a7
Show file tree
Hide file tree
Showing 4 changed files with 93 additions and 10 deletions.
62 changes: 61 additions & 1 deletion ddl/ddl_tiflash_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -983,13 +983,19 @@ func TestTiFlashProgress(t *testing.T) {
}
_ = infosync.UpdateTiFlashTableSyncProgress(context.TODO(), tb.Meta().ID, "5.0")
mustExist(tb.Meta().ID)
_ = infosync.DeleteTiFlashTableSyncProgress(tb.Meta().ID)
_ = infosync.DeleteTiFlashTableSyncProgress(tb.Meta())
mustAbsent(tb.Meta().ID)

_ = infosync.UpdateTiFlashTableSyncProgress(context.TODO(), tb.Meta().ID, "5.0")
tk.MustExec("truncate table tiflash_d.t")
mustAbsent(tb.Meta().ID)

tb, _ = s.dom.InfoSchema().TableByName(model.NewCIStr("tiflash_d"), model.NewCIStr("t"))
_ = infosync.UpdateTiFlashTableSyncProgress(context.TODO(), tb.Meta().ID, "5.0")
tk.MustExec("alter table tiflash_d.t set tiflash replica 0")
mustAbsent(tb.Meta().ID)
tk.MustExec("alter table tiflash_d.t set tiflash replica 1")

tb, _ = s.dom.InfoSchema().TableByName(model.NewCIStr("tiflash_d"), model.NewCIStr("t"))
_ = infosync.UpdateTiFlashTableSyncProgress(context.TODO(), tb.Meta().ID, "5.0")
tk.MustExec("drop table tiflash_d.t")
Expand All @@ -998,6 +1004,60 @@ func TestTiFlashProgress(t *testing.T) {
time.Sleep(100 * time.Millisecond)
}

func TestTiFlashProgressForPartitionTable(t *testing.T) {
s, teardown := createTiFlashContext(t)
s.tiflash.NotAvailable = true
defer teardown()
tk := testkit.NewTestKit(t, s.store)

integration.BeforeTest(t, integration.WithoutGoLeakDetection())
cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
defer cluster.Terminate(t)

save := infosync.GetEtcdClient()
defer infosync.SetEtcdClient(save)
infosync.SetEtcdClient(cluster.Client(0))
tk.MustExec("create database tiflash_d")
tk.MustExec("create table tiflash_d.t(z int) PARTITION BY RANGE(z) (PARTITION p0 VALUES LESS THAN (10))")
tk.MustExec("alter table tiflash_d.t set tiflash replica 1")
tb, err := s.dom.InfoSchema().TableByName(model.NewCIStr("tiflash_d"), model.NewCIStr("t"))
require.NoError(t, err)
require.NotNil(t, tb)
mustExist := func(tid int64) {
pm, err := infosync.GetTiFlashTableSyncProgress(context.TODO())
require.NoError(t, err)
_, ok := pm[tid]
require.True(t, ok)
}
mustAbsent := func(tid int64) {
pm, err := infosync.GetTiFlashTableSyncProgress(context.TODO())
require.NoError(t, err)
_, ok := pm[tid]
require.False(t, ok)
}
time.Sleep(ddl.PollTiFlashInterval * RoundToBeAvailable)
mustExist(tb.Meta().Partition.Definitions[0].ID)
_ = infosync.DeleteTiFlashTableSyncProgress(tb.Meta())
mustAbsent(tb.Meta().Partition.Definitions[0].ID)

_ = infosync.UpdateTiFlashTableSyncProgress(context.TODO(), tb.Meta().Partition.Definitions[0].ID, "5.0")
tk.MustExec("truncate table tiflash_d.t")
mustAbsent(tb.Meta().Partition.Definitions[0].ID)

tb, _ = s.dom.InfoSchema().TableByName(model.NewCIStr("tiflash_d"), model.NewCIStr("t"))
_ = infosync.UpdateTiFlashTableSyncProgress(context.TODO(), tb.Meta().Partition.Definitions[0].ID, "5.0")
tk.MustExec("alter table tiflash_d.t set tiflash replica 0")
mustAbsent(tb.Meta().Partition.Definitions[0].ID)
tk.MustExec("alter table tiflash_d.t set tiflash replica 1")

tb, _ = s.dom.InfoSchema().TableByName(model.NewCIStr("tiflash_d"), model.NewCIStr("t"))
_ = infosync.UpdateTiFlashTableSyncProgress(context.TODO(), tb.Meta().Partition.Definitions[0].ID, "5.0")
tk.MustExec("drop table tiflash_d.t")
mustAbsent(tb.Meta().Partition.Definitions[0].ID)

time.Sleep(100 * time.Millisecond)
}

func TestTiFlashGroupIndexWhenStartup(t *testing.T) {
s, teardown := createTiFlashContext(t)
tiflash := s.tiflash
Expand Down
20 changes: 15 additions & 5 deletions ddl/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -372,7 +372,7 @@ func onDropTableOrView(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, _ er
}
}
if tblInfo.TiFlashReplica != nil {
e := infosync.DeleteTiFlashTableSyncProgress(tblInfo.ID)
e := infosync.DeleteTiFlashTableSyncProgress(tblInfo)
if e != nil {
logutil.BgLogger().Error("DeleteTiFlashTableSyncProgress fails", zap.Error(e))
}
Expand Down Expand Up @@ -709,6 +709,14 @@ func onTruncateTable(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, _ erro
}
})

// Clear the TiFlash replica progress from ETCD.
if tblInfo.TiFlashReplica != nil {
e := infosync.DeleteTiFlashTableSyncProgress(tblInfo)
if e != nil {
logutil.BgLogger().Error("DeleteTiFlashTableSyncProgress fails", zap.Error(e))
}
}

var oldPartitionIDs []int64
if tblInfo.GetPartitionInfo() != nil {
oldPartitionIDs = getPartitionIDs(tblInfo)
Expand Down Expand Up @@ -748,10 +756,6 @@ func onTruncateTable(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, _ erro

// Clear the TiFlash replica available status.
if tblInfo.TiFlashReplica != nil {
e := infosync.DeleteTiFlashTableSyncProgress(tblInfo.ID)
if e != nil {
logutil.BgLogger().Error("DeleteTiFlashTableSyncProgress fails", zap.Error(e))
}
// Set PD rules for TiFlash
if pi := tblInfo.GetPartitionInfo(); pi != nil {
if e := infosync.ConfigureTiFlashPDForPartitions(true, &pi.Definitions, tblInfo.TiFlashReplica.Count, &tblInfo.TiFlashReplica.LocationLabels, tblInfo.ID); e != nil {
Expand Down Expand Up @@ -1271,6 +1275,12 @@ func (w *worker) onSetTableFlashReplica(d *ddlCtx, t *meta.Meta, job *model.Job)
Available: available,
}
} else {
if tblInfo.TiFlashReplica != nil {
err = infosync.DeleteTiFlashTableSyncProgress(tblInfo)
if err != nil {
logutil.BgLogger().Error("DeleteTiFlashTableSyncProgress fails", zap.Error(err))
}
}
tblInfo.TiFlashReplica = nil
}

Expand Down
17 changes: 14 additions & 3 deletions domain/infosync/info.go
Original file line number Diff line number Diff line change
Expand Up @@ -352,16 +352,27 @@ func UpdateTiFlashTableSyncProgress(ctx context.Context, tid int64, progressStri
}

// DeleteTiFlashTableSyncProgress is used to delete the tiflash table replica sync progress.
func DeleteTiFlashTableSyncProgress(tid int64) error {
func DeleteTiFlashTableSyncProgress(tableInfo *model.TableInfo) error {
is, err := getGlobalInfoSyncer()
if err != nil {
return err
}
if is.etcdCli == nil {
return nil
}
key := fmt.Sprintf("%s/%v", TiFlashTableSyncProgressPath, tid)
return util.DeleteKeyFromEtcd(key, is.etcdCli, keyOpDefaultRetryCnt, keyOpDefaultTimeout)
if pi := tableInfo.GetPartitionInfo(); pi != nil {
for _, p := range pi.Definitions {
key := fmt.Sprintf("%s/%v", TiFlashTableSyncProgressPath, p.ID)
err = util.DeleteKeyFromEtcd(key, is.etcdCli, keyOpDefaultRetryCnt, keyOpDefaultTimeout)
if err != nil {
return err
}
}
} else {
key := fmt.Sprintf("%s/%v", TiFlashTableSyncProgressPath, tableInfo.ID)
return util.DeleteKeyFromEtcd(key, is.etcdCli, keyOpDefaultRetryCnt, keyOpDefaultTimeout)
}
return nil
}

// GetTiFlashTableSyncProgress uses to get all the tiflash table replica sync progress.
Expand Down
4 changes: 3 additions & 1 deletion server/http_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -971,7 +971,9 @@ func (h flashReplicaHandler) handleStatusReport(w http.ResponseWriter, req *http
writeError(w, err)
}
if available {
err = infosync.DeleteTiFlashTableSyncProgress(status.ID)
var tableInfo model.TableInfo
tableInfo.ID = status.ID
err = infosync.DeleteTiFlashTableSyncProgress(&tableInfo)
} else {
progress := types.TruncateFloatToString(float64(status.FlashRegionCount)/float64(status.RegionCount), 2)
err = infosync.UpdateTiFlashTableSyncProgress(context.Background(), status.ID, progress)
Expand Down

0 comments on commit c52b5a7

Please sign in to comment.