Skip to content

Commit

Permalink
feat: allow running EnvoyProxy as DaemonSet (#4429)
Browse files Browse the repository at this point in the history
* Update status when running in daemonset mode

* Fix helm permissions and fully implement daemonset

Signed-off-by: jukie <[email protected]>
  • Loading branch information
jukie authored Oct 15, 2024
1 parent 172a73a commit 8fc4ecb
Show file tree
Hide file tree
Showing 17 changed files with 189 additions and 76 deletions.
1 change: 1 addition & 0 deletions charts/gateway-helm/templates/_rbac.tpl
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ apiGroups:
- apps
resources:
- deployments
- daemonsets
verbs:
- get
- list
Expand Down
45 changes: 28 additions & 17 deletions internal/gatewayapi/status/gateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/utils/ptr"
"sigs.k8s.io/controller-runtime/pkg/client"
gwapiv1 "sigs.k8s.io/gateway-api/apis/v1"
)

Expand All @@ -30,8 +31,8 @@ func UpdateGatewayStatusAcceptedCondition(gw *gwapiv1.Gateway, accepted bool) *g

// UpdateGatewayStatusProgrammedCondition updates the status addresses for the provided gateway
// based on the status IP/Hostname of svc and updates the Programmed condition based on the
// service and deployment state.
func UpdateGatewayStatusProgrammedCondition(gw *gwapiv1.Gateway, svc *corev1.Service, deployment *appsv1.Deployment, nodeAddresses ...string) {
// service and deployment or daemonset state.
func UpdateGatewayStatusProgrammedCondition(gw *gwapiv1.Gateway, svc *corev1.Service, envoyObj client.Object, nodeAddresses ...string) {
var addresses, hostnames []string
// Update the status addresses field.
if svc != nil {
Expand Down Expand Up @@ -98,7 +99,7 @@ func UpdateGatewayStatusProgrammedCondition(gw *gwapiv1.Gateway, svc *corev1.Ser
}

// Update the programmed condition.
updateGatewayProgrammedCondition(gw, deployment)
updateGatewayProgrammedCondition(gw, envoyObj)
}

func SetGatewayListenerStatusCondition(gateway *gwapiv1.Gateway, listenerStatusIdx int,
Expand Down Expand Up @@ -132,13 +133,13 @@ func computeGatewayAcceptedCondition(gw *gwapiv1.Gateway, accepted bool) metav1.
const (
messageAddressNotAssigned = "No addresses have been assigned to the Gateway"
messageFmtTooManyAddresses = "Too many addresses (%d) have been assigned to the Gateway, the maximum number of addresses is 16"
messageNoResources = "Deployment replicas unavailable"
messageFmtProgrammed = "Address assigned to the Gateway, %d/%d envoy Deployment replicas available"
messageNoResources = "Envoy replicas unavailable"
messageFmtProgrammed = "Address assigned to the Gateway, %d/%d envoy replicas available"
)

// updateGatewayProgrammedCondition computes the Gateway Programmed status condition.
// Programmed condition surfaces true when the Envoy Deployment status is ready.
func updateGatewayProgrammedCondition(gw *gwapiv1.Gateway, deployment *appsv1.Deployment) {
// Programmed condition surfaces true when the Envoy Deployment or DaemonSet status is ready.
func updateGatewayProgrammedCondition(gw *gwapiv1.Gateway, envoyObj client.Object) {
if len(gw.Status.Addresses) == 0 {
gw.Status.Conditions = MergeConditions(gw.Status.Conditions,
newCondition(string(gwapiv1.GatewayConditionProgrammed), metav1.ConditionFalse, string(gwapiv1.GatewayReasonAddressNotAssigned),
Expand All @@ -157,17 +158,27 @@ func updateGatewayProgrammedCondition(gw *gwapiv1.Gateway, deployment *appsv1.De
return
}

// If there are no available replicas for the Envoy Deployment, don't
// mark the Gateway as ready yet.

if deployment == nil || deployment.Status.AvailableReplicas == 0 {
gw.Status.Conditions = MergeConditions(gw.Status.Conditions,
newCondition(string(gwapiv1.GatewayConditionProgrammed), metav1.ConditionFalse, string(gwapiv1.GatewayReasonNoResources),
messageNoResources, time.Now(), gw.Generation))
return
// Check for available Envoy replicas and if found mark the gateway as ready.
switch obj := envoyObj.(type) {
case *appsv1.Deployment:
if obj != nil && obj.Status.AvailableReplicas > 0 {
gw.Status.Conditions = MergeConditions(gw.Status.Conditions,
newCondition(string(gwapiv1.GatewayConditionProgrammed), metav1.ConditionTrue, string(gwapiv1.GatewayConditionProgrammed),
fmt.Sprintf(messageFmtProgrammed, obj.Status.AvailableReplicas, obj.Status.Replicas), time.Now(), gw.Generation))
return
}
case *appsv1.DaemonSet:
if obj != nil && obj.Status.NumberAvailable > 0 {
gw.Status.Conditions = MergeConditions(gw.Status.Conditions,
newCondition(string(gwapiv1.GatewayConditionProgrammed), metav1.ConditionTrue, string(gwapiv1.GatewayConditionProgrammed),
fmt.Sprintf(messageFmtProgrammed, obj.Status.NumberAvailable, obj.Status.CurrentNumberScheduled), time.Now(), gw.Generation))
return
}
}

// If there are no available replicas for the Envoy Deployment or
// Envoy DaemonSet, don't mark the Gateway as ready yet.
gw.Status.Conditions = MergeConditions(gw.Status.Conditions,
newCondition(string(gwapiv1.GatewayConditionProgrammed), metav1.ConditionTrue, string(gwapiv1.GatewayConditionProgrammed),
fmt.Sprintf(messageFmtProgrammed, deployment.Status.AvailableReplicas, deployment.Status.Replicas), time.Now(), gw.Generation))
newCondition(string(gwapiv1.GatewayConditionProgrammed), metav1.ConditionFalse, string(gwapiv1.GatewayReasonNoResources),
messageNoResources, time.Now(), gw.Generation))
}
28 changes: 24 additions & 4 deletions internal/provider/kubernetes/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -1386,13 +1386,13 @@ func (r *gatewayAPIReconciler) watchResources(ctx context.Context, mgr manager.M
}

// Watch Deployment CRUDs and process affected Gateways.
dPredicates := []predicate.TypedPredicate[*appsv1.Deployment]{
deploymentPredicates := []predicate.TypedPredicate[*appsv1.Deployment]{
predicate.NewTypedPredicateFuncs[*appsv1.Deployment](func(deploy *appsv1.Deployment) bool {
return r.validateDeploymentForReconcile(deploy)
return r.validateObjectForReconcile(deploy)
}),
}
if r.namespaceLabel != nil {
dPredicates = append(dPredicates, predicate.NewTypedPredicateFuncs[*appsv1.Deployment](func(deploy *appsv1.Deployment) bool {
deploymentPredicates = append(deploymentPredicates, predicate.NewTypedPredicateFuncs[*appsv1.Deployment](func(deploy *appsv1.Deployment) bool {
return r.hasMatchingNamespaceLabels(deploy)
}))
}
Expand All @@ -1401,7 +1401,27 @@ func (r *gatewayAPIReconciler) watchResources(ctx context.Context, mgr manager.M
handler.TypedEnqueueRequestsFromMapFunc(func(ctx context.Context, deploy *appsv1.Deployment) []reconcile.Request {
return r.enqueueClass(ctx, deploy)
}),
dPredicates...)); err != nil {
deploymentPredicates...)); err != nil {
return err
}

// Watch DaemonSet CRUDs and process affected Gateways.
daemonsetPredicates := []predicate.TypedPredicate[*appsv1.DaemonSet]{
predicate.NewTypedPredicateFuncs[*appsv1.DaemonSet](func(daemonset *appsv1.DaemonSet) bool {
return r.validateObjectForReconcile(daemonset)
}),
}
if r.namespaceLabel != nil {
daemonsetPredicates = append(daemonsetPredicates, predicate.NewTypedPredicateFuncs[*appsv1.DaemonSet](func(daemonset *appsv1.DaemonSet) bool {
return r.hasMatchingNamespaceLabels(daemonset)
}))
}
if err := c.Watch(
source.Kind(mgr.GetCache(), &appsv1.DaemonSet{},
handler.TypedEnqueueRequestsFromMapFunc(func(ctx context.Context, daemonset *appsv1.DaemonSet) []reconcile.Request {
return r.enqueueClass(ctx, daemonset)
}),
daemonsetPredicates...)); err != nil {
return err
}

Expand Down
2 changes: 1 addition & 1 deletion internal/provider/kubernetes/kubernetes.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ func New(cfg *rest.Config, svr *ec.Server, resources *message.ProviderResources)
return nil, fmt.Errorf("unable to set up ready check: %w", err)
}

// Emit elected & continue with deployment of infra resources
// Emit elected & continue with envoyObjects of infra resources
go func() {
<-mgr.Elected()
close(svr.Elected)
Expand Down
63 changes: 37 additions & 26 deletions internal/provider/kubernetes/predicates.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
corev1 "k8s.io/api/core/v1"
discoveryv1 "k8s.io/api/discovery/v1"
kerrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/labels"
Expand Down Expand Up @@ -439,21 +440,16 @@ func (r *gatewayAPIReconciler) validateEndpointSliceForReconcile(obj client.Obje
return r.isEnvoyExtensionPolicyReferencingBackend(&nsName)
}

// validateDeploymentForReconcile tries finding the owning Gateway of the Deployment
// validateObjectForReconcile tries finding the owning Gateway of the Deployment or DaemonSet
// if it exists, finds the Gateway's Service, and further updates the Gateway
// status Ready condition. No Deployments are pushed for reconciliation.
func (r *gatewayAPIReconciler) validateDeploymentForReconcile(obj client.Object) bool {
// status Ready condition. No Deployments or DaemonSets are pushed for reconciliation.
func (r *gatewayAPIReconciler) validateObjectForReconcile(obj client.Object) bool {
ctx := context.Background()
deployment, ok := obj.(*appsv1.Deployment)
if !ok {
r.log.Info("unexpected object type, bypassing reconciliation", "object", obj)
return false
}
labels := deployment.GetLabels()
labels := obj.GetLabels()

// Only deployments in the configured namespace should be reconciled.
if deployment.Namespace == r.namespace {
// Check if the deployment belongs to a Gateway, if so, update the Gateway status.
// Only objects in the configured namespace should be reconciled.
if obj.GetNamespace() == r.namespace {
// Check if the obj belongs to a Gateway, if so, update the Gateway status.
gtw := r.findOwningGateway(ctx, labels)
if gtw != nil {
r.updateStatusForGateway(ctx, gtw)
Expand All @@ -471,27 +467,42 @@ func (r *gatewayAPIReconciler) validateDeploymentForReconcile(obj client.Object)
return false
}

// There is no need to reconcile the Deployment any further.
// There is no need to reconcile the object any further.
return false
}

// envoyDeploymentForGateway returns the Envoy Deployment, returning nil if the Deployment doesn't exist.
func (r *gatewayAPIReconciler) envoyDeploymentForGateway(ctx context.Context, gateway *gwapiv1.Gateway) (*appsv1.Deployment, error) {
var deployments appsv1.DeploymentList
labelSelector := labels.SelectorFromSet(labels.Set(gatewayapi.OwnerLabels(gateway, r.mergeGateways.Has(string(gateway.Spec.GatewayClassName)))))
if err := r.client.List(ctx, &deployments, &client.ListOptions{
LabelSelector: labelSelector,
Namespace: r.namespace,
}); err != nil {
if kerrors.IsNotFound(err) {
// envoyObjectForGateway returns the Envoy Deployment or DaemonSet, returning nil if neither exists.
func (r *gatewayAPIReconciler) envoyObjectForGateway(ctx context.Context, gateway *gwapiv1.Gateway) (client.Object, error) {
// Helper func to list and return the first object from results
listResource := func(list client.ObjectList) (client.Object, error) {
if err := r.client.List(ctx, list, &client.ListOptions{
LabelSelector: labels.SelectorFromSet(gatewayapi.OwnerLabels(gateway, r.mergeGateways.Has(string(gateway.Spec.GatewayClassName)))),
Namespace: r.namespace,
}); err != nil {
if !kerrors.IsNotFound(err) {
return nil, err
}
}
items, err := meta.ExtractList(list)
if err != nil || len(items) == 0 {
return nil, nil
}
return nil, err
return items[0].(client.Object), nil
}
if len(deployments.Items) == 0 {
return nil, nil

// Check for Deployment
deployments := &appsv1.DeploymentList{}
if obj, err := listResource(deployments); obj != nil || err != nil {
return obj, err
}

// Check for DaemonSet
daemonsets := &appsv1.DaemonSetList{}
if obj, err := listResource(daemonsets); obj != nil || err != nil {
return obj, err
}
return &deployments.Items[0], nil

return nil, nil
}

// envoyServiceForGateway returns the Envoy service, returning nil if the service doesn't exist.
Expand Down
80 changes: 56 additions & 24 deletions internal/provider/kubernetes/predicates_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -525,7 +525,7 @@ func TestValidateServiceForReconcile(t *testing.T) {
expect bool
}{
{
name: "gateway service but deployment does not exist",
name: "gateway service but deployment or daemonset does not exist",
configs: []client.Object{
test.GetGatewayClass("test-gc", egv1a1.GatewayControllerName, nil),
sampleGateway,
Expand All @@ -547,7 +547,22 @@ func TestValidateServiceForReconcile(t *testing.T) {
gatewayapi.OwningGatewayNameLabel: "scheduled-status-test",
gatewayapi.OwningGatewayNamespaceLabel: "default",
}, nil),
// Note that in case when a deployment exists, the Service is just processed for Gateway status
// Note that in case when a envoyObjects exists, the Service is just processed for Gateway status
// updates and not reconciled further.
expect: false,
},
{
name: "gateway service daemonset also exist",
configs: []client.Object{
test.GetGatewayClass("test-gc", egv1a1.GatewayControllerName, nil),
sampleGateway,
test.GetGatewayDaemonSet(types.NamespacedName{Name: proxy.ExpectedResourceHashedName("default/scheduled-status-test")}, nil),
},
service: test.GetService(types.NamespacedName{Name: "service"}, map[string]string{
gatewayapi.OwningGatewayNameLabel: "scheduled-status-test",
gatewayapi.OwningGatewayNamespaceLabel: "default",
}, nil),
// Note that in case when a envoyObjects exists, the Service is just processed for Gateway status
// updates and not reconciled further.
expect: false,
},
Expand Down Expand Up @@ -859,34 +874,39 @@ func TestValidateServiceForReconcile(t *testing.T) {
}
}

// TestValidateDeploymentForReconcile tests the validateDeploymentForReconcile
// TestValidateObjectForReconcile tests the validateObjectForReconcile
// predicate function.
func TestValidateDeploymentForReconcile(t *testing.T) {
func TestValidateObjectForReconcile(t *testing.T) {
sampleGateway := test.GetGateway(types.NamespacedName{Namespace: "default", Name: "scheduled-status-test"}, "test-gc", 8080)
mergeGatewaysConfig := test.GetEnvoyProxy(types.NamespacedName{Namespace: "default", Name: "merge-gateways-config"}, true)

testCases := []struct {
name string
configs []client.Object
deployment client.Object
expect bool
name string
configs []client.Object
envoyObjects []client.Object
expect bool
}{
{
// No config should lead to a reconciliation of a Deployment object. The main
// purpose of the Deployment watcher is just for update Gateway object statuses.
name: "gateway deployment deployment also exist",
// No config should lead to a reconciliation of a Deployment or DaemonSet object. The main
// purpose of the watcher is just for updating Gateway object statuses.
name: "gateway deployment or daemonset also exist",
configs: []client.Object{
test.GetGatewayClass("test-gc", egv1a1.GatewayControllerName, nil),
sampleGateway,
test.GetService(types.NamespacedName{Name: "deployment"}, map[string]string{
test.GetService(types.NamespacedName{Name: "envoyObjects"}, map[string]string{
gatewayapi.OwningGatewayNameLabel: "scheduled-status-test",
gatewayapi.OwningGatewayNamespaceLabel: "default",
}, nil),
},
deployment: test.GetGatewayDeployment(types.NamespacedName{Name: "deployment"}, map[string]string{
gatewayapi.OwningGatewayNameLabel: "scheduled-status-test",
gatewayapi.OwningGatewayNamespaceLabel: "default",
}),
envoyObjects: []client.Object{
test.GetGatewayDeployment(types.NamespacedName{Name: "deployment"}, map[string]string{
gatewayapi.OwningGatewayNameLabel: "scheduled-status-test",
gatewayapi.OwningGatewayNamespaceLabel: "default",
}), test.GetGatewayDaemonSet(types.NamespacedName{Name: "daemonset"}, map[string]string{
gatewayapi.OwningGatewayNameLabel: "scheduled-status-test",
gatewayapi.OwningGatewayNamespaceLabel: "default",
}),
},
expect: false,
},
{
Expand All @@ -900,9 +920,14 @@ func TestValidateDeploymentForReconcile(t *testing.T) {
}),
mergeGatewaysConfig,
},
deployment: test.GetGatewayDeployment(types.NamespacedName{Name: "deployment"}, map[string]string{
gatewayapi.OwningGatewayClassLabel: "test-mg",
}),
envoyObjects: []client.Object{
test.GetGatewayDeployment(types.NamespacedName{Name: "deployment"}, map[string]string{
gatewayapi.OwningGatewayClassLabel: "test-mg",
}),
test.GetGatewayDaemonSet(types.NamespacedName{Name: "daemonset"}, map[string]string{
gatewayapi.OwningGatewayClassLabel: "test-mg",
}),
},
expect: false,
},
{
Expand All @@ -919,9 +944,14 @@ func TestValidateDeploymentForReconcile(t *testing.T) {
test.GetGateway(types.NamespacedName{Name: "merged-gateway-2", Namespace: "default"}, "test-mg", 8082),
test.GetGateway(types.NamespacedName{Name: "merged-gateway-3", Namespace: "default"}, "test-mg", 8083),
},
deployment: test.GetGatewayDeployment(types.NamespacedName{Name: "deployment"}, map[string]string{
gatewayapi.OwningGatewayClassLabel: "test-mg",
}),
envoyObjects: []client.Object{
test.GetGatewayDeployment(types.NamespacedName{Name: "deployment"}, map[string]string{
gatewayapi.OwningGatewayClassLabel: "test-mg",
}),
test.GetGatewayDaemonSet(types.NamespacedName{Name: "daemonset"}, map[string]string{
gatewayapi.OwningGatewayClassLabel: "test-mg",
}),
},
expect: false,
},
}
Expand All @@ -938,8 +968,10 @@ func TestValidateDeploymentForReconcile(t *testing.T) {
for _, tc := range testCases {
r.client = fakeclient.NewClientBuilder().WithScheme(envoygateway.GetScheme()).WithObjects(tc.configs...).Build()
t.Run(tc.name, func(t *testing.T) {
res := r.validateDeploymentForReconcile(tc.deployment)
require.Equal(t, tc.expect, res)
for _, obj := range tc.envoyObjects {
res := r.validateObjectForReconcile(obj)
require.Equal(t, tc.expect, res)
}
})
}
}
Expand Down
6 changes: 3 additions & 3 deletions internal/provider/kubernetes/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -475,8 +475,8 @@ func (r *gatewayAPIReconciler) updateStatusForGateway(ctx context.Context, gtw *
return
}

// Get deployment
deploy, err := r.envoyDeploymentForGateway(ctx, gtw)
// Get envoyObjects
envoyObj, err := r.envoyObjectForGateway(ctx, gtw)
if err != nil {
r.log.Info("failed to get Deployment for gateway",
"namespace", gtw.Namespace, "name", gtw.Name)
Expand All @@ -491,7 +491,7 @@ func (r *gatewayAPIReconciler) updateStatusForGateway(ctx context.Context, gtw *
// update accepted condition
status.UpdateGatewayStatusAcceptedCondition(gtw, true)
// update address field and programmed condition
status.UpdateGatewayStatusProgrammedCondition(gtw, svc, deploy, r.store.listNodeAddresses()...)
status.UpdateGatewayStatusProgrammedCondition(gtw, svc, envoyObj, r.store.listNodeAddresses()...)

key := utils.NamespacedName(gtw)

Expand Down
Loading

0 comments on commit 8fc4ecb

Please sign in to comment.