diff --git a/changelogs/unreleased/5864-blackpiglet b/changelogs/unreleased/5864-blackpiglet
new file mode 100644
index 0000000000..6bc242d500
--- /dev/null
+++ b/changelogs/unreleased/5864-blackpiglet
@@ -0,0 +1 @@
+Make the restore controller adopt the controller-runtime framework.
\ No newline at end of file
diff --git a/pkg/cmd/server/server.go b/pkg/cmd/server/server.go
index 031e97d171..377646a758
--- a/pkg/cmd/server/server.go
+++ b/pkg/cmd/server/server.go
@@ -27,7 +27,7 @@ import (
 	"strings"
 	"time"

-	"github.com/bombsimon/logrusr/v3"
+	logrusr "github.com/bombsimon/logrusr/v3"
 	snapshotv1api "github.com/kubernetes-csi/external-snapshotter/client/v4/apis/volumesnapshot/v1"
 	snapshotv1client "github.com/kubernetes-csi/external-snapshotter/client/v4/clientset/versioned"
 	snapshotv1informers "github.com/kubernetes-csi/external-snapshotter/client/v4/informers/externalversions"
@@ -47,7 +47,7 @@ import (
 	"k8s.io/client-go/dynamic"
 	"k8s.io/client-go/kubernetes"
 	"k8s.io/client-go/rest"
-	clocks "k8s.io/utils/clock"
+	"k8s.io/utils/clock"
 	ctrl "sigs.k8s.io/controller-runtime"
 	ctrlclient "sigs.k8s.io/controller-runtime/pkg/client"
 	"sigs.k8s.io/controller-runtime/pkg/manager"
@@ -665,50 +665,12 @@ func (s *server) runControllers(defaultVolumeSnapshotLocations map[string]string
 		}
 	}

-	restoreControllerRunInfo := func() controllerRunInfo {
-		restorer, err := restore.NewKubernetesRestorer(
-			s.discoveryHelper,
-			client.NewDynamicFactory(s.dynamicClient),
-			s.config.restoreResourcePriorities,
-			s.kubeClient.CoreV1().Namespaces(),
-			podvolume.NewRestorerFactory(s.repoLocker, s.repoEnsurer, s.veleroClient, s.kubeClient.CoreV1(),
-				s.kubeClient.CoreV1(), s.kubeClient, s.sharedInformerFactory.Velero().V1().BackupRepositories().Informer().HasSynced, s.logger),
-			s.config.podVolumeOperationTimeout,
-			s.config.resourceTerminatingTimeout,
-			s.logger,
-			podexec.NewPodCommandExecutor(s.kubeClientConfig, s.kubeClient.CoreV1().RESTClient()),
-			s.kubeClient.CoreV1().RESTClient(),
-			s.credentialFileStore,
-			s.mgr.GetClient(),
-		)
-		cmd.CheckError(err)
-
-		restoreController := controller.NewRestoreController(
-			s.namespace,
-			s.sharedInformerFactory.Velero().V1().Restores(),
-			restorer,
-			s.mgr.GetClient(),
-			s.logger,
-			s.logLevel,
-			newPluginManager,
-			backupStoreGetter,
-			s.metrics,
-			s.config.formatFlag.Parse(),
-		)
-
-		return controllerRunInfo{
-			controller: restoreController,
-			numWorkers: defaultControllerWorkers,
-		}
-	}
-
 	// By far, PodVolumeBackup, PodVolumeRestore, BackupStorageLocation controllers
 	// are not included in --disable-controllers list.
 	// This is because of PVB and PVR are used by node agent DaemonSet,
 	// and BSL controller is mandatory for Velero to work.
 	enabledControllers := map[string]func() controllerRunInfo{
-		controller.Backup:  backupControllerRunInfo,
-		controller.Restore: restoreControllerRunInfo,
+		controller.Backup: backupControllerRunInfo,
 	}
 	// Note: all runtime type controllers that can be disabled are grouped separately, below:
 	enabledRuntimeControllers := map[string]struct{}{
@@ -719,6 +681,7 @@ func (s *server) runControllers(defaultVolumeSnapshotLocations map[string]string
 		controller.BackupDeletion:    {},
 		controller.GarbageCollection: {},
 		controller.BackupSync:        {},
+		controller.Restore:           {},
 	}

 	if s.config.restoreOnly {
@@ -815,7 +778,7 @@ func (s *server) runControllers(defaultVolumeSnapshotLocations map[string]string
 			s.mgr.GetClient(),
 			s.ctx,
 			s.pluginRegistry,
-			clocks.RealClock{},
+			clock.RealClock{},
 			s.logger,
 		).SetupWithManager(s.mgr); err != nil {
 			s.logger.Fatal(err, "unable to create controller", "controller", controller.ServerStatusRequest)
@@ -825,7 +788,7 @@ func (s *server) runControllers(defaultVolumeSnapshotLocations map[string]string
 	if _, ok := enabledRuntimeControllers[controller.DownloadRequest]; ok {
 		r := controller.NewDownloadRequestReconciler(
 			s.mgr.GetClient(),
-			clocks.RealClock{},
+			clock.RealClock{},
 			newPluginManager,
 			backupStoreGetter,
 			s.logger,
@@ -861,6 +824,43 @@ func (s *server) runControllers(defaultVolumeSnapshotLocations map[string]string
 		}
 	}

+	if _, ok := enabledRuntimeControllers[controller.Restore]; ok {
+		restorer, err := restore.NewKubernetesRestorer(
+			s.discoveryHelper,
+			client.NewDynamicFactory(s.dynamicClient),
+			s.config.restoreResourcePriorities,
+			s.kubeClient.CoreV1().Namespaces(),
+			podvolume.NewRestorerFactory(s.repoLocker, s.repoEnsurer, s.veleroClient, s.kubeClient.CoreV1(),
+				s.kubeClient.CoreV1(), s.kubeClient, s.sharedInformerFactory.Velero().V1().BackupRepositories().Informer().HasSynced, s.logger),
+			s.config.podVolumeOperationTimeout,
+			s.config.resourceTerminatingTimeout,
+			s.logger,
+			podexec.NewPodCommandExecutor(s.kubeClientConfig, s.kubeClient.CoreV1().RESTClient()),
+			s.kubeClient.CoreV1().RESTClient(),
+			s.credentialFileStore,
+			s.mgr.GetClient(),
+		)
+
+		cmd.CheckError(err)
+
+		r := controller.NewRestoreReconciler(
+			s.ctx,
+			s.namespace,
+			restorer,
+			s.mgr.GetClient(),
+			s.logger,
+			s.logLevel,
+			newPluginManager,
+			backupStoreGetter,
+			s.metrics,
+			s.config.formatFlag.Parse(),
+		)
+
+		if err = r.SetupWithManager(s.mgr); err != nil {
+			s.logger.Fatal(err, "unable to create controller", "controller", controller.Restore)
+		}
+	}
+
 	// TODO(2.0): presuming all controllers and resources are converted to runtime-controller
 	// by v2.0, the block from this line and including the `s.mgr.Start()` will be
 	// deprecated, since the manager auto-starts all the caches. Until then, we need to
 	// start the caches explicitly.
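For orientation, this is the general controller-runtime wiring the server changes above move to: a reconciler registered with the shared manager instead of a `controllerRunInfo` worker loop. A minimal sketch under assumed names — `exampleReconciler` is hypothetical and not part of this PR:

```go
// Package example is a minimal sketch of the registration pattern above.
package example

import (
	"context"

	ctrl "sigs.k8s.io/controller-runtime"
	"sigs.k8s.io/controller-runtime/pkg/client"

	api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1"
)

type exampleReconciler struct {
	kbClient client.Client
}

// Reconcile is called by the manager's work queue for each watched object;
// returning an error re-enqueues the request with backoff.
func (r *exampleReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
	restore := &api.Restore{}
	if err := r.kbClient.Get(ctx, req.NamespacedName, restore); err != nil {
		return ctrl.Result{}, err
	}
	return ctrl.Result{}, nil
}

// SetupWithManager registers the reconciler; the manager starts the shared
// caches and the controller's workers when mgr.Start is called, which is why
// no explicit worker count or informer wiring appears in server.go anymore.
func (r *exampleReconciler) SetupWithManager(mgr ctrl.Manager) error {
	return ctrl.NewControllerManagedBy(mgr).
		For(&api.Restore{}).
		Complete(r)
}
```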
diff --git a/pkg/controller/restore_controller.go b/pkg/controller/restore_controller.go
index b154d1e5cf..18a87bd618
--- a/pkg/controller/restore_controller.go
+++ b/pkg/controller/restore_controller.go
@@ -34,14 +34,13 @@ import (
 	"k8s.io/apimachinery/pkg/labels"
 	"k8s.io/apimachinery/pkg/types"
 	"k8s.io/apimachinery/pkg/util/sets"
-	"k8s.io/client-go/tools/cache"
-	clocks "k8s.io/utils/clock"
+	"k8s.io/apimachinery/pkg/util/wait"
+	"k8s.io/utils/clock"
+	ctrl "sigs.k8s.io/controller-runtime"
 	"sigs.k8s.io/controller-runtime/pkg/client"

 	"github.com/vmware-tanzu/velero/internal/hook"
 	api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1"
-	velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1"
-	velerov1informers "github.com/vmware-tanzu/velero/pkg/generated/informers/externalversions/velero/v1"
 	"github.com/vmware-tanzu/velero/pkg/label"
 	"github.com/vmware-tanzu/velero/pkg/metrics"
 	"github.com/vmware-tanzu/velero/pkg/persistence"
@@ -85,24 +84,30 @@ var nonRestorableResources = []string{
 	"backuprepositories.velero.io",
 }

-type restoreController struct {
-	*genericController
-
+type restoreReconciler struct {
+	ctx               context.Context
 	namespace         string
 	restorer          pkgrestore.Restorer
 	kbClient          client.Client
 	restoreLogLevel   logrus.Level
+	logger            logrus.FieldLogger
 	metrics           *metrics.ServerMetrics
 	logFormat         logging.Format
-	clock             clocks.WithTickerAndDelayedExecution
+	clock             clock.WithTickerAndDelayedExecution
 	newPluginManager  func(logger logrus.FieldLogger) clientmgmt.Manager
 	backupStoreGetter persistence.ObjectBackupStoreGetter
 }

-func NewRestoreController(
+type backupInfo struct {
+	backup      *api.Backup
+	location    *api.BackupStorageLocation
+	backupStore persistence.BackupStore
+}
+
+func NewRestoreReconciler(
+	ctx context.Context,
 	namespace string,
-	restoreInformer velerov1informers.RestoreInformer,
 	restorer pkgrestore.Restorer,
 	kbClient client.Client,
 	logger logrus.FieldLogger,
@@ -111,16 +116,17 @@ func NewRestoreController(
 	backupStoreGetter persistence.ObjectBackupStoreGetter,
 	metrics *metrics.ServerMetrics,
 	logFormat logging.Format,
-) Interface {
-	c := &restoreController{
-		genericController: newGenericController(Restore, logger),
-		namespace:         namespace,
-		restorer:          restorer,
-		kbClient:          kbClient,
-		restoreLogLevel:   restoreLogLevel,
-		metrics:           metrics,
-		logFormat:         logFormat,
-		clock:             &clocks.RealClock{},
+) *restoreReconciler {
+	r := &restoreReconciler{
+		ctx:             ctx,
+		namespace:       namespace,
+		restorer:        restorer,
+		kbClient:        kbClient,
+		logger:          logger,
+		restoreLogLevel: restoreLogLevel,
+		metrics:         metrics,
+		logFormat:       logFormat,
+		clock:           &clock.RealClock{},

 		// use variables to refer to these functions so they can be
 		// replaced with fakes for testing.
@@ -128,95 +134,28 @@ func NewRestoreController(
 		backupStoreGetter: backupStoreGetter,
 	}

-	c.syncHandler = c.processQueueItem
-	c.resyncFunc = c.resync
-	c.resyncPeriod = time.Minute
-
-	// restore informer cannot be removed, until restore controller adopt the controller-runtime framework.
-	restoreInformer.Informer().AddEventHandler(
-		cache.ResourceEventHandlerFuncs{
-			AddFunc: func(obj interface{}) {
-				restore := obj.(*api.Restore)
-
-				switch restore.Status.Phase {
-				case "", api.RestorePhaseNew:
-					// only process new restores
-				default:
-					c.logger.WithFields(logrus.Fields{
-						"restore": kubeutil.NamespaceAndName(restore),
-						"phase":   restore.Status.Phase,
-					}).Debug("Restore is not new, skipping")
-					return
-				}
-
-				key, err := cache.MetaNamespaceKeyFunc(restore)
-				if err != nil {
-					c.logger.WithError(errors.WithStack(err)).WithField("restore", restore).Error("Error creating queue key, item not added to queue")
-					return
-				}
-				c.queue.Add(key)
-			},
-		},
-	)
-
-	return c
-}
-
-func (c *restoreController) resync() {
-	restoreList := &velerov1api.RestoreList{}
-	err := c.kbClient.List(context.Background(), restoreList, &client.ListOptions{})
-	if err != nil {
-		c.logger.Error(err, "Error computing restore_total metric")
-	} else {
-		c.metrics.SetRestoreTotal(int64(len(restoreList.Items)))
-	}
-}
-
-func (c *restoreController) processQueueItem(key string) error {
-	log := c.logger.WithField("key", key)
+	// Move the periodic backup and restore metrics computation from the controllers to here.
+	// After adopting controller-runtime, the controllers no longer have the resync timer the
+	// old generic controller provided, the backup and restore controllers each have only a
+	// single work queue, and a backup or restore can run for a long time, so computing the
+	// metric here is a better choice.
+	r.updateTotalRestoreMetric()

-	log.Debug("Running processQueueItem")
-	ns, name, err := cache.SplitMetaNamespaceKey(key)
-	if err != nil {
-		log.WithError(err).Error("unable to process queue item: error splitting queue key")
-		// Return nil here so we don't try to process the key any more
-		return nil
-	}
-
-	log.Debug("Getting Restore")
-	restore := &velerov1api.Restore{}
-	err = c.kbClient.Get(context.Background(), client.ObjectKey{
-		Namespace: ns,
-		Name:      name,
-	}, restore)
-	if err != nil {
-		return errors.Wrap(err, "error getting Restore")
-	}
-
-	// TODO I think this is now unnecessary. We only initially place
-	// item with Phase = ("" | New) into the queue. Items will only get
-	// re-queued if syncHandler returns an error, which will only
-	// happen if there's an error updating Phase from its initial
-	// state to something else. So any time it's re-queued it will
-	// still have its initial state, which we've already confirmed
-	// is ("" | New)
-	switch restore.Status.Phase {
-	case "", api.RestorePhaseNew:
-		// only process new restores
-	default:
-		return nil
-	}
-
-	// Deep-copy the restore so the copy from the lister is not modified.
-	// Any errors returned by processRestore will be bubbled up, meaning
-	// the key will be re-enqueued by the controller.
-	return c.processRestore(restore.DeepCopy())
+	return r
 }

-func (c *restoreController) processRestore(restore *api.Restore) error {
+func (r *restoreReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
 	// Developer note: any error returned by this method will
 	// cause the restore to be re-enqueued and re-processed by
 	// the controller.
+	log := r.logger.WithField("Restore", req.NamespacedName.String())
+
+	restore := &api.Restore{}
+	err := r.kbClient.Get(ctx, client.ObjectKey{Namespace: req.Namespace, Name: req.Name}, restore)
+	if err != nil {
+		log.Infof("Failed to get restore %s: %s", req.NamespacedName.String(), err.Error())
+		return ctrl.Result{}, err
+	}

 	// store a copy of the original restore for creating patch
 	original := restore.DeepCopy()
@@ -225,67 +164,86 @@ func (c *restoreController) processRestore(restore *api.Restore) error {
 	// manager used here is not the same one used by c.runValidatedRestore,
 	// since within that function we want the plugin manager to log to
 	// our per-restore log (which is instantiated within c.runValidatedRestore).
-	pluginManager := c.newPluginManager(c.logger)
+	pluginManager := r.newPluginManager(r.logger)
 	defer pluginManager.CleanupClients()

-	info := c.validateAndComplete(restore, pluginManager)
+	info := r.validateAndComplete(restore, pluginManager)

 	// Register attempts after validation so we don't have to fetch the backup multiple times
 	backupScheduleName := restore.Spec.ScheduleName
-	c.metrics.RegisterRestoreAttempt(backupScheduleName)
+	r.metrics.RegisterRestoreAttempt(backupScheduleName)

 	if len(restore.Status.ValidationErrors) > 0 {
 		restore.Status.Phase = api.RestorePhaseFailedValidation
-		c.metrics.RegisterRestoreValidationFailed(backupScheduleName)
+		r.metrics.RegisterRestoreValidationFailed(backupScheduleName)
 	} else {
-		restore.Status.StartTimestamp = &metav1.Time{Time: c.clock.Now()}
+		restore.Status.StartTimestamp = &metav1.Time{Time: r.clock.Now()}
 		restore.Status.Phase = api.RestorePhaseInProgress
 	}

 	// patch to update status and persist to API
-	err := kubeutil.PatchResource(original, restore, c.kbClient)
+	err = kubeutil.PatchResource(original, restore, r.kbClient)
 	if err != nil {
 		// return the error so the restore can be re-processed; it's currently
 		// still in phase = New.
-		return errors.Wrapf(err, "error updating Restore phase to %s", restore.Status.Phase)
+		log.Errorf("Failed to update restore %s status to %s: %s",
+			req.NamespacedName.String(), restore.Status.Phase, err.Error())
+		return ctrl.Result{}, errors.Wrapf(err, "error updating Restore phase to %s", restore.Status.Phase)
 	}

 	// store ref to just-updated item for creating patch
 	original = restore.DeepCopy()

 	if restore.Status.Phase == api.RestorePhaseFailedValidation {
-		return nil
+		return ctrl.Result{}, nil
 	}

-	if err := c.runValidatedRestore(restore, info); err != nil {
-		c.logger.WithError(err).Debug("Restore failed")
+	if err := r.runValidatedRestore(restore, info); err != nil {
+		log.WithError(err).Debug("Restore failed")
 		restore.Status.Phase = api.RestorePhaseFailed
 		restore.Status.FailureReason = err.Error()
-		c.metrics.RegisterRestoreFailed(backupScheduleName)
+		r.metrics.RegisterRestoreFailed(backupScheduleName)
 	} else if restore.Status.Errors > 0 {
-		c.logger.Debug("Restore partially failed")
+		log.Debug("Restore partially failed")
 		restore.Status.Phase = api.RestorePhasePartiallyFailed
-		c.metrics.RegisterRestorePartialFailure(backupScheduleName)
+		r.metrics.RegisterRestorePartialFailure(backupScheduleName)
 	} else {
-		c.logger.Debug("Restore completed")
+		log.Debug("Restore completed")
 		restore.Status.Phase = api.RestorePhaseCompleted
-		c.metrics.RegisterRestoreSuccess(backupScheduleName)
+		r.metrics.RegisterRestoreSuccess(backupScheduleName)
 	}

-	restore.Status.CompletionTimestamp = &metav1.Time{Time: c.clock.Now()}
-	c.logger.Debug("Updating restore's final status")
-	if err = kubeutil.PatchResource(original, restore, c.kbClient); err != nil {
-		c.logger.WithError(errors.WithStack(err)).Info("Error updating restore's final status")
+	restore.Status.CompletionTimestamp = &metav1.Time{Time: r.clock.Now()}
+	log.Debug("Updating restore's final status")
+	if err = kubeutil.PatchResource(original, restore, r.kbClient); err != nil {
+		log.WithError(errors.WithStack(err)).Info("Error updating restore's final status")
+		// No need to re-enqueue here, because the restore's phase was already set to InProgress.
+		// The controller only processes New restores.
 	}

-	return nil
+	return ctrl.Result{}, nil
 }

-type backupInfo struct {
-	backup      *api.Backup
-	location    *velerov1api.BackupStorageLocation
-	backupStore persistence.BackupStore
+func (r *restoreReconciler) SetupWithManager(mgr ctrl.Manager) error {
+	return ctrl.NewControllerManagedBy(mgr).
+		WithEventFilter(kubeutil.NewCreateEventPredicate(func(obj client.Object) bool {
+			restore := obj.(*api.Restore)
+
+			switch restore.Status.Phase {
+			case "", api.RestorePhaseNew:
+				// only process new restores
+				return true
+			default:
+				r.logger.WithFields(logrus.Fields{
+					"restore": kubeutil.NamespaceAndName(restore),
+					"phase":   restore.Status.Phase,
+				}).Debug("Restore is not new, skipping")
+				return false
+			}
+		})).
+		For(&api.Restore{}).
+		Complete(r)
 }

-func (c *restoreController) validateAndComplete(restore *api.Restore, pluginManager clientmgmt.Manager) backupInfo {
+func (r *restoreReconciler) validateAndComplete(restore *api.Restore, pluginManager clientmgmt.Manager) backupInfo {
 	// add non-restorable resources to restore's excluded resources
 	excludedResources := sets.NewString(restore.Spec.ExcludedResources...)
 	for _, nonrestorable := range nonRestorableResources {
@@ -345,11 +303,11 @@ func (c *restoreController) validateAndComplete(restore *api.Restore, pluginMana
 	// the schedule
 	if restore.Spec.ScheduleName != "" {
 		selector := labels.SelectorFromSet(labels.Set(map[string]string{
-			velerov1api.ScheduleNameLabel: restore.Spec.ScheduleName,
+			api.ScheduleNameLabel: restore.Spec.ScheduleName,
 		}))

-		backupList := &velerov1api.BackupList{}
-		c.kbClient.List(context.Background(), backupList, &client.ListOptions{
+		backupList := &api.BackupList{}
+		r.kbClient.List(context.Background(), backupList, &client.ListOptions{
 			LabelSelector: selector,
 		})
 		if err != nil {
@@ -368,7 +326,7 @@ func (c *restoreController) validateAndComplete(restore *api.Restore, pluginMana
 		}
 	}

-	info, err := c.fetchBackupInfo(restore.Spec.BackupName, pluginManager)
+	info, err := r.fetchBackupInfo(restore.Spec.BackupName, pluginManager)
 	if err != nil {
 		restore.Status.ValidationErrors = append(restore.Status.ValidationErrors, fmt.Sprintf("Error retrieving backup: %v", err))
 		return backupInfo{}
@@ -376,7 +334,7 @@ func (c *restoreController) validateAndComplete(restore *api.Restore, pluginMana
 	}

 	// Fill in the ScheduleName so it's easier to consume for metrics.
 	if restore.Spec.ScheduleName == "" {
-		restore.Spec.ScheduleName = info.backup.GetLabels()[velerov1api.ScheduleNameLabel]
+		restore.Spec.ScheduleName = info.backup.GetLabels()[api.ScheduleNameLabel]
 	}

 	return info
@@ -423,22 +381,22 @@ func mostRecentCompletedBackup(backups []api.Backup) api.Backup {
 // fetchBackupInfo checks the backup lister for a backup that matches the given name. If it doesn't
 // find it, it returns an error.
-func (c *restoreController) fetchBackupInfo(backupName string, pluginManager clientmgmt.Manager) (backupInfo, error) {
-	backup := &velerov1api.Backup{}
-	err := c.kbClient.Get(context.Background(), types.NamespacedName{Namespace: c.namespace, Name: backupName}, backup)
+func (r *restoreReconciler) fetchBackupInfo(backupName string, pluginManager clientmgmt.Manager) (backupInfo, error) {
+	backup := &api.Backup{}
+	err := r.kbClient.Get(context.Background(), types.NamespacedName{Namespace: r.namespace, Name: backupName}, backup)
 	if err != nil {
 		return backupInfo{}, err
 	}

-	location := &velerov1api.BackupStorageLocation{}
-	if err := c.kbClient.Get(context.Background(), client.ObjectKey{
-		Namespace: c.namespace,
+	location := &api.BackupStorageLocation{}
+	if err := r.kbClient.Get(context.Background(), client.ObjectKey{
+		Namespace: r.namespace,
 		Name:      backup.Spec.StorageLocation,
 	}, location); err != nil {
 		return backupInfo{}, errors.WithStack(err)
 	}

-	backupStore, err := c.backupStoreGetter.Get(location, pluginManager, c.logger)
+	backupStore, err := r.backupStoreGetter.Get(location, pluginManager, r.logger)
 	if err != nil {
 		return backupInfo{}, err
 	}
@@ -454,16 +412,16 @@ func (c *restoreController) fetchBackupInfo(backupName string, pluginManager cli
 // The log and results files are uploaded to backup storage. Any error returned from this function
 // means that the restore failed. This function updates the restore API object with warning and error
 // counts, but *does not* update its phase or patch it via the API.
-func (c *restoreController) runValidatedRestore(restore *api.Restore, info backupInfo) error {
+func (r *restoreReconciler) runValidatedRestore(restore *api.Restore, info backupInfo) error {
 	// instantiate the per-restore logger that will output both to a temp file
 	// (for upload to object storage) and to stdout.
-	restoreLog, err := newRestoreLogger(restore, c.restoreLogLevel, c.logFormat)
+	restoreLog, err := newRestoreLogger(restore, r.restoreLogLevel, r.logFormat)
 	if err != nil {
 		return err
 	}
-	defer restoreLog.closeAndRemove(c.logger)
+	defer restoreLog.closeAndRemove(r.logger)

-	pluginManager := c.newPluginManager(restoreLog)
+	pluginManager := r.newPluginManager(restoreLog)
 	defer pluginManager.CleanupClients()

 	actions, err := pluginManager.GetRestoreItemActionsV2()
@@ -482,16 +440,16 @@ func (c *restoreController) runValidatedRestore(restore *api.Restore, info backu
 	if err != nil {
 		return errors.Wrap(err, "error downloading backup")
 	}
-	defer closeAndRemoveFile(backupFile, c.logger)
+	defer closeAndRemoveFile(backupFile, r.logger)

 	listOpts := &client.ListOptions{
 		LabelSelector: labels.Set(map[string]string{
-			velerov1api.BackupNameLabel: label.GetValidName(restore.Spec.BackupName),
+			api.BackupNameLabel: label.GetValidName(restore.Spec.BackupName),
 		}).AsSelector(),
 	}

-	podVolumeBackupList := &velerov1api.PodVolumeBackupList{}
-	err = c.kbClient.List(context.TODO(), podVolumeBackupList, listOpts)
+	podVolumeBackupList := &api.PodVolumeBackupList{}
+	err = r.kbClient.List(context.TODO(), podVolumeBackupList, listOpts)
 	if err != nil {
 		restoreLog.Errorf("Fail to list PodVolumeBackup :%s", err.Error())
 		return errors.WithStack(err)
@@ -504,7 +462,7 @@ func (c *restoreController) runValidatedRestore(restore *api.Restore, info backu
 	restoreLog.Info("starting restore")

-	var podVolumeBackups []*velerov1api.PodVolumeBackup
+	var podVolumeBackups []*api.PodVolumeBackup
 	for i := range podVolumeBackupList.Items {
 		podVolumeBackups = append(podVolumeBackups, &podVolumeBackupList.Items[i])
 	}
@@ -516,7 +474,7 @@ func (c *restoreController) runValidatedRestore(restore *api.Restore, info backu
 		VolumeSnapshots: volumeSnapshots,
 		BackupReader:    backupFile,
 	}
-	restoreWarnings, restoreErrors := c.restorer.RestoreWithResolvers(restoreReq, actionsResolver, snapshotItemResolver,
+	restoreWarnings, restoreErrors := r.restorer.RestoreWithResolvers(restoreReq, actionsResolver, snapshotItemResolver,
 		pluginManager)

 	// log errors and warnings to the restore log
@@ -546,12 +504,12 @@ func (c *restoreController) runValidatedRestore(restore *api.Restore, info backu
 	// re-instantiate the backup store because credentials could have changed since the original
 	// instantiation, if this was a long-running restore
-	info.backupStore, err = c.backupStoreGetter.Get(info.location, pluginManager, c.logger)
+	info.backupStore, err = r.backupStoreGetter.Get(info.location, pluginManager, r.logger)
 	if err != nil {
 		return errors.Wrap(err, "error setting up backup store to persist log and results files")
 	}

-	if logReader, err := restoreLog.done(c.logger); err != nil {
+	if logReader, err := restoreLog.done(r.logger); err != nil {
 		restoreErrors.Velero = append(restoreErrors.Velero, fmt.Sprintf("error getting restore log reader: %v", err))
 	} else {
 		if err := info.backupStore.PutRestoreLog(restore.Spec.BackupName, restore.Name, logReader); err != nil {
@@ -578,12 +536,35 @@ func (c *restoreController) runValidatedRestore(restore *api.Restore, info backu
 	}

 	if err := putResults(restore, m, info.backupStore); err != nil {
-		c.logger.WithError(err).Error("Error uploading restore results to backup storage")
+		r.logger.WithError(err).Error("Error uploading restore results to backup storage")
 	}

 	return nil
 }

+// updateTotalRestoreMetric updates the velero_restore_total metric every minute.
+func (r *restoreReconciler) updateTotalRestoreMetric() {
+	go func() {
+		// Wait for 5 seconds to let controller-runtime set up the k8s clients.
+		time.Sleep(5 * time.Second)
+
+		wait.NonSlidingUntil(
+			func() {
+				// recompute restore_total metric
+				restoreList := &api.RestoreList{}
+				err := r.kbClient.List(context.Background(), restoreList, &client.ListOptions{})
+				if err != nil {
+					r.logger.Error(err, "Error computing restore_total metric")
+				} else {
+					r.metrics.SetRestoreTotal(int64(len(restoreList.Items)))
+				}
+			},
+			1*time.Minute,
+			r.ctx.Done(),
+		)
+	}()
+}
+
 func putResults(restore *api.Restore, results map[string]results.Result, backupStore persistence.BackupStore) error {
 	buf := new(bytes.Buffer)
 	gzw := gzip.NewWriter(buf)
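For reference, `wait.NonSlidingUntil` from `k8s.io/apimachinery/pkg/util/wait` runs the given function immediately and then once per period — the period is not stretched by the function's own run time, unlike the sliding `wait.Until` — stopping when the channel closes. A self-contained sketch of the pattern `updateTotalRestoreMetric` relies on; the helper name and timings are illustrative only:

```go
// Package example sketches the periodic-loop pattern used above.
package example

import (
	"context"
	"fmt"
	"time"

	"k8s.io/apimachinery/pkg/util/wait"
)

func runPeriodically(ctx context.Context) {
	ticks := 0
	wait.NonSlidingUntil(
		func() {
			// runs immediately, then once per period
			ticks++
			fmt.Printf("tick %d\n", ticks)
		},
		time.Second, // the restore controller uses 1*time.Minute
		ctx.Done(),  // closing this channel ends the loop
	)
}

func demo() {
	ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
	defer cancel()
	runPeriodically(ctx) // prints roughly three ticks, then returns
}
```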
diff --git a/pkg/controller/restore_controller_test.go b/pkg/controller/restore_controller_test.go
index ede8c85b5d..334f912493
--- a/pkg/controller/restore_controller_test.go
+++ b/pkg/controller/restore_controller_test.go
@@ -29,14 +29,13 @@ import (
 	"github.com/stretchr/testify/mock"
 	"github.com/stretchr/testify/require"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
-	"k8s.io/client-go/tools/cache"
+	"k8s.io/apimachinery/pkg/types"
 	clocktesting "k8s.io/utils/clock/testing"
+	ctrl "sigs.k8s.io/controller-runtime"
 	"sigs.k8s.io/controller-runtime/pkg/client"

 	velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1"
 	"github.com/vmware-tanzu/velero/pkg/builder"
-	"github.com/vmware-tanzu/velero/pkg/generated/clientset/versioned/fake"
-	informers "github.com/vmware-tanzu/velero/pkg/generated/informers/externalversions"
 	"github.com/vmware-tanzu/velero/pkg/metrics"
 	persistencemocks "github.com/vmware-tanzu/velero/pkg/persistence/mocks"
 	"github.com/vmware-tanzu/velero/pkg/plugin/clientmgmt"
@@ -91,21 +90,19 @@ func TestFetchBackupInfo(t *testing.T) {
 	for _, test := range tests {
 		t.Run(test.name, func(t *testing.T) {
 			var (
-				client          = fake.NewSimpleClientset()
-				fakeClient      = velerotest.NewFakeControllerRuntimeClient(t)
-				restorer        = &fakeRestorer{kbClient: fakeClient}
-				sharedInformers = informers.NewSharedInformerFactory(client, 0)
-				logger          = velerotest.NewLogger()
-				pluginManager   = &pluginmocks.Manager{}
-				backupStore     = &persistencemocks.BackupStore{}
+				fakeClient    = velerotest.NewFakeControllerRuntimeClient(t)
+				restorer      = &fakeRestorer{kbClient: fakeClient}
+				logger        = velerotest.NewLogger()
+				pluginManager = &pluginmocks.Manager{}
+				backupStore   = &persistencemocks.BackupStore{}
 			)

 			defer restorer.AssertExpectations(t)
 			defer backupStore.AssertExpectations(t)

-			c := NewRestoreController(
+			r := NewRestoreReconciler(
+				context.Background(),
 				velerov1api.DefaultNamespace,
-				sharedInformers.Velero().V1().Restores(),
 				restorer,
 				fakeClient,
 				logger,
@@ -114,15 +111,15 @@ func TestFetchBackupInfo(t *testing.T) {
 				NewFakeSingleObjectBackupStoreGetter(backupStore),
 				metrics.NewServerMetrics(),
 				formatFlag,
-			).(*restoreController)
+			)

 			if test.backupStoreError == nil {
 				for _, itm := range test.informerLocations {
-					require.NoError(t, c.kbClient.Create(context.Background(), itm))
+					require.NoError(t, r.kbClient.Create(context.Background(), itm))
 				}

 				for _, itm := range test.informerBackups {
-					assert.NoError(t, c.kbClient.Create(context.Background(), itm))
+					assert.NoError(t, r.kbClient.Create(context.Background(), itm))
 				}
 			}

@@ -139,7 +136,7 @@ func TestFetchBackupInfo(t *testing.T) {
 				backupStore.On("GetBackupMetadata", test.backupName).Return(test.backupStoreBackup, nil).Maybe()
 			}

-			info, err := c.fetchBackupInfo(test.backupName, pluginManager)
+			info, err := r.fetchBackupInfo(test.backupName, pluginManager)

 			require.Equal(t, test.expectedErr, err != nil)
 			if test.expectedRes != nil {
@@ -152,34 +149,23 @@ func TestFetchBackupInfo(t *testing.T) {
 func TestProcessQueueItemSkips(t *testing.T) {
 	tests := []struct {
 		name        string
-		restoreKey  string
+		namespace   string
+		restoreName string
 		restore     *velerov1api.Restore
 		expectError bool
 	}{
 		{
-			name:       "invalid key returns error",
-			restoreKey: "invalid/key/value",
+			name:        "invalid key returns error",
+			namespace:   "invalid",
+			restoreName: "key/value",
+			expectError: true,
 		},
 		{
 			name:        "missing restore returns error",
-			restoreKey:  "foo/bar",
+			namespace:   "foo",
+			restoreName: "bar",
 			expectError: true,
 		},
-		{
-			name:       "restore with phase InProgress does not get processed",
-			restoreKey: "foo/bar",
-			restore:    builder.ForRestore("foo", "bar").Phase(velerov1api.RestorePhaseInProgress).Result(),
-		},
-		{
-			name:       "restore with phase Completed does not get processed",
-			restoreKey: "foo/bar",
-			restore:    builder.ForRestore("foo", "bar").Phase(velerov1api.RestorePhaseCompleted).Result(),
-		},
-		{
-			name:       "restore with phase FailedValidation does not get processed",
-			restoreKey: "foo/bar",
-			restore:    builder.ForRestore("foo", "bar").Phase(velerov1api.RestorePhaseFailedValidation).Result(),
-		},
 	}

 	formatFlag := logging.FormatText

@@ -187,16 +173,18 @@ func TestProcessQueueItemSkips(t *testing.T) {
 	for _, test := range tests {
 		t.Run(test.name, func(t *testing.T) {
 			var (
-				client          = fake.NewSimpleClientset()
-				fakeClient      = velerotest.NewFakeControllerRuntimeClient(t)
-				restorer        = &fakeRestorer{kbClient: fakeClient}
-				sharedInformers = informers.NewSharedInformerFactory(client, 0)
-				logger          = velerotest.NewLogger()
+				fakeClient = velerotest.NewFakeControllerRuntimeClient(t)
+				restorer   = &fakeRestorer{kbClient: fakeClient}
+				logger     = velerotest.NewLogger()
 			)

-			c := NewRestoreController(
+			if test.restore != nil {
+				assert.Nil(t, fakeClient.Create(context.Background(), test.restore))
+			}
+
+			r := NewRestoreReconciler(
+				context.Background(),
 				velerov1api.DefaultNamespace,
-				sharedInformers.Velero().V1().Restores(),
 				restorer,
 				fakeClient,
 				logger,
@@ -205,20 +193,19 @@ func TestProcessQueueItemSkips(t *testing.T) {
 				nil, // backupStoreGetter
 				metrics.NewServerMetrics(),
 				formatFlag,
-			).(*restoreController)
-
-			if test.restore != nil {
-				c.kbClient.Create(context.Background(), test.restore)
-			}
+			)

-			err := c.processQueueItem(test.restoreKey)
+			_, err := r.Reconcile(context.Background(), ctrl.Request{NamespacedName: types.NamespacedName{
+				Namespace: test.namespace,
+				Name:      test.restoreName,
+			}})

 			assert.Equal(t, test.expectError, err != nil)
 		})
 	}
 }

-func TestProcessQueueItem(t *testing.T) {
+func TestRestoreReconcile(t *testing.T) {

 	defaultStorageLocation := builder.ForBackupStorageLocation("velero", "default").Provider("myCloud").Bucket("bucket").Result()
@@ -410,13 +397,11 @@ func TestRestoreReconcile(t *testing.T) {
 	for _, test := range tests {
 		t.Run(test.name, func(t *testing.T) {
 			var (
-				client          = fake.NewSimpleClientset()
-				fakeClient      = velerotest.NewFakeControllerRuntimeClientBuilder(t).Build()
-				restorer        = &fakeRestorer{kbClient: fakeClient}
-				sharedInformers = informers.NewSharedInformerFactory(client, 0)
-				logger          = velerotest.NewLogger()
-				pluginManager   = &pluginmocks.Manager{}
-				backupStore     = &persistencemocks.BackupStore{}
+				fakeClient    = velerotest.NewFakeControllerRuntimeClientBuilder(t).Build()
+				restorer      = &fakeRestorer{kbClient: fakeClient}
+				logger        = velerotest.NewLogger()
+				pluginManager = &pluginmocks.Manager{}
+				backupStore   = &persistencemocks.BackupStore{}
 			)

 			defer restorer.AssertExpectations(t)
@@ -426,9 +411,9 @@ func TestProcessQueueItem(t *testing.T) {
 				defaultStorageLocation.ObjectMeta.ResourceVersion = ""
 			}()

-			c := NewRestoreController(
+			r := NewRestoreReconciler(
+				context.Background(),
 				velerov1api.DefaultNamespace,
-				sharedInformers.Velero().V1().Restores(),
 				restorer,
 				fakeClient,
 				logger,
@@ -437,18 +422,18 @@ func TestProcessQueueItem(t *testing.T) {
 				NewFakeSingleObjectBackupStoreGetter(backupStore),
 				metrics.NewServerMetrics(),
 				formatFlag,
-			).(*restoreController)
+			)

-			c.clock = clocktesting.NewFakeClock(now)
+			r.clock = clocktesting.NewFakeClock(now)

 			if test.location != nil {
-				require.NoError(t, fakeClient.Create(context.Background(), test.location))
+				require.NoError(t, r.kbClient.Create(context.Background(), test.location))
 			}
 			if test.backup != nil {
-				assert.NoError(t, c.kbClient.Create(context.Background(), test.backup))
+				assert.NoError(t, r.kbClient.Create(context.Background(), test.backup))
 			}

 			if test.restore != nil {
-				require.NoError(t, c.kbClient.Create(context.Background(), test.restore))
+				require.NoError(t, r.kbClient.Create(context.Background(), test.restore))
 			}

 			var warnings, errors results.Result
@@ -479,17 +464,6 @@ func TestRestoreReconcile(t *testing.T) {
 				backupStore.On("GetBackupVolumeSnapshots", test.backup.Name).Return(volumeSnapshots, nil)
 			}

-			var (
-				key = test.restoreKey
-				err error
-			)
-			if key == "" && test.restore != nil {
-				key, err = cache.MetaNamespaceKeyFunc(test.restore)
-				if err != nil {
-					panic(err)
-				}
-			}
-
 			if test.backupStoreGetBackupMetadataErr != nil {
 				// TODO why do I need .Maybe() here?
 				backupStore.On("GetBackupMetadata", test.restore.Spec.BackupName).Return(nil, test.backupStoreGetBackupMetadataErr).Maybe()
@@ -506,7 +480,10 @@ func TestRestoreReconcile(t *testing.T) {
 				pluginManager.On("CleanupClients")
 			}

-			err = c.processQueueItem(key)
+			_, err := r.Reconcile(context.Background(), ctrl.Request{NamespacedName: types.NamespacedName{
+				Namespace: test.restore.Namespace,
+				Name:      test.restore.Name,
+			}})

 			assert.Equal(t, test.expectedErr, err != nil, "got error %v", err)

@@ -590,26 +568,24 @@ func TestValidateAndCompleteWhenScheduleNameSpecified(t *testing.T) {
 	formatFlag := logging.FormatText

 	var (
-		client          = fake.NewSimpleClientset()
-		sharedInformers = informers.NewSharedInformerFactory(client, 0)
-		logger          = velerotest.NewLogger()
-		pluginManager   = &pluginmocks.Manager{}
-		fakeClient      = velerotest.NewFakeControllerRuntimeClient(t)
-		backupStore     = &persistencemocks.BackupStore{}
+		logger        = velerotest.NewLogger()
+		pluginManager = &pluginmocks.Manager{}
+		fakeClient    = velerotest.NewFakeControllerRuntimeClient(t)
+		backupStore   = &persistencemocks.BackupStore{}
 	)

-	c := NewRestoreController(
+	r := NewRestoreReconciler(
+		context.Background(),
 		velerov1api.DefaultNamespace,
-		sharedInformers.Velero().V1().Restores(),
 		nil,
 		fakeClient,
 		logger,
 		logrus.DebugLevel,
 		func(logrus.FieldLogger) clientmgmt.Manager { return pluginManager },
 		NewFakeSingleObjectBackupStoreGetter(backupStore),
-		nil,
+		metrics.NewServerMetrics(),
 		formatFlag,
-	).(*restoreController)
+	)

 	restore := &velerov1api.Restore{
 		ObjectMeta: metav1.ObjectMeta{
@@ -622,19 +598,18 @@ func TestValidateAndCompleteWhenScheduleNameSpecified(t *testing.T) {
 	}

 	// no backups created from the schedule: fail validation
-	require.NoError(t, sharedInformers.Velero().V1().Backups().Informer().GetStore().Add(
-		defaultBackup().
-			ObjectMeta(builder.WithLabels(velerov1api.ScheduleNameLabel, "non-matching-schedule")).
-			Phase(velerov1api.BackupPhaseCompleted).
-			Result(),
-	))
+	require.NoError(t, r.kbClient.Create(context.Background(), defaultBackup().
+		ObjectMeta(builder.WithLabels(velerov1api.ScheduleNameLabel, "non-matching-schedule")).
+		Phase(velerov1api.BackupPhaseCompleted).
+		Result()))

-	c.validateAndComplete(restore, pluginManager)
+	r.validateAndComplete(restore, pluginManager)
 	assert.Contains(t, restore.Status.ValidationErrors, "No backups found for schedule")
 	assert.Empty(t, restore.Spec.BackupName)

 	// no completed backups created from the schedule: fail validation
-	require.NoError(t, sharedInformers.Velero().V1().Backups().Informer().GetStore().Add(
+	require.NoError(t, r.kbClient.Create(
+		context.Background(),
 		defaultBackup().
 			ObjectMeta(
 				builder.WithName("backup-2"),
@@ -644,14 +619,14 @@ func TestValidateAndCompleteWhenScheduleNameSpecified(t *testing.T) {
 			Result(),
 	))

-	c.validateAndComplete(restore, pluginManager)
+	r.validateAndComplete(restore, pluginManager)
 	assert.Contains(t, restore.Status.ValidationErrors, "No completed backups found for schedule")
 	assert.Empty(t, restore.Spec.BackupName)

 	// multiple completed backups created from the schedule: use most recent
 	now := time.Now()

-	require.NoError(t, c.kbClient.Create(context.Background(),
+	require.NoError(t, r.kbClient.Create(context.Background(),
 		defaultBackup().
 			ObjectMeta(
 				builder.WithName("foo"),
@@ -664,7 +639,7 @@ func TestValidateAndCompleteWhenScheduleNameSpecified(t *testing.T) {
 	))

 	location := builder.ForBackupStorageLocation("velero", "default").Provider("myCloud").Bucket("bucket").Result()
-	require.NoError(t, c.kbClient.Create(context.Background(), location))
+	require.NoError(t, r.kbClient.Create(context.Background(), location))

 	restore = &velerov1api.Restore{
 		ObjectMeta: metav1.ObjectMeta{
@@ -675,7 +650,7 @@ func TestValidateAndCompleteWhenScheduleNameSpecified(t *testing.T) {
 			ScheduleName: "schedule-1",
 		},
 	}
-	c.validateAndComplete(restore, pluginManager)
+	r.validateAndComplete(restore, pluginManager)
 	assert.Nil(t, restore.Status.ValidationErrors)
 	assert.Equal(t, "foo", restore.Spec.BackupName)
 }
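The test changes above all follow the same shape: instead of composing a `namespace/name` queue key for `processQueueItem`, the tests now seed a fake client and call `Reconcile` directly with a `ctrl.Request`. A compilable sketch of that pattern — `noopReconciler` is a hypothetical stand-in for the unexported `restoreReconciler`:

```go
// Package example sketches the reconciler-driven test pattern.
package example

import (
	"context"
	"testing"

	"github.com/stretchr/testify/assert"
	"k8s.io/apimachinery/pkg/types"
	ctrl "sigs.k8s.io/controller-runtime"
)

type noopReconciler struct{}

func (n *noopReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
	// a real reconciler would fetch req.NamespacedName through its client here
	return ctrl.Result{}, nil
}

func TestReconcileSketch(t *testing.T) {
	r := &noopReconciler{}

	// the request replaces the old "namespace/name" string queue key
	_, err := r.Reconcile(context.Background(), ctrl.Request{
		NamespacedName: types.NamespacedName{Namespace: "velero", Name: "r1"},
	})
	assert.NoError(t, err)
}
```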
diff --git a/pkg/util/kube/predicate.go b/pkg/util/kube/predicate.go
index 7bda388689..9ac9d9894e
--- a/pkg/util/kube/predicate.go
+++ b/pkg/util/kube/predicate.go
@@ -115,3 +115,23 @@ func (f FalsePredicate) Update(event.UpdateEvent) bool {
 func (f FalsePredicate) Generic(event.GenericEvent) bool {
 	return false
 }
+
+// NewCreateEventPredicate returns a predicate that passes create events to
+// the given filter function and discards all update, delete, and generic
+// events.
+func NewCreateEventPredicate(f func(client.Object) bool) predicate.Predicate {
+	return predicate.Funcs{
+		CreateFunc: func(event event.CreateEvent) bool {
+			return f(event.Object)
+		},
+		DeleteFunc: func(event event.DeleteEvent) bool {
+			return false
+		},
+		GenericFunc: func(event event.GenericEvent) bool {
+			return false
+		},
+		UpdateFunc: func(event event.UpdateEvent) bool {
+			return false
+		},
+	}
+}
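A usage sketch (not part of the diff) showing what the new helper does: only create events reach the wrapped function; update, delete, and generic events are always filtered out. The `demo` wrapper and the phase check are illustrative:

```go
// Package example demonstrates the create-only predicate added above.
package example

import (
	"fmt"

	"sigs.k8s.io/controller-runtime/pkg/client"
	"sigs.k8s.io/controller-runtime/pkg/event"

	velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1"
	kubeutil "github.com/vmware-tanzu/velero/pkg/util/kube"
)

func demo() {
	pred := kubeutil.NewCreateEventPredicate(func(obj client.Object) bool {
		// accept only restores that have no phase set yet
		return obj.(*velerov1api.Restore).Status.Phase == ""
	})

	restore := &velerov1api.Restore{}
	fmt.Println(pred.Create(event.CreateEvent{Object: restore}))                        // true
	fmt.Println(pred.Update(event.UpdateEvent{ObjectOld: restore, ObjectNew: restore})) // false
}
```

Combined with `WithEventFilter` in `SetupWithManager`, this reproduces the old informer behavior of enqueueing only new restores, without hand-rolled event handlers.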