Skip to content

Commit

Permalink
coordinator: ensure detect gateway and ip conflict in pod's netns
Browse files Browse the repository at this point in the history
The coordinator plugin uses errgroup to concurrently check the reachability of gateways and whether IP addresses conflict. However, when launching goroutines inside [netns.Do](https://pkg.go.dev/github.com/containernetworking/plugins/pkg/ns), the Go runtime cannot guarantee that the code will be executed in the specified network namespace. Therefore, we modified the `Go()` method of errgroup: it manually switches to the target network namespace when launching a goroutine and returns to the original network namespace after execution.
  • Loading branch information
cyclinder committed Dec 20, 2023
1 parent 3b3a4fb commit 17a5909
Show file tree
Hide file tree
Showing 339 changed files with 1,287 additions and 2,604 deletions.
26 changes: 15 additions & 11 deletions cmd/coordinator/cmd/command_add.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,16 @@
package cmd

import (
"context"
"fmt"
"time"

"github.com/containernetworking/cni/pkg/skel"
"github.com/containernetworking/cni/pkg/types"
current "github.com/containernetworking/cni/pkg/types/100"
"github.com/containernetworking/plugins/pkg/ns"
"github.com/spidernet-io/spiderpool/pkg/errgroup"
"github.com/vishvananda/netlink"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
"time"

"github.com/spidernet-io/spiderpool/api/v1/agent/client/daemonset"
"github.com/spidernet-io/spiderpool/api/v1/agent/models"
Expand Down Expand Up @@ -110,6 +110,12 @@ func CmdAdd(args *skel.CmdArgs) (err error) {
}
defer c.netns.Close()

c.hostNs, err = ns.GetCurrentNS()
if err != nil {
return fmt.Errorf("failed to get current netns: %v", err)
}
logger.Sugar().Debugf("Get current host netns: %v", c.hostNs.Path())

// check if it's first time invoke
err = c.coordinatorModeAndFirstInvoke(logger, conf.PodDefaultCniNic)
if err != nil {
Expand Down Expand Up @@ -148,9 +154,7 @@ func CmdAdd(args *skel.CmdArgs) (err error) {

logger.Sugar().Infof("Get coordinator config: %+v", c)

errg, ctx := errgroup.WithContext(context.Background())
defer ctx.Done()

errg := errgroup.Group{}
// we do detect gateway connection firstly
if conf.DetectGateway != nil && *conf.DetectGateway {
logger.Debug("Try to detect gateway")
Expand All @@ -175,26 +179,26 @@ func CmdAdd(args *skel.CmdArgs) (err error) {
if err != nil {
return fmt.Errorf("failed to run NewPinger: %v", err)
}
errg.Go(p.DetectGateway)
errg.Go(c.hostNs, c.netns, p.DetectGateway)
}
} else {
logger.Debug("disable detect gateway")
}

if conf.IPConflict != nil && *conf.IPConflict {
logger.Debug("Try to detect ip conflict")
ipc, err := ipchecking.NewIPChecker(conf.DetectOptions.Retry, conf.DetectOptions.Interval, conf.DetectOptions.TimeOut, c.netns, logger)
ipc, err := ipchecking.NewIPChecker(conf.DetectOptions.Retry, conf.DetectOptions.Interval, conf.DetectOptions.TimeOut, c.hostNs, c.netns, logger)
if err != nil {
return fmt.Errorf("failed to run NewIPChecker: %w", err)
}
ipc.DoIPConflictChecking(prevResult.IPs, c.currentInterface, errg)
ipc.DoIPConflictChecking(prevResult.IPs, c.currentInterface, &errg)
} else {
logger.Debug("disable detect ip conflict")
}

if err = errg.Wait(); err != nil {
logger.Error("failed to ip checking", zap.Error(err))
return fmt.Errorf("failed to ip checking: %w", err)
logger.Error("failed to detect gateway and ip checking", zap.Error(err))
return err
}

// overwrite mac address
Expand Down
2 changes: 1 addition & 1 deletion cmd/coordinator/cmd/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ type coordinator struct {
tuneMode Mode
hostVethName, podVethName, currentInterface string
HijackCIDR, podNics []string
netns ns.NetNS
netns, hostNs ns.NetNS
hostVethHwAddress, podVethHwAddress net.HardwareAddr
currentAddress []netlink.Addr
hostIPRouteForPod []net.IP
Expand Down
12 changes: 6 additions & 6 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,9 @@ require (
go.uber.org/atomic v1.10.0
go.uber.org/multierr v1.11.0
go.uber.org/zap v1.25.0
golang.org/x/net v0.13.0
golang.org/x/sync v0.3.0
golang.org/x/sys v0.11.0
golang.org/x/net v0.17.0
golang.org/x/sync v0.3.0 // indirect
golang.org/x/sys v0.14.0
golang.org/x/tools v0.11.0
gopkg.in/natefinch/lumberjack.v2 v2.2.1
gopkg.in/yaml.v2 v2.4.0
Expand Down Expand Up @@ -142,12 +142,12 @@ require (
github.com/vishvananda/netns v0.0.4 // indirect
go.mongodb.org/mongo-driver v1.11.3 // indirect
go.opentelemetry.io/otel/trace v1.16.0 // indirect
golang.org/x/crypto v0.11.0 // indirect
golang.org/x/crypto v0.14.0 // indirect
golang.org/x/exp v0.0.0-20220722155223-a9213eeb770e // indirect
golang.org/x/mod v0.12.0 // indirect
golang.org/x/oauth2 v0.8.0 // indirect
golang.org/x/term v0.10.0 // indirect
golang.org/x/text v0.11.0 // indirect
golang.org/x/term v0.13.0 // indirect
golang.org/x/text v0.13.0 // indirect
golang.org/x/time v0.3.0 // indirect
gomodules.xyz/jsonpatch/v2 v2.4.0 // indirect
google.golang.org/appengine v1.6.7 // indirect
Expand Down
20 changes: 10 additions & 10 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -597,8 +597,8 @@ golang.org/x/crypto v0.0.0-20210421170649-83a5a9bb288b/go.mod h1:T9bdIzuCu7OtxOm
golang.org/x/crypto v0.0.0-20210817164053-32db794688a5/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc=
golang.org/x/crypto v0.0.0-20211108221036-ceb1ce70b4fa/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc=
golang.org/x/crypto v0.0.0-20220622213112-05595931fe9d/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4=
golang.org/x/crypto v0.11.0 h1:6Ewdq3tDic1mg5xRO4milcWCfMVQhI4NkqWWvqejpuA=
golang.org/x/crypto v0.11.0/go.mod h1:xgJhtzW8F9jGdVFWZESrid1U1bjeNy4zgy5cRr/CIio=
golang.org/x/crypto v0.14.0 h1:wBqGXzWJW6m1XrIKlAH0Hs1JJ7+9KBwnIO8v66Q9cHc=
golang.org/x/crypto v0.14.0/go.mod h1:MVFd36DqK4CsrnJYDkBA3VC4m2GkXAM0PvzMCn4JQf4=
golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
golang.org/x/exp v0.0.0-20190306152737-a1d7652674e8/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
golang.org/x/exp v0.0.0-20190510132918-efd6b22b2522/go.mod h1:ZjyILWgesfNpC6sMxTJOJm9Kp84zZh5NQWvqDGG3Qr8=
Expand Down Expand Up @@ -679,8 +679,8 @@ golang.org/x/net v0.0.0-20210421230115-4e50805a0758/go.mod h1:72T/g9IO56b78aLF+1
golang.org/x/net v0.0.0-20210428140749-89ef3d95e781/go.mod h1:OJAsFXCWl8Ukc7SiCT/9KSuxbyM7479/AVlXFRxuMCk=
golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
golang.org/x/net v0.0.0-20211209124913-491a49abca63/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
golang.org/x/net v0.13.0 h1:Nvo8UFsZ8X3BhAC9699Z1j7XQ3rsZnUUm7jfBEk1ueY=
golang.org/x/net v0.13.0/go.mod h1:zEVYFnQC7m/vmpQFELhcD1EWkZlX69l4oqgmer6hfKA=
golang.org/x/net v0.17.0 h1:pVaXccu2ozPjCXewfr1S7xza/zcXTity9cCdXQYSjIM=
golang.org/x/net v0.17.0/go.mod h1:NxSsAGuq816PNPmqtQdLE42eU2Fs7NoRIZrHJAlaCOE=
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
Expand Down Expand Up @@ -771,12 +771,12 @@ golang.org/x/sys v0.0.0-20220209214540-3681064d5158/go.mod h1:oPkhp1MJrh7nUepCBc
golang.org/x/sys v0.0.0-20220908164124-27713097b956/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.9.1-0.20230616193735-e0c3b6e6ae3b/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.11.0 h1:eG7RXZHdqOJ1i+0lgLgCpSXAp6M3LYlAo6osgSi0xOM=
golang.org/x/sys v0.11.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.14.0 h1:Vz7Qs629MkJkGyHxUlRHizWJRG2j8fbQKjELVSNhy7Q=
golang.org/x/sys v0.14.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/term v0.0.0-20210615171337-6886f2dfbf5b/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
golang.org/x/term v0.10.0 h1:3R7pNqamzBraeqj/Tj8qt1aQ2HpmlC+Cx/qL/7hn4/c=
golang.org/x/term v0.10.0/go.mod h1:lpqdcUyK/oCiQxvxVrppt5ggO2KCZ5QblwqPnfZ6d5o=
golang.org/x/term v0.13.0 h1:bb+I9cTfFazGW51MZqBVmZy7+JEJMouUHTUSKVQLBek=
golang.org/x/term v0.13.0/go.mod h1:LTmsnFJwVN6bCy1rVCoS+qHT1HhALEFxKncY3WNNh4U=
golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.1-0.20180807135948-17ff2d5776d2/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
Expand All @@ -786,8 +786,8 @@ golang.org/x/text v0.3.4/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.3.5/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ=
golang.org/x/text v0.11.0 h1:LAntKIrcmeSKERyiOh0XMV39LXS8IE9UL2yP7+f5ij4=
golang.org/x/text v0.11.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE=
golang.org/x/text v0.13.0 h1:ablQoSUd0tRdKxZewP80B+BaqeKJuVhuRxj/dkrun3k=
golang.org/x/text v0.13.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE=
golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/time v0.0.0-20191024005414-555d28b269f0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
Expand Down
10 changes: 10 additions & 0 deletions pkg/errgroup/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
# errgroup

## Background

This package originates from "golang.org/x/sync/errgroup". The coordinator plugin uses errgroup to concurrently check the reachability of gateways and whether IP addresses conflict.
However, when goroutines are launched inside [netns.Do](https://pkg.go.dev/github.com/containernetworking/plugins/pkg/ns), the Go runtime cannot guarantee that their code will be executed in the specified
network namespace. Therefore, we modified the `Go()` method of errgroup: it manually switches to the target network namespace when launching a goroutine and returns to the original network
namespace after execution.

Please see `errgroup.go` to find more details.
111 changes: 111 additions & 0 deletions pkg/errgroup/errgroup.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
// Copyright 2023 Authors of spidernet-io
// SPDX-License-Identifier: Apache-2.0

// Copyright 2016 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

// Package errgroup provides synchronization, error propagation, and Context
// cancellation for groups of goroutines working on subtasks of a common task.

package errgroup

import (
"fmt"
"runtime"
"sync"

"github.com/containernetworking/plugins/pkg/ns"
)

type (
	// token is the zero-size value sent through Group.sem to occupy one
	// concurrency slot.
	token struct{}

	// Group tracks a set of goroutines that work on subtasks of one overall
	// task and remembers the first error any of them reports.
	//
	// The zero value is ready to use: with sem == nil there is no limit on
	// the number of active goroutines, and with cancel == nil nothing is
	// cancelled when an error occurs.
	Group struct {
		// cancel, when non-nil, is invoked with the recorded error.
		cancel func(error)

		// wg counts the goroutines that have been started and not yet
		// finished.
		wg sync.WaitGroup

		// sem, when non-nil, holds one token per running goroutine.
		sem chan token

		// errOnce ensures that only the first error is stored in err.
		errOnce sync.Once
		err     error
	}
)

// done marks one goroutine of the group as finished: it gives back the
// goroutine's token when a semaphore channel is set and then decrements the
// WaitGroup.
func (g *Group) done() {
	// Deferred so that the WaitGroup is decremented only after the token
	// (if any) has been taken back out of sem.
	defer g.wg.Done()

	if g.sem == nil {
		return
	}
	<-g.sem
}

// Wait blocks until every function started through Go has returned and then
// reports the first non-nil error recorded by the group, or nil when there
// was none. When the group has a cancel function, it is called with that
// error before Wait returns.
func (g *Group) Wait() error {
	g.wg.Wait()

	if g.cancel == nil {
		return g.err
	}
	g.cancel(g.err)
	return g.err
}

// Go runs f in a new goroutine whose OS thread has been moved into targetNs,
// and moves that thread back to srcNs once f has returned.
//
// It blocks until the new goroutine can be added without the number of
// active goroutines in the group exceeding the configured limit.
//
// The first non-nil error, whether it comes from switching namespaces or from
// f itself, is recorded and returned by Wait; if the group has a cancel
// function it is invoked with that error.
//
// If the switch to targetNs fails, f is NOT executed: running it would
// silently perform the work in the wrong network namespace.
//
// UPDATED: in Go each OS thread can have a different network namespace, and
// Go's thread scheduling is highly variable, so callers cannot guarantee any
// specific namespace is set unless operations that require that namespace
// are wrapped with Do(). A goroutine started from inside Do() may be
// scheduled on another thread, which is why the switch is done here, inside
// the goroutine, with the thread locked.
// See https://github.com/golang/go/wiki/LockOSThread and
// https://www.weave.works/blog/linux-namespaces-golang-followup
// for more details.
func (g *Group) Go(srcNs, targetNs ns.NetNS, f func() error) {
	if g.sem != nil {
		g.sem <- token{}
	}

	// fail stores the first error seen by the group and triggers cancel.
	fail := func(err error) {
		g.errOnce.Do(func() {
			g.err = err
			if g.cancel != nil {
				g.cancel(g.err)
			}
		})
	}

	g.wg.Add(1)
	go func() {
		defer g.done()

		// Pin the goroutine to its thread so the namespace set below stays
		// in effect for everything f does.
		runtime.LockOSThread()

		// switch to pod's netns
		if err := targetNs.Set(); err != nil {
			fail(fmt.Errorf("failed to switch to pod's netns: %w", err))
			// Do not run f outside the requested namespace. The thread is
			// intentionally left locked: when this goroutine exits the
			// runtime terminates the thread instead of reusing it.
			return
		}

		defer func() {
			// switch back
			if err := srcNs.Set(); err == nil {
				// Unlock the current thread only when we successfully switched back
				// to the original namespace; otherwise leave the thread locked which
				// will force the runtime to scrap the current thread, that is maybe
				// not as optimal but at least always safe to do.
				runtime.UnlockOSThread()
			}
		}()

		if err := f(); err != nil {
			fail(err)
		}
	}()
}
48 changes: 34 additions & 14 deletions pkg/networking/ipchecking/ipchecking.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,30 +9,31 @@ import (
"fmt"
"net"
"net/netip"
"runtime"
"time"

types100 "github.com/containernetworking/cni/pkg/types/100"
"github.com/containernetworking/plugins/pkg/ns"
"github.com/mdlayher/arp"
"github.com/mdlayher/ethernet"
"github.com/mdlayher/ndp"
"github.com/spidernet-io/spiderpool/pkg/errgroup"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
)

type IPChecker struct {
retries int
interval time.Duration
timeout time.Duration
netns ns.NetNS
ip4, ip6 netip.Addr
ifi *net.Interface
arpClient *arp.Client
ndpClient *ndp.Conn
logger *zap.Logger
retries int
interval time.Duration
timeout time.Duration
netns, hostNs ns.NetNS
ip4, ip6 netip.Addr
ifi *net.Interface
arpClient *arp.Client
ndpClient *ndp.Conn
logger *zap.Logger
}

func NewIPChecker(retries int, interval, timeout string, netns ns.NetNS, logger *zap.Logger) (*IPChecker, error) {
func NewIPChecker(retries int, interval, timeout string, hostNs, netns ns.NetNS, logger *zap.Logger) (*IPChecker, error) {
var err error

ipc := new(IPChecker)
Expand All @@ -51,6 +52,7 @@ func NewIPChecker(retries int, interval, timeout string, netns ns.NetNS, logger
return nil, err
}

ipc.hostNs = hostNs
ipc.netns = netns
ipc.logger = logger
return ipc, nil
Expand Down Expand Up @@ -79,15 +81,15 @@ func (ipc *IPChecker) DoIPConflictChecking(ipconfigs []*types100.IPConfig, iface
if err != nil {
return fmt.Errorf("failed to init arp client: %w", err)
}
errg.Go(ipc.ipCheckingByARP)
errg.Go(ipc.hostNs, ipc.netns, ipc.ipCheckingByARP)
} else {
ipc.logger.Debug("IPCheckingByNDP", zap.String("ipv6 address", target.String()))
ipc.ip6 = target
ipc.ndpClient, _, err = ndp.Listen(ipc.ifi, ndp.LinkLocal)
if err != nil {
return fmt.Errorf("failed to init ndp client: %w", err)
}
errg.Go(ipc.ipCheckingByNDP)
errg.Go(ipc.hostNs, ipc.netns, ipc.ipCheckingByNDP)
}
}
return nil
Expand All @@ -98,12 +100,30 @@ func (ipc *IPChecker) ipCheckingByARP() error {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

var err error
defer ipc.arpClient.Close()

var conflictingMac string
var err error
// start a goroutine to receive arp response
go func() {
runtime.LockOSThread()

// switch to pod's netns
if e := ipc.netns.Set(); e != nil {
ipc.logger.Warn("Detect IP Conflict: failed to switch to pod's net namespace")
}

defer func() {
err := ipc.hostNs.Set() // switch back
if err == nil {
// Unlock the current thread only when we successfully switched back
// to the original namespace; otherwise leave the thread locked which
// will force the runtime to scrap the current thread, that is maybe
// not as optimal but at least always safe to do.
runtime.UnlockOSThread()
}
}()

var packet *arp.Packet
for {
select {
Expand Down
Loading

0 comments on commit 17a5909

Please sign in to comment.