Skip to content

Commit

Permalink
owner: fix failed changefeed can't be removed (#782)
Browse files Browse the repository at this point in the history
* owner: fix failed changefeed can't be removed

* add integration test

* fix test case

Co-authored-by: ti-srebot <[email protected]>
  • Loading branch information
amyangfei and ti-srebot authored Jul 27, 2020
1 parent a1a7ed7 commit 5388867
Show file tree
Hide file tree
Showing 2 changed files with 90 additions and 21 deletions.
61 changes: 49 additions & 12 deletions cdc/owner.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,10 +174,14 @@ func (o *Owner) newChangeFeed(
log.Info("Find new changefeed", zap.Reflect("info", info),
zap.String("id", id), zap.Uint64("checkpoint ts", checkpointTs))

failpoint.Inject("NewChangefeedError", func() {
failpoint.Inject("NewChangefeedNoRetryError", func() {
failpoint.Return(nil, tikv.ErrGCTooEarly.GenWithStackByArgs(checkpointTs-300, checkpointTs))
})

failpoint.Inject("NewChangefeedRetryError", func() {
failpoint.Return(nil, errors.New("failpoint injected retriable error"))
})

// TODO here we create another pb client,we should reuse them
kvStore, err := kv.CreateTiStore(strings.Join(o.pdEndpoints, ","), o.credential)
if err != nil {
Expand Down Expand Up @@ -604,23 +608,47 @@ func (o *Owner) dispatchJob(ctx context.Context, job model.AdminJob) error {
return nil
}

func (o *Owner) collectChangefeedInfo(ctx context.Context, cid model.ChangeFeedID) (*changeFeed, *model.ChangeFeedStatus, model.FeedState, error) {
cf, ok := o.changeFeeds[cid]
func (o *Owner) collectChangefeedInfo(ctx context.Context, cid model.ChangeFeedID) (
cf *changeFeed,
status *model.ChangeFeedStatus,
feedState model.FeedState,
err error,
) {
var ok bool
cf, ok = o.changeFeeds[cid]
if ok {
return cf, cf.status, cf.info.State, nil
}
status, _, err := o.etcdClient.GetChangeFeedStatus(ctx, cid)
feedState = model.StateNormal

var cfInfo *model.ChangeFeedInfo
cfInfo, err = o.etcdClient.GetChangeFeedInfo(ctx, cid)
if err != nil && errors.Cause(err) != model.ErrChangeFeedNotExists {
return
}

status, _, err = o.etcdClient.GetChangeFeedStatus(ctx, cid)
if err != nil {
return nil, nil, model.StateNormal, err
if errors.Cause(err) == model.ErrChangeFeedNotExists {
// Only changefeed info exists and error field is not nil means
// the changefeed has met error, mark it as failed.
if cfInfo != nil && cfInfo.Error != nil {
feedState = model.StateFailed
}
}
return
}
feedState := model.StateNormal
switch status.AdminJobType {
case model.AdminNone, model.AdminResume:
if cfInfo != nil && cfInfo.Error != nil {
feedState = model.StateFailed
}
case model.AdminStop:
feedState = model.StateStopped
case model.AdminRemove:
feedState = model.StateRemoved
}
return nil, status, feedState, nil
return
}

func (o *Owner) handleAdminJob(ctx context.Context) error {
Expand All @@ -636,11 +664,20 @@ func (o *Owner) handleAdminJob(ctx context.Context) error {

cf, status, feedState, err := o.collectChangefeedInfo(ctx, job.CfID)
if err != nil {
if errors.Cause(err) == model.ErrChangeFeedNotExists {
if errors.Cause(err) != model.ErrChangeFeedNotExists {
return err
}
if feedState == model.StateFailed && job.Type == model.AdminRemove {
// changefeed in failed state, but changefeed status has not
// been created yet. Try to remove changefeed info only.
err := o.etcdClient.DeleteChangeFeedInfo(ctx, job.CfID)
if err != nil {
return errors.Trace(err)
}
} else {
log.Warn("invalid admin job, changefeed status not found", zap.String("changefeed", job.CfID))
continue
}
return err
continue
}
switch job.Type {
case model.AdminStop:
Expand Down Expand Up @@ -683,15 +720,15 @@ func (o *Owner) handleAdminJob(ctx context.Context) error {
// remove a removed changefeed
log.Info("changefeed has been removed, remove command will do nothing")
continue
case model.StateStopped:
case model.StateStopped, model.StateFailed:
// remove a paused changefeed
status.AdminJobType = model.AdminRemove
err = o.etcdClient.PutChangeFeedStatus(ctx, job.CfID, status)
if err != nil {
return errors.Trace(err)
}
default:
return errors.Errorf("changefeed in abnormal state: %+v", status)
return errors.Errorf("changefeed in abnormal state: %s, replication status: %+v", feedState, status)
}
}
// remove changefeed info
Expand Down
50 changes: 41 additions & 9 deletions tests/changefeed_error/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -7,22 +7,44 @@ source $CUR/../_utils/test_prepare
WORK_DIR=$OUT_DIR/$TEST_NAME
CDC_BINARY=cdc.test
SINK_TYPE=$1

CDC_COUNT=3
DB_COUNT=4
MAX_RETRIES=10

function check_changefeed_mark_failed() {
endpoints=$1
changefeedid=$2
ETCDCTL_API=3 etcdctl --endpoints=$endpoints get /tidb/cdc/changefeed/info/${changefeedid}
changefeed_info=$(ETCDCTL_API=3 etcdctl --endpoints=$endpoints get /tidb/cdc/changefeed/info/${changefeedid}|tail -n 1)
if [[ ! $changefeed_info == *"\"state\":\"failed\""* ]]; then
echo "changefeed is not marked as failed: $changefeed_info"
error_msg=$3
info=$(cdc cli changefeed query --pd=$endpoints -c $changefeedid -s)
state=$(echo $info|jq -r '.state')
if [[ ! "$state" == "failed" ]]; then
echo "changefeed state $state does not equal to failed"
exit 1
fi
message=$(echo $info|jq -r '.error.message')
if [[ ! "$message" =~ "$error_msg" ]]; then
echo "error message '$message' is not as expected '$error_msg'"
exit 1
fi
}

function check_no_changefeed() {
pd=$1
count=$(cdc cli changefeed list --pd=$pd 2>&1|jq '.|length')
if [[ ! "$count" -eq "0" ]]; then
exit 1
fi
}

function check_no_capture() {
pd=$1
count=$(cdc cli capture list --pd=$pd 2>&1|jq '.|length')
if [[ ! "$count" -eq "0" ]]; then
exit 1
fi
}

export -f check_changefeed_mark_failed
export -f check_no_changefeed
export -f check_no_capture

function run() {
rm -rf $WORK_DIR && mkdir -p $WORK_DIR
Expand All @@ -34,8 +56,9 @@ function run() {
start_ts=$(run_cdc_cli tso query --pd=http://$UP_PD_HOST:$UP_PD_PORT)
run_sql "CREATE DATABASE changefeed_error;" ${UP_TIDB_HOST} ${UP_TIDB_PORT}
go-ycsb load mysql -P $CUR/conf/workload -p mysql.host=${UP_TIDB_HOST} -p mysql.port=${UP_TIDB_PORT} -p mysql.user=root -p mysql.db=changefeed_error
export GO_FAILPOINTS='github.com/pingcap/ticdc/cdc/NewChangefeedError=1*return(true)'
export GO_FAILPOINTS='github.com/pingcap/ticdc/cdc/NewChangefeedNoRetryError=1*return(true)'
run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY
capture_pid=$(ps -C $CDC_BINARY -o pid= | awk '{print $1}')

TOPIC_NAME="ticdc-sink-retry-test-$RANDOM"
case $SINK_TYPE in
Expand All @@ -47,7 +70,7 @@ function run() {
run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?partition-num=4"
fi

ensure 5 check_changefeed_mark_failed ${UP_PD_HOST}:${UP_PD_PORT} ${changefeedid}
ensure $MAX_RETRIES check_changefeed_mark_failed http://${UP_PD_HOST}:${UP_PD_PORT} ${changefeedid} "\[tikv:9006\]GC life time is shorter than transaction duration.*"
changefeed_info=$(ETCDCTL_API=3 etcdctl --endpoints=${UP_PD_HOST}:${UP_PD_PORT} get /tidb/cdc/changefeed/info/${changefeedid}|tail -n 1)
new_info=$(echo $changefeed_info|sed 's/"state":"failed"/"state":"normal"/g')
ETCDCTL_API=3 etcdctl --endpoints=${UP_PD_HOST}:${UP_PD_PORT} put /tidb/cdc/changefeed/info/${changefeedid} "$new_info"
Expand All @@ -58,6 +81,15 @@ function run() {
go-ycsb load mysql -P $CUR/conf/workload -p mysql.host=${UP_TIDB_HOST} -p mysql.port=${UP_TIDB_PORT} -p mysql.user=root -p mysql.db=changefeed_error
check_sync_diff $WORK_DIR $CUR/conf/diff_config.toml

export GO_FAILPOINTS='github.com/pingcap/ticdc/cdc/NewChangefeedRetryError=return(true)'
kill $capture_pid
ensure $MAX_RETRIES check_no_capture http://${UP_PD_HOST}:${UP_PD_PORT}
run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY
ensure $MAX_RETRIES check_changefeed_mark_failed http://${UP_PD_HOST}:${UP_PD_PORT} ${changefeedid} "failpoint injected retriable error"

cdc cli changefeed remove -c $changefeedid
ensure $MAX_RETRIES check_no_changefeed ${UP_PD_HOST}:${UP_PD_PORT}

export GO_FAILPOINTS=''
cleanup_process $CDC_BINARY
}
Expand Down

0 comments on commit 5388867

Please sign in to comment.