Skip to content

Commit

Permalink
cluster: Support TiKV-CDC component (#2022)
Browse files Browse the repository at this point in the history
  • Loading branch information
pingyu authored Sep 14, 2022
1 parent fb81eac commit d6a2826
Show file tree
Hide file tree
Showing 27 changed files with 1,137 additions and 16 deletions.
1 change: 1 addition & 0 deletions .github/workflows/integrate-cluster-cmd.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ jobs:
# - 'test_cmd_tls_native_ssh'
- 'test_upgrade'
- 'test_upgrade_tls'
- 'test_tikv_cdc'
env:
working-directory: ${{ github.workspace }}/go/src/github.com/${{ github.repository }}
steps:
Expand Down
2 changes: 1 addition & 1 deletion docker/up.sh
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ if [ -z "${DEV}" ]; then
)
else
INFO "Build tiup-cluster in $TIUP_CLUSTER_ROOT"
(cd "${TIUP_CLUSTER_ROOT}";make failpoint-enable;GOOS=linux GOARCH=amd64 make cluster dm;make failpoint-disable)
(cd "${TIUP_CLUSTER_ROOT}";make failpoint-enable;GOOS=linux GOARCH=amd64 make tiup cluster dm;make failpoint-disable)
fi

if [ "${INIT_ONLY}" -eq 1 ]; then
Expand Down
16 changes: 16 additions & 0 deletions embed/examples/cluster/topology.example.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ monitored:
# pd:
# tiflash:
# tiflash-learner:
# kvcdc:

# # Server configs are used to specify the configuration of PD Servers.
pd_servers:
Expand Down Expand Up @@ -278,6 +279,21 @@ tiflash_servers:
data_dir: /data2/tidb-data/tiflash-9001
log_dir: /data2/tidb-deploy/tiflash-9001/log

# # Server configs are used to specify the configuration of TiKV-CDC Servers.
kvcdc_servers:
- host: 10.0.1.20
# # SSH port of the server.
# ssh_port: 22
# # TiKV-CDC Server communication port.
port: 8600
# # TiKV-CDC Server data storage directory.
data_dir: "/data1/tidb-data/tikv-cdc-8600"
# # TiKV-CDC Server log file storage directory.
log_dir: "/data1/tidb-deploy/tikv-cdc-8600/log"
- host: 10.0.1.21
data_dir: "/data1/tidb-data/tikv-cdc-8600"
log_dir: "/data1/tidb-deploy/tikv-cdc-8600/log"

# # Server configs are used to specify the configuration of Prometheus Server.
monitoring_servers:
# # The ip address of the Monitoring Server.
Expand Down
17 changes: 17 additions & 0 deletions embed/templates/config/prometheus.yml.tpl
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,23 @@ scrape_configs:
- '{{.}}'
{{- end}}
{{- end}}
{{- if .TiKVCDCAddrs}}
- job_name: "tikv-cdc"
honor_labels: true # don't overwrite job & instance labels
{{- if .TLSEnabled}}
scheme: https
tls_config:
insecure_skip_verify: false
ca_file: ../tls/ca.crt
cert_file: ../tls/prometheus.crt
key_file: ../tls/prometheus.pem
{{- end}}
static_configs:
- targets:
{{- range .TiKVCDCAddrs}}
- '{{.}}'
{{- end}}
{{- end}}
{{- if .NGMonitoringAddrs}}
- job_name: "ng-monitoring"
honor_labels: true # don't overwrite job & instance labels
Expand Down
42 changes: 42 additions & 0 deletions embed/templates/scripts/run_tikv-cdc.sh.tpl
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
#!/bin/bash
set -e

# WARNING: This file was auto-generated. Do not edit!
# All your edits might be overwritten!
# Run from the deploy directory so that the relative bin/, conf/ and tls/
# paths used below resolve against it.
DEPLOY_DIR={{.DeployDir}}
cd "${DEPLOY_DIR}" || exit 1

