From 4aea72218031b976866f3303fe614f6714a7cf98 Mon Sep 17 00:00:00 2001 From: uckey Date: Tue, 19 Feb 2019 07:27:12 +0900 Subject: [PATCH] backup-operator: Support periodic backup (#1841) (#2028) * backup-operator: Add BackupIntervalInSecond This commit added BackupIntervalInSecond in BackupPolicy, which performs periodic backup as issue #1841 describes. This commit is part of #1841. By specifying BackupIntervalInSecond, the user can let etcd-backup-operator do periodic backups. The specification of BackupIntervalInSecond is as follows - unit in sec - >0 means interval. - =0 means explicitly disable interval backup; it will just do a one-time backup. * backup: Add validation of BackupIntervalInSecond This commit implements validation of BackupIntervalInSecond. After this commit, backup-operator will make sure BackupIntervalInSecond follows these restrictions - <0 is not allowed, fails validation - 0 is valid and disables periodic backup - >0 is valid and means interval * backup: Add LastSuccessDate The current backup status is only designed for one-shot snapshots. It always shows the latest results, but it would be nice if we could record the last time a snapshot was successfully taken. * backup: Add MaxBackups in BackupPolicy This commit added the MaxBackups attribute, which lets backup-operator delete older snapshots if the number of snapshots exceeds MaxBackups. The specification of MaxBackups is as follows - <0 is not allowed, which causes validation failure - =0 is to indicate MaxBackups is infinite, which makes the operator not delete any existing snapshots - >0 is to indicate the max number of snapshots * backup: Use path without adding info if one-shot After supporting periodic backup, backup-operator added the revision number and date to the s3 path as following /_v_. This behaviour has been applied even if the backup is a one-shot backup, therefore this change broke existing behaviour. 
This commit brough back original behaviour which use s3 path without adding anything /, if backup is not periodic * backup: Reset reason if backup succeeded * backup: Update the codegen files * backup: fix typo * backup: Refactoring * backup: Use meta/v1.Time instead of time.Time After I generated the code based on k8s object (zz_generated.deepcopy.go), we happened to be in failing to build. This is because all k8s custom resource's fileds should implement DeepCopyInto but time.Time we added doesn't implement it. For this purpose we should have used meta/v1.Time which is the implementation to implement all necessary function for k8s object and same function of time.Time. And also this commit include some refactoring which is pointed out in code-review * backup: minor fixes (naming, extra space) * backup: Add periodically backup example * backup: add e2e slow test for periodic backup * Update generated k8s code license for new year(2019) * backup: Minor fix "composite literal uses unkeyed" * backup: Update CHANGELOG.md for periodic support * backup: Move periodbackup test after restore test restore test expected backup file to be present but periodic backup test actually cleanup backup file to be created so we failed to perform restore test because of that. 
that's why we moved periodic test after restore test * backup: Fixed the bug to get operator in infinite-loop * backup: Only Retry in the case of transient error --- CHANGELOG.md | 7 + Gopkg.lock | 10 ++ .../periodic_backup_cr.yaml | 16 ++ pkg/apis/etcd/v1beta2/backup_types.go | 12 +- .../etcd/v1beta2/zz_generated.deepcopy.go | 3 +- pkg/backup/backup_manager.go | 40 ++++- pkg/backup/writer/abs_writer.go | 47 +++++ pkg/backup/writer/gcs_writer.go | 35 ++++ pkg/backup/writer/s3_writer.go | 36 ++++ pkg/backup/writer/writer.go | 6 + pkg/controller/backup-operator/abs_backup.go | 14 +- pkg/controller/backup-operator/gcs_backup.go | 14 +- pkg/controller/backup-operator/operator.go | 8 + pkg/controller/backup-operator/s3_backup.go | 14 +- pkg/controller/backup-operator/sync.go | 166 +++++++++++++++++- pkg/controller/backup-operator/util.go | 8 + test/e2e/e2eslow/backup_restore_test.go | 99 +++++++++++ test/e2e/e2eutil/util.go | 11 ++ test/e2e/e2eutil/wait_util.go | 15 ++ 19 files changed, 531 insertions(+), 30 deletions(-) create mode 100644 example/etcd-backup-operator/periodic_backup_cr.yaml diff --git a/CHANGELOG.md b/CHANGELOG.md index 3d9a7f34a..d64337116 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,13 @@ ### Changed +- EtcdBackup: Support periodically backup. This change added 3 new fileds in EtcdBackup schema, 2 variables is in spec, 1 varialbe is in status. + - in spec.backupPolicy + - maxBackup which indicate maximum number of backup to keep + - backupIntervalInSecond which indicate how often do backup operation. 
+ - in status + - LastSuccessDate which indicate the last time to succeed in taking backup + ### Removed ### Fixed diff --git a/Gopkg.lock b/Gopkg.lock index 933a1b1f0..1578e35b3 100644 --- a/Gopkg.lock +++ b/Gopkg.lock @@ -974,6 +974,14 @@ pruneopts = "NT" revision = "0317810137be915b9cf888946c6e115c1bfac693" +[[projects]] + digest = "1:6209fb6d1debd999cd64e943b14314a3ce665424d05369b297096df3992ce688" + name = "k8s.io/kubernetes" + packages = ["pkg/util/slice"] + pruneopts = "NT" + revision = "eec55b9ba98609a46fee712359c7b5b365bdd920" + version = "v1.13.1" + [solve-meta] analyzer-name = "dep" analyzer-version = 1 @@ -1001,6 +1009,7 @@ "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1beta1", "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset", "k8s.io/apimachinery/pkg/api/errors", + "k8s.io/apimachinery/pkg/api/meta", "k8s.io/apimachinery/pkg/api/resource", "k8s.io/apimachinery/pkg/apis/meta/v1", "k8s.io/apimachinery/pkg/fields", @@ -1038,6 +1047,7 @@ "k8s.io/code-generator/cmd/lister-gen", "k8s.io/code-generator/cmd/openapi-gen", "k8s.io/gengo/args", + "k8s.io/kubernetes/pkg/util/slice", ] solver-name = "gps-cdcl" solver-version = 1 diff --git a/example/etcd-backup-operator/periodic_backup_cr.yaml b/example/etcd-backup-operator/periodic_backup_cr.yaml new file mode 100644 index 000000000..d5e9a3dc0 --- /dev/null +++ b/example/etcd-backup-operator/periodic_backup_cr.yaml @@ -0,0 +1,16 @@ +apiVersion: "etcd.database.coreos.com/v1beta2" +kind: "EtcdBackup" +metadata: + name: example-etcd-cluster-periodic-backup +spec: + etcdEndpoints: [] + storageType: S3 + backupPolicy: + # 0 > enable periodic backup + backupIntervalInSecond: 125 + maxBackups: 4 + s3: + # The format of "path" must be: "/" + # e.g: "mybucket/etcd.backup" + path: + awsSecret: diff --git a/pkg/apis/etcd/v1beta2/backup_types.go b/pkg/apis/etcd/v1beta2/backup_types.go index 246ac8c60..f9c977487 100644 --- a/pkg/apis/etcd/v1beta2/backup_types.go +++ 
b/pkg/apis/etcd/v1beta2/backup_types.go @@ -14,7 +14,9 @@ package v1beta2 -import metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +import ( + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) const ( // AWS S3 related consts @@ -94,6 +96,12 @@ type BackupSource struct { type BackupPolicy struct { // TimeoutInSecond is the maximal allowed time in second of the entire backup process. TimeoutInSecond int64 `json:"timeoutInSecond,omitempty"` + // BackupIntervalInSecond is to specify how often operator take snapshot + // 0 is magic number to indicate one-shot backup + BackupIntervalInSecond int64 `json:"backupIntervalInSecond,omitempty"` + // MaxBackups is to specify how many backups we want to keep + // 0 is magic number to indicate un-limited backups + MaxBackups int `json:"maxBackups,omitempty"` } // BackupStatus represents the status of the EtcdBackup Custom Resource. @@ -106,6 +114,8 @@ type BackupStatus struct { EtcdVersion string `json:"etcdVersion,omitempty"` // EtcdRevision is the revision of etcd's KV store where the backup is performed on. EtcdRevision int64 `json:"etcdRevision,omitempty"` + // LastSuccessDate indicate the time to get snapshot last time + LastSuccessDate metav1.Time `json:"lastSuccessDate,omitempty"` } // S3BackupSource provides the spec how to store backups on S3. diff --git a/pkg/apis/etcd/v1beta2/zz_generated.deepcopy.go b/pkg/apis/etcd/v1beta2/zz_generated.deepcopy.go index a6b031465..c4ff202e8 100644 --- a/pkg/apis/etcd/v1beta2/zz_generated.deepcopy.go +++ b/pkg/apis/etcd/v1beta2/zz_generated.deepcopy.go @@ -134,6 +134,7 @@ func (in *BackupSpec) DeepCopy() *BackupSpec { // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. 
func (in *BackupStatus) DeepCopyInto(out *BackupStatus) { *out = *in + in.LastSuccessDate.DeepCopyInto(&out.LastSuccessDate) return } @@ -217,7 +218,7 @@ func (in *EtcdBackup) DeepCopyInto(out *EtcdBackup) { out.TypeMeta = in.TypeMeta in.ObjectMeta.DeepCopyInto(&out.ObjectMeta) in.Spec.DeepCopyInto(&out.Spec) - out.Status = in.Status + in.Status.DeepCopyInto(&out.Status) return } diff --git a/pkg/backup/backup_manager.go b/pkg/backup/backup_manager.go index e5e6350a9..a82ae32eb 100644 --- a/pkg/backup/backup_manager.go +++ b/pkg/backup/backup_manager.go @@ -18,12 +18,15 @@ import ( "context" "crypto/tls" "fmt" + "sort" + "time" "github.com/coreos/etcd-operator/pkg/backup/writer" "github.com/coreos/etcd-operator/pkg/util/constants" "github.com/coreos/etcd/clientv3" "github.com/sirupsen/logrus" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/kubernetes" ) @@ -51,29 +54,52 @@ func NewBackupManagerFromWriter(kubecli kubernetes.Interface, bw writer.Writer, // SaveSnap uses backup writer to save etcd snapshot to a specified S3 path // and returns backup etcd server's kv store revision and its version. 
-func (bm *BackupManager) SaveSnap(ctx context.Context, s3Path string) (int64, string, error) { +func (bm *BackupManager) SaveSnap(ctx context.Context, s3Path string, isPeriodic bool) (int64, string, *metav1.Time, error) { + now := time.Now().UTC() etcdcli, rev, err := bm.etcdClientWithMaxRevision(ctx) if err != nil { - return 0, "", fmt.Errorf("create etcd client failed: %v", err) + return 0, "", nil, fmt.Errorf("create etcd client failed: %v", err) } defer etcdcli.Close() resp, err := etcdcli.Status(ctx, etcdcli.Endpoints()[0]) if err != nil { - return 0, "", fmt.Errorf("failed to retrieve etcd version from the status call: %v", err) + return 0, "", nil, fmt.Errorf("failed to retrieve etcd version from the status call: %v", err) } rc, err := etcdcli.Snapshot(ctx) if err != nil { - return 0, "", fmt.Errorf("failed to receive snapshot (%v)", err) + return 0, "", nil, fmt.Errorf("failed to receive snapshot (%v)", err) } defer rc.Close() - + if isPeriodic { + s3Path = fmt.Sprintf(s3Path+"_v%d_%s", rev, now.Format("2006-01-02-15:04:05")) + } _, err = bm.bw.Write(ctx, s3Path, rc) if err != nil { - return 0, "", fmt.Errorf("failed to write snapshot (%v)", err) + return 0, "", nil, fmt.Errorf("failed to write snapshot (%v)", err) + } + return rev, resp.Version, &metav1.Time{Time: now}, nil +} + +// EnsureMaxBackup to ensure the number of snapshot is under maxcount +// if the number of snapshot exceeded than maxcount, delete oldest snapshot +func (bm *BackupManager) EnsureMaxBackup(ctx context.Context, basePath string, maxCount int) error { + savedSnapShots, err := bm.bw.List(ctx, basePath) + if err != nil { + return fmt.Errorf("failed to get exisiting snapshots: %v", err) + } + sort.Sort(sort.Reverse(sort.StringSlice(savedSnapShots))) + for i, snapshotPath := range savedSnapShots { + if i < maxCount { + continue + } + err := bm.bw.Delete(ctx, snapshotPath) + if err != nil { + return fmt.Errorf("failed to delete snapshot: %v", err) + } } - return rev, resp.Version, nil + 
return nil } // etcdClientWithMaxRevision gets the etcd endpoint with the maximum kv store revision diff --git a/pkg/backup/writer/abs_writer.go b/pkg/backup/writer/abs_writer.go index d7fc1fd08..743e5b758 100644 --- a/pkg/backup/writer/abs_writer.go +++ b/pkg/backup/writer/abs_writer.go @@ -100,3 +100,50 @@ func (absw *absWriter) Write(ctx context.Context, path string, r io.Reader) (int return blob.Properties.ContentLength, nil } + +func (absw *absWriter) List(ctx context.Context, basePath string) ([]string, error) { + // TODO: support context. + container, _, err := util.ParseBucketAndKey(basePath) + if err != nil { + return nil, err + } + + containerRef := absw.abs.GetContainerReference(container) + containerExists, err := containerRef.Exists() + if err != nil { + return nil, err + } + if !containerExists { + return nil, fmt.Errorf("container %v does not exist", container) + } + + blobs, err := containerRef.ListBlobs( + storage.ListBlobsParameters{Prefix: basePath}) + if err != nil { + return nil, err + } + blobKeys := []string{} + for _, blob := range blobs.Blobs { + blobKeys = append(blobKeys, container+"/"+blob.Name) + } + return blobKeys, nil +} + +func (absw *absWriter) Delete(ctx context.Context, path string) error { + // TODO: support context. 
+ container, key, err := util.ParseBucketAndKey(path) + if err != nil { + return err + } + containerRef := absw.abs.GetContainerReference(container) + containerExists, err := containerRef.Exists() + if err != nil { + return err + } + if !containerExists { + return fmt.Errorf("container %v does not exist", container) + } + + blob := containerRef.GetBlobReference(key) + return blob.Delete(&storage.DeleteBlobOptions{}) +} diff --git a/pkg/backup/writer/gcs_writer.go b/pkg/backup/writer/gcs_writer.go index eda6142f9..61b6cd056 100644 --- a/pkg/backup/writer/gcs_writer.go +++ b/pkg/backup/writer/gcs_writer.go @@ -23,6 +23,7 @@ import ( "cloud.google.com/go/storage" "github.com/sirupsen/logrus" + "google.golang.org/api/iterator" ) var _ Writer = &gcsWriter{} @@ -58,3 +59,37 @@ func (gcsw *gcsWriter) Write(ctx context.Context, path string, r io.Reader) (int } return n, err } + +func (gcsw *gcsWriter) List(ctx context.Context, basePath string) ([]string, error) { + bucket, key, err := util.ParseBucketAndKey(basePath) + if err != nil { + return nil, err + } + objects := gcsw.gcs.Bucket(bucket).Objects(ctx, &storage.Query{Prefix: key}) + if objects == nil { + return nil, fmt.Errorf("failed to get objects having %s prefix", key) + } + + objectKeys := []string{} + + for { + objAttrs, err := objects.Next() + if err == iterator.Done { + break + } + if err != nil { + return nil, err + } + objectKeys = append(objectKeys, bucket+"/"+objAttrs.Name) + } + return objectKeys, nil +} + +func (gcsw *gcsWriter) Delete(ctx context.Context, path string) error { + bucket, key, err := util.ParseBucketAndKey(path) + if err != nil { + return err + } + + return gcsw.gcs.Bucket(bucket).Object(key).Delete(ctx) +} diff --git a/pkg/backup/writer/s3_writer.go b/pkg/backup/writer/s3_writer.go index 9ffe37f42..623180e36 100644 --- a/pkg/backup/writer/s3_writer.go +++ b/pkg/backup/writer/s3_writer.go @@ -67,3 +67,39 @@ func (s3w *s3Writer) Write(ctx context.Context, path string, r io.Reader) (int64 } 
return *resp.ContentLength, nil } + +// List return the file paths which match the given s3 path +func (s3w *s3Writer) List(ctx context.Context, basePath string) ([]string, error) { + bk, key, err := util.ParseBucketAndKey(basePath) + if err != nil { + return nil, err + } + + objects, err := s3w.s3.ListObjectsWithContext(ctx, + &s3.ListObjectsInput{ + Bucket: aws.String(bk), + Prefix: aws.String(key), + }) + if err != nil { + return nil, err + } + objectKeys := []string{} + for _, object := range objects.Contents { + objectKeys = append(objectKeys, bk+"/"+*object.Key) + } + return objectKeys, nil +} + +func (s3w *s3Writer) Delete(ctx context.Context, path string) error { + bk, key, err := util.ParseBucketAndKey(path) + if err != nil { + return err + } + + _, err = s3w.s3.DeleteObjectWithContext(ctx, + &s3.DeleteObjectInput{ + Bucket: aws.String(bk), + Key: aws.String(key), + }) + return err +} diff --git a/pkg/backup/writer/writer.go b/pkg/backup/writer/writer.go index 37322710e..e1b9762b4 100644 --- a/pkg/backup/writer/writer.go +++ b/pkg/backup/writer/writer.go @@ -23,4 +23,10 @@ import ( type Writer interface { // Write writes a backup file to the given path and returns size of written file. Write(ctx context.Context, path string, r io.Reader) (int64, error) + + // List backup files + List(ctx context.Context, basePath string) ([]string, error) + + // Delete a backup file + Delete(ctx context.Context, path string) error } diff --git a/pkg/controller/backup-operator/abs_backup.go b/pkg/controller/backup-operator/abs_backup.go index d38af1cc8..c1c9c786d 100644 --- a/pkg/controller/backup-operator/abs_backup.go +++ b/pkg/controller/backup-operator/abs_backup.go @@ -28,7 +28,8 @@ import ( ) // handleABS saves etcd cluster's backup to specificed ABS path. 
-func handleABS(ctx context.Context, kubecli kubernetes.Interface, s *api.ABSBackupSource, endpoints []string, clientTLSSecret, namespace string) (*api.BackupStatus, error) { +func handleABS(ctx context.Context, kubecli kubernetes.Interface, s *api.ABSBackupSource, endpoints []string, clientTLSSecret, + namespace string, isPeriodic bool, maxBackup int) (*api.BackupStatus, error) { // TODO: controls NewClientFromSecret with ctx. This depends on upstream kubernetes to support API calls with ctx. cli, err := absfactory.NewClientFromSecret(kubecli, namespace, s.ABSSecret) if err != nil { @@ -39,12 +40,17 @@ func handleABS(ctx context.Context, kubecli kubernetes.Interface, s *api.ABSBack if tlsConfig, err = generateTLSConfig(kubecli, clientTLSSecret, namespace); err != nil { return nil, err } - bm := backup.NewBackupManagerFromWriter(kubecli, writer.NewABSWriter(cli.ABS), tlsConfig, endpoints, namespace) - rev, etcdVersion, err := bm.SaveSnap(ctx, s.Path) + rev, etcdVersion, now, err := bm.SaveSnap(ctx, s.Path, isPeriodic) if err != nil { return nil, fmt.Errorf("failed to save snapshot (%v)", err) } - return &api.BackupStatus{EtcdVersion: etcdVersion, EtcdRevision: rev}, nil + if maxBackup > 0 { + err := bm.EnsureMaxBackup(ctx, s.Path, maxBackup) + if err != nil { + return nil, fmt.Errorf("succeeded in saving snapshot but failed to delete old snapshot (%v)", err) + } + } + return &api.BackupStatus{EtcdVersion: etcdVersion, EtcdRevision: rev, LastSuccessDate: *now}, nil } diff --git a/pkg/controller/backup-operator/gcs_backup.go b/pkg/controller/backup-operator/gcs_backup.go index 5a882d116..1f1f5a921 100644 --- a/pkg/controller/backup-operator/gcs_backup.go +++ b/pkg/controller/backup-operator/gcs_backup.go @@ -28,7 +28,8 @@ import ( ) // handleGCS saves etcd cluster's backup to specificed GCS path. 
-func handleGCS(ctx context.Context, kubecli kubernetes.Interface, s *api.GCSBackupSource, endpoints []string, clientTLSSecret, namespace string) (*api.BackupStatus, error) { +func handleGCS(ctx context.Context, kubecli kubernetes.Interface, s *api.GCSBackupSource, endpoints []string, clientTLSSecret, + namespace string, isPeriodic bool, maxBackup int) (*api.BackupStatus, error) { // TODO: controls NewClientFromSecret with ctx. This depends on upstream kubernetes to support API calls with ctx. cli, err := gcsfactory.NewClientFromSecret(ctx, kubecli, namespace, s.GCPSecret) if err != nil { @@ -40,12 +41,17 @@ func handleGCS(ctx context.Context, kubecli kubernetes.Interface, s *api.GCSBack if tlsConfig, err = generateTLSConfig(kubecli, clientTLSSecret, namespace); err != nil { return nil, err } - bm := backup.NewBackupManagerFromWriter(kubecli, writer.NewGCSWriter(cli.GCS), tlsConfig, endpoints, namespace) - rev, etcdVersion, err := bm.SaveSnap(ctx, s.Path) + rev, etcdVersion, now, err := bm.SaveSnap(ctx, s.Path, isPeriodic) if err != nil { return nil, fmt.Errorf("failed to save snapshot (%v)", err) } - return &api.BackupStatus{EtcdVersion: etcdVersion, EtcdRevision: rev}, nil + if maxBackup > 0 { + err := bm.EnsureMaxBackup(ctx, s.Path, maxBackup) + if err != nil { + return nil, fmt.Errorf("succeeded in saving snapshot but failed to delete old snapshot (%v)", err) + } + } + return &api.BackupStatus{EtcdVersion: etcdVersion, EtcdRevision: rev, LastSuccessDate: *now}, nil } diff --git a/pkg/controller/backup-operator/operator.go b/pkg/controller/backup-operator/operator.go index 1a4476160..6c34c25b7 100644 --- a/pkg/controller/backup-operator/operator.go +++ b/pkg/controller/backup-operator/operator.go @@ -18,6 +18,7 @@ import ( "context" "fmt" "os" + "sync" api "github.com/coreos/etcd-operator/pkg/apis/etcd/v1beta2" "github.com/coreos/etcd-operator/pkg/client" @@ -45,9 +46,16 @@ type Backup struct { backupCRCli versioned.Interface kubeExtCli 
apiextensionsclient.Interface + backupRunnerStore sync.Map + createCRD bool } +type BackupRunner struct { + spec api.BackupSpec + cancelFunc context.CancelFunc +} + // New creates a backup operator. func New(createCRD bool) *Backup { return &Backup{ diff --git a/pkg/controller/backup-operator/s3_backup.go b/pkg/controller/backup-operator/s3_backup.go index cca3f70be..075ddfb65 100644 --- a/pkg/controller/backup-operator/s3_backup.go +++ b/pkg/controller/backup-operator/s3_backup.go @@ -29,7 +29,8 @@ import ( // TODO: replace this with generic backend interface for other options (PV, Azure) // handleS3 saves etcd cluster's backup to specificed S3 path. -func handleS3(ctx context.Context, kubecli kubernetes.Interface, s *api.S3BackupSource, endpoints []string, clientTLSSecret, namespace string) (*api.BackupStatus, error) { +func handleS3(ctx context.Context, kubecli kubernetes.Interface, s *api.S3BackupSource, endpoints []string, clientTLSSecret, + namespace string, isPeriodic bool, maxBackup int) (*api.BackupStatus, error) { // TODO: controls NewClientFromSecret with ctx. This depends on upstream kubernetes to support API calls with ctx. 
cli, err := s3factory.NewClientFromSecret(kubecli, namespace, s.Endpoint, s.AWSSecret, s.ForcePathStyle) if err != nil { @@ -41,12 +42,17 @@ func handleS3(ctx context.Context, kubecli kubernetes.Interface, s *api.S3Backup if tlsConfig, err = generateTLSConfig(kubecli, clientTLSSecret, namespace); err != nil { return nil, err } - bm := backup.NewBackupManagerFromWriter(kubecli, writer.NewS3Writer(cli.S3), tlsConfig, endpoints, namespace) - rev, etcdVersion, err := bm.SaveSnap(ctx, s.Path) + rev, etcdVersion, now, err := bm.SaveSnap(ctx, s.Path, isPeriodic) if err != nil { return nil, fmt.Errorf("failed to save snapshot (%v)", err) } - return &api.BackupStatus{EtcdVersion: etcdVersion, EtcdRevision: rev}, nil + if maxBackup > 0 { + err := bm.EnsureMaxBackup(ctx, s.Path, maxBackup) + if err != nil { + return nil, fmt.Errorf("succeeded in saving snapshot but failed to delete old snapshot (%v)", err) + } + } + return &api.BackupStatus{EtcdVersion: etcdVersion, EtcdRevision: rev, LastSuccessDate: *now}, nil } diff --git a/pkg/controller/backup-operator/sync.go b/pkg/controller/backup-operator/sync.go index abc4b4caf..5decdbf37 100644 --- a/pkg/controller/backup-operator/sync.go +++ b/pkg/controller/backup-operator/sync.go @@ -17,12 +17,18 @@ package controller import ( "context" "errors" + "reflect" "time" api "github.com/coreos/etcd-operator/pkg/apis/etcd/v1beta2" "github.com/coreos/etcd-operator/pkg/util/constants" "github.com/sirupsen/logrus" + apierrors "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/api/meta" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" + "k8s.io/kubernetes/pkg/util/slice" ) const ( @@ -66,25 +72,148 @@ func (b *Backup) processItem(key string) error { } eb := obj.(*api.EtcdBackup) + + if eb.DeletionTimestamp != nil { + b.deletePeriodicBackupRunner(eb.ObjectMeta.UID) + return b.removeFinalizerOfPeriodicBackup(eb) + } + isPeriodic := isPeriodicBackup(&eb.Spec) + // don't process the CR if it has a 
status since // having a status means that the backup is either made or failed. - if eb.Status.Succeeded || len(eb.Status.Reason) != 0 { + if !isPeriodic && + (eb.Status.Succeeded || len(eb.Status.Reason) != 0) { return nil } - bs, err := b.handleBackup(&eb.Spec) - // Report backup status - b.reportBackupStatus(bs, err, eb) + + if isPeriodic && b.isChanged(eb) { + // Stop previous backup runner if it exists + b.deletePeriodicBackupRunner(eb.ObjectMeta.UID) + + // Add finalizer if need + eb, err = b.addFinalizerOfPeriodicBackupIfNeed(eb) + if err != nil { + return err + } + + // Run new backup runner + ticker := time.NewTicker( + time.Duration(eb.Spec.BackupPolicy.BackupIntervalInSecond) * time.Second) + ctx := context.Background() + ctx, cancel := context.WithCancel(ctx) + go b.periodicRunnerFunc(ctx, ticker, eb) + + // Store cancel function for periodic + b.backupRunnerStore.Store(eb.ObjectMeta.UID, BackupRunner{eb.Spec, cancel}) + + } else if !isPeriodic { + // Perform backup + bs, err := b.handleBackup(nil, &eb.Spec, false) + // Report backup status + b.reportBackupStatus(bs, err, eb) + } + return err +} + +func (b *Backup) isChanged(eb *api.EtcdBackup) bool { + backupRunner, exists := b.backupRunnerStore.Load(eb.ObjectMeta.UID) + if !exists { + return true + } + return !reflect.DeepEqual(eb.Spec, backupRunner.(BackupRunner).spec) +} + +func (b *Backup) deletePeriodicBackupRunner(uid types.UID) bool { + backupRunner, exists := b.backupRunnerStore.Load(uid) + if exists { + backupRunner.(BackupRunner).cancelFunc() + b.backupRunnerStore.Delete(uid) + return true + } + return false +} + +func (b *Backup) addFinalizerOfPeriodicBackupIfNeed(eb *api.EtcdBackup) (*api.EtcdBackup, error) { + ebNew := eb.DeepCopyObject() + metadata, err := meta.Accessor(ebNew) + if err != nil { + return eb, err + } + if !slice.ContainsString(metadata.GetFinalizers(), "backup-operator-periodic", nil) { + metadata.SetFinalizers(append(metadata.GetFinalizers(), "backup-operator-periodic")) + 
_, err := b.backupCRCli.EtcdV1beta2().EtcdBackups(b.namespace).Update(ebNew.(*api.EtcdBackup)) + if err != nil { + return eb, err + } + return ebNew.(*api.EtcdBackup), nil + } + return eb, nil +} + +func (b *Backup) removeFinalizerOfPeriodicBackup(eb *api.EtcdBackup) error { + ebNew := eb.DeepCopyObject() + metadata, err := meta.Accessor(ebNew) + if err != nil { + return err + } + var finalizers []string + for _, finalizer := range metadata.GetFinalizers() { + if finalizer == "backup-operator-periodic" { + continue + } + finalizers = append(finalizers, finalizer) + } + metadata.SetFinalizers(finalizers) + _, err = b.backupCRCli.EtcdV1beta2().EtcdBackups(b.namespace).Update(ebNew.(*api.EtcdBackup)) return err } +func (b *Backup) periodicRunnerFunc(ctx context.Context, t *time.Ticker, eb *api.EtcdBackup) { + defer t.Stop() + for { + select { + case <-ctx.Done(): + break + case <-t.C: + var latestEb *api.EtcdBackup + var bs *api.BackupStatus + var err error + retryLimit := 5 + for i := 1; i < retryLimit+1; i++ { + latestEb, err = b.backupCRCli.EtcdV1beta2().EtcdBackups(b.namespace).Get(eb.Name, metav1.GetOptions{}) + if err != nil { + if apierrors.IsNotFound(err) { + b.logger.Infof("Could not find EtcdBackup. 
Stopping periodic backup for EtcdBackup CR %v", + eb.Name) + break + } + b.logger.Warningf("[Attempt: %d/%d] Failed to get latest EtcdBackup %v : (%v)", + i, retryLimit, eb.Name, err) + time.Sleep(1) + continue + } + break + } + if err == nil { + // Perform backup + bs, err = b.handleBackup(&ctx, &latestEb.Spec, true) + } + // Report backup status + b.reportBackupStatus(bs, err, latestEb) + } + } +} + func (b *Backup) reportBackupStatus(bs *api.BackupStatus, berr error, eb *api.EtcdBackup) { if berr != nil { eb.Status.Succeeded = false eb.Status.Reason = berr.Error() } else { + eb.Status.Reason = "" eb.Status.Succeeded = true eb.Status.EtcdRevision = bs.EtcdRevision eb.Status.EtcdVersion = bs.EtcdVersion + eb.Status.LastSuccessDate = bs.LastSuccessDate } _, err := b.backupCRCli.EtcdV1beta2().EtcdBackups(b.namespace).Update(eb) if err != nil { @@ -116,7 +245,7 @@ func (b *Backup) handleErr(err error, key interface{}) { b.logger.Infof("Dropping etcd backup (%v) out of the queue: %v", key, err) } -func (b *Backup) handleBackup(spec *api.BackupSpec) (*api.BackupStatus, error) { +func (b *Backup) handleBackup(parentContext *context.Context, spec *api.BackupSpec, isPeriodic bool) (*api.BackupStatus, error) { err := validate(spec) if err != nil { return nil, err @@ -127,24 +256,35 @@ func (b *Backup) handleBackup(spec *api.BackupSpec) (*api.BackupStatus, error) { if spec.BackupPolicy != nil && spec.BackupPolicy.TimeoutInSecond > 0 { backupTimeout = time.Duration(spec.BackupPolicy.TimeoutInSecond) * time.Second } + backupMaxCount := 0 + if spec.BackupPolicy != nil && spec.BackupPolicy.MaxBackups > 0 { + backupMaxCount = spec.BackupPolicy.MaxBackups + } - ctx, cancel := context.WithTimeout(context.Background(), backupTimeout) + if parentContext == nil { + tmpParent := context.Background() + parentContext = &tmpParent + } + ctx, cancel := context.WithTimeout(*parentContext, backupTimeout) defer cancel() switch spec.StorageType { case api.BackupStorageTypeS3: - bs, err := 
handleS3(ctx, b.kubecli, spec.S3, spec.EtcdEndpoints, spec.ClientTLSSecret, b.namespace) + bs, err := handleS3(ctx, b.kubecli, spec.S3, spec.EtcdEndpoints, spec.ClientTLSSecret, + b.namespace, isPeriodic, backupMaxCount) if err != nil { return nil, err } return bs, nil case api.BackupStorageTypeABS: - bs, err := handleABS(ctx, b.kubecli, spec.ABS, spec.EtcdEndpoints, spec.ClientTLSSecret, b.namespace) + bs, err := handleABS(ctx, b.kubecli, spec.ABS, spec.EtcdEndpoints, spec.ClientTLSSecret, + b.namespace, isPeriodic, backupMaxCount) if err != nil { return nil, err } return bs, nil case api.BackupStorageTypeGCS: - bs, err := handleGCS(ctx, b.kubecli, spec.GCS, spec.EtcdEndpoints, spec.ClientTLSSecret, b.namespace) + bs, err := handleGCS(ctx, b.kubecli, spec.GCS, spec.EtcdEndpoints, spec.ClientTLSSecret, + b.namespace, isPeriodic, backupMaxCount) if err != nil { return nil, err } @@ -160,5 +300,13 @@ func validate(spec *api.BackupSpec) error { if len(spec.EtcdEndpoints) == 0 { return errors.New("spec.etcdEndpoints should not be empty") } + if spec.BackupPolicy != nil { + if spec.BackupPolicy.BackupIntervalInSecond < 0 { + return errors.New("spec.BackupPolicy.BackupIntervalInSecond should not be lower than 0") + } + if spec.BackupPolicy.MaxBackups < 0 { + return errors.New("spec.BackupPolicy.MaxBackups should not be lower than 0") + } + } return nil } diff --git a/pkg/controller/backup-operator/util.go b/pkg/controller/backup-operator/util.go index 5cceba083..22efa5fba 100644 --- a/pkg/controller/backup-operator/util.go +++ b/pkg/controller/backup-operator/util.go @@ -18,6 +18,7 @@ import ( "crypto/tls" "fmt" + api "github.com/coreos/etcd-operator/pkg/apis/etcd/v1beta2" "github.com/coreos/etcd-operator/pkg/util/etcdutil" "github.com/coreos/etcd-operator/pkg/util/k8sutil" @@ -38,3 +39,10 @@ func generateTLSConfig(kubecli kubernetes.Interface, clientTLSSecret, namespace } return tlsConfig, nil } + +func isPeriodicBackup(ebSpec *api.BackupSpec) bool { + if 
ebSpec.BackupPolicy != nil { + return ebSpec.BackupPolicy.BackupIntervalInSecond != 0 + } + return false +} diff --git a/test/e2e/e2eslow/backup_restore_test.go b/test/e2e/e2eslow/backup_restore_test.go index ddb149553..deff374ea 100644 --- a/test/e2e/e2eslow/backup_restore_test.go +++ b/test/e2e/e2eslow/backup_restore_test.go @@ -15,15 +15,20 @@ package e2eslow import ( + "context" "errors" "fmt" "math/rand" "os" "path" + "sort" + "strings" "testing" "time" api "github.com/coreos/etcd-operator/pkg/apis/etcd/v1beta2" + "github.com/coreos/etcd-operator/pkg/backup/writer" + "github.com/coreos/etcd-operator/pkg/util/awsutil/s3factory" "github.com/coreos/etcd-operator/pkg/util/etcdutil" "github.com/coreos/etcd-operator/pkg/util/k8sutil" "github.com/coreos/etcd-operator/pkg/util/retryutil" @@ -77,8 +82,11 @@ func TestBackupAndRestore(t *testing.T) { s3Path := path.Join(os.Getenv("TEST_S3_BUCKET"), "jenkins", suffix, time.Now().Format(time.RFC3339), "etcd.backup") + // Backup then restore tests testEtcdBackupOperatorForS3Backup(t, clusterName, operatorClientTLSSecret, s3Path) testEtcdRestoreOperatorForS3Source(t, clusterName, s3Path) + // Periodic backup test + testEtcdBackupOperatorForPeriodicS3Backup(t, clusterName, operatorClientTLSSecret, s3Path) } func verifyAWSEnvVars() error { @@ -166,6 +174,97 @@ func testEtcdBackupOperatorForS3Backup(t *testing.T, clusterName, operatorClient t.Logf("backup for cluster (%s) has been saved", clusterName) } +// testEtcdBackupOperatorForPeriodicS3Backup test if etcd backup operator can periodically backup and upload to s3. 
+// This e2e test would check basic function of periodic backup and MaxBackup functionality +func testEtcdBackupOperatorForPeriodicS3Backup(t *testing.T, clusterName, operatorClientTLSSecret, s3Path string) { + f := framework.Global + + endpoints, err := getEndpoints(f.KubeClient, true, f.Namespace, clusterName) + if err != nil { + t.Fatalf("failed to get endpoints: %v", err) + } + backupCR := e2eutil.NewS3Backup(endpoints, clusterName, s3Path, os.Getenv("TEST_AWS_SECRET"), operatorClientTLSSecret) + // enable periodic backup + backupCR.Spec.BackupPolicy = &api.BackupPolicy{BackupIntervalInSecond: 5, MaxBackups: 2} + backupS3Source := backupCR.Spec.BackupSource.S3 + + // initialize s3 client + s3cli, err := s3factory.NewClientFromSecret( + f.KubeClient, f.Namespace, backupS3Source.Endpoint, backupS3Source.AWSSecret) + if err != nil { + t.Fatalf("failed to initialize s3client: %v", err) + } + wr := writer.NewS3Writer(s3cli.S3) + + // check if there is existing backup file + allBackups, err := wr.List(context.Background(), backupS3Source.Path) + if err != nil { + t.Fatalf("failed to list backup files: %v", err) + } + if len(allBackups) > 0 { + t.Logf("existing backup file is detected: %s", strings.Join(allBackups, ",")) + // try to delete all existing backup files + if err := e2eutil.DeleteBackupFiles(wr, allBackups); err != nil { + t.Fatalf("failed to delete existing backup: %v", err) + } + // make sure no existing backups + // will wait for 10 sec until deleting operation completed + if err := e2eutil.WaitUntilNoBackupFiles(wr, backupS3Source.Path, 10); err != nil { + t.Fatalf("failed to make sure no old backup: %v", err) + } + } + + // create etcdbackup resource + eb, err := f.CRClient.EtcdV1beta2().EtcdBackups(f.Namespace).Create(backupCR) + if err != nil { + t.Fatalf("failed to create etcd back cr: %v", err) + } + defer func() { + if err := f.CRClient.EtcdV1beta2().EtcdBackups(f.Namespace).Delete(eb.Name, nil); err != nil { + t.Fatalf("failed to delete etcd 
backup cr: %v", err) + } + // cleanup backup files + allBackups, err = wr.List(context.Background(), backupS3Source.Path) + if err != nil { + t.Fatalf("failed to list backup files: %v", err) + } + if err := e2eutil.DeleteBackupFiles(wr, allBackups); err != nil { + t.Fatalf("failed to cleanup backup files: %v", err) + } + }() + + var firstBackup string + var periodicBackup, maxBackup bool + // Check if periodic backup is correctly performed + // Check if maxBackup is correctly performed + err = retryutil.Retry(time.Second, 20, func() (bool, error) { + allBackups, err = wr.List(context.Background(), backupS3Source.Path) + sort.Strings(allBackups) + if err != nil { + return false, fmt.Errorf("failed to list backup files: %v", err) + } + if len(allBackups) > 0 { + if firstBackup == "" { + firstBackup = allBackups[0] + } + // Check if first seen backup file is deleted or not + if firstBackup != allBackups[0] { + maxBackup = true + } + if len(allBackups) > 1 { + periodicBackup = true + } + } + if periodicBackup && maxBackup { + return true, nil + } + return false, nil + }) + if err != nil { + t.Fatalf("failed to verify periodic backup: %v", err) + } +} + // testEtcdRestoreOperatorForS3Source tests if the restore-operator can restore an etcd cluster from an S3 restore source func testEtcdRestoreOperatorForS3Source(t *testing.T, clusterName, s3Path string) { f := framework.Global diff --git a/test/e2e/e2eutil/util.go b/test/e2e/e2eutil/util.go index 3e2060199..ea1def9a4 100644 --- a/test/e2e/e2eutil/util.go +++ b/test/e2e/e2eutil/util.go @@ -16,10 +16,12 @@ package e2eutil import ( "bytes" + "context" "fmt" "testing" "time" + "github.com/coreos/etcd-operator/pkg/backup/writer" "github.com/coreos/etcd-operator/pkg/util/k8sutil" "k8s.io/api/core/v1" @@ -62,3 +64,12 @@ func printContainerStatus(buf *bytes.Buffer, ss []v1.ContainerStatus) { } } } + +func DeleteBackupFiles(wr writer.Writer, files []string) error { + for _, v := range files { + if err := 
wr.Delete(context.Background(), v); err != nil { + return err + } + } + return nil +} diff --git a/test/e2e/e2eutil/wait_util.go b/test/e2e/e2eutil/wait_util.go index 42d273a50..9a55f75d6 100644 --- a/test/e2e/e2eutil/wait_util.go +++ b/test/e2e/e2eutil/wait_util.go @@ -16,12 +16,14 @@ package e2eutil import ( "bytes" + "context" "fmt" "strings" "testing" "time" api "github.com/coreos/etcd-operator/pkg/apis/etcd/v1beta2" + "github.com/coreos/etcd-operator/pkg/backup/writer" "github.com/coreos/etcd-operator/pkg/generated/clientset/versioned" "github.com/coreos/etcd-operator/pkg/util" "github.com/coreos/etcd-operator/pkg/util/k8sutil" @@ -275,3 +277,16 @@ func WaitUntilOperatorReady(kubecli kubernetes.Interface, namespace, name string } return nil } + +func WaitUntilNoBackupFiles(wr writer.Writer, path string, timeout int) error { + return retryutil.Retry(time.Second, timeout, func() (bool, error) { + allBackups, err := wr.List(context.Background(), path) + if err != nil { + return false, fmt.Errorf("failed to list backup files: %v", err) + } + if len(allBackups) > 0 { + return false, fmt.Errorf("%d existing backup files are detected", len(allBackups)) + } + return true, nil + }) +}