diff --git a/components/cluster/command/replay.go b/components/cluster/command/replay.go new file mode 100644 index 0000000000..e556bb004c --- /dev/null +++ b/components/cluster/command/replay.go @@ -0,0 +1,64 @@ +// Copyright 2021 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package command + +import ( + "fmt" + "path" + "strings" + + "github.com/pingcap/errors" + "github.com/pingcap/tiup/pkg/checkpoint" + "github.com/pingcap/tiup/pkg/cliutil" + "github.com/pingcap/tiup/pkg/cluster/audit" + "github.com/pingcap/tiup/pkg/cluster/spec" + "github.com/spf13/cobra" +) + +func newReplayCmd() *cobra.Command { + cmd := &cobra.Command{ + Use: "replay ", + Short: "Replay previous operation and skip successed steps", + RunE: func(cmd *cobra.Command, args []string) error { + if len(args) != 1 { + return cmd.Help() + } + + file := path.Join(spec.AuditDir(), args[0]) + if !checkpoint.HasCheckPoint() { + if err := checkpoint.SetCheckPoint(file); err != nil { + return errors.Annotate(err, "set checkpoint failed") + } + } + + args, err := audit.CommandArgs(file) + if err != nil { + return errors.Annotate(err, "read audit log failed") + } + + if !skipConfirm { + if err := cliutil.PromptForConfirmOrAbortError( + fmt.Sprintf("Will replay the command `tiup cluster %s`\nDo you want to continue? [y/N]: ", strings.Join(args[1:], " ")), + ); err != nil { + return err + } + } + + rootCmd.SetArgs(args[1:]) + return rootCmd.Execute() + }, + } + + return cmd +} diff --git a/components/cluster/command/root.go b/components/cluster/command/root.go index addc3461e3..cc22a55ceb 100644 --- a/components/cluster/command/root.go +++ b/components/cluster/command/root.go @@ -18,15 +18,12 @@ import ( "encoding/json" "fmt" "os" - "path" "strings" "time" "github.com/fatih/color" "github.com/google/uuid" "github.com/joomcode/errorx" - "github.com/pingcap/errors" - "github.com/pingcap/tiup/pkg/checkpoint" "github.com/pingcap/tiup/pkg/cliutil" "github.com/pingcap/tiup/pkg/cluster/executor" "github.com/pingcap/tiup/pkg/cluster/flags" @@ -123,12 +120,6 @@ func init() { fmt.Println("The --native-ssh flag has been deprecated, please use --ssh=system") } - if gOpt.CheckPoint != "" { - if err := checkpoint.SetCheckPoint(path.Join(spec.AuditDir(), gOpt.CheckPoint)); err != nil { - return errors.Annotate(err, "set checkpoint failed") - } - } - return nil }, PersistentPostRunE: func(cmd *cobra.Command, args []string) error { @@ -145,9 +136,7 @@ func init() { rootCmd.PersistentFlags().BoolVarP(&skipConfirm, "yes", "y", false, "Skip all confirmations and assumes 'yes'") rootCmd.PersistentFlags().BoolVar(&gOpt.NativeSSH, "native-ssh", gOpt.NativeSSH, "(EXPERIMENTAL) Use the native SSH client installed on local system instead of the build-in one.") rootCmd.PersistentFlags().StringVar((*string)(&gOpt.SSHType), "ssh", "", "(EXPERIMENTAL) The executor type: 'builtin', 'system', 'none'.") - rootCmd.PersistentFlags().StringVar(&gOpt.CheckPoint, "checkpoint", "", "(EXPERIMENTAL) The audit log ID this command should recover from.") _ = rootCmd.PersistentFlags().MarkHidden("native-ssh") - _ = rootCmd.PersistentFlags().MarkHidden("checkpoint") rootCmd.AddCommand( newCheckCmd(), @@ -176,6 +165,7 @@ func init() { newPushCmd(), newTestCmd(), // hidden command for test internally newTelemetryCmd(), + newReplayCmd(), newTemplateCmd(), ) } diff --git a/components/dm/command/replay.go b/components/dm/command/replay.go new file mode 100644 index 0000000000..04e85392f1 --- /dev/null +++ b/components/dm/command/replay.go @@ -0,0 +1,64 @@ +// Copyright 2021 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package command + +import ( + "fmt" + "path" + "strings" + + "github.com/pingcap/errors" + "github.com/pingcap/tiup/pkg/checkpoint" + "github.com/pingcap/tiup/pkg/cliutil" + "github.com/pingcap/tiup/pkg/cluster/audit" + "github.com/pingcap/tiup/pkg/cluster/spec" + "github.com/spf13/cobra" +) + +func newReplayCmd() *cobra.Command { + cmd := &cobra.Command{ + Use: "replay ", + Short: "Replay previous operation and skip successed steps", + RunE: func(cmd *cobra.Command, args []string) error { + if len(args) != 1 { + return cmd.Help() + } + + file := path.Join(spec.AuditDir(), args[0]) + if !checkpoint.HasCheckPoint() { + if err := checkpoint.SetCheckPoint(file); err != nil { + return errors.Annotate(err, "set checkpoint failed") + } + } + + args, err := audit.CommandArgs(file) + if err != nil { + return errors.Annotate(err, "read audit log failed") + } + + if !skipConfirm { + if err := cliutil.PromptForConfirmOrAbortError( + fmt.Sprintf("Will replay the command `tiup dm %s`\nDo you want to continue? [y/N]: ", strings.Join(args[1:], " ")), + ); err != nil { + return err + } + } + + rootCmd.SetArgs(args[1:]) + return rootCmd.Execute() + }, + } + + return cmd +} diff --git a/components/dm/command/root.go b/components/dm/command/root.go index 693e3340c9..85926ec4a3 100644 --- a/components/dm/command/root.go +++ b/components/dm/command/root.go @@ -16,14 +16,11 @@ package command import ( "fmt" "os" - "path" "strings" "github.com/fatih/color" "github.com/joomcode/errorx" - "github.com/pingcap/errors" "github.com/pingcap/tiup/components/dm/spec" - "github.com/pingcap/tiup/pkg/checkpoint" "github.com/pingcap/tiup/pkg/cliutil" "github.com/pingcap/tiup/pkg/cluster/executor" "github.com/pingcap/tiup/pkg/cluster/manager" @@ -99,12 +96,6 @@ please backup your data before process.`, fmt.Println("The --native-ssh flag has been deprecated, please use --ssh=system") } - if gOpt.CheckPoint != "" { - if err := checkpoint.SetCheckPoint(path.Join(cspec.AuditDir(), gOpt.CheckPoint)); err != nil { - return errors.Annotate(err, "set checkpoint failed") - } - } - return nil }, PersistentPostRunE: func(cmd *cobra.Command, args []string) error { @@ -119,9 +110,7 @@ please backup your data before process.`, rootCmd.PersistentFlags().BoolVarP(&skipConfirm, "yes", "y", false, "Skip all confirmations and assumes 'yes'") rootCmd.PersistentFlags().BoolVar(&gOpt.NativeSSH, "native-ssh", gOpt.NativeSSH, "Use the SSH client installed on local system instead of the build-in one.") rootCmd.PersistentFlags().StringVar((*string)(&gOpt.SSHType), "ssh", "", "The executor type: 'builtin', 'system', 'none'") - rootCmd.PersistentFlags().StringVar(&gOpt.CheckPoint, "checkpoint", "", "(EXPERIMENTAL) The audit log ID this command should recover from.") _ = rootCmd.PersistentFlags().MarkHidden("native-ssh") - _ = rootCmd.PersistentFlags().MarkHidden("checkpoint") rootCmd.AddCommand( newDeployCmd(), @@ -143,6 +132,7 @@ please backup your data before process.`, newImportCmd(), newEnableCmd(), newDisableCmd(), + newReplayCmd(), newTemplateCmd(), ) } diff --git a/pkg/checkpoint/checkpoint.go b/pkg/checkpoint/checkpoint.go index ac5ceac02b..c0751a560a 100644 --- a/pkg/checkpoint/checkpoint.go +++ b/pkg/checkpoint/checkpoint.go @@ -47,7 +47,7 @@ var ( DebugCheckpoint = os.Getenv("DEBUG_CHECKPOINT") == "1" ) -// SetCheckPoint set global checkpoint for executor +// SetCheckPoint set global checkpoint for replay func SetCheckPoint(file string) error { pointReader, err := os.Open(file) if err != nil { @@ -63,6 +63,11 @@ func SetCheckPoint(file string) error { return nil } +// HasCheckPoint returns if SetCheckPoint has been called +func HasCheckPoint() bool { + return checkpoint != nil +} + // Acquire wraps CheckPoint.Acquire func Acquire(ctx context.Context, point map[string]interface{}) *Point { if ctx.Value(goroutineKey) == nil || ctx.Value(semKey) == nil { @@ -202,7 +207,7 @@ next_set: if cf.eq == nil { continue } - if !cf.eq(ma[cf.field], mb[cf.field]) { + if !contains(ma, cf.field) || !contains(mb, cf.field) || !cf.eq(ma[cf.field], mb[cf.field]) { continue next_set } } @@ -210,3 +215,8 @@ next_set: } return false } + +func contains(m map[string]interface{}, f string) bool { + _, ok := m[f] + return ok +} diff --git a/pkg/checkpoint/checkpoint_test.go b/pkg/checkpoint/checkpoint_test.go index eaa378a9bf..ba58f122bc 100644 --- a/pkg/checkpoint/checkpoint_test.go +++ b/pkg/checkpoint/checkpoint_test.go @@ -35,6 +35,17 @@ func setup() { return fmt.Sprintf("%v", a) == fmt.Sprintf("%v", b) }), ) + + RegisterField( + Field("host", reflect.DeepEqual), + Field("port", func(a, b interface{}) bool { + return fmt.Sprintf("%v", a) == fmt.Sprintf("%v", b) + }), + Field("user", reflect.DeepEqual), + Field("src", reflect.DeepEqual), + Field("dst", reflect.DeepEqual), + Field("download", reflect.DeepEqual), + ) } func TestCheckPointSimple(t *testing.T) { diff --git a/pkg/cluster/audit/audit.go b/pkg/cluster/audit/audit.go index 575254855b..8bf7f9d3e2 100644 --- a/pkg/cluster/audit/audit.go +++ b/pkg/cluster/audit/audit.go @@ -16,6 +16,7 @@ package audit import ( "bufio" "fmt" + "net/url" "os" "path/filepath" "sort" @@ -30,22 +31,51 @@ import ( "github.com/pingcap/tiup/pkg/utils/rand" ) -// ShowAuditList show the audit list. -func ShowAuditList(dir string) error { - firstLine := func(fileName string) (string, error) { - file, err := os.Open(filepath.Join(dir, fileName)) - if err != nil { - return "", errors.Trace(err) - } - defer file.Close() +// CommandArgs returns the original commands from the first line of a file +func CommandArgs(fp string) ([]string, error) { + file, err := os.Open(fp) + if err != nil { + return nil, errors.Trace(err) + } + defer file.Close() + + scanner := bufio.NewScanner(file) + if !scanner.Scan() { + return nil, errors.New("unknown audit log format") + } + + args := strings.Split(scanner.Text(), " ") + return decodeCommandArgs(args) +} + +// encodeCommandArgs encode args with url.QueryEscape +func encodeCommandArgs(args []string) []string { + encoded := []string{} - scanner := bufio.NewScanner(file) - if scanner.Scan() { - return scanner.Text(), nil + for _, arg := range args { + encoded = append(encoded, url.QueryEscape(arg)) + } + + return encoded +} + +// decodeCommandArgs decode args with url.QueryUnescape +func decodeCommandArgs(args []string) ([]string, error) { + decoded := []string{} + + for _, arg := range args { + a, err := url.QueryUnescape(arg) + if err != nil { + return nil, errors.Annotate(err, "failed on decode the command line of audit log") } - return "", errors.New("unknown audit log format") + decoded = append(decoded, a) } + return decoded, nil +} + +// ShowAuditList show the audit list. +func ShowAuditList(dir string) error { // Header clusterTable := [][]string{{"ID", "Time", "Command"}} fileInfos, err := os.ReadDir(dir) @@ -60,10 +90,11 @@ func ShowAuditList(dir string) error { if err != nil { continue } - cmd, err := firstLine(fi.Name()) + args, err := CommandArgs(filepath.Join(dir, fi.Name())) if err != nil { continue } + cmd := strings.Join(args, " ") clusterTable = append(clusterTable, []string{ fi.Name(), t.Format(time.RFC3339), @@ -82,7 +113,20 @@ func ShowAuditList(dir string) error { // OutputAuditLog outputs audit log. func OutputAuditLog(dir string, data []byte) error { fname := filepath.Join(dir, base52.Encode(time.Now().UnixNano()+rand.Int63n(1000))) - return os.WriteFile(fname, data, 0644) + f, err := os.Create(fname) + if err != nil { + return errors.Annotate(err, "create audit log") + } + defer f.Close() + + args := encodeCommandArgs(os.Args) + if _, err := f.Write([]byte(strings.Join(args, " ") + "\n")); err != nil { + return errors.Annotate(err, "write audit log") + } + if _, err := f.Write(data); err != nil { + return errors.Annotate(err, "write audit log") + } + return nil } // ShowAuditLog show the audit with the specified auditID diff --git a/pkg/cluster/operation/operation.go b/pkg/cluster/operation/operation.go index cfaab27f52..01c96b0239 100644 --- a/pkg/cluster/operation/operation.go +++ b/pkg/cluster/operation/operation.go @@ -29,7 +29,6 @@ type Options struct { SSHTimeout uint64 // timeout in seconds when connecting an SSH server OptTimeout uint64 // timeout in seconds for operations that support it, not to confuse with SSH timeout APITimeout uint64 // timeout in seconds for API operations that support it, like transferring store leader - CheckPoint string // the audit log ID where we should recover from, this is useful when an action failed and we want to continue that action IgnoreConfigCheck bool // should we ignore the config check result after init config NativeSSH bool // should use native ssh client or builtin easy ssh (deprecated, shoule use SSHType) SSHType executor.SSHType // the ssh type: 'builtin', 'system', 'none' diff --git a/pkg/logger/audit.go b/pkg/logger/audit.go index 01724ee534..90c1c8de07 100644 --- a/pkg/logger/audit.go +++ b/pkg/logger/audit.go @@ -15,8 +15,6 @@ package logger import ( "bytes" - "os" - "strings" "github.com/pingcap/tiup/pkg/cluster/audit" utils2 "github.com/pingcap/tiup/pkg/utils" @@ -41,7 +39,7 @@ func DisableAuditLog() { } func newAuditLogCore() zapcore.Core { - auditBuffer = bytes.NewBufferString(strings.Join(os.Args, " ") + "\n") + auditBuffer = bytes.NewBuffer([]byte{}) encoder := zapcore.NewConsoleEncoder(zap.NewDevelopmentEncoderConfig()) return zapcore.NewCore(encoder, zapcore.Lock(zapcore.AddSync(auditBuffer)), zapcore.DebugLevel) } diff --git a/tests/tiup-cluster/script/cmd_subtest.sh b/tests/tiup-cluster/script/cmd_subtest.sh index 8873acd2de..9c9202b419 100755 --- a/tests/tiup-cluster/script/cmd_subtest.sh +++ b/tests/tiup-cluster/script/cmd_subtest.sh @@ -73,7 +73,7 @@ function cmd_subtest() { tiup-cluster exec $name -N n1 --command "ls /tmp/checkpoint" tiup-cluster exec $name -N n1 --command "rm -f /tmp/checkpoint" id=`tiup-cluster audit | grep "exec $name" | grep "ls /tmp/checkpoint" | awk '{print $1}'` - tiup-cluster exec $name -N n1 --command "ls /tmp/checkpoint" --checkpoint $id + tiup-cluster replay --yes $id ! tiup-cluster exec $name -N n1 --command "ls /tmp/checkpoint" # test patch overwrite