From 3425318208265fb41956164c3ef34fe7dd4e1d0f Mon Sep 17 00:00:00 2001 From: Jianyuan Jiang Date: Thu, 19 May 2022 15:02:14 +0800 Subject: [PATCH] DNM *(ticdc): cli support cdc cluster (#5412) * cli support cdc cluster-id cmd flag --- cdc/api/status.go | 3 +- cdc/capture/capture.go | 15 +-- cdc/capture/capture_test.go | 2 +- cdc/owner/changefeed_test.go | 3 +- cdc/owner/feed_state_manager_test.go | 18 ++-- cdc/owner/owner_test.go | 15 ++- cdc/processor/manager_test.go | 11 ++- cdc/processor/processor_test.go | 4 +- .../internal/base/processor_agent.go | 2 +- .../internal/base/processor_agent_test.go | 17 ++-- cdc/server.go | 2 +- cdc/server_test.go | 2 +- pkg/cmd/cli/cli_capture_list.go | 3 +- pkg/cmd/factory/factory.go | 18 +++- pkg/cmd/factory/factory_impl.go | 8 +- pkg/cmd/server/server.go | 3 + pkg/etcd/etcd.go | 95 +++++++++++-------- pkg/etcd/etcd_test.go | 10 +- pkg/etcd/etcdkey.go | 26 +++-- pkg/etcd/etcdkey_test.go | 20 ++-- pkg/orchestrator/reactor_state.go | 20 +++- pkg/orchestrator/reactor_state_test.go | 61 +++++++----- tests/integration_tests/move_table/main.go | 5 +- tests/utils/cdc_state_checker/cdc_monitor.go | 3 +- tests/utils/cdc_state_checker/state.go | 11 ++- 25 files changed, 233 insertions(+), 144 deletions(-) diff --git a/cdc/api/status.go b/cdc/api/status.go index 1678333f8e9..1471465696f 100644 --- a/cdc/api/status.go +++ b/cdc/api/status.go @@ -48,7 +48,8 @@ func RegisterStatusAPIRoutes(router *gin.Engine, capture *capture.Capture) { } func (h *statusAPI) writeEtcdInfo(ctx context.Context, cli *etcd.CDCEtcdClient, w io.Writer) { - resp, err := cli.Client.Get(ctx, etcd.BaseKey(), clientv3.WithPrefix()) + resp, err := cli.Client.Get(ctx, + etcd.BaseKey(cli.ClusterID), clientv3.WithPrefix()) if err != nil { fmt.Fprintf(w, "failed to get info: %s\n\n", err.Error()) return diff --git a/cdc/capture/capture.go b/cdc/capture/capture.go index 03f14c3e217..a2a304b10d4 100644 --- a/cdc/capture/capture.go +++ b/cdc/capture/capture.go @@ -141,7 +141,7 @@ func (c *Capture) reset(ctx context.Context) error { _ = c.session.Close() } c.session = sess - c.election = concurrency.NewElection(sess, etcd.CaptureOwnerKey()) + c.election = concurrency.NewElection(sess, etcd.CaptureOwnerKey(c.EtcdClient.ClusterID)) if c.tableActorSystem != nil { c.tableActorSystem.Stop() @@ -279,7 +279,7 @@ func (c *Capture) run(stdCtx context.Context) error { conf := config.GetGlobalServerConfig() processorFlushInterval := time.Duration(conf.ProcessorFlushInterval) - globalState := orchestrator.NewGlobalState() + globalState := orchestrator.NewGlobalState(c.EtcdClient.ClusterID) globalState.SetOnCaptureAdded(func(captureID model.CaptureID, addr string) { c.MessageRouter.AddPeer(captureID, addr) @@ -380,7 +380,7 @@ func (c *Capture) campaignOwner(ctx cdcContext.Context) error { owner := c.newOwner(c.UpstreamManager) c.setOwner(owner) - globalState := orchestrator.NewGlobalState() + globalState := orchestrator.NewGlobalState(c.EtcdClient.ClusterID) globalState.SetOnCaptureAdded(func(captureID model.CaptureID, addr string) { c.MessageRouter.AddPeer(captureID, addr) @@ -389,7 +389,9 @@ func (c *Capture) campaignOwner(ctx cdcContext.Context) error { c.MessageRouter.RemovePeer(captureID) }) - err = c.runEtcdWorker(ownerCtx, owner, orchestrator.NewGlobalState(), ownerFlushInterval, util.RoleOwner.String()) + err = c.runEtcdWorker(ownerCtx, owner, + orchestrator.NewGlobalState(c.EtcdClient.ClusterID), + ownerFlushInterval, util.RoleOwner.String()) c.setOwner(nil) log.Info("run owner exited", zap.Error(err)) // if owner exits, resign the owner key @@ -413,7 +415,7 @@ func (c *Capture) runEtcdWorker( role string, ) error { etcdWorker, err := orchestrator.NewEtcdWorker(ctx.GlobalVars().EtcdClient.Client, - etcd.BaseKey(), reactor, reactorState) + etcd.BaseKey(c.EtcdClient.ClusterID), reactor, reactorState) if err != nil { return errors.Trace(err) } @@ -578,7 +580,8 @@ func (c *Capture) GetOwnerCaptureInfo(ctx context.Context) (*model.CaptureInfo, return nil, err } - ownerID, err := c.EtcdClient.GetOwnerID(ctx, etcd.CaptureOwnerKey()) + ownerID, err := c.EtcdClient.GetOwnerID(ctx, + etcd.CaptureOwnerKey(c.EtcdClient.ClusterID)) if err != nil { return nil, err } diff --git a/cdc/capture/capture_test.go b/cdc/capture/capture_test.go index 4840bf80291..c89a416fd01 100644 --- a/cdc/capture/capture_test.go +++ b/cdc/capture/capture_test.go @@ -42,7 +42,7 @@ func TestReset(t *testing.T) { DialTimeout: 3 * time.Second, }) require.NoError(t, err) - client := etcd.NewCDCEtcdClient(ctx, etcdCli) + client := etcd.NewCDCEtcdClient(ctx, etcdCli, etcd.DefaultCDCClusterID) // Close the client before the test function exits to prevent possible // ctx leaks. // Ref: https://github.com/grpc/grpc-go/blob/master/stream.go#L229 diff --git a/cdc/owner/changefeed_test.go b/cdc/owner/changefeed_test.go index b701484572f..c251d642c15 100644 --- a/cdc/owner/changefeed_test.go +++ b/cdc/owner/changefeed_test.go @@ -192,7 +192,8 @@ func createChangefeed4Test(ctx cdcContext.Context, t *testing.T) ( return &mockScheduler{}, nil } cf.upStream = upStream - state := orchestrator.NewChangefeedReactorState(ctx.ChangefeedVars().ID) + state := orchestrator.NewChangefeedReactorState(etcd.DefaultCDCClusterID, + ctx.ChangefeedVars().ID) tester := orchestrator.NewReactorStateTester(t, state, nil) state.PatchInfo(func(info *model.ChangeFeedInfo) (*model.ChangeFeedInfo, bool, error) { require.Nil(t, info) diff --git a/cdc/owner/feed_state_manager_test.go b/cdc/owner/feed_state_manager_test.go index 5fda9e6c3ee..97eb2b20da6 100644 --- a/cdc/owner/feed_state_manager_test.go +++ b/cdc/owner/feed_state_manager_test.go @@ -29,7 +29,8 @@ import ( func TestHandleJob(t *testing.T) { ctx := cdcContext.NewBackendContext4Test(true) manager := newFeedStateManager4Test() - state := orchestrator.NewChangefeedReactorState(ctx.ChangefeedVars().ID) + state := orchestrator.NewChangefeedReactorState(etcd.DefaultCDCClusterID, + ctx.ChangefeedVars().ID) tester := orchestrator.NewReactorStateTester(t, state, nil) state.PatchInfo(func(info *model.ChangeFeedInfo) (*model.ChangeFeedInfo, bool, error) { require.Nil(t, info) @@ -105,7 +106,8 @@ func TestHandleJob(t *testing.T) { func TestMarkFinished(t *testing.T) { ctx := cdcContext.NewBackendContext4Test(true) manager := newFeedStateManager4Test() - state := orchestrator.NewChangefeedReactorState(ctx.ChangefeedVars().ID) + state := orchestrator.NewChangefeedReactorState(etcd.DefaultCDCClusterID, + ctx.ChangefeedVars().ID) tester := orchestrator.NewReactorStateTester(t, state, nil) state.PatchInfo(func(info *model.ChangeFeedInfo) (*model.ChangeFeedInfo, bool, error) { require.Nil(t, info) @@ -133,7 +135,8 @@ func TestMarkFinished(t *testing.T) { func TestCleanUpInfos(t *testing.T) { ctx := cdcContext.NewBackendContext4Test(true) manager := newFeedStateManager4Test() - state := orchestrator.NewChangefeedReactorState(ctx.ChangefeedVars().ID) + state := orchestrator.NewChangefeedReactorState(etcd.DefaultCDCClusterID, + ctx.ChangefeedVars().ID) tester := orchestrator.NewReactorStateTester(t, state, nil) state.PatchInfo(func(info *model.ChangeFeedInfo) (*model.ChangeFeedInfo, bool, error) { require.Nil(t, info) @@ -165,7 +168,8 @@ func TestCleanUpInfos(t *testing.T) { func TestHandleError(t *testing.T) { ctx := cdcContext.NewBackendContext4Test(true) manager := newFeedStateManager4Test() - state := orchestrator.NewChangefeedReactorState(ctx.ChangefeedVars().ID) + state := orchestrator.NewChangefeedReactorState(etcd.DefaultCDCClusterID, + ctx.ChangefeedVars().ID) tester := orchestrator.NewReactorStateTester(t, state, nil) state.PatchInfo(func(info *model.ChangeFeedInfo) (*model.ChangeFeedInfo, bool, error) { require.Nil(t, info) @@ -228,7 +232,8 @@ func TestHandleError(t *testing.T) { func TestHandleFastFailError(t *testing.T) { ctx := cdcContext.NewBackendContext4Test(true) manager := new(feedStateManager) - state := orchestrator.NewChangefeedReactorState(ctx.ChangefeedVars().ID) + state := orchestrator.NewChangefeedReactorState(etcd.DefaultCDCClusterID, + ctx.ChangefeedVars().ID) tester := orchestrator.NewReactorStateTester(t, state, nil) state.PatchInfo(func(info *model.ChangeFeedInfo) (*model.ChangeFeedInfo, bool, error) { require.Nil(t, info) @@ -308,7 +313,8 @@ func TestChangefeedStatusNotExist(t *testing.T) { ` ctx := cdcContext.NewBackendContext4Test(true) manager := newFeedStateManager4Test() - state := orchestrator.NewChangefeedReactorState(ctx.ChangefeedVars().ID) + state := orchestrator.NewChangefeedReactorState(etcd.DefaultCDCClusterID, + ctx.ChangefeedVars().ID) tester := orchestrator.NewReactorStateTester(t, state, map[string]string{ fmt.Sprintf("%s/capture/d563bfc0-f406-4f34-bc7d-6dc2e35a44e5", etcd.DefaultClusterAndMetaPrefix): ` diff --git a/cdc/owner/owner_test.go b/cdc/owner/owner_test.go index c524a044ed8..3b6b9541e1a 100644 --- a/cdc/owner/owner_test.go +++ b/cdc/owner/owner_test.go @@ -62,11 +62,12 @@ func createOwner4Test(ctx cdcContext.Context, t *testing.T) (*ownerImpl, *orches o := owner.(*ownerImpl) o.upstreamManager = upstream.NewManager4Test(pdClient) - state := orchestrator.NewGlobalState() + state := orchestrator.NewGlobalState(etcd.DefaultCDCClusterID) tester := orchestrator.NewReactorStateTester(t, state, nil) // set captures cdcKey := etcd.CDCKey{ + ClusterID: state.ClusterID, Tp: etcd.CDCKeyTypeCapture, CaptureID: ctx.GlobalVars().CaptureInfo.ID, } @@ -91,6 +92,7 @@ func TestCreateRemoveChangefeed(t *testing.T) { changefeedStr, err := changefeedInfo.Marshal() require.Nil(t, err) cdcKey := etcd.CDCKey{ + ClusterID: state.ClusterID, Tp: etcd.CDCKeyTypeChangefeedInfo, ChangefeedID: changefeedID, } @@ -160,6 +162,7 @@ func TestStopChangefeed(t *testing.T) { changefeedStr, err := changefeedInfo.Marshal() require.Nil(t, err) cdcKey := etcd.CDCKey{ + ClusterID: state.ClusterID, Tp: etcd.CDCKeyTypeChangefeedInfo, ChangefeedID: changefeedID, } @@ -209,6 +212,7 @@ func TestFixChangefeedState(t *testing.T) { changefeedStr, err := changefeedInfo.Marshal() require.Nil(t, err) cdcKey := etcd.CDCKey{ + ClusterID: state.ClusterID, Tp: etcd.CDCKeyTypeChangefeedInfo, ChangefeedID: changefeedID, } @@ -248,6 +252,7 @@ func TestFixChangefeedSinkProtocol(t *testing.T) { changefeedStr, err := changefeedInfo.Marshal() require.Nil(t, err) cdcKey := etcd.CDCKey{ + ClusterID: state.ClusterID, Tp: etcd.CDCKeyTypeChangefeedInfo, ChangefeedID: changefeedID, } @@ -288,6 +293,7 @@ func TestCheckClusterVersion(t *testing.T) { changefeedStr, err := changefeedInfo.Marshal() require.Nil(t, err) cdcKey := etcd.CDCKey{ + ClusterID: state.ClusterID, Tp: etcd.CDCKeyTypeChangefeedInfo, ChangefeedID: changefeedID, } @@ -369,7 +375,7 @@ func TestUpdateGCSafePoint(t *testing.T) { ctx := cdcContext.NewBackendContext4Test(true) ctx, cancel := cdcContext.WithCancel(ctx) defer cancel() - state := orchestrator.NewGlobalState() + state := orchestrator.NewGlobalState(etcd.DefaultCDCClusterID) tester := orchestrator.NewReactorStateTester(t, state, nil) // no changefeed, the gc safe point should be max uint64 @@ -488,6 +494,7 @@ func TestHandleJobsDontBlock(t *testing.T) { changefeedStr, err := cfInfo1.Marshal() require.Nil(t, err) cdcKey := etcd.CDCKey{ + ClusterID: state.ClusterID, Tp: etcd.CDCKeyTypeChangefeedInfo, ChangefeedID: cf1, } @@ -505,6 +512,7 @@ func TestHandleJobsDontBlock(t *testing.T) { Version: " v0.0.1-test-only", } cdcKey = etcd.CDCKey{ + ClusterID: state.ClusterID, Tp: etcd.CDCKeyTypeCapture, CaptureID: captureInfo.ID, } @@ -522,6 +530,7 @@ func TestHandleJobsDontBlock(t *testing.T) { changefeedStr1, err := cfInfo2.Marshal() require.Nil(t, err) cdcKey = etcd.CDCKey{ + ClusterID: state.ClusterID, Tp: etcd.CDCKeyTypeChangefeedInfo, ChangefeedID: cf2, } @@ -565,7 +574,7 @@ WorkLoop: } func TestCalculateGCSafepointTs(t *testing.T) { - state := orchestrator.NewGlobalState() + state := orchestrator.NewGlobalState(etcd.DefaultCDCClusterID) expectMinTsMap := make(map[uint64]uint64) expectForceUpdateMap := make(map[uint64]interface{}) o := ownerImpl{changefeeds: make(map[model.ChangeFeedID]*changefeed)} diff --git a/cdc/processor/manager_test.go b/cdc/processor/manager_test.go index 9c3e4f91161..dcad4c2c844 100644 --- a/cdc/processor/manager_test.go +++ b/cdc/processor/manager_test.go @@ -61,7 +61,7 @@ func (s *managerTester) resetSuit(ctx cdcContext.Context, t *testing.T) { checkpointTs: replicaInfo.StartTs, }, nil }) - s.state = orchestrator.NewGlobalState() + s.state = orchestrator.NewGlobalState(etcd.DefaultCDCClusterID) captureInfoBytes, err := ctx.GlobalVars().CaptureInfo.Marshal() require.Nil(t, err) s.tester = orchestrator.NewReactorStateTester(t, s.state, map[string]string{ @@ -83,7 +83,8 @@ func TestChangefeed(t *testing.T) { changefeedID := model.DefaultChangeFeedID("test-changefeed") // an inactive changefeed - s.state.Changefeeds[changefeedID] = orchestrator.NewChangefeedReactorState(changefeedID) + s.state.Changefeeds[changefeedID] = orchestrator.NewChangefeedReactorState( + etcd.DefaultCDCClusterID, changefeedID) _, err = s.manager.Tick(ctx, s.state) s.tester.MustApplyPatches() require.Nil(t, err) @@ -135,7 +136,8 @@ func TestDebugInfo(t *testing.T) { changefeedID := model.DefaultChangeFeedID("test-changefeed") // an active changefeed - s.state.Changefeeds[changefeedID] = orchestrator.NewChangefeedReactorState(changefeedID) + s.state.Changefeeds[changefeedID] = orchestrator.NewChangefeedReactorState( + etcd.DefaultCDCClusterID, changefeedID) s.state.Changefeeds[changefeedID].PatchInfo( func(info *model.ChangeFeedInfo) (*model.ChangeFeedInfo, bool, error) { return &model.ChangeFeedInfo{ @@ -189,7 +191,8 @@ func TestClose(t *testing.T) { changefeedID := model.DefaultChangeFeedID("test-changefeed") // an active changefeed - s.state.Changefeeds[changefeedID] = orchestrator.NewChangefeedReactorState(changefeedID) + s.state.Changefeeds[changefeedID] = orchestrator.NewChangefeedReactorState( + etcd.DefaultCDCClusterID, changefeedID) s.state.Changefeeds[changefeedID].PatchInfo( func(info *model.ChangeFeedInfo) (*model.ChangeFeedInfo, bool, error) { return &model.ChangeFeedInfo{ diff --git a/cdc/processor/processor_test.go b/cdc/processor/processor_test.go index 4606c3e07a7..6f9e7422333 100644 --- a/cdc/processor/processor_test.go +++ b/cdc/processor/processor_test.go @@ -110,7 +110,8 @@ func initProcessor4Test(ctx cdcContext.Context, t *testing.T) (*processor, *orch checkpointTs: replicaInfo.StartTs, }, nil }) - p.changefeed = orchestrator.NewChangefeedReactorState(ctx.ChangefeedVars().ID) + p.changefeed = orchestrator.NewChangefeedReactorState( + etcd.DefaultCDCClusterID, ctx.ChangefeedVars().ID) captureID := ctx.GlobalVars().CaptureInfo.ID changefeedID := ctx.ChangefeedVars().ID return p, orchestrator.NewReactorStateTester(t, p.changefeed, map[string]string{ @@ -578,6 +579,7 @@ func TestSchemaGC(t *testing.T) { func updateChangeFeedPosition(t *testing.T, tester *orchestrator.ReactorStateTester, cfID model.ChangeFeedID, resolvedTs, checkpointTs model.Ts) { key := etcd.CDCKey{ + ClusterID: etcd.DefaultCDCClusterID, Tp: etcd.CDCKeyTypeChangeFeedStatus, ChangefeedID: cfID, } diff --git a/cdc/scheduler/internal/base/processor_agent.go b/cdc/scheduler/internal/base/processor_agent.go index f51b733e6e1..39909a7dc55 100644 --- a/cdc/scheduler/internal/base/processor_agent.go +++ b/cdc/scheduler/internal/base/processor_agent.go @@ -122,7 +122,7 @@ func NewAgent( etcdCliCtx, cancel := context.WithTimeout(ctx, getOwnerFromEtcdTimeout) defer cancel() ownerCaptureID, err := ret.etcdClient. - GetOwnerID(etcdCliCtx, etcd.CaptureOwnerKey()) + GetOwnerID(etcdCliCtx, etcd.CaptureOwnerKey(etcd.DefaultCDCClusterID)) if err != nil { if err != concurrency.ErrElectionNoLeader { return nil, errors.Trace(err) diff --git a/cdc/scheduler/internal/base/processor_agent_test.go b/cdc/scheduler/internal/base/processor_agent_test.go index 2e58dc8750e..106f5e95d15 100644 --- a/cdc/scheduler/internal/base/processor_agent_test.go +++ b/cdc/scheduler/internal/base/processor_agent_test.go @@ -153,7 +153,7 @@ func newAgentTestSuite(t *testing.T) *agentTestSuite { } func (s *agentTestSuite) CreateAgent(t *testing.T) (*agentImpl, error) { - cdcEtcdClient := etcd.NewCDCEtcdClient(s.ctx, s.etcdClient) + cdcEtcdClient := etcd.NewCDCEtcdClient(s.ctx, s.etcdClient, etcd.DefaultCDCClusterID) messageServer := s.cluster.Nodes["capture-1"].Server messageRouter := s.cluster.Nodes["capture-1"].Router s.tableExecutor = NewMockTableExecutor(t) @@ -219,11 +219,11 @@ func TestAgentBasics(t *testing.T) { defer suite.Close() suite.etcdKVClient.On("Get", mock.Anything, - etcd.CaptureOwnerKey(), mock.Anything). + etcd.CaptureOwnerKey(etcd.DefaultCDCClusterID), mock.Anything). Return(&clientv3.GetResponse{ Kvs: []*mvccpb.KeyValue{ { - Key: []byte(etcd.CaptureOwnerKey()), + Key: []byte(etcd.CaptureOwnerKey(etcd.DefaultCDCClusterID)), Value: []byte(ownerCaptureID), ModRevision: 1, }, @@ -337,7 +337,7 @@ func TestAgentNoOwnerAtStartUp(t *testing.T) { // Empty response implies no owner. suite.etcdKVClient.On("Get", mock.Anything, - etcd.CaptureOwnerKey(), mock.Anything). + etcd.CaptureOwnerKey(etcd.DefaultCDCClusterID), mock.Anything). Return(&clientv3.GetResponse{}, nil) // Test Point 1: Create an agent. @@ -393,11 +393,12 @@ func TestAgentTolerateClientClosed(t *testing.T) { defer suite.Close() suite.etcdKVClient.On("Get", mock.Anything, - etcd.CaptureOwnerKey(), mock.Anything). + etcd.CaptureOwnerKey(etcd.DefaultCDCClusterID), mock.Anything). Return(&clientv3.GetResponse{ Kvs: []*mvccpb.KeyValue{ { - Key: []byte(etcd.CaptureOwnerKey()), + Key: []byte(etcd.CaptureOwnerKey( + etcd.DefaultCDCClusterID)), Value: []byte(ownerCaptureID), ModRevision: 1, }, @@ -438,11 +439,11 @@ func TestNoFinishOperationBeforeSyncIsReceived(t *testing.T) { defer suite.Close() suite.etcdKVClient.On("Get", mock.Anything, - etcd.CaptureOwnerKey(), mock.Anything). + etcd.CaptureOwnerKey(etcd.DefaultCDCClusterID), mock.Anything). Return(&clientv3.GetResponse{ Kvs: []*mvccpb.KeyValue{ { - Key: []byte(etcd.CaptureOwnerKey()), + Key: []byte(etcd.CaptureOwnerKey(etcd.DefaultCDCClusterID)), Value: []byte(ownerCaptureID), ModRevision: 1, }, diff --git a/cdc/server.go b/cdc/server.go index fdf07da0dd4..a6aedaa34ea 100644 --- a/cdc/server.go +++ b/cdc/server.go @@ -149,7 +149,7 @@ func (s *Server) Run(ctx context.Context) error { return errors.Annotate(cerror.WrapError(cerror.ErrNewCaptureFailed, err), "new etcd client") } - cdcEtcdClient := etcd.NewCDCEtcdClient(ctx, etcdCli) + cdcEtcdClient := etcd.NewCDCEtcdClient(ctx, etcdCli, conf.ClusterID) s.etcdClient = &cdcEtcdClient err = s.initDir(ctx) diff --git a/cdc/server_test.go b/cdc/server_test.go index ff47f554460..67c0e20bddf 100644 --- a/cdc/server_test.go +++ b/cdc/server_test.go @@ -77,7 +77,7 @@ func newServer(t *testing.T) *testServer { DialTimeout: 5 * time.Second, }) require.Nil(t, err) - etcdClient := etcd.NewCDCEtcdClient(s.ctx, client) + etcdClient := etcd.NewCDCEtcdClient(s.ctx, client, etcd.DefaultCDCClusterID) s.server.etcdClient = &etcdClient s.errg = util.HandleErrWithErrGroup(s.ctx, s.e.Err(), func(e error) { t.Log(e) }) diff --git a/pkg/cmd/cli/cli_capture_list.go b/pkg/cmd/cli/cli_capture_list.go index 9b77594a4bf..5234630ee51 100644 --- a/pkg/cmd/cli/cli_capture_list.go +++ b/pkg/cmd/cli/cli_capture_list.go @@ -96,7 +96,8 @@ func listCaptures(ctx context.Context, etcdClient *etcd.CDCEtcdClient) ([]*captu return nil, err } - ownerID, err := etcdClient.GetOwnerID(ctx, etcd.CaptureOwnerKey()) + ownerID, err := etcdClient.GetOwnerID(ctx, + etcd.CaptureOwnerKey(etcdClient.ClusterID)) if err != nil && errors.Cause(err) != concurrency.ErrElectionNoLeader { return nil, err } diff --git a/pkg/cmd/factory/factory.go b/pkg/cmd/factory/factory.go index b1f07054c11..4a67a77a982 100644 --- a/pkg/cmd/factory/factory.go +++ b/pkg/cmd/factory/factory.go @@ -36,6 +36,7 @@ type Factory interface { // ClientGetter defines the client getter. type ClientGetter interface { + GetClusterID() string ToTLSConfig() (*tls.Config, error) ToGRPCDialOption() (grpc.DialOption, error) GetPdAddr() string @@ -45,11 +46,12 @@ type ClientGetter interface { // ClientFlags specifies the parameters needed to construct the client. type ClientFlags struct { - pdAddr string - logLevel string - caPath string - certPath string - keyPath string + clusterID string + pdAddr string + logLevel string + caPath string + certPath string + keyPath string } var _ ClientGetter = &ClientFlags{} @@ -85,6 +87,11 @@ func (c *ClientFlags) GetLogLevel() string { return c.logLevel } +// GetClusterID returns cdc cluster id. +func (c *ClientFlags) GetClusterID() string { + return c.clusterID +} + // NewClientFlags creates new client flags. func NewClientFlags() *ClientFlags { return &ClientFlags{} @@ -93,6 +100,7 @@ func NewClientFlags() *ClientFlags { // AddFlags receives a *cobra.Command reference and binds // flags related to template printing to it. func (c *ClientFlags) AddFlags(cmd *cobra.Command) { + cmd.PersistentFlags().StringVar(&c.clusterID, "cluster-id", "default", "Set the cdc cluster id") cmd.PersistentFlags().StringVar(&c.pdAddr, "pd", "http://127.0.0.1:2379", "PD address, use ',' to separate multiple PDs") cmd.PersistentFlags().StringVar(&c.caPath, "ca", "", "CA certificate path for TLS connection") cmd.PersistentFlags().StringVar(&c.certPath, "cert", "", "Certificate path for TLS connection") diff --git a/pkg/cmd/factory/factory_impl.go b/pkg/cmd/factory/factory_impl.go index 9761cd9b38a..06c68f184c9 100644 --- a/pkg/cmd/factory/factory_impl.go +++ b/pkg/cmd/factory/factory_impl.go @@ -75,10 +75,14 @@ func (f *factoryImpl) GetCredential() *security.Credential { return f.clientGetter.GetCredential() } +// GetClusterID returns cdc cluster id. +func (f *factoryImpl) GetClusterID() string { + return f.clientGetter.GetClusterID() +} + // EtcdClient creates new cdc etcd client. func (f *factoryImpl) EtcdClient() (*etcd.CDCEtcdClient, error) { ctx := cmdconetxt.GetDefaultContext() - tlsConfig, err := f.ToTLSConfig() if err != nil { return nil, err @@ -126,7 +130,7 @@ func (f *factoryImpl) EtcdClient() (*etcd.CDCEtcdClient, error) { "fail to open PD client, please check pd address \"%s\"", pdAddr) } - client := etcd.NewCDCEtcdClient(ctx, etcdClient) + client := etcd.NewCDCEtcdClient(ctx, etcdClient, f.clientGetter.GetClusterID()) return &client, nil } diff --git a/pkg/cmd/server/server.go b/pkg/cmd/server/server.go index 149a1318ef2..f7c12d99097 100644 --- a/pkg/cmd/server/server.go +++ b/pkg/cmd/server/server.go @@ -62,6 +62,7 @@ func newOptions() *options { // addFlags receives a *cobra.Command reference and binds // flags related to template printing to it. func (o *options) addFlags(cmd *cobra.Command) { + cmd.Flags().StringVar(&o.serverConfig.ClusterID, "cluster-id", "default", "Set cdc cluster id") cmd.Flags().StringVar(&o.serverConfig.Addr, "addr", o.serverConfig.Addr, "Set the listening address") cmd.Flags().StringVar(&o.serverConfig.AdvertiseAddr, "advertise-addr", o.serverConfig.AdvertiseAddr, "Set the advertise listening address for client communication") @@ -227,6 +228,8 @@ func (o *options) complete(cmd *cobra.Command) error { "sort-dir will be set to `{data-dir}/tmp/sorter`. The sort-dir here will be no-op\n")) } cfg.Sorter.SortDir = config.DefaultSortDir + case "cluster-id": + cfg.ClusterID = o.serverConfig.ClusterID case "pd", "config": // do nothing default: diff --git a/pkg/etcd/etcd.go b/pkg/etcd/etcd.go index 300a1d7a6b8..1b4ec82d31c 100644 --- a/pkg/etcd/etcd.go +++ b/pkg/etcd/etcd.go @@ -36,58 +36,70 @@ import ( cerror "github.com/pingcap/tiflow/pkg/errors" ) +// DefaultCDCClusterID is the default value of cdc cluster id +const DefaultCDCClusterID = "default" + // CaptureOwnerKey is the capture owner path that is saved to etcd -func CaptureOwnerKey() string { - return BaseKey() + metaPrefix + "/owner" +func CaptureOwnerKey(clusterID string) string { + return BaseKey(clusterID) + metaPrefix + "/owner" } // CaptureInfoKeyPrefix is the capture info path that is saved to etcd -func CaptureInfoKeyPrefix() string { - return BaseKey() + metaPrefix + captureKey +func CaptureInfoKeyPrefix(clusterID string) string { + return BaseKey(clusterID) + metaPrefix + captureKey } // TaskPositionKeyPrefix is the prefix of task position keys -func TaskPositionKeyPrefix(namespace string) string { - return NamespacedPrefix(namespace) + taskPositionKey +func TaskPositionKeyPrefix(clusterID, namespace string) string { + return NamespacedPrefix(clusterID, namespace) + taskPositionKey } // JobKeyPrefix is the prefix of job keys -func JobKeyPrefix(namespace string) string { - return NamespacedPrefix(namespace) + jobKey +func JobKeyPrefix(clusterID, namespace string) string { + return NamespacedPrefix(clusterID, namespace) + jobKey } // GetEtcdKeyChangeFeedList returns the prefix key of all changefeed config -func GetEtcdKeyChangeFeedList(namespace string) string { - return fmt.Sprintf("%s/changefeed/info", NamespacedPrefix(namespace)) +func GetEtcdKeyChangeFeedList(clusterID, namespace string) string { + return fmt.Sprintf("%s/changefeed/info", NamespacedPrefix(clusterID, namespace)) } // GetEtcdKeyChangeFeedInfo returns the key of a changefeed config -func GetEtcdKeyChangeFeedInfo(changefeedID model.ChangeFeedID) string { - return fmt.Sprintf("%s/%s", GetEtcdKeyChangeFeedList(changefeedID.Namespace), changefeedID.ID) +func GetEtcdKeyChangeFeedInfo(clusterID string, changefeedID model.ChangeFeedID) string { + return fmt.Sprintf("%s/%s", GetEtcdKeyChangeFeedList(clusterID, + changefeedID.Namespace), changefeedID.ID) } // GetEtcdKeyTaskPosition returns the key of a task position -func GetEtcdKeyTaskPosition(changefeedID model.ChangeFeedID, captureID string) string { - return TaskPositionKeyPrefix(changefeedID.Namespace) + "/" + captureID + "/" + changefeedID.ID +func GetEtcdKeyTaskPosition(clusterID string, + changefeedID model.ChangeFeedID, + captureID string, +) string { + return TaskPositionKeyPrefix(clusterID, changefeedID.Namespace) + + "/" + captureID + "/" + changefeedID.ID } // GetEtcdKeyCaptureInfo returns the key of a capture info -func GetEtcdKeyCaptureInfo(id string) string { - return CaptureInfoKeyPrefix() + "/" + id +func GetEtcdKeyCaptureInfo(clusterID, id string) string { + return CaptureInfoKeyPrefix(clusterID) + "/" + id } // GetEtcdKeyJob returns the key for a job status -func GetEtcdKeyJob(changeFeedID model.ChangeFeedID) string { - return JobKeyPrefix(changeFeedID.Namespace) + "/" + changeFeedID.ID +func GetEtcdKeyJob(clusterID string, changeFeedID model.ChangeFeedID) string { + return JobKeyPrefix(clusterID, changeFeedID.Namespace) + "/" + changeFeedID.ID } // CDCEtcdClient is a wrap of etcd client type CDCEtcdClient struct { - Client *Client + Client *Client + ClusterID string } // NewCDCEtcdClient returns a new CDCEtcdClient -func NewCDCEtcdClient(ctx context.Context, cli *clientv3.Client) CDCEtcdClient { +func NewCDCEtcdClient(ctx context.Context, + cli *clientv3.Client, + clusterID string, +) CDCEtcdClient { metrics := map[string]prometheus.Counter{ EtcdPut: etcdRequestCounter.WithLabelValues(EtcdPut), EtcdGet: etcdRequestCounter.WithLabelValues(EtcdGet), @@ -96,7 +108,10 @@ func NewCDCEtcdClient(ctx context.Context, cli *clientv3.Client) CDCEtcdClient { EtcdGrant: etcdRequestCounter.WithLabelValues(EtcdGrant), EtcdRevoke: etcdRequestCounter.WithLabelValues(EtcdRevoke), } - return CDCEtcdClient{Client: Wrap(cli, metrics)} + return CDCEtcdClient{ + Client: Wrap(cli, metrics), + ClusterID: clusterID, + } } // Close releases resources in CDCEtcdClient @@ -106,13 +121,13 @@ func (c CDCEtcdClient) Close() error { // ClearAllCDCInfo delete all keys created by CDC func (c CDCEtcdClient) ClearAllCDCInfo(ctx context.Context) error { - _, err := c.Client.Delete(ctx, BaseKey(), clientv3.WithPrefix()) + _, err := c.Client.Delete(ctx, BaseKey(c.ClusterID), clientv3.WithPrefix()) return cerror.WrapError(cerror.ErrPDEtcdAPIError, err) } // GetAllCDCInfo get all keys created by CDC func (c CDCEtcdClient) GetAllCDCInfo(ctx context.Context) ([]*mvccpb.KeyValue, error) { - resp, err := c.Client.Get(ctx, BaseKey(), clientv3.WithPrefix()) + resp, err := c.Client.Get(ctx, BaseKey(c.ClusterID), clientv3.WithPrefix()) if err != nil { return nil, cerror.WrapError(cerror.ErrPDEtcdAPIError, err) } @@ -125,7 +140,7 @@ func (c CDCEtcdClient) GetChangeFeeds(ctx context.Context) ( map[model.ChangeFeedID]*mvccpb.KeyValue, error, ) { // todo: support namespace - key := GetEtcdKeyChangeFeedList(model.DefaultNamespace) + key := GetEtcdKeyChangeFeedList(c.ClusterID, model.DefaultNamespace) resp, err := c.Client.Get(ctx, key, clientv3.WithPrefix()) if err != nil { @@ -167,7 +182,7 @@ func (c CDCEtcdClient) GetAllChangeFeedInfo(ctx context.Context) ( func (c CDCEtcdClient) GetChangeFeedInfo(ctx context.Context, id model.ChangeFeedID, ) (*model.ChangeFeedInfo, error) { - key := GetEtcdKeyChangeFeedInfo(id) + key := GetEtcdKeyChangeFeedInfo(c.ClusterID, id) resp, err := c.Client.Get(ctx, key) if err != nil { return nil, cerror.WrapError(cerror.ErrPDEtcdAPIError, err) @@ -184,7 +199,7 @@ func (c CDCEtcdClient) GetChangeFeedInfo(ctx context.Context, func (c CDCEtcdClient) DeleteChangeFeedInfo(ctx context.Context, id model.ChangeFeedID, ) error { - key := GetEtcdKeyChangeFeedInfo(id) + key := GetEtcdKeyChangeFeedInfo(c.ClusterID, id) _, err := c.Client.Delete(ctx, key) return cerror.WrapError(cerror.ErrPDEtcdAPIError, err) } @@ -194,7 +209,7 @@ func (c CDCEtcdClient) GetAllChangeFeedStatus(ctx context.Context) ( map[model.ChangeFeedID]*model.ChangeFeedStatus, error, ) { // todo: support namespace - key := JobKeyPrefix(model.DefaultNamespace) + key := JobKeyPrefix(c.ClusterID, model.DefaultNamespace) resp, err := c.Client.Get(ctx, key, clientv3.WithPrefix()) if err != nil { return nil, cerror.WrapError(cerror.ErrPDEtcdAPIError, err) @@ -219,7 +234,7 @@ func (c CDCEtcdClient) GetAllChangeFeedStatus(ctx context.Context) ( func (c CDCEtcdClient) GetChangeFeedStatus(ctx context.Context, id model.ChangeFeedID, ) (*model.ChangeFeedStatus, int64, error) { - key := GetEtcdKeyJob(id) + key := GetEtcdKeyJob(c.ClusterID, id) resp, err := c.Client.Get(ctx, key) if err != nil { return nil, 0, cerror.WrapError(cerror.ErrPDEtcdAPIError, err) @@ -234,7 +249,7 @@ func (c CDCEtcdClient) GetChangeFeedStatus(ctx context.Context, // GetCaptures returns kv revision and CaptureInfo list func (c CDCEtcdClient) GetCaptures(ctx context.Context) (int64, []*model.CaptureInfo, error) { - key := CaptureInfoKeyPrefix() + key := CaptureInfoKeyPrefix(c.ClusterID) resp, err := c.Client.Get(ctx, key, clientv3.WithPrefix()) if err != nil { @@ -256,7 +271,7 @@ func (c CDCEtcdClient) GetCaptures(ctx context.Context) (int64, []*model.Capture // GetCaptureInfo get capture info from etcd. // return errCaptureNotExist if the capture not exists. func (c CDCEtcdClient) GetCaptureInfo(ctx context.Context, id string) (info *model.CaptureInfo, err error) { - key := GetEtcdKeyCaptureInfo(id) + key := GetEtcdKeyCaptureInfo(c.ClusterID, id) resp, err := c.Client.Get(ctx, key) if err != nil { @@ -278,7 +293,7 @@ func (c CDCEtcdClient) GetCaptureInfo(ctx context.Context, id string) (info *mod // GetCaptureLeases returns a map mapping from capture ID to its lease func (c CDCEtcdClient) GetCaptureLeases(ctx context.Context) (map[string]int64, error) { - key := CaptureInfoKeyPrefix() + key := CaptureInfoKeyPrefix(c.ClusterID) resp, err := c.Client.Get(ctx, key, clientv3.WithPrefix()) if err != nil { @@ -315,8 +330,8 @@ func (c CDCEtcdClient) CreateChangefeedInfo(ctx context.Context, info *model.ChangeFeedInfo, changeFeedID model.ChangeFeedID, ) error { - infoKey := GetEtcdKeyChangeFeedInfo(changeFeedID) - jobKey := GetEtcdKeyJob(changeFeedID) + infoKey := GetEtcdKeyChangeFeedInfo(c.ClusterID, changeFeedID) + jobKey := GetEtcdKeyJob(c.ClusterID, changeFeedID) value, err := info.Marshal() if err != nil { return errors.Trace(err) @@ -348,7 +363,7 @@ func (c CDCEtcdClient) SaveChangeFeedInfo(ctx context.Context, info *model.ChangeFeedInfo, changeFeedID model.ChangeFeedID, ) error { - key := GetEtcdKeyChangeFeedInfo(changeFeedID) + key := GetEtcdKeyChangeFeedInfo(c.ClusterID, changeFeedID) value, err := info.Marshal() if err != nil { return errors.Trace(err) @@ -361,7 +376,7 @@ func (c CDCEtcdClient) SaveChangeFeedInfo(ctx context.Context, // and returns a slice of ProcInfoSnap(without table info) func (c CDCEtcdClient) GetProcessors(ctx context.Context) ([]*model.ProcInfoSnap, error) { // todo: support namespace - resp, err := c.Client.Get(ctx, TaskPositionKeyPrefix(model.DefaultNamespace), + resp, err := c.Client.Get(ctx, TaskPositionKeyPrefix(c.ClusterID, model.DefaultNamespace), clientv3.WithPrefix()) if err != nil { return nil, cerror.WrapError(cerror.ErrPDEtcdAPIError, err) @@ -391,7 +406,7 @@ func (c CDCEtcdClient) GetProcessors(ctx context.Context) ([]*model.ProcInfoSnap func (c CDCEtcdClient) GetAllTaskPositions(ctx context.Context, changefeedID model.ChangeFeedID, ) (map[string]*model.TaskPosition, error) { - resp, err := c.Client.Get(ctx, TaskPositionKeyPrefix(changefeedID.Namespace), + resp, err := c.Client.Get(ctx, TaskPositionKeyPrefix(c.ClusterID, changefeedID.Namespace), clientv3.WithPrefix()) if err != nil { return nil, cerror.WrapError(cerror.ErrPDEtcdAPIError, err) @@ -429,7 +444,7 @@ func (c CDCEtcdClient) GetTaskPosition( changefeedID model.ChangeFeedID, captureID string, ) (int64, *model.TaskPosition, error) { - key := GetEtcdKeyTaskPosition(changefeedID, captureID) + key := GetEtcdKeyTaskPosition(c.ClusterID, changefeedID, captureID) resp, err := c.Client.Get(ctx, key) if err != nil { return 0, nil, cerror.WrapError(cerror.ErrPDEtcdAPIError, err) @@ -450,14 +465,14 @@ func (c CDCEtcdClient) PutCaptureInfo(ctx context.Context, info *model.CaptureIn return errors.Trace(err) } - key := GetEtcdKeyCaptureInfo(info.ID) + key := GetEtcdKeyCaptureInfo(c.ClusterID, info.ID) _, err = c.Client.Put(ctx, key, string(data), clientv3.WithLease(leaseID)) return cerror.WrapError(cerror.ErrPDEtcdAPIError, err) } // DeleteCaptureInfo delete capture info from etcd. func (c CDCEtcdClient) DeleteCaptureInfo(ctx context.Context, id string) error { - key := GetEtcdKeyCaptureInfo(id) + key := GetEtcdKeyCaptureInfo(c.ClusterID, id) _, err := c.Client.Delete(ctx, key) return cerror.WrapError(cerror.ErrPDEtcdAPIError, err) } @@ -476,7 +491,7 @@ func (c CDCEtcdClient) GetOwnerID(ctx context.Context, key string) (string, erro // GetOwnerRevision gets the Etcd revision for the elected owner. func (c CDCEtcdClient) GetOwnerRevision(ctx context.Context, captureID string) (rev int64, err error) { - resp, err := c.Client.Get(ctx, CaptureOwnerKey(), clientv3.WithFirstCreate()...) + resp, err := c.Client.Get(ctx, CaptureOwnerKey(c.ClusterID), clientv3.WithFirstCreate()...) if err != nil { return 0, cerror.WrapError(cerror.ErrPDEtcdAPIError, err) } diff --git a/pkg/etcd/etcd_test.go b/pkg/etcd/etcd_test.go index c5b34f345a5..2d239e2f6c0 100644 --- a/pkg/etcd/etcd_test.go +++ b/pkg/etcd/etcd_test.go @@ -70,7 +70,7 @@ func (s *etcdTester) setUpTest(t *testing.T) { LogConfig: &logConfig, }) require.NoError(t, err) - s.client = NewCDCEtcdClient(context.TODO(), client) + s.client = NewCDCEtcdClient(context.TODO(), client, DefaultCDCClusterID) s.ctx, s.cancel = context.WithCancel(context.Background()) s.errg = util.HandleErrWithErrGroup(s.ctx, s.etcd.Err(), func(e error) { t.Log(e) }) } @@ -133,7 +133,8 @@ func TestGetChangeFeeds(t *testing.T) { for _, tc := range testCases { for i := 0; i < len(tc.ids); i++ { _, err := s.client.Client.Put(context.Background(), - GetEtcdKeyChangeFeedInfo(model.DefaultChangeFeedID(tc.ids[i])), + GetEtcdKeyChangeFeedInfo(DefaultCDCClusterID, + model.DefaultChangeFeedID(tc.ids[i])), tc.details[i]) require.NoError(t, err) } @@ -233,7 +234,7 @@ func putChangeFeedStatus( changefeedID model.ChangeFeedID, status *model.ChangeFeedStatus, ) error { - key := GetEtcdKeyJob(changefeedID) + key := GetEtcdKeyJob(DefaultCDCClusterID, changefeedID) value, err := status.Marshal() if err != nil { return errors.Trace(err) @@ -369,7 +370,8 @@ func TestGetOwnerRevision(t *testing.T) { sess, err := concurrency.NewSession(s.client.Client.Unwrap(), concurrency.WithTTL(10 /* seconds */)) require.Nil(t, err) - election := concurrency.NewElection(sess, CaptureOwnerKey()) + election := concurrency.NewElection(sess, + CaptureOwnerKey(DefaultCDCClusterID)) mockCaptureID := fmt.Sprintf("capture-%d", i) diff --git a/pkg/etcd/etcdkey.go b/pkg/etcd/etcdkey.go index 04a1250aab1..2b72e352038 100644 --- a/pkg/etcd/etcdkey.go +++ b/pkg/etcd/etcdkey.go @@ -19,7 +19,6 @@ import ( "github.com/pingcap/log" "github.com/pingcap/tiflow/cdc/model" - "github.com/pingcap/tiflow/pkg/config" cerror "github.com/pingcap/tiflow/pkg/errors" ) @@ -55,7 +54,7 @@ const ( CDCKeyTypeTaskPosition ) -// CDCKey represents a etcd key which is defined by TiCDC +// CDCKey represents an etcd key which is defined by TiCDC /* Usage: we can parse a raw etcd key: @@ -88,19 +87,18 @@ type CDCKey struct { } // BaseKey is the common prefix of the keys with cluster id in CDC -func BaseKey() string { - clusterID := config.GetGlobalServerConfig().ClusterID +func BaseKey(clusterID string) string { return fmt.Sprintf("/tidb/cdc/%s", clusterID) } // NamespacedPrefix returns the etcd prefix of changefeed data -func NamespacedPrefix(namespace string) string { - return BaseKey() + "/" + namespace +func NamespacedPrefix(clusterID, namespace string) string { + return BaseKey(clusterID) + "/" + namespace } // Parse parses the given etcd key -func (k *CDCKey) Parse(key string) error { - if !strings.HasPrefix(key, BaseKey()) { +func (k *CDCKey) Parse(clusterID, key string) error { + if !strings.HasPrefix(key, BaseKey(clusterID)) { return cerror.ErrInvalidEtcdKey.GenWithStackByArgs(key) } key = key[len("/tidb/cdc"):] @@ -168,19 +166,19 @@ func (k *CDCKey) String() string { switch k.Tp { case CDCKeyTypeOwner: if len(k.OwnerLeaseID) == 0 { - return BaseKey() + metaPrefix + ownerKey + return BaseKey(k.ClusterID) + metaPrefix + ownerKey } - return BaseKey() + metaPrefix + ownerKey + "/" + k.OwnerLeaseID + return BaseKey(k.ClusterID) + metaPrefix + ownerKey + "/" + k.OwnerLeaseID case CDCKeyTypeCapture: - return BaseKey() + metaPrefix + captureKey + "/" + k.CaptureID + return BaseKey(k.ClusterID) + metaPrefix + captureKey + "/" + k.CaptureID case CDCKeyTypeChangefeedInfo: - return NamespacedPrefix(k.ChangefeedID.Namespace) + changefeedInfoKey + + return NamespacedPrefix(k.ClusterID, k.ChangefeedID.Namespace) + changefeedInfoKey + "/" + k.ChangefeedID.ID case CDCKeyTypeChangeFeedStatus: - return NamespacedPrefix(k.ChangefeedID.Namespace) + jobKey + + return NamespacedPrefix(k.ClusterID, k.ChangefeedID.Namespace) + jobKey + "/" + k.ChangefeedID.ID case CDCKeyTypeTaskPosition: - return NamespacedPrefix(k.ChangefeedID.Namespace) + taskPositionKey + + return NamespacedPrefix(k.ClusterID, k.ChangefeedID.Namespace) + taskPositionKey + "/" + k.CaptureID + "/" + k.ChangefeedID.ID } log.Panic("unreachable") diff --git a/pkg/etcd/etcdkey_test.go b/pkg/etcd/etcdkey_test.go index 3283850964c..1595967b30b 100644 --- a/pkg/etcd/etcdkey_test.go +++ b/pkg/etcd/etcdkey_test.go @@ -30,14 +30,14 @@ func TestEtcdKey(t *testing.T) { expected: &CDCKey{ Tp: CDCKeyTypeOwner, OwnerLeaseID: "223176cb44d20a13", - ClusterID: "default", + ClusterID: DefaultCDCClusterID, }, }, { key: fmt.Sprintf("%s/owner", DefaultClusterAndMetaPrefix), expected: &CDCKey{ Tp: CDCKeyTypeOwner, OwnerLeaseID: "", - ClusterID: "default", + ClusterID: DefaultCDCClusterID, }, }, { key: fmt.Sprintf("%s/capture/6bbc01c8-0605-4f86-a0f9-b3119109b225", @@ -45,7 +45,7 @@ func TestEtcdKey(t *testing.T) { expected: &CDCKey{ Tp: CDCKeyTypeCapture, CaptureID: "6bbc01c8-0605-4f86-a0f9-b3119109b225", - ClusterID: "default", + ClusterID: DefaultCDCClusterID, }, }, { key: fmt.Sprintf("%s", DefaultClusterAndNamespacePrefix) + @@ -53,7 +53,7 @@ func TestEtcdKey(t *testing.T) { expected: &CDCKey{ Tp: CDCKeyTypeChangefeedInfo, ChangefeedID: model.DefaultChangeFeedID("test-_@#$%changefeed"), - ClusterID: "default", + ClusterID: DefaultCDCClusterID, }, }, { key: fmt.Sprintf("%s", DefaultClusterAndNamespacePrefix) + @@ -61,7 +61,7 @@ func TestEtcdKey(t *testing.T) { expected: &CDCKey{ Tp: CDCKeyTypeChangefeedInfo, ChangefeedID: model.DefaultChangeFeedID("test/changefeed"), - ClusterID: "default", + ClusterID: DefaultCDCClusterID, }, }, { key: fmt.Sprintf("%s", DefaultClusterAndNamespacePrefix) + @@ -69,7 +69,7 @@ func TestEtcdKey(t *testing.T) { expected: &CDCKey{ Tp: CDCKeyTypeChangeFeedStatus, ChangefeedID: model.DefaultChangeFeedID("test-changefeed"), - ClusterID: "default", + ClusterID: DefaultCDCClusterID, }, }, { key: "/tidb/cdc/default/name/task" + @@ -81,7 +81,7 @@ func TestEtcdKey(t *testing.T) { ID: "test-changefeed", }, CaptureID: "6bbc01c8-0605-4f86-a0f9-b3119109b225", - ClusterID: "default", + ClusterID: DefaultCDCClusterID, }, }, { key: fmt.Sprintf("%s", DefaultClusterAndNamespacePrefix) + @@ -90,12 +90,12 @@ func TestEtcdKey(t *testing.T) { Tp: CDCKeyTypeTaskPosition, ChangefeedID: model.DefaultChangeFeedID("test/changefeed"), CaptureID: "6bbc01c8-0605-4f86-a0f9-b3119109b225", - ClusterID: "default", + ClusterID: DefaultCDCClusterID, }, }} for _, tc := range testcases { k := new(CDCKey) - err := k.Parse(tc.key) + err := k.Parse(DefaultCDCClusterID, tc.key) require.NoError(t, err) require.Equal(t, k, tc.expected) require.Equal(t, k.String(), tc.key) @@ -127,7 +127,7 @@ func TestEtcdKeyParseError(t *testing.T) { }} for _, tc := range testCases { k := new(CDCKey) - err := k.Parse(tc.key) + err := k.Parse(DefaultCDCClusterID, tc.key) if tc.error { require.NotNil(t, err) } else { diff --git a/pkg/orchestrator/reactor_state.go b/pkg/orchestrator/reactor_state.go index a62792579b1..35893debb35 100644 --- a/pkg/orchestrator/reactor_state.go +++ b/pkg/orchestrator/reactor_state.go @@ -28,6 +28,7 @@ import ( // GlobalReactorState represents a global state which stores all key-value pairs in ETCD type GlobalReactorState struct { + ClusterID string Owner map[string]struct{} Captures map[model.CaptureID]*model.CaptureInfo Changefeeds map[model.ChangeFeedID]*ChangefeedReactorState @@ -40,8 +41,9 @@ type GlobalReactorState struct { } // NewGlobalState creates a new global state -func NewGlobalState() *GlobalReactorState { +func NewGlobalState(clusterID string) *GlobalReactorState { return &GlobalReactorState{ + ClusterID: clusterID, Owner: map[string]struct{}{}, Captures: make(map[model.CaptureID]*model.CaptureInfo), Changefeeds: make(map[model.ChangeFeedID]*ChangefeedReactorState), @@ -51,7 +53,7 @@ func NewGlobalState() *GlobalReactorState { // Update implements the ReactorState interface func (s *GlobalReactorState) Update(key util.EtcdKey, value []byte, _ bool) error { k := new(etcd.CDCKey) - err := k.Parse(key.String()) + err := k.Parse(s.ClusterID, key.String()) if err != nil { return errors.Trace(err) } @@ -95,7 +97,7 @@ func (s *GlobalReactorState) Update(key util.EtcdKey, value []byte, _ bool) erro if value == nil { return nil } - changefeedState = NewChangefeedReactorState(k.ChangefeedID) + changefeedState = NewChangefeedReactorState(s.ClusterID, k.ChangefeedID) s.Changefeeds[k.ChangefeedID] = changefeedState } if err := changefeedState.UpdateCDCKey(k, value); err != nil { @@ -134,6 +136,7 @@ func (s *GlobalReactorState) SetOnCaptureRemoved(f func(captureID model.CaptureI // ChangefeedReactorState represents a changefeed state which stores all key-value pairs of a changefeed in ETCD type ChangefeedReactorState struct { + ClusterID string ID model.ChangeFeedID Info *model.ChangeFeedInfo Status *model.ChangeFeedStatus @@ -144,8 +147,11 @@ type ChangefeedReactorState struct { } // NewChangefeedReactorState creates a new changefeed reactor state -func NewChangefeedReactorState(id model.ChangeFeedID) *ChangefeedReactorState { +func NewChangefeedReactorState(clusterID string, + id model.ChangeFeedID, +) *ChangefeedReactorState { return &ChangefeedReactorState{ + ClusterID: clusterID, ID: id, TaskPositions: make(map[model.CaptureID]*model.TaskPosition), } @@ -154,7 +160,7 @@ func NewChangefeedReactorState(id model.ChangeFeedID) *ChangefeedReactorState { // Update implements the ReactorState interface func (s *ChangefeedReactorState) Update(key util.EtcdKey, value []byte, _ bool) error { k := new(etcd.CDCKey) - if err := k.Parse(key.String()); err != nil { + if err := k.Parse(s.ClusterID, key.String()); err != nil { return errors.Trace(err) } if err := s.UpdateCDCKey(k, value); err != nil { @@ -238,6 +244,7 @@ func (s *ChangefeedReactorState) getPatches() []DataPatch { // the etcd worker will exit and throw the ErrLeaseExpired error. func (s *ChangefeedReactorState) CheckCaptureAlive(captureID model.CaptureID) { k := etcd.CDCKey{ + ClusterID: s.ClusterID, Tp: etcd.CDCKeyTypeCapture, CaptureID: captureID, } @@ -284,6 +291,7 @@ func (s *ChangefeedReactorState) CheckChangefeedNormal() { // PatchInfo appends a DataPatch which can modify the ChangeFeedInfo func (s *ChangefeedReactorState) PatchInfo(fn func(*model.ChangeFeedInfo) (*model.ChangeFeedInfo, bool, error)) { key := &etcd.CDCKey{ + ClusterID: s.ClusterID, Tp: etcd.CDCKeyTypeChangefeedInfo, ChangefeedID: s.ID, } @@ -299,6 +307,7 @@ func (s *ChangefeedReactorState) PatchInfo(fn func(*model.ChangeFeedInfo) (*mode // PatchStatus appends a DataPatch which can modify the ChangeFeedStatus func (s *ChangefeedReactorState) PatchStatus(fn func(*model.ChangeFeedStatus) (*model.ChangeFeedStatus, bool, error)) { key := &etcd.CDCKey{ + ClusterID: s.ClusterID, Tp: etcd.CDCKeyTypeChangeFeedStatus, ChangefeedID: s.ID, } @@ -314,6 +323,7 @@ func (s *ChangefeedReactorState) PatchStatus(fn func(*model.ChangeFeedStatus) (* // PatchTaskPosition appends a DataPatch which can modify the TaskPosition of a specified capture func (s *ChangefeedReactorState) PatchTaskPosition(captureID model.CaptureID, fn func(*model.TaskPosition) (*model.TaskPosition, bool, error)) { key := &etcd.CDCKey{ + ClusterID: s.ClusterID, Tp: etcd.CDCKeyTypeTaskPosition, CaptureID: captureID, ChangefeedID: s.ID, diff --git a/pkg/orchestrator/reactor_state_test.go b/pkg/orchestrator/reactor_state_test.go index 03d2aa8e97b..db2457e7fbf 100644 --- a/pkg/orchestrator/reactor_state_test.go +++ b/pkg/orchestrator/reactor_state_test.go @@ -29,7 +29,8 @@ import ( ) func TestCheckCaptureAlive(t *testing.T) { - state := NewChangefeedReactorState(model.DefaultChangeFeedID("test")) + state := NewChangefeedReactorState(etcd.DefaultCDCClusterID, + model.DefaultChangeFeedID("test")) stateTester := NewReactorStateTester(t, state, nil) state.CheckCaptureAlive("6bbc01c8-0605-4f86-a0f9-b3119109b225") require.Contains(t, stateTester.ApplyPatches().Error(), "[CDC:ErrLeaseExpired]") @@ -117,7 +118,8 @@ func TestChangefeedStateUpdate(t *testing.T) { `{"id":"6bbc01c8-0605-4f86-a0f9-b3119109b225","address":"127.0.0.1:8300"}`, }, expected: ChangefeedReactorState{ - ID: model.DefaultChangeFeedID("test1"), + ClusterID: etcd.DefaultCDCClusterID, + ID: model.DefaultChangeFeedID("test1"), Info: &model.ChangeFeedInfo{ SinkURI: "blackhole://", Opts: map[string]string{}, @@ -167,7 +169,8 @@ func TestChangefeedStateUpdate(t *testing.T) { `{"id":"666777888","address":"127.0.0.1:8300"}`, }, expected: ChangefeedReactorState{ - ID: model.DefaultChangeFeedID("test1"), + ClusterID: etcd.DefaultCDCClusterID, + ID: model.DefaultChangeFeedID("test1"), Info: &model.ChangeFeedInfo{ SinkURI: "blackhole://", Opts: map[string]string{}, @@ -221,7 +224,8 @@ func TestChangefeedStateUpdate(t *testing.T) { `fake value`, }, expected: ChangefeedReactorState{ - ID: model.DefaultChangeFeedID("test1"), + ClusterID: etcd.DefaultCDCClusterID, + ID: model.DefaultChangeFeedID("test1"), Info: &model.ChangeFeedInfo{ SinkURI: "blackhole://", Opts: map[string]string{}, @@ -282,9 +286,10 @@ func TestChangefeedStateUpdate(t *testing.T) { ``, }, expected: ChangefeedReactorState{ - ID: model.DefaultChangeFeedID("test1"), - Info: nil, - Status: nil, + ClusterID: etcd.DefaultCDCClusterID, + ID: model.DefaultChangeFeedID("test1"), + Info: nil, + Status: nil, TaskPositions: map[model.CaptureID]*model.TaskPosition{ "666777888": {CheckPointTs: 11332244, ResolvedTs: 312321, Count: 8}, }, @@ -292,7 +297,8 @@ func TestChangefeedStateUpdate(t *testing.T) { }, } for i, tc := range testCases { - state := NewChangefeedReactorState(model.DefaultChangeFeedID(tc.changefeedID)) + state := NewChangefeedReactorState(etcd.DefaultCDCClusterID, + model.DefaultChangeFeedID(tc.changefeedID)) for i, k := range tc.updateKey { value := []byte(tc.updateValue[i]) if len(value) == 0 { @@ -307,7 +313,8 @@ func TestChangefeedStateUpdate(t *testing.T) { } func TestPatchInfo(t *testing.T) { - state := NewChangefeedReactorState(model.DefaultChangeFeedID("test1")) + state := NewChangefeedReactorState(etcd.DefaultCDCClusterID, + model.DefaultChangeFeedID("test1")) stateTester := NewReactorStateTester(t, state, nil) state.PatchInfo(func(info *model.ChangeFeedInfo) (*model.ChangeFeedInfo, bool, error) { require.Nil(t, info) @@ -351,7 +358,8 @@ func TestPatchInfo(t *testing.T) { } func TestPatchStatus(t *testing.T) { - state := NewChangefeedReactorState(model.DefaultChangeFeedID("test1")) + state := NewChangefeedReactorState(etcd.DefaultCDCClusterID, + model.DefaultChangeFeedID("test1")) stateTester := NewReactorStateTester(t, state, nil) state.PatchStatus(func(status *model.ChangeFeedStatus) (*model.ChangeFeedStatus, bool, error) { require.Nil(t, status) @@ -373,7 +381,8 @@ func TestPatchStatus(t *testing.T) { } func TestPatchTaskPosition(t *testing.T) { - state := NewChangefeedReactorState(model.DefaultChangeFeedID("test1")) + state := NewChangefeedReactorState(etcd.DefaultCDCClusterID, + model.DefaultChangeFeedID("test1")) stateTester := NewReactorStateTester(t, state, nil) captureID1 := "capture1" captureID2 := "capture2" @@ -464,20 +473,23 @@ func TestGlobalStateUpdate(t *testing.T) { "admin-job-type":0}`, }, expected: GlobalReactorState{ - Owner: map[string]struct{}{"22317526c4fc9a37": {}, "22317526c4fc9a38": {}}, + ClusterID: etcd.DefaultCDCClusterID, + Owner: map[string]struct{}{"22317526c4fc9a37": {}, "22317526c4fc9a38": {}}, Captures: map[model.CaptureID]*model.CaptureInfo{"6bbc01c8-0605-4f86-a0f9-b3119109b225": { ID: "6bbc01c8-0605-4f86-a0f9-b3119109b225", AdvertiseAddr: "127.0.0.1:8300", }}, Changefeeds: map[model.ChangeFeedID]*ChangefeedReactorState{ model.DefaultChangeFeedID("test1"): { - ID: model.DefaultChangeFeedID("test1"), + ClusterID: etcd.DefaultCDCClusterID, + ID: model.DefaultChangeFeedID("test1"), TaskPositions: map[model.CaptureID]*model.TaskPosition{ "6bbc01c8-0605-4f86-a0f9-b3119109b225": {CheckPointTs: 421980719742451713, ResolvedTs: 421980720003809281}, }, }, model.DefaultChangeFeedID("test2"): { - ID: model.DefaultChangeFeedID("test2"), + ClusterID: etcd.DefaultCDCClusterID, + ID: model.DefaultChangeFeedID("test2"), TaskPositions: map[model.CaptureID]*model.TaskPosition{ "6bbc01c8-0605-4f86-a0f9-b3119109b225": { CheckPointTs: 421980719742451713, @@ -520,11 +532,13 @@ func TestGlobalStateUpdate(t *testing.T) { ``, }, expected: GlobalReactorState{ - Owner: map[string]struct{}{"22317526c4fc9a38": {}}, - Captures: map[model.CaptureID]*model.CaptureInfo{}, + ClusterID: etcd.DefaultCDCClusterID, + Owner: map[string]struct{}{"22317526c4fc9a38": {}}, + Captures: map[model.CaptureID]*model.CaptureInfo{}, Changefeeds: map[model.ChangeFeedID]*ChangefeedReactorState{ model.DefaultChangeFeedID("test2"): { - ID: model.DefaultChangeFeedID("test2"), + ClusterID: etcd.DefaultCDCClusterID, + ID: model.DefaultChangeFeedID("test2"), TaskPositions: map[model.CaptureID]*model.TaskPosition{ "6bbc01c8-0605-4f86-a0f9-b3119109b225": { CheckPointTs: 421980719742451713, @@ -537,7 +551,7 @@ func TestGlobalStateUpdate(t *testing.T) { }, } for _, tc := range testCases { - state := NewGlobalState() + state := NewGlobalState(etcd.DefaultCDCClusterID) for i, k := range tc.updateKey { value := []byte(tc.updateValue[i]) if len(value) == 0 { @@ -552,7 +566,7 @@ func TestGlobalStateUpdate(t *testing.T) { } func TestCaptureChangeHooks(t *testing.T) { - state := NewGlobalState() + state := NewGlobalState(etcd.DefaultCDCClusterID) var callCount int state.onCaptureAdded = func(captureID model.CaptureID, addr string) { @@ -572,19 +586,22 @@ func TestCaptureChangeHooks(t *testing.T) { captureInfoBytes, err := json.Marshal(captureInfo) require.Nil(t, err) - err = state.Update(util.NewEtcdKey(etcd.CaptureInfoKeyPrefix()+"/capture-1"), + err = state.Update(util.NewEtcdKey( + etcd.CaptureInfoKeyPrefix(etcd.DefaultCDCClusterID)+"/capture-1"), captureInfoBytes, false) require.Nil(t, err) require.Equal(t, callCount, 1) - err = state.Update(util.NewEtcdKey(etcd.CaptureInfoKeyPrefix()+"/capture-1"), + err = state.Update(util.NewEtcdKey( + etcd.CaptureInfoKeyPrefix(etcd.DefaultCDCClusterID)+"/capture-1"), nil /* delete */, false) require.Nil(t, err) require.Equal(t, callCount, 2) } func TestCheckChangefeedNormal(t *testing.T) { - state := NewChangefeedReactorState(model.DefaultChangeFeedID("test1")) + state := NewChangefeedReactorState(etcd.DefaultCDCClusterID, + model.DefaultChangeFeedID("test1")) stateTester := NewReactorStateTester(t, state, nil) state.CheckChangefeedNormal() stateTester.MustApplyPatches() diff --git a/tests/integration_tests/move_table/main.go b/tests/integration_tests/move_table/main.go index eef4cf155d6..b78c1d881b5 100644 --- a/tests/integration_tests/move_table/main.go +++ b/tests/integration_tests/move_table/main.go @@ -157,7 +157,7 @@ func newCluster(ctx context.Context, pd string) (*cluster, error) { ret := &cluster{ ownerAddr: "", captures: nil, - cdcEtcdCli: etcd.NewCDCEtcdClient(ctx, etcdCli), + cdcEtcdCli: etcd.NewCDCEtcdClient(ctx, etcdCli, etcd.DefaultCDCClusterID), } log.Info("new cluster initialized") @@ -208,7 +208,8 @@ func (c *cluster) moveAllTables(ctx context.Context, sourceCapture, targetCaptur } func (c *cluster) refreshInfo(ctx context.Context) error { - ownerID, err := c.cdcEtcdCli.GetOwnerID(ctx, etcd.CaptureOwnerKey) + ownerID, err := c.cdcEtcdCli.GetOwnerID(ctx, + etcd.CaptureOwnerKey(c.cdcEtcdCli.ClusterID)) if err != nil { return errors.Trace(err) } diff --git a/tests/utils/cdc_state_checker/cdc_monitor.go b/tests/utils/cdc_state_checker/cdc_monitor.go index 775ac59d007..378f824d42f 100644 --- a/tests/utils/cdc_state_checker/cdc_monitor.go +++ b/tests/utils/cdc_state_checker/cdc_monitor.go @@ -74,7 +74,8 @@ func newCDCMonitor(ctx context.Context, pd string, credential *security.Credenti wrappedCli := etcd.Wrap(etcdCli, map[string]prometheus.Counter{}) reactor := &cdcMonitReactor{} initState := newCDCReactorState() - etcdWorker, err := orchestrator.NewEtcdWorker(wrappedCli, etcd.BaseKey(), reactor, initState) + etcdWorker, err := orchestrator.NewEtcdWorker(wrappedCli, + etcd.BaseKey(etcd.DefaultCDCClusterID), reactor, initState) if err != nil { return nil, errors.Trace(err) } diff --git a/tests/utils/cdc_state_checker/state.go b/tests/utils/cdc_state_checker/state.go index cff39664e18..343bc4a5e09 100644 --- a/tests/utils/cdc_state_checker/state.go +++ b/tests/utils/cdc_state_checker/state.go @@ -35,11 +35,14 @@ type cdcReactorState struct { } var ( - captureRegex = regexp.MustCompile(regexp.QuoteMeta(etcd.CaptureInfoKeyPrefix()) + "/(.+)") + captureRegex = regexp.MustCompile(regexp.QuoteMeta( + etcd.CaptureInfoKeyPrefix(etcd.DefaultCDCClusterID)) + "/(.+)") changefeedRegex = regexp.MustCompile(regexp. - QuoteMeta(etcd.JobKeyPrefix(model.DefaultNamespace)) + "/(.+)") + QuoteMeta(etcd.JobKeyPrefix(etcd.DefaultCDCClusterID, + model.DefaultNamespace)) + "/(.+)") positionRegex = regexp.MustCompile(regexp. - QuoteMeta(etcd.TaskPositionKeyPrefix(model.DefaultNamespace)) + "/(.+?)/(.+)") + QuoteMeta(etcd.TaskPositionKeyPrefix(etcd.DefaultCDCClusterID, + model.DefaultNamespace)) + "/(.+?)/(.+)") ) func newCDCReactorState() *cdcReactorState { @@ -52,7 +55,7 @@ func newCDCReactorState() *cdcReactorState { } func (s *cdcReactorState) Update(key util.EtcdKey, value []byte, isInit bool) error { - if key.String() == etcd.CaptureOwnerKey() { + if key.String() == etcd.CaptureOwnerKey(etcd.DefaultCDCClusterID) { if value == nil { log.Info("Owner lost", zap.String("oldOwner", s.Owner)) return nil