Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(kuma-cp): remove Dataplane for Pod without IP (backport #4964) #4980

Merged
merged 1 commit into from
Sep 8, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion pkg/plugins/runtime/k8s/controllers/gateway_converter.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,17 +62,19 @@ func (r *PodReconciler) createorUpdateBuiltinGatewayDataplane(ctx context.Contex
return nil
})

log := r.Log.WithValues("pod", kube_types.NamespacedName{Namespace: pod.Namespace, Name: pod.Name})
if err != nil {
log := r.Log.WithValues("pod", kube_types.NamespacedName{Namespace: pod.Namespace, Name: pod.Name})
log.Error(err, "unable to create/update Dataplane", "operationResult", operationResult)
r.EventRecorder.Eventf(pod, kube_core.EventTypeWarning, FailedToGenerateKumaDataplaneReason, "Failed to generate Kuma Dataplane: %s", err.Error())
return err
}

switch operationResult {
case kube_controllerutil.OperationResultCreated:
log.Info("Dataplane created")
r.EventRecorder.Eventf(pod, kube_core.EventTypeNormal, CreatedKumaDataplaneReason, "Created Kuma Dataplane: %s", dataplane.Name)
case kube_controllerutil.OperationResultUpdated:
log.Info("Dataplane updated")
r.EventRecorder.Eventf(pod, kube_core.EventTypeNormal, UpdatedKumaDataplaneReason, "Updated Kuma Dataplane: %s", dataplane.Name)
}
return nil
Expand Down
185 changes: 118 additions & 67 deletions pkg/plugins/runtime/k8s/controllers/pod_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/kumahq/kuma/pkg/dns/vips"
k8s_common "github.com/kumahq/kuma/pkg/plugins/common/k8s"
mesh_k8s "github.com/kumahq/kuma/pkg/plugins/resources/k8s/native/api/v1alpha1"
k8s_model "github.com/kumahq/kuma/pkg/plugins/resources/k8s/native/pkg/model"
"github.com/kumahq/kuma/pkg/plugins/runtime/k8s/metadata"
util_k8s "github.com/kumahq/kuma/pkg/plugins/runtime/k8s/util"
)
Expand Down Expand Up @@ -53,45 +54,26 @@ type PodReconciler struct {

func (r *PodReconciler) Reconcile(ctx context.Context, req kube_ctrl.Request) (kube_ctrl.Result, error) {
log := r.Log.WithValues("pod", req.NamespacedName)
log.V(1).Info("reconcile")

// Fetch the Pod instance
pod := &kube_core.Pod{}
if err := r.Get(ctx, req.NamespacedName, pod); err != nil {
if kube_apierrs.IsNotFound(err) {
log.V(1).Info("pod not found. Skipping")
return kube_ctrl.Result{}, nil
}
log.Error(err, "unable to fetch Pod")
return kube_ctrl.Result{}, err
}

// skip a Pod if it doesn't have an IP address yet
if pod.Status.PodIP == "" {
return kube_ctrl.Result{}, nil
}

// skip a Pod if is complete/terminated (most probably a completed job)
if r.isPodComplete(pod) {
return kube_ctrl.Result{}, nil
}

// for Pods marked with ingress annotation special type of Dataplane will be injected
enabled, exist, err := metadata.Annotations(pod.Annotations).GetEnabled(metadata.KumaIngressAnnotation)
if err != nil {
return kube_ctrl.Result{}, err
}
if exist && enabled {
if pod.Namespace != r.SystemNamespace {
return kube_ctrl.Result{}, errors.Errorf("Ingress can only be deployed in system namespace %q", r.SystemNamespace)
}
services, err := r.findMatchingServices(ctx, pod)
if err != nil {
return kube_ctrl.Result{}, err
}
err = r.createOrUpdateIngress(ctx, pod, services)
if err != nil {
return kube_ctrl.Result{}, err
}
return kube_ctrl.Result{}, nil
return kube_ctrl.Result{}, r.reconcileZoneIngress(ctx, pod, log)
}

// for Pods marked with egress annotation special type of Dataplane will be injected
Expand All @@ -100,58 +82,135 @@ func (r *PodReconciler) Reconcile(ctx context.Context, req kube_ctrl.Request) (k
return kube_ctrl.Result{}, err
}
if egressExist && egressEnabled {
if pod.Namespace != r.SystemNamespace {
return kube_ctrl.Result{}, errors.Errorf("Egress can only be deployed in system namespace %q", r.SystemNamespace)
}
services, err := r.findMatchingServices(ctx, pod)
if err != nil {
return kube_ctrl.Result{}, err
}
err = r.createOrUpdateEgress(ctx, pod, services)
if err != nil {
return kube_ctrl.Result{}, err
}

return kube_ctrl.Result{}, nil
}

ns := kube_core.Namespace{}
if err := r.Client.Get(ctx, kube_types.NamespacedName{Name: pod.Namespace}, &ns); err != nil {
return kube_ctrl.Result{}, errors.Wrap(err, "unable to get Namespace for Pod")
return kube_ctrl.Result{}, r.reconcileZoneEgress(ctx, pod, log)
}

// If we are using a builtin gateway, we want to generate a builtin gateway
// dataplane.
if name, _ := metadata.Annotations(pod.Annotations).GetString(metadata.KumaGatewayAnnotation); name == metadata.AnnotationBuiltin {
return kube_ctrl.Result{}, r.createorUpdateBuiltinGatewayDataplane(ctx, pod, &ns)
return kube_ctrl.Result{}, r.reconcileBuiltinGatewayDataplane(ctx, pod, log)
}

// only Pods with injected Kuma need a Dataplane descriptor
injected, exist, err := metadata.Annotations(pod.Annotations).GetEnabled(metadata.KumaSidecarInjectedAnnotation)
injected, _, err := metadata.Annotations(pod.Annotations).GetEnabled(metadata.KumaSidecarInjectedAnnotation)
if err != nil {
return kube_ctrl.Result{}, err
}
if !exist || !injected {
return kube_ctrl.Result{}, nil
if injected {
return kube_ctrl.Result{}, r.reconcileDataplane(ctx, pod, log)
}

return kube_ctrl.Result{}, nil
}

func (r *PodReconciler) reconcileDataplane(ctx context.Context, pod *kube_core.Pod, log logr.Logger) error {
dp := &mesh_k8s.Dataplane{
ObjectMeta: kube_meta.ObjectMeta{Name: pod.Name, Namespace: pod.Namespace},
}
if pod.Status.Phase == kube_core.PodSucceeded {
// Remove Dataplane object for Pods that are indefinitely in Succeeded phase, i.e. Jobs
return r.deleteObjectIfExist(ctx, dp, "pod succeeded", log)
}
if pod.Status.PodIP == "" {
return r.deleteObjectIfExist(ctx, dp, "pod IP is empty", log)
}

ns := kube_core.Namespace{}
if err := r.Client.Get(ctx, kube_types.NamespacedName{Name: pod.Namespace}, &ns); err != nil {
return errors.Wrap(err, "unable to get Namespace for Pod")
}

services, err := r.findMatchingServices(ctx, pod)
if err != nil {
return kube_ctrl.Result{}, err
return err
}

others, err := r.findOtherDataplanes(ctx, pod, &ns)
if err != nil {
return kube_ctrl.Result{}, err
return err
}

r.Log.WithValues("req", req).V(1).Info("other dataplanes", "others", others)

if err := r.createOrUpdateDataplane(ctx, pod, &ns, services, others); err != nil {
return kube_ctrl.Result{}, err
return err
}
return nil
}

return kube_ctrl.Result{}, nil
func (r *PodReconciler) deleteObjectIfExist(ctx context.Context, object k8s_model.KubernetesObject, cause string, log logr.Logger) error {
log = log.WithValues(
"cause", cause,
"kind", object.GetObjectKind(),
"name", object.GetName(),
"namespace", object.GetNamespace(),
)
if err := r.Client.Delete(ctx, object); err != nil {
if kube_apierrs.IsNotFound(err) {
log.V(1).Info("Object is not found, nothing to delete")
return nil
}
return errors.Wrapf(err, "could not delete %v %s/%s", object.GetObjectKind(), object.GetName(), object.GetNamespace())
}
log.Info("Object deleted")
return nil
}

func (r *PodReconciler) reconcileBuiltinGatewayDataplane(ctx context.Context, pod *kube_core.Pod, log logr.Logger) error {
if pod.Status.PodIP == "" {
dp := &mesh_k8s.Dataplane{
ObjectMeta: kube_meta.ObjectMeta{Name: pod.Name, Namespace: pod.Namespace},
}
return r.deleteObjectIfExist(ctx, dp, "pod IP is empty", log)
}

ns := kube_core.Namespace{}
if err := r.Client.Get(ctx, kube_types.NamespacedName{Name: pod.Namespace}, &ns); err != nil {
return errors.Wrap(err, "unable to get Namespace for Pod")
}
return r.createorUpdateBuiltinGatewayDataplane(ctx, pod, &ns)
}

func (r *PodReconciler) reconcileZoneIngress(ctx context.Context, pod *kube_core.Pod, log logr.Logger) error {
if pod.Status.PodIP == "" {
zi := &mesh_k8s.ZoneIngress{
ObjectMeta: kube_meta.ObjectMeta{Name: pod.Name},
}
return r.deleteObjectIfExist(ctx, zi, "pod IP is empty", log)
}

if pod.Namespace != r.SystemNamespace {
return errors.Errorf("Ingress can only be deployed in system namespace %q", r.SystemNamespace)
}
services, err := r.findMatchingServices(ctx, pod)
if err != nil {
return err
}
err = r.createOrUpdateIngress(ctx, pod, services)
if err != nil {
return err
}
return nil
}

func (r *PodReconciler) reconcileZoneEgress(ctx context.Context, pod *kube_core.Pod, log logr.Logger) error {
if pod.Status.PodIP == "" {
zi := &mesh_k8s.ZoneEgress{
ObjectMeta: kube_meta.ObjectMeta{Name: pod.Name},
}
return r.deleteObjectIfExist(ctx, zi, "pod IP is empty", log)
}

if pod.Namespace != r.SystemNamespace {
return errors.Errorf("Egress can only be deployed in system namespace %q", r.SystemNamespace)
}
services, err := r.findMatchingServices(ctx, pod)
if err != nil {
return err
}
err = r.createOrUpdateEgress(ctx, pod, services)
if err != nil {
return err
}
return nil
}

func (r *PodReconciler) findMatchingServices(ctx context.Context, pod *kube_core.Pod) ([]*kube_core.Service, error) {
Expand Down Expand Up @@ -218,16 +277,18 @@ func (r *PodReconciler) createOrUpdateDataplane(
}
return nil
})
log := r.Log.WithValues("pod", kube_types.NamespacedName{Namespace: pod.Namespace, Name: pod.Name})
if err != nil {
log := r.Log.WithValues("pod", kube_types.NamespacedName{Namespace: pod.Namespace, Name: pod.Name})
log.Error(err, "unable to create/update Dataplane", "operationResult", operationResult)
r.EventRecorder.Eventf(pod, kube_core.EventTypeWarning, FailedToGenerateKumaDataplaneReason, "Failed to generate Kuma Dataplane: %s", err.Error())
return err
}
switch operationResult {
case kube_controllerutil.OperationResultCreated:
log.Info("Dataplane created")
r.EventRecorder.Eventf(pod, kube_core.EventTypeNormal, CreatedKumaDataplaneReason, "Created Kuma Dataplane: %s", pod.Name)
case kube_controllerutil.OperationResultUpdated:
log.Info("Dataplane updated")
r.EventRecorder.Eventf(pod, kube_core.EventTypeNormal, UpdatedKumaDataplaneReason, "Updated Kuma Dataplane: %s", pod.Name)
}
return nil
Expand All @@ -250,16 +311,18 @@ func (r *PodReconciler) createOrUpdateIngress(ctx context.Context, pod *kube_cor
}
return nil
})
log := r.Log.WithValues("pod", kube_types.NamespacedName{Namespace: pod.Namespace, Name: pod.Name})
if err != nil {
log := r.Log.WithValues("pod", kube_types.NamespacedName{Namespace: pod.Namespace, Name: pod.Name})
log.Error(err, "unable to create/update Ingress", "operationResult", operationResult)
r.EventRecorder.Eventf(pod, kube_core.EventTypeWarning, FailedToGenerateKumaDataplaneReason, "Failed to generate Kuma Ingress: %s", err.Error())
return err
}
switch operationResult {
case kube_controllerutil.OperationResultCreated:
log.Info("ZoneIngress created")
r.EventRecorder.Eventf(pod, kube_core.EventTypeNormal, CreatedKumaDataplaneReason, "Created Kuma Ingress: %s", pod.Name)
case kube_controllerutil.OperationResultUpdated:
log.Info("ZoneIngress updated")
r.EventRecorder.Eventf(pod, kube_core.EventTypeNormal, UpdatedKumaDataplaneReason, "Updated Kuma Ingress: %s", pod.Name)
}
return nil
Expand All @@ -282,16 +345,18 @@ func (r *PodReconciler) createOrUpdateEgress(ctx context.Context, pod *kube_core
}
return nil
})
log := r.Log.WithValues("pod", kube_types.NamespacedName{Namespace: pod.Namespace, Name: pod.Name})
if err != nil {
log := r.Log.WithValues("pod", kube_types.NamespacedName{Namespace: pod.Namespace, Name: pod.Name})
log.Error(err, "unable to create/update Egress", "operationResult", operationResult)
r.EventRecorder.Eventf(pod, kube_core.EventTypeWarning, FailedToGenerateKumaDataplaneReason, "Failed to generate Kuma Egress: %s", err.Error())
return err
}
switch operationResult {
case kube_controllerutil.OperationResultCreated:
log.Info("ZoneEgress created")
r.EventRecorder.Eventf(pod, kube_core.EventTypeNormal, CreatedKumaDataplaneReason, "Created Kuma Egress: %s", pod.Name)
case kube_controllerutil.OperationResultUpdated:
log.Info("ZoneEgress updated")
r.EventRecorder.Eventf(pod, kube_core.EventTypeNormal, UpdatedKumaDataplaneReason, "Updated Kuma Egress: %s", pod.Name)
}
return nil
Expand All @@ -308,20 +373,6 @@ func (r *PodReconciler) SetupWithManager(mgr kube_ctrl.Manager) error {
Complete(r)
}

