From 8e9d88bfd15ec62bdf45df1915edbccdad27ca23 Mon Sep 17 00:00:00 2001 From: dsdashun Date: Mon, 21 Feb 2022 17:49:42 +0800 Subject: [PATCH] dm-master(dm): use advertise address for ETCD client (#4608) close pingcap/tiflow#4511 --- dm/dm/master/election_test.go | 2 + dm/dm/master/etcd.go | 2 +- dm/dm/master/etcd_test.go | 5 +- dm/dm/master/openapi_test.go | 3 ++ dm/dm/master/server.go | 2 +- dm/dm/master/server_test.go | 93 ++++++++++++++++++++++++----------- 6 files changed, 76 insertions(+), 31 deletions(-) diff --git a/dm/dm/master/election_test.go b/dm/dm/master/election_test.go index ad13d0d4d9b..a521611f0f5 100644 --- a/dm/dm/master/election_test.go +++ b/dm/dm/master/election_test.go @@ -51,6 +51,7 @@ func (t *testElectionSuite) TestFailToStartLeader(c *check.C) { cfg1.Name = "dm-master-1" cfg1.DataDir = c.MkDir() cfg1.MasterAddr = tempurl.Alloc()[len("http://"):] + cfg1.AdvertiseAddr = cfg1.MasterAddr cfg1.PeerUrls = tempurl.Alloc() cfg1.AdvertisePeerUrls = cfg1.PeerUrls cfg1.InitialCluster = fmt.Sprintf("%s=%s", cfg1.Name, cfg1.AdvertisePeerUrls) @@ -68,6 +69,7 @@ func (t *testElectionSuite) TestFailToStartLeader(c *check.C) { cfg2.Name = "dm-master-2" cfg2.DataDir = c.MkDir() cfg2.MasterAddr = tempurl.Alloc()[len("http://"):] + cfg2.AdvertiseAddr = cfg2.MasterAddr cfg2.PeerUrls = tempurl.Alloc() cfg2.AdvertisePeerUrls = cfg2.PeerUrls cfg2.Join = cfg1.MasterAddr // join to an existing cluster diff --git a/dm/dm/master/etcd.go b/dm/dm/master/etcd.go index bd7762cc4b7..a10b53af54b 100644 --- a/dm/dm/master/etcd.go +++ b/dm/dm/master/etcd.go @@ -97,7 +97,7 @@ func prepareJoinEtcd(cfg *Config) error { } // try to join self, invalid - if cfg.Join == cfg.MasterAddr { + if cfg.Join == cfg.AdvertiseAddr { return terror.ErrMasterJoinEmbedEtcdFail.Generate(fmt.Sprintf("join self %s is forbidden", cfg.Join)) } diff --git a/dm/dm/master/etcd_test.go b/dm/dm/master/etcd_test.go index e3a8a8b1572..402396e0e40 100644 --- a/dm/dm/master/etcd_test.go +++ b/dm/dm/master/etcd_test.go @@ -69,7 +69,7 @@ func (t *testEtcdSuite) TestPrepareJoinEtcd(c *check.C) { cfgCluster.Name = "dm-master-1" cfgCluster.DataDir = c.MkDir() cfgCluster.MasterAddr = tempurl.Alloc()[len("http://"):] - cfgCluster.AdvertiseAddr = tempurl.Alloc()[len("http://"):] + cfgCluster.AdvertiseAddr = cfgCluster.MasterAddr cfgCluster.PeerUrls = tempurl.Alloc() c.Assert(cfgCluster.adjust(), check.IsNil) cfgClusterEtcd := genEmbedEtcdConfigWithLogger("info") @@ -79,6 +79,7 @@ func (t *testEtcdSuite) TestPrepareJoinEtcd(c *check.C) { cfgBefore := t.cloneConfig(cfgCluster) // before `prepareJoinEtcd` applied cfgBefore.DataDir = c.MkDir() // overwrite some config items cfgBefore.MasterAddr = tempurl.Alloc()[len("http://"):] + cfgBefore.AdvertiseAddr = cfgBefore.MasterAddr cfgBefore.PeerUrls = tempurl.Alloc() cfgBefore.AdvertisePeerUrls = cfgBefore.PeerUrls c.Assert(cfgBefore.adjust(), check.IsNil) @@ -172,6 +173,7 @@ func (t *testEtcdSuite) TestPrepareJoinEtcd(c *check.C) { cfgAfter2.Name = "dm-master-3" // overwrite some items cfgAfter2.DataDir = c.MkDir() cfgAfter2.MasterAddr = tempurl.Alloc()[len("http://"):] + cfgAfter2.AdvertiseAddr = cfgAfter2.MasterAddr cfgAfter2.PeerUrls = tempurl.Alloc() cfgAfter2.AdvertisePeerUrls = cfgAfter2.PeerUrls err = prepareJoinEtcd(cfgAfter2) @@ -229,6 +231,7 @@ func (t *testEtcdSuite) TestEtcdAutoCompaction(c *check.C) { cfg.DataDir = c.MkDir() cfg.MasterAddr = tempurl.Alloc()[len("http://"):] + cfg.AdvertiseAddr = cfg.MasterAddr cfg.AutoCompactionRetention = "1s" ctx, cancel := context.WithCancel(context.Background()) diff --git a/dm/dm/master/openapi_test.go b/dm/dm/master/openapi_test.go index 453a485bdce..453fee955c1 100644 --- a/dm/dm/master/openapi_test.go +++ b/dm/dm/master/openapi_test.go @@ -85,6 +85,7 @@ func (t *openAPISuite) TestRedirectRequestToLeader(c *check.C) { cfg1.Name = "dm-master-1" cfg1.DataDir = c.MkDir() cfg1.MasterAddr = tempurl.Alloc()[len("http://"):] + cfg1.AdvertiseAddr = cfg1.MasterAddr cfg1.PeerUrls = tempurl.Alloc() cfg1.AdvertisePeerUrls = cfg1.PeerUrls cfg1.InitialCluster = fmt.Sprintf("%s=%s", cfg1.Name, cfg1.AdvertisePeerUrls) @@ -106,6 +107,7 @@ func (t *openAPISuite) TestRedirectRequestToLeader(c *check.C) { cfg2.Name = "dm-master-2" cfg2.DataDir = c.MkDir() cfg2.MasterAddr = tempurl.Alloc()[len("http://"):] + cfg2.AdvertiseAddr = cfg2.MasterAddr cfg2.PeerUrls = tempurl.Alloc() cfg2.AdvertisePeerUrls = cfg2.PeerUrls cfg2.Join = cfg1.MasterAddr // join to an existing cluster @@ -144,6 +146,7 @@ func (t *openAPISuite) TestOpenAPIWillNotStartInDefaultConfig(c *check.C) { cfg1.Name = "dm-master-1" cfg1.DataDir = c.MkDir() cfg1.MasterAddr = tempurl.Alloc()[len("http://"):] + cfg1.AdvertiseAddr = cfg1.MasterAddr cfg1.PeerUrls = tempurl.Alloc() cfg1.AdvertisePeerUrls = cfg1.PeerUrls cfg1.InitialCluster = fmt.Sprintf("%s=%s", cfg1.Name, cfg1.AdvertisePeerUrls) diff --git a/dm/dm/master/server.go b/dm/dm/master/server.go index 6b02c7af1e1..128df635903 100644 --- a/dm/dm/master/server.go +++ b/dm/dm/master/server.go @@ -216,7 +216,7 @@ func (s *Server) Start(ctx context.Context) (err error) { // create an etcd client used in the whole server instance. // NOTE: we only use the local member's address now, but we can use all endpoints of the cluster if needed. - s.etcdClient, err = etcdutil.CreateClient([]string{withHost(s.cfg.MasterAddr)}, tls.TLSConfig()) + s.etcdClient, err = etcdutil.CreateClient([]string{withHost(s.cfg.AdvertiseAddr)}, tls.TLSConfig()) if err != nil { return } diff --git a/dm/dm/master/server_test.go b/dm/dm/master/server_test.go index b4bfbdcdaba..a2b0386dfaa 100644 --- a/dm/dm/master/server_test.go +++ b/dm/dm/master/server_test.go @@ -1366,40 +1366,51 @@ func (t *testMaster) TestOperateWorkerRelayTask(c *check.C) { } func (t *testMaster) TestServer(c *check.C) { + var err error cfg := NewConfig() c.Assert(cfg.Parse([]string{"-config=./dm-master.toml"}), check.IsNil) cfg.PeerUrls = "http://127.0.0.1:8294" cfg.DataDir = c.MkDir() cfg.MasterAddr = tempurl.Alloc()[len("http://"):] + cfg.AdvertiseAddr = cfg.MasterAddr + + basicServiceCheck := func(c *check.C, cfg *Config) { + t.testHTTPInterface(c, fmt.Sprintf("http://%s/status", cfg.AdvertiseAddr), []byte(utils.GetRawInfo())) + t.testHTTPInterface(c, fmt.Sprintf("http://%s/debug/pprof/", cfg.AdvertiseAddr), []byte("Types of profiles available")) + // HTTP API in this unit test is unstable, but we test it in `http_apis` in integration test. + // t.testHTTPInterface(c, fmt.Sprintf("http://%s/apis/v1alpha1/status/test-task", cfg.AdvertiseAddr), []byte("task test-task has no source or not exist")) + } + t.testNormalServerLifecycle(c, cfg, func(c *check.C, cfg *Config) { + basicServiceCheck(c, cfg) + + // try to start another server with the same address. Expect it to fail + dupServer := NewServer(cfg) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + err1 := dupServer.Start(ctx) + c.Assert(terror.ErrMasterStartEmbedEtcdFail.Equal(err1), check.IsTrue) + c.Assert(err1.Error(), check.Matches, ".*bind: address already in use.*") + }) - s := NewServer(cfg) - - ctx, cancel := context.WithCancel(context.Background()) - err1 := s.Start(ctx) - c.Assert(err1, check.IsNil) - - t.testHTTPInterface(c, fmt.Sprintf("http://%s/status", cfg.MasterAddr), []byte(utils.GetRawInfo())) - t.testHTTPInterface(c, fmt.Sprintf("http://%s/debug/pprof/", cfg.MasterAddr), []byte("Types of profiles available")) - // HTTP API in this unit test is unstable, but we test it in `http_apis` in integration test. - // t.testHTTPInterface(c, fmt.Sprintf("http://%s/apis/v1alpha1/status/test-task", cfg.MasterAddr), []byte("task test-task has no source or not exist")) - - dupServer := NewServer(cfg) - err := dupServer.Start(ctx) - c.Assert(terror.ErrMasterStartEmbedEtcdFail.Equal(err), check.IsTrue) - c.Assert(err.Error(), check.Matches, ".*bind: address already in use.*") - - // close - cancel() - s.Close() - - c.Assert(utils.WaitSomething(30, 100*time.Millisecond, func() bool { - return s.closed.Load() - }), check.IsTrue) + // test the listen address is 0.0.0.0 + masterAddrStr := tempurl.Alloc()[len("http://"):] + _, masterPort, err := net.SplitHostPort(masterAddrStr) + c.Assert(err, check.IsNil) + cfg2 := NewConfig() + *cfg2 = *cfg + cfg2.MasterAddr = fmt.Sprintf("0.0.0.0:%s", masterPort) + cfg2.AdvertiseAddr = masterAddrStr + t.testNormalServerLifecycle(c, cfg2, basicServiceCheck) } func (t *testMaster) TestMasterTLS(c *check.C) { + var err error masterAddr := tempurl.Alloc()[len("http://"):] peerAddr := tempurl.Alloc()[len("http://"):] + _, masterPort, err := net.SplitHostPort(masterAddr) + c.Assert(err, check.IsNil) + _, peerPort, err := net.SplitHostPort(peerAddr) + c.Assert(err, check.IsNil) // all with `https://` prefix cfg := NewConfig() @@ -1547,17 +1558,40 @@ func (t *testMaster) TestMasterTLS(c *check.C) { c.Assert(cfg.AdvertisePeerUrls, check.Equals, "https://"+peerAddr) c.Assert(cfg.InitialCluster, check.Equals, "master-tls=https://"+peerAddr) t.testTLSPrefix(c, cfg) + + // listen address set to 0.0.0.0 + cfg = NewConfig() + c.Assert(cfg.Parse([]string{ + "--name=master-tls", + fmt.Sprintf("--data-dir=%s", c.MkDir()), + fmt.Sprintf("--master-addr=0.0.0.0:%s", masterPort), + fmt.Sprintf("--advertise-addr=https://%s", masterAddr), + fmt.Sprintf("--peer-urls=0.0.0.0:%s", peerPort), + fmt.Sprintf("--advertise-peer-urls=https://%s", peerAddr), + fmt.Sprintf("--initial-cluster=master-tls=https://%s", peerAddr), + "--ssl-ca=./tls_for_test/ca.pem", + "--ssl-cert=./tls_for_test/dm.pem", + "--ssl-key=./tls_for_test/dm.key", + }), check.IsNil) + t.testTLSPrefix(c, cfg) } func (t *testMaster) testTLSPrefix(c *check.C, cfg *Config) { + t.testNormalServerLifecycle(c, cfg, func(c *check.C, cfg *Config) { + t.testHTTPInterface(c, fmt.Sprintf("https://%s/status", cfg.AdvertiseAddr), []byte(utils.GetRawInfo())) + t.testHTTPInterface(c, fmt.Sprintf("https://%s/debug/pprof/", cfg.AdvertiseAddr), []byte("Types of profiles available")) + }) +} + +func (t *testMaster) testNormalServerLifecycle(c *check.C, cfg *Config, checkLogic func(*check.C, *Config)) { + var err error s := NewServer(cfg) ctx, cancel := context.WithCancel(context.Background()) - err1 := s.Start(ctx) - c.Assert(err1, check.IsNil) + err = s.Start(ctx) + c.Assert(err, check.IsNil) - t.testHTTPInterface(c, fmt.Sprintf("https://%s/status", cfg.AdvertiseAddr), []byte(utils.GetRawInfo())) - t.testHTTPInterface(c, fmt.Sprintf("https://%s/debug/pprof/", cfg.AdvertiseAddr), []byte("Types of profiles available")) + checkLogic(c, cfg) // close cancel() @@ -1594,6 +1628,7 @@ func (t *testMaster) TestJoinMember(c *check.C) { cfg1.Name = "dm-master-1" cfg1.DataDir = c.MkDir() cfg1.MasterAddr = tempurl.Alloc()[len("http://"):] + cfg1.AdvertiseAddr = cfg1.MasterAddr cfg1.PeerUrls = tempurl.Alloc() cfg1.AdvertisePeerUrls = cfg1.PeerUrls cfg1.InitialCluster = fmt.Sprintf("%s=%s", cfg1.Name, cfg1.AdvertisePeerUrls) @@ -1613,6 +1648,7 @@ func (t *testMaster) TestJoinMember(c *check.C) { cfg2.Name = "dm-master-2" cfg2.DataDir = c.MkDir() cfg2.MasterAddr = tempurl.Alloc()[len("http://"):] + cfg2.AdvertiseAddr = cfg2.MasterAddr cfg2.PeerUrls = tempurl.Alloc() cfg2.AdvertisePeerUrls = cfg2.PeerUrls cfg2.Join = cfg1.MasterAddr // join to an existing cluster @@ -1647,6 +1683,7 @@ func (t *testMaster) TestJoinMember(c *check.C) { cfg3.Name = "dm-master-3" cfg3.DataDir = c.MkDir() cfg3.MasterAddr = tempurl.Alloc()[len("http://"):] + cfg3.AdvertiseAddr = cfg3.MasterAddr cfg3.PeerUrls = tempurl.Alloc() cfg3.AdvertisePeerUrls = cfg3.PeerUrls cfg3.Join = cfg1.MasterAddr // join to an existing cluster @@ -1689,7 +1726,7 @@ func (t *testMaster) TestOperateSource(c *check.C) { cfg1.Name = "dm-master-1" cfg1.DataDir = c.MkDir() cfg1.MasterAddr = tempurl.Alloc()[len("http://"):] - cfg1.AdvertiseAddr = tempurl.Alloc()[len("http://"):] + cfg1.AdvertiseAddr = cfg1.MasterAddr cfg1.PeerUrls = tempurl.Alloc() cfg1.AdvertisePeerUrls = cfg1.PeerUrls cfg1.InitialCluster = fmt.Sprintf("%s=%s", cfg1.Name, cfg1.AdvertisePeerUrls)