Skip to content

Commit

Permalink
Merge branch 'master' into log-backup-support-restart-operator
Browse files Browse the repository at this point in the history
  • Loading branch information
ti-chi-bot authored Oct 21, 2022
2 parents 8bdf05a + ec8974c commit e204c89
Show file tree
Hide file tree
Showing 53 changed files with 9,409 additions and 5,309 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ rules:
verbs: ["get", "list", "watch"]
- apiGroups: [""]
resources: ["persistentvolumes"]
verbs: ["get", "list", "watch", "patch","update"]
verbs: ["get", "list", "watch", "patch", "update", "create"]
- apiGroups: ["storage.k8s.io"]
resources: ["storageclasses"]
verbs: ["get", "list", "watch"]
Expand Down
126 changes: 123 additions & 3 deletions cmd/backup-manager/app/backup/backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,23 @@ import (
"context"
"fmt"
"io/ioutil"
"os"
"os/exec"
"path"
"path/filepath"
"regexp"
"strconv"
"strings"
"time"

"github.com/dustin/go-humanize"
backupUtil "github.com/pingcap/tidb-operator/cmd/backup-manager/app/util"
"github.com/pingcap/tidb-operator/pkg/apis/pingcap/v1alpha1"
backupConst "github.com/pingcap/tidb-operator/pkg/backup/constants"
"github.com/pingcap/tidb-operator/pkg/controller"
"github.com/pingcap/tidb-operator/pkg/util"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/klog/v2"
)

Expand All @@ -36,7 +44,11 @@ type Options struct {
}

