Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support for additional compressors/decompressors #7978

Closed
wants to merge 9 commits into from
76 changes: 73 additions & 3 deletions go/vt/mysqlctl/xtrabackupengine.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"bufio"
"context"
"encoding/json"
"errors"
"flag"
"fmt"
"io"
Expand Down Expand Up @@ -63,6 +64,9 @@ var (
// striping mode
xtrabackupStripes = flag.Uint("xtrabackup_stripes", 0, "If greater than 0, use data striping across this many destination files to parallelize data transfer and decompression")
xtrabackupStripeBlockSize = flag.Uint("xtrabackup_stripe_block_size", 102400, "Size in bytes of each block that gets sent to a given stripe before rotating to the next stripe")
// use and external command to decompress the backups
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

typo: and -> an

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

While adding support to builtinbackupengine, all these flags should be renamed so that they are not xtrabackup specific.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hi @deepthi thanks for the feedback. I will add support for it this week in the builtin engine and for the tests 👍

decompressMethod = flag.String("decompress_method", "builtin", "what decompressor to use [builtin|external]")
externalDecompressor = flag.String("external_decompressor", "", "command with arguments to which decompressor to run")
)

const (
Expand Down Expand Up @@ -519,17 +523,74 @@ func (be *XtrabackupEngine) extractFiles(ctx context.Context, logger logutil.Log
}
}()

var (
decompressorCmd *exec.Cmd
decompressorWg sync.WaitGroup
)
srcReaders := []io.Reader{}
srcDecompressors := []io.ReadCloser{}

for _, file := range srcFiles {
reader := io.Reader(file)

// Create the decompressor if needed.
if compressed {
decompressor, err := pgzip.NewReader(reader)
if err != nil {
return vterrors.Wrap(err, "can't create gzip decompressor")
var decompressor io.ReadCloser

switch *decompressMethod {
case "builtin":
logger.Infof("Using built-in decompressor")

decompressor, err = pgzip.NewReader(reader)
if err != nil {
return vterrors.Wrap(err, "can't create gzip decompressor")
}
case "external":
decompressorFlags := strings.Split(*externalDecompressor, " ")
if len(decompressorFlags) < 1 {
return vterrors.Wrap(err, "external_decompressor is empty")
}

decompressorCmdPath, err := validateExternalDecompressor(decompressorFlags[0])
if err != nil {
return vterrors.Wrap(err, "could not validate external decompressor")
}

decompressorCmd = exec.CommandContext(ctx, decompressorCmdPath, decompressorFlags[1:]...)
decompressorCmd.Stdin = reader

logger.Infof("Decompressing using %v", decompressorFlags)

decompressorOut, err := decompressorCmd.StdoutPipe()
if err != nil {
return vterrors.Wrap(err, "cannot create external decompressor stdout pipe")
}

decompressorErr, err := decompressorCmd.StderrPipe()
if err != nil {
return vterrors.Wrap(err, "cannot create external decompressor stderr pipe")
}

if err := decompressorCmd.Start(); err != nil {
return vterrors.Wrap(err, "can't start external decompressor")
}

decompressorWg.Add(1)
go scanLinesToLogger("decompressor stderr", decompressorErr, logger, decompressorWg.Done)

decompressor = decompressorOut

defer func() {
decompressorWg.Wait()
// log the exit status
if err := decompressorCmd.Wait(); err != nil {
vterrors.Wrap(err, "external decompressor failed")
}
}()
default:
return vterrors.Wrap(err, "unknown decompressor method")
}

srcDecompressors = append(srcDecompressors, decompressor)
reader = decompressor
}
Expand Down Expand Up @@ -822,6 +883,15 @@ func (be *XtrabackupEngine) ShouldDrainForBackup() bool {
return false
}

// Validates if the external decompressor exists and return its path.
func validateExternalDecompressor(cmd string) (string, error) {
if cmd == "" {
return "", errors.New("external compressor command is empty")
}

return exec.LookPath(cmd)
}

func init() {
BackupRestoreEngineMap[xtrabackupEngineName] = &XtrabackupEngine{}
}
39 changes: 39 additions & 0 deletions go/vt/mysqlctl/xtrabackupengine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,10 @@ package mysqlctl

import (
"bytes"
"fmt"
"io"
"math/rand"
"strings"
"testing"

"vitess.io/vitess/go/vt/logutil"
Expand Down Expand Up @@ -115,3 +117,40 @@ func TestStripeRoundTrip(t *testing.T) {
// Test block size and stripe count that don't evenly divide data size.
test(6000, 7)
}

func TestValidateExternalDecompressor(t *testing.T) {
tests := []struct {
cmdName string
path string
errStr string
}{
// this should not find an executable
{"non_existent_cmd", "", "executable file not found"},
// we expect ls to be on PATH as it is a basic command part of busybox and most containers
{"ls", "ls", ""},
}

for i, tt := range tests {
t.Run(fmt.Sprintf("Test #%d", i+1), func(t *testing.T) {
CmdName := tt.cmdName

path, err := validateExternalDecompressor(CmdName)

if tt.path != "" {
if !strings.HasSuffix(path, tt.path) {
t.Errorf("Expected path \"%s\" to include \"%s\"", path, tt.path)
}
}

if tt.errStr == "" {
if err != nil {
t.Errorf("Expected result \"%v\", got \"%v\"", "<nil>", err)
}
} else {
if !strings.Contains(fmt.Sprintf("%v", err), tt.errStr) {
t.Errorf("Expected result \"%v\", got \"%v\"", tt.errStr, err)
}
}
})
}
}