Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

GCP Pub/Sub Scaler: add oldest unacked message age metric #2266

Merged
Merged
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -54,10 +54,15 @@
- Improve error message if `IdleReplicaCount` are equal to `MinReplicaCount` to be the same as the check ([#2212](https://github.com/kedacore/keda/pull/2212))
- Improve Cloudwatch Scaler metric exporting logic ([#2243](https://github.com/kedacore/keda/pull/2243))
- Refactor aws related scalers to reuse the aws clients instead of creating a new one for every GetMetrics call([#2255](https://github.com/kedacore/keda/pull/2255))
- GCP PubSub scaler may be used in SubscriptionSize and OldestUnackedMessageAge modes
- Cleanup metric names inside scalers ([#2260](https://github.com/kedacore/keda/pull/2260))
- Validating values length in prometheus query response ([#2264](https://github.com/kedacore/keda/pull/2264))
- Add `unsafeSsl` parameter in SeleniumGrid scaler ([#2157](https://github.com/kedacore/keda/pull/2157))

### Deprecations

- `subscriptionSize` is deprecated in favor of `mode` and `value` for GCP Pub/Sub scaler

### Breaking Changes

- TODO ([#XXX](https://github.com/kedacore/keda/pull/XXX))
Expand Down
143 changes: 104 additions & 39 deletions pkg/scalers/gcp_pub_sub_scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,11 @@ package scalers

import (
"context"
"errors"
"fmt"
"strconv"

v2beta2 "k8s.io/api/autoscaling/v2beta2"
"k8s.io/api/autoscaling/v2beta2"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
Expand All @@ -17,8 +18,13 @@ import (
)

const (
defaultTargetSubscriptionSize = 5
pubSubStackDriverMetricName = "pubsub.googleapis.com/subscription/num_undelivered_messages"
defaultTargetSubscriptionSize = 5
defaultTargetOldestUnackedMessageAge = 10
pubSubStackDriverSubscriptionSizeMetricName = "pubsub.googleapis.com/subscription/num_undelivered_messages"
pubSubStackDriverOldestUnackedMessageAgeMetricName = "pubsub.googleapis.com/subscription/oldest_unacked_message_age"

pubsubModeSubscriptionSize = "SubscriptionSize"
pubsubModeOldestUnackedMessageAge = "OldestUnackedMessageAge"
)

type gcpAuthorizationMetadata struct {
Expand All @@ -33,10 +39,12 @@ type pubsubScaler struct {
}

type pubsubMetadata struct {
targetSubscriptionSize int
subscriptionName string
gcpAuthorization gcpAuthorizationMetadata
scalerIndex int
mode string
value int

subscriptionName string
gcpAuthorization gcpAuthorizationMetadata
scalerIndex int
fira42073 marked this conversation as resolved.
Show resolved Hide resolved
}

var gcpPubSubLog = logf.Log.WithName("gcp_pub_sub_scaler")
Expand All @@ -55,15 +63,43 @@ func NewPubSubScaler(config *ScalerConfig) (Scaler, error) {

func parsePubSubMetadata(config *ScalerConfig) (*pubsubMetadata, error) {
meta := pubsubMetadata{}
meta.targetSubscriptionSize = defaultTargetSubscriptionSize
meta.mode = pubsubModeSubscriptionSize

if val, ok := config.TriggerMetadata["subscriptionSize"]; ok {
subscriptionSize, err := strconv.Atoi(val)
mode, modePresent := config.TriggerMetadata["mode"]
value, valuePresent := config.TriggerMetadata["value"]

if subSize, subSizePresent := config.TriggerMetadata["subscriptionSize"]; subSizePresent {
if modePresent || valuePresent {
return nil, errors.New("you can use either mode and value fields or subscriptionSize field")
}
gcpPubSubLog.Info("subscriptionSize field is deprecated. Use mode and value fields instead")
meta.mode = pubsubModeSubscriptionSize
subSizeValue, err := strconv.Atoi(subSize)
if err != nil {
return nil, fmt.Errorf("subscription Size parsing error %s", err.Error())
return nil, fmt.Errorf("value parsing error %s", err.Error())
}
meta.value = subSizeValue
} else {
if modePresent {
meta.mode = mode
}

switch meta.mode {
case pubsubModeSubscriptionSize:
meta.value = defaultTargetSubscriptionSize
case pubsubModeOldestUnackedMessageAge:
meta.value = defaultTargetOldestUnackedMessageAge
default:
return nil, fmt.Errorf("trigger mode %s must be one of %s, %s", meta.mode, pubsubModeSubscriptionSize, pubsubModeOldestUnackedMessageAge)
}

meta.targetSubscriptionSize = subscriptionSize
if valuePresent {
triggerValue, err := strconv.Atoi(value)
if err != nil {
return nil, fmt.Errorf("value parsing error %s", err.Error())
}
meta.value = triggerValue
}
}

if val, ok := config.TriggerMetadata["subscriptionName"]; ok {
Expand All @@ -87,14 +123,24 @@ func parsePubSubMetadata(config *ScalerConfig) (*pubsubMetadata, error) {

// IsActive checks if there are any messages in the subscription
func (s *pubsubScaler) IsActive(ctx context.Context) (bool, error) {
size, err := s.GetSubscriptionSize(ctx)

if err != nil {
gcpPubSubLog.Error(err, "error getting Active Status")
return false, err
switch s.metadata.mode {
case pubsubModeSubscriptionSize:
size, err := s.getMetrics(ctx, pubSubStackDriverSubscriptionSizeMetricName)
if err != nil {
gcpPubSubLog.Error(err, "error getting Active Status")
return false, err
}
return size > 0, nil
case pubsubModeOldestUnackedMessageAge:
_, err := s.getMetrics(ctx, pubSubStackDriverOldestUnackedMessageAgeMetricName)
if err != nil {
gcpPubSubLog.Error(err, "error getting Active Status")
return false, err
}
return true, nil
default:
return false, errors.New("unknown mode")
}

return size > 0, nil
}

func (s *pubsubScaler) Close(context.Context) error {
Expand All @@ -111,16 +157,16 @@ func (s *pubsubScaler) Close(context.Context) error {

// GetMetricSpecForScaling returns the metric spec for the HPA
func (s *pubsubScaler) GetMetricSpecForScaling(context.Context) []v2beta2.MetricSpec {
// Construct the target subscription size as a quantity
targetSubscriptionSizeQty := resource.NewQuantity(int64(s.metadata.targetSubscriptionSize), resource.DecimalSI)
// Construct the target value as a quantity
targetValueQty := resource.NewQuantity(int64(s.metadata.value), resource.DecimalSI)

externalMetric := &v2beta2.ExternalMetricSource{
Metric: v2beta2.MetricIdentifier{
Name: GenerateMetricNameWithIndex(s.metadata.scalerIndex, kedautil.NormalizeString(fmt.Sprintf("gcp-ps-%s", s.metadata.subscriptionName))),
},
Target: v2beta2.MetricTarget{
Type: v2beta2.AverageValueMetricType,
AverageValue: targetSubscriptionSizeQty,
AverageValue: targetValueQty,
},
}

Expand All @@ -135,40 +181,59 @@ func (s *pubsubScaler) GetMetricSpecForScaling(context.Context) []v2beta2.Metric

// GetMetrics connects to Stack Driver and finds the size of the pub sub subscription
func (s *pubsubScaler) GetMetrics(ctx context.Context, metricName string, metricSelector labels.Selector) ([]external_metrics.ExternalMetricValue, error) {
size, err := s.GetSubscriptionSize(ctx)
var value int64
fira42073 marked this conversation as resolved.
Show resolved Hide resolved
var err error

if err != nil {
gcpPubSubLog.Error(err, "error getting subscription size")
return []external_metrics.ExternalMetricValue{}, err
switch s.metadata.mode {
case pubsubModeSubscriptionSize:
value, err = s.getMetrics(ctx, pubSubStackDriverSubscriptionSizeMetricName)
if err != nil {
gcpPubSubLog.Error(err, "error getting subscription size")
return []external_metrics.ExternalMetricValue{}, err
}
case pubsubModeOldestUnackedMessageAge:
value, err = s.getMetrics(ctx, pubSubStackDriverOldestUnackedMessageAgeMetricName)
if err != nil {
gcpPubSubLog.Error(err, "error getting oldest unacked message age")
return []external_metrics.ExternalMetricValue{}, err
}
}

metric := external_metrics.ExternalMetricValue{
MetricName: metricName,
Value: *resource.NewQuantity(size, resource.DecimalSI),
Value: *resource.NewQuantity(value, resource.DecimalSI),
Timestamp: metav1.Now(),
}

return append([]external_metrics.ExternalMetricValue{}, metric), nil
}

// GetSubscriptionSize gets the number of messages in a subscription by calling the
// Stackdriver api
func (s *pubsubScaler) GetSubscriptionSize(ctx context.Context) (int64, error) {
func (s *pubsubScaler) setStackdriverClient(ctx context.Context) error {
var client *StackDriverClient
var err error
if s.metadata.gcpAuthorization.podIdentityProviderEnabled {
client, err = NewStackDriverClientPodIdentity(ctx)
} else {
client, err = NewStackDriverClient(ctx, s.metadata.gcpAuthorization.GoogleApplicationCredentials)
}

if err != nil {
return err
}
s.client = client
return nil
}

// getMetrics gets metric type value from stackdriver api
func (s *pubsubScaler) getMetrics(ctx context.Context, metricType string) (int64, error) {
if s.client == nil {
var client *StackDriverClient
var err error
if s.metadata.gcpAuthorization.podIdentityProviderEnabled {
client, err = NewStackDriverClientPodIdentity(ctx)
} else {
client, err = NewStackDriverClient(ctx, s.metadata.gcpAuthorization.GoogleApplicationCredentials)
}
err := s.setStackdriverClient(ctx)
if err != nil {
return -1, err
}
s.client = client
}

filter := `metric.type="` + pubSubStackDriverMetricName + `" AND resource.labels.subscription_id="` + s.metadata.subscriptionName + `"`
filter := `metric.type="` + metricType + `" AND resource.labels.subscription_id="` + s.metadata.subscriptionName + `"`

return s.client.GetMetrics(ctx, filter)
}
Expand Down
18 changes: 12 additions & 6 deletions pkg/scalers/gcp_pubsub_scaler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,18 +23,24 @@ type gcpPubSubMetricIdentifier struct {

var testPubSubMetadata = []parsePubSubMetadataTestData{
{map[string]string{}, map[string]string{}, true},
// all properly formed
// all properly formed with deprecated field
{nil, map[string]string{"subscriptionName": "mysubscription", "subscriptionSize": "7", "credentialsFromEnv": "SAMPLE_CREDS"}, false},
// all properly formed
{nil, map[string]string{"subscriptionName": "mysubscription", "value": "7", "credentialsFromEnv": "SAMPLE_CREDS"}, false},
// all properly formed with oldest unacked message age mode
{nil, map[string]string{"subscriptionName": "mysubscription", "mode": pubsubModeOldestUnackedMessageAge, "value": "7", "credentialsFromEnv": "SAMPLE_CREDS"}, false},
// missing subscriptionName
{nil, map[string]string{"subscriptionName": "", "subscriptionSize": "7", "credentialsFromEnv": "SAMPLE_CREDS"}, true},
{nil, map[string]string{"subscriptionName": "", "value": "7", "credentialsFromEnv": "SAMPLE_CREDS"}, true},
// missing credentials
{nil, map[string]string{"subscriptionName": "mysubscription", "subscriptionSize": "7", "credentialsFromEnv": ""}, true},
{nil, map[string]string{"subscriptionName": "mysubscription", "value": "7", "credentialsFromEnv": ""}, true},
// malformed subscriptionSize
{nil, map[string]string{"subscriptionName": "mysubscription", "subscriptionSize": "AA", "credentialsFromEnv": "SAMPLE_CREDS"}, true},
{nil, map[string]string{"subscriptionName": "mysubscription", "value": "AA", "credentialsFromEnv": "SAMPLE_CREDS"}, true},
// malformed mode
{nil, map[string]string{"subscriptionName": "", "mode": "AA", "value": "7", "credentialsFromEnv": "SAMPLE_CREDS"}, true},
// Credentials from AuthParams
{map[string]string{"GoogleApplicationCredentials": "Creds", "podIdentityOwner": ""}, map[string]string{"subscriptionName": "mysubscription", "subscriptionSize": "7"}, false},
{map[string]string{"GoogleApplicationCredentials": "Creds", "podIdentityOwner": ""}, map[string]string{"subscriptionName": "mysubscription", "value": "7"}, false},
// Credentials from AuthParams with empty creds
{map[string]string{"GoogleApplicationCredentials": "", "podIdentityOwner": ""}, map[string]string{"subscriptionName": "mysubscription", "subscriptionSize": "7"}, true},
{map[string]string{"GoogleApplicationCredentials": "", "podIdentityOwner": ""}, map[string]string{"subscriptionName": "mysubscription", "value": "7"}, true},
}

var gcpPubSubMetricIdentifiers = []gcpPubSubMetricIdentifier{
Expand Down