Skip to content

Commit

Permalink
Update artemis, azure and aws scalers to handle scalerIndex
Browse files Browse the repository at this point in the history
Signed-off-by: Jorge Turrado <[email protected]>
  • Loading branch information
Jorge Turrado committed Oct 10, 2021
1 parent 9ce6287 commit 6d697e4
Show file tree
Hide file tree
Showing 26 changed files with 182 additions and 91 deletions.
6 changes: 5 additions & 1 deletion pkg/scalers/artemis_scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ type artemisMetadata struct {
restAPITemplate string
queueLength int
corsHeader string
scalerIndex int
}

//revive:enable:var-naming
Expand Down Expand Up @@ -149,6 +150,9 @@ func parseArtemisMetadata(config *ScalerConfig) (*artemisMetadata, error) {
if meta.password == "" {
return nil, fmt.Errorf("password cannot be empty")
}

meta.scalerIndex = config.ScalerIndex

return &meta, nil
}

Expand Down Expand Up @@ -214,7 +218,7 @@ func (s *artemisScaler) GetMetricSpecForScaling() []v2beta2.MetricSpec {
targetMetricValue := resource.NewQuantity(int64(s.metadata.queueLength), resource.DecimalSI)
externalMetric := &v2beta2.ExternalMetricSource{
Metric: v2beta2.MetricIdentifier{
Name: kedautil.NormalizeString(fmt.Sprintf("%s-%s-%s", "artemis", s.metadata.brokerName, s.metadata.queueName)),
Name: GenerateMetricNameWithIndex(s.metadata.scalerIndex, kedautil.NormalizeString(fmt.Sprintf("%s-%s-%s", "artemis", s.metadata.brokerName, s.metadata.queueName))),
},
Target: v2beta2.MetricTarget{
Type: v2beta2.AverageValueMetricType,
Expand Down
6 changes: 4 additions & 2 deletions pkg/scalers/artemis_scaler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ type parseArtemisMetadataTestData struct {

type artemisMetricIdentifier struct {
metadataTestData *parseArtemisMetadataTestData
scalerIndex int
name string
}

Expand Down Expand Up @@ -56,7 +57,8 @@ var testArtemisMetadata = []parseArtemisMetadataTestData{
}

var artemisMetricIdentifiers = []artemisMetricIdentifier{
{&testArtemisMetadata[7], "artemis-broker-activemq-queue1"},
{&testArtemisMetadata[7], 0, "s0-artemis-broker-activemq-queue1"},
{&testArtemisMetadata[7], 1, "s1-artemis-broker-activemq-queue1"},
}

var testArtemisMetadataWithEmptyAuthParams = []parseArtemisMetadataTestData{
Expand Down Expand Up @@ -138,7 +140,7 @@ func TestArtemisParseMetadata(t *testing.T) {

func TestArtemisGetMetricSpecForScaling(t *testing.T) {
for _, testData := range artemisMetricIdentifiers {
meta, err := parseArtemisMetadata(&ScalerConfig{ResolvedEnv: sampleArtemisResolvedEnv, TriggerMetadata: testData.metadataTestData.metadata, AuthParams: nil})
meta, err := parseArtemisMetadata(&ScalerConfig{ResolvedEnv: sampleArtemisResolvedEnv, TriggerMetadata: testData.metadataTestData.metadata, AuthParams: nil, ScalerIndex: testData.scalerIndex})
if err != nil {
t.Fatal("Could not parse metadata:", err)
}
Expand Down
6 changes: 5 additions & 1 deletion pkg/scalers/aws_cloudwatch_scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ type awsCloudwatchMetadata struct {
awsRegion string

awsAuthorization awsAuthorizationMetadata

scalerIndex int
}

var cloudwatchLog = logf.Log.WithName("aws_cloudwatch_scaler")
Expand Down Expand Up @@ -189,6 +191,8 @@ func parseAwsCloudwatchMetadata(config *ScalerConfig) (*awsCloudwatchMetadata, e

meta.awsAuthorization = auth

meta.scalerIndex = config.ScalerIndex

return meta, nil
}

Expand All @@ -213,7 +217,7 @@ func (c *awsCloudwatchScaler) GetMetricSpecForScaling() []v2beta2.MetricSpec {
targetMetricValue := resource.NewQuantity(int64(c.metadata.targetMetricValue), resource.DecimalSI)
externalMetric := &v2beta2.ExternalMetricSource{
Metric: v2beta2.MetricIdentifier{
Name: kedautil.NormalizeString(fmt.Sprintf("%s-%s-%s-%s", "aws-cloudwatch", c.metadata.namespace, c.metadata.dimensionName[0], c.metadata.dimensionValue[0])),
Name: GenerateMetricNameWithIndex(c.metadata.scalerIndex, kedautil.NormalizeString(fmt.Sprintf("%s-%s-%s-%s", "aws-cloudwatch", c.metadata.namespace, c.metadata.dimensionName[0], c.metadata.dimensionValue[0]))),
},
Target: v2beta2.MetricTarget{
Type: v2beta2.AverageValueMetricType,
Expand Down
6 changes: 4 additions & 2 deletions pkg/scalers/aws_cloudwatch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ type parseAWSCloudwatchMetadataTestData struct {

type awsCloudwatchMetricIdentifier struct {
metadataTestData *parseAWSCloudwatchMetadataTestData
scalerIndex int
name string
}

Expand Down Expand Up @@ -233,7 +234,8 @@ var testAWSCloudwatchMetadata = []parseAWSCloudwatchMetadataTestData{
}

var awsCloudwatchMetricIdentifiers = []awsCloudwatchMetricIdentifier{
{&testAWSCloudwatchMetadata[1], "aws-cloudwatch-AWS-SQS-QueueName-keda"},
{&testAWSCloudwatchMetadata[1], 0, "s0-aws-cloudwatch-AWS-SQS-QueueName-keda"},
{&testAWSCloudwatchMetadata[1], 3, "s3-aws-cloudwatch-AWS-SQS-QueueName-keda"},
}

func TestCloudwatchParseMetadata(t *testing.T) {
Expand All @@ -250,7 +252,7 @@ func TestCloudwatchParseMetadata(t *testing.T) {

func TestAWSCloudwatchGetMetricSpecForScaling(t *testing.T) {
for _, testData := range awsCloudwatchMetricIdentifiers {
meta, err := parseAwsCloudwatchMetadata(&ScalerConfig{TriggerMetadata: testData.metadataTestData.metadata, ResolvedEnv: testAWSCloudwatchResolvedEnv, AuthParams: testData.metadataTestData.authParams})
meta, err := parseAwsCloudwatchMetadata(&ScalerConfig{TriggerMetadata: testData.metadataTestData.metadata, ResolvedEnv: testAWSCloudwatchResolvedEnv, AuthParams: testData.metadataTestData.authParams, ScalerIndex: testData.scalerIndex})
if err != nil {
t.Fatal("Could not parse metadata:", err)
}
Expand Down
5 changes: 4 additions & 1 deletion pkg/scalers/aws_kinesis_stream_scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ type awsKinesisStreamMetadata struct {
streamName string
awsRegion string
awsAuthorization awsAuthorizationMetadata
scalerIndex int
}

var kinesisStreamLog = logf.Log.WithName("aws_kinesis_stream_scaler")
Expand Down Expand Up @@ -83,6 +84,8 @@ func parseAwsKinesisStreamMetadata(config *ScalerConfig) (*awsKinesisStreamMetad

meta.awsAuthorization = auth

meta.scalerIndex = config.ScalerIndex

return &meta, nil
}

Expand All @@ -105,7 +108,7 @@ func (s *awsKinesisStreamScaler) GetMetricSpecForScaling() []v2beta2.MetricSpec
targetShardCountQty := resource.NewQuantity(int64(s.metadata.targetShardCount), resource.DecimalSI)
externalMetric := &v2beta2.ExternalMetricSource{
Metric: v2beta2.MetricIdentifier{
Name: kedautil.NormalizeString(fmt.Sprintf("%s-%s", "AWS-Kinesis-Stream", s.metadata.streamName)),
Name: GenerateMetricNameWithIndex(s.metadata.scalerIndex, kedautil.NormalizeString(fmt.Sprintf("%s-%s", "AWS-Kinesis-Stream", s.metadata.streamName))),
},
Target: v2beta2.MetricTarget{
Type: v2beta2.AverageValueMetricType,
Expand Down
91 changes: 58 additions & 33 deletions pkg/scalers/aws_kinesis_stream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,17 @@ var testAWSKinesisAuthentication = map[string]string{
}

type parseAWSKinesisMetadataTestData struct {
metadata map[string]string
expected *awsKinesisStreamMetadata
authParams map[string]string
isError bool
comment string
metadata map[string]string
expected *awsKinesisStreamMetadata
authParams map[string]string
isError bool
comment string
scalerIndex int
}

type awsKinesisMetricIdentifier struct {
metadataTestData *parseAWSKinesisMetadataTestData
scalerIndex int
name string
}

Expand All @@ -53,27 +55,34 @@ var testAWSKinesisMetadata = []parseAWSKinesisMetadataTestData{
awsSecretAccessKey: testAWSKinesisSecretAccessKey,
podIdentityOwner: true,
},
scalerIndex: 0,
},
isError: false,
comment: "properly formed stream name and region"},
isError: false,
comment: "properly formed stream name and region",
scalerIndex: 0,
},
{
metadata: map[string]string{
"streamName": "",
"shardCount": "2",
"awsRegion": testAWSRegion},
authParams: testAWSKinesisAuthentication,
expected: &awsKinesisStreamMetadata{},
isError: true,
comment: "missing stream name"},
authParams: testAWSKinesisAuthentication,
expected: &awsKinesisStreamMetadata{},
isError: true,
comment: "missing stream name",
scalerIndex: 1,
},
{
metadata: map[string]string{
"streamName": testAWSKinesisStreamName,
"shardCount": "2",
"awsRegion": ""},
authParams: testAWSKinesisAuthentication,
expected: &awsKinesisStreamMetadata{},
isError: true,
comment: "properly formed stream name, empty region"},
authParams: testAWSKinesisAuthentication,
expected: &awsKinesisStreamMetadata{},
isError: true,
comment: "properly formed stream name, empty region",
scalerIndex: 2,
},
{
metadata: map[string]string{
"streamName": testAWSKinesisStreamName,
Expand All @@ -89,9 +98,12 @@ var testAWSKinesisMetadata = []parseAWSKinesisMetadataTestData{
awsSecretAccessKey: testAWSKinesisSecretAccessKey,
podIdentityOwner: true,
},
scalerIndex: 3,
},
isError: false,
comment: "properly formed stream name and region, empty shard count"},
isError: false,
comment: "properly formed stream name and region, empty shard count",
scalerIndex: 3,
},
{
metadata: map[string]string{
"streamName": testAWSKinesisStreamName,
Expand All @@ -107,10 +119,12 @@ var testAWSKinesisMetadata = []parseAWSKinesisMetadataTestData{
awsSecretAccessKey: testAWSKinesisSecretAccessKey,
podIdentityOwner: true,
},
scalerIndex: 4,
},
isError: false,
comment: "properly formed stream name and region, wrong shard count"},

isError: false,
comment: "properly formed stream name and region, wrong shard count",
scalerIndex: 4,
},
{
metadata: map[string]string{
"streamName": testAWSKinesisStreamName,
Expand All @@ -120,9 +134,11 @@ var testAWSKinesisMetadata = []parseAWSKinesisMetadataTestData{
"awsAccessKeyID": "",
"awsSecretAccessKey": testAWSKinesisSecretAccessKey,
},
expected: &awsKinesisStreamMetadata{},
isError: true,
comment: "with AWS Credentials from TriggerAuthentication, missing Access Key Id"},
expected: &awsKinesisStreamMetadata{},
isError: true,
comment: "with AWS Credentials from TriggerAuthentication, missing Access Key Id",
scalerIndex: 5,
},
{metadata: map[string]string{
"streamName": testAWSKinesisStreamName,
"shardCount": "2",
Expand All @@ -131,9 +147,11 @@ var testAWSKinesisMetadata = []parseAWSKinesisMetadataTestData{
"awsAccessKeyID": testAWSKinesisAccessKeyID,
"awsSecretAccessKey": "",
},
expected: &awsKinesisStreamMetadata{},
isError: true,
comment: "with AWS Credentials from TriggerAuthentication, missing Secret Access Key"},
expected: &awsKinesisStreamMetadata{},
isError: true,
comment: "with AWS Credentials from TriggerAuthentication, missing Secret Access Key",
scalerIndex: 6,
},
{metadata: map[string]string{
"streamName": testAWSKinesisStreamName,
"shardCount": "2",
Expand All @@ -149,9 +167,12 @@ var testAWSKinesisMetadata = []parseAWSKinesisMetadataTestData{
awsRoleArn: testAWSKinesisRoleArn,
podIdentityOwner: true,
},
scalerIndex: 7,
},
isError: false,
comment: "with AWS Role from TriggerAuthentication"},
isError: false,
comment: "with AWS Role from TriggerAuthentication",
scalerIndex: 7,
},
{metadata: map[string]string{
"streamName": testAWSKinesisStreamName,
"shardCount": "2",
Expand All @@ -165,18 +186,22 @@ var testAWSKinesisMetadata = []parseAWSKinesisMetadataTestData{
awsAuthorization: awsAuthorizationMetadata{
podIdentityOwner: false,
},
scalerIndex: 8,
},
isError: false,
comment: "with AWS Role assigned on KEDA operator itself"},
isError: false,
comment: "with AWS Role assigned on KEDA operator itself",
scalerIndex: 8,
},
}

var awsKinesisMetricIdentifiers = []awsKinesisMetricIdentifier{
{&testAWSKinesisMetadata[1], "AWS-Kinesis-Stream-test"},
{&testAWSKinesisMetadata[1], 0, "s0-AWS-Kinesis-Stream-test"},
{&testAWSKinesisMetadata[1], 1, "s1-AWS-Kinesis-Stream-test"},
}

func TestKinesisParseMetadata(t *testing.T) {
for _, testData := range testAWSKinesisMetadata {
result, err := parseAwsKinesisStreamMetadata(&ScalerConfig{TriggerMetadata: testData.metadata, ResolvedEnv: testAWSKinesisAuthentication, AuthParams: testData.authParams})
result, err := parseAwsKinesisStreamMetadata(&ScalerConfig{TriggerMetadata: testData.metadata, ResolvedEnv: testAWSKinesisAuthentication, AuthParams: testData.authParams, ScalerIndex: testData.scalerIndex})
if err != nil && !testData.isError {
t.Errorf("Expected success because %s got error, %s", testData.comment, err)
}
Expand All @@ -192,7 +217,7 @@ func TestKinesisParseMetadata(t *testing.T) {

func TestAWSKinesisGetMetricSpecForScaling(t *testing.T) {
for _, testData := range awsKinesisMetricIdentifiers {
meta, err := parseAwsKinesisStreamMetadata(&ScalerConfig{TriggerMetadata: testData.metadataTestData.metadata, ResolvedEnv: testAWSKinesisAuthentication, AuthParams: testData.metadataTestData.authParams})
meta, err := parseAwsKinesisStreamMetadata(&ScalerConfig{TriggerMetadata: testData.metadataTestData.metadata, ResolvedEnv: testAWSKinesisAuthentication, AuthParams: testData.metadataTestData.authParams, ScalerIndex: testData.scalerIndex})
if err != nil {
t.Fatal("Could not parse metadata:", err)
}
Expand Down
5 changes: 4 additions & 1 deletion pkg/scalers/aws_sqs_queue_scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ type awsSqsQueueMetadata struct {
queueName string
awsRegion string
awsAuthorization awsAuthorizationMetadata
scalerIndex int
}

// NewAwsSqsQueueScaler creates a new awsSqsQueueScaler
Expand Down Expand Up @@ -105,6 +106,8 @@ func parseAwsSqsQueueMetadata(config *ScalerConfig) (*awsSqsQueueMetadata, error

meta.awsAuthorization = auth

meta.scalerIndex = config.ScalerIndex

return &meta, nil
}

Expand All @@ -127,7 +130,7 @@ func (s *awsSqsQueueScaler) GetMetricSpecForScaling() []v2beta2.MetricSpec {
targetQueueLengthQty := resource.NewQuantity(int64(s.metadata.targetQueueLength), resource.DecimalSI)
externalMetric := &v2beta2.ExternalMetricSource{
Metric: v2beta2.MetricIdentifier{
Name: kedautil.NormalizeString(fmt.Sprintf("%s-%s", "AWS-SQS-Queue", s.metadata.queueName)),
Name: GenerateMetricNameWithIndex(s.metadata.scalerIndex, kedautil.NormalizeString(fmt.Sprintf("%s-%s", "AWS-SQS-Queue", s.metadata.queueName))),
},
Target: v2beta2.MetricTarget{
Type: v2beta2.AverageValueMetricType,
Expand Down
6 changes: 4 additions & 2 deletions pkg/scalers/aws_sqs_queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ type parseAWSSQSMetadataTestData struct {

type awsSQSMetricIdentifier struct {
metadataTestData *parseAWSSQSMetadataTestData
scalerIndex int
name string
}

Expand Down Expand Up @@ -131,7 +132,8 @@ var testAWSSQSMetadata = []parseAWSSQSMetadataTestData{
}

var awsSQSMetricIdentifiers = []awsSQSMetricIdentifier{
{&testAWSSQSMetadata[1], "AWS-SQS-Queue-DeleteArtifactQ"},
{&testAWSSQSMetadata[1], 0, "s0-AWS-SQS-Queue-DeleteArtifactQ"},
{&testAWSSQSMetadata[1], 1, "s1-AWS-SQS-Queue-DeleteArtifactQ"},
}

func TestSQSParseMetadata(t *testing.T) {
Expand All @@ -148,7 +150,7 @@ func TestSQSParseMetadata(t *testing.T) {

func TestAWSSQSGetMetricSpecForScaling(t *testing.T) {
for _, testData := range awsSQSMetricIdentifiers {
meta, err := parseAwsSqsQueueMetadata(&ScalerConfig{TriggerMetadata: testData.metadataTestData.metadata, ResolvedEnv: testAWSSQSAuthentication, AuthParams: testData.metadataTestData.authParams})
meta, err := parseAwsSqsQueueMetadata(&ScalerConfig{TriggerMetadata: testData.metadataTestData.metadata, ResolvedEnv: testAWSSQSAuthentication, AuthParams: testData.metadataTestData.authParams, ScalerIndex: testData.scalerIndex})
if err != nil {
t.Fatal("Could not parse metadata:", err)
}
Expand Down
5 changes: 4 additions & 1 deletion pkg/scalers/azure_blob_scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ type azureBlobMetadata struct {
accountName string
metricName string
endpointSuffix string
scalerIndex int
}

var azureBlobLog = logf.Log.WithName("azure_blob_scaler")
Expand Down Expand Up @@ -152,6 +153,8 @@ func parseAzureBlobMetadata(config *ScalerConfig) (*azureBlobMetadata, kedav1alp
return nil, "", fmt.Errorf("pod identity %s not supported for azure storage blobs", config.PodIdentity)
}

meta.scalerIndex = config.ScalerIndex

return &meta, config.PodIdentity, nil
}

Expand Down Expand Up @@ -185,7 +188,7 @@ func (s *azureBlobScaler) GetMetricSpecForScaling() []v2beta2.MetricSpec {
targetBlobCount := resource.NewQuantity(int64(s.metadata.targetBlobCount), resource.DecimalSI)
externalMetric := &v2beta2.ExternalMetricSource{
Metric: v2beta2.MetricIdentifier{
Name: s.metadata.metricName,
Name: GenerateMetricNameWithIndex(s.metadata.scalerIndex, s.metadata.metricName),
},
Target: v2beta2.MetricTarget{
Type: v2beta2.AverageValueMetricType,
Expand Down
Loading

0 comments on commit 6d697e4

Please sign in to comment.