Skip to content

Commit

Permalink
Fix the revision consitency check error handling
Browse files Browse the repository at this point in the history
Signed-off-by: Swapnil Mhamane <[email protected]>
  • Loading branch information
Swapnil Mhamane committed Sep 7, 2019
1 parent e9f9b74 commit 481c09c
Show file tree
Hide file tree
Showing 4 changed files with 266 additions and 317 deletions.
2 changes: 1 addition & 1 deletion pkg/initializer/initializer.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ const (
func (e *EtcdInitializer) Initialize(mode validator.Mode, failBelowRevision int64) error {
start := time.Now()
dataDirStatus, err := e.Validator.Validate(mode, failBelowRevision)
if err != nil && dataDirStatus != validator.DataDirectoryNotExist {
if dataDirStatus == validator.DataDirectoryStatusUnknown {
metrics.ValidationDurationSeconds.With(prometheus.Labels{metrics.LabelSucceeded: metrics.ValueSucceededFalse}).Observe(time.Now().Sub(start).Seconds())
return fmt.Errorf("error while initializing: %v", err)
}
Expand Down
206 changes: 45 additions & 161 deletions pkg/initializer/validator/datavalidator.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,12 @@ import (
"fmt"
"hash/crc32"
"io"
"io/ioutil"
"os"
"path/filepath"
"sort"
"strings"

bolt "github.com/coreos/bbolt"
"github.com/coreos/etcd/raft/raftpb"
"github.com/coreos/etcd/snap"
"github.com/coreos/etcd/snap/snappb"
"github.com/coreos/etcd/wal"
"github.com/coreos/etcd/wal/walpb"
"github.com/gardener/etcd-backup-restore/pkg/miscellaneous"
Expand Down Expand Up @@ -69,59 +65,56 @@ func (d *DataValidator) snapDir() string { return filepath.Join(d.memberDir(), "
func (d *DataValidator) backendPath() string { return filepath.Join(d.snapDir(), "db") }

// Validate performs the steps required to validate data for Etcd instance.
// The steps involved are:
// * Check if data directory exists.
// - If data directory exists
// * Check for data directory structure.
// - If data directory structure is invalid return DataDirectoryInvStruct status.
// * Check for data corruption.
// - return data directory corruption status.
func (d *DataValidator) Validate(mode Mode, failBelowRevision int64) (DataDirStatus, error) {
status, err := d.sanityCheck(failBelowRevision)
if status != DataDirectoryValid {
return status, err
}

if mode == Full {
d.Logger.Info("Checking for data directory files corruption...")
if err := d.checkForDataCorruption(); err != nil {
d.Logger.Infof("Data directory corrupt. %v", err)
return DataDirectoryCorrupt, nil
}
}

d.Logger.Info("Data directory valid.")
return DataDirectoryValid, nil
}

func (d *DataValidator) sanityCheck(failBelowRevision int64) (DataDirStatus, error) {
dataDir := d.Config.DataDir
dirExists, err := directoryExist(dataDir)
if err != nil {
return DataDirectoryError, err
} else if !dirExists {
err = fmt.Errorf("Directory does not exist: %s", dataDir)
return DataDirectoryNotExist, err
return DataDirectoryStatusUnknown, err
}
if !dirExists {
return DataDirectoryNotExist, fmt.Errorf("directory does not exist: %s", dataDir)
}

d.Logger.Info("Checking for data directory structure validity...")
etcdDirStructValid, err := d.hasEtcdDirectoryStructure()
if err != nil {
return DataDirectoryError, err
return DataDirectoryStatusUnknown, err
}
if !etcdDirStructValid {
d.Logger.Infof("Data directory structure invalid.")
return DataDirectoryInvStruct, nil
}

if d.Config.SnapstoreConfig != nil {
d.Logger.Info("Checking for revision consistency...")
failBelowRevisionCheck, err := checkRevisionConsistency(d.backendPath(), *d.Config.SnapstoreConfig, failBelowRevision)
if err != nil {
d.Logger.Infof("Etcd revision inconsistent with latest snapshot revision: %v", err)
if failBelowRevisionCheck {
return FailBelowRevisionConsistencyError, nil
}
return RevisionConsistencyError, nil
}
} else {
if d.Config.SnapstoreConfig == nil {
d.Logger.Info("Skipping check for revision consistency, since no snapstore configured.")
return DataDirectoryValid, nil
}

switch mode {
case Full:
d.Logger.Info("Checking for data directory files corruption...")
err = d.checkForDataCorruption()
if err != nil {
d.Logger.Infof("Data directory corrupt. %v", err)
return DataDirectoryCorrupt, nil
}
etcdRevision, err := getLatestEtcdRevision(d.backendPath())
if err != nil {
d.Logger.Infof("unable to get current etcd revision from backend db file: %v", err)
return DataDirectoryCorrupt, nil
}

d.Logger.Info("Data directory valid.")
return DataDirectoryValid, nil
d.Logger.Info("Checking for revision consistency...")
return d.checkRevisionConsistency(etcdRevision, failBelowRevision)
}

// checkForDataCorruption will check for corruption of different files used by etcd.
Expand Down Expand Up @@ -173,115 +166,9 @@ func directoryExist(dir string) (bool, error) {
}
}

func isDirEmpty(dataDir string) (bool, error) {
f, err := os.Open(dataDir)
if err != nil {
return false, err
}
defer f.Close()

_, err = f.Readdirnames(1) // Or f.Readdir(1)
if err == io.EOF {
return true, nil
}
return false, err // Either not empty or error, suits both cases
}

func (d *DataValidator) verifySnapDir() (*raftpb.Snapshot, error) {
names, err := d.snapNames()
if err != nil {
return nil, err
}

var snapshot *raftpb.Snapshot
for _, name := range names {
if snapshot, err = verifySnap(d.snapDir(), name); err == nil {
break
}
}
if err != nil {
return nil, snap.ErrNoSnapshot
}
return snapshot, nil
}

func verifySnap(dir, name string) (*raftpb.Snapshot, error) {
fpath := filepath.Join(dir, name)
snap, err := read(fpath)
return snap, err
}

// Read reads the snapshot named by snapname and returns the snapshot.
func read(snapname string) (*raftpb.Snapshot, error) {
fmt.Printf("Verifying Snapfile %s.\n", snapname)
b, err := ioutil.ReadFile(snapname)
if err != nil {
fmt.Printf("Cannot read file %v: %v.\n", snapname, err)
return nil, err
}

if len(b) == 0 {
fmt.Printf("Unexpected empty snapshot.\n")
return nil, snap.ErrEmptySnapshot
}

var serializedSnap snappb.Snapshot
if err = serializedSnap.Unmarshal(b); err != nil {
fmt.Printf("Corrupted snapshot file %v: %v.\n", snapname, err)
return nil, err
}

if len(serializedSnap.Data) == 0 || serializedSnap.Crc == 0 {
fmt.Printf("Unexpected empty snapshot.\n")
return nil, snap.ErrEmptySnapshot
}

crc := crc32.Update(0, crcTable, serializedSnap.Data)
if crc != serializedSnap.Crc {
fmt.Printf("Corrupted snapshot file %v: crc mismatch.\n", snapname)
return nil, snap.ErrCRCMismatch
}

var snap raftpb.Snapshot
if err = snap.Unmarshal(serializedSnap.Data); err != nil {
fmt.Printf("corrupted snapshot file %v: %v", snapname, err)
return nil, err
}
return &snap, nil
}

func (d *DataValidator) snapNames() ([]string, error) {
dir, err := os.Open(d.snapDir())
if err != nil {
return nil, err
}
defer dir.Close()
names, err := dir.Readdirnames(-1)
if err != nil {
return nil, err
}
snaps := checkSuffix(names)
if len(snaps) == 0 {
return nil, snap.ErrNoSnapshot
}
sort.Sort(sort.Reverse(sort.StringSlice(snaps)))
return snaps, nil
}

func checkSuffix(names []string) []string {
snaps := []string{}
for i := range names {
if strings.HasSuffix(names[i], snapSuffix) {
snaps = append(snaps, names[i])
} else {
// If we find a file which is not a snapshot then check if it's
// a valid file. If not throw out a warning.
if _, ok := validFiles[names[i]]; !ok {
fmt.Printf("skipped unexpected non snapshot file %v", names[i])
}
}
}
return snaps
ssr := snap.New(d.snapDir())
return ssr.Load()
}

func verifyWALDir(waldir string, snap walpb.Snapshot) error {
Expand Down Expand Up @@ -342,40 +229,37 @@ func verifyDB(path string) error {
}

// checkRevisionConsistency compares the latest revisions on the etcd db file and the latest snapshot to verify that the etcd revision is not lesser than snapshot revision.
// Return true or false indicating whether it is due to failBelowRevision or latest snapshot revision for snapstore.
func checkRevisionConsistency(dbPath string, config snapstore.Config, failBelowRevision int64) (bool, error) {
etcdRevision, err := getLatestEtcdRevision(dbPath)
// Return DataDirStatus indicating whether it is due to failBelowRevision or latest snapshot revision for snapstore.
func (d *DataValidator) checkRevisionConsistency(etcdRevision, failBelowRevision int64) (DataDirStatus, error) {
store, err := snapstore.GetSnapstore(d.Config.SnapstoreConfig)
if err != nil {
return false, fmt.Errorf("unable to get current etcd revision from backend db file: %v", err)
}

store, err := snapstore.GetSnapstore(&config)
if err != nil {
return false, fmt.Errorf("unable to fetch snapstore: %v", err)
return DataDirectoryStatusUnknown, fmt.Errorf("unable to fetch snapstore: %v", err)
}

var latestSnapshotRevision int64
fullSnap, deltaSnaps, err := miscellaneous.GetLatestFullSnapshotAndDeltaSnapList(store)
if err != nil {
return false, fmt.Errorf("unable to get snapshots from store: %v", err)
return DataDirectoryStatusUnknown, fmt.Errorf("unable to get snapshots from store: %v", err)
}
if len(deltaSnaps) != 0 {
latestSnapshotRevision = deltaSnaps[len(deltaSnaps)-1].LastRevision
} else if fullSnap != nil {
latestSnapshotRevision = fullSnap.LastRevision
} else {
logger.Infof("No snapshot found.")
d.Logger.Infof("No snapshot found.")
if etcdRevision < failBelowRevision {
return true, fmt.Errorf("current etcd revision (%d) is less than fail below revision (%d): possible data loss", etcdRevision, failBelowRevision)
d.Logger.Infof("current etcd revision (%d) is less than fail below revision (%d): possible data loss", etcdRevision, failBelowRevision)
return FailBelowRevisionConsistencyError, nil
}
return false, nil
return DataDirectoryValid, nil
}

if etcdRevision < latestSnapshotRevision {
return false, fmt.Errorf("current etcd revision (%d) is less than latest snapshot revision (%d): possible data loss", etcdRevision, latestSnapshotRevision)
d.Logger.Infof("current etcd revision (%d) is less than latest snapshot revision (%d): possible data loss", etcdRevision, latestSnapshotRevision)
return RevisionConsistencyError, nil
}

return false, nil
return DataDirectoryValid, nil
}

// getLatestEtcdRevision finds out the latest revision on the etcd db file without starting etcd server or an embedded etcd server.
Expand Down
Loading

0 comments on commit 481c09c

Please sign in to comment.