Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support to use PEM string for SSL #3868

Merged
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 10 additions & 3 deletions executor/api/kafka/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,9 +85,16 @@ func NewKafkaServer(fullGraph bool, workers int, deploymentName, namespace, prot
if util.GetKafkaSecurityProtocol() == "SSL" {
sslKakfaServer := util.GetSslElements()
producerConfigMap["security.protocol"] = util.GetKafkaSecurityProtocol()
producerConfigMap["ssl.ca.location"] = sslKakfaServer.CACertFile
producerConfigMap["ssl.key.location"] = sslKakfaServer.ClientKeyFile
producerConfigMap["ssl.certificate.location"] = sslKakfaServer.ClientCertFile
if sslKakfaServer.CACertFile != "" && sslKakfaServer.ClientCertFile != "" {
producerConfigMap["ssl.ca.location"] = sslKakfaServer.CACertFile
producerConfigMap["ssl.key.location"] = sslKakfaServer.ClientKeyFile
producerConfigMap["ssl.certificate.location"] = sslKakfaServer.ClientCertFile
}
if sslKakfaServer.CACert != "" && sslKakfaServer.ClientCert != "" {
producerConfigMap["ssl.ca.pem"] = sslKakfaServer.CACert
producerConfigMap["ssl.key.pem"] = sslKakfaServer.ClientKey
producerConfigMap["ssl.certificate.pem"] = sslKakfaServer.ClientCert
}
producerConfigMap["ssl.key.password"] = sslKakfaServer.ClientKeyPass // Key password, if any

}
Expand Down
11 changes: 10 additions & 1 deletion executor/api/util/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,9 @@ func GetEnvAsBool(key string, fallback bool) bool {
}

type SslKakfa struct {
ClientCert string
ClientKey string
CACert string
ClientCertFile string
ClientKeyFile string
CACertFile string
Expand All @@ -160,10 +163,16 @@ func GetKafkaSecurityProtocol() string {

func GetSslElements() *SslKakfa {
sslElements := SslKakfa{
ClientCert: GetEnv("KAFKA_SSL_CLIENT_CERT", ""),
ClientKey: GetEnv("KAFKA_SSL_CLIENT_KEY", ""),
CACert: GetEnv("KAFKA_SSL_CA_CERT", ""),
// If we use path to files instead of string
ClientCertFile: GetEnv("KAFKA_SSL_CLIENT_CERT_FILE", ""),
ClientKeyFile: GetEnv("KAFKA_SSL_CLIENT_KEY_FILE", ""),
CACertFile: GetEnv("KAFKA_SSL_CA_CERT_FILE", ""),
ClientKeyPass: GetEnv("KAFKA_SSL_CLIENT_KEY_PASS", ""),
// Optional password
ClientKeyPass: GetEnv("KAFKA_SSL_CLIENT_KEY_PASS", ""),
Copy link
Contributor

@ukclivecox ukclivecox Jan 18, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I assume if password is set to empty string as no env setting its ok later when its set in Kafka options? Will it be ignored?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, it will be ignored 😃 It has been tested in the worker_test.go file

}
return &sslElements

}
14 changes: 11 additions & 3 deletions executor/logger/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,10 +45,18 @@ func NewWorker(id int, workQueue chan LogRequest, log logr.Logger, sdepName stri
if util.GetKafkaSecurityProtocol() == "SSL" {
sslKafka := util.GetSslElements()
producerConfigMap["security.protocol"] = util.GetKafkaSecurityProtocol()
producerConfigMap["ssl.ca.location"] = sslKafka.CACertFile
producerConfigMap["ssl.key.location"] = sslKafka.ClientKeyFile
producerConfigMap["ssl.certificate.location"] = sslKafka.ClientCertFile
if sslKafka.CACertFile != "" && sslKafka.ClientCertFile != "" {
producerConfigMap["ssl.ca.location"] = sslKafka.CACertFile
producerConfigMap["ssl.key.location"] = sslKafka.ClientKeyFile
producerConfigMap["ssl.certificate.location"] = sslKafka.ClientCertFile
}
if sslKafka.CACert != "" && sslKafka.ClientCert != "" {
producerConfigMap["ssl.ca.pem"] = sslKafka.CACert
producerConfigMap["ssl.key.pem"] = sslKafka.ClientKey
producerConfigMap["ssl.certificate.pem"] = sslKafka.ClientCert
}
producerConfigMap["ssl.key.password"] = sslKafka.ClientKeyPass // Key password, if any

}

producer, err = kafka.NewProducer(&producerConfigMap)
Expand Down
67 changes: 67 additions & 0 deletions executor/logger/worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,3 +191,70 @@ func TestWorkerKafkaConfigurations(t *testing.T) {
})
}
}

func TestWorkerKafkaConfigurationsString(t *testing.T) {
type test struct {
name string
kafkaConfig kafka.ConfigMap
expectError bool
}

g := NewWithT(t)

tests := []test{
{
name: "All options are valid",
kafkaConfig: kafka.ConfigMap{
"security.protocol": "SSL",
"ssl.ca.pem": caPEM,
"ssl.key.pem": keyPEM,
"ssl.certificate.pem": certPEM,
"ssl.key.password": keyPassword,
},
expectError: false,
},
{
name: "CA cert location is invalid",
kafkaConfig: kafka.ConfigMap{
"security.protocol": "SSL",
"ssl.ca.pem": "foobar",
"ssl.key.pem": keyPEM,
"ssl.certificate.pem": certPEM,
"ssl.key.password": keyPassword,
},
expectError: true,
},
{
name: "Private key file location is invalid",
kafkaConfig: kafka.ConfigMap{
"security.protocol": "SSL",
"ssl.ca.pem": caPEM,
"ssl.key.pem": "foobar",
"ssl.certificate.pem": certPEM,
"ssl.key.password": keyPassword,
},
expectError: true,
},
{
name: "Public key certificate location is invalid",
kafkaConfig: kafka.ConfigMap{
"security.protocol": "SSL",
"ssl.ca.pem": caPEM,
"ssl.key.pem": keyPEM,
"ssl.certificate.pem": "foobar",
"ssl.key.password": keyPassword,
},
expectError: true,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
_, err := kafka.NewProducer(&tt.kafkaConfig)
if tt.expectError {
g.Expect(err).ToNot(BeNil())
} else {
g.Expect(err).To(BeNil())
}
})
}
}