Skip to content

Commit

Permalink
Merge pull request #128 from swapnilgm/regression
Browse files Browse the repository at this point in the history
Optimise delta snapshot to use one time json marshalling
  • Loading branch information
shreyas-s-rao authored Feb 28, 2019
2 parents 6f68760 + 3fbcb9b commit 5fef841
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 57 deletions.
85 changes: 30 additions & 55 deletions pkg/snapshot/snapshotter/snapshotter.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,12 @@
package snapshotter

import (
"bytes"
"context"
"crypto/sha256"
"encoding/json"
"fmt"
"io"
"io/ioutil"
"math"
"path"
"sync"
Expand Down Expand Up @@ -193,6 +194,7 @@ func (ssr *Snapshotter) TakeFullSnapshotAndResetTimer() error {
// It basically will connect to etcd. Then ask for snapshot. And finally
// store it to underlying snapstore on the fly.
func (ssr *Snapshotter) takeFullSnapshot() error {
defer ssr.cleanupInMemoryEvents()
// close previous watch and client.
ssr.closeEtcdClient()

Expand Down Expand Up @@ -253,9 +255,6 @@ func (ssr *Snapshotter) takeFullSnapshot() error {
return nil
}

// make event array empty as any event prior to full snapshot should not be uploaded in delta.
ssr.events = []*event{}
ssr.eventMemory = 0
watchCtx, cancelWatch := context.WithCancel(context.TODO())
ssr.cancelWatch = cancelWatch
ssr.etcdClient = client
Expand All @@ -265,14 +264,19 @@ func (ssr *Snapshotter) takeFullSnapshot() error {
return nil
}

// cleanupInMemoryEvents replaces the buffered events with an empty, non-nil
// byte slice and sets lastEventRevision back to -1.
func (ssr *Snapshotter) cleanupInMemoryEvents() {
	ssr.events, ssr.lastEventRevision = make([]byte, 0), -1
}

func (ssr *Snapshotter) takeDeltaSnapshotAndResetTimer() error {
if err := ssr.takeDeltaSnapshot(); err != nil {
// As per design principle, in a business-critical service, if backup is not working,
// it's better to fail the process. So, we are quitting here.
ssr.logger.Warnf("Taking delta snapshot failed: %v", err)
return err
}
ssr.eventMemory = 0

if ssr.deltaSnapshotTimer == nil {
ssr.deltaSnapshotTimer = time.NewTimer(time.Second * time.Duration(ssr.config.deltaSnapshotIntervalSeconds))
} else {
Expand All @@ -285,26 +289,31 @@ func (ssr *Snapshotter) takeDeltaSnapshotAndResetTimer() error {
}

func (ssr *Snapshotter) takeDeltaSnapshot() error {
defer ssr.cleanupInMemoryEvents()
ssr.logger.Infof("Taking delta snapshot for time: %s", time.Now().Local())

if len(ssr.events) == 0 {
ssr.logger.Infof("No events received to save snapshot. Skipping delta snapshot.")
return nil
}
ssr.events = append(ssr.events, byte(']'))

snap := snapstore.NewSnapshot(snapstore.SnapshotKindDelta, ssr.prevSnapshot.LastRevision+1, ssr.events[len(ssr.events)-1].EtcdEvent.Kv.ModRevision)
snap := snapstore.NewSnapshot(snapstore.SnapshotKindDelta, ssr.prevSnapshot.LastRevision+1, ssr.lastEventRevision)
snap.SnapDir = ssr.prevSnapshot.SnapDir
pr, pw := io.Pipe()
go jsonEncodeEvents(ssr.events, pw)

// compute hash
hash := sha256.New()
if _, err := hash.Write(ssr.events); err != nil {
return fmt.Errorf("failed to compute hash of events: %v", err)
}
ssr.events = hash.Sum(ssr.events)
startTime := time.Now()
if err := ssr.config.store.Save(*snap, pr); err != nil {
if err := ssr.config.store.Save(*snap, ioutil.NopCloser(bytes.NewReader(ssr.events))); err != nil {
timeTaken := time.Now().Sub(startTime).Seconds()
metrics.SnapshotDurationSeconds.With(prometheus.Labels{metrics.LabelKind: snapstore.SnapshotKindDelta, metrics.LabelSucceeded: metrics.ValueSucceededFalse}).Observe(timeTaken)
ssr.logger.Errorf("Error saving delta snapshots. %v", err)
return err
}
ssr.events = []*event{}

timeTaken := time.Now().Sub(startTime).Seconds()
metrics.SnapshotDurationSeconds.With(prometheus.Labels{metrics.LabelKind: snapstore.SnapshotKindDelta, metrics.LabelSucceeded: metrics.ValueSucceededTrue}).Observe(timeTaken)
logrus.Infof("Total time to save delta snapshot: %f seconds.", timeTaken)
Expand All @@ -315,45 +324,6 @@ func (ssr *Snapshotter) takeDeltaSnapshot() error {
return nil
}

// jsonEncodeEvents streams events to pw as a JSON array and then appends the
// raw SHA-256 digest of the array bytes.
//
// Each element is written with json.Encoder.Encode, so it is followed by a
// newline; elements are separated by ",". Every byte of the array is fed to
// both the hash and the pipe, and the digest itself goes to the pipe only.
//
// On success the pipe is closed normally. On failure it is closed with an
// error describing the step that failed, so the reader sees the real cause.
// An empty events slice is reported as "no event found to encode".
func jsonEncodeEvents(events []*event, pw *io.PipeWriter) {
	if len(events) == 0 {
		pw.CloseWithError(fmt.Errorf("no event found to encode"))
		return
	}

	hash := sha256.New()
	mw := io.MultiWriter(hash, pw)
	enc := json.NewEncoder(mw)

	if _, err := mw.Write([]byte("[")); err != nil {
		pw.CloseWithError(fmt.Errorf("failed to write start of events array: %v", err))
		return
	}
	for i, e := range events {
		// Separator goes before every element except the first.
		if i > 0 {
			if _, err := mw.Write([]byte(",")); err != nil {
				pw.CloseWithError(fmt.Errorf("failed to write events separator: %v", err))
				return
			}
		}
		if err := enc.Encode(e); err != nil {
			pw.CloseWithError(fmt.Errorf("failed to encode event at index %d: %v", i, err))
			return
		}
	}
	if _, err := mw.Write([]byte("]")); err != nil {
		pw.CloseWithError(fmt.Errorf("failed to write end of events array: %v", err))
		return
	}

	// The checksum is written to the pipe only; it must not be hashed itself.
	if _, err := pw.Write(hash.Sum(nil)); err != nil {
		pw.CloseWithError(fmt.Errorf("failed to write events checksum: %v", err))
		return
	}
	pw.Close()
}

func (ssr *Snapshotter) handleDeltaWatchEvents(wr clientv3.WatchResponse) error {
if err := wr.Err(); err != nil {
return err
Expand All @@ -365,12 +335,17 @@ func (ssr *Snapshotter) handleDeltaWatchEvents(wr clientv3.WatchResponse) error
if err != nil {
return fmt.Errorf("failed to marshal events to json: %v", err)
}
ssr.eventMemory = ssr.eventMemory + len(jsonByte)
ssr.events = append(ssr.events, timedEvent)
if len(ssr.events) == 0 {
ssr.events = append(ssr.events, byte('['))
} else {
ssr.events = append(ssr.events, byte(','))
}
ssr.events = append(ssr.events, jsonByte...)
ssr.lastEventRevision = ev.Kv.ModRevision
}
ssr.logger.Debugf("Added events till revision: %d", ssr.events[len(ssr.events)-1].EtcdEvent.Kv.ModRevision)
if ssr.eventMemory >= ssr.config.deltaSnapshotMemoryLimit {
ssr.logger.Infof("Delta events memory crossed the memory limit: %d Bytes", ssr.eventMemory)
ssr.logger.Debugf("Added events till revision: %d", ssr.lastEventRevision)
if len(ssr.events) >= ssr.config.deltaSnapshotMemoryLimit {
ssr.logger.Infof("Delta events memory crossed the memory limit: %d Bytes", len(ssr.events))
return ssr.takeDeltaSnapshotAndResetTimer()
}
return nil
Expand Down
4 changes: 2 additions & 2 deletions pkg/snapshot/snapshotter/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,13 +57,13 @@ type Snapshotter struct {
fullSnapshotCh chan struct{}
fullSnapshotTimer *time.Timer
deltaSnapshotTimer *time.Timer
events []*event
events []byte
watchCh clientv3.WatchChan
etcdClient *clientv3.Client
cancelWatch context.CancelFunc
SsrStateMutex *sync.Mutex
SsrState State
eventMemory int
lastEventRevision int64
}

// Config stores the configuration parameters for the snapshotter.
Expand Down

0 comments on commit 5fef841

Please sign in to comment.