Skip to content

Commit

Permalink
Improve AWS Cloudwatch Scaler metric exporting logic (#2243)
Browse files Browse the repository at this point in the history
Signed-off-by: Xiayang Wu <[email protected]>
  • Loading branch information
fivesheep authored Nov 4, 2021
1 parent 2350679 commit b6fde28
Show file tree
Hide file tree
Showing 3 changed files with 193 additions and 67 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
- Add `unsafeSsl` parameter in InfluxDB scaler ([#2157](https://github.com/kedacore/keda/pull/2157))
- Improve metric name creation to be unique using scaler index inside the scaler ([#2161](https://github.com/kedacore/keda/pull/2161))
- Improve error message if `IdleReplicaCount` are equal to `MinReplicaCount` to be the same as the check ([#2212](https://github.com/kedacore/keda/pull/2212))
- Improve Cloudwatch Scaler metric exporting logic ([#2243](https://github.com/kedacore/keda/pull/2243))

### Breaking Changes

Expand Down
184 changes: 117 additions & 67 deletions pkg/scalers/aws_cloudwatch_scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ const (
defaultMetricCollectionTime = 300
defaultMetricStat = "Average"
defaultMetricStatPeriod = 300
defaultMetricEndTimeOffset = 0
)

type awsCloudwatchScaler struct {
Expand All @@ -44,7 +45,9 @@ type awsCloudwatchMetadata struct {

metricCollectionTime int64
metricStat string
metricUnit string
metricStatPeriod int64
metricEndTimeOffset int64

awsRegion string

Expand All @@ -67,44 +70,41 @@ func NewAwsCloudwatchScaler(config *ScalerConfig) (Scaler, error) {
}, nil
}

func parseMetricValues(config *ScalerConfig) (*awsCloudwatchMetadata, error) {
metricsMeta := awsCloudwatchMetadata{}

if val, ok := config.TriggerMetadata["metricCollectionTime"]; ok && val != "" {
if n, ok := strconv.ParseInt(val, 10, 64); ok == nil {
metricsMeta.metricCollectionTime = n
} else {
return nil, fmt.Errorf("metricCollectionTime not a valid number")
func getIntMetadataValue(metadata map[string]string, key string, required bool, defaultValue int64) (int64, error) {
if val, ok := metadata[key]; ok && val != "" {
value, err := strconv.Atoi(val)
if err != nil {
return 0, fmt.Errorf("error parsing %s metadata: %v", key, err)
}
} else {
metricsMeta.metricCollectionTime = defaultMetricCollectionTime
return int64(value), nil
}

if required {
return 0, fmt.Errorf("metadata %s not given", key)
}

if val, ok := config.TriggerMetadata["metricStatPeriod"]; ok && val != "" {
if n, ok := strconv.ParseInt(val, 10, 64); ok == nil {
metricsMeta.metricStatPeriod = n
} else {
return nil, fmt.Errorf("metricStatPeriod not a valid number")
return defaultValue, nil
}

func getFloatMetadataValue(metadata map[string]string, key string, required bool, defaultValue float64) (float64, error) {
if val, ok := metadata[key]; ok && val != "" {
value, err := strconv.ParseFloat(val, 64)
if err != nil {
return 0, fmt.Errorf("error parsing %s metadata: %v", key, err)
}
} else {
metricsMeta.metricStatPeriod = defaultMetricStatPeriod
return value, nil
}

if val, ok := config.TriggerMetadata["metricStat"]; ok && val != "" {
metricsMeta.metricStat = val
} else {
metricsMeta.metricStat = defaultMetricStat
if required {
return 0, fmt.Errorf("metadata %s not given", key)
}

return &metricsMeta, nil
return defaultValue, nil
}

func parseAwsCloudwatchMetadata(config *ScalerConfig) (*awsCloudwatchMetadata, error) {
meta, err := parseMetricValues(config)

if err != nil {
return nil, fmt.Errorf("an error occurred when the scaler tried to get the metrics values")
}
var err error
meta := awsCloudwatchMetadata{}

if val, ok := config.TriggerMetadata["namespace"]; ok && val != "" {
meta.namespace = val
Expand Down Expand Up @@ -134,48 +134,50 @@ func parseAwsCloudwatchMetadata(config *ScalerConfig) (*awsCloudwatchMetadata, e
return nil, fmt.Errorf("dimensionName and dimensionValue are not matching in size")
}

if val, ok := config.TriggerMetadata["targetMetricValue"]; ok && val != "" {
targetMetricValue, err := strconv.ParseFloat(val, 64)
if err != nil {
cloudwatchLog.Error(err, "Error parsing targetMetricValue metadata")
} else {
meta.targetMetricValue = targetMetricValue
}
} else {
return nil, fmt.Errorf("target Metric Value not given")
}

if val, ok := config.TriggerMetadata["minMetricValue"]; ok && val != "" {
minMetricValue, err := strconv.ParseFloat(val, 64)
if err != nil {
cloudwatchLog.Error(err, "Error parsing minMetricValue metadata")
} else {
meta.minMetricValue = minMetricValue
}
} else {
return nil, fmt.Errorf("min metric value not given")
meta.targetMetricValue, err = getFloatMetadataValue(config.TriggerMetadata, "targetMetricValue", true, 0)
if err != nil {
return nil, err
}

if val, ok := config.TriggerMetadata["metricCollectionTime"]; ok && val != "" {
metricCollectionTime, err := strconv.Atoi(val)
if err != nil {
cloudwatchLog.Error(err, "Error parsing metricCollectionTime metadata")
} else {
meta.metricCollectionTime = int64(metricCollectionTime)
}
meta.minMetricValue, err = getFloatMetadataValue(config.TriggerMetadata, "minMetricValue", true, 0)
if err != nil {
return nil, err
}

meta.metricStat = defaultMetricStat
if val, ok := config.TriggerMetadata["metricStat"]; ok && val != "" {
meta.metricStat = val
}
if err = checkMetricStat(meta.metricStat); err != nil {
return nil, err
}

if val, ok := config.TriggerMetadata["metricStatPeriod"]; ok && val != "" {
metricStatPeriod, err := strconv.Atoi(val)
if err != nil {
cloudwatchLog.Error(err, "Error parsing metricStatPeriod metadata")
} else {
meta.metricStatPeriod = int64(metricStatPeriod)
}
meta.metricStatPeriod, err = getIntMetadataValue(config.TriggerMetadata, "metricStatPeriod", false, defaultMetricStatPeriod)
if err != nil {
return nil, err
}

if err = checkMetricStatPeriod(meta.metricStatPeriod); err != nil {
return nil, err
}

meta.metricCollectionTime, err = getIntMetadataValue(config.TriggerMetadata, "metricCollectionTime", false, defaultMetricCollectionTime)
if err != nil {
return nil, err
}

if meta.metricCollectionTime < 0 || meta.metricCollectionTime%meta.metricStatPeriod != 0 {
return nil, fmt.Errorf("metricCollectionTime must be greater than 0 and a multiple of metricStatPeriod(%d), %d is given", meta.metricStatPeriod, meta.metricCollectionTime)
}

meta.metricEndTimeOffset, err = getIntMetadataValue(config.TriggerMetadata, "metricEndTimeOffset", false, defaultMetricEndTimeOffset)
if err != nil {
return nil, err
}

meta.metricUnit = config.TriggerMetadata["metricUnit"]
if err = checkMetricUnit(meta.metricUnit); err != nil {
return nil, err
}

if val, ok := config.TriggerMetadata["awsRegion"]; ok && val != "" {
Expand All @@ -184,16 +186,54 @@ func parseAwsCloudwatchMetadata(config *ScalerConfig) (*awsCloudwatchMetadata, e
return nil, fmt.Errorf("no awsRegion given")
}

auth, err := getAwsAuthorization(config.AuthParams, config.TriggerMetadata, config.ResolvedEnv)
meta.awsAuthorization, err = getAwsAuthorization(config.AuthParams, config.TriggerMetadata, config.ResolvedEnv)
if err != nil {
return nil, err
}

meta.awsAuthorization = auth

meta.scalerIndex = config.ScalerIndex

return meta, nil
return &meta, nil
}

func checkMetricStat(stat string) error {
for _, s := range cloudwatch.Statistic_Values() {
if stat == s {
return nil
}
}
return fmt.Errorf("metricStat '%s' is not one of %v", stat, cloudwatch.Statistic_Values())
}

func checkMetricUnit(unit string) error {
if unit == "" {
return nil
}
for _, u := range cloudwatch.StandardUnit_Values() {
if unit == u {
return nil
}
}
return fmt.Errorf("metricUnit '%s' is not one of %v", unit, cloudwatch.StandardUnit_Values())
}

func checkMetricStatPeriod(period int64) error {
if period < 1 {
return fmt.Errorf("metricStatPeriod can not be smaller than 1, however, %d is provided", period)
} else if period <= 60 {
switch period {
case 1, 5, 10, 30, 60:
return nil
default:
return fmt.Errorf("metricStatPeriod < 60 has to be one of [1, 5, 10, 30], however, %d is provided", period)
}
}

if period%60 != 0 {
return fmt.Errorf("metricStatPeriod >= 60 has to be a multiple of 60, however, %d is provided", period)
}

return nil
}

func (c *awsCloudwatchScaler) GetMetrics(ctx context.Context, metricName string, metricSelector labels.Selector) ([]external_metrics.ExternalMetricValue, error) {
Expand Down Expand Up @@ -273,9 +313,18 @@ func (c *awsCloudwatchScaler) GetCloudwatchMetrics() (float64, error) {
})
}

endTime := time.Now().Add(time.Second * -1 * time.Duration(c.metadata.metricEndTimeOffset)).Truncate(time.Duration(c.metadata.metricStatPeriod) * time.Second)
startTime := endTime.Add(time.Second * -1 * time.Duration(c.metadata.metricCollectionTime))

var metricUnit *string
if c.metadata.metricUnit != "" {
metricUnit = aws.String(c.metadata.metricUnit)
}

input := cloudwatch.GetMetricDataInput{
StartTime: aws.Time(time.Now().Add(time.Second * -1 * time.Duration(c.metadata.metricCollectionTime))),
EndTime: aws.Time(time.Now()),
StartTime: aws.Time(startTime),
EndTime: aws.Time(endTime),
ScanBy: aws.String(cloudwatch.ScanByTimestampDescending),
MetricDataQueries: []*cloudwatch.MetricDataQuery{
{
Id: aws.String("c1"),
Expand All @@ -287,6 +336,7 @@ func (c *awsCloudwatchScaler) GetCloudwatchMetrics() (float64, error) {
},
Period: aws.Int64(c.metadata.metricStatPeriod),
Stat: aws.String(c.metadata.metricStat),
Unit: metricUnit,
},
ReturnData: aws.Bool(true),
},
Expand Down
75 changes: 75 additions & 0 deletions pkg/scalers/aws_cloudwatch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,81 @@ var testAWSCloudwatchMetadata = []parseAWSCloudwatchMetadataTestData{
"awsRegion": "eu-west-1"},
testAWSAuthentication, false,
"Missing metricStatPeriod not generate error because will get the default value"},
{map[string]string{
"namespace": "AWS/SQS",
"dimensionName": "QueueName",
"dimensionValue": "keda",
"metricName": "ApproximateNumberOfMessagesVisible",
"targetMetricValue": "2",
"minMetricValue": "0",
"metricStat": "Average",
"metricUnit": "Count",
"metricEndTimeOffset": "60",
"awsRegion": "eu-west-1"},
testAWSAuthentication, false,
"set a supported metricUnit"},
{map[string]string{
"namespace": "AWS/SQS",
"dimensionName": "QueueName",
"dimensionValue": "keda",
"metricName": "ApproximateNumberOfMessagesVisible",
"targetMetricValue": "2",
"minMetricValue": "0",
"metricCollectionTime": "300",
"metricStat": "SomeStat",
"awsRegion": "eu-west-1"},
testAWSAuthentication, true,
"metricStat is not supported"},
{map[string]string{
"namespace": "AWS/SQS",
"dimensionName": "QueueName",
"dimensionValue": "keda",
"metricName": "ApproximateNumberOfMessagesVisible",
"targetMetricValue": "2",
"minMetricValue": "0",
"metricStatPeriod": "300",
"metricCollectionTime": "100",
"metricStat": "Average",
"awsRegion": "eu-west-1"},
testAWSAuthentication, true,
"metricCollectionTime smaller than metricStatPeriod"},
{map[string]string{
"namespace": "AWS/SQS",
"dimensionName": "QueueName",
"dimensionValue": "keda",
"metricName": "ApproximateNumberOfMessagesVisible",
"targetMetricValue": "2",
"minMetricValue": "0",
"metricStatPeriod": "250",
"metricStat": "Average",
"awsRegion": "eu-west-1"},
testAWSAuthentication, true,
"unsupported metricStatPeriod"},
{map[string]string{
"namespace": "AWS/SQS",
"dimensionName": "QueueName",
"dimensionValue": "keda",
"metricName": "ApproximateNumberOfMessagesVisible",
"targetMetricValue": "2",
"minMetricValue": "0",
"metricStatPeriod": "25",
"metricStat": "Average",
"awsRegion": "eu-west-1"},
testAWSAuthentication, true,
"unsupported metricStatPeriod"},
{map[string]string{
"namespace": "AWS/SQS",
"dimensionName": "QueueName",
"dimensionValue": "keda",
"metricName": "ApproximateNumberOfMessagesVisible",
"targetMetricValue": "2",
"minMetricValue": "0",
"metricStatPeriod": "25",
"metricStat": "Average",
"metricUnit": "Hour",
"awsRegion": "eu-west-1"},
testAWSAuthentication, true,
"unsupported metricUnit"},
}

var awsCloudwatchMetricIdentifiers = []awsCloudwatchMetricIdentifier{
Expand Down

0 comments on commit b6fde28

Please sign in to comment.