Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

loader(dm): skip feature for nil etcd client #4599

Merged
merged 1 commit into from
Feb 16, 2022
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 21 additions & 1 deletion dm/loader/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ func getMydumpMetadata(cli *clientv3.Client, cfg *config.SubTaskConfig, workerNa
loc, _, err := dumpling.ParseMetaData(metafile, cfg.Flavor)
if err != nil {
if os.IsNotExist(err) {
worker, _, err2 := ha.GetLoadTask(cli, cfg.Name, cfg.SourceID)
worker, err2 := getLoadTask(cli, cfg.Name, cfg.SourceID)
if err2 != nil {
log.L().Warn("get load task", log.ShortError(err2))
}
Expand Down Expand Up @@ -171,7 +171,12 @@ func cleanDumpFiles(cfg *config.SubTaskConfig) {
}

// putLoadTask is called when start restoring data, to put load worker in etcd.
// This is no-op when the `cli` argument is nil.
func putLoadTask(cli *clientv3.Client, cfg *config.SubTaskConfig, workerName string) error {
// some usage like DM as a library, we don't support this feature
if cli == nil {
return nil
}
_, err := ha.PutLoadTask(cli, cfg.Name, cfg.SourceID, workerName)
if err != nil {
return err
Expand All @@ -181,11 +186,26 @@ func putLoadTask(cli *clientv3.Client, cfg *config.SubTaskConfig, workerName str
}

// delLoadTask is called when finish restoring data, to delete load worker in etcd.
// This is no-op when the `cli` argument is nil.
func delLoadTask(cli *clientv3.Client, cfg *config.SubTaskConfig, workerName string) error {
// some usage like DM as a library, we don't support this feature
if cli == nil {
return nil
}
_, _, err := ha.DelLoadTask(cli, cfg.Name, cfg.SourceID)
if err != nil {
return err
}
log.L().Info("delete load worker in etcd for full mode", zap.String("task", cfg.Name), zap.String("source", cfg.SourceID), zap.String("worker", workerName))
return nil
}

// getLoadTask gets the worker which in load stage for the source of the subtask.
// It will return "" and no error when the `cli` argument is nil.
func getLoadTask(cli *clientv3.Client, task, sourceID string) (string, error) {
if cli == nil {
return "", nil
}
name, _, err := ha.GetLoadTask(cli, task, sourceID)
return name, err
}