Skip to content

Commit

Permalink
Revert "Revert multiplexing in the Executor (#2365)"
Browse files Browse the repository at this point in the history
This reverts commit 0cfbc38.
  • Loading branch information
axsaucedo authored Sep 17, 2020
1 parent 8a6da89 commit ee36a03
Show file tree
Hide file tree
Showing 9 changed files with 142 additions and 97 deletions.
106 changes: 67 additions & 39 deletions executor/cmd/executor/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,19 @@ package main

import (
"context"
"crypto/tls"
"flag"
"fmt"
"log"
"net"
"net/url"
"os"
"os/signal"
"path"
"strings"
"syscall"
"time"

"strconv"

"github.com/go-logr/logr"
"github.com/seldonio/seldon-core/executor/api"
seldonclient "github.com/seldonio/seldon-core/executor/api/client"
Expand All @@ -29,10 +30,12 @@ import (
loghandler "github.com/seldonio/seldon-core/executor/logger"
predictor2 "github.com/seldonio/seldon-core/executor/predictor"
"github.com/seldonio/seldon-core/executor/proto/tensorflow/serving"
v1 "github.com/seldonio/seldon-core/operator/apis/machinelearning.seldon.io/v1"
"github.com/seldonio/seldon-core/operator/apis/machinelearning.seldon.io/v1"
"github.com/soheilhy/cmux"
"go.uber.org/zap"
zapf "sigs.k8s.io/controller-runtime/pkg/log/zap"
logf "sigs.k8s.io/controller-runtime/pkg/runtime/log"
"strconv"
)

const (
Expand All @@ -53,8 +56,7 @@ var (
sdepName = flag.String("sdep", "", "Seldon deployment name")
namespace = flag.String("namespace", "", "Namespace")
predictorName = flag.String("predictor", "", "Name of the predictor inside the SeldonDeployment")
httpPort = flag.Int("http_port", 8080, "Executor port")
grpcPort = flag.Int("grpc_port", 8000, "Executor port")
port = flag.Int("port", 8080, "Executor port")
wait = flag.Duration("graceful_timeout", time.Second*15, "Graceful shutdown secs")
protocol = flag.String("protocol", "seldon", "The payload protocol")
transport = flag.String("transport", "rest", "The network transport mechanism rest, grpc")
Expand Down Expand Up @@ -87,7 +89,7 @@ func getServerUrl(hostname string, port int) (*url.URL, error) {
return url.Parse(fmt.Sprintf("http://%s:%d/", hostname, port))
}

func runHttpServer(logger logr.Logger, predictor *v1.PredictorSpec, client seldonclient.SeldonApiClient, port int,
func runHttpServer(lis net.Listener, logger logr.Logger, predictor *v1.PredictorSpec, client seldonclient.SeldonApiClient, port int,
probesOnly bool, serverUrl *url.URL, namespace string, protocol string, deploymentName string, prometheusPath string) {

// Create REST API
Expand All @@ -96,7 +98,7 @@ func runHttpServer(logger logr.Logger, predictor *v1.PredictorSpec, client seldo
srv := seldonRest.CreateHttpServer(port)

go func() {
if err := srv.ListenAndServe(); err != nil {
if err := srv.Serve(lis); err != nil {
logger.Error(err, "Server error")
}
logger.Info("server started")
Expand Down Expand Up @@ -125,11 +127,7 @@ func runHttpServer(logger logr.Logger, predictor *v1.PredictorSpec, client seldo

}

func runGrpcServer(logger logr.Logger, predictor *v1.PredictorSpec, client seldonclient.SeldonApiClient, port int, serverUrl *url.URL, namespace string, protocol string, deploymentName string, annotations map[string]string) {
lis, err := net.Listen("tcp", fmt.Sprintf(":%d", port))
if err != nil {
log.Fatalf("failed to listen: %v", err)
}
func runGrpcServer(lis net.Listener, logger logr.Logger, predictor *v1.PredictorSpec, client seldonclient.SeldonApiClient, port int, serverUrl *url.URL, namespace string, protocol string, deploymentName string, annotations map[string]string) {
grpcServer, err := grpc.CreateGrpcServer(predictor, deploymentName, annotations, logger)
if err != nil {
log.Fatalf("Failed to create gRPC server: %v", err)
Expand Down Expand Up @@ -236,15 +234,7 @@ func main() {
*kafkaWorkers = kafkaWorkersFromEnvInt
}
}
}

if !(*transport == "rest" || *transport == "grpc") {
log.Fatal("Only rest and grpc supported")
}

serverUrl, err := getServerUrl(*hostname, *httpPort)
if err != nil {
log.Fatal("Failed to create server url from", *hostname, *httpPort)
}

setupLogger()
Expand All @@ -259,6 +249,12 @@ func main() {
} else {
logger.Info("Hostname found from env", "hostname", *hostname)
}
} else {
logger.Info("Hostname provided on command line", "hostname", *hostname)
}
serverUrl, err := getServerUrl(*hostname, *port)
if err != nil {
log.Fatal("Failed to create server url from", *hostname, *port)
}

predictor, err := predictor2.GetPredictor(*predictorName, *filename, *sdepName, *namespace, configPath)
Expand Down Expand Up @@ -288,6 +284,52 @@ func main() {
log.Fatal("Could not initialize jaeger tracer", err.Error())
}
defer closer.Close()
// Create a listener at the desired port.
var lis net.Listener
if len(certMountPath) > 0 {
logger.Info("Creating TLS listener", "port", *port)
certPath := path.Join(certMountPath, certFileName)
keyPath := path.Join(certMountPath, certKeyFileName)
cert, err := tls.LoadX509KeyPair(certPath, keyPath)
if err != nil {
log.Fatalf("Error certificate could not be found: %v", err)
}
lis, err = tls.Listen("tcp", fmt.Sprintf(":%d", *port), &tls.Config{Certificates: []tls.Certificate{cert}})
if err != nil {
log.Fatalf("failed to create listener: %v", err)
}
} else {
logger.Info("Creating non-TLS listener", "port", *port)
lis, err = net.Listen("tcp", fmt.Sprintf(":%d", *port))
if err != nil {
log.Fatalf("failed to create listener: %v", err)
}
}
defer lis.Close()

// Create a cmux object.
tcpm := cmux.New(lis)

// Declare the match for different services required.
httpl := tcpm.Match(cmux.HTTP1Fast())
grpcl := tcpm.MatchWithWriters(
cmux.HTTP2MatchHeaderFieldSendSettings("content-type", "application/grpc"))

logger.Info("Running grpc server ", "port", *port)
var clientGrpc seldonclient.SeldonApiClient
if *protocol == "seldon" {
clientGrpc = seldon.NewSeldonGrpcClient(predictor, *sdepName, annotations)
} else {
clientGrpc = tensorflow.NewTensorflowGrpcClient(predictor, *sdepName, annotations)
}
go runGrpcServer(grpcl, logger, predictor, clientGrpc, *port, serverUrl, *namespace, *protocol, *sdepName, annotations)

clientRest, err := rest.NewJSONRestClient(*protocol, *sdepName, predictor, annotations)
if err != nil {
log.Fatalf("Failed to create http client: %v", err)
}
logger.Info("Running http server ", "port", *port)
go runHttpServer(httpl, logger, predictor, clientRest, *port, false, serverUrl, *namespace, *protocol, *sdepName, *prometheusPath)

if *serverType == "kafka" {
logger.Info("Starting kafka server")
Expand All @@ -303,24 +345,10 @@ func main() {
}()
}

if *transport == "rest" {
clientRest, err := rest.NewJSONRestClient(*protocol, *sdepName, predictor, annotations)
if err != nil {
log.Fatalf("Failed to create http client: %v", err)
}
logger.Info("Running http server ", "port", *httpPort)
runHttpServer(logger, predictor, clientRest, *httpPort, false, serverUrl, *namespace, *protocol, *sdepName, *prometheusPath)
} else {
logger.Info("Running http probes only server ", "port", *httpPort)
go runHttpServer(logger, predictor, nil, *httpPort, true, serverUrl, *namespace, *protocol, *sdepName, *prometheusPath)
logger.Info("Running grpc server ", "port", *grpcPort)
var clientGrpc seldonclient.SeldonApiClient
if *protocol == "seldon" {
clientGrpc = seldon.NewSeldonGrpcClient(predictor, *sdepName, annotations)
} else {
clientGrpc = tensorflow.NewTensorflowGrpcClient(predictor, *sdepName, annotations)
}
runGrpcServer(logger, predictor, clientGrpc, *grpcPort, serverUrl, *namespace, *protocol, *sdepName, annotations)

// Start cmux serving.
if err := tcpm.Serve(); !strings.Contains(err.Error(),
"use of closed network connection") {
log.Fatal(err)
}

}
Loading

0 comments on commit ee36a03

Please sign in to comment.