Skip to content

Commit

Permalink
lastleg
Browse files Browse the repository at this point in the history
Signed-off-by: Shubham Chauhan <[email protected]>
  • Loading branch information
chauhanshubham committed Dec 3, 2022
1 parent 772bca7 commit 3bb2362
Show file tree
Hide file tree
Showing 7 changed files with 242 additions and 381 deletions.
239 changes: 59 additions & 180 deletions internal/provider/kubernetes/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,12 +50,11 @@ type gatewayAPIReconciler struct {
statusUpdater status.Updater
classController gwapiv1b1.GatewayController

resources *message.ProviderResources
referenceStore *providerReferenceStore
resources *message.ProviderResources
}

// newGatewayAPIController
func newGatewayAPIController(mgr manager.Manager, cfg *config.Server, su status.Updater, resources *message.ProviderResources, referenceStore *providerReferenceStore) error {
func newGatewayAPIController(mgr manager.Manager, cfg *config.Server, su status.Updater, resources *message.ProviderResources) error {
ctx := context.Background()

r := &gatewayAPIReconciler{
Expand All @@ -64,7 +63,6 @@ func newGatewayAPIController(mgr manager.Manager, cfg *config.Server, su status.
classController: gwapiv1b1.GatewayController(cfg.EnvoyGateway.Gateway.ControllerName),
statusUpdater: su,
resources: resources,
referenceStore: referenceStore,
}

c, err := controller.New("gatewayapi", mgr, controller.Options{Reconciler: r})
Expand Down Expand Up @@ -101,7 +99,6 @@ func newGatewayAPIController(mgr manager.Manager, cfg *config.Server, su status.
if err := c.Watch(
&source.Kind{Type: &gwapiv1b1.HTTPRoute{}},
&handler.EnqueueRequestForObject{},
predicate.NewPredicateFuncs(r.validateHTTPRouteForReconcile),
); err != nil {
return err
}
Expand All @@ -113,7 +110,6 @@ func newGatewayAPIController(mgr manager.Manager, cfg *config.Server, su status.
if err := c.Watch(
&source.Kind{Type: &gwapiv1a2.TLSRoute{}},
&handler.EnqueueRequestForObject{},
predicate.NewPredicateFuncs(r.validateTLSRouteForReconcile),
); err != nil {
return err
}
Expand Down Expand Up @@ -162,6 +158,15 @@ func newGatewayAPIController(mgr manager.Manager, cfg *config.Server, su status.
return nil
}

type resourceMappings struct {
// Map for storing namespaces for Route, Service and Gateway objects.
allAssociatedNamespaces map[string]struct{}
// Map for storing service NamespaceNames referred by various Route objects.
allAssociatedBackendRefs map[types.NamespacedName]struct{}
// Map for storing referenceGrant NamespaceNames for BackendRefs, SecretRefs.
allAssociatedRefGrants map[types.NamespacedName]*gwapiv1a2.ReferenceGrant
}

func (r *gatewayAPIReconciler) Reconcile(ctx context.Context, request reconcile.Request) (reconcile.Result, error) {
r.log.WithName(request.Name).Info("reconciling gatewayAPI object", "namespace", request.Namespace, "name", request.Name)

Expand Down Expand Up @@ -241,9 +246,11 @@ func (r *gatewayAPIReconciler) Reconcile(ctx context.Context, request reconcile.
Namespaces: []*corev1.Namespace{},
}

// Add objects' namespaces into the map.
// Make sure to add only those objects' namespaces, that exist.
allAssociatedNamespaces := map[string]struct{}{}
resourceMap := &resourceMappings{
allAssociatedNamespaces: map[string]struct{}{},
allAssociatedBackendRefs: map[types.NamespacedName]struct{}{},
allAssociatedRefGrants: map[types.NamespacedName]*gwapiv1a2.ReferenceGrant{},
}

// Find gateways for the acceptedGC
// Find the Gateways that reference this Class.
Expand All @@ -258,7 +265,7 @@ func (r *gatewayAPIReconciler) Reconcile(ctx context.Context, request reconcile.
for _, gtw := range gatewayList.Items {
gtw := gtw
r.log.Info("processing Gateway", "namespace", gtw.Namespace, "name", gtw.Name)
allAssociatedNamespaces[gtw.Namespace] = struct{}{}
resourceMap.allAssociatedNamespaces[gtw.Namespace] = struct{}{}

for _, listener := range gtw.Spec.Listeners {
listener := listener
Expand Down Expand Up @@ -289,141 +296,66 @@ func (r *gatewayAPIReconciler) Reconcile(ctx context.Context, request reconcile.
continue
}

resourceTree.ReferenceGrants = append(resourceTree.ReferenceGrants, refGrant)
resourceMap.allAssociatedRefGrants[utils.NamespacedName(refGrant)] = refGrant
}

allAssociatedNamespaces[secretNamespace] = struct{}{}
resourceMap.allAssociatedNamespaces[secretNamespace] = struct{}{}
resourceTree.Secrets = append(resourceTree.Secrets, secret)
}
}
}
}

routeServicesList := map[types.NamespacedName]struct{}{}

// Get TLSRoute objects and check if it exists.
tlsRouteList := &gwapiv1a2.TLSRouteList{}
if err := r.client.List(ctx, tlsRouteList, &client.ListOptions{
FieldSelector: fields.OneTermEqualSelector(gatewayTLSRouteIndex, utils.NamespacedName(&gtw).String()),
}); err != nil {
r.log.Error(err, "unable to find associated TLSRoutes")
return reconcile.Result{}, err
}
for _, tlsRoute := range tlsRouteList.Items {
tlsRoute := tlsRoute
r.log.Info("processing TLSRoute", "namespace", tlsRoute.Namespace, "name", tlsRoute.Name)

for _, rule := range tlsRoute.Spec.Rules {
for _, backendRef := range rule.BackendRefs {
backendRef := backendRef
ref := gatewayapi.UpgradeBackendRef(backendRef)
if err := validateBackendRef(&ref); err != nil {
r.log.Error(err, "invalid backendRef")
continue
}

backendNamespace := gatewayapi.NamespaceDerefOrAlpha(backendRef.Namespace, gtw.Namespace)
routeServicesList[types.NamespacedName{
Namespace: backendNamespace,
Name: string(backendRef.Name),
}] = struct{}{}

if backendNamespace != tlsRoute.Namespace {
from := ObjectKindNamespacedName{kind: gatewayapi.KindTLSRoute, namespace: tlsRoute.Namespace, name: tlsRoute.Name}
to := ObjectKindNamespacedName{kind: gatewayapi.KindService, namespace: backendNamespace, name: string(backendRef.Name)}
refGrant, err := r.findReferenceGrant(ctx, from, to)
if err != nil {
r.log.Error(err, "unable to find ReferenceGrant that links the Service to TLSRoute")
continue
}

resourceTree.ReferenceGrants = append(resourceTree.ReferenceGrants, refGrant)
}
}
}

allAssociatedNamespaces[tlsRoute.Namespace] = struct{}{}
resourceTree.TLSRoutes = append(resourceTree.TLSRoutes, &tlsRoute)
}

// Get HTTPRoute objects and check if it exists.
httpRouteList := &gwapiv1b1.HTTPRouteList{}
if err := r.client.List(ctx, httpRouteList, &client.ListOptions{
FieldSelector: fields.OneTermEqualSelector(gatewayHTTPRouteIndex, utils.NamespacedName(&gtw).String()),
}); err != nil {
r.log.Error(err, "unable to find associated HTTPRoutes")
return reconcile.Result{}, err
}
for _, httpRoute := range httpRouteList.Items {
httpRoute := httpRoute
r.log.Info("processing HTTPRoute", "namespace", httpRoute.Namespace, "name", httpRoute.Name)

for _, rule := range httpRoute.Spec.Rules {
for _, backendRef := range rule.BackendRefs {
backendRef := backendRef
if err := validateBackendRef(&backendRef.BackendRef); err != nil {
r.log.Error(err, "invalid backendRef")
continue
}
// Route Processing
// Get TLSRoute objects and check if it exists.
if err := r.processTLSRoutes(ctx, utils.NamespacedName(&gtw).String(), resourceMap, resourceTree); err != nil {
return reconcile.Result{}, err
}

backendNamespace := gatewayapi.NamespaceDerefOr(backendRef.Namespace, httpRoute.Namespace)
routeServicesList[types.NamespacedName{
Namespace: backendNamespace,
Name: string(backendRef.Name),
}] = struct{}{}
// Get HTTPRoute objects and check if it exists.
if err := r.processHTTPRoutes(ctx, utils.NamespacedName(&gtw).String(), resourceMap, resourceTree); err != nil {
return reconcile.Result{}, err
}

if backendNamespace != httpRoute.Namespace {
from := ObjectKindNamespacedName{kind: gatewayapi.KindHTTPRoute, namespace: httpRoute.Namespace, name: httpRoute.Name}
to := ObjectKindNamespacedName{kind: gatewayapi.KindService, namespace: backendNamespace, name: string(backendRef.Name)}
refGrant, err := r.findReferenceGrant(ctx, from, to)
if err != nil {
r.log.Error(err, "unable to find ReferenceGrant that links the Service to HTTPRoute")
continue
}
resourceTree.Gateways = append(resourceTree.Gateways, &gtw)
}

resourceTree.ReferenceGrants = append(resourceTree.ReferenceGrants, refGrant)
}
}
}
for serviceNamespaceName := range resourceMap.allAssociatedBackendRefs {
r.log.Info("processing Service", "namespace", serviceNamespaceName.Namespace,
"name", serviceNamespaceName.Name)

allAssociatedNamespaces[httpRoute.Namespace] = struct{}{}
resourceTree.HTTPRoutes = append(resourceTree.HTTPRoutes, &httpRoute)
service := new(corev1.Service)
err := r.client.Get(ctx, serviceNamespaceName, service)
if err != nil {
r.log.Error(err, "unable to find associated Services")
if kerrors.IsNotFound(err) {
return reconcile.Result{}, nil
}
return reconcile.Result{}, err
}

for serviceNamespaceName := range routeServicesList {
r.log.Info("processing Service", "namespace", serviceNamespaceName.Namespace,
"name", serviceNamespaceName.Name)

service := new(corev1.Service)
err := r.client.Get(ctx, serviceNamespaceName, service)
if err != nil {
if kerrors.IsNotFound(err) {
continue
}
r.log.Error(err, "unable to find associated Services")
return reconcile.Result{}, err
}
resourceMap.allAssociatedNamespaces[service.Namespace] = struct{}{}
resourceTree.Services = append(resourceTree.Services, service)
}

allAssociatedNamespaces[service.Namespace] = struct{}{}
resourceTree.Services = append(resourceTree.Services, service)
}
}
// Add all ReferenceGrants to the resourceTree
for _, referenceGrant := range resourceMap.allAssociatedRefGrants {
resourceTree.ReferenceGrants = append(resourceTree.ReferenceGrants, referenceGrant)
}

// For this particular Gateway, and all associated objects, check whether the
// namespace exists. Add to the resourceTree.
for ns := range allAssociatedNamespaces {
namespace, err := r.getNamespace(ctx, ns)
if err != nil {
if kerrors.IsNotFound(err) {
continue
}
r.log.Error(err, "unable to find the namespace")
return reconcile.Result{}, err
// For this particular Gateway, and all associated objects, check whether the
// namespace exists. Add to the resourceTree.
for ns := range resourceMap.allAssociatedNamespaces {
namespace, err := r.getNamespace(ctx, ns)
if err != nil {
r.log.Error(err, "unable to find the namespace")
if kerrors.IsNotFound(err) {
return reconcile.Result{}, nil
}

resourceTree.Namespaces = append(resourceTree.Namespaces, namespace)
return reconcile.Result{}, err
}

resourceTree.Gateways = append(resourceTree.Gateways, &gtw)
resourceTree.Namespaces = append(resourceTree.Namespaces, namespace)
}

if err := updater(acceptedGC, true); err != nil {
Expand Down Expand Up @@ -469,27 +401,6 @@ func (r *gatewayAPIReconciler) getNamespace(ctx context.Context, name string) (*
return ns, nil
}

func (r gatewayAPIReconciler) findOwningGateway(ctx context.Context, labels map[string]string) *gwapiv1b1.Gateway {
gwName, ok := labels[gatewayapi.OwningGatewayNameLabel]
if !ok {
return nil
}

gwNamespace, ok := labels[gatewayapi.OwningGatewayNamespaceLabel]
if !ok {
return nil
}

gatewayKey := types.NamespacedName{Namespace: gwNamespace, Name: gwName}
gtw := new(gwapiv1b1.Gateway)
if err := r.client.Get(ctx, gatewayKey, gtw); err != nil {
r.log.Error(err, "gateway not found")
return nil
}

return gtw
}

func (r *gatewayAPIReconciler) statusUpdateForGateway(gtw *gwapiv1b1.Gateway, svc *corev1.Service, deploy *appsv1.Deployment) {
// update scheduled condition
status.UpdateGatewayStatusScheduledCondition(gtw, true)
Expand Down Expand Up @@ -737,38 +648,6 @@ func (r *gatewayAPIReconciler) addFinalizer(ctx context.Context, gc *gwapiv1b1.G
return nil
}

// envoyDeploymentForGateway returns the Envoy Deployment, returning nil if the Deployment doesn't exist.
func (r *gatewayAPIReconciler) envoyDeploymentForGateway(ctx context.Context, gateway *gwapiv1b1.Gateway) (*appsv1.Deployment, error) {
key := types.NamespacedName{
Namespace: config.EnvoyGatewayNamespace,
Name: infraDeploymentName(gateway),
}
deployment := new(appsv1.Deployment)
if err := r.client.Get(ctx, key, deployment); err != nil {
if kerrors.IsNotFound(err) {
return nil, nil
}
return nil, err
}
return deployment, nil
}

// envoyServiceForGateway returns the Envoy service, returning nil if the service doesn't exist.
func (r *gatewayAPIReconciler) envoyServiceForGateway(ctx context.Context, gateway *gwapiv1b1.Gateway) (*corev1.Service, error) {
key := types.NamespacedName{
Namespace: config.EnvoyGatewayNamespace,
Name: infraServiceName(gateway),
}
svc := new(corev1.Service)
if err := r.client.Get(ctx, key, svc); err != nil {
if kerrors.IsNotFound(err) {
return nil, nil
}
return nil, err
}
return svc, nil
}

// subscribeAndUpdateStatus subscribes to gateway API object status updates and
// writes it into the Kubernetes API Server.
func (r *gatewayAPIReconciler) subscribeAndUpdateStatus(ctx context.Context) {
Expand Down
6 changes: 6 additions & 0 deletions internal/provider/kubernetes/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,12 @@ const (
gatewayClassFinalizer = gwapiv1b1.GatewayClassFinalizerGatewaysExist
)

type ObjectKindNamespacedName struct {
kind string
namespace string
name string
}

// validateParentRefs validates the provided routeParentReferences, returning the
// referenced Gateways managed by Envoy Gateway. The only supported parentRef
// is a Gateway.
Expand Down
5 changes: 1 addition & 4 deletions internal/provider/kubernetes/kubernetes.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,11 +50,8 @@ func New(cfg *rest.Config, svr *config.Server, resources *message.ProviderResour
return nil, fmt.Errorf("failed to add status update handler %v", err)
}

// Initialize kubernetes provider referenceStore to store additional object mappings.
referenceStore := newProviderReferenceStore()

// Create and register the controllers with the manager.
if err := newGatewayAPIController(mgr, svr, updateHandler.Writer(), resources, referenceStore); err != nil {
if err := newGatewayAPIController(mgr, svr, updateHandler.Writer(), resources); err != nil {
return nil, fmt.Errorf("failted to create gatewayapi controller: %w", err)
}

Expand Down
Loading

0 comments on commit 3bb2362

Please sign in to comment.