Merge branch 'master' into rustin-patch-flowcontrol
Rustin170506 authored Apr 25, 2022
2 parents d26702b + 321e9e1 commit d21b7b0
Showing 17 changed files with 247 additions and 49 deletions.
9 changes: 9 additions & 0 deletions Makefile
@@ -362,6 +362,15 @@ dm_integration_test_build_master: check_failpoint_ctl
$(FAILPOINT_DISABLE)
./dm/tests/prepare_tools.sh

dm_integration_test_build_ctl: check_failpoint_ctl
$(FAILPOINT_ENABLE)
$(GOTESTNORACE) -ldflags '$(LDFLAGS)' -c -cover -covermode=count \
-coverpkg=github.com/pingcap/tiflow/dm/... \
-o bin/dmctl.test github.com/pingcap/tiflow/dm/cmd/dm-ctl \
|| { $(FAILPOINT_DISABLE); exit 1; }
$(FAILPOINT_DISABLE)
./dm/tests/prepare_tools.sh

install_test_python_dep:
@echo "install python requirments for test"
pip install --user -q -r ./dm/tests/requirements.txt
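
The new dm_integration_test_build_ctl target follows the same recipe as dm_integration_test_build_master above: enable failpoints, build a coverage-instrumented bin/dmctl.test via the test-build path, and disable failpoints again even when the build fails. A minimal sketch of how such a test-built binary conventionally becomes runnable; the TestRunMain name, the stub main, and the flag filtering are illustrative assumptions, not taken from this diff:

package main

import (
	"os"
	"strings"
	"testing"
)

// Stub standing in for dmctl's real entry point.
func main() {}

// TestRunMain is the usual trick behind `go test -c` tool binaries: the lone
// "test" strips the -test.* flags injected by the test framework and forwards
// the remaining arguments to the real main, so bin/dmctl.test behaves like
// dmctl while still writing a coverage profile when it exits.
func TestRunMain(_ *testing.T) {
	args := make([]string, 0, len(os.Args))
	for _, arg := range os.Args {
		if !strings.HasPrefix(arg, "-test.") {
			args = append(args, arg)
		}
	}
	os.Args = args
	main()
}
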
97 changes: 61 additions & 36 deletions cdc/processor/processor.go
@@ -289,7 +289,7 @@ func (p *processor) Tick(ctx cdcContext.Context, state *orchestrator.ChangefeedR
ID: state.ID,
Info: state.Info,
})
_, err := p.tick(ctx, state)
err := p.tick(ctx, state)

costTime := time.Since(startTime)
if costTime > processorLogsWarnDuration {
@@ -331,33 +331,33 @@ func (p *processor) Tick(ctx cdcContext.Context, state *orchestrator.ChangefeedR
return state, cerror.ErrReactorFinished.GenWithStackByArgs()
}

func (p *processor) tick(ctx cdcContext.Context, state *orchestrator.ChangefeedReactorState) (nextState orchestrator.ReactorState, err error) {
func (p *processor) tick(ctx cdcContext.Context, state *orchestrator.ChangefeedReactorState) error {
p.changefeed = state
if !p.checkChangefeedNormal() {
return nil, cerror.ErrAdminStopProcessor.GenWithStackByArgs()
return cerror.ErrAdminStopProcessor.GenWithStackByArgs()
}
// we should skip this tick after create a task position
if p.createTaskPosition() {
return p.changefeed, nil
return nil
}
if err := p.handleErrorCh(ctx); err != nil {
return nil, errors.Trace(err)
return errors.Trace(err)
}
if err := p.lazyInit(ctx); err != nil {
return nil, errors.Trace(err)
return errors.Trace(err)
}
// sink manager will return this checkpointTs to sink node if sink node resolvedTs flush failed
p.sinkManager.UpdateChangeFeedCheckpointTs(state.Info.GetCheckpointTs(state.Status))
if err := p.handleTableOperation(ctx); err != nil {
return nil, errors.Trace(err)
return errors.Trace(err)
}
if err := p.checkTablesNum(ctx); err != nil {
return nil, errors.Trace(err)
return errors.Trace(err)
}
if err := p.flushRedoLogMeta(ctx); err != nil {
return nil, err
return err
}
// it is no need to check the err here, because we will use
// it is no need to check the error here, because we will use
// local time when an error return, which is acceptable
pdTime, _ := ctx.GlobalVars().PDClock.CurrentTime()

@@ -378,10 +378,10 @@ func (p *processor) tick(ctx cdcContext.Context, state *orchestrator.ChangefeedR

if p.newSchedulerEnabled {
if err := p.agent.Tick(ctx); err != nil {
return nil, errors.Trace(err)
return errors.Trace(err)
}
}
return p.changefeed, nil
return nil
}

// checkChangefeedNormal checks if the changefeed is runnable.
@@ -927,16 +927,10 @@ func (p *processor) addTable(ctx cdcContext.Context, tableID model.TableID, repl
return nil
}

func (p *processor) createTablePipelineImpl(ctx cdcContext.Context, tableID model.TableID, replicaInfo *model.TableReplicaInfo) (tablepipeline.TablePipeline, error) {
ctx = cdcContext.WithErrorHandler(ctx, func(err error) error {
if cerror.ErrTableProcessorStoppedSafely.Equal(err) ||
errors.Cause(errors.Cause(err)) == context.Canceled {
return nil
}
p.sendError(err)
return nil
})

func (p *processor) getTableName(ctx cdcContext.Context,
tableID model.TableID,
replicaInfo *model.TableReplicaInfo,
) (string, error) {
// FIXME: using GetLastSnapshot here would be confused and get the wrong table name
// after `rename table` DDL, since `rename table` keeps the tableID unchanged
var tableName *model.TableName
@@ -945,8 +939,11 @@ func (p *processor) createTablePipelineImpl(ctx cdcContext.Context, tableID mode
tableName = &name
return nil
}
return errors.Errorf("failed to get table name, fallback to use table id: %d", tableID)
}, retry.WithBackoffBaseDelay(backoffBaseDelayInMs), retry.WithMaxTries(maxTries), retry.WithIsRetryableErr(cerror.IsRetryableError))
return errors.Errorf("failed to get table name, fallback to use table id: %d",
tableID)
}, retry.WithBackoffBaseDelay(backoffBaseDelayInMs),
retry.WithMaxTries(maxTries),
retry.WithIsRetryableErr(cerror.IsRetryableError))
// TODO: remove this feature flag after table actor is GA
if p.changefeed.Info.Config.Cyclic.IsEnabled() {
// Retry to find mark table ID
@@ -955,29 +952,57 @@ func (p *processor) createTablePipelineImpl(ctx cdcContext.Context, tableID mode
if tableName == nil {
name, exist := p.schemaStorage.GetLastSnapshot().GetTableNameByID(tableID)
if !exist {
return cerror.ErrProcessorTableNotFound.GenWithStack("normal table(%s)", tableID)
return cerror.ErrProcessorTableNotFound.
GenWithStack("normal table(%s)", tableID)
}
tableName = &name
}
markTableSchemaName, markTableTableName := mark.GetMarkTableName(tableName.Schema, tableName.Table)
tableInfo, exist := p.schemaStorage.GetLastSnapshot().GetTableByName(markTableSchemaName, markTableTableName)
markTableSchemaName, markTableTableName := mark.GetMarkTableName(
tableName.Schema, tableName.Table)
tableInfo, exist := p.schemaStorage.
GetLastSnapshot().
GetTableByName(markTableSchemaName, markTableTableName)
if !exist {
return cerror.ErrProcessorTableNotFound.GenWithStack("normal table(%s) and mark table not match", tableName.String())
return cerror.ErrProcessorTableNotFound.
GenWithStack("normal table(%s) and mark table not match",
tableName.String())
}
markTableID = tableInfo.ID
return nil
}, retry.WithBackoffBaseDelay(50), retry.WithBackoffMaxDelay(60*1000), retry.WithMaxTries(20))
}, retry.WithBackoffBaseDelay(50),
retry.WithBackoffMaxDelay(60*1000),
retry.WithMaxTries(20))
if err != nil {
return nil, errors.Trace(err)
return "", errors.Trace(err)
}
replicaInfo.MarkTableID = markTableID
}
var tableNameStr string

if tableName == nil {
log.Warn("failed to get table name for metric")
tableNameStr = strconv.Itoa(int(tableID))
} else {
tableNameStr = tableName.QuoteString()
return strconv.Itoa(int(tableID)), nil
}

return tableName.QuoteString(), nil
}

func (p *processor) createTablePipelineImpl(
ctx cdcContext.Context,
tableID model.TableID,
replicaInfo *model.TableReplicaInfo,
) (tablepipeline.TablePipeline, error) {
ctx = cdcContext.WithErrorHandler(ctx, func(err error) error {
if cerror.ErrTableProcessorStoppedSafely.Equal(err) ||
errors.Cause(errors.Cause(err)) == context.Canceled {
return nil
}
p.sendError(err)
return nil
})

tableName, err := p.getTableName(ctx, tableID, replicaInfo)
if err != nil {
return nil, errors.Trace(err)
}

sink, err := p.sinkManager.CreateTableSink(tableID, p.redoManager)
@@ -991,7 +1016,7 @@ func (p *processor) createTablePipelineImpl(ctx cdcContext.Context, tableID mode
ctx,
p.mounter,
tableID,
tableNameStr,
tableName,
replicaInfo,
sink,
p.changefeed.Info.GetTargetTs())
@@ -1003,7 +1028,7 @@ func (p *processor) createTablePipelineImpl(ctx cdcContext.Context, tableID mode
ctx,
p.mounter,
tableID,
tableNameStr,
tableName,
replicaInfo,
sink,
p.changefeed.Info.GetTargetTs(),
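
Two refactors run through this file. First, tick now returns only an error, because Tick always hands the orchestrator back the same state it was given, so the second return value was dead weight. Second, table-name resolution moved out of createTablePipelineImpl into a getTableName helper. A self-contained sketch of the narrowed-return pattern, using stand-in types rather than the real tiflow ones:

package main

import (
	"errors"
	"fmt"
)

// State stands in for orchestrator.ChangefeedReactorState.
type State struct{ ID string }

type processor struct{ changefeed *State }

// Before the change, tick returned (State, error) and every early return had
// to supply a nil state; returning only error removes that bookkeeping.
func (p *processor) tick(s *State) error {
	p.changefeed = s
	if s == nil {
		return errors.New("admin stop")
	}
	return nil
}

// Tick keeps the (State, error) contract for its caller, but the state it
// returns is always the one it received.
func (p *processor) Tick(s *State) (*State, error) {
	if err := p.tick(s); err != nil {
		return s, err
	}
	return s, nil
}

func main() {
	p := &processor{}
	state, err := p.Tick(&State{ID: "cf-1"})
	fmt.Println(state.ID, err) // cf-1 <nil>
}
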
2 changes: 1 addition & 1 deletion cdc/sink/sink_manager.go
@@ -101,7 +101,7 @@ func (m *Manager) destroyTableSink(ctx context.Context, tableID model.TableID) e
return m.bufSink.Barrier(ctx, tableID)
}

// UpdateChangeFeedCheckpointTs updates changedfeed level checkpointTs,
// UpdateChangeFeedCheckpointTs updates changefeed level checkpointTs,
// this value is used in getCheckpointTs func
func (m *Manager) UpdateChangeFeedCheckpointTs(checkpointTs uint64) {
if m.bufSink != nil {
10 changes: 5 additions & 5 deletions dm/dm/config/source_config.go
@@ -67,9 +67,11 @@ type SourceConfig struct {
// deprecated
AutoFixGTID bool `yaml:"auto-fix-gtid" toml:"auto-fix-gtid" json:"auto-fix-gtid"`
RelayDir string `yaml:"relay-dir" toml:"relay-dir" json:"relay-dir"`
MetaDir string `yaml:"meta-dir" toml:"meta-dir" json:"meta-dir"`
Flavor string `yaml:"flavor" toml:"flavor" json:"flavor"`
Charset string `yaml:"charset" toml:"charset" json:"charset"`
// deprecated
MetaDir string `yaml:"meta-dir" toml:"meta-dir" json:"meta-dir"`
Flavor string `yaml:"flavor" toml:"flavor" json:"flavor"`
// deprecated
Charset string `yaml:"charset" toml:"charset" json:"charset"`

EnableRelay bool `yaml:"enable-relay" toml:"enable-relay" json:"enable-relay"`
// relay synchronous starting point (if specified)
@@ -412,7 +414,6 @@ type SourceConfigForDowngrade struct {
Enable bool `yaml:"enable,omitempty"`
EnableGTID bool `yaml:"enable-gtid"`
RelayDir string `yaml:"relay-dir"`
MetaDir string `yaml:"meta-dir"`
Flavor string `yaml:"flavor"`
Charset string `yaml:"charset"`
EnableRelay bool `yaml:"enable-relay"`
@@ -436,7 +437,6 @@ func NewSourceConfigForDowngrade(sourceCfg *SourceConfig) *SourceConfigForDowngr
Enable: sourceCfg.Enable,
EnableGTID: sourceCfg.EnableGTID,
RelayDir: sourceCfg.RelayDir,
MetaDir: sourceCfg.MetaDir,
Flavor: sourceCfg.Flavor,
Charset: sourceCfg.Charset,
EnableRelay: sourceCfg.EnableRelay,
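
With this change meta-dir and charset join auto-fix-gtid as deprecated source-config keys, and MetaDir disappears from the downgrade struct so that downgraded YAML no longer carries the key at all. A hedged sketch of the pattern with an abridged field set, assuming gopkg.in/yaml.v2 purely for illustration:

package main

import (
	"fmt"

	"gopkg.in/yaml.v2"
)

// Abridged stand-ins for SourceConfig and SourceConfigForDowngrade.
type SourceConfig struct {
	RelayDir string `yaml:"relay-dir"`
	// deprecated: still parsed so existing configs keep loading,
	// but never written back out.
	MetaDir string `yaml:"meta-dir"`
}

type SourceConfigForDowngrade struct {
	RelayDir string `yaml:"relay-dir"`
	// MetaDir is intentionally absent: output destined for an older
	// version must not contain keys it may mishandle.
}

func main() {
	cfg := SourceConfig{RelayDir: "/data/relay", MetaDir: "/data/meta"}
	down := SourceConfigForDowngrade{RelayDir: cfg.RelayDir}
	out, _ := yaml.Marshal(down)
	fmt.Print(string(out)) // prints only: relay-dir: /data/relay
}
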
4 changes: 2 additions & 2 deletions dm/dm/config/source_config_test.go
@@ -244,8 +244,8 @@ func (t *testConfig) TestSourceConfigForDowngrade(c *C) {
cfgForDowngrade := NewSourceConfigForDowngrade(cfg)
cfgReflect := reflect.Indirect(reflect.ValueOf(cfg))
cfgForDowngradeReflect := reflect.Indirect(reflect.ValueOf(cfgForDowngrade))
// auto-fix-gtid is not written when downgrade
c.Assert(cfgReflect.NumField(), Equals, cfgForDowngradeReflect.NumField()+1)
// auto-fix-gtid, meta-dir are not written when downgrade
c.Assert(cfgReflect.NumField(), Equals, cfgForDowngradeReflect.NumField()+2)

// make sure all field were copied
cfgForClone := &SourceConfigForDowngrade{}
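
The adjusted assertion pins the contract down by reflection: the full config must have exactly two more fields than the downgrade struct, one per deliberately dropped key, so accidental drift in either struct fails the test. A tiny sketch of the technique:

package main

import (
	"fmt"
	"reflect"
)

type full struct{ A, B, C int }
type downgrade struct{ A, B int }

func main() {
	nFull := reflect.Indirect(reflect.ValueOf(&full{})).NumField()
	nDown := reflect.Indirect(reflect.ValueOf(&downgrade{})).NumField()
	// Exactly one field (C) was dropped on purpose, so the counts must
	// differ by one; adding or removing any other field breaks this.
	fmt.Println(nFull == nDown+1) // true
}
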
20 changes: 19 additions & 1 deletion dm/dm/ctl/master/check_task.go
@@ -14,10 +14,12 @@
package master

import (
"bytes"
"context"
"errors"
"os"

"github.com/pingcap/tiflow/dm/dm/config"
"github.com/spf13/cobra"

"github.com/pingcap/tiflow/dm/checker"
@@ -63,10 +65,26 @@ func checkTaskFunc(cmd *cobra.Command, _ []string) error {
return err
}

// we check if `is-sharding` is explicitly set, to distinguish an explicit `false` from the default value
isShardingSet := false
lines := bytes.Split(content, []byte("\n"))
for i := range lines {
if bytes.HasPrefix(lines[i], []byte("is-sharding")) {
isShardingSet = true
break
}
}
task := config.NewTaskConfig()
yamlErr := task.RawDecode(string(content))
// if can't parse yaml, we ignore this check
if yamlErr == nil && isShardingSet && !task.IsSharding && task.ShardMode != "" {
common.PrintLinesf("The behaviour of `is-sharding` and `shard-mode` is conflicting. `is-sharding` is deprecated, please use `shard-mode` only.")
return errors.New("please check output to see error")
}

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

// start task
resp := &pb.CheckTaskResponse{}
err = common.SendRequest(
ctx,
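
A decoded struct cannot tell an explicit `is-sharding: false` apart from the field's zero value, hence the raw line scan before RawDecode; the start_task.go change below reuses the same scan. A self-contained sketch of the detection:

package main

import (
	"bytes"
	"fmt"
)

// isShardingSet reports whether the raw task file explicitly contains an
// is-sharding key, which the post-decode struct alone could not reveal.
func isShardingSet(content []byte) bool {
	for _, line := range bytes.Split(content, []byte("\n")) {
		if bytes.HasPrefix(line, []byte("is-sharding")) {
			return true
		}
	}
	return false
}

func main() {
	fmt.Println(isShardingSet([]byte("name: t\nis-sharding: false\n")))    // true
	fmt.Println(isShardingSet([]byte("name: t\nshard-mode: pessimistic"))) // false
}
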
3 changes: 3 additions & 0 deletions dm/dm/ctl/master/operate_source.go
@@ -136,6 +136,9 @@ func operateSourceFunc(cmd *cobra.Command, _ []string) error {
}
content = []byte(yamlStr)
}
if cfg.RelayDir != "" {
common.PrintLinesf("`relay-dir` in source config will be deprecated soon, please use `relay-dir` in worker config instead")
}
contents = append(contents, string(content))
}

3 changes: 3 additions & 0 deletions dm/dm/ctl/master/start_stop_relay.go
@@ -72,6 +72,9 @@ func startStopRelay(cmd *cobra.Command, op pb.RelayOpV2) error {
}

workers := cmd.Flags().Args()
if len(workers) > 0 {
common.PrintLinesf("start-relay/stop-relay with worker name will be deprecated soon. You can try stopping relay first and use start-relay without worker name instead")
}

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
16 changes: 16 additions & 0 deletions dm/dm/ctl/master/start_task.go
@@ -14,6 +14,7 @@
package master

import (
"bytes"
"context"
"errors"
"os"
@@ -67,6 +68,21 @@ func startTaskFunc(cmd *cobra.Command, _ []string) error {
}
content = []byte(task.String())
}

// we check if `is-sharding` is explicitly set, to distinguish an explicit `false` from the default value
isShardingSet := false
lines := bytes.Split(content, []byte("\n"))
for i := range lines {
if bytes.HasPrefix(lines[i], []byte("is-sharding")) {
isShardingSet = true
break
}
}
if isShardingSet && !task.IsSharding && task.ShardMode != "" {
common.PrintLinesf("The behaviour of `is-sharding` and `shard-mode` is conflicting. `is-sharding` is deprecated, please use `shard-mode` only.")
return errors.New("please check output to see error")
}

sources, err := common.GetSourceArgs(cmd)
if err != nil {
return err
2 changes: 1 addition & 1 deletion dm/dm/worker/source_worker.go
@@ -202,7 +202,7 @@ func (w *SourceWorker) Start() {
}
}

// Close stops working and releases resources.
// Stop stops working and releases resources.
func (w *SourceWorker) Stop(graceful bool) {
if w.closed.Load() {
w.l.Warn("already closed")
6 changes: 5 additions & 1 deletion dm/tests/adjust_gtid/run.sh
@@ -68,7 +68,11 @@ function run() {
sed -i "/relay-binlog-name/i\relay-dir: $WORK_DIR/worker1/relay_log" $WORK_DIR/source1.yaml
sed -i "/relay-binlog-name/i\relay-dir: $WORK_DIR/worker2/relay_log" $WORK_DIR/source2.yaml
# make sure source1 is bound to worker1
dmctl_operate_source create $WORK_DIR/source1.yaml $SOURCE_ID1
run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \
"operate-source create $WORK_DIR/source1.yaml" \
"\"result\": true" 2 \
"\"source\": \"$SOURCE_ID1\"" 1 \
"will be deprecated soon" 1

run_dm_worker $WORK_DIR/worker2 $WORKER2_PORT $cur/conf/dm-worker2.toml
check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER2_PORT
8 changes: 8 additions & 0 deletions dm/tests/dmctl_basic/check_list/check_task.sh
@@ -27,6 +27,14 @@ function check_task_not_pass() {
"\"result\": false" 1
}

function check_task_not_pass_with_message() {
task_conf=$1
error_message=$2
run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \
"check-task $task_conf" \
"$error_message" 1
}

function check_task_error_database_config() {
task_conf=$1
run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \
8 changes: 8 additions & 0 deletions dm/tests/dmctl_basic/check_list/start_task.sh
@@ -18,3 +18,11 @@ function start_task_wrong_start_time_format() {
"start-task $task_conf --start-time '20060102 150405'" \
"error while parse start-time" 1
}

function start_task_not_pass_with_message() {
task_conf=$1
error_message=$2
run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \
"start-task $task_conf" \
"$2" 1
}