Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add AWS DynamoDB Scaler #2486

Merged
merged 34 commits into from
Mar 22, 2022
Merged
Show file tree
Hide file tree
Changes from 33 commits
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
83ef62e
Adding DynamoDB Scaler + Testing metadata parsing.
samueleresca Jan 16, 2022
9108d10
Adding coverage GetMetrics methods.
samueleresca Jan 16, 2022
a811770
Adding GetQueryMetrics tests.
samueleresca Jan 19, 2022
e0c2bb6
Updating configuration parsing.
samueleresca Jan 20, 2022
63b0dcf
Updating Dynamo scaler: moving JSON parsing into parseAwsDynamoDBMeta…
samueleresca Jan 20, 2022
cc1403e
Cleaning code for linting rules + Adding IsActive test coverage.
samueleresca Jan 20, 2022
6c0c7fd
Updating code for linting rules.
samueleresca Jan 20, 2022
20f9934
Updating with proper import formatting.
samueleresca Jan 20, 2022
a403f4b
Merge branch 'main' of https://github.com/kedacore/keda into dynamodb…
samueleresca Feb 18, 2022
bf0e96f
Updating scalers.
samueleresca Feb 21, 2022
a297157
Merge branch 'main' of https://github.com/kedacore/keda into dynamodb…
samueleresca Feb 22, 2022
05a7f23
Updating scaler removing metricName + Fix logging.
samueleresca Feb 24, 2022
5b1c199
Update pkg/scalers/aws_dynamodb_scaler.go
samueleresca Feb 28, 2022
a0c681e
Update pkg/scalers/aws_dynamodb_scaler.go
samueleresca Feb 28, 2022
5357266
Update pkg/scalers/aws_dynamodb_scaler.go
samueleresca Feb 28, 2022
c700152
Update pkg/scalers/aws_dynamodb_scaler.go
samueleresca Feb 28, 2022
88c4572
nit: Reordering kedautil import.
samueleresca Feb 28, 2022
8819b1d
Update pkg/scalers/aws_dynamodb_scaler.go
samueleresca Feb 28, 2022
5b7170c
Adding E2E tests for dynamo.
samueleresca Mar 7, 2022
adf3d79
move metrics generation to parseAwsDynamoDBMetadata.
samueleresca Mar 7, 2022
30014c1
Run go fmt.
samueleresca Mar 7, 2022
1f5022f
Updating CHANGELOG.
samueleresca Mar 7, 2022
7d46b5f
Merge branch 'main' into dynamodb-scaler
samueleresca Mar 7, 2022
9592d3d
Merge branch 'kedacore:main' into dynamodb-scaler
samueleresca Mar 11, 2022
30b977d
Using createNamespace helper.
samueleresca Mar 11, 2022
90bb1e0
Merge branch 'dynamodb-scaler' of https://github.com/samueleresca/ked…
samueleresca Mar 11, 2022
610b91c
Merge branch 'main' into dynamodb-scaler
samueleresca Mar 14, 2022
f67456e
Pull origin main.
samueleresca Mar 14, 2022
6926736
Fix CHANGELOG.
samueleresca Mar 14, 2022
d00b0f5
Fixing CHANGELOG + Ignoring AWS e2e test.
samueleresca Mar 17, 2022
1e72ea2
Pulling main.
samueleresca Mar 17, 2022
f6ae0ee
Removing TODO.
samueleresca Mar 17, 2022
1539c69
Adding newline to e2e test.
samueleresca Mar 17, 2022
6204e9b
Updating CHANGELOG, removing unrelated entries.
samueleresca Mar 18, 2022
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@

### New

