Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

playground: Supports TiKV-CDC component #2000

Merged
merged 14 commits into from
Sep 8, 2022
3 changes: 3 additions & 0 deletions components/playground/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ func buildCommands(tp CommandType, opt *BootOptions) (cmds []Command) {
{"tiflash", opt.TiFlash},
{"tidb", opt.TiDB},
{"ticdc", opt.TiCDC},
{"tikv-cdc", opt.TiKVCDC},
{"drainer", opt.Drainer},
}

Expand Down Expand Up @@ -98,6 +99,7 @@ func newScaleOut() *cobra.Command {
cmd.Flags().IntVarP(&opt.PD.Num, "pd", "", opt.PD.Num, "PD instance number")
cmd.Flags().IntVarP(&opt.TiFlash.Num, "tiflash", "", opt.TiFlash.Num, "TiFlash instance number")
cmd.Flags().IntVarP(&opt.TiCDC.Num, "ticdc", "", opt.TiCDC.Num, "TiCDC instance number")
cmd.Flags().IntVarP(&opt.TiKVCDC.Num, "kvcdc", "", opt.TiKVCDC.Num, "TiKV-CDC instance number")
cmd.Flags().IntVarP(&opt.Pump.Num, "pump", "", opt.Pump.Num, "Pump instance number")
cmd.Flags().IntVarP(&opt.Drainer.Num, "drainer", "", opt.Pump.Num, "Drainer instance number")

Expand All @@ -116,6 +118,7 @@ func newScaleOut() *cobra.Command {
cmd.Flags().StringVarP(&opt.PD.BinPath, "pd.binpath", "", opt.PD.BinPath, "PD instance binary path")
cmd.Flags().StringVarP(&opt.TiFlash.BinPath, "tiflash.binpath", "", opt.TiFlash.BinPath, "TiFlash instance binary path")
cmd.Flags().StringVarP(&opt.TiCDC.BinPath, "ticdc.binpath", "", opt.TiCDC.BinPath, "TiCDC instance binary path")
cmd.Flags().StringVarP(&opt.TiKVCDC.BinPath, "kvcdc.binpath", "", opt.TiKVCDC.BinPath, "TiKVCDC instance binary path")
cmd.Flags().StringVarP(&opt.Pump.BinPath, "pump.binpath", "", opt.Pump.BinPath, "Pump instance binary path")
cmd.Flags().StringVarP(&opt.Drainer.BinPath, "drainer.binpath", "", opt.Drainer.BinPath, "Drainer instance binary path")

Expand Down
1 change: 1 addition & 0 deletions components/playground/instance/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ type Config struct {
Host string `yaml:"host"`
Port int `yaml:"port"`
UpTimeout int `yaml:"up_timeout"`
Version string `yaml:"version"`
}

type instance struct {
Expand Down
86 changes: 86 additions & 0 deletions components/playground/instance/tikv_cdc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
// Copyright 2022 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package instance

import (
"context"
"fmt"
"path/filepath"
"strings"

tiupexec "github.com/pingcap/tiup/pkg/exec"
"github.com/pingcap/tiup/pkg/utils"
)

// TiKVCDC represent a TiKV-CDC instance.
type TiKVCDC struct {
instance
pds []*PDInstance
Process
}

var _ Instance = &TiKVCDC{}

// NewTiKVCDC create a TiKVCDC instance.
func NewTiKVCDC(binPath string, dir, host, configPath string, id int, pds []*PDInstance) *TiKVCDC {
tikvCdc := &TiKVCDC{
instance: instance{
BinPath: binPath,
ID: id,
Dir: dir,
Host: host,
Port: utils.MustGetFreePort(host, 8600),
ConfigPath: configPath,
},
pds: pds,
}
tikvCdc.StatusPort = tikvCdc.Port
return tikvCdc
}

// Start implements Instance interface.
func (c *TiKVCDC) Start(ctx context.Context, version utils.Version) error {
endpoints := pdEndpoints(c.pds, true)

args := []string{
"server",
fmt.Sprintf("--addr=%s:%d", c.Host, c.Port),
fmt.Sprintf("--advertise-addr=%s:%d", AdvertiseHost(c.Host), c.Port),
fmt.Sprintf("--pd=%s", strings.Join(endpoints, ",")),
fmt.Sprintf("--log-file=%s", c.LogFile()),
fmt.Sprintf("--data-dir=%s", filepath.Join(c.Dir, "data")),
}
if c.ConfigPath != "" {
args = append(args, fmt.Sprintf("--config=%s", c.ConfigPath))
}

var err error
if c.BinPath, err = tiupexec.PrepareBinary("tikv-cdc", version, c.BinPath); err != nil {
return err
}
c.Process = &process{cmd: PrepareCommand(ctx, c.BinPath, args, nil, c.Dir)}

logIfErr(c.Process.SetOutputFile(c.LogFile()))
return c.Process.Start()
}

// Component return component name.
func (c *TiKVCDC) Component() string {
return "tikv-cdc"
}

// LogFile return the log file.
func (c *TiKVCDC) LogFile() string {
return filepath.Join(c.Dir, "tikv_cdc.log")
}
24 changes: 24 additions & 0 deletions components/playground/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ type BootOptions struct {
TiKV instance.Config `yaml:"tikv"`
TiFlash instance.Config `yaml:"tiflash"`
TiCDC instance.Config `yaml:"ticdc"`
TiKVCDC instance.Config `yaml:"tikv_cdc"`
Pump instance.Config `yaml:"pump"`
Drainer instance.Config `yaml:"drainer"`
Host string `yaml:"host"`
Expand Down Expand Up @@ -89,6 +90,7 @@ const (
pd = "pd"
tiflash = "tiflash"
ticdc = "ticdc"
kvcdc = "kvcdc"
pump = "pump"
drainer = "drainer"

Expand All @@ -109,6 +111,7 @@ const (
pdConfig = "pd.config"
tiflashConfig = "tiflash.config"
ticdcConfig = "ticdc.config"
kvcdcConfig = "kvcdc.config"
pumpConfig = "pump.config"
drainerConfig = "drainer.config"

Expand All @@ -118,8 +121,12 @@ const (
pdBinpath = "pd.binpath"
tiflashBinpath = "tiflash.binpath"
ticdcBinpath = "ticdc.binpath"
kvcdcBinpath = "kvcdc.binpath"
pumpBinpath = "pump.binpath"
drainerBinpath = "drainer.binpath"

// component version
kvcdcVersion = "kvcdc.version"
)

func installIfMissing(component, version string) error {
Expand Down Expand Up @@ -319,6 +326,7 @@ If you'd like to use a TiDB version other than %s, cancel and retry with the fol
rootCmd.Flags().Int(pd, defaultOptions.PD.Num, "PD instance number")
rootCmd.Flags().Int(tiflash, defaultOptions.TiFlash.Num, "TiFlash instance number")
rootCmd.Flags().Int(ticdc, defaultOptions.TiCDC.Num, "TiCDC instance number")
rootCmd.Flags().Int(kvcdc, defaultOptions.TiKVCDC.Num, "TiKV-CDC instance number")
rootCmd.Flags().Int(pump, defaultOptions.Pump.Num, "Pump instance number")
rootCmd.Flags().Int(drainer, defaultOptions.Drainer.Num, "Drainer instance number")

Expand All @@ -338,15 +346,19 @@ If you'd like to use a TiDB version other than %s, cancel and retry with the fol
rootCmd.Flags().String(pumpConfig, defaultOptions.Pump.ConfigPath, "Pump instance configuration file")
rootCmd.Flags().String(drainerConfig, defaultOptions.Drainer.ConfigPath, "Drainer instance configuration file")
rootCmd.Flags().String(ticdcConfig, defaultOptions.TiCDC.ConfigPath, "TiCDC instance configuration file")
rootCmd.Flags().String(kvcdcConfig, defaultOptions.TiKVCDC.ConfigPath, "TiKV-CDC instance configuration file")

rootCmd.Flags().String(dbBinpath, defaultOptions.TiDB.BinPath, "TiDB instance binary path")
rootCmd.Flags().String(kvBinpath, defaultOptions.TiKV.BinPath, "TiKV instance binary path")
rootCmd.Flags().String(pdBinpath, defaultOptions.PD.BinPath, "PD instance binary path")
rootCmd.Flags().String(tiflashBinpath, defaultOptions.TiFlash.BinPath, "TiFlash instance binary path")
rootCmd.Flags().String(ticdcBinpath, defaultOptions.TiCDC.BinPath, "TiCDC instance binary path")
rootCmd.Flags().String(kvcdcBinpath, defaultOptions.TiKVCDC.BinPath, "TiKV-CDC instance binary path")
rootCmd.Flags().String(pumpBinpath, defaultOptions.Pump.BinPath, "Pump instance binary path")
rootCmd.Flags().String(drainerBinpath, defaultOptions.Drainer.BinPath, "Drainer instance binary path")

rootCmd.Flags().String(kvcdcVersion, defaultOptions.TiKVCDC.Version, "TiKV-CDC instance version")

rootCmd.AddCommand(newDisplay())
rootCmd.AddCommand(newScaleOut())
rootCmd.AddCommand(newScaleIn())
Expand Down Expand Up @@ -418,6 +430,11 @@ func populateOpt(flagSet *pflag.FlagSet) (err error) {
if err != nil {
return
}
case kvcdc:
options.TiKVCDC.Num, err = strconv.Atoi(flag.Value.String())
if err != nil {
return
}
case pump:
options.Pump.Num, err = strconv.Atoi(flag.Value.String())
if err != nil {
Expand All @@ -439,6 +456,8 @@ func populateOpt(flagSet *pflag.FlagSet) (err error) {
options.TiFlash.ConfigPath = flag.Value.String()
case ticdcConfig:
options.TiCDC.ConfigPath = flag.Value.String()
case kvcdcConfig:
options.TiKVCDC.ConfigPath = flag.Value.String()
case pumpConfig:
options.Pump.ConfigPath = flag.Value.String()
case drainerConfig:
Expand All @@ -454,6 +473,8 @@ func populateOpt(flagSet *pflag.FlagSet) (err error) {
options.TiFlash.BinPath = flag.Value.String()
case ticdcBinpath:
options.TiCDC.BinPath = flag.Value.String()
case kvcdcBinpath:
options.TiKVCDC.BinPath = flag.Value.String()
case pumpBinpath:
options.Pump.BinPath = flag.Value.String()
case drainerBinpath:
Expand Down Expand Up @@ -486,6 +507,9 @@ func populateOpt(flagSet *pflag.FlagSet) (err error) {
if err != nil {
return
}

case kvcdcVersion:
options.TiKVCDC.Version = flag.Value.String()
}
})

Expand Down
39 changes: 38 additions & 1 deletion components/playground/playground.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ type Playground struct {
tidbs []*instance.TiDBInstance
tiflashs []*instance.TiFlashInstance
ticdcs []*instance.TiCDC
tikvCdcs []*instance.TiKVCDC
pumps []*instance.Pump
drainers []*instance.Drainer
startedInstances []instance.Instance
Expand Down Expand Up @@ -309,6 +310,12 @@ func (p *Playground) handleScaleIn(w io.Writer, pid int) error {
p.ticdcs = append(p.ticdcs[:i], p.ticdcs[i+1:]...)
}
}
case spec.ComponentTiKVCDC:
for i := 0; i < len(p.tikvCdcs); i++ {
if p.tikvCdcs[i].Pid() == pid {
p.tikvCdcs = append(p.tikvCdcs[:i], p.tikvCdcs[i+1:]...)
}
}
case spec.ComponentTiFlash:
for i := 0; i < len(p.tiflashs); i++ {
if p.tiflashs[i].Pid() == pid {
Expand Down Expand Up @@ -409,6 +416,8 @@ func (p *Playground) sanitizeComponentConfig(cid string, cfg *instance.Config) e
return p.sanitizeConfig(p.bootOptions.TiFlash, cfg)
case spec.ComponentCDC:
return p.sanitizeConfig(p.bootOptions.TiCDC, cfg)
case spec.ComponentTiKVCDC:
return p.sanitizeConfig(p.bootOptions.TiKVCDC, cfg)
case spec.ComponentPump:
return p.sanitizeConfig(p.bootOptions.Pump, cfg)
case spec.ComponentDrainer:
Expand All @@ -419,7 +428,8 @@ func (p *Playground) sanitizeComponentConfig(cid string, cfg *instance.Config) e
}

func (p *Playground) startInstance(ctx context.Context, inst instance.Instance) error {
version, err := environment.GlobalEnv().V1Repository().ResolveComponentVersion(inst.Component(), p.bootOptions.Version)
boundVersion := p.bindVersion(inst.Component(), p.bootOptions.Version)
version, err := environment.GlobalEnv().V1Repository().ResolveComponentVersion(inst.Component(), boundVersion)
if err != nil {
return err
}
Expand Down Expand Up @@ -593,6 +603,13 @@ func (p *Playground) WalkInstances(fn func(componentID string, ins instance.Inst
}
}

for _, ins := range p.tikvCdcs {
err := fn(spec.ComponentTiKVCDC, ins)
if err != nil {
return err
}
}

for _, ins := range p.drainers {
err := fn(spec.ComponentDrainer, ins)
if err != nil {
Expand Down Expand Up @@ -670,6 +687,10 @@ func (p *Playground) addInstance(componentID string, cfg instance.Config) (ins i
inst := instance.NewTiCDC(cfg.BinPath, dir, host, cfg.ConfigPath, id, p.pds)
ins = inst
p.ticdcs = append(p.ticdcs, inst)
case spec.ComponentTiKVCDC:
inst := instance.NewTiKVCDC(cfg.BinPath, dir, host, cfg.ConfigPath, id, p.pds)
ins = inst
p.tikvCdcs = append(p.tikvCdcs, inst)
case spec.ComponentPump:
inst := instance.NewPump(cfg.BinPath, dir, host, cfg.ConfigPath, id, p.pds)
ins = inst
Expand Down Expand Up @@ -765,6 +786,15 @@ func (p *Playground) waitAllTiFlashUp() {
}
}

func (p *Playground) bindVersion(comp string, version string) (bindVersion string) {
switch comp {
case spec.ComponentTiKVCDC:
return p.bootOptions.TiKVCDC.Version
default:
return version
}
}

func (p *Playground) bootCluster(ctx context.Context, env *environment.Environment, options *BootOptions) error {
for _, cfg := range []*instance.Config{
&options.PD,
Expand All @@ -773,6 +803,7 @@ func (p *Playground) bootCluster(ctx context.Context, env *environment.Environme
&options.TiFlash,
&options.Pump,
&options.Drainer,
&options.TiKVCDC,
} {
path, err := getAbsolutePath(cfg.ConfigPath)
if err != nil {
Expand Down Expand Up @@ -807,6 +838,7 @@ func (p *Playground) bootCluster(ctx context.Context, env *environment.Environme
{spec.ComponentPump, options.Pump},
{spec.ComponentTiDB, options.TiDB},
{spec.ComponentCDC, options.TiCDC},
{spec.ComponentTiKVCDC, options.TiKVCDC},
{spec.ComponentDrainer, options.Drainer},
{spec.ComponentTiFlash, options.TiFlash},
}
Expand Down Expand Up @@ -1001,6 +1033,11 @@ func (p *Playground) terminate(sig syscall.Signal) {
kill(inst.Component(), inst.Pid(), inst.Wait)
}
}
for _, inst := range p.tikvCdcs {
if inst.Process != nil {
kill(inst.Component(), inst.Pid(), inst.Wait)
}
}
for _, inst := range p.drainers {
if inst.Process != nil {
kill(inst.Component(), inst.Pid(), inst.Wait)
Expand Down
1 change: 1 addition & 0 deletions pkg/cluster/spec/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ const (
ComponentDrainer = "drainer"
ComponentPump = "pump"
ComponentCDC = "cdc"
ComponentTiKVCDC = "tikv-cdc"
Copy link
Contributor

@AstroProfundis AstroProfundis Sep 7, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's the difference between cdc and tikv-cdc? This is added to the cluster/spec package, so does tiup-cluster need to be aware of the new component as well?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

tikv-cdc is another CDC frame but focus on NoSQL scenario, while cdc focus on SQL database. Please refer to https://github.com/tikv/migration/blob/main/cdc/README.md.

tiup-cluster need to be aware of the new component as well, please see #2022.

ComponentTiSpark = "tispark"
ComponentSpark = "spark"
ComponentAlertmanager = "alertmanager"
Expand Down
Loading