diff --git a/pkg/plugins/runtime/k8s/controllers/gateway_converter.go b/pkg/plugins/runtime/k8s/controllers/gateway_converter.go index c5f3c86a0a57..d5aac6d38878 100644 --- a/pkg/plugins/runtime/k8s/controllers/gateway_converter.go +++ b/pkg/plugins/runtime/k8s/controllers/gateway_converter.go @@ -62,8 +62,8 @@ func (r *PodReconciler) createorUpdateBuiltinGatewayDataplane(ctx context.Contex return nil }) + log := r.Log.WithValues("pod", kube_types.NamespacedName{Namespace: pod.Namespace, Name: pod.Name}) if err != nil { - log := r.Log.WithValues("pod", kube_types.NamespacedName{Namespace: pod.Namespace, Name: pod.Name}) log.Error(err, "unable to create/update Dataplane", "operationResult", operationResult) r.EventRecorder.Eventf(pod, kube_core.EventTypeWarning, FailedToGenerateKumaDataplaneReason, "Failed to generate Kuma Dataplane: %s", err.Error()) return err @@ -71,8 +71,10 @@ func (r *PodReconciler) createorUpdateBuiltinGatewayDataplane(ctx context.Contex switch operationResult { case kube_controllerutil.OperationResultCreated: + log.Info("Dataplane created") r.EventRecorder.Eventf(pod, kube_core.EventTypeNormal, CreatedKumaDataplaneReason, "Created Kuma Dataplane: %s", dataplane.Name) case kube_controllerutil.OperationResultUpdated: + log.Info("Dataplane updated") r.EventRecorder.Eventf(pod, kube_core.EventTypeNormal, UpdatedKumaDataplaneReason, "Updated Kuma Dataplane: %s", dataplane.Name) } return nil diff --git a/pkg/plugins/runtime/k8s/controllers/pod_controller.go b/pkg/plugins/runtime/k8s/controllers/pod_controller.go index 065dbc4a4885..8ec37a974538 100644 --- a/pkg/plugins/runtime/k8s/controllers/pod_controller.go +++ b/pkg/plugins/runtime/k8s/controllers/pod_controller.go @@ -23,6 +23,7 @@ import ( "github.com/kumahq/kuma/pkg/dns/vips" k8s_common "github.com/kumahq/kuma/pkg/plugins/common/k8s" mesh_k8s "github.com/kumahq/kuma/pkg/plugins/resources/k8s/native/api/v1alpha1" + k8s_model "github.com/kumahq/kuma/pkg/plugins/resources/k8s/native/pkg/model" "github.com/kumahq/kuma/pkg/plugins/runtime/k8s/metadata" util_k8s "github.com/kumahq/kuma/pkg/plugins/runtime/k8s/util" ) @@ -53,45 +54,26 @@ type PodReconciler struct { func (r *PodReconciler) Reconcile(ctx context.Context, req kube_ctrl.Request) (kube_ctrl.Result, error) { log := r.Log.WithValues("pod", req.NamespacedName) + log.V(1).Info("reconcile") // Fetch the Pod instance pod := &kube_core.Pod{} if err := r.Get(ctx, req.NamespacedName, pod); err != nil { if kube_apierrs.IsNotFound(err) { + log.V(1).Info("pod not found. Skipping") return kube_ctrl.Result{}, nil } log.Error(err, "unable to fetch Pod") return kube_ctrl.Result{}, err } - // skip a Pod if it doesn't have an IP address yet - if pod.Status.PodIP == "" { - return kube_ctrl.Result{}, nil - } - - // skip a Pod if is complete/terminated (most probably a completed job) - if r.isPodComplete(pod) { - return kube_ctrl.Result{}, nil - } - // for Pods marked with ingress annotation special type of Dataplane will be injected enabled, exist, err := metadata.Annotations(pod.Annotations).GetEnabled(metadata.KumaIngressAnnotation) if err != nil { return kube_ctrl.Result{}, err } if exist && enabled { - if pod.Namespace != r.SystemNamespace { - return kube_ctrl.Result{}, errors.Errorf("Ingress can only be deployed in system namespace %q", r.SystemNamespace) - } - services, err := r.findMatchingServices(ctx, pod) - if err != nil { - return kube_ctrl.Result{}, err - } - err = r.createOrUpdateIngress(ctx, pod, services) - if err != nil { - return kube_ctrl.Result{}, err - } - return kube_ctrl.Result{}, nil + return kube_ctrl.Result{}, r.reconcileZoneIngress(ctx, pod, log) } // for Pods marked with egress annotation special type of Dataplane will be injected @@ -100,58 +82,135 @@ func (r *PodReconciler) Reconcile(ctx context.Context, req kube_ctrl.Request) (k return kube_ctrl.Result{}, err } if egressExist && egressEnabled { - if pod.Namespace != r.SystemNamespace { - return kube_ctrl.Result{}, errors.Errorf("Egress can only be deployed in system namespace %q", r.SystemNamespace) - } - services, err := r.findMatchingServices(ctx, pod) - if err != nil { - return kube_ctrl.Result{}, err - } - err = r.createOrUpdateEgress(ctx, pod, services) - if err != nil { - return kube_ctrl.Result{}, err - } - - return kube_ctrl.Result{}, nil - } - - ns := kube_core.Namespace{} - if err := r.Client.Get(ctx, kube_types.NamespacedName{Name: pod.Namespace}, &ns); err != nil { - return kube_ctrl.Result{}, errors.Wrap(err, "unable to get Namespace for Pod") + return kube_ctrl.Result{}, r.reconcileZoneEgress(ctx, pod, log) } // If we are using a builtin gateway, we want to generate a builtin gateway // dataplane. if name, _ := metadata.Annotations(pod.Annotations).GetString(metadata.KumaGatewayAnnotation); name == metadata.AnnotationBuiltin { - return kube_ctrl.Result{}, r.createorUpdateBuiltinGatewayDataplane(ctx, pod, &ns) + return kube_ctrl.Result{}, r.reconcileBuiltinGatewayDataplane(ctx, pod, log) } // only Pods with injected Kuma need a Dataplane descriptor - injected, exist, err := metadata.Annotations(pod.Annotations).GetEnabled(metadata.KumaSidecarInjectedAnnotation) + injected, _, err := metadata.Annotations(pod.Annotations).GetEnabled(metadata.KumaSidecarInjectedAnnotation) if err != nil { return kube_ctrl.Result{}, err } - if !exist || !injected { - return kube_ctrl.Result{}, nil + if injected { + return kube_ctrl.Result{}, r.reconcileDataplane(ctx, pod, log) + } + + return kube_ctrl.Result{}, nil +} + +func (r *PodReconciler) reconcileDataplane(ctx context.Context, pod *kube_core.Pod, log logr.Logger) error { + dp := &mesh_k8s.Dataplane{ + ObjectMeta: kube_meta.ObjectMeta{Name: pod.Name, Namespace: pod.Namespace}, + } + if pod.Status.Phase == kube_core.PodSucceeded { + // Remove Dataplane object for Pods that are indefinitely in Succeeded phase, i.e. Jobs + return r.deleteObjectIfExist(ctx, dp, "pod succeeded", log) + } + if pod.Status.PodIP == "" { + return r.deleteObjectIfExist(ctx, dp, "pod IP is empty", log) + } + + ns := kube_core.Namespace{} + if err := r.Client.Get(ctx, kube_types.NamespacedName{Name: pod.Namespace}, &ns); err != nil { + return errors.Wrap(err, "unable to get Namespace for Pod") } services, err := r.findMatchingServices(ctx, pod) if err != nil { - return kube_ctrl.Result{}, err + return err } others, err := r.findOtherDataplanes(ctx, pod, &ns) if err != nil { - return kube_ctrl.Result{}, err + return err } - r.Log.WithValues("req", req).V(1).Info("other dataplanes", "others", others) - if err := r.createOrUpdateDataplane(ctx, pod, &ns, services, others); err != nil { - return kube_ctrl.Result{}, err + return err } + return nil +} - return kube_ctrl.Result{}, nil +func (r *PodReconciler) deleteObjectIfExist(ctx context.Context, object k8s_model.KubernetesObject, cause string, log logr.Logger) error { + log = log.WithValues( + "cause", cause, + "kind", object.GetObjectKind(), + "name", object.GetName(), + "namespace", object.GetNamespace(), + ) + if err := r.Client.Delete(ctx, object); err != nil { + if kube_apierrs.IsNotFound(err) { + log.V(1).Info("Object is not found, nothing to delete") + return nil + } + return errors.Wrapf(err, "could not delete %v %s/%s", object.GetObjectKind(), object.GetName(), object.GetNamespace()) + } + log.Info("Object deleted") + return nil +} + +func (r *PodReconciler) reconcileBuiltinGatewayDataplane(ctx context.Context, pod *kube_core.Pod, log logr.Logger) error { + if pod.Status.PodIP == "" { + dp := &mesh_k8s.Dataplane{ + ObjectMeta: kube_meta.ObjectMeta{Name: pod.Name, Namespace: pod.Namespace}, + } + return r.deleteObjectIfExist(ctx, dp, "pod IP is empty", log) + } + + ns := kube_core.Namespace{} + if err := r.Client.Get(ctx, kube_types.NamespacedName{Name: pod.Namespace}, &ns); err != nil { + return errors.Wrap(err, "unable to get Namespace for Pod") + } + return r.createorUpdateBuiltinGatewayDataplane(ctx, pod, &ns) +} + +func (r *PodReconciler) reconcileZoneIngress(ctx context.Context, pod *kube_core.Pod, log logr.Logger) error { + if pod.Status.PodIP == "" { + zi := &mesh_k8s.ZoneIngress{ + ObjectMeta: kube_meta.ObjectMeta{Name: pod.Name}, + } + return r.deleteObjectIfExist(ctx, zi, "pod IP is empty", log) + } + + if pod.Namespace != r.SystemNamespace { + return errors.Errorf("Ingress can only be deployed in system namespace %q", r.SystemNamespace) + } + services, err := r.findMatchingServices(ctx, pod) + if err != nil { + return err + } + err = r.createOrUpdateIngress(ctx, pod, services) + if err != nil { + return err + } + return nil +} + +func (r *PodReconciler) reconcileZoneEgress(ctx context.Context, pod *kube_core.Pod, log logr.Logger) error { + if pod.Status.PodIP == "" { + zi := &mesh_k8s.ZoneEgress{ + ObjectMeta: kube_meta.ObjectMeta{Name: pod.Name}, + } + return r.deleteObjectIfExist(ctx, zi, "pod IP is empty", log) + } + + if pod.Namespace != r.SystemNamespace { + return errors.Errorf("Egress can only be deployed in system namespace %q", r.SystemNamespace) + } + services, err := r.findMatchingServices(ctx, pod) + if err != nil { + return err + } + err = r.createOrUpdateEgress(ctx, pod, services) + if err != nil { + return err + } + return nil } func (r *PodReconciler) findMatchingServices(ctx context.Context, pod *kube_core.Pod) ([]*kube_core.Service, error) { @@ -218,16 +277,18 @@ func (r *PodReconciler) createOrUpdateDataplane( } return nil }) + log := r.Log.WithValues("pod", kube_types.NamespacedName{Namespace: pod.Namespace, Name: pod.Name}) if err != nil { - log := r.Log.WithValues("pod", kube_types.NamespacedName{Namespace: pod.Namespace, Name: pod.Name}) log.Error(err, "unable to create/update Dataplane", "operationResult", operationResult) r.EventRecorder.Eventf(pod, kube_core.EventTypeWarning, FailedToGenerateKumaDataplaneReason, "Failed to generate Kuma Dataplane: %s", err.Error()) return err } switch operationResult { case kube_controllerutil.OperationResultCreated: + log.Info("Dataplane created") r.EventRecorder.Eventf(pod, kube_core.EventTypeNormal, CreatedKumaDataplaneReason, "Created Kuma Dataplane: %s", pod.Name) case kube_controllerutil.OperationResultUpdated: + log.Info("Dataplane updated") r.EventRecorder.Eventf(pod, kube_core.EventTypeNormal, UpdatedKumaDataplaneReason, "Updated Kuma Dataplane: %s", pod.Name) } return nil @@ -250,16 +311,18 @@ func (r *PodReconciler) createOrUpdateIngress(ctx context.Context, pod *kube_cor } return nil }) + log := r.Log.WithValues("pod", kube_types.NamespacedName{Namespace: pod.Namespace, Name: pod.Name}) if err != nil { - log := r.Log.WithValues("pod", kube_types.NamespacedName{Namespace: pod.Namespace, Name: pod.Name}) log.Error(err, "unable to create/update Ingress", "operationResult", operationResult) r.EventRecorder.Eventf(pod, kube_core.EventTypeWarning, FailedToGenerateKumaDataplaneReason, "Failed to generate Kuma Ingress: %s", err.Error()) return err } switch operationResult { case kube_controllerutil.OperationResultCreated: + log.Info("ZoneIngress created") r.EventRecorder.Eventf(pod, kube_core.EventTypeNormal, CreatedKumaDataplaneReason, "Created Kuma Ingress: %s", pod.Name) case kube_controllerutil.OperationResultUpdated: + log.Info("ZoneIngress updated") r.EventRecorder.Eventf(pod, kube_core.EventTypeNormal, UpdatedKumaDataplaneReason, "Updated Kuma Ingress: %s", pod.Name) } return nil @@ -282,16 +345,18 @@ func (r *PodReconciler) createOrUpdateEgress(ctx context.Context, pod *kube_core } return nil }) + log := r.Log.WithValues("pod", kube_types.NamespacedName{Namespace: pod.Namespace, Name: pod.Name}) if err != nil { - log := r.Log.WithValues("pod", kube_types.NamespacedName{Namespace: pod.Namespace, Name: pod.Name}) log.Error(err, "unable to create/update Egress", "operationResult", operationResult) r.EventRecorder.Eventf(pod, kube_core.EventTypeWarning, FailedToGenerateKumaDataplaneReason, "Failed to generate Kuma Egress: %s", err.Error()) return err } switch operationResult { case kube_controllerutil.OperationResultCreated: + log.Info("ZoneEgress created") r.EventRecorder.Eventf(pod, kube_core.EventTypeNormal, CreatedKumaDataplaneReason, "Created Kuma Egress: %s", pod.Name) case kube_controllerutil.OperationResultUpdated: + log.Info("ZoneEgress updated") r.EventRecorder.Eventf(pod, kube_core.EventTypeNormal, UpdatedKumaDataplaneReason, "Updated Kuma Egress: %s", pod.Name) } return nil @@ -308,20 +373,6 @@ func (r *PodReconciler) SetupWithManager(mgr kube_ctrl.Manager) error { Complete(r) } -func (r *PodReconciler) isPodComplete(pod *kube_core.Pod) bool { - for _, cs := range pod.Status.ContainerStatuses { - // the sidecar amy or may not be terminated yet - if cs.Name == util_k8s.KumaSidecarContainerName { - continue - } - if cs.State.Terminated == nil { - // at least one container not terminated, therefore pod is still active - return false - } - } - return true -} - func ServiceToPodsMapper(l logr.Logger, client kube_client.Client) kube_handler.MapFunc { l = l.WithName("service-to-pods-mapper") return func(obj kube_client.Object) []kube_reconile.Request { diff --git a/pkg/plugins/runtime/k8s/controllers/pod_status_controller.go b/pkg/plugins/runtime/k8s/controllers/pod_status_controller.go index fbebb82aa429..fed5eda90059 100644 --- a/pkg/plugins/runtime/k8s/controllers/pod_status_controller.go +++ b/pkg/plugins/runtime/k8s/controllers/pod_status_controller.go @@ -5,7 +5,6 @@ import ( "github.com/go-logr/logr" "github.com/pkg/errors" - "go.uber.org/multierr" kube_core "k8s.io/api/core/v1" kube_apierrs "k8s.io/apimachinery/pkg/api/errors" kube_runtime "k8s.io/apimachinery/pkg/runtime" @@ -64,18 +63,10 @@ func (r *PodStatusReconciler) Reconcile(ctx context.Context, req kube_ctrl.Reque return kube_ctrl.Result{}, err } - var errs error - err := r.EnvoyAdminClient.PostQuit(ctx, dp) - if err != nil { - errs = multierr.Append(errs, errors.Wrapf(err, "envoy admin client failed. Most probably the pod is already going down.")) + if err := r.EnvoyAdminClient.PostQuit(ctx, dp); err != nil { + return kube_ctrl.Result{}, errors.Wrap(err, "envoy admin client failed. Most probably the pod is already going down.") } - - err = r.Client.Delete(ctx, dataplane) - if err != nil { - errs = multierr.Append(errs, errors.Wrapf(err, "unable to delete the job's dataplane")) - } - - return kube_ctrl.Result{}, errs + return kube_ctrl.Result{}, nil } func (r *PodStatusReconciler) SetupWithManager(mgr kube_ctrl.Manager) error { diff --git a/test/e2e_env/kubernetes/graceful/eviction.go b/test/e2e_env/kubernetes/graceful/eviction.go new file mode 100644 index 000000000000..1b83b842f59a --- /dev/null +++ b/test/e2e_env/kubernetes/graceful/eviction.go @@ -0,0 +1,77 @@ +package graceful + +import ( + "github.com/gruntwork-io/terratest/modules/k8s" + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + + "github.com/kumahq/kuma/test/e2e_env/kubernetes/env" + . "github.com/kumahq/kuma/test/framework" +) + +func Eviction() { + nsName := "eviction" + meshName := "eviction" + + BeforeAll(func() { + err := NewClusterSetup(). + Install(NamespaceWithSidecarInjection(nsName)). + Install(MeshKubernetes(meshName)). + Setup(env.Cluster) + Expect(err).ToNot(HaveOccurred()) + }) + + E2EAfterAll(func() { + Expect(env.Cluster.TriggerDeleteNamespace(nsName)).To(Succeed()) + Expect(env.Cluster.DeleteMesh(meshName)).To(Succeed()) + }) + + It("remove Dataplane of evicted Pod", func() { + evictionPod := `apiVersion: v1 +kind: Pod +metadata: + name: to-be-evicted + namespace: eviction + annotations: + kuma.io/mesh: eviction +spec: + volumes: + containers: + - name: alpine-evict + image: alpine + args: + - /bin/ash + - -c + - -- + - "while true; do cat /usr/bin/* ; done" + resources: + limits: + cpu: 50m + ephemeral-storage: 10Ki + memory: 64Mi` + + // when faulty pod is applied + Expect(env.Cluster.Install(YamlK8s(evictionPod))).To(Succeed()) + + // then Dataplane should be created + Eventually(func(g Gomega) { + dataplanes, err := env.Cluster.GetKumactlOptions().KumactlList("dataplanes", meshName) + g.Expect(err).ToNot(HaveOccurred()) + g.Expect(dataplanes).Should(ContainElement(ContainSubstring("to-be-evicted"))) + }, "30s", "1s").Should(Succeed()) + + // when it's evicted + Eventually(func(g Gomega) { + out, err := k8s.RunKubectlAndGetOutputE(env.Cluster.GetTesting(), env.Cluster.GetKubectlOptions(nsName), "get", "pods") + g.Expect(err).ToNot(HaveOccurred()) + g.Expect(out).To(ContainSubstring("Evicted")) + }, "30s", "1s").Should(Succeed()) + + // then Dataplane is removed + Eventually(func(g Gomega) { + dataplanes, err := env.Cluster.GetKumactlOptions().KumactlList("dataplanes", meshName) + g.Expect(err).ToNot(HaveOccurred()) + g.Expect(dataplanes).ShouldNot(ContainElement(ContainSubstring("to-be-evicted"))) + }, "60s", "1s").Should(Succeed()) + }) +} diff --git a/test/e2e_env/kubernetes/jobs/jobs.go b/test/e2e_env/kubernetes/jobs/jobs.go index b3099c2fe639..439238ac740c 100644 --- a/test/e2e_env/kubernetes/jobs/jobs.go +++ b/test/e2e_env/kubernetes/jobs/jobs.go @@ -34,6 +34,13 @@ func Jobs() { // then CP terminates the job by sending /quitquitquit to Envoy Admin and verifies connection using self-signed certs Expect(err).ToNot(HaveOccurred()) + + // and Dataplane object is deleted + Eventually(func(g Gomega) { + out, err := env.Cluster.GetKumactlOptions().RunKumactlAndGetOutput("get", "dataplanes", "--mesh", mesh) + g.Expect(err).ToNot(HaveOccurred()) + g.Expect(out).ToNot(ContainSubstring("demo-job-client")) + }, "30s", "1s") }) It("should terminate jobs with mTLS", func() { diff --git a/test/e2e_env/kubernetes/kubernetes_suite_test.go b/test/e2e_env/kubernetes/kubernetes_suite_test.go index 2975e43243df..5faa785f265b 100644 --- a/test/e2e_env/kubernetes/kubernetes_suite_test.go +++ b/test/e2e_env/kubernetes/kubernetes_suite_test.go @@ -88,6 +88,7 @@ var _ = Describe("Gateway - Gateway API", gateway.GatewayAPI, Ordered) var _ = Describe("Gateway - mTLS", gateway.Mtls, Ordered) var _ = Describe("Gateway - Resources", gateway.Resources, Ordered) var _ = Describe("Graceful", graceful.Graceful, Ordered) +var _ = Describe("Eviction", graceful.Eviction, Ordered) var _ = Describe("Jobs", jobs.Jobs) var _ = Describe("Membership", membership.Membership, Ordered) var _ = Describe("Container Patch", container_patch.ContainerPatch, Ordered)