From 098cdcc77763108c5627746c3eedb22d01163f6a Mon Sep 17 00:00:00 2001 From: Tom Pantelis Date: Thu, 22 Aug 2024 09:00:27 -0400 Subject: [PATCH] Aggregate ServiceExport conflict conditions It's possible there could be more than one concurrent conflict; however, the MCS API only defines one conflict type, so a second conflict would overwrite the first. We could define our own conflict types but the MCS spec does specifically reference the ServiceExportConflict type, defined as "Conflict", in the conflict resolution section. It doesn't mention what the behavior should be if there's more than one conflict. To represent multiple conflicts, we can still use the ServiceExportConflict type but aggregate the differing condition reasons and messages. For the Reason field, join multiple reasons separated by a comma and, for the Message field, join with a newline. When a particular conflict is resolved, remove its reason and message. When there are no more conflicts, i.e. Reason is empty, set the condition Status to False. 
Signed-off-by: Tom Pantelis --- .../controller/clusterip_service_test.go | 25 +++-- pkg/agent/controller/controller_suite_test.go | 12 --- pkg/agent/controller/endpoint_slice.go | 7 +- pkg/agent/controller/service_export_client.go | 65 +++++++++++ .../controller/service_export_client_test.go | 101 ++++++++++++++++++ .../service_export_failures_test.go | 7 +- pkg/agent/controller/service_import.go | 10 +- pkg/agent/controller/types.go | 6 +- 8 files changed, 198 insertions(+), 35 deletions(-) diff --git a/pkg/agent/controller/clusterip_service_test.go b/pkg/agent/controller/clusterip_service_test.go index bf7255b61..9713e7904 100644 --- a/pkg/agent/controller/clusterip_service_test.go +++ b/pkg/agent/controller/clusterip_service_test.go @@ -27,6 +27,7 @@ import ( "github.com/submariner-io/admiral/pkg/resource" "github.com/submariner-io/admiral/pkg/syncer/test" testutil "github.com/submariner-io/admiral/pkg/test" + "github.com/submariner-io/lighthouse/pkg/agent/controller" "github.com/submariner-io/lighthouse/pkg/constants" corev1 "k8s.io/api/core/v1" discovery "k8s.io/api/discovery/v1" @@ -369,6 +370,12 @@ func testClusterIPServiceInOneCluster() { } func testClusterIPServiceInTwoClusters() { + noConflictCondition := &mcsv1a1.ServiceExportCondition{ + Type: mcsv1a1.ServiceExportConflict, + Status: corev1.ConditionFalse, + Reason: ptr.To(""), + } + var t *testDriver BeforeEach(func() { @@ -417,7 +424,7 @@ func testClusterIPServiceInTwoClusters() { It("should correctly set the ports in the aggregated ServiceImport and set the Conflict status condition", func() { t.awaitNonHeadlessServiceExported(&t.cluster1, &t.cluster2) - condition := newServiceExportConflictCondition("ConflictingPorts") + condition := newServiceExportConflictCondition(controller.PortConflictReason) t.cluster1.awaitServiceExportCondition(condition) t.cluster2.awaitServiceExportCondition(condition) }) @@ -431,22 +438,22 @@ func testClusterIPServiceInTwoClusters() { t.awaitNoEndpointSlice(&t.cluster1) 
t.awaitAggregatedServiceImport(mcsv1a1.ClusterSetIP, t.cluster2.service.Name, t.cluster2.service.Namespace, &t.cluster2) - t.cluster2.awaitNoServiceExportCondition(mcsv1a1.ServiceExportConflict) + t.cluster2.awaitServiceExportCondition(noConflictCondition) }) }) Context("initially and after updating the ports to match", func() { It("should correctly update the ports in the aggregated ServiceImport and clear the Conflict status condition", func() { t.awaitNonHeadlessServiceExported(&t.cluster1, &t.cluster2) - t.cluster1.awaitServiceExportCondition(newServiceExportConflictCondition("ConflictingPorts")) + t.cluster1.awaitServiceExportCondition(newServiceExportConflictCondition(controller.PortConflictReason)) t.aggregatedServicePorts = []mcsv1a1.ServicePort{port1, port2} t.cluster2.service.Spec.Ports = []corev1.ServicePort{toServicePort(port1), toServicePort(port2)} t.cluster2.updateService() t.awaitNonHeadlessServiceExported(&t.cluster1, &t.cluster2) - t.cluster1.awaitNoServiceExportCondition(mcsv1a1.ServiceExportConflict) - t.cluster2.awaitNoServiceExportCondition(mcsv1a1.ServiceExportConflict) + t.cluster1.awaitServiceExportCondition(noConflictCondition) + t.cluster2.awaitServiceExportCondition(noConflictCondition) }) }) }) @@ -460,20 +467,20 @@ func testClusterIPServiceInTwoClusters() { t.cluster2.ensureNoEndpointSlice() t.awaitNonHeadlessServiceExported(&t.cluster1) - t.cluster2.awaitServiceExportCondition(newServiceExportConflictCondition("ConflictingType")) - t.cluster2.awaitServiceExportCondition(newServiceExportReadyCondition(corev1.ConditionFalse, "ExportFailed")) + t.cluster2.awaitServiceExportCondition(newServiceExportConflictCondition(controller.TypeConflictReason)) + t.cluster2.awaitServiceExportCondition(newServiceExportReadyCondition(corev1.ConditionFalse, controller.ExportFailedReason)) t.cluster1.ensureNoServiceExportCondition(mcsv1a1.ServiceExportConflict) }) Context("initially and after updating the service types to match", func() { It("should 
export the service in both clusters", func() { - t.cluster2.awaitServiceExportCondition(newServiceExportConflictCondition("ConflictingType")) + t.cluster2.awaitServiceExportCondition(newServiceExportConflictCondition(controller.TypeConflictReason)) t.cluster2.service.Spec.ClusterIP = t.cluster2.serviceIP t.cluster2.updateService() t.awaitNonHeadlessServiceExported(&t.cluster1, &t.cluster2) - t.cluster2.ensureNoServiceExportCondition(mcsv1a1.ServiceExportConflict) + t.cluster2.awaitServiceExportCondition(noConflictCondition) }) }) }) diff --git a/pkg/agent/controller/controller_suite_test.go b/pkg/agent/controller/controller_suite_test.go index cdc151a93..8c4fb5a0e 100644 --- a/pkg/agent/controller/controller_suite_test.go +++ b/pkg/agent/controller/controller_suite_test.go @@ -584,18 +584,6 @@ func (c *cluster) ensureNoServiceExportCondition(condType mcsv1a1.ServiceExportC } } -func (c *cluster) awaitNoServiceExportCondition(condType mcsv1a1.ServiceExportConditionType, serviceExports ...*mcsv1a1.ServiceExport) { - if len(serviceExports) == 0 { - serviceExports = []*mcsv1a1.ServiceExport{c.serviceExport} - } - - for _, se := range serviceExports { - Eventually(func() interface{} { - return c.retrieveServiceExportCondition(se, condType) - }).Should(BeNil(), "Unexpected ServiceExport status condition") - } -} - func (c *cluster) awaitServiceUnavailableStatus() { c.awaitServiceExportCondition(newServiceExportValidCondition(corev1.ConditionFalse, "ServiceUnavailable")) } diff --git a/pkg/agent/controller/endpoint_slice.go b/pkg/agent/controller/endpoint_slice.go index 002533f11..6ef1b35cd 100644 --- a/pkg/agent/controller/endpoint_slice.go +++ b/pkg/agent/controller/endpoint_slice.go @@ -185,7 +185,7 @@ func (c *EndpointSliceController) onLocalEndpointSliceSynced(obj runtime.Object, err = c.serviceImportAggregator.updateOnCreateOrUpdate(ctx, serviceName, serviceNamespace) if err != nil { c.serviceExportClient.UpdateStatusConditions(ctx, serviceName, serviceNamespace, - 
newServiceExportCondition(constants.ServiceExportReady, corev1.ConditionFalse, exportFailedReason, + newServiceExportCondition(constants.ServiceExportReady, corev1.ConditionFalse, ExportFailedReason, fmt.Sprintf("Unable to export: %v", err))) } else { c.serviceExportClient.UpdateStatusConditions(ctx, serviceName, serviceNamespace, @@ -264,12 +264,13 @@ func (c *EndpointSliceController) checkForConflicts(_, name, namespace string) ( if conflict { c.serviceExportClient.UpdateStatusConditions(ctx, name, namespace, newServiceExportCondition( - mcsv1a1.ServiceExportConflict, corev1.ConditionTrue, portConflictReason, + mcsv1a1.ServiceExportConflict, corev1.ConditionTrue, PortConflictReason, fmt.Sprintf("The service ports conflict between the constituent clusters %s. "+ "The service will expose the intersection of all the ports: %s", fmt.Sprintf("[%s]", strings.Join(clusterNames, ", ")), servicePortsToString(intersectedServicePorts)))) } else if FindServiceExportStatusCondition(localServiceExport.Status.Conditions, mcsv1a1.ServiceExportConflict) != nil { - c.serviceExportClient.RemoveStatusCondition(ctx, name, namespace, mcsv1a1.ServiceExportConflict, portConflictReason) + c.serviceExportClient.UpdateStatusConditions(ctx, name, namespace, newServiceExportCondition( + mcsv1a1.ServiceExportConflict, corev1.ConditionFalse, PortConflictReason, "")) } return false, nil diff --git a/pkg/agent/controller/service_export_client.go b/pkg/agent/controller/service_export_client.go index 72aa14b03..e2d5a6d59 100644 --- a/pkg/agent/controller/service_export_client.go +++ b/pkg/agent/controller/service_export_client.go @@ -21,17 +21,21 @@ package controller import ( "context" "reflect" + goslices "slices" + "strings" "github.com/pkg/errors" "github.com/submariner-io/admiral/pkg/log" "github.com/submariner-io/admiral/pkg/slices" "github.com/submariner-io/lighthouse/pkg/constants" + corev1 "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 
"k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/client-go/dynamic" "k8s.io/client-go/util/retry" + "k8s.io/utils/ptr" mcsv1a1 "sigs.k8s.io/mcs-api/pkg/apis/v1alpha1" ) @@ -96,12 +100,24 @@ func (c *ServiceExportClient) tryUpdateStatusConditions(ctx context.Context, nam condition := &conditions[i] prevCond := findStatusCondition(toUpdate.Status.Conditions, condition.Type) + if prevCond == nil { + if condition.Type == mcsv1a1.ServiceExportConflict && condition.Status == corev1.ConditionFalse { + continue + } + logger.V(log.DEBUG).Infof("Add status condition for ServiceExport (%s/%s): Type: %q, Status: %q, Reason: %q, Message: %q", namespace, name, condition.Type, condition.Status, *condition.Reason, *condition.Message) toUpdate.Status.Conditions = append(toUpdate.Status.Conditions, *condition) updated = true + } else if condition.Type == mcsv1a1.ServiceExportConflict { + updated = updated || c.mergeConflictCondition(prevCond, condition) + if updated { + logger.V(log.DEBUG).Infof( + "Update status condition for ServiceExport (%s/%s): Type: %q, Status: %q, Reason: %q, Message: %q", + namespace, name, condition.Type, prevCond.Status, *prevCond.Reason, *prevCond.Message) + } } else if serviceExportConditionEqual(prevCond, condition) { logger.V(log.TRACE).Infof("Last ServiceExportCondition for (%s/%s) is equal - not updating status: %#v", namespace, name, prevCond) @@ -118,6 +134,55 @@ func (c *ServiceExportClient) tryUpdateStatusConditions(ctx context.Context, nam }) } +func (c *ServiceExportClient) mergeConflictCondition(to, from *mcsv1a1.ServiceExportCondition) bool { + var reasons, messages []string + + if ptr.Deref(to.Reason, "") != "" { + reasons = strings.Split(ptr.Deref(to.Reason, ""), ",") + } + + if ptr.Deref(to.Message, "") != "" { + messages = strings.Split(ptr.Deref(to.Message, ""), "\n") + } + + index := goslices.Index(reasons, *from.Reason) + if index >= 0 { + if from.Status == 
corev1.ConditionTrue { + if index < len(messages) { + messages[index] = *from.Message + } + } else { + reasons = goslices.Delete(reasons, index, index+1) + + if index < len(messages) { + messages = goslices.Delete(messages, index, index+1) + } + } + } else if from.Status == corev1.ConditionTrue { + reasons = append(reasons, *from.Reason) + messages = append(messages, *from.Message) + } + + newReason := strings.Join(reasons, ",") + newMessage := strings.Join(messages, "\n") + updated := newReason != ptr.Deref(to.Reason, "") || newMessage != ptr.Deref(to.Message, "") + + to.Reason = ptr.To(newReason) + to.Message = ptr.To(newMessage) + + if *to.Reason != "" { + to.Status = corev1.ConditionTrue + } else { + to.Status = corev1.ConditionFalse + } + + if updated { + to.LastTransitionTime = from.LastTransitionTime + } + + return updated +} + func (c *ServiceExportClient) doUpdate(ctx context.Context, name, namespace string, update func(toUpdate *mcsv1a1.ServiceExport) bool) { err := retry.RetryOnConflict(retry.DefaultRetry, func() error { obj, err := c.Namespace(namespace).Get(ctx, name, metav1.GetOptions{}) diff --git a/pkg/agent/controller/service_export_client_test.go b/pkg/agent/controller/service_export_client_test.go index 548dc53df..95bcda7a7 100644 --- a/pkg/agent/controller/service_export_client_test.go +++ b/pkg/agent/controller/service_export_client_test.go @@ -20,6 +20,7 @@ package controller_test import ( "context" + "strings" . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" @@ -126,4 +127,104 @@ var _ = Describe("ServiceExportClient", func() { constants.ServiceExportReady)).ToNot(BeNil()) }) }) + + Context("with Conflict condition type", func() { + It("should aggregate the different reasons and messages", func() { + // The condition shouldn't be added with Status False. 
+ + serviceExportClient.UpdateStatusConditions(context.TODO(), serviceName, serviceNamespace, mcsv1a1.ServiceExportCondition{ + Type: mcsv1a1.ServiceExportConflict, + Status: corev1.ConditionFalse, + Reason: ptr.To(""), + Message: ptr.To(""), + }) + + Expect(controller.FindServiceExportStatusCondition(getServiceExport().Status.Conditions, + mcsv1a1.ServiceExportConflict)).To(BeNil()) + + portConflictMsg := "The service ports conflict" + typeConflictMsg := "The service types conflict" + + cond := mcsv1a1.ServiceExportCondition{ + Type: mcsv1a1.ServiceExportConflict, + Status: corev1.ConditionTrue, + Reason: ptr.To(controller.PortConflictReason), + Message: ptr.To(portConflictMsg), + } + + // Add first condition reason + + serviceExportClient.UpdateStatusConditions(context.TODO(), serviceName, serviceNamespace, cond) + + Expect(controller.FindServiceExportStatusCondition(getServiceExport().Status.Conditions, cond.Type)).To(Equal(&cond)) + + // Add second condition reason + + cond.Reason = ptr.To(controller.TypeConflictReason) + cond.Message = ptr.To(typeConflictMsg) + + serviceExportClient.UpdateStatusConditions(context.TODO(), serviceName, serviceNamespace, cond) + + actual := controller.FindServiceExportStatusCondition(getServiceExport().Status.Conditions, cond.Type) + Expect(strings.Split(*actual.Reason, ",")).To(HaveExactElements(controller.PortConflictReason, controller.TypeConflictReason)) + Expect(strings.Split(*actual.Message, "\n")).To(HaveExactElements(portConflictMsg, typeConflictMsg)) + Expect(actual.Status).To(Equal(corev1.ConditionTrue)) + + // Update second condition message + + typeConflictMsg = "The service types still conflict" + cond.Message = ptr.To(typeConflictMsg) + + serviceExportClient.UpdateStatusConditions(context.TODO(), serviceName, serviceNamespace, cond) + + actual = controller.FindServiceExportStatusCondition(getServiceExport().Status.Conditions, cond.Type) + Expect(strings.Split(*actual.Reason, 
",")).To(HaveExactElements(controller.PortConflictReason, controller.TypeConflictReason)) + Expect(strings.Split(*actual.Message, "\n")).To(HaveExactElements(portConflictMsg, typeConflictMsg)) + Expect(actual.Status).To(Equal(corev1.ConditionTrue)) + + // Resolve first condition + + cond.Reason = ptr.To(controller.PortConflictReason) + cond.Message = ptr.To("") + cond.Status = corev1.ConditionFalse + + serviceExportClient.UpdateStatusConditions(context.TODO(), serviceName, serviceNamespace, cond) + + actual = controller.FindServiceExportStatusCondition(getServiceExport().Status.Conditions, cond.Type) + Expect(*actual.Reason).To(Equal(controller.TypeConflictReason)) + Expect(*actual.Message).To(Equal(typeConflictMsg)) + Expect(actual.Status).To(Equal(corev1.ConditionTrue)) + + // Resolve second condition + + cond.Reason = ptr.To(controller.TypeConflictReason) + + for i := 1; i <= 2; i++ { + serviceExportClient.UpdateStatusConditions(context.TODO(), serviceName, serviceNamespace, cond) + + actual = controller.FindServiceExportStatusCondition(getServiceExport().Status.Conditions, cond.Type) + Expect(actual.Status).To(Equal(corev1.ConditionFalse)) + Expect(*actual.Reason).To(BeEmpty()) + Expect(*actual.Message).To(BeEmpty()) + } + + // Add the first condition back + + serviceExportClient.UpdateStatusConditions(context.TODO(), serviceName, serviceNamespace, mcsv1a1.ServiceExportCondition{ + Type: mcsv1a1.ServiceExportConflict, + Status: corev1.ConditionTrue, + Reason: ptr.To(controller.PortConflictReason), + Message: ptr.To(""), + }, mcsv1a1.ServiceExportCondition{ + Type: mcsv1a1.ServiceExportConflict, + Status: corev1.ConditionFalse, + Reason: ptr.To(controller.TypeConflictReason), + Message: ptr.To(""), + }) + + actual = controller.FindServiceExportStatusCondition(getServiceExport().Status.Conditions, cond.Type) + Expect(*actual.Reason).To(Equal(controller.PortConflictReason)) + Expect(actual.Status).To(Equal(corev1.ConditionTrue)) + }) + }) }) diff --git 
a/pkg/agent/controller/service_export_failures_test.go b/pkg/agent/controller/service_export_failures_test.go index 89e2db3ed..5fceef750 100644 --- a/pkg/agent/controller/service_export_failures_test.go +++ b/pkg/agent/controller/service_export_failures_test.go @@ -23,6 +23,7 @@ import ( . "github.com/onsi/ginkgo/v2" "github.com/submariner-io/admiral/pkg/fake" + "github.com/submariner-io/lighthouse/pkg/agent/controller" "github.com/submariner-io/lighthouse/pkg/constants" corev1 "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" @@ -67,7 +68,7 @@ var _ = Describe("Service export failures", func() { }) It("should eventually export the service", func() { - t.cluster1.awaitServiceExportCondition(newServiceExportReadyCondition(corev1.ConditionFalse, "ExportFailed")) + t.cluster1.awaitServiceExportCondition(newServiceExportReadyCondition(corev1.ConditionFalse, controller.ExportFailedReason)) t.brokerServiceImportReactor.SetFailOnCreate(nil) t.awaitNonHeadlessServiceExported(&t.cluster1) @@ -80,7 +81,7 @@ var _ = Describe("Service export failures", func() { }) It("should eventually export the service", func() { - t.cluster1.awaitServiceExportCondition(newServiceExportReadyCondition(corev1.ConditionFalse, "ExportFailed")) + t.cluster1.awaitServiceExportCondition(newServiceExportReadyCondition(corev1.ConditionFalse, controller.ExportFailedReason)) t.brokerServiceImportReactor.SetFailOnUpdate(nil) t.awaitNonHeadlessServiceExported(&t.cluster1) @@ -131,7 +132,7 @@ var _ = Describe("Service export failures", func() { }) It("should eventually export the service", func() { - t.cluster1.awaitServiceExportCondition(newServiceExportReadyCondition(corev1.ConditionFalse, "ExportFailed")) + t.cluster1.awaitServiceExportCondition(newServiceExportReadyCondition(corev1.ConditionFalse, controller.ExportFailedReason)) t.brokerEndpointSliceReactor.SetFailOnList(nil) t.awaitNonHeadlessServiceExported(&t.cluster1) diff --git a/pkg/agent/controller/service_import.go 
b/pkg/agent/controller/service_import.go index 03c26c006..a53b8c132 100644 --- a/pkg/agent/controller/service_import.go +++ b/pkg/agent/controller/service_import.go @@ -329,16 +329,16 @@ func (c *ServiceImportController) Distribute(ctx context.Context, obj runtime.Ob if localServiceImport.Spec.Type != existing.Spec.Type { conflict = true conflictCondition := newServiceExportCondition( - mcsv1a1.ServiceExportConflict, corev1.ConditionTrue, typeConflictReason, + mcsv1a1.ServiceExportConflict, corev1.ConditionTrue, TypeConflictReason, fmt.Sprintf("The service type %q does not match the type (%q) of the existing service export", localServiceImport.Spec.Type, existing.Spec.Type)) c.serviceExportClient.UpdateStatusConditions(ctx, serviceName, serviceNamespace, conflictCondition, newServiceExportCondition(constants.ServiceExportReady, - corev1.ConditionFalse, exportFailedReason, "Unable to export due to an irresolvable conflict")) + corev1.ConditionFalse, ExportFailedReason, "Unable to export due to an irresolvable conflict")) } else { - c.serviceExportClient.RemoveStatusCondition(ctx, serviceName, serviceNamespace, mcsv1a1.ServiceExportConflict, - typeConflictReason) + c.serviceExportClient.UpdateStatusConditions(ctx, serviceName, serviceNamespace, newServiceExportCondition( + mcsv1a1.ServiceExportConflict, corev1.ConditionFalse, TypeConflictReason, "")) if existing.Spec.SessionAffinity == "" || existing.Spec.SessionAffinity == corev1.ServiceAffinityNone { existing.Spec.SessionAffinity = localServiceImport.Spec.SessionAffinity @@ -368,7 +368,7 @@ func (c *ServiceImportController) Distribute(ctx context.Context, obj runtime.Ob if err != nil { c.serviceExportClient.UpdateStatusConditions(ctx, serviceName, serviceNamespace, newServiceExportCondition(constants.ServiceExportReady, - corev1.ConditionFalse, exportFailedReason, fmt.Sprintf("Unable to export: %v", err))) + corev1.ConditionFalse, ExportFailedReason, fmt.Sprintf("Unable to export: %v", err))) } if result == 
util.OperationResultCreated { diff --git a/pkg/agent/controller/types.go b/pkg/agent/controller/types.go index fe054b7b7..f81c02ae8 100644 --- a/pkg/agent/controller/types.go +++ b/pkg/agent/controller/types.go @@ -36,9 +36,9 @@ import ( ) const ( - exportFailedReason = "ExportFailed" - typeConflictReason = "ConflictingType" - portConflictReason = "ConflictingPorts" + ExportFailedReason = "ExportFailed" + TypeConflictReason = "ConflictingType" + PortConflictReason = "ConflictingPorts" ) type EndpointSliceListerFn func(selector k8slabels.Selector) []runtime.Object