diff --git a/cdc/api/validator.go b/cdc/api/validator.go index 0d33a23a9be..76098108817 100644 --- a/cdc/api/validator.go +++ b/cdc/api/validator.go @@ -80,6 +80,7 @@ func verifyCreateChangefeedConfig( if err := gc.EnsureChangefeedStartTsSafety( ctx, upStream.PDClient, + capture.EtcdClient.GetEnsureGCServiceID(), model.DefaultChangeFeedID(changefeedConfig.ID), ensureTTL, changefeedConfig.StartTS); err != nil { if !cerror.ErrStartTsBeforeGC.Equal(err) { diff --git a/cdc/capture/capture.go b/cdc/capture/capture.go index a2a304b10d4..74dee5940cb 100644 --- a/cdc/capture/capture.go +++ b/cdc/capture/capture.go @@ -127,7 +127,7 @@ func (c *Capture) reset(ctx context.Context) error { if c.UpstreamManager != nil { c.UpstreamManager.Close() } - c.UpstreamManager = upstream.NewManager(ctx) + c.UpstreamManager = upstream.NewManager(ctx, c.EtcdClient.GetGCServiceID()) err = c.UpstreamManager.Add(upstream.DefaultUpstreamID, c.pdEnpoints, conf.Security) if err != nil { return errors.Annotate( diff --git a/cdc/capture/capture_test.go b/cdc/capture/capture_test.go index c89a416fd01..b263ce4c5fd 100644 --- a/cdc/capture/capture_test.go +++ b/cdc/capture/capture_test.go @@ -42,7 +42,8 @@ func TestReset(t *testing.T) { DialTimeout: 3 * time.Second, }) require.NoError(t, err) - client := etcd.NewCDCEtcdClient(ctx, etcdCli, etcd.DefaultCDCClusterID) + client, err := etcd.NewCDCEtcdClient(ctx, etcdCli, etcd.DefaultCDCClusterID) + require.Nil(t, err) // Close the client before the test function exits to prevent possible // ctx leaks. // Ref: https://github.com/grpc/grpc-go/blob/master/stream.go#L229 diff --git a/cdc/owner/changefeed.go b/cdc/owner/changefeed.go index 95f3df19e33..2cf9dccad9a 100644 --- a/cdc/owner/changefeed.go +++ b/cdc/owner/changefeed.go @@ -328,7 +328,9 @@ LOOP: // See more gc doc. ensureTTL := int64(10 * 60) err := gc.EnsureChangefeedStartTsSafety( - ctx, c.upStream.PDClient, c.state.ID, ensureTTL, checkpointTs) + ctx, c.upStream.PDClient, + ctx.GlobalVars().EtcdClient.GetEnsureGCServiceID(), + c.state.ID, ensureTTL, checkpointTs) if err != nil { return errors.Trace(err) } diff --git a/cdc/owner/changefeed_test.go b/cdc/owner/changefeed_test.go index c251d642c15..814697a83e3 100644 --- a/cdc/owner/changefeed_test.go +++ b/cdc/owner/changefeed_test.go @@ -36,7 +36,6 @@ import ( "github.com/pingcap/tiflow/pkg/orchestrator" "github.com/pingcap/tiflow/pkg/txnutil/gc" "github.com/pingcap/tiflow/pkg/upstream" - "github.com/pingcap/tiflow/pkg/version" "github.com/stretchr/testify/require" "github.com/tikv/client-go/v2/oracle" ) @@ -237,6 +236,7 @@ func TestInitialize(t *testing.T) { tester.MustApplyPatches() // initialize + ctx.GlobalVars().EtcdClient = &etcd.CDCEtcdClient{} cf.Tick(ctx, state, captures) tester.MustApplyPatches() require.Equal(t, state.Status.CheckpointTs, ctx.ChangefeedVars().Info.StartTs) @@ -271,20 +271,8 @@ func TestExecDDL(t *testing.T) { job := helper.DDL2Job("create table test0.table0(id int primary key)") startTs := job.BinlogInfo.FinishedTS + 1000 - ctx := cdcContext.NewContext(context.Background(), &cdcContext.GlobalVars{ - CaptureInfo: &model.CaptureInfo{ - ID: "capture-id-test", - AdvertiseAddr: "127.0.0.1:0000", - Version: version.ReleaseVersion, - }, - }) - ctx = cdcContext.WithChangefeedVars(ctx, &cdcContext.ChangefeedVars{ - ID: model.DefaultChangeFeedID("changefeed-id-test"), - Info: &model.ChangeFeedInfo{ - StartTs: startTs, - Config: config.GetDefaultReplicaConfig(), - }, - }) + ctx := cdcContext.NewContext4Test(context.Background(), true) + ctx.ChangefeedVars().Info.StartTs = startTs cf, state, captures, tester := createChangefeed4Test(ctx, t) cf.upStream.KVStorage = helper.Storage() @@ -361,20 +349,8 @@ func TestEmitCheckpointTs(t *testing.T) { job := helper.DDL2Job("create table test0.table0(id int primary key)") startTs := job.BinlogInfo.FinishedTS + 1000 - ctx := cdcContext.NewContext(context.Background(), &cdcContext.GlobalVars{ - CaptureInfo: &model.CaptureInfo{ - ID: "capture-id-test", - AdvertiseAddr: "127.0.0.1:0000", - Version: version.ReleaseVersion, - }, - }) - ctx = cdcContext.WithChangefeedVars(ctx, &cdcContext.ChangefeedVars{ - ID: model.DefaultChangeFeedID("changefeed-id-test"), - Info: &model.ChangeFeedInfo{ - StartTs: startTs, - Config: config.GetDefaultReplicaConfig(), - }, - }) + ctx := cdcContext.NewContext4Test(context.Background(), true) + ctx.ChangefeedVars().Info.StartTs = startTs cf, state, captures, tester := createChangefeed4Test(ctx, t) cf.upStream.KVStorage = helper.Storage() diff --git a/cdc/owner/owner_test.go b/cdc/owner/owner_test.go index 3b6b9541e1a..a157c49b29b 100644 --- a/cdc/owner/owner_test.go +++ b/cdc/owner/owner_test.go @@ -421,7 +421,7 @@ func TestUpdateGCSafePoint(t *testing.T) { // Owner will do a snapshot read at (checkpointTs - 1) from TiKV, // set GC safepoint to (checkpointTs - 1) require.Equal(t, safePoint, uint64(1)) - require.Equal(t, serviceID, gc.CDCServiceSafePointID) + require.Equal(t, serviceID, etcd.GcServiceIDForTest()) ch <- struct{}{} return 0, nil } @@ -462,7 +462,7 @@ func TestUpdateGCSafePoint(t *testing.T) { // Owner will do a snapshot read at (checkpointTs - 1) from TiKV, // set GC safepoint to (checkpointTs - 1) require.Equal(t, safePoint, uint64(19)) - require.Equal(t, serviceID, gc.CDCServiceSafePointID) + require.Equal(t, serviceID, etcd.GcServiceIDForTest()) ch <- struct{}{} return 0, nil } diff --git a/cdc/scheduler/internal/base/processor_agent_test.go b/cdc/scheduler/internal/base/processor_agent_test.go index 106f5e95d15..2c6ab6f80fd 100644 --- a/cdc/scheduler/internal/base/processor_agent_test.go +++ b/cdc/scheduler/internal/base/processor_agent_test.go @@ -22,6 +22,7 @@ import ( "github.com/pingcap/failpoint" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" + "go.etcd.io/etcd/api/v3/etcdserverpb" "go.etcd.io/etcd/api/v3/mvccpb" clientv3 "go.etcd.io/etcd/client/v3" @@ -47,9 +48,10 @@ const ( // TODO add a real unit test with mock components for the agent alone, // which might require refactoring some existing components. type agentTestSuite struct { - cluster *p2p.MockCluster - etcdClient *clientv3.Client - etcdKVClient *mockEtcdKVClient + cluster *p2p.MockCluster + etcdClient *clientv3.Client + etcdKVClient *mockEtcdKVClient + etcdClusterClient *mockEtcdClusterClient tableExecutor *MockTableExecutor dispatchResponseCh chan *protocol.DispatchTableResponseMessage @@ -68,7 +70,7 @@ type agentTestSuite struct { func newAgentTestSuite(t *testing.T) *agentTestSuite { ctx, cancel := context.WithCancel(context.Background()) - etcdCli, KVCli := newMockEtcdClientForAgentTests(ctx) + etcdCli, KVCli, clusterCli := newMockEtcdClientForAgentTests(ctx) cluster := p2p.NewMockCluster(t, agentTestMockNodeNum) ownerMessageServer := cluster.Nodes[ownerCaptureID].Server @@ -77,9 +79,10 @@ func newAgentTestSuite(t *testing.T) *agentTestSuite { require.NotNil(t, ownerMessageClient) ret := &agentTestSuite{ - cluster: cluster, - etcdClient: etcdCli, - etcdKVClient: KVCli, + cluster: cluster, + etcdClient: etcdCli, + etcdKVClient: KVCli, + etcdClusterClient: clusterCli, // The channel sizes 1024 should be more than sufficient for these tests. // Full channels will result in panics to make the cases fail. @@ -153,7 +156,8 @@ func newAgentTestSuite(t *testing.T) *agentTestSuite { } func (s *agentTestSuite) CreateAgent(t *testing.T) (*agentImpl, error) { - cdcEtcdClient := etcd.NewCDCEtcdClient(s.ctx, s.etcdClient, etcd.DefaultCDCClusterID) + cdcEtcdClient, err := etcd.NewCDCEtcdClient(s.ctx, s.etcdClient, etcd.DefaultCDCClusterID) + require.Nil(t, err) messageServer := s.cluster.Nodes["capture-1"].Server messageRouter := s.cluster.Nodes["capture-1"].Router s.tableExecutor = NewMockTableExecutor(t) @@ -193,11 +197,15 @@ func (s *agentTestSuite) Close() { // NOTE: The mock client does not have any useful internal logic. // It only supports GET operations and any output should be supplied by // calling the mock.Mock methods embedded in the mock client. -func newMockEtcdClientForAgentTests(ctx context.Context) (*clientv3.Client, *mockEtcdKVClient) { +func newMockEtcdClientForAgentTests(ctx context.Context) (*clientv3.Client, + *mockEtcdKVClient, *mockEtcdClusterClient, +) { cli := clientv3.NewCtxClient(ctx) mockKVCli := &mockEtcdKVClient{} cli.KV = mockKVCli - return cli, mockKVCli + mockClusterCli := &mockEtcdClusterClient{} + cli.Cluster = mockClusterCli + return cli, mockKVCli, mockClusterCli } type mockEtcdKVClient struct { @@ -214,6 +222,22 @@ func (c *mockEtcdKVClient) Get(ctx context.Context, key string, opts ...clientv3 return resp, args.Error(1) } +type mockEtcdClusterClient struct { + clientv3.Cluster // embeds a null implementation of the Etcd Cluster client + mock.Mock +} + +func (c *mockEtcdClusterClient) MemberList( + ctx context.Context, +) (*clientv3.MemberListResponse, error) { + args := c.Called(ctx) + resp := (*clientv3.MemberListResponse)(nil) + if args.Get(0) != nil { + resp = args.Get(0).(*clientv3.MemberListResponse) + } + return resp, args.Error(1) +} + func TestAgentBasics(t *testing.T) { suite := newAgentTestSuite(t) defer suite.Close() @@ -229,6 +253,10 @@ func TestAgentBasics(t *testing.T) { }, }, }, nil) + suite.etcdClusterClient.On("MemberList", mock.Anything). + Return(&clientv3.MemberListResponse{ + Header: &etcdserverpb.ResponseHeader{ClusterId: 1}, + }, nil) // Test Point 1: Create an agent. agent, err := suite.CreateAgent(t) @@ -339,6 +367,10 @@ func TestAgentNoOwnerAtStartUp(t *testing.T) { suite.etcdKVClient.On("Get", mock.Anything, etcd.CaptureOwnerKey(etcd.DefaultCDCClusterID), mock.Anything). Return(&clientv3.GetResponse{}, nil) + suite.etcdClusterClient.On("MemberList", mock.Anything). + Return(&clientv3.MemberListResponse{ + Header: &etcdserverpb.ResponseHeader{ClusterId: 1}, + }, nil) // Test Point 1: Create an agent. agent, err := suite.CreateAgent(t) @@ -404,6 +436,10 @@ func TestAgentTolerateClientClosed(t *testing.T) { }, }, }, nil) + suite.etcdClusterClient.On("MemberList", mock.Anything). + Return(&clientv3.MemberListResponse{ + Header: &etcdserverpb.ResponseHeader{ClusterId: 1}, + }, nil) // Test Point 1: Create an agent. agent, err := suite.CreateAgent(t) @@ -449,6 +485,10 @@ func TestNoFinishOperationBeforeSyncIsReceived(t *testing.T) { }, }, }, nil) + suite.etcdClusterClient.On("MemberList", mock.Anything). + Return(&clientv3.MemberListResponse{ + Header: &etcdserverpb.ResponseHeader{ClusterId: 1}, + }, nil) agent, err := suite.CreateAgent(t) require.NoError(t, err) diff --git a/cdc/server.go b/cdc/server.go index a6aedaa34ea..51817f4a85e 100644 --- a/cdc/server.go +++ b/cdc/server.go @@ -149,9 +149,12 @@ func (s *Server) Run(ctx context.Context) error { return errors.Annotate(cerror.WrapError(cerror.ErrNewCaptureFailed, err), "new etcd client") } - cdcEtcdClient := etcd.NewCDCEtcdClient(ctx, etcdCli, conf.ClusterID) + cdcEtcdClient, err := etcd.NewCDCEtcdClient(ctx, etcdCli, conf.ClusterID) + if err != nil { + return errors.Annotate(cerror.WrapError(cerror.ErrNewCaptureFailed, err), + "wrapper etcd client") + } s.etcdClient = &cdcEtcdClient - err = s.initDir(ctx) if err != nil { return errors.Trace(err) diff --git a/cdc/server_test.go b/cdc/server_test.go index 67c0e20bddf..66002befafd 100644 --- a/cdc/server_test.go +++ b/cdc/server_test.go @@ -77,7 +77,8 @@ func newServer(t *testing.T) *testServer { DialTimeout: 5 * time.Second, }) require.Nil(t, err) - etcdClient := etcd.NewCDCEtcdClient(s.ctx, client, etcd.DefaultCDCClusterID) + etcdClient, err := etcd.NewCDCEtcdClient(s.ctx, client, etcd.DefaultCDCClusterID) + require.Nil(t, err) s.server.etcdClient = &etcdClient s.errg = util.HandleErrWithErrGroup(s.ctx, s.e.Err(), func(e error) { t.Log(e) }) diff --git a/pkg/cmd/cli/cli_changefeed_create.go b/pkg/cmd/cli/cli_changefeed_create.go index cefaa40b7d3..995270301cc 100644 --- a/pkg/cmd/cli/cli_changefeed_create.go +++ b/pkg/cmd/cli/cli_changefeed_create.go @@ -371,7 +371,9 @@ func (o *createChangefeedOptions) validateStartTs(ctx context.Context) error { // Ensure the start ts is validate in the next 1 hour. const ensureTTL = 60 * 60. return gc.EnsureChangefeedStartTsSafety( - ctx, o.pdClient, model.DefaultChangeFeedID(o.changefeedID), ensureTTL, o.startTs) + ctx, o.pdClient, + o.etcdClient.GetEnsureGCServiceID(), + model.DefaultChangeFeedID(o.changefeedID), ensureTTL, o.startTs) } // validateTargetTs checks if targetTs is a valid value. diff --git a/pkg/cmd/cli/cli_unsafe_delete_service_gc_safepoint.go b/pkg/cmd/cli/cli_unsafe_delete_service_gc_safepoint.go index 0ccd0cc2e26..b655d23a800 100644 --- a/pkg/cmd/cli/cli_unsafe_delete_service_gc_safepoint.go +++ b/pkg/cmd/cli/cli_unsafe_delete_service_gc_safepoint.go @@ -17,6 +17,7 @@ import ( "github.com/pingcap/errors" "github.com/pingcap/tiflow/pkg/cmd/context" "github.com/pingcap/tiflow/pkg/cmd/factory" + "github.com/pingcap/tiflow/pkg/etcd" "github.com/pingcap/tiflow/pkg/txnutil/gc" "github.com/spf13/cobra" pd "github.com/tikv/pd/client" @@ -25,7 +26,8 @@ import ( // unsafeDeleteServiceGcSafepointOptions defines flags // for the `cli unsafe delete-service-gc-safepoint` command. type unsafeDeleteServiceGcSafepointOptions struct { - pdClient pd.Client + pdClient pd.Client + etcdClient *etcd.CDCEtcdClient } // newUnsafeDeleteServiceGcSafepointOptions creates new unsafeDeleteServiceGcSafepointOptions @@ -43,14 +45,16 @@ func (o *unsafeDeleteServiceGcSafepointOptions) complete(f factory.Factory) erro o.pdClient = pdClient - return nil + o.etcdClient, err = f.EtcdClient() + return err } // run runs the `cli unsafe delete-service-gc-safepoint` command. func (o *unsafeDeleteServiceGcSafepointOptions) run(cmd *cobra.Command) error { ctx := context.GetDefaultContext() - err := gc.RemoveServiceGCSafepoint(ctx, o.pdClient, gc.CDCServiceSafePointID) + err := gc.RemoveServiceGCSafepoint(ctx, o.pdClient, + o.etcdClient.GetGCServiceID()) if err == nil { cmd.Println("CDC service GC safepoint truncated in PD!") } diff --git a/pkg/cmd/cli/cli_unsafe_reset.go b/pkg/cmd/cli/cli_unsafe_reset.go index 98a0d76beaa..fc536302791 100644 --- a/pkg/cmd/cli/cli_unsafe_reset.go +++ b/pkg/cmd/cli/cli_unsafe_reset.go @@ -72,7 +72,7 @@ func (o *unsafeResetOptions) run(cmd *cobra.Command) error { return errors.Trace(err) } - err = gc.RemoveServiceGCSafepoint(ctx, o.pdClient, gc.CDCServiceSafePointID) + err = gc.RemoveServiceGCSafepoint(ctx, o.pdClient, o.etcdClient.GetGCServiceID()) if err != nil { return errors.Trace(err) } diff --git a/pkg/cmd/factory/factory_impl.go b/pkg/cmd/factory/factory_impl.go index 06c68f184c9..5554989bccd 100644 --- a/pkg/cmd/factory/factory_impl.go +++ b/pkg/cmd/factory/factory_impl.go @@ -130,8 +130,8 @@ func (f *factoryImpl) EtcdClient() (*etcd.CDCEtcdClient, error) { "fail to open PD client, please check pd address \"%s\"", pdAddr) } - client := etcd.NewCDCEtcdClient(ctx, etcdClient, f.clientGetter.GetClusterID()) - return &client, nil + client, err := etcd.NewCDCEtcdClient(ctx, etcdClient, f.clientGetter.GetClusterID()) + return &client, err } // PdClient creates new pd client. diff --git a/pkg/context/context.go b/pkg/context/context.go index 4567743e1e2..d71dc4b8cb4 100644 --- a/pkg/context/context.go +++ b/pkg/context/context.go @@ -191,6 +191,9 @@ func NewContext4Test(baseCtx context.Context, withChangefeedVars bool) Context { AdvertiseAddr: "127.0.0.1:0000", Version: version.ReleaseVersion, }, + EtcdClient: &etcd.CDCEtcdClient{ + ClusterID: etcd.DefaultCDCClusterID, + }, }) if withChangefeedVars { ctx = WithChangefeedVars(ctx, &ChangefeedVars{ diff --git a/pkg/etcd/etcd.go b/pkg/etcd/etcd.go index 1b4ec82d31c..b6354a79a15 100644 --- a/pkg/etcd/etcd.go +++ b/pkg/etcd/etcd.go @@ -91,15 +91,16 @@ func GetEtcdKeyJob(clusterID string, changeFeedID model.ChangeFeedID) string { // CDCEtcdClient is a wrap of etcd client type CDCEtcdClient struct { - Client *Client - ClusterID string + Client *Client + ClusterID string + etcdClusterID uint64 } // NewCDCEtcdClient returns a new CDCEtcdClient func NewCDCEtcdClient(ctx context.Context, cli *clientv3.Client, clusterID string, -) CDCEtcdClient { +) (CDCEtcdClient, error) { metrics := map[string]prometheus.Counter{ EtcdPut: etcdRequestCounter.WithLabelValues(EtcdPut), EtcdGet: etcdRequestCounter.WithLabelValues(EtcdGet), @@ -108,10 +109,15 @@ func NewCDCEtcdClient(ctx context.Context, EtcdGrant: etcdRequestCounter.WithLabelValues(EtcdGrant), EtcdRevoke: etcdRequestCounter.WithLabelValues(EtcdRevoke), } - return CDCEtcdClient{ - Client: Wrap(cli, metrics), - ClusterID: clusterID, + resp, err := cli.MemberList(ctx) + if err != nil { + return CDCEtcdClient{}, err } + return CDCEtcdClient{ + etcdClusterID: resp.Header.ClusterId, + Client: Wrap(cli, metrics), + ClusterID: clusterID, + }, nil } // Close releases resources in CDCEtcdClient @@ -505,6 +511,21 @@ func (c CDCEtcdClient) GetOwnerRevision(ctx context.Context, captureID string) ( return resp.Kvs[0].ModRevision, nil } +// GetGCServiceID returns the cdc gc service ID +func (c CDCEtcdClient) GetGCServiceID() string { + return fmt.Sprintf("ticdc-%s-%d", c.ClusterID, c.etcdClusterID) +} + +// GetEnsureGCServiceID return the prefix for the gc service id when changefeed is creating +func (c CDCEtcdClient) GetEnsureGCServiceID() string { + return c.GetGCServiceID() + "-creating-" +} + +// GcServiceIDForTest returns the gc service ID for tests +func GcServiceIDForTest() string { + return fmt.Sprintf("ticdc-%s-%d", "default", 0) +} + // getFreeListenURLs get free ports and localhost as url. func getFreeListenURLs(n int) (urls []*url.URL, retErr error) { for i := 0; i < n; i++ { diff --git a/pkg/etcd/etcd_test.go b/pkg/etcd/etcd_test.go index 2d239e2f6c0..7d257ecdac1 100644 --- a/pkg/etcd/etcd_test.go +++ b/pkg/etcd/etcd_test.go @@ -70,7 +70,8 @@ func (s *etcdTester) setUpTest(t *testing.T) { LogConfig: &logConfig, }) require.NoError(t, err) - s.client = NewCDCEtcdClient(context.TODO(), client, DefaultCDCClusterID) + s.client, err = NewCDCEtcdClient(context.TODO(), client, DefaultCDCClusterID) + require.Nil(t, err) s.ctx, s.cancel = context.WithCancel(context.Background()) s.errg = util.HandleErrWithErrGroup(s.ctx, s.etcd.Err(), func(e error) { t.Log(e) }) } diff --git a/pkg/txnutil/gc/gc_manager.go b/pkg/txnutil/gc/gc_manager.go index 037d63ba371..3e5d10af9c6 100644 --- a/pkg/txnutil/gc/gc_manager.go +++ b/pkg/txnutil/gc/gc_manager.go @@ -29,11 +29,6 @@ import ( "go.uber.org/zap" ) -const ( - // CDCServiceSafePointID is the ID of CDC service in pd.UpdateServiceGCSafePoint. - CDCServiceSafePointID = "ticdc" -) - // gcSafepointUpdateInterval is the minimum interval that CDC can update gc safepoint var gcSafepointUpdateInterval = 1 * time.Minute @@ -47,9 +42,10 @@ type Manager interface { } type gcManager struct { - pdClient pd.Client - pdClock pdutil.Clock - gcTTL int64 + gcServiceID string + pdClient pd.Client + pdClock pdutil.Clock + gcTTL int64 lastUpdatedTime time.Time lastSucceededTime time.Time @@ -58,12 +54,13 @@ type gcManager struct { } // NewManager creates a new Manager. -func NewManager(pdClient pd.Client, pdClock pdutil.Clock) Manager { +func NewManager(gcServiceID string, pdClient pd.Client, pdClock pdutil.Clock) Manager { serverConfig := config.GetGlobalServerConfig() failpoint.Inject("InjectGcSafepointUpdateInterval", func(val failpoint.Value) { gcSafepointUpdateInterval = time.Duration(val.(int) * int(time.Millisecond)) }) return &gcManager{ + gcServiceID: gcServiceID, pdClient: pdClient, pdClock: pdClock, lastSucceededTime: time.Now(), @@ -80,7 +77,7 @@ func (m *gcManager) TryUpdateGCSafePoint( m.lastUpdatedTime = time.Now() actual, err := setServiceGCSafepoint( - ctx, m.pdClient, CDCServiceSafePointID, m.gcTTL, checkpointTs) + ctx, m.pdClient, m.gcServiceID, m.gcTTL, checkpointTs) if err != nil { log.Warn("updateGCSafePoint failed", zap.Uint64("safePointTs", checkpointTs), diff --git a/pkg/txnutil/gc/gc_manager_test.go b/pkg/txnutil/gc/gc_manager_test.go index 2102115ac2a..0f1338bdaa3 100644 --- a/pkg/txnutil/gc/gc_manager_test.go +++ b/pkg/txnutil/gc/gc_manager_test.go @@ -23,6 +23,7 @@ import ( "github.com/pingcap/tiflow/cdc/model" cdcContext "github.com/pingcap/tiflow/pkg/context" cerror "github.com/pingcap/tiflow/pkg/errors" + "github.com/pingcap/tiflow/pkg/etcd" "github.com/pingcap/tiflow/pkg/pdutil" "github.com/pingcap/tiflow/pkg/util/testleak" "github.com/tikv/client-go/v2/oracle" @@ -40,14 +41,15 @@ func (s *gcManagerSuite) TestUpdateGCSafePoint(c *check.C) { defer testleak.AfterTest(c)() mockPDClient := &MockPDClient{} pdClock := pdutil.NewClock4Test() - gcManager := NewManager(mockPDClient, pdClock).(*gcManager) + gcManager := NewManager(etcd.GcServiceIDForTest(), + mockPDClient, pdClock).(*gcManager) ctx := cdcContext.NewBackendContext4Test(true) startTs := oracle.GoTimeToTS(time.Now()) mockPDClient.UpdateServiceGCSafePointFunc = func(ctx context.Context, serviceID string, ttl int64, safePoint uint64) (uint64, error) { c.Assert(safePoint, check.Equals, startTs) c.Assert(ttl, check.Equals, gcManager.gcTTL) - c.Assert(serviceID, check.Equals, CDCServiceSafePointID) + c.Assert(serviceID, check.Equals, etcd.GcServiceIDForTest()) return 0, nil } err := gcManager.TryUpdateGCSafePoint(ctx, startTs, false /* forceUpdate */) @@ -65,7 +67,7 @@ func (s *gcManagerSuite) TestUpdateGCSafePoint(c *check.C) { mockPDClient.UpdateServiceGCSafePointFunc = func(ctx context.Context, serviceID string, ttl int64, safePoint uint64) (uint64, error) { c.Assert(safePoint, check.Equals, startTs) c.Assert(ttl, check.Equals, gcManager.gcTTL) - c.Assert(serviceID, check.Equals, CDCServiceSafePointID) + c.Assert(serviceID, check.Equals, etcd.GcServiceIDForTest()) return 0, nil } err = gcManager.TryUpdateGCSafePoint(ctx, startTs, false /* forceUpdate */) @@ -77,7 +79,7 @@ func (s *gcManagerSuite) TestUpdateGCSafePoint(c *check.C) { mockPDClient.UpdateServiceGCSafePointFunc = func(ctx context.Context, serviceID string, ttl int64, safePoint uint64) (uint64, error) { c.Assert(safePoint, check.Equals, startTs) c.Assert(ttl, check.Equals, gcManager.gcTTL) - c.Assert(serviceID, check.Equals, CDCServiceSafePointID) + c.Assert(serviceID, check.Equals, etcd.GcServiceIDForTest()) ch <- struct{}{} return 0, nil } @@ -94,7 +96,8 @@ func (s *gcManagerSuite) TestCheckStaleCheckpointTs(c *check.C) { defer testleak.AfterTest(c)() mockPDClient := &MockPDClient{} pdClock := pdutil.NewClock4Test() - gcManager := NewManager(mockPDClient, pdClock).(*gcManager) + gcManager := NewManager(etcd.GcServiceIDForTest(), + mockPDClient, pdClock).(*gcManager) gcManager.isTiCDCBlockGC = true ctx := context.Background() diff --git a/pkg/txnutil/gc/gc_service.go b/pkg/txnutil/gc/gc_service.go index 5e6f3abc0dc..7a14e34c46a 100644 --- a/pkg/txnutil/gc/gc_service.go +++ b/pkg/txnutil/gc/gc_service.go @@ -26,21 +26,17 @@ import ( "go.uber.org/zap" ) -const ( - // cdcChangefeedCreatingServiceGCSafePointID is service GC safe point ID - cdcChangefeedCreatingServiceGCSafePointID = "ticdc-creating-" -) - // EnsureChangefeedStartTsSafety checks if the startTs less than the minimum of // service GC safepoint and this function will update the service GC to startTs func EnsureChangefeedStartTsSafety( ctx context.Context, pdCli pd.Client, + gcServiceIDPrefix string, changefeedID model.ChangeFeedID, TTL int64, startTs uint64, ) error { minServiceGCTs, err := setServiceGCSafepoint( ctx, pdCli, - cdcChangefeedCreatingServiceGCSafePointID+changefeedID.Namespace+"_"+changefeedID.ID, + gcServiceIDPrefix+changefeedID.Namespace+"_"+changefeedID.ID, TTL, startTs) if err != nil { return errors.Trace(err) diff --git a/pkg/txnutil/gc/gc_service_test.go b/pkg/txnutil/gc/gc_service_test.go index dab97ddd5ec..2317b469ea0 100644 --- a/pkg/txnutil/gc/gc_service_test.go +++ b/pkg/txnutil/gc/gc_service_test.go @@ -40,11 +40,13 @@ func (s *gcServiceSuite) TestCheckSafetyOfStartTs(c *check.C) { // assume no pd leader switch s.pdCli.UpdateServiceGCSafePoint(ctx, "service1", 10, 60) //nolint:errcheck err := EnsureChangefeedStartTsSafety(ctx, s.pdCli, + "ticdc-creating-", model.DefaultChangeFeedID("changefeed1"), TTL, 50) c.Assert(err.Error(), check.Equals, "[CDC:ErrStartTsBeforeGC]fail to create changefeed because start-ts 50 is earlier than GC safepoint at 60") s.pdCli.UpdateServiceGCSafePoint(ctx, "service2", 10, 80) //nolint:errcheck s.pdCli.UpdateServiceGCSafePoint(ctx, "service3", 10, 70) //nolint:errcheck err = EnsureChangefeedStartTsSafety(ctx, s.pdCli, + "ticdc-creating-", model.DefaultChangeFeedID("changefeed2"), TTL, 65) c.Assert(err, check.IsNil) c.Assert(s.pdCli.serviceSafePoint, check.DeepEquals, map[string]uint64{ @@ -59,12 +61,14 @@ func (s *gcServiceSuite) TestCheckSafetyOfStartTs(c *check.C) { s.pdCli.retryThreshold = 1 s.pdCli.retryCount = 0 err = EnsureChangefeedStartTsSafety(ctx, s.pdCli, + "ticdc-creating-", model.DefaultChangeFeedID("changefeed2"), TTL, 65) c.Assert(err, check.IsNil) s.pdCli.retryThreshold = gcServiceMaxRetries + 1 s.pdCli.retryCount = 0 err = EnsureChangefeedStartTsSafety(ctx, s.pdCli, + "ticdc-creating-", model.DefaultChangeFeedID("changefeed2"), TTL, 65) c.Assert(err, check.NotNil) c.Assert(err.Error(), check.Equals, @@ -73,6 +77,7 @@ func (s *gcServiceSuite) TestCheckSafetyOfStartTs(c *check.C) { s.pdCli.retryThreshold = 3 s.pdCli.retryCount = 0 err = EnsureChangefeedStartTsSafety(ctx, s.pdCli, + "ticdc-creating-", model.DefaultChangeFeedID("changefeed1"), TTL, 50) c.Assert(err.Error(), check.Equals, "[CDC:ErrStartTsBeforeGC]fail to create changefeed "+ diff --git a/pkg/upstream/manager.go b/pkg/upstream/manager.go index 08da5173c66..3e67aa9d433 100644 --- a/pkg/upstream/manager.go +++ b/pkg/upstream/manager.go @@ -20,6 +20,7 @@ import ( "github.com/pingcap/errors" "github.com/pingcap/log" "github.com/pingcap/tiflow/pkg/config" + "github.com/pingcap/tiflow/pkg/etcd" pd "github.com/tikv/pd/client" "go.uber.org/zap" ) @@ -29,6 +30,8 @@ const DefaultUpstreamID uint64 = 0 // Manager manages all upstream. type Manager struct { + // gcServiceID identify the cdc cluster gc service id + gcServiceID string // upstreamID map to *Upstream. ups *sync.Map // all upstream should be spawn from this ctx. @@ -41,15 +44,23 @@ type Manager struct { // NewManager creates a new Manager. // ctx will be used to initialize upstream spawned by this Manager. -func NewManager(ctx context.Context) *Manager { +func NewManager(ctx context.Context, gcServiceID string) *Manager { ctx, cancel := context.WithCancel(ctx) - return &Manager{ups: new(sync.Map), ctx: ctx, cancel: cancel} + return &Manager{ + ups: new(sync.Map), + ctx: ctx, + cancel: cancel, + gcServiceID: gcServiceID, + } } // NewManager4Test returns a Manager for unit test. func NewManager4Test(pdClient pd.Client) *Manager { up := NewUpstream4Test(pdClient) - res := &Manager{ups: new(sync.Map), ctx: context.Background()} + res := &Manager{ + ups: new(sync.Map), ctx: context.Background(), + gcServiceID: etcd.GcServiceIDForTest(), + } res.ups.Store(DefaultUpstreamID, up) return res } @@ -67,7 +78,7 @@ func (m *Manager) Add(upstreamID uint64, pdEndpoints []string, securityConfig *c return nil } up := newUpstream(upstreamID, pdEndpoints, securityConfig) - err := up.init(m.ctx) + err := up.init(m.ctx, m.gcServiceID) if err != nil { return errors.Trace(err) } diff --git a/pkg/upstream/upstream.go b/pkg/upstream/upstream.go index abb838f92a7..3b63d151a87 100644 --- a/pkg/upstream/upstream.go +++ b/pkg/upstream/upstream.go @@ -26,6 +26,7 @@ import ( tidbkv "github.com/pingcap/tidb/kv" "github.com/pingcap/tiflow/cdc/kv" "github.com/pingcap/tiflow/pkg/config" + "github.com/pingcap/tiflow/pkg/etcd" "github.com/pingcap/tiflow/pkg/pdutil" "github.com/pingcap/tiflow/pkg/txnutil/gc" "github.com/pingcap/tiflow/pkg/version" @@ -86,7 +87,9 @@ func newUpstream(upstreamID uint64, pdEndpoints []string, securityConfig *config // NewUpstream4Test new a upstream for unit test. func NewUpstream4Test(pdClient pd.Client) *Upstream { pdClock := pdutil.NewClock4Test() - gcManager := gc.NewManager(pdClient, pdClock) + gcManager := gc.NewManager( + etcd.GcServiceIDForTest(), + pdClient, pdClock) res := &Upstream{ ID: DefaultUpstreamID, PDClient: pdClient, PDClock: pdClock, GCManager: gcManager, @@ -101,7 +104,7 @@ func NewUpstream4Test(pdClient pd.Client) *Upstream { return res } -func (up *Upstream) init(ctx context.Context) error { +func (up *Upstream) init(ctx context.Context, gcServiceID string) error { log.Info("upstream is initializing", zap.Uint64("upstreamID", up.ID)) var err error @@ -156,7 +159,7 @@ func (up *Upstream) init(ctx context.Context) error { } log.Info("upstream's PDClock created", zap.Uint64("upstreamID", up.ID)) - up.GCManager = gc.NewManager(up.PDClient, up.PDClock) + up.GCManager = gc.NewManager(gcServiceID, up.PDClient, up.PDClock) log.Info("upstream's GCManager created", zap.Uint64("upstreamID", up.ID)) // Update meta-region label to ensure that meta region isolated from data regions. diff --git a/tests/integration_tests/gc_safepoint/run.sh b/tests/integration_tests/gc_safepoint/run.sh index 1a1a25868d2..45f6097ac7c 100755 --- a/tests/integration_tests/gc_safepoint/run.sh +++ b/tests/integration_tests/gc_safepoint/run.sh @@ -12,7 +12,7 @@ MAX_RETRIES=10 function get_safepoint() { pd_addr=$1 pd_cluster_id=$2 - safe_point=$(ETCDCTL_API=3 etcdctl --endpoints=$pd_addr get /pd/$pd_cluster_id/gc/safe_point/service/ticdc | grep -oE "safe_point\":[0-9]+" | grep -oE "[0-9]+") + safe_point=$(ETCDCTL_API=3 etcdctl --endpoints=$pd_addr get /pd/$pd_cluster_id/gc/safe_point/service/ticdc-default --prefix | grep -oE "safe_point\":[0-9]+" | grep -oE "[0-9]+") echo $safe_point } diff --git a/tests/integration_tests/move_table/main.go b/tests/integration_tests/move_table/main.go index b78c1d881b5..b8cedbf19f0 100644 --- a/tests/integration_tests/move_table/main.go +++ b/tests/integration_tests/move_table/main.go @@ -154,10 +154,14 @@ func newCluster(ctx context.Context, pd string) (*cluster, error) { return nil, errors.Trace(err) } + cdcEtcdCli, err := etcd.NewCDCEtcdClient(ctx, etcdCli, etcd.DefaultCDCClusterID) + if err != nil { + return nil, errors.Trace(err) + } ret := &cluster{ ownerAddr: "", captures: nil, - cdcEtcdCli: etcd.NewCDCEtcdClient(ctx, etcdCli, etcd.DefaultCDCClusterID), + cdcEtcdCli: cdcEtcdCli, } log.Info("new cluster initialized")