Skip to content

Commit

Permalink
*: fix several integration test cases (#943)
Browse files Browse the repository at this point in the history
ref #442
  • Loading branch information
asddongmen authored Jan 22, 2025
1 parent 3619221 commit 623af8c
Show file tree
Hide file tree
Showing 16 changed files with 112 additions and 231 deletions.
12 changes: 11 additions & 1 deletion .github/workflows/integration_test_mysql.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -478,8 +478,18 @@ jobs:
if: ${{ success() }}
run: |
export TICDC_NEWARCH=true && make integration_test CASE=capture_session_done_during_task
- name: Test changefeed_dup_error_restart
if: ${{ success() }}
run: |
export TICDC_NEWARCH=true && make integration_test CASE=changefeed_dup_error_restart
- name: Test mysql_sink_retry
if: ${{ success() }}
run: |
export TICDC_NEWARCH=true && make integration_test CASE=mysql_sink_retry
# The 17th case in this group
# The 19th case in this group
- name: Test cdc
if: ${{ success() }}
run: |
Expand Down
2 changes: 1 addition & 1 deletion maintainer/replica/replication_span.go
Original file line number Diff line number Diff line change
Expand Up @@ -261,7 +261,7 @@ func getTs(client TSOClient) (uint64, error) {
}
ts = oracle.ComposeTS(phy, logic)
return nil
}, retry.WithTotalRetryDuratoin(300*time.Millisecond),
}, retry.WithTotalRetryDuration(300*time.Millisecond),
retry.WithBackoffBaseDelay(100))
return ts, errors.Trace(err)
}
4 changes: 4 additions & 0 deletions pkg/errors/error.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,10 @@ var (
"MySQL txn error",
errors.RFCCodeText("CDC:ErrMySQLTxnError"),
)
ErrMySQLDuplicateEntry = errors.Normalize(
"MySQL duplicate entry error",
errors.RFCCodeText("CDC:ErrMySQLDuplicateEntry"),
)
ErrMySQLQueryError = errors.Normalize(
"MySQL query error",
errors.RFCCodeText("CDC:ErrMySQLQueryError"),
Expand Down
4 changes: 2 additions & 2 deletions pkg/retry/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,8 +79,8 @@ func WithMaxTries(tries uint64) Option {
}
}

