This repository has been archived by the owner on Mar 28, 2020. It is now read-only.

Commit

backup-operator: Support periodic backup (#1841) (#2028)
* backup-operator: Add BackupIntervalInSecond

This commit adds BackupIntervalInSecond to BackupPolicy, which enables
periodic backups as issue #1841 describes. This commit is part of #1841.
By specifying BackupIntervalInSecond, users can have etcd-backup-operator
perform periodic backups.

The specification of BackupIntervalInSecond is as follows:
 - the unit is seconds
 - >0 sets the backup interval
 - =0 explicitly disables periodic backup; only a one-time backup is
   performed

* backup: Add validation of BackupIntervalInSecond

This commit implements validation of BackupIntervalInSecond.
After this commit, backup-operator ensures that BackupIntervalInSecond
follows these restrictions:

- <0 is not allowed and fails validation
- 0 is valid and disables periodic backup
- >0 is valid and sets the interval
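The validation rules above can be sketched as a small Go function. This is an illustrative sketch, not the operator's actual validation code; the `BackupPolicy` struct here is a simplified stand-in for the real type in `pkg/apis/etcd/v1beta2/backup_types.go`.

```go
package main

import "errors"

// BackupPolicy is a simplified stand-in for the type this commit extends.
type BackupPolicy struct {
	BackupIntervalInSecond int64
	MaxBackups             int
}

// Validate enforces the rules from the commit message: negative values
// fail validation, zero disables the periodic behaviour, and positive
// values are taken as-is.
func (bp *BackupPolicy) Validate() error {
	if bp.BackupIntervalInSecond < 0 {
		return errors.New("backupIntervalInSecond must be >= 0")
	}
	if bp.MaxBackups < 0 {
		return errors.New("maxBackups must be >= 0")
	}
	return nil
}
```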

* backup: Add LastSuccessDate

The current backup status is designed only for one-shot snapshots.
It always shows the latest result, but it is useful to also record
the last time a snapshot was successfully taken.

* backup: Add MaxBackups in BackupPolicy

This commit adds a MaxBackups attribute, which lets backup-operator
delete older snapshots when the number of snapshots exceeds MaxBackups.

The specification of MaxBackups is as follows:

 - <0 is not allowed and causes validation failure
 - =0 indicates that MaxBackups is infinite; the operator does not
   delete any existing snapshots
 - >0 indicates the maximum number of snapshots to keep
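The pruning step can be illustrated with a small helper that mirrors what `EnsureMaxBackup` in `pkg/backup/backup_manager.go` does: sort the snapshot paths in reverse lexicographic order (newest first, given the timestamped path suffix) and drop everything past the newest `maxBackups`. `pruneSnapshots` is a hypothetical name for illustration only.

```go
package main

import "sort"

// pruneSnapshots returns the snapshot paths that should be deleted so
// that at most maxBackups of the newest snapshots remain.
// maxBackups <= 0 means "keep everything", matching the =0 rule above.
// It relies on the "_v<rev>_<date>" suffix sorting lexicographically
// in roughly chronological order, as EnsureMaxBackup does.
func pruneSnapshots(paths []string, maxBackups int) []string {
	if maxBackups <= 0 {
		return nil
	}
	sorted := append([]string(nil), paths...)
	sort.Sort(sort.Reverse(sort.StringSlice(sorted))) // newest first
	if len(sorted) <= maxBackups {
		return nil
	}
	return sorted[maxBackups:] // everything older than the newest maxBackups
}
```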

* backup: Use path without adding info if one-shot

With periodic backup support, backup-operator appends a revision number
and date to the S3 path, as in <bucket name>/<object name>_v<rev>_<date>.
This behaviour was applied even when the backup was a one-shot backup,
therefore the change broke existing behaviour.

This commit brings back the original behaviour of using the S3 path
without any suffix (<bucket name>/<object name>) when the backup is not
periodic.

* backup: Reset reason if backup succeeded

* backup: Update the codegen files

* backup: fix typo

* backup: Refactoring

* backup: Use meta/v1.Time instead of time.Time

After generating the code based on the k8s object
(zz_generated.deepcopy.go), the build failed.
This is because every field of a k8s custom resource must implement
DeepCopyInto, but the time.Time field we added does not implement it.
We should instead use meta/v1.Time, which implements all the functions
required for k8s objects while providing the same functionality as
time.Time.

This commit also includes some refactoring pointed out in code review.

* backup: minor fixes (naming, extra space)

* backup: Add periodic backup example

* backup: add e2e slow test for periodic backup

* Update generated k8s code license for new year(2019)

* backup: Minor fix for "composite literal uses unkeyed fields"

* backup: Update CHANGELOG.md for periodic support

* backup: Move periodbackup test after restore test

The restore test expects the backup file to be present, but the
periodic backup test cleans up the backup files it creates, so the
restore test failed because of that.

That is why the periodic test was moved after the restore test.

* backup: Fix a bug that put the operator into an infinite loop

* backup: Only Retry in the case of transient error
ukinau authored and hasbro17 committed Feb 18, 2019
1 parent aeb3e3e commit 4aea722
Showing 19 changed files with 531 additions and 30 deletions.
7 changes: 7 additions & 0 deletions CHANGELOG.md
@@ -7,6 +7,13 @@

### Changed

- EtcdBackup: Support periodic backup. This change adds 3 new fields to the EtcdBackup schema: 2 in the spec and 1 in the status.
  - in spec.backupPolicy
    - maxBackups, which indicates the maximum number of backups to keep
    - backupIntervalInSecond, which indicates how often to perform the backup operation
  - in status
    - LastSuccessDate, which indicates the last time a backup was successfully taken

### Removed

### Fixed
10 changes: 10 additions & 0 deletions Gopkg.lock


16 changes: 16 additions & 0 deletions example/etcd-backup-operator/periodic_backup_cr.yaml
@@ -0,0 +1,16 @@
apiVersion: "etcd.database.coreos.com/v1beta2"
kind: "EtcdBackup"
metadata:
name: example-etcd-cluster-periodic-backup
spec:
etcdEndpoints: [<etcd-cluster-endpoints>]
storageType: S3
backupPolicy:
# >0 enables periodic backup
backupIntervalInSecond: 125
maxBackups: 4
s3:
# The format of "path" must be: "<s3-bucket-name>/<path-to-backup-file>"
# e.g: "mybucket/etcd.backup"
path: <full-s3-path>
awsSecret: <aws-secret>
12 changes: 11 additions & 1 deletion pkg/apis/etcd/v1beta2/backup_types.go
@@ -14,7 +14,9 @@

package v1beta2

-import metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+import (
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+)

const (
// AWS S3 related consts
@@ -94,6 +96,12 @@ type BackupSource struct {
type BackupPolicy struct {
// TimeoutInSecond is the maximal allowed time in second of the entire backup process.
TimeoutInSecond int64 `json:"timeoutInSecond,omitempty"`
// BackupIntervalInSecond specifies how often the operator takes a snapshot.
// 0 is a magic number indicating a one-shot backup.
BackupIntervalInSecond int64 `json:"backupIntervalInSecond,omitempty"`
// MaxBackups specifies how many backups to keep.
// 0 is a magic number indicating unlimited backups.
MaxBackups int `json:"maxBackups,omitempty"`
}

// BackupStatus represents the status of the EtcdBackup Custom Resource.
@@ -106,6 +114,8 @@ type BackupStatus struct {
EtcdVersion string `json:"etcdVersion,omitempty"`
// EtcdRevision is the revision of etcd's KV store where the backup is performed on.
EtcdRevision int64 `json:"etcdRevision,omitempty"`
// LastSuccessDate is the time of the last successful snapshot.
LastSuccessDate metav1.Time `json:"lastSuccessDate,omitempty"`
}

// S3BackupSource provides the spec how to store backups on S3.
3 changes: 2 additions & 1 deletion pkg/apis/etcd/v1beta2/zz_generated.deepcopy.go


40 changes: 33 additions & 7 deletions pkg/backup/backup_manager.go
@@ -18,12 +18,15 @@ import (
"context"
"crypto/tls"
"fmt"
"sort"
"time"

"github.com/coreos/etcd-operator/pkg/backup/writer"
"github.com/coreos/etcd-operator/pkg/util/constants"

"github.com/coreos/etcd/clientv3"
"github.com/sirupsen/logrus"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
)

@@ -51,29 +54,52 @@ func NewBackupManagerFromWriter(kubecli kubernetes.Interface, bw writer.Writer,

// SaveSnap uses backup writer to save etcd snapshot to a specified S3 path
// and returns backup etcd server's kv store revision and its version.
-func (bm *BackupManager) SaveSnap(ctx context.Context, s3Path string) (int64, string, error) {
+func (bm *BackupManager) SaveSnap(ctx context.Context, s3Path string, isPeriodic bool) (int64, string, *metav1.Time, error) {
now := time.Now().UTC()
etcdcli, rev, err := bm.etcdClientWithMaxRevision(ctx)
if err != nil {
-return 0, "", fmt.Errorf("create etcd client failed: %v", err)
+return 0, "", nil, fmt.Errorf("create etcd client failed: %v", err)
}
defer etcdcli.Close()

resp, err := etcdcli.Status(ctx, etcdcli.Endpoints()[0])
if err != nil {
-return 0, "", fmt.Errorf("failed to retrieve etcd version from the status call: %v", err)
+return 0, "", nil, fmt.Errorf("failed to retrieve etcd version from the status call: %v", err)
}

rc, err := etcdcli.Snapshot(ctx)
if err != nil {
-return 0, "", fmt.Errorf("failed to receive snapshot (%v)", err)
+return 0, "", nil, fmt.Errorf("failed to receive snapshot (%v)", err)
}
defer rc.Close()

if isPeriodic {
s3Path = fmt.Sprintf(s3Path+"_v%d_%s", rev, now.Format("2006-01-02-15:04:05"))
}
_, err = bm.bw.Write(ctx, s3Path, rc)
if err != nil {
-return 0, "", fmt.Errorf("failed to write snapshot (%v)", err)
+return 0, "", nil, fmt.Errorf("failed to write snapshot (%v)", err)
}
return rev, resp.Version, &metav1.Time{Time: now}, nil
}

// EnsureMaxBackup ensures the number of snapshots is at most maxCount.
// If the number of snapshots exceeds maxCount, the oldest snapshots are deleted.
func (bm *BackupManager) EnsureMaxBackup(ctx context.Context, basePath string, maxCount int) error {
savedSnapShots, err := bm.bw.List(ctx, basePath)
if err != nil {
return fmt.Errorf("failed to get existing snapshots: %v", err)
}
sort.Sort(sort.Reverse(sort.StringSlice(savedSnapShots)))
for i, snapshotPath := range savedSnapShots {
if i < maxCount {
continue
}
err := bm.bw.Delete(ctx, snapshotPath)
if err != nil {
return fmt.Errorf("failed to delete snapshot: %v", err)
}
}
-return rev, resp.Version, nil
+return nil
}

// etcdClientWithMaxRevision gets the etcd endpoint with the maximum kv store revision
47 changes: 47 additions & 0 deletions pkg/backup/writer/abs_writer.go
@@ -100,3 +100,50 @@ func (absw *absWriter) Write(ctx context.Context, path string, r io.Reader) (int

return blob.Properties.ContentLength, nil
}

func (absw *absWriter) List(ctx context.Context, basePath string) ([]string, error) {
// TODO: support context.
container, _, err := util.ParseBucketAndKey(basePath)
if err != nil {
return nil, err
}

containerRef := absw.abs.GetContainerReference(container)
containerExists, err := containerRef.Exists()
if err != nil {
return nil, err
}
if !containerExists {
return nil, fmt.Errorf("container %v does not exist", container)
}

blobs, err := containerRef.ListBlobs(
storage.ListBlobsParameters{Prefix: basePath})
if err != nil {
return nil, err
}
blobKeys := []string{}
for _, blob := range blobs.Blobs {
blobKeys = append(blobKeys, container+"/"+blob.Name)
}
return blobKeys, nil
}

func (absw *absWriter) Delete(ctx context.Context, path string) error {
// TODO: support context.
container, key, err := util.ParseBucketAndKey(path)
if err != nil {
return err
}
containerRef := absw.abs.GetContainerReference(container)
containerExists, err := containerRef.Exists()
if err != nil {
return err
}
if !containerExists {
return fmt.Errorf("container %v does not exist", container)
}

blob := containerRef.GetBlobReference(key)
return blob.Delete(&storage.DeleteBlobOptions{})
}
35 changes: 35 additions & 0 deletions pkg/backup/writer/gcs_writer.go
@@ -23,6 +23,7 @@ import (

"cloud.google.com/go/storage"
"github.com/sirupsen/logrus"
"google.golang.org/api/iterator"
)

var _ Writer = &gcsWriter{}
@@ -58,3 +59,37 @@ func (gcsw *gcsWriter) Write(ctx context.Context, path string, r io.Reader) (int
}
return n, err
}

func (gcsw *gcsWriter) List(ctx context.Context, basePath string) ([]string, error) {
bucket, key, err := util.ParseBucketAndKey(basePath)
if err != nil {
return nil, err
}
objects := gcsw.gcs.Bucket(bucket).Objects(ctx, &storage.Query{Prefix: key})
if objects == nil {
return nil, fmt.Errorf("failed to get objects having %s prefix", key)
}

objectKeys := []string{}

for {
objAttrs, err := objects.Next()
if err == iterator.Done {
break
}
if err != nil {
return nil, err
}
objectKeys = append(objectKeys, bucket+"/"+objAttrs.Name)
}
return objectKeys, nil
}

func (gcsw *gcsWriter) Delete(ctx context.Context, path string) error {
bucket, key, err := util.ParseBucketAndKey(path)
if err != nil {
return err
}

return gcsw.gcs.Bucket(bucket).Object(key).Delete(ctx)
}
36 changes: 36 additions & 0 deletions pkg/backup/writer/s3_writer.go
@@ -67,3 +67,39 @@ func (s3w *s3Writer) Write(ctx context.Context, path string, r io.Reader) (int64
}
return *resp.ContentLength, nil
}

// List returns the file paths that match the given S3 path prefix
func (s3w *s3Writer) List(ctx context.Context, basePath string) ([]string, error) {
bk, key, err := util.ParseBucketAndKey(basePath)
if err != nil {
return nil, err
}

objects, err := s3w.s3.ListObjectsWithContext(ctx,
&s3.ListObjectsInput{
Bucket: aws.String(bk),
Prefix: aws.String(key),
})
if err != nil {
return nil, err
}
objectKeys := []string{}
for _, object := range objects.Contents {
objectKeys = append(objectKeys, bk+"/"+*object.Key)
}
return objectKeys, nil
}

func (s3w *s3Writer) Delete(ctx context.Context, path string) error {
bk, key, err := util.ParseBucketAndKey(path)
if err != nil {
return err
}

_, err = s3w.s3.DeleteObjectWithContext(ctx,
&s3.DeleteObjectInput{
Bucket: aws.String(bk),
Key: aws.String(key),
})
return err
}
6 changes: 6 additions & 0 deletions pkg/backup/writer/writer.go
@@ -23,4 +23,10 @@ import (
type Writer interface {
// Write writes a backup file to the given path and returns size of written file.
Write(ctx context.Context, path string, r io.Reader) (int64, error)

// List backup files
List(ctx context.Context, basePath string) ([]string, error)

// Delete a backup file
Delete(ctx context.Context, path string) error
}
14 changes: 10 additions & 4 deletions pkg/controller/backup-operator/abs_backup.go
@@ -28,7 +28,8 @@ import (
)

// handleABS saves etcd cluster's backup to the specified ABS path.
-func handleABS(ctx context.Context, kubecli kubernetes.Interface, s *api.ABSBackupSource, endpoints []string, clientTLSSecret, namespace string) (*api.BackupStatus, error) {
+func handleABS(ctx context.Context, kubecli kubernetes.Interface, s *api.ABSBackupSource, endpoints []string, clientTLSSecret,
+namespace string, isPeriodic bool, maxBackup int) (*api.BackupStatus, error) {
// TODO: controls NewClientFromSecret with ctx. This depends on upstream kubernetes to support API calls with ctx.
cli, err := absfactory.NewClientFromSecret(kubecli, namespace, s.ABSSecret)
if err != nil {
@@ -39,12 +40,17 @@ func handleABS(ctx context.Context, kubecli kubernetes.Interface, s *api.ABSBack
if tlsConfig, err = generateTLSConfig(kubecli, clientTLSSecret, namespace); err != nil {
return nil, err
}

bm := backup.NewBackupManagerFromWriter(kubecli, writer.NewABSWriter(cli.ABS), tlsConfig, endpoints, namespace)

-rev, etcdVersion, err := bm.SaveSnap(ctx, s.Path)
+rev, etcdVersion, now, err := bm.SaveSnap(ctx, s.Path, isPeriodic)
if err != nil {
return nil, fmt.Errorf("failed to save snapshot (%v)", err)
}
-return &api.BackupStatus{EtcdVersion: etcdVersion, EtcdRevision: rev}, nil
+if maxBackup > 0 {
+err := bm.EnsureMaxBackup(ctx, s.Path, maxBackup)
+if err != nil {
+return nil, fmt.Errorf("succeeded in saving snapshot but failed to delete old snapshot (%v)", err)
+}
+}
+return &api.BackupStatus{EtcdVersion: etcdVersion, EtcdRevision: rev, LastSuccessDate: *now}, nil
}
14 changes: 10 additions & 4 deletions pkg/controller/backup-operator/gcs_backup.go
@@ -28,7 +28,8 @@ import (
)

// handleGCS saves etcd cluster's backup to the specified GCS path.
-func handleGCS(ctx context.Context, kubecli kubernetes.Interface, s *api.GCSBackupSource, endpoints []string, clientTLSSecret, namespace string) (*api.BackupStatus, error) {
+func handleGCS(ctx context.Context, kubecli kubernetes.Interface, s *api.GCSBackupSource, endpoints []string, clientTLSSecret,
+namespace string, isPeriodic bool, maxBackup int) (*api.BackupStatus, error) {
// TODO: controls NewClientFromSecret with ctx. This depends on upstream kubernetes to support API calls with ctx.
cli, err := gcsfactory.NewClientFromSecret(ctx, kubecli, namespace, s.GCPSecret)
if err != nil {
@@ -40,12 +41,17 @@ func handleGCS(ctx context.Context, kubecli kubernetes.Interface, s *api.GCSBack
if tlsConfig, err = generateTLSConfig(kubecli, clientTLSSecret, namespace); err != nil {
return nil, err
}

bm := backup.NewBackupManagerFromWriter(kubecli, writer.NewGCSWriter(cli.GCS), tlsConfig, endpoints, namespace)

-rev, etcdVersion, err := bm.SaveSnap(ctx, s.Path)
+rev, etcdVersion, now, err := bm.SaveSnap(ctx, s.Path, isPeriodic)
if err != nil {
return nil, fmt.Errorf("failed to save snapshot (%v)", err)
}
-return &api.BackupStatus{EtcdVersion: etcdVersion, EtcdRevision: rev}, nil
+if maxBackup > 0 {
+err := bm.EnsureMaxBackup(ctx, s.Path, maxBackup)
+if err != nil {
+return nil, fmt.Errorf("succeeded in saving snapshot but failed to delete old snapshot (%v)", err)
+}
+}
+return &api.BackupStatus{EtcdVersion: etcdVersion, EtcdRevision: rev, LastSuccessDate: *now}, nil
}
