From 3d48f0c46cc03d3b699a7db1bc833e20f7cf279b Mon Sep 17 00:00:00 2001 From: Neil Shen Date: Thu, 27 Jan 2022 13:54:42 +0800 Subject: [PATCH 1/6] api,owner(ticdc): return error when api fails Signed-off-by: Neil Shen --- Makefile | 7 +- cdc/api/open.go | 57 +++++--- cdc/api/open_test.go | 238 ++++++++++++++++++++++++++++++---- cdc/api/owner.go | 39 ++++-- cdc/api/status.go | 5 +- cdc/api/util.go | 10 ++ cdc/capture/capture.go | 44 +++++-- cdc/owner/mock/owner_mock.go | 126 ++++++++++++++++++ cdc/owner/owner.go | 198 +++++++++++++++------------- cdc/owner/owner_test.go | 47 ++++--- cdc/owner/status_provider.go | 89 +++++++------ cdc/processor/manager.go | 41 +++--- cdc/processor/manager_test.go | 21 ++- 13 files changed, 679 insertions(+), 243 deletions(-) create mode 100644 cdc/owner/mock/owner_mock.go diff --git a/Makefile b/Makefile index 2b12c40d678..716936866cb 100644 --- a/Makefile +++ b/Makefile @@ -112,7 +112,7 @@ kafka_consumer: install: go install ./... -unit_test: check_failpoint_ctl +unit_test: check_failpoint_ctl generate_mock mkdir -p "$(TEST_DIR)" $(FAILPOINT_ENABLE) @export log_level=error;\ @@ -167,7 +167,7 @@ integration_test_mysql: integration_test_kafka: check_third_party_binary tests/integration_tests/run.sh kafka "$(CASE)" "$(START_AT)" -fmt: tools/bin/gofumports tools/bin/shfmt +fmt: tools/bin/gofumports tools/bin/shfmt generate_mock @echo "gofmt (simplify)" tools/bin/gofumports -l -w $(FILES) 2>&1 | $(FAIL_ON_STDOUT) @echo "run shfmt" @@ -234,6 +234,9 @@ data-flow-diagram: docs/data-flow.dot swagger-spec: tools/bin/swag tools/bin/swag init --parseVendor -generalInfo cdc/api/open.go --output docs/swagger +generate_mock: tools/bin/mockgen + tools/bin/mockgen -source cdc/owner/owner.go -destination cdc/owner/mock/owner_mock.go + clean: go clean -i ./... 
rm -rf *.out diff --git a/cdc/api/open.go b/cdc/api/open.go index 14f0b29c403..8fbb1388889 100644 --- a/cdc/api/open.go +++ b/cdc/api/open.go @@ -319,11 +319,16 @@ func (h *openAPI) PauseChangefeed(c *gin.Context) { Type: model.AdminStop, } - _ = h.capture.OperateOwnerUnderLock(func(owner *owner.Owner) error { - owner.EnqueueJob(job) + // Use buffered channel to prevent blocking owner. + done := make(chan error, 1) + _ = h.capture.OperateOwnerUnderLock(func(owner owner.Owner) error { + owner.EnqueueJob(job, done) return nil }) - + if err := waitDone(ctx, done); err != nil { + c.Error(err) + return + } c.Status(http.StatusAccepted) } @@ -361,11 +366,16 @@ func (h *openAPI) ResumeChangefeed(c *gin.Context) { Type: model.AdminResume, } - _ = h.capture.OperateOwnerUnderLock(func(owner *owner.Owner) error { - owner.EnqueueJob(job) + // Use buffered channel to prevent blocking owner. + done := make(chan error, 1) + _ = h.capture.OperateOwnerUnderLock(func(owner owner.Owner) error { + owner.EnqueueJob(job, done) return nil }) - + if err := waitDone(ctx, done); err != nil { + c.Error(err) + return + } c.Status(http.StatusAccepted) } @@ -465,11 +475,16 @@ func (h *openAPI) RemoveChangefeed(c *gin.Context) { Type: model.AdminRemove, } - _ = h.capture.OperateOwnerUnderLock(func(owner *owner.Owner) error { - owner.EnqueueJob(job) + // Use buffered channel to prevent blocking owner. + done := make(chan error, 1) + _ = h.capture.OperateOwnerUnderLock(func(owner owner.Owner) error { + owner.EnqueueJob(job, done) return nil }) - + if err := waitDone(ctx, done); err != nil { + c.Error(err) + return + } c.Status(http.StatusAccepted) } @@ -503,11 +518,16 @@ func (h *openAPI) RebalanceTable(c *gin.Context) { return } - _ = h.capture.OperateOwnerUnderLock(func(owner *owner.Owner) error { - owner.TriggerRebalance(changefeedID) + // Use buffered channel to prevent blocking owner. 
+ done := make(chan error, 1) + _ = h.capture.OperateOwnerUnderLock(func(owner owner.Owner) error { + owner.TriggerRebalance(changefeedID, done) return nil }) - + if err := waitDone(ctx, done); err != nil { + c.Error(err) + return + } c.Status(http.StatusAccepted) } @@ -557,11 +577,16 @@ func (h *openAPI) MoveTable(c *gin.Context) { return } - _ = h.capture.OperateOwnerUnderLock(func(owner *owner.Owner) error { - owner.ManualSchedule(changefeedID, data.CaptureID, data.TableID) + // Use buffered channel to prevent blocking owner. + done := make(chan error, 1) + _ = h.capture.OperateOwnerUnderLock(func(owner owner.Owner) error { + owner.ManualSchedule(changefeedID, data.CaptureID, data.TableID, done) return nil }) - + if err := waitDone(ctx, done); err != nil { + c.Error(err) + return + } c.Status(http.StatusAccepted) } @@ -580,7 +605,7 @@ func (h *openAPI) ResignOwner(c *gin.Context) { return } - _ = h.capture.OperateOwnerUnderLock(func(owner *owner.Owner) error { + _ = h.capture.OperateOwnerUnderLock(func(owner owner.Owner) error { owner.AsyncStop() return nil }) diff --git a/cdc/api/open_test.go b/cdc/api/open_test.go index 48a1fb53d4d..f7cec1d2270 100644 --- a/cdc/api/open_test.go +++ b/cdc/api/open_test.go @@ -23,9 +23,11 @@ import ( "testing" "github.com/gin-gonic/gin" + "github.com/golang/mock/gomock" "github.com/pingcap/log" "github.com/pingcap/tiflow/cdc/capture" "github.com/pingcap/tiflow/cdc/model" + mock_owner "github.com/pingcap/tiflow/cdc/owner/mock" cerror "github.com/pingcap/tiflow/pkg/errors" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" @@ -88,8 +90,7 @@ func (p *mockStatusProvider) GetCaptures(ctx context.Context) ([]*model.CaptureI return args.Get(0).([]*model.CaptureInfo), args.Error(1) } -func newRouter(p *mockStatusProvider) *gin.Engine { - c := capture.NewCapture4Test(true) +func newRouter(c *capture.Capture, p *mockStatusProvider) *gin.Engine { router := gin.New() RegisterOpenAPIRoutes(router, NewOpenAPI4Test(c, 
p)) return router @@ -139,7 +140,10 @@ func newStatusProvider() *mockStatusProvider { func TestListChangefeed(t *testing.T) { t.Parallel() - router := newRouter(newStatusProvider()) + ctrl := gomock.NewController(t) + mo := mock_owner.NewMockOwner(ctrl) + cp := capture.NewCapture4Test(mo) + router := newRouter(cp, newStatusProvider()) // test list changefeed succeeded api := testCase{url: "/api/v1/changefeeds", method: "GET"} @@ -168,7 +172,10 @@ func TestListChangefeed(t *testing.T) { func TestGetChangefeed(t *testing.T) { t.Parallel() - router := newRouter(newStatusProvider()) + ctrl := gomock.NewController(t) + mo := mock_owner.NewMockOwner(ctrl) + cp := capture.NewCapture4Test(mo) + router := newRouter(cp, newStatusProvider()) // test get changefeed succeeded api := testCase{url: fmt.Sprintf("/api/v1/changefeeds/%s", changeFeedID), method: "GET"} @@ -195,16 +202,33 @@ func TestGetChangefeed(t *testing.T) { func TestPauseChangefeed(t *testing.T) { t.Parallel() - router := newRouter(newStatusProvider()) + ctrl := gomock.NewController(t) + mo := mock_owner.NewMockOwner(ctrl) + cp := capture.NewCapture4Test(mo) + router := newRouter(cp, newStatusProvider()) + // test pause changefeed succeeded + mo.EXPECT(). + EnqueueJob(gomock.Any(), gomock.Any()). + Do(func(adminJob model.AdminJob, done chan<- error) { + require.EqualValues(t, changeFeedID, adminJob.CfID) + require.EqualValues(t, model.AdminStop, adminJob.Type) + close(done) + }) api := testCase{url: fmt.Sprintf("/api/v1/changefeeds/%s/pause", changeFeedID), method: "POST"} w := httptest.NewRecorder() req, _ := http.NewRequest(api.method, api.url, nil) router.ServeHTTP(w, req) require.Equal(t, 202, w.Code) - // test pause changefeed failed - api = testCase{url: fmt.Sprintf("/api/v1/changefeeds/%s/pause", nonExistChangefeedID), method: "POST"} + // test pause changefeed failed from owner side + mo.EXPECT(). + EnqueueJob(gomock.Any(), gomock.Any()). 
+ Do(func(adminJob model.AdminJob, done chan<- error) { + done <- cerror.ErrChangeFeedNotExists.FastGenByArgs(adminJob.CfID) + close(done) + }) + api = testCase{url: fmt.Sprintf("/api/v1/changefeeds/%s/pause", changeFeedID), method: "POST"} w = httptest.NewRecorder() req, _ = http.NewRequest(api.method, api.url, nil) router.ServeHTTP(w, req) @@ -213,20 +237,48 @@ func TestPauseChangefeed(t *testing.T) { err := json.NewDecoder(w.Body).Decode(&respErr) require.Nil(t, err) require.Contains(t, respErr.Error, "changefeed not exists") + + // test pause changefeed failed + api = testCase{url: fmt.Sprintf("/api/v1/changefeeds/%s/pause", nonExistChangefeedID), method: "POST"} + w = httptest.NewRecorder() + req, _ = http.NewRequest(api.method, api.url, nil) + router.ServeHTTP(w, req) + require.Equal(t, 400, w.Code) + respErr = model.HTTPError{} + err = json.NewDecoder(w.Body).Decode(&respErr) + require.Nil(t, err) + require.Contains(t, respErr.Error, "changefeed not exists") } func TestResumeChangefeed(t *testing.T) { t.Parallel() - router := newRouter(newStatusProvider()) + ctrl := gomock.NewController(t) + mo := mock_owner.NewMockOwner(ctrl) + cp := capture.NewCapture4Test(mo) + router := newRouter(cp, newStatusProvider()) + // test resume changefeed succeeded + mo.EXPECT(). + EnqueueJob(gomock.Any(), gomock.Any()). + Do(func(adminJob model.AdminJob, done chan<- error) { + require.EqualValues(t, changeFeedID, adminJob.CfID) + require.EqualValues(t, model.AdminResume, adminJob.Type) + close(done) + }) api := testCase{url: fmt.Sprintf("/api/v1/changefeeds/%s/resume", changeFeedID), method: "POST"} w := httptest.NewRecorder() req, _ := http.NewRequest(api.method, api.url, nil) router.ServeHTTP(w, req) require.Equal(t, 202, w.Code) - // test resume changefeed failed - api = testCase{url: fmt.Sprintf("/api/v1/changefeeds/%s/resume", nonExistChangefeedID), method: "POST"} + // test resume changefeed failed from owner side. + mo.EXPECT(). + EnqueueJob(gomock.Any(), gomock.Any()). 
+ Do(func(adminJob model.AdminJob, done chan<- error) { + done <- cerror.ErrChangeFeedNotExists.FastGenByArgs(adminJob.CfID) + close(done) + }) + api = testCase{url: fmt.Sprintf("/api/v1/changefeeds/%s/resume", changeFeedID), method: "POST"} w = httptest.NewRecorder() req, _ = http.NewRequest(api.method, api.url, nil) router.ServeHTTP(w, req) @@ -235,20 +287,48 @@ func TestResumeChangefeed(t *testing.T) { err := json.NewDecoder(w.Body).Decode(&respErr) require.Nil(t, err) require.Contains(t, respErr.Error, "changefeed not exists") + + // test resume changefeed failed + api = testCase{url: fmt.Sprintf("/api/v1/changefeeds/%s/resume", nonExistChangefeedID), method: "POST"} + w = httptest.NewRecorder() + req, _ = http.NewRequest(api.method, api.url, nil) + router.ServeHTTP(w, req) + require.Equal(t, 400, w.Code) + respErr = model.HTTPError{} + err = json.NewDecoder(w.Body).Decode(&respErr) + require.Nil(t, err) + require.Contains(t, respErr.Error, "changefeed not exists") } func TestRemoveChangefeed(t *testing.T) { t.Parallel() - router := newRouter(newStatusProvider()) + ctrl := gomock.NewController(t) + mo := mock_owner.NewMockOwner(ctrl) + cp := capture.NewCapture4Test(mo) + router := newRouter(cp, newStatusProvider()) + // test remove changefeed succeeded + mo.EXPECT(). + EnqueueJob(gomock.Any(), gomock.Any()). + Do(func(adminJob model.AdminJob, done chan<- error) { + require.EqualValues(t, changeFeedID, adminJob.CfID) + require.EqualValues(t, model.AdminRemove, adminJob.Type) + close(done) + }) api := testCase{url: fmt.Sprintf("/api/v1/changefeeds/%s", changeFeedID), method: "DELETE"} w := httptest.NewRecorder() req, _ := http.NewRequest(api.method, api.url, nil) router.ServeHTTP(w, req) require.Equal(t, 202, w.Code) - // test remove changefeed failed - api = testCase{url: fmt.Sprintf("/api/v1/changefeeds/%s", nonExistChangefeedID), method: "DELETE"} + // test remove changefeed failed from owner side + mo.EXPECT(). + EnqueueJob(gomock.Any(), gomock.Any()). 
+ Do(func(adminJob model.AdminJob, done chan<- error) { + done <- cerror.ErrChangeFeedNotExists.FastGenByArgs(adminJob.CfID) + close(done) + }) + api = testCase{url: fmt.Sprintf("/api/v1/changefeeds/%s", changeFeedID), method: "DELETE"} w = httptest.NewRecorder() req, _ = http.NewRequest(api.method, api.url, nil) router.ServeHTTP(w, req) @@ -257,20 +337,50 @@ func TestRemoveChangefeed(t *testing.T) { err := json.NewDecoder(w.Body).Decode(&respErr) require.Nil(t, err) require.Contains(t, respErr.Error, "changefeed not exists") + + // test remove changefeed failed + api = testCase{url: fmt.Sprintf("/api/v1/changefeeds/%s", nonExistChangefeedID), method: "DELETE"} + w = httptest.NewRecorder() + req, _ = http.NewRequest(api.method, api.url, nil) + router.ServeHTTP(w, req) + require.Equal(t, 400, w.Code) + respErr = model.HTTPError{} + err = json.NewDecoder(w.Body).Decode(&respErr) + require.Nil(t, err) + require.Contains(t, respErr.Error, "changefeed not exists") } func TestRebalanceTable(t *testing.T) { t.Parallel() - router := newRouter(newStatusProvider()) + ctrl := gomock.NewController(t) + mo := mock_owner.NewMockOwner(ctrl) + cp := capture.NewCapture4Test(mo) + router := newRouter(cp, newStatusProvider()) + // test rebalance table succeeded + mo.EXPECT(). + TriggerRebalance(gomock.Any(), gomock.Any()). + Do(func(cfID model.ChangeFeedID, done chan<- error) { + require.EqualValues(t, cfID, changeFeedID) + close(done) + }) api := testCase{url: fmt.Sprintf("/api/v1/changefeeds/%s/tables/rebalance_table", changeFeedID), method: "POST"} w := httptest.NewRecorder() req, _ := http.NewRequest(api.method, api.url, nil) router.ServeHTTP(w, req) require.Equal(t, 202, w.Code) - // test rebalance table failed - api = testCase{url: fmt.Sprintf("/api/v1/changefeeds/%s/tables/rebalance_table", nonExistChangefeedID), method: "POST"} + // test rebalance table failed from owner side. + mo.EXPECT(). + TriggerRebalance(gomock.Any(), gomock.Any()). 
+ Do(func(cfID model.ChangeFeedID, done chan<- error) { + done <- cerror.ErrChangeFeedNotExists.FastGenByArgs(cfID) + close(done) + }) + api = testCase{ + url: fmt.Sprintf("/api/v1/changefeeds/%s/tables/rebalance_table", changeFeedID), + method: "POST", + } w = httptest.NewRecorder() req, _ = http.NewRequest(api.method, api.url, nil) router.ServeHTTP(w, req) @@ -279,11 +389,28 @@ func TestRebalanceTable(t *testing.T) { err := json.NewDecoder(w.Body).Decode(&respErr) require.Nil(t, err) require.Contains(t, respErr.Error, "changefeed not exists") + + // test rebalance table failed + api = testCase{url: fmt.Sprintf("/api/v1/changefeeds/%s/tables/rebalance_table", nonExistChangefeedID), method: "POST"} + w = httptest.NewRecorder() + req, _ = http.NewRequest(api.method, api.url, nil) + router.ServeHTTP(w, req) + require.Equal(t, 400, w.Code) + respErr = model.HTTPError{} + err = json.NewDecoder(w.Body).Decode(&respErr) + require.Nil(t, err) + require.Contains(t, respErr.Error, "changefeed not exists") } func TestMoveTable(t *testing.T) { t.Parallel() + ctrl := gomock.NewController(t) + mo := mock_owner.NewMockOwner(ctrl) + cp := capture.NewCapture4Test(mo) + router := newRouter(cp, newStatusProvider()) + + // test move table succeeded data := struct { CaptureID string `json:"capture_id"` TableID int64 `json:"table_id"` @@ -291,22 +418,60 @@ func TestMoveTable(t *testing.T) { b, err := json.Marshal(&data) require.Nil(t, err) body := bytes.NewReader(b) - - router := newRouter(newStatusProvider()) - // test move table succeeded + mo.EXPECT(). + ManualSchedule(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()). 
+ Do(func( + cfID model.ChangeFeedID, toCapture model.CaptureID, + tableID model.TableID, done chan<- error, + ) { + require.EqualValues(t, cfID, changeFeedID) + require.EqualValues(t, toCapture, data.CaptureID) + require.EqualValues(t, tableID, data.TableID) + close(done) + }) api := testCase{url: fmt.Sprintf("/api/v1/changefeeds/%s/tables/move_table", changeFeedID), method: "POST"} w := httptest.NewRecorder() req, _ := http.NewRequest(api.method, api.url, body) router.ServeHTTP(w, req) require.Equal(t, 202, w.Code) + // test move table failed from owner side. + data = struct { + CaptureID string `json:"capture_id"` + TableID int64 `json:"table_id"` + }{captureID, 1} + b, err = json.Marshal(&data) + require.Nil(t, err) + body = bytes.NewReader(b) + mo.EXPECT(). + ManualSchedule(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()). + Do(func( + cfID model.ChangeFeedID, toCapture model.CaptureID, + tableID model.TableID, done chan<- error, + ) { + require.EqualValues(t, cfID, changeFeedID) + require.EqualValues(t, toCapture, data.CaptureID) + require.EqualValues(t, tableID, data.TableID) + done <- cerror.ErrChangeFeedNotExists.FastGenByArgs(cfID) + close(done) + }) + api = testCase{url: fmt.Sprintf("/api/v1/changefeeds/%s/tables/move_table", changeFeedID), method: "POST"} + w = httptest.NewRecorder() + req, _ = http.NewRequest(api.method, api.url, body) + router.ServeHTTP(w, req) + require.Equal(t, 400, w.Code) + respErr := model.HTTPError{} + err = json.NewDecoder(w.Body).Decode(&respErr) + require.Nil(t, err) + require.Contains(t, respErr.Error, "changefeed not exists") + // test move table failed api = testCase{url: fmt.Sprintf("/api/v1/changefeeds/%s/tables/move_table", nonExistChangefeedID), method: "POST"} w = httptest.NewRecorder() req, _ = http.NewRequest(api.method, api.url, body) router.ServeHTTP(w, req) require.Equal(t, 400, w.Code) - respErr := model.HTTPError{} + respErr = model.HTTPError{} err = json.NewDecoder(w.Body).Decode(&respErr) 
require.Nil(t, err) require.Contains(t, respErr.Error, "changefeed not exists") @@ -314,8 +479,12 @@ func TestMoveTable(t *testing.T) { func TestResignOwner(t *testing.T) { t.Parallel() - router := newRouter(newStatusProvider()) + ctrl := gomock.NewController(t) + mo := mock_owner.NewMockOwner(ctrl) + cp := capture.NewCapture4Test(mo) + router := newRouter(cp, newStatusProvider()) // test resign owner succeeded + mo.EXPECT().AsyncStop() api := testCase{url: "/api/v1/owner/resign", method: "POST"} w := httptest.NewRecorder() req, _ := http.NewRequest(api.method, api.url, nil) @@ -325,7 +494,10 @@ func TestResignOwner(t *testing.T) { func TestGetProcessor(t *testing.T) { t.Parallel() - router := newRouter(newStatusProvider()) + ctrl := gomock.NewController(t) + mo := mock_owner.NewMockOwner(ctrl) + cp := capture.NewCapture4Test(mo) + router := newRouter(cp, newStatusProvider()) // test get processor succeeded api := testCase{url: fmt.Sprintf("/api/v1/processors/%s/%s", changeFeedID, captureID), method: "GET"} w := httptest.NewRecorder() @@ -351,7 +523,10 @@ func TestGetProcessor(t *testing.T) { func TestListProcessor(t *testing.T) { t.Parallel() - router := newRouter(newStatusProvider()) + ctrl := gomock.NewController(t) + mo := mock_owner.NewMockOwner(ctrl) + cp := capture.NewCapture4Test(mo) + router := newRouter(cp, newStatusProvider()) // test list processor succeeded api := testCase{url: "/api/v1/processors", method: "GET"} w := httptest.NewRecorder() @@ -366,7 +541,10 @@ func TestListProcessor(t *testing.T) { func TestListCapture(t *testing.T) { t.Parallel() - router := newRouter(newStatusProvider()) + ctrl := gomock.NewController(t) + mo := mock_owner.NewMockOwner(ctrl) + cp := capture.NewCapture4Test(mo) + router := newRouter(cp, newStatusProvider()) // test list processor succeeded api := testCase{url: "/api/v1/captures", method: "GET"} w := httptest.NewRecorder() @@ -382,7 +560,10 @@ func TestListCapture(t *testing.T) { func TestServerStatus(t *testing.T) { 
t.Parallel() // capture is owner - ownerRouter := newRouter(newStatusProvider()) + ctrl := gomock.NewController(t) + mo := mock_owner.NewMockOwner(ctrl) + cp := capture.NewCapture4Test(mo) + ownerRouter := newRouter(cp, newStatusProvider()) api := testCase{url: "/api/v1/status", method: "GET"} w := httptest.NewRecorder() req, _ := http.NewRequest(api.method, api.url, nil) @@ -395,7 +576,7 @@ func TestServerStatus(t *testing.T) { require.True(t, resp.IsOwner) // capture is not owner - c := capture.NewCapture4Test(false) + c := capture.NewCapture4Test(nil) r := gin.New() RegisterOpenAPIRoutes(r, NewOpenAPI4Test(c, nil)) api = testCase{url: "/api/v1/status", method: "GET"} @@ -416,7 +597,10 @@ func TestSetLogLevel(t *testing.T) { data := struct { Level string `json:"log_level"` }{"warn"} - router := newRouter(newStatusProvider()) + ctrl := gomock.NewController(t) + mo := mock_owner.NewMockOwner(ctrl) + cp := capture.NewCapture4Test(mo) + router := newRouter(cp, newStatusProvider()) api := testCase{url: "/api/v1/log", method: "POST"} w := httptest.NewRecorder() b, err := json.Marshal(&data) diff --git a/cdc/api/owner.go b/cdc/api/owner.go index a8b565c4a1a..793962c16e4 100644 --- a/cdc/api/owner.go +++ b/cdc/api/owner.go @@ -112,7 +112,7 @@ func (h *ownerAPI) handleResignOwner(w http.ResponseWriter, req *http.Request) { handleOwnerResp(w, concurrency.ErrElectionNotLeader) return } - err := h.capture.OperateOwnerUnderLock(func(owner *owner.Owner) error { + err := h.capture.OperateOwnerUnderLock(func(owner owner.Owner) error { owner.AsyncStop() return nil }) @@ -153,12 +153,17 @@ func (h *ownerAPI) handleChangefeedAdmin(w http.ResponseWriter, req *http.Reques Opts: opts, } - err = h.capture.OperateOwnerUnderLock(func(owner *owner.Owner) error { - owner.EnqueueJob(job) + // Use buffered channel to prevent blocking owner. 
+ done := make(chan error, 1) + err = h.capture.OperateOwnerUnderLock(func(owner owner.Owner) error { + owner.EnqueueJob(job, done) return nil }) - - handleOwnerResp(w, err) + if err != nil { + handleOwnerResp(w, err) + return + } + handleOwnerResp(w, waitDone(req.Context(), done)) } func (h *ownerAPI) handleRebalanceTrigger(w http.ResponseWriter, req *http.Request) { @@ -180,12 +185,18 @@ func (h *ownerAPI) handleRebalanceTrigger(w http.ResponseWriter, req *http.Reque return } - err = h.capture.OperateOwnerUnderLock(func(owner *owner.Owner) error { - owner.TriggerRebalance(changefeedID) + // Use buffered channel to prevent blocking owner. + done := make(chan error, 1) + err = h.capture.OperateOwnerUnderLock(func(owner owner.Owner) error { + owner.TriggerRebalance(changefeedID, done) return nil }) - handleOwnerResp(w, err) + if err != nil { + handleOwnerResp(w, err) + return + } + handleOwnerResp(w, waitDone(req.Context(), done)) } func (h *ownerAPI) handleMoveTable(w http.ResponseWriter, req *http.Request) { @@ -221,12 +232,18 @@ func (h *ownerAPI) handleMoveTable(w http.ResponseWriter, req *http.Request) { return } - err = h.capture.OperateOwnerUnderLock(func(owner *owner.Owner) error { - owner.ManualSchedule(changefeedID, to, tableID) + // Use buffered channel to prevent blocking owner. 
+ done := make(chan error, 1) + err = h.capture.OperateOwnerUnderLock(func(owner owner.Owner) error { + owner.ManualSchedule(changefeedID, to, tableID, done) return nil }) - handleOwnerResp(w, err) + if err != nil { + handleOwnerResp(w, err) + return + } + handleOwnerResp(w, waitDone(req.Context(), done)) } func (h *ownerAPI) handleChangefeedQuery(w http.ResponseWriter, req *http.Request) { diff --git a/cdc/api/status.go b/cdc/api/status.go index eb93bfc5982..e71c1528c03 100644 --- a/cdc/api/status.go +++ b/cdc/api/status.go @@ -60,9 +60,10 @@ func (h *statusAPI) writeEtcdInfo(ctx context.Context, cli *etcd.CDCEtcdClient, } func (h *statusAPI) handleDebugInfo(w http.ResponseWriter, req *http.Request) { - h.capture.WriteDebugInfo(w) + ctx := req.Context() + h.capture.WriteDebugInfo(ctx, w) fmt.Fprintf(w, "\n\n*** etcd info ***:\n\n") - h.writeEtcdInfo(req.Context(), h.capture.EtcdClient, w) + h.writeEtcdInfo(ctx, h.capture.EtcdClient, w) } func (h *statusAPI) handleStatus(w http.ResponseWriter, req *http.Request) { diff --git a/cdc/api/util.go b/cdc/api/util.go index 4baebf54e64..42ef82a3c8c 100644 --- a/cdc/api/util.go +++ b/cdc/api/util.go @@ -14,6 +14,7 @@ package api import ( + "context" "encoding/json" "net/http" "strings" @@ -76,3 +77,12 @@ func writeData(w http.ResponseWriter, data interface{}) { log.Error("fail to write data", zap.Error(err)) } } + +func waitDone(ctx context.Context, done <-chan error) (err error) { + select { + case <-ctx.Done(): + err = ctx.Err() + case err = <-done: + } + return +} diff --git a/cdc/capture/capture.go b/cdc/capture/capture.go index b2f8fae43aa..daa16b4c922 100644 --- a/cdc/capture/capture.go +++ b/cdc/capture/capture.go @@ -53,7 +53,7 @@ type Capture struct { info *model.CaptureInfo ownerMu sync.Mutex - owner *owner.Owner + owner owner.Owner processorManager *processor.Manager // session keeps alive between the capture and etcd @@ -88,7 +88,7 @@ type Capture struct { cancel context.CancelFunc newProcessorManager func() 
*processor.Manager - newOwner func(pd.Client) *owner.Owner + newOwner func(pd.Client) owner.Owner } // NewCapture returns a new Capture instance @@ -107,13 +107,11 @@ func NewCapture(pdClient pd.Client, kvStorage tidbkv.Storage, etcdClient *etcd.C } } -func NewCapture4Test(isOwner bool) *Capture { +func NewCapture4Test(o owner.Owner) *Capture { res := &Capture{ info: &model.CaptureInfo{ID: "capture-for-test", AdvertiseAddr: "127.0.0.1", Version: "test"}, } - if isOwner { - res.owner = &owner.Owner{} - } + res.owner = o return res } @@ -482,14 +480,14 @@ func (c *Capture) runEtcdWorker( return nil } -func (c *Capture) setOwner(owner *owner.Owner) { +func (c *Capture) setOwner(owner owner.Owner) { c.ownerMu.Lock() defer c.ownerMu.Unlock() c.owner = owner } // OperateOwnerUnderLock operates the owner with lock -func (c *Capture) OperateOwnerUnderLock(fn func(*owner.Owner) error) error { +func (c *Capture) OperateOwnerUnderLock(fn func(owner.Owner) error) error { c.ownerMu.Lock() defer c.ownerMu.Unlock() if c.owner == nil { @@ -529,7 +527,7 @@ func (c *Capture) AsyncClose() { defer c.cancel() // Safety: Here we mainly want to stop the owner // and ignore it if the owner does not exist or is not set. - _ = c.OperateOwnerUnderLock(func(o *owner.Owner) error { + _ = c.OperateOwnerUnderLock(func(o owner.Owner) error { o.AsyncStop() return nil }) @@ -571,20 +569,38 @@ func (c *Capture) AsyncClose() { } // WriteDebugInfo writes the debug info into writer. -func (c *Capture) WriteDebugInfo(w io.Writer) { +func (c *Capture) WriteDebugInfo(ctx context.Context, w io.Writer) { + wait := func(done <-chan error) { + var err error + select { + case <-ctx.Done(): + err = ctx.Err() + case err = <-done: + } + if err != nil { + log.Warn("write debug info failed", zap.Error(err)) + } + } // Safety: Because we are mainly outputting information about the owner here, // if the owner does not exist or is not set, the information will not be output. 
- _ = c.OperateOwnerUnderLock(func(o *owner.Owner) error { + doneOwner := make(chan error, 1) + _ = c.OperateOwnerUnderLock(func(o owner.Owner) error { fmt.Fprintf(w, "\n\n*** owner info ***:\n\n") - o.WriteDebugInfo(w) + o.WriteDebugInfo(w, doneOwner) return nil }) + // wait the debug info printed + wait(doneOwner) + + doneM := make(chan error, 1) c.captureMu.Lock() defer c.captureMu.Unlock() if c.processorManager != nil { fmt.Fprintf(w, "\n\n*** processors info ***:\n\n") - c.processorManager.WriteDebugInfo(w) + c.processorManager.WriteDebugInfo(ctx, w, doneM) } + // wait the debug info printed + wait(doneM) } // IsOwner returns whether the capture is an owner @@ -621,5 +637,5 @@ func (c *Capture) StatusProvider() owner.StatusProvider { if c.owner == nil { return nil } - return c.owner.StatusProvider() + return owner.NewStatusProvider(c.owner) } diff --git a/cdc/owner/mock/owner_mock.go b/cdc/owner/mock/owner_mock.go new file mode 100644 index 00000000000..dfef50ac0db --- /dev/null +++ b/cdc/owner/mock/owner_mock.go @@ -0,0 +1,126 @@ +// Code generated by MockGen. DO NOT EDIT. +// Source: cdc/owner/owner.go + +// Package mock_owner is a generated GoMock package. +package mock_owner + +import ( + context "context" + io "io" + reflect "reflect" + + gomock "github.com/golang/mock/gomock" + model "github.com/pingcap/tiflow/cdc/model" + owner "github.com/pingcap/tiflow/cdc/owner" + orchestrator "github.com/pingcap/tiflow/pkg/orchestrator" +) + +// MockOwner is a mock of Owner interface. +type MockOwner struct { + ctrl *gomock.Controller + recorder *MockOwnerMockRecorder +} + +// MockOwnerMockRecorder is the mock recorder for MockOwner. +type MockOwnerMockRecorder struct { + mock *MockOwner +} + +// NewMockOwner creates a new mock instance. 
+func NewMockOwner(ctrl *gomock.Controller) *MockOwner { + mock := &MockOwner{ctrl: ctrl} + mock.recorder = &MockOwnerMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockOwner) EXPECT() *MockOwnerMockRecorder { + return m.recorder +} + +// AsyncStop mocks base method. +func (m *MockOwner) AsyncStop() { + m.ctrl.T.Helper() + m.ctrl.Call(m, "AsyncStop") +} + +// AsyncStop indicates an expected call of AsyncStop. +func (mr *MockOwnerMockRecorder) AsyncStop() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AsyncStop", reflect.TypeOf((*MockOwner)(nil).AsyncStop)) +} + +// EnqueueJob mocks base method. +func (m *MockOwner) EnqueueJob(adminJob model.AdminJob, done chan<- error) { + m.ctrl.T.Helper() + m.ctrl.Call(m, "EnqueueJob", adminJob, done) +} + +// EnqueueJob indicates an expected call of EnqueueJob. +func (mr *MockOwnerMockRecorder) EnqueueJob(adminJob, done interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "EnqueueJob", reflect.TypeOf((*MockOwner)(nil).EnqueueJob), adminJob, done) +} + +// ManualSchedule mocks base method. +func (m *MockOwner) ManualSchedule(cfID model.ChangeFeedID, toCapture model.CaptureID, tableID model.TableID, done chan<- error) { + m.ctrl.T.Helper() + m.ctrl.Call(m, "ManualSchedule", cfID, toCapture, tableID, done) +} + +// ManualSchedule indicates an expected call of ManualSchedule. +func (mr *MockOwnerMockRecorder) ManualSchedule(cfID, toCapture, tableID, done interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ManualSchedule", reflect.TypeOf((*MockOwner)(nil).ManualSchedule), cfID, toCapture, tableID, done) +} + +// Query mocks base method. 
+func (m *MockOwner) Query(query *owner.OwnerQuery, done chan<- error) { + m.ctrl.T.Helper() + m.ctrl.Call(m, "Query", query, done) +} + +// Query indicates an expected call of Query. +func (mr *MockOwnerMockRecorder) Query(query, done interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Query", reflect.TypeOf((*MockOwner)(nil).Query), query, done) +} + +// Tick mocks base method. +func (m *MockOwner) Tick(ctx context.Context, state orchestrator.ReactorState) (orchestrator.ReactorState, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Tick", ctx, state) + ret0, _ := ret[0].(orchestrator.ReactorState) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// Tick indicates an expected call of Tick. +func (mr *MockOwnerMockRecorder) Tick(ctx, state interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Tick", reflect.TypeOf((*MockOwner)(nil).Tick), ctx, state) +} + +// TriggerRebalance mocks base method. +func (m *MockOwner) TriggerRebalance(cfID model.ChangeFeedID, done chan<- error) { + m.ctrl.T.Helper() + m.ctrl.Call(m, "TriggerRebalance", cfID, done) +} + +// TriggerRebalance indicates an expected call of TriggerRebalance. +func (mr *MockOwnerMockRecorder) TriggerRebalance(cfID, done interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "TriggerRebalance", reflect.TypeOf((*MockOwner)(nil).TriggerRebalance), cfID, done) +} + +// WriteDebugInfo mocks base method. +func (m *MockOwner) WriteDebugInfo(w io.Writer, done chan<- error) { + m.ctrl.T.Helper() + m.ctrl.Call(m, "WriteDebugInfo", w, done) +} + +// WriteDebugInfo indicates an expected call of WriteDebugInfo. 
+func (mr *MockOwnerMockRecorder) WriteDebugInfo(w, done interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "WriteDebugInfo", reflect.TypeOf((*MockOwner)(nil).WriteDebugInfo), w, done) +} diff --git a/cdc/owner/owner.go b/cdc/owner/owner.go index 53af624c721..8f662ef91aa 100644 --- a/cdc/owner/owner.go +++ b/cdc/owner/owner.go @@ -15,7 +15,6 @@ package owner import ( "context" - "fmt" "io" "math" "sync" @@ -51,30 +50,45 @@ const ( // captures with versions different from that of the owner const versionInconsistentLogRate = 1 +// Export field names for pretty printing. type ownerJob struct { - tp ownerJobType - changefeedID model.ChangeFeedID + Tp ownerJobType + ChangefeedID model.ChangeFeedID // for ManualSchedule only - targetCaptureID model.CaptureID + TargetCaptureID model.CaptureID // for ManualSchedule only - tableID model.TableID + TableID model.TableID // for Admin Job only - adminJob *model.AdminJob + AdminJob *model.AdminJob // for debug info only debugInfoWriter io.Writer // for status provider - query *ownerQuery + query *OwnerQuery - done chan struct{} + done chan<- error } -// Owner manages many changefeeds -// All public functions are THREAD-SAFE, except for Tick, Tick is only used for etcd worker -type Owner struct { +// Owner manages the TiCDC cluster. +// +// The interface is thread-safe, except for Tick, which is only used by the etcd worker. 
+type Owner interface { + orchestrator.Reactor + EnqueueJob(adminJob model.AdminJob, done chan<- error) + TriggerRebalance(cfID model.ChangeFeedID, done chan<- error) + ManualSchedule( + cfID model.ChangeFeedID, toCapture model.CaptureID, + tableID model.TableID, done chan<- error, + ) + WriteDebugInfo(w io.Writer, done chan<- error) + Query(query *OwnerQuery, done chan<- error) + AsyncStop() +} + +type ownerImpl struct { changefeeds map[model.ChangeFeedID]*changefeed captures map[model.CaptureID]*model.CaptureInfo @@ -88,15 +102,16 @@ type Owner struct { closed int32 // bootstrapped specifies whether the owner has been initialized. // This will only be done when the owner starts the first Tick. - // NOTICE: Do not use it in a method other than tick unexpectedly, as it is not a thread-safe value. + // NOTICE: Do not use it in a method other than tick unexpectedly, + // as it is not a thread-safe value. bootstrapped bool newChangefeed func(id model.ChangeFeedID, gcManager gc.Manager) *changefeed } // NewOwner creates a new Owner -func NewOwner(pdClient pd.Client) *Owner { - return &Owner{ +func NewOwner(pdClient pd.Client) Owner { + return &ownerImpl{ changefeeds: make(map[model.ChangeFeedID]*changefeed), gcManager: gc.NewManager(pdClient), lastTickTime: time.Now(), @@ -110,8 +125,8 @@ func NewOwner4Test( newDDLPuller func(ctx cdcContext.Context, startTs uint64) (DDLPuller, error), newSink func() DDLSink, pdClient pd.Client, -) *Owner { - o := NewOwner(pdClient) +) Owner { + o := NewOwner(pdClient).(*ownerImpl) // Most tests do not need to test bootstrap. 
o.bootstrapped = true o.newChangefeed = func(id model.ChangeFeedID, gcManager gc.Manager) *changefeed { @@ -121,7 +136,7 @@ func NewOwner4Test( } // Tick implements the Reactor interface -func (o *Owner) Tick(stdCtx context.Context, rawState orchestrator.ReactorState) (nextState orchestrator.ReactorState, err error) { +func (o *ownerImpl) Tick(stdCtx context.Context, rawState orchestrator.ReactorState) (nextState orchestrator.ReactorState, err error) { failpoint.Inject("owner-run-with-error", func() { failpoint.Return(nil, errors.New("owner run with injected error")) }) @@ -142,7 +157,7 @@ func (o *Owner) Tick(stdCtx context.Context, rawState orchestrator.ReactorState) // when there are different versions of cdc nodes in the cluster, // the admin job may not be processed all the time. And http api relies on // admin job, which will cause all http api unavailable. - o.handleJobs() + o.HandleJobs() if !o.clusterVersionConsistent(state.Captures) { return state, nil @@ -201,58 +216,65 @@ func (o *Owner) Tick(stdCtx context.Context, rawState orchestrator.ReactorState) } // EnqueueJob enqueues an admin job into an internal queue, and the Owner will handle the job in the next tick -func (o *Owner) EnqueueJob(adminJob model.AdminJob) { +// `done` must be buffered to prevernt blocking owner. +func (o *ownerImpl) EnqueueJob(adminJob model.AdminJob, done chan<- error) { o.pushOwnerJob(&ownerJob{ - tp: ownerJobTypeAdminJob, - adminJob: &adminJob, - changefeedID: adminJob.CfID, - done: make(chan struct{}), + Tp: ownerJobTypeAdminJob, + AdminJob: &adminJob, + ChangefeedID: adminJob.CfID, + done: done, }) } // TriggerRebalance triggers a rebalance for the specified changefeed -func (o *Owner) TriggerRebalance(cfID model.ChangeFeedID) { +// `done` must be buffered to prevernt blocking owner. 
+func (o *ownerImpl) TriggerRebalance(cfID model.ChangeFeedID, done chan<- error) { o.pushOwnerJob(&ownerJob{ - tp: ownerJobTypeRebalance, - changefeedID: cfID, - done: make(chan struct{}), + Tp: ownerJobTypeRebalance, + ChangefeedID: cfID, + done: done, }) } // ManualSchedule moves a table from a capture to another capture -func (o *Owner) ManualSchedule(cfID model.ChangeFeedID, toCapture model.CaptureID, tableID model.TableID) { +// `done` must be buffered to prevernt blocking owner. +func (o *ownerImpl) ManualSchedule( + cfID model.ChangeFeedID, toCapture model.CaptureID, tableID model.TableID, + done chan<- error, +) { o.pushOwnerJob(&ownerJob{ - tp: ownerJobTypeManualSchedule, - changefeedID: cfID, - targetCaptureID: toCapture, - tableID: tableID, - done: make(chan struct{}), + Tp: ownerJobTypeManualSchedule, + ChangefeedID: cfID, + TargetCaptureID: toCapture, + TableID: tableID, + done: done, }) } // WriteDebugInfo writes debug info into the specified http writer -func (o *Owner) WriteDebugInfo(w io.Writer) { - timeout := time.Second * 3 - done := make(chan struct{}) +func (o *ownerImpl) WriteDebugInfo(w io.Writer, done chan<- error) { o.pushOwnerJob(&ownerJob{ - tp: ownerJobTypeDebugInfo, + Tp: ownerJobTypeDebugInfo, debugInfoWriter: w, done: done, }) - // wait the debug info printed - select { - case <-done: - case <-time.After(timeout): - fmt.Fprintf(w, "failed to print debug info for owner\n") - } +} + +// Query writes debug info into the specified http writer +func (o *ownerImpl) Query(query *OwnerQuery, done chan<- error) { + o.pushOwnerJob(&ownerJob{ + Tp: ownerJobTypeQuery, + query: query, + done: done, + }) } // AsyncStop stops the owner asynchronously -func (o *Owner) AsyncStop() { +func (o *ownerImpl) AsyncStop() { atomic.StoreInt32(&o.closed, 1) } -func (o *Owner) cleanUpChangefeed(state *orchestrator.ChangefeedReactorState) { +func (o *ownerImpl) cleanUpChangefeed(state *orchestrator.ChangefeedReactorState) { state.PatchInfo(func(info 
*model.ChangeFeedInfo) (*model.ChangeFeedInfo, bool, error) { return nil, info != nil, nil }) @@ -277,7 +299,7 @@ func (o *Owner) cleanUpChangefeed(state *orchestrator.ChangefeedReactorState) { } // Bootstrap checks if the state contains incompatible or incorrect information and tries to fix it. -func (o *Owner) Bootstrap(state *orchestrator.GlobalReactorState) { +func (o *ownerImpl) Bootstrap(state *orchestrator.GlobalReactorState) { log.Info("Start bootstrapping") fixChangefeedInfos(state) } @@ -297,7 +319,7 @@ func fixChangefeedInfos(state *orchestrator.GlobalReactorState) { } } -func (o *Owner) updateMetrics(state *orchestrator.GlobalReactorState) { +func (o *ownerImpl) updateMetrics(state *orchestrator.GlobalReactorState) { // Keep the value of prometheus expression `rate(counter)` = 1 // Please also change alert rule in ticdc.rules.yml when change the expression value. now := time.Now() @@ -321,7 +343,7 @@ func (o *Owner) updateMetrics(state *orchestrator.GlobalReactorState) { } } -func (o *Owner) clusterVersionConsistent(captures map[model.CaptureID]*model.CaptureInfo) bool { +func (o *ownerImpl) clusterVersionConsistent(captures map[model.CaptureID]*model.CaptureInfo) bool { myVersion := version.ReleaseVersion for _, capture := range captures { if myVersion != capture.Version { @@ -335,24 +357,28 @@ func (o *Owner) clusterVersionConsistent(captures map[model.CaptureID]*model.Cap return true } -func (o *Owner) handleJobs() { +// HandleJobs handles changefeed admin jobs and others. +// Exported for tests. 
+func (o *ownerImpl) HandleJobs() { jobs := o.takeOwnerJobs() for _, job := range jobs { - changefeedID := job.changefeedID + changefeedID := job.ChangefeedID cfReactor, exist := o.changefeeds[changefeedID] - if !exist && job.tp != ownerJobTypeQuery { + if !exist && job.Tp != ownerJobTypeQuery { log.Warn("changefeed not found when handle a job", zap.Reflect("job", job)) + job.done <- cerror.ErrChangeFeedNotExists.FastGenByArgs(job.ChangefeedID) + close(job.done) continue } - switch job.tp { + switch job.Tp { case ownerJobTypeAdminJob: - cfReactor.feedStateManager.PushAdminJob(job.adminJob) + cfReactor.feedStateManager.PushAdminJob(job.AdminJob) case ownerJobTypeManualSchedule: - cfReactor.scheduler.MoveTable(job.tableID, job.targetCaptureID) + cfReactor.scheduler.MoveTable(job.TableID, job.TargetCaptureID) case ownerJobTypeRebalance: cfReactor.scheduler.Rebalance() case ownerJobTypeQuery: - o.handleQueries(job.query) + job.done <- o.handleQueries(job.query) case ownerJobTypeDebugInfo: // TODO: implement this function } @@ -360,9 +386,9 @@ func (o *Owner) handleJobs() { } } -func (o *Owner) handleQueries(query *ownerQuery) { - switch query.tp { - case ownerQueryAllChangeFeedStatuses: +func (o *ownerImpl) handleQueries(query *OwnerQuery) error { + switch query.Tp { + case OwnerQueryAllChangeFeedStatuses: ret := map[model.ChangeFeedID]*model.ChangeFeedStatus{} for cfID, cfReactor := range o.changefeeds { ret[cfID] = &model.ChangeFeedStatus{} @@ -376,8 +402,8 @@ func (o *Owner) handleQueries(query *ownerQuery) { ret[cfID].CheckpointTs = cfReactor.state.Status.CheckpointTs ret[cfID].AdminJobType = cfReactor.state.Status.AdminJobType } - query.data = ret - case ownerQueryAllChangeFeedInfo: + query.Data = ret + case OwnerQueryAllChangeFeedInfo: ret := map[model.ChangeFeedID]*model.ChangeFeedInfo{} for cfID, cfReactor := range o.changefeeds { if cfReactor.state == nil { @@ -390,20 +416,17 @@ func (o *Owner) handleQueries(query *ownerQuery) { var err error ret[cfID], err = 
cfReactor.state.Info.Clone() if err != nil { - query.err = errors.Trace(err) - return + return errors.Trace(err) } } - query.data = ret - case ownerQueryAllTaskStatuses: - cfReactor, ok := o.changefeeds[query.changeFeedID] + query.Data = ret + case OwnerQueryAllTaskStatuses: + cfReactor, ok := o.changefeeds[query.ChangeFeedID] if !ok { - query.err = cerror.ErrChangeFeedNotExists.GenWithStackByArgs(query.changeFeedID) - return + return cerror.ErrChangeFeedNotExists.GenWithStackByArgs(query.ChangeFeedID) } if cfReactor.state == nil { - query.err = cerror.ErrChangeFeedNotExists.GenWithStackByArgs(query.changeFeedID) - return + return cerror.ErrChangeFeedNotExists.GenWithStackByArgs(query.ChangeFeedID) } var ret map[model.CaptureID]*model.TaskStatus @@ -412,8 +435,7 @@ func (o *Owner) handleQueries(query *ownerQuery) { var err error ret, err = provider.GetTaskStatuses() if err != nil { - query.err = errors.Trace(err) - return + return errors.Trace(err) } } else { ret = map[model.CaptureID]*model.TaskStatus{} @@ -421,12 +443,11 @@ func (o *Owner) handleQueries(query *ownerQuery) { ret[captureID] = taskStatus.Clone() } } - query.data = ret - case ownerQueryTaskPositions: - cfReactor, ok := o.changefeeds[query.changeFeedID] + query.Data = ret + case OwnerQueryTaskPositions: + cfReactor, ok := o.changefeeds[query.ChangeFeedID] if !ok { - query.err = cerror.ErrChangeFeedNotExists.GenWithStackByArgs(query.changeFeedID) - return + return cerror.ErrChangeFeedNotExists.GenWithStackByArgs(query.ChangeFeedID) } var ret map[model.CaptureID]*model.TaskPosition @@ -435,21 +456,19 @@ func (o *Owner) handleQueries(query *ownerQuery) { var err error ret, err = provider.GetTaskPositions() if err != nil { - query.err = errors.Trace(err) - return + return errors.Trace(err) } } else { if cfReactor.state == nil { - query.err = cerror.ErrChangeFeedNotExists.GenWithStackByArgs(query.changeFeedID) - return + return cerror.ErrChangeFeedNotExists.GenWithStackByArgs(query.ChangeFeedID) } ret = 
map[model.CaptureID]*model.TaskPosition{} for captureID, taskPosition := range cfReactor.state.TaskPositions { ret[captureID] = taskPosition.Clone() } } - query.data = ret - case ownerQueryProcessors: + query.Data = ret + case OwnerQueryProcessors: var ret []*model.ProcInfoSnap for cfID, cfReactor := range o.changefeeds { if cfReactor.state == nil { @@ -462,8 +481,8 @@ func (o *Owner) handleQueries(query *ownerQuery) { }) } } - query.data = ret - case ownerQueryCaptures: + query.Data = ret + case OwnerQueryCaptures: var ret []*model.CaptureInfo for _, captureInfo := range o.captures { ret = append(ret, &model.CaptureInfo{ @@ -472,11 +491,12 @@ func (o *Owner) handleQueries(query *ownerQuery) { Version: captureInfo.Version, }) } - query.data = ret + query.Data = ret } + return nil } -func (o *Owner) takeOwnerJobs() []*ownerJob { +func (o *ownerImpl) takeOwnerJobs() []*ownerJob { o.ownerJobQueueMu.Lock() defer o.ownerJobQueueMu.Unlock() @@ -485,13 +505,13 @@ func (o *Owner) takeOwnerJobs() []*ownerJob { return jobs } -func (o *Owner) pushOwnerJob(job *ownerJob) { +func (o *ownerImpl) pushOwnerJob(job *ownerJob) { o.ownerJobQueueMu.Lock() defer o.ownerJobQueueMu.Unlock() o.ownerJobQueue = append(o.ownerJobQueue, job) } -func (o *Owner) updateGCSafepoint( +func (o *ownerImpl) updateGCSafepoint( ctx context.Context, state *orchestrator.GlobalReactorState, ) error { forceUpdate := false @@ -524,6 +544,6 @@ func (o *Owner) updateGCSafepoint( } // StatusProvider returns a StatusProvider -func (o *Owner) StatusProvider() StatusProvider { +func (o *ownerImpl) StatusProvider() StatusProvider { return &ownerStatusProvider{owner: o} } diff --git a/cdc/owner/owner_test.go b/cdc/owner/owner_test.go index bcca13f5dad..df23d417e07 100644 --- a/cdc/owner/owner_test.go +++ b/cdc/owner/owner_test.go @@ -44,7 +44,7 @@ func (m *mockManager) CheckStaleCheckpointTs( var _ gc.Manager = (*mockManager)(nil) -func createOwner4Test(ctx cdcContext.Context, t *testing.T) (*Owner, 
*orchestrator.GlobalReactorState, *orchestrator.ReactorStateTester) { +func createOwner4Test(ctx cdcContext.Context, t *testing.T) (*ownerImpl, *orchestrator.GlobalReactorState, *orchestrator.ReactorStateTester) { ctx.GlobalVars().PDClient = &gc.MockPDClient{ UpdateServiceGCSafePointFunc: func(ctx context.Context, serviceID string, ttl int64, safePoint uint64) (uint64, error) { return safePoint, nil @@ -68,7 +68,7 @@ func createOwner4Test(ctx cdcContext.Context, t *testing.T) (*Owner, *orchestrat captureBytes, err := ctx.GlobalVars().CaptureInfo.Marshal() require.Nil(t, err) tester.MustUpdate(cdcKey.String(), captureBytes) - return owner, state, tester + return owner.(*ownerImpl), state, tester } func TestCreateRemoveChangefeed(t *testing.T) { @@ -128,9 +128,11 @@ func TestCreateRemoveChangefeed(t *testing.T) { require.NotNil(t, err) // this tick create remove changefeed patches - owner.EnqueueJob(removeJob) + done := make(chan error, 1) + owner.EnqueueJob(removeJob, done) _, err = owner.Tick(ctx, state) require.Nil(t, err) + require.Nil(t, <-done) // apply patches and update owner's in memory changefeed states tester.MustApplyPatches() @@ -162,17 +164,20 @@ func TestStopChangefeed(t *testing.T) { require.Nil(t, err) require.Contains(t, owner.changefeeds, changefeedID) // remove changefeed forcibly + done := make(chan error, 1) owner.EnqueueJob(model.AdminJob{ CfID: changefeedID, Type: model.AdminRemove, Opts: &model.AdminJobOption{ ForceRemove: true, }, - }) + }, done) // this tick to clean the leak info fo the removed changefeed _, err = owner.Tick(ctx, state) require.Nil(t, err) + require.Nil(t, <-done) + // this tick to remove the changefeed state in memory tester.MustApplyPatches() _, err = owner.Tick(ctx, state) @@ -301,15 +306,19 @@ func TestAdminJob(t *testing.T) { ctx, cancel := cdcContext.WithCancel(ctx) defer cancel() + done1 := make(chan error, 1) owner, _, _ := createOwner4Test(ctx, t) owner.EnqueueJob(model.AdminJob{ CfID: "test-changefeed1", Type: 
model.AdminResume, - }) - owner.TriggerRebalance("test-changefeed2") - owner.ManualSchedule("test-changefeed3", "test-caputre1", 10) + }, done1) + done2 := make(chan error, 1) + owner.TriggerRebalance("test-changefeed2", done2) + done3 := make(chan error, 1) + owner.ManualSchedule("test-changefeed3", "test-caputre1", 10, done3) + done4 := make(chan error, 1) var buf bytes.Buffer - owner.WriteDebugInfo(&buf) + owner.WriteDebugInfo(&buf, done4) // remove job.done, it's hard to check deep equals jobs := owner.takeOwnerJobs() @@ -320,22 +329,22 @@ func TestAdminJob(t *testing.T) { } require.Equal(t, jobs, []*ownerJob{ { - tp: ownerJobTypeAdminJob, - adminJob: &model.AdminJob{ + Tp: ownerJobTypeAdminJob, + AdminJob: &model.AdminJob{ CfID: "test-changefeed1", Type: model.AdminResume, }, - changefeedID: "test-changefeed1", + ChangefeedID: "test-changefeed1", }, { - tp: ownerJobTypeRebalance, - changefeedID: "test-changefeed2", + Tp: ownerJobTypeRebalance, + ChangefeedID: "test-changefeed2", }, { - tp: ownerJobTypeManualSchedule, - changefeedID: "test-changefeed3", - targetCaptureID: "test-caputre1", - tableID: 10, + Tp: ownerJobTypeManualSchedule, + ChangefeedID: "test-changefeed3", + TargetCaptureID: "test-caputre1", + TableID: 10, }, { - tp: ownerJobTypeDebugInfo, + Tp: ownerJobTypeDebugInfo, debugInfoWriter: &buf, }, }) @@ -344,7 +353,7 @@ func TestAdminJob(t *testing.T) { func TestUpdateGCSafePoint(t *testing.T) { mockPDClient := &gc.MockPDClient{} - o := NewOwner(mockPDClient) + o := NewOwner(mockPDClient).(*ownerImpl) o.gcManager = gc.NewManager(mockPDClient) ctx := cdcContext.NewBackendContext4Test(true) ctx, cancel := cdcContext.WithCancel(ctx) diff --git a/cdc/owner/status_provider.go b/cdc/owner/status_provider.go index 22ecdb2c072..5e1dd6fd6fa 100644 --- a/cdc/owner/status_provider.go +++ b/cdc/owner/status_provider.go @@ -50,37 +50,41 @@ type StatusProvider interface { GetCaptures(ctx context.Context) ([]*model.CaptureInfo, error) } -type ownerQueryType int32 
+type OwnerQueryType int32 const ( - ownerQueryAllChangeFeedStatuses = iota - ownerQueryAllChangeFeedInfo - ownerQueryAllTaskStatuses - ownerQueryTaskPositions - ownerQueryProcessors - ownerQueryCaptures + OwnerQueryAllChangeFeedStatuses = iota + OwnerQueryAllChangeFeedInfo + OwnerQueryAllTaskStatuses + OwnerQueryTaskPositions + OwnerQueryProcessors + OwnerQueryCaptures ) -type ownerQuery struct { - tp ownerQueryType - changeFeedID model.ChangeFeedID +type OwnerQuery struct { + Tp OwnerQueryType + ChangeFeedID model.ChangeFeedID - data interface{} - err error + Data interface{} +} + +// NewStatusProvider returns a new StatusProvider for the owner. +func NewStatusProvider(owner Owner) StatusProvider { + return &ownerStatusProvider{owner: owner} } type ownerStatusProvider struct { - owner *Owner + owner Owner } func (p *ownerStatusProvider) GetAllChangeFeedStatuses(ctx context.Context) (map[model.ChangeFeedID]*model.ChangeFeedStatus, error) { - query := &ownerQuery{ - tp: ownerQueryAllChangeFeedStatuses, + query := &OwnerQuery{ + Tp: OwnerQueryAllChangeFeedStatuses, } if err := p.sendQueryToOwner(ctx, query); err != nil { return nil, errors.Trace(err) } - return query.data.(map[model.ChangeFeedID]*model.ChangeFeedStatus), nil + return query.Data.(map[model.ChangeFeedID]*model.ChangeFeedStatus), nil } func (p *ownerStatusProvider) GetChangeFeedStatus(ctx context.Context, changefeedID model.ChangeFeedID) (*model.ChangeFeedStatus, error) { @@ -96,13 +100,13 @@ func (p *ownerStatusProvider) GetChangeFeedStatus(ctx context.Context, changefee } func (p *ownerStatusProvider) GetAllChangeFeedInfo(ctx context.Context) (map[model.ChangeFeedID]*model.ChangeFeedInfo, error) { - query := &ownerQuery{ - tp: ownerQueryAllChangeFeedInfo, + query := &OwnerQuery{ + Tp: OwnerQueryAllChangeFeedInfo, } if err := p.sendQueryToOwner(ctx, query); err != nil { return nil, errors.Trace(err) } - return query.data.(map[model.ChangeFeedID]*model.ChangeFeedInfo), nil + return 
query.Data.(map[model.ChangeFeedID]*model.ChangeFeedInfo), nil } func (p *ownerStatusProvider) GetChangeFeedInfo(ctx context.Context, changefeedID model.ChangeFeedID) (*model.ChangeFeedInfo, error) { @@ -118,64 +122,59 @@ func (p *ownerStatusProvider) GetChangeFeedInfo(ctx context.Context, changefeedI } func (p *ownerStatusProvider) GetAllTaskStatuses(ctx context.Context, changefeedID model.ChangeFeedID) (map[model.CaptureID]*model.TaskStatus, error) { - query := &ownerQuery{ - tp: ownerQueryAllTaskStatuses, - changeFeedID: changefeedID, + query := &OwnerQuery{ + Tp: OwnerQueryAllTaskStatuses, + ChangeFeedID: changefeedID, } if err := p.sendQueryToOwner(ctx, query); err != nil { return nil, errors.Trace(err) } - return query.data.(map[model.CaptureID]*model.TaskStatus), nil + return query.Data.(map[model.CaptureID]*model.TaskStatus), nil } func (p *ownerStatusProvider) GetTaskPositions(ctx context.Context, changefeedID model.ChangeFeedID) (map[model.CaptureID]*model.TaskPosition, error) { - query := &ownerQuery{ - tp: ownerQueryTaskPositions, - changeFeedID: changefeedID, + query := &OwnerQuery{ + Tp: OwnerQueryTaskPositions, + ChangeFeedID: changefeedID, } if err := p.sendQueryToOwner(ctx, query); err != nil { return nil, errors.Trace(err) } - return query.data.(map[model.CaptureID]*model.TaskPosition), nil + return query.Data.(map[model.CaptureID]*model.TaskPosition), nil } func (p *ownerStatusProvider) GetProcessors(ctx context.Context) ([]*model.ProcInfoSnap, error) { - query := &ownerQuery{ - tp: ownerQueryProcessors, + query := &OwnerQuery{ + Tp: OwnerQueryProcessors, } if err := p.sendQueryToOwner(ctx, query); err != nil { return nil, errors.Trace(err) } - return query.data.([]*model.ProcInfoSnap), nil + return query.Data.([]*model.ProcInfoSnap), nil } func (p *ownerStatusProvider) GetCaptures(ctx context.Context) ([]*model.CaptureInfo, error) { - query := &ownerQuery{ - tp: ownerQueryCaptures, + query := &OwnerQuery{ + Tp: OwnerQueryCaptures, } if err := 
p.sendQueryToOwner(ctx, query); err != nil { return nil, errors.Trace(err) } - return query.data.([]*model.CaptureInfo), nil + return query.Data.([]*model.CaptureInfo), nil } -func (p *ownerStatusProvider) sendQueryToOwner(ctx context.Context, query *ownerQuery) error { - doneCh := make(chan struct{}) - job := &ownerJob{ - tp: ownerJobTypeQuery, - query: query, - done: doneCh, - } - p.owner.pushOwnerJob(job) +func (p *ownerStatusProvider) sendQueryToOwner(ctx context.Context, query *OwnerQuery) error { + doneCh := make(chan error, 1) + p.owner.Query(query, doneCh) select { case <-ctx.Done(): return errors.Trace(ctx.Err()) - case <-doneCh: + case err := <-doneCh: + if err != nil { + return errors.Trace(err) + } } - if query.err != nil { - return errors.Trace(query.err) - } return nil } diff --git a/cdc/processor/manager.go b/cdc/processor/manager.go index 935e484bb0e..56132feba32 100644 --- a/cdc/processor/manager.go +++ b/cdc/processor/manager.go @@ -41,7 +41,7 @@ const ( type command struct { tp commandTp payload interface{} - done chan struct{} + done chan<- error } // Manager is a manager of processor, which maintains the state and behavior of processors @@ -141,31 +141,37 @@ func (m *Manager) closeProcessor(changefeedID model.ChangeFeedID) { // AsyncClose sends a close signal to Manager and closing all processors func (m *Manager) AsyncClose() { - m.sendCommand(commandTpClose, nil) + timeout := 3 * time.Second + ctx, cancel := context.WithTimeout(context.TODO(), timeout) + defer cancel() + done := make(chan error, 1) + m.sendCommand(ctx, commandTpClose, nil, done) } // WriteDebugInfo write the debug info to Writer -func (m *Manager) WriteDebugInfo(w io.Writer) { - timeout := time.Second * 3 - done := m.sendCommand(commandTpWriteDebugInfo, w) - // wait the debug info printed - select { - case <-done: - case <-time.After(timeout): - fmt.Fprintf(w, "failed to print debug info for processor\n") +func (m *Manager) WriteDebugInfo( + ctx context.Context, w io.Writer, 
done chan<- error, +) { + err := m.sendCommand(ctx, commandTpWriteDebugInfo, w, done) + if err != nil { + log.Warn("send command commandTpWriteDebugInfo failed", zap.Error(err)) } } -func (m *Manager) sendCommand(tp commandTp, payload interface{}) chan struct{} { - timeout := time.Second * 3 - cmd := &command{tp: tp, payload: payload, done: make(chan struct{})} +// sendCommands sends command to manager. +// `done` is closed upon command completion or sendCommand returns error. +func (m *Manager) sendCommand( + ctx context.Context, tp commandTp, payload interface{}, done chan<- error, +) error { + cmd := &command{tp: tp, payload: payload, done: done} select { + case <-ctx.Done(): + close(done) + return errors.Trace(ctx.Err()) case m.commandQueue <- cmd: - case <-time.After(timeout): - close(cmd.done) - log.Warn("the command queue is full, ignore this command", zap.Any("command", cmd)) + // FIXME: signal EtcdWorker to handle commands ASAP. } - return cmd.done + return nil } func (m *Manager) handleCommand() error { @@ -181,6 +187,7 @@ func (m *Manager) handleCommand() error { for changefeedID := range m.processors { m.closeProcessor(changefeedID) } + // FIXME: we should drain command queue and signal callers an error. 
return cerrors.ErrReactorFinished case commandTpWriteDebugInfo: w := cmd.payload.(io.Writer) diff --git a/cdc/processor/manager_test.go b/cdc/processor/manager_test.go index 8415af305aa..faa3d88b88a 100644 --- a/cdc/processor/manager_test.go +++ b/cdc/processor/manager_test.go @@ -15,6 +15,7 @@ package processor import ( "bytes" + "context" "fmt" "math" "testing" @@ -170,8 +171,10 @@ func TestDebugInfo(t *testing.T) { s.tester.MustApplyPatches() } }() + doneM := make(chan error, 1) buf := bytes.NewBufferString("") - s.manager.WriteDebugInfo(buf) + s.manager.WriteDebugInfo(ctx, buf, doneM) + <-doneM require.Greater(t, len(buf.String()), 0) s.manager.AsyncClose() <-done @@ -218,3 +221,19 @@ func TestClose(t *testing.T) { s.tester.MustApplyPatches() require.Len(t, s.manager.processors, 0) } + +func TestSendCommandError(t *testing.T) { + m := NewManager() + ctx, cancel := context.WithCancel(context.TODO()) + cancel() + // Use unbuffered channel to stable test. + m.commandQueue = make(chan *command, 0) + done := make(chan error, 1) + err := m.sendCommand(ctx, commandTpClose, nil, done) + require.Error(t, err) + select { + case <-done: + case <-time.After(time.Second): + require.FailNow(t, "done must be closed") + } +} From ed487cc282bf10966dc2654b4d0cc9f2aecb321d Mon Sep 17 00:00:00 2001 From: Neil Shen Date: Thu, 27 Jan 2022 14:44:37 +0800 Subject: [PATCH 2/6] errors(ticdc): remove confusing "key" in error message Signed-off-by: Neil Shen --- errors.toml | 10 +++++----- pkg/errors/errors.go | 10 +++++----- 2 files changed, 10 insertions(+), 10 deletions(-) diff --git a/errors.toml b/errors.toml index 8726dc63a35..7e918972366 100755 --- a/errors.toml +++ b/errors.toml @@ -103,7 +103,7 @@ campaign owner failed ["CDC:ErrCaptureNotExist"] error = ''' -capture not exists, key: %s +capture not exists, %s ''' ["CDC:ErrCaptureRegister"] @@ -123,12 +123,12 @@ capture suicide ["CDC:ErrChangeFeedAlreadyExists"] error = ''' -changefeed already exists, key: %s +changefeed already 
exists, %s ''' ["CDC:ErrChangeFeedNotExists"] error = ''' -changefeed not exists, key: %s +changefeed not exists, %s ''' ["CDC:ErrChangefeedAbnormalState"] @@ -978,12 +978,12 @@ fail to create changefeed because target-ts %d is earlier than start-ts %d ["CDC:ErrTaskPositionNotExists"] error = ''' -task position not exists, key: %s +task position not exists, %s ''' ["CDC:ErrTaskStatusNotExists"] error = ''' -task status not exists, key: %s +task status not exists, %s ''' ["CDC:ErrTiKVEventFeed"] diff --git a/pkg/errors/errors.go b/pkg/errors/errors.go index 5acab118024..94c26419d7d 100644 --- a/pkg/errors/errors.go +++ b/pkg/errors/errors.go @@ -21,11 +21,11 @@ import ( var ( // kv related errors ErrWriteTsConflict = errors.Normalize("write ts conflict", errors.RFCCodeText("CDC:ErrWriteTsConflict")) - ErrChangeFeedNotExists = errors.Normalize("changefeed not exists, key: %s", errors.RFCCodeText("CDC:ErrChangeFeedNotExists")) - ErrChangeFeedAlreadyExists = errors.Normalize("changefeed already exists, key: %s", errors.RFCCodeText("CDC:ErrChangeFeedAlreadyExists")) - ErrTaskStatusNotExists = errors.Normalize("task status not exists, key: %s", errors.RFCCodeText("CDC:ErrTaskStatusNotExists")) - ErrTaskPositionNotExists = errors.Normalize("task position not exists, key: %s", errors.RFCCodeText("CDC:ErrTaskPositionNotExists")) - ErrCaptureNotExist = errors.Normalize("capture not exists, key: %s", errors.RFCCodeText("CDC:ErrCaptureNotExist")) + ErrChangeFeedNotExists = errors.Normalize("changefeed not exists, %s", errors.RFCCodeText("CDC:ErrChangeFeedNotExists")) + ErrChangeFeedAlreadyExists = errors.Normalize("changefeed already exists, %s", errors.RFCCodeText("CDC:ErrChangeFeedAlreadyExists")) + ErrTaskStatusNotExists = errors.Normalize("task status not exists, %s", errors.RFCCodeText("CDC:ErrTaskStatusNotExists")) + ErrTaskPositionNotExists = errors.Normalize("task position not exists, %s", errors.RFCCodeText("CDC:ErrTaskPositionNotExists")) + ErrCaptureNotExist = 
errors.Normalize("capture not exists, %s", errors.RFCCodeText("CDC:ErrCaptureNotExist")) ErrGetAllStoresFailed = errors.Normalize("get stores from pd failed", errors.RFCCodeText("CDC:ErrGetAllStoresFailed")) ErrMetaListDatabases = errors.Normalize("meta store list databases", errors.RFCCodeText("CDC:ErrMetaListDatabases")) ErrGRPCDialFailed = errors.Normalize("grpc dial failed", errors.RFCCodeText("CDC:ErrGRPCDialFailed")) From fb216eb18014694e9162f72e259454b189a8dbe4 Mon Sep 17 00:00:00 2001 From: Neil Shen Date: Thu, 27 Jan 2022 15:20:24 +0800 Subject: [PATCH 3/6] address lints Signed-off-by: Neil Shen --- cdc/api/open.go | 10 +++---- cdc/api/open_test.go | 54 ++++++++++++++++++++++++++++------- cdc/http_test.go | 4 +-- cdc/owner/mock/owner_mock.go | 2 +- cdc/owner/owner.go | 20 ++++++------- cdc/owner/status_provider.go | 52 +++++++++++++++++++-------------- cdc/processor/manager.go | 5 +++- cdc/processor/manager_test.go | 2 +- cdc/server_test.go | 4 +-- scripts/check-copyright.sh | 2 +- 10 files changed, 99 insertions(+), 56 deletions(-) diff --git a/cdc/api/open.go b/cdc/api/open.go index 8fbb1388889..be658de41c8 100644 --- a/cdc/api/open.go +++ b/cdc/api/open.go @@ -326,7 +326,7 @@ func (h *openAPI) PauseChangefeed(c *gin.Context) { return nil }) if err := waitDone(ctx, done); err != nil { - c.Error(err) + _ = c.Error(err) return } c.Status(http.StatusAccepted) @@ -373,7 +373,7 @@ func (h *openAPI) ResumeChangefeed(c *gin.Context) { return nil }) if err := waitDone(ctx, done); err != nil { - c.Error(err) + _ = c.Error(err) return } c.Status(http.StatusAccepted) @@ -482,7 +482,7 @@ func (h *openAPI) RemoveChangefeed(c *gin.Context) { return nil }) if err := waitDone(ctx, done); err != nil { - c.Error(err) + _ = c.Error(err) return } c.Status(http.StatusAccepted) @@ -525,7 +525,7 @@ func (h *openAPI) RebalanceTable(c *gin.Context) { return nil }) if err := waitDone(ctx, done); err != nil { - c.Error(err) + _ = c.Error(err) return } c.Status(http.StatusAccepted) 
@@ -584,7 +584,7 @@ func (h *openAPI) MoveTable(c *gin.Context) { return nil }) if err := waitDone(ctx, done); err != nil { - c.Error(err) + _ = c.Error(err) return } c.Status(http.StatusAccepted) diff --git a/cdc/api/open_test.go b/cdc/api/open_test.go index f7cec1d2270..a9e71f13f11 100644 --- a/cdc/api/open_test.go +++ b/cdc/api/open_test.go @@ -109,7 +109,9 @@ func newStatusProvider() *mockStatusProvider { Return(map[model.CaptureID]*model.TaskStatus{captureID: {}}, nil) statusProvider.On("GetTaskPositions", mock.Anything). - Return(map[model.CaptureID]*model.TaskPosition{captureID: {Error: &model.RunningError{Message: "test"}}}, nil) + Return(map[model.CaptureID]*model.TaskPosition{ + captureID: {Error: &model.RunningError{Message: "test"}}, + }, nil) statusProvider.On("GetAllChangeFeedStatuses", mock.Anything). Return(map[model.ChangeFeedID]*model.ChangeFeedStatus{ @@ -239,7 +241,10 @@ func TestPauseChangefeed(t *testing.T) { require.Contains(t, respErr.Error, "changefeed not exists") // test pause changefeed failed - api = testCase{url: fmt.Sprintf("/api/v1/changefeeds/%s/pause", nonExistChangefeedID), method: "POST"} + api = testCase{ + url: fmt.Sprintf("/api/v1/changefeeds/%s/pause", nonExistChangefeedID), + method: "POST", + } w = httptest.NewRecorder() req, _ = http.NewRequest(api.method, api.url, nil) router.ServeHTTP(w, req) @@ -289,7 +294,10 @@ func TestResumeChangefeed(t *testing.T) { require.Contains(t, respErr.Error, "changefeed not exists") // test resume changefeed failed - api = testCase{url: fmt.Sprintf("/api/v1/changefeeds/%s/resume", nonExistChangefeedID), method: "POST"} + api = testCase{ + url: fmt.Sprintf("/api/v1/changefeeds/%s/resume", nonExistChangefeedID), + method: "POST", + } w = httptest.NewRecorder() req, _ = http.NewRequest(api.method, api.url, nil) router.ServeHTTP(w, req) @@ -339,7 +347,10 @@ func TestRemoveChangefeed(t *testing.T) { require.Contains(t, respErr.Error, "changefeed not exists") // test remove changefeed failed - 
api = testCase{url: fmt.Sprintf("/api/v1/changefeeds/%s", nonExistChangefeedID), method: "DELETE"} + api = testCase{ + url: fmt.Sprintf("/api/v1/changefeeds/%s", nonExistChangefeedID), + method: "DELETE", + } w = httptest.NewRecorder() req, _ = http.NewRequest(api.method, api.url, nil) router.ServeHTTP(w, req) @@ -364,7 +375,10 @@ func TestRebalanceTable(t *testing.T) { require.EqualValues(t, cfID, changeFeedID) close(done) }) - api := testCase{url: fmt.Sprintf("/api/v1/changefeeds/%s/tables/rebalance_table", changeFeedID), method: "POST"} + api := testCase{ + url: fmt.Sprintf("/api/v1/changefeeds/%s/tables/rebalance_table", changeFeedID), + method: "POST", + } w := httptest.NewRecorder() req, _ := http.NewRequest(api.method, api.url, nil) router.ServeHTTP(w, req) @@ -391,7 +405,10 @@ func TestRebalanceTable(t *testing.T) { require.Contains(t, respErr.Error, "changefeed not exists") // test rebalance table failed - api = testCase{url: fmt.Sprintf("/api/v1/changefeeds/%s/tables/rebalance_table", nonExistChangefeedID), method: "POST"} + api = testCase{ + url: fmt.Sprintf("/api/v1/changefeeds/%s/tables/rebalance_table", nonExistChangefeedID), + method: "POST", + } w = httptest.NewRecorder() req, _ = http.NewRequest(api.method, api.url, nil) router.ServeHTTP(w, req) @@ -429,7 +446,10 @@ func TestMoveTable(t *testing.T) { require.EqualValues(t, tableID, data.TableID) close(done) }) - api := testCase{url: fmt.Sprintf("/api/v1/changefeeds/%s/tables/move_table", changeFeedID), method: "POST"} + api := testCase{ + url: fmt.Sprintf("/api/v1/changefeeds/%s/tables/move_table", changeFeedID), + method: "POST", + } w := httptest.NewRecorder() req, _ := http.NewRequest(api.method, api.url, body) router.ServeHTTP(w, req) @@ -455,7 +475,10 @@ func TestMoveTable(t *testing.T) { done <- cerror.ErrChangeFeedNotExists.FastGenByArgs(cfID) close(done) }) - api = testCase{url: fmt.Sprintf("/api/v1/changefeeds/%s/tables/move_table", changeFeedID), method: "POST"} + api = testCase{ + url: 
fmt.Sprintf("/api/v1/changefeeds/%s/tables/move_table", changeFeedID), + method: "POST", + } w = httptest.NewRecorder() req, _ = http.NewRequest(api.method, api.url, body) router.ServeHTTP(w, req) @@ -466,7 +489,10 @@ func TestMoveTable(t *testing.T) { require.Contains(t, respErr.Error, "changefeed not exists") // test move table failed - api = testCase{url: fmt.Sprintf("/api/v1/changefeeds/%s/tables/move_table", nonExistChangefeedID), method: "POST"} + api = testCase{ + url: fmt.Sprintf("/api/v1/changefeeds/%s/tables/move_table", nonExistChangefeedID), + method: "POST", + } w = httptest.NewRecorder() req, _ = http.NewRequest(api.method, api.url, body) router.ServeHTTP(w, req) @@ -499,7 +525,10 @@ func TestGetProcessor(t *testing.T) { cp := capture.NewCapture4Test(mo) router := newRouter(cp, newStatusProvider()) // test get processor succeeded - api := testCase{url: fmt.Sprintf("/api/v1/processors/%s/%s", changeFeedID, captureID), method: "GET"} + api := testCase{ + url: fmt.Sprintf("/api/v1/processors/%s/%s", changeFeedID, captureID), + method: "GET", + } w := httptest.NewRecorder() req, _ := http.NewRequest(api.method, api.url, nil) router.ServeHTTP(w, req) @@ -510,7 +539,10 @@ func TestGetProcessor(t *testing.T) { require.Equal(t, "test", processorDetail.Error.Message) // test get processor fail due to capture ID error - api = testCase{url: fmt.Sprintf("/api/v1/processors/%s/%s", changeFeedID, "non-exist-capture"), method: "GET"} + api = testCase{ + url: fmt.Sprintf("/api/v1/processors/%s/%s", changeFeedID, "non-exist-capture"), + method: "GET", + } w = httptest.NewRecorder() req, _ = http.NewRequest(api.method, api.url, nil) router.ServeHTTP(w, req) diff --git a/cdc/http_test.go b/cdc/http_test.go index 39398a82639..6aaa47f3605 100644 --- a/cdc/http_test.go +++ b/cdc/http_test.go @@ -38,7 +38,7 @@ func (a *testCase) String() string { func TestPProfPath(t *testing.T) { router := gin.New() - RegisterRoutes(router, capture.NewCapture4Test(false), nil) + 
RegisterRoutes(router, capture.NewCapture4Test(nil), nil) apis := []*testCase{ {"/debug/pprof/", http.MethodGet}, @@ -63,7 +63,7 @@ func TestPProfPath(t *testing.T) { func TestHandleFailpoint(t *testing.T) { router := gin.New() - RegisterRoutes(router, capture.NewCapture4Test(false), nil) + RegisterRoutes(router, capture.NewCapture4Test(nil), nil) fp := "github.com/pingcap/tiflow/cdc/TestHandleFailpoint" uri := fmt.Sprintf("/debug/fail/%s", fp) body := bytes.NewReader([]byte("return(true)")) diff --git a/cdc/owner/mock/owner_mock.go b/cdc/owner/mock/owner_mock.go index dfef50ac0db..ccffc1727a3 100644 --- a/cdc/owner/mock/owner_mock.go +++ b/cdc/owner/mock/owner_mock.go @@ -75,7 +75,7 @@ func (mr *MockOwnerMockRecorder) ManualSchedule(cfID, toCapture, tableID, done i } // Query mocks base method. -func (m *MockOwner) Query(query *owner.OwnerQuery, done chan<- error) { +func (m *MockOwner) Query(query *owner.Query, done chan<- error) { m.ctrl.T.Helper() m.ctrl.Call(m, "Query", query, done) } diff --git a/cdc/owner/owner.go b/cdc/owner/owner.go index 8f662ef91aa..6b15f40eabf 100644 --- a/cdc/owner/owner.go +++ b/cdc/owner/owner.go @@ -67,7 +67,7 @@ type ownerJob struct { debugInfoWriter io.Writer // for status provider - query *OwnerQuery + query *Query done chan<- error } @@ -84,7 +84,7 @@ type Owner interface { tableID model.TableID, done chan<- error, ) WriteDebugInfo(w io.Writer, done chan<- error) - Query(query *OwnerQuery, done chan<- error) + Query(query *Query, done chan<- error) AsyncStop() } @@ -261,7 +261,7 @@ func (o *ownerImpl) WriteDebugInfo(w io.Writer, done chan<- error) { } // Query writes debug info into the specified http writer -func (o *ownerImpl) Query(query *OwnerQuery, done chan<- error) { +func (o *ownerImpl) Query(query *Query, done chan<- error) { o.pushOwnerJob(&ownerJob{ Tp: ownerJobTypeQuery, query: query, @@ -386,9 +386,9 @@ func (o *ownerImpl) HandleJobs() { } } -func (o *ownerImpl) handleQueries(query *OwnerQuery) error { +func (o 
*ownerImpl) handleQueries(query *Query) error { switch query.Tp { - case OwnerQueryAllChangeFeedStatuses: + case QueryAllChangeFeedStatuses: ret := map[model.ChangeFeedID]*model.ChangeFeedStatus{} for cfID, cfReactor := range o.changefeeds { ret[cfID] = &model.ChangeFeedStatus{} @@ -403,7 +403,7 @@ func (o *ownerImpl) handleQueries(query *OwnerQuery) error { ret[cfID].AdminJobType = cfReactor.state.Status.AdminJobType } query.Data = ret - case OwnerQueryAllChangeFeedInfo: + case QueryAllChangeFeedInfo: ret := map[model.ChangeFeedID]*model.ChangeFeedInfo{} for cfID, cfReactor := range o.changefeeds { if cfReactor.state == nil { @@ -420,7 +420,7 @@ func (o *ownerImpl) handleQueries(query *OwnerQuery) error { } } query.Data = ret - case OwnerQueryAllTaskStatuses: + case QueryAllTaskStatuses: cfReactor, ok := o.changefeeds[query.ChangeFeedID] if !ok { return cerror.ErrChangeFeedNotExists.GenWithStackByArgs(query.ChangeFeedID) @@ -444,7 +444,7 @@ func (o *ownerImpl) handleQueries(query *OwnerQuery) error { } } query.Data = ret - case OwnerQueryTaskPositions: + case QueryTaskPositions: cfReactor, ok := o.changefeeds[query.ChangeFeedID] if !ok { return cerror.ErrChangeFeedNotExists.GenWithStackByArgs(query.ChangeFeedID) @@ -468,7 +468,7 @@ func (o *ownerImpl) handleQueries(query *OwnerQuery) error { } } query.Data = ret - case OwnerQueryProcessors: + case QueryProcessors: var ret []*model.ProcInfoSnap for cfID, cfReactor := range o.changefeeds { if cfReactor.state == nil { @@ -482,7 +482,7 @@ func (o *ownerImpl) handleQueries(query *OwnerQuery) error { } } query.Data = ret - case OwnerQueryCaptures: + case QueryCaptures: var ret []*model.CaptureInfo for _, captureInfo := range o.captures { ret = append(ret, &model.CaptureInfo{ diff --git a/cdc/owner/status_provider.go b/cdc/owner/status_provider.go index 5e1dd6fd6fa..54337e796f7 100644 --- a/cdc/owner/status_provider.go +++ b/cdc/owner/status_provider.go @@ -50,19 +50,27 @@ type StatusProvider interface { GetCaptures(ctx 
context.Context) ([]*model.CaptureInfo, error) } -type OwnerQueryType int32 +// QueryType is the type of different queries. +type QueryType int32 const ( - OwnerQueryAllChangeFeedStatuses = iota - OwnerQueryAllChangeFeedInfo - OwnerQueryAllTaskStatuses - OwnerQueryTaskPositions - OwnerQueryProcessors - OwnerQueryCaptures + // QueryAllChangeFeedStatuses query all changefeed status. + QueryAllChangeFeedStatuses QueryType = iota + // QueryAllChangeFeedInfo is the type of query all changefeed info. + QueryAllChangeFeedInfo + // QueryAllTaskStatuses is the type of query all task statuses. + QueryAllTaskStatuses + // QueryTaskPositions is the type of query task positions. + QueryTaskPositions + // QueryProcessors is the type of query processors. + QueryProcessors + // QueryCaptures is the type of query captures info. + QueryCaptures ) -type OwnerQuery struct { - Tp OwnerQueryType +// Query wraps query command and return results. +type Query struct { + Tp QueryType ChangeFeedID model.ChangeFeedID Data interface{} @@ -78,8 +86,8 @@ type ownerStatusProvider struct { } func (p *ownerStatusProvider) GetAllChangeFeedStatuses(ctx context.Context) (map[model.ChangeFeedID]*model.ChangeFeedStatus, error) { - query := &OwnerQuery{ - Tp: OwnerQueryAllChangeFeedStatuses, + query := &Query{ + Tp: QueryAllChangeFeedStatuses, } if err := p.sendQueryToOwner(ctx, query); err != nil { return nil, errors.Trace(err) @@ -100,8 +108,8 @@ func (p *ownerStatusProvider) GetChangeFeedStatus(ctx context.Context, changefee } func (p *ownerStatusProvider) GetAllChangeFeedInfo(ctx context.Context) (map[model.ChangeFeedID]*model.ChangeFeedInfo, error) { - query := &OwnerQuery{ - Tp: OwnerQueryAllChangeFeedInfo, + query := &Query{ + Tp: QueryAllChangeFeedInfo, } if err := p.sendQueryToOwner(ctx, query); err != nil { return nil, errors.Trace(err) @@ -122,8 +130,8 @@ func (p *ownerStatusProvider) GetChangeFeedInfo(ctx context.Context, changefeedI } func (p *ownerStatusProvider) GetAllTaskStatuses(ctx 
context.Context, changefeedID model.ChangeFeedID) (map[model.CaptureID]*model.TaskStatus, error) { - query := &OwnerQuery{ - Tp: OwnerQueryAllTaskStatuses, + query := &Query{ + Tp: QueryAllTaskStatuses, ChangeFeedID: changefeedID, } if err := p.sendQueryToOwner(ctx, query); err != nil { @@ -133,8 +141,8 @@ func (p *ownerStatusProvider) GetAllTaskStatuses(ctx context.Context, changefeed } func (p *ownerStatusProvider) GetTaskPositions(ctx context.Context, changefeedID model.ChangeFeedID) (map[model.CaptureID]*model.TaskPosition, error) { - query := &OwnerQuery{ - Tp: OwnerQueryTaskPositions, + query := &Query{ + Tp: QueryTaskPositions, ChangeFeedID: changefeedID, } if err := p.sendQueryToOwner(ctx, query); err != nil { @@ -144,8 +152,8 @@ func (p *ownerStatusProvider) GetTaskPositions(ctx context.Context, changefeedID } func (p *ownerStatusProvider) GetProcessors(ctx context.Context) ([]*model.ProcInfoSnap, error) { - query := &OwnerQuery{ - Tp: OwnerQueryProcessors, + query := &Query{ + Tp: QueryProcessors, } if err := p.sendQueryToOwner(ctx, query); err != nil { return nil, errors.Trace(err) @@ -154,8 +162,8 @@ func (p *ownerStatusProvider) GetProcessors(ctx context.Context) ([]*model.ProcI } func (p *ownerStatusProvider) GetCaptures(ctx context.Context) ([]*model.CaptureInfo, error) { - query := &OwnerQuery{ - Tp: OwnerQueryCaptures, + query := &Query{ + Tp: QueryCaptures, } if err := p.sendQueryToOwner(ctx, query); err != nil { return nil, errors.Trace(err) @@ -163,7 +171,7 @@ func (p *ownerStatusProvider) GetCaptures(ctx context.Context) ([]*model.Capture return query.Data.([]*model.CaptureInfo), nil } -func (p *ownerStatusProvider) sendQueryToOwner(ctx context.Context, query *OwnerQuery) error { +func (p *ownerStatusProvider) sendQueryToOwner(ctx context.Context, query *Query) error { doneCh := make(chan error, 1) p.owner.Query(query, doneCh) diff --git a/cdc/processor/manager.go b/cdc/processor/manager.go index 56132feba32..bd4a93daf80 100644 --- 
a/cdc/processor/manager.go +++ b/cdc/processor/manager.go @@ -145,7 +145,10 @@ func (m *Manager) AsyncClose() { ctx, cancel := context.WithTimeout(context.TODO(), timeout) defer cancel() done := make(chan error, 1) - m.sendCommand(ctx, commandTpClose, nil, done) + err := m.sendCommand(ctx, commandTpClose, nil, done) + if err != nil { + log.Warn("async close failed", zap.Error(err)) + } } // WriteDebugInfo write the debug info to Writer diff --git a/cdc/processor/manager_test.go b/cdc/processor/manager_test.go index faa3d88b88a..8d0fe700995 100644 --- a/cdc/processor/manager_test.go +++ b/cdc/processor/manager_test.go @@ -227,7 +227,7 @@ func TestSendCommandError(t *testing.T) { ctx, cancel := context.WithCancel(context.TODO()) cancel() // Use unbuffered channel to stable test. - m.commandQueue = make(chan *command, 0) + m.commandQueue = make(chan *command) done := make(chan error, 1) err := m.sendCommand(ctx, commandTpClose, nil, done) require.Error(t, err) diff --git a/cdc/server_test.go b/cdc/server_test.go index b6eccf3c74b..784b7f15543 100644 --- a/cdc/server_test.go +++ b/cdc/server_test.go @@ -188,7 +188,7 @@ func TestServerTLSWithoutCommonName(t *testing.T) { config.StoreGlobalServerConfig(conf) server, err := NewServer([]string{"https://127.0.0.1:2379"}) - server.capture = capture.NewCapture4Test(false) + server.capture = capture.NewCapture4Test(nil) require.Nil(t, err) err = server.startStatusHTTP(server.tcpServer.HTTP1Listener()) require.Nil(t, err) @@ -264,7 +264,7 @@ func TestServerTLSWithCommonName(t *testing.T) { config.StoreGlobalServerConfig(conf) server, err := NewServer([]string{"https://127.0.0.1:2379"}) - server.capture = capture.NewCapture4Test(false) + server.capture = capture.NewCapture4Test(nil) require.Nil(t, err) err = server.startStatusHTTP(server.tcpServer.HTTP1Listener()) require.Nil(t, err) diff --git a/scripts/check-copyright.sh b/scripts/check-copyright.sh index 99b8848f15f..bd2d3387a78 100755 --- a/scripts/check-copyright.sh +++ 
b/scripts/check-copyright.sh @@ -1,4 +1,4 @@ -result=$(find ./ -name "*.go" | grep -vE '\.pb\.go|vendor/|leaktest.go|kv_gen|redo_gen|sink_gen|pbmock|\.pb\.gw\.go|statik.go|openapi/gen\..*\.go|embedded_asserts.go|empty_asserts.go|docs/swagger' | +result=$(find ./ -name "*.go" | grep -vE '\.pb\.go|vendor/|leaktest.go|kv_gen|redo_gen|sink_gen|pbmock|\.pb\.gw\.go|statik.go|openapi/gen\..*\.go|embedded_asserts.go|empty_asserts.go|docs/swagger|owner/mock' | while read -r file_path; do head=$(head -n 1 "$file_path") if [[ ! "$head" =~ Copyright\ 20[0-9][0-9]\ PingCAP,\ Inc\. ]]; then From 1dc276f66f4dc91424ca33d8fbe3b03c61ababd0 Mon Sep 17 00:00:00 2001 From: Neil Shen Date: Thu, 27 Jan 2022 16:31:37 +0800 Subject: [PATCH 4/6] fix test Signed-off-by: Neil Shen --- cdc/api/open_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cdc/api/open_test.go b/cdc/api/open_test.go index a9e71f13f11..4bd5c5de6e3 100644 --- a/cdc/api/open_test.go +++ b/cdc/api/open_test.go @@ -550,7 +550,7 @@ func TestGetProcessor(t *testing.T) { httpError := &model.HTTPError{} err = json.NewDecoder(w.Body).Decode(httpError) require.Nil(t, err) - require.Contains(t, httpError.Error, "capture not exists, key: non-exist-capture") + require.Contains(t, httpError.Error, "capture not exists, non-exist-capture") } func TestListProcessor(t *testing.T) { From 427fafba2faac9e7d87f64ebe95185b22b3315d0 Mon Sep 17 00:00:00 2001 From: Neil Shen Date: Fri, 28 Jan 2022 16:52:34 +0800 Subject: [PATCH 5/6] address comments Signed-off-by: Neil Shen --- cdc/owner/owner.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cdc/owner/owner.go b/cdc/owner/owner.go index 6b15f40eabf..ed79434bc28 100644 --- a/cdc/owner/owner.go +++ b/cdc/owner/owner.go @@ -260,7 +260,7 @@ func (o *ownerImpl) WriteDebugInfo(w io.Writer, done chan<- error) { }) } -// Query writes debug info into the specified http writer +// Query queries owner internal information. 
func (o *ownerImpl) Query(query *Query, done chan<- error) { o.pushOwnerJob(&ownerJob{ Tp: ownerJobTypeQuery, From 51e2418f0cbd9e4726e7fd851df9fecfca37a7e4 Mon Sep 17 00:00:00 2001 From: Neil Shen Date: Mon, 7 Feb 2022 22:59:22 +0800 Subject: [PATCH 6/6] address comments Signed-off-by: Neil Shen --- cdc/api/open.go | 60 ++++++++++-------------------------- cdc/api/open_test.go | 10 +++--- cdc/api/owner.go | 51 +++++++----------------------- cdc/api/util.go | 56 ++++++++++++++++++++++++++++++--- cdc/capture/capture.go | 30 +++++++++--------- cdc/owner/mock/owner_mock.go | 48 ++++++++++++++--------------- cdc/owner/owner.go | 28 ++++++++--------- cdc/owner/owner_test.go | 6 ++-- cdc/processor/manager.go | 2 +- 9 files changed, 140 insertions(+), 151 deletions(-) diff --git a/cdc/api/open.go b/cdc/api/open.go index be658de41c8..feacbd8525d 100644 --- a/cdc/api/open.go +++ b/cdc/api/open.go @@ -90,7 +90,7 @@ func RegisterOpenAPIRoutes(router *gin.Engine, api openAPI) { changefeedGroup.POST("/:changefeed_id/pause", api.PauseChangefeed) changefeedGroup.POST("/:changefeed_id/resume", api.ResumeChangefeed) changefeedGroup.DELETE("/:changefeed_id", api.RemoveChangefeed) - changefeedGroup.POST("/:changefeed_id/tables/rebalance_table", api.RebalanceTable) + changefeedGroup.POST("/:changefeed_id/tables/rebalance_table", api.RebalanceTables) changefeedGroup.POST("/:changefeed_id/tables/move_table", api.MoveTable) // owner API @@ -319,13 +319,7 @@ func (h *openAPI) PauseChangefeed(c *gin.Context) { Type: model.AdminStop, } - // Use buffered channel to prevernt blocking owner. 
- done := make(chan error, 1) - _ = h.capture.OperateOwnerUnderLock(func(owner owner.Owner) error { - owner.EnqueueJob(job, done) - return nil - }) - if err := waitDone(ctx, done); err != nil { + if err := handleOwnerJob(ctx, h.capture, job); err != nil { _ = c.Error(err) return } @@ -366,13 +360,7 @@ func (h *openAPI) ResumeChangefeed(c *gin.Context) { Type: model.AdminResume, } - // Use buffered channel to prevernt blocking owner. - done := make(chan error, 1) - _ = h.capture.OperateOwnerUnderLock(func(owner owner.Owner) error { - owner.EnqueueJob(job, done) - return nil - }) - if err := waitDone(ctx, done); err != nil { + if err := handleOwnerJob(ctx, h.capture, job); err != nil { _ = c.Error(err) return } @@ -475,20 +463,14 @@ func (h *openAPI) RemoveChangefeed(c *gin.Context) { Type: model.AdminRemove, } - // Use buffered channel to prevernt blocking owner. - done := make(chan error, 1) - _ = h.capture.OperateOwnerUnderLock(func(owner owner.Owner) error { - owner.EnqueueJob(job, done) - return nil - }) - if err := waitDone(ctx, done); err != nil { + if err := handleOwnerJob(ctx, h.capture, job); err != nil { _ = c.Error(err) return } c.Status(http.StatusAccepted) } -// RebalanceTable rebalances tables +// RebalanceTables rebalances tables // @Summary rebalance tables // @Description rebalance all tables of a changefeed // @Tags changefeed @@ -498,7 +480,7 @@ func (h *openAPI) RemoveChangefeed(c *gin.Context) { // @Success 202 // @Failure 500,400 {object} model.HTTPError // @Router /api/v1/changefeeds/{changefeed_id}/tables/rebalance_table [post] -func (h *openAPI) RebalanceTable(c *gin.Context) { +func (h *openAPI) RebalanceTables(c *gin.Context) { if !h.capture.IsOwner() { h.forwardToOwner(c) return @@ -518,13 +500,7 @@ func (h *openAPI) RebalanceTable(c *gin.Context) { return } - // Use buffered channel to prevernt blocking owner. 
- done := make(chan error, 1) - _ = h.capture.OperateOwnerUnderLock(func(owner owner.Owner) error { - owner.TriggerRebalance(changefeedID, done) - return nil - }) - if err := waitDone(ctx, done); err != nil { + if err := handleOwnerRebalance(ctx, h.capture, changefeedID); err != nil { _ = c.Error(err) return } @@ -577,13 +553,9 @@ func (h *openAPI) MoveTable(c *gin.Context) { return } - // Use buffered channel to prevernt blocking owner. - done := make(chan error, 1) - _ = h.capture.OperateOwnerUnderLock(func(owner owner.Owner) error { - owner.ManualSchedule(changefeedID, data.CaptureID, data.TableID, done) - return nil - }) - if err := waitDone(ctx, done); err != nil { + err = handleOwnerScheduleTable( + ctx, h.capture, changefeedID, data.CaptureID, data.TableID) + if err != nil { _ = c.Error(err) return } @@ -605,10 +577,10 @@ func (h *openAPI) ResignOwner(c *gin.Context) { return } - _ = h.capture.OperateOwnerUnderLock(func(owner owner.Owner) error { - owner.AsyncStop() - return nil - }) + o, _ := h.capture.GetOwner() + if o != nil { + o.AsyncStop() + } c.Status(http.StatusAccepted) } @@ -773,7 +745,7 @@ func (h *openAPI) ServerStatus(c *gin.Context) { func (h *openAPI) Health(c *gin.Context) { ctx := c.Request.Context() - if _, err := h.capture.GetOwner(ctx); err != nil { + if _, err := h.capture.GetOwnerCaptureInfo(ctx); err != nil { c.IndentedJSON(http.StatusInternalServerError, model.NewHTTPError(err)) return } @@ -823,7 +795,7 @@ func (h *openAPI) forwardToOwner(c *gin.Context) { var owner *model.CaptureInfo // get owner err := retry.Do(ctx, func() error { - o, err := h.capture.GetOwner(ctx) + o, err := h.capture.GetOwnerCaptureInfo(ctx) if err != nil { log.Info("get owner failed, retry later", zap.Error(err)) return err diff --git a/cdc/api/open_test.go b/cdc/api/open_test.go index 4bd5c5de6e3..6541b63ba80 100644 --- a/cdc/api/open_test.go +++ b/cdc/api/open_test.go @@ -361,7 +361,7 @@ func TestRemoveChangefeed(t *testing.T) { require.Contains(t, 
respErr.Error, "changefeed not exists") } -func TestRebalanceTable(t *testing.T) { +func TestRebalanceTables(t *testing.T) { t.Parallel() ctrl := gomock.NewController(t) mo := mock_owner.NewMockOwner(ctrl) @@ -370,7 +370,7 @@ func TestRebalanceTable(t *testing.T) { // test rebalance table succeeded mo.EXPECT(). - TriggerRebalance(gomock.Any(), gomock.Any()). + RebalanceTables(gomock.Any(), gomock.Any()). Do(func(cfID model.ChangeFeedID, done chan<- error) { require.EqualValues(t, cfID, changeFeedID) close(done) @@ -386,7 +386,7 @@ func TestRebalanceTable(t *testing.T) { // test rebalance table failed from owner side. mo.EXPECT(). - TriggerRebalance(gomock.Any(), gomock.Any()). + RebalanceTables(gomock.Any(), gomock.Any()). Do(func(cfID model.ChangeFeedID, done chan<- error) { done <- cerror.ErrChangeFeedNotExists.FastGenByArgs(cfID) close(done) @@ -436,7 +436,7 @@ func TestMoveTable(t *testing.T) { require.Nil(t, err) body := bytes.NewReader(b) mo.EXPECT(). - ManualSchedule(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()). + ScheduleTable(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()). Do(func( cfID model.ChangeFeedID, toCapture model.CaptureID, tableID model.TableID, done chan<- error, @@ -464,7 +464,7 @@ func TestMoveTable(t *testing.T) { require.Nil(t, err) body = bytes.NewReader(b) mo.EXPECT(). - ManualSchedule(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()). + ScheduleTable(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()). 
Do(func( cfID model.ChangeFeedID, toCapture model.CaptureID, tableID model.TableID, done chan<- error, diff --git a/cdc/api/owner.go b/cdc/api/owner.go index 793962c16e4..d1275d8ea35 100644 --- a/cdc/api/owner.go +++ b/cdc/api/owner.go @@ -26,7 +26,6 @@ import ( "github.com/pingcap/log" "github.com/pingcap/tiflow/cdc/capture" "github.com/pingcap/tiflow/cdc/model" - "github.com/pingcap/tiflow/cdc/owner" cerror "github.com/pingcap/tiflow/pkg/errors" "github.com/pingcap/tiflow/pkg/logutil" "github.com/tikv/client-go/v2/oracle" @@ -112,10 +111,10 @@ func (h *ownerAPI) handleResignOwner(w http.ResponseWriter, req *http.Request) { handleOwnerResp(w, concurrency.ErrElectionNotLeader) return } - err := h.capture.OperateOwnerUnderLock(func(owner owner.Owner) error { - owner.AsyncStop() - return nil - }) + o, err := h.capture.GetOwner() + if o != nil { + o.AsyncStop() + } handleOwnerResp(w, err) } @@ -153,17 +152,8 @@ func (h *ownerAPI) handleChangefeedAdmin(w http.ResponseWriter, req *http.Reques Opts: opts, } - // Use buffered channel to prevernt blocking owner. - done := make(chan error, 1) - err = h.capture.OperateOwnerUnderLock(func(owner owner.Owner) error { - owner.EnqueueJob(job, done) - return nil - }) - if err != nil { - handleOwnerResp(w, err) - return - } - handleOwnerResp(w, waitDone(req.Context(), done)) + err = handleOwnerJob(req.Context(), h.capture, job) + handleOwnerResp(w, err) } func (h *ownerAPI) handleRebalanceTrigger(w http.ResponseWriter, req *http.Request) { @@ -185,18 +175,8 @@ func (h *ownerAPI) handleRebalanceTrigger(w http.ResponseWriter, req *http.Reque return } - // Use buffered channel to prevernt blocking owner. 
- done := make(chan error, 1) - err = h.capture.OperateOwnerUnderLock(func(owner owner.Owner) error { - owner.TriggerRebalance(changefeedID, done) - return nil - }) - - if err != nil { - handleOwnerResp(w, err) - return - } - handleOwnerResp(w, waitDone(req.Context(), done)) + err = handleOwnerRebalance(req.Context(), h.capture, changefeedID) + handleOwnerResp(w, err) } func (h *ownerAPI) handleMoveTable(w http.ResponseWriter, req *http.Request) { @@ -232,18 +212,9 @@ func (h *ownerAPI) handleMoveTable(w http.ResponseWriter, req *http.Request) { return } - // Use buffered channel to prevernt blocking owner. - done := make(chan error, 1) - err = h.capture.OperateOwnerUnderLock(func(owner owner.Owner) error { - owner.ManualSchedule(changefeedID, to, tableID, done) - return nil - }) - - if err != nil { - handleOwnerResp(w, err) - return - } - handleOwnerResp(w, waitDone(req.Context(), done)) + err = handleOwnerScheduleTable( + req.Context(), h.capture, changefeedID, to, tableID) + handleOwnerResp(w, err) } func (h *ownerAPI) handleChangefeedQuery(w http.ResponseWriter, req *http.Request) { diff --git a/cdc/api/util.go b/cdc/api/util.go index 42ef82a3c8c..7f9650972a0 100644 --- a/cdc/api/util.go +++ b/cdc/api/util.go @@ -21,6 +21,8 @@ import ( "github.com/pingcap/errors" "github.com/pingcap/log" + "github.com/pingcap/tiflow/cdc/capture" + "github.com/pingcap/tiflow/cdc/model" cerror "github.com/pingcap/tiflow/pkg/errors" "go.uber.org/zap" ) @@ -78,11 +80,57 @@ func writeData(w http.ResponseWriter, data interface{}) { } } -func waitDone(ctx context.Context, done <-chan error) (err error) { +func handleOwnerJob( + ctx context.Context, capture *capture.Capture, job model.AdminJob, +) error { + // Use buffered channel to prevent blocking owner. 
+ done := make(chan error, 1) + o, err := capture.GetOwner() + if err != nil { + return errors.Trace(err) + } + o.EnqueueJob(job, done) + select { + case <-ctx.Done(): + return errors.Trace(ctx.Err()) + case err := <-done: + return errors.Trace(err) + } +} + +func handleOwnerRebalance( + ctx context.Context, capture *capture.Capture, changefeedID string, +) error { + // Use buffered channel to prevent blocking owner. + done := make(chan error, 1) + o, err := capture.GetOwner() + if err != nil { + return errors.Trace(err) + } + o.RebalanceTables(changefeedID, done) + select { + case <-ctx.Done(): + return errors.Trace(ctx.Err()) + case err := <-done: + return errors.Trace(err) + } +} + +func handleOwnerScheduleTable( + ctx context.Context, capture *capture.Capture, + changefeedID string, captureID string, tableID int64, +) error { + // Use buffered channel to prevent blocking owner. + done := make(chan error, 1) + o, err := capture.GetOwner() + if err != nil { + return errors.Trace(err) + } + o.ScheduleTable(changefeedID, captureID, tableID, done) select { case <-ctx.Done(): - err = ctx.Err() - case err = <-done: + return errors.Trace(ctx.Err()) + case err := <-done: + return errors.Trace(err) } - return } diff --git a/cdc/capture/capture.go b/cdc/capture/capture.go index daa16b4c922..aa5681e608f 100644 --- a/cdc/capture/capture.go +++ b/cdc/capture/capture.go @@ -486,14 +486,14 @@ func (c *Capture) setOwner(owner owner.Owner) { c.owner = owner } -// OperateOwnerUnderLock operates the owner with lock -func (c *Capture) OperateOwnerUnderLock(fn func(owner.Owner) error) error { +// GetOwner returns owner if it is the owner. +func (c *Capture) GetOwner() (owner.Owner, error) { c.ownerMu.Lock() defer c.ownerMu.Unlock() if c.owner == nil { - return cerror.ErrNotOwner.GenWithStackByArgs() + return nil, cerror.ErrNotOwner.GenWithStackByArgs() } - return fn(c.owner) + return c.owner, nil } // campaign to be an owner. 
@@ -527,10 +527,10 @@ func (c *Capture) AsyncClose() { defer c.cancel() // Safety: Here we mainly want to stop the owner // and ignore it if the owner does not exist or is not set. - _ = c.OperateOwnerUnderLock(func(o owner.Owner) error { + o, _ := c.GetOwner() + if o != nil { o.AsyncStop() - return nil - }) + } c.captureMu.Lock() defer c.captureMu.Unlock() if c.processorManager != nil { @@ -583,14 +583,14 @@ func (c *Capture) WriteDebugInfo(ctx context.Context, w io.Writer) { } // Safety: Because we are mainly outputting information about the owner here, // if the owner does not exist or is not set, the information will not be output. - doneOwner := make(chan error, 1) - _ = c.OperateOwnerUnderLock(func(o owner.Owner) error { + o, _ := c.GetOwner() + if o != nil { + doneOwner := make(chan error, 1) fmt.Fprintf(w, "\n\n*** owner info ***:\n\n") o.WriteDebugInfo(w, doneOwner) - return nil - }) - // wait the debug info printed - wait(doneOwner) + // wait the debug info printed + wait(doneOwner) + } doneM := make(chan error, 1) c.captureMu.Lock() @@ -610,8 +610,8 @@ func (c *Capture) IsOwner() bool { return c.owner != nil } -// GetOwner return the owner of current TiCDC cluster -func (c *Capture) GetOwner(ctx context.Context) (*model.CaptureInfo, error) { +// GetOwnerCaptureInfo return the owner capture info of current TiCDC cluster +func (c *Capture) GetOwnerCaptureInfo(ctx context.Context) (*model.CaptureInfo, error) { _, captureInfos, err := c.EtcdClient.GetCaptures(ctx) if err != nil { return nil, err diff --git a/cdc/owner/mock/owner_mock.go b/cdc/owner/mock/owner_mock.go index ccffc1727a3..b99c3b1d38f 100644 --- a/cdc/owner/mock/owner_mock.go +++ b/cdc/owner/mock/owner_mock.go @@ -62,18 +62,6 @@ func (mr *MockOwnerMockRecorder) EnqueueJob(adminJob, done interface{}) *gomock. return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "EnqueueJob", reflect.TypeOf((*MockOwner)(nil).EnqueueJob), adminJob, done) } -// ManualSchedule mocks base method. 
-func (m *MockOwner) ManualSchedule(cfID model.ChangeFeedID, toCapture model.CaptureID, tableID model.TableID, done chan<- error) { - m.ctrl.T.Helper() - m.ctrl.Call(m, "ManualSchedule", cfID, toCapture, tableID, done) -} - -// ManualSchedule indicates an expected call of ManualSchedule. -func (mr *MockOwnerMockRecorder) ManualSchedule(cfID, toCapture, tableID, done interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ManualSchedule", reflect.TypeOf((*MockOwner)(nil).ManualSchedule), cfID, toCapture, tableID, done) -} - // Query mocks base method. func (m *MockOwner) Query(query *owner.Query, done chan<- error) { m.ctrl.T.Helper() @@ -86,6 +74,30 @@ func (mr *MockOwnerMockRecorder) Query(query, done interface{}) *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Query", reflect.TypeOf((*MockOwner)(nil).Query), query, done) } +// RebalanceTables mocks base method. +func (m *MockOwner) RebalanceTables(cfID model.ChangeFeedID, done chan<- error) { + m.ctrl.T.Helper() + m.ctrl.Call(m, "RebalanceTables", cfID, done) +} + +// RebalanceTables indicates an expected call of RebalanceTables. +func (mr *MockOwnerMockRecorder) RebalanceTables(cfID, done interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "RebalanceTables", reflect.TypeOf((*MockOwner)(nil).RebalanceTables), cfID, done) +} + +// ScheduleTable mocks base method. +func (m *MockOwner) ScheduleTable(cfID model.ChangeFeedID, toCapture model.CaptureID, tableID model.TableID, done chan<- error) { + m.ctrl.T.Helper() + m.ctrl.Call(m, "ScheduleTable", cfID, toCapture, tableID, done) +} + +// ScheduleTable indicates an expected call of ScheduleTable. 
+func (mr *MockOwnerMockRecorder) ScheduleTable(cfID, toCapture, tableID, done interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ScheduleTable", reflect.TypeOf((*MockOwner)(nil).ScheduleTable), cfID, toCapture, tableID, done) +} + // Tick mocks base method. func (m *MockOwner) Tick(ctx context.Context, state orchestrator.ReactorState) (orchestrator.ReactorState, error) { m.ctrl.T.Helper() @@ -101,18 +113,6 @@ func (mr *MockOwnerMockRecorder) Tick(ctx, state interface{}) *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Tick", reflect.TypeOf((*MockOwner)(nil).Tick), ctx, state) } -// TriggerRebalance mocks base method. -func (m *MockOwner) TriggerRebalance(cfID model.ChangeFeedID, done chan<- error) { - m.ctrl.T.Helper() - m.ctrl.Call(m, "TriggerRebalance", cfID, done) -} - -// TriggerRebalance indicates an expected call of TriggerRebalance. -func (mr *MockOwnerMockRecorder) TriggerRebalance(cfID, done interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "TriggerRebalance", reflect.TypeOf((*MockOwner)(nil).TriggerRebalance), cfID, done) -} - // WriteDebugInfo mocks base method. 
func (m *MockOwner) WriteDebugInfo(w io.Writer, done chan<- error) { m.ctrl.T.Helper() diff --git a/cdc/owner/owner.go b/cdc/owner/owner.go index ed79434bc28..70c546e0a17 100644 --- a/cdc/owner/owner.go +++ b/cdc/owner/owner.go @@ -40,7 +40,7 @@ type ownerJobType int // All OwnerJob types const ( ownerJobTypeRebalance ownerJobType = iota - ownerJobTypeManualSchedule + ownerJobTypeScheduleTable ownerJobTypeAdminJob ownerJobTypeDebugInfo ownerJobTypeQuery @@ -55,9 +55,9 @@ type ownerJob struct { Tp ownerJobType ChangefeedID model.ChangeFeedID - // for ManualSchedule only + // for ScheduleTable only TargetCaptureID model.CaptureID - // for ManualSchedule only + // for ScheduleTable only TableID model.TableID // for Admin Job only @@ -78,8 +78,8 @@ type ownerJob struct { type Owner interface { orchestrator.Reactor EnqueueJob(adminJob model.AdminJob, done chan<- error) - TriggerRebalance(cfID model.ChangeFeedID, done chan<- error) - ManualSchedule( + RebalanceTables(cfID model.ChangeFeedID, done chan<- error) + ScheduleTable( cfID model.ChangeFeedID, toCapture model.CaptureID, tableID model.TableID, done chan<- error, ) @@ -157,7 +157,7 @@ func (o *ownerImpl) Tick(stdCtx context.Context, rawState orchestrator.ReactorSt // when there are different versions of cdc nodes in the cluster, // the admin job may not be processed all the time. And http api relies on // admin job, which will cause all http api unavailable. - o.HandleJobs() + o.handleJobs() if !o.clusterVersionConsistent(state.Captures) { return state, nil @@ -226,9 +226,9 @@ func (o *ownerImpl) EnqueueJob(adminJob model.AdminJob, done chan<- error) { }) } -// TriggerRebalance triggers a rebalance for the specified changefeed +// RebalanceTables triggers a rebalance for the specified changefeed // `done` must be buffered to prevernt blocking owner. 
-func (o *ownerImpl) TriggerRebalance(cfID model.ChangeFeedID, done chan<- error) { +func (o *ownerImpl) RebalanceTables(cfID model.ChangeFeedID, done chan<- error) { o.pushOwnerJob(&ownerJob{ Tp: ownerJobTypeRebalance, ChangefeedID: cfID, @@ -236,14 +236,14 @@ func (o *ownerImpl) TriggerRebalance(cfID model.ChangeFeedID, done chan<- error) }) } -// ManualSchedule moves a table from a capture to another capture +// ScheduleTable moves a table from a capture to another capture // `done` must be buffered to prevernt blocking owner. -func (o *ownerImpl) ManualSchedule( +func (o *ownerImpl) ScheduleTable( cfID model.ChangeFeedID, toCapture model.CaptureID, tableID model.TableID, done chan<- error, ) { o.pushOwnerJob(&ownerJob{ - Tp: ownerJobTypeManualSchedule, + Tp: ownerJobTypeScheduleTable, ChangefeedID: cfID, TargetCaptureID: toCapture, TableID: tableID, @@ -357,9 +357,7 @@ func (o *ownerImpl) clusterVersionConsistent(captures map[model.CaptureID]*model return true } -// HandleJobs handles changefeed admin jobs and others. -// Exported for tests. 
-func (o *ownerImpl) HandleJobs() { +func (o *ownerImpl) handleJobs() { jobs := o.takeOwnerJobs() for _, job := range jobs { changefeedID := job.ChangefeedID @@ -373,7 +371,7 @@ func (o *ownerImpl) HandleJobs() { switch job.Tp { case ownerJobTypeAdminJob: cfReactor.feedStateManager.PushAdminJob(job.AdminJob) - case ownerJobTypeManualSchedule: + case ownerJobTypeScheduleTable: cfReactor.scheduler.MoveTable(job.TableID, job.TargetCaptureID) case ownerJobTypeRebalance: cfReactor.scheduler.Rebalance() diff --git a/cdc/owner/owner_test.go b/cdc/owner/owner_test.go index df23d417e07..4bf12827ac3 100644 --- a/cdc/owner/owner_test.go +++ b/cdc/owner/owner_test.go @@ -313,9 +313,9 @@ func TestAdminJob(t *testing.T) { Type: model.AdminResume, }, done1) done2 := make(chan error, 1) - owner.TriggerRebalance("test-changefeed2", done2) + owner.RebalanceTables("test-changefeed2", done2) done3 := make(chan error, 1) - owner.ManualSchedule("test-changefeed3", "test-caputre1", 10, done3) + owner.ScheduleTable("test-changefeed3", "test-caputre1", 10, done3) done4 := make(chan error, 1) var buf bytes.Buffer owner.WriteDebugInfo(&buf, done4) @@ -339,7 +339,7 @@ func TestAdminJob(t *testing.T) { Tp: ownerJobTypeRebalance, ChangefeedID: "test-changefeed2", }, { - Tp: ownerJobTypeManualSchedule, + Tp: ownerJobTypeScheduleTable, ChangefeedID: "test-changefeed3", TargetCaptureID: "test-caputre1", TableID: 10, diff --git a/cdc/processor/manager.go b/cdc/processor/manager.go index bd4a93daf80..cb8f3830140 100644 --- a/cdc/processor/manager.go +++ b/cdc/processor/manager.go @@ -139,7 +139,7 @@ func (m *Manager) closeProcessor(changefeedID model.ChangeFeedID) { } } -// AsyncClose sends a close signal to Manager and closing all processors +// AsyncClose sends a signal to Manager to close all processors. func (m *Manager) AsyncClose() { timeout := 3 * time.Second ctx, cancel := context.WithTimeout(context.TODO(), timeout)