Skip to content

Commit

Permalink
[wip]
Browse files Browse the repository at this point in the history
Signed-off-by: Martin Schuppert <[email protected]>
  • Loading branch information
stuggi committed Nov 5, 2024
1 parent 3bc5750 commit 8d12e61
Show file tree
Hide file tree
Showing 2 changed files with 105 additions and 158 deletions.
257 changes: 102 additions & 155 deletions controllers/ovndbcluster_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -613,8 +613,8 @@ func (r *OVNDBClusterReconciler) reconcileNormal(ctx context.Context, instance *
for _, svc := range svcList.Items {
svcPort = svc.Spec.Ports[0].Port

// Filter out headless services
if svc.Spec.ClusterIP != "None" {
// Filter out headless and loadbalancer services
if svc.Spec.ClusterIP != "None" || svc.Spec.Type == corev1.ServiceTypeLoadBalancer {
internalDbAddress = append(internalDbAddress, fmt.Sprintf("%s:%s.%s.svc.%s:%d", scheme, svc.Name, svc.Namespace, ovnv1.DNSSuffix, svcPort))
}
}
Expand Down Expand Up @@ -670,190 +670,139 @@ func (r *OVNDBClusterReconciler) reconcileServices(

Log.Info("Reconciling OVN DB Cluster Service")

var svc *corev1.Service
// create service using the
svcOverride := instance.Spec.Override.Service
if svcOverride != nil {
if svcOverride.EmbeddedLabelsAnnotations == nil {
svcOverride.EmbeddedLabelsAnnotations = &service.EmbeddedLabelsAnnotations{}
}
if svcOverride.Spec == nil {
svcOverride.Spec = &service.OverrideServiceSpec{}
}
}

//
// Ensure the ovndbcluster service Exists, if no override is provided, a headless service gets created
//
ssvc, err := service.NewService(
ovndbcluster.HeadlessService(serviceName, instance, serviceLabels, serviceLabels),
time.Duration(5)*time.Second,
svcOverride,
)
if err != nil {
return ctrl.Result{}, err
}

// add annotation to register service name in dnsmasq
if ssvc.GetServiceType() == corev1.ServiceTypeLoadBalancer {
ssvc.AddAnnotation(map[string]string{
service.AnnotationHostnameKey: ssvc.GetServiceHostname(),
})
}

ctrlResult, err := ssvc.CreateOrPatch(ctx, helper)
if err != nil {
return ctrl.Result{}, err
} else if (ctrlResult != ctrl.Result{}) {
return ctrl.Result{}, nil
}

// for backward compatability keep the current behavior, at least for now
if instance.Spec.Override.Service == nil {
podList, err := ovndbcluster.OVNDBPods(ctx, instance, helper, serviceLabels)
if err != nil {
return ctrl.Result{}, err
}

for _, ovnPod := range podList.Items {
//
// Ensure the ovndbcluster headless service Exists
// Create the ovndbcluster pod service if none exists
//
headlessServiceLabels := util.MergeMaps(serviceLabels, map[string]string{"type": ovnv1.ServiceHeadlessType})

headlesssvc, err := service.NewService(
ovndbcluster.HeadlessService(serviceName, instance, headlessServiceLabels, serviceLabels),
ovndbSelectorLabels := map[string]string{
common.AppSelector: serviceName,
"statefulset.kubernetes.io/pod-name": ovnPod.Name,
}
ovndbServiceLabels := util.MergeMaps(ovndbSelectorLabels, map[string]string{"type": ovnv1.ServiceClusterType})
svc, err := service.NewService(
ovndbcluster.Service(ovnPod.Name, instance, ovndbServiceLabels, ovndbSelectorLabels),
time.Duration(5)*time.Second,
nil,
)
if err != nil {
return ctrl.Result{}, err
}

ctrlResult, err := headlesssvc.CreateOrPatch(ctx, helper)
ctrlResult, err := svc.CreateOrPatch(ctx, helper)
if err != nil {
return ctrl.Result{}, err
} else if (ctrlResult != ctrl.Result{}) {
return ctrl.Result{}, nil
}
// create service - end
}

podList, err := ovndbcluster.OVNDBPods(ctx, instance, helper, serviceLabels)
if err != nil {
return ctrl.Result{}, err
}

for _, ovnPod := range podList.Items {
//
// Create the ovndbcluster pod service if none exists
//
ovndbSelectorLabels := map[string]string{
// Delete any extra services left after scale down
clusterServiceLabels := util.MergeMaps(serviceLabels, map[string]string{"type": ovnv1.ServiceClusterType})
svcList, err := service.GetServicesListWithLabel(
ctx,
helper,
helper.GetBeforeObject().GetNamespace(),
clusterServiceLabels,
)
if err == nil && len(svcList.Items) > int(*(instance.Spec.Replicas)) {
for i := len(svcList.Items) - 1; i >= int(*(instance.Spec.Replicas)); i-- {
fullServiceName := fmt.Sprintf("%s-%d", serviceName, i)
svcLabels := map[string]string{
common.AppSelector: serviceName,
"statefulset.kubernetes.io/pod-name": ovnPod.Name,
}
ovndbServiceLabels := util.MergeMaps(ovndbSelectorLabels, map[string]string{"type": ovnv1.ServiceClusterType})
svc, err := service.NewService(
ovndbcluster.Service(ovnPod.Name, instance, ovndbServiceLabels, ovndbSelectorLabels),
time.Duration(5)*time.Second,
nil,
)
if err != nil {
return ctrl.Result{}, err
"statefulset.kubernetes.io/pod-name": fullServiceName,
}
ctrlResult, err := svc.CreateOrPatch(ctx, helper)
if err != nil {
return ctrl.Result{}, err
} else if (ctrlResult != ctrl.Result{}) {
return ctrl.Result{}, nil
}
// create service - end
}

// Delete any extra services left after scale down
clusterServiceLabels := util.MergeMaps(serviceLabels, map[string]string{"type": ovnv1.ServiceClusterType})
svcList, err := service.GetServicesListWithLabel(
ctx,
helper,
helper.GetBeforeObject().GetNamespace(),
clusterServiceLabels,
)
if err == nil && len(svcList.Items) > int(*(instance.Spec.Replicas)) {
for i := len(svcList.Items) - 1; i >= int(*(instance.Spec.Replicas)); i-- {
fullServiceName := fmt.Sprintf("%s-%d", serviceName, i)
svcLabels := map[string]string{
common.AppSelector: serviceName,
"statefulset.kubernetes.io/pod-name": fullServiceName,
}
err = service.DeleteServicesWithLabel(
ctx,
helper,
instance,
svcLabels,
)
if err != nil {
err = fmt.Errorf("error while deleting service with name %s: %w", fullServiceName, err)
return ctrl.Result{}, err
}
}
}

// When the cluster is attached to an external network, create DNS record for every
// cluster member so it can be resolved from outside cluster (edpm nodes)
if instance.Spec.NetworkAttachment != "" {
var dnsIPsList []string
// TODO(averdagu): use built in Min once go1.21 is used
minLen := ovn_common.Min(len(podList.Items), int(*(instance.Spec.Replicas)))
for _, ovnPod := range podList.Items[:minLen] {
svc, err = service.GetServiceWithName(
ctx,
helper,
ovnPod.Name,
ovnPod.Namespace,
)
if err != nil {
return ctrl.Result{}, err
}

dnsIP, err := getPodIPInNetwork(ovnPod, instance.Namespace, instance.Spec.NetworkAttachment)
dnsIPsList = append(dnsIPsList, dnsIP)
if err != nil {
return ctrl.Result{}, err
}
}
// DNSData info is called every reconcile loop to ensure that even if a pod gets
// restarted and it's IP has changed, the DNSData CR will have the correct info.
// If nothing changed this won't modify the current dnsmasq pod.
err = ovndbcluster.DNSData(
err = service.DeleteServicesWithLabel(
ctx,
helper,
serviceName,
dnsIPsList,
instance,
serviceLabels,
svcLabels,
)
if err != nil {
err = fmt.Errorf("error while deleting service with name %s: %w", fullServiceName, err)
return ctrl.Result{}, err
}
// It can be possible that not all pods are ready, so DNSData won't
// have complete information, return error to retrigger reconcile loop
// Returning here instead of at the beggining of the for is done to
// expose the already created pods to other services/dataplane nodes
if len(podList.Items) < int(*(instance.Spec.Replicas)) {
Log.Info(fmt.Sprintf("not all pods are yet created, number of expected pods: %v, current pods: %v", *(instance.Spec.Replicas), len(podList.Items)))
return ctrl.Result{RequeueAfter: 1 * time.Second}, nil
}
}
} else {
// create service using the
svcOverride := instance.Spec.Override.Service
if svcOverride.EmbeddedLabelsAnnotations == nil {
svcOverride.EmbeddedLabelsAnnotations = &service.EmbeddedLabelsAnnotations{}
}
if svcOverride.Spec == nil {
svcOverride.Spec = &service.OverrideServiceSpec{}
}

// Create the service
svc = ovndbcluster.Service(serviceName, instance, serviceLabels, serviceLabels)
// make sure that connections from a particular client are passed to the same Pod each time
svc.Spec.SessionAffinity = corev1.ServiceAffinityClientIP
ssvc, err := service.NewService(
svc,
time.Duration(5)*time.Second,
svcOverride,
)
if err != nil {
return ctrl.Result{}, err
}

// add annotation to register service name in dnsmasq
if ssvc.GetServiceType() == corev1.ServiceTypeLoadBalancer {
ssvc.AddAnnotation(map[string]string{
service.AnnotationHostnameKey: ssvc.GetServiceHostname(),
})
}

ctrlResult, err := ssvc.CreateOrPatch(ctx, helper)
if err != nil {
return ctrl.Result{}, err
} else if (ctrlResult != ctrl.Result{}) {
return ctrl.Result{}, nil
}
}

// cleanup ovndbcluster pod services if migrate to LB service
svcLabels := map[string]string{
common.AppSelector: serviceName,
"statefulset.kubernetes.io/pod-name": "",
// When the cluster is attached to an external network, create DNS record for every
// cluster member so it can be resolved from outside cluster (edpm nodes)
if instance.Spec.NetworkAttachment != "" && ssvc.GetServiceType() != corev1.ServiceTypeLoadBalancer {
var dnsIPsList []string
// TODO(averdagu): use built in Min once go1.21 is used
minLen := ovn_common.Min(len(podList.Items), int(*(instance.Spec.Replicas)))
for _, ovnPod := range podList.Items[:minLen] {
dnsIP, err := getPodIPInNetwork(ovnPod, instance.Namespace, instance.Spec.NetworkAttachment)
dnsIPsList = append(dnsIPsList, dnsIP)
if err != nil {
return ctrl.Result{}, err
}
}
err = service.DeleteServicesWithLabel(
// DNSData info is called every reconcile loop to ensure that even if a pod gets
// restarted and it's IP has changed, the DNSData CR will have the correct info.
// If nothing changed this won't modify the current dnsmasq pod.
err = ovndbcluster.DNSData(
ctx,
helper,
serviceName,
dnsIPsList,
instance,
svcLabels,
serviceLabels,
)
if err != nil {
err = fmt.Errorf("error while deleting service with label %s: %w", serviceName, err)
return ctrl.Result{}, err
}

// cleanup dnsData if this is a migration to use LB service instead
// It can be possible that not all pods are ready, so DNSData won't
// have complete information, return error to retrigger reconcile loop
// Returning here instead of at the beggining of the for is done to
// expose the already created pods to other services/dataplane nodes
if len(podList.Items) < int(*(instance.Spec.Replicas)) {
Log.Info(fmt.Sprintf("not all pods are yet created, number of expected pods: %v, current pods: %v", *(instance.Spec.Replicas), len(podList.Items)))
return ctrl.Result{RequeueAfter: 1 * time.Second}, nil
}
} else {
// cleanup dnsData either if there are no NAD, or LB k8s service
dnsdata := &infranetworkv1.DNSData{
ObjectMeta: metav1.ObjectMeta{
Name: serviceName,
Expand All @@ -865,16 +814,14 @@ func (r *OVNDBClusterReconciler) reconcileServices(
if err != nil && !k8s_errors.IsNotFound(err) {
return ctrl.Result{}, fmt.Errorf("error deleting dnsdata %s: %w", serviceName, err)
}

// create service - end
}

// dbAddress will contain ovsdbserver-(nb|sb).openstack.svc or empty
scheme := "tcp"
if instance.Spec.TLS.Enabled() {
scheme = "ssl"
}
instance.Status.DBAddress = ovndbcluster.GetDBAddress(svc, serviceName, instance.Namespace, scheme)
instance.Status.DBAddress = ovndbcluster.GetDBAddress(ssvc.GetSpec(), serviceName, instance.Namespace, scheme)

Log.Info("Reconciled OVN DB Cluster Service successfully")
return ctrl.Result{}, nil
Expand Down
6 changes: 3 additions & 3 deletions pkg/ovndbcluster/dnsdata.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,10 +61,10 @@ func DNSData(
}

// GetDBAddress - return string connection for the given service
func GetDBAddress(svc *corev1.Service, serviceName string, namespace string, scheme string) string {
if svc == nil {
func GetDBAddress(svcSpec *corev1.ServiceSpec, serviceName string, namespace string, scheme string) string {
if svcSpec == nil {
return ""
}
headlessDNSHostname := serviceName + "." + namespace + ".svc"
return fmt.Sprintf("%s:%s:%d", scheme, headlessDNSHostname, svc.Spec.Ports[0].Port)
return fmt.Sprintf("%s:%s:%d", scheme, headlessDNSHostname, svcSpec.Ports[0].Port)
}

0 comments on commit 8d12e61

Please sign in to comment.