Skip to content

Commit

Permalink
Create a common predicate for contaniers ready, use it in Pod reconci…
Browse files Browse the repository at this point in the history
…ler for instrumentation (#1754)

Following #1666, this PR includes the following:

1. Change the predicate for ebpf instrumentation to only pass events
where all the containers in a pod become ready. This will fix cases
where the reconciliation was called too early and missed the relevant
process.
2. Create an `AllContainersReadyPredicate` which is common to the
runtime details reconciler (the predicate was added in #1666 ) and the
ebpf instrumentation reconciler.
3. Create a common `DeletionPredicate` predicate - for the
instrumentation reconcilation it is an OR between the containers ready
predicate and the delete one.
  • Loading branch information
RonFed authored Nov 15, 2024
1 parent 79115ab commit 3363fac
Show file tree
Hide file tree
Showing 7 changed files with 110 additions and 115 deletions.
8 changes: 8 additions & 0 deletions k8sutils/pkg/container/container.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,14 @@ func podContainerDeviceName(container v1.Container) *string {
}

func AllContainersReady(pod *v1.Pod) bool {
// If pod has no containers, return false as we can't determine readiness
if len(pod.Status.ContainerStatuses) == 0 {
return false
}
// Check if pod is in Running phase.
if pod.Status.Phase != v1.PodRunning {
return false
}
// Iterate over all containers in the pod
// Return false if any container is:
// 1. Not Ready
Expand Down
66 changes: 66 additions & 0 deletions k8sutils/pkg/predicate/containers_ready.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
package predicate

import (
corev1 "k8s.io/api/core/v1"
"sigs.k8s.io/controller-runtime/pkg/event"
k8scontainer "github.com/odigos-io/odigos/k8sutils/pkg/container"
)

// AllContainersReadyPredicate is a predicate that checks if all containers in a pod are ready or becoming ready.
//
// For Create events, it returns true if the pod is in Running phase and all containers are ready.
// For Update events, it returns true if the new pod has all containers ready and started, and the old pod had at least one container not ready or not started.
// For Delete events, it returns false.
type AllContainersReadyPredicate struct{}

func (p *AllContainersReadyPredicate) Create(e event.CreateEvent) bool {
if e.Object == nil {
return false
}

pod, ok := e.Object.(*corev1.Pod)
if !ok {
return false
}

allContainersReady := k8scontainer.AllContainersReady(pod)
// If all containers are not ready, return false.
// Otherwise, return true
return allContainersReady
}

func (p *AllContainersReadyPredicate) Update(e event.UpdateEvent) bool {
if e.ObjectOld == nil || e.ObjectNew == nil {
return false
}

oldPod, oldOk := e.ObjectOld.(*corev1.Pod)
newPod, newOk := e.ObjectNew.(*corev1.Pod)

if !oldOk || !newOk {
return false
}

// First check if all containers in newPod are ready and started
allNewContainersReady := k8scontainer.AllContainersReady(newPod)

// If new containers aren't all ready, return false
if !allNewContainersReady {
return false
}

// Now check if any container in oldPod was not ready or not started
allOldContainersReady := k8scontainer.AllContainersReady(oldPod)

// Return true only if old pods had at least one container not ready/not started
// and new pod has all containers ready/started
return !allOldContainersReady && allNewContainersReady
}

func (p *AllContainersReadyPredicate) Delete(e event.DeleteEvent) bool {
return false
}

func (p *AllContainersReadyPredicate) Generic(e event.GenericEvent) bool {
return false
}
27 changes: 27 additions & 0 deletions k8sutils/pkg/predicate/deletion.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package predicate

import (
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/predicate"
)

// DeletionPredicate only allows delete events.
type DeletionPredicate struct{}

func (o DeletionPredicate) Create(e event.CreateEvent) bool {
return false
}

func (i DeletionPredicate) Update(e event.UpdateEvent) bool {
return false
}

func (i DeletionPredicate) Delete(e event.DeleteEvent) bool {
return true
}

func (i DeletionPredicate) Generic(e event.GenericEvent) bool {
return false
}

var _ predicate.Predicate = &DeletionPredicate{}
48 changes: 6 additions & 42 deletions odiglet/pkg/kube/instrumentation_ebpf/manager.go
Original file line number Diff line number Diff line change
@@ -1,57 +1,17 @@
package instrumentation_ebpf

import (
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/predicate"

odigosv1 "github.com/odigos-io/odigos/api/odigos/v1alpha1"
odigospredicate "github.com/odigos-io/odigos/k8sutils/pkg/predicate"
"github.com/odigos-io/odigos/odiglet/pkg/ebpf"
"github.com/odigos-io/odigos/odiglet/pkg/log"
corev1 "k8s.io/api/core/v1"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/builder"
)

type podPredicate struct {
predicate.Funcs
}

func (i *podPredicate) Create(e event.CreateEvent) bool {
// when odiglet restart, it will receive create event for all running pods
// which we need to process to instrument them
return true
}

func (i *podPredicate) Update(e event.UpdateEvent) bool {
// Cast old and new objects to *corev1.Pod
oldPod, oldOk := e.ObjectOld.(*corev1.Pod)
newPod, newOk := e.ObjectNew.(*corev1.Pod)

// Check if both old and new objects are Pods
if !oldOk || !newOk {
return false
}

// Check if the Pod status has changed from not running to running
if oldPod.Status.Phase != corev1.PodRunning && newPod.Status.Phase == corev1.PodRunning {
return true
}

// Sum the restart counts for both oldPod and newPod containers, then compare them.
// If the newPod has a higher restart count than the oldPod, we need to re-instrument it.
// This happens because the pod was abruptly killed, which caused an increment in the restart count.
// This check is required because the pod will remain running during the process kill and re-launch.
return GetPodSumRestarts(newPod) > GetPodSumRestarts(oldPod)
}

func (i *podPredicate) Delete(e event.DeleteEvent) bool {
return true
}

func (i *podPredicate) Generic(e event.GenericEvent) bool {
return false
}

func SetupWithManager(mgr ctrl.Manager, ebpfDirectors ebpf.DirectorsMap) error {

log.Logger.V(0).Info("Starting reconcileres for ebpf instrumentation")
Expand All @@ -60,7 +20,11 @@ func SetupWithManager(mgr ctrl.Manager, ebpfDirectors ebpf.DirectorsMap) error {
ControllerManagedBy(mgr).
Named("PodReconciler_ebpf").
For(&corev1.Pod{}).
WithEventFilter(&podPredicate{}).
// trigger the reconcile when either:
// 1. A Create event is accepted for a pod with all containers ready (this is relevant when Odiglet is restarted)
// 2. All containers become ready in a running pod
// 3. Pod is deleted
WithEventFilter(predicate.Or(&odigospredicate.AllContainersReadyPredicate{}, &odigospredicate.DeletionPredicate{})).
Complete(&PodsReconciler{
Client: mgr.GetClient(),
Scheme: mgr.GetScheme(),
Expand Down
7 changes: 0 additions & 7 deletions odiglet/pkg/kube/instrumentation_ebpf/pods.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,10 +124,3 @@ func (p *PodsReconciler) getPodWorkloadObject(ctx context.Context, pod *corev1.P
return nil, nil
}

func GetPodSumRestarts(pod *corev1.Pod) int {
restartCount := 0
for _, containerStatus := range pod.Status.ContainerStatuses {
restartCount += int(containerStatus.RestartCount)
}
return restartCount
}
4 changes: 3 additions & 1 deletion odiglet/pkg/kube/runtime_details/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package runtime_details

import (
odigosv1 "github.com/odigos-io/odigos/api/odigos/v1alpha1"

odigospredicate "github.com/odigos-io/odigos/k8sutils/pkg/predicate"
corev1 "k8s.io/api/core/v1"
"k8s.io/client-go/kubernetes"
ctrl "sigs.k8s.io/controller-runtime"
Expand All @@ -25,7 +27,7 @@ func SetupWithManager(mgr ctrl.Manager, clientset *kubernetes.Clientset) error {
ControllerManagedBy(mgr).
Named("Odiglet-RuntimeDetails-Pods").
For(&corev1.Pod{}).
WithEventFilter(&podPredicate{}).
WithEventFilter(&odigospredicate.AllContainersReadyPredicate{}).
Complete(&PodsReconciler{
Client: mgr.GetClient(),
Scheme: mgr.GetScheme(),
Expand Down
65 changes: 0 additions & 65 deletions odiglet/pkg/kube/runtime_details/pods_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,81 +5,16 @@ import (

odigosv1 "github.com/odigos-io/odigos/api/odigos/v1alpha1"
"github.com/odigos-io/odigos/common"
k8scontainer "github.com/odigos-io/odigos/k8sutils/pkg/container"
k8sutils "github.com/odigos-io/odigos/k8sutils/pkg/utils"
"github.com/odigos-io/odigos/k8sutils/pkg/workload"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/kubernetes"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
)

type podPredicate struct{}

func (p *podPredicate) Create(e event.CreateEvent) bool {
pod, ok := e.Object.(*corev1.Pod)
if !ok {
return false
}

// Check if pod is in Running phase. This is the first requirement
if pod.Status.Phase != corev1.PodRunning {
return false
}

// If pod has no containers, return false as we can't determine readiness
if len(pod.Status.ContainerStatuses) == 0 {
return false
}

allContainersReady := k8scontainer.AllContainersReady(pod)
// If all containers are not ready, return false.
// Otherwise, return true
return allContainersReady
}

func (p *podPredicate) Update(e event.UpdateEvent) bool {
oldPod, oldOk := e.ObjectOld.(*corev1.Pod)
newPod, newOk := e.ObjectNew.(*corev1.Pod)

if !oldOk || !newOk {
return false
}

// If pod has no containers, return false as we can't determine readiness
if len(newPod.Status.ContainerStatuses) == 0 {
return false
}

// First check if all containers in newPod are ready and started
allNewContainersReady := k8scontainer.AllContainersReady(newPod)

// If new containers aren't all ready, return false
if !allNewContainersReady {
return false
}

// Now check if any container in oldPod was not ready or not started
allOldContainersReady := k8scontainer.AllContainersReady(oldPod)

// Return true only if old pods had at least one container not ready/not started
// and new pod has all containers ready/started
return !allOldContainersReady && allNewContainersReady
}

func (p *podPredicate) Delete(e event.DeleteEvent) bool {
// no runtime details detection needed when a pod is deleted
return false
}

func (p *podPredicate) Generic(e event.GenericEvent) bool {
// no runtime details detection needed for generic events
return false
}

type PodsReconciler struct {
client.Client
Scheme *runtime.Scheme
Expand Down

0 comments on commit 3363fac

Please sign in to comment.