Skip to content

Commit

Permalink
api(ticdc): only update upstreamInfo that has changed (pingcap#10422)
Browse files Browse the repository at this point in the history
  • Loading branch information
CharlesCheung96 committed Feb 9, 2024
1 parent a385de6 commit 5f62c4e
Show file tree
Hide file tree
Showing 11 changed files with 193 additions and 125 deletions.
2 changes: 1 addition & 1 deletion cdc/api/v2/changefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -493,7 +493,7 @@ func (h *OpenAPIV2) updateChangefeed(c *gin.Context) {
zap.Any("upstreamInfo", newUpInfo))

err = h.capture.GetEtcdClient().
UpdateChangefeedAndUpstream(ctx, newUpInfo, newCfInfo, changefeedID)
UpdateChangefeedAndUpstream(ctx, newUpInfo, newCfInfo)
if err != nil {
_ = c.Error(errors.Trace(err))
return
Expand Down
6 changes: 3 additions & 3 deletions cdc/api/v2/changefeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -475,7 +475,7 @@ func TestUpdateChangefeed(t *testing.T) {
Return(&model.ChangeFeedInfo{}, &model.UpstreamInfo{}, nil).
Times(1)
etcdClient.EXPECT().
UpdateChangefeedAndUpstream(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).
UpdateChangefeedAndUpstream(gomock.Any(), gomock.Any(), gomock.Any()).
Return(cerrors.ErrEtcdAPIError).Times(1)

w = httptest.NewRecorder()
Expand All @@ -495,7 +495,7 @@ func TestUpdateChangefeed(t *testing.T) {
Times(1)
mockCapture.EXPECT().GetUpstreamManager().Return(upstream.NewManager4Test(&mockPDClient{}), nil).AnyTimes()
etcdClient.EXPECT().
UpdateChangefeedAndUpstream(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).
UpdateChangefeedAndUpstream(gomock.Any(), gomock.Any(), gomock.Any()).
Return(nil).Times(1)

w = httptest.NewRecorder()
Expand All @@ -512,7 +512,7 @@ func TestUpdateChangefeed(t *testing.T) {
Times(1)
mockCapture.EXPECT().GetUpstreamManager().Return(upstream.NewManager4Test(&mockPDClient{}), nil).AnyTimes()
etcdClient.EXPECT().
UpdateChangefeedAndUpstream(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).
UpdateChangefeedAndUpstream(gomock.Any(), gomock.Any(), gomock.Any()).
Return(nil).Times(1)

w = httptest.NewRecorder()
Expand Down
18 changes: 18 additions & 0 deletions cdc/owner/mock/owner_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion errors.toml
Original file line number Diff line number Diff line change
Expand Up @@ -1343,7 +1343,7 @@ meta operation fail

["DFLOW:ErrMetaOpFailed"]
error = '''
meta operation %s is failed
unexpected meta operation failure: %s
'''

["DFLOW:ErrMetaOptionConflict"]
Expand Down
2 changes: 1 addition & 1 deletion pkg/errors/cdc_errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -958,7 +958,7 @@ var (
errors.RFCCodeText("CDC:ErrMetaOpIgnored"),
)
ErrMetaOpFailed = errors.Normalize(
"meta operation %s is failed",
"unexpected meta operation failure: %s",
errors.RFCCodeText("DFLOW:ErrMetaOpFailed"),
)
ErrMetaInvalidState = errors.Normalize(
Expand Down
11 changes: 10 additions & 1 deletion pkg/etcd/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,8 @@ func (m mockWatcher) RequestProgress(ctx context.Context) error {
}

func TestRetry(t *testing.T) {
t.Parallel()

originValue := maxTries
// to speedup the test
maxTries = 2
Expand Down Expand Up @@ -116,6 +118,8 @@ func TestRetry(t *testing.T) {
}

func TestDelegateLease(t *testing.T) {
t.Parallel()

ctx := context.Background()
url, server, err := SetupEmbedEtcd(t.TempDir())
defer func() {
Expand Down Expand Up @@ -148,6 +152,8 @@ func TestDelegateLease(t *testing.T) {

// test no data lost when WatchCh blocked
func TestWatchChBlocked(t *testing.T) {
t.Parallel()

cli := clientv3.NewCtxClient(context.TODO())
resetCount := int32(0)
requestCount := int32(0)
Expand Down Expand Up @@ -209,6 +215,8 @@ func TestWatchChBlocked(t *testing.T) {

// test no data lost when OutCh blocked
func TestOutChBlocked(t *testing.T) {
t.Parallel()

cli := clientv3.NewCtxClient(context.TODO())
resetCount := int32(0)
requestCount := int32(0)
Expand Down Expand Up @@ -260,8 +268,9 @@ func TestOutChBlocked(t *testing.T) {
}

func TestRevisionNotFallBack(t *testing.T) {
cli := clientv3.NewCtxClient(context.TODO())
t.Parallel()

cli := clientv3.NewCtxClient(context.TODO())
resetCount := int32(0)
requestCount := int32(0)
rev := int64(0)
Expand Down
Loading

0 comments on commit 5f62c4e

Please sign in to comment.