Skip to content

Commit

Permalink
sink(ticdc): use separate ctx for each DML (#7981) (#7985)
Browse files Browse the repository at this point in the history
close #7982
  • Loading branch information
ti-chi-bot authored Jan 10, 2023
1 parent f69c799 commit 9613e94
Showing 1 changed file with 7 additions and 3 deletions.
10 changes: 7 additions & 3 deletions cdc/sink/mysql/mysql.go
Original file line number Diff line number Diff line change
Expand Up @@ -667,8 +667,6 @@ func (s *mysqlSink) execDMLWithMaxRetries(pctx context.Context, dmls *preparedDM
return retry.Do(pctx, func() error {
writeTimeout, _ := time.ParseDuration(s.params.writeTimeout)
writeTimeout += networkDriftDuration
ctx, cancelFunc := context.WithTimeout(pctx, writeTimeout)
defer cancelFunc()

failpoint.Inject("MySQLSinkTxnRandomError", func() {
failpoint.Return(
Expand All @@ -680,7 +678,7 @@ func (s *mysqlSink) execDMLWithMaxRetries(pctx context.Context, dmls *preparedDM
time.Sleep(time.Hour)
})
err := s.statistics.RecordBatchExecution(func() (int, error) {
tx, err := s.db.BeginTx(ctx, nil)
tx, err := s.db.BeginTx(pctx, nil)
if err != nil {
return 0, logDMLTxnErr(
cerror.WrapError(cerror.ErrMySQLTxnError, err),
Expand All @@ -690,29 +688,35 @@ func (s *mysqlSink) execDMLWithMaxRetries(pctx context.Context, dmls *preparedDM
for i, query := range dmls.sqls {
args := dmls.values[i]
log.Debug("exec row", zap.String("sql", query), zap.Any("args", args))
ctx, cancelFunc := context.WithTimeout(pctx, writeTimeout)
if _, err := tx.ExecContext(ctx, query, args...); err != nil {
if rbErr := tx.Rollback(); rbErr != nil {
log.Warn("failed to rollback txn", zap.Error(err))
_ = logDMLTxnErr(
cerror.WrapError(cerror.ErrMySQLTxnError, err),
start, s.params.changefeedID, query, dmls.rowCount)
}
cancelFunc()
return 0, logDMLTxnErr(
cerror.WrapError(cerror.ErrMySQLTxnError, err),
start, s.params.changefeedID, query, dmls.rowCount)
}
cancelFunc()
}

if len(dmls.markSQL) != 0 {
log.Debug("exec row", zap.String("sql", dmls.markSQL))
ctx, cancelFunc := context.WithTimeout(pctx, writeTimeout)
if _, err := tx.ExecContext(ctx, dmls.markSQL); err != nil {
if rbErr := tx.Rollback(); rbErr != nil {
log.Warn("failed to rollback txn", zap.Error(err))
}
cancelFunc()
return 0, logDMLTxnErr(
cerror.WrapError(cerror.ErrMySQLTxnError, err),
start, s.params.changefeedID, dmls.markSQL, dmls.rowCount)
}
cancelFunc()
}

if err = tx.Commit(); err != nil {
Expand Down

0 comments on commit 9613e94

Please sign in to comment.