Skip to content
This repository has been archived by the owner on Nov 24, 2023. It is now read-only.

Commit

Permalink
config, loader: auto-remove imported dump file (#770)
Browse files Browse the repository at this point in the history
  • Loading branch information
lance6716 authored Jul 10, 2020
1 parent bc1094a commit d23a0f8
Show file tree
Hide file tree
Showing 11 changed files with 117 additions and 4 deletions.
2 changes: 2 additions & 0 deletions dm/config/subtask.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,8 @@ type SubTaskConfig struct {

ConfigFile string `toml:"-" json:"config-file"`

CleanDumpFile bool `toml:"clean-dump-file" json:"clean-dump-file"`

// still needed by Syncer / Loader bin
printVersion bool
}
Expand Down
5 changes: 5 additions & 0 deletions dm/config/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -290,6 +290,8 @@ type TaskConfig struct {
Mydumpers map[string]*MydumperConfig `yaml:"mydumpers"`
Loaders map[string]*LoaderConfig `yaml:"loaders"`
Syncers map[string]*SyncerConfig `yaml:"syncers"`

CleanDumpFile bool `yaml:"clean-dump-file"`
}

// NewTaskConfig creates a TaskConfig
Expand All @@ -310,6 +312,7 @@ func NewTaskConfig() *TaskConfig {
Mydumpers: make(map[string]*MydumperConfig),
Loaders: make(map[string]*LoaderConfig),
Syncers: make(map[string]*SyncerConfig),
CleanDumpFile: true,
}
cfg.FlagSet = flag.NewFlagSet("task", flag.ContinueOnError)
return cfg
Expand Down Expand Up @@ -556,6 +559,8 @@ func (c *TaskConfig) SubTaskConfigs(sources map[string]DBConfig) ([]*SubTaskConf
cfg.LoaderConfig = *inst.Loader
cfg.SyncerConfig = *inst.Syncer

cfg.CleanDumpFile = c.CleanDumpFile

err := cfg.Adjust(true)
if err != nil {
return nil, terror.Annotatef(err, "source %s", inst.SourceID)
Expand Down
19 changes: 16 additions & 3 deletions loader/checkpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -288,16 +288,17 @@ func (cp *RemoteCheckPoint) Init(tctx *tcontext.Context, filename string, endPos
}

// fields[0] -> db name, fields[1] -> table name
schema, table := fields[0], fields[1]
sql2 := fmt.Sprintf("INSERT INTO `%s`.`%s` (`id`, `filename`, `cp_schema`, `cp_table`, `offset`, `end_pos`) VALUES(?,?,?,?,?,?)", cp.schema, cp.table)
cp.logCtx.L().Info("initial checkpoint record",
zap.String("sql", sql2),
zap.String("id", cp.id),
zap.String("filename", filename),
zap.String("schema", fields[0]),
zap.String("table", fields[1]),
zap.String("schema", schema),
zap.String("table", table),
zap.Int64("offset", 0),
zap.Int64("end position", endPos))
args := []interface{}{cp.id, filename, fields[0], fields[1], 0, endPos}
args := []interface{}{cp.id, filename, schema, table, 0, endPos}
cp.connMutex.Lock()
err := cp.conn.executeSQL(tctx, []string{sql2}, args)
cp.connMutex.Unlock()
Expand All @@ -308,6 +309,18 @@ func (cp *RemoteCheckPoint) Init(tctx *tcontext.Context, filename string, endPos
}
return terror.WithScope(terror.Annotate(err, "initialize checkpoint"), terror.ScopeDownstream)
}
// checkpoint not exists and no error, cache endPos in memory
if _, ok := cp.restoringFiles[schema]; !ok {
cp.restoringFiles[schema] = make(map[string]FilePosSet)
}
tables := cp.restoringFiles[schema]
if _, ok := tables[table]; !ok {
tables[table] = make(map[string][]int64)
}
restoringFiles := tables[table]
if _, ok := restoringFiles[filename]; !ok {
restoringFiles[filename] = []int64{0, endPos}
}
return nil
}

Expand Down
61 changes: 61 additions & 0 deletions loader/loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,9 @@ type Tables2DataFiles map[string]DataFiles
type dataJob struct {
sql string
schema string
table string
file string
absPath string
offset int64
lastOffset int64
}
Expand Down Expand Up @@ -180,6 +182,20 @@ func (w *Worker) run(ctx context.Context, fileJobQueue chan *fileJob, runFatalCh
return
}
w.loader.finishedDataSize.Add(job.offset - job.lastOffset)

if w.cfg.CleanDumpFile {
fileInfos := w.checkPoint.GetRestoringFileInfo(job.schema, job.table)
if pos, ok := fileInfos[job.file]; ok {
if job.offset == pos[1] {
w.tctx.L().Info("try to remove loaded dump file", zap.String("data file", job.file))
if err := os.Remove(job.absPath); err != nil {
w.tctx.L().Warn("error when remove loaded dump file", zap.String("data file", job.file), zap.Error(err))
}
}
} else {
w.tctx.L().Warn("file not recorded in checkpoint", zap.String("data file", job.file))
}
}
}
}
}
Expand Down Expand Up @@ -319,7 +335,9 @@ func (w *Worker) dispatchSQL(ctx context.Context, file string, offset int64, tab
j := &dataJob{
sql: query,
schema: table.targetSchema,
table: table.targetTable,
file: baseFile,
absPath: file,
offset: cur,
lastOffset: lastOffset,
}
Expand Down Expand Up @@ -595,6 +613,25 @@ func (l *Loader) Restore(ctx context.Context) error {

if err == nil {
l.logCtx.L().Info("all data files have been finished", zap.Duration("cost time", time.Since(begin)))
if l.cfg.CleanDumpFile {
files := CollectDirFiles(l.cfg.Dir)
hasDatafile := false
for file := range files {
if strings.HasSuffix(file, ".sql") &&
!strings.HasSuffix(file, "-schema.sql") &&
!strings.HasSuffix(file, "-schema-create.sql") &&
!strings.Contains(file, "-schema-view.sql") &&
!strings.Contains(file, "-schema-triggers.sql") &&
!strings.Contains(file, "-schema-post.sql") {
hasDatafile = true
}
}

if !hasDatafile {
l.logCtx.L().Info("clean dump files after importing all files")
l.cleanDumpFiles()
}
}
} else if errors.Cause(err) != context.Canceled {
return err
}
Expand Down Expand Up @@ -1211,3 +1248,27 @@ func (l *Loader) getMydumpMetadata() error {
l.metaBinlog.Set(pos.String())
return nil
}

// cleanDumpFiles is called when finish restoring data, to clean useless files
func (l *Loader) cleanDumpFiles() {
if l.cfg.Mode == config.ModeFull {
// in full-mode all files won't be need in the future
if err := os.RemoveAll(l.cfg.Dir); err != nil {
l.logCtx.L().Warn("error when remove loaded dump folder", zap.String("data folder", l.cfg.Dir), zap.Error(err))
}
} else {
// leave metadata file, only delete structure files
for db, tables := range l.db2Tables {
dbFile := fmt.Sprintf("%s/%s-schema-create.sql", l.cfg.Dir, db)
if err := os.Remove(dbFile); err != nil {
l.logCtx.L().Warn("error when remove loaded dump file", zap.String("data file", dbFile), zap.Error(err))
}
for table := range tables {
tableFile := fmt.Sprintf("%s/%s.%s-schema.sql", l.cfg.Dir, db, table)
if err := os.Remove(tableFile); err != nil {
l.logCtx.L().Warn("error when remove loaded dump file", zap.String("data file", tableFile), zap.Error(err))
}
}
}
}
}
18 changes: 18 additions & 0 deletions syncer/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ package syncer
import (
"context"
"fmt"
"os"
"path"
"reflect"
"strconv"
"strings"
Expand Down Expand Up @@ -1049,6 +1051,22 @@ func (s *Syncer) Run(ctx context.Context) (err error) {
if err != nil {
return err
}

// for fresh and all-mode task, flush checkpoint so we could delete metadata file
if s.cfg.Mode == config.ModeAll {
if err := s.flushCheckPoints(); err != nil {
s.tctx.L().Warn("fail to flush checkpoints when starting task", zap.Error(err))
} else {
s.tctx.L().Info("try to remove loaded files")
metadataFile := path.Join(s.cfg.Dir, "metadata")
if err := os.Remove(metadataFile); err != nil {
s.tctx.L().Warn("error when remove loaded dump file", zap.String("data file", metadataFile), zap.Error(err))
}
if err := os.Remove(s.cfg.Dir); err != nil {
s.tctx.L().Warn("error when remove loaded dump folder", zap.String("data folder", s.cfg.Dir), zap.Error(err))
}
}
}
}

// currentPos is the pos for current received event (End_log_pos in `show binlog events` for mysql)
Expand Down
4 changes: 4 additions & 0 deletions tests/all_mode/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,10 @@ function run() {
check_not_contains "ignore_db"
check_contains "all_mode"

echo "check dump files have been cleaned"
ls $WORK_DIR/worker1/dumped_data.test && exit 1 || echo "worker1 auto removed dump files"
ls $WORK_DIR/worker2/dumped_data.test && exit 1 || echo "worker2 auto removed dump files"

export GO_FAILPOINTS=''
}

Expand Down
1 change: 1 addition & 0 deletions tests/dm_syncer/conf/dm-task.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ meta-schema: "dm_meta"
heartbeat-update-interval: 1
heartbeat-report-interval: 1
timezone: "Asia/Shanghai"
clean-dump-file: false

target-database:
host: "127.0.0.1"
Expand Down
4 changes: 4 additions & 0 deletions tests/full_mode/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,10 @@ function run() {

# use sync_diff_inspector to check full dump loader
check_sync_diff $WORK_DIR $cur/conf/diff_config.toml

echo "check dump files have been cleaned"
ls $WORK_DIR/worker1/dumped_data.test && exit 1 || echo "worker1 auto removed dump files"
ls $WORK_DIR/worker2/dumped_data.test && exit 1 || echo "worker2 auto removed dump files"
}

cleanup_data full_mode
Expand Down
2 changes: 1 addition & 1 deletion tests/import_goroutine_leak/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ function run() {
run_sql_file $WORK_DIR/db2.prepare.sql $MYSQL_HOST2 $MYSQL_PORT2 $MYSQL_PASSWORD2


echo "dm-worker paninc, doJob of import unit workers don't exit"
echo "dm-worker panic, doJob of import unit workers don't exit"
# check doJobs of import unit worker exit
inject_points=("github.com/pingcap/dm/loader/dispatchError=return(1)"
"github.com/pingcap/dm/loader/LoadDataSlowDown=sleep(1000)"
Expand Down
1 change: 1 addition & 0 deletions tests/incremental_mode/conf/dm-task.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ meta-schema: "dm_meta"
heartbeat-update-interval: 1
heartbeat-report-interval: 1
timezone: "Asia/Shanghai"
clean-dump-file: false

target-database:
host: "127.0.0.1"
Expand Down
4 changes: 4 additions & 0 deletions tests/load_interrupt/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,9 @@ function run() {

check_port_offline $WORKER1_PORT 20

# check dump files are generated before worker down
ls $WORK_DIR/worker1/dumped_data.test

run_sql "SELECT count(*) from dm_meta.test_loader_checkpoint where cp_schema = '$TEST_NAME' and offset < $THRESHOLD" $TIDB_PORT $TIDB_PASSWORD
check_contains "count(*): 1"
# TODO: block for dumpling temporarily
Expand Down Expand Up @@ -87,6 +90,7 @@ function run() {
check_contains "count(*): 1"

export GO_FAILPOINTS=''
ls $WORK_DIR/worker1/dumped_data.test && exit 1 || echo "worker1 auto removed dump files"
}

cleanup_data load_interrupt
Expand Down

0 comments on commit d23a0f8

Please sign in to comment.