Skip to content

Commit

Permalink
Change handler names and fix issues
Browse files Browse the repository at this point in the history
Signed-off-by: Aswin Suryanarayanan <[email protected]>
  • Loading branch information
aswinsuryan committed Jul 25, 2023
1 parent aa25392 commit 9ea4bf3
Show file tree
Hide file tree
Showing 5 changed files with 143 additions and 110 deletions.
2 changes: 2 additions & 0 deletions pkg/routeagent_driver/constants/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,4 +53,6 @@ const (

NATTable = "nat"
FilterTable = "filter"

OvnTransitSwitchIPAnnotation = "k8s.ovn.org/node-transit-switch-port-ifaddr"
)
100 changes: 62 additions & 38 deletions pkg/routeagent_driver/handlers/ovn/submarinerGWRouteHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,16 @@ import (
"sync"

"github.com/pkg/errors"
"github.com/submariner-io/admiral/pkg/log"
"github.com/submariner-io/admiral/pkg/resource"
"github.com/submariner-io/admiral/pkg/util"
submarinerv1 "github.com/submariner-io/submariner/pkg/apis/submariner.io/v1"
submarinerClientset "github.com/submariner-io/submariner/pkg/client/clientset/versioned"
"github.com/submariner-io/submariner/pkg/event"
"github.com/submariner-io/submariner/pkg/routeagent_driver/environment"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
)

type GatewayRouteHandler struct {
Expand All @@ -50,99 +54,119 @@ func NewGatewayRouteHandler(env *environment.Specification, smClientSet submarin
}
}

