Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

New crd for log compact #5822

Open
wants to merge 51 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 32 commits
Commits
Show all changes
51 commits
Select commit Hold shift + click to select a range
9240101
new types
RidRisR Oct 28, 2024
df3c191
add new crd
RidRisR Oct 28, 2024
92c9819
trry
RidRisR Oct 28, 2024
a420fdb
adapt controller
RidRisR Oct 28, 2024
69b6c7b
register kind
RidRisR Oct 28, 2024
4923c61
adjust param
RidRisR Oct 29, 2024
24b88b8
upload state
RidRisR Oct 29, 2024
50b0bb2
fix
RidRisR Oct 29, 2024
28a38e4
remove cluster dependency
RidRisR Oct 30, 2024
f546333
refactor getStoragePath
RidRisR Oct 30, 2024
de989f8
add env vars
RidRisR Oct 31, 2024
a744d6b
use backup-manager
RidRisR Oct 31, 2024
e042927
opt(ticdc): support to overwrite the default cluster TLS cert secret …
ideascf Oct 29, 2024
ced6412
Revert "opt(ticdc): support to overwrite the default cluster TLS cert…
ideascf Oct 30, 2024
b6faebd
add compact command to backup manager
YuJuncen Oct 29, 2024
8f81357
adapt backup-manager params
RidRisR Oct 31, 2024
bf9b185
attach to tc
RidRisR Dec 4, 2024
f9aafcb
lint
RidRisR Dec 4, 2024
ed78dd9
Merge branch 'master' into newcrd
RidRisR Dec 4, 2024
523281b
copyright
RidRisR Dec 4, 2024
5fd163c
Merge branch 'master' into newcrd
RidRisR Dec 4, 2024
00e395d
refactor
RidRisR Dec 6, 2024
b0f3c65
fix
RidRisR Dec 6, 2024
2569354
fix types name
RidRisR Dec 9, 2024
53a443c
delete unused
RidRisR Dec 11, 2024
ce4005a
more detailed status
RidRisR Dec 12, 2024
d3537b4
fix
RidRisR Dec 12, 2024
cde1ef6
lint
RidRisR Dec 12, 2024
2ebd69e
add test
RidRisR Dec 12, 2024
67142e8
fix test
RidRisR Dec 12, 2024
6fc663f
report status when fail
RidRisR Dec 12, 2024
da1f000
report progress in e2e test
RidRisR Dec 12, 2024
ab3419a
Update manifests/backup/backup-rbac.yaml
RidRisR Dec 17, 2024
975fec2
mod types
RidRisR Dec 17, 2024
9d7fc30
make generate
RidRisR Dec 17, 2024
6594acf
handle error
RidRisR Dec 18, 2024
74d0015
add necessary interface for retry
RidRisR Dec 23, 2024
09873ae
add control interface
RidRisR Dec 23, 2024
c9e78c8
fix updater usage
RidRisR Dec 23, 2024
13d51a1
status_updater formally done
RidRisR Dec 23, 2024
8fb8ae8
simplify codes
RidRisR Dec 23, 2024
ed3e4ba
caller side
RidRisR Dec 23, 2024
b6e8831
add retry impl
RidRisR Dec 25, 2024
d4d8a86
add helper functions
RidRisR Dec 25, 2024
26bf6ed
revert backoff logic
RidRisR Dec 25, 2024
4826093
add skip for already running job
RidRisR Dec 26, 2024
129ff36
make generate
RidRisR Dec 26, 2024
101c5cc
initialize status updater
RidRisR Dec 26, 2024
6411b8a
apply backoff limit
RidRisR Dec 26, 2024
f0d740a
add check for eerr
RidRisR Dec 26, 2024
9e1f0f8
make generate
RidRisR Dec 26, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions cmd/backup-manager/app/backup/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,7 @@ func (bm *Manager) performBackup(ctx context.Context, backup *v1alpha1.Backup, d

var errs []error

backupFullPath, err := util.GetStoragePath(backup)
backupFullPath, err := util.GetStoragePath(&backup.Spec.StorageProvider)
if err != nil {
errs = append(errs, err)
uerr := bm.StatusUpdater.Update(backup, &v1alpha1.BackupCondition{
Expand Down Expand Up @@ -506,7 +506,7 @@ func (bm *Manager) performLogBackup(ctx context.Context, backup *v1alpha1.Backup
// startLogBackup starts log backup.
func (bm *Manager) startLogBackup(ctx context.Context, backup *v1alpha1.Backup) (*controller.BackupUpdateStatus, string, error) {
started := time.Now()
backupFullPath, err := util.GetStoragePath(backup)
backupFullPath, err := util.GetStoragePath(&backup.Spec.StorageProvider)
if err != nil {
klog.Errorf("Get backup full path of cluster %s failed, err: %s", bm, err)
return nil, "GetBackupRemotePathFailed", err
Expand Down
5 changes: 2 additions & 3 deletions cmd/backup-manager/app/clean/clean_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,10 @@ import (
"github.com/agiledragon/gomonkey/v2"
"github.com/aws/aws-sdk-go/service/s3/s3iface"
. "github.com/onsi/gomega"
"gocloud.dev/blob"
"gocloud.dev/blob/driver"

"github.com/pingcap/tidb-operator/pkg/apis/pingcap/v1alpha1"
"github.com/pingcap/tidb-operator/pkg/backup/util"
"gocloud.dev/blob"
"gocloud.dev/blob/driver"
)

func TestCleanBRRemoteBackupDataOnce(t *testing.T) {
Expand Down
1 change: 1 addition & 0 deletions cmd/backup-manager/app/cmd/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ func NewBackupMgrCommand() *cobra.Command {
cmds.AddCommand(NewRestoreCommand())
cmds.AddCommand(NewImportCommand())
cmds.AddCommand(NewCleanCommand())
cmds.AddCommand(NewCompactCommand())
return cmds
}

Expand Down
69 changes: 69 additions & 0 deletions cmd/backup-manager/app/cmd/compact.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
// Copyright 2024 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package cmd

import (
"context"

"github.com/pingcap/tidb-operator/cmd/backup-manager/app/compact"
"github.com/pingcap/tidb-operator/cmd/backup-manager/app/compact/options"
"github.com/pingcap/tidb-operator/cmd/backup-manager/app/constants"
"github.com/pingcap/tidb-operator/cmd/backup-manager/app/util"
informers "github.com/pingcap/tidb-operator/pkg/client/informers/externalversions"
"github.com/spf13/cobra"
"k8s.io/client-go/tools/cache"
cmdutil "k8s.io/kubectl/pkg/cmd/util"
)

func NewCompactCommand() *cobra.Command {
opts := options.CompactOpts{}

cmd := &cobra.Command{
Use: "compact",
Short: "Compact log backup.",
Run: func(cmd *cobra.Command, args []string) {
util.ValidCmdFlags(cmd.CommandPath(), cmd.LocalFlags())
cmdutil.CheckErr(runCompact(opts, kubecfg))
},
}

cmd.Flags().StringVar(&opts.Namespace, "namespace", "", "Backup CR's namespace")
cmd.Flags().StringVar(&opts.ResourceName, "resourceName", "", "Backup CRD object name")
return cmd
}

func runCompact(compactOpts options.CompactOpts, kubecfg string) error {
kubeCli, cli, err := util.NewKubeAndCRCli(kubecfg)
if err != nil {
return err
}
options := []informers.SharedInformerOption{
informers.WithNamespace(compactOpts.Namespace),
}
informerFactory := informers.NewSharedInformerFactoryWithOptions(cli, constants.ResyncDuration, options...)
recorder := util.NewEventRecorder(kubeCli, "compact-manager")
compactInformer := informerFactory.Pingcap().V1alpha1().CompactBackups()
statusUpdater := compact.NewCompactStatusUpdater(recorder, compactInformer.Lister(), cli)

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
go informerFactory.Start(ctx.Done())

// waiting for the shared informer's store has synced.
cache.WaitForCacheSync(ctx.Done(), compactInformer.Informer().HasSynced)

// klog.Infof("start to process backup %s", compactOpts.String())
cm := compact.NewManager(compactInformer.Lister(), statusUpdater, compactOpts)
return cm.ProcessCompact()
}
229 changes: 229 additions & 0 deletions cmd/backup-manager/app/compact/manager.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,229 @@
// Copyright 2019 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package compact

import (
"bytes"
"context"
"encoding/json"
"io"
"os"
"os/exec"
"path/filepath"
"strconv"

"github.com/pingcap/errors"
"github.com/pingcap/tidb-operator/cmd/backup-manager/app/compact/options"
backuputil "github.com/pingcap/tidb-operator/cmd/backup-manager/app/util"
"github.com/pingcap/tidb-operator/pkg/apis/pingcap/v1alpha1"
pkgutil "github.com/pingcap/tidb-operator/pkg/backup/util"
listers "github.com/pingcap/tidb-operator/pkg/client/listers/pingcap/v1alpha1"
"github.com/pingcap/tidb-operator/pkg/util"
"k8s.io/klog/v2"
)

type Progress struct {
MetaCompleted uint64 `json:"meta_completed"`
MetaTotal uint64 `json:"meta_total"`
BytesToCompact uint64 `json:"bytes_to_compact"`
BytesCompacted uint64 `json:"bytes_compacted"`
}

// logLine is line of JSON log.
// It just extracted the message from the JSON and keeps the origin json bytes.
// So you may extract fields from it by `json.Unmarshal(l.Raw, ...)`.
type logLine struct {
Message string `json:"Message"`
Raw json.RawMessage `json:"-"`
}

// Manager mainly used to manage backup related work
type Manager struct {
compact *v1alpha1.CompactBackup
resourceLister listers.CompactBackupLister
statusUpdater *CompactStatusUpdater
options options.CompactOpts
}

// NewManager return a Manager
func NewManager(
lister listers.CompactBackupLister,
statusUpdater *CompactStatusUpdater,
compactOpts options.CompactOpts) *Manager {
compact, err := lister.CompactBackups(compactOpts.Namespace).Get(compactOpts.ResourceName)
if err != nil {
klog.Errorf("can't find compact %s:%s CRD object, err: %v", compactOpts.Namespace, compactOpts.ResourceName, err)
return nil
}
return &Manager{
compact,
lister,
statusUpdater,
compactOpts,
}
}

func (cm *Manager) brBin() string {
return filepath.Join(util.BRBinPath, "br")
}

func (cm *Manager) kvCtlBin() string {
return filepath.Join(util.KVCTLBinPath, "tikv-ctl")
}

// ProcessBackup used to process the backup logic
func (cm *Manager) ProcessCompact() error {
ctx, cancel := backuputil.GetContextForTerminationSignals(cm.options.ResourceName)
defer cancel()

compact, err := cm.resourceLister.CompactBackups(cm.options.Namespace).Get(cm.options.ResourceName)
defer func() { cm.statusUpdater.OnFinish(ctx, err, cm.compact) }()
if err != nil {
return errors.New("backup not found")
}
if err := options.ParseCompactOptions(compact, &cm.options); err != nil {
return errors.Annotate(err, "failed to parse compact options")
}

b64, err := cm.base64ifyStorage(ctx)
if err != nil {
return errors.Annotate(err, "failed to base64ify storage")
}
return cm.runCompaction(ctx, b64)
}

func (cm *Manager) base64ifyStorage(ctx context.Context) (string, error) {
brCmd, err := cm.base64ifyCmd(ctx)
if err != nil {
return "", err
}
out, err := brCmd.Output()
if err != nil {
eerr := err.(*exec.ExitError)
RidRisR marked this conversation as resolved.
Show resolved Hide resolved
klog.Warningf("Failed to execute base64ify; stderr = %s", string(eerr.Stderr))
return "", errors.Annotatef(err, "failed to execute BR with args %v", brCmd.Args)
}
out = bytes.Trim(out, "\r\n \t")
return string(out), nil
}

func (cm *Manager) base64ifyCmd(ctx context.Context) (*exec.Cmd, error) {
br := cm.brBin()
args := []string{
"operator",
"base64ify",
}
StorageOpts, err := pkgutil.GenStorageArgsForFlag(cm.compact.Spec.StorageProvider, "storage")
if err != nil {
return nil, err
}
args = append(args, StorageOpts...)
return exec.CommandContext(ctx, br, args...), nil
}

func (cm *Manager) runCompaction(ctx context.Context, base64Storage string) (err error) {
cmd := cm.compactCmd(ctx, base64Storage)

logs, err := cmd.StderrPipe()
RidRisR marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
return errors.Annotate(err, "failed to create stderr pipe for compact")
}
if err := cmd.Start(); err != nil {
return errors.Annotate(err, "failed to start compact")
}

cm.statusUpdater.OnStart(ctx, cm.compact)
err = cm.processCompactionLogs(ctx, io.TeeReader(logs, os.Stdout))
if err != nil {
return err
}

return cmd.Wait()
}

func (cm *Manager) compactCmd(ctx context.Context, base64Storage string) *exec.Cmd {
ctl := cm.kvCtlBin()
// You should not change the log configuration here, it should sync with the upstream setting
args := []string{
"--log-level",
"INFO",
"--log-format",
"json",
"compact-log-backup",
"--storage-base64",
base64Storage,
"--from",
strconv.FormatUint(cm.options.FromTS, 10),
"--until",
strconv.FormatUint(cm.options.UntilTS, 10),
"-N",
strconv.FormatUint(cm.options.Concurrency, 10),
}
return exec.CommandContext(ctx, ctl, args...)
}

func (cm *Manager) processCompactionLogs(ctx context.Context, logStream io.Reader) error {
dec := json.NewDecoder(logStream)

for dec.More() {
if ctx.Err() != nil {
return ctx.Err()
}

var raw json.RawMessage
if err := dec.Decode(&raw); err != nil {
return errors.Annotate(err, "failed to decode raw log line")
}

var line logLine
if err := json.Unmarshal(raw, &line); err != nil {
return errors.Annotate(err, "failed to decode the line of log")
}
line.Raw = raw

if err := cm.processLogLine(ctx, line); err != nil {
return err
}
}

return nil
}

func (cm *Manager) processLogLine(ctx context.Context, l logLine) error {
const (
RidRisR marked this conversation as resolved.
Show resolved Hide resolved
messageCompactionDone = "Finishing compaction."
messageCompactAborted = "Compaction aborted."
)

switch l.Message {
case messageCompactionDone:
var prog Progress
if err := json.Unmarshal(l.Raw, &prog); err != nil {
return errors.Annotatef(err, "failed to decode progress message: %s", string(l.Raw))
}
cm.statusUpdater.OnProgress(ctx, prog, cm.compact)
return nil
case messageCompactAborted:
errContainer := struct {
Err string `json:"err"`
}{}
if err := json.Unmarshal(l.Raw, &errContainer); err != nil {
return errors.Annotatef(err, "failed to decode error message: %s", string(l.Raw))
}
return errors.New(errContainer.Err)
default:
klog.Infof("progress log: %s", l.Message)
RidRisR marked this conversation as resolved.
Show resolved Hide resolved
return nil
}
}
Loading
Loading