From b15d639a280ac6e0898f29ff011aea71d6dde355 Mon Sep 17 00:00:00 2001
From: lance6716
Date: Sun, 24 Apr 2022 16:40:49 +0800
Subject: [PATCH 1/2] *(dm): add some warning and adjust some config (#5223)

ref pingcap/tiflow#5220
---
 Makefile                                      |   9 ++
 dm/dm/config/source_config.go                 |  10 +-
 dm/dm/config/source_config_test.go            |   4 +-
 dm/dm/ctl/master/check_task.go                |  20 +++-
 dm/dm/ctl/master/operate_source.go            |   3 +
 dm/dm/ctl/master/start_stop_relay.go          |   3 +
 dm/dm/ctl/master/start_task.go                |  16 +++
 dm/dm/worker/source_worker.go                 |   2 +-
 dm/tests/adjust_gtid/run.sh                   |   6 +-
 dm/tests/dmctl_basic/check_list/check_task.sh |   8 ++
 dm/tests/dmctl_basic/check_list/start_task.sh |   8 ++
 dm/tests/dmctl_basic/conf/dm-task5.yaml       | 102 ++++++++++++++++++
 dm/tests/dmctl_basic/run.sh                   |   2 +
 .../configs/sources/mysql-replica-01.yaml     |   1 -
 dm/tests/new_relay/run.sh                     |   3 +-
 15 files changed, 185 insertions(+), 12 deletions(-)
 create mode 100644 dm/tests/dmctl_basic/conf/dm-task5.yaml

diff --git a/Makefile b/Makefile
index 9d88951bd8c..dd7d4f999f6 100644
--- a/Makefile
+++ b/Makefile
@@ -362,6 +362,15 @@ dm_integration_test_build_master: check_failpoint_ctl
 	$(FAILPOINT_DISABLE)
 	./dm/tests/prepare_tools.sh
 
+dm_integration_test_build_ctl: check_failpoint_ctl
+	$(FAILPOINT_ENABLE)
+	$(GOTESTNORACE) -ldflags '$(LDFLAGS)' -c -cover -covermode=count \
+		-coverpkg=github.com/pingcap/tiflow/dm/... \
+		-o bin/dmctl.test github.com/pingcap/tiflow/dm/cmd/dm-ctl \
+		|| { $(FAILPOINT_DISABLE); exit 1; }
+	$(FAILPOINT_DISABLE)
+	./dm/tests/prepare_tools.sh
+
 install_test_python_dep:
 	@echo "install python requirments for test"
 	pip install --user -q -r ./dm/tests/requirements.txt
diff --git a/dm/dm/config/source_config.go b/dm/dm/config/source_config.go
index e37846c6fb1..31c4f9564aa 100644
--- a/dm/dm/config/source_config.go
+++ b/dm/dm/config/source_config.go
@@ -67,9 +67,11 @@ type SourceConfig struct {
 	// deprecated
 	AutoFixGTID bool   `yaml:"auto-fix-gtid" toml:"auto-fix-gtid" json:"auto-fix-gtid"`
 	RelayDir    string `yaml:"relay-dir" toml:"relay-dir" json:"relay-dir"`
-	MetaDir     string `yaml:"meta-dir" toml:"meta-dir" json:"meta-dir"`
-	Flavor      string `yaml:"flavor" toml:"flavor" json:"flavor"`
-	Charset     string `yaml:"charset" toml:"charset" json:"charset"`
+	// deprecated
+	MetaDir string `yaml:"meta-dir" toml:"meta-dir" json:"meta-dir"`
+	Flavor  string `yaml:"flavor" toml:"flavor" json:"flavor"`
+	// deprecated
+	Charset string `yaml:"charset" toml:"charset" json:"charset"`
 
 	EnableRelay bool `yaml:"enable-relay" toml:"enable-relay" json:"enable-relay"`
 	// relay synchronous starting point (if specified)
@@ -412,7 +414,6 @@ type SourceConfigForDowngrade struct {
 	Enable      bool   `yaml:"enable,omitempty"`
 	EnableGTID  bool   `yaml:"enable-gtid"`
 	RelayDir    string `yaml:"relay-dir"`
-	MetaDir     string `yaml:"meta-dir"`
 	Flavor      string `yaml:"flavor"`
 	Charset     string `yaml:"charset"`
 	EnableRelay bool   `yaml:"enable-relay"`
@@ -436,7 +437,6 @@ func NewSourceConfigForDowngrade(sourceCfg *SourceConfig) *SourceConfigForDowngr
 		Enable:      sourceCfg.Enable,
 		EnableGTID:  sourceCfg.EnableGTID,
 		RelayDir:    sourceCfg.RelayDir,
-		MetaDir:     sourceCfg.MetaDir,
 		Flavor:      sourceCfg.Flavor,
 		Charset:     sourceCfg.Charset,
 		EnableRelay: sourceCfg.EnableRelay,
diff --git a/dm/dm/config/source_config_test.go b/dm/dm/config/source_config_test.go
index b79767cc4f8..13e050d924b 100644
--- a/dm/dm/config/source_config_test.go
+++ b/dm/dm/config/source_config_test.go
@@ -244,8 +244,8 @@ func (t *testConfig) TestSourceConfigForDowngrade(c *C) {
 	cfgForDowngrade := NewSourceConfigForDowngrade(cfg)
 	cfgReflect := reflect.Indirect(reflect.ValueOf(cfg))
 	cfgForDowngradeReflect := reflect.Indirect(reflect.ValueOf(cfgForDowngrade))
-	// auto-fix-gtid is not written when downgrade
-	c.Assert(cfgReflect.NumField(), Equals, cfgForDowngradeReflect.NumField()+1)
+	// auto-fix-gtid and meta-dir are not written on downgrade
+	c.Assert(cfgReflect.NumField(), Equals, cfgForDowngradeReflect.NumField()+2)
 
 	// make sure all field were copied
 	cfgForClone := &SourceConfigForDowngrade{}
diff --git a/dm/dm/ctl/master/check_task.go b/dm/dm/ctl/master/check_task.go
index 52c9b2c78f2..c90ff4f0fbc 100644
--- a/dm/dm/ctl/master/check_task.go
+++ b/dm/dm/ctl/master/check_task.go
@@ -14,10 +14,12 @@
 package master
 
 import (
+	"bytes"
 	"context"
 	"errors"
 	"os"
 
+	"github.com/pingcap/tiflow/dm/dm/config"
 	"github.com/spf13/cobra"
 
 	"github.com/pingcap/tiflow/dm/checker"
@@ -63,10 +65,26 @@ func checkTaskFunc(cmd *cobra.Command, _ []string) error {
 		return err
 	}
 
+	// we check whether `is-sharding` is explicitly set, to distinguish an explicit `false` from the default value
+	isShardingSet := false
+	lines := bytes.Split(content, []byte("\n"))
+	for i := range lines {
+		if bytes.HasPrefix(lines[i], []byte("is-sharding")) {
+			isShardingSet = true
+			break
+		}
+	}
+	task := config.NewTaskConfig()
+	yamlErr := task.RawDecode(string(content))
+	// if the yaml can't be parsed, skip this check
+	if yamlErr == nil && isShardingSet && !task.IsSharding && task.ShardMode != "" {
+		common.PrintLinesf("The values of `is-sharding` and `shard-mode` conflict. `is-sharding` is deprecated, please use `shard-mode` only.")
+		return errors.New("please check the output to see the error")
+	}
+
 	ctx, cancel := context.WithCancel(context.Background())
 	defer cancel()
 
-	// start task
 	resp := &pb.CheckTaskResponse{}
 	err = common.SendRequest(
 		ctx,
diff --git a/dm/dm/ctl/master/operate_source.go b/dm/dm/ctl/master/operate_source.go
index 2841fec3e7a..6c985913cca 100644
--- a/dm/dm/ctl/master/operate_source.go
+++ b/dm/dm/ctl/master/operate_source.go
@@ -136,6 +136,9 @@ func operateSourceFunc(cmd *cobra.Command, _ []string) error {
 			}
 			content = []byte(yamlStr)
 		}
+		if cfg.RelayDir != "" {
+			common.PrintLinesf("`relay-dir` in source config will be deprecated soon, please use `relay-dir` in worker config instead")
+		}
 		contents = append(contents, string(content))
 	}
diff --git a/dm/dm/ctl/master/start_stop_relay.go b/dm/dm/ctl/master/start_stop_relay.go
index 8eb29463c5c..1e47c9e6924 100644
--- a/dm/dm/ctl/master/start_stop_relay.go
+++ b/dm/dm/ctl/master/start_stop_relay.go
@@ -72,6 +72,9 @@ func startStopRelay(cmd *cobra.Command, op pb.RelayOpV2) error {
 	}
 
 	workers := cmd.Flags().Args()
+	if len(workers) > 0 {
+		common.PrintLinesf("start-relay/stop-relay with worker name will be deprecated soon. You can stop relay first and then use start-relay without a worker name instead")
+	}
 
 	ctx, cancel := context.WithCancel(context.Background())
 	defer cancel()
diff --git a/dm/dm/ctl/master/start_task.go b/dm/dm/ctl/master/start_task.go
index 6d70bd41999..1c423d0fe54 100644
--- a/dm/dm/ctl/master/start_task.go
+++ b/dm/dm/ctl/master/start_task.go
@@ -14,6 +14,7 @@
 package master
 
 import (
+	"bytes"
 	"context"
 	"errors"
 	"os"
@@ -67,6 +68,21 @@ func startTaskFunc(cmd *cobra.Command, _ []string) error {
 		}
 		content = []byte(task.String())
 	}
+
+	// we check whether `is-sharding` is explicitly set, to distinguish an explicit `false` from the default value
+	isShardingSet := false
+	lines := bytes.Split(content, []byte("\n"))
+	for i := range lines {
+		if bytes.HasPrefix(lines[i], []byte("is-sharding")) {
+			isShardingSet = true
+			break
+		}
+	}
+	if isShardingSet && !task.IsSharding && task.ShardMode != "" {
+		common.PrintLinesf("The values of `is-sharding` and `shard-mode` conflict. `is-sharding` is deprecated, please use `shard-mode` only.")
+		return errors.New("please check the output to see the error")
+	}
+
 	sources, err := common.GetSourceArgs(cmd)
 	if err != nil {
 		return err
diff --git a/dm/dm/worker/source_worker.go b/dm/dm/worker/source_worker.go
index fb34412a6b8..9d43eff9406 100644
--- a/dm/dm/worker/source_worker.go
+++ b/dm/dm/worker/source_worker.go
@@ -202,7 +202,7 @@ func (w *SourceWorker) Start() {
 	}
 }
 
-// Close stops working and releases resources.
+// Stop stops working and releases resources.
 func (w *SourceWorker) Stop(graceful bool) {
 	if w.closed.Load() {
 		w.l.Warn("already closed")
diff --git a/dm/tests/adjust_gtid/run.sh b/dm/tests/adjust_gtid/run.sh
index 01676329be1..d7d5f12c7e4 100755
--- a/dm/tests/adjust_gtid/run.sh
+++ b/dm/tests/adjust_gtid/run.sh
@@ -68,7 +68,11 @@ function run() {
 	sed -i "/relay-binlog-name/i\relay-dir: $WORK_DIR/worker1/relay_log" $WORK_DIR/source1.yaml
 	sed -i "/relay-binlog-name/i\relay-dir: $WORK_DIR/worker2/relay_log" $WORK_DIR/source2.yaml
 	# make sure source1 is bound to worker1
-	dmctl_operate_source create $WORK_DIR/source1.yaml $SOURCE_ID1
+	run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \
+		"operate-source create $WORK_DIR/source1.yaml" \
+		"\"result\": true" 2 \
+		"\"source\": \"$SOURCE_ID1\"" 1 \
+		"will be deprecated soon" 1
 
 	run_dm_worker $WORK_DIR/worker2 $WORKER2_PORT $cur/conf/dm-worker2.toml
 	check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER2_PORT
diff --git a/dm/tests/dmctl_basic/check_list/check_task.sh b/dm/tests/dmctl_basic/check_list/check_task.sh
index d3a628c95af..95899bca558 100644
--- a/dm/tests/dmctl_basic/check_list/check_task.sh
+++ b/dm/tests/dmctl_basic/check_list/check_task.sh
@@ -27,6 +27,14 @@ function check_task_not_pass() {
 		"\"result\": false" 1
 }
 
+function check_task_not_pass_with_message() {
+	task_conf=$1
+	error_message=$2
+	run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \
+		"check-task $task_conf" \
+		"$error_message" 1
+}
+
 function check_task_error_database_config() {
 	task_conf=$1
 	run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \
diff --git a/dm/tests/dmctl_basic/check_list/start_task.sh b/dm/tests/dmctl_basic/check_list/start_task.sh
index 6e41a0b7b70..0cb6758f292 100644
--- a/dm/tests/dmctl_basic/check_list/start_task.sh
+++ b/dm/tests/dmctl_basic/check_list/start_task.sh
@@ -18,3 +18,11 @@ function start_task_wrong_start_time_format() {
 		"start-task $task_conf --start-time '20060102 150405'" \
 		"error while parse start-time" 1
 }
+
+function start_task_not_pass_with_message() {
+	task_conf=$1
+	error_message=$2
+	run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \
+		"start-task $task_conf" \
+		"$error_message" 1
+}
diff --git a/dm/tests/dmctl_basic/conf/dm-task5.yaml b/dm/tests/dmctl_basic/conf/dm-task5.yaml
new file mode 100644
index 00000000000..07277194340
--- /dev/null
+++ b/dm/tests/dmctl_basic/conf/dm-task5.yaml
@@ -0,0 +1,102 @@
+---
+name: test
+task-mode: all
+is-sharding: false
+shard-mode: "pessimistic"
+meta-schema: "dm_meta"
+enable-heartbeat: false
+
+target-database:
+  host: "127.0.0.1"
+  port: 4000
+  user: "root"
+  password: ""
+
+mysql-instances:
+  - source-id: "mysql-replica-01"
+    block-allow-list: "instance"
+    route-rules: ["sharding-route-rules-table", "sharding-route-rules-schema"]
+    column-mapping-rules: ["instance-1"]
+    mydumper-config-name: "global"
+    loader-config-name: "global"
+    syncer-config-name: "global"
+    # sync t_1's alter event by task config(overwrite)
+    # ignore t_2's alter event by source config
+    filter-rules: ["user-filter-1"]
+
+  - source-id: "mysql-replica-02"
+    block-allow-list: "instance"
+    route-rules: ["sharding-route-rules-table", "sharding-route-rules-schema"]
+    column-mapping-rules: ["instance-2"]
+    mydumper-config-name: "global"
+    loader-config-name: "global"
+    syncer-config-name: "global"
+    # ignore t_2's alter event by task config
+    filter-rules: ["user-filter-2"]
+
+block-allow-list:
+  instance:
+    do-dbs: ["dmctl"]
+    do-tables:
+      - db-name: "dmctl"
+        tbl-name: "~^t_[\\d]+"
+
+routes:
+  sharding-route-rules-table:
+    schema-pattern: dmctl
+    table-pattern: t_*
+    target-schema: dmctl
+    target-table: t_target
+
+  sharding-route-rules-schema:
+    schema-pattern: dmctl
+    target-schema: dmctl
+
+column-mappings:
+  instance-1:
+    schema-pattern: "dmctl"
+    table-pattern: "t_*"
+    expression: "partition id"
+    source-column: "id"
+    target-column: "id"
+    arguments: ["1", "", "t_"]
+
+  instance-2:
+    schema-pattern: "dmctl"
+    table-pattern: "t_*"
+    expression: "partition id"
+    source-column: "id"
+    target-column: "id"
+    arguments: ["2", "", "t_"]
+
+mydumpers:
+  global:
+    threads: 4
+    chunk-filesize: 64
+    skip-tz-utc: true
+    extra-args: ""
+
+loaders:
+  global:
+    pool-size: 16
+    dir: "./dumped_data"
+
+syncers:
+  global:
+    worker-count: 16
+    batch: 100
+    checkpoint-flush-interval: 1
+    compact: true
+    multiple-rows: true
+
+filters:
+  user-filter-1:
+    schema-pattern: "dmctl"
+    table-pattern: "t_1"
+    events: ["all"]
+    action: Do
+  user-filter-2:
+    schema-pattern: "dmctl"
+    table-pattern: "t_2"
+    sql-pattern: ["alter table .* add column aaa int"]
+    action: Ignore
diff --git a/dm/tests/dmctl_basic/run.sh b/dm/tests/dmctl_basic/run.sh
index 5437d4f6aa5..828b6ace68e 100755
--- a/dm/tests/dmctl_basic/run.sh
+++ b/dm/tests/dmctl_basic/run.sh
@@ -282,6 +282,8 @@ function run() {
 	check_task_wrong_start_time_format $cur/conf/dm-task3.yaml
 	check_task_not_pass $cur/conf/dm-task2.yaml
 	check_task_error_count $cur/conf/dm-task3.yaml
+	check_task_not_pass_with_message $cur/conf/dm-task5.yaml "please use \`shard-mode\` only."
+	start_task_not_pass_with_message $cur/conf/dm-task5.yaml "please use \`shard-mode\` only."
echo "check_task_optimistic" check_task_pass $cur/conf/dm-task4.yaml diff --git a/dm/tests/new_relay/configs/sources/mysql-replica-01.yaml b/dm/tests/new_relay/configs/sources/mysql-replica-01.yaml index 307da65bb3d..4812f648787 100644 --- a/dm/tests/new_relay/configs/sources/mysql-replica-01.yaml +++ b/dm/tests/new_relay/configs/sources/mysql-replica-01.yaml @@ -1,6 +1,5 @@ enable-gtid: true relay-dir: relay-dir -meta-dir: "" flavor: mysql charset: "" enable-relay: false diff --git a/dm/tests/new_relay/run.sh b/dm/tests/new_relay/run.sh index b359bd0c606..8fb149d0e2f 100755 --- a/dm/tests/new_relay/run.sh +++ b/dm/tests/new_relay/run.sh @@ -23,7 +23,8 @@ function test_restart_relay_status() { dmctl_operate_source create $cur/conf/source1.yaml $SOURCE_ID1 run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \ - "start-relay -s $SOURCE_ID1 worker1" + "start-relay -s $SOURCE_ID1 worker1" \ + "will be deprecated soon" 1 run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \ "query-status -s $SOURCE_ID1" \ "\"result\": true" 2 \ From 321e9e1c977af1e5b8769afb38d8b46bb92daf0a Mon Sep 17 00:00:00 2001 From: Ling Jin <7138436+3AceShowHand@users.noreply.github.com> Date: Sun, 24 Apr 2022 20:12:49 +0800 Subject: [PATCH 2/2] processor(cdc): extract getTableName method (#5258) close pingcap/tiflow#5259 --- cdc/processor/processor.go | 97 ++++++++++++++++++++++++-------------- cdc/sink/sink_manager.go | 2 +- 2 files changed, 62 insertions(+), 37 deletions(-) diff --git a/cdc/processor/processor.go b/cdc/processor/processor.go index 780037b8d24..784499009eb 100644 --- a/cdc/processor/processor.go +++ b/cdc/processor/processor.go @@ -289,7 +289,7 @@ func (p *processor) Tick(ctx cdcContext.Context, state *orchestrator.ChangefeedR ID: state.ID, Info: state.Info, }) - _, err := p.tick(ctx, state) + err := p.tick(ctx, state) costTime := time.Since(startTime) if costTime > processorLogsWarnDuration { @@ -331,33 +331,33 @@ func (p *processor) Tick(ctx cdcContext.Context, state *orchestrator.ChangefeedR return state, cerror.ErrReactorFinished.GenWithStackByArgs() } -func (p *processor) tick(ctx cdcContext.Context, state *orchestrator.ChangefeedReactorState) (nextState orchestrator.ReactorState, err error) { +func (p *processor) tick(ctx cdcContext.Context, state *orchestrator.ChangefeedReactorState) error { p.changefeed = state if !p.checkChangefeedNormal() { - return nil, cerror.ErrAdminStopProcessor.GenWithStackByArgs() + return cerror.ErrAdminStopProcessor.GenWithStackByArgs() } // we should skip this tick after create a task position if p.createTaskPosition() { - return p.changefeed, nil + return nil } if err := p.handleErrorCh(ctx); err != nil { - return nil, errors.Trace(err) + return errors.Trace(err) } if err := p.lazyInit(ctx); err != nil { - return nil, errors.Trace(err) + return errors.Trace(err) } // sink manager will return this checkpointTs to sink node if sink node resolvedTs flush failed p.sinkManager.UpdateChangeFeedCheckpointTs(state.Info.GetCheckpointTs(state.Status)) if err := p.handleTableOperation(ctx); err != nil { - return nil, errors.Trace(err) + return errors.Trace(err) } if err := p.checkTablesNum(ctx); err != nil { - return nil, errors.Trace(err) + return errors.Trace(err) } if err := p.flushRedoLogMeta(ctx); err != nil { - return nil, err + return err } - // it is no need to check the err here, because we will use + // it is no need to check the error here, because we will use // local time when an error return, which is acceptable pdTime, _ := 
 	pdTime, _ := ctx.GlobalVars().PDClock.CurrentTime()
@@ -378,10 +378,10 @@ func (p *processor) tick(ctx cdcContext.Context, state *orchestrator.ChangefeedR
 
 	if p.newSchedulerEnabled {
 		if err := p.agent.Tick(ctx); err != nil {
-			return nil, errors.Trace(err)
+			return errors.Trace(err)
 		}
 	}
-	return p.changefeed, nil
+	return nil
 }
 
 // checkChangefeedNormal checks if the changefeed is runnable.
@@ -927,16 +927,10 @@ func (p *processor) addTable(ctx cdcContext.Context, tableID model.TableID, repl
 	return nil
 }
 
-func (p *processor) createTablePipelineImpl(ctx cdcContext.Context, tableID model.TableID, replicaInfo *model.TableReplicaInfo) (tablepipeline.TablePipeline, error) {
-	ctx = cdcContext.WithErrorHandler(ctx, func(err error) error {
-		if cerror.ErrTableProcessorStoppedSafely.Equal(err) ||
-			errors.Cause(errors.Cause(err)) == context.Canceled {
-			return nil
-		}
-		p.sendError(err)
-		return nil
-	})
-
+func (p *processor) getTableName(ctx cdcContext.Context,
+	tableID model.TableID,
+	replicaInfo *model.TableReplicaInfo,
+) (string, error) {
 	// FIXME: using GetLastSnapshot here would be confused and get the wrong table name
 	// after `rename table` DDL, since `rename table` keeps the tableID unchanged
 	var tableName *model.TableName
@@ -945,8 +939,11 @@ func (p *processor) createTablePipelineImpl(ctx cdcContext.Context, tableID mode
 			tableName = &name
 			return nil
 		}
-		return errors.Errorf("failed to get table name, fallback to use table id: %d", tableID)
-	}, retry.WithBackoffBaseDelay(backoffBaseDelayInMs), retry.WithMaxTries(maxTries), retry.WithIsRetryableErr(cerror.IsRetryableError))
+		return errors.Errorf("failed to get table name, fallback to use table id: %d",
+			tableID)
+	}, retry.WithBackoffBaseDelay(backoffBaseDelayInMs),
+		retry.WithMaxTries(maxTries),
+		retry.WithIsRetryableErr(cerror.IsRetryableError))
 	// TODO: remove this feature flag after table actor is GA
 	if p.changefeed.Info.Config.Cyclic.IsEnabled() {
 		// Retry to find mark table ID
@@ -955,29 +952,57 @@ func (p *processor) createTablePipelineImpl(ctx cdcContext.Context, tableID mode
 			if tableName == nil {
 				name, exist := p.schemaStorage.GetLastSnapshot().GetTableNameByID(tableID)
 				if !exist {
-					return cerror.ErrProcessorTableNotFound.GenWithStack("normal table(%s)", tableID)
+					return cerror.ErrProcessorTableNotFound.
+						GenWithStack("normal table(%s)", tableID)
 				}
 				tableName = &name
 			}
-			markTableSchemaName, markTableTableName := mark.GetMarkTableName(tableName.Schema, tableName.Table)
-			tableInfo, exist := p.schemaStorage.GetLastSnapshot().GetTableByName(markTableSchemaName, markTableTableName)
+			markTableSchemaName, markTableTableName := mark.GetMarkTableName(
+				tableName.Schema, tableName.Table)
+			tableInfo, exist := p.schemaStorage.
+				GetLastSnapshot().
+				GetTableByName(markTableSchemaName, markTableTableName)
 			if !exist {
-				return cerror.ErrProcessorTableNotFound.GenWithStack("normal table(%s) and mark table not match", tableName.String())
+				return cerror.ErrProcessorTableNotFound.
+ GenWithStack("normal table(%s) and mark table not match", + tableName.String()) } markTableID = tableInfo.ID return nil - }, retry.WithBackoffBaseDelay(50), retry.WithBackoffMaxDelay(60*1000), retry.WithMaxTries(20)) + }, retry.WithBackoffBaseDelay(50), + retry.WithBackoffMaxDelay(60*1000), + retry.WithMaxTries(20)) if err != nil { - return nil, errors.Trace(err) + return "", errors.Trace(err) } replicaInfo.MarkTableID = markTableID } - var tableNameStr string + if tableName == nil { log.Warn("failed to get table name for metric") - tableNameStr = strconv.Itoa(int(tableID)) - } else { - tableNameStr = tableName.QuoteString() + return strconv.Itoa(int(tableID)), nil + } + + return tableName.QuoteString(), nil +} + +func (p *processor) createTablePipelineImpl( + ctx cdcContext.Context, + tableID model.TableID, + replicaInfo *model.TableReplicaInfo, +) (tablepipeline.TablePipeline, error) { + ctx = cdcContext.WithErrorHandler(ctx, func(err error) error { + if cerror.ErrTableProcessorStoppedSafely.Equal(err) || + errors.Cause(errors.Cause(err)) == context.Canceled { + return nil + } + p.sendError(err) + return nil + }) + + tableName, err := p.getTableName(ctx, tableID, replicaInfo) + if err != nil { + return nil, errors.Trace(err) } sink, err := p.sinkManager.CreateTableSink(tableID, p.redoManager) @@ -991,7 +1016,7 @@ func (p *processor) createTablePipelineImpl(ctx cdcContext.Context, tableID mode ctx, p.mounter, tableID, - tableNameStr, + tableName, replicaInfo, sink, p.changefeed.Info.GetTargetTs()) @@ -1003,7 +1028,7 @@ func (p *processor) createTablePipelineImpl(ctx cdcContext.Context, tableID mode ctx, p.mounter, tableID, - tableNameStr, + tableName, replicaInfo, sink, p.changefeed.Info.GetTargetTs(), diff --git a/cdc/sink/sink_manager.go b/cdc/sink/sink_manager.go index 6d63f89675c..c56e6e70b2e 100644 --- a/cdc/sink/sink_manager.go +++ b/cdc/sink/sink_manager.go @@ -101,7 +101,7 @@ func (m *Manager) destroyTableSink(ctx context.Context, tableID model.TableID) e return m.bufSink.Barrier(ctx, tableID) } -// UpdateChangeFeedCheckpointTs updates changedfeed level checkpointTs, +// UpdateChangeFeedCheckpointTs updates changefeed level checkpointTs, // this value is used in getCheckpointTs func func (m *Manager) UpdateChangeFeedCheckpointTs(checkpointTs uint64) { if m.bufSink != nil {