Skip to content
This repository has been archived by the owner on Nov 24, 2023. It is now read-only.

Commit

Permalink
reset connection
Browse files Browse the repository at this point in the history
  • Loading branch information
3pointer committed Sep 4, 2019
1 parent e651cd4 commit 22d5987
Show file tree
Hide file tree
Showing 5 changed files with 41 additions and 8 deletions.
13 changes: 13 additions & 0 deletions pkg/retry/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package retry

import (
"database/sql/driver"
"github.com/go-sql-driver/mysql"
"github.com/pingcap/errors"
tmysql "github.com/pingcap/parser/mysql"
Expand All @@ -35,3 +36,15 @@ func IsRetryableError(err error) bool {
}
return false
}

// IsConnectionError tells whether this error should reconnect to Database
func IsConnectionError(err error) bool {
err = errors.Cause(err)
switch err {
case driver.ErrBadConn:
return true
case mysql.ErrInvalidConn:
return true
}
return false
}
7 changes: 7 additions & 0 deletions pkg/retry/strategy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,13 @@ func (t *testStrategySuite) TestFiniteRetryStrategy(c *C) {
c.Assert(opCount, Equals, 0)
c.Assert(terror.ErrDBInvalidConn.Equal(err), IsTrue)

params.IsRetryableFn = func(int, error) bool {
return IsConnectionError(err)
}
_, opCount, err = strategy.Apply(ctx, params, operateFn)
c.Assert(opCount, Equals, 3)
c.Assert(terror.ErrDBInvalidConn.Equal(err), IsTrue)

retValue := "success"
operateFn = func(*tcontext.Context) (interface{}, error) {
return retValue, nil
Expand Down
2 changes: 1 addition & 1 deletion syncer/checkpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -252,7 +252,7 @@ func (cp *RemoteCheckPoint) Close() {

// ResetConn implements CheckPoint.ResetConn
func (cp *RemoteCheckPoint) ResetConn() error {
return cp.dbConn.ResetConn(cp.tctx, cp.db)
return cp.dbConn.ResetConn(cp.tctx)
}

// Clear implements CheckPoint.Clear
Expand Down
23 changes: 18 additions & 5 deletions syncer/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,14 +128,16 @@ func closeUpstreamConn(tctx *tcontext.Context, conn *UpStreamConn) {
type WorkerConn struct {
cfg *config.SubTaskConfig
baseConn *conn.BaseConn

baseDB *conn.BaseDB
}

// ResetConn reset one worker connection from specify *BaseDB
func (conn *WorkerConn) ResetConn(tctx *tcontext.Context, db *conn.BaseDB) error {
if db == nil {
func (conn *WorkerConn) ResetConn(tctx *tcontext.Context) error {
if conn == nil || conn.baseDB == nil {
return terror.ErrDBDriverError.Generate("database not valid")
}
dbConn, err := db.GetBaseConn(tctx.Context())
dbConn, err := conn.baseDB.GetBaseConn(tctx.Context())
if err != nil {
return err
}
Expand Down Expand Up @@ -201,6 +203,16 @@ func (conn *WorkerConn) executeSQLWithIgnore(tctx *tcontext.Context, ignoreError
FirstRetryDuration: retryTimeout,
BackoffStrategy: retry.Stable,
IsRetryableFn: func(retryTime int, err error) bool {
if retry.IsConnectionError(err) {
err := conn.ResetConn(tctx)
if err != nil {
tctx.L().Warn("reset connection failed", zap.Int("retry", retryTime),
zap.String("queries", utils.TruncateInterface(queries, -1)),
zap.String("arguments", utils.TruncateInterface(args, -1)))
return false
}
return true
}
if retry.IsRetryableError(err) {
tctx.L().Warn("execute statements", zap.Int("retry", retryTime),
zap.String("queries", utils.TruncateInterface(queries, -1)),
Expand All @@ -218,6 +230,7 @@ func (conn *WorkerConn) executeSQLWithIgnore(tctx *tcontext.Context, ignoreError
func(ctx *tcontext.Context) (interface{}, error) {
startTime := time.Now()
ret, err := conn.baseConn.ExecuteSQLWithIgnoreError(ctx, ignoreError, queries, args...)

if err == nil {
cost := time.Since(startTime)
txnHistogram.WithLabelValues(conn.cfg.Name).Observe(cost.Seconds())
Expand Down Expand Up @@ -251,7 +264,7 @@ func createConn(ctx context.Context, cfg *config.SubTaskConfig, dbCfg config.DBC
if err != nil {
return nil, nil, terror.WithScope(terror.DBErrorAdapt(err, terror.ErrDBDriverError), terror.ScopeDownstream)
}
return baseDB, &WorkerConn{baseConn: baseConn, cfg: cfg}, nil
return baseDB, &WorkerConn{baseDB: baseDB, baseConn: baseConn, cfg: cfg}, nil
}

func createConns(ctx context.Context, cfg *config.SubTaskConfig, dbCfg config.DBConfig, count int) (*conn.BaseDB, []*WorkerConn, error) {
Expand All @@ -265,7 +278,7 @@ func createConns(ctx context.Context, cfg *config.SubTaskConfig, dbCfg config.DB
if err != nil {
return nil, nil, terror.WithScope(terror.ErrDBBadConn.Delegate(err), terror.ScopeDownstream)
}
conns = append(conns, &WorkerConn{baseConn: dbConn, cfg: cfg})
conns = append(conns, &WorkerConn{baseDB: db, baseConn: dbConn, cfg: cfg})
}
return db, conns, nil
}
Expand Down
4 changes: 2 additions & 2 deletions syncer/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -511,13 +511,13 @@ func (s *Syncer) resetDBs() error {
var err error

for i := 0; i < len(s.toDBConns); i++ {
err = s.toDBConns[i].ResetConn(s.tctx, s.toDB)
err = s.toDBConns[i].ResetConn(s.tctx)
if err != nil {
return terror.WithScope(err, terror.ScopeDownstream)
}
}

err = s.ddlDBConn.ResetConn(s.tctx, s.ddlDB)
err = s.ddlDBConn.ResetConn(s.tctx)
if err != nil {
return terror.WithScope(err, terror.ScopeDownstream)
}
Expand Down

0 comments on commit 22d5987

Please sign in to comment.