Skip to content

Commit

Permalink
Fix issues in code and naming
Browse files Browse the repository at this point in the history
Signed-off-by: Aswin Suryanarayanan <[email protected]>
  • Loading branch information
aswinsuryan committed Jul 10, 2023
1 parent 9bdc5ca commit 984fe4a
Show file tree
Hide file tree
Showing 4 changed files with 91 additions and 140 deletions.
49 changes: 6 additions & 43 deletions pkg/routeagent_driver/handlers/ovn/host_networking.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,11 @@ package ovn

import (
"fmt"
"net"
"os"
"syscall"

"github.com/pkg/errors"
"github.com/submariner-io/admiral/pkg/log"
"github.com/submariner-io/submariner/pkg/routeagent_driver/constants"
"github.com/vishvananda/netlink"
"k8s.io/apimachinery/pkg/util/sets"
"os"
)

const (
Expand Down Expand Up @@ -57,7 +53,11 @@ func (ovn *Handler) updateHostNetworkDataplane() error {
return errors.Wrapf(err, "error removing routing rule")
}

nextHop, err := ovn.getNextHopOnK8sMgmtIntf()
if ovn.localEndpoint == nil {
return fmt.Errorf("missing localEndpoint info")
}

nextHop, err := getNextHopOnK8sMgmtIntf(ovn.config.ClusterCidr)
if err != nil {
return errors.Wrapf(err, "getNextHopOnK8sMgmtIntf returned error")
}
Expand Down Expand Up @@ -109,40 +109,3 @@ func (ovn *Handler) programRulesForRemoteSubnets(subnets []string, ruleFunc func

return nil
}

func (ovn *Handler) getNextHopOnK8sMgmtIntf() (*net.IP, error) {
if ovn.localEndpoint == nil {
return nil, fmt.Errorf("missing localEndpoint info")
}

link, err := netlink.LinkByName(OVNK8sMgmntIntfName)

if err != nil && !errors.Is(err, netlink.LinkNotFoundError{}) {
return nil, errors.Wrapf(err, "error retrieving link by name %q", OVNK8sMgmntIntfName)
}

currentRouteList, err := netlink.RouteList(link, syscall.AF_INET)
if err != nil {
return nil, errors.Wrapf(err, "error retrieving routes on the link %s", OVNK8sMgmntIntfName)
}

for i := range currentRouteList {
logger.V(log.DEBUG).Infof("Processing route %v", currentRouteList[i])

if currentRouteList[i].Dst == nil || currentRouteList[i].Gw == nil {
continue
}

// To support hostNetworking use-case the route-agent handler programs default route in table 150
// with nexthop matching the nexthop on the ovn-k8s-mp0 interface. Basically, we want the Submariner
// managed traffic to be forwarded to the ovn_cluster_router and pass through the CNI network so that
// it reaches the active gateway node in the cluster via the submariner pipeline.
for _, subnet := range ovn.config.ClusterCidr {
if currentRouteList[i].Dst.String() == subnet {
return &currentRouteList[i].Gw, nil
}
}
}

return nil, fmt.Errorf("could not find the route to %v via %q", ovn.config.ClusterCidr, OVNK8sMgmntIntfName)
}
72 changes: 12 additions & 60 deletions pkg/routeagent_driver/handlers/ovn/submarinerGWRouteHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,21 +20,14 @@ package ovn

import (
"context"
"fmt"
"net"
"sync"
"syscall"

"github.com/pkg/errors"
"github.com/submariner-io/admiral/pkg/log"
submarinerv1 "github.com/submariner-io/submariner/pkg/apis/submariner.io/v1"
submarinerClientset "github.com/submariner-io/submariner/pkg/client/clientset/versioned"
"github.com/submariner-io/submariner/pkg/event"
"github.com/submariner-io/submariner/pkg/routeagent_driver/environment"
"github.com/vishvananda/netlink"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/util/retry"
"sync"
)

type GatewayRouteHandler struct {
Expand Down Expand Up @@ -76,7 +69,7 @@ func (gatewayRouteHandler *GatewayRouteHandler) LocalEndpointCreated(endpoint *s
gatewayRouteHandler.mutex.Lock()
defer gatewayRouteHandler.mutex.Unlock()

nextHopIP, err := gatewayRouteHandler.getNextHopOnK8sMgmtIntf(endpoint)
nextHopIP, err := getNextHopOnK8sMgmtIntf(gatewayRouteHandler.config.ClusterCidr)

if err != nil || nextHopIP == nil {
return errors.Wrapf(err, "error getting the ovn kubernetes management interface IP")
Expand All @@ -92,16 +85,12 @@ func (gatewayRouteHandler *GatewayRouteHandler) RemoteEndpointCreated(endpoint *
defer gatewayRouteHandler.mutex.Unlock()

if gatewayRouteHandler.isGateway {
retryErr := retry.RetryOnConflict(retry.DefaultRetry, func() error {
_, err := gatewayRouteHandler.smClient.SubmarinerV1().GatewayRoutes(endpoint.Namespace).Create(context.TODO(),
gatewayRouteHandler.getGatewayRoute(endpoint), metav1.CreateOptions{})
if !apierrors.IsAlreadyExists(err) {
return errors.Wrapf(err, "error processing the remote endpoint creation for %q", endpoint.Name)
}
return nil
})

return errors.Wrapf(retryErr, "processing the remote endpoint event failed even after retry")
_, err := gatewayRouteHandler.smClient.SubmarinerV1().GatewayRoutes(endpoint.Namespace).Create(context.TODO(),
gatewayRouteHandler.newGatewayRoute(endpoint), metav1.CreateOptions{})
if !apierrors.IsAlreadyExists(err) {
return errors.Wrapf(err, "error processing the remote endpoint creation for %q", endpoint.Name)
}
}

return nil
Expand All @@ -123,35 +112,6 @@ func (gatewayRouteHandler *GatewayRouteHandler) RemoteEndpointRemoved(endpoint *
return nil
}

func (gatewayRouteHandler *GatewayRouteHandler) getNextHopOnK8sMgmtIntf(*submarinerv1.Endpoint) (*net.IP, error) {
link, err := netlink.LinkByName(OVNK8sMgmntIntfName)

if err != nil && !errors.Is(err, netlink.LinkNotFoundError{}) {
return nil, errors.Wrapf(err, "error retrieving link by name %q", OVNK8sMgmntIntfName)
}

currentRouteList, err := netlink.RouteList(link, syscall.AF_INET)
if err != nil {
return nil, errors.Wrapf(err, "error retrieving routes on the link %s", OVNK8sMgmntIntfName)
}

for i := range currentRouteList {
logger.V(log.DEBUG).Infof("Processing route %v", currentRouteList[i])

if currentRouteList[i].Dst == nil || currentRouteList[i].Gw == nil {
continue
}

for _, subnet := range gatewayRouteHandler.config.ClusterCidr {
if currentRouteList[i].Dst.String() == subnet {
return &currentRouteList[i].Gw, nil
}
}
}

return nil, fmt.Errorf("could not find the route to %v via %q", gatewayRouteHandler.config.ClusterCidr, OVNK8sMgmntIntfName)
}

func (gatewayRouteHandler *GatewayRouteHandler) TransitionToNonGateway() error {
gatewayRouteHandler.mutex.Lock()
defer gatewayRouteHandler.mutex.Unlock()
Expand All @@ -168,19 +128,11 @@ func (gatewayRouteHandler *GatewayRouteHandler) TransitionToGateway() error {
gatewayRouteHandler.isGateway = true

for _, endpoint := range gatewayRouteHandler.remoteEndpoints {
err := gatewayRouteHandler.smClient.SubmarinerV1().GatewayRoutes(endpoint.Namespace).DeleteCollection(context.TODO(),
metav1.DeleteOptions{}, metav1.ListOptions{
LabelSelector: "owner=submariner.io/ovn",
})
if err != nil {
return errors.Wrapf(err, "error deleting gatewayRoute %q", endpoint.Name)
}
}

for _, endpoint := range gatewayRouteHandler.remoteEndpoints {
if _, err := gatewayRouteHandler.smClient.SubmarinerV1().GatewayRoutes(endpoint.Namespace).Create(context.TODO(),
gatewayRouteHandler.getGatewayRoute(endpoint), metav1.CreateOptions{}); err != nil {
if _, err := gatewayRouteHandler.smClient.SubmarinerV1().GatewayRoutes(endpoint.Namespace).Update(context.TODO(),
gatewayRouteHandler.newGatewayRoute(endpoint), metav1.UpdateOptions{}); err != nil {
if !apierrors.IsNotFound(err) {
_, err = gatewayRouteHandler.smClient.SubmarinerV1().GatewayRoutes(endpoint.Namespace).Create(context.TODO(),
gatewayRouteHandler.newGatewayRoute(endpoint), metav1.CreateOptions{})
return errors.Wrapf(err, "error deleting gatewayRoute %q", endpoint.Name)
}
}
Expand All @@ -189,7 +141,7 @@ func (gatewayRouteHandler *GatewayRouteHandler) TransitionToGateway() error {
return nil
}

func (gatewayRouteHandler *GatewayRouteHandler) getGatewayRoute(endpoint *submarinerv1.Endpoint) *submarinerv1.GatewayRoute {
func (gatewayRouteHandler *GatewayRouteHandler) newGatewayRoute(endpoint *submarinerv1.Endpoint) *submarinerv1.GatewayRoute {
return &submarinerv1.GatewayRoute{
TypeMeta: metav1.TypeMeta{
Kind: "GatewayRoute",
Expand Down
66 changes: 29 additions & 37 deletions pkg/routeagent_driver/handlers/ovn/submarinerNonGWRouteHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,13 @@ package ovn

import (
"context"
"os"
"sync"

"github.com/pkg/errors"
submarinerv1 "github.com/submariner-io/submariner/pkg/apis/submariner.io/v1"
submarinerClientset "github.com/submariner-io/submariner/pkg/client/clientset/versioned"
"github.com/submariner-io/submariner/pkg/event"
nodeutil "github.com/submariner-io/submariner/pkg/node"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
clientset "k8s.io/client-go/kubernetes"
Expand Down Expand Up @@ -56,14 +56,9 @@ func NewNonGatewayRouteHandler(smClientSet submarinerClientset.Interface, k8sCli
func (nonGatewayRouteHandler *NonGatewayRouteHandler) Init() error {
logger.Infof("Starting NonGatewayRouteHandler")

nodeName, ok := os.LookupEnv("NODE_NAME")
if !ok {
return errors.New("error getting the Node name")
}

node, err := nonGatewayRouteHandler.k8sClientSet.CoreV1().Nodes().Get(context.TODO(), nodeName, metav1.GetOptions{})
node, err := nodeutil.GetLocalNode(nonGatewayRouteHandler.k8sClientSet)
if err != nil {
return errors.Wrapf(err, "error getting the g/w node: %q", nodeName)
return errors.Wrap(err, "error getting the g/w node")
}

annotations := node.GetAnnotations()
Expand All @@ -80,7 +75,7 @@ func (nonGatewayRouteHandler *NonGatewayRouteHandler) Init() error {
}

func (nonGatewayRouteHandler *NonGatewayRouteHandler) GetName() string {
return "submariner-gw-route-handler"
return "submariner-nongw-route-handler"
}

func (nonGatewayRouteHandler *NonGatewayRouteHandler) GetNetworkPlugins() []string {
Expand All @@ -102,21 +97,21 @@ func (nonGatewayRouteHandler *NonGatewayRouteHandler) RemoteEndpointCreated(endp
nonGatewayRouteHandler.mutex.Lock()
defer nonGatewayRouteHandler.mutex.Unlock()

if nonGatewayRouteHandler.isGateway && nonGatewayRouteHandler.transitSwitchIP != "" {
retryErr := retry.RetryOnConflict(retry.DefaultRetry, func() error {
_, err := nonGatewayRouteHandler.smClient.SubmarinerV1().
NonGatewayRoutes(endpoint.Namespace).Create(context.TODO(),
nonGatewayRouteHandler.getNonGatewayRoute(endpoint), metav1.CreateOptions{})
if !apierrors.IsAlreadyExists(err) {
return errors.Wrapf(err, "error processing the remote endpoint create event for %q", endpoint.Name)
}
return nil
})

return errors.Wrapf(retryErr, "processing the remote endpoint event failed even after retry")
if !nonGatewayRouteHandler.isGateway || nonGatewayRouteHandler.transitSwitchIP == "" {
return nil
}

return nil
retryErr := retry.RetryOnConflict(retry.DefaultRetry, func() error {
_, err := nonGatewayRouteHandler.smClient.SubmarinerV1().
NonGatewayRoutes(endpoint.Namespace).Create(context.TODO(),
nonGatewayRouteHandler.getNonGatewayRoute(endpoint), metav1.CreateOptions{})
if !apierrors.IsAlreadyExists(err) {
return errors.Wrapf(err, "error processing the remote endpoint create event for %q", endpoint.Name)
}
return nil
})

return errors.Wrapf(retryErr, "processing the remote endpoint event failed even after retry")
}

func (nonGatewayRouteHandler *NonGatewayRouteHandler) RemoteEndpointRemoved(endpoint *submarinerv1.Endpoint) error {
Expand All @@ -125,11 +120,13 @@ func (nonGatewayRouteHandler *NonGatewayRouteHandler) RemoteEndpointRemoved(endp

delete(nonGatewayRouteHandler.remoteEndpoints, endpoint.Name)

if nonGatewayRouteHandler.isGateway && nonGatewayRouteHandler.transitSwitchIP != "" {
if err := nonGatewayRouteHandler.smClient.SubmarinerV1().NonGatewayRoutes(endpoint.Namespace).Delete(context.TODO(),
endpoint.Name, metav1.DeleteOptions{}); err != nil {
return errors.Wrapf(err, "error deleting nonGatewayRoute %q", endpoint.Name)
}
if !nonGatewayRouteHandler.isGateway || nonGatewayRouteHandler.transitSwitchIP == "" {
return nil
}

if err := nonGatewayRouteHandler.smClient.SubmarinerV1().NonGatewayRoutes(endpoint.Namespace).Delete(context.TODO(),
endpoint.Name, metav1.DeleteOptions{}); err != nil {
return errors.Wrapf(err, "error deleting nonGatewayRoute %q", endpoint.Name)
}

return nil
Expand All @@ -140,14 +137,6 @@ func (nonGatewayRouteHandler *NonGatewayRouteHandler) TransitionToNonGateway() e
defer nonGatewayRouteHandler.mutex.Unlock()

nonGatewayRouteHandler.isGateway = false
if nonGatewayRouteHandler.transitSwitchIP != "" {
for _, endpoint := range nonGatewayRouteHandler.remoteEndpoints {
if err := nonGatewayRouteHandler.smClient.SubmarinerV1().NonGatewayRoutes(endpoint.Namespace).Delete(context.TODO(),
endpoint.Name, metav1.DeleteOptions{}); err != nil {
return errors.Wrapf(err, "error deleting nonGatewayRoute %q", endpoint.Name)
}
}
}

return nil
}
Expand All @@ -159,9 +148,12 @@ func (nonGatewayRouteHandler *NonGatewayRouteHandler) TransitionToGateway() erro
nonGatewayRouteHandler.isGateway = true
if nonGatewayRouteHandler.transitSwitchIP != "" {
for _, endpoint := range nonGatewayRouteHandler.remoteEndpoints {
if _, err := nonGatewayRouteHandler.smClient.SubmarinerV1().NonGatewayRoutes(endpoint.Namespace).Create(context.TODO(),
nonGatewayRouteHandler.getNonGatewayRoute(endpoint), metav1.CreateOptions{}); err != nil {
if _, err := nonGatewayRouteHandler.smClient.SubmarinerV1().NonGatewayRoutes(endpoint.Namespace).Update(context.TODO(),
nonGatewayRouteHandler.getNonGatewayRoute(endpoint), metav1.UpdateOptions{}); err != nil {
if !apierrors.IsNotFound(err) {
nonGatewayRouteHandler.smClient.SubmarinerV1().NonGatewayRoutes(endpoint.Namespace).Create(context.TODO(),
nonGatewayRouteHandler.getNonGatewayRoute(endpoint), metav1.CreateOptions{})

return errors.Wrapf(err, "error deleting nonGatewayRoute %q", endpoint.Name)
}
}
Expand Down
44 changes: 44 additions & 0 deletions pkg/routeagent_driver/handlers/ovn/utils.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package ovn

import (
"fmt"
"github.com/pkg/errors"
"github.com/submariner-io/admiral/pkg/log"
"github.com/vishvananda/netlink"
"net"
"syscall"
)

func getNextHopOnK8sMgmtIntf(clusterCidr []string) (*net.IP, error) {

link, err := netlink.LinkByName(OVNK8sMgmntIntfName)

if err != nil && !errors.Is(err, netlink.LinkNotFoundError{}) {
return nil, errors.Wrapf(err, "error retrieving link by name %q", OVNK8sMgmntIntfName)
}

currentRouteList, err := netlink.RouteList(link, syscall.AF_INET)
if err != nil {
return nil, errors.Wrapf(err, "error retrieving routes on the link %s", OVNK8sMgmntIntfName)
}

for i := range currentRouteList {
logger.V(log.DEBUG).Infof("Processing route %v", currentRouteList[i])

if currentRouteList[i].Dst == nil || currentRouteList[i].Gw == nil {
continue
}

// To support hostNetworking use-case the route-agent handler programs default route in table 150
// with nexthop matching the nexthop on the ovn-k8s-mp0 interface. Basically, we want the Submariner
// managed traffic to be forwarded to the ovn_cluster_router and pass through the CNI network so that
// it reaches the active gateway node in the cluster via the submariner pipeline.
for _, subnet := range clusterCidr {
if currentRouteList[i].Dst.String() == subnet {
return &currentRouteList[i].Gw, nil
}
}
}

return nil, fmt.Errorf("could not find the route to %v via %q", clusterCidr, OVNK8sMgmntIntfName)
}

0 comments on commit 984fe4a

Please sign in to comment.