Skip to content

Commit

Permalink
Update GetAzureQueueLength in azure storage queue scaler to consider …
Browse files Browse the repository at this point in the history
…queueLengthStrategy

Signed-off-by: Leonardo D'Ippolito <[email protected]>
  • Loading branch information
leodip committed Jun 9, 2024
1 parent fd4fcc3 commit a95588d
Show file tree
Hide file tree
Showing 4 changed files with 54 additions and 3 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ Here is an overview of all new **experimental** features:

### Improvements

- **Azure queue scaler**: Added new configuration option 'queueLengthStrategy' ([#4478](https://github.com/kedacore/keda/issues/4478))
- **GCP Scalers**: Added custom time horizon in GCP scalers ([#5778](https://github.com/kedacore/keda/issues/5778))
- **GitHub Scaler**: Fixed pagination, fetching repository list ([#5738](https://github.com/kedacore/keda/issues/5738))
- **Kafka**: Fix logic to scale to zero on invalid offset even with earliest offsetResetPolicy ([#5689](https://github.com/kedacore/keda/issues/5689))
Expand Down
38 changes: 37 additions & 1 deletion pkg/scalers/azure/azure_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,19 @@ package azure

import (
"context"
"strings"

"github.com/Azure/azure-storage-queue-go/azqueue"

kedav1alpha1 "github.com/kedacore/keda/v2/apis/keda/v1alpha1"
)

const (
maxPeekMessages int32 = 32
)

// GetAzureQueueLength returns the length of a queue in int, see https://learn.microsoft.com/en-us/azure/storage/queues/storage-dotnet-how-to-use-queues?tabs=dotnet#get-the-queue-length
func GetAzureQueueLength(ctx context.Context, podIdentity kedav1alpha1.AuthPodIdentity, connectionString, queueName, accountName, endpointSuffix string) (int64, error) {
func GetAzureQueueLength(ctx context.Context, podIdentity kedav1alpha1.AuthPodIdentity, connectionString, queueName, accountName, endpointSuffix, queueLengthStrategy string) (int64, error) {
credential, endpoint, err := ParseAzureStorageQueueConnection(ctx, podIdentity, connectionString, accountName, endpointSuffix)
if err != nil {
return -1, err
Expand All @@ -35,10 +40,41 @@ func GetAzureQueueLength(ctx context.Context, podIdentity kedav1alpha1.AuthPodId
serviceURL := azqueue.NewServiceURL(*endpoint, p)
queueURL := serviceURL.NewQueueURL(queueName)

strategy := strings.ToLower(queueLengthStrategy)
if strategy == "visibleonly" {
visibleMessageCount, err := getVisibleCount(ctx, &queueURL, maxPeekMessages)
if err != nil {
return -1, err
}

// Queue has less messages than we allowed to peek for, so no need to get the approximation
if visibleMessageCount < int64(maxPeekMessages) {
return visibleMessageCount, nil
}

props, err := queueURL.GetProperties(ctx)
if err != nil {
return -1, err
}

return int64(props.ApproximateMessagesCount()), nil
}

// Default strategy (visible + invisible messages)
props, err := queueURL.GetProperties(ctx)
if err != nil {
return -1, err
}

return int64(props.ApproximateMessagesCount()), nil
}

func getVisibleCount(ctx context.Context, queueURL *azqueue.QueueURL, maxCount int32) (int64, error) {
messagesURL := queueURL.NewMessagesURL()
queue, err := messagesURL.Peek(ctx, maxCount)
if err != nil {
return 0, err
}
num := queue.NumMessages()
return int64(num), nil
}
4 changes: 2 additions & 2 deletions pkg/scalers/azure/azure_queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (
)

func TestGetQueueLength(t *testing.T) {
length, err := GetAzureQueueLength(context.TODO(), kedav1alpha1.AuthPodIdentity{}, "", "queueName", "", "")
length, err := GetAzureQueueLength(context.TODO(), kedav1alpha1.AuthPodIdentity{}, "", "queueName", "", "", "")
if length != -1 {
t.Error("Expected length to be -1, but got", length)
}
Expand All @@ -23,7 +23,7 @@ func TestGetQueueLength(t *testing.T) {
t.Error("Expected error to contain parsing error message, but got", err.Error())
}

length, err = GetAzureQueueLength(context.TODO(), kedav1alpha1.AuthPodIdentity{}, "DefaultEndpointsProtocol=https;AccountName=name;AccountKey=key==;EndpointSuffix=core.windows.net", "queueName", "", "")
length, err = GetAzureQueueLength(context.TODO(), kedav1alpha1.AuthPodIdentity{}, "DefaultEndpointsProtocol=https;AccountName=name;AccountKey=key==;EndpointSuffix=core.windows.net", "queueName", "", "", "")

if length != -1 {
t.Error("Expected length to be -1, but got", length)
Expand Down
14 changes: 14 additions & 0 deletions pkg/scalers/azure_queue_scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"fmt"
"net/http"
"strconv"
"strings"

"github.com/go-logr/logr"
v2 "k8s.io/api/autoscaling/v2"
Expand Down Expand Up @@ -54,6 +55,7 @@ type azureQueueMetadata struct {
connection string
accountName string
endpointSuffix string
queueLengthStrategy string
triggerIndex int
}

Expand Down Expand Up @@ -120,6 +122,17 @@ func parseAzureQueueMetadata(config *scalersconfig.ScalerConfig, logger logr.Log
return nil, kedav1alpha1.AuthPodIdentity{}, fmt.Errorf("no queueName given")
}

if val, ok := config.TriggerMetadata["queueLengthStrategy"]; ok && val != "" {
strategy := strings.ToLower(val)
if strategy == "default" || strategy == "visibleonly" {
meta.queueLengthStrategy = strategy
} else {
return nil, kedav1alpha1.AuthPodIdentity{}, fmt.Errorf("invalid queueLengthStrategy %s given", val)
}
} else {
meta.queueLengthStrategy = "default"
}

// If the Use AAD Pod Identity is not present, or set to "none"
// then check for connection string
switch config.PodIdentity.Provider {
Expand Down Expand Up @@ -179,6 +192,7 @@ func (s *azureQueueScaler) GetMetricsAndActivity(ctx context.Context, metricName
s.metadata.queueName,
s.metadata.accountName,
s.metadata.endpointSuffix,
s.metadata.queueLengthStrategy,
)

if err != nil {
Expand Down

0 comments on commit a95588d

Please sign in to comment.