Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(EG K8S Provider): Improve EG Gateway xDS & startup reliability #2918

Closed
wants to merge 20 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
b5bbe08
bug: add h3 alpn by default if http3 is enabled (#2887)
arkodg Mar 12, 2024
134fd30
feat(EG K8S Provider): Improve EG Gateway xDS & startup reliability
alexwo Mar 13, 2024
55ab7d4
fixing k8s provider test
alexwo Mar 13, 2024
45353db
fix: prevent policies targeting non-TLS listeners on the same port fr…
liorokman Mar 12, 2024
30a2e35
chore: remove ProcessBackendTLSPoliciesAncestorRef (#2845)
zhaohuabing Mar 12, 2024
a520504
Change the Merge behavior to Replace for BackendTrafficPolicy (#2888)
zhaohuabing Mar 12, 2024
88eaef5
shutdown drainTimeout should also affect envoy drain time (#2898)
arkodg Mar 12, 2024
1209839
skip publishing empty status for policies (#2902)
arkodg Mar 13, 2024
58fe4b8
docs: multiple gatewayclass and merge gateways deployment mode (#2881)
cnvergence Mar 13, 2024
b0ce70b
feat: add PolicyStatus for ClientTrafficPolicy (#2895)
shawnh2 Mar 13, 2024
4c4a4c8
Use gwapiv1a2.PolicyStatus for SecurityPolicy Status (#2848)
zhaohuabing Mar 13, 2024
a06531e
Fix oidc doc (#2905)
zhaohuabing Mar 13, 2024
ead931a
Release v1.0 (#2909)
Xunzhuo Mar 13, 2024
9f6d02d
Merge branch 'main' into eg_startup_reliability
alexwo Mar 13, 2024
48f6c5f
adding sources test
alexwo Mar 13, 2024
011e98b
Merge branch 'main' into eg_startup_reliability
alexwo Mar 13, 2024
e70774c
Merge branch 'main' into eg_startup_reliability
alexwo Mar 14, 2024
7afa2db
Merge branch 'main' into eg_startup_reliability
alexwo Mar 21, 2024
99d254a
merge with latest main
alexwo Apr 10, 2024
1505793
Merge branch 'main' into eg_startup_reliability
alexwo May 1, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions internal/cmd/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/envoyproxy/gateway/internal/logging"
"github.com/envoyproxy/gateway/internal/message"
"github.com/envoyproxy/gateway/internal/metrics"
"github.com/envoyproxy/gateway/internal/probs"
providerrunner "github.com/envoyproxy/gateway/internal/provider/runner"
xdsserverrunner "github.com/envoyproxy/gateway/internal/xds/server/runner"
xdstranslatorrunner "github.com/envoyproxy/gateway/internal/xds/translator/runner"
Expand Down Expand Up @@ -125,9 +126,11 @@ func setupRunners(cfg *config.Server) error {
// and publishes it
// It also subscribes to status resources and once it receives
// a status resource back, it writes it out.
xdsHealthProb := probs.NewXdsReadyHealthProb()
providerRunner := providerrunner.New(&providerrunner.Config{
Server: *cfg,
ProviderResources: pResources,
XdsReady: xdsHealthProb,
})
if err := providerRunner.Start(ctx); err != nil {
return err
Expand Down Expand Up @@ -179,8 +182,9 @@ func setupRunners(cfg *config.Server) error {
// It subscribes to the xds Resources and configures the remote Envoy Proxy
// via the xDS Protocol.
xdsServerRunner := xdsserverrunner.New(&xdsserverrunner.Config{
Server: *cfg,
Xds: xds,
Server: *cfg,
Xds: xds,
XdsReady: xdsHealthProb,
})
if err := xdsServerRunner.Start(ctx); err != nil {
return err
Expand Down
69 changes: 69 additions & 0 deletions internal/probs/types.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
// Copyright Envoy Gateway Authors
// SPDX-License-Identifier: Apache-2.0
// The full text of the Apache license is available in the LICENSE file at
// the root of the repo.

package probs

import (
	"errors"
	"net/http"
	"sync"

	"sigs.k8s.io/controller-runtime/pkg/healthz"
)

// Indicator names accepted by HealthProb.SetIndicator.
const (
	// StartedXdsServer is reported by the xDS server runner when it is about
	// to start serving.
	StartedXdsServer = "StartedXdsServer"
	// NothingToReconcile is reported by the provider when a reconcile finds
	// nothing that needs to be translated into xDS.
	NothingToReconcile = "NothingToReconcile"
	// GeneratedNewXdsSnapshot is reported once an xDS snapshot update has
	// been handled.
	GeneratedNewXdsSnapshot = "GeneratedNewXdsSnapshot"
)

// HealthProb collects named readiness indicators and exposes the resulting
// readiness state as a controller-runtime health check.
type HealthProb interface {
// GetHealthCheckerProb returns a healthz.Checker that can be registered
// with a controller-runtime manager as a health or readiness check.
GetHealthCheckerProb() healthz.Checker
// SetIndicator records that the named state or status has been reached.
// name is expected to be one of the indicator constants declared in this
// package (StartedXdsServer, NothingToReconcile, GeneratedNewXdsSnapshot);
// the recorded indicators determine whether the service is considered ready.
SetIndicator(name string)
}

// NewXdsReadyHealthProb returns a HealthProb that reports not-ready until the
// indicators passed to SetIndicator show that xDS is in a servable state.
func NewXdsReadyHealthProb() HealthProb {
	return &xdsHealthProb{
		indicators: make(map[string]bool, 3),
	}
}

// xdsHealthProb is the HealthProb implementation used to gate readiness on
// the state of the xDS pipeline.
//
// A single instance is handed to both the provider runner and the xDS server
// runner and is read by the readiness checker, so all state is guarded by mu.
type xdsHealthProb struct {
	// mu protects isReady and indicators.
	mu sync.Mutex
	// isReady latches to true once the readiness conditions are met; it is
	// never reset by SetIndicator.
	isReady bool
	// indicators holds every indicator name reported so far.
	indicators map[string]bool
}

// SetIndicator records indicator and re-evaluates readiness.
//
// The probe becomes ready when either:
//   - GeneratedNewXdsSnapshot has been reported, meaning at least one full
//     reconcile completed and xDS is in sync; or
//   - both StartedXdsServer and NothingToReconcile have been reported, in any
//     order, meaning the xDS server is up and there is nothing to translate.
//
// Otherwise the probe stays not-ready until one of the conditions is met.
// SetIndicator is safe for concurrent use.
func (x *xdsHealthProb) SetIndicator(indicator string) {
	x.mu.Lock()
	defer x.mu.Unlock()

	// Tolerate a zero-value probe instead of panicking on a nil map write.
	if x.indicators == nil {
		x.indicators = make(map[string]bool, 3)
	}
	x.indicators[indicator] = true

	// Evaluate against the accumulated set rather than only the indicator
	// that just arrived: the runners report independently, so the arrival
	// order of StartedXdsServer and NothingToReconcile is not guaranteed.
	switch {
	case x.indicators[GeneratedNewXdsSnapshot]:
		x.isReady = true
	case x.indicators[StartedXdsServer] && x.indicators[NothingToReconcile]:
		x.isReady = true
	}
}

// GetHealthCheckerProb returns a healthz.Checker that succeeds once the probe
// is ready and returns an error otherwise. The request argument is ignored.
func (x *xdsHealthProb) GetHealthCheckerProb() healthz.Checker {
	return func(_ *http.Request) error {
		x.mu.Lock()
		defer x.mu.Unlock()

		if x.isReady {
			return nil
		}
		return errors.New("the xds service is not ready")
	}
}
64 changes: 64 additions & 0 deletions internal/probs/types_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
// Copyright Envoy Gateway Authors
// SPDX-License-Identifier: Apache-2.0
// The full text of the Apache license is available in the LICENSE file at
// the root of the repo.

package probs

import (
"testing"
)

func TestSetIndicator(t *testing.T) {
// Initialize xdsProb before each test
xdsProb := &xdsHealthProb{
indicators: make(map[string]bool),
isReady: false,
}

testCases := []struct {
name string
indicators []string
expect bool
}{
{
name: "No indicators set",
indicators: []string{},
expect: false,
},
{
name: "GeneratedNewXdsSnapshot only",
indicators: []string{GeneratedNewXdsSnapshot},
expect: true,
},
{
name: "StartedXdsServer only",
indicators: []string{StartedXdsServer},
expect: false,
},
{
name: "Both GeneratedNewXdsSnapshot and StartedXdsServer",
indicators: []string{GeneratedNewXdsSnapshot, StartedXdsServer},
expect: true,
},
// Add more test cases as needed
}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
// Reset xdsProb before each test
xdsProb.indicators = make(map[string]bool)
xdsProb.isReady = false

// Set each indicator
for _, indicator := range tc.indicators {
xdsProb.SetIndicator(indicator)
}

// Check if isReady matches the expected value
if xdsProb.isReady != tc.expect {
t.Errorf("Expected isReady to be %v after setting indicators %v, got %v", tc.expect, tc.indicators, xdsProb.isReady)
}
})
}
}
21 changes: 19 additions & 2 deletions internal/provider/kubernetes/controller.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright Envoy Gateway Authors

Check failure on line 1 in internal/provider/kubernetes/controller.go

View workflow job for this annotation

GitHub Actions / lint

: # github.com/envoyproxy/gateway/internal/provider/kubernetes [github.com/envoyproxy/gateway/internal/provider/kubernetes.test]
// SPDX-License-Identifier: Apache-2.0
// The full text of the Apache license is available in the LICENSE file at
// the root of the repo.
Expand Down Expand Up @@ -38,6 +38,7 @@
"github.com/envoyproxy/gateway/internal/gatewayapi"
"github.com/envoyproxy/gateway/internal/logging"
"github.com/envoyproxy/gateway/internal/message"
"github.com/envoyproxy/gateway/internal/probs"
"github.com/envoyproxy/gateway/internal/status"
"github.com/envoyproxy/gateway/internal/utils"
"github.com/envoyproxy/gateway/internal/utils/slice"
Expand All @@ -55,12 +56,12 @@
mergeGateways sets.Set[string]
resources *message.ProviderResources
extGVKs []schema.GroupVersionKind
xdsReadyProb probs.HealthProb
}

// newGatewayAPIController
func newGatewayAPIController(mgr manager.Manager, cfg *config.Server, su status.Updater,
resources *message.ProviderResources,
) error {
resources *message.ProviderResources, xdsReadyProb probs.HealthProb) error {
ctx := context.Background()

// Gather additional resources to watch from registered extensions
Expand Down Expand Up @@ -90,6 +91,7 @@
store: newProviderStore(),
envoyGateway: cfg.EnvoyGateway,
mergeGateways: sets.New[string](),
xdsReadyProb: xdsReadyProb,
}

if byNamespaceSelector {
Expand Down Expand Up @@ -151,6 +153,8 @@
if managedGCs == nil {
r.resources.GatewayAPIResources.Delete(string(r.classController))
r.log.Info("no accepted gatewayclass")
// assume xds has nothing to update
r.xdsReadyProb.SetIndicator(probs.NothingToReconcile)
return reconcile.Result{}, nil
}

Expand Down Expand Up @@ -918,13 +922,26 @@
}

if err := c.Watch(
// Trigger a complete reconciliation once this instance is elected, even for empty or new clusters,
// so that the xDS health probe can be initialized; without it, there is no guarantee that an initial reconciliation ever runs.
// If leader election is disabled, this is invoked immediately.
NewWatchAndReconcileSource(mgr.Elected(), &gwapiv1.GatewayClass{}),
handler.EnqueueRequestsFromMapFunc(r.enqueueClass)); err != nil {
return err
}
if err := c.Watch(
source.Kind(mgr.GetCache(), &gwapiv1.GatewayClass{}),
handler.EnqueueRequestsFromMapFunc(r.enqueueClass),
predicate.GenerationChangedPredicate{},
predicate.NewPredicateFuncs(r.hasMatchingController),
); err != nil {
source.Kind(mgr.GetCache(), &gwapiv1.GatewayClass{},
handler.TypedEnqueueRequestsFromMapFunc(func(ctx context.Context, gc *gwapiv1.GatewayClass) []reconcile.Request {
return r.enqueueClass(ctx, gc)
}),
&predicate.TypedGenerationChangedPredicate[*gwapiv1.GatewayClass]{},
predicate.NewTypedPredicateFuncs[*gwapiv1.GatewayClass](r.hasMatchingController))); err != nil {

Check failure on line 943 in internal/provider/kubernetes/controller.go

View workflow job for this annotation

GitHub Actions / coverage-test

expected statement, found ')'

Check failure on line 943 in internal/provider/kubernetes/controller.go

View workflow job for this annotation

GitHub Actions / coverage-test

expected statement, found ')'

Check failure on line 943 in internal/provider/kubernetes/controller.go

View workflow job for this annotation

GitHub Actions / coverage-test

expected statement, found ')'

Check failure on line 943 in internal/provider/kubernetes/controller.go

View workflow job for this annotation

GitHub Actions / coverage-test

expected statement, found ')'

Check failure on line 943 in internal/provider/kubernetes/controller.go

View workflow job for this annotation

GitHub Actions / coverage-test

expected statement, found ')'

Check failure on line 943 in internal/provider/kubernetes/controller.go

View workflow job for this annotation

GitHub Actions / gen-check

expected statement, found ')'

Check failure on line 943 in internal/provider/kubernetes/controller.go

View workflow job for this annotation

GitHub Actions / gen-check

expected statement, found ')'

Check failure on line 943 in internal/provider/kubernetes/controller.go

View workflow job for this annotation

GitHub Actions / gen-check

expected statement, found ')'

Check failure on line 943 in internal/provider/kubernetes/controller.go

View workflow job for this annotation

GitHub Actions / gen-check

expected statement, found ')'

Check failure on line 943 in internal/provider/kubernetes/controller.go

View workflow job for this annotation

GitHub Actions / gen-check

expected statement, found ')'

Check failure on line 943 in internal/provider/kubernetes/controller.go

View workflow job for this annotation

GitHub Actions / lint

syntax error: unexpected ) at end of statement

Check failure on line 943 in internal/provider/kubernetes/controller.go

View workflow job for this annotation

GitHub Actions / lint

syntax error: unexpected ) at end of statement

Check failure on line 943 in internal/provider/kubernetes/controller.go

View workflow job for this annotation

GitHub Actions / lint

syntax error: unexpected ) at end of statement

Check failure on line 943 in internal/provider/kubernetes/controller.go

View workflow job for this annotation

GitHub Actions / lint

expected statement, found ')' (typecheck)
return err

Check failure on line 944 in internal/provider/kubernetes/controller.go

View workflow job for this annotation

GitHub Actions / lint

syntax error: unexpected return, expected expression) (typecheck)

Check failure on line 944 in internal/provider/kubernetes/controller.go

View workflow job for this annotation

GitHub Actions / lint

syntax error: unexpected return, expected expression) (typecheck)

Check failure on line 944 in internal/provider/kubernetes/controller.go

View workflow job for this annotation

GitHub Actions / lint

syntax error: unexpected return, expected expression (typecheck)
}

// Only enqueue EnvoyProxy objects that match this Envoy Gateway's GatewayClass.
Expand Down
9 changes: 4 additions & 5 deletions internal/provider/kubernetes/kubernetes.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/envoyproxy/gateway/internal/envoygateway"
ec "github.com/envoyproxy/gateway/internal/envoygateway/config"
"github.com/envoyproxy/gateway/internal/message"
"github.com/envoyproxy/gateway/internal/probs"
"github.com/envoyproxy/gateway/internal/status"
)

Expand All @@ -34,9 +35,8 @@ type Provider struct {
}

// New creates a new Provider from the provided EnvoyGateway.
func New(cfg *rest.Config, svr *ec.Server, resources *message.ProviderResources) (*Provider, error) {
func New(cfg *rest.Config, svr *ec.Server, resources *message.ProviderResources, xdsReady probs.HealthProb) (*Provider, error) {
// TODO: Decide which mgr opts should be exposed through envoygateway.provider.kubernetes API.

mgrOpts := manager.Options{
Scheme: envoygateway.GetScheme(),
Logger: svr.Logger.Logger,
Expand Down Expand Up @@ -90,7 +90,7 @@ func New(cfg *rest.Config, svr *ec.Server, resources *message.ProviderResources)
}

// Create and register the controllers with the manager.
if err := newGatewayAPIController(mgr, svr, updateHandler.Writer(), resources); err != nil {
if err := newGatewayAPIController(mgr, svr, updateHandler.Writer(), resources, xdsReady); err != nil {
return nil, fmt.Errorf("failted to create gatewayapi controller: %w", err)
}

Expand All @@ -99,8 +99,7 @@ func New(cfg *rest.Config, svr *ec.Server, resources *message.ProviderResources)
return nil, fmt.Errorf("unable to set up health check: %w", err)
}

// Add ready check health probes.
if err := mgr.AddReadyzCheck("readyz", healthz.Ping); err != nil {
if err = mgr.AddReadyzCheck("readyz", xdsReady.GetHealthCheckerProb()); err != nil {
return nil, fmt.Errorf("unable to set up ready check: %w", err)
}

Expand Down
7 changes: 4 additions & 3 deletions internal/provider/kubernetes/kubernetes_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (
"github.com/envoyproxy/gateway/internal/envoygateway/config"
"github.com/envoyproxy/gateway/internal/gatewayapi"
"github.com/envoyproxy/gateway/internal/message"
"github.com/envoyproxy/gateway/internal/probs"
"github.com/envoyproxy/gateway/internal/provider/kubernetes/test"
"github.com/envoyproxy/gateway/internal/utils"
)
Expand All @@ -54,7 +55,7 @@ func TestProvider(t *testing.T) {
svr, err := config.New()
require.NoError(t, err)
resources := new(message.ProviderResources)
provider, err := New(cliCfg, svr, resources)
provider, err := New(cliCfg, svr, resources, probs.NewXdsReadyHealthProb())
require.NoError(t, err)
ctx, cancel := context.WithCancel(ctrl.SetupSignalHandler())
go func() {
Expand Down Expand Up @@ -1265,7 +1266,7 @@ func TestNamespacedProvider(t *testing.T) {
LeaderElection: egv1a1.DefaultLeaderElection(),
}
resources := new(message.ProviderResources)
provider, err := New(cliCfg, svr, resources)
provider, err := New(cliCfg, svr, resources, probs.NewXdsReadyHealthProb())
require.NoError(t, err)
ctx, cancel := context.WithCancel(context.Background())
go func() {
Expand Down Expand Up @@ -1325,7 +1326,7 @@ func TestNamespaceSelectorProvider(t *testing.T) {
LeaderElection: egv1a1.DefaultLeaderElection(),
}
resources := new(message.ProviderResources)
provider, err := New(cliCfg, svr, resources)
provider, err := New(cliCfg, svr, resources, probs.NewXdsReadyHealthProb())
require.NoError(t, err)
ctx, cancel := context.WithCancel(context.Background())
go func() {
Expand Down
4 changes: 3 additions & 1 deletion internal/provider/runner/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,14 @@ import (
"github.com/envoyproxy/gateway/api/v1alpha1"
"github.com/envoyproxy/gateway/internal/envoygateway/config"
"github.com/envoyproxy/gateway/internal/message"
"github.com/envoyproxy/gateway/internal/probs"
"github.com/envoyproxy/gateway/internal/provider/kubernetes"
)

type Config struct {
config.Server
ProviderResources *message.ProviderResources
XdsReady probs.HealthProb
}

type Runner struct {
Expand All @@ -43,7 +45,7 @@ func (r *Runner) Start(ctx context.Context) (err error) {
if err != nil {
return fmt.Errorf("failed to get kubeconfig: %w", err)
}
p, err := kubernetes.New(cfg, &r.Config.Server, r.ProviderResources)
p, err := kubernetes.New(cfg, &r.Config.Server, r.ProviderResources, r.XdsReady)
if err != nil {
return fmt.Errorf("failed to create provider %s: %w", v1alpha1.ProviderTypeKubernetes, err)
}
Expand Down
14 changes: 9 additions & 5 deletions internal/xds/server/runner/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,6 @@ import (
"strconv"
"time"

"google.golang.org/grpc/keepalive"

clusterv3 "github.com/envoyproxy/go-control-plane/envoy/service/cluster/v3"
discoveryv3 "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"
endpointv3 "github.com/envoyproxy/go-control-plane/envoy/service/endpoint/v3"
Expand All @@ -28,10 +26,12 @@ import (
serverv3 "github.com/envoyproxy/go-control-plane/pkg/server/v3"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/keepalive"

"github.com/envoyproxy/gateway/api/v1alpha1"
"github.com/envoyproxy/gateway/internal/envoygateway/config"
"github.com/envoyproxy/gateway/internal/message"
"github.com/envoyproxy/gateway/internal/probs"
"github.com/envoyproxy/gateway/internal/xds/bootstrap"
"github.com/envoyproxy/gateway/internal/xds/cache"
xdstypes "github.com/envoyproxy/gateway/internal/xds/types"
Expand All @@ -53,9 +53,10 @@ const (

type Config struct {
config.Server
Xds *message.Xds
grpc *grpc.Server
cache cache.SnapshotCacheWithCallbacks
Xds *message.Xds
grpc *grpc.Server
cache cache.SnapshotCacheWithCallbacks
XdsReady probs.HealthProb
}

type Runner struct {
Expand Down Expand Up @@ -113,6 +114,7 @@ func (r *Runner) serveXdsServer(ctx context.Context) {
r.grpc.Stop()
}()

r.XdsReady.SetIndicator(probs.StartedXdsServer)
if err = r.grpc.Serve(l); err != nil {
r.Logger.Error(err, "failed to start grpc based xds server")
}
Expand Down Expand Up @@ -155,6 +157,8 @@ func (r *Runner) subscribeAndTranslate(ctx context.Context) {
r.Logger.Error(err, "failed to generate a snapshot")
errChan <- err
}
// signal xds is ready, as we have a new snapshot.
r.XdsReady.SetIndicator(probs.GeneratedNewXdsSnapshot)
},
)

Expand Down
Loading