Skip to content

Commit

Permalink
Add sanity check for revision numbers
Browse files Browse the repository at this point in the history
  • Loading branch information
shreyas-s-rao committed Jan 12, 2019
1 parent 26f7194 commit 3871e39
Show file tree
Hide file tree
Showing 11 changed files with 581 additions and 41 deletions.
2 changes: 1 addition & 1 deletion cmd/initializer.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ func NewInitializeCommand(stopCh <-chan struct{}) *cobra.Command {
}

options := &restorer.RestoreOptions{
RestoreDataDir: restoreDataDir,
RestoreDataDir: path.Clean(restoreDataDir),
Name: restoreName,
ClusterURLs: clusterUrlsMap,
PeerURLs: peerUrls,
Expand Down
2 changes: 1 addition & 1 deletion cmd/restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ func NewRestoreCommand(stopCh <-chan struct{}) *cobra.Command {
rs := restorer.NewRestorer(store, logger)

options := &restorer.RestoreOptions{
RestoreDataDir: restoreDataDir,
RestoreDataDir: path.Clean(restoreDataDir),
Name: restoreName,
BaseSnapshot: *baseSnap,
DeltaSnapList: deltaSnapList,
Expand Down
6 changes: 3 additions & 3 deletions cmd/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ func NewServerCommand(stopCh <-chan struct{}) *cobra.Command {
var serverCmd = &cobra.Command{
Use: "server",
Short: "start the http server with backup scheduler.",
Long: `Server will keep listening for http request to deliver its functionality through http endpoins.`,
Long: `Server will keep listening for http request to deliver its functionality through http endpoints.`,
Run: func(cmd *cobra.Command, args []string) {
var (
snapstoreConfig *snapstore.Config
Expand All @@ -56,7 +56,7 @@ func NewServerCommand(stopCh <-chan struct{}) *cobra.Command {
}

options := &restorer.RestoreOptions{
RestoreDataDir: restoreDataDir,
RestoreDataDir: path.Clean(restoreDataDir),
Name: restoreName,
ClusterURLs: clusterUrlsMap,
PeerURLs: peerUrls,
Expand Down Expand Up @@ -238,7 +238,7 @@ func handleNoSsrRequest(handler *server.HTTPHandler) {
}
}

// handleSsrRequest responds to handlers reqeust and stop interrupt.
// handleSsrRequest responds to handlers request and stop interrupt.
func handleSsrRequest(handler *server.HTTPHandler, ssr *snapshotter.Snapshotter, ackCh, ssrStopCh chan struct{}, stopCh <-chan struct{}) {
for {
var ok bool
Expand Down
3 changes: 2 additions & 1 deletion pkg/initializer/initializer.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,8 @@ func NewInitializer(options *restorer.RestoreOptions, snapstoreConfig *snapstore
},
Validator: &validator.DataValidator{
Config: &validator.Config{
DataDir: options.RestoreDataDir,
DataDir: options.RestoreDataDir,
SnapstoreConfig: snapstoreConfig,
},
Logger: logger,
},
Expand Down
106 changes: 82 additions & 24 deletions pkg/initializer/validator/datavalidator.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package validator

import (
"encoding/binary"
"errors"
"fmt"
"hash/crc32"
Expand All @@ -31,26 +32,11 @@ import (
"github.com/coreos/etcd/snap/snappb"
"github.com/coreos/etcd/wal"
"github.com/coreos/etcd/wal/walpb"
"github.com/gardener/etcd-backup-restore/pkg/miscellaneous"
"github.com/gardener/etcd-backup-restore/pkg/snapstore"
"github.com/sirupsen/logrus"
)

const (
// DataDirectoryValid indicates data directory is valid.
DataDirectoryValid = iota
// DataDirectoryNotExist indicates data directory is non-existent.
DataDirectoryNotExist
// DataDirectoryInvStruct indicates data directory has invalid structure.
DataDirectoryInvStruct
// DataDirectoryCorrupt indicates data directory is corrupt.
DataDirectoryCorrupt
// DataDirectoryError indicates unknown error while validation.
DataDirectoryError
)

const (
snapSuffix = ".snap"
)

var (
// A map of valid files that can be present in the snap folder.
validFiles = map[string]bool{
Expand All @@ -70,9 +56,6 @@ var (
logger *logrus.Logger
)

// DataDirStatus represents the status of the etcd data directory.
type DataDirStatus int

func init() {
logger = logrus.New()
}
Expand All @@ -85,7 +68,7 @@ func (d *DataValidator) snapDir() string { return filepath.Join(d.memberDir(), "

func (d *DataValidator) backendPath() string { return filepath.Join(d.snapDir(), "db") }

//Validate performs the steps required to validate data for Etcd instance.
// Validate performs the steps required to validate data for Etcd instance.
// The steps involved are:
// * Check if data directory exists.
// - If data directory exists
Expand All @@ -112,12 +95,19 @@ func (d *DataValidator) Validate() (DataDirStatus, error) {
d.Logger.Infof("Data directory structure invalid.")
return DataDirectoryInvStruct, nil
}

d.Logger.Info("Checking for revision consistency...")
if err = d.CheckRevisionConsistency(); err != nil {
d.Logger.Infof("Etcd revision inconsistent with latest snapshot revision: %v", err)
return RevisionConsistencyError, nil
}

d.Logger.Info("Checking for data directory files corruption...")
err = d.checkForDataCorruption()
if err != nil {
if err = d.checkForDataCorruption(); err != nil {
d.Logger.Infof("Data directory corrupt. %v", err)
return DataDirectoryCorrupt, nil
}

d.Logger.Info("Data directory valid.")
return DataDirectoryValid, nil
}
Expand Down Expand Up @@ -273,7 +263,7 @@ func checkSuffix(names []string) []string {
snaps = append(snaps, names[i])
} else {
// If we find a file which is not a snapshot then check if it's
// a vaild file. If not throw out a warning.
// a valid file. If not throw out a warning.
if _, ok := validFiles[names[i]]; !ok {
fmt.Printf("skipped unexpected non snapshot file %v", names[i])
}
Expand Down Expand Up @@ -347,3 +337,71 @@ func verifyDB(path string) error {
return nil
})
}

// CheckRevisionConsistency compares the latest revisions on the etcd db file and the latest snapshot to verify that the etcd revision is not lesser than snapshot revision.
func (d *DataValidator) CheckRevisionConsistency() error {
etcdRevision, err := getLatestEtcdRevision(d.backendPath())
if err != nil {
return fmt.Errorf("unable to get current etcd revision from backend db file: %v", err)
}

store, err := snapstore.GetSnapstore(d.Config.SnapstoreConfig)
if err != nil {
return fmt.Errorf("unable to fetch snapstore: %v", err)
}

fullSnap, deltaSnaps, err := miscellaneous.GetLatestFullSnapshotAndDeltaSnapList(store)
if err != nil {
return fmt.Errorf("unable to get snapshots from store: %v", err)
}
var latestSnapshotRevision int64
if len(deltaSnaps) != 0 {
latestSnapshotRevision = deltaSnaps[len(deltaSnaps)-1].LastRevision
} else {
latestSnapshotRevision = fullSnap.LastRevision
}

if etcdRevision < latestSnapshotRevision {
return fmt.Errorf("current etcd revision (%d) is less than latest snapshot revision (%d): possible data loss", etcdRevision, latestSnapshotRevision)
}

return nil
}

// getLatestEtcdRevision finds out the latest revision on the etcd db file without starting etcd server or an embedded etcd server.
func getLatestEtcdRevision(path string) (int64, error) {
if _, err := os.Stat(path); err != nil {
return -1, fmt.Errorf("unable to stat backend db file: %v", err)
}

db, err := bolt.Open(path, 0400, &bolt.Options{ReadOnly: true})
if err != nil {
return -1, fmt.Errorf("unable to open backend boltdb file: %v", err)
}
defer db.Close()

var rev int64

err = db.View(func(tx *bolt.Tx) error {
b := tx.Bucket([]byte("key"))
if b == nil {
return fmt.Errorf("cannot get hash of bucket \"key\"")
}

c := b.Cursor()
k, _ := c.Last()
if len(k) < 8 {
rev = 1
return nil
}
rev = int64(binary.BigEndian.Uint64(k[0:8]))

return nil
})

if err != nil {
return -1, err
}

return rev, nil
}
181 changes: 181 additions & 0 deletions pkg/initializer/validator/datavalidator_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,181 @@
package validator_test

import (
"fmt"
"os"
"path"

"github.com/gardener/etcd-backup-restore/pkg/snapstore"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
"github.com/sirupsen/logrus"

. "github.com/gardener/etcd-backup-restore/pkg/initializer/validator"
)

var _ = Describe("Running Datavalidator", func() {
var (
restoreDataDir string
snapstoreBackupDir string
snapstoreConfig *snapstore.Config
logger *logrus.Logger
validator *DataValidator
)

BeforeEach(func() {
logger = logrus.New()
restoreDataDir = path.Clean(etcdDir)
snapstoreBackupDir = path.Clean(snapstoreDir)

snapstoreConfig = &snapstore.Config{
Container: snapstoreBackupDir,
Provider: "Local",
}

validator = &DataValidator{
Config: &Config{
DataDir: restoreDataDir,
SnapstoreConfig: snapstoreConfig,
},
Logger: logger,
}
})
Context("with missing data directory", func() {
It("should return DataDirStatus as DataDirectoryNotExist or DataDirectoryError, and non-nil error", func() {
tempDir := fmt.Sprintf("%s.%s", restoreDataDir, "temp")
err = os.Rename(restoreDataDir, tempDir)
Expect(err).ShouldNot(HaveOccurred())
dataDirStatus, err := validator.Validate()
Expect(err).Should(HaveOccurred())
Expect(int(dataDirStatus)).Should(SatisfyAny(Equal(DataDirectoryNotExist), Equal(DataDirectoryError)))
err = os.Rename(tempDir, restoreDataDir)
Expect(err).ShouldNot(HaveOccurred())
})
})

Context("with data directory present", func() {
Context("with incorrect data directory structure", func() {
Context("with missing member directory", func() {
It("should return DataDirStatus as DataDirectoryInvStruct or DataDirectoryError, and nil error", func() {
memberDir := path.Join(restoreDataDir, "member")
tempDir := fmt.Sprintf("%s.%s", memberDir, "temp")
err = os.Rename(memberDir, tempDir)
Expect(err).ShouldNot(HaveOccurred())
dataDirStatus, err := validator.Validate()
Expect(err).ShouldNot(HaveOccurred())
Expect(int(dataDirStatus)).Should(SatisfyAny(Equal(DataDirectoryInvStruct), Equal(DataDirectoryError)))
err = os.Rename(tempDir, memberDir)
Expect(err).ShouldNot(HaveOccurred())
})
})
Context("with member directory present", func() {
Context("with missing snap directory", func() {
It("should return DataDirStatus as DataDirectoryInvStruct or DataDirectoryError, and nil error", func() {
snapDir := path.Join(restoreDataDir, "member", "snap")
tempDir := fmt.Sprintf("%s.%s", snapDir, "temp")
err = os.Rename(snapDir, tempDir)
Expect(err).ShouldNot(HaveOccurred())
dataDirStatus, err := validator.Validate()
Expect(err).ShouldNot(HaveOccurred())
Expect(int(dataDirStatus)).Should(SatisfyAny(Equal(DataDirectoryInvStruct), Equal(DataDirectoryError)))
err = os.Rename(tempDir, snapDir)
Expect(err).ShouldNot(HaveOccurred())
})
})
Context("with missing wal directory", func() {
It("should return DataDirStatus as DataDirectoryInvStruct or DataDirectoryError, and nil error", func() {
walDir := path.Join(restoreDataDir, "member", "wal")
tempDir := fmt.Sprintf("%s.%s", walDir, "temp")
err = os.Rename(walDir, tempDir)
Expect(err).ShouldNot(HaveOccurred())
dataDirStatus, err := validator.Validate()
Expect(err).ShouldNot(HaveOccurred())
Expect(int(dataDirStatus)).Should(SatisfyAny(Equal(DataDirectoryInvStruct), Equal(DataDirectoryError)))
err = os.Rename(tempDir, walDir)
Expect(err).ShouldNot(HaveOccurred())
})
})
})
})
Context("with correct data directory structure", func() {
Context("with inconsistent revision numbers between etcd and latest snapshot", func() {
It("should return DataDirStatus as RevisionConsistencyError or DataDirectoryError, and nil error", func() {
tempDir := fmt.Sprintf("%s.%s", restoreDataDir, "temp")
err = os.Rename(restoreDataDir, tempDir)
Expect(err).ShouldNot(HaveOccurred())

// start etcd
etcd, err = startEmbeddedEtcd(restoreDataDir, logger)
Expect(err).ShouldNot(HaveOccurred())
// populate etcd but with lesser data than previous populate call, so that the new db has a lower revision
newEtcdRevision, err := populateEtcdFinite(logger, endpoints, keyFrom, int(keyTo/2))
Expect(err).ShouldNot(HaveOccurred())

etcd.Server.Stop()
etcd.Close()

fmt.Printf("\nPrev etcd revision: %d\nNew etcd revision: %d\n", etcdRevision, newEtcdRevision)

// etcdRevision: latest revision number on the snapstore (etcd backup)
// newEtcdRevision: current revision number on etcd db
Expect(etcdRevision).To(BeNumerically(">=", newEtcdRevision))

dataDirStatus, err := validator.Validate()
Expect(err).ShouldNot(HaveOccurred())
Expect(int(dataDirStatus)).Should(SatisfyAny(Equal(RevisionConsistencyError), Equal(DataDirectoryError)))

err = os.RemoveAll(restoreDataDir)
Expect(err).ShouldNot(HaveOccurred())

err = os.Rename(tempDir, restoreDataDir)
Expect(err).ShouldNot(HaveOccurred())
})
})
Context("with consistent revision numbers between etcd and latest snapshot", func() {
Context("with corrupt data directory", func() {
Context("with corrupt db file", func() {
It("should return DataDirStatus as DataDirectoryCorrupt or DataDirectoryError or RevisionConsistencyError, and nil error", func() {
dbFile := path.Join(restoreDataDir, "member", "snap", "db")
_, err = os.Stat(dbFile)
Expect(err).ShouldNot(HaveOccurred())

tempFile := path.Join(outputDir, "temp", "db")
err = copyFile(dbFile, tempFile)
Expect(err).ShouldNot(HaveOccurred())

file, err := os.OpenFile(
dbFile,
os.O_WRONLY|os.O_TRUNC|os.O_CREATE,
0666,
)
Expect(err).ShouldNot(HaveOccurred())
defer file.Close()

// corrupt the db file by writing random data to it
byteSlice := []byte("Random data!\n")
_, err = file.Write(byteSlice)
Expect(err).ShouldNot(HaveOccurred())

dataDirStatus, err := validator.Validate()
Expect(err).ShouldNot(HaveOccurred())
Expect(int(dataDirStatus)).Should(SatisfyAny(Equal(DataDirectoryCorrupt), Equal(DataDirectoryError), Equal(RevisionConsistencyError)))

err = os.Remove(dbFile)
Expect(err).ShouldNot(HaveOccurred())

err = os.Rename(tempFile, dbFile)
Expect(err).ShouldNot(HaveOccurred())
})
})
})
Context("with clean data directory", func() {
It("should return DataDirStatus as DataDirectoryValid, and nil error", func() {
dataDirStatus, err := validator.Validate()
Expect(err).ShouldNot(HaveOccurred())
Expect(int(dataDirStatus)).Should(Equal(DataDirectoryValid))
})
})
})
})
})
})
Loading

0 comments on commit 3871e39

Please sign in to comment.