From 325652678217a80b26551b35faa36baa1ee3de53 Mon Sep 17 00:00:00 2001 From: Rameez Sajwani Date: Thu, 7 Jul 2022 12:16:07 -0700 Subject: [PATCH] Backup/Restore: add support for external compressors and decompressors (#10558) * change to support an external decompressor Signed-off-by: Renan Rangel Signed-off-by: Rameez Sajwani * add external compressor support + builtin additional compressors Signed-off-by: Renan Rangel Signed-off-by: Rameez Sajwani * wrap external compressor/decompressor Signed-off-by: Renan Rangel Signed-off-by: Rameez Sajwani * go mod tidy + comments Signed-off-by: Renan Rangel Signed-off-by: Rameez Sajwani * add copyright notices Signed-off-by: Renan Rangel Signed-off-by: Rameez Sajwani * add support for builtin engine Signed-off-by: Renan Rangel Signed-off-by: Rameez Sajwani * Adding test case for buckup compression Signed-off-by: Rameez Sajwani * Fixing unit test and run mod tidy Signed-off-by: Rameez Sajwani * Removing unwanted unit tests Signed-off-by: Rameez Sajwani * Increase timeout of backup tests Signed-off-by: Rameez Sajwani * fixing linter errors Signed-off-by: Rameez Sajwani * Change test logic to accomodate running selective tests Signed-off-by: Rameez Sajwani * removing lint warning Signed-off-by: Rameez Sajwani * fixing test failure Signed-off-by: Rameez Sajwani * Removing un-necessary test Signed-off-by: Rameez Sajwani * Fixing code review feeback Signed-off-by: Rameez Sajwani * Change builtinEngine to consider 'auto' decompressor Signed-off-by: Rameez Sajwani * fixing Upgrade/Downgrade test Signed-off-by: Rameez Sajwani * Fix type & add summary under release notes Signed-off-by: Rameez Sajwani * Fixing typos in summary Signed-off-by: Rameez Sajwani * Fixing flag name typos Signed-off-by: Rameez Sajwani Co-authored-by: Renan Rangel Co-authored-by: Renan Rangel --- doc/releasenotes/15_0_0_summary.md | 28 ++ go.mod | 11 +- go.sum | 26 +- go/flags/endtoend/vttablet.txt | 12 + .../backup/mysqlctld/backup_mysqlctld_test.go | 21 +- .../backup/vtctlbackup/backup_test.go | 23 +- .../backup/vtctlbackup/backup_utils.go | 67 ++++- .../backup/xtrabackup/xtrabackup_test.go | 23 +- .../xtrabackup_stream_test.go | 21 +- go/vt/mysqlctl/builtinbackupengine.go | 70 +++-- go/vt/mysqlctl/compression.go | 283 ++++++++++++++++++ go/vt/mysqlctl/compression_test.go | 196 ++++++++++++ go/vt/mysqlctl/xtrabackupengine.go | 56 +++- go/vt/wrangler/testlib/backup_test.go | 62 +++- test/config.json | 4 +- 15 files changed, 838 insertions(+), 65 deletions(-) create mode 100644 go/vt/mysqlctl/compression.go create mode 100644 go/vt/mysqlctl/compression_test.go diff --git a/doc/releasenotes/15_0_0_summary.md b/doc/releasenotes/15_0_0_summary.md index 8bfcc2a4410..244786b743a 100644 --- a/doc/releasenotes/15_0_0_summary.md +++ b/doc/releasenotes/15_0_0_summary.md @@ -54,6 +54,34 @@ Please see the VDiff2 [documentation](https://vitess.io/docs/15.0/reference/vrep ### New command line flags and behavior +#### Support for additional compressors and decompressors during backup & restore +Backup/Restore now allow you many more options for compression and decompression instead of relying on the default compressor(pgzip). +There are some built-in compressors which you can use out-of-the-box. Users will need to evaluate which option works best for their +use-case. Here are the flags that control this feature + +- --builtin-compressor +- --builtin-decompressor +- --external-compressor +- --external-decompressor +- --external-compressor-extension +- --compression-level + +builtin compressor as of today supports the following options +- pgzip +- pargzip +- lz4 +- zstd + +If you want to use any of the builtin compressors, simply set one of the above values for `--builtin-compressor`. You don't need to set +the `--builtin-decompressor` flag in this case as we infer it automatically from the MANIFEST file. The default value for +`--builtin-decompressor` is `auto`. + +If you would like to use a custom command or external tool for compression/decompression then you need to provide the full command with +arguments to the `--external-compressor` and `--external-decompressor` flags. `--external-compressor-extension` flag also needs to be provided +so that compressed files are created with the correct extension. There is no need to override `--builtin-compressor` and `--builtin-decompressor` +when using an external compressor/decompressor. Please note that if you want the current behavior then you don't need to change anything +in these flags. You can read more about backup & restore [here] (https://vitess.io/docs/15.0/user-guides/operating-vitess/backup-and-restore/). + ### Online DDL changes #### Concurrent vitess migrations diff --git a/go.mod b/go.mod index d329c4d6f8d..37644ea0a5c 100644 --- a/go.mod +++ b/go.mod @@ -26,8 +26,8 @@ require ( github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b github.com/golang/mock v1.5.0 github.com/golang/protobuf v1.5.2 - github.com/golang/snappy v0.0.1 - github.com/google/go-cmp v0.5.6 + github.com/golang/snappy v0.0.3 + github.com/google/go-cmp v0.5.7 github.com/google/shlex v0.0.0-20191202100458-e7afc7fbc510 github.com/google/uuid v1.3.0 github.com/googleapis/gnostic v0.4.1 // indirect @@ -46,7 +46,7 @@ require ( github.com/icrowley/fake v0.0.0-20180203215853-4178557ae428 github.com/imdario/mergo v0.3.12 // indirect github.com/jmespath/go-jmespath v0.4.0 // indirect - github.com/klauspost/compress v1.11.13 // indirect + github.com/klauspost/compress v1.13.0 github.com/klauspost/pgzip v1.2.4 github.com/krishicks/yaml-patch v0.0.10 github.com/magiconair/properties v1.8.5 @@ -63,6 +63,7 @@ require ( github.com/oxtoacart/bpool v0.0.0-20190530202638-03653db5a59c // indirect github.com/patrickmn/go-cache v2.1.0+incompatible github.com/philhofer/fwd v1.0.0 // indirect + github.com/pierrec/lz4 v2.6.1+incompatible github.com/pires/go-proxyproto v0.6.1 github.com/pkg/errors v0.9.1 // indirect github.com/planetscale/pargzip v0.0.0-20201116224723-90c7fc03ea8a @@ -90,6 +91,7 @@ require ( go.etcd.io/etcd/api/v3 v3.5.0 go.etcd.io/etcd/client/pkg/v3 v3.5.0 go.etcd.io/etcd/client/v3 v3.5.0 + golang.org/x/crypto v0.0.0-20220507011949-2cf3adece122 // indirect golang.org/x/lint v0.0.0-20210508222113-6edffad5e616 golang.org/x/mod v0.5.1 // indirect golang.org/x/net v0.0.0-20220425223048-2871e0cb64e4 @@ -136,6 +138,7 @@ require ( github.com/davecgh/go-spew v1.1.1 // indirect github.com/fatih/color v1.9.0 // indirect github.com/felixge/httpsnoop v1.0.1 // indirect + github.com/frankban/quicktest v1.14.3 // indirect github.com/go-logr/logr v0.2.0 // indirect github.com/gogo/protobuf v1.3.2 // indirect github.com/golang/groupcache v0.0.0-20200121045136-8c9f03a8e57e // indirect @@ -150,7 +153,6 @@ require ( github.com/jessevdk/go-flags v1.4.0 // indirect github.com/json-iterator/go v1.1.11 // indirect github.com/jstemmer/go-junit-report v0.9.1 // indirect - github.com/kr/pretty v0.2.1 // indirect github.com/mattn/go-colorable v0.1.6 // indirect github.com/mattn/go-ieproxy v0.0.0-20190702010315-6dee0af9227d // indirect github.com/mattn/go-isatty v0.0.12 // indirect @@ -180,7 +182,6 @@ require ( go.uber.org/atomic v1.7.0 // indirect go.uber.org/multierr v1.6.0 // indirect go.uber.org/zap v1.17.0 // indirect - golang.org/x/crypto v0.0.0-20220507011949-2cf3adece122 // indirect golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 // indirect google.golang.org/appengine v1.6.7 // indirect gopkg.in/inf.v0 v0.9.1 // indirect diff --git a/go.sum b/go.sum index dee7a170245..dd899960220 100644 --- a/go.sum +++ b/go.sum @@ -179,6 +179,7 @@ github.com/corpix/uarand v0.1.1/go.mod h1:SFKZvkcRoLqVRFZ4u25xPmp6m9ktANfbpXZ7SJ github.com/cpuguy83/go-md2man v1.0.10/go.mod h1:SmD6nW6nTyfqj6ABTjUi3V3JVMnlJmwcJI5acqYI6dE= github.com/cpuguy83/go-md2man/v2 v2.0.0/go.mod h1:maD7wRr/U5Z6m/iR4s+kqSMx2CaBsrgA7czyZG/E6dU= github.com/creack/pty v1.1.7/go.mod h1:lj5s0c3V2DBrqTV7llrYr5NG6My20zk30Fl46Y7DoTY= +github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= github.com/cyphar/filepath-securejoin v0.2.3 h1:YX6ebbZCZP7VkM3scTTokDgBL2TY741X51MTk3ycuNI= github.com/cyphar/filepath-securejoin v0.2.3/go.mod h1:aPGpWjXOXUn2NCNjFvBE6aRxGGx79pTxQpKOJNYHHl4= github.com/daaku/go.zipexe v1.0.0 h1:VSOgZtH418pH9L16hC/JrgSNJbbAL26pj7lmD1+CGdY= @@ -218,6 +219,8 @@ github.com/felixge/httpsnoop v1.0.1 h1:lvB5Jl89CsZtGIWuTcDM1E/vkVs49/Ml7JJe07l8S github.com/felixge/httpsnoop v1.0.1/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U= github.com/form3tech-oss/jwt-go v3.2.2+incompatible h1:TcekIExNqud5crz4xD2pavyTgWiPvpYe4Xau31I0PRk= github.com/form3tech-oss/jwt-go v3.2.2+incompatible/go.mod h1:pbq4aXjuKjdthFRnoDwaVPLA+WlJuPGy+QneDUgJi2k= +github.com/frankban/quicktest v1.14.3 h1:FJKSZTDHjyhriyC81FLQ0LY93eSai0ZyR/ZIkd3ZUKE= +github.com/frankban/quicktest v1.14.3/go.mod h1:mgiwOwqx65TmIk1wJ6Q7wvnVMocbUorkibMOrVTHZps= github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= github.com/fsnotify/fsnotify v1.4.9 h1:hsms1Qyu0jgnwNXIxa+/V/PDsU6CfLf6CNO8H7IWoS4= github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ= @@ -327,8 +330,8 @@ github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaS github.com/golang/protobuf v1.5.1/go.mod h1:DopwsBzvsk0Fs44TXzsVbJyPhcCPeIwnvohx4u74HPM= github.com/golang/protobuf v1.5.2 h1:ROPKBNFfQgOUMifHyP+KYbvpjbdoFNs+aK7DXlji0Tw= github.com/golang/protobuf v1.5.2/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY= -github.com/golang/snappy v0.0.1 h1:Qgr9rKW7uDUkrbSmQeiDsGa8SjGyCOGtuasMWwvp2P4= -github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= +github.com/golang/snappy v0.0.3 h1:fHPg5GQYlCeLIPB9BZqMVR5nR9A+IM5zcgeTdjMYmLA= +github.com/golang/snappy v0.0.3/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/google/btree v0.0.0-20180813153112-4030bb1f1f0c/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ= github.com/google/btree v1.0.0 h1:0udJVsspx3VBr5FwtLhQQtuAsVc79tTq0ocGIPAU6qo= github.com/google/btree v1.0.0/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ= @@ -343,8 +346,8 @@ github.com/google/go-cmp v0.5.2/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/ github.com/google/go-cmp v0.5.3/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.4/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= -github.com/google/go-cmp v0.5.6 h1:BKbKCqvP6I+rmFHt06ZmyQtvB8xAkWdhFyr0ZUNZcxQ= -github.com/google/go-cmp v0.5.6/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/go-cmp v0.5.7 h1:81/ik6ipDQS2aGcBfIN5dHDB36BwrStyeAQquSYCV4o= +github.com/google/go-cmp v0.5.7/go.mod h1:n+brtR0CgQNWTVd5ZUFpTBC8YFBDLK/h/bpaJ8/DtOE= github.com/google/go-github/v27 v27.0.4/go.mod h1:/0Gr8pJ55COkmv+S/yPKCczSkUPIM/LnFyubufRNIS0= github.com/google/go-querystring v1.0.0/go.mod h1:odCYkC5MyYFN7vkCjXpyrEuKhc/BUO6wN/zVPAxq5ck= github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= @@ -493,8 +496,8 @@ github.com/julienschmidt/httprouter v1.3.0/go.mod h1:JR6WtHb+2LUe8TCKY3cZOxFyyO8 github.com/kisielk/errcheck v1.1.0/go.mod h1:EZBBE59ingxPouuu3KfxchcWSUPOHkagtvWXihfKN4Q= github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8= github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= -github.com/klauspost/compress v1.11.13 h1:eSvu8Tmq6j2psUJqJrLcWH6K3w5Dwc+qipbaA6eVEN4= -github.com/klauspost/compress v1.11.13/go.mod h1:aoV0uJVorq1K+umq18yTdKaF57EivdYsUV+/s2qKfXs= +github.com/klauspost/compress v1.13.0 h1:2T7tUoQrQT+fQWdaY5rjWztFGAFwbGD04iPJg90ZiOs= +github.com/klauspost/compress v1.13.0/go.mod h1:8dP1Hq4DHOhN9w426knH3Rhby4rFm6D8eO+e+Dq5Gzg= github.com/klauspost/pgzip v1.2.4 h1:TQ7CNpYKovDOmqzRHKxJh0BeaBI7UdQZYc6p7pMQh1A= github.com/klauspost/pgzip v1.2.4/go.mod h1:Ch1tH69qFZu15pkjo5kYi6mth2Zzwzt50oCQKQE9RUs= github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= @@ -503,12 +506,13 @@ github.com/kr/fs v0.1.0/go.mod h1:FFnZGqtBN9Gxj7eW1uZ42v5BccTP0vu6NEaFoC2HwRg= github.com/kr/logfmt v0.0.0-20140226030751-b84e30acd515/go.mod h1:+0opPa2QZZtGFBFZlji/RkVcI2GknAs/DXo4wKdlNEc= github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= github.com/kr/pretty v0.2.0/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI= -github.com/kr/pretty v0.2.1 h1:Fmg33tUaq4/8ym9TJN1x7sLJnHVwhP33CNkpYV/7rwI= -github.com/kr/pretty v0.2.1/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI= +github.com/kr/pretty v0.3.0 h1:WgNl7dwNpEZ6jJ9k1snq4pZsg7DOEN8hP9Xw0Tsjwk0= +github.com/kr/pretty v0.3.0/go.mod h1:640gp4NfQd8pI5XOwp5fnNeVWj67G7CFk/SaSQn7NBk= github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= github.com/kr/pty v1.1.5/go.mod h1:9r2w37qlBe7rQ6e1fg1S/9xpWHSnaqNdHD3WcMdbPDA= -github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= +github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= +github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= github.com/krishicks/yaml-patch v0.0.10 h1:H4FcHpnNwVmw8u0MjPRjWyIXtco6zM2F78t+57oNM3E= github.com/krishicks/yaml-patch v0.0.10/go.mod h1:Sm5TchwZS6sm7RJoyg87tzxm2ZcKzdRE4Q7TjNhPrME= github.com/magiconair/properties v1.8.0/go.mod h1:PppfXfuXeibc/6YijjN8zIbojt8czPbwD3XqdrwzmxQ= @@ -619,6 +623,8 @@ github.com/pelletier/go-toml v1.9.3/go.mod h1:u1nR/EPcESfeI/szUZKdtJ0xRNbUoANCko github.com/peterbourgon/diskv v2.0.1+incompatible/go.mod h1:uqqh8zWWbv1HBMNONnaR/tNboyR3/BZd58JJSHlUSCU= github.com/philhofer/fwd v1.0.0 h1:UbZqGr5Y38ApvM/V/jEljVxwocdweyH+vmYvRPBnbqQ= github.com/philhofer/fwd v1.0.0/go.mod h1:gk3iGcWd9+svBvR0sR+KPcfE+RNWozjowpeBVG3ZVNU= +github.com/pierrec/lz4 v2.6.1+incompatible h1:9UY3+iC23yxF0UfGaYrGplQ+79Rg+h/q9FV9ix19jjM= +github.com/pierrec/lz4 v2.6.1+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY= github.com/pires/go-proxyproto v0.6.1 h1:EBupykFmo22SDjv4fQVQd2J9NOoLPmyZA/15ldOGkPw= github.com/pires/go-proxyproto v0.6.1/go.mod h1:Odh9VFOZJCf9G8cLW5o435Xf1J95Jw9Gw5rnCjcwzAY= github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= @@ -668,6 +674,8 @@ github.com/rcrowley/go-metrics v0.0.0-20200313005456-10cdbea86bc0/go.mod h1:bCqn github.com/rogpeppe/fastuuid v0.0.0-20150106093220-6724a57986af/go.mod h1:XWv6SoW27p1b0cqNHllgS5HIMJraePCO15w5zCzIWYg= github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ= github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4= +github.com/rogpeppe/go-internal v1.6.1 h1:/FiVV8dS/e+YqF2JvO3yXRFbBLTIuSDkuC7aBOAvL+k= +github.com/rogpeppe/go-internal v1.6.1/go.mod h1:xXDCJY+GAPziupqXw64V24skbSoqbTEfhy4qGm1nDQc= github.com/russross/blackfriday v1.5.2/go.mod h1:JO/DiYxRf+HjHt06OyowR9PTA263kcR/rfWxYHBV53g= github.com/russross/blackfriday/v2 v2.0.1/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM= github.com/ryanuber/columnize v0.0.0-20160712163229-9b3edd62028f/go.mod h1:sm1tb6uqfes/u+d4ooFouqFdy9/2g9QGwK3SQygK0Ts= diff --git a/go/flags/endtoend/vttablet.txt b/go/flags/endtoend/vttablet.txt index f711cae2617..db8b824891f 100644 --- a/go/flags/endtoend/vttablet.txt +++ b/go/flags/endtoend/vttablet.txt @@ -57,6 +57,10 @@ Usage of vttablet: PITR restore parameter: TLS server name (common name) to verify against for the binlog server we are connecting to (If not set: use the hostname or IP supplied in -binlog_host). --binlog_user string PITR restore parameter: username of binlog server. + --builtin-compressor string + builtin compressor engine to use (default pgzip) + --builtin-decompressor string + builtin decompressor engine to use (default auto) --builtinbackup_mysqld_timeout duration how long to wait for mysqld to shutdown at the start of the backup (default 10m0s) --builtinbackup_progress duration @@ -65,6 +69,8 @@ Usage of vttablet: catch and ignore SIGPIPE on stdout and stderr if specified --ceph_backup_storage_config string Path to JSON config file for ceph backup storage (default ceph_backup_config.json) + --compression-level int + what level to pass to the compressor (default 1) --consul_auth_static_file string JSON File to read the topos/tokens from. --cpu_profile string @@ -401,6 +407,12 @@ Usage of vttablet: if this flag is true, vttablet will fail to start if a valid tableacl config does not exist --enforce_strict_trans_tables If true, vttablet requires MySQL to run with STRICT_TRANS_TABLES or STRICT_ALL_TABLES on. It is recommended to not turn this flag off. Otherwise MySQL may alter your supplied values before saving them to the database. (default true) + --external-compressor string + command with arguments to use when compressing a backup + --external-compressor-extension string + extension to use when using an external compressor + --external-decompressor string + command with arguments to use when decompressing a backup --file_backup_storage_root string root directory for the file backup storage --filecustomrules string diff --git a/go/test/endtoend/backup/mysqlctld/backup_mysqlctld_test.go b/go/test/endtoend/backup/mysqlctld/backup_mysqlctld_test.go index 2ee11103c72..177c1a8b8ff 100644 --- a/go/test/endtoend/backup/mysqlctld/backup_mysqlctld_test.go +++ b/go/test/endtoend/backup/mysqlctld/backup_mysqlctld_test.go @@ -19,10 +19,29 @@ package mysqlctld import ( "testing" + "vitess.io/vitess/go/vt/mysqlctl" + backup "vitess.io/vitess/go/test/endtoend/backup/vtctlbackup" ) // TestBackupMysqlctld - tests the backup using mysqlctld. func TestBackupMysqlctld(t *testing.T) { - backup.TestBackup(t, backup.Mysqlctld, "", 0) + backup.TestBackup(t, backup.Mysqlctld, "", 0, nil, nil) +} + +func TestBackupMysqlctldWithlz4Compression(t *testing.T) { + defer setDefaultCompressionFlag() + cDetails := &backup.CompressionDetails{ + BuiltinCompressor: "lz4", + } + + backup.TestBackup(t, backup.Mysqlctld, "", 0, cDetails, []string{"TestReplicaBackup", "TestPrimaryBackup"}) +} + +func setDefaultCompressionFlag() { + *mysqlctl.BuiltinCompressor = "pgzip" + *mysqlctl.BuiltinDecompressor = "auto" + *mysqlctl.ExternalCompressorCmd = "" + *mysqlctl.ExternalCompressorExt = "" + *mysqlctl.ExternalDecompressorCmd = "" } diff --git a/go/test/endtoend/backup/vtctlbackup/backup_test.go b/go/test/endtoend/backup/vtctlbackup/backup_test.go index 6f233eafeda..00e51c435e6 100644 --- a/go/test/endtoend/backup/vtctlbackup/backup_test.go +++ b/go/test/endtoend/backup/vtctlbackup/backup_test.go @@ -18,9 +18,30 @@ package vtctlbackup import ( "testing" + + "vitess.io/vitess/go/vt/mysqlctl" ) // TestBackupMain - main tests backup using vtctl commands func TestBackupMain(t *testing.T) { - TestBackup(t, Backup, "", 0) + TestBackup(t, Backup, "", 0, nil, nil) +} + +func TestBackupMainWithZstdCompression(t *testing.T) { + defer setDefaultCompressionFlag() + cDetails := &CompressionDetails{ + ExternalCompressorCmd: "zstd", + ExternalCompressorExt: ".zst", + ExternalDecompressorCmd: "zstd -d", + } + + TestBackup(t, Backup, "", 0, cDetails, []string{"TestReplicaBackup", "TestPrimaryBackup"}) +} + +func setDefaultCompressionFlag() { + *mysqlctl.BuiltinCompressor = "pgzip" + *mysqlctl.BuiltinDecompressor = "auto" + *mysqlctl.ExternalCompressorCmd = "" + *mysqlctl.ExternalCompressorExt = "" + *mysqlctl.ExternalDecompressorCmd = "" } diff --git a/go/test/endtoend/backup/vtctlbackup/backup_utils.go b/go/test/endtoend/backup/vtctlbackup/backup_utils.go index c1e629243f6..19c73659d5f 100644 --- a/go/test/endtoend/backup/vtctlbackup/backup_utils.go +++ b/go/test/endtoend/backup/vtctlbackup/backup_utils.go @@ -28,6 +28,9 @@ import ( "testing" "time" + "vitess.io/vitess/go/vt/proto/vtrpc" + "vitess.io/vitess/go/vt/vterrors" + "vitess.io/vitess/go/vt/mysqlctl" "vitess.io/vitess/go/vt/proto/topodata" @@ -78,8 +81,16 @@ var ( ) Engine=InnoDB` ) +type CompressionDetails struct { + BuiltinCompressor string + BuiltinDecompressor string + ExternalCompressorCmd string + ExternalCompressorExt string + ExternalDecompressorCmd string +} + // LaunchCluster : starts the cluster as per given params. -func LaunchCluster(setupType int, streamMode string, stripes int) (int, error) { +func LaunchCluster(setupType int, streamMode string, stripes int, cDetails *CompressionDetails) (int, error) { localCluster = cluster.NewCluster(cell, hostname) // Start topo server @@ -134,6 +145,8 @@ func LaunchCluster(setupType int, streamMode string, stripes int) (int, error) { commonTabletArg = append(commonTabletArg, xtrabackupArgs...) } + commonTabletArg = append(commonTabletArg, getCompressorArgs(cDetails)...) + var mysqlProcs []*exec.Cmd for i := 0; i < 3; i++ { tabletType := "replica" @@ -206,13 +219,40 @@ func LaunchCluster(setupType int, streamMode string, stripes int) (int, error) { return 0, nil } +func getCompressorArgs(cDetails *CompressionDetails) []string { + var args []string + + if cDetails == nil { + return args + } + + if cDetails.BuiltinCompressor != "" { + args = append(args, fmt.Sprintf("--builtin-compressor=%s", cDetails.BuiltinCompressor)) + } + if cDetails.BuiltinDecompressor != "" { + args = append(args, fmt.Sprintf("--builtin-decompressor=%s", cDetails.BuiltinDecompressor)) + } + if cDetails.ExternalCompressorCmd != "" { + args = append(args, fmt.Sprintf("--external-compressor=%s", cDetails.ExternalCompressorCmd)) + } + if cDetails.ExternalCompressorExt != "" { + args = append(args, fmt.Sprintf("--external-compressor-extension=%s", cDetails.ExternalCompressorExt)) + } + if cDetails.ExternalDecompressorCmd != "" { + args = append(args, fmt.Sprintf("--external-decompressor=%s", cDetails.ExternalDecompressorCmd)) + } + + return args + +} + // TearDownCluster shuts down all cluster processes func TearDownCluster() { localCluster.Teardown() } // TestBackup runs all the backup tests -func TestBackup(t *testing.T, setupType int, streamMode string, stripes int) { +func TestBackup(t *testing.T, setupType int, streamMode string, stripes int, cDetails *CompressionDetails, runSpecific []string) error { testMethods := []struct { name string @@ -233,7 +273,7 @@ func TestBackup(t *testing.T, setupType int, streamMode string, stripes int) { { name: "TestPrimaryBackup", method: primaryBackup, - }, // + }, { name: "TestPrimaryReplicaSameBackup", method: primaryReplicaSameBackup, @@ -253,9 +293,8 @@ func TestBackup(t *testing.T, setupType int, streamMode string, stripes int) { } defer cluster.PanicHandler(t) - // setup cluster for the testing - code, err := LaunchCluster(setupType, streamMode, stripes) + code, err := LaunchCluster(setupType, streamMode, stripes, cDetails) require.Nilf(t, err, "setup failed with status code %d", code) // Teardown the cluster @@ -264,9 +303,23 @@ func TestBackup(t *testing.T, setupType int, streamMode string, stripes int) { // Run all the backup tests for _, test := range testMethods { - t.Run(test.name, test.method) + if len(runSpecific) > 0 && !isRegistered(test.name, runSpecific) { + continue + } + if retVal := t.Run(test.name, test.method); !retVal { + return vterrors.Errorf(vtrpc.Code_UNKNOWN, "test failure: %s", test.name) + } } + return nil +} +func isRegistered(name string, runlist []string) bool { + for _, f := range runlist { + if f == name { + return true + } + } + return false } type restoreMethod func(t *testing.T, tablet *cluster.Vttablet) @@ -299,7 +352,7 @@ func primaryBackup(t *testing.T) { require.Nil(t, err) // We'll restore this on the primary later to test restores using a backup timestamp - firstBackupTimestamp := time.Now().Format(mysqlctl.BackupTimestampFormat) + firstBackupTimestamp := time.Now().UTC().Format(mysqlctl.BackupTimestampFormat) backups := localCluster.VerifyBackupCount(t, shardKsName, 1) assert.Contains(t, backups[0], primary.Alias) diff --git a/go/test/endtoend/backup/xtrabackup/xtrabackup_test.go b/go/test/endtoend/backup/xtrabackup/xtrabackup_test.go index 7dbef9df25e..20ff7841506 100644 --- a/go/test/endtoend/backup/xtrabackup/xtrabackup_test.go +++ b/go/test/endtoend/backup/xtrabackup/xtrabackup_test.go @@ -19,10 +19,31 @@ package vtctlbackup import ( "testing" + "vitess.io/vitess/go/vt/mysqlctl" + backup "vitess.io/vitess/go/test/endtoend/backup/vtctlbackup" ) // TestXtraBackup - tests the backup using xtrabackup func TestXtrabackup(t *testing.T) { - backup.TestBackup(t, backup.XtraBackup, "tar", 0) + backup.TestBackup(t, backup.XtraBackup, "tar", 0, nil, nil) +} + +func TestXtrabackWithZstdCompression(t *testing.T) { + defer setDefaultCompressionFlag() + cDetails := &backup.CompressionDetails{ + ExternalCompressorCmd: "zstd", + ExternalCompressorExt: ".zst", + ExternalDecompressorCmd: "zstd -d", + } + + backup.TestBackup(t, backup.XtraBackup, "tar", 0, cDetails, []string{"TestReplicaBackup"}) +} + +func setDefaultCompressionFlag() { + *mysqlctl.BuiltinCompressor = "pgzip" + *mysqlctl.BuiltinDecompressor = "auto" + *mysqlctl.ExternalCompressorCmd = "" + *mysqlctl.ExternalCompressorExt = "" + *mysqlctl.ExternalDecompressorCmd = "" } diff --git a/go/test/endtoend/backup/xtrabackupstream/xtrabackup_stream_test.go b/go/test/endtoend/backup/xtrabackupstream/xtrabackup_stream_test.go index 99eec5a8c53..2e41c6bee5a 100644 --- a/go/test/endtoend/backup/xtrabackupstream/xtrabackup_stream_test.go +++ b/go/test/endtoend/backup/xtrabackupstream/xtrabackup_stream_test.go @@ -19,10 +19,29 @@ package vtctlbackup import ( "testing" + "vitess.io/vitess/go/vt/mysqlctl" + backup "vitess.io/vitess/go/test/endtoend/backup/vtctlbackup" ) // TestXtrabackupStream - tests the backup using xtrabackup with xbstream mode func TestXtrabackupStream(t *testing.T) { - backup.TestBackup(t, backup.XtraBackup, "xbstream", 8) + backup.TestBackup(t, backup.XtraBackup, "xbstream", 8, nil, nil) +} + +func TestXtrabackupStreamWithlz4Compression(t *testing.T) { + defer setDefaultCompressionFlag() + cDetails := &backup.CompressionDetails{ + BuiltinCompressor: "lz4", + } + + backup.TestBackup(t, backup.XtraBackup, "xbstream", 8, cDetails, []string{"TestReplicaBackup"}) +} + +func setDefaultCompressionFlag() { + *mysqlctl.BuiltinCompressor = "pgzip" + *mysqlctl.BuiltinDecompressor = "auto" + *mysqlctl.ExternalCompressorCmd = "" + *mysqlctl.ExternalCompressorExt = "" + *mysqlctl.ExternalDecompressorCmd = "" } diff --git a/go/vt/mysqlctl/builtinbackupengine.go b/go/vt/mysqlctl/builtinbackupengine.go index 6fbadf3fea4..aa1fdec4d8d 100644 --- a/go/vt/mysqlctl/builtinbackupengine.go +++ b/go/vt/mysqlctl/builtinbackupengine.go @@ -32,9 +32,6 @@ import ( "sync/atomic" "time" - "github.com/klauspost/pgzip" - "github.com/planetscale/pargzip" - "vitess.io/vitess/go/mysql" "vitess.io/vitess/go/sync2" "vitess.io/vitess/go/vt/concurrency" @@ -78,6 +75,9 @@ type builtinBackupManifest struct { // BackupManifest is an anonymous embedding of the base manifest struct. BackupManifest + // CompressionEngine stores which compression engine was used to originally compress the files. + CompressionEngine string `json:",omitempty"` + // FileEntries contains all the files in the backup FileEntries []FileEntry @@ -351,9 +351,10 @@ func (be *BuiltinBackupEngine) backupFiles(ctx context.Context, params BackupPar }, // Builtin-specific fields - FileEntries: fes, - TransformHook: *backupStorageHook, - SkipCompress: !*backupStorageCompress, + FileEntries: fes, + TransformHook: *backupStorageHook, + SkipCompress: !*backupStorageCompress, + CompressionEngine: *BuiltinCompressor, } data, err := json.MarshalIndent(bm, "", " ") if err != nil { @@ -498,13 +499,19 @@ func (be *BuiltinBackupEngine) backupFile(ctx context.Context, params BackupPara } // Create the gzip compression pipe, if necessary. - var gzip *pargzip.Writer + var compressor io.WriteCloser if *backupStorageCompress { - gzip = pargzip.NewWriter(writer) - gzip.ChunkSize = *backupCompressBlockSize - gzip.Parallel = *backupCompressBlocks - gzip.CompressionLevel = pargzip.BestSpeed - writer = gzip + + if *ExternalCompressorCmd != "" { + compressor, err = newExternalCompressor(ctx, *ExternalCompressorCmd, writer, params.Logger) + } else { + compressor, err = newBuiltinCompressor(*BuiltinCompressor, writer, params.Logger) + } + if err != nil { + return vterrors.Wrap(err, "can't create compressor") + } + + writer = compressor } // Copy from the source file to writer (optional gzip, @@ -515,9 +522,9 @@ func (be *BuiltinBackupEngine) backupFile(ctx context.Context, params BackupPara } // Close gzip to flush it, after that all data is sent to writer. - if gzip != nil { - if err = gzip.Close(); err != nil { - return vterrors.Wrap(err, "cannot close gzip") + if compressor != nil { + if err = compressor.Close(); err != nil { + return vterrors.Wrap(err, "cannot close compressor") } } @@ -599,7 +606,16 @@ func (be *BuiltinBackupEngine) restoreFiles(ctx context.Context, params RestoreP // And restore the file. name := fmt.Sprintf("%v", i) params.Logger.Infof("Copying file %v: %v", name, fes[i].Name) - err := be.restoreFile(ctx, params, bh, &fes[i], bm.TransformHook, !bm.SkipCompress, name) + // For backward compatibility. Incase if Manifest is from N-1 binary + // then we assign the default value of compressionEngine. + if bm.CompressionEngine == "" { + bm.CompressionEngine = *BuiltinCompressor + } + var decompEngine = bm.CompressionEngine + if *BuiltinDecompressor != "auto" { + decompEngine = *BuiltinDecompressor + } + err := be.restoreFile(ctx, params, bh, &fes[i], bm.TransformHook, !bm.SkipCompress, decompEngine, name) if err != nil { rec.RecordError(vterrors.Wrapf(err, "can't restore file %v to %v", name, fes[i].Name)) } @@ -610,7 +626,7 @@ func (be *BuiltinBackupEngine) restoreFiles(ctx context.Context, params RestoreP } // restoreFile restores an individual file. -func (be *BuiltinBackupEngine) restoreFile(ctx context.Context, params RestoreParams, bh backupstorage.BackupHandle, fe *FileEntry, transformHook string, compress bool, name string) (finalErr error) { +func (be *BuiltinBackupEngine) restoreFile(ctx context.Context, params RestoreParams, bh backupstorage.BackupHandle, fe *FileEntry, transformHook string, compress bool, deCompressionEngine string, name string) (finalErr error) { // Open the source file for reading. source, err := bh.ReadFile(ctx, name) if err != nil { @@ -653,21 +669,29 @@ func (be *BuiltinBackupEngine) restoreFile(ctx context.Context, params RestorePa // Create the uncompresser if needed. if compress { - gz, err := pgzip.NewReader(reader) + var decompressor io.ReadCloser + + if *ExternalDecompressorCmd != "" { + decompressor, err = newExternalDecompressor(ctx, *ExternalDecompressorCmd, reader, params.Logger) + } else { + decompressor, err = newBuiltinDecompressor(deCompressionEngine, reader, params.Logger) + } if err != nil { - return vterrors.Wrap(err, "can't open gzip decompressor") + return vterrors.Wrap(err, "can't create decompressor") } + defer func() { - if cerr := gz.Close(); cerr != nil { + if cerr := decompressor.Close(); cerr != nil { + params.Logger.Errorf("failed to close decompressor: %v", cerr) if finalErr != nil { // We already have an error, just log this one. - log.Errorf("failed to close gzip decompressor %v: %v", name, cerr) + log.Errorf("failed to close decompressor %v: %v", name, cerr) } else { - finalErr = vterrors.Wrap(err, "failed to close gzip decompressor") + finalErr = vterrors.Wrap(cerr, "failed to close decompressor") } } }() - reader = gz + reader = decompressor } // Copy the data. Will also write to the hasher. diff --git a/go/vt/mysqlctl/compression.go b/go/vt/mysqlctl/compression.go new file mode 100644 index 00000000000..c158f327391 --- /dev/null +++ b/go/vt/mysqlctl/compression.go @@ -0,0 +1,283 @@ +/* +Copyright 2021 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package mysqlctl + +import ( + "context" + "errors" + "flag" + "fmt" + "io" + "io/ioutil" + "os/exec" + "sync" + + "github.com/google/shlex" + "github.com/klauspost/compress/zstd" + "github.com/klauspost/pgzip" + "github.com/pierrec/lz4" + "github.com/planetscale/pargzip" + + "vitess.io/vitess/go/vt/logutil" + "vitess.io/vitess/go/vt/vterrors" +) + +var ( + compressionLevel = flag.Int("compression-level", 1, "what level to pass to the compressor") + // switch which compressor/decompressor to use + BuiltinCompressor = flag.String("builtin-compressor", "pgzip", "builtin compressor engine to use") + BuiltinDecompressor = flag.String("builtin-decompressor", "auto", "builtin decompressor engine to use") + // use and external command to decompress the backups + ExternalCompressorCmd = flag.String("external-compressor", "", "command with arguments to use when compressing a backup") + ExternalCompressorExt = flag.String("external-compressor-extension", "", "extension to use when using an external compressor") + ExternalDecompressorCmd = flag.String("external-decompressor", "", "command with arguments to use when decompressing a backup") + + errUnsupportedCompressionEngine = errors.New("unsupported engine") + errUnsupportedCompressionExtension = errors.New("unsupported extension") + + // this is used by getEngineFromExtension() to figure out which engine to use in case the user didn't specify + engineExtensions = map[string][]string{ + ".gz": {"pgzip", "pargzip"}, + ".lz4": {"lz4"}, + ".zst": {"zstd"}, + } +) + +func getEngineFromExtension(extension string) (string, error) { + for ext, eng := range engineExtensions { + if ext == extension { + return eng[0], nil // we select the first supported engine in auto mode + } + } + return "", fmt.Errorf("%w %q", errUnsupportedCompressionExtension, extension) +} + +func getExtensionFromEngine(engine string) (string, error) { + for ext, eng := range engineExtensions { + for _, e := range eng { + if e == engine { + return ext, nil + } + } + } + return "", fmt.Errorf("%w %q", errUnsupportedCompressionEngine, engine) +} + +// Validates if the external decompressor exists and return its path. +func validateExternalCmd(cmd string) (string, error) { + if cmd == "" { + return "", errors.New("external command is empty") + } + return exec.LookPath(cmd) +} + +func prepareExternalCompressionCmd(ctx context.Context, cmdStr string) (*exec.Cmd, error) { + cmdArgs, err := shlex.Split(cmdStr) + if err != nil { + return nil, err + } + if len(cmdArgs) < 1 { + return nil, errors.New("external command is empty") + } + cmdPath, err := validateExternalCmd(cmdArgs[0]) + if err != nil { + return nil, err + } + return exec.CommandContext(ctx, cmdPath, cmdArgs[1:]...), nil +} + +// This returns a writer that writes the compressed output of the external command to the provided writer. +func newExternalCompressor(ctx context.Context, cmdStr string, writer io.Writer, logger logutil.Logger) (io.WriteCloser, error) { + logger.Infof("Compressing using external command: %q", cmdStr) + + cmd, err := prepareExternalCompressionCmd(ctx, cmdStr) + if err != nil { + return nil, vterrors.Wrap(err, "unable to start external command") + } + compressor := &externalCompressor{cmd: cmd} + cmd.Stdout = writer + cmdIn, err := cmd.StdinPipe() + if err != nil { + return nil, vterrors.Wrap(err, "cannot create external ompressor stdin pipe") + } + compressor.stdin = cmdIn + cmdErr, err := cmd.StderrPipe() + if err != nil { + return nil, vterrors.Wrap(err, "cannot create external ompressor stderr pipe") + } + + if err := cmd.Start(); err != nil { + return nil, vterrors.Wrap(err, "can't start external decompressor") + } + + compressor.wg.Add(1) // we wait for the gorouting to finish when we call Close() on the writer + go scanLinesToLogger("compressor stderr", cmdErr, logger, compressor.wg.Done) + return compressor, nil +} + +// This returns a reader that reads the compressed input and passes it to the external command to be decompressed. Calls to its +// Read() will return the uncompressed data until EOF. +func newExternalDecompressor(ctx context.Context, cmdStr string, reader io.Reader, logger logutil.Logger) (io.ReadCloser, error) { + logger.Infof("Decompressing using external command: %q", cmdStr) + + cmd, err := prepareExternalCompressionCmd(ctx, cmdStr) + if err != nil { + return nil, vterrors.Wrap(err, "unable to start external command") + } + decompressor := &externalDecompressor{cmd: cmd} + cmd.Stdin = reader + cmdOut, err := cmd.StdoutPipe() + if err != nil { + return nil, vterrors.Wrap(err, "cannot create external decompressor stdout pipe") + } + decompressor.stdout = cmdOut + cmdErr, err := cmd.StderrPipe() + if err != nil { + return nil, vterrors.Wrap(err, "cannot create external decompressor stderr pipe") + } + + if err := cmd.Start(); err != nil { + return nil, vterrors.Wrap(err, "can't start external decompressor") + } + + decompressor.wg.Add(1) // we wait for the gorouting to finish when we call Close() on the reader + go scanLinesToLogger("decompressor stderr", cmdErr, logger, decompressor.wg.Done) + return decompressor, nil +} + +// This is a wrapper to get the right decompressor (see below) based on the extension of the file. +func newBuiltinDecompressorFromExtension(extension, engine string, reader io.Reader, logger logutil.Logger) (decompressor io.ReadCloser, err error) { + // we only infer the engine from the extension is set to "auto", otherwise we use whatever the user selected + if engine == "auto" { + logger.Infof("Builtin decompressor set to auto, checking which engine to decompress based on the extension") + + eng, err := getEngineFromExtension(extension) + if err != nil { + return decompressor, err + } + engine = eng + } + return newBuiltinDecompressor(engine, reader, logger) +} + +// This returns a reader that will decompress the underlying provided reader and will use the specified supported engine. +func newBuiltinDecompressor(engine string, reader io.Reader, logger logutil.Logger) (decompressor io.ReadCloser, err error) { + if engine == "pargzip" { + logger.Warningf("engine \"pargzip\" doesn't support decompression, using \"pgzip\" instead") + engine = "pgzip" + } + + switch engine { + case "pgzip": + d, err := pgzip.NewReader(reader) + if err != nil { + return nil, err + } + decompressor = d + case "lz4": + decompressor = ioutil.NopCloser(lz4.NewReader(reader)) + case "zstd": + d, err := zstd.NewReader(reader) + if err != nil { + return nil, err + } + decompressor = d.IOReadCloser() + default: + err = fmt.Errorf("Unkown decompressor engine: %q", engine) + return decompressor, err + } + + logger.Infof("Decompressing backup using engine %q", engine) + return decompressor, err +} + +// This returns a writer that will compress the data using the specified engine before writing to the underlying writer. +func newBuiltinCompressor(engine string, writer io.Writer, logger logutil.Logger) (compressor io.WriteCloser, err error) { + switch engine { + case "pgzip": + gzip, err := pgzip.NewWriterLevel(writer, *compressionLevel) + if err != nil { + return compressor, vterrors.Wrap(err, "cannot create gzip compressor") + } + gzip.SetConcurrency(*backupCompressBlockSize, *backupCompressBlocks) + compressor = gzip + case "pargzip": + gzip := pargzip.NewWriter(writer) + gzip.ChunkSize = *backupCompressBlockSize + gzip.Parallel = *backupCompressBlocks + gzip.CompressionLevel = *compressionLevel + compressor = gzip + case "lz4": + lz4Writer := lz4.NewWriter(writer).WithConcurrency(*backupCompressBlocks) + lz4Writer.Header = lz4.Header{ + CompressionLevel: *compressionLevel, + } + compressor = lz4Writer + case "zstd": + zst, err := zstd.NewWriter(writer, zstd.WithEncoderLevel(zstd.EncoderLevel(*compressionLevel))) + if err != nil { + return compressor, vterrors.Wrap(err, "cannot create zstd compressor") + } + compressor = zst + default: + err = fmt.Errorf("Unkown compressor engine: %q", engine) + return compressor, err + } + + logger.Infof("Compressing backup using engine %q", engine) + return +} + +// This struct wraps the underlying exec.Cmd and implements the io.WriteCloser interface. +type externalCompressor struct { + cmd *exec.Cmd + stdin io.WriteCloser + wg sync.WaitGroup +} + +func (e *externalCompressor) Write(p []byte) (n int, err error) { + return e.stdin.Write(p) +} + +func (e *externalCompressor) Close() error { + if err := e.stdin.Close(); err != nil { + return err + } + + // wait for the stderr to finish reading as well + e.wg.Wait() + return e.cmd.Wait() +} + +// This struct wraps the underlying exec.Cmd and implements the io.ReadCloser interface. +type externalDecompressor struct { + cmd *exec.Cmd + stdout io.ReadCloser + wg sync.WaitGroup +} + +func (e *externalDecompressor) Read(p []byte) (n int, err error) { + return e.stdout.Read(p) +} + +func (e *externalDecompressor) Close() error { + // wait for the stderr to finish reading as well + e.wg.Wait() + + // exec.Cmd.Wait() will also close the stdout pipe, so we don't need to call it directly + return e.cmd.Wait() +} diff --git a/go/vt/mysqlctl/compression_test.go b/go/vt/mysqlctl/compression_test.go new file mode 100644 index 00000000000..2c3d8827228 --- /dev/null +++ b/go/vt/mysqlctl/compression_test.go @@ -0,0 +1,196 @@ +/* +Copyright 2021 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package mysqlctl + +import ( + "bytes" + "context" + "errors" + "fmt" + "io" + "reflect" + "strings" + "testing" + "time" + + "vitess.io/vitess/go/vt/logutil" +) + +func TestGetExtensionFromEngine(t *testing.T) { + tests := []struct { + engine, extension string + err error + }{ + {"pgzip", ".gz", nil}, + {"pargzip", ".gz", nil}, + {"lz4", ".lz4", nil}, + {"zstd", ".zst", nil}, + {"foobar", "", errUnsupportedCompressionEngine}, + } + + for _, tt := range tests { + t.Run(tt.engine, func(t *testing.T) { + ext, err := getExtensionFromEngine(tt.engine) + // if err != tt.err { + if !errors.Is(err, tt.err) { + t.Errorf("got err: %v; expected: %v", err, tt.err) + } + // } + + if ext != tt.extension { + t.Errorf("got err: %v; expected: %v", ext, tt.extension) + } + }) + } +} + +func TestBuiltinCompressors(t *testing.T) { + data := []byte("foo bar foobar") + logger := logutil.NewMemoryLogger() + + for _, engine := range []string{"pgzip", "lz4", "zstd"} { + t.Run(engine, func(t *testing.T) { + var compressed, decompressed bytes.Buffer + reader := bytes.NewReader(data) + compressor, err := newBuiltinCompressor(engine, &compressed, logger) + if err != nil { + t.Fatal(err) + } + _, err = io.Copy(compressor, reader) + if err != nil { + t.Error(err) + return + } + compressor.Close() + decompressor, err := newBuiltinDecompressor(engine, &compressed, logger) + if err != nil { + t.Error(err) + return + } + _, err = io.Copy(&decompressed, decompressor) + if err != nil { + t.Error(err) + return + } + decompressor.Close() + if len(data) != len(decompressed.Bytes()) { + t.Errorf("Different size of original (%d bytes) and uncompressed (%d bytes) data", len(data), len(decompressed.Bytes())) + } + + if !reflect.DeepEqual(data, decompressed.Bytes()) { + t.Error("decompressed content differs from the original") + } + }) + } +} + +func TestExternalCompressors(t *testing.T) { + data := []byte("foo bar foobar") + logger := logutil.NewMemoryLogger() + + tests := []struct { + compress, decompress string + }{ + {"gzip", "gzip -d"}, + {"pigz", "pigz -d"}, + {"lz4", "lz4 -d"}, + {"zstd", "zstd -d"}, + {"lzop", "lzop -d"}, + {"bzip2", "bzip2 -d"}, + {"lzma", "lzma -d"}, + } + + for _, tt := range tests { + t.Run(tt.compress, func(t *testing.T) { + var compressed, decompressed bytes.Buffer + reader := bytes.NewReader(data) + for _, cmd := range []string{tt.compress, tt.decompress} { + cmdArgs := strings.Split(cmd, " ") + + _, err := validateExternalCmd(cmdArgs[0]) + if err != nil { + t.Skip("Command not available in this host:", err) + } + } + ctx, cancel := context.WithTimeout(context.Background(), time.Second*10) + defer cancel() + compressor, err := newExternalCompressor(ctx, tt.compress, &compressed, logger) + if err != nil { + t.Error(err) + return + } + _, err = io.Copy(compressor, reader) + if err != nil { + t.Error(err) + return + } + compressor.Close() + decompressor, err := newExternalDecompressor(ctx, tt.decompress, &compressed, logger) + if err != nil { + t.Error(err) + return + } + _, err = io.Copy(&decompressed, decompressor) + if err != nil { + t.Error(err) + return + } + decompressor.Close() + if len(data) != len(decompressed.Bytes()) { + t.Errorf("Different size of original (%d bytes) and uncompressed (%d bytes) data", len(data), len(decompressed.Bytes())) + } + if !reflect.DeepEqual(data, decompressed.Bytes()) { + t.Error("decompressed content differs from the original") + } + + }) + } +} + +func TestValidateExternalCmd(t *testing.T) { + tests := []struct { + cmdName string + path string + errStr string + }{ + // this should not find an executable + {"non_existent_cmd", "", "executable file not found"}, + // we expect ls to be on PATH as it is a basic command part of busybox and most containers + {"ls", "ls", ""}, + } + + for i, tt := range tests { + t.Run(fmt.Sprintf("Test #%d", i+1), func(t *testing.T) { + CmdName := tt.cmdName + path, err := validateExternalCmd(CmdName) + if tt.path != "" { + if !strings.HasSuffix(path, tt.path) { + t.Errorf("Expected path \"%s\" to include \"%s\"", path, tt.path) + } + } + if tt.errStr == "" { + if err != nil { + t.Errorf("Expected result \"%v\", got \"%v\"", "", err) + } + } else { + if !strings.Contains(fmt.Sprintf("%v", err), tt.errStr) { + t.Errorf("Expected result \"%v\", got \"%v\"", tt.errStr, err) + } + } + }) + } +} diff --git a/go/vt/mysqlctl/xtrabackupengine.go b/go/vt/mysqlctl/xtrabackupengine.go index 18ced1b375b..e594a582212 100644 --- a/go/vt/mysqlctl/xtrabackupengine.go +++ b/go/vt/mysqlctl/xtrabackupengine.go @@ -26,14 +26,12 @@ import ( "os" "os/exec" "path" + "path/filepath" "regexp" "strings" "sync" "time" - "github.com/klauspost/pgzip" - "github.com/planetscale/pargzip" - "vitess.io/vitess/go/mysql" "vitess.io/vitess/go/vt/logutil" "vitess.io/vitess/go/vt/mysqlctl/backupstorage" @@ -108,7 +106,16 @@ func (be *XtrabackupEngine) backupFileName() string { fileName += *xtrabackupStreamMode } if *backupStorageCompress { - fileName += ".gz" + if *ExternalDecompressorCmd != "" { + fileName += *ExternalCompressorExt + } else { + if ext, err := getExtensionFromEngine(*BuiltinCompressor); err != nil { + // there is a check for this, but just in case that fails, we set a extension to the file + fileName += ".unknown" + } else { + fileName += ext + } + } } return fileName } @@ -130,6 +137,13 @@ func (be *XtrabackupEngine) ExecuteBackup(ctx context.Context, params BackupPara if *xtrabackupUser == "" { return false, vterrors.New(vtrpc.Code_INVALID_ARGUMENT, "xtrabackupUser must be specified.") } + + // an extension is required when using an external compressor + if *backupStorageCompress && *ExternalCompressorCmd != "" && *ExternalCompressorExt == "" { + return false, vterrors.New(vtrpc.Code_INVALID_ARGUMENT, + "flag --external-compressor-extension not provided when using an external compressor") + } + // use a mysql connection to detect flavor at runtime conn, err := params.Mysqld.GetDbaConnection(ctx) if conn != nil && err == nil { @@ -147,6 +161,7 @@ func (be *XtrabackupEngine) ExecuteBackup(ctx context.Context, params BackupPara params.Logger.Infof("Detected MySQL flavor: %v", flavor) backupFileName := be.backupFileName() + params.Logger.Infof("backup file name: %s", backupFileName) numStripes := int(*xtrabackupStripes) // Perform backups in a separate function, so deferred calls to Close() are @@ -279,10 +294,17 @@ func (be *XtrabackupEngine) backupFiles(ctx context.Context, params BackupParams // Create the gzip compression pipe, if necessary. if *backupStorageCompress { - compressor := pargzip.NewWriter(writer) - compressor.ChunkSize = *backupCompressBlockSize - compressor.Parallel = *backupCompressBlocks - compressor.CompressionLevel = pargzip.BestSpeed + var compressor io.WriteCloser + + if *ExternalCompressorCmd != "" { + compressor, err = newExternalCompressor(ctx, *ExternalCompressorCmd, writer, params.Logger) + } else { + compressor, err = newBuiltinCompressor(*BuiltinCompressor, writer, params.Logger) + } + if err != nil { + return replicationPosition, vterrors.Wrap(err, "can't create compressor") + } + writer = compressor destCompressors = append(destCompressors, compressor) } @@ -343,7 +365,7 @@ func (be *XtrabackupEngine) backupFiles(ctx context.Context, params BackupParams // Close compressor to flush it. After that all data is sent to the buffer. for _, compressor := range destCompressors { if err := compressor.Close(); err != nil { - return replicationPosition, vterrors.Wrap(err, "cannot close gzip compressor") + return replicationPosition, vterrors.Wrap(err, "cannot close compressor") } } @@ -510,6 +532,9 @@ func (be *XtrabackupEngine) extractFiles(ctx context.Context, logger logutil.Log baseFileName = be.backupFileName() } + logger.Infof("backup file name: %s", baseFileName) + extension := filepath.Ext(baseFileName) + // Open the source files for reading. srcFiles, err := readStripeFiles(ctx, bh, baseFileName, int(bm.NumStripes), logger) if err != nil { @@ -528,10 +553,17 @@ func (be *XtrabackupEngine) extractFiles(ctx context.Context, logger logutil.Log // Create the decompressor if needed. if compressed { - decompressor, err := pgzip.NewReader(reader) + var decompressor io.ReadCloser + + if *ExternalDecompressorCmd != "" { + decompressor, err = newExternalDecompressor(ctx, *ExternalDecompressorCmd, reader, logger) + } else { + decompressor, err = newBuiltinDecompressorFromExtension(extension, *BuiltinDecompressor, reader, logger) + } if err != nil { - return vterrors.Wrap(err, "can't create gzip decompressor") + return vterrors.Wrap(err, "can't create decompressor") } + srcDecompressors = append(srcDecompressors, decompressor) reader = decompressor } @@ -541,7 +573,7 @@ func (be *XtrabackupEngine) extractFiles(ctx context.Context, logger logutil.Log defer func() { for _, decompressor := range srcDecompressors { if cerr := decompressor.Close(); cerr != nil { - logger.Errorf("failed to close gzip decompressor: %v", cerr) + logger.Errorf("failed to close decompressor: %v", cerr) } } }() diff --git a/go/vt/wrangler/testlib/backup_test.go b/go/vt/wrangler/testlib/backup_test.go index e77e2353d98..d0f6476b31f 100644 --- a/go/vt/wrangler/testlib/backup_test.go +++ b/go/vt/wrangler/testlib/backup_test.go @@ -24,9 +24,8 @@ import ( "testing" "time" - "github.com/stretchr/testify/require" - "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" "vitess.io/vitess/go/vt/discovery" @@ -46,7 +45,43 @@ import ( topodatapb "vitess.io/vitess/go/vt/proto/topodata" ) +type compressionDetails struct { + BuiltinCompressor string + BuiltinDecompressor string + ExternalCompressorCmd string + ExternalCompressorExt string + ExternalDecompressorCmd string +} + func TestBackupRestore(t *testing.T) { + defer setDefaultCompressionFlag() + err := testBackupRestore(t, nil) + require.NoError(t, err) +} + +// TODO: @rameez. I was expecting this test to fail but it turns out +// we infer decompressor through compression engine in builtinEngine. +// It is only in xtrabackup where we infer decompressor through extension & BuiltinDecompressor param. +func TestBackupRestoreWithPargzip(t *testing.T) { + defer setDefaultCompressionFlag() + cDetails := &compressionDetails{ + BuiltinCompressor: "pargzip", + BuiltinDecompressor: "lz4", + } + + err := testBackupRestore(t, cDetails) + require.ErrorContains(t, err, "lz4: bad magic number") +} + +func setDefaultCompressionFlag() { + *mysqlctl.BuiltinCompressor = "pgzip" + *mysqlctl.BuiltinDecompressor = "auto" + *mysqlctl.ExternalCompressorCmd = "" + *mysqlctl.ExternalCompressorExt = "" + *mysqlctl.ExternalDecompressorCmd = "" +} + +func testBackupRestore(t *testing.T, cDetails *compressionDetails) error { delay := discovery.GetTabletPickerRetryDelay() defer func() { discovery.SetTabletPickerRetryDelay(delay) @@ -82,6 +117,23 @@ func TestBackupRestore(t *testing.T) { fbsRoot := path.Join(root, "fbs") *filebackupstorage.FileBackupStorageRoot = fbsRoot *backupstorage.BackupStorageImplementation = "file" + if cDetails != nil { + if cDetails.BuiltinCompressor != "" { + *mysqlctl.BuiltinCompressor = cDetails.BuiltinCompressor + } + if cDetails.BuiltinDecompressor != "" { + *mysqlctl.BuiltinDecompressor = cDetails.BuiltinDecompressor + } + if cDetails.ExternalCompressorCmd != "" { + *mysqlctl.ExternalCompressorCmd = cDetails.ExternalCompressorCmd + } + if cDetails.ExternalCompressorExt != "" { + *mysqlctl.ExternalCompressorExt = cDetails.ExternalCompressorExt + } + if cDetails.ExternalDecompressorCmd != "" { + *mysqlctl.ExternalDecompressorCmd = cDetails.ExternalDecompressorCmd + } + } // Initialize the fake mysql root directories sourceInnodbDataDir := path.Join(root, "source_innodb_data") @@ -198,7 +250,10 @@ func TestBackupRestore(t *testing.T) { RelayLogInfoPath: path.Join(root, "relay-log.info"), } - require.NoError(t, destTablet.TM.RestoreData(ctx, logutil.NewConsoleLogger(), 0 /* waitForBackupInterval */, false /* deleteBeforeRestore */, time.Time{} /* backupTime */)) + err := destTablet.TM.RestoreData(ctx, logutil.NewConsoleLogger(), 0 /* waitForBackupInterval */, false /* deleteBeforeRestore */, time.Time{} /* backupTime */) + if err != nil { + return err + } // verify the full status require.NoError(t, destTablet.FakeMysqlDaemon.CheckSuperQueryList(), "destTablet.FakeMysqlDaemon.CheckSuperQueryList failed") assert.True(t, destTablet.FakeMysqlDaemon.Replicating) @@ -253,6 +308,7 @@ func TestBackupRestore(t *testing.T) { assert.Equal(t, topodatapb.TabletType_PRIMARY, primary.Tablet.Type) assert.False(t, primary.FakeMysqlDaemon.Replicating) assert.True(t, primary.FakeMysqlDaemon.Running) + return nil } // TestBackupRestoreLagged tests the changes made in https://github.com/vitessio/vitess/pull/5000 diff --git a/test/config.json b/test/config.json index c2269440d24..8348a911e29 100644 --- a/test/config.json +++ b/test/config.json @@ -93,7 +93,7 @@ }, "backup": { "File": "unused.go", - "Args": ["vitess.io/vitess/go/test/endtoend/backup/vtctlbackup"], + "Args": ["vitess.io/vitess/go/test/endtoend/backup/vtctlbackup", "-timeout", "15m"], "Command": [], "Manual": false, "Shard": "vtctlbackup_sharded_clustertest_heavy", @@ -102,7 +102,7 @@ }, "backup_mysqlctld": { "File": "unused.go", - "Args": ["vitess.io/vitess/go/test/endtoend/backup/mysqlctld"], + "Args": ["vitess.io/vitess/go/test/endtoend/backup/mysqlctld", "-timeout", "15m"], "Command": [], "Manual": false, "Shard": "21",