Skip to content

Commit

Permalink
Merge branch 'master' into upgrade-pd
Browse files Browse the repository at this point in the history
  • Loading branch information
lucklove authored Jan 22, 2021
2 parents 2ade913 + 9f96db2 commit 1574ec7
Show file tree
Hide file tree
Showing 101 changed files with 2,166 additions and 806 deletions.
18 changes: 12 additions & 6 deletions components/cluster/command/check.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package command

import (
"context"
"fmt"
"path"
"path/filepath"
Expand All @@ -23,6 +24,7 @@ import (
"github.com/joomcode/errorx"
perrs "github.com/pingcap/errors"
"github.com/pingcap/tiup/pkg/cliutil"
"github.com/pingcap/tiup/pkg/cluster/ctxt"
"github.com/pingcap/tiup/pkg/cluster/executor"
operator "github.com/pingcap/tiup/pkg/cluster/operation"
"github.com/pingcap/tiup/pkg/cluster/spec"
Expand Down Expand Up @@ -330,7 +332,7 @@ func checkSystemInfo(s *cliutil.SSHConnectionProps, topo *spec.Specification, gO
ParallelStep("+ Cleanup check files", false, cleanTasks...).
Build()

ctx := task.NewContext()
ctx := ctxt.New(context.Background())
if err := t.Execute(ctx); err != nil {
if errorx.Cast(err) != nil {
// FIXME: Map possible task errors and give suggestions.
Expand Down Expand Up @@ -387,11 +389,15 @@ func checkSystemInfo(s *cliutil.SSHConnectionProps, topo *spec.Specification, gO
}

// handleCheckResults parses the result of checks
func handleCheckResults(ctx *task.Context, host string, opt *checkOptions, t *task.Builder) ([][]string, error) {
results, _ := ctx.GetCheckResults(host)
if len(results) < 1 {
func handleCheckResults(ctx context.Context, host string, opt *checkOptions, t *task.Builder) ([][]string, error) {
rr, _ := ctxt.GetInner(ctx).GetCheckResults(host)
if len(rr) < 1 {
return nil, fmt.Errorf("no check results found for %s", host)
}
results := []*operator.CheckResult{}
for _, r := range rr {
results = append(results, r.(*operator.CheckResult))
}

lines := make([][]string, 0)
//log.Infof("Check results of %s: (only errors and important info are displayed)", color.HiCyanString(host))
Expand All @@ -407,7 +413,7 @@ func handleCheckResults(ctx *task.Context, host string, opt *checkOptions, t *ta
lines = append(lines, line)
continue
}
msg, err := fixFailedChecks(ctx, host, r, t)
msg, err := fixFailedChecks(host, r, t)
if err != nil {
log.Debugf("%s: fail to apply fix to %s (%s)", host, r.Name, err)
}
Expand All @@ -429,7 +435,7 @@ func handleCheckResults(ctx *task.Context, host string, opt *checkOptions, t *ta
}

// fixFailedChecks tries to automatically apply changes to fix failed checks
func fixFailedChecks(ctx *task.Context, host string, res *operator.CheckResult, t *task.Builder) (string, error) {
func fixFailedChecks(host string, res *operator.CheckResult, t *task.Builder) (string, error) {
msg := ""
switch res.Name {
case operator.CheckNameSysService:
Expand Down
6 changes: 3 additions & 3 deletions components/cluster/command/deploy.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,9 +95,9 @@ func newDeploy() *cobra.Command {
}

func postDeployHook(builder *task.Builder, topo spec.Topology) {
nodeInfoTask := task.NewBuilder().Func("Check status", func(ctx *task.Context) error {
nodeInfoTask := task.NewBuilder().Func("Check status", func(ctx context.Context) error {
var err error
teleNodeInfos, err = operator.GetNodeInfo(context.Background(), ctx, topo)
teleNodeInfos, err = operator.GetNodeInfo(ctx, topo)
_ = err
// intend to never return error
return nil
Expand All @@ -107,7 +107,7 @@ func postDeployHook(builder *task.Builder, topo spec.Topology) {
builder.ParallelStep("+ Check status", false, nodeInfoTask)
}

enableTask := task.NewBuilder().Func("Setting service auto start on boot", func(ctx *task.Context) error {
enableTask := task.NewBuilder().Func("Setting service auto start on boot", func(ctx context.Context) error {
return operator.Enable(ctx, topo, operator.Options{}, true)
}).BuildAsStep("Enable service").SetHidden(true)

Expand Down
11 changes: 11 additions & 0 deletions components/cluster/command/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,15 @@ import (
"encoding/json"
"fmt"
"os"
"path"
"strings"
"time"

"github.com/fatih/color"
"github.com/google/uuid"
"github.com/joomcode/errorx"
"github.com/pingcap/errors"
"github.com/pingcap/tiup/pkg/checkpoint"
"github.com/pingcap/tiup/pkg/cliutil"
"github.com/pingcap/tiup/pkg/cluster/executor"
"github.com/pingcap/tiup/pkg/cluster/flags"
Expand Down Expand Up @@ -120,6 +123,12 @@ func init() {
fmt.Println("The --native-ssh flag has been deprecated, please use --ssh=system")
}

if gOpt.CheckPoint != "" {
if err := checkpoint.SetCheckPoint(path.Join(spec.AuditDir(), gOpt.CheckPoint)); err != nil {
return errors.Annotate(err, "set checkpoint failed")
}
}

return nil
},
PersistentPostRunE: func(cmd *cobra.Command, args []string) error {
Expand All @@ -136,7 +145,9 @@ func init() {
rootCmd.PersistentFlags().BoolVarP(&skipConfirm, "yes", "y", false, "Skip all confirmations and assumes 'yes'")
rootCmd.PersistentFlags().BoolVar(&gOpt.NativeSSH, "native-ssh", gOpt.NativeSSH, "(EXPERIMENTAL) Use the native SSH client installed on local system instead of the build-in one.")
rootCmd.PersistentFlags().StringVar((*string)(&gOpt.SSHType), "ssh", "", "(EXPERIMENTAL) The executor type: 'builtin', 'system', 'none'.")
rootCmd.PersistentFlags().StringVar(&gOpt.CheckPoint, "checkpoint", "", "(EXPERIMENTAL) The audit log ID this command should recover from.")
_ = rootCmd.PersistentFlags().MarkHidden("native-ssh")
_ = rootCmd.PersistentFlags().MarkHidden("checkpoint")

rootCmd.AddCommand(
newCheckCmd(),
Expand Down
34 changes: 18 additions & 16 deletions components/dm/ansible/import.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package ansible
import (
"bufio"
"bytes"
"context"
"fmt"
"io/ioutil"
"os"
Expand All @@ -28,6 +29,7 @@ import (
"github.com/pingcap/errors"
"github.com/pingcap/tiup/components/dm/spec"
"github.com/pingcap/tiup/pkg/cluster/ansible"
"github.com/pingcap/tiup/pkg/cluster/ctxt"
"github.com/pingcap/tiup/pkg/cluster/executor"
"github.com/pingcap/tiup/pkg/utils"
"github.com/relex/aini"
Expand Down Expand Up @@ -108,7 +110,7 @@ func getAbsPath(dir string, path string) string {

// ExecutorGetter get the executor by host.
type ExecutorGetter interface {
Get(host string) (e executor.Executor)
Get(host string) (e ctxt.Executor)
}

// Importer used for import from ansible.
Expand Down Expand Up @@ -145,7 +147,7 @@ func NewImporter(ansibleDir, inventoryFileName string, sshType executor.SSHType,
}, nil
}

func (im *Importer) getExecutor(host string, port int) (e executor.Executor, err error) {
func (im *Importer) getExecutor(host string, port int) (e ctxt.Executor, err error) {
if im.testExecutorGetter != nil {
return im.testExecutorGetter.Get(host), nil
}
Expand All @@ -165,7 +167,7 @@ func (im *Importer) getExecutor(host string, port int) (e executor.Executor, err
return
}

func (im *Importer) fetchFile(host string, port int, fname string) (data []byte, err error) {
func (im *Importer) fetchFile(ctx context.Context, host string, port int, fname string) (data []byte, err error) {
e, err := im.getExecutor(host, port)
if err != nil {
return nil, errors.Annotatef(err, "failed to get executor, target: %s:%d", host, port)
Expand All @@ -179,7 +181,7 @@ func (im *Importer) fetchFile(host string, port int, fname string) (data []byte,

tmp = filepath.Join(tmp, filepath.Base(fname))

err = e.Transfer(fname, tmp, true /*download*/)
err = e.Transfer(ctx, fname, tmp, true /*download*/)
if err != nil {
return nil, errors.Annotatef(err, "transfer %s from %s:%d", fname, host, port)
}
Expand All @@ -202,8 +204,8 @@ func setConfig(config *map[string]interface{}, k string, v interface{}) {

// handleWorkerConfig fetch the config file of worker and generate the source
// which we need for the master.
func (im *Importer) handleWorkerConfig(srv *spec.WorkerSpec, fname string) error {
data, err := im.fetchFile(srv.Host, srv.SSHPort, fname)
func (im *Importer) handleWorkerConfig(ctx context.Context, srv *spec.WorkerSpec, fname string) error {
data, err := im.fetchFile(ctx, srv.Host, srv.SSHPort, fname)
if err != nil {
return err
}
Expand All @@ -222,7 +224,7 @@ func (im *Importer) handleWorkerConfig(srv *spec.WorkerSpec, fname string) error

// ScpSourceToMaster scp the source files to master,
// and set V1SourcePath of the master spec.
func (im *Importer) ScpSourceToMaster(topo *spec.Specification) (err error) {
func (im *Importer) ScpSourceToMaster(ctx context.Context, topo *spec.Specification) (err error) {
for i := 0; i < len(topo.Masters); i++ {
master := &topo.Masters[i]
target := filepath.Join(firstNonEmpty(master.DeployDir, topo.GlobalOptions.DeployDir), "v1source")
Expand All @@ -232,7 +234,7 @@ func (im *Importer) ScpSourceToMaster(topo *spec.Specification) (err error) {
if err != nil {
return errors.Annotatef(err, "failed to get executor, target: %s:%d", master.Host, master.SSHPort)
}
_, stderr, err := e.Execute("mkdir -p "+target, false)
_, stderr, err := e.Execute(ctx, "mkdir -p "+target, false)
if err != nil {
return errors.Annotatef(err, "failed to execute: %s", string(stderr))
}
Expand All @@ -253,7 +255,7 @@ func (im *Importer) ScpSourceToMaster(topo *spec.Specification) (err error) {
return errors.AddStack(err)
}

err = e.Transfer(f.Name(), filepath.Join(target, addr+".yml"), false)
err = e.Transfer(ctx, f.Name(), filepath.Join(target, addr+".yml"), false)
if err != nil {
return err
}
Expand All @@ -272,7 +274,7 @@ func instancDeployDir(comp string, port int, hostDir string, globalDir string) s
}

// ImportFromAnsibleDir generate the metadata from ansible deployed cluster.
func (im *Importer) ImportFromAnsibleDir() (clusterName string, meta *spec.Metadata, err error) {
func (im *Importer) ImportFromAnsibleDir(ctx context.Context) (clusterName string, meta *spec.Metadata, err error) {
dir := im.dir
inventoryFileName := im.inventoryFileName

Expand Down Expand Up @@ -334,7 +336,7 @@ func (im *Importer) ImportFromAnsibleDir() (clusterName string, meta *spec.Metad
}

runFileName := filepath.Join(host.Vars["deploy_dir"], "scripts", "run_dm-master.sh")
data, err := im.fetchFile(srv.Host, srv.SSHPort, runFileName)
data, err := im.fetchFile(ctx, srv.Host, srv.SSHPort, runFileName)
if err != nil {
return "", nil, err
}
Expand Down Expand Up @@ -382,7 +384,7 @@ func (im *Importer) ImportFromAnsibleDir() (clusterName string, meta *spec.Metad
}

runFileName := filepath.Join(host.Vars["deploy_dir"], "scripts", "run_dm-worker.sh")
data, err := im.fetchFile(srv.Host, srv.SSHPort, runFileName)
data, err := im.fetchFile(ctx, srv.Host, srv.SSHPort, runFileName)
if err != nil {
return "", nil, err
}
Expand Down Expand Up @@ -425,7 +427,7 @@ func (im *Importer) ImportFromAnsibleDir() (clusterName string, meta *spec.Metad
// We will always set the wd as DeployDir.
srv.DeployDir = deployDir

err = im.handleWorkerConfig(&srv, configFileName)
err = im.handleWorkerConfig(ctx, &srv, configFileName)
if err != nil {
return "", nil, err
}
Expand All @@ -443,7 +445,7 @@ func (im *Importer) ImportFromAnsibleDir() (clusterName string, meta *spec.Metad
}

runFileName := filepath.Join(host.Vars["deploy_dir"], "scripts", "run_prometheus.sh")
data, err := im.fetchFile(srv.Host, srv.SSHPort, runFileName)
data, err := im.fetchFile(ctx, srv.Host, srv.SSHPort, runFileName)
if err != nil {
return "", nil, err
}
Expand Down Expand Up @@ -491,7 +493,7 @@ func (im *Importer) ImportFromAnsibleDir() (clusterName string, meta *spec.Metad
}

runFileName := filepath.Join(host.Vars["deploy_dir"], "scripts", "run_alertmanager.sh")
data, err := im.fetchFile(srv.Host, srv.SSHPort, runFileName)
data, err := im.fetchFile(ctx, srv.Host, srv.SSHPort, runFileName)
if err != nil {
return "", nil, err
}
Expand Down Expand Up @@ -548,7 +550,7 @@ func (im *Importer) ImportFromAnsibleDir() (clusterName string, meta *spec.Metad
}

runFileName := filepath.Join(host.Vars["deploy_dir"], "scripts", "run_grafana.sh")
data, err := im.fetchFile(srv.Host, srv.SSHPort, runFileName)
data, err := im.fetchFile(ctx, srv.Host, srv.SSHPort, runFileName)
if err != nil {
return "", nil, err
}
Expand Down
10 changes: 6 additions & 4 deletions components/dm/ansible/import_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package ansible

import (
"context"
"io/ioutil"
"path/filepath"
"strconv"
Expand All @@ -22,6 +23,7 @@ import (

"github.com/pingcap/errors"
"github.com/pingcap/tiup/components/dm/spec"
"github.com/pingcap/tiup/pkg/cluster/ctxt"
"github.com/pingcap/tiup/pkg/cluster/executor"
"github.com/stretchr/testify/require"
)
Expand All @@ -38,21 +40,21 @@ type executorGetter struct {
var _ ExecutorGetter = &executorGetter{}

// Get implements ExecutorGetter interface.
func (g *executorGetter) Get(host string) executor.Executor {
func (g *executorGetter) Get(host string) ctxt.Executor {
return &localExecutor{
host: host,
}
}

// Transfer implements executor interface.
// Replace the deploy directory as the local one in testdata, so we can fetch it.
func (l *localExecutor) Transfer(src string, target string, download bool) error {
func (l *localExecutor) Transfer(ctx context.Context, src string, target string, download bool) error {
mydeploy, err := filepath.Abs("./testdata/deploy_dir/" + l.host)
if err != nil {
return errors.AddStack(err)
}
src = strings.Replace(src, "/home/tidb/deploy", mydeploy, 1)
return l.Local.Transfer(src, target, download)
return l.Local.Transfer(ctx, src, target, download)
}

func TestParseRunScript(t *testing.T) {
Expand Down Expand Up @@ -140,7 +142,7 @@ func TestImportFromAnsible(t *testing.T) {
im, err := NewImporter(dir, "inventory.ini", executor.SSHTypeBuiltin, 0)
assert.Nil(err)
im.testExecutorGetter = &executorGetter{}
clusterName, meta, err := im.ImportFromAnsibleDir()
clusterName, meta, err := im.ImportFromAnsibleDir(ctxt.New(context.Background()))
assert.Nil(err, "verbose: %+v", err)
assert.Equal("test-cluster", clusterName)

Expand Down
7 changes: 5 additions & 2 deletions components/dm/command/import.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package command

import (
"context"
"fmt"
"io/ioutil"

Expand All @@ -22,6 +23,7 @@ import (
"github.com/pingcap/tiup/components/dm/ansible"
"github.com/pingcap/tiup/pkg/cliutil"
cansible "github.com/pingcap/tiup/pkg/cluster/ansible"
"github.com/pingcap/tiup/pkg/cluster/ctxt"
"github.com/pingcap/tiup/pkg/cluster/manager"
tiuputils "github.com/pingcap/tiup/pkg/utils"
"github.com/spf13/cobra"
Expand All @@ -47,7 +49,8 @@ func newImportCmd() *cobra.Command {
return err
}

clusterName, meta, err := importer.ImportFromAnsibleDir()
ctx := ctxt.New(context.Background())
clusterName, meta, err := importer.ImportFromAnsibleDir(ctx)
if err != nil {
return err
}
Expand All @@ -56,7 +59,7 @@ func newImportCmd() *cobra.Command {
clusterName = rename
}

err = importer.ScpSourceToMaster(meta.Topology)
err = importer.ScpSourceToMaster(ctx, meta.Topology)
if err != nil {
return err
}
Expand Down
11 changes: 11 additions & 0 deletions components/dm/command/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,14 @@ package command
import (
"fmt"
"os"
"path"
"strings"

"github.com/fatih/color"
"github.com/joomcode/errorx"
"github.com/pingcap/errors"
"github.com/pingcap/tiup/components/dm/spec"
"github.com/pingcap/tiup/pkg/checkpoint"
"github.com/pingcap/tiup/pkg/cliutil"
"github.com/pingcap/tiup/pkg/cluster/executor"
"github.com/pingcap/tiup/pkg/cluster/manager"
Expand Down Expand Up @@ -96,6 +99,12 @@ please backup your data before process.`,
fmt.Println("The --native-ssh flag has been deprecated, please use --ssh=system")
}

if gOpt.CheckPoint != "" {
if err := checkpoint.SetCheckPoint(path.Join(cspec.AuditDir(), gOpt.CheckPoint)); err != nil {
return errors.Annotate(err, "set checkpoint failed")
}
}

return nil
},
PersistentPostRunE: func(cmd *cobra.Command, args []string) error {
Expand All @@ -110,7 +119,9 @@ please backup your data before process.`,
rootCmd.PersistentFlags().BoolVarP(&skipConfirm, "yes", "y", false, "Skip all confirmations and assumes 'yes'")
rootCmd.PersistentFlags().BoolVar(&gOpt.NativeSSH, "native-ssh", gOpt.NativeSSH, "Use the SSH client installed on local system instead of the build-in one.")
rootCmd.PersistentFlags().StringVar((*string)(&gOpt.SSHType), "ssh", "", "The executor type: 'builtin', 'system', 'none'")
rootCmd.PersistentFlags().StringVar(&gOpt.CheckPoint, "checkpoint", "", "(EXPERIMENTAL) The audit log ID this command should recover from.")
_ = rootCmd.PersistentFlags().MarkHidden("native-ssh")
_ = rootCmd.PersistentFlags().MarkHidden("checkpoint")

rootCmd.AddCommand(
newDeployCmd(),
Expand Down
Loading

0 comments on commit 1574ec7

Please sign in to comment.