Skip to content

Commit

Permalink
change to support an external decompressor
Browse files Browse the repository at this point in the history
Signed-off-by: Renan Rangel <[email protected]>
  • Loading branch information
rvrangel committed Apr 28, 2021
1 parent c5cd306 commit 4becf1b
Show file tree
Hide file tree
Showing 2 changed files with 112 additions and 3 deletions.
76 changes: 73 additions & 3 deletions go/vt/mysqlctl/xtrabackupengine.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"bufio"
"context"
"encoding/json"
"errors"
"flag"
"fmt"
"io"
Expand Down Expand Up @@ -63,6 +64,9 @@ var (
// striping mode
xtrabackupStripes = flag.Uint("xtrabackup_stripes", 0, "If greater than 0, use data striping across this many destination files to parallelize data transfer and decompression")
xtrabackupStripeBlockSize = flag.Uint("xtrabackup_stripe_block_size", 102400, "Size in bytes of each block that gets sent to a given stripe before rotating to the next stripe")
// use an external command to decompress the backups
decompressMethod = flag.String("decompress_method", "builtin", "what decompressor to use [builtin|external]")
externalDecompressor = flag.String("external_decompressor", "", "command with arguments to which decompressor to run")
)

const (
Expand Down Expand Up @@ -519,17 +523,74 @@ func (be *XtrabackupEngine) extractFiles(ctx context.Context, logger logutil.Log
}
}()

var (
decompressorCmd *exec.Cmd
decompressorWg sync.WaitGroup
)
srcReaders := []io.Reader{}
srcDecompressors := []io.ReadCloser{}

for _, file := range srcFiles {
reader := io.Reader(file)

// Create the decompressor if needed.
if compressed {
decompressor, err := pgzip.NewReader(reader)
if err != nil {
return vterrors.Wrap(err, "can't create gzip decompressor")
var decompressor io.ReadCloser

switch *decompressMethod {
case "builtin":
logger.Infof("Using built-in decompressor")

decompressor, err = pgzip.NewReader(reader)
if err != nil {
return vterrors.Wrap(err, "can't create gzip decompressor")
}
case "external":
decompressorFlags := strings.Split(*externalDecompressor, " ")
if len(decompressorFlags) < 1 {
return vterrors.Wrap(err, "external_decompressor is empty")
}

decompressorCmdPath, err := validateExternalDecompressor(decompressorFlags[0])
if err != nil {
return vterrors.Wrap(err, "could not validate external decompressor")
}

decompressorCmd = exec.CommandContext(ctx, decompressorCmdPath, decompressorFlags[1:]...)
decompressorCmd.Stdin = reader

logger.Infof("Decompressing using %v", decompressorFlags)

decompressorOut, err := decompressorCmd.StdoutPipe()
if err != nil {
return vterrors.Wrap(err, "cannot create external decompressor stdout pipe")
}

decompressorErr, err := decompressorCmd.StderrPipe()
if err != nil {
return vterrors.Wrap(err, "cannot create external decompressor stderr pipe")
}

if err := decompressorCmd.Start(); err != nil {
return vterrors.Wrap(err, "can't start external decompressor")
}

decompressorWg.Add(1)
go scanLinesToLogger("decompressor stderr", decompressorErr, logger, decompressorWg.Done)

decompressor = decompressorOut

defer func() {
decompressorWg.Wait()
// log the exit status
if err := decompressorCmd.Wait(); err != nil {
vterrors.Wrap(err, "external decompressor failed")
}
}()
default:
return vterrors.Wrap(err, "unknown decompressor method")
}

srcDecompressors = append(srcDecompressors, decompressor)
reader = decompressor
}
Expand Down Expand Up @@ -822,6 +883,15 @@ func (be *XtrabackupEngine) ShouldDrainForBackup() bool {
return false
}

// validateExternalDecompressor checks that the external decompressor command
// can be resolved and returns the path of its executable.
//
// cmd is the command name only, without arguments. An empty cmd is rejected
// with an error; otherwise the result of exec.LookPath(cmd) is returned
// unchanged, so the error is non-nil whenever LookPath cannot resolve cmd.
func validateExternalDecompressor(cmd string) (string, error) {
	if cmd == "" {
		// This helper validates the decompressor, so the message must name it.
		return "", errors.New("external decompressor command is empty")
	}

	return exec.LookPath(cmd)
}

func init() {
BackupRestoreEngineMap[xtrabackupEngineName] = &XtrabackupEngine{}
}
39 changes: 39 additions & 0 deletions go/vt/mysqlctl/xtrabackupengine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,10 @@ package mysqlctl

import (
"bytes"
"fmt"
"io"
"math/rand"
"strings"
"testing"

"vitess.io/vitess/go/vt/logutil"
Expand Down Expand Up @@ -115,3 +117,40 @@ func TestStripeRoundTrip(t *testing.T) {
// Test block size and stripe count that don't evenly divide data size.
test(6000, 7)
}

// TestValidateExternalDecompressor runs validateExternalDecompressor against a
// command that should not resolve and one that should, checking the returned
// path suffix and the error text for each case.
func TestValidateExternalDecompressor(t *testing.T) {
	type testCase struct {
		cmdName string
		path    string
		errStr  string
	}

	cases := []testCase{
		// No executable with this name is expected to exist.
		{cmdName: "non_existent_cmd", errStr: "executable file not found"},
		// ls is expected to be on PATH: it is a basic command shipped with
		// busybox and most containers.
		{cmdName: "ls", path: "ls"},
	}

	for idx, tc := range cases {
		t.Run(fmt.Sprintf("Test #%d", idx+1), func(t *testing.T) {
			gotPath, gotErr := validateExternalDecompressor(tc.cmdName)

			// The path is only checked for cases that expect one.
			if tc.path != "" && !strings.HasSuffix(gotPath, tc.path) {
				t.Errorf("Expected path \"%s\" to include \"%s\"", gotPath, tc.path)
			}

			switch {
			case tc.errStr == "":
				// No error text expected, so any error is a failure.
				if gotErr != nil {
					t.Errorf("Expected result \"%v\", got \"%v\"", "<nil>", gotErr)
				}
			case !strings.Contains(fmt.Sprintf("%v", gotErr), tc.errStr):
				t.Errorf("Expected result \"%v\", got \"%v\"", tc.errStr, gotErr)
			}
		})
	}
}

0 comments on commit 4becf1b

Please sign in to comment.