From 2d22e7f7d3b2927a8316a58c46051c08440e7207 Mon Sep 17 00:00:00 2001 From: CharlesCheung <61726649+CharlesCheung96@users.noreply.github.com> Date: Thu, 11 Jan 2024 13:34:55 +0800 Subject: [PATCH] api(ticdc): only update upstreamInfo that has changed (#10422) close pingcap/tiflow#10430 --- cdc/api/v1/api.go | 4 +- cdc/api/v2/changefeed.go | 7 +- cdc/api/v2/changefeed_test.go | 6 +- errors.toml | 8 ++ pkg/errors/cdc_errors.go | 5 + pkg/etcd/client_test.go | 11 +- pkg/etcd/etcd.go | 223 +++++++++++++++--------------- pkg/etcd/etcd_test.go | 46 ++++-- pkg/etcd/etcdkey_test.go | 4 + pkg/etcd/mock/etcd_client_mock.go | 16 +-- pkg/etcd/util_test.go | 2 + 11 files changed, 196 insertions(+), 136 deletions(-) diff --git a/cdc/api/v1/api.go b/cdc/api/v1/api.go index 333250f2800..6b623415b40 100644 --- a/cdc/api/v1/api.go +++ b/cdc/api/v1/api.go @@ -323,9 +323,7 @@ func (h *OpenAPI) CreateChangefeed(c *gin.Context) { CAPath: up.SecurityConfig.CAPath, CertAllowedCN: up.SecurityConfig.CertAllowedCN, } - err = h.capture.GetEtcdClient().CreateChangefeedInfo( - ctx, upstreamInfo, - info, model.DefaultChangeFeedID(changefeedConfig.ID)) + err = h.capture.GetEtcdClient().CreateChangefeedInfo(ctx, upstreamInfo, info) if err != nil { _ = c.Error(err) return diff --git a/cdc/api/v2/changefeed.go b/cdc/api/v2/changefeed.go index 089beaa171e..25327587370 100644 --- a/cdc/api/v2/changefeed.go +++ b/cdc/api/v2/changefeed.go @@ -134,10 +134,7 @@ func (h *OpenAPIV2) createChangefeed(c *gin.Context) { return } - err = h.capture.GetEtcdClient().CreateChangefeedInfo(ctx, - upstreamInfo, - info, - model.DefaultChangeFeedID(info.ID)) + err = h.capture.GetEtcdClient().CreateChangefeedInfo(ctx, upstreamInfo, info) if err != nil { needRemoveGCSafePoint = true _ = c.Error(err) @@ -387,7 +384,7 @@ func (h *OpenAPIV2) updateChangefeed(c *gin.Context) { zap.Any("upstreamInfo", newUpInfo)) err = h.capture.GetEtcdClient(). - UpdateChangefeedAndUpstream(ctx, newUpInfo, newCfInfo, changefeedID) + UpdateChangefeedAndUpstream(ctx, newUpInfo, newCfInfo) if err != nil { _ = c.Error(errors.Trace(err)) return diff --git a/cdc/api/v2/changefeed_test.go b/cdc/api/v2/changefeed_test.go index ef85e0dac8c..20966589c8a 100644 --- a/cdc/api/v2/changefeed_test.go +++ b/cdc/api/v2/changefeed_test.go @@ -454,7 +454,7 @@ func TestUpdateChangefeed(t *testing.T) { Return(&model.ChangeFeedInfo{}, &model.UpstreamInfo{}, nil). Times(1) etcdClient.EXPECT(). - UpdateChangefeedAndUpstream(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()). + UpdateChangefeedAndUpstream(gomock.Any(), gomock.Any(), gomock.Any()). Return(cerrors.ErrEtcdAPIError).Times(1) w = httptest.NewRecorder() @@ -474,7 +474,7 @@ func TestUpdateChangefeed(t *testing.T) { Times(1) mockCapture.EXPECT().GetUpstreamManager().Return(upstream.NewManager4Test(&mockPDClient{}), nil).AnyTimes() etcdClient.EXPECT(). - UpdateChangefeedAndUpstream(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()). + UpdateChangefeedAndUpstream(gomock.Any(), gomock.Any(), gomock.Any()). Return(nil).Times(1) w = httptest.NewRecorder() @@ -491,7 +491,7 @@ func TestUpdateChangefeed(t *testing.T) { Times(1) mockCapture.EXPECT().GetUpstreamManager().Return(upstream.NewManager4Test(&mockPDClient{}), nil).AnyTimes() etcdClient.EXPECT(). - UpdateChangefeedAndUpstream(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()). + UpdateChangefeedAndUpstream(gomock.Any(), gomock.Any(), gomock.Any()). Return(nil).Times(1) w = httptest.NewRecorder() diff --git a/errors.toml b/errors.toml index 8c51210b938..211603d913d 100755 --- a/errors.toml +++ b/errors.toml @@ -1586,6 +1586,14 @@ error = ''' meta operation fail ''' +<<<<<<< HEAD +======= +["DFLOW:ErrMetaOpFailed"] +error = ''' +unexpected meta operation failure: %s +''' + +>>>>>>> c5d5eff1f (api(ticdc): only update upstreamInfo that has changed (#10422)) ["DFLOW:ErrMetaOptionConflict"] error = ''' WithRange/WithPrefix/WithFromKey, more than one option are used diff --git a/pkg/errors/cdc_errors.go b/pkg/errors/cdc_errors.go index 90a7c54f5b6..c6b28935910 100644 --- a/pkg/errors/cdc_errors.go +++ b/pkg/errors/cdc_errors.go @@ -1174,4 +1174,9 @@ var ( "add `ignore-txn-start-ts=[%d]` to the changefeed in the filter configuration.", errors.RFCCodeText("CDC:ErrHandleDDLFailed"), ) + + ErrMetaOpFailed = errors.Normalize( + "unexpected meta operation failure: %s", + errors.RFCCodeText("DFLOW:ErrMetaOpFailed"), + ) ) diff --git a/pkg/etcd/client_test.go b/pkg/etcd/client_test.go index 4e5731d3538..cd174b92117 100644 --- a/pkg/etcd/client_test.go +++ b/pkg/etcd/client_test.go @@ -71,6 +71,8 @@ func (m mockWatcher) RequestProgress(ctx context.Context) error { } func TestRetry(t *testing.T) { + t.Parallel() + originValue := maxTries // to speedup the test maxTries = 2 @@ -116,6 +118,8 @@ func TestRetry(t *testing.T) { } func TestDelegateLease(t *testing.T) { + t.Parallel() + ctx := context.Background() url, server, err := SetupEmbedEtcd(t.TempDir()) defer func() { @@ -148,6 +152,8 @@ func TestDelegateLease(t *testing.T) { // test no data lost when WatchCh blocked func TestWatchChBlocked(t *testing.T) { + t.Parallel() + cli := clientv3.NewCtxClient(context.TODO()) resetCount := int32(0) requestCount := int32(0) @@ -209,6 +215,8 @@ func TestWatchChBlocked(t *testing.T) { // test no data lost when OutCh blocked func TestOutChBlocked(t *testing.T) { + t.Parallel() + cli := clientv3.NewCtxClient(context.TODO()) resetCount := int32(0) requestCount := int32(0) @@ -260,8 +268,9 @@ func TestOutChBlocked(t *testing.T) { } func TestRevisionNotFallBack(t *testing.T) { - cli := clientv3.NewCtxClient(context.TODO()) + t.Parallel() + cli := clientv3.NewCtxClient(context.TODO()) resetCount := int32(0) requestCount := int32(0) rev := int64(0) diff --git a/pkg/etcd/etcd.go b/pkg/etcd/etcd.go index e966b9f5968..32843ec885a 100644 --- a/pkg/etcd/etcd.go +++ b/pkg/etcd/etcd.go @@ -14,17 +14,17 @@ package etcd import ( + "bytes" "context" "fmt" "net/url" "strings" "time" - "github.com/pingcap/errors" "github.com/pingcap/log" "github.com/pingcap/tiflow/cdc/model" "github.com/pingcap/tiflow/pkg/config" - cerror "github.com/pingcap/tiflow/pkg/errors" + "github.com/pingcap/tiflow/pkg/errors" "github.com/prometheus/client_golang/prometheus" "github.com/tikv/pd/pkg/tempurl" "go.etcd.io/etcd/api/v3/mvccpb" @@ -140,13 +140,11 @@ type CDCEtcdClient interface { CreateChangefeedInfo(context.Context, *model.UpstreamInfo, *model.ChangeFeedInfo, - model.ChangeFeedID, ) error UpdateChangefeedAndUpstream(ctx context.Context, upstreamInfo *model.UpstreamInfo, changeFeedInfo *model.ChangeFeedInfo, - changeFeedID model.ChangeFeedID, ) error PutCaptureInfo(context.Context, *model.CaptureInfo, clientv3.LeaseID) error @@ -197,7 +195,7 @@ func (c *CDCEtcdClientImpl) Close() error { // ClearAllCDCInfo delete all keys created by CDC func (c *CDCEtcdClientImpl) ClearAllCDCInfo(ctx context.Context) error { _, err := c.Client.Delete(ctx, BaseKey(c.ClusterID), clientv3.WithPrefix()) - return cerror.WrapError(cerror.ErrPDEtcdAPIError, err) + return errors.WrapError(errors.ErrPDEtcdAPIError, err) } // GetClusterID gets CDC cluster ID. @@ -214,7 +212,7 @@ func (c *CDCEtcdClientImpl) GetEtcdClient() *Client { func (c *CDCEtcdClientImpl) GetAllCDCInfo(ctx context.Context) ([]*mvccpb.KeyValue, error) { resp, err := c.Client.Get(ctx, BaseKey(c.ClusterID), clientv3.WithPrefix()) if err != nil { - return nil, cerror.WrapError(cerror.ErrPDEtcdAPIError, err) + return nil, errors.WrapError(errors.ErrPDEtcdAPIError, err) } return resp.Kvs, nil } @@ -226,7 +224,7 @@ func (c *CDCEtcdClientImpl) CheckMultipleCDCClusterExist(ctx context.Context) er clientv3.WithPrefix(), clientv3.WithKeysOnly()) if err != nil { - return cerror.WrapError(cerror.ErrPDEtcdAPIError, err) + return errors.WrapError(errors.ErrPDEtcdAPIError, err) } for _, kv := range resp.Kvs { key := string(kv.Key) @@ -245,7 +243,7 @@ func (c *CDCEtcdClientImpl) CheckMultipleCDCClusterExist(ctx context.Context) er if isReserved { continue } - return cerror.ErrMultipleCDCClustersExist.GenWithStackByArgs() + return errors.ErrMultipleCDCClustersExist.GenWithStackByArgs() } return nil } @@ -260,7 +258,7 @@ func (c *CDCEtcdClientImpl) GetChangeFeeds(ctx context.Context) ( resp, err := c.Client.Get(ctx, key, clientv3.WithPrefix()) if err != nil { - return 0, nil, cerror.WrapError(cerror.ErrPDEtcdAPIError, err) + return 0, nil, errors.WrapError(errors.ErrPDEtcdAPIError, err) } revision := resp.Header.Revision details := make(map[model.ChangeFeedID]*mvccpb.KeyValue, resp.Count) @@ -301,10 +299,10 @@ func (c *CDCEtcdClientImpl) GetChangeFeedInfo(ctx context.Context, key := GetEtcdKeyChangeFeedInfo(c.ClusterID, id) resp, err := c.Client.Get(ctx, key) if err != nil { - return nil, cerror.WrapError(cerror.ErrPDEtcdAPIError, err) + return nil, errors.WrapError(errors.ErrPDEtcdAPIError, err) } if resp.Count == 0 { - return nil, cerror.ErrChangeFeedNotExists.GenWithStackByArgs(key) + return nil, errors.ErrChangeFeedNotExists.GenWithStackByArgs(key) } detail := &model.ChangeFeedInfo{} err = detail.Unmarshal(resp.Kvs[0].Value) @@ -317,7 +315,7 @@ func (c *CDCEtcdClientImpl) DeleteChangeFeedInfo(ctx context.Context, ) error { key := GetEtcdKeyChangeFeedInfo(c.ClusterID, id) _, err := c.Client.Delete(ctx, key) - return cerror.WrapError(cerror.ErrPDEtcdAPIError, err) + return errors.WrapError(errors.ErrPDEtcdAPIError, err) } // GetChangeFeedStatus queries the checkpointTs and resovledTs of a given changefeed @@ -327,10 +325,10 @@ func (c *CDCEtcdClientImpl) GetChangeFeedStatus(ctx context.Context, key := GetEtcdKeyJob(c.ClusterID, id) resp, err := c.Client.Get(ctx, key) if err != nil { - return nil, 0, cerror.WrapError(cerror.ErrPDEtcdAPIError, err) + return nil, 0, errors.WrapError(errors.ErrPDEtcdAPIError, err) } if resp.Count == 0 { - return nil, 0, cerror.ErrChangeFeedNotExists.GenWithStackByArgs(key) + return nil, 0, errors.ErrChangeFeedNotExists.GenWithStackByArgs(key) } info := &model.ChangeFeedStatus{} err = info.Unmarshal(resp.Kvs[0].Value) @@ -343,7 +341,7 @@ func (c *CDCEtcdClientImpl) GetCaptures(ctx context.Context) (int64, []*model.Ca resp, err := c.Client.Get(ctx, key, clientv3.WithPrefix()) if err != nil { - return 0, nil, cerror.WrapError(cerror.ErrPDEtcdAPIError, err) + return 0, nil, errors.WrapError(errors.ErrPDEtcdAPIError, err) } revision := resp.Header.Revision infos := make([]*model.CaptureInfo, 0, resp.Count) @@ -367,11 +365,11 @@ func (c *CDCEtcdClientImpl) GetCaptureInfo( resp, err := c.Client.Get(ctx, key) if err != nil { - return nil, cerror.WrapError(cerror.ErrPDEtcdAPIError, err) + return nil, errors.WrapError(errors.ErrPDEtcdAPIError, err) } if len(resp.Kvs) == 0 { - return nil, cerror.ErrCaptureNotExist.GenWithStackByArgs(key) + return nil, errors.ErrCaptureNotExist.GenWithStackByArgs(key) } info = new(model.CaptureInfo) @@ -389,7 +387,7 @@ func (c *CDCEtcdClientImpl) GetCaptureLeases(ctx context.Context) (map[string]in resp, err := c.Client.Get(ctx, key, clientv3.WithPrefix()) if err != nil { - return nil, cerror.WrapError(cerror.ErrPDEtcdAPIError, err) + return nil, errors.WrapError(errors.ErrPDEtcdAPIError, err) } leases := make(map[string]int64, resp.Count) for _, kv := range resp.Kvs { @@ -412,108 +410,117 @@ func (c *CDCEtcdClientImpl) RevokeAllLeases(ctx context.Context, leases map[stri // it means the etcd lease is already expired or revoked continue } - return cerror.WrapError(cerror.ErrPDEtcdAPIError, err) + return errors.WrapError(errors.ErrPDEtcdAPIError, err) } return nil } // CreateChangefeedInfo creates a change feed info into etcd and fails if it is already exists. -func (c *CDCEtcdClientImpl) CreateChangefeedInfo(ctx context.Context, - upstreamInfo *model.UpstreamInfo, - info *model.ChangeFeedInfo, - changeFeedID model.ChangeFeedID, +func (c *CDCEtcdClientImpl) CreateChangefeedInfo( + ctx context.Context, upstreamInfo *model.UpstreamInfo, info *model.ChangeFeedInfo, ) error { - infoKey := GetEtcdKeyChangeFeedInfo(c.ClusterID, changeFeedID) - jobKey := GetEtcdKeyJob(c.ClusterID, changeFeedID) - upstreamInfoKey := CDCKey{ - Tp: CDCKeyTypeUpStream, - ClusterID: c.ClusterID, - UpstreamID: upstreamInfo.ID, - Namespace: changeFeedID.Namespace, - } - upstreamEtcdKeyStr := upstreamInfoKey.String() - info.UpstreamID = upstreamInfo.ID - value, err := info.Marshal() - if err != nil { - return errors.Trace(err) - } - upstreamResp, err := c.Client.Get(ctx, upstreamEtcdKeyStr) - if err != nil { - return errors.Trace(err) - } - upstreamData, err := upstreamInfo.Marshal() - if err != nil { - return errors.Trace(err) - } - cmps := []clientv3.Cmp{ - clientv3.Compare(clientv3.ModRevision(infoKey), "=", 0), - clientv3.Compare(clientv3.ModRevision(jobKey), "=", 0), - } - opsThen := []clientv3.Op{ - clientv3.OpPut(infoKey, value), - clientv3.OpPut(upstreamEtcdKeyStr, string(upstreamData)), - } - if len(upstreamResp.Kvs) == 0 { - cmps = append(cmps, - clientv3.Compare(clientv3.ModRevision(upstreamEtcdKeyStr), - "=", 0)) - } else { - cmps = append(cmps, - clientv3.Compare(clientv3.ModRevision(upstreamInfoKey.String()), - "=", upstreamResp.Kvs[0].ModRevision)) - } - - resp, err := c.Client.Txn(ctx, cmps, opsThen, TxnEmptyOpsElse) - if err != nil { - return cerror.WrapError(cerror.ErrPDEtcdAPIError, err) - } - if !resp.Succeeded { - log.Warn("changefeed already exists, ignore create changefeed", - zap.String("namespace", changeFeedID.Namespace), - zap.String("changefeed", changeFeedID.ID)) - return cerror.ErrChangeFeedAlreadyExists.GenWithStackByArgs(changeFeedID) - } - return errors.Trace(err) + return c.saveChangefeedAndUpstreamInfo(ctx, "Create", upstreamInfo, info) } // UpdateChangefeedAndUpstream updates the changefeed's info and its upstream info into etcd -func (c *CDCEtcdClientImpl) UpdateChangefeedAndUpstream(ctx context.Context, - upstreamInfo *model.UpstreamInfo, - changeFeedInfo *model.ChangeFeedInfo, - changeFeedID model.ChangeFeedID, +func (c *CDCEtcdClientImpl) UpdateChangefeedAndUpstream( + ctx context.Context, upstreamInfo *model.UpstreamInfo, changeFeedInfo *model.ChangeFeedInfo, ) error { - infoKey := GetEtcdKeyChangeFeedInfo(c.ClusterID, changeFeedID) - changeFeedInfoStr, err := changeFeedInfo.Marshal() - if err != nil { - return errors.Trace(err) + return c.saveChangefeedAndUpstreamInfo(ctx, "Update", upstreamInfo, changeFeedInfo) +} + +// saveChangefeedAndUpstreamInfo stores changefeed info and its upstream info into etcd +func (c *CDCEtcdClientImpl) saveChangefeedAndUpstreamInfo( + ctx context.Context, operation string, + upstreamInfo *model.UpstreamInfo, info *model.ChangeFeedInfo, +) error { + cmps := []clientv3.Cmp{} + opsThen := []clientv3.Op{} + + if upstreamInfo != nil { + if info.UpstreamID != upstreamInfo.ID { + return errors.ErrUpstreamMissMatch.GenWithStackByArgs(info.UpstreamID, upstreamInfo.ID) + } + upstreamInfoKey := CDCKey{ + Tp: CDCKeyTypeUpStream, + ClusterID: c.ClusterID, + UpstreamID: upstreamInfo.ID, + Namespace: info.Namespace, + } + upstreamEtcdKeyStr := upstreamInfoKey.String() + upstreamResp, err := c.Client.Get(ctx, upstreamEtcdKeyStr) + if err != nil { + return errors.WrapError(errors.ErrPDEtcdAPIError, err) + } + upstreamData, err := upstreamInfo.Marshal() + if err != nil { + return errors.WrapError(errors.ErrPDEtcdAPIError, err) + } + + if len(upstreamResp.Kvs) == 0 { + cmps = append(cmps, clientv3.Compare(clientv3.ModRevision(upstreamEtcdKeyStr), "=", 0)) + opsThen = append(opsThen, clientv3.OpPut(upstreamEtcdKeyStr, string(upstreamData))) + } else { + cmps = append(cmps, + clientv3.Compare(clientv3.ModRevision(upstreamEtcdKeyStr), "=", upstreamResp.Kvs[0].ModRevision)) + if !bytes.Equal(upstreamResp.Kvs[0].Value, upstreamData) { + opsThen = append(opsThen, clientv3.OpPut(upstreamEtcdKeyStr, string(upstreamData))) + } + } } - upstreamKey := CDCKey{ - Tp: CDCKeyTypeUpStream, - ClusterID: c.ClusterID, - UpstreamID: upstreamInfo.ID, - Namespace: changeFeedID.Namespace, + + changeFeedID := model.ChangeFeedID{ + Namespace: info.Namespace, + ID: info.ID, } - upstreamKeyStr := upstreamKey.String() - upstreamInfoStr, err := upstreamInfo.Marshal() + infoKey := GetEtcdKeyChangeFeedInfo(c.ClusterID, changeFeedID) + jobKey := GetEtcdKeyJob(c.ClusterID, changeFeedID) + infoData, err := info.Marshal() if err != nil { return errors.Trace(err) } - opsThen := []clientv3.Op{ - clientv3.OpPut(infoKey, changeFeedInfoStr), - clientv3.OpPut(upstreamKeyStr, string(upstreamInfoStr)), + + var infoModRevsion, jobModRevision int64 + if operation == "Update" { + infoResp, err := c.Client.Get(ctx, infoKey) + if err != nil { + return errors.WrapError(errors.ErrPDEtcdAPIError, err) + } + if len(infoResp.Kvs) == 0 { + return errors.ErrChangeFeedNotExists.GenWithStackByArgs(infoKey) + } + infoModRevsion = infoResp.Kvs[0].ModRevision + + jobResp, err := c.Client.Get(ctx, jobKey) + if err != nil { + return errors.WrapError(errors.ErrPDEtcdAPIError, err) + } + if len(jobResp.Kvs) == 0 { + // Note that status may not exist, so we don't check it here. + log.Debug("job status not exists", zap.Stringer("changefeed", changeFeedID)) + } else { + jobModRevision = jobResp.Kvs[0].ModRevision + } } - resp, err := c.Client.Txn(ctx, txnEmptyCmps, opsThen, TxnEmptyOpsElse) + cmps = append(cmps, + clientv3.Compare(clientv3.ModRevision(infoKey), "=", infoModRevsion), + clientv3.Compare(clientv3.ModRevision(jobKey), "=", jobModRevision), + ) + opsThen = append(opsThen, clientv3.OpPut(infoKey, infoData)) + + resp, err := c.Client.Txn(ctx, cmps, opsThen, TxnEmptyOpsElse) if err != nil { - return cerror.WrapError(cerror.ErrPDEtcdAPIError, err) + return errors.WrapError(errors.ErrPDEtcdAPIError, err) } if !resp.Succeeded { - log.Warn("unexpected etcd transaction failure", + log.Warn(fmt.Sprintf("unexpected etcd transaction failure, operation: %s", operation), zap.String("namespace", changeFeedID.Namespace), zap.String("changefeed", changeFeedID.ID)) - return cerror.ErrChangefeedUpdateFailedTransaction.GenWithStackByArgs(changeFeedID) + errMsg := fmt.Sprintf("%s changefeed %s", operation, changeFeedID) + return errors.ErrMetaOpFailed.GenWithStackByArgs(errMsg) } - return nil + return errors.Trace(err) } // SaveChangeFeedInfo stores change feed info into etcd @@ -528,7 +535,7 @@ func (c *CDCEtcdClientImpl) SaveChangeFeedInfo(ctx context.Context, return errors.Trace(err) } _, err = c.Client.Put(ctx, key, value) - return cerror.WrapError(cerror.ErrPDEtcdAPIError, err) + return errors.WrapError(errors.ErrPDEtcdAPIError, err) } // PutCaptureInfo put capture info into etcd, @@ -543,7 +550,7 @@ func (c *CDCEtcdClientImpl) PutCaptureInfo( key := GetEtcdKeyCaptureInfo(c.ClusterID, info.ID) _, err = c.Client.Put(ctx, key, string(data), clientv3.WithLease(leaseID)) - return cerror.WrapError(cerror.ErrPDEtcdAPIError, err) + return errors.WrapError(errors.ErrPDEtcdAPIError, err) } // DeleteCaptureInfo delete all capture related info from etcd. @@ -551,7 +558,7 @@ func (c *CDCEtcdClientImpl) DeleteCaptureInfo(ctx context.Context, captureID str key := GetEtcdKeyCaptureInfo(c.ClusterID, captureID) _, err := c.Client.Delete(ctx, key) if err != nil { - return cerror.WrapError(cerror.ErrPDEtcdAPIError, err) + return errors.WrapError(errors.ErrPDEtcdAPIError, err) } // we need to clean all task position related to this capture when the capture is offline // otherwise the task positions may leak @@ -566,7 +573,7 @@ func (c *CDCEtcdClientImpl) DeleteCaptureInfo(ctx context.Context, captureID str zap.String("captureID", captureID), zap.String("key", key), zap.Error(err)) } - return cerror.WrapError(cerror.ErrPDEtcdAPIError, err) + return errors.WrapError(errors.ErrPDEtcdAPIError, err) } // GetOwnerID returns the owner id by querying etcd @@ -574,7 +581,7 @@ func (c *CDCEtcdClientImpl) GetOwnerID(ctx context.Context) (string, error) { resp, err := c.Client.Get(ctx, CaptureOwnerKey(c.ClusterID), clientv3.WithFirstCreate()...) if err != nil { - return "", cerror.WrapError(cerror.ErrPDEtcdAPIError, err) + return "", errors.WrapError(errors.ErrPDEtcdAPIError, err) } if len(resp.Kvs) == 0 { return "", concurrency.ErrElectionNoLeader @@ -588,14 +595,14 @@ func (c *CDCEtcdClientImpl) GetOwnerRevision( ) (rev int64, err error) { resp, err := c.Client.Get(ctx, CaptureOwnerKey(c.ClusterID), clientv3.WithFirstCreate()...) if err != nil { - return 0, cerror.WrapError(cerror.ErrPDEtcdAPIError, err) + return 0, errors.WrapError(errors.ErrPDEtcdAPIError, err) } if len(resp.Kvs) == 0 { - return 0, cerror.ErrOwnerNotFound.GenWithStackByArgs() + return 0, errors.ErrOwnerNotFound.GenWithStackByArgs() } // Checks that the given capture is indeed the owner. if string(resp.Kvs[0].Value) != captureID { - return 0, cerror.ErrNotOwner.GenWithStackByArgs() + return 0, errors.ErrNotOwner.GenWithStackByArgs() } return resp.Kvs[0].ModRevision, nil } @@ -624,10 +631,10 @@ func (c *CDCEtcdClientImpl) GetUpstreamInfo(ctx context.Context, KeyStr := Key.String() resp, err := c.Client.Get(ctx, KeyStr) if err != nil { - return nil, cerror.WrapError(cerror.ErrPDEtcdAPIError, err) + return nil, errors.WrapError(errors.ErrPDEtcdAPIError, err) } if resp.Count == 0 { - return nil, cerror.ErrUpstreamNotFound.GenWithStackByArgs(KeyStr) + return nil, errors.ErrUpstreamNotFound.GenWithStackByArgs(KeyStr) } info := &model.UpstreamInfo{} err = info.Unmarshal(resp.Kvs[0].Value) @@ -688,7 +695,7 @@ func SetupEmbedEtcd(dir string) (clientURL *url.URL, e *embed.Etcd, err error) { func extractKeySuffix(key string) (string, error) { subs := strings.Split(key, "/") if len(subs) < 2 { - return "", cerror.ErrInvalidEtcdKey.GenWithStackByArgs(key) + return "", errors.ErrInvalidEtcdKey.GenWithStackByArgs(key) } return subs[len(subs)-1], nil } diff --git a/pkg/etcd/etcd_test.go b/pkg/etcd/etcd_test.go index 3f7a9bb9b68..daa83ecd0e5 100644 --- a/pkg/etcd/etcd_test.go +++ b/pkg/etcd/etcd_test.go @@ -37,6 +37,8 @@ func (c Captures) Less(i, j int) bool { return c[i].ID < c[j].ID } func (c Captures) Swap(i, j int) { c[i], c[j] = c[j], c[i] } func TestEmbedEtcd(t *testing.T) { + t.Parallel() + s := &Tester{} s.SetUpTest(t) defer s.TearDownTest(t) @@ -61,6 +63,8 @@ func TestEmbedEtcd(t *testing.T) { } func TestGetChangeFeeds(t *testing.T) { + t.Parallel() + s := &Tester{} s.SetUpTest(t) defer s.TearDownTest(t) @@ -103,6 +107,8 @@ func TestGetChangeFeeds(t *testing.T) { } func TestOpChangeFeedDetail(t *testing.T) { + t.Parallel() + s := &Tester{} s.SetUpTest(t) defer s.TearDownTest(t) @@ -129,6 +135,8 @@ func TestOpChangeFeedDetail(t *testing.T) { } func TestGetAllChangeFeedInfo(t *testing.T) { + t.Parallel() + s := &Tester{} s.SetUpTest(t) defer s.TearDownTest(t) @@ -171,6 +179,8 @@ func TestGetAllChangeFeedInfo(t *testing.T) { } func TestCheckMultipleCDCClusterExist(t *testing.T) { + t.Parallel() + s := &Tester{} s.SetUpTest(t) defer s.TearDownTest(t) @@ -203,26 +213,34 @@ func TestCheckMultipleCDCClusterExist(t *testing.T) { } func TestCreateChangefeed(t *testing.T) { + t.Parallel() + s := &Tester{} s.SetUpTest(t) defer s.TearDownTest(t) ctx := context.Background() detail := &model.ChangeFeedInfo{ - SinkURI: "root@tcp(127.0.0.1:3306)/mysql", + UpstreamID: 1, + Namespace: "test", + ID: "create-changefeed", + SinkURI: "root@tcp(127.0.0.1:3306)/mysql", } upstreamInfo := &model.UpstreamInfo{ID: 1} err := s.client.CreateChangefeedInfo(ctx, - upstreamInfo, detail, model.DefaultChangeFeedID("test-id")) + upstreamInfo, detail) require.NoError(t, err) err = s.client.CreateChangefeedInfo(ctx, - upstreamInfo, detail, model.DefaultChangeFeedID("test-id")) - require.True(t, cerror.ErrChangeFeedAlreadyExists.Equal(err)) + upstreamInfo, detail) + require.True(t, cerror.ErrMetaOpFailed.Equal(err)) + require.Equal(t, "[DFLOW:ErrMetaOpFailed]unexpected meta operation failure: Create changefeed test/create-changefeed", err.Error()) } func TestUpdateChangefeedAndUpstream(t *testing.T) { + t.Parallel() + s := &Tester{} s.SetUpTest(t) defer s.TearDownTest(t) @@ -234,12 +252,16 @@ func TestUpdateChangefeedAndUpstream(t *testing.T) { } changeFeedID := model.DefaultChangeFeedID("test-update-cf-and-up") changeFeedInfo := &model.ChangeFeedInfo{ - ID: changeFeedID.ID, - Namespace: changeFeedID.Namespace, - SinkURI: "blackhole://", + UpstreamID: upstreamInfo.ID, + ID: changeFeedID.ID, + Namespace: changeFeedID.Namespace, + SinkURI: "blackhole://", } - err := s.client.UpdateChangefeedAndUpstream(ctx, upstreamInfo, changeFeedInfo, changeFeedID) + err := s.client.SaveChangeFeedInfo(ctx, changeFeedInfo, changeFeedID) + require.NoError(t, err) + + err = s.client.UpdateChangefeedAndUpstream(ctx, upstreamInfo, changeFeedInfo) require.NoError(t, err) var upstreamResult *model.UpstreamInfo @@ -255,6 +277,8 @@ func TestUpdateChangefeedAndUpstream(t *testing.T) { } func TestGetAllCaptureLeases(t *testing.T) { + t.Parallel() + s := &Tester{} s.SetUpTest(t) defer s.TearDownTest(t) @@ -310,6 +334,8 @@ const ( ) func TestGetOwnerRevision(t *testing.T) { + t.Parallel() + s := &Tester{} s.SetUpTest(t) defer s.TearDownTest(t) @@ -403,6 +429,8 @@ func TestExtractKeySuffix(t *testing.T) { } func TestMigrateBackupKey(t *testing.T) { + t.Parallel() + key := MigrateBackupKey(1, "/tidb/cdc/capture/abcd") require.Equal(t, "/tidb/cdc/__backup__/1/tidb/cdc/capture/abcd", key) key = MigrateBackupKey(1, "abcdc") @@ -410,6 +438,8 @@ func TestMigrateBackupKey(t *testing.T) { } func TestDeleteCaptureInfo(t *testing.T) { + t.Parallel() + s := &Tester{} s.SetUpTest(t) defer s.TearDownTest(t) diff --git a/pkg/etcd/etcdkey_test.go b/pkg/etcd/etcdkey_test.go index a3b41165381..459cb229548 100644 --- a/pkg/etcd/etcdkey_test.go +++ b/pkg/etcd/etcdkey_test.go @@ -22,6 +22,8 @@ import ( ) func TestEtcdKey(t *testing.T) { + t.Parallel() + testcases := []struct { key string expected *CDCKey @@ -122,6 +124,8 @@ func TestEtcdKey(t *testing.T) { } func TestEtcdKeyParseError(t *testing.T) { + t.Parallel() + testCases := []struct { key string error bool diff --git a/pkg/etcd/mock/etcd_client_mock.go b/pkg/etcd/mock/etcd_client_mock.go index c9cdb00f8bd..41dd336428a 100644 --- a/pkg/etcd/mock/etcd_client_mock.go +++ b/pkg/etcd/mock/etcd_client_mock.go @@ -53,17 +53,17 @@ func (mr *MockCDCEtcdClientMockRecorder) CheckMultipleCDCClusterExist(ctx interf } // CreateChangefeedInfo mocks base method. -func (m *MockCDCEtcdClient) CreateChangefeedInfo(arg0 context.Context, arg1 *model.UpstreamInfo, arg2 *model.ChangeFeedInfo, arg3 model.ChangeFeedID) error { +func (m *MockCDCEtcdClient) CreateChangefeedInfo(arg0 context.Context, arg1 *model.UpstreamInfo, arg2 *model.ChangeFeedInfo) error { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "CreateChangefeedInfo", arg0, arg1, arg2, arg3) + ret := m.ctrl.Call(m, "CreateChangefeedInfo", arg0, arg1, arg2) ret0, _ := ret[0].(error) return ret0 } // CreateChangefeedInfo indicates an expected call of CreateChangefeedInfo. -func (mr *MockCDCEtcdClientMockRecorder) CreateChangefeedInfo(arg0, arg1, arg2, arg3 interface{}) *gomock.Call { +func (mr *MockCDCEtcdClientMockRecorder) CreateChangefeedInfo(arg0, arg1, arg2 interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CreateChangefeedInfo", reflect.TypeOf((*MockCDCEtcdClient)(nil).CreateChangefeedInfo), arg0, arg1, arg2, arg3) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CreateChangefeedInfo", reflect.TypeOf((*MockCDCEtcdClient)(nil).CreateChangefeedInfo), arg0, arg1, arg2) } // DeleteCaptureInfo mocks base method. @@ -287,15 +287,15 @@ func (mr *MockCDCEtcdClientMockRecorder) SaveChangeFeedInfo(ctx, info, changeFee } // UpdateChangefeedAndUpstream mocks base method. -func (m *MockCDCEtcdClient) UpdateChangefeedAndUpstream(ctx context.Context, upstreamInfo *model.UpstreamInfo, changeFeedInfo *model.ChangeFeedInfo, changeFeedID model.ChangeFeedID) error { +func (m *MockCDCEtcdClient) UpdateChangefeedAndUpstream(ctx context.Context, upstreamInfo *model.UpstreamInfo, changeFeedInfo *model.ChangeFeedInfo) error { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "UpdateChangefeedAndUpstream", ctx, upstreamInfo, changeFeedInfo, changeFeedID) + ret := m.ctrl.Call(m, "UpdateChangefeedAndUpstream", ctx, upstreamInfo, changeFeedInfo) ret0, _ := ret[0].(error) return ret0 } // UpdateChangefeedAndUpstream indicates an expected call of UpdateChangefeedAndUpstream. -func (mr *MockCDCEtcdClientMockRecorder) UpdateChangefeedAndUpstream(ctx, upstreamInfo, changeFeedInfo, changeFeedID interface{}) *gomock.Call { +func (mr *MockCDCEtcdClientMockRecorder) UpdateChangefeedAndUpstream(ctx, upstreamInfo, changeFeedInfo interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "UpdateChangefeedAndUpstream", reflect.TypeOf((*MockCDCEtcdClient)(nil).UpdateChangefeedAndUpstream), ctx, upstreamInfo, changeFeedInfo, changeFeedID) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "UpdateChangefeedAndUpstream", reflect.TypeOf((*MockCDCEtcdClient)(nil).UpdateChangefeedAndUpstream), ctx, upstreamInfo, changeFeedInfo) } diff --git a/pkg/etcd/util_test.go b/pkg/etcd/util_test.go index bf94f94153e..248078ec19c 100644 --- a/pkg/etcd/util_test.go +++ b/pkg/etcd/util_test.go @@ -22,6 +22,8 @@ import ( ) func TestGetRevisionFromWatchOpts(t *testing.T) { + t.Parallel() + for i := 0; i < 100; i++ { rev := rand.Int63n(math.MaxInt64) opt := clientv3.WithRev(rev)