Skip to content

Commit

Permalink
support api
Browse files Browse the repository at this point in the history
Signed-off-by: husharp <[email protected]>
  • Loading branch information
HuSharp committed Jul 12, 2024
1 parent 6d0814e commit 2ebaaeb
Show file tree
Hide file tree
Showing 19 changed files with 533 additions and 33 deletions.
16 changes: 15 additions & 1 deletion components/playground/instance/pd.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (

"github.com/pingcap/errors"
tiupexec "github.com/pingcap/tiup/pkg/exec"
"github.com/pingcap/tiup/pkg/tidbver"
"github.com/pingcap/tiup/pkg/utils"
)

Expand Down Expand Up @@ -84,7 +85,14 @@ func (inst *PDInstance) InitCluster(pds []*PDInstance) *PDInstance {

// Name return the name of pd.
func (inst *PDInstance) Name() string {
return fmt.Sprintf("pd-%d", inst.ID)
switch inst.Role {
case PDRoleTSO:
return fmt.Sprintf("tso-%d", inst.ID)
case PDRoleScheduling:
return fmt.Sprintf("scheduling-%d", inst.ID)
default:
return fmt.Sprintf("pd-%d", inst.ID)
}
}

// Start calls set inst.cmd and Start
Expand Down Expand Up @@ -143,6 +151,9 @@ func (inst *PDInstance) Start(ctx context.Context, version utils.Version) error
fmt.Sprintf("--log-file=%s", inst.LogFile()),
fmt.Sprintf("--config=%s", configPath),
}
if tidbver.PDSupportMicroServicesWithName(version.String()) {
args = append(args, fmt.Sprintf("--name=%s", uid))
}
case PDRoleScheduling:
endpoints := pdEndpoints(inst.pds, true)
args = []string{
Expand All @@ -154,6 +165,9 @@ func (inst *PDInstance) Start(ctx context.Context, version utils.Version) error
fmt.Sprintf("--log-file=%s", inst.LogFile()),
fmt.Sprintf("--config=%s", configPath),
}
if tidbver.PDSupportMicroServicesWithName(version.String()) {
args = append(args, fmt.Sprintf("--name=%s", uid))
}
}

var err error
Expand Down
3 changes: 3 additions & 0 deletions embed/templates/scripts/run_scheduling.sh.tpl
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@ cd "${DEPLOY_DIR}" || exit 1
exec numactl --cpunodebind={{.NumaNode}} --membind={{.NumaNode}} env GODEBUG=madvdontneed=1 bin/pd-server services scheduling\
{{- else}}
exec env GODEBUG=madvdontneed=1 bin/pd-server services scheduling \
{{- end}}
{{- if .Name}}
--name="{{.Name}}" \
{{- end}}
--backend-endpoints="{{.BackendEndpoints}}" \
--listen-addr="{{.ListenURL}}" \
Expand Down
3 changes: 3 additions & 0 deletions embed/templates/scripts/run_tso.sh.tpl
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@ cd "${DEPLOY_DIR}" || exit 1
exec numactl --cpunodebind={{.NumaNode}} --membind={{.NumaNode}} env GODEBUG=madvdontneed=1 bin/pd-server services tso\
{{- else}}
exec env GODEBUG=madvdontneed=1 bin/pd-server services tso \
{{- end}}
{{- if .Name}}
--name="{{.Name}}" \
{{- end}}
--backend-endpoints="{{.BackendEndpoints}}" \
--listen-addr="{{.ListenURL}}" \
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ require (
github.com/rivo/uniseg v0.4.4 // indirect
github.com/rs/xid v1.4.0 // indirect
github.com/sirupsen/logrus v1.9.0 // indirect
github.com/stretchr/objx v0.5.2 // indirect
github.com/tklauser/go-sysconf v0.3.11 // indirect
github.com/tklauser/numcpus v0.6.0 // indirect
github.com/vishvananda/netlink v0.0.0-20210530105856-14e832ae1e8f // indirect
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,8 @@ github.com/spf13/cobra v1.6.1/go.mod h1:IOw/AERYS7UzyrGinqmz6HLUo219MORXGxhbaJUq
github.com/spf13/pflag v1.0.5 h1:iy+VFUOCP1a+8yFto/drg2CJ5u0yRoB7fZw3DKv/JXA=
github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.5.2 h1:xuMeJ0Sdp5ZMRXx/aWO6RZxdr3beISkG5/G/aIRr3pY=
github.com/stretchr/objx v0.5.2/go.mod h1:FRsXN1f5AsAjCGJKqEizvkpNtU+EGNCLh3NxZ/8L+MA=
github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
Expand Down
15 changes: 15 additions & 0 deletions pkg/cluster/manager/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,21 @@ pd_servers:
assert.Nil(err)
err = validateNewTopo(&topo)
assert.NotNil(err)

topo = spec.Specification{}
err = yaml.Unmarshal([]byte(`
global:
user: "test4"
deploy_dir: "test-deploy"
data_dir: "test-data"
tso_servers:
- host: 172.16.5.53
scheduling_servers:
- host: 172.16.5.54
`), &topo)
assert.Nil(err)
err = validateNewTopo(&topo)
assert.Nil(err)
}

