Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

sink/mysql: rollback txn to recycle db conn, refine timeout param in db conn (#1279) #1285

Merged
merged 1 commit into from
Jan 7, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 34 additions & 0 deletions cdc/sink/mysql.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ const (
defaultBatchReplaceSize = 20
defaultReadTimeout = "2m"
defaultWriteTimeout = "2m"
defaultDialTimeout = "2m"
defaultSafeMode = true
)

Expand Down Expand Up @@ -286,6 +287,7 @@ type sinkParams struct {
batchReplaceSize int
readTimeout string
writeTimeout string
dialTimeout string
enableOldValue bool
safeMode bool
timezone string
Expand All @@ -305,6 +307,7 @@ var defaultParams = &sinkParams{
batchReplaceSize: defaultBatchReplaceSize,
readTimeout: defaultReadTimeout,
writeTimeout: defaultWriteTimeout,
dialTimeout: defaultDialTimeout,
safeMode: defaultSafeMode,
}

Expand Down Expand Up @@ -343,6 +346,7 @@ func configureSinkURI(
}
dsnCfg.Params["readTimeout"] = params.readTimeout
dsnCfg.Params["writeTimeout"] = params.writeTimeout
dsnCfg.Params["timeout"] = params.dialTimeout

autoRandom, err := checkTiDBVariable(ctx, testDB, "allow_auto_random_explicit_insert", "1")
if err != nil {
Expand Down Expand Up @@ -467,6 +471,23 @@ func parseSinkURI(ctx context.Context, sinkURI *url.URL, opts map[string]string)
params.timezone = fmt.Sprintf(`"%s"`, tz.String())
}

// read, write, and dial timeout for each individual connection, equals to
// readTimeout, writeTimeout, timeout in go mysql driver respectively.
// ref: https://github.com/go-sql-driver/mysql#connection-pool-and-timeouts
// To keep the same style with other sink parameters, we use dash as word separator.
s = sinkURI.Query().Get("read-timeout")
if s != "" {
params.readTimeout = s
}
s = sinkURI.Query().Get("write-timeout")
if s != "" {
params.writeTimeout = s
}
s = sinkURI.Query().Get("timeout")
if s != "" {
params.dialTimeout = s
}

return params, nil
}

Expand All @@ -480,6 +501,10 @@ func getDBConn(ctx context.Context, dsnStr string) (*sql.DB, error) {
}
err = db.PingContext(ctx)
if err != nil {
// close db to recycle resources
if closeErr := db.Close(); closeErr != nil {
log.Warn("close db failed", zap.Error(err))
}
return nil, errors.Annotate(
cerror.WrapError(cerror.ErrMySQLConnectionError, err), "fail to open MySQL connection")
}
Expand Down Expand Up @@ -528,6 +553,9 @@ func newMySQLSink(
if params.timezone != "" {
dsn.Params["time_zone"] = params.timezone
}
dsn.Params["readTimeout"] = params.readTimeout
dsn.Params["writeTimeout"] = params.writeTimeout
dsn.Params["timeout"] = params.dialTimeout
testDB, err := getDBConnImpl(ctx, dsn.FormatDSN())
if err != nil {
return nil, err
Expand Down Expand Up @@ -815,12 +843,18 @@ func (s *mysqlSink) execDMLWithMaxRetries(
args := dmls.values[i]
log.Debug("exec row", zap.String("sql", query), zap.Any("args", args))
if _, err := tx.ExecContext(ctx, query, args...); err != nil {
if rbErr := tx.Rollback(); rbErr != nil {
log.Warn("failed to rollback txn", zap.Error(err))
}
return 0, checkTxnErr(cerror.WrapError(cerror.ErrMySQLTxnError, err))
}
}
if len(dmls.markSQL) != 0 {
log.Debug("exec row", zap.String("sql", dmls.markSQL))
if _, err := tx.ExecContext(ctx, dmls.markSQL); err != nil {
if rbErr := tx.Rollback(); rbErr != nil {
log.Warn("failed to rollback txn", zap.Error(err))
}
return 0, checkTxnErr(cerror.WrapError(cerror.ErrMySQLTxnError, err))
}
}
Expand Down
Loading