Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: allow running EnvoyProxy as DaemonSet #4429

Merged
merged 13 commits into from
Oct 15, 2024
1 change: 1 addition & 0 deletions charts/gateway-helm/templates/_rbac.tpl
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ apiGroups:
- apps
resources:
- deployments
- daemonsets
jukie marked this conversation as resolved.
Show resolved Hide resolved
verbs:
- get
- list
Expand Down
45 changes: 28 additions & 17 deletions internal/gatewayapi/status/gateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/utils/ptr"
"sigs.k8s.io/controller-runtime/pkg/client"
gwapiv1 "sigs.k8s.io/gateway-api/apis/v1"
)

Expand All @@ -30,8 +31,8 @@

// UpdateGatewayStatusProgrammedCondition updates the status addresses for the provided gateway
// based on the status IP/Hostname of svc and updates the Programmed condition based on the
// service and deployment state.
func UpdateGatewayStatusProgrammedCondition(gw *gwapiv1.Gateway, svc *corev1.Service, deployment *appsv1.Deployment, nodeAddresses ...string) {
// service and deployment or daemonset state.
func UpdateGatewayStatusProgrammedCondition(gw *gwapiv1.Gateway, svc *corev1.Service, envoyObj client.Object, nodeAddresses ...string) {
var addresses, hostnames []string
// Update the status addresses field.
if svc != nil {
Expand Down Expand Up @@ -98,7 +99,7 @@
}

// Update the programmed condition.
updateGatewayProgrammedCondition(gw, deployment)
updateGatewayProgrammedCondition(gw, envoyObj)
}

func SetGatewayListenerStatusCondition(gateway *gwapiv1.Gateway, listenerStatusIdx int,
Expand Down Expand Up @@ -132,13 +133,13 @@
const (
messageAddressNotAssigned = "No addresses have been assigned to the Gateway"
messageFmtTooManyAddresses = "Too many addresses (%d) have been assigned to the Gateway, the maximum number of addresses is 16"
messageNoResources = "Deployment replicas unavailable"
messageFmtProgrammed = "Address assigned to the Gateway, %d/%d envoy Deployment replicas available"
messageNoResources = "Envoy replicas unavailable"
messageFmtProgrammed = "Address assigned to the Gateway, %d/%d envoy replicas available"
)

// updateGatewayProgrammedCondition computes the Gateway Programmed status condition.
// Programmed condition surfaces true when the Envoy Deployment status is ready.
func updateGatewayProgrammedCondition(gw *gwapiv1.Gateway, deployment *appsv1.Deployment) {
// Programmed condition surfaces true when the Envoy Deployment or DaemonSet status is ready.
func updateGatewayProgrammedCondition(gw *gwapiv1.Gateway, envoyObj client.Object) {
if len(gw.Status.Addresses) == 0 {
gw.Status.Conditions = MergeConditions(gw.Status.Conditions,
newCondition(string(gwapiv1.GatewayConditionProgrammed), metav1.ConditionFalse, string(gwapiv1.GatewayReasonAddressNotAssigned),
Expand All @@ -157,17 +158,27 @@
return
}

// If there are no available replicas for the Envoy Deployment, don't
// mark the Gateway as ready yet.

if deployment == nil || deployment.Status.AvailableReplicas == 0 {
gw.Status.Conditions = MergeConditions(gw.Status.Conditions,
newCondition(string(gwapiv1.GatewayConditionProgrammed), metav1.ConditionFalse, string(gwapiv1.GatewayReasonNoResources),
messageNoResources, time.Now(), gw.Generation))
return
// Check for available Envoy replicas and if found mark the gateway as ready.
switch obj := envoyObj.(type) {
case *appsv1.Deployment:
if obj != nil && obj.Status.AvailableReplicas > 0 {
gw.Status.Conditions = MergeConditions(gw.Status.Conditions,
newCondition(string(gwapiv1.GatewayConditionProgrammed), metav1.ConditionTrue, string(gwapiv1.GatewayConditionProgrammed),
fmt.Sprintf(messageFmtProgrammed, obj.Status.AvailableReplicas, obj.Status.Replicas), time.Now(), gw.Generation))
return
}
case *appsv1.DaemonSet:
if obj != nil && obj.Status.NumberAvailable > 0 {
gw.Status.Conditions = MergeConditions(gw.Status.Conditions,
newCondition(string(gwapiv1.GatewayConditionProgrammed), metav1.ConditionTrue, string(gwapiv1.GatewayConditionProgrammed),
fmt.Sprintf(messageFmtProgrammed, obj.Status.NumberAvailable, obj.Status.CurrentNumberScheduled), time.Now(), gw.Generation))
return

Check warning on line 175 in internal/gatewayapi/status/gateway.go

View check run for this annotation

Codecov / codecov/patch

internal/gatewayapi/status/gateway.go#L170-L175

Added lines #L170 - L175 were not covered by tests
}
}

// If there are no available replicas for the Envoy Deployment or
// Envoy DaemonSet, don't mark the Gateway as ready yet.
gw.Status.Conditions = MergeConditions(gw.Status.Conditions,
newCondition(string(gwapiv1.GatewayConditionProgrammed), metav1.ConditionTrue, string(gwapiv1.GatewayConditionProgrammed),
fmt.Sprintf(messageFmtProgrammed, deployment.Status.AvailableReplicas, deployment.Status.Replicas), time.Now(), gw.Generation))
newCondition(string(gwapiv1.GatewayConditionProgrammed), metav1.ConditionFalse, string(gwapiv1.GatewayReasonNoResources),
messageNoResources, time.Now(), gw.Generation))
}
28 changes: 24 additions & 4 deletions internal/provider/kubernetes/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -1386,13 +1386,13 @@
}

// Watch Deployment CRUDs and process affected Gateways.
dPredicates := []predicate.TypedPredicate[*appsv1.Deployment]{
deploymentPredicates := []predicate.TypedPredicate[*appsv1.Deployment]{
predicate.NewTypedPredicateFuncs[*appsv1.Deployment](func(deploy *appsv1.Deployment) bool {
return r.validateDeploymentForReconcile(deploy)
return r.validateObjectForReconcile(deploy)
}),
}
if r.namespaceLabel != nil {
dPredicates = append(dPredicates, predicate.NewTypedPredicateFuncs[*appsv1.Deployment](func(deploy *appsv1.Deployment) bool {
deploymentPredicates = append(deploymentPredicates, predicate.NewTypedPredicateFuncs[*appsv1.Deployment](func(deploy *appsv1.Deployment) bool {
return r.hasMatchingNamespaceLabels(deploy)
}))
}
Expand All @@ -1401,7 +1401,27 @@
handler.TypedEnqueueRequestsFromMapFunc(func(ctx context.Context, deploy *appsv1.Deployment) []reconcile.Request {
return r.enqueueClass(ctx, deploy)
}),
dPredicates...)); err != nil {
deploymentPredicates...)); err != nil {
return err

Check warning on line 1405 in internal/provider/kubernetes/controller.go

View check run for this annotation

Codecov / codecov/patch

internal/provider/kubernetes/controller.go#L1404-L1405

Added lines #L1404 - L1405 were not covered by tests
}

// Watch DaemonSet CRUDs and process affected Gateways.
daemonsetPredicates := []predicate.TypedPredicate[*appsv1.DaemonSet]{
predicate.NewTypedPredicateFuncs[*appsv1.DaemonSet](func(daemonset *appsv1.DaemonSet) bool {
return r.validateObjectForReconcile(daemonset)
}),

Check warning on line 1412 in internal/provider/kubernetes/controller.go

View check run for this annotation

Codecov / codecov/patch

internal/provider/kubernetes/controller.go#L1411-L1412

Added lines #L1411 - L1412 were not covered by tests
}
if r.namespaceLabel != nil {
daemonsetPredicates = append(daemonsetPredicates, predicate.NewTypedPredicateFuncs[*appsv1.DaemonSet](func(daemonset *appsv1.DaemonSet) bool {
return r.hasMatchingNamespaceLabels(daemonset)
}))

Check warning on line 1417 in internal/provider/kubernetes/controller.go

View check run for this annotation

Codecov / codecov/patch

internal/provider/kubernetes/controller.go#L1416-L1417

Added lines #L1416 - L1417 were not covered by tests
}
if err := c.Watch(
source.Kind(mgr.GetCache(), &appsv1.DaemonSet{},
handler.TypedEnqueueRequestsFromMapFunc(func(ctx context.Context, daemonset *appsv1.DaemonSet) []reconcile.Request {
return r.enqueueClass(ctx, daemonset)
}),
daemonsetPredicates...)); err != nil {

Check warning on line 1424 in internal/provider/kubernetes/controller.go

View check run for this annotation

Codecov / codecov/patch

internal/provider/kubernetes/controller.go#L1422-L1424

Added lines #L1422 - L1424 were not covered by tests
return err
}

Expand Down
2 changes: 1 addition & 1 deletion internal/provider/kubernetes/kubernetes.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ func New(cfg *rest.Config, svr *ec.Server, resources *message.ProviderResources)
return nil, fmt.Errorf("unable to set up ready check: %w", err)
}

// Emit elected & continue with deployment of infra resources
// Emit elected & continue with envoyObjects of infra resources
go func() {
<-mgr.Elected()
close(svr.Elected)
Expand Down
63 changes: 37 additions & 26 deletions internal/provider/kubernetes/predicates.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
corev1 "k8s.io/api/core/v1"
discoveryv1 "k8s.io/api/discovery/v1"
kerrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/labels"
Expand Down Expand Up @@ -439,21 +440,16 @@
return r.isEnvoyExtensionPolicyReferencingBackend(&nsName)
}

// validateDeploymentForReconcile tries finding the owning Gateway of the Deployment
// validateObjectForReconcile tries finding the owning Gateway of the Deployment or DaemonSet
// if it exists, finds the Gateway's Service, and further updates the Gateway
// status Ready condition. No Deployments are pushed for reconciliation.
func (r *gatewayAPIReconciler) validateDeploymentForReconcile(obj client.Object) bool {
// status Ready condition. No Deployments or DaemonSets are pushed for reconciliation.
func (r *gatewayAPIReconciler) validateObjectForReconcile(obj client.Object) bool {
ctx := context.Background()
deployment, ok := obj.(*appsv1.Deployment)
if !ok {
r.log.Info("unexpected object type, bypassing reconciliation", "object", obj)
return false
}
labels := deployment.GetLabels()
labels := obj.GetLabels()

// Only deployments in the configured namespace should be reconciled.
if deployment.Namespace == r.namespace {
// Check if the deployment belongs to a Gateway, if so, update the Gateway status.
// Only objects in the configured namespace should be reconciled.
if obj.GetNamespace() == r.namespace {
// Check if the obj belongs to a Gateway, if so, update the Gateway status.
gtw := r.findOwningGateway(ctx, labels)
if gtw != nil {
r.updateStatusForGateway(ctx, gtw)
Expand All @@ -471,27 +467,42 @@
return false
}

// There is no need to reconcile the Deployment any further.
// There is no need to reconcile the object any further.
return false
}

// envoyDeploymentForGateway returns the Envoy Deployment, returning nil if the Deployment doesn't exist.
func (r *gatewayAPIReconciler) envoyDeploymentForGateway(ctx context.Context, gateway *gwapiv1.Gateway) (*appsv1.Deployment, error) {
var deployments appsv1.DeploymentList
labelSelector := labels.SelectorFromSet(labels.Set(gatewayapi.OwnerLabels(gateway, r.mergeGateways.Has(string(gateway.Spec.GatewayClassName)))))
if err := r.client.List(ctx, &deployments, &client.ListOptions{
LabelSelector: labelSelector,
Namespace: r.namespace,
}); err != nil {
if kerrors.IsNotFound(err) {
// envoyObjectForGateway returns the Envoy Deployment or DaemonSet, returning nil if neither exists.
func (r *gatewayAPIReconciler) envoyObjectForGateway(ctx context.Context, gateway *gwapiv1.Gateway) (client.Object, error) {
// Helper func to list and return the first object from results
listResource := func(list client.ObjectList) (client.Object, error) {
if err := r.client.List(ctx, list, &client.ListOptions{
LabelSelector: labels.SelectorFromSet(gatewayapi.OwnerLabels(gateway, r.mergeGateways.Has(string(gateway.Spec.GatewayClassName)))),
Namespace: r.namespace,
}); err != nil {
if !kerrors.IsNotFound(err) {
return nil, err

Check warning on line 483 in internal/provider/kubernetes/predicates.go

View check run for this annotation

Codecov / codecov/patch

internal/provider/kubernetes/predicates.go#L482-L483

Added lines #L482 - L483 were not covered by tests
}
}
items, err := meta.ExtractList(list)
if err != nil || len(items) == 0 {
return nil, nil
}
return nil, err
return items[0].(client.Object), nil

Check warning on line 490 in internal/provider/kubernetes/predicates.go

View check run for this annotation

Codecov / codecov/patch

internal/provider/kubernetes/predicates.go#L490

Added line #L490 was not covered by tests
}
if len(deployments.Items) == 0 {
return nil, nil

// Check for Deployment
deployments := &appsv1.DeploymentList{}
if obj, err := listResource(deployments); obj != nil || err != nil {
return obj, err

Check warning on line 496 in internal/provider/kubernetes/predicates.go

View check run for this annotation

Codecov / codecov/patch

internal/provider/kubernetes/predicates.go#L496

Added line #L496 was not covered by tests
}

// Check for DaemonSet
daemonsets := &appsv1.DaemonSetList{}
if obj, err := listResource(daemonsets); obj != nil || err != nil {
return obj, err

Check warning on line 502 in internal/provider/kubernetes/predicates.go

View check run for this annotation

Codecov / codecov/patch

internal/provider/kubernetes/predicates.go#L502

Added line #L502 was not covered by tests
}
return &deployments.Items[0], nil

return nil, nil
}

// envoyServiceForGateway returns the Envoy service, returning nil if the service doesn't exist.
Expand Down
80 changes: 56 additions & 24 deletions internal/provider/kubernetes/predicates_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -525,7 +525,7 @@ func TestValidateServiceForReconcile(t *testing.T) {
expect bool
}{
{
name: "gateway service but deployment does not exist",
name: "gateway service but deployment or daemonset does not exist",
configs: []client.Object{
test.GetGatewayClass("test-gc", egv1a1.GatewayControllerName, nil),
sampleGateway,
Expand All @@ -547,7 +547,22 @@ func TestValidateServiceForReconcile(t *testing.T) {
gatewayapi.OwningGatewayNameLabel: "scheduled-status-test",
gatewayapi.OwningGatewayNamespaceLabel: "default",
}, nil),
// Note that in case when a deployment exists, the Service is just processed for Gateway status
// Note that in case when a envoyObjects exists, the Service is just processed for Gateway status
// updates and not reconciled further.
expect: false,
},
{
name: "gateway service daemonset also exist",
configs: []client.Object{
test.GetGatewayClass("test-gc", egv1a1.GatewayControllerName, nil),
sampleGateway,
test.GetGatewayDaemonSet(types.NamespacedName{Name: proxy.ExpectedResourceHashedName("default/scheduled-status-test")}, nil),
},
service: test.GetService(types.NamespacedName{Name: "service"}, map[string]string{
gatewayapi.OwningGatewayNameLabel: "scheduled-status-test",
gatewayapi.OwningGatewayNamespaceLabel: "default",
}, nil),
// Note that in case when a envoyObjects exists, the Service is just processed for Gateway status
// updates and not reconciled further.
expect: false,
},
Expand Down Expand Up @@ -859,34 +874,39 @@ func TestValidateServiceForReconcile(t *testing.T) {
}
}

// TestValidateDeploymentForReconcile tests the validateDeploymentForReconcile
// TestValidateObjectForReconcile tests the validateObjectForReconcile
// predicate function.
func TestValidateDeploymentForReconcile(t *testing.T) {
func TestValidateObjectForReconcile(t *testing.T) {
sampleGateway := test.GetGateway(types.NamespacedName{Namespace: "default", Name: "scheduled-status-test"}, "test-gc", 8080)
mergeGatewaysConfig := test.GetEnvoyProxy(types.NamespacedName{Namespace: "default", Name: "merge-gateways-config"}, true)

testCases := []struct {
name string
configs []client.Object
deployment client.Object
expect bool
name string
configs []client.Object
envoyObjects []client.Object
expect bool
}{
{
// No config should lead to a reconciliation of a Deployment object. The main
// purpose of the Deployment watcher is just for update Gateway object statuses.
name: "gateway deployment deployment also exist",
// No config should lead to a reconciliation of a Deployment or DaemonSet object. The main
// purpose of the watcher is just for updating Gateway object statuses.
name: "gateway deployment or daemonset also exist",
configs: []client.Object{
test.GetGatewayClass("test-gc", egv1a1.GatewayControllerName, nil),
sampleGateway,
test.GetService(types.NamespacedName{Name: "deployment"}, map[string]string{
test.GetService(types.NamespacedName{Name: "envoyObjects"}, map[string]string{
gatewayapi.OwningGatewayNameLabel: "scheduled-status-test",
gatewayapi.OwningGatewayNamespaceLabel: "default",
}, nil),
},
deployment: test.GetGatewayDeployment(types.NamespacedName{Name: "deployment"}, map[string]string{
gatewayapi.OwningGatewayNameLabel: "scheduled-status-test",
gatewayapi.OwningGatewayNamespaceLabel: "default",
}),
envoyObjects: []client.Object{
test.GetGatewayDeployment(types.NamespacedName{Name: "deployment"}, map[string]string{
gatewayapi.OwningGatewayNameLabel: "scheduled-status-test",
gatewayapi.OwningGatewayNamespaceLabel: "default",
}), test.GetGatewayDaemonSet(types.NamespacedName{Name: "daemonset"}, map[string]string{
gatewayapi.OwningGatewayNameLabel: "scheduled-status-test",
gatewayapi.OwningGatewayNamespaceLabel: "default",
}),
},
expect: false,
},
{
Expand All @@ -900,9 +920,14 @@ func TestValidateDeploymentForReconcile(t *testing.T) {
}),
mergeGatewaysConfig,
},
deployment: test.GetGatewayDeployment(types.NamespacedName{Name: "deployment"}, map[string]string{
gatewayapi.OwningGatewayClassLabel: "test-mg",
}),
envoyObjects: []client.Object{
test.GetGatewayDeployment(types.NamespacedName{Name: "deployment"}, map[string]string{
gatewayapi.OwningGatewayClassLabel: "test-mg",
}),
test.GetGatewayDaemonSet(types.NamespacedName{Name: "daemonset"}, map[string]string{
gatewayapi.OwningGatewayClassLabel: "test-mg",
}),
},
expect: false,
},
{
Expand All @@ -919,9 +944,14 @@ func TestValidateDeploymentForReconcile(t *testing.T) {
test.GetGateway(types.NamespacedName{Name: "merged-gateway-2", Namespace: "default"}, "test-mg", 8082),
test.GetGateway(types.NamespacedName{Name: "merged-gateway-3", Namespace: "default"}, "test-mg", 8083),
},
deployment: test.GetGatewayDeployment(types.NamespacedName{Name: "deployment"}, map[string]string{
gatewayapi.OwningGatewayClassLabel: "test-mg",
}),
envoyObjects: []client.Object{
test.GetGatewayDeployment(types.NamespacedName{Name: "deployment"}, map[string]string{
gatewayapi.OwningGatewayClassLabel: "test-mg",
}),
test.GetGatewayDaemonSet(types.NamespacedName{Name: "daemonset"}, map[string]string{
gatewayapi.OwningGatewayClassLabel: "test-mg",
}),
},
expect: false,
},
}
Expand All @@ -938,8 +968,10 @@ func TestValidateDeploymentForReconcile(t *testing.T) {
for _, tc := range testCases {
r.client = fakeclient.NewClientBuilder().WithScheme(envoygateway.GetScheme()).WithObjects(tc.configs...).Build()
t.Run(tc.name, func(t *testing.T) {
res := r.validateDeploymentForReconcile(tc.deployment)
require.Equal(t, tc.expect, res)
for _, obj := range tc.envoyObjects {
res := r.validateObjectForReconcile(obj)
require.Equal(t, tc.expect, res)
}
})
}
}
Expand Down
6 changes: 3 additions & 3 deletions internal/provider/kubernetes/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -475,8 +475,8 @@ func (r *gatewayAPIReconciler) updateStatusForGateway(ctx context.Context, gtw *
return
}

// Get deployment
deploy, err := r.envoyDeploymentForGateway(ctx, gtw)
// Get envoyObjects
envoyObj, err := r.envoyObjectForGateway(ctx, gtw)
if err != nil {
r.log.Info("failed to get Deployment for gateway",
"namespace", gtw.Namespace, "name", gtw.Name)
Expand All @@ -491,7 +491,7 @@ func (r *gatewayAPIReconciler) updateStatusForGateway(ctx context.Context, gtw *
// update accepted condition
status.UpdateGatewayStatusAcceptedCondition(gtw, true)
// update address field and programmed condition
status.UpdateGatewayStatusProgrammedCondition(gtw, svc, deploy, r.store.listNodeAddresses()...)
status.UpdateGatewayStatusProgrammedCondition(gtw, svc, envoyObj, r.store.listNodeAddresses()...)

key := utils.NamespacedName(gtw)

Expand Down
Loading