diff --git a/vertical-pod-autoscaler/pkg/utils/status/status_object.go b/vertical-pod-autoscaler/pkg/utils/status/status_object.go new file mode 100644 index 00000000000..674769d0a67 --- /dev/null +++ b/vertical-pod-autoscaler/pkg/utils/status/status_object.go @@ -0,0 +1,169 @@ +/* +Copyright 2020 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package status + +import ( + "net" + "time" + + apicoordinationv1 "k8s.io/api/coordination/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + utilnet "k8s.io/apimachinery/pkg/util/net" + "k8s.io/apimachinery/pkg/util/wait" + clientset "k8s.io/client-go/kubernetes" + typedcoordinationv1 "k8s.io/client-go/kubernetes/typed/coordination/v1" + "k8s.io/utils/pointer" +) + +const ( + // Parameters for retrying with exponential backoff. + retryBackoffInitialDuration = 100 * time.Millisecond + retryBackoffFactor = 3 + retryBackoffJitter = 0 + retryBackoffSteps = 3 +) + +// Client for the status object. +type Client struct { + client typedcoordinationv1.LeaseInterface + leaseName string + leaseNamespace string + leaseDurationSeconds int32 + holderIdentity string +} + +// NewClient returns a client for the status object. +func NewClient(c clientset.Interface, leaseName, leaseNamespace string, leaseDuration time.Duration, holderIdentity string) *Client { + return &Client{ + client: c.CoordinationV1().Leases(leaseNamespace), + leaseName: leaseName, + leaseNamespace: leaseNamespace, + leaseDurationSeconds: int32(leaseDuration.Seconds()), + holderIdentity: holderIdentity, + } +} + +// UpdateStatus renews status object lease. +// Status object will be created if it doesn't exist. +func (c *Client) UpdateStatus() error { + updateFn := func() error { + lease, err := c.client.Get(c.leaseName, metav1.GetOptions{}) + if apierrors.IsNotFound(err) { + // Create lease if it doesn't exist. + return c.create() + } else if err != nil { + return err + } + lease.Spec.RenewTime = &metav1.MicroTime{Time: time.Now()} + _, err = c.client.Update(lease) + if apierrors.IsConflict(err) { + // Lease was updated by an another replica of the component. + // No error should be returned. + return nil + } + return err + } + return retryWithExponentialBackOff(updateFn) +} + +func (c *Client) create() error { + _, err := c.client.Create(c.newLease()) + if apierrors.IsAlreadyExists(err) { + // Lease was created by an another replica of the component. + // No error should be returned. + return nil + } + return err +} + +func (c *Client) newLease() *apicoordinationv1.Lease { + return &apicoordinationv1.Lease{ + ObjectMeta: metav1.ObjectMeta{ + Name: c.leaseName, + Namespace: c.leaseNamespace, + }, + Spec: apicoordinationv1.LeaseSpec{ + HolderIdentity: pointer.StringPtr(c.holderIdentity), + LeaseDurationSeconds: pointer.Int32Ptr(c.leaseDurationSeconds), + }, + } +} + +// GetStatus returns status object. +func (c *Client) GetStatus() (*apicoordinationv1.Lease, error) { + var lease *apicoordinationv1.Lease + getFn := func() error { + var err error + lease, err = c.client.Get(c.leaseName, metav1.GetOptions{}) + return err + } + err := retryWithExponentialBackOff(getFn) + return lease, err +} + +// IsStatusValid verifies if status was updated before lease timing out. +func IsStatusValid(status *apicoordinationv1.Lease, leaseTimeout time.Duration) bool { + return isStatusValid(status, leaseTimeout, time.Now()) +} + +func isStatusValid(status *apicoordinationv1.Lease, leaseTimeout time.Duration, now time.Time) bool { + return status.CreationTimestamp.Add(leaseTimeout).After(now) || + status.Spec.RenewTime.Add(leaseTimeout).After(now) +} + +func isRetryableAPIError(err error) bool { + // These errors may indicate a transient error that we can retry. + if apierrors.IsInternalError(err) || apierrors.IsTimeout(err) || apierrors.IsServerTimeout(err) || + apierrors.IsTooManyRequests(err) || utilnet.IsProbableEOF(err) || utilnet.IsConnectionReset(err) { + return true + } + // If the error sends the Retry-After header, we respect it as an explicit confirmation we should retry. + if _, shouldRetry := apierrors.SuggestsClientDelay(err); shouldRetry { + return true + } + return false +} + +func isRetryableNetError(err error) bool { + if netError, ok := err.(net.Error); ok { + return netError.Temporary() || netError.Timeout() + } + return false +} + +func retryWithExponentialBackOff(fn func() error) error { + backoff := wait.Backoff{ + Duration: retryBackoffInitialDuration, + Factor: retryBackoffFactor, + Jitter: retryBackoffJitter, + Steps: retryBackoffSteps, + } + retryFn := func(fn func() error) func() (bool, error) { + return func() (bool, error) { + err := fn() + if err == nil { + return true, nil + } + if isRetryableAPIError(err) || isRetryableNetError(err) { + return false, nil + } + return false, err + } + } + return wait.ExponentialBackoff(backoff, retryFn(fn)) +} diff --git a/vertical-pod-autoscaler/pkg/utils/status/status_object_test.go b/vertical-pod-autoscaler/pkg/utils/status/status_object_test.go new file mode 100644 index 00000000000..0d49399746f --- /dev/null +++ b/vertical-pod-autoscaler/pkg/utils/status/status_object_test.go @@ -0,0 +1,265 @@ +/* +Copyright 2020 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package status + +import ( + "fmt" + "syscall" + "testing" + "time" + + "github.com/stretchr/testify/assert" + apicoordinationv1 "k8s.io/api/coordination/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/client-go/kubernetes/fake" + testcore "k8s.io/client-go/testing" +) + +func TestUpdateStatus(t *testing.T) { + const ( + leaseName = "lease" + leaseNamespace = "default" + ) + tests := []struct { + name string + updateReactor func(action testcore.Action) (bool, runtime.Object, error) + wantErr bool + }{ + { + name: "updating status object", + updateReactor: func(action testcore.Action) (bool, runtime.Object, error) { + if action.GetResource().Resource == "leases" { + return true, &apicoordinationv1.Lease{}, nil + } + + return true, nil, fmt.Errorf("unsupported action") + }, + wantErr: false, + }, + { + name: "updating status object - creating status", + updateReactor: func() func(action testcore.Action) (bool, runtime.Object, error) { + i := 0 + return func(action testcore.Action) (bool, runtime.Object, error) { + if action.GetResource().Resource == "leases" { + i++ + switch i { + case 1: + return true, nil, apierrors.NewNotFound(schema.GroupResource{}, leaseName) + default: + return true, &apicoordinationv1.Lease{}, nil + } + } + + return true, nil, fmt.Errorf("unsupported action") + } + }(), + wantErr: false, + }, + { + // Status doesn't exist but will be created by an other component in the meantime. + name: "updating status object - creating status, already exists", + updateReactor: func() func(action testcore.Action) (bool, runtime.Object, error) { + i := 0 + return func(action testcore.Action) (bool, runtime.Object, error) { + if action.GetResource().Resource == "leases" { + i++ + switch i { + case 1: + return true, nil, apierrors.NewNotFound(schema.GroupResource{}, leaseName) + default: + return true, nil, apierrors.NewAlreadyExists(schema.GroupResource{}, leaseName) + } + } + + return true, nil, fmt.Errorf("unsupported action") + } + }(), + wantErr: false, + }, + { + // Status is updated by an other component in the meantime. + name: "updating status object - updating conflict", + updateReactor: func() func(action testcore.Action) (bool, runtime.Object, error) { + i := 0 + return func(action testcore.Action) (bool, runtime.Object, error) { + if action.GetResource().Resource == "leases" { + i++ + switch i { + case 1: + return true, &apicoordinationv1.Lease{}, nil + default: + return true, nil, apierrors.NewConflict(schema.GroupResource{}, leaseName, nil) + } + } + + return true, nil, fmt.Errorf("unsupported action") + } + }(), + wantErr: false, + }, + { + name: "updating status object - endless error", + updateReactor: func(action testcore.Action) (bool, runtime.Object, error) { + if action.GetResource().Resource == "leases" { + return true, nil, apierrors.NewNotFound(schema.GroupResource{}, leaseName) + } + return true, nil, fmt.Errorf("unsupported action") + }, + wantErr: true, + }, + } + for _, tc := range tests { + t.Run(fmt.Sprintf("test case: %s", tc.name), func(t *testing.T) { + fc := fake.NewSimpleClientset() + fc.PrependReactor("get", "leases", tc.updateReactor) + fc.PrependReactor("create", "leases", tc.updateReactor) + fc.PrependReactor("update", "leases", tc.updateReactor) + client := NewClient(fc, leaseName, leaseNamespace, 10*time.Second, leaseName) + err := client.UpdateStatus() + assert.True(t, (err != nil) == tc.wantErr, fmt.Sprintf("gotErr: %v, wantErr: %v", (err != nil), tc.wantErr)) + }) + } +} + +func TestGetStatus(t *testing.T) { + const ( + leaseName = "lease" + leaseNamespace = "default" + ) + tests := []struct { + name string + getReactor func(action testcore.Action) (bool, runtime.Object, error) + wantErr bool + }{ + { + name: "getting status object", + getReactor: func(action testcore.Action) (bool, runtime.Object, error) { + if action.GetVerb() == "get" && action.GetResource().Resource == "leases" { + return true, &apicoordinationv1.Lease{}, nil + } + + return true, nil, fmt.Errorf("unsupported action") + }, + wantErr: false, + }, + { + name: "getting status object - retryable error", + getReactor: func() func(action testcore.Action) (bool, runtime.Object, error) { + i := 0 + return func(action testcore.Action) (bool, runtime.Object, error) { + if action.GetVerb() == "get" && action.GetResource().Resource == "leases" { + i++ + switch i { + case 1, 2: + return true, nil, syscall.ECONNRESET + default: + return true, &apicoordinationv1.Lease{}, nil + } + } + + return true, nil, fmt.Errorf("unsupported action") + } + }(), + wantErr: false, + }, + { + name: "getting status object - non-retryable error", + getReactor: func() func(action testcore.Action) (bool, runtime.Object, error) { + i := 0 + return func(action testcore.Action) (bool, runtime.Object, error) { + if action.GetVerb() == "get" && action.GetResource().Resource == "leases" { + i++ + switch i { + case 1: + return true, nil, fmt.Errorf("non-retryable error") + default: + return true, &apicoordinationv1.Lease{}, nil + } + } + + return true, nil, fmt.Errorf("unsupported action") + } + }(), + wantErr: true, + }, + } + for _, tc := range tests { + t.Run(fmt.Sprintf("test case: %s", tc.name), func(t *testing.T) { + fc := fake.NewSimpleClientset() + fc.PrependReactor("get", "leases", tc.getReactor) + client := NewClient(fc, leaseName, leaseNamespace, 10*time.Second, leaseName) + _, err := client.GetStatus() + assert.True(t, (err != nil) == tc.wantErr) + }) + } +} + +func TestIsStatusValid(t *testing.T) { + now := time.Now() + tests := []struct { + name string + lease *apicoordinationv1.Lease + leaseTimeout time.Duration + expectedValid bool + }{ + { + name: "Valid CreationTimestamp", + lease: &apicoordinationv1.Lease{ + ObjectMeta: metav1.ObjectMeta{ + CreationTimestamp: metav1.Time{Time: now}, + }, + }, + leaseTimeout: 10 * time.Second, + expectedValid: true, + }, + { + name: "Valid RenewTime", + lease: &apicoordinationv1.Lease{ + ObjectMeta: metav1.ObjectMeta{ + CreationTimestamp: metav1.Time{Time: now.Add(-time.Minute)}, + }, + Spec: apicoordinationv1.LeaseSpec{ + RenewTime: &metav1.MicroTime{Time: now}, + }, + }, + leaseTimeout: 10 * time.Second, + expectedValid: true, + }, + { + name: "Outdated CreationTimestamp and RenewTime", + lease: &apicoordinationv1.Lease{ + ObjectMeta: metav1.ObjectMeta{ + CreationTimestamp: metav1.Time{Time: now.Add(-time.Minute)}, + }, + Spec: apicoordinationv1.LeaseSpec{ + RenewTime: &metav1.MicroTime{Time: now.Add(-time.Minute)}, + }, + }, + leaseTimeout: 10 * time.Second, + expectedValid: false, + }, + } + for _, tc := range tests { + t.Run(fmt.Sprintf("test case: %s", tc.name), func(t *testing.T) { + assert.Equal(t, isStatusValid(tc.lease, tc.leaseTimeout, now), tc.expectedValid) + }) + } +}