Skip to content

Commit

Permalink
Merge branch 'master' into fix-shcema-storage-unit-test
Browse files Browse the repository at this point in the history
  • Loading branch information
amyangfei authored Jan 18, 2022
2 parents 407925c + 4bfa080 commit e408162
Show file tree
Hide file tree
Showing 75 changed files with 984 additions and 16,693 deletions.
11 changes: 4 additions & 7 deletions Makefile
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
### Makefile for ticdc
.PHONY: build test check clean fmt cdc kafka_consumer coverage \
integration_test_build integration_test integration_test_mysql integration_test_kafka bank \
dm dm-master dm-worker dmctl dm-portal dm-syncer dm_coverage
dm dm-master dm-worker dmctl dm-syncer dm_coverage

PROJECT=tiflow
P=3
Expand Down Expand Up @@ -240,7 +240,7 @@ clean:
rm -rf bin
rm -rf tools/bin

dm: dm-master dm-worker dmctl dm-portal dm-syncer
dm: dm-master dm-worker dmctl dm-syncer

dm-master:
$(GOBUILD) -ldflags '$(LDFLAGS)' -o bin/dm-master ./dm/cmd/dm-master
Expand All @@ -256,9 +256,6 @@ dm-worker:
dmctl:
$(GOBUILD) -ldflags '$(LDFLAGS)' -o bin/dmctl ./dm/cmd/dm-ctl

dm-portal:
$(GOBUILD) -ldflags '$(LDFLAGS)' -o bin/dm-portal ./dm/cmd/dm-portal

dm-syncer:
$(GOBUILD) -ldflags '$(LDFLAGS)' -o bin/dm-syncer ./dm/cmd/dm-syncer

Expand Down Expand Up @@ -365,8 +362,8 @@ dm_compatibility_test: check_third_party_binary_for_dm
dm_coverage: tools/bin/gocovmerge tools/bin/goveralls
# unify cover mode in coverage files, more details refer to dm/tests/_utils/run_dm_ctl
find "$(DM_TEST_DIR)" -type f -name "cov.*.dmctl.*.out" -exec sed -i "s/mode: count/mode: atomic/g" {} \;
tools/bin/gocovmerge "$(DM_TEST_DIR)"/cov.* | grep -vE ".*.pb.go|.*.pb.gw.go|.*.__failpoint_binding__.go|.*debug-tools.*|.*portal.*|.*chaos.*" > "$(DM_TEST_DIR)/all_cov.out"
tools/bin/gocovmerge "$(DM_TEST_DIR)"/cov.unit_test*.out | grep -vE ".*.pb.go|.*.pb.gw.go|.*.__failpoint_binding__.go|.*debug-tools.*|.*portal.*|.*chaos.*" > $(DM_TEST_DIR)/unit_test.out
tools/bin/gocovmerge "$(DM_TEST_DIR)"/cov.* | grep -vE ".*.pb.go|.*.pb.gw.go|.*.__failpoint_binding__.go|.*debug-tools.*|.*chaos.*" > "$(DM_TEST_DIR)/all_cov.out"
tools/bin/gocovmerge "$(DM_TEST_DIR)"/cov.unit_test*.out | grep -vE ".*.pb.go|.*.pb.gw.go|.*.__failpoint_binding__.go|.*debug-tools.*|.*chaos.*" > $(DM_TEST_DIR)/unit_test.out
go tool cover -html "$(DM_TEST_DIR)/all_cov.out" -o "$(DM_TEST_DIR)/all_cov.html"
go tool cover -html "$(DM_TEST_DIR)/unit_test.out" -o "$(DM_TEST_DIR)/unit_test_cov.html"

Expand Down
14 changes: 10 additions & 4 deletions cdc/capture/capture.go
Original file line number Diff line number Diff line change
Expand Up @@ -313,7 +313,7 @@ func (c *Capture) run(stdCtx context.Context) error {
// when the etcd worker of processor returns an error, it means that the processor throws an unrecoverable serious errors
// (recoverable errors are intercepted in the processor tick)
// so we should also stop the processor and let capture restart or exit
processorErr = c.runEtcdWorker(ctx, c.processorManager, globalState, processorFlushInterval)
processorErr = c.runEtcdWorker(ctx, c.processorManager, globalState, processorFlushInterval, "processor")
log.Info("the processor routine has exited", zap.Error(processorErr))
}()
wg.Add(1)
Expand Down Expand Up @@ -425,7 +425,7 @@ func (c *Capture) campaignOwner(ctx cdcContext.Context) error {
})
}

