Skip to content

Commit

Permalink
playground: Supports TiKV-CDC component (#2000)
Browse files Browse the repository at this point in the history
  • Loading branch information
pingyu authored Sep 8, 2022
1 parent 42b07cb commit 3671d82
Show file tree
Hide file tree
Showing 7 changed files with 184 additions and 12 deletions.
3 changes: 3 additions & 0 deletions components/playground/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ func buildCommands(tp CommandType, opt *BootOptions) (cmds []Command) {
{"tiflash", opt.TiFlash},
{"tidb", opt.TiDB},
{"ticdc", opt.TiCDC},
{"tikv-cdc", opt.TiKVCDC},
{"drainer", opt.Drainer},
}

Expand Down Expand Up @@ -98,6 +99,7 @@ func newScaleOut() *cobra.Command {
cmd.Flags().IntVarP(&opt.PD.Num, "pd", "", opt.PD.Num, "PD instance number")
cmd.Flags().IntVarP(&opt.TiFlash.Num, "tiflash", "", opt.TiFlash.Num, "TiFlash instance number")
cmd.Flags().IntVarP(&opt.TiCDC.Num, "ticdc", "", opt.TiCDC.Num, "TiCDC instance number")
cmd.Flags().IntVarP(&opt.TiKVCDC.Num, "kvcdc", "", opt.TiKVCDC.Num, "TiKV-CDC instance number")
cmd.Flags().IntVarP(&opt.Pump.Num, "pump", "", opt.Pump.Num, "Pump instance number")
cmd.Flags().IntVarP(&opt.Drainer.Num, "drainer", "", opt.Pump.Num, "Drainer instance number")

Expand All @@ -116,6 +118,7 @@ func newScaleOut() *cobra.Command {
cmd.Flags().StringVarP(&opt.PD.BinPath, "pd.binpath", "", opt.PD.BinPath, "PD instance binary path")
cmd.Flags().StringVarP(&opt.TiFlash.BinPath, "tiflash.binpath", "", opt.TiFlash.BinPath, "TiFlash instance binary path")
cmd.Flags().StringVarP(&opt.TiCDC.BinPath, "ticdc.binpath", "", opt.TiCDC.BinPath, "TiCDC instance binary path")
cmd.Flags().StringVarP(&opt.TiKVCDC.BinPath, "kvcdc.binpath", "", opt.TiKVCDC.BinPath, "TiKVCDC instance binary path")
cmd.Flags().StringVarP(&opt.Pump.BinPath, "pump.binpath", "", opt.Pump.BinPath, "Pump instance binary path")
cmd.Flags().StringVarP(&opt.Drainer.BinPath, "drainer.binpath", "", opt.Drainer.BinPath, "Drainer instance binary path")

Expand Down
1 change: 1 addition & 0 deletions components/playground/instance/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ type Config struct {
Host string `yaml:"host"`
Port int `yaml:"port"`
UpTimeout int `yaml:"up_timeout"`
Version string `yaml:"version"`
}

type instance struct {
Expand Down
86 changes: 86 additions & 0 deletions components/playground/instance/tikv_cdc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
// Copyright 2022 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package instance

import (
"context"
"fmt"
"path/filepath"
"strings"

tiupexec "github.com/pingcap/tiup/pkg/exec"
"github.com/pingcap/tiup/pkg/utils"
)

// TiKVCDC represent a TiKV-CDC instance.
type TiKVCDC struct {
instance
pds []*PDInstance
Process
}

var _ Instance = &TiKVCDC{}

// NewTiKVCDC create a TiKVCDC instance.
func NewTiKVCDC(binPath string, dir, host, configPath string, id int, pds []*PDInstance) *TiKVCDC {
tikvCdc := &TiKVCDC{
instance: instance{
BinPath: binPath,
ID: id,
Dir: dir,
Host: host,
Port: utils.MustGetFreePort(host, 8600),
ConfigPath: configPath,
},
pds: pds,
}
tikvCdc.StatusPort = tikvCdc.Port
return tikvCdc
}

// Start implements Instance interface.
func (c *TiKVCDC) Start(ctx context.Context, version utils.Version) error {
endpoints := pdEndpoints(c.pds, true)

args := []string{
"server",
fmt.Sprintf("--addr=%s:%d", c.Host, c.Port),
fmt.Sprintf("--advertise-addr=%s:%d", AdvertiseHost(c.Host), c.Port),
fmt.Sprintf("--pd=%s", strings.Join(endpoints, ",")),
fmt.Sprintf("--log-file=%s", c.LogFile()),
fmt.Sprintf("--data-dir=%s", filepath.Join(c.Dir, "data")),
}
if c.ConfigPath != "" {
args = append(args, fmt.Sprintf("--config=%s", c.ConfigPath))
}

var err error
if c.BinPath, err = tiupexec.PrepareBinary("tikv-cdc", version, c.BinPath); err != nil {
return err
}
c.Process = &process{cmd: PrepareCommand(ctx, c.BinPath, args, nil, c.Dir)}

logIfErr(c.Process.SetOutputFile(c.LogFile()))
return c.Process.Start()
}

// Component return component name.
func (c *TiKVCDC) Component() string {
return "tikv-cdc"
}

// LogFile return the log file.
func (c *TiKVCDC) LogFile() string {
return filepath.Join(c.Dir, "tikv_cdc.log")
}
24 changes: 24 additions & 0 deletions components/playground/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ type BootOptions struct {
TiKV instance.Config `yaml:"tikv"`
TiFlash instance.Config `yaml:"tiflash"`
TiCDC instance.Config `yaml:"ticdc"`
TiKVCDC instance.Config `yaml:"tikv_cdc"`
Pump instance.Config `yaml:"pump"`
Drainer instance.Config `yaml:"drainer"`
Host string `yaml:"host"`
Expand Down Expand Up @@ -89,6 +90,7 @@ const (
pd = "pd"
tiflash = "tiflash"
ticdc = "ticdc"
kvcdc = "kvcdc"
pump = "pump"
drainer = "drainer"

Expand All @@ -109,6 +111,7 @@ const (
pdConfig = "pd.config"
tiflashConfig = "tiflash.config"
ticdcConfig = "ticdc.config"
kvcdcConfig = "kvcdc.config"
pumpConfig = "pump.config"
drainerConfig = "drainer.config"

Expand All @@ -118,8 +121,12 @@ const (
pdBinpath = "pd.binpath"
tiflashBinpath = "tiflash.binpath"
ticdcBinpath = "ticdc.binpath"
kvcdcBinpath = "kvcdc.binpath"
pumpBinpath = "pump.binpath"
drainerBinpath = "drainer.binpath"

// component version
kvcdcVersion = "kvcdc.version"
)

func installIfMissing(component, version string) error {
Expand Down Expand Up @@ -319,6 +326,7 @@ If you'd like to use a TiDB version other than %s, cancel and retry with the fol
rootCmd.Flags().Int(pd, defaultOptions.PD.Num, "PD instance number")
rootCmd.Flags().Int(tiflash, defaultOptions.TiFlash.Num, "TiFlash instance number")
rootCmd.Flags().Int(ticdc, defaultOptions.TiCDC.Num, "TiCDC instance number")
rootCmd.Flags().Int(kvcdc, defaultOptions.TiKVCDC.Num, "TiKV-CDC instance number")
rootCmd.Flags().Int(pump, defaultOptions.Pump.Num, "Pump instance number")
rootCmd.Flags().Int(drainer, defaultOptions.Drainer.Num, "Drainer instance number")

Expand All @@ -338,15 +346,19 @@ If you'd like to use a TiDB version other than %s, cancel and retry with the fol
rootCmd.Flags().String(pumpConfig, defaultOptions.Pump.ConfigPath, "Pump instance configuration file")
rootCmd.Flags().String(drainerConfig, defaultOptions.Drainer.ConfigPath, "Drainer instance configuration file")
rootCmd.Flags().String(ticdcConfig, defaultOptions.TiCDC.ConfigPath, "TiCDC instance configuration file")
rootCmd.Flags().String(kvcdcConfig, defaultOptions.TiKVCDC.ConfigPath, "TiKV-CDC instance configuration file")

rootCmd.Flags().String(dbBinpath, defaultOptions.TiDB.BinPath, "TiDB instance binary path")
rootCmd.Flags().String(kvBinpath, defaultOptions.TiKV.BinPath, "TiKV instance binary path")
rootCmd.Flags().String(pdBinpath, defaultOptions.PD.BinPath, "PD instance binary path")
rootCmd.Flags().String(tiflashBinpath, defaultOptions.TiFlash.BinPath, "TiFlash instance binary path")
rootCmd.Flags().String(ticdcBinpath, defaultOptions.TiCDC.BinPath, "TiCDC instance binary path")
rootCmd.Flags().String(kvcdcBinpath, defaultOptions.TiKVCDC.BinPath, "TiKV-CDC instance binary path")
rootCmd.Flags().String(pumpBinpath, defaultOptions.Pump.BinPath, "Pump instance binary path")
rootCmd.Flags().String(drainerBinpath, defaultOptions.Drainer.BinPath, "Drainer instance binary path")

rootCmd.Flags().String(kvcdcVersion, defaultOptions.TiKVCDC.Version, "TiKV-CDC instance version")

rootCmd.AddCommand(newDisplay())
rootCmd.AddCommand(newScaleOut())
rootCmd.AddCommand(newScaleIn())
Expand Down Expand Up @@ -418,6 +430,11 @@ func populateOpt(flagSet *pflag.FlagSet) (err error) {
if err != nil {
return
}
case kvcdc:
options.TiKVCDC.Num, err = strconv.Atoi(flag.Value.String())
if err != nil {
return
}
case pump:
options.Pump.Num, err = strconv.Atoi(flag.Value.String())
if err != nil {
Expand All @@ -439,6 +456,8 @@ func populateOpt(flagSet *pflag.FlagSet) (err error) {
options.TiFlash.ConfigPath = flag.Value.String()
case ticdcConfig:
options.TiCDC.ConfigPath = flag.Value.String()
case kvcdcConfig:
options.TiKVCDC.ConfigPath = flag.Value.String()
case pumpConfig:
options.Pump.ConfigPath = flag.Value.String()
case drainerConfig:
Expand All @@ -454,6 +473,8 @@ func populateOpt(flagSet *pflag.FlagSet) (err error) {
options.TiFlash.BinPath = flag.Value.String()
case ticdcBinpath:
options.TiCDC.BinPath = flag.Value.String()
case kvcdcBinpath:
options.TiKVCDC.BinPath = flag.Value.String()
case pumpBinpath:
options.Pump.BinPath = flag.Value.String()
case drainerBinpath:
Expand Down Expand Up @@ -486,6 +507,9 @@ func populateOpt(flagSet *pflag.FlagSet) (err error) {
if err != nil {
return
}

case kvcdcVersion:
options.TiKVCDC.Version = flag.Value.String()
}
})

Expand Down
39 changes: 38 additions & 1 deletion components/playground/playground.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ type Playground struct {
tidbs []*instance.TiDBInstance
tiflashs []*instance.TiFlashInstance
ticdcs []*instance.TiCDC
tikvCdcs []*instance.TiKVCDC
pumps []*instance.Pump
drainers []*instance.Drainer
startedInstances []instance.Instance
Expand Down Expand Up @@ -309,6 +310,12 @@ func (p *Playground) handleScaleIn(w io.Writer, pid int) error {
p.ticdcs = append(p.ticdcs[:i], p.ticdcs[i+1:]...)
}
}
case spec.ComponentTiKVCDC:
for i := 0; i < len(p.tikvCdcs); i++ {
if p.tikvCdcs[i].Pid() == pid {
p.tikvCdcs = append(p.tikvCdcs[:i], p.tikvCdcs[i+1:]...)
}
}
case spec.ComponentTiFlash:
for i := 0; i < len(p.tiflashs); i++ {
if p.tiflashs[i].Pid() == pid {
Expand Down Expand Up @@ -409,6 +416,8 @@ func (p *Playground) sanitizeComponentConfig(cid string, cfg *instance.Config) e
return p.sanitizeConfig(p.bootOptions.TiFlash, cfg)
case spec.ComponentCDC:
return p.sanitizeConfig(p.bootOptions.TiCDC, cfg)
case spec.ComponentTiKVCDC:
return p.sanitizeConfig(p.bootOptions.TiKVCDC, cfg)
case spec.ComponentPump:
return p.sanitizeConfig(p.bootOptions.Pump, cfg)
case spec.ComponentDrainer:
Expand All @@ -419,7 +428,8 @@ func (p *Playground) sanitizeComponentConfig(cid string, cfg *instance.Config) e
}

func (p *Playground) startInstance(ctx context.Context, inst instance.Instance) error {
version, err := environment.GlobalEnv().V1Repository().ResolveComponentVersion(inst.Component(), p.bootOptions.Version)
boundVersion := p.bindVersion(inst.Component(), p.bootOptions.Version)
version, err := environment.GlobalEnv().V1Repository().ResolveComponentVersion(inst.Component(), boundVersion)
if err != nil {
return err
}
Expand Down Expand Up @@ -593,6 +603,13 @@ func (p *Playground) WalkInstances(fn func(componentID string, ins instance.Inst
}
}

for _, ins := range p.tikvCdcs {
err := fn(spec.ComponentTiKVCDC, ins)
if err != nil {
return err
}
}

for _, ins := range p.drainers {
err := fn(spec.ComponentDrainer, ins)
if err != nil {
Expand Down Expand Up @@ -670,6 +687,10 @@ func (p *Playground) addInstance(componentID string, cfg instance.Config) (ins i
inst := instance.NewTiCDC(cfg.BinPath, dir, host, cfg.ConfigPath, id, p.pds)
ins = inst
p.ticdcs = append(p.ticdcs, inst)
case spec.ComponentTiKVCDC:
inst := instance.NewTiKVCDC(cfg.BinPath, dir, host, cfg.ConfigPath, id, p.pds)
ins = inst
p.tikvCdcs = append(p.tikvCdcs, inst)
case spec.ComponentPump:
inst := instance.NewPump(cfg.BinPath, dir, host, cfg.ConfigPath, id, p.pds)
ins = inst
Expand Down Expand Up @@ -765,6 +786,15 @@ func (p *Playground) waitAllTiFlashUp() {
}
}

func (p *Playground) bindVersion(comp string, version string) (bindVersion string) {
switch comp {
case spec.ComponentTiKVCDC:
return p.bootOptions.TiKVCDC.Version
default:
return version
}
}

func (p *Playground) bootCluster(ctx context.Context, env *environment.Environment, options *BootOptions) error {
for _, cfg := range []*instance.Config{
&options.PD,
Expand All @@ -773,6 +803,7 @@ func (p *Playground) bootCluster(ctx context.Context, env *environment.Environme
&options.TiFlash,
&options.Pump,
&options.Drainer,
&options.TiKVCDC,
} {
path, err := getAbsolutePath(cfg.ConfigPath)
if err != nil {
Expand Down Expand Up @@ -807,6 +838,7 @@ func (p *Playground) bootCluster(ctx context.Context, env *environment.Environme
{spec.ComponentPump, options.Pump},
{spec.ComponentTiDB, options.TiDB},
{spec.ComponentCDC, options.TiCDC},
{spec.ComponentTiKVCDC, options.TiKVCDC},
{spec.ComponentDrainer, options.Drainer},
{spec.ComponentTiFlash, options.TiFlash},
}
Expand Down Expand Up @@ -1001,6 +1033,11 @@ func (p *Playground) terminate(sig syscall.Signal) {
kill(inst.Component(), inst.Pid(), inst.Wait)
}
}
for _, inst := range p.tikvCdcs {
if inst.Process != nil {
kill(inst.Component(), inst.Pid(), inst.Wait)
}
}
for _, inst := range p.drainers {
if inst.Process != nil {
kill(inst.Component(), inst.Pid(), inst.Wait)
Expand Down
1 change: 1 addition & 0 deletions pkg/cluster/spec/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ const (
ComponentDrainer = "drainer"
ComponentPump = "pump"
ComponentCDC = "cdc"
ComponentTiKVCDC = "tikv-cdc"
ComponentTiSpark = "tispark"
ComponentSpark = "spark"
ComponentAlertmanager = "alertmanager"
Expand Down
Loading

0 comments on commit 3671d82

Please sign in to comment.