Skip to content

Commit

Permalink
Add memory limit of events aggregation
Browse files Browse the repository at this point in the history
Signed-off-by: Swapnil Mhamane <[email protected]>
  • Loading branch information
Swapnil Mhamane committed Dec 19, 2018
1 parent d43f10c commit bbc5309
Show file tree
Hide file tree
Showing 8 changed files with 69 additions and 21 deletions.
1 change: 1 addition & 0 deletions cmd/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@ func NewServerCommand(stopCh <-chan struct{}) *cobra.Command {
ss,
maxBackups,
deltaSnapshotIntervalSeconds,
deltaSnapshotMemoryLimit,
time.Duration(etcdConnectionTimeout),
time.Duration(garbageCollectionPeriodSeconds),
garbageCollectionPolicy,
Expand Down
6 changes: 4 additions & 2 deletions cmd/snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ storing snapshots on various cloud storage providers as well as local disk locat
ss,
maxBackups,
deltaSnapshotIntervalSeconds,
deltaSnapshotMemoryLimit,
time.Duration(etcdConnectionTimeout),
time.Duration(garbageCollectionPeriodSeconds),
garbageCollectionPolicy,
Expand Down Expand Up @@ -92,8 +93,9 @@ storing snapshots on various cloud storage providers as well as local disk locat
func initializeSnapshotterFlags(cmd *cobra.Command) {
cmd.Flags().StringSliceVarP(&etcdEndpoints, "endpoints", "e", []string{"127.0.0.1:2379"}, "comma separated list of etcd endpoints")
cmd.Flags().StringVarP(&schedule, "schedule", "s", "* */1 * * *", "schedule for snapshots")
cmd.Flags().IntVarP(&deltaSnapshotIntervalSeconds, "delta-snapshot-period-seconds", "i", 10, "Period in seconds after which delta snapshot will be persisted")
cmd.Flags().IntVarP(&maxBackups, "max-backups", "m", 7, "maximum number of previous backups to keep")
cmd.Flags().IntVarP(&deltaSnapshotIntervalSeconds, "delta-snapshot-period-seconds", "i", snapshotter.DefaultDeltaSnapshotIntervalSeconds, "Period in seconds after which delta snapshot will be persisted")
cmd.Flags().IntVar(&deltaSnapshotMemoryLimit, "delta-snapshot-memory-limit", snapshotter.DefaultDeltaSnapMemoryLimit, "memory limit after which delta snapshots will be taken")
cmd.Flags().IntVarP(&maxBackups, "max-backups", "m", snapshotter.DefaultMaxBackups, "maximum number of previous backups to keep")
cmd.Flags().IntVar(&etcdConnectionTimeout, "etcd-connection-timeout", 30, "etcd client connection timeout")
cmd.Flags().IntVar(&garbageCollectionPeriodSeconds, "garbage-collection-period-seconds", 60, "Period in seconds for garbage collecting old backups")
cmd.Flags().StringVar(&garbageCollectionPolicy, "garbage-collection-policy", snapshotter.GarbageCollectionPolicyExponential, "Policy for garbage collecting old backups")
Expand Down
1 change: 1 addition & 0 deletions cmd/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ var (
schedule string
etcdEndpoints []string
deltaSnapshotIntervalSeconds int
deltaSnapshotMemoryLimit int
maxBackups int
etcdConnectionTimeout int
garbageCollectionPeriodSeconds int
Expand Down
1 change: 1 addition & 0 deletions pkg/snapshot/restorer/restorer_suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,7 @@ func runSnapshotter(logger *logrus.Logger, endpoints []string, stopCh chan struc
store,
maxBackups,
deltaSnapshotPeriod,
snapshotter.DefaultDeltaSnapMemoryLimit,
etcdConnectionTimeout,
garbageCollectionPeriodSeconds,
garbageCollectionPolicy,
Expand Down
4 changes: 4 additions & 0 deletions pkg/snapshot/snapshotter/garbagecollector.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,10 @@ import (

// RunGarbageCollector basically considers the older backups as garbage and deletes them.
func (ssr *Snapshotter) RunGarbageCollector(stopCh <-chan struct{}) {
if ssr.config.garbageCollectionPeriodSeconds <= 0 {
ssr.logger.Infof("GC: Not running garbage collector since GarbageCollectionPeriodSeconds [%d] set to less than 1.", ssr.config.garbageCollectionPeriodSeconds)
return
}
for {
select {
case <-stopCh:
Expand Down
50 changes: 38 additions & 12 deletions pkg/snapshot/snapshotter/snapshotter.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,19 +33,31 @@ import (
)

// NewSnapshotterConfig returns a config for the snapshotter.
func NewSnapshotterConfig(schedule string, store snapstore.SnapStore, maxBackups, deltaSnapshotIntervalSeconds int, etcdConnectionTimeout, garbageCollectionPeriodSeconds time.Duration, garbageCollectionPolicy string, tlsConfig *etcdutil.TLSConfig) (*Config, error) {
func NewSnapshotterConfig(schedule string, store snapstore.SnapStore, maxBackups, deltaSnapshotIntervalSeconds, deltaSnapshotMemoryLimit int, etcdConnectionTimeout, garbageCollectionPeriodSeconds time.Duration, garbageCollectionPolicy string, tlsConfig *etcdutil.TLSConfig) (*Config, error) {
logrus.Printf("Validating schedule...")
sdl, err := cron.ParseStandard(schedule)
if err != nil {
return nil, fmt.Errorf("invalid schedule provied %s : %v", schedule, err)
}
if maxBackups < 1 {
return nil, fmt.Errorf("maximum backups limit should be greater than zero. Input MaxBackups: %d", maxBackups)

if garbageCollectionPolicy == GarbageCollectionPolicyLimitBased && maxBackups < 1 {
logrus.Infof("Found garbage collection policy: [%s], and maximum backup value %d less than 1. Setting it to default: %d ", GarbageCollectionPolicyLimitBased, maxBackups, DefaultMaxBackups)
maxBackups = DefaultMaxBackups
}
if deltaSnapshotIntervalSeconds < 1 {
logrus.Infof("Found delta snapshot interval %d second less than 1 second. Setting it to default: %d ", deltaSnapshotIntervalSeconds, DefaultDeltaSnapshotIntervalSeconds)
deltaSnapshotIntervalSeconds = DefaultDeltaSnapshotIntervalSeconds
}
if deltaSnapshotMemoryLimit < 1 {
logrus.Infof("Found delta snapshot memory limit %d bytes less than 1 byte. Setting it to default: %d ", deltaSnapshotMemoryLimit, DefaultDeltaSnapMemoryLimit)
deltaSnapshotMemoryLimit = DefaultDeltaSnapMemoryLimit
}

return &Config{
schedule: sdl,
store: store,
deltaSnapshotIntervalSeconds: deltaSnapshotIntervalSeconds,
deltaSnapshotMemoryLimit: deltaSnapshotMemoryLimit,
etcdConnectionTimeout: etcdConnectionTimeout,
garbageCollectionPeriodSeconds: garbageCollectionPeriodSeconds,
garbageCollectionPolicy: garbageCollectionPolicy,
Expand Down Expand Up @@ -191,6 +203,7 @@ func (ssr *Snapshotter) takeFullSnapshot() error {
ssr.logger.Infof("Successfully saved full snapshot at: %s", path.Join(s.SnapDir, s.SnapName))
// Make the events array empty, as any event prior to the full snapshot should not be uploaded in a delta snapshot.
ssr.events = []*event{}
ssr.eventMemory = 0
watchCtx, cancelWatch := context.WithCancel(context.TODO())
ssr.cancelWatch = cancelWatch
ssr.watchCh = client.Watch(watchCtx, "", clientv3.WithPrefix(), clientv3.WithRev(ssr.prevSnapshot.LastRevision+1))
Expand All @@ -200,6 +213,13 @@ func (ssr *Snapshotter) takeFullSnapshot() error {
}

func (ssr *Snapshotter) takeDeltaSnapshotAndResetTimer() error {
if err := ssr.takeDeltaSnapshot(); err != nil {
// As per our design principle, in a business-critical service, if backup is not working,
// it's better to fail the process. So, we are quitting here.
ssr.logger.Warnf("Taking delta snapshot failed: %v", err)
return err
}
ssr.eventMemory = 0
if ssr.deltaSnapshotTimer == nil {
ssr.deltaSnapshotTimer = time.NewTimer(time.Second * time.Duration(ssr.config.deltaSnapshotIntervalSeconds))
} else {
Expand All @@ -208,12 +228,6 @@ func (ssr *Snapshotter) takeDeltaSnapshotAndResetTimer() error {
ssr.logger.Infof("Reseting delta snapshot to run after %d secs.", ssr.config.deltaSnapshotIntervalSeconds)
ssr.deltaSnapshotTimer.Reset(time.Second * time.Duration(ssr.config.deltaSnapshotIntervalSeconds))
}
if err := ssr.takeDeltaSnapshot(); err != nil {
// As per our design principle, in a business-critical service, if backup is not working,
// it's better to fail the process. So, we are quitting here.
ssr.logger.Warnf("Taking delta snapshot failed: %v", err)
return err
}
return nil
}

Expand All @@ -234,10 +248,12 @@ func (ssr *Snapshotter) takeDeltaSnapshot() error {
}
// compute hash
hash := sha256.New()
hash.Write(data)
if _, err := hash.Write(data); err != nil {
return fmt.Errorf("failed to compute hash of events: %v", err)
}
data = hash.Sum(data)
dataReader := bytes.NewReader(data)

dataReader := bytes.NewReader(data)
if err := ssr.config.store.Save(*snap, dataReader); err != nil {
ssr.logger.Errorf("Error saving delta snapshots. %v", err)
return err
Expand All @@ -253,9 +269,19 @@ func (ssr *Snapshotter) handleDeltaWatchEvents(wr clientv3.WatchResponse) error
}
// aggregate events
for _, ev := range wr.Events {
ssr.events = append(ssr.events, newEvent(ev))
timedEvent := newEvent(ev)
jsonByte, err := json.Marshal(timedEvent)
if err != nil {
return fmt.Errorf("failed to marshal events to json: %v", err)
}
ssr.eventMemory = ssr.eventMemory + len(jsonByte)
ssr.events = append(ssr.events, timedEvent)
}
ssr.logger.Debugf("Added events till revision: %d", ssr.events[len(ssr.events)-1].EtcdEvent.Kv.ModRevision)
if ssr.eventMemory >= ssr.config.deltaSnapshotMemoryLimit {
ssr.logger.Infof("Delta events memory crossed the memory limit: %d Bytes", ssr.eventMemory)
return ssr.takeDeltaSnapshotAndResetTimer()
}
return nil
}

Expand Down
7 changes: 7 additions & 0 deletions pkg/snapshot/snapshotter/snapshotter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ var _ = Describe("Snapshotter", func() {
store,
1,
10,
DefaultDeltaSnapMemoryLimit,
etcdConnectionTimeout,
garbageCollectionPeriodSeconds,
GarbageCollectionPolicyExponential,
Expand All @@ -94,6 +95,7 @@ var _ = Describe("Snapshotter", func() {
store,
1,
10,
DefaultDeltaSnapMemoryLimit,
etcdConnectionTimeout,
garbageCollectionPeriodSeconds,
GarbageCollectionPolicyExponential,
Expand Down Expand Up @@ -125,6 +127,7 @@ var _ = Describe("Snapshotter", func() {
store,
maxBackups,
10,
DefaultDeltaSnapMemoryLimit,
etcdConnectionTimeout,
garbageCollectionPeriodSeconds,
GarbageCollectionPolicyExponential,
Expand Down Expand Up @@ -174,6 +177,7 @@ var _ = Describe("Snapshotter", func() {
store,
maxBackups,
10,
DefaultDeltaSnapMemoryLimit,
etcdConnectionTimeout,
garbageCollectionPeriodSeconds,
GarbageCollectionPolicyExponential,
Expand Down Expand Up @@ -231,6 +235,7 @@ var _ = Describe("Snapshotter", func() {
store,
maxBackups,
10,
DefaultDeltaSnapMemoryLimit,
etcdConnectionTimeout,
garbageCollectionPeriodSeconds,
GarbageCollectionPolicyExponential,
Expand Down Expand Up @@ -376,6 +381,7 @@ var _ = Describe("Snapshotter", func() {
store,
maxBackups,
10,
DefaultDeltaSnapMemoryLimit,
etcdConnectionTimeout,
garbageCollectionPeriodSeconds,
GarbageCollectionPolicyExponential,
Expand Down Expand Up @@ -426,6 +432,7 @@ var _ = Describe("Snapshotter", func() {
store,
maxBackups,
10,
DefaultDeltaSnapMemoryLimit,
etcdConnectionTimeout,
garbageCollectionPeriodSeconds,
GarbageCollectionPolicyLimitBased,
Expand Down
20 changes: 13 additions & 7 deletions pkg/snapshot/snapshotter/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,20 +31,24 @@ const (
GarbageCollectionPolicyExponential = "Exponential"
// GarbageCollectionPolicyLimitBased defines the limit based policy for garbage collecting old backups
GarbageCollectionPolicyLimitBased = "LimitBased"
)

var emptyStruct struct{}

// State denotes the state the snapshotter would be in.
type State int
// DefaultMaxBackups is default number of maximum backups for limit based garbage collection policy.
DefaultMaxBackups = 7

const (
// SnapshotterInactive is set when the snapshotter has not started taking snapshots.
SnapshotterInactive State = 0
// SnapshotterActive is set when the snapshotter has started taking snapshots.
SnapshotterActive State = 1
// DefaultDeltaSnapMemoryLimit is the default memory limit, in bytes, for delta snapshots.
DefaultDeltaSnapMemoryLimit = 10 * 1024 * 1024 // 10 MiB
// DefaultDeltaSnapshotIntervalSeconds is the default interval for delta snapshots in seconds.
DefaultDeltaSnapshotIntervalSeconds = 20
)

var emptyStruct struct{}

// State denotes the state the snapshotter would be in.
type State int

// Snapshotter is a struct for etcd snapshot taker
type Snapshotter struct {
logger *logrus.Logger
Expand All @@ -58,6 +62,7 @@ type Snapshotter struct {
cancelWatch context.CancelFunc
SsrStateMutex *sync.Mutex
SsrState State
eventMemory int
}

// Config stores the configuration parameters for the snapshotter.
Expand All @@ -66,6 +71,7 @@ type Config struct {
store snapstore.SnapStore
maxBackups int
deltaSnapshotIntervalSeconds int
deltaSnapshotMemoryLimit int
etcdConnectionTimeout time.Duration
garbageCollectionPeriodSeconds time.Duration
garbageCollectionPolicy string
Expand Down

0 comments on commit bbc5309

Please sign in to comment.