Skip to content

Commit

Permalink
fix(kuma-cp): remove Dataplane for Pod without IP (#4964)
Browse files Browse the repository at this point in the history
Signed-off-by: Jakub Dyszkiewicz <[email protected]>
(cherry picked from commit f341cff)
  • Loading branch information
jakubdyszkiewicz authored and mergify[bot] committed Oct 5, 2022
1 parent 6adb5d7 commit e700ced
Show file tree
Hide file tree
Showing 6 changed files with 209 additions and 80 deletions.
4 changes: 3 additions & 1 deletion pkg/plugins/runtime/k8s/controllers/gateway_converter.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,17 +62,19 @@ func (r *PodReconciler) createorUpdateBuiltinGatewayDataplane(ctx context.Contex
return nil
})

log := r.Log.WithValues("pod", kube_types.NamespacedName{Namespace: pod.Namespace, Name: pod.Name})
if err != nil {
log := r.Log.WithValues("pod", kube_types.NamespacedName{Namespace: pod.Namespace, Name: pod.Name})
log.Error(err, "unable to create/update Dataplane", "operationResult", operationResult)
r.EventRecorder.Eventf(pod, kube_core.EventTypeWarning, FailedToGenerateKumaDataplaneReason, "Failed to generate Kuma Dataplane: %s", err.Error())
return err
}

