diff --git a/cmd/restore-operator/main.go b/cmd/restore-operator/main.go
index 15ec8e232..52ee39b79 100644
--- a/cmd/restore-operator/main.go
+++ b/cmd/restore-operator/main.go
@@ -38,8 +38,9 @@ import (
 )
 
 var (
-	namespace string
-	createCRD bool
+	namespace   string
+	createCRD   bool
+	clusterWide bool
 )
 
 const (
@@ -49,6 +50,7 @@
 
 func init() {
 	flag.BoolVar(&createCRD, "create-crd", true, "The restore operator will not create the EtcdRestore CRD when this flag is set to false.")
+	flag.BoolVar(&clusterWide, "cluster-wide", false, "Enable operator to watch clusters in all namespaces")
 	flag.Parse()
 }
 
@@ -119,9 +121,20 @@ func createRecorder(kubecli kubernetes.Interface, name, namespace string) record
 func run(ctx context.Context) {
 	ctx, cancel := context.WithCancel(ctx)
 	defer cancel()
-	c := controller.New(createCRD, namespace, fmt.Sprintf("%s:%d", serviceNameForMyself, servicePortForMyself))
+	c := controller.New(newControllerConfig())
 	err := c.Start(ctx)
 	if err != nil {
 		logrus.Fatalf("etcd restore operator stopped with error: %v", err)
 	}
 }
+
+func newControllerConfig() controller.Config {
+	cfg := controller.Config{
+		Namespace:   namespace,
+		ClusterWide: clusterWide,
+		CreateCRD:   createCRD,
+		MySvcAddr:   fmt.Sprintf("%s.%s:%d", serviceNameForMyself, namespace, servicePortForMyself),
+	}
+
+	return cfg
+}
diff --git a/pkg/backup/backupapi/http.go b/pkg/backup/backupapi/http.go
index 902482ff1..331137727 100644
--- a/pkg/backup/backupapi/http.go
+++ b/pkg/backup/backupapi/http.go
@@ -15,6 +15,7 @@
 package backupapi
 
 import (
+	"fmt"
 	"net/url"
 	"path"
 )
@@ -24,10 +25,11 @@ const (
 )
 
 // BackupURLForRestore creates a URL struct for retrieving an existing backup specified by a restore CR
-func BackupURLForRestore(scheme, host, restoreName string) *url.URL {
+func BackupURLForRestore(scheme, host, restoreName string, namespace string) *url.URL {
 	return &url.URL{
-		Scheme: scheme,
-		Host:   host,
-		Path:   path.Join(APIV1, "backup", restoreName),
+		Scheme:   scheme,
+		Host:     host,
+		Path:     path.Join(APIV1, "backup", restoreName),
+		RawQuery: fmt.Sprintf("namespace=%s", namespace),
 	}
 }
diff --git a/pkg/controller/restore-operator/http.go b/pkg/controller/restore-operator/http.go
index cc20e28fe..479aa361a 100644
--- a/pkg/controller/restore-operator/http.go
+++ b/pkg/controller/restore-operator/http.go
@@ -61,12 +61,15 @@ func (r *Restore) serveBackup(w http.ResponseWriter, req *http.Request) error {
 		return errors.New("restore name is not specified")
 	}
 
+	namespace := req.URL.Query().Get("namespace")
+
 	obj := &api.EtcdRestore{
 		ObjectMeta: metav1.ObjectMeta{
 			Name:      restoreName,
-			Namespace: r.namespace,
+			Namespace: namespace,
 		},
 	}
+
 	v, exists, err := r.indexer.Get(obj)
 	if err != nil {
 		return fmt.Errorf("failed to get restore CR for restore-name (%v): %v", restoreName, err)
@@ -94,7 +97,7 @@ func (r *Restore) serveBackup(w http.ResponseWriter, req *http.Request) error {
 			return errors.New("invalid s3 restore source field (spec.s3), must specify all required subfields")
 		}
 
-		s3Cli, err := s3factory.NewClientFromSecret(r.kubecli, r.namespace, s3RestoreSource.Endpoint, s3RestoreSource.AWSSecret, s3RestoreSource.ForcePathStyle)
+		s3Cli, err := s3factory.NewClientFromSecret(r.kubecli, cr.Namespace, s3RestoreSource.Endpoint, s3RestoreSource.AWSSecret, s3RestoreSource.ForcePathStyle)
 		if err != nil {
 			return fmt.Errorf("failed to create S3 client: %v", err)
 		}
@@ -112,7 +115,7 @@ func (r *Restore) serveBackup(w http.ResponseWriter, req *http.Request) error {
 			return errors.New("invalid abs restore source field (spec.abs), must specify all required subfields")
 		}
 
-		absCli, err := absfactory.NewClientFromSecret(r.kubecli, r.namespace, absRestoreSource.ABSSecret)
+		absCli, err := absfactory.NewClientFromSecret(r.kubecli, cr.Namespace, absRestoreSource.ABSSecret)
 		if err != nil {
 			return fmt.Errorf("failed to create ABS client: %v", err)
 		}
@@ -131,7 +134,7 @@ func (r *Restore) serveBackup(w http.ResponseWriter, req *http.Request) error {
 			return errors.New("invalid gcs restore source field (spec.gcs), must specify all required subfields")
 		}
 
-		gcsCli, err := gcsfactory.NewClientFromSecret(ctx, r.kubecli, r.namespace, gcsRestoreSource.GCPSecret)
+		gcsCli, err := gcsfactory.NewClientFromSecret(ctx, r.kubecli, cr.Namespace, gcsRestoreSource.GCPSecret)
 		if err != nil {
 			return fmt.Errorf("failed to create GCS client: %v", err)
 		}
@@ -149,7 +152,7 @@ func (r *Restore) serveBackup(w http.ResponseWriter, req *http.Request) error {
 			return errors.New("invalid oss restore source field (spec.oss), must specify all required subfields")
 		}
 
-		ossCli, err := ossfactory.NewClientFromSecret(r.kubecli, r.namespace, ossRestoreSource.Endpoint, ossRestoreSource.OSSSecret)
+		ossCli, err := ossfactory.NewClientFromSecret(r.kubecli, cr.Namespace, ossRestoreSource.Endpoint, ossRestoreSource.OSSSecret)
 		if err != nil {
 			return fmt.Errorf("failed to create OSS client: %v", err)
 		}
diff --git a/pkg/controller/restore-operator/operator.go b/pkg/controller/restore-operator/operator.go
index 0bd54ecdf..1ba3e2d4e 100644
--- a/pkg/controller/restore-operator/operator.go
+++ b/pkg/controller/restore-operator/operator.go
@@ -25,6 +25,7 @@ import (
 
 	"github.com/sirupsen/logrus"
 	apiextensionsclient "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/client-go/kubernetes"
 	"k8s.io/client-go/tools/cache"
 	"k8s.io/client-go/util/workqueue"
@@ -47,16 +48,30 @@ type Restore struct {
 	createCRD bool
 }
 
+type Config struct {
+	Namespace   string
+	ClusterWide bool
+	CreateCRD   bool
+	MySvcAddr   string
+}
+
 // New creates a restore operator.
-func New(createCRD bool, namespace, mySvcAddr string) *Restore {
+func New(config Config) *Restore {
+	var ns string
+	if config.ClusterWide {
+		ns = metav1.NamespaceAll
+	} else {
+		ns = config.Namespace
+	}
+
 	return &Restore{
 		logger:     logrus.WithField("pkg", "controller"),
-		namespace:  namespace,
-		mySvcAddr:  mySvcAddr,
+		namespace:  ns,
+		mySvcAddr:  config.MySvcAddr,
 		kubecli:    k8sutil.MustNewKubeClient(),
 		etcdCRCli:  client.MustNewInCluster(),
 		kubeExtCli: k8sutil.MustNewKubeExtClient(),
-		createCRD:  createCRD,
+		createCRD:  config.CreateCRD,
 	}
 }
 
diff --git a/pkg/controller/restore-operator/sync.go b/pkg/controller/restore-operator/sync.go
index 6d59fee7c..88906b1c6 100644
--- a/pkg/controller/restore-operator/sync.go
+++ b/pkg/controller/restore-operator/sync.go
@@ -97,7 +97,7 @@ func (r *Restore) reportStatus(rerr error, er *api.EtcdRestore) {
 	} else {
 		er.Status.Succeeded = true
 	}
-	_, err := r.etcdCRCli.EtcdV1beta2().EtcdRestores(r.namespace).Update(er)
+	_, err := r.etcdCRCli.EtcdV1beta2().EtcdRestores(er.Namespace).Update(er)
 	if err != nil {
 		r.logger.Warningf("failed to update status of restore CR %v : (%v)", er.Name, err)
 	}
@@ -148,21 +148,21 @@ func (r *Restore) prepareSeed(er *api.EtcdRestore) (err error) {
 
 	// Fetch the reference EtcdCluster
 	ecRef := er.Spec.EtcdCluster
-	ec, err := r.etcdCRCli.EtcdV1beta2().EtcdClusters(r.namespace).Get(ecRef.Name, metav1.GetOptions{})
+	ec, err := r.etcdCRCli.EtcdV1beta2().EtcdClusters(er.Namespace).Get(ecRef.Name, metav1.GetOptions{})
 	if err != nil {
-		return fmt.Errorf("failed to get reference EtcdCluster(%s/%s): %v", r.namespace, ecRef.Name, err)
+		return fmt.Errorf("failed to get reference EtcdCluster(%s/%s): %v", er.Namespace, ecRef.Name, err)
 	}
 	if err := ec.Spec.Validate(); err != nil {
 		return fmt.Errorf("invalid cluster spec: %v", err)
 	}
 
 	// Delete reference EtcdCluster
-	err = r.etcdCRCli.EtcdV1beta2().EtcdClusters(r.namespace).Delete(ecRef.Name, &metav1.DeleteOptions{})
+	err = r.etcdCRCli.EtcdV1beta2().EtcdClusters(er.Namespace).Delete(ecRef.Name, &metav1.DeleteOptions{})
 	if err != nil {
-		return fmt.Errorf("failed to delete reference EtcdCluster (%s/%s): %v", r.namespace, ecRef.Name, err)
+		return fmt.Errorf("failed to delete reference EtcdCluster (%s/%s): %v", er.Namespace, ecRef.Name, err)
 	}
 	// Need to delete etcd pods, etc. completely before creating new cluster.
-	r.deleteClusterResourcesCompletely(ecRef.Name)
+	r.deleteClusterResourcesCompletely(er.Namespace, ecRef.Name)
 
 	// Create the restored EtcdCluster with the same metadata and spec as reference EtcdCluster
 	clusterName := ecRef.Name
@@ -178,24 +178,24 @@ func (r *Restore) prepareSeed(er *api.EtcdRestore) (err error) {
 	ec.Spec.Paused = true
 	ec.Status.Phase = api.ClusterPhaseRunning
 
-	ec, err = r.etcdCRCli.EtcdV1beta2().EtcdClusters(r.namespace).Create(ec)
+	ec, err = r.etcdCRCli.EtcdV1beta2().EtcdClusters(er.Namespace).Create(ec)
 	if err != nil {
-		return fmt.Errorf("failed to create restored EtcdCluster (%s/%s): %v", r.namespace, clusterName, err)
+		return fmt.Errorf("failed to create restored EtcdCluster (%s/%s): %v", er.Namespace, clusterName, err)
 	}
 
-	err = r.createSeedMember(ec, r.mySvcAddr, clusterName, ec.AsOwner())
+	err = r.createSeedMember(ec, r.mySvcAddr, er.Namespace, clusterName, ec.AsOwner())
 	if err != nil {
 		return fmt.Errorf("failed to create seed member for cluster (%s): %v", clusterName, err)
 	}
 
 	// Retry updating the etcdcluster CR spec.paused=false. The etcd-operator will update the CR once so there needs to be a single retry in case of conflict
 	err = retryutil.Retry(2, 1, func() (bool, error) {
-		ec, err = r.etcdCRCli.EtcdV1beta2().EtcdClusters(r.namespace).Get(clusterName, metav1.GetOptions{})
+		ec, err = r.etcdCRCli.EtcdV1beta2().EtcdClusters(er.Namespace).Get(clusterName, metav1.GetOptions{})
 		if err != nil {
 			return false, err
 		}
 		ec.Spec.Paused = false
-		_, err = r.etcdCRCli.EtcdV1beta2().EtcdClusters(r.namespace).Update(ec)
+		_, err = r.etcdCRCli.EtcdV1beta2().EtcdClusters(er.Namespace).Update(ec)
 		if err != nil {
 			if apierrors.IsConflict(err) {
 				return false, nil
@@ -210,29 +210,29 @@ func (r *Restore) prepareSeed(er *api.EtcdRestore) (err error) {
 	return nil
 }
 
-func (r *Restore) createSeedMember(ec *api.EtcdCluster, svcAddr, clusterName string, owner metav1.OwnerReference) error {
+func (r *Restore) createSeedMember(ec *api.EtcdCluster, svcAddr, namespace string, clusterName string, owner metav1.OwnerReference) error {
 	m := &etcdutil.Member{
 		Name:         k8sutil.UniqueMemberName(clusterName),
-		Namespace:    r.namespace,
+		Namespace:    namespace,
 		SecurePeer:   ec.Spec.TLS.IsSecurePeer(),
 		SecureClient: ec.Spec.TLS.IsSecureClient(),
 	}
 	ms := etcdutil.NewMemberSet(m)
-	backupURL := backupapi.BackupURLForRestore("http", svcAddr, clusterName)
+	backupURL := backupapi.BackupURLForRestore("http", svcAddr, clusterName, namespace)
 	ec.SetDefaults()
 	pod := k8sutil.NewSeedMemberPod(clusterName, ms, m, ec.Spec, owner, backupURL)
-	_, err := r.kubecli.Core().Pods(r.namespace).Create(pod)
+	_, err := r.kubecli.Core().Pods(ec.Namespace).Create(pod)
 	return err
 }
 
-func (r *Restore) deleteClusterResourcesCompletely(clusterName string) error {
+func (r *Restore) deleteClusterResourcesCompletely(namespace string, clusterName string) error {
 	// Delete etcd pods
-	err := r.kubecli.Core().Pods(r.namespace).Delete(clusterName, metav1.NewDeleteOptions(0))
+	err := r.kubecli.Core().Pods(namespace).Delete(clusterName, metav1.NewDeleteOptions(0))
 	if err != nil && !k8sutil.IsKubernetesResourceNotFoundError(err) {
 		return fmt.Errorf("failed to delete cluster pods: %v", err)
 	}
-	err = r.kubecli.Core().Services(r.namespace).Delete(clusterName, metav1.NewDeleteOptions(0))
+	err = r.kubecli.Core().Services(namespace).Delete(clusterName, metav1.NewDeleteOptions(0))
 	if err != nil && !k8sutil.IsKubernetesResourceNotFoundError(err) {
 		return fmt.Errorf("failed to delete cluster services: %v", err)
 	}