Skip to content

Commit

Permalink
NETOBSERV-1805: make sure to cleanup pod veth's hooks when pods is de…
Browse files Browse the repository at this point in the history
…leted

Today the eBPF agent only cleans up a pod's hooks when the pod restarts,
but in this issue pods were continuously recreated and their hooks
were never deleted, causing the fd leak.

Signed-off-by: Mohamed Mahmoud <[email protected]>
  • Loading branch information
msherif1234 committed Aug 27, 2024
1 parent eedc708 commit 45c749d
Show file tree
Hide file tree
Showing 5 changed files with 145 additions and 25 deletions.
43 changes: 29 additions & 14 deletions pkg/agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ func configureInformer(cfg *Config, log *logrus.Entry) ifaces.Informer {

}

func interfaceListener(ctx context.Context, ifaceEvents <-chan ifaces.Event, slog *logrus.Entry, eventAdded func(iface ifaces.Interface)) {
func interfaceListener(ctx context.Context, ifaceEvents <-chan ifaces.Event, slog *logrus.Entry, processEvent func(iface ifaces.Interface, add bool)) {
for {
select {
case <-ctx.Done():
Expand All @@ -85,10 +85,9 @@ func interfaceListener(ctx context.Context, ifaceEvents <-chan ifaces.Event, slo
slog.WithField("event", event).Debug("received event")
switch event.Type {
case ifaces.EventAdded:
eventAdded(event.Interface)
processEvent(event.Interface, true)
case ifaces.EventDeleted:
// qdiscs, ingress and egress filters are automatically deleted so we don't need to
// specifically detach them from the ebpfFetcher
processEvent(event.Interface, false)
default:
slog.WithField("event", event).Warn("unknown event type")
}
Expand Down Expand Up @@ -125,7 +124,9 @@ type Flows struct {
type ebpfFlowFetcher interface {
io.Closer
Register(iface ifaces.Interface) error
UnRegister(iface ifaces.Interface) error
AttachTCX(iface ifaces.Interface) error
DetachTCX(iface ifaces.Interface) error

LookupAndDeleteMap(*metrics.Metrics) map[ebpf.BpfFlowId][]ebpf.BpfFlowMetrics
DeleteMapsStaleEntries(timeOut time.Duration)
Expand Down Expand Up @@ -434,7 +435,7 @@ func (f *Flows) Status() Status {

// interfacesManager uses an informer to check new/deleted network interfaces. For each running
// interface, it registers a flow ebpfFetcher that will forward new flows to the returned channel
// TODO: consider move this method and "onInterfaceAdded" to another type
// TODO: consider move this method and "onInterfaceEvent" to another type
func (f *Flows) interfacesManager(ctx context.Context) error {
slog := alog.WithField("function", "interfacesManager")

Expand All @@ -444,7 +445,7 @@ func (f *Flows) interfacesManager(ctx context.Context) error {
return fmt.Errorf("instantiating interfaces' informer: %w", err)
}

go interfaceListener(ctx, ifaceEvents, slog, f.onInterfaceAdded)
go interfaceListener(ctx, ifaceEvents, slog, f.onInterfaceEvent)

return nil
}
Expand Down Expand Up @@ -500,7 +501,7 @@ func (f *Flows) buildAndStartPipeline(ctx context.Context) (*node.Terminal[[]*fl
return export, nil
}

func (f *Flows) onInterfaceAdded(iface ifaces.Interface) {
func (f *Flows) onInterfaceEvent(iface ifaces.Interface, add bool) {
// ignore interfaces that do not match the user configuration acceptance/exclusion lists
allowed, err := f.filter.Allowed(iface.Name)
if err != nil {
Expand All @@ -512,14 +513,28 @@ func (f *Flows) onInterfaceAdded(iface ifaces.Interface) {
Debug("interface does not match the allow/exclusion filters. Ignoring")
return
}
alog.WithField("interface", iface).Info("interface detected. trying to attach TCX hook")
if err := f.ebpf.AttachTCX(iface); err != nil {
alog.WithField("interface", iface).WithError(err).
Info("can't attach to TCx hook flow ebpfFetcher. fall back to use legacy TC hook")
if err := f.ebpf.Register(iface); err != nil {
if add {
alog.WithField("interface", iface).Info("interface detected. trying to attach TCX hook")
if err := f.ebpf.AttachTCX(iface); err != nil {
alog.WithField("interface", iface).WithError(err).
Warn("can't register flow ebpfFetcher. Ignoring")
return
Info("can't attach to TCx hook flow ebpfFetcher. fall back to use legacy TC hook")
if err := f.ebpf.Register(iface); err != nil {
alog.WithField("interface", iface).WithError(err).
Warn("can't register flow ebpfFetcher. Ignoring")
return
}
}
} else {
alog.WithField("interface", iface).Info("interface deleted. trying to detach TCX hook")
if err := f.ebpf.DetachTCX(iface); err != nil {
alog.WithField("interface", iface).WithError(err).
Info("can't detach from TCx hook flow ebpfFetcher. fall back to use legacy TC hook")
if err := f.ebpf.UnRegister(iface); err != nil {
alog.WithField("interface", iface).WithError(err).
Warn("can't unregister flow ebpfFetcher. Ignoring")
return
}
}

}
}
32 changes: 24 additions & 8 deletions pkg/agent/packets_agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,9 @@ type Packets struct {
type ebpfPacketFetcher interface {
io.Closer
Register(iface ifaces.Interface) error
UnRegister(iface ifaces.Interface) error
AttachTCX(iface ifaces.Interface) error
DetachTCX(iface ifaces.Interface) error
LookupAndDeleteMap(*metrics.Metrics) map[int][]*byte
ReadPerf() (perf.Record, error)
}
Expand Down Expand Up @@ -267,7 +269,7 @@ func (p *Packets) buildAndStartPipeline(ctx context.Context) (*node.Terminal[[]*
return export, nil
}

func (p *Packets) onInterfaceAdded(iface ifaces.Interface) {
func (p *Packets) onInterfaceAdded(iface ifaces.Interface, add bool) {
// ignore interfaces that do not match the user configuration acceptance/exclusion lists
allowed, err := p.filter.Allowed(iface.Name)
if err != nil {
Expand All @@ -279,14 +281,28 @@ func (p *Packets) onInterfaceAdded(iface ifaces.Interface) {
Debug("[PCA]interface does not match the allow/exclusion filters. Ignoring")
return
}
plog.WithField("interface", iface).Info("interface detected. trying to attach TCX hook")
if err := p.ebpf.AttachTCX(iface); err != nil {
plog.WithField("[PCA]interface", iface).WithError(err).
Info("can't attach to TCx hook packet ebpfFetcher. fall back to use legacy TC hook")
if err := p.ebpf.Register(iface); err != nil {
if add {
plog.WithField("interface", iface).Info("interface detected. trying to attach TCX hook")
if err := p.ebpf.AttachTCX(iface); err != nil {
plog.WithField("[PCA]interface", iface).WithError(err).
Info("can't attach to TCx hook packet ebpfFetcher. fall back to use legacy TC hook")
if err := p.ebpf.Register(iface); err != nil {
plog.WithField("[PCA]interface", iface).WithError(err).
Warn("can't register packet ebpfFetcher. Ignoring")
return
}
}
} else {
plog.WithField("interface", iface).Info("interface deleted. trying to detach TCX hook")
if err := p.ebpf.DetachTCX(iface); err != nil {
plog.WithField("[PCA]interface", iface).WithError(err).
Warn("can't register packet ebpfFetcher. Ignoring")
return
Info("can't detach from TCx hook packet ebpfFetcher. check if there is any legacy TC hook")
if err := p.ebpf.UnRegister(iface); err != nil {
plog.WithField("[PCA]interface", iface).WithError(err).
Warn("can't unregister packet ebpfFetcher. Ignoring")
return
}
}

}
}
71 changes: 68 additions & 3 deletions pkg/ebpf/tracer.go
Original file line number Diff line number Diff line change
Expand Up @@ -279,6 +279,34 @@ func (m *FlowFetcher) AttachTCX(iface ifaces.Interface) error {
return nil
}

// DetachTCX closes the TCX links that were recorded for iface, for every
// direction (egress and/or ingress) enabled on this fetcher.
//
// Both directions are always attempted: a failure (or a missing link) on the
// egress side must not leave the ingress link open, otherwise its file
// descriptor would leak. Links that were closed successfully are removed from
// the bookkeeping maps so they are neither closed twice nor kept around after
// the interface is gone.
//
// It returns a non-nil error when, for any enabled direction, no link is
// recorded for iface or closing the link fails. When both directions fail the
// two errors are wrapped together.
func (m *FlowFetcher) DetachTCX(iface ifaces.Interface) error {
	ilog := log.WithField("iface", iface)

	var egressErr, ingressErr error

	if m.enableEgress {
		if l := m.egressTCXLink[iface]; l != nil {
			if err := l.Close(); err != nil {
				egressErr = fmt.Errorf("TCX: failed to close egress link: %w", err)
			} else {
				// Forget the closed link so the map does not grow with interface churn.
				delete(m.egressTCXLink, iface)
				ilog.WithField("interface", iface.Name).Debug("successfully detach egressTCX hook")
			}
		} else {
			egressErr = fmt.Errorf("egress link does not have a TCX egress hook")
		}
	}

	if m.enableIngress {
		if l := m.ingressTCXLink[iface]; l != nil {
			if err := l.Close(); err != nil {
				ingressErr = fmt.Errorf("TCX: failed to close ingress link: %w", err)
			} else {
				// Forget the closed link so the map does not grow with interface churn.
				delete(m.ingressTCXLink, iface)
				ilog.WithField("interface", iface.Name).Debug("successfully detach ingressTCX hook")
			}
		} else {
			ingressErr = fmt.Errorf("ingress link does not have a TCX ingress hook")
		}
	}

	switch {
	case egressErr != nil && ingressErr != nil:
		return fmt.Errorf("%w; %w", egressErr, ingressErr)
	case egressErr != nil:
		return egressErr
	default:
		return ingressErr
	}
}

func removeTCFilters(ifName string, tcDir uint32) error {
link, err := netlink.LinkByName(ifName)
if err != nil {
Expand All @@ -299,7 +327,7 @@ func removeTCFilters(ifName string, tcDir uint32) error {
return kerrors.NewAggregate(errs)
}

func (m *FlowFetcher) removePreviousFilters(iface ifaces.Interface) error {
func unregister(iface ifaces.Interface) error {
ilog := log.WithField("iface", iface)
ilog.Debugf("looking for previously installed TC filters on %s", iface.Name)
links, err := netlink.LinkList()
Expand Down Expand Up @@ -357,6 +385,12 @@ func (m *FlowFetcher) removePreviousFilters(iface ifaces.Interface) error {
return nil
}

// UnRegister delegates to the package-level unregister helper for iface and
// returns its result unchanged. It does not touch any state held by the
// FlowFetcher itself.
func (m *FlowFetcher) UnRegister(iface ifaces.Interface) error {
// qdiscs, ingress and egress filters are automatically deleted so we don't need to
// specifically detach them from the ebpfFetcher
// NOTE(review): despite the note above, unregister is still invoked here; presumably it
// clears previously installed TC filters when the link still exists — confirm.
return unregister(iface)
}

// Register links the eBPF fetcher into the system. The program should invoke UnRegister
// before exiting.
func (m *FlowFetcher) Register(iface ifaces.Interface) error {
Expand Down Expand Up @@ -394,7 +428,7 @@ func (m *FlowFetcher) Register(iface ifaces.Interface) error {
m.qdiscs[iface] = qdisc

// Remove previously installed filters
if err := m.removePreviousFilters(iface); err != nil {
if err := unregister(iface); err != nil {
return fmt.Errorf("failed to remove previous filters: %w", err)
}

Expand Down Expand Up @@ -917,8 +951,13 @@ func registerInterface(iface ifaces.Interface) (*netlink.GenericQdisc, netlink.L
return qdisc, ipvlan, nil
}

func (p *PacketFetcher) Register(iface ifaces.Interface) error {
// UnRegister delegates to the package-level unregister helper for iface and
// returns its result unchanged. It does not touch any state held by the
// PacketFetcher itself.
func (p *PacketFetcher) UnRegister(iface ifaces.Interface) error {
// qdiscs, ingress and egress filters are automatically deleted so we don't need to
// specifically detach them from the ebpfFetcher
// NOTE(review): despite the note above, unregister is still invoked here; presumably it
// clears previously installed TC filters when the link still exists — confirm.
return unregister(iface)
}

func (p *PacketFetcher) Register(iface ifaces.Interface) error {
qdisc, ipvlan, err := registerInterface(iface)
if err != nil {
return err
Expand All @@ -931,6 +970,32 @@ func (p *PacketFetcher) Register(iface ifaces.Interface) error {
return p.registerIngress(iface, ipvlan)
}

// DetachTCX closes the TCX links that were recorded for iface, for every
// direction (egress and/or ingress) enabled on this packet fetcher.
//
// Both directions are always attempted: a failure (or a missing link) on the
// egress side must not leave the ingress link open, otherwise its file
// descriptor would leak. Links that were closed successfully are removed from
// the bookkeeping maps so they are neither closed twice nor kept around after
// the interface is gone.
//
// It returns a non-nil error when, for any enabled direction, no link is
// recorded for iface or closing the link fails. When both directions fail the
// two errors are wrapped together.
func (p *PacketFetcher) DetachTCX(iface ifaces.Interface) error {
	ilog := log.WithField("iface", iface)

	var egressErr, ingressErr error

	if p.enableEgress {
		if l := p.egressTCXLink[iface]; l != nil {
			if err := l.Close(); err != nil {
				egressErr = fmt.Errorf("TCX: failed to close egress link: %w", err)
			} else {
				// Forget the closed link so the map does not grow with interface churn.
				delete(p.egressTCXLink, iface)
				ilog.WithField("interface", iface.Name).Debug("successfully detach egressTCX hook")
			}
		} else {
			egressErr = fmt.Errorf("egress link does not support TCX hook")
		}
	}

	if p.enableIngress {
		if l := p.ingressTCXLink[iface]; l != nil {
			if err := l.Close(); err != nil {
				ingressErr = fmt.Errorf("TCX: failed to close ingress link: %w", err)
			} else {
				// Forget the closed link so the map does not grow with interface churn.
				delete(p.ingressTCXLink, iface)
				ilog.WithField("interface", iface.Name).Debug("successfully detach ingressTCX hook")
			}
		} else {
			ingressErr = fmt.Errorf("ingress link does not support TCX hook")
		}
	}

	switch {
	case egressErr != nil && ingressErr != nil:
		return fmt.Errorf("%w; %w", egressErr, ingressErr)
	case egressErr != nil:
		return egressErr
	default:
		return ingressErr
	}
}

func (p *PacketFetcher) AttachTCX(iface ifaces.Interface) error {
ilog := log.WithField("iface", iface)
if iface.NetNS != netns.None() {
Expand Down
15 changes: 15 additions & 0 deletions pkg/ifaces/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,21 @@ func (w *Watcher) sendUpdates(ctx context.Context, ns string, out chan Event) {
if _, ok := w.current[iface]; ok {
delete(w.current, iface)
out <- Event{Type: EventDeleted, Interface: iface}
handle, err := netlink.NewHandleAt(iface.NetNS)
if err != nil {
log.WithError(err).Errorf("failed to create handle for netns (%s): %v", iface.NetNS.String(), err)
}
links, err := handle.LinkList()
if err != nil {
log.WithError(err).Errorf("failed to list netlink devices: %v", err)
}
handle.Delete()
if len(links) == 1 && links[0].Attrs().Name == "lo" {
log.Debugf("close netns %s", iface.NetNS.String())
iface.NetNS.Close()
netns.DeleteNamed(iface.NetNS.String())

Check failure on line 154 in pkg/ifaces/watcher.go

View workflow job for this annotation

GitHub Actions / test (1.21)

Error return value of `netns.DeleteNamed` is not checked (errcheck)

Check failure on line 154 in pkg/ifaces/watcher.go

View workflow job for this annotation

GitHub Actions / test (1.22)

Error return value of `netns.DeleteNamed` is not checked (errcheck)
close(doneChan)
}
}
}
w.mutex.Unlock()
Expand Down
9 changes: 9 additions & 0 deletions pkg/test/tracer_fake.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,19 @@ func (m *TracerFake) Register(iface ifaces.Interface) error {
return nil
}

// UnRegister removes iface from the set of interfaces tracked by the fake
// tracer and always returns nil.
//
// The previous body inserted iface into m.interfaces, which made the fake
// report an interface as present after it had been unregistered.
// NOTE(review): assumes Register is what records interfaces in m.interfaces —
// confirm against its body.
func (m *TracerFake) UnRegister(iface ifaces.Interface) error {
	delete(m.interfaces, iface)
	return nil
}

// AttachTCX is a no-op in the fake tracer: it ignores the given interface and
// always returns nil.
func (m *TracerFake) AttachTCX(_ ifaces.Interface) error {
return nil
}

// DetachTCX is a no-op in the fake tracer: it ignores the given interface and
// always returns nil.
func (m *TracerFake) DetachTCX(_ ifaces.Interface) error {
return nil
}

func (m *TracerFake) LookupAndDeleteMap(_ *metrics.Metrics) map[ebpf.BpfFlowId][]ebpf.BpfFlowMetrics {
select {
case r := <-m.mapLookups:
Expand Down

0 comments on commit 45c749d

Please sign in to comment.