Skip to content

Commit

Permalink
Refactor functional parameters
Browse files Browse the repository at this point in the history
Signed-off-by: Max Smythe <[email protected]>
  • Loading branch information
maxsmythe committed May 13, 2023
1 parent f0339e3 commit d9ecde9
Show file tree
Hide file tree
Showing 3 changed files with 29 additions and 30 deletions.
8 changes: 6 additions & 2 deletions pkg/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,10 @@ import (
var (
log = logf.RuntimeLog.WithName("object-cache")
defaultSyncPeriod = 10 * time.Hour

// BlockUntilSynced determines whether a get request for an informer should block
// until the informer's cache has synced.
BlockUntilSynced = internal.BlockUntilSynced
)

// Cache knows how to load Kubernetes objects, fetch informers to request
Expand All @@ -60,11 +64,11 @@ type Cache interface {
type Informers interface {
// GetInformer fetches or constructs an informer for the given object that corresponds to a single
// API kind and resource.
GetInformer(ctx context.Context, obj client.Object) (Informer, error)
GetInformer(ctx context.Context, obj client.Object, opts ...internal.InformerGetOption) (Informer, error)

// GetInformerForKind is similar to GetInformer, except that it takes a group-version-kind, instead
// of the underlying object.
GetInformerForKind(ctx context.Context, gvk schema.GroupVersionKind) (Informer, error)
GetInformerForKind(ctx context.Context, gvk schema.GroupVersionKind, opts ...internal.InformerGetOption) (Informer, error)

// Start runs all the informers known to this cache until the context is closed.
// It blocks.
Expand Down
32 changes: 6 additions & 26 deletions pkg/cache/informer_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,16 +39,6 @@ var (
_ Cache = &informerCache{}
)

type InformerGetOption func(*internal.GetOptions)

// BlockUntilSynced determines whether a get request for an informer should block
// until the informer's cache has synced.
func BlockUntilSynced(shouldBlock bool) InformerGetOption {
return func(opts *internal.GetOptions) {
opts.DoNotBlockUntilSynced = !shouldBlock
}
}

// ErrCacheNotStarted is returned when trying to read from the cache that wasn't started.
type ErrCacheNotStarted struct{}

Expand All @@ -70,7 +60,7 @@ func (ic *informerCache) Get(ctx context.Context, key client.ObjectKey, out clie
return err
}

started, cache, err := ic.Informers.Get(ctx, gvk, out, &internal.GetOptions{})
started, cache, err := ic.Informers.Get(ctx, gvk, out)
if err != nil {
return err
}
Expand All @@ -88,7 +78,7 @@ func (ic *informerCache) List(ctx context.Context, out client.ObjectList, opts .
return err
}

started, cache, err := ic.Informers.Get(ctx, *gvk, cacheTypeObj, &internal.GetOptions{})
started, cache, err := ic.Informers.Get(ctx, *gvk, cacheTypeObj)
if err != nil {
return err
}
Expand Down Expand Up @@ -135,38 +125,28 @@ func (ic *informerCache) objectTypeForListObject(list client.ObjectList) (*schem
}

// GetInformerForKind returns the informer for the GroupVersionKind.
func (ic *informerCache) GetInformerForKind(ctx context.Context, gvk schema.GroupVersionKind, opts ...InformerGetOption) (Informer, error) {
func (ic *informerCache) GetInformerForKind(ctx context.Context, gvk schema.GroupVersionKind, opts ...internal.InformerGetOption) (Informer, error) {
// Map the gvk to an object
obj, err := ic.scheme.New(gvk)
if err != nil {
return nil, err
}

cfg := &internal.GetOptions{}
for _, opt := range opts {
opt(cfg)
}

_, i, err := ic.Informers.Get(ctx, gvk, obj, cfg)
_, i, err := ic.Informers.Get(ctx, gvk, obj, opts)
if err != nil {
return nil, err
}
return i.Informer, err
}

// GetInformer returns the informer for the obj.
func (ic *informerCache) GetInformer(ctx context.Context, obj client.Object, opts ...InformerGetOption) (Informer, error) {
func (ic *informerCache) GetInformer(ctx context.Context, obj client.Object, opts ...internal.InformerGetOption) (Informer, error) {
gvk, err := apiutil.GVKForObject(obj, ic.scheme)
if err != nil {
return nil, err
}

cfg := &internal.GetOptions{}
for _, opt := range opts {
opt(cfg)
}

_, i, err := ic.Informers.Get(ctx, gvk, obj, cfg)
_, i, err := ic.Informers.Get(ctx, gvk, obj, opts)
if err != nil {
return nil, err
}
Expand Down
19 changes: 17 additions & 2 deletions pkg/cache/internal/informers.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,16 @@ type GetOptions struct {
DoNotBlockUntilSynced bool
}

type InformerGetOption func(*GetOptions)

// BlockUntilSynced determines whether a get request for an informer should block
// until the informer's cache has synced.
func BlockUntilSynced(shouldBlock bool) InformerGetOption {
return func(opts *GetOptions) {
opts.DoNotBlockUntilSynced = !shouldBlock
}
}

// Informers create and caches Informers for (runtime.Object, schema.GroupVersionKind) pairs.
// It uses a standard parameter codec constructed based on the given generated Scheme.
type Informers struct {
Expand Down Expand Up @@ -303,7 +313,7 @@ func (ip *Informers) get(gvk schema.GroupVersionKind, obj runtime.Object) (res *

// Get will create a new Informer and add it to the map of specificInformersMap if none exists. Returns
// the Informer from the map.
func (ip *Informers) Get(ctx context.Context, gvk schema.GroupVersionKind, obj runtime.Object, opts *GetOptions) (bool, *Cache, error) {
func (ip *Informers) Get(ctx context.Context, gvk schema.GroupVersionKind, obj runtime.Object, opts ...InformerGetOption) (bool, *Cache, error) {
// Return the informer if it is found
i, started, ok := ip.get(gvk, obj)
if !ok {
Expand All @@ -313,7 +323,12 @@ func (ip *Informers) Get(ctx context.Context, gvk schema.GroupVersionKind, obj r
}
}

if started && !i.Informer.HasSynced() && !opts.DoNotBlockUntilSynced {
cfg := &GetOptions{}
for _, opt := range opts {
opt(cfg)
}

if started && !i.Informer.HasSynced() && !cfg.DoNotBlockUntilSynced {
// Wait for it to sync before returning the Informer so that folks don't read from a stale cache.
if !cache.WaitForCacheSync(ctx.Done(), i.Informer.HasSynced) {
return started, nil, apierrors.NewTimeoutError(fmt.Sprintf("failed waiting for %T Informer to sync", obj), 0)
Expand Down

0 comments on commit d9ecde9

Please sign in to comment.