From 94b2ff601ee0d05c231829fde2d54be20cb3ad5b Mon Sep 17 00:00:00 2001 From: Abhishek Dasgupta Date: Tue, 5 Jan 2021 11:02:07 +0530 Subject: [PATCH] Added compactor subcommand. --- .gitignore | 1 + cmd/compact.go | 100 ++++++++++ cmd/initializer.go | 4 +- cmd/options.go | 50 ++++- cmd/restore.go | 5 +- cmd/root.go | 1 + pkg/compactor/compactor.go | 185 ++++++++++++++++++ pkg/compactor/compactor_suite_test.go | 97 +++++++++ pkg/compactor/compactor_test.go | 125 ++++++++++++ pkg/compressor/compressor.go | 5 +- pkg/initializer/initializer.go | 5 +- pkg/initializer/types.go | 4 +- pkg/initializer/validator/datavalidator.go | 8 +- pkg/miscellaneous/miscellaneous.go | 43 ++++ pkg/server/backuprestoreserver.go | 8 +- pkg/server/init.go | 4 +- pkg/server/types.go | 4 +- pkg/snapshot/restorer/init.go | 84 -------- pkg/snapshot/restorer/restorer.go | 101 ++++------ pkg/snapshot/restorer/restorer_suite_test.go | 35 +--- pkg/snapshot/restorer/restorer_test.go | 72 +++---- pkg/snapshot/restorer/types_test.go | 45 ++--- pkg/types/compactor.go | 92 +++++++++ .../restorer/types.go => types/restorer.go} | 142 ++++++++++---- test/utils/utils.go | 33 ++++ vendor/github.com/onsi/gomega/go.mod | 2 + 26 files changed, 952 insertions(+), 303 deletions(-) create mode 100644 cmd/compact.go create mode 100644 pkg/compactor/compactor.go create mode 100644 pkg/compactor/compactor_suite_test.go create mode 100644 pkg/compactor/compactor_test.go delete mode 100644 pkg/snapshot/restorer/init.go create mode 100644 pkg/types/compactor.go rename pkg/{snapshot/restorer/types.go => types/restorer.go} (52%) mode change 100755 => 100644 diff --git a/.gitignore b/.gitignore index b7b0b38aa..b407c3c9d 100644 --- a/.gitignore +++ b/.gitignore @@ -22,6 +22,7 @@ test/output test/e2e_test_data default.bkp* default.etcd* +compctedsnap.bkp* # IDE config .vscode diff --git a/cmd/compact.go b/cmd/compact.go new file mode 100644 index 000000000..af9fc598c --- /dev/null +++ b/cmd/compact.go @@ -0,0 +1,100 @@ +// 
Copyright (c) 2018 SAP SE or an SAP affiliate company. All rights reserved. This file is licensed under the Apache Software License, v. 2 except as noted otherwise in the LICENSE file. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package cmd + +import ( + "context" + "fmt" + "path/filepath" + + "github.com/gardener/etcd-backup-restore/pkg/compactor" + "github.com/gardener/etcd-backup-restore/pkg/miscellaneous" + "github.com/gardener/etcd-backup-restore/pkg/snapstore" + brtypes "github.com/gardener/etcd-backup-restore/pkg/types" + "github.com/sirupsen/logrus" + "github.com/spf13/cobra" + "go.etcd.io/etcd/pkg/types" +) + +// NewCompactCommand compacts the ETCD instance +func NewCompactCommand(ctx context.Context) *cobra.Command { + opts := newCompactOptions() + // compactCmd represents the compact command + compactCmd := &cobra.Command{ + Use: "compact", + Short: "compacts multiple incremental snapshots in etcd backup into a single full snapshot", + Long: fmt.Sprintf(`Compacts an existing backup stored in snapshot store.`), + Run: func(cmd *cobra.Command, args []string) { + /* Compact operation + - Restore from all the latest snapshots (Base + Delta). + - Compact the newly created embedded ETCD instance. 
+ - Defragment + - Save the snapshot + */ + logger := logrus.New() + if err := opts.validate(); err != nil { + logger.Fatalf("failed to validate the options: %v", err) + return + } + + opts.complete() + + clusterUrlsMap, err := types.NewURLsMap(opts.restorationConfig.InitialCluster) + if err != nil { + logger.Fatalf("failed creating url map for restore cluster: %v", err) + } + + peerUrls, err := types.NewURLs(opts.restorationConfig.InitialAdvertisePeerURLs) + if err != nil { + logger.Fatalf("failed parsing peers urls for restore cluster: %v", err) + } + + store, err := snapstore.GetSnapstore(opts.snapstoreConfig) + if err != nil { + logger.Fatalf("failed to create restore snapstore from configured storage provider: %v", err) + } + + logger.Info("Finding latest set of snapshot to recover from...") + baseSnap, deltaSnapList, err := miscellaneous.GetLatestFullSnapshotAndDeltaSnapList(store) + if err != nil { + logger.Fatalf("failed to get latest snapshot: %v", err) + } + if baseSnap == nil { + logger.Infof("No snapshot found. 
Will do nothing.") + return + } + + cp := compactor.NewCompactor(store, logrus.NewEntry(logger)) + + options := &brtypes.RestoreOptions{ + Config: opts.restorationConfig, + BaseSnapshot: baseSnap, + DeltaSnapList: deltaSnapList, + ClusterURLs: clusterUrlsMap, + PeerURLs: peerUrls, + } + + res, err := cp.Compact(options, opts.needDefragmentation) + if err != nil { + logger.Fatalf("Failed to restore snapshot: %v", err) + return + } + logger.Infof("Compacted snapshot is in: %v", filepath.Join(opts.snapstoreConfig.Container, res.Snapshot.SnapDir, res.Snapshot.SnapName)) + }, + } + + opts.addFlags(compactCmd.Flags()) + return compactCmd +} diff --git a/cmd/initializer.go b/cmd/initializer.go index 0a3d90bfb..b5504d211 100644 --- a/cmd/initializer.go +++ b/cmd/initializer.go @@ -20,7 +20,7 @@ import ( "github.com/gardener/etcd-backup-restore/pkg/initializer" "github.com/gardener/etcd-backup-restore/pkg/initializer/validator" - "github.com/gardener/etcd-backup-restore/pkg/snapshot/restorer" + brtypes "github.com/gardener/etcd-backup-restore/pkg/types" "github.com/sirupsen/logrus" "github.com/spf13/cobra" "go.etcd.io/etcd/pkg/types" @@ -64,7 +64,7 @@ func NewInitializeCommand(ctx context.Context) *cobra.Command { logger.Fatal("validation-mode can only be one of these values [full/sanity]") } - restoreOptions := &restorer.RestoreOptions{ + restoreOptions := &brtypes.RestoreOptions{ Config: opts.restorerOptions.restorationConfig, ClusterURLs: clusterUrlsMap, PeerURLs: peerUrls, diff --git a/cmd/options.go b/cmd/options.go index 28e0a4e5c..2b850aabb 100644 --- a/cmd/options.go +++ b/cmd/options.go @@ -26,14 +26,18 @@ import ( "github.com/gardener/etcd-backup-restore/pkg/snapstore" "github.com/gardener/etcd-backup-restore/pkg/initializer/validator" - "github.com/gardener/etcd-backup-restore/pkg/snapshot/restorer" "github.com/gardener/etcd-backup-restore/pkg/server" + brtypes "github.com/gardener/etcd-backup-restore/pkg/types" "github.com/ghodss/yaml" 
"github.com/sirupsen/logrus" flag "github.com/spf13/pflag" ) +const ( + defaultCompactContainer = "compctedsnap.bkp" +) + type serverOptions struct { ConfigFile string Version bool @@ -127,15 +131,55 @@ func (c *initializerOptions) complete() { c.restorerOptions.complete() } +type compactOptions struct { + restorationConfig *brtypes.RestorationConfig + snapstoreConfig *snapstore.Config + needDefragmentation bool +} + +// newCompactOptions returns the validation config. +func newCompactOptions() *compactOptions { + return &compactOptions{ + restorationConfig: brtypes.NewRestorationConfig(), + snapstoreConfig: snapstore.NewSnapstoreConfig(), + needDefragmentation: true, + } +} + +// AddFlags adds the flags to flagset. +func (c *compactOptions) addFlags(fs *flag.FlagSet) { + c.restorationConfig.AddFlags(fs) + c.snapstoreConfig.AddFlags(fs) + fs.BoolVar(&c.needDefragmentation, "defragment", c.needDefragmentation, "defragment after compaction") +} + +// Validate validates the config. +func (c *compactOptions) validate() error { + if err := c.snapstoreConfig.Validate(); err != nil { + return err + } + + if err := c.snapstoreConfig.Validate(); err != nil { + return err + } + + return c.restorationConfig.Validate() +} + +// complete completes the config. +func (c *compactOptions) complete() { + c.snapstoreConfig.Complete() +} + type restorerOptions struct { - restorationConfig *restorer.RestorationConfig + restorationConfig *brtypes.RestorationConfig snapstoreConfig *snapstore.Config } // newRestorerOptions returns the validation config. 
func newRestorerOptions() *restorerOptions { return &restorerOptions{ - restorationConfig: restorer.NewRestorationConfig(), + restorationConfig: brtypes.NewRestorationConfig(), snapstoreConfig: snapstore.NewSnapstoreConfig(), } } diff --git a/cmd/restore.go b/cmd/restore.go index 2ab42b1f7..7891dd5b2 100644 --- a/cmd/restore.go +++ b/cmd/restore.go @@ -21,6 +21,7 @@ import ( "github.com/gardener/etcd-backup-restore/pkg/miscellaneous" "github.com/gardener/etcd-backup-restore/pkg/snapshot/restorer" "github.com/gardener/etcd-backup-restore/pkg/snapstore" + brtypes "github.com/gardener/etcd-backup-restore/pkg/types" "github.com/sirupsen/logrus" "github.com/spf13/cobra" "go.etcd.io/etcd/pkg/types" @@ -74,9 +75,9 @@ func NewRestoreCommand(ctx context.Context) *cobra.Command { rs := restorer.NewRestorer(store, logrus.NewEntry(logger)) - options := &restorer.RestoreOptions{ + options := &brtypes.RestoreOptions{ Config: opts.restorationConfig, - BaseSnapshot: *baseSnap, + BaseSnapshot: baseSnap, DeltaSnapList: deltaSnapList, ClusterURLs: clusterUrlsMap, PeerURLs: peerUrls, diff --git a/cmd/root.go b/cmd/root.go index f0760d7c4..697222ec1 100644 --- a/cmd/root.go +++ b/cmd/root.go @@ -38,6 +38,7 @@ from previously taken snapshot.`, RootCmd.Flags().BoolVarP(&version, "version", "v", false, "print version info") RootCmd.AddCommand(NewSnapshotCommand(ctx), NewRestoreCommand(ctx), + NewCompactCommand(ctx), NewInitializeCommand(ctx), NewServerCommand(ctx)) return RootCmd diff --git a/pkg/compactor/compactor.go b/pkg/compactor/compactor.go new file mode 100644 index 000000000..3c2103091 --- /dev/null +++ b/pkg/compactor/compactor.go @@ -0,0 +1,185 @@ +package compactor + +import ( + "context" + "fmt" + "io/ioutil" + "os" + "time" + + "github.com/gardener/etcd-backup-restore/pkg/metrics" + "github.com/gardener/etcd-backup-restore/pkg/miscellaneous" + "github.com/gardener/etcd-backup-restore/pkg/snapshot/restorer" + "github.com/gardener/etcd-backup-restore/pkg/snapstore" + brtypes 
"github.com/gardener/etcd-backup-restore/pkg/types" + "github.com/prometheus/client_golang/prometheus" + "github.com/sirupsen/logrus" + "go.etcd.io/etcd/clientv3" +) + +const ( + tmpDir = "/tmp" + defaultName = "default" + defaultInitialAdvertisePeerURLs = "http://localhost:2384" + defaultInitialClusterToken = "etcd-cluster" + defaultMaxFetchers = 6 + defaultMaxCallSendMsgSize = 10 * 1024 * 1024 //10Mib + defaultMaxRequestBytes = 10 * 1024 * 1024 //10Mib + defaultMaxTxnOps = 10 * 1024 + defaultEmbeddedEtcdQuotaBytes = 8 * 1024 * 1024 * 1024 //8Gib + etcdDialTimeout = time.Second * 30 + etcdDir = tmpDir + "/compaction" + restoreClusterToken = "etcd-cluster" +) + +// Compactor holds the necessary details for compacting ETCD +type Compactor struct { + logger *logrus.Entry + store snapstore.SnapStore +} + +// NewCompactor creates compactor +func NewCompactor(store snapstore.SnapStore, logger *logrus.Entry) *Compactor { + return &Compactor{ + logger: logger, + store: store, + } +} + +// Compact is mainly responsible for applying snapshots (full + delta), compacting, drefragmenting, taking the snapshot and saving it sequentially. +func (cp *Compactor) Compact(ro *brtypes.RestoreOptions, needDefragmentation bool) (*brtypes.CompactionResult, error) { + cp.logger.Info("Start compacting") + + // If no basesnapshot is found, abort compaction as there would be nothing to compact + if ro.BaseSnapshot == nil { + cp.logger.Error("No base snapshot found. Nothing is available for compaction") + return nil, fmt.Errorf("No base snapshot found. 
Nothing is available for compaction") + } + + // Set a temporary etcd data directory for embedded etcd + cmpctDir, err := ioutil.TempDir("/tmp", "compactor") + if err != nil { + cp.logger.Errorf("Unable to create temporary etcd directory for compaction: %s", err.Error()) + return nil, err + } + + defer os.RemoveAll(cmpctDir) + + ro.Config.RestoreDataDir = cmpctDir + + var client *clientv3.Client + var ep []string + + // Then restore from the base snapshot + r := restorer.NewRestorer(cp.store, cp.logger) + /*if err := r.Restore(*ro); err != nil { + return nil, fmt.Errorf("Unable to restore snapshots during compaction: %v", err) + }*/ + + if err := r.RestoreFromBaseSnapshot(*ro); err != nil { + return nil, fmt.Errorf("Failed to restore from the base snapshot :%v", err) + } + + cp.logger.Infof("Starting embedded etcd server for compaction...") + e, err := miscellaneous.StartEmbeddedEtcd(cp.logger, ro) + if err != nil { + return nil, err + } + defer func() { + e.Server.Stop() + e.Close() + }() + + ep = []string{e.Clients[0].Addr().String()} + cfg := clientv3.Config{MaxCallSendMsgSize: ro.Config.MaxCallSendMsgSize, Endpoints: ep} + client, err = clientv3.New(cfg) + if err != nil { + return nil, err + } + defer client.Close() + + if len(ro.DeltaSnapList) > 0 { + cp.logger.Infof("Applying delta snapshots...") + if err := r.ApplyDeltaSnapshots(client, *ro); err != nil { + cp.logger.Warnf("Could not apply the delta snapshots: %v", err) + } + } else { + cp.logger.Infof("No delta snapshots present over base snapshot.") + } + + // Then compact ETCD + ctx := context.TODO() + revCheckCtx, cancel := context.WithTimeout(ctx, etcdDialTimeout) + getResponse, err := client.Get(revCheckCtx, "foo") + cancel() + if err != nil { + return nil, fmt.Errorf("failed to connect to client: %v", err) + } + etcdRevision := getResponse.Header.GetRevision() + + client.Compact(ctx, etcdRevision) + + // Then defrag the ETCD + if needDefragmentation { + var dbSizeBeforeDefrag, dbSizeAfterDefrag int64 
+ statusReqCtx, cancel := context.WithTimeout(ctx, etcdDialTimeout) + status, err := client.Status(statusReqCtx, ep[0]) + cancel() + if err != nil { + cp.logger.Warnf("Failed to get status of etcd member[%s] with error: %v", ep, err) + } else { + dbSizeBeforeDefrag = status.DbSize + } + + start := time.Now() + defragCtx, cancel := context.WithTimeout(ctx, time.Duration(60*time.Second)) + _, err = client.Defragment(defragCtx, ep[0]) + cancel() + if err != nil { + cp.logger.Errorf("Total time taken to defragment: %v", time.Now().Sub(start).Seconds()) + cp.logger.Errorf("Failed to defragment etcd member[%s] with error: %v", ep, err) + } + cp.logger.Infof("Total time taken to defragment: %v", time.Now().Sub(start).Seconds()) + cp.logger.Infof("Finished defragmenting etcd member[%s]", ep) + + statusReqCtx, cancel = context.WithTimeout(ctx, etcdDialTimeout) + status, err = client.Status(statusReqCtx, ep[0]) + cancel() + if err != nil { + cp.logger.Warnf("Failed to get status of etcd member[%s] with error: %v", ep, err) + } else { + dbSizeAfterDefrag = status.DbSize + cp.logger.Infof("Probable DB size change for etcd member [%s]: %dB -> %dB after defragmentation", ep, dbSizeBeforeDefrag, dbSizeAfterDefrag) + } + } + + // Then take snapshot of ETCD + snapshotReqCtx, cancel := context.WithTimeout(ctx, etcdDialTimeout) + defer cancel() + rc, err := client.Snapshot(snapshotReqCtx) + if err != nil { + return nil, fmt.Errorf("Failed to create etcd snapshot out of compacted DB: %v", err) + } + cp.logger.Infof("Successfully opened snapshot reader on etcd") + + // Then save the snapshot to the store. 
+ s := snapstore.NewSnapshot(snapstore.SnapshotKindFull, 0, etcdRevision, ro.BaseSnapshot.CompressionSuffix) + startTime := time.Now() + if err := cp.store.Save(*s, rc); err != nil { + timeTaken := time.Now().Sub(startTime).Seconds() + metrics.SnapshotDurationSeconds.With(prometheus.Labels{metrics.LabelKind: snapstore.SnapshotKindFull, metrics.LabelSucceeded: metrics.ValueSucceededFalse}).Observe(timeTaken) + return nil, fmt.Errorf("failed to save snapshot: %v", err) + } + timeTaken := time.Now().Sub(startTime).Seconds() + metrics.SnapshotDurationSeconds.With(prometheus.Labels{metrics.LabelKind: snapstore.SnapshotKindFull, metrics.LabelSucceeded: metrics.ValueSucceededTrue}).Observe(timeTaken) + cp.logger.Infof("Total time to save snapshot: %f seconds.", timeTaken) + + compactionDuration, err := time.ParseDuration(fmt.Sprintf("%fs", timeTaken)) + if err != nil { + cp.logger.Warnf("Could not record compaction duration: %v", err) + } + return &brtypes.CompactionResult{ + Snapshot: s, + LastCompactionDuration: compactionDuration, + }, nil +} diff --git a/pkg/compactor/compactor_suite_test.go b/pkg/compactor/compactor_suite_test.go new file mode 100644 index 000000000..ca9cd8b58 --- /dev/null +++ b/pkg/compactor/compactor_suite_test.go @@ -0,0 +1,97 @@ +// Copyright (c) 2018 SAP SE or an SAP affiliate company. All rights reserved. This file is licensed under the Apache Software License, v. 2 except as noted otherwise in the LICENSE file. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+// See the License for the specific language governing permissions and +// limitations under the License. + +package compactor_test + +import ( + "context" + "fmt" + "io/ioutil" + "os" + "sync" + "testing" + "time" + + "github.com/gardener/etcd-backup-restore/pkg/compressor" + "github.com/gardener/etcd-backup-restore/test/utils" + . "github.com/onsi/ginkgo" + . "github.com/onsi/gomega" + "github.com/sirupsen/logrus" + "go.etcd.io/etcd/embed" +) + +var ( + testSuitDir, testEtcdDir, testSnapshotDir string + testCtx = context.Background() + logger = logrus.New().WithField("suite", "compactor") + etcd *embed.Etcd + err error + keyTo int + endpoints []string +) + +func TestCompactor(t *testing.T) { + RegisterFailHandler(Fail) + RunSpecs(t, "Compactor Suite") +} + +var _ = SynchronizedBeforeSuite(func() []byte { + var ( + data []byte + ) + + testSuitDir, err = ioutil.TempDir("/tmp", "compactor-test") + Expect(err).ShouldNot(HaveOccurred()) + + testEtcdDir := fmt.Sprintf("%s/etcd/default.etcd", testSuitDir) + testSnapshotDir := fmt.Sprintf("%s/etcd/snapshotter.bkp", testSuitDir) + + logger.Infof("ETCD Directory is: %s", testEtcdDir) + logger.Infof("Snapshot Directory is: %s", testSnapshotDir) + + etcd, err = utils.StartEmbeddedEtcd(testCtx, testEtcdDir, logger) + Expect(err).ShouldNot(HaveOccurred()) + endpoints = []string{etcd.Clients[0].Addr().String()} + logger.Infof("endpoints: %s", endpoints) + populatorCtx, cancelPopulator := context.WithTimeout(testCtx, 15*time.Second) + defer cancelPopulator() + resp := &utils.EtcdDataPopulationResponse{} + wg := &sync.WaitGroup{} + wg.Add(1) + go utils.PopulateEtcdWithWaitGroup(populatorCtx, wg, logger, endpoints, resp) + + deltaSnapshotPeriod := time.Second + ctx := utils.ContextWithWaitGroupFollwedByGracePeriod(populatorCtx, wg, deltaSnapshotPeriod+2*time.Second) + compressionConfig := compressor.NewCompressorConfig() + compressionConfig.Enabled = true + compressionConfig.CompressionPolicy = "gzip" + err = 
utils.RunSnapshotter(logger, testSnapshotDir, deltaSnapshotPeriod, endpoints, ctx.Done(), true, compressionConfig) + Expect(err).ShouldNot(HaveOccurred()) + + keyTo = resp.KeyTo + return data + +}, func(data []byte) {}) + +var _ = SynchronizedAfterSuite(func() {}, cleanUp) + +func cleanUp() { + logger.Info("Stop the Embedded etcd server.") + etcd.Server.Stop() + etcd.Close() + + logger.Infof("All tests are done for compactor suite. %s is being removed.", testSuitDir) + err = os.RemoveAll(testSuitDir) + Expect(err).ShouldNot(HaveOccurred()) +} diff --git a/pkg/compactor/compactor_test.go b/pkg/compactor/compactor_test.go new file mode 100644 index 000000000..aa58a3010 --- /dev/null +++ b/pkg/compactor/compactor_test.go @@ -0,0 +1,125 @@ +package compactor_test + +import ( + "fmt" + "os" + "path/filepath" + + "github.com/gardener/etcd-backup-restore/pkg/compactor" + "github.com/gardener/etcd-backup-restore/pkg/miscellaneous" + "github.com/gardener/etcd-backup-restore/pkg/snapstore" + brtypes "github.com/gardener/etcd-backup-restore/pkg/types" + . "github.com/onsi/ginkgo" + . 
"github.com/onsi/gomega" + "go.etcd.io/etcd/pkg/types" +) + +var _ = Describe("Running Compactor", func() { + var ( + dir string + store snapstore.SnapStore + cptr *compactor.Compactor + restorePeerURLs []string + clusterUrlsMap types.URLsMap + peerUrls types.URLs + baseSnapshot *snapstore.Snapshot + deltaSnapList snapstore.SnapList + //wg *sync.WaitGroup + ) + const ( + restoreName string = "default" + restoreClusterToken string = "etcd-cluster" + restoreCluster string = "default=http://localhost:2380" + skipHashCheck bool = false + maxFetchers uint = 6 + maxCallSendMsgSize = 2 * 1024 * 1024 //2Mib + maxRequestBytes = 2 * 1024 * 1024 //2Mib + maxTxnOps = 2 * 1024 + embeddedEtcdQuotaBytes int64 = 8 * 1024 * 1024 * 1024 + ) + + BeforeEach(func() { + //wg = &sync.WaitGroup{} + restorePeerURLs = []string{"http://localhost:2380"} + clusterUrlsMap, err = types.NewURLsMap(restoreCluster) + Expect(err).ShouldNot(HaveOccurred()) + peerUrls, err = types.NewURLs(restorePeerURLs) + Expect(err).ShouldNot(HaveOccurred()) + }) + + Describe("Compact while a etcd server is running", func() { + var restoreOpts *brtypes.RestoreOptions + + BeforeEach(func() { + dir = fmt.Sprintf("%s/etcd/snapshotter.bkp", testSuitDir) + + store, err = snapstore.GetSnapstore(&snapstore.Config{Container: dir, Provider: "Local"}) + Expect(err).ShouldNot(HaveOccurred()) + fmt.Println("The store where compaction will save snapshot is: ", store) + + baseSnapshot, deltaSnapList, err = miscellaneous.GetLatestFullSnapshotAndDeltaSnapList(store) + Expect(err).ShouldNot(HaveOccurred()) + + cptr = compactor.NewCompactor(store, logger) + restoreOpts = &brtypes.RestoreOptions{ + Config: &brtypes.RestorationConfig{ + InitialClusterToken: restoreClusterToken, + InitialCluster: restoreCluster, + Name: restoreName, + InitialAdvertisePeerURLs: restorePeerURLs, + SkipHashCheck: skipHashCheck, + MaxFetchers: maxFetchers, + MaxCallSendMsgSize: maxCallSendMsgSize, + MaxRequestBytes: maxRequestBytes, + MaxTxnOps: maxTxnOps, 
+ EmbeddedEtcdQuotaBytes: embeddedEtcdQuotaBytes, + }, + BaseSnapshot: baseSnapshot, + DeltaSnapList: deltaSnapList, + ClusterURLs: clusterUrlsMap, + PeerURLs: peerUrls, + } + }) + + AfterEach(func() { + }) + + Context("with defragmention allowed", func() { + It("should create a snapshot", func() { + restoreOpts.Config.MaxFetchers = 4 + + res, err := cptr.Compact(restoreOpts, true) + Expect(err).ShouldNot(HaveOccurred()) + + fi, err := os.Stat(filepath.Join(dir, res.Snapshot.SnapDir, res.Snapshot.SnapName)) + Expect(err).ShouldNot(HaveOccurred()) + + size := fi.Size() + Expect(size).ShouldNot(BeZero()) + }) + }) + Context("with defragmention not allowed", func() { + It("should create a snapshot", func() { + restoreOpts.Config.MaxFetchers = 4 + + res, err := cptr.Compact(restoreOpts, false) + Expect(err).ShouldNot(HaveOccurred()) + + fi, err := os.Stat(filepath.Join(dir, res.Snapshot.SnapDir, res.Snapshot.SnapName)) + Expect(err).ShouldNot(HaveOccurred()) + + size := fi.Size() + Expect(size).ShouldNot(BeZero()) + }) + }) + Context("with no basesnapshot in backup directory", func() { + It("should not run compaction", func() { + restoreOpts.Config.MaxFetchers = 4 + restoreOpts.BaseSnapshot = nil + + _, err := cptr.Compact(restoreOpts, false) + Expect(err).Should(HaveOccurred()) + }) + }) + }) +}) diff --git a/pkg/compressor/compressor.go b/pkg/compressor/compressor.go index 1d1d4bc7c..38cdb2eec 100644 --- a/pkg/compressor/compressor.go +++ b/pkg/compressor/compressor.go @@ -55,11 +55,12 @@ func CompressSnapshot(data io.ReadCloser, compressionPolicy string) (io.ReadClos defer pWriter.CloseWithError(err) defer gWriter.Close() defer data.Close() - _, err = io.Copy(gWriter, data) + n, err := io.Copy(gWriter, data) if err != nil { - logger.Infof("compression failed: %v", err) + logger.Errorf("compression failed: %v", err) return } + logger.Infof("Total written bytes: %v", n) }() return pReader, nil diff --git a/pkg/initializer/initializer.go 
b/pkg/initializer/initializer.go index bce01ff06..5387e6596 100644 --- a/pkg/initializer/initializer.go +++ b/pkg/initializer/initializer.go @@ -25,6 +25,7 @@ import ( "github.com/gardener/etcd-backup-restore/pkg/miscellaneous" "github.com/gardener/etcd-backup-restore/pkg/snapshot/restorer" "github.com/gardener/etcd-backup-restore/pkg/snapstore" + brtypes "github.com/gardener/etcd-backup-restore/pkg/types" "github.com/prometheus/client_golang/prometheus" "github.com/sirupsen/logrus" "go.uber.org/zap" @@ -69,7 +70,7 @@ func (e *EtcdInitializer) Initialize(mode validator.Mode, failBelowRevision int6 } //NewInitializer creates an etcd initializer object. -func NewInitializer(options *restorer.RestoreOptions, snapstoreConfig *snapstore.Config, logger *logrus.Logger) *EtcdInitializer { +func NewInitializer(options *brtypes.RestoreOptions, snapstoreConfig *snapstore.Config, logger *logrus.Logger) *EtcdInitializer { zapLogger, _ := zap.NewProduction() etcdInit := &EtcdInitializer{ Config: &Config{ @@ -121,7 +122,7 @@ func (e *EtcdInitializer) restoreCorruptData() (bool, error) { return e.restoreWithEmptySnapstore() } - tempRestoreOptions.BaseSnapshot = *baseSnap + tempRestoreOptions.BaseSnapshot = baseSnap tempRestoreOptions.DeltaSnapList = deltaSnapList tempRestoreOptions.Config.RestoreDataDir = fmt.Sprintf("%s.%s", tempRestoreOptions.Config.RestoreDataDir, "part") diff --git a/pkg/initializer/types.go b/pkg/initializer/types.go index 4bcfc2f90..8af7e4d62 100644 --- a/pkg/initializer/types.go +++ b/pkg/initializer/types.go @@ -16,8 +16,8 @@ package initializer import ( "github.com/gardener/etcd-backup-restore/pkg/initializer/validator" - "github.com/gardener/etcd-backup-restore/pkg/snapshot/restorer" "github.com/gardener/etcd-backup-restore/pkg/snapstore" + brtypes "github.com/gardener/etcd-backup-restore/pkg/types" "github.com/sirupsen/logrus" ) @@ -25,7 +25,7 @@ import ( // checks and snapshot restoration in case of corruption. 
type Config struct { SnapstoreConfig *snapstore.Config - RestoreOptions *restorer.RestoreOptions + RestoreOptions *brtypes.RestoreOptions } // EtcdInitializer implements Initializer interface to perform validation and diff --git a/pkg/initializer/validator/datavalidator.go b/pkg/initializer/validator/datavalidator.go index 11f1ad6a2..85932d7eb 100644 --- a/pkg/initializer/validator/datavalidator.go +++ b/pkg/initializer/validator/datavalidator.go @@ -26,8 +26,8 @@ import ( "time" "github.com/gardener/etcd-backup-restore/pkg/miscellaneous" - "github.com/gardener/etcd-backup-restore/pkg/snapshot/restorer" "github.com/gardener/etcd-backup-restore/pkg/snapstore" + brtypes "github.com/gardener/etcd-backup-restore/pkg/types" "github.com/sirupsen/logrus" bolt "go.etcd.io/bbolt" "go.etcd.io/etcd/clientv3" @@ -291,15 +291,15 @@ func (d *DataValidator) checkFullRevisionConsistency(dataDir string, latestSnaps var latestSyncedEtcdRevision int64 d.Logger.Info("Starting embedded etcd server...") - ro := &restorer.RestoreOptions{ - Config: &restorer.RestorationConfig{ + ro := &brtypes.RestoreOptions{ + Config: &brtypes.RestorationConfig{ RestoreDataDir: dataDir, EmbeddedEtcdQuotaBytes: d.Config.EmbeddedEtcdQuotaBytes, MaxRequestBytes: defaultMaxRequestBytes, MaxTxnOps: defaultMaxTxnOps, }, } - e, err := restorer.StartEmbeddedEtcd(logrus.NewEntry(d.Logger), ro) + e, err := miscellaneous.StartEmbeddedEtcd(logrus.NewEntry(d.Logger), ro) if err != nil { d.Logger.Infof("unable to start embedded etcd: %v", err) return DataDirectoryCorrupt, err diff --git a/pkg/miscellaneous/miscellaneous.go b/pkg/miscellaneous/miscellaneous.go index 6e2183cf5..6f6a34a1e 100644 --- a/pkg/miscellaneous/miscellaneous.go +++ b/pkg/miscellaneous/miscellaneous.go @@ -15,11 +15,18 @@ package miscellaneous import ( + "fmt" + "net/url" + "path/filepath" "sort" + "time" "github.com/gardener/etcd-backup-restore/pkg/metrics" "github.com/gardener/etcd-backup-restore/pkg/snapstore" + brtypes 
"github.com/gardener/etcd-backup-restore/pkg/types" "github.com/prometheus/client_golang/prometheus" + "github.com/sirupsen/logrus" + "go.etcd.io/etcd/embed" ) // GetLatestFullSnapshotAndDeltaSnapList returns the latest snapshot @@ -54,3 +61,39 @@ func GetLatestFullSnapshotAndDeltaSnapList(store snapstore.SnapStore) (*snapstor } return fullSnapshot, deltaSnapList, nil } + +// StartEmbeddedEtcd starts the embedded etcd server. +func StartEmbeddedEtcd(logger *logrus.Entry, ro *brtypes.RestoreOptions) (*embed.Etcd, error) { + cfg := embed.NewConfig() + cfg.Dir = filepath.Join(ro.Config.RestoreDataDir) + DefaultListenPeerURLs := "http://localhost:0" + DefaultListenClientURLs := "http://localhost:0" + DefaultInitialAdvertisePeerURLs := "http://localhost:0" + DefaultAdvertiseClientURLs := "http://localhost:0" + lpurl, _ := url.Parse(DefaultListenPeerURLs) + apurl, _ := url.Parse(DefaultInitialAdvertisePeerURLs) + lcurl, _ := url.Parse(DefaultListenClientURLs) + acurl, _ := url.Parse(DefaultAdvertiseClientURLs) + cfg.LPUrls = []url.URL{*lpurl} + cfg.LCUrls = []url.URL{*lcurl} + cfg.APUrls = []url.URL{*apurl} + cfg.ACUrls = []url.URL{*acurl} + cfg.InitialCluster = cfg.InitialClusterFromName(cfg.Name) + cfg.QuotaBackendBytes = ro.Config.EmbeddedEtcdQuotaBytes + cfg.MaxRequestBytes = ro.Config.MaxRequestBytes + cfg.MaxTxnOps = ro.Config.MaxTxnOps + cfg.Logger = "zap" + e, err := embed.StartEtcd(cfg) + if err != nil { + return nil, err + } + select { + case <-e.Server.ReadyNotify(): + logger.Infof("Embedded server is ready to listen client at: %s", e.Clients[0].Addr()) + case <-time.After(60 * time.Second): + e.Server.Stop() // trigger a shutdown + e.Close() + return nil, fmt.Errorf("server took too long to start") + } + return e, nil +} diff --git a/pkg/server/backuprestoreserver.go b/pkg/server/backuprestoreserver.go index 4bffff562..f44027d22 100644 --- a/pkg/server/backuprestoreserver.go +++ b/pkg/server/backuprestoreserver.go @@ -23,12 +23,12 @@ import ( 
"github.com/gardener/etcd-backup-restore/pkg/errors" "github.com/gardener/etcd-backup-restore/pkg/metrics" + brtypes "github.com/gardener/etcd-backup-restore/pkg/types" "github.com/prometheus/client_golang/prometheus" "github.com/gardener/etcd-backup-restore/pkg/defragmentor" "github.com/gardener/etcd-backup-restore/pkg/etcdutil" "github.com/gardener/etcd-backup-restore/pkg/initializer" - "github.com/gardener/etcd-backup-restore/pkg/snapshot/restorer" "github.com/gardener/etcd-backup-restore/pkg/snapshot/snapshotter" "github.com/gardener/etcd-backup-restore/pkg/snapstore" cron "github.com/robfig/cron/v3" @@ -71,7 +71,7 @@ func (b *BackupRestoreServer) Run(ctx context.Context) error { b.logger.Fatalf("failed creating url map for restore cluster: %v", err) } - options := &restorer.RestoreOptions{ + options := &brtypes.RestoreOptions{ Config: b.config.RestorationConfig, ClusterURLs: clusterURLsMap, PeerURLs: peerURLs, @@ -114,7 +114,7 @@ func (b *BackupRestoreServer) startHTTPServer(initializer initializer.Initialize // runServerWithoutSnapshotter runs the etcd-backup-restore // for the case where snapshotter is not configured -func (b *BackupRestoreServer) runServerWithoutSnapshotter(ctx context.Context, restoreOpts *restorer.RestoreOptions) { +func (b *BackupRestoreServer) runServerWithoutSnapshotter(ctx context.Context, restoreOpts *brtypes.RestoreOptions) { etcdInitializer := initializer.NewInitializer(restoreOpts, nil, b.logger.Logger) // If no storage provider is given, snapshotter will be nil, in which @@ -131,7 +131,7 @@ func (b *BackupRestoreServer) runServerWithoutSnapshotter(ctx context.Context, r // runServerWithSnapshotter runs the etcd-backup-restore // for the case where snapshotter is configured correctly -func (b *BackupRestoreServer) runServerWithSnapshotter(ctx context.Context, restoreOpts *restorer.RestoreOptions) error { +func (b *BackupRestoreServer) runServerWithSnapshotter(ctx context.Context, restoreOpts *brtypes.RestoreOptions) error { ackCh 
:= make(chan struct{}) etcdInitializer := initializer.NewInitializer(restoreOpts, b.config.SnapstoreConfig, b.logger.Logger) diff --git a/pkg/server/init.go b/pkg/server/init.go index 3e3a13a47..c17ac7615 100644 --- a/pkg/server/init.go +++ b/pkg/server/init.go @@ -20,8 +20,8 @@ import ( "github.com/gardener/etcd-backup-restore/pkg/compressor" "github.com/gardener/etcd-backup-restore/pkg/etcdutil" + brtypes "github.com/gardener/etcd-backup-restore/pkg/types" - "github.com/gardener/etcd-backup-restore/pkg/snapshot/restorer" "github.com/gardener/etcd-backup-restore/pkg/snapshot/snapshotter" "github.com/gardener/etcd-backup-restore/pkg/snapstore" "github.com/robfig/cron/v3" @@ -35,8 +35,8 @@ func NewBackupRestoreComponentConfig() *BackupRestoreComponentConfig { ServerConfig: NewHTTPServerConfig(), SnapshotterConfig: snapshotter.NewSnapshotterConfig(), SnapstoreConfig: snapstore.NewSnapstoreConfig(), - RestorationConfig: restorer.NewRestorationConfig(), CompressionConfig: compressor.NewCompressorConfig(), + RestorationConfig: brtypes.NewRestorationConfig(), DefragmentationSchedule: defaultDefragmentationSchedule, } } diff --git a/pkg/server/types.go b/pkg/server/types.go index 2bec32011..10ac5958d 100644 --- a/pkg/server/types.go +++ b/pkg/server/types.go @@ -17,9 +17,9 @@ package server import ( "github.com/gardener/etcd-backup-restore/pkg/compressor" "github.com/gardener/etcd-backup-restore/pkg/etcdutil" - "github.com/gardener/etcd-backup-restore/pkg/snapshot/restorer" "github.com/gardener/etcd-backup-restore/pkg/snapshot/snapshotter" "github.com/gardener/etcd-backup-restore/pkg/snapstore" + brtypes "github.com/gardener/etcd-backup-restore/pkg/types" ) const ( @@ -33,8 +33,8 @@ type BackupRestoreComponentConfig struct { ServerConfig *HTTPServerConfig `json:"serverConfig,omitempty"` SnapshotterConfig *snapshotter.Config `json:"snapshotterConfig,omitempty"` SnapstoreConfig *snapstore.Config `json:"snapstoreConfig,omitempty"` - RestorationConfig 
*restorer.RestorationConfig `json:"restorationConfig,omitempty"` CompressionConfig *compressor.CompressionConfig `json:"compressionConfig,omitempty"` + RestorationConfig *brtypes.RestorationConfig `json:"restorationConfig,omitempty"` DefragmentationSchedule string `json:"defragmentationSchedule"` } diff --git a/pkg/snapshot/restorer/init.go b/pkg/snapshot/restorer/init.go deleted file mode 100644 index 3bbb8fa12..000000000 --- a/pkg/snapshot/restorer/init.go +++ /dev/null @@ -1,84 +0,0 @@ -// Copyright (c) 2019 SAP SE or an SAP affiliate company. All rights reserved. This file is licensed under the Apache Software License, v. 2 except as noted otherwise in the LICENSE file. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package restorer - -import ( - "fmt" - "path" - - flag "github.com/spf13/pflag" - "go.etcd.io/etcd/pkg/types" -) - -// NewRestorationConfig returns the restoration config. 
-func NewRestorationConfig() *RestorationConfig { - return &RestorationConfig{ - InitialCluster: initialClusterFromName(defaultName), - InitialClusterToken: defaultInitialClusterToken, - RestoreDataDir: fmt.Sprintf("%s.etcd", defaultName), - InitialAdvertisePeerURLs: []string{defaultInitialAdvertisePeerURLs}, - Name: defaultName, - SkipHashCheck: false, - MaxFetchers: defaultMaxFetchers, - MaxCallSendMsgSize: defaultMaxCallSendMsgSize, - MaxRequestBytes: defaultMaxRequestBytes, - MaxTxnOps: defaultMaxTxnOps, - EmbeddedEtcdQuotaBytes: int64(defaultEmbeddedEtcdQuotaBytes), - } -} - -// AddFlags adds the flags to flagset. -func (c *RestorationConfig) AddFlags(fs *flag.FlagSet) { - fs.StringVar(&c.InitialCluster, "initial-cluster", c.InitialCluster, "initial cluster configuration for restore bootstrap") - fs.StringVar(&c.InitialClusterToken, "initial-cluster-token", c.InitialClusterToken, "initial cluster token for the etcd cluster during restore bootstrap") - fs.StringVarP(&c.RestoreDataDir, "data-dir", "d", c.RestoreDataDir, "path to the data directory") - fs.StringArrayVar(&c.InitialAdvertisePeerURLs, "initial-advertise-peer-urls", c.InitialAdvertisePeerURLs, "list of this member's peer URLs to advertise to the rest of the cluster") - fs.StringVar(&c.Name, "name", c.Name, "human-readable name for this member") - fs.BoolVar(&c.SkipHashCheck, "skip-hash-check", c.SkipHashCheck, "ignore snapshot integrity hash value (required if copied from data directory)") - fs.UintVar(&c.MaxFetchers, "max-fetchers", c.MaxFetchers, "maximum number of threads that will fetch delta snapshots in parallel") - fs.IntVar(&c.MaxCallSendMsgSize, "max-call-send-message-size", c.MaxCallSendMsgSize, "maximum size of message that the client sends") - fs.UintVar(&c.MaxRequestBytes, "max-request-bytes", c.MaxRequestBytes, "Maximum client request size in bytes the server will accept") - fs.UintVar(&c.MaxTxnOps, "max-txn-ops", c.MaxTxnOps, "Maximum number of operations permitted in a transaction") - 
fs.Int64Var(&c.EmbeddedEtcdQuotaBytes, "embedded-etcd-quota-bytes", c.EmbeddedEtcdQuotaBytes, "maximum backend quota for the embedded etcd used for applying delta snapshots") -} - -// Validate validates the config. -func (c *RestorationConfig) Validate() error { - if _, err := types.NewURLsMap(c.InitialCluster); err != nil { - return fmt.Errorf("failed creating url map for restore cluster: %v", err) - } - if _, err := types.NewURLs(c.InitialAdvertisePeerURLs); err != nil { - return fmt.Errorf("failed parsing peers urls for restore cluster: %v", err) - } - if c.MaxCallSendMsgSize <= 0 { - return fmt.Errorf("max call send message should be greater than zero") - } - if c.MaxFetchers <= 0 { - return fmt.Errorf("max fetchers should be greater than zero") - } - if c.EmbeddedEtcdQuotaBytes <= 0 { - return fmt.Errorf("Etcd Quota size for etcd must be greater than 0") - } - c.RestoreDataDir = path.Clean(c.RestoreDataDir) - return nil -} - -func initialClusterFromName(name string) string { - n := name - if name == "" { - n = defaultName - } - return fmt.Sprintf("%s=http://localhost:2380", n) -} diff --git a/pkg/snapshot/restorer/restorer.go b/pkg/snapshot/restorer/restorer.go index e2d74fa26..e9328717e 100644 --- a/pkg/snapshot/restorer/restorer.go +++ b/pkg/snapshot/restorer/restorer.go @@ -23,7 +23,6 @@ import ( "io" "io/ioutil" "math" - "net/url" "os" "path" "path/filepath" @@ -32,10 +31,11 @@ import ( "time" "github.com/gardener/etcd-backup-restore/pkg/compressor" + "github.com/gardener/etcd-backup-restore/pkg/miscellaneous" "github.com/gardener/etcd-backup-restore/pkg/snapstore" + brtypes "github.com/gardener/etcd-backup-restore/pkg/types" "github.com/sirupsen/logrus" "go.etcd.io/etcd/clientv3" - "go.etcd.io/etcd/embed" "go.etcd.io/etcd/etcdserver" "go.etcd.io/etcd/etcdserver/api/membership" "go.etcd.io/etcd/etcdserver/api/snap" @@ -54,6 +54,18 @@ import ( "go.uber.org/zap" ) +const ( + tmpDir = "/tmp" + tmpEventsDataFilePrefix = "etcd-restore-" +) + +// Restorer is a 
struct for etcd data directory restorer +type Restorer struct { + logger *logrus.Entry + zapLogger *zap.Logger + store snapstore.SnapStore +} + // NewRestorer returns the restorer object. func NewRestorer(store snapstore.SnapStore, logger *logrus.Entry) *Restorer { zapLogger, _ := zap.NewProduction() @@ -65,8 +77,8 @@ func NewRestorer(store snapstore.SnapStore, logger *logrus.Entry) *Restorer { } // Restore restore the etcd data directory as per specified restore options. -func (r *Restorer) Restore(ro RestoreOptions) error { - if err := r.restoreFromBaseSnapshot(ro); err != nil { +func (r *Restorer) Restore(ro brtypes.RestoreOptions) error { + if err := r.RestoreFromBaseSnapshot(ro); err != nil { return fmt.Errorf("failed to restore from the base snapshot :%v", err) } if len(ro.DeltaSnapList) == 0 { @@ -74,7 +86,7 @@ func (r *Restorer) Restore(ro RestoreOptions) error { return nil } r.logger.Infof("Starting embedded etcd server...") - e, err := StartEmbeddedEtcd(r.logger, &ro) + e, err := miscellaneous.StartEmbeddedEtcd(r.logger, &ro) if err != nil { return err } @@ -91,11 +103,11 @@ func (r *Restorer) Restore(ro RestoreOptions) error { defer client.Close() r.logger.Infof("Applying delta snapshots...") - return r.applyDeltaSnapshots(client, ro) + return r.ApplyDeltaSnapshots(client, ro) } -// restoreFromBaseSnapshot restore the etcd data directory from base snapshot. -func (r *Restorer) restoreFromBaseSnapshot(ro RestoreOptions) error { +// RestoreFromBaseSnapshot restore the etcd data directory from base snapshot. +func (r *Restorer) RestoreFromBaseSnapshot(ro brtypes.RestoreOptions) error { var err error if path.Join(ro.BaseSnapshot.SnapDir, ro.BaseSnapshot.SnapName) == "" { r.logger.Warnf("Base snapshot path not provided. Will do nothing.") @@ -131,8 +143,8 @@ func (r *Restorer) restoreFromBaseSnapshot(ro RestoreOptions) error { } // makeDB copies the database snapshot to the snapshot directory. 
-func (r *Restorer) makeDB(snapdir string, snap snapstore.Snapshot, commit int, skipHashCheck bool) error { - rc, err := r.store.Fetch(snap) +func (r *Restorer) makeDB(snapdir string, snap *snapstore.Snapshot, commit int, skipHashCheck bool) error { + rc, err := r.store.Fetch(*snap) if err != nil { return err } @@ -221,8 +233,9 @@ func (r *Restorer) makeDB(snapdir string, snap snapstore.Snapshot, commit int, s be := backend.NewDefaultBackend(dbPath) // a lessor that never times out leases lessor := lease.NewLessor(r.zapLogger, be, lease.LessorConfig{MinLeaseTTL: math.MaxInt64}) - s := mvcc.NewStore(r.zapLogger, be, lessor, (*initIndex)(&commit), mvcc.StoreConfig{}) + s := mvcc.NewStore(r.zapLogger, be, lessor, (*brtypes.InitIndex)(&commit), mvcc.StoreConfig{}) trace := traceutil.New("write", r.zapLogger) + txn := s.Write(trace) btx := be.BatchTx() del := func(k, v []byte) error { @@ -329,44 +342,8 @@ func makeWALAndSnap(logger *zap.Logger, waldir, snapdir string, cl *membership.R return w.SaveSnapshot(walpb.Snapshot{Index: commit, Term: term}) } -// StartEmbeddedEtcd starts the embedded etcd server. 
-func StartEmbeddedEtcd(logger *logrus.Entry, ro *RestoreOptions) (*embed.Etcd, error) { - cfg := embed.NewConfig() - cfg.Dir = filepath.Join(ro.Config.RestoreDataDir) - DefaultListenPeerURLs := "http://localhost:0" - DefaultListenClientURLs := "http://localhost:0" - DefaultInitialAdvertisePeerURLs := "http://localhost:0" - DefaultAdvertiseClientURLs := "http://localhost:0" - lpurl, _ := url.Parse(DefaultListenPeerURLs) - apurl, _ := url.Parse(DefaultInitialAdvertisePeerURLs) - lcurl, _ := url.Parse(DefaultListenClientURLs) - acurl, _ := url.Parse(DefaultAdvertiseClientURLs) - cfg.LPUrls = []url.URL{*lpurl} - cfg.LCUrls = []url.URL{*lcurl} - cfg.APUrls = []url.URL{*apurl} - cfg.ACUrls = []url.URL{*acurl} - cfg.InitialCluster = cfg.InitialClusterFromName(cfg.Name) - cfg.QuotaBackendBytes = ro.Config.EmbeddedEtcdQuotaBytes - cfg.MaxRequestBytes = ro.Config.MaxRequestBytes - cfg.MaxTxnOps = ro.Config.MaxTxnOps - cfg.Logger = "zap" - e, err := embed.StartEtcd(cfg) - if err != nil { - return nil, err - } - select { - case <-e.Server.ReadyNotify(): - logger.Infof("Embedded server is ready to listen client at: %s", e.Clients[0].Addr()) - case <-time.After(60 * time.Second): - e.Server.Stop() // trigger a shutdown - e.Close() - return nil, fmt.Errorf("server took too long to start") - } - return e, nil -} - -// applyDeltaSnapshots fetches the events from delta snapshots in parallel and applies them to the embedded etcd sequentially. -func (r *Restorer) applyDeltaSnapshots(client *clientv3.Client, ro RestoreOptions) error { +// ApplyDeltaSnapshots fetches the events from delta snapshots in parallel and applies them to the embedded etcd sequentially. 
+func (r *Restorer) ApplyDeltaSnapshots(client *clientv3.Client, ro brtypes.RestoreOptions) error { snapList := ro.DeltaSnapList numMaxFetchers := ro.Config.MaxFetchers @@ -390,8 +367,8 @@ func (r *Restorer) applyDeltaSnapshots(client *clientv3.Client, ro RestoreOption numFetchers = int(math.Min(float64(numMaxFetchers), float64(numSnaps))) snapLocationsCh = make(chan string, numSnaps) errCh = make(chan error, numFetchers+1) - fetcherInfoCh = make(chan fetcherInfo, numSnaps) - applierInfoCh = make(chan applierInfo, numSnaps) + fetcherInfoCh = make(chan brtypes.FetcherInfo, numSnaps) + applierInfoCh = make(chan brtypes.ApplierInfo, numSnaps) stopCh = make(chan bool) wg sync.WaitGroup ) @@ -403,7 +380,7 @@ func (r *Restorer) applyDeltaSnapshots(client *clientv3.Client, ro RestoreOption } for i, snap := range remainingSnaps { - fetcherInfo := fetcherInfo{ + fetcherInfo := brtypes.FetcherInfo{ Snapshot: *snap, SnapIndex: i, } @@ -441,7 +418,7 @@ func (r *Restorer) cleanup(snapLocationsCh chan string, stopCh chan bool, wg *sy } // fetchSnaps fetches delta snapshots as events and persists them onto disk. 
-func (r *Restorer) fetchSnaps(fetcherIndex int, fetcherInfoCh <-chan fetcherInfo, applierInfoCh chan<- applierInfo, snapLocationsCh chan<- string, errCh chan<- error, stopCh chan bool, wg *sync.WaitGroup) { +func (r *Restorer) fetchSnaps(fetcherIndex int, fetcherInfoCh <-chan brtypes.FetcherInfo, applierInfoCh chan<- brtypes.ApplierInfo, snapLocationsCh chan<- string, errCh chan<- error, stopCh chan bool, wg *sync.WaitGroup) { defer wg.Done() wg.Add(1) @@ -457,20 +434,20 @@ func (r *Restorer) fetchSnaps(fetcherIndex int, fetcherInfoCh <-chan fetcherInfo eventsData, err := r.getEventsDataFromDeltaSnapshot(fetcherInfo.Snapshot) if err != nil { errCh <- fmt.Errorf("failed to read events data from delta snapshot %s : %v", fetcherInfo.Snapshot.SnapName, err) - applierInfoCh <- applierInfo{SnapIndex: -1} // cannot use close(ch) as concurrent fetchSnaps routines might try to send on channel, causing a panic + applierInfoCh <- brtypes.ApplierInfo{SnapIndex: -1} // cannot use close(ch) as concurrent fetchSnaps routines might try to send on channel, causing a panic return } eventsFilePath, err := persistDeltaSnapshot(eventsData) if err != nil { errCh <- fmt.Errorf("failed to persist events data for delta snapshot %s : %v", fetcherInfo.Snapshot.SnapName, err) - applierInfoCh <- applierInfo{SnapIndex: -1} + applierInfoCh <- brtypes.ApplierInfo{SnapIndex: -1} return } snapLocationsCh <- eventsFilePath // used for cleanup later - applierInfo := applierInfo{ + applierInfo := brtypes.ApplierInfo{ EventsFilePath: eventsFilePath, SnapIndex: fetcherInfo.SnapIndex, } @@ -480,7 +457,7 @@ func (r *Restorer) fetchSnaps(fetcherIndex int, fetcherInfoCh <-chan fetcherInfo } // applySnaps applies delta snapshot events to the embedded etcd sequentially, in the right order of snapshots, regardless of the order in which they were fetched. 
-func (r *Restorer) applySnaps(client *clientv3.Client, remainingSnaps snapstore.SnapList, applierInfoCh <-chan applierInfo, errCh chan<- error, stopCh <-chan bool, wg *sync.WaitGroup) { +func (r *Restorer) applySnaps(client *clientv3.Client, remainingSnaps snapstore.SnapList, applierInfoCh <-chan brtypes.ApplierInfo, errCh chan<- error, stopCh <-chan bool, wg *sync.WaitGroup) { defer wg.Done() wg.Add(1) @@ -524,7 +501,7 @@ func (r *Restorer) applySnaps(client *clientv3.Client, remainingSnaps snapstore. if err = os.Remove(filePath); err != nil { r.logger.Warnf("Unable to remove file: %s; err: %v", filePath, err) } - events := []event{} + events := []brtypes.Event{} if err = json.Unmarshal(eventsData, &events); err != nil { errCh <- fmt.Errorf("failed to read events from events data for delta snapshot %s : %v", snapName, err) return @@ -546,7 +523,7 @@ func (r *Restorer) applySnaps(client *clientv3.Client, remainingSnaps snapstore. } // applyEventsAndVerify applies events from one snapshot to the embedded etcd and verifies the correctness of the sequence of snapshot applied. -func applyEventsAndVerify(client *clientv3.Client, events []event, snap *snapstore.Snapshot) error { +func applyEventsAndVerify(client *clientv3.Client, events []brtypes.Event, snap *snapstore.Snapshot) error { if err := applyEventsToEtcd(client, events); err != nil { return fmt.Errorf("failed to apply events to etcd for delta snapshot %s : %v", snap.SnapName, err) } @@ -589,13 +566,13 @@ func (r *Restorer) applyFirstDeltaSnapshot(client *clientv3.Client, snap snapsto } // getEventsFromDeltaSnapshot returns the events from delta snapshot from snap store. 
-func (r *Restorer) getEventsFromDeltaSnapshot(snap snapstore.Snapshot) ([]event, error) { +func (r *Restorer) getEventsFromDeltaSnapshot(snap snapstore.Snapshot) ([]brtypes.Event, error) { data, err := r.getEventsDataFromDeltaSnapshot(snap) if err != nil { return nil, err } - events := []event{} + events := []brtypes.Event{} if err := json.Unmarshal(data, &events); err != nil { return nil, err } @@ -676,7 +653,7 @@ func persistDeltaSnapshot(data []byte) (string, error) { } // applyEventsToEtcd performs operations in events sequentially. -func applyEventsToEtcd(client *clientv3.Client, events []event) error { +func applyEventsToEtcd(client *clientv3.Client, events []brtypes.Event) error { var ( lastRev int64 ops = []clientv3.Op{} diff --git a/pkg/snapshot/restorer/restorer_suite_test.go b/pkg/snapshot/restorer/restorer_suite_test.go index 447098248..848f80fdb 100644 --- a/pkg/snapshot/restorer/restorer_suite_test.go +++ b/pkg/snapshot/restorer/restorer_suite_test.go @@ -23,11 +23,7 @@ import ( "time" "github.com/gardener/etcd-backup-restore/pkg/compressor" - "github.com/gardener/etcd-backup-restore/pkg/wrappers" - "github.com/gardener/etcd-backup-restore/pkg/etcdutil" - "github.com/gardener/etcd-backup-restore/pkg/snapshot/snapshotter" - "github.com/gardener/etcd-backup-restore/pkg/snapstore" "github.com/gardener/etcd-backup-restore/test/utils" . "github.com/onsi/ginkgo" . 
"github.com/onsi/gomega" @@ -80,8 +76,9 @@ var _ = SynchronizedBeforeSuite(func() []byte { deltaSnapshotPeriod := time.Second ctx := utils.ContextWithWaitGroupFollwedByGracePeriod(populatorCtx, wg, deltaSnapshotPeriod+2*time.Second) + compressionConfig := compressor.NewCompressorConfig() - err = runSnapshotter(logger, deltaSnapshotPeriod, endpoints, ctx.Done(), true, compressionConfig) + err = utils.RunSnapshotter(logger, snapstoreDir, deltaSnapshotPeriod, endpoints, ctx.Done(), true, compressionConfig) Expect(err).ShouldNot(HaveOccurred()) keyTo = resp.KeyTo @@ -104,31 +101,3 @@ func cleanUp() { Expect(err).ShouldNot(HaveOccurred()) } - -// runSnapshotter creates a snapshotter object and runs it for a duration specified by 'snapshotterDurationSeconds' -func runSnapshotter(logger *logrus.Entry, deltaSnapshotPeriod time.Duration, endpoints []string, stopCh <-chan struct{}, startWithFullSnapshot bool, compressionConfig *compressor.CompressionConfig) error { - store, err := snapstore.GetSnapstore(&snapstore.Config{Container: snapstoreDir, Provider: "Local"}) - if err != nil { - return err - } - - etcdConnectionConfig := etcdutil.NewEtcdConnectionConfig() - etcdConnectionConfig.ConnectionTimeout.Duration = 10 * time.Second - etcdConnectionConfig.Endpoints = endpoints - - snapshotterConfig := &snapshotter.Config{ - FullSnapshotSchedule: "0 0 1 1 *", - DeltaSnapshotPeriod: wrappers.Duration{Duration: deltaSnapshotPeriod}, - DeltaSnapshotMemoryLimit: snapshotter.DefaultDeltaSnapMemoryLimit, - GarbageCollectionPeriod: wrappers.Duration{Duration: time.Minute}, - GarbageCollectionPolicy: snapshotter.GarbageCollectionPolicyLimitBased, - MaxBackups: 1, - } - - ssr, err := snapshotter.NewSnapshotter(logger, snapshotterConfig, store, etcdConnectionConfig, compressionConfig) - if err != nil { - return err - } - - return ssr.Run(stopCh, startWithFullSnapshot) -} diff --git a/pkg/snapshot/restorer/restorer_test.go b/pkg/snapshot/restorer/restorer_test.go index c2d1a6433..f61ca7953 
100644 --- a/pkg/snapshot/restorer/restorer_test.go +++ b/pkg/snapshot/restorer/restorer_test.go @@ -27,6 +27,7 @@ import ( "github.com/gardener/etcd-backup-restore/pkg/compressor" "github.com/gardener/etcd-backup-restore/pkg/miscellaneous" "github.com/gardener/etcd-backup-restore/pkg/snapstore" + brtypes "github.com/gardener/etcd-backup-restore/pkg/types" "github.com/gardener/etcd-backup-restore/test/utils" "github.com/sirupsen/logrus" "go.etcd.io/etcd/clientv3" @@ -70,7 +71,7 @@ var _ = Describe("Running Restorer", func() { }) Describe("For pre-loaded Snapstore", func() { - var restoreOpts RestoreOptions + var restoreOpts brtypes.RestoreOptions BeforeEach(func() { err = corruptEtcdDir() @@ -83,8 +84,8 @@ var _ = Describe("Running Restorer", func() { Expect(err).ShouldNot(HaveOccurred()) rstr = NewRestorer(store, logger) - restoreOpts = RestoreOptions{ - Config: &RestorationConfig{ + restoreOpts = brtypes.RestoreOptions{ + Config: &brtypes.RestorationConfig{ RestoreDataDir: etcdDir, InitialClusterToken: restoreClusterToken, InitialCluster: restoreCluster, @@ -97,7 +98,7 @@ var _ = Describe("Running Restorer", func() { MaxTxnOps: maxTxnOps, EmbeddedEtcdQuotaBytes: embeddedEtcdQuotaBytes, }, - BaseSnapshot: *baseSnapshot, + BaseSnapshot: baseSnapshot, DeltaSnapList: deltaSnapList, ClusterURLs: clusterUrlsMap, PeerURLs: peerUrls, @@ -220,7 +221,7 @@ var _ = Describe("Running Restorer", func() { store snapstore.SnapStore deltaSnapshotPeriod time.Duration endpoints []string - restorationConfig *RestorationConfig + restorationConfig *brtypes.RestorationConfig ) BeforeEach(func() { @@ -232,7 +233,7 @@ var _ = Describe("Running Restorer", func() { store, err = snapstore.GetSnapstore(&snapstore.Config{Container: snapstoreDir, Provider: "Local"}) Expect(err).ShouldNot(HaveOccurred()) - restorationConfig = &RestorationConfig{ + restorationConfig = &brtypes.RestorationConfig{ RestoreDataDir: etcdDir, InitialClusterToken: restoreClusterToken, InitialCluster: restoreCluster, @@ 
-266,7 +267,7 @@ var _ = Describe("Running Restorer", func() { logger.Infoln("Starting snapshotter with basesnapshot set to false") ssrCtx := utils.ContextWithWaitGroupFollwedByGracePeriod(testCtx, wg, 2) compressionConfig := compressor.NewCompressorConfig() - err = runSnapshotter(logger, deltaSnapshotPeriod, endpoints, ssrCtx.Done(), startWithFullSnapshot, compressionConfig) + err = utils.RunSnapshotter(logger, snapstoreDir, deltaSnapshotPeriod, endpoints, ssrCtx.Done(), startWithFullSnapshot, compressionConfig) Expect(err).ShouldNot(HaveOccurred()) etcd.Server.Stop() etcd.Close() @@ -280,15 +281,18 @@ var _ = Describe("Running Restorer", func() { logger.Infof("Base snapshot is %v", baseSnapshot) rstr = NewRestorer(store, logger) - restoreOpts := RestoreOptions{ + restoreOpts := brtypes.RestoreOptions{ Config: restorationConfig, + BaseSnapshot: baseSnapshot, DeltaSnapList: deltaSnapList, ClusterURLs: clusterUrlsMap, PeerURLs: peerUrls, } - restoreOpts.BaseSnapshot.SnapDir = "" - restoreOpts.BaseSnapshot.SnapName = "" + if baseSnapshot != nil { + restoreOpts.BaseSnapshot.SnapDir = "" + restoreOpts.BaseSnapshot.SnapName = "" + } err := rstr.Restore(restoreOpts) @@ -308,7 +312,7 @@ var _ = Describe("Running Restorer", func() { defer cancelPopulator() ssrCtx := utils.ContextWithWaitGroupFollwedByGracePeriod(testCtx, wg, time.Second) compressionConfig := compressor.NewCompressorConfig() - err = runSnapshotter(logger, deltaSnapshotPeriod, endpoints, ssrCtx.Done(), true, compressionConfig) + err = utils.RunSnapshotter(logger, snapstoreDir, deltaSnapshotPeriod, endpoints, ssrCtx.Done(), true, compressionConfig) Expect(err).ShouldNot(HaveOccurred()) etcd.Server.Stop() etcd.Close() @@ -322,9 +326,9 @@ var _ = Describe("Running Restorer", func() { rstr = NewRestorer(store, logger) - restoreOpts := RestoreOptions{ + restoreOpts := brtypes.RestoreOptions{ Config: restorationConfig, - BaseSnapshot: *baseSnapshot, + BaseSnapshot: baseSnapshot, DeltaSnapList: deltaSnapList, 
ClusterURLs: clusterUrlsMap, PeerURLs: peerUrls, @@ -346,7 +350,7 @@ var _ = Describe("Running Restorer", func() { defer cancelPopulator() ssrCtx := utils.ContextWithWaitGroupFollwedByGracePeriod(testCtx, wg, time.Second) compressionConfig := compressor.NewCompressorConfig() - err = runSnapshotter(logger, deltaSnapshotPeriod, endpoints, ssrCtx.Done(), true, compressionConfig) + err = utils.RunSnapshotter(logger, snapstoreDir, deltaSnapshotPeriod, endpoints, ssrCtx.Done(), true, compressionConfig) Expect(err).ShouldNot(HaveOccurred()) etcd.Close() @@ -365,9 +369,9 @@ var _ = Describe("Running Restorer", func() { rstr = NewRestorer(store, logger) - restoreOpts := RestoreOptions{ + restoreOpts := brtypes.RestoreOptions{ Config: restorationConfig, - BaseSnapshot: *baseSnapshot, + BaseSnapshot: baseSnapshot, DeltaSnapList: deltaSnapList, ClusterURLs: clusterUrlsMap, PeerURLs: peerUrls, @@ -392,7 +396,7 @@ var _ = Describe("Running Restorer", func() { defer cancelPopulator() ssrCtx := utils.ContextWithWaitGroupFollwedByGracePeriod(testCtx, wg, 2*time.Second) compressionConfig := compressor.NewCompressorConfig() - err = runSnapshotter(logger, deltaSnapshotPeriod, endpoints, ssrCtx.Done(), true, compressionConfig) + err = utils.RunSnapshotter(logger, snapstoreDir, deltaSnapshotPeriod, endpoints, ssrCtx.Done(), true, compressionConfig) Expect(err).ShouldNot(HaveOccurred()) etcd.Close() @@ -401,9 +405,9 @@ var _ = Describe("Running Restorer", func() { rstr = NewRestorer(store, logger) - restoreOpts := RestoreOptions{ + restoreOpts := brtypes.RestoreOptions{ Config: restorationConfig, - BaseSnapshot: *baseSnapshot, + BaseSnapshot: baseSnapshot, DeltaSnapList: deltaSnapList, ClusterURLs: clusterUrlsMap, PeerURLs: peerUrls, @@ -430,7 +434,7 @@ var _ = Describe("Running Restorer", func() { logger.Infoln("Starting snapshotter while loading is happening") compressionConfig := compressor.NewCompressorConfig() - err = runSnapshotter(logger, deltaSnapshotPeriod, endpoints, 
ssrCtx.Done(), true, compressionConfig) + err = utils.RunSnapshotter(logger, snapstoreDir, deltaSnapshotPeriod, endpoints, ssrCtx.Done(), true, compressionConfig) Expect(err).ShouldNot(HaveOccurred()) time.Sleep(time.Duration(5 * time.Second)) @@ -448,9 +452,9 @@ var _ = Describe("Running Restorer", func() { rstr = NewRestorer(store, logger) - restoreOpts := RestoreOptions{ + restoreOpts := brtypes.RestoreOptions{ Config: restorationConfig, - BaseSnapshot: *baseSnapshot, + BaseSnapshot: baseSnapshot, DeltaSnapList: deltaSnapList, ClusterURLs: clusterUrlsMap, PeerURLs: peerUrls, @@ -479,7 +483,7 @@ var _ = Describe("Running Restorer", func() { compressionConfig := compressor.NewCompressorConfig() compressionConfig.Enabled = false ctx, cancel := context.WithTimeout(testCtx, time.Duration(2*time.Second)) - err = runSnapshotter(logger, deltaSnapshotPeriod, endpoints, ctx.Done(), true, compressionConfig) + err = utils.RunSnapshotter(logger, snapstoreDir, deltaSnapshotPeriod, endpoints, ctx.Done(), true, compressionConfig) Expect(err).ShouldNot(HaveOccurred()) cancel() @@ -493,7 +497,7 @@ var _ = Describe("Running Restorer", func() { compressionConfig.Enabled = true compressionConfig.CompressionPolicy = "lzw" ctx, cancel = context.WithTimeout(testCtx, time.Duration(2*time.Second)) - err = runSnapshotter(logger, deltaSnapshotPeriod, endpoints, ctx.Done(), false, compressionConfig) + err = utils.RunSnapshotter(logger, snapstoreDir, deltaSnapshotPeriod, endpoints, ctx.Done(), false, compressionConfig) Expect(err).ShouldNot(HaveOccurred()) cancel() @@ -506,7 +510,7 @@ var _ = Describe("Running Restorer", func() { compressionConfig = compressor.NewCompressorConfig() compressionConfig.Enabled = true ctx, cancel = context.WithTimeout(testCtx, time.Duration(2*time.Second)) - err = runSnapshotter(logger, deltaSnapshotPeriod, endpoints, ctx.Done(), false, compressionConfig) + err = utils.RunSnapshotter(logger, snapstoreDir, deltaSnapshotPeriod, endpoints, ctx.Done(), false, 
compressionConfig) Expect(err).ShouldNot(HaveOccurred()) cancel() @@ -520,7 +524,7 @@ var _ = Describe("Running Restorer", func() { compressionConfig.Enabled = true compressionConfig.CompressionPolicy = "zlib" ctx, cancel = context.WithTimeout(testCtx, time.Duration(2*time.Second)) - err = runSnapshotter(logger, deltaSnapshotPeriod, endpoints, ctx.Done(), false, compressionConfig) + err = utils.RunSnapshotter(logger, snapstoreDir, deltaSnapshotPeriod, endpoints, ctx.Done(), false, compressionConfig) Expect(err).ShouldNot(HaveOccurred()) cancel() @@ -533,9 +537,9 @@ var _ = Describe("Running Restorer", func() { rstr = NewRestorer(store, logger) - restoreOpts := RestoreOptions{ + restoreOpts := brtypes.RestoreOptions{ Config: restorationConfig, - BaseSnapshot: *baseSnapshot, + BaseSnapshot: baseSnapshot, DeltaSnapList: deltaSnapList, ClusterURLs: clusterUrlsMap, PeerURLs: peerUrls, @@ -561,7 +565,7 @@ var _ = Describe("Running Restorer", func() { compressionConfig := compressor.NewCompressorConfig() compressionConfig.Enabled = true ctx, cancel := context.WithTimeout(testCtx, time.Duration(2*time.Second)) - err = runSnapshotter(logger, deltaSnapshotPeriod, endpoints, ctx.Done(), true, compressionConfig) + err = utils.RunSnapshotter(logger, snapstoreDir, deltaSnapshotPeriod, endpoints, ctx.Done(), true, compressionConfig) Expect(err).ShouldNot(HaveOccurred()) cancel() @@ -575,7 +579,7 @@ var _ = Describe("Running Restorer", func() { compressionConfig.Enabled = true compressionConfig.CompressionPolicy = "lzw" ctx, cancel = context.WithTimeout(testCtx, time.Duration(2*time.Second)) - err = runSnapshotter(logger, deltaSnapshotPeriod, endpoints, ctx.Done(), false, compressionConfig) + err = utils.RunSnapshotter(logger, snapstoreDir, deltaSnapshotPeriod, endpoints, ctx.Done(), false, compressionConfig) Expect(err).ShouldNot(HaveOccurred()) cancel() @@ -589,7 +593,7 @@ var _ = Describe("Running Restorer", func() { compressionConfig.Enabled = true 
compressionConfig.CompressionPolicy = "zlib" ctx, cancel = context.WithTimeout(testCtx, time.Duration(2*time.Second)) - err = runSnapshotter(logger, deltaSnapshotPeriod, endpoints, ctx.Done(), false, compressionConfig) + err = utils.RunSnapshotter(logger, snapstoreDir, deltaSnapshotPeriod, endpoints, ctx.Done(), false, compressionConfig) Expect(err).ShouldNot(HaveOccurred()) cancel() @@ -601,7 +605,7 @@ var _ = Describe("Running Restorer", func() { // start the Snapshotter with compression not enabled to take delta snapshot. compressionConfig = compressor.NewCompressorConfig() ctx, cancel = context.WithTimeout(testCtx, time.Duration(2*time.Second)) - err = runSnapshotter(logger, deltaSnapshotPeriod, endpoints, ctx.Done(), false, compressionConfig) + err = utils.RunSnapshotter(logger, snapstoreDir, deltaSnapshotPeriod, endpoints, ctx.Done(), false, compressionConfig) Expect(err).ShouldNot(HaveOccurred()) cancel() @@ -614,9 +618,9 @@ var _ = Describe("Running Restorer", func() { rstr = NewRestorer(store, logger) - restoreOpts := RestoreOptions{ + restoreOpts := brtypes.RestoreOptions{ Config: restorationConfig, - BaseSnapshot: *baseSnapshot, + BaseSnapshot: baseSnapshot, DeltaSnapList: deltaSnapList, ClusterURLs: clusterUrlsMap, PeerURLs: peerUrls, diff --git a/pkg/snapshot/restorer/types_test.go b/pkg/snapshot/restorer/types_test.go index a2fab22ee..187b80120 100644 --- a/pkg/snapshot/restorer/types_test.go +++ b/pkg/snapshot/restorer/types_test.go @@ -19,8 +19,9 @@ import ( "net/url" "time" - . "github.com/gardener/etcd-backup-restore/pkg/snapshot/restorer" + _ "github.com/gardener/etcd-backup-restore/pkg/snapshot/restorer" "github.com/gardener/etcd-backup-restore/pkg/snapstore" + brtypes "github.com/gardener/etcd-backup-restore/pkg/types" . "github.com/onsi/ginkgo" . 
"github.com/onsi/gomega" "go.etcd.io/etcd/pkg/types" @@ -28,8 +29,8 @@ import ( var _ = Describe("restorer types", func() { var ( - makeRestorationConfig = func(s string, b bool, i int) *RestorationConfig { - return &RestorationConfig{ + makeRestorationConfig = func(s string, b bool, i int) *brtypes.RestorationConfig { + return &brtypes.RestorationConfig{ InitialCluster: s, InitialClusterToken: s, RestoreDataDir: s, @@ -40,8 +41,8 @@ var _ = Describe("restorer types", func() { EmbeddedEtcdQuotaBytes: int64(i), } } - makeSnap = func(s string, i int, t time.Time, b bool) snapstore.Snapshot { - return snapstore.Snapshot{ + makeSnap = func(s string, i int, t time.Time, b bool) *snapstore.Snapshot { + return &snapstore.Snapshot{ Kind: s, StartRevision: int64(i), LastRevision: int64(i), @@ -53,7 +54,7 @@ var _ = Describe("restorer types", func() { } makeSnapList = func(s string, i int, t time.Time, b bool) snapstore.SnapList { var s1, s2 = makeSnap(s, i, t, b), makeSnap(s, i, t, b) - return snapstore.SnapList{&s1, &s2} + return snapstore.SnapList{s1, s2} } makeURL = func(s string, b bool) url.URL { return url.URL{ @@ -77,8 +78,8 @@ var _ = Describe("restorer types", func() { } return out } - makeRestoreOptions = func(s string, i int, t time.Time, b bool) *RestoreOptions { - return &RestoreOptions{ + makeRestoreOptions = func(s string, i int, t time.Time, b bool) *brtypes.RestoreOptions { + return &brtypes.RestoreOptions{ Config: makeRestorationConfig(s, b, i), ClusterURLs: makeURLsMap(s, b), PeerURLs: makeURLs(s, b), @@ -88,14 +89,14 @@ var _ = Describe("restorer types", func() { } ) - Describe("RestorationConfig", func() { + Describe("brtypes.RestorationConfig", func() { var ( - makeA = func() *RestorationConfig { return makeRestorationConfig("a", false, 1) } - makeB = func() *RestorationConfig { return makeRestorationConfig("b", true, 2) } + makeA = func() *brtypes.RestorationConfig { return makeRestorationConfig("a", false, 1) } + makeB = func() 
*brtypes.RestorationConfig { return makeRestorationConfig("b", true, 2) } ) Describe("DeepCopyInto", func() { It("new out", func() { - var a, in, out = makeA(), makeA(), new(RestorationConfig) + var a, in, out = makeA(), makeA(), new(brtypes.RestorationConfig) in.DeepCopyInto(out) Expect(out).To(Equal(in)) Expect(out).ToNot(BeIdenticalTo(in)) @@ -130,10 +131,10 @@ var _ = Describe("restorer types", func() { now = time.Now() makeA = func() snapstore.SnapList { return makeSnapList("a", 1, now, false) } ) - Describe("DeepCopySnapList", func() { + Describe("brtypes.DeepCopySnapList", func() { It("out", func() { var a, in = makeA(), makeA() - var out = DeepCopySnapList(in) + var out = brtypes.DeepCopySnapList(in) Expect(out).ToNot(BeNil()) Expect(out).To(Equal(in)) Expect(out).ToNot(BeIdenticalTo(in)) @@ -147,10 +148,10 @@ var _ = Describe("restorer types", func() { var ( makeA = func() *url.URL { var u = makeURL("a", false); return &u } ) - Describe("DeepCopyURL", func() { + Describe("brtypes.DeepCopyURL", func() { It("out", func() { var a, in = makeA(), makeA() - var out = DeepCopyURL(in) + var out = brtypes.DeepCopyURL(in) Expect(out).ToNot(BeNil()) Expect(out).To(Equal(in)) Expect(out).ToNot(BeIdenticalTo(in)) @@ -164,10 +165,10 @@ var _ = Describe("restorer types", func() { var ( makeA = func() types.URLs { return makeURLs("a", false) } ) - Describe("DeepCopyURLs", func() { + Describe("brtypes.DeepCopyURLs", func() { It("out", func() { var a, in = makeA(), makeA() - var out = DeepCopyURLs(in) + var out = brtypes.DeepCopyURLs(in) Expect(out).ToNot(BeNil()) Expect(out).To(Equal(in)) Expect(out).ToNot(BeIdenticalTo(in)) @@ -177,15 +178,15 @@ var _ = Describe("restorer types", func() { }) }) - Describe("RestoreOptions", func() { + Describe("brtypes.RestoreOptions", func() { var ( now = time.Now() - makeA = func() *RestoreOptions { return makeRestoreOptions("a", 1, now, false) } - makeB = func() *RestoreOptions { return makeRestoreOptions("b", 2, now.Add(-1*time.Hour), 
true) } + makeA = func() *brtypes.RestoreOptions { return makeRestoreOptions("a", 1, now, false) } + makeB = func() *brtypes.RestoreOptions { return makeRestoreOptions("b", 2, now.Add(-1*time.Hour), true) } ) Describe("DeepCopyInto", func() { It("new out", func() { - var a, in, out = makeA(), makeA(), new(RestoreOptions) + var a, in, out = makeA(), makeA(), new(brtypes.RestoreOptions) in.DeepCopyInto(out) Expect(out).To(Equal(in)) Expect(out).ToNot(BeIdenticalTo(in)) diff --git a/pkg/types/compactor.go b/pkg/types/compactor.go new file mode 100644 index 000000000..e1618e893 --- /dev/null +++ b/pkg/types/compactor.go @@ -0,0 +1,92 @@ +package types + +import ( + "fmt" + "io/ioutil" + "path" + "time" + + "github.com/gardener/etcd-backup-restore/pkg/snapstore" + flag "github.com/spf13/pflag" + "go.etcd.io/etcd/pkg/types" +) + +// CompactionResult holds the compaction details after a compaction is done +type CompactionResult struct { + Snapshot *snapstore.Snapshot + LastCompactionDuration time.Duration +} + +// CompactionConfig holds the configuration data for compaction +type CompactionConfig struct { + rc *RestorationConfig +} + +// NewCompactionConfig returns the compaction config. +func NewCompactionConfig() (*CompactionConfig, error) { + restoreDir, err := getEtcdDir("/tmp") + if err != nil { + return nil, err + } + + return &CompactionConfig{ + rc: &RestorationConfig{ + InitialCluster: initialClusterFromName(defaultName), + InitialClusterToken: defaultInitialClusterToken, + RestoreDataDir: fmt.Sprintf("%s/%s.etcd", restoreDir, defaultName), + InitialAdvertisePeerURLs: []string{defaultInitialAdvertisePeerURLs}, + Name: defaultName, + SkipHashCheck: false, + MaxFetchers: defaultMaxFetchers, + MaxCallSendMsgSize: defaultMaxCallSendMsgSize, + MaxRequestBytes: defaultMaxRequestBytes, + MaxTxnOps: defaultMaxTxnOps, + EmbeddedEtcdQuotaBytes: int64(defaultEmbeddedEtcdQuotaBytes), + }, + }, nil +} + +// AddFlags adds the flags to flagset. 
+func (c *CompactionConfig) AddFlags(fs *flag.FlagSet) { + fs.StringVar(&c.rc.InitialCluster, "initial-cluster", c.rc.InitialCluster, "initial cluster configuration for restore bootstrap") + fs.StringVar(&c.rc.InitialClusterToken, "initial-cluster-token", c.rc.InitialClusterToken, "initial cluster token for the etcd cluster during restore bootstrap") + fs.StringVarP(&c.rc.RestoreDataDir, "data-dir", "d", c.rc.RestoreDataDir, "path to the data directory") + fs.StringArrayVar(&c.rc.InitialAdvertisePeerURLs, "initial-advertise-peer-urls", c.rc.InitialAdvertisePeerURLs, "list of this member's peer URLs to advertise to the rest of the cluster") + fs.StringVar(&c.rc.Name, "name", c.rc.Name, "human-readable name for this member") + fs.BoolVar(&c.rc.SkipHashCheck, "skip-hash-check", c.rc.SkipHashCheck, "ignore snapshot integrity hash value (required if copied from data directory)") + fs.UintVar(&c.rc.MaxFetchers, "max-fetchers", c.rc.MaxFetchers, "maximum number of threads that will fetch delta snapshots in parallel") + fs.IntVar(&c.rc.MaxCallSendMsgSize, "max-call-send-message-size", c.rc.MaxCallSendMsgSize, "maximum size of message that the client sends") + fs.UintVar(&c.rc.MaxRequestBytes, "max-request-bytes", c.rc.MaxRequestBytes, "Maximum client request size in bytes the server will accept") + fs.UintVar(&c.rc.MaxTxnOps, "max-txn-ops", c.rc.MaxTxnOps, "Maximum number of operations permitted in a transaction") + fs.Int64Var(&c.rc.EmbeddedEtcdQuotaBytes, "embedded-etcd-quota-bytes", c.rc.EmbeddedEtcdQuotaBytes, "maximum backend quota for the embedded etcd used for applying delta snapshots") +} + +// Validate validates the config. 
+func (c *CompactionConfig) Validate() error { + if _, err := types.NewURLsMap(c.rc.InitialCluster); err != nil { + return fmt.Errorf("failed creating url map for restore cluster: %v", err) + } + if _, err := types.NewURLs(c.rc.InitialAdvertisePeerURLs); err != nil { + return fmt.Errorf("failed parsing peers urls for restore cluster: %v", err) + } + if c.rc.MaxCallSendMsgSize <= 0 { + return fmt.Errorf("max call send message should be greater than zero") + } + if c.rc.MaxFetchers <= 0 { + return fmt.Errorf("max fetchers should be greater than zero") + } + if c.rc.EmbeddedEtcdQuotaBytes <= 0 { + return fmt.Errorf("Etcd Quota size for etcd must be greater than 0") + } + c.rc.RestoreDataDir = path.Clean(c.rc.RestoreDataDir) + return nil +} + +func getEtcdDir(dir string) (string, error) { + outputDir, err := ioutil.TempDir(dir, "compactor") + if err != nil { + return "", err + } + + return outputDir, nil +} diff --git a/pkg/snapshot/restorer/types.go b/pkg/types/restorer.go old mode 100755 new mode 100644 similarity index 52% rename from pkg/snapshot/restorer/types.go rename to pkg/types/restorer.go index dab94225c..c545054ca --- a/pkg/snapshot/restorer/types.go +++ b/pkg/types/restorer.go @@ -12,23 +12,21 @@ // See the License for the specific language governing permissions and // limitations under the License. 
-package restorer +package types import ( + "fmt" "net/url" + "path" "time" "github.com/gardener/etcd-backup-restore/pkg/snapstore" - "github.com/sirupsen/logrus" + flag "github.com/spf13/pflag" "go.etcd.io/etcd/clientv3" "go.etcd.io/etcd/pkg/types" - "go.uber.org/zap" ) const ( - tmpDir = "/tmp" - tmpEventsDataFilePrefix = "etcd-restore-" - defaultName = "default" defaultInitialAdvertisePeerURLs = "http://localhost:2380" defaultInitialClusterToken = "etcd-cluster" @@ -39,13 +37,6 @@ const ( defaultEmbeddedEtcdQuotaBytes = 8 * 1024 * 1024 * 1024 //8Gib ) -// Restorer is a struct for etcd data directory restorer -type Restorer struct { - logger *logrus.Entry - zapLogger *zap.Logger - store snapstore.SnapStore -} - // RestoreOptions hold all snapshot restore related fields // Note: Please ensure DeepCopy and DeepCopyInto are properly implemented. type RestoreOptions struct { @@ -53,7 +44,7 @@ type RestoreOptions struct { ClusterURLs types.URLsMap PeerURLs types.URLs // Base full snapshot + delta snapshots to restore from - BaseSnapshot snapstore.Snapshot + BaseSnapshot *snapstore.Snapshot DeltaSnapList snapstore.SnapList } @@ -73,24 +64,112 @@ type RestorationConfig struct { EmbeddedEtcdQuotaBytes int64 `json:"embeddedEtcdQuotaBytes,omitempty"` } -type initIndex int +// NewRestorationConfig returns the restoration config. +func NewRestorationConfig() *RestorationConfig { + return &RestorationConfig{ + InitialCluster: initialClusterFromName(defaultName), + InitialClusterToken: defaultInitialClusterToken, + RestoreDataDir: fmt.Sprintf("%s.etcd", defaultName), + InitialAdvertisePeerURLs: []string{defaultInitialAdvertisePeerURLs}, + Name: defaultName, + SkipHashCheck: false, + MaxFetchers: defaultMaxFetchers, + MaxCallSendMsgSize: defaultMaxCallSendMsgSize, + MaxRequestBytes: defaultMaxRequestBytes, + MaxTxnOps: defaultMaxTxnOps, + EmbeddedEtcdQuotaBytes: int64(defaultEmbeddedEtcdQuotaBytes), + } +} + +// AddFlags adds the flags to flagset. 
+func (c *RestorationConfig) AddFlags(fs *flag.FlagSet) { + fs.StringVar(&c.InitialCluster, "initial-cluster", c.InitialCluster, "initial cluster configuration for restore bootstrap") + fs.StringVar(&c.InitialClusterToken, "initial-cluster-token", c.InitialClusterToken, "initial cluster token for the etcd cluster during restore bootstrap") + fs.StringVarP(&c.RestoreDataDir, "data-dir", "d", c.RestoreDataDir, "path to the data directory") + fs.StringArrayVar(&c.InitialAdvertisePeerURLs, "initial-advertise-peer-urls", c.InitialAdvertisePeerURLs, "list of this member's peer URLs to advertise to the rest of the cluster") + fs.StringVar(&c.Name, "name", c.Name, "human-readable name for this member") + fs.BoolVar(&c.SkipHashCheck, "skip-hash-check", c.SkipHashCheck, "ignore snapshot integrity hash value (required if copied from data directory)") + fs.UintVar(&c.MaxFetchers, "max-fetchers", c.MaxFetchers, "maximum number of threads that will fetch delta snapshots in parallel") + fs.IntVar(&c.MaxCallSendMsgSize, "max-call-send-message-size", c.MaxCallSendMsgSize, "maximum size of message that the client sends") + fs.UintVar(&c.MaxRequestBytes, "max-request-bytes", c.MaxRequestBytes, "Maximum client request size in bytes the server will accept") + fs.UintVar(&c.MaxTxnOps, "max-txn-ops", c.MaxTxnOps, "Maximum number of operations permitted in a transaction") + fs.Int64Var(&c.EmbeddedEtcdQuotaBytes, "embedded-etcd-quota-bytes", c.EmbeddedEtcdQuotaBytes, "maximum backend quota for the embedded etcd used for applying delta snapshots") +} -func (i *initIndex) ConsistentIndex() uint64 { +// Validate validates the config. 
+func (c *RestorationConfig) Validate() error { + if _, err := types.NewURLsMap(c.InitialCluster); err != nil { + return fmt.Errorf("failed creating url map for restore cluster: %v", err) + } + if _, err := types.NewURLs(c.InitialAdvertisePeerURLs); err != nil { + return fmt.Errorf("failed parsing peers urls for restore cluster: %v", err) + } + if c.MaxCallSendMsgSize <= 0 { + return fmt.Errorf("max call send message should be greater than zero") + } + if c.MaxFetchers <= 0 { + return fmt.Errorf("max fetchers should be greater than zero") + } + if c.EmbeddedEtcdQuotaBytes <= 0 { + return fmt.Errorf("Etcd Quota size for etcd must be greater than 0") + } + c.RestoreDataDir = path.Clean(c.RestoreDataDir) + return nil +} + +// DeepCopyInto copies the structure deeply from c to out. +func (c *RestorationConfig) DeepCopyInto(out *RestorationConfig) { + *out = *c + if c.InitialAdvertisePeerURLs != nil { + c, out := &c.InitialAdvertisePeerURLs, &out.InitialAdvertisePeerURLs + *out = make([]string, len(*c)) + for i, v := range *c { + (*out)[i] = v + } + } +} + +// DeepCopy returns a deeply copied structure.
+func (c *RestorationConfig) DeepCopy() *RestorationConfig { + if c == nil { + return nil + } + + out := new(RestorationConfig) + c.DeepCopyInto(out) + return out +} + +func initialClusterFromName(name string) string { + n := name + if name == "" { + n = defaultName + } + return fmt.Sprintf("%s=http://localhost:2380", n) +} + +// InitIndex stores the index +type InitIndex int + +// ConsistentIndex gets the index +func (i *InitIndex) ConsistentIndex() uint64 { return uint64(*i) } -// event is wrapper over etcd event to keep track of time of event -type event struct { +// Event is a wrapper over an etcd event that keeps track of the time of the event +type Event struct { EtcdEvent *clientv3.Event `json:"etcdEvent"` Time time.Time `json:"time"` } -type fetcherInfo struct { +// FetcherInfo stores the information about fetcher +type FetcherInfo struct { Snapshot snapstore.Snapshot SnapIndex int } -type applierInfo struct { +// ApplierInfo stores the info about applier +type ApplierInfo struct { EventsFilePath string SnapIndex int } @@ -163,26 +242,3 @@ func (in *RestoreOptions) DeepCopy() *RestoreOptions { in.DeepCopyInto(out) return out } - -// DeepCopyInto copies the structure deeply from in to out. -func (in *RestorationConfig) DeepCopyInto(out *RestorationConfig) { - *out = *in - if in.InitialAdvertisePeerURLs != nil { - in, out := &in.InitialAdvertisePeerURLs, &out.InitialAdvertisePeerURLs - *out = make([]string, len(*in)) - for i, v := range *in { - (*out)[i] = v - } - } -} - -// DeepCopy returns a deeply copied structure.
-func (in *RestorationConfig) DeepCopy() *RestorationConfig { - if in == nil { - return nil - } - - out := new(RestorationConfig) - in.DeepCopyInto(out) - return out -} diff --git a/test/utils/utils.go b/test/utils/utils.go index 40de38cb6..b238d5733 100644 --- a/test/utils/utils.go +++ b/test/utils/utils.go @@ -23,6 +23,11 @@ import ( "sync" "time" + "github.com/gardener/etcd-backup-restore/pkg/compressor" + "github.com/gardener/etcd-backup-restore/pkg/etcdutil" + "github.com/gardener/etcd-backup-restore/pkg/snapshot/snapshotter" + "github.com/gardener/etcd-backup-restore/pkg/snapstore" + "github.com/gardener/etcd-backup-restore/pkg/wrappers" "github.com/sirupsen/logrus" "go.etcd.io/etcd/clientv3" "go.etcd.io/etcd/embed" @@ -176,3 +181,31 @@ func ContextWithWaitGroupFollwedByGracePeriod(parent context.Context, wg *sync.W ctx := ContextWithWaitGroup(parent, wg) return ContextWithGracePeriod(ctx, gracePeriod) } + +// RunSnapshotter creates a snapshotter backed by a local snapstore in 'container' and runs it, using 'stopCh' as the stop channel +func RunSnapshotter(logger *logrus.Entry, container string, deltaSnapshotPeriod time.Duration, endpoints []string, stopCh <-chan struct{}, startWithFullSnapshot bool, compressionConfig *compressor.CompressionConfig) error { + store, err := snapstore.GetSnapstore(&snapstore.Config{Container: container, Provider: "Local"}) + if err != nil { + return err + } + + etcdConnectionConfig := etcdutil.NewEtcdConnectionConfig() + etcdConnectionConfig.ConnectionTimeout.Duration = 10 * time.Second + etcdConnectionConfig.Endpoints = endpoints + + snapshotterConfig := &snapshotter.Config{ + FullSnapshotSchedule: "0 0 1 1 *", + DeltaSnapshotPeriod: wrappers.Duration{Duration: deltaSnapshotPeriod}, + DeltaSnapshotMemoryLimit: snapshotter.DefaultDeltaSnapMemoryLimit, + GarbageCollectionPeriod: wrappers.Duration{Duration: time.Minute}, + GarbageCollectionPolicy: snapshotter.GarbageCollectionPolicyLimitBased, + MaxBackups: 1, + } + + ssr, err :=
snapshotter.NewSnapshotter(logger, snapshotterConfig, store, etcdConnectionConfig, compressionConfig) + if err != nil { + return err + } + + return ssr.Run(stopCh, startWithFullSnapshot) +} diff --git a/vendor/github.com/onsi/gomega/go.mod b/vendor/github.com/onsi/gomega/go.mod index 778935141..6179333c1 100644 --- a/vendor/github.com/onsi/gomega/go.mod +++ b/vendor/github.com/onsi/gomega/go.mod @@ -1,5 +1,7 @@ module github.com/onsi/gomega +go 1.15 + require ( github.com/golang/protobuf v1.4.2 github.com/onsi/ginkgo v1.12.1