From b12ae285679acabd6a0f3cc7d524d93f7bd52d02 Mon Sep 17 00:00:00 2001 From: Ling Jin Date: Tue, 1 Jun 2021 11:11:37 +0800 Subject: [PATCH] support ticdc data dir (#1372) --- components/client/go.mod | 1 + components/client/go.sum | 3 +- embed/templates/scripts/run_cdc.sh.tpl | 7 +++ pkg/cluster/manager/check.go | 1 + pkg/cluster/manager/deploy.go | 19 +++++--- pkg/cluster/manager/scale_out.go | 34 +++++++------ pkg/cluster/manager/upgrade.go | 5 ++ pkg/cluster/spec/cdc.go | 23 +++++---- pkg/cluster/spec/spec.go | 11 +++++ pkg/cluster/spec/spec_test.go | 67 ++++++++++++++++++++++++-- pkg/cluster/template/scripts/cdc.go | 33 +++++++++++++ 11 files changed, 168 insertions(+), 36 deletions(-) diff --git a/components/client/go.mod b/components/client/go.mod index 310456f044..2c46f3256e 100644 --- a/components/client/go.mod +++ b/components/client/go.mod @@ -5,6 +5,7 @@ go 1.16 require ( github.com/StackExchange/wmi v0.0.0-20190523213315-cbe66965904d // indirect github.com/gizak/termui/v3 v3.1.0 + github.com/google/flatbuffers v2.0.0+incompatible // indirect github.com/mattn/go-sqlite3 v2.0.2+incompatible // indirect github.com/mitchellh/go-wordwrap v1.0.0 // indirect github.com/sergi/go-diff v1.1.0 // indirect diff --git a/components/client/go.sum b/components/client/go.sum index 6eaee538f9..00f8eddf56 100644 --- a/components/client/go.sum +++ b/components/client/go.sum @@ -499,8 +499,9 @@ github.com/google/btree v1.0.0/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ github.com/google/btree v1.0.1 h1:gK4Kx5IaGY9CD5sPJ36FHiBJ6ZXl0kilRiiCj+jdYp4= github.com/google/btree v1.0.1/go.mod h1:xXMiIv4Fb/0kKde4SpL7qlzvu5cMJDRkFDxJfI9uaxA= github.com/google/flatbuffers v1.11.0/go.mod h1:1AeVuKshWv4vARoZatz6mlQ0JxURH0Kv5+zNeJKJCa8= -github.com/google/flatbuffers v1.12.0 h1:/PtAHvnBY4Kqnx/xCQ3OIV9uYcSFGScBsWI3Oogeh6w= github.com/google/flatbuffers v1.12.0/go.mod h1:1AeVuKshWv4vARoZatz6mlQ0JxURH0Kv5+zNeJKJCa8= +github.com/google/flatbuffers v2.0.0+incompatible h1:dicJ2oXwypfwUGnB2/TYWYEKiuk9eYQlQO/AnOHl5mI= +github.com/google/flatbuffers v2.0.0+incompatible/go.mod h1:1AeVuKshWv4vARoZatz6mlQ0JxURH0Kv5+zNeJKJCa8= github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M= github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= diff --git a/embed/templates/scripts/run_cdc.sh.tpl b/embed/templates/scripts/run_cdc.sh.tpl index b7d0ff104d..028d12642d 100644 --- a/embed/templates/scripts/run_cdc.sh.tpl +++ b/embed/templates/scripts/run_cdc.sh.tpl @@ -24,6 +24,13 @@ exec bin/cdc server \ --addr "0.0.0.0:{{.Port}}" \ --advertise-addr "{{.IP}}:{{.Port}}" \ --pd "{{template "PDList" .Endpoints}}" \ +{{- if .DataDir}} + {{- if .DataDirEnabled}} + --data-dir="{{.DataDir}}" \ + {{- else}} + --sort-dir="{{.DataDir}}/tmp/sorter" \ + {{- end}} +{{- end}} {{- if .TLSEnabled}} --ca tls/ca.crt \ --cert tls/cdc.crt \ diff --git a/pkg/cluster/manager/check.go b/pkg/cluster/manager/check.go index 49a6d71ca1..504ef59c00 100644 --- a/pkg/cluster/manager/check.go +++ b/pkg/cluster/manager/check.go @@ -67,6 +67,7 @@ func (m *Manager) CheckCluster(clusterOrTopoName string, opt CheckOptions, gOpt opt.IdentityFile = m.specManager.Path(clusterName, "ssh", "id_rsa") topo = *metadata.Topology + topo.AdjustByVersion(metadata.Version) } else { // check before cluster is deployed topoFileName := clusterOrTopoName diff --git a/pkg/cluster/manager/deploy.go b/pkg/cluster/manager/deploy.go index f38e82624e..c6eb7456a8 100644 --- a/pkg/cluster/manager/deploy.go +++ b/pkg/cluster/manager/deploy.go @@ -112,14 +112,17 @@ func (m *Manager) Deploy( base.GlobalOptions.SSHType = sshType } - if topo, ok := topo.(*spec.Specification); ok && !opt.NoLabels { - // Check if TiKV's label set correctly - lbs, err := topo.LocationLabels() - if err != nil { - return err - } - if err := spec.CheckTiKVLabels(lbs, topo); err != nil { - return perrs.Errorf("check TiKV label failed, please fix that before continue:\n%s", err) + if topo, ok := topo.(*spec.Specification); ok { + topo.AdjustByVersion(clusterVersion) + if !opt.NoLabels { + // Check if TiKV's label set correctly + lbs, err := topo.LocationLabels() + if err != nil { + return err + } + if err := spec.CheckTiKVLabels(lbs, topo); err != nil { + return perrs.Errorf("check TiKV label failed, please fix that before continue:\n%s", err) + } } } diff --git a/pkg/cluster/manager/scale_out.go b/pkg/cluster/manager/scale_out.go index ea8740b702..a6e8d936bf 100644 --- a/pkg/cluster/manager/scale_out.go +++ b/pkg/cluster/manager/scale_out.go @@ -65,7 +65,6 @@ func (m *Manager) ScaleOut( topo := metadata.GetTopology() base := metadata.GetBaseMeta() - // Inherit existing global configuration. We must assign the inherited values before unmarshalling // because some default value rely on the global options and monitored options. newPart := topo.NewPart() @@ -77,6 +76,9 @@ func (m *Manager) ScaleOut( !errors.Is(perrs.Cause(err), spec.ErrNoTiSparkMaster) { return err } + if newPartTopo, ok := newPart.(*spec.Specification); ok { + newPartTopo.AdjustByVersion(base.Version) + } if err := validateNewTopo(newPart); err != nil { return err @@ -89,21 +91,23 @@ func (m *Manager) ScaleOut( } spec.ExpandRelativeDir(mergedTopo) - if topo, ok := topo.(*spec.Specification); ok && !opt.NoLabels { + if topo, ok := mergedTopo.(*spec.Specification); ok { // Check if TiKV's label set correctly - pdList := topo.BaseTopo().MasterList - tlsCfg, err := topo.TLSConfig(m.specManager.Path(name, spec.TLSCertKeyDir)) - if err != nil { - return err - } - pdClient := api.NewPDClient(pdList, 10*time.Second, tlsCfg) - lbs, placementRule, err := pdClient.GetLocationLabels() - if err != nil { - return err - } - if !placementRule { - if err := spec.CheckTiKVLabels(lbs, mergedTopo.(*spec.Specification)); err != nil { - return perrs.Errorf("check TiKV label failed, please fix that before continue:\n%s", err) + if !opt.NoLabels { + pdList := topo.BaseTopo().MasterList + tlsCfg, err := topo.TLSConfig(m.specManager.Path(name, spec.TLSCertKeyDir)) + if err != nil { + return err + } + pdClient := api.NewPDClient(pdList, 10*time.Second, tlsCfg) + lbs, placementRule, err := pdClient.GetLocationLabels() + if err != nil { + return err + } + if !placementRule { + if err := spec.CheckTiKVLabels(lbs, mergedTopo.(*spec.Specification)); err != nil { + return perrs.Errorf("check TiKV label failed, please fix that before continue:\n%s", err) + } } } } diff --git a/pkg/cluster/manager/upgrade.go b/pkg/cluster/manager/upgrade.go index c45eeff6ca..427b2fc377 100644 --- a/pkg/cluster/manager/upgrade.go +++ b/pkg/cluster/manager/upgrade.go @@ -96,6 +96,11 @@ func (m *Manager) Upgrade(name string, clusterVersion string, opt operator.Optio // Deploy component tb := task.NewBuilder() + + // for some component, dataDirs might need to be created due to upgrade + // eg: TiCDC support DataDir since v4.0.13 + tb = tb.Mkdir(topo.BaseTopo().GlobalOptions.User, inst.GetHost(), dataDirs...) + if inst.IsImported() { switch inst.ComponentName() { case spec.ComponentPrometheus, spec.ComponentGrafana, spec.ComponentAlertmanager: diff --git a/pkg/cluster/spec/cdc.go b/pkg/cluster/spec/cdc.go index bb241b07dc..2b503a771c 100644 --- a/pkg/cluster/spec/cdc.go +++ b/pkg/cluster/spec/cdc.go @@ -35,6 +35,7 @@ type CDCSpec struct { Patched bool `yaml:"patched,omitempty"` Port int `yaml:"port" default:"8300"` DeployDir string `yaml:"deploy_dir,omitempty"` + DataDir string `yaml:"data_dir,omitempty"` LogDir string `yaml:"log_dir,omitempty"` Offline bool `yaml:"offline,omitempty"` GCTTL int64 `yaml:"gc-ttl,omitempty" validate:"gc-ttl:editable"` @@ -84,7 +85,7 @@ func (c *CDCComponent) Instances() []Instance { ins := make([]Instance, 0, len(c.Topology.CDCServers)) for _, s := range c.Topology.CDCServers { s := s - ins = append(ins, &CDCInstance{BaseInstance{ + instance := &CDCInstance{BaseInstance{ InstanceSpec: s, Name: c.Name(), Host: s.Host, @@ -103,7 +104,12 @@ func (c *CDCComponent) Instances() []Instance { UptimeFn: func(tlsCfg *tls.Config) time.Duration { return UptimeByHost(s.Host, s.Port, tlsCfg) }, - }, c.Topology}) + }, c.Topology} + if s.DataDir != "" { + instance.Dirs = append(instance.Dirs, s.DataDir) + } + + ins = append(ins, instance) } return ins } @@ -151,11 +157,10 @@ func (i *CDCInstance) InitConfig( globalConfig := topo.ServerConfigs.CDC instanceConfig := spec.Config - configFileSupported := false - if semver.Compare(clusterVersion, "v4.0.13") >= 0 && clusterVersion != "v5.0.0-rc" { - configFileSupported = true - } else if len(globalConfig)+len(instanceConfig) > 0 { - return perrs.New("server_config is only supported with TiCDC version v4.0.13 or later") + if semver.Compare(clusterVersion, "v4.0.13") == -1 { + if len(globalConfig)+len(instanceConfig) > 0 { + return perrs.New("server_config is only supported with TiCDC version v4.0.13 or later") + } } cfg := scripts.NewCDCScript( @@ -167,8 +172,8 @@ func (i *CDCInstance) InitConfig( spec.TZ, ).WithPort(spec.Port).WithNumaNode(spec.NumaNode).AppendEndpoints(topo.Endpoints(deployUser)...) - if configFileSupported { - cfg = cfg.WithConfigFileEnabled() + if len(paths.Data) != 0 { + cfg = cfg.PatchByVersion(clusterVersion, paths.Data[0]) } fp := filepath.Join(paths.Cache, fmt.Sprintf("run_cdc_%s_%d.sh", i.GetHost(), i.GetPort())) diff --git a/pkg/cluster/spec/spec.go b/pkg/cluster/spec/spec.go index d50549e4e4..43aa4f58fc 100644 --- a/pkg/cluster/spec/spec.go +++ b/pkg/cluster/spec/spec.go @@ -29,6 +29,7 @@ import ( "github.com/pingcap/tiup/pkg/logger/log" "github.com/pingcap/tiup/pkg/meta" clientv3 "go.etcd.io/etcd/client/v3" + "golang.org/x/mod/semver" ) const ( @@ -376,6 +377,16 @@ func (s *Specification) GetPDList() []string { return pdList } +// AdjustByVersion modify the spec by cluster version. +func (s *Specification) AdjustByVersion(clusterVersion string) { + // CDC does not support data dir for version below v4.0.13, and also v5.0.0-rc, set it to empty. + if semver.Compare(clusterVersion, "v4.0.13") == -1 || clusterVersion == "v5.0.0-rc" { + for _, server := range s.CDCServers { + server.DataDir = "" + } + } +} + // GetDashboardAddress returns the cluster's dashboard addr func (s *Specification) GetDashboardAddress(tlsCfg *tls.Config, pdList ...string) (string, error) { pc := api.NewPDClient(pdList, statusQueryTimeout, tlsCfg) diff --git a/pkg/cluster/spec/spec_test.go b/pkg/cluster/spec/spec_test.go index 86e838906d..e32ca640cf 100644 --- a/pkg/cluster/spec/spec_test.go +++ b/pkg/cluster/spec/spec_test.go @@ -21,6 +21,7 @@ import ( "github.com/BurntSushi/toml" . "github.com/pingcap/check" "github.com/pingcap/tiup/pkg/cluster/template/scripts" + "golang.org/x/mod/semver" "gopkg.in/yaml.v2" ) @@ -37,6 +38,7 @@ func (s *metaSuiteTopo) TestDefaultDataDir(c *C) { // Test with without global DataDir. topo := new(Specification) topo.TiKVServers = append(topo.TiKVServers, &TiKVSpec{Host: "1.1.1.1", Port: 22}) + topo.CDCServers = append(topo.CDCServers, &CDCSpec{Host: "2.3.3.3", Port: 22}) data, err := yaml.Marshal(topo) c.Assert(err, IsNil) @@ -46,6 +48,7 @@ func (s *metaSuiteTopo) TestDefaultDataDir(c *C) { c.Assert(err, IsNil) c.Assert(topo.GlobalOptions.DataDir, Equals, "data") c.Assert(topo.TiKVServers[0].DataDir, Equals, "data") + c.Assert(topo.CDCServers[0].DataDir, Equals, "data") // Can keep the default value. data, err = yaml.Marshal(topo) @@ -55,21 +58,27 @@ func (s *metaSuiteTopo) TestDefaultDataDir(c *C) { c.Assert(err, IsNil) c.Assert(topo.GlobalOptions.DataDir, Equals, "data") c.Assert(topo.TiKVServers[0].DataDir, Equals, "data") + c.Assert(topo.CDCServers[0].DataDir, Equals, "data") // Test with global DataDir. topo = new(Specification) - topo.GlobalOptions.DataDir = "/gloable_data" + topo.GlobalOptions.DataDir = "/global_data" topo.TiKVServers = append(topo.TiKVServers, &TiKVSpec{Host: "1.1.1.1", Port: 22}) topo.TiKVServers = append(topo.TiKVServers, &TiKVSpec{Host: "1.1.1.2", Port: 33, DataDir: "/my_data"}) + topo.CDCServers = append(topo.CDCServers, &CDCSpec{Host: "2.3.3.3", Port: 22}) + topo.CDCServers = append(topo.CDCServers, &CDCSpec{Host: "2.3.3.4", Port: 22, DataDir: "/cdc_data"}) data, err = yaml.Marshal(topo) c.Assert(err, IsNil) topo = new(Specification) err = yaml.Unmarshal(data, topo) c.Assert(err, IsNil) - c.Assert(topo.GlobalOptions.DataDir, Equals, "/gloable_data") - c.Assert(topo.TiKVServers[0].DataDir, Equals, "/gloable_data/tikv-22") + c.Assert(topo.GlobalOptions.DataDir, Equals, "/global_data") + c.Assert(topo.TiKVServers[0].DataDir, Equals, "/global_data/tikv-22") c.Assert(topo.TiKVServers[1].DataDir, Equals, "/my_data") + + c.Assert(topo.CDCServers[0].DataDir, Equals, "/global_data/cdc-22") + c.Assert(topo.CDCServers[1].DataDir, Equals, "/cdc_data") } func (s *metaSuiteTopo) TestGlobalOptions(c *C) { @@ -86,6 +95,9 @@ tidb_servers: pd_servers: - host: 172.16.5.53 data_dir: "pd-data" +cdc_servers: + - host: 172.16.5.233 + data_dir: "cdc-data" `), &topo) c.Assert(err, IsNil) c.Assert(topo.GlobalOptions.User, Equals, "test1") @@ -96,6 +108,10 @@ pd_servers: c.Assert(topo.PDServers[0].SSHPort, Equals, 220) c.Assert(topo.PDServers[0].DeployDir, Equals, "test-deploy/pd-2379") c.Assert(topo.PDServers[0].DataDir, Equals, "pd-data") + + c.Assert(topo.CDCServers[0].SSHPort, Equals, 220) + c.Assert(topo.CDCServers[0].DeployDir, Equals, "test-deploy/cdc-8300") + c.Assert(topo.CDCServers[0].DataDir, Equals, "cdc-data") } func (s *metaSuiteTopo) TestDataDirAbsolute(c *C) { @@ -109,11 +125,19 @@ pd_servers: data_dir: "pd-data" - host: 172.16.5.54 client_port: 12379 +cdc_servers: + - host: 172.16.5.233 + data_dir: "cdc-data" + - host: 172.16.5.234 + port: 23333 `), &topo) c.Assert(err, IsNil) c.Assert(topo.PDServers[0].DataDir, Equals, "pd-data") c.Assert(topo.PDServers[1].DataDir, Equals, "/test-data/pd-12379") + + c.Assert(topo.CDCServers[0].DataDir, Equals, "cdc-data") + c.Assert(topo.CDCServers[1].DataDir, Equals, "/test-data/cdc-23333") } func (s *metaSuiteTopo) TestGlobalConfig(c *C) { @@ -684,6 +708,43 @@ tiflash_servers: } } +func (s *metaSuiteTopo) TestTiCDCDataDir(c *C) { + spec := &Specification{} + err := yaml.Unmarshal([]byte(` +cdc_servers: + - host: 172.16.6.191 + data_dir: /tidb-data/cdc-8300 +`), spec) + c.Assert(err, IsNil) + + cdcComp := FindComponent(spec, ComponentCDC) + instances := cdcComp.Instances() + c.Assert(len(instances), Equals, 1) + + checkByVersion := func(version string) { + ins := instances[0].(*CDCInstance) + cfg := scripts.NewCDCScript(ins.GetHost(), "", "", false, 0, ""). + PatchByVersion(version, ins.DataDir()) + + // DataDir support since v4.0.13 + checker := Equals + if semver.Compare(version, "v4.0.13") >= 0 && version != "v5.0.0-rc" { + checker = Not(checker) + c.Assert(len(cfg.DataDir), checker, 0) + + // TiCDC support --data-dir since v4.0.14 and v5.0.3 + expected := semver.Compare(version, "v4.0.14") >= 0 || semver.Compare(version, "v5.0.3") >= 0 + c.Assert(cfg.DataDirEnabled, Equals, expected) + } + } + + checkByVersion("v4.0.12") + checkByVersion("v4.0.13") + checkByVersion("v5.0.0-rc") + checkByVersion("v4.0.14") + checkByVersion("v5.0.3") +} + func (s *metaSuiteTopo) TestTiFlashUsersSettings(c *C) { spec := &Specification{} err := yaml.Unmarshal([]byte(` diff --git a/pkg/cluster/template/scripts/cdc.go b/pkg/cluster/template/scripts/cdc.go index ee179444dd..1453495b7e 100644 --- a/pkg/cluster/template/scripts/cdc.go +++ b/pkg/cluster/template/scripts/cdc.go @@ -20,6 +20,7 @@ import ( "text/template" "github.com/pingcap/tiup/embed" + "golang.org/x/mod/semver" ) // CDCScript represent the data to generate cdc config @@ -28,12 +29,14 @@ type CDCScript struct { Port int DeployDir string LogDir string + DataDir string NumaNode string GCTTL int64 TZ string TLSEnabled bool Endpoints []*PDScript ConfigFileEnabled bool + DataDirEnabled bool } // NewCDCScript returns a CDCScript with given arguments @@ -67,6 +70,18 @@ func (c *CDCScript) WithConfigFileEnabled() *CDCScript { return c } +// WithDataDir set DataDir field of TiCDCScript +func (c *CDCScript) WithDataDir(dataDir string) *CDCScript { + c.DataDir = dataDir + return c +} + +// WithDataDirEnabled enables CDC data-dir +func (c *CDCScript) WithDataDirEnabled() *CDCScript { + c.DataDirEnabled = true + return c +} + // Config generate the config file data. func (c *CDCScript) Config() ([]byte, error) { fp := path.Join("templates", "scripts", "run_cdc.sh.tpl") @@ -106,3 +121,21 @@ func (c *CDCScript) AppendEndpoints(ends ...*PDScript) *CDCScript { c.Endpoints = append(c.Endpoints, ends...) return c } + +// PatchByVersion update fields by cluster version +func (c *CDCScript) PatchByVersion(clusterVersion, dataDir string) *CDCScript { + ignore := map[string]struct{}{ + "v5.0.0-rc": {}, + "v5.1.0-alpha": {}, + } + + if _, ok := ignore[clusterVersion]; !ok && semver.Compare(clusterVersion, "v4.0.13") >= 0 { + c = c.WithConfigFileEnabled().WithDataDir(dataDir) + + if semver.Compare(clusterVersion, "v4.0.14") >= 0 || semver.Compare(clusterVersion, "v5.0.3") >= 0 { + c = c.WithDataDirEnabled() + } + } + + return c +}