diff --git a/executor/api/kafka/server.go b/executor/api/kafka/server.go index e7d28555a3..c3517135fc 100644 --- a/executor/api/kafka/server.go +++ b/executor/api/kafka/server.go @@ -85,9 +85,16 @@ func NewKafkaServer(fullGraph bool, workers int, deploymentName, namespace, prot if util.GetKafkaSecurityProtocol() == "SSL" { sslKakfaServer := util.GetSslElements() producerConfigMap["security.protocol"] = util.GetKafkaSecurityProtocol() - producerConfigMap["ssl.ca.location"] = sslKakfaServer.CACertFile - producerConfigMap["ssl.key.location"] = sslKakfaServer.ClientKeyFile - producerConfigMap["ssl.certificate.location"] = sslKakfaServer.ClientCertFile + if sslKakfaServer.CACertFile != "" && sslKakfaServer.ClientCertFile != "" { + producerConfigMap["ssl.ca.location"] = sslKakfaServer.CACertFile + producerConfigMap["ssl.key.location"] = sslKakfaServer.ClientKeyFile + producerConfigMap["ssl.certificate.location"] = sslKakfaServer.ClientCertFile + } + if sslKakfaServer.CACert != "" && sslKakfaServer.ClientCert != "" { + producerConfigMap["ssl.ca.pem"] = sslKakfaServer.CACert + producerConfigMap["ssl.key.pem"] = sslKakfaServer.ClientKey + producerConfigMap["ssl.certificate.pem"] = sslKakfaServer.ClientCert + } producerConfigMap["ssl.key.password"] = sslKakfaServer.ClientKeyPass // Key password, if any } diff --git a/executor/api/util/utils.go b/executor/api/util/utils.go index df1eef5cad..d36060b965 100644 --- a/executor/api/util/utils.go +++ b/executor/api/util/utils.go @@ -144,6 +144,9 @@ func GetEnvAsBool(key string, fallback bool) bool { } type SslKakfa struct { + ClientCert string + ClientKey string + CACert string ClientCertFile string ClientKeyFile string CACertFile string @@ -160,10 +163,16 @@ func GetKafkaSecurityProtocol() string { func GetSslElements() *SslKakfa { sslElements := SslKakfa{ + ClientCert: GetEnv("KAFKA_SSL_CLIENT_CERT", ""), + ClientKey: GetEnv("KAFKA_SSL_CLIENT_KEY", ""), + CACert: GetEnv("KAFKA_SSL_CA_CERT", ""), + // If we use path to files instead of string ClientCertFile: GetEnv("KAFKA_SSL_CLIENT_CERT_FILE", ""), ClientKeyFile: GetEnv("KAFKA_SSL_CLIENT_KEY_FILE", ""), CACertFile: GetEnv("KAFKA_SSL_CA_CERT_FILE", ""), - ClientKeyPass: GetEnv("KAFKA_SSL_CLIENT_KEY_PASS", ""), + // Optional password + ClientKeyPass: GetEnv("KAFKA_SSL_CLIENT_KEY_PASS", ""), } return &sslElements + } diff --git a/executor/logger/worker.go b/executor/logger/worker.go index e5efb77b6a..7b30cee90f 100644 --- a/executor/logger/worker.go +++ b/executor/logger/worker.go @@ -45,10 +45,18 @@ func NewWorker(id int, workQueue chan LogRequest, log logr.Logger, sdepName stri if util.GetKafkaSecurityProtocol() == "SSL" { sslKafka := util.GetSslElements() producerConfigMap["security.protocol"] = util.GetKafkaSecurityProtocol() - producerConfigMap["ssl.ca.location"] = sslKafka.CACertFile - producerConfigMap["ssl.key.location"] = sslKafka.ClientKeyFile - producerConfigMap["ssl.certificate.location"] = sslKafka.ClientCertFile + if sslKafka.CACertFile != "" && sslKafka.ClientCertFile != "" { + producerConfigMap["ssl.ca.location"] = sslKafka.CACertFile + producerConfigMap["ssl.key.location"] = sslKafka.ClientKeyFile + producerConfigMap["ssl.certificate.location"] = sslKafka.ClientCertFile + } + if sslKafka.CACert != "" && sslKafka.ClientCert != "" { + producerConfigMap["ssl.ca.pem"] = sslKafka.CACert + producerConfigMap["ssl.key.pem"] = sslKafka.ClientKey + producerConfigMap["ssl.certificate.pem"] = sslKafka.ClientCert + } producerConfigMap["ssl.key.password"] = sslKafka.ClientKeyPass // Key password, if any + } producer, err = kafka.NewProducer(&producerConfigMap) diff --git a/executor/logger/worker_test.go b/executor/logger/worker_test.go index 517309c744..f3b495d10d 100644 --- a/executor/logger/worker_test.go +++ b/executor/logger/worker_test.go @@ -191,3 +191,70 @@ func TestWorkerKafkaConfigurations(t *testing.T) { }) } } + +func TestWorkerKafkaConfigurationsString(t *testing.T) { + type test struct { + name string + kafkaConfig kafka.ConfigMap + expectError bool + } + + g := NewWithT(t) + + tests := []test{ + { + name: "All options are valid", + kafkaConfig: kafka.ConfigMap{ + "security.protocol": "SSL", + "ssl.ca.pem": caPEM, + "ssl.key.pem": keyPEM, + "ssl.certificate.pem": certPEM, + "ssl.key.password": keyPassword, + }, + expectError: false, + }, + { + name: "CA cert location is invalid", + kafkaConfig: kafka.ConfigMap{ + "security.protocol": "SSL", + "ssl.ca.pem": "foobar", + "ssl.key.pem": keyPEM, + "ssl.certificate.pem": certPEM, + "ssl.key.password": keyPassword, + }, + expectError: true, + }, + { + name: "Private key file location is invalid", + kafkaConfig: kafka.ConfigMap{ + "security.protocol": "SSL", + "ssl.ca.pem": caPEM, + "ssl.key.pem": "foobar", + "ssl.certificate.pem": certPEM, + "ssl.key.password": keyPassword, + }, + expectError: true, + }, + { + name: "Public key certificate location is invalid", + kafkaConfig: kafka.ConfigMap{ + "security.protocol": "SSL", + "ssl.ca.pem": caPEM, + "ssl.key.pem": keyPEM, + "ssl.certificate.pem": "foobar", + "ssl.key.password": keyPassword, + }, + expectError: true, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + _, err := kafka.NewProducer(&tt.kafkaConfig) + if tt.expectError { + g.Expect(err).ToNot(BeNil()) + } else { + g.Expect(err).To(BeNil()) + } + }) + } +}