Skip to content

Commit

Permalink
feat(aws-sqs): Support for scaling to include delayed messages (kedac…
Browse files Browse the repository at this point in the history
…ore#4377)

Signed-off-by: Chris Taylor <[email protected]>
  • Loading branch information
phr3nzii committed Aug 23, 2023
1 parent e694286 commit a44e980
Show file tree
Hide file tree
Showing 3 changed files with 64 additions and 14 deletions.
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ To learn more about active deprecations, we recommend checking [GitHub Discussio

### New

- TODO ([#XXX](https://github.com/kedacore/keda/issues/XXX))
- **AWS SQS Scaler**: Support for scaling to include delayed messages. [#4377](https://github.com/kedacore/keda/issues/4377)

### Improvements

Expand Down
30 changes: 18 additions & 12 deletions pkg/scalers/aws_sqs_queue_scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,17 +21,9 @@ const (
targetQueueLengthDefault = 5
activationTargetQueueLengthDefault = 0
defaultScaleOnInFlight = true
defaultScaleOnDelayed = false
)

var awsSqsQueueMetricNamesForScalingInFlight = []string{
"ApproximateNumberOfMessages",
"ApproximateNumberOfMessagesNotVisible",
}

var awsSqsQueueMetricNamesForNotScalingInFlight = []string{
"ApproximateNumberOfMessages",
}

type awsSqsQueueScaler struct {
metricType v2.MetricTargetType
metadata *awsSqsQueueMetadata
Expand All @@ -49,6 +41,7 @@ type awsSqsQueueMetadata struct {
awsAuthorization awsAuthorizationMetadata
scalerIndex int
scaleOnInFlight bool
scaleOnDelayed bool
awsSqsQueueMetricNames []string
}

Expand Down Expand Up @@ -78,6 +71,7 @@ func parseAwsSqsQueueMetadata(config *ScalerConfig, logger logr.Logger) (*awsSqs
meta := awsSqsQueueMetadata{}
meta.targetQueueLength = defaultTargetQueueLength
meta.scaleOnInFlight = defaultScaleOnInFlight
meta.scaleOnDelayed = defaultScaleOnDelayed

if val, ok := config.TriggerMetadata["queueLength"]; ok && val != "" {
queueLength, err := strconv.ParseInt(val, 10, 64)
Expand Down Expand Up @@ -109,10 +103,22 @@ func parseAwsSqsQueueMetadata(config *ScalerConfig, logger logr.Logger) (*awsSqs
}
}

if val, ok := config.TriggerMetadata["scaleOnDelayed"]; ok && val != "" {
scaleOnDelayed, err := strconv.ParseBool(val)
if err != nil {
meta.scaleOnDelayed = defaultScaleOnDelayed
logger.Error(err, "Error parsing SQS queue metadata scaleOnDelayed, using default %n", defaultScaleOnDelayed)
} else {
meta.scaleOnDelayed = scaleOnDelayed
}
}

meta.awsSqsQueueMetricNames = []string{"ApproximateNumberOfMessages"}
if meta.scaleOnInFlight {
meta.awsSqsQueueMetricNames = awsSqsQueueMetricNamesForScalingInFlight
} else {
meta.awsSqsQueueMetricNames = awsSqsQueueMetricNamesForNotScalingInFlight
meta.awsSqsQueueMetricNames = append(meta.awsSqsQueueMetricNames, "ApproximateNumberOfMessagesNotVisible")
}
if meta.scaleOnDelayed {
meta.awsSqsQueueMetricNames = append(meta.awsSqsQueueMetricNames, "ApproximateNumberOfMessagesDelayed")
}

if val, ok := config.TriggerMetadata["queueURL"]; ok && val != "" {
Expand Down
46 changes: 45 additions & 1 deletion pkg/scalers/aws_sqs_queue_scaler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ func (m *mockSqs) GetQueueAttributes(input *sqs.GetQueueAttributesInput) (*sqs.G
Attributes: map[string]*string{
"ApproximateNumberOfMessages": aws.String("NotInt"),
"ApproximateNumberOfMessagesNotVisible": aws.String("NotInt"),
"ApproximateNumberOfMessagesDelayed": aws.String("NotInt"),
},
}, nil
}
Expand All @@ -73,6 +74,7 @@ func (m *mockSqs) GetQueueAttributes(input *sqs.GetQueueAttributesInput) (*sqs.G
Attributes: map[string]*string{
"ApproximateNumberOfMessages": aws.String("200"),
"ApproximateNumberOfMessagesNotVisible": aws.String("100"),
"ApproximateNumberOfMessagesDelayed": aws.String("50"),
},
}, nil
}
Expand Down Expand Up @@ -326,6 +328,44 @@ var awsSQSGetMetricTestData = []*parseAWSSQSMetadataTestData{
testAWSSQSEmptyResolvedEnv,
false,
"not error with scaleOnInFlight enabled"},
{map[string]string{
"queueURL": testAWSSQSProperQueueURL,
"queueLength": "1",
"awsRegion": "eu-west-1",
"scaleOnDelayed": "false"},
testAWSSQSAuthentication,
testAWSSQSEmptyResolvedEnv,
false,
"not error with scaleOnDelayed disabled"},
{map[string]string{
"queueURL": testAWSSQSProperQueueURL,
"queueLength": "1",
"awsRegion": "eu-west-1",
"scaleOnDelayed": "true"},
testAWSSQSAuthentication,
testAWSSQSEmptyResolvedEnv,
false,
"not error with scaleOnDelayed enabled"},
{map[string]string{
"queueURL": testAWSSQSProperQueueURL,
"queueLength": "1",
"awsRegion": "eu-west-1",
"scaleOnInFlight": "false",
"scaleOnDelayed": "false"},
testAWSSQSAuthentication,
testAWSSQSEmptyResolvedEnv,
false,
"not error with scaledOnInFlight and scaleOnDelayed disabled"},
{map[string]string{
"queueURL": testAWSSQSProperQueueURL,
"queueLength": "1",
"awsRegion": "eu-west-1",
"scaleOnInFlight": "true",
"scaleOnDelayed": "true"},
testAWSSQSAuthentication,
testAWSSQSEmptyResolvedEnv,
false,
"not error with scaledOnInFlight and scaleOnDelayed enabled"},
{map[string]string{
"queueURL": testAWSSQSErrorQueueURL,
"queueLength": "1",
Expand Down Expand Up @@ -390,8 +430,12 @@ func TestAWSSQSScalerGetMetrics(t *testing.T) {
case testAWSSQSBadDataQueueURL:
assert.Error(t, err, "expect error because of bad data return from sqs")
default:
if meta.scaleOnInFlight {
if meta.scaleOnInFlight && meta.scaleOnDelayed {
assert.EqualValues(t, int64(350.0), value[0].Value.Value())
} else if meta.scaleOnInFlight && !meta.scaleOnDelayed {
assert.EqualValues(t, int64(300.0), value[0].Value.Value())
} else if !meta.scaleOnInFlight && meta.scaleOnDelayed {
assert.EqualValues(t, int64(250.0), value[0].Value.Value())
} else {
assert.EqualValues(t, int64(200.0), value[0].Value.Value())
}
Expand Down

0 comments on commit a44e980

Please sign in to comment.