Skip to content

Commit

Permalink
Merge e469a5b into f8a7bfa
Browse files Browse the repository at this point in the history
  • Loading branch information
lucklove authored Feb 26, 2021
2 parents f8a7bfa + e469a5b commit a73c32e
Show file tree
Hide file tree
Showing 9 changed files with 202 additions and 43 deletions.
64 changes: 64 additions & 0 deletions components/cluster/command/replay.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
// Copyright 2021 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package command

import (
"fmt"
"path"
"strings"

"github.com/pingcap/errors"
"github.com/pingcap/tiup/pkg/checkpoint"
"github.com/pingcap/tiup/pkg/cliutil"
"github.com/pingcap/tiup/pkg/cluster/audit"
"github.com/pingcap/tiup/pkg/cluster/spec"
"github.com/spf13/cobra"
)

func newReplayCmd() *cobra.Command {
cmd := &cobra.Command{
Use: "replay <audit-id>",
Short: "Replay previous operation and skip successed steps",
RunE: func(cmd *cobra.Command, args []string) error {
if len(args) != 1 {
return cmd.Help()
}

file := path.Join(spec.AuditDir(), args[0])
if !checkpoint.HasCheckPoint() {
if err := checkpoint.SetCheckPoint(file); err != nil {
return errors.Annotate(err, "set checkpoint failed")
}
}

args, err := audit.CommandArgs(file)
if err != nil {
return errors.Annotate(err, "read audit log failed")
}

if !skipConfirm {
if err := cliutil.PromptForConfirmOrAbortError(
fmt.Sprintf("Will replay the command `tiup cluster %s`\nDo you want to continue? [y/N]: ", strings.Join(args[1:], " ")),
); err != nil {
return err
}
}

rootCmd.SetArgs(args[1:])
return rootCmd.Execute()
},
}

return cmd
}
12 changes: 1 addition & 11 deletions components/cluster/command/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,12 @@ import (
"encoding/json"
"fmt"
"os"
"path"
"strings"
"time"

"github.com/fatih/color"
"github.com/google/uuid"
"github.com/joomcode/errorx"
"github.com/pingcap/errors"
"github.com/pingcap/tiup/pkg/checkpoint"
"github.com/pingcap/tiup/pkg/cliutil"
"github.com/pingcap/tiup/pkg/cluster/executor"
"github.com/pingcap/tiup/pkg/cluster/flags"
Expand Down Expand Up @@ -123,12 +120,6 @@ func init() {
fmt.Println("The --native-ssh flag has been deprecated, please use --ssh=system")
}

if gOpt.CheckPoint != "" {
if err := checkpoint.SetCheckPoint(path.Join(spec.AuditDir(), gOpt.CheckPoint)); err != nil {
return errors.Annotate(err, "set checkpoint failed")
}
}

return nil
},
PersistentPostRunE: func(cmd *cobra.Command, args []string) error {
Expand All @@ -145,9 +136,7 @@ func init() {
rootCmd.PersistentFlags().BoolVarP(&skipConfirm, "yes", "y", false, "Skip all confirmations and assumes 'yes'")
rootCmd.PersistentFlags().BoolVar(&gOpt.NativeSSH, "native-ssh", gOpt.NativeSSH, "(EXPERIMENTAL) Use the native SSH client installed on local system instead of the build-in one.")
rootCmd.PersistentFlags().StringVar((*string)(&gOpt.SSHType), "ssh", "", "(EXPERIMENTAL) The executor type: 'builtin', 'system', 'none'.")
rootCmd.PersistentFlags().StringVar(&gOpt.CheckPoint, "checkpoint", "", "(EXPERIMENTAL) The audit log ID this command should recover from.")
_ = rootCmd.PersistentFlags().MarkHidden("native-ssh")
_ = rootCmd.PersistentFlags().MarkHidden("checkpoint")

rootCmd.AddCommand(
newCheckCmd(),
Expand Down Expand Up @@ -176,6 +165,7 @@ func init() {
newPushCmd(),
newTestCmd(), // hidden command for test internally
newTelemetryCmd(),
newReplayCmd(),
newTemplateCmd(),
)
}
Expand Down
64 changes: 64 additions & 0 deletions components/dm/command/replay.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
// Copyright 2021 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package command

import (
"fmt"
"path"
"strings"

"github.com/pingcap/errors"
"github.com/pingcap/tiup/pkg/checkpoint"
"github.com/pingcap/tiup/pkg/cliutil"
"github.com/pingcap/tiup/pkg/cluster/audit"
"github.com/pingcap/tiup/pkg/cluster/spec"
"github.com/spf13/cobra"
)

func newReplayCmd() *cobra.Command {
cmd := &cobra.Command{
Use: "replay <audit-id>",
Short: "Replay previous operation and skip successed steps",
RunE: func(cmd *cobra.Command, args []string) error {
if len(args) != 1 {
return cmd.Help()
}

file := path.Join(spec.AuditDir(), args[0])
if !checkpoint.HasCheckPoint() {
if err := checkpoint.SetCheckPoint(file); err != nil {
return errors.Annotate(err, "set checkpoint failed")
}
}

args, err := audit.CommandArgs(file)
if err != nil {
return errors.Annotate(err, "read audit log failed")
}

if !skipConfirm {
if err := cliutil.PromptForConfirmOrAbortError(
fmt.Sprintf("Will replay the command `tiup dm %s`\nDo you want to continue? [y/N]: ", strings.Join(args[1:], " ")),
); err != nil {
return err
}
}

rootCmd.SetArgs(args[1:])
return rootCmd.Execute()
},
}

return cmd
}
12 changes: 1 addition & 11 deletions components/dm/command/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,11 @@ package command
import (
"fmt"
"os"
"path"
"strings"

"github.com/fatih/color"
"github.com/joomcode/errorx"
"github.com/pingcap/errors"
"github.com/pingcap/tiup/components/dm/spec"
"github.com/pingcap/tiup/pkg/checkpoint"
"github.com/pingcap/tiup/pkg/cliutil"
"github.com/pingcap/tiup/pkg/cluster/executor"
"github.com/pingcap/tiup/pkg/cluster/manager"
Expand Down Expand Up @@ -99,12 +96,6 @@ please backup your data before process.`,
fmt.Println("The --native-ssh flag has been deprecated, please use --ssh=system")
}

if gOpt.CheckPoint != "" {
if err := checkpoint.SetCheckPoint(path.Join(cspec.AuditDir(), gOpt.CheckPoint)); err != nil {
return errors.Annotate(err, "set checkpoint failed")
}
}

return nil
},
PersistentPostRunE: func(cmd *cobra.Command, args []string) error {
Expand All @@ -119,9 +110,7 @@ please backup your data before process.`,
rootCmd.PersistentFlags().BoolVarP(&skipConfirm, "yes", "y", false, "Skip all confirmations and assumes 'yes'")
rootCmd.PersistentFlags().BoolVar(&gOpt.NativeSSH, "native-ssh", gOpt.NativeSSH, "Use the SSH client installed on local system instead of the build-in one.")
rootCmd.PersistentFlags().StringVar((*string)(&gOpt.SSHType), "ssh", "", "The executor type: 'builtin', 'system', 'none'")
rootCmd.PersistentFlags().StringVar(&gOpt.CheckPoint, "checkpoint", "", "(EXPERIMENTAL) The audit log ID this command should recover from.")
_ = rootCmd.PersistentFlags().MarkHidden("native-ssh")
_ = rootCmd.PersistentFlags().MarkHidden("checkpoint")

rootCmd.AddCommand(
newDeployCmd(),
Expand All @@ -143,6 +132,7 @@ please backup your data before process.`,
newImportCmd(),
newEnableCmd(),
newDisableCmd(),
newReplayCmd(),
newTemplateCmd(),
)
}
Expand Down
14 changes: 12 additions & 2 deletions pkg/checkpoint/checkpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ var (
DebugCheckpoint = os.Getenv("DEBUG_CHECKPOINT") == "1"
)

// SetCheckPoint set global checkpoint for executor
// SetCheckPoint set global checkpoint for replay
func SetCheckPoint(file string) error {
pointReader, err := os.Open(file)
if err != nil {
Expand All @@ -63,6 +63,11 @@ func SetCheckPoint(file string) error {
return nil
}

// HasCheckPoint returns if SetCheckPoint has been called
func HasCheckPoint() bool {
return checkpoint != nil
}

// Acquire wraps CheckPoint.Acquire
func Acquire(ctx context.Context, point map[string]interface{}) *Point {
if ctx.Value(goroutineKey) == nil || ctx.Value(semKey) == nil {
Expand Down Expand Up @@ -202,11 +207,16 @@ next_set:
if cf.eq == nil {
continue
}
if !cf.eq(ma[cf.field], mb[cf.field]) {
if !contains(ma, cf.field) || !contains(mb, cf.field) || !cf.eq(ma[cf.field], mb[cf.field]) {
continue next_set
}
}
return true
}
return false
}

func contains(m map[string]interface{}, f string) bool {
_, ok := m[f]
return ok
}
72 changes: 58 additions & 14 deletions pkg/cluster/audit/audit.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package audit
import (
"bufio"
"fmt"
"net/url"
"os"
"path/filepath"
"sort"
Expand All @@ -30,22 +31,51 @@ import (
"github.com/pingcap/tiup/pkg/utils/rand"
)

// ShowAuditList show the audit list.
func ShowAuditList(dir string) error {
firstLine := func(fileName string) (string, error) {
file, err := os.Open(filepath.Join(dir, fileName))
if err != nil {
return "", errors.Trace(err)
}
defer file.Close()
// CommandArgs returns the original commands from the first line of a file
func CommandArgs(fp string) ([]string, error) {
file, err := os.Open(fp)
if err != nil {
return nil, errors.Trace(err)
}
defer file.Close()

scanner := bufio.NewScanner(file)
if !scanner.Scan() {
return nil, errors.New("unknown audit log format")
}

args := strings.Split(scanner.Text(), " ")
return decodeCommandArgs(args)
}

// encodeCommandArgs encode args with url.QueryEscape
func encodeCommandArgs(args []string) []string {
encoded := []string{}

scanner := bufio.NewScanner(file)
if scanner.Scan() {
return scanner.Text(), nil
for _, arg := range args {
encoded = append(encoded, url.QueryEscape(arg))
}

return encoded
}

// decodeCommandArgs decode args with url.QueryUnescape
func decodeCommandArgs(args []string) ([]string, error) {
decoded := []string{}

for _, arg := range args {
a, err := url.QueryUnescape(arg)
if err != nil {
return nil, errors.Annotate(err, "failed on decode the command line of audit log")
}
return "", errors.New("unknown audit log format")
decoded = append(decoded, a)
}

return decoded, nil
}

// ShowAuditList show the audit list.
func ShowAuditList(dir string) error {
// Header
clusterTable := [][]string{{"ID", "Time", "Command"}}
fileInfos, err := os.ReadDir(dir)
Expand All @@ -60,10 +90,11 @@ func ShowAuditList(dir string) error {
if err != nil {
continue
}
cmd, err := firstLine(fi.Name())
args, err := CommandArgs(filepath.Join(dir, fi.Name()))
if err != nil {
continue
}
cmd := strings.Join(args, " ")
clusterTable = append(clusterTable, []string{
fi.Name(),
t.Format(time.RFC3339),
Expand All @@ -82,7 +113,20 @@ func ShowAuditList(dir string) error {
// OutputAuditLog outputs audit log.
func OutputAuditLog(dir string, data []byte) error {
fname := filepath.Join(dir, base52.Encode(time.Now().UnixNano()+rand.Int63n(1000)))
return os.WriteFile(fname, data, 0644)
f, err := os.Create(fname)
if err != nil {
return errors.Annotate(err, "create audit log")
}
defer f.Close()

args := encodeCommandArgs(os.Args)
if _, err := f.Write([]byte(strings.Join(args, " ") + "\n")); err != nil {
return errors.Annotate(err, "write audit log")
}
if _, err := f.Write(data); err != nil {
return errors.Annotate(err, "write audit log")
}
return nil
}

// ShowAuditLog show the audit with the specified auditID
Expand Down
1 change: 0 additions & 1 deletion pkg/cluster/operation/operation.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ type Options struct {
SSHTimeout uint64 // timeout in seconds when connecting an SSH server
OptTimeout uint64 // timeout in seconds for operations that support it, not to confuse with SSH timeout
APITimeout uint64 // timeout in seconds for API operations that support it, like transferring store leader
CheckPoint string // the audit log ID where we should recover from, this is useful when an action failed and we want to continue that action
IgnoreConfigCheck bool // should we ignore the config check result after init config
NativeSSH bool // should use native ssh client or builtin easy ssh (deprecated, shoule use SSHType)
SSHType executor.SSHType // the ssh type: 'builtin', 'system', 'none'
Expand Down
4 changes: 1 addition & 3 deletions pkg/logger/audit.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,6 @@ package logger

import (
"bytes"
"os"
"strings"

"github.com/pingcap/tiup/pkg/cluster/audit"
utils2 "github.com/pingcap/tiup/pkg/utils"
Expand All @@ -41,7 +39,7 @@ func DisableAuditLog() {
}

func newAuditLogCore() zapcore.Core {
auditBuffer = bytes.NewBufferString(strings.Join(os.Args, " ") + "\n")
auditBuffer = bytes.NewBuffer([]byte{})
encoder := zapcore.NewConsoleEncoder(zap.NewDevelopmentEncoderConfig())
return zapcore.NewCore(encoder, zapcore.Lock(zapcore.AddSync(auditBuffer)), zapcore.DebugLevel)
}
Expand Down
Loading

0 comments on commit a73c32e

Please sign in to comment.