Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

playground: add dm support for playground #2465

Merged
merged 7 commits into from
Oct 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions components/playground/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,8 @@ func buildCommands(tp CommandType, opt *BootOptions) (cmds []Command) {
{"ticdc", opt.TiCDC},
{"tikv-cdc", opt.TiKVCDC},
{"drainer", opt.Drainer},
{"dm-master", opt.DMMaster},
{"dm-worker", opt.DMWorker},
}

for _, cmd := range commands {
Expand Down Expand Up @@ -113,6 +115,8 @@ func newScaleOut() *cobra.Command {
cmd.Flags().StringVarP(&opt.TSO.Host, "tso.host", "", opt.TSO.Host, "Playground TSO host. If not provided, TSO will still use `host` flag as its host")
cmd.Flags().StringVarP(&opt.Scheduling.Host, "scheduling.host", "", opt.Scheduling.Host, "Playground Scheduling host. If not provided, Scheduling will still use `host` flag as its host")
cmd.Flags().StringVarP(&opt.TiProxy.Host, "tiproxy.host", "", opt.PD.Host, "Playground TiProxy host. If not provided, TiProxy will still use `host` flag as its host")
cmd.Flags().IntVarP(&opt.DMMaster.Num, "dm-master", "", opt.DMMaster.Num, "DM-master instance number")
cmd.Flags().IntVarP(&opt.DMWorker.Num, "dm-worker", "", opt.DMWorker.Num, "DM-worker instance number")

cmd.Flags().StringVarP(&opt.TiDB.ConfigPath, "db.config", "", opt.TiDB.ConfigPath, "TiDB instance configuration file")
cmd.Flags().StringVarP(&opt.TiKV.ConfigPath, "kv.config", "", opt.TiKV.ConfigPath, "TiKV instance configuration file")
Expand All @@ -123,6 +127,8 @@ func newScaleOut() *cobra.Command {
cmd.Flags().StringVarP(&opt.TiProxy.ConfigPath, "tiproxy.config", "", opt.TiProxy.ConfigPath, "TiProxy instance configuration file")
cmd.Flags().StringVarP(&opt.Pump.ConfigPath, "pump.config", "", opt.Pump.ConfigPath, "Pump instance configuration file")
cmd.Flags().StringVarP(&opt.Drainer.ConfigPath, "drainer.config", "", opt.Drainer.ConfigPath, "Drainer instance configuration file")
cmd.Flags().StringVarP(&opt.DMMaster.ConfigPath, "dm-master.config", "", opt.DMMaster.ConfigPath, "DM-master instance configuration file")
cmd.Flags().StringVarP(&opt.DMWorker.ConfigPath, "dm-worker.config", "", opt.DMWorker.ConfigPath, "DM-worker instance configuration file")

cmd.Flags().StringVarP(&opt.TiDB.BinPath, "db.binpath", "", opt.TiDB.BinPath, "TiDB instance binary path")
cmd.Flags().StringVarP(&opt.TiKV.BinPath, "kv.binpath", "", opt.TiKV.BinPath, "TiKV instance binary path")
Expand All @@ -135,6 +141,8 @@ func newScaleOut() *cobra.Command {
cmd.Flags().StringVarP(&opt.TiKVCDC.BinPath, "kvcdc.binpath", "", opt.TiKVCDC.BinPath, "TiKVCDC instance binary path")
cmd.Flags().StringVarP(&opt.Pump.BinPath, "pump.binpath", "", opt.Pump.BinPath, "Pump instance binary path")
cmd.Flags().StringVarP(&opt.Drainer.BinPath, "drainer.binpath", "", opt.Drainer.BinPath, "Drainer instance binary path")
cmd.Flags().StringVarP(&opt.DMMaster.BinPath, "dm-master.binpath", "", opt.DMMaster.BinPath, "DM-master instance binary path")
cmd.Flags().StringVarP(&opt.DMWorker.BinPath, "dm-worker.binpath", "", opt.DMWorker.BinPath, "DM-worker instance binary path")

return cmd
}
Expand Down
90 changes: 90 additions & 0 deletions components/playground/instance/dm_master.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
package instance

import (
"context"
"fmt"
"path/filepath"
"strings"

"github.com/pingcap/tiup/pkg/utils"
)

// DMMaster represent a DM master instance.
type DMMaster struct {
instance
Process
initEndpoints []*DMMaster
}

var _ Instance = &DMMaster{}

// NewDMMaster create a new DMMaster instance.
func NewDMMaster(binPath string, dir, host, configPath string, portOffset int, id int, port int) *DMMaster {
if port <= 0 {
port = 8261
}
return &DMMaster{
instance: instance{
BinPath: binPath,
ID: id,
Dir: dir,
Host: host,
Port: utils.MustGetFreePort(host, 8291, portOffset),
// Similar like PD's client port, here use StatusPort for Master Port.
StatusPort: utils.MustGetFreePort(host, port, portOffset),
ConfigPath: configPath,
},
}
}

// Name return the name of the instance.
func (m *DMMaster) Name() string {
return fmt.Sprintf("dm-master-%d", m.ID)
}

// Start starts the instance.
func (m *DMMaster) Start(ctx context.Context) error {
args := []string{
fmt.Sprintf("--name=%s", m.Name()),
fmt.Sprintf("--master-addr=http://%s", utils.JoinHostPort(m.Host, m.StatusPort)),
fmt.Sprintf("--advertise-addr=http://%s", utils.JoinHostPort(AdvertiseHost(m.Host), m.StatusPort)),
fmt.Sprintf("--peer-urls=http://%s", utils.JoinHostPort(m.Host, m.Port)),
fmt.Sprintf("--advertise-peer-urls=http://%s", utils.JoinHostPort(AdvertiseHost(m.Host), m.Port)),
fmt.Sprintf("--log-file=%s", m.LogFile()),
}

endpoints := make([]string, 0)
for _, master := range m.initEndpoints {
endpoints = append(endpoints, fmt.Sprintf("%s=http://%s", master.Name(), utils.JoinHostPort(master.Host, master.Port)))
}
args = append(args, fmt.Sprintf("--initial-cluster=%s", strings.Join(endpoints, ",")))

if m.ConfigPath != "" {
args = append(args, fmt.Sprintf("--config=%s", m.ConfigPath))
}

m.Process = &process{cmd: PrepareCommand(ctx, m.BinPath, args, nil, m.Dir)}

logIfErr(m.Process.SetOutputFile(m.LogFile()))
return m.Process.Start()
}

// SetInitEndpoints set the initial endpoints for the DM master.
func (m *DMMaster) SetInitEndpoints(endpoints []*DMMaster) {
m.initEndpoints = endpoints
}

// Component return the component of the instance.
func (m *DMMaster) Component() string {
return "dm-master"
}

// LogFile return the log file path of the instance.
func (m *DMMaster) LogFile() string {
return filepath.Join(m.Dir, "dm-master.log")
}

// Addr return the address of the instance.
func (m *DMMaster) Addr() string {
return utils.JoinHostPort(m.Host, m.StatusPort)
}
83 changes: 83 additions & 0 deletions components/playground/instance/dm_worker.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
package instance

import (
"context"
"fmt"
"path/filepath"
"strings"

"github.com/pingcap/tiup/pkg/utils"
)

// DMWorker represent a DM worker instance.
type DMWorker struct {
instance
Process

masters []*DMMaster
}

var _ Instance = &DMWorker{}

// NewDMWorker create a DMWorker instance.
func NewDMWorker(binPath string, dir, host, configPath string, portOffset int, id int, port int, masters []*DMMaster) *DMWorker {
if port <= 0 {
port = 8262
}
return &DMWorker{
instance: instance{
BinPath: binPath,
ID: id,
Dir: dir,
Host: host,
Port: utils.MustGetFreePort(host, port, portOffset),
ConfigPath: configPath,
},
masters: masters,
}
}

// MasterAddrs return the master addresses.
func (w *DMWorker) MasterAddrs() []string {
var addrs []string
for _, master := range w.masters {
addrs = append(addrs, utils.JoinHostPort(AdvertiseHost(master.Host), master.StatusPort))
}
return addrs
}

// Name return the name of the instance.
func (w *DMWorker) Name() string {
return fmt.Sprintf("dm-worker-%d", w.ID)
}

// Start starts the instance.
func (w *DMWorker) Start(ctx context.Context) error {
args := []string{
fmt.Sprintf("--name=%s", w.Name()),
fmt.Sprintf("--worker-addr=%s", utils.JoinHostPort(w.Host, w.Port)),
fmt.Sprintf("--advertise-addr=%s", utils.JoinHostPort(AdvertiseHost(w.Host), w.Port)),
fmt.Sprintf("--join=%s", strings.Join(w.MasterAddrs(), ",")),
fmt.Sprintf("--log-file=%s", w.LogFile()),
}

if w.ConfigPath != "" {
args = append(args, fmt.Sprintf("--config=%s", w.ConfigPath))
}

w.Process = &process{cmd: PrepareCommand(ctx, w.BinPath, args, nil, w.Dir)}

logIfErr(w.Process.SetOutputFile(w.LogFile()))

return w.Process.Start()
}

// Component return the component of the instance.
func (w *DMWorker) Component() string {
return "dm-worker"
}

// LogFile return the log file of the instance.
func (w *DMWorker) LogFile() string {
return filepath.Join(w.Dir, "dm-worker.log")
}
30 changes: 30 additions & 0 deletions components/playground/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,8 @@ type BootOptions struct {
CSEOpts instance.CSEOptions `yaml:"cse"` // Only available when mode == tidb-cse
GrafanaPort int `yaml:"grafana_port"`
PortOffset int `yaml:"port_offset"`
DMMaster instance.Config `yaml:"dm_master"`
DMWorker instance.Config `yaml:"dm_worker"`
}

var (
Expand Down Expand Up @@ -298,6 +300,8 @@ Note: Version constraint [bold]%s[reset] is resolved to [green][bold]%s[reset].
rootCmd.Flags().IntVar(&options.TiKVCDC.Num, "kvcdc", 0, "TiKV-CDC instance number")
rootCmd.Flags().IntVar(&options.Pump.Num, "pump", 0, "Pump instance number")
rootCmd.Flags().IntVar(&options.Drainer.Num, "drainer", 0, "Drainer instance number")
rootCmd.Flags().IntVar(&options.DMMaster.Num, "dm-master", 0, "DM-master instance number")
rootCmd.Flags().IntVar(&options.DMWorker.Num, "dm-worker", 0, "DM-worker instance number")

rootCmd.Flags().IntVar(&options.TiDB.UpTimeout, "db.timeout", 60, "TiDB max wait time in seconds for starting, 0 means no limit")
rootCmd.Flags().IntVar(&options.TiFlash.UpTimeout, "tiflash.timeout", 120, "TiFlash max wait time in seconds for starting, 0 means no limit")
Expand All @@ -314,6 +318,10 @@ Note: Version constraint [bold]%s[reset] is resolved to [green][bold]%s[reset].
rootCmd.Flags().IntVar(&options.TiCDC.Port, "ticdc.port", 0, "Playground TiCDC port. If not provided, TiCDC will use 8300 as its port")
rootCmd.Flags().StringVar(&options.TiProxy.Host, "tiproxy.host", "", "Playground TiProxy host. If not provided, TiProxy will still use `host` flag as its host")
rootCmd.Flags().IntVar(&options.TiProxy.Port, "tiproxy.port", 0, "Playground TiProxy port. If not provided, TiProxy will use 6000 as its port")
rootCmd.Flags().StringVar(&options.DMMaster.Host, "dm-master.host", "", "DM-master instance host")
rootCmd.Flags().IntVar(&options.DMMaster.Port, "dm-master.port", 8261, "DM-master instance port")
rootCmd.Flags().StringVar(&options.DMWorker.Host, "dm-worker.host", "", "DM-worker instance host")
rootCmd.Flags().IntVar(&options.DMWorker.Port, "dm-worker.port", 8262, "DM-worker instance port")

rootCmd.Flags().StringVar(&options.TiDB.ConfigPath, "db.config", "", "TiDB instance configuration file")
rootCmd.Flags().StringVar(&options.TiKV.ConfigPath, "kv.config", "", "TiKV instance configuration file")
Expand All @@ -328,6 +336,8 @@ Note: Version constraint [bold]%s[reset] is resolved to [green][bold]%s[reset].
rootCmd.Flags().StringVar(&options.Drainer.ConfigPath, "drainer.config", "", "Drainer instance configuration file")
rootCmd.Flags().StringVar(&options.TiCDC.ConfigPath, "ticdc.config", "", "TiCDC instance configuration file")
rootCmd.Flags().StringVar(&options.TiKVCDC.ConfigPath, "kvcdc.config", "", "TiKV-CDC instance configuration file")
rootCmd.Flags().StringVar(&options.DMMaster.ConfigPath, "dm-master.config", "", "DM-master instance configuration file")
rootCmd.Flags().StringVar(&options.DMWorker.ConfigPath, "dm-worker.config", "", "DM-worker instance configuration file")

rootCmd.Flags().StringVar(&options.TiDB.BinPath, "db.binpath", "", "TiDB instance binary path")
rootCmd.Flags().StringVar(&options.TiKV.BinPath, "kv.binpath", "", "TiKV instance binary path")
Expand All @@ -343,6 +353,8 @@ Note: Version constraint [bold]%s[reset] is resolved to [green][bold]%s[reset].
rootCmd.Flags().StringVar(&options.TiKVCDC.BinPath, "kvcdc.binpath", "", "TiKV-CDC instance binary path")
rootCmd.Flags().StringVar(&options.Pump.BinPath, "pump.binpath", "", "Pump instance binary path")
rootCmd.Flags().StringVar(&options.Drainer.BinPath, "drainer.binpath", "", "Drainer instance binary path")
rootCmd.Flags().StringVar(&options.DMMaster.BinPath, "dm-master.binpath", "", "DM-master instance binary path")
rootCmd.Flags().StringVar(&options.DMWorker.BinPath, "dm-worker.binpath", "", "DM-worker instance binary path")

rootCmd.Flags().StringVar(&options.TiKVCDC.Version, "kvcdc.version", "", "TiKV-CDC instance version")

Expand Down Expand Up @@ -466,6 +478,24 @@ func checkStoreStatus(pdClient *api.PDClient, storeAddr string, timeout int) boo
}
}

func checkDMMasterStatus(dmMasterClient *api.DMMasterClient, dmMasterAddr string, timeout int) bool {
if timeout > 0 {
for i := 0; i < timeout; i++ {
if _, isActive, _, err := dmMasterClient.GetMaster(dmMasterAddr); err == nil && isActive {
return true
}
time.Sleep(time.Second)
}
return false
}
for {
if _, isActive, _, err := dmMasterClient.GetMaster(dmMasterAddr); err == nil && isActive {
return true
}
time.Sleep(time.Second)
}
}

func hasDashboard(pdAddr string) bool {
resp, err := http.Get(fmt.Sprintf("http://%s/dashboard", pdAddr))
if err != nil {
Expand Down
Loading
Loading