Skip to content

Commit

Permalink
Revert "Handle updates to the node OVN transit switch IP"
Browse files Browse the repository at this point in the history
This reverts commit c57b883.

Signed-off-by: Aswin Suryanarayanan <[email protected]>
  • Loading branch information
aswinsuryan authored and tpantelis committed Oct 17, 2024
1 parent 9bf6a60 commit 6ac9266
Show file tree
Hide file tree
Showing 10 changed files with 104 additions and 424 deletions.
19 changes: 9 additions & 10 deletions pkg/routeagent_driver/handlers/ovn/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,15 +43,14 @@ import (
type NewOVSDBClientFn func(_ model.ClientDBModel, _ ...libovsdbclient.Option) (libovsdbclient.Client, error)

type HandlerConfig struct {
Namespace string
ClusterCIDR []string
ServiceCIDR []string
SubmClient clientset.Interface
K8sClient kubernetes.Interface
DynClient dynamic.Interface
WatcherConfig *watcher.Config
NewOVSDBClient NewOVSDBClientFn
TransitSwitchIP TransitSwitchIPGetter
Namespace string
ClusterCIDR []string
ServiceCIDR []string
SubmClient clientset.Interface
K8sClient kubernetes.Interface
DynClient dynamic.Interface
WatcherConfig *watcher.Config
NewOVSDBClient NewOVSDBClientFn
}

type Handler struct {
Expand Down Expand Up @@ -125,7 +124,7 @@ func (ovn *Handler) Init() error {
return err
}

nonGatewayRouteController, err := NewNonGatewayRouteController(*ovn.WatcherConfig, connectionHandler, ovn.Namespace, ovn.TransitSwitchIP)
nonGatewayRouteController, err := NewNonGatewayRouteController(*ovn.WatcherConfig, connectionHandler, ovn.K8sClient, ovn.Namespace)
if err != nil {
return err
}
Expand Down
97 changes: 19 additions & 78 deletions pkg/routeagent_driver/handlers/ovn/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,10 +51,7 @@ const (
var _ = Describe("Handler", func() {
t := newTestDriver()

var (
ovsdbClient *fakeovn.OVSDBClient
transitSwitchIP ovn.TransitSwitchIP
)
var ovsdbClient *fakeovn.OVSDBClient

BeforeEach(func() {
ovsdbClient = fakeovn.NewOVSDBClient()
Expand Down Expand Up @@ -82,8 +79,6 @@ var _ = Describe("Handler", func() {

restMapper := test.GetRESTMapperFor(&submarinerv1.GatewayRoute{}, &submarinerv1.NonGatewayRoute{})

transitSwitchIP = ovn.NewTransitSwitchIP()

t.Start(ovn.NewHandler(&ovn.HandlerConfig{
Namespace: testing.Namespace,
ClusterCIDR: []string{clusterCIDR},
Expand All @@ -98,11 +93,9 @@ var _ = Describe("Handler", func() {
NewOVSDBClient: func(_ model.ClientDBModel, _ ...libovsdbclient.Option) (libovsdbclient.Client, error) {
return ovsdbClient, nil
},
TransitSwitchIP: transitSwitchIP,
}))

Expect(ovsdbClient.Connected()).To(BeTrue())
Expect(transitSwitchIP.Init(t.k8sClient)).To(Succeed())
})

When("a remote Endpoint is created, updated, and deleted", func() {
Expand Down Expand Up @@ -261,89 +254,37 @@ var _ = Describe("Handler", func() {
})
})

When("NonGatewayRoutes are created, updated and deleted", func() {
verifyLogicalRouterPolicies := func(ngr *submarinerv1.NonGatewayRoute, nextHop string) {
for _, cidr := range ngr.RoutePolicySpec.RemoteCIDRs {
ovsdbClient.AwaitModel(&nbdb.LogicalRouterPolicy{
Match: cidr,
Nexthop: ptr.To(nextHop),
})
}
}

verifyNoLogicalRouterPolicies := func(ngr *submarinerv1.NonGatewayRoute, nextHop string) {
for _, cidr := range ngr.RoutePolicySpec.RemoteCIDRs {
ovsdbClient.AwaitNoModel(&nbdb.LogicalRouterPolicy{
Match: cidr,
Nexthop: ptr.To(nextHop),
})
}
}

When("a NonGatewayRoute is created and deleted", func() {
It("should correctly reconcile OVN router policies", func() {
client := t.dynClient.Resource(submarinerv1.SchemeGroupVersion.WithResource("nongatewayroutes")).Namespace(testing.Namespace)

By("Creating first NonGatewayRoute")

nextHop := "172.1.1.1"

nonGWRoute1 := &submarinerv1.NonGatewayRoute{
nonGWRoute := &submarinerv1.NonGatewayRoute{
ObjectMeta: metav1.ObjectMeta{
Name: "test-nongateway-route1",
Name: "test-nongateway-route",
},
RoutePolicySpec: submarinerv1.RoutePolicySpec{
NextHops: []string{nextHop},
RemoteCIDRs: []string{"111.0.1.0/24", "111.0.2.0/24"},
NextHops: []string{"111.1.1.1"},
RemoteCIDRs: []string{"192.0.1.0/24", "192.0.2.0/24"},
},
}

test.CreateResource(client, nonGWRoute1)

verifyLogicalRouterPolicies(nonGWRoute1, nextHop)
test.CreateResource(client, nonGWRoute)

By("Creating second NonGatewayRoute")

nonGWRoute2 := &submarinerv1.NonGatewayRoute{
ObjectMeta: metav1.ObjectMeta{
Name: "test-nongateway-route2",
},
RoutePolicySpec: submarinerv1.RoutePolicySpec{
NextHops: []string{nextHop},
RemoteCIDRs: []string{"222.0.1.0/24", "222.0.2.0/24"},
},
for _, cidr := range nonGWRoute.RoutePolicySpec.RemoteCIDRs {
ovsdbClient.AwaitModel(&nbdb.LogicalRouterPolicy{
Match: cidr,
Nexthop: ptr.To(nonGWRoute.RoutePolicySpec.NextHops[0]),
})
}

test.CreateResource(client, nonGWRoute2)

verifyLogicalRouterPolicies(nonGWRoute1, nextHop)
verifyLogicalRouterPolicies(nonGWRoute2, nextHop)

By("Updating NextHop for first NonGatewayRoute")

prevNextHop := nextHop
nextHop = "172.1.1.2"
nonGWRoute1.RoutePolicySpec.NextHops[0] = nextHop

test.UpdateResource(client, nonGWRoute1)

verifyLogicalRouterPolicies(nonGWRoute1, nextHop)
verifyNoLogicalRouterPolicies(nonGWRoute1, prevNextHop)
verifyNoLogicalRouterPolicies(nonGWRoute2, prevNextHop)

By("Updating NextHop for second NonGatewayRoute")
Expect(client.Delete(context.Background(), nonGWRoute.Name, metav1.DeleteOptions{})).To(Succeed())

nonGWRoute2.RoutePolicySpec.NextHops[0] = nextHop

test.UpdateResource(client, nonGWRoute2)

verifyLogicalRouterPolicies(nonGWRoute1, nextHop)
verifyLogicalRouterPolicies(nonGWRoute2, nextHop)

By("Deleting first NonGatewayRoute")

Expect(client.Delete(context.Background(), nonGWRoute1.Name, metav1.DeleteOptions{})).To(Succeed())

verifyNoLogicalRouterPolicies(nonGWRoute1, nextHop)
for _, cidr := range nonGWRoute.RoutePolicySpec.RemoteCIDRs {
ovsdbClient.AwaitNoModel(&nbdb.LogicalRouterPolicy{
Match: cidr,
Nexthop: ptr.To(nonGWRoute.RoutePolicySpec.NextHops[0]),
})
}
})
})

Expand Down
32 changes: 27 additions & 5 deletions pkg/routeagent_driver/handlers/ovn/non_gateway_route_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,21 +22,23 @@ import (
"github.com/pkg/errors"
"github.com/submariner-io/admiral/pkg/watcher"
submarinerv1 "github.com/submariner-io/submariner/pkg/apis/submariner.io/v1"
nodeutil "github.com/submariner-io/submariner/pkg/node"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/sets"
clientset "k8s.io/client-go/kubernetes"
)

type NonGatewayRouteController struct {
nonGatewayRouteWatcher watcher.Interface
connectionHandler *ConnectionHandler
remoteSubnets sets.Set[string]
stopCh chan struct{}
transitSwitchIP TransitSwitchIPGetter
transitSwitchIP string
}

//nolint:gocritic // Ignore hugeParam
func NewNonGatewayRouteController(config watcher.Config, connectionHandler *ConnectionHandler,
namespace string, transitSwitchIP TransitSwitchIPGetter,
k8sClientSet clientset.Interface, namespace string,
) (*NonGatewayRouteController, error) {
// We'll panic if config is nil, this is intentional
var err error
Expand All @@ -45,7 +47,6 @@ func NewNonGatewayRouteController(config watcher.Config, connectionHandler *Conn
connectionHandler: connectionHandler,
remoteSubnets: sets.New[string](),
stopCh: make(chan struct{}),
transitSwitchIP: transitSwitchIP,
}

config.ResourceConfigs = []watcher.ResourceConfig{
Expand All @@ -61,6 +62,25 @@ func NewNonGatewayRouteController(config watcher.Config, connectionHandler *Conn
},
}

node, err := nodeutil.GetLocalNode(k8sClientSet)
if err != nil {
return nil, errors.Wrap(err, "error getting the local node info")
}

annotations := node.GetAnnotations()

transitSwitchIP := annotations["k8s.ovn.org/node-transit-switch-port-ifaddr"]
if transitSwitchIP == "" {
// This is a non-IC setup , so this controller will not be started.
logger.Infof("No transit switch IP configured on node %q", node.Name)
return controller, nil
}

controller.transitSwitchIP, err = jsonToIP(transitSwitchIP)
if err != nil {
return nil, errors.Wrapf(err, "error parsing transit switch IP")
}

controller.nonGatewayRouteWatcher, err = watcher.New(&config)
if err != nil {
return nil, errors.Wrap(err, "error creating resource watcher")
Expand Down Expand Up @@ -109,7 +129,7 @@ func (g *NonGatewayRouteController) reconcileRemoteSubnets(submNonGWRoute *subma
}

// If this node belongs to same zone as gateway node, ignore the event.
if submNonGWRoute.RoutePolicySpec.NextHops[0] != g.transitSwitchIP.Get() {
if submNonGWRoute.RoutePolicySpec.NextHops[0] != g.transitSwitchIP {
for _, subnet := range submNonGWRoute.RoutePolicySpec.RemoteCIDRs {
if addSubnet {
g.remoteSubnets.Insert(subnet)
Expand All @@ -125,5 +145,7 @@ func (g *NonGatewayRouteController) reconcileRemoteSubnets(submNonGWRoute *subma
}

func (g *NonGatewayRouteController) stop() {
close(g.stopCh)
if g.transitSwitchIP != "" {
close(g.stopCh)
}
}
77 changes: 30 additions & 47 deletions pkg/routeagent_driver/handlers/ovn/non_gateway_route_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,32 +28,47 @@ import (
submarinerClientset "github.com/submariner-io/submariner/pkg/client/clientset/versioned"
"github.com/submariner-io/submariner/pkg/cni"
"github.com/submariner-io/submariner/pkg/event"
corev1 "k8s.io/api/core/v1"
nodeutil "github.com/submariner-io/submariner/pkg/node"
"github.com/submariner-io/submariner/pkg/routeagent_driver/constants"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
clientset "k8s.io/client-go/kubernetes"
)

type NonGatewayRouteHandler struct {
event.HandlerBase
event.NodeHandlerBase
smClient submarinerClientset.Interface
k8sClient kubernetes.Interface
transitSwitchIP TransitSwitchIP
k8sClient clientset.Interface
transitSwitchIP string
}

func NewNonGatewayRouteHandler(smClient submarinerClientset.Interface, k8sClient kubernetes.Interface, transitSwitchIP TransitSwitchIP,
) *NonGatewayRouteHandler {
func NewNonGatewayRouteHandler(smClient submarinerClientset.Interface, k8sClient clientset.Interface) *NonGatewayRouteHandler {
return &NonGatewayRouteHandler{
smClient: smClient,
k8sClient: k8sClient,
transitSwitchIP: transitSwitchIP,
smClient: smClient,
k8sClient: k8sClient,
}
}

func (h *NonGatewayRouteHandler) Init() error {
logger.Info("Starting NonGatewayRouteHandler")
return errors.Wrap(h.transitSwitchIP.Init(h.k8sClient), "error initializing TransitSwitchIP")

node, err := nodeutil.GetLocalNode(h.k8sClient)
if err != nil {
return errors.Wrap(err, "error getting the g/w node")
}

annotations := node.GetAnnotations()

// TODO transitSwitchIP changes support needs to be added.
transitSwitchIP, ok := annotations[constants.OvnTransitSwitchIPAnnotation]
if !ok {
logger.Infof("No transit switch IP configured")
return nil
}

h.transitSwitchIP, err = jsonToIP(transitSwitchIP)

return errors.Wrapf(err, "error parsing the transit switch IP")
}

func (h *NonGatewayRouteHandler) GetName() string {
Expand All @@ -65,7 +80,7 @@ func (h *NonGatewayRouteHandler) GetNetworkPlugins() []string {
}

func (h *NonGatewayRouteHandler) RemoteEndpointCreated(endpoint *submarinerv1.Endpoint) error {
if !h.State().IsOnGateway() || h.transitSwitchIP.Get() == "" {
if !h.State().IsOnGateway() || h.transitSwitchIP == "" {
return nil
}

Expand All @@ -83,7 +98,7 @@ func (h *NonGatewayRouteHandler) RemoteEndpointCreated(endpoint *submarinerv1.En
}

func (h *NonGatewayRouteHandler) RemoteEndpointRemoved(endpoint *submarinerv1.Endpoint) error {
if !h.State().IsOnGateway() || h.transitSwitchIP.Get() == "" {
if !h.State().IsOnGateway() || h.transitSwitchIP == "" {
return nil
}

Expand All @@ -98,7 +113,7 @@ func (h *NonGatewayRouteHandler) RemoteEndpointRemoved(endpoint *submarinerv1.En
}

func (h *NonGatewayRouteHandler) TransitionToGateway() error {
if h.transitSwitchIP.Get() == "" {
if h.transitSwitchIP == "" {
return nil
}

Expand Down Expand Up @@ -126,38 +141,6 @@ func (h *NonGatewayRouteHandler) TransitionToGateway() error {
return nil
}

func (h *NonGatewayRouteHandler) NodeUpdated(node *corev1.Node) error {
updated, err := h.transitSwitchIP.UpdateFrom(node)
if err != nil {
logger.Errorf(err, "Error updating transit switch IP from node: %s", resource.ToJSON(node))
return nil
}

if !updated {
return nil
}

logger.Infof("Transit switch IP updated to %s", h.transitSwitchIP.Get())

if !h.State().IsOnGateway() {
return nil
}

endpoints := h.State().GetRemoteEndpoints()
for i := range endpoints {
err = util.Update(context.TODO(), NonGatewayResourceInterface(h.smClient, endpoints[i].Namespace),
h.newNonGatewayRoute(&endpoints[i]), func(existing *submarinerv1.NonGatewayRoute) (*submarinerv1.NonGatewayRoute, error) {
existing.RoutePolicySpec.NextHops = []string{h.transitSwitchIP.Get()}
return existing, nil
})
if err != nil {
return errors.Wrapf(err, "error updating NonGatewayRoute")
}
}

return nil
}

func (h *NonGatewayRouteHandler) newNonGatewayRoute(endpoint *submarinerv1.Endpoint) *submarinerv1.NonGatewayRoute {
return &submarinerv1.NonGatewayRoute{
ObjectMeta: metav1.ObjectMeta{
Expand All @@ -166,7 +149,7 @@ func (h *NonGatewayRouteHandler) newNonGatewayRoute(endpoint *submarinerv1.Endpo
},
RoutePolicySpec: submarinerv1.RoutePolicySpec{
RemoteCIDRs: endpoint.Spec.Subnets,
NextHops: []string{h.transitSwitchIP.Get()},
NextHops: []string{h.transitSwitchIP},
},
}
}
Expand Down
Loading

0 comments on commit 6ac9266

Please sign in to comment.