Skip to content

Commit

Permalink
RabbitMQ scaler changes
Browse files Browse the repository at this point in the history
Signed-off-by: Youn Jae Kim <[email protected]>
  • Loading branch information
aagusuab committed Nov 22, 2024
1 parent 87cc1ca commit 23603d7
Show file tree
Hide file tree
Showing 13 changed files with 45 additions and 882 deletions.
5 changes: 2 additions & 3 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ require (
cloud.google.com/go/secretmanager v1.14.2
cloud.google.com/go/storage v1.43.0
dario.cat/mergo v1.0.1
github.com/Azure/azure-amqp-common-go/v4 v4.2.0
github.com/Azure/azure-kusto-go v0.16.1
github.com/Azure/azure-sdk-for-go/sdk/azcore v1.16.0
github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.8.0
Expand All @@ -20,7 +19,6 @@ require (
github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v1.4.1
github.com/Azure/azure-sdk-for-go/sdk/storage/azqueue v1.0.0
github.com/Azure/go-autorest/autorest v0.11.29
github.com/Azure/go-autorest/autorest/azure/auth v0.5.13
github.com/AzureAD/microsoft-authentication-library-for-go v1.2.3
github.com/DataDog/datadog-api-client-go v1.16.0
github.com/Huawei/gophercloud v1.0.21
Expand Down Expand Up @@ -118,6 +116,8 @@ require (
sigs.k8s.io/kustomize/kustomize/v5 v5.5.0
)

require github.com/Azure/go-autorest/autorest/adal v0.9.23 // indirect

replace (
// pin k8s.io to v0.31.2 & sigs.k8s.io/controller-runtime to v0.19.1
github.com/google/cel-go => github.com/google/cel-go v0.20.1
Expand Down Expand Up @@ -204,7 +204,6 @@ require (
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
github.com/dennwc/varint v1.0.0 // indirect
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
github.com/dimchansky/utfbom v1.1.1 // indirect
github.com/eapache/go-resiliency v1.7.0 // indirect
github.com/eapache/go-xerial-snappy v0.0.0-20230731223053-c322873962e3 // indirect
github.com/eapache/queue v1.1.0 // indirect
Expand Down
6 changes: 2 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1613,8 +1613,8 @@ github.com/eapache/queue v1.1.0 h1:YOEu7KNc61ntiQlcEeUIoDTJ2o8mQznoNvUhiigpIqc=
github.com/eapache/queue v1.1.0/go.mod h1:6eCeP0CKFpHLu8blIFXhExK/dRa7WDZfr6jVFPTqq+I=
github.com/elastic/go-elasticsearch/v7 v7.17.10 h1:TCQ8i4PmIJuBunvBS6bwT2ybzVFxxUhhltAs3Gyu1yo=
github.com/elastic/go-elasticsearch/v7 v7.17.10/go.mod h1:OJ4wdbtDNk5g503kvlHLyErCgQwwzmDtaFC4XyOxXA4=
github.com/elazarl/goproxy v0.0.0-20220417044921-416226498f94 h1:VIy7cdK7ufs7ctpTFkXJHm1uP3dJSnCGSPysEICB1so=
github.com/elazarl/goproxy v0.0.0-20220417044921-416226498f94/go.mod h1:Ro8st/ElPeALwNFlcTpWmkr6IoMFfkjXAvTHpevnDsM=
github.com/elazarl/goproxy v0.0.0-20231117061959-7cc037d33fb5 h1:m62nsMU279qRD9PQSWD1l66kmkXzuYcnVJqL4XLeV2M=
github.com/elazarl/goproxy v0.0.0-20231117061959-7cc037d33fb5/go.mod h1:Ro8st/ElPeALwNFlcTpWmkr6IoMFfkjXAvTHpevnDsM=
github.com/emicklei/go-restful/v3 v3.12.1 h1:PJMDIM/ak7btuL8Ex0iYET9hxM3CI2sjZtzpL63nKAU=
github.com/emicklei/go-restful/v3 v3.12.1/go.mod h1:6n3XBCmQQb25CM2LCACGz8ukIrRry+4bhvbpWn3mrbc=
github.com/envoyproxy/go-control-plane v0.10.2-0.20220325020618-49ff273808a1/go.mod h1:KJwIaB5Mv44NWtYuAOFCVOjcI94vtpEz2JU/D2v6IjE=
Expand Down Expand Up @@ -2161,8 +2161,6 @@ github.com/nats-io/nkeys v0.4.7/go.mod h1:kqXRgRDPlGy7nGaEDMuYzmiJCIAAWDK0IMBtDm
github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c=
github.com/neelance/astrewrite v0.0.0-20160511093645-99348263ae86/go.mod h1:kHJEU3ofeGjhHklVoIGuVj85JJwZ6kWPaJwCIxgnFmo=
github.com/neelance/sourcemap v0.0.0-20200213170602-2833bce08e4c/go.mod h1:Qr6/a/Q4r9LP1IltGz7tA7iOK1WonHEYhu1HRBA7ZiM=
github.com/newrelic/newrelic-client-go v1.1.0 h1:aflNjzQ21c+2GwBVh+UbAf9lznkRfCcVABoc5UM4IXw=
github.com/newrelic/newrelic-client-go v1.1.0/go.mod h1:RYMXt7hgYw7nzuXIGd2BH0F1AivgWw7WrBhNBQZEB4k=
github.com/newrelic/newrelic-client-go/v2 v2.51.2 h1:Xf+M0NuZuIuxqG48zYoqyIdQL514j2J1c+kNVYajcYI=
github.com/newrelic/newrelic-client-go/v2 v2.51.2/go.mod h1:+RRjI3nDGWT3kLm9Oi3QxpBm70uu8q1upEHBVWCZFpo=
github.com/nxadm/tail v1.4.8 h1:nPr65rt6Y5JFSKQO7qToXr7pePgD6Gwiw05lkbyAQTE=
Expand Down
5 changes: 3 additions & 2 deletions pkg/scalers/azure/azure_aad_workload_identity.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,12 @@ package azure
import (
"context"
"fmt"
"github.com/Azure/azure-sdk-for-go/sdk/azidentity"
"github.com/AzureAD/microsoft-authentication-library-for-go/apps/confidential"
"os"
"strconv"
"strings"

"github.com/Azure/azure-sdk-for-go/sdk/azidentity"
"github.com/AzureAD/microsoft-authentication-library-for-go/apps/confidential"
)

// Azure AD Workload Identity Webhook will inject the following environment variables.
Expand Down
36 changes: 15 additions & 21 deletions pkg/scalers/azure/azure_app_insights.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,19 +58,6 @@ func toISO8601(time string) (string, error) {
return fmt.Sprintf("PT%02dH%02dM", hours, minutes), nil
}

//func getAuthConfig(ctx context.Context, info AppInsightsInfo, podIdentity kedav1alpha1.AuthPodIdentity) auth.AuthorizerConfig {
// switch podIdentity.Provider {
// case "", kedav1alpha1.PodIdentityProviderNone:
// config := auth.NewClientCredentialsConfig(info.ClientID, info.ClientPassword, info.TenantID)
// config.Resource = info.AppInsightsResourceURL
// config.AADEndpoint = info.ActiveDirectoryEndpoint
// return config
// case kedav1alpha1.PodIdentityProviderAzureWorkload:
// return NewAzureADWorkloadIdentityConfig(ctx, podIdentity.GetIdentityID(), podIdentity.GetIdentityTenantID(), podIdentity.GetIdentityAuthorityHost(), info.AppInsightsResourceURL)
// }
// return nil
//}

func extractAppInsightValue(info AppInsightsInfo, metric ApplicationInsightsMetric) (float64, error) {
if _, ok := metric.Value[info.MetricID]; !ok {
return -1, fmt.Errorf("metric named %s not found in app insights response", info.MetricID)
Expand Down Expand Up @@ -108,17 +95,24 @@ func queryParamsForAppInsightsRequest(info AppInsightsInfo) (map[string]interfac
return queryParams, nil
}

// GetAzureAppInsightsMetricValue returns the value of an Azure App Insights metric, rounded to the nearest int
func GetAzureAppInsightsMetricValue(ctx context.Context, info AppInsightsInfo, podIdentity kedav1alpha1.AuthPodIdentity, ignoreNullValues bool) (float64, error) {

//config := getAuthConfig(ctx, info, podIdentity)
func getAuthConfig(ctx context.Context, info AppInsightsInfo, podIdentity kedav1alpha1.AuthPodIdentity) (AADToken, error) {
token := AADToken{}
var err error

token, err := GetAzureADWorkloadIdentityToken(ctx, info.ClientID, info.TenantID, "", info.AppInsightsResourceURL)
//MSAL get Token here instead of the config
if err != nil {
return -1, err
switch podIdentity.Provider {
case "", kedav1alpha1.PodIdentityProviderNone:
token, err = GetAzureADWorkloadIdentityToken(ctx, info.ClientID, info.TenantID, info.ActiveDirectoryEndpoint, info.AppInsightsResourceURL)
case kedav1alpha1.PodIdentityProviderAzureWorkload:
token, err = GetAzureADWorkloadIdentityToken(ctx, podIdentity.GetIdentityID(), podIdentity.GetIdentityTenantID(), podIdentity.GetIdentityAuthorityHost(), info.AppInsightsResourceURL)
}

return token, err
}

// GetAzureAppInsightsMetricValue returns the value of an Azure App Insights metric, rounded to the nearest int
func GetAzureAppInsightsMetricValue(ctx context.Context, info AppInsightsInfo, podIdentity kedav1alpha1.AuthPodIdentity, ignoreNullValues bool) (float64, error) {
token, err := getAuthConfig(ctx, info, podIdentity)

Check warning

Code scanning / CodeQL

Useless assignment to local variable Warning

This definition of err is never used.

queryParams, err := queryParamsForAppInsightsRequest(info)
if err != nil {
return -1, err
Expand Down
36 changes: 0 additions & 36 deletions pkg/scalers/azure/azure_app_insights_test.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,7 @@
package azure

import (
"context"
"testing"

kedav1alpha1 "github.com/kedacore/keda/v2/apis/keda/v1alpha1"
)

type testExtractAzAppInsightsTestData struct {
Expand Down Expand Up @@ -67,39 +64,6 @@ func TestAzGetAzureAppInsightsMetricValue(t *testing.T) {
}
}

type testAppInsightsAuthConfigTestData struct {
testName string
config string
info AppInsightsInfo
podIdentity kedav1alpha1.PodIdentityProvider
}

const (
msiConfig = "msiConfig"
clientCredentialsConfig = "clientCredentialsConfig"
workloadIdentityConfig = "workloadIdentityConfig"
)

var testAppInsightsAuthConfigData = []testAppInsightsAuthConfigTestData{
{"client credentials", clientCredentialsConfig, AppInsightsInfo{ClientID: "1234", ClientPassword: "pw", TenantID: "5678"}, ""},
{"client credentials - pod id none", clientCredentialsConfig, AppInsightsInfo{ClientID: "1234", ClientPassword: "pw", TenantID: "5678"}, kedav1alpha1.PodIdentityProviderNone},
{"azure workload identity", workloadIdentityConfig, AppInsightsInfo{}, kedav1alpha1.PodIdentityProviderAzureWorkload},
}

func TestAzAppInfoGetToken(t *testing.T) {
for _, testData := range testAppInsightsAuthConfigData {
authToken, err := GetAzureADWorkloadIdentityToken(context.TODO(), testData.info.ClientID, testData.info.TenantID, "", testData.info.AppInsightsResourceURL)

if err != nil {
t.Errorf("Test %v; Expected success but got error: %v", testData.testName, err)
}
if authToken.AccessToken == "" {
t.Errorf("Test %v; Expected token but got empty token: %v", testData.testName, authToken)
}
t.Logf("Test %v; data: %v, token: %v", testData.testName, testData.info, authToken)
}
}

type toISO8601TestData struct {
testName string
isError bool
Expand Down
29 changes: 23 additions & 6 deletions pkg/scalers/rabbitmq_scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,13 @@ import (
"strings"
"time"

"github.com/AzureAD/microsoft-authentication-library-for-go/apps/confidential"
"github.com/go-logr/logr"
amqp "github.com/rabbitmq/amqp091-go"
v2 "k8s.io/api/autoscaling/v2"
"k8s.io/metrics/pkg/apis/external_metrics"

"github.com/kedacore/keda/v2/apis/keda/v1alpha1"
"github.com/kedacore/keda/v2/pkg/scalers/azure"
"github.com/kedacore/keda/v2/pkg/scalers/scalersconfig"
kedautil "github.com/kedacore/keda/v2/pkg/util"
)
Expand Down Expand Up @@ -64,7 +64,7 @@ type rabbitMQScaler struct {
connection *amqp.Connection
channel *amqp.Channel
httpClient *http.Client
azureOAuth *azure.ADWorkloadIdentityTokenProvider
azureOAuth *confidential.Client
logger logr.Logger
}

Expand Down Expand Up @@ -409,15 +409,32 @@ func getJSON(ctx context.Context, s *rabbitMQScaler, url string) (queueInfo, err

if s.metadata.WorkloadIdentityResource != "" {
if s.azureOAuth == nil {
s.azureOAuth = azure.NewAzureADWorkloadIdentityTokenProvider(ctx, s.metadata.workloadIdentityClientID, s.metadata.workloadIdentityTenantID, s.metadata.workloadIdentityAuthorityHost, s.metadata.WorkloadIdentityResource)
cred, err := confidential.NewCredFromSecret(s.metadata.workloadIdentityClientID)
if err != nil {
return result, err
}

client, err := confidential.New(
fmt.Sprintf("%s%s/oauth2/token", s.metadata.workloadIdentityAuthorityHost, s.metadata.workloadIdentityTenantID),
s.metadata.workloadIdentityClientID,
cred,
)
if err != nil {
return result, err
}

s.azureOAuth = &client
}

err = s.azureOAuth.Refresh()
token, err := s.azureOAuth.AcquireTokenSilent(ctx, []string{s.metadata.WorkloadIdentityResource})
if err != nil {
return result, err
token, err = s.azureOAuth.AcquireTokenByCredential(ctx, []string{s.metadata.WorkloadIdentityResource})
if err != nil {
return result, err
}
}

request.Header.Set("Authorization", "Bearer "+s.azureOAuth.OAuthToken())
request.Header.Set("Authorization", "Bearer "+token.AccessToken)
}

r, err := s.httpClient.Do(request)
Expand Down
58 changes: 0 additions & 58 deletions vendor/github.com/Azure/azure-amqp-common-go/v4/auth/token.go

This file was deleted.

Loading

0 comments on commit 23603d7

Please sign in to comment.