Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Field selector pod informer #1209

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions charts/beyla/templates/daemon-set.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,10 @@ spec:
value: beyla-$(KUBE_NAMESPACE)
- name: BEYLA_BPF_FS_BASE_DIR
value: /sys/fs/bpf
- name: HOSTNAME
valueFrom:
fieldRef:
fieldPath: spec.nodeName
{{- end }}
{{- range $key, $value := .Values.env }}
- name: {{ $key }}
Expand Down
5 changes: 3 additions & 2 deletions pkg/internal/discover/watcher_kube.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,21 +179,22 @@ func (wk *watcherKubeEnricher) onNewProcess(procInfo processAttrs) (processAttrs
containerInfo, err := wk.getContainerInfo(procInfo.pid)
if err != nil {
// it is expected for any process not running inside a container
wk.log.Debug("can't get container info for PID", "pid", procInfo.pid, "error", err)
wk.log.Warn("can't get container info for PID", "pid", procInfo.pid, "error", err)
return processAttrs{}, false
}

wk.processByContainer[containerInfo.ContainerID] = procInfo

if pod, ok := wk.informer.GetContainerPod(containerInfo.ContainerID); ok {
wk.log.Info("Pod found for container", "containerID", containerInfo.ContainerID, "pod", pod.Name)
procInfo = withMetadata(procInfo, pod)
}
return procInfo, true
}

func (wk *watcherKubeEnricher) onNewPod(pod *kube.PodInfo) []Event[processAttrs] {
wk.updateNewPodsByOwnerIndex(pod)

wk.log.Info("Pod added", "namespace", pod.Namespace, "name", pod.Name, "containers", pod.ContainerIDs)
var events []Event[processAttrs]
for _, containerID := range pod.ContainerIDs {
if procInfo, ok := wk.processByContainer[containerID]; ok {
Expand Down
29 changes: 22 additions & 7 deletions pkg/internal/kube/informer.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (

v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
Expand Down Expand Up @@ -115,8 +116,9 @@ var nodeIndexers = cache.Indexers{
// GetContainerPod fetches metadata from a Pod given the name of one of its containers
func (k *Metadata) GetContainerPod(containerID string) (*PodInfo, bool) {
objs, err := k.pods.GetIndexer().ByIndex(IndexPodByContainerIDs, containerID)
k.log.Info("fetching pod by container ID", "containerID", containerID, "objs", objs, "len", len(objs))
if err != nil {
k.log.Debug("error accessing index by container ID. Ignoring", "error", err, "containerID", containerID)
k.log.Warn("error accessing index by container ID. Ignoring", "error", err, "containerID", containerID)
return nil, false
}
if len(objs) == 0 {
Expand All @@ -127,7 +129,6 @@ func (k *Metadata) GetContainerPod(containerID string) (*PodInfo, bool) {

func (k *Metadata) initPodInformer(informerFactory informers.SharedInformerFactory) error {
pods := informerFactory.Core().V1().Pods().Informer()

k.initContainerListeners(pods)

// Transform any *v1.Pod instance into a *PodInfo instance to save space
Expand All @@ -137,9 +138,9 @@ func (k *Metadata) initPodInformer(informerFactory informers.SharedInformerFacto
if !ok {
// it's Ok. The K8s library just informed from an entity
// that has been previously transformed/stored
if pi, ok := i.(*PodInfo); ok {
return pi, nil
}
// if pi, ok := i.(*PodInfo); ok {
// return pi, nil
// }
return nil, fmt.Errorf("was expecting a Pod. Got: %T", i)
}
containerIDs := make([]string, 0,
Expand All @@ -165,6 +166,7 @@ func (k *Metadata) initPodInformer(informerFactory informers.SharedInformerFacto
if ip.IP != pod.Status.HostIP {
ips = append(ips, ip.IP)
}
k.log.Info("pod IPs", "pod", pod.Name, "ip", ip.IP)
}

owner := OwnerFrom(pod.OwnerReferences)
Expand Down Expand Up @@ -266,10 +268,16 @@ func (k *Metadata) initInformers(ctx context.Context, client kubernetes.Interfac
if syncTimeout <= 0 {
syncTimeout = defaultSyncTimeout
}
informerFactory := informers.NewSharedInformerFactory(client, resyncTime)
if err := k.initPodInformer(informerFactory); err != nil {
fieldSelector := fields.OneTermEqualSelector("spec.nodeName", os.Getenv("HOSTNAME")).String()
k.log.Info("using field selector", "selector", fieldSelector)
opts := informers.WithTweakListOptions(func(options *metav1.ListOptions) {
options.FieldSelector = fieldSelector
})
informerFactoryPod := informers.NewSharedInformerFactoryWithOptions(client, resyncTime, opts)
if err := k.initPodInformer(informerFactoryPod); err != nil {
return err
}
informerFactory := informers.NewSharedInformerFactory(client, resyncTime)
if err := k.initNodeIPInformer(informerFactory); err != nil {
return err
}
Expand All @@ -280,15 +288,22 @@ func (k *Metadata) initInformers(ctx context.Context, client kubernetes.Interfac
log := klog()
log.Debug("starting kubernetes informers, waiting for syncronization")
informerFactory.Start(ctx.Done())
informerFactoryPod.Start(ctx.Done())
finishedCacheSync := make(chan struct{})
finishedCacheSyncPod := make(chan struct{})
go func() {
informerFactory.WaitForCacheSync(ctx.Done())
informerFactoryPod.WaitForCacheSync(ctx.Done())
close(finishedCacheSync)
close(finishedCacheSyncPod)
}()
select {
case <-finishedCacheSync:
log.Debug("kubernetes informers started")
return nil
case <-finishedCacheSyncPod:
log.Debug("kubernetes informers started")
return nil
case <-time.After(syncTimeout):
return fmt.Errorf("kubernetes cache has not been synced after %s timeout", syncTimeout)
}
Expand Down
3 changes: 2 additions & 1 deletion pkg/internal/transform/kube/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ func (id *Database) addProcess(ifp *container.Info) {
func (id *Database) AddProcess(pid uint32) {
ifp, err := container.InfoForPID(pid)
if err != nil {
dblog().Debug("failing to get container information", "pid", pid, "error", err)
dblog().Warn("failing to get container information", "pid", pid, "error", err)
return
}

Expand All @@ -170,6 +170,7 @@ func (id *Database) OwnerPodInfo(pidNamespace uint32) (*kube.PodInfo, bool) {
}
pod, ok = id.informer.GetContainerPod(info.ContainerID)
if !ok {
dblog().Warn("can't find pod for PID namespace", "pid", pidNamespace)
return nil, false
}
id.fetchedPodsCache[pidNamespace] = pod
Expand Down
Loading