err = c.runEtcdWorker(ownerCtx, owner, orchestrator.NewGlobalState(), ownerFlushInterval)
err = c.runEtcdWorker(ownerCtx, owner, orchestrator.NewGlobalState(), ownerFlushInterval, "owner")
c.setOwner(nil)
log.Info("run owner exited", zap.Error(err))
// if owner exits, resign the owner key
Expand All @@ -441,13 +441,19 @@ func (c *Capture) campaignOwner(ctx cdcContext.Context) error {
}
}

func (c *Capture) runEtcdWorker(ctx cdcContext.Context, reactor orchestrator.Reactor, reactorState orchestrator.ReactorState, timerInterval time.Duration) error {
func (c *Capture) runEtcdWorker(
ctx cdcContext.Context,
reactor orchestrator.Reactor,
reactorState orchestrator.ReactorState,
timerInterval time.Duration,
role string,
) error {
etcdWorker, err := orchestrator.NewEtcdWorker(ctx.GlobalVars().EtcdClient.Client, etcd.EtcdKeyBase, reactor, reactorState)
if err != nil {
return errors.Trace(err)
}
captureAddr := c.info.AdvertiseAddr
if err := etcdWorker.Run(ctx, c.session, timerInterval, captureAddr); err != nil {
if err := etcdWorker.Run(ctx, c.session, timerInterval, captureAddr, role); err != nil {
// We check ttl of lease instead of check `session.Done`, because
// `session.Done` is only notified when etcd client establish a
// new keepalive request, there could be a time window as long as
Expand Down
6 changes: 4 additions & 2 deletions cdc/owner/ddl_sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,11 +121,13 @@ func (s *ddlSinkImpl) run(ctx cdcContext.Context, id model.ChangeFeedID, info *m

start := time.Now()
if err := s.sinkInitHandler(ctx, s, id, info); err != nil {
log.Warn("ddl sink initialize failed", zap.Duration("duration", time.Since(start)))
log.Warn("ddl sink initialize failed",
zap.Duration("duration", time.Since(start)))
ctx.Throw(err)
return
}
log.Info("ddl sink initialized, start processing...", zap.Duration("duration", time.Since(start)))
log.Info("ddl sink initialized, start processing...",
zap.Duration("duration", time.Since(start)))

// TODO make the tick duration configurable
ticker := time.NewTicker(time.Second)
Expand Down
4 changes: 3 additions & 1 deletion cdc/processor/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,9 @@ func (m *Manager) closeProcessor(changefeedID model.ChangeFeedID) {
if processor, exist := m.processors[changefeedID]; exist {
err := processor.Close()
if err != nil {
log.Warn("failed to close processor", zap.Error(err))
log.Warn("failed to close processor",
zap.String("changefeed", changefeedID),
zap.Error(err))
}
delete(m.processors, changefeedID)
}
Expand Down
19 changes: 19 additions & 0 deletions cdc/processor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -473,10 +473,19 @@ func (p *processor) lazyInitImpl(ctx cdcContext.Context) error {
}
opts[sink.OptChangefeedID] = p.changefeed.ID
opts[sink.OptCaptureAddr] = ctx.GlobalVars().CaptureInfo.AdvertiseAddr
log.Info("processor try new sink", zap.String("changefeed", p.changefeed.ID))

start := time.Now()
s, err := sink.New(stdCtx, p.changefeed.ID, p.changefeed.Info.SinkURI, p.filter, p.changefeed.Info.Config, opts, errCh)
if err != nil {
log.Info("processor new sink failed",
zap.String("changefeed", p.changefeed.ID),
zap.Duration("duration", time.Since(start)))
return errors.Trace(err)
}
log.Info("processor try new sink success",
zap.Duration("duration", time.Since(start)))

checkpointTs := p.changefeed.Info.GetCheckpointTs(p.changefeed.Status)
captureAddr := ctx.GlobalVars().CaptureInfo.AdvertiseAddr
p.sinkManager = sink.NewManager(stdCtx, s, errCh, checkpointTs, captureAddr, p.changefeedID)
Expand Down Expand Up @@ -1040,6 +1049,7 @@ func (p *processor) flushRedoLogMeta(ctx context.Context) error {
}

func (p *processor) Close() error {
log.Info("processor closing ...", zap.String("changefeed", p.changefeedID))
for _, tbl := range p.tables {
tbl.Cancel()
}
Expand All @@ -1061,9 +1071,18 @@ func (p *processor) Close() error {
// pass a canceled context is ok here, since we don't need to wait Close
ctx, cancel := context.WithCancel(context.Background())
cancel()
log.Info("processor try to close the sinkManager",
zap.String("changefeed", p.changefeedID))
start := time.Now()
if err := p.sinkManager.Close(ctx); err != nil {
log.Info("processor close sinkManager failed",
zap.String("changefeed", p.changefeedID),
zap.Duration("duration", time.Since(start)))
return errors.Trace(err)
}
log.Info("processor close sinkManager success",
zap.String("changefeed", p.changefeedID),
zap.Duration("duration", time.Since(start)))
}
if p.newSchedulerEnabled {
if p.agent == nil {
Expand Down
14 changes: 13 additions & 1 deletion cdc/sink/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"context"
"sync"
"sync/atomic"
"time"

"github.com/pingcap/errors"
"github.com/pingcap/log"
Expand Down Expand Up @@ -85,7 +86,18 @@ func (m *Manager) Close(ctx context.Context) error {
defer m.tableSinksMu.Unlock()
tableSinkTotalRowsCountCounter.DeleteLabelValues(m.captureAddr, m.changefeedID)
if m.bufSink != nil {
return m.bufSink.Close(ctx)
log.Info("sinkManager try close bufSink",
zap.String("changefeed", m.changefeedID))
start := time.Now()
if err := m.bufSink.Close(ctx); err != nil {
log.Info("close bufSink failed",
zap.String("changefeed", m.changefeedID),
zap.Duration("duration", time.Since(start)))
return err
}
log.Info("close bufSink success",
zap.String("changefeed", m.changefeedID),
zap.Duration("duration", time.Since(start)))
}
return nil
}
Expand Down
18 changes: 12 additions & 6 deletions cdc/sink/producer/kafka/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,13 +213,19 @@ func (k *kafkaSaramaProducer) Close() error {
// In fact close sarama sync client doesn't return any error.
// But close async client returns error if error channel is not empty, we
// don't populate this error to the upper caller, just add a log here.
err1 := k.syncClient.Close()
err2 := k.asyncClient.Close()
if err1 != nil {
log.Error("close sync client with error", zap.Error(err1))
start := time.Now()
err := k.asyncClient.Close()
if err != nil {
log.Error("close async client with error", zap.Error(err), zap.Duration("duration", time.Since(start)))
} else {
log.Info("async client closed", zap.Duration("duration", time.Since(start)))
}
if err2 != nil {
log.Error("close async client with error", zap.Error(err2))
start = time.Now()
err = k.syncClient.Close()
if err != nil {
log.Error("close sync client with error", zap.Error(err), zap.Duration("duration", time.Since(start)))
} else {
log.Info("sync client closed", zap.Duration("duration", time.Since(start)))
}
return nil
}
Expand Down
58 changes: 0 additions & 58 deletions dm/cmd/dm-portal/main.go

This file was deleted.

4 changes: 4 additions & 0 deletions dm/dm/config/subtask.go
Original file line number Diff line number Diff line change
Expand Up @@ -295,6 +295,10 @@ type SubTaskConfig struct {

// which DM worker is running the subtask, this will be injected when the real worker starts running the subtask(StartSubTask).
WorkerName string `toml:"-" json:"-"`
// task experimental configs
Experimental struct {
AsyncCheckpointFlush bool `yaml:"async-checkpoint-flush" toml:"async-checkpoint-flush" json:"async-checkpoint-flush"`
} `yaml:"experimental" toml:"experimental" json:"experimental"`
}

// NewSubTaskConfig creates a new SubTaskConfig.
Expand Down
5 changes: 5 additions & 0 deletions dm/dm/config/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -346,6 +346,11 @@ type TaskConfig struct {

// extra config when target db is TiDB
TiDB *TiDBExtraConfig `yaml:"tidb" toml:"tidb" json:"tidb"`

// task experimental configs
Experimental struct {
AsyncCheckpointFlush bool `yaml:"async-checkpoint-flush" toml:"async-checkpoint-flush" json:"async-checkpoint-flush"`
} `yaml:"experimental" toml:"experimental" json:"experimental"`
}

// NewTaskConfig creates a TaskConfig.
Expand Down
8 changes: 5 additions & 3 deletions dm/dm/config/task_converters.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ func TaskConfigToSubTaskConfigs(c *TaskConfig, sources map[string]DBConfig) ([]*
cfg.Timezone = c.Timezone
cfg.Meta = inst.Meta
cfg.CollationCompatible = c.CollationCompatible
cfg.Experimental = c.Experimental

fromClone := dbCfg.Clone()
if fromClone == nil {
Expand Down Expand Up @@ -111,7 +112,7 @@ func TaskConfigToSubTaskConfigs(c *TaskConfig, sources map[string]DBConfig) ([]*

// OpenAPITaskToSubTaskConfigs generates sub task configs by openapi.Task.
func OpenAPITaskToSubTaskConfigs(task *openapi.Task, toDBCfg *DBConfig, sourceCfgMap map[string]*SourceConfig) (
[]SubTaskConfig, error) {
[]*SubTaskConfig, error) {
// source name -> migrate rule list
tableMigrateRuleMap := make(map[string][]openapi.TaskTableMigrateRule)
for _, rule := range task.TableMigrateRule {
Expand All @@ -136,7 +137,7 @@ func OpenAPITaskToSubTaskConfigs(task *openapi.Task, toDBCfg *DBConfig, sourceCf
}
}
// start to generate sub task configs
subTaskCfgList := make([]SubTaskConfig, len(task.SourceConfig.SourceConf))
subTaskCfgList := make([]*SubTaskConfig, len(task.SourceConfig.SourceConf))
for i, sourceCfg := range task.SourceConfig.SourceConf {
// precheck source config
_, exist := sourceCfgMap[sourceCfg.SourceName]
Expand Down Expand Up @@ -245,7 +246,7 @@ func OpenAPITaskToSubTaskConfigs(task *openapi.Task, toDBCfg *DBConfig, sourceCf
if err := subTaskCfg.Adjust(true); err != nil {
return nil, terror.Annotatef(err, "source name %s", sourceCfg.SourceName)
}
subTaskCfgList[i] = *subTaskCfg
subTaskCfgList[i] = subTaskCfg
}
return subTaskCfgList, nil
}
Expand Down Expand Up @@ -303,6 +304,7 @@ func SubTaskConfigsToTaskConfig(stCfgs ...*SubTaskConfig) *TaskConfig {
c.Loaders = make(map[string]*LoaderConfig)
c.Syncers = make(map[string]*SyncerConfig)
c.ExprFilter = make(map[string]*ExpressionFilter)
c.Experimental = stCfg0.Experimental

baListMap := make(map[string]string, len(stCfgs))
routeMap := make(map[string]string, len(stCfgs))
Expand Down
6 changes: 3 additions & 3 deletions dm/dm/config/task_converters_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -274,7 +274,7 @@ func testNoShardSubTaskConfigsToOpenAPITask(c *check.C) {
// prepare sub task config
subTaskConfigMap := make(map[string]map[string]SubTaskConfig)
subTaskConfigMap[task.Name] = make(map[string]SubTaskConfig)
subTaskConfigMap[task.Name][source1Name] = subTaskConfigList[0]
subTaskConfigMap[task.Name][source1Name] = *subTaskConfigList[0]

taskList := SubTaskConfigsToOpenAPITask(subTaskConfigMap)
c.Assert(taskList, check.HasLen, 1)
Expand Down Expand Up @@ -309,8 +309,8 @@ func testShardAndFilterSubTaskConfigsToOpenAPITask(c *check.C) {
// prepare sub task config
subTaskConfigMap := make(map[string]map[string]SubTaskConfig)
subTaskConfigMap[task.Name] = make(map[string]SubTaskConfig)
subTaskConfigMap[task.Name][source1Name] = subTaskConfigList[0]
subTaskConfigMap[task.Name][source2Name] = subTaskConfigList[1]
subTaskConfigMap[task.Name][source1Name] = *subTaskConfigList[0]
subTaskConfigMap[task.Name][source2Name] = *subTaskConfigList[1]

taskList := SubTaskConfigsToOpenAPITask(subTaskConfigMap)
c.Assert(taskList, check.HasLen, 1)
Expand Down
4 changes: 3 additions & 1 deletion dm/dm/config/task_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -708,6 +708,7 @@ func (t *testConfig) TestGenAndFromSubTaskConfigs(c *C) {
}
)

stCfg1.Experimental.AsyncCheckpointFlush = true
stCfg2, err := stCfg1.Clone()
c.Assert(err, IsNil)
stCfg2.SourceID = source2
Expand Down Expand Up @@ -806,6 +807,7 @@ func (t *testConfig) TestGenAndFromSubTaskConfigs(c *C) {
},
CleanDumpFile: stCfg1.CleanDumpFile,
}
cfg2.Experimental.AsyncCheckpointFlush = true

c.Assert(WordCount(cfg.String()), DeepEquals, WordCount(cfg2.String())) // since rules are unordered, so use WordCount to compare

Expand Down Expand Up @@ -1027,7 +1029,7 @@ func (t *testConfig) TestTaskConfigForDowngrade(c *C) {
// make sure all new field were added
cfgReflect := reflect.Indirect(reflect.ValueOf(cfg))
cfgForDowngradeReflect := reflect.Indirect(reflect.ValueOf(cfgForDowngrade))
c.Assert(cfgReflect.NumField(), Equals, cfgForDowngradeReflect.NumField()+3) // without flag and tidb and collation_compatible
c.Assert(cfgReflect.NumField(), Equals, cfgForDowngradeReflect.NumField()+4) // without flag, tidb, collation_compatible and experimental

// make sure all field were copied
cfgForClone := &TaskConfigForDowngrade{}
Expand Down
Loading

0 comments on commit e408162

Please sign in to comment.