Skip to content

Commit

Permalink
Refactor code
Browse files Browse the repository at this point in the history
Signed-off-by: Aswin Suryanarayanan <[email protected]>
  • Loading branch information
aswinsuryan authored and sridhargaddam committed Sep 11, 2023
1 parent 1df7bf1 commit 69701d3
Show file tree
Hide file tree
Showing 3 changed files with 101 additions and 52 deletions.
138 changes: 89 additions & 49 deletions pkg/routeagent_driver/handlers/ovn/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,19 +36,25 @@ import (
"github.com/submariner-io/submariner/pkg/routeagent_driver/constants"
"github.com/submariner-io/submariner/pkg/util/clusterfiles"
corev1 "k8s.io/api/core/v1"
apiError "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/dynamic"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/utils/net"
)

type ConnectionHandler struct {
k8sClientset clientset.Interface
nbdb libovsdbclient.Client
k8sClientset clientset.Interface
dynamicClient dynamic.Interface
nbdb libovsdbclient.Client
}

func NewConnectionHandler(k8sClientset clientset.Interface) *ConnectionHandler {
func NewConnectionHandler(k8sClientset clientset.Interface, dynamicClient dynamic.Interface) *ConnectionHandler {
return &ConnectionHandler{
k8sClientset: k8sClientset,
k8sClientset: k8sClientset,
dynamicClient: dynamicClient,
}
}

Expand Down Expand Up @@ -108,7 +114,7 @@ func (c *ConnectionHandler) createLibovsdbClient(dbModel model.ClientDBModel) (l
// Will use empty zone if not found
zoneName := annotations[constants.OvnZoneAnnotation]

dbAddress, err := discoverOvnKubernetesNetwork(context.TODO(), c.k8sClientset, zoneName)
dbAddress, err := discoverOvnKubernetesNetwork(context.TODO(), c.k8sClientset, c.dynamicClient, zoneName)
if err != nil {
return nil, errors.Wrap(err, "error getting the OVN NBDB Address")
}
Expand Down Expand Up @@ -182,69 +188,83 @@ func getTLSConfig(k8sClientset clientset.Interface) (*tls.Config, error) {
return tlsConfig, nil
}

func discoverOvnKubernetesNetwork(ctx context.Context, k8sClientset clientset.Interface,
dynamicClient dynamic.Interface, zoneName string,
) (string, error) {
openshiftNetwork, err := FindOpenshiftNetwork(ctx, dynamicClient)
if err != nil {
return "", err
}

if openshiftNetwork != nil {
return discoverOpenshiftOvnKubernetesNetwork(ctx, k8sClientset)
}

return discoverKindOvnKubernetesNetwork(ctx, k8sClientset, zoneName)
}

/*
openshift non-ic: in this case there will get the "app=ovnkube-node" and check for ovnkube-db service,
if present we will the default Openshift db path
openshift ic with one node per zone: if above service is not present we will return default Openshift socket path
*/

func discoverOpenshiftOvnKubernetesNetwork(ctx context.Context, k8sClientSet clientset.Interface) (string, error) {
ovnPod, err := FindPod(ctx, k8sClientSet, "app=ovnkube-node")
if err != nil {
return "", errors.Wrap(err, "error finding a pod with label \"app=ovnkube-node\"")
}

_, err = k8sClientSet.CoreV1().Services(ovnPod.Namespace).Get(ctx, ovnKubeService, metav1.GetOptions{})
if err == nil {
return defaultOpenshiftOVNNBDB, nil
}

return defaultOVNOpenshiftUnixSocket, nil
}

/*
The discovery method is different for each kind of deployment.
kind non-ic: Get the db pod with label name=ovnkube-db, if present will get the protocol and other details from it
openshift non-ic: in this case there will be no pod with name=ovnkube-db but there will be a ovnkube-db service,
in this case we will return the default Openshift db path
kind ic with multiple node per zone: we parse every endpoint in the namespace ovn uses and find the one that has
same zone as the node in which route-agent runs. To get the ovn namespace we use pods running on app=ovnkube-node.
kind ic with one node per zone: we will have no endpoints matching hence we will return default kind socket path
openshift ic with one node per zone: we will have endpoints in the namespace ovn but none will match the zone as
these are not db endpoints. Here we will return default Openshift socket path
*/

func discoverOvnKubernetesNetwork(ctx context.Context, k8sClientset clientset.Interface, zoneName string) (string, error) {
ovnDBPod, err := FindPod(ctx, k8sClientset, "name=ovnkube-db")
func discoverKindOvnKubernetesNetwork(ctx context.Context, k8sClientSet clientset.Interface, zoneName string) (string, error) {
ovnDBPod, err := FindPod(ctx, k8sClientSet, "name=ovnkube-db")
if err != nil {
return "", err
}

ovnPod, err := FindPod(ctx, k8sClientset, "app=ovnkube-node")
if err != nil {
return "", err
if ovnDBPod != nil {
return discoverKindOvnDBClusterNetwork(ctx, ovnDBPod, k8sClientSet)
}

if ovnPod == nil {
return defaultOVNOpenshiftUnixSocket, nil
ovnPod, err := FindPod(ctx, k8sClientSet, "app=ovnkube-node")
if err != nil {
return "", err
}

var nbdbAddress string
_, err = k8sClientset.CoreV1().Services(ovnPod.Namespace).Get(ctx, ovnKubeService, metav1.GetOptions{})

// Openshift deployments uses DB that are parts of the node pod and there will not be a separate db pod for non-ic.
if ovnDBPod == nil && err == nil {
return defaultOpenshiftOVNNBDB, nil
}
// Kind deployments will have a db pod and db service for non-ic.
if ovnDBPod != nil && err == nil {
// This is a Kind non-IC deployment
nbdbAddress = discoverOvnDBClusterNetwork(ovnDBPod)
} else {
if zoneName == "" {
return defaultOVNOpenshiftUnixSocket, nil
}
return discoverKindOvnNodeClusterNetwork(ctx, k8sClientSet, zoneName, ovnPod)
}

nbdbAddress, err = discoverOvnNodeClusterNetwork(ctx, k8sClientset, zoneName, ovnPod)
if err != nil {
return "", err
}
func discoverKindOvnDBClusterNetwork(ctx context.Context, ovnDBPod *corev1.Pod, k8sClientSet clientset.Interface) (string, error) {
_, err := k8sClientSet.CoreV1().Services(ovnDBPod.Namespace).Get(ctx, ovnKubeService, metav1.GetOptions{})
if err != nil {
return "", errors.Wrap(err, "OVN DB service is not present but expected")
}

return nbdbAddress, nil
}

func discoverOvnDBClusterNetwork(ovnDBPod *corev1.Pod) string {
dbConnectionProtocol := findProtocol(ovnDBPod)

return fmt.Sprintf("%s:%s.%s:%d", dbConnectionProtocol, ovnKubeService, ovnDBPod.Namespace, ovnNBDBDefaultPort)
return fmt.Sprintf("%s:%s.%s:%d", dbConnectionProtocol, ovnKubeService, ovnDBPod.Namespace, ovnNBDBDefaultPort), nil
}

func discoverOvnNodeClusterNetwork(ctx context.Context, k8sClientset clientset.Interface,
func discoverKindOvnNodeClusterNetwork(ctx context.Context, k8sClientset clientset.Interface,
zoneName string, ovnPod *corev1.Pod,
) (string, error) {
endpointList, err := findEndpoint(ctx, k8sClientset, ovnPod.Namespace)
Expand All @@ -255,31 +275,32 @@ func discoverOvnNodeClusterNetwork(ctx context.Context, k8sClientset clientset.I
var nbdbAddress string

if endpointList == nil || len(endpointList.Items) == 0 {
// Kind setup will not have endpoints when using socket mode.
nbdbAddress = defaultOVNUnixSocket
} else {
nbdbAddress = createClusterNetworkWithEndpoints(endpointList.Items, zoneName)
nbdbAddress, err = createClusterNetworkWithEndpoints(endpointList.Items, zoneName)
if err != nil {
return "", err
}
}

return nbdbAddress, nil
}

func createClusterNetworkWithEndpoints(endPoints []corev1.Endpoints, zoneName string) string {
func createClusterNetworkWithEndpoints(endPoints []corev1.Endpoints, zoneName string) (string, error) {
for index := range endPoints {
for _, subset := range endPoints[index].Subsets {
if strings.Contains(endPoints[index].Name, zoneName) {
for _, port := range subset.Ports {
if strings.Contains(port.Name, "north") && net.IsIPv4String(subset.Addresses[0].IP) {
return fmt.Sprintf("%s:%s:%d",
port.Protocol, subset.Addresses[0].IP, ovnNBDBDefaultPort)
port.Protocol, subset.Addresses[0].IP, ovnNBDBDefaultPort), nil
}
}
}
}
}

// Openshift will have endpoints but not related to ovn DB when using IC.
return defaultOVNOpenshiftUnixSocket
return "", fmt.Errorf("error finding an endpoint for the zone %q", zoneName)
}

func findEndpoint(ctx context.Context, k8sClientset clientset.Interface, endpointNameSpace string) (*corev1.EndpointsList, error) {
Expand Down Expand Up @@ -317,3 +338,22 @@ func FindPod(ctx context.Context, k8sClientset clientset.Interface, labelSelecto

return &podsList.Items[0], nil
}

//nolint:nilnil // Intentional as the purpose is to find.
func FindOpenshiftNetwork(ctx context.Context, dynamicClient dynamic.Interface) (*unstructured.Unstructured, error) {
networkClient := dynamicClient.Resource(
schema.GroupVersionResource{
Group: "config.openshift.io",
Version: "v1",
Resource: "networks",
},
)

networks, err := networkClient.Get(ctx, "cluster", metav1.GetOptions{})

if apiError.IsNotFound(err) {
return nil, nil
}

return networks, errors.Wrapf(err, "error finding the openshift network CR")
}
7 changes: 5 additions & 2 deletions pkg/routeagent_driver/handlers/ovn/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
"github.com/submariner-io/submariner/pkg/iptables"
"github.com/submariner-io/submariner/pkg/netlink"
"github.com/submariner-io/submariner/pkg/routeagent_driver/environment"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/kubernetes"
logf "sigs.k8s.io/controller-runtime/pkg/log"
)
Expand All @@ -44,6 +45,7 @@ type Handler struct {
config *environment.Specification
smClient clientset.Interface
k8sClientset *kubernetes.Clientset
dynamicClient dynamic.Interface
watcherConfig *watcher.Config
cableRoutingInterface *net.Interface
localEndpoint *submV1.Endpoint
Expand All @@ -59,7 +61,7 @@ type Handler struct {
var logger = log.Logger{Logger: logf.Log.WithName("OVN")}

func NewHandler(env *environment.Specification, smClientSet clientset.Interface, k8sClientset *kubernetes.Clientset,
watcherConfig *watcher.Config,
dynamicClient dynamic.Interface, watcherConfig *watcher.Config,
) *Handler {
// We'll panic if env is nil, this is intentional
ipt, err := iptables.New()
Expand All @@ -71,6 +73,7 @@ func NewHandler(env *environment.Specification, smClientSet clientset.Interface,
config: env,
smClient: smClientSet,
k8sClientset: k8sClientset,
dynamicClient: dynamicClient,
watcherConfig: watcherConfig,
remoteEndpoints: map[string]*submV1.Endpoint{},
netlink: netlink.New(),
Expand All @@ -97,7 +100,7 @@ func (ovn *Handler) Init() error {

ovn.startRouteConfigSyncer(ovn.stopCh)

connectionHandler := NewConnectionHandler(ovn.k8sClientset)
connectionHandler := NewConnectionHandler(ovn.k8sClientset, ovn.dynamicClient)

err = connectionHandler.initClients()
if err != nil {
Expand Down
8 changes: 7 additions & 1 deletion pkg/routeagent_driver/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ import (
"github.com/submariner-io/submariner/pkg/routeagent_driver/handlers/mtu"
"github.com/submariner-io/submariner/pkg/routeagent_driver/handlers/ovn"
"github.com/submariner-io/submariner/pkg/versions"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/tools/clientcmd"
Expand Down Expand Up @@ -96,6 +97,11 @@ func main() {
logger.Fatalf("Error building clientset: %s", err.Error())
}

dynamicClientSet, err := dynamic.NewForConfig(cfg)
if err != nil {
logger.Fatalf("Error building dynamic client: %s", err.Error())
}

err = v1.AddToScheme(scheme.Scheme)
logger.FatalOnError(err, "Error adding submariner to the scheme")

Expand All @@ -116,7 +122,7 @@ func main() {
if err := registry.AddHandlers(
eventlogger.NewHandler(),
kubeproxy.NewSyncHandler(env.ClusterCidr, env.ServiceCidr),
ovn.NewHandler(&env, smClientset, k8sClientSet, config),
ovn.NewHandler(&env, smClientset, k8sClientSet, dynamicClientSet, config),
ovn.NewGatewayRouteHandler(&env, smClientset),
ovn.NewNonGatewayRouteHandler(smClientset, k8sClientSet),
cabledriver.NewXRFMCleanupHandler(),
Expand Down

0 comments on commit 69701d3

Please sign in to comment.