Skip to content

Commit

Permalink
Add SelectorsByObject option to cache
Browse files Browse the repository at this point in the history
Controller-Runtime controllers use a cache to subscribe to events from
Kubernetes objects and to read those objects more efficiently by avoiding
to call out to the API. This cache is backed by Kubernetes informers.

The only way to filter this cache is by namespace and resource type.
In cases where a controller is only interested in a small subset of objects
(for example all pods on a node), this might end up not being efficient enough.

This change increase the "pkg/cache" interface adding the
"BuildWithOptins" function and the "Options.SelectorsByObject" option.

The SelectorsByObject restricts the cache's ListWatch to the desired
fields per GVK at the specified object, the map's value must implement
Selector [1] using for example a Set [2]

This is the implementation of the design document [3]

[1] https://pkg.go.dev/k8s.io/apimachinery/pkg/fields#Selector
[2] https://pkg.go.dev/k8s.io/apimachinery/pkg/fields#Set
[3] https://github.com/kubernetes-sigs/controller-runtime/blob/master/designs/use-selectors-at-cache.md

Signed-off-by: Quique Llorente <[email protected]>
  • Loading branch information
qinqon committed Apr 27, 2021
1 parent b2c90ab commit cd065bf
Show file tree
Hide file tree
Showing 7 changed files with 232 additions and 19 deletions.
2 changes: 2 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
SHELL:=/usr/bin/env bash
.DEFAULT_GOAL:=help

export WHAT ?= ./...

# Use GOPROXY environment variable if set
GOPROXY := $(shell go env GOPROXY)
ifeq ($(GOPROXY),)
Expand Down
2 changes: 1 addition & 1 deletion hack/test-all.sh
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ source $(dirname ${BASH_SOURCE})/common.sh

header_text "running go test"

go test -race ${MOD_OPT} ./...
go test -race ${MOD_OPT} ${WHAT}

