diff --git a/components/cluster/command/upgrade.go b/components/cluster/command/upgrade.go index d9285b136d..adb611e5ce 100644 --- a/components/cluster/command/upgrade.go +++ b/components/cluster/command/upgrade.go @@ -22,7 +22,7 @@ import ( func newUpgradeCmd() *cobra.Command { offlineMode := false ignoreVersionCheck := false - var tidbVer, tikvVer, pdVer, tiflashVer, kvcdcVer, dashboardVer, cdcVer, alertmanagerVer, nodeExporterVer, blackboxExporterVer, tiproxyVer string + var tidbVer, tikvVer, pdVer, tsoVer, schedulingVer, tiflashVer, kvcdcVer, dashboardVer, cdcVer, alertmanagerVer, nodeExporterVer, blackboxExporterVer, tiproxyVer string cmd := &cobra.Command{ Use: "upgrade ", @@ -47,6 +47,8 @@ func newUpgradeCmd() *cobra.Command { spec.ComponentTiDB: tidbVer, spec.ComponentTiKV: tikvVer, spec.ComponentPD: pdVer, + spec.ComponentTSO: tsoVer, + spec.ComponentScheduling: schedulingVer, spec.ComponentTiFlash: tiflashVer, spec.ComponentTiKVCDC: kvcdcVer, spec.ComponentCDC: cdcVer, @@ -76,7 +78,9 @@ func newUpgradeCmd() *cobra.Command { // cmd.Flags().StringVar(&tidbVer, "tidb-version", "", "Fix the version of tidb and no longer follows the cluster version.") cmd.Flags().StringVar(&tikvVer, "tikv-version", "", "Fix the version of tikv and no longer follows the cluster version.") - cmd.Flags().StringVar(&pdVer, "pd-version", "", "Fix the version of pv and no longer follows the cluster version.") + cmd.Flags().StringVar(&pdVer, "pd-version", "", "Fix the version of pd and no longer follows the cluster version.") + cmd.Flags().StringVar(&tsoVer, "tso-version", "", "Fix the version of tso and no longer follows the cluster version.") + cmd.Flags().StringVar(&schedulingVer, "scheduling-version", "", "Fix the version of scheduling and no longer follows the cluster version.") cmd.Flags().StringVar(&tiflashVer, "tiflash-version", "", "Fix the version of tiflash and no longer follows the cluster version.") cmd.Flags().StringVar(&dashboardVer, "tidb-dashboard-version", "", "Fix the version of tidb-dashboard and no longer follows the cluster version.") cmd.Flags().StringVar(&cdcVer, "cdc-version", "", "Fix the version of cdc and no longer follows the cluster version.") diff --git a/embed/templates/config/prometheus.yml.tpl b/embed/templates/config/prometheus.yml.tpl index 0770f36d2e..1981b7fa34 100644 --- a/embed/templates/config/prometheus.yml.tpl +++ b/embed/templates/config/prometheus.yml.tpl @@ -155,6 +155,36 @@ scrape_configs: - targets: {{- range .PDAddrs}} - '{{.}}' +{{- end}} + - job_name: "tso" + honor_labels: true # don't overwrite job & instance labels +{{- if .TLSEnabled}} + scheme: https + tls_config: + insecure_skip_verify: false + ca_file: ../tls/ca.crt + cert_file: ../tls/prometheus.crt + key_file: ../tls/prometheus.pem +{{- end}} + static_configs: + - targets: +{{- range .TSOAddrs}} + - '{{.}}' +{{- end}} + - job_name: "scheduling" + honor_labels: true # don't overwrite job & instance labels +{{- if .TLSEnabled}} + scheme: https + tls_config: + insecure_skip_verify: false + ca_file: ../tls/ca.crt + cert_file: ../tls/prometheus.crt + key_file: ../tls/prometheus.pem +{{- end}} + static_configs: + - targets: +{{- range .SchedulingAddrs}} + - '{{.}}' {{- end}} {{- if .TiFlashStatusAddrs}} - job_name: "tiflash" diff --git a/embed/templates/scripts/run_pd.sh.tpl b/embed/templates/scripts/run_pd.sh.tpl index 0b659ac329..31094d3b18 100644 --- a/embed/templates/scripts/run_pd.sh.tpl +++ b/embed/templates/scripts/run_pd.sh.tpl @@ -7,11 +7,15 @@ DEPLOY_DIR={{.DeployDir}} cd "${DEPLOY_DIR}" || exit 1 +exec \ {{- if .NumaNode}} -exec numactl --cpunodebind={{.NumaNode}} --membind={{.NumaNode}} env GODEBUG=madvdontneed=1 bin/pd-server \ -{{- else}} -exec env GODEBUG=madvdontneed=1 bin/pd-server \ + numactl --cpunodebind={{.NumaNode}} --membind={{.NumaNode}} \ {{- end}} + env GODEBUG=madvdontneed=1 \ +{{- if .MSMode}} + PD_SERVICE_MODE=api \ +{{- end}} + bin/pd-server \ --name="{{.Name}}" \ --client-urls="{{.ClientURL}}" \ --advertise-client-urls="{{.AdvertiseClientURL}}" \ diff --git a/embed/templates/scripts/run_pd_scale.sh.tpl b/embed/templates/scripts/run_pd_scale.sh.tpl index cecfd52fe1..cb2f0513bf 100644 --- a/embed/templates/scripts/run_pd_scale.sh.tpl +++ b/embed/templates/scripts/run_pd_scale.sh.tpl @@ -7,11 +7,15 @@ DEPLOY_DIR={{.DeployDir}} cd "${DEPLOY_DIR}" || exit 1 +exec \ {{- if .NumaNode}} -exec numactl --cpunodebind={{.NumaNode}} --membind={{.NumaNode}} env GODEBUG=madvdontneed=1 bin/pd-server \ -{{- else}} -exec env GODEBUG=madvdontneed=1 bin/pd-server \ + numactl --cpunodebind={{.NumaNode}} --membind={{.NumaNode}} \ {{- end}} + env GODEBUG=madvdontneed=1 \ +{{- if .MSMode}} + PD_SERVICE_MODE=api \ +{{- end}} + bin/pd-server \ --name="{{.Name}}" \ --client-urls="{{.ClientURL}}" \ --advertise-client-urls="{{.AdvertiseClientURL}}" \ diff --git a/embed/templates/scripts/run_scheduling.sh.tpl b/embed/templates/scripts/run_scheduling.sh.tpl new file mode 100644 index 0000000000..a15b1ba4f7 --- /dev/null +++ b/embed/templates/scripts/run_scheduling.sh.tpl @@ -0,0 +1,19 @@ +#!/bin/bash +set -e + +# WARNING: This file was auto-generated. Do not edit! +# All your edit might be overwritten! +DEPLOY_DIR={{.DeployDir}} + +cd "${DEPLOY_DIR}" || exit 1 + +{{- if .NumaNode}} +exec numactl --cpunodebind={{.NumaNode}} --membind={{.NumaNode}} env GODEBUG=madvdontneed=1 bin/pd-server services scheduling\ +{{- else}} +exec env GODEBUG=madvdontneed=1 bin/pd-server services scheduling \ +{{- end}} + --backend-endpoints="{{.BackendEndpoints}}" \ + --listen-addr="{{.ListenURL}}" \ + --advertise-listen-addr="{{.AdvertiseListenURL}}" \ + --config=conf/scheduling.toml \ + --log-file="{{.LogDir}}/scheduling.log" 2>> "{{.LogDir}}/scheduling_stderr.log" diff --git a/embed/templates/scripts/run_tso.sh.tpl b/embed/templates/scripts/run_tso.sh.tpl new file mode 100644 index 0000000000..0d6486d73e --- /dev/null +++ b/embed/templates/scripts/run_tso.sh.tpl @@ -0,0 +1,19 @@ +#!/bin/bash +set -e + +# WARNING: This file was auto-generated. Do not edit! +# All your edit might be overwritten! +DEPLOY_DIR={{.DeployDir}} + +cd "${DEPLOY_DIR}" || exit 1 + +{{- if .NumaNode}} +exec numactl --cpunodebind={{.NumaNode}} --membind={{.NumaNode}} env GODEBUG=madvdontneed=1 bin/pd-server services tso\ +{{- else}} +exec env GODEBUG=madvdontneed=1 bin/pd-server services tso \ +{{- end}} + --backend-endpoints="{{.BackendEndpoints}}" \ + --listen-addr="{{.ListenURL}}" \ + --advertise-listen-addr="{{.AdvertiseListenURL}}" \ + --config=conf/tso.toml \ + --log-file="{{.LogDir}}/tso.log" 2>> "{{.LogDir}}/tso_stderr.log" diff --git a/pkg/cluster/ansible/import_test.go b/pkg/cluster/ansible/import_test.go index 8c1f0fe1b6..f951fd1afc 100644 --- a/pkg/cluster/ansible/import_test.go +++ b/pkg/cluster/ansible/import_test.go @@ -124,6 +124,8 @@ server_configs: binlog.enable: true tikv: {} pd: {} + tso: {} + scheduling: {} tidb_dashboard: {} tiflash: {} tiproxy: {} diff --git a/pkg/cluster/ansible/test-data/meta.yaml b/pkg/cluster/ansible/test-data/meta.yaml index e143d7cd72..d115b04ccd 100644 --- a/pkg/cluster/ansible/test-data/meta.yaml +++ b/pkg/cluster/ansible/test-data/meta.yaml @@ -19,6 +19,8 @@ topology: binlog.enable: true tikv: {} pd: {} + tso: {} + scheduling: {} tidb_dashboard: {} tiflash: {} tiproxy: {} diff --git a/pkg/cluster/api/pdapi.go b/pkg/cluster/api/pdapi.go index 329946bacf..b0e7f79595 100644 --- a/pkg/cluster/api/pdapi.go +++ b/pkg/cluster/api/pdapi.go @@ -139,6 +139,7 @@ var ( pdStoresURI = "pd/api/v1/stores" pdStoresLimitURI = "pd/api/v1/stores/limit" pdRegionsCheckURI = "pd/api/v1/regions/check" + pdServicePrimaryURI = "pd/api/v2/ms/primary" tsoHealthPrefix = "tso/api/v1/health" ) @@ -1006,3 +1007,214 @@ func (pc *PDClient) SetAllStoreLimits(value int) error { pc.l().Debugf("setting store limit: %d", value) return pc.updateConfig(pdStoresLimitURI, bytes.NewBuffer(body)) } + +// GetServicePrimary queries for the primary of a service +func (pc *PDClient) GetServicePrimary(service string) (string, error) { + endpoints := pc.getEndpoints(fmt.Sprintf("%s/%s", pdServicePrimaryURI, service)) + + var primary string + _, err := tryURLs(endpoints, func(endpoint string) ([]byte, error) { + body, err := pc.httpClient.Get(pc.ctx, endpoint) + if err != nil { + return body, err + } + + return body, json.Unmarshal(body, &primary) + }) + return primary, err +} + +const ( + tsoStatusURI = "status" + schedulingStatusURI = "status" +) + +// TSOClient is an HTTP client of the TSO server +type TSOClient struct { + version string + addrs []string + tlsEnabled bool + httpClient *utils.HTTPClient + ctx context.Context +} + +// NewTSOClient returns a new TSOClient, the context must have +// a *logprinter.Logger as value of "logger" +func NewTSOClient( + ctx context.Context, + addrs []string, + timeout time.Duration, + tlsConfig *tls.Config, +) *TSOClient { + enableTLS := false + if tlsConfig != nil { + enableTLS = true + } + + if _, ok := ctx.Value(logprinter.ContextKeyLogger).(*logprinter.Logger); !ok { + panic("the context must have logger inside") + } + + cli := &TSOClient{ + addrs: addrs, + tlsEnabled: enableTLS, + httpClient: utils.NewHTTPClient(timeout, tlsConfig), + ctx: ctx, + } + + cli.tryIdentifyVersion() + return cli +} + +// func (tc *TSOClient) l() *logprinter.Logger { +// return tc.ctx.Value(logprinter.ContextKeyLogger).(*logprinter.Logger) +// } + +func (tc *TSOClient) tryIdentifyVersion() { + endpoints := tc.getEndpoints(tsoStatusURI) + response := map[string]string{} + _, err := tryURLs(endpoints, func(endpoint string) ([]byte, error) { + body, err := tc.httpClient.Get(tc.ctx, endpoint) + if err != nil { + return body, err + } + + return body, json.Unmarshal(body, &response) + }) + if err == nil { + tc.version = response["version"] + } +} + +// GetURL builds the client URL of PDClient +func (tc *TSOClient) GetURL(addr string) string { + httpPrefix := "http" + if tc.tlsEnabled { + httpPrefix = "https" + } + return fmt.Sprintf("%s://%s", httpPrefix, addr) +} + +func (tc *TSOClient) getEndpoints(uri string) (endpoints []string) { + for _, addr := range tc.addrs { + endpoint := fmt.Sprintf("%s/%s", tc.GetURL(addr), uri) + endpoints = append(endpoints, endpoint) + } + + return +} + +// CheckHealth checks the health of TSO node. +func (tc *TSOClient) CheckHealth() error { + endpoints := tc.getEndpoints(tsoStatusURI) + + _, err := tryURLs(endpoints, func(endpoint string) ([]byte, error) { + body, err := tc.httpClient.Get(tc.ctx, endpoint) + if err != nil { + return body, err + } + + return body, nil + }) + + if err != nil { + return err + } + + return nil +} + +// SchedulingClient is an HTTP client of the scheduling server +type SchedulingClient struct { + version string + addrs []string + tlsEnabled bool + httpClient *utils.HTTPClient + ctx context.Context +} + +// NewSchedulingClient returns a new SchedulingClient, the context must have +// a *logprinter.Logger as value of "logger" +func NewSchedulingClient( + ctx context.Context, + addrs []string, + timeout time.Duration, + tlsConfig *tls.Config, +) *SchedulingClient { + enableTLS := false + if tlsConfig != nil { + enableTLS = true + } + + if _, ok := ctx.Value(logprinter.ContextKeyLogger).(*logprinter.Logger); !ok { + panic("the context must have logger inside") + } + + cli := &SchedulingClient{ + addrs: addrs, + tlsEnabled: enableTLS, + httpClient: utils.NewHTTPClient(timeout, tlsConfig), + ctx: ctx, + } + + cli.tryIdentifyVersion() + return cli +} + +// func (tc *SchedulingClient) l() *logprinter.Logger { +// return tc.ctx.Value(logprinter.ContextKeyLogger).(*logprinter.Logger) +// } + +func (tc *SchedulingClient) tryIdentifyVersion() { + endpoints := tc.getEndpoints(schedulingStatusURI) + response := map[string]string{} + _, err := tryURLs(endpoints, func(endpoint string) ([]byte, error) { + body, err := tc.httpClient.Get(tc.ctx, endpoint) + if err != nil { + return body, err + } + + return body, json.Unmarshal(body, &response) + }) + if err == nil { + tc.version = response["version"] + } +} + +// GetURL builds the client URL of PDClient +func (tc *SchedulingClient) GetURL(addr string) string { + httpPrefix := "http" + if tc.tlsEnabled { + httpPrefix = "https" + } + return fmt.Sprintf("%s://%s", httpPrefix, addr) +} + +func (tc *SchedulingClient) getEndpoints(uri string) (endpoints []string) { + for _, addr := range tc.addrs { + endpoint := fmt.Sprintf("%s/%s", tc.GetURL(addr), uri) + endpoints = append(endpoints, endpoint) + } + + return +} + +// CheckHealth checks the health of scheduling node. +func (tc *SchedulingClient) CheckHealth() error { + endpoints := tc.getEndpoints(schedulingStatusURI) + + _, err := tryURLs(endpoints, func(endpoint string) ([]byte, error) { + body, err := tc.httpClient.Get(tc.ctx, endpoint) + if err != nil { + return body, err + } + + return body, nil + }) + + if err != nil { + return err + } + + return nil +} diff --git a/pkg/cluster/spec/instance.go b/pkg/cluster/spec/instance.go index 33c79b570c..92a301e664 100644 --- a/pkg/cluster/spec/instance.go +++ b/pkg/cluster/spec/instance.go @@ -40,6 +40,8 @@ const ( ComponentTiDB = "tidb" ComponentTiKV = "tikv" ComponentPD = "pd" + ComponentTSO = "tso" + ComponentScheduling = "scheduling" ComponentTiFlash = "tiflash" ComponentTiProxy = "tiproxy" ComponentGrafana = "grafana" diff --git a/pkg/cluster/spec/monitoring.go b/pkg/cluster/spec/monitoring.go index cf219ca9d0..8627b1c05c 100644 --- a/pkg/cluster/spec/monitoring.go +++ b/pkg/cluster/spec/monitoring.go @@ -261,6 +261,20 @@ func (i *MonitorInstance) InitConfig( cfig.AddPD(pd.Host, uint64(pd.ClientPort)) } } + if servers, found := topoHasField("TSOServers"); found { + for i := 0; i < servers.Len(); i++ { + tso := servers.Index(i).Interface().(*TSOSpec) + uniqueHosts.Insert(tso.Host) + cfig.AddTSO(tso.Host, uint64(tso.Port)) + } + } + if servers, found := topoHasField("SchedulingServers"); found { + for i := 0; i < servers.Len(); i++ { + scheduling := servers.Index(i).Interface().(*SchedulingSpec) + uniqueHosts.Insert(scheduling.Host) + cfig.AddScheduling(scheduling.Host, uint64(scheduling.Port)) + } + } if servers, found := topoHasField("TiKVServers"); found { for i := 0; i < servers.Len(); i++ { kv := servers.Index(i).Interface().(*TiKVSpec) diff --git a/pkg/cluster/spec/pd.go b/pkg/cluster/spec/pd.go index 25fa6230e7..680dbc86e1 100644 --- a/pkg/cluster/spec/pd.go +++ b/pkg/cluster/spec/pd.go @@ -254,6 +254,7 @@ func (i *PDInstance) InitConfig( LogDir: paths.Log, InitialCluster: strings.Join(initialCluster, ","), NumaNode: spec.NumaNode, + MSMode: topo.GlobalOptions.PDMode == "ms", } fp := filepath.Join(paths.Cache, fmt.Sprintf("run_pd_%s_%d.sh", i.GetHost(), i.GetPort())) @@ -378,6 +379,7 @@ func (i *PDInstance) ScaleConfig( LogDir: paths.Log, InitialCluster: strings.Join(initialCluster, ","), NumaNode: spec.NumaNode, + MSMode: cluster.GlobalOptions.PDMode == "ms", } join := []string{} diff --git a/pkg/cluster/spec/scheduling.go b/pkg/cluster/spec/scheduling.go new file mode 100644 index 0000000000..efbe7def00 --- /dev/null +++ b/pkg/cluster/spec/scheduling.go @@ -0,0 +1,334 @@ +// Copyright 2024 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package spec + +import ( + "context" + "crypto/tls" + "fmt" + "path/filepath" + "strings" + "time" + + "github.com/pingcap/tiup/pkg/cluster/api" + "github.com/pingcap/tiup/pkg/cluster/ctxt" + "github.com/pingcap/tiup/pkg/cluster/template/scripts" + "github.com/pingcap/tiup/pkg/meta" + "github.com/pingcap/tiup/pkg/utils" +) + +// SchedulingSpec represents the scheduling topology specification in topology.yaml +type SchedulingSpec struct { + Host string `yaml:"host"` + ManageHost string `yaml:"manage_host,omitempty" validate:"manage_host:editable"` + ListenHost string `yaml:"listen_host,omitempty"` + AdvertiseListenAddr string `yaml:"advertise_listen_addr,omitempty"` + SSHPort int `yaml:"ssh_port,omitempty" validate:"ssh_port:editable"` + IgnoreExporter bool `yaml:"ignore_exporter,omitempty"` + Port int `yaml:"port" default:"3379"` + DeployDir string `yaml:"deploy_dir,omitempty"` + DataDir string `yaml:"data_dir,omitempty"` + LogDir string `yaml:"log_dir,omitempty"` + Source string `yaml:"source,omitempty" validate:"source:editable"` + NumaNode string `yaml:"numa_node,omitempty" validate:"numa_node:editable"` + Config map[string]any `yaml:"config,omitempty" validate:"config:ignore"` + Arch string `yaml:"arch,omitempty"` + OS string `yaml:"os,omitempty"` +} + +// Status queries current status of the instance +func (s *SchedulingSpec) Status(ctx context.Context, timeout time.Duration, tlsCfg *tls.Config, pdList ...string) string { + if timeout < time.Second { + timeout = statusQueryTimeout + } + + addr := utils.JoinHostPort(s.GetManageHost(), s.Port) + tc := api.NewSchedulingClient(ctx, []string{addr}, timeout, tlsCfg) + pc := api.NewPDClient(ctx, pdList, timeout, tlsCfg) + + // check health + err := tc.CheckHealth() + if err != nil { + return "Down" + } + + primary, err := pc.GetServicePrimary("scheduling") + if err != nil { + return "ERR" + } + res := "Up" + enableTLS := false + if tlsCfg != nil { + enableTLS = true + } + if s.GetAdvertiseListenURL(enableTLS) == primary { + res += "|P" + } + + return res +} + +// Role returns the component role of the instance +func (s *SchedulingSpec) Role() string { + return ComponentScheduling +} + +// SSH returns the host and SSH port of the instance +func (s *SchedulingSpec) SSH() (string, int) { + host := s.Host + if s.ManageHost != "" { + host = s.ManageHost + } + return host, s.SSHPort +} + +// GetMainPort returns the main port of the instance +func (s *SchedulingSpec) GetMainPort() int { + return s.Port +} + +// GetManageHost returns the manage host of the instance +func (s *SchedulingSpec) GetManageHost() string { + if s.ManageHost != "" { + return s.ManageHost + } + return s.Host +} + +// IsImported returns if the node is imported from TiDB-Ansible +func (s *SchedulingSpec) IsImported() bool { + return false +} + +// IgnoreMonitorAgent returns if the node does not have monitor agents available +func (s *SchedulingSpec) IgnoreMonitorAgent() bool { + return s.IgnoreExporter +} + +// GetAdvertiseListenURL returns AdvertiseListenURL +func (s *SchedulingSpec) GetAdvertiseListenURL(enableTLS bool) string { + if s.AdvertiseListenAddr != "" { + return s.AdvertiseListenAddr + } + scheme := utils.Ternary(enableTLS, "https", "http").(string) + return fmt.Sprintf("%s://%s", scheme, utils.JoinHostPort(s.Host, s.Port)) +} + +// SchedulingComponent represents scheduling component. +type SchedulingComponent struct{ Topology *Specification } + +// Name implements Component interface. +func (c *SchedulingComponent) Name() string { + return ComponentScheduling +} + +// Role implements Component interface. +func (c *SchedulingComponent) Role() string { + return ComponentScheduling +} + +// Source implements Component interface. +func (c *SchedulingComponent) Source() string { + source := c.Topology.ComponentSources.PD + if source != "" { + return source + } + return ComponentPD +} + +// CalculateVersion implements the Component interface +func (c *SchedulingComponent) CalculateVersion(clusterVersion string) string { + version := c.Topology.ComponentVersions.Scheduling + if version == "" { + version = clusterVersion + } + return version +} + +// SetVersion implements Component interface. +func (c *SchedulingComponent) SetVersion(version string) { + c.Topology.ComponentVersions.Scheduling = version +} + +// Instances implements Component interface. +func (c *SchedulingComponent) Instances() []Instance { + ins := make([]Instance, 0, len(c.Topology.SchedulingServers)) + for _, s := range c.Topology.SchedulingServers { + s := s + ins = append(ins, &SchedulingInstance{ + BaseInstance: BaseInstance{ + InstanceSpec: s, + Name: c.Name(), + Host: s.Host, + ManageHost: s.ManageHost, + ListenHost: utils.Ternary(s.ListenHost != "", s.ListenHost, c.Topology.BaseTopo().GlobalOptions.ListenHost).(string), + Port: s.Port, + SSHP: s.SSHPort, + Source: s.Source, + NumaNode: s.NumaNode, + NumaCores: "", + + Ports: []int{ + s.Port, + }, + Dirs: []string{ + s.DeployDir, + s.DataDir, + }, + StatusFn: s.Status, + UptimeFn: func(_ context.Context, timeout time.Duration, tlsCfg *tls.Config) time.Duration { + return UptimeByHost(s.GetManageHost(), s.Port, timeout, tlsCfg) + }, + Component: c, + }, + topo: c.Topology, + }) + } + return ins +} + +// SchedulingInstance represent the scheduling instance +type SchedulingInstance struct { + Name string + BaseInstance + topo Topology +} + +// InitConfig implement Instance interface +func (i *SchedulingInstance) InitConfig( + ctx context.Context, + e ctxt.Executor, + clusterName, + clusterVersion, + deployUser string, + paths meta.DirPaths, +) error { + topo := i.topo.(*Specification) + if err := i.BaseInstance.InitConfig(ctx, e, topo.GlobalOptions, deployUser, paths); err != nil { + return err + } + + enableTLS := topo.GlobalOptions.TLSEnabled + spec := i.InstanceSpec.(*SchedulingSpec) + scheme := utils.Ternary(enableTLS, "https", "http").(string) + version := i.CalculateVersion(clusterVersion) + + pds := []string{} + for _, pdspec := range topo.PDServers { + pds = append(pds, pdspec.GetAdvertiseClientURL(enableTLS)) + } + cfg := &scripts.SchedulingScript{ + ListenURL: fmt.Sprintf("%s://%s", scheme, utils.JoinHostPort(i.GetListenHost(), spec.Port)), + AdvertiseListenURL: spec.GetAdvertiseListenURL(enableTLS), + BackendEndpoints: strings.Join(pds, ","), + DeployDir: paths.Deploy, + DataDir: paths.Data[0], + LogDir: paths.Log, + NumaNode: spec.NumaNode, + } + + fp := filepath.Join(paths.Cache, fmt.Sprintf("run_scheduling_%s_%d.sh", i.GetHost(), i.GetPort())) + if err := cfg.ConfigToFile(fp); err != nil { + return err + } + dst := filepath.Join(paths.Deploy, "scripts", "run_scheduling.sh") + if err := e.Transfer(ctx, fp, dst, false, 0, false); err != nil { + return err + } + _, _, err := e.Execute(ctx, "chmod +x "+dst, false) + if err != nil { + return err + } + + globalConfig := topo.ServerConfigs.Scheduling + // set TLS configs + spec.Config, err = i.setTLSConfig(ctx, enableTLS, spec.Config, paths) + if err != nil { + return err + } + + if err := i.MergeServerConfig(ctx, e, globalConfig, spec.Config, paths); err != nil { + return err + } + + return checkConfig(ctx, e, i.ComponentName(), i.ComponentSource(), version, i.OS(), i.Arch(), i.ComponentName()+".toml", paths) +} + +// setTLSConfig set TLS Config to support enable/disable TLS +func (i *SchedulingInstance) setTLSConfig(ctx context.Context, enableTLS bool, configs map[string]any, paths meta.DirPaths) (map[string]any, error) { + // set TLS configs + if enableTLS { + if configs == nil { + configs = make(map[string]any) + } + configs["security.cacert-path"] = fmt.Sprintf( + "%s/tls/%s", + paths.Deploy, + TLSCACert, + ) + configs["security.cert-path"] = fmt.Sprintf( + "%s/tls/%s.crt", + paths.Deploy, + i.Role()) + configs["security.key-path"] = fmt.Sprintf( + "%s/tls/%s.pem", + paths.Deploy, + i.Role()) + } else { + // drainer tls config list + tlsConfigs := []string{ + "security.cacert-path", + "security.cert-path", + "security.key-path", + } + // delete TLS configs + if configs != nil { + for _, config := range tlsConfigs { + delete(configs, config) + } + } + } + + return configs, nil +} + +// ScaleConfig deploy temporary config on scaling +func (i *SchedulingInstance) ScaleConfig( + ctx context.Context, + e ctxt.Executor, + topo Topology, + clusterName, + clusterVersion, + deployUser string, + paths meta.DirPaths, +) error { + s := i.topo + defer func() { + i.topo = s + }() + i.topo = mustBeClusterTopo(topo) + return i.InitConfig(ctx, e, clusterName, clusterVersion, deployUser, paths) +} + +var _ RollingUpdateInstance = &SchedulingInstance{} + +// PreRestart implements RollingUpdateInstance interface. +func (i *SchedulingInstance) PreRestart(ctx context.Context, topo Topology, apiTimeoutSeconds int, tlsCfg *tls.Config) error { + return nil +} + +// PostRestart implements RollingUpdateInstance interface. +func (i *SchedulingInstance) PostRestart(ctx context.Context, topo Topology, tlsCfg *tls.Config) error { + return nil +} diff --git a/pkg/cluster/spec/spec.go b/pkg/cluster/spec/spec.go index bd5b99fbdb..f3400c9758 100644 --- a/pkg/cluster/spec/spec.go +++ b/pkg/cluster/spec/spec.go @@ -101,6 +101,7 @@ type ( Arch string `yaml:"arch,omitempty"` Custom any `yaml:"custom,omitempty" validate:"custom:ignore"` SystemdMode SystemdMode `yaml:"systemd_mode,omitempty" default:"system"` + PDMode string `yaml:"pd_mode,omitempty"` } // MonitoredOptions represents the monitored node configuration @@ -121,6 +122,8 @@ type ( TiDB map[string]any `yaml:"tidb"` TiKV map[string]any `yaml:"tikv"` PD map[string]any `yaml:"pd"` + TSO map[string]any `yaml:"tso"` + Scheduling map[string]any `yaml:"scheduling"` Dashboard map[string]any `yaml:"tidb_dashboard"` TiFlash map[string]any `yaml:"tiflash"` TiProxy map[string]any `yaml:"tiproxy"` @@ -138,6 +141,8 @@ type ( TiKV string `yaml:"tikv,omitempty"` TiFlash string `yaml:"tiflash,omitempty"` PD string `yaml:"pd,omitempty"` + TSO string `yaml:"tso,omitempty"` + Scheduling string `yaml:"scheduling,omitempty"` Dashboard string `yaml:"tidb_dashboard,omitempty"` Pump string `yaml:"pump,omitempty"` Drainer string `yaml:"drainer,omitempty"` @@ -177,6 +182,8 @@ type ( TiFlashServers []*TiFlashSpec `yaml:"tiflash_servers"` TiProxyServers []*TiProxySpec `yaml:"tiproxy_servers"` PDServers []*PDSpec `yaml:"pd_servers"` + TSOServers []*TSOSpec `yaml:"tso_servers,omitempty"` + SchedulingServers []*SchedulingSpec `yaml:"scheduling_servers,omitempty"` DashboardServers []*DashboardSpec `yaml:"tidb_dashboard_servers,omitempty"` PumpServers []*PumpSpec `yaml:"pump_servers,omitempty"` Drainers []*DrainerSpec `yaml:"drainer_servers,omitempty"` @@ -561,6 +568,8 @@ func (s *Specification) Merge(that Topology) Topology { DashboardServers: append(s.DashboardServers, spec.DashboardServers...), TiFlashServers: append(s.TiFlashServers, spec.TiFlashServers...), TiProxyServers: append(s.TiProxyServers, spec.TiProxyServers...), + TSOServers: append(s.TSOServers, spec.TSOServers...), + SchedulingServers: append(s.SchedulingServers, spec.SchedulingServers...), PumpServers: append(s.PumpServers, spec.PumpServers...), Drainers: append(s.Drainers, spec.Drainers...), CDCServers: append(s.CDCServers, spec.CDCServers...), @@ -579,6 +588,8 @@ func (v *ComponentVersions) Merge(that ComponentVersions) ComponentVersions { TiDB: utils.Ternary(that.TiDB != "", that.TiDB, v.TiDB).(string), TiKV: utils.Ternary(that.TiKV != "", that.TiKV, v.TiKV).(string), PD: utils.Ternary(that.PD != "", that.PD, v.PD).(string), + TSO: utils.Ternary(that.TSO != "", that.TSO, v.TSO).(string), + Scheduling: utils.Ternary(that.Scheduling != "", that.Scheduling, v.Scheduling).(string), Dashboard: utils.Ternary(that.Dashboard != "", that.Dashboard, v.Dashboard).(string), TiFlash: utils.Ternary(that.TiFlash != "", that.TiFlash, v.TiFlash).(string), TiProxy: utils.Ternary(that.TiProxy != "", that.TiProxy, v.TiProxy).(string), @@ -789,8 +800,10 @@ func (s *Specification) ComponentsByStopOrder() (comps []Component) { // ComponentsByStartOrder return component in the order need to start. func (s *Specification) ComponentsByStartOrder() (comps []Component) { - // "pd", "dashboard", "tiproxy", "tikv", "pump", "tidb", "tiflash", "drainer", "cdc", "tikv-cdc", "prometheus", "grafana", "alertmanager" + // "pd", "tso", "scheduling", "dashboard", "tiproxy", "tikv", "pump", "tidb", "tiflash", "drainer", "cdc", "tikv-cdc", "prometheus", "grafana", "alertmanager" comps = append(comps, &PDComponent{s}) + comps = append(comps, &TSOComponent{s}) + comps = append(comps, &SchedulingComponent{s}) comps = append(comps, &DashboardComponent{s}) comps = append(comps, &TiProxyComponent{s}) comps = append(comps, &TiKVComponent{s}) @@ -813,12 +826,14 @@ func (s *Specification) ComponentsByUpdateOrder(curVer string) (comps []Componen // Ref: https://github.com/pingcap/tiup/issues/2166 cdcUpgradeBeforePDTiKVTiDB := tidbver.TiCDCUpgradeBeforePDTiKVTiDB(curVer) - // "tiflash", <"cdc">, "pd", "dashboard", "tiproxy", "tikv", "pump", "tidb", "drainer", <"cdc>", "prometheus", "grafana", "alertmanager" + // "tiflash", <"cdc">, "pd", "tso", "scheduling", "dashboard", "tiproxy", "tikv", "pump", "tidb", "drainer", <"cdc>", "prometheus", "grafana", "alertmanager" comps = append(comps, &TiFlashComponent{s}) if cdcUpgradeBeforePDTiKVTiDB { comps = append(comps, &CDCComponent{s}) } comps = append(comps, &PDComponent{s}) + comps = append(comps, &TSOComponent{s}) + comps = append(comps, &SchedulingComponent{s}) comps = append(comps, &DashboardComponent{s}) comps = append(comps, &TiProxyComponent{s}) comps = append(comps, &TiKVComponent{s}) diff --git a/pkg/cluster/spec/tso.go b/pkg/cluster/spec/tso.go new file mode 100644 index 0000000000..a84069f6ee --- /dev/null +++ b/pkg/cluster/spec/tso.go @@ -0,0 +1,334 @@ +// Copyright 2024 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package spec + +import ( + "context" + "crypto/tls" + "fmt" + "path/filepath" + "strings" + "time" + + "github.com/pingcap/tiup/pkg/cluster/api" + "github.com/pingcap/tiup/pkg/cluster/ctxt" + "github.com/pingcap/tiup/pkg/cluster/template/scripts" + "github.com/pingcap/tiup/pkg/meta" + "github.com/pingcap/tiup/pkg/utils" +) + +// TSOSpec represents the TSO topology specification in topology.yaml +type TSOSpec struct { + Host string `yaml:"host"` + ManageHost string `yaml:"manage_host,omitempty" validate:"manage_host:editable"` + ListenHost string `yaml:"listen_host,omitempty"` + AdvertiseListenAddr string `yaml:"advertise_listen_addr,omitempty"` + SSHPort int `yaml:"ssh_port,omitempty" validate:"ssh_port:editable"` + IgnoreExporter bool `yaml:"ignore_exporter,omitempty"` + Port int `yaml:"port" default:"3379"` + DeployDir string `yaml:"deploy_dir,omitempty"` + DataDir string `yaml:"data_dir,omitempty"` + LogDir string `yaml:"log_dir,omitempty"` + Source string `yaml:"source,omitempty" validate:"source:editable"` + NumaNode string `yaml:"numa_node,omitempty" validate:"numa_node:editable"` + Config map[string]any `yaml:"config,omitempty" validate:"config:ignore"` + Arch string `yaml:"arch,omitempty"` + OS string `yaml:"os,omitempty"` +} + +// Status queries current status of the instance +func (s *TSOSpec) Status(ctx context.Context, timeout time.Duration, tlsCfg *tls.Config, pdList ...string) string { + if timeout < time.Second { + timeout = statusQueryTimeout + } + + addr := utils.JoinHostPort(s.GetManageHost(), s.Port) + tc := api.NewTSOClient(ctx, []string{addr}, timeout, tlsCfg) + pc := api.NewPDClient(ctx, pdList, timeout, tlsCfg) + + // check health + err := tc.CheckHealth() + if err != nil { + return "Down" + } + + primary, err := pc.GetServicePrimary("tso") + if err != nil { + return "ERR" + } + res := "Up" + enableTLS := false + if tlsCfg != nil { + enableTLS = true + } + if s.GetAdvertiseListenURL(enableTLS) == primary { + res += "|P" + } + + return res +} + +// Role returns the component role of the instance +func (s *TSOSpec) Role() string { + return ComponentTSO +} + +// SSH returns the host and SSH port of the instance +func (s *TSOSpec) SSH() (string, int) { + host := s.Host + if s.ManageHost != "" { + host = s.ManageHost + } + return host, s.SSHPort +} + +// GetMainPort returns the main port of the instance +func (s *TSOSpec) GetMainPort() int { + return s.Port +} + +// GetManageHost returns the manage host of the instance +func (s *TSOSpec) GetManageHost() string { + if s.ManageHost != "" { + return s.ManageHost + } + return s.Host +} + +// IsImported returns if the node is imported from TiDB-Ansible +func (s *TSOSpec) IsImported() bool { + return false +} + +// IgnoreMonitorAgent returns if the node does not have monitor agents available +func (s *TSOSpec) IgnoreMonitorAgent() bool { + return s.IgnoreExporter +} + +// GetAdvertiseListenURL returns AdvertiseListenURL +func (s *TSOSpec) GetAdvertiseListenURL(enableTLS bool) string { + if s.AdvertiseListenAddr != "" { + return s.AdvertiseListenAddr + } + scheme := utils.Ternary(enableTLS, "https", "http").(string) + return fmt.Sprintf("%s://%s", scheme, utils.JoinHostPort(s.Host, s.Port)) +} + +// TSOComponent represents TSO component. +type TSOComponent struct{ Topology *Specification } + +// Name implements Component interface. +func (c *TSOComponent) Name() string { + return ComponentTSO +} + +// Role implements Component interface. +func (c *TSOComponent) Role() string { + return ComponentTSO +} + +// Source implements Component interface. +func (c *TSOComponent) Source() string { + source := c.Topology.ComponentSources.PD + if source != "" { + return source + } + return ComponentPD +} + +// CalculateVersion implements the Component interface +func (c *TSOComponent) CalculateVersion(clusterVersion string) string { + version := c.Topology.ComponentVersions.TSO + if version == "" { + version = clusterVersion + } + return version +} + +// SetVersion implements Component interface. +func (c *TSOComponent) SetVersion(version string) { + c.Topology.ComponentVersions.TSO = version +} + +// Instances implements Component interface. +func (c *TSOComponent) Instances() []Instance { + ins := make([]Instance, 0, len(c.Topology.TSOServers)) + for _, s := range c.Topology.TSOServers { + s := s + ins = append(ins, &TSOInstance{ + BaseInstance: BaseInstance{ + InstanceSpec: s, + Name: c.Name(), + Host: s.Host, + ManageHost: s.ManageHost, + ListenHost: utils.Ternary(s.ListenHost != "", s.ListenHost, c.Topology.BaseTopo().GlobalOptions.ListenHost).(string), + Port: s.Port, + SSHP: s.SSHPort, + Source: s.Source, + NumaNode: s.NumaNode, + NumaCores: "", + + Ports: []int{ + s.Port, + }, + Dirs: []string{ + s.DeployDir, + s.DataDir, + }, + StatusFn: s.Status, + UptimeFn: func(_ context.Context, timeout time.Duration, tlsCfg *tls.Config) time.Duration { + return UptimeByHost(s.GetManageHost(), s.Port, timeout, tlsCfg) + }, + Component: c, + }, + topo: c.Topology, + }) + } + return ins +} + +// TSOInstance represent the TSO instance +type TSOInstance struct { + Name string + BaseInstance + topo Topology +} + +// InitConfig implement Instance interface +func (i *TSOInstance) InitConfig( + ctx context.Context, + e ctxt.Executor, + clusterName, + clusterVersion, + deployUser string, + paths meta.DirPaths, +) error { + topo := i.topo.(*Specification) + if err := i.BaseInstance.InitConfig(ctx, e, topo.GlobalOptions, deployUser, paths); err != nil { + return err + } + + enableTLS := topo.GlobalOptions.TLSEnabled + spec := i.InstanceSpec.(*TSOSpec) + scheme := utils.Ternary(enableTLS, "https", "http").(string) + version := i.CalculateVersion(clusterVersion) + + pds := []string{} + for _, pdspec := range topo.PDServers { + pds = append(pds, pdspec.GetAdvertiseClientURL(enableTLS)) + } + cfg := &scripts.TSOScript{ + ListenURL: fmt.Sprintf("%s://%s", scheme, utils.JoinHostPort(i.GetListenHost(), spec.Port)), + AdvertiseListenURL: spec.GetAdvertiseListenURL(enableTLS), + BackendEndpoints: strings.Join(pds, ","), + DeployDir: paths.Deploy, + DataDir: paths.Data[0], + LogDir: paths.Log, + NumaNode: spec.NumaNode, + } + + fp := filepath.Join(paths.Cache, fmt.Sprintf("run_tso_%s_%d.sh", i.GetHost(), i.GetPort())) + if err := cfg.ConfigToFile(fp); err != nil { + return err + } + dst := filepath.Join(paths.Deploy, "scripts", "run_tso.sh") + if err := e.Transfer(ctx, fp, dst, false, 0, false); err != nil { + return err + } + _, _, err := e.Execute(ctx, "chmod +x "+dst, false) + if err != nil { + return err + } + + globalConfig := topo.ServerConfigs.TSO + // set TLS configs + spec.Config, err = i.setTLSConfig(ctx, enableTLS, spec.Config, paths) + if err != nil { + return err + } + + if err := i.MergeServerConfig(ctx, e, globalConfig, spec.Config, paths); err != nil { + return err + } + + return checkConfig(ctx, e, i.ComponentName(), i.ComponentSource(), version, i.OS(), i.Arch(), i.ComponentName()+".toml", paths) +} + +// setTLSConfig set TLS Config to support enable/disable TLS +func (i *TSOInstance) setTLSConfig(ctx context.Context, enableTLS bool, configs map[string]any, paths meta.DirPaths) (map[string]any, error) { + // set TLS configs + if enableTLS { + if configs == nil { + configs = make(map[string]any) + } + configs["security.cacert-path"] = fmt.Sprintf( + "%s/tls/%s", + paths.Deploy, + TLSCACert, + ) + configs["security.cert-path"] = fmt.Sprintf( + "%s/tls/%s.crt", + paths.Deploy, + i.Role()) + configs["security.key-path"] = fmt.Sprintf( + "%s/tls/%s.pem", + paths.Deploy, + i.Role()) + } else { + // drainer tls config list + tlsConfigs := []string{ + "security.cacert-path", + "security.cert-path", + "security.key-path", + } + // delete TLS configs + if configs != nil { + for _, config := range tlsConfigs { + delete(configs, config) + } + } + } + + return configs, nil +} + +// ScaleConfig deploy temporary config on scaling +func (i *TSOInstance) ScaleConfig( + ctx context.Context, + e ctxt.Executor, + topo Topology, + clusterName, + clusterVersion, + deployUser string, + paths meta.DirPaths, +) error { + s := i.topo + defer func() { + i.topo = s + }() + i.topo = mustBeClusterTopo(topo) + return i.InitConfig(ctx, e, clusterName, clusterVersion, deployUser, paths) +} + +var _ RollingUpdateInstance = &TSOInstance{} + +// PreRestart implements RollingUpdateInstance interface. +func (i *TSOInstance) PreRestart(ctx context.Context, topo Topology, apiTimeoutSeconds int, tlsCfg *tls.Config) error { + return nil +} + +// PostRestart implements RollingUpdateInstance interface. +func (i *TSOInstance) PostRestart(ctx context.Context, topo Topology, tlsCfg *tls.Config) error { + return nil +} diff --git a/pkg/cluster/task/update_meta.go b/pkg/cluster/task/update_meta.go index dd6457e1b9..aaa6eeb4cf 100644 --- a/pkg/cluster/task/update_meta.go +++ b/pkg/cluster/task/update_meta.go @@ -72,6 +72,24 @@ func (u *UpdateMeta) Execute(ctx context.Context) error { } newMeta.Topology.PDServers = pdServers + tsoServers := make([]*spec.TSOSpec, 0) + for i, instance := range (&spec.TSOComponent{Topology: topo}).Instances() { + if deleted.Exist(instance.ID()) { + continue + } + tsoServers = append(tsoServers, topo.TSOServers[i]) + } + newMeta.Topology.TSOServers = tsoServers + + schedulingServers := make([]*spec.SchedulingSpec, 0) + for i, instance := range (&spec.SchedulingComponent{Topology: topo}).Instances() { + if deleted.Exist(instance.ID()) { + continue + } + schedulingServers = append(schedulingServers, topo.SchedulingServers[i]) + } + newMeta.Topology.SchedulingServers = schedulingServers + tiproxyServers := make([]*spec.TiProxySpec, 0) for i, instance := range (&spec.TiProxyComponent{Topology: topo}).Instances() { if deleted.Exist(instance.ID()) { diff --git a/pkg/cluster/template/config/prometheus.go b/pkg/cluster/template/config/prometheus.go index 90d6b00051..916fa55349 100644 --- a/pkg/cluster/template/config/prometheus.go +++ b/pkg/cluster/template/config/prometheus.go @@ -34,6 +34,8 @@ type PrometheusConfig struct { TiProxyStatusAddrs []string TiKVStatusAddrs []string PDAddrs []string + TSOAddrs []string + SchedulingAddrs []string TiFlashStatusAddrs []string TiFlashLearnerStatusAddrs []string PumpAddrs []string @@ -98,6 +100,18 @@ func (c *PrometheusConfig) AddPD(ip string, port uint64) *PrometheusConfig { return c } +// AddTSO add a TSO address +func (c *PrometheusConfig) AddTSO(ip string, port uint64) *PrometheusConfig { + c.TSOAddrs = append(c.TSOAddrs, utils.JoinHostPort(ip, int(port))) + return c +} + +// AddScheduling add a scheduling address +func (c *PrometheusConfig) AddScheduling(ip string, port uint64) *PrometheusConfig { + c.SchedulingAddrs = append(c.SchedulingAddrs, utils.JoinHostPort(ip, int(port))) + return c +} + // AddTiFlashLearner add a TiFlash learner address func (c *PrometheusConfig) AddTiFlashLearner(ip string, port uint64) *PrometheusConfig { c.TiFlashLearnerStatusAddrs = append(c.TiFlashLearnerStatusAddrs, utils.JoinHostPort(ip, int(port))) diff --git a/pkg/cluster/template/scripts/pd.go b/pkg/cluster/template/scripts/pd.go index a09eb4be5b..62bda1d152 100644 --- a/pkg/cluster/template/scripts/pd.go +++ b/pkg/cluster/template/scripts/pd.go @@ -37,6 +37,7 @@ type PDScript struct { LogDir string NumaNode string + MSMode bool } // ConfigToFile write config content to specific path diff --git a/pkg/cluster/template/scripts/scheduling.go b/pkg/cluster/template/scripts/scheduling.go new file mode 100644 index 0000000000..6167d9336e --- /dev/null +++ b/pkg/cluster/template/scripts/scheduling.go @@ -0,0 +1,57 @@ +// Copyright 2024 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package scripts + +import ( + "bytes" + "path" + "text/template" + + "github.com/pingcap/tiup/embed" + "github.com/pingcap/tiup/pkg/utils" +) + +// SchedulingScript represent the data to generate scheduling config +type SchedulingScript struct { + ListenURL string + AdvertiseListenURL string + BackendEndpoints string + + DeployDir string + DataDir string + LogDir string + + NumaNode string +} + +// ConfigToFile write config content to specific path +func (c *SchedulingScript) ConfigToFile(file string) error { + fp := path.Join("templates", "scripts", "run_scheduling.sh.tpl") + tpl, err := embed.ReadTemplate(fp) + if err != nil { + return err + } + + tmpl, err := template.New("Scheduling").Parse(string(tpl)) + if err != nil { + return err + } + + content := bytes.NewBufferString("") + if err := tmpl.Execute(content, c); err != nil { + return err + } + + return utils.WriteFile(file, content.Bytes(), 0755) +} diff --git a/pkg/cluster/template/scripts/tso.go b/pkg/cluster/template/scripts/tso.go new file mode 100644 index 0000000000..0197b82c38 --- /dev/null +++ b/pkg/cluster/template/scripts/tso.go @@ -0,0 +1,57 @@ +// Copyright 2024 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package scripts + +import ( + "bytes" + "path" + "text/template" + + "github.com/pingcap/tiup/embed" + "github.com/pingcap/tiup/pkg/utils" +) + +// TSOScript represent the data to generate tso config +type TSOScript struct { + ListenURL string + AdvertiseListenURL string + BackendEndpoints string + + DeployDir string + DataDir string + LogDir string + + NumaNode string +} + +// ConfigToFile write config content to specific path +func (c *TSOScript) ConfigToFile(file string) error { + fp := path.Join("templates", "scripts", "run_tso.sh.tpl") + tpl, err := embed.ReadTemplate(fp) + if err != nil { + return err + } + + tmpl, err := template.New("TSO").Parse(string(tpl)) + if err != nil { + return err + } + + content := bytes.NewBufferString("") + if err := tmpl.Execute(content, c); err != nil { + return err + } + + return utils.WriteFile(file, content.Bytes(), 0755) +}