Skip to content

Commit

Permalink
Add experimental faiil below revision flag
Browse files Browse the repository at this point in the history
Main motivation behind this is to help extenal entity configure revision
in case of bucket change or object store prefix change

Signed-off-by: Swapnil Mhamane <[email protected]>
  • Loading branch information
Swapnil Mhamane committed Aug 7, 2019
1 parent a17381e commit 8d98b0f
Show file tree
Hide file tree
Showing 8 changed files with 76 additions and 34 deletions.
3 changes: 2 additions & 1 deletion cmd/initializer.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ func NewInitializeCommand(stopCh <-chan struct{}) *cobra.Command {
}
}
etcdInitializer := initializer.NewInitializer(options, snapstoreConfig, logger)
err = etcdInitializer.Initialize(mode)
err = etcdInitializer.Initialize(mode, failBelowRevision)
if err != nil {
logger.Fatalf("initializer failed. %v", err)
}
Expand All @@ -90,6 +90,7 @@ func NewInitializeCommand(stopCh <-chan struct{}) *cobra.Command {
initializeEtcdFlags(initializeCmd)
initializeSnapstoreFlags(initializeCmd)
initializeValidatorFlags(initializeCmd)
initializeCmd.Flags().Int64Var(&failBelowRevision, "experimental-fail-below-revision", 0, "revision below which validation fails")
return initializeCmd
}

Expand Down
3 changes: 2 additions & 1 deletion cmd/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,8 @@ var (
snapstoreTempDir string

//initializer flags
validationMode string
validationMode string
failBelowRevision int64
)

