Skip to content

Commit

Permalink
Add xds-server implementation (#176)
Browse files Browse the repository at this point in the history
* First cut

Signed-off-by: Nick Young <[email protected]>

* Working test xds server

Signed-off-by: Nick Young <[email protected]>

* Complete move from pkg to internal

Signed-off-by: Nick Young <[email protected]>

* Move to logr wrapper for logging

Signed-off-by: Nick Young <[email protected]>

* Renamed CacheVersion to ResourceVersionTable and added comments

Signed-off-by: Nick Young <[email protected]>

* Fix linting

Signed-off-by: Nick Young <[email protected]>

* Fix go.mod

Signed-off-by: Nick Young <[email protected]>

* Fix go.mod and go.sum again

Signed-off-by: Nick Young <[email protected]>

* Fix PR comments

Signed-off-by: Nick Young <[email protected]>

* Added mutex for snapshotcache and made nodeID handling more resilient

Signed-off-by: Nick Young <[email protected]>

* Resolve PR comments

Signed-off-by: Nick Young <[email protected]>
  • Loading branch information
Nick Young authored Jul 29, 2022
1 parent 7246e3a commit e20d7ad
Show file tree
Hide file tree
Showing 11 changed files with 663 additions and 52 deletions.
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ require (
github.com/spf13/cobra v1.4.0
github.com/stretchr/testify v1.7.1
go.uber.org/zap v1.19.1
google.golang.org/grpc v1.45.0
k8s.io/api v0.24.2
k8s.io/apimachinery v0.24.2
k8s.io/client-go v0.24.2
Expand Down Expand Up @@ -72,6 +73,7 @@ require (
golang.org/x/time v0.0.0-20220210224613-90d013bbcef8 // indirect
gomodules.xyz/jsonpatch/v2 v2.2.0 // indirect
google.golang.org/appengine v1.6.7 // indirect
google.golang.org/genproto v0.0.0-20220329172620-7be39ac1afc7 // indirect
gopkg.in/inf.v0 v0.9.1 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b // indirect
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -869,6 +869,7 @@ google.golang.org/genproto v0.0.0-20210602131652-f16073e35f0c/go.mod h1:UODoCrxH
google.golang.org/genproto v0.0.0-20210831024726-fe130286e0e2/go.mod h1:eFjDcFEctNawg4eG61bRv87N7iHBWyVhJu7u1kqDUXY=
google.golang.org/genproto v0.0.0-20211118181313-81c1377c94b1/go.mod h1:5CzLGKJ67TSI2B9POpiiyGha0AjJvZIUgRMt1dSmuhc=
google.golang.org/genproto v0.0.0-20220107163113-42d7afdf6368/go.mod h1:5CzLGKJ67TSI2B9POpiiyGha0AjJvZIUgRMt1dSmuhc=
google.golang.org/genproto v0.0.0-20220329172620-7be39ac1afc7 h1:HOL66YCI20JvN2hVk6o2YIp9i/3RvzVUz82PqNr7fXw=
google.golang.org/genproto v0.0.0-20220329172620-7be39ac1afc7/go.mod h1:8w6bsBMX6yCPbAVTeqQHvzxW0EIFigd5lZyahWgyfDo=
google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c=
google.golang.org/grpc v1.20.1/go.mod h1:10oTOabMzJvdu6/UiuZezV6QK5dSlG84ov/aaiqXj38=
Expand All @@ -893,6 +894,7 @@ google.golang.org/grpc v1.37.0/go.mod h1:NREThFqKR1f3iQ6oBuvc5LadQuXVGo9rkm5ZGrQ
google.golang.org/grpc v1.38.0/go.mod h1:NREThFqKR1f3iQ6oBuvc5LadQuXVGo9rkm5ZGrQdJfM=
google.golang.org/grpc v1.40.0/go.mod h1:ogyxbiOoUXAkP+4+xa6PZSE9DZgIHtSpzjDTB9KAK34=
google.golang.org/grpc v1.42.0/go.mod h1:k+4IHHFw41K8+bbowsex27ge2rCb65oeWqe4jJ590SU=
google.golang.org/grpc v1.45.0 h1:NEpgUqV3Z+ZjkqMsxMg11IaDrXY4RY6CQukSGK0uI1M=
google.golang.org/grpc v1.45.0/go.mod h1:lN7owxKUQEqMfSyQikvvk5tf/6zMPsrK+ONuO11+0rQ=
google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8=
google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0=
Expand Down
1 change: 1 addition & 0 deletions internal/cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ func GetRootCommand() *cobra.Command {
}

cmd.AddCommand(getServerCommand())
cmd.AddCommand(getxDSTestCommand())

return cmd
}
237 changes: 237 additions & 0 deletions internal/cmd/xdstest.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,237 @@
package cmd

import (
"net"
"time"

"github.com/envoyproxy/gateway/internal/ir"
"github.com/envoyproxy/gateway/internal/log"
"github.com/envoyproxy/gateway/internal/xds/cache"
"github.com/envoyproxy/gateway/internal/xds/translator"
"github.com/spf13/cobra"
"google.golang.org/grpc"
"sigs.k8s.io/controller-runtime/pkg/manager/signals"

controlplane_service_cluster_v3 "github.com/envoyproxy/go-control-plane/envoy/service/cluster/v3"
controlplane_service_discovery_v3 "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"
controlplane_service_endpoint_v3 "github.com/envoyproxy/go-control-plane/envoy/service/endpoint/v3"
controlplane_service_listener_v3 "github.com/envoyproxy/go-control-plane/envoy/service/listener/v3"
controlplane_service_route_v3 "github.com/envoyproxy/go-control-plane/envoy/service/route/v3"
controlplane_service_runtime_v3 "github.com/envoyproxy/go-control-plane/envoy/service/runtime/v3"
controlplane_service_secret_v3 "github.com/envoyproxy/go-control-plane/envoy/service/secret/v3"
controlplane_server_v3 "github.com/envoyproxy/go-control-plane/pkg/server/v3"
)

// The xdstest command is intended just to show how updating the IR can produce different
// xDS output, including showing that Delta xDS works.
// You'll need an xDS probe like the `contour cli` command to check.
//
// It's also intended that this get removed once we have a full loop implemented in
// `gateway serve`.

// getServerCommand returns the server cobra command to be executed.
func getxDSTestCommand() *cobra.Command {
cmd := &cobra.Command{
Use: "xdstest",
Aliases: []string{"xdstest"},
Short: "Run a test xDS server",
RunE: func(cmd *cobra.Command, args []string) error {
return xDSTest()
},
}
return cmd
}

// xDSTest implements the command.
// This is deliberately verbose and unoptimized, since the purpose
// is just to illustrate how the flow will need to work.
func xDSTest() error {

// Grab the logr.Logger.
logger, err := log.NewLogger()
if err != nil {
return err
}

// Set the logr Logger to debug.
// zap's logr impl requires negative levels.
logger = logger.V(-2)

ctx := signals.SetupSignalHandler()

logger.Info("Starting xDS Tester service")
defer logger.Info("Stopping xDS Tester service")

// Create three IR versions that we'll swap between, to
// generate xDS updates for the various methods.
ir1 := &ir.Xds{
HTTP: []*ir.HTTPListener{
{
Name: "first-listener",
Address: "0.0.0.0",
Port: 10080,
Hostnames: []string{
"*",
},
Routes: []*ir.HTTPRoute{
{
Name: "first-route",
Destinations: []*ir.RouteDestination{
{
Host: "1.2.3.4",
Port: 50000,
},
},
},
},
},
},
}

ir2 := &ir.Xds{
HTTP: []*ir.HTTPListener{
{
Name: "first-listener",
Address: "0.0.0.0",
Port: 10080,
Hostnames: []string{
"*",
},
Routes: []*ir.HTTPRoute{
{
Name: "second-route",
Destinations: []*ir.RouteDestination{
{
Host: "1.2.3.4",
Port: 50000,
},
},
},
},
},
},
}

ir3 := &ir.Xds{
HTTP: []*ir.HTTPListener{
{
Name: "second-listener",
Address: "0.0.0.0",
Port: 10080,
Hostnames: []string{
"*",
},
Routes: []*ir.HTTPRoute{
{
Name: "second-route",
Destinations: []*ir.RouteDestination{
{
Host: "1.2.3.4",
Port: 50000,
},
},
},
},
},
},
}

// Now, we do the translation because everything is static.
// Normally, we'd do this in response to updates on the
// message bus.
cacheVersion1, err := translator.TranslateXDSIR(ir1)
if err != nil {
return err
}

cacheVersion2, err := translator.TranslateXDSIR(ir2)
if err != nil {
return err
}

cacheVersion3, err := translator.TranslateXDSIR(ir3)
if err != nil {
return err
}

// Set up the gRPC server and register the xDS handler.
g := grpc.NewServer()

snapCache := cache.NewSnapshotCache(false, logger)
RegisterServer(controlplane_server_v3.NewServer(ctx, snapCache, snapCache), g)

addr := net.JoinHostPort("0.0.0.0", "8001")
l, err := net.Listen("tcp", addr)
if err != nil {
return err
}

// Handle the signals and stop when the signal context does.
go func() {
<-ctx.Done()

// We don't use GracefulStop here because envoy
// has long-lived hanging xDS requests. There's no
// mechanism to make those pending requests fail,
// so we forcibly terminate the TCP sessions.
g.Stop()
}()

// Loop through the various configs, updating the SnapshotCache
// each time. This will run until the process is killed by signal
// (SIGINT, SIGKILL etc).
go func() {
// This little function sleeps 10 seconds then swaps
// between various versions of the IR
logger.Info("Sleeping for a bit before updating the cache")
for {
time.Sleep(10 * time.Second)
logger.Info("Updating the cache for first-listener with first-route")
err := snapCache.GenerateNewSnapshot(cacheVersion1.GetXdsResources())
if err != nil {
logger.Error(err, "Something went wrong with generating a snapshot")
}
time.Sleep(10 * time.Second)
logger.Info("Updating the cache for first-listener with second-route")
err = snapCache.GenerateNewSnapshot(cacheVersion2.GetXdsResources())
if err != nil {
logger.Error(err, "Something went wrong with generating a snapshot")
}
time.Sleep(10 * time.Second)
logger.Info("Updating the cache for second-listener with second-route")
err = snapCache.GenerateNewSnapshot(cacheVersion3.GetXdsResources())
if err != nil {
logger.Error(err, "Something went wrong with generating a snapshot")
}
}
}()

return g.Serve(l)

}

// Some helper stuff that we'll need to put somewhere eventually.

// Server is a collection of handlers for streaming discovery requests.
type Server interface {
controlplane_service_cluster_v3.ClusterDiscoveryServiceServer
controlplane_service_endpoint_v3.EndpointDiscoveryServiceServer
controlplane_service_listener_v3.ListenerDiscoveryServiceServer
controlplane_service_route_v3.RouteDiscoveryServiceServer
controlplane_service_discovery_v3.AggregatedDiscoveryServiceServer
controlplane_service_secret_v3.SecretDiscoveryServiceServer
controlplane_service_runtime_v3.RuntimeDiscoveryServiceServer
}

// RegisterServer registers the given xDS protocol Server with the gRPC
// runtime.
func RegisterServer(srv Server, g *grpc.Server) {
// register services
controlplane_service_discovery_v3.RegisterAggregatedDiscoveryServiceServer(g, srv)
controlplane_service_secret_v3.RegisterSecretDiscoveryServiceServer(g, srv)
controlplane_service_cluster_v3.RegisterClusterDiscoveryServiceServer(g, srv)
controlplane_service_endpoint_v3.RegisterEndpointDiscoveryServiceServer(g, srv)
controlplane_service_listener_v3.RegisterListenerDiscoveryServiceServer(g, srv)
controlplane_service_route_v3.RegisterRouteDiscoveryServiceServer(g, srv)
controlplane_service_runtime_v3.RegisterRuntimeDiscoveryServiceServer(g, srv)
}
47 changes: 47 additions & 0 deletions internal/xds/cache/logrwrapper.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package cache

import (
"fmt"

"github.com/go-logr/logr"
)

// LogrWrapper is a nasty hack for turning the logr.Logger we get from NewLogger()
// into something that go-control-plane can accept.
// It seems pretty silly to take a zap logger, which is levelled, turn it into
// a V-style logr Logger, then turn it back again with this, but here we are.
// TODO(youngnick): Reopen the logging library discussion then do something about this.
type LogrWrapper struct {
logr logr.Logger
}

const LevelDebug int = -2
const LevelInfo int = 0
const LevelWarn int = -1

func (l LogrWrapper) Debugf(template string, args ...interface{}) {

l.logr.V(LevelDebug).Info(fmt.Sprintf(template, args...))
}

func (l LogrWrapper) Infof(template string, args ...interface{}) {

l.logr.V(LevelInfo).Info(fmt.Sprintf(template, args...))
}

func (l LogrWrapper) Warnf(template string, args ...interface{}) {

l.logr.V(LevelWarn).Info(fmt.Sprintf(template, args...))
}

func (l LogrWrapper) Errorf(template string, args ...interface{}) {

l.logr.Error(fmt.Errorf(template, args...), "")
}

func NewLogrWrapper(log logr.Logger) *LogrWrapper {

return &LogrWrapper{
logr: log,
}
}
Loading

0 comments on commit e20d7ad

Please sign in to comment.