diff --git a/pkg/internal/discover/container_updater.go b/pkg/internal/discover/container_updater.go index 8b619b67a..fca02ba37 100644 --- a/pkg/internal/discover/container_updater.go +++ b/pkg/internal/discover/container_updater.go @@ -28,7 +28,9 @@ func updateLoop(db *kube.Database) pipe.MiddleFunc[[]Event[Instrumentable], []Ev db.AddProcess(uint32(ev.Obj.FileInfo.Pid)) case EventDeleted: // we don't need to handle process deletion from here, as the Kubernetes informer will - // remove the process from the database when the Pod that contains it is deleted + // remove the process from the database when the Pod that contains it is deleted. + // However we clean-up the performance related caches, in case we miss pod removal event + db.CleanProcessCaches(ev.Obj.FileInfo.Ns) } } out <- instrumentables diff --git a/pkg/internal/transform/kube/db.go b/pkg/internal/transform/kube/db.go index 051a5f22e..438480a02 100644 --- a/pkg/internal/transform/kube/db.go +++ b/pkg/internal/transform/kube/db.go @@ -98,9 +98,7 @@ func (id *Database) OnDeletion(containerID []string) { delete(id.containerIDs, cid) id.cntMut.Unlock() if ok { - id.podsCacheMut.Lock() - delete(id.fetchedPodsCache, info.PIDNamespace) - id.podsCacheMut.Unlock() + id.deletePodCache(info.PIDNamespace) id.nsMut.Lock() delete(id.namespaces, info.PIDNamespace) id.nsMut.Unlock() @@ -108,6 +106,16 @@ func (id *Database) OnDeletion(containerID []string) { } } +func (id *Database) addProcess(ifp *container.Info) { + id.deletePodCache(ifp.PIDNamespace) + id.nsMut.Lock() + id.namespaces[ifp.PIDNamespace] = ifp + id.nsMut.Unlock() + id.cntMut.Lock() + id.containerIDs[ifp.ContainerID] = ifp + id.cntMut.Unlock() +} + // AddProcess also searches for the container.Info of the passed PID func (id *Database) AddProcess(pid uint32) { ifp, err := container.InfoForPID(pid) @@ -115,12 +123,20 @@ func (id *Database) AddProcess(pid uint32) { dblog().Debug("failing to get container information", "pid", pid, "error", err) return } - id.nsMut.Lock() - id.namespaces[ifp.PIDNamespace] = &ifp - id.nsMut.Unlock() - id.cntMut.Lock() - id.containerIDs[ifp.ContainerID] = &ifp - id.cntMut.Unlock() + + id.addProcess(&ifp) +} + +func (id *Database) CleanProcessCaches(ns uint32) { + // Don't delete the id.namespaces, we can't tell if Add/Delete events + // are in order. Deleting from the cache is safe, since it will be rebuilt. + id.deletePodCache(ns) +} + +func (id *Database) deletePodCache(ns uint32) { + id.podsCacheMut.Lock() + delete(id.fetchedPodsCache, ns) + id.podsCacheMut.Unlock() } // OwnerPodInfo returns the information of the pod owning the passed namespace diff --git a/pkg/internal/transform/kube/db_test.go b/pkg/internal/transform/kube/db_test.go new file mode 100644 index 000000000..32b8bcd30 --- /dev/null +++ b/pkg/internal/transform/kube/db_test.go @@ -0,0 +1,61 @@ +package kube + +import ( + "testing" + + "github.com/stretchr/testify/assert" + + "github.com/grafana/beyla/pkg/internal/helpers/container" + "github.com/grafana/beyla/pkg/internal/kube" +) + +func Test_NamespaceReuse(t *testing.T) { + db := CreateDatabase(&kube.Metadata{}) + ifp := container.Info{ + ContainerID: "a", + PIDNamespace: 111, + } + db.addProcess(&ifp) + // pretend we resolved some pod info earlier + db.fetchedPodsCache[ifp.PIDNamespace] = &kube.PodInfo{NodeName: "a"} + info, err := db.OwnerPodInfo(111) + assert.True(t, err) + assert.NotNil(t, info) + assert.Equal(t, "a", info.NodeName) + + _, ok := db.fetchedPodsCache[ifp.PIDNamespace] + assert.True(t, ok) + + ifp1 := container.Info{ + ContainerID: "b", + PIDNamespace: ifp.PIDNamespace, + } + + // We overwrite the container info with new namespace + db.addProcess(&ifp1) + _, ok = db.fetchedPodsCache[ifp.PIDNamespace] + assert.False(t, ok) +} + +func Test_NamespaceCacheCleanup(t *testing.T) { + db := CreateDatabase(&kube.Metadata{}) + ifp := container.Info{ + ContainerID: "a", + PIDNamespace: 111, + } + db.addProcess(&ifp) + // pretend we resolved some pod info earlier + db.fetchedPodsCache[ifp.PIDNamespace] = &kube.PodInfo{NodeName: "a"} + info, err := db.OwnerPodInfo(111) + assert.True(t, err) + assert.NotNil(t, info) + assert.Equal(t, "a", info.NodeName) + + _, ok := db.fetchedPodsCache[ifp.PIDNamespace] + assert.True(t, ok) + + // We overwrite the container info with new namespace + db.CleanProcessCaches(ifp.PIDNamespace) + _, ok = db.fetchedPodsCache[ifp.PIDNamespace] + assert.False(t, ok) +}