Skip to content

Commit

Permalink
api(ticdc): only update upstreamInfo that has changed (pingcap#10422)
Browse files Browse the repository at this point in the history
  • Loading branch information
CharlesCheung96 committed Jan 24, 2024
1 parent 44c9d4d commit 2d22e7f
Show file tree
Hide file tree
Showing 11 changed files with 196 additions and 136 deletions.
4 changes: 1 addition & 3 deletions cdc/api/v1/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -323,9 +323,7 @@ func (h *OpenAPI) CreateChangefeed(c *gin.Context) {
CAPath: up.SecurityConfig.CAPath,
CertAllowedCN: up.SecurityConfig.CertAllowedCN,
}
err = h.capture.GetEtcdClient().CreateChangefeedInfo(
ctx, upstreamInfo,
info, model.DefaultChangeFeedID(changefeedConfig.ID))
err = h.capture.GetEtcdClient().CreateChangefeedInfo(ctx, upstreamInfo, info)
if err != nil {
_ = c.Error(err)
return
Expand Down
7 changes: 2 additions & 5 deletions cdc/api/v2/changefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,10 +134,7 @@ func (h *OpenAPIV2) createChangefeed(c *gin.Context) {
return
}

err = h.capture.GetEtcdClient().CreateChangefeedInfo(ctx,
upstreamInfo,
info,
model.DefaultChangeFeedID(info.ID))
err = h.capture.GetEtcdClient().CreateChangefeedInfo(ctx, upstreamInfo, info)
if err != nil {
needRemoveGCSafePoint = true
_ = c.Error(err)
Expand Down Expand Up @@ -387,7 +384,7 @@ func (h *OpenAPIV2) updateChangefeed(c *gin.Context) {
zap.Any("upstreamInfo", newUpInfo))

err = h.capture.GetEtcdClient().
UpdateChangefeedAndUpstream(ctx, newUpInfo, newCfInfo, changefeedID)
UpdateChangefeedAndUpstream(ctx, newUpInfo, newCfInfo)
if err != nil {
_ = c.Error(errors.Trace(err))
return
Expand Down
6 changes: 3 additions & 3 deletions cdc/api/v2/changefeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -454,7 +454,7 @@ func TestUpdateChangefeed(t *testing.T) {
Return(&model.ChangeFeedInfo{}, &model.UpstreamInfo{}, nil).
Times(1)
etcdClient.EXPECT().
UpdateChangefeedAndUpstream(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).
UpdateChangefeedAndUpstream(gomock.Any(), gomock.Any(), gomock.Any()).
Return(cerrors.ErrEtcdAPIError).Times(1)

w = httptest.NewRecorder()
Expand All @@ -474,7 +474,7 @@ func TestUpdateChangefeed(t *testing.T) {
Times(1)
mockCapture.EXPECT().GetUpstreamManager().Return(upstream.NewManager4Test(&mockPDClient{}), nil).AnyTimes()
etcdClient.EXPECT().
UpdateChangefeedAndUpstream(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).
UpdateChangefeedAndUpstream(gomock.Any(), gomock.Any(), gomock.Any()).
Return(nil).Times(1)

w = httptest.NewRecorder()
Expand All @@ -491,7 +491,7 @@ func TestUpdateChangefeed(t *testing.T) {
Times(1)
mockCapture.EXPECT().GetUpstreamManager().Return(upstream.NewManager4Test(&mockPDClient{}), nil).AnyTimes()
etcdClient.EXPECT().
UpdateChangefeedAndUpstream(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).
UpdateChangefeedAndUpstream(gomock.Any(), gomock.Any(), gomock.Any()).
Return(nil).Times(1)

w = httptest.NewRecorder()
Expand Down
8 changes: 8 additions & 0 deletions errors.toml
Original file line number Diff line number Diff line change
Expand Up @@ -1586,6 +1586,14 @@ error = '''
meta operation fail
'''

<<<<<<< HEAD
=======
["DFLOW:ErrMetaOpFailed"]
error = '''
unexpected meta operation failure: %s
'''

>>>>>>> c5d5eff1f (api(ticdc): only update upstreamInfo that has changed (#10422))
["DFLOW:ErrMetaOptionConflict"]
error = '''
WithRange/WithPrefix/WithFromKey, more than one option are used
Expand Down
5 changes: 5 additions & 0 deletions pkg/errors/cdc_errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -1174,4 +1174,9 @@ var (
"add `ignore-txn-start-ts=[%d]` to the changefeed in the filter configuration.",
errors.RFCCodeText("CDC:ErrHandleDDLFailed"),
)

ErrMetaOpFailed = errors.Normalize(
"unexpected meta operation failure: %s",
errors.RFCCodeText("DFLOW:ErrMetaOpFailed"),
)
)
11 changes: 10 additions & 1 deletion pkg/etcd/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,8 @@ func (m mockWatcher) RequestProgress(ctx context.Context) error {
}

func TestRetry(t *testing.T) {
t.Parallel()

originValue := maxTries
// to speedup the test
maxTries = 2
Expand Down Expand Up @@ -116,6 +118,8 @@ func TestRetry(t *testing.T) {
}

func TestDelegateLease(t *testing.T) {
t.Parallel()

ctx := context.Background()
url, server, err := SetupEmbedEtcd(t.TempDir())
defer func() {
Expand Down Expand Up @@ -148,6 +152,8 @@ func TestDelegateLease(t *testing.T) {

// test no data lost when WatchCh blocked
func TestWatchChBlocked(t *testing.T) {
t.Parallel()

cli := clientv3.NewCtxClient(context.TODO())
resetCount := int32(0)
requestCount := int32(0)
Expand Down Expand Up @@ -209,6 +215,8 @@ func TestWatchChBlocked(t *testing.T) {

// test no data lost when OutCh blocked
func TestOutChBlocked(t *testing.T) {
t.Parallel()

cli := clientv3.NewCtxClient(context.TODO())
resetCount := int32(0)
requestCount := int32(0)
Expand Down Expand Up @@ -260,8 +268,9 @@ func TestOutChBlocked(t *testing.T) {
}

func TestRevisionNotFallBack(t *testing.T) {
cli := clientv3.NewCtxClient(context.TODO())
t.Parallel()

cli := clientv3.NewCtxClient(context.TODO())
resetCount := int32(0)
requestCount := int32(0)
rev := int64(0)
Expand Down
Loading

0 comments on commit 2d22e7f

Please sign in to comment.