Skip to content

Commit

Permalink
support ticdc data dir (#1372)
Browse files Browse the repository at this point in the history
  • Loading branch information
Ling Jin authored Jun 1, 2021
1 parent 5928416 commit b12ae28
Show file tree
Hide file tree
Showing 11 changed files with 168 additions and 36 deletions.
1 change: 1 addition & 0 deletions components/client/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ go 1.16
require (
github.com/StackExchange/wmi v0.0.0-20190523213315-cbe66965904d // indirect
github.com/gizak/termui/v3 v3.1.0
github.com/google/flatbuffers v2.0.0+incompatible // indirect
github.com/mattn/go-sqlite3 v2.0.2+incompatible // indirect
github.com/mitchellh/go-wordwrap v1.0.0 // indirect
github.com/sergi/go-diff v1.1.0 // indirect
Expand Down
3 changes: 2 additions & 1 deletion components/client/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -499,8 +499,9 @@ github.com/google/btree v1.0.0/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ
github.com/google/btree v1.0.1 h1:gK4Kx5IaGY9CD5sPJ36FHiBJ6ZXl0kilRiiCj+jdYp4=
github.com/google/btree v1.0.1/go.mod h1:xXMiIv4Fb/0kKde4SpL7qlzvu5cMJDRkFDxJfI9uaxA=
github.com/google/flatbuffers v1.11.0/go.mod h1:1AeVuKshWv4vARoZatz6mlQ0JxURH0Kv5+zNeJKJCa8=
github.com/google/flatbuffers v1.12.0 h1:/PtAHvnBY4Kqnx/xCQ3OIV9uYcSFGScBsWI3Oogeh6w=
github.com/google/flatbuffers v1.12.0/go.mod h1:1AeVuKshWv4vARoZatz6mlQ0JxURH0Kv5+zNeJKJCa8=
github.com/google/flatbuffers v2.0.0+incompatible h1:dicJ2oXwypfwUGnB2/TYWYEKiuk9eYQlQO/AnOHl5mI=
github.com/google/flatbuffers v2.0.0+incompatible/go.mod h1:1AeVuKshWv4vARoZatz6mlQ0JxURH0Kv5+zNeJKJCa8=
github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M=
github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
Expand Down
7 changes: 7 additions & 0 deletions embed/templates/scripts/run_cdc.sh.tpl
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,13 @@ exec bin/cdc server \
--addr "0.0.0.0:{{.Port}}" \
--advertise-addr "{{.IP}}:{{.Port}}" \
--pd "{{template "PDList" .Endpoints}}" \
{{- if .DataDir}}
{{- if .DataDirEnabled}}
--data-dir="{{.DataDir}}" \
{{- else}}
--sort-dir="{{.DataDir}}/tmp/sorter" \
{{- end}}
{{- end}}
{{- if .TLSEnabled}}
--ca tls/ca.crt \
--cert tls/cdc.crt \
Expand Down
1 change: 1 addition & 0 deletions pkg/cluster/manager/check.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ func (m *Manager) CheckCluster(clusterOrTopoName string, opt CheckOptions, gOpt
opt.IdentityFile = m.specManager.Path(clusterName, "ssh", "id_rsa")

topo = *metadata.Topology
topo.AdjustByVersion(metadata.Version)
} else { // check before cluster is deployed
topoFileName := clusterOrTopoName

Expand Down
19 changes: 11 additions & 8 deletions pkg/cluster/manager/deploy.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,14 +112,17 @@ func (m *Manager) Deploy(
base.GlobalOptions.SSHType = sshType
}

if topo, ok := topo.(*spec.Specification); ok && !opt.NoLabels {
// Check if TiKV's label set correctly
lbs, err := topo.LocationLabels()
if err != nil {
return err
}
if err := spec.CheckTiKVLabels(lbs, topo); err != nil {
return perrs.Errorf("check TiKV label failed, please fix that before continue:\n%s", err)
if topo, ok := topo.(*spec.Specification); ok {
topo.AdjustByVersion(clusterVersion)
if !opt.NoLabels {
// Check if TiKV's label set correctly
lbs, err := topo.LocationLabels()
if err != nil {
return err
}
if err := spec.CheckTiKVLabels(lbs, topo); err != nil {
return perrs.Errorf("check TiKV label failed, please fix that before continue:\n%s", err)
}
}
}

Expand Down
34 changes: 19 additions & 15 deletions pkg/cluster/manager/scale_out.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,6 @@ func (m *Manager) ScaleOut(

topo := metadata.GetTopology()
base := metadata.GetBaseMeta()

// Inherit existing global configuration. We must assign the inherited values before unmarshalling
// because some default value rely on the global options and monitored options.
newPart := topo.NewPart()
Expand All @@ -77,6 +76,9 @@ func (m *Manager) ScaleOut(
!errors.Is(perrs.Cause(err), spec.ErrNoTiSparkMaster) {
return err
}
if newPartTopo, ok := newPart.(*spec.Specification); ok {
newPartTopo.AdjustByVersion(base.Version)
}

if err := validateNewTopo(newPart); err != nil {
return err
Expand All @@ -89,21 +91,23 @@ func (m *Manager) ScaleOut(
}
spec.ExpandRelativeDir(mergedTopo)

if topo, ok := topo.(*spec.Specification); ok && !opt.NoLabels {
if topo, ok := mergedTopo.(*spec.Specification); ok {
// Check if TiKV's label set correctly
pdList := topo.BaseTopo().MasterList
tlsCfg, err := topo.TLSConfig(m.specManager.Path(name, spec.TLSCertKeyDir))
if err != nil {
return err
}
pdClient := api.NewPDClient(pdList, 10*time.Second, tlsCfg)
lbs, placementRule, err := pdClient.GetLocationLabels()
if err != nil {
return err
}
if !placementRule {
if err := spec.CheckTiKVLabels(lbs, mergedTopo.(*spec.Specification)); err != nil {
return perrs.Errorf("check TiKV label failed, please fix that before continue:\n%s", err)
if !opt.NoLabels {
pdList := topo.BaseTopo().MasterList
tlsCfg, err := topo.TLSConfig(m.specManager.Path(name, spec.TLSCertKeyDir))
if err != nil {
return err
}
pdClient := api.NewPDClient(pdList, 10*time.Second, tlsCfg)
lbs, placementRule, err := pdClient.GetLocationLabels()
if err != nil {
return err
}
if !placementRule {
if err := spec.CheckTiKVLabels(lbs, mergedTopo.(*spec.Specification)); err != nil {
return perrs.Errorf("check TiKV label failed, please fix that before continue:\n%s", err)
}
}
}
}
Expand Down
5 changes: 5 additions & 0 deletions pkg/cluster/manager/upgrade.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,11 @@ func (m *Manager) Upgrade(name string, clusterVersion string, opt operator.Optio

// Deploy component
tb := task.NewBuilder()

// for some component, dataDirs might need to be created due to upgrade
// eg: TiCDC support DataDir since v4.0.13
tb = tb.Mkdir(topo.BaseTopo().GlobalOptions.User, inst.GetHost(), dataDirs...)

if inst.IsImported() {
switch inst.ComponentName() {
case spec.ComponentPrometheus, spec.ComponentGrafana, spec.ComponentAlertmanager:
Expand Down
23 changes: 14 additions & 9 deletions pkg/cluster/spec/cdc.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ type CDCSpec struct {
Patched bool `yaml:"patched,omitempty"`
Port int `yaml:"port" default:"8300"`
DeployDir string `yaml:"deploy_dir,omitempty"`
DataDir string `yaml:"data_dir,omitempty"`
LogDir string `yaml:"log_dir,omitempty"`
Offline bool `yaml:"offline,omitempty"`
GCTTL int64 `yaml:"gc-ttl,omitempty" validate:"gc-ttl:editable"`
Expand Down Expand Up @@ -84,7 +85,7 @@ func (c *CDCComponent) Instances() []Instance {
ins := make([]Instance, 0, len(c.Topology.CDCServers))
for _, s := range c.Topology.CDCServers {
s := s
ins = append(ins, &CDCInstance{BaseInstance{
instance := &CDCInstance{BaseInstance{
InstanceSpec: s,
Name: c.Name(),
Host: s.Host,
Expand All @@ -103,7 +104,12 @@ func (c *CDCComponent) Instances() []Instance {
UptimeFn: func(tlsCfg *tls.Config) time.Duration {
return UptimeByHost(s.Host, s.Port, tlsCfg)
},
}, c.Topology})
}, c.Topology}
if s.DataDir != "" {
instance.Dirs = append(instance.Dirs, s.DataDir)
}

ins = append(ins, instance)
}
return ins
}
Expand Down Expand Up @@ -151,11 +157,10 @@ func (i *CDCInstance) InitConfig(
globalConfig := topo.ServerConfigs.CDC
instanceConfig := spec.Config

configFileSupported := false
if semver.Compare(clusterVersion, "v4.0.13") >= 0 && clusterVersion != "v5.0.0-rc" {
configFileSupported = true
} else if len(globalConfig)+len(instanceConfig) > 0 {
return perrs.New("server_config is only supported with TiCDC version v4.0.13 or later")
if semver.Compare(clusterVersion, "v4.0.13") == -1 {
if len(globalConfig)+len(instanceConfig) > 0 {
return perrs.New("server_config is only supported with TiCDC version v4.0.13 or later")
}
}

cfg := scripts.NewCDCScript(
Expand All @@ -167,8 +172,8 @@ func (i *CDCInstance) InitConfig(
spec.TZ,
).WithPort(spec.Port).WithNumaNode(spec.NumaNode).AppendEndpoints(topo.Endpoints(deployUser)...)

if configFileSupported {
cfg = cfg.WithConfigFileEnabled()
if len(paths.Data) != 0 {
cfg = cfg.PatchByVersion(clusterVersion, paths.Data[0])
}

fp := filepath.Join(paths.Cache, fmt.Sprintf("run_cdc_%s_%d.sh", i.GetHost(), i.GetPort()))
Expand Down
11 changes: 11 additions & 0 deletions pkg/cluster/spec/spec.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/pingcap/tiup/pkg/logger/log"
"github.com/pingcap/tiup/pkg/meta"
clientv3 "go.etcd.io/etcd/client/v3"
"golang.org/x/mod/semver"
)

const (
Expand Down Expand Up @@ -376,6 +377,16 @@ func (s *Specification) GetPDList() []string {
return pdList
}

// AdjustByVersion modify the spec by cluster version.
func (s *Specification) AdjustByVersion(clusterVersion string) {
// CDC does not support data dir for version below v4.0.13, and also v5.0.0-rc, set it to empty.
if semver.Compare(clusterVersion, "v4.0.13") == -1 || clusterVersion == "v5.0.0-rc" {
for _, server := range s.CDCServers {
server.DataDir = ""
}
}
}

// GetDashboardAddress returns the cluster's dashboard addr
func (s *Specification) GetDashboardAddress(tlsCfg *tls.Config, pdList ...string) (string, error) {
pc := api.NewPDClient(pdList, statusQueryTimeout, tlsCfg)
Expand Down
67 changes: 64 additions & 3 deletions pkg/cluster/spec/spec_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/BurntSushi/toml"
. "github.com/pingcap/check"
"github.com/pingcap/tiup/pkg/cluster/template/scripts"
"golang.org/x/mod/semver"
"gopkg.in/yaml.v2"
)

Expand All @@ -37,6 +38,7 @@ func (s *metaSuiteTopo) TestDefaultDataDir(c *C) {
// Test with without global DataDir.
topo := new(Specification)
topo.TiKVServers = append(topo.TiKVServers, &TiKVSpec{Host: "1.1.1.1", Port: 22})
topo.CDCServers = append(topo.CDCServers, &CDCSpec{Host: "2.3.3.3", Port: 22})
data, err := yaml.Marshal(topo)
c.Assert(err, IsNil)

Expand All @@ -46,6 +48,7 @@ func (s *metaSuiteTopo) TestDefaultDataDir(c *C) {
c.Assert(err, IsNil)
c.Assert(topo.GlobalOptions.DataDir, Equals, "data")
c.Assert(topo.TiKVServers[0].DataDir, Equals, "data")
c.Assert(topo.CDCServers[0].DataDir, Equals, "data")

// Can keep the default value.
data, err = yaml.Marshal(topo)
Expand All @@ -55,21 +58,27 @@ func (s *metaSuiteTopo) TestDefaultDataDir(c *C) {
c.Assert(err, IsNil)
c.Assert(topo.GlobalOptions.DataDir, Equals, "data")
c.Assert(topo.TiKVServers[0].DataDir, Equals, "data")
c.Assert(topo.CDCServers[0].DataDir, Equals, "data")

// Test with global DataDir.
topo = new(Specification)
topo.GlobalOptions.DataDir = "/gloable_data"
topo.GlobalOptions.DataDir = "/global_data"
topo.TiKVServers = append(topo.TiKVServers, &TiKVSpec{Host: "1.1.1.1", Port: 22})
topo.TiKVServers = append(topo.TiKVServers, &TiKVSpec{Host: "1.1.1.2", Port: 33, DataDir: "/my_data"})
topo.CDCServers = append(topo.CDCServers, &CDCSpec{Host: "2.3.3.3", Port: 22})
topo.CDCServers = append(topo.CDCServers, &CDCSpec{Host: "2.3.3.4", Port: 22, DataDir: "/cdc_data"})
data, err = yaml.Marshal(topo)
c.Assert(err, IsNil)

topo = new(Specification)
err = yaml.Unmarshal(data, topo)
c.Assert(err, IsNil)
c.Assert(topo.GlobalOptions.DataDir, Equals, "/gloable_data")
c.Assert(topo.TiKVServers[0].DataDir, Equals, "/gloable_data/tikv-22")
c.Assert(topo.GlobalOptions.DataDir, Equals, "/global_data")
c.Assert(topo.TiKVServers[0].DataDir, Equals, "/global_data/tikv-22")
c.Assert(topo.TiKVServers[1].DataDir, Equals, "/my_data")

c.Assert(topo.CDCServers[0].DataDir, Equals, "/global_data/cdc-22")
c.Assert(topo.CDCServers[1].DataDir, Equals, "/cdc_data")
}

func (s *metaSuiteTopo) TestGlobalOptions(c *C) {
Expand All @@ -86,6 +95,9 @@ tidb_servers:
pd_servers:
- host: 172.16.5.53
data_dir: "pd-data"
cdc_servers:
- host: 172.16.5.233
data_dir: "cdc-data"
`), &topo)
c.Assert(err, IsNil)
c.Assert(topo.GlobalOptions.User, Equals, "test1")
Expand All @@ -96,6 +108,10 @@ pd_servers:
c.Assert(topo.PDServers[0].SSHPort, Equals, 220)
c.Assert(topo.PDServers[0].DeployDir, Equals, "test-deploy/pd-2379")
c.Assert(topo.PDServers[0].DataDir, Equals, "pd-data")

c.Assert(topo.CDCServers[0].SSHPort, Equals, 220)
c.Assert(topo.CDCServers[0].DeployDir, Equals, "test-deploy/cdc-8300")
c.Assert(topo.CDCServers[0].DataDir, Equals, "cdc-data")
}

func (s *metaSuiteTopo) TestDataDirAbsolute(c *C) {
Expand All @@ -109,11 +125,19 @@ pd_servers:
data_dir: "pd-data"
- host: 172.16.5.54
client_port: 12379
cdc_servers:
- host: 172.16.5.233
data_dir: "cdc-data"
- host: 172.16.5.234
port: 23333
`), &topo)
c.Assert(err, IsNil)

c.Assert(topo.PDServers[0].DataDir, Equals, "pd-data")
c.Assert(topo.PDServers[1].DataDir, Equals, "/test-data/pd-12379")

c.Assert(topo.CDCServers[0].DataDir, Equals, "cdc-data")
c.Assert(topo.CDCServers[1].DataDir, Equals, "/test-data/cdc-23333")
}

func (s *metaSuiteTopo) TestGlobalConfig(c *C) {
Expand Down Expand Up @@ -684,6 +708,43 @@ tiflash_servers:
}
}

func (s *metaSuiteTopo) TestTiCDCDataDir(c *C) {
spec := &Specification{}
err := yaml.Unmarshal([]byte(`
cdc_servers:
- host: 172.16.6.191
data_dir: /tidb-data/cdc-8300
`), spec)
c.Assert(err, IsNil)

cdcComp := FindComponent(spec, ComponentCDC)
instances := cdcComp.Instances()
c.Assert(len(instances), Equals, 1)

checkByVersion := func(version string) {
ins := instances[0].(*CDCInstance)
cfg := scripts.NewCDCScript(ins.GetHost(), "", "", false, 0, "").
PatchByVersion(version, ins.DataDir())

// DataDir support since v4.0.13
checker := Equals
if semver.Compare(version, "v4.0.13") >= 0 && version != "v5.0.0-rc" {
checker = Not(checker)
c.Assert(len(cfg.DataDir), checker, 0)

// TiCDC support --data-dir since v4.0.14 and v5.0.3
expected := semver.Compare(version, "v4.0.14") >= 0 || semver.Compare(version, "v5.0.3") >= 0
c.Assert(cfg.DataDirEnabled, Equals, expected)
}
}

checkByVersion("v4.0.12")
checkByVersion("v4.0.13")
checkByVersion("v5.0.0-rc")
checkByVersion("v4.0.14")
checkByVersion("v5.0.3")
}

func (s *metaSuiteTopo) TestTiFlashUsersSettings(c *C) {
spec := &Specification{}
err := yaml.Unmarshal([]byte(`
Expand Down
Loading

0 comments on commit b12ae28

Please sign in to comment.