diff --git a/go.mod b/go.mod index 23c3ac15558..19d3687ecd3 100644 --- a/go.mod +++ b/go.mod @@ -76,6 +76,7 @@ require ( github.com/philhofer/fwd v1.0.0 // indirect github.com/pires/go-proxyproto v0.0.0-20191211124218-517ecdf5bb2b github.com/pkg/errors v0.9.1 + github.com/planetscale/pargzip v0.0.0-20201116224723-90c7fc03ea8a github.com/prometheus/client_golang v1.4.1 github.com/prometheus/common v0.9.1 github.com/rcrowley/go-metrics v0.0.0-20200313005456-10cdbea86bc0 diff --git a/go.sum b/go.sum index 8060aa7b6c0..02e7d15d91f 100644 --- a/go.sum +++ b/go.sum @@ -524,6 +524,8 @@ github.com/pkg/errors v0.8.1 h1:iURUrRGxPUNPdy5/HRSm+Yj6okJ6UtLINN0Q9M4+h3I= github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= +github.com/planetscale/pargzip v0.0.0-20201116224723-90c7fc03ea8a h1:y0OpQ4+5tKxeh9+H+2cVgASl9yMZYV9CILinKOiKafA= +github.com/planetscale/pargzip v0.0.0-20201116224723-90c7fc03ea8a/go.mod h1:GJFUzQuXIoB2Kjn1ZfDhJr/42D5nWOqRcIQVgCxTuIE= github.com/pmezard/go-difflib v0.0.0-20151028094244-d8ed2627bdf0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= diff --git a/go/vt/mysqlctl/builtinbackupengine.go b/go/vt/mysqlctl/builtinbackupengine.go index a1db9af5107..b6ef3395441 100644 --- a/go/vt/mysqlctl/builtinbackupengine.go +++ b/go/vt/mysqlctl/builtinbackupengine.go @@ -29,6 +29,7 @@ import ( "time" "github.com/klauspost/pgzip" + "github.com/planetscale/pargzip" "vitess.io/vitess/go/mysql" "vitess.io/vitess/go/sync2" "vitess.io/vitess/go/vt/concurrency" @@ -407,13 +408,12 @@ func (be *BuiltinBackupEngine) backupFile(ctx context.Context, params BackupPara } // Create the gzip compression pipe, if necessary. - var gzip *pgzip.Writer + var gzip *pargzip.Writer if *backupStorageCompress { - gzip, err = pgzip.NewWriterLevel(writer, pgzip.BestSpeed) - if err != nil { - return vterrors.Wrap(err, "cannot create gziper") - } - gzip.SetConcurrency(*backupCompressBlockSize, *backupCompressBlocks) + gzip = pargzip.NewWriter(writer) + gzip.ChunkSize = *backupCompressBlockSize + gzip.Parallel = *backupCompressBlocks + gzip.CompressionLevel = pargzip.BestSpeed writer = gzip } diff --git a/go/vt/mysqlctl/xtrabackupengine.go b/go/vt/mysqlctl/xtrabackupengine.go index 9da4d74369c..79b73163987 100644 --- a/go/vt/mysqlctl/xtrabackupengine.go +++ b/go/vt/mysqlctl/xtrabackupengine.go @@ -32,6 +32,7 @@ import ( "time" "github.com/klauspost/pgzip" + "github.com/planetscale/pargzip" "vitess.io/vitess/go/mysql" "vitess.io/vitess/go/vt/logutil" "vitess.io/vitess/go/vt/mysqlctl/backupstorage" @@ -269,7 +270,7 @@ func (be *XtrabackupEngine) backupFiles(ctx context.Context, params BackupParams destWriters := []io.Writer{} destBuffers := []*bufio.Writer{} - destCompressors := []*pgzip.Writer{} + destCompressors := []io.WriteCloser{} for _, file := range destFiles { buffer := bufio.NewWriterSize(file, writerBufferSize) destBuffers = append(destBuffers, buffer) @@ -277,11 +278,10 @@ func (be *XtrabackupEngine) backupFiles(ctx context.Context, params BackupParams // Create the gzip compression pipe, if necessary. if *backupStorageCompress { - compressor, err := pgzip.NewWriterLevel(writer, pgzip.BestSpeed) - if err != nil { - return replicationPosition, vterrors.Wrap(err, "cannot create gzip compressor") - } - compressor.SetConcurrency(*backupCompressBlockSize, *backupCompressBlocks) + compressor := pargzip.NewWriter(writer) + compressor.ChunkSize = *backupCompressBlockSize + compressor.Parallel = *backupCompressBlocks + compressor.CompressionLevel = pargzip.BestSpeed writer = compressor destCompressors = append(destCompressors, compressor) } @@ -519,7 +519,7 @@ func (be *XtrabackupEngine) extractFiles(ctx context.Context, logger logutil.Log }() srcReaders := []io.Reader{} - srcDecompressors := []*pgzip.Reader{} + srcDecompressors := []io.ReadCloser{} for _, file := range srcFiles { reader := io.Reader(file) @@ -733,7 +733,7 @@ func copyToStripes(writers []io.Writer, reader io.Reader, blockSize int64) (writ } // Read blocks from source and round-robin them to destination writers. - // Since we put a buffer in front of the destination file, and pgzip has its + // Since we put a buffer in front of the destination file, and pargzip has its // own buffer as well, we are writing into a buffer either way (whether a // compressor is in the chain or not). That means these writes should not // block often, so we shouldn't need separate goroutines here.