diff --git a/cdc/capture/http_errors.go b/cdc/capture/http_errors.go index ed58b71e108..91aa41f6d87 100644 --- a/cdc/capture/http_errors.go +++ b/cdc/capture/http_errors.go @@ -25,7 +25,8 @@ var httpBadRequestError = []*errors.Error{ cerror.ErrAPIInvalidParam, cerror.ErrSinkURIInvalid, cerror.ErrStartTsBeforeGC, cerror.ErrChangeFeedNotExists, cerror.ErrTargetTsBeforeStartTs, cerror.ErrTableIneligible, cerror.ErrFilterRuleInvalid, cerror.ErrChangefeedUpdateRefused, cerror.ErrMySQLConnectionError, - cerror.ErrMySQLInvalidConfig, + cerror.ErrMySQLInvalidConfig, cerror.ErrCaptureNotExist, cerror.ErrTaskStatusNotExists, + cerror.ErrTaskPositionNotExists, } // IsHTTPBadRequestError check if a error is a http bad request error diff --git a/cdc/sink/manager.go b/cdc/sink/manager.go index 6fae483b2d0..1697a0b8cba 100644 --- a/cdc/sink/manager.go +++ b/cdc/sink/manager.go @@ -162,6 +162,7 @@ func (m *Manager) getCheckpointTs(tableID model.TableID) uint64 { return atomic.LoadUint64(&m.changeFeedCheckpointTs) } +// UpdateChangeFeedCheckpointTs update the changeFeedCheckpointTs every processor tick func (m *Manager) UpdateChangeFeedCheckpointTs(checkpointTs uint64) { atomic.StoreUint64(&m.changeFeedCheckpointTs, checkpointTs) if m.backendSink != nil { @@ -235,10 +236,6 @@ func (t *tableSink) FlushRowChangedEvents(ctx context.Context, tableID model.Tab return ckpt, err } -func (t *tableSink) getEmittedTs() uint64 { - return atomic.LoadUint64(&t.emittedTs) -} - func (t *tableSink) EmitCheckpointTs(ctx context.Context, ts uint64) error { // the table sink doesn't receive the checkpoint event return nil diff --git a/tests/integration_tests/http_api/util/test_case.py b/tests/integration_tests/http_api/util/test_case.py index 5206bc9b8ca..382ffe1ab55 100644 --- a/tests/integration_tests/http_api/util/test_case.py +++ b/tests/integration_tests/http_api/util/test_case.py @@ -243,14 +243,19 @@ def list_processor(): # must at least one table is sync will the test success def get_processor(): - url = BASE_URL0 + "/processors" - resp = rq.get(url, cert=CERT, verify=VERIFY) + base_url = BASE_URL0 + "/processors" + resp = rq.get(base_url, cert=CERT, verify=VERIFY) assert resp.status_code == rq.codes.ok data = resp.json()[0] - url = url + "/" + data["changefeed_id"] + "/" + data["capture_id"] + url = base_url + "/" + data["changefeed_id"] + "/" + data["capture_id"] resp = rq.get(url, cert=CERT, verify=VERIFY) assert resp.status_code == rq.codes.ok + # test capture_id error and cdc server no panic + url = base_url + "/" + data["changefeed_id"] + "/" + "non-exist-capture-id" + resp = rq.get(url, cert=CERT, verify=VERIFY) + assert resp.status_code == rq.codes.bad_request + print("pass test: get processors")