diff --git a/doc/source/analytics/logging.md b/doc/source/analytics/logging.md index 978777455b..251ef61fb0 100644 --- a/doc/source/analytics/logging.md +++ b/doc/source/analytics/logging.md @@ -80,6 +80,60 @@ The two required environment variables are: * LOGGER_KAFKA_BROKER : The Kafka Broker service endpoint. * LOGGER_KAFKA_TOPIC : The kafka Topic to log the requests. +### Logging to encrypted Kafka with SSL + +You can log requests to an encrypted Kafka with SSL. SSL uses private-key/ certificate pairs, which are used during the SSL handshake process. + +To be able to log payloads, the client needs: +* to authenticate with SSL +* its own keystore, made up of a key pair and a signed certificate +* the CA certificate used to sign the key-certificate pair + +The CA certificate needs to be recognised by the broker and can also be used for verifying the broker's certificate. + +It is possible to read more about the different options available on the [Confluent documentation](https://docs.confluent.io/platform/current/kafka/authentication_ssl.html) and [librdkafka Configuration](https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md) pages. + +Here is an example on how to define these for a deployment: + +```yaml +apiVersion: machinelearning.seldon.io/v1 +kind: SeldonDeployment +metadata: + name: cifar10 + namespace: seldon +spec: + name: resnet32 + predictors: + - graph: + implementation: TRITON_SERVER + logger: + mode: all + modelUri: gs://seldon-models/triton/tf_cifar10 + name: cifar10 + name: default + svcOrchSpec: + env: + - name: LOGGER_KAFKA_BROKER + value: seldon-kafka-plain-0.kafka:9092 + - name: LOGGER_KAFKA_TOPIC + value: seldon + - name: KAFKA_SECURITY_PROTOCOL + value: ssl + - name: KAFKA_SSL_CA_CERT_FILE + value: /path/to/ca.pem + - name: KAFKA_SSL_CLIENT_CERT_FILE + value: /path/to/access.cert + - name: KAFKA_SSL_CLIENT_KEY_FILE + value: /path/to/access.key + - name: KAFKA_SSL_CLIENT_KEY_PASS + valueFrom: + secretKeyRef: + name: my-kafka-secret + key: ssl-password # Key password, if any (optional field) + replicas: 1 + protocol: kfserving + +``` Follow a [benchmarking notebook for CIFAR10 image payload logging showing 3K predictions per second with Triton Inference Server](../examples/kafka_logger.html). ## Setting Global Default diff --git a/executor/.gitignore b/executor/.gitignore index 1e81bc847d..0ce5804b55 100644 --- a/executor/.gitignore +++ b/executor/.gitignore @@ -10,3 +10,4 @@ openapi/ executor/api/rest/openapi/ triton-inference-server/ _operator +certs/ diff --git a/executor/api/kafka/server.go b/executor/api/kafka/server.go index f428505080..e7d28555a3 100644 --- a/executor/api/kafka/server.go +++ b/executor/api/kafka/server.go @@ -2,6 +2,13 @@ package kafka import ( "fmt" + "net/url" + "os" + "os/signal" + "reflect" + "syscall" + "time" + "github.com/cloudevents/sdk-go/pkg/bindings/http" "github.com/confluentinc/confluent-kafka-go/kafka" "github.com/go-logr/logr" @@ -13,14 +20,9 @@ import ( "github.com/seldonio/seldon-core/executor/api/grpc/tensorflow" "github.com/seldonio/seldon-core/executor/api/payload" "github.com/seldonio/seldon-core/executor/api/rest" + "github.com/seldonio/seldon-core/executor/api/util" "github.com/seldonio/seldon-core/executor/predictor" v1 "github.com/seldonio/seldon-core/operator/apis/machinelearning.seldon.io/v1" - "net/url" - "os" - "os/signal" - "reflect" - "syscall" - "time" ) const ( @@ -76,12 +78,23 @@ func NewKafkaServer(fullGraph bool, workers int, deploymentName, namespace, prot return nil, fmt.Errorf("Unknown transport %s", transport) } } + var producerConfigMap = kafka.ConfigMap{"bootstrap.servers": broker, + "go.delivery.reports": false, // Need this othewise will get memory leak + } + if broker != "" { + if util.GetKafkaSecurityProtocol() == "SSL" { + sslKakfaServer := util.GetSslElements() + producerConfigMap["security.protocol"] = util.GetKafkaSecurityProtocol() + producerConfigMap["ssl.ca.location"] = sslKakfaServer.CACertFile + producerConfigMap["ssl.key.location"] = sslKakfaServer.ClientKeyFile + producerConfigMap["ssl.certificate.location"] = sslKakfaServer.ClientCertFile + producerConfigMap["ssl.key.password"] = sslKakfaServer.ClientKeyPass // Key password, if any + } + } // Create Producer log.Info("Creating producer", "broker", broker) - p, err := kafka.NewProducer(&kafka.ConfigMap{"bootstrap.servers": broker, - "go.delivery.reports": false, // Need this othewise will get memory leak - }) + p, err := kafka.NewProducer(&producerConfigMap) if err != nil { return nil, err } diff --git a/executor/api/util/utils.go b/executor/api/util/utils.go index e8e8e46d72..df1eef5cad 100644 --- a/executor/api/util/utils.go +++ b/executor/api/util/utils.go @@ -4,6 +4,7 @@ import ( "encoding/json" "os" "strconv" + "strings" "github.com/golang/protobuf/jsonpb" "github.com/seldonio/seldon-core/executor/api/grpc/seldon/proto" @@ -141,3 +142,28 @@ func GetEnvAsBool(key string, fallback bool) bool { return fallback } + +type SslKakfa struct { + ClientCertFile string + ClientKeyFile string + CACertFile string + ClientKeyPass string +} + +func (o SslKakfa) String() string { + return "SslKakfa" +} + +func GetKafkaSecurityProtocol() string { + return strings.ToUpper(GetEnv("KAFKA_SECURITY_PROTOCOL", "")) +} + +func GetSslElements() *SslKakfa { + sslElements := SslKakfa{ + ClientCertFile: GetEnv("KAFKA_SSL_CLIENT_CERT_FILE", ""), + ClientKeyFile: GetEnv("KAFKA_SSL_CLIENT_KEY_FILE", ""), + CACertFile: GetEnv("KAFKA_SSL_CA_CERT_FILE", ""), + ClientKeyPass: GetEnv("KAFKA_SSL_CLIENT_KEY_PASS", ""), + } + return &sslElements +} diff --git a/executor/api/util/utils_test.go b/executor/api/util/utils_test.go index 3006c57f2f..6dcc5a9a25 100644 --- a/executor/api/util/utils_test.go +++ b/executor/api/util/utils_test.go @@ -141,3 +141,10 @@ func TestInjectRouteSeldonJson(t *testing.T) { g.Expect(routes).To(Equal(testRouting)) } + +func TestSSLSecurityProtocol(t *testing.T) { + g := NewGomegaWithT(t) + os.Setenv("KAFKA_SECURITY_PROTOCOL", "ssl") + val := GetKafkaSecurityProtocol() + g.Expect(val).To(Equal("SSL")) +} diff --git a/executor/logger/dispatcher.go b/executor/logger/dispatcher.go index 07126dac53..a130b8565f 100644 --- a/executor/logger/dispatcher.go +++ b/executor/logger/dispatcher.go @@ -1,8 +1,9 @@ package logger import ( - "github.com/go-logr/logr" "os" + + "github.com/go-logr/logr" ) const ( @@ -25,7 +26,6 @@ func StartDispatcher(nworkers int, logBufferSize int, writeTimeoutMs int, log lo workQueue = make(chan LogRequest, logBufferSize) writeTimeoutMilliseconds = writeTimeoutMs - // Now, create all of our workers. for i := 0; i < nworkers; i++ { log.Info("Starting", "worker", i+1) diff --git a/executor/logger/worker.go b/executor/logger/worker.go index 790ee9510d..e5efb77b6a 100644 --- a/executor/logger/worker.go +++ b/executor/logger/worker.go @@ -12,6 +12,7 @@ import ( "github.com/confluentinc/confluent-kafka-go/kafka" "github.com/go-logr/logr" "github.com/seldonio/seldon-core/executor/api/payload" + "github.com/seldonio/seldon-core/executor/api/util" ) const ( @@ -37,10 +38,20 @@ func NewWorker(id int, workQueue chan LogRequest, log logr.Logger, sdepName stri var err error if kafkaBroker != "" { log.Info("Creating producer", "broker", kafkaBroker, "topic", kafkaTopic) - producer, err = kafka.NewProducer(&kafka.ConfigMap{ - "bootstrap.servers": kafkaBroker, + var producerConfigMap = kafka.ConfigMap{"bootstrap.servers": kafkaBroker, "go.delivery.reports": false, // Need this othewise will get memory leak - }) + } + log.Info("kafkaSecurityProtocol", "kafkaSecurityProtocol", util.GetKafkaSecurityProtocol()) + if util.GetKafkaSecurityProtocol() == "SSL" { + sslKafka := util.GetSslElements() + producerConfigMap["security.protocol"] = util.GetKafkaSecurityProtocol() + producerConfigMap["ssl.ca.location"] = sslKafka.CACertFile + producerConfigMap["ssl.key.location"] = sslKafka.ClientKeyFile + producerConfigMap["ssl.certificate.location"] = sslKafka.ClientCertFile + producerConfigMap["ssl.key.password"] = sslKafka.ClientKeyPass // Key password, if any + } + + producer, err = kafka.NewProducer(&producerConfigMap) if err != nil { return nil, err } @@ -94,7 +105,6 @@ func getCEType(logReq LogRequest) (string, error) { } func (w *Worker) sendKafkaEvent(logReq LogRequest) error { - reqType, err := getCEType(logReq) if err != nil { return err @@ -109,7 +119,7 @@ func (w *Worker) sendKafkaEvent(logReq LogRequest) error { {Key: NamespaceAttr, Value: []byte(w.Namespace)}, {Key: EndpointAttr, Value: []byte(w.PredictorName)}, } - + w.Log.Info("kafkaHeaders is", "kafkaHeaders", kafkaHeaders) err = w.Producer.Produce(&kafka.Message{ TopicPartition: kafka.TopicPartition{Topic: &w.KafkaTopic, Partition: kafka.PartitionAny}, Value: *logReq.Bytes, diff --git a/executor/logger/worker_test.go b/executor/logger/worker_test.go new file mode 100644 index 0000000000..517309c744 --- /dev/null +++ b/executor/logger/worker_test.go @@ -0,0 +1,193 @@ +package logger + +import ( + "io/ioutil" + "log" + "os" + + _ "net/http/pprof" + "testing" + + "github.com/confluentinc/confluent-kafka-go/kafka" + . "github.com/onsi/gomega" +) + +const ( + certPEM = `-----BEGIN CERTIFICATE----- +MIID2zCCAsOgAwIBAgIJAMSqbewCgw4xMA0GCSqGSIb3DQEBCwUAMGAxCzAJBgNV +BAYTAlVTMRMwEQYDVQQIDApDYWxpZm9ybmlhMRYwFAYDVQQHDA1TYW4gRnJhbmNp +c2NvMRAwDgYDVQQKDAdTZWdtZW50MRIwEAYDVQQDDAlsb2NhbGhvc3QwHhcNMTcx +MjIzMTU1NzAxWhcNMjcxMjIxMTU1NzAxWjBgMQswCQYDVQQGEwJVUzETMBEGA1UE +CAwKQ2FsaWZvcm5pYTEWMBQGA1UEBwwNU2FuIEZyYW5jaXNjbzEQMA4GA1UECgwH +U2VnbWVudDESMBAGA1UEAwwJbG9jYWxob3N0MIIBIjANBgkqhkiG9w0BAQEFAAOC +AQ8AMIIBCgKCAQEAtda9OWKYNtINe/BKAoB+/zLg2qbaTeHN7L722Ug7YoY6zMVB +aQEHrUmshw/TOrT7GLN/6e6rFN74UuNg72C1tsflZvxqkGdrup3I3jxMh2ApAxLi +zem/M6Eke2OAqt+SzRPqc5GXH/nrWVd3wqg48DZOAR0jVTY2e0fWy+Er/cPJI1lc +L6ZMIRJikHTXkaiFj2Jct1iWvgizx5HZJBxXJn2Awix5nvc+zmXM0ZhoedbJRoBC +dGkRXd3xv2F4lqgVHtP3Ydjc/wYoPiGudSAkhyl9tnkHjvIjA/LeRNshWHbCIaQX +yemnXIcyyf+W+7EK0gXio7uiP+QSoM5v/oeVMQIDAQABo4GXMIGUMHoGA1UdIwRz +MHGhZKRiMGAxCzAJBgNVBAYTAlVTMRMwEQYDVQQIDApDYWxpZm9ybmlhMRYwFAYD +VQQHDA1TYW4gRnJhbmNpc2NvMRAwDgYDVQQKDAdTZWdtZW50MRIwEAYDVQQDDAls +b2NhbGhvc3SCCQCBYUuEuypDMTAJBgNVHRMEAjAAMAsGA1UdDwQEAwIE8DANBgkq +hkiG9w0BAQsFAAOCAQEATk6IlVsXtNp4C1yeegaM+jE8qgKJfNm1sV27zKx8HPiO +F7LvTGYIG7zd+bf3pDSwRxfBhsLEwmN9TUN1d6Aa9zeu95qOnR76POfHILgttu2w +IzegO8I7BycnLjU9o/l9gCpusnN95tIYQhfD08ygUpYTQRuI0cmZ/Dp3xb0S9f5N +miYTuUoStYSA4RWbDWo+Is9YWPu7rwieziOZ96oguGz3mtqvkjxVAQH1xZr3bKHr +HU9LpQh0i6oTK0UCqnDwlhJl1c7A3UooxFpc3NGxyjogzTfI/gnBKfPo7eeswwsV +77rjIkhBW49L35KOo1uyblgK1vTT7VPtzJnuDq3ORg== +-----END CERTIFICATE-----` + + keyPEM = `-----BEGIN RSA PRIVATE KEY----- +MIIEowIBAAKCAQEAtda9OWKYNtINe/BKAoB+/zLg2qbaTeHN7L722Ug7YoY6zMVB +aQEHrUmshw/TOrT7GLN/6e6rFN74UuNg72C1tsflZvxqkGdrup3I3jxMh2ApAxLi +zem/M6Eke2OAqt+SzRPqc5GXH/nrWVd3wqg48DZOAR0jVTY2e0fWy+Er/cPJI1lc +L6ZMIRJikHTXkaiFj2Jct1iWvgizx5HZJBxXJn2Awix5nvc+zmXM0ZhoedbJRoBC +dGkRXd3xv2F4lqgVHtP3Ydjc/wYoPiGudSAkhyl9tnkHjvIjA/LeRNshWHbCIaQX +yemnXIcyyf+W+7EK0gXio7uiP+QSoM5v/oeVMQIDAQABAoIBAQCa6roHW8JGYipu +vsau3v5TOOtsHN67n3arDf6MGwfM5oLN1ffmF6SMs8myv36781hBMRv3FwjWHSf+ +pgz9o6zsbd05Ii8/m3yiXq609zZT107ZeYuU1mG5AL5uCNWjvhn5cdA6aX0RFwC0 ++tnjEyJ/NCS8ujBR9n/wA8IxrEKoTGcxRb6qFPPKWYoBevu34td1Szf0kH8AKjtQ +rdPK0Of/ZEiAUxNMLTBEOmC0ZabxJV/YGWcUU4DpmEDZSgQSr4yLT4BFUwF2VC8t +8VXn5dBP3RMo4h7JlteulcKYsMQZXD6KvUwY2LaEpFM/b14r+TZTUQGhwS+Ha11m +xa4eNwFhAoGBANshGlpR9cUUq8vNex0Wb63P9BTRTXwg1yEJVMSua+DlaaqaX/hS +hOxl3K4y2V5OCK31C+SOAqqbrGtMXVym5c5pX8YyC11HupFJwdFLUEc74uF3CtWY +GMMvEvItCK5ZvYvS5I2CQGcp1fhEMle/Uz+hFi1eeWepMqgHbVx5vkdtAoGBANRv +XYQsTAGSkhcHB++/ASDskAew5EoHfwtJzSX0BZC6DCACF/U4dCKzBVndOrELOPXs +2CZXCG4ptWzNgt6YTlMX9U7nLei5pPjoivIJsMudnc22DrDS7C94rCk++M3JeLOM +KSN0ou9+1iEdE7rQdMgZMryaY71OBonCIDsWgJZVAoGAB+k0CFq5IrpSUXNDpJMw +yPee+jlsMLUGzzyFAOzDHEVsASq9mDtybQ5oXymay1rJ2W3lVgUCd6JTITSKklO8 +LC2FtaQM4Ps78w7Unne3mDrDQByKGZf6HOHQL0oM7C51N10Pv0Qaix7piKL9pklT ++hIYuN6WR3XGTGaoPhRvGCkCgYBqaQ5y8q1v7Dd5iXAUS50JHPZYo+b2niKpSOKW +LFHNWSRRtDrD/u9Nolb/2K1ZmcGCjo0HR3lVlVbnlVoEnk49mTaru2lntfZJKFLR +QsFofR9at+NL95uPe+bhEkYW7uCjL4Y72GT1ipdAJwyG+3xD7ztW9g8X+EmWH8N9 +VZw7sQKBgGxp820jbjWhG1O9RnYLwflcZzUlSkhWJDg9tKJXBjD+hFX98Okuf0gu +DUpdbxbJHSi0xAjOjLVswNws4pVwzgtZVK8R7k8j3Z5TtYTJTSQLfgVowuyEdAaI +C8OxVJ/At/IJGnWSIz8z+/YCUf7p4jd2LJgmZVVzXeDsOFcH62gu +-----END RSA PRIVATE KEY-----` + + caPEM = `-----BEGIN CERTIFICATE----- +MIIDPDCCAiQCCQCBYUuEuypDMTANBgkqhkiG9w0BAQsFADBgMQswCQYDVQQGEwJV +UzETMBEGA1UECAwKQ2FsaWZvcm5pYTEWMBQGA1UEBwwNU2FuIEZyYW5jaXNjbzEQ +MA4GA1UECgwHU2VnbWVudDESMBAGA1UEAwwJbG9jYWxob3N0MB4XDTE3MTIyMzE1 +NTMxOVoXDTI3MTIyMTE1NTMxOVowYDELMAkGA1UEBhMCVVMxEzARBgNVBAgMCkNh +bGlmb3JuaWExFjAUBgNVBAcMDVNhbiBGcmFuY2lzY28xEDAOBgNVBAoMB1NlZ21l +bnQxEjAQBgNVBAMMCWxvY2FsaG9zdDCCASIwDQYJKoZIhvcNAQEBBQADggEPADCC +AQoCggEBAJwB+Yp6MyUepgtaRDxVjpMI2RmlAaV1qApMWu60LWGKJs4KWoIoLl6p +oSEqnWrpMmb38pyGP99X1+t3uZjiK9L8nFhuKZ581tsTKLxaSl+YVg7JbH5LVCS6 +opsfB5ON1gJxf1HA9YyMqKHkBFh8/hdOGR0T6Bll9TPO1NQB/UqMy/tKr3sA3KZm +XVDbRKSuUAQWz5J9/hLPmVMU41F/uD7mvyDY+x8GymInZjUXG4e0oq2RJgU6SYZ8 +mkscM6qhKY3mL487w/kHVFtFlMkOhvI7LIh3zVvWwgGSAoAv9yai9BDZNFSk0cEb +bb/IK7BQW9sNI3lcnGirdbnjV94X9/sCAwEAATANBgkqhkiG9w0BAQsFAAOCAQEA +MJLeGdYO3dpsPx2R39Bw0qa5cUh42huPf8n7rp4a4Ca5jJjcAlCYV8HzqOzpiKYy +ZNuHy8LnNVYYh5Qoh8EO45bplMV1wnHfi6hW6DY5j3SQdcxkoVsW5R7rBF7a7SDg +6uChVRPHgsnALUUc7Wvvd3sAs/NKHzHu86mgD3EefkdqWAaCapzcqT9mo9KXkWJM +DhSJS+/iIaroc8umDnbPfhhgnlMf0/D4q0TjiLSSqyLzVifxnv9yHz56TrhHG/QP +E/8+FEGCHYKM4JLr5smGlzv72Kfx9E1CkG6TgFNIHjipVv1AtYDvaNMdPF2533+F +wE3YmpC3Q0g9r44nEbz4Bw== +-----END CERTIFICATE-----` + keyPassword = "" +) + +type certsConfig struct { + CertPEM string + KeyPEM string + KeyPassword string + CaPEM string +} + +func tlsConfig(t *testing.T) *certsConfig { + tmpDir := t.TempDir() + + tmpFileCa := writeTmpFile(tmpDir, "test-ca.pem", []byte(caPEM)) + tmpFileKey := writeTmpFile(tmpDir, "test-key.key", []byte(keyPEM)) + tmpFileCert := writeTmpFile(tmpDir, "test-cert.cert", []byte(certPEM)) + + return &certsConfig{ + CertPEM: tmpFileCert.Name(), + KeyPEM: tmpFileKey.Name(), + KeyPassword: keyPassword, + CaPEM: tmpFileCa.Name(), + } +} + +func writeTmpFile(dirname string, filename string, contents []byte) *os.File { + f, err := ioutil.TempFile(dirname, filename) + if err != nil { + log.Fatalf("cannot create temporary file %s/%s: %v", dirname, filename, err) + } + + _, err = f.Write(contents) + if err != nil { + log.Fatalf("cannot write to temporary file %s/%s: %v", dirname, filename, err) + } + + return f +} + +func TestWorkerKafkaConfigurations(t *testing.T) { + type test struct { + name string + kafkaConfig kafka.ConfigMap + expectError bool + } + + g := NewWithT(t) + config := tlsConfig(t) + + tests := []test{ + { + name: "All options are valid", + kafkaConfig: kafka.ConfigMap{ + "security.protocol": "SSL", + "ssl.ca.location": config.CaPEM, + "ssl.key.location": config.KeyPEM, + "ssl.certificate.location": config.CertPEM, + "ssl.key.password": config.KeyPassword, + }, + expectError: false, + }, + { + name: "CA cert location is invalid", + kafkaConfig: kafka.ConfigMap{ + "security.protocol": "SSL", + "ssl.ca.location": "/fakepath/to/test-ca.pem", + "ssl.key.location": config.KeyPEM, + "ssl.certificate.location": config.CertPEM, + "ssl.key.password": config.KeyPassword, + }, + expectError: true, + }, + { + name: "Private key file location is invalid", + kafkaConfig: kafka.ConfigMap{ + "security.protocol": "SSL", + "ssl.ca.location": config.CaPEM, + "ssl.key.location": "/fakepath/to/test-access.key", + "ssl.certificate.location": config.CertPEM, + "ssl.key.password": config.KeyPassword, + }, + expectError: true, + }, + { + name: "Public key certificate location is invalid", + kafkaConfig: kafka.ConfigMap{ + "security.protocol": "SSL", + "ssl.ca.location": config.CaPEM, + "ssl.key.location": config.KeyPEM, + "ssl.certificate.location": "/fakepath/to/test-cert.cert", + "ssl.key.password": config.KeyPassword, + }, + expectError: true, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + _, err := kafka.NewProducer(&tt.kafkaConfig) + if tt.expectError { + g.Expect(err).ToNot(BeNil()) + } else { + g.Expect(err).To(BeNil()) + } + }) + } +} diff --git a/executor/predictor/predictor_process_test.go b/executor/predictor/predictor_process_test.go index 8aacf268c8..fb310a20da 100644 --- a/executor/predictor/predictor_process_test.go +++ b/executor/predictor/predictor_process_test.go @@ -7,9 +7,10 @@ import ( "net/http" "net/http/httptest" "net/url" - "sigs.k8s.io/controller-runtime/pkg/log/zap" "testing" + "sigs.k8s.io/controller-runtime/pkg/log/zap" + "github.com/golang/protobuf/jsonpb" . "github.com/onsi/gomega" "github.com/seldonio/seldon-core/executor/api/grpc" @@ -17,7 +18,7 @@ import ( "github.com/seldonio/seldon-core/executor/api/payload" "github.com/seldonio/seldon-core/executor/api/test" "github.com/seldonio/seldon-core/executor/logger" - "github.com/seldonio/seldon-core/operator/apis/machinelearning.seldon.io/v1" + v1 "github.com/seldonio/seldon-core/operator/apis/machinelearning.seldon.io/v1" logf "sigs.k8s.io/controller-runtime/pkg/log" )