Skip to content

Commit

Permalink
feat: Add OAuth extensions for kafka scaler
Browse files Browse the repository at this point in the history
Signed-off-by: qvalentin <[email protected]>
  • Loading branch information
qvalentin committed Apr 28, 2023
1 parent 57908a4 commit 37fe668
Show file tree
Hide file tree
Showing 4 changed files with 30 additions and 3 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ To learn more about active deprecations, we recommend checking [GitHub Discussio
- **CPU/Memory scaler**: Add support for scale to zero if there are multiple triggers([#4269](https://github.com/kedacore/keda/issues/4269))
- **Redis Scalers**: Allow scaling using redis stream length ([#4277](https://github.com/kedacore/keda/issues/4277))
- **General:** Introduce new Solr Scaler ([#4234](https://github.com/kedacore/keda/issues/4234))
- **Kafka Scaler:** Add support for OAuth extensions ([#4486](https://github.com/kedacore/keda/pull/4486))

### Improvements

Expand Down
15 changes: 14 additions & 1 deletion pkg/scalers/kafka_scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ type kafkaMetadata struct {
// OAUTHBEARER
scopes []string
oauthTokenEndpointURI string
extensions map[string]string

// TLS
enableTLS bool
Expand Down Expand Up @@ -163,6 +164,18 @@ func parseKafkaAuthParams(config *ScalerConfig, meta *kafkaMetadata) error {
return errors.New("no oauth token endpoint uri given")
}
meta.oauthTokenEndpointURI = strings.TrimSpace(config.AuthParams["oauthTokenEndpointUri"])

meta.extensions = make(map[string]string)
for _, extension := range strings.Split(config.AuthParams["extensions"], ",") {
if extension == "" {
continue
}
splittedExtension := strings.Split(extension, "=")
if len(splittedExtension) != 2 {
return errors.New("invalid OAuthBearer extension, must be of format key=value")
}
meta.extensions[splittedExtension[0]] = splittedExtension[1]
}
}
} else {
return fmt.Errorf("err SASL mode %s given", mode)
Expand Down Expand Up @@ -382,7 +395,7 @@ func getKafkaClients(metadata kafkaMetadata) (sarama.Client, sarama.ClusterAdmin

if metadata.saslType == KafkaSASLTypeOAuthbearer {
config.Net.SASL.Mechanism = sarama.SASLTypeOAuth
config.Net.SASL.TokenProvider = OAuthBearerTokenProvider(metadata.username, metadata.password, metadata.oauthTokenEndpointURI, metadata.scopes)
config.Net.SASL.TokenProvider = OAuthBearerTokenProvider(metadata.username, metadata.password, metadata.oauthTokenEndpointURI, metadata.scopes, metadata.extensions)
}

client, err := sarama.NewClient(metadata.bootstrapServers, config)
Expand Down
6 changes: 4 additions & 2 deletions pkg/scalers/kafka_scaler_oauth_token_provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,10 @@ import (

type TokenProvider struct {
tokenSource oauth2.TokenSource
extensions map[string]string
}

func OAuthBearerTokenProvider(clientID, clientSecret, tokenURL string, scopes []string) sarama.AccessTokenProvider {
func OAuthBearerTokenProvider(clientID, clientSecret, tokenURL string, scopes []string, extensions map[string]string) sarama.AccessTokenProvider {
cfg := clientcredentials.Config{
ClientID: clientID,
ClientSecret: clientSecret,
Expand All @@ -22,6 +23,7 @@ func OAuthBearerTokenProvider(clientID, clientSecret, tokenURL string, scopes []

return &TokenProvider{
tokenSource: cfg.TokenSource(context.Background()),
extensions: extensions,
}
}

Expand All @@ -31,5 +33,5 @@ func (t *TokenProvider) Token() (*sarama.AccessToken, error) {
return nil, err
}

return &sarama.AccessToken{Token: token.AccessToken}, nil
return &sarama.AccessToken{Token: token.AccessToken, Extensions: t.extensions}, nil
}
11 changes: 11 additions & 0 deletions pkg/scalers/kafka_scaler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,12 @@ var parseKafkaOAuthbrearerAuthParamsTestDataset = []parseKafkaAuthParamsTestData
{map[string]string{"sasl": "foo", "username": "admin", "password": "admin", "scopes": "scope", "oauthTokenEndpointUri": "https://website.com", "tls": "disable"}, true, false},
// failure, SASL OAUTHBEARER + TLS missing oauthTokenEndpointUri
{map[string]string{"sasl": "oauthbearer", "username": "admin", "password": "admin", "scopes": "scope", "oauthTokenEndpointUri": "", "tls": "disable"}, true, false},
// success, SASL OAUTHBEARER + extension
{map[string]string{"sasl": "oauthbearer", "username": "admin", "password": "admin", "scopes": "scope", "oauthTokenEndpointUri": "https://website.com", "tls": "disable", "extensions": "extension_foo=bar"}, false, false},
// success, SASL OAUTHBEARER + multiple extensions
{map[string]string{"sasl": "oauthbearer", "username": "admin", "password": "admin", "scopes": "scope", "oauthTokenEndpointUri": "https://website.com", "tls": "disable", "extensions": "extension_foo=bar,extension_baz=baz"}, false, false},
// failure, SASL OAUTHBEARER + bad extension
{map[string]string{"sasl": "oauthbearer", "username": "admin", "password": "admin", "scopes": "scope", "oauthTokenEndpointUri": "https://website.com", "tls": "disable", "extensions": "extension_foo=bar,extension_bazbaz"}, true, false},
}

var kafkaMetricIdentifiers = []kafkaMetricIdentifier{
Expand Down Expand Up @@ -384,6 +390,11 @@ func TestKafkaOAuthbrearerAuthParams(t *testing.T) {
t.Errorf("Expected scopes to be set to %v but got %v\n", strings.Count(testData.authParams["scopes"], ","), len(meta.scopes))
}
}
if err == nil && testData.authParams["extensions"] != "" {
if len(meta.extensions) != strings.Count(testData.authParams["extensions"], ",")+1 {
t.Errorf("Expected number of extensions to be set to %v but got %v\n", strings.Count(testData.authParams["extensions"], ",")+1, len(meta.extensions))
}
}
}
}

Expand Down

0 comments on commit 37fe668

Please sign in to comment.