diff --git a/components/playground/instance/dm_master.go b/components/playground/instance/dm_master.go new file mode 100644 index 0000000000..6631a181c7 --- /dev/null +++ b/components/playground/instance/dm_master.go @@ -0,0 +1,78 @@ +package instance + +import ( + "context" + "fmt" + "path/filepath" + "strings" + + "github.com/pingcap/tiup/pkg/utils" +) + +type DMMaster struct { + instance + Process + initEndpoints []*DMMaster +} + +var _ Instance = &DMMaster{} + +func NewDMMaster(binPath string, dir, host, configPath string, portOffset int, id int, port int) *DMMaster { + if port <= 0 { + port = 8261 + } + return &DMMaster{ + instance: instance{ + BinPath: binPath, + ID: id, + Dir: dir, + Host: host, + Port: utils.MustGetFreePort(host, 8291, portOffset), + // Similar like PD's client port, here use StatusPort for Master Port. + StatusPort: utils.MustGetFreePort(host, port, portOffset), + ConfigPath: configPath, + }, + } +} + +func (m *DMMaster) Name() string { + return fmt.Sprintf("dm-master-%d", m.ID) +} + +func (m *DMMaster) Start(ctx context.Context) error { + args := []string{ + fmt.Sprintf("--name=%s", m.Name()), + fmt.Sprintf("--master-addr=http://%s", utils.JoinHostPort(m.Host, m.StatusPort)), + fmt.Sprintf("--advertise-addr=http://%s", utils.JoinHostPort(AdvertiseHost(m.Host), m.StatusPort)), + fmt.Sprintf("--peer-urls=http://%s", utils.JoinHostPort(m.Host, m.Port)), + fmt.Sprintf("--advertise-peer-urls=http://%s", utils.JoinHostPort(AdvertiseHost(m.Host), m.Port)), + fmt.Sprintf("--log-file=%s", m.LogFile()), + } + + endpoints := make([]string, 0) + for _, master := range m.initEndpoints { + endpoints = append(endpoints, fmt.Sprintf("%s=http://%s", master.Name(), utils.JoinHostPort(master.Host, master.Port))) + } + args = append(args, fmt.Sprintf("--initial-cluster=%s", strings.Join(endpoints, ","))) + + if m.ConfigPath != "" { + args = append(args, fmt.Sprintf("--config=%s", m.ConfigPath)) + } + + m.Process = &process{cmd: PrepareCommand(ctx, m.BinPath, args, nil, m.Dir)} + + logIfErr(m.Process.SetOutputFile(m.LogFile())) + return m.Process.Start() +} + +func (m *DMMaster) SetInitEndpoints(endpoints []*DMMaster) { + m.initEndpoints = endpoints +} + +func (m *DMMaster) Component() string { + return "dm-master" +} + +func (m *DMMaster) LogFile() string { + return filepath.Join(m.Dir, "dm-master.log") +} diff --git a/components/playground/instance/dm_worker.go b/components/playground/instance/dm_worker.go new file mode 100644 index 0000000000..e5acb8a6cf --- /dev/null +++ b/components/playground/instance/dm_worker.go @@ -0,0 +1,83 @@ +package instance + +import ( + "context" + "fmt" + "path/filepath" + "strings" + "time" + + "github.com/pingcap/tiup/pkg/utils" +) + +type DMWorker struct { + instance + Process + + masters []*DMMaster +} + +var _ Instance = &DMWorker{} + +// NewDMWorker create a DMWorker instance. +func NewDMWorker(binPath string, dir, host, configPath string, portOffset int, id int, port int, masters []*DMMaster) *DMWorker { + if port <= 0 { + port = 8262 + } + return &DMWorker{ + instance: instance{ + BinPath: binPath, + ID: id, + Dir: dir, + Host: host, + Port: utils.MustGetFreePort(host, port, portOffset), + ConfigPath: configPath, + }, + masters: masters, + } +} + +func (w *DMWorker) MasterAddrs() []string { + var addrs []string + for _, master := range w.masters { + addrs = append(addrs, utils.JoinHostPort(AdvertiseHost(master.Host), master.StatusPort)) + } + return addrs +} + +func (w *DMWorker) Name() string { + return fmt.Sprintf("dm-worker-%d", w.ID) +} + +func (w *DMWorker) Start(ctx context.Context) error { + args := []string{ + fmt.Sprintf("--name=%s", w.Name()), + fmt.Sprintf("--worker-addr=%s", utils.JoinHostPort(w.Host, w.Port)), + fmt.Sprintf("--advertise-addr=%s", utils.JoinHostPort(AdvertiseHost(w.Host), w.Port)), + fmt.Sprintf("--join=%s", strings.Join(w.MasterAddrs(), ",")), + fmt.Sprintf("--log-file=%s", w.LogFile()), + } + + if w.ConfigPath != "" { + args = append(args, fmt.Sprintf("--config=%s", w.ConfigPath)) + } + + w.Process = &process{cmd: PrepareCommand(ctx, w.BinPath, args, nil, w.Dir)} + + logIfErr(w.Process.SetOutputFile(w.LogFile())) + + // try to wait for the master to be ready + // this is a very ugly implementation, but it may mostly works + // TODO: find a better way to do this, + // e.g, let master support a HTTP API to check if it's ready + time.Sleep(time.Second * 3) + return w.Process.Start() +} + +func (w *DMWorker) Component() string { + return "dm-worker" +} + +func (w *DMWorker) LogFile() string { + return filepath.Join(w.Dir, "dm-worker.log") +} diff --git a/components/playground/main.go b/components/playground/main.go index ad0429e258..777ae0c1f1 100644 --- a/components/playground/main.go +++ b/components/playground/main.go @@ -76,6 +76,8 @@ type BootOptions struct { CSEOpts instance.CSEOptions `yaml:"cse"` // Only available when mode == tidb-cse GrafanaPort int `yaml:"grafana_port"` PortOffset int `yaml:"port_offset"` + DMMaster instance.Config `yaml:"dm_master"` + DMWorker instance.Config `yaml:"dm_worker"` } var ( @@ -351,6 +353,19 @@ Note: Version constraint [bold]%s[reset] is resolved to [green][bold]%s[reset]. rootCmd.Flags().StringVar(&options.CSEOpts.AccessKey, "cse.access_key", "minioadmin", "Object store access key for --mode=tidb-cse") rootCmd.Flags().StringVar(&options.CSEOpts.SecretKey, "cse.secret_key", "minioadmin", "Object store secret key for --mode=tidb-cse") + rootCmd.Flags().IntVar(&options.DMMaster.Num, "dm-master", 0, "DM-master instance number") + rootCmd.Flags().IntVar(&options.DMWorker.Num, "dm-worker", 0, "DM-worker instance number") + + rootCmd.Flags().StringVar(&options.DMMaster.Host, "dm-master.host", "", "DM-master instance host") + rootCmd.Flags().IntVar(&options.DMMaster.Port, "dm-master.port", 8261, "DM-master instance port") + rootCmd.Flags().StringVar(&options.DMMaster.ConfigPath, "dm-master.config", "", "DM-master instance configuration file") + rootCmd.Flags().StringVar(&options.DMMaster.BinPath, "dm-master.binpath", "", "DM-master instance binary path") + + rootCmd.Flags().StringVar(&options.DMWorker.Host, "dm-worker.host", "", "DM-worker instance host") + rootCmd.Flags().IntVar(&options.DMWorker.Port, "dm-worker.port", 8262, "DM-worker instance port") + rootCmd.Flags().StringVar(&options.DMWorker.ConfigPath, "dm-worker.config", "", "DM-worker instance configuration file") + rootCmd.Flags().StringVar(&options.DMWorker.BinPath, "dm-worker.binpath", "", "DM-worker instance binary path") + rootCmd.AddCommand(newDisplay()) rootCmd.AddCommand(newScaleOut()) rootCmd.AddCommand(newScaleIn()) diff --git a/components/playground/playground.go b/components/playground/playground.go index 17c08ef3c9..8f95b6941d 100644 --- a/components/playground/playground.go +++ b/components/playground/playground.go @@ -71,6 +71,8 @@ type Playground struct { tikvCdcs []*instance.TiKVCDC pumps []*instance.Pump drainers []*instance.Drainer + dmMasters []*instance.DMMaster + dmWorkers []*instance.DMWorker startedInstances []instance.Instance idAlloc map[string]int @@ -682,6 +684,20 @@ func (p *Playground) WalkInstances(fn func(componentID string, ins instance.Inst } } + for _, ins := range p.dmMasters { + err := fn(spec.ComponentDMMaster, ins) + if err != nil { + return err + } + } + + for _, ins := range p.dmWorkers { + err := fn(spec.ComponentDMWorker, ins) + if err != nil { + return err + } + } + return nil } @@ -783,6 +799,17 @@ func (p *Playground) addInstance(componentID string, pdRole instance.PDRole, tif inst := instance.NewDrainer(cfg.BinPath, dir, host, cfg.ConfigPath, options.PortOffset, id, p.pds) ins = inst p.drainers = append(p.drainers, inst) + case spec.ComponentDMMaster: + inst := instance.NewDMMaster(cfg.BinPath, dir, host, cfg.ConfigPath, options.PortOffset, id, cfg.Port) + ins = inst + p.dmMasters = append(p.dmMasters, inst) + for _, master := range p.dmMasters { + master.SetInitEndpoints(p.dmMasters) + } + case spec.ComponentDMWorker: + inst := instance.NewDMWorker(cfg.BinPath, dir, host, cfg.ConfigPath, options.PortOffset, id, cfg.Port, p.dmMasters) + ins = inst + p.dmWorkers = append(p.dmWorkers, inst) default: return nil, errors.Errorf("unknown component: %s", componentID) } @@ -928,6 +955,8 @@ func (p *Playground) bootCluster(ctx context.Context, env *environment.Environme &options.Pump, &options.Drainer, &options.TiKVCDC, + &options.DMMaster, + &options.DMWorker, } { path, err := getAbsolutePath(cfg.ConfigPath) if err != nil { @@ -969,6 +998,8 @@ func (p *Playground) bootCluster(ctx context.Context, env *environment.Environme {spec.ComponentCDC, "", "", options.TiCDC}, {spec.ComponentTiKVCDC, "", "", options.TiKVCDC}, {spec.ComponentDrainer, "", "", options.Drainer}, + {spec.ComponentDMMaster, "", "", options.DMMaster}, + {spec.ComponentDMWorker, "", "", options.DMWorker}, } if options.Mode == "tidb" { @@ -1254,6 +1285,19 @@ func (p *Playground) terminate(sig syscall.Signal) { if p.grafana != nil && p.grafana.cmd != nil && p.grafana.cmd.Process != nil { go kill("grafana", p.grafana.cmd.Process.Pid, p.grafana.wait) } + + for _, inst := range p.dmMasters { + if inst.Process != nil && inst.Process.Cmd() != nil && inst.Process.Cmd().Process != nil { + kill(inst.Component(), inst.Pid(), inst.Wait) + } + } + + for _, inst := range p.dmWorkers { + if inst.Process != nil && inst.Process.Cmd() != nil && inst.Process.Cmd().Process != nil { + kill(inst.Component(), inst.Pid(), inst.Wait) + } + } + for _, inst := range p.tiflashs { if inst.Process != nil && inst.Process.Cmd() != nil && inst.Process.Cmd().Process != nil { kill(inst.Component(), inst.Pid(), inst.Wait)