From 9d1ba4b8357a1b34d02556cbd77236de1df1b1dc Mon Sep 17 00:00:00 2001 From: lastincisor <64511824+lastincisor@users.noreply.github.com> Date: Fri, 15 Sep 2023 17:12:09 +0800 Subject: [PATCH 01/14] grafana for Multiple instances of the same host (#2277) --- pkg/cluster/spec/grafana.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/cluster/spec/grafana.go b/pkg/cluster/spec/grafana.go index ae6b39f817..d4f1824444 100644 --- a/pkg/cluster/spec/grafana.go +++ b/pkg/cluster/spec/grafana.go @@ -181,7 +181,7 @@ func (i *GrafanaInstance) InitConfig( // transfer config spec := i.InstanceSpec.(*GrafanaSpec) - fp = filepath.Join(paths.Cache, fmt.Sprintf("grafana_%s.ini", i.GetHost())) + fp = filepath.Join(paths.Cache, fmt.Sprintf("grafana_%s_%d.ini", i.GetHost(), i.GetPort())) if err := config.NewGrafanaConfig(i.GetHost(), paths.Deploy). WithPort(uint64(i.GetPort())). WithUsername(spec.Username). From 456f37af225e3918cfb48955236e8285a81ef86c Mon Sep 17 00:00:00 2001 From: nexustar Date: Mon, 25 Sep 2023 15:28:46 +0800 Subject: [PATCH 02/14] add cdn workaround (#2285) --- pkg/repository/mirror.go | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/pkg/repository/mirror.go b/pkg/repository/mirror.go index f88cc75468..89d037c5de 100644 --- a/pkg/repository/mirror.go +++ b/pkg/repository/mirror.go @@ -15,6 +15,7 @@ package repository import ( "bytes" + "crypto/tls" "encoding/json" stderrors "errors" "fmt" @@ -293,6 +294,14 @@ func (l *httpMirror) downloadFile(url string, to string, maxSize int64) (io.Read }(time.Now()) client := grab.NewClient() + + // workaround to resolve cdn error "tls: protocol version not supported" + client.HTTPClient.(*http.Client).Transport = &http.Transport{ + Proxy: http.ProxyFromEnvironment, + // avoid using http/2 by setting non-nil TLSClientConfig + TLSClientConfig: &tls.Config{}, + } + client.UserAgent = fmt.Sprintf("tiup/%s", version.NewTiUPVersion().SemVer()) req, err := grab.NewRequest(to, url) if err != nil { From f3b080677ac73e0ce97574224bdc5c11142e9a3d Mon Sep 17 00:00:00 2001 From: nexustar Date: Mon, 25 Sep 2023 16:40:46 +0800 Subject: [PATCH 03/14] cluster/dm: support ignore version check when upgrade (#2282) --- components/cluster/command/upgrade.go | 4 +++- components/dm/command/upgrade.go | 5 +++-- pkg/cluster/manager/upgrade.go | 7 +++++-- 3 files changed, 11 insertions(+), 5 deletions(-) diff --git a/components/cluster/command/upgrade.go b/components/cluster/command/upgrade.go index 2afe973a07..6ee3941e65 100644 --- a/components/cluster/command/upgrade.go +++ b/components/cluster/command/upgrade.go @@ -20,6 +20,7 @@ import ( func newUpgradeCmd() *cobra.Command { offlineMode := false + ignoreVersionCheck := false cmd := &cobra.Command{ Use: "upgrade ", @@ -38,7 +39,7 @@ func newUpgradeCmd() *cobra.Command { teleCommand = append(teleCommand, scrubClusterName(clusterName)) teleCommand = append(teleCommand, version) - return cm.Upgrade(clusterName, version, gOpt, skipConfirm, offlineMode) + return cm.Upgrade(clusterName, version, gOpt, skipConfirm, offlineMode, ignoreVersionCheck) }, ValidArgsFunction: func(cmd *cobra.Command, args []string, toComplete string) ([]string, cobra.ShellCompDirective) { switch len(args) { @@ -53,6 +54,7 @@ func newUpgradeCmd() *cobra.Command { cmd.Flags().Uint64Var(&gOpt.APITimeout, "transfer-timeout", 600, "Timeout in seconds when transferring PD and TiKV store leaders, also for TiCDC drain one capture") cmd.Flags().BoolVarP(&gOpt.IgnoreConfigCheck, "ignore-config-check", "", false, "Ignore the config 
check result") cmd.Flags().BoolVarP(&offlineMode, "offline", "", false, "Upgrade a stopped cluster") + cmd.Flags().BoolVarP(&ignoreVersionCheck, "ignore-version-check", "", false, "Ignore checking if target version is bigger than current version") cmd.Flags().StringVar(&gOpt.SSHCustomScripts.BeforeRestartInstance.Raw, "pre-upgrade-script", "", "(EXPERIMENTAL) Custom script to be executed on each server before the server is upgraded") cmd.Flags().StringVar(&gOpt.SSHCustomScripts.AfterRestartInstance.Raw, "post-upgrade-script", "", "(EXPERIMENTAL) Custom script to be executed on each server after the server is upgraded") diff --git a/components/dm/command/upgrade.go b/components/dm/command/upgrade.go index bb6abf0ac7..fda3185089 100644 --- a/components/dm/command/upgrade.go +++ b/components/dm/command/upgrade.go @@ -19,7 +19,7 @@ import ( func newUpgradeCmd() *cobra.Command { offlineMode := false - + ignoreVersionCheck := false cmd := &cobra.Command{ Use: "upgrade ", Short: "Upgrade a specified DM cluster", @@ -28,7 +28,7 @@ func newUpgradeCmd() *cobra.Command { return cmd.Help() } - return cm.Upgrade(args[0], args[1], gOpt, skipConfirm, offlineMode) + return cm.Upgrade(args[0], args[1], gOpt, skipConfirm, offlineMode, ignoreVersionCheck) }, ValidArgsFunction: func(cmd *cobra.Command, args []string, toComplete string) ([]string, cobra.ShellCompDirective) { switch len(args) { @@ -41,6 +41,7 @@ func newUpgradeCmd() *cobra.Command { } cmd.Flags().BoolVarP(&offlineMode, "offline", "", false, "Upgrade a stopped cluster") + cmd.Flags().BoolVarP(&ignoreVersionCheck, "ignore-version-check", "", false, "Ignore checking if target version is higher than current version") return cmd } diff --git a/pkg/cluster/manager/upgrade.go b/pkg/cluster/manager/upgrade.go index cf28dbadb0..06a668b999 100644 --- a/pkg/cluster/manager/upgrade.go +++ b/pkg/cluster/manager/upgrade.go @@ -37,7 +37,7 @@ import ( ) // Upgrade the cluster. -func (m *Manager) Upgrade(name string, clusterVersion string, opt operator.Options, skipConfirm, offline bool) error { +func (m *Manager) Upgrade(name string, clusterVersion string, opt operator.Options, skipConfirm, offline, ignoreVersionCheck bool) error { if err := clusterutil.ValidateClusterNameOrError(name); err != nil { return err } @@ -68,7 +68,10 @@ func (m *Manager) Upgrade(name string, clusterVersion string, opt operator.Optio ) if err := versionCompare(base.Version, clusterVersion); err != nil { - return err + if !ignoreVersionCheck { + return err + } + m.logger.Warnf(color.RedString("There is no guarantee that the cluster can be downgraded. 
Be careful before you continue.")) } if !skipConfirm { From 1c9c153431a58f57c49ed046dbf79955954b32e5 Mon Sep 17 00:00:00 2001 From: nexustar Date: Mon, 25 Sep 2023 20:04:46 +0800 Subject: [PATCH 04/14] mirror: fix progress bar is not accurate (#2284) --- pkg/repository/mirror.go | 1 + pkg/repository/progress.go | 1 - 2 files changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/repository/mirror.go b/pkg/repository/mirror.go index 89d037c5de..7fb446f704 100644 --- a/pkg/repository/mirror.go +++ b/pkg/repository/mirror.go @@ -335,6 +335,7 @@ L: } progress.SetCurrent(resp.BytesComplete()) case <-resp.Done: + progress.SetCurrent(resp.BytesComplete()) progress.Finish() break L } diff --git a/pkg/repository/progress.go b/pkg/repository/progress.go index 951dc99a88..f8df7e2cdf 100644 --- a/pkg/repository/progress.go +++ b/pkg/repository/progress.go @@ -52,6 +52,5 @@ func (p *ProgressBar) SetCurrent(size int64) { // Finish implement the DownloadProgress interface func (p *ProgressBar) Finish() { - p.bar.SetCurrent(p.size) p.bar.Finish() } From 3653dc521afbd0da505cdbe4bcbc92c39fe66b74 Mon Sep 17 00:00:00 2001 From: nexustar Date: Mon, 25 Sep 2023 21:10:46 +0800 Subject: [PATCH 05/14] bump version to v1.13.1 (#2286) --- CHANGELOG.md | 14 ++++++++++++++ pkg/version/version.go | 2 +- 2 files changed, 15 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index c84df37b04..a27d643f30 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,19 @@ TiUP Changelog +## [1.13.1] 2023-09-25 + +### Fixes + +- Increase timeout when publish package in `tiup` (#2269, @nexustar) +- Fix pd microservice component id in `tiup-playground` (#2272, @iosmanthus) +- Fix grafana for multiple instances using same host in `tiup-cluster` and `tiup-dm` (#2277, @lastincisor) +- Add cdn workaround (#2285, @nexustar) +- Mirror: fix progress bar is not accurate (#2284, @nexustar) + +### Improvement + +- Support ignore version check when upgrade in `tiup-cluster` and `tiup-dm` (#2282, @nexustar) + ## [1.13.0] 2023-08-26 ### New Features diff --git a/pkg/version/version.go b/pkg/version/version.go index 07dd9d7118..8645e63060 100644 --- a/pkg/version/version.go +++ b/pkg/version/version.go @@ -23,7 +23,7 @@ var ( // TiUPVerMinor is the minor version of TiUP TiUPVerMinor = 13 // TiUPVerPatch is the patch version of TiUP - TiUPVerPatch = 0 + TiUPVerPatch = 1 // TiUPVerName is an alternative name of the version TiUPVerName = "tiup" // GitHash is the current git commit hash From 4f5d9e30688c4e6d625f03ab5042cc4d2d93c593 Mon Sep 17 00:00:00 2001 From: nexustar Date: Sun, 8 Oct 2023 19:15:22 +0800 Subject: [PATCH 06/14] Support tidb upgrade api (#2287) --- pkg/cluster/api/tidbapi.go | 78 ++++++++++++++++++++++++++++++++ pkg/cluster/manager/patch.go | 3 +- pkg/cluster/manager/reload.go | 2 +- pkg/cluster/manager/upgrade.go | 2 +- pkg/cluster/operation/upgrade.go | 32 ++++++++++++- pkg/tidbver/tidbver.go | 5 ++ 6 files changed, 118 insertions(+), 4 deletions(-) create mode 100644 pkg/cluster/api/tidbapi.go diff --git a/pkg/cluster/api/tidbapi.go b/pkg/cluster/api/tidbapi.go new file mode 100644 index 0000000000..ddb3a36403 --- /dev/null +++ b/pkg/cluster/api/tidbapi.go @@ -0,0 +1,78 @@ +// Copyright 2022 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package api + +import ( + "context" + "crypto/tls" + "fmt" + "time" + + "github.com/pingcap/tiup/pkg/utils" +) + +// TiDBClient is client for access TiKVCDC Open API +type TiDBClient struct { + urls []string + client *utils.HTTPClient + ctx context.Context +} + +// NewTiDBClient return a `TiDBClient` +func NewTiDBClient(ctx context.Context, addresses []string, timeout time.Duration, tlsConfig *tls.Config) *TiDBClient { + httpPrefix := "http" + if tlsConfig != nil { + httpPrefix = "https" + } + urls := make([]string, 0, len(addresses)) + for _, addr := range addresses { + urls = append(urls, fmt.Sprintf("%s://%s", httpPrefix, addr)) + } + + return &TiDBClient{ + urls: urls, + client: utils.NewHTTPClient(timeout, tlsConfig), + ctx: ctx, + } +} + +func (c *TiDBClient) getEndpoints(api string) (endpoints []string) { + for _, url := range c.urls { + endpoints = append(endpoints, fmt.Sprintf("%s%s", url, api)) + } + return endpoints +} + +// StartUpgrade sends the start upgrade message to the TiDB server +func (c *TiDBClient) StartUpgrade() error { + api := "/upgrade/start" + endpoints := c.getEndpoints(api) + _, err := tryURLs(endpoints, func(endpoint string) ([]byte, error) { + return c.client.Post(c.ctx, endpoint, nil) + }) + + return err +} + +// FinishUpgrade sends the finish upgrade message to the TiDB server +func (c *TiDBClient) FinishUpgrade() error { + api := "/upgrade/finish" + endpoints := c.getEndpoints(api) + _, err := tryURLs(endpoints, func(endpoint string) ([]byte, error) { + return c.client.Post(c.ctx, endpoint, nil) + }) + + return err +} + diff --git a/pkg/cluster/manager/patch.go b/pkg/cluster/manager/patch.go index 3babc36b82..087ded0ccb 100644 --- a/pkg/cluster/manager/patch.go +++ b/pkg/cluster/manager/patch.go @@ -104,7 +104,8 @@ func (m *Manager) Patch(name string, packagePath string, opt operator.Options, o if offline { return nil } - return operator.Upgrade(ctx, topo, opt, tlsCfg, base.Version) + // TBD: should patch be treated as an upgrade? + return operator.Upgrade(ctx, topo, opt, tlsCfg, base.Version, false) }). Build() diff --git a/pkg/cluster/manager/reload.go b/pkg/cluster/manager/reload.go index 2a78e3fe41..21ec4f6b7a 100644 --- a/pkg/cluster/manager/reload.go +++ b/pkg/cluster/manager/reload.go @@ -124,7 +124,7 @@ func (m *Manager) Reload(name string, gOpt operator.Options, skipRestart, skipCo return err } b.Func("Upgrade Cluster", func(ctx context.Context) error { - return operator.Upgrade(ctx, topo, gOpt, tlsCfg, base.Version) + return operator.Upgrade(ctx, topo, gOpt, tlsCfg, base.Version, false) }) } diff --git a/pkg/cluster/manager/upgrade.go b/pkg/cluster/manager/upgrade.go index 06a668b999..fe76ff5ac5 100644 --- a/pkg/cluster/manager/upgrade.go +++ b/pkg/cluster/manager/upgrade.go @@ -237,7 +237,7 @@ Do you want to continue? [y/N]:`, if offline { return nil } - return operator.Upgrade(ctx, topo, opt, tlsCfg, base.Version) + return operator.Upgrade(ctx, topo, opt, tlsCfg, base.Version, true) }). 
Build() diff --git a/pkg/cluster/operation/upgrade.go b/pkg/cluster/operation/upgrade.go index a34731a9fb..80ee1679b2 100644 --- a/pkg/cluster/operation/upgrade.go +++ b/pkg/cluster/operation/upgrade.go @@ -39,13 +39,14 @@ var ( increaseLimitPoint = checkpoint.Register() ) -// Upgrade the cluster. +// Upgrade the cluster. (actually, it's rolling restart) func Upgrade( ctx context.Context, topo spec.Topology, options Options, tlsCfg *tls.Config, currentVersion string, + isUpgrade bool, ) error { roleFilter := set.NewStringSet(options.Roles...) nodeFilter := set.NewStringSet(options.Nodes...) @@ -70,6 +71,7 @@ func Upgrade( var origRegionScheduleLimit int var err error + var tidbClient *api.TiDBClient var pdEndpoints []string forcePDEndpoints := os.Getenv(EnvNamePDEndpointOverwrite) // custom set PD endpoint list @@ -100,6 +102,21 @@ func Upgrade( } }() } + case spec.ComponentTiDB: + dbs := topo.(*spec.Specification).TiDBServers + endpoints := []string{} + for _, db := range dbs { + endpoints = append(endpoints, utils.JoinHostPort(db.GetManageHost(), db.StatusPort)) + } + + if isUpgrade && tidbver.TiDBSupportUpgradeAPI(currentVersion) { + tidbClient = api.NewTiDBClient(ctx, endpoints, 10*time.Second, tlsCfg) + err = tidbClient.StartUpgrade() + if err != nil { + return err + } + } + default: // do nothing, kept for future usage with other components } @@ -178,6 +195,19 @@ func Upgrade( return err } } + + switch component.Name() { + case spec.ComponentTiDB: + if isUpgrade && tidbver.TiDBSupportUpgradeAPI(currentVersion) { + err = tidbClient.FinishUpgrade() + if err != nil { + return err + } + } + + default: + // do nothing, kept for future usage with other components + } } if topo.GetMonitoredOptions() == nil { diff --git a/pkg/tidbver/tidbver.go b/pkg/tidbver/tidbver.go index bfa169899e..18cc53e74f 100644 --- a/pkg/tidbver/tidbver.go +++ b/pkg/tidbver/tidbver.go @@ -26,6 +26,11 @@ func TiDBSupportSecureBoot(version string) bool { return semver.Compare(version, "v5.3.0") >= 0 || strings.Contains(version, "nightly") } +// TiDBSupportUpgradeAPI return if given version of TiDB support upgrade API +func TiDBSupportUpgradeAPI(version string) bool { + return semver.Compare(version, "v7.4.0") >= 0 || strings.Contains(version, "nightly") +} + // TiKVSupportAdvertiseStatusAddr return if given version of TiKV support --advertise-status-addr func TiKVSupportAdvertiseStatusAddr(version string) bool { // TiKV support --advertise-status-addr since v4.0.1 From 8fd533dbf28d1cdfd42f0194b9e58ed2b0e95dcd Mon Sep 17 00:00:00 2001 From: kaaaaaaang Date: Tue, 17 Oct 2023 17:41:41 +0800 Subject: [PATCH 07/14] update tidbver.TiDBSupportUpgradeAPI (#2289) * update tidbver.TiDBSupportUpgradeAPI * fix --- pkg/cluster/manager/patch.go | 2 +- pkg/cluster/manager/reload.go | 2 +- pkg/cluster/manager/upgrade.go | 2 +- pkg/cluster/operation/upgrade.go | 6 +++--- pkg/tidbver/tidbver.go | 4 +++- 5 files changed, 9 insertions(+), 7 deletions(-) diff --git a/pkg/cluster/manager/patch.go b/pkg/cluster/manager/patch.go index 087ded0ccb..e4c5669f29 100644 --- a/pkg/cluster/manager/patch.go +++ b/pkg/cluster/manager/patch.go @@ -105,7 +105,7 @@ func (m *Manager) Patch(name string, packagePath string, opt operator.Options, o return nil } // TBD: should patch be treated as an upgrade? - return operator.Upgrade(ctx, topo, opt, tlsCfg, base.Version, false) + return operator.Upgrade(ctx, topo, opt, tlsCfg, base.Version, base.Version) }). 
Build() diff --git a/pkg/cluster/manager/reload.go b/pkg/cluster/manager/reload.go index 21ec4f6b7a..226c66535e 100644 --- a/pkg/cluster/manager/reload.go +++ b/pkg/cluster/manager/reload.go @@ -124,7 +124,7 @@ func (m *Manager) Reload(name string, gOpt operator.Options, skipRestart, skipCo return err } b.Func("Upgrade Cluster", func(ctx context.Context) error { - return operator.Upgrade(ctx, topo, gOpt, tlsCfg, base.Version, false) + return operator.Upgrade(ctx, topo, gOpt, tlsCfg, base.Version, base.Version) }) } diff --git a/pkg/cluster/manager/upgrade.go b/pkg/cluster/manager/upgrade.go index fe76ff5ac5..32354d4f7b 100644 --- a/pkg/cluster/manager/upgrade.go +++ b/pkg/cluster/manager/upgrade.go @@ -237,7 +237,7 @@ Do you want to continue? [y/N]:`, if offline { return nil } - return operator.Upgrade(ctx, topo, opt, tlsCfg, base.Version, true) + return operator.Upgrade(ctx, topo, opt, tlsCfg, base.Version, clusterVersion) }). Build() diff --git a/pkg/cluster/operation/upgrade.go b/pkg/cluster/operation/upgrade.go index 80ee1679b2..5f4ab04b3f 100644 --- a/pkg/cluster/operation/upgrade.go +++ b/pkg/cluster/operation/upgrade.go @@ -46,7 +46,7 @@ func Upgrade( options Options, tlsCfg *tls.Config, currentVersion string, - isUpgrade bool, + targetVersion string, ) error { roleFilter := set.NewStringSet(options.Roles...) nodeFilter := set.NewStringSet(options.Nodes...) @@ -109,7 +109,7 @@ func Upgrade( endpoints = append(endpoints, utils.JoinHostPort(db.GetManageHost(), db.StatusPort)) } - if isUpgrade && tidbver.TiDBSupportUpgradeAPI(currentVersion) { + if currentVersion != targetVersion && tidbver.TiDBSupportUpgradeAPI(currentVersion) && tidbver.TiDBSupportUpgradeAPI(targetVersion) { tidbClient = api.NewTiDBClient(ctx, endpoints, 10*time.Second, tlsCfg) err = tidbClient.StartUpgrade() if err != nil { @@ -198,7 +198,7 @@ func Upgrade( switch component.Name() { case spec.ComponentTiDB: - if isUpgrade && tidbver.TiDBSupportUpgradeAPI(currentVersion) { + if currentVersion != targetVersion && tidbver.TiDBSupportUpgradeAPI(currentVersion) && tidbver.TiDBSupportUpgradeAPI(targetVersion) { err = tidbClient.FinishUpgrade() if err != nil { return err diff --git a/pkg/tidbver/tidbver.go b/pkg/tidbver/tidbver.go index 18cc53e74f..eae857f78e 100644 --- a/pkg/tidbver/tidbver.go +++ b/pkg/tidbver/tidbver.go @@ -28,7 +28,9 @@ func TiDBSupportSecureBoot(version string) bool { // TiDBSupportUpgradeAPI return if given version of TiDB support upgrade API func TiDBSupportUpgradeAPI(version string) bool { - return semver.Compare(version, "v7.4.0") >= 0 || strings.Contains(version, "nightly") + return semver.Compare(version, "v7.4.0") >= 0 || + (semver.MajorMinor(version) == "v7.1" && semver.Compare(version, "v7.1.2") >= 0) || + strings.Contains(version, "nightly") } // TiKVSupportAdvertiseStatusAddr return if given version of TiKV support --advertise-status-addr From 9e47d78b5518999efc3763168e891d6910f26099 Mon Sep 17 00:00:00 2001 From: nexustar Date: Tue, 17 Oct 2023 18:24:29 +0800 Subject: [PATCH 08/14] show more retry error message (#2290) --- pkg/utils/retry.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/pkg/utils/retry.go b/pkg/utils/retry.go index 0a6c65a409..a958b8c0b2 100644 --- a/pkg/utils/retry.go +++ b/pkg/utils/retry.go @@ -77,8 +77,9 @@ func Retry(doFunc func() error, opts ...RetryOption) error { // call the function var attemptCount int64 + var err error for attemptCount = 0; attemptCount < cfg.Attempts; attemptCount++ { - if err := doFunc(); err == nil { + if err = 
doFunc(); err == nil { return nil } @@ -91,7 +92,7 @@ func Retry(doFunc func() error, opts ...RetryOption) error { } } - return fmt.Errorf("operation exceeds the max retry attempts of %d", cfg.Attempts) + return fmt.Errorf("operation exceeds the max retry attempts of %d. error of last attempt: %s", cfg.Attempts, err) } // IsTimeoutOrMaxRetry return true if it's timeout or reach max retry. From 79d3011137564f54f1c5df6b2e021995d41392dc Mon Sep 17 00:00:00 2001 From: xhe Date: Wed, 25 Oct 2023 17:56:03 +0800 Subject: [PATCH 09/14] cluster: support tiproxy (#2271) --- embed/templates/config/prometheus.yml.tpl | 16 ++ embed/templates/scripts/run_tiproxy.sh.tpl | 14 + pkg/cluster/ansible/import_test.go | 2 + pkg/cluster/ansible/test-data/meta.yaml | 2 + pkg/cluster/spec/bindversion.go | 2 + pkg/cluster/spec/monitoring.go | 7 + pkg/cluster/spec/spec.go | 5 + pkg/cluster/spec/tiproxy.go | 312 +++++++++++++++++++++ pkg/cluster/task/update_meta.go | 11 +- pkg/cluster/template/config/prometheus.go | 7 + pkg/cluster/template/scripts/tiproxy.go | 49 ++++ 11 files changed, 426 insertions(+), 1 deletion(-) create mode 100644 embed/templates/scripts/run_tiproxy.sh.tpl create mode 100644 pkg/cluster/spec/tiproxy.go create mode 100644 pkg/cluster/template/scripts/tiproxy.go diff --git a/embed/templates/config/prometheus.yml.tpl b/embed/templates/config/prometheus.yml.tpl index 10bef9607d..238eb179ab 100644 --- a/embed/templates/config/prometheus.yml.tpl +++ b/embed/templates/config/prometheus.yml.tpl @@ -109,6 +109,22 @@ scrape_configs: - targets: {{- range .TiDBStatusAddrs}} - '{{.}}' +{{- end}} + - job_name: "tiproxy" + honor_labels: true # don't overwrite job & instance labels + metrics_path: /api/metrics +{{- if .TLSEnabled}} + scheme: https + tls_config: + insecure_skip_verify: false + ca_file: ../tls/ca.crt + cert_file: ../tls/prometheus.crt + key_file: ../tls/prometheus.pem +{{- end}} + static_configs: + - targets: +{{- range .TiProxyStatusAddrs}} + - '{{.}}' {{- end}} - job_name: "tikv" honor_labels: true # don't overwrite job & instance labels diff --git a/embed/templates/scripts/run_tiproxy.sh.tpl b/embed/templates/scripts/run_tiproxy.sh.tpl new file mode 100644 index 0000000000..83362c9121 --- /dev/null +++ b/embed/templates/scripts/run_tiproxy.sh.tpl @@ -0,0 +1,14 @@ +#!/bin/bash +set -e + +# WARNING: This file was auto-generated. Do not edit! +# All your edit might be overwritten! 
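+# The {{.DeployDir}} and {{.NumaNode}} placeholders are filled from TiProxyScript
+# when this template is rendered; the numactl binding is only emitted for
+# instances that set a NUMA node.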
+DEPLOY_DIR={{.DeployDir}} +cd "${DEPLOY_DIR}" || exit 1 + +{{- if .NumaNode}} +exec numactl --cpunodebind={{.NumaNode}} --membind={{.NumaNode}} bin/tiproxy \ +{{- else}} +exec bin/tiproxy \ +{{- end}} + --config conf/tiproxy.toml diff --git a/pkg/cluster/ansible/import_test.go b/pkg/cluster/ansible/import_test.go index 970bd68c53..8c1f0fe1b6 100644 --- a/pkg/cluster/ansible/import_test.go +++ b/pkg/cluster/ansible/import_test.go @@ -126,6 +126,7 @@ server_configs: pd: {} tidb_dashboard: {} tiflash: {} + tiproxy: {} tiflash-learner: {} pump: {} drainer: {} @@ -135,6 +136,7 @@ server_configs: tidb_servers: [] tikv_servers: [] tiflash_servers: [] +tiproxy_servers: [] pd_servers: [] monitoring_servers: [] `) diff --git a/pkg/cluster/ansible/test-data/meta.yaml b/pkg/cluster/ansible/test-data/meta.yaml index d5c1263cbb..e143d7cd72 100644 --- a/pkg/cluster/ansible/test-data/meta.yaml +++ b/pkg/cluster/ansible/test-data/meta.yaml @@ -21,6 +21,7 @@ topology: pd: {} tidb_dashboard: {} tiflash: {} + tiproxy: {} tiflash-learner: {} pump: {} drainer: {} @@ -193,3 +194,4 @@ topology: data_dir: data/alertmanager-9093 arch: amd64 os: linux + tiproxy_servers: [] diff --git a/pkg/cluster/spec/bindversion.go b/pkg/cluster/spec/bindversion.go index 6e6ac9fcf8..6a0c36328b 100644 --- a/pkg/cluster/spec/bindversion.go +++ b/pkg/cluster/spec/bindversion.go @@ -31,6 +31,8 @@ func TiDBComponentVersion(comp, version string) string { ComponentTiSpark, ComponentTiKVCDC: // TiKV-CDC use individual version. return "" + case ComponentTiProxy: + return "nightly" default: return version } diff --git a/pkg/cluster/spec/monitoring.go b/pkg/cluster/spec/monitoring.go index a5753448eb..91200eae2b 100644 --- a/pkg/cluster/spec/monitoring.go +++ b/pkg/cluster/spec/monitoring.go @@ -247,6 +247,13 @@ func (i *MonitorInstance) InitConfig( cfig.AddTiDB(db.Host, uint64(db.StatusPort)) } } + if servers, found := topoHasField("TiProxyServers"); found { + for i := 0; i < servers.Len(); i++ { + db := servers.Index(i).Interface().(*TiProxySpec) + uniqueHosts.Insert(db.Host) + cfig.AddTiProxy(db.Host, uint64(db.StatusPort)) + } + } if servers, found := topoHasField("TiFlashServers"); found { for i := 0; i < servers.Len(); i++ { flash := servers.Index(i).Interface().(*TiFlashSpec) diff --git a/pkg/cluster/spec/spec.go b/pkg/cluster/spec/spec.go index 47e67251a1..ebefdc466d 100644 --- a/pkg/cluster/spec/spec.go +++ b/pkg/cluster/spec/spec.go @@ -109,6 +109,7 @@ type ( PD map[string]any `yaml:"pd"` Dashboard map[string]any `yaml:"tidb_dashboard"` TiFlash map[string]any `yaml:"tiflash"` + TiProxy map[string]any `yaml:"tiproxy"` TiFlashLearner map[string]any `yaml:"tiflash-learner"` Pump map[string]any `yaml:"pump"` Drainer map[string]any `yaml:"drainer"` @@ -125,6 +126,7 @@ type ( TiDBServers []*TiDBSpec `yaml:"tidb_servers"` TiKVServers []*TiKVSpec `yaml:"tikv_servers"` TiFlashServers []*TiFlashSpec `yaml:"tiflash_servers"` + TiProxyServers []*TiProxySpec `yaml:"tiproxy_servers"` PDServers []*PDSpec `yaml:"pd_servers"` DashboardServers []*DashboardSpec `yaml:"tidb_dashboard_servers,omitempty"` PumpServers []*PumpSpec `yaml:"pump_servers,omitempty"` @@ -502,6 +504,7 @@ func (s *Specification) Merge(that Topology) Topology { PDServers: append(s.PDServers, spec.PDServers...), DashboardServers: append(s.DashboardServers, spec.DashboardServers...), TiFlashServers: append(s.TiFlashServers, spec.TiFlashServers...), + TiProxyServers: append(s.TiProxyServers, spec.TiProxyServers...), PumpServers: append(s.PumpServers, spec.PumpServers...), Drainers: 
append(s.Drainers, spec.Drainers...), CDCServers: append(s.CDCServers, spec.CDCServers...), @@ -712,6 +715,7 @@ func (s *Specification) ComponentsByStartOrder() (comps []Component) { // "pd", "dashboard", "tikv", "pump", "tidb", "tiflash", "drainer", "cdc", "tikv-cdc", "prometheus", "grafana", "alertmanager" comps = append(comps, &PDComponent{s}) comps = append(comps, &DashboardComponent{s}) + comps = append(comps, &TiProxyComponent{s}) comps = append(comps, &TiKVComponent{s}) comps = append(comps, &PumpComponent{s}) comps = append(comps, &TiDBComponent{s}) @@ -739,6 +743,7 @@ func (s *Specification) ComponentsByUpdateOrder(curVer string) (comps []Componen } comps = append(comps, &PDComponent{s}) comps = append(comps, &DashboardComponent{s}) + comps = append(comps, &TiProxyComponent{s}) comps = append(comps, &TiKVComponent{s}) comps = append(comps, &PumpComponent{s}) comps = append(comps, &TiDBComponent{s}) diff --git a/pkg/cluster/spec/tiproxy.go b/pkg/cluster/spec/tiproxy.go new file mode 100644 index 0000000000..09c19d1051 --- /dev/null +++ b/pkg/cluster/spec/tiproxy.go @@ -0,0 +1,312 @@ +// Copyright 2023 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package spec + +import ( + "bytes" + "context" + "crypto/tls" + "fmt" + "path/filepath" + "strings" + "time" + + "github.com/pingcap/tiup/pkg/cluster/ctxt" + "github.com/pingcap/tiup/pkg/cluster/template/scripts" + "github.com/pingcap/tiup/pkg/meta" + "github.com/pingcap/tiup/pkg/utils" + "github.com/prometheus/common/expfmt" +) + +func proxyUptimeByHost(host string, port int, timeout time.Duration, tlsCfg *tls.Config) time.Duration { + if timeout < time.Second { + timeout = statusQueryTimeout + } + + scheme := "http" + if tlsCfg != nil { + scheme = "https" + } + url := fmt.Sprintf("%s://%s/api/metrics", scheme, utils.JoinHostPort(host, port)) + + client := utils.NewHTTPClient(timeout, tlsCfg) + + body, err := client.Get(context.TODO(), url) + if err != nil || body == nil { + return 0 + } + + var parser expfmt.TextParser + reader := bytes.NewReader(body) + mf, err := parser.TextToMetricFamilies(reader) + if err != nil { + return 0 + } + + now := time.Now() + for k, v := range mf { + if k == promMetricStartTimeSeconds { + ms := v.GetMetric() + if len(ms) >= 1 { + startTime := ms[0].Gauge.GetValue() + return now.Sub(time.Unix(int64(startTime), 0)) + } + return 0 + } + } + + return 0 +} + +// TiProxySpec represents the TiProxy topology specification in topology.yaml +type TiProxySpec struct { + Host string `yaml:"host"` + ManageHost string `yaml:"manage_host,omitempty" validate:"manage_host:editable"` + SSHPort int `yaml:"ssh_port,omitempty" validate:"ssh_port:editable"` + Port int `yaml:"port" default:"6000"` + StatusPort int `yaml:"status_port" default:"3080"` + DeployDir string `yaml:"deploy_dir,omitempty"` + NumaNode string `yaml:"numa_node,omitempty" validate:"numa_node:editable"` + Config map[string]any `yaml:"config,omitempty" validate:"config:ignore"` + Arch string `yaml:"arch,omitempty"` + OS string `yaml:"os,omitempty"` +} + +// Role returns the component role of the instance +func 
(s *TiProxySpec) Role() string { + return ComponentTiProxy +} + +// SSH returns the host and SSH port of the instance +func (s *TiProxySpec) SSH() (string, int) { + return s.Host, s.SSHPort +} + +// GetMainPort returns the main port of the instance +func (s *TiProxySpec) GetMainPort() int { + return s.Port +} + +// GetManageHost returns the manage host of the instance +func (s *TiProxySpec) GetManageHost() string { + if s.ManageHost != "" { + return s.ManageHost + } + return s.Host +} + +// IsImported returns if the node is imported from TiDB-Ansible +func (s *TiProxySpec) IsImported() bool { + return false +} + +// IgnoreMonitorAgent returns if the node does not have monitor agents available +func (s *TiProxySpec) IgnoreMonitorAgent() bool { + return false +} + +// TiProxyComponent represents TiProxy component. +type TiProxyComponent struct{ Topology *Specification } + +// Name implements Component interface. +func (c *TiProxyComponent) Name() string { + return ComponentTiProxy +} + +// Role implements Component interface. +func (c *TiProxyComponent) Role() string { + return ComponentTiProxy +} + +// Instances implements Component interface. +func (c *TiProxyComponent) Instances() []Instance { + ins := make([]Instance, 0, len(c.Topology.TiProxyServers)) + for _, s := range c.Topology.TiProxyServers { + s := s + instance := &TiProxyInstance{BaseInstance{ + InstanceSpec: s, + Name: c.Name(), + Host: s.Host, + ManageHost: s.ManageHost, + Port: s.Port, + SSHP: s.SSHPort, + Source: ComponentTiProxy, + Ports: []int{ + s.Port, + s.StatusPort, + }, + Dirs: []string{ + s.DeployDir, + }, + StatusFn: func(_ context.Context, timeout time.Duration, tlsCfg *tls.Config, _ ...string) string { + return statusByHost(s.Host, s.StatusPort, "/api/debug/health", timeout, tlsCfg) + }, + UptimeFn: func(_ context.Context, timeout time.Duration, tlsCfg *tls.Config) time.Duration { + return proxyUptimeByHost(s.Host, s.StatusPort, timeout, tlsCfg) + }, + }, c.Topology} + + ins = append(ins, instance) + } + return ins +} + +// TiProxyInstance represent the TiProxy instance. +type TiProxyInstance struct { + BaseInstance + topo Topology +} + +// ScaleConfig deploy temporary config on scaling +func (i *TiProxyInstance) ScaleConfig( + ctx context.Context, + e ctxt.Executor, + topo Topology, + clusterName, + clusterVersion, + user string, + paths meta.DirPaths, +) error { + s := i.topo + defer func() { + i.topo = s + }() + i.topo = mustBeClusterTopo(topo) + + return i.InitConfig(ctx, e, clusterName, clusterVersion, user, paths) +} + +func (i *TiProxyInstance) checkConfig( + cfg map[string]any, + paths meta.DirPaths, +) map[string]any { + topo := i.topo.(*Specification) + spec := i.InstanceSpec.(*TiProxySpec) + enableTLS := topo.GlobalOptions.TLSEnabled + + if cfg == nil { + cfg = make(map[string]any) + } + + pds := []string{} + for _, pdspec := range topo.PDServers { + pds = append(pds, pdspec.GetAdvertiseClientURL(enableTLS)) + } + cfg["proxy.pd-addrs"] = strings.Join(pds, ",") + cfg["proxy.require-backend-tls"] = false + cfg["proxy.addr"] = utils.JoinHostPort(i.GetListenHost(), i.GetPort()) + cfg["api.addr"] = utils.JoinHostPort(i.GetListenHost(), spec.StatusPort) + cfg["log.log-file.filename"] = filepath.Join(paths.Log, "tiproxy.log") + + return cfg +} + +// InitConfig implements Instance interface. 
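+// It renders the run_tiproxy.sh startup script from its template, transfers the
+// script to the deploy directory and marks it executable, then merges the global
+// TiProxy config with the instance-level settings prepared by checkConfig.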
+func (i *TiProxyInstance) InitConfig( + ctx context.Context, + e ctxt.Executor, + clusterName, + clusterVersion, + deployUser string, + paths meta.DirPaths, +) error { + topo := i.topo.(*Specification) + if err := i.BaseInstance.InitConfig(ctx, e, topo.GlobalOptions, deployUser, paths); err != nil { + return err + } + spec := i.InstanceSpec.(*TiProxySpec) + globalConfig := topo.ServerConfigs.TiProxy + instanceConfig := i.checkConfig(spec.Config, paths) + + cfg := &scripts.TiProxyScript{ + DeployDir: paths.Deploy, + NumaNode: spec.NumaNode, + } + + fp := filepath.Join(paths.Cache, fmt.Sprintf("run_tiproxy_%s_%d.sh", i.GetHost(), i.GetPort())) + + if err := cfg.ConfigToFile(fp); err != nil { + return err + } + dst := filepath.Join(paths.Deploy, "scripts", "run_tiproxy.sh") + if err := e.Transfer(ctx, fp, dst, false, 0, false); err != nil { + return err + } + + if _, _, err := e.Execute(ctx, "chmod +x "+dst, false); err != nil { + return err + } + + var err error + instanceConfig, err = i.setTLSConfig(ctx, false, instanceConfig, paths) + if err != nil { + return err + } + + return i.MergeServerConfig(ctx, e, globalConfig, instanceConfig, paths) +} + +// setTLSConfig set TLS Config to support enable/disable TLS +func (i *TiProxyInstance) setTLSConfig(ctx context.Context, enableTLS bool, configs map[string]any, paths meta.DirPaths) (map[string]any, error) { + if configs == nil { + configs = make(map[string]any) + } + if enableTLS { + configs["security.cluster-tls.ca"] = fmt.Sprintf("%s/tls/%s", paths.Deploy, TLSCACert) + configs["security.cluster-tls.cert"] = fmt.Sprintf("%s/tls/%s.crt", paths.Deploy, i.Role()) + configs["security.cluster-tls.key"] = fmt.Sprintf("%s/tls/%s.pem", paths.Deploy, i.Role()) + + configs["security.server-tls.ca"] = fmt.Sprintf("%s/tls/%s", paths.Deploy, TLSCACert) + configs["security.server-tls.cert"] = fmt.Sprintf("%s/tls/%s.crt", paths.Deploy, i.Role()) + configs["security.server-tls.key"] = fmt.Sprintf("%s/tls/%s.pem", paths.Deploy, i.Role()) + configs["security.server-tls.skip-ca"] = true + + configs["security.sql-tls.ca"] = fmt.Sprintf("%s/tls/%s", paths.Deploy, TLSCACert) + } else { + // drainer tls config list + tlsConfigs := []string{ + "security.cluster-tls.ca", + "security.cluster-tls.cert", + "security.cluster-tls.key", + "security.server-tls.ca", + "security.server-tls.cert", + "security.server-tls.key", + "security.server-tls.skip-ca", + "security.sql-tls.ca", + } + // delete TLS configs + for _, config := range tlsConfigs { + delete(configs, config) + } + } + + return nil, nil +} + +var _ RollingUpdateInstance = &TiProxyInstance{} + +// GetAddr return the address of this TiProxy instance +func (i *TiProxyInstance) GetAddr() string { + return utils.JoinHostPort(i.GetHost(), i.GetPort()) +} + +// PreRestart implements RollingUpdateInstance interface. +func (i *TiProxyInstance) PreRestart(ctx context.Context, topo Topology, apiTimeoutSeconds int, tlsCfg *tls.Config) error { + return nil +} + +// PostRestart implements RollingUpdateInstance interface. 
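+// Like PreRestart above, this is currently a no-op placeholder kept so that
+// TiProxyInstance satisfies the RollingUpdateInstance interface.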
+func (i *TiProxyInstance) PostRestart(ctx context.Context, topo Topology, tlsCfg *tls.Config) error { + return nil +} diff --git a/pkg/cluster/task/update_meta.go b/pkg/cluster/task/update_meta.go index 8e9e3d3e6d..dd6457e1b9 100644 --- a/pkg/cluster/task/update_meta.go +++ b/pkg/cluster/task/update_meta.go @@ -72,6 +72,15 @@ func (u *UpdateMeta) Execute(ctx context.Context) error { } newMeta.Topology.PDServers = pdServers + tiproxyServers := make([]*spec.TiProxySpec, 0) + for i, instance := range (&spec.TiProxyComponent{Topology: topo}).Instances() { + if deleted.Exist(instance.ID()) { + continue + } + tiproxyServers = append(tiproxyServers, topo.TiProxyServers[i]) + } + newMeta.Topology.TiProxyServers = tiproxyServers + dashboardServers := make([]*spec.DashboardSpec, 0) for i, instance := range (&spec.DashboardComponent{Topology: topo}).Instances() { if deleted.Exist(instance.ID()) { @@ -79,7 +88,7 @@ func (u *UpdateMeta) Execute(ctx context.Context) error { } dashboardServers = append(dashboardServers, topo.DashboardServers[i]) } - topo.DashboardServers = dashboardServers + newMeta.Topology.DashboardServers = dashboardServers tiflashServers := make([]*spec.TiFlashSpec, 0) for i, instance := range (&spec.TiFlashComponent{Topology: topo}).Instances() { diff --git a/pkg/cluster/template/config/prometheus.go b/pkg/cluster/template/config/prometheus.go index eacef47238..90d6b00051 100644 --- a/pkg/cluster/template/config/prometheus.go +++ b/pkg/cluster/template/config/prometheus.go @@ -31,6 +31,7 @@ type PrometheusConfig struct { TLSEnabled bool NodeExporterAddrs []string TiDBStatusAddrs []string + TiProxyStatusAddrs []string TiKVStatusAddrs []string PDAddrs []string TiFlashStatusAddrs []string @@ -79,6 +80,12 @@ func (c *PrometheusConfig) AddTiDB(ip string, port uint64) *PrometheusConfig { return c } +// AddTiProxy add a TiProxy address +func (c *PrometheusConfig) AddTiProxy(ip string, port uint64) *PrometheusConfig { + c.TiProxyStatusAddrs = append(c.TiProxyStatusAddrs, utils.JoinHostPort(ip, int(port))) + return c +} + // AddTiKV add a TiKV address func (c *PrometheusConfig) AddTiKV(ip string, port uint64) *PrometheusConfig { c.TiKVStatusAddrs = append(c.TiKVStatusAddrs, utils.JoinHostPort(ip, int(port))) diff --git a/pkg/cluster/template/scripts/tiproxy.go b/pkg/cluster/template/scripts/tiproxy.go new file mode 100644 index 0000000000..d0bd8d5980 --- /dev/null +++ b/pkg/cluster/template/scripts/tiproxy.go @@ -0,0 +1,49 @@ +// Copyright 2023 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package scripts + +import ( + "bytes" + "path" + "text/template" + + "github.com/pingcap/tiup/embed" + "github.com/pingcap/tiup/pkg/utils" +) + +// TiProxyScript represent the data to generate tiproxy config +type TiProxyScript struct { + DeployDir string + NumaNode string +} + +// ConfigToFile write config content to specific file. 
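+// The template is read from the embedded templates/scripts/run_tiproxy.sh.tpl,
+// rendered with the script's fields, and written out with mode 0755 so the
+// startup script is directly executable.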
+func (c *TiProxyScript) ConfigToFile(file string) error { + fp := path.Join("templates", "scripts", "run_tiproxy.sh.tpl") + tpl, err := embed.ReadTemplate(fp) + if err != nil { + return err + } + tmpl, err := template.New("TiProxy").Parse(string(tpl)) + if err != nil { + return err + } + + content := bytes.NewBufferString("") + if err := tmpl.Execute(content, c); err != nil { + return err + } + + return utils.WriteFile(file, content.Bytes(), 0755) +} From bdf6f26089b97faa2731dd3b5ac6b705f716190f Mon Sep 17 00:00:00 2001 From: Shiori Date: Tue, 31 Oct 2023 14:17:07 +0800 Subject: [PATCH 10/14] add a new flag '--numa' to show numa in display command (#2295) --- components/cluster/command/display.go | 1 + pkg/cluster/manager/display.go | 13 +++++++++++++ pkg/cluster/spec/alertmanager.go | 2 ++ pkg/cluster/spec/cdc.go | 2 ++ pkg/cluster/spec/dashboard.go | 2 ++ pkg/cluster/spec/drainer.go | 2 ++ pkg/cluster/spec/grafana.go | 2 ++ pkg/cluster/spec/instance.go | 14 ++++++++++++++ pkg/cluster/spec/monitoring.go | 2 ++ pkg/cluster/spec/pd.go | 2 ++ pkg/cluster/spec/pump.go | 2 ++ pkg/cluster/spec/tidb.go | 2 ++ pkg/cluster/spec/tiflash.go | 2 ++ pkg/cluster/spec/tikv.go | 2 ++ pkg/cluster/spec/tikv_cdc.go | 2 ++ pkg/cluster/spec/tiproxy.go | 2 ++ pkg/cluster/spec/tispark.go | 4 ++++ 17 files changed, 58 insertions(+) diff --git a/components/cluster/command/display.go b/components/cluster/command/display.go index 816b0c2bcf..981e864542 100644 --- a/components/cluster/command/display.go +++ b/components/cluster/command/display.go @@ -97,6 +97,7 @@ func newDisplayCmd() *cobra.Command { cmd.Flags().BoolVar(&showTiKVLabels, "labels", false, "Only display labels of specified TiKV role or nodes") cmd.Flags().BoolVar(&dopt.ShowProcess, "process", false, "display cpu and memory usage of nodes") cmd.Flags().BoolVar(&dopt.ShowManageHost, "manage-host", false, "display manage host of nodes") + cmd.Flags().BoolVar(&dopt.ShowNuma, "numa", false, "display numa information of nodes") cmd.Flags().Uint64Var(&statusTimeout, "status-timeout", 10, "Timeout in seconds when getting node status") return cmd diff --git a/pkg/cluster/manager/display.go b/pkg/cluster/manager/display.go index 8965b14bb0..d2f68990e6 100644 --- a/pkg/cluster/manager/display.go +++ b/pkg/cluster/manager/display.go @@ -50,6 +50,7 @@ type DisplayOption struct { ShowUptime bool ShowProcess bool ShowManageHost bool + ShowNuma bool } // InstInfo represents an instance info @@ -67,6 +68,8 @@ type InstInfo struct { Since string `json:"since"` DataDir string `json:"data_dir"` DeployDir string `json:"deploy_dir"` + NumaNode string `json:"numa_node"` + NumaCores string `json:"numa_cores"` ComponentName string Port int @@ -199,6 +202,10 @@ func (m *Manager) Display(dopt DisplayOption, opt operator.Options) error { if dopt.ShowUptime { rowHead = append(rowHead, "Since") } + if dopt.ShowNuma { + rowHead = append(rowHead, "Numa Node", "Numd Cores") + } + rowHead = append(rowHead, "Data Dir", "Deploy Dir") clusterTable = append(clusterTable, rowHead) @@ -225,6 +232,10 @@ func (m *Manager) Display(dopt DisplayOption, opt operator.Options) error { if dopt.ShowUptime { row = append(row, v.Since) } + if dopt.ShowNuma { + row = append(row, v.NumaNode, v.NumaCores) + } + row = append(row, v.DataDir, v.DeployDir) clusterTable = append(clusterTable, row) @@ -654,6 +665,8 @@ func (m *Manager) GetClusterTopology(dopt DisplayOption, opt operator.Options) ( ComponentName: ins.ComponentName(), Port: ins.GetPort(), Since: since, + NumaNode: utils.Ternary(ins.GetNumaNode() 
== "", "-", ins.GetNumaNode()).(string), + NumaCores: utils.Ternary(ins.GetNumaCores() == "", "-", ins.GetNumaCores()).(string), }) mu.Unlock() }, opt.Concurrency) diff --git a/pkg/cluster/spec/alertmanager.go b/pkg/cluster/spec/alertmanager.go index 50a4522478..783b93cce6 100644 --- a/pkg/cluster/spec/alertmanager.go +++ b/pkg/cluster/spec/alertmanager.go @@ -115,6 +115,8 @@ func (c *AlertManagerComponent) Instances() []Instance { ListenHost: s.ListenHost, Port: s.WebPort, SSHP: s.SSHPort, + NumaNode: s.NumaNode, + NumaCores: "", Ports: []int{ s.WebPort, diff --git a/pkg/cluster/spec/cdc.go b/pkg/cluster/spec/cdc.go index 5ba4646f3b..b97abd9398 100644 --- a/pkg/cluster/spec/cdc.go +++ b/pkg/cluster/spec/cdc.go @@ -126,6 +126,8 @@ func (c *CDCComponent) Instances() []Instance { Port: s.Port, SSHP: s.SSHPort, Source: s.GetSource(), + NumaNode: s.NumaNode, + NumaCores: "", Ports: []int{ s.Port, diff --git a/pkg/cluster/spec/dashboard.go b/pkg/cluster/spec/dashboard.go index 75f36db7f2..7c127819ec 100644 --- a/pkg/cluster/spec/dashboard.go +++ b/pkg/cluster/spec/dashboard.go @@ -129,6 +129,8 @@ func (c *DashboardComponent) Instances() []Instance { Port: s.Port, SSHP: s.SSHPort, Source: s.Source, + NumaNode: s.NumaNode, + NumaCores: "", Ports: []int{ s.Port, diff --git a/pkg/cluster/spec/drainer.go b/pkg/cluster/spec/drainer.go index 1ad3360a82..5e60d3305f 100644 --- a/pkg/cluster/spec/drainer.go +++ b/pkg/cluster/spec/drainer.go @@ -147,6 +147,8 @@ func (c *DrainerComponent) Instances() []Instance { Port: s.Port, SSHP: s.SSHPort, Source: s.GetSource(), + NumaNode: s.NumaNode, + NumaCores: "", Ports: []int{ s.Port, diff --git a/pkg/cluster/spec/grafana.go b/pkg/cluster/spec/grafana.go index d4f1824444..9b417dfe0c 100644 --- a/pkg/cluster/spec/grafana.go +++ b/pkg/cluster/spec/grafana.go @@ -122,6 +122,8 @@ func (c *GrafanaComponent) Instances() []Instance { ManageHost: s.ManageHost, Port: s.Port, SSHP: s.SSHPort, + NumaNode: "", + NumaCores: "", Ports: []int{ s.Port, diff --git a/pkg/cluster/spec/instance.go b/pkg/cluster/spec/instance.go index 7e8446903e..7a3547df27 100644 --- a/pkg/cluster/spec/instance.go +++ b/pkg/cluster/spec/instance.go @@ -98,6 +98,8 @@ type Instance interface { GetManageHost() string GetPort() int GetSSHPort() int + GetNumaNode() string + GetNumaCores() string DeployDir() string UsedPorts() []int UsedDirs() []string @@ -145,6 +147,8 @@ type BaseInstance struct { Port int SSHP int Source string + NumaNode string + NumaCores string Ports []int Dirs []string @@ -366,6 +370,16 @@ func (i *BaseInstance) GetSSHPort() int { return i.SSHP } +// GetNumaNode implements Instance interface +func (i *BaseInstance) GetNumaNode() string { + return i.NumaNode +} + +// GetNumaCores implements Instance interface +func (i *BaseInstance) GetNumaCores() string { + return i.NumaCores +} + // DeployDir implements Instance interface func (i *BaseInstance) DeployDir() string { return reflect.Indirect(reflect.ValueOf(i.InstanceSpec)).FieldByName("DeployDir").String() diff --git a/pkg/cluster/spec/monitoring.go b/pkg/cluster/spec/monitoring.go index 91200eae2b..b61a1e6bdd 100644 --- a/pkg/cluster/spec/monitoring.go +++ b/pkg/cluster/spec/monitoring.go @@ -138,6 +138,8 @@ func (c *MonitorComponent) Instances() []Instance { ManageHost: s.ManageHost, Port: s.Port, SSHP: s.SSHPort, + NumaNode: s.NumaNode, + NumaCores: "", Ports: []int{ s.Port, diff --git a/pkg/cluster/spec/pd.go b/pkg/cluster/spec/pd.go index 008d1917da..9d944d2003 100644 --- a/pkg/cluster/spec/pd.go +++ b/pkg/cluster/spec/pd.go @@ 
-175,6 +175,8 @@ func (c *PDComponent) Instances() []Instance { Port: s.ClientPort, SSHP: s.SSHPort, Source: s.GetSource(), + NumaNode: s.NumaNode, + NumaCores: "", Ports: []int{ s.ClientPort, diff --git a/pkg/cluster/spec/pump.go b/pkg/cluster/spec/pump.go index 7428105d08..2967b64342 100644 --- a/pkg/cluster/spec/pump.go +++ b/pkg/cluster/spec/pump.go @@ -146,6 +146,8 @@ func (c *PumpComponent) Instances() []Instance { Port: s.Port, SSHP: s.SSHPort, Source: s.GetSource(), + NumaNode: s.NumaNode, + NumaCores: "", Ports: []int{ s.Port, diff --git a/pkg/cluster/spec/tidb.go b/pkg/cluster/spec/tidb.go index 668bc15e57..21f22c5490 100644 --- a/pkg/cluster/spec/tidb.go +++ b/pkg/cluster/spec/tidb.go @@ -124,6 +124,8 @@ func (c *TiDBComponent) Instances() []Instance { Port: s.Port, SSHP: s.SSHPort, Source: s.Source, + NumaNode: s.NumaNode, + NumaCores: s.NumaCores, Ports: []int{ s.Port, diff --git a/pkg/cluster/spec/tiflash.go b/pkg/cluster/spec/tiflash.go index d19eab1f92..8a042d5156 100644 --- a/pkg/cluster/spec/tiflash.go +++ b/pkg/cluster/spec/tiflash.go @@ -294,6 +294,8 @@ func (c *TiFlashComponent) Instances() []Instance { Port: s.GetMainPort(), SSHP: s.SSHPort, Source: s.GetSource(), + NumaNode: s.NumaNode, + NumaCores: s.NumaCores, Ports: []int{ s.TCPPort, diff --git a/pkg/cluster/spec/tikv.go b/pkg/cluster/spec/tikv.go index 270bae06c2..bb93e7acc0 100644 --- a/pkg/cluster/spec/tikv.go +++ b/pkg/cluster/spec/tikv.go @@ -199,6 +199,8 @@ func (c *TiKVComponent) Instances() []Instance { Port: s.Port, SSHP: s.SSHPort, Source: s.Source, + NumaNode: s.NumaNode, + NumaCores: s.NumaCores, Ports: []int{ s.Port, diff --git a/pkg/cluster/spec/tikv_cdc.go b/pkg/cluster/spec/tikv_cdc.go index 55c393d099..5f3bd032e9 100644 --- a/pkg/cluster/spec/tikv_cdc.go +++ b/pkg/cluster/spec/tikv_cdc.go @@ -126,6 +126,8 @@ func (c *TiKVCDCComponent) Instances() []Instance { Port: s.Port, SSHP: s.SSHPort, Source: s.GetSource(), + NumaNode: s.NumaNode, + NumaCores: "", Ports: []int{ s.Port, diff --git a/pkg/cluster/spec/tiproxy.go b/pkg/cluster/spec/tiproxy.go index 09c19d1051..afc1a7689d 100644 --- a/pkg/cluster/spec/tiproxy.go +++ b/pkg/cluster/spec/tiproxy.go @@ -142,6 +142,8 @@ func (c *TiProxyComponent) Instances() []Instance { Port: s.Port, SSHP: s.SSHPort, Source: ComponentTiProxy, + NumaNode: s.NumaNode, + NumaCores: "", Ports: []int{ s.Port, s.StatusPort, diff --git a/pkg/cluster/spec/tispark.go b/pkg/cluster/spec/tispark.go index bacd9f4b04..792bbeac2f 100644 --- a/pkg/cluster/spec/tispark.go +++ b/pkg/cluster/spec/tispark.go @@ -154,6 +154,8 @@ func (c *TiSparkMasterComponent) Instances() []Instance { Host: s.Host, Port: s.Port, SSHP: s.SSHPort, + NumaNode: "", + NumaCores: "", Ports: []int{ s.Port, @@ -333,6 +335,8 @@ func (c *TiSparkWorkerComponent) Instances() []Instance { Host: s.Host, Port: s.Port, SSHP: s.SSHPort, + NumaNode: "", + NumaCores: "", Ports: []int{ s.Port, From b8fbc842e4f85270d2bf3c0773e4b20e48d95687 Mon Sep 17 00:00:00 2001 From: Shiori Date: Tue, 31 Oct 2023 14:29:53 +0800 Subject: [PATCH 11/14] use port to probe db in playground (#2296) --- components/playground/main.go | 18 +++++------------- 1 file changed, 5 insertions(+), 13 deletions(-) diff --git a/components/playground/main.go b/components/playground/main.go index 2bae3ecc1e..3231a5708c 100644 --- a/components/playground/main.go +++ b/components/playground/main.go @@ -15,9 +15,9 @@ package main import ( "context" - "database/sql" "encoding/json" "fmt" + "net" "net/http" _ "net/http/pprof" "os" @@ -419,28 +419,20 @@ func 
populateDefaultOpt(flagSet *pflag.FlagSet) error { return nil } -func tryConnect(dsn string) error { - cli, err := sql.Open("mysql", dsn) - if err != nil { - return err - } - defer cli.Close() - - conn, err := cli.Conn(context.Background()) +func tryConnect(addr string, timeoutSec int) error { + conn, err := net.DialTimeout("tcp", addr, time.Duration(timeoutSec)*time.Second) if err != nil { return err } defer conn.Close() - return nil } // checkDB check if the addr is connectable by getting a connection from sql.DB. timeout <=0 means no timeout func checkDB(dbAddr string, timeout int) bool { - dsn := fmt.Sprintf("root:@tcp(%s)/", dbAddr) if timeout > 0 { for i := 0; i < timeout; i++ { - if tryConnect(dsn) == nil { + if tryConnect(dbAddr, timeout) == nil { return true } time.Sleep(time.Second) @@ -448,7 +440,7 @@ func checkDB(dbAddr string, timeout int) bool { return false } for { - if err := tryConnect(dsn); err == nil { + if err := tryConnect(dbAddr, timeout); err == nil { return true } time.Sleep(time.Second) From 14777393b3f730f7a2f84a313668a7d79e237843 Mon Sep 17 00:00:00 2001 From: Ryan Leung Date: Tue, 31 Oct 2023 17:12:37 +0800 Subject: [PATCH 12/14] Support scheduling service (#2273) --- components/playground/instance/instance.go | 2 +- components/playground/instance/pd.go | 23 ++++-- components/playground/main.go | 15 ++-- components/playground/playground.go | 83 +++++++++++++++++----- 4 files changed, 97 insertions(+), 26 deletions(-) diff --git a/components/playground/instance/instance.go b/components/playground/instance/instance.go index 902e32b52a..0676c75910 100644 --- a/components/playground/instance/instance.go +++ b/components/playground/instance/instance.go @@ -109,7 +109,7 @@ func logIfErr(err error) { func pdEndpoints(pds []*PDInstance, isHTTP bool) []string { var endpoints []string for _, pd := range pds { - if pd.Role == PDRoleTSO || pd.Role == PDRoleResourceManager { + if pd.Role == PDRoleTSO || pd.Role == PDRoleScheduling || pd.Role == PDRoleResourceManager { continue } if isHTTP { diff --git a/components/playground/instance/pd.go b/components/playground/instance/pd.go index 84f1f53117..68673d55fe 100644 --- a/components/playground/instance/pd.go +++ b/components/playground/instance/pd.go @@ -34,6 +34,8 @@ const ( PDRoleAPI PDRole = "api" // PDRoleTSO is the role of PD TSO PDRoleTSO PDRole = "tso" + // PDRoleScheduling is the role of PD scheduling + PDRoleScheduling PDRole = "scheduling" // PDRoleResourceManager is the role of PD resource manager PDRoleResourceManager PDRole = "resource manager" ) @@ -128,8 +130,21 @@ func (inst *PDInstance) Start(ctx context.Context, version utils.Version) error args = []string{ "services", "tso", - fmt.Sprintf("--listen-addr=http://%s", utils.JoinHostPort(inst.Host, inst.Port)), - fmt.Sprintf("--advertise-listen-addr=http://%s", utils.JoinHostPort(AdvertiseHost(inst.Host), inst.Port)), + fmt.Sprintf("--listen-addr=http://%s", utils.JoinHostPort(inst.Host, inst.StatusPort)), + fmt.Sprintf("--advertise-listen-addr=http://%s", utils.JoinHostPort(AdvertiseHost(inst.Host), inst.StatusPort)), + fmt.Sprintf("--backend-endpoints=%s", strings.Join(endpoints, ",")), + fmt.Sprintf("--log-file=%s", inst.LogFile()), + } + if inst.ConfigPath != "" { + args = append(args, fmt.Sprintf("--config=%s", inst.ConfigPath)) + } + case PDRoleScheduling: + endpoints := pdEndpoints(inst.pds, true) + args = []string{ + "services", + "scheduling", + fmt.Sprintf("--listen-addr=http://%s", utils.JoinHostPort(inst.Host, inst.StatusPort)), + 
fmt.Sprintf("--advertise-listen-addr=http://%s", utils.JoinHostPort(AdvertiseHost(inst.Host), inst.StatusPort)), fmt.Sprintf("--backend-endpoints=%s", strings.Join(endpoints, ",")), fmt.Sprintf("--log-file=%s", inst.LogFile()), } @@ -141,8 +156,8 @@ func (inst *PDInstance) Start(ctx context.Context, version utils.Version) error args = []string{ "services", "resource-manager", - fmt.Sprintf("--listen-addr=http://%s", utils.JoinHostPort(inst.Host, inst.Port)), - fmt.Sprintf("--advertise-listen-addr=http://%s", utils.JoinHostPort(AdvertiseHost(inst.Host), inst.Port)), + fmt.Sprintf("--listen-addr=http://%s", utils.JoinHostPort(inst.Host, inst.StatusPort)), + fmt.Sprintf("--advertise-listen-addr=http://%s", utils.JoinHostPort(AdvertiseHost(inst.Host), inst.StatusPort)), fmt.Sprintf("--backend-endpoints=%s", strings.Join(endpoints, ",")), fmt.Sprintf("--log-file=%s", inst.LogFile()), } diff --git a/components/playground/main.go b/components/playground/main.go index 3231a5708c..d6419c100e 100644 --- a/components/playground/main.go +++ b/components/playground/main.go @@ -57,10 +57,11 @@ type BootOptions struct { Mode string `yaml:"mode"` PDMode string `yaml:"pd_mode"` Version string `yaml:"version"` - PD instance.Config `yaml:"pd"` // ignored when pd_mode == ms - PDAPI instance.Config `yaml:"pd_api"` // Only available when pd_mode == ms - PDTSO instance.Config `yaml:"pd_tso"` // Only available when pd_mode == ms - PDRM instance.Config `yaml:"pd_rm"` // Only available when pd_mode == ms + PD instance.Config `yaml:"pd"` // ignored when pd_mode == ms + PDAPI instance.Config `yaml:"pd_api"` // Only available when pd_mode == ms + PDTSO instance.Config `yaml:"pd_tso"` // Only available when pd_mode == ms + PDScheduling instance.Config `yaml:"pd_scheduling"` // Only available when pd_mode == ms + PDRM instance.Config `yaml:"pd_rm"` // Only available when pd_mode == ms TiProxy instance.Config `yaml:"tiproxy"` TiDB instance.Config `yaml:"tidb"` TiKV instance.Config `yaml:"tikv"` @@ -294,6 +295,7 @@ If you'd like to use a TiDB version other than %s, cancel and retry with the fol rootCmd.Flags().IntVar(&options.PDAPI.Num, "pd.api", 0, "PD API instance number") rootCmd.Flags().IntVar(&options.PDTSO.Num, "pd.tso", 0, "PD TSO instance number") + rootCmd.Flags().IntVar(&options.PDScheduling.Num, "pd.scheduling", 0, "PD scheduling instance number") rootCmd.Flags().IntVar(&options.PDRM.Num, "pd.rm", 0, "PD resource manager instance number") rootCmd.Flags().IntVar(&options.TiDB.UpTimeout, "db.timeout", 60, "TiDB max wait time in seconds for starting, 0 means no limit") @@ -326,6 +328,7 @@ If you'd like to use a TiDB version other than %s, cancel and retry with the fol rootCmd.Flags().StringVar(&options.PDAPI.ConfigPath, "pd.api.config", "", "PD API instance configuration file") rootCmd.Flags().StringVar(&options.PDTSO.ConfigPath, "pd.tso.config", "", "PD TSO instance configuration file") + rootCmd.Flags().StringVar(&options.PDScheduling.ConfigPath, "pd.scheduling.config", "", "PD scheduling instance configuration file") rootCmd.Flags().StringVar(&options.PDRM.ConfigPath, "pd.rm.config", "", "PD resource manager instance configuration file") rootCmd.Flags().StringVar(&options.TiDB.BinPath, "db.binpath", "", "TiDB instance binary path") @@ -342,6 +345,7 @@ If you'd like to use a TiDB version other than %s, cancel and retry with the fol rootCmd.Flags().StringVar(&options.PDAPI.BinPath, "pd.api.binpath", "", "PD API instance binary path") rootCmd.Flags().StringVar(&options.PDTSO.BinPath, "pd.tso.binpath", "", "PD TSO 
instance binary path") + rootCmd.Flags().StringVar(&options.PDScheduling.BinPath, "pd.scheduling.binpath", "", "PD scheduling instance binary path") rootCmd.Flags().StringVar(&options.PDRM.BinPath, "pd.rm.binpath", "", "PD resource manager instance binary path") rootCmd.Flags().StringVar(&options.TiKVCDC.Version, "kvcdc.version", "", "TiKV-CDC instance version") @@ -409,6 +413,9 @@ func populateDefaultOpt(flagSet *pflag.FlagSet) error { defaultInt(&options.PDTSO.Num, "pd.tso", 1) defaultStr(&options.PDTSO.BinPath, "pd.tso.binpath", options.PDTSO.BinPath) defaultStr(&options.PDTSO.ConfigPath, "pd.tso.config", options.PDTSO.ConfigPath) + defaultInt(&options.PDScheduling.Num, "pd.scheduling", 1) + defaultStr(&options.PDScheduling.BinPath, "pd.scheduling.binpath", options.PDScheduling.BinPath) + defaultStr(&options.PDScheduling.ConfigPath, "pd.scheduling.config", options.PDScheduling.ConfigPath) defaultInt(&options.PDRM.Num, "pd.rm", 1) defaultStr(&options.PDRM.BinPath, "pd.rm.binpath", options.PDRM.BinPath) defaultStr(&options.PDRM.ConfigPath, "pd.rm.config", options.PDRM.ConfigPath) diff --git a/components/playground/playground.go b/components/playground/playground.go index fc600fc21e..a63bb4b5ed 100644 --- a/components/playground/playground.go +++ b/components/playground/playground.go @@ -61,6 +61,9 @@ type Playground struct { port int pds []*instance.PDInstance + tsos []*instance.PDInstance + schedulings []*instance.PDInstance + rms []*instance.PDInstance tikvs []*instance.TiKVInstance tidbs []*instance.TiDBInstance tiflashs []*instance.TiFlashInstance @@ -276,6 +279,7 @@ func (p *Playground) handleScaleIn(w io.Writer, pid int) error { switch cid { case spec.ComponentPD: + // microservice not support scale in temporarily for i := 0; i < len(p.pds); i++ { if p.pds[i].Pid() == pid { inst := p.pds[i] @@ -593,6 +597,24 @@ func (p *Playground) WalkInstances(fn func(componentID string, ins instance.Inst return err } } + for _, ins := range p.tsos { + err := fn(spec.ComponentPD, ins) + if err != nil { + return err + } + } + for _, ins := range p.schedulings { + err := fn(spec.ComponentPD, ins) + if err != nil { + return err + } + } + for _, ins := range p.rms { + err := fn(spec.ComponentPD, ins) + if err != nil { + return err + } + } for _, ins := range p.tikvs { err := fn(spec.ComponentTiKV, ins) if err != nil { @@ -698,8 +720,12 @@ func (p *Playground) addInstance(componentID string, pdRole instance.PDRole, tif pd.InitCluster(p.pds) } } - } else { - p.pds = append(p.pds, inst) + } else if pdRole == instance.PDRoleTSO { + p.tsos = append(p.tsos, inst) + } else if pdRole == instance.PDRoleScheduling { + p.schedulings = append(p.schedulings, inst) + } else if pdRole == instance.PDRoleResourceManager { + p.rms = append(p.rms, inst) } case spec.ComponentTiDB: inst := instance.NewTiDBInstance(cfg.BinPath, dir, host, cfg.ConfigPath, id, cfg.Port, p.pds, p.enableBinlog(), p.bootOptions.Mode == "tidb-disagg") @@ -866,6 +892,7 @@ func (p *Playground) bootCluster(ctx context.Context, env *environment.Environme &options.PD, &options.PDAPI, &options.PDTSO, + &options.PDScheduling, &options.PDRM, &options.TiProxy, &options.TiDB, @@ -975,6 +1002,7 @@ func (p *Playground) bootCluster(ctx context.Context, env *environment.Environme instances = append([]InstancePair{ {spec.ComponentPD, instance.PDRoleAPI, instance.TiFlashRoleNormal, options.PDAPI}, {spec.ComponentPD, instance.PDRoleTSO, instance.TiFlashRoleNormal, options.PDTSO}, + {spec.ComponentPD, instance.PDRoleScheduling, instance.TiFlashRoleNormal, 
options.PDScheduling}, {spec.ComponentPD, instance.PDRoleResourceManager, instance.TiFlashRoleNormal, options.PDRM}}, instances..., ) @@ -1080,25 +1108,31 @@ func (p *Playground) bootCluster(ctx context.Context, env *environment.Environme if p.bootOptions.Mode == "tikv-slim" { if p.bootOptions.PDMode == "ms" { var ( - tsoAddr []string - apiAddr []string - rmAddr []string + tsoAddr []string + apiAddr []string + rmAddr []string + schedulingAddr []string ) - for _, pd := range p.pds { - switch pd.Role { - case instance.PDRoleTSO: - tsoAddr = append(tsoAddr, pd.Addr()) - case instance.PDRoleAPI: - apiAddr = append(apiAddr, pd.Addr()) - case instance.PDRoleResourceManager: - rmAddr = append(rmAddr, pd.Addr()) - } + for _, api := range p.pds { + apiAddr = append(apiAddr, api.Addr()) } - fmt.Printf("PD TSO Endpoints: ") - colorCmd.Printf("%s\n", strings.Join(tsoAddr, ",")) + for _, tso := range p.tsos { + tsoAddr = append(tsoAddr, tso.Addr()) + } + for _, scheduling := range p.schedulings { + schedulingAddr = append(schedulingAddr, scheduling.Addr()) + } + for _, rm := range p.rms { + rmAddr = append(rmAddr, rm.Addr()) + } + fmt.Printf("PD API Endpoints: ") colorCmd.Printf("%s\n", strings.Join(apiAddr, ",")) - fmt.Printf("PD Resource Ranager Endpoints: ") + fmt.Printf("PD TSO Endpoints: ") + colorCmd.Printf("%s\n", strings.Join(tsoAddr, ",")) + fmt.Printf("PD Scheduling Endpoints: ") + colorCmd.Printf("%s\n", strings.Join(schedulingAddr, ",")) + fmt.Printf("PD Resource Manager Endpoints: ") colorCmd.Printf("%s\n", strings.Join(rmAddr, ",")) } else { var pdAddrs []string @@ -1232,6 +1266,21 @@ func (p *Playground) terminate(sig syscall.Signal) { kill(inst.Component(), inst.Pid(), inst.Wait) } } + for _, inst := range p.tsos { + if inst.Process != nil && inst.Process.Cmd() != nil && inst.Process.Cmd().Process != nil { + kill(inst.Component(), inst.Pid(), inst.Wait) + } + } + for _, inst := range p.schedulings { + if inst.Process != nil && inst.Process.Cmd() != nil && inst.Process.Cmd().Process != nil { + kill(inst.Component(), inst.Pid(), inst.Wait) + } + } + for _, inst := range p.rms { + if inst.Process != nil && inst.Process.Cmd() != nil && inst.Process.Cmd().Process != nil { + kill(inst.Component(), inst.Pid(), inst.Wait) + } + } } func (p *Playground) renderSDFile() error { From 721b06561ef00c007a3cb72f40fbd06448050f28 Mon Sep 17 00:00:00 2001 From: Shiori Date: Wed, 1 Nov 2023 15:57:45 +0800 Subject: [PATCH 13/14] add UnwarpCheckPointExecutor (#2298) --- pkg/cluster/api/tidbapi.go | 1 - pkg/cluster/executor/executor.go | 13 +++++++++++++ pkg/cluster/module/wait_for.go | 3 ++- 3 files changed, 15 insertions(+), 2 deletions(-) diff --git a/pkg/cluster/api/tidbapi.go b/pkg/cluster/api/tidbapi.go index ddb3a36403..793ba16656 100644 --- a/pkg/cluster/api/tidbapi.go +++ b/pkg/cluster/api/tidbapi.go @@ -75,4 +75,3 @@ func (c *TiDBClient) FinishUpgrade() error { return err } - diff --git a/pkg/cluster/executor/executor.go b/pkg/cluster/executor/executor.go index a2c2f20aab..8cc867a991 100644 --- a/pkg/cluster/executor/executor.go +++ b/pkg/cluster/executor/executor.go @@ -118,6 +118,19 @@ func New(etype SSHType, sudo bool, c SSHConfig) (ctxt.Executor, error) { return &CheckPointExecutor{executor, &c}, nil } +// UnwarpCheckPointExecutor unwarp the CheckPointExecutor and return the real executor +// +// Sometimes we just want to get the output of a command, and the CheckPointExecutor will +// always cache the output, it will be a problem when we want to get the real output. 
+func UnwarpCheckPointExecutor(e ctxt.Executor) ctxt.Executor {
+	switch e := e.(type) {
+	case *CheckPointExecutor:
+		return e.Executor
+	default:
+		return e
+	}
+}
+
 func checkLocalIP(ip string) error {
 	ifaces, err := net.Interfaces()
 	if err != nil {
diff --git a/pkg/cluster/module/wait_for.go b/pkg/cluster/module/wait_for.go
index 8a59104e1b..757b0f8fae 100644
--- a/pkg/cluster/module/wait_for.go
+++ b/pkg/cluster/module/wait_for.go
@@ -21,6 +21,7 @@ import (
 
 	"github.com/pingcap/errors"
 	"github.com/pingcap/tiup/pkg/cluster/ctxt"
+	"github.com/pingcap/tiup/pkg/cluster/executor"
 	"github.com/pingcap/tiup/pkg/utils"
 	"go.uber.org/zap"
 )
@@ -71,7 +72,7 @@ func (w *WaitFor) Execute(ctx context.Context, e ctxt.Executor) (err error) {
 	}
 	if err := utils.Retry(func() error {
 		// only listing TCP ports
-		stdout, _, err := e.Execute(ctx, "ss -ltn", false)
+		stdout, _, err := executor.UnwarpCheckPointExecutor(e).Execute(ctx, "ss -ltn", false)
 		if err == nil {
 			switch w.c.State {
 			case "started":

From a07c4d7e259d9490062c1a7a9a7583277f76c85d Mon Sep 17 00:00:00 2001
From: xhe
Date: Fri, 3 Nov 2023 16:32:09 +0800
Subject: [PATCH 14/14] playground: fix tiproxy metrics addr (#2299)

---
 components/playground/instance/instance.go | 14 ++++++++++----
 components/playground/instance/tiproxy.go  |  9 +++++++++
 components/playground/monitor.go           | 26 +++++++++++---------------
 components/playground/playground.go        |  6 ++----
 4 files changed, 32 insertions(+), 23 deletions(-)

diff --git a/components/playground/instance/instance.go b/components/playground/instance/instance.go
index 0676c75910..620f2a502d 100644
--- a/components/playground/instance/instance.go
+++ b/components/playground/instance/instance.go
@@ -47,6 +47,12 @@ type instance struct {
 	BinPath    string
 }
 
+// MetricAddr will be used by prometheus scrape_configs.
+type MetricAddr struct {
+	Targets []string          `json:"targets"`
+	Labels  map[string]string `json:"labels"`
+}
+
 // Instance represent running component
 type Instance interface {
 	Pid() int
@@ -59,16 +65,16 @@ type Instance interface {
 	LogFile() string
 	// Uptime show uptime.
 	Uptime() string
-	// StatusAddrs return the address to pull metrics.
-	StatusAddrs() []string
+	// MetricAddr returns the address to pull metrics from.
+	MetricAddr() MetricAddr
 	// Wait Should only call this if the instance is started successfully.
 	// The implementation should be safe to call Wait multi times.
 	Wait() error
 }
 
-func (inst *instance) StatusAddrs() (addrs []string) {
+func (inst *instance) MetricAddr() (r MetricAddr) {
 	if inst.Host != "" && inst.StatusPort != 0 {
-		addrs = append(addrs, utils.JoinHostPort(inst.Host, inst.StatusPort))
+		r.Targets = append(r.Targets, utils.JoinHostPort(inst.Host, inst.StatusPort))
 	}
 	return
 }
diff --git a/components/playground/instance/tiproxy.go b/components/playground/instance/tiproxy.go
index b39f5f01de..95b6b13cd8 100644
--- a/components/playground/instance/tiproxy.go
+++ b/components/playground/instance/tiproxy.go
@@ -54,6 +54,15 @@ func NewTiProxy(binPath string, dir, host, configPath string, id int, port int,
 	return tiproxy
 }
 
+// MetricAddr implements Instance interface.
+func (c *TiProxy) MetricAddr() (r MetricAddr) {
+	r.Targets = append(r.Targets, utils.JoinHostPort(c.Host, c.StatusPort))
+	r.Labels = map[string]string{
+		"__metrics_path__": "/api/metrics",
+	}
+	return
+}
+
 // Start implements Instance interface.
 func (c *TiProxy) Start(ctx context.Context, version utils.Version) error {
 	endpoints := pdEndpoints(c.pds, true)
diff --git a/components/playground/monitor.go b/components/playground/monitor.go
index f0cfd2bd45..3d17a7edae 100644
--- a/components/playground/monitor.go
+++ b/components/playground/monitor.go
@@ -29,24 +29,20 @@ import (
 )
 
 // ref: https://prometheus.io/docs/prometheus/latest/configuration/configuration/#file_sd_config
-func (m *monitor) renderSDFile(cid2targets map[string][]string) error {
-	type Item struct {
-		Targets []string          `json:"targets"`
-		Labels  map[string]string `json:"labels"`
-	}
-
-	cid2targets["prometheus"] = []string{utils.JoinHostPort(m.host, m.port)}
+func (m *monitor) renderSDFile(cid2targets map[string]instance.MetricAddr) error {
+	cid2targets["prometheus"] = instance.MetricAddr{Targets: []string{utils.JoinHostPort(m.host, m.port)}}
 
-	var items []Item
+	var items []instance.MetricAddr
 
-	for id, targets := range cid2targets {
-		item := Item{
-			Targets: targets,
-			Labels: map[string]string{
-				"job": id,
-			},
+	for id, t := range cid2targets {
+		it := instance.MetricAddr{
+			Targets: t.Targets,
+			Labels:  map[string]string{"job": id},
+		}
+		for k, v := range t.Labels {
+			it.Labels[k] = v
 		}
-		items = append(items, item)
+		items = append(items, it)
 	}
 
 	data, err := json.MarshalIndent(&items, "", "\t")
diff --git a/components/playground/playground.go b/components/playground/playground.go
index a63bb4b5ed..4df08b60af 100644
--- a/components/playground/playground.go
+++ b/components/playground/playground.go
@@ -1289,12 +1289,10 @@ func (p *Playground) renderSDFile() error {
 		return nil
 	}
 
-	cid2targets := make(map[string][]string)
+	cid2targets := make(map[string]instance.MetricAddr)
 	_ = p.WalkInstances(func(cid string, inst instance.Instance) error {
-		targets := cid2targets[cid]
-		targets = append(targets, inst.StatusAddrs()...)
-		cid2targets[cid] = targets
+		cid2targets[cid] = inst.MetricAddr()
 		return nil
 	})
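
Notes on PATCH 12-14 (illustrative sketches with stand-in names; not part of the patch series):

PATCH 12/14 threads the new PD scheduling microservice through BootOptions, the pd.scheduling flags, populateDefaultOpt, the boot order, WalkInstances, and terminate. The bookkeeping is a role-to-slice dispatch: addInstance files every PD-family instance into one of four slices, and every walker has to visit all four. Below is a minimal sketch of that pattern using stand-in types, not the real playground structs; the real addInstance uses an if/else chain and also runs InitCluster for the normal/API roles.

package main

import "fmt"

type PDRole string

const (
	PDRoleNormal          PDRole = "pd"
	PDRoleAPI             PDRole = "api"
	PDRoleTSO             PDRole = "tso"
	PDRoleScheduling      PDRole = "scheduling"
	PDRoleResourceManager PDRole = "resource-manager"
)

type PDInstance struct {
	Role PDRole
	Addr string
}

// playground mirrors the four role-specific slices added in PATCH 12/14.
type playground struct {
	pds         []*PDInstance // normal PD and, in ms mode, the API service
	tsos        []*PDInstance
	schedulings []*PDInstance
	rms         []*PDInstance
}

// add files an instance into the slice for its role, as addInstance does.
func (p *playground) add(inst *PDInstance) {
	switch inst.Role {
	case PDRoleTSO:
		p.tsos = append(p.tsos, inst)
	case PDRoleScheduling:
		p.schedulings = append(p.schedulings, inst)
	case PDRoleResourceManager:
		p.rms = append(p.rms, inst)
	default: // PDRoleNormal / PDRoleAPI stay in p.pds
		p.pds = append(p.pds, inst)
	}
}

// walk visits every PD-family instance, the way WalkInstances and
// terminate now have to loop over all four slices.
func (p *playground) walk(fn func(*PDInstance)) {
	for _, group := range [][]*PDInstance{p.pds, p.tsos, p.schedulings, p.rms} {
		for _, inst := range group {
			fn(inst)
		}
	}
}

func main() {
	p := &playground{}
	p.add(&PDInstance{Role: PDRoleAPI, Addr: "127.0.0.1:2379"})
	p.add(&PDInstance{Role: PDRoleScheduling, Addr: "127.0.0.1:3379"})
	p.walk(func(inst *PDInstance) { fmt.Println(inst.Role, inst.Addr) })
}

Keeping one slice per role is what lets the playground print per-service endpoint lists and kill each group on shutdown without re-inspecting roles.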
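
populateDefaultOpt bumps each microservice count to 1 only when the corresponding flag was left unset, so pd.scheduling defaults to one instance in ms mode without overriding an explicit --pd.scheduling 0. The defaultInt/defaultStr helper bodies are outside this hunk; the following is a plausible reconstruction with a hypothetical signature (the real helpers presumably close over flagSet), built on pflag's Changed:

package main

import (
	"fmt"

	"github.com/spf13/pflag"
)

// defaultInt overwrites *dst with val only when the user did not set the
// flag explicitly. (Hypothetical reconstruction of the helper assumed by
// populateDefaultOpt; the real one takes no flagSet parameter.)
func defaultInt(flagSet *pflag.FlagSet, dst *int, name string, val int) {
	if !flagSet.Changed(name) {
		*dst = val
	}
}

func main() {
	flagSet := pflag.NewFlagSet("playground", pflag.ContinueOnError)
	num := 0
	flagSet.IntVar(&num, "pd.scheduling", 0, "PD scheduling instance number")
	_ = flagSet.Parse([]string{}) // simulate: the user passed no flags
	defaultInt(flagSet, &num, "pd.scheduling", 1)
	fmt.Println(num) // 1: the ms-mode default kicks in
}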
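
PATCH 13/14 exists because WaitFor polls "ss -ltn" inside a retry loop, and a checkpoint-replaying executor would keep returning the recorded output instead of re-running the command, so the observed port state could never change. UnwarpCheckPointExecutor peels the caching decorator off with a type switch. Here is a self-contained sketch of the same unwrap pattern with stand-in types (the real ctxt.Executor takes a context and returns stdout and stderr):

package main

import "fmt"

// Executor is a stand-in for ctxt.Executor; this sketch keeps only Execute.
type Executor interface {
	Execute(cmd string) string
}

type sshExecutor struct{}

func (sshExecutor) Execute(cmd string) string { return "live output of " + cmd }

// cachingExecutor mimics CheckPointExecutor: it wraps another executor and
// replays a recorded result instead of re-running the command.
type cachingExecutor struct {
	Executor // embedded, the way CheckPointExecutor embeds its inner executor
	cache    map[string]string
}

func (c *cachingExecutor) Execute(cmd string) string {
	if out, ok := c.cache[cmd]; ok {
		return out // replayed from the checkpoint, possibly stale
	}
	out := c.Executor.Execute(cmd)
	c.cache[cmd] = out
	return out
}

// unwrap mirrors UnwarpCheckPointExecutor: strip the caching decorator if
// present, otherwise hand the executor back unchanged.
func unwrap(e Executor) Executor {
	if c, ok := e.(*cachingExecutor); ok {
		return c.Executor
	}
	return e
}

func main() {
	e := Executor(&cachingExecutor{
		Executor: sshExecutor{},
		cache:    map[string]string{"ss -ltn": "stale checkpoint data"},
	})
	fmt.Println(e.Execute("ss -ltn"))         // stale checkpoint data
	fmt.Println(unwrap(e).Execute("ss -ltn")) // live output of ss -ltn
}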
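
PATCH 14/14 fixes TiProxy scraping by letting an instance attach labels to its targets: TiProxy serves metrics on /api/metrics rather than the default /metrics, and a __metrics_path__ label on a file_sd target is the standard Prometheus way to override the scrape path per target. The sketch below reproduces the kind of JSON renderSDFile now emits; the MetricAddr struct is taken from the diff, while the hosts and ports are illustrative.

package main

import (
	"encoding/json"
	"fmt"
)

// MetricAddr matches the struct added to components/playground/instance.
type MetricAddr struct {
	Targets []string          `json:"targets"`
	Labels  map[string]string `json:"labels"`
}

func main() {
	// What the playground would collect via WalkInstances + MetricAddr().
	// The "job" label is filled in by renderSDFile; TiProxy additionally
	// carries __metrics_path__ so Prometheus scrapes /api/metrics.
	items := []MetricAddr{
		{Targets: []string{"127.0.0.1:10080"}, Labels: map[string]string{"job": "tidb"}},
		{Targets: []string{"127.0.0.1:3080"}, Labels: map[string]string{
			"job":              "tiproxy",
			"__metrics_path__": "/api/metrics",
		}},
	}
	data, err := json.MarshalIndent(&items, "", "\t")
	if err != nil {
		panic(err)
	}
	// This is the targets file a file_sd_configs stanza would pick up.
	fmt.Println(string(data))
}

Because the labels travel with each target, Prometheus applies TiProxy's non-standard path without any change to prometheus.yml itself.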