diff --git a/executor/api/kafka/server.go b/executor/api/kafka/server.go index bf5349829b..eb12714987 100644 --- a/executor/api/kafka/server.go +++ b/executor/api/kafka/server.go @@ -87,7 +87,7 @@ func NewKafkaServer(fullGraph bool, workers int, deploymentName, namespace, prot "go.delivery.reports": false, // Need this othewise will get memory leak } if broker != "" { - if util.GetKafkaSecurityProtocol() == "SSL" { + if util.GetKafkaSecurityProtocol() == "SSL" || util.GetKafkaSecurityProtocol() == "SASL_SSL" { sslKakfaServer := util.GetSslElements() producerConfigMap["security.protocol"] = util.GetKafkaSecurityProtocol() if sslKakfaServer.CACertFile != "" && sslKakfaServer.ClientCertFile != "" { @@ -101,7 +101,22 @@ func NewKafkaServer(fullGraph bool, workers int, deploymentName, namespace, prot producerConfigMap["ssl.certificate.pem"] = sslKakfaServer.ClientCert } producerConfigMap["ssl.key.password"] = sslKakfaServer.ClientKeyPass // Key password, if any - + if util.GetKafkaSecurityProtocol() == "SASL_SSL" { //if we also have SASL enabled, then we need to provide the necessary settings in addition to SSL + saslKafkaServer := util.GetSaslElements() + producerConfigMap["sasl.mechanisms"] = saslKafkaServer.Mechanism + if saslKafkaServer.UserName != "" && saslKafkaServer.Password != "" { + producerConfigMap["sasl.username"] = saslKafkaServer.UserName + producerConfigMap["sasl.password"] = saslKafkaServer.Password + } + } + } + if util.GetKafkaSecurityProtocol() == "SASL_PLAIN" || util.GetKafkaSecurityProtocol() == "PLAIN" { //if we also have SASL enabled, then we need to provide the necessary (no SSL) + saslKafkaServer := util.GetSaslElements() + producerConfigMap["sasl.mechanisms"] = saslKafkaServer.Mechanism + if saslKafkaServer.UserName != "" && saslKafkaServer.Password != "" { + producerConfigMap["sasl.username"] = saslKafkaServer.UserName + producerConfigMap["sasl.password"] = saslKafkaServer.Password + } } } @@ -180,7 +195,7 @@ func (ks *SeldonKafkaServer) Serve() error { "auto.offset.reset": "earliest", } - if util.GetKafkaSecurityProtocol() == "SSL" { + if util.GetKafkaSecurityProtocol() == "SSL" || util.GetKafkaSecurityProtocol() == "SASL_SSL" { sslKakfaServer := util.GetSslElements() consumerConfigMap["security.protocol"] = util.GetKafkaSecurityProtocol() if sslKakfaServer.CACertFile != "" && sslKakfaServer.ClientCertFile != "" { @@ -194,6 +209,14 @@ func (ks *SeldonKafkaServer) Serve() error { consumerConfigMap["ssl.certificate.pem"] = sslKakfaServer.ClientCert } consumerConfigMap["ssl.key.password"] = sslKakfaServer.ClientKeyPass // Key password, if any + if util.GetKafkaSecurityProtocol() == "SASL_SSL" { //if we also have SASL enabled, then we need to provide the necessary settings in addition to SSL + saslKafkaServer := util.GetSaslElements() + consumerConfigMap["sasl.mechanisms"] = saslKafkaServer.Mechanism + if saslKafkaServer.UserName != "" && saslKafkaServer.Password != "" { + consumerConfigMap["sasl.username"] = saslKafkaServer.UserName + consumerConfigMap["sasl.password"] = saslKafkaServer.Password + } + } } c, err := kafka.NewConsumer(&consumerConfigMap) diff --git a/executor/api/util/utils.go b/executor/api/util/utils.go index d36060b965..dde954c44b 100644 --- a/executor/api/util/utils.go +++ b/executor/api/util/utils.go @@ -157,6 +157,16 @@ func (o SslKakfa) String() string { return "SslKakfa" } +type SaslKafka struct { + UserName string + Password string + Mechanism string +} + +func (o SaslKafka) String() string { + return "SaslKafka" +} + func GetKafkaSecurityProtocol() string { return strings.ToUpper(GetEnv("KAFKA_SECURITY_PROTOCOL", "")) } @@ -176,3 +186,12 @@ func GetSslElements() *SslKakfa { return &sslElements } + +func GetSaslElements() *SaslKafka { + saslElements := SaslKafka{ + UserName: GetEnv("KAFKA_SASL_USERNAME", ""), + Password: GetEnv("KAFKA_SASL_PASSWORD", ""), + Mechanism: GetEnv("KAFKA_SASL_MECHANISM", ""), + } + return &saslElements +} diff --git a/executor/logger/worker.go b/executor/logger/worker.go index f5d735d1df..aa6e2df244 100644 --- a/executor/logger/worker.go +++ b/executor/logger/worker.go @@ -57,7 +57,22 @@ func NewWorker(id int, workQueue chan LogRequest, log logr.Logger, sdepName stri producerConfigMap["ssl.certificate.pem"] = sslKafka.ClientCert } producerConfigMap["ssl.key.password"] = sslKafka.ClientKeyPass // Key password, if any - + if util.GetKafkaSecurityProtocol() == "SASL_SSL" { //if we also have SASL enabled, then we need to provide the necessary settings in addition to SSL + saslKafkaServer := util.GetSaslElements() + producerConfigMap["sasl.mechanisms"] = saslKafkaServer.Mechanism + if saslKafkaServer.UserName != "" && saslKafkaServer.Password != "" { + producerConfigMap["sasl.username"] = saslKafkaServer.UserName + producerConfigMap["sasl.password"] = saslKafkaServer.Password + } + } + } + if util.GetKafkaSecurityProtocol() == "SASL_PLAIN" || util.GetKafkaSecurityProtocol() == "PLAIN" { //if we also have SASL enabled, then we need to provide the necessary (no SSL) + saslKafkaServer := util.GetSaslElements() + producerConfigMap["sasl.mechanisms"] = saslKafkaServer.Mechanism + if saslKafkaServer.UserName != "" && saslKafkaServer.Password != "" { + producerConfigMap["sasl.username"] = saslKafkaServer.UserName + producerConfigMap["sasl.password"] = saslKafkaServer.Password + } } producer, err = kafka.NewProducer(&producerConfigMap)