var emptyStruct struct{}
13 changes: 9 additions & 4 deletions pkg/initializer/initializer.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,15 +42,20 @@ const (
// * Check if Latest snapshot available.
// - Try to perform an Etcd data restoration from the latest snapshot.
// - No snapshots are available, start etcd as a fresh installation.
func (e *EtcdInitializer) Initialize(mode validator.Mode) error {
func (e *EtcdInitializer) Initialize(mode validator.Mode, failBelowRevision int64) error {
start := time.Now()
dataDirStatus, err := e.Validator.Validate(mode)
dataDirStatus, err := e.Validator.Validate(mode, failBelowRevision)
if err != nil && dataDirStatus != validator.DataDirectoryNotExist {
metrics.ValidationDurationSeconds.With(prometheus.Labels{metrics.LabelSucceeded: metrics.ValueSucceededFalse}).Observe(time.Now().Sub(start).Seconds())
err = fmt.Errorf("error while initializing: %v", err)
return err
return fmt.Errorf("error while initializing: %v", err)
}

if dataDirStatus == validator.FailBelowRevisionConsistencyError {
metrics.ValidationDurationSeconds.With(prometheus.Labels{metrics.LabelSucceeded: metrics.ValueSucceededFalse}).Observe(time.Now().Sub(start).Seconds())
return fmt.Errorf("failed to initialize since fail below revision check failed")
}
metrics.ValidationDurationSeconds.With(prometheus.Labels{metrics.LabelSucceeded: metrics.ValueSucceededTrue}).Observe(time.Now().Sub(start).Seconds())

if dataDirStatus != validator.DataDirectoryValid {
start := time.Now()
if err := e.restoreCorruptData(); err != nil {
Expand Down
34 changes: 21 additions & 13 deletions pkg/initializer/validator/datavalidator.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ func (d *DataValidator) backendPath() string { return filepath.Join(d.snapDir(),
// - If data directory structure is invalid return DataDirectoryInvStruct status.
// * Check for data corruption.
// - return data directory corruption status.
func (d *DataValidator) Validate(mode Mode) (DataDirStatus, error) {
func (d *DataValidator) Validate(mode Mode, failBelowRevision int64) (DataDirStatus, error) {
dataDir := d.Config.DataDir
dirExists, err := directoryExist(dataDir)
if err != nil {
Expand All @@ -98,8 +98,12 @@ func (d *DataValidator) Validate(mode Mode) (DataDirStatus, error) {

if d.Config.SnapstoreConfig != nil {
d.Logger.Info("Checking for revision consistency...")
if err = checkRevisionConsistency(d.backendPath(), *d.Config.SnapstoreConfig); err != nil {
failBelowRevisionCheck, err := checkRevisionConsistency(d.backendPath(), *d.Config.SnapstoreConfig, failBelowRevision)
if err != nil {
d.Logger.Infof("Etcd revision inconsistent with latest snapshot revision: %v", err)
if failBelowRevisionCheck {
return FailBelowRevisionConsistencyError, nil
}
return RevisionConsistencyError, nil
}
} else {
Expand Down Expand Up @@ -338,36 +342,40 @@ func verifyDB(path string) error {
}

// checkRevisionConsistency compares the latest revisions on the etcd db file and the latest snapshot to verify that the etcd revision is not lesser than snapshot revision.
func checkRevisionConsistency(dbPath string, config snapstore.Config) error {
// Return true or false indicating whether it is due to failBelowRevision or latest snapshot revision for snapstore.
func checkRevisionConsistency(dbPath string, config snapstore.Config, failBelowRevision int64) (bool, error) {
etcdRevision, err := getLatestEtcdRevision(dbPath)
if err != nil {
return fmt.Errorf("unable to get current etcd revision from backend db file: %v", err)
return false, fmt.Errorf("unable to get current etcd revision from backend db file: %v", err)
}

store, err := snapstore.GetSnapstore(&config)
if err != nil {
return fmt.Errorf("unable to fetch snapstore: %v", err)
return false, fmt.Errorf("unable to fetch snapstore: %v", err)
}

var latestSnapshotRevision int64
fullSnap, deltaSnaps, err := miscellaneous.GetLatestFullSnapshotAndDeltaSnapList(store)
if err != nil {
return fmt.Errorf("unable to get snapshots from store: %v", err)
return false, fmt.Errorf("unable to get snapshots from store: %v", err)
}
if fullSnap == nil {
logger.Infof("No snapshot found.")
return nil
} else if len(deltaSnaps) == 0 {
if len(deltaSnaps) != 0 {
latestSnapshotRevision = deltaSnaps[len(deltaSnaps)-1].LastRevision
} else if fullSnap != nil {
latestSnapshotRevision = fullSnap.LastRevision
} else {
latestSnapshotRevision = deltaSnaps[len(deltaSnaps)-1].LastRevision
logger.Infof("No snapshot found.")
if etcdRevision < failBelowRevision {
return true, fmt.Errorf("current etcd revision (%d) is less than fail below revision (%d): possible data loss", etcdRevision, failBelowRevision)
}
return false, nil
}

if etcdRevision < latestSnapshotRevision {
return fmt.Errorf("current etcd revision (%d) is less than latest snapshot revision (%d): possible data loss", etcdRevision, latestSnapshotRevision)
return false, fmt.Errorf("current etcd revision (%d) is less than latest snapshot revision (%d): possible data loss", etcdRevision, latestSnapshotRevision)
}

return nil
return false, nil
}

// getLatestEtcdRevision finds out the latest revision on the etcd db file without starting etcd server or an embedded etcd server.
Expand Down
32 changes: 21 additions & 11 deletions pkg/initializer/validator/datavalidator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ var _ = Describe("Running Datavalidator", func() {
tempDir := fmt.Sprintf("%s.%s", restoreDataDir, "temp")
err = os.Rename(restoreDataDir, tempDir)
Expect(err).ShouldNot(HaveOccurred())
dataDirStatus, err := validator.Validate(Full)
dataDirStatus, err := validator.Validate(Full, 0)
Expect(err).Should(HaveOccurred())
Expect(int(dataDirStatus)).Should(SatisfyAny(Equal(DataDirectoryNotExist), Equal(DataDirectoryError)))
err = os.Rename(tempDir, restoreDataDir)
Expand All @@ -61,7 +61,7 @@ var _ = Describe("Running Datavalidator", func() {
tempDir := fmt.Sprintf("%s.%s", memberDir, "temp")
err = os.Rename(memberDir, tempDir)
Expect(err).ShouldNot(HaveOccurred())
dataDirStatus, err := validator.Validate(Full)
dataDirStatus, err := validator.Validate(Full, 0)
Expect(err).ShouldNot(HaveOccurred())
Expect(int(dataDirStatus)).Should(SatisfyAny(Equal(DataDirectoryInvStruct), Equal(DataDirectoryError)))
err = os.Rename(tempDir, memberDir)
Expand All @@ -75,7 +75,7 @@ var _ = Describe("Running Datavalidator", func() {
tempDir := fmt.Sprintf("%s.%s", snapDir, "temp")
err = os.Rename(snapDir, tempDir)
Expect(err).ShouldNot(HaveOccurred())
dataDirStatus, err := validator.Validate(Full)
dataDirStatus, err := validator.Validate(Full, 0)
Expect(err).ShouldNot(HaveOccurred())
Expect(int(dataDirStatus)).Should(SatisfyAny(Equal(DataDirectoryInvStruct), Equal(DataDirectoryError)))
err = os.Rename(tempDir, snapDir)
Expand All @@ -88,7 +88,7 @@ var _ = Describe("Running Datavalidator", func() {
tempDir := fmt.Sprintf("%s.%s", walDir, "temp")
err = os.Rename(walDir, tempDir)
Expect(err).ShouldNot(HaveOccurred())
dataDirStatus, err := validator.Validate(Full)
dataDirStatus, err := validator.Validate(Full, 0)
Expect(err).ShouldNot(HaveOccurred())
Expect(int(dataDirStatus)).Should(SatisfyAny(Equal(DataDirectoryInvStruct), Equal(DataDirectoryError)))
err = os.Rename(tempDir, walDir)
Expand All @@ -104,7 +104,7 @@ var _ = Describe("Running Datavalidator", func() {
Expect(err).ShouldNot(HaveOccurred())
err = os.Mkdir(walDir, 0700)
Expect(err).ShouldNot(HaveOccurred())
dataDirStatus, err := validator.Validate(Sanity)
dataDirStatus, err := validator.Validate(Sanity, 0)
Expect(err).ShouldNot(HaveOccurred())
Expect(int(dataDirStatus)).Should(Equal(DataDirectoryValid))
err = os.RemoveAll(walDir)
Expand Down Expand Up @@ -138,7 +138,7 @@ var _ = Describe("Running Datavalidator", func() {
// newEtcdRevision: current revision number on etcd db
Expect(etcdRevision).To(BeNumerically(">=", newEtcdRevision))

dataDirStatus, err := validator.Validate(Full)
dataDirStatus, err := validator.Validate(Full, 0)
Expect(err).ShouldNot(HaveOccurred())
Expect(int(dataDirStatus)).Should(SatisfyAny(Equal(RevisionConsistencyError), Equal(DataDirectoryError)))

Expand Down Expand Up @@ -174,7 +174,7 @@ var _ = Describe("Running Datavalidator", func() {
_, err = file.Write(byteSlice)
Expect(err).ShouldNot(HaveOccurred())

dataDirStatus, err := validator.Validate(Full)
dataDirStatus, err := validator.Validate(Full, 0)
Expect(err).ShouldNot(HaveOccurred())
Expect(int(dataDirStatus)).Should(SatisfyAny(Equal(DataDirectoryCorrupt), Equal(DataDirectoryError), Equal(RevisionConsistencyError)))

Expand All @@ -187,10 +187,20 @@ var _ = Describe("Running Datavalidator", func() {
})
})
Context("with clean data directory", func() {
It("should return DataDirStatus as DataDirectoryValid, and nil error", func() {
dataDirStatus, err := validator.Validate(Full)
Expect(err).ShouldNot(HaveOccurred())
Expect(int(dataDirStatus)).Should(Equal(DataDirectoryValid))
Context("with fail below revision configured to low value", func() {
It("should return DataDirStatus as DataDirectoryValid, and nil error", func() {
dataDirStatus, err := validator.Validate(Full, 0)
Expect(err).ShouldNot(HaveOccurred())
Expect(int(dataDirStatus)).Should(Equal(DataDirectoryValid))
})
})

Context("with fail below revision configured to high value", func() {
It("should return DataDirStatus as RevisionConsistencyError and nil error", func() {
dataDirStatus, err := validator.Validate(Full, 1000000)
Expect(err).ShouldNot(HaveOccurred())
Expect(int(dataDirStatus)).Should(Equal(FailBelowRevisionConsistencyError))
})
})
})
})
Expand Down
4 changes: 3 additions & 1 deletion pkg/initializer/validator/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ const (
DataDirectoryError
// RevisionConsistencyError indicates current etcd revision is inconsistent with latest snapshot revision.
RevisionConsistencyError
//FailBelowRevisionConsistencyError indicate the current etcd revision is inconsistent with failBelowRevison.
FailBelowRevisionConsistencyError
)

const (
Expand Down Expand Up @@ -65,5 +67,5 @@ type DataValidator struct {

// Validator is the interface for data validation actions.
type Validator interface {
Validate(Mode) error
Validate(Mode, int64) error
}
17 changes: 16 additions & 1 deletion pkg/server/httpAPI.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"fmt"
"net/http"
"net/http/pprof"
"strconv"
"sync"
"sync/atomic"

Expand Down Expand Up @@ -150,6 +151,20 @@ func (h *HTTPHandler) serveInitialize(rw http.ResponseWriter, req *http.Request)
<-h.AckCh
}

failBelowRevisionStr := req.URL.Query().Get("failbelowrevision")
h.Logger.Infof("Validation failBelowRevision: %s", failBelowRevisionStr)
var failBelowRevision int64
if len(failBelowRevisionStr) != 0 {
var err error
failBelowRevision, err = strconv.ParseInt(failBelowRevisionStr, 10, 64)
if err != nil {
h.initializationStatusMutex.Lock()
defer h.initializationStatusMutex.Unlock()
h.Logger.Errorf("Failed initialization due wrong parameter value `failbelowrevision`: %v", err)
h.initializationStatus = initializationStatusFailed
return
}
}
switch modeVal := req.URL.Query().Get("mode"); modeVal {
case string(validator.Full):
mode = validator.Full
Expand All @@ -159,7 +174,7 @@ func (h *HTTPHandler) serveInitialize(rw http.ResponseWriter, req *http.Request)
mode = validator.Full
}
h.Logger.Infof("Validation mode: %s", mode)
err := h.EtcdInitializer.Initialize(mode)
err := h.EtcdInitializer.Initialize(mode, failBelowRevision)
h.initializationStatusMutex.Lock()
defer h.initializationStatusMutex.Unlock()
if err != nil {
Expand Down
4 changes: 2 additions & 2 deletions test/e2e/integration/cloud_backup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,7 @@ var _ = Describe("CloudBackup", func() {
SnapstoreConfig: snapstoreConfig,
},
}
dataDirStatus, err := dataValidator.Validate(validator.Full)
dataDirStatus, err := dataValidator.Validate(validator.Full, 0)
Expect(err).ShouldNot(HaveOccurred())
Expect(dataDirStatus).Should(Equal(validator.DataDirStatus(validator.DataDirectoryValid)))
})
Expand Down Expand Up @@ -248,7 +248,7 @@ var _ = Describe("CloudBackup", func() {
SnapstoreConfig: snapstoreConfig,
},
}
dataDirStatus, err := dataValidator.Validate(validator.Full)
dataDirStatus, err := dataValidator.Validate(validator.Full, 0)
Expect(err).ShouldNot(HaveOccurred())
Expect(dataDirStatus).Should(SatisfyAny(Equal(validator.DataDirStatus(validator.DataDirectoryCorrupt)), Equal(validator.DataDirStatus(validator.RevisionConsistencyError))))
})
Expand Down

0 comments on commit 8d98b0f

Please sign in to comment.