Skip to content

Commit

Permalink
log(ticdc): Add more error query information to the returned error to…
Browse files Browse the repository at this point in the history
… facilitate users to know the cause of the failure (#10945)

ref #10862
  • Loading branch information
hongyunyan authored Apr 23, 2024
1 parent e61d080 commit cd065d3
Show file tree
Hide file tree
Showing 3 changed files with 14 additions and 12 deletions.
4 changes: 3 additions & 1 deletion cdc/sink/ddlsink/mysql/mysql_ddl_sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,11 @@ package mysql
import (
"context"
"database/sql"
"fmt"
"net/url"
"time"

cerrors "github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/log"
timodel "github.com/pingcap/tidb/pkg/parser/model"
Expand Down Expand Up @@ -233,7 +235,7 @@ func (m *DDLSink) execDDL(pctx context.Context, ddl *model.DDLEvent) error {
zap.Duration("duration", time.Since(start)),
zap.String("namespace", m.id.Namespace),
zap.String("changefeed", m.id.ID), zap.Error(err))
return cerror.WrapError(cerror.ErrMySQLTxnError, err)
return cerror.WrapError(cerror.ErrMySQLTxnError, cerrors.WithMessage(err, fmt.Sprintf("Query info: %s; ", ddl.Query)))
}

log.Info("Exec DDL succeeded", zap.String("sql", ddl.Query),
Expand Down
2 changes: 1 addition & 1 deletion cdc/sink/dmlsink/txn/mysql/mysql.go
Original file line number Diff line number Diff line change
Expand Up @@ -842,7 +842,7 @@ func logDMLTxnErr(
zap.String("query", query), zap.Int("count", count),
zap.String("changefeed", changefeed))
}
return err
return errors.WithMessage(err, fmt.Sprintf("Failed query info: %s; ", query))
}

func isRetryableDMLError(err error) bool {
Expand Down
20 changes: 10 additions & 10 deletions cdc/syncpointstore/mysql_syncpoint_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,23 +86,23 @@ func (s *mysqlSyncPointStore) CreateSyncTable(ctx context.Context) error {
tx, err := s.db.BeginTx(ctx, nil)
if err != nil {
log.Error("create sync table: begin Tx fail", zap.Error(err))
return cerror.WrapError(cerror.ErrMySQLTxnError, err)
return cerror.WrapError(cerror.ErrMySQLTxnError, errors.WithMessage(err, "create sync table: begin Tx fail;"))
}
_, err = tx.Exec("CREATE DATABASE IF NOT EXISTS " + database)
if err != nil {
err2 := tx.Rollback()
if err2 != nil {
log.Error("failed to create syncpoint table", zap.Error(err2))
}
return cerror.WrapError(cerror.ErrMySQLTxnError, err)
return cerror.WrapError(cerror.ErrMySQLTxnError, errors.WithMessage(err, "failed to create syncpoint table;"))
}
_, err = tx.Exec("USE " + database)
if err != nil {
err2 := tx.Rollback()
if err2 != nil {
log.Error("failed to create syncpoint table", zap.Error(err2))
}
return cerror.WrapError(cerror.ErrMySQLTxnError, err)
return cerror.WrapError(cerror.ErrMySQLTxnError, errors.WithMessage(err, "failed to create syncpoint table;"))
}
query := `CREATE TABLE IF NOT EXISTS %s
(
Expand All @@ -121,10 +121,10 @@ func (s *mysqlSyncPointStore) CreateSyncTable(ctx context.Context) error {
if err2 != nil {
log.Error("failed to create syncpoint table", zap.Error(err2))
}
return cerror.WrapError(cerror.ErrMySQLTxnError, err)
return cerror.WrapError(cerror.ErrMySQLTxnError, errors.WithMessage(err, "failed to create syncpoint table;"))
}
err = tx.Commit()
return cerror.WrapError(cerror.ErrMySQLTxnError, err)
return cerror.WrapError(cerror.ErrMySQLTxnError, errors.WithMessage(err, "failed to create syncpoint table;"))
}

func (s *mysqlSyncPointStore) SinkSyncPoint(ctx context.Context,
Expand All @@ -134,7 +134,7 @@ func (s *mysqlSyncPointStore) SinkSyncPoint(ctx context.Context,
tx, err := s.db.BeginTx(ctx, nil)
if err != nil {
log.Error("sync table: begin Tx fail", zap.Error(err))
return cerror.WrapError(cerror.ErrMySQLTxnError, err)
return cerror.WrapError(cerror.ErrMySQLTxnError, errors.WithMessage(err, "sync table: begin Tx fail;"))
}
row := tx.QueryRow("select @@tidb_current_ts")
var secondaryTs string
Expand All @@ -145,7 +145,7 @@ func (s *mysqlSyncPointStore) SinkSyncPoint(ctx context.Context,
if err2 != nil {
log.Error("failed to write syncpoint table", zap.Error(err))
}
return cerror.WrapError(cerror.ErrMySQLTxnError, err)
return cerror.WrapError(cerror.ErrMySQLTxnError, errors.WithMessage(err, "failed to write syncpoint table;"))
}
// insert ts map
query := "insert ignore into " + filter.TiCDCSystemSchema + "." + filter.SyncPointTable +
Expand All @@ -156,7 +156,7 @@ func (s *mysqlSyncPointStore) SinkSyncPoint(ctx context.Context,
if err2 != nil {
log.Error("failed to write syncpoint table", zap.Error(err2))
}
return cerror.WrapError(cerror.ErrMySQLTxnError, err)
return cerror.WrapError(cerror.ErrMySQLTxnError, errors.WithMessage(err, "failed to write syncpoint table;"))
}

// set global tidb_external_ts to secondary ts
Expand All @@ -172,7 +172,7 @@ func (s *mysqlSyncPointStore) SinkSyncPoint(ctx context.Context,
if err2 != nil {
log.Error("failed to write syncpoint table", zap.Error(err2))
}
return cerror.WrapError(cerror.ErrMySQLTxnError, err)
return cerror.WrapError(cerror.ErrMySQLTxnError, errors.WithMessage(err, "failed to write syncpoint table;"))
}
}

Expand All @@ -197,7 +197,7 @@ func (s *mysqlSyncPointStore) SinkSyncPoint(ctx context.Context,
}

err = tx.Commit()
return cerror.WrapError(cerror.ErrMySQLTxnError, err)
return cerror.WrapError(cerror.ErrMySQLTxnError, errors.WithMessage(err, "failed to write syncpoint table;"))
}

func (s *mysqlSyncPointStore) Close() error {
Expand Down

0 comments on commit cd065d3

Please sign in to comment.