From 880a28ad3172478e268d1867b6be2105b492ac50 Mon Sep 17 00:00:00 2001 From: njucjc Date: Mon, 17 Apr 2023 17:16:41 +0800 Subject: [PATCH] Feature: use ipset for policy route --- .github/workflows/build.yml | 14 +- Dockerfile | 2 +- go.mod | 4 +- go.sum | 4 + pkg/networkengine/routedriver/vxlan/utils.go | 150 +++++++++++ pkg/networkengine/routedriver/vxlan/vxlan.go | 238 ++++++++++++------ .../routedriver/vxlan/vxlan_test.go | 36 +++ pkg/networkengine/util/ipset/ipset.go | 139 ++++++++++ pkg/networkengine/util/iptables/constants.go | 27 ++ pkg/networkengine/util/iptables/iptables.go | 115 +++++++++ pkg/networkengine/util/utils.go | 62 +++-- pkg/networkengine/util/utils_test.go | 19 +- 12 files changed, 703 insertions(+), 107 deletions(-) create mode 100644 pkg/networkengine/util/ipset/ipset.go create mode 100644 pkg/networkengine/util/iptables/constants.go create mode 100644 pkg/networkengine/util/iptables/iptables.go diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index ab5f10b..9fa0b33 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -40,7 +40,11 @@ jobs: ${{ runner.os }}-buildx- - name: Unit test - run: sudo go test -v -short ./pkg/... ./cmd/... -coverprofile cover.out + run: | + sudo apt-get update -y + sudo apt-get install -y ipset + sudo apt-get install -y iptables + sudo go test -v -short ./pkg/... ./cmd/... -coverprofile cover.out - name: Publish Unit Test Coverage uses: codecov/codecov-action@v3 @@ -100,10 +104,14 @@ jobs: path: /tmp/.buildx-cache key: ${{ runner.os }}-buildx-${{ github.sha }} restore-keys: | - ${{ runner.os }}-buildx- + ${{ runner.os }}-buildx- - name: Unit test - run: sudo go test -v -short ./pkg/... ./cmd/... + run: | + sudo apt-get update -y + sudo apt-get install -y ipset + sudo apt-get install -y iptables + sudo go test -v -short ./pkg/... ./cmd/... - name: Docker meta id: meta diff --git a/Dockerfile b/Dockerfile index 31a233f..d1b327e 100644 --- a/Dockerfile +++ b/Dockerfile @@ -22,7 +22,7 @@ RUN CGO_ENABLED=0 GOOS=${TARGETOS} GOARCH=${TARGETARCH} GO111MODULE=on go build FROM alpine:3.17 RUN sed -i 's/dl-cdn.alpinelinux.org/mirrors.aliyun.com/g' /etc/apk/repositories \ - && apk add openrc libreswan libreswan-openrc iptables python3 bash --no-cache \ + && apk add openrc libreswan libreswan-openrc ipset iptables python3 bash --no-cache \ && sed -i 's/runscript/openrc-run/g' /etc/init.d/ipsec \ && sed -i 's/#logfile=/logfile=/g' /etc/ipsec.conf \ && mkdir -p /run/openrc \ diff --git a/go.mod b/go.mod index b57fd9b..9820f11 100644 --- a/go.mod +++ b/go.mod @@ -4,6 +4,8 @@ go 1.18 require ( github.com/EvilSuperstars/go-cidrman v0.0.0-20190607145828-28e79e32899a + github.com/coreos/go-iptables v0.6.0 + github.com/gonetx/ipset v0.1.0 github.com/openyurtio/openyurt v1.2.1-0.20230320014349-7cc573e1d097 github.com/pkg/errors v0.9.1 github.com/spf13/cobra v1.7.0 @@ -11,6 +13,7 @@ require ( github.com/stretchr/testify v1.8.2 github.com/vdobler/ht v5.3.0+incompatible github.com/vishvananda/netlink v1.2.1-beta.2 + golang.org/x/sys v0.6.0 golang.zx2c4.com/wireguard/wgctrl v0.0.0-20220504211119-3d4a969bb56b k8s.io/apimachinery v0.23.2 k8s.io/apiserver v0.23.2 @@ -87,7 +90,6 @@ require ( golang.org/x/net v0.7.0 // indirect golang.org/x/oauth2 v0.5.0 // indirect golang.org/x/sync v0.1.0 // indirect - golang.org/x/sys v0.6.0 // indirect golang.org/x/term v0.5.0 // indirect golang.org/x/text v0.7.0 // indirect golang.org/x/time v0.3.0 // indirect diff --git a/go.sum b/go.sum index d3d0498..89bedce 100644 --- a/go.sum +++ b/go.sum @@ -105,6 +105,8 @@ github.com/cockroachdb/errors v1.2.4/go.mod h1:rQD95gz6FARkaKkQXUksEje/d9a6wBJoC github.com/cockroachdb/logtags v0.0.0-20190617123548-eb05cc24525f/go.mod h1:i/u985jwjWRlyHXQbwatDASoW0RMlZ/3i9yJHE2xLkI= github.com/coreos/bbolt v1.3.2/go.mod h1:iRUV2dpdMOn7Bo10OQBFzIJO9kkE559Wcmn+qkEiiKk= github.com/coreos/etcd v3.3.13+incompatible/go.mod h1:uF7uidLiAD3TWHmW31ZFd/JWoc32PjwdhPthX9715RE= +github.com/coreos/go-iptables v0.6.0 h1:is9qnZMPYjLd8LYqmm/qlE+wwEgJIkTYdhV3rfZo4jk= +github.com/coreos/go-iptables v0.6.0/go.mod h1:Qe8Bv2Xik5FyTXwgIbLAnv2sWSBmvWdFETJConOQ//Q= github.com/coreos/go-oidc v2.1.0+incompatible/go.mod h1:CgnwVTmzoESiwO9qyAFEMiHoZ1nMCKZlZ9V6mm3/LKc= github.com/coreos/go-semver v0.3.0 h1:wkHLiw0WNATZnSG7epLsujiMCgPAc9xhjJ4tgnAxmfM= github.com/coreos/go-semver v0.3.0/go.mod h1:nnelYz7RCh+5ahJtPPxZlU+153eP4D4r3EedlOD2RNk= @@ -221,6 +223,8 @@ github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaS github.com/golang/protobuf v1.5.1/go.mod h1:DopwsBzvsk0Fs44TXzsVbJyPhcCPeIwnvohx4u74HPM= github.com/golang/protobuf v1.5.2 h1:ROPKBNFfQgOUMifHyP+KYbvpjbdoFNs+aK7DXlji0Tw= github.com/golang/protobuf v1.5.2/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY= +github.com/gonetx/ipset v0.1.0 h1:LFkRdTbedg2UYXFN/2mOtgbvdWyo+OERrwVbtrPVuYY= +github.com/gonetx/ipset v0.1.0/go.mod h1:AwNAf1Vtqg0cJ4bha4w1ROX5cO/8T50UYoegxM20AH8= github.com/google/btree v0.0.0-20180813153112-4030bb1f1f0c/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ= github.com/google/btree v1.0.0/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ= github.com/google/btree v1.0.1 h1:gK4Kx5IaGY9CD5sPJ36FHiBJ6ZXl0kilRiiCj+jdYp4= diff --git a/pkg/networkengine/routedriver/vxlan/utils.go b/pkg/networkengine/routedriver/vxlan/utils.go index 209d0d3..f8e7f32 100644 --- a/pkg/networkengine/routedriver/vxlan/utils.go +++ b/pkg/networkengine/routedriver/vxlan/utils.go @@ -24,11 +24,16 @@ import ( "net" "github.com/vishvananda/netlink" + "golang.org/x/sys/unix" "k8s.io/klog/v2" netlinkutil "github.com/openyurtio/raven/pkg/networkengine/util/netlink" ) +const ( + resetMark = 0x0 +) + func ensureVxlanLink(vxlan netlink.Vxlan, vtepIP net.IP) (netlink.Link, error) { linkExist := func() netlink.Link { link, err := netlink.LinkByName(vxlanLinkName) @@ -74,9 +79,154 @@ func ensureVxlanLink(vxlan netlink.Vxlan, vtepIP net.IP) (netlink.Link, error) { return nil, fmt.Errorf("error add vxlan addr: %v", err) } + // tc qdisc add dev raven0 clsact + err = ensureClsActQdsic(vxLink) + if err != nil { + return nil, fmt.Errorf("error ensure qdisc: %v", err) + } + + // tc filter add dev raven0 egress protocol ip prio 1 matchall action skbedit mark 0x0 + err = ensureSkbEditFilter(vxLink) + if err != nil { + return nil, fmt.Errorf("error ensure filter: %v", err) + } + return vxLink, nil } +func ensureClsActQdsic(link netlink.Link) error { + qds, err := netlink.QdiscList(link) + if err != nil { + return fmt.Errorf("list qdisc for dev %s error, %w", link.Attrs().Name, err) + } + for _, q := range qds { + if q.Type() == "clsact" { + return nil + } + } + qdisc := &netlink.GenericQdisc{ + QdiscAttrs: netlink.QdiscAttrs{ + LinkIndex: link.Attrs().Index, + Parent: netlink.HANDLE_CLSACT, + Handle: netlink.HANDLE_CLSACT & 0xffff0000, + }, + QdiscType: "clsact", + } + if err := netlink.QdiscReplace(qdisc); err != nil { + return fmt.Errorf("replace clsact qdisc for dev %s error, %w", link.Attrs().Name, err) + } + return nil +} + +func deleteClsActQdsic(link netlink.Link) error { + qds, err := netlink.QdiscList(link) + if err != nil { + return fmt.Errorf("list qdisc for dev %s error, %w", link.Attrs().Name, err) + } + var qdisc netlink.Qdisc + for _, q := range qds { + if q.Type() == "clsact" { + qdisc = q + break + } + } + if qdisc != nil { + err = netlink.QdiscDel(qdisc) + if err != nil { + return fmt.Errorf("error delete qdisc: %s", err) + } + } + return nil +} + +func ensureSkbEditFilter(link netlink.Link) error { + filters, err := netlink.FilterList(link, netlink.HANDLE_MIN_EGRESS) + if err != nil { + return fmt.Errorf("list egress filter for %s error, %w", link.Attrs().Name, err) + } + + for _, f := range filters { + if isMatch(f) { + return nil + } + } + + skbedit := netlink.NewSkbEditAction() + mark := uint32(resetMark) + skbedit.Mark = &mark + match := &netlink.MatchAll{ + FilterAttrs: netlink.FilterAttrs{ + LinkIndex: link.Attrs().Index, + Parent: netlink.HANDLE_MIN_EGRESS, + Priority: 20000, + Protocol: unix.ETH_P_IP, + }, + Actions: []netlink.Action{ + skbedit, + }, + } + + return netlink.FilterReplace(match) +} + +func deleteSkbEditFilter(link netlink.Link) error { + filters, err := netlink.FilterList(link, netlink.HANDLE_MIN_EGRESS) + if err != nil { + return fmt.Errorf("list egress filter for %s error, %w", link.Attrs().Name, err) + } + for _, f := range filters { + _ = netlink.FilterDel(f) + } + return nil +} + +func isMatch(filter netlink.Filter) bool { + match, ok := filter.(*netlink.MatchAll) + if !ok { + return false + } + if match.Parent != netlink.HANDLE_MIN_EGRESS || match.Protocol != unix.ETH_P_IP { + return false + } + if len(match.Actions) != 1 { + return false + } + action, ok := match.Actions[0].(*netlink.SkbEditAction) + if !ok { + return false + } + if *action.Mark != resetMark { + return false + } + return true +} + +func deleteVxlanLink(linkName string) error { + vxLink, err := netlink.LinkByName(linkName) + if err != nil { + if _, ok := err.(netlink.LinkNotFoundError); ok { + return nil + } + return fmt.Errorf("error finding vxlan link: %s", err) + } + + err = deleteSkbEditFilter(vxLink) + if err != nil { + return fmt.Errorf("error deleting skbedit filter: %s", err) + } + err = deleteClsActQdsic(vxLink) + if err != nil { + return fmt.Errorf("error deleting clsact qdsic: %s", err) + } + err = netlink.LinkDel(vxLink) + if err != nil { + if _, ok := err.(netlink.LinkNotFoundError); !ok { + return fmt.Errorf("error deleting vxlan link: %s", err) + } + } + return nil +} + func isVxlanConfigChanged(newLink, currentLink netlink.Link) bool { required := newLink.(*netlink.Vxlan) existing := currentLink.(*netlink.Vxlan) diff --git a/pkg/networkengine/routedriver/vxlan/vxlan.go b/pkg/networkengine/routedriver/vxlan/vxlan.go index 131ebfb..8ae0cc7 100644 --- a/pkg/networkengine/routedriver/vxlan/vxlan.go +++ b/pkg/networkengine/routedriver/vxlan/vxlan.go @@ -32,7 +32,8 @@ import ( "github.com/openyurtio/raven/cmd/agent/app/config" "github.com/openyurtio/raven/pkg/networkengine/routedriver" networkutil "github.com/openyurtio/raven/pkg/networkengine/util" - netlinkutil "github.com/openyurtio/raven/pkg/networkengine/util/netlink" + ipsetutil "github.com/openyurtio/raven/pkg/networkengine/util/ipset" + iptablesutil "github.com/openyurtio/raven/pkg/networkengine/util/iptables" "github.com/openyurtio/raven/pkg/types" ) @@ -47,6 +48,15 @@ const ( vxlanGwPrefix = 240 DriverName = "vxlan" + + ravenMark = 0x40 + + ravenMarkSet = "raven-mark-set" +) + +var ( + nonGatewayChainRuleSpec = []string{"-m", "set", "--match-set", ravenMarkSet, "dst", "-j", "MARK", "--set-mark", fmt.Sprintf("%d", ravenMark)} + gatewayChainRuleSpec = []string{"-m", "set", "--match-set", ravenMarkSet, "src", "-j", "MARK", "--set-mark", fmt.Sprintf("%d", ravenMark)} ) func init() { @@ -56,6 +66,9 @@ func init() { type vxlan struct { vxlanIface netlink.Link nodeName types.NodeName + + iptables iptablesutil.IPTablesInterface + ipset ipsetutil.IPSetInterface } func (vx *vxlan) Apply(network *types.Network, vpnDriverMTUFn func() (int, error)) (err error) { @@ -80,6 +93,20 @@ func (vx *vxlan) Apply(network *types.Network, vpnDriverMTUFn func() (int, error // The key is netlink.Neigh.IP var desiredFDBs, currentFDBs map[string]*netlink.Neigh + // The desired and current ipset entries calculated from given network. + // The key is ip set entry + var desiredSet, currentSet map[string]bool + + vx.ipset, err = ipsetutil.New(ravenMarkSet) + if err != nil { + return fmt.Errorf("error create ip set: %s", err) + } + + err = vx.ensureRavenMarkChain() + if err != nil { + return fmt.Errorf("error ensure raven mark chain: %s", err) + } + err = vx.ensureVxlanLink(network, vpnDriverMTUFn) if err != nil { return fmt.Errorf("error ensuring vxlan: %s", err) @@ -98,14 +125,39 @@ func (vx *vxlan) Apply(network *types.Network, vpnDriverMTUFn func() (int, error if err != nil { return fmt.Errorf("error listing fdb on node: %s", err) } + + currentSet, err = networkutil.ListIPSetOnNode(vx.ipset) + if err != nil { + return fmt.Errorf("error listing ip set on node: %s", err) + } + + desiredSet = vx.calIPSetOnNode(network) + desiredRules = vx.calRulesOnNode() + if vx.isGatewayRole(network) { desiredRoutes = vx.calRouteOnGateway(network) - desiredRules = vx.calRulesOnGateway(network) desiredFDBs = vx.calFDBOnGateway(network) + + err = vx.deleteChainRuleOnNode(nonGatewayChainRuleSpec) + if err != nil { + return fmt.Errorf("error deleting non gateway chain rule: %s", err) + } + err = vx.addChainRuleOnNode(gatewayChainRuleSpec) + if err != nil { + return fmt.Errorf("error adding gateway chain rule: %s", err) + } } else { desiredRoutes = vx.calRouteOnNonGateway(network) - desiredRules = vx.calRulesOnNonGateway() desiredFDBs = vx.calFDBOnNonGateway(network) + + err = vx.deleteChainRuleOnNode(gatewayChainRuleSpec) + if err != nil { + return fmt.Errorf("error deleting gateway chain rule: %s", err) + } + err = vx.addChainRuleOnNode(nonGatewayChainRuleSpec) + if err != nil { + return fmt.Errorf("error adding non gateway chain rule: %s", err) + } } err = networkutil.ApplyRoutes(currentRoutes, desiredRoutes) @@ -120,6 +172,10 @@ func (vx *vxlan) Apply(network *types.Network, vpnDriverMTUFn func() (int, error if err != nil { return fmt.Errorf("error applying fdb: %s", err) } + err = networkutil.ApplyIPSet(vx.ipset, currentSet, desiredSet) + if err != nil { + return fmt.Errorf("error applying ip set: %s", err) + } return nil } @@ -166,6 +222,28 @@ func New(cfg *config.Config) (routedriver.Driver, error) { } func (vx *vxlan) Init() (err error) { + vx.iptables, err = iptablesutil.New() + if err != nil { + return err + } + + vx.ipset, err = ipsetutil.New(ravenMarkSet) + if err != nil { + return err + } + return +} + +func (vx *vxlan) ensureRavenMarkChain() error { + if err := vx.iptables.NewChainIfNotExist(iptablesutil.MangleTable, iptablesutil.RavenMarkChain); err != nil { + return fmt.Errorf("error create %s chain: %s", iptablesutil.RavenMarkChain, err) + } + if err := vx.iptables.AppendIfNotExists(iptablesutil.MangleTable, iptablesutil.PreRoutingChain, "-j", iptablesutil.RavenMarkChain); err != nil { + return fmt.Errorf("error adding chain %s rule: %s", iptablesutil.PreRoutingChain, err) + } + if err := vx.iptables.AppendIfNotExists(iptablesutil.MangleTable, iptablesutil.OutputChain, "-j", iptablesutil.RavenMarkChain); err != nil { + return fmt.Errorf("error adding chain %s rule: %s", iptablesutil.OutputChain, err) + } return nil } @@ -235,54 +313,22 @@ func setSysctl(path string, contents []byte) error { // and configure the local gateway node as the next hop for packets sending to remote gateway nodes. // The routes entries format are equivalent to the following `ip route` command: // -// ip route add {remote_subnet} via {local_gateway_raven0_ip} dev raven0 src {node_cni_ip} onlink mtu {mtu} table {routeTableID} +// ip route add default via {local_gateway_raven0_ip} dev raven0 onlink mtu {mtu} table {routeTableID} func (vx *vxlan) calRouteOnNonGateway(network *types.Network) map[string]*netlink.Route { routes := make(map[string]*netlink.Route) - for _, srcCIDR := range vx.nodeInfo(network).Subnets { - src, _, err := net.ParseCIDR(srcCIDR) - if err != nil { - klog.ErrorS(err, "error parsing cidr", "cidr", srcCIDR) - return routes - } - - via := vxlanIP(net.ParseIP(network.LocalEndpoint.PrivateIP)) - for _, v := range network.RemoteEndpoints { - for _, dstCIDR := range v.Subnets { - _, ipnet, err := net.ParseCIDR(dstCIDR) - if err != nil { - klog.ErrorS(err, "error parsing cidr", "cidr", dstCIDR) - continue - } - nr := &netlink.Route{ - LinkIndex: vx.vxlanIface.Attrs().Index, - Scope: netlink.SCOPE_UNIVERSE, - Dst: ipnet, - Gw: via, - Table: routeTableID, - Src: src, - Flags: int(netlink.FLAG_ONLINK), - // TODO should minus vpn mtu OverHead - MTU: vx.vxlanIface.Attrs().MTU, - } - routes[networkutil.RouteKey(nr)] = nr - } - } + via := vxlanIP(net.ParseIP(network.LocalEndpoint.PrivateIP)) + defaultNR := &netlink.Route{ + LinkIndex: vx.vxlanIface.Attrs().Index, + Scope: netlink.SCOPE_UNIVERSE, + Gw: via, + Table: routeTableID, + Flags: int(netlink.FLAG_ONLINK), + MTU: vx.vxlanIface.Attrs().MTU, } + routes[networkutil.RouteKey(defaultNR)] = defaultNR return routes } -// calRulesOnNonGateway calculates and returns the desired rules on non-gateway node. -// Rules on non-gateway will give raven route table a higher priority than main table in order to bypass the CNI routing rules. -// The rules format are equivalent to the following `ip rule` command: -// -// ip rule add from all lookup {routeTableID} prio {rulePriority} -func (vx *vxlan) calRulesOnNonGateway() map[string]*netlink.Rule { - rules := make(map[string]*netlink.Rule) - rule := networkutil.NewRavenRule(rulePriority, routeTableID) - rules[networkutil.RuleKey(rule)] = rule - return rules -} - // calRouteOnGateway calculates and returns the desired routes on gateway. // Routes on gateway node are used to configure the reverse-path route for packets from local non-gateway nodes to remote nodes, // to avoid asymmetric routing. @@ -317,30 +363,16 @@ func (vx *vxlan) calRouteOnGateway(network *types.Network) map[string]*netlink.R return routes } -// calRulesOnGateway calculates and returns the desired rules on gateway node. +// calRulesOnNode calculates and returns the desired rules on node. // Rules on gateway node are used to configure route policy for the reverse-path route. // The rules format are equivalent to the following `ip rule` command: // -// ip rule add from {remote_nodeN_subnet_cidr} lookup {routeTableID} prio {rulePriority} -func (vx *vxlan) calRulesOnGateway(network *types.Network) map[string]*netlink.Rule { +// ip rule add from all fwmark 0x40 lookup {routeTableID} prio {rulePriority} +func (vx *vxlan) calRulesOnNode() map[string]*netlink.Rule { rules := make(map[string]*netlink.Rule) - for _, v := range network.RemoteNodeInfo { - nodeInfo := network.RemoteNodeInfo[types.NodeName(v.NodeName)] - if nodeInfo == nil { - klog.Errorf("node %s not found in RemoteNodeInfo", v.NodeName) - continue - } - for _, srcCIDR := range nodeInfo.Subnets { - _, src, err := net.ParseCIDR(srcCIDR) - if err != nil { - klog.ErrorS(err, "error parsing cidr", "cidr", srcCIDR) - continue - } - rule := networkutil.NewRavenRule(rulePriority, routeTableID) - rule.Src = src - rules[networkutil.RuleKey(rule)] = rule - } - } + rule := networkutil.NewRavenRule(rulePriority, routeTableID) + rule.Mark = ravenMark + rules[networkutil.RuleKey(rule)] = rule return rules } @@ -385,6 +417,30 @@ func (vx *vxlan) calFDBOnNonGateway(network *types.Network) map[string]*netlink. } } +// calIPSetOnNonGateway calculates and returns the desired ip set entries on non-gateway node. +// The ip set entries format equivalent to the following `ipset add SETNAME ENTRY` command: +// +// ipset add raven-egress-set {remote_nodeN_subnet_cidr} +func (vx *vxlan) calIPSetOnNode(network *types.Network) map[string]bool { + set := make(map[string]bool) + for _, v := range network.RemoteNodeInfo { + nodeInfo := network.RemoteNodeInfo[types.NodeName(v.NodeName)] + if nodeInfo == nil { + klog.Errorf("node %s not found in RemoteNodeInfo", v.NodeName) + continue + } + for _, srcCIDR := range nodeInfo.Subnets { + ip, cidr, _ := net.ParseCIDR(srcCIDR) + if bytes.Equal(cidr.Mask, net.CIDRMask(32, 32)) { + set[ip.String()] = true + } else { + set[cidr.String()] = true + } + } + } + return set +} + func (vx *vxlan) Cleanup() error { errList := errorlist.List{} if err := networkutil.CleanRulesOnNode(routeTableID); err != nil { @@ -395,21 +451,59 @@ func (vx *vxlan) Cleanup() error { errList = errList.Append(err) } - l, err := netlinkutil.LinkByName(vxlanLinkName) - if _, ok := err.(netlink.LinkNotFoundError); ok { - return errList.AsError() + if err := deleteVxlanLink(vxlanLinkName); err != nil { + errList = errList.Append(err) + } + + // Clean may be called more than one time, so we should ensure chain exists + + err := vx.iptables.NewChainIfNotExist(iptablesutil.MangleTable, iptablesutil.RavenMarkChain) + if err != nil { + errList = errList.Append(fmt.Errorf("error ensure chain %s: %s", iptablesutil.RavenMarkChain, err)) + } + err = vx.iptables.DeleteIfExists(iptablesutil.MangleTable, iptablesutil.PreRoutingChain, "-j", iptablesutil.RavenMarkChain) + if err != nil { + errList = errList.Append(fmt.Errorf("error deleting %s chain rule: %s", iptablesutil.PreRoutingChain, err)) + } + err = vx.iptables.DeleteIfExists(iptablesutil.MangleTable, iptablesutil.OutputChain, "-j", iptablesutil.RavenMarkChain) + if err != nil { + errList = errList.Append(fmt.Errorf("error deleting %s chain rule: %s", iptablesutil.OutputChain, err)) + } + err = vx.iptables.ClearAndDeleteChain(iptablesutil.MangleTable, iptablesutil.RavenMarkChain) + if err != nil { + errList = errList.Append(fmt.Errorf("error deleting %s chain %s", iptablesutil.RavenMarkChain, err)) + } + + // Clean may be called more than one time, so we should ensure ip set exists + vx.ipset, err = ipsetutil.New(ravenMarkSet) + if err != nil { + errList = errList.Append(fmt.Errorf("error ensure ip set %s: %s", ravenMarkSet, err)) } + err = vx.ipset.Flush() if err != nil { - errList = errList.Append(fmt.Errorf("error listing routes: %s", err)) - return errList.AsError() + errList = errList.Append(fmt.Errorf("error flushing ipset: %s", err)) } - err = netlink.LinkDel(l) + err = vx.ipset.Destroy() if err != nil { - errList = errList.Append(fmt.Errorf("error deleting vxlan link: %s", err)) + errList = errList.Append(fmt.Errorf("error destroying ipset: %s", err)) } return errList.AsError() } +func (vx *vxlan) deleteChainRuleOnNode(ruleSpec []string) error { + if err := vx.iptables.DeleteIfExists(iptablesutil.MangleTable, iptablesutil.RavenMarkChain, ruleSpec...); err != nil { + return fmt.Errorf("error deleting chain %s rule %v: %s", iptablesutil.RavenMarkChain, ruleSpec, err) + } + return nil +} + +func (vx *vxlan) addChainRuleOnNode(ruleSpec []string) error { + if err := vx.iptables.AppendIfNotExists(iptablesutil.MangleTable, iptablesutil.RavenMarkChain, ruleSpec...); err != nil { + return fmt.Errorf("error adding chain %s rule %v: %s", iptablesutil.RavenMarkChain, ruleSpec, err) + } + return nil +} + func vxlanIP(privateIP net.IP) net.IP { privateIP = privateIP.To4() vxlanIP := make(net.IP, 4) diff --git a/pkg/networkengine/routedriver/vxlan/vxlan_test.go b/pkg/networkengine/routedriver/vxlan/vxlan_test.go index aa38541..4ae370d 100644 --- a/pkg/networkengine/routedriver/vxlan/vxlan_test.go +++ b/pkg/networkengine/routedriver/vxlan/vxlan_test.go @@ -166,6 +166,24 @@ func Test_applyRoutes(t *testing.T) { } for _, v := range tt { t.Run(v.name, func(t *testing.T) { + current := make(map[string]*netlink.Route) + for _, r := range v.current { + current[networkutil.RouteKey(r)] = r + } + v.current = current + + desired := make(map[string]*netlink.Route) + for _, r := range v.desired { + desired[networkutil.RouteKey(r)] = r + } + v.desired = desired + + expected := make(map[string]*netlink.Route) + for _, r := range v.expected { + expected[networkutil.RouteKey(r)] = r + } + v.expected = expected + actual := make(map[string]*netlink.Route) for k := range v.current { actual[k] = v.current[k] @@ -297,6 +315,24 @@ func Test_applyRule(t *testing.T) { } for _, v := range tt { t.Run(v.name, func(t *testing.T) { + current := make(map[string]*netlink.Rule) + for _, r := range v.current { + current[networkutil.RuleKey(r)] = r + } + v.current = current + + desired := make(map[string]*netlink.Rule) + for _, r := range v.desired { + desired[networkutil.RuleKey(r)] = r + } + v.desired = desired + + expected := make(map[string]*netlink.Rule) + for _, r := range v.expected { + expected[networkutil.RuleKey(r)] = r + } + v.expected = expected + actual := make(map[string]*netlink.Rule) for k := range v.current { actual[k] = v.current[k] diff --git a/pkg/networkengine/util/ipset/ipset.go b/pkg/networkengine/util/ipset/ipset.go new file mode 100644 index 0000000..ad85b6c --- /dev/null +++ b/pkg/networkengine/util/ipset/ipset.go @@ -0,0 +1,139 @@ +//go:build linux +// +build linux + +/* + * Copyright 2022 The OpenYurt Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package ipsetutil + +import ( + "sync" + + "github.com/gonetx/ipset" + "k8s.io/klog/v2" +) + +type IPSetInterface interface { + List(options ...ipset.Option) (*ipset.Info, error) + Name() string + Add(entry string, options ...ipset.Option) error + Del(entry string, options ...ipset.Option) error + Flush() error + Destroy() error +} + +type ipSetWrapper struct { + ipset.IPSet +} + +var ( + once sync.Once +) + +func check() error { + if err := ipset.Check(); err != nil { + klog.ErrorS(err, "error on ipset.Check") + return err + } + if klog.V(5).Enabled() { + klog.V(5).InfoS("ipset.Check succeeded") + } + return nil +} + +func New(setName string) (IPSetInterface, error) { + var err error + once.Do(func() { + err = check() + }) + if err != nil { + return nil, err + } + + set, err := ipset.New(setName, ipset.HashNet, ipset.Exist(true)) + if err != nil { + klog.ErrorS(err, "error on ipset.Create", "setName", setName) + return nil, err + } + if klog.V(5).Enabled() { + klog.V(5).InfoS("ipset.Create succeeded", "setName", setName) + } + return &ipSetWrapper{set}, nil +} + +func (i *ipSetWrapper) List(options ...ipset.Option) (*ipset.Info, error) { + info, err := i.IPSet.List(options...) + if err != nil { + klog.ErrorS(err, "error on ipset.List", "setName", i.Name()) + return nil, err + } + if klog.V(5).Enabled() { + klog.V(5).InfoS("ipset.List succeeded", "setName", i.Name()) + } + return info, nil +} + +func (i *ipSetWrapper) Name() string { + return i.IPSet.Name() +} + +func (i *ipSetWrapper) Add(entry string, options ...ipset.Option) (err error) { + err = i.IPSet.Add(entry, options...) + if err != nil { + klog.ErrorS(err, "error on ipset.Add", "setName", i.Name(), "entry", entry, "opts", options) + return + } + if klog.V(5).Enabled() { + klog.V(5).InfoS("ipset.Add succeeded", "setName", i.Name(), "entry", entry, "opts", options) + } + return +} + +func (i *ipSetWrapper) Del(entry string, options ...ipset.Option) (err error) { + err = i.IPSet.Del(entry, options...) + if err != nil { + klog.ErrorS(err, "error on ipset.Del", "setName", i.Name(), "entry", entry) + return + } + if klog.V(5).Enabled() { + klog.V(5).InfoS("ipset.Del succeeded", "setName", i.Name(), "entry", entry) + } + return +} + +func (i *ipSetWrapper) Flush() (err error) { + err = i.IPSet.Flush() + if err != nil { + klog.ErrorS(err, "error on ipset.Flush", "setName", i.Name()) + return + } + if klog.V(5).Enabled() { + klog.V(5).InfoS("ipset.Flush succeeded", "setName", i.Name()) + } + return +} + +func (i *ipSetWrapper) Destroy() (err error) { + err = i.IPSet.Destroy() + if err != nil { + klog.ErrorS(err, "error on ipset.Destroy") + return + } + if klog.V(5).Enabled() { + klog.V(5).InfoS("ipset.Destroy succeeded", "setName", i.Name()) + } + return +} diff --git a/pkg/networkengine/util/iptables/constants.go b/pkg/networkengine/util/iptables/constants.go new file mode 100644 index 0000000..1e200a3 --- /dev/null +++ b/pkg/networkengine/util/iptables/constants.go @@ -0,0 +1,27 @@ +//go:build linux +// +build linux + +/* + * Copyright 2022 The OpenYurt Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package iptablesutil + +const ( + PreRoutingChain = "PREROUTING" + OutputChain = "OUTPUT" + RavenMarkChain = "RAVEN-MARK-CHAIN" + MangleTable = "mangle" +) diff --git a/pkg/networkengine/util/iptables/iptables.go b/pkg/networkengine/util/iptables/iptables.go new file mode 100644 index 0000000..58e4ab2 --- /dev/null +++ b/pkg/networkengine/util/iptables/iptables.go @@ -0,0 +1,115 @@ +//go:build linux +// +build linux + +/* + * Copyright 2022 The OpenYurt Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package iptablesutil + +import ( + "github.com/coreos/go-iptables/iptables" + "k8s.io/klog/v2" +) + +type IPTablesInterface interface { + NewChainIfNotExist(table, chain string) error + ClearAndDeleteChain(table, chain string) error + List(table, chain string) ([]string, error) + AppendIfNotExists(table, chain string, rulespec ...string) error + DeleteIfExists(table, chain string, rulespec ...string) error +} + +type iptablesWrapper struct { + *iptables.IPTables +} + +func New() (IPTablesInterface, error) { + ipt, err := iptables.New(iptables.IPFamily(iptables.ProtocolIPv4), iptables.Timeout(5)) + if err != nil { + klog.ErrorS(err, "error on iptables.New") + return nil, err + } + if klog.V(5).Enabled() { + klog.V(5).InfoS("iptables.New succeeded") + } + return &iptablesWrapper{ipt}, nil +} + +func (ipt *iptablesWrapper) NewChainIfNotExist(table, chain string) error { + exists, err := ipt.IPTables.ChainExists(table, chain) + if err == nil && !exists { + err = ipt.IPTables.NewChain(table, chain) + } + if err != nil { + klog.ErrorS(err, "error on iptables.NewChain", "table", table, "chain", chain, "exists", exists) + return err + } + if klog.V(5).Enabled() { + klog.V(5).InfoS("iptables.NewChain succeeded", "table", table, "chain", chain, "exists", exists) + } + return nil +} + +func (ipt *iptablesWrapper) ClearAndDeleteChain(table, chain string) error { + err := ipt.IPTables.ClearAndDeleteChain(table, chain) + if err != nil { + klog.ErrorS(err, "error on iptables.ClearAndDeleteChain", "table", table, "chain", chain) + return err + } + if klog.V(5).Enabled() { + klog.V(5).InfoS("iptables.ClearAndDeleteChain succeeded", "table", table, "chain", chain) + } + return nil +} + +func (ipt *iptablesWrapper) List(table, chain string) ([]string, error) { + rules, err := ipt.IPTables.List(table, chain) + if err != nil { + klog.ErrorS(err, "error on iptables.List", "table", table, "chain", chain) + return nil, err + } + if klog.V(5).Enabled() { + klog.V(5).InfoS("iptables.List succeeded", "table", table, "chain", chain, "rules", rules) + } + return rules, nil +} + +func (ipt *iptablesWrapper) AppendIfNotExists(table, chain string, rulespec ...string) error { + exists, err := ipt.Exists(table, chain, rulespec...) + if err == nil && !exists { + err = ipt.Append(table, chain, rulespec...) + } + if err != nil { + klog.ErrorS(err, "error on iptables.Append", "table", table, "chain", chain, "rulespec", rulespec, "exists", exists) + return err + } + if klog.V(5).Enabled() { + klog.V(5).InfoS("iptables.Append succeeded", "table", table, "chain", chain, "rulespec", rulespec, "exists", exists) + } + return nil +} + +func (ipt *iptablesWrapper) DeleteIfExists(table, chain string, rulespec ...string) error { + err := ipt.IPTables.DeleteIfExists(table, chain, rulespec...) + if err != nil { + klog.ErrorS(err, "error on iptables.Delete", "table", table, "chain", chain, "rulespec", rulespec) + return err + } + if klog.V(5).Enabled() { + klog.V(5).InfoS("iptables.Delete succeeded", "table", table, "chain", chain, "rulespec", rulespec) + } + return nil +} diff --git a/pkg/networkengine/util/utils.go b/pkg/networkengine/util/utils.go index d7d1cef..56d2c02 100644 --- a/pkg/networkengine/util/utils.go +++ b/pkg/networkengine/util/utils.go @@ -20,20 +20,22 @@ package networkutil import ( - "bytes" "fmt" "net" "syscall" + "github.com/gonetx/ipset" "github.com/vdobler/ht/errorlist" "github.com/vishvananda/netlink" "k8s.io/klog/v2" + ipsetutil "github.com/openyurtio/raven/pkg/networkengine/util/ipset" netlinkutil "github.com/openyurtio/raven/pkg/networkengine/util/netlink" ) var ( - AllZeroMAC = net.HardwareAddr{0, 0, 0, 0, 0, 0} + AllZeroMAC = net.HardwareAddr{0, 0, 0, 0, 0, 0} + AllZeroAddress = "0.0.0.0/0" ) func NewRavenRule(rulePriority int, routeTableID int) *netlink.Rule { @@ -45,16 +47,11 @@ func NewRavenRule(rulePriority int, routeTableID int) *netlink.Rule { } func RouteKey(route *netlink.Route) string { - return fmt.Sprintf("%s-%d", route.Dst, route.Table) + return route.String() } func RuleKey(rule *netlink.Rule) string { - src := "0.0.0.0/0" - srcIPNet := rule.Src - if srcIPNet != nil { - src = srcIPNet.String() - } - return src + return rule.String() } func ListRulesOnNode(routeTableID int) (map[string]*netlink.Rule, error) { @@ -89,6 +86,18 @@ func ListRoutesOnNode(routeTableID int) (map[string]*netlink.Route, error) { return ro, nil } +func ListIPSetOnNode(set ipsetutil.IPSetInterface) (map[string]bool, error) { + info, err := set.List() + if err != nil { + return nil, err + } + ro := make(map[string]bool) + for _, v := range info.Entries { + ro[v] = true + } + return ro, nil +} + func ApplyRules(current, desired map[string]*netlink.Rule) (err error) { if klog.V(5).Enabled() { klog.InfoS("applying rules", "current", current, "desired", desired) @@ -127,7 +136,7 @@ func ApplyRoutes(current, desired map[string]*netlink.Route) (err error) { continue } delete(current, k) - if !routeEqual(*ro, *v) { + if !ro.Equal(*v) { klog.InfoS("replacing route", "dst", v.Dst, "via", v.Gw, "src", v.Src, "table", v.Table) err = netlinkutil.RouteReplace(v) errList = errList.Append(err) @@ -142,6 +151,30 @@ func ApplyRoutes(current, desired map[string]*netlink.Route) (err error) { return errList.AsError() } +func ApplyIPSet(set ipsetutil.IPSetInterface, current, desired map[string]bool) (err error) { + if klog.V(5).Enabled() { + klog.InfoS("applying ipset entry", "current", current, "desired", desired) + } + errList := errorlist.List{} + for k := range desired { + _, ok := current[k] + if !ok { + klog.InfoS("adding entry", "entry", k) + err = set.Add(k, ipset.Exist(true)) + errList = errList.Append(err) + continue + } + delete(current, k) + } + // remove unwanted entries + for k := range current { + klog.InfoS("deleting ipset entry", "entry", k) + err = set.Del(k) + errList = errList.Append(err) + } + return errList.AsError() +} + func ListFDBsOnNode(link netlink.Link) (map[string]*netlink.Neigh, error) { fdbsOnNode := make(map[string]*netlink.Neigh) neighs, err := netlinkutil.NeighList(link.Attrs().Index, syscall.AF_BRIDGE) @@ -209,12 +242,3 @@ func CleanRulesOnNode(routeTableID int) error { } return errList.AsError() } - -func routeEqual(x, y netlink.Route) bool { - if x.Dst.IP.Equal(y.Dst.IP) && x.Gw.Equal(y.Gw) && - bytes.Equal(x.Dst.Mask, y.Dst.Mask) && - x.LinkIndex == y.LinkIndex { - return true - } - return false -} diff --git a/pkg/networkengine/util/utils_test.go b/pkg/networkengine/util/utils_test.go index d546fdd..aca2039 100644 --- a/pkg/networkengine/util/utils_test.go +++ b/pkg/networkengine/util/utils_test.go @@ -20,7 +20,6 @@ package networkutil import ( - "fmt" "reflect" "testing" @@ -84,9 +83,8 @@ func TestRouteKey(t *testing.T) { expect string }{ { - "normal", - route1, - fmt.Sprintf("%s-%d", route1.Dst, route1.Table), + name: "normal", + route: route1, }, } @@ -96,6 +94,7 @@ func TestRouteKey(t *testing.T) { t.Parallel() t.Logf("\tTestCase: %s", tt.name) + tt.expect = tt.route.String() get := RouteKey(tt.route) if !reflect.DeepEqual(get, tt.expect) { @@ -113,8 +112,8 @@ func TestRuleKey(t *testing.T) { expect string }{ { - "nil", - &netlink.Rule{ + name: "nil", + rule: &netlink.Rule{ SuppressIfgroup: -1, SuppressPrefixlen: -1, Priority: -1, @@ -123,11 +122,10 @@ func TestRuleKey(t *testing.T) { Goto: -1, Flow: -1, }, - "0.0.0.0/0", }, { - "normal", - &netlink.Rule{ + name: "normal", + rule: &netlink.Rule{ SuppressIfgroup: -1, SuppressPrefixlen: -1, Priority: -1, @@ -137,7 +135,6 @@ func TestRuleKey(t *testing.T) { Flow: -1, Src: netlink.NewIPNet([]byte{0xC0, 0xA8, 0x00, 0x01}), }, - "192.168.0.1/32", }, } @@ -146,7 +143,7 @@ func TestRuleKey(t *testing.T) { t.Run(tt.name, func(t *testing.T) { t.Parallel() t.Logf("\tTestCase: %s", tt.name) - + tt.expect = tt.rule.String() get := RuleKey(tt.rule) if !reflect.DeepEqual(get, tt.expect) {