Skip to content

Commit

Permalink
Improving thread safety of K8s informer DB (#1118)
Browse files Browse the repository at this point in the history
* Improving thread safety of K8s informer DB

* deleting namespaced resources on pod deletion

* review suggestions
  • Loading branch information
mariomac authored Sep 2, 2024
1 parent ae750f7 commit b71dc56
Showing 1 changed file with 63 additions and 66 deletions.
129 changes: 63 additions & 66 deletions pkg/internal/transform/kube/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,28 +22,23 @@ func dblog() *slog.Logger {
// Database keeps in-memory indexes of Kubernetes metadata so that it can be
// looked up by container ID, by PID namespace or by IP address.
// NOTE(review): this listing is a rendered diff without +/- markers, so the
// per-map mutexes and the single `access` mutex appear side by side. The
// commit title suggests only one locking scheme is meant to remain; confirm
// against the repository.
type Database struct {
// informer is asked for the pod that owns a given container ID
informer *kube.Metadata

cntMut sync.Mutex
access sync.RWMutex

// key: container ID
containerIDs map[string]*container.Info

// key: pid namespace.
// a single namespace will point to any container inside the pod
// but we don't care which one
nsMut sync.RWMutex
namespaces map[uint32]*container.Info

// key: pid namespace. Stores pods already resolved through the informer
podsCacheMut sync.RWMutex
fetchedPodsCache map[uint32]*kube.PodInfo

// ip to pod name matcher
podsMut sync.RWMutex
podsByIP map[string]*kube.PodInfo

// ip to service name matcher
svcMut sync.RWMutex
svcByIP map[string]*kube.ServiceInfo

// ip to node name matcher
nodeMut sync.RWMutex
nodeByIP map[string]*kube.NodeInfo
}

Expand All @@ -68,8 +63,7 @@ func StartDatabase(kubeMetadata *kube.Metadata) (*Database, error) {
db.UpdateNewPodsByIPIndex(obj.(*kube.PodInfo))
},
UpdateFunc: func(oldObj, newObj interface{}) {
db.UpdateDeletedPodsByIPIndex(oldObj.(*kube.PodInfo))
db.UpdateNewPodsByIPIndex(newObj.(*kube.PodInfo))
db.UpdatePodsByIPIndex(oldObj.(*kube.PodInfo), newObj.(*kube.PodInfo))
},
DeleteFunc: func(obj interface{}) {
db.UpdateDeletedPodsByIPIndex(obj.(*kube.PodInfo))
Expand Down Expand Up @@ -111,28 +105,24 @@ func StartDatabase(kubeMetadata *kube.Metadata) (*Database, error) {

// OnDeletion implements ContainerEventHandler.
// For each passed container ID it removes the entry from containerIDs and,
// when that container was known, also removes the entries stored under its
// PID namespace in fetchedPodsCache and namespaces.
func (id *Database) OnDeletion(containerID []string) {
id.access.Lock()
defer id.access.Unlock()
for _, cid := range containerID {
id.cntMut.Lock()
info, ok := id.containerIDs[cid]
delete(id.containerIDs, cid)
id.cntMut.Unlock()
// only a known container tells us which PID namespace has to be cleaned
if ok {
id.deletePodCache(info.PIDNamespace)
id.nsMut.Lock()
delete(id.fetchedPodsCache, info.PIDNamespace)
delete(id.namespaces, info.PIDNamespace)
id.nsMut.Unlock()
}
}
}

// addProcess stores the passed container info under both its PID namespace
// and its container ID. Any pod cached for that PID namespace is discarded
// first, so that it is looked up again on the next query.
func (id *Database) addProcess(ifp *container.Info) {
id.deletePodCache(ifp.PIDNamespace)
id.nsMut.Lock()
id.access.Lock()
defer id.access.Unlock()
delete(id.fetchedPodsCache, ifp.PIDNamespace)
id.namespaces[ifp.PIDNamespace] = ifp
id.nsMut.Unlock()
id.cntMut.Lock()
id.containerIDs[ifp.ContainerID] = ifp
id.cntMut.Unlock()
}

// AddProcess also searches for the container.Info of the passed PID
Expand All @@ -147,36 +137,28 @@ func (id *Database) AddProcess(pid uint32) {
}

// CleanProcessCaches discards the pod cached for the passed PID namespace.
// The namespaces index itself is left untouched.
func (id *Database) CleanProcessCaches(ns uint32) {
id.access.Lock()
defer id.access.Unlock()
// Don't delete the id.namespaces, we can't tell if Add/Delete events
// are in order. Deleting from the cache is safe, since it will be rebuilt.
id.deletePodCache(ns)
}

// deletePodCache removes the entry stored for the passed PID namespace from
// fetchedPodsCache, holding podsCacheMut while doing so.
func (id *Database) deletePodCache(ns uint32) {
id.podsCacheMut.Lock()
delete(id.fetchedPodsCache, ns)
id.podsCacheMut.Unlock()
}

// OwnerPodInfo returns the information of the pod owning the passed namespace
func (id *Database) OwnerPodInfo(pidNamespace uint32) (*kube.PodInfo, bool) {
id.podsCacheMut.RLock()
id.access.Lock()
defer id.access.Unlock()
pod, ok := id.fetchedPodsCache[pidNamespace]
id.podsCacheMut.RUnlock()
if !ok {
id.nsMut.RLock()
info, ok := id.namespaces[pidNamespace]
id.nsMut.RUnlock()
if !ok {
return nil, false
}
pod, ok = id.informer.GetContainerPod(info.ContainerID)
if !ok {
return nil, false
}
id.podsCacheMut.Lock()
id.fetchedPodsCache[pidNamespace] = pod
id.podsCacheMut.Unlock()
}
// we check DeploymentName after caching, as the replicasetInfo might be
// received late by the replicaset informer
Expand All @@ -185,60 +167,83 @@ func (id *Database) OwnerPodInfo(pidNamespace uint32) (*kube.PodInfo, bool) {
}

// UpdateNewPodsByIPIndex indexes the passed pod in podsByIP under each of
// the IPs listed in its IPInfo.
func (id *Database) UpdateNewPodsByIPIndex(pod *kube.PodInfo) {
id.access.Lock()
defer id.access.Unlock()
if len(pod.IPInfo.IPs) > 0 {
id.podsMut.Lock()
defer id.podsMut.Unlock()
for _, ip := range pod.IPInfo.IPs {
id.podsByIP[ip] = pod
}
id.addPods(pod)
}
}

// UpdateDeletedPodsByIPIndex removes the passed pod from the podsByIP index.
// NOTE(review): removed and added diff lines are interleaved in this listing,
// so the brace structure shown here is not reliable; confirm the final shape
// against the repository.
func (id *Database) UpdateDeletedPodsByIPIndex(pod *kube.PodInfo) {
id.access.Lock()
defer id.access.Unlock()
if len(pod.IPInfo.IPs) > 0 {
id.podsMut.Lock()
defer id.podsMut.Unlock()
for _, ip := range pod.IPInfo.IPs {
delete(id.podsByIP, ip)
id.deletePods(pod)
}
}

// UpdatePodsByIPIndex replaces the index entries of oldPod with those of
// newPod. Both steps run under a single acquisition of the write lock.
func (id *Database) UpdatePodsByIPIndex(oldPod, newPod *kube.PodInfo) {
id.access.Lock()
defer id.access.Unlock()
id.deletePods(oldPod)
id.addPods(newPod)
}

// addPods maps every IP of the passed pod to that pod in podsByIP.
// It takes no lock itself; the callers visible in this file acquire
// id.access before invoking it.
func (id *Database) addPods(pod *kube.PodInfo) {
for _, ip := range pod.IPInfo.IPs {
id.podsByIP[ip] = pod
}
}

// deletePods removes the IPs of the passed pod from podsByIP. For each of the
// pod's container IDs it also removes the containerIDs entry and, when that
// container was known, the namespaces and fetchedPodsCache entries stored
// under its PID namespace.
// It takes no lock itself; the callers visible in this file acquire
// id.access before invoking it.
// NOTE(review): brace nesting in this rendered diff is unreliable; verify
// whether the container loop is meant to sit outside the IP loop.
func (id *Database) deletePods(pod *kube.PodInfo) {
for _, ip := range pod.IPInfo.IPs {
delete(id.podsByIP, ip)
for _, cid := range pod.ContainerIDs {
cnt, ok := id.containerIDs[cid]
delete(id.containerIDs, cid)
// only a known container tells us which PID namespace has to be cleaned
if ok {
delete(id.namespaces, cnt.PIDNamespace)
delete(id.fetchedPodsCache, cnt.PIDNamespace)
}
}
}
}

// PodInfoForIP returns the pod indexed under the passed IP, or nil if no pod
// is indexed under it. The lookup is done under a read lock.
func (id *Database) PodInfoForIP(ip string) *kube.PodInfo {
id.podsMut.RLock()
defer id.podsMut.RUnlock()
id.access.RLock()
defer id.access.RUnlock()
return id.podsByIP[ip]
}

// UpdateNewServicesByIPIndex indexes the passed service in svcByIP under each
// of the IPs listed in its IPInfo. A service without IPs changes nothing.
func (id *Database) UpdateNewServicesByIPIndex(svc *kube.ServiceInfo) {
id.access.Lock()
defer id.access.Unlock()
if len(svc.IPInfo.IPs) > 0 {
id.svcMut.Lock()
defer id.svcMut.Unlock()
for _, ip := range svc.IPInfo.IPs {
id.svcByIP[ip] = svc
}
}
}

// UpdateDeletedServicesByIPIndex removes from svcByIP every IP listed in the
// IPInfo of the passed service. A service without IPs changes nothing.
func (id *Database) UpdateDeletedServicesByIPIndex(svc *kube.ServiceInfo) {
id.access.Lock()
defer id.access.Unlock()
if len(svc.IPInfo.IPs) > 0 {
id.svcMut.Lock()
defer id.svcMut.Unlock()
for _, ip := range svc.IPInfo.IPs {
delete(id.svcByIP, ip)
}
}
}

// ServiceInfoForIP returns the service indexed under the passed IP, or nil if
// no service is indexed under it. The lookup is done under a read lock.
func (id *Database) ServiceInfoForIP(ip string) *kube.ServiceInfo {
id.svcMut.RLock()
defer id.svcMut.RUnlock()
id.access.RLock()
defer id.access.RUnlock()
return id.svcByIP[ip]
}

func (id *Database) UpdateNewNodesByIPIndex(svc *kube.NodeInfo) {
id.nodeMut.Lock()
defer id.nodeMut.Unlock()
id.access.Lock()
defer id.access.Unlock()
if len(svc.IPInfo.IPs) > 0 {
for _, ip := range svc.IPInfo.IPs {
id.nodeByIP[ip] = svc
Expand All @@ -247,8 +252,8 @@ func (id *Database) UpdateNewNodesByIPIndex(svc *kube.NodeInfo) {
}

func (id *Database) UpdateDeletedNodesByIPIndex(svc *kube.NodeInfo) {
id.nodeMut.Lock()
defer id.nodeMut.Unlock()
id.access.Lock()
defer id.access.Unlock()
if len(svc.IPInfo.IPs) > 0 {
for _, ip := range svc.IPInfo.IPs {
delete(id.nodeByIP, ip)
Expand All @@ -257,49 +262,41 @@ func (id *Database) UpdateDeletedNodesByIPIndex(svc *kube.NodeInfo) {
}

// NodeInfoForIP returns the node indexed under the passed IP, or nil if no
// node is indexed under it. The lookup is done under a read lock.
func (id *Database) NodeInfoForIP(ip string) *kube.NodeInfo {
id.nodeMut.RLock()
defer id.nodeMut.RUnlock()
id.access.RLock()
defer id.access.RUnlock()
return id.nodeByIP[ip]
}

// HostNameForIP returns the name of whatever is indexed under the passed IP.
// Services are checked first, then pods, then nodes; the first match wins.
// It returns an empty string when the IP is in none of the three indexes.
func (id *Database) HostNameForIP(ip string) string {
id.svcMut.RLock()
id.access.RLock()
defer id.access.RUnlock()
svc, ok := id.svcByIP[ip]
id.svcMut.RUnlock()
if ok {
return svc.Name
}
id.podsMut.RLock()
pod, ok := id.podsByIP[ip]
id.podsMut.RUnlock()
if ok {
return pod.Name
}
id.nodeMut.RLock()
node, ok := id.nodeByIP[ip]
id.nodeMut.RUnlock()
if ok {
return node.Name
}
return ""
}

func (id *Database) ServiceNameNamespaceForIP(ip string) (string, string) {
id.svcMut.RLock()
id.access.RLock()
defer id.access.RUnlock()
svc, ok := id.svcByIP[ip]
id.svcMut.RUnlock()
if ok {
return svc.Name, svc.Namespace
}
id.podsMut.RLock()
pod, ok := id.podsByIP[ip]
id.podsMut.RUnlock()
if ok {
return pod.ServiceName(), pod.Namespace
}
id.nodeMut.RLock()
node, ok := id.nodeByIP[ip]
id.nodeMut.RUnlock()
if ok {
return node.Name, node.Namespace
}
Expand Down

0 comments on commit b71dc56

Please sign in to comment.