From d6a2826a14a673d36f4f1f22aff2afc149ad1245 Mon Sep 17 00:00:00 2001
From: Ping Yu
Date: Thu, 15 Sep 2022 01:38:59 +0800
Subject: [PATCH] cluster: Support TiKV-CDC component (#2022)

---
 .github/workflows/integrate-cluster-cmd.yaml  |   1 +
 docker/up.sh                                  |   2 +-
 embed/examples/cluster/topology.example.yaml  |  16 +
 embed/templates/config/prometheus.yml.tpl     |  17 +
 embed/templates/scripts/run_tikv-cdc.sh.tpl   |  42 +++
 pkg/cluster/ansible/import_test.go            |   3 +-
 pkg/cluster/ansible/test-data/meta.yaml       |   1 +
 pkg/cluster/api/tikv_cdc.go                   | 229 +++++++++++++
 pkg/cluster/spec/bindversion.go               |   3 +-
 pkg/cluster/spec/monitoring.go                |   7 +
 pkg/cluster/spec/spec.go                      |   6 +-
 pkg/cluster/spec/spec_test.go                 |  71 +++++
 pkg/cluster/spec/tikv_cdc.go                  | 300 ++++++++++++++++++
 pkg/cluster/task/update_meta.go               |   9 +
 pkg/cluster/template/config/prometheus.go     |   7 +
 pkg/cluster/template/scripts/tikv_cdc.go      | 103 ++++++
 pkg/tidbver/tidbver.go                        |   6 +
 tests/tiup-cluster/run.sh                     |   9 +
 tests/tiup-cluster/script/cmd_subtest.sh      |  35 +-
 tests/tiup-cluster/script/scale_core.sh       |   2 +-
 tests/tiup-cluster/script/tikv_cdc.sh         | 205 ++++++++++++
 tests/tiup-cluster/test_cmd.sh                |   4 +-
 tests/tiup-cluster/test_cmd_tls_native_ssh.sh |   4 +-
 tests/tiup-cluster/test_scale_core.sh         |   4 +-
 tests/tiup-cluster/test_tikv_cdc.sh           |  16 +
 tests/tiup-cluster/topo/full.yaml             |   5 +
 tests/tiup-cluster/topo/tikv_cdc.yaml         |  46 +++
 27 files changed, 1137 insertions(+), 16 deletions(-)
 create mode 100644 embed/templates/scripts/run_tikv-cdc.sh.tpl
 create mode 100644 pkg/cluster/api/tikv_cdc.go
 create mode 100644 pkg/cluster/spec/tikv_cdc.go
 create mode 100644 pkg/cluster/template/scripts/tikv_cdc.go
 create mode 100755 tests/tiup-cluster/script/tikv_cdc.sh
 create mode 100755 tests/tiup-cluster/test_tikv_cdc.sh
 create mode 100644 tests/tiup-cluster/topo/tikv_cdc.yaml

diff --git a/.github/workflows/integrate-cluster-cmd.yaml b/.github/workflows/integrate-cluster-cmd.yaml
index 79b6416885..7429ab37ca 100644
--- a/.github/workflows/integrate-cluster-cmd.yaml
+++ b/.github/workflows/integrate-cluster-cmd.yaml
@@ -40,6 +40,7 @@ jobs:
           # - 'test_cmd_tls_native_ssh'
           - 'test_upgrade'
           - 'test_upgrade_tls'
+          - 'test_tikv_cdc'
     env:
       working-directory: ${{ github.workspace }}/go/src/github.com/${{ github.repository }}
     steps:
diff --git a/docker/up.sh b/docker/up.sh
index 62b8858fb6..468a05a90a 100755
--- a/docker/up.sh
+++ b/docker/up.sh
@@ -159,7 +159,7 @@ if [ -z "${DEV}" ]; then
     )
 else
     INFO "Build tiup-cluster in $TIUP_CLUSTER_ROOT"
-    (cd "${TIUP_CLUSTER_ROOT}";make failpoint-enable;GOOS=linux GOARCH=amd64 make cluster dm;make failpoint-disable)
+    (cd "${TIUP_CLUSTER_ROOT}";make failpoint-enable;GOOS=linux GOARCH=amd64 make tiup cluster dm;make failpoint-disable)
 fi

 if [ "${INIT_ONLY}" -eq 1 ]; then
diff --git a/embed/examples/cluster/topology.example.yaml b/embed/examples/cluster/topology.example.yaml
index c81f4824fe..be1c95dfd6 100644
--- a/embed/examples/cluster/topology.example.yaml
+++ b/embed/examples/cluster/topology.example.yaml
@@ -58,6 +58,7 @@ monitored:
   # pd:
   # tiflash:
   # tiflash-learner:
+  # kvcdc:

 # # Server configs are used to specify the configuration of PD Servers.
 pd_servers:
@@ -278,6 +279,21 @@ tiflash_servers:
     data_dir: /data2/tidb-data/tiflash-9001
     log_dir: /data2/tidb-deploy/tiflash-9001/log

+# # Server configs are used to specify the configuration of TiKV-CDC Servers.
+kvcdc_servers:
+  - host: 10.0.1.20
+    # # SSH port of the server.
+    # ssh_port: 22
+    # # TiKV-CDC Server communication port.
+    port: 8600
+    # # TiKV-CDC Server data storage directory.
+    data_dir: "/data1/tidb-data/tikv-cdc-8600"
+    # # TiKV-CDC Server log file storage directory.
+    log_dir: "/data1/tidb-deploy/tikv-cdc-8600/log"
+  - host: 10.0.1.21
+    data_dir: "/data1/tidb-data/tikv-cdc-8600"
+    log_dir: "/data1/tidb-deploy/tikv-cdc-8600/log"
+
 # # Server configs are used to specify the configuration of Prometheus Server.
 monitoring_servers:
   # # The ip address of the Monitoring Server.
diff --git a/embed/templates/config/prometheus.yml.tpl b/embed/templates/config/prometheus.yml.tpl
index 65a4b8b4cf..02226b14eb 100644
--- a/embed/templates/config/prometheus.yml.tpl
+++ b/embed/templates/config/prometheus.yml.tpl
@@ -245,6 +245,23 @@ scrape_configs:
         - '{{.}}'
 {{- end}}
 {{- end}}
+{{- if .TiKVCDCAddrs}}
+  - job_name: "tikv-cdc"
+    honor_labels: true # don't overwrite job & instance labels
+{{- if .TLSEnabled}}
+    scheme: https
+    tls_config:
+      insecure_skip_verify: false
+      ca_file: ../tls/ca.crt
+      cert_file: ../tls/prometheus.crt
+      key_file: ../tls/prometheus.pem
+{{- end}}
+    static_configs:
+      - targets:
+{{- range .TiKVCDCAddrs}}
+        - '{{.}}'
+{{- end}}
+{{- end}}
 {{- if .NGMonitoringAddrs}}
   - job_name: "ng-monitoring"
     honor_labels: true # don't overwrite job & instance labels
diff --git a/embed/templates/scripts/run_tikv-cdc.sh.tpl b/embed/templates/scripts/run_tikv-cdc.sh.tpl
new file mode 100644
index 0000000000..4b465c95ff
--- /dev/null
+++ b/embed/templates/scripts/run_tikv-cdc.sh.tpl
@@ -0,0 +1,42 @@
+#!/bin/bash
+set -e
+
+# WARNING: This file was auto-generated. Do not edit!
+# All your edit might be overwritten!
+DEPLOY_DIR={{.DeployDir}}
+cd "${DEPLOY_DIR}" || exit 1
+
+{{- define "PDList"}}
+  {{- range $idx, $pd := .}}
+    {{- if eq $idx 0}}
+      {{- $pd.Scheme}}://{{$pd.IP}}:{{$pd.ClientPort}}
+    {{- else -}}
+      ,{{- $pd.Scheme}}://{{$pd.IP}}:{{$pd.ClientPort}}
+    {{- end}}
+  {{- end}}
+{{- end}}
+
+{{- if .NumaNode}}
+exec numactl --cpunodebind={{.NumaNode}} --membind={{.NumaNode}} bin/tikv-cdc server \
+{{- else}}
+exec bin/tikv-cdc server \
+{{- end}}
+    --addr "0.0.0.0:{{.Port}}" \
+    --advertise-addr "{{.IP}}:{{.Port}}" \
+    --pd "{{template "PDList" .Endpoints}}" \
+{{- if .DataDir}}
+    --data-dir="{{.DataDir}}" \
+{{- end}}
+{{- if .TLSEnabled}}
+    --ca tls/ca.crt \
+    --cert tls/cdc.crt \
+    --key tls/cdc.pem \
+{{- end}}
+{{- if .GCTTL}}
+    --gc-ttl {{.GCTTL}} \
+{{- end}}
+{{- if .TZ}}
+    --tz "{{.TZ}}" \
+{{- end}}
+    --config conf/tikv-cdc.toml \
+    --log-file "{{.LogDir}}/tikv-cdc.log" 2>> "{{.LogDir}}/tikv-cdc_stderr.log"
diff --git a/pkg/cluster/ansible/import_test.go b/pkg/cluster/ansible/import_test.go
index e06c24da32..f624362e38 100644
--- a/pkg/cluster/ansible/import_test.go
+++ b/pkg/cluster/ansible/import_test.go
@@ -129,6 +129,7 @@ server_configs:
   pump: {}
   drainer: {}
   cdc: {}
+  kvcdc: {}
   grafana: {}
 tidb_servers: []
 tikv_servers: []
@@ -139,7 +140,7 @@ monitoring_servers: []

     topo, err := yaml.Marshal(clsMeta.Topology)
     c.Assert(err, IsNil)
-    c.Assert(topo, DeepEquals, expected)
+    c.Assert(string(topo), DeepEquals, string(expected))
 }

 func (s *ansSuite) TestParseGroupVars(c *C) {
diff --git a/pkg/cluster/ansible/test-data/meta.yaml b/pkg/cluster/ansible/test-data/meta.yaml
index d179ff869e..55d4c7fc29 100644
--- a/pkg/cluster/ansible/test-data/meta.yaml
+++ b/pkg/cluster/ansible/test-data/meta.yaml
@@ -24,6 +24,7 @@ topology:
   pump: {}
   drainer: {}
   cdc: {}
+  kvcdc: {}
   grafana: {}
   tidb_servers:
   - host: 172.16.1.218
diff --git a/pkg/cluster/api/tikv_cdc.go b/pkg/cluster/api/tikv_cdc.go
new file mode 100644
index 0000000000..3a7caa2674
--- /dev/null
+++ 
b/pkg/cluster/api/tikv_cdc.go @@ -0,0 +1,229 @@ +// Copyright 2022 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package api + +import ( + "context" + "crypto/tls" + "encoding/json" + "fmt" + "net/http" + "time" + + "github.com/pingcap/errors" + logprinter "github.com/pingcap/tiup/pkg/logger/printer" + "github.com/pingcap/tiup/pkg/utils" +) + +// TiKVCDCOpenAPIClient is client for access TiKVCDC Open API +type TiKVCDCOpenAPIClient struct { + urls []string + client *utils.HTTPClient + ctx context.Context +} + +// NewTiKVCDCOpenAPIClient return a `TiKVCDCOpenAPIClient` +func NewTiKVCDCOpenAPIClient(ctx context.Context, addresses []string, timeout time.Duration, tlsConfig *tls.Config) *TiKVCDCOpenAPIClient { + httpPrefix := "http" + if tlsConfig != nil { + httpPrefix = "https" + } + urls := make([]string, 0, len(addresses)) + for _, addr := range addresses { + urls = append(urls, fmt.Sprintf("%s://%s", httpPrefix, addr)) + } + + return &TiKVCDCOpenAPIClient{ + urls: urls, + client: utils.NewHTTPClient(timeout, tlsConfig), + ctx: ctx, + } +} + +func (c *TiKVCDCOpenAPIClient) getEndpoints(api string) (endpoints []string) { + for _, url := range c.urls { + endpoints = append(endpoints, fmt.Sprintf("%s/%s", url, api)) + } + return endpoints +} + +// ResignOwner resign the TiKV-CDC owner, and wait for a new owner be found +func (c *TiKVCDCOpenAPIClient) ResignOwner() error { + api := "api/v1/owner/resign" + endpoints := c.getEndpoints(api) + _, err := tryURLs(endpoints, func(endpoint string) ([]byte, error) { + body, statusCode, err := c.client.PostWithStatusCode(c.ctx, endpoint, nil) + if err != nil { + if statusCode == http.StatusNotFound { + c.l().Debugf("resign owner does not found, ignore it, err: %+v", err) + return body, nil + } + return body, err + } + return body, nil + }) + + if err != nil { + return err + } + + owner, err := c.GetOwner() + if err != nil { + return err + } + + c.l().Debugf("tikv-cdc resign owner successfully, and new owner found, owner: %+v", owner) + return nil +} + +// GetOwner return the TiKV-CDC owner capture information +func (c *TiKVCDCOpenAPIClient) GetOwner() (*TiKVCDCCapture, error) { + captures, err := c.GetAllCaptures() + if err != nil { + return nil, err + } + + for _, capture := range captures { + if capture.IsOwner { + return capture, nil + } + } + return nil, fmt.Errorf("cannot found the tikv-cdc owner, query urls: %+v", c.urls) +} + +// GetCaptureByAddr return the capture information by the address +func (c *TiKVCDCOpenAPIClient) GetCaptureByAddr(addr string) (*TiKVCDCCapture, error) { + captures, err := c.GetAllCaptures() + if err != nil { + return nil, err + } + + for _, capture := range captures { + if capture.AdvertiseAddr == addr { + return capture, nil + } + } + + return nil, fmt.Errorf("capture not found, addr: %s", addr) +} + +// GetAllCaptures return all captures instantaneously +func (c *TiKVCDCOpenAPIClient) GetAllCaptures() (result []*TiKVCDCCapture, err error) { + err = utils.Retry(func() error { + result, err = c.getAllCaptures() + if err != nil { + return err + } + return nil + }, 
utils.RetryOption{ + Timeout: 10 * time.Second, + }) + return result, err +} + +func (c *TiKVCDCOpenAPIClient) getAllCaptures() ([]*TiKVCDCCapture, error) { + api := "api/v1/captures" + endpoints := c.getEndpoints(api) + + var response []*TiKVCDCCapture + + _, err := tryURLs(endpoints, func(endpoint string) ([]byte, error) { + body, statusCode, err := c.client.GetWithStatusCode(c.ctx, endpoint) + if err != nil { + if statusCode == http.StatusNotFound { + // Ignore error, and return nil to trigger hard restart + c.l().Debugf("get all captures failed, ignore it, err: %+v", err) + return body, nil + } + return body, err + } + + return body, json.Unmarshal(body, &response) + }) + + return response, err +} + +// IsCaptureAlive return error if the capture is not alive +func (c *TiKVCDCOpenAPIClient) IsCaptureAlive() error { + status, err := c.GetStatus() + if err != nil { + return err + } + if status.Liveness != TiKVCDCCaptureAlive { + return fmt.Errorf("capture is not alive, request url: %+v", c.urls[0]) + } + return nil +} + +// GetStatus return the status of the TiKVCDC server. +func (c *TiKVCDCOpenAPIClient) GetStatus() (result TiKVCDCServerStatus, err error) { + api := "api/v1/status" + // client should only have address to the target TiKV-CDC server, not all. + endpoints := c.getEndpoints(api) + + err = utils.Retry(func() error { + data, statusCode, err := c.client.GetWithStatusCode(c.ctx, endpoints[0]) + if err != nil { + if statusCode == http.StatusNotFound { + c.l().Debugf("capture server status failed, ignore it, err: %+v", err) + return nil + } + err = json.Unmarshal(data, &result) + if err != nil { + return err + } + if result.Liveness == TiKVCDCCaptureAlive { + return nil + } + return errors.New("capture status is not alive, retry it") + } + return nil + }, utils.RetryOption{ + Timeout: 10 * time.Second, + }) + + return result, err +} + +func (c *TiKVCDCOpenAPIClient) l() *logprinter.Logger { + return c.ctx.Value(logprinter.ContextKeyLogger).(*logprinter.Logger) +} + +// TiKVCDCLiveness is the liveness status of a capture. +type TiKVCDCLiveness int32 + +const ( + // TiKVCDCCaptureAlive means the capture is alive, and ready to serve. + TiKVCDCCaptureAlive TiKVCDCLiveness = 0 + // TiKVCDCCaptureStopping means the capture is in the process of graceful shutdown. + TiKVCDCCaptureStopping TiKVCDCLiveness = 1 +) + +// TiKVCDCServerStatus holds some common information of a TiCDC server +type TiKVCDCServerStatus struct { + Version string `json:"version"` + GitHash string `json:"git_hash"` + ID string `json:"id"` + Pid int `json:"pid"` + IsOwner bool `json:"is_owner"` + Liveness TiKVCDCLiveness `json:"liveness"` +} + +// TiKVCDCCapture holds common information of a capture in cdc +type TiKVCDCCapture struct { + ID string `json:"id"` + IsOwner bool `json:"is_owner"` + AdvertiseAddr string `json:"address"` +} diff --git a/pkg/cluster/spec/bindversion.go b/pkg/cluster/spec/bindversion.go index e23f88ea93..6e6ac9fcf8 100644 --- a/pkg/cluster/spec/bindversion.go +++ b/pkg/cluster/spec/bindversion.go @@ -28,7 +28,8 @@ func TiDBComponentVersion(comp, version string) string { ComponentPushwaygate, ComponentCheckCollector, ComponentSpark, - ComponentTiSpark: + ComponentTiSpark, + ComponentTiKVCDC: // TiKV-CDC use individual version. 
return "" default: return version diff --git a/pkg/cluster/spec/monitoring.go b/pkg/cluster/spec/monitoring.go index ccf21a949a..2563e6bc76 100644 --- a/pkg/cluster/spec/monitoring.go +++ b/pkg/cluster/spec/monitoring.go @@ -251,6 +251,13 @@ func (i *MonitorInstance) InitConfig( cfig.AddCDC(cdc.Host, uint64(cdc.Port)) } } + if servers, found := topoHasField("TiKVCDCServers"); found { + for i := 0; i < servers.Len(); i++ { + tikvCdc := servers.Index(i).Interface().(*TiKVCDCSpec) + uniqueHosts.Insert(tikvCdc.Host) + cfig.AddTiKVCDC(tikvCdc.Host, uint64(tikvCdc.Port)) + } + } if servers, found := topoHasField("Monitors"); found { for i := 0; i < servers.Len(); i++ { monitoring := servers.Index(i).Interface().(*PrometheusSpec) diff --git a/pkg/cluster/spec/spec.go b/pkg/cluster/spec/spec.go index 4a891e7845..e01cc2407d 100644 --- a/pkg/cluster/spec/spec.go +++ b/pkg/cluster/spec/spec.go @@ -112,6 +112,7 @@ type ( Pump map[string]interface{} `yaml:"pump"` Drainer map[string]interface{} `yaml:"drainer"` CDC map[string]interface{} `yaml:"cdc"` + TiKVCDC map[string]interface{} `yaml:"kvcdc"` Grafana map[string]string `yaml:"grafana"` } @@ -127,6 +128,7 @@ type ( PumpServers []*PumpSpec `yaml:"pump_servers,omitempty"` Drainers []*DrainerSpec `yaml:"drainer_servers,omitempty"` CDCServers []*CDCSpec `yaml:"cdc_servers,omitempty"` + TiKVCDCServers []*TiKVCDCSpec `yaml:"kvcdc_servers,omitempty"` TiSparkMasters []*TiSparkMasterSpec `yaml:"tispark_masters,omitempty"` TiSparkWorkers []*TiSparkWorkerSpec `yaml:"tispark_workers,omitempty"` Monitors []*PrometheusSpec `yaml:"monitoring_servers"` @@ -485,6 +487,7 @@ func (s *Specification) Merge(that Topology) Topology { PumpServers: append(s.PumpServers, spec.PumpServers...), Drainers: append(s.Drainers, spec.Drainers...), CDCServers: append(s.CDCServers, spec.CDCServers...), + TiKVCDCServers: append(s.TiKVCDCServers, spec.TiKVCDCServers...), TiSparkMasters: append(s.TiSparkMasters, spec.TiSparkMasters...), TiSparkWorkers: append(s.TiSparkWorkers, spec.TiSparkWorkers...), Monitors: append(s.Monitors, spec.Monitors...), @@ -688,7 +691,7 @@ func (s *Specification) ComponentsByStopOrder() (comps []Component) { // ComponentsByStartOrder return component in the order need to start. 
func (s *Specification) ComponentsByStartOrder() (comps []Component) { - // "pd", "tikv", "pump", "tidb", "tiflash", "drainer", "cdc", "prometheus", "grafana", "alertmanager" + // "pd", "tikv", "pump", "tidb", "tiflash", "drainer", "cdc", "tikv-cdc", "prometheus", "grafana", "alertmanager" comps = append(comps, &PDComponent{s}) comps = append(comps, &TiKVComponent{s}) comps = append(comps, &PumpComponent{s}) @@ -696,6 +699,7 @@ func (s *Specification) ComponentsByStartOrder() (comps []Component) { comps = append(comps, &TiFlashComponent{s}) comps = append(comps, &DrainerComponent{s}) comps = append(comps, &CDCComponent{s}) + comps = append(comps, &TiKVCDCComponent{s}) comps = append(comps, &MonitorComponent{s}) comps = append(comps, &GrafanaComponent{s}) comps = append(comps, &AlertManagerComponent{s}) diff --git a/pkg/cluster/spec/spec_test.go b/pkg/cluster/spec/spec_test.go index 2c63e49426..6791f69d47 100644 --- a/pkg/cluster/spec/spec_test.go +++ b/pkg/cluster/spec/spec_test.go @@ -40,6 +40,7 @@ func (s *metaSuiteTopo) TestDefaultDataDir(c *C) { topo := new(Specification) topo.TiKVServers = append(topo.TiKVServers, &TiKVSpec{Host: "1.1.1.1", Port: 22}) topo.CDCServers = append(topo.CDCServers, &CDCSpec{Host: "2.3.3.3", Port: 22}) + topo.TiKVCDCServers = append(topo.TiKVCDCServers, &TiKVCDCSpec{Host: "3.3.3.3", Port: 22}) data, err := yaml.Marshal(topo) c.Assert(err, IsNil) @@ -50,6 +51,7 @@ func (s *metaSuiteTopo) TestDefaultDataDir(c *C) { c.Assert(topo.GlobalOptions.DataDir, Equals, "data") c.Assert(topo.TiKVServers[0].DataDir, Equals, "data") c.Assert(topo.CDCServers[0].DataDir, Equals, "data") + c.Assert(topo.TiKVCDCServers[0].DataDir, Equals, "data") // Can keep the default value. data, err = yaml.Marshal(topo) @@ -60,6 +62,7 @@ func (s *metaSuiteTopo) TestDefaultDataDir(c *C) { c.Assert(topo.GlobalOptions.DataDir, Equals, "data") c.Assert(topo.TiKVServers[0].DataDir, Equals, "data") c.Assert(topo.CDCServers[0].DataDir, Equals, "data") + c.Assert(topo.TiKVCDCServers[0].DataDir, Equals, "data") // Test with global DataDir. 
topo = new(Specification) @@ -68,6 +71,8 @@ func (s *metaSuiteTopo) TestDefaultDataDir(c *C) { topo.TiKVServers = append(topo.TiKVServers, &TiKVSpec{Host: "1.1.1.2", Port: 33, DataDir: "/my_data"}) topo.CDCServers = append(topo.CDCServers, &CDCSpec{Host: "2.3.3.3", Port: 22}) topo.CDCServers = append(topo.CDCServers, &CDCSpec{Host: "2.3.3.4", Port: 22, DataDir: "/cdc_data"}) + topo.TiKVCDCServers = append(topo.TiKVCDCServers, &TiKVCDCSpec{Host: "3.3.3.3", Port: 22}) + topo.TiKVCDCServers = append(topo.TiKVCDCServers, &TiKVCDCSpec{Host: "3.3.3.4", Port: 22, DataDir: "/tikv-cdc_data"}) data, err = yaml.Marshal(topo) c.Assert(err, IsNil) @@ -80,6 +85,9 @@ func (s *metaSuiteTopo) TestDefaultDataDir(c *C) { c.Assert(topo.CDCServers[0].DataDir, Equals, "/global_data/cdc-22") c.Assert(topo.CDCServers[1].DataDir, Equals, "/cdc_data") + + c.Assert(topo.TiKVCDCServers[0].DataDir, Equals, "/global_data/tikv-cdc-22") + c.Assert(topo.TiKVCDCServers[1].DataDir, Equals, "/tikv-cdc_data") } func (s *metaSuiteTopo) TestGlobalOptions(c *C) { @@ -99,6 +107,9 @@ pd_servers: cdc_servers: - host: 172.16.5.233 data_dir: "cdc-data" +kvcdc_servers: + - host: 172.16.5.244 + data_dir: "tikv-cdc-data" `), &topo) c.Assert(err, IsNil) c.Assert(topo.GlobalOptions.User, Equals, "test1") @@ -113,6 +124,10 @@ cdc_servers: c.Assert(topo.CDCServers[0].SSHPort, Equals, 220) c.Assert(topo.CDCServers[0].DeployDir, Equals, "test-deploy/cdc-8300") c.Assert(topo.CDCServers[0].DataDir, Equals, "cdc-data") + + c.Assert(topo.TiKVCDCServers[0].SSHPort, Equals, 220) + c.Assert(topo.TiKVCDCServers[0].DeployDir, Equals, "test-deploy/tikv-cdc-8600") + c.Assert(topo.TiKVCDCServers[0].DataDir, Equals, "tikv-cdc-data") } func (s *metaSuiteTopo) TestDataDirAbsolute(c *C) { @@ -131,6 +146,11 @@ cdc_servers: data_dir: "cdc-data" - host: 172.16.5.234 port: 23333 +kvcdc_servers: + - host: 172.16.5.244 + data_dir: "tikv-cdc-data" + - host: 172.16.5.245 + port: 33333 `), &topo) c.Assert(err, IsNil) @@ -139,6 +159,9 @@ cdc_servers: c.Assert(topo.CDCServers[0].DataDir, Equals, "cdc-data") c.Assert(topo.CDCServers[1].DataDir, Equals, "/test-data/cdc-23333") + + c.Assert(topo.TiKVCDCServers[0].DataDir, Equals, "tikv-cdc-data") + c.Assert(topo.TiKVCDCServers[1].DataDir, Equals, "/test-data/tikv-cdc-33333") } func (s *metaSuiteTopo) TestGlobalConfig(c *C) { @@ -164,6 +187,8 @@ server_configs: status.address: 10 port: 1230 scheduler.max_limit: 20480 + kvcdc: + gc-ttl: 43200 tidb_servers: - host: 172.16.5.138 @@ -176,6 +201,13 @@ tidb_servers: config: latch.capacity: 5000 log.file.rotate: "55555.xxx" + +kvcdc_servers: + - host: 172.16.5.200 + - host: 172.16.5.201 + port: 8601 + config: + log-level: "debug" `), &topo) c.Assert(err, IsNil) c.Assert(topo.ServerConfigs.TiDB, DeepEquals, map[string]interface{}{ @@ -184,6 +216,10 @@ tidb_servers: "latch.capacity": 20480, "log.file.rotate": "123445.xxx", }) + c.Assert(topo.ServerConfigs.TiKVCDC, DeepEquals, map[string]interface{}{ + "gc-ttl": 43200, + }) + expected := map[string]interface{}{ "status": map[string]interface{}{ "address": 10, @@ -243,6 +279,20 @@ tidb_servers: got = FoldMap(topo.TiDBServers[1].Config) c.Assert(err, IsNil) c.Assert(got, DeepEquals, expected) + + expected = map[string]interface{}{} + got = FoldMap(topo.TiKVCDCServers[0].Config) + c.Assert(got, DeepEquals, expected) + + expected = map[string]interface{}{} + got = FoldMap(topo.TiKVCDCServers[0].Config) + c.Assert(got, DeepEquals, expected) + + expected = map[string]interface{}{ + "log-level": "debug", + } + got = 
FoldMap(topo.TiKVCDCServers[1].Config) + c.Assert(got, DeepEquals, expected) } func (s *metaSuiteTopo) TestGlobalConfigPatch(c *C) { @@ -313,6 +363,8 @@ server_configs: config.item2: 300 config.item3.item5: 500 config.item3.item6: 600 + kvcdc: + gc-ttl: 43200 tikv_servers: - host: 172.16.5.138 @@ -320,6 +372,11 @@ tikv_servers: config.item2: 500 config.item3.item5: 700 +kvcdc_servers: + - host: 172.16.5.238 + config: + log-level: "debug" + `), &topo) c.Assert(err, IsNil) expected := `# WARNING: This file is auto-generated. Do not edit! All your modification will be overwritten! @@ -339,6 +396,20 @@ item6 = 600 got, err := merge2Toml("tikv", topo.ServerConfigs.TiKV, topo.TiKVServers[0].Config) c.Assert(err, IsNil) c.Assert(string(got), DeepEquals, expected) + + expected = `# WARNING: This file is auto-generated. Do not edit! All your modification will be overwritten! +# You can use 'tiup cluster edit-config' and 'tiup cluster reload' to update the configuration +# All configuration items you want to change can be added to: +# server_configs: +# kvcdc: +# aa.b1.c3: value +# aa.b2.c4: value +gc-ttl = 43200 +log-level = "debug" +` + got, err = merge2Toml("kvcdc", topo.ServerConfigs.TiKVCDC, topo.TiKVCDCServers[0].Config) + c.Assert(err, IsNil) + c.Assert(string(got), DeepEquals, expected) } func (s *metaSuiteTopo) TestMerge2Toml2(c *C) { diff --git a/pkg/cluster/spec/tikv_cdc.go b/pkg/cluster/spec/tikv_cdc.go new file mode 100644 index 0000000000..b298d0aabc --- /dev/null +++ b/pkg/cluster/spec/tikv_cdc.go @@ -0,0 +1,300 @@ +// Copyright 2022 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package spec + +import ( + "context" + "crypto/tls" + "fmt" + "path/filepath" + "time" + + "github.com/pingcap/errors" + "github.com/pingcap/tiup/pkg/cluster/api" + "github.com/pingcap/tiup/pkg/cluster/ctxt" + "github.com/pingcap/tiup/pkg/cluster/template/scripts" + logprinter "github.com/pingcap/tiup/pkg/logger/printer" + "github.com/pingcap/tiup/pkg/meta" + "github.com/pingcap/tiup/pkg/tidbver" +) + +// TiKVCDCSpec represents the TiKVCDC topology specification in topology.yaml +type TiKVCDCSpec struct { + Host string `yaml:"host"` + SSHPort int `yaml:"ssh_port,omitempty" validate:"ssh_port:editable"` + Imported bool `yaml:"imported,omitempty"` + Patched bool `yaml:"patched,omitempty"` + IgnoreExporter bool `yaml:"ignore_exporter,omitempty"` + Port int `yaml:"port" default:"8600"` + DeployDir string `yaml:"deploy_dir,omitempty"` + DataDir string `yaml:"data_dir,omitempty"` + LogDir string `yaml:"log_dir,omitempty"` + Offline bool `yaml:"offline,omitempty"` + GCTTL int64 `yaml:"gc-ttl,omitempty" validate:"gc-ttl:editable"` + TZ string `yaml:"tz,omitempty" validate:"tz:editable"` + NumaNode string `yaml:"numa_node,omitempty" validate:"numa_node:editable"` + Config map[string]interface{} `yaml:"config,omitempty" validate:"config:ignore"` + ResourceControl meta.ResourceControl `yaml:"resource_control,omitempty" validate:"resource_control:editable"` + Arch string `yaml:"arch,omitempty"` + OS string `yaml:"os,omitempty"` +} + +// Role returns the component role of the instance +func (s *TiKVCDCSpec) Role() string { + return ComponentTiKVCDC +} + +// SSH returns the host and SSH port of the instance +func (s *TiKVCDCSpec) SSH() (string, int) { + return s.Host, s.SSHPort +} + +// GetMainPort returns the main port of the instance +func (s *TiKVCDCSpec) GetMainPort() int { + return s.Port +} + +// IsImported returns if the node is imported from TiDB-Ansible +func (s *TiKVCDCSpec) IsImported() bool { + // TiDB-Ansible do not support TiKV-CDC + return false +} + +// IgnoreMonitorAgent returns if the node does not have monitor agents available +func (s *TiKVCDCSpec) IgnoreMonitorAgent() bool { + return s.IgnoreExporter +} + +// TiKVCDCComponent represents TiKV-CDC component. +type TiKVCDCComponent struct{ Topology *Specification } + +// Name implements Component interface. +func (c *TiKVCDCComponent) Name() string { + return ComponentTiKVCDC +} + +// Role implements Component interface. +func (c *TiKVCDCComponent) Role() string { + return ComponentTiKVCDC +} + +// Instances implements Component interface. +func (c *TiKVCDCComponent) Instances() []Instance { + ins := make([]Instance, 0, len(c.Topology.TiKVCDCServers)) + for _, s := range c.Topology.TiKVCDCServers { + s := s + instance := &TiKVCDCInstance{BaseInstance{ + InstanceSpec: s, + Name: c.Name(), + Host: s.Host, + Port: s.Port, + SSHP: s.SSHPort, + + Ports: []int{ + s.Port, + }, + Dirs: []string{ + s.DeployDir, + }, + StatusFn: func(_ context.Context, timeout time.Duration, tlsCfg *tls.Config, _ ...string) string { + return statusByHost(s.Host, s.Port, "/status", timeout, tlsCfg) + }, + UptimeFn: func(_ context.Context, timeout time.Duration, tlsCfg *tls.Config) time.Duration { + return UptimeByHost(s.Host, s.Port, timeout, tlsCfg) + }, + }, c.Topology} + if s.DataDir != "" { + instance.Dirs = append(instance.Dirs, s.DataDir) + } + + ins = append(ins, instance) + } + return ins +} + +// TiKVCDCInstance represent the TiKV-CDC instance. 
+type TiKVCDCInstance struct { + BaseInstance + topo Topology +} + +// ScaleConfig deploy temporary config on scaling +func (i *TiKVCDCInstance) ScaleConfig( + ctx context.Context, + e ctxt.Executor, + topo Topology, + clusterName, + clusterVersion, + user string, + paths meta.DirPaths, +) error { + s := i.topo + defer func() { + i.topo = s + }() + i.topo = mustBeClusterTopo(topo) + + return i.InitConfig(ctx, e, clusterName, clusterVersion, user, paths) +} + +// InitConfig implements Instance interface. +func (i *TiKVCDCInstance) InitConfig( + ctx context.Context, + e ctxt.Executor, + clusterName, + clusterVersion, + deployUser string, + paths meta.DirPaths, +) error { + if !tidbver.TiKVCDCSupportDeploy(clusterVersion) { + return errors.New("tikv-cdc only supports cluster version v6.2.0 or later") + } + + topo := i.topo.(*Specification) + if err := i.BaseInstance.InitConfig(ctx, e, topo.GlobalOptions, deployUser, paths); err != nil { + return err + } + enableTLS := topo.GlobalOptions.TLSEnabled + spec := i.InstanceSpec.(*TiKVCDCSpec) + globalConfig := topo.ServerConfigs.TiKVCDC + instanceConfig := spec.Config + + cfg := scripts.NewTiKVCDCScript( + i.GetHost(), + paths.Deploy, + paths.Log, + paths.Data[0], + enableTLS, + spec.GCTTL, + spec.TZ, + ).WithPort(spec.Port).WithNumaNode(spec.NumaNode).AppendEndpoints(topo.Endpoints(deployUser)...) + + // doesn't work. + if _, err := i.setTLSConfig(ctx, false, nil, paths); err != nil { + return err + } + + fp := filepath.Join(paths.Cache, fmt.Sprintf("run_tikv-cdc_%s_%d.sh", i.GetHost(), i.GetPort())) + + if err := cfg.ConfigToFile(fp); err != nil { + return err + } + dst := filepath.Join(paths.Deploy, "scripts", "run_tikv-cdc.sh") + if err := e.Transfer(ctx, fp, dst, false, 0, false); err != nil { + return err + } + + if _, _, err := e.Execute(ctx, "chmod +x "+dst, false); err != nil { + return err + } + + return i.MergeServerConfig(ctx, e, globalConfig, instanceConfig, paths) +} + +// setTLSConfig set TLS Config to support enable/disable TLS +func (i *TiKVCDCInstance) setTLSConfig(ctx context.Context, enableTLS bool, configs map[string]interface{}, paths meta.DirPaths) (map[string]interface{}, error) { + return nil, nil +} + +var _ RollingUpdateInstance = &TiKVCDCInstance{} + +// GetAddr return the address of this TiKV-CDC instance +func (i *TiKVCDCInstance) GetAddr() string { + return fmt.Sprintf("%s:%d", i.GetHost(), i.GetPort()) +} + +// PreRestart implements RollingUpdateInstance interface. +// All errors are ignored, to trigger hard restart. 
+func (i *TiKVCDCInstance) PreRestart(ctx context.Context, topo Topology, apiTimeoutSeconds int, tlsCfg *tls.Config) error { + tidbTopo, ok := topo.(*Specification) + if !ok { + panic("should be type of tidb topology") + } + + logger, ok := ctx.Value(logprinter.ContextKeyLogger).(*logprinter.Logger) + if !ok { + panic("logger not found") + } + + address := i.GetAddr() + // cdc rolling upgrade strategy only works if there are more than 2 captures + if len(tidbTopo.TiKVCDCServers) <= 1 { + logger.Debugf("tikv-cdc pre-restart skipped, only one capture in the topology, addr: %s", address) + return nil + } + + start := time.Now() + client := api.NewTiKVCDCOpenAPIClient(ctx, []string{address}, 5*time.Second, tlsCfg) + captures, err := client.GetAllCaptures() + if err != nil { + logger.Debugf("tikv-cdc pre-restart skipped, cannot get all captures, trigger hard restart, addr: %s, elapsed: %+v", address, time.Since(start)) + return nil + } + + var ( + captureID string + found bool + isOwner bool + ) + for _, capture := range captures { + if address == capture.AdvertiseAddr { + found = true + captureID = capture.ID + isOwner = capture.IsOwner + break + } + } + + // this may happen if the capture crashed right away. + if !found { + logger.Debugf("tikv-cdc pre-restart finished, cannot found the capture, trigger hard restart, captureID: %s, addr: %s, elapsed: %+v", captureID, address, time.Since(start)) + return nil + } + + if isOwner { + if err := client.ResignOwner(); err != nil { + // if resign the owner failed, no more need to drain the current capture, + // since it's not allowed by the cdc. + // return nil to trigger hard restart. + logger.Debugf("tikv-cdc pre-restart finished, resign owner failed, trigger hard restart, captureID: %s, addr: %s, elapsed: %+v", captureID, address, time.Since(start)) + return nil + } + } + + // TODO: support drain capture to make restart smooth. + + logger.Debugf("tikv-cdc pre-restart success, captureID: %s, addr: %s, elapsed: %+v", captureID, address, time.Since(start)) + return nil +} + +// PostRestart implements RollingUpdateInstance interface. 
+func (i *TiKVCDCInstance) PostRestart(ctx context.Context, topo Topology, tlsCfg *tls.Config) error { + logger, ok := ctx.Value(logprinter.ContextKeyLogger).(*logprinter.Logger) + if !ok { + panic("logger not found") + } + + start := time.Now() + address := i.GetAddr() + + client := api.NewTiKVCDCOpenAPIClient(ctx, []string{address}, 5*time.Second, tlsCfg) + err := client.IsCaptureAlive() + if err != nil { + logger.Debugf("tikv-cdc post-restart finished, get capture status failed, addr: %s, err: %+v, elapsed: %+v", address, err, time.Since(start)) + return nil + } + + logger.Debugf("tikv-cdc post-restart success, addr: %s, elapsed: %+v", address, time.Since(start)) + return nil +} diff --git a/pkg/cluster/task/update_meta.go b/pkg/cluster/task/update_meta.go index eb9510c3a1..8b4e02a184 100644 --- a/pkg/cluster/task/update_meta.go +++ b/pkg/cluster/task/update_meta.go @@ -108,6 +108,15 @@ func (u *UpdateMeta) Execute(ctx context.Context) error { } newMeta.Topology.CDCServers = cdcServers + tikvCDCServers := make([]*spec.TiKVCDCSpec, 0) + for i, instance := range (&spec.TiKVCDCComponent{Topology: topo}).Instances() { + if deleted.Exist(instance.ID()) { + continue + } + tikvCDCServers = append(tikvCDCServers, topo.TiKVCDCServers[i]) + } + newMeta.Topology.TiKVCDCServers = tikvCDCServers + tisparkWorkers := make([]*spec.TiSparkWorkerSpec, 0) for i, instance := range (&spec.TiSparkWorkerComponent{Topology: topo}).Instances() { if deleted.Exist(instance.ID()) { diff --git a/pkg/cluster/template/config/prometheus.go b/pkg/cluster/template/config/prometheus.go index 69ef056d2b..8f0ef4233a 100644 --- a/pkg/cluster/template/config/prometheus.go +++ b/pkg/cluster/template/config/prometheus.go @@ -37,6 +37,7 @@ type PrometheusConfig struct { PumpAddrs []string DrainerAddrs []string CDCAddrs []string + TiKVCDCAddrs []string BlackboxExporterAddrs []string LightningAddrs []string MonitoredServers []string @@ -119,6 +120,12 @@ func (c *PrometheusConfig) AddCDC(ip string, port uint64) *PrometheusConfig { return c } +// AddTiKVCDC add a tikv-cdc address +func (c *PrometheusConfig) AddTiKVCDC(ip string, port uint64) *PrometheusConfig { + c.TiKVCDCAddrs = append(c.TiKVCDCAddrs, fmt.Sprintf("%s:%d", ip, port)) + return c +} + // AddBlackboxExporter add a BlackboxExporter address func (c *PrometheusConfig) AddBlackboxExporter(ip string, port uint64) *PrometheusConfig { c.BlackboxExporterAddrs = append(c.BlackboxExporterAddrs, fmt.Sprintf("%s:%d", ip, port)) diff --git a/pkg/cluster/template/scripts/tikv_cdc.go b/pkg/cluster/template/scripts/tikv_cdc.go new file mode 100644 index 0000000000..ca0a18f096 --- /dev/null +++ b/pkg/cluster/template/scripts/tikv_cdc.go @@ -0,0 +1,103 @@ +// Copyright 2022 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package scripts + +import ( + "bytes" + "os" + "path" + "text/template" + + "github.com/pingcap/tiup/embed" +) + +// TiKVCDCScript represent the data to generate cdc config +type TiKVCDCScript struct { + IP string + Port int + DeployDir string + LogDir string + DataDir string + NumaNode string + GCTTL int64 + TZ string + TLSEnabled bool + Endpoints []*PDScript +} + +// NewTiKVCDCScript returns a TiKVCDCScript with given arguments +func NewTiKVCDCScript(ip, deployDir, logDir, dataDir string, enableTLS bool, gcTTL int64, tz string) *TiKVCDCScript { + return &TiKVCDCScript{ + IP: ip, + Port: 8600, + DeployDir: deployDir, + LogDir: logDir, + DataDir: dataDir, + TLSEnabled: enableTLS, + GCTTL: gcTTL, + TZ: tz, + } +} + +// WithPort set Port field of TiKVCDCScript +func (c *TiKVCDCScript) WithPort(port int) *TiKVCDCScript { + c.Port = port + return c +} + +// WithNumaNode set NumaNode field of TiKVCDCScript +func (c *TiKVCDCScript) WithNumaNode(numa string) *TiKVCDCScript { + c.NumaNode = numa + return c +} + +// Config generate the config file data. +func (c *TiKVCDCScript) Config() ([]byte, error) { + fp := path.Join("templates", "scripts", "run_tikv-cdc.sh.tpl") + tpl, err := embed.ReadTemplate(fp) + if err != nil { + return nil, err + } + return c.ConfigWithTemplate(string(tpl)) +} + +// ConfigToFile write config content to specific file. +func (c *TiKVCDCScript) ConfigToFile(file string) error { + config, err := c.Config() + if err != nil { + return err + } + return os.WriteFile(file, config, 0755) +} + +// ConfigWithTemplate generate the TiKV-CDC config content by tpl +func (c *TiKVCDCScript) ConfigWithTemplate(tpl string) ([]byte, error) { + tmpl, err := template.New("TiKVCDC").Parse(tpl) + if err != nil { + return nil, err + } + + content := bytes.NewBufferString("") + if err := tmpl.Execute(content, c); err != nil { + return nil, err + } + + return content.Bytes(), nil +} + +// AppendEndpoints add new PDScript to Endpoints field +func (c *TiKVCDCScript) AppendEndpoints(ends ...*PDScript) *TiKVCDCScript { + c.Endpoints = append(c.Endpoints, ends...) + return c +} diff --git a/pkg/tidbver/tidbver.go b/pkg/tidbver/tidbver.go index 6a1cda6377..a56576aaa3 100644 --- a/pkg/tidbver/tidbver.go +++ b/pkg/tidbver/tidbver.go @@ -98,3 +98,9 @@ func DMSupportDeploy(version string) bool { // tiup-dm only support version not less than v2.0 return semver.Compare(version, "v2.0.0") >= 0 || strings.Contains(version, "nightly") } + +// TiKVCDCSupportDeploy return if given version of TiDB/TiKV cluster is supported +func TiKVCDCSupportDeploy(version string) bool { + // TiKV-CDC only support TiKV version not less than v6.2.0 + return semver.Compare(version, "v6.2.0") >= 0 || strings.Contains(version, "nightly") +} diff --git a/tests/tiup-cluster/run.sh b/tests/tiup-cluster/run.sh index 96986a1962..8f06e66c9b 100755 --- a/tests/tiup-cluster/run.sh +++ b/tests/tiup-cluster/run.sh @@ -31,6 +31,15 @@ function tiup-cluster() { fi } +function tiup() { + mkdir -p ~/.tiup/bin && cp -f ./root.json ~/.tiup/bin/ + if [ -f "../tiup/bin/tiup.test" ]; then + ../tiup/bin/tiup.test -test.coverprofile=./cover/cov.itest-$(date +'%s')-$RANDOM.out __DEVEL--i-heard-you-like-tests "$@" + else + ../../bin/tiup "$@" + fi +} + . 
./script/util.sh # use run.sh test_cmd test_upgrade to run specify cases diff --git a/tests/tiup-cluster/script/cmd_subtest.sh b/tests/tiup-cluster/script/cmd_subtest.sh index e913fa07b0..d74e760c2b 100755 --- a/tests/tiup-cluster/script/cmd_subtest.sh +++ b/tests/tiup-cluster/script/cmd_subtest.sh @@ -5,15 +5,40 @@ set -eu function cmd_subtest() { mkdir -p ~/.tiup/bin/ - version=$1 - test_tls=$2 - native_ssh=$3 + version="nightly" + topo_name="full" + test_tls=false + native_ssh=false + + while [[ $# -gt 0 ]] + do + case $1 in + --version) + version="$2" + shift + shift + ;; + --topo) + topo_name="$2" + shift + shift + ;; + --tls) + test_tls=true + shift + ;; + --native-ssh) + native_ssh=true + shift + ;; + esac + done name="test_cmd_$RANDOM" if [ $test_tls = true ]; then - topo=./topo/full_tls.yaml + topo=./topo/${topo_name}_tls.yaml else - topo=./topo/full.yaml + topo=./topo/${topo_name}.yaml fi client="" diff --git a/tests/tiup-cluster/script/scale_core.sh b/tests/tiup-cluster/script/scale_core.sh index 86d53bffe9..a026da8fda 100755 --- a/tests/tiup-cluster/script/scale_core.sh +++ b/tests/tiup-cluster/script/scale_core.sh @@ -36,7 +36,7 @@ function scale_core() { if [ $test_tls = true ]; then total_sub_one=18 else - total_sub_one=21 + total_sub_one=23 fi echo "start scale in tidb" diff --git a/tests/tiup-cluster/script/tikv_cdc.sh b/tests/tiup-cluster/script/tikv_cdc.sh new file mode 100755 index 0000000000..c9f73357a2 --- /dev/null +++ b/tests/tiup-cluster/script/tikv_cdc.sh @@ -0,0 +1,205 @@ +#!/bin/bash + +set -eu + +function tikv_cdc_test() { + mkdir -p ~/.tiup/bin/ + + version="nightly" + topo_name="tikv_cdc" + test_tls=false + tikv_cdc_patch="" + + while [[ $# -gt 0 ]]; do + case $1 in + --version) + version="$2" + shift + shift + ;; + --topo) + topo_name="$2" + shift + shift + ;; + --tls) + test_tls=true + shift + ;; + --tikv-cdc-patch) + tikv_cdc_patch="$2" + shift + shift + ;; + esac + done + + name="test_tikv_cdc_$RANDOM" + if [ $test_tls = true ]; then + topo=./topo/${topo_name}_tls.yaml + else + topo=./topo/$topo_name.yaml + fi + + # identify SSH via ssh-agent + eval $(ssh-agent) + ssh-add /root/.ssh/id_rsa + + tiup-cluster check $topo --apply + + # Test check version. Cluster version >= v6.2.0 is required. + # Error message: "Error: init config failed: n3:8600: tikv-cdc only supports cluster version v6.2.0 or later" + ! tiup-cluster --yes deploy $name 6.1.0 $topo + + tiup-cluster --yes deploy $name $version $topo + + # check the local config + tiup-cluster exec $name -R tikv-cdc --command 'grep "gc-ttl = 43200$" /home/tidb/deploy/tikv-cdc-8600/conf/tikv-cdc.toml' + + tiup-cluster list | grep "$name" + + tiup-cluster audit | grep "deploy $name $version" + + # Get the audit id can check it just runnable + id=$(tiup-cluster audit | grep "deploy $name $version" | awk '{print $1}') + tiup-cluster audit $id + + tiup-cluster --yes start $name + + # Patch + if [[ ! 
-z "$tikv_cdc_patch" ]]; then + tiup install tikv-cdc:v${tikv_cdc_patch} + tiup-cluster --yes patch $name ~/.tiup/storage/cluster/packages/tikv-cdc-v${tikv_cdc_patch}-linux-amd64.tar.gz -R tikv-cdc --offline + tiup-cluster display $name | grep "tikv-cdc (patched)" + fi + + tiup-cluster _test $name writable + + # check the data dir + tiup-cluster exec $name -N n3 --command "grep /home/tidb/deploy/tikv-cdc-8600/data /home/tidb/deploy/tikv-cdc-8600/scripts/run_tikv-cdc.sh" + tiup-cluster exec $name -N n4 --command "grep /home/tidb/tikv_cdc_data /home/tidb/deploy/tikv-cdc-8600/scripts/run_tikv-cdc.sh" + + # test patch overwrite + if [[ ! -z "$tikv_cdc_patch" ]]; then + tiup-cluster --yes patch $name ~/.tiup/storage/cluster/packages/tikv-cdc-v${tikv_cdc_patch}-linux-amd64.tar.gz -R tikv-cdc --overwrite + # overwrite with the same tarball twice + tiup-cluster --yes patch $name ~/.tiup/storage/cluster/packages/tikv-cdc-v${tikv_cdc_patch}-linux-amd64.tar.gz -R tikv-cdc --overwrite + fi + + tiup-cluster --yes stop $name + + tiup-cluster --yes start $name -R pd,tikv-cdc + + tiup-cluster --yes restart $name + + tiup-cluster _test $name writable + + tiup-cluster _test $name data + + # Test enable & disable + tiup-cluster exec $name -R tikv-cdc --command="systemctl status tikv-cdc-8600|grep 'enabled;'" + tiup-cluster disable $name -R tikv-cdc + tiup-cluster exec $name -R tikv-cdc --command="systemctl status tikv-cdc-8600|grep 'disabled;'" + tiup-cluster disable $name + tiup-cluster enable $name + tiup-cluster exec $name -R tikv-cdc --command="systemctl status tikv-cdc-8600|grep 'enabled;'" + + tiup-cluster --yes clean $name --data --all --ignore-node n5:8600 + + echo "checking cleanup data and log" + ! tiup-cluster exec $name -N n3 --command "ls /home/tidb/deploy/tikv-cdc-8600/log/tikv.log" + + tiup-cluster --yes start $name + + ! tiup-cluster _test $name data + + tiup-cluster --yes destroy $name +} + +function tikv_cdc_scale_test() { + mkdir -p ~/.tiup/bin/ + + version="nightly" + topo_name="tikv_cdc" + test_tls=false + + while [[ $# -gt 0 ]]; do + case $1 in + --version) + version="$2" + shift + shift + ;; + --topo) + topo_name="$2" + shift + shift + ;; + --tls) + test_tls=true + shift + ;; + esac + done + + name=test_tikv_cdc_scale_$RANDOM + if [ $test_tls = true ]; then + topo=./topo/${topo_name}_tls.yaml + else + topo=./topo/${topo_name}.yaml + fi + + tiup-cluster --yes deploy $name $version $topo + + tiup-cluster --yes start $name + + tiup-cluster _test $name writable + + tiup-cluster display $name + + total_sub_one=13 + total=14 + total_add_one=15 + + echo -e "\033[0;36m Start scale in tikv-cdc (-n3) \033[0m" + yes | tiup-cluster scale-in $name -N n3:8600 + wait_instance_num_reach $name $total_sub_one false + + echo -e "\033[0;36m Start scale out tikv-cdc (+n5) \033[0m" + mkdir -p /tmp/topo + topo=/tmp/topo/tikv_cdc_scale_in.yaml + cat < $topo +kvcdc_servers: + - host: n5 +EOF + yes | tiup-cluster scale-out $name $topo + wait_instance_num_reach $name $total false + + echo -e "\033[0;36m Scale out another tikv-cdc on n5 to verify port conflict detection \033[0m" + cat < $topo +kvcdc_servers: + - host: n5 + data_dir: "/home/tidb/tikv_cdc_data_1" +EOF + # should fail with message "Error: port conflict for '8600' between 'kvcdc_servers:n5.port' and 'kvcdc_servers:n5.port'" + ! 
yes | tiup-cluster scale-out $name $topo # should fail + + echo -e "\033[0;36m Scale out another tikv-cdc on n5 with different port & data_dir \033[0m" + cat < $topo +kvcdc_servers: + - host: n5 + port: 8666 + data_dir: "/home/tidb/tikv_cdc_data_1" +EOF + yes | tiup-cluster scale-out $name $topo + wait_instance_num_reach $name $total_add_one false + + # scale in n4, as n4 should be the owner. + echo -e "\033[0;36m Start scale in tikv-cdc (-n4) \033[0m" + yes | tiup-cluster scale-in $name -N n4:8600 + wait_instance_num_reach $name $total false + + tiup-cluster _test $name writable + + tiup-cluster --yes destroy $name +} diff --git a/tests/tiup-cluster/test_cmd.sh b/tests/tiup-cluster/test_cmd.sh index 7ddcc36e76..fcdf2aa171 100755 --- a/tests/tiup-cluster/test_cmd.sh +++ b/tests/tiup-cluster/test_cmd.sh @@ -4,5 +4,5 @@ set -eu source script/cmd_subtest.sh -echo "test cluster for version v4.0.12 wo/ TLS, via easy ssh" -cmd_subtest 4.0.12 false false +echo "test cluster for version v6.2.0 wo/ TLS, via easy ssh" +cmd_subtest --version 6.2.0 diff --git a/tests/tiup-cluster/test_cmd_tls_native_ssh.sh b/tests/tiup-cluster/test_cmd_tls_native_ssh.sh index e6ee83c2b0..3aaccbac68 100755 --- a/tests/tiup-cluster/test_cmd_tls_native_ssh.sh +++ b/tests/tiup-cluster/test_cmd_tls_native_ssh.sh @@ -5,7 +5,7 @@ set -eu source script/cmd_subtest.sh export GO_FAILPOINTS='github.com/pingcap/tiup/pkg/cluster/executor/assertNativeSSH=return(true)' -echo "test cluster for version v4.0.12 w/ TLS, via native ssh" -cmd_subtest 6.0.0 true true +echo "test cluster for version v6.0.0 w/ TLS, via native ssh" +cmd_subtest --version 6.0.0 --tls --native-ssh unset GO_FAILPOINTS diff --git a/tests/tiup-cluster/test_scale_core.sh b/tests/tiup-cluster/test_scale_core.sh index d1e6fcdd65..667fc87043 100755 --- a/tests/tiup-cluster/test_scale_core.sh +++ b/tests/tiup-cluster/test_scale_core.sh @@ -4,5 +4,5 @@ set -eu source script/scale_core.sh -echo "test scaling of core components in cluster for version v4.0.12 wo/ TLS, via easy ssh" -scale_core v6.0.0 false false +echo "test scaling of core components in cluster for version v6.2.0 wo/ TLS, via easy ssh" +scale_core v6.2.0 false false diff --git a/tests/tiup-cluster/test_tikv_cdc.sh b/tests/tiup-cluster/test_tikv_cdc.sh new file mode 100755 index 0000000000..df06c6f4ef --- /dev/null +++ b/tests/tiup-cluster/test_tikv_cdc.sh @@ -0,0 +1,16 @@ +#!/bin/bash + +set -eu + +source script/tikv_cdc.sh + +# TODO: test tls after TLS is supported. 
+# TODO: test upgrade of TiKV-CDC (there is only one release version now)
+
+###############################################
+echo -e "\033[0;36m<<< Test specified cases for TiKV-CDC >>>\033[0m"
+tikv_cdc_test --version 6.2.0 --topo tikv_cdc --tikv-cdc-patch 1.0.0
+
+###############################################
+echo -e "\033[0;36m<<< Test scale in/out for TiKV-CDC >>>\033[0m"
+tikv_cdc_scale_test --version 6.2.0 --topo tikv_cdc
diff --git a/tests/tiup-cluster/topo/full.yaml b/tests/tiup-cluster/topo/full.yaml
index 999b483c88..603cfb3322 100644
--- a/tests/tiup-cluster/topo/full.yaml
+++ b/tests/tiup-cluster/topo/full.yaml
@@ -54,6 +54,11 @@ cdc_servers:
   - host: n4
   - host: n5

+kvcdc_servers:
+  - host: n3
+  - host: n4
+    data_dir: "/home/tidb/tikv_cdc_data"
+
 tispark_masters:
   - host: n3

diff --git a/tests/tiup-cluster/topo/tikv_cdc.yaml b/tests/tiup-cluster/topo/tikv_cdc.yaml
new file mode 100644
index 0000000000..bae5ccdd62
--- /dev/null
+++ b/tests/tiup-cluster/topo/tikv_cdc.yaml
@@ -0,0 +1,46 @@
+global:
+  user: tidb
+  group: pingcap
+
+server_configs:
+  tikv:
+    storage.reserve-space: 1K
+    storage.api-version: 2
+    storage.enable-ttl: true
+  kvcdc:
+    gc-ttl: 43200
+
+tidb_servers:
+  - host: n1
+  - host: n2
+
+pd_servers:
+  - host: n3
+  - host: n4
+  - host: n5
+
+tikv_servers:
+  - host: n1
+  - host: n3
+    data_dir: "/home/tidb/my_kv_data"
+  - host: n4
+  - host: n5
+
+kvcdc_servers:
+  - host: n3
+  - host: n4
+    data_dir: "/home/tidb/tikv_cdc_data"
+
+monitoring_servers:
+  - host: n1
+    rule_dir: /tmp/local/prometheus
+grafana_servers:
+  - host: n1
+    dashboard_dir: /tmp/local/grafana
+alertmanager_servers:
+  - host: n1
+    config_file: /tmp/local/alertmanager/alertmanager.yml
+
+monitored:
+  node_exporter_port: 9100
+  blackbox_exporter_port: 9115
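
For reference, a minimal usage sketch (not part of the patch; the cluster name, hosts, and directories below are illustrative only) of how a cluster containing the new kvcdc_servers section might be deployed, based on the topologies and commands exercised by the tests above:

# A minimal sketch, assuming standard `tiup cluster` subcommands; hosts and paths are made up.
cat > topology.yaml << EOF
global:
  user: tidb
server_configs:
  tikv:
    storage.api-version: 2     # the test topology enables TiKV API v2 for TiKV-CDC
    storage.enable-ttl: true
pd_servers:
  - host: 10.0.1.10
tidb_servers:
  - host: 10.0.1.10
tikv_servers:
  - host: 10.0.1.11
kvcdc_servers:                 # section introduced by this patch, default port 8600
  - host: 10.0.1.20
    data_dir: "/data1/tidb-data/tikv-cdc-8600"
EOF

# TiKV-CDC is only deployable on clusters v6.2.0 or later (see TiKVCDCSupportDeploy).
tiup cluster deploy kvcdc-demo v6.2.0 topology.yaml
tiup cluster start kvcdc-demo
tiup cluster display kvcdc-demo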