Skip to content

Commit

Permalink
gcp_pubsub: Support distribution-valued metrics and metrics from topi…
Browse files Browse the repository at this point in the history
…cs (#5246)

Signed-off-by: Kevin Mingtarja <[email protected]>
  • Loading branch information
kevinmingtarja authored Dec 7, 2023
1 parent df22d1f commit b730a5f
Show file tree
Hide file tree
Showing 6 changed files with 704 additions and 47 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ Here is an overview of all new **experimental** features:

- **General**: Add parameter queryParameters to prometheus-scaler ([#4962](https://github.com/kedacore/keda/issues/4962))
- **General**: Support TriggerAuthentication properties from ConfigMap ([#4830](https://github.com/kedacore/keda/issues/4830))
- **GCP pubsub scaler**: Support distribution-valued metrics and metrics from topics ([#5070](https://github.com/kedacore/keda/issues/5070))
- **Hashicorp Vault**: Add support to get secret that needs write operation (e.g. pki) ([#5067](https://github.com/kedacore/keda/issues/5067))
- **Hashicorp Vault**: Fix operator panic when spec.hashiCorpVault.credential.serviceAccount is not set ([#4964](https://github.com/kedacore/keda/issues/4964))
- **Hashicorp Vault**: Fix operator panic when using root token to authenticate to vault server ([#5192](https://github.com/kedacore/keda/issues/5192))
Expand Down
72 changes: 49 additions & 23 deletions pkg/scalers/gcp_pubsub_scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,11 @@ import (
)

const (
compositeSubscriptionIDPrefix = "projects/[a-z][a-zA-Z0-9-]*[a-zA-Z0-9]/subscriptions/[a-zA-Z][a-zA-Z0-9-_~%\\+\\.]*"
prefixPubSubStackDriverSubscription = "pubsub.googleapis.com/subscription/"
compositeSubscriptionIDPrefix = "projects/[a-z][a-zA-Z0-9-]*[a-zA-Z0-9]/(subscriptions|topics)/[a-zA-Z][a-zA-Z0-9-_~%\\+\\.]*"
prefixPubSubResource = "pubsub.googleapis.com/"

resourceTypePubSubSubscription = "subscription"
resourceTypePubSubTopic = "topic"

pubSubModeSubscriptionSize = "SubscriptionSize"
pubSubDefaultValue = 10
Expand All @@ -37,9 +40,12 @@ type pubsubMetadata struct {
value float64
activationValue float64

subscriptionName string
// a resource is one of subscription or topic
resourceType string
resourceName string
gcpAuthorization *gcpAuthorizationMetadata
scalerIndex int
aggregation string
}

// NewPubSubScaler creates a new pubsubScaler
Expand All @@ -64,7 +70,6 @@ func NewPubSubScaler(config *ScalerConfig) (Scaler, error) {
}

func parsePubSubMetadata(config *ScalerConfig, logger logr.Logger) (*pubsubMetadata, error) {
// set subscription size to the default mode
meta := pubsubMetadata{mode: pubSubModeSubscriptionSize, value: pubSubDefaultValue}

mode, modePresent := config.TriggerMetadata["mode"]
Expand All @@ -74,6 +79,9 @@ func parsePubSubMetadata(config *ScalerConfig, logger logr.Logger) (*pubsubMetad
if modePresent || valuePresent {
return nil, errors.New("you can use either mode and value fields or subscriptionSize field")
}
if _, topicPresent := config.TriggerMetadata["topicName"]; topicPresent {
return nil, errors.New("you cannot use subscriptionSize field together with topicName field. Use subscriptionName field instead")
}
logger.Info("subscriptionSize field is deprecated. Use mode and value fields instead")
subSizeValue, err := strconv.ParseFloat(subSize, 64)
if err != nil {
Expand All @@ -94,14 +102,28 @@ func parsePubSubMetadata(config *ScalerConfig, logger logr.Logger) (*pubsubMetad
}
}

if val, ok := config.TriggerMetadata["subscriptionName"]; ok {
if val == "" {
meta.aggregation = config.TriggerMetadata["aggregation"]

sub, subPresent := config.TriggerMetadata["subscriptionName"]
topic, topicPresent := config.TriggerMetadata["topicName"]
if (!subPresent && !topicPresent) || (subPresent && topicPresent) {
return nil, fmt.Errorf("exactly one of subscription or topic name must be given")
}

if subPresent {
if sub == "" {
return nil, fmt.Errorf("no subscription name given")
}

meta.subscriptionName = val
meta.resourceName = sub
meta.resourceType = resourceTypePubSubSubscription
} else {
return nil, fmt.Errorf("no subscription name given")
if topic == "" {
return nil, fmt.Errorf("no topic name given")
}

meta.resourceName = topic
meta.resourceType = resourceTypePubSubTopic
}

meta.activationValue = 0
Expand Down Expand Up @@ -138,7 +160,7 @@ func (s *pubsubScaler) Close(context.Context) error {
func (s *pubsubScaler) GetMetricSpecForScaling(context.Context) []v2.MetricSpec {
externalMetric := &v2.ExternalMetricSource{
Metric: v2.MetricIdentifier{
Name: GenerateMetricNameWithIndex(s.metadata.scalerIndex, kedautil.NormalizeString(fmt.Sprintf("gcp-ps-%s", s.metadata.subscriptionName))),
Name: GenerateMetricNameWithIndex(s.metadata.scalerIndex, kedautil.NormalizeString(fmt.Sprintf("gcp-ps-%s", s.metadata.resourceName))),
},
Target: GetMetricTargetMili(s.metricType, s.metadata.value),
}
Expand All @@ -162,8 +184,8 @@ func (s *pubsubScaler) GetMetricsAndActivity(ctx context.Context, metricName str
mode = "NumUndeliveredMessages"
}

metricType := prefixPubSubStackDriverSubscription + snakeCase(mode)

prefix := prefixPubSubResource + s.metadata.resourceType + "/"
metricType := prefix + snakeCase(mode)
value, err := s.getMetrics(ctx, metricType)
if err != nil {
s.logger.Error(err, "error getting metric", "metricType", metricType)
Expand Down Expand Up @@ -194,30 +216,34 @@ func (s *pubsubScaler) setStackdriverClient(ctx context.Context) error {
// getMetrics gets metric type value from stackdriver api
func (s *pubsubScaler) getMetrics(ctx context.Context, metricType string) (float64, error) {
if s.client == nil {
err := s.setStackdriverClient(ctx)
if err != nil {
if err := s.setStackdriverClient(ctx); err != nil {
return -1, err
}
}
subscriptionID, projectID := getSubscriptionData(s)
filter := `metric.type="` + metricType + `" AND resource.labels.subscription_id="` + subscriptionID + `"`
resourceID, projectID := getResourceData(s)
query, err := s.client.buildMQLQuery(
projectID, s.metadata.resourceType, metricType, resourceID, s.metadata.aggregation,
)
if err != nil {
return -1, err
}

// Pubsub metrics are collected every 60 seconds so no need to aggregate them.
// See: https://cloud.google.com/monitoring/api/metrics_gcp#gcp-pubsub
return s.client.GetMetrics(ctx, filter, projectID, nil)
return s.client.QueryMetrics(ctx, projectID, query)
}

func getSubscriptionData(s *pubsubScaler) (string, string) {
var subscriptionID string
func getResourceData(s *pubsubScaler) (string, string) {
var resourceID string
var projectID string

if regexpCompositeSubscriptionIDPrefix.MatchString(s.metadata.subscriptionName) {
subscriptionID = strings.Split(s.metadata.subscriptionName, "/")[3]
projectID = strings.Split(s.metadata.subscriptionName, "/")[1]
if regexpCompositeSubscriptionIDPrefix.MatchString(s.metadata.resourceName) {
resourceID = strings.Split(s.metadata.resourceName, "/")[3]
projectID = strings.Split(s.metadata.resourceName, "/")[1]
} else {
subscriptionID = s.metadata.subscriptionName
resourceID = s.metadata.resourceName
}
return subscriptionID, projectID
return resourceID, projectID
}

var (
Expand Down
30 changes: 23 additions & 7 deletions pkg/scalers/gcp_pubsub_scaler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ var testPubSubMetadata = []parsePubSubMetadataTestData{
{map[string]string{}, map[string]string{}, true},
// all properly formed with deprecated field
{nil, map[string]string{"subscriptionName": "mysubscription", "subscriptionSize": "7", "credentialsFromEnv": "SAMPLE_CREDS"}, false},
// all properly formed
// all properly formed with subscriptionName
{nil, map[string]string{"subscriptionName": "mysubscription", "value": "7", "credentialsFromEnv": "SAMPLE_CREDS", "activationValue": "5"}, false},
// all properly formed with oldest unacked message age mode
{nil, map[string]string{"subscriptionName": "mysubscription", "mode": "OldestUnackedMessageAge", "value": "7", "credentialsFromEnv": "SAMPLE_CREDS"}, false},
Expand All @@ -60,18 +60,34 @@ var testPubSubMetadata = []parsePubSubMetadataTestData{
{nil, map[string]string{"subscriptionName": "mysubscription", "value": "7.1", "credentialsFromEnv": "SAMPLE_CREDS", "activationValue": "2.1"}, false},
// All optional omitted
{nil, map[string]string{"subscriptionName": "mysubscription", "credentialsFromEnv": "SAMPLE_CREDS"}, false},
// value omittted when mode present
// value omitted when mode present
{nil, map[string]string{"subscriptionName": "mysubscription", "mode": "SubscriptionSize", "credentialsFromEnv": "SAMPLE_CREDS"}, false},
// all properly formed with topicName
{nil, map[string]string{"topicName": "mytopic", "mode": "MessageSizes", "value": "7", "credentialsFromEnv": "SAMPLE_CREDS"}, false},
// with full link to topic
{nil, map[string]string{"topicName": "projects/myproject/topics/mytopic", "mode": "MessageSizes", "credentialsFromEnv": "SAMPLE_CREDS"}, false},
// with full (bad) link to topic
{nil, map[string]string{"topicName": "projects/myproject/mytopic", "mode": "MessageSizes", "credentialsFromEnv": "SAMPLE_CREDS"}, false},
// both subscriptionName and topicName present
{nil, map[string]string{"subscriptionName": "mysubscription", "topicName": "mytopic", "value": "7", "credentialsFromEnv": "SAMPLE_CREDS"}, true},
// both subscriptionName and topicName missing
{nil, map[string]string{"value": "7", "credentialsFromEnv": "SAMPLE_CREDS"}, true},
// both subscriptionSize and topicName present
{nil, map[string]string{"subscriptionSize": "7", "topicName": "mytopic", "value": "7", "credentialsFromEnv": "SAMPLE_CREDS"}, true},
}

var gcpPubSubMetricIdentifiers = []gcpPubSubMetricIdentifier{
{&testPubSubMetadata[1], 0, "s0-gcp-ps-mysubscription"},
{&testPubSubMetadata[1], 1, "s1-gcp-ps-mysubscription"},
{&testPubSubMetadata[16], 0, "s0-gcp-ps-mytopic"},
{&testPubSubMetadata[16], 1, "s1-gcp-ps-mytopic"},
}

var gcpSubscriptionNameTests = []gcpPubSubSubscription{
var gcpResourceNameTests = []gcpPubSubSubscription{
{&testPubSubMetadata[11], 1, "mysubscription", "myproject"},
{&testPubSubMetadata[12], 1, "projects/myproject/mysubscription", ""},
{&testPubSubMetadata[17], 1, "mytopic", "myproject"},
{&testPubSubMetadata[18], 1, "projects/myproject/mytopic", ""},
}

var gcpSubscriptionDefaults = []gcpPubSubSubscription{
Expand Down Expand Up @@ -123,16 +139,16 @@ func TestGcpPubSubGetMetricSpecForScaling(t *testing.T) {
}

func TestGcpPubSubSubscriptionName(t *testing.T) {
for _, testData := range gcpSubscriptionNameTests {
for _, testData := range gcpResourceNameTests {
meta, err := parsePubSubMetadata(&ScalerConfig{TriggerMetadata: testData.metadataTestData.metadata, ResolvedEnv: testPubSubResolvedEnv, ScalerIndex: testData.scalerIndex}, logr.Discard())
if err != nil {
t.Fatal("Could not parse metadata:", err)
}
mockGcpPubSubScaler := pubsubScaler{nil, "", meta, logr.Discard()}
subscriptionID, projectID := getSubscriptionData(&mockGcpPubSubScaler)
resourceID, projectID := getResourceData(&mockGcpPubSubScaler)

if subscriptionID != testData.name || projectID != testData.projectID {
t.Error("Wrong Subscription parsing:", subscriptionID, projectID)
if resourceID != testData.name || projectID != testData.projectID {
t.Error("Wrong Subscription parsing:", resourceID, projectID)
}
}
}
Expand Down
Loading

0 comments on commit b730a5f

Please sign in to comment.