Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

GCP Pub/Sub Scaler: add oldest unacked message age metric #2266

Merged
Merged
137 changes: 100 additions & 37 deletions pkg/scalers/gcp_pub_sub_scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,11 @@ package scalers

import (
"context"
"errors"
"fmt"
"strconv"

v2beta2 "k8s.io/api/autoscaling/v2beta2"
"k8s.io/api/autoscaling/v2beta2"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
Expand All @@ -17,8 +18,13 @@ import (
)

const (
defaultTargetSubscriptionSize = 5
pubSubStackDriverMetricName = "pubsub.googleapis.com/subscription/num_undelivered_messages"
defaultTargetSubscriptionSize = 5
defaultTargetOldestUnackedMessageAge = 10
pubSubStackDriverSubscriptionSizeMetricName = "pubsub.googleapis.com/subscription/num_undelivered_messages"
pubSubStackDriverOldestUnackedMessageAgeMetricName = "pubsub.googleapis.com/subscription/oldest_unacked_message_age"

pubsubModeSubscriptionSize = "SubscriptionSize"
pubsubModeOldestUnackedMessageAge = "OldestUnackedMessageAge"
)

type gcpAuthorizationMetadata struct {
Expand All @@ -33,10 +39,12 @@ type pubsubScaler struct {
}

type pubsubMetadata struct {
targetSubscriptionSize int
subscriptionName string
gcpAuthorization gcpAuthorizationMetadata
scalerIndex int
mode string
value int

subscriptionName string
gcpAuthorization gcpAuthorizationMetadata
scalerIndex int
fira42073 marked this conversation as resolved.
Show resolved Hide resolved
}

var gcpPubSubLog = logf.Log.WithName("gcp_pub_sub_scaler")
Expand All @@ -55,15 +63,28 @@ func NewPubSubScaler(config *ScalerConfig) (Scaler, error) {

func parsePubSubMetadata(config *ScalerConfig) (*pubsubMetadata, error) {
meta := pubsubMetadata{}
meta.targetSubscriptionSize = defaultTargetSubscriptionSize
meta.mode = pubsubModeSubscriptionSize

if val, ok := config.TriggerMetadata["subscriptionSize"]; ok {
subscriptionSize, err := strconv.Atoi(val)
mode, modePresent := config.TriggerMetadata["mode"]
if modePresent {
meta.mode = mode
}

switch meta.mode {
case pubsubModeSubscriptionSize:
meta.value = defaultTargetSubscriptionSize
case pubsubModeOldestUnackedMessageAge:
meta.value = defaultTargetOldestUnackedMessageAge
default:
return nil, fmt.Errorf("trigger mode %s must be one of %s, %s", meta.mode, pubsubModeSubscriptionSize, pubsubModeOldestUnackedMessageAge)
}

if val, ok := config.TriggerMetadata["value"]; ok {
triggerValue, err := strconv.Atoi(val)
if err != nil {
return nil, fmt.Errorf("subscription Size parsing error %s", err.Error())
return nil, fmt.Errorf("value parsing error %s", err.Error())
}

meta.targetSubscriptionSize = subscriptionSize
meta.value = triggerValue
}

if val, ok := config.TriggerMetadata["subscriptionName"]; ok {
Expand All @@ -87,14 +108,23 @@ func parsePubSubMetadata(config *ScalerConfig) (*pubsubMetadata, error) {

// IsActive checks if there are any messages in the subscription
func (s *pubsubScaler) IsActive(ctx context.Context) (bool, error) {
size, err := s.GetSubscriptionSize(ctx)

if err != nil {
gcpPubSubLog.Error(err, "error getting Active Status")
return false, err
if s.metadata.mode == pubsubModeSubscriptionSize {
fira42073 marked this conversation as resolved.
Show resolved Hide resolved
size, err := s.getSubscriptionSize(ctx)
if err != nil {
gcpPubSubLog.Error(err, "error getting Active Status")
return false, err
}
return size > 0, nil
} else if s.metadata.mode == pubsubModeOldestUnackedMessageAge {
_, err := s.getOldestUnackedMessageAge(ctx)
if err != nil {
gcpPubSubLog.Error(err, "error getting Active Status")
return false, err
}
return true, nil
}

return size > 0, nil
return false, errors.New("unknown mode")
}

func (s *pubsubScaler) Close(context.Context) error {
Expand All @@ -111,16 +141,16 @@ func (s *pubsubScaler) Close(context.Context) error {

// GetMetricSpecForScaling returns the metric spec for the HPA
func (s *pubsubScaler) GetMetricSpecForScaling(context.Context) []v2beta2.MetricSpec {
// Construct the target subscription size as a quantity
targetSubscriptionSizeQty := resource.NewQuantity(int64(s.metadata.targetSubscriptionSize), resource.DecimalSI)
// Construct the target value as a quantity
targetValueQty := resource.NewQuantity(int64(s.metadata.value), resource.DecimalSI)

externalMetric := &v2beta2.ExternalMetricSource{
Metric: v2beta2.MetricIdentifier{
Name: GenerateMetricNameWithIndex(s.metadata.scalerIndex, kedautil.NormalizeString(fmt.Sprintf("%s-%s", "gcp", s.metadata.subscriptionName))),
},
Target: v2beta2.MetricTarget{
Type: v2beta2.AverageValueMetricType,
AverageValue: targetSubscriptionSizeQty,
AverageValue: targetValueQty,
},
}

Expand All @@ -135,40 +165,73 @@ func (s *pubsubScaler) GetMetricSpecForScaling(context.Context) []v2beta2.Metric

// GetMetrics connects to Stack Driver and finds the size of the pub sub subscription
func (s *pubsubScaler) GetMetrics(ctx context.Context, metricName string, metricSelector labels.Selector) ([]external_metrics.ExternalMetricValue, error) {
size, err := s.GetSubscriptionSize(ctx)
var value int64
fira42073 marked this conversation as resolved.
Show resolved Hide resolved
var err error

if err != nil {
gcpPubSubLog.Error(err, "error getting subscription size")
return []external_metrics.ExternalMetricValue{}, err
if s.metadata.mode == pubsubModeSubscriptionSize {
value, err = s.getSubscriptionSize(ctx)
if err != nil {
gcpPubSubLog.Error(err, "error getting subscription size")
return []external_metrics.ExternalMetricValue{}, err
}
} else if s.metadata.mode == pubsubModeOldestUnackedMessageAge {
value, err = s.getOldestUnackedMessageAge(ctx)
if err != nil {
gcpPubSubLog.Error(err, "error getting oldest unacked message age")
return []external_metrics.ExternalMetricValue{}, err
}
}

metric := external_metrics.ExternalMetricValue{
MetricName: metricName,
Value: *resource.NewQuantity(size, resource.DecimalSI),
Value: *resource.NewQuantity(value, resource.DecimalSI),
Timestamp: metav1.Now(),
}

return append([]external_metrics.ExternalMetricValue{}, metric), nil
}

// GetSubscriptionSize gets the number of messages in a subscription by calling the
func (s *pubsubScaler) setStackdriverClient(ctx context.Context) error {
var client *StackDriverClient
var err error
if s.metadata.gcpAuthorization.podIdentityProviderEnabled {
client, err = NewStackDriverClientPodIdentity(ctx)
} else {
client, err = NewStackDriverClient(ctx, s.metadata.gcpAuthorization.GoogleApplicationCredentials)
}

if err != nil {
return err
}
s.client = client
return nil
}

// getSubscriptionSize gets the number of messages in a subscription by calling the
// Stackdriver api
func (s *pubsubScaler) GetSubscriptionSize(ctx context.Context) (int64, error) {
func (s *pubsubScaler) getSubscriptionSize(ctx context.Context) (int64, error) {
if s.client == nil {
var client *StackDriverClient
var err error
if s.metadata.gcpAuthorization.podIdentityProviderEnabled {
client, err = NewStackDriverClientPodIdentity(ctx)
} else {
client, err = NewStackDriverClient(ctx, s.metadata.gcpAuthorization.GoogleApplicationCredentials)
err := s.setStackdriverClient(ctx)
if err != nil {
return -1, err
}
}

filter := `metric.type="` + pubSubStackDriverSubscriptionSizeMetricName + `" AND resource.labels.subscription_id="` + s.metadata.subscriptionName + `"`

return s.client.GetMetrics(ctx, filter)
}

// getOldestUnackedMessageAge gets oldest unacked message age in a subscription by calling stackdriver api
fira42073 marked this conversation as resolved.
Show resolved Hide resolved
func (s *pubsubScaler) getOldestUnackedMessageAge(ctx context.Context) (int64, error) {
if s.client == nil {
err := s.setStackdriverClient(ctx)
if err != nil {
return -1, err
}
s.client = client
}

filter := `metric.type="` + pubSubStackDriverMetricName + `" AND resource.labels.subscription_id="` + s.metadata.subscriptionName + `"`
filter := `metric.type="` + pubSubStackDriverOldestUnackedMessageAgeMetricName + `" AND resource.labels.subscription_id="` + s.metadata.subscriptionName + `"`

return s.client.GetMetrics(ctx, filter)
}
Expand Down
16 changes: 10 additions & 6 deletions pkg/scalers/gcp_pubsub_scaler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,17 +24,21 @@ type gcpPubSubMetricIdentifier struct {
var testPubSubMetadata = []parsePubSubMetadataTestData{
{map[string]string{}, map[string]string{}, true},
// all properly formed
{nil, map[string]string{"subscriptionName": "mysubscription", "subscriptionSize": "7", "credentialsFromEnv": "SAMPLE_CREDS"}, false},
{nil, map[string]string{"subscriptionName": "mysubscription", "value": "7", "credentialsFromEnv": "SAMPLE_CREDS"}, false},
// all properly formed with oldest unacked message age mode
{nil, map[string]string{"subscriptionName": "mysubscription", "mode": pubsubModeOldestUnackedMessageAge, "value": "7", "credentialsFromEnv": "SAMPLE_CREDS"}, false},
// missing subscriptionName
{nil, map[string]string{"subscriptionName": "", "subscriptionSize": "7", "credentialsFromEnv": "SAMPLE_CREDS"}, true},
{nil, map[string]string{"subscriptionName": "", "value": "7", "credentialsFromEnv": "SAMPLE_CREDS"}, true},
// missing credentials
{nil, map[string]string{"subscriptionName": "mysubscription", "subscriptionSize": "7", "credentialsFromEnv": ""}, true},
{nil, map[string]string{"subscriptionName": "mysubscription", "value": "7", "credentialsFromEnv": ""}, true},
// malformed subscriptionSize
{nil, map[string]string{"subscriptionName": "mysubscription", "subscriptionSize": "AA", "credentialsFromEnv": "SAMPLE_CREDS"}, true},
{nil, map[string]string{"subscriptionName": "mysubscription", "value": "AA", "credentialsFromEnv": "SAMPLE_CREDS"}, true},
// malformed mode
{nil, map[string]string{"subscriptionName": "", "mode": "AA", "value": "7", "credentialsFromEnv": "SAMPLE_CREDS"}, true},
// Credentials from AuthParams
{map[string]string{"GoogleApplicationCredentials": "Creds", "podIdentityOwner": ""}, map[string]string{"subscriptionName": "mysubscription", "subscriptionSize": "7"}, false},
{map[string]string{"GoogleApplicationCredentials": "Creds", "podIdentityOwner": ""}, map[string]string{"subscriptionName": "mysubscription", "value": "7"}, false},
// Credentials from AuthParams with empty creds
{map[string]string{"GoogleApplicationCredentials": "", "podIdentityOwner": ""}, map[string]string{"subscriptionName": "mysubscription", "subscriptionSize": "7"}, true},
{map[string]string{"GoogleApplicationCredentials": "", "podIdentityOwner": ""}, map[string]string{"subscriptionName": "mysubscription", "value": "7"}, true},
}

var gcpPubSubMetricIdentifiers = []gcpPubSubMetricIdentifier{
Expand Down