Skip to content

Commit

Permalink
feat(event-hubs-scaler): Add pod/workload identity support to checkpo…
Browse files Browse the repository at this point in the history
…int in Azure Blob Storage (#3573)

Co-authored-by: Vighnesh Shenoy <[email protected]>
  • Loading branch information
andyatwork and v-shenoy authored Oct 31, 2022
1 parent 1d6d0f1 commit 92efd86
Show file tree
Hide file tree
Showing 8 changed files with 106 additions and 26 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ To learn more about active deprecations, we recommend checking [GitHub Discussio
- **Azure AD Pod Identity Authentication:** Improve error messages to emphasize problems around the integration with aad-pod-identity itself ([#3610](https://github.com/kedacore/keda/issues/3610))
- **Azure Pipelines Scaler:** Improved speed of profiling large set of Job Requests from Azure Pipelines ([#3702](https://github.com/kedacore/keda/issues/3702))
- **Prometheus Scaler:** Introduce skipping of certificate check for unsigned certs ([#2310](https://github.com/kedacore/keda/issues/2310))
- **Event Hubs Scaler:** Support Azure Active Directory Pod & Workload Identity for Storage Blobs ([#3569](https://github.com/kedacore/keda/issues/3569))

### Fixes

Expand Down
3 changes: 3 additions & 0 deletions pkg/scalers/azure/azure_cloud_environment.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@ const (

// Default Endpoint key in trigger metadata
DefaultEndpointSuffixKey string = "endpointSuffix"

// Default Storage Endpoint key in trigger metadata
DefaultStorageSuffixKey string = "storageEndpointSuffix"
)

// EnvironmentPropertyProvider for different types of Azure scalers
Expand Down
2 changes: 2 additions & 0 deletions pkg/scalers/azure/azure_cloud_environment_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ var parseEnvironmentPropertyTestDataset = []parseEnvironmentPropertyTestData{
{map[string]string{"cloud": "Private", "endpointSuffix": "suffix.private.cloud"}, "suffix.private.cloud", DefaultEndpointSuffixKey, testPropertyProvider, false},
{map[string]string{"endpointSuffix": "ignored"}, "AzurePublicCloud.suffix", DefaultEndpointSuffixKey, testPropertyProvider, false},
{map[string]string{"cloud": "Private", "endpointSuffixDiff": "suffix.private.cloud"}, "suffix.private.cloud", "endpointSuffixDiff", testPropertyProvider, false},
{map[string]string{"cloud": "Private", "storageEndpointSuffix": "suffix.private.cloud"}, "suffix.private.cloud", DefaultStorageSuffixKey, testPropertyProvider, false},
{map[string]string{"cloud": "Private"}, "suffix.private.cloud", DefaultStorageSuffixKey, testPropertyProvider, true},
}

func TestParseEnvironmentProperty(t *testing.T) {
Expand Down
2 changes: 2 additions & 0 deletions pkg/scalers/azure/azure_eventhub.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ type EventHubInfo struct {
EventHubConnection string
EventHubConsumerGroup string
StorageConnection string
StorageAccountName string
BlobStorageEndpoint string
BlobContainer string
Namespace string
EventHubName string
Expand Down
16 changes: 15 additions & 1 deletion pkg/scalers/azure/azure_eventhub_checkpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,8 +224,22 @@ func (checkpointer *defaultCheckpointer) extractCheckpoint(get *azblob.DownloadR
}

func getCheckpoint(ctx context.Context, httpClient util.HTTPDoer, info EventHubInfo, checkpointer checkpointer) (Checkpoint, error) {
var podIdentity = info.PodIdentity

// For back-compat, prefer a connection string over pod identity when present
if len(info.StorageConnection) != 0 {
podIdentity.Provider = kedav1alpha1.PodIdentityProviderNone
}

if podIdentity.Provider == kedav1alpha1.PodIdentityProviderAzure || podIdentity.Provider == kedav1alpha1.PodIdentityProviderAzureWorkload {
if len(info.StorageAccountName) == 0 {
return Checkpoint{}, fmt.Errorf("storageAccountName not supplied when PodIdentity authentication is enabled")
}
}

blobCreds, storageEndpoint, err := ParseAzureStorageBlobConnection(ctx, httpClient,
kedav1alpha1.AuthPodIdentity{Provider: kedav1alpha1.PodIdentityProviderNone}, info.StorageConnection, "", "")
podIdentity, info.StorageConnection, info.StorageAccountName, info.BlobStorageEndpoint)

if err != nil {
return Checkpoint{}, err
}
Expand Down
6 changes: 3 additions & 3 deletions pkg/scalers/azure/azure_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ func ParseAzureStorageEndpointSuffix(metadata map[string]string, endpointType St
func ParseAzureStorageQueueConnection(ctx context.Context, httpClient util.HTTPDoer, podIdentity kedav1alpha1.AuthPodIdentity, connectionString, accountName, endpointSuffix string) (azqueue.Credential, *url.URL, error) {
switch podIdentity.Provider {
case kedav1alpha1.PodIdentityProviderAzure, kedav1alpha1.PodIdentityProviderAzureWorkload:
token, endpoint, err := parseAcessTokenAndEndpoint(ctx, httpClient, accountName, endpointSuffix, podIdentity)
token, endpoint, err := parseAccessTokenAndEndpoint(ctx, httpClient, accountName, endpointSuffix, podIdentity)
if err != nil {
return nil, nil, err
}
Expand Down Expand Up @@ -111,7 +111,7 @@ func ParseAzureStorageQueueConnection(ctx context.Context, httpClient util.HTTPD
func ParseAzureStorageBlobConnection(ctx context.Context, httpClient util.HTTPDoer, podIdentity kedav1alpha1.AuthPodIdentity, connectionString, accountName, endpointSuffix string) (azblob.Credential, *url.URL, error) {
switch podIdentity.Provider {
case kedav1alpha1.PodIdentityProviderAzure, kedav1alpha1.PodIdentityProviderAzureWorkload:
token, endpoint, err := parseAcessTokenAndEndpoint(ctx, httpClient, accountName, endpointSuffix, podIdentity)
token, endpoint, err := parseAccessTokenAndEndpoint(ctx, httpClient, accountName, endpointSuffix, podIdentity)
if err != nil {
return nil, nil, err
}
Expand Down Expand Up @@ -192,7 +192,7 @@ func parseAzureStorageConnectionString(connectionString string, endpointType Sto
return u, name, key, nil
}

func parseAcessTokenAndEndpoint(ctx context.Context, httpClient util.HTTPDoer, accountName string, endpointSuffix string,
func parseAccessTokenAndEndpoint(ctx context.Context, httpClient util.HTTPDoer, accountName string, endpointSuffix string,
podIdentity kedav1alpha1.AuthPodIdentity) (string, *url.URL, error) {
var token AADToken
var err error
Expand Down
80 changes: 62 additions & 18 deletions pkg/scalers/azure_eventhub_scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,9 @@ func NewAzureEventHubScaler(ctx context.Context, config *ScalerConfig) (Scaler,
return nil, fmt.Errorf("error getting scaler metric type: %s", err)
}

parsedMetadata, err := parseAzureEventHubMetadata(config)
logger := InitializeLogger(config, "azure_eventhub_scaler")

parsedMetadata, err := parseAzureEventHubMetadata(logger, config)
if err != nil {
return nil, fmt.Errorf("unable to get eventhub metadata: %s", err)
}
Expand All @@ -85,21 +87,36 @@ func NewAzureEventHubScaler(ctx context.Context, config *ScalerConfig) (Scaler,
metadata: parsedMetadata,
client: hub,
httpClient: kedautil.CreateHTTPClient(config.GlobalHTTPTimeout, false),
logger: InitializeLogger(config, "azure_eventhub_scaler"),
logger: logger,
}, nil
}

// parseAzureEventHubMetadata parses metadata
func parseAzureEventHubMetadata(config *ScalerConfig) (*eventHubMetadata, error) {
func parseAzureEventHubMetadata(logger logr.Logger, config *ScalerConfig) (*eventHubMetadata, error) {
meta := eventHubMetadata{
eventHubInfo: azure.EventHubInfo{},
}

err := parseCommonAzureEventHubMetadata(config, &meta)
if err != nil {
return nil, err
}

err = parseAzureEventHubAuthenticationMetadata(logger, config, &meta)
if err != nil {
return nil, err
}

return &meta, nil
}

func parseCommonAzureEventHubMetadata(config *ScalerConfig, meta *eventHubMetadata) error {
meta.threshold = defaultEventHubMessageThreshold

if val, ok := config.TriggerMetadata[thresholdMetricName]; ok {
threshold, err := strconv.ParseInt(val, 10, 64)
if err != nil {
return nil, fmt.Errorf("error parsing azure eventhub metadata %s: %s", thresholdMetricName, err)
return fmt.Errorf("error parsing azure eventhub metadata %s: %s", thresholdMetricName, err)
}

meta.threshold = threshold
Expand All @@ -109,7 +126,7 @@ func parseAzureEventHubMetadata(config *ScalerConfig) (*eventHubMetadata, error)
if val, ok := config.TriggerMetadata[activationThresholdMetricName]; ok {
activationThreshold, err := strconv.ParseInt(val, 10, 64)
if err != nil {
return nil, fmt.Errorf("error parsing azure eventhub metadata %s: %s", activationThresholdMetricName, err)
return fmt.Errorf("error parsing azure eventhub metadata %s: %s", activationThresholdMetricName, err)
}

meta.activationThreshold = activationThreshold
Expand All @@ -121,10 +138,6 @@ func parseAzureEventHubMetadata(config *ScalerConfig) (*eventHubMetadata, error)
meta.eventHubInfo.StorageConnection = config.ResolvedEnv[config.TriggerMetadata["storageConnectionFromEnv"]]
}

if len(meta.eventHubInfo.StorageConnection) == 0 {
return nil, fmt.Errorf("no storage connection string given")
}

meta.eventHubInfo.EventHubConsumerGroup = defaultEventHubConsumerGroup
if val, ok := config.TriggerMetadata["consumerGroup"]; ok {
meta.eventHubInfo.EventHubConsumerGroup = val
Expand All @@ -146,7 +159,7 @@ func parseAzureEventHubMetadata(config *ScalerConfig) (*eventHubMetadata, error)
if resourceURL, ok := config.TriggerMetadata["eventHubResourceURL"]; ok {
meta.eventHubInfo.EventHubResourceURL = resourceURL
} else {
return nil, fmt.Errorf("eventHubResourceURL must be provided for %s cloud type", azure.PrivateCloud)
return fmt.Errorf("eventHubResourceURL must be provided for %s cloud type", azure.PrivateCloud)
}
}
}
Expand All @@ -156,37 +169,70 @@ func parseAzureEventHubMetadata(config *ScalerConfig) (*eventHubMetadata, error)
}
serviceBusEndpointSuffix, err := azure.ParseEnvironmentProperty(config.TriggerMetadata, azure.DefaultEndpointSuffixKey, serviceBusEndpointSuffixProvider)
if err != nil {
return nil, err
return err
}
meta.eventHubInfo.ServiceBusEndpointSuffix = serviceBusEndpointSuffix

activeDirectoryEndpoint, err := azure.ParseActiveDirectoryEndpoint(config.TriggerMetadata)
if err != nil {
return nil, err
return err
}
meta.eventHubInfo.ActiveDirectoryEndpoint = activeDirectoryEndpoint

meta.scalerIndex = config.ScalerIndex

return nil
}

func parseAzureEventHubAuthenticationMetadata(logger logr.Logger, config *ScalerConfig, meta *eventHubMetadata) error {
meta.eventHubInfo.PodIdentity = config.PodIdentity

switch config.PodIdentity.Provider {
case "", v1alpha1.PodIdentityProviderNone:
if len(meta.eventHubInfo.StorageConnection) == 0 {
return fmt.Errorf("no storage connection string given")
}

if config.AuthParams["connection"] != "" {
meta.eventHubInfo.EventHubConnection = config.AuthParams["connection"]
} else if config.TriggerMetadata["connectionFromEnv"] != "" {
meta.eventHubInfo.EventHubConnection = config.ResolvedEnv[config.TriggerMetadata["connectionFromEnv"]]
}

if len(meta.eventHubInfo.EventHubConnection) == 0 {
return nil, fmt.Errorf("no event hub connection string given")
return fmt.Errorf("no event hub connection string given")
}
case v1alpha1.PodIdentityProviderAzure, v1alpha1.PodIdentityProviderAzureWorkload:
meta.eventHubInfo.StorageAccountName = ""
if val, ok := config.TriggerMetadata["storageAccountName"]; ok {
meta.eventHubInfo.StorageAccountName = val
} else {
logger.Info("no 'storageAccountName' provided to enable identity based authentication to Blob Storage. Attempting to use connection string instead")
}

if len(meta.eventHubInfo.StorageAccountName) != 0 {
storageEndpointSuffixProvider := func(env az.Environment) (string, error) {
return env.StorageEndpointSuffix, nil
}
storageEndpointSuffix, err := azure.ParseEnvironmentProperty(config.TriggerMetadata, azure.DefaultStorageSuffixKey, storageEndpointSuffixProvider)
if err != nil {
return err
}
meta.eventHubInfo.BlobStorageEndpoint = "blob." + storageEndpointSuffix
}

if len(meta.eventHubInfo.StorageConnection) == 0 && len(meta.eventHubInfo.StorageAccountName) == 0 {
return fmt.Errorf("no storage connection string or storage account name for pod identity based authentication given")
}

if config.TriggerMetadata["eventHubNamespace"] != "" {
meta.eventHubInfo.Namespace = config.TriggerMetadata["eventHubNamespace"]
} else if config.TriggerMetadata["eventHubNamespaceFromEnv"] != "" {
meta.eventHubInfo.Namespace = config.ResolvedEnv[config.TriggerMetadata["eventHubNamespaceFromEnv"]]
}

if len(meta.eventHubInfo.Namespace) == 0 {
return nil, fmt.Errorf("no event hub namespace string given")
return fmt.Errorf("no event hub namespace string given")
}

if config.TriggerMetadata["eventHubName"] != "" {
Expand All @@ -196,13 +242,11 @@ func parseAzureEventHubMetadata(config *ScalerConfig) (*eventHubMetadata, error)
}

if len(meta.eventHubInfo.EventHubName) == 0 {
return nil, fmt.Errorf("no event hub name string given")
return fmt.Errorf("no event hub name string given")
}
}

meta.scalerIndex = config.ScalerIndex

return &meta, nil
return nil
}

// GetUnprocessedEventCountInPartition gets number of unprocessed events in a given partition
Expand Down
22 changes: 18 additions & 4 deletions pkg/scalers/azure_eventhub_scaler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (

eventhub "github.com/Azure/azure-event-hubs-go/v3"
"github.com/Azure/azure-storage-blob-go/azblob"
"github.com/go-logr/logr"

kedav1alpha1 "github.com/kedacore/keda/v2/apis/keda/v1alpha1"
"github.com/kedacore/keda/v2/pkg/scalers/azure"
Expand All @@ -20,6 +21,7 @@ const (
eventHubConnectionSetting = "testEventHubConnectionSetting"
storageConnectionSetting = "testStorageConnectionSetting"
serviceBusEndpointSuffix = "serviceBusEndpointSuffix"
storageEndpointSuffix = "storageEndpointSuffix"
activeDirectoryEndpoint = "activeDirectoryEndpoint"
eventHubResourceURL = "eventHubResourceURL"
testEventHubNamespace = "kedatesteventhub"
Expand Down Expand Up @@ -87,6 +89,18 @@ var parseEventHubMetadataDatasetWithPodIdentity = []parseEventHubMetadataTestDat
// properly formed metadata with private cloud
{map[string]string{"storageConnectionFromEnv": storageConnectionSetting, "consumerGroup": eventHubConsumerGroup, "unprocessedEventThreshold": "15", "eventHubName": testEventHubName,
"eventHubNamespace": testEventHubNamespace, "cloud": "private", "endpointSuffix": serviceBusEndpointSuffix, "activeDirectoryEndpoint": activeDirectoryEndpoint, "eventHubResourceURL": eventHubResourceURL}, false},
// properly formed event hub metadata with Pod Identity and no storage connection string
{map[string]string{"storageAccountName": "blobstorage", "consumerGroup": eventHubConsumerGroup, "unprocessedEventThreshold": "15", "eventHubName": testEventHubName, "eventHubNamespace": testEventHubNamespace}, false},
// event hub metadata with Pod Identity, no storage connection string, no storageAccountName - should fail
{map[string]string{"consumerGroup": eventHubConsumerGroup, "unprocessedEventThreshold": "15", "eventHubName": testEventHubName, "eventHubNamespace": testEventHubNamespace}, true},
// event hub metadata with Pod Identity, no storage connection string, empty storageAccountName - should fail
{map[string]string{"storageAccount": "", "consumerGroup": eventHubConsumerGroup, "unprocessedEventThreshold": "15", "eventHubName": testEventHubName, "eventHubNamespace": testEventHubNamespace}, true},
// event hub metadata with Pod Identity, storage connection string, empty storageAccountName - should ignore pod identity for blob storage and succeed
{map[string]string{"storageConnectionFromEnv": storageConnectionSetting, "storageAccountName": "", "consumerGroup": eventHubConsumerGroup, "unprocessedEventThreshold": "15", "eventHubName": testEventHubName, "eventHubNamespace": testEventHubNamespace}, false},
// event hub metadata with Pod Identity and no storage connection string, private cloud and no storageEndpointSuffix - should fail
{map[string]string{"cloud": "private", "storageAccountName": "blobstorage", "consumerGroup": eventHubConsumerGroup, "unprocessedEventThreshold": "15", "eventHubName": testEventHubName, "eventHubNamespace": testEventHubNamespace}, true},
// properly formed event hub metadata with Pod Identity and no storage connection string, private cloud and storageEndpointSuffix
{map[string]string{"cloud": "private", "endpointSuffix": serviceBusEndpointSuffix, "activeDirectoryEndpoint": activeDirectoryEndpoint, "eventHubResourceURL": eventHubResourceURL, "storageAccountName": "aStorageAccount", "storageEndpointSuffix": storageEndpointSuffix, "consumerGroup": eventHubConsumerGroup, "unprocessedEventThreshold": "15", "eventHubName": testEventHubName, "eventHubNamespace": testEventHubNamespace}, false},
}

var eventHubMetricIdentifiers = []eventHubMetricIdentifier{
Expand All @@ -106,7 +120,7 @@ var testEventHubScaler = azureEventHubScaler{
func TestParseEventHubMetadata(t *testing.T) {
// Test first with valid resolved environment
for _, testData := range parseEventHubMetadataDataset {
_, err := parseAzureEventHubMetadata(&ScalerConfig{TriggerMetadata: testData.metadata, ResolvedEnv: sampleEventHubResolvedEnv, AuthParams: map[string]string{}})
_, err := parseAzureEventHubMetadata(logr.Discard(), &ScalerConfig{TriggerMetadata: testData.metadata, ResolvedEnv: sampleEventHubResolvedEnv, AuthParams: map[string]string{}})

if err != nil && !testData.isError {
t.Errorf("Expected success but got error: %s", err)
Expand All @@ -117,7 +131,7 @@ func TestParseEventHubMetadata(t *testing.T) {
}

for _, testData := range parseEventHubMetadataDatasetWithPodIdentity {
_, err := parseAzureEventHubMetadata(&ScalerConfig{TriggerMetadata: testData.metadata, ResolvedEnv: sampleEventHubResolvedEnv,
_, err := parseAzureEventHubMetadata(logr.Discard(), &ScalerConfig{TriggerMetadata: testData.metadata, ResolvedEnv: sampleEventHubResolvedEnv,
AuthParams: map[string]string{}, PodIdentity: kedav1alpha1.AuthPodIdentity{Provider: kedav1alpha1.PodIdentityProviderAzure}})

if err != nil && !testData.isError {
Expand All @@ -129,7 +143,7 @@ func TestParseEventHubMetadata(t *testing.T) {
}

for _, testData := range parseEventHubMetadataDatasetWithPodIdentity {
_, err := parseAzureEventHubMetadata(&ScalerConfig{TriggerMetadata: testData.metadata, ResolvedEnv: sampleEventHubResolvedEnv,
_, err := parseAzureEventHubMetadata(logr.Discard(), &ScalerConfig{TriggerMetadata: testData.metadata, ResolvedEnv: sampleEventHubResolvedEnv,
AuthParams: map[string]string{}, PodIdentity: kedav1alpha1.AuthPodIdentity{Provider: kedav1alpha1.PodIdentityProviderAzureWorkload}})

if err != nil && !testData.isError {
Expand Down Expand Up @@ -476,7 +490,7 @@ func DeleteContainerInStorage(ctx context.Context, endpoint *url.URL, credential

func TestEventHubGetMetricSpecForScaling(t *testing.T) {
for _, testData := range eventHubMetricIdentifiers {
meta, err := parseAzureEventHubMetadata(&ScalerConfig{TriggerMetadata: testData.metadataTestData.metadata, ResolvedEnv: sampleEventHubResolvedEnv, AuthParams: map[string]string{}, ScalerIndex: testData.scalerIndex})
meta, err := parseAzureEventHubMetadata(logr.Discard(), &ScalerConfig{TriggerMetadata: testData.metadataTestData.metadata, ResolvedEnv: sampleEventHubResolvedEnv, AuthParams: map[string]string{}, ScalerIndex: testData.scalerIndex})
if err != nil {
t.Fatal("Could not parse metadata:", err)
}
Expand Down

0 comments on commit 92efd86

Please sign in to comment.