diff --git a/charts/beyla/templates/daemon-set.yaml b/charts/beyla/templates/daemon-set.yaml index e05eb85fd..04885c21d 100644 --- a/charts/beyla/templates/daemon-set.yaml +++ b/charts/beyla/templates/daemon-set.yaml @@ -123,6 +123,10 @@ spec: value: beyla-$(KUBE_NAMESPACE) - name: BEYLA_BPF_FS_BASE_DIR value: /sys/fs/bpf + - name: HOSTNAME + valueFrom: + fieldRef: + fieldPath: spec.nodeName {{- end }} {{- range $key, $value := .Values.env }} - name: {{ $key }} diff --git a/pkg/internal/discover/watcher_kube.go b/pkg/internal/discover/watcher_kube.go index 425c18e29..76e3a0b14 100644 --- a/pkg/internal/discover/watcher_kube.go +++ b/pkg/internal/discover/watcher_kube.go @@ -179,13 +179,14 @@ func (wk *watcherKubeEnricher) onNewProcess(procInfo processAttrs) (processAttrs containerInfo, err := wk.getContainerInfo(procInfo.pid) if err != nil { // it is expected for any process not running inside a container - wk.log.Debug("can't get container info for PID", "pid", procInfo.pid, "error", err) + wk.log.Warn("can't get container info for PID", "pid", procInfo.pid, "error", err) return processAttrs{}, false } wk.processByContainer[containerInfo.ContainerID] = procInfo if pod, ok := wk.informer.GetContainerPod(containerInfo.ContainerID); ok { + wk.log.Info("Pod found for container", "containerID", containerInfo.ContainerID, "pod", pod.Name) procInfo = withMetadata(procInfo, pod) } return procInfo, true @@ -193,7 +194,7 @@ func (wk *watcherKubeEnricher) onNewProcess(procInfo processAttrs) (processAttrs func (wk *watcherKubeEnricher) onNewPod(pod *kube.PodInfo) []Event[processAttrs] { wk.updateNewPodsByOwnerIndex(pod) - + wk.log.Info("Pod added", "namespace", pod.Namespace, "name", pod.Name, "containers", pod.ContainerIDs) var events []Event[processAttrs] for _, containerID := range pod.ContainerIDs { if procInfo, ok := wk.processByContainer[containerID]; ok { diff --git a/pkg/internal/kube/informer.go b/pkg/internal/kube/informer.go index ec3b3651f..1e9596af9 100644 --- a/pkg/internal/kube/informer.go +++ b/pkg/internal/kube/informer.go @@ -11,6 +11,7 @@ import ( v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/fields" "k8s.io/client-go/informers" "k8s.io/client-go/kubernetes" "k8s.io/client-go/rest" @@ -115,8 +116,9 @@ var nodeIndexers = cache.Indexers{ // GetContainerPod fetches metadata from a Pod given the name of one of its containers func (k *Metadata) GetContainerPod(containerID string) (*PodInfo, bool) { objs, err := k.pods.GetIndexer().ByIndex(IndexPodByContainerIDs, containerID) + k.log.Info("fetching pod by container ID", "containerID", containerID, "objs", objs, "len", len(objs)) if err != nil { - k.log.Debug("error accessing index by container ID. Ignoring", "error", err, "containerID", containerID) + k.log.Warn("error accessing index by container ID. Ignoring", "error", err, "containerID", containerID) return nil, false } if len(objs) == 0 { @@ -127,7 +129,6 @@ func (k *Metadata) GetContainerPod(containerID string) (*PodInfo, bool) { func (k *Metadata) initPodInformer(informerFactory informers.SharedInformerFactory) error { pods := informerFactory.Core().V1().Pods().Informer() - k.initContainerListeners(pods) // Transform any *v1.Pod instance into a *PodInfo instance to save space @@ -137,9 +138,9 @@ func (k *Metadata) initPodInformer(informerFactory informers.SharedInformerFacto if !ok { // it's Ok. The K8s library just informed from an entity // that has been previously transformed/stored - if pi, ok := i.(*PodInfo); ok { - return pi, nil - } + // if pi, ok := i.(*PodInfo); ok { + // return pi, nil + // } return nil, fmt.Errorf("was expecting a Pod. Got: %T", i) } containerIDs := make([]string, 0, @@ -165,6 +166,7 @@ func (k *Metadata) initPodInformer(informerFactory informers.SharedInformerFacto if ip.IP != pod.Status.HostIP { ips = append(ips, ip.IP) } + k.log.Info("pod IPs", "pod", pod.Name, "ip", ip.IP) } owner := OwnerFrom(pod.OwnerReferences) @@ -266,10 +268,16 @@ func (k *Metadata) initInformers(ctx context.Context, client kubernetes.Interfac if syncTimeout <= 0 { syncTimeout = defaultSyncTimeout } - informerFactory := informers.NewSharedInformerFactory(client, resyncTime) - if err := k.initPodInformer(informerFactory); err != nil { + fieldSelector := fields.OneTermEqualSelector("spec.nodeName", os.Getenv("HOSTNAME")).String() + k.log.Info("using field selector", "selector", fieldSelector) + opts := informers.WithTweakListOptions(func(options *metav1.ListOptions) { + options.FieldSelector = fieldSelector + }) + informerFactoryPod := informers.NewSharedInformerFactoryWithOptions(client, resyncTime, opts) + if err := k.initPodInformer(informerFactoryPod); err != nil { return err } + informerFactory := informers.NewSharedInformerFactory(client, resyncTime) if err := k.initNodeIPInformer(informerFactory); err != nil { return err } @@ -280,15 +288,22 @@ func (k *Metadata) initInformers(ctx context.Context, client kubernetes.Interfac log := klog() log.Debug("starting kubernetes informers, waiting for syncronization") informerFactory.Start(ctx.Done()) + informerFactoryPod.Start(ctx.Done()) finishedCacheSync := make(chan struct{}) + finishedCacheSyncPod := make(chan struct{}) go func() { informerFactory.WaitForCacheSync(ctx.Done()) + informerFactoryPod.WaitForCacheSync(ctx.Done()) close(finishedCacheSync) + close(finishedCacheSyncPod) }() select { case <-finishedCacheSync: log.Debug("kubernetes informers started") return nil + case <-finishedCacheSyncPod: + log.Debug("kubernetes informers started") + return nil case <-time.After(syncTimeout): return fmt.Errorf("kubernetes cache has not been synced after %s timeout", syncTimeout) } diff --git a/pkg/internal/transform/kube/db.go b/pkg/internal/transform/kube/db.go index 9cd48d839..2f37c3ded 100644 --- a/pkg/internal/transform/kube/db.go +++ b/pkg/internal/transform/kube/db.go @@ -143,7 +143,7 @@ func (id *Database) addProcess(ifp *container.Info) { func (id *Database) AddProcess(pid uint32) { ifp, err := container.InfoForPID(pid) if err != nil { - dblog().Debug("failing to get container information", "pid", pid, "error", err) + dblog().Warn("failing to get container information", "pid", pid, "error", err) return } @@ -170,6 +170,7 @@ func (id *Database) OwnerPodInfo(pidNamespace uint32) (*kube.PodInfo, bool) { } pod, ok = id.informer.GetContainerPod(info.ContainerID) if !ok { + dblog().Warn("can't find pod for PID namespace", "pid", pidNamespace) return nil, false } id.fetchedPodsCache[pidNamespace] = pod