diff --git a/pkg/routeagent_driver/constants/constants.go b/pkg/routeagent_driver/constants/constants.go index 251b8822cb..61f1ef106a 100644 --- a/pkg/routeagent_driver/constants/constants.go +++ b/pkg/routeagent_driver/constants/constants.go @@ -53,4 +53,6 @@ const ( NATTable = "nat" FilterTable = "filter" + + OvnTransitSwitchIPAnnotation = "k8s.ovn.org/node-transit-switch-port-ifaddr" ) diff --git a/pkg/routeagent_driver/handlers/ovn/submarinerGWRouteHandler.go b/pkg/routeagent_driver/handlers/ovn/submarinerGWRouteHandler.go index 1258913f0a..1e107cfe8d 100644 --- a/pkg/routeagent_driver/handlers/ovn/submarinerGWRouteHandler.go +++ b/pkg/routeagent_driver/handlers/ovn/submarinerGWRouteHandler.go @@ -23,12 +23,16 @@ import ( "sync" "github.com/pkg/errors" + "github.com/submariner-io/admiral/pkg/log" + "github.com/submariner-io/admiral/pkg/resource" + "github.com/submariner-io/admiral/pkg/util" submarinerv1 "github.com/submariner-io/submariner/pkg/apis/submariner.io/v1" submarinerClientset "github.com/submariner-io/submariner/pkg/client/clientset/versioned" "github.com/submariner-io/submariner/pkg/event" "github.com/submariner-io/submariner/pkg/routeagent_driver/environment" apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" ) type GatewayRouteHandler struct { @@ -50,39 +54,39 @@ func NewGatewayRouteHandler(env *environment.Specification, smClientSet submarin } } -func (gatewayRouteHandler *GatewayRouteHandler) Init() error { - logger.Infof("Starting GatewayRouteHandler") +func (h *GatewayRouteHandler) Init() error { + logger.Info("Starting GatewayRouteHandler") nextHopIP, err := getNextHopOnK8sMgmtIntf() if err != nil || nextHopIP == "" { return errors.Wrapf(err, "error getting the ovn kubernetes management interface IP") } - gatewayRouteHandler.nextHopIP = nextHopIP + h.nextHopIP = nextHopIP return nil } -func (gatewayRouteHandler *GatewayRouteHandler) GetName() string { +func (h *GatewayRouteHandler) GetName() string { return "submariner-gw-route-handler" } -func (gatewayRouteHandler *GatewayRouteHandler) GetNetworkPlugins() []string { +func (h *GatewayRouteHandler) GetNetworkPlugins() []string { // TODO enable when we switch to new implementation // return []string{cni.OVNKubernetes} return []string{} } -func (gatewayRouteHandler *GatewayRouteHandler) RemoteEndpointCreated(endpoint *submarinerv1.Endpoint) error { - gatewayRouteHandler.mutex.Lock() - defer gatewayRouteHandler.mutex.Unlock() +func (h *GatewayRouteHandler) RemoteEndpointCreated(endpoint *submarinerv1.Endpoint) error { + h.mutex.Lock() + defer h.mutex.Unlock() - gatewayRouteHandler.remoteEndpoints[endpoint.Name] = endpoint + h.remoteEndpoints[endpoint.Name] = endpoint - if gatewayRouteHandler.isGateway { - _, err := gatewayRouteHandler.smClient.SubmarinerV1().GatewayRoutes(endpoint.Namespace).Create(context.TODO(), - gatewayRouteHandler.newGatewayRoute(endpoint), metav1.CreateOptions{}) - if !apierrors.IsAlreadyExists(err) { + if h.isGateway { + _, err := h.smClient.SubmarinerV1().GatewayRoutes(endpoint.Namespace).Create(context.TODO(), + h.newGatewayRoute(endpoint), metav1.CreateOptions{}) + if err != nil && !apierrors.IsAlreadyExists(err) { return errors.Wrapf(err, "error processing the remote endpoint creation for %q", endpoint.Name) } } @@ -90,15 +94,15 @@ func (gatewayRouteHandler *GatewayRouteHandler) RemoteEndpointCreated(endpoint * return nil } -func (gatewayRouteHandler *GatewayRouteHandler) RemoteEndpointRemoved(endpoint *submarinerv1.Endpoint) error { - gatewayRouteHandler.mutex.Lock() - defer gatewayRouteHandler.mutex.Unlock() +func (h *GatewayRouteHandler) RemoteEndpointRemoved(endpoint *submarinerv1.Endpoint) error { + h.mutex.Lock() + defer h.mutex.Unlock() - delete(gatewayRouteHandler.remoteEndpoints, endpoint.Name) + delete(h.remoteEndpoints, endpoint.Name) - if gatewayRouteHandler.isGateway { - if err := gatewayRouteHandler.smClient.SubmarinerV1().GatewayRoutes(endpoint.Namespace).Delete(context.TODO(), - endpoint.Name, metav1.DeleteOptions{}); err != nil { + if h.isGateway { + if err := h.smClient.SubmarinerV1().GatewayRoutes(endpoint.Namespace).Delete(context.TODO(), + endpoint.Name, metav1.DeleteOptions{}); err != nil && !apierrors.IsNotFound(err) { return errors.Wrapf(err, "error deleting gatewayRoute %q", endpoint.Name) } } @@ -106,43 +110,70 @@ func (gatewayRouteHandler *GatewayRouteHandler) RemoteEndpointRemoved(endpoint * return nil } -func (gatewayRouteHandler *GatewayRouteHandler) TransitionToNonGateway() error { - gatewayRouteHandler.mutex.Lock() - defer gatewayRouteHandler.mutex.Unlock() +func (h *GatewayRouteHandler) TransitionToNonGateway() error { + h.mutex.Lock() + defer h.mutex.Unlock() - gatewayRouteHandler.isGateway = false + h.isGateway = false return nil } -func (gatewayRouteHandler *GatewayRouteHandler) TransitionToGateway() error { - gatewayRouteHandler.mutex.Lock() - defer gatewayRouteHandler.mutex.Unlock() - - gatewayRouteHandler.isGateway = true +func (h *GatewayRouteHandler) TransitionToGateway() error { + h.mutex.Lock() + defer h.mutex.Unlock() + + h.isGateway = true + + for _, endpoint := range h.remoteEndpoints { + gwr := h.newGatewayRoute(endpoint) + result, err := util.CreateOrUpdate(context.TODO(), h.gatewayResourceInterface(endpoint.Namespace), gwr, + func(existing runtime.Object) (runtime.Object, error) { + existingGwr := gwr + return existingGwr, nil + }) + if err != nil { + return errors.Wrapf(err, "error creating/updating GatewayRoute") + } - for _, endpoint := range gatewayRouteHandler.remoteEndpoints { - if _, err := gatewayRouteHandler.smClient.SubmarinerV1().GatewayRoutes(endpoint.Namespace).Update(context.TODO(), - gatewayRouteHandler.newGatewayRoute(endpoint), metav1.UpdateOptions{}); err != nil { - if !apierrors.IsNotFound(err) { - _, err = gatewayRouteHandler.smClient.SubmarinerV1().GatewayRoutes(endpoint.Namespace).Create(context.TODO(), - gatewayRouteHandler.newGatewayRoute(endpoint), metav1.CreateOptions{}) - return errors.Wrapf(err, "error updating gatewayRoute %q", endpoint.Name) - } + if result == util.OperationResultCreated { + logger.V(log.TRACE).Infof("GatewayRoute does not exist - created: %+v", gwr) + } else if result == util.OperationResultUpdated { + logger.V(log.TRACE).Infof("GatewayRoute already exists - updated %+v", gwr) + } else { + logger.V(log.TRACE).Info("GatewayRoute already exists but doesn't need updating") } } return nil } -func (gatewayRouteHandler *GatewayRouteHandler) newGatewayRoute(endpoint *submarinerv1.Endpoint) *submarinerv1.GatewayRoute { +func (h *GatewayRouteHandler) newGatewayRoute(endpoint *submarinerv1.Endpoint) *submarinerv1.GatewayRoute { return &submarinerv1.GatewayRoute{ ObjectMeta: metav1.ObjectMeta{ Name: endpoint.Name, }, RoutePolicySpec: submarinerv1.RoutePolicySpec{ RemoteCIDRs: endpoint.Spec.Subnets, - NextHops: []string{gatewayRouteHandler.nextHopIP}, + NextHops: []string{h.nextHopIP}, + }, + } +} + +func (h *GatewayRouteHandler) gatewayResourceInterface(namespace string) resource.Interface { + //nolint:wrapcheck // These functions are pass-through wrappers for the k8s APIs. + return &resource.InterfaceFuncs{ + GetFunc: func(ctx context.Context, name string, options metav1.GetOptions) (runtime.Object, error) { + return h.smClient.SubmarinerV1().GatewayRoutes(namespace).Get(ctx, name, options) + }, + CreateFunc: func(ctx context.Context, obj runtime.Object, options metav1.CreateOptions) (runtime.Object, error) { + return h.smClient.SubmarinerV1().GatewayRoutes(namespace).Create(ctx, obj.(*submarinerv1.GatewayRoute), options) + }, + UpdateFunc: func(ctx context.Context, obj runtime.Object, options metav1.UpdateOptions) (runtime.Object, error) { + return h.smClient.SubmarinerV1().GatewayRoutes(namespace).Update(ctx, obj.(*submarinerv1.GatewayRoute), options) + }, + DeleteFunc: func(ctx context.Context, name string, options metav1.DeleteOptions) error { + return h.smClient.SubmarinerV1().GatewayRoutes(namespace).Delete(ctx, name, options) }, } } diff --git a/pkg/routeagent_driver/handlers/ovn/submarinerNonGWRouteHandler.go b/pkg/routeagent_driver/handlers/ovn/submarinerNonGWRouteHandler.go index 0a37af3307..ad2fd97023 100644 --- a/pkg/routeagent_driver/handlers/ovn/submarinerNonGWRouteHandler.go +++ b/pkg/routeagent_driver/handlers/ovn/submarinerNonGWRouteHandler.go @@ -23,19 +23,20 @@ import ( "sync" "github.com/pkg/errors" + "github.com/submariner-io/admiral/pkg/log" + "github.com/submariner-io/admiral/pkg/resource" + "github.com/submariner-io/admiral/pkg/util" submarinerv1 "github.com/submariner-io/submariner/pkg/apis/submariner.io/v1" submarinerClientset "github.com/submariner-io/submariner/pkg/client/clientset/versioned" "github.com/submariner-io/submariner/pkg/event" nodeutil "github.com/submariner-io/submariner/pkg/node" + "github.com/submariner-io/submariner/pkg/routeagent_driver/constants" apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" clientset "k8s.io/client-go/kubernetes" ) -const ( - ovnTransitSwitchIPAnnotation = "k8s.ovn.org/node-transit-switch-port-ifaddr" -) - type NonGatewayRouteHandler struct { event.HandlerBase mutex sync.Mutex @@ -55,10 +56,10 @@ func NewNonGatewayRouteHandler(smClientSet submarinerClientset.Interface, k8sCli } } -func (nonGatewayRouteHandler *NonGatewayRouteHandler) Init() error { - logger.Infof("Starting NonGatewayRouteHandler") +func (h *NonGatewayRouteHandler) Init() error { + logger.Info("Starting NonGatewayRouteHandler") - node, err := nodeutil.GetLocalNode(nonGatewayRouteHandler.k8sClientSet) + node, err := nodeutil.GetLocalNode(h.k8sClientSet) if err != nil { return errors.Wrap(err, "error getting the g/w node") } @@ -66,99 +67,107 @@ func (nonGatewayRouteHandler *NonGatewayRouteHandler) Init() error { annotations := node.GetAnnotations() // TODO transitSwitchIP changes support needs to be added. - transitSwitchIP, ok := annotations[ovnTransitSwitchIPAnnotation] + transitSwitchIP, ok := annotations[constants.OvnTransitSwitchIPAnnotation] if !ok { logger.Infof("No transit switch IP configured") return nil } - nonGatewayRouteHandler.transitSwitchIP, err = jsonToIP(transitSwitchIP) - if !ok { - return errors.Wrapf(err, "error parsing the transit switch IP") - } + h.transitSwitchIP, err = jsonToIP(transitSwitchIP) - return nil + return errors.Wrapf(err, "error parsing the transit switch IP") } -func (nonGatewayRouteHandler *NonGatewayRouteHandler) GetName() string { +func (h *NonGatewayRouteHandler) GetName() string { return "submariner-nongw-route-handler" } -func (nonGatewayRouteHandler *NonGatewayRouteHandler) GetNetworkPlugins() []string { +func (h *NonGatewayRouteHandler) GetNetworkPlugins() []string { // TODO enable when we switch to new implementation // return []string{cni.OVNKubernetes} return []string{} } -func (nonGatewayRouteHandler *NonGatewayRouteHandler) RemoteEndpointCreated(endpoint *submarinerv1.Endpoint) error { - nonGatewayRouteHandler.mutex.Lock() - defer nonGatewayRouteHandler.mutex.Unlock() +func (h *NonGatewayRouteHandler) RemoteEndpointCreated(endpoint *submarinerv1.Endpoint) error { + h.mutex.Lock() + defer h.mutex.Unlock() - nonGatewayRouteHandler.remoteEndpoints[endpoint.Name] = endpoint + h.remoteEndpoints[endpoint.Name] = endpoint - if !nonGatewayRouteHandler.isGateway || nonGatewayRouteHandler.transitSwitchIP == "" { + if !h.isGateway || h.transitSwitchIP == "" { return nil } - _, err := nonGatewayRouteHandler.smClient.SubmarinerV1(). + _, err := h.smClient.SubmarinerV1(). NonGatewayRoutes(endpoint.Namespace).Create(context.TODO(), - nonGatewayRouteHandler.newNonGatewayRoute(endpoint), metav1.CreateOptions{}) - if !apierrors.IsAlreadyExists(err) { + h.newNonGatewayRoute(endpoint), metav1.CreateOptions{}) + if err != nil && !apierrors.IsAlreadyExists(err) { return errors.Wrapf(err, "error processing the remote endpoint create event for %q", endpoint.Name) } return nil } -func (nonGatewayRouteHandler *NonGatewayRouteHandler) RemoteEndpointRemoved(endpoint *submarinerv1.Endpoint) error { - nonGatewayRouteHandler.mutex.Lock() - defer nonGatewayRouteHandler.mutex.Unlock() - delete(nonGatewayRouteHandler.remoteEndpoints, endpoint.Name) +func (h *NonGatewayRouteHandler) RemoteEndpointRemoved(endpoint *submarinerv1.Endpoint) error { + h.mutex.Lock() + defer h.mutex.Unlock() + delete(h.remoteEndpoints, endpoint.Name) - if !nonGatewayRouteHandler.isGateway || nonGatewayRouteHandler.transitSwitchIP == "" { + if !h.isGateway || h.transitSwitchIP == "" { return nil } - if err := nonGatewayRouteHandler.smClient.SubmarinerV1().NonGatewayRoutes(endpoint.Namespace).Delete(context.TODO(), - endpoint.Name, metav1.DeleteOptions{}); err != nil { + if err := h.smClient.SubmarinerV1().NonGatewayRoutes(endpoint.Namespace).Delete(context.TODO(), + endpoint.Name, metav1.DeleteOptions{}); err != nil && !apierrors.IsNotFound(err) { return errors.Wrapf(err, "error deleting nonGatewayRoute %q", endpoint.Name) } return nil } -func (nonGatewayRouteHandler *NonGatewayRouteHandler) TransitionToNonGateway() error { - nonGatewayRouteHandler.mutex.Lock() - defer nonGatewayRouteHandler.mutex.Unlock() +func (h *NonGatewayRouteHandler) TransitionToNonGateway() error { + h.mutex.Lock() + defer h.mutex.Unlock() - nonGatewayRouteHandler.isGateway = false + h.isGateway = false return nil } -func (nonGatewayRouteHandler *NonGatewayRouteHandler) TransitionToGateway() error { - nonGatewayRouteHandler.mutex.Lock() - defer nonGatewayRouteHandler.mutex.Unlock() - - nonGatewayRouteHandler.isGateway = true - if nonGatewayRouteHandler.transitSwitchIP != "" { - for _, endpoint := range nonGatewayRouteHandler.remoteEndpoints { - if _, err := nonGatewayRouteHandler.smClient.SubmarinerV1().NonGatewayRoutes(endpoint.Namespace).Update(context.TODO(), - nonGatewayRouteHandler.newNonGatewayRoute(endpoint), metav1.UpdateOptions{}); err != nil { - if !apierrors.IsNotFound(err) { - _, err = nonGatewayRouteHandler.smClient.SubmarinerV1().NonGatewayRoutes(endpoint.Namespace).Create(context.TODO(), - nonGatewayRouteHandler.newNonGatewayRoute(endpoint), metav1.CreateOptions{}) - - return errors.Wrapf(err, "error updating nonGatewayRoute %q", endpoint.Name) - } - } +func (h *NonGatewayRouteHandler) TransitionToGateway() error { + h.mutex.Lock() + defer h.mutex.Unlock() + + h.isGateway = true + + if h.transitSwitchIP == "" { + return nil + } + + for _, endpoint := range h.remoteEndpoints { + ngwr := h.newNonGatewayRoute(endpoint) + result, err := util.CreateOrUpdate(context.TODO(), h.nonGatewayResourceInterface(endpoint.Namespace), ngwr, + func(existing runtime.Object) (runtime.Object, error) { + existingNgwr := ngwr + return existingNgwr, nil + }) + if err != nil { + return errors.Wrapf(err, "error creating/updating NonGatewayRoute") + } + + if result == util.OperationResultCreated { + logger.V(log.TRACE).Infof("NonGatewayRoute does not exist - created: %+v", ngwr) + } else if result == util.OperationResultUpdated { + logger.V(log.TRACE).Infof("NonGatewayRoute already exists - updated %+v", ngwr) + } else { + logger.V(log.TRACE).Info("NonGatewayRoute already exists but doesn't need updating") } } return nil } -func (nonGatewayRouteHandler *NonGatewayRouteHandler) newNonGatewayRoute(endpoint *submarinerv1.Endpoint) *submarinerv1.NonGatewayRoute { +func (h *NonGatewayRouteHandler) newNonGatewayRoute(endpoint *submarinerv1.Endpoint) *submarinerv1.NonGatewayRoute { return &submarinerv1.NonGatewayRoute{ ObjectMeta: metav1.ObjectMeta{ Name: endpoint.Name, @@ -166,7 +175,25 @@ func (nonGatewayRouteHandler *NonGatewayRouteHandler) newNonGatewayRoute(endpoin }, RoutePolicySpec: submarinerv1.RoutePolicySpec{ RemoteCIDRs: endpoint.Spec.Subnets, - NextHops: []string{nonGatewayRouteHandler.transitSwitchIP}, + NextHops: []string{h.transitSwitchIP}, + }, + } +} + +func (h *NonGatewayRouteHandler) nonGatewayResourceInterface(namespace string) resource.Interface { + //nolint:wrapcheck // These functions are pass-through wrappers for the k8s APIs. + return &resource.InterfaceFuncs{ + GetFunc: func(ctx context.Context, name string, options metav1.GetOptions) (runtime.Object, error) { + return h.smClient.SubmarinerV1().NonGatewayRoutes(namespace).Get(ctx, name, options) + }, + CreateFunc: func(ctx context.Context, obj runtime.Object, options metav1.CreateOptions) (runtime.Object, error) { + return h.smClient.SubmarinerV1().NonGatewayRoutes(namespace).Create(ctx, obj.(*submarinerv1.NonGatewayRoute), options) + }, + UpdateFunc: func(ctx context.Context, obj runtime.Object, options metav1.UpdateOptions) (runtime.Object, error) { + return h.smClient.SubmarinerV1().NonGatewayRoutes(namespace).Update(ctx, obj.(*submarinerv1.NonGatewayRoute), options) + }, + DeleteFunc: func(ctx context.Context, name string, options metav1.DeleteOptions) error { + return h.smClient.SubmarinerV1().NonGatewayRoutes(namespace).Delete(ctx, name, options) }, } } diff --git a/pkg/routeagent_driver/handlers/ovn/utils.go b/pkg/routeagent_driver/handlers/ovn/utils.go index 53aebfc9f0..3faeab1134 100644 --- a/pkg/routeagent_driver/handlers/ovn/utils.go +++ b/pkg/routeagent_driver/handlers/ovn/utils.go @@ -24,51 +24,38 @@ import ( "net" "github.com/pkg/errors" + subMNetLink "github.com/submariner-io/submariner/pkg/netlink" "github.com/vishvananda/netlink" ) func getNextHopOnK8sMgmtIntf() (string, error) { - link, err := netlink.LinkByName(OVNK8sMgmntIntfName) + netLink := subMNetLink.New() + + link, err := netLink.LinkByName(OVNK8sMgmntIntfName) if err != nil { return "", errors.Wrapf(err, "failed to retrieve link by name") } - addrs, err := netlink.AddrList(link, 0) + addrs, err := netlink.AddrList(link, netlink.FAMILY_V4) if err != nil || len(addrs) == 0 { return "", errors.Wrapf(err, "failed to retrieve addresses for link") } for _, addr := range addrs { if addr.IPNet != nil { - ok, err := isIPv4CIDR(addr.IPNet.String()) - if err != nil { - return "", err - } else if ok { - return addr.IPNet.IP.String(), nil - } + return addr.IPNet.IP.String(), nil } } return "", nil } -func isIPv4CIDR(address string) (bool, error) { - _, iPnet, err := net.ParseCIDR(address) - if err != nil { - return false, errors.Wrapf(err, "Error parsing IP address %v", iPnet) - } - - ip := iPnet.IP - - return ip != nil && ip.To4() != nil, nil -} - func jsonToIP(jsonData string) (string, error) { var data map[string]string err := json.Unmarshal([]byte(jsonData), &data) if err != nil { - return "", errors.Wrapf(err, "error marshalling the json ip") + return "", errors.Wrapf(err, "error unmarshalling the json ip") } ipStr, found := data["ipv4"] diff --git a/pkg/routeagent_driver/main.go b/pkg/routeagent_driver/main.go index af81b9f2d5..98f9fd10ce 100644 --- a/pkg/routeagent_driver/main.go +++ b/pkg/routeagent_driver/main.go @@ -104,6 +104,7 @@ func main() { kubeproxy.NewSyncHandler(env.ClusterCidr, env.ServiceCidr), ovn.NewHandler(&env, smClientset), ovn.NewGatewayRouteHandler(&env, smClientset), + ovn.NewNonGatewayRouteHandler(smClientset, k8sClientSet), cabledriver.NewXRFMCleanupHandler(), cabledriver.NewVXLANCleanup(), mtu.NewMTUHandler(env.ClusterCidr, len(env.GlobalCidr) != 0, getTCPMssValue(k8sClientSet)),