Skip to content

Commit

Permalink
Change garbage-collection-period-seconds flag to
Browse files Browse the repository at this point in the history
garbage-collection-period

Signed-off-by: Swapnil Mhamane <[email protected]>
  • Loading branch information
Swapnil Mhamane committed Sep 12, 2019
1 parent a22922b commit cd7ddc0
Show file tree
Hide file tree
Showing 22 changed files with 371 additions and 380 deletions.
2 changes: 1 addition & 1 deletion chart/etcd-backup-restore/templates/etcd-statefulset.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ spec:
- --defragmentation-schedule={{ .Values.backup.defragmentationSchedule }}
{{- end }}
- --etcd-connection-timeout={{ .Values.backup.etcdConnectionTimeout }}
- --delta-snapshot-period-seconds={{ int $.Values.backup.deltaSnapshotPeriodSeconds }}
- --delta-snapshot-period={{ int $.Values.backup.deltaSnapshotPeriod }}
- --delta-snapshot-memory-limit={{ int $.Values.backup.deltaSnapshotMemoryLimit }}
{{- if and .Values.etcdAuth.username .Values.etcdAuth.password }}
- --etcd-username={{ .Values.etcdAuth.username }}
Expand Down
6 changes: 3 additions & 3 deletions chart/etcd-backup-restore/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,8 @@ backup:
# schedule is cron standard schedule to take full snapshots.
schedule: "0 */1 * * *"

# deltaSnapshotPeriodSeconds is Period in seconds after which delta snapshot will be persisted. If this value is set to be lesser than 1, delta snapshotting will be disabled.
deltaSnapshotPeriodSeconds: 60
# deltaSnapshotPeriod is Period after which delta snapshot will be persisted. If this value is set to be lesser than 1 second, delta snapshotting will be disabled.
deltaSnapshotPeriod: "60s"
# deltaSnapshotMemoryLimit is memory limit in bytes after which delta snapshots will be taken out of schedule.
deltaSnapshotMemoryLimit: 104857600 #100MB

Expand All @@ -53,7 +53,7 @@ backup:
# maxBackups is the maximum number of backups to keep (may change in future). This is interpreted in case of garbageCollectionPolicy set to LimitBased.
maxBackups: 7

etcdConnectionTimeout: 300
etcdConnectionTimeout: "30s"
# etcdQuotaBytes used to Raise alarms when backend DB size exceeds the given quota bytes
etcdQuotaBytes: 8589934592 #8GB

