Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: scaleobject ready condition 'False/Unknow' to 'True' requeue #3097

Merged
merged 6 commits into from
Jun 7, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ To learn more about our roadmap, we recommend reading [this document](ROADMAP.md
- **General:** Use metricName from GetMetricsSpec in ScaledJobs instead of `queueLength` ([#3032](https://github.com/kedacore/keda/issue/3032))
- **General:** Refactor adapter startup to ensure proper log initilization. ([2316](https://github.com/kedacore/keda/issues/2316))
- **Azure Eventhub Scaler:** KEDA operator crashes on nil memory panic if the eventhub connectionstring for Azure Eventhub Scaler contains an invalid character ([#3082](https://github.com/kedacore/keda/issues/3082))
- **General:** Scaleobject ready condition 'False/Unknow' to 'True' requeue([#3096](https://github.com/kedacore/keda/issues/3096))

### Deprecations

Expand Down
6 changes: 5 additions & 1 deletion controllers/keda/scaledobject_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,11 @@ func (r *ScaledObjectReconciler) SetupWithManager(mgr ctrl.Manager, options cont
// (in this case metadata.Generation does not change)
// so reconcile loop is not started on Status updates
For(&kedav1alpha1.ScaledObject{}, builder.WithPredicates(
predicate.Or(kedacontrollerutil.PausedReplicasPredicate{}, predicate.GenerationChangedPredicate{}),
predicate.Or(
kedacontrollerutil.PausedReplicasPredicate{},
kedacontrollerutil.ScaleObjectReadyConditionPredicate{},
predicate.GenerationChangedPredicate{},
),
)).
Owns(&autoscalingv2beta2.HorizontalPodAutoscaler{}).
Complete(r)
Expand Down
112 changes: 112 additions & 0 deletions controllers/keda/scaledobject_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package keda
import (
"context"
"fmt"
"sync/atomic"
"time"

"github.com/golang/mock/gomock"
Expand All @@ -27,6 +28,7 @@ import (
appsv1 "k8s.io/api/apps/v1"
autoscalingv2beta2 "k8s.io/api/autoscaling/v2beta2"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"sigs.k8s.io/controller-runtime/pkg/log/zap"
Expand Down Expand Up @@ -598,6 +600,116 @@ var _ = Describe("ScaledObjectController", func() {
}, 20*time.Second).Should(Equal(metav1.ConditionFalse))
})
})

It("scaleobject ready condition 'False/Unknow' to 'True' will requeue", func() {
var (
deploymentName = "conditionchange"
soName = "so-" + deploymentName
min int32 = 1
max int32 = 5
pollingInterVal int32 = 1
)

// Create the scaling target.
err := k8sClient.Create(context.Background(), generateDeployment(deploymentName))
Expect(err).ToNot(HaveOccurred())

so := &kedav1alpha1.ScaledObject{
ObjectMeta: metav1.ObjectMeta{Name: soName, Namespace: "default"},
Spec: kedav1alpha1.ScaledObjectSpec{
ScaleTargetRef: &kedav1alpha1.ScaleTarget{
Name: deploymentName,
},
MinReplicaCount: &min,
MaxReplicaCount: &max,
PollingInterval: &pollingInterVal,
Triggers: []kedav1alpha1.ScaleTriggers{
{
Type: "cpu",
MetricType: autoscalingv2beta2.UtilizationMetricType,
Metadata: map[string]string{
"value": "50",
},
},
{
Type: "external-mock",
MetricType: autoscalingv2beta2.AverageValueMetricType,
Metadata: map[string]string{},
},
},
},
}
err = k8sClient.Create(context.Background(), so)
Expect(err).ToNot(HaveOccurred())

// wait so's ready condition Ready
Eventually(func() metav1.ConditionStatus {
err = k8sClient.Get(context.Background(), types.NamespacedName{Name: soName, Namespace: "default"}, so)
Expect(err).ToNot(HaveOccurred())
return so.Status.Conditions.GetReadyCondition().Status
}, 5*time.Second).Should(Equal(metav1.ConditionTrue))

// check hpa
hpa := &autoscalingv2beta2.HorizontalPodAutoscaler{}
Eventually(func() int {
err = k8sClient.Get(context.Background(), types.NamespacedName{Name: getHPAName(so), Namespace: "default"}, hpa)
Expect(err).ToNot(HaveOccurred())
return len(hpa.Spec.Metrics)
}, 1*time.Second).Should(Equal(2))

// mock external server offline
atomic.StoreInt32(&scalers.MockExternalServerStatus, scalers.MockExternalServerStatusOffline)

// wait so's ready condition not
Eventually(func() metav1.ConditionStatus {
err = k8sClient.Get(context.Background(), types.NamespacedName{Name: soName, Namespace: "default"}, so)
Expect(err).ToNot(HaveOccurred())
return so.Status.Conditions.GetReadyCondition().Status
}, 5*time.Second).Should(Or(Equal(metav1.ConditionFalse), Equal(metav1.ConditionUnknown)))

// mock kube-controller-manager request v1beta1.custom.metrics.k8s.io api GetMetrics
err = k8sClient.Get(context.Background(), types.NamespacedName{Name: getHPAName(so), Namespace: "default"}, hpa)
Expect(err).ToNot(HaveOccurred())
hpa.Status.CurrentMetrics = []autoscalingv2beta2.MetricStatus{
{
Type: autoscalingv2beta2.ResourceMetricSourceType,
Resource: &autoscalingv2beta2.ResourceMetricStatus{
Name: corev1.ResourceCPU,
Current: autoscalingv2beta2.MetricValueStatus{
Value: resource.NewQuantity(int64(100), resource.DecimalSI),
},
},
},
}
err = k8sClient.Status().Update(ctx, hpa)
Expect(err).ToNot(HaveOccurred())

// hpa metrics will only left CPU metric
Eventually(func() int {
hpa := &autoscalingv2beta2.HorizontalPodAutoscaler{}
err = k8sClient.Get(context.Background(), types.NamespacedName{Name: getHPAName(so), Namespace: "default"}, hpa)
Expect(err).ToNot(HaveOccurred())
return len(hpa.Spec.Metrics)
}, 5*time.Second).Should(Equal(1))

// mock external server online
atomic.StoreInt32(&scalers.MockExternalServerStatus, scalers.MockExternalServerStatusOnline)

// wait so's ready condition Ready
Eventually(func() metav1.ConditionStatus {
err = k8sClient.Get(context.Background(), types.NamespacedName{Name: soName, Namespace: "default"}, so)
Expect(err).ToNot(HaveOccurred())
return so.Status.Conditions.GetReadyCondition().Status
}, 5*time.Second).Should(Equal(metav1.ConditionTrue))

// hpa will recover
Eventually(func() int {
hpa := &autoscalingv2beta2.HorizontalPodAutoscaler{}
err = k8sClient.Get(context.Background(), types.NamespacedName{Name: getHPAName(so), Namespace: "default"}, hpa)
Expect(err).ToNot(HaveOccurred())
return len(hpa.Spec.Metrics)
}, 5*time.Second).Should(Equal(2))
})
})

func generateDeployment(name string) *appsv1.Deployment {
Expand Down
33 changes: 33 additions & 0 deletions controllers/keda/util/predicate.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package util
import (
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/predicate"

kedav1alpha1 "github.com/kedacore/keda/v2/apis/keda/v1alpha1"
)

const PausedReplicasAnnotation = "autoscaling.keda.sh/paused-replicas"
Expand All @@ -28,3 +30,34 @@ func (PausedReplicasPredicate) Update(e event.UpdateEvent) bool {
}
return false
}

type ScaleObjectReadyConditionPredicate struct {
predicate.Funcs
}

func (ScaleObjectReadyConditionPredicate) Update(e event.UpdateEvent) bool {
if e.ObjectOld == nil || e.ObjectNew == nil {
return false
}

var newReadyCondition, oldReadyCondition kedav1alpha1.Condition

oldObj, ok := e.ObjectOld.(*kedav1alpha1.ScaledObject)
if !ok {
return false
}
oldReadyCondition = oldObj.Status.Conditions.GetReadyCondition()

newObj, ok := e.ObjectNew.(*kedav1alpha1.ScaledObject)
if !ok {
return false
}
newReadyCondition = newObj.Status.Conditions.GetReadyCondition()

// False/Unknown -> True
if !oldReadyCondition.IsTrue() && newReadyCondition.IsTrue() {
return true
}

return false
}
91 changes: 91 additions & 0 deletions pkg/scalers/external_mock_scaler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
package scalers

import (
"context"
"errors"
"sync/atomic"

"k8s.io/api/autoscaling/v2beta2"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/metrics/pkg/apis/external_metrics"
)

const (
MockExternalServerStatusOffline int32 = 0
MockExternalServerStatusOnline int32 = 1
)

var (
MockExternalServerStatus = MockExternalServerStatusOnline
ErrMock = errors.New("mock error")
MockMetricName = "mockMetricName"
MockMetricTarget int64 = 50
MockMetricValue int64 = 100
)

type externalMockScaler struct{}

func NewExternalMockScaler(config *ScalerConfig) (Scaler, error) {
return &externalMockScaler{}, nil
}

// IsActive implements Scaler
func (*externalMockScaler) IsActive(ctx context.Context) (bool, error) {
if atomic.LoadInt32(&MockExternalServerStatus) != MockExternalServerStatusOnline {
return false, ErrMock
}

return true, nil
}

// Close implements Scaler
func (*externalMockScaler) Close(ctx context.Context) error {
return nil
}

// GetMetricSpecForScaling implements Scaler
func (*externalMockScaler) GetMetricSpecForScaling(ctx context.Context) []v2beta2.MetricSpec {
if atomic.LoadInt32(&MockExternalServerStatus) != MockExternalServerStatusOnline {
return nil
}

return getMockMetricsSpecs()
}

// GetMetrics implements Scaler
func (*externalMockScaler) GetMetrics(ctx context.Context, metricName string, metricSelector labels.Selector) ([]external_metrics.ExternalMetricValue, error) {
if atomic.LoadInt32(&MockExternalServerStatus) != MockExternalServerStatusOnline {
return nil, ErrMock
}

return getMockExternalMetricsValue(), nil
}

func getMockMetricsSpecs() []v2beta2.MetricSpec {
return []v2beta2.MetricSpec{
{
Type: v2beta2.ExternalMetricSourceType,
External: &v2beta2.ExternalMetricSource{
Metric: v2beta2.MetricIdentifier{
Name: MockMetricName,
},
Target: v2beta2.MetricTarget{
Type: v2beta2.ValueMetricType,
Value: resource.NewQuantity(MockMetricValue, resource.DecimalSI),
},
},
},
}
}

func getMockExternalMetricsValue() []external_metrics.ExternalMetricValue {
return []external_metrics.ExternalMetricValue{
{
MetricName: MockMetricName,
Value: *resource.NewQuantity(MockMetricValue, resource.DecimalSI),
Timestamp: metav1.Now(),
},
}
}
2 changes: 1 addition & 1 deletion pkg/scaling/executor/scale_scaledobjects.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ func (e *scaleExecutor) RequestScale(ctx context.Context, scaledObject *kedav1al
// but ScaledObject.Status.ReadyCondition is set not set to 'true' -> set it back to 'true'
readyCondition := scaledObject.Status.Conditions.GetReadyCondition()
if !isError && !readyCondition.IsTrue() {
if err := e.setReadyCondition(ctx, logger, scaledObject, metav1.ConditionFalse,
if err := e.setReadyCondition(ctx, logger, scaledObject, metav1.ConditionTrue,
kedav1alpha1.ScaledObjectConditionReadySucccesReason, kedav1alpha1.ScaledObjectConditionReadySuccessMessage); err != nil {
logger.Error(err, "error setting ready condition")
}
Expand Down
3 changes: 3 additions & 0 deletions pkg/scaling/scale_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -391,6 +391,9 @@ func buildScaler(ctx context.Context, client client.Client, triggerType string,
return scalers.NewElasticsearchScaler(config)
case "external":
return scalers.NewExternalScaler(config)
// TODO: use other way for test.
case "external-mock":
return scalers.NewExternalMockScaler(config)
case "external-push":
return scalers.NewExternalPushScaler(config)
case "gcp-pubsub":
Expand Down