if [[ -n ${ARTIFACTS:-} ]]; then
if grep -Rin '<failure type="Failure">' ${ARTIFACTS}/*; then exit 1; fi
Expand Down
52 changes: 51 additions & 1 deletion pkg/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,9 @@ type Informer interface {
HasSynced() bool
}

// SelectorsByObject associate a client.Object's GVK to a field/label selector
type SelectorsByObject map[client.Object]internal.Selector

// Options are the optional arguments for creating a new InformersMap object
type Options struct {
// Scheme is the scheme to use for mapping objects to GroupVersionKinds
Expand All @@ -103,6 +106,13 @@ type Options struct {
// Namespace restricts the cache's ListWatch to the desired namespace
// Default watches all namespaces
Namespace string

// SelectorsByObject restricts the cache's ListWatch to the desired
// fields per GVK at the specified object, the map's value must implement
// Selector [1] using for example a Set [2]
// [1] https://pkg.go.dev/k8s.io/apimachinery/pkg/fields#Selector
// [2] https://pkg.go.dev/k8s.io/apimachinery/pkg/fields#Set
SelectorsByObject SelectorsByObject
}

var defaultResyncTime = 10 * time.Hour
Expand All @@ -113,10 +123,38 @@ func New(config *rest.Config, opts Options) (Cache, error) {
if err != nil {
return nil, err
}
im := internal.NewInformersMap(config, opts.Scheme, opts.Mapper, *opts.Resync, opts.Namespace)
selectorsByGVK, err := convertToSelectorsByGVK(opts.SelectorsByObject, opts.Scheme)
if err != nil {
return nil, err
}
im := internal.NewInformersMap(config, opts.Scheme, opts.Mapper, *opts.Resync, opts.Namespace, selectorsByGVK)
return &informerCache{InformersMap: im}, nil
}

// BuilderWithOptions returns a Cache constructor that will build the a cache
// honoring the options argument, this is useful to specify options like
// SelectorsByObject
// WARNING: if SelectorsByObject is specified. filtered out resources are not
// returned.
func BuilderWithOptions(options Options) NewCacheFunc {
return func(config *rest.Config, opts Options) (Cache, error) {
if opts.Scheme == nil {
opts.Scheme = options.Scheme
}
if opts.Mapper == nil {
opts.Mapper = options.Mapper
}
if opts.Resync == nil {
opts.Resync = options.Resync
}
if opts.Namespace == "" {
opts.Namespace = options.Namespace
}
opts.SelectorsByObject = options.SelectorsByObject
return New(config, opts)
}
}

func defaultOpts(config *rest.Config, opts Options) (Options, error) {
// Use the default Kubernetes Scheme if unset
if opts.Scheme == nil {
Expand All @@ -139,3 +177,15 @@ func defaultOpts(config *rest.Config, opts Options) (Options, error) {
}
return opts, nil
}

func convertToSelectorsByGVK(selectorsByObject SelectorsByObject, scheme *runtime.Scheme) (internal.SelectorsByGVK, error) {
selectorsByGVK := internal.SelectorsByGVK{}
for object, selector := range selectorsByObject {
gvk, err := apiutil.GVKForObject(object, scheme)
if err != nil {
return nil, err
}
selectorsByGVK[gvk] = selector
}
return selectorsByGVK, nil
}
131 changes: 124 additions & 7 deletions pkg/cache/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,15 @@ import (
"fmt"

. "github.com/onsi/ginkgo"
. "github.com/onsi/ginkgo/extensions/table"
. "github.com/onsi/gomega"

kcorev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
kmetav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime/schema"
kscheme "k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/rest"
Expand All @@ -42,15 +46,17 @@ const testNamespaceThree = "test-namespace-3"

// TODO(community): Pull these helper functions into testenv.
// Restart policy is included to allow indexing on that field.
func createPod(name, namespace string, restartPolicy kcorev1.RestartPolicy) client.Object {
func createPodWithLabels(name, namespace string, restartPolicy kcorev1.RestartPolicy, labels map[string]string) client.Object {
three := int64(3)
if labels == nil {
labels = map[string]string{}
}
labels["test-label"] = name
pod := &kcorev1.Pod{
ObjectMeta: kmetav1.ObjectMeta{
Name: name,
Namespace: namespace,
Labels: map[string]string{
"test-label": name,
},
Labels: labels,
},
Spec: kcorev1.PodSpec{
Containers: []kcorev1.Container{{Name: "nginx", Image: "nginx"}},
Expand All @@ -65,6 +71,10 @@ func createPod(name, namespace string, restartPolicy kcorev1.RestartPolicy) clie
return pod
}

func createPod(name, namespace string, restartPolicy kcorev1.RestartPolicy) client.Object {
return createPodWithLabels(name, namespace, restartPolicy, nil)
}

func deletePod(pod client.Object) {
cl, err := client.New(cfg, client.Options{})
Expect(err).NotTo(HaveOccurred())
Expand Down Expand Up @@ -110,8 +120,8 @@ func CacheTest(createCacheFunc func(config *rest.Config, opts cache.Options) (ca
// Includes restart policy since these objects are indexed on this field.
knownPod1 = createPod("test-pod-1", testNamespaceOne, kcorev1.RestartPolicyNever)
knownPod2 = createPod("test-pod-2", testNamespaceTwo, kcorev1.RestartPolicyAlways)
knownPod3 = createPod("test-pod-3", testNamespaceTwo, kcorev1.RestartPolicyOnFailure)
knownPod4 = createPod("test-pod-4", testNamespaceThree, kcorev1.RestartPolicyNever)
knownPod3 = createPodWithLabels("test-pod-3", testNamespaceTwo, kcorev1.RestartPolicyOnFailure, map[string]string{"common-label": "common"})
knownPod4 = createPodWithLabels("test-pod-4", testNamespaceThree, kcorev1.RestartPolicyNever, map[string]string{"common-label": "common"})
podGVK := schema.GroupVersionKind{
Kind: "Pod",
Version: "v1",
Expand Down Expand Up @@ -284,6 +294,7 @@ func CacheTest(createCacheFunc func(config *rest.Config, opts cache.Options) (ca
Expect(err).To(HaveOccurred())
Expect(errors.IsTimeout(err)).To(BeTrue())
})

})
Context("with unstructured objects", func() {
It("should be able to list objects that haven't been watched previously", func() {
Expand Down Expand Up @@ -709,6 +720,113 @@ func CacheTest(createCacheFunc func(config *rest.Config, opts cache.Options) (ca
Expect(err).To(HaveOccurred())
})
})
type selectorsTestCase struct {
fieldSelectors map[string]string
labelSelectors map[string]string
expectedPods []string
}
DescribeTable(" and cache with selectors", func(tc selectorsTestCase) {
By("creating the cache")
builder := cache.BuilderWithOptions(
cache.Options{
SelectorsByObject: cache.SelectorsByObject{
&kcorev1.Pod{}: {
Label: labels.Set(tc.labelSelectors).AsSelector(),
Field: fields.Set(tc.fieldSelectors).AsSelector(),
},
},
},
)
informer, err := builder(cfg, cache.Options{})
Expect(err).NotTo(HaveOccurred())

By("running the cache and waiting for it to sync")
go func() {
defer GinkgoRecover()
Expect(informer.Start(informerCacheCtx)).To(Succeed())
}()
Expect(informer.WaitForCacheSync(informerCacheCtx)).NotTo(BeFalse())

By("Checking with structured")
obtainedStructuredPodList := kcorev1.PodList{}
Expect(informer.List(context.Background(), &obtainedStructuredPodList)).To(Succeed())
Expect(obtainedStructuredPodList.Items).Should(WithTransform(func(pods []kcorev1.Pod) []string {
obtainedPodNames := []string{}
for _, pod := range pods {
obtainedPodNames = append(obtainedPodNames, pod.Name)
}
return obtainedPodNames
}, ConsistOf(tc.expectedPods)))

By("Checking with unstructured")
obtainedUnstructuredPodList := unstructured.UnstructuredList{}
obtainedUnstructuredPodList.SetGroupVersionKind(schema.GroupVersionKind{
Group: "",
Version: "v1",
Kind: "PodList",
})
err = informer.List(context.Background(), &obtainedUnstructuredPodList)
Expect(err).To(Succeed())
Expect(obtainedUnstructuredPodList.Items).Should(WithTransform(func(pods []unstructured.Unstructured) []string {
obtainedPodNames := []string{}
for _, pod := range pods {
obtainedPodNames = append(obtainedPodNames, pod.GetName())
}
return obtainedPodNames
}, ConsistOf(tc.expectedPods)))

By("Checking with metadata")
obtainedMetadataPodList := kmetav1.PartialObjectMetadataList{}
obtainedMetadataPodList.SetGroupVersionKind(schema.GroupVersionKind{
Group: "",
Version: "v1",
Kind: "PodList",
})
err = informer.List(context.Background(), &obtainedMetadataPodList)
Expect(err).To(Succeed())
Expect(obtainedMetadataPodList.Items).Should(WithTransform(func(pods []kmetav1.PartialObjectMetadata) []string {
obtainedPodNames := []string{}
for _, pod := range pods {
obtainedPodNames = append(obtainedPodNames, pod.Name)
}
return obtainedPodNames
}, ConsistOf(tc.expectedPods)))
},
Entry("when selectors are empty it has to inform about all the pods", selectorsTestCase{
fieldSelectors: map[string]string{},
labelSelectors: map[string]string{},
expectedPods: []string{"test-pod-1", "test-pod-2", "test-pod-3", "test-pod-4"},
}),
Entry("when field matches one pod it has to inform about it", selectorsTestCase{
fieldSelectors: map[string]string{"metadata.name": "test-pod-2"},
expectedPods: []string{"test-pod-2"},
}),
Entry("when field matches multiple pods it has to infor about all of them", selectorsTestCase{
fieldSelectors: map[string]string{"metadata.namespace": testNamespaceTwo},
expectedPods: []string{"test-pod-2", "test-pod-3"},
}),
Entry("when label matches one pod it has to inform about it", selectorsTestCase{
labelSelectors: map[string]string{"test-label": "test-pod-4"},
expectedPods: []string{"test-pod-4"},
}),
Entry("when label matches multiple pods it has to infor about all of them", selectorsTestCase{
labelSelectors: map[string]string{"common-label": "common"},
expectedPods: []string{"test-pod-3", "test-pod-4"},
}),
Entry("when label and field matches one pod it has to infor about about it", selectorsTestCase{
labelSelectors: map[string]string{"common-label": "common"},
fieldSelectors: map[string]string{"metadata.namespace": testNamespaceTwo},
expectedPods: []string{"test-pod-3"},
}),
Entry("when label does not match it does not has to inform", selectorsTestCase{
labelSelectors: map[string]string{"new-label": "new"},
expectedPods: []string{},
}),
Entry("when field does not match it does not has to inform", selectorsTestCase{
fieldSelectors: map[string]string{"metadata.namespace": "new"},
expectedPods: []string{},
}),
)
})
Describe("as an Informer", func() {
Context("with structured objects", func() {
Expand Down Expand Up @@ -789,7 +907,6 @@ func CacheTest(createCacheFunc func(config *rest.Config, opts cache.Options) (ca
Eventually(out).Should(Receive(Equal(pod)))
close(done)
})

It("should be able to index an object field then retrieve objects by that field", func() {
By("creating the cache")
informer, err := cache.New(cfg, cache.Options{})
Expand Down
25 changes: 15 additions & 10 deletions pkg/cache/internal/deleg_map.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,12 +49,14 @@ func NewInformersMap(config *rest.Config,
scheme *runtime.Scheme,
mapper meta.RESTMapper,
resync time.Duration,
namespace string) *InformersMap {
namespace string,
selectors SelectorsByGVK,
) *InformersMap {

return &InformersMap{
structured: newStructuredInformersMap(config, scheme, mapper, resync, namespace),
unstructured: newUnstructuredInformersMap(config, scheme, mapper, resync, namespace),
metadata: newMetadataInformersMap(config, scheme, mapper, resync, namespace),
structured: newStructuredInformersMap(config, scheme, mapper, resync, namespace, selectors),
unstructured: newUnstructuredInformersMap(config, scheme, mapper, resync, namespace, selectors),
metadata: newMetadataInformersMap(config, scheme, mapper, resync, namespace, selectors),

Scheme: scheme,
}
Expand Down Expand Up @@ -105,16 +107,19 @@ func (m *InformersMap) Get(ctx context.Context, gvk schema.GroupVersionKind, obj
}

// newStructuredInformersMap creates a new InformersMap for structured objects.
func newStructuredInformersMap(config *rest.Config, scheme *runtime.Scheme, mapper meta.RESTMapper, resync time.Duration, namespace string) *specificInformersMap {
return newSpecificInformersMap(config, scheme, mapper, resync, namespace, createStructuredListWatch)
func newStructuredInformersMap(config *rest.Config, scheme *runtime.Scheme, mapper meta.RESTMapper, resync time.Duration,
namespace string, selectors SelectorsByGVK) *specificInformersMap {
return newSpecificInformersMap(config, scheme, mapper, resync, namespace, selectors, createStructuredListWatch)
}

// newUnstructuredInformersMap creates a new InformersMap for unstructured objects.
func newUnstructuredInformersMap(config *rest.Config, scheme *runtime.Scheme, mapper meta.RESTMapper, resync time.Duration, namespace string) *specificInformersMap {
return newSpecificInformersMap(config, scheme, mapper, resync, namespace, createUnstructuredListWatch)
func newUnstructuredInformersMap(config *rest.Config, scheme *runtime.Scheme, mapper meta.RESTMapper, resync time.Duration,
namespace string, selectors SelectorsByGVK) *specificInformersMap {
return newSpecificInformersMap(config, scheme, mapper, resync, namespace, selectors, createUnstructuredListWatch)
}

// newMetadataInformersMap creates a new InformersMap for metadata-only objects.
func newMetadataInformersMap(config *rest.Config, scheme *runtime.Scheme, mapper meta.RESTMapper, resync time.Duration, namespace string) *specificInformersMap {
return newSpecificInformersMap(config, scheme, mapper, resync, namespace, createMetadataListWatch)
func newMetadataInformersMap(config *rest.Config, scheme *runtime.Scheme, mapper meta.RESTMapper, resync time.Duration,
namespace string, selectors SelectorsByGVK) *specificInformersMap {
return newSpecificInformersMap(config, scheme, mapper, resync, namespace, selectors, createMetadataListWatch)
}
Loading

0 comments on commit cd065bf

Please sign in to comment.