Skip to content

Commit

Permalink
address comments
Browse files Browse the repository at this point in the history
Signed-off-by: Neil Shen <[email protected]>
  • Loading branch information
overvenus committed Feb 7, 2022
1 parent 427fafb commit 51e2418
Show file tree
Hide file tree
Showing 9 changed files with 140 additions and 151 deletions.
60 changes: 16 additions & 44 deletions cdc/api/open.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ func RegisterOpenAPIRoutes(router *gin.Engine, api openAPI) {
changefeedGroup.POST("/:changefeed_id/pause", api.PauseChangefeed)
changefeedGroup.POST("/:changefeed_id/resume", api.ResumeChangefeed)
changefeedGroup.DELETE("/:changefeed_id", api.RemoveChangefeed)
changefeedGroup.POST("/:changefeed_id/tables/rebalance_table", api.RebalanceTable)
changefeedGroup.POST("/:changefeed_id/tables/rebalance_table", api.RebalanceTables)
changefeedGroup.POST("/:changefeed_id/tables/move_table", api.MoveTable)

// owner API
Expand Down Expand Up @@ -319,13 +319,7 @@ func (h *openAPI) PauseChangefeed(c *gin.Context) {
Type: model.AdminStop,
}

// Use buffered channel to prevernt blocking owner.
done := make(chan error, 1)
_ = h.capture.OperateOwnerUnderLock(func(owner owner.Owner) error {
owner.EnqueueJob(job, done)
return nil
})
if err := waitDone(ctx, done); err != nil {
if err := handleOwnerJob(ctx, h.capture, job); err != nil {
_ = c.Error(err)
return
}
Expand Down Expand Up @@ -366,13 +360,7 @@ func (h *openAPI) ResumeChangefeed(c *gin.Context) {
Type: model.AdminResume,
}

// Use buffered channel to prevernt blocking owner.
done := make(chan error, 1)
_ = h.capture.OperateOwnerUnderLock(func(owner owner.Owner) error {
owner.EnqueueJob(job, done)
return nil
})
if err := waitDone(ctx, done); err != nil {
if err := handleOwnerJob(ctx, h.capture, job); err != nil {
_ = c.Error(err)
return
}
Expand Down Expand Up @@ -475,20 +463,14 @@ func (h *openAPI) RemoveChangefeed(c *gin.Context) {
Type: model.AdminRemove,
}

// Use buffered channel to prevernt blocking owner.
done := make(chan error, 1)
_ = h.capture.OperateOwnerUnderLock(func(owner owner.Owner) error {
owner.EnqueueJob(job, done)
return nil
})
if err := waitDone(ctx, done); err != nil {
if err := handleOwnerJob(ctx, h.capture, job); err != nil {
_ = c.Error(err)
return
}
c.Status(http.StatusAccepted)
}

// RebalanceTable rebalances tables
// RebalanceTables rebalances tables
// @Summary rebalance tables
// @Description rebalance all tables of a changefeed
// @Tags changefeed
Expand All @@ -498,7 +480,7 @@ func (h *openAPI) RemoveChangefeed(c *gin.Context) {
// @Success 202
// @Failure 500,400 {object} model.HTTPError
// @Router /api/v1/changefeeds/{changefeed_id}/tables/rebalance_table [post]
func (h *openAPI) RebalanceTable(c *gin.Context) {
func (h *openAPI) RebalanceTables(c *gin.Context) {
if !h.capture.IsOwner() {
h.forwardToOwner(c)
return
Expand All @@ -518,13 +500,7 @@ func (h *openAPI) RebalanceTable(c *gin.Context) {
return
}

// Use buffered channel to prevernt blocking owner.
done := make(chan error, 1)
_ = h.capture.OperateOwnerUnderLock(func(owner owner.Owner) error {
owner.TriggerRebalance(changefeedID, done)
return nil
})
if err := waitDone(ctx, done); err != nil {
if err := handleOwnerRebalance(ctx, h.capture, changefeedID); err != nil {
_ = c.Error(err)
return
}
Expand Down Expand Up @@ -577,13 +553,9 @@ func (h *openAPI) MoveTable(c *gin.Context) {
return
}

// Use buffered channel to prevernt blocking owner.
done := make(chan error, 1)
_ = h.capture.OperateOwnerUnderLock(func(owner owner.Owner) error {
owner.ManualSchedule(changefeedID, data.CaptureID, data.TableID, done)
return nil
})
if err := waitDone(ctx, done); err != nil {
err = handleOwnerScheduleTable(
ctx, h.capture, changefeedID, data.CaptureID, data.TableID)
if err != nil {
_ = c.Error(err)
return
}
Expand All @@ -605,10 +577,10 @@ func (h *openAPI) ResignOwner(c *gin.Context) {
return
}

_ = h.capture.OperateOwnerUnderLock(func(owner owner.Owner) error {
owner.AsyncStop()
return nil
})
o, _ := h.capture.GetOwner()
if o != nil {
o.AsyncStop()
}

c.Status(http.StatusAccepted)
}
Expand Down Expand Up @@ -773,7 +745,7 @@ func (h *openAPI) ServerStatus(c *gin.Context) {
func (h *openAPI) Health(c *gin.Context) {
ctx := c.Request.Context()

if _, err := h.capture.GetOwner(ctx); err != nil {
if _, err := h.capture.GetOwnerCaptureInfo(ctx); err != nil {
c.IndentedJSON(http.StatusInternalServerError, model.NewHTTPError(err))
return
}
Expand Down Expand Up @@ -823,7 +795,7 @@ func (h *openAPI) forwardToOwner(c *gin.Context) {
var owner *model.CaptureInfo
// get owner
err := retry.Do(ctx, func() error {
o, err := h.capture.GetOwner(ctx)
o, err := h.capture.GetOwnerCaptureInfo(ctx)
if err != nil {
log.Info("get owner failed, retry later", zap.Error(err))
return err
Expand Down
10 changes: 5 additions & 5 deletions cdc/api/open_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -361,7 +361,7 @@ func TestRemoveChangefeed(t *testing.T) {
require.Contains(t, respErr.Error, "changefeed not exists")
}

func TestRebalanceTable(t *testing.T) {
func TestRebalanceTables(t *testing.T) {
t.Parallel()
ctrl := gomock.NewController(t)
mo := mock_owner.NewMockOwner(ctrl)
Expand All @@ -370,7 +370,7 @@ func TestRebalanceTable(t *testing.T) {

// test rebalance table succeeded
mo.EXPECT().
TriggerRebalance(gomock.Any(), gomock.Any()).
RebalanceTables(gomock.Any(), gomock.Any()).
Do(func(cfID model.ChangeFeedID, done chan<- error) {
require.EqualValues(t, cfID, changeFeedID)
close(done)
Expand All @@ -386,7 +386,7 @@ func TestRebalanceTable(t *testing.T) {

// test rebalance table failed from owner side.
mo.EXPECT().
TriggerRebalance(gomock.Any(), gomock.Any()).
RebalanceTables(gomock.Any(), gomock.Any()).
Do(func(cfID model.ChangeFeedID, done chan<- error) {
done <- cerror.ErrChangeFeedNotExists.FastGenByArgs(cfID)
close(done)
Expand Down Expand Up @@ -436,7 +436,7 @@ func TestMoveTable(t *testing.T) {
require.Nil(t, err)
body := bytes.NewReader(b)
mo.EXPECT().
ManualSchedule(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).
ScheduleTable(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).
Do(func(
cfID model.ChangeFeedID, toCapture model.CaptureID,
tableID model.TableID, done chan<- error,
Expand Down Expand Up @@ -464,7 +464,7 @@ func TestMoveTable(t *testing.T) {
require.Nil(t, err)
body = bytes.NewReader(b)
mo.EXPECT().
ManualSchedule(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).
ScheduleTable(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).
Do(func(
cfID model.ChangeFeedID, toCapture model.CaptureID,
tableID model.TableID, done chan<- error,
Expand Down
51 changes: 11 additions & 40 deletions cdc/api/owner.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ import (
"github.com/pingcap/log"
"github.com/pingcap/tiflow/cdc/capture"
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/cdc/owner"
cerror "github.com/pingcap/tiflow/pkg/errors"
"github.com/pingcap/tiflow/pkg/logutil"
"github.com/tikv/client-go/v2/oracle"
Expand Down Expand Up @@ -112,10 +111,10 @@ func (h *ownerAPI) handleResignOwner(w http.ResponseWriter, req *http.Request) {
handleOwnerResp(w, concurrency.ErrElectionNotLeader)
return
}
err := h.capture.OperateOwnerUnderLock(func(owner owner.Owner) error {
owner.AsyncStop()
return nil
})
o, err := h.capture.GetOwner()
if o != nil {
o.AsyncStop()
}
handleOwnerResp(w, err)
}

Expand Down Expand Up @@ -153,17 +152,8 @@ func (h *ownerAPI) handleChangefeedAdmin(w http.ResponseWriter, req *http.Reques
Opts: opts,
}

// Use buffered channel to prevernt blocking owner.
done := make(chan error, 1)
err = h.capture.OperateOwnerUnderLock(func(owner owner.Owner) error {
owner.EnqueueJob(job, done)
return nil
})
if err != nil {
handleOwnerResp(w, err)
return
}
handleOwnerResp(w, waitDone(req.Context(), done))
err = handleOwnerJob(req.Context(), h.capture, job)
handleOwnerResp(w, err)
}

func (h *ownerAPI) handleRebalanceTrigger(w http.ResponseWriter, req *http.Request) {
Expand All @@ -185,18 +175,8 @@ func (h *ownerAPI) handleRebalanceTrigger(w http.ResponseWriter, req *http.Reque
return
}

// Use buffered channel to prevernt blocking owner.
done := make(chan error, 1)
err = h.capture.OperateOwnerUnderLock(func(owner owner.Owner) error {
owner.TriggerRebalance(changefeedID, done)
return nil
})

if err != nil {
handleOwnerResp(w, err)
return
}
handleOwnerResp(w, waitDone(req.Context(), done))
err = handleOwnerRebalance(req.Context(), h.capture, changefeedID)
handleOwnerResp(w, err)
}

func (h *ownerAPI) handleMoveTable(w http.ResponseWriter, req *http.Request) {
Expand Down Expand Up @@ -232,18 +212,9 @@ func (h *ownerAPI) handleMoveTable(w http.ResponseWriter, req *http.Request) {
return
}

// Use buffered channel to prevernt blocking owner.
done := make(chan error, 1)
err = h.capture.OperateOwnerUnderLock(func(owner owner.Owner) error {
owner.ManualSchedule(changefeedID, to, tableID, done)
return nil
})

if err != nil {
handleOwnerResp(w, err)
return
}
handleOwnerResp(w, waitDone(req.Context(), done))
err = handleOwnerScheduleTable(
req.Context(), h.capture, changefeedID, to, tableID)
handleOwnerResp(w, err)
}

func (h *ownerAPI) handleChangefeedQuery(w http.ResponseWriter, req *http.Request) {
Expand Down
56 changes: 52 additions & 4 deletions cdc/api/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ import (

"github.com/pingcap/errors"
"github.com/pingcap/log"
"github.com/pingcap/tiflow/cdc/capture"
"github.com/pingcap/tiflow/cdc/model"
cerror "github.com/pingcap/tiflow/pkg/errors"
"go.uber.org/zap"
)
Expand Down Expand Up @@ -78,11 +80,57 @@ func writeData(w http.ResponseWriter, data interface{}) {
}
}

func waitDone(ctx context.Context, done <-chan error) (err error) {
func handleOwnerJob(
ctx context.Context, capture *capture.Capture, job model.AdminJob,
) error {
// Use buffered channel to prevernt blocking owner.
done := make(chan error, 1)
o, err := capture.GetOwner()
if err != nil {
return errors.Trace(err)
}
o.EnqueueJob(job, done)
select {
case <-ctx.Done():
return errors.Trace(ctx.Err())
case err := <-done:
return errors.Trace(err)
}
}

func handleOwnerRebalance(
ctx context.Context, capture *capture.Capture, changefeedID string,
) error {
// Use buffered channel to prevernt blocking owner.
done := make(chan error, 1)
o, err := capture.GetOwner()
if err != nil {
return errors.Trace(err)
}
o.RebalanceTables(changefeedID, done)
select {
case <-ctx.Done():
return errors.Trace(ctx.Err())
case err := <-done:
return errors.Trace(err)
}
}

func handleOwnerScheduleTable(
ctx context.Context, capture *capture.Capture,
changefeedID string, captureID string, tableID int64,
) error {
// Use buffered channel to prevernt blocking owner.
done := make(chan error, 1)
o, err := capture.GetOwner()
if err != nil {
return errors.Trace(err)
}
o.ScheduleTable(changefeedID, captureID, tableID, done)
select {
case <-ctx.Done():
err = ctx.Err()
case err = <-done:
return errors.Trace(ctx.Err())
case err := <-done:
return errors.Trace(err)
}
return
}
Loading

0 comments on commit 51e2418

Please sign in to comment.