func (gatewayRouteHandler *GatewayRouteHandler) Init() error {
logger.Infof("Starting GatewayRouteHandler")
func (h *GatewayRouteHandler) Init() error {
logger.Info("Starting GatewayRouteHandler")

nextHopIP, err := getNextHopOnK8sMgmtIntf()
if err != nil || nextHopIP == "" {
return errors.Wrapf(err, "error getting the ovn kubernetes management interface IP")
}

gatewayRouteHandler.nextHopIP = nextHopIP
h.nextHopIP = nextHopIP

return nil
}

func (gatewayRouteHandler *GatewayRouteHandler) GetName() string {
func (h *GatewayRouteHandler) GetName() string {
return "submariner-gw-route-handler"
}

func (gatewayRouteHandler *GatewayRouteHandler) GetNetworkPlugins() []string {
func (h *GatewayRouteHandler) GetNetworkPlugins() []string {
// TODO enable when we switch to new implementation
// return []string{cni.OVNKubernetes}
return []string{}
}

func (gatewayRouteHandler *GatewayRouteHandler) RemoteEndpointCreated(endpoint *submarinerv1.Endpoint) error {
gatewayRouteHandler.mutex.Lock()
defer gatewayRouteHandler.mutex.Unlock()
func (h *GatewayRouteHandler) RemoteEndpointCreated(endpoint *submarinerv1.Endpoint) error {
h.mutex.Lock()
defer h.mutex.Unlock()

gatewayRouteHandler.remoteEndpoints[endpoint.Name] = endpoint
h.remoteEndpoints[endpoint.Name] = endpoint

if gatewayRouteHandler.isGateway {
_, err := gatewayRouteHandler.smClient.SubmarinerV1().GatewayRoutes(endpoint.Namespace).Create(context.TODO(),
gatewayRouteHandler.newGatewayRoute(endpoint), metav1.CreateOptions{})
if !apierrors.IsAlreadyExists(err) {
if h.isGateway {
_, err := h.smClient.SubmarinerV1().GatewayRoutes(endpoint.Namespace).Create(context.TODO(),
h.newGatewayRoute(endpoint), metav1.CreateOptions{})
if err != nil && !apierrors.IsAlreadyExists(err) {
return errors.Wrapf(err, "error processing the remote endpoint creation for %q", endpoint.Name)
}
}

return nil
}

func (gatewayRouteHandler *GatewayRouteHandler) RemoteEndpointRemoved(endpoint *submarinerv1.Endpoint) error {
gatewayRouteHandler.mutex.Lock()
defer gatewayRouteHandler.mutex.Unlock()
func (h *GatewayRouteHandler) RemoteEndpointRemoved(endpoint *submarinerv1.Endpoint) error {
h.mutex.Lock()
defer h.mutex.Unlock()

delete(gatewayRouteHandler.remoteEndpoints, endpoint.Name)
delete(h.remoteEndpoints, endpoint.Name)

if gatewayRouteHandler.isGateway {
if err := gatewayRouteHandler.smClient.SubmarinerV1().GatewayRoutes(endpoint.Namespace).Delete(context.TODO(),
endpoint.Name, metav1.DeleteOptions{}); err != nil {
if h.isGateway {
if err := h.smClient.SubmarinerV1().GatewayRoutes(endpoint.Namespace).Delete(context.TODO(),
endpoint.Name, metav1.DeleteOptions{}); err != nil && !apierrors.IsNotFound(err) {
return errors.Wrapf(err, "error deleting gatewayRoute %q", endpoint.Name)
}
}

return nil
}

func (gatewayRouteHandler *GatewayRouteHandler) TransitionToNonGateway() error {
gatewayRouteHandler.mutex.Lock()
defer gatewayRouteHandler.mutex.Unlock()
func (h *GatewayRouteHandler) TransitionToNonGateway() error {
h.mutex.Lock()
defer h.mutex.Unlock()

gatewayRouteHandler.isGateway = false
h.isGateway = false

return nil
}

func (gatewayRouteHandler *GatewayRouteHandler) TransitionToGateway() error {
gatewayRouteHandler.mutex.Lock()
defer gatewayRouteHandler.mutex.Unlock()
func (h *GatewayRouteHandler) TransitionToGateway() error {
h.mutex.Lock()
defer h.mutex.Unlock()

gatewayRouteHandler.isGateway = true
h.isGateway = true

for _, endpoint := range gatewayRouteHandler.remoteEndpoints {
if _, err := gatewayRouteHandler.smClient.SubmarinerV1().GatewayRoutes(endpoint.Namespace).Update(context.TODO(),
gatewayRouteHandler.newGatewayRoute(endpoint), metav1.UpdateOptions{}); err != nil {
if !apierrors.IsNotFound(err) {
_, err = gatewayRouteHandler.smClient.SubmarinerV1().GatewayRoutes(endpoint.Namespace).Create(context.TODO(),
gatewayRouteHandler.newGatewayRoute(endpoint), metav1.CreateOptions{})
return errors.Wrapf(err, "error updating gatewayRoute %q", endpoint.Name)
}
for _, endpoint := range h.remoteEndpoints {
gwr := h.newGatewayRoute(endpoint)

result, err := util.CreateOrUpdate(context.TODO(), h.gatewayResourceInterface(endpoint.Namespace),
gwr, util.Replace(gwr))
if err != nil {
return errors.Wrapf(err, "error creating/updating GatewayRoute")
}

logger.V(log.TRACE).Infof("GatewayRoute %s: %#v", result, gwr)
}

return nil
}

func (gatewayRouteHandler *GatewayRouteHandler) newGatewayRoute(endpoint *submarinerv1.Endpoint) *submarinerv1.GatewayRoute {
func (h *GatewayRouteHandler) newGatewayRoute(endpoint *submarinerv1.Endpoint) *submarinerv1.GatewayRoute {
return &submarinerv1.GatewayRoute{
ObjectMeta: metav1.ObjectMeta{
Name: endpoint.Name,
},
RoutePolicySpec: submarinerv1.RoutePolicySpec{
RemoteCIDRs: endpoint.Spec.Subnets,
NextHops: []string{gatewayRouteHandler.nextHopIP},
NextHops: []string{h.nextHopIP},
},
}
}

//nolint // These functions are pass-through wrappers for the k8s APIs.
func (h *GatewayRouteHandler) gatewayResourceInterface(namespace string) resource.Interface {

return &resource.InterfaceFuncs{
GetFunc: func(ctx context.Context, name string, options metav1.GetOptions) (runtime.Object, error) {
return h.smClient.SubmarinerV1().GatewayRoutes(namespace).Get(ctx, name, options)
},
CreateFunc: func(ctx context.Context, obj runtime.Object, options metav1.CreateOptions) (runtime.Object, error) {
return h.smClient.SubmarinerV1().GatewayRoutes(namespace).Create(ctx, obj.(*submarinerv1.GatewayRoute), options)
},
UpdateFunc: func(ctx context.Context, obj runtime.Object, options metav1.UpdateOptions) (runtime.Object, error) {
return h.smClient.SubmarinerV1().GatewayRoutes(namespace).Update(ctx, obj.(*submarinerv1.GatewayRoute), options)
},
DeleteFunc: func(ctx context.Context, name string, options metav1.DeleteOptions) error {
return h.smClient.SubmarinerV1().GatewayRoutes(namespace).Delete(ctx, name, options)
},
}
}
123 changes: 71 additions & 52 deletions pkg/routeagent_driver/handlers/ovn/submarinerNonGWRouteHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,19 +23,20 @@ import (
"sync"

"github.com/pkg/errors"
"github.com/submariner-io/admiral/pkg/log"
"github.com/submariner-io/admiral/pkg/resource"
"github.com/submariner-io/admiral/pkg/util"
submarinerv1 "github.com/submariner-io/submariner/pkg/apis/submariner.io/v1"
submarinerClientset "github.com/submariner-io/submariner/pkg/client/clientset/versioned"
"github.com/submariner-io/submariner/pkg/event"
nodeutil "github.com/submariner-io/submariner/pkg/node"
"github.com/submariner-io/submariner/pkg/routeagent_driver/constants"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
clientset "k8s.io/client-go/kubernetes"
)

const (
ovnTransitSwitchIPAnnotation = "k8s.ovn.org/node-transit-switch-port-ifaddr"
)

type NonGatewayRouteHandler struct {
event.HandlerBase
mutex sync.Mutex
Expand All @@ -55,118 +56,136 @@ func NewNonGatewayRouteHandler(smClientSet submarinerClientset.Interface, k8sCli
}
}

func (nonGatewayRouteHandler *NonGatewayRouteHandler) Init() error {
logger.Infof("Starting NonGatewayRouteHandler")
func (h *NonGatewayRouteHandler) Init() error {
logger.Info("Starting NonGatewayRouteHandler")

node, err := nodeutil.GetLocalNode(nonGatewayRouteHandler.k8sClientSet)
node, err := nodeutil.GetLocalNode(h.k8sClientSet)
if err != nil {
return errors.Wrap(err, "error getting the g/w node")
}

annotations := node.GetAnnotations()

// TODO transitSwitchIP changes support needs to be added.
transitSwitchIP, ok := annotations[ovnTransitSwitchIPAnnotation]
transitSwitchIP, ok := annotations[constants.OvnTransitSwitchIPAnnotation]
if !ok {
logger.Infof("No transit switch IP configured")
return nil
}

nonGatewayRouteHandler.transitSwitchIP, err = jsonToIP(transitSwitchIP)
if !ok {
return errors.Wrapf(err, "error parsing the transit switch IP")
}
h.transitSwitchIP, err = jsonToIP(transitSwitchIP)

return nil
return errors.Wrapf(err, "error parsing the transit switch IP")
}

func (nonGatewayRouteHandler *NonGatewayRouteHandler) GetName() string {
func (h *NonGatewayRouteHandler) GetName() string {
return "submariner-nongw-route-handler"
}

func (nonGatewayRouteHandler *NonGatewayRouteHandler) GetNetworkPlugins() []string {
func (h *NonGatewayRouteHandler) GetNetworkPlugins() []string {
// TODO enable when we switch to new implementation
// return []string{cni.OVNKubernetes}
return []string{}
}

func (nonGatewayRouteHandler *NonGatewayRouteHandler) RemoteEndpointCreated(endpoint *submarinerv1.Endpoint) error {
nonGatewayRouteHandler.mutex.Lock()
defer nonGatewayRouteHandler.mutex.Unlock()
func (h *NonGatewayRouteHandler) RemoteEndpointCreated(endpoint *submarinerv1.Endpoint) error {
h.mutex.Lock()
defer h.mutex.Unlock()

nonGatewayRouteHandler.remoteEndpoints[endpoint.Name] = endpoint
h.remoteEndpoints[endpoint.Name] = endpoint

if !nonGatewayRouteHandler.isGateway || nonGatewayRouteHandler.transitSwitchIP == "" {
if !h.isGateway || h.transitSwitchIP == "" {
return nil
}

_, err := nonGatewayRouteHandler.smClient.SubmarinerV1().
_, err := h.smClient.SubmarinerV1().
NonGatewayRoutes(endpoint.Namespace).Create(context.TODO(),
nonGatewayRouteHandler.newNonGatewayRoute(endpoint), metav1.CreateOptions{})
if !apierrors.IsAlreadyExists(err) {
h.newNonGatewayRoute(endpoint), metav1.CreateOptions{})
if err != nil && !apierrors.IsAlreadyExists(err) {
return errors.Wrapf(err, "error processing the remote endpoint create event for %q", endpoint.Name)
}

return nil
}

func (nonGatewayRouteHandler *NonGatewayRouteHandler) RemoteEndpointRemoved(endpoint *submarinerv1.Endpoint) error {
nonGatewayRouteHandler.mutex.Lock()
defer nonGatewayRouteHandler.mutex.Unlock()
delete(nonGatewayRouteHandler.remoteEndpoints, endpoint.Name)
func (h *NonGatewayRouteHandler) RemoteEndpointRemoved(endpoint *submarinerv1.Endpoint) error {
h.mutex.Lock()
defer h.mutex.Unlock()
delete(h.remoteEndpoints, endpoint.Name)

if !nonGatewayRouteHandler.isGateway || nonGatewayRouteHandler.transitSwitchIP == "" {
if !h.isGateway || h.transitSwitchIP == "" {
return nil
}

if err := nonGatewayRouteHandler.smClient.SubmarinerV1().NonGatewayRoutes(endpoint.Namespace).Delete(context.TODO(),
endpoint.Name, metav1.DeleteOptions{}); err != nil {
if err := h.smClient.SubmarinerV1().NonGatewayRoutes(endpoint.Namespace).Delete(context.TODO(),
endpoint.Name, metav1.DeleteOptions{}); err != nil && !apierrors.IsNotFound(err) {
return errors.Wrapf(err, "error deleting nonGatewayRoute %q", endpoint.Name)
}

return nil
}

func (nonGatewayRouteHandler *NonGatewayRouteHandler) TransitionToNonGateway() error {
nonGatewayRouteHandler.mutex.Lock()
defer nonGatewayRouteHandler.mutex.Unlock()
func (h *NonGatewayRouteHandler) TransitionToNonGateway() error {
h.mutex.Lock()
defer h.mutex.Unlock()

nonGatewayRouteHandler.isGateway = false
h.isGateway = false

return nil
}

func (nonGatewayRouteHandler *NonGatewayRouteHandler) TransitionToGateway() error {
nonGatewayRouteHandler.mutex.Lock()
defer nonGatewayRouteHandler.mutex.Unlock()

nonGatewayRouteHandler.isGateway = true
if nonGatewayRouteHandler.transitSwitchIP != "" {
for _, endpoint := range nonGatewayRouteHandler.remoteEndpoints {
if _, err := nonGatewayRouteHandler.smClient.SubmarinerV1().NonGatewayRoutes(endpoint.Namespace).Update(context.TODO(),
nonGatewayRouteHandler.newNonGatewayRoute(endpoint), metav1.UpdateOptions{}); err != nil {
if !apierrors.IsNotFound(err) {
_, err = nonGatewayRouteHandler.smClient.SubmarinerV1().NonGatewayRoutes(endpoint.Namespace).Create(context.TODO(),
nonGatewayRouteHandler.newNonGatewayRoute(endpoint), metav1.CreateOptions{})

return errors.Wrapf(err, "error updating nonGatewayRoute %q", endpoint.Name)
}
}
func (h *NonGatewayRouteHandler) TransitionToGateway() error {
h.mutex.Lock()
defer h.mutex.Unlock()

h.isGateway = true

if h.transitSwitchIP == "" {
return nil
}

for _, endpoint := range h.remoteEndpoints {
ngwr := h.newNonGatewayRoute(endpoint)

result, err := util.CreateOrUpdate(context.TODO(), h.nonGatewayResourceInterface(endpoint.Namespace),
ngwr, util.Replace(ngwr))
if err != nil {
return errors.Wrapf(err, "error creating/updating NonGatewayRoute")
}

logger.V(log.TRACE).Infof("GatewayRoute %s: %#v", result, ngwr)
}

return nil
}

func (nonGatewayRouteHandler *NonGatewayRouteHandler) newNonGatewayRoute(endpoint *submarinerv1.Endpoint) *submarinerv1.NonGatewayRoute {
func (h *NonGatewayRouteHandler) newNonGatewayRoute(endpoint *submarinerv1.Endpoint) *submarinerv1.NonGatewayRoute {
return &submarinerv1.NonGatewayRoute{
ObjectMeta: metav1.ObjectMeta{
Name: endpoint.Name,
Namespace: endpoint.Namespace,
},
RoutePolicySpec: submarinerv1.RoutePolicySpec{
RemoteCIDRs: endpoint.Spec.Subnets,
NextHops: []string{nonGatewayRouteHandler.transitSwitchIP},
NextHops: []string{h.transitSwitchIP},
},
}
}

//nolint // These functions are pass-through wrappers for the k8s APIs.
func (h *NonGatewayRouteHandler) nonGatewayResourceInterface(namespace string) resource.Interface {
return &resource.InterfaceFuncs{
GetFunc: func(ctx context.Context, name string, options metav1.GetOptions) (runtime.Object, error) {
return h.smClient.SubmarinerV1().NonGatewayRoutes(namespace).Get(ctx, name, options)
},
CreateFunc: func(ctx context.Context, obj runtime.Object, options metav1.CreateOptions) (runtime.Object, error) {
return h.smClient.SubmarinerV1().NonGatewayRoutes(namespace).Create(ctx, obj.(*submarinerv1.NonGatewayRoute), options)
},
UpdateFunc: func(ctx context.Context, obj runtime.Object, options metav1.UpdateOptions) (runtime.Object, error) {
return h.smClient.SubmarinerV1().NonGatewayRoutes(namespace).Update(ctx, obj.(*submarinerv1.NonGatewayRoute), options)
},
DeleteFunc: func(ctx context.Context, name string, options metav1.DeleteOptions) error {
return h.smClient.SubmarinerV1().NonGatewayRoutes(namespace).Delete(ctx, name, options)
},
}
}
Loading

0 comments on commit 9ea4bf3

Please sign in to comment.