diff --git a/pkg/routeagent_driver/handlers/ovn/connection.go b/pkg/routeagent_driver/handlers/ovn/connection.go index 1fab3c3bc..7faa134fa 100644 --- a/pkg/routeagent_driver/handlers/ovn/connection.go +++ b/pkg/routeagent_driver/handlers/ovn/connection.go @@ -36,19 +36,25 @@ import ( "github.com/submariner-io/submariner/pkg/routeagent_driver/constants" "github.com/submariner-io/submariner/pkg/util/clusterfiles" corev1 "k8s.io/api/core/v1" + apiError "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/client-go/dynamic" clientset "k8s.io/client-go/kubernetes" "k8s.io/utils/net" ) type ConnectionHandler struct { - k8sClientset clientset.Interface - nbdb libovsdbclient.Client + k8sClientset clientset.Interface + dynamicClient dynamic.Interface + nbdb libovsdbclient.Client } -func NewConnectionHandler(k8sClientset clientset.Interface) *ConnectionHandler { +func NewConnectionHandler(k8sClientset clientset.Interface, dynamicClient dynamic.Interface) *ConnectionHandler { return &ConnectionHandler{ - k8sClientset: k8sClientset, + k8sClientset: k8sClientset, + dynamicClient: dynamicClient, } } @@ -108,7 +114,7 @@ func (c *ConnectionHandler) createLibovsdbClient(dbModel model.ClientDBModel) (l // Will use empty zone if not found zoneName := annotations[constants.OvnZoneAnnotation] - dbAddress, err := discoverOvnKubernetesNetwork(context.TODO(), c.k8sClientset, zoneName) + dbAddress, err := discoverOvnKubernetesNetwork(context.TODO(), c.k8sClientset, c.dynamicClient, zoneName) if err != nil { return nil, errors.Wrap(err, "error getting the OVN NBDB Address") } @@ -182,69 +188,83 @@ func getTLSConfig(k8sClientset clientset.Interface) (*tls.Config, error) { return tlsConfig, nil } +func discoverOvnKubernetesNetwork(ctx context.Context, k8sClientset clientset.Interface, + dynamicClient dynamic.Interface, zoneName string, +) (string, error) { + openshiftNetwork, err := FindOpenshiftNetwork(ctx, dynamicClient) + if err != nil { + return "", err + } + + if openshiftNetwork != nil { + return discoverOpenshiftOvnKubernetesNetwork(ctx, k8sClientset) + } + + return discoverKindOvnKubernetesNetwork(ctx, k8sClientset, zoneName) +} + +/* + +openshift non-ic: in this case there will get the "app=ovnkube-node" and check for ovnkube-db service, +if present we will the default Openshift db path + +openshift ic with one node per zone: if above service is not present we will return default Openshift socket path +*/ + +func discoverOpenshiftOvnKubernetesNetwork(ctx context.Context, k8sClientSet clientset.Interface) (string, error) { + ovnPod, err := FindPod(ctx, k8sClientSet, "app=ovnkube-node") + if err != nil { + return "", errors.Wrap(err, "error finding a pod with label \"app=ovnkube-node\"") + } + + _, err = k8sClientSet.CoreV1().Services(ovnPod.Namespace).Get(ctx, ovnKubeService, metav1.GetOptions{}) + if err == nil { + return defaultOpenshiftOVNNBDB, nil + } + + return defaultOVNOpenshiftUnixSocket, nil +} + /* The discovery method is different for each kind of deployment. kind non-ic: Get the db pod with label name=ovnkube-db, if present will get the protocol and other details from it - openshift non-ic: in this case there will be no pod with name=ovnkube-db but there will be a ovnkube-db service, - in this case we will return the default Openshift db path - kind ic with multiple node per zone: we parse every endpoint in the namespace ovn uses and find the one that has same zone as the node in which route-agent runs. To get the ovn namespace we use pods running on app=ovnkube-node. kind ic with one node per zone: we will have no endpoints matching hence we will return default kind socket path - - openshift ic with one node per zone: we will have endpoints in the namespace ovn but none will match the zone as - these are not db endpoints. Here we will return default Openshift socket path */ -func discoverOvnKubernetesNetwork(ctx context.Context, k8sClientset clientset.Interface, zoneName string) (string, error) { - ovnDBPod, err := FindPod(ctx, k8sClientset, "name=ovnkube-db") +func discoverKindOvnKubernetesNetwork(ctx context.Context, k8sClientSet clientset.Interface, zoneName string) (string, error) { + ovnDBPod, err := FindPod(ctx, k8sClientSet, "name=ovnkube-db") if err != nil { return "", err } - ovnPod, err := FindPod(ctx, k8sClientset, "app=ovnkube-node") - if err != nil { - return "", err + if ovnDBPod != nil { + return discoverKindOvnDBClusterNetwork(ctx, ovnDBPod, k8sClientSet) } - if ovnPod == nil { - return defaultOVNOpenshiftUnixSocket, nil + ovnPod, err := FindPod(ctx, k8sClientSet, "app=ovnkube-node") + if err != nil { + return "", err } - var nbdbAddress string - _, err = k8sClientset.CoreV1().Services(ovnPod.Namespace).Get(ctx, ovnKubeService, metav1.GetOptions{}) - - // Openshift deployments uses DB that are parts of the node pod and there will not be a separate db pod for non-ic. - if ovnDBPod == nil && err == nil { - return defaultOpenshiftOVNNBDB, nil - } - // Kind deployments will have a db pod and db service for non-ic. - if ovnDBPod != nil && err == nil { - // This is a Kind non-IC deployment - nbdbAddress = discoverOvnDBClusterNetwork(ovnDBPod) - } else { - if zoneName == "" { - return defaultOVNOpenshiftUnixSocket, nil - } + return discoverKindOvnNodeClusterNetwork(ctx, k8sClientSet, zoneName, ovnPod) +} - nbdbAddress, err = discoverOvnNodeClusterNetwork(ctx, k8sClientset, zoneName, ovnPod) - if err != nil { - return "", err - } +func discoverKindOvnDBClusterNetwork(ctx context.Context, ovnDBPod *corev1.Pod, k8sClientSet clientset.Interface) (string, error) { + _, err := k8sClientSet.CoreV1().Services(ovnDBPod.Namespace).Get(ctx, ovnKubeService, metav1.GetOptions{}) + if err != nil { + return "", errors.Wrap(err, "OVN DB service is not present but expected") } - return nbdbAddress, nil -} - -func discoverOvnDBClusterNetwork(ovnDBPod *corev1.Pod) string { dbConnectionProtocol := findProtocol(ovnDBPod) - return fmt.Sprintf("%s:%s.%s:%d", dbConnectionProtocol, ovnKubeService, ovnDBPod.Namespace, ovnNBDBDefaultPort) + return fmt.Sprintf("%s:%s.%s:%d", dbConnectionProtocol, ovnKubeService, ovnDBPod.Namespace, ovnNBDBDefaultPort), nil } -func discoverOvnNodeClusterNetwork(ctx context.Context, k8sClientset clientset.Interface, +func discoverKindOvnNodeClusterNetwork(ctx context.Context, k8sClientset clientset.Interface, zoneName string, ovnPod *corev1.Pod, ) (string, error) { endpointList, err := findEndpoint(ctx, k8sClientset, ovnPod.Namespace) @@ -255,31 +275,32 @@ func discoverOvnNodeClusterNetwork(ctx context.Context, k8sClientset clientset.I var nbdbAddress string if endpointList == nil || len(endpointList.Items) == 0 { - // Kind setup will not have endpoints when using socket mode. nbdbAddress = defaultOVNUnixSocket } else { - nbdbAddress = createClusterNetworkWithEndpoints(endpointList.Items, zoneName) + nbdbAddress, err = createClusterNetworkWithEndpoints(endpointList.Items, zoneName) + if err != nil { + return "", err + } } return nbdbAddress, nil } -func createClusterNetworkWithEndpoints(endPoints []corev1.Endpoints, zoneName string) string { +func createClusterNetworkWithEndpoints(endPoints []corev1.Endpoints, zoneName string) (string, error) { for index := range endPoints { for _, subset := range endPoints[index].Subsets { if strings.Contains(endPoints[index].Name, zoneName) { for _, port := range subset.Ports { if strings.Contains(port.Name, "north") && net.IsIPv4String(subset.Addresses[0].IP) { return fmt.Sprintf("%s:%s:%d", - port.Protocol, subset.Addresses[0].IP, ovnNBDBDefaultPort) + port.Protocol, subset.Addresses[0].IP, ovnNBDBDefaultPort), nil } } } } } - // Openshift will have endpoints but not related to ovn DB when using IC. - return defaultOVNOpenshiftUnixSocket + return "", fmt.Errorf("error finding an endpoint for the zone %q", zoneName) } func findEndpoint(ctx context.Context, k8sClientset clientset.Interface, endpointNameSpace string) (*corev1.EndpointsList, error) { @@ -317,3 +338,22 @@ func FindPod(ctx context.Context, k8sClientset clientset.Interface, labelSelecto return &podsList.Items[0], nil } + +//nolint:nilnil // Intentional as the purpose is to find. +func FindOpenshiftNetwork(ctx context.Context, dynamicClient dynamic.Interface) (*unstructured.Unstructured, error) { + networkClient := dynamicClient.Resource( + schema.GroupVersionResource{ + Group: "config.openshift.io", + Version: "v1", + Resource: "networks", + }, + ) + + networks, err := networkClient.Get(ctx, "cluster", metav1.GetOptions{}) + + if apiError.IsNotFound(err) { + return nil, nil + } + + return networks, errors.Wrapf(err, "error finding the openshift network CR") +} diff --git a/pkg/routeagent_driver/handlers/ovn/handler.go b/pkg/routeagent_driver/handlers/ovn/handler.go index 4c563e891..aaa5664c3 100644 --- a/pkg/routeagent_driver/handlers/ovn/handler.go +++ b/pkg/routeagent_driver/handlers/ovn/handler.go @@ -34,6 +34,7 @@ import ( "github.com/submariner-io/submariner/pkg/iptables" "github.com/submariner-io/submariner/pkg/netlink" "github.com/submariner-io/submariner/pkg/routeagent_driver/environment" + "k8s.io/client-go/dynamic" "k8s.io/client-go/kubernetes" logf "sigs.k8s.io/controller-runtime/pkg/log" ) @@ -44,6 +45,7 @@ type Handler struct { config *environment.Specification smClient clientset.Interface k8sClientset *kubernetes.Clientset + dynamicClient dynamic.Interface watcherConfig *watcher.Config cableRoutingInterface *net.Interface localEndpoint *submV1.Endpoint @@ -59,7 +61,7 @@ type Handler struct { var logger = log.Logger{Logger: logf.Log.WithName("OVN")} func NewHandler(env *environment.Specification, smClientSet clientset.Interface, k8sClientset *kubernetes.Clientset, - watcherConfig *watcher.Config, + dynamicClient dynamic.Interface, watcherConfig *watcher.Config, ) *Handler { // We'll panic if env is nil, this is intentional ipt, err := iptables.New() @@ -71,6 +73,7 @@ func NewHandler(env *environment.Specification, smClientSet clientset.Interface, config: env, smClient: smClientSet, k8sClientset: k8sClientset, + dynamicClient: dynamicClient, watcherConfig: watcherConfig, remoteEndpoints: map[string]*submV1.Endpoint{}, netlink: netlink.New(), @@ -97,7 +100,7 @@ func (ovn *Handler) Init() error { ovn.startRouteConfigSyncer(ovn.stopCh) - connectionHandler := NewConnectionHandler(ovn.k8sClientset) + connectionHandler := NewConnectionHandler(ovn.k8sClientset, ovn.dynamicClient) err = connectionHandler.initClients() if err != nil { diff --git a/pkg/routeagent_driver/main.go b/pkg/routeagent_driver/main.go index d5bd21b87..1c5705862 100644 --- a/pkg/routeagent_driver/main.go +++ b/pkg/routeagent_driver/main.go @@ -47,6 +47,7 @@ import ( "github.com/submariner-io/submariner/pkg/routeagent_driver/handlers/mtu" "github.com/submariner-io/submariner/pkg/routeagent_driver/handlers/ovn" "github.com/submariner-io/submariner/pkg/versions" + "k8s.io/client-go/dynamic" "k8s.io/client-go/kubernetes" "k8s.io/client-go/kubernetes/scheme" "k8s.io/client-go/tools/clientcmd" @@ -96,6 +97,11 @@ func main() { logger.Fatalf("Error building clientset: %s", err.Error()) } + dynamicClientSet, err := dynamic.NewForConfig(cfg) + if err != nil { + logger.Fatalf("Error building dynamic client: %s", err.Error()) + } + err = v1.AddToScheme(scheme.Scheme) logger.FatalOnError(err, "Error adding submariner to the scheme") @@ -116,7 +122,7 @@ func main() { if err := registry.AddHandlers( eventlogger.NewHandler(), kubeproxy.NewSyncHandler(env.ClusterCidr, env.ServiceCidr), - ovn.NewHandler(&env, smClientset, k8sClientSet, config), + ovn.NewHandler(&env, smClientset, k8sClientSet, dynamicClientSet, config), ovn.NewGatewayRouteHandler(&env, smClientset), ovn.NewNonGatewayRouteHandler(smClientset, k8sClientSet), cabledriver.NewXRFMCleanupHandler(),