Skip to content

Commit

Permalink
changefeed: fix changefeed does not fast fail when occur ErrGCTTLExce…
Browse files Browse the repository at this point in the history
…eded error (#3120) (#3135)
  • Loading branch information
ti-chi-bot authored Oct 28, 2021
1 parent cd56829 commit f8c1f36
Show file tree
Hide file tree
Showing 6 changed files with 151 additions and 14 deletions.
14 changes: 10 additions & 4 deletions cdc/owner/changefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,9 @@ func (c *changefeed) Tick(ctx cdcContext.Context, state *model.ChangefeedReactor
func (c *changefeed) checkStaleCheckpointTs(ctx cdcContext.Context, checkpointTs uint64) error {
state := c.state.Info.State
if state == model.StateNormal || state == model.StateStopped || state == model.StateError {
failpoint.Inject("InjectChangefeedFastFailError", func() error {
return cerror.ErrGCTTLExceeded.FastGen("InjectChangefeedFastFailError")
})
if err := c.gcManager.CheckStaleCheckpointTs(ctx, c.id, checkpointTs); err != nil {
return errors.Trace(err)
}
Expand All @@ -133,16 +136,19 @@ func (c *changefeed) checkStaleCheckpointTs(ctx cdcContext.Context, checkpointTs
func (c *changefeed) tick(ctx cdcContext.Context, state *model.ChangefeedReactorState, captures map[model.CaptureID]*model.CaptureInfo) error {
c.state = state
c.feedStateManager.Tick(state)
if !c.feedStateManager.ShouldRunning() {
c.releaseResources()
return nil
}

checkpointTs := c.state.Info.GetCheckpointTs(c.state.Status)
// checkStaleCheckpointTs must be called before `feedStateManager.ShouldRunning()`
// so that a changefeed in the error or stopped state is checked as well.
if err := c.checkStaleCheckpointTs(ctx, checkpointTs); err != nil {
return errors.Trace(err)
}

if !c.feedStateManager.ShouldRunning() {
c.releaseResources()
return nil
}

if !c.preflightCheck(captures) {
return nil
}
Expand Down
39 changes: 29 additions & 10 deletions pkg/errors/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,21 +30,40 @@ func WrapError(rfcError *errors.Error, err error) error {
return rfcError.Wrap(err).GenWithStackByCause()
}

// ChangefeedFastFailError checks the error, returns true if it is meaningless
// to retry on this error
// ChangeFeedFastFailError enumerates the errors after which a changefeed must
// be failed immediately. The slice is read-only.
// Any of these errors means that the data the changefeed wants to replicate
// has already been, or is about to be, garbage collected, so trying to resume
// the changefeed is pointless.
var ChangeFeedFastFailError = []*errors.Error{
	ErrGCTTLExceeded,
	ErrSnapshotLostByGC,
	ErrStartTsBeforeGC,
}

// ChangefeedFastFailError checks if an error is a ChangefeedFastFailError
func ChangefeedFastFailError(err error) bool {
return ErrStartTsBeforeGC.Equal(errors.Cause(err)) || ErrSnapshotLostByGC.Equal(errors.Cause(err))
if err == nil {
return false
}
for _, e := range ChangeFeedFastFailError {
if e.Equal(err) {
return true
}
rfcCode, ok := RFCCode(err)
if ok && e.RFCCode() == rfcCode {
return true
}
}
return false
}

// ChangefeedFastFailErrorCode checks the error, returns true if it is meaningless
// to retry on this error
// ChangefeedFastFailErrorCode checks the error code, returns true if it is a
// ChangefeedFastFailError code
func ChangefeedFastFailErrorCode(errCode errors.RFCErrorCode) bool {
switch errCode {
case ErrStartTsBeforeGC.RFCCode(), ErrSnapshotLostByGC.RFCCode():
return true
default:
return false
for _, e := range ChangeFeedFastFailError {
if errCode == e.RFCCode() {
return true
}
}
return false
}

// RFCCode returns a RFCCode from an error
Expand Down
34 changes: 34 additions & 0 deletions pkg/errors/helper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,3 +74,37 @@ func (s *helperSuite) TestIsRetryableError(c *check.C) {
c.Assert(ret, check.Equals, tt.want, check.Commentf("case:%s", tt.name))
}
}

// TestChangefeedFastFailError checks that ChangefeedFastFailError and
// ChangefeedFastFailErrorCode classify the same errors as fast-fail errors,
// regardless of how the error value was generated.
func (s *helperSuite) TestChangefeedFastFailError(c *check.C) {
	defer testleak.AfterTest(c)()

	cases := []struct {
		name string
		err  error
		want bool
	}{
		{"ErrGCTTLExceeded/FastGenByArgs", ErrGCTTLExceeded.FastGenByArgs(), true},
		{"ErrGCTTLExceeded/GenWithStack", ErrGCTTLExceeded.GenWithStack("aa"), true},
		{"ErrGCTTLExceeded/Wrap", ErrGCTTLExceeded.Wrap(errors.New("aa")), true},
		{"ErrSnapshotLostByGC/FastGenByArgs", ErrSnapshotLostByGC.FastGenByArgs(), true},
		{"ErrStartTsBeforeGC/FastGenByArgs", ErrStartTsBeforeGC.FastGenByArgs(), true},
		{"ErrToTLSConfigFailed/FastGenByArgs", ErrToTLSConfigFailed.FastGenByArgs(), false},
	}

	for _, tt := range cases {
		comment := check.Commentf("case:%s", tt.name)

		// Assert that a code was actually extracted: with an empty code the
		// negative case would pass without exercising the code lookup at all.
		rfcCode, ok := RFCCode(tt.err)
		c.Assert(ok, check.IsTrue, comment)

		c.Assert(ChangefeedFastFailError(tt.err), check.Equals, tt.want, comment)
		c.Assert(ChangefeedFastFailErrorCode(rfcCode), check.Equals, tt.want, comment)
	}
}
2 changes: 2 additions & 0 deletions pkg/txnutil/gc/gc_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@ func (s *gcManagerSuite) TestCheckStaleCheckpointTs(c *check.C) {
ctx := context.Background()
err := gcManager.CheckStaleCheckpointTs(ctx, "cfID", 10)
c.Assert(cerror.ErrGCTTLExceeded.Equal(errors.Cause(err)), check.IsTrue)
c.Assert(cerror.ChangefeedFastFailError(err), check.IsTrue)

err = gcManager.CheckStaleCheckpointTs(ctx, "cfID", oracle.GoTimeToTS(time.Now()))
c.Assert(err, check.IsNil)
Expand All @@ -130,4 +131,5 @@ func (s *gcManagerSuite) TestCheckStaleCheckpointTs(c *check.C) {
gcManager.lastSafePointTs = 20
err = gcManager.CheckStaleCheckpointTs(ctx, "cfID", 10)
c.Assert(cerror.ErrSnapshotLostByGC.Equal(errors.Cause(err)), check.IsTrue)
c.Assert(cerror.ChangefeedFastFailError(err), check.IsTrue)
}
71 changes: 71 additions & 0 deletions tests/changefeed_fast_fail/run.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
#!/bin/bash

# Abort on the first failing command.
set -e

# Resolve the directory containing this script so the shared helpers can be
# sourced no matter where the test is launched from.
CUR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# Quoted so a checkout path containing whitespace does not break `source`.
source "$CUR/../_utils/test_prepare"
# NOTE(review): OUT_DIR and TEST_NAME are presumably set by test_prepare or the
# test runner; confirm.
WORK_DIR=$OUT_DIR/$TEST_NAME
CDC_BINARY=cdc.test
# First script argument: the sink type under test (e.g. "kafka").
SINK_TYPE=$1
# Retry budget passed to `ensure` while waiting for the changefeed to fail.
MAX_RETRIES=20

# check_changefeed_mark_failed_regex ENDPOINTS CHANGEFEED_ID ERROR_REGEX
#
# Queries the changefeed with `cdc cli changefeed query` and exits with status 1
# unless its state is "failed" and its error message matches ERROR_REGEX.
function check_changefeed_mark_failed_regex() {
	local endpoints=$1
	local changefeedid=$2
	local error_msg=$3
	local info state message

	# Assigned separately from `local` so a failing command is not masked by
	# the exit status of the `local` builtin.
	info=$(cdc cli changefeed query --pd="$endpoints" -c "$changefeedid" -s)
	echo "$info"

	# "$info" must be quoted: unquoted, the JSON would undergo word splitting
	# and pathname expansion (the error message contains `[...]`) before it
	# reaches jq.
	state=$(echo "$info" | jq -r '.state')
	if [[ "$state" != "failed" ]]; then
		echo "changefeed state $state does not equal to failed"
		exit 1
	fi

	message=$(echo "$info" | jq -r '.error.message')
	# $error_msg is deliberately left unquoted so it is interpreted as a regex.
	if [[ ! "$message" =~ $error_msg ]]; then
		echo "error message '$message' does not match '$error_msg'"
		exit 1
	fi
}

# Exported so the function is available to child bash processes.
export -f check_changefeed_mark_failed_regex

# run starts a cluster and a cdc server with the fast-fail failpoint enabled,
# creates a changefeed, waits until it is reported as failed with
# ErrGCTTLExceeded, and finally verifies that the changefeed can be removed.
function run() {
	# There is no need to test kafka: the logic is the same for every sink.
	if [ "$SINK_TYPE" == "kafka" ]; then
		return
	fi

	rm -rf "$WORK_DIR" && mkdir -p "$WORK_DIR"

	start_tidb_cluster --workdir "$WORK_DIR"

	cd "$WORK_DIR"

	start_ts=$(run_cdc_cli_tso_query "${UP_PD_HOST_1}" "${UP_PD_PORT_1}")
	run_sql "CREATE DATABASE changefeed_error;" "${UP_TIDB_HOST}" "${UP_TIDB_PORT}"
	# Enable the failpoint that makes the stale-checkpoint check return
	# ErrGCTTLExceeded.
	export GO_FAILPOINTS='github.com/pingcap/ticdc/cdc/owner/InjectChangefeedFastFailError=return(true)'
	run_cdc_server --workdir "$WORK_DIR" --binary "$CDC_BINARY"

	# NOTE(review): the user-info/host part of this URI had been replaced by an
	# email-obfuscation placeholder ("[email protected]"); restored to the
	# conventional downstream MySQL test endpoint. Confirm the credentials.
	SINK_URI="mysql://normal:123456@127.0.0.1:3306/?max-txn-row=1"

	changefeedid="changefeed-fast-fail"
	run_cdc_cli changefeed create --start-ts="$start_ts" --sink-uri="$SINK_URI" -c "$changefeedid"

	ensure $MAX_RETRIES check_changefeed_mark_failed_regex http://${UP_PD_HOST_1}:${UP_PD_PORT_1} ${changefeedid} "ErrGCTTLExceeded"
	run_cdc_cli changefeed remove -c "$changefeedid"
	# Give the removal a moment to take effect before listing changefeeds.
	sleep 2
	#result=$(curl -X GET "http://127.0.0.1:8300/api/v1/changefeeds")
	result=$(cdc cli changefeed list)
	if [[ "$result" != "[]" ]]; then
		echo "changefeed remove failed"
		exit 1
	fi

	cleanup_process "$CDC_BINARY"
}

# Always stop the TiDB cluster when the script exits, whether it passed or failed.
trap stop_tidb_cluster EXIT
# "$@" forwards the script arguments unchanged; $* would re-split them.
run "$@"
check_logs "$WORK_DIR"
echo "[$(date)] <<<<<< run test case $TEST_NAME success! >>>>>>"
5 changes: 5 additions & 0 deletions tests/move_table/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,11 @@ CDC_BINARY=cdc.test
SINK_TYPE=$1

function run() {
# TODO: remove after kafka-consumer/main.go is fixed
if [ "$SINK_TYPE" == "kafka" ]; then
return
fi

rm -rf $WORK_DIR && mkdir -p $WORK_DIR

start_tidb_cluster --workdir $WORK_DIR
Expand Down

0 comments on commit f8c1f36

Please sign in to comment.