Skip to content

Commit

Permalink
Use cron schedule for defragmentation
Browse files Browse the repository at this point in the history
Signed-off-by: Swapnil Mhamane <[email protected]>
  • Loading branch information
Swapnil Mhamane committed Sep 18, 2019
1 parent 3411ceb commit b95f3df
Show file tree
Hide file tree
Showing 411 changed files with 29,797 additions and 11,155 deletions.
7 changes: 5 additions & 2 deletions chart/etcd-backup-restore/templates/etcd-statefulset.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ spec:
- --endpoints={{ if .Values.tls }}https{{ else }}http{{ end }}://{{ .Release.Name }}-etcd-0:{{ .Values.servicePorts.client }}
{{- if and .Values.etcdAuth.username .Values.etcdAuth.password }}
- --user={{ .Values.etcdAuth.username }}:{{ .Values.etcdAuth.password }}
{{- end }}
{{- end }}
- get
- foo
initialDelaySeconds: 15
Expand Down Expand Up @@ -105,14 +105,17 @@ spec:
- --insecure-transport=true
- --insecure-skip-tls-verify=true
- --endpoints=http://{{ .Release.Name }}-etcd-0:{{ .Values.servicePorts.client }}
{{- end }}
{{- if .Values.backup.defragmentationSchedule }}
- --defragmentation-schedule={{ .Values.backup.defragmentationSchedule }}
{{- end }}
- --etcd-connection-timeout={{ .Values.backup.etcdConnectionTimeout }}
- --delta-snapshot-period-seconds={{ int $.Values.backup.deltaSnapshotPeriodSeconds }}
- --delta-snapshot-memory-limit={{ int $.Values.backup.deltaSnapshotMemoryLimit }}
{{- if and .Values.etcdAuth.username .Values.etcdAuth.password }}
- --etcd-username={{ .Values.etcdAuth.username }}
- --etcd-password={{ .Values.etcdAuth.password }}
{{- end }}
{{- end }}
image: {{ .Values.images.etcdBackupRestore.repository }}:{{ .Values.images.etcdBackupRestore.tag }}
imagePullPolicy: {{ .Values.images.etcdBackupRestore.pullPolicy }}
ports:
Expand Down
4 changes: 2 additions & 2 deletions chart/etcd-backup-restore/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,8 @@ backup:
# deltaSnapshotMemoryLimit is memory limit in bytes after which delta snapshots will be taken out of schedule.
deltaSnapshotMemoryLimit: 104857600 #100MB

# defragmentationPeriodHours is the period, in hours, after which the etcd data will be defragmented. If this value is set to less than 1, defragmentation will be disabled.
defragmentationPeriodHours: 72
# defragmentationSchedule is the cron schedule on which the etcd data will be defragmented.
# The value must follow the standard five-field cron format: minute hour day-of-month month day-of-week.
# Minute and hour are pinned to 0 so that defragmentation runs once (at midnight) on every third
# day of the month, instead of every minute of that day.
defragmentationSchedule: "0 0 */3 * *"

# garbageCollectionPolicy mentions the policy for garbage collecting old backups. Allowed values are Exponential(default), LimitBased.
garbageCollectionPolicy: Exponential
Expand Down
3 changes: 2 additions & 1 deletion cmd/initializer.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package cmd

