
Seldon add ssl #3813

Merged 20 commits on Jan 17, 2022
53 changes: 53 additions & 0 deletions doc/source/analytics/logging.md
@@ -80,6 +80,59 @@ The two required environment variables are:
* LOGGER_KAFKA_BROKER : The Kafka Broker service endpoint.
* LOGGER_KAFKA_TOPIC : The kafka Topic to log the requests.

### Logging to encrypted Kafka with SSL

You can log requests to a Kafka cluster encrypted with SSL. SSL uses private-key/certificate pairs, which are exchanged and verified during the SSL handshake.

To log payloads, the client needs:
* to authenticate with SSL
* its own keystore, consisting of a key pair and a signed certificate
* the CA certificate used to sign the key/certificate pair

The CA certificate needs to be recognised by the broker and can also be used to verify the broker's certificate. You can read more about the available options in the [Confluent documentation](https://docs.confluent.io/platform/current/kafka/authentication_ssl.html) and on the [librdkafka Configuration](https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md) page.

Here is an example of how to define these environment variables for a deployment:

```yaml
apiVersion: machinelearning.seldon.io/v1
kind: SeldonDeployment
metadata:
  name: cifar10
  namespace: seldon
spec:
  name: resnet32
  predictors:
    - graph:
        implementation: TRITON_SERVER
        logger:
          mode: all
        modelUri: gs://seldon-models/triton/tf_cifar10
        name: cifar10
      name: default
      svcOrchSpec:
        env:
          - name: LOGGER_KAFKA_BROKER
            value: seldon-kafka-plain-0.kafka:9092
          - name: LOGGER_KAFKA_TOPIC
            value: seldon
          - name: KAFKA_SECURITY_PROTOCOL
            value: ssl
          - name: KAFKA_SSL_CA_CERT_FILE
            value: /path/to/ca.pem
          - name: KAFKA_SSL_CLIENT_CERT_FILE
            value: /path/to/access.cert
          - name: KAFKA_SSL_CLIENT_KEY_FILE
            value: /path/to/access.key
          - name: KAFKA_SSL_CLIENT_KEY_PASS
            valueFrom:
              secretKeyRef:
                name: my-kafka-secret
                key: ssl-password # Key password, if any (optional field)
      replicas: 1
  protocol: kfserving
```
Follow a [benchmarking notebook for CIFAR10 image payload logging showing 3K predictions per second with Triton Inference Server](../examples/kafka_logger.html).

## Setting Global Default
1 change: 1 addition & 0 deletions executor/.gitignore
@@ -10,3 +10,4 @@ openapi/
executor/api/rest/openapi/
triton-inference-server/
_operator
certs/
1 change: 1 addition & 0 deletions executor/Dockerfile.executor
@@ -17,6 +17,7 @@ COPY api/ api/
COPY predictor/ predictor/
COPY logger/ logger/
COPY k8s/ k8s/
COPY certs/ certs/

# Build
RUN go build -a -o executor cmd/executor/main.go
31 changes: 22 additions & 9 deletions executor/api/kafka/server.go
@@ -2,6 +2,13 @@ package kafka

