Skip to content

Commit

Permalink
add dm support for playground
Browse files Browse the repository at this point in the history
Signed-off-by: Siddon Tang <[email protected]>
  • Loading branch information
siddontang committed Oct 28, 2024
1 parent 964b40b commit ecd4397
Show file tree
Hide file tree
Showing 4 changed files with 220 additions and 0 deletions.
78 changes: 78 additions & 0 deletions components/playground/instance/dm_master.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
package instance

import (
"context"
"fmt"
"path/filepath"
"strings"

"github.com/pingcap/tiup/pkg/utils"
)

type DMMaster struct {
instance
Process
initEndpoints []*DMMaster
}

var _ Instance = &DMMaster{}

func NewDMMaster(binPath string, dir, host, configPath string, portOffset int, id int, port int) *DMMaster {
if port <= 0 {
port = 8261
}
return &DMMaster{
instance: instance{
BinPath: binPath,
ID: id,
Dir: dir,
Host: host,
Port: utils.MustGetFreePort(host, 8291, portOffset),
// Similar like PD's client port, here use StatusPort for Master Port.
StatusPort: utils.MustGetFreePort(host, port, portOffset),
ConfigPath: configPath,
},
}
}

func (m *DMMaster) Name() string {
return fmt.Sprintf("dm-master-%d", m.ID)
}

func (m *DMMaster) Start(ctx context.Context) error {
args := []string{
fmt.Sprintf("--name=%s", m.Name()),
fmt.Sprintf("--master-addr=http://%s", utils.JoinHostPort(m.Host, m.StatusPort)),
fmt.Sprintf("--advertise-addr=http://%s", utils.JoinHostPort(AdvertiseHost(m.Host), m.StatusPort)),
fmt.Sprintf("--peer-urls=http://%s", utils.JoinHostPort(m.Host, m.Port)),
fmt.Sprintf("--advertise-peer-urls=http://%s", utils.JoinHostPort(AdvertiseHost(m.Host), m.Port)),
fmt.Sprintf("--log-file=%s", m.LogFile()),
}

endpoints := make([]string, 0)
for _, master := range m.initEndpoints {
endpoints = append(endpoints, fmt.Sprintf("%s=http://%s", master.Name(), utils.JoinHostPort(master.Host, master.Port)))
}
args = append(args, fmt.Sprintf("--initial-cluster=%s", strings.Join(endpoints, ",")))

if m.ConfigPath != "" {
args = append(args, fmt.Sprintf("--config=%s", m.ConfigPath))
}

m.Process = &process{cmd: PrepareCommand(ctx, m.BinPath, args, nil, m.Dir)}

logIfErr(m.Process.SetOutputFile(m.LogFile()))
return m.Process.Start()
}

func (m *DMMaster) SetInitEndpoints(endpoints []*DMMaster) {
m.initEndpoints = endpoints
}

func (m *DMMaster) Component() string {
return "dm-master"
}

func (m *DMMaster) LogFile() string {
return filepath.Join(m.Dir, "dm-master.log")
}
83 changes: 83 additions & 0 deletions components/playground/instance/dm_worker.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
package instance

import (
"context"
"fmt"
"path/filepath"
"strings"
"time"

"github.com/pingcap/tiup/pkg/utils"
)

type DMWorker struct {
instance
Process

masters []*DMMaster
}

var _ Instance = &DMWorker{}

// NewDMWorker create a DMWorker instance.
func NewDMWorker(binPath string, dir, host, configPath string, portOffset int, id int, port int, masters []*DMMaster) *DMWorker {
if port <= 0 {
port = 8262
}
return &DMWorker{
instance: instance{
BinPath: binPath,
ID: id,
Dir: dir,
Host: host,
Port: utils.MustGetFreePort(host, port, portOffset),
ConfigPath: configPath,
},
masters: masters,
}
}

func (w *DMWorker) MasterAddrs() []string {
var addrs []string
for _, master := range w.masters {
addrs = append(addrs, utils.JoinHostPort(AdvertiseHost(master.Host), master.StatusPort))
}
return addrs
}

func (w *DMWorker) Name() string {
return fmt.Sprintf("dm-worker-%d", w.ID)
}

func (w *DMWorker) Start(ctx context.Context) error {
args := []string{
fmt.Sprintf("--name=%s", w.Name()),
fmt.Sprintf("--worker-addr=%s", utils.JoinHostPort(w.Host, w.Port)),
fmt.Sprintf("--advertise-addr=%s", utils.JoinHostPort(AdvertiseHost(w.Host), w.Port)),
fmt.Sprintf("--join=%s", strings.Join(w.MasterAddrs(), ",")),
fmt.Sprintf("--log-file=%s", w.LogFile()),
}

if w.ConfigPath != "" {
args = append(args, fmt.Sprintf("--config=%s", w.ConfigPath))
}

w.Process = &process{cmd: PrepareCommand(ctx, w.BinPath, args, nil, w.Dir)}

logIfErr(w.Process.SetOutputFile(w.LogFile()))

// try to wait for the master to be ready
// this is a very ugly implementation, but it may mostly works
// TODO: find a better way to do this,
// e.g, let master support a HTTP API to check if it's ready
time.Sleep(time.Second * 3)
return w.Process.Start()
}

func (w *DMWorker) Component() string {
return "dm-worker"
}

func (w *DMWorker) LogFile() string {
return filepath.Join(w.Dir, "dm-worker.log")
}
15 changes: 15 additions & 0 deletions components/playground/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,8 @@ type BootOptions struct {
CSEOpts instance.CSEOptions `yaml:"cse"` // Only available when mode == tidb-cse
GrafanaPort int `yaml:"grafana_port"`
PortOffset int `yaml:"port_offset"`
DMMaster instance.Config `yaml:"dm_master"`
DMWorker instance.Config `yaml:"dm_worker"`
}

