Skip to content

Commit

Permalink
cmd/contour: replace current xds server with go-control-plane impl
Browse files Browse the repository at this point in the history
Fixes projectcontour#2134 by implementing the Envoy go-control-plane to replace Contour's current custom xDS gRPC server.

The change utilizes snapshots as a way to represent a versioned point in time representation of the xDS
resources (RDS, CDS, EDS, LDS, SDS). When the dag is rebuilt or an endpoint changes, a new snapshot is
created with the updated caches for each xDS resource type.

Signed-off-by: Steve Sloka <[email protected]>
  • Loading branch information
stevesloka committed Aug 3, 2020
1 parent bbfa70e commit b22f55a
Show file tree
Hide file tree
Showing 42 changed files with 1,446 additions and 3,193 deletions.
53 changes: 35 additions & 18 deletions cmd/contour/serve.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,12 @@ import (
"syscall"
"time"

v2 "github.com/envoyproxy/go-control-plane/envoy/api/v2"
"github.com/projectcontour/contour/internal/envoy"

xds "github.com/envoyproxy/go-control-plane/pkg/cache/types"

"github.com/envoyproxy/go-control-plane/pkg/cache/v2"
projectcontourv1alpha1 "github.com/projectcontour/contour/apis/projectcontour/v1alpha1"
"github.com/projectcontour/contour/internal/annotation"
"github.com/projectcontour/contour/internal/contour"
Expand Down Expand Up @@ -174,7 +180,9 @@ func doServe(log logrus.FieldLogger, ctx *serveContext) error {
return err
}

listenerConfig := contour.ListenerVisitorConfig{
statsListener := envoy.StatsListener(ctx.statsAddr, ctx.statsPort)

listenerConfig := contour.ListenerConfig{
UseProxyProto: ctx.useProxyProto,
HTTPAddress: ctx.httpAddr,
HTTPPort: ctx.httpPort,
Expand All @@ -190,6 +198,9 @@ func doServe(log logrus.FieldLogger, ctx *serveContext) error {
StreamIdleTimeout: timeout.Parse(ctx.StreamIdleTimeout),
MaxConnectionDuration: timeout.Parse(ctx.MaxConnectionDuration),
ConnectionShutdownGracePeriod: timeout.Parse(ctx.ConnectionShutdownGracePeriod),
StaticListeners: map[string]*v2.Listener{
statsListener.Name: statsListener,
},
}

defaultHTTPVersions, err := parseDefaultHTTPVersions(ctx.DefaultHTTPVersions)
Expand All @@ -201,12 +212,8 @@ func doServe(log logrus.FieldLogger, ctx *serveContext) error {

// step 3. build our mammoth Kubernetes event handler.
eventHandler := &contour.EventHandler{
CacheHandler: &contour.CacheHandler{
ListenerVisitorConfig: listenerConfig,
ListenerCache: contour.NewListenerCache(ctx.statsAddr, ctx.statsPort),
FieldLogger: log.WithField("context", "CacheHandler"),
Metrics: metrics.NewMetrics(registry),
},
ListenerConfig: listenerConfig,
Metrics: metrics.NewMetrics(registry),
HoldoffDelay: 100 * time.Millisecond,
HoldoffMaxDelay: 500 * time.Millisecond,
Builder: dag.Builder{
Expand Down Expand Up @@ -269,12 +276,28 @@ func doServe(log logrus.FieldLogger, ctx *serveContext) error {
informerSyncList.InformOnResources(clusterInformerFactory, dynamicHandler, k8s.SecretsResources()...)
}

// snapshotCache is used to store the state of what all xDS services should
// contain at any given point in time.
snapshotCache := cache.NewSnapshotCache(false, contour.DefaultHash,
log.WithField("context", "xDS"))

// snapshotHandler is used to produce new snapshots when the internal state changes for any xDS resource.
snapshotHandler := contour.NewSnapshotHandler(snapshotCache, log.WithField("context", "snapshotHandler"))

// step 5. endpoints updates are handled directly by the EndpointsTranslator
// due to their high update rate and their orthogonal nature.
et := &contour.EndpointsTranslator{
FieldLogger: log.WithField("context", "endpointstranslator"),
SnapshotHandler: snapshotHandler,
FieldLogger: log.WithField("context", "endpointstranslator"),
}

eventHandler.SnapshotHandler = snapshotHandler

// Trigger an initial snapshot for static resources
snapshotHandler.UpdateSnapshot(map[xds.ResponseType][]xds.Resource{
xds.Listener: eventHandler.ListenerConfig.StaticListenersProto(),
})

informerSyncList.InformOnResources(clusterInformerFactory,
&k8s.DynamicClientHandler{
Next: &contour.EventRecorder{
Expand Down Expand Up @@ -401,15 +424,9 @@ func doServe(log logrus.FieldLogger, ctx *serveContext) error {
}
log.Printf("informer caches synced")

resources := map[string]cgrpc.Resource{
eventHandler.CacheHandler.ClusterCache.TypeURL(): &eventHandler.CacheHandler.ClusterCache,
eventHandler.CacheHandler.RouteCache.TypeURL(): &eventHandler.CacheHandler.RouteCache,
eventHandler.CacheHandler.ListenerCache.TypeURL(): &eventHandler.CacheHandler.ListenerCache,
eventHandler.CacheHandler.SecretCache.TypeURL(): &eventHandler.CacheHandler.SecretCache,
et.TypeURL(): et,
}
opts := ctx.grpcOptions()
s := cgrpc.NewAPI(log, resources, registry, opts...)
grpcServer := cgrpc.NewAPI(registry, snapshotCache, opts...)

addr := net.JoinHostPort(ctx.xdsAddr, strconv.Itoa(ctx.xdsPort))
l, err := net.Listen("tcp", addr)
if err != nil {
Expand All @@ -426,10 +443,10 @@ func doServe(log logrus.FieldLogger, ctx *serveContext) error {

go func() {
<-stop
s.Stop()
grpcServer.Stop()
}()

return s.Serve(l)
return grpcServer.Serve(l)
})

// step 14. Setup SIGTERM handler
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ require (
github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4
github.com/prometheus/common v0.6.0
github.com/sirupsen/logrus v1.4.2
github.com/stretchr/testify v1.4.0
golang.org/x/tools v0.0.0-20190929041059-e7abfedfabcf // indirect
google.golang.org/grpc v1.25.1
gopkg.in/alecthomas/kingpin.v2 v2.2.6
Expand Down
28 changes: 28 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ github.com/elazarl/goproxy v0.0.0-20180725130230-947c36da3153/go.mod h1:/Zj4wYkg
github.com/emicklei/go-restful v0.0.0-20170410110728-ff4f55a20633/go.mod h1:otzb+WCGbkyDHkqmQmT5YD2WR4BBwUdeQoFo8l/7tVs=
github.com/emicklei/go-restful v2.9.5+incompatible/go.mod h1:otzb+WCGbkyDHkqmQmT5YD2WR4BBwUdeQoFo8l/7tVs=
github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
github.com/envoyproxy/go-control-plane v0.9.5 h1:lRJIqDD8yjV1YyPRqecMdytjDLs2fTXq363aCib5xPU=
github.com/envoyproxy/go-control-plane v0.9.5/go.mod h1:OXl5to++W0ctG+EHWTFUjiypVxC/Y4VLc/KFU+al13s=
github.com/envoyproxy/protoc-gen-validate v0.1.0 h1:EQciDnbrYxy13PgWoY8AqoxGiPrpgBZ1R8UNe3ddc+A=
Expand Down Expand Up @@ -174,12 +175,22 @@ github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5y
github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/golang/protobuf v1.3.2 h1:6nsPYzhq5kReh6QImI3k5qWzO4PEbvbIW2cwSfR/6xs=
github.com/golang/protobuf v1.3.2/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/golang/protobuf v1.4.0-rc.1/go.mod h1:ceaxUfeHdC40wWswd/P6IGgMaK3YpKi5j83Wpe3EHw8=
github.com/golang/protobuf v1.4.0-rc.1.0.20200221234624-67d41d38c208/go.mod h1:xKAWHe0F5eneWXFV3EuXVDTCmh+JuBKY0li0aMyXATA=
github.com/golang/protobuf v1.4.0-rc.2/go.mod h1:LlEzMj4AhA7rCAGe4KMBDvJI+AwstrUpVNzEA03Pprs=
github.com/golang/protobuf v1.4.0-rc.4.0.20200313231945-b860323f09d0/go.mod h1:WU3c8KckQ9AFe+yFwt9sWVRKCVIyN9cPHBJSNnbL67w=
github.com/golang/protobuf v1.4.0/go.mod h1:jodUvKwWbYaEsadDk5Fwe5c77LiNKVO9IDvqG2KuDX0=
github.com/golang/protobuf v1.4.1 h1:ZFgWrT+bLgsYPirOnRfKLYJLvssAegOj/hgyMFdJZe0=
github.com/golang/protobuf v1.4.1/go.mod h1:U8fpvMrcmy5pZrNK1lt4xCsGvpyWQ/VVv6QDs8UjoX8=
github.com/google/btree v0.0.0-20180813153112-4030bb1f1f0c/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ=
github.com/google/btree v1.0.0/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ=
github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M=
github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
github.com/google/go-cmp v0.4.0 h1:xsAVV57WRhGj6kEIi8ReJzQlHHqcBYCElAvkovg3B/4=
github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.0 h1:/QaMHBdZ26BB3SSst0Iwl10Epc+xhTquomWX0oZEB6w=
github.com/google/go-cmp v0.5.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/gofuzz v0.0.0-20161122191042-44d81051d367/go.mod h1:HP5RmnzzSNb993RKQDq4+1A4ia9nllfqcQFTQJedwGI=
github.com/google/gofuzz v1.0.0 h1:A8PeW59pxE9IoFRqBp37U+mSNaQoZ46F1f0f863XSXw=
github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
Expand Down Expand Up @@ -506,12 +517,29 @@ google.golang.org/genproto v0.0.0-20190418145605-e7d98fc518a7/go.mod h1:VzzqZJRn
google.golang.org/genproto v0.0.0-20190502173448-54afdca5d873/go.mod h1:VzzqZJRnGkLBvHegQrXjBqPurQTc5/KpmUdxsrq26oE=
google.golang.org/genproto v0.0.0-20190819201941-24fa4b261c55 h1:gSJIx1SDwno+2ElGhA4+qG2zF97qiUzTM+rQ0klBOcE=
google.golang.org/genproto v0.0.0-20190819201941-24fa4b261c55/go.mod h1:DMBHOl98Agz4BDEuKkezgsaosCRResVns1a3J2ZsMNc=
google.golang.org/genproto v0.0.0-20200526211855-cb27e3aa2013 h1:+kGHl1aib/qcwaRi1CbqBZ1rk19r85MNUf8HaBghugY=
google.golang.org/genproto v0.0.0-20200526211855-cb27e3aa2013/go.mod h1:NbSheEEYHJ7i3ixzK3sjbqSGDJWnxyFXZblF3eUsNvo=
google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c=
google.golang.org/grpc v1.23.0 h1:AzbTB6ux+okLTzP8Ru1Xs41C303zdcfEht7MQnYJt5A=
google.golang.org/grpc v1.23.0/go.mod h1:Y5yQAOtifL1yxbo5wqy6BxZv8vAUGQwXBOALyacEbxg=
google.golang.org/grpc v1.23.1/go.mod h1:Y5yQAOtifL1yxbo5wqy6BxZv8vAUGQwXBOALyacEbxg=
google.golang.org/grpc v1.25.1 h1:wdKvqQk7IttEw92GoRyKG2IDrUIpgpj6H6m81yfeMW0=
google.golang.org/grpc v1.25.1/go.mod h1:c3i+UQWmh7LiEpx4sFZnkU36qjEYZ0imhYfXVyQciAY=
google.golang.org/grpc v1.27.0 h1:rRYRFMVgRv6E0D70Skyfsr28tDXIuuPZyWGMPdMcnXg=
google.golang.org/grpc v1.27.0/go.mod h1:qbnxyOmOxrQa7FizSgH+ReBfzJrCY1pSN7KXBS8abTk=
google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8=
google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0=
google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM=
google.golang.org/protobuf v1.20.0/go.mod h1:A+miEFZTKqfCUM6K7xSMQL9OKL/b6hQv+e19PK+JZNE=
google.golang.org/protobuf v1.20.1-0.20200309200217-e05f789c0967/go.mod h1:A+miEFZTKqfCUM6K7xSMQL9OKL/b6hQv+e19PK+JZNE=
google.golang.org/protobuf v1.20.1/go.mod h1:KqelGeouBkcbcuB3HCk4/YH2tmNLk6YSWA5LIWeI/lY=
google.golang.org/protobuf v1.21.0/go.mod h1:47Nbq4nVaFHyn7ilMalzfO3qCViNmqZ2kzikPIcrTAo=
google.golang.org/protobuf v1.22.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU=
google.golang.org/protobuf v1.23.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU=
google.golang.org/protobuf v1.23.1-0.20200526195155-81db48ad09cc/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU=
google.golang.org/protobuf v1.24.0/go.mod h1:r/3tXBNzIEhYS9I1OUVjXDlt8tc493IdKGjtUeSXeh4=
google.golang.org/protobuf v1.25.0 h1:Ejskq+SyPohKW+1uil0JJMtmHCgJPJ/qWTxr8qp+R4c=
google.golang.org/protobuf v1.25.0/go.mod h1:9JNX74DMeImyA3h4bdi1ymwjUzf21/xIlbajtzgsN7c=
gopkg.in/alecthomas/kingpin.v2 v2.2.6 h1:jMFz6MfLP0/4fUyZle81rXUoxOBFi19VUFKVDOQfozc=
gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
Expand Down
22 changes: 0 additions & 22 deletions internal/assert/assert.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,7 @@ package assert
import (
"testing"

v2 "github.com/envoyproxy/go-control-plane/envoy/api/v2"
"github.com/golang/protobuf/proto"
"github.com/golang/protobuf/ptypes"
"github.com/golang/protobuf/ptypes/any"
"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
)

type Assert struct {
Expand All @@ -41,8 +36,6 @@ func Equal(t *testing.T, want, got interface{}, opts ...cmp.Option) {
func (a Assert) Equal(want, got interface{}, opts ...cmp.Option) {
a.t.Helper()
opts = append(opts,
cmpopts.IgnoreFields(v2.DiscoveryResponse{}, "VersionInfo", "Nonce"),
cmpopts.AcyclicTransformer("UnmarshalAny", unmarshalAny),
// errors to be equal only if both are nil or both are non-nil.
cmp.Comparer(func(x, y error) bool {
return (x == nil) == (y == nil)
Expand All @@ -53,18 +46,3 @@ func (a Assert) Equal(want, got interface{}, opts ...cmp.Option) {
a.t.Fatal(diff)
}
}

func unmarshalAny(a *any.Any) proto.Message {
if a == nil {
return nil
}
pb, err := ptypes.Empty(a)
if err != nil {
panic(err.Error())
}
err = ptypes.UnmarshalAny(a, pb)
if err != nil {
panic(err.Error())
}
return pb
}
71 changes: 0 additions & 71 deletions internal/contour/cachehandler.go

This file was deleted.

56 changes: 8 additions & 48 deletions internal/contour/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,77 +15,37 @@ package contour

import (
"sort"
"sync"

resource "github.com/envoyproxy/go-control-plane/pkg/resource/v2"

v2 "github.com/envoyproxy/go-control-plane/envoy/api/v2"
"github.com/golang/protobuf/proto"
xds "github.com/envoyproxy/go-control-plane/pkg/cache/types"
"github.com/projectcontour/contour/internal/dag"
"github.com/projectcontour/contour/internal/envoy"
"github.com/projectcontour/contour/internal/protobuf"
"github.com/projectcontour/contour/internal/sorter"
)

// ClusterCache manages the contents of the gRPC CDS cache.
type ClusterCache struct {
mu sync.Mutex
values map[string]*v2.Cluster
Cond
}

// Update replaces the contents of the cache with the supplied map.
func (c *ClusterCache) Update(v map[string]*v2.Cluster) {
c.mu.Lock()
defer c.mu.Unlock()

c.values = v
c.Cond.Notify()
}

// Contents returns a copy of the cache's contents.
func (c *ClusterCache) Contents() []proto.Message {
c.mu.Lock()
defer c.mu.Unlock()
// translateClusters returns an array of CDS resources.
func translateClusters(clusters map[string]*v2.Cluster) []xds.Resource {
var values []*v2.Cluster
for _, v := range c.values {
for _, v := range clusters {
values = append(values, v)
}
sort.Stable(sorter.For(values))
return protobuf.AsMessages(values)
}

func (c *ClusterCache) Query(names []string) []proto.Message {
c.mu.Lock()
defer c.mu.Unlock()
var values []*v2.Cluster
for _, n := range names {
// if the cluster is not registered we cannot return
// a blank cluster because each cluster has a required
// discovery type; DNS, EDS, etc. We cannot determine the
// correct value for this property from the cluster's name
// provided by the query so we must not return a blank cluster.
if v, ok := c.values[n]; ok {
values = append(values, v)
}
}
sort.Stable(sorter.For(values))
return protobuf.AsMessages(values)
}

func (*ClusterCache) TypeURL() string { return resource.ClusterType }

type clusterVisitor struct {
clusters map[string]*v2.Cluster
}

// visitCluster produces a map of *v2.Clusters.
func visitClusters(root dag.Vertex) map[string]*v2.Cluster {
// visitCluster produces an array of xds.Resources
func visitClusters(root dag.Vertex) []xds.Resource {
cv := clusterVisitor{
clusters: make(map[string]*v2.Cluster),
}
cv.visit(root)
return cv.clusters

return translateClusters(cv.clusters)
}

func (v *clusterVisitor) visit(vertex dag.Vertex) {
Expand Down
Loading

0 comments on commit b22f55a

Please sign in to comment.