From 51e2418f0cbd9e4726e7fd851df9fecfca37a7e4 Mon Sep 17 00:00:00 2001 From: Neil Shen Date: Mon, 7 Feb 2022 22:59:22 +0800 Subject: [PATCH] address comments Signed-off-by: Neil Shen --- cdc/api/open.go | 60 ++++++++++-------------------------- cdc/api/open_test.go | 10 +++--- cdc/api/owner.go | 51 +++++++----------------------- cdc/api/util.go | 56 ++++++++++++++++++++++++++++++--- cdc/capture/capture.go | 30 +++++++++--------- cdc/owner/mock/owner_mock.go | 48 ++++++++++++++--------------- cdc/owner/owner.go | 28 ++++++++--------- cdc/owner/owner_test.go | 6 ++-- cdc/processor/manager.go | 2 +- 9 files changed, 140 insertions(+), 151 deletions(-) diff --git a/cdc/api/open.go b/cdc/api/open.go index be658de41c8..feacbd8525d 100644 --- a/cdc/api/open.go +++ b/cdc/api/open.go @@ -90,7 +90,7 @@ func RegisterOpenAPIRoutes(router *gin.Engine, api openAPI) { changefeedGroup.POST("/:changefeed_id/pause", api.PauseChangefeed) changefeedGroup.POST("/:changefeed_id/resume", api.ResumeChangefeed) changefeedGroup.DELETE("/:changefeed_id", api.RemoveChangefeed) - changefeedGroup.POST("/:changefeed_id/tables/rebalance_table", api.RebalanceTable) + changefeedGroup.POST("/:changefeed_id/tables/rebalance_table", api.RebalanceTables) changefeedGroup.POST("/:changefeed_id/tables/move_table", api.MoveTable) // owner API @@ -319,13 +319,7 @@ func (h *openAPI) PauseChangefeed(c *gin.Context) { Type: model.AdminStop, } - // Use buffered channel to prevernt blocking owner. - done := make(chan error, 1) - _ = h.capture.OperateOwnerUnderLock(func(owner owner.Owner) error { - owner.EnqueueJob(job, done) - return nil - }) - if err := waitDone(ctx, done); err != nil { + if err := handleOwnerJob(ctx, h.capture, job); err != nil { _ = c.Error(err) return } @@ -366,13 +360,7 @@ func (h *openAPI) ResumeChangefeed(c *gin.Context) { Type: model.AdminResume, } - // Use buffered channel to prevernt blocking owner. - done := make(chan error, 1) - _ = h.capture.OperateOwnerUnderLock(func(owner owner.Owner) error { - owner.EnqueueJob(job, done) - return nil - }) - if err := waitDone(ctx, done); err != nil { + if err := handleOwnerJob(ctx, h.capture, job); err != nil { _ = c.Error(err) return } @@ -475,20 +463,14 @@ func (h *openAPI) RemoveChangefeed(c *gin.Context) { Type: model.AdminRemove, } - // Use buffered channel to prevernt blocking owner. - done := make(chan error, 1) - _ = h.capture.OperateOwnerUnderLock(func(owner owner.Owner) error { - owner.EnqueueJob(job, done) - return nil - }) - if err := waitDone(ctx, done); err != nil { + if err := handleOwnerJob(ctx, h.capture, job); err != nil { _ = c.Error(err) return } c.Status(http.StatusAccepted) } -// RebalanceTable rebalances tables +// RebalanceTables rebalances tables // @Summary rebalance tables // @Description rebalance all tables of a changefeed // @Tags changefeed @@ -498,7 +480,7 @@ func (h *openAPI) RemoveChangefeed(c *gin.Context) { // @Success 202 // @Failure 500,400 {object} model.HTTPError // @Router /api/v1/changefeeds/{changefeed_id}/tables/rebalance_table [post] -func (h *openAPI) RebalanceTable(c *gin.Context) { +func (h *openAPI) RebalanceTables(c *gin.Context) { if !h.capture.IsOwner() { h.forwardToOwner(c) return @@ -518,13 +500,7 @@ func (h *openAPI) RebalanceTable(c *gin.Context) { return } - // Use buffered channel to prevernt blocking owner. - done := make(chan error, 1) - _ = h.capture.OperateOwnerUnderLock(func(owner owner.Owner) error { - owner.TriggerRebalance(changefeedID, done) - return nil - }) - if err := waitDone(ctx, done); err != nil { + if err := handleOwnerRebalance(ctx, h.capture, changefeedID); err != nil { _ = c.Error(err) return } @@ -577,13 +553,9 @@ func (h *openAPI) MoveTable(c *gin.Context) { return } - // Use buffered channel to prevernt blocking owner. - done := make(chan error, 1) - _ = h.capture.OperateOwnerUnderLock(func(owner owner.Owner) error { - owner.ManualSchedule(changefeedID, data.CaptureID, data.TableID, done) - return nil - }) - if err := waitDone(ctx, done); err != nil { + err = handleOwnerScheduleTable( + ctx, h.capture, changefeedID, data.CaptureID, data.TableID) + if err != nil { _ = c.Error(err) return } @@ -605,10 +577,10 @@ func (h *openAPI) ResignOwner(c *gin.Context) { return } - _ = h.capture.OperateOwnerUnderLock(func(owner owner.Owner) error { - owner.AsyncStop() - return nil - }) + o, _ := h.capture.GetOwner() + if o != nil { + o.AsyncStop() + } c.Status(http.StatusAccepted) } @@ -773,7 +745,7 @@ func (h *openAPI) ServerStatus(c *gin.Context) { func (h *openAPI) Health(c *gin.Context) { ctx := c.Request.Context() - if _, err := h.capture.GetOwner(ctx); err != nil { + if _, err := h.capture.GetOwnerCaptureInfo(ctx); err != nil { c.IndentedJSON(http.StatusInternalServerError, model.NewHTTPError(err)) return } @@ -823,7 +795,7 @@ func (h *openAPI) forwardToOwner(c *gin.Context) { var owner *model.CaptureInfo // get owner err := retry.Do(ctx, func() error { - o, err := h.capture.GetOwner(ctx) + o, err := h.capture.GetOwnerCaptureInfo(ctx) if err != nil { log.Info("get owner failed, retry later", zap.Error(err)) return err diff --git a/cdc/api/open_test.go b/cdc/api/open_test.go index 4bd5c5de6e3..6541b63ba80 100644 --- a/cdc/api/open_test.go +++ b/cdc/api/open_test.go @@ -361,7 +361,7 @@ func TestRemoveChangefeed(t *testing.T) { require.Contains(t, respErr.Error, "changefeed not exists") } -func TestRebalanceTable(t *testing.T) { +func TestRebalanceTables(t *testing.T) { t.Parallel() ctrl := gomock.NewController(t) mo := mock_owner.NewMockOwner(ctrl) @@ -370,7 +370,7 @@ func TestRebalanceTable(t *testing.T) { // test rebalance table succeeded mo.EXPECT(). - TriggerRebalance(gomock.Any(), gomock.Any()). + RebalanceTables(gomock.Any(), gomock.Any()). Do(func(cfID model.ChangeFeedID, done chan<- error) { require.EqualValues(t, cfID, changeFeedID) close(done) @@ -386,7 +386,7 @@ func TestRebalanceTable(t *testing.T) { // test rebalance table failed from owner side. mo.EXPECT(). - TriggerRebalance(gomock.Any(), gomock.Any()). + RebalanceTables(gomock.Any(), gomock.Any()). Do(func(cfID model.ChangeFeedID, done chan<- error) { done <- cerror.ErrChangeFeedNotExists.FastGenByArgs(cfID) close(done) @@ -436,7 +436,7 @@ func TestMoveTable(t *testing.T) { require.Nil(t, err) body := bytes.NewReader(b) mo.EXPECT(). - ManualSchedule(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()). + ScheduleTable(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()). Do(func( cfID model.ChangeFeedID, toCapture model.CaptureID, tableID model.TableID, done chan<- error, @@ -464,7 +464,7 @@ func TestMoveTable(t *testing.T) { require.Nil(t, err) body = bytes.NewReader(b) mo.EXPECT(). - ManualSchedule(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()). + ScheduleTable(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()). Do(func( cfID model.ChangeFeedID, toCapture model.CaptureID, tableID model.TableID, done chan<- error, diff --git a/cdc/api/owner.go b/cdc/api/owner.go index 793962c16e4..d1275d8ea35 100644 --- a/cdc/api/owner.go +++ b/cdc/api/owner.go @@ -26,7 +26,6 @@ import ( "github.com/pingcap/log" "github.com/pingcap/tiflow/cdc/capture" "github.com/pingcap/tiflow/cdc/model" - "github.com/pingcap/tiflow/cdc/owner" cerror "github.com/pingcap/tiflow/pkg/errors" "github.com/pingcap/tiflow/pkg/logutil" "github.com/tikv/client-go/v2/oracle" @@ -112,10 +111,10 @@ func (h *ownerAPI) handleResignOwner(w http.ResponseWriter, req *http.Request) { handleOwnerResp(w, concurrency.ErrElectionNotLeader) return } - err := h.capture.OperateOwnerUnderLock(func(owner owner.Owner) error { - owner.AsyncStop() - return nil - }) + o, err := h.capture.GetOwner() + if o != nil { + o.AsyncStop() + } handleOwnerResp(w, err) } @@ -153,17 +152,8 @@ func (h *ownerAPI) handleChangefeedAdmin(w http.ResponseWriter, req *http.Reques Opts: opts, } - // Use buffered channel to prevernt blocking owner. - done := make(chan error, 1) - err = h.capture.OperateOwnerUnderLock(func(owner owner.Owner) error { - owner.EnqueueJob(job, done) - return nil - }) - if err != nil { - handleOwnerResp(w, err) - return - } - handleOwnerResp(w, waitDone(req.Context(), done)) + err = handleOwnerJob(req.Context(), h.capture, job) + handleOwnerResp(w, err) } func (h *ownerAPI) handleRebalanceTrigger(w http.ResponseWriter, req *http.Request) { @@ -185,18 +175,8 @@ func (h *ownerAPI) handleRebalanceTrigger(w http.ResponseWriter, req *http.Reque return } - // Use buffered channel to prevernt blocking owner. - done := make(chan error, 1) - err = h.capture.OperateOwnerUnderLock(func(owner owner.Owner) error { - owner.TriggerRebalance(changefeedID, done) - return nil - }) - - if err != nil { - handleOwnerResp(w, err) - return - } - handleOwnerResp(w, waitDone(req.Context(), done)) + err = handleOwnerRebalance(req.Context(), h.capture, changefeedID) + handleOwnerResp(w, err) } func (h *ownerAPI) handleMoveTable(w http.ResponseWriter, req *http.Request) { @@ -232,18 +212,9 @@ func (h *ownerAPI) handleMoveTable(w http.ResponseWriter, req *http.Request) { return } - // Use buffered channel to prevernt blocking owner. - done := make(chan error, 1) - err = h.capture.OperateOwnerUnderLock(func(owner owner.Owner) error { - owner.ManualSchedule(changefeedID, to, tableID, done) - return nil - }) - - if err != nil { - handleOwnerResp(w, err) - return - } - handleOwnerResp(w, waitDone(req.Context(), done)) + err = handleOwnerScheduleTable( + req.Context(), h.capture, changefeedID, to, tableID) + handleOwnerResp(w, err) } func (h *ownerAPI) handleChangefeedQuery(w http.ResponseWriter, req *http.Request) { diff --git a/cdc/api/util.go b/cdc/api/util.go index 42ef82a3c8c..7f9650972a0 100644 --- a/cdc/api/util.go +++ b/cdc/api/util.go @@ -21,6 +21,8 @@ import ( "github.com/pingcap/errors" "github.com/pingcap/log" + "github.com/pingcap/tiflow/cdc/capture" + "github.com/pingcap/tiflow/cdc/model" cerror "github.com/pingcap/tiflow/pkg/errors" "go.uber.org/zap" ) @@ -78,11 +80,57 @@ func writeData(w http.ResponseWriter, data interface{}) { } } -func waitDone(ctx context.Context, done <-chan error) (err error) { +func handleOwnerJob( + ctx context.Context, capture *capture.Capture, job model.AdminJob, +) error { + // Use buffered channel to prevernt blocking owner. + done := make(chan error, 1) + o, err := capture.GetOwner() + if err != nil { + return errors.Trace(err) + } + o.EnqueueJob(job, done) + select { + case <-ctx.Done(): + return errors.Trace(ctx.Err()) + case err := <-done: + return errors.Trace(err) + } +} + +func handleOwnerRebalance( + ctx context.Context, capture *capture.Capture, changefeedID string, +) error { + // Use buffered channel to prevernt blocking owner. + done := make(chan error, 1) + o, err := capture.GetOwner() + if err != nil { + return errors.Trace(err) + } + o.RebalanceTables(changefeedID, done) + select { + case <-ctx.Done(): + return errors.Trace(ctx.Err()) + case err := <-done: + return errors.Trace(err) + } +} + +func handleOwnerScheduleTable( + ctx context.Context, capture *capture.Capture, + changefeedID string, captureID string, tableID int64, +) error { + // Use buffered channel to prevernt blocking owner. + done := make(chan error, 1) + o, err := capture.GetOwner() + if err != nil { + return errors.Trace(err) + } + o.ScheduleTable(changefeedID, captureID, tableID, done) select { case <-ctx.Done(): - err = ctx.Err() - case err = <-done: + return errors.Trace(ctx.Err()) + case err := <-done: + return errors.Trace(err) } - return } diff --git a/cdc/capture/capture.go b/cdc/capture/capture.go index daa16b4c922..aa5681e608f 100644 --- a/cdc/capture/capture.go +++ b/cdc/capture/capture.go @@ -486,14 +486,14 @@ func (c *Capture) setOwner(owner owner.Owner) { c.owner = owner } -// OperateOwnerUnderLock operates the owner with lock -func (c *Capture) OperateOwnerUnderLock(fn func(owner.Owner) error) error { +// GetOwner returns owner if it is the owner. +func (c *Capture) GetOwner() (owner.Owner, error) { c.ownerMu.Lock() defer c.ownerMu.Unlock() if c.owner == nil { - return cerror.ErrNotOwner.GenWithStackByArgs() + return nil, cerror.ErrNotOwner.GenWithStackByArgs() } - return fn(c.owner) + return c.owner, nil } // campaign to be an owner. @@ -527,10 +527,10 @@ func (c *Capture) AsyncClose() { defer c.cancel() // Safety: Here we mainly want to stop the owner // and ignore it if the owner does not exist or is not set. - _ = c.OperateOwnerUnderLock(func(o owner.Owner) error { + o, _ := c.GetOwner() + if o != nil { o.AsyncStop() - return nil - }) + } c.captureMu.Lock() defer c.captureMu.Unlock() if c.processorManager != nil { @@ -583,14 +583,14 @@ func (c *Capture) WriteDebugInfo(ctx context.Context, w io.Writer) { } // Safety: Because we are mainly outputting information about the owner here, // if the owner does not exist or is not set, the information will not be output. - doneOwner := make(chan error, 1) - _ = c.OperateOwnerUnderLock(func(o owner.Owner) error { + o, _ := c.GetOwner() + if o != nil { + doneOwner := make(chan error, 1) fmt.Fprintf(w, "\n\n*** owner info ***:\n\n") o.WriteDebugInfo(w, doneOwner) - return nil - }) - // wait the debug info printed - wait(doneOwner) + // wait the debug info printed + wait(doneOwner) + } doneM := make(chan error, 1) c.captureMu.Lock() @@ -610,8 +610,8 @@ func (c *Capture) IsOwner() bool { return c.owner != nil } -// GetOwner return the owner of current TiCDC cluster -func (c *Capture) GetOwner(ctx context.Context) (*model.CaptureInfo, error) { +// GetOwnerCaptureInfo return the owner capture info of current TiCDC cluster +func (c *Capture) GetOwnerCaptureInfo(ctx context.Context) (*model.CaptureInfo, error) { _, captureInfos, err := c.EtcdClient.GetCaptures(ctx) if err != nil { return nil, err diff --git a/cdc/owner/mock/owner_mock.go b/cdc/owner/mock/owner_mock.go index ccffc1727a3..b99c3b1d38f 100644 --- a/cdc/owner/mock/owner_mock.go +++ b/cdc/owner/mock/owner_mock.go @@ -62,18 +62,6 @@ func (mr *MockOwnerMockRecorder) EnqueueJob(adminJob, done interface{}) *gomock. return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "EnqueueJob", reflect.TypeOf((*MockOwner)(nil).EnqueueJob), adminJob, done) } -// ManualSchedule mocks base method. -func (m *MockOwner) ManualSchedule(cfID model.ChangeFeedID, toCapture model.CaptureID, tableID model.TableID, done chan<- error) { - m.ctrl.T.Helper() - m.ctrl.Call(m, "ManualSchedule", cfID, toCapture, tableID, done) -} - -// ManualSchedule indicates an expected call of ManualSchedule. -func (mr *MockOwnerMockRecorder) ManualSchedule(cfID, toCapture, tableID, done interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ManualSchedule", reflect.TypeOf((*MockOwner)(nil).ManualSchedule), cfID, toCapture, tableID, done) -} - // Query mocks base method. func (m *MockOwner) Query(query *owner.Query, done chan<- error) { m.ctrl.T.Helper() @@ -86,6 +74,30 @@ func (mr *MockOwnerMockRecorder) Query(query, done interface{}) *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Query", reflect.TypeOf((*MockOwner)(nil).Query), query, done) } +// RebalanceTables mocks base method. +func (m *MockOwner) RebalanceTables(cfID model.ChangeFeedID, done chan<- error) { + m.ctrl.T.Helper() + m.ctrl.Call(m, "RebalanceTables", cfID, done) +} + +// RebalanceTables indicates an expected call of RebalanceTables. +func (mr *MockOwnerMockRecorder) RebalanceTables(cfID, done interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "RebalanceTables", reflect.TypeOf((*MockOwner)(nil).RebalanceTables), cfID, done) +} + +// ScheduleTable mocks base method. +func (m *MockOwner) ScheduleTable(cfID model.ChangeFeedID, toCapture model.CaptureID, tableID model.TableID, done chan<- error) { + m.ctrl.T.Helper() + m.ctrl.Call(m, "ScheduleTable", cfID, toCapture, tableID, done) +} + +// ScheduleTable indicates an expected call of ScheduleTable. +func (mr *MockOwnerMockRecorder) ScheduleTable(cfID, toCapture, tableID, done interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ScheduleTable", reflect.TypeOf((*MockOwner)(nil).ScheduleTable), cfID, toCapture, tableID, done) +} + // Tick mocks base method. func (m *MockOwner) Tick(ctx context.Context, state orchestrator.ReactorState) (orchestrator.ReactorState, error) { m.ctrl.T.Helper() @@ -101,18 +113,6 @@ func (mr *MockOwnerMockRecorder) Tick(ctx, state interface{}) *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Tick", reflect.TypeOf((*MockOwner)(nil).Tick), ctx, state) } -// TriggerRebalance mocks base method. -func (m *MockOwner) TriggerRebalance(cfID model.ChangeFeedID, done chan<- error) { - m.ctrl.T.Helper() - m.ctrl.Call(m, "TriggerRebalance", cfID, done) -} - -// TriggerRebalance indicates an expected call of TriggerRebalance. -func (mr *MockOwnerMockRecorder) TriggerRebalance(cfID, done interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "TriggerRebalance", reflect.TypeOf((*MockOwner)(nil).TriggerRebalance), cfID, done) -} - // WriteDebugInfo mocks base method. func (m *MockOwner) WriteDebugInfo(w io.Writer, done chan<- error) { m.ctrl.T.Helper() diff --git a/cdc/owner/owner.go b/cdc/owner/owner.go index ed79434bc28..70c546e0a17 100644 --- a/cdc/owner/owner.go +++ b/cdc/owner/owner.go @@ -40,7 +40,7 @@ type ownerJobType int // All OwnerJob types const ( ownerJobTypeRebalance ownerJobType = iota - ownerJobTypeManualSchedule + ownerJobTypeScheduleTable ownerJobTypeAdminJob ownerJobTypeDebugInfo ownerJobTypeQuery @@ -55,9 +55,9 @@ type ownerJob struct { Tp ownerJobType ChangefeedID model.ChangeFeedID - // for ManualSchedule only + // for ScheduleTable only TargetCaptureID model.CaptureID - // for ManualSchedule only + // for ScheduleTable only TableID model.TableID // for Admin Job only @@ -78,8 +78,8 @@ type ownerJob struct { type Owner interface { orchestrator.Reactor EnqueueJob(adminJob model.AdminJob, done chan<- error) - TriggerRebalance(cfID model.ChangeFeedID, done chan<- error) - ManualSchedule( + RebalanceTables(cfID model.ChangeFeedID, done chan<- error) + ScheduleTable( cfID model.ChangeFeedID, toCapture model.CaptureID, tableID model.TableID, done chan<- error, ) @@ -157,7 +157,7 @@ func (o *ownerImpl) Tick(stdCtx context.Context, rawState orchestrator.ReactorSt // when there are different versions of cdc nodes in the cluster, // the admin job may not be processed all the time. And http api relies on // admin job, which will cause all http api unavailable. - o.HandleJobs() + o.handleJobs() if !o.clusterVersionConsistent(state.Captures) { return state, nil @@ -226,9 +226,9 @@ func (o *ownerImpl) EnqueueJob(adminJob model.AdminJob, done chan<- error) { }) } -// TriggerRebalance triggers a rebalance for the specified changefeed +// RebalanceTables triggers a rebalance for the specified changefeed // `done` must be buffered to prevernt blocking owner. -func (o *ownerImpl) TriggerRebalance(cfID model.ChangeFeedID, done chan<- error) { +func (o *ownerImpl) RebalanceTables(cfID model.ChangeFeedID, done chan<- error) { o.pushOwnerJob(&ownerJob{ Tp: ownerJobTypeRebalance, ChangefeedID: cfID, @@ -236,14 +236,14 @@ func (o *ownerImpl) TriggerRebalance(cfID model.ChangeFeedID, done chan<- error) }) } -// ManualSchedule moves a table from a capture to another capture +// ScheduleTable moves a table from a capture to another capture // `done` must be buffered to prevernt blocking owner. -func (o *ownerImpl) ManualSchedule( +func (o *ownerImpl) ScheduleTable( cfID model.ChangeFeedID, toCapture model.CaptureID, tableID model.TableID, done chan<- error, ) { o.pushOwnerJob(&ownerJob{ - Tp: ownerJobTypeManualSchedule, + Tp: ownerJobTypeScheduleTable, ChangefeedID: cfID, TargetCaptureID: toCapture, TableID: tableID, @@ -357,9 +357,7 @@ func (o *ownerImpl) clusterVersionConsistent(captures map[model.CaptureID]*model return true } -// HandleJobs handles changefeed admin jobs and others. -// Exported for tests. -func (o *ownerImpl) HandleJobs() { +func (o *ownerImpl) handleJobs() { jobs := o.takeOwnerJobs() for _, job := range jobs { changefeedID := job.ChangefeedID @@ -373,7 +371,7 @@ func (o *ownerImpl) HandleJobs() { switch job.Tp { case ownerJobTypeAdminJob: cfReactor.feedStateManager.PushAdminJob(job.AdminJob) - case ownerJobTypeManualSchedule: + case ownerJobTypeScheduleTable: cfReactor.scheduler.MoveTable(job.TableID, job.TargetCaptureID) case ownerJobTypeRebalance: cfReactor.scheduler.Rebalance() diff --git a/cdc/owner/owner_test.go b/cdc/owner/owner_test.go index df23d417e07..4bf12827ac3 100644 --- a/cdc/owner/owner_test.go +++ b/cdc/owner/owner_test.go @@ -313,9 +313,9 @@ func TestAdminJob(t *testing.T) { Type: model.AdminResume, }, done1) done2 := make(chan error, 1) - owner.TriggerRebalance("test-changefeed2", done2) + owner.RebalanceTables("test-changefeed2", done2) done3 := make(chan error, 1) - owner.ManualSchedule("test-changefeed3", "test-caputre1", 10, done3) + owner.ScheduleTable("test-changefeed3", "test-caputre1", 10, done3) done4 := make(chan error, 1) var buf bytes.Buffer owner.WriteDebugInfo(&buf, done4) @@ -339,7 +339,7 @@ func TestAdminJob(t *testing.T) { Tp: ownerJobTypeRebalance, ChangefeedID: "test-changefeed2", }, { - Tp: ownerJobTypeManualSchedule, + Tp: ownerJobTypeScheduleTable, ChangefeedID: "test-changefeed3", TargetCaptureID: "test-caputre1", TableID: 10, diff --git a/cdc/processor/manager.go b/cdc/processor/manager.go index bd4a93daf80..cb8f3830140 100644 --- a/cdc/processor/manager.go +++ b/cdc/processor/manager.go @@ -139,7 +139,7 @@ func (m *Manager) closeProcessor(changefeedID model.ChangeFeedID) { } } -// AsyncClose sends a close signal to Manager and closing all processors +// AsyncClose sends a signal to Manager to close all processors. func (m *Manager) AsyncClose() { timeout := 3 * time.Second ctx, cancel := context.WithTimeout(context.TODO(), timeout)