Skip to content

Commit

Permalink
feat: combine xds-translator and xds-server runners
Browse files Browse the repository at this point in the history
* The combined runner is called `xds`
* This should eliminate one layer of caching resulting
in mem savings

Fixes: envoyproxy#3980

Signed-off-by: Arko Dasgupta <[email protected]>
  • Loading branch information
arkodg committed Sep 24, 2024
1 parent bd966b8 commit f9adcd8
Show file tree
Hide file tree
Showing 9 changed files with 118 additions and 638 deletions.
7 changes: 2 additions & 5 deletions api/v1alpha1/envoygateway_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,11 +137,8 @@ const (
// LogComponentGatewayAPIRunner defines the "gateway-api" runner component.
LogComponentGatewayAPIRunner EnvoyGatewayLogComponent = "gateway-api"

// LogComponentXdsTranslatorRunner defines the "xds-translator" runner component.
LogComponentXdsTranslatorRunner EnvoyGatewayLogComponent = "xds-translator"

// LogComponentXdsServerRunner defines the "xds-server" runner component.
LogComponentXdsServerRunner EnvoyGatewayLogComponent = "xds-server"
// LogComponentXdsRunner defines the "xds" runner component.
LogComponentXdsRunner EnvoyGatewayLogComponent = "xds"

// LogComponentInfrastructureRunner defines the "infrastructure" runner component.
LogComponentInfrastructureRunner EnvoyGatewayLogComponent = "infrastructure"
Expand Down
3 changes: 1 addition & 2 deletions api/v1alpha1/validation/envoygateway_validate.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,8 +141,7 @@ func validateEnvoyGatewayLogging(logging *egv1a1.EnvoyGatewayLogging) error {
case egv1a1.LogComponentGatewayDefault,
egv1a1.LogComponentProviderRunner,
egv1a1.LogComponentGatewayAPIRunner,
egv1a1.LogComponentXdsTranslatorRunner,
egv1a1.LogComponentXdsServerRunner,
egv1a1.LogComponentXdsRunner,
egv1a1.LogComponentInfrastructureRunner,
egv1a1.LogComponentGlobalRateLimitRunner:
switch logLevel {
Expand Down
26 changes: 6 additions & 20 deletions internal/cmd/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,7 @@ import (
"github.com/envoyproxy/gateway/internal/message"
"github.com/envoyproxy/gateway/internal/metrics"
providerrunner "github.com/envoyproxy/gateway/internal/provider/runner"
xdsserverrunner "github.com/envoyproxy/gateway/internal/xds/server/runner"
xdstranslatorrunner "github.com/envoyproxy/gateway/internal/xds/translator/runner"
xdsrunner "github.com/envoyproxy/gateway/internal/xds/runner"
)

// cfgPath is the path to the EnvoyGateway configuration file.
Expand Down Expand Up @@ -154,18 +153,17 @@ func setupRunners(cfg *config.Server) (err error) {
return err
}

xds := new(message.Xds)
// Start the Xds Translator Service
// It subscribes to the xdsIR, translates it into xds Resources and publishes it.
// Start the Xds Service
// It subscribes to the xdsIR, translates it into xds Resources and
// updates the xds control plane cache.
// It also computes the EnvoyPatchPolicy statuses and publishes it.
xdsTranslatorRunner := xdstranslatorrunner.New(&xdstranslatorrunner.Config{
xdsRunner := xdsrunner.New(&xdsrunner.Config{
Server: *cfg,
XdsIR: xdsIR,
Xds: xds,
ExtensionManager: extMgr,
ProviderResources: pResources,
})
if err = xdsTranslatorRunner.Start(ctx); err != nil {
if err = xdsRunner.Start(ctx); err != nil {
return err
}

Expand All @@ -180,17 +178,6 @@ func setupRunners(cfg *config.Server) (err error) {
return err
}

// Start the xDS Server
// It subscribes to the xds Resources and configures the remote Envoy Proxy
// via the xDS Protocol.
xdsServerRunner := xdsserverrunner.New(&xdsserverrunner.Config{
Server: *cfg,
Xds: xds,
})
if err = xdsServerRunner.Start(ctx); err != nil {
return err
}

// Start the global rateLimit if it has been enabled through the config
if cfg.EnvoyGateway.RateLimit != nil {
// Start the Global RateLimit xDS Server
Expand All @@ -210,7 +197,6 @@ func setupRunners(cfg *config.Server) (err error) {
pResources.Close()
xdsIR.Close()
infraIR.Close()
xds.Close()

cfg.Logger.Info("shutting down")

Expand Down
6 changes: 0 additions & 6 deletions internal/message/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ import (
egv1a1 "github.com/envoyproxy/gateway/api/v1alpha1"
"github.com/envoyproxy/gateway/internal/gatewayapi/resource"
"github.com/envoyproxy/gateway/internal/ir"
xdstypes "github.com/envoyproxy/gateway/internal/xds/types"
)

// ProviderResources message
Expand Down Expand Up @@ -131,8 +130,3 @@ type XdsIR struct {
type InfraIR struct {
watchable.Map[string, *ir.Infra]
}

// Xds message
type Xds struct {
watchable.Map[string, *xdstypes.ResourceVersionTable]
}
146 changes: 109 additions & 37 deletions internal/xds/server/runner/runner.go → internal/xds/runner/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"fmt"
"net"
"os"
"reflect"
"strconv"
"time"

Expand All @@ -27,13 +28,17 @@ import (
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/keepalive"
ktypes "k8s.io/apimachinery/pkg/types"

egv1a1 "github.com/envoyproxy/gateway/api/v1alpha1"
"github.com/envoyproxy/gateway/internal/envoygateway/config"
extension "github.com/envoyproxy/gateway/internal/extension/types"
"github.com/envoyproxy/gateway/internal/infrastructure/kubernetes/ratelimit"
"github.com/envoyproxy/gateway/internal/ir"
"github.com/envoyproxy/gateway/internal/message"
"github.com/envoyproxy/gateway/internal/xds/bootstrap"
"github.com/envoyproxy/gateway/internal/xds/cache"
xdstypes "github.com/envoyproxy/gateway/internal/xds/types"
"github.com/envoyproxy/gateway/internal/xds/translator"
)

const (
Expand All @@ -52,9 +57,11 @@ const (

type Config struct {
config.Server
Xds *message.Xds
grpc *grpc.Server
cache cache.SnapshotCacheWithCallbacks
XdsIR *message.XdsIR
ExtensionManager extension.Manager
ProviderResources *message.ProviderResources
grpc *grpc.Server
cache cache.SnapshotCacheWithCallbacks
}

type Runner struct {
Expand All @@ -66,10 +73,10 @@ func New(cfg *Config) *Runner {
}

func (r *Runner) Name() string {
return string(egv1a1.LogComponentXdsServerRunner)
return string(egv1a1.LogComponentXdsRunner)
}

// Start starts the xds-server runner
// Start starts the xds runner
func (r *Runner) Start(ctx context.Context) (err error) {
r.Logger = r.Logger.WithName(r.Name()).WithValues("runner", r.Name())

Expand All @@ -88,12 +95,107 @@ func (r *Runner) Start(ctx context.Context) (err error) {
// Start and listen xDS gRPC Server.
go r.serveXdsServer(ctx)

// Start message Subscription.
go r.subscribeAndTranslate(ctx)
r.Logger.Info("started")
return
}

func (r *Runner) subscribeAndTranslate(ctx context.Context) {
// Subscribe to resources
message.HandleSubscription(message.Metadata{Runner: string(egv1a1.LogComponentXdsRunner), Message: "xds-ir"}, r.XdsIR.Subscribe(ctx),
func(update message.Update[string, *ir.Xds], errChan chan error) {
r.Logger.Info("received an update")
key := update.Key
val := update.Value

if update.Delete {
err := r.cache.GenerateNewSnapshot(key, nil)
if err != nil {
r.Logger.Error(err, "failed to clear snapshot")
errChan <- err
}
} else {
// Translate to xds resources
t := &translator.Translator{
FilterOrder: val.FilterOrder,
}

// Set the extension manager if an extension is loaded
if r.ExtensionManager != nil {
t.ExtensionManager = &r.ExtensionManager
}

// Set the rate limit service URL if global rate limiting is enabled.
if r.EnvoyGateway.RateLimit != nil {
t.GlobalRateLimit = &translator.GlobalRateLimitSettings{
ServiceURL: ratelimit.GetServiceURL(r.Namespace, r.DNSDomain),
FailClosed: r.EnvoyGateway.RateLimit.FailClosed,
}
if r.EnvoyGateway.RateLimit.Timeout != nil {
t.GlobalRateLimit.Timeout = r.EnvoyGateway.RateLimit.Timeout.Duration
}
}

result, err := t.Translate(val)
if err != nil {
r.Logger.Error(err, "failed to translate xds ir")
errChan <- err
}

// xDS translation is done in a best-effort manner, so the result
// may contain partial resources even if there are errors.
if result == nil || result.XdsResources == nil {
r.Logger.Info("no xds resources to publish")
return
}

if r.cache == nil {
r.Logger.Error(err, "failed to init snapshot cache")
errChan <- err
} else {
// Update snapshot cache
err = r.cache.GenerateNewSnapshot(key, result.XdsResources)
if err != nil {
r.Logger.Error(err, "failed to generate a snapshot")
errChan <- err
}
}

// Get all status keys from watchable and save them in the map statusesToDelete.
// Iterating through result.EnvoyPatchPolicyStatuses, any valid keys will be removed from statusesToDelete.
// Remaining keys will be deleted from watchable before we exit this function.
statusesToDelete := make(map[ktypes.NamespacedName]bool)
for key := range r.ProviderResources.EnvoyPatchPolicyStatuses.LoadAll() {
statusesToDelete[key] = true
}

// Publish EnvoyPatchPolicyStatus
for _, e := range result.EnvoyPatchPolicyStatuses {
key := ktypes.NamespacedName{
Name: e.Name,
Namespace: e.Namespace,
}
// Skip updating status for policies with empty status
// They may have been skipped in this translation because
// their target is not found (not relevant)
if !(reflect.ValueOf(e.Status).IsZero()) {
r.ProviderResources.EnvoyPatchPolicyStatuses.Store(key, e.Status)
}
delete(statusesToDelete, key)
}
// Discard the EnvoyPatchPolicyStatuses to reduce memory footprint
result.EnvoyPatchPolicyStatuses = nil

// Delete all the deletable status keys
for key := range statusesToDelete {
r.ProviderResources.EnvoyPatchPolicyStatuses.Delete(key)
}
}
},
)
r.Logger.Info("subscriber shutting down")
}

func (r *Runner) serveXdsServer(ctx context.Context) {
addr := net.JoinHostPort(XdsServerAddress, strconv.Itoa(bootstrap.DefaultXdsServerPort))
l, err := net.Listen("tcp", addr)
Expand Down Expand Up @@ -130,36 +232,6 @@ func registerServer(srv serverv3.Server, g *grpc.Server) {
runtimev3.RegisterRuntimeDiscoveryServiceServer(g, srv)
}

func (r *Runner) subscribeAndTranslate(ctx context.Context) {
// Subscribe to resources
message.HandleSubscription(message.Metadata{Runner: string(egv1a1.LogComponentXdsServerRunner), Message: "xds"}, r.Xds.Subscribe(ctx),
func(update message.Update[string, *xdstypes.ResourceVersionTable], errChan chan error) {
key := update.Key
val := update.Value

r.Logger.Info("received an update")
var err error
if update.Delete {
err = r.cache.GenerateNewSnapshot(key, nil)
} else if val != nil && val.XdsResources != nil {
if r.cache == nil {
r.Logger.Error(err, "failed to init snapshot cache")
errChan <- err
} else {
// Update snapshot cache
err = r.cache.GenerateNewSnapshot(key, val.XdsResources)
}
}
if err != nil {
r.Logger.Error(err, "failed to generate a snapshot")
errChan <- err
}
},
)

r.Logger.Info("subscriber shutting down")
}

func (r *Runner) tlsConfig(cert, key, ca string) *tls.Config {
loadConfig := func() (*tls.Config, error) {
cert, err := tls.LoadX509KeyPair(cert, key)
Expand Down
Loading

0 comments on commit f9adcd8

Please sign in to comment.