var (
Expand Down Expand Up @@ -351,6 +353,19 @@ Note: Version constraint [bold]%s[reset] is resolved to [green][bold]%s[reset].
rootCmd.Flags().StringVar(&options.CSEOpts.AccessKey, "cse.access_key", "minioadmin", "Object store access key for --mode=tidb-cse")
rootCmd.Flags().StringVar(&options.CSEOpts.SecretKey, "cse.secret_key", "minioadmin", "Object store secret key for --mode=tidb-cse")

rootCmd.Flags().IntVar(&options.DMMaster.Num, "dm-master", 0, "DM-master instance number")
rootCmd.Flags().IntVar(&options.DMWorker.Num, "dm-worker", 0, "DM-worker instance number")

rootCmd.Flags().StringVar(&options.DMMaster.Host, "dm-master.host", "", "DM-master instance host")
rootCmd.Flags().IntVar(&options.DMMaster.Port, "dm-master.port", 8261, "DM-master instance port")
rootCmd.Flags().StringVar(&options.DMMaster.ConfigPath, "dm-master.config", "", "DM-master instance configuration file")
rootCmd.Flags().StringVar(&options.DMMaster.BinPath, "dm-master.binpath", "", "DM-master instance binary path")

rootCmd.Flags().StringVar(&options.DMWorker.Host, "dm-worker.host", "", "DM-worker instance host")
rootCmd.Flags().IntVar(&options.DMWorker.Port, "dm-worker.port", 8262, "DM-worker instance port")
rootCmd.Flags().StringVar(&options.DMWorker.ConfigPath, "dm-worker.config", "", "DM-worker instance configuration file")
rootCmd.Flags().StringVar(&options.DMWorker.BinPath, "dm-worker.binpath", "", "DM-worker instance binary path")

rootCmd.AddCommand(newDisplay())
rootCmd.AddCommand(newScaleOut())
rootCmd.AddCommand(newScaleIn())
Expand Down
44 changes: 44 additions & 0 deletions components/playground/playground.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,8 @@ type Playground struct {
tikvCdcs []*instance.TiKVCDC
pumps []*instance.Pump
drainers []*instance.Drainer
dmMasters []*instance.DMMaster
dmWorkers []*instance.DMWorker
startedInstances []instance.Instance

idAlloc map[string]int
Expand Down Expand Up @@ -682,6 +684,20 @@ func (p *Playground) WalkInstances(fn func(componentID string, ins instance.Inst
}
}

for _, ins := range p.dmMasters {
err := fn(spec.ComponentDMMaster, ins)
if err != nil {
return err
}
}

for _, ins := range p.dmWorkers {
err := fn(spec.ComponentDMWorker, ins)
if err != nil {
return err
}
}

return nil
}

Expand Down Expand Up @@ -783,6 +799,17 @@ func (p *Playground) addInstance(componentID string, pdRole instance.PDRole, tif
inst := instance.NewDrainer(cfg.BinPath, dir, host, cfg.ConfigPath, options.PortOffset, id, p.pds)
ins = inst
p.drainers = append(p.drainers, inst)
case spec.ComponentDMMaster:
inst := instance.NewDMMaster(cfg.BinPath, dir, host, cfg.ConfigPath, options.PortOffset, id, cfg.Port)
ins = inst
p.dmMasters = append(p.dmMasters, inst)
for _, master := range p.dmMasters {
master.SetInitEndpoints(p.dmMasters)
}
case spec.ComponentDMWorker:
inst := instance.NewDMWorker(cfg.BinPath, dir, host, cfg.ConfigPath, options.PortOffset, id, cfg.Port, p.dmMasters)
ins = inst
p.dmWorkers = append(p.dmWorkers, inst)
default:
return nil, errors.Errorf("unknown component: %s", componentID)
}
Expand Down Expand Up @@ -928,6 +955,8 @@ func (p *Playground) bootCluster(ctx context.Context, env *environment.Environme
&options.Pump,
&options.Drainer,
&options.TiKVCDC,
&options.DMMaster,
&options.DMWorker,
} {
path, err := getAbsolutePath(cfg.ConfigPath)
if err != nil {
Expand Down Expand Up @@ -969,6 +998,8 @@ func (p *Playground) bootCluster(ctx context.Context, env *environment.Environme
{spec.ComponentCDC, "", "", options.TiCDC},
{spec.ComponentTiKVCDC, "", "", options.TiKVCDC},
{spec.ComponentDrainer, "", "", options.Drainer},
{spec.ComponentDMMaster, "", "", options.DMMaster},
{spec.ComponentDMWorker, "", "", options.DMWorker},
}

if options.Mode == "tidb" {
Expand Down Expand Up @@ -1254,6 +1285,19 @@ func (p *Playground) terminate(sig syscall.Signal) {
if p.grafana != nil && p.grafana.cmd != nil && p.grafana.cmd.Process != nil {
go kill("grafana", p.grafana.cmd.Process.Pid, p.grafana.wait)
}

for _, inst := range p.dmMasters {
if inst.Process != nil && inst.Process.Cmd() != nil && inst.Process.Cmd().Process != nil {
kill(inst.Component(), inst.Pid(), inst.Wait)
}
}

for _, inst := range p.dmWorkers {
if inst.Process != nil && inst.Process.Cmd() != nil && inst.Process.Cmd().Process != nil {
kill(inst.Component(), inst.Pid(), inst.Wait)
}
}

for _, inst := range p.tiflashs {
if inst.Process != nil && inst.Process.Cmd() != nil && inst.Process.Cmd().Process != nil {
kill(inst.Component(), inst.Pid(), inst.Wait)
Expand Down

0 comments on commit ecd4397

Please sign in to comment.