Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support for additional compressors/decompressors #7978

Closed
wants to merge 9 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,7 @@ require (
github.com/opencontainers/image-spec v1.0.1 // indirect
github.com/opencontainers/runc v1.0.1 // indirect
github.com/pelletier/go-toml v1.9.3 // indirect
github.com/pierrec/lz4 v2.6.1+incompatible // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/prometheus/client_model v0.2.0 // indirect
github.com/prometheus/procfs v0.6.0 // indirect
Expand Down
4 changes: 4 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -503,6 +503,8 @@ github.com/golang/protobuf v1.5.2 h1:ROPKBNFfQgOUMifHyP+KYbvpjbdoFNs+aK7DXlji0Tw
github.com/golang/protobuf v1.5.2/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY=
github.com/golang/snappy v0.0.1 h1:Qgr9rKW7uDUkrbSmQeiDsGa8SjGyCOGtuasMWwvp2P4=
github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
github.com/golang/snappy v0.0.3 h1:fHPg5GQYlCeLIPB9BZqMVR5nR9A+IM5zcgeTdjMYmLA=
github.com/golang/snappy v0.0.3/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
github.com/google/btree v0.0.0-20180813153112-4030bb1f1f0c/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ=
github.com/google/btree v1.0.0 h1:0udJVsspx3VBr5FwtLhQQtuAsVc79tTq0ocGIPAU6qo=
github.com/google/btree v1.0.0/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ=
Expand Down Expand Up @@ -859,6 +861,8 @@ github.com/pelletier/go-toml v1.9.3/go.mod h1:u1nR/EPcESfeI/szUZKdtJ0xRNbUoANCko
github.com/peterbourgon/diskv v2.0.1+incompatible/go.mod h1:uqqh8zWWbv1HBMNONnaR/tNboyR3/BZd58JJSHlUSCU=
github.com/philhofer/fwd v1.0.0 h1:UbZqGr5Y38ApvM/V/jEljVxwocdweyH+vmYvRPBnbqQ=
github.com/philhofer/fwd v1.0.0/go.mod h1:gk3iGcWd9+svBvR0sR+KPcfE+RNWozjowpeBVG3ZVNU=
github.com/pierrec/lz4 v2.6.1+incompatible h1:9UY3+iC23yxF0UfGaYrGplQ+79Rg+h/q9FV9ix19jjM=
github.com/pierrec/lz4 v2.6.1+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY=
github.com/pires/go-proxyproto v0.6.1 h1:EBupykFmo22SDjv4fQVQd2J9NOoLPmyZA/15ldOGkPw=
github.com/pires/go-proxyproto v0.6.1/go.mod h1:Odh9VFOZJCf9G8cLW5o435Xf1J95Jw9Gw5rnCjcwzAY=
github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
Expand Down
61 changes: 38 additions & 23 deletions go/vt/mysqlctl/builtinbackupengine.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,6 @@ import (
"sync/atomic"
"time"

"github.com/klauspost/pgzip"
"github.com/planetscale/pargzip"

"vitess.io/vitess/go/mysql"
"vitess.io/vitess/go/sync2"
"vitess.io/vitess/go/vt/concurrency"
Expand Down Expand Up @@ -78,6 +75,9 @@ type builtinBackupManifest struct {
// BackupManifest is an anonymous embedding of the base manifest struct.
BackupManifest

// CompressionEngine stores which compression engine was used to originally compress the files.
CompressionEngine string `json:",omitempty"`

// FileEntries contains all the files in the backup
FileEntries []FileEntry

Expand Down Expand Up @@ -351,9 +351,10 @@ func (be *BuiltinBackupEngine) backupFiles(ctx context.Context, params BackupPar
},

// Builtin-specific fields
FileEntries: fes,
TransformHook: *backupStorageHook,
SkipCompress: !*backupStorageCompress,
FileEntries: fes,
TransformHook: *backupStorageHook,
SkipCompress: !*backupStorageCompress,
CompressionEngine: *builtinCompressor,
}
data, err := json.MarshalIndent(bm, "", " ")
if err != nil {
Expand Down Expand Up @@ -498,13 +499,19 @@ func (be *BuiltinBackupEngine) backupFile(ctx context.Context, params BackupPara
}

// Create the gzip compression pipe, if necessary.
var gzip *pargzip.Writer
var compressor io.WriteCloser
if *backupStorageCompress {
gzip = pargzip.NewWriter(writer)
gzip.ChunkSize = *backupCompressBlockSize
gzip.Parallel = *backupCompressBlocks
gzip.CompressionLevel = pargzip.BestSpeed
writer = gzip

if *externalCompressorCmd != "" {
compressor, err = newExternalCompressor(ctx, *externalCompressorCmd, writer, params.Logger)
} else {
compressor, err = newBuiltinCompressor(*builtinCompressor, writer, params.Logger)
}
if err != nil {
return vterrors.Wrap(err, "can't create compressor")
}

writer = compressor
}

// Copy from the source file to writer (optional gzip,
Expand All @@ -515,9 +522,9 @@ func (be *BuiltinBackupEngine) backupFile(ctx context.Context, params BackupPara
}

// Close gzip to flush it, after that all data is sent to writer.
if gzip != nil {
if err = gzip.Close(); err != nil {
return vterrors.Wrap(err, "cannot close gzip")
if compressor != nil {
if err = compressor.Close(); err != nil {
return vterrors.Wrap(err, "cannot close compressor")
}
}

Expand Down Expand Up @@ -599,7 +606,7 @@ func (be *BuiltinBackupEngine) restoreFiles(ctx context.Context, params RestoreP
// And restore the file.
name := fmt.Sprintf("%v", i)
params.Logger.Infof("Copying file %v: %v", name, fes[i].Name)
err := be.restoreFile(ctx, params, bh, &fes[i], bm.TransformHook, !bm.SkipCompress, name)
err := be.restoreFile(ctx, params, bh, &fes[i], bm.TransformHook, !bm.SkipCompress, bm.CompressionEngine, name)
if err != nil {
rec.RecordError(vterrors.Wrapf(err, "can't restore file %v to %v", name, fes[i].Name))
}
Expand All @@ -610,7 +617,7 @@ func (be *BuiltinBackupEngine) restoreFiles(ctx context.Context, params RestoreP
}

// restoreFile restores an individual file.
func (be *BuiltinBackupEngine) restoreFile(ctx context.Context, params RestoreParams, bh backupstorage.BackupHandle, fe *FileEntry, transformHook string, compress bool, name string) (finalErr error) {
func (be *BuiltinBackupEngine) restoreFile(ctx context.Context, params RestoreParams, bh backupstorage.BackupHandle, fe *FileEntry, transformHook string, compress bool, compressionEngine string, name string) (finalErr error) {
// Open the source file for reading.
source, err := bh.ReadFile(ctx, name)
if err != nil {
Expand Down Expand Up @@ -653,21 +660,29 @@ func (be *BuiltinBackupEngine) restoreFile(ctx context.Context, params RestorePa

// Create the uncompresser if needed.
if compress {
gz, err := pgzip.NewReader(reader)
var decompressor io.ReadCloser

if *externalDecompressorCmd != "" {
decompressor, err = newExternalDecompressor(ctx, *externalDecompressorCmd, reader, params.Logger)
} else {
decompressor, err = newBuiltinDecompressor(compressionEngine, reader, params.Logger)
}
if err != nil {
return vterrors.Wrap(err, "can't open gzip decompressor")
return vterrors.Wrap(err, "can't create decompressor")
}

defer func() {
if cerr := gz.Close(); cerr != nil {
if cerr := decompressor.Close(); cerr != nil {
params.Logger.Errorf("failed to close decompressor: %v", cerr)
if finalErr != nil {
// We already have an error, just log this one.
log.Errorf("failed to close gzip decompressor %v: %v", name, cerr)
log.Errorf("failed to close decompressor %v: %v", name, cerr)
} else {
finalErr = vterrors.Wrap(err, "failed to close gzip decompressor")
finalErr = vterrors.Wrap(cerr, "failed to close decompressor")
}
}
}()
reader = gz
reader = decompressor
}

// Copy the data. Will also write to the hasher.
Expand Down
Loading