diff --git a/_utils/terror_gen/errors_release.txt b/_utils/terror_gen/errors_release.txt index 33a0838fc0..7126c1d240 100644 --- a/_utils/terror_gen/errors_release.txt +++ b/_utils/terror_gen/errors_release.txt @@ -115,6 +115,8 @@ ErrElectionGetLeaderIDFail,[code=11108:class=functional:scope=internal:level=med ErrBinlogInvalidFilenameWithUUIDSuffix,[code=11109:class=functional:scope=internal:level=high], "Message: invalid binlog filename with uuid suffix %s" ErrDecodeEtcdKeyFail,[code=11110:class=functional:scope=internal:level=medium], "Message: fail to decode etcd key: %s" ErrShardDDLOptimismTrySyncFail,[code=11111:class=functional:scope=internal:level=medium], "Message: fail to try sync the optimistic shard ddl lock %s: %s, Workaround: Please use show-ddl-locks command for more details." +ErrConnInvalidTLSConfig,[code=11112:class=functional:scope=internal:level=medium], "Message: invalid TLS config, Workaround: Please check the `ssl-ca`, `ssl-cert` and `ssl-key` config." +ErrConnRegistryTLSConfig,[code=11113:class=functional:scope=internal:level=medium], "Message: fail to registry TLS config" ErrConfigCheckItemNotSupport,[code=20001:class=config:scope=internal:level=medium], "Message: checking item %s is not supported\n%s, Workaround: Please check `ignore-checking-items` config in task configuration file, which can be set including `all`/`dump_privilege`/`replication_privilege`/`version`/`binlog_enable`/`binlog_format`/`binlog_row_image`/`table_schema`/`schema_of_shard_tables`/`auto_increment_ID`." ErrConfigTomlTransform,[code=20002:class=config:scope=internal:level=medium], "Message: %s, Workaround: Please check the configuration file has correct TOML format." ErrConfigTaskYamlTransform,[code=20003:class=config:scope=internal:level=medium], "Message: %s, Workaround: Please check the configuration file has correct YAML format." @@ -342,6 +344,8 @@ ErrMasterPessimistNotStarted,[code=38046:class=dm-master:scope=internal:level=me ErrMasterOptimistNotStarted,[code=38047:class=dm-master:scope=internal:level=medium], "Message: the shardddl optimist has not started" ErrMasterMasterNameNotExist,[code=38048:class=dm-master:scope=internal:level=low], "Message: dm-master with name %s not exists, Workaround: Please use list-member command to see masters." ErrMasterInvalidOfflineType,[code=38049:class=dm-master:scope=internal:level=low], "Message: offline member type %s is invalid, Workaround: Please use master/worker." +ErrMasterAdvertisePeerURLsNotValid,[code=38050:class=dm-master:scope=internal:level=high], "Message: advertise peer urls %s not valid, Workaround: Please check the `advertise-peer-urls` config in master configuration file." +ErrMasterTLSConfigNotValid,[code=38051:class=dm-master:scope=internal:level=high], "Message: TLS config not valid, Workaround: Please check the `ssl-ca`, `ssl-cert` and `ssl-key` config in master configuration file." ErrWorkerParseFlagSet,[code=40001:class=dm-worker:scope=internal:level=medium], "Message: parse dm-worker config flag set" ErrWorkerInvalidFlag,[code=40002:class=dm-worker:scope=internal:level=medium], "Message: '%s' is an invalid flag" ErrWorkerDecodeConfigFromFile,[code=40003:class=dm-worker:scope=internal:level=medium], "Message: toml decode file, Workaround: Please check the configuration file has correct TOML format." @@ -417,6 +421,7 @@ ErrWorkerSourceNotMatch,[code=40072:class=dm-worker:scope=internal:level=high], ErrWorkerFailToGetSubtaskConfigFromEtcd,[code=40073:class=dm-worker:scope=internal:level=medium], "Message: there is no relative subtask config for task %s in etcd" ErrWorkerFailToGetSourceConfigFromEtcd,[code=40074:class=dm-worker:scope=internal:level=medium], "Message: there is no relative source config for source %s in etcd" ErrWorkerDDLLockOpNotFound,[code=40075:class=dm-worker:scope=internal:level=high], "Message: missing shard DDL lock operation for shard DDL info (%s)" +ErrWorkerTLSConfigNotValid,[code=40076:class=dm-worker:scope=internal:level=high], "Message: TLS config not valid, Workaround: Please check the `ssl-ca`, `ssl-cert` and `ssl-key` config in worker configuration file." ErrTracerParseFlagSet,[code=42001:class=dm-tracer:scope=internal:level=medium], "Message: parse dm-tracer config flag set" ErrTracerConfigTomlTransform,[code=42002:class=dm-tracer:scope=internal:level=medium], "Message: config toml transform, Workaround: Please check the configuration file has correct TOML format." ErrTracerConfigInvalidFlag,[code=42003:class=dm-tracer:scope=internal:level=medium], "Message: '%s' is an invalid flag" @@ -457,4 +462,5 @@ ErrSchedulerSubTaskOpTaskNotExist,[code=46016:class=dm-master:scope=internal:lev ErrSchedulerSubTaskOpSourceNotExist,[code=46017:class=dm-master:scope=internal:level=medium], "Message: sources %v need to be operate not exist" ErrSchedulerTaskNotExist,[code=46018:class=scheduler:scope=internal:level=medium], "Message: task with name %s not exist, Workaround: Please use `query-status` command to see tasks." ErrCtlGRPCCreateConn,[code=48001:class=dmctl:scope=internal:level=high], "Message: can not create grpc connection, Workaround: Please check your network connection." +ErrCtlInvalidTLSCfg,[code=48002:class=dmctl:scope=internal:level=medium], "Message: invalid TLS config, Workaround: Please check the `ssl-ca`, `ssl-cert` and `ssl-key` config in command line." ErrNotSet,[code=50000:class=not-set:scope=not-set:level=high] diff --git a/dm/config/security.go b/dm/config/security.go new file mode 100644 index 0000000000..08dd4cd65a --- /dev/null +++ b/dm/config/security.go @@ -0,0 +1,36 @@ +// Copyright 2020 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package config + +import "fmt" + +// Security config +type Security struct { + SSLCA string `toml:"ssl-ca" json:"ssl-ca" yaml:"ssl-ca"` + SSLCert string `toml:"ssl-cert" json:"ssl-cert" yaml:"ssl-cert"` + SSLKey string `toml:"ssl-key" json:"ssl-key" yaml:"ssl-key"` + CertAllowedCN strArray `toml:"cert-allowed-cn" json:"cert-allowed-cn" yaml:"cert-allowed-cn"` +} + +// used for parse string slice in flag +type strArray []string + +func (i *strArray) String() string { + return fmt.Sprint([]string(*i)) +} + +func (i *strArray) Set(value string) error { + *i = append(*i, value) + return nil +} diff --git a/dm/config/subtask.go b/dm/config/subtask.go index 201db46efe..00e89f388a 100644 --- a/dm/config/subtask.go +++ b/dm/config/subtask.go @@ -88,6 +88,9 @@ type DBConfig struct { MaxAllowedPacket *int `toml:"max-allowed-packet" json:"max-allowed-packet" yaml:"max-allowed-packet"` Session map[string]string `toml:"session" json:"session" yaml:"session"` + // security config + Security *Security `toml:"security" json:"security" yaml:"security"` + RawDBCfg *RawDBConfig `toml:"-" json:"-" yaml:"-"` } diff --git a/dm/ctl/common/config.go b/dm/ctl/common/config.go index acee187179..24733cbee2 100644 --- a/dm/ctl/common/config.go +++ b/dm/ctl/common/config.go @@ -20,6 +20,7 @@ import ( "net" "time" + "github.com/pingcap/dm/dm/config" "github.com/pingcap/dm/pkg/utils" "github.com/BurntSushi/toml" @@ -54,6 +55,9 @@ func NewConfig() *Config { fs.StringVar(&cfg.MasterAddr, "master-addr", "", "master API server addr") fs.StringVar(&cfg.RPCTimeoutStr, "rpc-timeout", defaultRPCTimeout, fmt.Sprintf("rpc timeout, default is %s", defaultRPCTimeout)) fs.StringVar(&cfg.encrypt, EncryptCmdName, "", "encrypt plaintext to ciphertext") + fs.StringVar(&cfg.SSLCA, "ssl-ca", "", "path of file that contains list of trusted SSL CAs for connection") + fs.StringVar(&cfg.SSLCert, "ssl-cert", "", "path of file that contains X509 certificate in PEM format for connection") + fs.StringVar(&cfg.SSLKey, "ssl-key", "", "path of file that contains X509 key in PEM format for connection") fs.StringVar(&cfg.decrypt, DecryptCmdName, "", "decrypt ciphertext to plaintext") return cfg @@ -72,7 +76,9 @@ type Config struct { printVersion bool encrypt string // string need to be encrypted - decrypt string // string need to be decrypted + + config.Security + decrypt string // string need to be decrypted } func (c *Config) String() string { diff --git a/dm/ctl/common/util.go b/dm/ctl/common/util.go index 39a047b5b2..aa152518b3 100644 --- a/dm/ctl/common/util.go +++ b/dm/ctl/common/util.go @@ -20,6 +20,7 @@ import ( "strings" "time" + "github.com/pingcap/dm/dm/config" "github.com/pingcap/dm/dm/pb" parserpkg "github.com/pingcap/dm/pkg/parser" "github.com/pingcap/dm/pkg/terror" @@ -29,6 +30,7 @@ import ( "github.com/pingcap/errors" "github.com/pingcap/parser" "github.com/pingcap/parser/ast" + toolutils "github.com/pingcap/tidb-tools/pkg/utils" "github.com/spf13/cobra" "google.golang.org/grpc" ) @@ -41,12 +43,17 @@ var ( // InitUtils inits necessary dmctl utils func InitUtils(cfg *Config) error { globalConfig = cfg - return errors.Trace(InitClient(cfg.MasterAddr)) + return errors.Trace(InitClient(cfg.MasterAddr, cfg.Security)) } // InitClient initializes dm-master client -func InitClient(addr string) error { - conn, err := grpc.Dial(addr, grpc.WithInsecure(), grpc.WithBackoffMaxDelay(3*time.Second), grpc.WithBlock(), grpc.WithTimeout(3*time.Second)) +func InitClient(addr string, securityCfg config.Security) error { + tls, err := toolutils.NewTLS(securityCfg.SSLCA, securityCfg.SSLCert, securityCfg.SSLKey, "", securityCfg.CertAllowedCN) + if err != nil { + return terror.ErrCtlInvalidTLSCfg.Delegate(err) + } + + conn, err := grpc.Dial(addr, tls.ToGRPCDialOption(), grpc.WithBackoffMaxDelay(3*time.Second), grpc.WithBlock(), grpc.WithTimeout(3*time.Second)) if err != nil { return terror.ErrCtlGRPCCreateConn.AnnotateDelegate(err, "can't connect to %s", addr) } diff --git a/dm/master/config.go b/dm/master/config.go index c3b93fb6e7..9ec8fb289e 100644 --- a/dm/master/config.go +++ b/dm/master/config.go @@ -23,12 +23,14 @@ import ( "net/url" "os" "strings" + "sync/atomic" "time" "github.com/BurntSushi/toml" "go.etcd.io/etcd/embed" "go.uber.org/zap" + "github.com/pingcap/dm/dm/config" "github.com/pingcap/dm/pkg/log" "github.com/pingcap/dm/pkg/terror" "github.com/pingcap/dm/pkg/utils" @@ -75,6 +77,11 @@ func NewConfig() *Config { fs.StringVar(&cfg.AdvertisePeerUrls, "advertise-peer-urls", "", `advertise URLs for peer traffic (default "${peer-urls}")`) fs.StringVar(&cfg.Join, "join", "", `join to an existing cluster (usage: cluster's "${master-addr}" list, e.g. "127.0.0.1:8261,127.0.0.1:18261"`) + fs.StringVar(&cfg.SSLCA, "ssl-ca", "", "path of file that contains list of trusted SSL CAs for connection") + fs.StringVar(&cfg.SSLCert, "ssl-cert", "", "path of file that contains X509 certificate in PEM format for connection") + fs.StringVar(&cfg.SSLKey, "ssl-key", "", "path of file that contains X509 key in PEM format for connection") + fs.Var(&cfg.CertAllowedCN, "cert-allowed-cn", "the trusted common name that allowed to visit") + return cfg } @@ -129,6 +136,9 @@ type Config struct { Join string `toml:"join" json:"join"` // cluster's client address (endpoints), not peer address Debug bool `toml:"debug" json:"debug"` // only use for test + // tls config + config.Security + printVersion bool printSampleConfig bool } @@ -358,6 +368,25 @@ func (c *Config) genEmbedEtcdConfig(cfg *embed.Config) (*embed.Config, error) { return nil, terror.ErrMasterGenEmbedEtcdConfigFail.AnnotateDelegate(err, "fail to validate embed etcd config") } + // security config + if len(c.SSLCA) != 0 { + cfg.ClientTLSInfo.TrustedCAFile = c.SSLCA + cfg.ClientTLSInfo.CertFile = c.SSLCert + cfg.ClientTLSInfo.KeyFile = c.SSLKey + + cfg.PeerTLSInfo.TrustedCAFile = c.SSLCA + cfg.PeerTLSInfo.CertFile = c.SSLCert + cfg.PeerTLSInfo.KeyFile = c.SSLKey + + // NOTE: etcd only support one allowed CN + if len(c.CertAllowedCN) > 0 { + cfg.ClientTLSInfo.AllowedCN = c.CertAllowedCN[0] + cfg.PeerTLSInfo.AllowedCN = c.CertAllowedCN[0] + cfg.PeerTLSInfo.ClientCertAuth = len(c.SSLCA) != 0 + cfg.ClientTLSInfo.ClientCertAuth = len(c.SSLCA) != 0 + } + } + return cfg, nil } @@ -378,7 +407,11 @@ func parseURLs(s string) ([]url.URL, error) { // `127.0.0.1:8261`: first path segment in URL cannot contain colon if err != nil && (strings.Contains(err.Error(), "missing protocol scheme") || strings.Contains(err.Error(), "first path segment in URL cannot contain colon")) { - u, err = url.Parse("http://" + item) + prefix := "http://" + if atomic.LoadInt32(&useTLS) == 1 { + prefix = "https://" + } + u, err = url.Parse(prefix + item) } if err != nil { return nil, terror.ErrMasterParseURLFail.Delegate(err, item) diff --git a/dm/master/config_test.go b/dm/master/config_test.go index ba7bc34763..f7eee6ccdc 100644 --- a/dm/master/config_test.go +++ b/dm/master/config_test.go @@ -300,6 +300,10 @@ func (t *testConfigSuite) TestParseURLs(c *check.C) { str: "http://127.0.0.1:8291", urls: []url.URL{{Scheme: "http", Host: "127.0.0.1:8291"}}, }, + { + str: "https://127.0.0.1:8291", + urls: []url.URL{{Scheme: "https", Host: "127.0.0.1:8291"}}, + }, { str: "http://127.0.0.1:8291,http://127.0.0.1:18291", urls: []url.URL{ @@ -307,6 +311,13 @@ func (t *testConfigSuite) TestParseURLs(c *check.C) { {Scheme: "http", Host: "127.0.0.1:18291"}, }, }, + { + str: "https://127.0.0.1:8291,https://127.0.0.1:18291", + urls: []url.URL{ + {Scheme: "https", Host: "127.0.0.1:8291"}, + {Scheme: "https", Host: "127.0.0.1:18291"}, + }, + }, { str: "127.0.0.1:8291", // no scheme urls: []url.URL{{Scheme: "http", Host: "127.0.0.1:8291"}}, diff --git a/dm/master/election.go b/dm/master/election.go index 19e32eb2b9..b62d259a1a 100644 --- a/dm/master/election.go +++ b/dm/master/election.go @@ -23,6 +23,7 @@ import ( "github.com/pingcap/dm/pkg/log" "github.com/pingcap/failpoint" + toolutils "github.com/pingcap/tidb-tools/pkg/utils" "go.uber.org/zap" "google.golang.org/grpc" ) @@ -89,7 +90,13 @@ func (s *Server) electionNotify(ctx context.Context) { func (s *Server) createLeaderClient(leaderAddr string) { s.closeLeaderClient() - conn, err := grpc.Dial(leaderAddr, grpc.WithInsecure(), grpc.WithBackoffMaxDelay(3*time.Second)) + tls, err := toolutils.NewTLS(s.cfg.SSLCA, s.cfg.SSLCert, s.cfg.SSLKey, s.cfg.AdvertiseAddr, s.cfg.CertAllowedCN) + if err != nil { + log.L().Error("can't create grpc connection with leader, can't forward request to leader", zap.String("leader", leaderAddr), zap.Error(err)) + return + } + + conn, err := grpc.Dial(leaderAddr, tls.ToGRPCDialOption(), grpc.WithBackoffMaxDelay(3*time.Second)) if err != nil { log.L().Error("can't create grpc connection with leader, can't forward request to leader", zap.String("leader", leaderAddr), zap.Error(err)) return diff --git a/dm/master/election_test.go b/dm/master/election_test.go index 73b6e6057b..625743c1ec 100644 --- a/dm/master/election_test.go +++ b/dm/master/election_test.go @@ -69,7 +69,7 @@ func (t *testMaster) TestFailToStartLeader(c *check.C) { c.Assert(s2.Start(ctx), check.IsNil) defer s2.Close() - client, err := etcdutil.CreateClient(strings.Split(cfg1.AdvertisePeerUrls, ",")) + client, err := etcdutil.CreateClient(strings.Split(cfg1.AdvertisePeerUrls, ","), nil) c.Assert(err, check.IsNil) defer client.Close() diff --git a/dm/master/etcd.go b/dm/master/etcd.go index eb0b97cd0a..795daba2ce 100644 --- a/dm/master/etcd.go +++ b/dm/master/etcd.go @@ -22,6 +22,7 @@ import ( "strings" "time" + toolutils "github.com/pingcap/tidb-tools/pkg/utils" "go.etcd.io/etcd/embed" "go.uber.org/zap" "google.golang.org/grpc" @@ -124,8 +125,13 @@ func prepareJoinEtcd(cfg *Config) error { return nil } + tlsCfg, err := toolutils.ToTLSConfig(cfg.SSLCA, cfg.SSLCert, cfg.SSLKey) + if err != nil { + return terror.ErrMasterJoinEmbedEtcdFail.Delegate(err, "generate tls config") + } + // if without previous data, we need a client to contact with the existing cluster. - client, err := etcdutil.CreateClient(strings.Split(cfg.Join, ",")) + client, err := etcdutil.CreateClient(strings.Split(cfg.Join, ","), tlsCfg) if err != nil { return terror.ErrMasterJoinEmbedEtcdFail.Delegate(err, fmt.Sprintf("create etcd client for %s", cfg.Join)) } diff --git a/dm/master/http_handler.go b/dm/master/http_handler.go index ffb1ddddfc..4c700d35f0 100644 --- a/dm/master/http_handler.go +++ b/dm/master/http_handler.go @@ -18,6 +18,7 @@ import ( "net/http" "net/http/pprof" + "github.com/gogo/gateway" "github.com/grpc-ecosystem/grpc-gateway/runtime" "google.golang.org/grpc" @@ -45,16 +46,27 @@ func getStatusHandle() http.Handler { } // getHTTPAPIHandler returns a HTTP handler to handle DM-master APIs. -func getHTTPAPIHandler(ctx context.Context, addr string) (http.Handler, error) { +func getHTTPAPIHandler(ctx context.Context, addr string, securityOpt grpc.DialOption) (http.Handler, error) { // dial the real API server in non-blocking mode, it may not started yet. - opts := []grpc.DialOption{grpc.WithInsecure()} + opts := []grpc.DialOption{securityOpt} // NOTE: should we need to replace `host` in `addr` to `127.0.0.1`? conn, err := grpc.DialContext(ctx, addr, opts...) if err != nil { return nil, terror.ErrMasterHandleHTTPApis.Delegate(err) } - gwmux := runtime.NewServeMux() + jsonpb := &gateway.JSONPb{ + EmitDefaults: true, + Indent: " ", + OrigName: true, + } + + gwmux := runtime.NewServeMux( + runtime.WithMarshalerOption(runtime.MIMEWildcard, jsonpb), + // This is necessary to get error details properly + // marshalled in unary requests. + runtime.WithProtoErrorHandler(runtime.DefaultHTTPProtoErrorHandler), + ) err = pb.RegisterMasterHandler(ctx, gwmux, conn) if err != nil { return nil, terror.ErrMasterHandleHTTPApis.Delegate(err) diff --git a/dm/master/scheduler/scheduler.go b/dm/master/scheduler/scheduler.go index 1f79cccdca..820ee55488 100644 --- a/dm/master/scheduler/scheduler.go +++ b/dm/master/scheduler/scheduler.go @@ -138,10 +138,12 @@ type Scheduler struct { // delete: // - remove/stop subtask by user request (calling `RemoveSubTasks`). expectSubTaskStages map[string]map[string]ha.Stage + + securityCfg config.Security } // NewScheduler creates a new scheduler instance. -func NewScheduler(pLogger *log.Logger) *Scheduler { +func NewScheduler(pLogger *log.Logger, securityCfg config.Security) *Scheduler { return &Scheduler{ logger: pLogger.WithFields(zap.String("component", "scheduler")), sourceCfgs: make(map[string]config.SourceConfig), @@ -152,6 +154,7 @@ func NewScheduler(pLogger *log.Logger) *Scheduler { unbounds: make(map[string]struct{}), expectRelayStages: make(map[string]ha.Stage), expectSubTaskStages: make(map[string]map[string]ha.Stage), + securityCfg: securityCfg, } } @@ -1368,7 +1371,7 @@ func (s *Scheduler) boundSourceToWorker(source string, w *Worker) error { // this func is used when adding a new worker. // NOTE: trigger scheduler when the worker become online, not when added. func (s *Scheduler) recordWorker(info ha.WorkerInfo) (*Worker, error) { - w, err := NewWorker(info) + w, err := NewWorker(info, s.securityCfg) if err != nil { return nil, err } diff --git a/dm/master/scheduler/scheduler_test.go b/dm/master/scheduler/scheduler_test.go index 8de13f3226..11587e0c18 100644 --- a/dm/master/scheduler/scheduler_test.go +++ b/dm/master/scheduler/scheduler_test.go @@ -86,7 +86,7 @@ func (t *testScheduler) testSchedulerProgress(c *C, restart int) { var ( logger = log.L() - s = NewScheduler(&logger) + s = NewScheduler(&logger, config.Security{}) sourceID1 = "mysql-replica-1" sourceID2 = "mysql-replica-2" workerName1 = "dm-worker-1" @@ -108,7 +108,7 @@ func (t *testScheduler) testSchedulerProgress(c *C, restart int) { c.Assert(s.Start(ctx, etcdTestCli), IsNil) case restartNewInstance: s.Close() - s = NewScheduler(&logger) + s = NewScheduler(&logger, config.Security{}) c.Assert(s.Start(ctx, etcdTestCli), IsNil) } } @@ -681,7 +681,7 @@ func (t *testScheduler) TestRestartScheduler(c *C) { c.Assert(sourceCfg1.LoadFromFile(sourceSampleFile), IsNil) sourceCfg1.SourceID = sourceID1 - s := NewScheduler(&logger) + s := NewScheduler(&logger, config.Security{}) ctx, cancel := context.WithCancel(context.Background()) defer cancel() // step 1: start scheduler @@ -776,7 +776,7 @@ func (t *testScheduler) TestWatchWorkerEventEtcdCompact(c *C) { var ( logger = log.L() - s = NewScheduler(&logger) + s = NewScheduler(&logger, config.Security{}) sourceID1 = "mysql-replica-1" sourceID2 = "mysql-replica-2" workerName1 = "dm-worker-1" diff --git a/dm/master/scheduler/worker.go b/dm/master/scheduler/worker.go index ff82fcefae..e45912c1e8 100644 --- a/dm/master/scheduler/worker.go +++ b/dm/master/scheduler/worker.go @@ -18,6 +18,7 @@ import ( "sync" "time" + "github.com/pingcap/dm/dm/config" "github.com/pingcap/dm/dm/master/metrics" "github.com/pingcap/dm/dm/master/workerrpc" "github.com/pingcap/dm/pkg/ha" @@ -65,8 +66,8 @@ type Worker struct { } // NewWorker creates a new Worker instance with Offline stage. -func NewWorker(baseInfo ha.WorkerInfo) (*Worker, error) { - cli, err := workerrpc.NewGRPCClient(baseInfo.Addr) +func NewWorker(baseInfo ha.WorkerInfo, securityCfg config.Security) (*Worker, error) { + cli, err := workerrpc.NewGRPCClient(baseInfo.Addr, securityCfg) if err != nil { return nil, err } diff --git a/dm/master/scheduler/worker_test.go b/dm/master/scheduler/worker_test.go index edf2edbcd0..274b338a8e 100644 --- a/dm/master/scheduler/worker_test.go +++ b/dm/master/scheduler/worker_test.go @@ -19,6 +19,7 @@ import ( . "github.com/pingcap/check" + "github.com/pingcap/dm/dm/config" "github.com/pingcap/dm/dm/master/workerrpc" "github.com/pingcap/dm/dm/pb" "github.com/pingcap/dm/pkg/ha" @@ -38,7 +39,7 @@ func (t *testWorker) TestWorker(c *C) { ) // create a worker with Offline stage and not bound. - w, err := NewWorker(info) + w, err := NewWorker(info, config.Security{}) c.Assert(err, IsNil) defer w.Close() c.Assert(w.BaseInfo(), DeepEquals, info) diff --git a/dm/master/server.go b/dm/master/server.go index 570f69cb99..cba295cd94 100644 --- a/dm/master/server.go +++ b/dm/master/server.go @@ -16,14 +16,17 @@ package master import ( "context" "fmt" + "net" "net/http" "sort" "strings" "sync" + "sync/atomic" "time" "github.com/pingcap/errors" "github.com/pingcap/failpoint" + toolutils "github.com/pingcap/tidb-tools/pkg/utils" "github.com/siddontang/go/sync2" "go.etcd.io/etcd/clientv3" "go.etcd.io/etcd/embed" @@ -68,6 +71,10 @@ var ( // the retry interval for dm-master to confirm the dm-workers status is expected retryInterval = time.Second + // 0 means not use tls + // 1 means use tls + useTLS = int32(0) + // typically there's only one server running in one process, but testMaster.TestOfflineMember starts 3 servers, // so we need sync.Once to prevent data race registerOnce sync.Once @@ -121,7 +128,7 @@ func NewServer(cfg *Config) *Server { logger := log.L() server := Server{ cfg: cfg, - scheduler: scheduler.NewScheduler(&logger), + scheduler: scheduler.NewScheduler(&logger, cfg.Security), sqlOperatorHolder: operator.NewHolder(), idGen: tracing.NewIDGen(), ap: NewAgentPool(&RateLimitConfig{rate: cfg.RPCRateLimit, burst: cfg.RPCRateBurst}), @@ -130,6 +137,8 @@ func NewServer(cfg *Config) *Server { server.optimist = shardddl.NewOptimist(&logger) server.closed.Set(true) + setUseTLS(&cfg.Security) + return &server } @@ -155,7 +164,21 @@ func (s *Server) Start(ctx context.Context) (err error) { return } - apiHandler, err := getHTTPAPIHandler(ctx, s.cfg.MasterAddr) + tls, err := toolutils.NewTLS(s.cfg.SSLCA, s.cfg.SSLCert, s.cfg.SSLKey, s.cfg.AdvertiseAddr, s.cfg.CertAllowedCN) + if err != nil { + return terror.ErrMasterTLSConfigNotValid.Delegate(err) + } + + // tls2 is used for grpc client in grpc gateway + tls2, err := toolutils.NewTLS(s.cfg.SSLCA, s.cfg.SSLCert, s.cfg.SSLKey, s.cfg.AdvertiseAddr, s.cfg.CertAllowedCN) + if err != nil { + return terror.ErrMasterTLSConfigNotValid.Delegate(err) + } + if tls2 != nil && tls2.TLSConfig() != nil { + tls2.TLSConfig().InsecureSkipVerify = true + } + + apiHandler, err := getHTTPAPIHandler(ctx, s.cfg.MasterAddr, tls2.ToGRPCDialOption()) if err != nil { return } @@ -187,7 +210,7 @@ func (s *Server) Start(ctx context.Context) (err error) { // create an etcd client used in the whole server instance. // NOTE: we only use the local member's address now, but we can use all endpoints of the cluster if needed. - s.etcdClient, err = etcdutil.CreateClient([]string{s.cfg.MasterAddr}) + s.etcdClient, err = etcdutil.CreateClient([]string{withHost(s.cfg.MasterAddr)}, tls.TLSConfig()) if err != nil { return } @@ -1543,6 +1566,40 @@ func (s *Server) generateSubTask(ctx context.Context, task string) (*config.Task return cfg, stCfgs, nil } +func setUseTLS(tlsCfg *config.Security) { + if enableTLS(tlsCfg) { + atomic.StoreInt32(&useTLS, 1) + } else { + atomic.StoreInt32(&useTLS, 0) + } + +} + +func enableTLS(tlsCfg *config.Security) bool { + if tlsCfg == nil { + return false + } + + if len(tlsCfg.SSLCA) == 0 || len(tlsCfg.SSLCert) == 0 || len(tlsCfg.SSLKey) == 0 { + return false + } + + return true +} + +func withHost(addr string) string { + host, port, err := net.SplitHostPort(addr) + if err != nil { + // do nothing + return addr + } + if len(host) == 0 { + return fmt.Sprintf("127.0.0.1:%s", port) + } + + return addr +} + func (s *Server) removeMetaData(ctx context.Context, cfg *config.TaskConfig) error { toDB := *cfg.TargetDB toDB.Adjust() diff --git a/dm/master/server_test.go b/dm/master/server_test.go index 72a634b97e..2ed1a8fd2b 100644 --- a/dm/master/server_test.go +++ b/dm/master/server_test.go @@ -245,7 +245,7 @@ func testDefaultMasterServer(c *check.C) *Server { func testMockScheduler(ctx context.Context, wg *sync.WaitGroup, c *check.C, sources, workers []string, password string, workerClients map[string]workerrpc.Client) (*scheduler.Scheduler, []context.CancelFunc) { logger := log.L() - scheduler2 := scheduler.NewScheduler(&logger) + scheduler2 := scheduler.NewScheduler(&logger, config.Security{}) err := scheduler2.Start(ctx, etcdTestCli) c.Assert(err, check.IsNil) cancels := make([]context.CancelFunc, 0, 2) @@ -449,7 +449,7 @@ type mockDBProvider struct { // Apply will build BaseDB with DBConfig func (d *mockDBProvider) Apply(config config.DBConfig) (*conn.BaseDB, error) { - return conn.NewBaseDB(d.db), nil + return conn.NewBaseDB(d.db, func() {}), nil } func (t *testMaster) TestStartTaskWithRemoveMeta(c *check.C) { @@ -1089,7 +1089,7 @@ func (t *testMaster) TestJoinMember(c *check.C) { c.Assert(s2.Start(ctx), check.IsNil) defer s2.Close() - client, err := etcdutil.CreateClient(strings.Split(cfg1.AdvertisePeerUrls, ",")) + client, err := etcdutil.CreateClient(strings.Split(cfg1.AdvertisePeerUrls, ","), nil) c.Assert(err, check.IsNil) defer client.Close() diff --git a/dm/master/workerrpc/rawgrpc.go b/dm/master/workerrpc/rawgrpc.go index a7f1a48bf8..78300e9fd7 100644 --- a/dm/master/workerrpc/rawgrpc.go +++ b/dm/master/workerrpc/rawgrpc.go @@ -18,9 +18,11 @@ import ( "sync/atomic" "time" + toolutils "github.com/pingcap/tidb-tools/pkg/utils" "google.golang.org/grpc" "google.golang.org/grpc/backoff" + "github.com/pingcap/dm/dm/config" "github.com/pingcap/dm/dm/pb" "github.com/pingcap/dm/pkg/terror" ) @@ -42,8 +44,13 @@ func NewGRPCClientWrap(conn *grpc.ClientConn, client pb.WorkerClient) (*GRPCClie } // NewGRPCClient initializes a new grpc client from worker address -func NewGRPCClient(addr string) (*GRPCClient, error) { - conn, err := grpc.Dial(addr, grpc.WithInsecure(), grpc.WithBackoffMaxDelay(3*time.Second), +func NewGRPCClient(addr string, securityCfg config.Security) (*GRPCClient, error) { + tls, err := toolutils.NewTLS(securityCfg.SSLCA, securityCfg.SSLCert, securityCfg.SSLKey, addr, securityCfg.CertAllowedCN) + if err != nil { + return nil, terror.ErrMasterGRPCCreateConn.Delegate(err) + } + + conn, err := grpc.Dial(addr, tls.ToGRPCDialOption(), grpc.WithBackoffMaxDelay(3*time.Second), grpc.WithConnectParams(grpc.ConnectParams{ Backoff: backoff.Config{ BaseDelay: 100 * time.Millisecond, diff --git a/dm/worker/config.go b/dm/worker/config.go index 8bc228c018..5aa4461f3b 100644 --- a/dm/worker/config.go +++ b/dm/worker/config.go @@ -23,6 +23,7 @@ import ( "strings" "github.com/BurntSushi/toml" + "github.com/pingcap/dm/dm/config" "github.com/pingcap/dm/pkg/log" "github.com/pingcap/dm/pkg/terror" "github.com/pingcap/dm/pkg/utils" @@ -57,6 +58,12 @@ func NewConfig() *Config { fs.StringVar(&cfg.Join, "join", "", `join to an existing cluster (usage: dm-master cluster's "${master-addr}")`) fs.StringVar(&cfg.Name, "name", "", "human-readable name for DM-worker member") fs.Int64Var(&cfg.KeepAliveTTL, "keepalive-ttl", defaultKeepAliveTTL, "dm-worker's TTL for keepalive with etcd (in seconds)") + + fs.StringVar(&cfg.SSLCA, "ssl-ca", "", "path of file that contains list of trusted SSL CAs for connection") + fs.StringVar(&cfg.SSLCert, "ssl-cert", "", "path of file that contains X509 certificate in PEM format for connection") + fs.StringVar(&cfg.SSLKey, "ssl-key", "", "path of file that contains X509 key in PEM format for connection") + fs.Var(&cfg.CertAllowedCN, "cert-allowed-cn", "the trusted common name that allowed to visit") + return cfg } @@ -78,6 +85,9 @@ type Config struct { // TODO: in the future dm-workers should share a same ttl from dm-master KeepAliveTTL int64 `toml:"keepalive-ttl" json:"keepalive-ttl"` + // tls config + config.Security + printVersion bool printSampleConfig bool } diff --git a/dm/worker/join.go b/dm/worker/join.go index 1d7a899e8a..6acebc9a71 100644 --- a/dm/worker/join.go +++ b/dm/worker/join.go @@ -20,12 +20,14 @@ import ( "github.com/pingcap/errors" "github.com/pingcap/failpoint" + toolutils "github.com/pingcap/tidb-tools/pkg/utils" "go.uber.org/zap" "google.golang.org/grpc" "github.com/pingcap/dm/dm/pb" "github.com/pingcap/dm/pkg/ha" "github.com/pingcap/dm/pkg/log" + "github.com/pingcap/dm/pkg/terror" ) // GetJoinURLs gets the endpoints from the join address. @@ -37,6 +39,11 @@ func GetJoinURLs(addrs string) []string { // JoinMaster let dm-worker join the cluster with the specified master endpoints. func (s *Server) JoinMaster(endpoints []string) error { // TODO: grpc proxy + tls, err := toolutils.NewTLS(s.cfg.SSLCA, s.cfg.SSLCert, s.cfg.SSLKey, s.cfg.AdvertiseAddr, s.cfg.CertAllowedCN) + if err != nil { + return terror.ErrWorkerTLSConfigNotValid.Delegate(err) + } + ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -47,7 +54,7 @@ func (s *Server) JoinMaster(endpoints []string) error { for _, endpoint := range endpoints { ctx1, cancel1 := context.WithTimeout(ctx, 3*time.Second) - conn, err := grpc.DialContext(ctx1, endpoint, grpc.WithBlock(), grpc.WithInsecure(), grpc.WithBackoffMaxDelay(3*time.Second)) + conn, err := grpc.DialContext(ctx1, endpoint, grpc.WithBlock(), tls.ToGRPCDialOption(), grpc.WithBackoffMaxDelay(3*time.Second)) cancel1() if err != nil { if conn != nil { diff --git a/dm/worker/server.go b/dm/worker/server.go index 068d3d79c4..eb9371c13e 100644 --- a/dm/worker/server.go +++ b/dm/worker/server.go @@ -33,6 +33,7 @@ import ( "github.com/pingcap/dm/syncer" "github.com/pingcap/errors" + toolutils "github.com/pingcap/tidb-tools/pkg/utils" "github.com/siddontang/go-mysql/mysql" "github.com/siddontang/go/sync2" "github.com/soheilhy/cmux" @@ -82,11 +83,16 @@ func NewServer(cfg *Config) *Server { // Start starts to serving func (s *Server) Start() error { - var err error - s.rootLis, err = net.Listen("tcp", s.cfg.WorkerAddr) + tls, err := toolutils.NewTLS(s.cfg.SSLCA, s.cfg.SSLCert, s.cfg.SSLKey, s.cfg.AdvertiseAddr, s.cfg.CertAllowedCN) + if err != nil { + return terror.ErrWorkerTLSConfigNotValid.Delegate(err) + } + + rootLis, err := net.Listen("tcp", s.cfg.WorkerAddr) if err != nil { return terror.ErrWorkerStartService.Delegate(err) } + s.rootLis = tls.WrapListener(rootLis) log.L().Info("Start Server") s.setWorker(nil, true) @@ -96,6 +102,7 @@ func (s *Server) Start() error { DialTimeout: dialTimeout, DialKeepAliveTime: keepaliveTime, DialKeepAliveTimeout: keepaliveTimeout, + TLS: tls.TLSConfig(), }) if err != nil { return err @@ -133,12 +140,15 @@ func (s *Server) Start() error { // create a cmux m := cmux.New(s.rootLis) + m.SetReadTimeout(cmuxReadTimeout) // set a timeout, ref: https://github.com/pingcap/tidb-binlog/pull/352 // match connections in order: first gRPC, then HTTP grpcL := m.MatchWithWriters(cmux.HTTP2MatchHeaderFieldSendSettings("content-type", "application/grpc")) + httpL := m.Match(cmux.HTTP1Fast()) + // NOTE: don't need to set tls config, because rootLis already use tls s.svr = grpc.NewServer() pb.RegisterWorkerServer(s.svr, s) go func() { diff --git a/errors.toml b/errors.toml index 9274c56bda..76f9e00ab9 100644 --- a/errors.toml +++ b/errors.toml @@ -700,6 +700,18 @@ description = "" workaround = "Please use show-ddl-locks command for more details." tags = ["internal", "medium"] +[error.DM-functional-11112] +message = "invalid TLS config" +description = "" +workaround = "Please check the `ssl-ca`, `ssl-cert` and `ssl-key` config." +tags = ["internal", "medium"] + +[error.DM-functional-11113] +message = "fail to registry TLS config" +description = "" +workaround = "" +tags = ["internal", "medium"] + [error.DM-config-20001] message = "checking item %s is not supported\n%s" description = "" @@ -2062,6 +2074,18 @@ description = "" workaround = "Please use master/worker." tags = ["internal", "low"] +[error.DM-dm-master-38050] +message = "advertise peer urls %s not valid" +description = "" +workaround = "Please check the `advertise-peer-urls` config in master configuration file." +tags = ["internal", "high"] + +[error.DM-dm-master-38051] +message = "TLS config not valid" +description = "" +workaround = "Please check the `ssl-ca`, `ssl-cert` and `ssl-key` config in master configuration file." +tags = ["internal", "high"] + [error.DM-dm-worker-40001] message = "parse dm-worker config flag set" description = "" @@ -2512,6 +2536,12 @@ description = "" workaround = "" tags = ["internal", "high"] +[error.DM-dm-worker-40076] +message = "TLS config not valid" +description = "" +workaround = "Please check the `ssl-ca`, `ssl-cert` and `ssl-key` config in worker configuration file." +tags = ["internal", "high"] + [error.DM-dm-tracer-42001] message = "parse dm-tracer config flag set" description = "" @@ -2752,6 +2782,12 @@ description = "" workaround = "Please check your network connection." tags = ["internal", "high"] +[error.DM-dmctl-48002] +message = "invalid TLS config" +description = "" +workaround = "Please check the `ssl-ca`, `ssl-cert` and `ssl-key` config in command line." +tags = ["internal", "medium"] + [error.DM-not-set-50000] message = "" description = "" diff --git a/go.mod b/go.mod index 21ce3b4c07..c1c6bef96c 100644 --- a/go.mod +++ b/go.mod @@ -10,6 +10,7 @@ require ( github.com/docker/go-units v0.4.0 github.com/dustin/go-humanize v1.0.0 github.com/go-sql-driver/mysql v1.5.0 + github.com/gogo/gateway v1.1.0 github.com/gogo/protobuf v1.3.1 github.com/golang/groupcache v0.0.0-20191027212112-611e8accdfc9 // indirect github.com/golang/mock v1.3.1 diff --git a/go.sum b/go.sum index fb899460ad..90f73a7661 100644 --- a/go.sum +++ b/go.sum @@ -161,8 +161,11 @@ github.com/go-sql-driver/mysql v1.4.1/go.mod h1:zAC/RDZ24gD3HViQzih4MyKcchzm+sOG github.com/go-sql-driver/mysql v1.5.0 h1:ozyZYNQW3x3HtqT1jira07DN2PArx2v7/mN66gGcHOs= github.com/go-sql-driver/mysql v1.5.0/go.mod h1:DCzpHaOWr8IXmIStZouvnhqoel9Qv2LBy8hT2VhHyBg= github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY= +github.com/gogo/gateway v1.1.0 h1:u0SuhL9+Il+UbjM9VIE3ntfRujKbvVpFvNB4HbjeVQ0= +github.com/gogo/gateway v1.1.0/go.mod h1:S7rR8FRQyG3QFESeSv4l2WnsyzlCLG0CzBbUUo/mbic= github.com/gogo/protobuf v0.0.0-20180717141946-636bf0302bc9/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ= github.com/gogo/protobuf v1.1.1/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ= +github.com/gogo/protobuf v1.2.0/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ= github.com/gogo/protobuf v1.2.1 h1:/s5zKNz0uPFCZ5hddgPdo2TK2TVrUNMn0OOX8/aZMTE= github.com/gogo/protobuf v1.2.1/go.mod h1:hp+jE20tsWTFYpLwKvXlhS1hjn+gTNwPg2I6zVXpSg4= github.com/gogo/protobuf v1.3.1 h1:DqDEcV5aeaTmdFBePNpYsp3FlcVH/2ISVVM9Qf8PSls= @@ -224,6 +227,7 @@ github.com/grpc-ecosystem/go-grpc-middleware v1.1.0 h1:THDBEeQ9xZ8JEaCLyLQqXMMdR github.com/grpc-ecosystem/go-grpc-middleware v1.1.0/go.mod h1:f5nM7jw/oeRSadq3xCzHAvxcr8HZnzsqU6ILg/0NiiE= github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 h1:Ovs26xHkKqVztRpIrF/92BcuyuQ/YW4NSIpoGtfXNho= github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0/go.mod h1:8NvIoxWQoOIhqOTXgfV/d3M/q6VIi02HzZEHgUlZvzk= +github.com/grpc-ecosystem/grpc-gateway v1.8.5/go.mod h1:vNeuVxBJEsws4ogUvrchl83t/GYV9WGTSLVdBhOQFDY= github.com/grpc-ecosystem/grpc-gateway v1.9.0/go.mod h1:vNeuVxBJEsws4ogUvrchl83t/GYV9WGTSLVdBhOQFDY= github.com/grpc-ecosystem/grpc-gateway v1.9.5/go.mod h1:vNeuVxBJEsws4ogUvrchl83t/GYV9WGTSLVdBhOQFDY= github.com/grpc-ecosystem/grpc-gateway v1.12.1 h1:zCy2xE9ablevUOrUZc3Dl72Dt+ya2FNAvC2yLYMHzi4= @@ -740,6 +744,7 @@ google.golang.org/genproto v0.0.0-20191114150713-6bbd007550de h1:dFEMUWudT9iV1JM google.golang.org/genproto v0.0.0-20191114150713-6bbd007550de/go.mod h1:n3cpQtvxv34hfy77yVDNjmbRyujviMdxYliBSkLhpCc= google.golang.org/grpc v0.0.0-20180607172857-7a6a684ca69e/go.mod h1:yo6s7OP7yaDglbqo1J04qKzAhqBH6lvTonzMVmEdcZw= google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c= +google.golang.org/grpc v1.19.1/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c= google.golang.org/grpc v1.21.0/go.mod h1:oYelfM1adQP15Ek0mdvEgi9Df8B9CZIaU1084ijfRaM= google.golang.org/grpc v1.23.0/go.mod h1:Y5yQAOtifL1yxbo5wqy6BxZv8vAUGQwXBOALyacEbxg= google.golang.org/grpc v1.23.1 h1:q4XQuHFC6I28BKZpo6IYyb3mNO+l7lSOxRuYTCiDfXk= diff --git a/pkg/conn/basedb.go b/pkg/conn/basedb.go index e324a85318..ae63fd1d0f 100644 --- a/pkg/conn/basedb.go +++ b/pkg/conn/basedb.go @@ -18,13 +18,20 @@ import ( "database/sql" "fmt" "net/url" + "strconv" "sync" + "sync/atomic" "github.com/pingcap/dm/dm/config" "github.com/pingcap/dm/pkg/retry" "github.com/pingcap/dm/pkg/terror" + + "github.com/go-sql-driver/mysql" + toolutils "github.com/pingcap/tidb-tools/pkg/utils" ) +var customID int64 + // DBProvider providers BaseDB instance type DBProvider interface { Apply(config config.DBConfig) (*BaseDB, error) @@ -46,6 +53,26 @@ func (d *DefaultDBProviderImpl) Apply(config config.DBConfig) (*BaseDB, error) { dsn := fmt.Sprintf("%s:%s@tcp(%s:%d)/?charset=utf8mb4&interpolateParams=true&maxAllowedPacket=%d", config.User, config.Password, config.Host, config.Port, *config.MaxAllowedPacket) + doFuncInClose := func() {} + if config.Security != nil && len(config.Security.SSLCA) != 0 && + len(config.Security.SSLCert) != 0 && len(config.Security.SSLKey) != 0 { + tlsConfig, err := toolutils.ToTLSConfig(config.Security.SSLCA, config.Security.SSLCert, config.Security.SSLKey) + if err != nil { + return nil, terror.ErrConnInvalidTLSConfig.Delegate(err) + } + + name := "dm" + strconv.FormatInt(atomic.AddInt64(&customID, 1), 10) + err = mysql.RegisterTLSConfig(name, tlsConfig) + if err != nil { + return nil, terror.ErrConnRegistryTLSConfig.Delegate(err) + } + dsn += "&tls=" + name + + doFuncInClose = func() { + mysql.DeregisterTLSConfig(name) + } + } + var maxIdleConns int rawCfg := config.RawDBCfg if rawCfg != nil { @@ -76,7 +103,7 @@ func (d *DefaultDBProviderImpl) Apply(config config.DBConfig) (*BaseDB, error) { db.SetMaxIdleConns(maxIdleConns) - return NewBaseDB(db), nil + return NewBaseDB(db, doFuncInClose), nil } // BaseDB wraps *sql.DB, control the BaseConn @@ -88,12 +115,15 @@ type BaseDB struct { conns map[*BaseConn]struct{} Retry retry.Strategy + + // this function will do when close the BaseDB + doFuncInClose func() } // NewBaseDB returns *BaseDB object -func NewBaseDB(db *sql.DB) *BaseDB { +func NewBaseDB(db *sql.DB, doFuncInClose func()) *BaseDB { conns := make(map[*BaseConn]struct{}) - return &BaseDB{DB: db, conns: conns, Retry: &retry.FiniteRetryStrategy{}} + return &BaseDB{DB: db, conns: conns, Retry: &retry.FiniteRetryStrategy{}, doFuncInClose: doFuncInClose} } // GetBaseConn retrieves *BaseConn which has own retryStrategy @@ -136,8 +166,11 @@ func (d *BaseDB) Close() error { } } terr := d.DB.Close() + d.doFuncInClose() + if err == nil { return terr } + return err } diff --git a/pkg/conn/basedb_test.go b/pkg/conn/basedb_test.go index 16dfd5ac89..2f365aaf18 100644 --- a/pkg/conn/basedb_test.go +++ b/pkg/conn/basedb_test.go @@ -14,9 +14,9 @@ package conn import ( - "github.com/DATA-DOG/go-sqlmock" . "github.com/pingcap/check" + sqlmock "github.com/DATA-DOG/go-sqlmock" tcontext "github.com/pingcap/dm/pkg/context" ) @@ -29,7 +29,7 @@ func (t *testBaseDBSuite) TestGetBaseConn(c *C) { db, mock, err := sqlmock.New() c.Assert(err, IsNil) - baseDB := NewBaseDB(db) + baseDB := NewBaseDB(db, func() {}) tctx := tcontext.Background() diff --git a/pkg/election/election_test.go b/pkg/election/election_test.go index 33fe55933f..bcc59ac991 100644 --- a/pkg/election/election_test.go +++ b/pkg/election/election_test.go @@ -97,7 +97,7 @@ func testElection2After1(t *testElectionSuite, c *C, normalExit bool) { addr2 = "127.0.0.1:2" addr3 = "127.0.0.1:3" ) - cli, err := etcdutil.CreateClient([]string{t.endPoint}) + cli, err := etcdutil.CreateClient([]string{t.endPoint}, nil) c.Assert(err, IsNil) defer cli.Close() ctx0, cancel0 := context.WithTimeout(context.Background(), time.Second) @@ -224,7 +224,7 @@ func (t *testElectionSuite) TestElectionAlways1(c *C) { addr1 = "127.0.0.1:1234" addr2 = "127.0.0.1:2345" ) - cli, err := etcdutil.CreateClient([]string{t.endPoint}) + cli, err := etcdutil.CreateClient([]string{t.endPoint}, nil) c.Assert(err, IsNil) defer cli.Close() @@ -294,7 +294,7 @@ func (t *testElectionSuite) TestElectionEvictLeader(c *C) { addr1 = "127.0.0.1:1234" addr2 = "127.0.0.1:2345" ) - cli, err := etcdutil.CreateClient([]string{t.endPoint}) + cli, err := etcdutil.CreateClient([]string{t.endPoint}, nil) c.Assert(err, IsNil) defer cli.Close() @@ -358,7 +358,7 @@ func (t *testElectionSuite) TestElectionDeleteKey(c *C) { ID = "member" addr = "127.0.0.1:1234" ) - cli, err := etcdutil.CreateClient([]string{t.endPoint}) + cli, err := etcdutil.CreateClient([]string{t.endPoint}, nil) c.Assert(err, IsNil) defer cli.Close() diff --git a/pkg/etcdutil/etcdutil.go b/pkg/etcdutil/etcdutil.go index 79110202fe..fbfba8dc02 100644 --- a/pkg/etcdutil/etcdutil.go +++ b/pkg/etcdutil/etcdutil.go @@ -17,6 +17,7 @@ package etcdutil import ( "context" + "crypto/tls" "time" "github.com/pingcap/errors" @@ -67,10 +68,11 @@ var etcdDefaultTxnRetryParam = retry.Params{ var etcdDefaultTxnStrategy = retry.FiniteRetryStrategy{} // CreateClient creates an etcd client with some default config items. -func CreateClient(endpoints []string) (*clientv3.Client, error) { +func CreateClient(endpoints []string, tlsCfg *tls.Config) (*clientv3.Client, error) { return clientv3.New(clientv3.Config{ Endpoints: endpoints, DialTimeout: DefaultDialTimeout, + TLS: tlsCfg, }) } diff --git a/pkg/etcdutil/etcdutil_test.go b/pkg/etcdutil/etcdutil_test.go index b68a9de9a2..274bc20999 100644 --- a/pkg/etcdutil/etcdutil_test.go +++ b/pkg/etcdutil/etcdutil_test.go @@ -101,7 +101,7 @@ func (t *testEtcdUtilSuite) startEtcd(c *C, cfg *embed.Config) *embed.Etcd { } func (t *testEtcdUtilSuite) createEtcdClient(c *C, cfg *embed.Config) *clientv3.Client { - cli, err := CreateClient(t.urlsToStrings(cfg.LCUrls)) + cli, err := CreateClient(t.urlsToStrings(cfg.LCUrls), nil) c.Assert(err, IsNil) return cli } diff --git a/pkg/terror/error_list.go b/pkg/terror/error_list.go index daae22af16..95bcd57c6a 100644 --- a/pkg/terror/error_list.go +++ b/pkg/terror/error_list.go @@ -152,6 +152,10 @@ const ( // pkg/shardddl/optimism codeShardDDLOptimismTrySyncFail + + // pkg/conn + codeConnInvalidTLSConfig + codeConnRegistryTLSConfig ) // Config related error code list @@ -419,6 +423,8 @@ const ( codeMasterOptimistNotStarted codeMasterMasterNameNotExist codeMasterInvalidOfflineType + codeMasterAdvertisePeerURLsNotValid + codeMasterTLSConfigNotValid ) // DM-worker error code @@ -498,6 +504,7 @@ const ( codeWorkerFailToGetSubtaskConfigFromEtcd codeWorkerFailToGetSourceConfigFromEtcd codeWorkerDDLLockOpNotFound + codeWorkerTLSConfigNotValid ) // DM-tracer error code @@ -554,6 +561,7 @@ const ( // dmctl error code const ( codeCtlGRPCCreateConn ErrCode = iota + 48001 + codeCtlInvalidTLSCfg ) // default error code @@ -701,6 +709,10 @@ var ( // pkg/shardddl/optimism ErrShardDDLOptimismTrySyncFail = New(codeShardDDLOptimismTrySyncFail, ClassFunctional, ScopeInternal, LevelMedium, "fail to try sync the optimistic shard ddl lock %s: %s", "Please use show-ddl-locks command for more details.") + // pkg/conn + ErrConnInvalidTLSConfig = New(codeConnInvalidTLSConfig, ClassFunctional, ScopeInternal, LevelMedium, "invalid TLS config", "Please check the `ssl-ca`, `ssl-cert` and `ssl-key` config.") + ErrConnRegistryTLSConfig = New(codeConnRegistryTLSConfig, ClassFunctional, ScopeInternal, LevelMedium, "fail to registry TLS config", "") + // Config related error ErrConfigCheckItemNotSupport = New(codeConfigCheckItemNotSupport, ClassConfig, ScopeInternal, LevelMedium, "checking item %s is not supported\n%s", "Please check `ignore-checking-items` config in task configuration file, which can be set including `all`/`dump_privilege`/`replication_privilege`/`version`/`binlog_enable`/`binlog_format`/`binlog_row_image`/`table_schema`/`schema_of_shard_tables`/`auto_increment_ID`.") ErrConfigTomlTransform = New(codeConfigTomlTransform, ClassConfig, ScopeInternal, LevelMedium, "%s", "Please check the configuration file has correct TOML format.") @@ -950,6 +962,9 @@ var ( ErrMasterMasterNameNotExist = New(codeMasterMasterNameNotExist, ClassDMMaster, ScopeInternal, LevelLow, "dm-master with name %s not exists", "Please use list-member command to see masters.") ErrMasterInvalidOfflineType = New(codeMasterInvalidOfflineType, ClassDMMaster, ScopeInternal, LevelLow, "offline member type %s is invalid", "Please use master/worker.") + ErrMasterAdvertisePeerURLsNotValid = New(codeMasterAdvertisePeerURLsNotValid, ClassDMMaster, ScopeInternal, LevelHigh, "advertise peer urls %s not valid", "Please check the `advertise-peer-urls` config in master configuration file.") + ErrMasterTLSConfigNotValid = New(codeMasterTLSConfigNotValid, ClassDMMaster, ScopeInternal, LevelHigh, "TLS config not valid", "Please check the `ssl-ca`, `ssl-cert` and `ssl-key` config in master configuration file.") + // DM-worker error ErrWorkerParseFlagSet = New(codeWorkerParseFlagSet, ClassDMWorker, ScopeInternal, LevelMedium, "parse dm-worker config flag set", "") ErrWorkerInvalidFlag = New(codeWorkerInvalidFlag, ClassDMWorker, ScopeInternal, LevelMedium, "'%s' is an invalid flag", "") @@ -1026,8 +1041,8 @@ var ( ErrWorkerFailToGetSubtaskConfigFromEtcd = New(codeWorkerFailToGetSubtaskConfigFromEtcd, ClassDMWorker, ScopeInternal, LevelMedium, "there is no relative subtask config for task %s in etcd", "") ErrWorkerFailToGetSourceConfigFromEtcd = New(codeWorkerFailToGetSourceConfigFromEtcd, ClassDMWorker, ScopeInternal, LevelMedium, "there is no relative source config for source %s in etcd", "") - - ErrWorkerDDLLockOpNotFound = New(codeWorkerDDLLockOpNotFound, ClassDMWorker, ScopeInternal, LevelHigh, "missing shard DDL lock operation for shard DDL info (%s)", "") + ErrWorkerDDLLockOpNotFound = New(codeWorkerDDLLockOpNotFound, ClassDMWorker, ScopeInternal, LevelHigh, "missing shard DDL lock operation for shard DDL info (%s)", "") + ErrWorkerTLSConfigNotValid = New(codeWorkerTLSConfigNotValid, ClassDMWorker, ScopeInternal, LevelHigh, "TLS config not valid", "Please check the `ssl-ca`, `ssl-cert` and `ssl-key` config in worker configuration file.") // DM-tracer error ErrTracerParseFlagSet = New(codeTracerParseFlagSet, ClassDMTracer, ScopeInternal, LevelMedium, "parse dm-tracer config flag set", "") @@ -1084,6 +1099,7 @@ var ( // dmctl ErrCtlGRPCCreateConn = New(codeCtlGRPCCreateConn, ClassDMCtl, ScopeInternal, LevelHigh, "can not create grpc connection", "Please check your network connection.") + ErrCtlInvalidTLSCfg = New(codeCtlInvalidTLSCfg, ClassDMCtl, ScopeInternal, LevelMedium, "invalid TLS config", "Please check the `ssl-ca`, `ssl-cert` and `ssl-key` config in command line.") // default error ErrNotSet = New(codeNotSet, ClassNotSet, ScopeNotSet, LevelHigh, "", "") diff --git a/pkg/tracing/tracer.go b/pkg/tracing/tracer.go index d90508494b..38fefae93a 100644 --- a/pkg/tracing/tracer.go +++ b/pkg/tracing/tracer.go @@ -78,6 +78,7 @@ func (t *Tracer) Enable() bool { // Start starts tracing service func (t *Tracer) Start() { + // TODO: use tls conn, err := grpc.Dial(t.cfg.TracerAddr, grpc.WithInsecure(), grpc.WithBackoffMaxDelay(3*time.Second)) if err != nil { t.logger.Error("grpc dial failed", log.ShortError(err)) diff --git a/syncer/syncer_test.go b/syncer/syncer_test.go index ff59e86140..913cdce799 100644 --- a/syncer/syncer_test.go +++ b/syncer/syncer_test.go @@ -22,6 +22,7 @@ import ( "testing" "time" + sqlmock "github.com/DATA-DOG/go-sqlmock" "github.com/pingcap/dm/dm/config" "github.com/pingcap/dm/dm/pb" "github.com/pingcap/dm/pkg/binlog" @@ -36,7 +37,6 @@ import ( streamer2 "github.com/pingcap/dm/pkg/streamer" "github.com/pingcap/dm/pkg/utils" - "github.com/DATA-DOG/go-sqlmock" _ "github.com/go-sql-driver/mysql" . "github.com/pingcap/check" "github.com/pingcap/parser" @@ -950,7 +950,7 @@ func (s *testSyncerSuite) TestGeneratedColumn(c *C) { // use upstream dbConn as mock downstream dbConn, err := db.Conn(context.Background()) c.Assert(err, IsNil) - syncer.fromDB = &UpStreamConn{BaseDB: conn.NewBaseDB(db)} + syncer.fromDB = &UpStreamConn{BaseDB: conn.NewBaseDB(db, func() {})} syncer.ddlDBConn = &DBConn{cfg: s.cfg, baseConn: conn.NewBaseConn(dbConn, &retry.FiniteRetryStrategy{})} syncer.toDBConns = []*DBConn{{cfg: s.cfg, baseConn: conn.NewBaseConn(dbConn, &retry.FiniteRetryStrategy{})}} syncer.reset() @@ -1127,7 +1127,7 @@ func (s *testSyncerSuite) TestRun(c *C) { s.cfg.DisableCausality = false syncer := NewSyncer(s.cfg, nil) - syncer.fromDB = &UpStreamConn{BaseDB: conn.NewBaseDB(db)} + syncer.fromDB = &UpStreamConn{BaseDB: conn.NewBaseDB(db, func() {})} syncer.toDBConns = []*DBConn{{cfg: s.cfg, baseConn: conn.NewBaseConn(dbConn, &retry.FiniteRetryStrategy{})}, {cfg: s.cfg, baseConn: conn.NewBaseConn(dbConn, &retry.FiniteRetryStrategy{})}} syncer.ddlDBConn = &DBConn{cfg: s.cfg, baseConn: conn.NewBaseConn(dbConn, &retry.FiniteRetryStrategy{})} diff --git a/tests/_dmctl_tools/check_master_online.go b/tests/_dmctl_tools/check_master_online.go index 746b529540..a1a986fb78 100644 --- a/tests/_dmctl_tools/check_master_online.go +++ b/tests/_dmctl_tools/check_master_online.go @@ -22,12 +22,26 @@ import ( "github.com/pingcap/dm/dm/pb" "github.com/pingcap/dm/tests/utils" + toolutils "github.com/pingcap/tidb-tools/pkg/utils" ) // use show-ddl-locks request to test DM-master is online func main() { addr := os.Args[1] - conn, err := grpc.Dial(addr, grpc.WithInsecure(), grpc.WithBackoffMaxDelay(2*time.Second)) + + secureOpt := grpc.WithInsecure() + if len(os.Args) == 5 { + sslCA := os.Args[2] + sslCert := os.Args[3] + sslKey := os.Args[4] + tls, err := toolutils.NewTLS(sslCA, sslCert, sslKey, "", nil) + if err != nil { + utils.ExitWithError(err) + } + secureOpt = tls.ToGRPCDialOption() + } + + conn, err := grpc.Dial(addr, secureOpt, grpc.WithBackoffMaxDelay(2*time.Second)) if err != nil { utils.ExitWithError(err) } diff --git a/tests/_dmctl_tools/check_master_online_http.go b/tests/_dmctl_tools/check_master_online_http.go new file mode 100644 index 0000000000..ea126db44e --- /dev/null +++ b/tests/_dmctl_tools/check_master_online_http.go @@ -0,0 +1,62 @@ +// Copyright 2020 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package main + +import ( + "fmt" + "io/ioutil" + "net/http" + "os" + + "github.com/pingcap/dm/tests/utils" + + toolutils "github.com/pingcap/tidb-tools/pkg/utils" +) + +// use show-ddl-locks request to test DM-master is online +func main() { + addr := os.Args[1] + sslCA := "" + sslCert := "" + sslKey := "" + transport := http.DefaultTransport.(*http.Transport).Clone() + + if len(os.Args) == 5 { + sslCA = os.Args[2] + sslCert = os.Args[3] + sslKey = os.Args[4] + + tls, err := toolutils.NewTLS(sslCA, sslCert, sslKey, "", nil) + if err != nil { + utils.ExitWithError(err) + } + + tlsCfg := tls.TLSConfig() + tlsCfg.InsecureSkipVerify = true + transport.TLSClientConfig = tlsCfg + } + + client := &http.Client{Transport: transport} + + resp, err := client.Get("https://" + addr + "/status") + if err != nil { + utils.ExitWithError(err) + } + defer resp.Body.Close() + body, err := ioutil.ReadAll(resp.Body) + if err != nil { + utils.ExitWithError(err) + } + fmt.Println(string(body)) +} diff --git a/tests/_dmctl_tools/check_worker_online.go b/tests/_dmctl_tools/check_worker_online.go index 3d3058209f..0bcbb804c2 100644 --- a/tests/_dmctl_tools/check_worker_online.go +++ b/tests/_dmctl_tools/check_worker_online.go @@ -22,12 +22,26 @@ import ( "github.com/pingcap/dm/dm/pb" "github.com/pingcap/dm/tests/utils" + toolutils "github.com/pingcap/tidb-tools/pkg/utils" ) // use query status request to test DM-worker is online func main() { addr := os.Args[1] - conn, err := grpc.Dial(addr, grpc.WithInsecure(), grpc.WithBackoffMaxDelay(2*time.Second)) + + secureOpt := grpc.WithInsecure() + if len(os.Args) == 5 { + sslCA := os.Args[2] + sslCert := os.Args[3] + sslKey := os.Args[4] + tls, err := toolutils.NewTLS(sslCA, sslCert, sslKey, "", nil) + if err != nil { + utils.ExitWithError(err) + } + secureOpt = tls.ToGRPCDialOption() + } + + conn, err := grpc.Dial(addr, secureOpt, grpc.WithBackoffMaxDelay(2*time.Second)) if err != nil { utils.ExitWithError(err) } diff --git a/tests/_utils/check_rpc_alive b/tests/_utils/check_rpc_alive index 85b96fab44..10abf5523c 100755 --- a/tests/_utils/check_rpc_alive +++ b/tests/_utils/check_rpc_alive @@ -5,10 +5,21 @@ check_tool=$1 addr=$2 +ssl_ca="" +ssl_cert="" +ssl_key="" + + +if [ $# == 5 ] ; then + ssl_ca=$3 + ssl_cert=$4 + ssl_key=$5 +fi + i=0 while [ $i -lt 10 ] do - bash -c "$check_tool $addr > /dev/null 2>&1" + bash -c "$check_tool $addr $ssl_ca $ssl_cert $ssl_key" ret=$? if [ "$ret" == 0 ]; then echo "rpc addr $addr is alive" diff --git a/tests/_utils/run_dm_ctl_with_tls b/tests/_utils/run_dm_ctl_with_tls new file mode 100755 index 0000000000..694d8ef902 --- /dev/null +++ b/tests/_utils/run_dm_ctl_with_tls @@ -0,0 +1,44 @@ +#!/bin/bash +# tools to run dmctl from command line +# parameter 1: work directory +# parameter 2: master-addr port +# parameter 3: ssl-ca path +# parameter 4: ssl-cert path +# parameter 5: ssl-key path +# parameter 6: command + +workdir=$1 +master_addr=$2 +ssl_ca=$3 +ssl_cert=$4 +ssl_key=$5 +cmd=$6 + + + +shift 6 + +PWD=$(pwd) +binary=$PWD/bin/dmctl.test +ts=$(date +"%s") +dmctl_log=$workdir/dmctl.$ts.log +pid=$$ +echo "dmctl test cmd: \"$cmd\"" +echo "$cmd" | $binary -test.coverprofile="$TEST_DIR/cov.$TEST_NAME.dmctl.$ts.$pid.out" DEVEL -master-addr=$master_addr --ssl-ca $ssl_ca --ssl-cert $ssl_cert --ssl-key $ssl_key > $dmctl_log 2>&1 + +for ((i=1; i<$#; i+=2)); do + j=$((i+1)) + value=${!i} + expected=${!j} + got=$(sed "s/$value/$value\n/g" $dmctl_log | grep -c "$value") + if [ "$got" != "$expected" ]; then + echo "command: $cmd $value count: $got != expected: $expected" + cat $dmctl_log + exit 1 + fi +done + +# gocovmerge doesn't support merge profiles with different modes, however atomic +# mode and count mode have the same profile format, so we need to unify cover +# mode before running gocovmerge. As coverage file is not generated synchronously, +# we will patch covermode before `make coverage` diff --git a/tests/_utils/run_tidb_server b/tests/_utils/run_tidb_server index 161bb877ad..3cc2226311 100755 --- a/tests/_utils/run_tidb_server +++ b/tests/_utils/run_tidb_server @@ -6,6 +6,7 @@ set -eu PORT=$1 PASSWORD=$2 + TEST_DIR=/tmp/dm_test echo "Starting TiDB on port ${PORT}" diff --git a/tests/all_mode/run.sh b/tests/all_mode/run.sh index fa11eafd8a..d2be307b56 100755 --- a/tests/all_mode/run.sh +++ b/tests/all_mode/run.sh @@ -81,7 +81,6 @@ function run() { dmctl_operate_source create $WORK_DIR/source1.toml $SOURCE_ID1 dmctl_operate_source create $WORK_DIR/source2.toml $SOURCE_ID2 - # start DM task only cp $cur/conf/dm-task.yaml $WORK_DIR/dm-task.yaml # test deprecated config @@ -116,11 +115,11 @@ function run() { sleep 2 # wait for task running - check_http_alive 127.0.0.1:$MASTER_PORT/apis/${API_VERSION}/status/test '"name":"test","stage":"Running"' 10 + check_http_alive 127.0.0.1:$MASTER_PORT/apis/${API_VERSION}/status/test '"stage": "Running"' 10 sleep 2 # still wait for subtask running on other dm-workers # wait for task running - check_http_alive 127.0.0.1:$MASTER_PORT/apis/${API_VERSION}/status/test '"name":"test","stage":"Running"' 10 + check_http_alive 127.0.0.1:$MASTER_PORT/apis/${API_VERSION}/status/test '"stage": "Running"' 10 sleep 2 # still wait for subtask running on other dm-workers # kill tidb diff --git a/tests/dmctl_command/run.sh b/tests/dmctl_command/run.sh index 05684eefc2..34d474fd5e 100644 --- a/tests/dmctl_command/run.sh +++ b/tests/dmctl_command/run.sh @@ -6,7 +6,7 @@ cur=$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd ) source $cur/../_utils/test_prepare WORK_DIR=$TEST_DIR/$TEST_NAME -help_cnt=39 +help_cnt=42 function run() { # check dmctl alone output diff --git a/tests/ha/run.sh b/tests/ha/run.sh index f92682bcf8..acc87d16bd 100755 --- a/tests/ha/run.sh +++ b/tests/ha/run.sh @@ -64,7 +64,7 @@ function run() { sleep 8 echo "wait and check task running" - check_http_alive 127.0.0.1:$MASTER_PORT/apis/${API_VERSION}/status/test '"name":"test","stage":"Running"' 10 + check_http_alive 127.0.0.1:$MASTER_PORT/apis/${API_VERSION}/status/test '"stage": "Running"' 10 echo "query-status from all dm-master" run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT1" \ diff --git a/tests/ha_cases/run.sh b/tests/ha_cases/run.sh index 332728e86b..15b79d912b 100755 --- a/tests/ha_cases/run.sh +++ b/tests/ha_cases/run.sh @@ -150,7 +150,7 @@ function test_kill_master() { echo "waiting 5 seconds" sleep 5 echo "check task is running" - check_http_alive 127.0.0.1:$MASTER_PORT2/apis/${API_VERSION}/status/test '"name":"test","stage":"Running"' 10 + check_http_alive 127.0.0.1:$MASTER_PORT2/apis/${API_VERSION}/status/test '"stage": "Running"' 10 echo "check master2,3 are running" run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT2" \ @@ -188,7 +188,7 @@ function test_kill_and_isolate_worker() { check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER3_PORT echo "wait and check task running" - check_http_alive 127.0.0.1:$MASTER_PORT/apis/${API_VERSION}/status/test '"name":"test","stage":"Running"' 10 + check_http_alive 127.0.0.1:$MASTER_PORT/apis/${API_VERSION}/status/test '"stage": "Running"' 10 run_dm_worker $WORK_DIR/worker4 $WORKER4_PORT $cur/conf/dm-worker4.toml check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER4_PORT @@ -199,7 +199,7 @@ function test_kill_and_isolate_worker() { rm -rf $WORK_DIR/worker3/relay_log echo "wait and check task running" - check_http_alive 127.0.0.1:$MASTER_PORT/apis/${API_VERSION}/status/test '"name":"test","stage":"Running"' 10 + check_http_alive 127.0.0.1:$MASTER_PORT/apis/${API_VERSION}/status/test '"stage": "Running"' 10 run_dm_worker $WORK_DIR/worker3 $WORKER3_PORT $cur/conf/dm-worker3.toml check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER3_PORT @@ -275,7 +275,7 @@ function test_kill_master_in_sync() { echo "wait and check task running" sleep 1 - check_http_alive 127.0.0.1:$MASTER_PORT2/apis/${API_VERSION}/status/test '"name":"test","stage":"Running"' 10 + check_http_alive 127.0.0.1:$MASTER_PORT2/apis/${API_VERSION}/status/test '"stage": "Running"' 10 run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT2" \ "query-status test" \ "\"stage\": \"Running\"" 2 @@ -311,7 +311,7 @@ function test_kill_worker_in_sync() { echo "wait and check task running" - check_http_alive 127.0.0.1:$MASTER_PORT/apis/${API_VERSION}/status/test '"name":"test","stage":"Running"' 10 + check_http_alive 127.0.0.1:$MASTER_PORT/apis/${API_VERSION}/status/test '"stage": "Running"' 10 echo "query-status from all dm-master" run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT1" \ diff --git a/tests/ha_master/run.sh b/tests/ha_master/run.sh index 7cdda88122..c58dba03dd 100755 --- a/tests/ha_master/run.sh +++ b/tests/ha_master/run.sh @@ -47,7 +47,7 @@ function test_evict_leader() { # evict leader twice, and test evict leader from http interface curl -X PUT 127.0.0.1:$LEADER_PORT/apis/v1alpha1/leader/1 > $WORK_DIR/evict_leader.log - check_log_contains $WORK_DIR/evict_leader.log "\"result\":true" 1 + check_log_contains $WORK_DIR/evict_leader.log "\"result\": true" 1 # will get_leader failed because evict leader on all master, so just skip if [ $i = 4 ]; then @@ -68,7 +68,7 @@ function test_evict_leader() { echo "cancel evict leader twice, and test cancel evict leader from http interface" curl -X PUT 127.0.0.1:$MASTER_PORT1/apis/v1alpha1/leader/2 > $WORK_DIR/cancel_evict_leader.log - check_log_contains $WORK_DIR/cancel_evict_leader.log "\"result\":true" 1 + check_log_contains $WORK_DIR/cancel_evict_leader.log "\"result\": true" 1 LEADER_NAME=$(get_leader $WORK_DIR 127.0.0.1:${MASTER_PORT1}) echo "leader is $LEADER_NAME" @@ -307,7 +307,7 @@ function run() { check_port_offline $MASTER_PORT2 20 echo "wait and check task running" - check_http_alive 127.0.0.1:$MASTER_PORT3/apis/${API_VERSION}/status/test '"name":"test","stage":"Running"' 10 + check_http_alive 127.0.0.1:$MASTER_PORT3/apis/${API_VERSION}/status/test '"stage": "Running"' 10 run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT3" \ "query-status test" \ "\"stage\": \"Running\"" 2 diff --git a/tests/http_apis/run.sh b/tests/http_apis/run.sh index b0343632d0..a056618e71 100644 --- a/tests/http_apis/run.sh +++ b/tests/http_apis/run.sh @@ -34,8 +34,8 @@ function run() { rm $WORK_DIR/source1.toml.bak echo $source_data curl -X PUT 127.0.0.1:$MASTER_PORT/apis/${API_VERSION}/sources -d '{"op": 1, "config": ["'"$source_data"'"]}' > $WORK_DIR/create-source.log - check_log_contains $WORK_DIR/create-source.log "\"result\":true" 1 - check_log_contains $WORK_DIR/create-source.log "\"source\":\"$SOURCE_ID1\"" 1 + check_log_contains $WORK_DIR/create-source.log "\"result\": true" 2 + check_log_contains $WORK_DIR/create-source.log "\"source\": \"$SOURCE_ID1\"" 1 echo "start task and check stage" cat $cur/conf/dm-task.yaml | sed 's/$/\\n/' | sed 's/"/\\"/g' | tr -d '\n' > $WORK_DIR/task.yaml.bak @@ -46,12 +46,12 @@ function run() { check_http_alive 127.0.0.1:$MASTER_PORT/apis/${API_VERSION}/status/test-task "task test-task has no source or not exist" 3 curl -X POST 127.0.0.1:$MASTER_PORT/apis/${API_VERSION}/tasks -d '{"task": "'"$task_data"'"}' > $WORK_DIR/start-task.log - check_log_contains $WORK_DIR/start-task.log "\"result\":true" 1 + check_log_contains $WORK_DIR/start-task.log "\"result\": true" 2 sleep 1 curl -X GET 127.0.0.1:$MASTER_PORT/apis/${API_VERSION}/status/test > $WORK_DIR/status.log - check_log_contains $WORK_DIR/status.log "\"stage\":\"Running\"" 1 - check_log_contains $WORK_DIR/status.log "\"name\":\"test\"" 1 + check_log_contains $WORK_DIR/status.log "\"stage\": \"Running\"" 1 + check_log_contains $WORK_DIR/status.log "\"name\": \"test\"" 1 echo "get sub task configs" curl -X GET 127.0.0.1:$MASTER_PORT/apis/${API_VERSION}/subtasks/test > $WORK_DIR/subtask.log @@ -63,36 +63,38 @@ function run() { echo "pause task and check stage" curl -X PUT 127.0.0.1:$MASTER_PORT/apis/${API_VERSION}/tasks/test -d '{ "op": 2 }' > $WORK_DIR/pause.log - check_log_contains $WORK_DIR/pause.log "\"op\":\"Pause\"" 1 + check_log_contains $WORK_DIR/pause.log "\"op\": \"Pause\"" 1 sleep 1 curl -X GET 127.0.0.1:$MASTER_PORT/apis/${API_VERSION}/status/test > $WORK_DIR/status.log - check_log_contains $WORK_DIR/status.log "\"stage\":\"Paused\"" 1 - check_log_contains $WORK_DIR/status.log "\"name\":\"test\"" 1 + check_log_contains $WORK_DIR/status.log "\"stage\": \"Paused\"" 1 + check_log_contains $WORK_DIR/status.log "\"name\": \"test\"" 1 echo "resume task and check stage" curl -X PUT 127.0.0.1:$MASTER_PORT/apis/${API_VERSION}/tasks/test -d '{ "op": 3 }' > $WORK_DIR/resume.log - check_log_contains $WORK_DIR/resume.log "\"op\":\"Resume\"" 1 + check_log_contains $WORK_DIR/resume.log "\"op\": \"Resume\"" 1 sleep 1 curl -X GET 127.0.0.1:$MASTER_PORT/apis/${API_VERSION}/status/test > $WORK_DIR/status.log - check_log_contains $WORK_DIR/status.log "\"stage\":\"Running\"" 1 - check_log_contains $WORK_DIR/status.log "\"name\":\"test\"" 1 + check_log_contains $WORK_DIR/status.log "\"stage\": \"Running\"" 1 + check_log_contains $WORK_DIR/status.log "\"name\": \"test\"" 1 sleep 1 curl -X GET "127.0.0.1:$MASTER_PORT/apis/${API_VERSION}/members?leader=true&master=true&worker=true" > $WORK_DIR/list-member.log - check_log_contains $WORK_DIR/list-member.log "\"leader\":{\"name\":\"master1\"" 1 - check_log_contains $WORK_DIR/list-member.log "\"masters\":\[{\"name\":\"master1\"" 1 - check_log_contains $WORK_DIR/list-member.log "\"workers\":\[{\"name\":\"worker1\"" 1 - check_log_contains $WORK_DIR/list-member.log "\"stage\":\"bound\"" 1 - check_log_contains $WORK_DIR/list-member.log "\"source\":\"mysql-replica-01\"" 1 + check_log_contains $WORK_DIR/list-member.log "leader" 1 + check_log_contains $WORK_DIR/list-member.log "masters" 1 + check_log_contains $WORK_DIR/list-member.log "workers" 1 + check_log_contains $WORK_DIR/list-member.log "\"name\": \"master1\"" 2 # one in leader, one in masters + check_log_contains $WORK_DIR/list-member.log "\"name\": \"worker1\"" 1 + check_log_contains $WORK_DIR/list-member.log "\"stage\": \"bound\"" 1 + check_log_contains $WORK_DIR/list-member.log "\"source\": \"mysql-replica-01\"" 1 sleep 1 echo "kill dm-worker1" ps aux | grep dm-worker1 |awk '{print $2}'|xargs kill || true check_port_offline $WORKER1_PORT 20 curl -X DELETE 127.0.0.1:$MASTER_PORT/apis/${API_VERSION}/members/worker/worker1 > $WORK_DIR/offline-worker.log - check_log_contains $WORK_DIR/offline-worker.log "\"result\":true" 1 + check_log_contains $WORK_DIR/offline-worker.log "\"result\": true" 1 echo "check data" check_sync_diff $WORK_DIR $cur/conf/diff_config.toml diff --git a/tests/others_integration.txt b/tests/others_integration.txt index fe0ed0fbb5..27610d24b4 100644 --- a/tests/others_integration.txt +++ b/tests/others_integration.txt @@ -1,3 +1,4 @@ +tls sharding2 ha ha_master diff --git a/tests/tls/conf/ca.pem b/tests/tls/conf/ca.pem new file mode 100644 index 0000000000..9fc215fa83 --- /dev/null +++ b/tests/tls/conf/ca.pem @@ -0,0 +1,8 @@ +-----BEGIN CERTIFICATE----- +MIIBGDCBwAIJAOjYXLFw5V1HMAoGCCqGSM49BAMCMBQxEjAQBgNVBAMMCWxvY2Fs +aG9zdDAgFw0yMDAzMTcxMjAwMzNaGA8yMjkzMTIzMTEyMDAzM1owFDESMBAGA1UE +AwwJbG9jYWxob3N0MFkwEwYHKoZIzj0CAQYIKoZIzj0DAQcDQgAEglCIJD8uVBfD +kuM+UQP+VA7Srbz17WPLA0Sqc+sQ2p6fT6HYKCW60EXiZ/yEC0925iyVbXEEbX4J +xCc2Heow5TAKBggqhkjOPQQDAgNHADBEAiAILL3Zt/3NFeDW9c9UAcJ9lc92E0ZL +GNDuH6i19Fex3wIgT0ZMAKAFSirGGtcLu0emceuk+zVKjJzmYbsLdpj/JuQ= +-----END CERTIFICATE----- diff --git a/tests/tls/conf/dm-master1.toml b/tests/tls/conf/dm-master1.toml new file mode 100644 index 0000000000..73b2016072 --- /dev/null +++ b/tests/tls/conf/dm-master1.toml @@ -0,0 +1,11 @@ +# Master Configuration. +name = "master1" +master-addr = ":8261" +advertise-addr = "127.0.0.1:8261" +peer-urls = "127.0.0.1:8291" +initial-cluster = "master1=https://127.0.0.1:8291,master2=https://127.0.0.1:8292,master3=https://127.0.0.1:8293" + +ssl-ca = "dir-placeholer/ca.pem" +ssl-cert = "dir-placeholer/dm.pem" +ssl-key = "dir-placeholer/dm.key" +cert-allowed-cn = ["dm"] diff --git a/tests/tls/conf/dm-master2.toml b/tests/tls/conf/dm-master2.toml new file mode 100644 index 0000000000..a3d7e9f2fc --- /dev/null +++ b/tests/tls/conf/dm-master2.toml @@ -0,0 +1,11 @@ +# Master Configuration. +name = "master2" +master-addr = ":8361" +advertise-addr = "127.0.0.1:8361" +peer-urls = "127.0.0.1:8292" +initial-cluster = "master1=https://127.0.0.1:8291,master2=https://127.0.0.1:8292,master3=https://127.0.0.1:8293" + +ssl-ca = "dir-placeholer/ca.pem" +ssl-cert = "dir-placeholer/dm.pem" +ssl-key = "dir-placeholer/dm.key" +cert-allowed-cn = ["dm"] \ No newline at end of file diff --git a/tests/tls/conf/dm-master3.toml b/tests/tls/conf/dm-master3.toml new file mode 100644 index 0000000000..8bd1bf4eb7 --- /dev/null +++ b/tests/tls/conf/dm-master3.toml @@ -0,0 +1,11 @@ +# Master Configuration. +name = "master3" +master-addr = ":8461" +advertise-addr = "127.0.0.1:8461" +peer-urls = "127.0.0.1:8293" +initial-cluster = "master1=https://127.0.0.1:8291,master2=https://127.0.0.1:8292,master3=https://127.0.0.1:8293" + +ssl-ca = "dir-placeholer/ca.pem" +ssl-cert = "dir-placeholer/dm.pem" +ssl-key = "dir-placeholer/dm.key" +cert-allowed-cn = ["dm"] \ No newline at end of file diff --git a/tests/tls/conf/dm-task.yaml b/tests/tls/conf/dm-task.yaml new file mode 100644 index 0000000000..89ffbf6b1c --- /dev/null +++ b/tests/tls/conf/dm-task.yaml @@ -0,0 +1,46 @@ +--- +name: test +task-mode: all +is-sharding: false +meta-schema: "dm_meta" +# enable-heartbeat: true +timezone: "Asia/Shanghai" + +target-database: + host: "127.0.0.1" + port: 4400 + user: "root" + password: "" + security: + ssl-ca: "dir-placeholer/ca.pem" + ssl-cert: "dir-placeholer/dm.pem" + ssl-key: "dir-placeholer/dm.key" + +mysql-instances: + - source-id: "mysql-replica-01" + black-white-list: "instance" + mydumper-config-name: "global" + loader-config-name: "global" + syncer-config-name: "global" + +black-white-list: + instance: + do-dbs: ["tls"] + +mydumpers: + global: + mydumper-path: "./bin/mydumper" + threads: 4 + chunk-filesize: 0 + skip-tz-utc: true + extra-args: "--statement-size=100" + +loaders: + global: + pool-size: 16 + dir: "./dumped_data" + +syncers: + global: + worker-count: 16 + batch: 100 diff --git a/tests/tls/conf/dm-worker1.toml b/tests/tls/conf/dm-worker1.toml new file mode 100644 index 0000000000..7e57bcf274 --- /dev/null +++ b/tests/tls/conf/dm-worker1.toml @@ -0,0 +1,7 @@ +name = "worker1" +join = "127.0.0.1:8261" + +ssl-ca = "dir-placeholer/ca.pem" +ssl-cert = "dir-placeholer/dm.pem" +ssl-key = "dir-placeholer/dm.key" +cert-allowed-cn = ["dm"] \ No newline at end of file diff --git a/tests/tls/conf/dm.key b/tests/tls/conf/dm.key new file mode 100644 index 0000000000..dfdc077bc4 --- /dev/null +++ b/tests/tls/conf/dm.key @@ -0,0 +1,8 @@ +-----BEGIN EC PARAMETERS----- +BggqhkjOPQMBBw== +-----END EC PARAMETERS----- +-----BEGIN EC PRIVATE KEY----- +MHcCAQEEICF/GDtVxhTPTP501nOu4jgwGSDY01xN+61xd9MfChw+oAoGCCqGSM49 +AwEHoUQDQgAEgQOv5bQO7xK16vZWhwJqlz2vl19+AXW2Ql7KQyGiBJVSvLbyDLOr +kIeFlHN04iqQ39SKSOSfeGSfRt6doU6IcA== +-----END EC PRIVATE KEY----- diff --git a/tests/tls/conf/dm.pem b/tests/tls/conf/dm.pem new file mode 100644 index 0000000000..d4f846e3a2 --- /dev/null +++ b/tests/tls/conf/dm.pem @@ -0,0 +1,10 @@ +-----BEGIN CERTIFICATE----- +MIIBZDCCAQqgAwIBAgIJAIT/lgXUc1JqMAoGCCqGSM49BAMCMBQxEjAQBgNVBAMM +CWxvY2FsaG9zdDAgFw0yMDAzMTcxMjAwMzNaGA8yMjkzMTIzMTEyMDAzM1owDTEL +MAkGA1UEAwwCZG0wWTATBgcqhkjOPQIBBggqhkjOPQMBBwNCAASBA6/ltA7vErXq +9laHAmqXPa+XX34BdbZCXspDIaIElVK8tvIMs6uQh4WUc3TiKpDf1IpI5J94ZJ9G +3p2hTohwo0owSDAaBgNVHREEEzARgglsb2NhbGhvc3SHBH8AAAEwCwYDVR0PBAQD +AgWgMB0GA1UdJQQWMBQGCCsGAQUFBwMCBggrBgEFBQcDATAKBggqhkjOPQQDAgNI +ADBFAiEAx6ljJ+tNa55ypWLGNqmXlB4UdMmKmE4RSKJ8mmEelfECIG2ZmCE59rv5 +wImM6KnK+vM2QnEiISH3PeYyyRzQzycu +-----END CERTIFICATE----- diff --git a/tests/tls/conf/generate_tls.sh b/tests/tls/conf/generate_tls.sh new file mode 100644 index 0000000000..fca4020ad2 --- /dev/null +++ b/tests/tls/conf/generate_tls.sh @@ -0,0 +1,28 @@ +#!/bin/bash + +# this script used to generate tls file + +cat - > "ipsan.cnf" < /dev/null + +for role in dm other; do + openssl ecparam -out "$role.key" -name prime256v1 -genkey + openssl req -new -batch -sha256 -subj "/CN=${role}" -key "$role.key" -out "$role.csr" + openssl x509 -req -sha256 -days 100000 -extensions EXT -extfile "ipsan.cnf" -in "$role.csr" -CA "ca.pem" -CAkey "ca.key" -CAcreateserial -out "$role.pem" 2> /dev/null +done \ No newline at end of file diff --git a/tests/tls/conf/other.key b/tests/tls/conf/other.key new file mode 100644 index 0000000000..ee95ca5faa --- /dev/null +++ b/tests/tls/conf/other.key @@ -0,0 +1,8 @@ +-----BEGIN EC PARAMETERS----- +BggqhkjOPQMBBw== +-----END EC PARAMETERS----- +-----BEGIN EC PRIVATE KEY----- +MHcCAQEEICzbWZZ7dtha0lGXlBiP3QjCurPs5ExsR5thIQCcKCKeoAoGCCqGSM49 +AwEHoUQDQgAEt5z9ACtEnsqv0ZPFx5YJhvBNQZJCEb75ZS/kDBiPoISea1HMt1w8 +4ZkeWW+SBCwt0RtwzVPRq9VUGWaFRUOwdQ== +-----END EC PRIVATE KEY----- diff --git a/tests/tls/conf/other.pem b/tests/tls/conf/other.pem new file mode 100644 index 0000000000..5710106ffe --- /dev/null +++ b/tests/tls/conf/other.pem @@ -0,0 +1,10 @@ +-----BEGIN CERTIFICATE----- +MIIBZzCCAQ2gAwIBAgIJAIT/lgXUc1JrMAoGCCqGSM49BAMCMBQxEjAQBgNVBAMM +CWxvY2FsaG9zdDAgFw0yMDAzMTcxMjAwMzNaGA8yMjkzMTIzMTEyMDAzM1owEDEO +MAwGA1UEAwwFb3RoZXIwWTATBgcqhkjOPQIBBggqhkjOPQMBBwNCAAS3nP0AK0Se +yq/Rk8XHlgmG8E1BkkIRvvllL+QMGI+ghJ5rUcy3XDzhmR5Zb5IELC3RG3DNU9Gr +1VQZZoVFQ7B1o0owSDAaBgNVHREEEzARgglsb2NhbGhvc3SHBH8AAAEwCwYDVR0P +BAQDAgWgMB0GA1UdJQQWMBQGCCsGAQUFBwMCBggrBgEFBQcDATAKBggqhkjOPQQD +AgNIADBFAiEA34/Vz7SaJWqYOgOLyr+y1OwiT9R7yTgBQCSSvGC+HpsCIA20BhNe +RnicYz+9qOQRxAFP1wpIyMMgOK4tKuZhx+/J +-----END CERTIFICATE----- diff --git a/tests/tls/conf/source1.toml b/tests/tls/conf/source1.toml new file mode 100644 index 0000000000..7b08068ef2 --- /dev/null +++ b/tests/tls/conf/source1.toml @@ -0,0 +1,13 @@ +# MySQL Configuration. + +source-id = "mysql-replica-01" +flavor = "" +enable-gtid = true +relay-binlog-name = "" +relay-binlog-gtid = "" + +[from] +host = "127.0.0.1" +user = "root" +password = "/Q7B9DizNLLTTfiZHv9WoEAKamfpIUs=" +port = 3306 diff --git a/tests/tls/run.sh b/tests/tls/run.sh new file mode 100644 index 0000000000..38733da24a --- /dev/null +++ b/tests/tls/run.sh @@ -0,0 +1,129 @@ +#!/bin/bash + +set -eu + +cur=$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd ) +source $cur/../_utils/test_prepare + +WORK_DIR=$TEST_DIR/$TEST_NAME + +API_VERSION="v1alpha1" + +function run_tidb_with_tls() { + echo "run a new tidb server with tls" + cat - > "$WORK_DIR/tidb-tls-config.toml" < $WORK_DIR/pause.log || cat $WORK_DIR/pause.log + #check_log_contains $WORK_DIR/pause.log "\"result\": true" 2 + + #echo "query status" + #curl -X GET --cacert "$cur/conf/ca.pem" --key "$cur/conf/dm.key" --cert "$cur/conf/dm.pem" https://127.0.0.1:$MASTER_PORT1/apis/$API_VERSION/status/test > $WORK_DIR/status.log || cat $WORK_DIR/status.log + #check_log_contains $WORK_DIR/status.log "\"stage\": \"Paused\"" 1 + + sleep 1 + + echo "check data" + mysql -uroot -h127.0.0.1 -P4400 --default-character-set utf8 --ssl-ca $cur/conf/ca.pem --ssl-cert $cur/conf/dm.pem --ssl-key $cur/conf/dm.key -E -e "select count(*) from tls.t" > "$TEST_DIR/sql_res.$TEST_NAME.txt" + check_contains "count(*): 20" +} + +cleanup_data tls +cleanup_process + +run $* + +# kill the tidb with tls +pkill -hup tidb-server 2>/dev/null || true +wait_process_exit tidb-server + +run_tidb_server 4000 $TIDB_PASSWORD + +cleanup_process + +echo "[$(date)] <<<<<< test case $TEST_NAME success! >>>>>>" diff --git a/tests/utils/dmctl.go b/tests/utils/dmctl.go index 5cda2633a8..3a87761fc0 100644 --- a/tests/utils/dmctl.go +++ b/tests/utils/dmctl.go @@ -26,6 +26,7 @@ import ( ) func CreateDmCtl(addr string) (pb.MasterClient, error) { + // TODO: use tls, this function is not used conn, err := grpc.Dial(addr, grpc.WithInsecure(), grpc.WithBackoffMaxDelay(3*time.Second)) if err != nil { return nil, errors.Trace(err)