Skip to content

Commit

Permalink
loader(dm): skip nil etcd client
Browse files Browse the repository at this point in the history
  • Loading branch information
lance6716 committed Feb 16, 2022
1 parent 536d8ae commit 27a2351
Showing 1 changed file with 21 additions and 1 deletion.
22 changes: 21 additions & 1 deletion dm/loader/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ func getMydumpMetadata(cli *clientv3.Client, cfg *config.SubTaskConfig, workerNa
loc, _, err := dumpling.ParseMetaData(metafile, cfg.Flavor)
if err != nil {
if os.IsNotExist(err) {
worker, _, err2 := ha.GetLoadTask(cli, cfg.Name, cfg.SourceID)
worker, err2 := getLoadTask(cli, cfg.Name, cfg.SourceID)
if err2 != nil {
log.L().Warn("get load task", log.ShortError(err2))
}
Expand Down Expand Up @@ -171,7 +171,12 @@ func cleanDumpFiles(cfg *config.SubTaskConfig) {
}

// putLoadTask is called when start restoring data, to put load worker in etcd.
// This is no-op when the `cli` argument is nil.
func putLoadTask(cli *clientv3.Client, cfg *config.SubTaskConfig, workerName string) error {
// some usage like DM as a library, we don't support this feature
if cli == nil {
return nil
}
_, err := ha.PutLoadTask(cli, cfg.Name, cfg.SourceID, workerName)
if err != nil {
return err
Expand All @@ -181,11 +186,26 @@ func putLoadTask(cli *clientv3.Client, cfg *config.SubTaskConfig, workerName str
}

// delLoadTask is called when finish restoring data, to delete load worker in etcd.
// This is no-op when the `cli` argument is nil.
func delLoadTask(cli *clientv3.Client, cfg *config.SubTaskConfig, workerName string) error {
// some usage like DM as a library, we don't support this feature
if cli == nil {
return nil
}
_, _, err := ha.DelLoadTask(cli, cfg.Name, cfg.SourceID)
if err != nil {
return err
}
log.L().Info("delete load worker in etcd for full mode", zap.String("task", cfg.Name), zap.String("source", cfg.SourceID), zap.String("worker", workerName))
return nil
}

// getLoadTask gets the worker which in load stage for the source of the subtask.
// It will return "" and no error when the `cli` argument is nil.
func getLoadTask(cli *clientv3.Client, task, sourceID string) (string, error) {
if cli == nil {
return "", nil
}
name, _, err := ha.GetLoadTask(cli, task, sourceID)
return name, err
}

0 comments on commit 27a2351

Please sign in to comment.