// backupData generates br args and runs br binary to do the real backup work
func (bo *Options) backupData(ctx context.Context, backup *v1alpha1.Backup) error {
func (bo *Options) backupData(
ctx context.Context,
backup *v1alpha1.Backup,
statusUpdater controller.BackupConditionUpdaterInterface,
) error {
var backupType string
if backup.Spec.Type == "" {
backupType = string(v1alpha1.BackupTypeFull)
Expand All @@ -47,11 +59,68 @@ func (bo *Options) backupData(ctx context.Context, backup *v1alpha1.Backup) erro
"backup",
backupType,
}

var logCallback func(line string)
// Add extra args for volume snapshot backup.
if backup.Spec.Mode == v1alpha1.BackupModeVolumeSnapshot {
var (
progressFile = "progress.txt"
progressStep = "Full Backup"
successTag = "EBS backup success"
)

cloudSnapMeta := os.Getenv(backupConst.EnvCloudSnapMeta)
if cloudSnapMeta == "" {
return fmt.Errorf("cloud snapshot metadata not found, env %s is empty", backupConst.EnvCloudSnapMeta)
}
klog.Infof("Running cloud-snapshot-backup with metadata: %s", cloudSnapMeta)
csbPath := path.Join(util.BRBinPath, "csb_backup.json")
err := os.WriteFile(csbPath, []byte(cloudSnapMeta), 0644)
if err != nil {
return err
}
// Currently, we only support aws ebs volume snapshot.
specificArgs = append(specificArgs, "--type=aws-ebs")
specificArgs = append(specificArgs, fmt.Sprintf("--volume-file=%s", csbPath))
logCallback = func(line string) {
if strings.Contains(line, successTag) {
extract := strings.Split(line, successTag)[1]
sizeStr := regexp.MustCompile(`size=(\d+)`).FindString(extract)
size := strings.ReplaceAll(sizeStr, "size=", "")
tsStr := regexp.MustCompile(`resolved_ts=(\d+)`).FindString(extract)
ts := strings.ReplaceAll(tsStr, "resolved_ts=", "")
klog.Infof("%s size: %s, resolved_ts: %s", successTag, size, ts)

backupSize, err := strconv.ParseInt(size, 10, 64)
if err != nil {
klog.Warningf("Failed to parse BackupSize %s, %v", size, err)
}
backupSize = backupSize << 30 // Convert GiB to bytes.
backupSizeReadable := humanize.Bytes(uint64(backupSize))
progress := 100.0
if err := statusUpdater.Update(backup, nil, &controller.BackupUpdateStatus{
CommitTs: &ts,
BackupSize: &backupSize,
BackupSizeReadable: &backupSizeReadable,
ProgressStep: &progressStep,
Progress: &progress,
ProgressUpdateTime: &metav1.Time{Time: time.Now()},
}); err != nil {
klog.Errorf("Failed to update BackupUpdateStatus for cluster %s, %v", bo, err)
}
}
}

progressCtx, cancel := context.WithCancel(ctx)
defer cancel()
go bo.updateProgressFromFile(progressCtx.Done(), backup, progressFile, progressStep, statusUpdater)
}

fullArgs, err := bo.backupCommandTemplate(backup, specificArgs)
if err != nil {
return err
}
return bo.brCommandRun(ctx, fullArgs)
return bo.brCommandRunWithLogCallback(ctx, fullArgs, logCallback)
}

// constructOptions constructs options for BR
Expand Down Expand Up @@ -157,8 +226,13 @@ func (bo *Options) backupCommandTemplate(backup *v1alpha1.Backup, specificArgs [
return fullArgs, nil
}

// brCommandRun run br binary to do backup work
// brCommandRun run br binary to do backup work.
func (bo *Options) brCommandRun(ctx context.Context, fullArgs []string) error {
return bo.brCommandRunWithLogCallback(ctx, fullArgs, nil)
}

// brCommandRun run br binary to do backup work with log callback.
func (bo *Options) brCommandRunWithLogCallback(ctx context.Context, fullArgs []string, logCallback func(line string)) error {
if len(fullArgs) == 0 {
return fmt.Errorf("command is invalid, fullArgs: %v", fullArgs)
}
Expand All @@ -185,6 +259,9 @@ func (bo *Options) brCommandRun(ctx context.Context, fullArgs []string) error {
if strings.Contains(line, "[ERROR]") {
errMsg += line
}
if logCallback != nil {
logCallback(line)
}

klog.Info(strings.Replace(line, "\n", "", -1))
if err != nil {
Expand All @@ -204,3 +281,46 @@ func (bo *Options) brCommandRun(ctx context.Context, fullArgs []string) error {
klog.Infof("Run br commond %v for cluster %s successfully", fullArgs, bo)
return nil
}

func (bo *Options) updateProgressFromFile(
stopCh <-chan struct{},
backup *v1alpha1.Backup,
progressFile string,
progressStep string,
statusUpdater controller.BackupConditionUpdaterInterface,
) {
ticker := time.NewTicker(time.Second)
defer ticker.Stop()

for {
select {
case <-ticker.C:
data, err := os.ReadFile(progressFile)
if err != nil {
if !os.IsNotExist(err) {
klog.Warningf("Failed to read progress file %s: %v", progressFile, err)
}
continue
}
progressStr := strings.TrimSpace(string(data))
progressStr = strings.TrimSuffix(progressStr, "%")
if progressStr == "" {
continue
}
progress, err := strconv.ParseFloat(progressStr, 64)
if err != nil {
klog.Warningf("Failed to parse progress %s, err: %v", string(data), err)
continue
}
if err := statusUpdater.Update(backup, nil, &controller.BackupUpdateStatus{
ProgressStep: &progressStep,
Progress: &progress,
ProgressUpdateTime: &metav1.Time{Time: time.Now()},
}); err != nil {
klog.Errorf("Failed to update BackupUpdateStatus for cluster %s, %v", bo, err)
}
case <-stopCh:
return
}
}
}
68 changes: 38 additions & 30 deletions cmd/backup-manager/app/backup/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -278,7 +278,7 @@ func (bm *Manager) performBackup(ctx context.Context, backup *v1alpha1.Backup, d
}

// run br binary to do the real job
backupErr := bm.backupData(ctx, backup)
backupErr := bm.backupData(ctx, backup, bm.StatusUpdater)

if db != nil && oldTikvGCTimeDuration < tikvGCTimeDuration {
// use another context to revert `tikv_gc_life_time` back.
Expand Down Expand Up @@ -318,35 +318,43 @@ func (bm *Manager) performBackup(ctx context.Context, backup *v1alpha1.Backup, d
}
klog.Infof("backup cluster %s data to %s success", bm, backupFullPath)

backupMeta, err := util.GetBRMetaData(ctx, backup.Spec.StorageProvider)
if err != nil {
errs = append(errs, err)
klog.Errorf("Get backup metadata for backup files in %s of cluster %s failed, err: %s", backupFullPath, bm, err)
uerr := bm.StatusUpdater.Update(backup, &v1alpha1.BackupCondition{
Type: v1alpha1.BackupFailed,
Status: corev1.ConditionTrue,
Reason: "GetBackupMetadataFailed",
Message: err.Error(),
}, nil)
errs = append(errs, uerr)
return errorutils.NewAggregate(errs)
}
klog.Infof("Get br metadata for backup files in %s of cluster %s success", backupFullPath, bm)
size := util.GetBRArchiveSize(backupMeta)
commitTs := backupMeta.EndVersion
klog.Infof("Get size %d for backup files in %s of cluster %s success", size, backupFullPath, bm)
klog.Infof("Get cluster %s commitTs %d success", bm, commitTs)
finish := time.Now()

backupSize := int64(size)
backupSizeReadable := humanize.Bytes(uint64(size))
ts := strconv.FormatUint(commitTs, 10)
updateStatus := &controller.BackupUpdateStatus{
TimeStarted: &metav1.Time{Time: started},
TimeCompleted: &metav1.Time{Time: finish},
BackupSize: &backupSize,
BackupSizeReadable: &backupSizeReadable,
CommitTs: &ts,
var updateStatus *controller.BackupUpdateStatus
switch bm.Mode {
case string(v1alpha1.BackupModeVolumeSnapshot):
// In volume snapshot mode, commitTS and size have been updated according to the
// br command output, so we don't need to update them here.
updateStatus = &controller.BackupUpdateStatus{
TimeStarted: &metav1.Time{Time: started},
TimeCompleted: &metav1.Time{Time: time.Now()},
}
default:
backupMeta, err := util.GetBRMetaData(ctx, backup.Spec.StorageProvider)
if err != nil {
errs = append(errs, err)
klog.Errorf("Get backup metadata for backup files in %s of cluster %s failed, err: %s", backupFullPath, bm, err)
uerr := bm.StatusUpdater.Update(backup, &v1alpha1.BackupCondition{
Type: v1alpha1.BackupFailed,
Status: corev1.ConditionTrue,
Reason: "GetBackupMetadataFailed",
Message: err.Error(),
}, nil)
errs = append(errs, uerr)
return errorutils.NewAggregate(errs)
}
klog.Infof("Get br metadata for backup files in %s of cluster %s success", backupFullPath, bm)
backupSize := int64(util.GetBRArchiveSize(backupMeta))
backupSizeReadable := humanize.Bytes(uint64(backupSize))
commitTS := backupMeta.EndVersion
klog.Infof("Get size %d for backup files in %s of cluster %s success", backupSize, backupFullPath, bm)
klog.Infof("Get cluster %s commitTs %d success", bm, commitTS)
ts := strconv.FormatUint(commitTS, 10)
updateStatus = &controller.BackupUpdateStatus{
TimeStarted: &metav1.Time{Time: started},
TimeCompleted: &metav1.Time{Time: time.Now()},
BackupSize: &backupSize,
BackupSizeReadable: &backupSizeReadable,
CommitTs: &ts,
}
}
return bm.StatusUpdater.Update(backup, &v1alpha1.BackupCondition{
Type: v1alpha1.BackupComplete,
Expand Down
Loading

0 comments on commit e204c89

Please sign in to comment.