Skip to content

Commit

Permalink
NETOBSERV-1805: make sure to cleanup pod veth's hooks when pods is de…
Browse files Browse the repository at this point in the history
…leted (netobserv#399)

Today ebpf agent only cleanup pods when the pod restarts
but in this issue pods where continosly recreated but
it was never deleted causing the fd leak

Signed-off-by: Mohamed Mahmoud <[email protected]>
  • Loading branch information
msherif1234 authored and jotak committed Sep 9, 2024
1 parent ec5f257 commit f02429f
Show file tree
Hide file tree
Showing 5 changed files with 162 additions and 25 deletions.
43 changes: 29 additions & 14 deletions pkg/agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ func configureInformer(cfg *Config, log *logrus.Entry) ifaces.Informer {

}

func interfaceListener(ctx context.Context, ifaceEvents <-chan ifaces.Event, slog *logrus.Entry, eventAdded func(iface ifaces.Interface)) {
func interfaceListener(ctx context.Context, ifaceEvents <-chan ifaces.Event, slog *logrus.Entry, processEvent func(iface ifaces.Interface, add bool)) {
for {
select {
case <-ctx.Done():
Expand All @@ -85,10 +85,9 @@ func interfaceListener(ctx context.Context, ifaceEvents <-chan ifaces.Event, slo
slog.WithField("event", event).Debug("received event")
switch event.Type {
case ifaces.EventAdded:
eventAdded(event.Interface)
processEvent(event.Interface, true)
case ifaces.EventDeleted:
// qdiscs, ingress and egress filters are automatically deleted so we don't need to
// specifically detach them from the ebpfFetcher
processEvent(event.Interface, false)
default:
slog.WithField("event", event).Warn("unknown event type")
}
Expand Down Expand Up @@ -125,7 +124,9 @@ type Flows struct {
type ebpfFlowFetcher interface {
io.Closer
Register(iface ifaces.Interface) error
UnRegister(iface ifaces.Interface) error
AttachTCX(iface ifaces.Interface) error
DetachTCX(iface ifaces.Interface) error

LookupAndDeleteMap(*metrics.Metrics) map[ebpf.BpfFlowId][]ebpf.BpfFlowMetrics
DeleteMapsStaleEntries(timeOut time.Duration)
Expand Down Expand Up @@ -432,7 +433,7 @@ func (f *Flows) Status() Status {

// interfacesManager uses an informer to check new/deleted network interfaces. For each running
// interface, it registers a flow ebpfFetcher that will forward new flows to the returned channel
// TODO: consider move this method and "onInterfaceAdded" to another type
// TODO: consider move this method and "onInterfaceEvent" to another type
func (f *Flows) interfacesManager(ctx context.Context) error {
slog := alog.WithField("function", "interfacesManager")

Expand All @@ -442,7 +443,7 @@ func (f *Flows) interfacesManager(ctx context.Context) error {
return fmt.Errorf("instantiating interfaces' informer: %w", err)
}

go interfaceListener(ctx, ifaceEvents, slog, f.onInterfaceAdded)
go interfaceListener(ctx, ifaceEvents, slog, f.onInterfaceEvent)

return nil
}
Expand Down Expand Up @@ -498,7 +499,7 @@ func (f *Flows) buildAndStartPipeline(ctx context.Context) (*node.Terminal[[]*fl
return export, nil
}

func (f *Flows) onInterfaceAdded(iface ifaces.Interface) {
func (f *Flows) onInterfaceEvent(iface ifaces.Interface, add bool) {
// ignore interfaces that do not match the user configuration acceptance/exclusion lists
allowed, err := f.filter.Allowed(iface.Name)
if err != nil {
Expand All @@ -510,14 +511,28 @@ func (f *Flows) onInterfaceAdded(iface ifaces.Interface) {
Debug("interface does not match the allow/exclusion filters. Ignoring")
return
}
alog.WithField("interface", iface).Info("interface detected. trying to attach TCX hook")
if err := f.ebpf.AttachTCX(iface); err != nil {
alog.WithField("interface", iface).WithError(err).
Info("can't attach to TCx hook flow ebpfFetcher. fall back to use legacy TC hook")
if err := f.ebpf.Register(iface); err != nil {
if add {
alog.WithField("interface", iface).Info("interface detected. trying to attach TCX hook")
if err := f.ebpf.AttachTCX(iface); err != nil {
alog.WithField("interface", iface).WithError(err).
Warn("can't register flow ebpfFetcher. Ignoring")
return
Info("can't attach to TCx hook flow ebpfFetcher. fall back to use legacy TC hook")
if err := f.ebpf.Register(iface); err != nil {
alog.WithField("interface", iface).WithError(err).
Warn("can't register flow ebpfFetcher. Ignoring")
return
}
}
} else {
alog.WithField("interface", iface).Info("interface deleted. trying to detach TCX hook")
if err := f.ebpf.DetachTCX(iface); err != nil {
alog.WithField("interface", iface).WithError(err).
Info("can't detach from TCx hook flow ebpfFetcher. fall back to use legacy TC hook")
if err := f.ebpf.UnRegister(iface); err != nil {
alog.WithField("interface", iface).WithError(err).
Warn("can't unregister flow ebpfFetcher. Ignoring")
return
}
}

}
}
32 changes: 24 additions & 8 deletions pkg/agent/packets_agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,9 @@ type Packets struct {
type ebpfPacketFetcher interface {
io.Closer
Register(iface ifaces.Interface) error
UnRegister(iface ifaces.Interface) error
AttachTCX(iface ifaces.Interface) error
DetachTCX(iface ifaces.Interface) error
LookupAndDeleteMap(*metrics.Metrics) map[int][]*byte
ReadPerf() (perf.Record, error)
}
Expand Down Expand Up @@ -234,7 +236,7 @@ func (p *Packets) buildAndStartPipeline(ctx context.Context) (*node.Terminal[[]*
return export, nil
}

func (p *Packets) onInterfaceAdded(iface ifaces.Interface) {
func (p *Packets) onInterfaceAdded(iface ifaces.Interface, add bool) {
// ignore interfaces that do not match the user configuration acceptance/exclusion lists
allowed, err := p.filter.Allowed(iface.Name)
if err != nil {
Expand All @@ -246,14 +248,28 @@ func (p *Packets) onInterfaceAdded(iface ifaces.Interface) {
Debug("[PCA]interface does not match the allow/exclusion filters. Ignoring")
return
}
plog.WithField("interface", iface).Info("interface detected. trying to attach TCX hook")
if err := p.ebpf.AttachTCX(iface); err != nil {
plog.WithField("[PCA]interface", iface).WithError(err).
Info("can't attach to TCx hook packet ebpfFetcher. fall back to use legacy TC hook")
if err := p.ebpf.Register(iface); err != nil {
if add {
plog.WithField("interface", iface).Info("interface detected. trying to attach TCX hook")
if err := p.ebpf.AttachTCX(iface); err != nil {
plog.WithField("[PCA]interface", iface).WithError(err).
Info("can't attach to TCx hook packet ebpfFetcher. fall back to use legacy TC hook")
if err := p.ebpf.Register(iface); err != nil {
plog.WithField("[PCA]interface", iface).WithError(err).
Warn("can't register packet ebpfFetcher. Ignoring")
return
}
}
} else {
plog.WithField("interface", iface).Info("interface deleted. trying to detach TCX hook")
if err := p.ebpf.DetachTCX(iface); err != nil {
plog.WithField("[PCA]interface", iface).WithError(err).
Warn("can't register packet ebpfFetcher. Ignoring")
return
Info("can't detach from TCx hook packet ebpfFetcher. check if there is any legacy TC hook")
if err := p.ebpf.UnRegister(iface); err != nil {
plog.WithField("[PCA]interface", iface).WithError(err).
Warn("can't unregister packet ebpfFetcher. Ignoring")
return
}
}

}
}
100 changes: 97 additions & 3 deletions pkg/ebpf/tracer.go
Original file line number Diff line number Diff line change
Expand Up @@ -265,6 +265,48 @@ func (m *FlowFetcher) AttachTCX(iface ifaces.Interface) error {
return nil
}

func (m *FlowFetcher) DetachTCX(iface ifaces.Interface) error {
ilog := log.WithField("iface", iface)
if iface.NetNS != netns.None() {
originalNs, err := netns.Get()
if err != nil {
return fmt.Errorf("failed to get current netns: %w", err)
}
defer func() {
if err := netns.Set(originalNs); err != nil {
ilog.WithError(err).Error("failed to set netns back")
}
originalNs.Close()
}()
if err := unix.Setns(int(iface.NetNS), unix.CLONE_NEWNET); err != nil {
return fmt.Errorf("failed to setns to %s: %w", iface.NetNS, err)
}
}
if m.enableEgress {
if l := m.egressTCXLink[iface]; l != nil {
if err := l.Close(); err != nil {
return fmt.Errorf("TCX: failed to close egress link: %w", err)
}
ilog.WithField("interface", iface.Name).Debug("successfully detach egressTCX hook")
} else {
return fmt.Errorf("egress link does not have a TCX egress hook")
}
}

if m.enableIngress {
if l := m.ingressTCXLink[iface]; l != nil {
if err := l.Close(); err != nil {
return fmt.Errorf("TCX: failed to close ingress link: %w", err)
}
ilog.WithField("interface", iface.Name).Debug("successfully detach ingressTCX hook")
} else {
return fmt.Errorf("ingress link does not have a TCX ingress hook")
}
}

return nil
}

func removeTCFilters(ifName string, tcDir uint32) error {
link, err := netlink.LinkByName(ifName)
if err != nil {
Expand All @@ -285,7 +327,7 @@ func removeTCFilters(ifName string, tcDir uint32) error {
return kerrors.NewAggregate(errs)
}

func (m *FlowFetcher) removePreviousFilters(iface ifaces.Interface) error {
func unregister(iface ifaces.Interface) error {
ilog := log.WithField("iface", iface)
ilog.Debugf("looking for previously installed TC filters on %s", iface.Name)
links, err := netlink.LinkList()
Expand Down Expand Up @@ -343,6 +385,12 @@ func (m *FlowFetcher) removePreviousFilters(iface ifaces.Interface) error {
return nil
}

func (m *FlowFetcher) UnRegister(iface ifaces.Interface) error {
// qdiscs, ingress and egress filters are automatically deleted so we don't need to
// specifically detach them from the ebpfFetcher
return unregister(iface)
}

// Register and links the eBPF fetcher into the system. The program should invoke Unregister
// before exiting.
func (m *FlowFetcher) Register(iface ifaces.Interface) error {
Expand Down Expand Up @@ -380,7 +428,7 @@ func (m *FlowFetcher) Register(iface ifaces.Interface) error {
m.qdiscs[iface] = qdisc

// Remove previously installed filters
if err := m.removePreviousFilters(iface); err != nil {
if err := unregister(iface); err != nil {
return fmt.Errorf("failed to remove previous filters: %w", err)
}

Expand Down Expand Up @@ -912,8 +960,13 @@ func registerInterface(iface ifaces.Interface) (*netlink.GenericQdisc, netlink.L
return qdisc, ipvlan, nil
}

func (p *PacketFetcher) Register(iface ifaces.Interface) error {
func (p *PacketFetcher) UnRegister(iface ifaces.Interface) error {
// qdiscs, ingress and egress filters are automatically deleted so we don't need to
// specifically detach them from the ebpfFetcher
return unregister(iface)
}

func (p *PacketFetcher) Register(iface ifaces.Interface) error {
qdisc, ipvlan, err := registerInterface(iface)
if err != nil {
return err
Expand All @@ -926,6 +979,47 @@ func (p *PacketFetcher) Register(iface ifaces.Interface) error {
return p.registerIngress(iface, ipvlan)
}

func (p *PacketFetcher) DetachTCX(iface ifaces.Interface) error {
ilog := log.WithField("iface", iface)
if iface.NetNS != netns.None() {
originalNs, err := netns.Get()
if err != nil {
return fmt.Errorf("PCA failed to get current netns: %w", err)
}
defer func() {
if err := netns.Set(originalNs); err != nil {
ilog.WithError(err).Error("PCA failed to set netns back")
}
originalNs.Close()
}()
if err := unix.Setns(int(iface.NetNS), unix.CLONE_NEWNET); err != nil {
return fmt.Errorf("PCA failed to setns to %s: %w", iface.NetNS, err)
}
}
if p.enableEgress {
if l := p.egressTCXLink[iface]; l != nil {
if err := l.Close(); err != nil {
return fmt.Errorf("TCX: failed to close egress link: %w", err)
}
ilog.WithField("interface", iface.Name).Debug("successfully detach egressTCX hook")
} else {
return fmt.Errorf("egress link does not support TCX hook")
}
}

if p.enableIngress {
if l := p.ingressTCXLink[iface]; l != nil {
if err := l.Close(); err != nil {
return fmt.Errorf("TCX: failed to close ingress link: %w", err)
}
ilog.WithField("interface", iface.Name).Debug("successfully detach ingressTCX hook")
} else {
return fmt.Errorf("ingress link does not support TCX hook")
}
}
return nil
}

func (p *PacketFetcher) AttachTCX(iface ifaces.Interface) error {
ilog := log.WithField("iface", iface)
if iface.NetNS != netns.None() {
Expand Down
3 changes: 3 additions & 0 deletions pkg/ifaces/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,9 @@ func (w *Watcher) sendUpdates(ctx context.Context, ns string, out chan Event) {
"netnsHandle": netnsHandle.String(),
"error": err,
}).Debug("linkSubscribe failed retry")
if err := netnsHandle.Close(); err != nil {
log.WithError(err).Warn("netnsHandle close failed")
}
return false, nil
}

Expand Down
9 changes: 9 additions & 0 deletions pkg/test/tracer_fake.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,19 @@ func (m *TracerFake) Register(iface ifaces.Interface) error {
return nil
}

func (m *TracerFake) UnRegister(iface ifaces.Interface) error {
m.interfaces[iface] = struct{}{}
return nil
}

func (m *TracerFake) AttachTCX(_ ifaces.Interface) error {
return nil
}

func (m *TracerFake) DetachTCX(_ ifaces.Interface) error {
return nil
}

func (m *TracerFake) LookupAndDeleteMap(_ *metrics.Metrics) map[ebpf.BpfFlowId][]ebpf.BpfFlowMetrics {
select {
case r := <-m.mapLookups:
Expand Down

0 comments on commit f02429f

Please sign in to comment.