diff --git a/embed/templates/config/prometheus.yml.tpl b/embed/templates/config/prometheus.yml.tpl index 10bef9607d..238eb179ab 100644 --- a/embed/templates/config/prometheus.yml.tpl +++ b/embed/templates/config/prometheus.yml.tpl @@ -109,6 +109,22 @@ scrape_configs: - targets: {{- range .TiDBStatusAddrs}} - '{{.}}' +{{- end}} + - job_name: "tiproxy" + honor_labels: true # don't overwrite job & instance labels + metrics_path: /api/metrics +{{- if .TLSEnabled}} + scheme: https + tls_config: + insecure_skip_verify: false + ca_file: ../tls/ca.crt + cert_file: ../tls/prometheus.crt + key_file: ../tls/prometheus.pem +{{- end}} + static_configs: + - targets: +{{- range .TiProxyStatusAddrs}} + - '{{.}}' {{- end}} - job_name: "tikv" honor_labels: true # don't overwrite job & instance labels diff --git a/embed/templates/scripts/run_tiproxy.sh.tpl b/embed/templates/scripts/run_tiproxy.sh.tpl new file mode 100644 index 0000000000..83362c9121 --- /dev/null +++ b/embed/templates/scripts/run_tiproxy.sh.tpl @@ -0,0 +1,14 @@ +#!/bin/bash +set -e + +# WARNING: This file was auto-generated. Do not edit! +# All your edit might be overwritten! +DEPLOY_DIR={{.DeployDir}} +cd "${DEPLOY_DIR}" || exit 1 + +{{- if .NumaNode}} +exec numactl --cpunodebind={{.NumaNode}} --membind={{.NumaNode}} bin/tiproxy \ +{{- else}} +exec bin/tiproxy \ +{{- end}} + --config conf/tiproxy.toml diff --git a/pkg/cluster/ansible/import_test.go b/pkg/cluster/ansible/import_test.go index 970bd68c53..8c1f0fe1b6 100644 --- a/pkg/cluster/ansible/import_test.go +++ b/pkg/cluster/ansible/import_test.go @@ -126,6 +126,7 @@ server_configs: pd: {} tidb_dashboard: {} tiflash: {} + tiproxy: {} tiflash-learner: {} pump: {} drainer: {} @@ -135,6 +136,7 @@ server_configs: tidb_servers: [] tikv_servers: [] tiflash_servers: [] +tiproxy_servers: [] pd_servers: [] monitoring_servers: [] `) diff --git a/pkg/cluster/ansible/test-data/meta.yaml b/pkg/cluster/ansible/test-data/meta.yaml index d5c1263cbb..e143d7cd72 100644 --- a/pkg/cluster/ansible/test-data/meta.yaml +++ b/pkg/cluster/ansible/test-data/meta.yaml @@ -21,6 +21,7 @@ topology: pd: {} tidb_dashboard: {} tiflash: {} + tiproxy: {} tiflash-learner: {} pump: {} drainer: {} @@ -193,3 +194,4 @@ topology: data_dir: data/alertmanager-9093 arch: amd64 os: linux + tiproxy_servers: [] diff --git a/pkg/cluster/spec/bindversion.go b/pkg/cluster/spec/bindversion.go index 6e6ac9fcf8..6a0c36328b 100644 --- a/pkg/cluster/spec/bindversion.go +++ b/pkg/cluster/spec/bindversion.go @@ -31,6 +31,8 @@ func TiDBComponentVersion(comp, version string) string { ComponentTiSpark, ComponentTiKVCDC: // TiKV-CDC use individual version. return "" + case ComponentTiProxy: + return "nightly" default: return version } diff --git a/pkg/cluster/spec/monitoring.go b/pkg/cluster/spec/monitoring.go index a5753448eb..91200eae2b 100644 --- a/pkg/cluster/spec/monitoring.go +++ b/pkg/cluster/spec/monitoring.go @@ -247,6 +247,13 @@ func (i *MonitorInstance) InitConfig( cfig.AddTiDB(db.Host, uint64(db.StatusPort)) } } + if servers, found := topoHasField("TiProxyServers"); found { + for i := 0; i < servers.Len(); i++ { + db := servers.Index(i).Interface().(*TiProxySpec) + uniqueHosts.Insert(db.Host) + cfig.AddTiProxy(db.Host, uint64(db.StatusPort)) + } + } if servers, found := topoHasField("TiFlashServers"); found { for i := 0; i < servers.Len(); i++ { flash := servers.Index(i).Interface().(*TiFlashSpec) diff --git a/pkg/cluster/spec/spec.go b/pkg/cluster/spec/spec.go index 47e67251a1..ebefdc466d 100644 --- a/pkg/cluster/spec/spec.go +++ b/pkg/cluster/spec/spec.go @@ -109,6 +109,7 @@ type ( PD map[string]any `yaml:"pd"` Dashboard map[string]any `yaml:"tidb_dashboard"` TiFlash map[string]any `yaml:"tiflash"` + TiProxy map[string]any `yaml:"tiproxy"` TiFlashLearner map[string]any `yaml:"tiflash-learner"` Pump map[string]any `yaml:"pump"` Drainer map[string]any `yaml:"drainer"` @@ -125,6 +126,7 @@ type ( TiDBServers []*TiDBSpec `yaml:"tidb_servers"` TiKVServers []*TiKVSpec `yaml:"tikv_servers"` TiFlashServers []*TiFlashSpec `yaml:"tiflash_servers"` + TiProxyServers []*TiProxySpec `yaml:"tiproxy_servers"` PDServers []*PDSpec `yaml:"pd_servers"` DashboardServers []*DashboardSpec `yaml:"tidb_dashboard_servers,omitempty"` PumpServers []*PumpSpec `yaml:"pump_servers,omitempty"` @@ -502,6 +504,7 @@ func (s *Specification) Merge(that Topology) Topology { PDServers: append(s.PDServers, spec.PDServers...), DashboardServers: append(s.DashboardServers, spec.DashboardServers...), TiFlashServers: append(s.TiFlashServers, spec.TiFlashServers...), + TiProxyServers: append(s.TiProxyServers, spec.TiProxyServers...), PumpServers: append(s.PumpServers, spec.PumpServers...), Drainers: append(s.Drainers, spec.Drainers...), CDCServers: append(s.CDCServers, spec.CDCServers...), @@ -712,6 +715,7 @@ func (s *Specification) ComponentsByStartOrder() (comps []Component) { // "pd", "dashboard", "tikv", "pump", "tidb", "tiflash", "drainer", "cdc", "tikv-cdc", "prometheus", "grafana", "alertmanager" comps = append(comps, &PDComponent{s}) comps = append(comps, &DashboardComponent{s}) + comps = append(comps, &TiProxyComponent{s}) comps = append(comps, &TiKVComponent{s}) comps = append(comps, &PumpComponent{s}) comps = append(comps, &TiDBComponent{s}) @@ -739,6 +743,7 @@ func (s *Specification) ComponentsByUpdateOrder(curVer string) (comps []Componen } comps = append(comps, &PDComponent{s}) comps = append(comps, &DashboardComponent{s}) + comps = append(comps, &TiProxyComponent{s}) comps = append(comps, &TiKVComponent{s}) comps = append(comps, &PumpComponent{s}) comps = append(comps, &TiDBComponent{s}) diff --git a/pkg/cluster/spec/tiproxy.go b/pkg/cluster/spec/tiproxy.go new file mode 100644 index 0000000000..09c19d1051 --- /dev/null +++ b/pkg/cluster/spec/tiproxy.go @@ -0,0 +1,312 @@ +// Copyright 2023 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package spec + +import ( + "bytes" + "context" + "crypto/tls" + "fmt" + "path/filepath" + "strings" + "time" + + "github.com/pingcap/tiup/pkg/cluster/ctxt" + "github.com/pingcap/tiup/pkg/cluster/template/scripts" + "github.com/pingcap/tiup/pkg/meta" + "github.com/pingcap/tiup/pkg/utils" + "github.com/prometheus/common/expfmt" +) + +func proxyUptimeByHost(host string, port int, timeout time.Duration, tlsCfg *tls.Config) time.Duration { + if timeout < time.Second { + timeout = statusQueryTimeout + } + + scheme := "http" + if tlsCfg != nil { + scheme = "https" + } + url := fmt.Sprintf("%s://%s/api/metrics", scheme, utils.JoinHostPort(host, port)) + + client := utils.NewHTTPClient(timeout, tlsCfg) + + body, err := client.Get(context.TODO(), url) + if err != nil || body == nil { + return 0 + } + + var parser expfmt.TextParser + reader := bytes.NewReader(body) + mf, err := parser.TextToMetricFamilies(reader) + if err != nil { + return 0 + } + + now := time.Now() + for k, v := range mf { + if k == promMetricStartTimeSeconds { + ms := v.GetMetric() + if len(ms) >= 1 { + startTime := ms[0].Gauge.GetValue() + return now.Sub(time.Unix(int64(startTime), 0)) + } + return 0 + } + } + + return 0 +} + +// TiProxySpec represents the TiProxy topology specification in topology.yaml +type TiProxySpec struct { + Host string `yaml:"host"` + ManageHost string `yaml:"manage_host,omitempty" validate:"manage_host:editable"` + SSHPort int `yaml:"ssh_port,omitempty" validate:"ssh_port:editable"` + Port int `yaml:"port" default:"6000"` + StatusPort int `yaml:"status_port" default:"3080"` + DeployDir string `yaml:"deploy_dir,omitempty"` + NumaNode string `yaml:"numa_node,omitempty" validate:"numa_node:editable"` + Config map[string]any `yaml:"config,omitempty" validate:"config:ignore"` + Arch string `yaml:"arch,omitempty"` + OS string `yaml:"os,omitempty"` +} + +// Role returns the component role of the instance +func (s *TiProxySpec) Role() string { + return ComponentTiProxy +} + +// SSH returns the host and SSH port of the instance +func (s *TiProxySpec) SSH() (string, int) { + return s.Host, s.SSHPort +} + +// GetMainPort returns the main port of the instance +func (s *TiProxySpec) GetMainPort() int { + return s.Port +} + +// GetManageHost returns the manage host of the instance +func (s *TiProxySpec) GetManageHost() string { + if s.ManageHost != "" { + return s.ManageHost + } + return s.Host +} + +// IsImported returns if the node is imported from TiDB-Ansible +func (s *TiProxySpec) IsImported() bool { + return false +} + +// IgnoreMonitorAgent returns if the node does not have monitor agents available +func (s *TiProxySpec) IgnoreMonitorAgent() bool { + return false +} + +// TiProxyComponent represents TiProxy component. +type TiProxyComponent struct{ Topology *Specification } + +// Name implements Component interface. +func (c *TiProxyComponent) Name() string { + return ComponentTiProxy +} + +// Role implements Component interface. +func (c *TiProxyComponent) Role() string { + return ComponentTiProxy +} + +// Instances implements Component interface. +func (c *TiProxyComponent) Instances() []Instance { + ins := make([]Instance, 0, len(c.Topology.TiProxyServers)) + for _, s := range c.Topology.TiProxyServers { + s := s + instance := &TiProxyInstance{BaseInstance{ + InstanceSpec: s, + Name: c.Name(), + Host: s.Host, + ManageHost: s.ManageHost, + Port: s.Port, + SSHP: s.SSHPort, + Source: ComponentTiProxy, + Ports: []int{ + s.Port, + s.StatusPort, + }, + Dirs: []string{ + s.DeployDir, + }, + StatusFn: func(_ context.Context, timeout time.Duration, tlsCfg *tls.Config, _ ...string) string { + return statusByHost(s.Host, s.StatusPort, "/api/debug/health", timeout, tlsCfg) + }, + UptimeFn: func(_ context.Context, timeout time.Duration, tlsCfg *tls.Config) time.Duration { + return proxyUptimeByHost(s.Host, s.StatusPort, timeout, tlsCfg) + }, + }, c.Topology} + + ins = append(ins, instance) + } + return ins +} + +// TiProxyInstance represent the TiProxy instance. +type TiProxyInstance struct { + BaseInstance + topo Topology +} + +// ScaleConfig deploy temporary config on scaling +func (i *TiProxyInstance) ScaleConfig( + ctx context.Context, + e ctxt.Executor, + topo Topology, + clusterName, + clusterVersion, + user string, + paths meta.DirPaths, +) error { + s := i.topo + defer func() { + i.topo = s + }() + i.topo = mustBeClusterTopo(topo) + + return i.InitConfig(ctx, e, clusterName, clusterVersion, user, paths) +} + +func (i *TiProxyInstance) checkConfig( + cfg map[string]any, + paths meta.DirPaths, +) map[string]any { + topo := i.topo.(*Specification) + spec := i.InstanceSpec.(*TiProxySpec) + enableTLS := topo.GlobalOptions.TLSEnabled + + if cfg == nil { + cfg = make(map[string]any) + } + + pds := []string{} + for _, pdspec := range topo.PDServers { + pds = append(pds, pdspec.GetAdvertiseClientURL(enableTLS)) + } + cfg["proxy.pd-addrs"] = strings.Join(pds, ",") + cfg["proxy.require-backend-tls"] = false + cfg["proxy.addr"] = utils.JoinHostPort(i.GetListenHost(), i.GetPort()) + cfg["api.addr"] = utils.JoinHostPort(i.GetListenHost(), spec.StatusPort) + cfg["log.log-file.filename"] = filepath.Join(paths.Log, "tiproxy.log") + + return cfg +} + +// InitConfig implements Instance interface. +func (i *TiProxyInstance) InitConfig( + ctx context.Context, + e ctxt.Executor, + clusterName, + clusterVersion, + deployUser string, + paths meta.DirPaths, +) error { + topo := i.topo.(*Specification) + if err := i.BaseInstance.InitConfig(ctx, e, topo.GlobalOptions, deployUser, paths); err != nil { + return err + } + spec := i.InstanceSpec.(*TiProxySpec) + globalConfig := topo.ServerConfigs.TiProxy + instanceConfig := i.checkConfig(spec.Config, paths) + + cfg := &scripts.TiProxyScript{ + DeployDir: paths.Deploy, + NumaNode: spec.NumaNode, + } + + fp := filepath.Join(paths.Cache, fmt.Sprintf("run_tiproxy_%s_%d.sh", i.GetHost(), i.GetPort())) + + if err := cfg.ConfigToFile(fp); err != nil { + return err + } + dst := filepath.Join(paths.Deploy, "scripts", "run_tiproxy.sh") + if err := e.Transfer(ctx, fp, dst, false, 0, false); err != nil { + return err + } + + if _, _, err := e.Execute(ctx, "chmod +x "+dst, false); err != nil { + return err + } + + var err error + instanceConfig, err = i.setTLSConfig(ctx, false, instanceConfig, paths) + if err != nil { + return err + } + + return i.MergeServerConfig(ctx, e, globalConfig, instanceConfig, paths) +} + +// setTLSConfig set TLS Config to support enable/disable TLS +func (i *TiProxyInstance) setTLSConfig(ctx context.Context, enableTLS bool, configs map[string]any, paths meta.DirPaths) (map[string]any, error) { + if configs == nil { + configs = make(map[string]any) + } + if enableTLS { + configs["security.cluster-tls.ca"] = fmt.Sprintf("%s/tls/%s", paths.Deploy, TLSCACert) + configs["security.cluster-tls.cert"] = fmt.Sprintf("%s/tls/%s.crt", paths.Deploy, i.Role()) + configs["security.cluster-tls.key"] = fmt.Sprintf("%s/tls/%s.pem", paths.Deploy, i.Role()) + + configs["security.server-tls.ca"] = fmt.Sprintf("%s/tls/%s", paths.Deploy, TLSCACert) + configs["security.server-tls.cert"] = fmt.Sprintf("%s/tls/%s.crt", paths.Deploy, i.Role()) + configs["security.server-tls.key"] = fmt.Sprintf("%s/tls/%s.pem", paths.Deploy, i.Role()) + configs["security.server-tls.skip-ca"] = true + + configs["security.sql-tls.ca"] = fmt.Sprintf("%s/tls/%s", paths.Deploy, TLSCACert) + } else { + // drainer tls config list + tlsConfigs := []string{ + "security.cluster-tls.ca", + "security.cluster-tls.cert", + "security.cluster-tls.key", + "security.server-tls.ca", + "security.server-tls.cert", + "security.server-tls.key", + "security.server-tls.skip-ca", + "security.sql-tls.ca", + } + // delete TLS configs + for _, config := range tlsConfigs { + delete(configs, config) + } + } + + return nil, nil +} + +var _ RollingUpdateInstance = &TiProxyInstance{} + +// GetAddr return the address of this TiProxy instance +func (i *TiProxyInstance) GetAddr() string { + return utils.JoinHostPort(i.GetHost(), i.GetPort()) +} + +// PreRestart implements RollingUpdateInstance interface. +func (i *TiProxyInstance) PreRestart(ctx context.Context, topo Topology, apiTimeoutSeconds int, tlsCfg *tls.Config) error { + return nil +} + +// PostRestart implements RollingUpdateInstance interface. +func (i *TiProxyInstance) PostRestart(ctx context.Context, topo Topology, tlsCfg *tls.Config) error { + return nil +} diff --git a/pkg/cluster/task/update_meta.go b/pkg/cluster/task/update_meta.go index 8e9e3d3e6d..dd6457e1b9 100644 --- a/pkg/cluster/task/update_meta.go +++ b/pkg/cluster/task/update_meta.go @@ -72,6 +72,15 @@ func (u *UpdateMeta) Execute(ctx context.Context) error { } newMeta.Topology.PDServers = pdServers + tiproxyServers := make([]*spec.TiProxySpec, 0) + for i, instance := range (&spec.TiProxyComponent{Topology: topo}).Instances() { + if deleted.Exist(instance.ID()) { + continue + } + tiproxyServers = append(tiproxyServers, topo.TiProxyServers[i]) + } + newMeta.Topology.TiProxyServers = tiproxyServers + dashboardServers := make([]*spec.DashboardSpec, 0) for i, instance := range (&spec.DashboardComponent{Topology: topo}).Instances() { if deleted.Exist(instance.ID()) { @@ -79,7 +88,7 @@ func (u *UpdateMeta) Execute(ctx context.Context) error { } dashboardServers = append(dashboardServers, topo.DashboardServers[i]) } - topo.DashboardServers = dashboardServers + newMeta.Topology.DashboardServers = dashboardServers tiflashServers := make([]*spec.TiFlashSpec, 0) for i, instance := range (&spec.TiFlashComponent{Topology: topo}).Instances() { diff --git a/pkg/cluster/template/config/prometheus.go b/pkg/cluster/template/config/prometheus.go index eacef47238..90d6b00051 100644 --- a/pkg/cluster/template/config/prometheus.go +++ b/pkg/cluster/template/config/prometheus.go @@ -31,6 +31,7 @@ type PrometheusConfig struct { TLSEnabled bool NodeExporterAddrs []string TiDBStatusAddrs []string + TiProxyStatusAddrs []string TiKVStatusAddrs []string PDAddrs []string TiFlashStatusAddrs []string @@ -79,6 +80,12 @@ func (c *PrometheusConfig) AddTiDB(ip string, port uint64) *PrometheusConfig { return c } +// AddTiProxy add a TiProxy address +func (c *PrometheusConfig) AddTiProxy(ip string, port uint64) *PrometheusConfig { + c.TiProxyStatusAddrs = append(c.TiProxyStatusAddrs, utils.JoinHostPort(ip, int(port))) + return c +} + // AddTiKV add a TiKV address func (c *PrometheusConfig) AddTiKV(ip string, port uint64) *PrometheusConfig { c.TiKVStatusAddrs = append(c.TiKVStatusAddrs, utils.JoinHostPort(ip, int(port))) diff --git a/pkg/cluster/template/scripts/tiproxy.go b/pkg/cluster/template/scripts/tiproxy.go new file mode 100644 index 0000000000..d0bd8d5980 --- /dev/null +++ b/pkg/cluster/template/scripts/tiproxy.go @@ -0,0 +1,49 @@ +// Copyright 2023 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package scripts + +import ( + "bytes" + "path" + "text/template" + + "github.com/pingcap/tiup/embed" + "github.com/pingcap/tiup/pkg/utils" +) + +// TiProxyScript represent the data to generate tiproxy config +type TiProxyScript struct { + DeployDir string + NumaNode string +} + +// ConfigToFile write config content to specific file. +func (c *TiProxyScript) ConfigToFile(file string) error { + fp := path.Join("templates", "scripts", "run_tiproxy.sh.tpl") + tpl, err := embed.ReadTemplate(fp) + if err != nil { + return err + } + tmpl, err := template.New("TiProxy").Parse(string(tpl)) + if err != nil { + return err + } + + content := bytes.NewBufferString("") + if err := tmpl.Execute(content, c); err != nil { + return err + } + + return utils.WriteFile(file, content.Bytes(), 0755) +}