Skip to content

Commit

Permalink
Merge pull request #322 from Pavani-Panakanti/strict_mode_annotate_po…
Browse files Browse the repository at this point in the history
…d_ip

Fix race condition in strict mode and annotate_pod_ip=true
  • Loading branch information
orsenthil authored Oct 24, 2024
2 parents 34104a3 + 1da6bfb commit b0942ec
Show file tree
Hide file tree
Showing 4 changed files with 35 additions and 18 deletions.
6 changes: 2 additions & 4 deletions controllers/policyendpoints_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -321,10 +321,7 @@ func (r *PolicyEndpointsReconciler) configureeBPFProbes(ctx context.Context, pod
continue
}

// Check if an eBPF probe is already attached on both ingress and egress direction(s) for this pod.
// If yes, then skip probe attach flow for this pod and update the relevant map entries.
isIngressProbeAttached, isEgressProbeAttached := r.ebpfClient.IsEBPFProbeAttached(pod.Name, pod.Namespace)
err = r.ebpfClient.AttacheBPFProbes(pod, podIdentifier, !isIngressProbeAttached, !isEgressProbeAttached)
err = r.ebpfClient.AttacheBPFProbes(pod, podIdentifier)
if err != nil {
r.log.Info("Attaching eBPF probe failed for", "pod", pod.Name, "namespace", pod.Namespace)
return err
Expand Down Expand Up @@ -621,6 +618,7 @@ func (r *PolicyEndpointsReconciler) getPodListToBeCleanedUp(oldPodSet []types.Na
r.log.Info("Pod not active. Deleting from progPod caches", "podName: ", oldPod.Name, "podNamespace: ", oldPod.Namespace)
r.ebpfClient.DeletePodFromIngressProgPodCaches(oldPod.Name, oldPod.Namespace)
r.ebpfClient.DeletePodFromEgressProgPodCaches(oldPod.Name, oldPod.Namespace)
r.ebpfClient.DeletePodFromAttachProbesToPodLock(oldPod.Name, oldPod.Namespace)
}
}

Expand Down
35 changes: 29 additions & 6 deletions pkg/ebpf/bpf_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ func prometheusRegister() {
}