{{- define "PDList"}}
{{- range $idx, $pd := .}}
{{- if eq $idx 0}}
{{- $pd.Scheme}}://{{$pd.IP}}:{{$pd.ClientPort}}
{{- else -}}
,{{- $pd.Scheme}}://{{$pd.IP}}:{{$pd.ClientPort}}
{{- end}}
{{- end}}
{{- end}}
# Replace this shell with the tikv-cdc server process (exec), wrapped in
# numactl when a NUMA node is configured. stderr is appended to a separate
# log file next to the server log.
{{- if .NumaNode}}
exec numactl --cpunodebind={{.NumaNode}} --membind={{.NumaNode}} bin/tikv-cdc server \
{{- else}}
exec bin/tikv-cdc server \
{{- end}}
--addr "0.0.0.0:{{.Port}}" \
--advertise-addr "{{.IP}}:{{.Port}}" \
--pd "{{template "PDList" .Endpoints}}" \
{{- if .DataDir}}
--data-dir="{{.DataDir}}" \
{{- end}}
{{- if .TLSEnabled}}
--ca tls/ca.crt \
--cert tls/cdc.crt \
--key tls/cdc.pem \
{{- end}}
{{- if .GCTTL}}
--gc-ttl {{.GCTTL}} \
{{- end}}
{{- if .TZ}}
--tz "{{.TZ}}" \
{{- end}}
--config conf/tikv-cdc.toml \
--log-file "{{.LogDir}}/tikv-cdc.log" 2>> "{{.LogDir}}/tikv-cdc_stderr.log"
3 changes: 2 additions & 1 deletion pkg/cluster/ansible/import_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,7 @@ server_configs:
pump: {}
drainer: {}
cdc: {}
kvcdc: {}
grafana: {}
tidb_servers: []
tikv_servers: []
Expand All @@ -139,7 +140,7 @@ monitoring_servers: []

topo, err := yaml.Marshal(clsMeta.Topology)
c.Assert(err, IsNil)
c.Assert(topo, DeepEquals, expected)
c.Assert(string(topo), DeepEquals, string(expected))
}

