Skip to content
This repository has been archived by the owner on Mar 28, 2020. It is now read-only.

Clusterwide restore #2074

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 16 additions & 3 deletions cmd/restore-operator/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,9 @@ import (
)

// Command-line configuration, populated by the flags registered in init.
// Reconstructed post-change declaration block: the diff rendering had the
// pre-change `namespace`/`createCRD` lines interleaved with the new ones,
// which would not compile (duplicate declarations).
var (
	namespace   string // namespace the operator itself runs in
	createCRD   bool   // whether to create the EtcdRestore CRD on startup
	clusterWide bool   // watch EtcdRestore CRs in all namespaces when true
)

const (
Expand All @@ -49,6 +50,7 @@ const (

// init registers the operator's command-line flags and parses them.
func init() {
	// Defaults to true: by default the operator provisions the EtcdRestore CRD itself.
	flag.BoolVar(&createCRD, "create-crd", true, "The restore operator will not create the EtcdRestore CRD when this flag is set to false.")
	// Off by default: the operator only watches its own namespace unless enabled.
	flag.BoolVar(&clusterWide, "cluster-wide", false, "Enable operator to watch clusters in all namespaces")
	// Parse must come after every flag registration above.
	flag.Parse()
}

Expand Down Expand Up @@ -119,9 +121,20 @@ func createRecorder(kubecli kubernetes.Interface, name, namespace string) record
// run builds the restore controller from the parsed flag values and blocks
// running it until it stops; a controller error is fatal for the process.
// Reconstructed post-change body: the diff rendering kept the pre-change
// controller.New call alongside the new one, which would not compile.
func run(ctx context.Context) {
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()

	c := controller.New(newControllerConfig())
	if err := c.Start(ctx); err != nil {
		logrus.Fatalf("etcd restore operator stopped with error: %v", err)
	}
}

// newControllerConfig assembles the controller configuration from the
// package-level flag values parsed in init. The operator's own service
// address is qualified with its namespace so pods in other namespaces can
// reach the backup endpoint.
func newControllerConfig() controller.Config {
	return controller.Config{
		Namespace:   namespace,
		ClusterWide: clusterWide,
		CreateCRD:   createCRD,
		MySvcAddr:   fmt.Sprintf("%s.%s:%d", serviceNameForMyself, namespace, servicePortForMyself),
	}
}
10 changes: 6 additions & 4 deletions pkg/backup/backupapi/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package backupapi

import (
"fmt"
"net/url"
"path"
)
Expand All @@ -24,10 +25,11 @@ const (
)

// BackupURLForRestore creates a URL struct for retrieving an existing backup specified by a restore CR
func BackupURLForRestore(scheme, host, restoreName string) *url.URL {
func BackupURLForRestore(scheme, host, restoreName string, namespace string) *url.URL {
return &url.URL{
Scheme: scheme,
Host: host,
Path: path.Join(APIV1, "backup", restoreName),
Scheme: scheme,
Host: host,
Path: path.Join(APIV1, "backup", restoreName),
RawQuery: fmt.Sprintf("namespace=%s", namespace),
}
}
13 changes: 8 additions & 5 deletions pkg/controller/restore-operator/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,12 +61,15 @@ func (r *Restore) serveBackup(w http.ResponseWriter, req *http.Request) error {
return errors.New("restore name is not specified")
}

namespace := req.URL.Query().Get("namespace")

obj := &api.EtcdRestore{
ObjectMeta: metav1.ObjectMeta{
Name: restoreName,
Namespace: r.namespace,
Namespace: namespace,
},
}

v, exists, err := r.indexer.Get(obj)
if err != nil {
return fmt.Errorf("failed to get restore CR for restore-name (%v): %v", restoreName, err)
Expand Down Expand Up @@ -94,7 +97,7 @@ func (r *Restore) serveBackup(w http.ResponseWriter, req *http.Request) error {
return errors.New("invalid s3 restore source field (spec.s3), must specify all required subfields")
}

s3Cli, err := s3factory.NewClientFromSecret(r.kubecli, r.namespace, s3RestoreSource.Endpoint, s3RestoreSource.AWSSecret, s3RestoreSource.ForcePathStyle)
s3Cli, err := s3factory.NewClientFromSecret(r.kubecli, cr.Namespace, s3RestoreSource.Endpoint, s3RestoreSource.AWSSecret, s3RestoreSource.ForcePathStyle)
if err != nil {
return fmt.Errorf("failed to create S3 client: %v", err)
}
Expand All @@ -112,7 +115,7 @@ func (r *Restore) serveBackup(w http.ResponseWriter, req *http.Request) error {
return errors.New("invalid abs restore source field (spec.abs), must specify all required subfields")
}

absCli, err := absfactory.NewClientFromSecret(r.kubecli, r.namespace, absRestoreSource.ABSSecret)
absCli, err := absfactory.NewClientFromSecret(r.kubecli, cr.Namespace, absRestoreSource.ABSSecret)
if err != nil {
return fmt.Errorf("failed to create ABS client: %v", err)
}
Expand All @@ -131,7 +134,7 @@ func (r *Restore) serveBackup(w http.ResponseWriter, req *http.Request) error {
return errors.New("invalid gcs restore source field (spec.gcs), must specify all required subfields")
}

gcsCli, err := gcsfactory.NewClientFromSecret(ctx, r.kubecli, r.namespace, gcsRestoreSource.GCPSecret)
gcsCli, err := gcsfactory.NewClientFromSecret(ctx, r.kubecli, cr.Namespace, gcsRestoreSource.GCPSecret)
if err != nil {
return fmt.Errorf("failed to create GCS client: %v", err)
}
Expand All @@ -149,7 +152,7 @@ func (r *Restore) serveBackup(w http.ResponseWriter, req *http.Request) error {
return errors.New("invalid oss restore source field (spec.oss), must specify all required subfields")
}

ossCli, err := ossfactory.NewClientFromSecret(r.kubecli, r.namespace, ossRestoreSource.Endpoint, ossRestoreSource.OSSSecret)
ossCli, err := ossfactory.NewClientFromSecret(r.kubecli, cr.Namespace, ossRestoreSource.Endpoint, ossRestoreSource.OSSSecret)
if err != nil {
return fmt.Errorf("failed to create OSS client: %v", err)
}
Expand Down
23 changes: 19 additions & 4 deletions pkg/controller/restore-operator/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (

"github.com/sirupsen/logrus"
apiextensionsclient "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"
Expand All @@ -47,16 +48,30 @@ type Restore struct {
createCRD bool
}

// Config holds the settings used to construct a restore operator.
type Config struct {
	// Namespace is the namespace the operator runs in; it is the watch
	// scope unless ClusterWide is set.
	Namespace string
	// ClusterWide makes the operator watch EtcdRestore CRs in all
	// namespaces instead of only Namespace.
	ClusterWide bool
	// CreateCRD controls whether the operator creates the EtcdRestore CRD
	// on startup.
	CreateCRD bool
	// MySvcAddr is the "host:port" address of the operator's own service
	// that serves backups to seed members.
	MySvcAddr string
}

// New creates a restore operator.
func New(createCRD bool, namespace, mySvcAddr string) *Restore {
func New(config Config) *Restore {
var ns string
if config.ClusterWide {
ns = metav1.NamespaceAll
} else {
ns = config.Namespace
}

return &Restore{
logger: logrus.WithField("pkg", "controller"),
namespace: namespace,
mySvcAddr: mySvcAddr,
namespace: ns,
mySvcAddr: config.MySvcAddr,
kubecli: k8sutil.MustNewKubeClient(),
etcdCRCli: client.MustNewInCluster(),
kubeExtCli: k8sutil.MustNewKubeExtClient(),
createCRD: createCRD,
createCRD: config.CreateCRD,
}
}

Expand Down
36 changes: 18 additions & 18 deletions pkg/controller/restore-operator/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ func (r *Restore) reportStatus(rerr error, er *api.EtcdRestore) {
} else {
er.Status.Succeeded = true
}
_, err := r.etcdCRCli.EtcdV1beta2().EtcdRestores(r.namespace).Update(er)
_, err := r.etcdCRCli.EtcdV1beta2().EtcdRestores(er.Namespace).Update(er)
if err != nil {
r.logger.Warningf("failed to update status of restore CR %v : (%v)", er.Name, err)
}
Expand Down Expand Up @@ -148,21 +148,21 @@ func (r *Restore) prepareSeed(er *api.EtcdRestore) (err error) {

// Fetch the reference EtcdCluster
ecRef := er.Spec.EtcdCluster
ec, err := r.etcdCRCli.EtcdV1beta2().EtcdClusters(r.namespace).Get(ecRef.Name, metav1.GetOptions{})
ec, err := r.etcdCRCli.EtcdV1beta2().EtcdClusters(er.Namespace).Get(ecRef.Name, metav1.GetOptions{})
if err != nil {
return fmt.Errorf("failed to get reference EtcdCluster(%s/%s): %v", r.namespace, ecRef.Name, err)
return fmt.Errorf("failed to get reference EtcdCluster(%s/%s): %v", er.Namespace, ecRef.Name, err)
}
if err := ec.Spec.Validate(); err != nil {
return fmt.Errorf("invalid cluster spec: %v", err)
}

// Delete reference EtcdCluster
err = r.etcdCRCli.EtcdV1beta2().EtcdClusters(r.namespace).Delete(ecRef.Name, &metav1.DeleteOptions{})
err = r.etcdCRCli.EtcdV1beta2().EtcdClusters(er.Namespace).Delete(ecRef.Name, &metav1.DeleteOptions{})
if err != nil {
return fmt.Errorf("failed to delete reference EtcdCluster (%s/%s): %v", r.namespace, ecRef.Name, err)
return fmt.Errorf("failed to delete reference EtcdCluster (%s/%s): %v", er.Namespace, ecRef.Name, err)
}
// Need to delete etcd pods, etc. completely before creating new cluster.
r.deleteClusterResourcesCompletely(ecRef.Name)
r.deleteClusterResourcesCompletely(er.Namespace, ecRef.Name)

// Create the restored EtcdCluster with the same metadata and spec as reference EtcdCluster
clusterName := ecRef.Name
Expand All @@ -178,24 +178,24 @@ func (r *Restore) prepareSeed(er *api.EtcdRestore) (err error) {

ec.Spec.Paused = true
ec.Status.Phase = api.ClusterPhaseRunning
ec, err = r.etcdCRCli.EtcdV1beta2().EtcdClusters(r.namespace).Create(ec)
ec, err = r.etcdCRCli.EtcdV1beta2().EtcdClusters(er.Namespace).Create(ec)
if err != nil {
return fmt.Errorf("failed to create restored EtcdCluster (%s/%s): %v", r.namespace, clusterName, err)
return fmt.Errorf("failed to create restored EtcdCluster (%s/%s): %v", er.Namespace, clusterName, err)
}

err = r.createSeedMember(ec, r.mySvcAddr, clusterName, ec.AsOwner())
err = r.createSeedMember(ec, r.mySvcAddr, er.Namespace, clusterName, ec.AsOwner())
if err != nil {
return fmt.Errorf("failed to create seed member for cluster (%s): %v", clusterName, err)
}

// Retry updating the etcdcluster CR spec.paused=false. The etcd-operator will update the CR once so there needs to be a single retry in case of conflict
err = retryutil.Retry(2, 1, func() (bool, error) {
ec, err = r.etcdCRCli.EtcdV1beta2().EtcdClusters(r.namespace).Get(clusterName, metav1.GetOptions{})
ec, err = r.etcdCRCli.EtcdV1beta2().EtcdClusters(er.Namespace).Get(clusterName, metav1.GetOptions{})
if err != nil {
return false, err
}
ec.Spec.Paused = false
_, err = r.etcdCRCli.EtcdV1beta2().EtcdClusters(r.namespace).Update(ec)
_, err = r.etcdCRCli.EtcdV1beta2().EtcdClusters(er.Namespace).Update(ec)
if err != nil {
if apierrors.IsConflict(err) {
return false, nil
Expand All @@ -210,29 +210,29 @@ func (r *Restore) prepareSeed(er *api.EtcdRestore) (err error) {
return nil
}

func (r *Restore) createSeedMember(ec *api.EtcdCluster, svcAddr, clusterName string, owner metav1.OwnerReference) error {
func (r *Restore) createSeedMember(ec *api.EtcdCluster, svcAddr, namespace string, clusterName string, owner metav1.OwnerReference) error {
m := &etcdutil.Member{
Name: k8sutil.UniqueMemberName(clusterName),
Namespace: r.namespace,
Namespace: namespace,
SecurePeer: ec.Spec.TLS.IsSecurePeer(),
SecureClient: ec.Spec.TLS.IsSecureClient(),
}
ms := etcdutil.NewMemberSet(m)
backupURL := backupapi.BackupURLForRestore("http", svcAddr, clusterName)
backupURL := backupapi.BackupURLForRestore("http", svcAddr, clusterName, namespace)
ec.SetDefaults()
pod := k8sutil.NewSeedMemberPod(clusterName, ms, m, ec.Spec, owner, backupURL)
_, err := r.kubecli.Core().Pods(r.namespace).Create(pod)
_, err := r.kubecli.Core().Pods(ec.Namespace).Create(pod)
return err
}

func (r *Restore) deleteClusterResourcesCompletely(clusterName string) error {
func (r *Restore) deleteClusterResourcesCompletely(namespace string, clusterName string) error {
// Delete etcd pods
err := r.kubecli.Core().Pods(r.namespace).Delete(clusterName, metav1.NewDeleteOptions(0))
err := r.kubecli.Core().Pods(namespace).Delete(clusterName, metav1.NewDeleteOptions(0))
if err != nil && !k8sutil.IsKubernetesResourceNotFoundError(err) {
return fmt.Errorf("failed to delete cluster pods: %v", err)
}

err = r.kubecli.Core().Services(r.namespace).Delete(clusterName, metav1.NewDeleteOptions(0))
err = r.kubecli.Core().Services(namespace).Delete(clusterName, metav1.NewDeleteOptions(0))
if err != nil && !k8sutil.IsKubernetesResourceNotFoundError(err) {
return fmt.Errorf("failed to delete cluster services: %v", err)
}
Expand Down