diff --git a/components/playground/command.go b/components/playground/command.go index ae5b8aa14c..02ce69bcd0 100644 --- a/components/playground/command.go +++ b/components/playground/command.go @@ -91,6 +91,15 @@ func buildCommands(tp CommandType, opt *bootOptions) (cmds []Command) { cmds = append(cmds, c) } + for i := 0; i < opt.ticdc.Num; i++ { + c := Command{ + CommandType: tp, + ComponentID: "ticdc", + Config: opt.ticdc, + } + + cmds = append(cmds, c) + } for i := 0; i < opt.drainer.Num; i++ { c := Command{ CommandType: tp, @@ -127,6 +136,7 @@ func newScaleOut() *cobra.Command { cmd.Flags().IntVarP(&opt.tikv.Num, "kv", "", opt.tikv.Num, "TiKV instance number") cmd.Flags().IntVarP(&opt.pd.Num, "pd", "", opt.pd.Num, "PD instance number") cmd.Flags().IntVarP(&opt.tiflash.Num, "tiflash", "", opt.tiflash.Num, "TiFlash instance number") + cmd.Flags().IntVarP(&opt.ticdc.Num, "ticdc", "", opt.ticdc.Num, "TiCDC instance number") cmd.Flags().IntVarP(&opt.pump.Num, "pump", "", opt.pump.Num, "Pump instance number") cmd.Flags().IntVarP(&opt.drainer.Num, "drainer", "", opt.pump.Num, "Drainer instance number") @@ -144,6 +154,7 @@ func newScaleOut() *cobra.Command { cmd.Flags().StringVarP(&opt.tikv.BinPath, "kv.binpath", "", opt.tikv.BinPath, "TiKV instance binary path") cmd.Flags().StringVarP(&opt.pd.BinPath, "pd.binpath", "", opt.pd.BinPath, "PD instance binary path") cmd.Flags().StringVarP(&opt.tiflash.BinPath, "tiflash.binpath", "", opt.tiflash.BinPath, "TiFlash instance binary path") + cmd.Flags().StringVarP(&opt.ticdc.BinPath, "ticdc.binpath", "", opt.ticdc.BinPath, "TiCDC instance binary path") cmd.Flags().StringVarP(&opt.pump.BinPath, "pump.binpath", "", opt.pump.BinPath, "Pump instance binary path") cmd.Flags().StringVarP(&opt.drainer.BinPath, "drainer.binpath", "", opt.drainer.BinPath, "Drainer instance binary path") diff --git a/components/playground/instance/ticdc.go b/components/playground/instance/ticdc.go new file mode 100644 index 0000000000..ed6d85a5c6 --- /dev/null +++ b/components/playground/instance/ticdc.go @@ -0,0 +1,88 @@ +// Copyright 2020 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package instance + +import ( + "context" + "fmt" + "github.com/pingcap/tiup/pkg/repository/v0manifest" + "github.com/pingcap/tiup/pkg/utils" + "os" + "path/filepath" + "strings" +) + +// TiCDC represent a ticdc instance. +type TiCDC struct { + instance + pds []*PDInstance + Process +} + +var _ Instance = &TiCDC{} + +// NewTiCDC create a TiCDC instance. +func NewTiCDC(binPath string, dir, host, configPath string, id int, pds []*PDInstance) *TiCDC { + ticdc := &TiCDC{ + instance: instance{ + BinPath: binPath, + ID: id, + Dir: dir, + Host: host, + Port: utils.MustGetFreePort(host, 8300), + ConfigPath: configPath, + }, + pds: pds, + } + ticdc.StatusPort = ticdc.Port + return ticdc +} + +// Start implements Instance interface. +func (c *TiCDC) Start(ctx context.Context, version v0manifest.Version) error { + if err := os.MkdirAll(c.Dir, 0755); err != nil { + return err + } + + var urls []string + for _, pd := range c.pds { + urls = append(urls, fmt.Sprintf("http://%s:%d", pd.Host, pd.StatusPort)) + } + + args := []string{ + "server", + fmt.Sprintf("--addr=%s:%d", c.Host, c.Port), + fmt.Sprintf("--advertise-addr=%s:%d", advertiseHost(c.Host), c.Port), + fmt.Sprintf("--pd=%s", strings.Join(urls, ",")), + fmt.Sprintf("--log-file=%s", c.LogFile()), + } + + var err error + if c.Process, err = NewComponentProcess(ctx, c.Dir, c.BinPath, "cdc", version, args...); err != nil { + return err + } + logIfErr(c.Process.SetOutputFile(c.LogFile())) + + return c.Process.Start() +} + +// Component return component name. +func (c *TiCDC) Component() string { + return "ticdc" +} + +// LogFile return the log file. +func (c *TiCDC) LogFile() string { + return filepath.Join(c.Dir, "ticdc.log") +} diff --git a/components/playground/main.go b/components/playground/main.go index ed30625421..b66d9e2292 100644 --- a/components/playground/main.go +++ b/components/playground/main.go @@ -51,6 +51,7 @@ type bootOptions struct { tidb instance.Config tikv instance.Config tiflash instance.Config + ticdc instance.Config pump instance.Config drainer instance.Config host string @@ -198,6 +199,7 @@ Examples: rootCmd.Flags().IntVarP(&opt.tikv.Num, "kv", "", opt.tikv.Num, "TiKV instance number") rootCmd.Flags().IntVarP(&opt.pd.Num, "pd", "", opt.pd.Num, "PD instance number") rootCmd.Flags().IntVarP(&opt.tiflash.Num, "tiflash", "", opt.tiflash.Num, "TiFlash instance number") + rootCmd.Flags().IntVarP(&opt.ticdc.Num, "ticdc", "", opt.ticdc.Num, "TiCDC instance number") rootCmd.Flags().IntVarP(&opt.pump.Num, "pump", "", opt.pump.Num, "Pump instance number") rootCmd.Flags().IntVarP(&opt.drainer.Num, "drainer", "", opt.drainer.Num, "Drainer instance number") @@ -217,6 +219,7 @@ Examples: rootCmd.Flags().StringVarP(&opt.tikv.BinPath, "kv.binpath", "", opt.tikv.BinPath, "TiKV instance binary path") rootCmd.Flags().StringVarP(&opt.pd.BinPath, "pd.binpath", "", opt.pd.BinPath, "PD instance binary path") rootCmd.Flags().StringVarP(&opt.tiflash.BinPath, "tiflash.binpath", "", opt.tiflash.BinPath, "TiFlash instance binary path") + rootCmd.Flags().StringVarP(&opt.ticdc.BinPath, "ticdc.binpath", "", opt.ticdc.BinPath, "TiCDC instance binary path") rootCmd.Flags().StringVarP(&opt.pump.BinPath, "pump.binpath", "", opt.pump.BinPath, "Pump instance binary path") rootCmd.Flags().StringVarP(&opt.drainer.BinPath, "drainer.binpath", "", opt.drainer.BinPath, "Drainer instance binary path") diff --git a/components/playground/playground.go b/components/playground/playground.go index 0a49701d11..feefae3d67 100755 --- a/components/playground/playground.go +++ b/components/playground/playground.go @@ -60,6 +60,7 @@ type Playground struct { tikvs []*instance.TiKVInstance tidbs []*instance.TiDBInstance tiflashs []*instance.TiFlashInstance + ticdcs []*instance.TiCDC pumps []*instance.Pump drainers []*instance.Drainer startedInstances []instance.Instance @@ -295,6 +296,12 @@ func (p *Playground) handleScaleIn(w io.Writer, pid int) error { p.tidbs = append(p.tidbs[:i], p.tidbs[i+1:]...) } } + case "ticdc": + for i := 0; i < len(p.ticdcs); i++ { + if p.ticdcs[i].Pid() == pid { + p.ticdcs = append(p.ticdcs[:i], p.ticdcs[i+1:]...) + } + } case "tiflash": for i := 0; i < len(p.tiflashs); i++ { if p.tiflashs[i].Pid() == pid { @@ -393,6 +400,8 @@ func (p *Playground) sanitizeComponentConfig(cid string, cfg *instance.Config) e return p.sanitizeConfig(p.bootOptions.tidb, cfg) case "tiflash": return p.sanitizeConfig(p.bootOptions.tiflash, cfg) + case "ticdc": + return p.sanitizeConfig(p.bootOptions.ticdc, cfg) case "pump": return p.sanitizeConfig(p.bootOptions.pump, cfg) case "drainer": @@ -548,6 +557,13 @@ func (p *Playground) WalkInstances(fn func(componentID string, ins instance.Inst } } + for _, ins := range p.ticdcs { + err := fn("ticdc", ins) + if err != nil { + return errors.AddStack(err) + } + } + for _, ins := range p.drainers { err := fn("drainer", ins) if err != nil { @@ -618,6 +634,10 @@ func (p *Playground) addInstance(componentID string, cfg instance.Config) (ins i inst := instance.NewTiFlashInstance(cfg.BinPath, dir, host, cfg.ConfigPath, id, p.pds, p.tidbs) ins = inst p.tiflashs = append(p.tiflashs, inst) + case "ticdc": + inst := instance.NewTiCDC(cfg.BinPath, dir, host, cfg.ConfigPath, id, p.pds) + ins = inst + p.ticdcs = append(p.ticdcs, inst) case "pump": inst := instance.NewPump(cfg.BinPath, dir, host, cfg.ConfigPath, id, p.pds) ins = inst @@ -699,6 +719,12 @@ func (p *Playground) bootCluster(ctx context.Context, env *environment.Environme return errors.AddStack(err) } } + for i := 0; i < options.ticdc.Num; i++ { + _, err := p.addInstance("ticdc", options.ticdc) + if err != nil { + return errors.AddStack(err) + } + } for i := 0; i < options.drainer.Num; i++ { _, err := p.addInstance("drainer", options.drainer) if err != nil {