Skip to content

Commit

Permalink
cluster: support use different component versions (pingcap#2264)
Browse files Browse the repository at this point in the history
  • Loading branch information
nexustar authored Nov 7, 2023
1 parent 1e2e855 commit eb2f058
Show file tree
Hide file tree
Showing 33 changed files with 511 additions and 154 deletions.
1 change: 1 addition & 0 deletions components/cluster/command/display.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ func newDisplayCmd() *cobra.Command {
cmd.Flags().BoolVar(&dopt.ShowProcess, "process", false, "display cpu and memory usage of nodes")
cmd.Flags().BoolVar(&dopt.ShowManageHost, "manage-host", false, "display manage host of nodes")
cmd.Flags().BoolVar(&dopt.ShowNuma, "numa", false, "display numa information of nodes")
cmd.Flags().BoolVar(&dopt.ShowVersions, "versions", false, "display component version of instances")
cmd.Flags().Uint64Var(&statusTimeout, "status-timeout", 10, "Timeout in seconds when getting node status")

return cmd
Expand Down
2 changes: 1 addition & 1 deletion components/cluster/command/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ func init() {
}

tidbSpec = spec.GetSpecManager()
cm = manager.NewManager("tidb", tidbSpec, spec.TiDBComponentVersion, log)
cm = manager.NewManager("tidb", tidbSpec, log)
if cmd.Name() != "__complete" {
logger.EnableAuditLog(spec.AuditDir())
}
Expand Down
29 changes: 28 additions & 1 deletion components/cluster/command/upgrade.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,15 @@
package command

import (
"github.com/pingcap/tiup/pkg/cluster/spec"
"github.com/pingcap/tiup/pkg/utils"
"github.com/spf13/cobra"
)

func newUpgradeCmd() *cobra.Command {
offlineMode := false
ignoreVersionCheck := false
var tidbVer, tikvVer, pdVer, tiflashVer, kvcdcVer, dashboardVer, cdcVer, alertmanagerVer, nodeExporterVer, blackboxExporterVer, tiproxyVer string

cmd := &cobra.Command{
Use: "upgrade <cluster-name> <version>",
Expand All @@ -39,7 +41,21 @@ func newUpgradeCmd() *cobra.Command {
teleCommand = append(teleCommand, scrubClusterName(clusterName))
teleCommand = append(teleCommand, version)

return cm.Upgrade(clusterName, version, gOpt, skipConfirm, offlineMode, ignoreVersionCheck)
componentVersions := map[string]string{
spec.ComponentDashboard: dashboardVer,
spec.ComponentAlertmanager: alertmanagerVer,
spec.ComponentTiDB: tidbVer,
spec.ComponentTiKV: tikvVer,
spec.ComponentPD: pdVer,
spec.ComponentTiFlash: tiflashVer,
spec.ComponentTiKVCDC: kvcdcVer,
spec.ComponentCDC: cdcVer,
spec.ComponentTiProxy: tiproxyVer,
spec.ComponentBlackboxExporter: blackboxExporterVer,
spec.ComponentNodeExporter: nodeExporterVer,
}

return cm.Upgrade(clusterName, version, componentVersions, gOpt, skipConfirm, offlineMode, ignoreVersionCheck)
},
ValidArgsFunction: func(cmd *cobra.Command, args []string, toComplete string) ([]string, cobra.ShellCompDirective) {
switch len(args) {
Expand All @@ -58,5 +74,16 @@ func newUpgradeCmd() *cobra.Command {
cmd.Flags().StringVar(&gOpt.SSHCustomScripts.BeforeRestartInstance.Raw, "pre-upgrade-script", "", "(EXPERIMENTAL) Custom script to be executed on each server before the server is upgraded")
cmd.Flags().StringVar(&gOpt.SSHCustomScripts.AfterRestartInstance.Raw, "post-upgrade-script", "", "(EXPERIMENTAL) Custom script to be executed on each server after the server is upgraded")

// cmd.Flags().StringVar(&tidbVer, "tidb-version", "", "Fix the version of tidb and no longer follows the cluster version.")
cmd.Flags().StringVar(&tikvVer, "tikv-version", "", "Fix the version of tikv and no longer follows the cluster version.")
cmd.Flags().StringVar(&pdVer, "pd-version", "", "Fix the version of pv and no longer follows the cluster version.")
cmd.Flags().StringVar(&tiflashVer, "tiflash-version", "", "Fix the version of tiflash and no longer follows the cluster version.")
cmd.Flags().StringVar(&dashboardVer, "tidb-dashboard-version", "", "Fix the version of tidb-dashboard and no longer follows the cluster version.")
cmd.Flags().StringVar(&cdcVer, "cdc-version", "", "Fix the version of cdc and no longer follows the cluster version.")
cmd.Flags().StringVar(&kvcdcVer, "tikv-cdc-version", "", "Fix the version of tikv-cdc and no longer follows the cluster version.")
cmd.Flags().StringVar(&alertmanagerVer, "alertmanager-version", "", "Fix the version of alertmanager and no longer follows the cluster version.")
cmd.Flags().StringVar(&nodeExporterVer, "node-exporter-version", "", "Fix the version of node-exporter and no longer follows the cluster version.")
cmd.Flags().StringVar(&blackboxExporterVer, "blackbox-exporter-version", "", "Fix the version of blackbox-exporter and no longer follows the cluster version.")
cmd.Flags().StringVar(&tiproxyVer, "tiproxy-version", "", "Fix the version of tiproxy and no longer follows the cluster version.")
return cmd
}
2 changes: 1 addition & 1 deletion components/dm/command/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ please backup your data before process.`,

dmspec = spec.GetSpecManager()
logger.EnableAuditLog(cspec.AuditDir())
cm = manager.NewManager("dm", dmspec, spec.DMComponentVersion, log)
cm = manager.NewManager("dm", dmspec, log)

// Running in other OS/ARCH Should be fine we only download manifest file.
env, err = tiupmeta.InitEnv(repository.Options{
Expand Down
2 changes: 1 addition & 1 deletion components/dm/command/upgrade.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ func newUpgradeCmd() *cobra.Command {
return cmd.Help()
}

return cm.Upgrade(args[0], args[1], gOpt, skipConfirm, offlineMode, ignoreVersionCheck)
return cm.Upgrade(args[0], args[1], nil, gOpt, skipConfirm, offlineMode, ignoreVersionCheck)
},
ValidArgsFunction: func(cmd *cobra.Command, args []string, toComplete string) ([]string, cobra.ShellCompDirective) {
switch len(args) {
Expand Down
18 changes: 0 additions & 18 deletions components/dm/spec/bindversion.go

This file was deleted.

22 changes: 22 additions & 0 deletions components/dm/spec/logic.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,16 @@ func (c *DMMasterComponent) Role() string {
return ComponentDMMaster
}

// CalculateVersion implements the Component interface
func (c *DMMasterComponent) CalculateVersion(clusterVersion string) string {
return clusterVersion
}

// SetVersion implements Component interface.
func (c *DMMasterComponent) SetVersion(version string) {
// not supported now
}

// Instances implements Component interface.
func (c *DMMasterComponent) Instances() []Instance {
ins := make([]Instance, 0)
Expand Down Expand Up @@ -96,6 +106,7 @@ func (c *DMMasterComponent) Instances() []Instance {
UptimeFn: func(_ context.Context, timeout time.Duration, tlsCfg *tls.Config) time.Duration {
return spec.UptimeByHost(s.Host, s.Port, timeout, tlsCfg)
},
Component: c,
},
topo: c.Topology,
})
Expand Down Expand Up @@ -271,6 +282,16 @@ func (c *DMWorkerComponent) Role() string {
return ComponentDMWorker
}

// CalculateVersion implements the Component interface
func (c *DMWorkerComponent) CalculateVersion(clusterVersion string) string {
return clusterVersion
}

// SetVersion implements Component interface.
func (c *DMWorkerComponent) SetVersion(version string) {
// not supported now
}

// Instances implements Component interface.
func (c *DMWorkerComponent) Instances() []Instance {
ins := make([]Instance, 0)
Expand Down Expand Up @@ -298,6 +319,7 @@ func (c *DMWorkerComponent) Instances() []Instance {
UptimeFn: func(_ context.Context, timeout time.Duration, tlsCfg *tls.Config) time.Duration {
return spec.UptimeByHost(s.Host, s.Port, timeout, tlsCfg)
},
Component: c,
},
topo: c.Topology,
})
Expand Down
19 changes: 9 additions & 10 deletions pkg/cluster/manager/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,15 +164,14 @@ func buildScaleOutTask(
newPart,
m.logger,
gOpt,
m.bindVersion,
)

sshType := topo.BaseTopo().GlobalOptions.SSHType

var iterErr error
// Deploy the new topology and refresh the configuration
newPart.IterInstance(func(inst spec.Instance) {
version := m.bindVersion(inst.ComponentName(), base.Version)
version := inst.CalculateVersion(base.Version)
deployDir := spec.Abs(base.User, inst.DeployDir())
// data dir would be empty for components which don't need it
dataDirs := spec.MultiDirAbs(base.User, inst.DataDir())
Expand Down Expand Up @@ -213,7 +212,7 @@ func buildScaleOutTask(
inst.ComponentSource(),
inst.OS(),
inst.Arch(),
version,
inst.CalculateVersion(version),
srcPath,
inst.GetManageHost(),
deployDir,
Expand All @@ -235,9 +234,9 @@ func buildScaleOutTask(
// data dir would be empty for components which don't need it
// Download and copy the latest component to remote if the cluster is imported from Ansible
tb := task.NewBuilder(m.logger)
version := inst.CalculateVersion(base.Version)
switch compName := inst.ComponentName(); compName {
case spec.ComponentGrafana, spec.ComponentPrometheus, spec.ComponentAlertmanager:
version := m.bindVersion(compName, base.Version)
tb.Download(compName, inst.OS(), inst.Arch(), version).
CopyComponent(compName, inst.OS(), inst.Arch(), version, "", inst.GetManageHost(), deployDir)
}
Expand Down Expand Up @@ -272,7 +271,6 @@ func buildScaleOutTask(
noAgentHosts,
topo.BaseTopo().GlobalOptions,
topo.BaseTopo().MonitoredOptions,
base.Version,
gOpt,
p,
)
Expand Down Expand Up @@ -429,7 +427,6 @@ func buildMonitoredDeployTask(
noAgentHosts set.StringSet, // hosts that do not deploy monitor agents
globalOptions *spec.GlobalOptions,
monitoredOptions *spec.MonitoredOptions,
version string,
gOpt operator.Options,
p *tui.SSHConnectionProps,
) (downloadCompTasks []*task.StepDisplay, deployCompTasks []*task.StepDisplay, err error) {
Expand All @@ -440,7 +437,10 @@ func buildMonitoredDeployTask(
uniqueCompOSArch := set.NewStringSet()
// monitoring agents
for _, comp := range []string{spec.ComponentNodeExporter, spec.ComponentBlackboxExporter} {
version := m.bindVersion(comp, version)
version := monitoredOptions.NodeExporterVersion
if comp == spec.ComponentBlackboxExporter {
version = monitoredOptions.BlackboxExporterVersion
}
for host, info := range uniqueHosts {
// skip deploying monitoring agents if the instance is marked so
if noAgentHosts.Exist(host) {
Expand Down Expand Up @@ -638,9 +638,9 @@ func buildInitConfigTasks(
// Download and copy the latest component to remote if the cluster is imported from Ansible
tb := task.NewBuilder(m.logger)
if instance.IsImported() {
version := instance.CalculateVersion(base.Version)
switch compName {
case spec.ComponentGrafana, spec.ComponentPrometheus, spec.ComponentAlertmanager:
version := m.bindVersion(compName, base.Version)
tb.Download(compName, instance.OS(), instance.Arch(), version).
CopyComponent(
compName,
Expand Down Expand Up @@ -683,7 +683,6 @@ func buildDownloadCompTasks(
topo spec.Topology,
logger *logprinter.Logger,
gOpt operator.Options,
bindVersion spec.BindVersion,
) []*task.StepDisplay {
var tasks []*task.StepDisplay
uniqueTaskList := set.NewStringSet()
Expand All @@ -698,7 +697,7 @@ func buildDownloadCompTasks(
// download spark as dependency of tispark
tasks = append(tasks, buildDownloadSparkTask(inst, logger, gOpt))
} else {
version = bindVersion(inst.ComponentSource(), clusterVersion)
version = inst.CalculateVersion(clusterVersion)
}

t := task.NewBuilder(logger).
Expand Down
2 changes: 1 addition & 1 deletion pkg/cluster/manager/check.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ func checkSystemInfo(
downloadTasks []*task.StepDisplay
)
logger := ctx.Value(logprinter.ContextKeyLogger).(*logprinter.Logger)
insightVer := spec.TiDBComponentVersion(spec.ComponentCheckCollector, "")
insightVer := ""

uniqueHosts := map[string]int{} // host -> ssh-port
uniqueArchList := make(map[string]struct{}) // map["os-arch"]{}
Expand Down
5 changes: 2 additions & 3 deletions pkg/cluster/manager/deploy.go
Original file line number Diff line number Diff line change
Expand Up @@ -245,11 +245,11 @@ func (m *Manager) Deploy(
}

// Download missing component
downloadCompTasks = buildDownloadCompTasks(clusterVersion, topo, m.logger, gOpt, m.bindVersion)
downloadCompTasks = buildDownloadCompTasks(clusterVersion, topo, m.logger, gOpt)

// Deploy components to remote
topo.IterInstance(func(inst spec.Instance) {
version := m.bindVersion(inst.ComponentSource(), clusterVersion)
version := inst.CalculateVersion(clusterVersion)
deployDir := spec.Abs(globalOptions.User, inst.DeployDir())
// data dir would be empty for components which don't need it
dataDirs := spec.MultiDirAbs(globalOptions.User, inst.DataDir())
Expand Down Expand Up @@ -320,7 +320,6 @@ func (m *Manager) Deploy(
noAgentHosts,
globalOptions,
topo.GetMonitoredOptions(),
clusterVersion,
gOpt,
sshProxyProps,
)
Expand Down
9 changes: 9 additions & 0 deletions pkg/cluster/manager/display.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ type DisplayOption struct {
ShowProcess bool
ShowManageHost bool
ShowNuma bool
ShowVersions bool
}

// InstInfo represents an instance info
Expand All @@ -70,6 +71,7 @@ type InstInfo struct {
DeployDir string `json:"deploy_dir"`
NumaNode string `json:"numa_node"`
NumaCores string `json:"numa_cores"`
Version string `json:"version"`

ComponentName string
Port int
Expand Down Expand Up @@ -205,6 +207,9 @@ func (m *Manager) Display(dopt DisplayOption, opt operator.Options) error {
if dopt.ShowNuma {
rowHead = append(rowHead, "Numa Node", "Numd Cores")
}
if dopt.ShowVersions {
rowHead = append(rowHead, "Version")
}

rowHead = append(rowHead, "Data Dir", "Deploy Dir")
clusterTable = append(clusterTable, rowHead)
Expand Down Expand Up @@ -235,6 +240,9 @@ func (m *Manager) Display(dopt DisplayOption, opt operator.Options) error {
if dopt.ShowNuma {
row = append(row, v.NumaNode, v.NumaCores)
}
if dopt.ShowVersions {
row = append(row, v.Version)
}

row = append(row, v.DataDir, v.DeployDir)
clusterTable = append(clusterTable, row)
Expand Down Expand Up @@ -667,6 +675,7 @@ func (m *Manager) GetClusterTopology(dopt DisplayOption, opt operator.Options) (
Since: since,
NumaNode: utils.Ternary(ins.GetNumaNode() == "", "-", ins.GetNumaNode()).(string),
NumaCores: utils.Ternary(ins.GetNumaCores() == "", "-", ins.GetNumaCores()).(string),
Version: ins.CalculateVersion(base.Version),
})
mu.Unlock()
}, opt.Concurrency)
Expand Down
3 changes: 0 additions & 3 deletions pkg/cluster/manager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,21 +45,18 @@ var (
type Manager struct {
sysName string
specManager *spec.SpecManager
bindVersion spec.BindVersion
logger *logprinter.Logger
}

// NewManager create a Manager.
func NewManager(
sysName string,
specManager *spec.SpecManager,
bindVersion spec.BindVersion,
logger *logprinter.Logger,
) *Manager {
return &Manager{
sysName: sysName,
specManager: specManager,
bindVersion: bindVersion,
logger: logger,
}
}
Expand Down
10 changes: 5 additions & 5 deletions pkg/cluster/manager/patch.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ func (m *Manager) Patch(name string, packagePath string, opt operator.Options, o
if err != nil {
return err
}
if err := checkPackage(m.bindVersion, m.specManager, name, insts[0].ComponentName(), insts[0].OS(), insts[0].Arch(), packagePath); err != nil {
if err := checkPackage(m.specManager, name, insts[0], insts[0].OS(), insts[0].Arch(), packagePath); err != nil {
return err
}

Expand Down Expand Up @@ -140,18 +140,18 @@ func (m *Manager) Patch(name string, packagePath string, opt operator.Options, o
return m.specManager.SaveMeta(name, metadata)
}

func checkPackage(bindVersion spec.BindVersion, specManager *spec.SpecManager, name, comp, nodeOS, arch, packagePath string) error {
func checkPackage(specManager *spec.SpecManager, name string, inst spec.Instance, nodeOS, arch, packagePath string) error {
metadata := specManager.NewMetadata()
if err := specManager.Metadata(name, metadata); err != nil {
return err
}

ver := bindVersion(comp, metadata.GetBaseMeta().Version)
ver := inst.CalculateVersion(metadata.GetBaseMeta().Version)
repo, err := clusterutil.NewRepository(nodeOS, arch)
if err != nil {
return err
}
entry, err := repo.ComponentBinEntry(comp, ver)
entry, err := repo.ComponentBinEntry(inst.ComponentSource(), ver)
if err != nil {
return err
}
Expand All @@ -160,7 +160,7 @@ func checkPackage(bindVersion spec.BindVersion, specManager *spec.SpecManager, n
if err != nil {
return err
}
cacheDir := specManager.Path(name, "cache", comp+"-"+checksum[:7])
cacheDir := specManager.Path(name, "cache", inst.ComponentSource()+"-"+checksum[:7])
if err := utils.MkdirAll(cacheDir, 0755); err != nil {
return perrs.Annotatef(err, "create cache directory %s", cacheDir)
}
Expand Down
Loading

0 comments on commit eb2f058

Please sign in to comment.