Skip to content

Commit

Permalink
filter (ticdc): fix synpoint table is replicated in BDR mode (#10587) (
Browse files Browse the repository at this point in the history
  • Loading branch information
ti-chi-bot authored Feb 27, 2024
1 parent 65918ab commit 50b1b81
Show file tree
Hide file tree
Showing 9 changed files with 33 additions and 17 deletions.
18 changes: 6 additions & 12 deletions cdc/syncpointstore/mysql_syncpoint_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,17 +26,11 @@ import (
"github.com/pingcap/tiflow/pkg/config"
cerror "github.com/pingcap/tiflow/pkg/errors"
"github.com/pingcap/tiflow/pkg/errorutil"
"github.com/pingcap/tiflow/pkg/filter"
"github.com/pingcap/tiflow/pkg/sink/mysql"
"go.uber.org/zap"
)

const (
// syncPointTableName is the name of table where all syncpoint maps sit
syncPointTableName string = "syncpoint_v1"
// schemaName is the name of database where syncPoint maps sit
schemaName = "tidb_cdc"
)

type mysqlSyncPointStore struct {
db *sql.DB
clusterID string
Expand Down Expand Up @@ -88,7 +82,7 @@ func newMySQLSyncPointStore(
}

func (s *mysqlSyncPointStore) CreateSyncTable(ctx context.Context) error {
database := schemaName
database := filter.TiCDCSystemSchema
tx, err := s.db.BeginTx(ctx, nil)
if err != nil {
log.Error("create sync table: begin Tx fail", zap.Error(err))
Expand Down Expand Up @@ -120,7 +114,7 @@ func (s *mysqlSyncPointStore) CreateSyncTable(ctx context.Context) error {
INDEX (created_at),
PRIMARY KEY (changefeed, primary_ts)
);`
query = fmt.Sprintf(query, syncPointTableName)
query = fmt.Sprintf(query, filter.SyncPointTable)
_, err = tx.Exec(query)
if err != nil {
err2 := tx.Rollback()
Expand Down Expand Up @@ -154,7 +148,7 @@ func (s *mysqlSyncPointStore) SinkSyncPoint(ctx context.Context,
return cerror.WrapError(cerror.ErrMySQLTxnError, err)
}
// insert ts map
query := "insert ignore into " + schemaName + "." + syncPointTableName +
query := "insert ignore into " + filter.TiCDCSystemSchema + "." + filter.SyncPointTable +
"(ticdc_cluster_id, changefeed, primary_ts, secondary_ts) VALUES (?,?,?,?)"
_, err = tx.Exec(query, s.clusterID, id.ID, checkpointTs, secondaryTs)
if err != nil {
Expand Down Expand Up @@ -186,8 +180,8 @@ func (s *mysqlSyncPointStore) SinkSyncPoint(ctx context.Context,
if time.Since(s.lastCleanSyncPointTime) >= s.syncPointRetention {
query = fmt.Sprintf(
"DELETE IGNORE FROM "+
schemaName+"."+
syncPointTableName+
filter.TiCDCSystemSchema+"."+
filter.SyncPointTable+
" WHERE ticdc_cluster_id = '%s' and changefeed = '%s' and created_at < (NOW() - INTERVAL %.2f SECOND)",
s.clusterID,
id.ID,
Expand Down
7 changes: 7 additions & 0 deletions pkg/filter/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,13 @@ import (
"github.com/pingcap/tiflow/pkg/config"
)

const (
// SyncPointTable is the tale name use to write ts-map when sync-point is enable.
SyncPointTable = "syncpoint_v1"
// TiCDCSystemSchema is the schema only use by TiCDC.
TiCDCSystemSchema = "tidb_cdc"
)

// allowDDLList is a list of DDL types that can be applied to cdc's schema storage.
// It's a white list.
var allowDDLList = []timodel.ActionType{
Expand Down
2 changes: 1 addition & 1 deletion pkg/filter/filter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ func TestShouldUseDefaultRules(t *testing.T) {
require.True(t, filter.ShouldIgnoreTable("performance_schema", ""))
require.False(t, filter.ShouldIgnoreTable("metric_schema", "query_duration"))
require.False(t, filter.ShouldIgnoreTable("sns", "user"))
require.False(t, filter.ShouldIgnoreTable("tidb_cdc", "repl_mark_a_a"))
require.True(t, filter.ShouldIgnoreTable("tidb_cdc", "repl_mark_a_a"))
}

func TestShouldUseCustomRules(t *testing.T) {
Expand Down
9 changes: 8 additions & 1 deletion pkg/filter/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,14 @@ import (

// isSysSchema returns true if the given schema is a system schema
func isSysSchema(db string) bool {
return tifilter.IsSystemSchema(db)
switch db {
// TiCDCSystemSchema is used by TiCDC only.
// Tables in TiCDCSystemSchema should not be replicated by cdc.
case TiCDCSystemSchema:
return true
default:
return tifilter.IsSystemSchema(db)
}
}

// VerifyTableRules checks the table filter rules in the configuration
Expand Down
1 change: 1 addition & 0 deletions pkg/filter/utils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ func TestIsSchema(t *testing.T) {
{tifilter.InspectionSchemaName, true},
{tifilter.PerformanceSchemaName, true},
{tifilter.MetricSchemaName, true},
{TiCDCSystemSchema, true},
}
for _, c := range cases {
require.Equal(t, c.result, isSysSchema(c.schema))
Expand Down
1 change: 0 additions & 1 deletion tests/integration_tests/bdr_mode/conf/cf.toml

This file was deleted.

1 change: 1 addition & 0 deletions tests/integration_tests/bdr_mode/conf/down.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
bdr-mode = true
3 changes: 3 additions & 0 deletions tests/integration_tests/bdr_mode/conf/up.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
bdr-mode = true
enable-sync-point = true
sync-point-interval = "30s"
8 changes: 6 additions & 2 deletions tests/integration_tests/bdr_mode/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -37,16 +37,20 @@ function run() {
check_table_exists "bdr_mode.t1" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT}

# up -> down
run_cdc_cli changefeed create --sink-uri="$SINK_URI_1" -c "test-1" --config="$CUR/conf/cf.toml"
run_cdc_cli changefeed create --sink-uri="$SINK_URI_1" -c "test-1" --config="$CUR/conf/up.toml"
# down -> up
run_cdc_cli changefeed create --sink-uri="$SINK_URI_2" -c "test-2" --server "http://127.0.0.1:8400" --config="$CUR/conf/cf.toml"
run_cdc_cli changefeed create --sink-uri="$SINK_URI_2" -c "test-2" --server "http://127.0.0.1:8400" --config="$CUR/conf/down.toml"

run_sql_file $CUR/data/up.sql ${UP_TIDB_HOST} ${UP_TIDB_PORT}
run_sql_file $CUR/data/down.sql ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT}

run_sql_file $CUR/data/finished.sql ${UP_TIDB_HOST} ${UP_TIDB_PORT}
run_sql_file $CUR/data/finished.sql ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT}

# syncpoint table should exists in secondary tidb, but does not exists in primary cluster
check_table_exists "tidb_cdc.syncpoint_v1" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} 60
check_table_not_exists "tidb_cdc.syncpoint_v1" ${UP_TIDB_HOST} ${UP_TIDB_PORT} 60

check_table_exists "bdr_mode.finish_mark" ${UP_TIDB_HOST} ${UP_TIDB_PORT} 60
check_table_exists "bdr_mode.finish_mark" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} 60

Expand Down

0 comments on commit 50b1b81

Please sign in to comment.