Skip to content
This repository has been archived by the owner on Nov 24, 2023. It is now read-only.

Commit

Permalink
address comments
Browse files Browse the repository at this point in the history
  • Loading branch information
3pointer committed Sep 3, 2019
1 parent 028e4e8 commit e0c0234
Show file tree
Hide file tree
Showing 3 changed files with 18 additions and 4 deletions.
11 changes: 9 additions & 2 deletions loader/checkpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/pingcap/dm/dm/config"
"github.com/pingcap/dm/pkg/conn"
tcontext "github.com/pingcap/dm/pkg/context"
"github.com/pingcap/dm/pkg/log"
"github.com/pingcap/dm/pkg/terror"

"go.uber.org/zap"
Expand Down Expand Up @@ -296,8 +297,14 @@ func (cp *RemoteCheckPoint) Init(filename string, endPos int64) error {

// Close implements CheckPoint.Close
func (cp *RemoteCheckPoint) Close() {
cp.conn.Close()
cp.db.Close()
err := cp.conn.Close()
if err != nil {
cp.tctx.L().Error("close checkpoint connection error", log.ShortError(err))
}
err = cp.db.Close()
if err != nil {
cp.tctx.L().Error("close checkpoint db error", log.ShortError(err))
}
}

// GenSQL implements CheckPoint.GenSQL
Expand Down
5 changes: 4 additions & 1 deletion loader/loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -559,7 +559,10 @@ func (l *Loader) Close() {
}

l.stopLoad()
l.toDB.Close()
err := l.toDB.Close()
if err != nil {
l.tctx.L().Error("close toDB error", log.ShortError(err))
}
l.checkPoint.Close()
l.closed.Set(true)
}
Expand Down
6 changes: 5 additions & 1 deletion syncer/checkpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -498,7 +498,11 @@ func (cp *RemoteCheckPoint) createTable() error {
func (cp *RemoteCheckPoint) Load() error {
query := fmt.Sprintf("SELECT `cp_schema`, `cp_table`, `binlog_name`, `binlog_pos`, `is_global` FROM `%s`.`%s` WHERE `id`='%s'", cp.schema, cp.table, cp.id)
rows, err := cp.dbConn.querySQL(cp.tctx, query)
defer rows.Close()
defer func() {
if rows != nil {
rows.Close()
}
}()

failpoint.Inject("LoadCheckpointFailed", func(val failpoint.Value) {
err = tmysql.NewErr(uint16(val.(int)))
Expand Down

0 comments on commit e0c0234

Please sign in to comment.