Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Stephaneey/podidentity #2738

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
- **Datadog Scaler:** Validate query to contain `{` to prevent panic on invalid query ([#2625](https://github.com/kedacore/keda/issues/2625))
- **Kafka Scaler** Make "disable" a valid value for tls auth parameter ([#2608](https://github.com/kedacore/keda/issues/2608))
- **RabbitMQ Scaler:** Include `vhost` for RabbitMQ when retrieving queue info with `useRegex` ([#2498](https://github.com/kedacore/keda/issues/2498))
- **TriggerAuthentication** Better segregation of managed identities ([#2656](https://github.com/kedacore/keda/issues/2656))

### Breaking Changes

Expand Down
3 changes: 2 additions & 1 deletion apis/keda/v1alpha1/triggerauthentication_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,8 @@ const (
// AuthPodIdentity allows users to select the platform native identity
// mechanism
type AuthPodIdentity struct {
Provider PodIdentityProvider `json:"provider"`
Provider PodIdentityProvider `json:"provider"`
IdentityId string `json:"identityId"`
}

// AuthSecretTargetRef is used to authenticate using a reference to a secret
Expand Down
3 changes: 3 additions & 0 deletions config/crd/bases/keda.sh_triggerauthentications.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,9 @@ spec:
provider:
description: PodIdentityProvider contains the list of providers
type: string
identityId:
description: Allows users to specify the identity which should be used by the operator to authenticate against the source
type: string
required:
- provider
type: object
Expand Down
45 changes: 42 additions & 3 deletions pkg/scalers/azure/azure_aad_podidentity.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,53 @@ import (
)

const (
msiURL = "http://169.254.169.254/metadata/identity/oauth2/token?api-version=2018-02-01&resource=%s"
msiURL = "http://169.254.169.254/metadata/identity/oauth2/token?api-version=2018-02-01&resource=%s"
msiURLWithIdentityId = "http://169.254.169.254/metadata/identity/oauth2/token?api-version=2018-02-01&resource=%s&client_id=%s"
)

// GetAzureADPodIdentityToken returns the AADToken for resource
func GetAzureADPodIdentityToken(ctx context.Context, httpClient util.HTTPDoer, audience string) (AADToken, error) {
func GetAzureServiceBusADPodIdentityToken(ctx context.Context, httpClient util.HTTPDoer, audience string) (AADToken, error) {
var token AADToken

urlStr := fmt.Sprintf(msiURL, url.QueryEscape(audience))

req, err := http.NewRequestWithContext(ctx, "GET", urlStr, nil)
if err != nil {
return token, err
}
req.Header = map[string][]string{
"Metadata": {"true"},
}

resp, err := httpClient.Do(req)
if err != nil {
return token, err
}
defer resp.Body.Close()

body, err := ioutil.ReadAll(resp.Body)
if err != nil {
return token, err
}

err = json.Unmarshal(body, &token)
if err != nil {
return token, errors.New(string(body))
}
return token, nil
}

// GetAzureADPodIdentityToken returns the AADToken for resource
func GetAzureADPodIdentityToken(ctx context.Context, httpClient util.HTTPDoer, audience string, identityId string) (AADToken, error) {
var token AADToken

//urlStr := fmt.Sprintf(msiURL, url.QueryEscape(audience))
var urlStr string
if identityId == "" {
urlStr = fmt.Sprintf(msiURL, url.QueryEscape(audience))
} else {
urlStr = fmt.Sprintf(msiURLWithIdentityId, url.QueryEscape(audience), identityId)
}

req, err := http.NewRequestWithContext(ctx, "GET", urlStr, nil)
if err != nil {
return token, err
Expand Down
6 changes: 3 additions & 3 deletions pkg/scalers/azure/azure_app_insights.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,8 @@ func toISO8601(time string) (string, error) {
return fmt.Sprintf("PT%02dH%02dM", hours, minutes), nil
}

func getAuthConfig(info AppInsightsInfo, podIdentity kedav1alpha1.PodIdentityProvider) auth.AuthorizerConfig {
if podIdentity == "" || podIdentity == kedav1alpha1.PodIdentityProviderNone {
func getAuthConfig(info AppInsightsInfo, podIdentity kedav1alpha1.AuthPodIdentity) auth.AuthorizerConfig {
if podIdentity.Provider == "" || podIdentity.Provider == kedav1alpha1.PodIdentityProviderNone {
config := auth.NewClientCredentialsConfig(info.ClientID, info.ClientPassword, info.TenantID)
config.Resource = appInsightsResource
return config
Expand Down Expand Up @@ -102,7 +102,7 @@ func queryParamsForAppInsightsRequest(info AppInsightsInfo) (map[string]interfac
}

// GetAzureAppInsightsMetricValue returns the value of an Azure App Insights metric, rounded to the nearest int
func GetAzureAppInsightsMetricValue(ctx context.Context, info AppInsightsInfo, podIdentity kedav1alpha1.PodIdentityProvider) (int32, error) {
func GetAzureAppInsightsMetricValue(ctx context.Context, info AppInsightsInfo, podIdentity kedav1alpha1.AuthPodIdentity) (int32, error) {
config := getAuthConfig(info, podIdentity)
authorizer, err := config.Authorizer()
if err != nil {
Expand Down
8 changes: 4 additions & 4 deletions pkg/scalers/azure/azure_app_insights_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,13 +74,13 @@ type testAppInsightsAuthConfigTestData struct {
testName string
expectMSI bool
info AppInsightsInfo
podIdentity kedav1alpha1.PodIdentityProvider
podIdentity kedav1alpha1.AuthPodIdentity
}

var testAppInsightsAuthConfigData = []testAppInsightsAuthConfigTestData{
{"client credentials", false, AppInsightsInfo{ClientID: "1234", ClientPassword: "pw", TenantID: "5678"}, ""},
{"client credentials - pod id none", false, AppInsightsInfo{ClientID: "1234", ClientPassword: "pw", TenantID: "5678"}, kedav1alpha1.PodIdentityProviderNone},
{"azure pod identity", true, AppInsightsInfo{}, kedav1alpha1.PodIdentityProviderAzure},
{"client credentials", false, AppInsightsInfo{ClientID: "1234", ClientPassword: "pw", TenantID: "5678"}, kedav1alpha1.AuthPodIdentity{}},
{"client credentials - pod id none", false, AppInsightsInfo{ClientID: "1234", ClientPassword: "pw", TenantID: "5678"}, kedav1alpha1.AuthPodIdentity{}},
{"azure pod identity", true, AppInsightsInfo{}, kedav1alpha1.AuthPodIdentity{Provider: "azure"}},
}

func TestAzAppInfoGetAuthConfig(t *testing.T) {
Expand Down
2 changes: 1 addition & 1 deletion pkg/scalers/azure/azure_blob.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import (
)

// GetAzureBlobListLength returns the count of the blobs in blob container in int
func GetAzureBlobListLength(ctx context.Context, httpClient util.HTTPDoer, podIdentity kedav1alpha1.PodIdentityProvider, connectionString, blobContainerName string, accountName string, blobDelimiter string, blobPrefix string, endpointSuffix string) (int, error) {
func GetAzureBlobListLength(ctx context.Context, httpClient util.HTTPDoer, podIdentity kedav1alpha1.AuthPodIdentity, connectionString, blobContainerName string, accountName string, blobDelimiter string, blobPrefix string, endpointSuffix string) (int, error) {
credential, endpoint, err := ParseAzureStorageBlobConnection(ctx, httpClient, podIdentity, connectionString, accountName, endpointSuffix)
if err != nil {
return -1, err
Expand Down
6 changes: 4 additions & 2 deletions pkg/scalers/azure/azure_blob_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,13 @@ import (
"net/http"
"strings"
"testing"

kedav1alpha1 "github.com/kedacore/keda/v2/apis/keda/v1alpha1"
)

func TestGetBlobLength(t *testing.T) {
httpClient := http.DefaultClient
length, err := GetAzureBlobListLength(context.TODO(), httpClient, "", "", "blobContainerName", "", "", "", "")
length, err := GetAzureBlobListLength(context.TODO(), httpClient, kedav1alpha1.AuthPodIdentity{}, "", "blobContainerName", "", "", "", "")
if length != -1 {
t.Error("Expected length to be -1, but got", length)
}
Expand All @@ -22,7 +24,7 @@ func TestGetBlobLength(t *testing.T) {
t.Error("Expected error to contain parsing error message, but got", err.Error())
}

length, err = GetAzureBlobListLength(context.TODO(), httpClient, "", "DefaultEndpointsProtocol=https;AccountName=name;AccountKey=key==;EndpointSuffix=core.windows.net", "blobContainerName", "", "", "", "")
length, err = GetAzureBlobListLength(context.TODO(), httpClient, kedav1alpha1.AuthPodIdentity{}, "DefaultEndpointsProtocol=https;AccountName=name;AccountKey=key==;EndpointSuffix=core.windows.net", "blobContainerName", "", "", "", "")

if length != -1 {
t.Error("Expected length to be -1, but got", length)
Expand Down
2 changes: 1 addition & 1 deletion pkg/scalers/azure/azure_eventhub_checkpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,7 @@ func (checkpointer *defaultCheckpointer) extractCheckpoint(get *azblob.DownloadR
}

func getCheckpoint(ctx context.Context, httpClient util.HTTPDoer, info EventHubInfo, checkpointer checkpointer) (Checkpoint, error) {
blobCreds, storageEndpoint, err := ParseAzureStorageBlobConnection(ctx, httpClient, kedav1alpha1.PodIdentityProviderNone, info.StorageConnection, "", "")
blobCreds, storageEndpoint, err := ParseAzureStorageBlobConnection(ctx, httpClient, kedav1alpha1.AuthPodIdentity{Provider: "none"}, info.StorageConnection, "", "")
if err != nil {
return Checkpoint{}, err
}
Expand Down
3 changes: 2 additions & 1 deletion pkg/scalers/azure/azure_eventhub_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (

"github.com/Azure/azure-storage-blob-go/azblob"
"github.com/go-playground/assert/v2"
kedav1alpha1 "github.com/kedacore/keda/v2/apis/keda/v1alpha1"
)

// Add a valid Storage account connection string here
Expand Down Expand Up @@ -294,7 +295,7 @@ func TestShouldParseCheckpointForGoSdk(t *testing.T) {
func createNewCheckpointInStorage(urlPath string, containerName string, partitionID string, checkpoint string, metadata map[string]string) (context.Context, error) {
ctx := context.Background()

credential, endpoint, _ := ParseAzureStorageBlobConnection(ctx, http.DefaultClient, "none", StorageConnectionString, "", "")
credential, endpoint, _ := ParseAzureStorageBlobConnection(ctx, http.DefaultClient, kedav1alpha1.AuthPodIdentity{Provider: "none"}, StorageConnectionString, "", "")

// Create container
path, _ := url.Parse(containerName)
Expand Down
4 changes: 2 additions & 2 deletions pkg/scalers/azure/azure_monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,10 +65,10 @@ type MonitorInfo struct {
var azureMonitorLog = logf.Log.WithName("azure_monitor_scaler")

// GetAzureMetricValue returns the value of an Azure Monitor metric, rounded to the nearest int
func GetAzureMetricValue(ctx context.Context, info MonitorInfo, podIdentity kedav1alpha1.PodIdentityProvider) (int32, error) {
func GetAzureMetricValue(ctx context.Context, info MonitorInfo, podIdentity kedav1alpha1.AuthPodIdentity) (int32, error) {
var podIdentityEnabled = true

if podIdentity == "" || podIdentity == kedav1alpha1.PodIdentityProviderNone {
if podIdentity.Provider == "" || podIdentity.Provider == kedav1alpha1.PodIdentityProviderNone {
podIdentityEnabled = false
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/scalers/azure/azure_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ const (
)

// GetAzureQueueLength returns the length of a queue in int
func GetAzureQueueLength(ctx context.Context, httpClient util.HTTPDoer, podIdentity kedav1alpha1.PodIdentityProvider, connectionString, queueName, accountName, endpointSuffix string) (int32, error) {
func GetAzureQueueLength(ctx context.Context, httpClient util.HTTPDoer, podIdentity kedav1alpha1.AuthPodIdentity, connectionString, queueName, accountName, endpointSuffix string) (int32, error) {
credential, endpoint, err := ParseAzureStorageQueueConnection(ctx, httpClient, podIdentity, connectionString, accountName, endpointSuffix)
if err != nil {
return -1, err
Expand Down
6 changes: 4 additions & 2 deletions pkg/scalers/azure/azure_queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,12 @@ import (
"net/http"
"strings"
"testing"

kedav1alpha1 "github.com/kedacore/keda/v2/apis/keda/v1alpha1"
)

func TestGetQueueLength(t *testing.T) {
length, err := GetAzureQueueLength(context.TODO(), http.DefaultClient, "", "", "queueName", "", "")
length, err := GetAzureQueueLength(context.TODO(), http.DefaultClient, kedav1alpha1.AuthPodIdentity{}, "", "queueName", "", "")
if length != -1 {
t.Error("Expected length to be -1, but got", length)
}
Expand All @@ -21,7 +23,7 @@ func TestGetQueueLength(t *testing.T) {
t.Error("Expected error to contain parsing error message, but got", err.Error())
}

length, err = GetAzureQueueLength(context.TODO(), http.DefaultClient, "", "DefaultEndpointsProtocol=https;AccountName=name;AccountKey=key==;EndpointSuffix=core.windows.net", "queueName", "", "")
length, err = GetAzureQueueLength(context.TODO(), http.DefaultClient, kedav1alpha1.AuthPodIdentity{}, "DefaultEndpointsProtocol=https;AccountName=name;AccountKey=key==;EndpointSuffix=core.windows.net", "queueName", "", "")

if length != -1 {
t.Error("Expected length to be -1, but got", length)
Expand Down
19 changes: 11 additions & 8 deletions pkg/scalers/azure/azure_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,10 +78,11 @@ func ParseAzureStorageEndpointSuffix(metadata map[string]string, endpointType St
}

// ParseAzureStorageQueueConnection parses queue connection string and returns credential and resource url
func ParseAzureStorageQueueConnection(ctx context.Context, httpClient util.HTTPDoer, podIdentity kedav1alpha1.PodIdentityProvider, connectionString, accountName, endpointSuffix string) (azqueue.Credential, *url.URL, error) {
switch podIdentity {
func ParseAzureStorageQueueConnection(ctx context.Context, httpClient util.HTTPDoer, podIdentity kedav1alpha1.AuthPodIdentity, connectionString, accountName, endpointSuffix string) (azqueue.Credential, *url.URL, error) {

switch podIdentity.Provider {
case kedav1alpha1.PodIdentityProviderAzure:
token, endpoint, err := parseAcessTokenAndEndpoint(ctx, httpClient, accountName, endpointSuffix)
token, endpoint, err := parseAcessTokenAndEndpoint(ctx, httpClient, accountName, endpointSuffix, podIdentity.IdentityId)
if err != nil {
return nil, nil, err
}
Expand All @@ -106,10 +107,11 @@ func ParseAzureStorageQueueConnection(ctx context.Context, httpClient util.HTTPD
}

// ParseAzureStorageBlobConnection parses blob connection string and returns credential and resource url
func ParseAzureStorageBlobConnection(ctx context.Context, httpClient util.HTTPDoer, podIdentity kedav1alpha1.PodIdentityProvider, connectionString, accountName, endpointSuffix string) (azblob.Credential, *url.URL, error) {
switch podIdentity {
func ParseAzureStorageBlobConnection(ctx context.Context, httpClient util.HTTPDoer, podIdentity kedav1alpha1.AuthPodIdentity, connectionString, accountName, endpointSuffix string) (azblob.Credential, *url.URL, error) {

switch podIdentity.Provider {
case kedav1alpha1.PodIdentityProviderAzure:
token, endpoint, err := parseAcessTokenAndEndpoint(ctx, httpClient, accountName, endpointSuffix)
token, endpoint, err := parseAcessTokenAndEndpoint(ctx, httpClient, accountName, endpointSuffix, podIdentity.IdentityId)
if err != nil {
return nil, nil, err
}
Expand Down Expand Up @@ -190,9 +192,10 @@ func parseAzureStorageConnectionString(connectionString string, endpointType Sto
return u, name, key, nil
}

func parseAcessTokenAndEndpoint(ctx context.Context, httpClient util.HTTPDoer, accountName string, endpointSuffix string) (string, *url.URL, error) {
func parseAcessTokenAndEndpoint(ctx context.Context, httpClient util.HTTPDoer, accountName string, endpointSuffix string, identityId string) (string, *url.URL, error) {
// Azure storage resource is "https://storage.azure.com/" in all cloud environments
token, err := GetAzureADPodIdentityToken(ctx, httpClient, "https://storage.azure.com/")

token, err := GetAzureADPodIdentityToken(ctx, httpClient, "https://storage.azure.com/", identityId)
if err != nil {
return "", nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/scalers/azure_app_insights_scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ var azureAppInsightsLog = logf.Log.WithName("azure_app_insights_scaler")

type azureAppInsightsScaler struct {
metadata *azureAppInsightsMetadata
podIdentity kedav1alpha1.PodIdentityProvider
podIdentity kedav1alpha1.AuthPodIdentity
}

// NewAzureAppInsightsScaler creates a new AzureAppInsightsScaler
Expand Down
2 changes: 1 addition & 1 deletion pkg/scalers/azure_app_insights_scaler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ func TestAzureAppInsightsGetMetricSpecForScaling(t *testing.T) {
}
mockAzureAppInsightsScaler := azureAppInsightsScaler{
metadata: meta,
podIdentity: kedav1alpha1.PodIdentityProviderAzure,
podIdentity: kedav1alpha1.AuthPodIdentity{Provider: "azure"},
}

metricSpec := mockAzureAppInsightsScaler.GetMetricSpecForScaling(ctx)
Expand Down
22 changes: 11 additions & 11 deletions pkg/scalers/azure_blob_scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ const (

type azureBlobScaler struct {
metadata *azureBlobMetadata
podIdentity kedav1alpha1.PodIdentityProvider
podIdentity kedav1alpha1.AuthPodIdentity
httpClient *http.Client
}

Expand Down Expand Up @@ -75,7 +75,7 @@ func NewAzureBlobScaler(config *ScalerConfig) (Scaler, error) {
}, nil
}

func parseAzureBlobMetadata(config *ScalerConfig) (*azureBlobMetadata, kedav1alpha1.PodIdentityProvider, error) {
func parseAzureBlobMetadata(config *ScalerConfig) (*azureBlobMetadata, kedav1alpha1.AuthPodIdentity, error) {
meta := azureBlobMetadata{}
meta.targetBlobCount = defaultTargetBlobCount
meta.blobDelimiter = defaultBlobDelimiter
Expand All @@ -85,7 +85,7 @@ func parseAzureBlobMetadata(config *ScalerConfig) (*azureBlobMetadata, kedav1alp
blobCount, err := strconv.Atoi(val)
if err != nil {
azureBlobLog.Error(err, "Error parsing azure blob metadata", "blobCountMetricName", blobCountMetricName)
return nil, "", fmt.Errorf("error parsing azure blob metadata %s: %s", blobCountMetricName, err.Error())
return nil, kedav1alpha1.AuthPodIdentity{}, fmt.Errorf("error parsing azure blob metadata %s: %s", blobCountMetricName, err.Error())
}

meta.targetBlobCount = blobCount
Expand All @@ -94,7 +94,7 @@ func parseAzureBlobMetadata(config *ScalerConfig) (*azureBlobMetadata, kedav1alp
if val, ok := config.TriggerMetadata["blobContainerName"]; ok && val != "" {
meta.blobContainerName = val
} else {
return nil, "", fmt.Errorf("no blobContainerName given")
return nil, kedav1alpha1.AuthPodIdentity{}, fmt.Errorf("no blobContainerName given")
}

if val, ok := config.TriggerMetadata["blobDelimiter"]; ok && val != "" {
Expand All @@ -107,14 +107,14 @@ func parseAzureBlobMetadata(config *ScalerConfig) (*azureBlobMetadata, kedav1alp

endpointSuffix, err := azure.ParseAzureStorageEndpointSuffix(config.TriggerMetadata, azure.BlobEndpoint)
if err != nil {
return nil, "", err
return nil, kedav1alpha1.AuthPodIdentity{}, err
}

meta.endpointSuffix = endpointSuffix

// before triggerAuthentication CRD, pod identity was configured using this property
if val, ok := config.TriggerMetadata["useAAdPodIdentity"]; ok && config.PodIdentity == "" && val == "true" {
config.PodIdentity = kedav1alpha1.PodIdentityProviderAzure
if val, ok := config.TriggerMetadata["useAAdPodIdentity"]; ok && config.PodIdentity.Provider == "" && val == "true" {
config.PodIdentity = kedav1alpha1.AuthPodIdentity{}
}

if val, ok := config.TriggerMetadata["metricName"]; ok {
Expand All @@ -125,7 +125,7 @@ func parseAzureBlobMetadata(config *ScalerConfig) (*azureBlobMetadata, kedav1alp

// If the Use AAD Pod Identity is not present, or set to "none"
// then check for connection string
switch config.PodIdentity {
switch config.PodIdentity.Provider {
case "", kedav1alpha1.PodIdentityProviderNone:
// Azure Blob Scaler expects a "connection" parameter in the metadata
// of the scaler or in a TriggerAuthentication object
Expand All @@ -136,17 +136,17 @@ func parseAzureBlobMetadata(config *ScalerConfig) (*azureBlobMetadata, kedav1alp
}

if len(meta.connection) == 0 {
return nil, "", fmt.Errorf("no connection setting given")
return nil, kedav1alpha1.AuthPodIdentity{}, fmt.Errorf("no connection setting given")
}
case kedav1alpha1.PodIdentityProviderAzure:
// If the Use AAD Pod Identity is present then check account name
if val, ok := config.TriggerMetadata["accountName"]; ok && val != "" {
meta.accountName = val
} else {
return nil, "", fmt.Errorf("no accountName given")
return nil, kedav1alpha1.AuthPodIdentity{}, fmt.Errorf("no accountName given")
}
default:
return nil, "", fmt.Errorf("pod identity %s not supported for azure storage blobs", config.PodIdentity)
return nil, kedav1alpha1.AuthPodIdentity{}, fmt.Errorf("pod identity %s not supported for azure storage blobs", config.PodIdentity)
}

meta.scalerIndex = config.ScalerIndex
Expand Down
Loading