Skip to content

Commit

Permalink
[v2] Refactor scaleHandler cont. (#747)
Browse files Browse the repository at this point in the history
  • Loading branch information
zroubalik authored Apr 16, 2020
1 parent 64cb239 commit 2d81976
Show file tree
Hide file tree
Showing 5 changed files with 54 additions and 38 deletions.
1 change: 1 addition & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@ generate-api:
$(GO_BUILD_VARS) operator-sdk generate crds
# withTriggers is only used for duck typing so we only need the deepcopy methods
# However operator-sdk generate doesn't appear to have an option for that
# until this issue is fixed: https://github.com/kubernetes-sigs/controller-tools/issues/398
rm deploy/crds/keda.sh_withtriggers_crd.yaml

pkg/scalers/liiklus/LiiklusService.pb.go: hack/LiiklusService.proto
Expand Down
3 changes: 2 additions & 1 deletion pkg/scaling/executor/scale_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,10 @@ package executor
import (
"context"

"github.com/go-logr/logr"
kedav1alpha1 "github.com/kedacore/keda/pkg/apis/keda/v1alpha1"
"github.com/kedacore/keda/pkg/scalers"

"github.com/go-logr/logr"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
Expand Down
32 changes: 17 additions & 15 deletions pkg/scaling/executor/scale_scaledobjects.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,33 +7,38 @@ import (
kedav1alpha1 "github.com/kedacore/keda/pkg/apis/keda/v1alpha1"
"github.com/kedacore/keda/pkg/scalers"

"github.com/go-logr/logr"
autoscalingv1 "k8s.io/api/autoscaling/v1"
)

func (e *scaleExecutor) RequestScale(ctx context.Context, scalers []scalers.Scaler, scaledObject *kedav1alpha1.ScaledObject) {
logger := e.logger.WithValues("Scaledobject.Name", scaledObject.Name,
"ScaledObject.Namespace", scaledObject.Namespace,
"ScaleTarget.Name", scaledObject.Spec.ScaleTargetRef.Name)

isActive := false
for _, scaler := range scalers {
defer scaler.Close()
isTriggerActive, err := scaler.IsActive(ctx)

if err != nil {
e.logger.V(1).Info("Error getting scale decision", "Error", err)
logger.V(1).Info("Error getting scale decision", "Error", err)
continue
} else if isTriggerActive {
isActive = true
e.logger.V(1).Info("Scaler for scaledObject is active", "Scaler", scaler)
logger.V(1).Info("Scaler for scaledObject is active", "Scaler", scaler)
}
}

currentScale, err := e.getScaleTargetScale(scaledObject)
if err != nil {
e.logger.Error(err, "Error getting Scale")
logger.Error(err, "Error getting Scale")
}

if currentScale.Spec.Replicas == 0 && isActive {
// current replica count is 0, but there is an active trigger.
// scale the ScaleTarget up
e.scaleFromZero(ctx, scaledObject, currentScale)
e.scaleFromZero(ctx, logger, scaledObject, currentScale)
} else if !isActive &&
currentScale.Spec.Replicas > 0 &&
(scaledObject.Spec.MinReplicaCount == nil || *scaledObject.Spec.MinReplicaCount == 0) {
Expand All @@ -42,7 +47,7 @@ func (e *scaleExecutor) RequestScale(ctx context.Context, scalers []scalers.Scal
// There is no minimum configured or minimum is set to ZERO. HPA will handles other scale down operations

// Try to scale it down.
e.scaleToZero(scaledObject, currentScale)
e.scaleToZero(logger, scaledObject, currentScale)
} else if !isActive &&
scaledObject.Spec.MinReplicaCount != nil &&
currentScale.Spec.Replicas < *scaledObject.Spec.MinReplicaCount {
Expand All @@ -54,22 +59,21 @@ func (e *scaleExecutor) RequestScale(ctx context.Context, scalers []scalers.Scal

err := e.updateScaleOnScaleTarget(scaledObject, currentScale)
if err == nil {
e.logger.Info("Successfully set ScaleTarget replicas count to ScaledObject minReplicaCount",
"ScaleTarget.Name", scaledObject.Spec.ScaleTargetRef.Name,
logger.Info("Successfully set ScaleTarget replicas count to ScaledObject minReplicaCount",
"ScaleTarget.Replicas", currentScale.Spec.Replicas)
}
} else if isActive {
// triggers are active, but we didn't need to scale (replica count > 0)
// Update LastActiveTime to now.
e.updateLastActiveTime(ctx, scaledObject)
} else {
e.logger.V(1).Info("ScaleTarget no change", "ScaleTarget.Name", scaledObject.Spec.ScaleTargetRef.Name)
logger.V(1).Info("ScaleTarget no change")
}
}

// An object will be scaled down to 0 only if it's passed its cooldown period
// or if LastActiveTime is nil
func (e *scaleExecutor) scaleToZero(scaledObject *kedav1alpha1.ScaledObject, scale *autoscalingv1.Scale) {
func (e *scaleExecutor) scaleToZero(logger logr.Logger, scaledObject *kedav1alpha1.ScaledObject, scale *autoscalingv1.Scale) {
var cooldownPeriod time.Duration

if scaledObject.Spec.CooldownPeriod != nil {
Expand All @@ -86,17 +90,16 @@ func (e *scaleExecutor) scaleToZero(scaledObject *kedav1alpha1.ScaledObject, sca
scale.Spec.Replicas = 0
err := e.updateScaleOnScaleTarget(scaledObject, scale)
if err == nil {
e.logger.Info("Successfully scaled ScaleTarget to 0 replicas", "ScaleTarget.Name", scaledObject.Spec.ScaleTargetRef.Name)
logger.Info("Successfully scaled ScaleTarget to 0 replicas")
}
} else {
e.logger.V(1).Info("ScaleTarget cooling down",
"ScaleTarget.Name", scaledObject.Spec.ScaleTargetRef.Name,
logger.V(1).Info("ScaleTarget cooling down",
"LastActiveTime", scaledObject.Status.LastActiveTime,
"CoolDownPeriod", cooldownPeriod)
}
}

func (e *scaleExecutor) scaleFromZero(ctx context.Context, scaledObject *kedav1alpha1.ScaledObject, scale *autoscalingv1.Scale) {
func (e *scaleExecutor) scaleFromZero(ctx context.Context, logger logr.Logger, scaledObject *kedav1alpha1.ScaledObject, scale *autoscalingv1.Scale) {
currentReplicas := scale.Spec.Replicas
if scaledObject.Spec.MinReplicaCount != nil && *scaledObject.Spec.MinReplicaCount > 0 {
scale.Spec.Replicas = *scaledObject.Spec.MinReplicaCount
Expand All @@ -107,8 +110,7 @@ func (e *scaleExecutor) scaleFromZero(ctx context.Context, scaledObject *kedav1a
err := e.updateScaleOnScaleTarget(scaledObject, scale)

if err == nil {
e.logger.Info("Successfully updated ScaleTarget",
"ScaleTarget.Name", scaledObject.Spec.ScaleTargetRef.Name,
logger.Info("Successfully updated ScaleTarget",
"Original Replicas Count", currentReplicas,
"New Replicas Count", scale.Spec.Replicas)

Expand Down
4 changes: 3 additions & 1 deletion pkg/scaling/resolver/scale_resolvers.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,10 @@ package resolver
import (
"context"
"fmt"
"github.com/go-logr/logr"

kedav1alpha1 "github.com/kedacore/keda/pkg/apis/keda/v1alpha1"

"github.com/go-logr/logr"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"
"sigs.k8s.io/controller-runtime/pkg/client"
Expand Down
52 changes: 31 additions & 21 deletions pkg/scaling/scale_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,21 +3,21 @@ package scaling
import (
"context"
"fmt"
"github.com/kedacore/keda/pkg/scaling/resolver"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"knative.dev/pkg/apis/duck"
"sync"
"time"

kedav1alpha1 "github.com/kedacore/keda/pkg/apis/keda/v1alpha1"
"github.com/kedacore/keda/pkg/scalers"
"github.com/kedacore/keda/pkg/scaling/executor"
"github.com/kedacore/keda/pkg/scaling/resolver"

"github.com/go-logr/logr"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/scale"
"knative.dev/pkg/apis/duck"
duckv1 "knative.dev/pkg/apis/duck/v1"
"sigs.k8s.io/controller-runtime/pkg/client"
logf "sigs.k8s.io/controller-runtime/pkg/log"
Expand Down Expand Up @@ -59,12 +59,12 @@ func (h *scaleHandler) GetScalers(scalableObject interface{}) ([]scalers.Scaler,
return nil, err
}

withPods, containerName, err := h.getPods(scalableObject)
podTemplateSpec, containerName, err := h.getPods(scalableObject)
if err != nil {
return nil, err
}

return h.buildScalers(withTriggers, withPods, containerName)
return h.buildScalers(withTriggers, podTemplateSpec, containerName)
}

func (h *scaleHandler) HandleScalableObject(scalableObject interface{}) error {
Expand Down Expand Up @@ -117,18 +117,18 @@ func (h *scaleHandler) DeleteScalableObject(scalableObject interface{}) error {

// startScaleLoop blocks forever and checks the scaledObject based on its pollingInterval
func (h *scaleHandler) startScaleLoop(ctx context.Context, withTriggers *kedav1alpha1.WithTriggers, scalableObject interface{}) {
logger := h.logger.WithValues("namespace", withTriggers.GetNamespace(), "name", withTriggers.GetName())
logger := h.logger.WithValues("type", withTriggers.Kind, "namespace", withTriggers.Namespace, "name", withTriggers.Name)

// kick off one check to the scalers now
h.checkScalers(ctx, withTriggers, scalableObject)
h.checkScalers(ctx, scalableObject)

pollingInterval := getPollingInterval(withTriggers)
logger.V(1).Info("Watching with pollingInterval", "PollingInterval", pollingInterval)

for {
select {
case <-time.After(pollingInterval):
h.checkScalers(ctx, withTriggers, scalableObject)
h.checkScalers(ctx, scalableObject)
case <-ctx.Done():
logger.V(1).Info("Context canceled")
return
Expand All @@ -138,7 +138,7 @@ func (h *scaleHandler) startScaleLoop(ctx context.Context, withTriggers *kedav1a

// checkScalers contains the main logic for the ScaleHandler scaling logic.
// It'll check each trigger active status then call RequestScale
func (h *scaleHandler) checkScalers(ctx context.Context, withTriggers *kedav1alpha1.WithTriggers, scalableObject interface{}) {
func (h *scaleHandler) checkScalers(ctx context.Context, scalableObject interface{}) {
scalers, err := h.GetScalers(scalableObject)
if err != nil {
h.logger.Error(err, "Error getting scalers", "object", scalableObject)
Expand All @@ -153,21 +153,21 @@ func (h *scaleHandler) checkScalers(ctx context.Context, withTriggers *kedav1alp
}
}

// GetScaledObjectScalers returns list of Scalers for the specified ScaledObject
func (h *scaleHandler) buildScalers(withTriggers *kedav1alpha1.WithTriggers, withPods *duckv1.WithPod, containerName string) ([]scalers.Scaler, error) {
// buildScalers returns list of Scalers for the specified triggers
func (h *scaleHandler) buildScalers(withTriggers *kedav1alpha1.WithTriggers, podTemplateSpec *corev1.PodTemplateSpec, containerName string) ([]scalers.Scaler, error) {
logger := h.logger.WithValues("type", withTriggers.Kind, "namespace", withTriggers.Namespace, "name", withTriggers.Name)
var scalersRes []scalers.Scaler
logger := h.logger.WithValues("name", withTriggers.Name, "namespace", withTriggers.Namespace)

resolvedEnv, err := resolver.ResolveContainerEnv(h.client, logger, &withPods.Spec.Template.Spec, containerName, withTriggers.Namespace)
resolvedEnv, err := resolver.ResolveContainerEnv(h.client, logger, &podTemplateSpec.Spec, containerName, withTriggers.Namespace)
if err != nil {
return scalersRes, fmt.Errorf("error resolving secrets for ScaleTarget: %s", err)
}

for i, trigger := range withTriggers.Spec.Triggers {
authParams, podIdentity := resolver.ResolveAuthRef(h.client, logger, trigger.AuthenticationRef, &withPods.Spec.Template.Spec, withTriggers.Namespace)
authParams, podIdentity := resolver.ResolveAuthRef(h.client, logger, trigger.AuthenticationRef, &podTemplateSpec.Spec, withTriggers.Namespace)

if podIdentity == kedav1alpha1.PodIdentityProviderAwsEKS {
serviceAccountName := withPods.Spec.Template.Spec.ServiceAccountName
serviceAccountName := podTemplateSpec.Spec.ServiceAccountName
serviceAccount := &corev1.ServiceAccount{}
err = h.client.Get(context.TODO(), types.NamespacedName{Name: serviceAccountName, Namespace: withTriggers.Namespace}, serviceAccount)
if err != nil {
Expand All @@ -176,7 +176,7 @@ func (h *scaleHandler) buildScalers(withTriggers *kedav1alpha1.WithTriggers, wit
}
authParams["awsRoleArn"] = serviceAccount.Annotations[kedav1alpha1.PodIdentityAnnotationEKS]
} else if podIdentity == kedav1alpha1.PodIdentityProviderAwsKiam {
authParams["awsRoleArn"] = withPods.Spec.Template.ObjectMeta.Annotations[kedav1alpha1.PodIdentityAnnotationKiam]
authParams["awsRoleArn"] = podTemplateSpec.ObjectMeta.Annotations[kedav1alpha1.PodIdentityAnnotationKiam]
}

scaler, err := buildScaler(withTriggers.Name, withTriggers.Namespace, trigger.Type, resolvedEnv, trigger.Metadata, authParams, podIdentity)
Expand All @@ -191,7 +191,7 @@ func (h *scaleHandler) buildScalers(withTriggers *kedav1alpha1.WithTriggers, wit
return scalersRes, nil
}

func (h *scaleHandler) getPods(scalableObject interface{}) (*duckv1.WithPod, string, error) {
func (h *scaleHandler) getPods(scalableObject interface{}) (*corev1.PodTemplateSpec, string, error) {
switch obj := scalableObject.(type) {
case *kedav1alpha1.ScaledObject:
unstruct := &unstructured.Unstructured{}
Expand All @@ -212,11 +212,17 @@ func (h *scaleHandler) getPods(scalableObject interface{}) (*duckv1.WithPod, str
return nil, "", fmt.Errorf("no containers found")
}

return withPods, obj.Spec.ScaleTargetRef.ContainerName, nil
podTemplateSpec := corev1.PodTemplateSpec{
ObjectMeta: withPods.ObjectMeta,
Spec: withPods.Spec.Template.Spec,
}
return &podTemplateSpec, obj.Spec.ScaleTargetRef.ContainerName, nil
case *kedav1alpha1.ScaledJob:
// TODO add ContainerName for ScaledJobs!!
return &obj.Spec.JobTargetRef.Template, "", nil
default:
return nil, "", fmt.Errorf("unknown scalable object type %v", scalableObject)
}

// TODO: implement this for ScaledJobs!!
return nil, "", fmt.Errorf("resolvePods is only implemented for ScaledObjects so far")
}

func buildScaler(name, namespace, triggerType string, resolvedEnv, triggerMetadata, authParams map[string]string, podIdentity string) (scalers.Scaler, error) {
Expand Down Expand Up @@ -269,13 +275,17 @@ func asDuckWithTriggers(scalableObject interface{}) (*kedav1alpha1.WithTriggers,
switch obj := scalableObject.(type) {
case *kedav1alpha1.ScaledObject:
withTriggers = &kedav1alpha1.WithTriggers{
TypeMeta: obj.TypeMeta,
ObjectMeta: obj.ObjectMeta,
Spec: kedav1alpha1.WithTriggersSpec{
PollingInterval: obj.Spec.PollingInterval,
Triggers: obj.Spec.Triggers,
},
}
case *kedav1alpha1.ScaledJob:
withTriggers = &kedav1alpha1.WithTriggers{
TypeMeta: obj.TypeMeta,
ObjectMeta: obj.ObjectMeta,
Spec: kedav1alpha1.WithTriggersSpec{
PollingInterval: obj.Spec.PollingInterval,
Triggers: obj.Spec.Triggers,
Expand Down

0 comments on commit 2d81976

Please sign in to comment.