diff --git a/pkg/snapshot/snapshotter/snapshotter.go b/pkg/snapshot/snapshotter/snapshotter.go index 59b7b22f7..1f8ce1d27 100644 --- a/pkg/snapshot/snapshotter/snapshotter.go +++ b/pkg/snapshot/snapshotter/snapshotter.go @@ -15,11 +15,12 @@ package snapshotter import ( + "bytes" "context" "crypto/sha256" "encoding/json" "fmt" - "io" + "io/ioutil" "math" "path" "sync" @@ -193,6 +194,7 @@ func (ssr *Snapshotter) TakeFullSnapshotAndResetTimer() error { // It basically will connect to etcd. Then ask for snapshot. And finally // store it to underlying snapstore on the fly. func (ssr *Snapshotter) takeFullSnapshot() error { + defer ssr.cleanupInMemoryEvents() // close previous watch and client. ssr.closeEtcdClient() @@ -253,9 +255,6 @@ func (ssr *Snapshotter) takeFullSnapshot() error { return nil } - // make event array empty as any event prior to full snapshot should not be uploaded in delta. - ssr.events = []*event{} - ssr.eventMemory = 0 watchCtx, cancelWatch := context.WithCancel(context.TODO()) ssr.cancelWatch = cancelWatch ssr.etcdClient = client @@ -265,6 +264,11 @@ func (ssr *Snapshotter) takeFullSnapshot() error { return nil } +func (ssr *Snapshotter) cleanupInMemoryEvents() { + ssr.events = []byte{} + ssr.lastEventRevision = -1 +} + func (ssr *Snapshotter) takeDeltaSnapshotAndResetTimer() error { if err := ssr.takeDeltaSnapshot(); err != nil { // As per design principle, in business critical service if backup is not working, @@ -272,7 +276,7 @@ func (ssr *Snapshotter) takeDeltaSnapshotAndResetTimer() error { ssr.logger.Warnf("Taking delta snapshot failed: %v", err) return err } - ssr.eventMemory = 0 + if ssr.deltaSnapshotTimer == nil { ssr.deltaSnapshotTimer = time.NewTimer(time.Second * time.Duration(ssr.config.deltaSnapshotIntervalSeconds)) } else { @@ -285,26 +289,31 @@ func (ssr *Snapshotter) takeDeltaSnapshotAndResetTimer() error { } func (ssr *Snapshotter) takeDeltaSnapshot() error { + defer ssr.cleanupInMemoryEvents() ssr.logger.Infof("Taking delta snapshot for time: %s", time.Now().Local()) if len(ssr.events) == 0 { ssr.logger.Infof("No events received to save snapshot. Skipping delta snapshot.") return nil } + ssr.events = append(ssr.events, byte(']')) - snap := snapstore.NewSnapshot(snapstore.SnapshotKindDelta, ssr.prevSnapshot.LastRevision+1, ssr.events[len(ssr.events)-1].EtcdEvent.Kv.ModRevision) + snap := snapstore.NewSnapshot(snapstore.SnapshotKindDelta, ssr.prevSnapshot.LastRevision+1, ssr.lastEventRevision) snap.SnapDir = ssr.prevSnapshot.SnapDir - pr, pw := io.Pipe() - go jsonEncodeEvents(ssr.events, pw) + + // compute hash + hash := sha256.New() + if _, err := hash.Write(ssr.events); err != nil { + return fmt.Errorf("failed to compute hash of events: %v", err) + } + ssr.events = hash.Sum(ssr.events) startTime := time.Now() - if err := ssr.config.store.Save(*snap, pr); err != nil { + if err := ssr.config.store.Save(*snap, ioutil.NopCloser(bytes.NewReader(ssr.events))); err != nil { timeTaken := time.Now().Sub(startTime).Seconds() metrics.SnapshotDurationSeconds.With(prometheus.Labels{metrics.LabelKind: snapstore.SnapshotKindDelta, metrics.LabelSucceeded: metrics.ValueSucceededFalse}).Observe(timeTaken) ssr.logger.Errorf("Error saving delta snapshots. %v", err) return err } - ssr.events = []*event{} - timeTaken := time.Now().Sub(startTime).Seconds() metrics.SnapshotDurationSeconds.With(prometheus.Labels{metrics.LabelKind: snapstore.SnapshotKindDelta, metrics.LabelSucceeded: metrics.ValueSucceededTrue}).Observe(timeTaken) logrus.Infof("Total time to save delta snapshot: %f seconds.", timeTaken) @@ -315,45 +324,6 @@ func (ssr *Snapshotter) takeDeltaSnapshot() error { return nil } -func jsonEncodeEvents(events []*event, pw *io.PipeWriter) { - // compute hash - hash := sha256.New() - mw := io.MultiWriter(hash, pw) - if len(events) == 0 { - pw.CloseWithError(fmt.Errorf("no event found to encode")) - return - } - if _, err := mw.Write([]byte("[")); err != nil { - pw.CloseWithError(fmt.Errorf("no event found to encode")) - return - } - enc := json.NewEncoder(mw) - if err := enc.Encode(events[0]); err != nil { - pw.CloseWithError(fmt.Errorf("no event found to encode")) - return - } - for _, e := range events[1:] { - if _, err := mw.Write([]byte(",")); err != nil { - pw.CloseWithError(fmt.Errorf("no event found to encode")) - return - } - if err := enc.Encode(e); err != nil { - pw.CloseWithError(fmt.Errorf("no event found to encode")) - return - } - } - if _, err := mw.Write([]byte("]")); err != nil { - pw.CloseWithError(fmt.Errorf("no event found to encode")) - return - } - // write the final check sum to pipe - if _, err := pw.Write(hash.Sum(nil)); err != nil { - pw.CloseWithError(fmt.Errorf("no event found to encode")) - return - } - pw.Close() -} - func (ssr *Snapshotter) handleDeltaWatchEvents(wr clientv3.WatchResponse) error { if err := wr.Err(); err != nil { return err @@ -365,12 +335,17 @@ func (ssr *Snapshotter) handleDeltaWatchEvents(wr clientv3.WatchResponse) error if err != nil { return fmt.Errorf("failed to marshal events to json: %v", err) } - ssr.eventMemory = ssr.eventMemory + len(jsonByte) - ssr.events = append(ssr.events, timedEvent) + if len(ssr.events) == 0 { + ssr.events = append(ssr.events, byte('[')) + } else { + ssr.events = append(ssr.events, byte(',')) + } + ssr.events = append(ssr.events, jsonByte...) + ssr.lastEventRevision = ev.Kv.ModRevision } - ssr.logger.Debugf("Added events till revision: %d", ssr.events[len(ssr.events)-1].EtcdEvent.Kv.ModRevision) - if ssr.eventMemory >= ssr.config.deltaSnapshotMemoryLimit { - ssr.logger.Infof("Delta events memory crossed the memory limit: %d Bytes", ssr.eventMemory) + ssr.logger.Debugf("Added events till revision: %d", ssr.lastEventRevision) + if len(ssr.events) >= ssr.config.deltaSnapshotMemoryLimit { + ssr.logger.Infof("Delta events memory crossed the memory limit: %d Bytes", len(ssr.events)) return ssr.takeDeltaSnapshotAndResetTimer() } return nil diff --git a/pkg/snapshot/snapshotter/types.go b/pkg/snapshot/snapshotter/types.go index ce52137f0..2cb7434a7 100644 --- a/pkg/snapshot/snapshotter/types.go +++ b/pkg/snapshot/snapshotter/types.go @@ -57,13 +57,13 @@ type Snapshotter struct { fullSnapshotCh chan struct{} fullSnapshotTimer *time.Timer deltaSnapshotTimer *time.Timer - events []*event + events []byte watchCh clientv3.WatchChan etcdClient *clientv3.Client cancelWatch context.CancelFunc SsrStateMutex *sync.Mutex SsrState State - eventMemory int + lastEventRevision int64 } // Config stores the configuration parameters for the snapshotter.