Skip to content

Commit

Permalink
cdc: always ignore sequence tables (#4563)
Browse files Browse the repository at this point in the history
close #4552
  • Loading branch information
overvenus authored and ti-chi-bot committed Jun 20, 2022
1 parent 352a8e1 commit 98c9c9d
Show file tree
Hide file tree
Showing 13 changed files with 185 additions and 116 deletions.
11 changes: 9 additions & 2 deletions cdc/capture/http_validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ func verifyCreateChangefeedConfig(ctx context.Context, changefeedConfig model.Ch
}

if !replicaConfig.ForceReplicate && !changefeedConfig.IgnoreIneligibleTable {
ineligibleTables, _, err := verifyTables(replicaConfig, capture.kvStorage, changefeedConfig.StartTS)
ineligibleTables, _, err := VerifyTables(replicaConfig, capture.kvStorage, changefeedConfig.StartTS)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -199,7 +199,9 @@ func verifyUpdateChangefeedConfig(ctx context.Context, changefeedConfig model.Ch
return newInfo, nil
}

func verifyTables(replicaConfig *config.ReplicaConfig, storage tidbkv.Storage, startTs uint64) (ineligibleTables, eligibleTables []model.TableName, err error) {
// VerifyTables catalog tables specified by ReplicaConfig into
// eligible (has an unique index or primary key) and ineligible tables.
func VerifyTables(replicaConfig *config.ReplicaConfig, storage tidbkv.Storage, startTs uint64) (ineligibleTables, eligibleTables []model.TableName, err error) {
filter, err := filter.NewFilter(replicaConfig)
if err != nil {
return nil, nil, errors.Trace(err)
Expand All @@ -217,6 +219,11 @@ func verifyTables(replicaConfig *config.ReplicaConfig, storage tidbkv.Storage, s
if filter.ShouldIgnoreTable(tableInfo.TableName.Schema, tableInfo.TableName.Table) {
continue
}
// Sequence is not supported yet, TiCDC needs to filter all sequence tables.
// See https://github.com/pingcap/tiflow/issues/4559
if tableInfo.IsSequence() {
continue
}
if !tableInfo.IsEligible(false /* forceReplicate */) {
ineligibleTables = append(ineligibleTables, tableInfo.TableName)
} else {
Expand Down
52 changes: 32 additions & 20 deletions cdc/entry/schema_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,8 @@ type schemaSnapshot struct {

currentTs uint64

// if explicit is true, treat tables without explicit row id as eligible
explicitTables bool
// if forceReplicate is true, treat ineligible tables as eligible.
forceReplicate bool
}

// SingleSchemaSnapshot is a single schema snapshot independent of schema storage
Expand Down Expand Up @@ -98,17 +98,17 @@ func (s *SingleSchemaSnapshot) PreTableInfo(job *timodel.Job) (*model.TableInfo,
}

// NewSingleSchemaSnapshotFromMeta creates a new single schema snapshot from a tidb meta
func NewSingleSchemaSnapshotFromMeta(meta *timeta.Meta, currentTs uint64, explicitTables bool) (*SingleSchemaSnapshot, error) {
func NewSingleSchemaSnapshotFromMeta(meta *timeta.Meta, currentTs uint64, forceReplicate bool) (*SingleSchemaSnapshot, error) {
// meta is nil only in unit tests
if meta == nil {
snap := newEmptySchemaSnapshot(explicitTables)
snap := newEmptySchemaSnapshot(forceReplicate)
snap.currentTs = currentTs
return snap, nil
}
return newSchemaSnapshotFromMeta(meta, currentTs, explicitTables)
return newSchemaSnapshotFromMeta(meta, currentTs, forceReplicate)
}

func newEmptySchemaSnapshot(explicitTables bool) *schemaSnapshot {
func newEmptySchemaSnapshot(forceReplicate bool) *schemaSnapshot {
return &schemaSnapshot{
tableNameToID: make(map[model.TableName]int64),
schemaNameToID: make(map[string]int64),
Expand All @@ -121,12 +121,12 @@ func newEmptySchemaSnapshot(explicitTables bool) *schemaSnapshot {
truncateTableID: make(map[int64]struct{}),
ineligibleTableID: make(map[int64]struct{}),

explicitTables: explicitTables,
forceReplicate: forceReplicate,
}
}

func newSchemaSnapshotFromMeta(meta *timeta.Meta, currentTs uint64, explicitTables bool) (*schemaSnapshot, error) {
snap := newEmptySchemaSnapshot(explicitTables)
func newSchemaSnapshotFromMeta(meta *timeta.Meta, currentTs uint64, forceReplicate bool) (*schemaSnapshot, error) {
snap := newEmptySchemaSnapshot(forceReplicate)
dbinfos, err := meta.ListDatabases()
if err != nil {
return nil, cerror.WrapError(cerror.ErrMetaListDatabases, err)
Expand All @@ -146,7 +146,7 @@ func newSchemaSnapshotFromMeta(meta *timeta.Meta, currentTs uint64, explicitTabl
tableInfo := model.WrapTableInfo(dbinfo.ID, dbinfo.Name.O, currentTs, tableInfo)
snap.tables[tableInfo.ID] = tableInfo
snap.tableNameToID[model.TableName{Schema: dbinfo.Name.O, Table: tableInfo.Name.O}] = tableInfo.ID
isEligible := tableInfo.IsEligible(explicitTables)
isEligible := tableInfo.IsEligible(forceReplicate)
if !isEligible {
snap.ineligibleTableID[tableInfo.ID] = struct{}{}
}
Expand Down Expand Up @@ -468,7 +468,7 @@ func (s *schemaSnapshot) updatePartition(tbl *model.TableInfo) error {
zap.Int64("add partition id", partition.ID))
}
s.partitionTable[partition.ID] = tbl
if !tbl.IsEligible(s.explicitTables) {
if !tbl.IsEligible(s.forceReplicate) {
s.ineligibleTableID[partition.ID] = struct{}{}
}
delete(oldIDs, partition.ID)
Expand Down Expand Up @@ -504,14 +504,20 @@ func (s *schemaSnapshot) createTable(table *model.TableInfo) error {
s.tableInSchema[table.SchemaID] = tableInSchema

s.tables[table.ID] = table
if !table.IsEligible(s.explicitTables) {
log.Warn("this table is not eligible to replicate", zap.String("tableName", table.Name.O), zap.Int64("tableID", table.ID))
if !table.IsEligible(s.forceReplicate) {
// Sequence is not supported yet, and always ineligible.
// Skip Warn to avoid confusion.
// See https://github.com/pingcap/tiflow/issues/4559
if !table.IsSequence() {
log.Warn("this table is ineligible to replicate",
zap.String("tableName", table.Name.O), zap.Int64("tableID", table.ID))
}
s.ineligibleTableID[table.ID] = struct{}{}
}
if pi := table.GetPartitionInfo(); pi != nil {
for _, partition := range pi.Definitions {
s.partitionTable[partition.ID] = table
if !table.IsEligible(s.explicitTables) {
if !table.IsEligible(s.forceReplicate) {
s.ineligibleTableID[partition.ID] = struct{}{}
}
}
Expand All @@ -529,14 +535,20 @@ func (s *schemaSnapshot) replaceTable(table *model.TableInfo) error {
return cerror.ErrSnapshotTableNotFound.GenWithStack("table %s(%d)", table.Name, table.ID)
}
s.tables[table.ID] = table
if !table.IsEligible(s.explicitTables) {
log.Warn("this table is not eligible to replicate", zap.String("tableName", table.Name.O), zap.Int64("tableID", table.ID))
if !table.IsEligible(s.forceReplicate) {
// Sequence is not supported yet, and always ineligible.
// Skip Warn to avoid confusion.
// See https://github.com/pingcap/tiflow/issues/4559
if !table.IsSequence() {
log.Warn("this table is ineligible to replicate",
zap.String("tableName", table.Name.O), zap.Int64("tableID", table.ID))
}
s.ineligibleTableID[table.ID] = struct{}{}
}
if pi := table.GetPartitionInfo(); pi != nil {
for _, partition := range pi.Definitions {
s.partitionTable[partition.ID] = table
if !table.IsEligible(s.explicitTables) {
if !table.IsEligible(s.forceReplicate) {
s.ineligibleTableID[partition.ID] = struct{}{}
}
}
Expand Down Expand Up @@ -673,7 +685,7 @@ type schemaStorageImpl struct {
resolvedTs uint64

filter *filter.Filter
explicitTables bool
forceReplicate bool
}

// NewSchemaStorage creates a new schema storage
Expand All @@ -692,7 +704,7 @@ func NewSchemaStorage(meta *timeta.Meta, startTs uint64, filter *filter.Filter,
snaps: []*schemaSnapshot{snap},
resolvedTs: startTs,
filter: filter,
explicitTables: forceReplicate,
forceReplicate: forceReplicate,
}
return schema, nil
}
Expand Down Expand Up @@ -769,7 +781,7 @@ func (s *schemaStorageImpl) HandleDDLJob(job *timodel.Job) error {
}
snap = lastSnap.Clone()
} else {
snap = newEmptySchemaSnapshot(s.explicitTables)
snap = newEmptySchemaSnapshot(s.forceReplicate)
}
if err := snap.handleDDL(job); err != nil {
return errors.Trace(err)
Expand Down
10 changes: 5 additions & 5 deletions cdc/entry/schema_storage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -707,7 +707,7 @@ func TestSnapshotClone(t *testing.T) {
require.Nil(t, err)
meta, err := kv.GetSnapshotMeta(store, ver.Ver)
require.Nil(t, err)
snap, err := newSchemaSnapshotFromMeta(meta, ver.Ver, false /* explicitTables */)
snap, err := newSchemaSnapshotFromMeta(meta, ver.Ver, false /* forceReplicate */)
require.Nil(t, err)

clone := snap.Clone()
Expand All @@ -716,7 +716,7 @@ func TestSnapshotClone(t *testing.T) {
require.Equal(t, clone.truncateTableID, snap.truncateTableID)
require.Equal(t, clone.ineligibleTableID, snap.ineligibleTableID)
require.Equal(t, clone.currentTs, snap.currentTs)
require.Equal(t, clone.explicitTables, snap.explicitTables)
require.Equal(t, clone.forceReplicate, snap.forceReplicate)
require.Equal(t, len(clone.tables), len(snap.tables))
require.Equal(t, len(clone.schemas), len(snap.schemas))
require.Equal(t, len(clone.partitionTable), len(snap.partitionTable))
Expand Down Expand Up @@ -750,13 +750,13 @@ func TestExplicitTables(t *testing.T) {
require.Nil(t, err)
meta1, err := kv.GetSnapshotMeta(store, ver1.Ver)
require.Nil(t, err)
snap1, err := newSchemaSnapshotFromMeta(meta1, ver1.Ver, true /* explicitTables */)
snap1, err := newSchemaSnapshotFromMeta(meta1, ver1.Ver, true /* forceReplicate */)
require.Nil(t, err)
meta2, err := kv.GetSnapshotMeta(store, ver2.Ver)
require.Nil(t, err)
snap2, err := newSchemaSnapshotFromMeta(meta2, ver2.Ver, false /* explicitTables */)
snap2, err := newSchemaSnapshotFromMeta(meta2, ver2.Ver, false /* forceReplicate */)
require.Nil(t, err)
snap3, err := newSchemaSnapshotFromMeta(meta2, ver2.Ver, true /* explicitTables */)
snap3, err := newSchemaSnapshotFromMeta(meta2, ver2.Ver, true /* forceReplicate */)
require.Nil(t, err)

require.Equal(t, len(snap2.tables)-len(snap1.tables), 5)
Expand Down
5 changes: 5 additions & 0 deletions cdc/model/schema_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -309,6 +309,11 @@ func (ti *TableInfo) ExistTableUniqueColumn() bool {

// IsEligible returns whether the table is a eligible table
func (ti *TableInfo) IsEligible(forceReplicate bool) bool {
// Sequence is not supported yet, TiCDC needs to filter all sequence tables.
// See https://github.com/pingcap/tiflow/issues/4559
if ti.IsSequence() {
return false
}
if forceReplicate {
return true
}
Expand Down
10 changes: 10 additions & 0 deletions cdc/model/schema_storage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -403,9 +403,19 @@ func TestTableInfoGetterFuncs(t *testing.T) {
info = WrapTableInfo(1, "test", 0, &tbl)
require.False(t, info.IsEligible(false))
require.True(t, info.IsEligible(true))

// View is eligible.
tbl.View = &timodel.ViewInfo{}
info = WrapTableInfo(1, "test", 0, &tbl)
require.True(t, info.IsView())
require.True(t, info.IsEligible(false))

// Sequence is ineligible.
tbl.Sequence = &timodel.SequenceInfo{}
info = WrapTableInfo(1, "test", 0, &tbl)
require.True(t, info.IsSequence())
require.False(t, info.IsEligible(false))
require.False(t, info.IsEligible(true))
}

func TestTableInfoClone(t *testing.T) {
Expand Down
46 changes: 11 additions & 35 deletions cdc/owner/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,48 +116,24 @@ func (s *schemaWrap4Owner) BuildDDLEvent(job *timodel.Job) (*model.DDLEvent, err
return ddlEvent, nil
}

func (s *schemaWrap4Owner) SinkTableInfos() []*model.SimpleTableInfo {
var sinkTableInfos []*model.SimpleTableInfo
for tableID := range s.schemaSnapshot.CloneTables() {
tblInfo, ok := s.schemaSnapshot.TableByID(tableID)
if !ok {
log.Panic("table not found for table ID", zap.Int64("tid", tableID))
}
if s.shouldIgnoreTable(tblInfo) {
continue
}
dbInfo, ok := s.schemaSnapshot.SchemaByTableID(tableID)
if !ok {
log.Panic("schema not found for table ID", zap.Int64("tid", tableID))
}

// TODO separate function for initializing SimpleTableInfo
sinkTableInfo := new(model.SimpleTableInfo)
sinkTableInfo.Schema = dbInfo.Name.O
sinkTableInfo.TableID = tableID
sinkTableInfo.Table = tblInfo.TableName.Table
sinkTableInfo.ColumnInfo = make([]*model.ColumnInfo, len(tblInfo.Cols()))
for i, colInfo := range tblInfo.Cols() {
sinkTableInfo.ColumnInfo[i] = new(model.ColumnInfo)
sinkTableInfo.ColumnInfo[i].FromTiColumnInfo(colInfo)
}
sinkTableInfos = append(sinkTableInfos, sinkTableInfo)
}
return sinkTableInfos
}

func (s *schemaWrap4Owner) shouldIgnoreTable(tableInfo *model.TableInfo) bool {
schemaName := tableInfo.TableName.Schema
tableName := tableInfo.TableName.Table
func (s *schemaWrap4Owner) shouldIgnoreTable(t *model.TableInfo) bool {
schemaName := t.TableName.Schema
tableName := t.TableName.Table
if s.filter.ShouldIgnoreTable(schemaName, tableName) {
return true
}
if s.config.Cyclic.IsEnabled() && mark.IsMarkTable(schemaName, tableName) {
// skip the mark table if cyclic is enabled
return true
}
if !tableInfo.IsEligible(s.config.ForceReplicate) {
log.Warn("skip ineligible table", zap.Int64("tid", tableInfo.ID), zap.Stringer("table", tableInfo.TableName))
if !t.IsEligible(s.config.ForceReplicate) {
// Sequence is not supported yet, and always ineligible.
// Skip Warn to avoid confusion.
// See https://github.com/pingcap/tiflow/issues/4559
if !t.IsSequence() {
log.Warn("skip ineligible table",
zap.Int64("tableID", t.ID), zap.Stringer("tableName", t.TableName))
}
return true
}
return false
Expand Down
25 changes: 0 additions & 25 deletions cdc/owner/schema_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,28 +145,3 @@ func (s *schemaSuite) TestBuildDDLEvent(c *check.C) {
},
})
}

func (s *schemaSuite) TestSinkTableInfos(c *check.C) {
defer testleak.AfterTest(c)()
helper := entry.NewSchemaTestHelper(c)
defer helper.Close()
ver, err := helper.Storage().CurrentVersion(oracle.GlobalTxnScope)
c.Assert(err, check.IsNil)
schema, err := newSchemaWrap4Owner(helper.Storage(), ver.Ver, config.GetDefaultReplicaConfig())
c.Assert(err, check.IsNil)
// add normal table
job := helper.DDL2Job("create table test.t1(id int primary key)")
tableIDT1 := job.BinlogInfo.TableInfo.ID
c.Assert(schema.HandleDDL(job), check.IsNil)
// add ineligible table
job = helper.DDL2Job("create table test.t2(id int)")
c.Assert(schema.HandleDDL(job), check.IsNil)
c.Assert(schema.SinkTableInfos(), check.DeepEquals, []*model.SimpleTableInfo{
{
Schema: "test",
Table: "t1",
TableID: tableIDT1,
ColumnInfo: []*model.ColumnInfo{{Name: "id", Type: mysql.TypeLong}},
},
})
}
31 changes: 2 additions & 29 deletions pkg/cmd/cli/cli_changefeed_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,12 @@ import (

"github.com/pingcap/errors"
"github.com/pingcap/tiflow/cdc"
"github.com/pingcap/tiflow/cdc/entry"
cdcCapture "github.com/pingcap/tiflow/cdc/capture"
"github.com/pingcap/tiflow/cdc/kv"
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/pkg/cmd/util"
"github.com/pingcap/tiflow/pkg/config"
"github.com/pingcap/tiflow/pkg/etcd"
"github.com/pingcap/tiflow/pkg/filter"
"github.com/pingcap/tiflow/pkg/httputil"
"github.com/pingcap/tiflow/pkg/security"
"github.com/spf13/cobra"
Expand Down Expand Up @@ -86,33 +85,7 @@ func getTables(cliPdAddr string, credential *security.Credential, cfg *config.Re
return nil, nil, err
}

meta, err := kv.GetSnapshotMeta(kvStore, startTs)
if err != nil {
return nil, nil, errors.Trace(err)
}

filter, err := filter.NewFilter(cfg)
if err != nil {
return nil, nil, errors.Trace(err)
}

snap, err := entry.NewSingleSchemaSnapshotFromMeta(meta, startTs, false /* explicitTables */)
if err != nil {
return nil, nil, errors.Trace(err)
}

for _, tableInfo := range snap.Tables() {
if filter.ShouldIgnoreTable(tableInfo.TableName.Schema, tableInfo.TableName.Table) {
continue
}
if !tableInfo.IsEligible(false /* forceReplicate */) {
ineligibleTables = append(ineligibleTables, tableInfo.TableName)
} else {
eligibleTables = append(eligibleTables, tableInfo.TableName)
}
}

return
return cdcCapture.VerifyTables(cfg, kvStore, startTs)
}

// sendOwnerChangefeedQuery sends owner changefeed query request.
Expand Down
4 changes: 4 additions & 0 deletions pkg/filter/filter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,11 @@ func TestShouldDiscardDDL(t *testing.T) {
require.Nil(t, err)
require.False(t, filter.ShouldDiscardDDL(model.ActionDropSchema))
require.False(t, filter.ShouldDiscardDDL(model.ActionAddForeignKey))

// Discard sequence DDL.
require.True(t, filter.ShouldDiscardDDL(model.ActionCreateSequence))
require.True(t, filter.ShouldDiscardDDL(model.ActionAlterSequence))
require.True(t, filter.ShouldDiscardDDL(model.ActionDropSequence))
}

func TestShouldIgnoreDDL(t *testing.T) {
Expand Down
Loading

0 comments on commit 98c9c9d

Please sign in to comment.