Seldon add ssl #3813

Merged
merged 20 commits on Jan 17, 2022
Changes from 5 commits
1 change: 1 addition & 0 deletions executor/.gitignore
@@ -10,3 +10,4 @@ openapi/
executor/api/rest/openapi/
triton-inference-server/
_operator
certs/
2 changes: 2 additions & 0 deletions executor/Dockerfile.executor
@@ -17,6 +17,7 @@ COPY api/ api/
COPY predictor/ predictor/
COPY logger/ logger/
COPY k8s/ k8s/
COPY certs/ certs/

# Build
RUN go build -a -o executor cmd/executor/main.go
@@ -55,6 +56,7 @@ RUN chmod -R 660 /openapi/
FROM gcr.io/distroless/base-debian10:latest
WORKDIR /
COPY --from=builder /workspace/executor .
COPY --from=builder /workspace/certs/ /certs/
COPY licenses/license.txt licenses/license.txt
COPY --from=builder /workspace/*.tar.gz licenses/mpl_source/

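The Dockerfile change above bakes the local certs/ directory into the distroless image at /certs/, so certificate paths supplied via environment variables can point inside the container. Below is a minimal, hypothetical startup check (not part of this PR) that verifies those files are actually present before SSL settings are handed to the Kafka producer; the helper name and the main wrapper are illustrative only.

package main

import (
    "fmt"
    "os"
)

// checkCertFiles is a hypothetical helper: it confirms that each configured
// certificate path (for example files copied into /certs/ by the Dockerfile
// above) exists and is readable before SSL is enabled.
func checkCertFiles(paths ...string) error {
    for _, p := range paths {
        if p == "" {
            continue // unset paths are simply skipped
        }
        if _, err := os.Stat(p); err != nil {
            return fmt.Errorf("cert file %s not accessible: %w", p, err)
        }
    }
    return nil
}

func main() {
    // Read the same environment variables the executor uses for its SSL config.
    err := checkCertFiles(
        os.Getenv("KAFKA_SSL_CA_CERT_FILE"),
        os.Getenv("KAFKA_SSL_CLIENT_CERT_FILE"),
        os.Getenv("KAFKA_SSL_CLIENT_KEY_FILE"),
    )
    if err != nil {
        fmt.Println(err)
    }
}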
2 changes: 1 addition & 1 deletion executor/Makefile
@@ -8,7 +8,7 @@ IMG_REDHAT ?= seldonio/${IMG_VERSION_REDHAT}

EXECUTOR_FOLDERS ?= ./api/... ./predictor/... ./k8s/... ./logger/...

KIND_NAME ?= kind
KIND_NAME ?= ansible

# Get the currently used golang install path (in GOPATH/bin, unless GOBIN is set)
ifeq (,$(shell go env GOBIN))
63 changes: 49 additions & 14 deletions executor/api/kafka/server.go
@@ -2,6 +2,13 @@ package kafka

import (
"fmt"
"net/url"
"os"
"os/signal"
"reflect"
"syscall"
"time"

"github.com/cloudevents/sdk-go/pkg/bindings/http"
"github.com/confluentinc/confluent-kafka-go/kafka"
"github.com/go-logr/logr"
@@ -13,14 +20,9 @@ import (
"github.com/seldonio/seldon-core/executor/api/grpc/tensorflow"
"github.com/seldonio/seldon-core/executor/api/payload"
"github.com/seldonio/seldon-core/executor/api/rest"
"github.com/seldonio/seldon-core/executor/api/util"
"github.com/seldonio/seldon-core/executor/predictor"
v1 "github.com/seldonio/seldon-core/operator/apis/machinelearning.seldon.io/v1"
"net/url"
"os"
"os/signal"
"reflect"
"syscall"
"time"
)

const (
@@ -29,13 +31,22 @@
)

const (
ENV_KAFKA_BROKER = "KAFKA_BROKER"
ENV_KAFKA_INPUT_TOPIC = "KAFKA_INPUT_TOPIC"
ENV_KAFKA_OUTPUT_TOPIC = "KAFKA_OUTPUT_TOPIC"
ENV_KAFKA_FULL_GRAPH = "KAFKA_FULL_GRAPH"
ENV_KAFKA_WORKERS = "KAFKA_WORKERS"
ENV_KAFKA_BROKER = "KAFKA_BROKER"
ENV_KAFKA_INPUT_TOPIC = "KAFKA_INPUT_TOPIC"
ENV_KAFKA_OUTPUT_TOPIC = "KAFKA_OUTPUT_TOPIC"
ENV_KAFKA_FULL_GRAPH = "KAFKA_FULL_GRAPH"
ENV_KAFKA_WORKERS = "KAFKA_WORKERS"
ENV_KAFKA_SECURITY_PROTOCOL = "KAFKA_SECURITY_PROTOCOL"
)

type SslKakfa struct {
kafkaSslClientCertFile string
kafkaSslClientKeyFile string
kafkaSslCACertFile string
kafkaSecurityProtocol string
kafkaSslClientKeyPass string
}

type SeldonKafkaServer struct {
Client client.SeldonApiClient
Producer *kafka.Producer
Expand All @@ -49,6 +60,18 @@ type SeldonKafkaServer struct {
ServerUrl *url.URL
Workers int
Log logr.Logger
// KafkaSecurityProtocol string
}

func getSslElements() *SslKakfa {
sslElements := SslKakfa{
kafkaSslClientCertFile: util.GetEnv("KAFKA_SSL_CLIENT_CERT_FILE", ""),
kafkaSslClientKeyFile: util.GetEnv("KAFKA_SSL_CLIENT_KEY_FILE", ""),
kafkaSslCACertFile: util.GetEnv("KAFKA_SSL_CA_CERT_FILE", ""),
kafkaSecurityProtocol: util.GetEnv("KAFKA_SECURITY_PROTOCOL", ""),
kafkaSslClientKeyPass: util.GetEnv("KAFKA_SSL_CLIENT_KEY_PASS", ""),
}
return &sslElements
}

func NewKafkaServer(fullGraph bool, workers int, deploymentName, namespace, protocol, transport string, annotations map[string]string, serverUrl *url.URL, predictor *v1.PredictorSpec, broker, topicIn, topicOut string, log logr.Logger) (*SeldonKafkaServer, error) {
@@ -76,12 +99,24 @@ func NewKafkaServer(fullGraph bool, workers int, deploymentName, namespace, prot
return nil, fmt.Errorf("Unknown transport %s", transport)
}
}
sslKakfaServer := getSslElements()
var producerConfigMap = kafka.ConfigMap{"bootstrap.servers": broker,
"go.delivery.reports": false, // Need this othewise will get memory leak
}
if broker != "" {
if sslKakfaServer.kafkaSecurityProtocol != "" {
// producerConfigMap["debug"] = "security,broker,protocol,metadata,topic"
producerConfigMap["security.protocol"] = sslKakfaServer.kafkaSecurityProtocol
producerConfigMap["ssl.ca.location"] = sslKakfaServer.kafkaSslCACertFile
producerConfigMap["ssl.key.location"] = sslKakfaServer.kafkaSslClientKeyFile
producerConfigMap["ssl.certificate.location"] = sslKakfaServer.kafkaSslClientCertFile
producerConfigMap["ssl.key.password"] = sslKakfaServer.kafkaSslClientKeyPass // Key password, if any

}
}
// Create Producer
log.Info("Creating producer", "broker", broker)
p, err := kafka.NewProducer(&kafka.ConfigMap{"bootstrap.servers": broker,
"go.delivery.reports": false, // Need this othewise will get memory leak
})
p, err := kafka.NewProducer(&producerConfigMap)
if err != nil {
return nil, err
}
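Taken together, the server.go changes read the SSL settings from environment variables and translate them into librdkafka properties on the producer's ConfigMap. The sketch below condenses that flow into a single standalone function, assuming the same environment variable names used in the diff; the function itself (buildProducerConfig) is illustrative and does not exist in this PR.

package kafka

import (
    "github.com/confluentinc/confluent-kafka-go/kafka"

    "github.com/seldonio/seldon-core/executor/api/util"
)

// buildProducerConfig mirrors the logic above: start from the plain bootstrap
// config and, when a security protocol is set, layer the SSL file locations
// and key password on top.
func buildProducerConfig(broker string) kafka.ConfigMap {
    cfg := kafka.ConfigMap{
        "bootstrap.servers":   broker,
        "go.delivery.reports": false, // needed, otherwise the producer leaks memory
    }
    if protocol := util.GetEnv("KAFKA_SECURITY_PROTOCOL", ""); protocol != "" {
        cfg["security.protocol"] = protocol
        cfg["ssl.ca.location"] = util.GetEnv("KAFKA_SSL_CA_CERT_FILE", "")
        cfg["ssl.key.location"] = util.GetEnv("KAFKA_SSL_CLIENT_KEY_FILE", "")
        cfg["ssl.certificate.location"] = util.GetEnv("KAFKA_SSL_CLIENT_CERT_FILE", "")
        cfg["ssl.key.password"] = util.GetEnv("KAFKA_SSL_CLIENT_KEY_PASS", "") // empty if the key is unencrypted
    }
    return cfg
}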
28 changes: 26 additions & 2 deletions executor/logger/dispatcher.go
@@ -1,15 +1,29 @@
package logger

import (
"github.com/go-logr/logr"
"os"

"github.com/go-logr/logr"
"github.com/seldonio/seldon-core/executor/api/util"
)

const (
ENV_LOGGER_KAFKA_BROKER = "LOGGER_KAFKA_BROKER"
ENV_LOGGER_KAFKA_TOPIC = "LOGGER_KAFKA_TOPIC"
)

var WorkerQueue chan chan LogRequest

func getSslElements() *SslKakfa {
sslElements := SslKakfa{
kafkaSslClientCertFile: util.GetEnv("KAFKA_SSL_CLIENT_CERT_FILE", ""),
kafkaSslClientKeyFile: util.GetEnv("KAFKA_SSL_CLIENT_KEY_FILE", ""),
kafkaSslCACertFile: util.GetEnv("KAFKA_SSL_CA_CERT_FILE", ""),
kafkaSecurityProtocol: util.GetEnv("KAFKA_SECURITY_PROTOCOL", ""),
kafkaSslClientKeyPass: util.GetEnv("KAFKA_SSL_CLIENT_KEY_PASS", ""),
}
return &sslElements
}
func StartDispatcher(nworkers int, logBufferSize int, writeTimeoutMs int, log logr.Logger, sdepName string, namespace string, predictorName string, kafkaBroker string, kafkaTopic string) error {
if kafkaBroker == "" {
kafkaBroker = os.Getenv(ENV_LOGGER_KAFKA_BROKER)
@@ -22,14 +36,24 @@ func StartDispatcher(nworkers int, logBufferSize int, writeTimeoutMs int, log lo
kafkaTopic = "seldon"
}
}
sslKakfa := getSslElements()
log.Info("kafkaSslClientCertFile", "clientcertfile", sslKakfa.kafkaSslClientCertFile)
if sslKakfa.kafkaSslClientCertFile != "" && sslKakfa.kafkaSslClientKeyFile != "" && sslKakfa.kafkaSslCACertFile != "" {
if sslKakfa.kafkaSecurityProtocol == "" {
sslKakfa.kafkaSecurityProtocol = "ssl"
}

// if kafkaSecurityProtocol != "ssl" && kafkaSecurityProtocol != "sasl_ssl" {
// log.Error("invalid config: kafka security protocol is not ssl based but ssl config is provided")
// }
}
workQueue = make(chan LogRequest, logBufferSize)
writeTimeoutMilliseconds = writeTimeoutMs

// Now, create all of our workers.
for i := 0; i < nworkers; i++ {
log.Info("Starting", "worker", i+1)
worker, err := NewWorker(i+1, workQueue, log, sdepName, namespace, predictorName, kafkaBroker, kafkaTopic)
worker, err := NewWorker(i+1, workQueue, log, sdepName, namespace, predictorName, kafkaBroker, kafkaTopic, *sslKakfa)
if err != nil {
return err
}
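In StartDispatcher the security protocol defaults to "ssl" whenever all three certificate paths are supplied, and the commented-out block hints at a stricter check that is not enabled yet. The snippet below is a minimal sketch of what that validation could look like if it were switched on; the function name and error return are assumptions, not code from this PR.

package logger

import "fmt"

// validateSecurityProtocol is a hypothetical version of the commented-out
// check above: when SSL material is supplied, only ssl-based protocols make
// sense for the logger's Kafka producer.
func validateSecurityProtocol(s SslKakfa) error {
    if s.kafkaSslClientCertFile == "" || s.kafkaSslClientKeyFile == "" || s.kafkaSslCACertFile == "" {
        return nil // no SSL material supplied, nothing to validate
    }
    if s.kafkaSecurityProtocol != "ssl" && s.kafkaSecurityProtocol != "sasl_ssl" {
        return fmt.Errorf("invalid config: kafka security protocol %q is not ssl based but ssl config is provided", s.kafkaSecurityProtocol)
    }
    return nil
}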
8 changes: 8 additions & 0 deletions executor/logger/types.go
@@ -23,3 +23,11 @@
ModelId string
RequestId string
}

type SslKakfa struct {
kafkaSslClientCertFile string
kafkaSslClientKeyFile string
kafkaSslCACertFile string
kafkaSecurityProtocol string
kafkaSslClientKeyPass string
}
29 changes: 23 additions & 6 deletions executor/logger/worker.go
@@ -31,16 +31,29 @@ const (
// NewWorker creates, and returns a new Worker object. Its only argument
// is a channel that the worker can add itself to whenever it is done its
// work.
func NewWorker(id int, workQueue chan LogRequest, log logr.Logger, sdepName string, namespace string, predictorName string, kafkaBroker string, kafkaTopic string) (*Worker, error) {
func NewWorker(id int, workQueue chan LogRequest, log logr.Logger, sdepName string, namespace string, predictorName string, kafkaBroker string, kafkaTopic string, sslKakfa SslKakfa) (*Worker, error) {

var producer *kafka.Producer
var err error
if kafkaBroker != "" {
log.Info("Creating producer", "broker", kafkaBroker, "topic", kafkaTopic)
producer, err = kafka.NewProducer(&kafka.ConfigMap{
"bootstrap.servers": kafkaBroker,
var producerConfigMap = kafka.ConfigMap{"bootstrap.servers": kafkaBroker,
"go.delivery.reports": false, // Need this othewise will get memory leak
})
}
log.Info("kafkaSecurityProtocol", "kafkaSecurityProtocol", sslKakfa.kafkaSecurityProtocol)
if kafkaBroker != "" {
if sslKakfa.kafkaSecurityProtocol != "" {

// producerConfigMap["debug"] = "security,broker,protocol,metadata,topic"
producerConfigMap["security.protocol"] = sslKakfa.kafkaSecurityProtocol
producerConfigMap["ssl.ca.location"] = sslKakfa.kafkaSslCACertFile
producerConfigMap["ssl.key.location"] = sslKakfa.kafkaSslClientKeyFile
producerConfigMap["ssl.certificate.location"] = sslKakfa.kafkaSslClientCertFile
producerConfigMap["ssl.key.password"] = sslKakfa.kafkaSslClientKeyPass // Key password, if any

}
}
producer, err = kafka.NewProducer(&producerConfigMap)
if err != nil {
return nil, err
}
@@ -94,7 +107,7 @@ func getCEType(logReq LogRequest) (string, error) {
}

func (w *Worker) sendKafkaEvent(logReq LogRequest) error {

w.Log.Info("SENDING KAFKA EVENT", "LogReq", logReq)
reqType, err := getCEType(logReq)
if err != nil {
return err
@@ -109,12 +122,15 @@ func (w *Worker) sendKafkaEvent(logReq LogRequest) error {
{Key: NamespaceAttr, Value: []byte(w.Namespace)},
{Key: EndpointAttr, Value: []byte(w.PredictorName)},
}

w.Log.Info("kafkaHeaders is", "kafkaHeaders", kafkaHeaders)
err = w.Producer.Produce(&kafka.Message{
TopicPartition: kafka.TopicPartition{Topic: &w.KafkaTopic, Partition: kafka.PartitionAny},
Value: *logReq.Bytes,
Headers: kafkaHeaders,
}, nil)
w.Log.Info("Topic Partition:", "topic", kafka.TopicPartition{Topic: &w.KafkaTopic, Partition: kafka.PartitionAny})
w.Log.Info("Value:", "logReqBytes", *logReq.Bytes)
w.Log.Info("Kafka Headers:", "headers", kafkaHeaders)
if err != nil {
w.Log.Error(err, "Failed to produce response")
return err
@@ -187,6 +203,7 @@ func (w *Worker) Start() {
// Receive a work request.

if w.KafkaTopic != "" {
w.Log.Info("KAFKA TOPIC IS NOT NULL", "Topic", w.KafkaTopic)
if err := w.sendKafkaEvent(work); err != nil {
w.Log.Error(err, "Failed to send kafka log", "Topic", w.KafkaTopic)
}
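For local testing, the logger worker reads the same SSL environment variables as the executor's Kafka server, plus the logger-specific broker and topic. A hedged example of wiring them up before starting the executor follows; the broker address, topic, and /certs paths are placeholders, not values mandated by this PR.

package main

import "os"

func main() {
    // Illustrative only: point the executor's request logger at a TLS-enabled
    // broker using the environment variables introduced in this PR. The
    // executor itself would then be started separately.
    os.Setenv("LOGGER_KAFKA_BROKER", "my-kafka:9093")            // placeholder broker address
    os.Setenv("LOGGER_KAFKA_TOPIC", "seldon")                    // default topic used by the dispatcher
    os.Setenv("KAFKA_SECURITY_PROTOCOL", "ssl")
    os.Setenv("KAFKA_SSL_CA_CERT_FILE", "/certs/ca.crt")         // placeholder paths under /certs
    os.Setenv("KAFKA_SSL_CLIENT_CERT_FILE", "/certs/client.crt")
    os.Setenv("KAFKA_SSL_CLIENT_KEY_FILE", "/certs/client.key")
    os.Setenv("KAFKA_SSL_CLIENT_KEY_PASS", "")
}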