Skip to content

Commit

Permalink
address lints
Browse files Browse the repository at this point in the history
Signed-off-by: Neil Shen <[email protected]>
  • Loading branch information
overvenus committed Jan 27, 2022
1 parent ed487cc commit fb216eb
Show file tree
Hide file tree
Showing 10 changed files with 99 additions and 56 deletions.
10 changes: 5 additions & 5 deletions cdc/api/open.go
Original file line number Diff line number Diff line change
Expand Up @@ -326,7 +326,7 @@ func (h *openAPI) PauseChangefeed(c *gin.Context) {
return nil
})
if err := waitDone(ctx, done); err != nil {
c.Error(err)
_ = c.Error(err)
return
}
c.Status(http.StatusAccepted)
Expand Down Expand Up @@ -373,7 +373,7 @@ func (h *openAPI) ResumeChangefeed(c *gin.Context) {
return nil
})
if err := waitDone(ctx, done); err != nil {
c.Error(err)
_ = c.Error(err)
return
}
c.Status(http.StatusAccepted)
Expand Down Expand Up @@ -482,7 +482,7 @@ func (h *openAPI) RemoveChangefeed(c *gin.Context) {
return nil
})
if err := waitDone(ctx, done); err != nil {
c.Error(err)
_ = c.Error(err)
return
}
c.Status(http.StatusAccepted)
Expand Down Expand Up @@ -525,7 +525,7 @@ func (h *openAPI) RebalanceTable(c *gin.Context) {
return nil
})
if err := waitDone(ctx, done); err != nil {
c.Error(err)
_ = c.Error(err)
return
}
c.Status(http.StatusAccepted)
Expand Down Expand Up @@ -584,7 +584,7 @@ func (h *openAPI) MoveTable(c *gin.Context) {
return nil
})
if err := waitDone(ctx, done); err != nil {
c.Error(err)
_ = c.Error(err)
return
}
c.Status(http.StatusAccepted)
Expand Down
54 changes: 43 additions & 11 deletions cdc/api/open_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,9 @@ func newStatusProvider() *mockStatusProvider {
Return(map[model.CaptureID]*model.TaskStatus{captureID: {}}, nil)

statusProvider.On("GetTaskPositions", mock.Anything).
Return(map[model.CaptureID]*model.TaskPosition{captureID: {Error: &model.RunningError{Message: "test"}}}, nil)
Return(map[model.CaptureID]*model.TaskPosition{
captureID: {Error: &model.RunningError{Message: "test"}},
}, nil)

statusProvider.On("GetAllChangeFeedStatuses", mock.Anything).
Return(map[model.ChangeFeedID]*model.ChangeFeedStatus{
Expand Down Expand Up @@ -239,7 +241,10 @@ func TestPauseChangefeed(t *testing.T) {
require.Contains(t, respErr.Error, "changefeed not exists")

// test pause changefeed failed
api = testCase{url: fmt.Sprintf("/api/v1/changefeeds/%s/pause", nonExistChangefeedID), method: "POST"}
api = testCase{
url: fmt.Sprintf("/api/v1/changefeeds/%s/pause", nonExistChangefeedID),
method: "POST",
}
w = httptest.NewRecorder()
req, _ = http.NewRequest(api.method, api.url, nil)
router.ServeHTTP(w, req)
Expand Down Expand Up @@ -289,7 +294,10 @@ func TestResumeChangefeed(t *testing.T) {
require.Contains(t, respErr.Error, "changefeed not exists")

// test resume changefeed failed
api = testCase{url: fmt.Sprintf("/api/v1/changefeeds/%s/resume", nonExistChangefeedID), method: "POST"}
api = testCase{
url: fmt.Sprintf("/api/v1/changefeeds/%s/resume", nonExistChangefeedID),
method: "POST",
}
w = httptest.NewRecorder()
req, _ = http.NewRequest(api.method, api.url, nil)
router.ServeHTTP(w, req)
Expand Down Expand Up @@ -339,7 +347,10 @@ func TestRemoveChangefeed(t *testing.T) {
require.Contains(t, respErr.Error, "changefeed not exists")

// test remove changefeed failed
api = testCase{url: fmt.Sprintf("/api/v1/changefeeds/%s", nonExistChangefeedID), method: "DELETE"}
api = testCase{
url: fmt.Sprintf("/api/v1/changefeeds/%s", nonExistChangefeedID),
method: "DELETE",
}
w = httptest.NewRecorder()
req, _ = http.NewRequest(api.method, api.url, nil)
router.ServeHTTP(w, req)
Expand All @@ -364,7 +375,10 @@ func TestRebalanceTable(t *testing.T) {
require.EqualValues(t, cfID, changeFeedID)
close(done)
})
api := testCase{url: fmt.Sprintf("/api/v1/changefeeds/%s/tables/rebalance_table", changeFeedID), method: "POST"}
api := testCase{
url: fmt.Sprintf("/api/v1/changefeeds/%s/tables/rebalance_table", changeFeedID),
method: "POST",
}
w := httptest.NewRecorder()
req, _ := http.NewRequest(api.method, api.url, nil)
router.ServeHTTP(w, req)
Expand All @@ -391,7 +405,10 @@ func TestRebalanceTable(t *testing.T) {
require.Contains(t, respErr.Error, "changefeed not exists")

// test rebalance table failed
api = testCase{url: fmt.Sprintf("/api/v1/changefeeds/%s/tables/rebalance_table", nonExistChangefeedID), method: "POST"}
api = testCase{
url: fmt.Sprintf("/api/v1/changefeeds/%s/tables/rebalance_table", nonExistChangefeedID),
method: "POST",
}
w = httptest.NewRecorder()
req, _ = http.NewRequest(api.method, api.url, nil)
router.ServeHTTP(w, req)
Expand Down Expand Up @@ -429,7 +446,10 @@ func TestMoveTable(t *testing.T) {
require.EqualValues(t, tableID, data.TableID)
close(done)
})
api := testCase{url: fmt.Sprintf("/api/v1/changefeeds/%s/tables/move_table", changeFeedID), method: "POST"}
api := testCase{
url: fmt.Sprintf("/api/v1/changefeeds/%s/tables/move_table", changeFeedID),
method: "POST",
}
w := httptest.NewRecorder()
req, _ := http.NewRequest(api.method, api.url, body)
router.ServeHTTP(w, req)
Expand All @@ -455,7 +475,10 @@ func TestMoveTable(t *testing.T) {
done <- cerror.ErrChangeFeedNotExists.FastGenByArgs(cfID)
close(done)
})
api = testCase{url: fmt.Sprintf("/api/v1/changefeeds/%s/tables/move_table", changeFeedID), method: "POST"}
api = testCase{
url: fmt.Sprintf("/api/v1/changefeeds/%s/tables/move_table", changeFeedID),
method: "POST",
}
w = httptest.NewRecorder()
req, _ = http.NewRequest(api.method, api.url, body)
router.ServeHTTP(w, req)
Expand All @@ -466,7 +489,10 @@ func TestMoveTable(t *testing.T) {
require.Contains(t, respErr.Error, "changefeed not exists")

// test move table failed
api = testCase{url: fmt.Sprintf("/api/v1/changefeeds/%s/tables/move_table", nonExistChangefeedID), method: "POST"}
api = testCase{
url: fmt.Sprintf("/api/v1/changefeeds/%s/tables/move_table", nonExistChangefeedID),
method: "POST",
}
w = httptest.NewRecorder()
req, _ = http.NewRequest(api.method, api.url, body)
router.ServeHTTP(w, req)
Expand Down Expand Up @@ -499,7 +525,10 @@ func TestGetProcessor(t *testing.T) {
cp := capture.NewCapture4Test(mo)
router := newRouter(cp, newStatusProvider())
// test get processor succeeded
api := testCase{url: fmt.Sprintf("/api/v1/processors/%s/%s", changeFeedID, captureID), method: "GET"}
api := testCase{
url: fmt.Sprintf("/api/v1/processors/%s/%s", changeFeedID, captureID),
method: "GET",
}
w := httptest.NewRecorder()
req, _ := http.NewRequest(api.method, api.url, nil)
router.ServeHTTP(w, req)
Expand All @@ -510,7 +539,10 @@ func TestGetProcessor(t *testing.T) {
require.Equal(t, "test", processorDetail.Error.Message)

// test get processor fail due to capture ID error
api = testCase{url: fmt.Sprintf("/api/v1/processors/%s/%s", changeFeedID, "non-exist-capture"), method: "GET"}
api = testCase{
url: fmt.Sprintf("/api/v1/processors/%s/%s", changeFeedID, "non-exist-capture"),
method: "GET",
}
w = httptest.NewRecorder()
req, _ = http.NewRequest(api.method, api.url, nil)
router.ServeHTTP(w, req)
Expand Down
4 changes: 2 additions & 2 deletions cdc/http_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ func (a *testCase) String() string {

func TestPProfPath(t *testing.T) {
router := gin.New()
RegisterRoutes(router, capture.NewCapture4Test(false), nil)
RegisterRoutes(router, capture.NewCapture4Test(nil), nil)

apis := []*testCase{
{"/debug/pprof/", http.MethodGet},
Expand All @@ -63,7 +63,7 @@ func TestPProfPath(t *testing.T) {

func TestHandleFailpoint(t *testing.T) {
router := gin.New()
RegisterRoutes(router, capture.NewCapture4Test(false), nil)
RegisterRoutes(router, capture.NewCapture4Test(nil), nil)
fp := "github.com/pingcap/tiflow/cdc/TestHandleFailpoint"
uri := fmt.Sprintf("/debug/fail/%s", fp)
body := bytes.NewReader([]byte("return(true)"))
Expand Down
2 changes: 1 addition & 1 deletion cdc/owner/mock/owner_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

20 changes: 10 additions & 10 deletions cdc/owner/owner.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ type ownerJob struct {
debugInfoWriter io.Writer

// for status provider
query *OwnerQuery
query *Query

done chan<- error
}
Expand All @@ -84,7 +84,7 @@ type Owner interface {
tableID model.TableID, done chan<- error,
)
WriteDebugInfo(w io.Writer, done chan<- error)
Query(query *OwnerQuery, done chan<- error)
Query(query *Query, done chan<- error)
AsyncStop()
}

Expand Down Expand Up @@ -261,7 +261,7 @@ func (o *ownerImpl) WriteDebugInfo(w io.Writer, done chan<- error) {
}

// Query writes debug info into the specified http writer
func (o *ownerImpl) Query(query *OwnerQuery, done chan<- error) {
func (o *ownerImpl) Query(query *Query, done chan<- error) {
o.pushOwnerJob(&ownerJob{
Tp: ownerJobTypeQuery,
query: query,
Expand Down Expand Up @@ -386,9 +386,9 @@ func (o *ownerImpl) HandleJobs() {
}
}

func (o *ownerImpl) handleQueries(query *OwnerQuery) error {
func (o *ownerImpl) handleQueries(query *Query) error {
switch query.Tp {
case OwnerQueryAllChangeFeedStatuses:
case QueryAllChangeFeedStatuses:
ret := map[model.ChangeFeedID]*model.ChangeFeedStatus{}
for cfID, cfReactor := range o.changefeeds {
ret[cfID] = &model.ChangeFeedStatus{}
Expand All @@ -403,7 +403,7 @@ func (o *ownerImpl) handleQueries(query *OwnerQuery) error {
ret[cfID].AdminJobType = cfReactor.state.Status.AdminJobType
}
query.Data = ret
case OwnerQueryAllChangeFeedInfo:
case QueryAllChangeFeedInfo:
ret := map[model.ChangeFeedID]*model.ChangeFeedInfo{}
for cfID, cfReactor := range o.changefeeds {
if cfReactor.state == nil {
Expand All @@ -420,7 +420,7 @@ func (o *ownerImpl) handleQueries(query *OwnerQuery) error {
}
}
query.Data = ret
case OwnerQueryAllTaskStatuses:
case QueryAllTaskStatuses:
cfReactor, ok := o.changefeeds[query.ChangeFeedID]
if !ok {
return cerror.ErrChangeFeedNotExists.GenWithStackByArgs(query.ChangeFeedID)
Expand All @@ -444,7 +444,7 @@ func (o *ownerImpl) handleQueries(query *OwnerQuery) error {
}
}
query.Data = ret
case OwnerQueryTaskPositions:
case QueryTaskPositions:
cfReactor, ok := o.changefeeds[query.ChangeFeedID]
if !ok {
return cerror.ErrChangeFeedNotExists.GenWithStackByArgs(query.ChangeFeedID)
Expand All @@ -468,7 +468,7 @@ func (o *ownerImpl) handleQueries(query *OwnerQuery) error {
}
}
query.Data = ret
case OwnerQueryProcessors:
case QueryProcessors:
var ret []*model.ProcInfoSnap
for cfID, cfReactor := range o.changefeeds {
if cfReactor.state == nil {
Expand All @@ -482,7 +482,7 @@ func (o *ownerImpl) handleQueries(query *OwnerQuery) error {
}
}
query.Data = ret
case OwnerQueryCaptures:
case QueryCaptures:
var ret []*model.CaptureInfo
for _, captureInfo := range o.captures {
ret = append(ret, &model.CaptureInfo{
Expand Down
52 changes: 30 additions & 22 deletions cdc/owner/status_provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,19 +50,27 @@ type StatusProvider interface {
GetCaptures(ctx context.Context) ([]*model.CaptureInfo, error)
}

type OwnerQueryType int32
// QueryType is the type of different queries.
type QueryType int32

const (
OwnerQueryAllChangeFeedStatuses = iota
OwnerQueryAllChangeFeedInfo
OwnerQueryAllTaskStatuses
OwnerQueryTaskPositions
OwnerQueryProcessors
OwnerQueryCaptures
// QueryAllChangeFeedStatuses query all changefeed status.
QueryAllChangeFeedStatuses QueryType = iota
// QueryAllChangeFeedInfo is the type of query all changefeed info.
QueryAllChangeFeedInfo
// QueryAllTaskStatuses is the type of query all task statuses.
QueryAllTaskStatuses
// QueryTaskPositions is the type of query task positions.
QueryTaskPositions
// QueryProcessors is the type of query processors.
QueryProcessors
// QueryCaptures is the type of query captures info.
QueryCaptures
)

type OwnerQuery struct {
Tp OwnerQueryType
// Query wraps query command and return results.
type Query struct {
Tp QueryType
ChangeFeedID model.ChangeFeedID

Data interface{}
Expand All @@ -78,8 +86,8 @@ type ownerStatusProvider struct {
}

func (p *ownerStatusProvider) GetAllChangeFeedStatuses(ctx context.Context) (map[model.ChangeFeedID]*model.ChangeFeedStatus, error) {
query := &OwnerQuery{
Tp: OwnerQueryAllChangeFeedStatuses,
query := &Query{
Tp: QueryAllChangeFeedStatuses,
}
if err := p.sendQueryToOwner(ctx, query); err != nil {
return nil, errors.Trace(err)
Expand All @@ -100,8 +108,8 @@ func (p *ownerStatusProvider) GetChangeFeedStatus(ctx context.Context, changefee
}

func (p *ownerStatusProvider) GetAllChangeFeedInfo(ctx context.Context) (map[model.ChangeFeedID]*model.ChangeFeedInfo, error) {
query := &OwnerQuery{
Tp: OwnerQueryAllChangeFeedInfo,
query := &Query{
Tp: QueryAllChangeFeedInfo,
}
if err := p.sendQueryToOwner(ctx, query); err != nil {
return nil, errors.Trace(err)
Expand All @@ -122,8 +130,8 @@ func (p *ownerStatusProvider) GetChangeFeedInfo(ctx context.Context, changefeedI
}

func (p *ownerStatusProvider) GetAllTaskStatuses(ctx context.Context, changefeedID model.ChangeFeedID) (map[model.CaptureID]*model.TaskStatus, error) {
query := &OwnerQuery{
Tp: OwnerQueryAllTaskStatuses,
query := &Query{
Tp: QueryAllTaskStatuses,
ChangeFeedID: changefeedID,
}
if err := p.sendQueryToOwner(ctx, query); err != nil {
Expand All @@ -133,8 +141,8 @@ func (p *ownerStatusProvider) GetAllTaskStatuses(ctx context.Context, changefeed
}

func (p *ownerStatusProvider) GetTaskPositions(ctx context.Context, changefeedID model.ChangeFeedID) (map[model.CaptureID]*model.TaskPosition, error) {
query := &OwnerQuery{
Tp: OwnerQueryTaskPositions,
query := &Query{
Tp: QueryTaskPositions,
ChangeFeedID: changefeedID,
}
if err := p.sendQueryToOwner(ctx, query); err != nil {
Expand All @@ -144,8 +152,8 @@ func (p *ownerStatusProvider) GetTaskPositions(ctx context.Context, changefeedID
}

func (p *ownerStatusProvider) GetProcessors(ctx context.Context) ([]*model.ProcInfoSnap, error) {
query := &OwnerQuery{
Tp: OwnerQueryProcessors,
query := &Query{
Tp: QueryProcessors,
}
if err := p.sendQueryToOwner(ctx, query); err != nil {
return nil, errors.Trace(err)
Expand All @@ -154,16 +162,16 @@ func (p *ownerStatusProvider) GetProcessors(ctx context.Context) ([]*model.ProcI
}

func (p *ownerStatusProvider) GetCaptures(ctx context.Context) ([]*model.CaptureInfo, error) {
query := &OwnerQuery{
Tp: OwnerQueryCaptures,
query := &Query{
Tp: QueryCaptures,
}
if err := p.sendQueryToOwner(ctx, query); err != nil {
return nil, errors.Trace(err)
}
return query.Data.([]*model.CaptureInfo), nil
}

func (p *ownerStatusProvider) sendQueryToOwner(ctx context.Context, query *OwnerQuery) error {
func (p *ownerStatusProvider) sendQueryToOwner(ctx context.Context, query *Query) error {
doneCh := make(chan error, 1)
p.owner.Query(query, doneCh)

Expand Down
Loading

0 comments on commit fb216eb

Please sign in to comment.