diff --git a/CHANGELOG.md b/CHANGELOG.md index c84df37b04..a27d643f30 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,19 @@ TiUP Changelog +## [1.13.1] 2023-09-25 + +### Fixes + +- Increase timeout when publishing a package in `tiup` (#2269, @nexustar) +- Fix pd microservice component id in `tiup-playground` (#2272, @iosmanthus) +- Fix grafana for multiple instances using the same host in `tiup-cluster` and `tiup-dm` (#2277, @lastincisor) +- Add cdn workaround (#2285, @nexustar) +- Mirror: fix inaccurate progress bar (#2284, @nexustar) + +### Improvements + +- Support ignoring the version check when upgrading in `tiup-cluster` and `tiup-dm` (#2282, @nexustar) + ## [1.13.0] 2023-08-26 ### New Features diff --git a/components/cluster/command/display.go b/components/cluster/command/display.go index 816b0c2bcf..981e864542 100644 --- a/components/cluster/command/display.go +++ b/components/cluster/command/display.go @@ -97,6 +97,7 @@ func newDisplayCmd() *cobra.Command { cmd.Flags().BoolVar(&showTiKVLabels, "labels", false, "Only display labels of specified TiKV role or nodes") cmd.Flags().BoolVar(&dopt.ShowProcess, "process", false, "display cpu and memory usage of nodes") cmd.Flags().BoolVar(&dopt.ShowManageHost, "manage-host", false, "display manage host of nodes") + cmd.Flags().BoolVar(&dopt.ShowNuma, "numa", false, "display numa information of nodes") cmd.Flags().Uint64Var(&statusTimeout, "status-timeout", 10, "Timeout in seconds when getting node status") return cmd diff --git a/components/cluster/command/upgrade.go b/components/cluster/command/upgrade.go index 3fc36fa98e..623dea909f 100644 --- a/components/cluster/command/upgrade.go +++ b/components/cluster/command/upgrade.go @@ -21,7 +21,8 @@ import ( func newUpgradeCmd() *cobra.Command { offlineMode := false - var tidbVer, tikvVer, pdVer, tiflashVer, kvcdcVer, dashboardVer, cdcVer, alertmanagerVer, nodeExporterVer, blackboxExporterVer string + ignoreVersionCheck := false + var tidbVer, tikvVer, pdVer, tiflashVer, kvcdcVer, dashboardVer, cdcVer, alertmanagerVer, nodeExporterVer, blackboxExporterVer, tiproxyVer string cmd := &cobra.Command{ Use: "upgrade ", @@ -49,11 +50,12 @@ func newUpgradeCmd() *cobra.Command { spec.ComponentTiFlash: tiflashVer, spec.ComponentTiKVCDC: kvcdcVer, spec.ComponentCDC: cdcVer, + spec.ComponentTiProxy: tiproxyVer, spec.ComponentBlackboxExporter: blackboxExporterVer, spec.ComponentNodeExporter: nodeExporterVer, } - return cm.Upgrade(clusterName, version, componentVersions, gOpt, skipConfirm, offlineMode) + return cm.Upgrade(clusterName, version, componentVersions, gOpt, skipConfirm, offlineMode, ignoreVersionCheck) }, ValidArgsFunction: func(cmd *cobra.Command, args []string, toComplete string) ([]string, cobra.ShellCompDirective) { switch len(args) { @@ -68,6 +70,7 @@ func newUpgradeCmd() *cobra.Command { cmd.Flags().Uint64Var(&gOpt.APITimeout, "transfer-timeout", 600, "Timeout in seconds when transferring PD and TiKV store leaders, also for TiCDC drain one capture") cmd.Flags().BoolVarP(&gOpt.IgnoreConfigCheck, "ignore-config-check", "", false, "Ignore the config check result") cmd.Flags().BoolVarP(&offlineMode, "offline", "", false, "Upgrade a stopped cluster") + cmd.Flags().BoolVarP(&ignoreVersionCheck, "ignore-version-check", "", false, "Ignore checking if the target version is higher than the current version") cmd.Flags().StringVar(&gOpt.SSHCustomScripts.BeforeRestartInstance.Raw, "pre-upgrade-script", "", "(EXPERIMENTAL) Custom script to be executed on each server before the server is upgraded")
cmd.Flags().StringVar(&gOpt.SSHCustomScripts.AfterRestartInstance.Raw, "post-upgrade-script", "", "(EXPERIMENTAL) Custom script to be executed on each server after the server is upgraded") @@ -81,5 +84,6 @@ func newUpgradeCmd() *cobra.Command { cmd.Flags().StringVar(&alertmanagerVer, "alertmanager-version", "", "Specify the version of alertmanager to upgrade to") cmd.Flags().StringVar(&nodeExporterVer, "node-exporter-version", "", "Specify the version of node-exporter to upgrade to") cmd.Flags().StringVar(&blackboxExporterVer, "blackbox-exporter-version", "", "Specify the version of blackbox-exporter to upgrade to") + cmd.Flags().StringVar(&tiproxyVer, "tiproxy-version", "", "Specify the version of tiproxy to upgrade to") return cmd } diff --git a/components/dm/command/upgrade.go b/components/dm/command/upgrade.go index af79ccfeda..d902979295 100644 --- a/components/dm/command/upgrade.go +++ b/components/dm/command/upgrade.go @@ -19,7 +19,7 @@ import ( func newUpgradeCmd() *cobra.Command { offlineMode := false - + ignoreVersionCheck := false cmd := &cobra.Command{ Use: "upgrade ", Short: "Upgrade a specified DM cluster", @@ -28,7 +28,7 @@ func newUpgradeCmd() *cobra.Command { return cmd.Help() } - return cm.Upgrade(args[0], args[1], nil, gOpt, skipConfirm, offlineMode) + return cm.Upgrade(args[0], args[1], nil, gOpt, skipConfirm, offlineMode, ignoreVersionCheck) }, ValidArgsFunction: func(cmd *cobra.Command, args []string, toComplete string) ([]string, cobra.ShellCompDirective) { switch len(args) { @@ -41,6 +41,7 @@ func newUpgradeCmd() *cobra.Command { } cmd.Flags().BoolVarP(&offlineMode, "offline", "", false, "Upgrade a stopped cluster") + cmd.Flags().BoolVarP(&ignoreVersionCheck, "ignore-version-check", "", false, "Ignore checking if target version is higher than current version") return cmd } diff --git a/components/playground/instance/instance.go b/components/playground/instance/instance.go index 902e32b52a..620f2a502d 100644 --- a/components/playground/instance/instance.go +++ b/components/playground/instance/instance.go @@ -47,6 +47,12 @@ type instance struct { BinPath string } +// MetricAddr will be used by prometheus scrape_configs. +type MetricAddr struct { + Targets []string `json:"targets"` + Labels map[string]string `json:"labels"` +} + // Instance represent running component type Instance interface { Pid() int @@ -59,16 +65,16 @@ type Instance interface { LogFile() string // Uptime show uptime. Uptime() string - // StatusAddrs return the address to pull metrics. - StatusAddrs() []string + // MetricAddr return the address to pull metrics. + MetricAddr() MetricAddr // Wait Should only call this if the instance is started successfully. // The implementation should be safe to call Wait multi times. 
Wait() error } -func (inst *instance) StatusAddrs() (addrs []string) { +func (inst *instance) MetricAddr() (r MetricAddr) { if inst.Host != "" && inst.StatusPort != 0 { - addrs = append(addrs, utils.JoinHostPort(inst.Host, inst.StatusPort)) + r.Targets = append(r.Targets, utils.JoinHostPort(inst.Host, inst.StatusPort)) } return } @@ -109,7 +115,7 @@ func logIfErr(err error) { func pdEndpoints(pds []*PDInstance, isHTTP bool) []string { var endpoints []string for _, pd := range pds { - if pd.Role == PDRoleTSO || pd.Role == PDRoleResourceManager { + if pd.Role == PDRoleTSO || pd.Role == PDRoleScheduling || pd.Role == PDRoleResourceManager { continue } if isHTTP { diff --git a/components/playground/instance/pd.go b/components/playground/instance/pd.go index 84f1f53117..68673d55fe 100644 --- a/components/playground/instance/pd.go +++ b/components/playground/instance/pd.go @@ -34,6 +34,8 @@ const ( PDRoleAPI PDRole = "api" // PDRoleTSO is the role of PD TSO PDRoleTSO PDRole = "tso" + // PDRoleScheduling is the role of PD scheduling + PDRoleScheduling PDRole = "scheduling" // PDRoleResourceManager is the role of PD resource manager PDRoleResourceManager PDRole = "resource manager" ) @@ -128,8 +130,21 @@ func (inst *PDInstance) Start(ctx context.Context, version utils.Version) error args = []string{ "services", "tso", - fmt.Sprintf("--listen-addr=http://%s", utils.JoinHostPort(inst.Host, inst.Port)), - fmt.Sprintf("--advertise-listen-addr=http://%s", utils.JoinHostPort(AdvertiseHost(inst.Host), inst.Port)), + fmt.Sprintf("--listen-addr=http://%s", utils.JoinHostPort(inst.Host, inst.StatusPort)), + fmt.Sprintf("--advertise-listen-addr=http://%s", utils.JoinHostPort(AdvertiseHost(inst.Host), inst.StatusPort)), + fmt.Sprintf("--backend-endpoints=%s", strings.Join(endpoints, ",")), + fmt.Sprintf("--log-file=%s", inst.LogFile()), + } + if inst.ConfigPath != "" { + args = append(args, fmt.Sprintf("--config=%s", inst.ConfigPath)) + } + case PDRoleScheduling: + endpoints := pdEndpoints(inst.pds, true) + args = []string{ + "services", + "scheduling", + fmt.Sprintf("--listen-addr=http://%s", utils.JoinHostPort(inst.Host, inst.StatusPort)), + fmt.Sprintf("--advertise-listen-addr=http://%s", utils.JoinHostPort(AdvertiseHost(inst.Host), inst.StatusPort)), fmt.Sprintf("--backend-endpoints=%s", strings.Join(endpoints, ",")), fmt.Sprintf("--log-file=%s", inst.LogFile()), } @@ -141,8 +156,8 @@ func (inst *PDInstance) Start(ctx context.Context, version utils.Version) error args = []string{ "services", "resource-manager", - fmt.Sprintf("--listen-addr=http://%s", utils.JoinHostPort(inst.Host, inst.Port)), - fmt.Sprintf("--advertise-listen-addr=http://%s", utils.JoinHostPort(AdvertiseHost(inst.Host), inst.Port)), + fmt.Sprintf("--listen-addr=http://%s", utils.JoinHostPort(inst.Host, inst.StatusPort)), + fmt.Sprintf("--advertise-listen-addr=http://%s", utils.JoinHostPort(AdvertiseHost(inst.Host), inst.StatusPort)), fmt.Sprintf("--backend-endpoints=%s", strings.Join(endpoints, ",")), fmt.Sprintf("--log-file=%s", inst.LogFile()), } diff --git a/components/playground/instance/tiproxy.go b/components/playground/instance/tiproxy.go index b39f5f01de..95b6b13cd8 100644 --- a/components/playground/instance/tiproxy.go +++ b/components/playground/instance/tiproxy.go @@ -54,6 +54,15 @@ func NewTiProxy(binPath string, dir, host, configPath string, id int, port int, return tiproxy } +// MetricAddr implements Instance interface. 
+func (c *TiProxy) MetricAddr() (r MetricAddr) { + r.Targets = append(r.Targets, utils.JoinHostPort(c.Host, c.StatusPort)) + r.Labels = map[string]string{ + "__metrics_path__": "/api/metrics", + } + return +} + // Start implements Instance interface. func (c *TiProxy) Start(ctx context.Context, version utils.Version) error { endpoints := pdEndpoints(c.pds, true) diff --git a/components/playground/main.go b/components/playground/main.go index 2bae3ecc1e..d6419c100e 100644 --- a/components/playground/main.go +++ b/components/playground/main.go @@ -15,9 +15,9 @@ package main import ( "context" - "database/sql" "encoding/json" "fmt" + "net" "net/http" _ "net/http/pprof" "os" @@ -57,10 +57,11 @@ type BootOptions struct { Mode string `yaml:"mode"` PDMode string `yaml:"pd_mode"` Version string `yaml:"version"` - PD instance.Config `yaml:"pd"` // ignored when pd_mode == ms - PDAPI instance.Config `yaml:"pd_api"` // Only available when pd_mode == ms - PDTSO instance.Config `yaml:"pd_tso"` // Only available when pd_mode == ms - PDRM instance.Config `yaml:"pd_rm"` // Only available when pd_mode == ms + PD instance.Config `yaml:"pd"` // ignored when pd_mode == ms + PDAPI instance.Config `yaml:"pd_api"` // Only available when pd_mode == ms + PDTSO instance.Config `yaml:"pd_tso"` // Only available when pd_mode == ms + PDScheduling instance.Config `yaml:"pd_scheduling"` // Only available when pd_mode == ms + PDRM instance.Config `yaml:"pd_rm"` // Only available when pd_mode == ms TiProxy instance.Config `yaml:"tiproxy"` TiDB instance.Config `yaml:"tidb"` TiKV instance.Config `yaml:"tikv"` @@ -294,6 +295,7 @@ If you'd like to use a TiDB version other than %s, cancel and retry with the fol rootCmd.Flags().IntVar(&options.PDAPI.Num, "pd.api", 0, "PD API instance number") rootCmd.Flags().IntVar(&options.PDTSO.Num, "pd.tso", 0, "PD TSO instance number") + rootCmd.Flags().IntVar(&options.PDScheduling.Num, "pd.scheduling", 0, "PD scheduling instance number") rootCmd.Flags().IntVar(&options.PDRM.Num, "pd.rm", 0, "PD resource manager instance number") rootCmd.Flags().IntVar(&options.TiDB.UpTimeout, "db.timeout", 60, "TiDB max wait time in seconds for starting, 0 means no limit") @@ -326,6 +328,7 @@ If you'd like to use a TiDB version other than %s, cancel and retry with the fol rootCmd.Flags().StringVar(&options.PDAPI.ConfigPath, "pd.api.config", "", "PD API instance configuration file") rootCmd.Flags().StringVar(&options.PDTSO.ConfigPath, "pd.tso.config", "", "PD TSO instance configuration file") + rootCmd.Flags().StringVar(&options.PDScheduling.ConfigPath, "pd.scheduling.config", "", "PD scheduling instance configuration file") rootCmd.Flags().StringVar(&options.PDRM.ConfigPath, "pd.rm.config", "", "PD resource manager instance configuration file") rootCmd.Flags().StringVar(&options.TiDB.BinPath, "db.binpath", "", "TiDB instance binary path") @@ -342,6 +345,7 @@ If you'd like to use a TiDB version other than %s, cancel and retry with the fol rootCmd.Flags().StringVar(&options.PDAPI.BinPath, "pd.api.binpath", "", "PD API instance binary path") rootCmd.Flags().StringVar(&options.PDTSO.BinPath, "pd.tso.binpath", "", "PD TSO instance binary path") + rootCmd.Flags().StringVar(&options.PDScheduling.BinPath, "pd.scheduling.binpath", "", "PD scheduling instance binary path") rootCmd.Flags().StringVar(&options.PDRM.BinPath, "pd.rm.binpath", "", "PD resource manager instance binary path") rootCmd.Flags().StringVar(&options.TiKVCDC.Version, "kvcdc.version", "", "TiKV-CDC instance version") @@ -409,6 +413,9 @@ func 
populateDefaultOpt(flagSet *pflag.FlagSet) error { defaultInt(&options.PDTSO.Num, "pd.tso", 1) defaultStr(&options.PDTSO.BinPath, "pd.tso.binpath", options.PDTSO.BinPath) defaultStr(&options.PDTSO.ConfigPath, "pd.tso.config", options.PDTSO.ConfigPath) + defaultInt(&options.PDScheduling.Num, "pd.scheduling", 1) + defaultStr(&options.PDScheduling.BinPath, "pd.scheduling.binpath", options.PDScheduling.BinPath) + defaultStr(&options.PDScheduling.ConfigPath, "pd.scheduling.config", options.PDScheduling.ConfigPath) defaultInt(&options.PDRM.Num, "pd.rm", 1) defaultStr(&options.PDRM.BinPath, "pd.rm.binpath", options.PDRM.BinPath) defaultStr(&options.PDRM.ConfigPath, "pd.rm.config", options.PDRM.ConfigPath) @@ -419,28 +426,20 @@ func populateDefaultOpt(flagSet *pflag.FlagSet) error { return nil } -func tryConnect(dsn string) error { - cli, err := sql.Open("mysql", dsn) - if err != nil { - return err - } - defer cli.Close() - - conn, err := cli.Conn(context.Background()) +func tryConnect(addr string, timeoutSec int) error { + conn, err := net.DialTimeout("tcp", addr, time.Duration(timeoutSec)*time.Second) if err != nil { return err } defer conn.Close() - return nil } // checkDB check if the addr is connectable by getting a connection from sql.DB. timeout <=0 means no timeout func checkDB(dbAddr string, timeout int) bool { - dsn := fmt.Sprintf("root:@tcp(%s)/", dbAddr) if timeout > 0 { for i := 0; i < timeout; i++ { - if tryConnect(dsn) == nil { + if tryConnect(dbAddr, timeout) == nil { return true } time.Sleep(time.Second) @@ -448,7 +447,7 @@ func checkDB(dbAddr string, timeout int) bool { return false } for { - if err := tryConnect(dsn); err == nil { + if err := tryConnect(dbAddr, timeout); err == nil { return true } time.Sleep(time.Second) diff --git a/components/playground/monitor.go b/components/playground/monitor.go index f0cfd2bd45..3d17a7edae 100644 --- a/components/playground/monitor.go +++ b/components/playground/monitor.go @@ -29,24 +29,20 @@ import ( ) // ref: https://prometheus.io/docs/prometheus/latest/configuration/configuration/#file_sd_config -func (m *monitor) renderSDFile(cid2targets map[string][]string) error { - type Item struct { - Targets []string `json:"targets"` - Labels map[string]string `json:"labels"` - } - - cid2targets["prometheus"] = []string{utils.JoinHostPort(m.host, m.port)} +func (m *monitor) renderSDFile(cid2targets map[string]instance.MetricAddr) error { + cid2targets["prometheus"] = instance.MetricAddr{Targets: []string{utils.JoinHostPort(m.host, m.port)}} - var items []Item + var items []instance.MetricAddr - for id, targets := range cid2targets { - item := Item{ - Targets: targets, - Labels: map[string]string{ - "job": id, - }, + for id, t := range cid2targets { + it := instance.MetricAddr{ + Targets: t.Targets, + Labels: map[string]string{"job": id}, + } + for k, v := range t.Labels { + it.Labels[k] = v } - items = append(items, item) + items = append(items, it) } data, err := json.MarshalIndent(&items, "", "\t") diff --git a/components/playground/playground.go b/components/playground/playground.go index fc600fc21e..4df08b60af 100644 --- a/components/playground/playground.go +++ b/components/playground/playground.go @@ -61,6 +61,9 @@ type Playground struct { port int pds []*instance.PDInstance + tsos []*instance.PDInstance + schedulings []*instance.PDInstance + rms []*instance.PDInstance tikvs []*instance.TiKVInstance tidbs []*instance.TiDBInstance tiflashs []*instance.TiFlashInstance @@ -276,6 +279,7 @@ func (p *Playground) handleScaleIn(w io.Writer, pid int) 
error { switch cid { case spec.ComponentPD: + // microservice not support scale in temporarily for i := 0; i < len(p.pds); i++ { if p.pds[i].Pid() == pid { inst := p.pds[i] @@ -593,6 +597,24 @@ func (p *Playground) WalkInstances(fn func(componentID string, ins instance.Inst return err } } + for _, ins := range p.tsos { + err := fn(spec.ComponentPD, ins) + if err != nil { + return err + } + } + for _, ins := range p.schedulings { + err := fn(spec.ComponentPD, ins) + if err != nil { + return err + } + } + for _, ins := range p.rms { + err := fn(spec.ComponentPD, ins) + if err != nil { + return err + } + } for _, ins := range p.tikvs { err := fn(spec.ComponentTiKV, ins) if err != nil { @@ -698,8 +720,12 @@ func (p *Playground) addInstance(componentID string, pdRole instance.PDRole, tif pd.InitCluster(p.pds) } } - } else { - p.pds = append(p.pds, inst) + } else if pdRole == instance.PDRoleTSO { + p.tsos = append(p.tsos, inst) + } else if pdRole == instance.PDRoleScheduling { + p.schedulings = append(p.schedulings, inst) + } else if pdRole == instance.PDRoleResourceManager { + p.rms = append(p.rms, inst) } case spec.ComponentTiDB: inst := instance.NewTiDBInstance(cfg.BinPath, dir, host, cfg.ConfigPath, id, cfg.Port, p.pds, p.enableBinlog(), p.bootOptions.Mode == "tidb-disagg") @@ -866,6 +892,7 @@ func (p *Playground) bootCluster(ctx context.Context, env *environment.Environme &options.PD, &options.PDAPI, &options.PDTSO, + &options.PDScheduling, &options.PDRM, &options.TiProxy, &options.TiDB, @@ -975,6 +1002,7 @@ func (p *Playground) bootCluster(ctx context.Context, env *environment.Environme instances = append([]InstancePair{ {spec.ComponentPD, instance.PDRoleAPI, instance.TiFlashRoleNormal, options.PDAPI}, {spec.ComponentPD, instance.PDRoleTSO, instance.TiFlashRoleNormal, options.PDTSO}, + {spec.ComponentPD, instance.PDRoleScheduling, instance.TiFlashRoleNormal, options.PDScheduling}, {spec.ComponentPD, instance.PDRoleResourceManager, instance.TiFlashRoleNormal, options.PDRM}}, instances..., ) @@ -1080,25 +1108,31 @@ func (p *Playground) bootCluster(ctx context.Context, env *environment.Environme if p.bootOptions.Mode == "tikv-slim" { if p.bootOptions.PDMode == "ms" { var ( - tsoAddr []string - apiAddr []string - rmAddr []string + tsoAddr []string + apiAddr []string + rmAddr []string + schedulingAddr []string ) - for _, pd := range p.pds { - switch pd.Role { - case instance.PDRoleTSO: - tsoAddr = append(tsoAddr, pd.Addr()) - case instance.PDRoleAPI: - apiAddr = append(apiAddr, pd.Addr()) - case instance.PDRoleResourceManager: - rmAddr = append(rmAddr, pd.Addr()) - } + for _, api := range p.pds { + apiAddr = append(apiAddr, api.Addr()) } - fmt.Printf("PD TSO Endpoints: ") - colorCmd.Printf("%s\n", strings.Join(tsoAddr, ",")) + for _, tso := range p.tsos { + tsoAddr = append(tsoAddr, tso.Addr()) + } + for _, scheduling := range p.schedulings { + schedulingAddr = append(schedulingAddr, scheduling.Addr()) + } + for _, rm := range p.rms { + rmAddr = append(rmAddr, rm.Addr()) + } + fmt.Printf("PD API Endpoints: ") colorCmd.Printf("%s\n", strings.Join(apiAddr, ",")) - fmt.Printf("PD Resource Ranager Endpoints: ") + fmt.Printf("PD TSO Endpoints: ") + colorCmd.Printf("%s\n", strings.Join(tsoAddr, ",")) + fmt.Printf("PD Scheduling Endpoints: ") + colorCmd.Printf("%s\n", strings.Join(schedulingAddr, ",")) + fmt.Printf("PD Resource Manager Endpoints: ") colorCmd.Printf("%s\n", strings.Join(rmAddr, ",")) } else { var pdAddrs []string @@ -1232,6 +1266,21 @@ func (p *Playground) terminate(sig syscall.Signal) 
{ kill(inst.Component(), inst.Pid(), inst.Wait) } } + for _, inst := range p.tsos { + if inst.Process != nil && inst.Process.Cmd() != nil && inst.Process.Cmd().Process != nil { + kill(inst.Component(), inst.Pid(), inst.Wait) + } + } + for _, inst := range p.schedulings { + if inst.Process != nil && inst.Process.Cmd() != nil && inst.Process.Cmd().Process != nil { + kill(inst.Component(), inst.Pid(), inst.Wait) + } + } + for _, inst := range p.rms { + if inst.Process != nil && inst.Process.Cmd() != nil && inst.Process.Cmd().Process != nil { + kill(inst.Component(), inst.Pid(), inst.Wait) + } + } } func (p *Playground) renderSDFile() error { @@ -1240,12 +1289,10 @@ func (p *Playground) renderSDFile() error { return nil } - cid2targets := make(map[string][]string) + cid2targets := make(map[string]instance.MetricAddr) _ = p.WalkInstances(func(cid string, inst instance.Instance) error { - targets := cid2targets[cid] - targets = append(targets, inst.StatusAddrs()...) - cid2targets[cid] = targets + cid2targets[cid] = inst.MetricAddr() return nil }) diff --git a/embed/templates/config/prometheus.yml.tpl b/embed/templates/config/prometheus.yml.tpl index 10bef9607d..238eb179ab 100644 --- a/embed/templates/config/prometheus.yml.tpl +++ b/embed/templates/config/prometheus.yml.tpl @@ -109,6 +109,22 @@ scrape_configs: - targets: {{- range .TiDBStatusAddrs}} - '{{.}}' +{{- end}} + - job_name: "tiproxy" + honor_labels: true # don't overwrite job & instance labels + metrics_path: /api/metrics +{{- if .TLSEnabled}} + scheme: https + tls_config: + insecure_skip_verify: false + ca_file: ../tls/ca.crt + cert_file: ../tls/prometheus.crt + key_file: ../tls/prometheus.pem +{{- end}} + static_configs: + - targets: +{{- range .TiProxyStatusAddrs}} + - '{{.}}' {{- end}} - job_name: "tikv" honor_labels: true # don't overwrite job & instance labels diff --git a/embed/templates/scripts/run_tiproxy.sh.tpl b/embed/templates/scripts/run_tiproxy.sh.tpl new file mode 100644 index 0000000000..83362c9121 --- /dev/null +++ b/embed/templates/scripts/run_tiproxy.sh.tpl @@ -0,0 +1,14 @@ +#!/bin/bash +set -e + +# WARNING: This file was auto-generated. Do not edit! +# All your edit might be overwritten! 
+DEPLOY_DIR={{.DeployDir}} +cd "${DEPLOY_DIR}" || exit 1 + +{{- if .NumaNode}} +exec numactl --cpunodebind={{.NumaNode}} --membind={{.NumaNode}} bin/tiproxy \ +{{- else}} +exec bin/tiproxy \ +{{- end}} + --config conf/tiproxy.toml diff --git a/pkg/cluster/ansible/import_test.go b/pkg/cluster/ansible/import_test.go index 970bd68c53..8c1f0fe1b6 100644 --- a/pkg/cluster/ansible/import_test.go +++ b/pkg/cluster/ansible/import_test.go @@ -126,6 +126,7 @@ server_configs: pd: {} tidb_dashboard: {} tiflash: {} + tiproxy: {} tiflash-learner: {} pump: {} drainer: {} @@ -135,6 +136,7 @@ server_configs: tidb_servers: [] tikv_servers: [] tiflash_servers: [] +tiproxy_servers: [] pd_servers: [] monitoring_servers: [] `) diff --git a/pkg/cluster/ansible/test-data/meta.yaml b/pkg/cluster/ansible/test-data/meta.yaml index d5c1263cbb..e143d7cd72 100644 --- a/pkg/cluster/ansible/test-data/meta.yaml +++ b/pkg/cluster/ansible/test-data/meta.yaml @@ -21,6 +21,7 @@ topology: pd: {} tidb_dashboard: {} tiflash: {} + tiproxy: {} tiflash-learner: {} pump: {} drainer: {} @@ -193,3 +194,4 @@ topology: data_dir: data/alertmanager-9093 arch: amd64 os: linux + tiproxy_servers: [] diff --git a/pkg/cluster/api/tidbapi.go b/pkg/cluster/api/tidbapi.go new file mode 100644 index 0000000000..793ba16656 --- /dev/null +++ b/pkg/cluster/api/tidbapi.go @@ -0,0 +1,77 @@ +// Copyright 2022 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package api + +import ( + "context" + "crypto/tls" + "fmt" + "time" + + "github.com/pingcap/tiup/pkg/utils" +) + +// TiDBClient is a client for accessing the TiDB HTTP API +type TiDBClient struct { + urls []string + client *utils.HTTPClient + ctx context.Context +} + +// NewTiDBClient returns a `TiDBClient` +func NewTiDBClient(ctx context.Context, addresses []string, timeout time.Duration, tlsConfig *tls.Config) *TiDBClient { + httpPrefix := "http" + if tlsConfig != nil { + httpPrefix = "https" + } + urls := make([]string, 0, len(addresses)) + for _, addr := range addresses { + urls = append(urls, fmt.Sprintf("%s://%s", httpPrefix, addr)) + } + + return &TiDBClient{ + urls: urls, + client: utils.NewHTTPClient(timeout, tlsConfig), + ctx: ctx, + } +} + +func (c *TiDBClient) getEndpoints(api string) (endpoints []string) { + for _, url := range c.urls { + endpoints = append(endpoints, fmt.Sprintf("%s%s", url, api)) + } + return endpoints +} + +// StartUpgrade sends the start upgrade message to the TiDB server +func (c *TiDBClient) StartUpgrade() error { + api := "/upgrade/start" + endpoints := c.getEndpoints(api) + _, err := tryURLs(endpoints, func(endpoint string) ([]byte, error) { + return c.client.Post(c.ctx, endpoint, nil) + }) + + return err +} + +// FinishUpgrade sends the finish upgrade message to the TiDB server +func (c *TiDBClient) FinishUpgrade() error { + api := "/upgrade/finish" + endpoints := c.getEndpoints(api) + _, err := tryURLs(endpoints, func(endpoint string) ([]byte, error) { + return c.client.Post(c.ctx, endpoint, nil) + }) + + return err +} diff --git a/pkg/cluster/executor/executor.go b/pkg/cluster/executor/executor.go index a2c2f20aab..8cc867a991 100644 --- a/pkg/cluster/executor/executor.go +++ b/pkg/cluster/executor/executor.go @@ -118,6 +118,19 @@ func New(etype SSHType, sudo bool, c SSHConfig) (ctxt.Executor, error) { return &CheckPointExecutor{executor, &c}, nil } +// UnwarpCheckPointExecutor unwraps the CheckPointExecutor and returns the underlying executor +// +// Sometimes we just want to get the real output of a command, but the CheckPointExecutor +// always caches the output, which is a problem when the live output is needed.
+func UnwarpCheckPointExecutor(e ctxt.Executor) ctxt.Executor { + switch e := e.(type) { + case *CheckPointExecutor: + return e.Executor + default: + return e + } +} + func checkLocalIP(ip string) error { ifaces, err := net.Interfaces() if err != nil { diff --git a/pkg/cluster/manager/display.go b/pkg/cluster/manager/display.go index 8965b14bb0..d2f68990e6 100644 --- a/pkg/cluster/manager/display.go +++ b/pkg/cluster/manager/display.go @@ -50,6 +50,7 @@ type DisplayOption struct { ShowUptime bool ShowProcess bool ShowManageHost bool + ShowNuma bool } // InstInfo represents an instance info @@ -67,6 +68,8 @@ type InstInfo struct { Since string `json:"since"` DataDir string `json:"data_dir"` DeployDir string `json:"deploy_dir"` + NumaNode string `json:"numa_node"` + NumaCores string `json:"numa_cores"` ComponentName string Port int @@ -199,6 +202,10 @@ func (m *Manager) Display(dopt DisplayOption, opt operator.Options) error { if dopt.ShowUptime { rowHead = append(rowHead, "Since") } + if dopt.ShowNuma { + rowHead = append(rowHead, "Numa Node", "Numa Cores") + } + rowHead = append(rowHead, "Data Dir", "Deploy Dir") clusterTable = append(clusterTable, rowHead) @@ -225,6 +232,10 @@ func (m *Manager) Display(dopt DisplayOption, opt operator.Options) error { if dopt.ShowUptime { row = append(row, v.Since) } + if dopt.ShowNuma { + row = append(row, v.NumaNode, v.NumaCores) + } + row = append(row, v.DataDir, v.DeployDir) clusterTable = append(clusterTable, row) @@ -654,6 +665,8 @@ func (m *Manager) GetClusterTopology(dopt DisplayOption, opt operator.Options) ( ComponentName: ins.ComponentName(), Port: ins.GetPort(), Since: since, + NumaNode: utils.Ternary(ins.GetNumaNode() == "", "-", ins.GetNumaNode()).(string), + NumaCores: utils.Ternary(ins.GetNumaCores() == "", "-", ins.GetNumaCores()).(string), }) mu.Unlock() }, opt.Concurrency) diff --git a/pkg/cluster/manager/patch.go b/pkg/cluster/manager/patch.go index afbfa28a3d..64936ec952 100644 --- a/pkg/cluster/manager/patch.go +++ b/pkg/cluster/manager/patch.go @@ -104,7 +104,8 @@ func (m *Manager) Patch(name string, packagePath string, opt operator.Options, o if offline { return nil } - return operator.Upgrade(ctx, topo, opt, tlsCfg, base.Version) + // TBD: should patch be treated as an upgrade? + return operator.Upgrade(ctx, topo, opt, tlsCfg, base.Version, base.Version) }). Build() diff --git a/pkg/cluster/manager/reload.go b/pkg/cluster/manager/reload.go index 2a78e3fe41..226c66535e 100644 --- a/pkg/cluster/manager/reload.go +++ b/pkg/cluster/manager/reload.go @@ -124,7 +124,7 @@ func (m *Manager) Reload(name string, gOpt operator.Options, skipRestart, skipCo return err } b.Func("Upgrade Cluster", func(ctx context.Context) error { - return operator.Upgrade(ctx, topo, gOpt, tlsCfg, base.Version) + return operator.Upgrade(ctx, topo, gOpt, tlsCfg, base.Version, base.Version) }) } diff --git a/pkg/cluster/manager/upgrade.go b/pkg/cluster/manager/upgrade.go index 341f3601c9..9f40cb279f 100644 --- a/pkg/cluster/manager/upgrade.go +++ b/pkg/cluster/manager/upgrade.go @@ -38,7 +38,7 @@ import ( ) // Upgrade the cluster.
-func (m *Manager) Upgrade(name string, clusterVersion string, componentVersions map[string]string, opt operator.Options, skipConfirm, offline bool) error { +func (m *Manager) Upgrade(name string, clusterVersion string, componentVersions map[string]string, opt operator.Options, skipConfirm, offline, ignoreVersionCheck bool) error { if !skipConfirm && strings.ToLower(opt.DisplayMode) != "json" { for _, v := range componentVersions { if v != "" { @@ -81,7 +81,10 @@ func (m *Manager) Upgrade(name string, clusterVersion string, componentVersions ) if err := versionCompare(base.Version, clusterVersion); err != nil { - return err + if !ignoreVersionCheck { + return err + } + m.logger.Warnf(color.RedString("There is no guarantee that the cluster can be downgraded. Be careful before you continue.")) } if !skipConfirm { @@ -293,7 +296,7 @@ Do you want to continue? [y/N]:`, if offline { return nil } - return operator.Upgrade(ctx, topo, opt, tlsCfg, base.Version) + return operator.Upgrade(ctx, topo, opt, tlsCfg, base.Version, clusterVersion) }). Build() diff --git a/pkg/cluster/module/wait_for.go b/pkg/cluster/module/wait_for.go index 8a59104e1b..757b0f8fae 100644 --- a/pkg/cluster/module/wait_for.go +++ b/pkg/cluster/module/wait_for.go @@ -21,6 +21,7 @@ import ( "github.com/pingcap/errors" "github.com/pingcap/tiup/pkg/cluster/ctxt" + "github.com/pingcap/tiup/pkg/cluster/executor" "github.com/pingcap/tiup/pkg/utils" "go.uber.org/zap" ) @@ -71,7 +72,7 @@ func (w *WaitFor) Execute(ctx context.Context, e ctxt.Executor) (err error) { } if err := utils.Retry(func() error { // only listing TCP ports - stdout, _, err := e.Execute(ctx, "ss -ltn", false) + stdout, _, err := executor.UnwarpCheckPointExecutor(e).Execute(ctx, "ss -ltn", false) if err == nil { switch w.c.State { case "started": diff --git a/pkg/cluster/operation/upgrade.go b/pkg/cluster/operation/upgrade.go index a34731a9fb..5f4ab04b3f 100644 --- a/pkg/cluster/operation/upgrade.go +++ b/pkg/cluster/operation/upgrade.go @@ -39,13 +39,14 @@ var ( increaseLimitPoint = checkpoint.Register() ) -// Upgrade the cluster. +// Upgrade the cluster. (actually, it's rolling restart) func Upgrade( ctx context.Context, topo spec.Topology, options Options, tlsCfg *tls.Config, currentVersion string, + targetVersion string, ) error { roleFilter := set.NewStringSet(options.Roles...) nodeFilter := set.NewStringSet(options.Nodes...) 
@@ -70,6 +71,7 @@ func Upgrade( var origRegionScheduleLimit int var err error + var tidbClient *api.TiDBClient var pdEndpoints []string forcePDEndpoints := os.Getenv(EnvNamePDEndpointOverwrite) // custom set PD endpoint list @@ -100,6 +102,21 @@ func Upgrade( } }() } + case spec.ComponentTiDB: + dbs := topo.(*spec.Specification).TiDBServers + endpoints := []string{} + for _, db := range dbs { + endpoints = append(endpoints, utils.JoinHostPort(db.GetManageHost(), db.StatusPort)) + } + + if currentVersion != targetVersion && tidbver.TiDBSupportUpgradeAPI(currentVersion) && tidbver.TiDBSupportUpgradeAPI(targetVersion) { + tidbClient = api.NewTiDBClient(ctx, endpoints, 10*time.Second, tlsCfg) + err = tidbClient.StartUpgrade() + if err != nil { + return err + } + } + default: // do nothing, kept for future usage with other components } @@ -178,6 +195,19 @@ func Upgrade( return err } } + + switch component.Name() { + case spec.ComponentTiDB: + if currentVersion != targetVersion && tidbver.TiDBSupportUpgradeAPI(currentVersion) && tidbver.TiDBSupportUpgradeAPI(targetVersion) { + err = tidbClient.FinishUpgrade() + if err != nil { + return err + } + } + + default: + // do nothing, kept for future usage with other components + } } if topo.GetMonitoredOptions() == nil { diff --git a/pkg/cluster/spec/alertmanager.go b/pkg/cluster/spec/alertmanager.go index 075a2f70f6..a6ba3a617b 100644 --- a/pkg/cluster/spec/alertmanager.go +++ b/pkg/cluster/spec/alertmanager.go @@ -130,6 +130,8 @@ func (c *AlertManagerComponent) Instances() []Instance { ListenHost: s.ListenHost, Port: s.WebPort, SSHP: s.SSHPort, + NumaNode: s.NumaNode, + NumaCores: "", Ports: []int{ s.WebPort, diff --git a/pkg/cluster/spec/bindversion.go b/pkg/cluster/spec/bindversion.go index b20c24c9fb..1f27033adf 100644 --- a/pkg/cluster/spec/bindversion.go +++ b/pkg/cluster/spec/bindversion.go @@ -30,6 +30,8 @@ func TiDBComponentVersion(comp, version string) string { ComponentTiSpark, ComponentTiKVCDC: // TiKV-CDC use individual version. 
return "" + case ComponentTiProxy: + return "nightly" default: return version } diff --git a/pkg/cluster/spec/cdc.go b/pkg/cluster/spec/cdc.go index de818ddfbe..341627833c 100644 --- a/pkg/cluster/spec/cdc.go +++ b/pkg/cluster/spec/cdc.go @@ -140,6 +140,8 @@ func (c *CDCComponent) Instances() []Instance { Port: s.Port, SSHP: s.SSHPort, Source: s.GetSource(), + NumaNode: s.NumaNode, + NumaCores: "", Ports: []int{ s.Port, diff --git a/pkg/cluster/spec/dashboard.go b/pkg/cluster/spec/dashboard.go index 7c12b9d0a0..26c429e4b5 100644 --- a/pkg/cluster/spec/dashboard.go +++ b/pkg/cluster/spec/dashboard.go @@ -142,6 +142,8 @@ func (c *DashboardComponent) Instances() []Instance { Port: s.Port, SSHP: s.SSHPort, Source: s.Source, + NumaNode: s.NumaNode, + NumaCores: "", Ports: []int{ s.Port, diff --git a/pkg/cluster/spec/drainer.go b/pkg/cluster/spec/drainer.go index e81d284ccc..90d75dcba4 100644 --- a/pkg/cluster/spec/drainer.go +++ b/pkg/cluster/spec/drainer.go @@ -161,6 +161,8 @@ func (c *DrainerComponent) Instances() []Instance { Port: s.Port, SSHP: s.SSHPort, Source: s.GetSource(), + NumaNode: s.NumaNode, + NumaCores: "", Ports: []int{ s.Port, diff --git a/pkg/cluster/spec/grafana.go b/pkg/cluster/spec/grafana.go index 8ac3982944..29d9171fa0 100644 --- a/pkg/cluster/spec/grafana.go +++ b/pkg/cluster/spec/grafana.go @@ -137,6 +137,8 @@ func (c *GrafanaComponent) Instances() []Instance { ManageHost: s.ManageHost, Port: s.Port, SSHP: s.SSHPort, + NumaNode: "", + NumaCores: "", Ports: []int{ s.Port, @@ -197,7 +199,7 @@ func (i *GrafanaInstance) InitConfig( // transfer config spec := i.InstanceSpec.(*GrafanaSpec) - fp = filepath.Join(paths.Cache, fmt.Sprintf("grafana_%s.ini", i.GetHost())) + fp = filepath.Join(paths.Cache, fmt.Sprintf("grafana_%s_%d.ini", i.GetHost(), i.GetPort())) if err := config.NewGrafanaConfig(i.GetHost(), paths.Deploy). WithPort(uint64(i.GetPort())). WithUsername(spec.Username). 
diff --git a/pkg/cluster/spec/instance.go b/pkg/cluster/spec/instance.go index 6298456a3a..3281484a80 100644 --- a/pkg/cluster/spec/instance.go +++ b/pkg/cluster/spec/instance.go @@ -99,6 +99,8 @@ type Instance interface { GetManageHost() string GetPort() int GetSSHPort() int + GetNumaNode() string + GetNumaCores() string DeployDir() string UsedPorts() []int UsedDirs() []string @@ -148,6 +150,8 @@ type BaseInstance struct { Port int SSHP int Source string + NumaNode string + NumaCores string Ports []int Dirs []string @@ -371,6 +375,16 @@ func (i *BaseInstance) GetSSHPort() int { return i.SSHP } +// GetNumaNode implements Instance interface +func (i *BaseInstance) GetNumaNode() string { + return i.NumaNode +} + +// GetNumaCores implements Instance interface +func (i *BaseInstance) GetNumaCores() string { + return i.NumaCores +} + // DeployDir implements Instance interface func (i *BaseInstance) DeployDir() string { return reflect.Indirect(reflect.ValueOf(i.InstanceSpec)).FieldByName("DeployDir").String() diff --git a/pkg/cluster/spec/monitoring.go b/pkg/cluster/spec/monitoring.go index 1580bdd645..1039b96fd1 100644 --- a/pkg/cluster/spec/monitoring.go +++ b/pkg/cluster/spec/monitoring.go @@ -153,6 +153,8 @@ func (c *MonitorComponent) Instances() []Instance { ManageHost: s.ManageHost, Port: s.Port, SSHP: s.SSHPort, + NumaNode: s.NumaNode, + NumaCores: "", Ports: []int{ s.Port, @@ -263,6 +265,13 @@ func (i *MonitorInstance) InitConfig( cfig.AddTiDB(db.Host, uint64(db.StatusPort)) } } + if servers, found := topoHasField("TiProxyServers"); found { + for i := 0; i < servers.Len(); i++ { + db := servers.Index(i).Interface().(*TiProxySpec) + uniqueHosts.Insert(db.Host) + cfig.AddTiProxy(db.Host, uint64(db.StatusPort)) + } + } if servers, found := topoHasField("TiFlashServers"); found { for i := 0; i < servers.Len(); i++ { flash := servers.Index(i).Interface().(*TiFlashSpec) diff --git a/pkg/cluster/spec/pd.go b/pkg/cluster/spec/pd.go index ad01761d25..4671f3794d 100644 --- a/pkg/cluster/spec/pd.go +++ b/pkg/cluster/spec/pd.go @@ -189,6 +189,8 @@ func (c *PDComponent) Instances() []Instance { Port: s.ClientPort, SSHP: s.SSHPort, Source: s.GetSource(), + NumaNode: s.NumaNode, + NumaCores: "", Ports: []int{ s.ClientPort, diff --git a/pkg/cluster/spec/pump.go b/pkg/cluster/spec/pump.go index a7b6122d9b..42ac0a1f42 100644 --- a/pkg/cluster/spec/pump.go +++ b/pkg/cluster/spec/pump.go @@ -160,6 +160,8 @@ func (c *PumpComponent) Instances() []Instance { Port: s.Port, SSHP: s.SSHPort, Source: s.GetSource(), + NumaNode: s.NumaNode, + NumaCores: "", Ports: []int{ s.Port, diff --git a/pkg/cluster/spec/spec.go b/pkg/cluster/spec/spec.go index d0a7089973..a8b1cfc708 100644 --- a/pkg/cluster/spec/spec.go +++ b/pkg/cluster/spec/spec.go @@ -111,6 +111,7 @@ type ( PD map[string]any `yaml:"pd"` Dashboard map[string]any `yaml:"tidb_dashboard"` TiFlash map[string]any `yaml:"tiflash"` + TiProxy map[string]any `yaml:"tiproxy"` TiFlashLearner map[string]any `yaml:"tiflash-learner"` Pump map[string]any `yaml:"pump"` Drainer map[string]any `yaml:"drainer"` @@ -130,6 +131,7 @@ type ( Drainer string `yaml:"drainer"` CDC string `yaml:"cdc"` TiKVCDC string `yaml:"kvcdc"` + TiProxy string `yaml:"tiproxy"` Prometheus string `yaml:"prometheus"` Grafana string `yaml:"grafana"` AlertManager string `yaml:"alertmanager"` @@ -146,6 +148,7 @@ type ( TiDBServers []*TiDBSpec `yaml:"tidb_servers"` TiKVServers []*TiKVSpec `yaml:"tikv_servers"` TiFlashServers []*TiFlashSpec `yaml:"tiflash_servers"` + TiProxyServers []*TiProxySpec 
`yaml:"tiproxy_servers"` PDServers []*PDSpec `yaml:"pd_servers"` DashboardServers []*DashboardSpec `yaml:"tidb_dashboard_servers,omitempty"` PumpServers []*PumpSpec `yaml:"pump_servers,omitempty"` @@ -529,6 +532,7 @@ func (s *Specification) Merge(that Topology) Topology { PDServers: append(s.PDServers, spec.PDServers...), DashboardServers: append(s.DashboardServers, spec.DashboardServers...), TiFlashServers: append(s.TiFlashServers, spec.TiFlashServers...), + TiProxyServers: append(s.TiProxyServers, spec.TiProxyServers...), PumpServers: append(s.PumpServers, spec.PumpServers...), Drainers: append(s.Drainers, spec.Drainers...), CDCServers: append(s.CDCServers, spec.CDCServers...), @@ -737,9 +741,10 @@ func (s *Specification) ComponentsByStopOrder() (comps []Component) { // ComponentsByStartOrder return component in the order need to start. func (s *Specification) ComponentsByStartOrder() (comps []Component) { - // "pd", "dashboard", "tikv", "pump", "tidb", "tiflash", "drainer", "cdc", "tikv-cdc", "prometheus", "grafana", "alertmanager" + // "pd", "dashboard", "tiproxy", "tikv", "pump", "tidb", "tiflash", "drainer", "cdc", "tikv-cdc", "prometheus", "grafana", "alertmanager" comps = append(comps, &PDComponent{s}) comps = append(comps, &DashboardComponent{s}) + comps = append(comps, &TiProxyComponent{s}) comps = append(comps, &TiKVComponent{s}) comps = append(comps, &PumpComponent{s}) comps = append(comps, &TiDBComponent{s}) @@ -760,13 +765,14 @@ func (s *Specification) ComponentsByUpdateOrder(curVer string) (comps []Componen // Ref: https://github.com/pingcap/tiup/issues/2166 cdcUpgradeBeforePDTiKVTiDB := tidbver.TiCDCUpgradeBeforePDTiKVTiDB(curVer) - // "tiflash", <"cdc">, "pd", "dashboard", "tikv", "pump", "tidb", "drainer", <"cdc>", "prometheus", "grafana", "alertmanager" + // "tiflash", <"cdc">, "pd", "dashboard", "tiproxy", "tikv", "pump", "tidb", "drainer", <"cdc>", "prometheus", "grafana", "alertmanager" comps = append(comps, &TiFlashComponent{s}) if cdcUpgradeBeforePDTiKVTiDB { comps = append(comps, &CDCComponent{s}) } comps = append(comps, &PDComponent{s}) comps = append(comps, &DashboardComponent{s}) + comps = append(comps, &TiProxyComponent{s}) comps = append(comps, &TiKVComponent{s}) comps = append(comps, &PumpComponent{s}) comps = append(comps, &TiDBComponent{s}) diff --git a/pkg/cluster/spec/tidb.go b/pkg/cluster/spec/tidb.go index e94fe21011..6e5ef550a5 100644 --- a/pkg/cluster/spec/tidb.go +++ b/pkg/cluster/spec/tidb.go @@ -138,6 +138,8 @@ func (c *TiDBComponent) Instances() []Instance { Port: s.Port, SSHP: s.SSHPort, Source: s.Source, + NumaNode: s.NumaNode, + NumaCores: s.NumaCores, Ports: []int{ s.Port, diff --git a/pkg/cluster/spec/tiflash.go b/pkg/cluster/spec/tiflash.go index 13d7f1696a..7dafc28ad4 100644 --- a/pkg/cluster/spec/tiflash.go +++ b/pkg/cluster/spec/tiflash.go @@ -308,6 +308,8 @@ func (c *TiFlashComponent) Instances() []Instance { Port: s.GetMainPort(), SSHP: s.SSHPort, Source: s.GetSource(), + NumaNode: s.NumaNode, + NumaCores: s.NumaCores, Ports: []int{ s.TCPPort, diff --git a/pkg/cluster/spec/tikv.go b/pkg/cluster/spec/tikv.go index 8fd4d511d0..089d1cb9bf 100644 --- a/pkg/cluster/spec/tikv.go +++ b/pkg/cluster/spec/tikv.go @@ -213,6 +213,8 @@ func (c *TiKVComponent) Instances() []Instance { Port: s.Port, SSHP: s.SSHPort, Source: s.Source, + NumaNode: s.NumaNode, + NumaCores: s.NumaCores, Ports: []int{ s.Port, diff --git a/pkg/cluster/spec/tikv_cdc.go b/pkg/cluster/spec/tikv_cdc.go index 51301e09c1..1247178216 100644 --- a/pkg/cluster/spec/tikv_cdc.go +++ 
b/pkg/cluster/spec/tikv_cdc.go @@ -138,6 +138,8 @@ func (c *TiKVCDCComponent) Instances() []Instance { Port: s.Port, SSHP: s.SSHPort, Source: s.GetSource(), + NumaNode: s.NumaNode, + NumaCores: "", Ports: []int{ s.Port, diff --git a/pkg/cluster/spec/tiproxy.go b/pkg/cluster/spec/tiproxy.go new file mode 100644 index 0000000000..9cfdc56104 --- /dev/null +++ b/pkg/cluster/spec/tiproxy.go @@ -0,0 +1,329 @@ +// Copyright 2023 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package spec + +import ( + "bytes" + "context" + "crypto/tls" + "fmt" + "path/filepath" + "strings" + "time" + + "github.com/pingcap/tiup/pkg/cluster/ctxt" + "github.com/pingcap/tiup/pkg/cluster/template/scripts" + "github.com/pingcap/tiup/pkg/meta" + "github.com/pingcap/tiup/pkg/utils" + "github.com/prometheus/common/expfmt" +) + +func proxyUptimeByHost(host string, port int, timeout time.Duration, tlsCfg *tls.Config) time.Duration { + if timeout < time.Second { + timeout = statusQueryTimeout + } + + scheme := "http" + if tlsCfg != nil { + scheme = "https" + } + url := fmt.Sprintf("%s://%s/api/metrics", scheme, utils.JoinHostPort(host, port)) + + client := utils.NewHTTPClient(timeout, tlsCfg) + + body, err := client.Get(context.TODO(), url) + if err != nil || body == nil { + return 0 + } + + var parser expfmt.TextParser + reader := bytes.NewReader(body) + mf, err := parser.TextToMetricFamilies(reader) + if err != nil { + return 0 + } + + now := time.Now() + for k, v := range mf { + if k == promMetricStartTimeSeconds { + ms := v.GetMetric() + if len(ms) >= 1 { + startTime := ms[0].Gauge.GetValue() + return now.Sub(time.Unix(int64(startTime), 0)) + } + return 0 + } + } + + return 0 +} + +// TiProxySpec represents the TiProxy topology specification in topology.yaml +type TiProxySpec struct { + Host string `yaml:"host"` + ManageHost string `yaml:"manage_host,omitempty" validate:"manage_host:editable"` + SSHPort int `yaml:"ssh_port,omitempty" validate:"ssh_port:editable"` + Port int `yaml:"port" default:"6000"` + StatusPort int `yaml:"status_port" default:"3080"` + DeployDir string `yaml:"deploy_dir,omitempty"` + NumaNode string `yaml:"numa_node,omitempty" validate:"numa_node:editable"` + Config map[string]any `yaml:"config,omitempty" validate:"config:ignore"` + Arch string `yaml:"arch,omitempty"` + OS string `yaml:"os,omitempty"` +} + +// Role returns the component role of the instance +func (s *TiProxySpec) Role() string { + return ComponentTiProxy +} + +// SSH returns the host and SSH port of the instance +func (s *TiProxySpec) SSH() (string, int) { + return s.Host, s.SSHPort +} + +// GetMainPort returns the main port of the instance +func (s *TiProxySpec) GetMainPort() int { + return s.Port +} + +// GetManageHost returns the manage host of the instance +func (s *TiProxySpec) GetManageHost() string { + if s.ManageHost != "" { + return s.ManageHost + } + return s.Host +} + +// IsImported returns if the node is imported from TiDB-Ansible +func (s *TiProxySpec) IsImported() bool { + return false +} + +// IgnoreMonitorAgent returns if the node does not have monitor agents 
available +func (s *TiProxySpec) IgnoreMonitorAgent() bool { + return false +} + +// TiProxyComponent represents TiProxy component. +type TiProxyComponent struct{ Topology *Specification } + +// Name implements Component interface. +func (c *TiProxyComponent) Name() string { + return ComponentTiProxy +} + +// Role implements Component interface. +func (c *TiProxyComponent) Role() string { + return ComponentTiProxy +} + +// CalculateVersion implements the Component interface +func (c *TiProxyComponent) CalculateVersion(clusterVersion string) string { + // always not follow global version, use ""(latest) by default + version := c.Topology.ComponentVersions.TiProxy + if version != "" { + return version + } + return "nightly" +} + +// SetVersion implements Component interface. +func (c *TiProxyComponent) SetVersion(version string) { + c.Topology.ComponentVersions.TiProxy = version +} + +// Instances implements Component interface. +func (c *TiProxyComponent) Instances() []Instance { + ins := make([]Instance, 0, len(c.Topology.TiProxyServers)) + for _, s := range c.Topology.TiProxyServers { + s := s + instance := &TiProxyInstance{BaseInstance{ + InstanceSpec: s, + Name: c.Name(), + Host: s.Host, + ManageHost: s.ManageHost, + Port: s.Port, + SSHP: s.SSHPort, + Source: ComponentTiProxy, + NumaNode: s.NumaNode, + NumaCores: "", + Ports: []int{ + s.Port, + s.StatusPort, + }, + Dirs: []string{ + s.DeployDir, + }, + StatusFn: func(_ context.Context, timeout time.Duration, tlsCfg *tls.Config, _ ...string) string { + return statusByHost(s.Host, s.StatusPort, "/api/debug/health", timeout, tlsCfg) + }, + UptimeFn: func(_ context.Context, timeout time.Duration, tlsCfg *tls.Config) time.Duration { + return proxyUptimeByHost(s.Host, s.StatusPort, timeout, tlsCfg) + }, + }, c.Topology} + + ins = append(ins, instance) + } + return ins +} + +// TiProxyInstance represent the TiProxy instance. +type TiProxyInstance struct { + BaseInstance + topo Topology +} + +// ScaleConfig deploy temporary config on scaling +func (i *TiProxyInstance) ScaleConfig( + ctx context.Context, + e ctxt.Executor, + topo Topology, + clusterName, + clusterVersion, + user string, + paths meta.DirPaths, +) error { + s := i.topo + defer func() { + i.topo = s + }() + i.topo = mustBeClusterTopo(topo) + + return i.InitConfig(ctx, e, clusterName, clusterVersion, user, paths) +} + +func (i *TiProxyInstance) checkConfig( + cfg map[string]any, + paths meta.DirPaths, +) map[string]any { + topo := i.topo.(*Specification) + spec := i.InstanceSpec.(*TiProxySpec) + enableTLS := topo.GlobalOptions.TLSEnabled + + if cfg == nil { + cfg = make(map[string]any) + } + + pds := []string{} + for _, pdspec := range topo.PDServers { + pds = append(pds, pdspec.GetAdvertiseClientURL(enableTLS)) + } + cfg["proxy.pd-addrs"] = strings.Join(pds, ",") + cfg["proxy.require-backend-tls"] = false + cfg["proxy.addr"] = utils.JoinHostPort(i.GetListenHost(), i.GetPort()) + cfg["api.addr"] = utils.JoinHostPort(i.GetListenHost(), spec.StatusPort) + cfg["log.log-file.filename"] = filepath.Join(paths.Log, "tiproxy.log") + + return cfg +} + +// InitConfig implements Instance interface. 
+func (i *TiProxyInstance) InitConfig( + ctx context.Context, + e ctxt.Executor, + clusterName, + clusterVersion, + deployUser string, + paths meta.DirPaths, +) error { + topo := i.topo.(*Specification) + if err := i.BaseInstance.InitConfig(ctx, e, topo.GlobalOptions, deployUser, paths); err != nil { + return err + } + spec := i.InstanceSpec.(*TiProxySpec) + globalConfig := topo.ServerConfigs.TiProxy + instanceConfig := i.checkConfig(spec.Config, paths) + + cfg := &scripts.TiProxyScript{ + DeployDir: paths.Deploy, + NumaNode: spec.NumaNode, + } + + fp := filepath.Join(paths.Cache, fmt.Sprintf("run_tiproxy_%s_%d.sh", i.GetHost(), i.GetPort())) + + if err := cfg.ConfigToFile(fp); err != nil { + return err + } + dst := filepath.Join(paths.Deploy, "scripts", "run_tiproxy.sh") + if err := e.Transfer(ctx, fp, dst, false, 0, false); err != nil { + return err + } + + if _, _, err := e.Execute(ctx, "chmod +x "+dst, false); err != nil { + return err + } + + var err error + instanceConfig, err = i.setTLSConfig(ctx, false, instanceConfig, paths) + if err != nil { + return err + } + + return i.MergeServerConfig(ctx, e, globalConfig, instanceConfig, paths) +} + +// setTLSConfig set TLS Config to support enable/disable TLS +func (i *TiProxyInstance) setTLSConfig(ctx context.Context, enableTLS bool, configs map[string]any, paths meta.DirPaths) (map[string]any, error) { + if configs == nil { + configs = make(map[string]any) + } + if enableTLS { + configs["security.cluster-tls.ca"] = fmt.Sprintf("%s/tls/%s", paths.Deploy, TLSCACert) + configs["security.cluster-tls.cert"] = fmt.Sprintf("%s/tls/%s.crt", paths.Deploy, i.Role()) + configs["security.cluster-tls.key"] = fmt.Sprintf("%s/tls/%s.pem", paths.Deploy, i.Role()) + + configs["security.server-tls.ca"] = fmt.Sprintf("%s/tls/%s", paths.Deploy, TLSCACert) + configs["security.server-tls.cert"] = fmt.Sprintf("%s/tls/%s.crt", paths.Deploy, i.Role()) + configs["security.server-tls.key"] = fmt.Sprintf("%s/tls/%s.pem", paths.Deploy, i.Role()) + configs["security.server-tls.skip-ca"] = true + + configs["security.sql-tls.ca"] = fmt.Sprintf("%s/tls/%s", paths.Deploy, TLSCACert) + } else { + // tiproxy tls config list + tlsConfigs := []string{ + "security.cluster-tls.ca", + "security.cluster-tls.cert", + "security.cluster-tls.key", + "security.server-tls.ca", + "security.server-tls.cert", + "security.server-tls.key", + "security.server-tls.skip-ca", + "security.sql-tls.ca", + } + // delete TLS configs + for _, config := range tlsConfigs { + delete(configs, config) + } + } + + return nil, nil +} + +var _ RollingUpdateInstance = &TiProxyInstance{} + +// GetAddr return the address of this TiProxy instance +func (i *TiProxyInstance) GetAddr() string { + return utils.JoinHostPort(i.GetHost(), i.GetPort()) +} + +// PreRestart implements RollingUpdateInstance interface. +func (i *TiProxyInstance) PreRestart(ctx context.Context, topo Topology, apiTimeoutSeconds int, tlsCfg *tls.Config) error { + return nil +} + +// PostRestart implements RollingUpdateInstance interface.
+func (i *TiProxyInstance) PostRestart(ctx context.Context, topo Topology, tlsCfg *tls.Config) error { + return nil +} diff --git a/pkg/cluster/spec/tispark.go b/pkg/cluster/spec/tispark.go index fac2306c91..59aea16985 100644 --- a/pkg/cluster/spec/tispark.go +++ b/pkg/cluster/spec/tispark.go @@ -164,6 +164,8 @@ func (c *TiSparkMasterComponent) Instances() []Instance { Host: s.Host, Port: s.Port, SSHP: s.SSHPort, + NumaNode: "", + NumaCores: "", Ports: []int{ s.Port, @@ -354,6 +356,8 @@ func (c *TiSparkWorkerComponent) Instances() []Instance { Host: s.Host, Port: s.Port, SSHP: s.SSHPort, + NumaNode: "", + NumaCores: "", Ports: []int{ s.Port, diff --git a/pkg/cluster/task/update_meta.go b/pkg/cluster/task/update_meta.go index 8e9e3d3e6d..dd6457e1b9 100644 --- a/pkg/cluster/task/update_meta.go +++ b/pkg/cluster/task/update_meta.go @@ -72,6 +72,15 @@ func (u *UpdateMeta) Execute(ctx context.Context) error { } newMeta.Topology.PDServers = pdServers + tiproxyServers := make([]*spec.TiProxySpec, 0) + for i, instance := range (&spec.TiProxyComponent{Topology: topo}).Instances() { + if deleted.Exist(instance.ID()) { + continue + } + tiproxyServers = append(tiproxyServers, topo.TiProxyServers[i]) + } + newMeta.Topology.TiProxyServers = tiproxyServers + dashboardServers := make([]*spec.DashboardSpec, 0) for i, instance := range (&spec.DashboardComponent{Topology: topo}).Instances() { if deleted.Exist(instance.ID()) { @@ -79,7 +88,7 @@ func (u *UpdateMeta) Execute(ctx context.Context) error { } dashboardServers = append(dashboardServers, topo.DashboardServers[i]) } - topo.DashboardServers = dashboardServers + newMeta.Topology.DashboardServers = dashboardServers tiflashServers := make([]*spec.TiFlashSpec, 0) for i, instance := range (&spec.TiFlashComponent{Topology: topo}).Instances() { diff --git a/pkg/cluster/template/config/prometheus.go b/pkg/cluster/template/config/prometheus.go index eacef47238..90d6b00051 100644 --- a/pkg/cluster/template/config/prometheus.go +++ b/pkg/cluster/template/config/prometheus.go @@ -31,6 +31,7 @@ type PrometheusConfig struct { TLSEnabled bool NodeExporterAddrs []string TiDBStatusAddrs []string + TiProxyStatusAddrs []string TiKVStatusAddrs []string PDAddrs []string TiFlashStatusAddrs []string @@ -79,6 +80,12 @@ func (c *PrometheusConfig) AddTiDB(ip string, port uint64) *PrometheusConfig { return c } +// AddTiProxy add a TiProxy address +func (c *PrometheusConfig) AddTiProxy(ip string, port uint64) *PrometheusConfig { + c.TiProxyStatusAddrs = append(c.TiProxyStatusAddrs, utils.JoinHostPort(ip, int(port))) + return c +} + // AddTiKV add a TiKV address func (c *PrometheusConfig) AddTiKV(ip string, port uint64) *PrometheusConfig { c.TiKVStatusAddrs = append(c.TiKVStatusAddrs, utils.JoinHostPort(ip, int(port))) diff --git a/pkg/cluster/template/scripts/tiproxy.go b/pkg/cluster/template/scripts/tiproxy.go new file mode 100644 index 0000000000..d0bd8d5980 --- /dev/null +++ b/pkg/cluster/template/scripts/tiproxy.go @@ -0,0 +1,49 @@ +// Copyright 2023 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package scripts + +import ( + "bytes" + "path" + "text/template" + + "github.com/pingcap/tiup/embed" + "github.com/pingcap/tiup/pkg/utils" +) + +// TiProxyScript represent the data to generate tiproxy config +type TiProxyScript struct { + DeployDir string + NumaNode string +} + +// ConfigToFile write config content to specific file. +func (c *TiProxyScript) ConfigToFile(file string) error { + fp := path.Join("templates", "scripts", "run_tiproxy.sh.tpl") + tpl, err := embed.ReadTemplate(fp) + if err != nil { + return err + } + tmpl, err := template.New("TiProxy").Parse(string(tpl)) + if err != nil { + return err + } + + content := bytes.NewBufferString("") + if err := tmpl.Execute(content, c); err != nil { + return err + } + + return utils.WriteFile(file, content.Bytes(), 0755) +} diff --git a/pkg/repository/mirror.go b/pkg/repository/mirror.go index f88cc75468..7fb446f704 100644 --- a/pkg/repository/mirror.go +++ b/pkg/repository/mirror.go @@ -15,6 +15,7 @@ package repository import ( "bytes" + "crypto/tls" "encoding/json" stderrors "errors" "fmt" @@ -293,6 +294,14 @@ func (l *httpMirror) downloadFile(url string, to string, maxSize int64) (io.Read }(time.Now()) client := grab.NewClient() + + // workaround to resolve cdn error "tls: protocol version not supported" + client.HTTPClient.(*http.Client).Transport = &http.Transport{ + Proxy: http.ProxyFromEnvironment, + // avoid using http/2 by setting non-nil TLSClientConfig + TLSClientConfig: &tls.Config{}, + } + client.UserAgent = fmt.Sprintf("tiup/%s", version.NewTiUPVersion().SemVer()) req, err := grab.NewRequest(to, url) if err != nil { @@ -326,6 +335,7 @@ L: } progress.SetCurrent(resp.BytesComplete()) case <-resp.Done: + progress.SetCurrent(resp.BytesComplete()) progress.Finish() break L } diff --git a/pkg/repository/progress.go b/pkg/repository/progress.go index 951dc99a88..f8df7e2cdf 100644 --- a/pkg/repository/progress.go +++ b/pkg/repository/progress.go @@ -52,6 +52,5 @@ func (p *ProgressBar) SetCurrent(size int64) { // Finish implement the DownloadProgress interface func (p *ProgressBar) Finish() { - p.bar.SetCurrent(p.size) p.bar.Finish() } diff --git a/pkg/tidbver/tidbver.go b/pkg/tidbver/tidbver.go index bfa169899e..eae857f78e 100644 --- a/pkg/tidbver/tidbver.go +++ b/pkg/tidbver/tidbver.go @@ -26,6 +26,13 @@ func TiDBSupportSecureBoot(version string) bool { return semver.Compare(version, "v5.3.0") >= 0 || strings.Contains(version, "nightly") } +// TiDBSupportUpgradeAPI return if given version of TiDB support upgrade API +func TiDBSupportUpgradeAPI(version string) bool { + return semver.Compare(version, "v7.4.0") >= 0 || + (semver.MajorMinor(version) == "v7.1" && semver.Compare(version, "v7.1.2") >= 0) || + strings.Contains(version, "nightly") +} + // TiKVSupportAdvertiseStatusAddr return if given version of TiKV support --advertise-status-addr func TiKVSupportAdvertiseStatusAddr(version string) bool { // TiKV support --advertise-status-addr since v4.0.1 diff --git a/pkg/utils/retry.go b/pkg/utils/retry.go index 0a6c65a409..a958b8c0b2 100644 --- a/pkg/utils/retry.go +++ b/pkg/utils/retry.go @@ -77,8 +77,9 @@ func Retry(doFunc func() error, opts ...RetryOption) error { // call the function var attemptCount int64 + var err error for attemptCount = 0; attemptCount < cfg.Attempts; attemptCount++ { - if err := doFunc(); err == nil { + if err = doFunc(); err == nil { return nil } @@ -91,7 +92,7 @@ func Retry(doFunc func() error, opts ...RetryOption) error { } } - return fmt.Errorf("operation exceeds the max retry attempts of %d", 
cfg.Attempts) + return fmt.Errorf("operation exceeds the max retry attempts of %d. error of last attempt: %s", cfg.Attempts, err) } // IsTimeoutOrMaxRetry return true if it's timeout or reach max retry. diff --git a/pkg/version/version.go b/pkg/version/version.go index 07dd9d7118..8645e63060 100644 --- a/pkg/version/version.go +++ b/pkg/version/version.go @@ -23,7 +23,7 @@ var ( // TiUPVerMinor is the minor version of TiUP TiUPVerMinor = 13 // TiUPVerPatch is the patch version of TiUP - TiUPVerPatch = 0 + TiUPVerPatch = 1 // TiUPVerName is an alternative name of the version TiUPVerName = "tiup" // GitHash is the current git commit hash
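
Note on the playground monitoring change: the new `MetricAddr` type feeds Prometheus file_sd discovery, where `renderSDFile` turns each component ID into a `job` label and merges any instance-supplied labels (such as TiProxy's `__metrics_path__`). The following is a minimal standalone sketch of the resulting JSON shape; the localhost targets and ports are illustrative defaults, not values taken from this patch.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// MetricAddr mirrors the struct added in components/playground/instance/instance.go.
type MetricAddr struct {
	Targets []string          `json:"targets"`
	Labels  map[string]string `json:"labels"`
}

func main() {
	// One entry per component, as renderSDFile builds it: the component ID becomes
	// the "job" label, and labels returned by the instance (e.g. TiProxy's custom
	// metrics path) are merged on top. Hosts/ports here are illustrative only.
	items := []MetricAddr{
		{Targets: []string{"127.0.0.1:10080"}, Labels: map[string]string{"job": "tidb"}},
		{Targets: []string{"127.0.0.1:3080"}, Labels: map[string]string{
			"job":              "tiproxy",
			"__metrics_path__": "/api/metrics",
		}},
	}
	out, _ := json.MarshalIndent(items, "", "\t")
	fmt.Println(string(out))
}
```

Running the sketch prints a two-entry target list; Prometheus would scrape the second target at `/api/metrics` instead of the default `/metrics`, which is why TiProxy overrides `MetricAddr` instead of relying on the base implementation.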