[BPF] detach cali programs from devices no longer in regex
It can happen that the device regexp changes and Calico programs then
remain attached to devices that we no longer observe. For instance, the
user has the default ethX in the regexp, but those devices are bonded
and bondX is not in the regexp. The user fixes the regexp to include
bondX only, but the programs stay attached to ethX and have to be
removed manually, or the node has to be restarted.

We figure out that the programs on devices no longer matched by the
regexp are ours when those programs have a jump map pinned in the
calico bpffs hierarchy (a minimal sketch of this check follows below).

This is not really a problem for workload devices, as they usually
disappear and the programs get detached with them.
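
Below is a minimal, self-contained sketch of that ownership check, using only the Go standard library. The pin-path layout (/sys/fs/bpf/tc/<iface>_<hook>/jump_map) and the helper name hasCalicoPinnedJumpMap are assumptions for illustration only; the real layout and detection logic (e.g. bpf.ListCalicoAttached, used in ensureStarted below) live in the felix/bpf packages.

package main

import (
	"fmt"
	"os"
	"path/filepath"
)

// hasCalicoPinnedJumpMap reports whether a jump map for the given interface and
// hook is pinned under the Calico bpffs hierarchy. If it is, the program attached
// to that interface is treated as Calico's own and may be detached even though
// the interface no longer matches the configured regexp.
func hasCalicoPinnedJumpMap(bpffsRoot, iface, hook string) bool {
	// Hypothetical pin path; the real code derives it from Calico's map naming
	// scheme under the bpffs mount.
	pin := filepath.Join(bpffsRoot, "tc", fmt.Sprintf("%s_%s", iface, hook), "jump_map")
	_, err := os.Stat(pin)
	return err == nil
}

func main() {
	for _, hook := range []string{"ingress", "egress"} {
		if hasCalicoPinnedJumpMap("/sys/fs/bpf", "eth0", hook) {
			fmt.Printf("eth0 %s program looks like ours; schedule detach\n", hook)
		}
	}
}

In the change itself, ensureStarted() records such previously attached programs in initAttaches, and CompleteDeferredWork() / onInterfaceUpdate() detach them once the interface is known to no longer be a data, workload, or L3 interface.
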
tomastigera committed Nov 18, 2022
1 parent a3dee7d commit d00a4f1
Showing 3 changed files with 195 additions and 35 deletions.
63 changes: 37 additions & 26 deletions felix/bpf/tc/attach.go
@@ -187,30 +187,8 @@ func (ap *AttachPoint) AttachProgram() (int, error) {
}
logCxt.Info("Program attached to TC.")

var progErrs []error
for _, p := range progsToClean {
log.WithField("prog", p).Debug("Cleaning up old calico program")
attemptCleanup := func() error {
_, err := ExecTC("filter", "del", "dev", ap.Iface, string(ap.Hook), "pref", p.pref, "handle", p.handle, "bpf")
return err
}
err = attemptCleanup()
if errors.Is(err, ErrInterrupted) {
// This happens if the interface is deleted in the middle of calling tc.
log.Debug("First cleanup hit 'Dump was interrupted', retrying (once).")
err = attemptCleanup()
}
if errors.Is(err, ErrDeviceNotFound) {
continue
}
if err != nil {
log.WithError(err).WithField("prog", p).Warn("Failed to clean up old calico program.")
progErrs = append(progErrs, err)
}
}

if len(progErrs) != 0 {
return -1, fmt.Errorf("failed to clean up one or more old calico programs: %v", progErrs)
if err := ap.detachPrograms(progsToClean); err != nil {
return -1, err
}

// Store information of object in a json file so in future we can skip reattaching it.
@@ -240,8 +218,41 @@ func (ap *AttachPoint) patchLogPrefix(logCtx *log.Entry, ifile, ofile string) er
}

func (ap *AttachPoint) DetachProgram() error {
// We never detach TC programs, so this should not be called.
ap.Log().Panic("DetachProgram is not implemented for TC")
progsToClean, err := ap.listAttachedPrograms()
if err != nil {
return err
}

return ap.detachPrograms(progsToClean)
}

func (ap *AttachPoint) detachPrograms(progsToClean []attachedProg) error {
var progErrs []error
for _, p := range progsToClean {
log.WithField("prog", p).Debug("Cleaning up old calico program")
attemptCleanup := func() error {
_, err := ExecTC("filter", "del", "dev", ap.Iface, string(ap.Hook), "pref", p.pref, "handle", p.handle, "bpf")
return err
}
err := attemptCleanup()
if errors.Is(err, ErrInterrupted) {
// This happens if the interface is deleted in the middle of calling tc.
log.Debug("First cleanup hit 'Dump was interrupted', retrying (once).")
err = attemptCleanup()
}
if errors.Is(err, ErrDeviceNotFound) {
continue
}
if err != nil {
log.WithError(err).WithField("prog", p).Warn("Failed to clean up old calico program.")
progErrs = append(progErrs, err)
}
}

if len(progErrs) != 0 {
return fmt.Errorf("failed to clean up one or more old calico programs: %v", progErrs)
}

return nil
}

78 changes: 75 additions & 3 deletions felix/dataplane/linux/bpf_ep_mgr.go
@@ -166,6 +166,9 @@ const (
)

type bpfEndpointManager struct {
initAttaches map[string]bpf.EPAttachInfo
initUnknownIfaces set.Set[string]

// Main store of information about interfaces; indexed on interface name.
ifacesLock sync.Mutex
nameToIface map[string]bpfInterface
@@ -284,6 +287,7 @@ func newBPFEndpointManager(
livenessCallback = func() {}
}
m := &bpfEndpointManager{
initUnknownIfaces: set.New[string](),
dp: dp,
allWEPs: map[proto.WorkloadEndpointID]*proto.WorkloadEndpoint{},
happyWEPs: map[proto.WorkloadEndpointID]*proto.WorkloadEndpoint{},
@@ -555,6 +559,38 @@ func (m *bpfEndpointManager) updateIfaceStateMap(name string, iface *bpfInterfac
}
}

func (m *bpfEndpointManager) cleanupOldAttach(iface string, ai bpf.EPAttachInfo) error {
if ai.XDPId != 0 {
ap := xdp.AttachPoint{
Iface: iface,
// Try all modes in this order
Modes: []bpf.XDPMode{bpf.XDPGeneric, bpf.XDPDriver, bpf.XDPOffload},
}

if err := m.dp.ensureNoProgram(&ap); err != nil {
return fmt.Errorf("xdp: %w", err)
}
}
if ai.TCId != 0 {
ap := tc.AttachPoint{
Iface: iface,
Hook: bpf.HookEgress,
}

if err := m.dp.ensureNoProgram(&ap); err != nil {
return fmt.Errorf("tc egress: %w", err)
}

ap.Hook = bpf.HookIngress

if err := m.dp.ensureNoProgram(&ap); err != nil {
return fmt.Errorf("tc ingress: %w", err)
}
}

return nil
}

func (m *bpfEndpointManager) onInterfaceUpdate(update *ifaceUpdate) {
log.Debugf("Interface update for %v, state %v", update.Name, update.State)
// Should be safe without the lock since there shouldn't be any active background threads
@@ -569,6 +605,18 @@ func (m *bpfEndpointManager) onInterfaceUpdate(update *ifaceUpdate) {
}

if !m.isDataIface(update.Name) && !m.isWorkloadIface(update.Name) && !m.isL3Iface(update.Name) {
if update.State == ifacemonitor.StateUp {
if ai, ok := m.initAttaches[update.Name]; ok {
if err := m.cleanupOldAttach(update.Name, ai); err != nil {
log.WithError(err).Warnf("Failed to detach old programs from now unused device '%s'", update.Name)
} else {
delete(m.initAttaches, update.Name)
}
}
}
if m.initUnknownIfaces != nil {
m.initUnknownIfaces.Add(update.Name)
}
log.WithField("update", update).Debug("Ignoring interface that's neither data nor workload nor L3.")
return
}
@@ -579,6 +627,7 @@ func (m *bpfEndpointManager) onInterfaceUpdate(update *ifaceUpdate) {
// For specific host endpoints OnHEPUpdate doesn't depend on iface state, and has
// already stored and mapped as needed.
if ifaceIsUp {
delete(m.initAttaches, update.Name)
// We require host interfaces to be in non-strict RPF mode so that
// packets can return straight to host for services bypassing CTLB.
switch update.Name {
Expand Down Expand Up @@ -752,7 +801,23 @@ func (m *bpfEndpointManager) markExistingWEPDirty(wlID proto.WorkloadEndpointID,

func (m *bpfEndpointManager) CompleteDeferredWork() error {
// Do one-off initialisation.
m.startupOnce.Do(m.dp.ensureStarted())
m.startupOnce.Do(func() {
m.dp.ensureStarted()

m.initUnknownIfaces.Iter(func(iface string) error {
if ai, ok := m.initAttaches[iface]; ok {
if err := m.cleanupOldAttach(iface, ai); err != nil {
log.WithError(err).Warnf("Failed to detach old programs from now unused device '%s'", iface)
} else {
delete(m.initAttaches, iface)
return set.RemoveItem
}
}
return nil
})

m.initUnknownIfaces = nil
})

m.applyProgramsToDirtyDataInterfaces()
m.updateWEPsInDataplane()
@@ -1681,6 +1746,13 @@ func (m *bpfEndpointManager) setRPFilter(iface string, val int) error {
func (m *bpfEndpointManager) ensureStarted() {
log.Info("Starting map cleanup runner.")
m.mapCleanupRunner.Start(context.Background())

var err error

m.initAttaches, err = bpf.ListCalicoAttached()
if err != nil {
log.WithError(err).Warn("Failed to list previously attached programs. We may not clean up some.")
}
}

func (m *bpfEndpointManager) ensureBPFDevices() error {
@@ -1840,11 +1912,11 @@ func (m *bpfEndpointManager) ensureNoProgram(ap attachPoint) error {
}
}

// Ensure interface does not have our XDP program attached in any mode.
// Ensure interface does not have our program attached.
err := ap.DetachProgram()

// Forget the policy debug info
m.removePolicyDebugInfo(ap.IfaceName(), 4, bpf.HookXDP)
m.removePolicyDebugInfo(ap.IfaceName(), 4, ap.HookName())

return err
}
89 changes: 83 additions & 6 deletions felix/dataplane/linux/bpf_ep_mgr_test.go
@@ -53,6 +53,8 @@ type mockDataplane struct {
fds map[string]uint32
state map[uint32]polprog.Rules
routes map[ip.V4CIDR]struct{}

ensureStartedFn func()
}

func newMockDataplane() *mockDataplane {
@@ -65,6 +67,9 @@ func newMockDataplane() *mockDataplane {
}

func (m *mockDataplane) ensureStarted() {
if m.ensureStartedFn != nil {
m.ensureStartedFn()
}
}

func (m *mockDataplane) ensureBPFDevices() error {
@@ -140,6 +145,12 @@ func (m *mockDataplane) setAndReturn(vari **polprog.Rules, key string) func() *p
}
}

func (m *mockDataplane) programAttached(key string) bool {
m.mutex.Lock()
defer m.mutex.Unlock()
return m.fds[key] != 0
}

func (m *mockDataplane) setRoute(cidr ip.V4CIDR) {
m.mutex.Lock()
defer m.mutex.Unlock()
@@ -168,7 +179,6 @@ var _ = Describe("BPF Endpoint Manager", func() {
fibLookupEnabled bool
endpointToHostAction string
dataIfacePattern string
l3IfacePattern string
workloadIfaceRegex string
ipSetIDAllocator *idalloc.IDAllocator
vxlanMTU int
@@ -183,8 +193,7 @@ var _ = Describe("BPF Endpoint Manager", func() {
BeforeEach(func() {
fibLookupEnabled = true
endpointToHostAction = "DROP"
dataIfacePattern = "^((en|wl|ww|sl|ib)[opsx].*|(eth|wlan|wwan).*|tunl0$|wireguard.cali$)"
l3IfacePattern = "^(wireguard.cali$)"
dataIfacePattern = "^eth0"
workloadIfaceRegex = "cali"
ipSetIDAllocator = idalloc.New()
vxlanMTU = 0
@@ -225,7 +234,6 @@ var _ = Describe("BPF Endpoint Manager", func() {
Hostname: "uthost",
BPFLogLevel: "info",
BPFDataIfacePattern: regexp.MustCompile(dataIfacePattern),
BPFL3IfacePattern: regexp.MustCompile(l3IfacePattern),
VXLANMTU: vxlanMTU,
VXLANPort: rrConfigNormal.VXLANPort,
BPFNodePortDSREnabled: nodePortDSR,
@@ -255,8 +263,8 @@ var _ = Describe("BPF Endpoint Manager", func() {
bpfEpMgr.OnUpdate(&ifaceUpdate{Name: name, State: state, Index: index})
err := bpfEpMgr.CompleteDeferredWork()
Expect(err).NotTo(HaveOccurred())
if state == ifacemonitor.StateUp {
Expect(ifStateMap.ContainsKey(ifstate.NewKey(uint32(index)).AsBytes())).To(BeTrue())
if state == ifacemonitor.StateUp && (bpfEpMgr.isDataIface(name) || bpfEpMgr.isWorkloadIface(name)) {
ExpectWithOffset(1, ifStateMap.ContainsKey(ifstate.NewKey(uint32(index)).AsBytes())).To(BeTrue())
}
}
}
@@ -415,6 +423,75 @@ var _ = Describe("BPF Endpoint Manager", func() {
genIfaceUpdate("eth0", ifacemonitor.StateUp, 10)()
})

It("should attach to eth0", func() {
Expect(dp.programAttached("eth0:ingress")).To(BeTrue())
Expect(dp.programAttached("eth0:egress")).To(BeTrue())
})

Context("with dataIfacePattern changed to eth1", func() {
JustBeforeEach(func() {
dataIfacePattern = "^eth1"
newBpfEpMgr()

dp.ensureStartedFn = func() {
bpfEpMgr.initAttaches = map[string]bpf.EPAttachInfo{
"eth0": {TCId: 12345},
}
}

})

It("should detach from eth0 when eth0 up before first CompleteDeferredWork()", func() {
Expect(dp.programAttached("eth0:ingress")).To(BeTrue())
Expect(dp.programAttached("eth0:egress")).To(BeTrue())

genIfaceUpdate("eth0", ifacemonitor.StateUp, 10)()
genIfaceUpdate("eth1", ifacemonitor.StateUp, 11)()

err := bpfEpMgr.CompleteDeferredWork()
Expect(err).NotTo(HaveOccurred())

// We inherited dp from the previous bpfEpMgr and it has eth0
// attached. This should clean it up.
Expect(dp.programAttached("eth0:ingress")).To(BeFalse())
Expect(dp.programAttached("eth0:egress")).To(BeFalse())

Expect(dp.programAttached("eth1:ingress")).To(BeTrue())
Expect(dp.programAttached("eth1:egress")).To(BeTrue())
})

It("should detach from eth0 when eth0 up after first CompleteDeferredWork()", func() {
Expect(dp.programAttached("eth0:ingress")).To(BeTrue())
Expect(dp.programAttached("eth0:egress")).To(BeTrue())

genIfaceUpdate("eth1", ifacemonitor.StateUp, 11)()

err := bpfEpMgr.CompleteDeferredWork()
Expect(err).NotTo(HaveOccurred())

// We inherited dp from the previous bpfEpMgr and it has eth0
// attached. We should see it.
Expect(dp.programAttached("eth0:ingress")).To(BeTrue())
Expect(dp.programAttached("eth0:egress")).To(BeTrue())

Expect(dp.programAttached("eth1:ingress")).To(BeTrue())
Expect(dp.programAttached("eth1:egress")).To(BeTrue())

genIfaceUpdate("eth0", ifacemonitor.StateUp, 10)()

err = bpfEpMgr.CompleteDeferredWork()
Expect(err).NotTo(HaveOccurred())

// We inherited dp from the previous bpfEpMgr and it has eth0
// attached. This should clean it up.
Expect(dp.programAttached("eth0:ingress")).To(BeFalse())
Expect(dp.programAttached("eth0:egress")).To(BeFalse())

Expect(dp.programAttached("eth1:ingress")).To(BeTrue())
Expect(dp.programAttached("eth1:egress")).To(BeTrue())
})
})

Context("with eth0 host endpoint", func() {
JustBeforeEach(genHEPUpdate("eth0", hostEp))

