From ecd4397dd6425ce7cfc0c9d34954a52627ae2b06 Mon Sep 17 00:00:00 2001 From: Siddon Tang Date: Mon, 28 Oct 2024 16:25:06 +0800 Subject: [PATCH 1/7] add dm support for playground Signed-off-by: Siddon Tang --- components/playground/instance/dm_master.go | 78 +++++++++++++++++++ components/playground/instance/dm_worker.go | 83 +++++++++++++++++++++ components/playground/main.go | 15 ++++ components/playground/playground.go | 44 +++++++++++ 4 files changed, 220 insertions(+) create mode 100644 components/playground/instance/dm_master.go create mode 100644 components/playground/instance/dm_worker.go diff --git a/components/playground/instance/dm_master.go b/components/playground/instance/dm_master.go new file mode 100644 index 0000000000..6631a181c7 --- /dev/null +++ b/components/playground/instance/dm_master.go @@ -0,0 +1,78 @@ +package instance + +import ( + "context" + "fmt" + "path/filepath" + "strings" + + "github.com/pingcap/tiup/pkg/utils" +) + +type DMMaster struct { + instance + Process + initEndpoints []*DMMaster +} + +var _ Instance = &DMMaster{} + +func NewDMMaster(binPath string, dir, host, configPath string, portOffset int, id int, port int) *DMMaster { + if port <= 0 { + port = 8261 + } + return &DMMaster{ + instance: instance{ + BinPath: binPath, + ID: id, + Dir: dir, + Host: host, + Port: utils.MustGetFreePort(host, 8291, portOffset), + // Similar like PD's client port, here use StatusPort for Master Port. + StatusPort: utils.MustGetFreePort(host, port, portOffset), + ConfigPath: configPath, + }, + } +} + +func (m *DMMaster) Name() string { + return fmt.Sprintf("dm-master-%d", m.ID) +} + +func (m *DMMaster) Start(ctx context.Context) error { + args := []string{ + fmt.Sprintf("--name=%s", m.Name()), + fmt.Sprintf("--master-addr=http://%s", utils.JoinHostPort(m.Host, m.StatusPort)), + fmt.Sprintf("--advertise-addr=http://%s", utils.JoinHostPort(AdvertiseHost(m.Host), m.StatusPort)), + fmt.Sprintf("--peer-urls=http://%s", utils.JoinHostPort(m.Host, m.Port)), + fmt.Sprintf("--advertise-peer-urls=http://%s", utils.JoinHostPort(AdvertiseHost(m.Host), m.Port)), + fmt.Sprintf("--log-file=%s", m.LogFile()), + } + + endpoints := make([]string, 0) + for _, master := range m.initEndpoints { + endpoints = append(endpoints, fmt.Sprintf("%s=http://%s", master.Name(), utils.JoinHostPort(master.Host, master.Port))) + } + args = append(args, fmt.Sprintf("--initial-cluster=%s", strings.Join(endpoints, ","))) + + if m.ConfigPath != "" { + args = append(args, fmt.Sprintf("--config=%s", m.ConfigPath)) + } + + m.Process = &process{cmd: PrepareCommand(ctx, m.BinPath, args, nil, m.Dir)} + + logIfErr(m.Process.SetOutputFile(m.LogFile())) + return m.Process.Start() +} + +func (m *DMMaster) SetInitEndpoints(endpoints []*DMMaster) { + m.initEndpoints = endpoints +} + +func (m *DMMaster) Component() string { + return "dm-master" +} + +func (m *DMMaster) LogFile() string { + return filepath.Join(m.Dir, "dm-master.log") +} diff --git a/components/playground/instance/dm_worker.go b/components/playground/instance/dm_worker.go new file mode 100644 index 0000000000..e5acb8a6cf --- /dev/null +++ b/components/playground/instance/dm_worker.go @@ -0,0 +1,83 @@ +package instance + +import ( + "context" + "fmt" + "path/filepath" + "strings" + "time" + + "github.com/pingcap/tiup/pkg/utils" +) + +type DMWorker struct { + instance + Process + + masters []*DMMaster +} + +var _ Instance = &DMWorker{} + +// NewDMWorker create a DMWorker instance. +func NewDMWorker(binPath string, dir, host, configPath string, portOffset int, id int, port int, masters []*DMMaster) *DMWorker { + if port <= 0 { + port = 8262 + } + return &DMWorker{ + instance: instance{ + BinPath: binPath, + ID: id, + Dir: dir, + Host: host, + Port: utils.MustGetFreePort(host, port, portOffset), + ConfigPath: configPath, + }, + masters: masters, + } +} + +func (w *DMWorker) MasterAddrs() []string { + var addrs []string + for _, master := range w.masters { + addrs = append(addrs, utils.JoinHostPort(AdvertiseHost(master.Host), master.StatusPort)) + } + return addrs +} + +func (w *DMWorker) Name() string { + return fmt.Sprintf("dm-worker-%d", w.ID) +} + +func (w *DMWorker) Start(ctx context.Context) error { + args := []string{ + fmt.Sprintf("--name=%s", w.Name()), + fmt.Sprintf("--worker-addr=%s", utils.JoinHostPort(w.Host, w.Port)), + fmt.Sprintf("--advertise-addr=%s", utils.JoinHostPort(AdvertiseHost(w.Host), w.Port)), + fmt.Sprintf("--join=%s", strings.Join(w.MasterAddrs(), ",")), + fmt.Sprintf("--log-file=%s", w.LogFile()), + } + + if w.ConfigPath != "" { + args = append(args, fmt.Sprintf("--config=%s", w.ConfigPath)) + } + + w.Process = &process{cmd: PrepareCommand(ctx, w.BinPath, args, nil, w.Dir)} + + logIfErr(w.Process.SetOutputFile(w.LogFile())) + + // try to wait for the master to be ready + // this is a very ugly implementation, but it may mostly works + // TODO: find a better way to do this, + // e.g, let master support a HTTP API to check if it's ready + time.Sleep(time.Second * 3) + return w.Process.Start() +} + +func (w *DMWorker) Component() string { + return "dm-worker" +} + +func (w *DMWorker) LogFile() string { + return filepath.Join(w.Dir, "dm-worker.log") +} diff --git a/components/playground/main.go b/components/playground/main.go index ad0429e258..777ae0c1f1 100644 --- a/components/playground/main.go +++ b/components/playground/main.go @@ -76,6 +76,8 @@ type BootOptions struct { CSEOpts instance.CSEOptions `yaml:"cse"` // Only available when mode == tidb-cse GrafanaPort int `yaml:"grafana_port"` PortOffset int `yaml:"port_offset"` + DMMaster instance.Config `yaml:"dm_master"` + DMWorker instance.Config `yaml:"dm_worker"` } var ( @@ -351,6 +353,19 @@ Note: Version constraint [bold]%s[reset] is resolved to [green][bold]%s[reset]. rootCmd.Flags().StringVar(&options.CSEOpts.AccessKey, "cse.access_key", "minioadmin", "Object store access key for --mode=tidb-cse") rootCmd.Flags().StringVar(&options.CSEOpts.SecretKey, "cse.secret_key", "minioadmin", "Object store secret key for --mode=tidb-cse") + rootCmd.Flags().IntVar(&options.DMMaster.Num, "dm-master", 0, "DM-master instance number") + rootCmd.Flags().IntVar(&options.DMWorker.Num, "dm-worker", 0, "DM-worker instance number") + + rootCmd.Flags().StringVar(&options.DMMaster.Host, "dm-master.host", "", "DM-master instance host") + rootCmd.Flags().IntVar(&options.DMMaster.Port, "dm-master.port", 8261, "DM-master instance port") + rootCmd.Flags().StringVar(&options.DMMaster.ConfigPath, "dm-master.config", "", "DM-master instance configuration file") + rootCmd.Flags().StringVar(&options.DMMaster.BinPath, "dm-master.binpath", "", "DM-master instance binary path") + + rootCmd.Flags().StringVar(&options.DMWorker.Host, "dm-worker.host", "", "DM-worker instance host") + rootCmd.Flags().IntVar(&options.DMWorker.Port, "dm-worker.port", 8262, "DM-worker instance port") + rootCmd.Flags().StringVar(&options.DMWorker.ConfigPath, "dm-worker.config", "", "DM-worker instance configuration file") + rootCmd.Flags().StringVar(&options.DMWorker.BinPath, "dm-worker.binpath", "", "DM-worker instance binary path") + rootCmd.AddCommand(newDisplay()) rootCmd.AddCommand(newScaleOut()) rootCmd.AddCommand(newScaleIn()) diff --git a/components/playground/playground.go b/components/playground/playground.go index 17c08ef3c9..8f95b6941d 100644 --- a/components/playground/playground.go +++ b/components/playground/playground.go @@ -71,6 +71,8 @@ type Playground struct { tikvCdcs []*instance.TiKVCDC pumps []*instance.Pump drainers []*instance.Drainer + dmMasters []*instance.DMMaster + dmWorkers []*instance.DMWorker startedInstances []instance.Instance idAlloc map[string]int @@ -682,6 +684,20 @@ func (p *Playground) WalkInstances(fn func(componentID string, ins instance.Inst } } + for _, ins := range p.dmMasters { + err := fn(spec.ComponentDMMaster, ins) + if err != nil { + return err + } + } + + for _, ins := range p.dmWorkers { + err := fn(spec.ComponentDMWorker, ins) + if err != nil { + return err + } + } + return nil } @@ -783,6 +799,17 @@ func (p *Playground) addInstance(componentID string, pdRole instance.PDRole, tif inst := instance.NewDrainer(cfg.BinPath, dir, host, cfg.ConfigPath, options.PortOffset, id, p.pds) ins = inst p.drainers = append(p.drainers, inst) + case spec.ComponentDMMaster: + inst := instance.NewDMMaster(cfg.BinPath, dir, host, cfg.ConfigPath, options.PortOffset, id, cfg.Port) + ins = inst + p.dmMasters = append(p.dmMasters, inst) + for _, master := range p.dmMasters { + master.SetInitEndpoints(p.dmMasters) + } + case spec.ComponentDMWorker: + inst := instance.NewDMWorker(cfg.BinPath, dir, host, cfg.ConfigPath, options.PortOffset, id, cfg.Port, p.dmMasters) + ins = inst + p.dmWorkers = append(p.dmWorkers, inst) default: return nil, errors.Errorf("unknown component: %s", componentID) } @@ -928,6 +955,8 @@ func (p *Playground) bootCluster(ctx context.Context, env *environment.Environme &options.Pump, &options.Drainer, &options.TiKVCDC, + &options.DMMaster, + &options.DMWorker, } { path, err := getAbsolutePath(cfg.ConfigPath) if err != nil { @@ -969,6 +998,8 @@ func (p *Playground) bootCluster(ctx context.Context, env *environment.Environme {spec.ComponentCDC, "", "", options.TiCDC}, {spec.ComponentTiKVCDC, "", "", options.TiKVCDC}, {spec.ComponentDrainer, "", "", options.Drainer}, + {spec.ComponentDMMaster, "", "", options.DMMaster}, + {spec.ComponentDMWorker, "", "", options.DMWorker}, } if options.Mode == "tidb" { @@ -1254,6 +1285,19 @@ func (p *Playground) terminate(sig syscall.Signal) { if p.grafana != nil && p.grafana.cmd != nil && p.grafana.cmd.Process != nil { go kill("grafana", p.grafana.cmd.Process.Pid, p.grafana.wait) } + + for _, inst := range p.dmMasters { + if inst.Process != nil && inst.Process.Cmd() != nil && inst.Process.Cmd().Process != nil { + kill(inst.Component(), inst.Pid(), inst.Wait) + } + } + + for _, inst := range p.dmWorkers { + if inst.Process != nil && inst.Process.Cmd() != nil && inst.Process.Cmd().Process != nil { + kill(inst.Component(), inst.Pid(), inst.Wait) + } + } + for _, inst := range p.tiflashs { if inst.Process != nil && inst.Process.Cmd() != nil && inst.Process.Cmd().Process != nil { kill(inst.Component(), inst.Pid(), inst.Wait) From c16245de6cada208056498073dcf44f573ad7ba7 Mon Sep 17 00:00:00 2001 From: gmhdbjd Date: Mon, 28 Oct 2024 23:17:12 +0800 Subject: [PATCH 2/7] support command-line args, scale in, alive check --- components/playground/command.go | 8 ++ components/playground/instance/dm_master.go | 4 + components/playground/instance/dm_worker.go | 6 -- components/playground/main.go | 24 +++++ components/playground/playground.go | 106 ++++++++++++++++++-- 5 files changed, 135 insertions(+), 13 deletions(-) diff --git a/components/playground/command.go b/components/playground/command.go index 36cdccd65b..e5c017141f 100644 --- a/components/playground/command.go +++ b/components/playground/command.go @@ -61,6 +61,8 @@ func buildCommands(tp CommandType, opt *BootOptions) (cmds []Command) { {"ticdc", opt.TiCDC}, {"tikv-cdc", opt.TiKVCDC}, {"drainer", opt.Drainer}, + {"dmmaster", opt.DMMaster}, + {"dmworker", opt.DMWorker}, } for _, cmd := range commands { @@ -113,6 +115,8 @@ func newScaleOut() *cobra.Command { cmd.Flags().StringVarP(&opt.TSO.Host, "tso.host", "", opt.TSO.Host, "Playground TSO host. If not provided, TSO will still use `host` flag as its host") cmd.Flags().StringVarP(&opt.Scheduling.Host, "scheduling.host", "", opt.Scheduling.Host, "Playground Scheduling host. If not provided, Scheduling will still use `host` flag as its host") cmd.Flags().StringVarP(&opt.TiProxy.Host, "tiproxy.host", "", opt.PD.Host, "Playground TiProxy host. If not provided, TiProxy will still use `host` flag as its host") + cmd.Flags().IntVarP(&opt.DMMaster.Num, "dmmaster", "", opt.DMMaster.Num, "DM-master instance number") + cmd.Flags().IntVarP(&opt.DMWorker.Num, "dmworker", "", opt.DMWorker.Num, "DM-worker instance number") cmd.Flags().StringVarP(&opt.TiDB.ConfigPath, "db.config", "", opt.TiDB.ConfigPath, "TiDB instance configuration file") cmd.Flags().StringVarP(&opt.TiKV.ConfigPath, "kv.config", "", opt.TiKV.ConfigPath, "TiKV instance configuration file") @@ -123,6 +127,8 @@ func newScaleOut() *cobra.Command { cmd.Flags().StringVarP(&opt.TiProxy.ConfigPath, "tiproxy.config", "", opt.TiProxy.ConfigPath, "TiProxy instance configuration file") cmd.Flags().StringVarP(&opt.Pump.ConfigPath, "pump.config", "", opt.Pump.ConfigPath, "Pump instance configuration file") cmd.Flags().StringVarP(&opt.Drainer.ConfigPath, "drainer.config", "", opt.Drainer.ConfigPath, "Drainer instance configuration file") + cmd.Flags().StringVarP(&opt.DMMaster.ConfigPath, "dmmaster.config", "", opt.DMMaster.ConfigPath, "DM-master instance configuration file") + cmd.Flags().StringVarP(&opt.DMWorker.ConfigPath, "dmworker.config", "", opt.DMWorker.ConfigPath, "DM-worker instance configuration file") cmd.Flags().StringVarP(&opt.TiDB.BinPath, "db.binpath", "", opt.TiDB.BinPath, "TiDB instance binary path") cmd.Flags().StringVarP(&opt.TiKV.BinPath, "kv.binpath", "", opt.TiKV.BinPath, "TiKV instance binary path") @@ -135,6 +141,8 @@ func newScaleOut() *cobra.Command { cmd.Flags().StringVarP(&opt.TiKVCDC.BinPath, "kvcdc.binpath", "", opt.TiKVCDC.BinPath, "TiKVCDC instance binary path") cmd.Flags().StringVarP(&opt.Pump.BinPath, "pump.binpath", "", opt.Pump.BinPath, "Pump instance binary path") cmd.Flags().StringVarP(&opt.Drainer.BinPath, "drainer.binpath", "", opt.Drainer.BinPath, "Drainer instance binary path") + cmd.Flags().StringVarP(&opt.DMMaster.BinPath, "dmmaster.binpath", "", opt.DMMaster.BinPath, "DM-master instance binary path") + cmd.Flags().StringVarP(&opt.DMWorker.BinPath, "dmworker.binpath", "", opt.DMWorker.BinPath, "DM-worker instance binary path") return cmd } diff --git a/components/playground/instance/dm_master.go b/components/playground/instance/dm_master.go index 6631a181c7..8f3868d4e9 100644 --- a/components/playground/instance/dm_master.go +++ b/components/playground/instance/dm_master.go @@ -76,3 +76,7 @@ func (m *DMMaster) Component() string { func (m *DMMaster) LogFile() string { return filepath.Join(m.Dir, "dm-master.log") } + +func (m *DMMaster) Addr() string { + return utils.JoinHostPort(m.Host, m.StatusPort) +} diff --git a/components/playground/instance/dm_worker.go b/components/playground/instance/dm_worker.go index e5acb8a6cf..4a68ab96f5 100644 --- a/components/playground/instance/dm_worker.go +++ b/components/playground/instance/dm_worker.go @@ -5,7 +5,6 @@ import ( "fmt" "path/filepath" "strings" - "time" "github.com/pingcap/tiup/pkg/utils" ) @@ -66,11 +65,6 @@ func (w *DMWorker) Start(ctx context.Context) error { logIfErr(w.Process.SetOutputFile(w.LogFile())) - // try to wait for the master to be ready - // this is a very ugly implementation, but it may mostly works - // TODO: find a better way to do this, - // e.g, let master support a HTTP API to check if it's ready - time.Sleep(time.Second * 3) return w.Process.Start() } diff --git a/components/playground/main.go b/components/playground/main.go index 777ae0c1f1..7b0f824e4b 100644 --- a/components/playground/main.go +++ b/components/playground/main.go @@ -300,6 +300,8 @@ Note: Version constraint [bold]%s[reset] is resolved to [green][bold]%s[reset]. rootCmd.Flags().IntVar(&options.TiKVCDC.Num, "kvcdc", 0, "TiKV-CDC instance number") rootCmd.Flags().IntVar(&options.Pump.Num, "pump", 0, "Pump instance number") rootCmd.Flags().IntVar(&options.Drainer.Num, "drainer", 0, "Drainer instance number") + rootCmd.Flags().IntVar(&options.DMMaster.Num, "dmmaster", 0, "DM-master instance number") + rootCmd.Flags().IntVar(&options.DMWorker.Num, "dmworker", 0, "DM-worker instance number") rootCmd.Flags().IntVar(&options.TiDB.UpTimeout, "db.timeout", 60, "TiDB max wait time in seconds for starting, 0 means no limit") rootCmd.Flags().IntVar(&options.TiFlash.UpTimeout, "tiflash.timeout", 120, "TiFlash max wait time in seconds for starting, 0 means no limit") @@ -330,6 +332,8 @@ Note: Version constraint [bold]%s[reset] is resolved to [green][bold]%s[reset]. rootCmd.Flags().StringVar(&options.Drainer.ConfigPath, "drainer.config", "", "Drainer instance configuration file") rootCmd.Flags().StringVar(&options.TiCDC.ConfigPath, "ticdc.config", "", "TiCDC instance configuration file") rootCmd.Flags().StringVar(&options.TiKVCDC.ConfigPath, "kvcdc.config", "", "TiKV-CDC instance configuration file") + rootCmd.Flags().StringVar(&options.DMMaster.ConfigPath, "dmmaster.config", "", "DM-master instance configuration file") + rootCmd.Flags().StringVar(&options.DMWorker.ConfigPath, "dmworker.config", "", "DM-worker instance configuration file") rootCmd.Flags().StringVar(&options.TiDB.BinPath, "db.binpath", "", "TiDB instance binary path") rootCmd.Flags().StringVar(&options.TiKV.BinPath, "kv.binpath", "", "TiKV instance binary path") @@ -345,6 +349,8 @@ Note: Version constraint [bold]%s[reset] is resolved to [green][bold]%s[reset]. rootCmd.Flags().StringVar(&options.TiKVCDC.BinPath, "kvcdc.binpath", "", "TiKV-CDC instance binary path") rootCmd.Flags().StringVar(&options.Pump.BinPath, "pump.binpath", "", "Pump instance binary path") rootCmd.Flags().StringVar(&options.Drainer.BinPath, "drainer.binpath", "", "Drainer instance binary path") + rootCmd.Flags().StringVar(&options.DMMaster.BinPath, "dmmaster.binpath", "", "DM-master instance binary path") + rootCmd.Flags().StringVar(&options.DMWorker.BinPath, "dmworker.binpath", "", "DM-worker instance binary path") rootCmd.Flags().StringVar(&options.TiKVCDC.Version, "kvcdc.version", "", "TiKV-CDC instance version") @@ -481,6 +487,24 @@ func checkStoreStatus(pdClient *api.PDClient, storeAddr string, timeout int) boo } } +func checkDMMasterStatus(dmMasterClient *api.DMMasterClient, dmMasterAddr string, timeout int) bool { + if timeout > 0 { + for i := 0; i < timeout; i++ { + if _, isActive, _, err := dmMasterClient.GetMaster(dmMasterAddr); err == nil && isActive { + return true + } + time.Sleep(time.Second) + } + return false + } + for { + if _, isActive, _, err := dmMasterClient.GetMaster(dmMasterAddr); err == nil && isActive { + return true + } + time.Sleep(time.Second) + } +} + func hasDashboard(pdAddr string) bool { resp, err := http.Get(fmt.Sprintf("http://%s/dashboard", pdAddr)) if err != nil { diff --git a/components/playground/playground.go b/components/playground/playground.go index 8f95b6941d..bd908bee4e 100644 --- a/components/playground/playground.go +++ b/components/playground/playground.go @@ -139,6 +139,15 @@ func (p *Playground) binlogClient() (*api.BinlogClient, error) { return api.NewBinlogClient(addrs, 5*time.Second, nil) } +func (p *Playground) dmMasterClient() *api.DMMasterClient { + var addrs []string + for _, inst := range p.dmMasters { + addrs = append(addrs, inst.Addr()) + } + + return api.NewDMMasterClient(addrs, 5*time.Second, nil) +} + func (p *Playground) pdClient() *api.PDClient { var addrs []string for _, inst := range p.pds { @@ -384,6 +393,34 @@ func (p *Playground) handleScaleIn(w io.Writer, pid int) error { return nil } } + case spec.ComponentDMWorker: + for i := 0; i < len(p.dmWorkers); i++ { + if p.dmWorkers[i].Pid() == pid { + inst := p.dmWorkers[i] + + c := p.dmMasterClient() + err = c.OfflineWorker(inst.Name(), nil) + if err != nil { + return err + } + p.dmWorkers = append(p.dmWorkers[:i], p.dmWorkers[i+1:]...) + return nil + } + } + case spec.ComponentDMMaster: + for i := 0; i < len(p.dmMasters); i++ { + if p.dmMasters[i].Pid() == pid { + inst := p.dmMasters[i] + + c := p.dmMasterClient() + err = c.OfflineMaster(inst.Name(), nil) + if err != nil { + return err + } + p.dmMasters = append(p.dmMasters[:i], p.dmMasters[i+1:]...) + return nil + } + } default: fmt.Fprintf(w, "unknown component in scale in: %s", cid) return nil @@ -444,6 +481,10 @@ func (p *Playground) sanitizeComponentConfig(cid string, cfg *instance.Config) e return p.sanitizeConfig(p.bootOptions.Drainer, cfg) case spec.ComponentTiProxy: return p.sanitizeConfig(p.bootOptions.TiProxy, cfg) + case spec.ComponentDMMaster: + return p.sanitizeConfig(p.bootOptions.DMMaster, cfg) + case spec.ComponentDMWorker: + return p.sanitizeConfig(p.bootOptions.DMWorker, cfg) default: return fmt.Errorf("unknown %s in sanitizeConfig", cid) } @@ -927,6 +968,40 @@ func (p *Playground) waitAllTiFlashUp() { } } +func (p *Playground) waitAllDMMasterUp() { + if len(p.dmMasters) > 0 { + var wg sync.WaitGroup + bars := progress.NewMultiBar(colorstr.Sprintf("[dark_gray]Waiting for dm-master instances ready")) + for _, master := range p.dmMasters { + wg.Add(1) + prefix := master.Addr() + bar := bars.AddBar(prefix) + go func(masterInst *instance.DMMaster) { + defer wg.Done() + displayResult := &progress.DisplayProps{ + Prefix: prefix, + } + if cmd := masterInst.Cmd(); cmd == nil { + displayResult.Mode = progress.ModeError + displayResult.Suffix = "initialize command failed" + } else if state := cmd.ProcessState; state != nil && state.Exited() { + displayResult.Mode = progress.ModeError + displayResult.Suffix = fmt.Sprintf("process exited with code: %d", state.ExitCode()) + } else if s := checkDMMasterStatus(p.dmMasterClient(), masterInst.Name(), options.DMMaster.UpTimeout); !s { + displayResult.Mode = progress.ModeError + displayResult.Suffix = "failed to up after timeout" + } else { + displayResult.Mode = progress.ModeDone + } + bar.UpdateDisplay(displayResult) + }(master) + } + bars.StartRenderLoop() + wg.Wait() + bars.StopRenderLoop() + } +} + func (p *Playground) bindVersion(comp string, version string) (bindVersion string) { bindVersion = version switch comp { @@ -967,8 +1042,8 @@ func (p *Playground) bootCluster(ctx context.Context, env *environment.Environme p.bootOptions = options - // All others components depend on the pd, we just ensure the pd count must be great than 0 - if options.PDMode != "ms" && options.PD.Num < 1 { + // All others components depend on the pd except dm, we just ensure the pd count must be great than 0 + if options.PDMode != "ms" && options.PD.Num < 1 && options.DMMaster.Num < 1 { return fmt.Errorf("all components count must be great than 0 (pd=%v)", options.PD.Num) } @@ -1083,11 +1158,17 @@ func (p *Playground) bootCluster(ctx context.Context, env *environment.Environme } anyPumpReady := false + allDMMasterReady := false // Start all instance except tiflash. err := p.WalkInstances(func(cid string, ins instance.Instance) error { if cid == spec.ComponentTiFlash { return nil } + // wait dm-master up before dm-worker + if cid == spec.ComponentDMWorker && !allDMMasterReady { + p.waitAllDMMasterUp() + allDMMasterReady = true + } err := p.startInstance(ctx, ins) if err != nil { @@ -1166,9 +1247,20 @@ func (p *Playground) bootCluster(ctx context.Context, env *environment.Environme } } - if pdAddr := p.pds[0].Addr(); len(p.tidbs) > 0 && hasDashboard(pdAddr) { - fmt.Printf("TiDB Dashboard: ") - colorCmd.Printf("http://%s/dashboard\n", pdAddr) + if len(p.dmMasters) > 0 { + fmt.Printf("Connect DM: ") + endpoints := make([]string, 0, len(p.dmMasters)) + for _, dmMaster := range p.dmMasters { + endpoints = append(endpoints, dmMaster.Addr()) + } + colorCmd.Printf("tiup dmctl --master-addr %s\n", strings.Join(endpoints, ",")) + } + + if len(p.pds) > 0 { + if pdAddr := p.pds[0].Addr(); len(p.tidbs) > 0 && hasDashboard(pdAddr) { + fmt.Printf("TiDB Dashboard: ") + colorCmd.Printf("http://%s/dashboard\n", pdAddr) + } } if p.bootOptions.Mode == "tikv-slim" { @@ -1286,13 +1378,13 @@ func (p *Playground) terminate(sig syscall.Signal) { go kill("grafana", p.grafana.cmd.Process.Pid, p.grafana.wait) } - for _, inst := range p.dmMasters { + for _, inst := range p.dmWorkers { if inst.Process != nil && inst.Process.Cmd() != nil && inst.Process.Cmd().Process != nil { kill(inst.Component(), inst.Pid(), inst.Wait) } } - for _, inst := range p.dmWorkers { + for _, inst := range p.dmMasters { if inst.Process != nil && inst.Process.Cmd() != nil && inst.Process.Cmd().Process != nil { kill(inst.Component(), inst.Pid(), inst.Wait) } From 97aa6389aae8aaa3b597ee44dbd74c3378467755 Mon Sep 17 00:00:00 2001 From: gmhdbjd Date: Mon, 28 Oct 2024 23:34:51 +0800 Subject: [PATCH 3/7] fix exported --- components/playground/instance/dm_master.go | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/components/playground/instance/dm_master.go b/components/playground/instance/dm_master.go index 8f3868d4e9..e621458b81 100644 --- a/components/playground/instance/dm_master.go +++ b/components/playground/instance/dm_master.go @@ -9,6 +9,7 @@ import ( "github.com/pingcap/tiup/pkg/utils" ) +// DMMaster represent a DM master instance. type DMMaster struct { instance Process @@ -17,6 +18,7 @@ type DMMaster struct { var _ Instance = &DMMaster{} +// NewDMMaster create a new DMMaster instance. func NewDMMaster(binPath string, dir, host, configPath string, portOffset int, id int, port int) *DMMaster { if port <= 0 { port = 8261 @@ -35,10 +37,12 @@ func NewDMMaster(binPath string, dir, host, configPath string, portOffset int, i } } +// Name return the name of the instance. func (m *DMMaster) Name() string { return fmt.Sprintf("dm-master-%d", m.ID) } +// Start starts the instance. func (m *DMMaster) Start(ctx context.Context) error { args := []string{ fmt.Sprintf("--name=%s", m.Name()), @@ -65,18 +69,22 @@ func (m *DMMaster) Start(ctx context.Context) error { return m.Process.Start() } +// SetInitEndpoints set the initial endpoints for the DM master. func (m *DMMaster) SetInitEndpoints(endpoints []*DMMaster) { m.initEndpoints = endpoints } +// Component return the component of the instance. func (m *DMMaster) Component() string { return "dm-master" } +// LogFile return the log file path of the instance. func (m *DMMaster) LogFile() string { return filepath.Join(m.Dir, "dm-master.log") } +// Addr return the address of the instance. func (m *DMMaster) Addr() string { return utils.JoinHostPort(m.Host, m.StatusPort) } From 3a7063a37a4ecbe6b822428bee5c4ef5f6e15821 Mon Sep 17 00:00:00 2001 From: gmhdbjd Date: Mon, 28 Oct 2024 23:43:29 +0800 Subject: [PATCH 4/7] fix cognitive-complexity --- components/playground/instance/dm_worker.go | 6 +++ components/playground/playground.go | 60 ++++++++++++--------- 2 files changed, 42 insertions(+), 24 deletions(-) diff --git a/components/playground/instance/dm_worker.go b/components/playground/instance/dm_worker.go index 4a68ab96f5..61c5e748f1 100644 --- a/components/playground/instance/dm_worker.go +++ b/components/playground/instance/dm_worker.go @@ -9,6 +9,7 @@ import ( "github.com/pingcap/tiup/pkg/utils" ) +// DMWorker represent a DM worker instance. type DMWorker struct { instance Process @@ -36,6 +37,7 @@ func NewDMWorker(binPath string, dir, host, configPath string, portOffset int, i } } +// MasterAddrs return the master addresses. func (w *DMWorker) MasterAddrs() []string { var addrs []string for _, master := range w.masters { @@ -44,10 +46,12 @@ func (w *DMWorker) MasterAddrs() []string { return addrs } +// Name return the name of the instance. func (w *DMWorker) Name() string { return fmt.Sprintf("dm-worker-%d", w.ID) } +// Start starts the instance. func (w *DMWorker) Start(ctx context.Context) error { args := []string{ fmt.Sprintf("--name=%s", w.Name()), @@ -68,10 +72,12 @@ func (w *DMWorker) Start(ctx context.Context) error { return w.Process.Start() } +// Component return the component of the instance. func (w *DMWorker) Component() string { return "dm-worker" } +// LogFile return the log file of the instance. func (w *DMWorker) LogFile() string { return filepath.Join(w.Dir, "dm-worker.log") } diff --git a/components/playground/playground.go b/components/playground/playground.go index bd908bee4e..73fe009ad5 100644 --- a/components/playground/playground.go +++ b/components/playground/playground.go @@ -394,32 +394,12 @@ func (p *Playground) handleScaleIn(w io.Writer, pid int) error { } } case spec.ComponentDMWorker: - for i := 0; i < len(p.dmWorkers); i++ { - if p.dmWorkers[i].Pid() == pid { - inst := p.dmWorkers[i] - - c := p.dmMasterClient() - err = c.OfflineWorker(inst.Name(), nil) - if err != nil { - return err - } - p.dmWorkers = append(p.dmWorkers[:i], p.dmWorkers[i+1:]...) - return nil - } + if err := p.handleScaleInDMWorker(pid); err != nil { + return err } case spec.ComponentDMMaster: - for i := 0; i < len(p.dmMasters); i++ { - if p.dmMasters[i].Pid() == pid { - inst := p.dmMasters[i] - - c := p.dmMasterClient() - err = c.OfflineMaster(inst.Name(), nil) - if err != nil { - return err - } - p.dmMasters = append(p.dmMasters[:i], p.dmMasters[i+1:]...) - return nil - } + if err := p.handleScaleInDMMaster(pid); err != nil { + return err } default: fmt.Fprintf(w, "unknown component in scale in: %s", cid) @@ -438,6 +418,38 @@ func (p *Playground) handleScaleIn(w io.Writer, pid int) error { return nil } +func (p *Playground) handleScaleInDMWorker(pid int) error { + for i := 0; i < len(p.dmWorkers); i++ { + if p.dmWorkers[i].Pid() == pid { + inst := p.dmWorkers[i] + + c := p.dmMasterClient() + if err := c.OfflineWorker(inst.Name(), nil); err != nil { + return err + } + p.dmWorkers = append(p.dmWorkers[:i], p.dmWorkers[i+1:]...) + return nil + } + } + return nil +} + +func (p *Playground) handleScaleInDMMaster(pid int) error { + for i := 0; i < len(p.dmMasters); i++ { + if p.dmMasters[i].Pid() == pid { + inst := p.dmMasters[i] + + c := p.dmMasterClient() + if err := c.OfflineMaster(inst.Name(), nil); err != nil { + return err + } + p.dmMasters = append(p.dmMasters[:i], p.dmMasters[i+1:]...) + return nil + } + } + return nil +} + func (p *Playground) sanitizeConfig(boot instance.Config, cfg *instance.Config) error { if cfg.BinPath == "" { cfg.BinPath = boot.BinPath From ac3f51bbcfda7f316f6e7e2a72962d0a09fc768a Mon Sep 17 00:00:00 2001 From: gmhdbjd Date: Tue, 29 Oct 2024 14:39:21 +0800 Subject: [PATCH 5/7] fix name --- components/playground/command.go | 16 ++++++++-------- components/playground/main.go | 12 ++++++------ 2 files changed, 14 insertions(+), 14 deletions(-) diff --git a/components/playground/command.go b/components/playground/command.go index e5c017141f..62a1b9232b 100644 --- a/components/playground/command.go +++ b/components/playground/command.go @@ -61,8 +61,8 @@ func buildCommands(tp CommandType, opt *BootOptions) (cmds []Command) { {"ticdc", opt.TiCDC}, {"tikv-cdc", opt.TiKVCDC}, {"drainer", opt.Drainer}, - {"dmmaster", opt.DMMaster}, - {"dmworker", opt.DMWorker}, + {"dm-master", opt.DMMaster}, + {"dm-worker", opt.DMWorker}, } for _, cmd := range commands { @@ -115,8 +115,8 @@ func newScaleOut() *cobra.Command { cmd.Flags().StringVarP(&opt.TSO.Host, "tso.host", "", opt.TSO.Host, "Playground TSO host. If not provided, TSO will still use `host` flag as its host") cmd.Flags().StringVarP(&opt.Scheduling.Host, "scheduling.host", "", opt.Scheduling.Host, "Playground Scheduling host. If not provided, Scheduling will still use `host` flag as its host") cmd.Flags().StringVarP(&opt.TiProxy.Host, "tiproxy.host", "", opt.PD.Host, "Playground TiProxy host. If not provided, TiProxy will still use `host` flag as its host") - cmd.Flags().IntVarP(&opt.DMMaster.Num, "dmmaster", "", opt.DMMaster.Num, "DM-master instance number") - cmd.Flags().IntVarP(&opt.DMWorker.Num, "dmworker", "", opt.DMWorker.Num, "DM-worker instance number") + cmd.Flags().IntVarP(&opt.DMMaster.Num, "dm-master", "", opt.DMMaster.Num, "DM-master instance number") + cmd.Flags().IntVarP(&opt.DMWorker.Num, "dm-worker", "", opt.DMWorker.Num, "DM-worker instance number") cmd.Flags().StringVarP(&opt.TiDB.ConfigPath, "db.config", "", opt.TiDB.ConfigPath, "TiDB instance configuration file") cmd.Flags().StringVarP(&opt.TiKV.ConfigPath, "kv.config", "", opt.TiKV.ConfigPath, "TiKV instance configuration file") @@ -127,8 +127,8 @@ func newScaleOut() *cobra.Command { cmd.Flags().StringVarP(&opt.TiProxy.ConfigPath, "tiproxy.config", "", opt.TiProxy.ConfigPath, "TiProxy instance configuration file") cmd.Flags().StringVarP(&opt.Pump.ConfigPath, "pump.config", "", opt.Pump.ConfigPath, "Pump instance configuration file") cmd.Flags().StringVarP(&opt.Drainer.ConfigPath, "drainer.config", "", opt.Drainer.ConfigPath, "Drainer instance configuration file") - cmd.Flags().StringVarP(&opt.DMMaster.ConfigPath, "dmmaster.config", "", opt.DMMaster.ConfigPath, "DM-master instance configuration file") - cmd.Flags().StringVarP(&opt.DMWorker.ConfigPath, "dmworker.config", "", opt.DMWorker.ConfigPath, "DM-worker instance configuration file") + cmd.Flags().StringVarP(&opt.DMMaster.ConfigPath, "dm-master.config", "", opt.DMMaster.ConfigPath, "DM-master instance configuration file") + cmd.Flags().StringVarP(&opt.DMWorker.ConfigPath, "dm-worker.config", "", opt.DMWorker.ConfigPath, "DM-worker instance configuration file") cmd.Flags().StringVarP(&opt.TiDB.BinPath, "db.binpath", "", opt.TiDB.BinPath, "TiDB instance binary path") cmd.Flags().StringVarP(&opt.TiKV.BinPath, "kv.binpath", "", opt.TiKV.BinPath, "TiKV instance binary path") @@ -141,8 +141,8 @@ func newScaleOut() *cobra.Command { cmd.Flags().StringVarP(&opt.TiKVCDC.BinPath, "kvcdc.binpath", "", opt.TiKVCDC.BinPath, "TiKVCDC instance binary path") cmd.Flags().StringVarP(&opt.Pump.BinPath, "pump.binpath", "", opt.Pump.BinPath, "Pump instance binary path") cmd.Flags().StringVarP(&opt.Drainer.BinPath, "drainer.binpath", "", opt.Drainer.BinPath, "Drainer instance binary path") - cmd.Flags().StringVarP(&opt.DMMaster.BinPath, "dmmaster.binpath", "", opt.DMMaster.BinPath, "DM-master instance binary path") - cmd.Flags().StringVarP(&opt.DMWorker.BinPath, "dmworker.binpath", "", opt.DMWorker.BinPath, "DM-worker instance binary path") + cmd.Flags().StringVarP(&opt.DMMaster.BinPath, "dm-master.binpath", "", opt.DMMaster.BinPath, "DM-master instance binary path") + cmd.Flags().StringVarP(&opt.DMWorker.BinPath, "dm-worker.binpath", "", opt.DMWorker.BinPath, "DM-worker instance binary path") return cmd } diff --git a/components/playground/main.go b/components/playground/main.go index 7b0f824e4b..9b555009e4 100644 --- a/components/playground/main.go +++ b/components/playground/main.go @@ -300,8 +300,8 @@ Note: Version constraint [bold]%s[reset] is resolved to [green][bold]%s[reset]. rootCmd.Flags().IntVar(&options.TiKVCDC.Num, "kvcdc", 0, "TiKV-CDC instance number") rootCmd.Flags().IntVar(&options.Pump.Num, "pump", 0, "Pump instance number") rootCmd.Flags().IntVar(&options.Drainer.Num, "drainer", 0, "Drainer instance number") - rootCmd.Flags().IntVar(&options.DMMaster.Num, "dmmaster", 0, "DM-master instance number") - rootCmd.Flags().IntVar(&options.DMWorker.Num, "dmworker", 0, "DM-worker instance number") + rootCmd.Flags().IntVar(&options.DMMaster.Num, "dm-master", 0, "DM-master instance number") + rootCmd.Flags().IntVar(&options.DMWorker.Num, "dm-worker", 0, "DM-worker instance number") rootCmd.Flags().IntVar(&options.TiDB.UpTimeout, "db.timeout", 60, "TiDB max wait time in seconds for starting, 0 means no limit") rootCmd.Flags().IntVar(&options.TiFlash.UpTimeout, "tiflash.timeout", 120, "TiFlash max wait time in seconds for starting, 0 means no limit") @@ -332,8 +332,8 @@ Note: Version constraint [bold]%s[reset] is resolved to [green][bold]%s[reset]. rootCmd.Flags().StringVar(&options.Drainer.ConfigPath, "drainer.config", "", "Drainer instance configuration file") rootCmd.Flags().StringVar(&options.TiCDC.ConfigPath, "ticdc.config", "", "TiCDC instance configuration file") rootCmd.Flags().StringVar(&options.TiKVCDC.ConfigPath, "kvcdc.config", "", "TiKV-CDC instance configuration file") - rootCmd.Flags().StringVar(&options.DMMaster.ConfigPath, "dmmaster.config", "", "DM-master instance configuration file") - rootCmd.Flags().StringVar(&options.DMWorker.ConfigPath, "dmworker.config", "", "DM-worker instance configuration file") + rootCmd.Flags().StringVar(&options.DMMaster.ConfigPath, "dm-master.config", "", "DM-master instance configuration file") + rootCmd.Flags().StringVar(&options.DMWorker.ConfigPath, "dm-worker.config", "", "DM-worker instance configuration file") rootCmd.Flags().StringVar(&options.TiDB.BinPath, "db.binpath", "", "TiDB instance binary path") rootCmd.Flags().StringVar(&options.TiKV.BinPath, "kv.binpath", "", "TiKV instance binary path") @@ -349,8 +349,8 @@ Note: Version constraint [bold]%s[reset] is resolved to [green][bold]%s[reset]. rootCmd.Flags().StringVar(&options.TiKVCDC.BinPath, "kvcdc.binpath", "", "TiKV-CDC instance binary path") rootCmd.Flags().StringVar(&options.Pump.BinPath, "pump.binpath", "", "Pump instance binary path") rootCmd.Flags().StringVar(&options.Drainer.BinPath, "drainer.binpath", "", "Drainer instance binary path") - rootCmd.Flags().StringVar(&options.DMMaster.BinPath, "dmmaster.binpath", "", "DM-master instance binary path") - rootCmd.Flags().StringVar(&options.DMWorker.BinPath, "dmworker.binpath", "", "DM-worker instance binary path") + rootCmd.Flags().StringVar(&options.DMMaster.BinPath, "dm-master.binpath", "", "DM-master instance binary path") + rootCmd.Flags().StringVar(&options.DMWorker.BinPath, "dm-worker.binpath", "", "DM-worker instance binary path") rootCmd.Flags().StringVar(&options.TiKVCDC.Version, "kvcdc.version", "", "TiKV-CDC instance version") From 12fc9b6a653833ad01dcb35e1b3ca28412a7b192 Mon Sep 17 00:00:00 2001 From: gmhdbjd Date: Tue, 29 Oct 2024 14:43:26 +0800 Subject: [PATCH 6/7] fix command-line args --- components/playground/main.go | 17 ++++------------- components/playground/playground.go | 2 +- 2 files changed, 5 insertions(+), 14 deletions(-) diff --git a/components/playground/main.go b/components/playground/main.go index 9b555009e4..9cfb32d280 100644 --- a/components/playground/main.go +++ b/components/playground/main.go @@ -318,6 +318,10 @@ Note: Version constraint [bold]%s[reset] is resolved to [green][bold]%s[reset]. rootCmd.Flags().IntVar(&options.TiCDC.Port, "ticdc.port", 0, "Playground TiCDC port. If not provided, TiCDC will use 8300 as its port") rootCmd.Flags().StringVar(&options.TiProxy.Host, "tiproxy.host", "", "Playground TiProxy host. If not provided, TiProxy will still use `host` flag as its host") rootCmd.Flags().IntVar(&options.TiProxy.Port, "tiproxy.port", 0, "Playground TiProxy port. If not provided, TiProxy will use 6000 as its port") + rootCmd.Flags().StringVar(&options.DMMaster.Host, "dm-master.host", "", "DM-master instance host") + rootCmd.Flags().IntVar(&options.DMMaster.Port, "dm-master.port", 8261, "DM-master instance port") + rootCmd.Flags().StringVar(&options.DMWorker.Host, "dm-worker.host", "", "DM-worker instance host") + rootCmd.Flags().IntVar(&options.DMWorker.Port, "dm-worker.port", 8262, "DM-worker instance port") rootCmd.Flags().StringVar(&options.TiDB.ConfigPath, "db.config", "", "TiDB instance configuration file") rootCmd.Flags().StringVar(&options.TiKV.ConfigPath, "kv.config", "", "TiKV instance configuration file") @@ -359,19 +363,6 @@ Note: Version constraint [bold]%s[reset] is resolved to [green][bold]%s[reset]. rootCmd.Flags().StringVar(&options.CSEOpts.AccessKey, "cse.access_key", "minioadmin", "Object store access key for --mode=tidb-cse") rootCmd.Flags().StringVar(&options.CSEOpts.SecretKey, "cse.secret_key", "minioadmin", "Object store secret key for --mode=tidb-cse") - rootCmd.Flags().IntVar(&options.DMMaster.Num, "dm-master", 0, "DM-master instance number") - rootCmd.Flags().IntVar(&options.DMWorker.Num, "dm-worker", 0, "DM-worker instance number") - - rootCmd.Flags().StringVar(&options.DMMaster.Host, "dm-master.host", "", "DM-master instance host") - rootCmd.Flags().IntVar(&options.DMMaster.Port, "dm-master.port", 8261, "DM-master instance port") - rootCmd.Flags().StringVar(&options.DMMaster.ConfigPath, "dm-master.config", "", "DM-master instance configuration file") - rootCmd.Flags().StringVar(&options.DMMaster.BinPath, "dm-master.binpath", "", "DM-master instance binary path") - - rootCmd.Flags().StringVar(&options.DMWorker.Host, "dm-worker.host", "", "DM-worker instance host") - rootCmd.Flags().IntVar(&options.DMWorker.Port, "dm-worker.port", 8262, "DM-worker instance port") - rootCmd.Flags().StringVar(&options.DMWorker.ConfigPath, "dm-worker.config", "", "DM-worker instance configuration file") - rootCmd.Flags().StringVar(&options.DMWorker.BinPath, "dm-worker.binpath", "", "DM-worker instance binary path") - rootCmd.AddCommand(newDisplay()) rootCmd.AddCommand(newScaleOut()) rootCmd.AddCommand(newScaleIn()) diff --git a/components/playground/playground.go b/components/playground/playground.go index 73fe009ad5..2b1f783d61 100644 --- a/components/playground/playground.go +++ b/components/playground/playground.go @@ -1178,7 +1178,7 @@ func (p *Playground) bootCluster(ctx context.Context, env *environment.Environme } // wait dm-master up before dm-worker if cid == spec.ComponentDMWorker && !allDMMasterReady { - p.waitAllDMMasterUp() + //p.waitAllDMMasterUp() allDMMasterReady = true } From 77463ffbba171f31476600d43ecc345830b21d9b Mon Sep 17 00:00:00 2001 From: gmhdbjd Date: Tue, 29 Oct 2024 14:44:01 +0800 Subject: [PATCH 7/7] remove debug comment --- components/playground/playground.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/components/playground/playground.go b/components/playground/playground.go index 2b1f783d61..73fe009ad5 100644 --- a/components/playground/playground.go +++ b/components/playground/playground.go @@ -1178,7 +1178,7 @@ func (p *Playground) bootCluster(ctx context.Context, env *environment.Environme } // wait dm-master up before dm-worker if cid == spec.ComponentDMWorker && !allDMMasterReady { - //p.waitAllDMMasterUp() + p.waitAllDMMasterUp() allDMMasterReady = true }