Skip to content

Commit

Permalink
pdms(playground, cluster): add name to start pdms (#2447)
Browse files Browse the repository at this point in the history
Signed-off-by: husharp <[email protected]>
  • Loading branch information
HuSharp authored Aug 7, 2024
1 parent ac3a39e commit 9c22e67
Show file tree
Hide file tree
Showing 13 changed files with 239 additions and 33 deletions.
16 changes: 15 additions & 1 deletion components/playground/instance/pd.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"strings"

"github.com/pingcap/errors"
"github.com/pingcap/tiup/pkg/tidbver"
"github.com/pingcap/tiup/pkg/utils"
)

Expand Down Expand Up @@ -83,7 +84,14 @@ func (inst *PDInstance) InitCluster(pds []*PDInstance) *PDInstance {

// Name return the name of pd.
func (inst *PDInstance) Name() string {
return fmt.Sprintf("pd-%d", inst.ID)
switch inst.Role {
case PDRoleTSO:
return fmt.Sprintf("tso-%d", inst.ID)
case PDRoleScheduling:
return fmt.Sprintf("scheduling-%d", inst.ID)
default:
return fmt.Sprintf("pd-%d", inst.ID)
}
}

// Start calls set inst.cmd and Start
Expand Down Expand Up @@ -142,6 +150,9 @@ func (inst *PDInstance) Start(ctx context.Context) error {
fmt.Sprintf("--log-file=%s", inst.LogFile()),
fmt.Sprintf("--config=%s", configPath),
}
if tidbver.PDSupportMicroServicesWithName(inst.Version.String()) {
args = append(args, fmt.Sprintf("--name=%s", uid))
}
case PDRoleScheduling:
endpoints := pdEndpoints(inst.pds, true)
args = []string{
Expand All @@ -153,6 +164,9 @@ func (inst *PDInstance) Start(ctx context.Context) error {
fmt.Sprintf("--log-file=%s", inst.LogFile()),
fmt.Sprintf("--config=%s", configPath),
}
if tidbver.PDSupportMicroServicesWithName(inst.Version.String()) {
args = append(args, fmt.Sprintf("--name=%s", uid))
}
}

inst.Process = &process{cmd: PrepareCommand(ctx, inst.BinPath, args, nil, inst.Dir)}
Expand Down
3 changes: 3 additions & 0 deletions embed/templates/scripts/run_scheduling.sh.tpl
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@ cd "${DEPLOY_DIR}" || exit 1
exec numactl --cpunodebind={{.NumaNode}} --membind={{.NumaNode}} env GODEBUG=madvdontneed=1 bin/pd-server services scheduling\
{{- else}}
exec env GODEBUG=madvdontneed=1 bin/pd-server services scheduling \
{{- end}}
{{- if .Name}}
--name="{{.Name}}" \
{{- end}}
--backend-endpoints="{{.BackendEndpoints}}" \
--listen-addr="{{.ListenURL}}" \
Expand Down
3 changes: 3 additions & 0 deletions embed/templates/scripts/run_tso.sh.tpl
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@ cd "${DEPLOY_DIR}" || exit 1
exec numactl --cpunodebind={{.NumaNode}} --membind={{.NumaNode}} env GODEBUG=madvdontneed=1 bin/pd-server services tso\
{{- else}}
exec env GODEBUG=madvdontneed=1 bin/pd-server services tso \
{{- end}}
{{- if .Name}}
--name="{{.Name}}" \
{{- end}}
--backend-endpoints="{{.BackendEndpoints}}" \
--listen-addr="{{.ListenURL}}" \
Expand Down
15 changes: 15 additions & 0 deletions pkg/cluster/manager/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,21 @@ pd_servers:
assert.Nil(err)
err = validateNewTopo(&topo)
assert.NotNil(err)

topo = spec.Specification{}
err = yaml.Unmarshal([]byte(`
global:
user: "test4"
deploy_dir: "test-deploy"
data_dir: "test-data"
tso_servers:
- host: 172.16.5.53
scheduling_servers:
- host: 172.16.5.54
`), &topo)
assert.Nil(err)
err = validateNewTopo(&topo)
assert.Nil(err)
}