- **General:** Automatically release container image for ARM ([#2263]https://github.com/kedacore/keda/issues/2263))
samueleresca marked this conversation as resolved.
Show resolved Hide resolved
- **General:** Automatically run end-to-end tests on ARM ([#2262]https://github.com/kedacore/keda/issues/2262))
- **General:** Introduce new AWS DynamoDB Scaler ([#2486](https://github.com/kedacore/keda/issues/2482))
- **General:** Introduce new Azure Data Explorer Scaler ([#1488](https://github.com/kedacore/keda/issues/1488))
- **General:** Introduce new GCP Storage Scaler ([#2628](https://github.com/kedacore/keda/issues/2628))
- **General:** Introduce ARM-based container image for KEDA ([#2263](https://github.com/kedacore/keda/issues/2263) & [#2262](https://github.com/kedacore/keda/issues/2262))
Expand Down
242 changes: 242 additions & 0 deletions pkg/scalers/aws_dynamodb_scaler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,242 @@
package scalers

import (
"context"
"encoding/json"
"errors"
"fmt"
"strconv"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/credentials"
"github.com/aws/aws-sdk-go/aws/credentials/stscreds"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/dynamodb"
"github.com/aws/aws-sdk-go/service/dynamodb/dynamodbiface"
"go.mongodb.org/mongo-driver/bson"
"k8s.io/api/autoscaling/v2beta2"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/metrics/pkg/apis/external_metrics"
logf "sigs.k8s.io/controller-runtime/pkg/log"

kedautil "github.com/kedacore/keda/v2/pkg/util"
)

type awsDynamoDBScaler struct {
metadata *awsDynamoDBMetadata
dbClient dynamodbiface.DynamoDBAPI
}

type awsDynamoDBMetadata struct {
tableName string
awsRegion string
keyConditionExpression string
expressionAttributeNames map[string]*string
expressionAttributeValues map[string]*dynamodb.AttributeValue
targetValue int
awsAuthorization awsAuthorizationMetadata
scalerIndex int
metricName string
}

var dynamoDBLog = logf.Log.WithName("aws_dynamodb_scaler")

func NewAwsDynamoDBScaler(config *ScalerConfig) (Scaler, error) {
meta, err := parseAwsDynamoDBMetadata(config)
if err != nil {
return nil, fmt.Errorf("error parsing DynamoDb metadata: %s", err)
}

return &awsDynamoDBScaler{
metadata: meta,
dbClient: createDynamoDBClient(meta),
}, nil
}

func parseAwsDynamoDBMetadata(config *ScalerConfig) (*awsDynamoDBMetadata, error) {
meta := awsDynamoDBMetadata{}

if val, ok := config.TriggerMetadata["tableName"]; ok && val != "" {
meta.tableName = val
} else {
return nil, fmt.Errorf("no tableName given")
}

if val, ok := config.TriggerMetadata["awsRegion"]; ok && val != "" {
meta.awsRegion = val
} else {
return nil, fmt.Errorf("no awsRegion given")
}

if val, ok := config.TriggerMetadata["keyConditionExpression"]; ok && val != "" {
meta.keyConditionExpression = val
} else {
return nil, fmt.Errorf("no keyConditionExpression given")
}

if val, ok := config.TriggerMetadata["expressionAttributeNames"]; ok && val != "" {
names, err := json2Map(val)

if err != nil {
return nil, fmt.Errorf("error parsing expressionAttributeNames: %s", err)
}

meta.expressionAttributeNames = names
} else {
return nil, fmt.Errorf("no expressionAttributeNames given")
}

if val, ok := config.TriggerMetadata["expressionAttributeValues"]; ok && val != "" {
values, err := json2DynamoMap(val)

if err != nil {
return nil, fmt.Errorf("error parsing expressionAttributeValues: %s", err)
}

meta.expressionAttributeValues = values
} else {
return nil, fmt.Errorf("no expressionAttributeValues given")
}

if val, ok := config.TriggerMetadata["targetValue"]; ok && val != "" {
n, err := strconv.Atoi(val)
if err != nil {
return nil, fmt.Errorf("error parsing metadata targetValue")
}

meta.targetValue = n
} else {
return nil, fmt.Errorf("no targetValue given")
}

auth, err := getAwsAuthorization(config.AuthParams, config.TriggerMetadata, config.ResolvedEnv)
if err != nil {
return nil, err
}

meta.awsAuthorization = auth
meta.scalerIndex = config.ScalerIndex

meta.metricName = GenerateMetricNameWithIndex(config.ScalerIndex,
kedautil.NormalizeString(fmt.Sprintf("aws-dynamodb-%s", meta.tableName)))

return &meta, nil
}

func createDynamoDBClient(meta *awsDynamoDBMetadata) *dynamodb.DynamoDB {
sess := session.Must(session.NewSession(&aws.Config{
Region: aws.String(meta.awsRegion),
}))

var dbClient *dynamodb.DynamoDB

if !meta.awsAuthorization.podIdentityOwner {
dbClient = dynamodb.New(sess, &aws.Config{
Region: aws.String(meta.awsRegion),
})

return dbClient
}

creds := credentials.NewStaticCredentials(meta.awsAuthorization.awsAccessKeyID, meta.awsAuthorization.awsSecretAccessKey, "")

if meta.awsAuthorization.awsRoleArn != "" {
creds = stscreds.NewCredentials(sess, meta.awsAuthorization.awsRoleArn)
}

dbClient = dynamodb.New(sess, &aws.Config{
Region: aws.String(meta.awsRegion),
Credentials: creds,
})

return dbClient
}

func (c *awsDynamoDBScaler) GetMetrics(ctx context.Context, metricName string, metricSelector labels.Selector) ([]external_metrics.ExternalMetricValue, error) {
metricValue, err := c.GetQueryMetrics()
if err != nil {
dynamoDBLog.Error(err, "Error getting metric value")
return []external_metrics.ExternalMetricValue{}, err
}

metric := external_metrics.ExternalMetricValue{
MetricName: metricName,
Value: *resource.NewQuantity(metricValue, resource.DecimalSI),
Timestamp: metav1.Now(),
}

return append([]external_metrics.ExternalMetricValue{}, metric), nil
}

func (c *awsDynamoDBScaler) GetMetricSpecForScaling(context.Context) []v2beta2.MetricSpec {
targetMetricValue := resource.NewQuantity(int64(c.metadata.targetValue), resource.DecimalSI)
externalMetric := &v2beta2.ExternalMetricSource{
Metric: v2beta2.MetricIdentifier{
Name: c.metadata.metricName,
},
Target: v2beta2.MetricTarget{
Type: v2beta2.AverageValueMetricType,
AverageValue: targetMetricValue,
},
}
metricSpec := v2beta2.MetricSpec{External: externalMetric, Type: externalMetricType}

return []v2beta2.MetricSpec{
metricSpec,
}
}

func (c *awsDynamoDBScaler) IsActive(ctx context.Context) (bool, error) {
messages, err := c.GetQueryMetrics()
if err != nil {
return false, fmt.Errorf("error inspecting aws-dynamodb: %s", err)
}

return messages > 0, nil
}

func (c *awsDynamoDBScaler) Close(context.Context) error {
return nil
}

func (c *awsDynamoDBScaler) GetQueryMetrics() (int64, error) {
dimensions := dynamodb.QueryInput{
TableName: aws.String(c.metadata.tableName),
KeyConditionExpression: aws.String(c.metadata.keyConditionExpression),
ExpressionAttributeNames: c.metadata.expressionAttributeNames,
ExpressionAttributeValues: c.metadata.expressionAttributeValues,
}

res, err := c.dbClient.Query(&dimensions)
if err != nil {
dynamoDBLog.Error(err, "Failed to get output")
return 0, err
}

return *res.Count, nil
}

// json2Map convert Json to map[string]string
func json2Map(js string) (m map[string]*string, err error) {
err = bson.UnmarshalExtJSON([]byte(js), true, &m)
if err != nil {
return nil, err
}

if len(m) == 0 {
return nil, errors.New("empty map")
}
return m, err
}

// json2DynamoMap converts Json to map[string]*dynamoDb.AttributeValue
func json2DynamoMap(js string) (m map[string]*dynamodb.AttributeValue, err error) {
err = json.Unmarshal([]byte(js), &m)

if err != nil {
return nil, err
}
return m, err
}
Loading