Supports gRPC as a protocol (without TLS) #987

Closed · wants to merge 17 commits. Changes from all commits are shown below.
13 changes: 8 additions & 5 deletions Gopkg.lock

(Generated file; diff not rendered.)

14 changes: 12 additions & 2 deletions cmd/activator/main.go
@@ -25,7 +25,9 @@ import (
"github.com/knative/serving/pkg/activator"
clientset "github.com/knative/serving/pkg/client/clientset/versioned"
"github.com/knative/serving/pkg/controller"
h2cutil "github.com/knative/serving/pkg/h2c"
"github.com/knative/serving/pkg/signals"
"github.com/knative/serving/third_party/h2c"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
)
@@ -45,7 +47,13 @@ type activationHandler struct {
type retryRoundTripper struct{}

func (rrt retryRoundTripper) RoundTrip(r *http.Request) (*http.Response, error) {
transport := http.DefaultTransport
var transport http.RoundTripper

transport = http.DefaultTransport
if r.ProtoMajor == 2 {
transport = h2cutil.NewTransport()
}

resp, err := transport.RoundTrip(r)
// TODO: Activator should retry with backoff.
// https://github.com/knative/serving/issues/1229
@@ -79,9 +87,11 @@ func (a *activationHandler) handler(w http.ResponseWriter, r *http.Request) {
}
proxy := httputil.NewSingleHostReverseProxy(target)
proxy.Transport = retryRoundTripper{}

// TODO: Clear the host to avoid 404's.
// https://github.com/elafros/elafros/issues/964
r.Host = ""

proxy.ServeHTTP(w, r)
}

@@ -114,5 +124,5 @@ func main() {
}()

http.HandleFunc("/", ah.handler)
http.ListenAndServe(":8080", nil)
h2c.ListenAndServe(":8080", nil)
}
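Note on the transport selection above: `h2cutil.NewTransport()` comes from the new `pkg/h2c` package, which is not part of this diff. As a minimal sketch of what a cleartext HTTP/2 ("h2c") client transport typically looks like, assuming `golang.org/x/net/http2` is used under the hood (an illustration, not necessarily this PR's implementation):

```go
package h2cutil

import (
	"crypto/tls"
	"net"

	"golang.org/x/net/http2"
)

// NewTransport returns an HTTP/2 transport that works over plaintext TCP
// ("h2c"). AllowHTTP permits the "http" scheme, and the DialTLS override
// skips TLS entirely by dialing a plain TCP connection.
func NewTransport() *http2.Transport {
	return &http2.Transport{
		AllowHTTP: true,
		DialTLS: func(network, addr string, _ *tls.Config) (net.Conn, error) {
			return net.Dial(network, addr)
		},
	}
}
```

This is why the activator only swaps transports when `r.ProtoMajor == 2`: HTTP/1.x requests keep using `http.DefaultTransport`, while HTTP/2 requests (such as gRPC) are proxied without TLS to the backend.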
66 changes: 45 additions & 21 deletions cmd/queue/main.go
@@ -32,14 +32,15 @@ import (
"syscall"
"time"

"github.com/knative/serving/pkg"

"github.com/knative/serving/cmd/util"
"github.com/knative/serving/pkg"
"github.com/knative/serving/pkg/apis/serving/v1alpha1"
"github.com/knative/serving/pkg/autoscaler"
h2cutil "github.com/knative/serving/pkg/h2c"
"github.com/knative/serving/pkg/logging"
"github.com/knative/serving/pkg/logging/logkey"
"github.com/knative/serving/pkg/queue"
"github.com/knative/serving/third_party/h2c"
"go.uber.org/zap"

"github.com/gorilla/websocket"
@@ -66,18 +67,21 @@ const (
)

var (
podName string
elaNamespace string
elaConfiguration string
elaRevision string
elaAutoscaler string
elaAutoscalerPort string
statChan = make(chan *autoscaler.Stat, statReportingQueueLength)
reqChan = make(chan queue.ReqEvent, requestCountingQueueLength)
kubeClient *kubernetes.Clientset
statSink *websocket.Conn
proxy *httputil.ReverseProxy
logger *zap.SugaredLogger
podName string
elaNamespace string
elaConfiguration string
elaRevision string
elaAutoscaler string
elaAutoscalerPort string
statChan = make(chan *autoscaler.Stat, statReportingQueueLength)
reqChan = make(chan queue.ReqEvent, requestCountingQueueLength)
kubeClient *kubernetes.Clientset
statSink *websocket.Conn
logger *zap.SugaredLogger

h2cProxy *httputil.ReverseProxy
httpProxy *httputil.ReverseProxy

concurrencyQuantumOfTime = flag.Duration("concurrencyQuantumOfTime", 100*time.Millisecond, "")
concurrencyModel = flag.String("concurrencyModel", string(v1alpha1.RevisionRequestConcurrencyModelMulti), "")
singleConcurrencyBreaker = queue.NewBreaker(singleConcurrencyQueueDepth, 1)
@@ -141,18 +145,29 @@ func statReporter() {
}
}

func proxyForRequest(req *http.Request) *httputil.ReverseProxy {
if req.ProtoMajor == 2 {
return h2cProxy
}

return httpProxy
}

func isProbe(r *http.Request) bool {
// Since K8s 1.8, prober requests have
// User-Agent = "kube-probe/{major-version}.{minor-version}".
return strings.HasPrefix(r.Header.Get("User-Agent"), "kube-probe/")
}

func handler(w http.ResponseWriter, r *http.Request) {
proxy := proxyForRequest(r)

if isProbe(r) {
// Do not count health checks for concurrency metrics
proxy.ServeHTTP(w, r)
return
}

// Metrics for autoscaling
reqChan <- queue.ReqIn
defer func() {
@@ -253,7 +268,10 @@ func main() {
if err != nil {
logger.Fatal("Failed to parse localhost url", zap.Error(err))
}
proxy = httputil.NewSingleHostReverseProxy(target)

httpProxy = httputil.NewSingleHostReverseProxy(target)
h2cProxy = httputil.NewSingleHostReverseProxy(target)
h2cProxy.Transport = h2cutil.NewTransport()

logger.Infof("Queue container is starting, concurrencyModel: %s", *concurrencyModel)
config, err := rest.InClusterConfig()
@@ -281,10 +299,15 @@ func main() {
}
}()

server := &http.Server{
Addr: fmt.Sprintf(":%d", queue.RequestQueuePort), Handler: nil}
adminServer := &http.Server{
Addr: fmt.Sprintf(":%d", queue.RequestQueueAdminPort), Handler: nil}
Addr: fmt.Sprintf(":%d", queue.RequestQueueAdminPort),
Handler: nil,
}

h2cServer := h2c.Server{Server: &http.Server{
Review comment (Member): This is just "server" again, right?
Reply (Contributor): yes

Addr: fmt.Sprintf(":%d", queue.RequestQueuePort),
Handler: http.HandlerFunc(handler),
}}

// Add a SIGTERM handler to gracefully shutdown the servers during
// pod termination.
@@ -294,11 +317,12 @@ func main() {
<-sigTermChan
// Calling server.Shutdown() allows pending requests to
// complete, while no new work is accepted.
server.Shutdown(context.Background())

h2cServer.Shutdown(context.Background())
adminServer.Shutdown(context.Background())
os.Exit(0)
}()
http.HandleFunc("/", handler)
Review comment (Member): We don't need this because we're setting Handler in the server initialization now, right?
Reply (Contributor): yes

go server.ListenAndServe()

go h2cServer.ListenAndServe()
setupAdminHandlers(adminServer)
}
19 changes: 18 additions & 1 deletion config/400-activator-service.yaml
@@ -15,7 +15,7 @@
apiVersion: v1
kind: Service
metadata:
name: activator-service
name: activator-http-service
namespace: knative-serving-system
labels:
app: activator
@@ -28,3 +28,20 @@ spec:
port: 80
targetPort: 8080
type: NodePort
---
apiVersion: v1
kind: Service
metadata:
name: activator-grpc-service
Review comment (Member): Do we need a second service and app here still?

Reply (Contributor): We do need a second service while we are still using Istio (v1alpha1). If the backend service port name does not match the servicePort on the ingress using the protocol naming convention, traffic can't be proxied through to the revision. The app should be the same activator app; I must have missed it during the knative renaming madness.

namespace: knative-serving-system
labels:
app: activator
spec:
selector:
app: activator
ports:
- name: grpc
protocol: TCP
port: 80
targetPort: 8080
type: NodePort
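Because this second Service exposes gRPC on a plain, non-TLS port (per the PR title), an in-cluster caller would have to dial it insecurely. A rough sketch with grpc-go, where the DNS name is inferred from the Service metadata above and the generated client stub is omitted (illustrative only; no proto definitions are part of this PR):

```go
package main

import (
	"log"

	"google.golang.org/grpc"
)

func main() {
	// The activator gRPC service is exposed without TLS, so the client must
	// opt in to an insecure (plaintext) connection. The address below is the
	// in-cluster DNS name implied by the Service above; port 80 matches its spec.
	conn, err := grpc.Dial(
		"activator-grpc-service.knative-serving-system:80",
		grpc.WithInsecure(),
	)
	if err != nil {
		log.Fatalf("failed to dial: %v", err)
	}
	defer conn.Close()

	// A generated client stub (e.g. pb.NewGreeterClient(conn)) would be used
	// here; it is omitted because it depends on the application's protos.
	_ = conn
}
```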
8 changes: 8 additions & 0 deletions docs/spec/spec.md
@@ -181,6 +181,8 @@ spec:
livenessProbe: ... # Optional
readinessProbe: ... # Optional

  # +optional protocol (http or grpc). Defaults to http.
Review comment (Member): What's the purpose of having "protocol" here? Is the concern libraries which only support h2/h2c, and not HTTP/1.1? From my reading of the gRPC protocol, it looks like it should be possible for general HTTP/2 and gRPC to operate on the same server.

Reply (Contributor, @bsnchan, Jun 12, 2018): This is due to the named ports requirement in Istio 0.7. We couldn't use http2 since that assumes that the traffic is encrypted.

Reply (Member): See my other comment: FWIW, the Istio docs on the port naming restrictions for ingress seem to have been removed between 0.7 and 0.8. I'm not sure why earlier versions of Istio had this restriction (digging, it appears to have been copied from the pod injection docs). I'm still hunting for time to get an SSL cert set up on my Istio cluster to verify that Istio can do https ingress --> h2c pod. I think this should work from my reading of the docs, but I need to verify.

protocol: ...
# +optional concurrency strategy. Defaults to Multi.
concurrencyModel: ...
# +optional. max time the instance is allowed for responding to a request
@@ -256,6 +258,10 @@ spec:
# scaling to/from 0.
servingState: Active | Reserve | Retired

# The protocol used for this Revision. This will define the Istio ingress spec
# and service spec required for this Revision.
protocol: grpc | http
Review comment (Member): tl;dr It feels to me like this belongs on Route, not Revision.

cc @vaikas-google @evankanderson @cooperneil

I have a hard time imagining a use-case where a Route splits traffic over multiple Revisions with different protocols, which is reflected in some of the getProtocolFromTrafficTargets logic and TODOs that need to cope with the potential heterogeneity here. Let's just not allow it.

I believe the main reason this is needed on Revision in its current form is because you make use of it in ela-queue to proxy arbitrary TCP traffic. Two questions:

  1. Can we switch completely to tcpproxy, thereby eliminating the need for the config at the Revision level?
  2. How does this codepath expose auto-scaling metrics (the core purpose of ela-queue today)?

I suspect we also need to change the activator in a similar manner. cc @josephburnett

Reply (Contributor): @dprotaso and I struggled with where the protocol belongs ourselves and decided to specify it on the Revision and open it up for discussion.

Both the route controller and the revision controller need to be aware of the protocol:

  • The route controller needs to create the appropriate Istio ingress and route service with the protocol specified.
  • The revision controller also needs to create the revision service with the servicePort name set to the protocol.

The Istio docs explain it here.

We are currently removing the need for the protocol in the ela-queue container by using an h2c server that can proxy both h2c and http traffic based on the request protocol. We couldn't use a TCP proxy because we wouldn't be able to collect request statistics for autoscaling. We're also changing the activator to use an h2c server.

Reply (Contributor): FYI, the Envoy in the Istio sidecar collects request statistics similar to the ones ela-queue collects. Using them hasn't been a high priority yet, but if the queue-proxy is a blocker that could change. It's unclear whether they can fully replace the queue-proxy stats collection.

Reply (Member): FWIW, the Istio docs on the port naming restrictions for ingress seem to have been removed between 0.7 and 0.8. I'm not sure why earlier versions of Istio had this restriction (digging, it appears to have been copied from the pod injection docs).
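For context on the "h2c server that can proxy both h2c and http traffic" mentioned above: this PR uses the vendored third_party/h2c package, which is not part of this diff. An equivalent sketch using golang.org/x/net/http2/h2c (an assumed illustration, not the PR's code) shows how HTTP/1.1 and cleartext HTTP/2, and therefore gRPC, can share one listener while the handler still sees every request for stats collection:

```go
package main

import (
	"fmt"
	"log"
	"net/http"

	"golang.org/x/net/http2"
	"golang.org/x/net/http2/h2c"
)

func main() {
	handler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		// Every request, HTTP/1.1 or h2c, passes through this handler, so
		// request counts for autoscaling can still be recorded here.
		// r.ProtoMajor == 2 identifies h2c (e.g. gRPC) traffic.
		fmt.Fprintf(w, "proto=%d.%d\n", r.ProtoMajor, r.ProtoMinor)
	})

	// h2c.NewHandler upgrades cleartext HTTP/2 connections (prior knowledge
	// or Upgrade) on the same port that serves plain HTTP/1.1.
	srv := &http.Server{
		Addr:    ":8080", // arbitrary port for illustration
		Handler: h2c.NewHandler(handler, &http2.Server{}),
	}
	log.Fatal(srv.ListenAndServe())
}
```

This is the design choice that lets the queue proxy avoid a raw TCP proxy: the traffic stays at the HTTP layer, so per-request metrics remain available.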


# Some function or server frameworks or application code may be written to
# expect that each request will be granted a single-tenant process to run
# (i.e. that the request code is run single-threaded).
@@ -331,6 +337,7 @@ spec: # One of "runLatest" or "pinned"
- ...
livenessProbe: ... # Optional
readinessProbe: ... # Optional
protocol: ...
concurrencyModel: ...
timeoutSeconds: ...
serviceAccountName: ... # Name of the service account the code should run as
@@ -353,6 +360,7 @@ spec: # One of "runLatest" or "pinned"
- ...
livenessProbe: ... # Optional
readinessProbe: ... # Optional
protocol: ...
concurrencyModel: ...
timeoutSeconds: ...
serviceAccountName: ... # Name of the service account the code should run as
15 changes: 15 additions & 0 deletions pkg/apis/serving/v1alpha1/revision_types.go
@@ -90,6 +90,17 @@ const (
RevisionRequestConcurrencyModelMulti RevisionRequestConcurrencyModelType = "Multi"
)

type RevisionProtocolType string

const (
// RevisionProtocolHTTP specifies that the container will
// exclusively receive HTTP traffic.
RevisionProtocolHTTP RevisionProtocolType = "http"
// RevisionProtocolGRPC specifies that the container will
// exclusively receive gRPC traffic.
RevisionProtocolGRPC RevisionProtocolType = "grpc"
)

// RevisionSpec holds the desired state of the Revision (from the client).
type RevisionSpec struct {
// TODO: Generation does not work correctly with CRD. They are scrubbed
@@ -112,6 +123,10 @@ type RevisionSpec struct {
// +optional
ConcurrencyModel RevisionRequestConcurrencyModelType `json:"concurrencyModel,omitempty"`

// Protocol specifies the exclusive network protocol
// for the Revision. Defaults to HTTP.
Protocol RevisionProtocolType `json:"protocol,omitempty"`

// ServiceAccountName holds the name of the Kubernetes service account
// as which the underlying K8s resources should be run. If unspecified
// this will default to the "default" service account for the namespace
5 changes: 3 additions & 2 deletions pkg/controller/names.go
@@ -18,6 +18,7 @@ package controller

import (
"context"
"fmt"

"go.uber.org/zap"

@@ -80,8 +81,8 @@ func GetServiceRouteName(u *v1alpha1.Service) string {
return u.Name
}

func GetServingK8SActivatorServiceName() string {
return "activator-service"
func GetServingK8SActivatorServiceName(protocol v1alpha1.RevisionProtocolType) string {
return fmt.Sprintf("activator-%s-service", protocol)
}

func GetRevisionHeaderName() string {
28 changes: 28 additions & 0 deletions pkg/controller/revision/revision_test.go
@@ -26,6 +26,7 @@ import (
"fmt"
"reflect"
"strconv"
"strings"
"testing"
"time"

@@ -88,6 +89,7 @@ func getTestRevision() *v1alpha1.Revision {
UID: "test-rev-uid",
},
Spec: v1alpha1.RevisionSpec{
Protocol: v1alpha1.RevisionProtocolHTTP,
// corev1.Container has a lot of setting. We try to pass many
// of them here to verify that we pass through the settings to
// derived objects.
@@ -634,6 +636,32 @@ func TestCreateRevCreatesStuff(t *testing.T) {
}
}

func TestCreateRevisionWithGRPCProtocol(t *testing.T) {
kubeClient, _, elaClient, controller, _, _, elaInformer, _ := newTestController(t)

config := getTestConfiguration()
config.Spec.RevisionTemplate.Spec.Protocol = v1alpha1.RevisionProtocolGRPC

rev := getTestRevision()
rev.Spec.Protocol = v1alpha1.RevisionProtocolGRPC

createRevision(elaClient, elaInformer, controller, rev)

expectedServiceName := fmt.Sprintf("%s-service", rev.Name)
service, err := kubeClient.CoreV1().Services(testNamespace).Get(expectedServiceName, metav1.GetOptions{})
if err != nil {
t.Fatalf("Couldn't get revision service: %v", err)
}

for _, port := range service.Spec.Ports {
servicePortName := port.Name
if strings.HasPrefix(servicePortName, string(v1alpha1.RevisionProtocolHTTP)) {
t.Errorf("Revision service port name does not match protocol: expected %v got %v.",
v1alpha1.RevisionProtocolHTTP, servicePortName)
}
}
}

type errorResolver struct {
error string
}
3 changes: 1 addition & 2 deletions pkg/controller/revision/service.go
@@ -27,7 +27,6 @@ import (
"k8s.io/apimachinery/pkg/util/intstr"
)

var httpServicePortName = "http"
var servicePort = 80

// MakeRevisionK8sService creates a Service that targets all pods with the same
@@ -44,7 +43,7 @@ func MakeRevisionK8sService(rev *v1alpha1.Revision) *corev1.Service {
Spec: corev1.ServiceSpec{
Ports: []corev1.ServicePort{
{
Name: httpServicePortName,
Name: string(rev.Spec.Protocol),
Port: int32(servicePort),
TargetPort: intstr.IntOrString{Type: intstr.String, StrVal: queue.RequestQueuePortName},
},