Skip to content

Commit

Permalink
api(ticdc): only update upstreamInfo that has changed (#10422) (#10452)
Browse files Browse the repository at this point in the history
close #10430
  • Loading branch information
ti-chi-bot authored Feb 9, 2024
1 parent df3ee6a commit 05411d4
Show file tree
Hide file tree
Showing 10 changed files with 175 additions and 125 deletions.
2 changes: 1 addition & 1 deletion cdc/api/v2/changefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -493,7 +493,7 @@ func (h *OpenAPIV2) updateChangefeed(c *gin.Context) {
zap.Any("upstreamInfo", newUpInfo))

err = h.capture.GetEtcdClient().
UpdateChangefeedAndUpstream(ctx, newUpInfo, newCfInfo, changefeedID)
UpdateChangefeedAndUpstream(ctx, newUpInfo, newCfInfo)
if err != nil {
_ = c.Error(errors.Trace(err))
return
Expand Down
6 changes: 3 additions & 3 deletions cdc/api/v2/changefeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -475,7 +475,7 @@ func TestUpdateChangefeed(t *testing.T) {
Return(&model.ChangeFeedInfo{}, &model.UpstreamInfo{}, nil).
Times(1)
etcdClient.EXPECT().
UpdateChangefeedAndUpstream(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).
UpdateChangefeedAndUpstream(gomock.Any(), gomock.Any(), gomock.Any()).
Return(cerrors.ErrEtcdAPIError).Times(1)

w = httptest.NewRecorder()
Expand All @@ -495,7 +495,7 @@ func TestUpdateChangefeed(t *testing.T) {
Times(1)
mockCapture.EXPECT().GetUpstreamManager().Return(upstream.NewManager4Test(&mockPDClient{}), nil).AnyTimes()
etcdClient.EXPECT().
UpdateChangefeedAndUpstream(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).
UpdateChangefeedAndUpstream(gomock.Any(), gomock.Any(), gomock.Any()).
Return(nil).Times(1)

w = httptest.NewRecorder()
Expand All @@ -512,7 +512,7 @@ func TestUpdateChangefeed(t *testing.T) {
Times(1)
mockCapture.EXPECT().GetUpstreamManager().Return(upstream.NewManager4Test(&mockPDClient{}), nil).AnyTimes()
etcdClient.EXPECT().
UpdateChangefeedAndUpstream(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).
UpdateChangefeedAndUpstream(gomock.Any(), gomock.Any(), gomock.Any()).
Return(nil).Times(1)

w = httptest.NewRecorder()
Expand Down
2 changes: 1 addition & 1 deletion errors.toml
Original file line number Diff line number Diff line change
Expand Up @@ -1348,7 +1348,7 @@ meta operation fail

["DFLOW:ErrMetaOpFailed"]
error = '''
meta operation %s is failed
unexpected meta operation failure: %s
'''

["DFLOW:ErrMetaOptionConflict"]
Expand Down
2 changes: 1 addition & 1 deletion pkg/errors/cdc_errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -962,7 +962,7 @@ var (
errors.RFCCodeText("CDC:ErrMetaOpIgnored"),
)
ErrMetaOpFailed = errors.Normalize(
"meta operation %s is failed",
"unexpected meta operation failure: %s",
errors.RFCCodeText("DFLOW:ErrMetaOpFailed"),
)
ErrMetaInvalidState = errors.Normalize(
Expand Down
11 changes: 10 additions & 1 deletion pkg/etcd/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,8 @@ func (m mockWatcher) RequestProgress(ctx context.Context) error {
}

func TestRetry(t *testing.T) {
t.Parallel()

originValue := maxTries
// to speedup the test
maxTries = 2
Expand Down Expand Up @@ -116,6 +118,8 @@ func TestRetry(t *testing.T) {
}

func TestDelegateLease(t *testing.T) {
t.Parallel()

ctx := context.Background()
url, server, err := SetupEmbedEtcd(t.TempDir())
defer func() {
Expand Down Expand Up @@ -148,6 +152,8 @@ func TestDelegateLease(t *testing.T) {

// test no data lost when WatchCh blocked
func TestWatchChBlocked(t *testing.T) {
t.Parallel()

cli := clientv3.NewCtxClient(context.TODO())
resetCount := int32(0)
requestCount := int32(0)
Expand Down Expand Up @@ -209,6 +215,8 @@ func TestWatchChBlocked(t *testing.T) {

// test no data lost when OutCh blocked
func TestOutChBlocked(t *testing.T) {
t.Parallel()

cli := clientv3.NewCtxClient(context.TODO())
resetCount := int32(0)
requestCount := int32(0)
Expand Down Expand Up @@ -260,8 +268,9 @@ func TestOutChBlocked(t *testing.T) {
}

func TestRevisionNotFallBack(t *testing.T) {
cli := clientv3.NewCtxClient(context.TODO())
t.Parallel()

cli := clientv3.NewCtxClient(context.TODO())
resetCount := int32(0)
requestCount := int32(0)
rev := int64(0)
Expand Down
Loading

0 comments on commit 05411d4

Please sign in to comment.