import (
"fmt"
"net/url"
"os"
"os/signal"
"reflect"
"syscall"
"time"

"github.com/cloudevents/sdk-go/pkg/bindings/http"
"github.com/confluentinc/confluent-kafka-go/kafka"
"github.com/go-logr/logr"
@@ -13,14 +20,9 @@ import (
"github.com/seldonio/seldon-core/executor/api/grpc/tensorflow"
"github.com/seldonio/seldon-core/executor/api/payload"
"github.com/seldonio/seldon-core/executor/api/rest"
"github.com/seldonio/seldon-core/executor/api/util"
"github.com/seldonio/seldon-core/executor/predictor"
v1 "github.com/seldonio/seldon-core/operator/apis/machinelearning.seldon.io/v1"
"net/url"
"os"
"os/signal"
"reflect"
"syscall"
"time"
)

const (
@@ -76,12 +78,23 @@ func NewKafkaServer(fullGraph bool, workers int, deploymentName, namespace, prot
return nil, fmt.Errorf("Unknown transport %s", transport)
}
}
sslKafkaServer := util.GetSslElements()
var producerConfigMap = kafka.ConfigMap{"bootstrap.servers": broker,
	"go.delivery.reports": false, // Need this otherwise we will get a memory leak
}
if broker != "" {
	if util.GetKafkaSecurityProtocol() == "SSL" {
		producerConfigMap["security.protocol"] = util.GetKafkaSecurityProtocol()
		producerConfigMap["ssl.ca.location"] = sslKafkaServer.CACertFile
		producerConfigMap["ssl.key.location"] = sslKafkaServer.ClientKeyFile
		producerConfigMap["ssl.certificate.location"] = sslKafkaServer.ClientCertFile
		producerConfigMap["ssl.key.password"] = sslKafkaServer.ClientKeyPass // Key password, if any
	}
}
// Create Producer
log.Info("Creating producer", "broker", broker)
p, err := kafka.NewProducer(&producerConfigMap)
if err != nil {
return nil, err
}
26 changes: 26 additions & 0 deletions executor/api/util/utils.go
@@ -4,6 +4,7 @@ import (
"encoding/json"
"os"
"strconv"
"strings"

"github.com/golang/protobuf/jsonpb"
"github.com/seldonio/seldon-core/executor/api/grpc/seldon/proto"
@@ -141,3 +142,28 @@ func GetEnvAsBool(key string, fallback bool) bool {

return fallback
}

type SslKafka struct {
	ClientCertFile string
	ClientKeyFile  string
	CACertFile     string
	ClientKeyPass  string
}

func (o SslKafka) String() string {
	return "SslKafka"
}

func GetKafkaSecurityProtocol() string {
	return strings.ToUpper(GetEnv("KAFKA_SECURITY_PROTOCOL", ""))
}

func GetSslElements() *SslKafka {
	sslElements := SslKafka{
		ClientCertFile: GetEnv("KAFKA_SSL_CLIENT_CERT_FILE", ""),
		ClientKeyFile:  GetEnv("KAFKA_SSL_CLIENT_KEY_FILE", ""),
		CACertFile:     GetEnv("KAFKA_SSL_CA_CERT_FILE", ""),
		ClientKeyPass:  GetEnv("KAFKA_SSL_CLIENT_KEY_PASS", ""),
	}
	return &sslElements
}
7 changes: 7 additions & 0 deletions executor/api/util/utils_test.go
@@ -141,3 +141,10 @@ func TestInjectRouteSeldonJson(t *testing.T) {

g.Expect(routes).To(Equal(testRouting))
}

func TestSSLSecurityProtocol(t *testing.T) {
	g := NewGomegaWithT(t)
	os.Setenv("KAFKA_SECURITY_PROTOCOL", "ssl")
	defer os.Unsetenv("KAFKA_SECURITY_PROTOCOL")
	val := GetKafkaSecurityProtocol()
	g.Expect(val).To(Equal("SSL"))
}
4 changes: 2 additions & 2 deletions executor/logger/dispatcher.go
@@ -1,8 +1,9 @@
package logger

import (
"github.com/go-logr/logr"
"os"

"github.com/go-logr/logr"
)

const (
@@ -25,7 +26,6 @@ func StartDispatcher(nworkers int, logBufferSize int, writeTimeoutMs int, log lo

workQueue = make(chan LogRequest, logBufferSize)
writeTimeoutMilliseconds = writeTimeoutMs

// Now, create all of our workers.
for i := 0; i < nworkers; i++ {
log.Info("Starting", "worker", i+1)
20 changes: 15 additions & 5 deletions executor/logger/worker.go
@@ -12,6 +12,7 @@ import (
"github.com/confluentinc/confluent-kafka-go/kafka"
"github.com/go-logr/logr"
"github.com/seldonio/seldon-core/executor/api/payload"
"github.com/seldonio/seldon-core/executor/api/util"
)

const (
@@ -35,12 +36,22 @@ func NewWorker(id int, workQueue chan LogRequest, log logr.Logger, sdepName stri

var producer *kafka.Producer
var err error
sslKafka := util.GetSslElements()
if kafkaBroker != "" {
log.Info("Creating producer", "broker", kafkaBroker, "topic", kafkaTopic)
var producerConfigMap = kafka.ConfigMap{"bootstrap.servers": kafkaBroker,
	"go.delivery.reports": false, // Need this otherwise we will get a memory leak
}
log.Info("kafkaSecurityProtocol", "kafkaSecurityProtocol", util.GetKafkaSecurityProtocol())
if util.GetKafkaSecurityProtocol() == "SSL" {
producerConfigMap["security.protocol"] = util.GetKafkaSecurityProtocol()
producerConfigMap["ssl.ca.location"] = sslKafka.CACertFile
producerConfigMap["ssl.key.location"] = sslKafka.ClientKeyFile
producerConfigMap["ssl.certificate.location"] = sslKafka.ClientCertFile
producerConfigMap["ssl.key.password"] = sslKafka.ClientKeyPass // Key password, if any
}

producer, err = kafka.NewProducer(&producerConfigMap)
if err != nil {
return nil, err
}
@@ -94,7 +105,6 @@ func getCEType(logReq LogRequest) (string, error) {
}

func (w *Worker) sendKafkaEvent(logReq LogRequest) error {

reqType, err := getCEType(logReq)
if err != nil {
return err
@@ -109,7 +119,7 @@ func (w *Worker) sendKafkaEvent(logReq LogRequest) error {
{Key: NamespaceAttr, Value: []byte(w.Namespace)},
{Key: EndpointAttr, Value: []byte(w.PredictorName)},
}

w.Log.Info("kafkaHeaders is", "kafkaHeaders", kafkaHeaders)
err = w.Producer.Produce(&kafka.Message{
TopicPartition: kafka.TopicPartition{Topic: &w.KafkaTopic, Partition: kafka.PartitionAny},
Value: *logReq.Bytes,