diff --git a/cmd/initializer.go b/cmd/initializer.go index 50ac805dc..1b61fadbf 100644 --- a/cmd/initializer.go +++ b/cmd/initializer.go @@ -49,7 +49,7 @@ func NewInitializeCommand(stopCh <-chan struct{}) *cobra.Command { } options := &restorer.RestoreOptions{ - RestoreDataDir: restoreDataDir, + RestoreDataDir: path.Clean(restoreDataDir), Name: restoreName, ClusterURLs: clusterUrlsMap, PeerURLs: peerUrls, diff --git a/cmd/restore.go b/cmd/restore.go index 08c2f2d05..0f086ec5f 100644 --- a/cmd/restore.go +++ b/cmd/restore.go @@ -73,7 +73,7 @@ func NewRestoreCommand(stopCh <-chan struct{}) *cobra.Command { rs := restorer.NewRestorer(store, logger) options := &restorer.RestoreOptions{ - RestoreDataDir: restoreDataDir, + RestoreDataDir: path.Clean(restoreDataDir), Name: restoreName, BaseSnapshot: *baseSnap, DeltaSnapList: deltaSnapList, diff --git a/cmd/server.go b/cmd/server.go index d2d4b8a46..96425ff15 100644 --- a/cmd/server.go +++ b/cmd/server.go @@ -38,7 +38,7 @@ func NewServerCommand(stopCh <-chan struct{}) *cobra.Command { var serverCmd = &cobra.Command{ Use: "server", Short: "start the http server with backup scheduler.", - Long: `Server will keep listening for http request to deliver its functionality through http endpoins.`, + Long: `Server will keep listening for http request to deliver its functionality through http endpoints.`, Run: func(cmd *cobra.Command, args []string) { var ( snapstoreConfig *snapstore.Config @@ -56,7 +56,7 @@ func NewServerCommand(stopCh <-chan struct{}) *cobra.Command { } options := &restorer.RestoreOptions{ - RestoreDataDir: restoreDataDir, + RestoreDataDir: path.Clean(restoreDataDir), Name: restoreName, ClusterURLs: clusterUrlsMap, PeerURLs: peerUrls, @@ -238,7 +238,7 @@ func handleNoSsrRequest(handler *server.HTTPHandler) { } } -// handleSsrRequest responds to handlers reqeust and stop interrupt. +// handleSsrRequest responds to handlers request and stop interrupt. func handleSsrRequest(handler *server.HTTPHandler, ssr *snapshotter.Snapshotter, ackCh, ssrStopCh chan struct{}, stopCh <-chan struct{}) { for { var ok bool diff --git a/pkg/initializer/initializer.go b/pkg/initializer/initializer.go index 5bc035cf3..5b0e556a8 100644 --- a/pkg/initializer/initializer.go +++ b/pkg/initializer/initializer.go @@ -63,7 +63,8 @@ func NewInitializer(options *restorer.RestoreOptions, snapstoreConfig *snapstore }, Validator: &validator.DataValidator{ Config: &validator.Config{ - DataDir: options.RestoreDataDir, + DataDir: options.RestoreDataDir, + SnapstoreConfig: snapstoreConfig, }, Logger: logger, }, diff --git a/pkg/initializer/validator/datavalidator.go b/pkg/initializer/validator/datavalidator.go index 62d22a5c3..1c9521e79 100644 --- a/pkg/initializer/validator/datavalidator.go +++ b/pkg/initializer/validator/datavalidator.go @@ -15,6 +15,7 @@ package validator import ( + "encoding/binary" "errors" "fmt" "hash/crc32" @@ -31,26 +32,10 @@ import ( "github.com/coreos/etcd/snap/snappb" "github.com/coreos/etcd/wal" "github.com/coreos/etcd/wal/walpb" + "github.com/gardener/etcd-backup-restore/pkg/snapstore" "github.com/sirupsen/logrus" ) -const ( - // DataDirectoryValid indicates data directory is valid. - DataDirectoryValid = iota - // DataDirectoryNotExist indicates data directory is non-existent. - DataDirectoryNotExist - // DataDirectoryInvStruct indicates data directory has invalid structure. - DataDirectoryInvStruct - // DataDirectoryCorrupt indicates data directory is corrupt. - DataDirectoryCorrupt - // DataDirectoryError indicates unknown error while validation. - DataDirectoryError -) - -const ( - snapSuffix = ".snap" -) - var ( // A map of valid files that can be present in the snap folder. validFiles = map[string]bool{ @@ -70,9 +55,6 @@ var ( logger *logrus.Logger ) -// DataDirStatus represents the status of the etcd data directory. -type DataDirStatus int - func init() { logger = logrus.New() } @@ -85,7 +67,7 @@ func (d *DataValidator) snapDir() string { return filepath.Join(d.memberDir(), " func (d *DataValidator) backendPath() string { return filepath.Join(d.snapDir(), "db") } -//Validate performs the steps required to validate data for Etcd instance. +// Validate performs the steps required to validate data for Etcd instance. // The steps involved are: // * Check if data directory exists. // - If data directory exists @@ -112,12 +94,27 @@ func (d *DataValidator) Validate() (DataDirStatus, error) { d.Logger.Infof("Data directory structure invalid.") return DataDirectoryInvStruct, nil } + + d.Logger.Info("Checking for revision consistency...") + revisionConsistencyCheckStatus, err := d.CheckRevisionConsistency() + if revisionConsistencyCheckStatus == RevisionConsistencyCheckError { + d.Logger.Infof("Unable to check revision consistency. %v", err) + return RevisionConsistencyError, nil + } else if revisionConsistencyCheckStatus == RevisionConsistencyCheckFailure { + d.Logger.Infof("Inconsistent revision numbers between db file and latest snapshot. %v", err) + return RevisionConsistencyError, nil + } else if err != nil { + d.Logger.Infof("Unknown error in revision consistency check. %v", err) + return RevisionConsistencyError, nil + } + d.Logger.Info("Checking for data directory files corruption...") err = d.checkForDataCorruption() if err != nil { d.Logger.Infof("Data directory corrupt. %v", err) return DataDirectoryCorrupt, nil } + d.Logger.Info("Data directory valid.") return DataDirectoryValid, nil } @@ -273,7 +270,7 @@ func checkSuffix(names []string) []string { snaps = append(snaps, names[i]) } else { // If we find a file which is not a snapshot then check if it's - // a vaild file. If not throw out a warning. + // a valid file. If not throw out a warning. if _, ok := validFiles[names[i]]; !ok { fmt.Printf("skipped unexpected non snapshot file %v", names[i]) } @@ -347,3 +344,65 @@ func verifyDB(path string) error { return nil }) } + +// CheckRevisionConsistency compares the latest revisions on the etcd db file and the latest snapshot to verify that the etcd revision is not lesser than snapshot revision. +func (d *DataValidator) CheckRevisionConsistency() (RevisionConsistencyCheckStatus, error) { + etcdRevision, err := getRevision(d.backendPath()) + if err != nil { + return RevisionConsistencyCheckError, fmt.Errorf("unable to get current etcd revision from backend db file: %v", err) + } + + store, err := snapstore.GetSnapstore(d.Config.SnapstoreConfig) + snapList, err := store.List() + if err != nil { + return RevisionConsistencyCheckError, fmt.Errorf("unable to list snapshots from store: %v", err) + } + + latestSnapshotRevision := snapList[len(snapList)-1].LastRevision + + if etcdRevision < latestSnapshotRevision { + return RevisionConsistencyCheckFailure, fmt.Errorf("current etcd revision (%d) is less than latest snapshot revision (%d): possible data loss", etcdRevision, latestSnapshotRevision) + } + + return RevisionConsistencyCheckSuccess, nil +} + +// getRevision finds out the latest revision on the etcd db file without starting etcd server or an embedded etcd server. +func getRevision(path string) (int64, error) { + if _, err := os.Stat(path); err != nil { + return -1, fmt.Errorf("unable to stat backend db file: %v", err) + } + + db, err := bolt.Open(path, 0400, &bolt.Options{ReadOnly: true}) + if err != nil { + return -1, fmt.Errorf("unable to open backend boltdb file: %v", err) + } + defer db.Close() + + var rev int64 + + err = db.View(func(tx *bolt.Tx) error { + c := tx.Cursor() + + for next, _ := c.First(); next != nil; next, _ = c.Next() { + b := tx.Bucket(next) + if b == nil { + return fmt.Errorf("cannot get hash of bucket %s", string(next)) + } + isKey := (string(next) == "key") + b.ForEach(func(k, v []byte) error { + if isKey { + rev = int64(binary.BigEndian.Uint64(k[0:8])) + } + return nil + }) + } + return nil + }) + + if err != nil { + return -1, err + } + + return rev, nil +} diff --git a/pkg/initializer/validator/datavalidator_test.go b/pkg/initializer/validator/datavalidator_test.go new file mode 100644 index 000000000..fcfebd53d --- /dev/null +++ b/pkg/initializer/validator/datavalidator_test.go @@ -0,0 +1,220 @@ +package validator_test + +import ( + "fmt" + "os" + "path" + + "github.com/gardener/etcd-backup-restore/pkg/snapstore" + . "github.com/onsi/ginkgo" + . "github.com/onsi/gomega" + "github.com/sirupsen/logrus" + + . "github.com/gardener/etcd-backup-restore/pkg/initializer/validator" +) + +var _ = Describe("Running Datavalidator", func() { + var ( + restoreDataDir string + snapstoreBackupDir string + snapstoreConfig *snapstore.Config + logger *logrus.Logger + validator *DataValidator + ) + + BeforeEach(func() { + logger = logrus.New() + restoreDataDir = path.Clean(etcdDir) + snapstoreBackupDir = path.Clean(snapstoreDir) + // restoreClusterToken = "etcd-cluster" + // restoreName = "default" + // restoreCluster = restoreName + "=http://localhost:2380" + // restorePeerURLs = []string{"http://localhost:2380"} + // clusterUrlsMap, err = types.NewURLsMap(restoreCluster) + // Expect(err).ShouldNot(HaveOccurred()) + // peerUrls, err = types.NewURLs(restorePeerURLs) + // Expect(err).ShouldNot(HaveOccurred()) + // skipHashCheck = false + // maxFetchers = 6 + + snapstoreConfig = &snapstore.Config{ + Container: snapstoreBackupDir, + Provider: "Local", + } + + validator = &DataValidator{ + Config: &Config{ + DataDir: restoreDataDir, + SnapstoreConfig: snapstoreConfig, + }, + Logger: logger, + } + }) + Context("with missing data directory", func() { + It("should return DataDirStatus as DataDirectoryNotExist or DataDirectoryError, and non-nil error", func() { + tempDir := fmt.Sprintf("%s.%s", restoreDataDir, "temp") + err = os.Rename(restoreDataDir, tempDir) + if err != nil { + Fail(fmt.Sprintf("Unable to rename %s to %s: %v", restoreDataDir, tempDir, err)) + } + dataDirStatus, err := validator.Validate() + Expect(err).Should(HaveOccurred()) + Expect(int(dataDirStatus)).Should(SatisfyAny(Equal(DataDirectoryNotExist), Equal(DataDirectoryError))) + err = os.Rename(tempDir, restoreDataDir) + if err != nil { + Fail(fmt.Sprintf("Unable to rename %s to %s: %v", tempDir, restoreDataDir, err)) + } + }) + }) + + Context("with data directory present", func() { + Context("with incorrect data directory structure", func() { + Context("with missing member directory", func() { + It("should return DataDirStatus as DataDirectoryInvStruct or DataDirectoryError, and nil error", func() { + memberDir := path.Join(restoreDataDir, "member") + tempDir := fmt.Sprintf("%s.%s", memberDir, "temp") + err = os.Rename(memberDir, tempDir) + if err != nil { + Fail(fmt.Sprintf("Unable to rename %s to %s: %v", memberDir, tempDir, err)) + } + dataDirStatus, err := validator.Validate() + Expect(err).ShouldNot(HaveOccurred()) + Expect(int(dataDirStatus)).Should(SatisfyAny(Equal(DataDirectoryInvStruct), Equal(DataDirectoryError))) + err = os.Rename(tempDir, memberDir) + if err != nil { + Fail(fmt.Sprintf("Unable to rename %s to %s: %v", tempDir, memberDir, err)) + } + }) + }) + Context("with member directory present", func() { + Context("with missing snap directory", func() { + It("should return DataDirStatus as DataDirectoryInvStruct or DataDirectoryError, and nil error", func() { + snapDir := path.Join(restoreDataDir, "member", "snap") + tempDir := fmt.Sprintf("%s.%s", snapDir, "temp") + err = os.Rename(snapDir, tempDir) + if err != nil { + Fail(fmt.Sprintf("Unable to rename %s to %s: %v", snapDir, tempDir, err)) + } + dataDirStatus, err := validator.Validate() + Expect(err).ShouldNot(HaveOccurred()) + Expect(int(dataDirStatus)).Should(SatisfyAny(Equal(DataDirectoryInvStruct), Equal(DataDirectoryError))) + err = os.Rename(tempDir, snapDir) + if err != nil { + Fail(fmt.Sprintf("Unable to rename %s to %s: %v", tempDir, snapDir, err)) + } + }) + }) + Context("with missing wal directory", func() { + It("should return DataDirStatus as DataDirectoryInvStruct or DataDirectoryError, and nil error", func() { + walDir := path.Join(restoreDataDir, "member", "wal") + tempDir := fmt.Sprintf("%s.%s", walDir, "temp") + err = os.Rename(walDir, tempDir) + if err != nil { + Fail(fmt.Sprintf("Unable to rename %s to %s: %v", walDir, tempDir, err)) + } + dataDirStatus, err := validator.Validate() + Expect(err).ShouldNot(HaveOccurred()) + Expect(int(dataDirStatus)).Should(SatisfyAny(Equal(DataDirectoryInvStruct), Equal(DataDirectoryError))) + err = os.Rename(tempDir, walDir) + if err != nil { + Fail(fmt.Sprintf("Unable to rename %s to %s: %v", tempDir, walDir, err)) + } + }) + }) + }) + }) + Context("with correct data directory structure", func() { + Context("with inconsistent revision numbers between etcd and latest snapshot", func() { + It("should return DataDirStatus as RevisionConsistencyError or DataDirectoryError, and nil error", func() { + tempDir := fmt.Sprintf("%s.%s", restoreDataDir, "temp") + err = os.Rename(restoreDataDir, tempDir) + if err != nil { + Fail(fmt.Sprintf("Unable to rename %s to %s: %v", restoreDataDir, tempDir, err)) + } + + // start etcd + etcd, err = startEmbeddedEtcd(restoreDataDir, logger) + Expect(err).ShouldNot(HaveOccurred()) + // populate etcd but with lesser data than previous populate call, so that the new db has a lower revision + newEtcdRevision, err := populateEtcdFinite(logger, endpoints, keyFrom, int(keyTo/2)) + Expect(err).ShouldNot(HaveOccurred()) + + etcd.Server.Stop() + etcd.Close() + + fmt.Printf("\n---------\nPrev etcd revision: %d\nNew etcd revision: %d\n-------\n", etcdRevision, newEtcdRevision) + + // etcdRevision: latest revision number on the snapstore (etcd backup) + // newEtcdRevision: current revision number on etcd db + Expect(etcdRevision).To(BeNumerically(">=", newEtcdRevision)) + + dataDirStatus, err := validator.Validate() + Expect(err).ShouldNot(HaveOccurred()) + Expect(int(dataDirStatus)).Should(SatisfyAny(Equal(RevisionConsistencyError), Equal(DataDirectoryError))) + + err = os.RemoveAll(restoreDataDir) + if err != nil { + Fail(fmt.Sprintf("Unable to remove %s: %v", restoreDataDir, err)) + } + + err = os.Rename(tempDir, restoreDataDir) + if err != nil { + Fail(fmt.Sprintf("Unable to rename %s to %s: %v", tempDir, restoreDataDir, err)) + } + }) + }) + Context("with consistent revision numbers between etcd and latest snapshot", func() { + Context("with corrupt data directory", func() { + Context("with corrupt db file", func() { + It("should return DataDirStatus as DataDirectoryCorrupt or DataDirectoryError or RevisionConsistencyError, and nil error", func() { + dbFile := path.Join(restoreDataDir, "member", "snap", "db") + if _, err = os.Stat(dbFile); os.IsNotExist(err) { + Fail(fmt.Sprintf("Unable to find db file %s: %v", dbFile, err)) + } + + tempFile := path.Join(outputDir, "temp", "db") + err = copyFile(dbFile, tempFile) + if err != nil { + Fail(fmt.Sprintf("Unable to copy %s to %s: %v", dbFile, tempFile, err)) + } + + file, err := os.OpenFile( + dbFile, + os.O_WRONLY|os.O_TRUNC|os.O_CREATE, + 0666, + ) + Expect(err).ShouldNot(HaveOccurred()) + defer file.Close() + + // corrupt the db file by writing random data to it + byteSlice := []byte("Random data!\n") + _, err = file.Write(byteSlice) + Expect(err).ShouldNot(HaveOccurred()) + + dataDirStatus, err := validator.Validate() + Expect(err).ShouldNot(HaveOccurred()) + Expect(int(dataDirStatus)).Should(SatisfyAny(Equal(DataDirectoryCorrupt), Equal(DataDirectoryError), Equal(RevisionConsistencyError))) + + err = os.Remove(dbFile) + if err != nil { + Fail(fmt.Sprintf("Unable to remove corrupted db file at %s: %v", dbFile, err)) + } + + err = os.Rename(tempFile, dbFile) + if err != nil { + Fail(fmt.Sprintf("Unable to rename %s to %s: %v", tempFile, dbFile, err)) + } + }) + }) + }) + Context("with clean data directory", func() { + It("should return DataDirStatus as DataDirectoryValid, and nil error", func() { + dataDirStatus, err := validator.Validate() + Expect(err).ShouldNot(HaveOccurred()) + Expect(int(dataDirStatus)).Should(Equal(DataDirectoryValid)) + }) + }) + }) + }) + }) +}) diff --git a/pkg/initializer/validator/types.go b/pkg/initializer/validator/types.go index 5c5046d98..eaea9dc53 100644 --- a/pkg/initializer/validator/types.go +++ b/pkg/initializer/validator/types.go @@ -14,11 +14,49 @@ package validator -import "github.com/sirupsen/logrus" +import ( + "github.com/gardener/etcd-backup-restore/pkg/snapstore" + "github.com/sirupsen/logrus" +) + +// DataDirStatus represents the status of the etcd data directory. +type DataDirStatus int + +const ( + // DataDirectoryValid indicates data directory is valid. + DataDirectoryValid = iota + // DataDirectoryNotExist indicates data directory is non-existent. + DataDirectoryNotExist + // DataDirectoryInvStruct indicates data directory has invalid structure. + DataDirectoryInvStruct + // DataDirectoryCorrupt indicates data directory is corrupt. + DataDirectoryCorrupt + // DataDirectoryError indicates unknown error while validation. + DataDirectoryError + // RevisionConsistencyError indicates current etcd revision is inconsistent with latest snapshot revision. + RevisionConsistencyError +) + +// RevisionConsistencyCheckStatus represents the status of the etcd's revision consistency with the latest snapshot. +type RevisionConsistencyCheckStatus int + +const ( + // RevisionConsistencyCheckSuccess indicates that the etcd revision is greater than or equal to the latest snapshot revision + RevisionConsistencyCheckSuccess = iota + // RevisionConsistencyCheckFailure indicates that the etcd revision is less than the latest snapshot revision + RevisionConsistencyCheckFailure + // RevisionConsistencyCheckError indicates unknown error during revision consistency check + RevisionConsistencyCheckError +) + +const ( + snapSuffix = ".snap" +) // Config store configuration for DataValidator. type Config struct { - DataDir string + DataDir string + SnapstoreConfig *snapstore.Config } // DataValidator contains implements Validator interface to perform data validation. diff --git a/pkg/initializer/validator/validator_suite_test.go b/pkg/initializer/validator/validator_suite_test.go new file mode 100644 index 000000000..1d7babcfa --- /dev/null +++ b/pkg/initializer/validator/validator_suite_test.go @@ -0,0 +1,273 @@ +package validator_test + +import ( + "context" + "fmt" + "io/ioutil" + "os" + "path" + "strconv" + "testing" + "time" + + "github.com/coreos/etcd/clientv3" + "github.com/coreos/etcd/embed" + "github.com/gardener/etcd-backup-restore/pkg/etcdutil" + "github.com/gardener/etcd-backup-restore/pkg/snapshot/snapshotter" + "github.com/gardener/etcd-backup-restore/pkg/snapstore" + . "github.com/onsi/ginkgo" + . "github.com/onsi/gomega" + "github.com/sirupsen/logrus" +) + +const ( + outputDir = "../../../test/output" + etcdDir = outputDir + "/default.etcd" + snapstoreDir = outputDir + "/snapshotter.bkp" + etcdEndpoint = "http://localhost:2379" + snapshotterDurationSeconds = 15 + snapCount = 10 + keyPrefix = "key-" + valuePrefix = "val-" + keyFrom = 1 +) + +var ( + etcd *embed.Etcd + err error + keyTo int + endpoints = []string{etcdEndpoint} + etcdRevision int64 +) + +// fileInfo holds file information such as file name and file path +type fileInfo struct { + name string + path string +} + +func TestValidator(t *testing.T) { + RegisterFailHandler(Fail) + RunSpecs(t, "Validator Suite") +} + +var _ = SynchronizedBeforeSuite(func() []byte { + var ( + data []byte + errCh = make(chan error) + populatorStopCh = make(chan bool) + populatorStoppedCh = make(chan bool) + ssrStopCh = make(chan struct{}) + ) + + err = os.RemoveAll(outputDir) + Expect(err).ShouldNot(HaveOccurred()) + + logger := logrus.New() + + err = os.RemoveAll(etcdDir) + Expect(err).ShouldNot(HaveOccurred()) + + etcd, err = startEmbeddedEtcd(etcdDir, logger) + Expect(err).ShouldNot(HaveOccurred()) + + go populateEtcd(logger, endpoints, errCh, populatorStopCh, populatorStoppedCh) + + go func() { + <-time.After(time.Duration(snapshotterDurationSeconds * time.Second)) + close(populatorStopCh) + <-populatorStoppedCh + close(ssrStopCh) + }() + + err = runSnapshotter(logger, endpoints, ssrStopCh) + Expect(err).ShouldNot(HaveOccurred()) + + etcd.Server.Stop() + etcd.Close() + + err = os.Mkdir(path.Join(outputDir, "temp"), 0700) + Expect(err).ShouldNot(HaveOccurred()) + + return data +}, func(data []byte) {}) + +var _ = SynchronizedAfterSuite(func() {}, func() { + +}) + +// startEmbeddedEtcd starts an embedded etcd server +func startEmbeddedEtcd(dir string, logger *logrus.Logger) (*embed.Etcd, error) { + logger.Infof("Starting embedded etcd") + cfg := embed.NewConfig() + cfg.Dir = dir + cfg.EnableV2 = false + cfg.Debug = false + cfg.GRPCKeepAliveTimeout = 0 + cfg.SnapCount = snapCount + e, err := embed.StartEtcd(cfg) + if err != nil { + return nil, err + } + + select { + case <-e.Server.ReadyNotify(): + fmt.Printf("Embedded server is ready!\n") + case <-time.After(60 * time.Second): + e.Server.Stop() // trigger a shutdown + e.Close() + return nil, fmt.Errorf("server took too long to start") + } + return e, nil +} + +// runSnapshotter creates a snapshotter object and runs it for a duration specified by 'snapshotterDurationSeconds' +func runSnapshotter(logger *logrus.Logger, endpoints []string, stopCh chan struct{}) error { + var ( + store snapstore.SnapStore + certFile string + keyFile string + caFile string + insecureTransport bool + insecureSkipVerify bool + maxBackups = 1 + deltaSnapshotPeriod = 5 + deltaSnapshotMemoryLimit = 10 * 1024 * 1024 //10Mib + etcdConnectionTimeout = time.Duration(10) + garbageCollectionPeriodSeconds = time.Duration(60) + schedule = "0 0 1 1 *" + garbageCollectionPolicy = snapshotter.GarbageCollectionPolicyLimitBased + ) + + store, err = snapstore.GetSnapstore(&snapstore.Config{Container: snapstoreDir, Provider: "Local"}) + if err != nil { + return err + } + + tlsConfig := etcdutil.NewTLSConfig( + certFile, + keyFile, + caFile, + insecureTransport, + insecureSkipVerify, + endpoints, + ) + + snapshotterConfig, err := snapshotter.NewSnapshotterConfig( + schedule, + store, + maxBackups, + deltaSnapshotPeriod, + deltaSnapshotMemoryLimit, + etcdConnectionTimeout, + garbageCollectionPeriodSeconds, + garbageCollectionPolicy, + tlsConfig, + ) + if err != nil { + return err + } + + ssr := snapshotter.NewSnapshotter( + logger, + snapshotterConfig, + ) + + err = ssr.Run(stopCh, true) + if err != nil { + return err + } + + return nil +} + +// populateEtcd sequentially puts key-value pairs into the embedded etcd, until stopped +func populateEtcd(logger *logrus.Logger, endpoints []string, errCh chan<- error, stopCh <-chan bool, stoppedCh chan<- bool) { + defer func() { + stoppedCh <- true + }() + + cli, err := clientv3.New(clientv3.Config{ + Endpoints: endpoints, + DialTimeout: 10 * time.Second, + }) + if err != nil { + errCh <- fmt.Errorf("unable to start etcd client: %v", err) + return + } + defer cli.Close() + + var ( + key string + value string + currKey = 0 + ) + + for { + select { + case _, more := <-stopCh: + if !more { + keyTo = currKey + logger.Infof("Populated data till key %s into embedded etcd", keyPrefix+strconv.Itoa(currKey)) + return + } + default: + currKey++ + key = keyPrefix + strconv.Itoa(currKey) + value = valuePrefix + strconv.Itoa(currKey) + resp, err := cli.Put(context.TODO(), key, value) + if err != nil { + errCh <- fmt.Errorf("unable to put key-value pair (%s, %s) into embedded etcd: %v", key, value, err) + return + } + etcdRevision = resp.Header.GetRevision() + time.Sleep(time.Second * 1) + } + } +} + +// populateEtcd sequentially puts key-value pairs through numbers 'from' and 'to' into the embedded etcd +func populateEtcdFinite(logger *logrus.Logger, endpoints []string, from, to int) (int64, error) { + cli, err := clientv3.New(clientv3.Config{ + Endpoints: endpoints, + DialTimeout: 10 * time.Second, + }) + if err != nil { + return -1, fmt.Errorf("unable to start etcd client: %v", err) + } + defer cli.Close() + + var ( + key string + value string + rev int64 + ) + + for currKey := from; currKey <= to; currKey++ { + key = keyPrefix + strconv.Itoa(currKey) + value = valuePrefix + strconv.Itoa(currKey) + resp, err := cli.Put(context.TODO(), key, value) + if err != nil { + return -1, fmt.Errorf("unable to put key-value pair (%s, %s) into embedded etcd: %v", key, value, err) + } + rev = resp.Header.GetRevision() + time.Sleep(time.Millisecond * 100) + } + + return rev, nil +} + +// copyFile copies the contents of the file at sourceFilePath into the file at destinationFilePath. If no file exists at destinationFilePath, a new file is created before copying +func copyFile(sourceFilePath, destinationFilePath string) error { + data, err := ioutil.ReadFile(sourceFilePath) + if err != nil { + return fmt.Errorf("unable to read source file %s: %v", sourceFilePath, err) + } + + err = ioutil.WriteFile(destinationFilePath, data, 0700) + if err != nil { + return fmt.Errorf("unable to create destination file %s: %v", destinationFilePath, err) + } + + return nil +} diff --git a/pkg/server/httpAPI.go b/pkg/server/httpAPI.go index 69dbcfc76..c574c3c0a 100644 --- a/pkg/server/httpAPI.go +++ b/pkg/server/httpAPI.go @@ -21,6 +21,8 @@ import ( "sync" "sync/atomic" + "github.com/gardener/etcd-backup-restore/pkg/initializer/validator" + "github.com/gardener/etcd-backup-restore/pkg/initializer" "github.com/sirupsen/logrus" ) @@ -30,6 +32,9 @@ const ( initializationStatusProgress = "Progress" initializationStatusSuccessful = "Successful" initializationStatusFailed = "Failed" + revisionSanityCheckSuccess = "RevisionConsistent" + revisionSanityCheckFailed = "RevisionInconsistent" + revisionSanityCheckError = "RevisionCheckError" ) var emptyStruct struct{} @@ -78,6 +83,7 @@ func (h *HTTPHandler) RegisterHandler() { h.initializationStatus = "New" mux.HandleFunc("/initialization/start", h.serveInitialize) mux.HandleFunc("/initialization/status", h.serveInitializationStatus) + mux.HandleFunc("/initialization/sanitycheck", h.serveRevisionSanityCheck) mux.HandleFunc("/healthz", h.serveHealthz) h.server = &http.Server{ @@ -169,3 +175,25 @@ func (h *HTTPHandler) serveInitializationStatus(rw http.ResponseWriter, req *htt h.initializationStatus = initializationStatusNew } } + +// serveRevisionSanityCheck serves the etcd revision sanity check status +func (h *HTTPHandler) serveRevisionSanityCheck(rw http.ResponseWriter, req *http.Request) { + h.Logger.Info("Received revision sanity check request.") + + revisionConsistencyCheckStatus, err := h.EtcdInitializer.Validator.CheckRevisionConsistency() + rw.WriteHeader(http.StatusOK) + + if revisionConsistencyCheckStatus == validator.RevisionConsistencyCheckError { + h.Logger.Warnf("Unable to check revision consistency: %v", err) + rw.Write([]byte(revisionSanityCheckError)) + } else if revisionConsistencyCheckStatus == validator.RevisionConsistencyCheckFailure { + h.Logger.Warnf("Inconsistent revision numbers between db file and latest snapshot: %v", err) + rw.Write([]byte(revisionSanityCheckFailed)) + } else if err != nil { + h.Logger.Warnf("Unknown error in revision consistency check: %v", err) + rw.Write([]byte(revisionSanityCheckError)) + } else { + h.Logger.Infof("Revision numbers between db file and latest snapshot are consistent.") + rw.Write([]byte(revisionSanityCheckSuccess)) + } +} diff --git a/pkg/snapshot/restorer/restorer_suite_test.go b/pkg/snapshot/restorer/restorer_suite_test.go index 2348f1409..858efbbc3 100644 --- a/pkg/snapshot/restorer/restorer_suite_test.go +++ b/pkg/snapshot/restorer/restorer_suite_test.go @@ -207,7 +207,7 @@ func populateEtcd(logger *logrus.Logger, endpoints []string, errCh chan<- error, case _, more := <-stopCh: if !more { keyTo = currKey - logger.Infof("Popolated data till key %s into embedded etcd", keyPrefix+strconv.Itoa(currKey)) + logger.Infof("Populated data till key %s into embedded etcd", keyPrefix+strconv.Itoa(currKey)) return } default: diff --git a/pkg/snapshot/restorer/restorer_test.go b/pkg/snapshot/restorer/restorer_test.go index b67ccad1d..c1689c9ab 100644 --- a/pkg/snapshot/restorer/restorer_test.go +++ b/pkg/snapshot/restorer/restorer_test.go @@ -122,6 +122,7 @@ var _ = Describe("Running Restorer", func() { Expect(err).ShouldNot(HaveOccurred()) err = checkDataConsistency(restoreDataDir, logger) + Expect(err).ShouldNot(HaveOccurred()) }) })