diff --git a/cdc/api/v2/changefeed.go b/cdc/api/v2/changefeed.go index 861492bd68c..d44c3a5effa 100644 --- a/cdc/api/v2/changefeed.go +++ b/cdc/api/v2/changefeed.go @@ -499,7 +499,7 @@ func (h *OpenAPIV2) updateChangefeed(c *gin.Context) { zap.Any("upstreamInfo", newUpInfo)) err = owner. - UpdateChangefeedAndUpstream(ctx, newUpInfo, newCfInfo, changefeedID) + UpdateChangefeedAndUpstream(ctx, newUpInfo, newCfInfo) if err != nil { _ = c.Error(errors.Trace(err)) return diff --git a/cdc/api/v2/changefeed_test.go b/cdc/api/v2/changefeed_test.go index 39afc0b0d0b..94ff9e29133 100644 --- a/cdc/api/v2/changefeed_test.go +++ b/cdc/api/v2/changefeed_test.go @@ -474,7 +474,7 @@ func TestUpdateChangefeed(t *testing.T) { Return(&model.ChangeFeedInfo{}, &model.UpstreamInfo{}, nil). Times(1) mockOwner.EXPECT(). - UpdateChangefeedAndUpstream(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()). + UpdateChangefeedAndUpstream(gomock.Any(), gomock.Any(), gomock.Any()). Return(cerrors.ErrEtcdAPIError).Times(1) w = httptest.NewRecorder() @@ -494,7 +494,7 @@ func TestUpdateChangefeed(t *testing.T) { Times(1) mockCapture.EXPECT().GetUpstreamManager().Return(upstream.NewManager4Test(&mockPDClient{}), nil).AnyTimes() mockOwner.EXPECT(). - UpdateChangefeedAndUpstream(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()). + UpdateChangefeedAndUpstream(gomock.Any(), gomock.Any(), gomock.Any()). Return(nil).Times(1) w = httptest.NewRecorder() @@ -511,7 +511,7 @@ func TestUpdateChangefeed(t *testing.T) { Times(1) mockCapture.EXPECT().GetUpstreamManager().Return(upstream.NewManager4Test(&mockPDClient{}), nil).AnyTimes() mockOwner.EXPECT(). - UpdateChangefeedAndUpstream(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()). + UpdateChangefeedAndUpstream(gomock.Any(), gomock.Any(), gomock.Any()). Return(nil).Times(1) w = httptest.NewRecorder() diff --git a/cdc/owner/mock/owner_mock.go b/cdc/owner/mock/owner_mock.go index 06f27cbe98b..8d3d178f94f 100644 --- a/cdc/owner/mock/owner_mock.go +++ b/cdc/owner/mock/owner_mock.go @@ -125,17 +125,17 @@ func (mr *MockOwnerMockRecorder) UpdateChangefeed(ctx, changeFeedInfo interface{ } // UpdateChangefeedAndUpstream mocks base method. -func (m *MockOwner) UpdateChangefeedAndUpstream(ctx context.Context, upstreamInfo *model.UpstreamInfo, changeFeedInfo *model.ChangeFeedInfo, changeFeedID model.ChangeFeedID) error { +func (m *MockOwner) UpdateChangefeedAndUpstream(ctx context.Context, upstreamInfo *model.UpstreamInfo, changeFeedInfo *model.ChangeFeedInfo) error { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "UpdateChangefeedAndUpstream", ctx, upstreamInfo, changeFeedInfo, changeFeedID) + ret := m.ctrl.Call(m, "UpdateChangefeedAndUpstream", ctx, upstreamInfo, changeFeedInfo) ret0, _ := ret[0].(error) return ret0 } // UpdateChangefeedAndUpstream indicates an expected call of UpdateChangefeedAndUpstream. -func (mr *MockOwnerMockRecorder) UpdateChangefeedAndUpstream(ctx, upstreamInfo, changeFeedInfo, changeFeedID interface{}) *gomock.Call { +func (mr *MockOwnerMockRecorder) UpdateChangefeedAndUpstream(ctx, upstreamInfo, changeFeedInfo interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "UpdateChangefeedAndUpstream", reflect.TypeOf((*MockOwner)(nil).UpdateChangefeedAndUpstream), ctx, upstreamInfo, changeFeedInfo, changeFeedID) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "UpdateChangefeedAndUpstream", reflect.TypeOf((*MockOwner)(nil).UpdateChangefeedAndUpstream), ctx, upstreamInfo, changeFeedInfo) } // WriteDebugInfo mocks base method. diff --git a/cdc/owner/owner.go b/cdc/owner/owner.go index 0950eb62d2f..47048975e6e 100644 --- a/cdc/owner/owner.go +++ b/cdc/owner/owner.go @@ -94,7 +94,6 @@ type Owner interface { UpdateChangefeedAndUpstream(ctx context.Context, upstreamInfo *model.UpstreamInfo, changeFeedInfo *model.ChangeFeedInfo, - changeFeedID model.ChangeFeedID, ) error UpdateChangefeed(ctx context.Context, changeFeedInfo *model.ChangeFeedInfo) error @@ -405,9 +404,8 @@ func (o *ownerImpl) AsyncStop() { func (o *ownerImpl) UpdateChangefeedAndUpstream(ctx context.Context, upstreamInfo *model.UpstreamInfo, changeFeedInfo *model.ChangeFeedInfo, - changeFeedID model.ChangeFeedID, ) error { - return o.etcdClient.UpdateChangefeedAndUpstream(ctx, upstreamInfo, changeFeedInfo, changeFeedID) + return o.etcdClient.UpdateChangefeedAndUpstream(ctx, upstreamInfo, changeFeedInfo) } func (o *ownerImpl) UpdateChangefeed(ctx context.Context, diff --git a/cdcv2/owner/owner.go b/cdcv2/owner/owner.go index cf083a8e322..7e9512aab01 100644 --- a/cdcv2/owner/owner.go +++ b/cdcv2/owner/owner.go @@ -57,7 +57,6 @@ type Owner struct { func (o *Owner) UpdateChangefeedAndUpstream(ctx context.Context, upstreamInfo *model.UpstreamInfo, changeFeedInfo *model.ChangeFeedInfo, - changeFeedID model.ChangeFeedID, ) error { panic("implement me") } diff --git a/errors.toml b/errors.toml index fee91083bc7..daee89143b3 100755 --- a/errors.toml +++ b/errors.toml @@ -1353,7 +1353,7 @@ meta operation fail ["DFLOW:ErrMetaOpFailed"] error = ''' -meta operation %s is failed +unexpected meta operation failure: %s ''' ["DFLOW:ErrMetaOptionConflict"] diff --git a/pkg/errors/cdc_errors.go b/pkg/errors/cdc_errors.go index bc4f1142b7b..fc79c5392e8 100644 --- a/pkg/errors/cdc_errors.go +++ b/pkg/errors/cdc_errors.go @@ -962,7 +962,7 @@ var ( errors.RFCCodeText("CDC:ErrMetaOpIgnored"), ) ErrMetaOpFailed = errors.Normalize( - "meta operation %s is failed", + "unexpected meta operation failure: %s", errors.RFCCodeText("DFLOW:ErrMetaOpFailed"), ) ErrMetaInvalidState = errors.Normalize( diff --git a/pkg/etcd/client_test.go b/pkg/etcd/client_test.go index 4e5731d3538..cd174b92117 100644 --- a/pkg/etcd/client_test.go +++ b/pkg/etcd/client_test.go @@ -71,6 +71,8 @@ func (m mockWatcher) RequestProgress(ctx context.Context) error { } func TestRetry(t *testing.T) { + t.Parallel() + originValue := maxTries // to speedup the test maxTries = 2 @@ -116,6 +118,8 @@ func TestRetry(t *testing.T) { } func TestDelegateLease(t *testing.T) { + t.Parallel() + ctx := context.Background() url, server, err := SetupEmbedEtcd(t.TempDir()) defer func() { @@ -148,6 +152,8 @@ func TestDelegateLease(t *testing.T) { // test no data lost when WatchCh blocked func TestWatchChBlocked(t *testing.T) { + t.Parallel() + cli := clientv3.NewCtxClient(context.TODO()) resetCount := int32(0) requestCount := int32(0) @@ -209,6 +215,8 @@ func TestWatchChBlocked(t *testing.T) { // test no data lost when OutCh blocked func TestOutChBlocked(t *testing.T) { + t.Parallel() + cli := clientv3.NewCtxClient(context.TODO()) resetCount := int32(0) requestCount := int32(0) @@ -260,8 +268,9 @@ func TestOutChBlocked(t *testing.T) { } func TestRevisionNotFallBack(t *testing.T) { - cli := clientv3.NewCtxClient(context.TODO()) + t.Parallel() + cli := clientv3.NewCtxClient(context.TODO()) resetCount := int32(0) requestCount := int32(0) rev := int64(0) diff --git a/pkg/etcd/etcd.go b/pkg/etcd/etcd.go index bdf3c17a874..c88053ff199 100644 --- a/pkg/etcd/etcd.go +++ b/pkg/etcd/etcd.go @@ -14,17 +14,17 @@ package etcd import ( + "bytes" "context" "fmt" "net/url" "strings" "time" - "github.com/pingcap/errors" "github.com/pingcap/log" "github.com/pingcap/tiflow/cdc/model" "github.com/pingcap/tiflow/pkg/config" - cerror "github.com/pingcap/tiflow/pkg/errors" + "github.com/pingcap/tiflow/pkg/errors" "github.com/prometheus/client_golang/prometheus" "github.com/tikv/pd/pkg/utils/tempurl" "go.etcd.io/etcd/api/v3/mvccpb" @@ -150,7 +150,6 @@ type CDCEtcdClient interface { UpdateChangefeedAndUpstream(ctx context.Context, upstreamInfo *model.UpstreamInfo, changeFeedInfo *model.ChangeFeedInfo, - changeFeedID model.ChangeFeedID, ) error PutCaptureInfo(context.Context, *model.CaptureInfo, clientv3.LeaseID) error @@ -201,7 +200,7 @@ func (c *CDCEtcdClientImpl) Close() error { // ClearAllCDCInfo delete all keys created by CDC func (c *CDCEtcdClientImpl) ClearAllCDCInfo(ctx context.Context) error { _, err := c.Client.Delete(ctx, BaseKey(c.ClusterID), clientv3.WithPrefix()) - return cerror.WrapError(cerror.ErrPDEtcdAPIError, err) + return errors.WrapError(errors.ErrPDEtcdAPIError, err) } // GetClusterID gets CDC cluster ID. @@ -218,7 +217,7 @@ func (c *CDCEtcdClientImpl) GetEtcdClient() *Client { func (c *CDCEtcdClientImpl) GetAllCDCInfo(ctx context.Context) ([]*mvccpb.KeyValue, error) { resp, err := c.Client.Get(ctx, BaseKey(c.ClusterID), clientv3.WithPrefix()) if err != nil { - return nil, cerror.WrapError(cerror.ErrPDEtcdAPIError, err) + return nil, errors.WrapError(errors.ErrPDEtcdAPIError, err) } return resp.Kvs, nil } @@ -230,7 +229,7 @@ func (c *CDCEtcdClientImpl) CheckMultipleCDCClusterExist(ctx context.Context) er clientv3.WithPrefix(), clientv3.WithKeysOnly()) if err != nil { - return cerror.WrapError(cerror.ErrPDEtcdAPIError, err) + return errors.WrapError(errors.ErrPDEtcdAPIError, err) } for _, kv := range resp.Kvs { key := string(kv.Key) @@ -249,7 +248,7 @@ func (c *CDCEtcdClientImpl) CheckMultipleCDCClusterExist(ctx context.Context) er if isReserved { continue } - return cerror.ErrMultipleCDCClustersExist.GenWithStackByArgs() + return errors.ErrMultipleCDCClustersExist.GenWithStackByArgs() } return nil } @@ -264,7 +263,7 @@ func (c *CDCEtcdClientImpl) GetChangeFeeds(ctx context.Context) ( resp, err := c.Client.Get(ctx, key, clientv3.WithPrefix()) if err != nil { - return 0, nil, cerror.WrapError(cerror.ErrPDEtcdAPIError, err) + return 0, nil, errors.WrapError(errors.ErrPDEtcdAPIError, err) } revision := resp.Header.Revision details := make(map[model.ChangeFeedID]*mvccpb.KeyValue, resp.Count) @@ -305,10 +304,10 @@ func (c *CDCEtcdClientImpl) GetChangeFeedInfo(ctx context.Context, key := GetEtcdKeyChangeFeedInfo(c.ClusterID, id) resp, err := c.Client.Get(ctx, key) if err != nil { - return nil, cerror.WrapError(cerror.ErrPDEtcdAPIError, err) + return nil, errors.WrapError(errors.ErrPDEtcdAPIError, err) } if resp.Count == 0 { - return nil, cerror.ErrChangeFeedNotExists.GenWithStackByArgs(key) + return nil, errors.ErrChangeFeedNotExists.GenWithStackByArgs(key) } detail := &model.ChangeFeedInfo{} err = detail.Unmarshal(resp.Kvs[0].Value) @@ -321,7 +320,7 @@ func (c *CDCEtcdClientImpl) DeleteChangeFeedInfo(ctx context.Context, ) error { key := GetEtcdKeyChangeFeedInfo(c.ClusterID, id) _, err := c.Client.Delete(ctx, key) - return cerror.WrapError(cerror.ErrPDEtcdAPIError, err) + return errors.WrapError(errors.ErrPDEtcdAPIError, err) } // GetChangeFeedStatus queries the checkpointTs and resovledTs of a given changefeed @@ -331,10 +330,10 @@ func (c *CDCEtcdClientImpl) GetChangeFeedStatus(ctx context.Context, key := GetEtcdKeyJob(c.ClusterID, id) resp, err := c.Client.Get(ctx, key) if err != nil { - return nil, 0, cerror.WrapError(cerror.ErrPDEtcdAPIError, err) + return nil, 0, errors.WrapError(errors.ErrPDEtcdAPIError, err) } if resp.Count == 0 { - return nil, 0, cerror.ErrChangeFeedNotExists.GenWithStackByArgs(key) + return nil, 0, errors.ErrChangeFeedNotExists.GenWithStackByArgs(key) } info := &model.ChangeFeedStatus{} err = info.Unmarshal(resp.Kvs[0].Value) @@ -347,7 +346,7 @@ func (c *CDCEtcdClientImpl) GetCaptures(ctx context.Context) (int64, []*model.Ca resp, err := c.Client.Get(ctx, key, clientv3.WithPrefix()) if err != nil { - return 0, nil, cerror.WrapError(cerror.ErrPDEtcdAPIError, err) + return 0, nil, errors.WrapError(errors.ErrPDEtcdAPIError, err) } revision := resp.Header.Revision infos := make([]*model.CaptureInfo, 0, resp.Count) @@ -371,11 +370,11 @@ func (c *CDCEtcdClientImpl) GetCaptureInfo( resp, err := c.Client.Get(ctx, key) if err != nil { - return nil, cerror.WrapError(cerror.ErrPDEtcdAPIError, err) + return nil, errors.WrapError(errors.ErrPDEtcdAPIError, err) } if len(resp.Kvs) == 0 { - return nil, cerror.ErrCaptureNotExist.GenWithStackByArgs(key) + return nil, errors.ErrCaptureNotExist.GenWithStackByArgs(key) } info = new(model.CaptureInfo) @@ -393,7 +392,7 @@ func (c *CDCEtcdClientImpl) GetCaptureLeases(ctx context.Context) (map[string]in resp, err := c.Client.Get(ctx, key, clientv3.WithPrefix()) if err != nil { - return nil, cerror.WrapError(cerror.ErrPDEtcdAPIError, err) + return nil, errors.WrapError(errors.ErrPDEtcdAPIError, err) } leases := make(map[string]int64, resp.Count) for _, kv := range resp.Kvs { @@ -416,111 +415,117 @@ func (c *CDCEtcdClientImpl) RevokeAllLeases(ctx context.Context, leases map[stri // it means the etcd lease is already expired or revoked continue } - return cerror.WrapError(cerror.ErrPDEtcdAPIError, err) + return errors.WrapError(errors.ErrPDEtcdAPIError, err) } return nil } // CreateChangefeedInfo creates a change feed info into etcd and fails if it is already exists. -func (c *CDCEtcdClientImpl) CreateChangefeedInfo(ctx context.Context, - upstreamInfo *model.UpstreamInfo, - info *model.ChangeFeedInfo, +func (c *CDCEtcdClientImpl) CreateChangefeedInfo( + ctx context.Context, upstreamInfo *model.UpstreamInfo, info *model.ChangeFeedInfo, +) error { + return c.saveChangefeedAndUpstreamInfo(ctx, "Create", upstreamInfo, info) +} + +// UpdateChangefeedAndUpstream updates the changefeed's info and its upstream info into etcd +func (c *CDCEtcdClientImpl) UpdateChangefeedAndUpstream( + ctx context.Context, upstreamInfo *model.UpstreamInfo, changeFeedInfo *model.ChangeFeedInfo, +) error { + return c.saveChangefeedAndUpstreamInfo(ctx, "Update", upstreamInfo, changeFeedInfo) +} + +// saveChangefeedAndUpstreamInfo stores changefeed info and its upstream info into etcd +func (c *CDCEtcdClientImpl) saveChangefeedAndUpstreamInfo( + ctx context.Context, operation string, + upstreamInfo *model.UpstreamInfo, info *model.ChangeFeedInfo, ) error { + cmps := []clientv3.Cmp{} + opsThen := []clientv3.Op{} + + if upstreamInfo != nil { + if info.UpstreamID != upstreamInfo.ID { + return errors.ErrUpstreamMissMatch.GenWithStackByArgs(info.UpstreamID, upstreamInfo.ID) + } + upstreamInfoKey := CDCKey{ + Tp: CDCKeyTypeUpStream, + ClusterID: c.ClusterID, + UpstreamID: upstreamInfo.ID, + Namespace: info.Namespace, + } + upstreamEtcdKeyStr := upstreamInfoKey.String() + upstreamResp, err := c.Client.Get(ctx, upstreamEtcdKeyStr) + if err != nil { + return errors.WrapError(errors.ErrPDEtcdAPIError, err) + } + upstreamData, err := upstreamInfo.Marshal() + if err != nil { + return errors.WrapError(errors.ErrPDEtcdAPIError, err) + } + + if len(upstreamResp.Kvs) == 0 { + cmps = append(cmps, clientv3.Compare(clientv3.ModRevision(upstreamEtcdKeyStr), "=", 0)) + opsThen = append(opsThen, clientv3.OpPut(upstreamEtcdKeyStr, string(upstreamData))) + } else { + cmps = append(cmps, + clientv3.Compare(clientv3.ModRevision(upstreamEtcdKeyStr), "=", upstreamResp.Kvs[0].ModRevision)) + if !bytes.Equal(upstreamResp.Kvs[0].Value, upstreamData) { + opsThen = append(opsThen, clientv3.OpPut(upstreamEtcdKeyStr, string(upstreamData))) + } + } + } + changeFeedID := model.ChangeFeedID{ Namespace: info.Namespace, ID: info.ID, } infoKey := GetEtcdKeyChangeFeedInfo(c.ClusterID, changeFeedID) jobKey := GetEtcdKeyJob(c.ClusterID, changeFeedID) - upstreamInfoKey := CDCKey{ - Tp: CDCKeyTypeUpStream, - ClusterID: c.ClusterID, - UpstreamID: upstreamInfo.ID, - Namespace: changeFeedID.Namespace, - } - upstreamEtcdKeyStr := upstreamInfoKey.String() - info.UpstreamID = upstreamInfo.ID - value, err := info.Marshal() + infoData, err := info.Marshal() if err != nil { return errors.Trace(err) } - upstreamResp, err := c.Client.Get(ctx, upstreamEtcdKeyStr) - if err != nil { - return errors.Trace(err) - } - upstreamData, err := upstreamInfo.Marshal() - if err != nil { - return errors.Trace(err) - } - cmps := []clientv3.Cmp{ - clientv3.Compare(clientv3.ModRevision(infoKey), "=", 0), - clientv3.Compare(clientv3.ModRevision(jobKey), "=", 0), - } - opsThen := []clientv3.Op{ - clientv3.OpPut(infoKey, value), - clientv3.OpPut(upstreamEtcdKeyStr, string(upstreamData)), - } - if len(upstreamResp.Kvs) == 0 { - cmps = append(cmps, - clientv3.Compare(clientv3.ModRevision(upstreamEtcdKeyStr), - "=", 0)) - } else { - cmps = append(cmps, - clientv3.Compare(clientv3.ModRevision(upstreamInfoKey.String()), - "=", upstreamResp.Kvs[0].ModRevision)) - } - resp, err := c.Client.Txn(ctx, cmps, opsThen, TxnEmptyOpsElse) - if err != nil { - return cerror.WrapError(cerror.ErrPDEtcdAPIError, err) - } - if !resp.Succeeded { - log.Warn("changefeed already exists, ignore create changefeed", - zap.String("namespace", changeFeedID.Namespace), - zap.String("changefeed", changeFeedID.ID)) - return cerror.ErrChangeFeedAlreadyExists.GenWithStackByArgs(changeFeedID) - } - return errors.Trace(err) -} + var infoModRevsion, jobModRevision int64 + if operation == "Update" { + infoResp, err := c.Client.Get(ctx, infoKey) + if err != nil { + return errors.WrapError(errors.ErrPDEtcdAPIError, err) + } + if len(infoResp.Kvs) == 0 { + return errors.ErrChangeFeedNotExists.GenWithStackByArgs(infoKey) + } + infoModRevsion = infoResp.Kvs[0].ModRevision -// UpdateChangefeedAndUpstream updates the changefeed's info and its upstream info into etcd -func (c *CDCEtcdClientImpl) UpdateChangefeedAndUpstream(ctx context.Context, - upstreamInfo *model.UpstreamInfo, - changeFeedInfo *model.ChangeFeedInfo, - changeFeedID model.ChangeFeedID, -) error { - infoKey := GetEtcdKeyChangeFeedInfo(c.ClusterID, changeFeedID) - changeFeedInfoStr, err := changeFeedInfo.Marshal() - if err != nil { - return errors.Trace(err) - } - upstreamKey := CDCKey{ - Tp: CDCKeyTypeUpStream, - ClusterID: c.ClusterID, - UpstreamID: upstreamInfo.ID, - Namespace: changeFeedID.Namespace, - } - upstreamKeyStr := upstreamKey.String() - upstreamInfoStr, err := upstreamInfo.Marshal() - if err != nil { - return errors.Trace(err) - } - opsThen := []clientv3.Op{ - clientv3.OpPut(infoKey, changeFeedInfoStr), - clientv3.OpPut(upstreamKeyStr, string(upstreamInfoStr)), + jobResp, err := c.Client.Get(ctx, jobKey) + if err != nil { + return errors.WrapError(errors.ErrPDEtcdAPIError, err) + } + if len(jobResp.Kvs) == 0 { + // Note that status may not exist, so we don't check it here. + log.Debug("job status not exists", zap.Stringer("changefeed", changeFeedID)) + } else { + jobModRevision = jobResp.Kvs[0].ModRevision + } } - resp, err := c.Client.Txn(ctx, txnEmptyCmps, opsThen, TxnEmptyOpsElse) + cmps = append(cmps, + clientv3.Compare(clientv3.ModRevision(infoKey), "=", infoModRevsion), + clientv3.Compare(clientv3.ModRevision(jobKey), "=", jobModRevision), + ) + opsThen = append(opsThen, clientv3.OpPut(infoKey, infoData)) + + resp, err := c.Client.Txn(ctx, cmps, opsThen, TxnEmptyOpsElse) if err != nil { - return cerror.WrapError(cerror.ErrPDEtcdAPIError, err) + return errors.WrapError(errors.ErrPDEtcdAPIError, err) } if !resp.Succeeded { - log.Warn("unexpected etcd transaction failure", + log.Warn(fmt.Sprintf("unexpected etcd transaction failure, operation: %s", operation), zap.String("namespace", changeFeedID.Namespace), zap.String("changefeed", changeFeedID.ID)) - return cerror.ErrChangefeedUpdateFailedTransaction.GenWithStackByArgs(changeFeedID) + errMsg := fmt.Sprintf("%s changefeed %s", operation, changeFeedID) + return errors.ErrMetaOpFailed.GenWithStackByArgs(errMsg) } - return nil + return errors.Trace(err) } // SaveChangeFeedInfo stores change feed info into etcd @@ -535,7 +540,7 @@ func (c *CDCEtcdClientImpl) SaveChangeFeedInfo(ctx context.Context, return errors.Trace(err) } _, err = c.Client.Put(ctx, key, value) - return cerror.WrapError(cerror.ErrPDEtcdAPIError, err) + return errors.WrapError(errors.ErrPDEtcdAPIError, err) } // PutCaptureInfo put capture info into etcd, @@ -550,7 +555,7 @@ func (c *CDCEtcdClientImpl) PutCaptureInfo( key := GetEtcdKeyCaptureInfo(c.ClusterID, info.ID) _, err = c.Client.Put(ctx, key, string(data), clientv3.WithLease(leaseID)) - return cerror.WrapError(cerror.ErrPDEtcdAPIError, err) + return errors.WrapError(errors.ErrPDEtcdAPIError, err) } // DeleteCaptureInfo delete all capture related info from etcd. @@ -558,7 +563,7 @@ func (c *CDCEtcdClientImpl) DeleteCaptureInfo(ctx context.Context, captureID str key := GetEtcdKeyCaptureInfo(c.ClusterID, captureID) _, err := c.Client.Delete(ctx, key) if err != nil { - return cerror.WrapError(cerror.ErrPDEtcdAPIError, err) + return errors.WrapError(errors.ErrPDEtcdAPIError, err) } // we need to clean all task position related to this capture when the capture is offline // otherwise the task positions may leak @@ -573,7 +578,7 @@ func (c *CDCEtcdClientImpl) DeleteCaptureInfo(ctx context.Context, captureID str zap.String("captureID", captureID), zap.String("key", key), zap.Error(err)) } - return cerror.WrapError(cerror.ErrPDEtcdAPIError, err) + return errors.WrapError(errors.ErrPDEtcdAPIError, err) } // GetOwnerID returns the owner id by querying etcd @@ -581,7 +586,7 @@ func (c *CDCEtcdClientImpl) GetOwnerID(ctx context.Context) (string, error) { resp, err := c.Client.Get(ctx, CaptureOwnerKey(c.ClusterID), clientv3.WithFirstCreate()...) if err != nil { - return "", cerror.WrapError(cerror.ErrPDEtcdAPIError, err) + return "", errors.WrapError(errors.ErrPDEtcdAPIError, err) } if len(resp.Kvs) == 0 { return "", concurrency.ErrElectionNoLeader @@ -595,14 +600,14 @@ func (c *CDCEtcdClientImpl) GetOwnerRevision( ) (rev int64, err error) { resp, err := c.Client.Get(ctx, CaptureOwnerKey(c.ClusterID), clientv3.WithFirstCreate()...) if err != nil { - return 0, cerror.WrapError(cerror.ErrPDEtcdAPIError, err) + return 0, errors.WrapError(errors.ErrPDEtcdAPIError, err) } if len(resp.Kvs) == 0 { - return 0, cerror.ErrOwnerNotFound.GenWithStackByArgs() + return 0, errors.ErrOwnerNotFound.GenWithStackByArgs() } // Checks that the given capture is indeed the owner. if string(resp.Kvs[0].Value) != captureID { - return 0, cerror.ErrNotOwner.GenWithStackByArgs() + return 0, errors.ErrNotOwner.GenWithStackByArgs() } return resp.Kvs[0].ModRevision, nil } @@ -631,10 +636,10 @@ func (c *CDCEtcdClientImpl) GetUpstreamInfo(ctx context.Context, KeyStr := Key.String() resp, err := c.Client.Get(ctx, KeyStr) if err != nil { - return nil, cerror.WrapError(cerror.ErrPDEtcdAPIError, err) + return nil, errors.WrapError(errors.ErrPDEtcdAPIError, err) } if resp.Count == 0 { - return nil, cerror.ErrUpstreamNotFound.GenWithStackByArgs(KeyStr) + return nil, errors.ErrUpstreamNotFound.GenWithStackByArgs(KeyStr) } info := &model.UpstreamInfo{} err = info.Unmarshal(resp.Kvs[0].Value) @@ -695,7 +700,7 @@ func SetupEmbedEtcd(dir string) (clientURL *url.URL, e *embed.Etcd, err error) { func extractKeySuffix(key string) (string, error) { subs := strings.Split(key, "/") if len(subs) < 2 { - return "", cerror.ErrInvalidEtcdKey.GenWithStackByArgs(key) + return "", errors.ErrInvalidEtcdKey.GenWithStackByArgs(key) } return subs[len(subs)-1], nil } diff --git a/pkg/etcd/etcd_test.go b/pkg/etcd/etcd_test.go index 15388d68f8f..daa83ecd0e5 100644 --- a/pkg/etcd/etcd_test.go +++ b/pkg/etcd/etcd_test.go @@ -37,6 +37,8 @@ func (c Captures) Less(i, j int) bool { return c[i].ID < c[j].ID } func (c Captures) Swap(i, j int) { c[i], c[j] = c[j], c[i] } func TestEmbedEtcd(t *testing.T) { + t.Parallel() + s := &Tester{} s.SetUpTest(t) defer s.TearDownTest(t) @@ -61,6 +63,8 @@ func TestEmbedEtcd(t *testing.T) { } func TestGetChangeFeeds(t *testing.T) { + t.Parallel() + s := &Tester{} s.SetUpTest(t) defer s.TearDownTest(t) @@ -103,6 +107,8 @@ func TestGetChangeFeeds(t *testing.T) { } func TestOpChangeFeedDetail(t *testing.T) { + t.Parallel() + s := &Tester{} s.SetUpTest(t) defer s.TearDownTest(t) @@ -129,6 +135,8 @@ func TestOpChangeFeedDetail(t *testing.T) { } func TestGetAllChangeFeedInfo(t *testing.T) { + t.Parallel() + s := &Tester{} s.SetUpTest(t) defer s.TearDownTest(t) @@ -171,6 +179,8 @@ func TestGetAllChangeFeedInfo(t *testing.T) { } func TestCheckMultipleCDCClusterExist(t *testing.T) { + t.Parallel() + s := &Tester{} s.SetUpTest(t) defer s.TearDownTest(t) @@ -203,13 +213,18 @@ func TestCheckMultipleCDCClusterExist(t *testing.T) { } func TestCreateChangefeed(t *testing.T) { + t.Parallel() + s := &Tester{} s.SetUpTest(t) defer s.TearDownTest(t) ctx := context.Background() detail := &model.ChangeFeedInfo{ - SinkURI: "root@tcp(127.0.0.1:3306)/mysql", + UpstreamID: 1, + Namespace: "test", + ID: "create-changefeed", + SinkURI: "root@tcp(127.0.0.1:3306)/mysql", } upstreamInfo := &model.UpstreamInfo{ID: 1} @@ -219,10 +234,13 @@ func TestCreateChangefeed(t *testing.T) { err = s.client.CreateChangefeedInfo(ctx, upstreamInfo, detail) - require.True(t, cerror.ErrChangeFeedAlreadyExists.Equal(err)) + require.True(t, cerror.ErrMetaOpFailed.Equal(err)) + require.Equal(t, "[DFLOW:ErrMetaOpFailed]unexpected meta operation failure: Create changefeed test/create-changefeed", err.Error()) } func TestUpdateChangefeedAndUpstream(t *testing.T) { + t.Parallel() + s := &Tester{} s.SetUpTest(t) defer s.TearDownTest(t) @@ -234,12 +252,16 @@ func TestUpdateChangefeedAndUpstream(t *testing.T) { } changeFeedID := model.DefaultChangeFeedID("test-update-cf-and-up") changeFeedInfo := &model.ChangeFeedInfo{ - ID: changeFeedID.ID, - Namespace: changeFeedID.Namespace, - SinkURI: "blackhole://", + UpstreamID: upstreamInfo.ID, + ID: changeFeedID.ID, + Namespace: changeFeedID.Namespace, + SinkURI: "blackhole://", } - err := s.client.UpdateChangefeedAndUpstream(ctx, upstreamInfo, changeFeedInfo, changeFeedID) + err := s.client.SaveChangeFeedInfo(ctx, changeFeedInfo, changeFeedID) + require.NoError(t, err) + + err = s.client.UpdateChangefeedAndUpstream(ctx, upstreamInfo, changeFeedInfo) require.NoError(t, err) var upstreamResult *model.UpstreamInfo @@ -255,6 +277,8 @@ func TestUpdateChangefeedAndUpstream(t *testing.T) { } func TestGetAllCaptureLeases(t *testing.T) { + t.Parallel() + s := &Tester{} s.SetUpTest(t) defer s.TearDownTest(t) @@ -310,6 +334,8 @@ const ( ) func TestGetOwnerRevision(t *testing.T) { + t.Parallel() + s := &Tester{} s.SetUpTest(t) defer s.TearDownTest(t) @@ -403,6 +429,8 @@ func TestExtractKeySuffix(t *testing.T) { } func TestMigrateBackupKey(t *testing.T) { + t.Parallel() + key := MigrateBackupKey(1, "/tidb/cdc/capture/abcd") require.Equal(t, "/tidb/cdc/__backup__/1/tidb/cdc/capture/abcd", key) key = MigrateBackupKey(1, "abcdc") @@ -410,6 +438,8 @@ func TestMigrateBackupKey(t *testing.T) { } func TestDeleteCaptureInfo(t *testing.T) { + t.Parallel() + s := &Tester{} s.SetUpTest(t) defer s.TearDownTest(t) diff --git a/pkg/etcd/etcdkey_test.go b/pkg/etcd/etcdkey_test.go index a3b41165381..459cb229548 100644 --- a/pkg/etcd/etcdkey_test.go +++ b/pkg/etcd/etcdkey_test.go @@ -22,6 +22,8 @@ import ( ) func TestEtcdKey(t *testing.T) { + t.Parallel() + testcases := []struct { key string expected *CDCKey @@ -122,6 +124,8 @@ func TestEtcdKey(t *testing.T) { } func TestEtcdKeyParseError(t *testing.T) { + t.Parallel() + testCases := []struct { key string error bool diff --git a/pkg/etcd/mock/etcd_client_mock.go b/pkg/etcd/mock/etcd_client_mock.go index dd2cf19c6ec..40b7ddaac71 100644 --- a/pkg/etcd/mock/etcd_client_mock.go +++ b/pkg/etcd/mock/etcd_client_mock.go @@ -356,15 +356,15 @@ func (mr *MockCDCEtcdClientMockRecorder) SaveChangeFeedInfo(ctx, info, changeFee } // UpdateChangefeedAndUpstream mocks base method. -func (m *MockCDCEtcdClient) UpdateChangefeedAndUpstream(ctx context.Context, upstreamInfo *model.UpstreamInfo, changeFeedInfo *model.ChangeFeedInfo, changeFeedID model.ChangeFeedID) error { +func (m *MockCDCEtcdClient) UpdateChangefeedAndUpstream(ctx context.Context, upstreamInfo *model.UpstreamInfo, changeFeedInfo *model.ChangeFeedInfo) error { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "UpdateChangefeedAndUpstream", ctx, upstreamInfo, changeFeedInfo, changeFeedID) + ret := m.ctrl.Call(m, "UpdateChangefeedAndUpstream", ctx, upstreamInfo, changeFeedInfo) ret0, _ := ret[0].(error) return ret0 } // UpdateChangefeedAndUpstream indicates an expected call of UpdateChangefeedAndUpstream. -func (mr *MockCDCEtcdClientMockRecorder) UpdateChangefeedAndUpstream(ctx, upstreamInfo, changeFeedInfo, changeFeedID interface{}) *gomock.Call { +func (mr *MockCDCEtcdClientMockRecorder) UpdateChangefeedAndUpstream(ctx, upstreamInfo, changeFeedInfo interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "UpdateChangefeedAndUpstream", reflect.TypeOf((*MockCDCEtcdClient)(nil).UpdateChangefeedAndUpstream), ctx, upstreamInfo, changeFeedInfo, changeFeedID) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "UpdateChangefeedAndUpstream", reflect.TypeOf((*MockCDCEtcdClient)(nil).UpdateChangefeedAndUpstream), ctx, upstreamInfo, changeFeedInfo) } diff --git a/pkg/etcd/util_test.go b/pkg/etcd/util_test.go index bf94f94153e..248078ec19c 100644 --- a/pkg/etcd/util_test.go +++ b/pkg/etcd/util_test.go @@ -22,6 +22,8 @@ import ( ) func TestGetRevisionFromWatchOpts(t *testing.T) { + t.Parallel() + for i := 0; i < 100; i++ { rev := rand.Int63n(math.MaxInt64) opt := clientv3.WithRev(rev)