Skip to content

Commit

Permalink
Merge 35e26e8 into b8fbc84
Browse files Browse the repository at this point in the history
  • Loading branch information
rleungx authored Oct 31, 2023
2 parents b8fbc84 + 35e26e8 commit cb876b0
Show file tree
Hide file tree
Showing 4 changed files with 97 additions and 26 deletions.
2 changes: 1 addition & 1 deletion components/playground/instance/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ func logIfErr(err error) {
func pdEndpoints(pds []*PDInstance, isHTTP bool) []string {
var endpoints []string
for _, pd := range pds {
if pd.Role == PDRoleTSO || pd.Role == PDRoleResourceManager {
if pd.Role == PDRoleTSO || pd.Role == PDRoleScheduling || pd.Role == PDRoleResourceManager {
continue
}
if isHTTP {
Expand Down
23 changes: 19 additions & 4 deletions components/playground/instance/pd.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ const (
PDRoleAPI PDRole = "api"
// PDRoleTSO is the role of PD TSO
PDRoleTSO PDRole = "tso"
// PDRoleScheduling is the role of PD scheduling
PDRoleScheduling PDRole = "scheduling"
// PDRoleResourceManager is the role of PD resource manager
PDRoleResourceManager PDRole = "resource manager"
)
Expand Down Expand Up @@ -128,8 +130,21 @@ func (inst *PDInstance) Start(ctx context.Context, version utils.Version) error
args = []string{
"services",
"tso",
fmt.Sprintf("--listen-addr=http://%s", utils.JoinHostPort(inst.Host, inst.Port)),
fmt.Sprintf("--advertise-listen-addr=http://%s", utils.JoinHostPort(AdvertiseHost(inst.Host), inst.Port)),
fmt.Sprintf("--listen-addr=http://%s", utils.JoinHostPort(inst.Host, inst.StatusPort)),
fmt.Sprintf("--advertise-listen-addr=http://%s", utils.JoinHostPort(AdvertiseHost(inst.Host), inst.StatusPort)),
fmt.Sprintf("--backend-endpoints=%s", strings.Join(endpoints, ",")),
fmt.Sprintf("--log-file=%s", inst.LogFile()),
}
if inst.ConfigPath != "" {
args = append(args, fmt.Sprintf("--config=%s", inst.ConfigPath))
}
case PDRoleScheduling:
endpoints := pdEndpoints(inst.pds, true)
args = []string{
"services",
"scheduling",
fmt.Sprintf("--listen-addr=http://%s", utils.JoinHostPort(inst.Host, inst.StatusPort)),
fmt.Sprintf("--advertise-listen-addr=http://%s", utils.JoinHostPort(AdvertiseHost(inst.Host), inst.StatusPort)),
fmt.Sprintf("--backend-endpoints=%s", strings.Join(endpoints, ",")),
fmt.Sprintf("--log-file=%s", inst.LogFile()),
}
Expand All @@ -141,8 +156,8 @@ func (inst *PDInstance) Start(ctx context.Context, version utils.Version) error
args = []string{
"services",
"resource-manager",
fmt.Sprintf("--listen-addr=http://%s", utils.JoinHostPort(inst.Host, inst.Port)),
fmt.Sprintf("--advertise-listen-addr=http://%s", utils.JoinHostPort(AdvertiseHost(inst.Host), inst.Port)),
fmt.Sprintf("--listen-addr=http://%s", utils.JoinHostPort(inst.Host, inst.StatusPort)),
fmt.Sprintf("--advertise-listen-addr=http://%s", utils.JoinHostPort(AdvertiseHost(inst.Host), inst.StatusPort)),
fmt.Sprintf("--backend-endpoints=%s", strings.Join(endpoints, ",")),
fmt.Sprintf("--log-file=%s", inst.LogFile()),
}
Expand Down
15 changes: 11 additions & 4 deletions components/playground/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,10 +57,11 @@ type BootOptions struct {
Mode string `yaml:"mode"`
PDMode string `yaml:"pd_mode"`
Version string `yaml:"version"`
PD instance.Config `yaml:"pd"` // ignored when pd_mode == ms
PDAPI instance.Config `yaml:"pd_api"` // Only available when pd_mode == ms
PDTSO instance.Config `yaml:"pd_tso"` // Only available when pd_mode == ms
PDRM instance.Config `yaml:"pd_rm"` // Only available when pd_mode == ms
PD instance.Config `yaml:"pd"` // ignored when pd_mode == ms
PDAPI instance.Config `yaml:"pd_api"` // Only available when pd_mode == ms
PDTSO instance.Config `yaml:"pd_tso"` // Only available when pd_mode == ms
PDScheduling instance.Config `yaml:"pd_scheduling"` // Only available when pd_mode == ms
PDRM instance.Config `yaml:"pd_rm"` // Only available when pd_mode == ms
TiProxy instance.Config `yaml:"tiproxy"`
TiDB instance.Config `yaml:"tidb"`
TiKV instance.Config `yaml:"tikv"`
Expand Down Expand Up @@ -294,6 +295,7 @@ If you'd like to use a TiDB version other than %s, cancel and retry with the fol

rootCmd.Flags().IntVar(&options.PDAPI.Num, "pd.api", 0, "PD API instance number")
rootCmd.Flags().IntVar(&options.PDTSO.Num, "pd.tso", 0, "PD TSO instance number")
rootCmd.Flags().IntVar(&options.PDScheduling.Num, "pd.scheduling", 0, "PD scheduling instance number")
rootCmd.Flags().IntVar(&options.PDRM.Num, "pd.rm", 0, "PD resource manager instance number")

rootCmd.Flags().IntVar(&options.TiDB.UpTimeout, "db.timeout", 60, "TiDB max wait time in seconds for starting, 0 means no limit")
Expand Down Expand Up @@ -326,6 +328,7 @@ If you'd like to use a TiDB version other than %s, cancel and retry with the fol

rootCmd.Flags().StringVar(&options.PDAPI.ConfigPath, "pd.api.config", "", "PD API instance configuration file")
rootCmd.Flags().StringVar(&options.PDTSO.ConfigPath, "pd.tso.config", "", "PD TSO instance configuration file")
rootCmd.Flags().StringVar(&options.PDScheduling.ConfigPath, "pd.scheduling.config", "", "PD scheduling instance configuration file")
rootCmd.Flags().StringVar(&options.PDRM.ConfigPath, "pd.rm.config", "", "PD resource manager instance configuration file")

rootCmd.Flags().StringVar(&options.TiDB.BinPath, "db.binpath", "", "TiDB instance binary path")
Expand All @@ -342,6 +345,7 @@ If you'd like to use a TiDB version other than %s, cancel and retry with the fol

rootCmd.Flags().StringVar(&options.PDAPI.BinPath, "pd.api.binpath", "", "PD API instance binary path")
rootCmd.Flags().StringVar(&options.PDTSO.BinPath, "pd.tso.binpath", "", "PD TSO instance binary path")
rootCmd.Flags().StringVar(&options.PDScheduling.BinPath, "pd.scheduling.binpath", "", "PD scheduling instance binary path")
rootCmd.Flags().StringVar(&options.PDRM.BinPath, "pd.rm.binpath", "", "PD resource manager instance binary path")

rootCmd.Flags().StringVar(&options.TiKVCDC.Version, "kvcdc.version", "", "TiKV-CDC instance version")
Expand Down Expand Up @@ -409,6 +413,9 @@ func populateDefaultOpt(flagSet *pflag.FlagSet) error {
defaultInt(&options.PDTSO.Num, "pd.tso", 1)
defaultStr(&options.PDTSO.BinPath, "pd.tso.binpath", options.PDTSO.BinPath)
defaultStr(&options.PDTSO.ConfigPath, "pd.tso.config", options.PDTSO.ConfigPath)
defaultInt(&options.PDScheduling.Num, "pd.scheduling", 1)
defaultStr(&options.PDScheduling.BinPath, "pd.scheduling.binpath", options.PDScheduling.BinPath)
defaultStr(&options.PDScheduling.ConfigPath, "pd.scheduling.config", options.PDScheduling.ConfigPath)
defaultInt(&options.PDRM.Num, "pd.rm", 1)
defaultStr(&options.PDRM.BinPath, "pd.rm.binpath", options.PDRM.BinPath)
defaultStr(&options.PDRM.ConfigPath, "pd.rm.config", options.PDRM.ConfigPath)
Expand Down
83 changes: 66 additions & 17 deletions components/playground/playground.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,9 @@ type Playground struct {
port int

pds []*instance.PDInstance
tsos []*instance.PDInstance
schedulings []*instance.PDInstance
rms []*instance.PDInstance
tikvs []*instance.TiKVInstance
tidbs []*instance.TiDBInstance
tiflashs []*instance.TiFlashInstance
Expand Down Expand Up @@ -276,6 +279,7 @@ func (p *Playground) handleScaleIn(w io.Writer, pid int) error {

switch cid {
case spec.ComponentPD:
// microservice not support scale in temporarily
for i := 0; i < len(p.pds); i++ {
if p.pds[i].Pid() == pid {
inst := p.pds[i]
Expand Down Expand Up @@ -593,6 +597,24 @@ func (p *Playground) WalkInstances(fn func(componentID string, ins instance.Inst
return err
}
}
for _, ins := range p.tsos {
err := fn(spec.ComponentPD, ins)
if err != nil {
return err
}
}
for _, ins := range p.schedulings {
err := fn(spec.ComponentPD, ins)
if err != nil {
return err
}
}
for _, ins := range p.rms {
err := fn(spec.ComponentPD, ins)
if err != nil {
return err
}
}
for _, ins := range p.tikvs {
err := fn(spec.ComponentTiKV, ins)
if err != nil {
Expand Down Expand Up @@ -698,8 +720,12 @@ func (p *Playground) addInstance(componentID string, pdRole instance.PDRole, tif
pd.InitCluster(p.pds)
}
}
} else {
p.pds = append(p.pds, inst)
} else if pdRole == instance.PDRoleTSO {
p.tsos = append(p.tsos, inst)
} else if pdRole == instance.PDRoleScheduling {
p.schedulings = append(p.schedulings, inst)
} else if pdRole == instance.PDRoleResourceManager {
p.rms = append(p.rms, inst)
}
case spec.ComponentTiDB:
inst := instance.NewTiDBInstance(cfg.BinPath, dir, host, cfg.ConfigPath, id, cfg.Port, p.pds, p.enableBinlog(), p.bootOptions.Mode == "tidb-disagg")
Expand Down Expand Up @@ -866,6 +892,7 @@ func (p *Playground) bootCluster(ctx context.Context, env *environment.Environme
&options.PD,
&options.PDAPI,
&options.PDTSO,
&options.PDScheduling,
&options.PDRM,
&options.TiProxy,
&options.TiDB,
Expand Down Expand Up @@ -975,6 +1002,7 @@ func (p *Playground) bootCluster(ctx context.Context, env *environment.Environme
instances = append([]InstancePair{
{spec.ComponentPD, instance.PDRoleAPI, instance.TiFlashRoleNormal, options.PDAPI},
{spec.ComponentPD, instance.PDRoleTSO, instance.TiFlashRoleNormal, options.PDTSO},
{spec.ComponentPD, instance.PDRoleScheduling, instance.TiFlashRoleNormal, options.PDScheduling},
{spec.ComponentPD, instance.PDRoleResourceManager, instance.TiFlashRoleNormal, options.PDRM}},
instances...,
)
Expand Down Expand Up @@ -1080,25 +1108,31 @@ func (p *Playground) bootCluster(ctx context.Context, env *environment.Environme
if p.bootOptions.Mode == "tikv-slim" {
if p.bootOptions.PDMode == "ms" {
var (
tsoAddr []string
apiAddr []string
rmAddr []string
tsoAddr []string
apiAddr []string
rmAddr []string
schedulingAddr []string
)
for _, pd := range p.pds {
switch pd.Role {
case instance.PDRoleTSO:
tsoAddr = append(tsoAddr, pd.Addr())
case instance.PDRoleAPI:
apiAddr = append(apiAddr, pd.Addr())
case instance.PDRoleResourceManager:
rmAddr = append(rmAddr, pd.Addr())
}
for _, api := range p.pds {
apiAddr = append(apiAddr, api.Addr())
}
fmt.Printf("PD TSO Endpoints: ")
colorCmd.Printf("%s\n", strings.Join(tsoAddr, ","))
for _, tso := range p.tsos {
tsoAddr = append(tsoAddr, tso.Addr())
}
for _, scheduling := range p.schedulings {
schedulingAddr = append(schedulingAddr, scheduling.Addr())
}
for _, rm := range p.rms {
rmAddr = append(rmAddr, rm.Addr())
}

fmt.Printf("PD API Endpoints: ")
colorCmd.Printf("%s\n", strings.Join(apiAddr, ","))
fmt.Printf("PD Resource Ranager Endpoints: ")
fmt.Printf("PD TSO Endpoints: ")
colorCmd.Printf("%s\n", strings.Join(tsoAddr, ","))
fmt.Printf("PD Scheduling Endpoints: ")
colorCmd.Printf("%s\n", strings.Join(schedulingAddr, ","))
fmt.Printf("PD Resource Manager Endpoints: ")
colorCmd.Printf("%s\n", strings.Join(rmAddr, ","))
} else {
var pdAddrs []string
Expand Down Expand Up @@ -1232,6 +1266,21 @@ func (p *Playground) terminate(sig syscall.Signal) {
kill(inst.Component(), inst.Pid(), inst.Wait)
}
}
for _, inst := range p.tsos {
if inst.Process != nil && inst.Process.Cmd() != nil && inst.Process.Cmd().Process != nil {
kill(inst.Component(), inst.Pid(), inst.Wait)
}
}
for _, inst := range p.schedulings {
if inst.Process != nil && inst.Process.Cmd() != nil && inst.Process.Cmd().Process != nil {
kill(inst.Component(), inst.Pid(), inst.Wait)
}
}
for _, inst := range p.rms {
if inst.Process != nil && inst.Process.Cmd() != nil && inst.Process.Cmd().Process != nil {
kill(inst.Component(), inst.Pid(), inst.Wait)
}
}
}

func (p *Playground) renderSDFile() error {
Expand Down

0 comments on commit cb876b0

Please sign in to comment.