diff --git a/components/playground/command.go b/components/playground/command.go index 14c1362dd6..4857178f24 100644 --- a/components/playground/command.go +++ b/components/playground/command.go @@ -56,6 +56,7 @@ func buildCommands(tp CommandType, opt *BootOptions) (cmds []Command) { {"tiflash", opt.TiFlash}, {"tidb", opt.TiDB}, {"ticdc", opt.TiCDC}, + {"tikv-cdc", opt.TiKVCDC}, {"drainer", opt.Drainer}, } @@ -98,6 +99,7 @@ func newScaleOut() *cobra.Command { cmd.Flags().IntVarP(&opt.PD.Num, "pd", "", opt.PD.Num, "PD instance number") cmd.Flags().IntVarP(&opt.TiFlash.Num, "tiflash", "", opt.TiFlash.Num, "TiFlash instance number") cmd.Flags().IntVarP(&opt.TiCDC.Num, "ticdc", "", opt.TiCDC.Num, "TiCDC instance number") + cmd.Flags().IntVarP(&opt.TiKVCDC.Num, "kvcdc", "", opt.TiKVCDC.Num, "TiKV-CDC instance number") cmd.Flags().IntVarP(&opt.Pump.Num, "pump", "", opt.Pump.Num, "Pump instance number") cmd.Flags().IntVarP(&opt.Drainer.Num, "drainer", "", opt.Pump.Num, "Drainer instance number") @@ -116,6 +118,7 @@ func newScaleOut() *cobra.Command { cmd.Flags().StringVarP(&opt.PD.BinPath, "pd.binpath", "", opt.PD.BinPath, "PD instance binary path") cmd.Flags().StringVarP(&opt.TiFlash.BinPath, "tiflash.binpath", "", opt.TiFlash.BinPath, "TiFlash instance binary path") cmd.Flags().StringVarP(&opt.TiCDC.BinPath, "ticdc.binpath", "", opt.TiCDC.BinPath, "TiCDC instance binary path") + cmd.Flags().StringVarP(&opt.TiKVCDC.BinPath, "kvcdc.binpath", "", opt.TiKVCDC.BinPath, "TiKVCDC instance binary path") cmd.Flags().StringVarP(&opt.Pump.BinPath, "pump.binpath", "", opt.Pump.BinPath, "Pump instance binary path") cmd.Flags().StringVarP(&opt.Drainer.BinPath, "drainer.binpath", "", opt.Drainer.BinPath, "Drainer instance binary path") diff --git a/components/playground/instance/instance.go b/components/playground/instance/instance.go index aa1789bfa1..294c30326e 100644 --- a/components/playground/instance/instance.go +++ b/components/playground/instance/instance.go @@ -29,6 +29,7 @@ type Config struct { Host string `yaml:"host"` Port int `yaml:"port"` UpTimeout int `yaml:"up_timeout"` + Version string `yaml:"version"` } type instance struct { diff --git a/components/playground/instance/tikv_cdc.go b/components/playground/instance/tikv_cdc.go new file mode 100644 index 0000000000..7d970db1c7 --- /dev/null +++ b/components/playground/instance/tikv_cdc.go @@ -0,0 +1,86 @@ +// Copyright 2022 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package instance + +import ( + "context" + "fmt" + "path/filepath" + "strings" + + tiupexec "github.com/pingcap/tiup/pkg/exec" + "github.com/pingcap/tiup/pkg/utils" +) + +// TiKVCDC represent a TiKV-CDC instance. +type TiKVCDC struct { + instance + pds []*PDInstance + Process +} + +var _ Instance = &TiKVCDC{} + +// NewTiKVCDC create a TiKVCDC instance. +func NewTiKVCDC(binPath string, dir, host, configPath string, id int, pds []*PDInstance) *TiKVCDC { + tikvCdc := &TiKVCDC{ + instance: instance{ + BinPath: binPath, + ID: id, + Dir: dir, + Host: host, + Port: utils.MustGetFreePort(host, 8600), + ConfigPath: configPath, + }, + pds: pds, + } + tikvCdc.StatusPort = tikvCdc.Port + return tikvCdc +} + +// Start implements Instance interface. +func (c *TiKVCDC) Start(ctx context.Context, version utils.Version) error { + endpoints := pdEndpoints(c.pds, true) + + args := []string{ + "server", + fmt.Sprintf("--addr=%s:%d", c.Host, c.Port), + fmt.Sprintf("--advertise-addr=%s:%d", AdvertiseHost(c.Host), c.Port), + fmt.Sprintf("--pd=%s", strings.Join(endpoints, ",")), + fmt.Sprintf("--log-file=%s", c.LogFile()), + fmt.Sprintf("--data-dir=%s", filepath.Join(c.Dir, "data")), + } + if c.ConfigPath != "" { + args = append(args, fmt.Sprintf("--config=%s", c.ConfigPath)) + } + + var err error + if c.BinPath, err = tiupexec.PrepareBinary("tikv-cdc", version, c.BinPath); err != nil { + return err + } + c.Process = &process{cmd: PrepareCommand(ctx, c.BinPath, args, nil, c.Dir)} + + logIfErr(c.Process.SetOutputFile(c.LogFile())) + return c.Process.Start() +} + +// Component return component name. +func (c *TiKVCDC) Component() string { + return "tikv-cdc" +} + +// LogFile return the log file. +func (c *TiKVCDC) LogFile() string { + return filepath.Join(c.Dir, "tikv_cdc.log") +} diff --git a/components/playground/main.go b/components/playground/main.go index 56e38b42c1..6a8dec4897 100644 --- a/components/playground/main.go +++ b/components/playground/main.go @@ -60,6 +60,7 @@ type BootOptions struct { TiKV instance.Config `yaml:"tikv"` TiFlash instance.Config `yaml:"tiflash"` TiCDC instance.Config `yaml:"ticdc"` + TiKVCDC instance.Config `yaml:"tikv_cdc"` Pump instance.Config `yaml:"pump"` Drainer instance.Config `yaml:"drainer"` Host string `yaml:"host"` @@ -89,6 +90,7 @@ const ( pd = "pd" tiflash = "tiflash" ticdc = "ticdc" + kvcdc = "kvcdc" pump = "pump" drainer = "drainer" @@ -109,6 +111,7 @@ const ( pdConfig = "pd.config" tiflashConfig = "tiflash.config" ticdcConfig = "ticdc.config" + kvcdcConfig = "kvcdc.config" pumpConfig = "pump.config" drainerConfig = "drainer.config" @@ -118,8 +121,12 @@ const ( pdBinpath = "pd.binpath" tiflashBinpath = "tiflash.binpath" ticdcBinpath = "ticdc.binpath" + kvcdcBinpath = "kvcdc.binpath" pumpBinpath = "pump.binpath" drainerBinpath = "drainer.binpath" + + // component version + kvcdcVersion = "kvcdc.version" ) func installIfMissing(component, version string) error { @@ -319,6 +326,7 @@ If you'd like to use a TiDB version other than %s, cancel and retry with the fol rootCmd.Flags().Int(pd, defaultOptions.PD.Num, "PD instance number") rootCmd.Flags().Int(tiflash, defaultOptions.TiFlash.Num, "TiFlash instance number") rootCmd.Flags().Int(ticdc, defaultOptions.TiCDC.Num, "TiCDC instance number") + rootCmd.Flags().Int(kvcdc, defaultOptions.TiKVCDC.Num, "TiKV-CDC instance number") rootCmd.Flags().Int(pump, defaultOptions.Pump.Num, "Pump instance number") rootCmd.Flags().Int(drainer, defaultOptions.Drainer.Num, "Drainer instance number") @@ -338,15 +346,19 @@ If you'd like to use a TiDB version other than %s, cancel and retry with the fol rootCmd.Flags().String(pumpConfig, defaultOptions.Pump.ConfigPath, "Pump instance configuration file") rootCmd.Flags().String(drainerConfig, defaultOptions.Drainer.ConfigPath, "Drainer instance configuration file") rootCmd.Flags().String(ticdcConfig, defaultOptions.TiCDC.ConfigPath, "TiCDC instance configuration file") + rootCmd.Flags().String(kvcdcConfig, defaultOptions.TiKVCDC.ConfigPath, "TiKV-CDC instance configuration file") rootCmd.Flags().String(dbBinpath, defaultOptions.TiDB.BinPath, "TiDB instance binary path") rootCmd.Flags().String(kvBinpath, defaultOptions.TiKV.BinPath, "TiKV instance binary path") rootCmd.Flags().String(pdBinpath, defaultOptions.PD.BinPath, "PD instance binary path") rootCmd.Flags().String(tiflashBinpath, defaultOptions.TiFlash.BinPath, "TiFlash instance binary path") rootCmd.Flags().String(ticdcBinpath, defaultOptions.TiCDC.BinPath, "TiCDC instance binary path") + rootCmd.Flags().String(kvcdcBinpath, defaultOptions.TiKVCDC.BinPath, "TiKV-CDC instance binary path") rootCmd.Flags().String(pumpBinpath, defaultOptions.Pump.BinPath, "Pump instance binary path") rootCmd.Flags().String(drainerBinpath, defaultOptions.Drainer.BinPath, "Drainer instance binary path") + rootCmd.Flags().String(kvcdcVersion, defaultOptions.TiKVCDC.Version, "TiKV-CDC instance version") + rootCmd.AddCommand(newDisplay()) rootCmd.AddCommand(newScaleOut()) rootCmd.AddCommand(newScaleIn()) @@ -418,6 +430,11 @@ func populateOpt(flagSet *pflag.FlagSet) (err error) { if err != nil { return } + case kvcdc: + options.TiKVCDC.Num, err = strconv.Atoi(flag.Value.String()) + if err != nil { + return + } case pump: options.Pump.Num, err = strconv.Atoi(flag.Value.String()) if err != nil { @@ -439,6 +456,8 @@ func populateOpt(flagSet *pflag.FlagSet) (err error) { options.TiFlash.ConfigPath = flag.Value.String() case ticdcConfig: options.TiCDC.ConfigPath = flag.Value.String() + case kvcdcConfig: + options.TiKVCDC.ConfigPath = flag.Value.String() case pumpConfig: options.Pump.ConfigPath = flag.Value.String() case drainerConfig: @@ -454,6 +473,8 @@ func populateOpt(flagSet *pflag.FlagSet) (err error) { options.TiFlash.BinPath = flag.Value.String() case ticdcBinpath: options.TiCDC.BinPath = flag.Value.String() + case kvcdcBinpath: + options.TiKVCDC.BinPath = flag.Value.String() case pumpBinpath: options.Pump.BinPath = flag.Value.String() case drainerBinpath: @@ -486,6 +507,9 @@ func populateOpt(flagSet *pflag.FlagSet) (err error) { if err != nil { return } + + case kvcdcVersion: + options.TiKVCDC.Version = flag.Value.String() } }) diff --git a/components/playground/playground.go b/components/playground/playground.go index 3698770456..f0b03b404e 100644 --- a/components/playground/playground.go +++ b/components/playground/playground.go @@ -64,6 +64,7 @@ type Playground struct { tidbs []*instance.TiDBInstance tiflashs []*instance.TiFlashInstance ticdcs []*instance.TiCDC + tikvCdcs []*instance.TiKVCDC pumps []*instance.Pump drainers []*instance.Drainer startedInstances []instance.Instance @@ -309,6 +310,12 @@ func (p *Playground) handleScaleIn(w io.Writer, pid int) error { p.ticdcs = append(p.ticdcs[:i], p.ticdcs[i+1:]...) } } + case spec.ComponentTiKVCDC: + for i := 0; i < len(p.tikvCdcs); i++ { + if p.tikvCdcs[i].Pid() == pid { + p.tikvCdcs = append(p.tikvCdcs[:i], p.tikvCdcs[i+1:]...) + } + } case spec.ComponentTiFlash: for i := 0; i < len(p.tiflashs); i++ { if p.tiflashs[i].Pid() == pid { @@ -409,6 +416,8 @@ func (p *Playground) sanitizeComponentConfig(cid string, cfg *instance.Config) e return p.sanitizeConfig(p.bootOptions.TiFlash, cfg) case spec.ComponentCDC: return p.sanitizeConfig(p.bootOptions.TiCDC, cfg) + case spec.ComponentTiKVCDC: + return p.sanitizeConfig(p.bootOptions.TiKVCDC, cfg) case spec.ComponentPump: return p.sanitizeConfig(p.bootOptions.Pump, cfg) case spec.ComponentDrainer: @@ -419,7 +428,8 @@ func (p *Playground) sanitizeComponentConfig(cid string, cfg *instance.Config) e } func (p *Playground) startInstance(ctx context.Context, inst instance.Instance) error { - version, err := environment.GlobalEnv().V1Repository().ResolveComponentVersion(inst.Component(), p.bootOptions.Version) + boundVersion := p.bindVersion(inst.Component(), p.bootOptions.Version) + version, err := environment.GlobalEnv().V1Repository().ResolveComponentVersion(inst.Component(), boundVersion) if err != nil { return err } @@ -593,6 +603,13 @@ func (p *Playground) WalkInstances(fn func(componentID string, ins instance.Inst } } + for _, ins := range p.tikvCdcs { + err := fn(spec.ComponentTiKVCDC, ins) + if err != nil { + return err + } + } + for _, ins := range p.drainers { err := fn(spec.ComponentDrainer, ins) if err != nil { @@ -670,6 +687,10 @@ func (p *Playground) addInstance(componentID string, cfg instance.Config) (ins i inst := instance.NewTiCDC(cfg.BinPath, dir, host, cfg.ConfigPath, id, p.pds) ins = inst p.ticdcs = append(p.ticdcs, inst) + case spec.ComponentTiKVCDC: + inst := instance.NewTiKVCDC(cfg.BinPath, dir, host, cfg.ConfigPath, id, p.pds) + ins = inst + p.tikvCdcs = append(p.tikvCdcs, inst) case spec.ComponentPump: inst := instance.NewPump(cfg.BinPath, dir, host, cfg.ConfigPath, id, p.pds) ins = inst @@ -765,6 +786,15 @@ func (p *Playground) waitAllTiFlashUp() { } } +func (p *Playground) bindVersion(comp string, version string) (bindVersion string) { + switch comp { + case spec.ComponentTiKVCDC: + return p.bootOptions.TiKVCDC.Version + default: + return version + } +} + func (p *Playground) bootCluster(ctx context.Context, env *environment.Environment, options *BootOptions) error { for _, cfg := range []*instance.Config{ &options.PD, @@ -773,6 +803,7 @@ func (p *Playground) bootCluster(ctx context.Context, env *environment.Environme &options.TiFlash, &options.Pump, &options.Drainer, + &options.TiKVCDC, } { path, err := getAbsolutePath(cfg.ConfigPath) if err != nil { @@ -807,6 +838,7 @@ func (p *Playground) bootCluster(ctx context.Context, env *environment.Environme {spec.ComponentPump, options.Pump}, {spec.ComponentTiDB, options.TiDB}, {spec.ComponentCDC, options.TiCDC}, + {spec.ComponentTiKVCDC, options.TiKVCDC}, {spec.ComponentDrainer, options.Drainer}, {spec.ComponentTiFlash, options.TiFlash}, } @@ -1001,6 +1033,11 @@ func (p *Playground) terminate(sig syscall.Signal) { kill(inst.Component(), inst.Pid(), inst.Wait) } } + for _, inst := range p.tikvCdcs { + if inst.Process != nil { + kill(inst.Component(), inst.Pid(), inst.Wait) + } + } for _, inst := range p.drainers { if inst.Process != nil { kill(inst.Component(), inst.Pid(), inst.Wait) diff --git a/pkg/cluster/spec/instance.go b/pkg/cluster/spec/instance.go index 1956e003e3..bf4505c8cc 100644 --- a/pkg/cluster/spec/instance.go +++ b/pkg/cluster/spec/instance.go @@ -44,6 +44,7 @@ const ( ComponentDrainer = "drainer" ComponentPump = "pump" ComponentCDC = "cdc" + ComponentTiKVCDC = "tikv-cdc" ComponentTiSpark = "tispark" ComponentSpark = "spark" ComponentAlertmanager = "alertmanager" diff --git a/tests/tiup-playground/test_playground.sh b/tests/tiup-playground/test_playground.sh index 8df20df2d3..73200c127e 100755 --- a/tests/tiup-playground/test_playground.sh +++ b/tests/tiup-playground/test_playground.sh @@ -4,7 +4,7 @@ set -eux TEST_DIR=$(cd "$(dirname "$0")"; pwd) TMP_DIR=$TEST_DIR/_tmp - +TIDB_VERSION="v6.2.0" # Profile home directory mkdir -p $TMP_DIR/home/bin/ @@ -28,13 +28,14 @@ function tiup-playground() { fi } -# usage: check_tidb_num 1 +# usage: check_instance_num tidb 1 # make sure the tidb number is 1 or other specified number -function check_tidb_num() { - mustbe=$1 - num=$(tiup-playground display | grep "tidb" | wc -l | sed 's/ //g') +function check_instance_num() { + instance=$1 + mustbe=$2 + num=$(tiup-playground display | grep "$instance" | wc -l | sed 's/ //g') if [ "$num" != "$mustbe" ]; then - echo "unexpected tidb instance number: $num" + echo "unexpected $instance instance number: $num" tiup-playground display fi } @@ -43,6 +44,7 @@ function kill_all() { killall -9 tidb-server || true killall -9 tikv-server || true killall -9 pd-server || true + killall -9 tikv-cdc || true killall -9 tiflash || true killall -9 grafana-server || true killall -9 tiup-playground || true @@ -52,7 +54,7 @@ function kill_all() { } outfile=/tmp/tiup-playground-test.out -tiup-playground v6.0.0 > $outfile 2>&1 & +tiup-playground $TIDB_VERSION > $outfile 2>&1 & # wait $outfile generated sleep 3 @@ -71,14 +73,14 @@ sleep 5 ls "${TIUP_HOME}/data/test_play/prometheus/data" # 1(init) + 2(scale-out) -check_tidb_num 3 +check_instance_num tidb 3 # get pid of one tidb instance and scale-in pid=`tiup-playground display | grep "tidb" | awk 'NR==1 {print $1}'` tiup-playground scale-in --pid $pid sleep 5 -check_tidb_num 2 +check_instance_num tidb 2 # get pid of one tidb instance and kill it pid=`tiup-playground display | grep "tidb" | awk 'NR==1 {print $1}'` @@ -102,7 +104,7 @@ killall -2 tiup-playground.test || killall -2 tiup-playground sleep 100 # test restart with same data -tiup-playground v6.0.0 > $outfile 2>&1 & +tiup-playground $TIDB_VERSION > $outfile 2>&1 & # wait $outfile generated sleep 3 @@ -116,7 +118,7 @@ cat $outfile | grep ":3930" | grep -q "Done" TAG="test_1" outfile_1=/tmp/tiup-playground-test_1.out # no TiFlash to speed up -tiup-playground v6.0.0 --tag $TAG --db 2 --tiflash 0 > $outfile_1 2>&1 & +tiup-playground $TIDB_VERSION --tag $TAG --db 2 --tiflash 0 > $outfile_1 2>&1 & sleep 3 timeout 300 grep -q "CLUSTER START SUCCESSFULLY" <(tail -f $outfile_1) tiup-playground --tag $TAG display | grep -qv "exit" @@ -135,6 +137,24 @@ if [ "$tidb_num" != 3 ]; then exit 1 fi +killall -2 tiup-playground.test || killall -2 tiup-playground +sleep 100 + +# test for TiKV-CDC +echo -e "\033[0;36m<<< Run TiKV-CDC test >>>\033[0m" +tiup-playground $TIDB_VERSION --db 1 --pd 1 --kv 1 --tiflash 0 --kvcdc 1 --kvcdc.version v1.0.0 > $outfile 2>&1 & +sleep 3 +timeout 300 grep -q "CLUSTER START SUCCESSFULLY" <(tail -f $outfile) +tiup-playground display | grep -qv "exit" +# scale out +tiup-playground scale-out --kvcdc 2 +sleep 5 +check_instance_num tikv-cdc 3 # 1(init) + 2(scale-out) +# scale in +pid=`tiup-playground display | grep "tikv-cdc" | awk 'NR==1 {print $1}'` +tiup-playground scale-in --pid $pid +sleep 5 +check_instance_num tikv-cdc 2 # exit all killall -2 tiup-playground.test || killall -2 tiup-playground