Expand Down
181 changes: 91 additions & 90 deletions cmd/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,16 +44,10 @@ func NewServerCommand(ctx context.Context) *cobra.Command {
Short: "start the http server with backup scheduler.",
Long: `Server will keep listening for http request to deliver its functionality through http endpoints.`,
Run: func(cmd *cobra.Command, args []string) {
// TODO: Refactor the code to move it to coordination logic to server package.
printVersionInfo()
var (
snapstoreConfig *snapstore.Config
ssrStopCh chan struct{}
ackCh chan struct{}
ssr *snapshotter.Snapshotter
handler *server.HTTPHandler
snapshotterEnabled bool
)
ackCh = make(chan struct{})

// TODO: move this to validate config
clusterUrlsMap, err := types.NewURLsMap(restoreCluster)
if err != nil {
logger.Fatalf("failed creating url map for restore cluster: %v", err)
Expand All @@ -63,6 +57,12 @@ func NewServerCommand(ctx context.Context) *cobra.Command {
logger.Fatalf("failed parsing peers urls for restore cluster: %v", err)
}

defragSchedule, err := cron.ParseStandard(defragmentationSchedule)
if err != nil {
logger.Fatalf("failed to parse defragmentation schedule: %v", err)
}

// TODO: move this to build/complete config
options := &restorer.RestoreOptions{
RestoreDataDir: path.Clean(restoreDataDir),
Name: restoreName,
Expand All @@ -74,22 +74,6 @@ func NewServerCommand(ctx context.Context) *cobra.Command {
EmbeddedEtcdQuotaBytes: embeddedEtcdQuotaBytes,
}

if storageProvider == "" {
snapshotterEnabled = false
logger.Warnf("No snapstore storage provider configured. Will not start backup schedule.")
} else {
snapshotterEnabled = true
snapstoreConfig = &snapstore.Config{
Provider: storageProvider,
Container: storageContainer,
Prefix: path.Join(storagePrefix, backupFormatVersion),
MaxParallelChunkUploads: maxParallelChunkUploads,
TempDir: snapstoreTempDir,
}
}

etcdInitializer := initializer.NewInitializer(options, snapstoreConfig, logger)

tlsConfig := etcdutil.NewTLSConfig(
certFile,
keyFile,
Expand All @@ -100,65 +84,12 @@ func NewServerCommand(ctx context.Context) *cobra.Command {
etcdUsername,
etcdPassword)

if snapshotterEnabled {
ss, err := snapstore.GetSnapstore(snapstoreConfig)
if err != nil {
logger.Fatalf("Failed to create snapstore from configured storage provider: %v", err)
}
logger.Infof("Created snapstore from provider: %s", storageProvider)

snapshotterConfig, err := snapshotter.NewSnapshotterConfig(
fullSnapshotSchedule,
ss,
maxBackups,
deltaSnapshotIntervalSeconds,
deltaSnapshotMemoryLimit,
time.Duration(etcdConnectionTimeout),
time.Duration(garbageCollectionPeriodSeconds),
garbageCollectionPolicy,
tlsConfig)
if err != nil {
logger.Fatalf("failed to create snapshotter config: %v", err)
}

logger.Infof("Creating snapshotter...")
ssr = snapshotter.NewSnapshotter(
logrus.NewEntry(logger),
snapshotterConfig,
)

handler = startHTTPServer(etcdInitializer, ssr)
defer handler.Stop()

ssrStopCh = make(chan struct{})
go handleSsrStopRequest(handler, ssr, ackCh, ssrStopCh, ctx.Done())
go handleAckState(handler, ackCh)

defragSchedule, err := cron.ParseStandard(defragmentationSchedule)
if err != nil {
logger.Fatalf("failed to parse defragmentation schedule: %v", err)
return
}
go etcdutil.DefragDataPeriodically(ctx, tlsConfig, defragSchedule, time.Duration(etcdConnectionTimeout)*time.Second, ssr.TriggerFullSnapshot, logrus.NewEntry(logger))

runEtcdProbeLoopWithSnapshotter(tlsConfig, handler, ssr, ssrStopCh, ctx.Done(), ackCh)
return
}
// If no storage provider is given, snapshotter will be nil, in which
// case the status is set to OK as soon as etcd probe is successful
handler = startHTTPServer(etcdInitializer, nil)
defer handler.Stop()

// start defragmentation without trigerring full snapshot
// after each successful data defragmentation
defragSchedule, err := cron.ParseStandard(defragmentationSchedule)
if err != nil {
logger.Fatalf("failed to parse defragmentation schedule: %v", err)
if storageProvider == "" {
logger.Warnf("No snapstore storage provider configured. Will not start backup schedule.")
runServerWithoutSnapshotter(ctx, tlsConfig, options, defragSchedule)
return
}
go etcdutil.DefragDataPeriodically(ctx, tlsConfig, defragSchedule, time.Duration(etcdConnectionTimeout)*time.Second, nil, logrus.NewEntry(logger))

runEtcdProbeLoopWithoutSnapshotter(tlsConfig, handler, ctx.Done(), ackCh)
runServerWithSnapshotter(ctx, tlsConfig, options, defragSchedule)
},
}

Expand Down Expand Up @@ -193,9 +124,62 @@ func startHTTPServer(initializer initializer.Initializer, ssr *snapshotter.Snaps
return handler
}

// runServerWithoutSnapshotter runs the etcd-backup-restore
// for the case where snapshotter is configured correctly
func runServerWithSnapshotter(ctx context.Context, tlsConfig *etcdutil.TLSConfig, restoreOpts *restorer.RestoreOptions, defragSchedule cron.Schedule) {
ackCh := make(chan struct{})
snapstoreConfig := &snapstore.Config{
Provider: storageProvider,
Container: storageContainer,
Prefix: path.Join(storagePrefix, backupFormatVersion),
MaxParallelChunkUploads: maxParallelChunkUploads,
TempDir: snapstoreTempDir,
}

etcdInitializer := initializer.NewInitializer(restoreOpts, snapstoreConfig, logger)

ss, err := snapstore.GetSnapstore(snapstoreConfig)
if err != nil {
logger.Fatalf("Failed to create snapstore from configured storage provider: %v", err)
}
logger.Infof("Created snapstore from provider: %s", storageProvider)

snapshotterConfig, err := snapshotter.NewSnapshotterConfig(
fullSnapshotSchedule,
ss,
maxBackups,
deltaSnapshotMemoryLimit,
deltaSnapshotInterval,
etcdConnectionTimeout,
garbageCollectionPeriod,
garbageCollectionPolicy,
tlsConfig)
if err != nil {
logger.Fatalf("failed to create snapshotter config: %v", err)
}

logger.Infof("Creating snapshotter...")
ssr := snapshotter.NewSnapshotter(
logrus.NewEntry(logger),
snapshotterConfig,
)

handler := startHTTPServer(etcdInitializer, ssr)
defer handler.Stop()

ssrStopCh := make(chan struct{})
go handleSsrStopRequest(ctx, handler, ssr, ackCh, ssrStopCh)
go handleAckState(handler, ackCh)

go etcdutil.DefragDataPeriodically(ctx, tlsConfig, defragSchedule, etcdConnectionTimeout, ssr.TriggerFullSnapshot, logrus.NewEntry(logger))

runEtcdProbeLoopWithSnapshotter(ctx, tlsConfig, handler, ssr, ssrStopCh, ackCh)
return
}

// runEtcdProbeLoopWithoutSnapshotter runs the etcd probe loop
// for the case where snapshotter is configured correctly
func runEtcdProbeLoopWithSnapshotter(tlsConfig *etcdutil.TLSConfig, handler *server.HTTPHandler, ssr *snapshotter.Snapshotter, ssrStopCh chan struct{}, stopCh <-chan struct{}, ackCh chan struct{}) {
func runEtcdProbeLoopWithSnapshotter(ctx context.Context, tlsConfig *etcdutil.TLSConfig, handler *server.HTTPHandler, ssr *snapshotter.Snapshotter, ssrStopCh chan struct{}, ackCh chan struct{}) {
var (
err error
initialDeltaSnapshotTaken bool
Expand All @@ -204,7 +188,7 @@ func runEtcdProbeLoopWithSnapshotter(tlsConfig *etcdutil.TLSConfig, handler *ser
for {
logger.Infof("Probing etcd...")
select {
case <-stopCh:
case <-ctx.Done():
logger.Info("Shutting down...")
return
default:
Expand Down Expand Up @@ -284,14 +268,31 @@ func runEtcdProbeLoopWithSnapshotter(tlsConfig *etcdutil.TLSConfig, handler *ser
}
}

// runServerWithoutSnapshotter runs the etcd-backup-restore
// for the case where snapshotter is not configured
func runServerWithoutSnapshotter(ctx context.Context, tlsConfig *etcdutil.TLSConfig, restoreOpts *restorer.RestoreOptions, defragSchedule cron.Schedule) {
etcdInitializer := initializer.NewInitializer(restoreOpts, nil, logger)

// If no storage provider is given, snapshotter will be nil, in which
// case the status is set to OK as soon as etcd probe is successful
handler := startHTTPServer(etcdInitializer, nil)
defer handler.Stop()

// start defragmentation without trigerring full snapshot
// after each successful data defragmentation
go etcdutil.DefragDataPeriodically(ctx, tlsConfig, defragSchedule, etcdConnectionTimeout, nil, logrus.NewEntry(logger))

runEtcdProbeLoopWithoutSnapshotter(ctx, tlsConfig, handler)
}

// runEtcdProbeLoopWithoutSnapshotter runs the etcd probe loop
// for the case where snapshotter is not configured
func runEtcdProbeLoopWithoutSnapshotter(tlsConfig *etcdutil.TLSConfig, handler *server.HTTPHandler, stopCh <-chan struct{}, ackCh chan struct{}) {
func runEtcdProbeLoopWithoutSnapshotter(ctx context.Context, tlsConfig *etcdutil.TLSConfig, handler *server.HTTPHandler) {
var err error
for {
logger.Infof("Probing etcd...")
select {
case <-stopCh:
case <-ctx.Done():
logger.Info("Shutting down...")
return
default:
Expand All @@ -304,7 +305,7 @@ func runEtcdProbeLoopWithoutSnapshotter(tlsConfig *etcdutil.TLSConfig, handler *
}

handler.SetStatus(http.StatusOK)
<-stopCh
<-ctx.Done()
handler.SetStatus(http.StatusServiceUnavailable)
logger.Infof("Received stop signal. Terminating !!")
return
Expand All @@ -327,7 +328,7 @@ func ProbeEtcd(tlsConfig *etcdutil.TLSConfig) error {
}
}

ctx, cancel := context.WithTimeout(context.TODO(), time.Duration(etcdConnectionTimeout)*time.Second)
ctx, cancel := context.WithTimeout(context.TODO(), etcdConnectionTimeout)
defer cancel()
if _, err := client.Get(ctx, "foo"); err != nil {
logger.Errorf("Failed to connect to client: %v", err)
Expand All @@ -346,12 +347,12 @@ func handleAckState(handler *server.HTTPHandler, ackCh chan struct{}) {
}

// handleSsrStopRequest responds to handlers request and stop interrupt.
func handleSsrStopRequest(handler *server.HTTPHandler, ssr *snapshotter.Snapshotter, ackCh, ssrStopCh chan struct{}, stopCh <-chan struct{}) {
func handleSsrStopRequest(ctx context.Context, handler *server.HTTPHandler, ssr *snapshotter.Snapshotter, ackCh, ssrStopCh chan struct{}) {
for {
var ok bool
select {
case _, ok = <-handler.ReqCh:
case _, ok = <-stopCh:
case _, ok = <-ctx.Done():
}

ssr.SsrStateMutex.Lock()
Expand Down
14 changes: 7 additions & 7 deletions cmd/snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,10 +61,10 @@ storing snapshots on various cloud storage providers as well as local disk locat
fullSnapshotSchedule,
ss,
maxBackups,
deltaSnapshotIntervalSeconds,
deltaSnapshotMemoryLimit,
time.Duration(etcdConnectionTimeout),
time.Duration(garbageCollectionPeriodSeconds),
deltaSnapshotInterval,
etcdConnectionTimeout,
garbageCollectionPeriod,
garbageCollectionPolicy,
tlsConfig)
if err != nil {
Expand All @@ -79,7 +79,7 @@ storing snapshots on various cloud storage providers as well as local disk locat
logger.Fatalf("failed to parse defragmentation schedule: %v", err)
return
}
go etcdutil.DefragDataPeriodically(ctx, tlsConfig, defragSchedule, time.Duration(etcdConnectionTimeout)*time.Second, ssr.TriggerFullSnapshot, logrus.NewEntry(logger))
go etcdutil.DefragDataPeriodically(ctx, tlsConfig, defragSchedule, etcdConnectionTimeout, ssr.TriggerFullSnapshot, logrus.NewEntry(logger))

go ssr.RunGarbageCollector(ctx.Done())
if err := ssr.Run(ctx.Done(), true); err != nil {
Expand All @@ -98,11 +98,11 @@ storing snapshots on various cloud storage providers as well as local disk locat
func initializeSnapshotterFlags(cmd *cobra.Command) {
cmd.Flags().StringSliceVarP(&etcdEndpoints, "endpoints", "e", []string{"127.0.0.1:2379"}, "comma separated list of etcd endpoints")
cmd.Flags().StringVarP(&fullSnapshotSchedule, "schedule", "s", "* */1 * * *", "schedule for snapshots")
cmd.Flags().IntVarP(&deltaSnapshotIntervalSeconds, "delta-snapshot-period-seconds", "i", snapshotter.DefaultDeltaSnapshotIntervalSeconds, "Period in seconds after which delta snapshot will be persisted. If this value is set to be lesser than 1, delta snapshotting will be disabled.")
cmd.Flags().DurationVarP(&deltaSnapshotInterval, "delta-snapshot-period", "i", snapshotter.DefaultDeltaSnapshotInterval, "Period after which delta snapshot will be persisted. If this value is set to be lesser than 1 seconds, delta snapshotting will be disabled.")
cmd.Flags().IntVar(&deltaSnapshotMemoryLimit, "delta-snapshot-memory-limit", snapshotter.DefaultDeltaSnapMemoryLimit, "memory limit after which delta snapshots will be taken")
cmd.Flags().IntVarP(&maxBackups, "max-backups", "m", snapshotter.DefaultMaxBackups, "maximum number of previous backups to keep")
cmd.Flags().IntVar(&etcdConnectionTimeout, "etcd-connection-timeout", 30, "etcd client connection timeout")
cmd.Flags().IntVar(&garbageCollectionPeriodSeconds, "garbage-collection-period-seconds", 60, "Period in seconds for garbage collecting old backups")
cmd.Flags().DurationVar(&etcdConnectionTimeout, "etcd-connection-timeout", 30*time.Second, "etcd client connection timeout")
cmd.Flags().DurationVar(&garbageCollectionPeriod, "garbage-collection-period", 60*time.Second, "Period for garbage collecting old backups")
cmd.Flags().StringVar(&garbageCollectionPolicy, "garbage-collection-policy", snapshotter.GarbageCollectionPolicyExponential, "Policy for garbage collecting old backups")
cmd.Flags().BoolVar(&insecureTransport, "insecure-transport", true, "disable transport security for client connections")
cmd.Flags().BoolVar(&insecureSkipVerify, "insecure-skip-tls-verify", false, "skip server certificate verification")
Expand Down
34 changes: 18 additions & 16 deletions cmd/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
package cmd

import (
"time"

"github.com/sirupsen/logrus"
)

Expand All @@ -29,22 +31,22 @@ var (
logger = logrus.New()
version bool
//snapshotter flags
fullSnapshotSchedule string
etcdEndpoints []string
etcdUsername string
etcdPassword string
deltaSnapshotIntervalSeconds int
deltaSnapshotMemoryLimit int
maxBackups int
etcdConnectionTimeout int
garbageCollectionPeriodSeconds int
garbageCollectionPolicy string
insecureTransport bool
insecureSkipVerify bool
certFile string
keyFile string
caFile string
defragmentationSchedule string
fullSnapshotSchedule string
etcdEndpoints []string
etcdUsername string
etcdPassword string
deltaSnapshotMemoryLimit int
deltaSnapshotInterval time.Duration
etcdConnectionTimeout time.Duration
garbageCollectionPeriod time.Duration
garbageCollectionPolicy string
maxBackups int
insecureTransport bool
insecureSkipVerify bool
certFile string
keyFile string
caFile string
defragmentationSchedule string

//server flags
port int
Expand Down
Loading

0 comments on commit cd7ddc0

Please sign in to comment.