Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

filter (ticdc): fix synpoint table is replicated in BDR mode (#10587) #10607

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 6 additions & 12 deletions cdc/syncpointstore/mysql_syncpoint_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,17 +26,11 @@ import (
"github.com/pingcap/tiflow/pkg/config"
cerror "github.com/pingcap/tiflow/pkg/errors"
"github.com/pingcap/tiflow/pkg/errorutil"
"github.com/pingcap/tiflow/pkg/filter"
"github.com/pingcap/tiflow/pkg/sink/mysql"
"go.uber.org/zap"
)

const (
// syncPointTableName is the name of table where all syncpoint maps sit
syncPointTableName string = "syncpoint_v1"
// schemaName is the name of database where syncPoint maps sit
schemaName = "tidb_cdc"
)

type mysqlSyncPointStore struct {
db *sql.DB
clusterID string
Expand Down Expand Up @@ -88,7 +82,7 @@ func newMySQLSyncPointStore(
}

func (s *mysqlSyncPointStore) CreateSyncTable(ctx context.Context) error {
database := schemaName
database := filter.TiCDCSystemSchema
tx, err := s.db.BeginTx(ctx, nil)
if err != nil {
log.Error("create sync table: begin Tx fail", zap.Error(err))
Expand Down Expand Up @@ -120,7 +114,7 @@ func (s *mysqlSyncPointStore) CreateSyncTable(ctx context.Context) error {
INDEX (created_at),
PRIMARY KEY (changefeed, primary_ts)
);`
query = fmt.Sprintf(query, syncPointTableName)
query = fmt.Sprintf(query, filter.SyncPointTable)
_, err = tx.Exec(query)
if err != nil {
err2 := tx.Rollback()
Expand Down Expand Up @@ -154,7 +148,7 @@ func (s *mysqlSyncPointStore) SinkSyncPoint(ctx context.Context,
return cerror.WrapError(cerror.ErrMySQLTxnError, err)
}
// insert ts map
query := "insert ignore into " + schemaName + "." + syncPointTableName +
query := "insert ignore into " + filter.TiCDCSystemSchema + "." + filter.SyncPointTable +
"(ticdc_cluster_id, changefeed, primary_ts, secondary_ts) VALUES (?,?,?,?)"
_, err = tx.Exec(query, s.clusterID, id.ID, checkpointTs, secondaryTs)
if err != nil {
Expand Down Expand Up @@ -186,8 +180,8 @@ func (s *mysqlSyncPointStore) SinkSyncPoint(ctx context.Context,
if time.Since(s.lastCleanSyncPointTime) >= s.syncPointRetention {
query = fmt.Sprintf(
"DELETE IGNORE FROM "+
schemaName+"."+
syncPointTableName+
filter.TiCDCSystemSchema+"."+
filter.SyncPointTable+
" WHERE ticdc_cluster_id = '%s' and changefeed = '%s' and created_at < (NOW() - INTERVAL %.2f SECOND)",
s.clusterID,
id.ID,
Expand Down
7 changes: 7 additions & 0 deletions pkg/filter/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,13 @@ import (
"github.com/pingcap/tiflow/pkg/config"
)

const (
// SyncPointTable is the tale name use to write ts-map when sync-point is enable.
SyncPointTable = "syncpoint_v1"
// TiCDCSystemSchema is the schema only use by TiCDC.
TiCDCSystemSchema = "tidb_cdc"
)

// allowDDLList is a list of DDL types that can be applied to cdc's schema storage.
// It's a white list.
var allowDDLList = []timodel.ActionType{
Expand Down
2 changes: 1 addition & 1 deletion pkg/filter/filter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ func TestShouldUseDefaultRules(t *testing.T) {
require.True(t, filter.ShouldIgnoreTable("performance_schema", ""))
require.False(t, filter.ShouldIgnoreTable("metric_schema", "query_duration"))
require.False(t, filter.ShouldIgnoreTable("sns", "user"))
require.False(t, filter.ShouldIgnoreTable("tidb_cdc", "repl_mark_a_a"))
require.True(t, filter.ShouldIgnoreTable("tidb_cdc", "repl_mark_a_a"))
}

func TestShouldUseCustomRules(t *testing.T) {
Expand Down
9 changes: 8 additions & 1 deletion pkg/filter/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,14 @@ import (

// isSysSchema returns true if the given schema is a system schema
func isSysSchema(db string) bool {
return tifilter.IsSystemSchema(db)
switch db {
// TiCDCSystemSchema is used by TiCDC only.
// Tables in TiCDCSystemSchema should not be replicated by cdc.
case TiCDCSystemSchema:
return true
default:
return tifilter.IsSystemSchema(db)
}
}

// VerifyTableRules checks the table filter rules in the configuration
Expand Down
1 change: 1 addition & 0 deletions pkg/filter/utils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ func TestIsSchema(t *testing.T) {
{tifilter.InspectionSchemaName, true},
{tifilter.PerformanceSchemaName, true},
{tifilter.MetricSchemaName, true},
{TiCDCSystemSchema, true},
}
for _, c := range cases {
require.Equal(t, c.result, isSysSchema(c.schema))
Expand Down
1 change: 0 additions & 1 deletion tests/integration_tests/bdr_mode/conf/cf.toml

This file was deleted.

1 change: 1 addition & 0 deletions tests/integration_tests/bdr_mode/conf/down.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
bdr-mode = true
3 changes: 3 additions & 0 deletions tests/integration_tests/bdr_mode/conf/up.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
bdr-mode = true
enable-sync-point = true
sync-point-interval = "30s"
8 changes: 6 additions & 2 deletions tests/integration_tests/bdr_mode/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -37,16 +37,20 @@ function run() {
check_table_exists "bdr_mode.t1" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT}

# up -> down
run_cdc_cli changefeed create --sink-uri="$SINK_URI_1" -c "test-1" --config="$CUR/conf/cf.toml"
run_cdc_cli changefeed create --sink-uri="$SINK_URI_1" -c "test-1" --config="$CUR/conf/up.toml"
# down -> up
run_cdc_cli changefeed create --sink-uri="$SINK_URI_2" -c "test-2" --server "http://127.0.0.1:8400" --config="$CUR/conf/cf.toml"
run_cdc_cli changefeed create --sink-uri="$SINK_URI_2" -c "test-2" --server "http://127.0.0.1:8400" --config="$CUR/conf/down.toml"

run_sql_file $CUR/data/up.sql ${UP_TIDB_HOST} ${UP_TIDB_PORT}
run_sql_file $CUR/data/down.sql ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT}

run_sql_file $CUR/data/finished.sql ${UP_TIDB_HOST} ${UP_TIDB_PORT}
run_sql_file $CUR/data/finished.sql ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT}

# syncpoint table should exists in secondary tidb, but does not exists in primary cluster
check_table_exists "tidb_cdc.syncpoint_v1" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} 60
check_table_not_exists "tidb_cdc.syncpoint_v1" ${UP_TIDB_HOST} ${UP_TIDB_PORT} 60

check_table_exists "bdr_mode.finish_mark" ${UP_TIDB_HOST} ${UP_TIDB_PORT} 60
check_table_exists "bdr_mode.finish_mark" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} 60

Expand Down
Loading