Skip to content

Commit

Permalink
DNM *(ticdc): cli support cdc cluster (#5412)
Browse files Browse the repository at this point in the history
* cli support cdc cluster-id cmd flag
  • Loading branch information
sdojjy committed May 24, 2022
1 parent c1996c0 commit 3425318
Show file tree
Hide file tree
Showing 25 changed files with 233 additions and 144 deletions.
3 changes: 2 additions & 1 deletion cdc/api/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,8 @@ func RegisterStatusAPIRoutes(router *gin.Engine, capture *capture.Capture) {
}

func (h *statusAPI) writeEtcdInfo(ctx context.Context, cli *etcd.CDCEtcdClient, w io.Writer) {
resp, err := cli.Client.Get(ctx, etcd.BaseKey(), clientv3.WithPrefix())
resp, err := cli.Client.Get(ctx,
etcd.BaseKey(cli.ClusterID), clientv3.WithPrefix())
if err != nil {
fmt.Fprintf(w, "failed to get info: %s\n\n", err.Error())
return
Expand Down
15 changes: 9 additions & 6 deletions cdc/capture/capture.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ func (c *Capture) reset(ctx context.Context) error {
_ = c.session.Close()
}
c.session = sess
c.election = concurrency.NewElection(sess, etcd.CaptureOwnerKey())
c.election = concurrency.NewElection(sess, etcd.CaptureOwnerKey(c.EtcdClient.ClusterID))

if c.tableActorSystem != nil {
c.tableActorSystem.Stop()
Expand Down Expand Up @@ -279,7 +279,7 @@ func (c *Capture) run(stdCtx context.Context) error {
conf := config.GetGlobalServerConfig()
processorFlushInterval := time.Duration(conf.ProcessorFlushInterval)

globalState := orchestrator.NewGlobalState()
globalState := orchestrator.NewGlobalState(c.EtcdClient.ClusterID)

globalState.SetOnCaptureAdded(func(captureID model.CaptureID, addr string) {
c.MessageRouter.AddPeer(captureID, addr)
Expand Down Expand Up @@ -380,7 +380,7 @@ func (c *Capture) campaignOwner(ctx cdcContext.Context) error {
owner := c.newOwner(c.UpstreamManager)
c.setOwner(owner)

globalState := orchestrator.NewGlobalState()
globalState := orchestrator.NewGlobalState(c.EtcdClient.ClusterID)

globalState.SetOnCaptureAdded(func(captureID model.CaptureID, addr string) {
c.MessageRouter.AddPeer(captureID, addr)
Expand All @@ -389,7 +389,9 @@ func (c *Capture) campaignOwner(ctx cdcContext.Context) error {
c.MessageRouter.RemovePeer(captureID)
})

err = c.runEtcdWorker(ownerCtx, owner, orchestrator.NewGlobalState(), ownerFlushInterval, util.RoleOwner.String())
err = c.runEtcdWorker(ownerCtx, owner,
orchestrator.NewGlobalState(c.EtcdClient.ClusterID),
ownerFlushInterval, util.RoleOwner.String())
c.setOwner(nil)
log.Info("run owner exited", zap.Error(err))
// if owner exits, resign the owner key
Expand All @@ -413,7 +415,7 @@ func (c *Capture) runEtcdWorker(
role string,
) error {
etcdWorker, err := orchestrator.NewEtcdWorker(ctx.GlobalVars().EtcdClient.Client,
etcd.BaseKey(), reactor, reactorState)
etcd.BaseKey(c.EtcdClient.ClusterID), reactor, reactorState)
if err != nil {
return errors.Trace(err)
}
Expand Down Expand Up @@ -578,7 +580,8 @@ func (c *Capture) GetOwnerCaptureInfo(ctx context.Context) (*model.CaptureInfo,
return nil, err
}

ownerID, err := c.EtcdClient.GetOwnerID(ctx, etcd.CaptureOwnerKey())
ownerID, err := c.EtcdClient.GetOwnerID(ctx,
etcd.CaptureOwnerKey(c.EtcdClient.ClusterID))
if err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion cdc/capture/capture_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func TestReset(t *testing.T) {
DialTimeout: 3 * time.Second,
})
require.NoError(t, err)
client := etcd.NewCDCEtcdClient(ctx, etcdCli)
client := etcd.NewCDCEtcdClient(ctx, etcdCli, etcd.DefaultCDCClusterID)
// Close the client before the test function exits to prevent possible
// ctx leaks.
// Ref: https://github.com/grpc/grpc-go/blob/master/stream.go#L229
Expand Down
3 changes: 2 additions & 1 deletion cdc/owner/changefeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,8 @@ func createChangefeed4Test(ctx cdcContext.Context, t *testing.T) (
return &mockScheduler{}, nil
}
cf.upStream = upStream
state := orchestrator.NewChangefeedReactorState(ctx.ChangefeedVars().ID)
state := orchestrator.NewChangefeedReactorState(etcd.DefaultCDCClusterID,
ctx.ChangefeedVars().ID)
tester := orchestrator.NewReactorStateTester(t, state, nil)
state.PatchInfo(func(info *model.ChangeFeedInfo) (*model.ChangeFeedInfo, bool, error) {
require.Nil(t, info)
Expand Down
18 changes: 12 additions & 6 deletions cdc/owner/feed_state_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,8 @@ import (
func TestHandleJob(t *testing.T) {
ctx := cdcContext.NewBackendContext4Test(true)
manager := newFeedStateManager4Test()
state := orchestrator.NewChangefeedReactorState(ctx.ChangefeedVars().ID)
state := orchestrator.NewChangefeedReactorState(etcd.DefaultCDCClusterID,
ctx.ChangefeedVars().ID)
tester := orchestrator.NewReactorStateTester(t, state, nil)
state.PatchInfo(func(info *model.ChangeFeedInfo) (*model.ChangeFeedInfo, bool, error) {
require.Nil(t, info)
Expand Down Expand Up @@ -105,7 +106,8 @@ func TestHandleJob(t *testing.T) {
func TestMarkFinished(t *testing.T) {
ctx := cdcContext.NewBackendContext4Test(true)
manager := newFeedStateManager4Test()
state := orchestrator.NewChangefeedReactorState(ctx.ChangefeedVars().ID)
state := orchestrator.NewChangefeedReactorState(etcd.DefaultCDCClusterID,
ctx.ChangefeedVars().ID)
tester := orchestrator.NewReactorStateTester(t, state, nil)
state.PatchInfo(func(info *model.ChangeFeedInfo) (*model.ChangeFeedInfo, bool, error) {
require.Nil(t, info)
Expand Down Expand Up @@ -133,7 +135,8 @@ func TestMarkFinished(t *testing.T) {
func TestCleanUpInfos(t *testing.T) {
ctx := cdcContext.NewBackendContext4Test(true)
manager := newFeedStateManager4Test()
state := orchestrator.NewChangefeedReactorState(ctx.ChangefeedVars().ID)
state := orchestrator.NewChangefeedReactorState(etcd.DefaultCDCClusterID,
ctx.ChangefeedVars().ID)
tester := orchestrator.NewReactorStateTester(t, state, nil)
state.PatchInfo(func(info *model.ChangeFeedInfo) (*model.ChangeFeedInfo, bool, error) {
require.Nil(t, info)
Expand Down Expand Up @@ -165,7 +168,8 @@ func TestCleanUpInfos(t *testing.T) {
func TestHandleError(t *testing.T) {
ctx := cdcContext.NewBackendContext4Test(true)
manager := newFeedStateManager4Test()
state := orchestrator.NewChangefeedReactorState(ctx.ChangefeedVars().ID)
state := orchestrator.NewChangefeedReactorState(etcd.DefaultCDCClusterID,
ctx.ChangefeedVars().ID)
tester := orchestrator.NewReactorStateTester(t, state, nil)
state.PatchInfo(func(info *model.ChangeFeedInfo) (*model.ChangeFeedInfo, bool, error) {
require.Nil(t, info)
Expand Down Expand Up @@ -228,7 +232,8 @@ func TestHandleError(t *testing.T) {
func TestHandleFastFailError(t *testing.T) {
ctx := cdcContext.NewBackendContext4Test(true)
manager := new(feedStateManager)
state := orchestrator.NewChangefeedReactorState(ctx.ChangefeedVars().ID)
state := orchestrator.NewChangefeedReactorState(etcd.DefaultCDCClusterID,
ctx.ChangefeedVars().ID)
tester := orchestrator.NewReactorStateTester(t, state, nil)
state.PatchInfo(func(info *model.ChangeFeedInfo) (*model.ChangeFeedInfo, bool, error) {
require.Nil(t, info)
Expand Down Expand Up @@ -308,7 +313,8 @@ func TestChangefeedStatusNotExist(t *testing.T) {
`
ctx := cdcContext.NewBackendContext4Test(true)
manager := newFeedStateManager4Test()
state := orchestrator.NewChangefeedReactorState(ctx.ChangefeedVars().ID)
state := orchestrator.NewChangefeedReactorState(etcd.DefaultCDCClusterID,
ctx.ChangefeedVars().ID)
tester := orchestrator.NewReactorStateTester(t, state, map[string]string{
fmt.Sprintf("%s/capture/d563bfc0-f406-4f34-bc7d-6dc2e35a44e5",
etcd.DefaultClusterAndMetaPrefix): `
Expand Down
15 changes: 12 additions & 3 deletions cdc/owner/owner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,11 +62,12 @@ func createOwner4Test(ctx cdcContext.Context, t *testing.T) (*ownerImpl, *orches
o := owner.(*ownerImpl)
o.upstreamManager = upstream.NewManager4Test(pdClient)

state := orchestrator.NewGlobalState()
state := orchestrator.NewGlobalState(etcd.DefaultCDCClusterID)
tester := orchestrator.NewReactorStateTester(t, state, nil)

// set captures
cdcKey := etcd.CDCKey{
ClusterID: state.ClusterID,
Tp: etcd.CDCKeyTypeCapture,
CaptureID: ctx.GlobalVars().CaptureInfo.ID,
}
Expand All @@ -91,6 +92,7 @@ func TestCreateRemoveChangefeed(t *testing.T) {
changefeedStr, err := changefeedInfo.Marshal()
require.Nil(t, err)
cdcKey := etcd.CDCKey{
ClusterID: state.ClusterID,
Tp: etcd.CDCKeyTypeChangefeedInfo,
ChangefeedID: changefeedID,
}
Expand Down Expand Up @@ -160,6 +162,7 @@ func TestStopChangefeed(t *testing.T) {
changefeedStr, err := changefeedInfo.Marshal()
require.Nil(t, err)
cdcKey := etcd.CDCKey{
ClusterID: state.ClusterID,
Tp: etcd.CDCKeyTypeChangefeedInfo,
ChangefeedID: changefeedID,
}
Expand Down Expand Up @@ -209,6 +212,7 @@ func TestFixChangefeedState(t *testing.T) {
changefeedStr, err := changefeedInfo.Marshal()
require.Nil(t, err)
cdcKey := etcd.CDCKey{
ClusterID: state.ClusterID,
Tp: etcd.CDCKeyTypeChangefeedInfo,
ChangefeedID: changefeedID,
}
Expand Down Expand Up @@ -248,6 +252,7 @@ func TestFixChangefeedSinkProtocol(t *testing.T) {
changefeedStr, err := changefeedInfo.Marshal()
require.Nil(t, err)
cdcKey := etcd.CDCKey{
ClusterID: state.ClusterID,
Tp: etcd.CDCKeyTypeChangefeedInfo,
ChangefeedID: changefeedID,
}
Expand Down Expand Up @@ -288,6 +293,7 @@ func TestCheckClusterVersion(t *testing.T) {
changefeedStr, err := changefeedInfo.Marshal()
require.Nil(t, err)
cdcKey := etcd.CDCKey{
ClusterID: state.ClusterID,
Tp: etcd.CDCKeyTypeChangefeedInfo,
ChangefeedID: changefeedID,
}
Expand Down Expand Up @@ -369,7 +375,7 @@ func TestUpdateGCSafePoint(t *testing.T) {
ctx := cdcContext.NewBackendContext4Test(true)
ctx, cancel := cdcContext.WithCancel(ctx)
defer cancel()
state := orchestrator.NewGlobalState()
state := orchestrator.NewGlobalState(etcd.DefaultCDCClusterID)
tester := orchestrator.NewReactorStateTester(t, state, nil)

// no changefeed, the gc safe point should be max uint64
Expand Down Expand Up @@ -488,6 +494,7 @@ func TestHandleJobsDontBlock(t *testing.T) {
changefeedStr, err := cfInfo1.Marshal()
require.Nil(t, err)
cdcKey := etcd.CDCKey{
ClusterID: state.ClusterID,
Tp: etcd.CDCKeyTypeChangefeedInfo,
ChangefeedID: cf1,
}
Expand All @@ -505,6 +512,7 @@ func TestHandleJobsDontBlock(t *testing.T) {
Version: " v0.0.1-test-only",
}
cdcKey = etcd.CDCKey{
ClusterID: state.ClusterID,
Tp: etcd.CDCKeyTypeCapture,
CaptureID: captureInfo.ID,
}
Expand All @@ -522,6 +530,7 @@ func TestHandleJobsDontBlock(t *testing.T) {
changefeedStr1, err := cfInfo2.Marshal()
require.Nil(t, err)
cdcKey = etcd.CDCKey{
ClusterID: state.ClusterID,
Tp: etcd.CDCKeyTypeChangefeedInfo,
ChangefeedID: cf2,
}
Expand Down Expand Up @@ -565,7 +574,7 @@ WorkLoop:
}

func TestCalculateGCSafepointTs(t *testing.T) {
state := orchestrator.NewGlobalState()
state := orchestrator.NewGlobalState(etcd.DefaultCDCClusterID)
expectMinTsMap := make(map[uint64]uint64)
expectForceUpdateMap := make(map[uint64]interface{})
o := ownerImpl{changefeeds: make(map[model.ChangeFeedID]*changefeed)}
Expand Down
11 changes: 7 additions & 4 deletions cdc/processor/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ func (s *managerTester) resetSuit(ctx cdcContext.Context, t *testing.T) {
checkpointTs: replicaInfo.StartTs,
}, nil
})
s.state = orchestrator.NewGlobalState()
s.state = orchestrator.NewGlobalState(etcd.DefaultCDCClusterID)
captureInfoBytes, err := ctx.GlobalVars().CaptureInfo.Marshal()
require.Nil(t, err)
s.tester = orchestrator.NewReactorStateTester(t, s.state, map[string]string{
Expand All @@ -83,7 +83,8 @@ func TestChangefeed(t *testing.T) {

changefeedID := model.DefaultChangeFeedID("test-changefeed")
// an inactive changefeed
s.state.Changefeeds[changefeedID] = orchestrator.NewChangefeedReactorState(changefeedID)
s.state.Changefeeds[changefeedID] = orchestrator.NewChangefeedReactorState(
etcd.DefaultCDCClusterID, changefeedID)
_, err = s.manager.Tick(ctx, s.state)
s.tester.MustApplyPatches()
require.Nil(t, err)
Expand Down Expand Up @@ -135,7 +136,8 @@ func TestDebugInfo(t *testing.T) {

changefeedID := model.DefaultChangeFeedID("test-changefeed")
// an active changefeed
s.state.Changefeeds[changefeedID] = orchestrator.NewChangefeedReactorState(changefeedID)
s.state.Changefeeds[changefeedID] = orchestrator.NewChangefeedReactorState(
etcd.DefaultCDCClusterID, changefeedID)
s.state.Changefeeds[changefeedID].PatchInfo(
func(info *model.ChangeFeedInfo) (*model.ChangeFeedInfo, bool, error) {
return &model.ChangeFeedInfo{
Expand Down Expand Up @@ -189,7 +191,8 @@ func TestClose(t *testing.T) {

changefeedID := model.DefaultChangeFeedID("test-changefeed")
// an active changefeed
s.state.Changefeeds[changefeedID] = orchestrator.NewChangefeedReactorState(changefeedID)
s.state.Changefeeds[changefeedID] = orchestrator.NewChangefeedReactorState(
etcd.DefaultCDCClusterID, changefeedID)
s.state.Changefeeds[changefeedID].PatchInfo(
func(info *model.ChangeFeedInfo) (*model.ChangeFeedInfo, bool, error) {
return &model.ChangeFeedInfo{
Expand Down
4 changes: 3 additions & 1 deletion cdc/processor/processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,8 @@ func initProcessor4Test(ctx cdcContext.Context, t *testing.T) (*processor, *orch
checkpointTs: replicaInfo.StartTs,
}, nil
})
p.changefeed = orchestrator.NewChangefeedReactorState(ctx.ChangefeedVars().ID)
p.changefeed = orchestrator.NewChangefeedReactorState(
etcd.DefaultCDCClusterID, ctx.ChangefeedVars().ID)
captureID := ctx.GlobalVars().CaptureInfo.ID
changefeedID := ctx.ChangefeedVars().ID
return p, orchestrator.NewReactorStateTester(t, p.changefeed, map[string]string{
Expand Down Expand Up @@ -578,6 +579,7 @@ func TestSchemaGC(t *testing.T) {

func updateChangeFeedPosition(t *testing.T, tester *orchestrator.ReactorStateTester, cfID model.ChangeFeedID, resolvedTs, checkpointTs model.Ts) {
key := etcd.CDCKey{
ClusterID: etcd.DefaultCDCClusterID,
Tp: etcd.CDCKeyTypeChangeFeedStatus,
ChangefeedID: cfID,
}
Expand Down
2 changes: 1 addition & 1 deletion cdc/scheduler/internal/base/processor_agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ func NewAgent(
etcdCliCtx, cancel := context.WithTimeout(ctx, getOwnerFromEtcdTimeout)
defer cancel()
ownerCaptureID, err := ret.etcdClient.
GetOwnerID(etcdCliCtx, etcd.CaptureOwnerKey())
GetOwnerID(etcdCliCtx, etcd.CaptureOwnerKey(etcd.DefaultCDCClusterID))
if err != nil {
if err != concurrency.ErrElectionNoLeader {
return nil, errors.Trace(err)
Expand Down
17 changes: 9 additions & 8 deletions cdc/scheduler/internal/base/processor_agent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ func newAgentTestSuite(t *testing.T) *agentTestSuite {
}

func (s *agentTestSuite) CreateAgent(t *testing.T) (*agentImpl, error) {
cdcEtcdClient := etcd.NewCDCEtcdClient(s.ctx, s.etcdClient)
cdcEtcdClient := etcd.NewCDCEtcdClient(s.ctx, s.etcdClient, etcd.DefaultCDCClusterID)
messageServer := s.cluster.Nodes["capture-1"].Server
messageRouter := s.cluster.Nodes["capture-1"].Router
s.tableExecutor = NewMockTableExecutor(t)
Expand Down Expand Up @@ -219,11 +219,11 @@ func TestAgentBasics(t *testing.T) {
defer suite.Close()

suite.etcdKVClient.On("Get", mock.Anything,
etcd.CaptureOwnerKey(), mock.Anything).
etcd.CaptureOwnerKey(etcd.DefaultCDCClusterID), mock.Anything).
Return(&clientv3.GetResponse{
Kvs: []*mvccpb.KeyValue{
{
Key: []byte(etcd.CaptureOwnerKey()),
Key: []byte(etcd.CaptureOwnerKey(etcd.DefaultCDCClusterID)),
Value: []byte(ownerCaptureID),
ModRevision: 1,
},
Expand Down Expand Up @@ -337,7 +337,7 @@ func TestAgentNoOwnerAtStartUp(t *testing.T) {

// Empty response implies no owner.
suite.etcdKVClient.On("Get", mock.Anything,
etcd.CaptureOwnerKey(), mock.Anything).
etcd.CaptureOwnerKey(etcd.DefaultCDCClusterID), mock.Anything).
Return(&clientv3.GetResponse{}, nil)

// Test Point 1: Create an agent.
Expand Down Expand Up @@ -393,11 +393,12 @@ func TestAgentTolerateClientClosed(t *testing.T) {
defer suite.Close()

suite.etcdKVClient.On("Get", mock.Anything,
etcd.CaptureOwnerKey(), mock.Anything).
etcd.CaptureOwnerKey(etcd.DefaultCDCClusterID), mock.Anything).
Return(&clientv3.GetResponse{
Kvs: []*mvccpb.KeyValue{
{
Key: []byte(etcd.CaptureOwnerKey()),
Key: []byte(etcd.CaptureOwnerKey(
etcd.DefaultCDCClusterID)),
Value: []byte(ownerCaptureID),
ModRevision: 1,
},
Expand Down Expand Up @@ -438,11 +439,11 @@ func TestNoFinishOperationBeforeSyncIsReceived(t *testing.T) {
defer suite.Close()

suite.etcdKVClient.On("Get", mock.Anything,
etcd.CaptureOwnerKey(), mock.Anything).
etcd.CaptureOwnerKey(etcd.DefaultCDCClusterID), mock.Anything).
Return(&clientv3.GetResponse{
Kvs: []*mvccpb.KeyValue{
{
Key: []byte(etcd.CaptureOwnerKey()),
Key: []byte(etcd.CaptureOwnerKey(etcd.DefaultCDCClusterID)),
Value: []byte(ownerCaptureID),
ModRevision: 1,
},
Expand Down
2 changes: 1 addition & 1 deletion cdc/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ func (s *Server) Run(ctx context.Context) error {
return errors.Annotate(cerror.WrapError(cerror.ErrNewCaptureFailed, err), "new etcd client")
}

cdcEtcdClient := etcd.NewCDCEtcdClient(ctx, etcdCli)
cdcEtcdClient := etcd.NewCDCEtcdClient(ctx, etcdCli, conf.ClusterID)
s.etcdClient = &cdcEtcdClient

err = s.initDir(ctx)
Expand Down
2 changes: 1 addition & 1 deletion cdc/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ func newServer(t *testing.T) *testServer {
DialTimeout: 5 * time.Second,
})
require.Nil(t, err)
etcdClient := etcd.NewCDCEtcdClient(s.ctx, client)
etcdClient := etcd.NewCDCEtcdClient(s.ctx, client, etcd.DefaultCDCClusterID)
s.server.etcdClient = &etcdClient

s.errg = util.HandleErrWithErrGroup(s.ctx, s.e.Err(), func(e error) { t.Log(e) })
Expand Down
3 changes: 2 additions & 1 deletion pkg/cmd/cli/cli_capture_list.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,8 @@ func listCaptures(ctx context.Context, etcdClient *etcd.CDCEtcdClient) ([]*captu
return nil, err
}

ownerID, err := etcdClient.GetOwnerID(ctx, etcd.CaptureOwnerKey())
ownerID, err := etcdClient.GetOwnerID(ctx,
etcd.CaptureOwnerKey(etcdClient.ClusterID))
if err != nil && errors.Cause(err) != concurrency.ErrElectionNoLeader {
return nil, err
}
Expand Down
Loading

0 comments on commit 3425318

Please sign in to comment.