func TestDeduplicateCheckResult(t *testing.T) {
Expand Down
28 changes: 28 additions & 0 deletions pkg/cluster/manager/transfer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,4 +53,32 @@ func TestRenderSpec(t *testing.T) {
dir, err = renderSpec("{{.DataDir}}", s, "test-pd")
assert.Nil(t, err)
assert.NotEmpty(t, dir)

s = &spec.TSOInstance{BaseInstance: spec.BaseInstance{
InstanceSpec: &spec.TSOSpec{
Host: "172.16.5.140",
SSHPort: 22,
Name: "tso-1",
DeployDir: "/home/test/deploy/tso-3379",
DataDir: "/home/test/deploy/tso-3379/data",
},
}}
// s.BaseInstance.InstanceSpec
dir, err = renderSpec("{{.DataDir}}", s, "test-tso")
assert.Nil(t, err)
assert.NotEmpty(t, dir)

s = &spec.SchedulingInstance{BaseInstance: spec.BaseInstance{
InstanceSpec: &spec.SchedulingSpec{
Host: "172.16.5.140",
SSHPort: 22,
Name: "scheduling-1",
DeployDir: "/home/test/deploy/scheduling-3379",
DataDir: "/home/test/deploy/scheduling-3379/data",
},
}}
// s.BaseInstance.InstanceSpec
dir, err = renderSpec("{{.DataDir}}", s, "test-scheduling")
assert.Nil(t, err)
assert.NotEmpty(t, dir)
}
38 changes: 22 additions & 16 deletions pkg/cluster/spec/scheduling.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,26 +25,29 @@ import (
"github.com/pingcap/tiup/pkg/cluster/ctxt"
"github.com/pingcap/tiup/pkg/cluster/template/scripts"
"github.com/pingcap/tiup/pkg/meta"
"github.com/pingcap/tiup/pkg/tidbver"
"github.com/pingcap/tiup/pkg/utils"
)

// SchedulingSpec represents the scheduling topology specification in topology.yaml
type SchedulingSpec struct {
Host string `yaml:"host"`
ManageHost string `yaml:"manage_host,omitempty" validate:"manage_host:editable"`
ListenHost string `yaml:"listen_host,omitempty"`
AdvertiseListenAddr string `yaml:"advertise_listen_addr,omitempty"`
SSHPort int `yaml:"ssh_port,omitempty" validate:"ssh_port:editable"`
IgnoreExporter bool `yaml:"ignore_exporter,omitempty"`
Port int `yaml:"port" default:"3379"`
DeployDir string `yaml:"deploy_dir,omitempty"`
DataDir string `yaml:"data_dir,omitempty"`
LogDir string `yaml:"log_dir,omitempty"`
Source string `yaml:"source,omitempty" validate:"source:editable"`
NumaNode string `yaml:"numa_node,omitempty" validate:"numa_node:editable"`
Config map[string]any `yaml:"config,omitempty" validate:"config:ignore"`
Arch string `yaml:"arch,omitempty"`
OS string `yaml:"os,omitempty"`
Host string `yaml:"host"`
ManageHost string `yaml:"manage_host,omitempty" validate:"manage_host:editable"`
ListenHost string `yaml:"listen_host,omitempty"`
AdvertiseListenAddr string `yaml:"advertise_listen_addr,omitempty"`
SSHPort int `yaml:"ssh_port,omitempty" validate:"ssh_port:editable"`
IgnoreExporter bool `yaml:"ignore_exporter,omitempty"`
// Use Name to get the name with a default value if it's empty.
Name string `yaml:"name,omitempty"`
Port int `yaml:"port" default:"3379"`
DeployDir string `yaml:"deploy_dir,omitempty"`
DataDir string `yaml:"data_dir,omitempty"`
LogDir string `yaml:"log_dir,omitempty"`
Source string `yaml:"source,omitempty" validate:"source:editable"`
NumaNode string `yaml:"numa_node,omitempty" validate:"numa_node:editable"`
Config map[string]any `yaml:"config,omitempty" validate:"config:ignore"`
Arch string `yaml:"arch,omitempty"`
OS string `yaml:"os,omitempty"`
}

// Status queries current status of the instance
Expand Down Expand Up @@ -200,7 +203,6 @@ func (c *SchedulingComponent) Instances() []Instance {

// SchedulingInstance represent the scheduling instance
type SchedulingInstance struct {
Name string
BaseInstance
topo Topology
}
Expand Down Expand Up @@ -229,6 +231,7 @@ func (i *SchedulingInstance) InitConfig(
pds = append(pds, pdspec.GetAdvertiseClientURL(enableTLS))
}
cfg := &scripts.SchedulingScript{
Name: spec.Name,
ListenURL: fmt.Sprintf("%s://%s", scheme, utils.JoinHostPort(i.GetListenHost(), spec.Port)),
AdvertiseListenURL: spec.GetAdvertiseListenURL(enableTLS),
BackendEndpoints: strings.Join(pds, ","),
Expand All @@ -237,6 +240,9 @@ func (i *SchedulingInstance) InitConfig(
LogDir: paths.Log,
NumaNode: spec.NumaNode,
}
if !tidbver.PDSupportMicroServicesWithName(version) {
cfg.Name = ""
}

fp := filepath.Join(paths.Cache, fmt.Sprintf("run_scheduling_%s_%d.sh", i.GetHost(), i.GetPort()))
if err := cfg.ConfigToFile(fp); err != nil {
Expand Down
10 changes: 10 additions & 0 deletions pkg/cluster/spec/spec.go
Original file line number Diff line number Diff line change
Expand Up @@ -679,10 +679,20 @@ func setCustomDefaults(globalOptions *GlobalOptions, field reflect.Value) error
}
field.Field(j).Set(reflect.ValueOf(globalOptions.SSHPort))
case "Name":
// Only PD related components have `Name` field,
if field.Field(j).String() != "" {
continue
}
host := reflect.Indirect(field).FieldByName("Host").String()
// `TSO` and `Scheduling` components use `Port` filed
if reflect.Indirect(field).FieldByName("Port").IsValid() {
port := reflect.Indirect(field).FieldByName("Port").Int()
// field.String() is <spec.TSOSpec Value>
role := strings.Split(strings.Split(field.Type().String(), ".")[1], "Spec")[0]
component := strings.ToLower(role)
field.Field(j).Set(reflect.ValueOf(fmt.Sprintf("%s-%s-%d", component, host, port)))
continue
}
clientPort := reflect.Indirect(field).FieldByName("ClientPort").Int()
field.Field(j).Set(reflect.ValueOf(fmt.Sprintf("pd-%s-%d", host, clientPort)))
case "DataDir":
Expand Down
38 changes: 22 additions & 16 deletions pkg/cluster/spec/tso.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,26 +25,29 @@ import (
"github.com/pingcap/tiup/pkg/cluster/ctxt"
"github.com/pingcap/tiup/pkg/cluster/template/scripts"
"github.com/pingcap/tiup/pkg/meta"
"github.com/pingcap/tiup/pkg/tidbver"
"github.com/pingcap/tiup/pkg/utils"
)

// TSOSpec represents the TSO topology specification in topology.yaml
type TSOSpec struct {
Host string `yaml:"host"`
ManageHost string `yaml:"manage_host,omitempty" validate:"manage_host:editable"`
ListenHost string `yaml:"listen_host,omitempty"`
AdvertiseListenAddr string `yaml:"advertise_listen_addr,omitempty"`
SSHPort int `yaml:"ssh_port,omitempty" validate:"ssh_port:editable"`
IgnoreExporter bool `yaml:"ignore_exporter,omitempty"`
Port int `yaml:"port" default:"3379"`
DeployDir string `yaml:"deploy_dir,omitempty"`
DataDir string `yaml:"data_dir,omitempty"`
LogDir string `yaml:"log_dir,omitempty"`
Source string `yaml:"source,omitempty" validate:"source:editable"`
NumaNode string `yaml:"numa_node,omitempty" validate:"numa_node:editable"`
Config map[string]any `yaml:"config,omitempty" validate:"config:ignore"`
Arch string `yaml:"arch,omitempty"`
OS string `yaml:"os,omitempty"`
Host string `yaml:"host"`
ManageHost string `yaml:"manage_host,omitempty" validate:"manage_host:editable"`
ListenHost string `yaml:"listen_host,omitempty"`
AdvertiseListenAddr string `yaml:"advertise_listen_addr,omitempty"`
SSHPort int `yaml:"ssh_port,omitempty" validate:"ssh_port:editable"`
IgnoreExporter bool `yaml:"ignore_exporter,omitempty"`
// Use Name to get the name with a default value if it's empty.
Name string `yaml:"name,omitempty"`
Port int `yaml:"port" default:"3379"`
DeployDir string `yaml:"deploy_dir,omitempty"`
DataDir string `yaml:"data_dir,omitempty"`
LogDir string `yaml:"log_dir,omitempty"`
Source string `yaml:"source,omitempty" validate:"source:editable"`
NumaNode string `yaml:"numa_node,omitempty" validate:"numa_node:editable"`
Config map[string]any `yaml:"config,omitempty" validate:"config:ignore"`
Arch string `yaml:"arch,omitempty"`
OS string `yaml:"os,omitempty"`
}

// Status queries current status of the instance
Expand Down Expand Up @@ -200,7 +203,6 @@ func (c *TSOComponent) Instances() []Instance {

// TSOInstance represent the TSO instance
type TSOInstance struct {
Name string
BaseInstance
topo Topology
}
Expand Down Expand Up @@ -229,6 +231,7 @@ func (i *TSOInstance) InitConfig(
pds = append(pds, pdspec.GetAdvertiseClientURL(enableTLS))
}
cfg := &scripts.TSOScript{
Name: spec.Name,
ListenURL: fmt.Sprintf("%s://%s", scheme, utils.JoinHostPort(i.GetListenHost(), spec.Port)),
AdvertiseListenURL: spec.GetAdvertiseListenURL(enableTLS),
BackendEndpoints: strings.Join(pds, ","),
Expand All @@ -237,6 +240,9 @@ func (i *TSOInstance) InitConfig(
LogDir: paths.Log,
NumaNode: spec.NumaNode,
}
if !tidbver.PDSupportMicroServicesWithName(version) {
cfg.Name = ""
}

fp := filepath.Join(paths.Cache, fmt.Sprintf("run_tso_%s_%d.sh", i.GetHost(), i.GetPort()))
if err := cfg.ConfigToFile(fp); err != nil {
Expand Down
34 changes: 34 additions & 0 deletions pkg/cluster/spec/validate.go
Original file line number Diff line number Diff line change
Expand Up @@ -984,6 +984,38 @@ func (s *Specification) validatePDNames() error {
return nil
}

func (s *Specification) validateTSONames() error {
// check tso server name
tsoNames := set.NewStringSet()
for _, tso := range s.TSOServers {
if tso.Name == "" {
continue
}

if tsoNames.Exist(tso.Name) {
return errors.Errorf("component tso_servers.name is not supported duplicated, the name %s is duplicated", tso.Name)
}
tsoNames.Insert(tso.Name)
}
return nil
}

func (s *Specification) validateSchedulingNames() error {
// check scheduling server name
schedulingNames := set.NewStringSet()
for _, scheduling := range s.SchedulingServers {
if scheduling.Name == "" {
continue
}

if schedulingNames.Exist(scheduling.Name) {
return errors.Errorf("component scheduling_servers.name is not supported duplicated, the name %s is duplicated", scheduling.Name)
}
schedulingNames.Insert(scheduling.Name)
}
return nil
}

func (s *Specification) validateTiFlashConfigs() error {
c := FindComponent(s, ComponentTiFlash)
for _, ins := range c.Instances() {
Expand Down Expand Up @@ -1063,6 +1095,8 @@ func (s *Specification) Validate() error {
s.dirConflictsDetect,
s.validateUserGroup,
s.validatePDNames,
s.validateTSONames,
s.validateSchedulingNames,
s.validateTiSparkSpec,
s.validateTiFlashConfigs,
s.validateMonitorAgent,
Expand Down
Loading

0 comments on commit 9c22e67

Please sign in to comment.