// WithTotalRetryDuratoin configures the total retry duration.
func WithTotalRetryDuratoin(retryDuration time.Duration) Option {
// WithTotalRetryDuration configures the total retry duration.
func WithTotalRetryDuration(retryDuration time.Duration) Option {
return func(o *retryOptions) {
o.totalRetryDuration = retryDuration
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/retry/retry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ func TestTotalRetryDuration(t *testing.T) {
err := Do(
context.Background(), f,
WithBackoffBaseDelay(math.MinInt64),
WithTotalRetryDuratoin(time.Second),
WithTotalRetryDuration(time.Second),
)
require.Regexp(t, "test", errors.Cause(err))
require.LessOrEqual(t, 1, int(math.Round(time.Since(start).Seconds())))
Expand All @@ -182,7 +182,7 @@ func TestTotalRetryDuration(t *testing.T) {
err = Do(
context.Background(), f,
WithBackoffBaseDelay(math.MinInt64),
WithTotalRetryDuratoin(2*time.Second),
WithTotalRetryDuration(2*time.Second),
)
require.Regexp(t, "test", errors.Cause(err))
require.LessOrEqual(t, 2, int(math.Round(time.Since(start).Seconds())))
Expand Down
27 changes: 27 additions & 0 deletions pkg/sink/mysql/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -375,3 +375,30 @@ func ShouldFormatVectorType(db *sql.DB, cfg *MysqlConfig) bool {
}
return false
}

// isRetryableDMLError reports whether a failed DML execution should be retried.
//
// An error is retried only if cerror.IsRetryableError accepts it and, when a
// MySQL error code can be extracted from it, that code is not one of
// ErrNoSuchTable, ErrBadDB or ErrDupEntry.
func isRetryableDMLError(err error) bool {
	if !cerror.IsRetryableError(err) {
		return false
	}

	code, hasCode := getSQLErrCode(err)
	if !hasCode {
		// No MySQL error code is available, so the generic check above decides.
		return true
	}

	// When a dup entry error is met, we don't retry and report the error
	// directly to the owner to restart the changefeed. Missing table and
	// missing database errors are not retried either.
	nonRetryable := code == mysql.ErrNoSuchTable ||
		code == mysql.ErrBadDB ||
		code == mysql.ErrDupEntry
	return !nonRetryable
}

// getSQLErrCode extracts the MySQL error number from the cause of err.
//
// It returns the number converted to errors.ErrCode and true when
// errors.Cause(err) is a *dmysql.MySQLError; otherwise it returns -1 and false.
func getSQLErrCode(err error) (errors.ErrCode, bool) {
	if driverErr, matched := errors.Cause(err).(*dmysql.MySQLError); matched {
		return errors.ErrCode(driverErr.Number), true
	}
	return -1, false
}
60 changes: 53 additions & 7 deletions pkg/sink/mysql/mysql_writer_dml.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,20 @@ package mysql
import (
"context"
"database/sql"
"database/sql/driver"
"fmt"
"strings"
"time"

dmysql "github.com/go-sql-driver/mysql"
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/log"
"github.com/pingcap/ticdc/pkg/common"
commonEvent "github.com/pingcap/ticdc/pkg/common/event"
cerror "github.com/pingcap/ticdc/pkg/errors"
"github.com/pingcap/ticdc/pkg/retry"
"github.com/pingcap/tidb/pkg/parser/mysql"
pmysql "github.com/pingcap/tiflow/pkg/sink/mysql"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
Expand All @@ -46,9 +51,10 @@ func (w *MysqlWriter) prepareDMLs(events []*commonEvent.DMLEvent) (*preparedDMLs
dmls.startTs = append(dmls.startTs, event.StartTs)
}

translateToInsert := !w.cfg.SafeMode && event.CommitTs > event.ReplicatingTs
log.Debug("translate to insert",
zap.Bool("translateToInsert", translateToInsert),
inSafeMode := !w.cfg.SafeMode && event.CommitTs > event.ReplicatingTs

log.Debug("inSafeMode",
zap.Bool("inSafeMode", inSafeMode),
zap.Uint64("firstRowCommitTs", event.CommitTs),
zap.Uint64("firstRowReplicatingTs", event.ReplicatingTs),
zap.Bool("safeMode", w.cfg.SafeMode))
Expand All @@ -65,7 +71,7 @@ func (w *MysqlWriter) prepareDMLs(events []*commonEvent.DMLEvent) (*preparedDMLs

switch row.RowType {
case commonEvent.RowTypeUpdate:
if translateToInsert {
if inSafeMode {
query, args, err = buildUpdate(event.TableInfo, row, w.cfg.ForceReplicate)
} else {
query, args, err = buildDelete(event.TableInfo, row, w.cfg.ForceReplicate)
Expand All @@ -77,12 +83,12 @@ func (w *MysqlWriter) prepareDMLs(events []*commonEvent.DMLEvent) (*preparedDMLs
dmls.sqls = append(dmls.sqls, query)
dmls.values = append(dmls.values, args)
}
query, args, err = buildInsert(event.TableInfo, row, translateToInsert)
query, args, err = buildInsert(event.TableInfo, row, inSafeMode)
}
case commonEvent.RowTypeDelete:
query, args, err = buildDelete(event.TableInfo, row, w.cfg.ForceReplicate)
case commonEvent.RowTypeInsert:
query, args, err = buildInsert(event.TableInfo, row, translateToInsert)
query, args, err = buildInsert(event.TableInfo, row, inSafeMode)
}

if err != nil {
Expand Down Expand Up @@ -157,14 +163,32 @@ func (w *MysqlWriter) execDMLWithMaxRetries(dmls *preparedDMLs) error {
return dmls.rowCount, dmls.approximateSize, nil
}
return retry.Do(w.ctx, func() error {
failpoint.Inject("MySQLSinkTxnRandomError", func() {
log.Warn("inject MySQLSinkTxnRandomError")
err := errors.Trace(driver.ErrBadConn)
logDMLTxnErr(err, time.Now(), w.ChangefeedID.String(), dmls.sqls[0], dmls.rowCount, dmls.startTs)
failpoint.Return(err)
})

failpoint.Inject("MySQLDuplicateEntryError", func() {
log.Warn("inject MySQLDuplicateEntryError")
err := cerror.WrapError(cerror.ErrMySQLDuplicateEntry, &dmysql.MySQLError{
Number: uint16(mysql.ErrDupEntry),
})
logDMLTxnErr(err, time.Now(), w.ChangefeedID.String(), dmls.sqls[0], dmls.rowCount, dmls.startTs)
failpoint.Return(err)
})

err := w.statistics.RecordBatchExecution(tryExec)
if err != nil {
logDMLTxnErr(err, time.Now(), w.ChangefeedID.String(), dmls.sqls[0], dmls.rowCount, dmls.startTs)
return errors.Trace(err)
}
return nil
}, retry.WithBackoffBaseDelay(pmysql.BackoffBaseDelay.Milliseconds()),
retry.WithBackoffMaxDelay(pmysql.BackoffMaxDelay.Milliseconds()),
retry.WithMaxTries(w.cfg.DMLMaxRetry))
retry.WithMaxTries(w.cfg.DMLMaxRetry),
retry.WithIsRetryableErr(isRetryableDMLError))
}

func (w *MysqlWriter) sequenceExecute(
Expand Down Expand Up @@ -238,3 +262,25 @@ func (w *MysqlWriter) multiStmtExecute(
}
return nil
}

// logDMLTxnErr logs a failed DML execution and returns err annotated with the
// (possibly truncated) query text.
//
// Parameters:
//   - err: the execution error to log and wrap.
//   - start: the time the duration field is measured from (time.Since(start)).
//   - changefeed: the changefeed identifier recorded in the log entry.
//   - query: the SQL text; it is cut to at most 1024 bytes before use.
//   - count: the row count recorded in the log entry.
//   - startTs: the start timestamps recorded in the log entry.
//
// Errors accepted by isRetryableDMLError are logged at Warn level, all others
// at Error level. Both entries carry the same set of fields, including
// startTs, so a non-retryable failure is as traceable as a retryable one.
func logDMLTxnErr(
	err error, start time.Time, changefeed string,
	query string, count int, startTs []common.Ts,
) error {
	// Keep log entries and the returned message bounded for very large statements.
	if len(query) > 1024 {
		query = query[:1024]
	}

	// Build the fields once so both severity levels stay in sync.
	fields := []zap.Field{
		zap.Error(err), zap.Duration("duration", time.Since(start)),
		zap.String("query", query), zap.Int("count", count),
		zap.Uint64s("startTs", startTs),
		zap.String("changefeed", changefeed),
	}

	if isRetryableDMLError(err) {
		log.Warn("execute DMLs with error, retry later", fields...)
	} else {
		log.Error("execute DMLs with error, can not retry", fields...)
	}
	return errors.WithMessage(err, fmt.Sprintf("Failed query info: %s; ", query))
}
29 changes: 0 additions & 29 deletions tests/integration_tests/changefeed_auto_stop/conf/diff_config.toml

This file was deleted.

13 changes: 0 additions & 13 deletions tests/integration_tests/changefeed_auto_stop/conf/workload

This file was deleted.

62 changes: 0 additions & 62 deletions tests/integration_tests/changefeed_auto_stop/run.sh

This file was deleted.

8 changes: 3 additions & 5 deletions tests/integration_tests/changefeed_dup_error_restart/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ function run() {
start_ts=$(run_cdc_cli_tso_query ${UP_PD_HOST_1} ${UP_PD_PORT_1})
run_sql "CREATE DATABASE changefeed_dup_error_restart;"
go-ycsb load mysql -P $CUR/conf/workload -p mysql.host=${UP_TIDB_HOST} -p mysql.port=${UP_TIDB_PORT} -p mysql.user=root -p mysql.db=changefeed_dup_error_restart
export GO_FAILPOINTS='github.com/pingcap/tiflow/cdc/sink/dmlsink/txn/mysql/MySQLDuplicateEntryError=5%return(true)'
export GO_FAILPOINTS='github.com/pingcap/ticdc/pkg/sink/mysql/MySQLDuplicateEntryError=5%return(true)'
run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY

TOPIC_NAME="ticdc-changefeed-dup-error-restart-test-$RANDOM"
Expand All @@ -35,14 +35,12 @@ function run() {
run_cdc_cli changefeed create --start-ts=$start_ts --sink-uri="$SINK_URI"

run_sql "CREATE TABLE changefeed_dup_error_restart.finish_mark_1 (a int primary key);"
sleep 30
check_table_exists "changefeed_dup_error_restart.finish_mark_1" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} 60
check_table_exists "changefeed_dup_error_restart.finish_mark_1" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} 120
check_sync_diff $WORK_DIR $CUR/conf/diff_config.toml

go-ycsb load mysql -P $CUR/conf/workload -p mysql.host=${UP_TIDB_HOST} -p mysql.port=${UP_TIDB_PORT} -p mysql.user=root -p mysql.db=changefeed_dup_error_restart
run_sql "CREATE TABLE changefeed_dup_error_restart.finish_mark_2 (a int primary key);"
sleep 30
check_table_exists "changefeed_dup_error_restart.finish_mark_2" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} 60
check_table_exists "changefeed_dup_error_restart.finish_mark_2" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} 120
check_sync_diff $WORK_DIR $CUR/conf/diff_config.toml

cleanup_process $CDC_BINARY
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,12 @@ CDC_COUNT=3
DB_COUNT=4

function run() {
# Validate sink type is mysql since this test is mysql specific
if [ "$SINK_TYPE" != "mysql" ]; then
echo "skip sink_hang test for $SINK_TYPE"
return 0
fi

rm -rf $WORK_DIR && mkdir -p $WORK_DIR

start_tidb_cluster --workdir $WORK_DIR
Expand All @@ -21,25 +27,12 @@ function run() {
start_ts=$(run_cdc_cli_tso_query ${UP_PD_HOST_1} ${UP_PD_PORT_1})
run_sql "CREATE DATABASE sink_retry;"
go-ycsb load mysql -P $CUR/conf/workload -p mysql.host=${UP_TIDB_HOST} -p mysql.port=${UP_TIDB_PORT} -p mysql.user=root -p mysql.db=sink_retry
export GO_FAILPOINTS='github.com/pingcap/tiflow/cdc/sink/dmlsink/txn/mysql/MySQLSinkTxnRandomError=25%return(true)'
export GO_FAILPOINTS='github.com/pingcap/ticdc/pkg/sink/mysql/MySQLSinkTxnRandomError=25%return(true)'
run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY

TOPIC_NAME="ticdc-sink-retry-test-$RANDOM"
case $SINK_TYPE in
kafka) SINK_URI="kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&kafka-version=${KAFKA_VERSION}&max-message-bytes=10485760" ;;
storage) SINK_URI="file://$WORK_DIR/storage_test/$TOPIC_NAME?protocol=canal-json&enable-tidb-extension=true" ;;
pulsar)
run_pulsar_cluster $WORK_DIR normal
SINK_URI="pulsar://127.0.0.1:6650/$TOPIC_NAME?protocol=canal-json&enable-tidb-extension=true"
;;
*) SINK_URI="mysql://normal:[email protected]:3306/?max-txn-row=1" ;;
esac

SINK_URI="mysql://normal:[email protected]:3306/?max-txn-row=1"
run_cdc_cli changefeed create --start-ts=$start_ts --sink-uri="$SINK_URI"
case $SINK_TYPE in
kafka) run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760" ;;
storage) run_storage_consumer $WORK_DIR $SINK_URI "" "" ;;
pulsar) run_pulsar_consumer --upstream-uri $SINK_URI ;;
esac

run_sql "CREATE TABLE sink_retry.finish_mark_1 (a int primary key);"
sleep 30
Expand Down
Loading

0 comments on commit 623af8c

Please sign in to comment.