import (
"context"
"fmt"
"path"

Expand All @@ -29,7 +30,7 @@ import (

// NewInitializeCommand returns the command to initialize etcd by validating the data
// directory and restoring from cloud store if needed.
func NewInitializeCommand(stopCh <-chan struct{}) *cobra.Command {
func NewInitializeCommand(ctx context.Context) *cobra.Command {

// initializeCmd represents the initialize command
initializeCmd := &cobra.Command{
Expand Down
5 changes: 3 additions & 2 deletions cmd/restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package cmd

import (
"context"
"fmt"
"path"

Expand All @@ -27,7 +28,7 @@ import (
)

// NewRestoreCommand returns the command to restore
func NewRestoreCommand(stopCh <-chan struct{}) *cobra.Command {
func NewRestoreCommand(ctx context.Context) *cobra.Command {

// restoreCmd represents the restore command
restoreCmd := &cobra.Command{
Expand Down Expand Up @@ -70,7 +71,7 @@ func NewRestoreCommand(stopCh <-chan struct{}) *cobra.Command {
return
}

rs := restorer.NewRestorer(store, logger)
rs := restorer.NewRestorer(store, logrus.NewEntry(logger))

options := &restorer.RestoreOptions{
RestoreDataDir: path.Clean(restoreDataDir),
Expand Down
12 changes: 7 additions & 5 deletions cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,13 @@
package cmd

import (
"context"

"github.com/spf13/cobra"
)

// NewBackupRestoreCommand represents the base command when called without any subcommands
func NewBackupRestoreCommand(stopCh <-chan struct{}) *cobra.Command {
func NewBackupRestoreCommand(ctx context.Context) *cobra.Command {
var RootCmd = &cobra.Command{
Use: "etcdbrctl",
Short: "command line utility for etcd backup restore",
Expand All @@ -34,9 +36,9 @@ from previously taken snapshot.`,
},
}
RootCmd.Flags().BoolVarP(&version, "version", "v", false, "print version info")
RootCmd.AddCommand(NewSnapshotCommand(stopCh),
NewRestoreCommand(stopCh),
NewInitializeCommand(stopCh),
NewServerCommand(stopCh))
RootCmd.AddCommand(NewSnapshotCommand(ctx),
NewRestoreCommand(ctx),
NewInitializeCommand(ctx),
NewServerCommand(ctx))
return RootCmd
}
56 changes: 30 additions & 26 deletions cmd/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,13 @@ import (
"github.com/gardener/etcd-backup-restore/pkg/snapshot/snapshotter"
"github.com/gardener/etcd-backup-restore/pkg/snapstore"
"github.com/prometheus/client_golang/prometheus"
cron "github.com/robfig/cron/v3"
"github.com/sirupsen/logrus"
"github.com/spf13/cobra"
)

// NewServerCommand creates the cobra command that starts the HTTP server with the backup scheduler.
func NewServerCommand(stopCh <-chan struct{}) *cobra.Command {
func NewServerCommand(ctx context.Context) *cobra.Command {
var serverCmd = &cobra.Command{
Use: "server",
Short: "start the http server with backup scheduler.",
Expand Down Expand Up @@ -106,7 +108,7 @@ func NewServerCommand(stopCh <-chan struct{}) *cobra.Command {
logger.Infof("Created snapstore from provider: %s", storageProvider)

snapshotterConfig, err := snapshotter.NewSnapshotterConfig(
schedule,
fullSnapshotSchedule,
ss,
maxBackups,
deltaSnapshotIntervalSeconds,
Expand All @@ -121,31 +123,42 @@ func NewServerCommand(stopCh <-chan struct{}) *cobra.Command {

logger.Infof("Creating snapshotter...")
ssr = snapshotter.NewSnapshotter(
logger,
logrus.NewEntry(logger),
snapshotterConfig,
)

handler = startHTTPServer(etcdInitializer, ssr)
defer handler.Stop()

ssrStopCh = make(chan struct{})
go handleSsrStopRequest(handler, ssr, ackCh, ssrStopCh, stopCh)
go handleSsrStopRequest(handler, ssr, ackCh, ssrStopCh, ctx.Done())
go handleAckState(handler, ackCh)
startDefragmentationThread(defragmentationPeriodHours, stopCh, tlsConfig, ssr.TriggerFullSnapshot)
runEtcdProbeLoopWithSnapshotter(tlsConfig, handler, ssr, ssrStopCh, stopCh, ackCh)
} else {
// If no storage provider is given, snapshotter will be nil, in which
// case the status is set to OK as soon as etcd probe is successful
handler = startHTTPServer(etcdInitializer, nil)
defer handler.Stop()

// start defragmentation without triggering full snapshot
// after each successful data defragmentation
startDefragmentationThread(defragmentationPeriodHours, stopCh, tlsConfig, func() error {
return nil
})
runEtcdProbeLoopWithoutSnapshotter(tlsConfig, handler, stopCh, ackCh)
defragSchedule, err := cron.ParseStandard(defragmentationSchedule)
if err != nil {
logger.Fatalf("failed to parse defragmentation schedule: %v", err)
return
}
go etcdutil.DefragDataPeriodically(ctx, tlsConfig, defragSchedule, time.Duration(etcdConnectionTimeout)*time.Second, ssr.TriggerFullSnapshot, logrus.NewEntry(logger))

runEtcdProbeLoopWithSnapshotter(tlsConfig, handler, ssr, ssrStopCh, ctx.Done(), ackCh)
return
}
// If no storage provider is given, snapshotter will be nil, in which
// case the status is set to OK as soon as etcd probe is successful
handler = startHTTPServer(etcdInitializer, nil)
defer handler.Stop()

// start defragmentation without triggering full snapshot
// after each successful data defragmentation
defragSchedule, err := cron.ParseStandard(defragmentationSchedule)
if err != nil {
logger.Fatalf("failed to parse defragmentation schedule: %v", err)
return
}
go etcdutil.DefragDataPeriodically(ctx, tlsConfig, defragSchedule, time.Duration(etcdConnectionTimeout)*time.Second, nil, logrus.NewEntry(logger))

runEtcdProbeLoopWithoutSnapshotter(tlsConfig, handler, ctx.Done(), ackCh)
},
}

Expand Down Expand Up @@ -180,15 +193,6 @@ func startHTTPServer(initializer initializer.Initializer, ssr *snapshotter.Snaps
return handler
}

// startDefragmentationThread launches periodic etcd data defragmentation in a
// background goroutine. defragPeriod is the interval in hours; when it is below
// 1 nothing is started and only an informational message is logged.
// stopCh, tlsConfig and triggerFullSnapshotCallback are handed through to
// etcdutil.DefragDataPeriodically unchanged.
func startDefragmentationThread(defragPeriod int, stopCh <-chan struct{}, tlsConfig *etcdutil.TLSConfig, triggerFullSnapshotCallback func() error) {
	if defragPeriod >= 1 {
		period := time.Duration(defragPeriod) * time.Hour
		connTimeout := time.Duration(etcdConnectionTimeout) * time.Second
		go etcdutil.DefragDataPeriodically(stopCh, tlsConfig, period, connTimeout, triggerFullSnapshotCallback)
		return
	}
	logger.Infof("Disabling defragmentation since defragmentation period [%d] is less than 1", defragPeriod)
}

// runEtcdProbeLoopWithSnapshotter runs the etcd probe loop
// for the case where snapshotter is configured correctly
func runEtcdProbeLoopWithSnapshotter(tlsConfig *etcdutil.TLSConfig, handler *server.HTTPHandler, ssr *snapshotter.Snapshotter, ssrStopCh chan struct{}, stopCh <-chan struct{}, ackCh chan struct{}) {
Expand Down
28 changes: 15 additions & 13 deletions cmd/snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,17 +15,20 @@
package cmd

import (
"context"
"path"
"time"

"github.com/gardener/etcd-backup-restore/pkg/etcdutil"
"github.com/gardener/etcd-backup-restore/pkg/snapshot/snapshotter"
"github.com/gardener/etcd-backup-restore/pkg/snapstore"
cron "github.com/robfig/cron/v3"
"github.com/sirupsen/logrus"
"github.com/spf13/cobra"
)

// NewSnapshotCommand creates the cobra command for snapshot
func NewSnapshotCommand(stopCh <-chan struct{}) *cobra.Command {
func NewSnapshotCommand(ctx context.Context) *cobra.Command {
var command = &cobra.Command{
Use: "snapshot",
Short: "takes the snapshot of etcd periodically.",
Expand Down Expand Up @@ -55,7 +58,7 @@ storing snapshots on various cloud storage providers as well as local disk locat
etcdUsername,
etcdPassword)
snapshotterConfig, err := snapshotter.NewSnapshotterConfig(
schedule,
fullSnapshotSchedule,
ss,
maxBackups,
deltaSnapshotIntervalSeconds,
Expand All @@ -68,21 +71,20 @@ storing snapshots on various cloud storage providers as well as local disk locat
logger.Fatalf("failed to create snapstore config: %v", err)
}
ssr := snapshotter.NewSnapshotter(
logger,
logrus.NewEntry(logger),
snapshotterConfig)

if defragmentationPeriodHours < 1 {
logger.Infof("Disabling defragmentation since defragmentation period [%d] is less than 1", defragmentationPeriodHours)
} else {
go etcdutil.DefragDataPeriodically(stopCh, tlsConfig, time.Duration(defragmentationPeriodHours)*time.Hour, time.Duration(etcdConnectionTimeout)*time.Second, ssr.TriggerFullSnapshot)
defragSchedule, err := cron.ParseStandard(defragmentationSchedule)
if err != nil {
logger.Fatalf("failed to parse defragmentation schedule: %v", err)
return
}
go etcdutil.DefragDataPeriodically(ctx, tlsConfig, defragSchedule, time.Duration(etcdConnectionTimeout)*time.Second, ssr.TriggerFullSnapshot, logrus.NewEntry(logger))

gcStopCh := make(chan struct{})
go ssr.RunGarbageCollector(gcStopCh)
if err := ssr.Run(stopCh, true); err != nil {
go ssr.RunGarbageCollector(ctx.Done())
if err := ssr.Run(ctx.Done(), true); err != nil {
logger.Fatalf("Snapshotter failed with error: %v", err)
}
close(gcStopCh)
logger.Info("Shutting down...")
return
},
Expand All @@ -95,7 +97,7 @@ storing snapshots on various cloud storage providers as well as local disk locat
// initializeSnapshotterFlags adds snapshotter related flags to <cmd>
func initializeSnapshotterFlags(cmd *cobra.Command) {
cmd.Flags().StringSliceVarP(&etcdEndpoints, "endpoints", "e", []string{"127.0.0.1:2379"}, "comma separated list of etcd endpoints")
cmd.Flags().StringVarP(&schedule, "schedule", "s", "* */1 * * *", "schedule for snapshots")
cmd.Flags().StringVarP(&fullSnapshotSchedule, "schedule", "s", "* */1 * * *", "schedule for snapshots")
cmd.Flags().IntVarP(&deltaSnapshotIntervalSeconds, "delta-snapshot-period-seconds", "i", snapshotter.DefaultDeltaSnapshotIntervalSeconds, "Period in seconds after which delta snapshot will be persisted. If this value is set to be lesser than 1, delta snapshotting will be disabled.")
cmd.Flags().IntVar(&deltaSnapshotMemoryLimit, "delta-snapshot-memory-limit", snapshotter.DefaultDeltaSnapMemoryLimit, "memory limit after which delta snapshots will be taken")
cmd.Flags().IntVarP(&maxBackups, "max-backups", "m", snapshotter.DefaultMaxBackups, "maximum number of previous backups to keep")
Expand All @@ -109,5 +111,5 @@ func initializeSnapshotterFlags(cmd *cobra.Command) {
cmd.Flags().StringVar(&caFile, "cacert", "", "verify certificates of TLS-enabled secure servers using this CA bundle")
cmd.Flags().StringVar(&etcdUsername, "etcd-username", "", "etcd server username, if one is required")
cmd.Flags().StringVar(&etcdPassword, "etcd-password", "", "etcd server password, if one is required")
cmd.Flags().IntVar(&defragmentationPeriodHours, "defragmentation-period-hours", 72, "period after which we should defragment etcd data directory")
cmd.Flags().StringVar(&defragmentationSchedule, "defragmentation-schedule", "0 0 */3 * *", "schedule to defragment etcd data directory")
}
4 changes: 2 additions & 2 deletions cmd/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ var (
logger = logrus.New()
version bool
//snapshotter flags
schedule string
fullSnapshotSchedule string
etcdEndpoints []string
etcdUsername string
etcdPassword string
Expand All @@ -44,7 +44,7 @@ var (
certFile string
keyFile string
caFile string
defragmentationPeriodHours int
defragmentationSchedule string

//server flags
port int
Expand Down
Loading

0 comments on commit b95f3df

Please sign in to comment.