Skip to content

Commit

Permalink
Change handler names and fix issues
Browse files Browse the repository at this point in the history
Signed-off-by: Aswin Suryanarayanan <[email protected]>
  • Loading branch information
aswinsuryan committed Jul 25, 2023
1 parent aa25392 commit 5c6ff97
Show file tree
Hide file tree
Showing 5 changed files with 159 additions and 111 deletions.
2 changes: 2 additions & 0 deletions pkg/routeagent_driver/constants/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,4 +53,6 @@ const (

NATTable = "nat"
FilterTable = "filter"

OvnTransitSwitchIPAnnotation = "k8s.ovn.org/node-transit-switch-port-ifaddr"
)
109 changes: 70 additions & 39 deletions pkg/routeagent_driver/handlers/ovn/submarinerGWRouteHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,16 @@ import (
"sync"

"github.com/pkg/errors"
"github.com/submariner-io/admiral/pkg/log"
"github.com/submariner-io/admiral/pkg/resource"
"github.com/submariner-io/admiral/pkg/util"
submarinerv1 "github.com/submariner-io/submariner/pkg/apis/submariner.io/v1"
submarinerClientset "github.com/submariner-io/submariner/pkg/client/clientset/versioned"
"github.com/submariner-io/submariner/pkg/event"
"github.com/submariner-io/submariner/pkg/routeagent_driver/environment"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
)

type GatewayRouteHandler struct {
Expand All @@ -50,99 +54,126 @@ func NewGatewayRouteHandler(env *environment.Specification, smClientSet submarin
}
}