type BpfClient interface {
AttacheBPFProbes(pod types.NamespacedName, policyEndpoint string, ingress bool, egress bool) error
AttacheBPFProbes(pod types.NamespacedName, policyEndpoint string) error
DetacheBPFProbes(pod types.NamespacedName, ingress bool, egress bool, deletePinPath bool) error
UpdateEbpfMaps(podIdentifier string, ingressFirewallRules []EbpfFirewallRules, egressFirewallRules []EbpfFirewallRules) error
IsEBPFProbeAttached(podName string, podNamespace string) (bool, bool)
Expand All @@ -95,6 +95,7 @@ type BpfClient interface {
GetEgressProgToPodsMap() *sync.Map
DeletePodFromIngressProgPodCaches(podName string, podNamespace string)
DeletePodFromEgressProgPodCaches(podName string, podNamespace string)
DeletePodFromAttachProbesToPodLock(podName string, podNamespace string)
}

type EvProgram struct {
Expand Down Expand Up @@ -126,6 +127,7 @@ func NewBpfClient(policyEndpointeBPFContext *sync.Map, nodeIP string, enablePoli
GlobalMaps: new(sync.Map),
IngressProgToPodsMap: new(sync.Map),
EgressProgToPodsMap: new(sync.Map),
AttachProbesToPodLock: new(sync.Map),
}
ebpfClient.logger = ctrl.Log.WithName("ebpf-client")
ingressBinary, egressBinary, eventsBinary,
Expand Down Expand Up @@ -291,6 +293,8 @@ type bpfClient struct {
IngressProgToPodsMap *sync.Map
// Stores the Egress eBPF Prog FD to pods mapping
EgressProgToPodsMap *sync.Map
// Stores pod to attachprobes lock mapping
AttachProbesToPodLock *sync.Map
}

type Event_t struct {
Expand Down Expand Up @@ -438,15 +442,28 @@ func (l *bpfClient) GetEgressProgToPodsMap() *sync.Map {
return l.EgressProgToPodsMap
}

func (l *bpfClient) AttacheBPFProbes(pod types.NamespacedName, podIdentifier string, ingress bool, egress bool) error {
func (l *bpfClient) AttacheBPFProbes(pod types.NamespacedName, podIdentifier string) error {
podNamespacedName := utils.GetPodNamespacedName(pod.Name, pod.Namespace)
// In strict mode, two goroutines can try to attach the probes to the same pod at the same time.
// Holding a per-pod lock ensures that all the data structures are updated consistently.
value, _ := l.AttachProbesToPodLock.LoadOrStore(podNamespacedName, &sync.Mutex{})
attachProbesLock := value.(*sync.Mutex)
attachProbesLock.Lock()
l.logger.Info("Got the attachProbesLock for", "pod", pod.Name, " in namespace", pod.Namespace)
defer attachProbesLock.Unlock()

// Check if an eBPF probe is already attached on both ingress and egress direction(s) for this pod.
// If yes, then skip probe attach flow for this pod and update the relevant map entries.
isIngressProbeAttached, isEgressProbeAttached := l.IsEBPFProbeAttached(pod.Name, pod.Namespace)

start := time.Now()
// We attach the TC probes to the hostVeth interface of the pod. Derive the hostVeth
// name from the Name and Namespace of the Pod.
// Note: The below naming convention is tied to VPC CNI and isn't meant to be generic
hostVethName := utils.GetHostVethName(pod.Name, pod.Namespace)
l.logger.Info("AttacheBPFProbes for", "pod", pod.Name, " in namespace", pod.Namespace, " with hostVethName", hostVethName)

if ingress {
if !isIngressProbeAttached {
progFD, err := l.attachIngressBPFProbe(hostVethName, podIdentifier)
duration := msSince(start)
sdkAPILatency.WithLabelValues("attachIngressBPFProbe", fmt.Sprint(err != nil)).Observe(duration)
Expand All @@ -456,13 +473,12 @@ func (l *bpfClient) AttacheBPFProbes(pod types.NamespacedName, podIdentifier str
return err
}
l.logger.Info("Successfully attached Ingress TC probe for", "pod: ", pod.Name, " in namespace", pod.Namespace)
podNamespacedName := utils.GetPodNamespacedName(pod.Name, pod.Namespace)
l.IngressPodToProgMap.Store(podNamespacedName, progFD)
currentPodSet, _ := l.IngressProgToPodsMap.LoadOrStore(progFD, make(map[string]struct{}))
currentPodSet.(map[string]struct{})[podNamespacedName] = struct{}{}
}

if egress {
if !isEgressProbeAttached {
progFD, err := l.attachEgressBPFProbe(hostVethName, podIdentifier)
duration := msSince(start)
sdkAPILatency.WithLabelValues("attachEgressBPFProbe", fmt.Sprint(err != nil)).Observe(duration)
Expand All @@ -472,7 +488,6 @@ func (l *bpfClient) AttacheBPFProbes(pod types.NamespacedName, podIdentifier str
return err
}
l.logger.Info("Successfully attached Egress TC probe for", "pod: ", pod.Name, " in namespace", pod.Namespace)
podNamespacedName := utils.GetPodNamespacedName(pod.Name, pod.Namespace)
l.EgressPodToProgMap.Store(podNamespacedName, progFD)
currentPodSet, _ := l.EgressProgToPodsMap.LoadOrStore(progFD, make(map[string]struct{}))
currentPodSet.(map[string]struct{})[podNamespacedName] = struct{}{}
Expand Down Expand Up @@ -529,6 +544,7 @@ func (l *bpfClient) DetacheBPFProbes(pod types.NamespacedName, ingress bool, egr
}
l.DeletePodFromEgressProgPodCaches(pod.Name, pod.Namespace)
}
l.DeletePodFromAttachProbesToPodLock(pod.Name, pod.Namespace)
return nil
}

Expand Down Expand Up @@ -1027,3 +1043,10 @@ func (l *bpfClient) DeletePodFromEgressProgPodCaches(podName string, podNamespac
}
}
}

// DeletePodFromAttachProbesToPodLock removes the entry for the given pod from
// the AttachProbesToPodLock map. The entry is keyed by the namespaced name
// derived from podName and podNamespace. Calling it for a pod that has no
// entry is a no-op.
//
// NOTE(review): if another goroutine still holds or is waiting on the mutex
// stored under this key, a later LoadOrStore for the same pod would create a
// fresh mutex — confirm callers only invoke this once the pod is no longer
// being processed.
func (l *bpfClient) DeletePodFromAttachProbesToPodLock(podName string, podNamespace string) {
	// sync.Map.Delete already does nothing when the key is absent, so a
	// separate Load beforehand is unnecessary and only adds a second lookup.
	l.AttachProbesToPodLock.Delete(utils.GetPodNamespacedName(podName, podNamespace))
}
10 changes: 3 additions & 7 deletions pkg/ebpf/bpf_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -569,23 +569,17 @@ func TestBpfClient_AttacheBPFProbes(t *testing.T) {
name string
testPod types.NamespacedName
podIdentifier string
ingress bool
egress bool
wantErr error
}{
{
name: "Ingress and Egress Attach - Existing probes",
testPod: testPod,
podIdentifier: utils.GetPodIdentifier(testPod.Name, testPod.Namespace, logr.New(&log.NullLogSink{})),
ingress: true,
egress: true,
wantErr: nil,
},
{
name: "Ingress and Egress Attach - New probes",
testPod: testPod,
ingress: true,
egress: true,
wantErr: nil,
},
}
Expand All @@ -611,6 +605,7 @@ func TestBpfClient_AttacheBPFProbes(t *testing.T) {
EgressPodToProgMap: new(sync.Map),
IngressProgToPodsMap: new(sync.Map),
EgressProgToPodsMap: new(sync.Map),
AttachProbesToPodLock: new(sync.Map),
}

sampleBPFContext := BPFContext{
Expand All @@ -620,7 +615,7 @@ func TestBpfClient_AttacheBPFProbes(t *testing.T) {
testBpfClient.policyEndpointeBPFContext.Store(tt.podIdentifier, sampleBPFContext)

t.Run(tt.name, func(t *testing.T) {
gotError := testBpfClient.AttacheBPFProbes(tt.testPod, tt.podIdentifier, tt.ingress, tt.egress)
gotError := testBpfClient.AttacheBPFProbes(tt.testPod, tt.podIdentifier)
assert.Equal(t, tt.wantErr, gotError)
})
}
Expand Down Expand Up @@ -665,6 +660,7 @@ func TestBpfClient_DetacheBPFProbes(t *testing.T) {
bpfTCClient: mockTCClient,
IngressPodToProgMap: new(sync.Map),
EgressPodToProgMap: new(sync.Map),
AttachProbesToPodLock: new(sync.Map),
}

t.Run(tt.name, func(t *testing.T) {
Expand Down
2 changes: 1 addition & 1 deletion pkg/rpc/rpc_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ func (s *server) EnforceNpToPod(ctx context.Context, in *rpc.EnforceNpRequest) (
podIdentifier := utils.GetPodIdentifier(in.K8S_POD_NAME, in.K8S_POD_NAMESPACE, s.log)
isMapUpdateRequired := s.policyReconciler.GeteBPFClient().IsMapUpdateRequired(podIdentifier)
err = s.policyReconciler.GeteBPFClient().AttacheBPFProbes(types.NamespacedName{Name: in.K8S_POD_NAME, Namespace: in.K8S_POD_NAMESPACE},
podIdentifier, true, true)
podIdentifier)
if err != nil {
s.log.Error(err, "Attaching eBPF probe failed for", "pod", in.K8S_POD_NAME, "namespace", in.K8S_POD_NAMESPACE)
return nil, err
Expand Down

0 comments on commit b0942ec

Please sign in to comment.