diff --git a/go.mod b/go.mod index 274b35af712..2d5f4714768 100644 --- a/go.mod +++ b/go.mod @@ -180,6 +180,7 @@ require ( github.com/opencontainers/image-spec v1.0.1 // indirect github.com/opencontainers/runc v1.0.1 // indirect github.com/pelletier/go-toml v1.9.3 // indirect + github.com/pierrec/lz4 v2.6.1+incompatible // indirect github.com/pmezard/go-difflib v1.0.0 // indirect github.com/prometheus/client_model v0.2.0 // indirect github.com/prometheus/procfs v0.6.0 // indirect diff --git a/go.sum b/go.sum index c86ba10d434..821bf8d7e01 100644 --- a/go.sum +++ b/go.sum @@ -503,6 +503,8 @@ github.com/golang/protobuf v1.5.2 h1:ROPKBNFfQgOUMifHyP+KYbvpjbdoFNs+aK7DXlji0Tw github.com/golang/protobuf v1.5.2/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY= github.com/golang/snappy v0.0.1 h1:Qgr9rKW7uDUkrbSmQeiDsGa8SjGyCOGtuasMWwvp2P4= github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= +github.com/golang/snappy v0.0.3 h1:fHPg5GQYlCeLIPB9BZqMVR5nR9A+IM5zcgeTdjMYmLA= +github.com/golang/snappy v0.0.3/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/google/btree v0.0.0-20180813153112-4030bb1f1f0c/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ= github.com/google/btree v1.0.0 h1:0udJVsspx3VBr5FwtLhQQtuAsVc79tTq0ocGIPAU6qo= github.com/google/btree v1.0.0/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ= @@ -859,6 +861,8 @@ github.com/pelletier/go-toml v1.9.3/go.mod h1:u1nR/EPcESfeI/szUZKdtJ0xRNbUoANCko github.com/peterbourgon/diskv v2.0.1+incompatible/go.mod h1:uqqh8zWWbv1HBMNONnaR/tNboyR3/BZd58JJSHlUSCU= github.com/philhofer/fwd v1.0.0 h1:UbZqGr5Y38ApvM/V/jEljVxwocdweyH+vmYvRPBnbqQ= github.com/philhofer/fwd v1.0.0/go.mod h1:gk3iGcWd9+svBvR0sR+KPcfE+RNWozjowpeBVG3ZVNU= +github.com/pierrec/lz4 v2.6.1+incompatible h1:9UY3+iC23yxF0UfGaYrGplQ+79Rg+h/q9FV9ix19jjM= +github.com/pierrec/lz4 v2.6.1+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY= github.com/pires/go-proxyproto v0.6.1 
h1:EBupykFmo22SDjv4fQVQd2J9NOoLPmyZA/15ldOGkPw= github.com/pires/go-proxyproto v0.6.1/go.mod h1:Odh9VFOZJCf9G8cLW5o435Xf1J95Jw9Gw5rnCjcwzAY= github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= diff --git a/go/vt/mysqlctl/builtinbackupengine.go b/go/vt/mysqlctl/builtinbackupengine.go index ea0763d473a..2b0ee6d87bd 100644 --- a/go/vt/mysqlctl/builtinbackupengine.go +++ b/go/vt/mysqlctl/builtinbackupengine.go @@ -32,9 +32,6 @@ import ( "sync/atomic" "time" - "github.com/klauspost/pgzip" - "github.com/planetscale/pargzip" - "vitess.io/vitess/go/mysql" "vitess.io/vitess/go/sync2" "vitess.io/vitess/go/vt/concurrency" @@ -78,6 +75,9 @@ type builtinBackupManifest struct { // BackupManifest is an anonymous embedding of the base manifest struct. BackupManifest + // CompressionEngine stores which compression engine was used to originally compress the files. + CompressionEngine string `json:",omitempty"` + // FileEntries contains all the files in the backup FileEntries []FileEntry @@ -351,9 +351,10 @@ func (be *BuiltinBackupEngine) backupFiles(ctx context.Context, params BackupPar }, // Builtin-specific fields - FileEntries: fes, - TransformHook: *backupStorageHook, - SkipCompress: !*backupStorageCompress, + FileEntries: fes, + TransformHook: *backupStorageHook, + SkipCompress: !*backupStorageCompress, + CompressionEngine: *builtinCompressor, } data, err := json.MarshalIndent(bm, "", " ") if err != nil { @@ -498,13 +499,19 @@ func (be *BuiltinBackupEngine) backupFile(ctx context.Context, params BackupPara } // Create the gzip compression pipe, if necessary. 
- var gzip *pargzip.Writer + var compressor io.WriteCloser if *backupStorageCompress { - gzip = pargzip.NewWriter(writer) - gzip.ChunkSize = *backupCompressBlockSize - gzip.Parallel = *backupCompressBlocks - gzip.CompressionLevel = pargzip.BestSpeed - writer = gzip + + if *externalCompressorCmd != "" { + compressor, err = newExternalCompressor(ctx, *externalCompressorCmd, writer, params.Logger) + } else { + compressor, err = newBuiltinCompressor(*builtinCompressor, writer, params.Logger) + } + if err != nil { + return vterrors.Wrap(err, "can't create compressor") + } + + writer = compressor } // Copy from the source file to writer (optional gzip, @@ -515,9 +522,9 @@ func (be *BuiltinBackupEngine) backupFile(ctx context.Context, params BackupPara } // Close gzip to flush it, after that all data is sent to writer. - if gzip != nil { - if err = gzip.Close(); err != nil { - return vterrors.Wrap(err, "cannot close gzip") + if compressor != nil { + if err = compressor.Close(); err != nil { + return vterrors.Wrap(err, "cannot close compressor") } } @@ -599,7 +606,7 @@ func (be *BuiltinBackupEngine) restoreFiles(ctx context.Context, params RestoreP // And restore the file. name := fmt.Sprintf("%v", i) params.Logger.Infof("Copying file %v: %v", name, fes[i].Name) - err := be.restoreFile(ctx, params, bh, &fes[i], bm.TransformHook, !bm.SkipCompress, name) + err := be.restoreFile(ctx, params, bh, &fes[i], bm.TransformHook, !bm.SkipCompress, bm.CompressionEngine, name) if err != nil { rec.RecordError(vterrors.Wrapf(err, "can't restore file %v to %v", name, fes[i].Name)) } @@ -610,7 +617,7 @@ func (be *BuiltinBackupEngine) restoreFiles(ctx context.Context, params RestoreP } // restoreFile restores an individual file. 
-func (be *BuiltinBackupEngine) restoreFile(ctx context.Context, params RestoreParams, bh backupstorage.BackupHandle, fe *FileEntry, transformHook string, compress bool, name string) (finalErr error) { +func (be *BuiltinBackupEngine) restoreFile(ctx context.Context, params RestoreParams, bh backupstorage.BackupHandle, fe *FileEntry, transformHook string, compress bool, compressionEngine string, name string) (finalErr error) { // Open the source file for reading. source, err := bh.ReadFile(ctx, name) if err != nil { @@ -653,21 +660,29 @@ func (be *BuiltinBackupEngine) restoreFile(ctx context.Context, params RestorePa // Create the uncompresser if needed. if compress { - gz, err := pgzip.NewReader(reader) + var decompressor io.ReadCloser + + if *externalDecompressorCmd != "" { + decompressor, err = newExternalDecompressor(ctx, *externalDecompressorCmd, reader, params.Logger) + } else { + decompressor, err = newBuiltinDecompressor(compressionEngine, reader, params.Logger) + } if err != nil { - return vterrors.Wrap(err, "can't open gzip decompressor") + return vterrors.Wrap(err, "can't create decompressor") } + defer func() { - if cerr := gz.Close(); cerr != nil { + if cerr := decompressor.Close(); cerr != nil { + params.Logger.Errorf("failed to close decompressor: %v", cerr) if finalErr != nil { // We already have an error, just log this one. - log.Errorf("failed to close gzip decompressor %v: %v", name, cerr) + log.Errorf("failed to close decompressor %v: %v", name, cerr) } else { - finalErr = vterrors.Wrap(err, "failed to close gzip decompressor") + finalErr = vterrors.Wrap(cerr, "failed to close decompressor") } } }() - reader = gz + reader = decompressor } // Copy the data. Will also write to the hasher. diff --git a/go/vt/mysqlctl/compression.go b/go/vt/mysqlctl/compression.go new file mode 100644 index 00000000000..eda0143c6eb --- /dev/null +++ b/go/vt/mysqlctl/compression.go @@ -0,0 +1,313 @@ +/* +Copyright 2021 The Vitess Authors. 
+ +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package mysqlctl + +import ( + "context" + "errors" + "flag" + "fmt" + "io" + "io/ioutil" + "os/exec" + "sync" + + "github.com/google/shlex" + "github.com/klauspost/compress/zstd" + "github.com/klauspost/pgzip" + "github.com/pierrec/lz4" + "github.com/planetscale/pargzip" + + "vitess.io/vitess/go/vt/logutil" + "vitess.io/vitess/go/vt/vterrors" +) + +var ( + compressionLevel = flag.Int("compression_level", 1, "What level to pass to the compressor") + // switch which compressor/decompressor to use + builtinCompressor = flag.String("builtin_compressor", "pgzip", "which builtin compressor engine to use") + builtinDecompressor = flag.String("builtin_decompressor", "auto", "which builtin decompressor engine to use") + // use an external command to compress/decompress the backups + externalCompressorCmd = flag.String("external_compressor", "", "command with arguments to use when compressing a backup") + externalCompressorExt = flag.String("external_compressor_extension", "", "which extension to use when using an external compressor") + externalDecompressorCmd = flag.String("external_decompressor", "", "command with arguments to use when decompressing a backup") + + errUnsupportedCompressionEngine = errors.New("unsupported engine") + errUnsupportedCompressionExtension = errors.New("unsupported extension") + + // this is used by getEngineFromExtension() to figure out which engine to use in case the user didn't specify + engineExtensions = 
map[string][]string{ + ".gz": {"pgzip", "pargzip"}, + ".lz4": {"lz4"}, + ".zst": {"zstd"}, + } +) + +func getEngineFromExtension(extension string) (string, error) { + for ext, eng := range engineExtensions { + if ext == extension { + return eng[0], nil // we select the first supported engine in auto mode + } + } + + return "", fmt.Errorf("%w %q", errUnsupportedCompressionExtension, extension) +} + +func getExtensionFromEngine(engine string) (string, error) { + for ext, eng := range engineExtensions { + for _, e := range eng { + if e == engine { + return ext, nil + } + } + } + + return "", fmt.Errorf("%w %q", errUnsupportedCompressionEngine, engine) +} + +// Validates if the external decompressor exists and return its path. +func validateExternalCmd(cmd string) (string, error) { + if cmd == "" { + return "", errors.New("external command is empty") + } + + return exec.LookPath(cmd) +} + +func prepareExternalCompressionCmd(ctx context.Context, cmdStr string) (*exec.Cmd, error) { + cmdArgs, err := shlex.Split(cmdStr) + if err != nil { + return nil, err + } + + if len(cmdArgs) < 1 { + return nil, errors.New("external command is empty") + } + + cmdPath, err := validateExternalCmd(cmdArgs[0]) + if err != nil { + return nil, err + } + + return exec.CommandContext(ctx, cmdPath, cmdArgs[1:]...), nil +} + +// This returns a writer that writes the compressed output of the external command to the provided writer. 
+func newExternalCompressor(ctx context.Context, cmdStr string, writer io.Writer, logger logutil.Logger) (io.WriteCloser, error) { + logger.Infof("Compressing using external command: %q", cmdStr) + + cmd, err := prepareExternalCompressionCmd(ctx, cmdStr) + if err != nil { + return nil, vterrors.Wrap(err, "unable to start external command") + } + compressor := &externalCompressor{cmd: cmd} + + cmd.Stdout = writer + + cmdIn, err := cmd.StdinPipe() + if err != nil { + return nil, vterrors.Wrap(err, "cannot create external compressor stdin pipe") + } + + compressor.stdin = cmdIn + + cmdErr, err := cmd.StderrPipe() + if err != nil { + return nil, vterrors.Wrap(err, "cannot create external compressor stderr pipe") + } + + if err := cmd.Start(); err != nil { + return nil, vterrors.Wrap(err, "can't start external compressor") + } + + compressor.wg.Add(1) // we wait for the goroutine to finish when we call Close() on the writer + go scanLinesToLogger("compressor stderr", cmdErr, logger, compressor.wg.Done) + + return compressor, nil +} + +// This returns a reader that reads the compressed input and passes it to the external command to be decompressed. Calls to its +// Read() will return the uncompressed data until EOF. 
+func newExternalDecompressor(ctx context.Context, cmdStr string, reader io.Reader, logger logutil.Logger) (io.ReadCloser, error) { + logger.Infof("Decompressing using external command: %q", cmdStr) + + cmd, err := prepareExternalCompressionCmd(ctx, cmdStr) + if err != nil { + return nil, vterrors.Wrap(err, "unable to start external command") + } + + decompressor := &externalDecompressor{cmd: cmd} + + cmd.Stdin = reader + + cmdOut, err := cmd.StdoutPipe() + if err != nil { + return nil, vterrors.Wrap(err, "cannot create external decompressor stdout pipe") + } + + decompressor.stdout = cmdOut + + cmdErr, err := cmd.StderrPipe() + if err != nil { + return nil, vterrors.Wrap(err, "cannot create external decompressor stderr pipe") + } + + if err := cmd.Start(); err != nil { + return nil, vterrors.Wrap(err, "can't start external decompressor") + } + + decompressor.wg.Add(1) // we wait for the goroutine to finish when we call Close() on the reader + go scanLinesToLogger("decompressor stderr", cmdErr, logger, decompressor.wg.Done) + + return decompressor, nil +} + +// This is a wrapper to get the right decompressor (see below) based on the extension of the file. +func newBuiltinDecompressorFromExtension(extension, engine string, reader io.Reader, logger logutil.Logger) (decompressor io.ReadCloser, err error) { + // we only infer the engine from the extension if it is set to "auto", otherwise we use whatever the user selected + if engine == "auto" { + logger.Infof("Builtin decompressor set to auto, checking which engine to decompress based on the extension") + + eng, err := getEngineFromExtension(extension) + if err != nil { + return decompressor, err + } + + engine = eng + } + + return newBuiltinDecompressor(engine, reader, logger) +} + +// This returns a reader that will decompress the underlying provided reader and will use the specified supported engine. 
+func newBuiltinDecompressor(engine string, reader io.Reader, logger logutil.Logger) (decompressor io.ReadCloser, err error) { + if engine == "pargzip" { + logger.Warningf("engine \"pargzip\" doesn't support decompression, using \"pgzip\" instead") + + engine = "pgzip" + } + + switch engine { + case "pgzip": + d, err := pgzip.NewReader(reader) + if err != nil { + return nil, err + } + + decompressor = d + case "lz4": + decompressor = ioutil.NopCloser(lz4.NewReader(reader)) + case "zstd": + d, err := zstd.NewReader(reader) + if err != nil { + return nil, err + } + + decompressor = d.IOReadCloser() + default: + err = fmt.Errorf("unknown decompressor engine: %q", engine) + return decompressor, err + } + + logger.Infof("Decompressing backup using built-in engine %q", engine) + + return decompressor, err +} + +// This returns a writer that will compress the data using the specified engine before writing to the underlying writer. +func newBuiltinCompressor(engine string, writer io.Writer, logger logutil.Logger) (compressor io.WriteCloser, err error) { + switch engine { + case "pgzip": + gzip, err := pgzip.NewWriterLevel(writer, *compressionLevel) + if err != nil { + return compressor, vterrors.Wrap(err, "cannot create gzip compressor") + } + + gzip.SetConcurrency(*backupCompressBlockSize, *backupCompressBlocks) + + compressor = gzip + case "pargzip": + gzip := pargzip.NewWriter(writer) + gzip.ChunkSize = *backupCompressBlockSize + gzip.Parallel = *backupCompressBlocks + gzip.CompressionLevel = *compressionLevel + + compressor = gzip + case "lz4": + lz4Writer := lz4.NewWriter(writer).WithConcurrency(*backupCompressBlocks) + lz4Writer.Header = lz4.Header{ + CompressionLevel: *compressionLevel, + } + + compressor = lz4Writer + case "zstd": + zst, err := zstd.NewWriter(writer, zstd.WithEncoderLevel(zstd.EncoderLevel(*compressionLevel))) + if err != nil { + return compressor, vterrors.Wrap(err, "cannot create zstd compressor") + } + + compressor = zst + default: + err = 
fmt.Errorf("Unkown compressor engine: %q", engine) + return compressor, err + } + + logger.Infof("Compressing backup using built-in engine %q", engine) + + return +} + +// This struct wraps the underlying exec.Cmd and implements the io.WriteCloser interface. +type externalCompressor struct { + cmd *exec.Cmd + stdin io.WriteCloser + wg sync.WaitGroup +} + +func (e *externalCompressor) Write(p []byte) (n int, err error) { + return e.stdin.Write(p) +} + +func (e *externalCompressor) Close() error { + if err := e.stdin.Close(); err != nil { + return err + } + + // wait for the stderr to finish reading as well + e.wg.Wait() + + return e.cmd.Wait() +} + +// This struct wraps the underlying exec.Cmd and implements the io.ReadCloser interface. +type externalDecompressor struct { + cmd *exec.Cmd + stdout io.ReadCloser + wg sync.WaitGroup +} + +func (e *externalDecompressor) Read(p []byte) (n int, err error) { + return e.stdout.Read(p) +} + +func (e *externalDecompressor) Close() error { + // wait for the stderr to finish reading as well + e.wg.Wait() + + // exec.Cmd.Wait() will also close the stdout pipe, so we don't need to call it directly + return e.cmd.Wait() +} diff --git a/go/vt/mysqlctl/compression_test.go b/go/vt/mysqlctl/compression_test.go new file mode 100644 index 00000000000..a9137be79ae --- /dev/null +++ b/go/vt/mysqlctl/compression_test.go @@ -0,0 +1,215 @@ +/* +Copyright 2021 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package mysqlctl + +import ( + "bytes" + "context" + "errors" + "fmt" + "io" + "reflect" + "strings" + "testing" + "time" + + "vitess.io/vitess/go/vt/logutil" +) + +func TestGetExtensionFromEngine(t *testing.T) { + tests := []struct { + engine, extension string + err error + }{ + {"pgzip", ".gz", nil}, + {"pargzip", ".gz", nil}, + {"lz4", ".lz4", nil}, + {"zstd", ".zst", nil}, + {"foobar", "", errUnsupportedCompressionEngine}, + } + + for _, tt := range tests { + t.Run(tt.engine, func(t *testing.T) { + ext, err := getExtensionFromEngine(tt.engine) + // if err != tt.err { + if !errors.Is(err, tt.err) { + t.Errorf("got err: %v; expected: %v", err, tt.err) + } + // } + + if ext != tt.extension { + t.Errorf("got err: %v; expected: %v", ext, tt.extension) + } + }) + } +} + +func TestBuiltinCompressors(t *testing.T) { + data := []byte("foo bar foobar") + + logger := logutil.NewMemoryLogger() + + for _, engine := range []string{"pgzip", "lz4", "zstd"} { + t.Run(engine, func(t *testing.T) { + var compressed, decompressed bytes.Buffer + + reader := bytes.NewReader(data) + + compressor, err := newBuiltinCompressor(engine, &compressed, logger) + if err != nil { + t.Fatal(err) + } + + _, err = io.Copy(compressor, reader) + if err != nil { + t.Error(err) + return + } + compressor.Close() + + decompressor, err := newBuiltinDecompressor(engine, &compressed, logger) + if err != nil { + t.Error(err) + return + } + + _, err = io.Copy(&decompressed, decompressor) + if err != nil { + t.Error(err) + return + } + decompressor.Close() + + if len(data) != len(decompressed.Bytes()) { + t.Errorf("Different size of original (%d bytes) and uncompressed (%d bytes) data", len(data), len(decompressed.Bytes())) + } + + if !reflect.DeepEqual(data, decompressed.Bytes()) { + t.Error("decompressed content differs from the original") + } + }) + } +} + +func TestExternalCompressors(t *testing.T) { + data := []byte("foo bar foobar") + logger := logutil.NewMemoryLogger() + + tests := []struct { + 
compress, decompress string + }{ + {"gzip", "gzip -d"}, + {"pigz", "pigz -d"}, + {"lz4", "lz4 -d"}, + {"zstd", "zstd -d"}, + {"lzop", "lzop -d"}, + {"bzip2", "bzip2 -d"}, + {"lzma", "lzma -d"}, + } + + for _, tt := range tests { + t.Run(tt.compress, func(t *testing.T) { + var compressed, decompressed bytes.Buffer + + reader := bytes.NewReader(data) + + for _, cmd := range []string{tt.compress, tt.decompress} { + cmdArgs := strings.Split(cmd, " ") + + _, err := validateExternalCmd(cmdArgs[0]) + if err != nil { + t.Skip("Command not available in this host:", err) + } + } + + ctx, cancel := context.WithTimeout(context.Background(), time.Second*10) + defer cancel() + + compressor, err := newExternalCompressor(ctx, tt.compress, &compressed, logger) + if err != nil { + t.Error(err) + return + } + + _, err = io.Copy(compressor, reader) + if err != nil { + t.Error(err) + return + } + compressor.Close() + + decompressor, err := newExternalDecompressor(ctx, tt.decompress, &compressed, logger) + if err != nil { + t.Error(err) + return + } + + _, err = io.Copy(&decompressed, decompressor) + if err != nil { + t.Error(err) + return + } + decompressor.Close() + + if len(data) != len(decompressed.Bytes()) { + t.Errorf("Different size of original (%d bytes) and uncompressed (%d bytes) data", len(data), len(decompressed.Bytes())) + } + + if !reflect.DeepEqual(data, decompressed.Bytes()) { + t.Error("decompressed content differs from the original") + } + + }) + } +} + +func TestValidateExternalCmd(t *testing.T) { + tests := []struct { + cmdName string + path string + errStr string + }{ + // this should not find an executable + {"non_existent_cmd", "", "executable file not found"}, + // we expect ls to be on PATH as it is a basic command part of busybox and most containers + {"ls", "ls", ""}, + } + + for i, tt := range tests { + t.Run(fmt.Sprintf("Test #%d", i+1), func(t *testing.T) { + CmdName := tt.cmdName + + path, err := validateExternalCmd(CmdName) + + if tt.path != "" { + if 
!strings.HasSuffix(path, tt.path) { + t.Errorf("Expected path \"%s\" to include \"%s\"", path, tt.path) + } + } + + if tt.errStr == "" { + if err != nil { + t.Errorf("Expected result \"%v\", got \"%v\"", "", err) + } + } else { + if !strings.Contains(fmt.Sprintf("%v", err), tt.errStr) { + t.Errorf("Expected result \"%v\", got \"%v\"", tt.errStr, err) + } + } + }) + } +} diff --git a/go/vt/mysqlctl/xtrabackupengine.go b/go/vt/mysqlctl/xtrabackupengine.go index 13a612644e3..cb9c3b28a30 100644 --- a/go/vt/mysqlctl/xtrabackupengine.go +++ b/go/vt/mysqlctl/xtrabackupengine.go @@ -26,14 +26,12 @@ import ( "os" "os/exec" "path" + "path/filepath" "regexp" "strings" "sync" "time" - "github.com/klauspost/pgzip" - "github.com/planetscale/pargzip" - "vitess.io/vitess/go/mysql" "vitess.io/vitess/go/vt/logutil" "vitess.io/vitess/go/vt/mysqlctl/backupstorage" @@ -108,7 +106,16 @@ func (be *XtrabackupEngine) backupFileName() string { fileName += *xtrabackupStreamMode } if *backupStorageCompress { - fileName += ".gz" + if *externalCompressorCmd != "" { + fileName += *externalCompressorExt + } else { + if ext, err := getExtensionFromEngine(*builtinCompressor); err != nil { + // there is a check for this, but just in case that fails, we set an extension to the file + fileName += ".unknown" + } else { + fileName += ext + } + } } return fileName } @@ -130,6 +137,13 @@ func (be *XtrabackupEngine) ExecuteBackup(ctx context.Context, params BackupPara if *xtrabackupUser == "" { return false, vterrors.New(vtrpc.Code_INVALID_ARGUMENT, "xtrabackupUser must be specified.") } + + // an extension is required when using an external compressor + if *backupStorageCompress && *externalCompressorCmd != "" && *externalCompressorExt == "" { + return false, vterrors.New(vtrpc.Code_INVALID_ARGUMENT, + "external_compressor_extension not provided when using an external compressor") + } + + // use a mysql connection to detect flavor at runtime conn, err := params.Mysqld.GetDbaConnection(ctx) if 
conn != nil && err == nil { @@ -147,6 +161,7 @@ func (be *XtrabackupEngine) ExecuteBackup(ctx context.Context, params BackupPara params.Logger.Infof("Detected MySQL flavor: %v", flavor) backupFileName := be.backupFileName() + params.Logger.Infof("backup file name: %s", backupFileName) numStripes := int(*xtrabackupStripes) // Perform backups in a separate function, so deferred calls to Close() are @@ -279,10 +294,17 @@ func (be *XtrabackupEngine) backupFiles(ctx context.Context, params BackupParams // Create the gzip compression pipe, if necessary. if *backupStorageCompress { - compressor := pargzip.NewWriter(writer) - compressor.ChunkSize = *backupCompressBlockSize - compressor.Parallel = *backupCompressBlocks - compressor.CompressionLevel = pargzip.BestSpeed + var compressor io.WriteCloser + + if *externalCompressorCmd != "" { + compressor, err = newExternalCompressor(ctx, *externalCompressorCmd, writer, params.Logger) + } else { + compressor, err = newBuiltinCompressor(*builtinCompressor, writer, params.Logger) + } + if err != nil { + return replicationPosition, vterrors.Wrap(err, "can't create compressor") + } + writer = compressor destCompressors = append(destCompressors, compressor) } @@ -343,7 +365,7 @@ func (be *XtrabackupEngine) backupFiles(ctx context.Context, params BackupParams // Close compressor to flush it. After that all data is sent to the buffer. for _, compressor := range destCompressors { if err := compressor.Close(); err != nil { - return replicationPosition, vterrors.Wrap(err, "cannot close gzip compressor") + return replicationPosition, vterrors.Wrap(err, "cannot close compressor") } } @@ -510,6 +532,9 @@ func (be *XtrabackupEngine) extractFiles(ctx context.Context, logger logutil.Log baseFileName = be.backupFileName() } + logger.Infof("backup file name: %s", baseFileName) + extension := filepath.Ext(baseFileName) + // Open the source files for reading. 
srcFiles, err := readStripeFiles(ctx, bh, baseFileName, int(bm.NumStripes), logger) if err != nil { @@ -528,10 +553,17 @@ func (be *XtrabackupEngine) extractFiles(ctx context.Context, logger logutil.Log // Create the decompressor if needed. if compressed { - decompressor, err := pgzip.NewReader(reader) + var decompressor io.ReadCloser + + if *externalDecompressorCmd != "" { + decompressor, err = newExternalDecompressor(ctx, *externalDecompressorCmd, reader, logger) + } else { + decompressor, err = newBuiltinDecompressorFromExtension(extension, *builtinDecompressor, reader, logger) + } if err != nil { - return vterrors.Wrap(err, "can't create gzip decompressor") + return vterrors.Wrap(err, "can't create decompressor") } + srcDecompressors = append(srcDecompressors, decompressor) reader = decompressor } @@ -541,7 +573,7 @@ func (be *XtrabackupEngine) extractFiles(ctx context.Context, logger logutil.Log defer func() { for _, decompressor := range srcDecompressors { if cerr := decompressor.Close(); cerr != nil { - logger.Errorf("failed to close gzip decompressor: %v", cerr) + logger.Errorf("failed to close decompressor: %v", cerr) } } }()