func (r *PodReconciler) isPodComplete(pod *kube_core.Pod) bool {
for _, cs := range pod.Status.ContainerStatuses {
// the sidecar amy or may not be terminated yet
if cs.Name == util_k8s.KumaSidecarContainerName {
continue
}
if cs.State.Terminated == nil {
// at least one container not terminated, therefore pod is still active
return false
}
}
return true
}

func ServiceToPodsMapper(l logr.Logger, client kube_client.Client) kube_handler.MapFunc {
l = l.WithName("service-to-pods-mapper")
return func(obj kube_client.Object) []kube_reconile.Request {
Expand Down
15 changes: 3 additions & 12 deletions pkg/plugins/runtime/k8s/controllers/pod_status_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (

"github.com/go-logr/logr"
"github.com/pkg/errors"
"go.uber.org/multierr"
kube_core "k8s.io/api/core/v1"
kube_apierrs "k8s.io/apimachinery/pkg/api/errors"
kube_runtime "k8s.io/apimachinery/pkg/runtime"
Expand Down Expand Up @@ -64,18 +63,10 @@ func (r *PodStatusReconciler) Reconcile(ctx context.Context, req kube_ctrl.Reque
return kube_ctrl.Result{}, err
}

var errs error
err := r.EnvoyAdminClient.PostQuit(ctx, dp)
if err != nil {
errs = multierr.Append(errs, errors.Wrapf(err, "envoy admin client failed. Most probably the pod is already going down."))
if err := r.EnvoyAdminClient.PostQuit(ctx, dp); err != nil {
return kube_ctrl.Result{}, errors.Wrap(err, "envoy admin client failed. Most probably the pod is already going down.")
}

err = r.Client.Delete(ctx, dataplane)
if err != nil {
errs = multierr.Append(errs, errors.Wrapf(err, "unable to delete the job's dataplane"))
}

return kube_ctrl.Result{}, errs
return kube_ctrl.Result{}, nil
}

func (r *PodStatusReconciler) SetupWithManager(mgr kube_ctrl.Manager) error {
Expand Down
Loading