From 5f05520b7a724449606eec43f988d145e4eef4df Mon Sep 17 00:00:00 2001 From: Ti Chi Robot Date: Mon, 19 Feb 2024 14:52:56 +0800 Subject: [PATCH] filter (ticdc): fix synpoint table is replicated in BDR mode (#10587) (#10607) close pingcap/tiflow#10576 --- cdc/syncpointstore/mysql_syncpoint_store.go | 18 ++++++------------ pkg/filter/filter.go | 7 +++++++ pkg/filter/filter_test.go | 2 +- pkg/filter/utils.go | 9 ++++++++- pkg/filter/utils_test.go | 1 + tests/integration_tests/bdr_mode/conf/cf.toml | 1 - .../integration_tests/bdr_mode/conf/down.toml | 1 + tests/integration_tests/bdr_mode/conf/up.toml | 3 +++ tests/integration_tests/bdr_mode/run.sh | 8 ++++++-- 9 files changed, 33 insertions(+), 17 deletions(-) delete mode 100644 tests/integration_tests/bdr_mode/conf/cf.toml create mode 100644 tests/integration_tests/bdr_mode/conf/down.toml create mode 100644 tests/integration_tests/bdr_mode/conf/up.toml diff --git a/cdc/syncpointstore/mysql_syncpoint_store.go b/cdc/syncpointstore/mysql_syncpoint_store.go index ce4e88df234..53aaa4f0cd4 100644 --- a/cdc/syncpointstore/mysql_syncpoint_store.go +++ b/cdc/syncpointstore/mysql_syncpoint_store.go @@ -26,17 +26,11 @@ import ( "github.com/pingcap/tiflow/pkg/config" cerror "github.com/pingcap/tiflow/pkg/errors" "github.com/pingcap/tiflow/pkg/errorutil" + "github.com/pingcap/tiflow/pkg/filter" "github.com/pingcap/tiflow/pkg/sink/mysql" "go.uber.org/zap" ) -const ( - // syncPointTableName is the name of table where all syncpoint maps sit - syncPointTableName string = "syncpoint_v1" - // schemaName is the name of database where syncPoint maps sit - schemaName = "tidb_cdc" -) - type mysqlSyncPointStore struct { db *sql.DB clusterID string @@ -88,7 +82,7 @@ func newMySQLSyncPointStore( } func (s *mysqlSyncPointStore) CreateSyncTable(ctx context.Context) error { - database := schemaName + database := filter.TiCDCSystemSchema tx, err := s.db.BeginTx(ctx, nil) if err != nil { log.Error("create sync table: begin Tx fail", zap.Error(err)) @@ -120,7 +114,7 @@ func (s *mysqlSyncPointStore) CreateSyncTable(ctx context.Context) error { INDEX (created_at), PRIMARY KEY (changefeed, primary_ts) );` - query = fmt.Sprintf(query, syncPointTableName) + query = fmt.Sprintf(query, filter.SyncPointTable) _, err = tx.Exec(query) if err != nil { err2 := tx.Rollback() @@ -154,7 +148,7 @@ func (s *mysqlSyncPointStore) SinkSyncPoint(ctx context.Context, return cerror.WrapError(cerror.ErrMySQLTxnError, err) } // insert ts map - query := "insert ignore into " + schemaName + "." + syncPointTableName + + query := "insert ignore into " + filter.TiCDCSystemSchema + "." + filter.SyncPointTable + "(ticdc_cluster_id, changefeed, primary_ts, secondary_ts) VALUES (?,?,?,?)" _, err = tx.Exec(query, s.clusterID, id.ID, checkpointTs, secondaryTs) if err != nil { @@ -186,8 +180,8 @@ func (s *mysqlSyncPointStore) SinkSyncPoint(ctx context.Context, if time.Since(s.lastCleanSyncPointTime) >= s.syncPointRetention { query = fmt.Sprintf( "DELETE IGNORE FROM "+ - schemaName+"."+ - syncPointTableName+ + filter.TiCDCSystemSchema+"."+ + filter.SyncPointTable+ " WHERE ticdc_cluster_id = '%s' and changefeed = '%s' and created_at < (NOW() - INTERVAL %.2f SECOND)", s.clusterID, id.ID, diff --git a/pkg/filter/filter.go b/pkg/filter/filter.go index d61a897928f..7c438cf6b31 100644 --- a/pkg/filter/filter.go +++ b/pkg/filter/filter.go @@ -20,6 +20,13 @@ import ( "github.com/pingcap/tiflow/pkg/config" ) +const ( + // SyncPointTable is the tale name use to write ts-map when sync-point is enable. + SyncPointTable = "syncpoint_v1" + // TiCDCSystemSchema is the schema only use by TiCDC. + TiCDCSystemSchema = "tidb_cdc" +) + // allowDDLList is a list of DDL types that can be applied to cdc's schema storage. // It's a white list. var allowDDLList = []timodel.ActionType{ diff --git a/pkg/filter/filter_test.go b/pkg/filter/filter_test.go index 9d59ec5dd50..9ec9c7baa52 100644 --- a/pkg/filter/filter_test.go +++ b/pkg/filter/filter_test.go @@ -33,7 +33,7 @@ func TestShouldUseDefaultRules(t *testing.T) { require.True(t, filter.ShouldIgnoreTable("performance_schema", "")) require.False(t, filter.ShouldIgnoreTable("metric_schema", "query_duration")) require.False(t, filter.ShouldIgnoreTable("sns", "user")) - require.False(t, filter.ShouldIgnoreTable("tidb_cdc", "repl_mark_a_a")) + require.True(t, filter.ShouldIgnoreTable("tidb_cdc", "repl_mark_a_a")) } func TestShouldUseCustomRules(t *testing.T) { diff --git a/pkg/filter/utils.go b/pkg/filter/utils.go index a046e31e7a7..1fed8d36dad 100644 --- a/pkg/filter/utils.go +++ b/pkg/filter/utils.go @@ -27,7 +27,14 @@ import ( // isSysSchema returns true if the given schema is a system schema func isSysSchema(db string) bool { - return tifilter.IsSystemSchema(db) + switch db { + // TiCDCSystemSchema is used by TiCDC only. + // Tables in TiCDCSystemSchema should not be replicated by cdc. + case TiCDCSystemSchema: + return true + default: + return tifilter.IsSystemSchema(db) + } } // VerifyTableRules checks the table filter rules in the configuration diff --git a/pkg/filter/utils_test.go b/pkg/filter/utils_test.go index c3441e10877..ebf4ff46514 100644 --- a/pkg/filter/utils_test.go +++ b/pkg/filter/utils_test.go @@ -42,6 +42,7 @@ func TestIsSchema(t *testing.T) { {tifilter.InspectionSchemaName, true}, {tifilter.PerformanceSchemaName, true}, {tifilter.MetricSchemaName, true}, + {TiCDCSystemSchema, true}, } for _, c := range cases { require.Equal(t, c.result, isSysSchema(c.schema)) diff --git a/tests/integration_tests/bdr_mode/conf/cf.toml b/tests/integration_tests/bdr_mode/conf/cf.toml deleted file mode 100644 index b6929758e72..00000000000 --- a/tests/integration_tests/bdr_mode/conf/cf.toml +++ /dev/null @@ -1 +0,0 @@ -bdr-mode = true diff --git a/tests/integration_tests/bdr_mode/conf/down.toml b/tests/integration_tests/bdr_mode/conf/down.toml new file mode 100644 index 00000000000..6b23a869479 --- /dev/null +++ b/tests/integration_tests/bdr_mode/conf/down.toml @@ -0,0 +1 @@ +bdr-mode = true \ No newline at end of file diff --git a/tests/integration_tests/bdr_mode/conf/up.toml b/tests/integration_tests/bdr_mode/conf/up.toml new file mode 100644 index 00000000000..ac2e6c7cf56 --- /dev/null +++ b/tests/integration_tests/bdr_mode/conf/up.toml @@ -0,0 +1,3 @@ +bdr-mode = true +enable-sync-point = true +sync-point-interval = "30s" \ No newline at end of file diff --git a/tests/integration_tests/bdr_mode/run.sh b/tests/integration_tests/bdr_mode/run.sh index 25007391f56..03aa405a166 100644 --- a/tests/integration_tests/bdr_mode/run.sh +++ b/tests/integration_tests/bdr_mode/run.sh @@ -37,9 +37,9 @@ function run() { check_table_exists "bdr_mode.t1" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} # up -> down - run_cdc_cli changefeed create --sink-uri="$SINK_URI_1" -c "test-1" --config="$CUR/conf/cf.toml" + run_cdc_cli changefeed create --sink-uri="$SINK_URI_1" -c "test-1" --config="$CUR/conf/up.toml" # down -> up - run_cdc_cli changefeed create --sink-uri="$SINK_URI_2" -c "test-2" --server "http://127.0.0.1:8400" --config="$CUR/conf/cf.toml" + run_cdc_cli changefeed create --sink-uri="$SINK_URI_2" -c "test-2" --server "http://127.0.0.1:8400" --config="$CUR/conf/down.toml" run_sql_file $CUR/data/up.sql ${UP_TIDB_HOST} ${UP_TIDB_PORT} run_sql_file $CUR/data/down.sql ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} @@ -47,6 +47,10 @@ function run() { run_sql_file $CUR/data/finished.sql ${UP_TIDB_HOST} ${UP_TIDB_PORT} run_sql_file $CUR/data/finished.sql ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} + # syncpoint table should exists in secondary tidb, but does not exists in primary cluster + check_table_exists "tidb_cdc.syncpoint_v1" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} 60 + check_table_not_exists "tidb_cdc.syncpoint_v1" ${UP_TIDB_HOST} ${UP_TIDB_PORT} 60 + check_table_exists "bdr_mode.finish_mark" ${UP_TIDB_HOST} ${UP_TIDB_PORT} 60 check_table_exists "bdr_mode.finish_mark" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} 60