Skip to content

Commit

Permalink
Use restmapper to identify scope of the object
Browse files Browse the repository at this point in the history
Modify multinamespaced cache to accept restmapper, which
can be used to identify the scope of the object and handle
the cluster scoped objects accordingly.
  • Loading branch information
varshaprasad96 committed Apr 26, 2021
1 parent be66476 commit 6a42f89
Show file tree
Hide file tree
Showing 3 changed files with 101 additions and 81 deletions.
66 changes: 33 additions & 33 deletions pkg/cache/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -494,39 +494,39 @@ func CacheTest(createCacheFunc func(config *rest.Config, opts cache.Options) (ca
err := informerCache.Get(context.Background(), svcKey, svc)
Expect(err).To(HaveOccurred())
})
// It("test multinamespaced cache for cluster scoped resources", func() {
// By("creating a multinamespaced cache to watch specific namespaces")
// multi := cache.MultiNamespacedCacheBuilder([]string{"default", testNamespaceOne})
// m, err := multi(cfg, cache.Options{})
// Expect(err).NotTo(HaveOccurred())

// By("running the cache and waiting it for sync")
// go func() {
// defer GinkgoRecover()
// Expect(m.Start(informerCacheCtx)).To(Succeed())
// }()
// Expect(m.WaitForCacheSync(informerCacheCtx)).NotTo(BeFalse())

// By("should be able to fetch cluster scoped resource")
// node := &kcorev1.Node{}

// By("verifying that getting the node works with an empty namespace")
// key1 := client.ObjectKey{Namespace: "", Name: testNodeOne}
// Expect(m.Get(context.Background(), key1, node)).To(Succeed())

// By("verifying if the cluster scoped resources are not duplicated")
// nodeList := &unstructured.UnstructuredList{}
// nodeList.SetGroupVersionKind(schema.GroupVersionKind{
// Group: "",
// Version: "v1",
// Kind: "NodeList",
// })
// Expect(m.List(context.Background(), nodeList)).To(Succeed())

