Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

cluster: support spec template for exec #1048

Merged
merged 5 commits into from
Jan 12, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions components/cluster/command/check.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,7 @@ func checkSystemInfo(s *cliutil.SSHConnectionProps, topo *spec.Specification, gO
Shell(
inst.GetHost(),
filepath.Join(task.CheckToolsPathDir, "bin", "insight"),
"",
false,
).
BuildAsStep(fmt.Sprintf(" - Getting system info of %s:%d", inst.GetHost(), inst.GetSSHPort()))
Expand All @@ -230,6 +231,7 @@ func checkSystemInfo(s *cliutil.SSHConnectionProps, topo *spec.Specification, gO
Shell(
inst.GetHost(),
"ss -lnt",
"",
false,
).
CheckSys(
Expand All @@ -243,6 +245,7 @@ func checkSystemInfo(s *cliutil.SSHConnectionProps, topo *spec.Specification, gO
Shell(
inst.GetHost(),
"cat /etc/security/limits.conf",
"",
false,
).
CheckSys(
Expand All @@ -256,6 +259,7 @@ func checkSystemInfo(s *cliutil.SSHConnectionProps, topo *spec.Specification, gO
Shell(
inst.GetHost(),
"sysctl -a",
"",
true,
).
CheckSys(
Expand Down Expand Up @@ -459,11 +463,13 @@ func fixFailedChecks(ctx *task.Context, host string, res *operator.CheckResult,
"/etc/selinux/config",
"setenforce 0",
),
"",
true)
msg = fmt.Sprintf("will try to %s, reboot might be needed", color.HiBlueString("disable SELinux"))
case operator.CheckNameTHP:
t.Shell(host,
"echo never > /sys/kernel/mm/transparent_hugepage/enabled && echo never > /sys/kernel/mm/transparent_hugepage/defrag",
"",
true)
msg = fmt.Sprintf("will try to %s, please check again after reboot", color.HiBlueString("disable THP"))
default:
Expand Down
62 changes: 41 additions & 21 deletions pkg/cluster/manager/exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@
package manager

import (
"fmt"
"strings"

"github.com/fatih/color"
"github.com/joomcode/errorx"
perrs "github.com/pingcap/errors"
Expand Down Expand Up @@ -44,9 +47,10 @@ func (m *Manager) Exec(name string, opt ExecOptions, gOpt operator.Options) erro
filterNodes := set.NewStringSet(gOpt.Nodes...)

var shellTasks []task.Task
uniqueHosts := map[string]int{} // host -> ssh-port
uniqueHosts := map[string]set.StringSet{} // host-sshPort -> {command}
topo.IterInstance(func(inst spec.Instance) {
if _, found := uniqueHosts[inst.GetHost()]; !found {
key := fmt.Sprintf("%s-%d", inst.GetHost(), inst.GetSSHPort())
if _, found := uniqueHosts[key]; !found {
if len(gOpt.Roles) > 0 && !filterRoles.Exist(inst.Role()) {
return
}
Expand All @@ -55,15 +59,28 @@ func (m *Manager) Exec(name string, opt ExecOptions, gOpt operator.Options) erro
return
}

uniqueHosts[inst.GetHost()] = inst.GetSSHPort()
cmds, err := renderInstanceSpec(opt.Command, inst)
if err != nil {
log.Debugf("error rendering command with spec: %s", err)
return // skip
}
cmdSet := set.NewStringSet(cmds...)
if _, ok := uniqueHosts[key]; ok {
uniqueHosts[key].Join(cmdSet)
return
}
uniqueHosts[key] = cmdSet
}
})

for host := range uniqueHosts {
shellTasks = append(shellTasks,
task.NewBuilder().
Shell(host, opt.Command, opt.Sudo).
Build())
for hostKey, i := range uniqueHosts {
host := strings.Split(hostKey, "-")[0]
for _, cmd := range i.Slice() {
shellTasks = append(shellTasks,
task.NewBuilder().
Shell(host, cmd, hostKey+cmd, opt.Sudo).
Build())
}
}

t := m.sshTaskBuilder(name, topo, base.User, gOpt).
Expand All @@ -80,19 +97,22 @@ func (m *Manager) Exec(name string, opt ExecOptions, gOpt operator.Options) erro
}

// print outputs
for host := range uniqueHosts {
stdout, stderr, ok := execCtx.GetOutputs(host)
if !ok {
continue
}
log.Infof("Outputs of %s on %s:",
color.CyanString(opt.Command),
color.CyanString(host))
if len(stdout) > 0 {
log.Infof("%s:\n%s", color.GreenString("stdout"), stdout)
}
if len(stderr) > 0 {
log.Infof("%s:\n%s", color.RedString("stderr"), stderr)
for hostKey, i := range uniqueHosts {
host := strings.Split(hostKey, "-")[0]
for _, cmd := range i.Slice() {
stdout, stderr, ok := execCtx.GetOutputs(hostKey + cmd)
if !ok {
continue
}
log.Infof("Outputs of %s on %s:",
color.CyanString(cmd),
color.CyanString(host))
if len(stdout) > 0 {
log.Infof("%s:\n%s", color.GreenString("stdout"), stdout)
}
if len(stderr) > 0 {
log.Infof("%s:\n%s", color.RedString("stderr"), stderr)
}
}
}

Expand Down
8 changes: 4 additions & 4 deletions pkg/cluster/manager/transfer.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,9 @@ func (m *Manager) Transfer(name string, opt TransferOptions, gOpt operator.Optio

var shellTasks []task.Task

uniqueHosts := map[string]set.StringSet{} // host-sshPort-port -> {remote-path}
uniqueHosts := map[string]set.StringSet{} // host-sshPort -> {remote-path}
topo.IterInstance(func(inst spec.Instance) {
key := fmt.Sprintf("%s-%d-%d", inst.GetHost(), inst.GetSSHPort(), inst.GetPort())
key := fmt.Sprintf("%s-%d", inst.GetHost(), inst.GetSSHPort())
if _, found := uniqueHosts[key]; !found {
if len(gOpt.Roles) > 0 && !filterRoles.Exist(inst.Role()) {
return
Expand All @@ -68,6 +68,7 @@ func (m *Manager) Transfer(name string, opt TransferOptions, gOpt operator.Optio
instPath := opt.Remote
paths, err := renderInstanceSpec(instPath, inst)
if err != nil {
log.Debugf("error rendering remote path with spec: %s", err)
return // skip
}
pathSet := set.NewStringSet(paths...)
Expand Down Expand Up @@ -114,8 +115,7 @@ func renderInstanceSpec(t string, inst spec.Instance) ([]string, error) {
switch inst.ComponentName() {
case spec.ComponentTiFlash:
for _, d := range strings.Split(inst.DataDir(), ",") {
tf := inst
tfs, ok := tf.(*spec.TiFlashInstance).InstanceSpec.(spec.TiFlashSpec)
tfs, ok := inst.(*spec.TiFlashInstance).InstanceSpec.(spec.TiFlashSpec)
if !ok {
return result, perrs.Errorf("instance type mismatch for %v", inst)
}
Expand Down
5 changes: 4 additions & 1 deletion pkg/cluster/task/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -291,11 +291,12 @@ func (b *Builder) Rmdir(host string, dirs ...string) *Builder {
}

// Shell command on cluster host
func (b *Builder) Shell(host, command string, sudo bool) *Builder {
func (b *Builder) Shell(host, command, cmdID string, sudo bool) *Builder {
b.tasks = append(b.tasks, &Shell{
host: host,
command: command,
sudo: sudo,
cmdID: cmdID,
})
return b
}
Expand Down Expand Up @@ -364,6 +365,7 @@ func (b *Builder) DeploySpark(inst spec.Instance, sparkVersion, srcPath, deployD
deployDir,
filepath.Join(deployDir, sparkSubPath),
),
"",
false, // (not) sudo
).CopyComponent(
inst.ComponentName(),
Expand All @@ -380,6 +382,7 @@ func (b *Builder) DeploySpark(inst spec.Instance, sparkVersion, srcPath, deployD
filepath.Join(deployDir, "bin"),
deployDir,
),
"",
false, // (not) sudo
)
}
Expand Down
7 changes: 6 additions & 1 deletion pkg/cluster/task/shell.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ type Shell struct {
host string
command string
sudo bool
cmdID string
}

// Execute implements the Task interface
Expand All @@ -37,7 +38,11 @@ func (m *Shell) Execute(ctx *Context) error {
log.Infof("Run command on %s(sudo:%v): %s", m.host, m.sudo, m.command)

stdout, stderr, err := exec.Execute(m.command, m.sudo)
ctx.SetOutputs(m.host, stdout, stderr)
outputID := m.host
if m.cmdID != "" {
outputID = m.cmdID
}
ctx.SetOutputs(outputID, stdout, stderr)
if err != nil {
return errors.Trace(err)
}
Expand Down
12 changes: 6 additions & 6 deletions pkg/cluster/task/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,19 +133,19 @@ func (ctx *Context) SetExecutor(host string, e executor.Executor) {
}

// GetOutputs get the outputs of a host (if has any)
func (ctx *Context) GetOutputs(host string) ([]byte, []byte, bool) {
func (ctx *Context) GetOutputs(hostID string) ([]byte, []byte, bool) {
ctx.exec.RLock()
stdout, ok1 := ctx.exec.stderrs[host]
stderr, ok2 := ctx.exec.stdouts[host]
stdout, ok1 := ctx.exec.stderrs[hostID]
stderr, ok2 := ctx.exec.stdouts[hostID]
ctx.exec.RUnlock()
return stdout, stderr, ok1 && ok2
}

// SetOutputs set the outputs of a host
func (ctx *Context) SetOutputs(host string, stdout []byte, stderr []byte) {
func (ctx *Context) SetOutputs(hostID string, stdout []byte, stderr []byte) {
ctx.exec.Lock()
ctx.exec.stderrs[host] = stdout
ctx.exec.stdouts[host] = stderr
ctx.exec.stderrs[hostID] = stdout
ctx.exec.stdouts[hostID] = stderr
ctx.exec.Unlock()
}

Expand Down