Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support non-public cloud environments in the Azure Service Bus scaler #1907

Merged
merged 1 commit into from
Aug 5, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
### New

- Extend Azure Monitor scaler to support custom metrics ([#1883](https://github.com/kedacore/keda/pull/1883))
- Support non-public cloud environments in the Azure Service Bus scaler ([#1907](https://github.com/kedacore/keda/pull/1907))
- Support non-public cloud environments in the Azure Storage Queue and Azure Storage Blob scalers ([#1863](https://github.com/kedacore/keda/pull/1863))
- Show HashiCorp Vault Address when using `kubectl get ta` or `kubectl get cta` ([#1862](https://github.com/kedacore/keda/pull/1862))
- Add Solace PubSub+ Event Broker Scaler ([#1945](https://github.com/kedacore/keda/pull/1945))
Expand Down
33 changes: 33 additions & 0 deletions pkg/scalers/azure/azure_cloud_environment.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package azure

import (
"fmt"
"strings"

az "github.com/Azure/go-autorest/autorest/azure"
)

// EnvironmentSuffixProvider for different types of Azure scalers
type EnvironmentSuffixProvider func(env az.Environment) (string, error)

// ParseEndpointSuffix parses cloud and endpointSuffix metadata and returns the resolved endpoint suffix
func ParseEndpointSuffix(metadata map[string]string, suffixProvider EnvironmentSuffixProvider) (string, error) {
if val, ok := metadata["cloud"]; ok && val != "" {
if strings.EqualFold(val, PrivateCloud) {
if val, ok := metadata["endpointSuffix"]; ok && val != "" {
return val, nil
}
return "", fmt.Errorf("endpointSuffix must be provided for %s cloud type", PrivateCloud)
}

env, err := az.EnvironmentFromName(val)
if err != nil {
return "", fmt.Errorf("invalid cloud environment %s", val)
}

return suffixProvider(env)
}

// Use public cloud suffix if `cloud` isn't specified
return suffixProvider(az.PublicCloud)
}
52 changes: 52 additions & 0 deletions pkg/scalers/azure/azure_cloud_environment_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
package azure

import (
"fmt"
"testing"

az "github.com/Azure/go-autorest/autorest/azure"
)

type parseEndpointSuffixTestData struct {
metadata map[string]string
endpointSuffix string
suffixProvider EnvironmentSuffixProvider
isError bool
}

var testSuffixProvider EnvironmentSuffixProvider = func(env az.Environment) (string, error) {
if env == az.USGovernmentCloud {
return "", fmt.Errorf("test endpoint is not available in %s", env.Name)
}
return fmt.Sprintf("%s.suffix", env.Name), nil
}

var parseEndpointSuffixTestDataset = []parseEndpointSuffixTestData{
{map[string]string{}, "AzurePublicCloud.suffix", testSuffixProvider, false},
{map[string]string{"cloud": "Invalid"}, "", testSuffixProvider, true},
{map[string]string{"cloud": "AzureUSGovernmentCloud"}, "", testSuffixProvider, true},
{map[string]string{"cloud": "AzureGermanCloud"}, "AzureGermanCloud.suffix", testSuffixProvider, false},
{map[string]string{"cloud": "Private"}, "", testSuffixProvider, true},
{map[string]string{"cloud": "Private", "endpointSuffix": "suffix.private.cloud"}, "suffix.private.cloud", testSuffixProvider, false},
{map[string]string{"endpointSuffix": "ignored"}, "AzurePublicCloud.suffix", testSuffixProvider, false},
}

func TestParseEndpointSuffix(t *testing.T) {
for _, testData := range parseEndpointSuffixTestDataset {
endpointSuffix, err := ParseEndpointSuffix(testData.metadata, testData.suffixProvider)
if !testData.isError && err != nil {
t.Error("Expected success but got error", err)
}
if testData.isError && err == nil {
t.Error("Expected error but got success")
}
if err == nil {
if endpointSuffix != testData.endpointSuffix {
t.Error(
"For", testData.metadata,
"expected endpointSuffix=", testData.endpointSuffix,
"but got", endpointSuffix)
}
}
}
}
16 changes: 2 additions & 14 deletions pkg/scalers/azure/azure_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,23 +53,11 @@ func (e StorageEndpointType) GetEndpointSuffix(environment az.Environment) strin

// ParseAzureStorageEndpointSuffix parses cloud and endpointSuffix metadata and returns endpoint suffix
func ParseAzureStorageEndpointSuffix(metadata map[string]string, endpointType StorageEndpointType) (string, error) {
if val, ok := metadata["cloud"]; ok && val != "" {
if strings.EqualFold(val, PrivateCloud) {
if val, ok := metadata["endpointSuffix"]; ok && val != "" {
return val, nil
}
return "", fmt.Errorf("endpointSuffix must be provided for %s cloud type", PrivateCloud)
}

env, err := az.EnvironmentFromName(val)
if err != nil {
return "", fmt.Errorf("invalid cloud environment %s", val)
}
envSuffixProvider := func(env az.Environment) (string, error) {
return endpointType.GetEndpointSuffix(env), nil
}

// Use the default public cloud endpoint suffix if `cloud` isn't specified
return endpointType.GetEndpointSuffix(az.PublicCloud), nil
return ParseEndpointSuffix(metadata, envSuffixProvider)
}

// ParseAzureStorageQueueConnection parses queue connection string and returns credential and resource url
Expand Down
6 changes: 3 additions & 3 deletions pkg/scalers/azure/azure_storage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,14 +61,14 @@ func TestParseStorageConnectionString(t *testing.T) {
}
}

type parseEndpointSuffixTestData struct {
type parseAzureStorageEndpointSuffixTestData struct {
metadata map[string]string
endpointSuffix string
endpointType StorageEndpointType
isError bool
}

var parseEndpointSuffixTestDataset = []parseEndpointSuffixTestData{
var parseAzureStorageEndpointSuffixTestDataset = []parseAzureStorageEndpointSuffixTestData{
{map[string]string{}, "queue.core.windows.net", QueueEndpoint, false},
{map[string]string{"cloud": "InvalidCloud"}, "", QueueEndpoint, true},
{map[string]string{"cloud": "AzureUSGovernmentCloud"}, "queue.core.usgovcloudapi.net", QueueEndpoint, false},
Expand All @@ -78,7 +78,7 @@ var parseEndpointSuffixTestDataset = []parseEndpointSuffixTestData{
}

func TestParseAzureStorageEndpointSuffix(t *testing.T) {
for _, testData := range parseEndpointSuffixTestDataset {
for _, testData := range parseAzureStorageEndpointSuffixTestDataset {
endpointSuffix, err := ParseAzureStorageEndpointSuffix(testData.metadata, testData.endpointType)
if !testData.isError && err != nil {
t.Error("Expected success but got error", err)
Expand Down
16 changes: 15 additions & 1 deletion pkg/scalers/azure_servicebus_scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (

"github.com/Azure/azure-amqp-common-go/v3/auth"
servicebus "github.com/Azure/azure-service-bus-go"
az "github.com/Azure/go-autorest/autorest/azure"
"github.com/kedacore/keda/v2/pkg/scalers/azure"
v2beta2 "k8s.io/api/autoscaling/v2beta2"
"k8s.io/apimachinery/pkg/api/resource"
Expand Down Expand Up @@ -46,6 +47,7 @@ type azureServiceBusMetadata struct {
connection string
entityType entityType
namespace string
endpointSuffix string
}

// NewAzureServiceBusScaler creates a new AzureServiceBusScaler
Expand Down Expand Up @@ -102,6 +104,16 @@ func parseAzureServiceBusMetadata(config *ScalerConfig) (*azureServiceBusMetadat
}
}

envSuffixProvider := func(env az.Environment) (string, error) {
return env.ServiceBusEndpointSuffix, nil
}

endpointSuffix, err := azure.ParseEndpointSuffix(config.TriggerMetadata, envSuffixProvider)
if err != nil {
return nil, err
}
meta.endpointSuffix = endpointSuffix

if meta.entityType == none {
return nil, fmt.Errorf("no service bus entity type set")
}
Expand Down Expand Up @@ -198,7 +210,8 @@ type azureTokenProvider struct {

// GetToken implements TokenProvider interface for azureTokenProvider
func (a azureTokenProvider) GetToken(uri string) (*auth.Token, error) {
token, err := azure.GetAzureADPodIdentityToken(a.httpClient, "https://servicebus.azure.net")
// Service bus resource id is "https://servicebus.azure.net/" in all cloud environments
token, err := azure.GetAzureADPodIdentityToken(a.httpClient, "https://servicebus.azure.net/")
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -249,6 +262,7 @@ func (s *azureServiceBusScaler) getServiceBusNamespace() (*servicebus.Namespace,
namespace.Name = s.metadata.namespace
}

namespace.Suffix = s.metadata.endpointSuffix
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is all it takes for the servicebus client to work in other cloud environments, hope I didn't miss anything.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@ahmelsayed Can you confirm this?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sorry for the delay, I've been out for a while and just got back.

There is also namespace.Environment that I see in the API. I don't have an easy way to validate it, but I'll try to get an environment where I can.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

From what I could tell, namespace.Environment is only used for getting the service bus resource ID when calling NamespaceWithEnvironmentBinding to configure a namespace using environment details. Since we either create the namespace with connection string or with pod identity, I think namespace.Environment isn't required. That also makes sense to me because as far as I know the service bus client should also support cloud environments that are not the publicly known clouds, like air gapped clouds.

@ahmelsayed I might be able to test it in one of our clusters in US Gov if that helps.

return namespace, nil
}

Expand Down
57 changes: 37 additions & 20 deletions pkg/scalers/azure_servicebus_scaler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,16 @@ const (
connectionSetting = "none"
namespaceName = "ns"
messageCount = "1000"
defaultSuffix = "servicebus.windows.net"
)

type parseServiceBusMetadataTestData struct {
metadata map[string]string
isError bool
entityType entityType
authParams map[string]string
podIdentity kedav1alpha1.PodIdentityProvider
metadata map[string]string
isError bool
entityType entityType
endpointSuffix string
authParams map[string]string
podIdentity kedav1alpha1.PodIdentityProvider
}

type azServiceBusMetricIdentifier struct {
Expand All @@ -43,31 +45,41 @@ var connectionResolvedEnv = map[string]string{
}

var parseServiceBusMetadataDataset = []parseServiceBusMetadataTestData{
{map[string]string{}, true, none, map[string]string{}, ""},
{map[string]string{}, true, none, "", map[string]string{}, ""},
// properly formed queue
{map[string]string{"queueName": queueName, "connectionFromEnv": connectionSetting}, false, queue, map[string]string{}, ""},
{map[string]string{"queueName": queueName, "connectionFromEnv": connectionSetting}, false, queue, defaultSuffix, map[string]string{}, ""},
// properly formed queue with message count
{map[string]string{"queueName": queueName, "connectionFromEnv": connectionSetting, "messageCount": messageCount}, false, queue, map[string]string{}, ""},
{map[string]string{"queueName": queueName, "connectionFromEnv": connectionSetting, "messageCount": messageCount}, false, queue, defaultSuffix, map[string]string{}, ""},
// properly formed topic & subscription
{map[string]string{"topicName": topicName, "subscriptionName": subscriptionName, "connectionFromEnv": connectionSetting}, false, subscription, map[string]string{}, ""},
{map[string]string{"topicName": topicName, "subscriptionName": subscriptionName, "connectionFromEnv": connectionSetting}, false, subscription, defaultSuffix, map[string]string{}, ""},
// properly formed topic & subscription with message count
{map[string]string{"topicName": topicName, "subscriptionName": subscriptionName, "connectionFromEnv": connectionSetting, "messageCount": messageCount}, false, subscription, map[string]string{}, ""},
{map[string]string{"topicName": topicName, "subscriptionName": subscriptionName, "connectionFromEnv": connectionSetting, "messageCount": messageCount}, false, subscription, defaultSuffix, map[string]string{}, ""},
// queue and topic specified
{map[string]string{"queueName": queueName, "topicName": topicName, "connectionFromEnv": connectionSetting}, true, none, map[string]string{}, ""},
{map[string]string{"queueName": queueName, "topicName": topicName, "connectionFromEnv": connectionSetting}, true, none, "", map[string]string{}, ""},
// queue and subscription specified
{map[string]string{"queueName": queueName, "subscriptionName": subscriptionName, "connectionFromEnv": connectionSetting}, true, none, map[string]string{}, ""},
{map[string]string{"queueName": queueName, "subscriptionName": subscriptionName, "connectionFromEnv": connectionSetting}, true, none, "", map[string]string{}, ""},
// topic but no subscription specified
{map[string]string{"topicName": topicName, "connectionFromEnv": connectionSetting}, true, none, map[string]string{}, ""},
{map[string]string{"topicName": topicName, "connectionFromEnv": connectionSetting}, true, none, "", map[string]string{}, ""},
// subscription but no topic specified
{map[string]string{"subscriptionName": subscriptionName, "connectionFromEnv": connectionSetting}, true, none, map[string]string{}, ""},
{map[string]string{"subscriptionName": subscriptionName, "connectionFromEnv": connectionSetting}, true, none, "", map[string]string{}, ""},
// valid cloud
{map[string]string{"queueName": queueName, "connectionFromEnv": connectionSetting, "cloud": "AzureChinaCloud"}, false, queue, "servicebus.chinacloudapi.cn", map[string]string{}, ""},
// invalid cloud
{map[string]string{"queueName": queueName, "connectionFromEnv": connectionSetting, "cloud": "InvalidCloud"}, true, none, "", map[string]string{}, ""},
// private cloud with endpoint suffix
{map[string]string{"queueName": queueName, "connectionFromEnv": connectionSetting, "cloud": "Private", "endpointSuffix": "servicebus.private.cloud"}, false, queue, "servicebus.private.cloud", map[string]string{}, ""},
// private cloud without endpoint suffix
{map[string]string{"queueName": queueName, "connectionFromEnv": connectionSetting, "cloud": "Private"}, true, none, "", map[string]string{}, ""},
// endpoint suffix without cloud
{map[string]string{"queueName": queueName, "connectionFromEnv": connectionSetting, "endpointSuffix": "ignored"}, false, queue, defaultSuffix, map[string]string{}, ""},
// connection not set
{map[string]string{"queueName": queueName}, true, queue, map[string]string{}, ""},
{map[string]string{"queueName": queueName}, true, queue, "", map[string]string{}, ""},
// connection set in auth params
{map[string]string{"queueName": queueName}, false, queue, map[string]string{"connection": connectionSetting}, ""},
{map[string]string{"queueName": queueName}, false, queue, defaultSuffix, map[string]string{"connection": connectionSetting}, ""},
// pod identity but missing namespace
{map[string]string{"queueName": queueName}, true, queue, map[string]string{}, kedav1alpha1.PodIdentityProviderAzure},
{map[string]string{"queueName": queueName}, true, queue, "", map[string]string{}, kedav1alpha1.PodIdentityProviderAzure},
// correct pod identity
{map[string]string{"queueName": queueName, "namespace": namespaceName}, false, queue, map[string]string{}, kedav1alpha1.PodIdentityProviderAzure},
{map[string]string{"queueName": queueName, "namespace": namespaceName}, false, queue, defaultSuffix, map[string]string{}, kedav1alpha1.PodIdentityProviderAzure},
}

var azServiceBusMetricIdentifiers = []azServiceBusMetricIdentifier{
Expand Down Expand Up @@ -116,8 +128,13 @@ func TestParseServiceBusMetadata(t *testing.T) {
if testData.isError && err == nil {
t.Error("Expected error but got success")
}
if meta != nil && meta.entityType != testData.entityType {
t.Errorf("Expected entity type %v but got %v\n", testData.entityType, meta.entityType)
if meta != nil {
if meta.entityType != testData.entityType {
t.Errorf("Expected entity type %v but got %v\n", testData.entityType, meta.entityType)
}
if meta.endpointSuffix != testData.endpointSuffix {
t.Errorf("Expected endpoint suffix %v but got %v\n", testData.endpointSuffix, meta.endpointSuffix)
}
}
}
}
Expand Down