// By("verifying the node list is not empty")
// Expect(nodeList.Items).NotTo(BeEmpty())
// Expect(len(nodeList.Items)).To(BeEquivalentTo(1))
// })
It("test multinamespaced cache for cluster scoped resources", func() {
By("creating a multinamespaced cache to watch specific namespaces")
multi := cache.MultiNamespacedCacheBuilder([]string{"default", testNamespaceOne})
m, err := multi(cfg, cache.Options{})
Expect(err).NotTo(HaveOccurred())

By("running the cache and waiting it for sync")
go func() {
defer GinkgoRecover()
Expect(m.Start(informerCacheCtx)).To(Succeed())
}()
Expect(m.WaitForCacheSync(informerCacheCtx)).NotTo(BeFalse())

By("should be able to fetch cluster scoped resource")
node := &kcorev1.Node{}

By("verifying that getting the node works with an empty namespace")
key1 := client.ObjectKey{Namespace: "", Name: testNodeOne}
Expect(m.Get(context.Background(), key1, node)).To(Succeed())

By("verifying if the cluster scoped resources are not duplicated")
nodeList := &unstructured.UnstructuredList{}
nodeList.SetGroupVersionKind(schema.GroupVersionKind{
Group: "",
Version: "v1",
Kind: "NodeList",
})
Expect(m.List(context.Background(), nodeList)).To(Succeed())

By("verifying the node list is not empty")
Expect(nodeList.Items).NotTo(BeEmpty())
Expect(len(nodeList.Items)).To(BeEquivalentTo(1))
})
})
Context("with metadata-only objects", func() {
It("should be able to list objects that haven't been watched previously", func() {
Expand Down
72 changes: 24 additions & 48 deletions pkg/cache/multi_namespace_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"k8s.io/client-go/rest"
toolscache "k8s.io/client-go/tools/cache"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/internal/objectutil"
)

// NewCacheFunc - Function for creating a new cache from the options and a rest config
Expand Down Expand Up @@ -134,13 +135,16 @@ func (c *multiNamespaceCache) IndexField(ctx context.Context, obj client.Object,
}

func (c *multiNamespaceCache) Get(ctx context.Context, key client.ObjectKey, obj client.Object) error {
// gvk := obj.GetObjectKind().GroupVersionKind()
// mapping, _ := c.RESTMapper.RESTMapping(gvk.GroupKind(), gvk.Version)
// if mapping.Scope.Name() == meta.RESTScopeNameRoot {
// // Look into the global cache to fetch the object
// cache := c.namespaceToCache[globalCache]
// return cache.Get(ctx, key, obj)
// }
isNamespaced, err := objectutil.IsNamespacedObject(obj, c.Scheme, c.RESTMapper)
if err != nil {
return err
}

if !isNamespaced {
// Look into the global cache to fetch the object
cache := c.namespaceToCache[globalCache]
return cache.Get(ctx, key, obj)
}

cache, ok := c.namespaceToCache[key.Namespace]
if !ok {
Expand All @@ -154,32 +158,23 @@ func (c *multiNamespaceCache) List(ctx context.Context, list client.ObjectList,
listOpts := client.ListOptions{}
listOpts.ApplyOptions(opts)

// handle cluster scoped objects by looking into global cache
// gvk := list.GetObjectKind().GroupVersionKind()
// mapping, _ := c.RESTMapper.RESTMapping(gvk.GroupKind(), gvk.Version)
// if mapping.Scope.Name() == meta.RESTScopeNameRoot {
// // Look at the global cache to get the objects with the specified GVK
// cache := c.namespaceToCache[globalCache]
// err := cache.List(ctx, list, opts...)
// if err != nil {
// return err
// }
// }
isNamespaced, err := objectutil.IsNamespacedObject(list, c.Scheme, c.RESTMapper)
if err != nil {
return err
}

if !isNamespaced {
// Look at the global cache to get the objects with the specified GVK
cache := c.namespaceToCache[globalCache]
return cache.List(ctx, list, opts...)
}

if listOpts.Namespace != corev1.NamespaceAll {
cache, ok := c.namespaceToCache[listOpts.Namespace]
if !ok {
return fmt.Errorf("unable to get: %v because of unknown namespace for the cache", listOpts.Namespace)
}
err := cache.List(ctx, list, opts...)
if err != nil {
return err
}
items, err := apimeta.ExtractList(list)
if err != nil {
return err
}
uniqueItems := removeDuplicates(items)
return apimeta.SetList(list, uniqueItems)
return cache.List(ctx, list, opts...)
}

listAccessor, err := meta.ListAccessor(list)
Expand Down Expand Up @@ -210,28 +205,9 @@ func (c *multiNamespaceCache) List(ctx context.Context, list client.ObjectList,
// The last list call should have the most correct resource version.
resourceVersion = accessor.GetResourceVersion()
}

uniqueItems := removeDuplicates(allItems)
listAccessor.SetResourceVersion(resourceVersion)

return apimeta.SetList(list, uniqueItems)
}

// removeDuplicates removes the duplicate objects obtained from all namespaces so that
// the resulting list has objects with unique name and namespace.
func removeDuplicates(items []runtime.Object) []runtime.Object {
objects := make(map[string]bool)
unique := []runtime.Object{}

for _, obj := range items {
metaObj, _ := meta.Accessor(obj)
key := metaObj.GetName() + " " + metaObj.GetNamespace()
if _, value := objects[key]; !value {
objects[key] = true
unique = append(unique, obj)
}
}
return unique
return apimeta.SetList(list, allItems)
}

// multiNamespaceInformer knows how to handle interacting with the underlying informer across multiple namespaces
Expand Down
44 changes: 44 additions & 0 deletions pkg/internal/objectutil/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,16 @@ limitations under the License.
package objectutil

import (
"errors"
"fmt"

"k8s.io/apimachinery/pkg/api/meta"
apimeta "k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"sigs.k8s.io/controller-runtime/pkg/client/apiutil"
)

// FilterWithLabels returns a copy of the items in objs matching labelSel
Expand All @@ -40,3 +47,40 @@ func FilterWithLabels(objs []runtime.Object, labelSel labels.Selector) ([]runtim
}
return outItems, nil
}

// IsNamespacedObject returns true if the object is namespace scoped.
// For unstructured objects the gvk is found from the object itself.
func IsNamespacedObject(obj runtime.Object, scheme *runtime.Scheme, restmapper apimeta.RESTMapper) (bool, error) {
var gvk schema.GroupVersionKind
var err error

_, isUnstructured := obj.(*unstructured.Unstructured)
_, isUnstructuredList := obj.(*unstructured.UnstructuredList)

isUnstructured = isUnstructured || isUnstructuredList

if isUnstructured {
gvk = obj.GetObjectKind().GroupVersionKind()
} else {
gvk, err = apiutil.GVKForObject(obj, scheme)
if err != nil {
return false, err
}
}

restmapping, err := restmapper.RESTMapping(schema.GroupKind{Group: gvk.Group, Kind: gvk.Kind})
if err != nil {
return false, fmt.Errorf("failed to get restmapping: %w", err)
}

scope := restmapping.Scope.Name()

if scope == "" {
return false, errors.New("Scope cannot be identified. Empty scope returned")
}

if scope != meta.RESTScopeNameRoot {
return true, nil
}
return false, nil
}

0 comments on commit 6a42f89

Please sign in to comment.