Skip to content

Commit

Permalink
ddl: Delete TiFlash sync status from etcd when table is truncated or …
Browse files Browse the repository at this point in the history
…dropped (#37184) (#38268)

close #37168
  • Loading branch information
ti-chi-bot authored Oct 28, 2022
1 parent 66494ba commit ce40035
Show file tree
Hide file tree
Showing 4 changed files with 85 additions and 0 deletions.
49 changes: 49 additions & 0 deletions ddl/ddl_tiflash_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ import (
"github.com/pingcap/tidb/util/logutil"
"github.com/stretchr/testify/require"
"github.com/tikv/client-go/v2/testutils"
"go.etcd.io/etcd/tests/v3/integration"
"go.uber.org/zap"
)

Expand Down Expand Up @@ -948,6 +949,54 @@ func TestTiFlashBatchUnsupported(t *testing.T) {
tk.MustGetErrCode("alter database information_schema set tiflash replica 1", 8200)
}

func TestTiFlashProgress(t *testing.T) {
s, teardown := createTiFlashContext(t)
s.tiflash.NotAvailable = true
defer teardown()
tk := testkit.NewTestKit(t, s.store)

integration.BeforeTest(t, integration.WithoutGoLeakDetection())
cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
defer cluster.Terminate(t)

save := infosync.GetEtcdClient()
defer infosync.SetEtcdClient(save)
infosync.SetEtcdClient(cluster.Client(0))
tk.MustExec("create database tiflash_d")
tk.MustExec("create table tiflash_d.t(z int)")
tk.MustExec("alter table tiflash_d.t set tiflash replica 1")
tb, err := s.dom.InfoSchema().TableByName(model.NewCIStr("tiflash_d"), model.NewCIStr("t"))
require.NoError(t, err)
require.NotNil(t, tb)
mustExist := func(tid int64) {
pm, err := infosync.GetTiFlashTableSyncProgress(context.TODO())
require.NoError(t, err)
_, ok := pm[tb.Meta().ID]
require.True(t, ok)
}
mustAbsent := func(tid int64) {
pm, err := infosync.GetTiFlashTableSyncProgress(context.TODO())
require.NoError(t, err)
_, ok := pm[tb.Meta().ID]
require.False(t, ok)
}
_ = infosync.UpdateTiFlashTableSyncProgress(context.TODO(), tb.Meta().ID, 5.0)
mustExist(tb.Meta().ID)
_ = infosync.DeleteTiFlashTableSyncProgress(tb.Meta().ID)
mustAbsent(tb.Meta().ID)

_ = infosync.UpdateTiFlashTableSyncProgress(context.TODO(), tb.Meta().ID, 5.0)
tk.MustExec("truncate table tiflash_d.t")
mustAbsent(tb.Meta().ID)

tb, _ = s.dom.InfoSchema().TableByName(model.NewCIStr("tiflash_d"), model.NewCIStr("t"))
_ = infosync.UpdateTiFlashTableSyncProgress(context.TODO(), tb.Meta().ID, 5.0)
tk.MustExec("drop table tiflash_d.t")
mustAbsent(tb.Meta().ID)

time.Sleep(100 * time.Millisecond)
}

func TestTiFlashGroupIndexWhenStartup(t *testing.T) {
s, teardown := createTiFlashContext(t)
defer teardown()
Expand Down
10 changes: 10 additions & 0 deletions ddl/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -319,6 +319,12 @@ func onDropTableOrView(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, _ er
return ver, errors.Trace(err)
}
}
if tblInfo.TiFlashReplica != nil {
e := infosync.DeleteTiFlashTableSyncProgress(tblInfo.ID)
if e != nil {
logutil.BgLogger().Error("DeleteTiFlashTableSyncProgress fails", zap.Error(e))
}
}
// Placement rules cannot be removed immediately after drop table / truncate table, because the
// tables can be flashed back or recovered, therefore it moved to doGCPlacementRules in gc_worker.go.

Expand Down Expand Up @@ -685,6 +691,10 @@ func onTruncateTable(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, _ erro

// Clear the TiFlash replica available status.
if tblInfo.TiFlashReplica != nil {
e := infosync.DeleteTiFlashTableSyncProgress(tblInfo.ID)
if e != nil {
logutil.BgLogger().Error("DeleteTiFlashTableSyncProgress fails", zap.Error(e))
}
// Set PD rules for TiFlash
if pi := tblInfo.GetPartitionInfo(); pi != nil {
if e := infosync.ConfigureTiFlashPDForPartitions(true, &pi.Definitions, tblInfo.TiFlashReplica.Count, &tblInfo.TiFlashReplica.LocationLabels, tblInfo.ID); e != nil {
Expand Down
20 changes: 20 additions & 0 deletions domain/infosync/info.go
Original file line number Diff line number Diff line change
Expand Up @@ -1114,3 +1114,23 @@ func DeleteInternalSession(se interface{}) {
}
sm.DeleteInternalSession(se)
}

// SetEtcdClient is only used for test.
func SetEtcdClient(etcdCli *clientv3.Client) {
is, err := getGlobalInfoSyncer()

if err != nil {
return
}
is.etcdCli = etcdCli
}

// GetEtcdClient is only used for test.
func GetEtcdClient() *clientv3.Client {
is, err := getGlobalInfoSyncer()

if err != nil {
return nil
}
return is.etcdCli
}
6 changes: 6 additions & 0 deletions domain/infosync/tiflash_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -313,6 +313,7 @@ type MockTiFlash struct {
PdEnabled bool
TiflashDelay time.Duration
StartTime time.Time
NotAvailable bool
}

func (tiflash *MockTiFlash) setUpMockTiFlashHTTPServer() {
Expand All @@ -335,6 +336,10 @@ func (tiflash *MockTiFlash) setUpMockTiFlashHTTPServer() {
return
}
table, ok := tiflash.SyncStatus[tableID]
if tiflash.NotAvailable {
// No region is available, so the table is not available.
table.Regions = []int{}
}
if !ok {
w.WriteHeader(http.StatusOK)
_, _ = w.Write([]byte("0\n\n"))
Expand Down Expand Up @@ -364,6 +369,7 @@ func NewMockTiFlash() *MockTiFlash {
PdEnabled: true,
TiflashDelay: 0,
StartTime: time.Now(),
NotAvailable: false,
}
tiflash.setUpMockTiFlashHTTPServer()
return tiflash
Expand Down

0 comments on commit ce40035

Please sign in to comment.