Skip to content

Commit

Permalink
This is an automated cherry-pick of pingcap#7981
Browse files Browse the repository at this point in the history
Signed-off-by: ti-chi-bot <[email protected]>
  • Loading branch information
Rustin170506 authored and ti-chi-bot committed Dec 28, 2022
1 parent ce45409 commit 9a75a40
Show file tree
Hide file tree
Showing 3 changed files with 2,392 additions and 3 deletions.
7 changes: 4 additions & 3 deletions cdc/sink/mysql/mysql.go
Original file line number Diff line number Diff line change
Expand Up @@ -663,8 +663,6 @@ func (s *mysqlSink) execDMLWithMaxRetries(pctx context.Context, dmls *preparedDM
return retry.Do(pctx, func() error {
writeTimeout, _ := time.ParseDuration(s.params.writeTimeout)
writeTimeout += networkDriftDuration
ctx, cancelFunc := context.WithTimeout(pctx, writeTimeout)
defer cancelFunc()

failpoint.Inject("MySQLSinkTxnRandomError", func() {
failpoint.Return(
Expand All @@ -676,7 +674,7 @@ func (s *mysqlSink) execDMLWithMaxRetries(pctx context.Context, dmls *preparedDM
time.Sleep(time.Hour)
})
err := s.statistics.RecordBatchExecution(func() (int, error) {
tx, err := s.db.BeginTx(ctx, nil)
tx, err := s.db.BeginTx(pctx, nil)
if err != nil {
return 0, logDMLTxnErr(
cerror.WrapError(cerror.ErrMySQLTxnError, err),
Expand All @@ -686,13 +684,15 @@ func (s *mysqlSink) execDMLWithMaxRetries(pctx context.Context, dmls *preparedDM
for i, query := range dmls.sqls {
args := dmls.values[i]
log.Debug("exec row", zap.String("sql", query), zap.Any("args", args))
ctx, cancelFunc := context.WithTimeout(pctx, writeTimeout)
if _, err := tx.ExecContext(ctx, query, args...); err != nil {
if rbErr := tx.Rollback(); rbErr != nil {
log.Warn("failed to rollback txn", zap.Error(err))
_ = logDMLTxnErr(
cerror.WrapError(cerror.ErrMySQLTxnError, err),
start, s.params.changefeedID, query, dmls.rowCount)
}
cancelFunc()
return 0, logDMLTxnErr(
cerror.WrapError(cerror.ErrMySQLTxnError, err),
start, s.params.changefeedID, query, dmls.rowCount)
Expand All @@ -709,6 +709,7 @@ func (s *mysqlSink) execDMLWithMaxRetries(pctx context.Context, dmls *preparedDM
cerror.WrapError(cerror.ErrMySQLTxnError, err),
start, s.params.changefeedID, dmls.markSQL, dmls.rowCount)
}
cancelFunc()
}

if err = tx.Commit(); err != nil {
Expand Down
Loading

0 comments on commit 9a75a40

Please sign in to comment.