func (gatewayRouteHandler *GatewayRouteHandler) Init() error {
logger.Infof("Starting GatewayRouteHandler")
func (h *GatewayRouteHandler) Init() error {
logger.Info("Starting GatewayRouteHandler")

nextHopIP, err := getNextHopOnK8sMgmtIntf()
if err != nil || nextHopIP == "" {
return errors.Wrapf(err, "error getting the ovn kubernetes management interface IP")
}

gatewayRouteHandler.nextHopIP = nextHopIP
h.nextHopIP = nextHopIP

return nil
}

func (gatewayRouteHandler *GatewayRouteHandler) GetName() string {
func (h *GatewayRouteHandler) GetName() string {
return "submariner-gw-route-handler"
}

func (gatewayRouteHandler *GatewayRouteHandler) GetNetworkPlugins() []string {
func (h *GatewayRouteHandler) GetNetworkPlugins() []string {
// TODO enable when we switch to new implementation
// return []string{cni.OVNKubernetes}
return []string{}
}

func (gatewayRouteHandler *GatewayRouteHandler) RemoteEndpointCreated(endpoint *submarinerv1.Endpoint) error {
gatewayRouteHandler.mutex.Lock()
defer gatewayRouteHandler.mutex.Unlock()
func (h *GatewayRouteHandler) RemoteEndpointCreated(endpoint *submarinerv1.Endpoint) error {
h.mutex.Lock()
defer h.mutex.Unlock()

gatewayRouteHandler.remoteEndpoints[endpoint.Name] = endpoint
h.remoteEndpoints[endpoint.Name] = endpoint

if gatewayRouteHandler.isGateway {
_, err := gatewayRouteHandler.smClient.SubmarinerV1().GatewayRoutes(endpoint.Namespace).Create(context.TODO(),
gatewayRouteHandler.newGatewayRoute(endpoint), metav1.CreateOptions{})
if !apierrors.IsAlreadyExists(err) {
if h.isGateway {
_, err := h.smClient.SubmarinerV1().GatewayRoutes(endpoint.Namespace).Create(context.TODO(),
h.newGatewayRoute(endpoint), metav1.CreateOptions{})
if err != nil && !apierrors.IsAlreadyExists(err) {
return errors.Wrapf(err, "error processing the remote endpoint creation for %q", endpoint.Name)
}
}

return nil
}

func (gatewayRouteHandler *GatewayRouteHandler) RemoteEndpointRemoved(endpoint *submarinerv1.Endpoint) error {
gatewayRouteHandler.mutex.Lock()
defer gatewayRouteHandler.mutex.Unlock()
func (h *GatewayRouteHandler) RemoteEndpointRemoved(endpoint *submarinerv1.Endpoint) error {
h.mutex.Lock()
defer h.mutex.Unlock()

delete(gatewayRouteHandler.remoteEndpoints, endpoint.Name)
delete(h.remoteEndpoints, endpoint.Name)

if gatewayRouteHandler.isGateway {
if err := gatewayRouteHandler.smClient.SubmarinerV1().GatewayRoutes(endpoint.Namespace).Delete(context.TODO(),
endpoint.Name, metav1.DeleteOptions{}); err != nil {
if h.isGateway {
if err := h.smClient.SubmarinerV1().GatewayRoutes(endpoint.Namespace).Delete(context.TODO(),
endpoint.Name, metav1.DeleteOptions{}); err != nil && !apierrors.IsNotFound(err) {
return errors.Wrapf(err, "error deleting gatewayRoute %q", endpoint.Name)
}
}

return nil
}

func (gatewayRouteHandler *GatewayRouteHandler) TransitionToNonGateway() error {
gatewayRouteHandler.mutex.Lock()
defer gatewayRouteHandler.mutex.Unlock()
func (h *GatewayRouteHandler) TransitionToNonGateway() error {
h.mutex.Lock()
defer h.mutex.Unlock()

gatewayRouteHandler.isGateway = false
h.isGateway = false

return nil
}

func (gatewayRouteHandler *GatewayRouteHandler) TransitionToGateway() error {
gatewayRouteHandler.mutex.Lock()
defer gatewayRouteHandler.mutex.Unlock()

gatewayRouteHandler.isGateway = true
func (h *GatewayRouteHandler) TransitionToGateway() error {
h.mutex.Lock()
defer h.mutex.Unlock()

h.isGateway = true

for _, endpoint := range h.remoteEndpoints {
gwr := h.newGatewayRoute(endpoint)
result, err := util.CreateOrUpdate(context.TODO(), h.gatewayResourceInterface(endpoint.Namespace), gwr,
func(existing runtime.Object) (runtime.Object, error) {
existingGwr := gwr
return existingGwr, nil
})
if err != nil {
return errors.Wrapf(err, "error creating/updating GatewayRoute")
}

for _, endpoint := range gatewayRouteHandler.remoteEndpoints {
if _, err := gatewayRouteHandler.smClient.SubmarinerV1().GatewayRoutes(endpoint.Namespace).Update(context.TODO(),
gatewayRouteHandler.newGatewayRoute(endpoint), metav1.UpdateOptions{}); err != nil {
if !apierrors.IsNotFound(err) {
_, err = gatewayRouteHandler.smClient.SubmarinerV1().GatewayRoutes(endpoint.Namespace).Create(context.TODO(),
gatewayRouteHandler.newGatewayRoute(endpoint), metav1.CreateOptions{})
return errors.Wrapf(err, "error updating gatewayRoute %q", endpoint.Name)
}
if result == util.OperationResultCreated {
logger.V(log.TRACE).Infof("GatewayRoute does not exist - created: %+v", gwr)
} else if result == util.OperationResultUpdated {
logger.V(log.TRACE).Infof("GatewayRoute already exists - updated %+v", gwr)
} else {
logger.V(log.TRACE).Info("GatewayRoute already exists but doesn't need updating")
}
}

return nil
}

func (gatewayRouteHandler *GatewayRouteHandler) newGatewayRoute(endpoint *submarinerv1.Endpoint) *submarinerv1.GatewayRoute {
func (h *GatewayRouteHandler) newGatewayRoute(endpoint *submarinerv1.Endpoint) *submarinerv1.GatewayRoute {
return &submarinerv1.GatewayRoute{
ObjectMeta: metav1.ObjectMeta{
Name: endpoint.Name,
},
RoutePolicySpec: submarinerv1.RoutePolicySpec{
RemoteCIDRs: endpoint.Spec.Subnets,
NextHops: []string{gatewayRouteHandler.nextHopIP},
NextHops: []string{h.nextHopIP},
},
}
}

func (h *GatewayRouteHandler) gatewayResourceInterface(namespace string) resource.Interface {
//nolint:wrapcheck // These functions are pass-through wrappers for the k8s APIs.
return &resource.InterfaceFuncs{
GetFunc: func(ctx context.Context, name string, options metav1.GetOptions) (runtime.Object, error) {
return h.smClient.SubmarinerV1().GatewayRoutes(namespace).Get(ctx, name, options)
},
CreateFunc: func(ctx context.Context, obj runtime.Object, options metav1.CreateOptions) (runtime.Object, error) {
return h.smClient.SubmarinerV1().GatewayRoutes(namespace).Create(ctx, obj.(*submarinerv1.GatewayRoute), options)
},
UpdateFunc: func(ctx context.Context, obj runtime.Object, options metav1.UpdateOptions) (runtime.Object, error) {
return h.smClient.SubmarinerV1().GatewayRoutes(namespace).Update(ctx, obj.(*submarinerv1.GatewayRoute), options)
},
DeleteFunc: func(ctx context.Context, name string, options metav1.DeleteOptions) error {
return h.smClient.SubmarinerV1().GatewayRoutes(namespace).Delete(ctx, name, options)
},
}
}
Loading

0 comments on commit 5c6ff97

Please sign in to comment.