switch operationResult {
case kube_controllerutil.OperationResultCreated:
log.Info("Dataplane created")
r.EventRecorder.Eventf(pod, kube_core.EventTypeNormal, CreatedKumaDataplaneReason, "Created Kuma Dataplane: %s", dataplane.Name)
case kube_controllerutil.OperationResultUpdated:
log.Info("Dataplane updated")
r.EventRecorder.Eventf(pod, kube_core.EventTypeNormal, UpdatedKumaDataplaneReason, "Updated Kuma Dataplane: %s", dataplane.Name)
}
return nil
Expand Down
185 changes: 118 additions & 67 deletions pkg/plugins/runtime/k8s/controllers/pod_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/kumahq/kuma/pkg/dns/vips"
k8s_common "github.com/kumahq/kuma/pkg/plugins/common/k8s"
mesh_k8s "github.com/kumahq/kuma/pkg/plugins/resources/k8s/native/api/v1alpha1"
k8s_model "github.com/kumahq/kuma/pkg/plugins/resources/k8s/native/pkg/model"
"github.com/kumahq/kuma/pkg/plugins/runtime/k8s/metadata"
util_k8s "github.com/kumahq/kuma/pkg/plugins/runtime/k8s/util"
)
Expand Down Expand Up @@ -53,45 +54,26 @@ type PodReconciler struct {

func (r *PodReconciler) Reconcile(ctx context.Context, req kube_ctrl.Request) (kube_ctrl.Result, error) {
log := r.Log.WithValues("pod", req.NamespacedName)
log.V(1).Info("reconcile")

// Fetch the Pod instance
pod := &kube_core.Pod{}
if err := r.Get(ctx, req.NamespacedName, pod); err != nil {
if kube_apierrs.IsNotFound(err) {
log.V(1).Info("pod not found. Skipping")
return kube_ctrl.Result{}, nil
}
log.Error(err, "unable to fetch Pod")
return kube_ctrl.Result{}, err
}

// skip a Pod if it doesn't have an IP address yet
if pod.Status.PodIP == "" {
return kube_ctrl.Result{}, nil
}

// skip a Pod if is complete/terminated (most probably a completed job)
if r.isPodComplete(pod) {
return kube_ctrl.Result{}, nil
}

// for Pods marked with ingress annotation special type of Dataplane will be injected
enabled, exist, err := metadata.Annotations(pod.Annotations).GetEnabled(metadata.KumaIngressAnnotation)
if err != nil {
return kube_ctrl.Result{}, err
}
if exist && enabled {
if pod.Namespace != r.SystemNamespace {
return kube_ctrl.Result{}, errors.Errorf("Ingress can only be deployed in system namespace %q", r.SystemNamespace)
}
services, err := r.findMatchingServices(ctx, pod)
if err != nil {
return kube_ctrl.Result{}, err
}
err = r.createOrUpdateIngress(ctx, pod, services)
if err != nil {
return kube_ctrl.Result{}, err
}
return kube_ctrl.Result{}, nil
return kube_ctrl.Result{}, r.reconcileZoneIngress(ctx, pod, log)
}

// for Pods marked with egress annotation special type of Dataplane will be injected
Expand All @@ -100,58 +82,135 @@ func (r *PodReconciler) Reconcile(ctx context.Context, req kube_ctrl.Request) (k
return kube_ctrl.Result{}, err
}
if egressExist && egressEnabled {
if pod.Namespace != r.SystemNamespace {
return kube_ctrl.Result{}, errors.Errorf("Egress can only be deployed in system namespace %q", r.SystemNamespace)
}
services, err := r.findMatchingServices(ctx, pod)
if err != nil {
return kube_ctrl.Result{}, err
}
err = r.createOrUpdateEgress(ctx, pod, services)
if err != nil {
return kube_ctrl.Result{}, err
}

return kube_ctrl.Result{}, nil
}

ns := kube_core.Namespace{}
if err := r.Client.Get(ctx, kube_types.NamespacedName{Name: pod.Namespace}, &ns); err != nil {
return kube_ctrl.Result{}, errors.Wrap(err, "unable to get Namespace for Pod")
return kube_ctrl.Result{}, r.reconcileZoneEgress(ctx, pod, log)
}

// If we are using a builtin gateway, we want to generate a builtin gateway
// dataplane.
if name, _ := metadata.Annotations(pod.Annotations).GetString(metadata.KumaGatewayAnnotation); name == metadata.AnnotationBuiltin {
return kube_ctrl.Result{}, r.createorUpdateBuiltinGatewayDataplane(ctx, pod, &ns)
return kube_ctrl.Result{}, r.reconcileBuiltinGatewayDataplane(ctx, pod, log)
}

// only Pods with injected Kuma need a Dataplane descriptor
injected, exist, err := metadata.Annotations(pod.Annotations).GetEnabled(metadata.KumaSidecarInjectedAnnotation)
injected, _, err := metadata.Annotations(pod.Annotations).GetEnabled(metadata.KumaSidecarInjectedAnnotation)
if err != nil {
return kube_ctrl.Result{}, err
}
if !exist || !injected {
return kube_ctrl.Result{}, nil
if injected {
return kube_ctrl.Result{}, r.reconcileDataplane(ctx, pod, log)
}

return kube_ctrl.Result{}, nil
}

func (r *PodReconciler) reconcileDataplane(ctx context.Context, pod *kube_core.Pod, log logr.Logger) error {
dp := &mesh_k8s.Dataplane{
ObjectMeta: kube_meta.ObjectMeta{Name: pod.Name, Namespace: pod.Namespace},
}
if pod.Status.Phase == kube_core.PodSucceeded {
// Remove Dataplane object for Pods that are indefinitely in Succeeded phase, i.e. Jobs
return r.deleteObjectIfExist(ctx, dp, "pod succeeded", log)
}
if pod.Status.PodIP == "" {
return r.deleteObjectIfExist(ctx, dp, "pod IP is empty", log)
}

ns := kube_core.Namespace{}
if err := r.Client.Get(ctx, kube_types.NamespacedName{Name: pod.Namespace}, &ns); err != nil {
return errors.Wrap(err, "unable to get Namespace for Pod")
}

services, err := r.findMatchingServices(ctx, pod)
if err != nil {
return kube_ctrl.Result{}, err
return err
}

others, err := r.findOtherDataplanes(ctx, pod, &ns)
if err != nil {
return kube_ctrl.Result{}, err
return err
}

r.Log.WithValues("req", req).V(1).Info("other dataplanes", "others", others)

if err := r.createOrUpdateDataplane(ctx, pod, &ns, services, others); err != nil {
return kube_ctrl.Result{}, err
return err
}
return nil
}

return kube_ctrl.Result{}, nil
func (r *PodReconciler) deleteObjectIfExist(ctx context.Context, object k8s_model.KubernetesObject, cause string, log logr.Logger) error {
log = log.WithValues(
"cause", cause,
"kind", object.GetObjectKind(),
"name", object.GetName(),
"namespace", object.GetNamespace(),
)
if err := r.Client.Delete(ctx, object); err != nil {
if kube_apierrs.IsNotFound(err) {
log.V(1).Info("Object is not found, nothing to delete")
return nil
}
return errors.Wrapf(err, "could not delete %v %s/%s", object.GetObjectKind(), object.GetName(), object.GetNamespace())
}
log.Info("Object deleted")
return nil
}

func (r *PodReconciler) reconcileBuiltinGatewayDataplane(ctx context.Context, pod *kube_core.Pod, log logr.Logger) error {
if pod.Status.PodIP == "" {
dp := &mesh_k8s.Dataplane{
ObjectMeta: kube_meta.ObjectMeta{Name: pod.Name, Namespace: pod.Namespace},
}
return r.deleteObjectIfExist(ctx, dp, "pod IP is empty", log)
}

ns := kube_core.Namespace{}
if err := r.Client.Get(ctx, kube_types.NamespacedName{Name: pod.Namespace}, &ns); err != nil {
return errors.Wrap(err, "unable to get Namespace for Pod")
}
return r.createorUpdateBuiltinGatewayDataplane(ctx, pod, &ns)
}

func (r *PodReconciler) reconcileZoneIngress(ctx context.Context, pod *kube_core.Pod, log logr.Logger) error {
if pod.Status.PodIP == "" {
zi := &mesh_k8s.ZoneIngress{
ObjectMeta: kube_meta.ObjectMeta{Name: pod.Name},
}
return r.deleteObjectIfExist(ctx, zi, "pod IP is empty", log)
}

if pod.Namespace != r.SystemNamespace {
return errors.Errorf("Ingress can only be deployed in system namespace %q", r.SystemNamespace)
}
services, err := r.findMatchingServices(ctx, pod)
if err != nil {
return err
}
err = r.createOrUpdateIngress(ctx, pod, services)
if err != nil {
return err
}
return nil
}

func (r *PodReconciler) reconcileZoneEgress(ctx context.Context, pod *kube_core.Pod, log logr.Logger) error {
if pod.Status.PodIP == "" {
zi := &mesh_k8s.ZoneEgress{
ObjectMeta: kube_meta.ObjectMeta{Name: pod.Name},
}
return r.deleteObjectIfExist(ctx, zi, "pod IP is empty", log)
}

if pod.Namespace != r.SystemNamespace {
return errors.Errorf("Egress can only be deployed in system namespace %q", r.SystemNamespace)
}
services, err := r.findMatchingServices(ctx, pod)
if err != nil {
return err
}
err = r.createOrUpdateEgress(ctx, pod, services)
if err != nil {
return err
}
return nil
}

func (r *PodReconciler) findMatchingServices(ctx context.Context, pod *kube_core.Pod) ([]*kube_core.Service, error) {
Expand Down Expand Up @@ -218,16 +277,18 @@ func (r *PodReconciler) createOrUpdateDataplane(
}
return nil
})
log := r.Log.WithValues("pod", kube_types.NamespacedName{Namespace: pod.Namespace, Name: pod.Name})
if err != nil {
log := r.Log.WithValues("pod", kube_types.NamespacedName{Namespace: pod.Namespace, Name: pod.Name})
log.Error(err, "unable to create/update Dataplane", "operationResult", operationResult)
r.EventRecorder.Eventf(pod, kube_core.EventTypeWarning, FailedToGenerateKumaDataplaneReason, "Failed to generate Kuma Dataplane: %s", err.Error())
return err
}
switch operationResult {
case kube_controllerutil.OperationResultCreated:
log.Info("Dataplane created")
r.EventRecorder.Eventf(pod, kube_core.EventTypeNormal, CreatedKumaDataplaneReason, "Created Kuma Dataplane: %s", pod.Name)
case kube_controllerutil.OperationResultUpdated:
log.Info("Dataplane updated")
r.EventRecorder.Eventf(pod, kube_core.EventTypeNormal, UpdatedKumaDataplaneReason, "Updated Kuma Dataplane: %s", pod.Name)
}
return nil
Expand All @@ -250,16 +311,18 @@ func (r *PodReconciler) createOrUpdateIngress(ctx context.Context, pod *kube_cor
}
return nil
})
log := r.Log.WithValues("pod", kube_types.NamespacedName{Namespace: pod.Namespace, Name: pod.Name})
if err != nil {
log := r.Log.WithValues("pod", kube_types.NamespacedName{Namespace: pod.Namespace, Name: pod.Name})
log.Error(err, "unable to create/update Ingress", "operationResult", operationResult)
r.EventRecorder.Eventf(pod, kube_core.EventTypeWarning, FailedToGenerateKumaDataplaneReason, "Failed to generate Kuma Ingress: %s", err.Error())
return err
}
switch operationResult {
case kube_controllerutil.OperationResultCreated:
log.Info("ZoneIngress created")
r.EventRecorder.Eventf(pod, kube_core.EventTypeNormal, CreatedKumaDataplaneReason, "Created Kuma Ingress: %s", pod.Name)
case kube_controllerutil.OperationResultUpdated:
log.Info("ZoneIngress updated")
r.EventRecorder.Eventf(pod, kube_core.EventTypeNormal, UpdatedKumaDataplaneReason, "Updated Kuma Ingress: %s", pod.Name)
}
return nil
Expand All @@ -282,16 +345,18 @@ func (r *PodReconciler) createOrUpdateEgress(ctx context.Context, pod *kube_core
}
return nil
})
log := r.Log.WithValues("pod", kube_types.NamespacedName{Namespace: pod.Namespace, Name: pod.Name})
if err != nil {
log := r.Log.WithValues("pod", kube_types.NamespacedName{Namespace: pod.Namespace, Name: pod.Name})
log.Error(err, "unable to create/update Egress", "operationResult", operationResult)
r.EventRecorder.Eventf(pod, kube_core.EventTypeWarning, FailedToGenerateKumaDataplaneReason, "Failed to generate Kuma Egress: %s", err.Error())
return err
}
switch operationResult {
case kube_controllerutil.OperationResultCreated:
log.Info("ZoneEgress created")
r.EventRecorder.Eventf(pod, kube_core.EventTypeNormal, CreatedKumaDataplaneReason, "Created Kuma Egress: %s", pod.Name)
case kube_controllerutil.OperationResultUpdated:
log.Info("ZoneEgress updated")
r.EventRecorder.Eventf(pod, kube_core.EventTypeNormal, UpdatedKumaDataplaneReason, "Updated Kuma Egress: %s", pod.Name)
}
return nil
Expand All @@ -308,20 +373,6 @@ func (r *PodReconciler) SetupWithManager(mgr kube_ctrl.Manager) error {
Complete(r)
}

