Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improving thread safety of K8s informer DB #1118

Merged
merged 4 commits into from
Sep 2, 2024
Merged
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
106 changes: 55 additions & 51 deletions pkg/internal/transform/kube/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,24 +22,20 @@ func dblog() *slog.Logger {
type Database struct {
informer *kube.Metadata

cntMut sync.Mutex
access sync.RWMutex

containerIDs map[string]*container.Info

// a single namespace will point to any container inside the pod
// but we don't care which one
nsMut sync.RWMutex
namespaces map[uint32]*container.Info

// key: pid namespace
podsCacheMut sync.RWMutex
fetchedPodsCache map[uint32]*kube.PodInfo

// ip to pod name matcher
podsMut sync.RWMutex
podsByIP map[string]*kube.PodInfo

// ip to service name matcher
svcMut sync.RWMutex
svcByIP map[string]*kube.ServiceInfo
}

Expand All @@ -63,8 +59,7 @@ func StartDatabase(kubeMetadata *kube.Metadata) (*Database, error) {
db.UpdateNewPodsByIPIndex(obj.(*kube.PodInfo))
},
UpdateFunc: func(oldObj, newObj interface{}) {
db.UpdateDeletedPodsByIPIndex(oldObj.(*kube.PodInfo))
db.UpdateNewPodsByIPIndex(newObj.(*kube.PodInfo))
db.UpdatePodsByIPIndex(oldObj.(*kube.PodInfo), newObj.(*kube.PodInfo))
},
DeleteFunc: func(obj interface{}) {
db.UpdateDeletedPodsByIPIndex(obj.(*kube.PodInfo))
Expand Down Expand Up @@ -92,28 +87,24 @@ func StartDatabase(kubeMetadata *kube.Metadata) (*Database, error) {

// OnDeletion implements ContainerEventHandler
func (id *Database) OnDeletion(containerID []string) {
id.access.Lock()
defer id.access.Unlock()
for _, cid := range containerID {
id.cntMut.Lock()
info, ok := id.containerIDs[cid]
delete(id.containerIDs, cid)
id.cntMut.Unlock()
if ok {
id.deletePodCache(info.PIDNamespace)
id.nsMut.Lock()
delete(id.fetchedPodsCache, info.PIDNamespace)
delete(id.namespaces, info.PIDNamespace)
id.nsMut.Unlock()
}
}
}

func (id *Database) addProcess(ifp *container.Info) {
id.deletePodCache(ifp.PIDNamespace)
id.nsMut.Lock()
id.access.Lock()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe it would be safer to use a deferred Unlock() here?

delete(id.fetchedPodsCache, ifp.PIDNamespace)
id.namespaces[ifp.PIDNamespace] = ifp
id.nsMut.Unlock()
id.cntMut.Lock()
id.containerIDs[ifp.ContainerID] = ifp
id.cntMut.Unlock()
id.access.Unlock()
}

// AddProcess also searches for the container.Info of the passed PID
Expand All @@ -128,36 +119,28 @@ func (id *Database) AddProcess(pid uint32) {
}

func (id *Database) CleanProcessCaches(ns uint32) {
id.access.Lock()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Defer here too?

// Don't delete the id.namespaces, we can't tell if Add/Delete events
// are in order. Deleting from the cache is safe, since it will be rebuilt.
id.deletePodCache(ns)
}

func (id *Database) deletePodCache(ns uint32) {
id.podsCacheMut.Lock()
delete(id.fetchedPodsCache, ns)
id.podsCacheMut.Unlock()
id.access.Unlock()
}

// OwnerPodInfo returns the information of the pod owning the passed namespace
func (id *Database) OwnerPodInfo(pidNamespace uint32) (*kube.PodInfo, bool) {
id.podsCacheMut.RLock()
id.access.Lock()
defer id.access.Unlock()
pod, ok := id.fetchedPodsCache[pidNamespace]
id.podsCacheMut.RUnlock()
if !ok {
id.nsMut.RLock()
info, ok := id.namespaces[pidNamespace]
id.nsMut.RUnlock()
if !ok {
return nil, false
}
pod, ok = id.informer.GetContainerPod(info.ContainerID)
if !ok {
return nil, false
}
id.podsCacheMut.Lock()
id.fetchedPodsCache[pidNamespace] = pod
id.podsCacheMut.Unlock()
}
// we check DeploymentName after caching, as the replicasetInfo might be
// received late by the replicaset informer
Expand All @@ -167,34 +150,57 @@ func (id *Database) OwnerPodInfo(pidNamespace uint32) (*kube.PodInfo, bool) {

func (id *Database) UpdateNewPodsByIPIndex(pod *kube.PodInfo) {
if len(pod.IPInfo.IPs) > 0 {
id.podsMut.Lock()
defer id.podsMut.Unlock()
for _, ip := range pod.IPInfo.IPs {
id.podsByIP[ip] = pod
}
id.access.Lock()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this can lead to a race too: we check the pod state before we take the lock. Let's wrap the whole function in the lock.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In this case, it's not needed because the Mutex protects access to the maps, and the pod value is provided from outside the database, from a value that is not concurrently modified.

Anyway, let's move the lock outside the block to make it clearer, since pods should always have IPs.

defer id.access.Unlock()
id.addPods(pod)
}
}

func (id *Database) UpdateDeletedPodsByIPIndex(pod *kube.PodInfo) {
if len(pod.IPInfo.IPs) > 0 {
id.podsMut.Lock()
defer id.podsMut.Unlock()
for _, ip := range pod.IPInfo.IPs {
delete(id.podsByIP, ip)
id.access.Lock()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same here, we should check the pod state inside the lock.

defer id.access.Unlock()
id.deletePods(pod)
}
}

func (id *Database) UpdatePodsByIPIndex(oldPod, newPod *kube.PodInfo) {
id.access.Lock()
defer id.access.Unlock()
id.deletePods(oldPod)
id.addPods(newPod)
}

func (id *Database) addPods(pod *kube.PodInfo) {
for _, ip := range pod.IPInfo.IPs {
id.podsByIP[ip] = pod
}
}

func (id *Database) deletePods(pod *kube.PodInfo) {
for _, ip := range pod.IPInfo.IPs {
delete(id.podsByIP, ip)
for _, cid := range pod.ContainerIDs {
cnt, ok := id.containerIDs[cid]
delete(id.containerIDs, cid)
if ok {
delete(id.namespaces, cnt.PIDNamespace)
delete(id.fetchedPodsCache, cnt.PIDNamespace)
}
}
}
}

func (id *Database) PodInfoForIP(ip string) *kube.PodInfo {
id.podsMut.RLock()
defer id.podsMut.RUnlock()
id.access.RLock()
defer id.access.RUnlock()
return id.podsByIP[ip]
}

func (id *Database) UpdateNewServicesByIPIndex(svc *kube.ServiceInfo) {
if len(svc.IPInfo.IPs) > 0 {
id.svcMut.Lock()
defer id.svcMut.Unlock()
id.access.Lock()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also check the service state inside the lock.

defer id.access.Unlock()
for _, ip := range svc.IPInfo.IPs {
id.svcByIP[ip] = svc
}
Expand All @@ -203,30 +209,28 @@ func (id *Database) UpdateNewServicesByIPIndex(svc *kube.ServiceInfo) {

func (id *Database) UpdateDeletedServicesByIPIndex(svc *kube.ServiceInfo) {
if len(svc.IPInfo.IPs) > 0 {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Check the service state inside the lock.

id.svcMut.Lock()
defer id.svcMut.Unlock()
id.access.Lock()
defer id.access.Unlock()
for _, ip := range svc.IPInfo.IPs {
delete(id.svcByIP, ip)
}
}
}

func (id *Database) ServiceInfoForIP(ip string) *kube.ServiceInfo {
id.svcMut.RLock()
defer id.svcMut.RUnlock()
id.access.RLock()
defer id.access.RUnlock()
return id.svcByIP[ip]
}

func (id *Database) HostNameForIP(ip string) string {
id.svcMut.RLock()
id.access.RLock()
defer id.access.RUnlock()
svc, ok := id.svcByIP[ip]
id.svcMut.RUnlock()
if ok {
return svc.Name
}
id.podsMut.RLock()
pod, ok := id.podsByIP[ip]
id.podsMut.RUnlock()
if ok {
return pod.Name
}
Expand Down
Loading