func TestDeduplicateCheckResult(t *testing.T) {
Expand Down
28 changes: 28 additions & 0 deletions pkg/cluster/manager/transfer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,4 +53,32 @@ func TestRenderSpec(t *testing.T) {
dir, err = renderSpec("{{.DataDir}}", s, "test-pd")
assert.Nil(t, err)
assert.NotEmpty(t, dir)

s = &spec.TSOInstance{BaseInstance: spec.BaseInstance{
InstanceSpec: &spec.TSOSpec{
Host: "172.16.5.140",
SSHPort: 22,
Name: "tso-1",
DeployDir: "/home/test/deploy/tso-3379",
DataDir: "/home/test/deploy/tso-3379/data",
},
}}
// s.BaseInstance.InstanceSpec
dir, err = renderSpec("{{.DataDir}}", s, "test-tso")
assert.Nil(t, err)
assert.NotEmpty(t, dir)

s = &spec.SchedulingInstance{BaseInstance: spec.BaseInstance{
InstanceSpec: &spec.SchedulingSpec{
Host: "172.16.5.140",
SSHPort: 22,
Name: "scheduling-1",
DeployDir: "/home/test/deploy/scheduling-3379",
DataDir: "/home/test/deploy/scheduling-3379/data",
},
}}
// s.BaseInstance.InstanceSpec
dir, err = renderSpec("{{.DataDir}}", s, "test-scheduling")
assert.Nil(t, err)
assert.NotEmpty(t, dir)
}
70 changes: 70 additions & 0 deletions pkg/cluster/operation/upgrade.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,15 @@ import (
"crypto/tls"
"fmt"
"os"
"path/filepath"
"reflect"
"strings"
"time"

perrs "github.com/pingcap/errors"
"github.com/pingcap/tiup/pkg/checkpoint"
"github.com/pingcap/tiup/pkg/cluster/api"
"github.com/pingcap/tiup/pkg/cluster/ctxt"
"github.com/pingcap/tiup/pkg/cluster/spec"
logprinter "github.com/pingcap/tiup/pkg/logger/printer"
"github.com/pingcap/tiup/pkg/set"
Expand Down Expand Up @@ -144,6 +146,52 @@ func Upgrade(
logger.Debugf("Deferred upgrading of PD leader %s", instance.ID())
continue
}
case spec.ComponentTSO:
// if component version is not specified, use the cluster version or latest("")
ins := instance.(*spec.TSOInstance)
version, err := getVersion(ctx, ins.DeployDir(), ins.GetManageHost())
if err != nil {
return err
}
logger.Infof(fmt.Sprintf("tso instance change startScript %s, %s", version, ins.Name))
if tidbver.PDSupportMicroServicesWithName(version) && ins.Name != "" {
tsos := topo.(*spec.Specification).TSOServers
var name string
for _, s := range tsos {
if s.Host == ins.Host && s.Port == ins.Port {
name = s.Name
break
}
}
logger.Infof(fmt.Sprintf("tso instance change startScript %s", name))
ins := instance.(*spec.TSOInstance)
err := spec.ModifyPDStartScriptPath(ctx, ins.Role(), ins.Host, ins.GetMainPort(), name)
if err != nil {
return err
}
}
case spec.ComponentScheduling:
ins := instance.(*spec.SchedulingInstance)
version, err := getVersion(ctx, ins.DeployDir(), ins.GetManageHost())
if err != nil {
return err
}
logger.Infof(fmt.Sprintf("scheduling instance change startScript %s, %s", version, ins.Name))
if tidbver.PDSupportMicroServicesWithName(version) && ins.Name != "" {
schedulings := topo.(*spec.Specification).SchedulingServers
var name string
for _, s := range schedulings {
if s.Host == ins.Host && s.Port == ins.Port {
name = s.Name
break
}
}
logger.Infof(fmt.Sprintf("scheduling instance change startScript %s", name))
err := spec.ModifyPDStartScriptPath(ctx, ins.Role(), ins.Host, ins.GetMainPort(), name)
if err != nil {
return err
}
}
case spec.ComponentCDC:
ins := instance.(*spec.CDCInstance)
address := ins.GetAddr()
Expand Down Expand Up @@ -362,3 +410,25 @@ func decreaseScheduleLimit(pc *api.PDClient, origLeaderScheduleLimit, origRegion
}
return pc.SetReplicationConfig("region-schedule-limit", origRegionScheduleLimit)
}

func getVersion(ctx context.Context, deployDir, host string) (string, error) {
binPath := filepath.Join(deployDir, "bin/pd-server")
e, found := ctxt.GetInner(ctx).GetExecutor(host)
if !found {
return "", fmt.Errorf("no executor")
}
stdout, stderr, err := e.Execute(ctx, binPath+" --version", false)
if err != nil {
return "", perrs.Annotatef(err, "stderr: %s", string(stderr))
}

var version string
for _, line := range strings.Split(string(stdout), "\n") {
if strings.HasPrefix(line, "Release Version:") {
// Extract version number from the line
version = strings.TrimSpace(strings.TrimPrefix(line, "Release Version:"))
break
}
}
return version, nil
}
74 changes: 74 additions & 0 deletions pkg/cluster/operation/upgrade_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
// Copyright 2024 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package operator

import (
"context"
"testing"
"time"

"github.com/pingcap/tiup/pkg/cluster/ctxt"
logprinter "github.com/pingcap/tiup/pkg/logger/printer"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
)

// MockExecutor is a mock of Executor interface
type MockExecutor struct {
mock.Mock
}

func (m *MockExecutor) Execute(ctx context.Context, cmd string, sudo bool, timeout ...time.Duration) (stdout []byte, stderr []byte, err error) {
args := m.Called(ctx, cmd, sudo)
return args.Get(0).([]byte), args.Get(1).([]byte), args.Error(2)
}

func (m *MockExecutor) Transfer(ctx context.Context, src, dst string, download bool, limit int, compress bool) error {
panic("implement me")
}

func TestGetVersion(t *testing.T) {
ctx := ctxt.New(context.Background(), 0, logprinter.NewLogger(""))
deployDir := "/fake/deploy/dir"
host := "localhost"
expectedVersion := "v8.3.0-abcdef"
mockExecutor := new(MockExecutor)
mockExecutor.On("Execute", ctx, "/fake/deploy/dir/bin/pd-server --version", false).Return([]byte("Release Version: v8.3.0-abcdef\nEdition: Community\nGit Commit Hash: b235bc1152a7942d71d42222b0e078d91ccd0106\nGit Branch: heads/refs/tags/v8.3.0\nUTC Build Time: 2024-07-01 05:08:57"), []byte(""), nil)
ctxt.GetInner(ctx).SetExecutor(host, mockExecutor)
// Execute
version, err := getVersion(ctx, deployDir, host)
assert.NoError(t, err)
assert.Equal(t, expectedVersion, version)

expectedVersion = "v8.2.0-abcdef"
mockExecutor = new(MockExecutor)
mockExecutor.On("Execute", ctx, "/fake/deploy/dir/bin/pd-server --version", false).Return([]byte("Release Version: v8.2.0-abcdef\nEdition: Community\nGit Commit Hash: b235bc1152a7942d71d42222b0e078d91ccd0106\nGit Branch: heads/refs/tags/v8.3.0\nUTC Build Time: 2024-07-01 05:08:57"), []byte(""), nil)
ctxt.GetInner(ctx).SetExecutor(host, mockExecutor)
// Execute
version, err = getVersion(ctx, deployDir, host)
assert.NoError(t, err)
assert.Equal(t, expectedVersion, version)

expectedVersion = "nightly-abcdef"
mockExecutor = new(MockExecutor)
mockExecutor.On("Execute", ctx, "/fake/deploy/dir/bin/pd-server --version", false).Return([]byte("Release Version: nightly-abcdef\nEdition: Community\nGit Commit Hash: b235bc1152a7942d71d42222b0e078d91ccd0106\nGit Branch: heads/refs/tags/v8.3.0\nUTC Build Time: 2024-07-01 05:08:57"), []byte(""), nil)
ctxt.GetInner(ctx).SetExecutor(host, mockExecutor)
// Execute
version, err = getVersion(ctx, deployDir, host)
assert.NoError(t, err)
assert.Equal(t, expectedVersion, version)

// Teardown
mockExecutor.AssertExpectations(t)
}
38 changes: 22 additions & 16 deletions pkg/cluster/spec/scheduling.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,26 +25,29 @@ import (
"github.com/pingcap/tiup/pkg/cluster/ctxt"
"github.com/pingcap/tiup/pkg/cluster/template/scripts"
"github.com/pingcap/tiup/pkg/meta"
"github.com/pingcap/tiup/pkg/tidbver"
"github.com/pingcap/tiup/pkg/utils"
)

// SchedulingSpec represents the scheduling topology specification in topology.yaml
type SchedulingSpec struct {
Host string `yaml:"host"`
ManageHost string `yaml:"manage_host,omitempty" validate:"manage_host:editable"`
ListenHost string `yaml:"listen_host,omitempty"`
AdvertiseListenAddr string `yaml:"advertise_listen_addr,omitempty"`
SSHPort int `yaml:"ssh_port,omitempty" validate:"ssh_port:editable"`
IgnoreExporter bool `yaml:"ignore_exporter,omitempty"`
Port int `yaml:"port" default:"3379"`
DeployDir string `yaml:"deploy_dir,omitempty"`
DataDir string `yaml:"data_dir,omitempty"`
LogDir string `yaml:"log_dir,omitempty"`
Source string `yaml:"source,omitempty" validate:"source:editable"`
NumaNode string `yaml:"numa_node,omitempty" validate:"numa_node:editable"`
Config map[string]any `yaml:"config,omitempty" validate:"config:ignore"`
Arch string `yaml:"arch,omitempty"`
OS string `yaml:"os,omitempty"`
Host string `yaml:"host"`
ManageHost string `yaml:"manage_host,omitempty" validate:"manage_host:editable"`
ListenHost string `yaml:"listen_host,omitempty"`
AdvertiseListenAddr string `yaml:"advertise_listen_addr,omitempty"`
SSHPort int `yaml:"ssh_port,omitempty" validate:"ssh_port:editable"`
IgnoreExporter bool `yaml:"ignore_exporter,omitempty"`
// Use Name to get the name with a default value if it's empty.
Name string `yaml:"name,omitempty"`
Port int `yaml:"port" default:"3379"`
DeployDir string `yaml:"deploy_dir,omitempty"`
DataDir string `yaml:"data_dir,omitempty"`
LogDir string `yaml:"log_dir,omitempty"`
Source string `yaml:"source,omitempty" validate:"source:editable"`
NumaNode string `yaml:"numa_node,omitempty" validate:"numa_node:editable"`
Config map[string]any `yaml:"config,omitempty" validate:"config:ignore"`
Arch string `yaml:"arch,omitempty"`
OS string `yaml:"os,omitempty"`
}

// Status queries current status of the instance
Expand Down Expand Up @@ -200,7 +203,6 @@ func (c *SchedulingComponent) Instances() []Instance {

// SchedulingInstance represent the scheduling instance
type SchedulingInstance struct {
Name string
BaseInstance
topo Topology
}
Expand Down Expand Up @@ -229,6 +231,7 @@ func (i *SchedulingInstance) InitConfig(
pds = append(pds, pdspec.GetAdvertiseClientURL(enableTLS))
}
cfg := &scripts.SchedulingScript{
Name: spec.Name,
ListenURL: fmt.Sprintf("%s://%s", scheme, utils.JoinHostPort(i.GetListenHost(), spec.Port)),
AdvertiseListenURL: spec.GetAdvertiseListenURL(enableTLS),
BackendEndpoints: strings.Join(pds, ","),
Expand All @@ -237,6 +240,9 @@ func (i *SchedulingInstance) InitConfig(
LogDir: paths.Log,
NumaNode: spec.NumaNode,
}
if !tidbver.PDSupportMicroServicesWithName(version) {
cfg.Name = ""
}

fp := filepath.Join(paths.Cache, fmt.Sprintf("run_scheduling_%s_%d.sh", i.GetHost(), i.GetPort()))
if err := cfg.ConfigToFile(fp); err != nil {
Expand Down
10 changes: 10 additions & 0 deletions pkg/cluster/spec/spec.go
Original file line number Diff line number Diff line change
Expand Up @@ -679,10 +679,20 @@ func setCustomDefaults(globalOptions *GlobalOptions, field reflect.Value) error
}
field.Field(j).Set(reflect.ValueOf(globalOptions.SSHPort))
case "Name":
// Only PD related components have `Name` field,
if field.Field(j).String() != "" {
continue
}
host := reflect.Indirect(field).FieldByName("Host").String()
// `TSO` and `Scheduling` components use `Port` filed
if reflect.Indirect(field).FieldByName("Port").IsValid() {
port := reflect.Indirect(field).FieldByName("Port").Int()
// field.String() is <spec.TSOSpec Value>
role := strings.Split(strings.Split(field.Type().String(), ".")[1], "Spec")[0]
component := strings.ToLower(role)
field.Field(j).Set(reflect.ValueOf(fmt.Sprintf("%s-%s-%d", component, host, port)))
continue
}
clientPort := reflect.Indirect(field).FieldByName("ClientPort").Int()
field.Field(j).Set(reflect.ValueOf(fmt.Sprintf("pd-%s-%d", host, clientPort)))
case "DataDir":
Expand Down
Loading

0 comments on commit 2ebaaeb

Please sign in to comment.