Skip to content

Commit

Permalink
api(ticdc): only update upstreamInfo that has changed (#10422)
Browse files Browse the repository at this point in the history
close #10430
  • Loading branch information
CharlesCheung96 authored Jan 11, 2024
1 parent 1bbb996 commit c5d5eff
Show file tree
Hide file tree
Showing 13 changed files with 180 additions and 133 deletions.
2 changes: 1 addition & 1 deletion cdc/api/v2/changefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -499,7 +499,7 @@ func (h *OpenAPIV2) updateChangefeed(c *gin.Context) {
zap.Any("upstreamInfo", newUpInfo))

err = owner.
UpdateChangefeedAndUpstream(ctx, newUpInfo, newCfInfo, changefeedID)
UpdateChangefeedAndUpstream(ctx, newUpInfo, newCfInfo)
if err != nil {
_ = c.Error(errors.Trace(err))
return
Expand Down
6 changes: 3 additions & 3 deletions cdc/api/v2/changefeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -474,7 +474,7 @@ func TestUpdateChangefeed(t *testing.T) {
Return(&model.ChangeFeedInfo{}, &model.UpstreamInfo{}, nil).
Times(1)
mockOwner.EXPECT().
UpdateChangefeedAndUpstream(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).
UpdateChangefeedAndUpstream(gomock.Any(), gomock.Any(), gomock.Any()).
Return(cerrors.ErrEtcdAPIError).Times(1)

w = httptest.NewRecorder()
Expand All @@ -494,7 +494,7 @@ func TestUpdateChangefeed(t *testing.T) {
Times(1)
mockCapture.EXPECT().GetUpstreamManager().Return(upstream.NewManager4Test(&mockPDClient{}), nil).AnyTimes()
mockOwner.EXPECT().
UpdateChangefeedAndUpstream(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).
UpdateChangefeedAndUpstream(gomock.Any(), gomock.Any(), gomock.Any()).
Return(nil).Times(1)

w = httptest.NewRecorder()
Expand All @@ -511,7 +511,7 @@ func TestUpdateChangefeed(t *testing.T) {
Times(1)
mockCapture.EXPECT().GetUpstreamManager().Return(upstream.NewManager4Test(&mockPDClient{}), nil).AnyTimes()
mockOwner.EXPECT().
UpdateChangefeedAndUpstream(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).
UpdateChangefeedAndUpstream(gomock.Any(), gomock.Any(), gomock.Any()).
Return(nil).Times(1)

w = httptest.NewRecorder()
Expand Down
8 changes: 4 additions & 4 deletions cdc/owner/mock/owner_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 1 addition & 3 deletions cdc/owner/owner.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,6 @@ type Owner interface {
UpdateChangefeedAndUpstream(ctx context.Context,
upstreamInfo *model.UpstreamInfo,
changeFeedInfo *model.ChangeFeedInfo,
changeFeedID model.ChangeFeedID,
) error
UpdateChangefeed(ctx context.Context,
changeFeedInfo *model.ChangeFeedInfo) error
Expand Down Expand Up @@ -405,9 +404,8 @@ func (o *ownerImpl) AsyncStop() {
func (o *ownerImpl) UpdateChangefeedAndUpstream(ctx context.Context,
upstreamInfo *model.UpstreamInfo,
changeFeedInfo *model.ChangeFeedInfo,
changeFeedID model.ChangeFeedID,
) error {
return o.etcdClient.UpdateChangefeedAndUpstream(ctx, upstreamInfo, changeFeedInfo, changeFeedID)
return o.etcdClient.UpdateChangefeedAndUpstream(ctx, upstreamInfo, changeFeedInfo)
}

func (o *ownerImpl) UpdateChangefeed(ctx context.Context,
Expand Down
1 change: 0 additions & 1 deletion cdcv2/owner/owner.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,6 @@ type Owner struct {
func (o *Owner) UpdateChangefeedAndUpstream(ctx context.Context,
upstreamInfo *model.UpstreamInfo,
changeFeedInfo *model.ChangeFeedInfo,
changeFeedID model.ChangeFeedID,
) error {
panic("implement me")
}
Expand Down
2 changes: 1 addition & 1 deletion errors.toml
Original file line number Diff line number Diff line change
Expand Up @@ -1353,7 +1353,7 @@ meta operation fail

["DFLOW:ErrMetaOpFailed"]
error = '''
meta operation %s is failed
unexpected meta operation failure: %s
'''

["DFLOW:ErrMetaOptionConflict"]
Expand Down
2 changes: 1 addition & 1 deletion pkg/errors/cdc_errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -962,7 +962,7 @@ var (
errors.RFCCodeText("CDC:ErrMetaOpIgnored"),
)
ErrMetaOpFailed = errors.Normalize(
"meta operation %s is failed",
"unexpected meta operation failure: %s",
errors.RFCCodeText("DFLOW:ErrMetaOpFailed"),
)
ErrMetaInvalidState = errors.Normalize(
Expand Down
11 changes: 10 additions & 1 deletion pkg/etcd/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,8 @@ func (m mockWatcher) RequestProgress(ctx context.Context) error {
}

func TestRetry(t *testing.T) {
t.Parallel()

originValue := maxTries
// to speedup the test
maxTries = 2
Expand Down Expand Up @@ -116,6 +118,8 @@ func TestRetry(t *testing.T) {
}

func TestDelegateLease(t *testing.T) {
t.Parallel()

ctx := context.Background()
url, server, err := SetupEmbedEtcd(t.TempDir())
defer func() {
Expand Down Expand Up @@ -148,6 +152,8 @@ func TestDelegateLease(t *testing.T) {

// test no data lost when WatchCh blocked
func TestWatchChBlocked(t *testing.T) {
t.Parallel()

cli := clientv3.NewCtxClient(context.TODO())
resetCount := int32(0)
requestCount := int32(0)
Expand Down Expand Up @@ -209,6 +215,8 @@ func TestWatchChBlocked(t *testing.T) {

// test no data lost when OutCh blocked
func TestOutChBlocked(t *testing.T) {
t.Parallel()

cli := clientv3.NewCtxClient(context.TODO())
resetCount := int32(0)
requestCount := int32(0)
Expand Down Expand Up @@ -260,8 +268,9 @@ func TestOutChBlocked(t *testing.T) {
}

func TestRevisionNotFallBack(t *testing.T) {
cli := clientv3.NewCtxClient(context.TODO())
t.Parallel()

cli := clientv3.NewCtxClient(context.TODO())
resetCount := int32(0)
requestCount := int32(0)
rev := int64(0)
Expand Down
Loading

0 comments on commit c5d5eff

Please sign in to comment.