From aad3ef6bbf973316d767893da366eaa7f4793cfc Mon Sep 17 00:00:00 2001 From: Neil Shen Date: Wed, 23 Feb 2022 20:49:42 +0800 Subject: [PATCH] cdc: always ignore sequence tables (#4563) close pingcap/tiflow#4552 --- cdc/capture/http_validator.go | 11 +++- cdc/entry/schema_storage.go | 52 ++++++++++++------- cdc/model/schema_storage.go | 5 ++ cdc/model/schema_storage_test.go | 10 ++++ cdc/owner/schema.go | 46 ++++------------ cdc/owner/schema_test.go | 25 --------- pkg/cmd/cli/cli_changefeed_helper.go | 31 +---------- pkg/filter/filter_test.go | 5 ++ .../sequence/conf/diff_config.toml | 29 +++++++++++ .../sequence/conf/force_replicate.toml | 1 + .../integration_tests/sequence/data/test.sql | 25 +++++++++ tests/integration_tests/sequence/run.sh | 52 +++++++++++++++++++ 12 files changed, 181 insertions(+), 111 deletions(-) create mode 100644 tests/integration_tests/sequence/conf/diff_config.toml create mode 100644 tests/integration_tests/sequence/conf/force_replicate.toml create mode 100644 tests/integration_tests/sequence/data/test.sql create mode 100755 tests/integration_tests/sequence/run.sh diff --git a/cdc/capture/http_validator.go b/cdc/capture/http_validator.go index 3e2b60b825d..66b3a24edfc 100644 --- a/cdc/capture/http_validator.go +++ b/cdc/capture/http_validator.go @@ -128,7 +128,7 @@ func verifyCreateChangefeedConfig(ctx context.Context, changefeedConfig model.Ch } if !replicaConfig.ForceReplicate && !changefeedConfig.IgnoreIneligibleTable { - ineligibleTables, _, err := verifyTables(replicaConfig, capture.kvStorage, changefeedConfig.StartTS) + ineligibleTables, _, err := VerifyTables(replicaConfig, capture.kvStorage, changefeedConfig.StartTS) if err != nil { return nil, err } @@ -201,7 +201,9 @@ func verifyUpdateChangefeedConfig(ctx context.Context, changefeedConfig model.Ch return newInfo, nil } -func verifyTables(replicaConfig *config.ReplicaConfig, storage tidbkv.Storage, startTs uint64) (ineligibleTables, eligibleTables []model.TableName, err error) { +// VerifyTables catalog tables specified by ReplicaConfig into +// eligible (has an unique index or primary key) and ineligible tables. +func VerifyTables(replicaConfig *config.ReplicaConfig, storage tidbkv.Storage, startTs uint64) (ineligibleTables, eligibleTables []model.TableName, err error) { filter, err := filter.NewFilter(replicaConfig) if err != nil { return nil, nil, errors.Trace(err) @@ -219,6 +221,11 @@ func verifyTables(replicaConfig *config.ReplicaConfig, storage tidbkv.Storage, s if filter.ShouldIgnoreTable(tableInfo.TableName.Schema, tableInfo.TableName.Table) { continue } + // Sequence is not supported yet, TiCDC needs to filter all sequence tables. + // See https://github.com/pingcap/tiflow/issues/4559 + if tableInfo.IsSequence() { + continue + } if !tableInfo.IsEligible(false /* forceReplicate */) { ineligibleTables = append(ineligibleTables, tableInfo.TableName) } else { diff --git a/cdc/entry/schema_storage.go b/cdc/entry/schema_storage.go index a23cb651bec..cd039bcc21d 100644 --- a/cdc/entry/schema_storage.go +++ b/cdc/entry/schema_storage.go @@ -50,8 +50,8 @@ type schemaSnapshot struct { currentTs uint64 - // if explicit is true, treat tables without explicit row id as eligible - explicitTables bool + // if forceReplicate is true, treat ineligible tables as eligible. + forceReplicate bool } // SingleSchemaSnapshot is a single schema snapshot independent of schema storage @@ -98,17 +98,17 @@ func (s *SingleSchemaSnapshot) PreTableInfo(job *timodel.Job) (*model.TableInfo, } // NewSingleSchemaSnapshotFromMeta creates a new single schema snapshot from a tidb meta -func NewSingleSchemaSnapshotFromMeta(meta *timeta.Meta, currentTs uint64, explicitTables bool) (*SingleSchemaSnapshot, error) { +func NewSingleSchemaSnapshotFromMeta(meta *timeta.Meta, currentTs uint64, forceReplicate bool) (*SingleSchemaSnapshot, error) { // meta is nil only in unit tests if meta == nil { - snap := newEmptySchemaSnapshot(explicitTables) + snap := newEmptySchemaSnapshot(forceReplicate) snap.currentTs = currentTs return snap, nil } - return newSchemaSnapshotFromMeta(meta, currentTs, explicitTables) + return newSchemaSnapshotFromMeta(meta, currentTs, forceReplicate) } -func newEmptySchemaSnapshot(explicitTables bool) *schemaSnapshot { +func newEmptySchemaSnapshot(forceReplicate bool) *schemaSnapshot { return &schemaSnapshot{ tableNameToID: make(map[model.TableName]int64), schemaNameToID: make(map[string]int64), @@ -121,12 +121,12 @@ func newEmptySchemaSnapshot(explicitTables bool) *schemaSnapshot { truncateTableID: make(map[int64]struct{}), ineligibleTableID: make(map[int64]struct{}), - explicitTables: explicitTables, + forceReplicate: forceReplicate, } } -func newSchemaSnapshotFromMeta(meta *timeta.Meta, currentTs uint64, explicitTables bool) (*schemaSnapshot, error) { - snap := newEmptySchemaSnapshot(explicitTables) +func newSchemaSnapshotFromMeta(meta *timeta.Meta, currentTs uint64, forceReplicate bool) (*schemaSnapshot, error) { + snap := newEmptySchemaSnapshot(forceReplicate) dbinfos, err := meta.ListDatabases() if err != nil { return nil, cerror.WrapError(cerror.ErrMetaListDatabases, err) @@ -146,7 +146,7 @@ func newSchemaSnapshotFromMeta(meta *timeta.Meta, currentTs uint64, explicitTabl tableInfo := model.WrapTableInfo(dbinfo.ID, dbinfo.Name.O, currentTs, tableInfo) snap.tables[tableInfo.ID] = tableInfo snap.tableNameToID[model.TableName{Schema: dbinfo.Name.O, Table: tableInfo.Name.O}] = tableInfo.ID - isEligible := tableInfo.IsEligible(explicitTables) + isEligible := tableInfo.IsEligible(forceReplicate) if !isEligible { snap.ineligibleTableID[tableInfo.ID] = struct{}{} } @@ -468,7 +468,7 @@ func (s *schemaSnapshot) updatePartition(tbl *model.TableInfo) error { zap.Int64("add partition id", partition.ID)) } s.partitionTable[partition.ID] = tbl - if !tbl.IsEligible(s.explicitTables) { + if !tbl.IsEligible(s.forceReplicate) { s.ineligibleTableID[partition.ID] = struct{}{} } delete(oldIDs, partition.ID) @@ -504,14 +504,20 @@ func (s *schemaSnapshot) createTable(table *model.TableInfo) error { s.tableInSchema[table.SchemaID] = tableInSchema s.tables[table.ID] = table - if !table.IsEligible(s.explicitTables) { - log.Warn("this table is not eligible to replicate", zap.String("tableName", table.Name.O), zap.Int64("tableID", table.ID)) + if !table.IsEligible(s.forceReplicate) { + // Sequence is not supported yet, and always ineligible. + // Skip Warn to avoid confusion. + // See https://github.com/pingcap/tiflow/issues/4559 + if !table.IsSequence() { + log.Warn("this table is ineligible to replicate", + zap.String("tableName", table.Name.O), zap.Int64("tableID", table.ID)) + } s.ineligibleTableID[table.ID] = struct{}{} } if pi := table.GetPartitionInfo(); pi != nil { for _, partition := range pi.Definitions { s.partitionTable[partition.ID] = table - if !table.IsEligible(s.explicitTables) { + if !table.IsEligible(s.forceReplicate) { s.ineligibleTableID[partition.ID] = struct{}{} } } @@ -529,14 +535,20 @@ func (s *schemaSnapshot) replaceTable(table *model.TableInfo) error { return cerror.ErrSnapshotTableNotFound.GenWithStack("table %s(%d)", table.Name, table.ID) } s.tables[table.ID] = table - if !table.IsEligible(s.explicitTables) { - log.Warn("this table is not eligible to replicate", zap.String("tableName", table.Name.O), zap.Int64("tableID", table.ID)) + if !table.IsEligible(s.forceReplicate) { + // Sequence is not supported yet, and always ineligible. + // Skip Warn to avoid confusion. + // See https://github.com/pingcap/tiflow/issues/4559 + if !table.IsSequence() { + log.Warn("this table is ineligible to replicate", + zap.String("tableName", table.Name.O), zap.Int64("tableID", table.ID)) + } s.ineligibleTableID[table.ID] = struct{}{} } if pi := table.GetPartitionInfo(); pi != nil { for _, partition := range pi.Definitions { s.partitionTable[partition.ID] = table - if !table.IsEligible(s.explicitTables) { + if !table.IsEligible(s.forceReplicate) { s.ineligibleTableID[partition.ID] = struct{}{} } } @@ -673,7 +685,7 @@ type schemaStorageImpl struct { resolvedTs uint64 filter *filter.Filter - explicitTables bool + forceReplicate bool } // NewSchemaStorage creates a new schema storage @@ -692,7 +704,7 @@ func NewSchemaStorage(meta *timeta.Meta, startTs uint64, filter *filter.Filter, snaps: []*schemaSnapshot{snap}, resolvedTs: startTs, filter: filter, - explicitTables: forceReplicate, + forceReplicate: forceReplicate, } return schema, nil } @@ -769,7 +781,7 @@ func (s *schemaStorageImpl) HandleDDLJob(job *timodel.Job) error { } snap = lastSnap.Clone() } else { - snap = newEmptySchemaSnapshot(s.explicitTables) + snap = newEmptySchemaSnapshot(s.forceReplicate) } if err := snap.handleDDL(job); err != nil { return errors.Trace(err) diff --git a/cdc/model/schema_storage.go b/cdc/model/schema_storage.go index 909adac9739..d8cf828ef06 100644 --- a/cdc/model/schema_storage.go +++ b/cdc/model/schema_storage.go @@ -307,6 +307,11 @@ func (ti *TableInfo) ExistTableUniqueColumn() bool { // IsEligible returns whether the table is a eligible table func (ti *TableInfo) IsEligible(forceReplicate bool) bool { + // Sequence is not supported yet, TiCDC needs to filter all sequence tables. + // See https://github.com/pingcap/tiflow/issues/4559 + if ti.IsSequence() { + return false + } if forceReplicate { return true } diff --git a/cdc/model/schema_storage_test.go b/cdc/model/schema_storage_test.go index 5979546c707..13bbddb1058 100644 --- a/cdc/model/schema_storage_test.go +++ b/cdc/model/schema_storage_test.go @@ -401,9 +401,19 @@ func (s *schemaStorageSuite) TestTableInfoGetterFuncs(c *check.C) { info = WrapTableInfo(1, "test", 0, &t) c.Assert(info.IsEligible(false), check.IsFalse) c.Assert(info.IsEligible(true), check.IsTrue) + + // View is eligible. t.View = &timodel.ViewInfo{} info = WrapTableInfo(1, "test", 0, &t) + c.Assert(info.IsView(), check.IsTrue) c.Assert(info.IsEligible(false), check.IsTrue) + + // Sequence is ineligible. + t.Sequence = &timodel.SequenceInfo{} + info = WrapTableInfo(1, "test", 0, &t) + c.Assert(info.IsSequence(), check.IsTrue) + c.Assert(info.IsEligible(false), check.IsFalse) + c.Assert(info.IsEligible(true), check.IsFalse) } func (s *schemaStorageSuite) TestTableInfoClone(c *check.C) { diff --git a/cdc/owner/schema.go b/cdc/owner/schema.go index 9cd467191b1..1ceb9cdef53 100644 --- a/cdc/owner/schema.go +++ b/cdc/owner/schema.go @@ -116,39 +116,9 @@ func (s *schemaWrap4Owner) BuildDDLEvent(job *timodel.Job) (*model.DDLEvent, err return ddlEvent, nil } -func (s *schemaWrap4Owner) SinkTableInfos() []*model.SimpleTableInfo { - var sinkTableInfos []*model.SimpleTableInfo - for tableID := range s.schemaSnapshot.CloneTables() { - tblInfo, ok := s.schemaSnapshot.TableByID(tableID) - if !ok { - log.Panic("table not found for table ID", zap.Int64("tid", tableID)) - } - if s.shouldIgnoreTable(tblInfo) { - continue - } - dbInfo, ok := s.schemaSnapshot.SchemaByTableID(tableID) - if !ok { - log.Panic("schema not found for table ID", zap.Int64("tid", tableID)) - } - - // TODO separate function for initializing SimpleTableInfo - sinkTableInfo := new(model.SimpleTableInfo) - sinkTableInfo.Schema = dbInfo.Name.O - sinkTableInfo.TableID = tableID - sinkTableInfo.Table = tblInfo.TableName.Table - sinkTableInfo.ColumnInfo = make([]*model.ColumnInfo, len(tblInfo.Cols())) - for i, colInfo := range tblInfo.Cols() { - sinkTableInfo.ColumnInfo[i] = new(model.ColumnInfo) - sinkTableInfo.ColumnInfo[i].FromTiColumnInfo(colInfo) - } - sinkTableInfos = append(sinkTableInfos, sinkTableInfo) - } - return sinkTableInfos -} - -func (s *schemaWrap4Owner) shouldIgnoreTable(tableInfo *model.TableInfo) bool { - schemaName := tableInfo.TableName.Schema - tableName := tableInfo.TableName.Table +func (s *schemaWrap4Owner) shouldIgnoreTable(t *model.TableInfo) bool { + schemaName := t.TableName.Schema + tableName := t.TableName.Table if s.filter.ShouldIgnoreTable(schemaName, tableName) { return true } @@ -156,8 +126,14 @@ func (s *schemaWrap4Owner) shouldIgnoreTable(tableInfo *model.TableInfo) bool { // skip the mark table if cyclic is enabled return true } - if !tableInfo.IsEligible(s.config.ForceReplicate) { - log.Warn("skip ineligible table", zap.Int64("tid", tableInfo.ID), zap.Stringer("table", tableInfo.TableName)) + if !t.IsEligible(s.config.ForceReplicate) { + // Sequence is not supported yet, and always ineligible. + // Skip Warn to avoid confusion. + // See https://github.com/pingcap/tiflow/issues/4559 + if !t.IsSequence() { + log.Warn("skip ineligible table", + zap.Int64("tableID", t.ID), zap.Stringer("tableName", t.TableName)) + } return true } return false diff --git a/cdc/owner/schema_test.go b/cdc/owner/schema_test.go index 3bb993e6dd8..5951daa3964 100644 --- a/cdc/owner/schema_test.go +++ b/cdc/owner/schema_test.go @@ -145,28 +145,3 @@ func (s *schemaSuite) TestBuildDDLEvent(c *check.C) { }, }) } - -func (s *schemaSuite) TestSinkTableInfos(c *check.C) { - defer testleak.AfterTest(c)() - helper := entry.NewSchemaTestHelper(c) - defer helper.Close() - ver, err := helper.Storage().CurrentVersion(oracle.GlobalTxnScope) - c.Assert(err, check.IsNil) - schema, err := newSchemaWrap4Owner(helper.Storage(), ver.Ver, config.GetDefaultReplicaConfig()) - c.Assert(err, check.IsNil) - // add normal table - job := helper.DDL2Job("create table test.t1(id int primary key)") - tableIDT1 := job.BinlogInfo.TableInfo.ID - c.Assert(schema.HandleDDL(job), check.IsNil) - // add ineligible table - job = helper.DDL2Job("create table test.t2(id int)") - c.Assert(schema.HandleDDL(job), check.IsNil) - c.Assert(schema.SinkTableInfos(), check.DeepEquals, []*model.SimpleTableInfo{ - { - Schema: "test", - Table: "t1", - TableID: tableIDT1, - ColumnInfo: []*model.ColumnInfo{{Name: "id", Type: mysql.TypeLong}}, - }, - }) -} diff --git a/pkg/cmd/cli/cli_changefeed_helper.go b/pkg/cmd/cli/cli_changefeed_helper.go index 4e63b82c08e..8780a025ab1 100644 --- a/pkg/cmd/cli/cli_changefeed_helper.go +++ b/pkg/cmd/cli/cli_changefeed_helper.go @@ -22,12 +22,11 @@ import ( "github.com/pingcap/errors" "github.com/pingcap/tiflow/cdc" - "github.com/pingcap/tiflow/cdc/entry" + captureAPI "github.com/pingcap/tiflow/cdc/capture" "github.com/pingcap/tiflow/cdc/kv" "github.com/pingcap/tiflow/cdc/model" "github.com/pingcap/tiflow/pkg/cmd/util" "github.com/pingcap/tiflow/pkg/config" - "github.com/pingcap/tiflow/pkg/filter" "github.com/pingcap/tiflow/pkg/httputil" "github.com/pingcap/tiflow/pkg/security" "github.com/spf13/cobra" @@ -85,33 +84,7 @@ func getTables(cliPdAddr string, credential *security.Credential, cfg *config.Re return nil, nil, err } - meta, err := kv.GetSnapshotMeta(kvStore, startTs) - if err != nil { - return nil, nil, errors.Trace(err) - } - - filter, err := filter.NewFilter(cfg) - if err != nil { - return nil, nil, errors.Trace(err) - } - - snap, err := entry.NewSingleSchemaSnapshotFromMeta(meta, startTs, false /* explicitTables */) - if err != nil { - return nil, nil, errors.Trace(err) - } - - for _, tableInfo := range snap.Tables() { - if filter.ShouldIgnoreTable(tableInfo.TableName.Schema, tableInfo.TableName.Table) { - continue - } - if !tableInfo.IsEligible(false /* forceReplicate */) { - ineligibleTables = append(ineligibleTables, tableInfo.TableName) - } else { - eligibleTables = append(eligibleTables, tableInfo.TableName) - } - } - - return + return captureAPI.VerifyTables(cfg, kvStore, startTs) } // sendOwnerChangefeedQuery sends owner changefeed query request. diff --git a/pkg/filter/filter_test.go b/pkg/filter/filter_test.go index 8408413031a..d2d243c06f1 100644 --- a/pkg/filter/filter_test.go +++ b/pkg/filter/filter_test.go @@ -138,6 +138,11 @@ func (s *filterSuite) TestShouldDiscardDDL(c *check.C) { c.Assert(filter.ShouldDiscardDDL(model.ActionDropSchema), check.IsFalse) c.Assert(filter.ShouldDiscardDDL(model.ActionAddForeignKey), check.IsFalse) c.Assert(filter.ShouldDiscardDDL(model.ActionCreateSequence), check.IsTrue) + + // Discard sequence DDL. + c.Assert(filter.ShouldDiscardDDL(model.ActionCreateSequence), check.IsTrue) + c.Assert(filter.ShouldDiscardDDL(model.ActionAlterSequence), check.IsTrue) + c.Assert(filter.ShouldDiscardDDL(model.ActionDropSequence), check.IsTrue) } func (s *filterSuite) TestShouldIgnoreDDL(c *check.C) { diff --git a/tests/integration_tests/sequence/conf/diff_config.toml b/tests/integration_tests/sequence/conf/diff_config.toml new file mode 100644 index 00000000000..cbb7eee0d11 --- /dev/null +++ b/tests/integration_tests/sequence/conf/diff_config.toml @@ -0,0 +1,29 @@ +# diff Configuration. + +check-thread-count = 4 + +export-fix-sql = true + +check-struct-only = false + +[task] + output-dir = "/tmp/tidb_cdc_test/sequence/sync_diff/output" + + source-instances = ["mysql1"] + + target-instance = "tidb0" + + target-check-tables = ["sequence_test.t1"] + +[data-sources] +[data-sources.mysql1] + host = "127.0.0.1" + port = 4000 + user = "root" + password = "" + +[data-sources.tidb0] + host = "127.0.0.1" + port = 3306 + user = "root" + password = "" diff --git a/tests/integration_tests/sequence/conf/force_replicate.toml b/tests/integration_tests/sequence/conf/force_replicate.toml new file mode 100644 index 00000000000..dba5fc404ab --- /dev/null +++ b/tests/integration_tests/sequence/conf/force_replicate.toml @@ -0,0 +1 @@ +force-replicate=true diff --git a/tests/integration_tests/sequence/data/test.sql b/tests/integration_tests/sequence/data/test.sql new file mode 100644 index 00000000000..8b98c97f726 --- /dev/null +++ b/tests/integration_tests/sequence/data/test.sql @@ -0,0 +1,25 @@ +drop database if exists `sequence_test`; +create database `sequence_test`; +use `sequence_test`; + +CREATE SEQUENCE seq0 start with 1 minvalue 1 maxvalue 999999999999999 increment by 1 nocache cycle; + +-- select seq0 +SELECT next value for seq0; +-- select again +SELECT next value for seq0; + +-- t1 refers seq0 +-- note that only TiDB supports it. +CREATE TABLE t1 ( + id VARCHAR(255), + a INT default next value for seq0, + PRIMARY KEY(id) +); + +-- TiCDC is able to replicate following changes to TiDB. +INSERT INTO t1 (id) VALUES ('111'); +INSERT INTO t1 (id) VALUES ('222'); +INSERT INTO t1 (id) VALUES ('333'); +UPDATE t1 SET id = '10' WHERE id = '111'; +DELETE FROM t1 WHERE a = 222; diff --git a/tests/integration_tests/sequence/run.sh b/tests/integration_tests/sequence/run.sh new file mode 100755 index 00000000000..702131d6015 --- /dev/null +++ b/tests/integration_tests/sequence/run.sh @@ -0,0 +1,52 @@ +#!/bin/bash + +set -eu + +CUR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd) +source $CUR/../_utils/test_prepare +WORK_DIR=$OUT_DIR/$TEST_NAME +CDC_BINARY=cdc.test +SINK_TYPE=$1 + +function run() { + # No need to test kafka. + if [ "$SINK_TYPE" == "kafka" ]; then + return + fi + + rm -rf $WORK_DIR && mkdir -p $WORK_DIR + + start_tidb_cluster --workdir $WORK_DIR + + cd $WORK_DIR + + # record tso before we create tables to skip the system table DDLs + start_ts=$(cdc cli tso query --pd=http://$UP_PD_HOST_1:$UP_PD_PORT_1) + + run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY + SINK_URI="mysql://normal:123456@127.0.0.1:3306/" + run_cdc_cli changefeed create --start-ts=$start_ts --sink-uri="$SINK_URI" --config $CUR/conf/force_replicate.toml + + run_sql_file $CUR/data/test.sql ${UP_TIDB_HOST} ${UP_TIDB_PORT} + # sync_diff can't check non-exist table, so we check expected tables are created in downstream first + + check_table_exists sequence_test.t1 ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} + echo "check table exists success" + check_sync_diff $WORK_DIR $CUR/conf/diff_config.toml 60 + + # TiCDC discards all SEQUENCE DDL for now. + # See https://github.com/pingcap/tiflow/issues/4559 + ! run_sql "SHOW CREATE SEQUENCE sequence_test.seq0;" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} + run_sql "DROP SEQUENCE sequence_test.seq0;" ${UP_TIDB_HOST} ${UP_TIDB_PORT} + + # Make sure changefeed is normal. + run_sql "CREATE table sequence_test.mark_table(id int primary key);" ${UP_TIDB_HOST} ${UP_TIDB_PORT} + check_table_exists "sequence_test.mark_table" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} 90 + + cleanup_process $CDC_BINARY +} + +trap stop_tidb_cluster EXIT +run $* +check_logs $WORK_DIR +echo "[$(date)] <<<<<< run test case $TEST_NAME success! >>>>>>"