Skip to content

Commit

Permalink
Merge branch 'master' into go1.19
Browse files Browse the repository at this point in the history
  • Loading branch information
ti-chi-bot authored Sep 15, 2022
2 parents b5b4bb3 + d6a2826 commit f8c9810
Show file tree
Hide file tree
Showing 36 changed files with 1,339 additions and 31 deletions.
1 change: 1 addition & 0 deletions .github/workflows/integrate-cluster-cmd.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ jobs:
# - 'test_cmd_tls_native_ssh'
- 'test_upgrade'
- 'test_upgrade_tls'
- 'test_tikv_cdc'
env:
working-directory: ${{ github.workspace }}/go/src/github.com/${{ github.repository }}
steps:
Expand Down
3 changes: 3 additions & 0 deletions components/playground/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ func buildCommands(tp CommandType, opt *BootOptions) (cmds []Command) {
{"tiflash", opt.TiFlash},
{"tidb", opt.TiDB},
{"ticdc", opt.TiCDC},
{"tikv-cdc", opt.TiKVCDC},
{"drainer", opt.Drainer},
}

Expand Down Expand Up @@ -98,6 +99,7 @@ func newScaleOut() *cobra.Command {
cmd.Flags().IntVarP(&opt.PD.Num, "pd", "", opt.PD.Num, "PD instance number")
cmd.Flags().IntVarP(&opt.TiFlash.Num, "tiflash", "", opt.TiFlash.Num, "TiFlash instance number")
cmd.Flags().IntVarP(&opt.TiCDC.Num, "ticdc", "", opt.TiCDC.Num, "TiCDC instance number")
cmd.Flags().IntVarP(&opt.TiKVCDC.Num, "kvcdc", "", opt.TiKVCDC.Num, "TiKV-CDC instance number")
cmd.Flags().IntVarP(&opt.Pump.Num, "pump", "", opt.Pump.Num, "Pump instance number")
cmd.Flags().IntVarP(&opt.Drainer.Num, "drainer", "", opt.Pump.Num, "Drainer instance number")

Expand All @@ -116,6 +118,7 @@ func newScaleOut() *cobra.Command {
cmd.Flags().StringVarP(&opt.PD.BinPath, "pd.binpath", "", opt.PD.BinPath, "PD instance binary path")
cmd.Flags().StringVarP(&opt.TiFlash.BinPath, "tiflash.binpath", "", opt.TiFlash.BinPath, "TiFlash instance binary path")
cmd.Flags().StringVarP(&opt.TiCDC.BinPath, "ticdc.binpath", "", opt.TiCDC.BinPath, "TiCDC instance binary path")
cmd.Flags().StringVarP(&opt.TiKVCDC.BinPath, "kvcdc.binpath", "", opt.TiKVCDC.BinPath, "TiKVCDC instance binary path")
cmd.Flags().StringVarP(&opt.Pump.BinPath, "pump.binpath", "", opt.Pump.BinPath, "Pump instance binary path")
cmd.Flags().StringVarP(&opt.Drainer.BinPath, "drainer.binpath", "", opt.Drainer.BinPath, "Drainer instance binary path")

Expand Down
1 change: 1 addition & 0 deletions components/playground/instance/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ type Config struct {
Host string `yaml:"host"`
Port int `yaml:"port"`
UpTimeout int `yaml:"up_timeout"`
Version string `yaml:"version"`
}

type instance struct {
Expand Down
86 changes: 86 additions & 0 deletions components/playground/instance/tikv_cdc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
// Copyright 2022 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package instance

import (
"context"
"fmt"
"path/filepath"
"strings"

tiupexec "github.com/pingcap/tiup/pkg/exec"
"github.com/pingcap/tiup/pkg/utils"
)

// TiKVCDC represent a TiKV-CDC instance.
type TiKVCDC struct {
instance
pds []*PDInstance
Process
}

var _ Instance = &TiKVCDC{}

// NewTiKVCDC create a TiKVCDC instance.
func NewTiKVCDC(binPath string, dir, host, configPath string, id int, pds []*PDInstance) *TiKVCDC {
tikvCdc := &TiKVCDC{
instance: instance{
BinPath: binPath,
ID: id,
Dir: dir,
Host: host,
Port: utils.MustGetFreePort(host, 8600),
ConfigPath: configPath,
},
pds: pds,
}
tikvCdc.StatusPort = tikvCdc.Port
return tikvCdc
}

// Start implements Instance interface.
func (c *TiKVCDC) Start(ctx context.Context, version utils.Version) error {
endpoints := pdEndpoints(c.pds, true)

args := []string{
"server",
fmt.Sprintf("--addr=%s:%d", c.Host, c.Port),
fmt.Sprintf("--advertise-addr=%s:%d", AdvertiseHost(c.Host), c.Port),
fmt.Sprintf("--pd=%s", strings.Join(endpoints, ",")),
fmt.Sprintf("--log-file=%s", c.LogFile()),
fmt.Sprintf("--data-dir=%s", filepath.Join(c.Dir, "data")),
}
if c.ConfigPath != "" {
args = append(args, fmt.Sprintf("--config=%s", c.ConfigPath))
}

var err error
if c.BinPath, err = tiupexec.PrepareBinary("tikv-cdc", version, c.BinPath); err != nil {
return err
}
c.Process = &process{cmd: PrepareCommand(ctx, c.BinPath, args, nil, c.Dir)}

logIfErr(c.Process.SetOutputFile(c.LogFile()))
return c.Process.Start()
}

// Component return component name.
func (c *TiKVCDC) Component() string {
return "tikv-cdc"
}

// LogFile return the log file.
func (c *TiKVCDC) LogFile() string {
return filepath.Join(c.Dir, "tikv_cdc.log")
}
24 changes: 24 additions & 0 deletions components/playground/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ type BootOptions struct {
TiKV instance.Config `yaml:"tikv"`
TiFlash instance.Config `yaml:"tiflash"`
TiCDC instance.Config `yaml:"ticdc"`
TiKVCDC instance.Config `yaml:"tikv_cdc"`
Pump instance.Config `yaml:"pump"`
Drainer instance.Config `yaml:"drainer"`
Host string `yaml:"host"`
Expand Down Expand Up @@ -89,6 +90,7 @@ const (
pd = "pd"
tiflash = "tiflash"
ticdc = "ticdc"
kvcdc = "kvcdc"
pump = "pump"
drainer = "drainer"

Expand All @@ -109,6 +111,7 @@ const (
pdConfig = "pd.config"
tiflashConfig = "tiflash.config"
ticdcConfig = "ticdc.config"
kvcdcConfig = "kvcdc.config"
pumpConfig = "pump.config"
drainerConfig = "drainer.config"

Expand All @@ -118,8 +121,12 @@ const (
pdBinpath = "pd.binpath"
tiflashBinpath = "tiflash.binpath"
ticdcBinpath = "ticdc.binpath"
kvcdcBinpath = "kvcdc.binpath"
pumpBinpath = "pump.binpath"
drainerBinpath = "drainer.binpath"

// component version
kvcdcVersion = "kvcdc.version"
)

func installIfMissing(component, version string) error {
Expand Down Expand Up @@ -319,6 +326,7 @@ If you'd like to use a TiDB version other than %s, cancel and retry with the fol
rootCmd.Flags().Int(pd, defaultOptions.PD.Num, "PD instance number")
rootCmd.Flags().Int(tiflash, defaultOptions.TiFlash.Num, "TiFlash instance number")
rootCmd.Flags().Int(ticdc, defaultOptions.TiCDC.Num, "TiCDC instance number")
rootCmd.Flags().Int(kvcdc, defaultOptions.TiKVCDC.Num, "TiKV-CDC instance number")
rootCmd.Flags().Int(pump, defaultOptions.Pump.Num, "Pump instance number")
rootCmd.Flags().Int(drainer, defaultOptions.Drainer.Num, "Drainer instance number")

Expand All @@ -338,15 +346,19 @@ If you'd like to use a TiDB version other than %s, cancel and retry with the fol
rootCmd.Flags().String(pumpConfig, defaultOptions.Pump.ConfigPath, "Pump instance configuration file")
rootCmd.Flags().String(drainerConfig, defaultOptions.Drainer.ConfigPath, "Drainer instance configuration file")
rootCmd.Flags().String(ticdcConfig, defaultOptions.TiCDC.ConfigPath, "TiCDC instance configuration file")
rootCmd.Flags().String(kvcdcConfig, defaultOptions.TiKVCDC.ConfigPath, "TiKV-CDC instance configuration file")

rootCmd.Flags().String(dbBinpath, defaultOptions.TiDB.BinPath, "TiDB instance binary path")
rootCmd.Flags().String(kvBinpath, defaultOptions.TiKV.BinPath, "TiKV instance binary path")
rootCmd.Flags().String(pdBinpath, defaultOptions.PD.BinPath, "PD instance binary path")
rootCmd.Flags().String(tiflashBinpath, defaultOptions.TiFlash.BinPath, "TiFlash instance binary path")
rootCmd.Flags().String(ticdcBinpath, defaultOptions.TiCDC.BinPath, "TiCDC instance binary path")
rootCmd.Flags().String(kvcdcBinpath, defaultOptions.TiKVCDC.BinPath, "TiKV-CDC instance binary path")
rootCmd.Flags().String(pumpBinpath, defaultOptions.Pump.BinPath, "Pump instance binary path")
rootCmd.Flags().String(drainerBinpath, defaultOptions.Drainer.BinPath, "Drainer instance binary path")

rootCmd.Flags().String(kvcdcVersion, defaultOptions.TiKVCDC.Version, "TiKV-CDC instance version")

rootCmd.AddCommand(newDisplay())
rootCmd.AddCommand(newScaleOut())
rootCmd.AddCommand(newScaleIn())
Expand Down Expand Up @@ -418,6 +430,11 @@ func populateOpt(flagSet *pflag.FlagSet) (err error) {
if err != nil {
return
}
case kvcdc:
options.TiKVCDC.Num, err = strconv.Atoi(flag.Value.String())
if err != nil {
return
}
case pump:
options.Pump.Num, err = strconv.Atoi(flag.Value.String())
if err != nil {
Expand All @@ -439,6 +456,8 @@ func populateOpt(flagSet *pflag.FlagSet) (err error) {
options.TiFlash.ConfigPath = flag.Value.String()
case ticdcConfig:
options.TiCDC.ConfigPath = flag.Value.String()
case kvcdcConfig:
options.TiKVCDC.ConfigPath = flag.Value.String()
case pumpConfig:
options.Pump.ConfigPath = flag.Value.String()
case drainerConfig:
Expand All @@ -454,6 +473,8 @@ func populateOpt(flagSet *pflag.FlagSet) (err error) {
options.TiFlash.BinPath = flag.Value.String()
case ticdcBinpath:
options.TiCDC.BinPath = flag.Value.String()
case kvcdcBinpath:
options.TiKVCDC.BinPath = flag.Value.String()
case pumpBinpath:
options.Pump.BinPath = flag.Value.String()
case drainerBinpath:
Expand Down Expand Up @@ -486,6 +507,9 @@ func populateOpt(flagSet *pflag.FlagSet) (err error) {
if err != nil {
return
}

case kvcdcVersion:
options.TiKVCDC.Version = flag.Value.String()
}
})

Expand Down
39 changes: 38 additions & 1 deletion components/playground/playground.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ type Playground struct {
tidbs []*instance.TiDBInstance
tiflashs []*instance.TiFlashInstance
ticdcs []*instance.TiCDC
tikvCdcs []*instance.TiKVCDC
pumps []*instance.Pump
drainers []*instance.Drainer
startedInstances []instance.Instance
Expand Down Expand Up @@ -309,6 +310,12 @@ func (p *Playground) handleScaleIn(w io.Writer, pid int) error {
p.ticdcs = append(p.ticdcs[:i], p.ticdcs[i+1:]...)
}
}
case spec.ComponentTiKVCDC:
for i := 0; i < len(p.tikvCdcs); i++ {
if p.tikvCdcs[i].Pid() == pid {
p.tikvCdcs = append(p.tikvCdcs[:i], p.tikvCdcs[i+1:]...)
}
}
case spec.ComponentTiFlash:
for i := 0; i < len(p.tiflashs); i++ {
if p.tiflashs[i].Pid() == pid {
Expand Down Expand Up @@ -409,6 +416,8 @@ func (p *Playground) sanitizeComponentConfig(cid string, cfg *instance.Config) e
return p.sanitizeConfig(p.bootOptions.TiFlash, cfg)
case spec.ComponentCDC:
return p.sanitizeConfig(p.bootOptions.TiCDC, cfg)
case spec.ComponentTiKVCDC:
return p.sanitizeConfig(p.bootOptions.TiKVCDC, cfg)
case spec.ComponentPump:
return p.sanitizeConfig(p.bootOptions.Pump, cfg)
case spec.ComponentDrainer:
Expand All @@ -419,7 +428,8 @@ func (p *Playground) sanitizeComponentConfig(cid string, cfg *instance.Config) e
}

func (p *Playground) startInstance(ctx context.Context, inst instance.Instance) error {
version, err := environment.GlobalEnv().V1Repository().ResolveComponentVersion(inst.Component(), p.bootOptions.Version)
boundVersion := p.bindVersion(inst.Component(), p.bootOptions.Version)
version, err := environment.GlobalEnv().V1Repository().ResolveComponentVersion(inst.Component(), boundVersion)
if err != nil {
return err
}
Expand Down Expand Up @@ -593,6 +603,13 @@ func (p *Playground) WalkInstances(fn func(componentID string, ins instance.Inst
}
}

for _, ins := range p.tikvCdcs {
err := fn(spec.ComponentTiKVCDC, ins)
if err != nil {
return err
}
}

for _, ins := range p.drainers {
err := fn(spec.ComponentDrainer, ins)
if err != nil {
Expand Down Expand Up @@ -670,6 +687,10 @@ func (p *Playground) addInstance(componentID string, cfg instance.Config) (ins i
inst := instance.NewTiCDC(cfg.BinPath, dir, host, cfg.ConfigPath, id, p.pds)
ins = inst
p.ticdcs = append(p.ticdcs, inst)
case spec.ComponentTiKVCDC:
inst := instance.NewTiKVCDC(cfg.BinPath, dir, host, cfg.ConfigPath, id, p.pds)
ins = inst
p.tikvCdcs = append(p.tikvCdcs, inst)
case spec.ComponentPump:
inst := instance.NewPump(cfg.BinPath, dir, host, cfg.ConfigPath, id, p.pds)
ins = inst
Expand Down Expand Up @@ -765,6 +786,15 @@ func (p *Playground) waitAllTiFlashUp() {
}
}

func (p *Playground) bindVersion(comp string, version string) (bindVersion string) {
switch comp {
case spec.ComponentTiKVCDC:
return p.bootOptions.TiKVCDC.Version
default:
return version
}
}

func (p *Playground) bootCluster(ctx context.Context, env *environment.Environment, options *BootOptions) error {
for _, cfg := range []*instance.Config{
&options.PD,
Expand All @@ -773,6 +803,7 @@ func (p *Playground) bootCluster(ctx context.Context, env *environment.Environme
&options.TiFlash,
&options.Pump,
&options.Drainer,
&options.TiKVCDC,
} {
path, err := getAbsolutePath(cfg.ConfigPath)
if err != nil {
Expand Down Expand Up @@ -807,6 +838,7 @@ func (p *Playground) bootCluster(ctx context.Context, env *environment.Environme
{spec.ComponentPump, options.Pump},
{spec.ComponentTiDB, options.TiDB},
{spec.ComponentCDC, options.TiCDC},
{spec.ComponentTiKVCDC, options.TiKVCDC},
{spec.ComponentDrainer, options.Drainer},
{spec.ComponentTiFlash, options.TiFlash},
}
Expand Down Expand Up @@ -1001,6 +1033,11 @@ func (p *Playground) terminate(sig syscall.Signal) {
kill(inst.Component(), inst.Pid(), inst.Wait)
}
}
for _, inst := range p.tikvCdcs {
if inst.Process != nil {
kill(inst.Component(), inst.Pid(), inst.Wait)
}
}
for _, inst := range p.drainers {
if inst.Process != nil {
kill(inst.Component(), inst.Pid(), inst.Wait)
Expand Down
2 changes: 1 addition & 1 deletion docker/up.sh
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ if [ -z "${DEV}" ]; then
)
else
INFO "Build tiup-cluster in $TIUP_CLUSTER_ROOT"
(cd "${TIUP_CLUSTER_ROOT}";make failpoint-enable;GOOS=linux GOARCH=amd64 make cluster dm;make failpoint-disable)
(cd "${TIUP_CLUSTER_ROOT}";make failpoint-enable;GOOS=linux GOARCH=amd64 make tiup cluster dm;make failpoint-disable)
fi

if [ "${INIT_ONLY}" -eq 1 ]; then
Expand Down
16 changes: 16 additions & 0 deletions embed/examples/cluster/topology.example.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ monitored:
# pd:
# tiflash:
# tiflash-learner:
# kvcdc:

# # Server configs are used to specify the configuration of PD Servers.
pd_servers:
Expand Down Expand Up @@ -278,6 +279,21 @@ tiflash_servers:
data_dir: /data2/tidb-data/tiflash-9001
log_dir: /data2/tidb-deploy/tiflash-9001/log

# # Server configs are used to specify the configuration of TiKV-CDC Servers.
kvcdc_servers:
- host: 10.0.1.20
# # SSH port of the server.
# ssh_port: 22
# # TiKV-CDC Server communication port.
port: 8600
# # TiKV-CDC Server data storage directory.
data_dir: "/data1/tidb-data/tikv-cdc-8600"
# # TiKV-CDC Server log file storage directory.
log_dir: "/data1/tidb-deploy/tikv-cdc-8600/log"
- host: 10.0.1.21
data_dir: "/data1/tidb-data/tikv-cdc-8600"
log_dir: "/data1/tidb-deploy/tikv-cdc-8600/log"

# # Server configs are used to specify the configuration of Prometheus Server.
monitoring_servers:
# # The ip address of the Monitoring Server.
Expand Down
Loading

0 comments on commit f8c9810

Please sign in to comment.