func (r *PodReconciler) isPodComplete(pod *kube_core.Pod) bool {
for _, cs := range pod.Status.ContainerStatuses {
// the sidecar amy or may not be terminated yet
if cs.Name == util_k8s.KumaSidecarContainerName {
continue
}
if cs.State.Terminated == nil {
// at least one container not terminated, therefore pod is still active
return false
}
}
return true
}

func ServiceToPodsMapper(l logr.Logger, client kube_client.Client) kube_handler.MapFunc {
l = l.WithName("service-to-pods-mapper")
return func(obj kube_client.Object) []kube_reconile.Request {
Expand Down
15 changes: 3 additions & 12 deletions pkg/plugins/runtime/k8s/controllers/pod_status_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (

"github.com/go-logr/logr"
"github.com/pkg/errors"
"go.uber.org/multierr"
kube_core "k8s.io/api/core/v1"
kube_apierrs "k8s.io/apimachinery/pkg/api/errors"
kube_runtime "k8s.io/apimachinery/pkg/runtime"
Expand Down Expand Up @@ -64,18 +63,10 @@ func (r *PodStatusReconciler) Reconcile(ctx context.Context, req kube_ctrl.Reque
return kube_ctrl.Result{}, err
}

var errs error
err := r.EnvoyAdminClient.PostQuit(ctx, dp)
if err != nil {
errs = multierr.Append(errs, errors.Wrapf(err, "envoy admin client failed. Most probably the pod is already going down."))
if err := r.EnvoyAdminClient.PostQuit(ctx, dp); err != nil {
return kube_ctrl.Result{}, errors.Wrap(err, "envoy admin client failed. Most probably the pod is already going down.")
}

err = r.Client.Delete(ctx, dataplane)
if err != nil {
errs = multierr.Append(errs, errors.Wrapf(err, "unable to delete the job's dataplane"))
}

return kube_ctrl.Result{}, errs
return kube_ctrl.Result{}, nil
}

func (r *PodStatusReconciler) SetupWithManager(mgr kube_ctrl.Manager) error {
Expand Down
Loading

0 comments on commit e700ced

Please sign in to comment.