diff --git a/CHANGELOG.md b/CHANGELOG.md index c7d67ee9127..a1d54692338 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -70,6 +70,7 @@ Here is an overview of all new **experimental** features: ### Improvements +- **Azure queue scaler**: Added new configuration option 'queueLengthStrategy' ([#4478](https://github.com/kedacore/keda/issues/4478)) - **GCP Scalers**: Added custom time horizon in GCP scalers ([#5778](https://github.com/kedacore/keda/issues/5778)) - **GitHub Scaler**: Fixed pagination, fetching repository list ([#5738](https://github.com/kedacore/keda/issues/5738)) - **Kafka**: Fix logic to scale to zero on invalid offset even with earliest offsetResetPolicy ([#5689](https://github.com/kedacore/keda/issues/5689)) diff --git a/pkg/scalers/azure/azure_queue.go b/pkg/scalers/azure/azure_queue.go index 39ddaa7eacc..8cb3f1c7deb 100644 --- a/pkg/scalers/azure/azure_queue.go +++ b/pkg/scalers/azure/azure_queue.go @@ -18,14 +18,19 @@ package azure import ( "context" + "strings" "github.com/Azure/azure-storage-queue-go/azqueue" kedav1alpha1 "github.com/kedacore/keda/v2/apis/keda/v1alpha1" ) +const ( + maxPeekMessages int32 = 32 +) + // GetAzureQueueLength returns the length of a queue in int, see https://learn.microsoft.com/en-us/azure/storage/queues/storage-dotnet-how-to-use-queues?tabs=dotnet#get-the-queue-length -func GetAzureQueueLength(ctx context.Context, podIdentity kedav1alpha1.AuthPodIdentity, connectionString, queueName, accountName, endpointSuffix string) (int64, error) { +func GetAzureQueueLength(ctx context.Context, podIdentity kedav1alpha1.AuthPodIdentity, connectionString, queueName, accountName, endpointSuffix, queueLengthStrategy string) (int64, error) { credential, endpoint, err := ParseAzureStorageQueueConnection(ctx, podIdentity, connectionString, accountName, endpointSuffix) if err != nil { return -1, err @@ -35,6 +40,27 @@ func GetAzureQueueLength(ctx context.Context, podIdentity kedav1alpha1.AuthPodId serviceURL := azqueue.NewServiceURL(*endpoint, p) queueURL := serviceURL.NewQueueURL(queueName) + strategy := strings.ToLower(queueLengthStrategy) + if strategy == "visibleonly" { + visibleMessageCount, err := getVisibleCount(ctx, &queueURL, maxPeekMessages) + if err != nil { + return -1, err + } + + // Queue has less messages than we allowed to peek for, so no need to get the approximation + if visibleMessageCount < int64(maxPeekMessages) { + return visibleMessageCount, nil + } + + props, err := queueURL.GetProperties(ctx) + if err != nil { + return -1, err + } + + return int64(props.ApproximateMessagesCount()), nil + } + + // Default strategy (visible + invisible messages) props, err := queueURL.GetProperties(ctx) if err != nil { return -1, err @@ -42,3 +68,13 @@ func GetAzureQueueLength(ctx context.Context, podIdentity kedav1alpha1.AuthPodId return int64(props.ApproximateMessagesCount()), nil } + +func getVisibleCount(ctx context.Context, queueURL *azqueue.QueueURL, maxCount int32) (int64, error) { + messagesURL := queueURL.NewMessagesURL() + queue, err := messagesURL.Peek(ctx, maxCount) + if err != nil { + return 0, err + } + num := queue.NumMessages() + return int64(num), nil +} diff --git a/pkg/scalers/azure/azure_queue_test.go b/pkg/scalers/azure/azure_queue_test.go index eb47cfb0c59..deea58cbfea 100644 --- a/pkg/scalers/azure/azure_queue_test.go +++ b/pkg/scalers/azure/azure_queue_test.go @@ -7,34 +7,24 @@ import ( "testing" kedav1alpha1 "github.com/kedacore/keda/v2/apis/keda/v1alpha1" + "github.com/stretchr/testify/assert" ) func TestGetQueueLength(t *testing.T) { - length, err := GetAzureQueueLength(context.TODO(), kedav1alpha1.AuthPodIdentity{}, "", "queueName", "", "") - if length != -1 { - t.Error("Expected length to be -1, but got", length) - } - - if err == nil { - t.Error("Expected error for empty connection string, but got nil") - } - - if !errors.Is(err, ErrAzureConnectionStringKeyName) { - t.Error("Expected error to contain parsing error message, but got", err.Error()) - } - - length, err = GetAzureQueueLength(context.TODO(), kedav1alpha1.AuthPodIdentity{}, "DefaultEndpointsProtocol=https;AccountName=name;AccountKey=key==;EndpointSuffix=core.windows.net", "queueName", "", "") - - if length != -1 { - t.Error("Expected length to be -1, but got", length) - } - - if err == nil { - t.Error("Expected error for empty connection string, but got nil") - } - - var base64Error base64.CorruptInputError - if !errors.As(err, &base64Error) { - t.Error("Expected error to contain base64 error message, but got", err.Error()) - } + t.Run("Empty connection string", func(t *testing.T) { + length, err := GetAzureQueueLength(context.TODO(), kedav1alpha1.AuthPodIdentity{}, "", "queueName", "", "", "") + + assert.Equal(t, int64(-1), length, "Expected length to be -1") + assert.NotNil(t, err, "Expected error for empty connection string") + assert.True(t, errors.Is(err, ErrAzureConnectionStringKeyName), "Expected error to contain parsing error message") + }) + + t.Run("Invalid base64 connection string", func(t *testing.T) { + length, err := GetAzureQueueLength(context.TODO(), kedav1alpha1.AuthPodIdentity{}, "DefaultEndpointsProtocol=https;AccountName=name;AccountKey=key==;EndpointSuffix=core.windows.net", "queueName", "", "", "") + + assert.Equal(t, int64(-1), length, "Expected length to be -1") + assert.NotNil(t, err, "Expected error for invalid connection string") + var base64Error base64.CorruptInputError + assert.True(t, errors.As(err, &base64Error), "Expected error to contain base64 error message") + }) } diff --git a/pkg/scalers/azure_queue_scaler.go b/pkg/scalers/azure_queue_scaler.go index 8609d84a88a..04eaa6ca6c5 100644 --- a/pkg/scalers/azure_queue_scaler.go +++ b/pkg/scalers/azure_queue_scaler.go @@ -21,6 +21,7 @@ import ( "fmt" "net/http" "strconv" + "strings" "github.com/go-logr/logr" v2 "k8s.io/api/autoscaling/v2" @@ -54,6 +55,7 @@ type azureQueueMetadata struct { connection string accountName string endpointSuffix string + queueLengthStrategy string triggerIndex int } @@ -120,6 +122,17 @@ func parseAzureQueueMetadata(config *scalersconfig.ScalerConfig, logger logr.Log return nil, kedav1alpha1.AuthPodIdentity{}, fmt.Errorf("no queueName given") } + if val, ok := config.TriggerMetadata["queueLengthStrategy"]; ok && val != "" { + strategy := strings.ToLower(val) + if strategy == "default" || strategy == "visibleonly" { + meta.queueLengthStrategy = strategy + } else { + return nil, kedav1alpha1.AuthPodIdentity{}, fmt.Errorf("invalid queueLengthStrategy %s given", val) + } + } else { + meta.queueLengthStrategy = "default" + } + // If the Use AAD Pod Identity is not present, or set to "none" // then check for connection string switch config.PodIdentity.Provider { @@ -179,6 +192,7 @@ func (s *azureQueueScaler) GetMetricsAndActivity(ctx context.Context, metricName s.metadata.queueName, s.metadata.accountName, s.metadata.endpointSuffix, + s.metadata.queueLengthStrategy, ) if err != nil {