Skip to content

Commit

Permalink
fix: move sasl and tls flag to ScaledObject instead of specifying…
Browse files Browse the repository at this point in the history
… in TriggerAuthentication (#4322)

Signed-off-by: dttung2905 <[email protected]>
  • Loading branch information
dttung2905 authored Mar 9, 2023
1 parent b1e905c commit 846a4d0
Show file tree
Hide file tree
Showing 4 changed files with 164 additions and 26 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ Here is an overview of all new **experimental** features:
- **Azure Pipelines Scaler**: New configuration parameter `requireAllDemands` to scale only if jobs request all demands provided by the scaling definition ([#4138](https://github.com/kedacore/keda/issues/4138))
- **Hashicorp Vault**: Add support to secrets backend version 1 ([#2645](https://github.com/kedacore/keda/issues/2645))
- **Kafka Scaler**: Improve error logging for `GetBlock` method ([#4232](https://github.com/kedacore/keda/issues/4232))
- **Kafka Scaler**: Add support to use `tls` and `sasl` in ScaledObject ([#4232](https://github.com/kedacore/keda/issues/4322))
- **Prometheus Scaler**: Add custom headers and custom auth support ([#4208](https://github.com/kedacore/keda/issues/4208))
- **RabbitMQ Scaler**: Add TLS support ([#967](https://github.com/kedacore/keda/issues/967))
- **Redis Scalers**: Add support to Redis 7 ([#4052](https://github.com/kedacore/keda/issues/4052))
Expand Down
2 changes: 1 addition & 1 deletion pkg/scalers/azure_blob_scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ func parseAzureBlobMetadata(config *ScalerConfig, logger logr.Logger) (*azure.Bl
meta.EndpointSuffix = endpointSuffix

// before triggerAuthentication CRD, pod identity was configured using this property
if val, ok := config.TriggerMetadata["useAAdPodIdentity"]; ok && config.PodIdentity.Provider == "" && val == "true" {
if val, ok := config.TriggerMetadata["useAAdPodIdentity"]; ok && config.PodIdentity.Provider == "" && val == stringTrue {
config.PodIdentity = kedav1alpha1.AuthPodIdentity{Provider: kedav1alpha1.PodIdentityProviderAzure}
}

Expand Down
84 changes: 62 additions & 22 deletions pkg/scalers/kafka_scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,11 @@ type kafkaScaler struct {
previousOffsets map[string]map[int32]int64
}

const (
stringEnable = "enable"
stringDisable = "disable"
)

type kafkaMetadata struct {
bootstrapServers []string
group string
Expand Down Expand Up @@ -121,9 +126,23 @@ func NewKafkaScaler(config *ScalerConfig) (Scaler, error) {

func parseKafkaAuthParams(config *ScalerConfig, meta *kafkaMetadata) error {
meta.saslType = KafkaSASLTypeNone
var saslAuthType string
switch {
case config.TriggerMetadata["sasl"] != "":
saslAuthType = config.TriggerMetadata["sasl"]
default:
saslAuthType = ""
}

if val, ok := config.AuthParams["sasl"]; ok {
val = strings.TrimSpace(val)
mode := kafkaSaslType(val)
if saslAuthType != "" {
return errors.New("unable to set `sasl` in both ScaledObject and TriggerAuthentication together")
}
saslAuthType = val
}

if saslAuthType != "" {
mode := kafkaSaslType(saslAuthType)

if mode == KafkaSASLTypePlaintext || mode == KafkaSASLTypeSCRAMSHA256 || mode == KafkaSASLTypeSCRAMSHA512 || mode == KafkaSASLTypeOAuthbearer {
if config.AuthParams["username"] == "" {
Expand Down Expand Up @@ -151,30 +170,51 @@ func parseKafkaAuthParams(config *ScalerConfig, meta *kafkaMetadata) error {
}

meta.enableTLS = false
enableTLS := false
if val, ok := config.TriggerMetadata["tls"]; ok {
switch val {
case stringEnable:
enableTLS = true
case stringDisable:
enableTLS = false
default:
return fmt.Errorf("error incorrect TLS value given, got %s", val)
}
}

if val, ok := config.AuthParams["tls"]; ok {
val = strings.TrimSpace(val)
if enableTLS {
return errors.New("unable to set `tls` in both ScaledObject and TriggerAuthentication together")
}
switch val {
case stringEnable:
enableTLS = true
case stringDisable:
enableTLS = false
default:
return fmt.Errorf("error incorrect TLS value given, got %s", val)
}
}

if val == "enable" {
certGiven := config.AuthParams["cert"] != ""
keyGiven := config.AuthParams["key"] != ""
if certGiven && !keyGiven {
return errors.New("key must be provided with cert")
}
if keyGiven && !certGiven {
return errors.New("cert must be provided with key")
}
meta.ca = config.AuthParams["ca"]
meta.cert = config.AuthParams["cert"]
meta.key = config.AuthParams["key"]
if value, found := config.AuthParams["keyPassword"]; found {
meta.keyPassword = value
} else {
meta.keyPassword = ""
}
meta.enableTLS = true
} else if val != "disable" {
return fmt.Errorf("err incorrect value for TLS given: %s", val)
if enableTLS {
certGiven := config.AuthParams["cert"] != ""
keyGiven := config.AuthParams["key"] != ""
if certGiven && !keyGiven {
return errors.New("key must be provided with cert")
}
if keyGiven && !certGiven {
return errors.New("cert must be provided with key")
}
meta.ca = config.AuthParams["ca"]
meta.cert = config.AuthParams["cert"]
meta.key = config.AuthParams["key"]
if value, found := config.AuthParams["keyPassword"]; found {
meta.keyPassword = value
} else {
meta.keyPassword = ""
}
meta.enableTLS = true
}

return nil
Expand Down
103 changes: 100 additions & 3 deletions pkg/scalers/kafka_scaler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,14 @@ type parseKafkaAuthParamsTestData struct {
enableTLS bool
}

// Testing the case where `tls` and `sasl` are specified in ScaledObject
type parseAuthParamsTestDataSecondAuthMethod struct {
metadata map[string]string
authParams map[string]string
isError bool
enableTLS bool
}

type kafkaMetricIdentifier struct {
metadataTestData *parseKafkaMetadataTestData
scalerIndex int
Expand Down Expand Up @@ -158,8 +166,65 @@ var parseKafkaAuthParamsTestDataset = []parseKafkaAuthParamsTestData{
// failure, SASL + TLS, missing key
{map[string]string{"sasl": "plaintext", "username": "admin", "password": "admin", "tls": "enable", "ca": "caaa", "cert": "ceert"}, true, false},
}
var parseAuthParamsTestDataset = []parseAuthParamsTestDataSecondAuthMethod{
// success, SASL plaintext
{map[string]string{"sasl": "plaintext", "bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topic", "allowIdleConsumers": "true", "version": "1.0.0"}, map[string]string{"username": "admin", "password": "admin"}, false, false},
// success, SASL scram_sha256
{map[string]string{"sasl": "scram_sha256", "bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topic", "allowIdleConsumers": "true", "version": "1.0.0"}, map[string]string{"username": "admin", "password": "admin"}, false, false},
// success, SASL scram_sha512
{map[string]string{"sasl": "scram_sha512", "bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topic", "allowIdleConsumers": "true", "version": "1.0.0"}, map[string]string{"username": "admin", "password": "admin"}, false, false},
// success, TLS only
{map[string]string{"tls": "enable", "bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topic", "allowIdleConsumers": "true", "version": "1.0.0"}, map[string]string{"ca": "caaa", "cert": "ceert", "key": "keey"}, false, true},
// success, TLS cert/key and assumed public CA
{map[string]string{"tls": "enable", "bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topic", "allowIdleConsumers": "true", "version": "1.0.0"}, map[string]string{"cert": "ceert", "key": "keey"}, false, true},
// success, TLS cert/key + key password and assumed public CA
{map[string]string{"tls": "enable", "bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topic", "allowIdleConsumers": "true", "version": "1.0.0"}, map[string]string{"cert": "ceert", "key": "keey", "keyPassword": "keeyPassword"}, false, true},
// success, TLS CA only
{map[string]string{"tls": "enable", "bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topic", "allowIdleConsumers": "true", "version": "1.0.0"}, map[string]string{"ca": "caaa"}, false, true},
// success, SASL + TLS
{map[string]string{"sasl": "plaintext", "tls": "enable", "bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topic", "allowIdleConsumers": "true", "version": "1.0.0"}, map[string]string{"username": "admin", "password": "admin", "ca": "caaa", "cert": "ceert", "key": "keey"}, false, true},
// success, SASL + TLS explicitly disabled
{map[string]string{"sasl": "plaintext", "tls": "disable", "bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topic", "allowIdleConsumers": "true", "version": "1.0.0"}, map[string]string{"username": "admin", "password": "admin"}, false, false},
// success, SASL OAUTHBEARER + TLS explicitly disabled
{map[string]string{"sasl": "oauthbearer", "tls": "disable", "bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topic", "allowIdleConsumers": "true", "version": "1.0.0"}, map[string]string{"username": "admin", "password": "admin", "scopes": "scope", "oauthTokenEndpointUri": "https://website.com"}, false, false},
// failure, SASL OAUTHBEARER + TLS explicitly disable + bad SASL type
{map[string]string{"sasl": "foo", "tls": "disable", "bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topic", "allowIdleConsumers": "true", "version": "1.0.0"}, map[string]string{"username": "admin", "password": "admin", "scopes": "scope", "oauthTokenEndpointUri": "https://website.com"}, true, false},
// success, SASL OAUTHBEARER + TLS missing scope
{map[string]string{"sasl": "oauthbearer", "tls": "disable", "bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topic", "allowIdleConsumers": "true", "version": "1.0.0"}, map[string]string{"username": "admin", "password": "admin", "oauthTokenEndpointUri": "https://website.com"}, false, false},
// failure, SASL OAUTHBEARER + TLS missing oauthTokenEndpointUri
{map[string]string{"sasl": "oauthbearer", "tls": "disable", "bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topic", "allowIdleConsumers": "true", "version": "1.0.0"}, map[string]string{"username": "admin", "password": "admin", "scopes": "scope", "oauthTokenEndpointUri": ""}, true, false},
// failure, SASL incorrect type
{map[string]string{"sasl": "foo", "bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topic", "allowIdleConsumers": "true", "version": "1.0.0"}, map[string]string{"username": "admin", "password": "admin"}, true, false},
// failure, SASL missing username
{map[string]string{"sasl": "plaintext", "bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topic", "allowIdleConsumers": "true", "version": "1.0.0"}, map[string]string{"password": "admin"}, true, false},
// failure, SASL missing password
{map[string]string{"sasl": "plaintext", "bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topic", "allowIdleConsumers": "true", "version": "1.0.0"}, map[string]string{"username": "admin"}, true, false},
// failure, TLS missing cert
{map[string]string{"tls": "enable", "bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topic", "allowIdleConsumers": "true", "version": "1.0.0"}, map[string]string{"ca": "caaa", "key": "keey"}, true, false},
// failure, TLS missing key
{map[string]string{"tls": "enable", "bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topic", "allowIdleConsumers": "true", "version": "1.0.0"}, map[string]string{"ca": "caaa", "cert": "ceert"}, true, false},
// failure, TLS invalid
{map[string]string{"tls": "random", "bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topic", "allowIdleConsumers": "true", "version": "1.0.0"}, map[string]string{"ca": "caaa", "cert": "ceert", "key": "keey"}, true, false},
// failure, SASL + TLS, incorrect SASL type
{map[string]string{"sasl": "foo", "tls": "enable", "bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topic", "allowIdleConsumers": "true", "version": "1.0.0"}, map[string]string{"username": "admin", "password": "admin", "ca": "caaa", "cert": "ceert", "key": "keey"}, true, false},
// failure, SASL + TLS, incorrect tls
{map[string]string{"sasl": "plaintext", "tls": "foo", "bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topic", "allowIdleConsumers": "true", "version": "1.0.0"}, map[string]string{"username": "admin", "password": "admin", "ca": "caaa", "cert": "ceert", "key": "keey"}, true, false},
// failure, SASL + TLS, missing username
{map[string]string{"sasl": "plaintext", "tls": "enable", "bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topic", "allowIdleConsumers": "true", "version": "1.0.0"}, map[string]string{"password": "admin", "ca": "caaa", "cert": "ceert", "key": "keey"}, true, false},
// failure, SASL + TLS, missing password
{map[string]string{"sasl": "plaintext", "tls": "enable", "bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topic", "allowIdleConsumers": "true", "version": "1.0.0"}, map[string]string{"username": "admin", "ca": "caaa", "cert": "ceert", "key": "keey"}, true, false},
// failure, SASL + TLS, missing cert
{map[string]string{"sasl": "plaintext", "tls": "enable", "bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topic", "allowIdleConsumers": "true", "version": "1.0.0"}, map[string]string{"username": "admin", "password": "admin", "ca": "caaa", "key": "keey"}, true, false},
// failure, SASL + TLS, missing key
{map[string]string{"sasl": "plaintext", "tls": "enable", "bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topic", "allowIdleConsumers": "true", "version": "1.0.0"}, map[string]string{"sasl": "plaintext", "username": "admin", "password": "admin", "ca": "caaa", "cert": "ceert"}, true, false},

// failure, setting SASL values in both places
{map[string]string{"sasl": "scram_sha512", "bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topic", "allowIdleConsumers": "true", "version": "1.0.0"}, map[string]string{"sasl": "scram_sha512", "username": "admin", "password": "admin"}, true, false},
// failure, setting TLS values in both places
{map[string]string{"tls": "enable", "bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topic", "allowIdleConsumers": "true", "version": "1.0.0"}, map[string]string{"tls": "enable", "ca": "caaa", "cert": "ceert", "key": "keey"}, true, true},
}

var parseKafkaOAuthbreakerAuthParamsTestDataset = []parseKafkaAuthParamsTestData{
var parseKafkaOAuthbrearerAuthParamsTestDataset = []parseKafkaAuthParamsTestData{
// success, SASL OAUTHBEARER + TLS
{map[string]string{"sasl": "oauthbearer", "username": "admin", "password": "admin", "scopes": "scope", "oauthTokenEndpointUri": "https://website.com", "tls": "disable"}, false, false},
// success, SASL OAUTHBEARER + TLS multiple scopes
Expand Down Expand Up @@ -243,6 +308,7 @@ func TestGetBrokers(t *testing.T) {
}

func TestKafkaAuthParams(t *testing.T) {
// Testing tls and sasl value in TriggerAuthentication
for _, testData := range parseKafkaAuthParamsTestDataset {
meta, err := parseKafkaMetadata(&ScalerConfig{TriggerMetadata: validKafkaMetadata, AuthParams: testData.authParams}, logr.Discard())

Expand Down Expand Up @@ -270,10 +336,41 @@ func TestKafkaAuthParams(t *testing.T) {
}
}
}

// Testing tls and sasl value in scaledObject
for id, testData := range parseAuthParamsTestDataset {
meta, err := parseKafkaMetadata(&ScalerConfig{TriggerMetadata: testData.metadata, AuthParams: testData.authParams}, logr.Discard())

if err != nil && !testData.isError {
t.Errorf("Test case: %v. Expected success but got error %v", id, err)
}
if testData.isError && err == nil {
t.Errorf("Test case: %v. Expected error but got success", id)
}
if !testData.isError {
if testData.metadata["tls"] == "true" && !meta.enableTLS {
t.Errorf("Test case: %v. Expected tls to be set to %v but got %v\n", id, testData.metadata["tls"], meta.enableTLS)
}
if meta.enableTLS {
if meta.ca != testData.authParams["ca"] {
t.Errorf("Test case: %v. Expected ca to be set to %v but got %v\n", id, testData.authParams["ca"], meta.ca)
}
if meta.cert != testData.authParams["cert"] {
t.Errorf("Test case: %v. Expected cert to be set to %v but got %v\n", id, testData.authParams["cert"], meta.cert)
}
if meta.key != testData.authParams["key"] {
t.Errorf("Test case: %v. Expected key to be set to %v but got %v\n", id, testData.authParams["key"], meta.key)
}
if meta.keyPassword != testData.authParams["keyPassword"] {
t.Errorf("Test case: %v. Expected key to be set to %v but got %v\n", id, testData.authParams["keyPassword"], meta.keyPassword)
}
}
}
}
}

func TestKafkaOAuthbreakerAuthParams(t *testing.T) {
for _, testData := range parseKafkaOAuthbreakerAuthParamsTestDataset {
func TestKafkaOAuthbrearerAuthParams(t *testing.T) {
for _, testData := range parseKafkaOAuthbrearerAuthParamsTestDataset {
meta, err := parseKafkaMetadata(&ScalerConfig{TriggerMetadata: validKafkaMetadata, AuthParams: testData.authParams}, logr.Discard())

if err != nil && !testData.isError {
Expand Down

0 comments on commit 846a4d0

Please sign in to comment.