diff --git a/pkg/agent/controller/clusterip_service_test.go b/pkg/agent/controller/clusterip_service_test.go index bf7255b6..9713e790 100644 --- a/pkg/agent/controller/clusterip_service_test.go +++ b/pkg/agent/controller/clusterip_service_test.go @@ -27,6 +27,7 @@ import ( "github.com/submariner-io/admiral/pkg/resource" "github.com/submariner-io/admiral/pkg/syncer/test" testutil "github.com/submariner-io/admiral/pkg/test" + "github.com/submariner-io/lighthouse/pkg/agent/controller" "github.com/submariner-io/lighthouse/pkg/constants" corev1 "k8s.io/api/core/v1" discovery "k8s.io/api/discovery/v1" @@ -369,6 +370,12 @@ func testClusterIPServiceInOneCluster() { } func testClusterIPServiceInTwoClusters() { + noConflictCondition := &mcsv1a1.ServiceExportCondition{ + Type: mcsv1a1.ServiceExportConflict, + Status: corev1.ConditionFalse, + Reason: ptr.To(""), + } + var t *testDriver BeforeEach(func() { @@ -417,7 +424,7 @@ func testClusterIPServiceInTwoClusters() { It("should correctly set the ports in the aggregated ServiceImport and set the Conflict status condition", func() { t.awaitNonHeadlessServiceExported(&t.cluster1, &t.cluster2) - condition := newServiceExportConflictCondition("ConflictingPorts") + condition := newServiceExportConflictCondition(controller.PortConflictReason) t.cluster1.awaitServiceExportCondition(condition) t.cluster2.awaitServiceExportCondition(condition) }) @@ -431,22 +438,22 @@ func testClusterIPServiceInTwoClusters() { t.awaitNoEndpointSlice(&t.cluster1) t.awaitAggregatedServiceImport(mcsv1a1.ClusterSetIP, t.cluster2.service.Name, t.cluster2.service.Namespace, &t.cluster2) - t.cluster2.awaitNoServiceExportCondition(mcsv1a1.ServiceExportConflict) + t.cluster2.awaitServiceExportCondition(noConflictCondition) }) }) Context("initially and after updating the ports to match", func() { It("should correctly update the ports in the aggregated ServiceImport and clear the Conflict status condition", func() { t.awaitNonHeadlessServiceExported(&t.cluster1, &t.cluster2) - t.cluster1.awaitServiceExportCondition(newServiceExportConflictCondition("ConflictingPorts")) + t.cluster1.awaitServiceExportCondition(newServiceExportConflictCondition(controller.PortConflictReason)) t.aggregatedServicePorts = []mcsv1a1.ServicePort{port1, port2} t.cluster2.service.Spec.Ports = []corev1.ServicePort{toServicePort(port1), toServicePort(port2)} t.cluster2.updateService() t.awaitNonHeadlessServiceExported(&t.cluster1, &t.cluster2) - t.cluster1.awaitNoServiceExportCondition(mcsv1a1.ServiceExportConflict) - t.cluster2.awaitNoServiceExportCondition(mcsv1a1.ServiceExportConflict) + t.cluster1.awaitServiceExportCondition(noConflictCondition) + t.cluster2.awaitServiceExportCondition(noConflictCondition) }) }) }) @@ -460,20 +467,20 @@ func testClusterIPServiceInTwoClusters() { t.cluster2.ensureNoEndpointSlice() t.awaitNonHeadlessServiceExported(&t.cluster1) - t.cluster2.awaitServiceExportCondition(newServiceExportConflictCondition("ConflictingType")) - t.cluster2.awaitServiceExportCondition(newServiceExportReadyCondition(corev1.ConditionFalse, "ExportFailed")) + t.cluster2.awaitServiceExportCondition(newServiceExportConflictCondition(controller.TypeConflictReason)) + t.cluster2.awaitServiceExportCondition(newServiceExportReadyCondition(corev1.ConditionFalse, controller.ExportFailedReason)) t.cluster1.ensureNoServiceExportCondition(mcsv1a1.ServiceExportConflict) }) Context("initially and after updating the service types to match", func() { It("should export the service in both clusters", func() { - t.cluster2.awaitServiceExportCondition(newServiceExportConflictCondition("ConflictingType")) + t.cluster2.awaitServiceExportCondition(newServiceExportConflictCondition(controller.TypeConflictReason)) t.cluster2.service.Spec.ClusterIP = t.cluster2.serviceIP t.cluster2.updateService() t.awaitNonHeadlessServiceExported(&t.cluster1, &t.cluster2) - t.cluster2.ensureNoServiceExportCondition(mcsv1a1.ServiceExportConflict) + t.cluster2.awaitServiceExportCondition(noConflictCondition) }) }) }) diff --git a/pkg/agent/controller/controller_suite_test.go b/pkg/agent/controller/controller_suite_test.go index cdc151a9..8c4fb5a0 100644 --- a/pkg/agent/controller/controller_suite_test.go +++ b/pkg/agent/controller/controller_suite_test.go @@ -584,18 +584,6 @@ func (c *cluster) ensureNoServiceExportCondition(condType mcsv1a1.ServiceExportC } } -func (c *cluster) awaitNoServiceExportCondition(condType mcsv1a1.ServiceExportConditionType, serviceExports ...*mcsv1a1.ServiceExport) { - if len(serviceExports) == 0 { - serviceExports = []*mcsv1a1.ServiceExport{c.serviceExport} - } - - for _, se := range serviceExports { - Eventually(func() interface{} { - return c.retrieveServiceExportCondition(se, condType) - }).Should(BeNil(), "Unexpected ServiceExport status condition") - } -} - func (c *cluster) awaitServiceUnavailableStatus() { c.awaitServiceExportCondition(newServiceExportValidCondition(corev1.ConditionFalse, "ServiceUnavailable")) } diff --git a/pkg/agent/controller/endpoint_slice.go b/pkg/agent/controller/endpoint_slice.go index 002533f1..6ef1b35c 100644 --- a/pkg/agent/controller/endpoint_slice.go +++ b/pkg/agent/controller/endpoint_slice.go @@ -185,7 +185,7 @@ func (c *EndpointSliceController) onLocalEndpointSliceSynced(obj runtime.Object, err = c.serviceImportAggregator.updateOnCreateOrUpdate(ctx, serviceName, serviceNamespace) if err != nil { c.serviceExportClient.UpdateStatusConditions(ctx, serviceName, serviceNamespace, - newServiceExportCondition(constants.ServiceExportReady, corev1.ConditionFalse, exportFailedReason, + newServiceExportCondition(constants.ServiceExportReady, corev1.ConditionFalse, ExportFailedReason, fmt.Sprintf("Unable to export: %v", err))) } else { c.serviceExportClient.UpdateStatusConditions(ctx, serviceName, serviceNamespace, @@ -264,12 +264,13 @@ func (c *EndpointSliceController) checkForConflicts(_, name, namespace string) ( if conflict { c.serviceExportClient.UpdateStatusConditions(ctx, name, namespace, newServiceExportCondition( - mcsv1a1.ServiceExportConflict, corev1.ConditionTrue, portConflictReason, + mcsv1a1.ServiceExportConflict, corev1.ConditionTrue, PortConflictReason, fmt.Sprintf("The service ports conflict between the constituent clusters %s. "+ "The service will expose the intersection of all the ports: %s", fmt.Sprintf("[%s]", strings.Join(clusterNames, ", ")), servicePortsToString(intersectedServicePorts)))) } else if FindServiceExportStatusCondition(localServiceExport.Status.Conditions, mcsv1a1.ServiceExportConflict) != nil { - c.serviceExportClient.RemoveStatusCondition(ctx, name, namespace, mcsv1a1.ServiceExportConflict, portConflictReason) + c.serviceExportClient.UpdateStatusConditions(ctx, name, namespace, newServiceExportCondition( + mcsv1a1.ServiceExportConflict, corev1.ConditionFalse, PortConflictReason, "")) } return false, nil diff --git a/pkg/agent/controller/service_export_client.go b/pkg/agent/controller/service_export_client.go index 72aa14b0..e2d5a6d5 100644 --- a/pkg/agent/controller/service_export_client.go +++ b/pkg/agent/controller/service_export_client.go @@ -21,17 +21,21 @@ package controller import ( "context" "reflect" + goslices "slices" + "strings" "github.com/pkg/errors" "github.com/submariner-io/admiral/pkg/log" "github.com/submariner-io/admiral/pkg/slices" "github.com/submariner-io/lighthouse/pkg/constants" + corev1 "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/client-go/dynamic" "k8s.io/client-go/util/retry" + "k8s.io/utils/ptr" mcsv1a1 "sigs.k8s.io/mcs-api/pkg/apis/v1alpha1" ) @@ -96,12 +100,24 @@ func (c *ServiceExportClient) tryUpdateStatusConditions(ctx context.Context, nam condition := &conditions[i] prevCond := findStatusCondition(toUpdate.Status.Conditions, condition.Type) + if prevCond == nil { + if condition.Type == mcsv1a1.ServiceExportConflict && condition.Status == corev1.ConditionFalse { + continue + } + logger.V(log.DEBUG).Infof("Add status condition for ServiceExport (%s/%s): Type: %q, Status: %q, Reason: %q, Message: %q", namespace, name, condition.Type, condition.Status, *condition.Reason, *condition.Message) toUpdate.Status.Conditions = append(toUpdate.Status.Conditions, *condition) updated = true + } else if condition.Type == mcsv1a1.ServiceExportConflict { + updated = updated || c.mergeConflictCondition(prevCond, condition) + if updated { + logger.V(log.DEBUG).Infof( + "Update status condition for ServiceExport (%s/%s): Type: %q, Status: %q, Reason: %q, Message: %q", + namespace, name, condition.Type, prevCond.Status, *prevCond.Reason, *prevCond.Message) + } } else if serviceExportConditionEqual(prevCond, condition) { logger.V(log.TRACE).Infof("Last ServiceExportCondition for (%s/%s) is equal - not updating status: %#v", namespace, name, prevCond) @@ -118,6 +134,55 @@ func (c *ServiceExportClient) tryUpdateStatusConditions(ctx context.Context, nam }) } +func (c *ServiceExportClient) mergeConflictCondition(to, from *mcsv1a1.ServiceExportCondition) bool { + var reasons, messages []string + + if ptr.Deref(to.Reason, "") != "" { + reasons = strings.Split(ptr.Deref(to.Reason, ""), ",") + } + + if ptr.Deref(to.Message, "") != "" { + messages = strings.Split(ptr.Deref(to.Message, ""), "\n") + } + + index := goslices.Index(reasons, *from.Reason) + if index >= 0 { + if from.Status == corev1.ConditionTrue { + if index < len(messages) { + messages[index] = *from.Message + } + } else { + reasons = goslices.Delete(reasons, index, index+1) + + if index < len(messages) { + messages = goslices.Delete(messages, index, index+1) + } + } + } else if from.Status == corev1.ConditionTrue { + reasons = append(reasons, *from.Reason) + messages = append(messages, *from.Message) + } + + newReason := strings.Join(reasons, ",") + newMessage := strings.Join(messages, "\n") + updated := newReason != ptr.Deref(to.Reason, "") || newMessage != ptr.Deref(to.Message, "") + + to.Reason = ptr.To(newReason) + to.Message = ptr.To(newMessage) + + if *to.Reason != "" { + to.Status = corev1.ConditionTrue + } else { + to.Status = corev1.ConditionFalse + } + + if updated { + to.LastTransitionTime = from.LastTransitionTime + } + + return updated +} + func (c *ServiceExportClient) doUpdate(ctx context.Context, name, namespace string, update func(toUpdate *mcsv1a1.ServiceExport) bool) { err := retry.RetryOnConflict(retry.DefaultRetry, func() error { obj, err := c.Namespace(namespace).Get(ctx, name, metav1.GetOptions{}) diff --git a/pkg/agent/controller/service_export_client_test.go b/pkg/agent/controller/service_export_client_test.go index 548dc53d..95bcda7a 100644 --- a/pkg/agent/controller/service_export_client_test.go +++ b/pkg/agent/controller/service_export_client_test.go @@ -20,6 +20,7 @@ package controller_test import ( "context" + "strings" . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" @@ -126,4 +127,104 @@ var _ = Describe("ServiceExportClient", func() { constants.ServiceExportReady)).ToNot(BeNil()) }) }) + + Context("with Conflict condition type", func() { + It("should aggregate the different reasons and messages", func() { + // The condition shouldn't be added with Status False. + + serviceExportClient.UpdateStatusConditions(context.TODO(), serviceName, serviceNamespace, mcsv1a1.ServiceExportCondition{ + Type: mcsv1a1.ServiceExportConflict, + Status: corev1.ConditionFalse, + Reason: ptr.To(""), + Message: ptr.To(""), + }) + + Expect(controller.FindServiceExportStatusCondition(getServiceExport().Status.Conditions, + mcsv1a1.ServiceExportConflict)).To(BeNil()) + + portConflictMsg := "The service ports conflict" + typeConflictMsg := "The service types conflict" + + cond := mcsv1a1.ServiceExportCondition{ + Type: mcsv1a1.ServiceExportConflict, + Status: corev1.ConditionTrue, + Reason: ptr.To(controller.PortConflictReason), + Message: ptr.To(portConflictMsg), + } + + // Add first condition reason + + serviceExportClient.UpdateStatusConditions(context.TODO(), serviceName, serviceNamespace, cond) + + Expect(controller.FindServiceExportStatusCondition(getServiceExport().Status.Conditions, cond.Type)).To(Equal(&cond)) + + // Add second condition reason + + cond.Reason = ptr.To(controller.TypeConflictReason) + cond.Message = ptr.To(typeConflictMsg) + + serviceExportClient.UpdateStatusConditions(context.TODO(), serviceName, serviceNamespace, cond) + + actual := controller.FindServiceExportStatusCondition(getServiceExport().Status.Conditions, cond.Type) + Expect(strings.Split(*actual.Reason, ",")).To(HaveExactElements(controller.PortConflictReason, controller.TypeConflictReason)) + Expect(strings.Split(*actual.Message, "\n")).To(HaveExactElements(portConflictMsg, typeConflictMsg)) + Expect(actual.Status).To(Equal(corev1.ConditionTrue)) + + // Update second condition message + + typeConflictMsg = "The service types still conflict" + cond.Message = ptr.To(typeConflictMsg) + + serviceExportClient.UpdateStatusConditions(context.TODO(), serviceName, serviceNamespace, cond) + + actual = controller.FindServiceExportStatusCondition(getServiceExport().Status.Conditions, cond.Type) + Expect(strings.Split(*actual.Reason, ",")).To(HaveExactElements(controller.PortConflictReason, controller.TypeConflictReason)) + Expect(strings.Split(*actual.Message, "\n")).To(HaveExactElements(portConflictMsg, typeConflictMsg)) + Expect(actual.Status).To(Equal(corev1.ConditionTrue)) + + // Resolve first condition + + cond.Reason = ptr.To(controller.PortConflictReason) + cond.Message = ptr.To("") + cond.Status = corev1.ConditionFalse + + serviceExportClient.UpdateStatusConditions(context.TODO(), serviceName, serviceNamespace, cond) + + actual = controller.FindServiceExportStatusCondition(getServiceExport().Status.Conditions, cond.Type) + Expect(*actual.Reason).To(Equal(controller.TypeConflictReason)) + Expect(*actual.Message).To(Equal(typeConflictMsg)) + Expect(actual.Status).To(Equal(corev1.ConditionTrue)) + + // Resolve second condition + + cond.Reason = ptr.To(controller.TypeConflictReason) + + for i := 1; i <= 2; i++ { + serviceExportClient.UpdateStatusConditions(context.TODO(), serviceName, serviceNamespace, cond) + + actual = controller.FindServiceExportStatusCondition(getServiceExport().Status.Conditions, cond.Type) + Expect(actual.Status).To(Equal(corev1.ConditionFalse)) + Expect(*actual.Reason).To(BeEmpty()) + Expect(*actual.Message).To(BeEmpty()) + } + + // Add the first condition back + + serviceExportClient.UpdateStatusConditions(context.TODO(), serviceName, serviceNamespace, mcsv1a1.ServiceExportCondition{ + Type: mcsv1a1.ServiceExportConflict, + Status: corev1.ConditionTrue, + Reason: ptr.To(controller.PortConflictReason), + Message: ptr.To(""), + }, mcsv1a1.ServiceExportCondition{ + Type: mcsv1a1.ServiceExportConflict, + Status: corev1.ConditionFalse, + Reason: ptr.To(controller.TypeConflictReason), + Message: ptr.To(""), + }) + + actual = controller.FindServiceExportStatusCondition(getServiceExport().Status.Conditions, cond.Type) + Expect(*actual.Reason).To(Equal(controller.PortConflictReason)) + Expect(actual.Status).To(Equal(corev1.ConditionTrue)) + }) + }) }) diff --git a/pkg/agent/controller/service_export_failures_test.go b/pkg/agent/controller/service_export_failures_test.go index 89e2db3e..5fceef75 100644 --- a/pkg/agent/controller/service_export_failures_test.go +++ b/pkg/agent/controller/service_export_failures_test.go @@ -23,6 +23,7 @@ import ( . "github.com/onsi/ginkgo/v2" "github.com/submariner-io/admiral/pkg/fake" + "github.com/submariner-io/lighthouse/pkg/agent/controller" "github.com/submariner-io/lighthouse/pkg/constants" corev1 "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" @@ -67,7 +68,7 @@ var _ = Describe("Service export failures", func() { }) It("should eventually export the service", func() { - t.cluster1.awaitServiceExportCondition(newServiceExportReadyCondition(corev1.ConditionFalse, "ExportFailed")) + t.cluster1.awaitServiceExportCondition(newServiceExportReadyCondition(corev1.ConditionFalse, controller.ExportFailedReason)) t.brokerServiceImportReactor.SetFailOnCreate(nil) t.awaitNonHeadlessServiceExported(&t.cluster1) @@ -80,7 +81,7 @@ var _ = Describe("Service export failures", func() { }) It("should eventually export the service", func() { - t.cluster1.awaitServiceExportCondition(newServiceExportReadyCondition(corev1.ConditionFalse, "ExportFailed")) + t.cluster1.awaitServiceExportCondition(newServiceExportReadyCondition(corev1.ConditionFalse, controller.ExportFailedReason)) t.brokerServiceImportReactor.SetFailOnUpdate(nil) t.awaitNonHeadlessServiceExported(&t.cluster1) @@ -131,7 +132,7 @@ var _ = Describe("Service export failures", func() { }) It("should eventually export the service", func() { - t.cluster1.awaitServiceExportCondition(newServiceExportReadyCondition(corev1.ConditionFalse, "ExportFailed")) + t.cluster1.awaitServiceExportCondition(newServiceExportReadyCondition(corev1.ConditionFalse, controller.ExportFailedReason)) t.brokerEndpointSliceReactor.SetFailOnList(nil) t.awaitNonHeadlessServiceExported(&t.cluster1) diff --git a/pkg/agent/controller/service_import.go b/pkg/agent/controller/service_import.go index 03c26c00..a53b8c13 100644 --- a/pkg/agent/controller/service_import.go +++ b/pkg/agent/controller/service_import.go @@ -329,16 +329,16 @@ func (c *ServiceImportController) Distribute(ctx context.Context, obj runtime.Ob if localServiceImport.Spec.Type != existing.Spec.Type { conflict = true conflictCondition := newServiceExportCondition( - mcsv1a1.ServiceExportConflict, corev1.ConditionTrue, typeConflictReason, + mcsv1a1.ServiceExportConflict, corev1.ConditionTrue, TypeConflictReason, fmt.Sprintf("The service type %q does not match the type (%q) of the existing service export", localServiceImport.Spec.Type, existing.Spec.Type)) c.serviceExportClient.UpdateStatusConditions(ctx, serviceName, serviceNamespace, conflictCondition, newServiceExportCondition(constants.ServiceExportReady, - corev1.ConditionFalse, exportFailedReason, "Unable to export due to an irresolvable conflict")) + corev1.ConditionFalse, ExportFailedReason, "Unable to export due to an irresolvable conflict")) } else { - c.serviceExportClient.RemoveStatusCondition(ctx, serviceName, serviceNamespace, mcsv1a1.ServiceExportConflict, - typeConflictReason) + c.serviceExportClient.UpdateStatusConditions(ctx, serviceName, serviceNamespace, newServiceExportCondition( + mcsv1a1.ServiceExportConflict, corev1.ConditionFalse, TypeConflictReason, "")) if existing.Spec.SessionAffinity == "" || existing.Spec.SessionAffinity == corev1.ServiceAffinityNone { existing.Spec.SessionAffinity = localServiceImport.Spec.SessionAffinity @@ -368,7 +368,7 @@ func (c *ServiceImportController) Distribute(ctx context.Context, obj runtime.Ob if err != nil { c.serviceExportClient.UpdateStatusConditions(ctx, serviceName, serviceNamespace, newServiceExportCondition(constants.ServiceExportReady, - corev1.ConditionFalse, exportFailedReason, fmt.Sprintf("Unable to export: %v", err))) + corev1.ConditionFalse, ExportFailedReason, fmt.Sprintf("Unable to export: %v", err))) } if result == util.OperationResultCreated { diff --git a/pkg/agent/controller/types.go b/pkg/agent/controller/types.go index fe054b7b..f81c02ae 100644 --- a/pkg/agent/controller/types.go +++ b/pkg/agent/controller/types.go @@ -36,9 +36,9 @@ import ( ) const ( - exportFailedReason = "ExportFailed" - typeConflictReason = "ConflictingType" - portConflictReason = "ConflictingPorts" + ExportFailedReason = "ExportFailed" + TypeConflictReason = "ConflictingType" + PortConflictReason = "ConflictingPorts" ) type EndpointSliceListerFn func(selector k8slabels.Selector) []runtime.Object