Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add handlers to create GatewayRoute and NonGatewayRoute CRs #2569

Merged
merged 7 commits into from
Jul 29, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions pkg/routeagent_driver/constants/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,4 +53,6 @@ const (

NATTable = "nat"
FilterTable = "filter"

OvnTransitSwitchIPAnnotation = "k8s.ovn.org/node-transit-switch-port-ifaddr"
)
172 changes: 172 additions & 0 deletions pkg/routeagent_driver/handlers/ovn/gateway_route_handler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,172 @@
/*
SPDX-License-Identifier: Apache-2.0

Copyright Contributors to the Submariner project.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package ovn

import (
"context"
"sync"

"github.com/pkg/errors"
"github.com/submariner-io/admiral/pkg/log"
"github.com/submariner-io/admiral/pkg/resource"
"github.com/submariner-io/admiral/pkg/util"
submarinerv1 "github.com/submariner-io/submariner/pkg/apis/submariner.io/v1"
submarinerClientset "github.com/submariner-io/submariner/pkg/client/clientset/versioned"
"github.com/submariner-io/submariner/pkg/event"
"github.com/submariner-io/submariner/pkg/routeagent_driver/environment"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
)

type GatewayRouteHandler struct {
event.HandlerBase
mutex sync.Mutex
smClient submarinerClientset.Interface
config *environment.Specification
remoteEndpoints map[string]*submarinerv1.Endpoint
isGateway bool
nextHopIP string
}

func NewGatewayRouteHandler(env *environment.Specification, smClientSet submarinerClientset.Interface) *GatewayRouteHandler {
// We'll panic if env is nil, this is intentional
return &GatewayRouteHandler{
config: env,
smClient: smClientSet,
remoteEndpoints: map[string]*submarinerv1.Endpoint{},
}
}

func (h *GatewayRouteHandler) Init() error {
logger.Info("Starting GatewayRouteHandler")

nextHopIP, err := getNextHopOnK8sMgmtIntf()
if err != nil || nextHopIP == "" {
return errors.Wrapf(err, "error getting the ovn kubernetes management interface IP")
}

h.nextHopIP = nextHopIP

return nil
}

func (h *GatewayRouteHandler) GetName() string {
return "submariner-gw-route-handler"
}

func (h *GatewayRouteHandler) GetNetworkPlugins() []string {
// TODO enable when we switch to new implementation
// return []string{cni.OVNKubernetes}
return []string{}
}

func (h *GatewayRouteHandler) RemoteEndpointCreated(endpoint *submarinerv1.Endpoint) error {
h.mutex.Lock()
defer h.mutex.Unlock()

h.remoteEndpoints[endpoint.Name] = endpoint

if h.isGateway {
_, err := h.smClient.SubmarinerV1().GatewayRoutes(endpoint.Namespace).Create(context.TODO(),
h.newGatewayRoute(endpoint), metav1.CreateOptions{})
if err != nil && !apierrors.IsAlreadyExists(err) {
return errors.Wrapf(err, "error processing the remote endpoint creation for %q", endpoint.Name)
}
}

return nil
}

func (h *GatewayRouteHandler) RemoteEndpointRemoved(endpoint *submarinerv1.Endpoint) error {
h.mutex.Lock()
defer h.mutex.Unlock()

delete(h.remoteEndpoints, endpoint.Name)

if h.isGateway {
if err := h.smClient.SubmarinerV1().GatewayRoutes(endpoint.Namespace).Delete(context.TODO(),
endpoint.Name, metav1.DeleteOptions{}); err != nil && !apierrors.IsNotFound(err) {
return errors.Wrapf(err, "error deleting gatewayRoute %q", endpoint.Name)
}
}

return nil
}

func (h *GatewayRouteHandler) TransitionToNonGateway() error {
h.mutex.Lock()
defer h.mutex.Unlock()

h.isGateway = false

return nil
}

func (h *GatewayRouteHandler) TransitionToGateway() error {
h.mutex.Lock()
defer h.mutex.Unlock()

h.isGateway = true

for _, endpoint := range h.remoteEndpoints {
gwr := h.newGatewayRoute(endpoint)

result, err := util.CreateOrUpdate(context.TODO(), h.gatewayResourceInterface(endpoint.Namespace),
gwr, util.Replace(gwr))
if err != nil {
return errors.Wrapf(err, "error creating/updating GatewayRoute")
}

logger.V(log.TRACE).Infof("GatewayRoute %s: %#v", result, gwr)
}

return nil
}

func (h *GatewayRouteHandler) newGatewayRoute(endpoint *submarinerv1.Endpoint) *submarinerv1.GatewayRoute {
return &submarinerv1.GatewayRoute{
ObjectMeta: metav1.ObjectMeta{
Name: endpoint.Name,
},
RoutePolicySpec: submarinerv1.RoutePolicySpec{
RemoteCIDRs: endpoint.Spec.Subnets,
NextHops: []string{h.nextHopIP},
},
}
}

//nolint // These functions are pass-through wrappers for the k8s APIs.
func (h *GatewayRouteHandler) gatewayResourceInterface(namespace string) resource.Interface {

return &resource.InterfaceFuncs{
GetFunc: func(ctx context.Context, name string, options metav1.GetOptions) (runtime.Object, error) {
return h.smClient.SubmarinerV1().GatewayRoutes(namespace).Get(ctx, name, options)
},
CreateFunc: func(ctx context.Context, obj runtime.Object, options metav1.CreateOptions) (runtime.Object, error) {
return h.smClient.SubmarinerV1().GatewayRoutes(namespace).Create(ctx, obj.(*submarinerv1.GatewayRoute), options)
},
UpdateFunc: func(ctx context.Context, obj runtime.Object, options metav1.UpdateOptions) (runtime.Object, error) {
return h.smClient.SubmarinerV1().GatewayRoutes(namespace).Update(ctx, obj.(*submarinerv1.GatewayRoute), options)
},
DeleteFunc: func(ctx context.Context, name string, options metav1.DeleteOptions) error {
return h.smClient.SubmarinerV1().GatewayRoutes(namespace).Delete(ctx, name, options)
},
}
}
190 changes: 190 additions & 0 deletions pkg/routeagent_driver/handlers/ovn/non_gateway_route_handler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,190 @@
/*
SPDX-License-Identifier: Apache-2.0

Copyright Contributors to the Submariner project.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package ovn

import (
"context"
"sync"

"github.com/pkg/errors"
"github.com/submariner-io/admiral/pkg/log"
"github.com/submariner-io/admiral/pkg/resource"
"github.com/submariner-io/admiral/pkg/util"
submarinerv1 "github.com/submariner-io/submariner/pkg/apis/submariner.io/v1"
submarinerClientset "github.com/submariner-io/submariner/pkg/client/clientset/versioned"
"github.com/submariner-io/submariner/pkg/event"
nodeutil "github.com/submariner-io/submariner/pkg/node"
"github.com/submariner-io/submariner/pkg/routeagent_driver/constants"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
clientset "k8s.io/client-go/kubernetes"
)

type NonGatewayRouteHandler struct {
event.HandlerBase
mutex sync.Mutex
smClient submarinerClientset.Interface
remoteEndpoints map[string]*submarinerv1.Endpoint
k8sClientSet clientset.Interface
isGateway bool
transitSwitchIP string
}

func NewNonGatewayRouteHandler(smClientSet submarinerClientset.Interface, k8sClientset *clientset.Clientset) *NonGatewayRouteHandler {
return &NonGatewayRouteHandler{
smClient: smClientSet,
remoteEndpoints: map[string]*submarinerv1.Endpoint{},
k8sClientSet: k8sClientset,
}
}

func (h *NonGatewayRouteHandler) Init() error {
logger.Info("Starting NonGatewayRouteHandler")

node, err := nodeutil.GetLocalNode(h.k8sClientSet)
if err != nil {
return errors.Wrap(err, "error getting the g/w node")
}

annotations := node.GetAnnotations()

// TODO transitSwitchIP changes support needs to be added.
transitSwitchIP, ok := annotations[constants.OvnTransitSwitchIPAnnotation]
if !ok {
logger.Infof("No transit switch IP configured")
return nil
}

h.transitSwitchIP, err = jsonToIP(transitSwitchIP)

return errors.Wrapf(err, "error parsing the transit switch IP")
}

func (h *NonGatewayRouteHandler) GetName() string {
return "submariner-nongw-route-handler"
}

func (h *NonGatewayRouteHandler) GetNetworkPlugins() []string {
// TODO enable when we switch to new implementation
// return []string{cni.OVNKubernetes}
return []string{}
}

func (h *NonGatewayRouteHandler) RemoteEndpointCreated(endpoint *submarinerv1.Endpoint) error {
h.mutex.Lock()
defer h.mutex.Unlock()

h.remoteEndpoints[endpoint.Name] = endpoint

if !h.isGateway || h.transitSwitchIP == "" {
return nil
}

_, err := h.smClient.SubmarinerV1().
NonGatewayRoutes(endpoint.Namespace).Create(context.TODO(),
h.newNonGatewayRoute(endpoint), metav1.CreateOptions{})
if err != nil && !apierrors.IsAlreadyExists(err) {
return errors.Wrapf(err, "error processing the remote endpoint create event for %q", endpoint.Name)
}

return nil
}

func (h *NonGatewayRouteHandler) RemoteEndpointRemoved(endpoint *submarinerv1.Endpoint) error {
h.mutex.Lock()
defer h.mutex.Unlock()
delete(h.remoteEndpoints, endpoint.Name)

if !h.isGateway || h.transitSwitchIP == "" {
return nil
}

if err := h.smClient.SubmarinerV1().NonGatewayRoutes(endpoint.Namespace).Delete(context.TODO(),
endpoint.Name, metav1.DeleteOptions{}); err != nil && !apierrors.IsNotFound(err) {
return errors.Wrapf(err, "error deleting nonGatewayRoute %q", endpoint.Name)
}

return nil
}

func (h *NonGatewayRouteHandler) TransitionToNonGateway() error {
h.mutex.Lock()
defer h.mutex.Unlock()

h.isGateway = false

return nil
}

func (h *NonGatewayRouteHandler) TransitionToGateway() error {
h.mutex.Lock()
defer h.mutex.Unlock()

h.isGateway = true

if h.transitSwitchIP == "" {
return nil
}

for _, endpoint := range h.remoteEndpoints {
ngwr := h.newNonGatewayRoute(endpoint)

result, err := util.CreateOrUpdate(context.TODO(), h.nonGatewayResourceInterface(endpoint.Namespace),
ngwr, util.Replace(ngwr))
if err != nil {
return errors.Wrapf(err, "error creating/updating NonGatewayRoute")
}

logger.V(log.TRACE).Infof("NonGatewayRoute %s: %#v", result, ngwr)
}

return nil
}

func (h *NonGatewayRouteHandler) newNonGatewayRoute(endpoint *submarinerv1.Endpoint) *submarinerv1.NonGatewayRoute {
return &submarinerv1.NonGatewayRoute{
ObjectMeta: metav1.ObjectMeta{
Name: endpoint.Name,
Namespace: endpoint.Namespace,
},
RoutePolicySpec: submarinerv1.RoutePolicySpec{
RemoteCIDRs: endpoint.Spec.Subnets,
NextHops: []string{h.transitSwitchIP},
},
}
}

//nolint // These functions are pass-through wrappers for the k8s APIs.
func (h *NonGatewayRouteHandler) nonGatewayResourceInterface(namespace string) resource.Interface {
return &resource.InterfaceFuncs{
GetFunc: func(ctx context.Context, name string, options metav1.GetOptions) (runtime.Object, error) {
return h.smClient.SubmarinerV1().NonGatewayRoutes(namespace).Get(ctx, name, options)
},
CreateFunc: func(ctx context.Context, obj runtime.Object, options metav1.CreateOptions) (runtime.Object, error) {
return h.smClient.SubmarinerV1().NonGatewayRoutes(namespace).Create(ctx, obj.(*submarinerv1.NonGatewayRoute), options)
},
UpdateFunc: func(ctx context.Context, obj runtime.Object, options metav1.UpdateOptions) (runtime.Object, error) {
return h.smClient.SubmarinerV1().NonGatewayRoutes(namespace).Update(ctx, obj.(*submarinerv1.NonGatewayRoute), options)
},
DeleteFunc: func(ctx context.Context, name string, options metav1.DeleteOptions) error {
return h.smClient.SubmarinerV1().NonGatewayRoutes(namespace).Delete(ctx, name, options)
},
}
}
Loading
Loading