test: fix integration test changefeed_error (#930)
ref #442
asddongmen authored Jan 21, 2025
1 parent cd46264 commit 932ae4c
Showing 6 changed files with 106 additions and 124 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/integration_test_mysql.yaml
@@ -468,10 +468,10 @@ jobs:
run: |
export TICDC_NEWARCH=true && make integration_test CASE=move_table
- name: Test changefeed_fast_fail
- name: Test changefeed_error
if: ${{ success() }}
run: |
export TICDC_NEWARCH=true && make integration_test CASE=changefeed_fast_fail
export TICDC_NEWARCH=true && make integration_test CASE=changefeed_error
# The 16th case in this group
- name: Test capture_session_done_during_task
33 changes: 32 additions & 1 deletion coordinator/coordinator.go
@@ -90,7 +90,7 @@ func New(node *node.Info,
pdClock: pdClock,
mc: mc,
updatedChangefeedCh: make(chan map[common.ChangeFeedID]*changefeed.Changefeed, 1024),
stateChangedCh: make(chan *ChangefeedStateChangeEvent, 8),
stateChangedCh: make(chan *ChangefeedStateChangeEvent, 1024),
backend: backend,
}
c.taskScheduler = threadpool.NewThreadPoolDefault()
@@ -224,12 +224,43 @@ func (c *coordinator) handleStateChangedEvent(ctx context.Context, event *Change
return nil
}

// checkStaleCheckpointTs checks if the checkpointTs is stale, if it is, it will send a state change event to the stateChangedCh
func (c *coordinator) checkStaleCheckpointTs(ctx context.Context, id common.ChangeFeedID, reportedCheckpointTs uint64) {
err := c.gcManager.CheckStaleCheckpointTs(ctx, id, reportedCheckpointTs)
ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
defer cancel()
if err != nil {
errCode, _ := errors.RFCCode(err)
state := model.StateFailed
if !errors.IsChangefeedGCFastFailErrorCode(errCode) {
state = model.StateWarning
}
select {
case <-ctx.Done():
log.Warn("Failed to send state change event to stateChangedCh since context timeout, "+
"there may be a lot of state need to be handled. Try next time",
zap.String("changefeed", id.String()),
zap.Error(ctx.Err()))
return
case c.stateChangedCh <- &ChangefeedStateChangeEvent{
ChangefeedID: id,
State: state,
err: &model.RunningError{
Code: string(errCode),
Message: err.Error(),
},
}:
}
}
}

func (c *coordinator) saveCheckpointTs(ctx context.Context, cfs map[common.ChangeFeedID]*changefeed.Changefeed) error {
statusMap := make(map[common.ChangeFeedID]uint64)
for _, upCf := range cfs {
reportedCheckpointTs := upCf.GetStatus().CheckpointTs
if upCf.GetLastSavedCheckPointTs() < reportedCheckpointTs {
statusMap[upCf.ID] = reportedCheckpointTs
c.checkStaleCheckpointTs(ctx, upCf.ID, reportedCheckpointTs)
}
}
if len(statusMap) == 0 {
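
The new checkStaleCheckpointTs above pairs the enlarged stateChangedCh buffer with a send that is bounded by a context timeout, so a burst of stale-checkpoint reports delays saveCheckpointTs by at most the timeout instead of blocking it indefinitely. A minimal standalone sketch of that pattern follows; the type, channel, and function names here are illustrative, not the coordinator's real ones.

package main

import (
	"context"
	"fmt"
	"time"
)

// stateChangeEvent stands in for the coordinator's ChangefeedStateChangeEvent.
type stateChangeEvent struct {
	changefeedID string
	state        string
}

// trySendStateChange mirrors the shape of checkStaleCheckpointTs: the send on a
// buffered channel sits in a select guarded by a context timeout, so a full
// channel costs the caller at most `timeout` and the event is simply dropped
// and retried on a later checkpoint report.
func trySendStateChange(ctx context.Context, ch chan<- *stateChangeEvent, ev *stateChangeEvent, timeout time.Duration) bool {
	ctx, cancel := context.WithTimeout(ctx, timeout)
	defer cancel()
	select {
	case ch <- ev:
		return true
	case <-ctx.Done():
		fmt.Printf("dropped state change for %s: %v\n", ev.changefeedID, ctx.Err())
		return false
	}
}

func main() {
	// A capacity of 2 here plays the role of the enlarged stateChangedCh buffer.
	ch := make(chan *stateChangeEvent, 2)
	for i := 0; i < 4; i++ {
		ev := &stateChangeEvent{changefeedID: fmt.Sprintf("cf-%d", i), state: "warning"}
		ok := trySendStateChange(context.Background(), ch, ev, 100*time.Millisecond)
		fmt.Printf("event %d sent=%v\n", i, ok)
	}
}

Run directly, the first two sends succeed and the last two are dropped after the timeout, which mirrors the "Try next time" behavior logged in the diff.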
47 changes: 27 additions & 20 deletions maintainer/maintainer.go
@@ -20,6 +20,7 @@ import (
"sync"
"time"

"github.com/pingcap/failpoint"
"github.com/pingcap/log"
"github.com/pingcap/ticdc/heartbeatpb"
"github.com/pingcap/ticdc/maintainer/replica"
@@ -99,7 +100,6 @@ type Maintainer struct {
// so when a maintainer is created, that means the dispatcher is gone and must be recreated.
ddlSpan *replica.SpanReplication

pdEndpoints []string
nodeManager *watcher.NodeManager
// closedNodes is used to record the nodes that dispatcherManager is closed
closedNodes map[node.ID]struct{}
@@ -123,8 +123,10 @@ type Maintainer struct {
// false when otherwise, such as maintainer move to different nodes.
newChangefeed bool

errLock sync.Mutex
runningErrors map[node.ID]*heartbeatpb.RunningError
runningErrors struct {
sync.Mutex
m map[node.ID]*heartbeatpb.RunningError
}
cancelUpdateMetrics context.CancelFunc

changefeedCheckpointTsGauge prometheus.Gauge
@@ -150,6 +152,7 @@ func NewMaintainer(cfID common.ChangeFeedID,
checkpointTs uint64,
newChangfeed bool,
) *Maintainer {

mc := appcontext.GetService[messaging.MessageCenter](appcontext.MessageCenter)
nodeManager := appcontext.GetService[*watcher.NodeManager](watcher.NodeManagerName)
tableTriggerEventDispatcherID := common.NewDispatcherID()
@@ -184,7 +187,6 @@ func NewMaintainer(cfID common.ChangeFeedID,

ddlSpan: ddlSpan,
checkpointTsByCapture: make(map[node.ID]heartbeatpb.Watermark),
runningErrors: map[node.ID]*heartbeatpb.RunningError{},
newChangefeed: newChangfeed,

changefeedCheckpointTsGauge: metrics.ChangefeedCheckpointTsGauge.WithLabelValues(cfID.Namespace(), cfID.Name()),
@@ -197,6 +199,7 @@ func NewMaintainer(cfID common.ChangeFeedID,
tableCountGauge: metrics.TableGauge.WithLabelValues(cfID.Namespace(), cfID.Name()),
handleEventDuration: metrics.MaintainerHandleEventDuration.WithLabelValues(cfID.Namespace(), cfID.Name()),
}
m.runningErrors.m = make(map[node.ID]*heartbeatpb.RunningError)

m.watermark.Watermark = &heartbeatpb.Watermark{
CheckpointTs: checkpointTs,
@@ -298,15 +301,15 @@ func (m *Maintainer) Close() {
}

func (m *Maintainer) GetMaintainerStatus() *heartbeatpb.MaintainerStatus {
m.errLock.Lock()
defer m.errLock.Unlock()
m.runningErrors.Lock()
defer m.runningErrors.Unlock()
var runningErrors []*heartbeatpb.RunningError
if len(m.runningErrors) > 0 {
runningErrors = make([]*heartbeatpb.RunningError, 0, len(m.runningErrors))
for _, e := range m.runningErrors {
if len(m.runningErrors.m) > 0 {
runningErrors = make([]*heartbeatpb.RunningError, 0, len(m.runningErrors.m))
for _, e := range m.runningErrors.m {
runningErrors = append(runningErrors, e)
}
clear(m.runningErrors)
m.runningErrors.m = make(map[node.ID]*heartbeatpb.RunningError)
}

status := &heartbeatpb.MaintainerStatus{
@@ -324,6 +327,9 @@ func (m *Maintainer) initialize() error {
log.Info("start to initialize changefeed maintainer",
zap.String("id", m.id.String()))

failpoint.Inject("NewChangefeedRetryError", func() {
failpoint.Return(errors.New("failpoint injected retriable error"))
})
// detect the capture changes
m.nodeManager.RegisterNodeChangeHandler(node.ID("maintainer-"+m.id.Name()), func(allNodes map[node.ID]*node.Info) {
m.mutex.Lock()
@@ -553,10 +559,10 @@ func (m *Maintainer) onError(from node.ID, err *heartbeatpb.RunningError) {
if info, ok := m.nodeManager.GetAliveNodes()[from]; ok {
err.Node = info.AdvertiseAddr
}
m.errLock.Lock()
m.runningErrors.Lock()
m.statusChanged.Store(true)
m.runningErrors[from] = err
m.errLock.Unlock()
m.runningErrors.m[from] = err
m.runningErrors.Unlock()
}

func (m *Maintainer) onBlockStateRequest(msg *messaging.TargetMessage) {
@@ -709,13 +715,14 @@ func (m *Maintainer) handleError(err error) {
} else {
code = string(errors.ErrOwnerUnknown.RFCCode())
}
m.runningErrors = map[node.ID]*heartbeatpb.RunningError{
m.selfNode.ID: {
Time: time.Now().String(),
Node: m.selfNode.AdvertiseAddr,
Code: code,
Message: err.Error(),
},

m.runningErrors.Lock()
defer m.runningErrors.Unlock()
m.runningErrors.m[m.selfNode.ID] = &heartbeatpb.RunningError{
Time: time.Now().String(),
Node: m.selfNode.AdvertiseAddr,
Code: code,
Message: err.Error(),
}
m.statusChanged.Store(true)
}
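
The runningErrors change above replaces the separate errLock plus map with a single struct field that embeds sync.Mutex next to the map it guards, so the lock-to-data pairing is visible at the type level and GetMaintainerStatus can drain and reset the map inside one critical section. A small self-contained sketch of the same pattern follows; the collector and field names are illustrative.

package main

import (
	"fmt"
	"sync"
)

// runningError stands in for heartbeatpb.RunningError.
type runningError struct {
	Node    string
	Code    string
	Message string
}

// errorCollector bundles the mutex and the map it protects into one field,
// the way the Maintainer's runningErrors field now does.
type errorCollector struct {
	runningErrors struct {
		sync.Mutex
		m map[string]*runningError
	}
}

func newErrorCollector() *errorCollector {
	c := &errorCollector{}
	c.runningErrors.m = make(map[string]*runningError)
	return c
}

// record stores the latest error reported by a node.
func (c *errorCollector) record(node string, err *runningError) {
	c.runningErrors.Lock()
	defer c.runningErrors.Unlock()
	c.runningErrors.m[node] = err
}

// drain returns all pending errors and resets the map, mirroring how
// GetMaintainerStatus collects errors into a heartbeat and then clears them.
func (c *errorCollector) drain() []*runningError {
	c.runningErrors.Lock()
	defer c.runningErrors.Unlock()
	out := make([]*runningError, 0, len(c.runningErrors.m))
	for _, e := range c.runningErrors.m {
		out = append(out, e)
	}
	c.runningErrors.m = make(map[string]*runningError)
	return out
}

func main() {
	c := newErrorCollector()
	c.record("node-1", &runningError{Node: "node-1", Code: "CDC:ErrOwnerUnknown", Message: "boom"})
	fmt.Println(len(c.drain()), len(c.drain())) // prints: 1 0
}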
12 changes: 10 additions & 2 deletions maintainer/maintainer_manager.go
@@ -192,7 +192,11 @@ func (m *Manager) onCoordinatorBootstrapRequest(msg *messaging.TargetMessage) {
response := &heartbeatpb.CoordinatorBootstrapResponse{}
m.maintainers.Range(func(key, value interface{}) bool {
maintainer := value.(*Maintainer)
response.Statuses = append(response.Statuses, maintainer.GetMaintainerStatus())
status := maintainer.GetMaintainerStatus()
if status.GetErr() != nil {
log.Info("fizz changefeed meet error", zap.Any("status", status))
}
response.Statuses = append(response.Statuses, status)
maintainer.statusChanged.Store(false)
maintainer.lastReportTime = time.Now()
return true
@@ -294,7 +298,11 @@ func (m *Manager) sendHeartbeat() {
m.maintainers.Range(func(key, value interface{}) bool {
cfMaintainer := value.(*Maintainer)
if cfMaintainer.statusChanged.Load() || time.Since(cfMaintainer.lastReportTime) > time.Second*2 {
response.Statuses = append(response.Statuses, cfMaintainer.GetMaintainerStatus())
mStatus := cfMaintainer.GetMaintainerStatus()
if mStatus.GetErr() != nil {
log.Info("fizz changefeed meet error", zap.Any("status", mStatus))
}
response.Statuses = append(response.Statuses, mStatus)
cfMaintainer.statusChanged.Store(false)
cfMaintainer.lastReportTime = time.Now()
}
72 changes: 35 additions & 37 deletions tests/integration_tests/changefeed_error/run.sh
@@ -38,7 +38,7 @@ function run() {
start_ts=$(run_cdc_cli_tso_query ${UP_PD_HOST_1} ${UP_PD_PORT_1})
run_sql "CREATE DATABASE changefeed_error;" ${UP_TIDB_HOST} ${UP_TIDB_PORT}
go-ycsb load mysql -P $CUR/conf/workload -p mysql.host=${UP_TIDB_HOST} -p mysql.port=${UP_TIDB_PORT} -p mysql.user=root -p mysql.db=changefeed_error
export GO_FAILPOINTS='github.com/pingcap/tiflow/cdc/owner/NewChangefeedNoRetryError=1*return(true)'
export GO_FAILPOINTS='github.com/pingcap/ticdc/logservice/schemastore/getAllPhysicalTablesGCFastFail=1*return(true)'
run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY
capture_pid=$(ps -C $CDC_BINARY -o pid= | awk '{print $1}')

@@ -60,7 +60,9 @@ function run() {
pulsar) run_pulsar_consumer --upstream-uri $SINK_URI ;;
esac

ensure $MAX_RETRIES check_changefeed_state http://${UP_PD_HOST_1}:${UP_PD_PORT_1} ${changefeedid} "failed" "[CDC:ErrStartTsBeforeGC]" ""
# CASE 1: Test unretryable error
echo "Start case 1: Test unretryable error"
ensure $MAX_RETRIES check_changefeed_state http://${UP_PD_HOST_1}:${UP_PD_PORT_1} ${changefeedid} "failed" "[CDC:ErrSnapshotLostByGC]" ""
run_cdc_cli changefeed resume -c $changefeedid

check_table_exists "changefeed_error.usertable" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT}
@@ -69,61 +71,57 @@ function run() {
go-ycsb load mysql -P $CUR/conf/workload -p mysql.host=${UP_TIDB_HOST} -p mysql.port=${UP_TIDB_PORT} -p mysql.user=root -p mysql.db=changefeed_error
check_sync_diff $WORK_DIR $CUR/conf/diff_config.toml

export GO_FAILPOINTS='github.com/pingcap/tiflow/cdc/owner/NewChangefeedRetryError=return(true)'
kill -9 $capture_pid
# make sure old cpature key and old owner key expire in etcd
export GO_FAILPOINTS=''
cleanup_process $CDC_BINARY
# make sure old capture key and old owner key expire in etcd
ETCDCTL_API=3 etcdctl get /tidb/cdc/default/__cdc_meta__/capture --prefix | grep -v "capture"
ensure $MAX_RETRIES "check_etcd_meta_not_exist '/tidb/cdc/default/__cdc_meta__/capture' 'capture'"
ensure $MAX_RETRIES "check_etcd_meta_not_exist '/tidb/cdc/default/__cdc_meta__/owner' 'owner'"
echo "Pass case 1"

# CASE 2: Test retryable error
echo "Start case 2: Test retryable error"
export GO_FAILPOINTS='github.com/pingcap/ticdc/maintainer/NewChangefeedRetryError=return(true)'
run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY
ensure $MAX_RETRIES check_changefeed_state http://${UP_PD_HOST_1}:${UP_PD_PORT_1} ${changefeedid} "warning" "failpoint injected retriable error" ""

# try to create another changefeed to make sure the coordinator is not stuck
changefeedid_2="changefeed-error-2"
run_cdc_cli changefeed create --start-ts=$start_ts --sink-uri="$SINK_URI" -c $changefeedid_2
ensure $MAX_RETRIES check_changefeed_state http://${UP_PD_HOST_1}:${UP_PD_PORT_1} ${changefeedid_2} "warning" "failpoint injected retriable error" ""

run_cdc_cli changefeed remove -c $changefeedid
ensure $MAX_RETRIES check_no_changefeed ${UP_PD_HOST_1}:${UP_PD_PORT_1}

export GO_FAILPOINTS=''
cleanup_process $CDC_BINARY
ensure $MAX_RETRIES "check_etcd_meta_not_exist '/tidb/cdc/default/__cdc_meta__/owner' 'owner'"

# owner DDL error case
export GO_FAILPOINTS='github.com/pingcap/tiflow/cdc/owner/InjectChangefeedDDLError=return(true)'
run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY
changefeedid_1="changefeed-error-1"
run_cdc_cli changefeed create --start-ts=$start_ts --sink-uri="$SINK_URI" -c $changefeedid_1

run_sql "CREATE table changefeed_error.DDLERROR(id int primary key, val int);"
ensure $MAX_RETRIES check_changefeed_status 127.0.0.1:8300 $changefeedid_1 warning last_warning ErrExecDDLFailed

run_cdc_cli changefeed remove -c $changefeedid_1
cleanup_process $CDC_BINARY
ensure $MAX_RETRIES "check_etcd_meta_not_exist '/tidb/cdc/default/__cdc_meta__/owner' 'owner'"
# updating GC safepoint failure case
export GO_FAILPOINTS='github.com/pingcap/tiflow/pkg/txnutil/gc/InjectActualGCSafePoint=return(9223372036854775807)'
run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY
run_cdc_cli changefeed remove -c $changefeedid_2
ensure $MAX_RETRIES check_no_changefeed ${UP_PD_HOST_1}:${UP_PD_PORT_1}

changefeedid_2="changefeed-error-2"
run_cdc_cli changefeed create --start-ts=$start_ts --sink-uri="$SINK_URI" -c $changefeedid_2
ensure $MAX_RETRIES check_changefeed_state http://${UP_PD_HOST_1}:${UP_PD_PORT_1} ${changefeedid_2} "failed" "[CDC:ErrSnapshotLostByGC]" ""
# test changefeed remove twice, and it should return "Changefeed not found"
result=$(cdc cli changefeed remove -c $changefeedid_2)
if [[ $result != *"Changefeed not found"* ]]; then
echo "changefeeed remove result is expected to contains 'Changefeed not found', \
but actually got $result"
exit 1
fi

run_cdc_cli changefeed remove -c $changefeedid_2
export GO_FAILPOINTS=''
cleanup_process $CDC_BINARY
ensure $MAX_RETRIES "check_etcd_meta_not_exist '/tidb/cdc/default/__cdc_meta__/owner' 'owner'"
echo "Pass case 2"

# make sure initialize changefeed error will not stuck the owner
export GO_FAILPOINTS='github.com/pingcap/tiflow/cdc/redo/ChangefeedNewRedoManagerError=2*return(true)'
# CASE 3: updating GC safepoint failure case
echo "Start case 3: updating GC safepoint failure case"
export GO_FAILPOINTS='github.com/pingcap/ticdc/pkg/txnutil/gc/InjectActualGCSafePoint=return(9223372036854775807)'
run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY

changefeedid_3="changefeed-initialize-error"
run_cdc_cli changefeed create --start-ts=0 --sink-uri="$SINK_URI" -c $changefeedid_3
ensure $MAX_RETRIES check_changefeed_state http://${UP_PD_HOST_1}:${UP_PD_PORT_1} ${changefeedid_3} "normal" "null" ""
run_cdc_cli changefeed pause -c $changefeedid_3
ensure $MAX_RETRIES check_changefeed_state http://${UP_PD_HOST_1}:${UP_PD_PORT_1} ${changefeedid_3} "stopped" "null" ""
run_cdc_cli changefeed resume -c $changefeedid_3
ensure $MAX_RETRIES check_changefeed_state http://${UP_PD_HOST_1}:${UP_PD_PORT_1} ${changefeedid_3} "normal" "null" ""
changefeedid_3="changefeed-error-3"
run_cdc_cli changefeed create --start-ts=$start_ts --sink-uri="$SINK_URI" -c $changefeedid_3
ensure $MAX_RETRIES check_changefeed_state http://${UP_PD_HOST_1}:${UP_PD_PORT_1} ${changefeedid_3} "failed" "[CDC:ErrSnapshotLostByGC]" ""

run_cdc_cli changefeed remove -c $changefeedid_3
export GO_FAILPOINTS=''
cleanup_process $CDC_BINARY
echo "Pass case 3"
}

trap stop_tidb_cluster EXIT
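
Case 2 of the updated script drives the retry path through the NewChangefeedRetryError failpoint added in maintainer/maintainer.go. Assuming the usual pingcap/failpoint workflow (failpoint.Inject and failpoint.Return are no-op markers in a normal build, are rewritten into real code for a failpoint-enabled test build, and only fire when GO_FAILPOINTS names them, as the script's export does), the wiring looks roughly like the sketch below; the function name and surrounding code are illustrative.

package main

import (
	"errors"
	"fmt"

	"github.com/pingcap/failpoint"
)

// initializeChangefeed sketches how the NewChangefeedRetryError failpoint in
// maintainer.initialize is wired. The closure runs only in an instrumented
// build with the failpoint selected at runtime, e.g.
//   export GO_FAILPOINTS='github.com/pingcap/ticdc/maintainer/NewChangefeedRetryError=return(true)'
func initializeChangefeed() error {
	failpoint.Inject("NewChangefeedRetryError", func() {
		failpoint.Return(errors.New("failpoint injected retriable error"))
	})
	// ... normal changefeed initialization would continue here ...
	return nil
}

func main() {
	// Under plain `go run` the marker does nothing, so this prints <nil>;
	// in the instrumented integration-test build it would print the
	// injected retriable error instead.
	fmt.Println(initializeChangefeed())
}

The `return(true)` term used for this failpoint fires on every evaluation, while the `1*return(true)` form used for getAllPhysicalTablesGCFastFail in case 1 should fire only once.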
62 changes: 0 additions & 62 deletions tests/integration_tests/changefeed_fast_fail/run.sh

This file was deleted.
