Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Support regex usage in Azure Service Bus scaler. #3607

Merged
merged 7 commits into from
Sep 30, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,8 @@ To learn more about active deprecations, we recommend checking [GitHub Discussio
### New

- **General**: Provide Prometheus metric with indication of total number of triggers per trigger type in `ScaledJob`/`ScaledObject`. ([#3663](https://github.com/kedacore/keda/issues/3663))
- **Azure Service Bus**: Add support for Shared Access Signature (SAS) tokens for authentication. ([#2920](https://github.com/kedacore/keda/issues/2920))
- **Azure Service Bus Scaler**: Add support for Shared Access Signature (SAS) tokens for authentication. ([#2920](https://github.com/kedacore/keda/issues/2920))
- **Azure Service Bus Scaler:** Support regex usage in queueName / subscriptionName parameters. ([#1624](https://github.com/kedacore/keda/issues/1624))

### Improvements

Expand Down
147 changes: 131 additions & 16 deletions pkg/scalers/azure_servicebus_scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ limitations under the License.
import (
"context"
"fmt"
"regexp"
"strconv"

"github.com/Azure/azure-sdk-for-go/sdk/azcore"
Expand Down Expand Up @@ -66,6 +67,9 @@ type azureServiceBusMetadata struct {
connection string
entityType entityType
fullyQualifiedNamespace string
useRegex bool
entityNameRegex *regexp.Regexp
operation string
scalerIndex int
}

Expand Down Expand Up @@ -118,6 +122,28 @@ func parseAzureServiceBusMetadata(config *ScalerConfig, logger logr.Logger) (*az
meta.activationTargetLength = activationMessageCount
}

meta.useRegex = false
if val, ok := config.TriggerMetadata["useRegex"]; ok {
useRegex, err := strconv.ParseBool(val)
if err != nil {
return nil, fmt.Errorf("useRegex has invalid value")
}
meta.useRegex = useRegex
}

meta.operation = sumOperation
if meta.useRegex {
if val, ok := config.TriggerMetadata["operation"]; ok {
meta.operation = val
}

switch meta.operation {
case avgOperation, maxOperation, sumOperation:
default:
return nil, fmt.Errorf("operation must be one of avg, max, or sum")
}
}

// get queue name OR topic and subscription name & set entity type accordingly
if val, ok := config.TriggerMetadata["queueName"]; ok {
meta.queueName = val
Expand All @@ -126,6 +152,16 @@ func parseAzureServiceBusMetadata(config *ScalerConfig, logger logr.Logger) (*az
if _, ok := config.TriggerMetadata["subscriptionName"]; ok {
return nil, fmt.Errorf("subscription name provided with queue name")
}

if meta.useRegex {
entityNameRegex, err := regexp.Compile(meta.queueName)
if err != nil {
return nil, fmt.Errorf("queueName is not a valid regular expression")
}
entityNameRegex.Longest()

meta.entityNameRegex = entityNameRegex
}
}

if val, ok := config.TriggerMetadata["topicName"]; ok {
Expand All @@ -140,6 +176,16 @@ func parseAzureServiceBusMetadata(config *ScalerConfig, logger logr.Logger) (*az
} else {
return nil, fmt.Errorf("no subscription name provided with topic name")
}

if meta.useRegex {
entityNameRegex, err := regexp.Compile(meta.subscriptionName)
if err != nil {
return nil, fmt.Errorf("subscriptionName is not a valid regular expression")
}
entityNameRegex.Longest()

meta.entityNameRegex = entityNameRegex
}
}
if meta.entityType == none {
return nil, fmt.Errorf("no service bus entity type set")
Expand Down Expand Up @@ -200,10 +246,18 @@ func (s *azureServiceBusScaler) Close(context.Context) error {
// Returns the metric spec to be used by the HPA
func (s *azureServiceBusScaler) GetMetricSpecForScaling(context.Context) []v2.MetricSpec {
metricName := ""

var entityType string
if s.metadata.entityType == queue {
metricName = s.metadata.queueName
entityType = "queue"
} else {
metricName = s.metadata.topicName
entityType = "topic"
}

if s.metadata.useRegex {
metricName = fmt.Sprintf("%s-regex", entityType)
}

externalMetric := &v2.ExternalMetricSource{
Expand Down Expand Up @@ -240,9 +294,9 @@ func (s *azureServiceBusScaler) getAzureServiceBusLength(ctx context.Context) (i
// switch case for queue vs topic here
switch s.metadata.entityType {
case queue:
return getQueueLength(ctx, adminClient, s.metadata.queueName)
return getQueueLength(ctx, adminClient, s.metadata)
case subscription:
return getSubscriptionLength(ctx, adminClient, s.metadata.topicName, s.metadata.subscriptionName)
return getSubscriptionLength(ctx, adminClient, s.metadata)
default:
return -1, fmt.Errorf("no entity type")
}
Expand Down Expand Up @@ -303,26 +357,87 @@ func (s *azureServiceBusScaler) getServiceBusAdminClient(ctx context.Context) (*
return adminClient, nil
}

func getQueueLength(ctx context.Context, adminClient *admin.Client, queueName string) (int64, error) {
queueEntity, err := adminClient.GetQueueRuntimeProperties(ctx, queueName, &admin.GetQueueRuntimePropertiesOptions{})
if err != nil {
return -1, err
func getQueueLength(ctx context.Context, adminClient *admin.Client, meta *azureServiceBusMetadata) (int64, error) {
if !meta.useRegex {
queueEntity, err := adminClient.GetQueueRuntimeProperties(ctx, meta.queueName, &admin.GetQueueRuntimePropertiesOptions{})
if err != nil {
return -1, err
}
if queueEntity == nil {
return -1, fmt.Errorf("queue %s doesn't exist", meta.queueName)
}

return int64(queueEntity.ActiveMessageCount), nil
}
if queueEntity == nil {
return -1, fmt.Errorf("queue %s doesn't exist", queueName)

messageCounts := make([]int64, 0)

queuePager := adminClient.NewListQueuesRuntimePropertiesPager(nil)
for queuePager.More() {
page, err := queuePager.NextPage(ctx)
if err != nil {
return -1, err
}

for _, queue := range page.QueueRuntimeProperties {
if meta.entityNameRegex.FindString(queue.QueueName) == queue.QueueName {
messageCounts = append(messageCounts, int64(queue.ActiveMessageCount))
}
}
}

return int64(queueEntity.ActiveMessageCount), nil
return performOperation(messageCounts, meta.operation), nil
}

func getSubscriptionLength(ctx context.Context, adminClient *admin.Client, topicName, subscriptionName string) (int64, error) {
subscriptionEntity, err := adminClient.GetSubscriptionRuntimeProperties(ctx, topicName, subscriptionName, &admin.GetSubscriptionRuntimePropertiesOptions{})
if err != nil {
return -1, err
func getSubscriptionLength(ctx context.Context, adminClient *admin.Client, meta *azureServiceBusMetadata) (int64, error) {
if !meta.useRegex {
subscriptionEntity, err := adminClient.GetSubscriptionRuntimeProperties(ctx, meta.topicName, meta.subscriptionName,
&admin.GetSubscriptionRuntimePropertiesOptions{})
if err != nil {
return -1, err
}
if subscriptionEntity == nil {
return -1, fmt.Errorf("subscription %s doesn't exist in topic %s", meta.subscriptionName, meta.topicName)
}

return int64(subscriptionEntity.ActiveMessageCount), nil
}
if subscriptionEntity == nil {
return -1, fmt.Errorf("subscription %s doesn't exist in topic %s", subscriptionName, topicName)

messageCounts := make([]int64, 0)

subscriptionPager := adminClient.NewListSubscriptionsRuntimePropertiesPager(meta.topicName, nil)
for subscriptionPager.More() {
page, err := subscriptionPager.NextPage(ctx)
if err != nil {
return -1, err
}

for _, subscription := range page.SubscriptionRuntimeProperties {
if meta.entityNameRegex.FindString(subscription.SubscriptionName) == subscription.SubscriptionName {
messageCounts = append(messageCounts, int64(subscription.ActiveMessageCount))
}
}
}

return int64(subscriptionEntity.ActiveMessageCount), nil
return performOperation(messageCounts, meta.operation), nil
}

func performOperation(messageCounts []int64, operation string) int64 {
var result int64
for _, val := range messageCounts {
switch operation {
case avgOperation, sumOperation:
result += val
case maxOperation:
if val > result {
result = val
}
}
}

total := int64(len(messageCounts))
if operation == "avg" && total != 0 {
return result / total
}
return result
}
21 changes: 21 additions & 0 deletions pkg/scalers/azure_servicebus_scaler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,27 @@ var parseServiceBusMetadataDataset = []parseServiceBusMetadataTestData{
{map[string]string{"queueName": queueName, "namespace": namespaceName}, false, queue, defaultSuffix, map[string]string{}, kedav1alpha1.PodIdentityProviderAzureWorkload},
// invalid activation message count
{map[string]string{"queueName": queueName, "connectionFromEnv": connectionSetting, "messageCount": messageCount, "activationMessageCount": "AA"}, true, queue, defaultSuffix, map[string]string{}, ""},
// queue with incorrect useRegex value
{map[string]string{"queueName": queueName, "connectionFromEnv": connectionSetting, "useRegex": "ababa"}, true, queue, defaultSuffix, map[string]string{}, ""},
// properly formed queues with regex
{map[string]string{"queueName": queueName, "connectionFromEnv": connectionSetting, "useRegex": "false"}, false, queue, defaultSuffix, map[string]string{}, ""},
{map[string]string{"queueName": queueName, "connectionFromEnv": connectionSetting, "useRegex": "true", "operation": avgOperation}, false, queue, defaultSuffix, map[string]string{}, ""},
{map[string]string{"queueName": queueName, "connectionFromEnv": connectionSetting, "useRegex": "true", "operation": sumOperation}, false, queue, defaultSuffix, map[string]string{}, ""},
{map[string]string{"queueName": queueName, "connectionFromEnv": connectionSetting, "useRegex": "true", "operation": maxOperation}, false, queue, defaultSuffix, map[string]string{}, ""},
{map[string]string{"queueName": queueName, "connectionFromEnv": connectionSetting, "useRegex": "true", "operation": "random"}, true, queue, defaultSuffix, map[string]string{}, ""},
// queue with invalid regex string
{map[string]string{"queueName": "*", "connectionFromEnv": connectionSetting, "useRegex": "true", "operation": "avg"}, true, queue, defaultSuffix, map[string]string{}, ""},

// subscription with incorrect useRegex value
{map[string]string{"topicName": topicName, "subscriptionName": subscriptionName, "connectionFromEnv": connectionSetting, "useRegex": "ababa"}, true, subscription, defaultSuffix, map[string]string{}, ""},
// properly formed subscriptions with regex
{map[string]string{"topicName": topicName, "subscriptionName": subscriptionName, "connectionFromEnv": connectionSetting, "useRegex": "false"}, false, subscription, defaultSuffix, map[string]string{}, ""},
{map[string]string{"topicName": topicName, "subscriptionName": subscriptionName, "connectionFromEnv": connectionSetting, "useRegex": "true", "operation": avgOperation}, false, subscription, defaultSuffix, map[string]string{}, ""},
{map[string]string{"topicName": topicName, "subscriptionName": subscriptionName, "connectionFromEnv": connectionSetting, "useRegex": "true", "operation": sumOperation}, false, subscription, defaultSuffix, map[string]string{}, ""},
{map[string]string{"topicName": topicName, "subscriptionName": subscriptionName, "connectionFromEnv": connectionSetting, "useRegex": "true", "operation": maxOperation}, false, subscription, defaultSuffix, map[string]string{}, ""},
{map[string]string{"topicName": topicName, "subscriptionName": subscriptionName, "connectionFromEnv": connectionSetting, "useRegex": "true", "operation": "random"}, true, subscription, defaultSuffix, map[string]string{}, ""},
// subscription with invalid regex string
{map[string]string{"topicName": topicName, "subscriptionName": "*", "connectionFromEnv": connectionSetting, "useRegex": "true", "operation": "avg"}, true, subscription, defaultSuffix, map[string]string{}, ""},
}

var azServiceBusMetricIdentifiers = []azServiceBusMetricIdentifier{
Expand Down
1 change: 1 addition & 0 deletions tests/.env
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ AZURE_DEVOPS_PROJECT=
AZURE_KEYVAULT_URI=
AZURE_LOG_ANALYTICS_WORKSPACE_ID=
AZURE_RESOURCE_GROUP=
AZURE_SERVICE_BUS_CONNECTION_STRING=
AZURE_SP_APP_ID=
AZURE_SP_KEY=
AZURE_SP_OBJECT_ID=
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import (
)

// Load environment variables from .env file
var _ = godotenv.Load("../../.env")
var _ = godotenv.Load("../../../.env")

const (
testName = "azure-app-insights-test"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import (
)

// Load environment variables from .env file
var _ = godotenv.Load("../../.env")
var _ = godotenv.Load("../../../.env")

const (
testName = "azure-blob-test"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import (
)

// Load environment variables from .env file
var _ = godotenv.Load("../../.env")
var _ = godotenv.Load("../../../.env")

const (
testName = "azure-data-explorer-test"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import (
)

// Load environment variables from .env file
var _ = godotenv.Load("../../.env")
var _ = godotenv.Load("../../../.env")

const (
testName = "azure-log-analytics-test"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import (
)

// Load environment variables from .env file
var _ = godotenv.Load("../../.env")
var _ = godotenv.Load("../../../.env")

const (
testName = "azure-pipelines-test"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import (
)

// Load environment variables from .env file
var _ = godotenv.Load("../../.env")
var _ = godotenv.Load("../../../.env")

const (
testName = "azure-queue-test"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import (
)

// Load environment variables from .env file
var _ = godotenv.Load("../../.env")
var _ = godotenv.Load("../../../.env")

const (
testName = "azure-service-bus-queue-test"
Expand Down
Loading