diff --git a/ddl/ddl_tiflash_test.go b/ddl/ddl_tiflash_test.go index ec1afd9300107..39c6fccc3f228 100644 --- a/ddl/ddl_tiflash_test.go +++ b/ddl/ddl_tiflash_test.go @@ -983,13 +983,19 @@ func TestTiFlashProgress(t *testing.T) { } _ = infosync.UpdateTiFlashTableSyncProgress(context.TODO(), tb.Meta().ID, "5.0") mustExist(tb.Meta().ID) - _ = infosync.DeleteTiFlashTableSyncProgress(tb.Meta().ID) + _ = infosync.DeleteTiFlashTableSyncProgress(tb.Meta()) mustAbsent(tb.Meta().ID) _ = infosync.UpdateTiFlashTableSyncProgress(context.TODO(), tb.Meta().ID, "5.0") tk.MustExec("truncate table tiflash_d.t") mustAbsent(tb.Meta().ID) + tb, _ = s.dom.InfoSchema().TableByName(model.NewCIStr("tiflash_d"), model.NewCIStr("t")) + _ = infosync.UpdateTiFlashTableSyncProgress(context.TODO(), tb.Meta().ID, "5.0") + tk.MustExec("alter table tiflash_d.t set tiflash replica 0") + mustAbsent(tb.Meta().ID) + tk.MustExec("alter table tiflash_d.t set tiflash replica 1") + tb, _ = s.dom.InfoSchema().TableByName(model.NewCIStr("tiflash_d"), model.NewCIStr("t")) _ = infosync.UpdateTiFlashTableSyncProgress(context.TODO(), tb.Meta().ID, "5.0") tk.MustExec("drop table tiflash_d.t") @@ -998,6 +1004,60 @@ func TestTiFlashProgress(t *testing.T) { time.Sleep(100 * time.Millisecond) } +func TestTiFlashProgressForPartitionTable(t *testing.T) { + s, teardown := createTiFlashContext(t) + s.tiflash.NotAvailable = true + defer teardown() + tk := testkit.NewTestKit(t, s.store) + + integration.BeforeTest(t, integration.WithoutGoLeakDetection()) + cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1}) + defer cluster.Terminate(t) + + save := infosync.GetEtcdClient() + defer infosync.SetEtcdClient(save) + infosync.SetEtcdClient(cluster.Client(0)) + tk.MustExec("create database tiflash_d") + tk.MustExec("create table tiflash_d.t(z int) PARTITION BY RANGE(z) (PARTITION p0 VALUES LESS THAN (10))") + tk.MustExec("alter table tiflash_d.t set tiflash replica 1") + tb, err := s.dom.InfoSchema().TableByName(model.NewCIStr("tiflash_d"), model.NewCIStr("t")) + require.NoError(t, err) + require.NotNil(t, tb) + mustExist := func(tid int64) { + pm, err := infosync.GetTiFlashTableSyncProgress(context.TODO()) + require.NoError(t, err) + _, ok := pm[tid] + require.True(t, ok) + } + mustAbsent := func(tid int64) { + pm, err := infosync.GetTiFlashTableSyncProgress(context.TODO()) + require.NoError(t, err) + _, ok := pm[tid] + require.False(t, ok) + } + time.Sleep(ddl.PollTiFlashInterval * RoundToBeAvailable) + mustExist(tb.Meta().Partition.Definitions[0].ID) + _ = infosync.DeleteTiFlashTableSyncProgress(tb.Meta()) + mustAbsent(tb.Meta().Partition.Definitions[0].ID) + + _ = infosync.UpdateTiFlashTableSyncProgress(context.TODO(), tb.Meta().Partition.Definitions[0].ID, "5.0") + tk.MustExec("truncate table tiflash_d.t") + mustAbsent(tb.Meta().Partition.Definitions[0].ID) + + tb, _ = s.dom.InfoSchema().TableByName(model.NewCIStr("tiflash_d"), model.NewCIStr("t")) + _ = infosync.UpdateTiFlashTableSyncProgress(context.TODO(), tb.Meta().Partition.Definitions[0].ID, "5.0") + tk.MustExec("alter table tiflash_d.t set tiflash replica 0") + mustAbsent(tb.Meta().Partition.Definitions[0].ID) + tk.MustExec("alter table tiflash_d.t set tiflash replica 1") + + tb, _ = s.dom.InfoSchema().TableByName(model.NewCIStr("tiflash_d"), model.NewCIStr("t")) + _ = infosync.UpdateTiFlashTableSyncProgress(context.TODO(), tb.Meta().Partition.Definitions[0].ID, "5.0") + tk.MustExec("drop table tiflash_d.t") + mustAbsent(tb.Meta().Partition.Definitions[0].ID) + + time.Sleep(100 * time.Millisecond) +} + func TestTiFlashGroupIndexWhenStartup(t *testing.T) { s, teardown := createTiFlashContext(t) tiflash := s.tiflash diff --git a/ddl/table.go b/ddl/table.go index 64d7aed775981..df329fd973c06 100644 --- a/ddl/table.go +++ b/ddl/table.go @@ -372,7 +372,7 @@ func onDropTableOrView(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, _ er } } if tblInfo.TiFlashReplica != nil { - e := infosync.DeleteTiFlashTableSyncProgress(tblInfo.ID) + e := infosync.DeleteTiFlashTableSyncProgress(tblInfo) if e != nil { logutil.BgLogger().Error("DeleteTiFlashTableSyncProgress fails", zap.Error(e)) } @@ -709,6 +709,14 @@ func onTruncateTable(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, _ erro } }) + // Clear the TiFlash replica progress from ETCD. + if tblInfo.TiFlashReplica != nil { + e := infosync.DeleteTiFlashTableSyncProgress(tblInfo) + if e != nil { + logutil.BgLogger().Error("DeleteTiFlashTableSyncProgress fails", zap.Error(e)) + } + } + var oldPartitionIDs []int64 if tblInfo.GetPartitionInfo() != nil { oldPartitionIDs = getPartitionIDs(tblInfo) @@ -748,10 +756,6 @@ func onTruncateTable(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, _ erro // Clear the TiFlash replica available status. if tblInfo.TiFlashReplica != nil { - e := infosync.DeleteTiFlashTableSyncProgress(tblInfo.ID) - if e != nil { - logutil.BgLogger().Error("DeleteTiFlashTableSyncProgress fails", zap.Error(e)) - } // Set PD rules for TiFlash if pi := tblInfo.GetPartitionInfo(); pi != nil { if e := infosync.ConfigureTiFlashPDForPartitions(true, &pi.Definitions, tblInfo.TiFlashReplica.Count, &tblInfo.TiFlashReplica.LocationLabels, tblInfo.ID); e != nil { @@ -1271,6 +1275,12 @@ func (w *worker) onSetTableFlashReplica(d *ddlCtx, t *meta.Meta, job *model.Job) Available: available, } } else { + if tblInfo.TiFlashReplica != nil { + err = infosync.DeleteTiFlashTableSyncProgress(tblInfo) + if err != nil { + logutil.BgLogger().Error("DeleteTiFlashTableSyncProgress fails", zap.Error(err)) + } + } tblInfo.TiFlashReplica = nil } diff --git a/domain/infosync/info.go b/domain/infosync/info.go index fd29483c8b157..094fc616cd2fc 100644 --- a/domain/infosync/info.go +++ b/domain/infosync/info.go @@ -352,7 +352,7 @@ func UpdateTiFlashTableSyncProgress(ctx context.Context, tid int64, progressStri } // DeleteTiFlashTableSyncProgress is used to delete the tiflash table replica sync progress. -func DeleteTiFlashTableSyncProgress(tid int64) error { +func DeleteTiFlashTableSyncProgress(tableInfo *model.TableInfo) error { is, err := getGlobalInfoSyncer() if err != nil { return err @@ -360,8 +360,19 @@ func DeleteTiFlashTableSyncProgress(tid int64) error { if is.etcdCli == nil { return nil } - key := fmt.Sprintf("%s/%v", TiFlashTableSyncProgressPath, tid) - return util.DeleteKeyFromEtcd(key, is.etcdCli, keyOpDefaultRetryCnt, keyOpDefaultTimeout) + if pi := tableInfo.GetPartitionInfo(); pi != nil { + for _, p := range pi.Definitions { + key := fmt.Sprintf("%s/%v", TiFlashTableSyncProgressPath, p.ID) + err = util.DeleteKeyFromEtcd(key, is.etcdCli, keyOpDefaultRetryCnt, keyOpDefaultTimeout) + if err != nil { + return err + } + } + } else { + key := fmt.Sprintf("%s/%v", TiFlashTableSyncProgressPath, tableInfo.ID) + return util.DeleteKeyFromEtcd(key, is.etcdCli, keyOpDefaultRetryCnt, keyOpDefaultTimeout) + } + return nil } // GetTiFlashTableSyncProgress uses to get all the tiflash table replica sync progress. diff --git a/server/http_handler.go b/server/http_handler.go index 1338ed072c1d0..6e12eeeae4c9d 100644 --- a/server/http_handler.go +++ b/server/http_handler.go @@ -971,7 +971,9 @@ func (h flashReplicaHandler) handleStatusReport(w http.ResponseWriter, req *http writeError(w, err) } if available { - err = infosync.DeleteTiFlashTableSyncProgress(status.ID) + var tableInfo model.TableInfo + tableInfo.ID = status.ID + err = infosync.DeleteTiFlashTableSyncProgress(&tableInfo) } else { progress := types.TruncateFloatToString(float64(status.FlashRegionCount)/float64(status.RegionCount), 2) err = infosync.UpdateTiFlashTableSyncProgress(context.Background(), status.ID, progress)