diff --git a/cmd/tidb-lightning-ctl/main.go b/cmd/tidb-lightning-ctl/main.go index f83283d066cfb..07d5a8693f74e 100644 --- a/cmd/tidb-lightning-ctl/main.go +++ b/cmd/tidb-lightning-ctl/main.go @@ -38,62 +38,34 @@ func main() { } func run() error { - cfg := config.NewConfig() - fs := flag.NewFlagSet("lightning-ctl", flag.ExitOnError) - - fs.StringVar(&cfg.ConfigFile, "config", "", "tidb-lightning configuration file") + var ( + compact *bool + mode, flagImportEngine, flagCleanupEngine *string + cpRemove, cpErrIgnore, cpErrDestroy, cpDump *string - logLevel := fs.String("L", "", `log level: info, debug, warn, error, fatal (default "info")`) - logFilePath := fs.String("log-file", "", "log file path") - tidbHost := fs.String("tidb-host", "", "TiDB server host") - tidbPort := fs.Int("tidb-port", 0, "TiDB server port (default 4000)") - tidbUser := fs.String("tidb-user", "", "TiDB user name to connect") - pdAddr := fs.String("pd-urls", "", "PD endpoint address") - importerAddr := fs.String("importer", "", "address (host:port) to connect to tikv-importer") + fsUsage func() + ) - compact := fs.Bool("compact", false, "do manual compaction on the target cluster") - mode := fs.String("switch-mode", "", "switch tikv into import mode or normal mode, values can be ['import', 'normal']") + globalCfg := config.Must(config.LoadGlobalConfig(os.Args[1:], func(fs *flag.FlagSet) { + compact = fs.Bool("compact", false, "do manual compaction on the target cluster") + mode = fs.String("switch-mode", "", "switch tikv into import mode or normal mode, values can be ['import', 'normal']") - flagImportEngine := fs.String("import-engine", "", "manually import a closed engine (value can be '`db`.`table`:123' or a UUID") - flagCleanupEngine := fs.String("cleanup-engine", "", "manually delete a closed engine") + flagImportEngine = fs.String("import-engine", "", "manually import a closed engine (value can be '`db`.`table`:123' or a UUID") + flagCleanupEngine = fs.String("cleanup-engine", "", "manually delete a closed engine") - cpRemove := fs.String("checkpoint-remove", "", "remove the checkpoint associated with the given table (value can be 'all' or '`db`.`table`')") - cpErrIgnore := fs.String("checkpoint-error-ignore", "", "ignore errors encoutered previously on the given table (value can be 'all' or '`db`.`table`'); may corrupt this table if used incorrectly") - cpErrDestroy := fs.String("checkpoint-error-destroy", "", "deletes imported data with table which has an error before (value can be 'all' or '`db`.`table`')") - cpDump := fs.String("checkpoint-dump", "", "dump the checkpoint information as two CSV files in the given folder") + cpRemove = fs.String("checkpoint-remove", "", "remove the checkpoint associated with the given table (value can be 'all' or '`db`.`table`')") + cpErrIgnore = fs.String("checkpoint-error-ignore", "", "ignore errors encoutered previously on the given table (value can be 'all' or '`db`.`table`'); may corrupt this table if used incorrectly") + cpErrDestroy = fs.String("checkpoint-error-destroy", "", "deletes imported data with table which has an error before (value can be 'all' or '`db`.`table`')") + cpDump = fs.String("checkpoint-dump", "", "dump the checkpoint information as two CSV files in the given folder") - err := fs.Parse(os.Args[1:]) - if err == nil { - err = cfg.Load() - } - if err != nil { - return errors.Trace(err) - } + fsUsage = fs.Usage + })) - if *logLevel != "" { - cfg.App.Config.Level = *logLevel - } - if *logFilePath != "" { - cfg.App.Config.File = *logFilePath - } - if *tidbHost != "" { - cfg.TiDB.Host = *tidbHost - } - if *tidbPort != 0 { - cfg.TiDB.Port = *tidbPort - } - if *tidbUser != "" { - cfg.TiDB.User = *tidbUser - } - if *pdAddr != "" { - cfg.TiDB.PdAddr = *pdAddr - } - if *importerAddr != "" { - cfg.TikvImporter.Addr = *importerAddr + cfg := config.NewConfig() + if err := cfg.LoadFromGlobal(globalCfg); err != nil { + return err } - - err = cfg.Adjust() - if err != nil { + if err := cfg.Adjust(); err != nil { return err } @@ -125,7 +97,7 @@ func run() error { return errors.Trace(checkpointDump(ctx, cfg, *cpDump)) } - fs.Usage() + fsUsage() return nil } diff --git a/cmd/tidb-lightning/main.go b/cmd/tidb-lightning/main.go index 6f5a6e53d2bf8..9bf6100c6ef89 100644 --- a/cmd/tidb-lightning/main.go +++ b/cmd/tidb-lightning/main.go @@ -14,40 +14,20 @@ package main import ( - "flag" "fmt" _ "net/http/pprof" "os" "os/signal" "syscall" - "github.com/pingcap/errors" "github.com/pingcap/tidb-lightning/lightning" "github.com/pingcap/tidb-lightning/lightning/config" "github.com/pingcap/tidb-lightning/lightning/log" - plan "github.com/pingcap/tidb/planner/core" "go.uber.org/zap" ) -func setGlobalVars() { - // hardcode it - plan.SetPreparedPlanCache(true) - plan.PreparedPlanCacheCapacity = 10 -} - func main() { - setGlobalVars() - - cfg, err := config.LoadConfig(os.Args[1:]) - switch errors.Cause(err) { - case nil: - case flag.ErrHelp: - os.Exit(0) - default: - fmt.Println("Failed to parse command flags: ", err) - os.Exit(1) - } - + cfg := config.Must(config.LoadGlobalConfig(os.Args[1:], nil)) app := lightning.New(cfg) sc := make(chan os.Signal, 1) @@ -63,7 +43,14 @@ func main() { app.Stop() }() - err = app.Run() + go app.Serve() + + var err error + if cfg.App.ServerMode { + err = app.RunServer() + } else { + err = app.RunOnce() + } logger := log.L() if err != nil { logger.Error("tidb lightning encountered error", zap.Error(err)) diff --git a/lightning/common/util_test.go b/lightning/common/util_test.go index 4e7c26ef52396..d12f1bbd4ae68 100644 --- a/lightning/common/util_test.go +++ b/lightning/common/util_test.go @@ -22,6 +22,7 @@ import ( "net/http" "net/http/httptest" "testing" + "time" "github.com/go-sql-driver/mysql" . "github.com/pingcap/check" @@ -65,10 +66,13 @@ func (s *utilSuite) TestGetJSON(c *C) { handle(res, req) })) defer testServer.Close() + + client := &http.Client{Timeout: time.Second} + response := TestPayload{} - err := common.GetJSON(http.DefaultClient, "http://not-exists", &response) + err := common.GetJSON(client, "http://not-exists", &response) c.Assert(err, NotNil) - err = common.GetJSON(http.DefaultClient, testServer.URL, &response) + err = common.GetJSON(client, testServer.URL, &response) c.Assert(err, IsNil) c.Assert(request, DeepEquals, response) @@ -76,7 +80,7 @@ func (s *utilSuite) TestGetJSON(c *C) { handle = func(res http.ResponseWriter, req *http.Request) { res.WriteHeader(http.StatusNoContent) } - err = common.GetJSON(http.DefaultClient, testServer.URL, &response) + err = common.GetJSON(client, testServer.URL, &response) c.Assert(err, NotNil) c.Assert(err, ErrorMatches, ".*http status code != 200.*") } diff --git a/lightning/config/config.go b/lightning/config/config.go index 1b72338f15dab..ce1f4c416fcf6 100644 --- a/lightning/config/config.go +++ b/lightning/config/config.go @@ -15,9 +15,7 @@ package config import ( "encoding/json" - "flag" "fmt" - "io/ioutil" "net/http" "runtime" "strings" @@ -50,7 +48,6 @@ type DBStore struct { StatusPort int `toml:"status-port" json:"status-port"` PdAddr string `toml:"pd-addr" json:"pd-addr"` StrSQLMode string `toml:"sql-mode" json:"sql-mode"` - LogLevel string `toml:"log-level" json:"log-level"` SQLMode mysql.SQLMode `toml:"-" json:"-"` @@ -61,11 +58,11 @@ type DBStore struct { } type Config struct { + TaskID int64 `toml:"-" json:"id"` + App Lightning `toml:"lightning" json:"lightning"` TiDB DBStore `toml:"tidb" json:"tidb"` - // not implemented yet. - // ProgressStore DBStore `toml:"progress-store" json:"progress-store"` Checkpoint Checkpoint `toml:"checkpoint" json:"checkpoint"` Mydumper MydumperRuntime `toml:"mydumper" json:"mydumper"` BWList *filter.Rules `toml:"black-white-list" json:"black-white-list"` @@ -73,9 +70,6 @@ type Config struct { PostRestore PostRestore `toml:"post-restore" json:"post-restore"` Cron Cron `toml:"cron" json:"cron"` Routes []*router.TableRule `toml:"routes" json:"routes"` - - // command line flags - ConfigFile string `json:"config-file"` } func (c *Config) String() string { @@ -87,12 +81,10 @@ func (c *Config) String() string { } type Lightning struct { - log.Config TableConcurrency int `toml:"table-concurrency" json:"table-concurrency"` IndexConcurrency int `toml:"index-concurrency" json:"index-concurrency"` RegionConcurrency int `toml:"region-concurrency" json:"region-concurrency"` IOConcurrency int `toml:"io-concurrency" json:"io-concurrency"` - ProfilePort int `toml:"pprof-port" json:"pprof-port"` CheckRequirements bool `toml:"check-requirements" json:"check-requirements"` } @@ -171,7 +163,6 @@ func NewConfig() *Config { Host: "127.0.0.1", User: "root", StatusPort: 10080, - LogLevel: "error", StrSQLMode: mysql.DefaultSQLMode, BuildStatsConcurrency: 20, DistSQLScanConcurrency: 100, @@ -186,6 +177,7 @@ func NewConfig() *Config { ReadBlockSize: ReadBlockSize, CSV: CSVConfig{ Separator: ",", + Delimiter: `"`, }, }, PostRestore: PostRestore{ @@ -194,89 +186,30 @@ func NewConfig() *Config { } } -func LoadConfig(args []string) (*Config, error) { - cfg := NewConfig() - - fs := flag.NewFlagSet("lightning", flag.ContinueOnError) - - // if both `-c` and `-config` are specified, the last one in the command line will take effect. - // the default value is assigned immediately after the StringVar() call, - // so it is fine to not give any default value for `-c`, to keep the `-h` page clean. - fs.StringVar(&cfg.ConfigFile, "c", "", "(deprecated alias of -config)") - fs.StringVar(&cfg.ConfigFile, "config", "", "tidb-lightning configuration file") - printVersion := fs.Bool("V", false, "print version of lightning") - - logLevel := fs.String("L", "", `log level: info, debug, warn, error, fatal (default "info")`) - logFilePath := fs.String("log-file", "", "log file path") - tidbHost := fs.String("tidb-host", "", "TiDB server host") - tidbPort := fs.Int("tidb-port", 0, "TiDB server port (default 4000)") - tidbUser := fs.String("tidb-user", "", "TiDB user name to connect") - tidbStatusPort := fs.Int("tidb-status", 0, "TiDB server status port (default 10080)") - pdAddr := fs.String("pd-urls", "", "PD endpoint address") - dataSrcPath := fs.String("d", "", "Directory of the dump to import") - importerAddr := fs.String("importer", "", "address (host:port) to connect to tikv-importer") - - if err := fs.Parse(args); err != nil { - return nil, errors.Trace(err) - } - if *printVersion { - fmt.Println(common.GetRawInfo()) - return nil, flag.ErrHelp - } - - if err := cfg.Load(); err != nil { - return nil, errors.Trace(err) - } - - if *logLevel != "" { - cfg.App.Config.Level = *logLevel - } - if *logFilePath != "" { - cfg.App.Config.File = *logFilePath - } - if *tidbHost != "" { - cfg.TiDB.Host = *tidbHost - } - if *tidbPort != 0 { - cfg.TiDB.Port = *tidbPort - } - if *tidbStatusPort != 0 { - cfg.TiDB.StatusPort = *tidbStatusPort - } - if *tidbUser != "" { - cfg.TiDB.User = *tidbUser - } - if *pdAddr != "" { - cfg.TiDB.PdAddr = *pdAddr - } - if *dataSrcPath != "" { - cfg.Mydumper.SourceDir = *dataSrcPath - } - if *importerAddr != "" { - cfg.TikvImporter.Addr = *importerAddr +// LoadFromGlobal resets the current configuration to the global settings. +func (cfg *Config) LoadFromGlobal(global *GlobalConfig) error { + if err := cfg.LoadFromTOML(global.ConfigFileContent); err != nil { + return err } - if err := cfg.Adjust(); err != nil { - return nil, err - } + cfg.TiDB.Host = global.TiDB.Host + cfg.TiDB.Port = global.TiDB.Port + cfg.TiDB.User = global.TiDB.User + cfg.TiDB.StatusPort = global.TiDB.StatusPort + cfg.TiDB.PdAddr = global.TiDB.PdAddr + cfg.Mydumper.SourceDir = global.Mydumper.SourceDir + cfg.TikvImporter.Addr = global.TikvImporter.Addr - return cfg, nil + return nil } -func (cfg *Config) Load() error { - // use standard config if unspecified. - if cfg.ConfigFile == "" { - return nil - } - - data, err := ioutil.ReadFile(cfg.ConfigFile) - if err != nil { - return errors.Trace(err) - } - if err = toml.Unmarshal(data, cfg); err != nil { - return errors.Trace(err) - } +// LoadFromTOML overwrites the current configuration by the TOML data +func (cfg *Config) LoadFromTOML(data []byte) error { + return errors.Trace(toml.Unmarshal(data, cfg)) +} +// Adjust fixes the invalid or unspecified settings to reasonable valid values. +func (cfg *Config) Adjust() error { // Reject problematic CSV configurations. csv := &cfg.Mydumper.CSV if len(csv.Separator) != 1 { @@ -300,6 +233,7 @@ func (cfg *Config) Load() error { } } + var err error cfg.TiDB.SQLMode, err = mysql.GetSQLMode(cfg.TiDB.StrSQLMode) if err != nil { return errors.Annotate(err, "invalid config: `mydumper.tidb.sql_mode` must be a valid SQL_MODE") @@ -314,13 +248,6 @@ func (cfg *Config) Load() error { } } - return nil -} - -// Adjust fixes the invalid or unspecified settings to reasonable valid values. -func (cfg *Config) Adjust() error { - cfg.App.Config.Adjust() - // automatically determine the TiDB port & PD address from TiDB settings if cfg.TiDB.Port <= 0 || len(cfg.TiDB.PdAddr) == 0 { resp, err := http.Get(fmt.Sprintf("http://%s:%d/settings", cfg.TiDB.Host, cfg.TiDB.StatusPort)) diff --git a/lightning/config/config_test.go b/lightning/config/config_test.go index 341105fb00581..d0c36ab802475 100644 --- a/lightning/config/config_test.go +++ b/lightning/config/config_test.go @@ -16,12 +16,10 @@ package config_test import ( "flag" "fmt" - "io/ioutil" "net" "net/http" "net/http/httptest" "net/url" - "path" "regexp" "strconv" "testing" @@ -173,9 +171,6 @@ func (s *configTestSuite) TestAdjustWillBatchImportRatioInvalid(c *C) { } func (s *configTestSuite) TestInvalidCSV(c *C) { - d := c.MkDir() - p := path.Join(d, "cfg.toml") - testCases := []struct { input string err string @@ -260,14 +255,6 @@ func (s *configTestSuite) TestInvalidCSV(c *C) { `, err: "invalid config: cannot use '\\' as CSV delimiter when `mydumper.csv.backslash-escape` is true", }, - { - input: ` - invalid[mydumper.csv] - delimiter = '\' - backslash-escape = true - `, - err: "Near line 0 (last key parsed ''): bare keys cannot contain '['", - }, { input: ` [tidb] @@ -297,11 +284,12 @@ func (s *configTestSuite) TestInvalidCSV(c *C) { comment := Commentf("input = %s", tc.input) cfg := config.NewConfig() - cfg.ConfigFile = p - err := ioutil.WriteFile(p, []byte(tc.input), 0644) - c.Assert(err, IsNil, comment) + cfg.TiDB.Port = 4000 + cfg.TiDB.PdAddr = "test.invalid:2379" + err := cfg.LoadFromTOML([]byte(tc.input)) + c.Assert(err, IsNil) - err = cfg.Load() + err = cfg.Adjust() if tc.err != "" { c.Assert(err, ErrorMatches, regexp.QuoteMeta(tc.err), comment) } else { @@ -310,6 +298,16 @@ func (s *configTestSuite) TestInvalidCSV(c *C) { } } +func (s *configTestSuite) TestInvalidTOML(c *C) { + cfg := &config.Config{} + err := cfg.LoadFromTOML([]byte(` + invalid[mydumper.csv] + delimiter = '\' + backslash-escape = true + `)) + c.Assert(err, ErrorMatches, regexp.QuoteMeta("Near line 0 (last key parsed ''): bare keys cannot contain '['")) +} + func (s *configTestSuite) TestDurationUnmarshal(c *C) { duration := config.Duration{} err := duration.UnmarshalText([]byte("13m20s")) @@ -330,23 +328,23 @@ func (s *configTestSuite) TestDurationMarshalJSON(c *C) { } func (s *configTestSuite) TestLoadConfig(c *C) { - cfg, err := config.LoadConfig([]string{"-tidb-port", "sss"}) + cfg, err := config.LoadGlobalConfig([]string{"-tidb-port", "sss"}, nil) c.Assert(err, ErrorMatches, `invalid value "sss" for flag -tidb-port: parse error`) c.Assert(cfg, IsNil) - cfg, err = config.LoadConfig([]string{"-V"}) + cfg, err = config.LoadGlobalConfig([]string{"-V"}, nil) c.Assert(err, Equals, flag.ErrHelp) c.Assert(cfg, IsNil) - cfg, err = config.LoadConfig([]string{"-config", "not-exists"}) + cfg, err = config.LoadGlobalConfig([]string{"-config", "not-exists"}, nil) c.Assert(err, ErrorMatches, ".*no such file or directory.*") c.Assert(cfg, IsNil) - cfg, err = config.LoadConfig([]string{"-tidb-status", "111111"}) - c.Assert(err, ErrorMatches, "cannot fetch settings from TiDB.*") + cfg, err = config.LoadGlobalConfig([]string{"--server-mode"}, nil) + c.Assert(err, ErrorMatches, "If server-mode is enabled, the status-addr must be a valid listen address") c.Assert(cfg, IsNil) - cfg, err = config.LoadConfig([]string{ + cfg, err = config.LoadGlobalConfig([]string{ "-L", "debug", "-log-file", "/path/to/file.log", "-tidb-host", "172.16.30.11", @@ -355,7 +353,7 @@ func (s *configTestSuite) TestLoadConfig(c *C) { "-pd-urls", "172.16.30.11:2379,172.16.30.12:2379", "-d", "/path/to/import", "-importer", "172.16.30.11:23008", - }) + }, nil) c.Assert(err, IsNil) c.Assert(cfg.App.Config.Level, Equals, "debug") c.Assert(cfg.App.Config.File, Equals, "/path/to/file.log") @@ -366,12 +364,24 @@ func (s *configTestSuite) TestLoadConfig(c *C) { c.Assert(cfg.Mydumper.SourceDir, Equals, "/path/to/import") c.Assert(cfg.TikvImporter.Addr, Equals, "172.16.30.11:23008") - cfg.Checkpoint.DSN = "" - cfg.Checkpoint.Driver = "mysql" - err = cfg.Adjust() + taskCfg := config.NewConfig() + err = taskCfg.LoadFromGlobal(cfg) + c.Assert(err, IsNil) + + taskCfg.Checkpoint.DSN = "" + taskCfg.Checkpoint.Driver = "mysql" + err = taskCfg.Adjust() c.Assert(err, IsNil) - c.Assert(cfg.Checkpoint.DSN, Equals, "guest:@tcp(172.16.30.11:4001)/?charset=utf8") + c.Assert(taskCfg.Checkpoint.DSN, Equals, "guest:@tcp(172.16.30.11:4001)/?charset=utf8") - result := cfg.String() + result := taskCfg.String() c.Assert(result, Matches, `.*"pd-addr":"172.16.30.11:2379,172.16.30.12:2379".*`) } + +func (s *configTestSuite) TestLoadFromInvalidConfig(c *C) { + taskCfg := config.NewConfig() + err := taskCfg.LoadFromGlobal(&config.GlobalConfig { + ConfigFileContent: []byte("invalid toml"), + }) + c.Assert(err, ErrorMatches, "Near line 1.*") +} diff --git a/lightning/config/configlist.go b/lightning/config/configlist.go new file mode 100644 index 0000000000000..46bc19a1d5e77 --- /dev/null +++ b/lightning/config/configlist.go @@ -0,0 +1,116 @@ +// Copyright 2019 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package config + +import ( + "container/list" + "context" + "sync" + "time" +) + +// ConfigList is a goroutine-safe FIFO list of *Config, which supports removal +// from the middle. The list is not expected to be very long. +type ConfigList struct { + cond *sync.Cond + taskIDMap map[int64]*list.Element + nodes list.List +} + +// NewConfigList creates a new ConfigList instance. +func NewConfigList() *ConfigList { + return &ConfigList{ + cond: sync.NewCond(new(sync.Mutex)), + taskIDMap: make(map[int64]*list.Element), + } +} + +// Push adds a configuration to the end of the list. The field `cfg.TaskID` will +// be modified to include a unique ID to identify this task. +func (cl *ConfigList) Push(cfg *Config) { + id := time.Now().UnixNano() + cl.cond.L.Lock() + defer cl.cond.L.Unlock() + cfg.TaskID = id + cl.taskIDMap[id] = cl.nodes.PushBack(cfg) + cl.cond.Broadcast() +} + +// Pop removes a configuration from the front of the list. If the list is empty, +// this method will block until either another goroutines calls Push() or the +// input context expired. +// +// If the context expired, the error field will contain the error from context. +func (cl *ConfigList) Pop(ctx context.Context) (*Config, error) { + res := make(chan *Config) + + go func() { + cl.cond.L.Lock() + defer cl.cond.L.Unlock() + for { + if front := cl.nodes.Front(); front != nil { + cfg := front.Value.(*Config) + delete(cl.taskIDMap, cfg.TaskID) + cl.nodes.Remove(front) + res <- cfg + break + } + cl.cond.Wait() + } + }() + + select { + case cfg := <-res: + return cfg, nil + case <-ctx.Done(): + return nil, ctx.Err() + } +} + +// Remove removes a task from the list given its task ID. Returns true if a task +// is successfully removed, false if the task ID did not exist. +func (cl *ConfigList) Remove(taskID int64) bool { + cl.cond.L.Lock() + defer cl.cond.L.Unlock() + element, ok := cl.taskIDMap[taskID] + if !ok { + return false + } + delete(cl.taskIDMap, taskID) + cl.nodes.Remove(element) + return true +} + +// Get obtains a task from the list given its task ID. If the task ID did not +// exist, the returned bool field will be false. +func (cl *ConfigList) Get(taskID int64) (*Config, bool) { + cl.cond.L.Lock() + defer cl.cond.L.Unlock() + element, ok := cl.taskIDMap[taskID] + if !ok { + return nil, false + } + return element.Value.(*Config), true +} + +// AllIDs returns a list of all task IDs in the list. +func (cl *ConfigList) AllIDs() []int64 { + cl.cond.L.Lock() + defer cl.cond.L.Unlock() + res := make([]int64, 0, len(cl.taskIDMap)) + for taskID := range cl.taskIDMap { + res = append(res, taskID) + } + return res +} diff --git a/lightning/config/configlist_test.go b/lightning/config/configlist_test.go new file mode 100644 index 0000000000000..81f610f3987b3 --- /dev/null +++ b/lightning/config/configlist_test.go @@ -0,0 +1,104 @@ +// Copyright 2019 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package config_test + +import ( + "context" + "time" + + . "github.com/pingcap/check" + "github.com/pingcap/tidb-lightning/lightning/config" +) + +var _ = Suite(&configListTestSuite{}) + +type configListTestSuite struct{} + +func (s *configListTestSuite) TestNormalPushPop(c *C) { + cl := config.NewConfigList() + + cl.Push(&config.Config{TikvImporter: config.TikvImporter{Addr: "1.1.1.1:1111"}}) + cl.Push(&config.Config{TikvImporter: config.TikvImporter{Addr: "2.2.2.2:2222"}}) + + startTime := time.Now() + cfg, err := cl.Pop(context.Background()) // these two should never block. + c.Assert(time.Since(startTime), Less, 100*time.Millisecond) + c.Assert(err, IsNil) + c.Assert(cfg.TikvImporter.Addr, Equals, "1.1.1.1:1111") + + startTime = time.Now() + cfg, err = cl.Pop(context.Background()) + c.Assert(time.Since(startTime), Less, 100*time.Millisecond) + c.Assert(err, IsNil) + c.Assert(cfg.TikvImporter.Addr, Equals, "2.2.2.2:2222") + + go func() { + time.Sleep(400 * time.Millisecond) + cl.Push(&config.Config{TikvImporter: config.TikvImporter{Addr: "3.3.3.3:3333"}}) + }() + + startTime = time.Now() + cfg, err = cl.Pop(context.Background()) // this should block for ≥400ms + c.Assert(time.Since(startTime), GreaterEqual, 400*time.Millisecond) + c.Assert(err, IsNil) + c.Assert(cfg.TikvImporter.Addr, Equals, "3.3.3.3:3333") +} + +func (s *configListTestSuite) TestContextCancel(c *C) { + ctx, cancel := context.WithCancel(context.Background()) + cl := config.NewConfigList() + + go func() { + time.Sleep(400 * time.Millisecond) + cancel() + }() + + startTime := time.Now() + _, err := cl.Pop(ctx) + c.Assert(time.Since(startTime), GreaterEqual, 400*time.Millisecond) + c.Assert(err, Equals, context.Canceled) +} + +func (s *configListTestSuite) TestGetRemove(c *C) { + cl := config.NewConfigList() + + cfg1 := &config.Config{TikvImporter: config.TikvImporter{Addr: "1.1.1.1:1111"}} + cl.Push(cfg1) + cfg2 := &config.Config{TikvImporter: config.TikvImporter{Addr: "2.2.2.2:2222"}} + cl.Push(cfg2) + cfg3 := &config.Config{TikvImporter: config.TikvImporter{Addr: "3.3.3.3:3333"}} + cl.Push(cfg3) + + cfg, ok := cl.Get(cfg2.TaskID) + c.Assert(ok, IsTrue) + c.Assert(cfg, Equals, cfg2) + _, ok = cl.Get(cfg3.TaskID + 1000) + c.Assert(ok, IsFalse) + + ok = cl.Remove(cfg2.TaskID) + c.Assert(ok, IsTrue) + ok = cl.Remove(cfg3.TaskID + 1000) + c.Assert(ok, IsFalse) + _, ok = cl.Get(cfg2.TaskID) + c.Assert(ok, IsFalse) + + var err error + cfg, err = cl.Pop(context.Background()) + c.Assert(err, IsNil) + c.Assert(cfg, Equals, cfg1) + + cfg, err = cl.Pop(context.Background()) + c.Assert(err, IsNil) + c.Assert(cfg, Equals, cfg3) +} diff --git a/lightning/config/global.go b/lightning/config/global.go new file mode 100644 index 0000000000000..0356cb896152f --- /dev/null +++ b/lightning/config/global.go @@ -0,0 +1,184 @@ +// Copyright 2019 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package config + +import ( + "flag" + "fmt" + "io/ioutil" + "os" + + "github.com/BurntSushi/toml" + "github.com/pingcap/errors" + "github.com/pingcap/tidb-lightning/lightning/common" + "github.com/pingcap/tidb-lightning/lightning/log" +) + +type GlobalLightning struct { + log.Config + StatusAddr string `toml:"status-addr" json:"status-addr"` + ServerMode bool `toml:"server-mode" json:"server-mode"` + + // The legacy alias for setting "status-addr". The value should always the + // same as StatusAddr, and will not be published in the JSON encoding. + PProfPort int `toml:"pprof-port" json:"-"` +} + +type GlobalTiDB struct { + Host string `toml:"host" json:"host"` + Port int `toml:"port" json:"port"` + User string `toml:"user" json:"user"` + StatusPort int `toml:"status-port" json:"status-port"` + PdAddr string `toml:"pd-addr" json:"pd-addr"` + LogLevel string `toml:"log-level" json:"log-level"` +} + +type GlobalMydumper struct { + SourceDir string `toml:"data-source-dir" json:"data-source-dir"` +} + +type GlobalImporter struct { + Addr string `toml:"addr" json:"addr"` +} + +type GlobalConfig struct { + App GlobalLightning `toml:"lightning" json:"lightning"` + TiDB GlobalTiDB `toml:"tidb" json:"tidb"` + Mydumper GlobalMydumper `toml:"mydumper" json:"mydumper"` + TikvImporter GlobalImporter `toml:"tikv-importer" json:"tikv-importer"` + + ConfigFileContent []byte +} + +func NewGlobalConfig() *GlobalConfig { + return &GlobalConfig{ + App: GlobalLightning{ + ServerMode: false, + }, + TiDB: GlobalTiDB{ + Host: "127.0.0.1", + User: "root", + StatusPort: 10080, + LogLevel: "error", + }, + } +} + +// Must should be called after LoadGlobalConfig(). If LoadGlobalConfig() returns +// any error, this function will exit the program with an appropriate exit code. +func Must(cfg *GlobalConfig, err error) *GlobalConfig { + switch errors.Cause(err) { + case nil: + case flag.ErrHelp: + os.Exit(0) + default: + fmt.Println("Failed to parse command flags: ", err) + os.Exit(2) + } + return cfg +} + +// LoadGlobalConfig reads the arguments and fills in the GlobalConfig. +func LoadGlobalConfig(args []string, extraFlags func(*flag.FlagSet)) (*GlobalConfig, error) { + cfg := NewGlobalConfig() + fs := flag.NewFlagSet("", flag.ContinueOnError) + + // if both `-c` and `-config` are specified, the last one in the command line will take effect. + // the default value is assigned immediately after the StringVar() call, + // so it is fine to not give any default value for `-c`, to keep the `-h` page clean. + var configFilePath string + fs.StringVar(&configFilePath, "c", "", "(deprecated alias of -config)") + fs.StringVar(&configFilePath, "config", "", "tidb-lightning configuration file") + printVersion := fs.Bool("V", false, "print version of lightning") + + logLevel := fs.String("L", "", `log level: info, debug, warn, error, fatal (default "info")`) + logFilePath := fs.String("log-file", "", "log file path") + tidbHost := fs.String("tidb-host", "", "TiDB server host") + tidbPort := fs.Int("tidb-port", 0, "TiDB server port (default 4000)") + tidbUser := fs.String("tidb-user", "", "TiDB user name to connect") + tidbStatusPort := fs.Int("tidb-status", 0, "TiDB server status port (default 10080)") + pdAddr := fs.String("pd-urls", "", "PD endpoint address") + dataSrcPath := fs.String("d", "", "Directory of the dump to import") + importerAddr := fs.String("importer", "", "address (host:port) to connect to tikv-importer") + + statusAddr := fs.String("status-addr", "", "the Lightning server address") + serverMode := fs.Bool("server-mode", false, "start Lightning in server mode, wait for multiple tasks instead of starting immediately") + + if extraFlags != nil { + extraFlags(fs) + } + + if err := fs.Parse(args); err != nil { + return nil, errors.Trace(err) + } + if *printVersion { + fmt.Println(common.GetRawInfo()) + return nil, flag.ErrHelp + } + + if len(configFilePath) > 0 { + data, err := ioutil.ReadFile(configFilePath) + if err != nil { + return nil, errors.Annotatef(err, "Cannot read config file `%s`", configFilePath) + } + if err = toml.Unmarshal(data, cfg); err != nil { + return nil, errors.Annotatef(err, "Cannot parse config file `%s`", configFilePath) + } + cfg.ConfigFileContent = data + } + + if *logLevel != "" { + cfg.App.Config.Level = *logLevel + } + if *logFilePath != "" { + cfg.App.Config.File = *logFilePath + } + if *tidbHost != "" { + cfg.TiDB.Host = *tidbHost + } + if *tidbPort != 0 { + cfg.TiDB.Port = *tidbPort + } + if *tidbStatusPort != 0 { + cfg.TiDB.StatusPort = *tidbStatusPort + } + if *tidbUser != "" { + cfg.TiDB.User = *tidbUser + } + if *pdAddr != "" { + cfg.TiDB.PdAddr = *pdAddr + } + if *dataSrcPath != "" { + cfg.Mydumper.SourceDir = *dataSrcPath + } + if *importerAddr != "" { + cfg.TikvImporter.Addr = *importerAddr + } + if *serverMode { + cfg.App.ServerMode = true + } + if *statusAddr != "" { + cfg.App.StatusAddr = *statusAddr + } + if cfg.App.StatusAddr == "" && cfg.App.PProfPort != 0 { + cfg.App.StatusAddr = fmt.Sprintf(":%d", cfg.App.PProfPort) + } + + if cfg.App.StatusAddr == "" && cfg.App.ServerMode { + return nil, errors.New("If server-mode is enabled, the status-addr must be a valid listen address") + } + + cfg.App.Config.Adjust() + return cfg, nil +} diff --git a/lightning/lightning.go b/lightning/lightning.go index c7fe571db0887..5147e6ec4e51b 100644 --- a/lightning/lightning.go +++ b/lightning/lightning.go @@ -15,12 +15,14 @@ package lightning import ( "context" + "encoding/json" "fmt" + "io/ioutil" "net/http" - "runtime" - "sync" + "os" "github.com/pingcap/errors" + "github.com/pingcap/failpoint" "github.com/prometheus/client_golang/prometheus/promhttp" "go.uber.org/zap" @@ -32,67 +34,97 @@ import ( ) type Lightning struct { - cfg *config.Config - ctx context.Context - shutdown context.CancelFunc + globalCfg *config.GlobalConfig + taskCfgs *config.ConfigList + ctx context.Context + shutdown context.CancelFunc + server http.Server +} - wg sync.WaitGroup +func initEnv(cfg *config.GlobalConfig) error { + return log.InitLogger(&cfg.App.Config, cfg.TiDB.LogLevel) } -func initEnv(cfg *config.Config) error { - if err := log.InitLogger(&cfg.App.Config, cfg.TiDB.LogLevel); err != nil { - return errors.Trace(err) +func New(globalCfg *config.GlobalConfig) *Lightning { + if err := initEnv(globalCfg); err != nil { + fmt.Println("Failed to initialize environment:", err) + os.Exit(1) } - if cfg.App.ProfilePort > 0 { - go func() { - http.Handle("/metrics", promhttp.Handler()) - err := http.ListenAndServe(fmt.Sprintf(":%d", cfg.App.ProfilePort), nil) - log.L().Info("stopped HTTP server", log.ShortError(err)) - }() + ctx, shutdown := context.WithCancel(context.Background()) + return &Lightning{ + globalCfg: globalCfg, + ctx: ctx, + shutdown: shutdown, } - - return nil } -func New(cfg *config.Config) *Lightning { - initEnv(cfg) +func (l *Lightning) Serve() { + if len(l.globalCfg.App.StatusAddr) == 0 { + return + } + + http.Handle("/metrics", promhttp.Handler()) + http.HandleFunc("/tasks", func(w http.ResponseWriter, req *http.Request) { + l.handleTask(w, req) + }) - ctx, shutdown := context.WithCancel(context.Background()) + l.server.Addr = l.globalCfg.App.StatusAddr + err := l.server.ListenAndServe() + log.L().Info("stopped HTTP server", log.ShortError(err)) +} - return &Lightning{ - cfg: cfg, - ctx: ctx, - shutdown: shutdown, +// Run Lightning using the global config as the same as the task config. +func (l *Lightning) RunOnce() error { + cfg := config.NewConfig() + if err := cfg.LoadFromGlobal(l.globalCfg); err != nil { + return err + } + if err := cfg.Adjust(); err != nil { + return err } + return l.run(cfg) } -func (l *Lightning) Run() error { - runtime.GOMAXPROCS(runtime.NumCPU()) - common.PrintInfo("lightning", func() { - log.L().Info("cfg", zap.Stringer("cfg", l.cfg)) +func (l *Lightning) RunServer() error { + l.taskCfgs = config.NewConfigList() + log.L().Info("Lightning server is running, post to /tasks to start an import task") + + for { + task, err := l.taskCfgs.Pop(l.ctx) + if err != nil { + return err + } + err = l.run(task) + if err != nil { + log.L().Error("tidb lightning encountered error", zap.Error(err)) + } + } +} + +var taskCfgRecorderKey struct{} + +func (l *Lightning) run(taskCfg *config.Config) error { + failpoint.Inject("SkipRunTask", func() error { + if recorder, ok := l.ctx.Value(&taskCfgRecorderKey).(chan *config.Config); ok { + recorder <- taskCfg + } + return nil }) - l.wg.Add(1) - var err error - go func() { - defer l.wg.Done() - err = l.run() - }() - l.wg.Wait() - return errors.Trace(err) -} + common.PrintInfo("lightning", func() { + log.L().Info("cfg", zap.Stringer("cfg", taskCfg)) + }) -func (l *Lightning) run() error { loadTask := log.L().Begin(zap.InfoLevel, "load data source") - mdl, err := mydump.NewMyDumpLoader(l.cfg) + mdl, err := mydump.NewMyDumpLoader(taskCfg) loadTask.End(zap.ErrorLevel, err) if err != nil { return errors.Trace(err) } dbMetas := mdl.GetDatabases() - procedure, err := restore.NewRestoreController(l.ctx, dbMetas, l.cfg) + procedure, err := restore.NewRestoreController(l.ctx, dbMetas, taskCfg) if err != nil { log.L().Error("restore failed", log.ShortError(err)) return errors.Trace(err) @@ -105,6 +137,84 @@ func (l *Lightning) run() error { } func (l *Lightning) Stop() { + if err := l.server.Shutdown(l.ctx); err != nil { + log.L().Warn("failed to shutdown HTTP server", log.ShortError(err)) + } l.shutdown() - l.wg.Wait() +} + +func (l *Lightning) handleTask(w http.ResponseWriter, req *http.Request) { + w.Header().Set("Content-Type", "application/json") + + switch req.Method { + case http.MethodGet: + l.handleGetTask(w) + case http.MethodPost: + l.handlePostTask(w, req) + default: + w.Header().Set("Allow", http.MethodGet+", "+http.MethodPost) + w.WriteHeader(http.StatusMethodNotAllowed) + w.Write([]byte(`{"error":"only GET and POST are allowed"}`)) + } +} + +func (l *Lightning) handleGetTask(w http.ResponseWriter) { + var response struct { + Enabled bool `json:"enabled"` + QueuedIDs []int64 `json:"queue"` + } + + response.Enabled = l.taskCfgs != nil + if response.Enabled { + response.QueuedIDs = l.taskCfgs.AllIDs() + } + + w.WriteHeader(http.StatusOK) + json.NewEncoder(w).Encode(response) +} + +func (l *Lightning) handlePostTask(w http.ResponseWriter, req *http.Request) { + w.Header().Set("Cache-Control", "no-store") + + type errorResponse struct { + Error string `json:"error"` + } + type taskResponse struct { + ID int64 `json:"id"` + } + + if l.taskCfgs == nil { + w.WriteHeader(http.StatusNotImplemented) + json.NewEncoder(w).Encode(errorResponse{Error: "server-mode not enabled"}) + return + } + + data, err := ioutil.ReadAll(req.Body) + if err != nil { + w.WriteHeader(http.StatusBadRequest) + json.NewEncoder(w).Encode(errorResponse{Error: fmt.Sprintf("cannot read request: %v", err)}) + return + } + log.L().Debug("received task config", zap.ByteString("content", data)) + + cfg := config.NewConfig() + if err = cfg.LoadFromGlobal(l.globalCfg); err != nil { + w.WriteHeader(http.StatusInternalServerError) + json.NewEncoder(w).Encode(errorResponse{Error: fmt.Sprintf("cannot restore from global config: %v", err)}) + return + } + if err = cfg.LoadFromTOML(data); err != nil { + w.WriteHeader(http.StatusBadRequest) + json.NewEncoder(w).Encode(errorResponse{Error: fmt.Sprintf("cannot parse task (must be TOML): %v", err)}) + return + } + if err = cfg.Adjust(); err != nil { + w.WriteHeader(http.StatusBadRequest) + json.NewEncoder(w).Encode(errorResponse{Error: fmt.Sprintf("invalid task configuration: %v", err)}) + return + } + + l.taskCfgs.Push(cfg) + w.WriteHeader(http.StatusOK) + json.NewEncoder(w).Encode(taskResponse{ID: cfg.TaskID}) } diff --git a/lightning/lightning_test.go b/lightning/lightning_test.go index 9d48d70a15ac8..1c15d98ae48f4 100644 --- a/lightning/lightning_test.go +++ b/lightning/lightning_test.go @@ -14,9 +14,16 @@ package lightning import ( + "context" + "encoding/json" + "fmt" + "net/http" + "strings" "testing" + "time" . "github.com/pingcap/check" + "github.com/pingcap/failpoint" "github.com/pingcap/tidb-lightning/lightning/config" ) @@ -29,36 +36,129 @@ func TestLightning(t *testing.T) { } func (s *lightningSuite) TestInitEnv(c *C) { - cfg := &config.Config{ - App: config.Lightning{ProfilePort: 45678}, + cfg := &config.GlobalConfig{ + App: config.GlobalLightning{StatusAddr: ":45678"}, } err := initEnv(cfg) c.Assert(err, IsNil) - cfg.App.ProfilePort = 0 + cfg.App.StatusAddr = "" cfg.App.Config.File = "." err = initEnv(cfg) c.Assert(err, ErrorMatches, "can't use directory as log file name") } func (s *lightningSuite) TestRun(c *C) { - cfg := &config.Config{} + cfg := config.NewGlobalConfig() + cfg.TiDB.Host = "test.invalid" + cfg.TiDB.Port = 4000 + cfg.TiDB.PdAddr = "test.invalid:2379" cfg.Mydumper.SourceDir = "not-exists" lightning := New(cfg) - err := lightning.Run() + err := lightning.RunOnce() c.Assert(err, ErrorMatches, ".*mydumper dir does not exist") - cfg.Mydumper.SourceDir = "." - cfg.Checkpoint.Enable = true - cfg.Checkpoint.Driver = "invalid" - lightning = New(cfg) - err = lightning.Run() + err = lightning.run(&config.Config{ + Mydumper: config.MydumperRuntime{SourceDir: "."}, + Checkpoint: config.Checkpoint{ + Enable: true, + Driver: "invalid", + }, + }) c.Assert(err, ErrorMatches, "Unknown checkpoint driver invalid") - cfg.Mydumper.SourceDir = "." - cfg.Checkpoint.Enable = true - cfg.Checkpoint.Driver = "file" - cfg.Checkpoint.DSN = "any-file" - lightning = New(cfg) - err = lightning.Run() + err = lightning.run(&config.Config{ + Mydumper: config.MydumperRuntime{SourceDir: "."}, + Checkpoint: config.Checkpoint{ + Enable: true, + Driver: "file", + DSN: "any-file", + }, + }) c.Assert(err, NotNil) } + +func (s *lightningSuite) TestRunServer(c *C) { + cfg := config.NewGlobalConfig() + cfg.TiDB.Host = "test.invalid" + cfg.TiDB.Port = 4000 + cfg.TiDB.PdAddr = "test.invalid:2379" + cfg.App.ServerMode = true + cfg.App.StatusAddr = "127.0.0.1:45678" + + lightning := New(cfg) + go lightning.Serve() + defer lightning.Stop() + + url := "http://127.0.0.1:45678/tasks" + + // Wait a bit until the server is really listening. + time.Sleep(500 * time.Millisecond) + + req, err := http.NewRequest(http.MethodPut, url, nil) + c.Assert(err, IsNil) + resp, err := http.DefaultClient.Do(req) + c.Assert(err, IsNil) + c.Assert(resp.StatusCode, Equals, http.StatusMethodNotAllowed) + c.Assert(resp.Header.Get("Allow"), Equals, http.MethodGet+", "+http.MethodPost) + resp.Body.Close() + + resp, err = http.Post(url, "application/toml", strings.NewReader("????")) + c.Assert(err, IsNil) + c.Assert(resp.StatusCode, Equals, http.StatusNotImplemented) + var data map[string]string + err = json.NewDecoder(resp.Body).Decode(&data) + c.Assert(err, IsNil) + c.Assert(data, HasKey, "error") + c.Assert(data["error"], Equals, "server-mode not enabled") + resp.Body.Close() + + go lightning.RunServer() + + resp, err = http.Post(url, "application/toml", strings.NewReader("????")) + c.Assert(err, IsNil) + c.Assert(resp.StatusCode, Equals, http.StatusBadRequest) + err = json.NewDecoder(resp.Body).Decode(&data) + c.Assert(err, IsNil) + c.Assert(data, HasKey, "error") + c.Assert(data["error"], Matches, "cannot parse task.*") + resp.Body.Close() + + resp, err = http.Post(url, "application/toml", strings.NewReader("[mydumper.csv]\nseparator = 'fooo'")) + c.Assert(err, IsNil) + c.Assert(resp.StatusCode, Equals, http.StatusBadRequest) + err = json.NewDecoder(resp.Body).Decode(&data) + c.Assert(err, IsNil) + c.Assert(data, HasKey, "error") + c.Assert(data["error"], Matches, "invalid task configuration:.*") + resp.Body.Close() + + taskCfgCh := make(chan *config.Config) + lightning.ctx = context.WithValue(lightning.ctx, &taskCfgRecorderKey, taskCfgCh) + failpoint.Enable("github.com/pingcap/tidb-lightning/lightning/SkipRunTask", "return") + defer failpoint.Disable("github.com/pingcap/tidb-lightning/lightning/SkipRunTask") + + for i := 0; i < 20; i++ { + resp, err = http.Post(url, "application/toml", strings.NewReader(fmt.Sprintf(` + [mydumper] + data-source-dir = 'demo-path-%d' + [mydumper.csv] + separator = '/' + `, i))) + c.Assert(err, IsNil) + c.Assert(resp.StatusCode, Equals, http.StatusOK) + var result map[string]int + err = json.NewDecoder(resp.Body).Decode(&result) + c.Assert(err, IsNil) + c.Assert(result, HasKey, "id") + resp.Body.Close() + + select { + case taskCfg := <-taskCfgCh: + c.Assert(taskCfg.TiDB.Host, Equals, "test.invalid") + c.Assert(taskCfg.Mydumper.SourceDir, Equals, fmt.Sprintf("demo-path-%d", i)) + c.Assert(taskCfg.Mydumper.CSV.Separator, Equals, "/") + case <-time.After(500 * time.Millisecond): + c.Fatalf("task is not queued after 500ms (i = %d)", i) + } + } +} diff --git a/tidb-lightning.toml b/tidb-lightning.toml index 068f899cbde51..c214590d1a29d 100644 --- a/tidb-lightning.toml +++ b/tidb-lightning.toml @@ -1,8 +1,18 @@ ### tidb-lightning configuartion [lightning] -# background profile for debuging ( 0 to disable ) -pprof-port = 8289 +# Listening address for the HTTP server (set to empty string to disable). +# The server is responsible for the web interface, submitting import tasks, +# serving Prometheus metrics and exposing debug profiling data. +status-addr = ":8289" + +# Toggle server mode. +# If "false", running Lightning will immediately start the import job, and exits +# after the job is finished. +# If "true", running Lightning will wait for user to submit tasks, via the HTTP API +# (`curl http://lightning-ip:8289/tasks --data-binary @tidb-lightning.toml`). +# The program will keep running and waiting for more tasks, until receiving the SIGINT signal. +server-mode = false # check if the cluster satisfies the minimum requirement before starting # check-requirements = true