diff --git a/balancer/go.mod b/balancer/go.mod index 53f721c80917..d56aab45e485 100644 --- a/balancer/go.mod +++ b/balancer/go.mod @@ -1,4 +1,4 @@ -module k8s.io/autoscaling/balancer +module k8s.io/autoscaler/balancer go 1.19 diff --git a/balancer/pkg/client/clientset/versioned/clientset.go b/balancer/pkg/client/clientset/versioned/clientset.go new file mode 100644 index 000000000000..833e2ea18f7f --- /dev/null +++ b/balancer/pkg/client/clientset/versioned/clientset.go @@ -0,0 +1,120 @@ +/* +Copyright The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Code generated by client-gen. DO NOT EDIT. + +package versioned + +import ( + "fmt" + "net/http" + + balancerv1alpha1 "k8s.io/autoscaler/balancer/pkg/client/clientset/versioned/typed/balancer.x-k8s.io/v1alpha1" + discovery "k8s.io/client-go/discovery" + rest "k8s.io/client-go/rest" + flowcontrol "k8s.io/client-go/util/flowcontrol" +) + +type Interface interface { + Discovery() discovery.DiscoveryInterface + BalancerV1alpha1() balancerv1alpha1.BalancerV1alpha1Interface +} + +// Clientset contains the clients for groups. +type Clientset struct { + *discovery.DiscoveryClient + balancerV1alpha1 *balancerv1alpha1.BalancerV1alpha1Client +} + +// BalancerV1alpha1 retrieves the BalancerV1alpha1Client +func (c *Clientset) BalancerV1alpha1() balancerv1alpha1.BalancerV1alpha1Interface { + return c.balancerV1alpha1 +} + +// Discovery retrieves the DiscoveryClient +func (c *Clientset) Discovery() discovery.DiscoveryInterface { + if c == nil { + return nil + } + return c.DiscoveryClient +} + +// NewForConfig creates a new Clientset for the given config. +// If config's RateLimiter is not set and QPS and Burst are acceptable, +// NewForConfig will generate a rate-limiter in configShallowCopy. +// NewForConfig is equivalent to NewForConfigAndClient(c, httpClient), +// where httpClient was generated with rest.HTTPClientFor(c). +func NewForConfig(c *rest.Config) (*Clientset, error) { + configShallowCopy := *c + + if configShallowCopy.UserAgent == "" { + configShallowCopy.UserAgent = rest.DefaultKubernetesUserAgent() + } + + // share the transport between all clients + httpClient, err := rest.HTTPClientFor(&configShallowCopy) + if err != nil { + return nil, err + } + + return NewForConfigAndClient(&configShallowCopy, httpClient) +} + +// NewForConfigAndClient creates a new Clientset for the given config and http client. +// Note the http client provided takes precedence over the configured transport values. +// If config's RateLimiter is not set and QPS and Burst are acceptable, +// NewForConfigAndClient will generate a rate-limiter in configShallowCopy. +func NewForConfigAndClient(c *rest.Config, httpClient *http.Client) (*Clientset, error) { + configShallowCopy := *c + if configShallowCopy.RateLimiter == nil && configShallowCopy.QPS > 0 { + if configShallowCopy.Burst <= 0 { + return nil, fmt.Errorf("burst is required to be greater than 0 when RateLimiter is not set and QPS is set to greater than 0") + } + configShallowCopy.RateLimiter = flowcontrol.NewTokenBucketRateLimiter(configShallowCopy.QPS, configShallowCopy.Burst) + } + + var cs Clientset + var err error + cs.balancerV1alpha1, err = balancerv1alpha1.NewForConfigAndClient(&configShallowCopy, httpClient) + if err != nil { + return nil, err + } + + cs.DiscoveryClient, err = discovery.NewDiscoveryClientForConfigAndClient(&configShallowCopy, httpClient) + if err != nil { + return nil, err + } + return &cs, nil +} + +// NewForConfigOrDie creates a new Clientset for the given config and +// panics if there is an error in the config. +func NewForConfigOrDie(c *rest.Config) *Clientset { + cs, err := NewForConfig(c) + if err != nil { + panic(err) + } + return cs +} + +// New creates a new Clientset for the given RESTClient. +func New(c rest.Interface) *Clientset { + var cs Clientset + cs.balancerV1alpha1 = balancerv1alpha1.New(c) + + cs.DiscoveryClient = discovery.NewDiscoveryClient(c) + return &cs +} diff --git a/balancer/pkg/client/clientset/versioned/doc.go b/balancer/pkg/client/clientset/versioned/doc.go new file mode 100644 index 000000000000..41721ca52d44 --- /dev/null +++ b/balancer/pkg/client/clientset/versioned/doc.go @@ -0,0 +1,20 @@ +/* +Copyright The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Code generated by client-gen. DO NOT EDIT. + +// This package has the automatically generated clientset. +package versioned diff --git a/balancer/pkg/client/clientset/versioned/fake/clientset_generated.go b/balancer/pkg/client/clientset/versioned/fake/clientset_generated.go new file mode 100644 index 000000000000..39970744d1c0 --- /dev/null +++ b/balancer/pkg/client/clientset/versioned/fake/clientset_generated.go @@ -0,0 +1,85 @@ +/* +Copyright The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Code generated by client-gen. DO NOT EDIT. + +package fake + +import ( + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/watch" + clientset "k8s.io/autoscaler/balancer/pkg/client/clientset/versioned" + balancerv1alpha1 "k8s.io/autoscaler/balancer/pkg/client/clientset/versioned/typed/balancer.x-k8s.io/v1alpha1" + fakebalancerv1alpha1 "k8s.io/autoscaler/balancer/pkg/client/clientset/versioned/typed/balancer.x-k8s.io/v1alpha1/fake" + "k8s.io/client-go/discovery" + fakediscovery "k8s.io/client-go/discovery/fake" + "k8s.io/client-go/testing" +) + +// NewSimpleClientset returns a clientset that will respond with the provided objects. +// It's backed by a very simple object tracker that processes creates, updates and deletions as-is, +// without applying any validations and/or defaults. It shouldn't be considered a replacement +// for a real clientset and is mostly useful in simple unit tests. +func NewSimpleClientset(objects ...runtime.Object) *Clientset { + o := testing.NewObjectTracker(scheme, codecs.UniversalDecoder()) + for _, obj := range objects { + if err := o.Add(obj); err != nil { + panic(err) + } + } + + cs := &Clientset{tracker: o} + cs.discovery = &fakediscovery.FakeDiscovery{Fake: &cs.Fake} + cs.AddReactor("*", "*", testing.ObjectReaction(o)) + cs.AddWatchReactor("*", func(action testing.Action) (handled bool, ret watch.Interface, err error) { + gvr := action.GetResource() + ns := action.GetNamespace() + watch, err := o.Watch(gvr, ns) + if err != nil { + return false, nil, err + } + return true, watch, nil + }) + + return cs +} + +// Clientset implements clientset.Interface. Meant to be embedded into a +// struct to get a default implementation. This makes faking out just the method +// you want to test easier. +type Clientset struct { + testing.Fake + discovery *fakediscovery.FakeDiscovery + tracker testing.ObjectTracker +} + +func (c *Clientset) Discovery() discovery.DiscoveryInterface { + return c.discovery +} + +func (c *Clientset) Tracker() testing.ObjectTracker { + return c.tracker +} + +var ( + _ clientset.Interface = &Clientset{} + _ testing.FakeClient = &Clientset{} +) + +// BalancerV1alpha1 retrieves the BalancerV1alpha1Client +func (c *Clientset) BalancerV1alpha1() balancerv1alpha1.BalancerV1alpha1Interface { + return &fakebalancerv1alpha1.FakeBalancerV1alpha1{Fake: &c.Fake} +} diff --git a/balancer/pkg/client/clientset/versioned/fake/doc.go b/balancer/pkg/client/clientset/versioned/fake/doc.go new file mode 100644 index 000000000000..9b99e7167091 --- /dev/null +++ b/balancer/pkg/client/clientset/versioned/fake/doc.go @@ -0,0 +1,20 @@ +/* +Copyright The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Code generated by client-gen. DO NOT EDIT. + +// This package has the automatically generated fake clientset. +package fake diff --git a/balancer/pkg/client/clientset/versioned/fake/register.go b/balancer/pkg/client/clientset/versioned/fake/register.go new file mode 100644 index 000000000000..b0d21ae06242 --- /dev/null +++ b/balancer/pkg/client/clientset/versioned/fake/register.go @@ -0,0 +1,56 @@ +/* +Copyright The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Code generated by client-gen. DO NOT EDIT. + +package fake + +import ( + v1 "k8s.io/apimachinery/pkg/apis/meta/v1" + runtime "k8s.io/apimachinery/pkg/runtime" + schema "k8s.io/apimachinery/pkg/runtime/schema" + serializer "k8s.io/apimachinery/pkg/runtime/serializer" + utilruntime "k8s.io/apimachinery/pkg/util/runtime" + balancerv1alpha1 "k8s.io/autoscaler/balancer/pkg/apis/balancer.x-k8s.io/v1alpha1" +) + +var scheme = runtime.NewScheme() +var codecs = serializer.NewCodecFactory(scheme) + +var localSchemeBuilder = runtime.SchemeBuilder{ + balancerv1alpha1.AddToScheme, +} + +// AddToScheme adds all types of this clientset into the given scheme. This allows composition +// of clientsets, like in: +// +// import ( +// "k8s.io/client-go/kubernetes" +// clientsetscheme "k8s.io/client-go/kubernetes/scheme" +// aggregatorclientsetscheme "k8s.io/kube-aggregator/pkg/client/clientset_generated/clientset/scheme" +// ) +// +// kclientset, _ := kubernetes.NewForConfig(c) +// _ = aggregatorclientsetscheme.AddToScheme(clientsetscheme.Scheme) +// +// After this, RawExtensions in Kubernetes types will serialize kube-aggregator types +// correctly. +var AddToScheme = localSchemeBuilder.AddToScheme + +func init() { + v1.AddToGroupVersion(scheme, schema.GroupVersion{Version: "v1"}) + utilruntime.Must(AddToScheme(scheme)) +} diff --git a/balancer/pkg/client/clientset/versioned/scheme/doc.go b/balancer/pkg/client/clientset/versioned/scheme/doc.go new file mode 100644 index 000000000000..7dc3756168fa --- /dev/null +++ b/balancer/pkg/client/clientset/versioned/scheme/doc.go @@ -0,0 +1,20 @@ +/* +Copyright The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Code generated by client-gen. DO NOT EDIT. + +// This package contains the scheme of the automatically generated clientset. +package scheme diff --git a/balancer/pkg/client/clientset/versioned/scheme/register.go b/balancer/pkg/client/clientset/versioned/scheme/register.go new file mode 100644 index 000000000000..7eb08337f967 --- /dev/null +++ b/balancer/pkg/client/clientset/versioned/scheme/register.go @@ -0,0 +1,56 @@ +/* +Copyright The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Code generated by client-gen. DO NOT EDIT. + +package scheme + +import ( + v1 "k8s.io/apimachinery/pkg/apis/meta/v1" + runtime "k8s.io/apimachinery/pkg/runtime" + schema "k8s.io/apimachinery/pkg/runtime/schema" + serializer "k8s.io/apimachinery/pkg/runtime/serializer" + utilruntime "k8s.io/apimachinery/pkg/util/runtime" + balancerv1alpha1 "k8s.io/autoscaler/balancer/pkg/apis/balancer.x-k8s.io/v1alpha1" +) + +var Scheme = runtime.NewScheme() +var Codecs = serializer.NewCodecFactory(Scheme) +var ParameterCodec = runtime.NewParameterCodec(Scheme) +var localSchemeBuilder = runtime.SchemeBuilder{ + balancerv1alpha1.AddToScheme, +} + +// AddToScheme adds all types of this clientset into the given scheme. This allows composition +// of clientsets, like in: +// +// import ( +// "k8s.io/client-go/kubernetes" +// clientsetscheme "k8s.io/client-go/kubernetes/scheme" +// aggregatorclientsetscheme "k8s.io/kube-aggregator/pkg/client/clientset_generated/clientset/scheme" +// ) +// +// kclientset, _ := kubernetes.NewForConfig(c) +// _ = aggregatorclientsetscheme.AddToScheme(clientsetscheme.Scheme) +// +// After this, RawExtensions in Kubernetes types will serialize kube-aggregator types +// correctly. +var AddToScheme = localSchemeBuilder.AddToScheme + +func init() { + v1.AddToGroupVersion(Scheme, schema.GroupVersion{Version: "v1"}) + utilruntime.Must(AddToScheme(Scheme)) +} diff --git a/balancer/pkg/client/clientset/versioned/typed/balancer.x-k8s.io/v1alpha1/balancer.go b/balancer/pkg/client/clientset/versioned/typed/balancer.x-k8s.io/v1alpha1/balancer.go new file mode 100644 index 000000000000..9719ebd2e7b8 --- /dev/null +++ b/balancer/pkg/client/clientset/versioned/typed/balancer.x-k8s.io/v1alpha1/balancer.go @@ -0,0 +1,195 @@ +/* +Copyright The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Code generated by client-gen. DO NOT EDIT. + +package v1alpha1 + +import ( + "context" + "time" + + v1 "k8s.io/apimachinery/pkg/apis/meta/v1" + types "k8s.io/apimachinery/pkg/types" + watch "k8s.io/apimachinery/pkg/watch" + v1alpha1 "k8s.io/autoscaler/balancer/pkg/apis/balancer.x-k8s.io/v1alpha1" + scheme "k8s.io/autoscaler/balancer/pkg/client/clientset/versioned/scheme" + rest "k8s.io/client-go/rest" +) + +// BalancersGetter has a method to return a BalancerInterface. +// A group's client should implement this interface. +type BalancersGetter interface { + Balancers(namespace string) BalancerInterface +} + +// BalancerInterface has methods to work with Balancer resources. +type BalancerInterface interface { + Create(ctx context.Context, balancer *v1alpha1.Balancer, opts v1.CreateOptions) (*v1alpha1.Balancer, error) + Update(ctx context.Context, balancer *v1alpha1.Balancer, opts v1.UpdateOptions) (*v1alpha1.Balancer, error) + UpdateStatus(ctx context.Context, balancer *v1alpha1.Balancer, opts v1.UpdateOptions) (*v1alpha1.Balancer, error) + Delete(ctx context.Context, name string, opts v1.DeleteOptions) error + DeleteCollection(ctx context.Context, opts v1.DeleteOptions, listOpts v1.ListOptions) error + Get(ctx context.Context, name string, opts v1.GetOptions) (*v1alpha1.Balancer, error) + List(ctx context.Context, opts v1.ListOptions) (*v1alpha1.BalancerList, error) + Watch(ctx context.Context, opts v1.ListOptions) (watch.Interface, error) + Patch(ctx context.Context, name string, pt types.PatchType, data []byte, opts v1.PatchOptions, subresources ...string) (result *v1alpha1.Balancer, err error) + BalancerExpansion +} + +// balancers implements BalancerInterface +type balancers struct { + client rest.Interface + ns string +} + +// newBalancers returns a Balancers +func newBalancers(c *BalancerV1alpha1Client, namespace string) *balancers { + return &balancers{ + client: c.RESTClient(), + ns: namespace, + } +} + +// Get takes name of the balancer, and returns the corresponding balancer object, and an error if there is any. +func (c *balancers) Get(ctx context.Context, name string, options v1.GetOptions) (result *v1alpha1.Balancer, err error) { + result = &v1alpha1.Balancer{} + err = c.client.Get(). + Namespace(c.ns). + Resource("balancers"). + Name(name). + VersionedParams(&options, scheme.ParameterCodec). + Do(ctx). + Into(result) + return +} + +// List takes label and field selectors, and returns the list of Balancers that match those selectors. +func (c *balancers) List(ctx context.Context, opts v1.ListOptions) (result *v1alpha1.BalancerList, err error) { + var timeout time.Duration + if opts.TimeoutSeconds != nil { + timeout = time.Duration(*opts.TimeoutSeconds) * time.Second + } + result = &v1alpha1.BalancerList{} + err = c.client.Get(). + Namespace(c.ns). + Resource("balancers"). + VersionedParams(&opts, scheme.ParameterCodec). + Timeout(timeout). + Do(ctx). + Into(result) + return +} + +// Watch returns a watch.Interface that watches the requested balancers. +func (c *balancers) Watch(ctx context.Context, opts v1.ListOptions) (watch.Interface, error) { + var timeout time.Duration + if opts.TimeoutSeconds != nil { + timeout = time.Duration(*opts.TimeoutSeconds) * time.Second + } + opts.Watch = true + return c.client.Get(). + Namespace(c.ns). + Resource("balancers"). + VersionedParams(&opts, scheme.ParameterCodec). + Timeout(timeout). + Watch(ctx) +} + +// Create takes the representation of a balancer and creates it. Returns the server's representation of the balancer, and an error, if there is any. +func (c *balancers) Create(ctx context.Context, balancer *v1alpha1.Balancer, opts v1.CreateOptions) (result *v1alpha1.Balancer, err error) { + result = &v1alpha1.Balancer{} + err = c.client.Post(). + Namespace(c.ns). + Resource("balancers"). + VersionedParams(&opts, scheme.ParameterCodec). + Body(balancer). + Do(ctx). + Into(result) + return +} + +// Update takes the representation of a balancer and updates it. Returns the server's representation of the balancer, and an error, if there is any. +func (c *balancers) Update(ctx context.Context, balancer *v1alpha1.Balancer, opts v1.UpdateOptions) (result *v1alpha1.Balancer, err error) { + result = &v1alpha1.Balancer{} + err = c.client.Put(). + Namespace(c.ns). + Resource("balancers"). + Name(balancer.Name). + VersionedParams(&opts, scheme.ParameterCodec). + Body(balancer). + Do(ctx). + Into(result) + return +} + +// UpdateStatus was generated because the type contains a Status member. +// Add a +genclient:noStatus comment above the type to avoid generating UpdateStatus(). +func (c *balancers) UpdateStatus(ctx context.Context, balancer *v1alpha1.Balancer, opts v1.UpdateOptions) (result *v1alpha1.Balancer, err error) { + result = &v1alpha1.Balancer{} + err = c.client.Put(). + Namespace(c.ns). + Resource("balancers"). + Name(balancer.Name). + SubResource("status"). + VersionedParams(&opts, scheme.ParameterCodec). + Body(balancer). + Do(ctx). + Into(result) + return +} + +// Delete takes name of the balancer and deletes it. Returns an error if one occurs. +func (c *balancers) Delete(ctx context.Context, name string, opts v1.DeleteOptions) error { + return c.client.Delete(). + Namespace(c.ns). + Resource("balancers"). + Name(name). + Body(&opts). + Do(ctx). + Error() +} + +// DeleteCollection deletes a collection of objects. +func (c *balancers) DeleteCollection(ctx context.Context, opts v1.DeleteOptions, listOpts v1.ListOptions) error { + var timeout time.Duration + if listOpts.TimeoutSeconds != nil { + timeout = time.Duration(*listOpts.TimeoutSeconds) * time.Second + } + return c.client.Delete(). + Namespace(c.ns). + Resource("balancers"). + VersionedParams(&listOpts, scheme.ParameterCodec). + Timeout(timeout). + Body(&opts). + Do(ctx). + Error() +} + +// Patch applies the patch and returns the patched balancer. +func (c *balancers) Patch(ctx context.Context, name string, pt types.PatchType, data []byte, opts v1.PatchOptions, subresources ...string) (result *v1alpha1.Balancer, err error) { + result = &v1alpha1.Balancer{} + err = c.client.Patch(pt). + Namespace(c.ns). + Resource("balancers"). + Name(name). + SubResource(subresources...). + VersionedParams(&opts, scheme.ParameterCodec). + Body(data). + Do(ctx). + Into(result) + return +} diff --git a/balancer/pkg/client/clientset/versioned/typed/balancer.x-k8s.io/v1alpha1/balancer.x-k8s.io_client.go b/balancer/pkg/client/clientset/versioned/typed/balancer.x-k8s.io/v1alpha1/balancer.x-k8s.io_client.go new file mode 100644 index 000000000000..f21fa0f3f43b --- /dev/null +++ b/balancer/pkg/client/clientset/versioned/typed/balancer.x-k8s.io/v1alpha1/balancer.x-k8s.io_client.go @@ -0,0 +1,107 @@ +/* +Copyright The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Code generated by client-gen. DO NOT EDIT. + +package v1alpha1 + +import ( + "net/http" + + v1alpha1 "k8s.io/autoscaler/balancer/pkg/apis/balancer.x-k8s.io/v1alpha1" + "k8s.io/autoscaler/balancer/pkg/client/clientset/versioned/scheme" + rest "k8s.io/client-go/rest" +) + +type BalancerV1alpha1Interface interface { + RESTClient() rest.Interface + BalancersGetter +} + +// BalancerV1alpha1Client is used to interact with features provided by the balancer.x-k8s.io group. +type BalancerV1alpha1Client struct { + restClient rest.Interface +} + +func (c *BalancerV1alpha1Client) Balancers(namespace string) BalancerInterface { + return newBalancers(c, namespace) +} + +// NewForConfig creates a new BalancerV1alpha1Client for the given config. +// NewForConfig is equivalent to NewForConfigAndClient(c, httpClient), +// where httpClient was generated with rest.HTTPClientFor(c). +func NewForConfig(c *rest.Config) (*BalancerV1alpha1Client, error) { + config := *c + if err := setConfigDefaults(&config); err != nil { + return nil, err + } + httpClient, err := rest.HTTPClientFor(&config) + if err != nil { + return nil, err + } + return NewForConfigAndClient(&config, httpClient) +} + +// NewForConfigAndClient creates a new BalancerV1alpha1Client for the given config and http client. +// Note the http client provided takes precedence over the configured transport values. +func NewForConfigAndClient(c *rest.Config, h *http.Client) (*BalancerV1alpha1Client, error) { + config := *c + if err := setConfigDefaults(&config); err != nil { + return nil, err + } + client, err := rest.RESTClientForConfigAndClient(&config, h) + if err != nil { + return nil, err + } + return &BalancerV1alpha1Client{client}, nil +} + +// NewForConfigOrDie creates a new BalancerV1alpha1Client for the given config and +// panics if there is an error in the config. +func NewForConfigOrDie(c *rest.Config) *BalancerV1alpha1Client { + client, err := NewForConfig(c) + if err != nil { + panic(err) + } + return client +} + +// New creates a new BalancerV1alpha1Client for the given RESTClient. +func New(c rest.Interface) *BalancerV1alpha1Client { + return &BalancerV1alpha1Client{c} +} + +func setConfigDefaults(config *rest.Config) error { + gv := v1alpha1.SchemeGroupVersion + config.GroupVersion = &gv + config.APIPath = "/apis" + config.NegotiatedSerializer = scheme.Codecs.WithoutConversion() + + if config.UserAgent == "" { + config.UserAgent = rest.DefaultKubernetesUserAgent() + } + + return nil +} + +// RESTClient returns a RESTClient that is used to communicate +// with API server by this client implementation. +func (c *BalancerV1alpha1Client) RESTClient() rest.Interface { + if c == nil { + return nil + } + return c.restClient +} diff --git a/balancer/pkg/client/clientset/versioned/typed/balancer.x-k8s.io/v1alpha1/doc.go b/balancer/pkg/client/clientset/versioned/typed/balancer.x-k8s.io/v1alpha1/doc.go new file mode 100644 index 000000000000..df51baa4d4c1 --- /dev/null +++ b/balancer/pkg/client/clientset/versioned/typed/balancer.x-k8s.io/v1alpha1/doc.go @@ -0,0 +1,20 @@ +/* +Copyright The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Code generated by client-gen. DO NOT EDIT. + +// This package has the automatically generated typed clients. +package v1alpha1 diff --git a/balancer/pkg/client/clientset/versioned/typed/balancer.x-k8s.io/v1alpha1/fake/doc.go b/balancer/pkg/client/clientset/versioned/typed/balancer.x-k8s.io/v1alpha1/fake/doc.go new file mode 100644 index 000000000000..16f44399065e --- /dev/null +++ b/balancer/pkg/client/clientset/versioned/typed/balancer.x-k8s.io/v1alpha1/fake/doc.go @@ -0,0 +1,20 @@ +/* +Copyright The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Code generated by client-gen. DO NOT EDIT. + +// Package fake has the automatically generated clients. +package fake diff --git a/balancer/pkg/client/clientset/versioned/typed/balancer.x-k8s.io/v1alpha1/fake/fake_balancer.go b/balancer/pkg/client/clientset/versioned/typed/balancer.x-k8s.io/v1alpha1/fake/fake_balancer.go new file mode 100644 index 000000000000..becfbde89288 --- /dev/null +++ b/balancer/pkg/client/clientset/versioned/typed/balancer.x-k8s.io/v1alpha1/fake/fake_balancer.go @@ -0,0 +1,142 @@ +/* +Copyright The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Code generated by client-gen. DO NOT EDIT. + +package fake + +import ( + "context" + + v1 "k8s.io/apimachinery/pkg/apis/meta/v1" + labels "k8s.io/apimachinery/pkg/labels" + schema "k8s.io/apimachinery/pkg/runtime/schema" + types "k8s.io/apimachinery/pkg/types" + watch "k8s.io/apimachinery/pkg/watch" + v1alpha1 "k8s.io/autoscaler/balancer/pkg/apis/balancer.x-k8s.io/v1alpha1" + testing "k8s.io/client-go/testing" +) + +// FakeBalancers implements BalancerInterface +type FakeBalancers struct { + Fake *FakeBalancerV1alpha1 + ns string +} + +var balancersResource = schema.GroupVersionResource{Group: "balancer.x-k8s.io", Version: "v1alpha1", Resource: "balancers"} + +var balancersKind = schema.GroupVersionKind{Group: "balancer.x-k8s.io", Version: "v1alpha1", Kind: "Balancer"} + +// Get takes name of the balancer, and returns the corresponding balancer object, and an error if there is any. +func (c *FakeBalancers) Get(ctx context.Context, name string, options v1.GetOptions) (result *v1alpha1.Balancer, err error) { + obj, err := c.Fake. + Invokes(testing.NewGetAction(balancersResource, c.ns, name), &v1alpha1.Balancer{}) + + if obj == nil { + return nil, err + } + return obj.(*v1alpha1.Balancer), err +} + +// List takes label and field selectors, and returns the list of Balancers that match those selectors. +func (c *FakeBalancers) List(ctx context.Context, opts v1.ListOptions) (result *v1alpha1.BalancerList, err error) { + obj, err := c.Fake. + Invokes(testing.NewListAction(balancersResource, balancersKind, c.ns, opts), &v1alpha1.BalancerList{}) + + if obj == nil { + return nil, err + } + + label, _, _ := testing.ExtractFromListOptions(opts) + if label == nil { + label = labels.Everything() + } + list := &v1alpha1.BalancerList{ListMeta: obj.(*v1alpha1.BalancerList).ListMeta} + for _, item := range obj.(*v1alpha1.BalancerList).Items { + if label.Matches(labels.Set(item.Labels)) { + list.Items = append(list.Items, item) + } + } + return list, err +} + +// Watch returns a watch.Interface that watches the requested balancers. +func (c *FakeBalancers) Watch(ctx context.Context, opts v1.ListOptions) (watch.Interface, error) { + return c.Fake. + InvokesWatch(testing.NewWatchAction(balancersResource, c.ns, opts)) + +} + +// Create takes the representation of a balancer and creates it. Returns the server's representation of the balancer, and an error, if there is any. +func (c *FakeBalancers) Create(ctx context.Context, balancer *v1alpha1.Balancer, opts v1.CreateOptions) (result *v1alpha1.Balancer, err error) { + obj, err := c.Fake. + Invokes(testing.NewCreateAction(balancersResource, c.ns, balancer), &v1alpha1.Balancer{}) + + if obj == nil { + return nil, err + } + return obj.(*v1alpha1.Balancer), err +} + +// Update takes the representation of a balancer and updates it. Returns the server's representation of the balancer, and an error, if there is any. +func (c *FakeBalancers) Update(ctx context.Context, balancer *v1alpha1.Balancer, opts v1.UpdateOptions) (result *v1alpha1.Balancer, err error) { + obj, err := c.Fake. + Invokes(testing.NewUpdateAction(balancersResource, c.ns, balancer), &v1alpha1.Balancer{}) + + if obj == nil { + return nil, err + } + return obj.(*v1alpha1.Balancer), err +} + +// UpdateStatus was generated because the type contains a Status member. +// Add a +genclient:noStatus comment above the type to avoid generating UpdateStatus(). +func (c *FakeBalancers) UpdateStatus(ctx context.Context, balancer *v1alpha1.Balancer, opts v1.UpdateOptions) (*v1alpha1.Balancer, error) { + obj, err := c.Fake. + Invokes(testing.NewUpdateSubresourceAction(balancersResource, "status", c.ns, balancer), &v1alpha1.Balancer{}) + + if obj == nil { + return nil, err + } + return obj.(*v1alpha1.Balancer), err +} + +// Delete takes name of the balancer and deletes it. Returns an error if one occurs. +func (c *FakeBalancers) Delete(ctx context.Context, name string, opts v1.DeleteOptions) error { + _, err := c.Fake. + Invokes(testing.NewDeleteActionWithOptions(balancersResource, c.ns, name, opts), &v1alpha1.Balancer{}) + + return err +} + +// DeleteCollection deletes a collection of objects. +func (c *FakeBalancers) DeleteCollection(ctx context.Context, opts v1.DeleteOptions, listOpts v1.ListOptions) error { + action := testing.NewDeleteCollectionAction(balancersResource, c.ns, listOpts) + + _, err := c.Fake.Invokes(action, &v1alpha1.BalancerList{}) + return err +} + +// Patch applies the patch and returns the patched balancer. +func (c *FakeBalancers) Patch(ctx context.Context, name string, pt types.PatchType, data []byte, opts v1.PatchOptions, subresources ...string) (result *v1alpha1.Balancer, err error) { + obj, err := c.Fake. + Invokes(testing.NewPatchSubresourceAction(balancersResource, c.ns, name, pt, data, subresources...), &v1alpha1.Balancer{}) + + if obj == nil { + return nil, err + } + return obj.(*v1alpha1.Balancer), err +} diff --git a/balancer/pkg/client/clientset/versioned/typed/balancer.x-k8s.io/v1alpha1/fake/fake_balancer.x-k8s.io_client.go b/balancer/pkg/client/clientset/versioned/typed/balancer.x-k8s.io/v1alpha1/fake/fake_balancer.x-k8s.io_client.go new file mode 100644 index 000000000000..f3e34c4321b7 --- /dev/null +++ b/balancer/pkg/client/clientset/versioned/typed/balancer.x-k8s.io/v1alpha1/fake/fake_balancer.x-k8s.io_client.go @@ -0,0 +1,40 @@ +/* +Copyright The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Code generated by client-gen. DO NOT EDIT. + +package fake + +import ( + v1alpha1 "k8s.io/autoscaler/balancer/pkg/client/clientset/versioned/typed/balancer.x-k8s.io/v1alpha1" + rest "k8s.io/client-go/rest" + testing "k8s.io/client-go/testing" +) + +type FakeBalancerV1alpha1 struct { + *testing.Fake +} + +func (c *FakeBalancerV1alpha1) Balancers(namespace string) v1alpha1.BalancerInterface { + return &FakeBalancers{c, namespace} +} + +// RESTClient returns a RESTClient that is used to communicate +// with API server by this client implementation. +func (c *FakeBalancerV1alpha1) RESTClient() rest.Interface { + var ret *rest.RESTClient + return ret +} diff --git a/balancer/pkg/client/clientset/versioned/typed/balancer.x-k8s.io/v1alpha1/generated_expansion.go b/balancer/pkg/client/clientset/versioned/typed/balancer.x-k8s.io/v1alpha1/generated_expansion.go new file mode 100644 index 000000000000..5e2f32724a3d --- /dev/null +++ b/balancer/pkg/client/clientset/versioned/typed/balancer.x-k8s.io/v1alpha1/generated_expansion.go @@ -0,0 +1,21 @@ +/* +Copyright The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Code generated by client-gen. DO NOT EDIT. + +package v1alpha1 + +type BalancerExpansion interface{} diff --git a/balancer/pkg/client/informers/externalversions/balancer.x-k8s.io/interface.go b/balancer/pkg/client/informers/externalversions/balancer.x-k8s.io/interface.go new file mode 100644 index 000000000000..0b0bb3fb40fc --- /dev/null +++ b/balancer/pkg/client/informers/externalversions/balancer.x-k8s.io/interface.go @@ -0,0 +1,46 @@ +/* +Copyright The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Code generated by informer-gen. DO NOT EDIT. + +package balancer + +import ( + v1alpha1 "k8s.io/autoscaler/balancer/pkg/client/informers/externalversions/balancer.x-k8s.io/v1alpha1" + internalinterfaces "k8s.io/autoscaler/balancer/pkg/client/informers/externalversions/internalinterfaces" +) + +// Interface provides access to each of this group's versions. +type Interface interface { + // V1alpha1 provides access to shared informers for resources in V1alpha1. + V1alpha1() v1alpha1.Interface +} + +type group struct { + factory internalinterfaces.SharedInformerFactory + namespace string + tweakListOptions internalinterfaces.TweakListOptionsFunc +} + +// New returns a new Interface. +func New(f internalinterfaces.SharedInformerFactory, namespace string, tweakListOptions internalinterfaces.TweakListOptionsFunc) Interface { + return &group{factory: f, namespace: namespace, tweakListOptions: tweakListOptions} +} + +// V1alpha1 returns a new v1alpha1.Interface. +func (g *group) V1alpha1() v1alpha1.Interface { + return v1alpha1.New(g.factory, g.namespace, g.tweakListOptions) +} diff --git a/balancer/pkg/client/informers/externalversions/balancer.x-k8s.io/v1alpha1/balancer.go b/balancer/pkg/client/informers/externalversions/balancer.x-k8s.io/v1alpha1/balancer.go new file mode 100644 index 000000000000..bf17a6c05bdb --- /dev/null +++ b/balancer/pkg/client/informers/externalversions/balancer.x-k8s.io/v1alpha1/balancer.go @@ -0,0 +1,90 @@ +/* +Copyright The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Code generated by informer-gen. DO NOT EDIT. + +package v1alpha1 + +import ( + "context" + time "time" + + v1 "k8s.io/apimachinery/pkg/apis/meta/v1" + runtime "k8s.io/apimachinery/pkg/runtime" + watch "k8s.io/apimachinery/pkg/watch" + balancerxk8siov1alpha1 "k8s.io/autoscaler/balancer/pkg/apis/balancer.x-k8s.io/v1alpha1" + versioned "k8s.io/autoscaler/balancer/pkg/client/clientset/versioned" + internalinterfaces "k8s.io/autoscaler/balancer/pkg/client/informers/externalversions/internalinterfaces" + v1alpha1 "k8s.io/autoscaler/balancer/pkg/client/listers/balancer.x-k8s.io/v1alpha1" + cache "k8s.io/client-go/tools/cache" +) + +// BalancerInformer provides access to a shared informer and lister for +// Balancers. +type BalancerInformer interface { + Informer() cache.SharedIndexInformer + Lister() v1alpha1.BalancerLister +} + +type balancerInformer struct { + factory internalinterfaces.SharedInformerFactory + tweakListOptions internalinterfaces.TweakListOptionsFunc + namespace string +} + +// NewBalancerInformer constructs a new informer for Balancer type. +// Always prefer using an informer factory to get a shared informer instead of getting an independent +// one. This reduces memory footprint and number of connections to the server. +func NewBalancerInformer(client versioned.Interface, namespace string, resyncPeriod time.Duration, indexers cache.Indexers) cache.SharedIndexInformer { + return NewFilteredBalancerInformer(client, namespace, resyncPeriod, indexers, nil) +} + +// NewFilteredBalancerInformer constructs a new informer for Balancer type. +// Always prefer using an informer factory to get a shared informer instead of getting an independent +// one. This reduces memory footprint and number of connections to the server. +func NewFilteredBalancerInformer(client versioned.Interface, namespace string, resyncPeriod time.Duration, indexers cache.Indexers, tweakListOptions internalinterfaces.TweakListOptionsFunc) cache.SharedIndexInformer { + return cache.NewSharedIndexInformer( + &cache.ListWatch{ + ListFunc: func(options v1.ListOptions) (runtime.Object, error) { + if tweakListOptions != nil { + tweakListOptions(&options) + } + return client.BalancerV1alpha1().Balancers(namespace).List(context.TODO(), options) + }, + WatchFunc: func(options v1.ListOptions) (watch.Interface, error) { + if tweakListOptions != nil { + tweakListOptions(&options) + } + return client.BalancerV1alpha1().Balancers(namespace).Watch(context.TODO(), options) + }, + }, + &balancerxk8siov1alpha1.Balancer{}, + resyncPeriod, + indexers, + ) +} + +func (f *balancerInformer) defaultInformer(client versioned.Interface, resyncPeriod time.Duration) cache.SharedIndexInformer { + return NewFilteredBalancerInformer(client, f.namespace, resyncPeriod, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}, f.tweakListOptions) +} + +func (f *balancerInformer) Informer() cache.SharedIndexInformer { + return f.factory.InformerFor(&balancerxk8siov1alpha1.Balancer{}, f.defaultInformer) +} + +func (f *balancerInformer) Lister() v1alpha1.BalancerLister { + return v1alpha1.NewBalancerLister(f.Informer().GetIndexer()) +} diff --git a/balancer/pkg/client/informers/externalversions/balancer.x-k8s.io/v1alpha1/interface.go b/balancer/pkg/client/informers/externalversions/balancer.x-k8s.io/v1alpha1/interface.go new file mode 100644 index 000000000000..be8ca7453c59 --- /dev/null +++ b/balancer/pkg/client/informers/externalversions/balancer.x-k8s.io/v1alpha1/interface.go @@ -0,0 +1,45 @@ +/* +Copyright The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Code generated by informer-gen. DO NOT EDIT. + +package v1alpha1 + +import ( + internalinterfaces "k8s.io/autoscaler/balancer/pkg/client/informers/externalversions/internalinterfaces" +) + +// Interface provides access to all the informers in this group version. +type Interface interface { + // Balancers returns a BalancerInformer. + Balancers() BalancerInformer +} + +type version struct { + factory internalinterfaces.SharedInformerFactory + namespace string + tweakListOptions internalinterfaces.TweakListOptionsFunc +} + +// New returns a new Interface. +func New(f internalinterfaces.SharedInformerFactory, namespace string, tweakListOptions internalinterfaces.TweakListOptionsFunc) Interface { + return &version{factory: f, namespace: namespace, tweakListOptions: tweakListOptions} +} + +// Balancers returns a BalancerInformer. +func (v *version) Balancers() BalancerInformer { + return &balancerInformer{factory: v.factory, namespace: v.namespace, tweakListOptions: v.tweakListOptions} +} diff --git a/balancer/pkg/client/informers/externalversions/factory.go b/balancer/pkg/client/informers/externalversions/factory.go new file mode 100644 index 000000000000..cc519f8a3377 --- /dev/null +++ b/balancer/pkg/client/informers/externalversions/factory.go @@ -0,0 +1,251 @@ +/* +Copyright The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Code generated by informer-gen. DO NOT EDIT. + +package externalversions + +import ( + reflect "reflect" + sync "sync" + time "time" + + v1 "k8s.io/apimachinery/pkg/apis/meta/v1" + runtime "k8s.io/apimachinery/pkg/runtime" + schema "k8s.io/apimachinery/pkg/runtime/schema" + versioned "k8s.io/autoscaler/balancer/pkg/client/clientset/versioned" + balancerxk8sio "k8s.io/autoscaler/balancer/pkg/client/informers/externalversions/balancer.x-k8s.io" + internalinterfaces "k8s.io/autoscaler/balancer/pkg/client/informers/externalversions/internalinterfaces" + cache "k8s.io/client-go/tools/cache" +) + +// SharedInformerOption defines the functional option type for SharedInformerFactory. +type SharedInformerOption func(*sharedInformerFactory) *sharedInformerFactory + +type sharedInformerFactory struct { + client versioned.Interface + namespace string + tweakListOptions internalinterfaces.TweakListOptionsFunc + lock sync.Mutex + defaultResync time.Duration + customResync map[reflect.Type]time.Duration + + informers map[reflect.Type]cache.SharedIndexInformer + // startedInformers is used for tracking which informers have been started. + // This allows Start() to be called multiple times safely. + startedInformers map[reflect.Type]bool + // wg tracks how many goroutines were started. + wg sync.WaitGroup + // shuttingDown is true when Shutdown has been called. It may still be running + // because it needs to wait for goroutines. + shuttingDown bool +} + +// WithCustomResyncConfig sets a custom resync period for the specified informer types. +func WithCustomResyncConfig(resyncConfig map[v1.Object]time.Duration) SharedInformerOption { + return func(factory *sharedInformerFactory) *sharedInformerFactory { + for k, v := range resyncConfig { + factory.customResync[reflect.TypeOf(k)] = v + } + return factory + } +} + +// WithTweakListOptions sets a custom filter on all listers of the configured SharedInformerFactory. +func WithTweakListOptions(tweakListOptions internalinterfaces.TweakListOptionsFunc) SharedInformerOption { + return func(factory *sharedInformerFactory) *sharedInformerFactory { + factory.tweakListOptions = tweakListOptions + return factory + } +} + +// WithNamespace limits the SharedInformerFactory to the specified namespace. +func WithNamespace(namespace string) SharedInformerOption { + return func(factory *sharedInformerFactory) *sharedInformerFactory { + factory.namespace = namespace + return factory + } +} + +// NewSharedInformerFactory constructs a new instance of sharedInformerFactory for all namespaces. +func NewSharedInformerFactory(client versioned.Interface, defaultResync time.Duration) SharedInformerFactory { + return NewSharedInformerFactoryWithOptions(client, defaultResync) +} + +// NewFilteredSharedInformerFactory constructs a new instance of sharedInformerFactory. +// Listers obtained via this SharedInformerFactory will be subject to the same filters +// as specified here. +// Deprecated: Please use NewSharedInformerFactoryWithOptions instead +func NewFilteredSharedInformerFactory(client versioned.Interface, defaultResync time.Duration, namespace string, tweakListOptions internalinterfaces.TweakListOptionsFunc) SharedInformerFactory { + return NewSharedInformerFactoryWithOptions(client, defaultResync, WithNamespace(namespace), WithTweakListOptions(tweakListOptions)) +} + +// NewSharedInformerFactoryWithOptions constructs a new instance of a SharedInformerFactory with additional options. +func NewSharedInformerFactoryWithOptions(client versioned.Interface, defaultResync time.Duration, options ...SharedInformerOption) SharedInformerFactory { + factory := &sharedInformerFactory{ + client: client, + namespace: v1.NamespaceAll, + defaultResync: defaultResync, + informers: make(map[reflect.Type]cache.SharedIndexInformer), + startedInformers: make(map[reflect.Type]bool), + customResync: make(map[reflect.Type]time.Duration), + } + + // Apply all options + for _, opt := range options { + factory = opt(factory) + } + + return factory +} + +func (f *sharedInformerFactory) Start(stopCh <-chan struct{}) { + f.lock.Lock() + defer f.lock.Unlock() + + if f.shuttingDown { + return + } + + for informerType, informer := range f.informers { + if !f.startedInformers[informerType] { + f.wg.Add(1) + // We need a new variable in each loop iteration, + // otherwise the goroutine would use the loop variable + // and that keeps changing. + informer := informer + go func() { + defer f.wg.Done() + informer.Run(stopCh) + }() + f.startedInformers[informerType] = true + } + } +} + +func (f *sharedInformerFactory) Shutdown() { + f.lock.Lock() + f.shuttingDown = true + f.lock.Unlock() + + // Will return immediately if there is nothing to wait for. + f.wg.Wait() +} + +func (f *sharedInformerFactory) WaitForCacheSync(stopCh <-chan struct{}) map[reflect.Type]bool { + informers := func() map[reflect.Type]cache.SharedIndexInformer { + f.lock.Lock() + defer f.lock.Unlock() + + informers := map[reflect.Type]cache.SharedIndexInformer{} + for informerType, informer := range f.informers { + if f.startedInformers[informerType] { + informers[informerType] = informer + } + } + return informers + }() + + res := map[reflect.Type]bool{} + for informType, informer := range informers { + res[informType] = cache.WaitForCacheSync(stopCh, informer.HasSynced) + } + return res +} + +// InternalInformerFor returns the SharedIndexInformer for obj using an internal +// client. +func (f *sharedInformerFactory) InformerFor(obj runtime.Object, newFunc internalinterfaces.NewInformerFunc) cache.SharedIndexInformer { + f.lock.Lock() + defer f.lock.Unlock() + + informerType := reflect.TypeOf(obj) + informer, exists := f.informers[informerType] + if exists { + return informer + } + + resyncPeriod, exists := f.customResync[informerType] + if !exists { + resyncPeriod = f.defaultResync + } + + informer = newFunc(f.client, resyncPeriod) + f.informers[informerType] = informer + + return informer +} + +// SharedInformerFactory provides shared informers for resources in all known +// API group versions. +// +// It is typically used like this: +// +// ctx, cancel := context.Background() +// defer cancel() +// factory := NewSharedInformerFactory(client, resyncPeriod) +// defer factory.WaitForStop() // Returns immediately if nothing was started. +// genericInformer := factory.ForResource(resource) +// typedInformer := factory.SomeAPIGroup().V1().SomeType() +// factory.Start(ctx.Done()) // Start processing these informers. +// synced := factory.WaitForCacheSync(ctx.Done()) +// for v, ok := range synced { +// if !ok { +// fmt.Fprintf(os.Stderr, "caches failed to sync: %v", v) +// return +// } +// } +// +// // Creating informers can also be created after Start, but then +// // Start must be called again: +// anotherGenericInformer := factory.ForResource(resource) +// factory.Start(ctx.Done()) +type SharedInformerFactory interface { + internalinterfaces.SharedInformerFactory + + // Start initializes all requested informers. They are handled in goroutines + // which run until the stop channel gets closed. + Start(stopCh <-chan struct{}) + + // Shutdown marks a factory as shutting down. At that point no new + // informers can be started anymore and Start will return without + // doing anything. + // + // In addition, Shutdown blocks until all goroutines have terminated. For that + // to happen, the close channel(s) that they were started with must be closed, + // either before Shutdown gets called or while it is waiting. + // + // Shutdown may be called multiple times, even concurrently. All such calls will + // block until all goroutines have terminated. + Shutdown() + + // WaitForCacheSync blocks until all started informers' caches were synced + // or the stop channel gets closed. + WaitForCacheSync(stopCh <-chan struct{}) map[reflect.Type]bool + + // ForResource gives generic access to a shared informer of the matching type. + ForResource(resource schema.GroupVersionResource) (GenericInformer, error) + + // InternalInformerFor returns the SharedIndexInformer for obj using an internal + // client. + InformerFor(obj runtime.Object, newFunc internalinterfaces.NewInformerFunc) cache.SharedIndexInformer + + Balancer() balancerxk8sio.Interface +} + +func (f *sharedInformerFactory) Balancer() balancerxk8sio.Interface { + return balancerxk8sio.New(f, f.namespace, f.tweakListOptions) +} diff --git a/balancer/pkg/client/informers/externalversions/generic.go b/balancer/pkg/client/informers/externalversions/generic.go new file mode 100644 index 000000000000..140a51691af9 --- /dev/null +++ b/balancer/pkg/client/informers/externalversions/generic.go @@ -0,0 +1,62 @@ +/* +Copyright The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Code generated by informer-gen. DO NOT EDIT. + +package externalversions + +import ( + "fmt" + + schema "k8s.io/apimachinery/pkg/runtime/schema" + v1alpha1 "k8s.io/autoscaler/balancer/pkg/apis/balancer.x-k8s.io/v1alpha1" + cache "k8s.io/client-go/tools/cache" +) + +// GenericInformer is type of SharedIndexInformer which will locate and delegate to other +// sharedInformers based on type +type GenericInformer interface { + Informer() cache.SharedIndexInformer + Lister() cache.GenericLister +} + +type genericInformer struct { + informer cache.SharedIndexInformer + resource schema.GroupResource +} + +// Informer returns the SharedIndexInformer. +func (f *genericInformer) Informer() cache.SharedIndexInformer { + return f.informer +} + +// Lister returns the GenericLister. +func (f *genericInformer) Lister() cache.GenericLister { + return cache.NewGenericLister(f.Informer().GetIndexer(), f.resource) +} + +// ForResource gives generic access to a shared informer of the matching type +// TODO extend this to unknown resources with a client pool +func (f *sharedInformerFactory) ForResource(resource schema.GroupVersionResource) (GenericInformer, error) { + switch resource { + // Group=balancer.x-k8s.io, Version=v1alpha1 + case v1alpha1.SchemeGroupVersion.WithResource("balancers"): + return &genericInformer{resource: resource.GroupResource(), informer: f.Balancer().V1alpha1().Balancers().Informer()}, nil + + } + + return nil, fmt.Errorf("no informer found for %v", resource) +} diff --git a/balancer/pkg/client/informers/externalversions/internalinterfaces/factory_interfaces.go b/balancer/pkg/client/informers/externalversions/internalinterfaces/factory_interfaces.go new file mode 100644 index 000000000000..91d5b9266c55 --- /dev/null +++ b/balancer/pkg/client/informers/externalversions/internalinterfaces/factory_interfaces.go @@ -0,0 +1,40 @@ +/* +Copyright The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Code generated by informer-gen. DO NOT EDIT. + +package internalinterfaces + +import ( + time "time" + + v1 "k8s.io/apimachinery/pkg/apis/meta/v1" + runtime "k8s.io/apimachinery/pkg/runtime" + versioned "k8s.io/autoscaler/balancer/pkg/client/clientset/versioned" + cache "k8s.io/client-go/tools/cache" +) + +// NewInformerFunc takes versioned.Interface and time.Duration to return a SharedIndexInformer. +type NewInformerFunc func(versioned.Interface, time.Duration) cache.SharedIndexInformer + +// SharedInformerFactory a small interface to allow for adding an informer without an import cycle +type SharedInformerFactory interface { + Start(stopCh <-chan struct{}) + InformerFor(obj runtime.Object, newFunc NewInformerFunc) cache.SharedIndexInformer +} + +// TweakListOptionsFunc is a function that transforms a v1.ListOptions. +type TweakListOptionsFunc func(*v1.ListOptions) diff --git a/balancer/pkg/client/listers/balancer.x-k8s.io/v1alpha1/balancer.go b/balancer/pkg/client/listers/balancer.x-k8s.io/v1alpha1/balancer.go new file mode 100644 index 000000000000..48cc8acfd5f7 --- /dev/null +++ b/balancer/pkg/client/listers/balancer.x-k8s.io/v1alpha1/balancer.go @@ -0,0 +1,99 @@ +/* +Copyright The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Code generated by lister-gen. DO NOT EDIT. + +package v1alpha1 + +import ( + "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/labels" + v1alpha1 "k8s.io/autoscaler/balancer/pkg/apis/balancer.x-k8s.io/v1alpha1" + "k8s.io/client-go/tools/cache" +) + +// BalancerLister helps list Balancers. +// All objects returned here must be treated as read-only. +type BalancerLister interface { + // List lists all Balancers in the indexer. + // Objects returned here must be treated as read-only. + List(selector labels.Selector) (ret []*v1alpha1.Balancer, err error) + // Balancers returns an object that can list and get Balancers. + Balancers(namespace string) BalancerNamespaceLister + BalancerListerExpansion +} + +// balancerLister implements the BalancerLister interface. +type balancerLister struct { + indexer cache.Indexer +} + +// NewBalancerLister returns a new BalancerLister. +func NewBalancerLister(indexer cache.Indexer) BalancerLister { + return &balancerLister{indexer: indexer} +} + +// List lists all Balancers in the indexer. +func (s *balancerLister) List(selector labels.Selector) (ret []*v1alpha1.Balancer, err error) { + err = cache.ListAll(s.indexer, selector, func(m interface{}) { + ret = append(ret, m.(*v1alpha1.Balancer)) + }) + return ret, err +} + +// Balancers returns an object that can list and get Balancers. +func (s *balancerLister) Balancers(namespace string) BalancerNamespaceLister { + return balancerNamespaceLister{indexer: s.indexer, namespace: namespace} +} + +// BalancerNamespaceLister helps list and get Balancers. +// All objects returned here must be treated as read-only. +type BalancerNamespaceLister interface { + // List lists all Balancers in the indexer for a given namespace. + // Objects returned here must be treated as read-only. + List(selector labels.Selector) (ret []*v1alpha1.Balancer, err error) + // Get retrieves the Balancer from the indexer for a given namespace and name. + // Objects returned here must be treated as read-only. + Get(name string) (*v1alpha1.Balancer, error) + BalancerNamespaceListerExpansion +} + +// balancerNamespaceLister implements the BalancerNamespaceLister +// interface. +type balancerNamespaceLister struct { + indexer cache.Indexer + namespace string +} + +// List lists all Balancers in the indexer for a given namespace. +func (s balancerNamespaceLister) List(selector labels.Selector) (ret []*v1alpha1.Balancer, err error) { + err = cache.ListAllByNamespace(s.indexer, s.namespace, selector, func(m interface{}) { + ret = append(ret, m.(*v1alpha1.Balancer)) + }) + return ret, err +} + +// Get retrieves the Balancer from the indexer for a given namespace and name. +func (s balancerNamespaceLister) Get(name string) (*v1alpha1.Balancer, error) { + obj, exists, err := s.indexer.GetByKey(s.namespace + "/" + name) + if err != nil { + return nil, err + } + if !exists { + return nil, errors.NewNotFound(v1alpha1.Resource("balancer"), name) + } + return obj.(*v1alpha1.Balancer), nil +} diff --git a/balancer/pkg/client/listers/balancer.x-k8s.io/v1alpha1/expansion_generated.go b/balancer/pkg/client/listers/balancer.x-k8s.io/v1alpha1/expansion_generated.go new file mode 100644 index 000000000000..61f8b1160e7c --- /dev/null +++ b/balancer/pkg/client/listers/balancer.x-k8s.io/v1alpha1/expansion_generated.go @@ -0,0 +1,27 @@ +/* +Copyright The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Code generated by lister-gen. DO NOT EDIT. + +package v1alpha1 + +// BalancerListerExpansion allows custom methods to be added to +// BalancerLister. +type BalancerListerExpansion interface{} + +// BalancerNamespaceListerExpansion allows custom methods to be added to +// BalancerNamespaceLister. +type BalancerNamespaceListerExpansion interface{} diff --git a/balancer/pkg/controller/conditions.go b/balancer/pkg/controller/conditions.go new file mode 100644 index 000000000000..d02f6bd2acb2 --- /dev/null +++ b/balancer/pkg/controller/conditions.go @@ -0,0 +1,69 @@ +/* +Copyright 2023 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package controller + +import ( + "fmt" + "time" + + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/autoscaler/balancer/pkg/apis/balancer.x-k8s.io/v1alpha1" +) + +func setConditionsBasedOnError(balancer *v1alpha1.Balancer, err *BalancerError, now time.Time) { + if err == nil { + balancer.Status.Conditions = setConditionInList(balancer.Status.Conditions, + now, v1alpha1.BalancerConditionRunning, metav1.ConditionTrue, + "Completed", "Balancer running OK") + } else { + balancer.Status.Conditions = setConditionInList(balancer.Status.Conditions, + now, v1alpha1.BalancerConditionRunning, metav1.ConditionFalse, + string(err.phase), err.Error()) + } +} + +// setConditionInList sets the specific condition type on the given Balancer to +// the specified value with the given reason and message. The message and args +// are treated like a format string. The condition will be added if +// it is not present. The condition will be overwritten if it already exists +// (and LastTransitionState update if condition's status is different) +// A new list will be returned. +func setConditionInList(inputList []metav1.Condition, now time.Time, conditionType string, + status metav1.ConditionStatus, reason, message string, args ...interface{}) []metav1.Condition { + resList := inputList + var existingCond *metav1.Condition + for i, condition := range resList { + if condition.Type == conditionType { + // can't take a pointer to an iteration variable + existingCond = &resList[i] + break + } + } + if existingCond == nil { + resList = append(resList, metav1.Condition{ + Type: conditionType, + }) + existingCond = &resList[len(resList)-1] + } + if existingCond.Status != status { + existingCond.LastTransitionTime = metav1.NewTime(now) + } + existingCond.Status = status + existingCond.Reason = reason + existingCond.Message = fmt.Sprintf(message, args...) + return resList +} diff --git a/balancer/pkg/controller/conditions_test.go b/balancer/pkg/controller/conditions_test.go new file mode 100644 index 000000000000..a9e280aacccd --- /dev/null +++ b/balancer/pkg/controller/conditions_test.go @@ -0,0 +1,50 @@ +/* +Copyright 2023 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package controller + +import ( + "fmt" + "testing" + "time" + + "github.com/stretchr/testify/assert" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + balancerapi "k8s.io/autoscaler/balancer/pkg/apis/balancer.x-k8s.io/v1alpha1" +) + +func TestSetConditionsBasedOnProcessError(t *testing.T) { + balancer := newBalancer(5) + now := time.Now() + setConditionsBasedOnError(balancer, nil, now) + + assert.Len(t, balancer.Status.Conditions, 1) + assert.Equal(t, balancerapi.BalancerConditionRunning, balancer.Status.Conditions[0].Type) + assert.Equal(t, metav1.ConditionTrue, balancer.Status.Conditions[0].Status) + assert.Equal(t, now, balancer.Status.Conditions[0].LastTransitionTime.Time) + + setConditionsBasedOnError(balancer, nil, now.Add(time.Minute)) + assert.Len(t, balancer.Status.Conditions, 1) + assert.Equal(t, now, balancer.Status.Conditions[0].LastTransitionTime.Time) + + now = now.Add(time.Hour) + + setConditionsBasedOnError(balancer, newBalancerError(ScaleSubresourcePolling, fmt.Errorf("bum")), now) + assert.Len(t, balancer.Status.Conditions, 1) + assert.Equal(t, balancerapi.BalancerConditionRunning, balancer.Status.Conditions[0].Type) + assert.Equal(t, metav1.ConditionFalse, balancer.Status.Conditions[0].Status) + assert.Equal(t, now, balancer.Status.Conditions[0].LastTransitionTime.Time) +} diff --git a/balancer/pkg/controller/controller.go b/balancer/pkg/controller/controller.go new file mode 100644 index 000000000000..e3c274cf1a2b --- /dev/null +++ b/balancer/pkg/controller/controller.go @@ -0,0 +1,249 @@ +/* +Copyright 2023 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package controller + +import ( + "context" + "fmt" + "time" + + corev1 "k8s.io/api/core/v1" + + apiequality "k8s.io/apimachinery/pkg/api/equality" + "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + utilruntime "k8s.io/apimachinery/pkg/util/runtime" + "k8s.io/apimachinery/pkg/util/wait" + + "k8s.io/client-go/kubernetes/scheme" + typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1" + "k8s.io/client-go/tools/cache" + "k8s.io/client-go/tools/record" + "k8s.io/client-go/util/workqueue" + + "k8s.io/klog/v2" + + balancerapi "k8s.io/autoscaler/balancer/pkg/apis/balancer.x-k8s.io/v1alpha1" + balancerclientset "k8s.io/autoscaler/balancer/pkg/client/clientset/versioned" + balancerscheme "k8s.io/autoscaler/balancer/pkg/client/clientset/versioned/scheme" + balancerinformers "k8s.io/autoscaler/balancer/pkg/client/informers/externalversions/balancer.x-k8s.io/v1alpha1" + balancerlisters "k8s.io/autoscaler/balancer/pkg/client/listers/balancer.x-k8s.io/v1alpha1" +) + +const controllerAgentName = "balancer-controller" + +// Controller is the controller implementation for Balancer resources +type Controller struct { + // For balancer object access. + balancerClientSet balancerclientset.Interface + balancerLister balancerlisters.BalancerLister + balancerSynced cache.InformerSynced + + core CoreInterface + + // workqueue is a rate limited work queue. + workqueue workqueue.RateLimitingInterface + + // recorder is an event recorder for recording Event resources to the + // Kubernetes API. + recorder record.EventRecorder +} + +// NewController returns a new Balancer controller +func NewController( + balancerClientSet balancerclientset.Interface, + balancerInformer balancerinformers.BalancerInformer, + eventinterface typedcorev1.EventInterface, + + core CoreInterface, + resync time.Duration, +) *Controller { + + // Create event recorder. + // Add balancer-controller types to the default Kubernetes Scheme so Events can be + // logged for balancer-controller types. + utilruntime.Must(balancerscheme.AddToScheme(scheme.Scheme)) + klog.V(4).Info("Creating event broadcaster") + eventBroadcaster := record.NewBroadcaster() + eventBroadcaster.StartStructuredLogging(0) + eventBroadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: eventinterface}) + recorder := eventBroadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: controllerAgentName}) + + controller := &Controller{ + balancerClientSet: balancerClientSet, + balancerLister: balancerInformer.Lister(), + balancerSynced: balancerInformer.Informer().HasSynced, + recorder: recorder, + core: core, + + // Workqueue will process the items every resync period. + workqueue: workqueue.NewNamedRateLimitingQueue(NewFixedItemIntervalRateLimiter(resync), "Balancer"), + } + + klog.Info("Setting up event handlers for Balancer") + // Set up an event handler for when Balancer resources change + balancerInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: controller.enqueueBalancer, + UpdateFunc: func(old, new interface{}) { + controller.enqueueBalancer(new) + }, + DeleteFunc: controller.deleteBalancer, + }) + + return controller +} + +// Run will set up the event handlers for types we are interested in, as well +// as syncing informer caches and starting workers. It will block until stopCh +// is closed, at which point it will shut down the workqueue and wait for +// workers to finish processing their current work items. +func (c *Controller) Run(threadiness int, stopCh <-chan struct{}) error { + defer utilruntime.HandleCrash() + defer c.workqueue.ShutDown() + + // Start the informer factories to begin populating the informer caches + klog.V(1).Info("Starting Balancer controller") + if ok := cache.WaitForCacheSync(stopCh, c.balancerSynced, c.core.IsSynced); !ok { + return fmt.Errorf("failed to wait for caches to sync") + } + // Launch workers to process Balancers + for i := 0; i < threadiness; i++ { + go wait.Until(func() { + for c.processNextWorkItem() { + } + }, time.Second, stopCh) + } + + klog.V(1).Info("Balancer controller is running") + <-stopCh + klog.V(1).Info("Shutting down Balancer controller") + return nil +} + +// processNextWorkItem will read a single work item off the workqueue and +// attempt to process it, by calling the syncHandler. +func (c *Controller) processNextWorkItem() bool { + objKey, shutdown := c.workqueue.Get() + if shutdown { + return false + } + + // We expect strings to come off the workqueue. These are of the + // form namespace/name. + key, ok := objKey.(string) + if !ok { + c.dropKey(objKey) + klog.Errorf("expected string in workqueue but got %#v", objKey) + return true + } + + // Convert the namespace/name string into a distinct namespace and name + namespace, name, err := cache.SplitMetaNamespaceKey(key) + if err != nil { + klog.Errorf("Invalid resource key: %s", key) + return false + } + klog.V(3).Infof("Processing balancer %s", key) + + again := c.syncHandler(namespace, name) + if again { + c.workqueue.AddRateLimited(key) + c.workqueue.Done(key) + } else { + c.dropKey(key) + } + return true +} + +// syncHandler Processes balancer with the given key and updates the Status. +// Return true if the key should be re-processed. False if the key is to +// be dropped. +func (c *Controller) syncHandler(namespace, name string) bool { + + // Get the Balancer resource with this namespace/name + balancer, err := c.balancerLister.Balancers(namespace).Get(name) + if err != nil { + // The Balancer resource may no longer exist, in which case we stop + // processing. + if errors.IsNotFound(err) { + klog.Warningf("Balancer %s/%s not found, dropping from queue", namespace, name) + return false + } + klog.Warningf("Balancer %s/%s not obtained: %s", namespace, name, err.Error()) + // Maybe there is a chance... + return true + } + + // Make a deep copy to avoid modifying concurrently used informer object. + balancer = balancer.DeepCopy() + originalStatus := balancer.Status.DeepCopy() + + statusInfo, processError := c.core.ProcessBalancer(balancer, time.Now()) + + if statusInfo != nil { + balancer.Status.Replicas = statusInfo.replicasObserved + } + selector := balancer.Spec.Selector + balancer.Status.Selector = metav1.FormatLabelSelector(&selector) + setConditionsBasedOnError(balancer, processError, time.Now()) + if processError != nil { + klog.Warningf("Failed to process balancer %s/%s: %s", namespace, name, processError.Error()) + c.recorder.Event(balancer, corev1.EventTypeWarning, "UnableToBalance", processError.Error()) + } + + if err = c.updateStatusIfNeeded(originalStatus, balancer); err != nil { + c.recorder.Event(balancer, corev1.EventTypeWarning, "StatusNotUpdated", err.Error()) + klog.Warningf("Failed to update status of balancer %s/%s: %s", namespace, name, err.Error()) + } + return true +} + +// enqueueBalancer takes a Balancer resource and converts it into a namespace/name +// string which is then put onto the work queue. This method should *not* be +// passed resources of any type other than Balancer. +func (c *Controller) enqueueBalancer(obj interface{}) { + key, err := cache.MetaNamespaceKeyFunc(obj) + if err != nil { + utilruntime.HandleError(err) + return + } + c.workqueue.Add(key) +} + +func (c *Controller) deleteBalancer(obj interface{}) { + key, err := cache.MetaNamespaceKeyFunc(obj) + if err != nil { + klog.Errorf("couldn't get key for object %+v: %v", obj, err) + return + } + c.dropKey(key) +} + +func (c *Controller) dropKey(key interface{}) { + c.workqueue.Forget(key) + c.workqueue.Done(key) +} + +// updateStatusIfNeeded calls updateStatus only if the status of the new Balancer is not the same as the old status +func (c *Controller) updateStatusIfNeeded(oldStatus *balancerapi.BalancerStatus, new *balancerapi.Balancer) error { + // skip a write if we wouldn't need to update + if apiequality.Semantic.DeepEqual(oldStatus, &new.Status) { + return nil + } + _, err := c.balancerClientSet.BalancerV1alpha1().Balancers(new.Namespace).UpdateStatus(context.TODO(), new, metav1.UpdateOptions{}) + return err +} diff --git a/balancer/pkg/controller/controller_test.go b/balancer/pkg/controller/controller_test.go new file mode 100644 index 000000000000..5ee03247c90a --- /dev/null +++ b/balancer/pkg/controller/controller_test.go @@ -0,0 +1,249 @@ +/* +Copyright 2023 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package controller + +import ( + "errors" + "fmt" + "sync" + "testing" + "time" + + "github.com/stretchr/testify/assert" + v1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + balancerapi "k8s.io/autoscaler/balancer/pkg/apis/balancer.x-k8s.io/v1alpha1" + fakebalancer "k8s.io/autoscaler/balancer/pkg/client/clientset/versioned/fake" + "k8s.io/autoscaler/balancer/pkg/client/informers/externalversions" + coretesting "k8s.io/client-go/testing" + "k8s.io/klog/v2" + + "k8s.io/client-go/kubernetes/fake" +) + +type testContext struct { + statusInfo *BalancerStatusInfo + balancerError *BalancerError + input []balancerapi.Balancer + core *fakeCore + controller *Controller + stop chan struct{} + balancerUpdates chan balancerapi.Balancer + events chan v1.Event +} + +type fakeCore struct { + sync.Mutex + received *balancerapi.Balancer + calls int32 + testContext *testContext +} + +func (f *fakeCore) ProcessBalancer(balancer *balancerapi.Balancer, now time.Time) (*BalancerStatusInfo, *BalancerError) { + defer f.Unlock() + f.Lock() + f.received = balancer + f.calls++ + return f.testContext.statusInfo, f.testContext.balancerError +} + +func (f *fakeCore) IsSynced() bool { + return true +} + +func prepareTest(balancer *balancerapi.Balancer, info *BalancerStatusInfo, err *BalancerError, updateOk bool) *testContext { + tc := &testContext{ + input: []balancerapi.Balancer{*balancer}, + statusInfo: info, + balancerError: err, + stop: make(chan struct{}), + balancerUpdates: make(chan balancerapi.Balancer, 1000), + events: make(chan v1.Event, 1000), + } + + balancerclient := &fakebalancer.Clientset{} + balancerclient.AddReactor("list", "balancers", + func(action coretesting.Action) (handled bool, ret runtime.Object, err error) { + obj := &balancerapi.BalancerList{} + obj.Items = tc.input + klog.Infof("List balancers") + return true, obj, nil + }) + + balancerclient.AddReactor("update", "balancers", + func(action coretesting.Action) (handled bool, ret runtime.Object, err error) { + obj := action.(coretesting.UpdateAction).GetObject().(*balancerapi.Balancer) + if updateOk { + tc.balancerUpdates <- *obj.DeepCopy() + klog.Infof("Updated balancers") + return true, obj, nil + } + return true, obj, fmt.Errorf("Access denied") + }) + + balancerInformerFactory := externalversions.NewSharedInformerFactory(balancerclient, 0) + informer := balancerInformerFactory.Balancer().V1alpha1().Balancers() + + fakeEvents := fake.Clientset{} + fakeEvents.AddReactor("create", "events", + func(action coretesting.Action) (handled bool, ret runtime.Object, err error) { + obj := action.(coretesting.CreateAction).GetObject().(*v1.Event) + tc.events <- *obj.DeepCopy() + klog.Info("Published event") + return true, obj, nil + }) + + fc := &fakeCore{ + testContext: tc, + } + + tc.controller = NewController(balancerclient, informer, fakeEvents.CoreV1().Events(""), fc, time.Second) + balancerInformerFactory.Start(tc.stop) + + return tc +} + +func poolBalancer(ch chan balancerapi.Balancer, maxDuration time.Duration) *balancerapi.Balancer { + select { + case balancer := <-ch: + return &balancer + case <-time.After(maxDuration): + return nil + } +} + +func poolEvents(ch chan v1.Event, maxDuration time.Duration) *v1.Event { + select { + case event := <-ch: + return &event + case <-time.After(maxDuration): + return nil + } +} + +func TestController(t *testing.T) { + testCases := []struct { + name string + + // input + balancer *balancerapi.Balancer + info *BalancerStatusInfo + err *BalancerError + + // expectations: + statusReplicas int32 + conditionType string + conditionStatus metav1.ConditionStatus + expectedEvent string + updateFailed bool + }{ + { + name: "all fine", + balancer: newBalancer(5), + info: &BalancerStatusInfo{replicasObserved: 10}, + err: nil, + statusReplicas: 10, + conditionType: balancerapi.BalancerConditionRunning, + conditionStatus: metav1.ConditionTrue, + }, + { + name: "early error", + balancer: newBalancer(5), + info: nil, + err: newBalancerError(ScaleSubresourcePolling, errors.New("Booom")), + conditionType: balancerapi.BalancerConditionRunning, + conditionStatus: metav1.ConditionFalse, + expectedEvent: "UnableToBalance", + }, + { + name: "error overwrite", + balancer: func() *balancerapi.Balancer { + b := newBalancer(5) + setConditionsBasedOnError(b, nil, time.Now()) + return b + }(), + info: nil, + err: newBalancerError(ScaleSubresourcePolling, errors.New("Booom")), + conditionType: balancerapi.BalancerConditionRunning, + conditionStatus: metav1.ConditionFalse, + expectedEvent: "UnableToBalance", + }, + { + name: "late error", + balancer: newBalancer(5), + info: &BalancerStatusInfo{replicasObserved: 7}, + err: newBalancerError(ReplicaCountSetting, errors.New("Booom")), + statusReplicas: 7, + conditionType: balancerapi.BalancerConditionRunning, + conditionStatus: metav1.ConditionFalse, + expectedEvent: "UnableToBalance", + }, + { + name: "balancer update error", + balancer: newBalancer(5), + info: &BalancerStatusInfo{replicasObserved: 7}, + err: nil, + expectedEvent: "StatusNotUpdated", + updateFailed: true, + }, + } + + for i, tc := range testCases { + t.Run(fmt.Sprintf("Test %d: %s", i+1, tc.name), func(t *testing.T) { + testContext := prepareTest( + tc.balancer, + tc.info, + tc.err, + !tc.updateFailed, + ) + go testContext.controller.Run(1, testContext.stop) + + if !tc.updateFailed { + // On a very overloaded test machine this may be flaky. + // Each test case is expected to end up in < 0.1s. The machine needs + // to be >> 100x slower than usual in order for that to fail. + balancer := poolBalancer(testContext.balancerUpdates, time.Second*10) + close(testContext.stop) + + assert.NotNil(t, balancer) + assert.Equal(t, tc.statusReplicas, balancer.Status.Replicas) + found := false + for _, c := range balancer.Status.Conditions { + if c.Type == tc.conditionType { + found = true + assert.Equal(t, tc.conditionStatus, c.Status) + } + } + assert.True(t, found) + } + + if len(tc.expectedEvent) == 0 { + // This is not guaranteed to always catch errors. We expect no events to + // be published, we wait a bit here, but after 0.1s after the balancer is + // processed we expect that no unexpected events will arrive. + assert.Nil(t, poolEvents(testContext.events, time.Millisecond*100)) + } else { + // On a very overloaded test machine this may be flaky. + // Each test case is expected to end up in < 0.1s. The machine needs + // to be >> 100x slower than usual in order for that to fail. + event := poolEvents(testContext.events, time.Second*10) + assert.Equal(t, tc.expectedEvent, event.Reason) + } + }) + } +} diff --git a/balancer/pkg/controller/core.go b/balancer/pkg/controller/core.go new file mode 100644 index 000000000000..25f1e16c292a --- /dev/null +++ b/balancer/pkg/controller/core.go @@ -0,0 +1,191 @@ +/* +Copyright 2023 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package controller + +import ( + "fmt" + v1 "k8s.io/client-go/informers/core/v1" + "time" + + autoscalingv1 "k8s.io/api/autoscaling/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/runtime/schema" + balancerapi "k8s.io/autoscaler/balancer/pkg/apis/balancer.x-k8s.io/v1alpha1" + "k8s.io/autoscaler/balancer/pkg/pods" + "k8s.io/autoscaler/balancer/pkg/policy" + corelisters "k8s.io/client-go/listers/core/v1" +) + +// CoreInterface of the balancer controller. Handles individual Balancer reconciliation. +type CoreInterface interface { + ProcessBalancer(balancer *balancerapi.Balancer, now time.Time) (*BalancerStatusInfo, *BalancerError) + IsSynced() bool +} + +// BalancerError adds phase information to error returned by balancer core. +type BalancerError struct { + phase BalancerPhase + err error +} + +// BalancerStatusInfo summarizes the balancing operation. +type BalancerStatusInfo struct { + replicasObserved int32 + updated bool +} + +func newBalancerStatusInfo(replicas int32, updated bool) BalancerStatusInfo { + return BalancerStatusInfo{replicasObserved: replicas, updated: updated} +} + +// core is CoreInferface implementation. +type core struct { + scaleClient ScaleClientInterface + podLister corelisters.PodLister + podSynced func() bool +} + +func newCoreForTests(client ScaleClientInterface, lister corelisters.PodLister) CoreInterface { + return &core{ + scaleClient: client, + podLister: lister, + podSynced: func() bool { + return true + }, + } +} + +// NewCore returns an implementation of the CoreInterface. +func NewCore(client ScaleClientInterface, informer v1.PodInformer) CoreInterface { + return &core{ + scaleClient: client, + podLister: informer.Lister(), + podSynced: informer.Informer().HasSynced, + } +} + +type scaleInfo struct { + scale *autoscalingv1.Scale + groupResource *schema.GroupResource +} + +// BalancerPhase indicates the phase of the balancer reconciliation. +type BalancerPhase string + +const ( + // 50 years + infDeadline = time.Hour * 24 * 365 * 50 + + // ScaleSubresourcePolling - phase where the scale subresources af a balancer are get. + ScaleSubresourcePolling BalancerPhase = "ScaleSubresourcePolling" + // PodListing - phase where pods under balancers target are listed. + PodListing BalancerPhase = "PodListing" + // PodLabelsChecking - phase where pods labels are checked. + PodLabelsChecking BalancerPhase = "PodLabelsChecking" + // ApplyingPolicyListing - phase where the balancer policy is applied. + ApplyingPolicyListing BalancerPhase = "ApplyingBalancerPolicy" + // ReplicaCountSetting - phase where balancer targets are resized. + ReplicaCountSetting BalancerPhase = "ReplicaCountSetting" +) + +func (b *BalancerError) Error() string { + return fmt.Sprintf("%s: %v", b.phase, b.err.Error()) +} + +func newBalancerError(phase BalancerPhase, err error) *BalancerError { + return &BalancerError{ + phase: phase, + err: err, + } +} + +// ProcessBalancer process Balancer and returns status information and/or error +// depending on how far the process progressed before encountering a problem. +func (c *core) ProcessBalancer(balancer *balancerapi.Balancer, now time.Time) (*BalancerStatusInfo, *BalancerError) { + scaleInfos := make(map[string]scaleInfo) + summaries := make(map[string]pods.Summary) + + for _, target := range balancer.Spec.Targets { + scale, gr, err := c.scaleClient.GetScale(balancer.Namespace, target.ScaleTargetRef) + if err != nil { + return nil, newBalancerError(ScaleSubresourcePolling, err) + } + scaleInfos[target.Name] = scaleInfo{ + scale: scale, + groupResource: gr, + } + } + + balancerSelector, err := metav1.LabelSelectorAsSelector(&balancer.Spec.Selector) + if err != nil { + return nil, newBalancerError(PodLabelsChecking, fmt.Errorf("incorrect selector")) + } + + statusInfo := BalancerStatusInfo{} + + for name, si := range scaleInfos { + selector, err := labels.Parse(si.scale.Status.Selector) + if err != nil { + return nil, newBalancerError(PodListing, err) + } + podList, err := c.podLister.Pods(balancer.Namespace).List(selector) + if err != nil { + return nil, newBalancerError(PodListing, err) + } + + for _, p := range podList { + if !balancerSelector.Matches(labels.Set(p.Labels)) { + return nil, newBalancerError(PodLabelsChecking, + fmt.Errorf("incorrect labeling for pods in target %s", name)) + } + } + deadline := infDeadline + if balancer.Spec.Policy.Fallback != nil { + deadline = time.Duration(balancer.Spec.Policy.Fallback.StartupTimeoutSeconds) * time.Second + } + summary := pods.CalculateSummary(podList, now, deadline) + summaries[name] = summary + + statusInfo.replicasObserved += summary.Total + } + placement, _, err := policy.GetPlacement(balancer, summaries) + if err != nil { + return &statusInfo, newBalancerError(ApplyingPolicyListing, err) + } + + for name, scaleInfo := range scaleInfos { + replicas, found := placement[name] + if !found { + return &statusInfo, newBalancerError(ApplyingPolicyListing, fmt.Errorf("placement for %s not found", name)) + } + if scaleInfo.scale.Spec.Replicas != replicas { + statusInfo.updated = true + scaleInfo.scale.Spec.Replicas = replicas + err := c.scaleClient.UpdateScale(scaleInfo.scale, scaleInfo.groupResource) + if err != nil { + return &statusInfo, newBalancerError(ReplicaCountSetting, err) + } + } + } + + return &statusInfo, nil +} + +func (c *core) IsSynced() bool { + return c.podSynced() +} diff --git a/balancer/pkg/controller/core_test.go b/balancer/pkg/controller/core_test.go new file mode 100644 index 000000000000..3098513ede02 --- /dev/null +++ b/balancer/pkg/controller/core_test.go @@ -0,0 +1,311 @@ +/* +Copyright 2023 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package controller + +import ( + "errors" + "fmt" + "testing" + "time" + + "github.com/stretchr/testify/assert" + hpav1 "k8s.io/api/autoscaling/v1" + hpa "k8s.io/api/autoscaling/v2" + v1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" + balancerapi "k8s.io/autoscaler/balancer/pkg/apis/balancer.x-k8s.io/v1alpha1" + corelisters "k8s.io/client-go/listers/core/v1" +) + +type podListerMock struct { + pods []*v1.Pod +} + +type podNamespaceListerMock struct { + pods []*v1.Pod +} + +func (p *podListerMock) List(selector labels.Selector) (ret []*v1.Pod, err error) { + result := make([]*v1.Pod, 0) + for _, pod := range p.pods { + if selector.Matches(labels.Set(pod.Labels)) { + result = append(result, pod) + } + } + return result, nil +} + +func (p *podListerMock) Pods(namespace string) corelisters.PodNamespaceLister { + filtered := make([]*v1.Pod, 0) + for _, pod := range p.pods { + if pod.Namespace == namespace { + filtered = append(filtered, pod) + } + } + return &podNamespaceListerMock{ + pods: filtered, + } +} + +func (p *podNamespaceListerMock) List(selector labels.Selector) (ret []*v1.Pod, err error) { + result := make([]*v1.Pod, 0) + for _, pod := range p.pods { + if selector.Matches(labels.Set(pod.Labels)) { + result = append(result, pod) + } + } + return result, nil +} + +func (p *podNamespaceListerMock) Get(name string) (*v1.Pod, error) { + for _, pod := range p.pods { + if pod.Name == name { + return pod, nil + } + } + return nil, errors.New("Not found") +} + +func newPod(namespace, name string, phase v1.PodPhase, createTime time.Time, podLabels string) *v1.Pod { + parsedLabels, err := labels.ConvertSelectorToLabelsMap(podLabels) + if err != nil { + panic(err) + } + return &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Namespace: namespace, + CreationTimestamp: metav1.NewTime(createTime), + Labels: parsedLabels, + }, + Status: v1.PodStatus{ + Phase: phase, + }, + } +} + +func newTarget(name string) balancerapi.BalancerTarget { + return balancerapi.BalancerTarget{ + Name: name, + ScaleTargetRef: hpa.CrossVersionObjectReference{ + Name: name, + Kind: "Deployment", + APIVersion: "apps/v1", + }, + } +} + +func newScale(name string, replicas int32) *hpav1.Scale { + return &hpav1.Scale{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Namespace: "default", + }, + Spec: hpav1.ScaleSpec{ + Replicas: replicas, + }, + Status: hpav1.ScaleStatus{ + Replicas: replicas, + Selector: "run=" + name, + }, + } +} + +func newBalancer(replicas int32) *balancerapi.Balancer { + return &balancerapi.Balancer{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: "default", + Name: "balancer", + }, + Spec: balancerapi.BalancerSpec{ + Targets: []balancerapi.BalancerTarget{ + newTarget("a"), + newTarget("b"), + }, + Replicas: replicas, + Selector: metav1.LabelSelector{MatchLabels: map[string]string{"service": "nginx"}}, + Policy: balancerapi.BalancerPolicy{ + PolicyName: balancerapi.ProportionalPolicyName, + Proportions: &balancerapi.ProportionalPolicy{ + TargetProportions: map[string]int32{"a": 30, "b": 70}, + }, + Fallback: &balancerapi.FallbackPolicy{ + StartupTimeoutSeconds: 60, + }, + }, + }, + } +} + +func TestProcessBalancer(t *testing.T) { + + tests := []struct { + name string + pods []*v1.Pod + balancer *balancerapi.Balancer + scales []*hpav1.Scale + noChange bool + expected map[string]int32 + balancerPhaseError BalancerPhase + }{ + { + name: "No pods, 10 replicas, 30/70", + pods: []*v1.Pod{}, + balancer: newBalancer(10), + scales: []*hpav1.Scale{ + newScale("a", 0), + newScale("b", 0), + }, + expected: map[string]int32{"a": 3, "b": 7}, + }, + { + name: "With pods, 10 replicas, 30/70", + pods: []*v1.Pod{ + newPod("default", "a1", v1.PodRunning, time.Now(), "run=a,service=nginx"), + newPod("default", "a2", v1.PodRunning, time.Now(), "run=a,service=nginx"), + newPod("default", "a3", v1.PodRunning, time.Now(), "run=a,service=nginx"), + }, + balancer: newBalancer(10), + scales: []*hpav1.Scale{ + newScale("a", 3), + newScale("b", 0), + }, + expected: map[string]int32{"a": 3, "b": 7}, + }, + { + name: "With pods, wrong selector", + pods: []*v1.Pod{ + newPod("default", "a1", v1.PodRunning, time.Now(), "run=a,service=nginx"), + }, + balancer: func() *balancerapi.Balancer { + b := newBalancer(10) + b.Spec.Selector.MatchLabels["xx"] = "yy" + return b + }(), + scales: []*hpav1.Scale{ + newScale("a", 1), + newScale("b", 0), + }, + balancerPhaseError: PodLabelsChecking, + }, + { + name: "Without pods, priority", + pods: []*v1.Pod{}, + balancer: func() *balancerapi.Balancer { + b := newBalancer(10) + b.Spec.Policy.PolicyName = balancerapi.PriorityPolicyName + b.Spec.Policy.Proportions = nil + b.Spec.Policy.Priorities = &balancerapi.PriorityPolicy{ + TargetOrder: []string{"a", "b"}, + } + return b + }(), + scales: []*hpav1.Scale{ + newScale("a", 0), + newScale("b", 0), + }, + expected: map[string]int32{"a": 10, "b": 0}, + }, + { + name: "No pods, 0 replicas, 30/70", + pods: []*v1.Pod{}, + balancer: newBalancer(0), + scales: []*hpav1.Scale{ + newScale("a", 0), + newScale("b", 0), + }, + noChange: true, + }, + { + name: "With pods, 1 replica, 30/70, with fallback", + pods: []*v1.Pod{ + newPod("default", "b1", v1.PodPending, time.Now().Add(-time.Hour), "run=b,service=nginx"), + }, + balancer: newBalancer(1), + scales: []*hpav1.Scale{ + newScale("a", 0), + newScale("b", 1), + }, + expected: map[string]int32{"a": 1, "b": 1}, + }, + { + name: "With pods, 1 replica, 30/70, with young pending", + pods: []*v1.Pod{ + newPod("default", "b1", v1.PodPending, time.Now().Add(-time.Second*20), "run=b,service=nginx"), + }, + balancer: newBalancer(1), + scales: []*hpav1.Scale{ + newScale("a", 0), + newScale("b", 0), + }, + expected: map[string]int32{"a": 0, "b": 1}, + }, + { + name: "With pods, 1 replica, 30/70, with medium pending (fallback)", + pods: []*v1.Pod{ + newPod("default", "b1", v1.PodPending, time.Now().Add(-time.Second*61), "run=b,service=nginx"), + }, + balancer: newBalancer(1), + scales: []*hpav1.Scale{ + newScale("a", 0), + newScale("b", 0), + }, + expected: map[string]int32{"a": 1, "b": 1}, + }, + { + name: "Wrong targets", + pods: []*v1.Pod{}, + balancer: newBalancer(0), + scales: []*hpav1.Scale{}, + balancerPhaseError: ScaleSubresourcePolling, + }, + } + + for i, tc := range tests { + t.Run(fmt.Sprintf("%d: %s", i, tc.name), func(t *testing.T) { + scaleClient := scaleClientMock{ + scales: map[string]*hpav1.Scale{}, + } + for _, s := range tc.scales { + scaleClient.scales[scalesMockKey(s.Namespace, newTarget(s.Name).ScaleTargetRef)] = s + } + + podLister := podListerMock{ + pods: tc.pods, + } + + core := newCoreForTests(&scaleClient, &podLister) + statusInfo, errorsInfo := core.ProcessBalancer(tc.balancer, time.Now()) + + if tc.balancerPhaseError != "" { + assert.True(t, statusInfo == nil || statusInfo.updated == false) + assert.Equal(t, tc.balancerPhaseError, errorsInfo.phase) + } + if tc.balancerPhaseError == "" { + assert.Equal(t, !tc.noChange, statusInfo.updated) + } + if tc.expected != nil { + for k, v := range tc.expected { + key := scalesMockKey("default", newTarget(k).ScaleTargetRef) + replicas := scaleClient.scales[key].Spec.Replicas + assert.Equal(t, v, replicas, "replica count for "+key) + } + } + }) + } +} diff --git a/balancer/pkg/controller/rate_limiters.go b/balancer/pkg/controller/rate_limiters.go new file mode 100644 index 000000000000..ce0a8a98e5d8 --- /dev/null +++ b/balancer/pkg/controller/rate_limiters.go @@ -0,0 +1,68 @@ +/* +Copyright 2023 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +/* +Copyright 2015 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package controller + +import ( + "time" + + "k8s.io/client-go/util/workqueue" +) + +// FixedItemIntervalRateLimiter limits items to a fixed-rate interval +// Borrowed from HPA. +type FixedItemIntervalRateLimiter struct { + interval time.Duration +} + +var _ workqueue.RateLimiter = &FixedItemIntervalRateLimiter{} + +// NewFixedItemIntervalRateLimiter creates a new instance of a RateLimiter using a fixed interval +func NewFixedItemIntervalRateLimiter(interval time.Duration) workqueue.RateLimiter { + return &FixedItemIntervalRateLimiter{ + interval: interval, + } +} + +// When returns the interval of the rate limiter +func (r *FixedItemIntervalRateLimiter) When(item interface{}) time.Duration { + return r.interval +} + +// NumRequeues returns back how many failures the item has had +func (r *FixedItemIntervalRateLimiter) NumRequeues(item interface{}) int { + return 1 +} + +// Forget indicates that an item is finished being retried. +func (r *FixedItemIntervalRateLimiter) Forget(item interface{}) { +} diff --git a/balancer/pkg/controller/scale.go b/balancer/pkg/controller/scale.go new file mode 100644 index 000000000000..f25ef6f7ec82 --- /dev/null +++ b/balancer/pkg/controller/scale.go @@ -0,0 +1,122 @@ +/* +Copyright 2023 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package controller + +import ( + "context" + "fmt" + autoscalingv1 "k8s.io/api/autoscaling/v1" + hpa "k8s.io/api/autoscaling/v2" + apimeta "k8s.io/apimachinery/pkg/api/meta" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime/schema" + scaleclient "k8s.io/client-go/scale" + "k8s.io/klog/v2" +) + +// ScaleClient implements ScaleClientInterface and issues real queries to K8S +// apiserver. +type ScaleClient struct { + context context.Context + scaleNamespacer scaleclient.ScalesGetter + mapper apimeta.RESTMapper +} + +// ScaleClientInterface is an interface to interact with Scale subresources. +type ScaleClientInterface interface { + // GetScale gets scale subresource for the given reference. + GetScale(namespace string, scaleRef hpa.CrossVersionObjectReference) (*autoscalingv1.Scale, *schema.GroupResource, error) + + // UpdateScale updates the given scale resource. + UpdateScale(scale *autoscalingv1.Scale, resource *schema.GroupResource) error +} + +// NewScaleClient builds scale client. +func NewScaleClient(context context.Context, scale scaleclient.ScalesGetter, mapper apimeta.RESTMapper) *ScaleClient { + return &ScaleClient{ + context: context, + scaleNamespacer: scale, + mapper: mapper, + } +} + +// GetScale gets scale subresource for the given reference. Copied from HPA controller. +// TODO(mwielgus): Add cache if frequent scale resource lookups become a problem. +func (s *ScaleClient) GetScale(namespace string, scaleRef hpa.CrossVersionObjectReference) (*autoscalingv1.Scale, *schema.GroupResource, error) { + + reference := fmt.Sprintf("%s/%s/%s", scaleRef.Kind, namespace, scaleRef.Name) + targetGV, err := schema.ParseGroupVersion(scaleRef.APIVersion) + if err != nil { + return nil, nil, fmt.Errorf("invalid API version in scale target reference: %v", err) + } + + targetGK := schema.GroupKind{ + Group: targetGV.Group, + Kind: scaleRef.Kind, + } + + mappings, err := s.mapper.RESTMappings(targetGK) + if err != nil { + return nil, nil, fmt.Errorf("unable to determine resource for scale target reference: %v", err) + } + + scale, gr, err := s.scaleForResourceMappings(namespace, scaleRef.Name, mappings) + if err != nil { + return nil, nil, fmt.Errorf("failed to query scale subresource for %s: %v", reference, err) + } + return scale, &gr, nil +} + +// scaleForResourceMappings attempts to fetch the scale for the +// resource with the given name and namespace, trying each RESTMapping +// in turn until a working one is found. If none work, the first error +// is returned. It returns both the scale, as well as the group-resource from +// the working mapping. +func (s *ScaleClient) scaleForResourceMappings(namespace, name string, + mappings []*apimeta.RESTMapping) (*autoscalingv1.Scale, schema.GroupResource, error) { + var firstErr error + for i, mapping := range mappings { + targetGR := mapping.Resource.GroupResource() + scale, err := s.scaleNamespacer.Scales(namespace).Get(s.context, + targetGR, name, metav1.GetOptions{}) + + if err == nil { + return scale, targetGR, nil + } + + // if this is the first error, remember it, + // then go on and try other mappings until we find a good one + if i == 0 { + firstErr = err + } + } + + // make sure we handle an empty set of mappings + if firstErr == nil { + firstErr = fmt.Errorf("unrecognized resource") + } + + return nil, schema.GroupResource{}, firstErr +} + +// UpdateScale updates the given scale resource. +func (s *ScaleClient) UpdateScale(scale *autoscalingv1.Scale, resource *schema.GroupResource) error { + klog.V(4).Infof("Scaling %s/%s/%s to %d", resource.Resource, scale.Namespace, scale.Name, + scale.Spec.Replicas) + _, err := s.scaleNamespacer.Scales(scale.Namespace).Update(s.context, *resource, scale, metav1.UpdateOptions{}) + return err +} diff --git a/balancer/pkg/controller/scale_mock.go b/balancer/pkg/controller/scale_mock.go new file mode 100644 index 000000000000..979b146cb687 --- /dev/null +++ b/balancer/pkg/controller/scale_mock.go @@ -0,0 +1,67 @@ +/* +Copyright 2023 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package controller + +import ( + "fmt" + + autoscalingv1 "k8s.io/api/autoscaling/v1" + hpa "k8s.io/api/autoscaling/v2" + "k8s.io/apimachinery/pkg/runtime/schema" +) + +type scaleClientMock struct { + scales map[string]*autoscalingv1.Scale +} + +func scalesMockKey(namespace string, scaleRef hpa.CrossVersionObjectReference) string { + return fmt.Sprintf("%s/%s/%s/%s", namespace, scaleRef.APIVersion, scaleRef.Kind, scaleRef.Name) +} + +func (s *scaleClientMock) PutForTest(namespace string, scaleRef hpa.CrossVersionObjectReference, scale *autoscalingv1.Scale) { + key := scalesMockKey(namespace, scaleRef) + s.scales[key] = scale +} + +func (s *scaleClientMock) GetForTest(namespace string, scaleRef hpa.CrossVersionObjectReference) *autoscalingv1.Scale { + key := scalesMockKey(namespace, scaleRef) + return s.scales[key] +} + +func (s *scaleClientMock) GetScale(namespace string, scaleRef hpa.CrossVersionObjectReference) (*autoscalingv1.Scale, *schema.GroupResource, error) { + key := scalesMockKey(namespace, scaleRef) + if scale, found := s.scales[key]; found { + return scale, &schema.GroupResource{ + Group: scaleRef.APIVersion, + Resource: scaleRef.Kind, + }, nil + } + return nil, nil, fmt.Errorf("Not found: %s", key) +} + +func (s *scaleClientMock) UpdateScale(scale *autoscalingv1.Scale, resource *schema.GroupResource) error { + key := scalesMockKey(scale.Namespace, hpa.CrossVersionObjectReference{ + Name: scale.Name, + APIVersion: resource.Group, + Kind: resource.Resource, + }) + if _, found := s.scales[key]; found { + s.scales[key] = scale + return nil + } + return fmt.Errorf("Not found: %s", key) +} diff --git a/balancer/pkg/policy/policy.go b/balancer/pkg/policy/policy.go index 4c5699473dc0..a8b26b365172 100644 --- a/balancer/pkg/policy/policy.go +++ b/balancer/pkg/policy/policy.go @@ -18,8 +18,8 @@ package policy import ( "fmt" - "k8s.io/autoscaling/balancer/pkg/apis/balancer.x-k8s.io/v1alpha1" - "k8s.io/autoscaling/balancer/pkg/pods" + "k8s.io/autoscaler/balancer/pkg/apis/balancer.x-k8s.io/v1alpha1" + "k8s.io/autoscaler/balancer/pkg/pods" ) // GetPlacement calculates the placement for the given balancer and pod summary diff --git a/balancer/pkg/policy/priority_test.go b/balancer/pkg/policy/priority_test.go index 3f45e2b26abe..886b8cade9cc 100644 --- a/balancer/pkg/policy/priority_test.go +++ b/balancer/pkg/policy/priority_test.go @@ -21,7 +21,7 @@ import ( "testing" "github.com/stretchr/testify/assert" - "k8s.io/autoscaling/balancer/pkg/pods" + "k8s.io/autoscaler/balancer/pkg/pods" ) func TestDistributeByPriority(t *testing.T) { diff --git a/balancer/pkg/policy/proportional_test.go b/balancer/pkg/policy/proportional_test.go index b0138c804bc0..76ad2731a871 100644 --- a/balancer/pkg/policy/proportional_test.go +++ b/balancer/pkg/policy/proportional_test.go @@ -21,7 +21,7 @@ import ( "testing" "github.com/stretchr/testify/assert" - "k8s.io/autoscaling/balancer/pkg/pods" + "k8s.io/autoscaler/balancer/pkg/pods" ) func TestDistributeByProportions(t *testing.T) { diff --git a/balancer/pkg/policy/utils.go b/balancer/pkg/policy/utils.go index 41a7441b860b..7aaa31c1651c 100644 --- a/balancer/pkg/policy/utils.go +++ b/balancer/pkg/policy/utils.go @@ -17,8 +17,8 @@ limitations under the License. package policy import ( - "k8s.io/autoscaling/balancer/pkg/apis/balancer.x-k8s.io/v1alpha1" - "k8s.io/autoscaling/balancer/pkg/pods" + "k8s.io/autoscaler/balancer/pkg/apis/balancer.x-k8s.io/v1alpha1" + "k8s.io/autoscaler/balancer/pkg/pods" ) const (