diff --git a/pkg/internal/transform/kube/db.go b/pkg/internal/transform/kube/db.go index fbba34a24..6c2306ed1 100644 --- a/pkg/internal/transform/kube/db.go +++ b/pkg/internal/transform/kube/db.go @@ -22,28 +22,23 @@ func dblog() *slog.Logger { type Database struct { informer *kube.Metadata - cntMut sync.Mutex + access sync.RWMutex + containerIDs map[string]*container.Info // a single namespace will point to any container inside the pod // but we don't care which one - nsMut sync.RWMutex namespaces map[uint32]*container.Info - // key: pid namespace - podsCacheMut sync.RWMutex fetchedPodsCache map[uint32]*kube.PodInfo // ip to pod name matcher - podsMut sync.RWMutex podsByIP map[string]*kube.PodInfo // ip to service name matcher - svcMut sync.RWMutex svcByIP map[string]*kube.ServiceInfo // ip to node name matcher - nodeMut sync.RWMutex nodeByIP map[string]*kube.NodeInfo } @@ -68,8 +63,7 @@ func StartDatabase(kubeMetadata *kube.Metadata) (*Database, error) { db.UpdateNewPodsByIPIndex(obj.(*kube.PodInfo)) }, UpdateFunc: func(oldObj, newObj interface{}) { - db.UpdateDeletedPodsByIPIndex(oldObj.(*kube.PodInfo)) - db.UpdateNewPodsByIPIndex(newObj.(*kube.PodInfo)) + db.UpdatePodsByIPIndex(oldObj.(*kube.PodInfo), newObj.(*kube.PodInfo)) }, DeleteFunc: func(obj interface{}) { db.UpdateDeletedPodsByIPIndex(obj.(*kube.PodInfo)) @@ -111,28 +105,24 @@ func StartDatabase(kubeMetadata *kube.Metadata) (*Database, error) { // OnDeletion implements ContainerEventHandler func (id *Database) OnDeletion(containerID []string) { + id.access.Lock() + defer id.access.Unlock() for _, cid := range containerID { - id.cntMut.Lock() info, ok := id.containerIDs[cid] delete(id.containerIDs, cid) - id.cntMut.Unlock() if ok { - id.deletePodCache(info.PIDNamespace) - id.nsMut.Lock() + delete(id.fetchedPodsCache, info.PIDNamespace) delete(id.namespaces, info.PIDNamespace) - id.nsMut.Unlock() } } } func (id *Database) addProcess(ifp *container.Info) { - id.deletePodCache(ifp.PIDNamespace) - id.nsMut.Lock() + id.access.Lock() + defer id.access.Unlock() + delete(id.fetchedPodsCache, ifp.PIDNamespace) id.namespaces[ifp.PIDNamespace] = ifp - id.nsMut.Unlock() - id.cntMut.Lock() id.containerIDs[ifp.ContainerID] = ifp - id.cntMut.Unlock() } // AddProcess also searches for the container.Info of the passed PID @@ -147,26 +137,20 @@ func (id *Database) AddProcess(pid uint32) { } func (id *Database) CleanProcessCaches(ns uint32) { + id.access.Lock() + defer id.access.Unlock() // Don't delete the id.namespaces, we can't tell if Add/Delete events // are in order. Deleting from the cache is safe, since it will be rebuilt. - id.deletePodCache(ns) -} - -func (id *Database) deletePodCache(ns uint32) { - id.podsCacheMut.Lock() delete(id.fetchedPodsCache, ns) - id.podsCacheMut.Unlock() } // OwnerPodInfo returns the information of the pod owning the passed namespace func (id *Database) OwnerPodInfo(pidNamespace uint32) (*kube.PodInfo, bool) { - id.podsCacheMut.RLock() + id.access.Lock() + defer id.access.Unlock() pod, ok := id.fetchedPodsCache[pidNamespace] - id.podsCacheMut.RUnlock() if !ok { - id.nsMut.RLock() info, ok := id.namespaces[pidNamespace] - id.nsMut.RUnlock() if !ok { return nil, false } @@ -174,9 +158,7 @@ func (id *Database) OwnerPodInfo(pidNamespace uint32) (*kube.PodInfo, bool) { if !ok { return nil, false } - id.podsCacheMut.Lock() id.fetchedPodsCache[pidNamespace] = pod - id.podsCacheMut.Unlock() } // we check DeploymentName after caching, as the replicasetInfo might be // received late by the replicaset informer @@ -185,35 +167,58 @@ func (id *Database) OwnerPodInfo(pidNamespace uint32) (*kube.PodInfo, bool) { } func (id *Database) UpdateNewPodsByIPIndex(pod *kube.PodInfo) { + id.access.Lock() + defer id.access.Unlock() if len(pod.IPInfo.IPs) > 0 { - id.podsMut.Lock() - defer id.podsMut.Unlock() - for _, ip := range pod.IPInfo.IPs { - id.podsByIP[ip] = pod - } + id.addPods(pod) } } func (id *Database) UpdateDeletedPodsByIPIndex(pod *kube.PodInfo) { + id.access.Lock() + defer id.access.Unlock() if len(pod.IPInfo.IPs) > 0 { - id.podsMut.Lock() - defer id.podsMut.Unlock() - for _, ip := range pod.IPInfo.IPs { - delete(id.podsByIP, ip) + id.deletePods(pod) + } +} + +func (id *Database) UpdatePodsByIPIndex(oldPod, newPod *kube.PodInfo) { + id.access.Lock() + defer id.access.Unlock() + id.deletePods(oldPod) + id.addPods(newPod) +} + +func (id *Database) addPods(pod *kube.PodInfo) { + for _, ip := range pod.IPInfo.IPs { + id.podsByIP[ip] = pod + } +} + +func (id *Database) deletePods(pod *kube.PodInfo) { + for _, ip := range pod.IPInfo.IPs { + delete(id.podsByIP, ip) + for _, cid := range pod.ContainerIDs { + cnt, ok := id.containerIDs[cid] + delete(id.containerIDs, cid) + if ok { + delete(id.namespaces, cnt.PIDNamespace) + delete(id.fetchedPodsCache, cnt.PIDNamespace) + } } } } func (id *Database) PodInfoForIP(ip string) *kube.PodInfo { - id.podsMut.RLock() - defer id.podsMut.RUnlock() + id.access.RLock() + defer id.access.RUnlock() return id.podsByIP[ip] } func (id *Database) UpdateNewServicesByIPIndex(svc *kube.ServiceInfo) { + id.access.Lock() + defer id.access.Unlock() if len(svc.IPInfo.IPs) > 0 { - id.svcMut.Lock() - defer id.svcMut.Unlock() for _, ip := range svc.IPInfo.IPs { id.svcByIP[ip] = svc } @@ -221,9 +226,9 @@ func (id *Database) UpdateNewServicesByIPIndex(svc *kube.ServiceInfo) { } func (id *Database) UpdateDeletedServicesByIPIndex(svc *kube.ServiceInfo) { + id.access.Lock() + defer id.access.Unlock() if len(svc.IPInfo.IPs) > 0 { - id.svcMut.Lock() - defer id.svcMut.Unlock() for _, ip := range svc.IPInfo.IPs { delete(id.svcByIP, ip) } @@ -231,14 +236,14 @@ func (id *Database) UpdateDeletedServicesByIPIndex(svc *kube.ServiceInfo) { } func (id *Database) ServiceInfoForIP(ip string) *kube.ServiceInfo { - id.svcMut.RLock() - defer id.svcMut.RUnlock() + id.access.RLock() + defer id.access.RUnlock() return id.svcByIP[ip] } func (id *Database) UpdateNewNodesByIPIndex(svc *kube.NodeInfo) { - id.nodeMut.Lock() - defer id.nodeMut.Unlock() + id.access.Lock() + defer id.access.Unlock() if len(svc.IPInfo.IPs) > 0 { for _, ip := range svc.IPInfo.IPs { id.nodeByIP[ip] = svc @@ -247,8 +252,8 @@ func (id *Database) UpdateNewNodesByIPIndex(svc *kube.NodeInfo) { } func (id *Database) UpdateDeletedNodesByIPIndex(svc *kube.NodeInfo) { - id.nodeMut.Lock() - defer id.nodeMut.Unlock() + id.access.Lock() + defer id.access.Unlock() if len(svc.IPInfo.IPs) > 0 { for _, ip := range svc.IPInfo.IPs { delete(id.nodeByIP, ip) @@ -257,27 +262,23 @@ func (id *Database) UpdateDeletedNodesByIPIndex(svc *kube.NodeInfo) { } func (id *Database) NodeInfoForIP(ip string) *kube.NodeInfo { - id.nodeMut.RLock() - defer id.nodeMut.RUnlock() + id.access.RLock() + defer id.access.RUnlock() return id.nodeByIP[ip] } func (id *Database) HostNameForIP(ip string) string { - id.svcMut.RLock() + id.access.RLock() + defer id.access.RUnlock() svc, ok := id.svcByIP[ip] - id.svcMut.RUnlock() if ok { return svc.Name } - id.podsMut.RLock() pod, ok := id.podsByIP[ip] - id.podsMut.RUnlock() if ok { return pod.Name } - id.nodeMut.RLock() node, ok := id.nodeByIP[ip] - id.nodeMut.RUnlock() if ok { return node.Name } @@ -285,21 +286,17 @@ func (id *Database) HostNameForIP(ip string) string { } func (id *Database) ServiceNameNamespaceForIP(ip string) (string, string) { - id.svcMut.RLock() + id.access.RLock() + defer id.access.RUnlock() svc, ok := id.svcByIP[ip] - id.svcMut.RUnlock() if ok { return svc.Name, svc.Namespace } - id.podsMut.RLock() pod, ok := id.podsByIP[ip] - id.podsMut.RUnlock() if ok { return pod.ServiceName(), pod.Namespace } - id.nodeMut.RLock() node, ok := id.nodeByIP[ip] - id.nodeMut.RUnlock() if ok { return node.Name, node.Namespace }