Skip to content

Commit

Permalink
cdc: always ignore sequence tables (pingcap#4563)
Browse files Browse the repository at this point in the history
  • Loading branch information
overvenus committed Apr 14, 2022
1 parent 9820578 commit aad3ef6
Show file tree
Hide file tree
Showing 12 changed files with 181 additions and 111 deletions.
11 changes: 9 additions & 2 deletions cdc/capture/http_validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ func verifyCreateChangefeedConfig(ctx context.Context, changefeedConfig model.Ch
}

if !replicaConfig.ForceReplicate && !changefeedConfig.IgnoreIneligibleTable {
ineligibleTables, _, err := verifyTables(replicaConfig, capture.kvStorage, changefeedConfig.StartTS)
ineligibleTables, _, err := VerifyTables(replicaConfig, capture.kvStorage, changefeedConfig.StartTS)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -201,7 +201,9 @@ func verifyUpdateChangefeedConfig(ctx context.Context, changefeedConfig model.Ch
return newInfo, nil
}

func verifyTables(replicaConfig *config.ReplicaConfig, storage tidbkv.Storage, startTs uint64) (ineligibleTables, eligibleTables []model.TableName, err error) {
// VerifyTables catalog tables specified by ReplicaConfig into
// eligible (has an unique index or primary key) and ineligible tables.
func VerifyTables(replicaConfig *config.ReplicaConfig, storage tidbkv.Storage, startTs uint64) (ineligibleTables, eligibleTables []model.TableName, err error) {
filter, err := filter.NewFilter(replicaConfig)
if err != nil {
return nil, nil, errors.Trace(err)
Expand All @@ -219,6 +221,11 @@ func verifyTables(replicaConfig *config.ReplicaConfig, storage tidbkv.Storage, s
if filter.ShouldIgnoreTable(tableInfo.TableName.Schema, tableInfo.TableName.Table) {
continue
}
// Sequence is not supported yet, TiCDC needs to filter all sequence tables.
// See https://github.com/pingcap/tiflow/issues/4559
if tableInfo.IsSequence() {
continue
}
if !tableInfo.IsEligible(false /* forceReplicate */) {
ineligibleTables = append(ineligibleTables, tableInfo.TableName)
} else {
Expand Down
52 changes: 32 additions & 20 deletions cdc/entry/schema_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,8 @@ type schemaSnapshot struct {

currentTs uint64

// if explicit is true, treat tables without explicit row id as eligible
explicitTables bool
// if forceReplicate is true, treat ineligible tables as eligible.
forceReplicate bool
}

// SingleSchemaSnapshot is a single schema snapshot independent of schema storage
Expand Down Expand Up @@ -98,17 +98,17 @@ func (s *SingleSchemaSnapshot) PreTableInfo(job *timodel.Job) (*model.TableInfo,
}

// NewSingleSchemaSnapshotFromMeta creates a new single schema snapshot from a tidb meta
func NewSingleSchemaSnapshotFromMeta(meta *timeta.Meta, currentTs uint64, explicitTables bool) (*SingleSchemaSnapshot, error) {
func NewSingleSchemaSnapshotFromMeta(meta *timeta.Meta, currentTs uint64, forceReplicate bool) (*SingleSchemaSnapshot, error) {
// meta is nil only in unit tests
if meta == nil {
snap := newEmptySchemaSnapshot(explicitTables)
snap := newEmptySchemaSnapshot(forceReplicate)
snap.currentTs = currentTs
return snap, nil
}
return newSchemaSnapshotFromMeta(meta, currentTs, explicitTables)
return newSchemaSnapshotFromMeta(meta, currentTs, forceReplicate)
}

func newEmptySchemaSnapshot(explicitTables bool) *schemaSnapshot {
func newEmptySchemaSnapshot(forceReplicate bool) *schemaSnapshot {
return &schemaSnapshot{
tableNameToID: make(map[model.TableName]int64),
schemaNameToID: make(map[string]int64),
Expand All @@ -121,12 +121,12 @@ func newEmptySchemaSnapshot(explicitTables bool) *schemaSnapshot {
truncateTableID: make(map[int64]struct{}),
ineligibleTableID: make(map[int64]struct{}),

explicitTables: explicitTables,
forceReplicate: forceReplicate,
}
}

func newSchemaSnapshotFromMeta(meta *timeta.Meta, currentTs uint64, explicitTables bool) (*schemaSnapshot, error) {
snap := newEmptySchemaSnapshot(explicitTables)
func newSchemaSnapshotFromMeta(meta *timeta.Meta, currentTs uint64, forceReplicate bool) (*schemaSnapshot, error) {
snap := newEmptySchemaSnapshot(forceReplicate)
dbinfos, err := meta.ListDatabases()
if err != nil {
return nil, cerror.WrapError(cerror.ErrMetaListDatabases, err)
Expand All @@ -146,7 +146,7 @@ func newSchemaSnapshotFromMeta(meta *timeta.Meta, currentTs uint64, explicitTabl
tableInfo := model.WrapTableInfo(dbinfo.ID, dbinfo.Name.O, currentTs, tableInfo)
snap.tables[tableInfo.ID] = tableInfo
snap.tableNameToID[model.TableName{Schema: dbinfo.Name.O, Table: tableInfo.Name.O}] = tableInfo.ID
isEligible := tableInfo.IsEligible(explicitTables)
isEligible := tableInfo.IsEligible(forceReplicate)
if !isEligible {
snap.ineligibleTableID[tableInfo.ID] = struct{}{}
}
Expand Down Expand Up @@ -468,7 +468,7 @@ func (s *schemaSnapshot) updatePartition(tbl *model.TableInfo) error {
zap.Int64("add partition id", partition.ID))
}
s.partitionTable[partition.ID] = tbl
if !tbl.IsEligible(s.explicitTables) {
if !tbl.IsEligible(s.forceReplicate) {
s.ineligibleTableID[partition.ID] = struct{}{}
}
delete(oldIDs, partition.ID)
Expand Down Expand Up @@ -504,14 +504,20 @@ func (s *schemaSnapshot) createTable(table *model.TableInfo) error {
s.tableInSchema[table.SchemaID] = tableInSchema

s.tables[table.ID] = table
if !table.IsEligible(s.explicitTables) {
log.Warn("this table is not eligible to replicate", zap.String("tableName", table.Name.O), zap.Int64("tableID", table.ID))
if !table.IsEligible(s.forceReplicate) {
// Sequence is not supported yet, and always ineligible.
// Skip Warn to avoid confusion.
// See https://github.com/pingcap/tiflow/issues/4559
if !table.IsSequence() {
log.Warn("this table is ineligible to replicate",
zap.String("tableName", table.Name.O), zap.Int64("tableID", table.ID))
}
s.ineligibleTableID[table.ID] = struct{}{}
}
if pi := table.GetPartitionInfo(); pi != nil {
for _, partition := range pi.Definitions {
s.partitionTable[partition.ID] = table
if !table.IsEligible(s.explicitTables) {
if !table.IsEligible(s.forceReplicate) {
s.ineligibleTableID[partition.ID] = struct{}{}
}
}
Expand All @@ -529,14 +535,20 @@ func (s *schemaSnapshot) replaceTable(table *model.TableInfo) error {
return cerror.ErrSnapshotTableNotFound.GenWithStack("table %s(%d)", table.Name, table.ID)
}
s.tables[table.ID] = table
if !table.IsEligible(s.explicitTables) {
log.Warn("this table is not eligible to replicate", zap.String("tableName", table.Name.O), zap.Int64("tableID", table.ID))
if !table.IsEligible(s.forceReplicate) {
// Sequence is not supported yet, and always ineligible.
// Skip Warn to avoid confusion.
// See https://github.com/pingcap/tiflow/issues/4559
if !table.IsSequence() {
log.Warn("this table is ineligible to replicate",
zap.String("tableName", table.Name.O), zap.Int64("tableID", table.ID))
}
s.ineligibleTableID[table.ID] = struct{}{}
}
if pi := table.GetPartitionInfo(); pi != nil {
for _, partition := range pi.Definitions {
s.partitionTable[partition.ID] = table
if !table.IsEligible(s.explicitTables) {
if !table.IsEligible(s.forceReplicate) {
s.ineligibleTableID[partition.ID] = struct{}{}
}
}
Expand Down Expand Up @@ -673,7 +685,7 @@ type schemaStorageImpl struct {
resolvedTs uint64

filter *filter.Filter
explicitTables bool
forceReplicate bool
}

// NewSchemaStorage creates a new schema storage
Expand All @@ -692,7 +704,7 @@ func NewSchemaStorage(meta *timeta.Meta, startTs uint64, filter *filter.Filter,
snaps: []*schemaSnapshot{snap},
resolvedTs: startTs,
filter: filter,
explicitTables: forceReplicate,
forceReplicate: forceReplicate,
}
return schema, nil
}
Expand Down Expand Up @@ -769,7 +781,7 @@ func (s *schemaStorageImpl) HandleDDLJob(job *timodel.Job) error {
}
snap = lastSnap.Clone()
} else {
snap = newEmptySchemaSnapshot(s.explicitTables)
snap = newEmptySchemaSnapshot(s.forceReplicate)
}
if err := snap.handleDDL(job); err != nil {
return errors.Trace(err)
Expand Down
5 changes: 5 additions & 0 deletions cdc/model/schema_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -307,6 +307,11 @@ func (ti *TableInfo) ExistTableUniqueColumn() bool {

// IsEligible returns whether the table is a eligible table
func (ti *TableInfo) IsEligible(forceReplicate bool) bool {
// Sequence is not supported yet, TiCDC needs to filter all sequence tables.
// See https://github.com/pingcap/tiflow/issues/4559
if ti.IsSequence() {
return false
}
if forceReplicate {
return true
}
Expand Down
10 changes: 10 additions & 0 deletions cdc/model/schema_storage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -401,9 +401,19 @@ func (s *schemaStorageSuite) TestTableInfoGetterFuncs(c *check.C) {
info = WrapTableInfo(1, "test", 0, &t)
c.Assert(info.IsEligible(false), check.IsFalse)
c.Assert(info.IsEligible(true), check.IsTrue)

// View is eligible.
t.View = &timodel.ViewInfo{}
info = WrapTableInfo(1, "test", 0, &t)
c.Assert(info.IsView(), check.IsTrue)
c.Assert(info.IsEligible(false), check.IsTrue)

// Sequence is ineligible.
t.Sequence = &timodel.SequenceInfo{}
info = WrapTableInfo(1, "test", 0, &t)
c.Assert(info.IsSequence(), check.IsTrue)
c.Assert(info.IsEligible(false), check.IsFalse)
c.Assert(info.IsEligible(true), check.IsFalse)
}

func (s *schemaStorageSuite) TestTableInfoClone(c *check.C) {
Expand Down
46 changes: 11 additions & 35 deletions cdc/owner/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,48 +116,24 @@ func (s *schemaWrap4Owner) BuildDDLEvent(job *timodel.Job) (*model.DDLEvent, err
return ddlEvent, nil
}

func (s *schemaWrap4Owner) SinkTableInfos() []*model.SimpleTableInfo {
var sinkTableInfos []*model.SimpleTableInfo
for tableID := range s.schemaSnapshot.CloneTables() {
tblInfo, ok := s.schemaSnapshot.TableByID(tableID)
if !ok {
log.Panic("table not found for table ID", zap.Int64("tid", tableID))
}
if s.shouldIgnoreTable(tblInfo) {
continue
}
dbInfo, ok := s.schemaSnapshot.SchemaByTableID(tableID)
if !ok {
log.Panic("schema not found for table ID", zap.Int64("tid", tableID))
}

// TODO separate function for initializing SimpleTableInfo
sinkTableInfo := new(model.SimpleTableInfo)
sinkTableInfo.Schema = dbInfo.Name.O
sinkTableInfo.TableID = tableID
sinkTableInfo.Table = tblInfo.TableName.Table
sinkTableInfo.ColumnInfo = make([]*model.ColumnInfo, len(tblInfo.Cols()))
for i, colInfo := range tblInfo.Cols() {
sinkTableInfo.ColumnInfo[i] = new(model.ColumnInfo)
sinkTableInfo.ColumnInfo[i].FromTiColumnInfo(colInfo)
}
sinkTableInfos = append(sinkTableInfos, sinkTableInfo)
}
return sinkTableInfos
}

func (s *schemaWrap4Owner) shouldIgnoreTable(tableInfo *model.TableInfo) bool {
schemaName := tableInfo.TableName.Schema
tableName := tableInfo.TableName.Table
func (s *schemaWrap4Owner) shouldIgnoreTable(t *model.TableInfo) bool {
schemaName := t.TableName.Schema
tableName := t.TableName.Table
if s.filter.ShouldIgnoreTable(schemaName, tableName) {
return true
}
if s.config.Cyclic.IsEnabled() && mark.IsMarkTable(schemaName, tableName) {
// skip the mark table if cyclic is enabled
return true
}
if !tableInfo.IsEligible(s.config.ForceReplicate) {
log.Warn("skip ineligible table", zap.Int64("tid", tableInfo.ID), zap.Stringer("table", tableInfo.TableName))
if !t.IsEligible(s.config.ForceReplicate) {
// Sequence is not supported yet, and always ineligible.
// Skip Warn to avoid confusion.
// See https://github.com/pingcap/tiflow/issues/4559
if !t.IsSequence() {
log.Warn("skip ineligible table",
zap.Int64("tableID", t.ID), zap.Stringer("tableName", t.TableName))
}
return true
}
return false
Expand Down
25 changes: 0 additions & 25 deletions cdc/owner/schema_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,28 +145,3 @@ func (s *schemaSuite) TestBuildDDLEvent(c *check.C) {
},
})
}

func (s *schemaSuite) TestSinkTableInfos(c *check.C) {
defer testleak.AfterTest(c)()
helper := entry.NewSchemaTestHelper(c)
defer helper.Close()
ver, err := helper.Storage().CurrentVersion(oracle.GlobalTxnScope)
c.Assert(err, check.IsNil)
schema, err := newSchemaWrap4Owner(helper.Storage(), ver.Ver, config.GetDefaultReplicaConfig())
c.Assert(err, check.IsNil)
// add normal table
job := helper.DDL2Job("create table test.t1(id int primary key)")
tableIDT1 := job.BinlogInfo.TableInfo.ID
c.Assert(schema.HandleDDL(job), check.IsNil)
// add ineligible table
job = helper.DDL2Job("create table test.t2(id int)")
c.Assert(schema.HandleDDL(job), check.IsNil)
c.Assert(schema.SinkTableInfos(), check.DeepEquals, []*model.SimpleTableInfo{
{
Schema: "test",
Table: "t1",
TableID: tableIDT1,
ColumnInfo: []*model.ColumnInfo{{Name: "id", Type: mysql.TypeLong}},
},
})
}
31 changes: 2 additions & 29 deletions pkg/cmd/cli/cli_changefeed_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,11 @@ import (

"github.com/pingcap/errors"
"github.com/pingcap/tiflow/cdc"
"github.com/pingcap/tiflow/cdc/entry"
captureAPI "github.com/pingcap/tiflow/cdc/capture"
"github.com/pingcap/tiflow/cdc/kv"
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/pkg/cmd/util"
"github.com/pingcap/tiflow/pkg/config"
"github.com/pingcap/tiflow/pkg/filter"
"github.com/pingcap/tiflow/pkg/httputil"
"github.com/pingcap/tiflow/pkg/security"
"github.com/spf13/cobra"
Expand Down Expand Up @@ -85,33 +84,7 @@ func getTables(cliPdAddr string, credential *security.Credential, cfg *config.Re
return nil, nil, err
}

meta, err := kv.GetSnapshotMeta(kvStore, startTs)
if err != nil {
return nil, nil, errors.Trace(err)
}

filter, err := filter.NewFilter(cfg)
if err != nil {
return nil, nil, errors.Trace(err)
}

snap, err := entry.NewSingleSchemaSnapshotFromMeta(meta, startTs, false /* explicitTables */)
if err != nil {
return nil, nil, errors.Trace(err)
}

for _, tableInfo := range snap.Tables() {
if filter.ShouldIgnoreTable(tableInfo.TableName.Schema, tableInfo.TableName.Table) {
continue
}
if !tableInfo.IsEligible(false /* forceReplicate */) {
ineligibleTables = append(ineligibleTables, tableInfo.TableName)
} else {
eligibleTables = append(eligibleTables, tableInfo.TableName)
}
}

return
return captureAPI.VerifyTables(cfg, kvStore, startTs)
}

// sendOwnerChangefeedQuery sends owner changefeed query request.
Expand Down
5 changes: 5 additions & 0 deletions pkg/filter/filter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,11 @@ func (s *filterSuite) TestShouldDiscardDDL(c *check.C) {
c.Assert(filter.ShouldDiscardDDL(model.ActionDropSchema), check.IsFalse)
c.Assert(filter.ShouldDiscardDDL(model.ActionAddForeignKey), check.IsFalse)
c.Assert(filter.ShouldDiscardDDL(model.ActionCreateSequence), check.IsTrue)

// Discard sequence DDL.
c.Assert(filter.ShouldDiscardDDL(model.ActionCreateSequence), check.IsTrue)
c.Assert(filter.ShouldDiscardDDL(model.ActionAlterSequence), check.IsTrue)
c.Assert(filter.ShouldDiscardDDL(model.ActionDropSequence), check.IsTrue)
}

func (s *filterSuite) TestShouldIgnoreDDL(c *check.C) {
Expand Down
29 changes: 29 additions & 0 deletions tests/integration_tests/sequence/conf/diff_config.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
# diff Configuration.

check-thread-count = 4

export-fix-sql = true

check-struct-only = false

[task]
output-dir = "/tmp/tidb_cdc_test/sequence/sync_diff/output"

source-instances = ["mysql1"]

target-instance = "tidb0"

target-check-tables = ["sequence_test.t1"]

[data-sources]
[data-sources.mysql1]
host = "127.0.0.1"
port = 4000
user = "root"
password = ""

[data-sources.tidb0]
host = "127.0.0.1"
port = 3306
user = "root"
password = ""
1 change: 1 addition & 0 deletions tests/integration_tests/sequence/conf/force_replicate.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
force-replicate=true
Loading

0 comments on commit aad3ef6

Please sign in to comment.