DDL: delete partition table tiflash replica progress from ETCD when truncate table, drop table, set tiflash replica 0 #38551

Merged: 6 commits, Oct 20, 2022
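
In short: TiFlash replication progress is stored in etcd under one key per physical table ID, so a partitioned table has one key per partition rather than a single key for the logical table. DeleteTiFlashTableSyncProgress previously took only a table ID, so truncating or dropping a partitioned table, or running "alter table ... set tiflash replica 0", left the partitions' progress keys behind. This PR changes the function to take the whole *model.TableInfo and delete the key of every partition (or the single table-ID key for a non-partitioned table), and calls it from the truncate, drop, and set-replica-0 DDL paths. Below is a minimal, self-contained sketch of the key layout this relies on; the struct, the prefix constant, and the helper name are stand-ins for illustration, and the real key construction is in domain/infosync/info.go further down.

package main

import "fmt"

// Minimal stand-ins for the fields of model.TableInfo / model.PartitionDefinition
// that matter here; the real types live in TiDB's parser/model package.
type partitionDefinition struct{ ID int64 }

type tableInfo struct {
	ID         int64
	Partitions []partitionDefinition // empty for a non-partitioned table
}

// progressKeyPrefix stands in for infosync.TiFlashTableSyncProgressPath.
const progressKeyPrefix = "<TiFlashTableSyncProgressPath>"

// progressKeys lists every etcd key that stores TiFlash sync progress for a
// table: one key per partition for a partitioned table, otherwise a single
// key under the table ID. This mirrors the key construction in the diff below.
func progressKeys(tbl tableInfo) []string {
	if len(tbl.Partitions) > 0 {
		keys := make([]string, 0, len(tbl.Partitions))
		for _, p := range tbl.Partitions {
			keys = append(keys, fmt.Sprintf("%s/%v", progressKeyPrefix, p.ID))
		}
		return keys
	}
	return []string{fmt.Sprintf("%s/%v", progressKeyPrefix, tbl.ID)}
}

func main() {
	// Logical table 100 with partitions 101 and 102: deleting only the key
	// for 100 (the old behaviour) would leave the keys for 101 and 102 behind.
	t := tableInfo{ID: 100, Partitions: []partitionDefinition{{ID: 101}, {ID: 102}}}
	fmt.Println(progressKeys(t))
}
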
62 changes: 61 additions & 1 deletion ddl/ddl_tiflash_test.go
@@ -983,13 +983,19 @@ func TestTiFlashProgress(t *testing.T) {
}
_ = infosync.UpdateTiFlashTableSyncProgress(context.TODO(), tb.Meta().ID, "5.0")
mustExist(tb.Meta().ID)
- _ = infosync.DeleteTiFlashTableSyncProgress(tb.Meta().ID)
+ _ = infosync.DeleteTiFlashTableSyncProgress(tb.Meta())
mustAbsent(tb.Meta().ID)

_ = infosync.UpdateTiFlashTableSyncProgress(context.TODO(), tb.Meta().ID, "5.0")
tk.MustExec("truncate table tiflash_d.t")
mustAbsent(tb.Meta().ID)

tb, _ = s.dom.InfoSchema().TableByName(model.NewCIStr("tiflash_d"), model.NewCIStr("t"))
_ = infosync.UpdateTiFlashTableSyncProgress(context.TODO(), tb.Meta().ID, "5.0")
tk.MustExec("alter table tiflash_d.t set tiflash replica 0")
mustAbsent(tb.Meta().ID)
tk.MustExec("alter table tiflash_d.t set tiflash replica 1")

tb, _ = s.dom.InfoSchema().TableByName(model.NewCIStr("tiflash_d"), model.NewCIStr("t"))
_ = infosync.UpdateTiFlashTableSyncProgress(context.TODO(), tb.Meta().ID, "5.0")
tk.MustExec("drop table tiflash_d.t")
@@ -998,6 +1004,60 @@ func TestTiFlashProgress(t *testing.T) {
time.Sleep(100 * time.Millisecond)
}

func TestTiFlashProgressForPartitionTable(t *testing.T) {
s, teardown := createTiFlashContext(t)
s.tiflash.NotAvailable = true
defer teardown()
tk := testkit.NewTestKit(t, s.store)

integration.BeforeTest(t, integration.WithoutGoLeakDetection())
cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
defer cluster.Terminate(t)

save := infosync.GetEtcdClient()
defer infosync.SetEtcdClient(save)
infosync.SetEtcdClient(cluster.Client(0))
tk.MustExec("create database tiflash_d")
tk.MustExec("create table tiflash_d.t(z int) PARTITION BY RANGE(z) (PARTITION p0 VALUES LESS THAN (10))")
tk.MustExec("alter table tiflash_d.t set tiflash replica 1")
tb, err := s.dom.InfoSchema().TableByName(model.NewCIStr("tiflash_d"), model.NewCIStr("t"))
require.NoError(t, err)
require.NotNil(t, tb)
mustExist := func(tid int64) {
pm, err := infosync.GetTiFlashTableSyncProgress(context.TODO())
require.NoError(t, err)
_, ok := pm[tid]
require.True(t, ok)
}
mustAbsent := func(tid int64) {
pm, err := infosync.GetTiFlashTableSyncProgress(context.TODO())
require.NoError(t, err)
_, ok := pm[tid]
require.False(t, ok)
}
time.Sleep(ddl.PollTiFlashInterval * RoundToBeAvailable)
mustExist(tb.Meta().Partition.Definitions[0].ID)
_ = infosync.DeleteTiFlashTableSyncProgress(tb.Meta())
mustAbsent(tb.Meta().Partition.Definitions[0].ID)

_ = infosync.UpdateTiFlashTableSyncProgress(context.TODO(), tb.Meta().Partition.Definitions[0].ID, "5.0")
tk.MustExec("truncate table tiflash_d.t")
mustAbsent(tb.Meta().Partition.Definitions[0].ID)

tb, _ = s.dom.InfoSchema().TableByName(model.NewCIStr("tiflash_d"), model.NewCIStr("t"))
_ = infosync.UpdateTiFlashTableSyncProgress(context.TODO(), tb.Meta().Partition.Definitions[0].ID, "5.0")
tk.MustExec("alter table tiflash_d.t set tiflash replica 0")
mustAbsent(tb.Meta().Partition.Definitions[0].ID)
tk.MustExec("alter table tiflash_d.t set tiflash replica 1")

tb, _ = s.dom.InfoSchema().TableByName(model.NewCIStr("tiflash_d"), model.NewCIStr("t"))
_ = infosync.UpdateTiFlashTableSyncProgress(context.TODO(), tb.Meta().Partition.Definitions[0].ID, "5.0")
tk.MustExec("drop table tiflash_d.t")
mustAbsent(tb.Meta().Partition.Definitions[0].ID)

time.Sleep(100 * time.Millisecond)
}

func TestTiFlashGroupIndexWhenStartup(t *testing.T) {
s, teardown := createTiFlashContext(t)
tiflash := s.tiflash
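
The new TestTiFlashProgressForPartitionTable mirrors TestTiFlashProgress, but keys every assertion on the first partition's physical ID (tb.Meta().Partition.Definitions[0].ID) rather than the logical table ID, since the progress map returned by GetTiFlashTableSyncProgress is indexed by physical table ID.
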
20 changes: 15 additions & 5 deletions ddl/table.go
@@ -372,7 +372,7 @@ func onDropTableOrView(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, _ er
}
}
if tblInfo.TiFlashReplica != nil {
- e := infosync.DeleteTiFlashTableSyncProgress(tblInfo.ID)
+ e := infosync.DeleteTiFlashTableSyncProgress(tblInfo)
if e != nil {
logutil.BgLogger().Error("DeleteTiFlashTableSyncProgress fails", zap.Error(e))
}
@@ -709,6 +709,14 @@ func onTruncateTable(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, _ erro
}
})

// Clear the TiFlash replica progress from ETCD.
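// For partitioned tables this removes one progress key per partition; see
// DeleteTiFlashTableSyncProgress in domain/infosync/info.go.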
if tblInfo.TiFlashReplica != nil {
e := infosync.DeleteTiFlashTableSyncProgress(tblInfo)
if e != nil {
logutil.BgLogger().Error("DeleteTiFlashTableSyncProgress fails", zap.Error(e))
}
}

var oldPartitionIDs []int64
if tblInfo.GetPartitionInfo() != nil {
oldPartitionIDs = getPartitionIDs(tblInfo)
@@ -748,10 +756,6 @@ func onTruncateTable(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, _ erro

// Clear the TiFlash replica available status.
if tblInfo.TiFlashReplica != nil {
- e := infosync.DeleteTiFlashTableSyncProgress(tblInfo.ID)
- if e != nil {
- logutil.BgLogger().Error("DeleteTiFlashTableSyncProgress fails", zap.Error(e))
- }
// Set PD rules for TiFlash
if pi := tblInfo.GetPartitionInfo(); pi != nil {
if e := infosync.ConfigureTiFlashPDForPartitions(true, &pi.Definitions, tblInfo.TiFlashReplica.Count, &tblInfo.TiFlashReplica.LocationLabels, tblInfo.ID); e != nil {
@@ -1271,6 +1275,12 @@ func (w *worker) onSetTableFlashReplica(d *ddlCtx, t *meta.Meta, job *model.Job)
Available: available,
}
} else {
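// The replica count is being set to 0: drop any sync progress recorded in
// etcd for this table (and, for partitioned tables, for each partition)
// before the replica info itself is discarded.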
if tblInfo.TiFlashReplica != nil {
err = infosync.DeleteTiFlashTableSyncProgress(tblInfo)
if err != nil {
logutil.BgLogger().Error("DeleteTiFlashTableSyncProgress fails", zap.Error(err))
}
}
tblInfo.TiFlashReplica = nil
}

17 changes: 14 additions & 3 deletions domain/infosync/info.go
@@ -352,16 +352,27 @@ func UpdateTiFlashTableSyncProgress(ctx context.Context, tid int64, progressStri
}

// DeleteTiFlashTableSyncProgress is used to delete the tiflash table replica sync progress.
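// For a partitioned table this deletes the progress key of every partition;
// otherwise it deletes the single key stored under the table ID.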
- func DeleteTiFlashTableSyncProgress(tid int64) error {
+ func DeleteTiFlashTableSyncProgress(tableInfo *model.TableInfo) error {
is, err := getGlobalInfoSyncer()
if err != nil {
return err
}
if is.etcdCli == nil {
return nil
}
- key := fmt.Sprintf("%s/%v", TiFlashTableSyncProgressPath, tid)
- return util.DeleteKeyFromEtcd(key, is.etcdCli, keyOpDefaultRetryCnt, keyOpDefaultTimeout)
+ if pi := tableInfo.GetPartitionInfo(); pi != nil {
+ for _, p := range pi.Definitions {
+ key := fmt.Sprintf("%s/%v", TiFlashTableSyncProgressPath, p.ID)
+ err = util.DeleteKeyFromEtcd(key, is.etcdCli, keyOpDefaultRetryCnt, keyOpDefaultTimeout)
+ if err != nil {
+ return err
+ }
+ }
+ } else {
+ key := fmt.Sprintf("%s/%v", TiFlashTableSyncProgressPath, tableInfo.ID)
+ return util.DeleteKeyFromEtcd(key, is.etcdCli, keyOpDefaultRetryCnt, keyOpDefaultTimeout)
+ }
+ return nil
}

// GetTiFlashTableSyncProgress is used to get all the TiFlash table replica sync progress.
4 changes: 3 additions & 1 deletion server/http_handler.go
@@ -971,7 +971,9 @@ func (h flashReplicaHandler) handleStatusReport(w http.ResponseWriter, req *http
writeError(w, err)
}
if available {
- err = infosync.DeleteTiFlashTableSyncProgress(status.ID)
+ var tableInfo model.TableInfo
+ tableInfo.ID = status.ID
+ err = infosync.DeleteTiFlashTableSyncProgress(&tableInfo)
} else {
progress := types.TruncateFloatToString(float64(status.FlashRegionCount)/float64(status.RegionCount), 2)
err = infosync.UpdateTiFlashTableSyncProgress(context.Background(), status.ID, progress)
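
Note on the server/http_handler.go change above: the ID in a TiFlash sync-status report refers to a single physical table (for a partitioned table, an individual partition), so the handler wraps it in a bare model.TableInfo with only ID set; DeleteTiFlashTableSyncProgress then takes its non-partitioned branch and removes exactly that one progress key.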