From 1134755c1a5ca0626b2e9815e796b6d107c74fe8 Mon Sep 17 00:00:00 2001 From: Abhishek Dasgupta Date: Tue, 5 Jan 2021 11:02:07 +0530 Subject: [PATCH] Added compactor subcommand. --- .gitignore | 1 + cmd/compact.go | 62 +++++ cmd/initializer.go | 4 +- cmd/miscellaneous.go | 51 +++- cmd/options.go | 70 +++++- cmd/restore.go | 40 +-- cmd/root.go | 1 + pkg/compactor/compactor.go | 228 ++++++++++++++++++ pkg/compactor/compactor_suite_test.go | 107 ++++++++ pkg/compactor/compactor_test.go | 151 ++++++++++++ pkg/compressor/compressor.go | 6 +- pkg/defragmentor/defrag_test.go | 2 +- pkg/initializer/initializer.go | 5 +- pkg/initializer/types.go | 4 +- pkg/initializer/validator/datavalidator.go | 8 +- pkg/miscellaneous/miscellaneous.go | 43 ++++ pkg/server/backuprestoreserver.go | 8 +- pkg/server/init.go | 4 +- pkg/server/types.go | 4 +- pkg/snapshot/restorer/init.go | 91 ------- pkg/snapshot/restorer/restorer.go | 103 +++----- pkg/snapshot/restorer/restorer_suite_test.go | 35 +-- pkg/snapshot/restorer/restorer_test.go | 72 +++--- pkg/snapshot/restorer/types_test.go | 45 ++-- pkg/types/compactor.go | 54 +++++ .../restorer/types.go => types/restorer.go} | 151 ++++++++---- test/utils/utils.go | 33 +++ vendor/github.com/onsi/gomega/go.mod | 2 + 28 files changed, 1033 insertions(+), 352 deletions(-) create mode 100644 cmd/compact.go create mode 100644 pkg/compactor/compactor.go create mode 100644 pkg/compactor/compactor_suite_test.go create mode 100644 pkg/compactor/compactor_test.go delete mode 100644 pkg/snapshot/restorer/init.go create mode 100644 pkg/types/compactor.go rename pkg/{snapshot/restorer/types.go => types/restorer.go} (50%) mode change 100755 => 100644 diff --git a/.gitignore b/.gitignore index c6d937e0e..7da04a201 100644 --- a/.gitignore +++ b/.gitignore @@ -22,6 +22,7 @@ test/output test/e2e_test_data default.bkp* default.etcd* +compctedsnap.bkp* # IDE config .vscode diff --git a/cmd/compact.go b/cmd/compact.go new file mode 100644 index 000000000..a48c34786 --- /dev/null +++ b/cmd/compact.go @@ -0,0 +1,62 @@ +// Copyright (c) 2021 SAP SE or an SAP affiliate company. All rights reserved. This file is licensed under the Apache Software License, v. 2 except as noted otherwise in the LICENSE file. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package cmd + +import ( + "context" + "fmt" + + "github.com/gardener/etcd-backup-restore/pkg/compactor" + "github.com/sirupsen/logrus" + "github.com/spf13/cobra" +) + +// NewCompactCommand compacts the ETCD instance +func NewCompactCommand(ctx context.Context) *cobra.Command { + opts := newCompactOptions() + // compactCmd represents the restore command + compactCmd := &cobra.Command{ + Use: "compact", + Short: "compacts multiple incremental snapshots in etcd backup into a single full snapshot", + Long: fmt.Sprintf(`Compacts an existing backup stored in snapshot store.`), + Run: func(cmd *cobra.Command, args []string) { + /* Compact operation + - Restore from all the latest snapshots (Base + Delta). + - Compact the newly created embedded ETCD instance. + - Defragment + - Save the snapshot + */ + logger := logrus.New() + + options, store, err := BuildRestoreOptionsAndStore(opts) + if err != nil { + return + } + + cp := compactor.NewCompactor(store, logrus.NewEntry(logger)) + res, err := cp.Compact(options, opts.needDefragmentation) + if err != nil { + logger.Fatalf("Failed to restore snapshot: %v", err) + return + } + // logger.Infof("Compacted snapshot is in: %v", filepath.Join(opts.snapstoreConfig.Container, res.Snapshot.SnapDir, res.Snapshot.SnapName)) + logger.Infof("Compacted snapshot is in: %v", res.Path) + + }, + } + + opts.addFlags(compactCmd.Flags()) + return compactCmd +} diff --git a/cmd/initializer.go b/cmd/initializer.go index 0a3d90bfb..b5504d211 100644 --- a/cmd/initializer.go +++ b/cmd/initializer.go @@ -20,7 +20,7 @@ import ( "github.com/gardener/etcd-backup-restore/pkg/initializer" "github.com/gardener/etcd-backup-restore/pkg/initializer/validator" - "github.com/gardener/etcd-backup-restore/pkg/snapshot/restorer" + brtypes "github.com/gardener/etcd-backup-restore/pkg/types" "github.com/sirupsen/logrus" "github.com/spf13/cobra" "go.etcd.io/etcd/pkg/types" @@ -64,7 +64,7 @@ func NewInitializeCommand(ctx context.Context) *cobra.Command { logger.Fatal("validation-mode can only be one of these values [full/sanity]") } - restoreOptions := &restorer.RestoreOptions{ + restoreOptions := &brtypes.RestoreOptions{ Config: opts.restorerOptions.restorationConfig, ClusterURLs: clusterUrlsMap, PeerURLs: peerUrls, diff --git a/cmd/miscellaneous.go b/cmd/miscellaneous.go index 0a0d95c6a..1dd594109 100644 --- a/cmd/miscellaneous.go +++ b/cmd/miscellaneous.go @@ -1,4 +1,4 @@ -// Copyright (c) 2018 SAP SE or an SAP affiliate company. All rights reserved. This file is licensed under the Apache Software License, v. 2 except as noted otherwise in the LICENSE file. +// Copyright (c) 2021 SAP SE or an SAP affiliate company. All rights reserved. This file is licensed under the Apache Software License, v. 2 except as noted otherwise in the LICENSE file. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -15,9 +15,14 @@ package cmd import ( + "fmt" "runtime" + "github.com/gardener/etcd-backup-restore/pkg/miscellaneous" + "github.com/gardener/etcd-backup-restore/pkg/snapstore" + brtypes "github.com/gardener/etcd-backup-restore/pkg/types" ver "github.com/gardener/etcd-backup-restore/pkg/version" + "go.etcd.io/etcd/pkg/types" ) func printVersionInfo() { @@ -26,3 +31,47 @@ func printVersionInfo() { logger.Infof("Go Version: %s", runtime.Version()) logger.Infof("Go OS/Arch: %s/%s", runtime.GOOS, runtime.GOARCH) } + +// BuildRestoreOptionsAndStore forms the RestoreOptions and Store object +func BuildRestoreOptionsAndStore(opts restoreOpts) (*brtypes.RestoreOptions, snapstore.SnapStore, error) { + if err := opts.validate(); err != nil { + logger.Fatalf("failed to validate the options: %v", err) + return nil, nil, err + } + + opts.complete() + + clusterUrlsMap, err := types.NewURLsMap(opts.getRestorationConfig().InitialCluster) + if err != nil { + logger.Fatalf("failed creating url map for restore cluster: %v", err) + } + + peerUrls, err := types.NewURLs(opts.getRestorationConfig().InitialAdvertisePeerURLs) + if err != nil { + logger.Fatalf("failed parsing peers urls for restore cluster: %v", err) + } + + store, err := snapstore.GetSnapstore(opts.getSnapstoreConfig()) + if err != nil { + logger.Fatalf("failed to create restore snapstore from configured storage provider: %v", err) + } + + logger.Info("Finding latest set of snapshot to recover from...") + baseSnap, deltaSnapList, err := miscellaneous.GetLatestFullSnapshotAndDeltaSnapList(store) + if err != nil { + logger.Fatalf("failed to get latest snapshot: %v", err) + } + + if baseSnap == nil { + logger.Infof("No base snapshot found. Will do nothing.") + return nil, nil, fmt.Errorf("No base snapshot found") + } + + return &brtypes.RestoreOptions{ + Config: opts.getRestorationConfig(), + BaseSnapshot: baseSnap, + DeltaSnapList: deltaSnapList, + ClusterURLs: clusterUrlsMap, + PeerURLs: peerUrls, + }, store, nil +} diff --git a/cmd/options.go b/cmd/options.go index 28e0a4e5c..109a3c69e 100644 --- a/cmd/options.go +++ b/cmd/options.go @@ -26,9 +26,9 @@ import ( "github.com/gardener/etcd-backup-restore/pkg/snapstore" "github.com/gardener/etcd-backup-restore/pkg/initializer/validator" - "github.com/gardener/etcd-backup-restore/pkg/snapshot/restorer" "github.com/gardener/etcd-backup-restore/pkg/server" + brtypes "github.com/gardener/etcd-backup-restore/pkg/types" "github.com/ghodss/yaml" "github.com/sirupsen/logrus" flag "github.com/spf13/pflag" @@ -127,19 +127,83 @@ func (c *initializerOptions) complete() { c.restorerOptions.complete() } +type restoreOpts interface { + getRestorationConfig() *brtypes.RestorationConfig + getSnapstoreConfig() *snapstore.Config + validate() error + complete() +} + +type compactOptions struct { + *restorerOptions + needDefragmentation bool +} + +// newCompactOptions returns the validation config. +func newCompactOptions() *compactOptions { + return &compactOptions{ + restorerOptions: &restorerOptions{ + restorationConfig: brtypes.NewRestorationConfig(), + snapstoreConfig: snapstore.NewSnapstoreConfig(), + }, + needDefragmentation: true, + } +} + +func (c *compactOptions) getRestorationConfig() *brtypes.RestorationConfig { + return c.restorationConfig +} + +func (c *compactOptions) getSnapstoreConfig() *snapstore.Config { + return c.snapstoreConfig +} + +// AddFlags adds the flags to flagset. +func (c *compactOptions) addFlags(fs *flag.FlagSet) { + c.restorationConfig.AddFlags(fs) + c.snapstoreConfig.AddFlags(fs) + fs.BoolVar(&c.needDefragmentation, "defragment", c.needDefragmentation, "defragment after compaction") +} + +// Validate validates the config. +func (c *compactOptions) validate() error { + if err := c.snapstoreConfig.Validate(); err != nil { + return err + } + + if err := c.snapstoreConfig.Validate(); err != nil { + return err + } + + return c.restorationConfig.Validate() +} + +// complete completes the config. +func (c *compactOptions) complete() { + c.snapstoreConfig.Complete() +} + type restorerOptions struct { - restorationConfig *restorer.RestorationConfig + restorationConfig *brtypes.RestorationConfig snapstoreConfig *snapstore.Config } // newRestorerOptions returns the validation config. func newRestorerOptions() *restorerOptions { return &restorerOptions{ - restorationConfig: restorer.NewRestorationConfig(), + restorationConfig: brtypes.NewRestorationConfig(), snapstoreConfig: snapstore.NewSnapstoreConfig(), } } +func (c *restorerOptions) getRestorationConfig() *brtypes.RestorationConfig { + return c.restorationConfig +} + +func (c *restorerOptions) getSnapstoreConfig() *snapstore.Config { + return c.snapstoreConfig +} + // AddFlags adds the flags to flagset. func (c *restorerOptions) addFlags(fs *flag.FlagSet) { c.restorationConfig.AddFlags(fs) diff --git a/cmd/restore.go b/cmd/restore.go index 2ab42b1f7..ff378ebff 100644 --- a/cmd/restore.go +++ b/cmd/restore.go @@ -18,12 +18,9 @@ import ( "context" "fmt" - "github.com/gardener/etcd-backup-restore/pkg/miscellaneous" "github.com/gardener/etcd-backup-restore/pkg/snapshot/restorer" - "github.com/gardener/etcd-backup-restore/pkg/snapstore" "github.com/sirupsen/logrus" "github.com/spf13/cobra" - "go.etcd.io/etcd/pkg/types" ) // NewRestoreCommand returns the command to restore @@ -40,48 +37,13 @@ func NewRestoreCommand(ctx context.Context) *cobra.Command { - Restore etcd data diretory from full snapshot. */ logger := logrus.New() - if err := opts.validate(); err != nil { - logger.Fatalf("failed to validate the options: %v", err) - return - } - - opts.complete() - - clusterUrlsMap, err := types.NewURLsMap(opts.restorationConfig.InitialCluster) - if err != nil { - logger.Fatalf("failed creating url map for restore cluster: %v", err) - } - - peerUrls, err := types.NewURLs(opts.restorationConfig.InitialAdvertisePeerURLs) - if err != nil { - logger.Fatalf("failed parsing peers urls for restore cluster: %v", err) - } - - store, err := snapstore.GetSnapstore(opts.snapstoreConfig) - if err != nil { - logger.Fatalf("failed to create snapstore from configured storage provider: %v", err) - } - logger.Info("Finding latest set of snapshot to recover from...") - baseSnap, deltaSnapList, err := miscellaneous.GetLatestFullSnapshotAndDeltaSnapList(store) + options, store, err := BuildRestoreOptionsAndStore(opts) if err != nil { - logger.Fatalf("failed to get latest snapshot: %v", err) - } - if baseSnap == nil { - logger.Infof("No snapshot found. Will do nothing.") return } rs := restorer.NewRestorer(store, logrus.NewEntry(logger)) - - options := &restorer.RestoreOptions{ - Config: opts.restorationConfig, - BaseSnapshot: *baseSnap, - DeltaSnapList: deltaSnapList, - ClusterURLs: clusterUrlsMap, - PeerURLs: peerUrls, - } - if err := rs.Restore(*options); err != nil { logger.Fatalf("Failed to restore snapshot: %v", err) return diff --git a/cmd/root.go b/cmd/root.go index f0760d7c4..697222ec1 100644 --- a/cmd/root.go +++ b/cmd/root.go @@ -38,6 +38,7 @@ from previously taken snapshot.`, RootCmd.Flags().BoolVarP(&version, "version", "v", false, "print version info") RootCmd.AddCommand(NewSnapshotCommand(ctx), NewRestoreCommand(ctx), + NewCompactCommand(ctx), NewInitializeCommand(ctx), NewServerCommand(ctx)) return RootCmd diff --git a/pkg/compactor/compactor.go b/pkg/compactor/compactor.go new file mode 100644 index 000000000..b994c759b --- /dev/null +++ b/pkg/compactor/compactor.go @@ -0,0 +1,228 @@ +package compactor + +import ( + "context" + "fmt" + "io" + "io/ioutil" + "os" + "path/filepath" + "time" + + "github.com/gardener/etcd-backup-restore/pkg/compressor" + "github.com/gardener/etcd-backup-restore/pkg/metrics" + "github.com/gardener/etcd-backup-restore/pkg/miscellaneous" + "github.com/gardener/etcd-backup-restore/pkg/snapshot/restorer" + "github.com/gardener/etcd-backup-restore/pkg/snapstore" + brtypes "github.com/gardener/etcd-backup-restore/pkg/types" + "github.com/prometheus/client_golang/prometheus" + "github.com/sirupsen/logrus" + "go.etcd.io/etcd/clientv3" +) + +const ( + tmpDir = "/tmp" + defaultName = "default" + defaultInitialAdvertisePeerURLs = "http://localhost:2384" + defaultInitialClusterToken = "etcd-cluster" + defaultMaxFetchers = 6 + defaultMaxCallSendMsgSize = 10 * 1024 * 1024 //10Mib + defaultMaxRequestBytes = 10 * 1024 * 1024 //10Mib + defaultMaxTxnOps = 10 * 1024 + defaultEmbeddedEtcdQuotaBytes = 8 * 1024 * 1024 * 1024 //8Gib + etcdDialTimeout = time.Second * 30 + etcdDir = tmpDir + "/compaction" + restoreClusterToken = "etcd-cluster" +) + +// Compactor holds the necessary details for compacting ETCD +type Compactor struct { + logger *logrus.Entry + store snapstore.SnapStore +} + +// NewCompactor creates compactor +func NewCompactor(store snapstore.SnapStore, logger *logrus.Entry) *Compactor { + return &Compactor{ + logger: logger, + store: store, + } +} + +// Compact is mainly responsible for applying snapshots (full + delta), compacting, drefragmenting, taking the snapshot and saving it sequentially. +func (cp *Compactor) Compact(ro *brtypes.RestoreOptions, needDefragmentation bool) (*brtypes.CompactionResult, error) { + cp.logger.Info("Start compacting") + + // If no basesnapshot is found, abort compaction as there would be nothing to compact + if ro.BaseSnapshot == nil { + cp.logger.Error("No base snapshot found. Nothing is available for compaction") + return nil, fmt.Errorf("No base snapshot found. Nothing is available for compaction") + } + + // Set suffix of compacted snapshot that will be result of this compaction + suffix := ro.BaseSnapshot.CompressionSuffix + if len(ro.DeltaSnapList) > 0 { + suffix = ro.DeltaSnapList[ro.DeltaSnapList.Len()-1].CompressionSuffix + } + + // TODO: Remove this provision of saving the compacted snapshot in current directory when we change directory structure + cwd, err := os.Getwd() + if err != nil { + return nil, fmt.Errorf("Unable to create file to save compacted snapshot %v", err) + } + + cmpctdFileName := filepath.Join(cwd, filepath.Base(fmt.Sprintf("%s-%d%s", "compacted", time.Now().UTC().Unix(), suffix))) + cmpctdFile, err := os.Create(cmpctdFileName) + defer cmpctdFile.Close() + cp.logger.Infof("Created compacted snapshot in: %s", cmpctdFileName) + // Remove till here + + // Set a temporary etcd data directory for embedded etcd + cmpctDir, err := ioutil.TempDir("/tmp", "compactor") + if err != nil { + cp.logger.Errorf("Unable to create temporary etcd directory for compaction: %s", err.Error()) + return nil, err + } + + defer os.RemoveAll(cmpctDir) + + ro.Config.RestoreDataDir = cmpctDir + + var client *clientv3.Client + var ep []string + + // Then restore from the base snapshot + r := restorer.NewRestorer(cp.store, cp.logger) + /*if err := r.Restore(*ro); err != nil { + return nil, fmt.Errorf("Unable to restore snapshots during compaction: %v", err) + }*/ + + if err := r.RestoreFromBaseSnapshot(*ro); err != nil { + return nil, fmt.Errorf("Failed to restore from the base snapshot :%v", err) + } + + cp.logger.Infof("Starting embedded etcd server for compaction...") + e, err := miscellaneous.StartEmbeddedEtcd(cp.logger, ro) + if err != nil { + return nil, err + } + defer func() { + e.Server.Stop() + e.Close() + }() + + ep = []string{e.Clients[0].Addr().String()} + cfg := clientv3.Config{MaxCallSendMsgSize: ro.Config.MaxCallSendMsgSize, Endpoints: ep} + client, err = clientv3.New(cfg) + if err != nil { + return nil, err + } + defer client.Close() + + if len(ro.DeltaSnapList) > 0 { + cp.logger.Infof("Applying delta snapshots...") + if err := r.ApplyDeltaSnapshots(client, *ro); err != nil { + cp.logger.Warnf("Could not apply the delta snapshots: %v", err) + } + } else { + cp.logger.Infof("No delta snapshots present over base snapshot.") + } + + // Then compact ETCD + ctx := context.TODO() + revCheckCtx, cancel := context.WithTimeout(ctx, etcdDialTimeout) + getResponse, err := client.Get(revCheckCtx, "foo") + cancel() + if err != nil { + return nil, fmt.Errorf("failed to connect to client: %v", err) + } + etcdRevision := getResponse.Header.GetRevision() + + client.Compact(ctx, etcdRevision) + + // Then defrag the ETCD + if needDefragmentation { + var dbSizeBeforeDefrag, dbSizeAfterDefrag int64 + statusReqCtx, cancel := context.WithTimeout(ctx, etcdDialTimeout) + status, err := client.Status(statusReqCtx, ep[0]) + cancel() + if err != nil { + cp.logger.Warnf("Failed to get status of etcd member[%s] with error: %v", ep, err) + } else { + dbSizeBeforeDefrag = status.DbSize + } + + start := time.Now() + defragCtx, cancel := context.WithTimeout(ctx, time.Duration(60*time.Second)) + _, err = client.Defragment(defragCtx, ep[0]) + cancel() + if err != nil { + cp.logger.Errorf("Total time taken to defragment: %v", time.Now().Sub(start).Seconds()) + cp.logger.Errorf("Failed to defragment etcd member[%s] with error: %v", ep, err) + } + cp.logger.Infof("Total time taken to defragment: %v", time.Now().Sub(start).Seconds()) + cp.logger.Infof("Finished defragmenting etcd member[%s]", ep) + + statusReqCtx, cancel = context.WithTimeout(ctx, etcdDialTimeout) + status, err = client.Status(statusReqCtx, ep[0]) + cancel() + if err != nil { + cp.logger.Warnf("Failed to get status of etcd member[%s] with error: %v", ep, err) + } else { + dbSizeAfterDefrag = status.DbSize + cp.logger.Infof("Probable DB size change for etcd member [%s]: %dB -> %dB after defragmentation", ep, dbSizeBeforeDefrag, dbSizeAfterDefrag) + } + } + + // Then take snapeshot of ETCD + snapshotReqCtx, cancel := context.WithTimeout(ctx, etcdDialTimeout) + defer cancel() + + rc, err := client.Snapshot(snapshotReqCtx) + if err != nil { + return nil, fmt.Errorf("Failed to create etcd snapshot out of compacted DB: %v", err) + } + + isCompressed, compressionPolicy, err := compressor.IsSnapshotCompressed(suffix) + if err != nil { + return nil, fmt.Errorf("Unable to determine if snapshot is compressed: %v", ro.BaseSnapshot.CompressionSuffix) + } + if isCompressed { + rc, err = compressor.CompressSnapshot(rc, compressionPolicy) + if err != nil { + return nil, fmt.Errorf("Unable to obtain reader for compressed file: %v", err) + } + } + defer rc.Close() + + cp.logger.Infof("Successfully opened snapshot reader on etcd") + + // TODO: Save the snapshots to store when we change the directory structure + // Then save the snapshot to the store. + //s := snapstore.NewSnapshot(snapstore.SnapshotKindFull, 0, etcdRevision, ro.BaseSnapshot.CompressionSuffix) + startTime := time.Now() + /*if err := cp.store.Save(*s, rc); err != nil { + timeTaken := time.Now().Sub(startTime).Seconds() + metrics.SnapshotDurationSeconds.With(prometheus.Labels{metrics.LabelKind: snapstore.SnapshotKindFull, metrics.LabelSucceeded: metrics.ValueSucceededFalse}).Observe(timeTaken) + return nil, fmt.Errorf("failed to save snapshot: %v", err) + }*/ + + // TODO: Remove this copy when directory structure is changed + if _, err = io.Copy(cmpctdFile, rc); err != nil { + return nil, fmt.Errorf("Unable to create compacted snapshot: %v", err) + } + + timeTaken := time.Now().Sub(startTime).Seconds() + metrics.SnapshotDurationSeconds.With(prometheus.Labels{metrics.LabelKind: snapstore.SnapshotKindFull, metrics.LabelSucceeded: metrics.ValueSucceededTrue}).Observe(timeTaken) + cp.logger.Infof("Total time to save snapshot: %f seconds.", timeTaken) + + compactionDuration, err := time.ParseDuration(fmt.Sprintf("%fs", timeTaken)) + if err != nil { + cp.logger.Warnf("Could not record compaction duration: %v", err) + } + return &brtypes.CompactionResult{ + //Snapshot: s, + Path: cmpctdFileName, + LastCompactionDuration: compactionDuration, + }, nil +} diff --git a/pkg/compactor/compactor_suite_test.go b/pkg/compactor/compactor_suite_test.go new file mode 100644 index 000000000..d18814a39 --- /dev/null +++ b/pkg/compactor/compactor_suite_test.go @@ -0,0 +1,107 @@ +// Copyright (c) 2018 SAP SE or an SAP affiliate company. All rights reserved. This file is licensed under the Apache Software License, v. 2 except as noted otherwise in the LICENSE file. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package compactor_test + +import ( + "context" + "fmt" + "io/ioutil" + "os" + "sync" + "testing" + "time" + + "github.com/gardener/etcd-backup-restore/pkg/compressor" + "github.com/gardener/etcd-backup-restore/test/utils" + . "github.com/onsi/ginkgo" + . "github.com/onsi/gomega" + "github.com/sirupsen/logrus" + "go.etcd.io/etcd/embed" +) + +var ( + testSuitDir, testEtcdDir, testSnapshotDir string + testCtx = context.Background() + logger = logrus.New().WithField("suite", "compactor") + etcd *embed.Etcd + err error + keyTo int + endpoints []string +) + +func TestCompactor(t *testing.T) { + RegisterFailHandler(Fail) + RunSpecs(t, "Compactor Suite") +} + +var _ = SynchronizedBeforeSuite(func() []byte { + var ( + data []byte + ) + + // Create a directory for compaction test cases + testSuitDir, err = ioutil.TempDir("/tmp", "compactor-test") + Expect(err).ShouldNot(HaveOccurred()) + + // Directory for the main ETCD process + testEtcdDir := fmt.Sprintf("%s/etcd/default.etcd", testSuitDir) + // Directory for storing the backups + testSnapshotDir := fmt.Sprintf("%s/etcd/snapshotter.bkp", testSuitDir) + + logger.Infof("ETCD Directory is: %s", testEtcdDir) + logger.Infof("Snapshot Directory is: %s", testSnapshotDir) + + // Start the main ETCD process that will run untill all compaction test cases are run + etcd, err = utils.StartEmbeddedEtcd(testCtx, testEtcdDir, logger) + Expect(err).ShouldNot(HaveOccurred()) + endpoints = []string{etcd.Clients[0].Addr().String()} + logger.Infof("endpoints: %s", endpoints) + + // Populates data into the ETCD + populatorCtx, cancelPopulator := context.WithTimeout(testCtx, 15*time.Second) + defer cancelPopulator() + resp := &utils.EtcdDataPopulationResponse{} + wg := &sync.WaitGroup{} + wg.Add(1) + go utils.PopulateEtcdWithWaitGroup(populatorCtx, wg, logger, endpoints, resp) + + // Take snapshots (Full + Delta) of the ETCD database + deltaSnapshotPeriod := time.Second + ctx := utils.ContextWithWaitGroupFollwedByGracePeriod(populatorCtx, wg, deltaSnapshotPeriod+2*time.Second) + compressionConfig := compressor.NewCompressorConfig() + compressionConfig.Enabled = true + compressionConfig.CompressionPolicy = "gzip" + err = utils.RunSnapshotter(logger, testSnapshotDir, deltaSnapshotPeriod, endpoints, ctx.Done(), true, compressionConfig) + Expect(err).ShouldNot(HaveOccurred()) + + // Wait unitil the populator finishes with populating ETCD + wg.Wait() + + keyTo = resp.KeyTo + return data + +}, func(data []byte) {}) + +var _ = SynchronizedAfterSuite(func() {}, cleanUp) + +func cleanUp() { + logger.Info("Stop the Embedded etcd server.") + etcd.Server.Stop() + etcd.Close() + + logger.Infof("All tests are done for compactor suite. %s is being removed.", testSuitDir) + err = os.RemoveAll(testSuitDir) + Expect(err).ShouldNot(HaveOccurred()) +} diff --git a/pkg/compactor/compactor_test.go b/pkg/compactor/compactor_test.go new file mode 100644 index 000000000..db2865154 --- /dev/null +++ b/pkg/compactor/compactor_test.go @@ -0,0 +1,151 @@ +package compactor_test + +import ( + "fmt" + "os" + + "github.com/gardener/etcd-backup-restore/pkg/compactor" + "github.com/gardener/etcd-backup-restore/pkg/miscellaneous" + "github.com/gardener/etcd-backup-restore/pkg/snapstore" + brtypes "github.com/gardener/etcd-backup-restore/pkg/types" + . "github.com/onsi/ginkgo" + . "github.com/onsi/gomega" + "go.etcd.io/etcd/pkg/types" +) + +var _ = Describe("Running Compactor", func() { + var ( + dir string + store snapstore.SnapStore + cptr *compactor.Compactor + restorePeerURLs []string + clusterUrlsMap types.URLsMap + peerUrls types.URLs + // deltaSnapshotPeriod time.Duration + ) + const ( + restoreName string = "default" + restoreClusterToken string = "etcd-cluster" + restoreCluster string = "default=http://localhost:2380" + skipHashCheck bool = false + maxFetchers uint = 6 + maxCallSendMsgSize = 2 * 1024 * 1024 //2Mib + maxRequestBytes = 2 * 1024 * 1024 //2Mib + maxTxnOps = 2 * 1024 + embeddedEtcdQuotaBytes int64 = 8 * 1024 * 1024 * 1024 + ) + + BeforeEach(func() { + //wg = &sync.WaitGroup{} + restorePeerURLs = []string{"http://localhost:2380"} + clusterUrlsMap, err = types.NewURLsMap(restoreCluster) + Expect(err).ShouldNot(HaveOccurred()) + peerUrls, err = types.NewURLs(restorePeerURLs) + Expect(err).ShouldNot(HaveOccurred()) + }) + + Describe("Compact while a etcd server is running", func() { + var restoreOpts *brtypes.RestoreOptions + + BeforeEach(func() { + dir = fmt.Sprintf("%s/etcd/snapshotter.bkp", testSuitDir) + + store, err = snapstore.GetSnapstore(&snapstore.Config{Container: dir, Provider: "Local"}) + Expect(err).ShouldNot(HaveOccurred()) + fmt.Println("The store where compaction will save snapshot is: ", store) + + cptr = compactor.NewCompactor(store, logger) + restoreOpts = &brtypes.RestoreOptions{ + Config: &brtypes.RestorationConfig{ + InitialClusterToken: restoreClusterToken, + InitialCluster: restoreCluster, + Name: restoreName, + InitialAdvertisePeerURLs: restorePeerURLs, + SkipHashCheck: skipHashCheck, + MaxFetchers: maxFetchers, + MaxCallSendMsgSize: maxCallSendMsgSize, + MaxRequestBytes: maxRequestBytes, + MaxTxnOps: maxTxnOps, + EmbeddedEtcdQuotaBytes: embeddedEtcdQuotaBytes, + }, + ClusterURLs: clusterUrlsMap, + PeerURLs: peerUrls, + } + }) + + AfterEach(func() { + }) + + Context("with defragmention allowed", func() { + It("should create a snapshot", func() { + restoreOpts.Config.MaxFetchers = 4 + + // Fetch latest snapshots + baseSnapshot, deltaSnapList, err := miscellaneous.GetLatestFullSnapshotAndDeltaSnapList(store) + Expect(err).ShouldNot(HaveOccurred()) + + restoreOpts.BaseSnapshot = baseSnapshot + restoreOpts.DeltaSnapList = deltaSnapList + + // Take the compacted full snapshot with defragmnetation allowed + res, err := cptr.Compact(restoreOpts, true) + Expect(err).ShouldNot(HaveOccurred()) + + // Check if the compacted full snapshot is really present + // fi, err := os.Stat(filepath.Join(dir, res.Snapshot.SnapDir, res.Snapshot.SnapName)) + fi, err := os.Stat(res.Path) + Expect(err).ShouldNot(HaveOccurred()) + + size := fi.Size() + Expect(size).ShouldNot(BeZero()) + + err = os.Remove(res.Path) + Expect(err).ShouldNot(HaveOccurred()) + }) + }) + Context("with defragmention not allowed", func() { + It("should create a snapshot", func() { + restoreOpts.Config.MaxFetchers = 4 + + // Fetch latest snapshots + baseSnapshot, deltaSnapList, err := miscellaneous.GetLatestFullSnapshotAndDeltaSnapList(store) + Expect(err).ShouldNot(HaveOccurred()) + + restoreOpts.BaseSnapshot = baseSnapshot + restoreOpts.DeltaSnapList = deltaSnapList + + // Take the compacted full snapshot with defragmnetation not allowed + res, err := cptr.Compact(restoreOpts, false) + Expect(err).ShouldNot(HaveOccurred()) + + // Check if the compacted full snapshot is really present + // fi, err := os.Stat(filepath.Join(dir, res.Snapshot.SnapDir, res.Snapshot.SnapName)) + fi, err := os.Stat(res.Path) + Expect(err).ShouldNot(HaveOccurred()) + + size := fi.Size() + Expect(size).ShouldNot(BeZero()) + + err = os.Remove(res.Path) + Expect(err).ShouldNot(HaveOccurred()) + }) + }) + Context("with no basesnapshot in backup store", func() { + It("should not run compaction", func() { + restoreOpts.Config.MaxFetchers = 4 + + // Fetch the latest snapshots which are one compacted full snapshot and subsequent delta snapshots + _, deltaSnapList, err := miscellaneous.GetLatestFullSnapshotAndDeltaSnapList(store) + Expect(err).ShouldNot(HaveOccurred()) + + // But set the BaseSnapshot as nil + restoreOpts.BaseSnapshot = nil + restoreOpts.DeltaSnapList = deltaSnapList + + // Try capturing the compacted full snapshot + _, err = cptr.Compact(restoreOpts, false) + Expect(err).Should(HaveOccurred()) + }) + }) + }) +}) diff --git a/pkg/compressor/compressor.go b/pkg/compressor/compressor.go index 1d1d4bc7c..2b1f680dc 100644 --- a/pkg/compressor/compressor.go +++ b/pkg/compressor/compressor.go @@ -52,14 +52,16 @@ func CompressSnapshot(data io.ReadCloser, compressionPolicy string) (io.ReadClos go func() { var err error + var n int64 defer pWriter.CloseWithError(err) defer gWriter.Close() defer data.Close() - _, err = io.Copy(gWriter, data) + n, err = io.Copy(gWriter, data) if err != nil { - logger.Infof("compression failed: %v", err) + logger.Errorf("compression failed: %v", err) return } + logger.Infof("Total written bytes: %v", n) }() return pReader, nil diff --git a/pkg/defragmentor/defrag_test.go b/pkg/defragmentor/defrag_test.go index 6f0e7d479..0d2bba9e6 100644 --- a/pkg/defragmentor/defrag_test.go +++ b/pkg/defragmentor/defrag_test.go @@ -135,7 +135,7 @@ var _ = Describe("Defrag", func() { Expect(err).ShouldNot(HaveOccurred()) Expect(defragCount).Should(Or(Equal(minimumExpectedDefragCount), Equal(minimumExpectedDefragCount+1))) - Expect(newStatus.DbSize).Should(BeNumerically("<", oldDBSize)) + Expect(newStatus.DbSize).Should(BeNumerically("<=", oldDBSize)) Expect(newStatus.Header.GetRevision()).Should(BeNumerically("==", oldRevision)) }) }) diff --git a/pkg/initializer/initializer.go b/pkg/initializer/initializer.go index bce01ff06..5387e6596 100644 --- a/pkg/initializer/initializer.go +++ b/pkg/initializer/initializer.go @@ -25,6 +25,7 @@ import ( "github.com/gardener/etcd-backup-restore/pkg/miscellaneous" "github.com/gardener/etcd-backup-restore/pkg/snapshot/restorer" "github.com/gardener/etcd-backup-restore/pkg/snapstore" + brtypes "github.com/gardener/etcd-backup-restore/pkg/types" "github.com/prometheus/client_golang/prometheus" "github.com/sirupsen/logrus" "go.uber.org/zap" @@ -69,7 +70,7 @@ func (e *EtcdInitializer) Initialize(mode validator.Mode, failBelowRevision int6 } //NewInitializer creates an etcd initializer object. -func NewInitializer(options *restorer.RestoreOptions, snapstoreConfig *snapstore.Config, logger *logrus.Logger) *EtcdInitializer { +func NewInitializer(options *brtypes.RestoreOptions, snapstoreConfig *snapstore.Config, logger *logrus.Logger) *EtcdInitializer { zapLogger, _ := zap.NewProduction() etcdInit := &EtcdInitializer{ Config: &Config{ @@ -121,7 +122,7 @@ func (e *EtcdInitializer) restoreCorruptData() (bool, error) { return e.restoreWithEmptySnapstore() } - tempRestoreOptions.BaseSnapshot = *baseSnap + tempRestoreOptions.BaseSnapshot = baseSnap tempRestoreOptions.DeltaSnapList = deltaSnapList tempRestoreOptions.Config.RestoreDataDir = fmt.Sprintf("%s.%s", tempRestoreOptions.Config.RestoreDataDir, "part") diff --git a/pkg/initializer/types.go b/pkg/initializer/types.go index 4bcfc2f90..8af7e4d62 100644 --- a/pkg/initializer/types.go +++ b/pkg/initializer/types.go @@ -16,8 +16,8 @@ package initializer import ( "github.com/gardener/etcd-backup-restore/pkg/initializer/validator" - "github.com/gardener/etcd-backup-restore/pkg/snapshot/restorer" "github.com/gardener/etcd-backup-restore/pkg/snapstore" + brtypes "github.com/gardener/etcd-backup-restore/pkg/types" "github.com/sirupsen/logrus" ) @@ -25,7 +25,7 @@ import ( // checks and snapshot restoration in case of corruption. type Config struct { SnapstoreConfig *snapstore.Config - RestoreOptions *restorer.RestoreOptions + RestoreOptions *brtypes.RestoreOptions } // EtcdInitializer implements Initializer interface to perform validation and diff --git a/pkg/initializer/validator/datavalidator.go b/pkg/initializer/validator/datavalidator.go index 11f1ad6a2..85932d7eb 100644 --- a/pkg/initializer/validator/datavalidator.go +++ b/pkg/initializer/validator/datavalidator.go @@ -26,8 +26,8 @@ import ( "time" "github.com/gardener/etcd-backup-restore/pkg/miscellaneous" - "github.com/gardener/etcd-backup-restore/pkg/snapshot/restorer" "github.com/gardener/etcd-backup-restore/pkg/snapstore" + brtypes "github.com/gardener/etcd-backup-restore/pkg/types" "github.com/sirupsen/logrus" bolt "go.etcd.io/bbolt" "go.etcd.io/etcd/clientv3" @@ -291,15 +291,15 @@ func (d *DataValidator) checkFullRevisionConsistency(dataDir string, latestSnaps var latestSyncedEtcdRevision int64 d.Logger.Info("Starting embedded etcd server...") - ro := &restorer.RestoreOptions{ - Config: &restorer.RestorationConfig{ + ro := &brtypes.RestoreOptions{ + Config: &brtypes.RestorationConfig{ RestoreDataDir: dataDir, EmbeddedEtcdQuotaBytes: d.Config.EmbeddedEtcdQuotaBytes, MaxRequestBytes: defaultMaxRequestBytes, MaxTxnOps: defaultMaxTxnOps, }, } - e, err := restorer.StartEmbeddedEtcd(logrus.NewEntry(d.Logger), ro) + e, err := miscellaneous.StartEmbeddedEtcd(logrus.NewEntry(d.Logger), ro) if err != nil { d.Logger.Infof("unable to start embedded etcd: %v", err) return DataDirectoryCorrupt, err diff --git a/pkg/miscellaneous/miscellaneous.go b/pkg/miscellaneous/miscellaneous.go index 6e2183cf5..6f6a34a1e 100644 --- a/pkg/miscellaneous/miscellaneous.go +++ b/pkg/miscellaneous/miscellaneous.go @@ -15,11 +15,18 @@ package miscellaneous import ( + "fmt" + "net/url" + "path/filepath" "sort" + "time" "github.com/gardener/etcd-backup-restore/pkg/metrics" "github.com/gardener/etcd-backup-restore/pkg/snapstore" + brtypes "github.com/gardener/etcd-backup-restore/pkg/types" "github.com/prometheus/client_golang/prometheus" + "github.com/sirupsen/logrus" + "go.etcd.io/etcd/embed" ) // GetLatestFullSnapshotAndDeltaSnapList returns the latest snapshot @@ -54,3 +61,39 @@ func GetLatestFullSnapshotAndDeltaSnapList(store snapstore.SnapStore) (*snapstor } return fullSnapshot, deltaSnapList, nil } + +// StartEmbeddedEtcd starts the embedded etcd server. +func StartEmbeddedEtcd(logger *logrus.Entry, ro *brtypes.RestoreOptions) (*embed.Etcd, error) { + cfg := embed.NewConfig() + cfg.Dir = filepath.Join(ro.Config.RestoreDataDir) + DefaultListenPeerURLs := "http://localhost:0" + DefaultListenClientURLs := "http://localhost:0" + DefaultInitialAdvertisePeerURLs := "http://localhost:0" + DefaultAdvertiseClientURLs := "http://localhost:0" + lpurl, _ := url.Parse(DefaultListenPeerURLs) + apurl, _ := url.Parse(DefaultInitialAdvertisePeerURLs) + lcurl, _ := url.Parse(DefaultListenClientURLs) + acurl, _ := url.Parse(DefaultAdvertiseClientURLs) + cfg.LPUrls = []url.URL{*lpurl} + cfg.LCUrls = []url.URL{*lcurl} + cfg.APUrls = []url.URL{*apurl} + cfg.ACUrls = []url.URL{*acurl} + cfg.InitialCluster = cfg.InitialClusterFromName(cfg.Name) + cfg.QuotaBackendBytes = ro.Config.EmbeddedEtcdQuotaBytes + cfg.MaxRequestBytes = ro.Config.MaxRequestBytes + cfg.MaxTxnOps = ro.Config.MaxTxnOps + cfg.Logger = "zap" + e, err := embed.StartEtcd(cfg) + if err != nil { + return nil, err + } + select { + case <-e.Server.ReadyNotify(): + logger.Infof("Embedded server is ready to listen client at: %s", e.Clients[0].Addr()) + case <-time.After(60 * time.Second): + e.Server.Stop() // trigger a shutdown + e.Close() + return nil, fmt.Errorf("server took too long to start") + } + return e, nil +} diff --git a/pkg/server/backuprestoreserver.go b/pkg/server/backuprestoreserver.go index 608b1c87d..d3a9e3ea8 100644 --- a/pkg/server/backuprestoreserver.go +++ b/pkg/server/backuprestoreserver.go @@ -23,12 +23,12 @@ import ( "github.com/gardener/etcd-backup-restore/pkg/errors" "github.com/gardener/etcd-backup-restore/pkg/metrics" + brtypes "github.com/gardener/etcd-backup-restore/pkg/types" "github.com/prometheus/client_golang/prometheus" "github.com/gardener/etcd-backup-restore/pkg/defragmentor" "github.com/gardener/etcd-backup-restore/pkg/etcdutil" "github.com/gardener/etcd-backup-restore/pkg/initializer" - "github.com/gardener/etcd-backup-restore/pkg/snapshot/restorer" "github.com/gardener/etcd-backup-restore/pkg/snapshot/snapshotter" "github.com/gardener/etcd-backup-restore/pkg/snapstore" cron "github.com/robfig/cron/v3" @@ -71,7 +71,7 @@ func (b *BackupRestoreServer) Run(ctx context.Context) error { b.logger.Fatalf("failed creating url map for restore cluster: %v", err) } - options := &restorer.RestoreOptions{ + options := &brtypes.RestoreOptions{ Config: b.config.RestorationConfig, ClusterURLs: clusterURLsMap, PeerURLs: peerURLs, @@ -114,7 +114,7 @@ func (b *BackupRestoreServer) startHTTPServer(initializer initializer.Initialize // runServerWithoutSnapshotter runs the etcd-backup-restore // for the case where snapshotter is not configured -func (b *BackupRestoreServer) runServerWithoutSnapshotter(ctx context.Context, restoreOpts *restorer.RestoreOptions) { +func (b *BackupRestoreServer) runServerWithoutSnapshotter(ctx context.Context, restoreOpts *brtypes.RestoreOptions) { etcdInitializer := initializer.NewInitializer(restoreOpts, nil, b.logger.Logger) // If no storage provider is given, snapshotter will be nil, in which @@ -131,7 +131,7 @@ func (b *BackupRestoreServer) runServerWithoutSnapshotter(ctx context.Context, r // runServerWithSnapshotter runs the etcd-backup-restore // for the case where snapshotter is configured correctly -func (b *BackupRestoreServer) runServerWithSnapshotter(ctx context.Context, restoreOpts *restorer.RestoreOptions) error { +func (b *BackupRestoreServer) runServerWithSnapshotter(ctx context.Context, restoreOpts *brtypes.RestoreOptions) error { ackCh := make(chan struct{}) etcdInitializer := initializer.NewInitializer(restoreOpts, b.config.SnapstoreConfig, b.logger.Logger) diff --git a/pkg/server/init.go b/pkg/server/init.go index 3e3a13a47..c17ac7615 100644 --- a/pkg/server/init.go +++ b/pkg/server/init.go @@ -20,8 +20,8 @@ import ( "github.com/gardener/etcd-backup-restore/pkg/compressor" "github.com/gardener/etcd-backup-restore/pkg/etcdutil" + brtypes "github.com/gardener/etcd-backup-restore/pkg/types" - "github.com/gardener/etcd-backup-restore/pkg/snapshot/restorer" "github.com/gardener/etcd-backup-restore/pkg/snapshot/snapshotter" "github.com/gardener/etcd-backup-restore/pkg/snapstore" "github.com/robfig/cron/v3" @@ -35,8 +35,8 @@ func NewBackupRestoreComponentConfig() *BackupRestoreComponentConfig { ServerConfig: NewHTTPServerConfig(), SnapshotterConfig: snapshotter.NewSnapshotterConfig(), SnapstoreConfig: snapstore.NewSnapstoreConfig(), - RestorationConfig: restorer.NewRestorationConfig(), CompressionConfig: compressor.NewCompressorConfig(), + RestorationConfig: brtypes.NewRestorationConfig(), DefragmentationSchedule: defaultDefragmentationSchedule, } } diff --git a/pkg/server/types.go b/pkg/server/types.go index 2bec32011..10ac5958d 100644 --- a/pkg/server/types.go +++ b/pkg/server/types.go @@ -17,9 +17,9 @@ package server import ( "github.com/gardener/etcd-backup-restore/pkg/compressor" "github.com/gardener/etcd-backup-restore/pkg/etcdutil" - "github.com/gardener/etcd-backup-restore/pkg/snapshot/restorer" "github.com/gardener/etcd-backup-restore/pkg/snapshot/snapshotter" "github.com/gardener/etcd-backup-restore/pkg/snapstore" + brtypes "github.com/gardener/etcd-backup-restore/pkg/types" ) const ( @@ -33,8 +33,8 @@ type BackupRestoreComponentConfig struct { ServerConfig *HTTPServerConfig `json:"serverConfig,omitempty"` SnapshotterConfig *snapshotter.Config `json:"snapshotterConfig,omitempty"` SnapstoreConfig *snapstore.Config `json:"snapstoreConfig,omitempty"` - RestorationConfig *restorer.RestorationConfig `json:"restorationConfig,omitempty"` CompressionConfig *compressor.CompressionConfig `json:"compressionConfig,omitempty"` + RestorationConfig *brtypes.RestorationConfig `json:"restorationConfig,omitempty"` DefragmentationSchedule string `json:"defragmentationSchedule"` } diff --git a/pkg/snapshot/restorer/init.go b/pkg/snapshot/restorer/init.go deleted file mode 100644 index fd7c4eaf7..000000000 --- a/pkg/snapshot/restorer/init.go +++ /dev/null @@ -1,91 +0,0 @@ -// Copyright (c) 2019 SAP SE or an SAP affiliate company. All rights reserved. This file is licensed under the Apache Software License, v. 2 except as noted otherwise in the LICENSE file. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package restorer - -import ( - "fmt" - "path" - - flag "github.com/spf13/pflag" - "go.etcd.io/etcd/pkg/types" -) - -// NewRestorationConfig returns the restoration config. -func NewRestorationConfig() *RestorationConfig { - return &RestorationConfig{ - InitialCluster: initialClusterFromName(defaultName), - InitialClusterToken: defaultInitialClusterToken, - RestoreDataDir: fmt.Sprintf("%s.etcd", defaultName), - InitialAdvertisePeerURLs: []string{defaultInitialAdvertisePeerURLs}, - Name: defaultName, - SkipHashCheck: false, - MaxFetchers: defaultMaxFetchers, - MaxCallSendMsgSize: defaultMaxCallSendMsgSize, - MaxRequestBytes: defaultMaxRequestBytes, - MaxTxnOps: defaultMaxTxnOps, - EmbeddedEtcdQuotaBytes: int64(defaultEmbeddedEtcdQuotaBytes), - AutoCompactionMode: defaultAutoCompactionMode, - AutoCompactionRetention: defaultAutoCompactionRetention, - } -} - -// AddFlags adds the flags to flagset. -func (c *RestorationConfig) AddFlags(fs *flag.FlagSet) { - fs.StringVar(&c.InitialCluster, "initial-cluster", c.InitialCluster, "initial cluster configuration for restore bootstrap") - fs.StringVar(&c.InitialClusterToken, "initial-cluster-token", c.InitialClusterToken, "initial cluster token for the etcd cluster during restore bootstrap") - fs.StringVarP(&c.RestoreDataDir, "data-dir", "d", c.RestoreDataDir, "path to the data directory") - fs.StringArrayVar(&c.InitialAdvertisePeerURLs, "initial-advertise-peer-urls", c.InitialAdvertisePeerURLs, "list of this member's peer URLs to advertise to the rest of the cluster") - fs.StringVar(&c.Name, "name", c.Name, "human-readable name for this member") - fs.BoolVar(&c.SkipHashCheck, "skip-hash-check", c.SkipHashCheck, "ignore snapshot integrity hash value (required if copied from data directory)") - fs.UintVar(&c.MaxFetchers, "max-fetchers", c.MaxFetchers, "maximum number of threads that will fetch delta snapshots in parallel") - fs.IntVar(&c.MaxCallSendMsgSize, "max-call-send-message-size", c.MaxCallSendMsgSize, "maximum size of message that the client sends") - fs.UintVar(&c.MaxRequestBytes, "max-request-bytes", c.MaxRequestBytes, "Maximum client request size in bytes the server will accept") - fs.UintVar(&c.MaxTxnOps, "max-txn-ops", c.MaxTxnOps, "Maximum number of operations permitted in a transaction") - fs.Int64Var(&c.EmbeddedEtcdQuotaBytes, "embedded-etcd-quota-bytes", c.EmbeddedEtcdQuotaBytes, "maximum backend quota for the embedded etcd used for applying delta snapshots") - fs.StringVar(&c.AutoCompactionMode, "auto-compaction-mode", c.AutoCompactionMode, "mode for auto-compaction: 'periodic' for duration based retention. 'revision' for revision number based retention.") - fs.StringVar(&c.AutoCompactionRetention, "auto-compaction-retention", c.AutoCompactionRetention, "Auto-compaction retention length.") -} - -// Validate validates the config. -func (c *RestorationConfig) Validate() error { - if _, err := types.NewURLsMap(c.InitialCluster); err != nil { - return fmt.Errorf("failed creating url map for restore cluster: %v", err) - } - if _, err := types.NewURLs(c.InitialAdvertisePeerURLs); err != nil { - return fmt.Errorf("failed parsing peers urls for restore cluster: %v", err) - } - if c.MaxCallSendMsgSize <= 0 { - return fmt.Errorf("max call send message should be greater than zero") - } - if c.MaxFetchers <= 0 { - return fmt.Errorf("max fetchers should be greater than zero") - } - if c.EmbeddedEtcdQuotaBytes <= 0 { - return fmt.Errorf("Etcd Quota size for etcd must be greater than 0") - } - if c.AutoCompactionMode != "periodic" && c.AutoCompactionMode != "revision" { - return fmt.Errorf("UnSupported auto-compaction-mode") - } - c.RestoreDataDir = path.Clean(c.RestoreDataDir) - return nil -} - -func initialClusterFromName(name string) string { - n := name - if name == "" { - n = defaultName - } - return fmt.Sprintf("%s=http://localhost:2380", n) -} diff --git a/pkg/snapshot/restorer/restorer.go b/pkg/snapshot/restorer/restorer.go index 80796b52f..e9328717e 100644 --- a/pkg/snapshot/restorer/restorer.go +++ b/pkg/snapshot/restorer/restorer.go @@ -23,7 +23,6 @@ import ( "io" "io/ioutil" "math" - "net/url" "os" "path" "path/filepath" @@ -32,10 +31,11 @@ import ( "time" "github.com/gardener/etcd-backup-restore/pkg/compressor" + "github.com/gardener/etcd-backup-restore/pkg/miscellaneous" "github.com/gardener/etcd-backup-restore/pkg/snapstore" + brtypes "github.com/gardener/etcd-backup-restore/pkg/types" "github.com/sirupsen/logrus" "go.etcd.io/etcd/clientv3" - "go.etcd.io/etcd/embed" "go.etcd.io/etcd/etcdserver" "go.etcd.io/etcd/etcdserver/api/membership" "go.etcd.io/etcd/etcdserver/api/snap" @@ -54,6 +54,18 @@ import ( "go.uber.org/zap" ) +const ( + tmpDir = "/tmp" + tmpEventsDataFilePrefix = "etcd-restore-" +) + +// Restorer is a struct for etcd data directory restorer +type Restorer struct { + logger *logrus.Entry + zapLogger *zap.Logger + store snapstore.SnapStore +} + // NewRestorer returns the restorer object. func NewRestorer(store snapstore.SnapStore, logger *logrus.Entry) *Restorer { zapLogger, _ := zap.NewProduction() @@ -65,8 +77,8 @@ func NewRestorer(store snapstore.SnapStore, logger *logrus.Entry) *Restorer { } // Restore restore the etcd data directory as per specified restore options. -func (r *Restorer) Restore(ro RestoreOptions) error { - if err := r.restoreFromBaseSnapshot(ro); err != nil { +func (r *Restorer) Restore(ro brtypes.RestoreOptions) error { + if err := r.RestoreFromBaseSnapshot(ro); err != nil { return fmt.Errorf("failed to restore from the base snapshot :%v", err) } if len(ro.DeltaSnapList) == 0 { @@ -74,7 +86,7 @@ func (r *Restorer) Restore(ro RestoreOptions) error { return nil } r.logger.Infof("Starting embedded etcd server...") - e, err := StartEmbeddedEtcd(r.logger, &ro) + e, err := miscellaneous.StartEmbeddedEtcd(r.logger, &ro) if err != nil { return err } @@ -91,11 +103,11 @@ func (r *Restorer) Restore(ro RestoreOptions) error { defer client.Close() r.logger.Infof("Applying delta snapshots...") - return r.applyDeltaSnapshots(client, ro) + return r.ApplyDeltaSnapshots(client, ro) } -// restoreFromBaseSnapshot restore the etcd data directory from base snapshot. -func (r *Restorer) restoreFromBaseSnapshot(ro RestoreOptions) error { +// RestoreFromBaseSnapshot restore the etcd data directory from base snapshot. +func (r *Restorer) RestoreFromBaseSnapshot(ro brtypes.RestoreOptions) error { var err error if path.Join(ro.BaseSnapshot.SnapDir, ro.BaseSnapshot.SnapName) == "" { r.logger.Warnf("Base snapshot path not provided. Will do nothing.") @@ -131,8 +143,8 @@ func (r *Restorer) restoreFromBaseSnapshot(ro RestoreOptions) error { } // makeDB copies the database snapshot to the snapshot directory. -func (r *Restorer) makeDB(snapdir string, snap snapstore.Snapshot, commit int, skipHashCheck bool) error { - rc, err := r.store.Fetch(snap) +func (r *Restorer) makeDB(snapdir string, snap *snapstore.Snapshot, commit int, skipHashCheck bool) error { + rc, err := r.store.Fetch(*snap) if err != nil { return err } @@ -221,8 +233,9 @@ func (r *Restorer) makeDB(snapdir string, snap snapstore.Snapshot, commit int, s be := backend.NewDefaultBackend(dbPath) // a lessor that never times out leases lessor := lease.NewLessor(r.zapLogger, be, lease.LessorConfig{MinLeaseTTL: math.MaxInt64}) - s := mvcc.NewStore(r.zapLogger, be, lessor, (*initIndex)(&commit), mvcc.StoreConfig{}) + s := mvcc.NewStore(r.zapLogger, be, lessor, (*brtypes.InitIndex)(&commit), mvcc.StoreConfig{}) trace := traceutil.New("write", r.zapLogger) + txn := s.Write(trace) btx := be.BatchTx() del := func(k, v []byte) error { @@ -329,46 +342,8 @@ func makeWALAndSnap(logger *zap.Logger, waldir, snapdir string, cl *membership.R return w.SaveSnapshot(walpb.Snapshot{Index: commit, Term: term}) } -// StartEmbeddedEtcd starts the embedded etcd server. -func StartEmbeddedEtcd(logger *logrus.Entry, ro *RestoreOptions) (*embed.Etcd, error) { - cfg := embed.NewConfig() - cfg.Dir = filepath.Join(ro.Config.RestoreDataDir) - DefaultListenPeerURLs := "http://localhost:0" - DefaultListenClientURLs := "http://localhost:0" - DefaultInitialAdvertisePeerURLs := "http://localhost:0" - DefaultAdvertiseClientURLs := "http://localhost:0" - lpurl, _ := url.Parse(DefaultListenPeerURLs) - apurl, _ := url.Parse(DefaultInitialAdvertisePeerURLs) - lcurl, _ := url.Parse(DefaultListenClientURLs) - acurl, _ := url.Parse(DefaultAdvertiseClientURLs) - cfg.LPUrls = []url.URL{*lpurl} - cfg.LCUrls = []url.URL{*lcurl} - cfg.APUrls = []url.URL{*apurl} - cfg.ACUrls = []url.URL{*acurl} - cfg.InitialCluster = cfg.InitialClusterFromName(cfg.Name) - cfg.QuotaBackendBytes = ro.Config.EmbeddedEtcdQuotaBytes - cfg.MaxRequestBytes = ro.Config.MaxRequestBytes - cfg.MaxTxnOps = ro.Config.MaxTxnOps - cfg.Logger = "zap" - cfg.AutoCompactionMode = ro.Config.AutoCompactionMode - cfg.AutoCompactionRetention = ro.Config.AutoCompactionRetention - e, err := embed.StartEtcd(cfg) - if err != nil { - return nil, err - } - select { - case <-e.Server.ReadyNotify(): - logger.Infof("Embedded server is ready to listen client at: %s", e.Clients[0].Addr()) - case <-time.After(60 * time.Second): - e.Server.Stop() // trigger a shutdown - e.Close() - return nil, fmt.Errorf("server took too long to start") - } - return e, nil -} - -// applyDeltaSnapshots fetches the events from delta snapshots in parallel and applies them to the embedded etcd sequentially. -func (r *Restorer) applyDeltaSnapshots(client *clientv3.Client, ro RestoreOptions) error { +// ApplyDeltaSnapshots fetches the events from delta snapshots in parallel and applies them to the embedded etcd sequentially. +func (r *Restorer) ApplyDeltaSnapshots(client *clientv3.Client, ro brtypes.RestoreOptions) error { snapList := ro.DeltaSnapList numMaxFetchers := ro.Config.MaxFetchers @@ -392,8 +367,8 @@ func (r *Restorer) applyDeltaSnapshots(client *clientv3.Client, ro RestoreOption numFetchers = int(math.Min(float64(numMaxFetchers), float64(numSnaps))) snapLocationsCh = make(chan string, numSnaps) errCh = make(chan error, numFetchers+1) - fetcherInfoCh = make(chan fetcherInfo, numSnaps) - applierInfoCh = make(chan applierInfo, numSnaps) + fetcherInfoCh = make(chan brtypes.FetcherInfo, numSnaps) + applierInfoCh = make(chan brtypes.ApplierInfo, numSnaps) stopCh = make(chan bool) wg sync.WaitGroup ) @@ -405,7 +380,7 @@ func (r *Restorer) applyDeltaSnapshots(client *clientv3.Client, ro RestoreOption } for i, snap := range remainingSnaps { - fetcherInfo := fetcherInfo{ + fetcherInfo := brtypes.FetcherInfo{ Snapshot: *snap, SnapIndex: i, } @@ -443,7 +418,7 @@ func (r *Restorer) cleanup(snapLocationsCh chan string, stopCh chan bool, wg *sy } // fetchSnaps fetches delta snapshots as events and persists them onto disk. -func (r *Restorer) fetchSnaps(fetcherIndex int, fetcherInfoCh <-chan fetcherInfo, applierInfoCh chan<- applierInfo, snapLocationsCh chan<- string, errCh chan<- error, stopCh chan bool, wg *sync.WaitGroup) { +func (r *Restorer) fetchSnaps(fetcherIndex int, fetcherInfoCh <-chan brtypes.FetcherInfo, applierInfoCh chan<- brtypes.ApplierInfo, snapLocationsCh chan<- string, errCh chan<- error, stopCh chan bool, wg *sync.WaitGroup) { defer wg.Done() wg.Add(1) @@ -459,20 +434,20 @@ func (r *Restorer) fetchSnaps(fetcherIndex int, fetcherInfoCh <-chan fetcherInfo eventsData, err := r.getEventsDataFromDeltaSnapshot(fetcherInfo.Snapshot) if err != nil { errCh <- fmt.Errorf("failed to read events data from delta snapshot %s : %v", fetcherInfo.Snapshot.SnapName, err) - applierInfoCh <- applierInfo{SnapIndex: -1} // cannot use close(ch) as concurrent fetchSnaps routines might try to send on channel, causing a panic + applierInfoCh <- brtypes.ApplierInfo{SnapIndex: -1} // cannot use close(ch) as concurrent fetchSnaps routines might try to send on channel, causing a panic return } eventsFilePath, err := persistDeltaSnapshot(eventsData) if err != nil { errCh <- fmt.Errorf("failed to persist events data for delta snapshot %s : %v", fetcherInfo.Snapshot.SnapName, err) - applierInfoCh <- applierInfo{SnapIndex: -1} + applierInfoCh <- brtypes.ApplierInfo{SnapIndex: -1} return } snapLocationsCh <- eventsFilePath // used for cleanup later - applierInfo := applierInfo{ + applierInfo := brtypes.ApplierInfo{ EventsFilePath: eventsFilePath, SnapIndex: fetcherInfo.SnapIndex, } @@ -482,7 +457,7 @@ func (r *Restorer) fetchSnaps(fetcherIndex int, fetcherInfoCh <-chan fetcherInfo } // applySnaps applies delta snapshot events to the embedded etcd sequentially, in the right order of snapshots, regardless of the order in which they were fetched. -func (r *Restorer) applySnaps(client *clientv3.Client, remainingSnaps snapstore.SnapList, applierInfoCh <-chan applierInfo, errCh chan<- error, stopCh <-chan bool, wg *sync.WaitGroup) { +func (r *Restorer) applySnaps(client *clientv3.Client, remainingSnaps snapstore.SnapList, applierInfoCh <-chan brtypes.ApplierInfo, errCh chan<- error, stopCh <-chan bool, wg *sync.WaitGroup) { defer wg.Done() wg.Add(1) @@ -526,7 +501,7 @@ func (r *Restorer) applySnaps(client *clientv3.Client, remainingSnaps snapstore. if err = os.Remove(filePath); err != nil { r.logger.Warnf("Unable to remove file: %s; err: %v", filePath, err) } - events := []event{} + events := []brtypes.Event{} if err = json.Unmarshal(eventsData, &events); err != nil { errCh <- fmt.Errorf("failed to read events from events data for delta snapshot %s : %v", snapName, err) return @@ -548,7 +523,7 @@ func (r *Restorer) applySnaps(client *clientv3.Client, remainingSnaps snapstore. } // applyEventsAndVerify applies events from one snapshot to the embedded etcd and verifies the correctness of the sequence of snapshot applied. -func applyEventsAndVerify(client *clientv3.Client, events []event, snap *snapstore.Snapshot) error { +func applyEventsAndVerify(client *clientv3.Client, events []brtypes.Event, snap *snapstore.Snapshot) error { if err := applyEventsToEtcd(client, events); err != nil { return fmt.Errorf("failed to apply events to etcd for delta snapshot %s : %v", snap.SnapName, err) } @@ -591,13 +566,13 @@ func (r *Restorer) applyFirstDeltaSnapshot(client *clientv3.Client, snap snapsto } // getEventsFromDeltaSnapshot returns the events from delta snapshot from snap store. -func (r *Restorer) getEventsFromDeltaSnapshot(snap snapstore.Snapshot) ([]event, error) { +func (r *Restorer) getEventsFromDeltaSnapshot(snap snapstore.Snapshot) ([]brtypes.Event, error) { data, err := r.getEventsDataFromDeltaSnapshot(snap) if err != nil { return nil, err } - events := []event{} + events := []brtypes.Event{} if err := json.Unmarshal(data, &events); err != nil { return nil, err } @@ -678,7 +653,7 @@ func persistDeltaSnapshot(data []byte) (string, error) { } // applyEventsToEtcd performs operations in events sequentially. -func applyEventsToEtcd(client *clientv3.Client, events []event) error { +func applyEventsToEtcd(client *clientv3.Client, events []brtypes.Event) error { var ( lastRev int64 ops = []clientv3.Op{} diff --git a/pkg/snapshot/restorer/restorer_suite_test.go b/pkg/snapshot/restorer/restorer_suite_test.go index 447098248..848f80fdb 100644 --- a/pkg/snapshot/restorer/restorer_suite_test.go +++ b/pkg/snapshot/restorer/restorer_suite_test.go @@ -23,11 +23,7 @@ import ( "time" "github.com/gardener/etcd-backup-restore/pkg/compressor" - "github.com/gardener/etcd-backup-restore/pkg/wrappers" - "github.com/gardener/etcd-backup-restore/pkg/etcdutil" - "github.com/gardener/etcd-backup-restore/pkg/snapshot/snapshotter" - "github.com/gardener/etcd-backup-restore/pkg/snapstore" "github.com/gardener/etcd-backup-restore/test/utils" . "github.com/onsi/ginkgo" . "github.com/onsi/gomega" @@ -80,8 +76,9 @@ var _ = SynchronizedBeforeSuite(func() []byte { deltaSnapshotPeriod := time.Second ctx := utils.ContextWithWaitGroupFollwedByGracePeriod(populatorCtx, wg, deltaSnapshotPeriod+2*time.Second) + compressionConfig := compressor.NewCompressorConfig() - err = runSnapshotter(logger, deltaSnapshotPeriod, endpoints, ctx.Done(), true, compressionConfig) + err = utils.RunSnapshotter(logger, snapstoreDir, deltaSnapshotPeriod, endpoints, ctx.Done(), true, compressionConfig) Expect(err).ShouldNot(HaveOccurred()) keyTo = resp.KeyTo @@ -104,31 +101,3 @@ func cleanUp() { Expect(err).ShouldNot(HaveOccurred()) } - -// runSnapshotter creates a snapshotter object and runs it for a duration specified by 'snapshotterDurationSeconds' -func runSnapshotter(logger *logrus.Entry, deltaSnapshotPeriod time.Duration, endpoints []string, stopCh <-chan struct{}, startWithFullSnapshot bool, compressionConfig *compressor.CompressionConfig) error { - store, err := snapstore.GetSnapstore(&snapstore.Config{Container: snapstoreDir, Provider: "Local"}) - if err != nil { - return err - } - - etcdConnectionConfig := etcdutil.NewEtcdConnectionConfig() - etcdConnectionConfig.ConnectionTimeout.Duration = 10 * time.Second - etcdConnectionConfig.Endpoints = endpoints - - snapshotterConfig := &snapshotter.Config{ - FullSnapshotSchedule: "0 0 1 1 *", - DeltaSnapshotPeriod: wrappers.Duration{Duration: deltaSnapshotPeriod}, - DeltaSnapshotMemoryLimit: snapshotter.DefaultDeltaSnapMemoryLimit, - GarbageCollectionPeriod: wrappers.Duration{Duration: time.Minute}, - GarbageCollectionPolicy: snapshotter.GarbageCollectionPolicyLimitBased, - MaxBackups: 1, - } - - ssr, err := snapshotter.NewSnapshotter(logger, snapshotterConfig, store, etcdConnectionConfig, compressionConfig) - if err != nil { - return err - } - - return ssr.Run(stopCh, startWithFullSnapshot) -} diff --git a/pkg/snapshot/restorer/restorer_test.go b/pkg/snapshot/restorer/restorer_test.go index 95afc16a9..8878ed73d 100644 --- a/pkg/snapshot/restorer/restorer_test.go +++ b/pkg/snapshot/restorer/restorer_test.go @@ -27,6 +27,7 @@ import ( "github.com/gardener/etcd-backup-restore/pkg/compressor" "github.com/gardener/etcd-backup-restore/pkg/miscellaneous" "github.com/gardener/etcd-backup-restore/pkg/snapstore" + brtypes "github.com/gardener/etcd-backup-restore/pkg/types" "github.com/gardener/etcd-backup-restore/test/utils" "github.com/sirupsen/logrus" "go.etcd.io/etcd/clientv3" @@ -72,7 +73,7 @@ var _ = Describe("Running Restorer", func() { }) Describe("For pre-loaded Snapstore", func() { - var restoreOpts RestoreOptions + var restoreOpts brtypes.RestoreOptions BeforeEach(func() { err = corruptEtcdDir() @@ -85,8 +86,8 @@ var _ = Describe("Running Restorer", func() { Expect(err).ShouldNot(HaveOccurred()) rstr = NewRestorer(store, logger) - restoreOpts = RestoreOptions{ - Config: &RestorationConfig{ + restoreOpts = brtypes.RestoreOptions{ + Config: &brtypes.RestorationConfig{ RestoreDataDir: etcdDir, InitialClusterToken: restoreClusterToken, InitialCluster: restoreCluster, @@ -101,7 +102,7 @@ var _ = Describe("Running Restorer", func() { AutoCompactionMode: autoCompactionMode, AutoCompactionRetention: autoCompactionRetention, }, - BaseSnapshot: *baseSnapshot, + BaseSnapshot: baseSnapshot, DeltaSnapList: deltaSnapList, ClusterURLs: clusterUrlsMap, PeerURLs: peerUrls, @@ -233,7 +234,7 @@ var _ = Describe("Running Restorer", func() { store snapstore.SnapStore deltaSnapshotPeriod time.Duration endpoints []string - restorationConfig *RestorationConfig + restorationConfig *brtypes.RestorationConfig ) BeforeEach(func() { @@ -245,7 +246,7 @@ var _ = Describe("Running Restorer", func() { store, err = snapstore.GetSnapstore(&snapstore.Config{Container: snapstoreDir, Provider: "Local"}) Expect(err).ShouldNot(HaveOccurred()) - restorationConfig = &RestorationConfig{ + restorationConfig = &brtypes.RestorationConfig{ RestoreDataDir: etcdDir, InitialClusterToken: restoreClusterToken, InitialCluster: restoreCluster, @@ -281,7 +282,7 @@ var _ = Describe("Running Restorer", func() { logger.Infoln("Starting snapshotter with basesnapshot set to false") ssrCtx := utils.ContextWithWaitGroupFollwedByGracePeriod(testCtx, wg, 2) compressionConfig := compressor.NewCompressorConfig() - err = runSnapshotter(logger, deltaSnapshotPeriod, endpoints, ssrCtx.Done(), startWithFullSnapshot, compressionConfig) + err = utils.RunSnapshotter(logger, snapstoreDir, deltaSnapshotPeriod, endpoints, ssrCtx.Done(), startWithFullSnapshot, compressionConfig) Expect(err).ShouldNot(HaveOccurred()) etcd.Server.Stop() etcd.Close() @@ -295,15 +296,18 @@ var _ = Describe("Running Restorer", func() { logger.Infof("Base snapshot is %v", baseSnapshot) rstr = NewRestorer(store, logger) - restoreOpts := RestoreOptions{ + restoreOpts := brtypes.RestoreOptions{ Config: restorationConfig, + BaseSnapshot: baseSnapshot, DeltaSnapList: deltaSnapList, ClusterURLs: clusterUrlsMap, PeerURLs: peerUrls, } - restoreOpts.BaseSnapshot.SnapDir = "" - restoreOpts.BaseSnapshot.SnapName = "" + if baseSnapshot != nil { + restoreOpts.BaseSnapshot.SnapDir = "" + restoreOpts.BaseSnapshot.SnapName = "" + } err := rstr.Restore(restoreOpts) @@ -323,7 +327,7 @@ var _ = Describe("Running Restorer", func() { defer cancelPopulator() ssrCtx := utils.ContextWithWaitGroupFollwedByGracePeriod(testCtx, wg, time.Second) compressionConfig := compressor.NewCompressorConfig() - err = runSnapshotter(logger, deltaSnapshotPeriod, endpoints, ssrCtx.Done(), true, compressionConfig) + err = utils.RunSnapshotter(logger, snapstoreDir, deltaSnapshotPeriod, endpoints, ssrCtx.Done(), true, compressionConfig) Expect(err).ShouldNot(HaveOccurred()) etcd.Server.Stop() etcd.Close() @@ -337,9 +341,9 @@ var _ = Describe("Running Restorer", func() { rstr = NewRestorer(store, logger) - restoreOpts := RestoreOptions{ + restoreOpts := brtypes.RestoreOptions{ Config: restorationConfig, - BaseSnapshot: *baseSnapshot, + BaseSnapshot: baseSnapshot, DeltaSnapList: deltaSnapList, ClusterURLs: clusterUrlsMap, PeerURLs: peerUrls, @@ -361,7 +365,7 @@ var _ = Describe("Running Restorer", func() { defer cancelPopulator() ssrCtx := utils.ContextWithWaitGroupFollwedByGracePeriod(testCtx, wg, time.Second) compressionConfig := compressor.NewCompressorConfig() - err = runSnapshotter(logger, deltaSnapshotPeriod, endpoints, ssrCtx.Done(), true, compressionConfig) + err = utils.RunSnapshotter(logger, snapstoreDir, deltaSnapshotPeriod, endpoints, ssrCtx.Done(), true, compressionConfig) Expect(err).ShouldNot(HaveOccurred()) etcd.Close() @@ -380,9 +384,9 @@ var _ = Describe("Running Restorer", func() { rstr = NewRestorer(store, logger) - restoreOpts := RestoreOptions{ + restoreOpts := brtypes.RestoreOptions{ Config: restorationConfig, - BaseSnapshot: *baseSnapshot, + BaseSnapshot: baseSnapshot, DeltaSnapList: deltaSnapList, ClusterURLs: clusterUrlsMap, PeerURLs: peerUrls, @@ -407,7 +411,7 @@ var _ = Describe("Running Restorer", func() { defer cancelPopulator() ssrCtx := utils.ContextWithWaitGroupFollwedByGracePeriod(testCtx, wg, 2*time.Second) compressionConfig := compressor.NewCompressorConfig() - err = runSnapshotter(logger, deltaSnapshotPeriod, endpoints, ssrCtx.Done(), true, compressionConfig) + err = utils.RunSnapshotter(logger, snapstoreDir, deltaSnapshotPeriod, endpoints, ssrCtx.Done(), true, compressionConfig) Expect(err).ShouldNot(HaveOccurred()) etcd.Close() @@ -416,9 +420,9 @@ var _ = Describe("Running Restorer", func() { rstr = NewRestorer(store, logger) - restoreOpts := RestoreOptions{ + restoreOpts := brtypes.RestoreOptions{ Config: restorationConfig, - BaseSnapshot: *baseSnapshot, + BaseSnapshot: baseSnapshot, DeltaSnapList: deltaSnapList, ClusterURLs: clusterUrlsMap, PeerURLs: peerUrls, @@ -445,7 +449,7 @@ var _ = Describe("Running Restorer", func() { logger.Infoln("Starting snapshotter while loading is happening") compressionConfig := compressor.NewCompressorConfig() - err = runSnapshotter(logger, deltaSnapshotPeriod, endpoints, ssrCtx.Done(), true, compressionConfig) + err = utils.RunSnapshotter(logger, snapstoreDir, deltaSnapshotPeriod, endpoints, ssrCtx.Done(), true, compressionConfig) Expect(err).ShouldNot(HaveOccurred()) time.Sleep(time.Duration(5 * time.Second)) @@ -463,9 +467,9 @@ var _ = Describe("Running Restorer", func() { rstr = NewRestorer(store, logger) - restoreOpts := RestoreOptions{ + restoreOpts := brtypes.RestoreOptions{ Config: restorationConfig, - BaseSnapshot: *baseSnapshot, + BaseSnapshot: baseSnapshot, DeltaSnapList: deltaSnapList, ClusterURLs: clusterUrlsMap, PeerURLs: peerUrls, @@ -494,7 +498,7 @@ var _ = Describe("Running Restorer", func() { compressionConfig := compressor.NewCompressorConfig() compressionConfig.Enabled = false ctx, cancel := context.WithTimeout(testCtx, time.Duration(2*time.Second)) - err = runSnapshotter(logger, deltaSnapshotPeriod, endpoints, ctx.Done(), true, compressionConfig) + err = utils.RunSnapshotter(logger, snapstoreDir, deltaSnapshotPeriod, endpoints, ctx.Done(), true, compressionConfig) Expect(err).ShouldNot(HaveOccurred()) cancel() @@ -508,7 +512,7 @@ var _ = Describe("Running Restorer", func() { compressionConfig.Enabled = true compressionConfig.CompressionPolicy = "lzw" ctx, cancel = context.WithTimeout(testCtx, time.Duration(2*time.Second)) - err = runSnapshotter(logger, deltaSnapshotPeriod, endpoints, ctx.Done(), false, compressionConfig) + err = utils.RunSnapshotter(logger, snapstoreDir, deltaSnapshotPeriod, endpoints, ctx.Done(), false, compressionConfig) Expect(err).ShouldNot(HaveOccurred()) cancel() @@ -521,7 +525,7 @@ var _ = Describe("Running Restorer", func() { compressionConfig = compressor.NewCompressorConfig() compressionConfig.Enabled = true ctx, cancel = context.WithTimeout(testCtx, time.Duration(2*time.Second)) - err = runSnapshotter(logger, deltaSnapshotPeriod, endpoints, ctx.Done(), false, compressionConfig) + err = utils.RunSnapshotter(logger, snapstoreDir, deltaSnapshotPeriod, endpoints, ctx.Done(), false, compressionConfig) Expect(err).ShouldNot(HaveOccurred()) cancel() @@ -535,7 +539,7 @@ var _ = Describe("Running Restorer", func() { compressionConfig.Enabled = true compressionConfig.CompressionPolicy = "zlib" ctx, cancel = context.WithTimeout(testCtx, time.Duration(2*time.Second)) - err = runSnapshotter(logger, deltaSnapshotPeriod, endpoints, ctx.Done(), false, compressionConfig) + err = utils.RunSnapshotter(logger, snapstoreDir, deltaSnapshotPeriod, endpoints, ctx.Done(), false, compressionConfig) Expect(err).ShouldNot(HaveOccurred()) cancel() @@ -548,9 +552,9 @@ var _ = Describe("Running Restorer", func() { rstr = NewRestorer(store, logger) - restoreOpts := RestoreOptions{ + restoreOpts := brtypes.RestoreOptions{ Config: restorationConfig, - BaseSnapshot: *baseSnapshot, + BaseSnapshot: baseSnapshot, DeltaSnapList: deltaSnapList, ClusterURLs: clusterUrlsMap, PeerURLs: peerUrls, @@ -576,7 +580,7 @@ var _ = Describe("Running Restorer", func() { compressionConfig := compressor.NewCompressorConfig() compressionConfig.Enabled = true ctx, cancel := context.WithTimeout(testCtx, time.Duration(2*time.Second)) - err = runSnapshotter(logger, deltaSnapshotPeriod, endpoints, ctx.Done(), true, compressionConfig) + err = utils.RunSnapshotter(logger, snapstoreDir, deltaSnapshotPeriod, endpoints, ctx.Done(), true, compressionConfig) Expect(err).ShouldNot(HaveOccurred()) cancel() @@ -590,7 +594,7 @@ var _ = Describe("Running Restorer", func() { compressionConfig.Enabled = true compressionConfig.CompressionPolicy = "lzw" ctx, cancel = context.WithTimeout(testCtx, time.Duration(2*time.Second)) - err = runSnapshotter(logger, deltaSnapshotPeriod, endpoints, ctx.Done(), false, compressionConfig) + err = utils.RunSnapshotter(logger, snapstoreDir, deltaSnapshotPeriod, endpoints, ctx.Done(), false, compressionConfig) Expect(err).ShouldNot(HaveOccurred()) cancel() @@ -604,7 +608,7 @@ var _ = Describe("Running Restorer", func() { compressionConfig.Enabled = true compressionConfig.CompressionPolicy = "zlib" ctx, cancel = context.WithTimeout(testCtx, time.Duration(2*time.Second)) - err = runSnapshotter(logger, deltaSnapshotPeriod, endpoints, ctx.Done(), false, compressionConfig) + err = utils.RunSnapshotter(logger, snapstoreDir, deltaSnapshotPeriod, endpoints, ctx.Done(), false, compressionConfig) Expect(err).ShouldNot(HaveOccurred()) cancel() @@ -616,7 +620,7 @@ var _ = Describe("Running Restorer", func() { // start the Snapshotter with compression not enabled to take delta snapshot. compressionConfig = compressor.NewCompressorConfig() ctx, cancel = context.WithTimeout(testCtx, time.Duration(2*time.Second)) - err = runSnapshotter(logger, deltaSnapshotPeriod, endpoints, ctx.Done(), false, compressionConfig) + err = utils.RunSnapshotter(logger, snapstoreDir, deltaSnapshotPeriod, endpoints, ctx.Done(), false, compressionConfig) Expect(err).ShouldNot(HaveOccurred()) cancel() @@ -629,9 +633,9 @@ var _ = Describe("Running Restorer", func() { rstr = NewRestorer(store, logger) - restoreOpts := RestoreOptions{ + restoreOpts := brtypes.RestoreOptions{ Config: restorationConfig, - BaseSnapshot: *baseSnapshot, + BaseSnapshot: baseSnapshot, DeltaSnapList: deltaSnapList, ClusterURLs: clusterUrlsMap, PeerURLs: peerUrls, diff --git a/pkg/snapshot/restorer/types_test.go b/pkg/snapshot/restorer/types_test.go index a2fab22ee..187b80120 100644 --- a/pkg/snapshot/restorer/types_test.go +++ b/pkg/snapshot/restorer/types_test.go @@ -19,8 +19,9 @@ import ( "net/url" "time" - . "github.com/gardener/etcd-backup-restore/pkg/snapshot/restorer" + _ "github.com/gardener/etcd-backup-restore/pkg/snapshot/restorer" "github.com/gardener/etcd-backup-restore/pkg/snapstore" + brtypes "github.com/gardener/etcd-backup-restore/pkg/types" . "github.com/onsi/ginkgo" . "github.com/onsi/gomega" "go.etcd.io/etcd/pkg/types" @@ -28,8 +29,8 @@ import ( var _ = Describe("restorer types", func() { var ( - makeRestorationConfig = func(s string, b bool, i int) *RestorationConfig { - return &RestorationConfig{ + makeRestorationConfig = func(s string, b bool, i int) *brtypes.RestorationConfig { + return &brtypes.RestorationConfig{ InitialCluster: s, InitialClusterToken: s, RestoreDataDir: s, @@ -40,8 +41,8 @@ var _ = Describe("restorer types", func() { EmbeddedEtcdQuotaBytes: int64(i), } } - makeSnap = func(s string, i int, t time.Time, b bool) snapstore.Snapshot { - return snapstore.Snapshot{ + makeSnap = func(s string, i int, t time.Time, b bool) *snapstore.Snapshot { + return &snapstore.Snapshot{ Kind: s, StartRevision: int64(i), LastRevision: int64(i), @@ -53,7 +54,7 @@ var _ = Describe("restorer types", func() { } makeSnapList = func(s string, i int, t time.Time, b bool) snapstore.SnapList { var s1, s2 = makeSnap(s, i, t, b), makeSnap(s, i, t, b) - return snapstore.SnapList{&s1, &s2} + return snapstore.SnapList{s1, s2} } makeURL = func(s string, b bool) url.URL { return url.URL{ @@ -77,8 +78,8 @@ var _ = Describe("restorer types", func() { } return out } - makeRestoreOptions = func(s string, i int, t time.Time, b bool) *RestoreOptions { - return &RestoreOptions{ + makeRestoreOptions = func(s string, i int, t time.Time, b bool) *brtypes.RestoreOptions { + return &brtypes.RestoreOptions{ Config: makeRestorationConfig(s, b, i), ClusterURLs: makeURLsMap(s, b), PeerURLs: makeURLs(s, b), @@ -88,14 +89,14 @@ var _ = Describe("restorer types", func() { } ) - Describe("RestorationConfig", func() { + Describe("brtypes.RestorationConfig", func() { var ( - makeA = func() *RestorationConfig { return makeRestorationConfig("a", false, 1) } - makeB = func() *RestorationConfig { return makeRestorationConfig("b", true, 2) } + makeA = func() *brtypes.RestorationConfig { return makeRestorationConfig("a", false, 1) } + makeB = func() *brtypes.RestorationConfig { return makeRestorationConfig("b", true, 2) } ) Describe("DeepCopyInto", func() { It("new out", func() { - var a, in, out = makeA(), makeA(), new(RestorationConfig) + var a, in, out = makeA(), makeA(), new(brtypes.RestorationConfig) in.DeepCopyInto(out) Expect(out).To(Equal(in)) Expect(out).ToNot(BeIdenticalTo(in)) @@ -130,10 +131,10 @@ var _ = Describe("restorer types", func() { now = time.Now() makeA = func() snapstore.SnapList { return makeSnapList("a", 1, now, false) } ) - Describe("DeepCopySnapList", func() { + Describe("brtypes.DeepCopySnapList", func() { It("out", func() { var a, in = makeA(), makeA() - var out = DeepCopySnapList(in) + var out = brtypes.DeepCopySnapList(in) Expect(out).ToNot(BeNil()) Expect(out).To(Equal(in)) Expect(out).ToNot(BeIdenticalTo(in)) @@ -147,10 +148,10 @@ var _ = Describe("restorer types", func() { var ( makeA = func() *url.URL { var u = makeURL("a", false); return &u } ) - Describe("DeepCopyURL", func() { + Describe("brtypes.DeepCopyURL", func() { It("out", func() { var a, in = makeA(), makeA() - var out = DeepCopyURL(in) + var out = brtypes.DeepCopyURL(in) Expect(out).ToNot(BeNil()) Expect(out).To(Equal(in)) Expect(out).ToNot(BeIdenticalTo(in)) @@ -164,10 +165,10 @@ var _ = Describe("restorer types", func() { var ( makeA = func() types.URLs { return makeURLs("a", false) } ) - Describe("DeepCopyURLs", func() { + Describe("brtypes.DeepCopyURLs", func() { It("out", func() { var a, in = makeA(), makeA() - var out = DeepCopyURLs(in) + var out = brtypes.DeepCopyURLs(in) Expect(out).ToNot(BeNil()) Expect(out).To(Equal(in)) Expect(out).ToNot(BeIdenticalTo(in)) @@ -177,15 +178,15 @@ var _ = Describe("restorer types", func() { }) }) - Describe("RestoreOptions", func() { + Describe("brtypes.RestoreOptions", func() { var ( now = time.Now() - makeA = func() *RestoreOptions { return makeRestoreOptions("a", 1, now, false) } - makeB = func() *RestoreOptions { return makeRestoreOptions("b", 2, now.Add(-1*time.Hour), true) } + makeA = func() *brtypes.RestoreOptions { return makeRestoreOptions("a", 1, now, false) } + makeB = func() *brtypes.RestoreOptions { return makeRestoreOptions("b", 2, now.Add(-1*time.Hour), true) } ) Describe("DeepCopyInto", func() { It("new out", func() { - var a, in, out = makeA(), makeA(), new(RestoreOptions) + var a, in, out = makeA(), makeA(), new(brtypes.RestoreOptions) in.DeepCopyInto(out) Expect(out).To(Equal(in)) Expect(out).ToNot(BeIdenticalTo(in)) diff --git a/pkg/types/compactor.go b/pkg/types/compactor.go new file mode 100644 index 000000000..6bf900354 --- /dev/null +++ b/pkg/types/compactor.go @@ -0,0 +1,54 @@ +package types + +import ( + "fmt" + "io/ioutil" + "time" +) + +// CompactionResult holds the compaction details after a compaction is done +type CompactionResult struct { + // Snapshot *snapstore.Snapshot + Path string + LastCompactionDuration time.Duration +} + +// CompactionConfig holds the configuration data for compaction +type CompactionConfig struct { + *RestorationConfig +} + +// NewCompactionConfig returns the compaction config. +func NewCompactionConfig() (*CompactionConfig, error) { + restoreDir, err := getEtcdDir("/tmp") + if err != nil { + return nil, err + } + + return &CompactionConfig{ + &RestorationConfig{ + InitialCluster: initialClusterFromName(defaultName), + InitialClusterToken: defaultInitialClusterToken, + RestoreDataDir: fmt.Sprintf("%s/%s.etcd", restoreDir, defaultName), + InitialAdvertisePeerURLs: []string{defaultInitialAdvertisePeerURLs}, + Name: defaultName, + SkipHashCheck: false, + MaxFetchers: defaultMaxFetchers, + MaxCallSendMsgSize: defaultMaxCallSendMsgSize, + MaxRequestBytes: defaultMaxRequestBytes, + MaxTxnOps: defaultMaxTxnOps, + EmbeddedEtcdQuotaBytes: int64(defaultEmbeddedEtcdQuotaBytes), + AutoCompactionMode: defaultAutoCompactionMode, + AutoCompactionRetention: defaultAutoCompactionRetention, + }, + }, nil +} + +func getEtcdDir(dir string) (string, error) { + outputDir, err := ioutil.TempDir(dir, "compactor") + if err != nil { + return "", err + } + + return outputDir, nil +} diff --git a/pkg/snapshot/restorer/types.go b/pkg/types/restorer.go old mode 100755 new mode 100644 similarity index 50% rename from pkg/snapshot/restorer/types.go rename to pkg/types/restorer.go index e93b6a949..fc2139fdc --- a/pkg/snapshot/restorer/types.go +++ b/pkg/types/restorer.go @@ -12,23 +12,21 @@ // See the License for the specific language governing permissions and // limitations under the License. -package restorer +package types import ( + "fmt" "net/url" + "path" "time" "github.com/gardener/etcd-backup-restore/pkg/snapstore" - "github.com/sirupsen/logrus" + flag "github.com/spf13/pflag" "go.etcd.io/etcd/clientv3" "go.etcd.io/etcd/pkg/types" - "go.uber.org/zap" ) const ( - tmpDir = "/tmp" - tmpEventsDataFilePrefix = "etcd-restore-" - defaultName = "default" defaultInitialAdvertisePeerURLs = "http://localhost:2380" defaultInitialClusterToken = "etcd-cluster" @@ -38,16 +36,9 @@ const ( defaultMaxTxnOps = 10 * 1024 defaultEmbeddedEtcdQuotaBytes = 8 * 1024 * 1024 * 1024 //8Gib defaultAutoCompactionMode = "periodic" // only 2 mode is supported: 'periodic' or 'revision' - defaultAutoCompactionRetention = "30m" + defaultAutoCompactionRetention = "5m" ) -// Restorer is a struct for etcd data directory restorer -type Restorer struct { - logger *logrus.Entry - zapLogger *zap.Logger - store snapstore.SnapStore -} - // RestoreOptions hold all snapshot restore related fields // Note: Please ensure DeepCopy and DeepCopyInto are properly implemented. type RestoreOptions struct { @@ -55,7 +46,7 @@ type RestoreOptions struct { ClusterURLs types.URLsMap PeerURLs types.URLs // Base full snapshot + delta snapshots to restore from - BaseSnapshot snapstore.Snapshot + BaseSnapshot *snapstore.Snapshot DeltaSnapList snapstore.SnapList } @@ -77,24 +68,119 @@ type RestorationConfig struct { AutoCompactionRetention string `json:"autoCompactionRetention,omitempty"` } -type initIndex int +// NewRestorationConfig returns the restoration config. +func NewRestorationConfig() *RestorationConfig { + return &RestorationConfig{ + InitialCluster: initialClusterFromName(defaultName), + InitialClusterToken: defaultInitialClusterToken, + RestoreDataDir: fmt.Sprintf("%s.etcd", defaultName), + InitialAdvertisePeerURLs: []string{defaultInitialAdvertisePeerURLs}, + Name: defaultName, + SkipHashCheck: false, + MaxFetchers: defaultMaxFetchers, + MaxCallSendMsgSize: defaultMaxCallSendMsgSize, + MaxRequestBytes: defaultMaxRequestBytes, + MaxTxnOps: defaultMaxTxnOps, + EmbeddedEtcdQuotaBytes: int64(defaultEmbeddedEtcdQuotaBytes), + AutoCompactionMode: defaultAutoCompactionMode, + AutoCompactionRetention: defaultAutoCompactionRetention, + } +} + +// AddFlags adds the flags to flagset. +func (c *RestorationConfig) AddFlags(fs *flag.FlagSet) { + fs.StringVar(&c.InitialCluster, "initial-cluster", c.InitialCluster, "initial cluster configuration for restore bootstrap") + fs.StringVar(&c.InitialClusterToken, "initial-cluster-token", c.InitialClusterToken, "initial cluster token for the etcd cluster during restore bootstrap") + fs.StringVarP(&c.RestoreDataDir, "data-dir", "d", c.RestoreDataDir, "path to the data directory") + fs.StringArrayVar(&c.InitialAdvertisePeerURLs, "initial-advertise-peer-urls", c.InitialAdvertisePeerURLs, "list of this member's peer URLs to advertise to the rest of the cluster") + fs.StringVar(&c.Name, "name", c.Name, "human-readable name for this member") + fs.BoolVar(&c.SkipHashCheck, "skip-hash-check", c.SkipHashCheck, "ignore snapshot integrity hash value (required if copied from data directory)") + fs.UintVar(&c.MaxFetchers, "max-fetchers", c.MaxFetchers, "maximum number of threads that will fetch delta snapshots in parallel") + fs.IntVar(&c.MaxCallSendMsgSize, "max-call-send-message-size", c.MaxCallSendMsgSize, "maximum size of message that the client sends") + fs.UintVar(&c.MaxRequestBytes, "max-request-bytes", c.MaxRequestBytes, "Maximum client request size in bytes the server will accept") + fs.UintVar(&c.MaxTxnOps, "max-txn-ops", c.MaxTxnOps, "Maximum number of operations permitted in a transaction") + fs.Int64Var(&c.EmbeddedEtcdQuotaBytes, "embedded-etcd-quota-bytes", c.EmbeddedEtcdQuotaBytes, "maximum backend quota for the embedded etcd used for applying delta snapshots") + fs.StringVar(&c.AutoCompactionMode, "auto-compaction-mode", c.AutoCompactionMode, "mode for auto-compaction: 'periodic' for duration based retention. 'revision' for revision number based retention.") + fs.StringVar(&c.AutoCompactionRetention, "auto-compaction-retention", c.AutoCompactionRetention, "Auto-compaction retention length.") +} + +// Validate validates the config. +func (c *RestorationConfig) Validate() error { + if _, err := types.NewURLsMap(c.InitialCluster); err != nil { + return fmt.Errorf("failed creating url map for restore cluster: %v", err) + } + if _, err := types.NewURLs(c.InitialAdvertisePeerURLs); err != nil { + return fmt.Errorf("failed parsing peers urls for restore cluster: %v", err) + } + if c.MaxCallSendMsgSize <= 0 { + return fmt.Errorf("max call send message should be greater than zero") + } + if c.MaxFetchers <= 0 { + return fmt.Errorf("max fetchers should be greater than zero") + } + if c.EmbeddedEtcdQuotaBytes <= 0 { + return fmt.Errorf("Etcd Quota size for etcd must be greater than 0") + } + if c.AutoCompactionMode != "periodic" && c.AutoCompactionMode != "revision" { + return fmt.Errorf("UnSupported auto-compaction-mode") + } + c.RestoreDataDir = path.Clean(c.RestoreDataDir) + return nil +} + +// DeepCopyInto copies the structure deeply from in to out. +func (c *RestorationConfig) DeepCopyInto(out *RestorationConfig) { + *out = *c + if c.InitialAdvertisePeerURLs != nil { + c, out := &c.InitialAdvertisePeerURLs, &out.InitialAdvertisePeerURLs + *out = make([]string, len(*c)) + for i, v := range *c { + (*out)[i] = v + } + } +} + +// DeepCopy returns a deeply copied structure. +func (c *RestorationConfig) DeepCopy() *RestorationConfig { + if c == nil { + return nil + } + + out := new(RestorationConfig) + c.DeepCopyInto(out) + return out +} + +func initialClusterFromName(name string) string { + n := name + if name == "" { + n = defaultName + } + return fmt.Sprintf("%s=http://localhost:2380", n) +} -func (i *initIndex) ConsistentIndex() uint64 { +// InitIndex stores the index +type InitIndex int + +// ConsistentIndex gets the index +func (i *InitIndex) ConsistentIndex() uint64 { return uint64(*i) } -// event is wrapper over etcd event to keep track of time of event -type event struct { +// Event is wrapper over etcd event to keep track of time of event +type Event struct { EtcdEvent *clientv3.Event `json:"etcdEvent"` Time time.Time `json:"time"` } -type fetcherInfo struct { +// FetcherInfo stores the information about fetcher +type FetcherInfo struct { Snapshot snapstore.Snapshot SnapIndex int } -type applierInfo struct { +// ApplierInfo stores the info about applier +type ApplierInfo struct { EventsFilePath string SnapIndex int } @@ -167,26 +253,3 @@ func (in *RestoreOptions) DeepCopy() *RestoreOptions { in.DeepCopyInto(out) return out } - -// DeepCopyInto copies the structure deeply from in to out. -func (in *RestorationConfig) DeepCopyInto(out *RestorationConfig) { - *out = *in - if in.InitialAdvertisePeerURLs != nil { - in, out := &in.InitialAdvertisePeerURLs, &out.InitialAdvertisePeerURLs - *out = make([]string, len(*in)) - for i, v := range *in { - (*out)[i] = v - } - } -} - -// DeepCopy returns a deeply copied structure. -func (in *RestorationConfig) DeepCopy() *RestorationConfig { - if in == nil { - return nil - } - - out := new(RestorationConfig) - in.DeepCopyInto(out) - return out -} diff --git a/test/utils/utils.go b/test/utils/utils.go index 40de38cb6..b238d5733 100644 --- a/test/utils/utils.go +++ b/test/utils/utils.go @@ -23,6 +23,11 @@ import ( "sync" "time" + "github.com/gardener/etcd-backup-restore/pkg/compressor" + "github.com/gardener/etcd-backup-restore/pkg/etcdutil" + "github.com/gardener/etcd-backup-restore/pkg/snapshot/snapshotter" + "github.com/gardener/etcd-backup-restore/pkg/snapstore" + "github.com/gardener/etcd-backup-restore/pkg/wrappers" "github.com/sirupsen/logrus" "go.etcd.io/etcd/clientv3" "go.etcd.io/etcd/embed" @@ -176,3 +181,31 @@ func ContextWithWaitGroupFollwedByGracePeriod(parent context.Context, wg *sync.W ctx := ContextWithWaitGroup(parent, wg) return ContextWithGracePeriod(ctx, gracePeriod) } + +// RunSnapshotter creates a snapshotter object and runs it for a duration specified by 'snapshotterDurationSeconds' +func RunSnapshotter(logger *logrus.Entry, container string, deltaSnapshotPeriod time.Duration, endpoints []string, stopCh <-chan struct{}, startWithFullSnapshot bool, compressionConfig *compressor.CompressionConfig) error { + store, err := snapstore.GetSnapstore(&snapstore.Config{Container: container, Provider: "Local"}) + if err != nil { + return err + } + + etcdConnectionConfig := etcdutil.NewEtcdConnectionConfig() + etcdConnectionConfig.ConnectionTimeout.Duration = 10 * time.Second + etcdConnectionConfig.Endpoints = endpoints + + snapshotterConfig := &snapshotter.Config{ + FullSnapshotSchedule: "0 0 1 1 *", + DeltaSnapshotPeriod: wrappers.Duration{Duration: deltaSnapshotPeriod}, + DeltaSnapshotMemoryLimit: snapshotter.DefaultDeltaSnapMemoryLimit, + GarbageCollectionPeriod: wrappers.Duration{Duration: time.Minute}, + GarbageCollectionPolicy: snapshotter.GarbageCollectionPolicyLimitBased, + MaxBackups: 1, + } + + ssr, err := snapshotter.NewSnapshotter(logger, snapshotterConfig, store, etcdConnectionConfig, compressionConfig) + if err != nil { + return err + } + + return ssr.Run(stopCh, startWithFullSnapshot) +} diff --git a/vendor/github.com/onsi/gomega/go.mod b/vendor/github.com/onsi/gomega/go.mod index 778935141..6179333c1 100644 --- a/vendor/github.com/onsi/gomega/go.mod +++ b/vendor/github.com/onsi/gomega/go.mod @@ -1,5 +1,7 @@ module github.com/onsi/gomega +go 1.15 + require ( github.com/golang/protobuf v1.4.2 github.com/onsi/ginkgo v1.12.1