From 76b72c189b1602939ddbbb3e800fe372ce2a4329 Mon Sep 17 00:00:00 2001 From: Scott Seago Date: Mon, 21 Aug 2023 11:36:46 -0400 Subject: [PATCH] Perf improvements for existing resource restore Use informer cache with dynamic client for Get calls on restore When enabled, also make the Get call before create. Add server and install parameter to allow disabling this feature, but enable by default Signed-off-by: Scott Seago --- pkg/client/dynamic.go | 8 + pkg/cmd/cli/install/install.go | 4 + pkg/cmd/server/server.go | 4 + pkg/controller/restore_controller.go | 18 ++- pkg/controller/restore_controller_test.go | 5 + pkg/install/deployment.go | 15 +- pkg/install/deployment_test.go | 7 + pkg/install/resources.go | 3 + pkg/restore/request.go | 17 ++- pkg/restore/restore.go | 169 ++++++++++++++++++++-- pkg/test/api_server.go | 1 + pkg/test/fake_dynamic.go | 7 + 12 files changed, 231 insertions(+), 27 deletions(-) diff --git a/pkg/client/dynamic.go b/pkg/client/dynamic.go index 8fcfab107b9..0e9655b11be 100644 --- a/pkg/client/dynamic.go +++ b/pkg/client/dynamic.go @@ -18,6 +18,7 @@ package client import ( "context" + "time" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" @@ -25,6 +26,7 @@ import ( "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/watch" "k8s.io/client-go/dynamic" + "k8s.io/client-go/dynamic/dynamicinformer" ) // DynamicFactory contains methods for retrieving dynamic clients for GroupVersionResources and @@ -33,6 +35,8 @@ type DynamicFactory interface { // ClientForGroupVersionResource returns a Dynamic client for the given group/version // and resource for the given namespace. ClientForGroupVersionResource(gv schema.GroupVersion, resource metav1.APIResource, namespace string) (Dynamic, error) + // DynamicSharedInformerFactoryForNamespace returns a DynamicSharedInformerFactory for the given namespace. + DynamicSharedInformerFactoryForNamespace(namespace string) dynamicinformer.DynamicSharedInformerFactory } // dynamicFactory implements DynamicFactory. @@ -51,6 +55,10 @@ func (f *dynamicFactory) ClientForGroupVersionResource(gv schema.GroupVersion, r }, nil } +func (f *dynamicFactory) DynamicSharedInformerFactoryForNamespace(namespace string) dynamicinformer.DynamicSharedInformerFactory { + return dynamicinformer.NewFilteredDynamicSharedInformerFactory(f.dynamicClient, time.Minute, namespace, nil) +} + // Creator creates an object. type Creator interface { // Create creates an object. diff --git a/pkg/cmd/cli/install/install.go b/pkg/cmd/cli/install/install.go index 95f32979368..f67a7b9b7f9 100644 --- a/pkg/cmd/cli/install/install.go +++ b/pkg/cmd/cli/install/install.go @@ -79,6 +79,7 @@ type Options struct { Features string DefaultVolumesToFsBackup bool UploaderType string + UseInformerCacheForGet bool } // BindFlags adds command line values to the options struct. @@ -118,6 +119,7 @@ func (o *Options) BindFlags(flags *pflag.FlagSet) { flags.StringVar(&o.Features, "features", o.Features, "Comma separated list of Velero feature flags to be set on the Velero deployment and the node-agent daemonset, if node-agent is enabled") flags.BoolVar(&o.DefaultVolumesToFsBackup, "default-volumes-to-fs-backup", o.DefaultVolumesToFsBackup, "Bool flag to configure Velero server to use pod volume file system backup by default for all volumes on all backups. 
Optional.") flags.StringVar(&o.UploaderType, "uploader-type", o.UploaderType, fmt.Sprintf("The type of uploader to transfer the data of pod volumes, the supported values are '%s', '%s'", uploader.ResticType, uploader.KopiaType)) + flags.BoolVar(&o.UseInformerCacheForGet, "use-informer-cache-for-get", o.UseInformerCacheForGet, "Use informer cache for Get calls on restore. This will speed up restore in cases where there are backup resources which already exist in the cluster, but for very large clusters this will increase velero memory usage. Default is true. Optional.") } // NewInstallOptions instantiates a new, default InstallOptions struct. @@ -144,6 +146,7 @@ func NewInstallOptions() *Options { CRDsOnly: false, DefaultVolumesToFsBackup: false, UploaderType: uploader.KopiaType, + UseInformerCacheForGet: true, } } @@ -206,6 +209,7 @@ func (o *Options) AsVeleroOptions() (*install.VeleroOptions, error) { Features: strings.Split(o.Features, ","), DefaultVolumesToFsBackup: o.DefaultVolumesToFsBackup, UploaderType: o.UploaderType, + UseInformerCacheForGet: o.UseInformerCacheForGet, }, nil } diff --git a/pkg/cmd/server/server.go b/pkg/cmd/server/server.go index 20e14e36861..7d0208b7fec 100644 --- a/pkg/cmd/server/server.go +++ b/pkg/cmd/server/server.go @@ -135,6 +135,7 @@ type serverConfig struct { defaultVolumesToFsBackup bool uploaderType string maxConcurrentK8SConnections int + useInformerCacheForGet bool } func NewCommand(f client.Factory) *cobra.Command { @@ -163,6 +164,7 @@ func NewCommand(f client.Factory) *cobra.Command { defaultVolumesToFsBackup: podvolume.DefaultVolumesToFsBackup, uploaderType: uploader.ResticType, maxConcurrentK8SConnections: defaultMaxConcurrentK8SConnections, + useInformerCacheForGet: true, } ) @@ -233,6 +235,7 @@ func NewCommand(f client.Factory) *cobra.Command { command.Flags().DurationVar(&config.defaultItemOperationTimeout, "default-item-operation-timeout", config.defaultItemOperationTimeout, "How long to wait on asynchronous BackupItemActions and RestoreItemActions to complete before timing out. Default is 4 hours") command.Flags().DurationVar(&config.resourceTimeout, "resource-timeout", config.resourceTimeout, "How long to wait for resource processes which are not covered by other specific timeout parameters. Default is 10 minutes.") command.Flags().IntVar(&config.maxConcurrentK8SConnections, "max-concurrent-k8s-connections", config.maxConcurrentK8SConnections, "Max concurrent connections number that Velero can create with kube-apiserver. Default is 30.") + command.Flags().BoolVar(&config.useInformerCacheForGet, "use-informer-cache-for-get", config.useInformerCacheForGet, "Use informer cache for Get calls on restore. This will speed up restore in cases where there are backup resources which already exist in the cluster, but for very large clusters this will increase velero memory usage. 
Default is true") return command } @@ -933,6 +936,7 @@ func (s *server) runControllers(defaultVolumeSnapshotLocations map[string]string s.metrics, s.config.formatFlag.Parse(), s.config.defaultItemOperationTimeout, + s.config.useInformerCacheForGet, ) if err = r.SetupWithManager(s.mgr); err != nil { diff --git a/pkg/controller/restore_controller.go b/pkg/controller/restore_controller.go index 076baf85cb2..a9bdd40a616 100644 --- a/pkg/controller/restore_controller.go +++ b/pkg/controller/restore_controller.go @@ -101,6 +101,7 @@ type restoreReconciler struct { logFormat logging.Format clock clock.WithTickerAndDelayedExecution defaultItemOperationTimeout time.Duration + useInformerCacheForGet bool newPluginManager func(logger logrus.FieldLogger) clientmgmt.Manager backupStoreGetter persistence.ObjectBackupStoreGetter @@ -123,6 +124,7 @@ func NewRestoreReconciler( metrics *metrics.ServerMetrics, logFormat logging.Format, defaultItemOperationTimeout time.Duration, + useInformerCacheForGet bool, ) *restoreReconciler { r := &restoreReconciler{ ctx: ctx, @@ -135,6 +137,7 @@ func NewRestoreReconciler( logFormat: logFormat, clock: &clock.RealClock{}, defaultItemOperationTimeout: defaultItemOperationTimeout, + useInformerCacheForGet: useInformerCacheForGet, // use variables to refer to these functions so they can be // replaced with fakes for testing. @@ -519,13 +522,14 @@ func (r *restoreReconciler) runValidatedRestore(restore *api.Restore, info backu } restoreReq := &pkgrestore.Request{ - Log: restoreLog, - Restore: restore, - Backup: info.backup, - PodVolumeBackups: podVolumeBackups, - VolumeSnapshots: volumeSnapshots, - BackupReader: backupFile, - ResourceModifiers: resourceModifiers, + Log: restoreLog, + Restore: restore, + Backup: info.backup, + PodVolumeBackups: podVolumeBackups, + VolumeSnapshots: volumeSnapshots, + BackupReader: backupFile, + ResourceModifiers: resourceModifiers, + UseInformerCacheForGet: r.useInformerCacheForGet, } restoreWarnings, restoreErrors := r.restorer.RestoreWithResolvers(restoreReq, actionsResolver, pluginManager) diff --git a/pkg/controller/restore_controller_test.go b/pkg/controller/restore_controller_test.go index 009bccbb51a..ccbe5d4cc9c 100644 --- a/pkg/controller/restore_controller_test.go +++ b/pkg/controller/restore_controller_test.go @@ -114,6 +114,7 @@ func TestFetchBackupInfo(t *testing.T) { metrics.NewServerMetrics(), formatFlag, 60*time.Minute, + true, ) if test.backupStoreError == nil { @@ -191,6 +192,7 @@ func TestProcessQueueItemSkips(t *testing.T) { metrics.NewServerMetrics(), formatFlag, 60*time.Minute, + true, ) _, err := r.Reconcile(context.Background(), ctrl.Request{NamespacedName: types.NamespacedName{ @@ -445,6 +447,7 @@ func TestRestoreReconcile(t *testing.T) { metrics.NewServerMetrics(), formatFlag, 60*time.Minute, + true, ) r.clock = clocktesting.NewFakeClock(now) @@ -616,6 +619,7 @@ func TestValidateAndCompleteWhenScheduleNameSpecified(t *testing.T) { metrics.NewServerMetrics(), formatFlag, 60*time.Minute, + true, ) restore := &velerov1api.Restore{ @@ -708,6 +712,7 @@ func TestValidateAndCompleteWithResourceModifierSpecified(t *testing.T) { metrics.NewServerMetrics(), formatFlag, 60*time.Minute, + true, ) restore := &velerov1api.Restore{ diff --git a/pkg/install/deployment.go b/pkg/install/deployment.go index 22e2e5a4dd0..e7ab8d37989 100644 --- a/pkg/install/deployment.go +++ b/pkg/install/deployment.go @@ -46,6 +46,7 @@ type podTemplateConfig struct { defaultVolumesToFsBackup bool serviceAccountName string uploaderType string + 
useInformerCacheForGet bool } func WithImage(image string) podTemplateOption { @@ -136,6 +137,12 @@ func WithDefaultVolumesToFsBackup() podTemplateOption { } } +func WithUseInformerCacheForGet(useCache bool) podTemplateOption { + return func(c *podTemplateConfig) { + c.useInformerCacheForGet = useCache + } +} + func WithServiceAccountName(sa string) podTemplateOption { return func(c *podTemplateConfig) { c.serviceAccountName = sa @@ -145,7 +152,8 @@ func WithServiceAccountName(sa string) podTemplateOption { func Deployment(namespace string, opts ...podTemplateOption) *appsv1.Deployment { // TODO: Add support for server args c := &podTemplateConfig{ - image: velero.DefaultVeleroImage(), + image: velero.DefaultVeleroImage(), + useInformerCacheForGet: true, } for _, opt := range opts { @@ -167,6 +175,11 @@ func Deployment(namespace string, opts ...podTemplateOption) *appsv1.Deployment args = append(args, "--default-volumes-to-fs-backup=true") } + // default is true, so set if false + if !c.useInformerCacheForGet { + args = append(args, "--use-informer-cache-for-get=false") + } + if len(c.uploaderType) > 0 { args = append(args, fmt.Sprintf("--uploader-type=%s", c.uploaderType)) } diff --git a/pkg/install/deployment_test.go b/pkg/install/deployment_test.go index 05099f19b09..bb8c40a8c76 100644 --- a/pkg/install/deployment_test.go +++ b/pkg/install/deployment_test.go @@ -64,4 +64,11 @@ func TestDeployment(t *testing.T) { deploy = Deployment("velero", WithServiceAccountName("test-sa")) assert.Equal(t, "test-sa", deploy.Spec.Template.Spec.ServiceAccountName) + + deploy = Deployment("velero", WithUseInformerCacheForGet(false)) + assert.Len(t, deploy.Spec.Template.Spec.Containers[0].Args, 2) + assert.Equal(t, "--use-informer-cache-for-get=false", deploy.Spec.Template.Spec.Containers[0].Args[1]) + + deploy = Deployment("velero", WithUseInformerCacheForGet(true)) + assert.Len(t, deploy.Spec.Template.Spec.Containers[0].Args, 1) } diff --git a/pkg/install/resources.go b/pkg/install/resources.go index d7014c22c43..89e0dc33219 100644 --- a/pkg/install/resources.go +++ b/pkg/install/resources.go @@ -243,6 +243,7 @@ type VeleroOptions struct { Features []string DefaultVolumesToFsBackup bool UploaderType string + UseInformerCacheForGet bool } func AllCRDs() *unstructured.UnstructuredList { @@ -343,6 +344,8 @@ func AllResources(o *VeleroOptions) *unstructured.UnstructuredList { deployOpts = append(deployOpts, WithDefaultVolumesToFsBackup()) } + deployOpts = append(deployOpts, WithUseInformerCacheForGet(o.UseInformerCacheForGet)) + deploy := Deployment(o.Namespace, deployOpts...) 
if err := appendUnstructured(resources, deploy); err != nil { diff --git a/pkg/restore/request.go b/pkg/restore/request.go index 8f49395c8c8..99a32b32d5e 100644 --- a/pkg/restore/request.go +++ b/pkg/restore/request.go @@ -51,14 +51,15 @@ func resourceKey(obj runtime.Object) string { type Request struct { *velerov1api.Restore - Log logrus.FieldLogger - Backup *velerov1api.Backup - PodVolumeBackups []*velerov1api.PodVolumeBackup - VolumeSnapshots []*volume.Snapshot - BackupReader io.Reader - RestoredItems map[itemKey]restoredItemStatus - itemOperationsList *[]*itemoperation.RestoreOperation - ResourceModifiers *resourcemodifiers.ResourceModifiers + Log logrus.FieldLogger + Backup *velerov1api.Backup + PodVolumeBackups []*velerov1api.PodVolumeBackup + VolumeSnapshots []*volume.Snapshot + BackupReader io.Reader + RestoredItems map[itemKey]restoredItemStatus + itemOperationsList *[]*itemoperation.RestoreOperation + ResourceModifiers *resourcemodifiers.ResourceModifiers + UseInformerCacheForGet bool } type restoredItemStatus struct { diff --git a/pkg/restore/restore.go b/pkg/restore/restore.go index 2cc9c562242..0ed13011f8c 100644 --- a/pkg/restore/restore.go +++ b/pkg/restore/restore.go @@ -22,6 +22,7 @@ import ( "fmt" "io" "os" + "os/signal" "path/filepath" "sort" "strings" @@ -42,6 +43,8 @@ import ( kubeerrs "k8s.io/apimachinery/pkg/util/errors" "k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/client-go/dynamic/dynamicinformer" + "k8s.io/client-go/informers" corev1 "k8s.io/client-go/kubernetes/typed/core/v1" "k8s.io/client-go/tools/cache" crclient "sigs.k8s.io/controller-runtime/pkg/client" @@ -294,6 +297,8 @@ func (kr *kubernetesRestorer) RestoreWithResolvers( resourceTerminatingTimeout: kr.resourceTerminatingTimeout, resourceTimeout: kr.resourceTimeout, resourceClients: make(map[resourceClientKey]client.Dynamic), + dynamicInformerFactories: make(map[string]*informerFactoryWithContext), + resourceInformers: make(map[resourceClientKey]informers.GenericInformer), restoredItems: req.RestoredItems, renamedPVs: make(map[string]string), pvRenamer: kr.pvRenamer, @@ -307,6 +312,7 @@ func (kr *kubernetesRestorer) RestoreWithResolvers( kbClient: kr.kbClient, itemOperationsList: req.GetItemOperationsList(), resourceModifiers: req.ResourceModifiers, + useInformerCacheForGet: req.UseInformerCacheForGet, } return restoreCtx.execute() @@ -339,6 +345,8 @@ type restoreContext struct { resourceTerminatingTimeout time.Duration resourceTimeout time.Duration resourceClients map[resourceClientKey]client.Dynamic + dynamicInformerFactories map[string]*informerFactoryWithContext + resourceInformers map[resourceClientKey]informers.GenericInformer restoredItems map[itemKey]restoredItemStatus renamedPVs map[string]string pvRenamer func(string) (string, error) @@ -353,6 +361,7 @@ type restoreContext struct { kbClient crclient.Client itemOperationsList *[]*itemoperation.RestoreOperation resourceModifiers *resourcemodifiers.ResourceModifiers + useInformerCacheForGet bool } type resourceClientKey struct { @@ -360,6 +369,12 @@ type resourceClientKey struct { namespace string } +type informerFactoryWithContext struct { + factory dynamicinformer.DynamicSharedInformerFactory + context go_context.Context + cancel go_context.CancelFunc +} + // getOrderedResources returns an ordered list of resource identifiers to restore, // based on the provided resource priorities and backup contents. 
The returned list // begins with all of the high prioritized resources (in order), ends with all of @@ -410,6 +425,17 @@ func (ctx *restoreContext) execute() (results.Result, results.Result) { } }() + // Need to stop all informers if enabled + if ctx.useInformerCacheForGet { + defer func() { + // Call the cancel func to close the channel for each started informer + for _, factory := range ctx.dynamicInformerFactories { + factory.cancel() + } + // After upgrading to client-go 0.27 or newer, also call Shutdown for each informer factory + }() + } + // Need to set this for additionalItems to be restored. ctx.restoreDir = dir @@ -514,6 +540,36 @@ func (ctx *restoreContext) execute() (results.Result, results.Result) { warnings.Merge(&w) errs.Merge(&e) + // initialize informer caches for selected resources if enabled + if ctx.useInformerCacheForGet { + // CRD informer will have already been initialized if any CRDs were created, + // but already-initialized informers aren't re-initialized because getGenericInformer + // looks for an existing one first. + factoriesToStart := make(map[string]*informerFactoryWithContext) + for _, informerResource := range selectedResourceCollection { + gr := schema.ParseGroupResource(informerResource.resource) + for _, items := range informerResource.selectedItemsByNamespace { + // don't use ns key since it represents original ns, not mapped ns + if len(items) == 0 { + continue + } + // use the first item in the list to initialize the informer. The rest of the list + // should share the same gvr and namespace + _, factory, err := ctx.getGenericInformerInternal(gr, items[0].version, items[0].targetNamespace) + if err != nil { + warnings.AddVeleroError(errors.Wrapf(err, "Error initializing informer for %v, %v", gr, items[0].targetNamespace)) + continue + } + if factory != nil { + factoriesToStart[items[0].targetNamespace] = factory + } + } + } + for _, factoryWithContext := range factoriesToStart { + factoryWithContext.factory.WaitForCacheSync(factoryWithContext.context.Done()) + } + } + // reset processedItems and totalItems before processing full resource list processedItems = 0 totalItems = 0 @@ -928,11 +984,14 @@ func (ctx *restoreContext) itemsAvailable(action framework.RestoreItemResolvedAc return available, err } -func (ctx *restoreContext) getResourceClient(groupResource schema.GroupResource, obj *unstructured.Unstructured, namespace string) (client.Dynamic, error) { - key := resourceClientKey{ - resource: groupResource.WithVersion(obj.GroupVersionKind().Version), +func getResourceClientKey(groupResource schema.GroupResource, version, namespace string) resourceClientKey { + return resourceClientKey{ + resource: groupResource.WithVersion(version), namespace: namespace, } +} +func (ctx *restoreContext) getResourceClient(groupResource schema.GroupResource, obj *unstructured.Unstructured, namespace string) (client.Dynamic, error) { + key := getResourceClientKey(groupResource, obj.GroupVersionKind().Version, namespace) if client, ok := ctx.resourceClients[key]; ok { return client, nil @@ -956,6 +1015,55 @@ func (ctx *restoreContext) getResourceClient(groupResource schema.GroupResource, return client, nil } +// if new informer is created, non-nil factory is returned +func (ctx *restoreContext) getGenericInformerInternal(groupResource schema.GroupResource, version, namespace string) (informers.GenericInformer, *informerFactoryWithContext, error) { + var returnFactory *informerFactoryWithContext + + key := getResourceClientKey(groupResource, version, namespace) + 
factoryWithContext, ok := ctx.dynamicInformerFactories[key.namespace] + if !ok { + factory := ctx.dynamicFactory.DynamicSharedInformerFactoryForNamespace(namespace) + informerContext, informerCancel := signal.NotifyContext(go_context.Background(), os.Interrupt) + factoryWithContext = &informerFactoryWithContext{ + factory: factory, + context: informerContext, + cancel: informerCancel, + } + ctx.dynamicInformerFactories[key.namespace] = factoryWithContext + } + informer, ok := ctx.resourceInformers[key] + if !ok { + ctx.log.Infof("[debug] Creating informer for %s in namespace %s", key.resource, key.namespace) + informer = factoryWithContext.factory.ForResource(key.resource) + factoryWithContext.factory.Start(factoryWithContext.context.Done()) + ctx.resourceInformers[key] = informer + returnFactory = factoryWithContext + } + return informer, returnFactory, nil +} + +func (ctx *restoreContext) getGenericInformer(groupResource schema.GroupResource, version, namespace string) (informers.GenericInformer, error) { + informer, factoryWithContext, err := ctx.getGenericInformerInternal(groupResource, version, namespace) + if err != nil { + return nil, err + } + if factoryWithContext != nil { + factoryWithContext.factory.WaitForCacheSync(factoryWithContext.context.Done()) + } + return informer, nil +} +func (ctx *restoreContext) getResourceLister(groupResource schema.GroupResource, obj *unstructured.Unstructured, namespace string) (cache.GenericNamespaceLister, error) { + informer, err := ctx.getGenericInformer(groupResource, obj.GroupVersionKind().Version, namespace) + if err != nil { + return nil, err + } + if namespace == "" { + return informer.Lister(), nil + } else { + return informer.Lister().ByNamespace(namespace), nil + } +} + func getResourceID(groupResource schema.GroupResource, namespace, name string) string { if namespace == "" { return fmt.Sprintf("%s/%s", groupResource.String(), name) @@ -964,6 +1072,23 @@ func getResourceID(groupResource schema.GroupResource, namespace, name string) s return fmt.Sprintf("%s/%s/%s", groupResource.String(), namespace, name) } +func (ctx *restoreContext) getResource(groupResource schema.GroupResource, obj *unstructured.Unstructured, namespace, name string) (*unstructured.Unstructured, error) { + lister, err := ctx.getResourceLister(groupResource, obj, namespace) + if err != nil { + return nil, err + } + clusterObj, err := lister.Get(name) + if err != nil { + return nil, errors.Wrapf(err, "error getting resource from lister for %s, %s/%s", groupResource, namespace, name) + } + u, ok := clusterObj.(*unstructured.Unstructured) + if !ok { + ctx.log.WithError(errors.WithStack(fmt.Errorf("expected *unstructured.Unstructured but got %T", clusterObj))).Error("unable to understand entry returned from client") + return nil, fmt.Errorf("expected *unstructured.Unstructured but got %T", clusterObj) + } + return u, nil +} + func (ctx *restoreContext) restoreItem(obj *unstructured.Unstructured, groupResource schema.GroupResource, namespace string) (results.Result, results.Result, bool) { warnings, errs := results.Result{}, results.Result{} // itemExists bool is used to determine whether to include this item in the "wait for additional items" list @@ -1157,6 +1282,7 @@ func (ctx *restoreContext) restoreItem(obj *unstructured.Unstructured, groupReso ctx.renamedPVs[oldName] = pvName obj.SetName(pvName) + name = pvName // Add the original PV name as an annotation.
annotations := obj.GetAnnotations() @@ -1376,27 +1502,46 @@ func (ctx *restoreContext) restoreItem(obj *unstructured.Unstructured, groupReso } ctx.log.Infof("Attempting to restore %s: %v", obj.GroupVersionKind().Kind, name) - createdObj, restoreErr := resourceClient.Create(obj) - if restoreErr == nil { - itemExists = true - ctx.restoredItems[itemKey] = restoredItemStatus{action: itemRestoreResultCreated, itemExists: itemExists} + + // check if we want to treat the error as a warning, in some cases the creation call might not get executed due to object API validations + // and Velero might not get the already exists error type but in reality the object already exists + var fromCluster, createdObj *unstructured.Unstructured + var restoreErr error + + // only attempt Get before Create if using informer cache, otherwise this will slow down restore into + // new namespace + if ctx.useInformerCacheForGet { + ctx.log.Infof("Checking for existence %s: %v", obj.GroupVersionKind().Kind, name) + fromCluster, err = ctx.getResource(groupResource, obj, namespace, name) } + if err != nil || fromCluster == nil { + // couldn't find the resource, attempt to create + ctx.log.Infof("Creating %s: %v", obj.GroupVersionKind().Kind, name) + createdObj, restoreErr = resourceClient.Create(obj) + if restoreErr == nil { + itemExists = true + ctx.restoredItems[itemKey] = restoredItemStatus{action: itemRestoreResultCreated, itemExists: itemExists} + } + + } + isAlreadyExistsError, err := isAlreadyExistsError(ctx, obj, restoreErr, resourceClient) if err != nil { errs.Add(namespace, err) return warnings, errs, itemExists } - // check if we want to treat the error as a warning, in some cases the creation call might not get executed due to object API validations - // and Velero might not get the already exists error type but in reality the object already exists - var fromCluster *unstructured.Unstructured if restoreErr != nil { // check for the existence of the object in cluster, if no error then it implies that object exists // and if err then we want to judge whether there is an existing error in the previous creation. // if so, we will return the 'get' error. // otherwise, we will return the original creation error. 
- fromCluster, err = resourceClient.Get(name, metav1.GetOptions{}) + if ctx.useInformerCacheForGet { + fromCluster, err = ctx.getResource(groupResource, obj, namespace, name) + } else { + fromCluster, err = resourceClient.Get(name, metav1.GetOptions{}) + } if err != nil && isAlreadyExistsError { ctx.log.Errorf("Error retrieving in-cluster version of %s: %v", kube.NamespaceAndName(obj), err) errs.Add(namespace, err) @@ -1941,6 +2086,7 @@ type restoreableItem struct { path string targetNamespace string name string + version string // used for initializing informer cache } // getOrderedResourceCollection iterates over list of ordered resource @@ -2130,6 +2276,7 @@ func (ctx *restoreContext) getSelectedRestoreableItems(resource, targetNamespace path: itemPath, name: item, targetNamespace: targetNamespace, + version: obj.GroupVersionKind().Version, } restorable.selectedItemsByNamespace[originalNamespace] = append(restorable.selectedItemsByNamespace[originalNamespace], selectedItem) diff --git a/pkg/test/api_server.go b/pkg/test/api_server.go index 75cc545d774..c96f742e8b5 100644 --- a/pkg/test/api_server.go +++ b/pkg/test/api_server.go @@ -59,6 +59,7 @@ func NewAPIServer(t *testing.T) *APIServer { {Group: "velero.io", Version: "v1", Resource: "backups"}: "BackupList", {Group: "extensions", Version: "v1", Resource: "deployments"}: "ExtDeploymentsList", {Group: "velero.io", Version: "v1", Resource: "deployments"}: "VeleroDeploymentsList", + {Group: "velero.io", Version: "v2alpha1", Resource: "datauploads"}: "DataUploadsList", }) discoveryClient = &DiscoveryClient{FakeDiscovery: kubeClient.Discovery().(*discoveryfake.FakeDiscovery)} ) diff --git a/pkg/test/fake_dynamic.go b/pkg/test/fake_dynamic.go index df2b2f6e222..ce91d13d578 100644 --- a/pkg/test/fake_dynamic.go +++ b/pkg/test/fake_dynamic.go @@ -22,6 +22,7 @@ import ( "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/watch" + "k8s.io/client-go/dynamic/dynamicinformer" "github.com/vmware-tanzu/velero/pkg/client" ) @@ -37,6 +38,11 @@ func (df *FakeDynamicFactory) ClientForGroupVersionResource(gv schema.GroupVersi return args.Get(0).(client.Dynamic), args.Error(1) } +func (df *FakeDynamicFactory) DynamicSharedInformerFactoryForNamespace(namespace string) dynamicinformer.DynamicSharedInformerFactory { + args := df.Called(namespace) + return args.Get(0).(dynamicinformer.DynamicSharedInformerFactory) +} + type FakeDynamicClient struct { mock.Mock }
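
The following standalone sketch, separate from the patch itself, illustrates the caching pattern this change relies on: one filtered dynamic shared informer factory per target namespace, started once and synced, with subsequent existence checks served from the informer's lister instead of individual API-server Get calls. It is not Velero code; the kubeconfig path, the namespace "demo-ns", and the object name "example-configmap" are placeholder assumptions.

package main

import (
	"fmt"
	"time"

	"k8s.io/apimachinery/pkg/runtime/schema"
	"k8s.io/client-go/dynamic"
	"k8s.io/client-go/dynamic/dynamicinformer"
	"k8s.io/client-go/tools/clientcmd"
)

func main() {
	// Assumption: a reachable cluster and a kubeconfig at this placeholder path.
	cfg, err := clientcmd.BuildConfigFromFlags("", "/path/to/kubeconfig")
	if err != nil {
		panic(err)
	}
	dynClient, err := dynamic.NewForConfig(cfg)
	if err != nil {
		panic(err)
	}

	namespace := "demo-ns" // placeholder target namespace
	gvr := schema.GroupVersionResource{Version: "v1", Resource: "configmaps"}

	// One factory per target namespace, mirroring the
	// DynamicSharedInformerFactoryForNamespace method added in pkg/client/dynamic.go.
	factory := dynamicinformer.NewFilteredDynamicSharedInformerFactory(dynClient, time.Minute, namespace, nil)

	// ForResource registers the informer; Start launches it; WaitForCacheSync
	// blocks until the initial List/Watch has populated the local cache.
	informer := factory.ForResource(gvr)
	stop := make(chan struct{})
	defer close(stop)
	factory.Start(stop)
	factory.WaitForCacheSync(stop)

	// This Get is served from the in-memory cache, which is cheap to repeat for
	// every item of a large restore, at the cost of holding the namespace's
	// objects in memory (the trade-off the use-informer-cache-for-get flag controls).
	obj, err := informer.Lister().ByNamespace(namespace).Get("example-configmap")
	if err != nil {
		fmt.Println("not in cache; the restore path would fall through to Create:", err)
		return
	}
	fmt.Printf("already present in cluster: %v\n", obj.GetObjectKind().GroupVersionKind())
}

With the default --use-informer-cache-for-get=true, restores into clusters that already contain many of the backed-up resources avoid one API-server round trip per item; passing --use-informer-cache-for-get=false at install or server start keeps the previous per-item Get/Create behavior and lower memory usage on very large clusters.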