func (s *ansSuite) TestParseGroupVars(c *C) {
Expand Down
1 change: 1 addition & 0 deletions pkg/cluster/ansible/test-data/meta.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ topology:
pump: {}
drainer: {}
cdc: {}
kvcdc: {}
grafana: {}
tidb_servers:
- host: 172.16.1.218
Expand Down
229 changes: 229 additions & 0 deletions pkg/cluster/api/tikv_cdc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,229 @@
// Copyright 2022 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package api

import (
"context"
"crypto/tls"
"encoding/json"
"fmt"
"net/http"
"time"

"github.com/pingcap/errors"
logprinter "github.com/pingcap/tiup/pkg/logger/printer"
"github.com/pingcap/tiup/pkg/utils"
)

// TiKVCDCOpenAPIClient is a client for accessing the TiKV-CDC Open API.
// Instances are created with NewTiKVCDCOpenAPIClient.
type TiKVCDCOpenAPIClient struct {
// urls holds one "<scheme>://<address>" base URL per address passed to
// NewTiKVCDCOpenAPIClient; API paths are appended to these by getEndpoints.
urls []string
// client is the HTTP client used for every request.
client *utils.HTTPClient
// ctx is passed to every HTTP request and is also where l() looks up the logger.
ctx context.Context
}

// NewTiKVCDCOpenAPIClient builds a TiKVCDCOpenAPIClient for the given
// TiKV-CDC server addresses.
//
// Every address is turned into a base URL: "https://<addr>" when tlsConfig is
// non-nil, "http://<addr>" otherwise. timeout and tlsConfig are handed to
// utils.NewHTTPClient, and ctx is kept on the client for later requests.
func NewTiKVCDCOpenAPIClient(ctx context.Context, addresses []string, timeout time.Duration, tlsConfig *tls.Config) *TiKVCDCOpenAPIClient {
	scheme := "https"
	if tlsConfig == nil {
		scheme = "http"
	}

	baseURLs := make([]string, len(addresses))
	for i, address := range addresses {
		baseURLs[i] = scheme + "://" + address
	}

	httpClient := utils.NewHTTPClient(timeout, tlsConfig)
	return &TiKVCDCOpenAPIClient{
		urls:   baseURLs,
		client: httpClient,
		ctx:    ctx,
	}
}

// getEndpoints joins the given API path onto every base URL of the client,
// producing "<url>/<api>" for each one in the same order as c.urls.
// The result is nil when the client has no URLs.
func (c *TiKVCDCOpenAPIClient) getEndpoints(api string) (endpoints []string) {
	for i := range c.urls {
		endpoints = append(endpoints, c.urls[i]+"/"+api)
	}
	return endpoints
}

// ResignOwner asks the TiKV-CDC owner to resign and then waits until an owner
// can be found again.
//
// The resign request is POSTed to "api/v1/owner/resign" through tryURLs. A
// failed request whose status code is 404 is logged at debug level and treated
// as success; any other failure is returned. Afterwards GetOwner is called and
// its error, if any, is returned.
func (c *TiKVCDCOpenAPIClient) ResignOwner() error {
	resign := func(endpoint string) ([]byte, error) {
		body, statusCode, err := c.client.PostWithStatusCode(c.ctx, endpoint, nil)
		switch {
		case err == nil:
			return body, nil
		case statusCode == http.StatusNotFound:
			c.l().Debugf("resign owner does not found, ignore it, err: %+v", err)
			return body, nil
		default:
			return body, err
		}
	}

	if _, err := tryURLs(c.getEndpoints("api/v1/owner/resign"), resign); err != nil {
		return err
	}

	newOwner, err := c.GetOwner()
	if err != nil {
		return err
	}

	c.l().Debugf("tikv-cdc resign owner successfully, and new owner found, owner: %+v", newOwner)
	return nil
}

// GetOwner returns the capture that is currently flagged as the TiKV-CDC owner.
//
// It lists all captures with GetAllCaptures and returns the first one whose
// IsOwner is true. An error is returned when the listing fails or when no
// capture is marked as owner.
func (c *TiKVCDCOpenAPIClient) GetOwner() (*TiKVCDCCapture, error) {
	all, err := c.GetAllCaptures()
	if err != nil {
		return nil, err
	}

	for i := range all {
		if candidate := all[i]; candidate.IsOwner {
			return candidate, nil
		}
	}

	return nil, fmt.Errorf("cannot found the tikv-cdc owner, query urls: %+v", c.urls)
}

// GetCaptureByAddr looks up a capture by its advertise address.
//
// It lists all captures with GetAllCaptures and returns the first one whose
// AdvertiseAddr equals addr. An error is returned when the listing fails or
// when no capture matches.
func (c *TiKVCDCOpenAPIClient) GetCaptureByAddr(addr string) (*TiKVCDCCapture, error) {
	all, err := c.GetAllCaptures()
	if err != nil {
		return nil, err
	}

	for i := range all {
		if candidate := all[i]; candidate.AdvertiseAddr == addr {
			return candidate, nil
		}
	}

	return nil, fmt.Errorf("capture not found, addr: %s", addr)
}

// GetAllCaptures returns all captures known to the queried TiKV-CDC servers.
//
// The lookup (getAllCaptures) is run through utils.Retry with a 10-second
// Timeout option. The returned slice is whatever the last attempt produced;
// the returned error is the one reported by utils.Retry.
func (c *TiKVCDCOpenAPIClient) GetAllCaptures() (result []*TiKVCDCCapture, err error) {
	attempt := func() error {
		captures, fetchErr := c.getAllCaptures()
		// Record the outcome of every attempt, including failed ones.
		result = captures
		return fetchErr
	}

	err = utils.Retry(attempt, utils.RetryOption{
		Timeout: 10 * time.Second,
	})
	return result, err
}

// getAllCaptures performs a single, non-retried listing of captures.
//
// It GETs "api/v1/captures" through tryURLs and JSON-decodes a successful
// response body into the returned slice. A failed request whose status code is
// 404 is logged at debug level and reported as success, leaving the result
// undecoded; any other failure is returned as an error.
func (c *TiKVCDCOpenAPIClient) getAllCaptures() ([]*TiKVCDCCapture, error) {
	var captures []*TiKVCDCCapture

	fetch := func(endpoint string) ([]byte, error) {
		body, statusCode, err := c.client.GetWithStatusCode(c.ctx, endpoint)
		if err == nil {
			return body, json.Unmarshal(body, &captures)
		}
		if statusCode != http.StatusNotFound {
			return body, err
		}
		// Ignore the error and return nil; the original author notes this is
		// meant to trigger a hard restart.
		c.l().Debugf("get all captures failed, ignore it, err: %+v", err)
		return body, nil
	}

	_, err := tryURLs(c.getEndpoints("api/v1/captures"), fetch)
	return captures, err
}

// IsCaptureAlive reports, via its error result, whether the capture is alive.
//
// It returns the error from GetStatus if that call fails, nil when the
// reported liveness is TiKVCDCCaptureAlive, and a descriptive error naming the
// first configured URL otherwise.
func (c *TiKVCDCOpenAPIClient) IsCaptureAlive() error {
	status, err := c.GetStatus()
	switch {
	case err != nil:
		return err
	case status.Liveness == TiKVCDCCaptureAlive:
		return nil
	default:
		return fmt.Errorf("capture is not alive, request url: %+v", c.urls[0])
	}
}

// GetStatus returns the status of the TiKV-CDC server.
//
// The client is expected to hold only the address of the target TiKV-CDC
// server; only the first configured address is queried. The request is retried
// through utils.Retry (10-second timeout) until the server reports
// TiKVCDCCaptureAlive.
//
// Behavior per attempt:
//   - request fails with HTTP 404: logged at debug level and treated as
//     success, leaving result at its zero value;
//   - request fails otherwise: the request error is returned so the attempt is
//     retried;
//   - request succeeds: the body is JSON-decoded into result, and the attempt
//     is retried unless the decoded liveness is TiKVCDCCaptureAlive.
//
// Previously the decode and liveness check lived inside the error branch, so a
// successful response was never parsed and result was always the zero value.
//
// An error is returned immediately when the client has no address configured.
func (c *TiKVCDCOpenAPIClient) GetStatus() (result TiKVCDCServerStatus, err error) {
	api := "api/v1/status"
	// client should only have address to the target TiKV-CDC server, not all.
	endpoints := c.getEndpoints(api)
	if len(endpoints) == 0 {
		// Avoid an index-out-of-range panic on endpoints[0] below.
		return result, errors.New("no tikv-cdc address configured, cannot get status")
	}

	err = utils.Retry(func() error {
		data, statusCode, err := c.client.GetWithStatusCode(c.ctx, endpoints[0])
		if err != nil {
			if statusCode == http.StatusNotFound {
				c.l().Debugf("capture server status failed, ignore it, err: %+v", err)
				return nil
			}
			return err
		}

		// Decode into a fresh value so a failed decode never leaves result
		// half-populated from an earlier attempt.
		var status TiKVCDCServerStatus
		if err := json.Unmarshal(data, &status); err != nil {
			return err
		}
		result = status

		if status.Liveness == TiKVCDCCaptureAlive {
			return nil
		}
		return errors.New("capture status is not alive, retry it")
	}, utils.RetryOption{
		Timeout: 10 * time.Second,
	})

	return result, err
}

// l returns the logger stored in the client's context under
// logprinter.ContextKeyLogger. The type assertion is unchecked, so it panics
// if the context value is not a *logprinter.Logger.
func (c *TiKVCDCOpenAPIClient) l() *logprinter.Logger {
	stored := c.ctx.Value(logprinter.ContextKeyLogger)
	return stored.(*logprinter.Logger)
}

// TiKVCDCLiveness describes the liveness state reported by a capture.
type TiKVCDCLiveness int32

// Liveness values, numbered from zero in declaration order.
const (
	// TiKVCDCCaptureAlive (0) means the capture is alive, and ready to serve.
	TiKVCDCCaptureAlive TiKVCDCLiveness = iota
	// TiKVCDCCaptureStopping (1) means the capture is in the process of graceful shutdown.
	TiKVCDCCaptureStopping
)

// TiKVCDCServerStatus holds some common information of a TiKV-CDC server.
// It is the JSON shape that GetStatus decodes from the "api/v1/status"
// response; each field is read from the JSON key named in its tag.
type TiKVCDCServerStatus struct {
Version string `json:"version"`
GitHash string `json:"git_hash"`
ID string `json:"id"`
Pid int `json:"pid"`
IsOwner bool `json:"is_owner"`
// Liveness is compared against TiKVCDCCaptureAlive by IsCaptureAlive.
Liveness TiKVCDCLiveness `json:"liveness"`
}

// TiKVCDCCapture holds common information of a capture in TiKV-CDC.
// It is the element type that getAllCaptures decodes from the
// "api/v1/captures" response.
type TiKVCDCCapture struct {
ID string `json:"id"`
// IsOwner marks the capture that GetOwner returns.
IsOwner bool `json:"is_owner"`
// AdvertiseAddr is read from the JSON key "address" and is the value
// GetCaptureByAddr matches against.
AdvertiseAddr string `json:"address"`
}
3 changes: 2 additions & 1 deletion pkg/cluster/spec/bindversion.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@ func TiDBComponentVersion(comp, version string) string {
ComponentPushwaygate,
ComponentCheckCollector,
ComponentSpark,
ComponentTiSpark:
ComponentTiSpark,
ComponentTiKVCDC: // TiKV-CDC use individual version.
return ""
default:
return version
Expand Down
7 changes: 7 additions & 0 deletions pkg/cluster/spec/monitoring.go
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,13 @@ func (i *MonitorInstance) InitConfig(
cfig.AddCDC(cdc.Host, uint64(cdc.Port))
}
}
if servers, found := topoHasField("TiKVCDCServers"); found {
for i := 0; i < servers.Len(); i++ {
tikvCdc := servers.Index(i).Interface().(*TiKVCDCSpec)
uniqueHosts.Insert(tikvCdc.Host)
cfig.AddTiKVCDC(tikvCdc.Host, uint64(tikvCdc.Port))
}
}
if servers, found := topoHasField("Monitors"); found {
for i := 0; i < servers.Len(); i++ {
monitoring := servers.Index(i).Interface().(*PrometheusSpec)
Expand Down
Loading

0 comments on commit d6a2826

Please sign in to comment.