From 62ec10b3087b25eecfaffc1e7fca49a73ddba417 Mon Sep 17 00:00:00 2001
From: Tom Pantelis
Date: Thu, 22 Aug 2024 21:12:31 -0400
Subject: [PATCH] Implement conflict checking for SessionAffinity

The MCS spec's conflict resolution policy states that a "conflict will be
resolved by assigning precedence based on each ServiceExport's
creationTimestamp, from oldest to newest". We don't have a central MCS
controller with access to all clusters' ServiceExports, but we can store each
cluster's ServiceExport creationTimestamp as an annotation in the aggregated
ServiceImport and use the timestamps to resolve conflicts.

The SessionAffinity and SessionAffinityConfig fields on the aggregated
ServiceImport will be set by the cluster with the oldest timestamp. The other
clusters will set a ServiceExportConflict condition if their corresponding
local service's fields do not match those on the aggregated ServiceImport. If
a local service is updated in any cluster, each cluster re-evaluates the
updated aggregated ServiceImport and either clears or sets the conflict
condition. Also, if the service from the cluster with precedence is
unexported, the next-oldest exporting cluster will set the fields.

Signed-off-by: Tom Pantelis
---
 .../controller/clusterip_service_test.go      | 207 ++++++++++++++++--
 pkg/agent/controller/controller_suite_test.go |  60 ++---
 pkg/agent/controller/endpoint_slice.go        |   2 +-
 pkg/agent/controller/reconciliation_test.go   |   8 +-
 pkg/agent/controller/service_export_client.go |  27 ++-
 pkg/agent/controller/service_import.go        | 199 +++++++++++++++--
 .../controller/service_import_aggregator.go   |   2 +
 .../service_import_migration_test.go          |   9 +-
 pkg/agent/controller/types.go                 |   8 +-
 9 files changed, 444 insertions(+), 78 deletions(-)

diff --git a/pkg/agent/controller/clusterip_service_test.go b/pkg/agent/controller/clusterip_service_test.go
index 595e0e3a5..9e61af967 100644
--- a/pkg/agent/controller/clusterip_service_test.go
+++ b/pkg/agent/controller/clusterip_service_test.go
@@ -21,6 +21,7 @@ package controller_test
 import (
     "fmt"
     "strconv"
+    "time"

     . "github.com/onsi/ginkgo/v2"
     "github.com/submariner-io/admiral/pkg/fake"
@@ -201,6 +202,9 @@ func testClusterIPServiceInOneCluster() {
             t.cluster1.service.Spec.SessionAffinityConfig = &corev1.SessionAffinityConfig{
                 ClientIP: &corev1.ClientIPConfig{TimeoutSeconds: ptr.To(int32(10))},
             }
+
+            t.aggregatedSessionAffinity = t.cluster1.service.Spec.SessionAffinity
+            t.aggregatedSessionAffinityConfig = t.cluster1.service.Spec.SessionAffinityConfig
         })

         It("should be propagated to the ServiceImport", func() {
@@ -388,11 +392,6 @@ func testClusterIPServiceInTwoClusters() {
     BeforeEach(func() {
         t = newTestDiver()
-
-        t.cluster2.service.Spec.SessionAffinity = corev1.ServiceAffinityClientIP
-        t.cluster2.service.Spec.SessionAffinityConfig = &corev1.SessionAffinityConfig{
-            ClientIP: &corev1.ClientIPConfig{TimeoutSeconds: ptr.To(int32(10))},
-        }
     })

     JustBeforeEach(func() {
@@ -400,27 +399,48 @@ func testClusterIPServiceInTwoClusters() {
         t.cluster1.createService()
         t.cluster1.createServiceExport()

-        t.justBeforeEach()
+        t.cluster1.start(t, *t.syncerConfig)
+
+        // Sleep a little before starting the second cluster to ensure its resource CreationTimestamps will be
+        // later than the first cluster's, so that conflict checking is deterministic.
+        time.Sleep(100 * time.Millisecond)

         t.cluster2.createServiceEndpointSlices()
         t.cluster2.createService()
         t.cluster2.createServiceExport()
+
+        t.cluster2.start(t, *t.syncerConfig)
     })

     AfterEach(func() {
         t.afterEach()
     })

-    It("should export the service in both clusters", func() {
-        t.awaitNonHeadlessServiceExported(&t.cluster1, &t.cluster2)
-        t.cluster1.ensureLastServiceExportCondition(newServiceExportReadyCondition(corev1.ConditionTrue, ""))
-        t.cluster1.ensureLastServiceExportCondition(newServiceExportValidCondition(corev1.ConditionTrue, ""))
-        t.cluster1.ensureNoServiceExportCondition(mcsv1a1.ServiceExportConflict)
-        t.cluster2.ensureNoServiceExportCondition(mcsv1a1.ServiceExportConflict)
+    Context("", func() {
+        BeforeEach(func() {
+            t.cluster1.service.Spec.SessionAffinity = corev1.ServiceAffinityClientIP
+            t.cluster1.service.Spec.SessionAffinityConfig = &corev1.SessionAffinityConfig{
+                ClientIP: &corev1.ClientIPConfig{TimeoutSeconds: ptr.To(int32(10))},
+            }
+
+            t.cluster2.service.Spec.SessionAffinity = t.cluster1.service.Spec.SessionAffinity
+            t.cluster2.service.Spec.SessionAffinityConfig = t.cluster1.service.Spec.SessionAffinityConfig

-        By("Ensure conflict checking does not try to unnecessarily update the ServiceExport status")
+            t.aggregatedSessionAffinity = t.cluster1.service.Spec.SessionAffinity
+            t.aggregatedSessionAffinityConfig = t.cluster1.service.Spec.SessionAffinityConfig
+        })

-        t.cluster1.ensureNoServiceExportActions()
+        It("should export the service in both clusters", func() {
+            t.awaitNonHeadlessServiceExported(&t.cluster1, &t.cluster2)
+            t.cluster1.ensureLastServiceExportCondition(newServiceExportReadyCondition(corev1.ConditionTrue, ""))
+            t.cluster1.ensureLastServiceExportCondition(newServiceExportValidCondition(corev1.ConditionTrue, ""))
+            t.cluster1.ensureNoServiceExportCondition(mcsv1a1.ServiceExportConflict)
+            t.cluster2.ensureNoServiceExportCondition(mcsv1a1.ServiceExportConflict)
+
+            By("Ensure conflict checking does not try to unnecessarily update the ServiceExport status")
+
+            t.cluster1.ensureNoServiceExportActions()
+        })
     })

     Context("with differing ports", func() {
@@ -492,6 +512,165 @@ func testClusterIPServiceInTwoClusters() {
             })
         })
     })
+
+    Context("with differing service SessionAffinity", func() {
+        BeforeEach(func() {
+            t.cluster1.service.Spec.SessionAffinity = corev1.ServiceAffinityClientIP
+            t.aggregatedSessionAffinity = t.cluster1.service.Spec.SessionAffinity
+        })
+
+        It("should resolve the conflict and set the Conflict status condition on the conflicting cluster", func() {
+            t.awaitAggregatedServiceImport(mcsv1a1.ClusterSetIP, t.cluster1.service.Name, t.cluster1.service.Namespace,
+                &t.cluster1, &t.cluster2)
+
+            t.cluster2.awaitServiceExportCondition(newServiceExportConflictCondition(
+                controller.SessionAffinityConflictReason))
+            t.cluster1.ensureNoServiceExportCondition(mcsv1a1.ServiceExportConflict)
+        })
+
+        Context("initially and after updating the SessionAffinity on the conflicting cluster to match", func() {
+            It("should clear the Conflict status condition on the conflicting cluster", func() {
+                t.cluster2.awaitServiceExportCondition(newServiceExportConflictCondition(
+                    controller.SessionAffinityConflictReason))
+
+                By("Updating the SessionAffinity on the service")
+
+                t.cluster2.service.Spec.SessionAffinity = t.cluster1.service.Spec.SessionAffinity
+                t.cluster2.updateService()
+
+                t.cluster2.awaitServiceExportCondition(noConflictCondition)
+            })
+        })
+
+        Context("initially and after updating the SessionAffinity on the oldest exporting cluster to match", func() {
+            It("should clear the Conflict status condition on the conflicting cluster", func() {
+                t.cluster2.awaitServiceExportCondition(newServiceExportConflictCondition(
+                    controller.SessionAffinityConflictReason))
+
+                By("Updating the SessionAffinity on the service")
+
+                t.cluster1.service.Spec.SessionAffinity = t.cluster2.service.Spec.SessionAffinity
+                t.cluster1.updateService()
+
+                t.aggregatedSessionAffinity = t.cluster1.service.Spec.SessionAffinity
+                t.awaitAggregatedServiceImport(mcsv1a1.ClusterSetIP, t.cluster1.service.Name, t.cluster1.service.Namespace,
+                    &t.cluster1, &t.cluster2)
+
+                t.cluster2.awaitServiceExportCondition(noConflictCondition)
+            })
+        })
+
+        Context("initially and after the service on the oldest exporting cluster is unexported", func() {
+            It("should update the SessionAffinity on the aggregated ServiceImport and clear the Conflict status condition", func() {
+                t.cluster2.awaitServiceExportCondition(newServiceExportConflictCondition(
+                    controller.SessionAffinityConflictReason))
+
+                By("Unexporting the service")
+
+                t.cluster1.deleteServiceExport()
+
+                t.aggregatedSessionAffinity = t.cluster2.service.Spec.SessionAffinity
+                t.awaitAggregatedServiceImport(mcsv1a1.ClusterSetIP, t.cluster1.service.Name, t.cluster1.service.Namespace, &t.cluster2)
+                t.cluster2.awaitServiceExportCondition(noConflictCondition)
+            })
+        })
+    })
+
+    Context("with differing service SessionAffinityConfig", func() {
+        BeforeEach(func() {
+            t.cluster1.service.Spec.SessionAffinity = corev1.ServiceAffinityClientIP
+            t.cluster2.service.Spec.SessionAffinity = corev1.ServiceAffinityClientIP
+            t.aggregatedSessionAffinity = t.cluster1.service.Spec.SessionAffinity
+
+            t.cluster1.service.Spec.SessionAffinityConfig = &corev1.SessionAffinityConfig{
+                ClientIP: &corev1.ClientIPConfig{TimeoutSeconds: ptr.To(int32(10))},
+            }
+            t.aggregatedSessionAffinityConfig = t.cluster1.service.Spec.SessionAffinityConfig
+        })
+
+        It("should resolve the conflict and set the Conflict status condition on the conflicting cluster", func() {
+            t.awaitAggregatedServiceImport(mcsv1a1.ClusterSetIP, t.cluster1.service.Name, t.cluster1.service.Namespace,
+                &t.cluster1, &t.cluster2)
+
+            t.cluster2.awaitServiceExportCondition(newServiceExportConflictCondition(
+                controller.SessionAffinityConfigConflictReason))
+            t.cluster1.ensureNoServiceExportCondition(mcsv1a1.ServiceExportConflict)
+        })
+
+        Context("initially and after updating the SessionAffinityConfig on the conflicting cluster to match", func() {
+            It("should clear the Conflict status condition on the conflicting cluster", func() {
+                t.cluster2.awaitServiceExportCondition(newServiceExportConflictCondition(
+                    controller.SessionAffinityConfigConflictReason))
+
+                By("Updating the SessionAffinityConfig on the service")
+
+                t.cluster2.service.Spec.SessionAffinityConfig = t.cluster1.service.Spec.SessionAffinityConfig
+                t.cluster2.updateService()
+
+                t.cluster2.awaitServiceExportCondition(noConflictCondition)
+            })
+        })
+
+        Context("initially and after updating the SessionAffinityConfig on the oldest exporting cluster to match", func() {
+            It("should clear the Conflict status condition on the conflicting cluster", func() {
+                t.cluster2.awaitServiceExportCondition(newServiceExportConflictCondition(
+                    controller.SessionAffinityConfigConflictReason))
+
+                By("Updating the SessionAffinityConfig on the service")
+
+                t.cluster1.service.Spec.SessionAffinityConfig = t.cluster2.service.Spec.SessionAffinityConfig
+                t.cluster1.updateService()
+
+                t.aggregatedSessionAffinityConfig = t.cluster1.service.Spec.SessionAffinityConfig
+                t.awaitAggregatedServiceImport(mcsv1a1.ClusterSetIP, t.cluster1.service.Name, t.cluster1.service.Namespace,
+                    &t.cluster1, &t.cluster2)
+
+                t.cluster2.awaitServiceExportCondition(noConflictCondition)
+            })
+        })
+
+        Context("initially and after the service on the oldest exporting cluster is unexported", func() {
+            BeforeEach(func() {
+                t.cluster2.service.Spec.SessionAffinityConfig = &corev1.SessionAffinityConfig{
+                    ClientIP: &corev1.ClientIPConfig{TimeoutSeconds: ptr.To(int32(20))},
+                }
+            })
+
+            It("should update the SessionAffinityConfig on the aggregated ServiceImport and clear the Conflict status condition", func() {
+                t.cluster2.awaitServiceExportCondition(newServiceExportConflictCondition(
+                    controller.SessionAffinityConfigConflictReason))
+
+                By("Unexporting the service")
+
+                t.cluster1.deleteServiceExport()
+
+                t.aggregatedSessionAffinityConfig = t.cluster2.service.Spec.SessionAffinityConfig
+                t.awaitAggregatedServiceImport(mcsv1a1.ClusterSetIP, t.cluster1.service.Name, t.cluster1.service.Namespace, &t.cluster2)
+                t.cluster2.awaitServiceExportCondition(noConflictCondition)
+            })
+        })
+    })
+
+    Context("with differing service SessionAffinity and SessionAffinityConfig", func() {
+        BeforeEach(func() {
+            t.cluster1.service.Spec.SessionAffinity = corev1.ServiceAffinityClientIP
+            t.aggregatedSessionAffinity = t.cluster1.service.Spec.SessionAffinity
+
+            t.cluster1.service.Spec.SessionAffinityConfig = &corev1.SessionAffinityConfig{
+                ClientIP: &corev1.ClientIPConfig{TimeoutSeconds: ptr.To(int32(10))},
+            }
+            t.aggregatedSessionAffinityConfig = t.cluster1.service.Spec.SessionAffinityConfig
+        })
+
+        It("should resolve the conflicts and set the Conflict status condition on the conflicting cluster", func() {
+            t.awaitAggregatedServiceImport(mcsv1a1.ClusterSetIP, t.cluster1.service.Name, t.cluster1.service.Namespace,
+                &t.cluster1, &t.cluster2)
+
+            t.cluster2.awaitServiceExportCondition(newServiceExportConflictCondition(
+                fmt.Sprintf("%s,%s", controller.SessionAffinityConflictReason, controller.SessionAffinityConfigConflictReason)))
+            t.cluster1.ensureNoServiceExportCondition(mcsv1a1.ServiceExportConflict)
+        })
+    })
 }

 func testClusterIPServiceWithMultipleEPS() {
diff --git a/pkg/agent/controller/controller_suite_test.go b/pkg/agent/controller/controller_suite_test.go
index 576ad76fd..472c46f60 100644
--- a/pkg/agent/controller/controller_suite_test.go
+++ b/pkg/agent/controller/controller_suite_test.go
@@ -135,16 +135,18 @@ type cluster struct {
 }

 type testDriver struct {
-    cluster1                   cluster
-    cluster2                   cluster
-    brokerServiceImportClient  dynamic.NamespaceableResourceInterface
-    brokerEndpointSliceClient  dynamic.ResourceInterface
-    brokerEndpointSliceReactor *fake.FailingReactor
-    stopCh                     chan struct{}
-    syncerConfig               *broker.SyncerConfig
-    doStart                    bool
-    brokerServiceImportReactor *fake.FailingReactor
-    aggregatedServicePorts     []mcsv1a1.ServicePort
+    cluster1                        cluster
+    cluster2                        cluster
+    brokerServiceImportClient       dynamic.NamespaceableResourceInterface
+    brokerEndpointSliceClient       dynamic.ResourceInterface
+    brokerEndpointSliceReactor      *fake.FailingReactor
+    stopCh                          chan struct{}
+    syncerConfig                    *broker.SyncerConfig
+    doStart                         bool
+    brokerServiceImportReactor      *fake.FailingReactor
+    aggregatedServicePorts          []mcsv1a1.ServicePort
+    aggregatedSessionAffinity       corev1.ServiceAffinity
+    aggregatedSessionAffinityConfig *corev1.SessionAffinityConfig
 }

 func newTestDiver() *testDriver {
@@ -163,7 +165,8 @@
     fake.AddBasicReactors(&brokerClient.Fake)

     t := &testDriver{
-        aggregatedServicePorts: []mcsv1a1.ServicePort{port1, port2},
+        aggregatedServicePorts:    []mcsv1a1.ServicePort{port1, port2},
+        aggregatedSessionAffinity: corev1.ServiceAffinityNone,
         cluster1: cluster{
             clusterID: clusterID1,
             agentSpec: controller.AgentSpecification{
@@ -595,8 +598,7 @@ func (c *cluster) findLocalServiceImport() *mcsv1a1.ServiceImport {
     for i := range list.Items {
         if list.Items[i].GetLabels()[mcsv1a1.LabelServiceName] == c.service.Name &&
             list.Items[i].GetLabels()[constants.LabelSourceNamespace] == c.service.Namespace {
-            serviceImport := &mcsv1a1.ServiceImport{}
-            Expect(scheme.Scheme.Convert(&list.Items[i], serviceImport, nil)).To(Succeed())
+            serviceImport := toServiceImport(&list.Items[i])

             return serviceImport
         }
@@ -645,8 +647,7 @@ func awaitServiceImport(client dynamic.NamespaceableResourceInterface, expected
             return false, nil
         }

-        serviceImport = &mcsv1a1.ServiceImport{}
-        Expect(scheme.Scheme.Convert(obj, serviceImport, nil)).To(Succeed())
+        serviceImport = toServiceImport(obj)

         sortSlices(serviceImport)
@@ -667,6 +668,13 @@ func awaitServiceImport(client dynamic.NamespaceableResourceInterface, expected
     Expect(serviceImport.Labels).To(BeEmpty())
 }

+func getServiceImport(client dynamic.NamespaceableResourceInterface, namespace, name string) *mcsv1a1.ServiceImport {
+    obj, err := client.Namespace(namespace).Get(context.TODO(), name, metav1.GetOptions{})
+    Expect(err).To(Succeed())
+
+    return toServiceImport(obj)
+}
+
 func findEndpointSlices(client dynamic.ResourceInterface, namespace, name, clusterID string) []*discovery.EndpointSlice {
     list, err := client.List(context.TODO(), metav1.ListOptions{})
     if resource.IsMissingNamespaceErr(err) {
@@ -777,9 +785,10 @@ func (t *testDriver) awaitAggregatedServiceImport(sType mcsv1a1.ServiceImportTyp
             Namespace: test.RemoteNamespace,
         },
         Spec: mcsv1a1.ServiceImportSpec{
-            Type:            sType,
-            Ports:           []mcsv1a1.ServicePort{},
-            SessionAffinity: corev1.ServiceAffinityNone,
+            Type:                  sType,
+            Ports:                 []mcsv1a1.ServicePort{},
+            SessionAffinity:       t.aggregatedSessionAffinity,
+            SessionAffinityConfig: t.aggregatedSessionAffinityConfig,
         },
     }

@@ -791,14 +800,6 @@ func (t *testDriver) awaitAggregatedServiceImport(sType mcsv1a1.ServiceImportTyp
         for _, c := range clusters {
             expServiceImport.Status.Clusters = append(expServiceImport.Status.Clusters,
                 mcsv1a1.ClusterStatus{Cluster: c.clusterID})
-
-            if c.service.Spec.SessionAffinity != corev1.ServiceAffinityNone {
-                expServiceImport.Spec.SessionAffinity = c.service.Spec.SessionAffinity
-            }
-
-            if c.service.Spec.SessionAffinityConfig != nil {
-                expServiceImport.Spec.SessionAffinityConfig = c.service.Spec.SessionAffinityConfig
-            }
         }
     }

@@ -938,6 +939,13 @@ func toServiceExport(obj interface{}) *mcsv1a1.ServiceExport {
     return se
 }

+func toServiceImport(obj interface{}) *mcsv1a1.ServiceImport {
+    si := &mcsv1a1.ServiceImport{}
+    Expect(scheme.Scheme.Convert(obj, si, nil)).To(Succeed())
+
+    return si
+}
+
 func (t *testDriver) awaitNonHeadlessServiceExported(clusters ...*cluster) {
     t.awaitServiceExported(mcsv1a1.ClusterSetIP, clusters...)
 }
diff --git a/pkg/agent/controller/endpoint_slice.go b/pkg/agent/controller/endpoint_slice.go
index 6ef1b35cd..a2c40192e 100644
--- a/pkg/agent/controller/endpoint_slice.go
+++ b/pkg/agent/controller/endpoint_slice.go
@@ -268,7 +268,7 @@ func (c *EndpointSliceController) checkForConflicts(_, name, namespace string) (
             fmt.Sprintf("The service ports conflict between the constituent clusters %s. "+
                 "The service will expose the intersection of all the ports: %s",
                 fmt.Sprintf("[%s]", strings.Join(clusterNames, ", ")),
                 servicePortsToString(intersectedServicePorts))))
-    } else if FindServiceExportStatusCondition(localServiceExport.Status.Conditions, mcsv1a1.ServiceExportConflict) != nil {
+    } else if c.serviceExportClient.hasCondition(name, namespace, mcsv1a1.ServiceExportConflict, PortConflictReason) {
         c.serviceExportClient.UpdateStatusConditions(ctx, name, namespace, newServiceExportCondition(
             mcsv1a1.ServiceExportConflict, corev1.ConditionFalse, PortConflictReason, ""))
     }
diff --git a/pkg/agent/controller/reconciliation_test.go b/pkg/agent/controller/reconciliation_test.go
index 5d30ae109..2ab140ada 100644
--- a/pkg/agent/controller/reconciliation_test.go
+++ b/pkg/agent/controller/reconciliation_test.go
@@ -35,7 +35,6 @@ import (
     metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
     "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
     "k8s.io/client-go/dynamic/fake"
-    "k8s.io/client-go/kubernetes/scheme"
     "k8s.io/client-go/testing"
     mcsv1a1 "sigs.k8s.io/mcs-api/pkg/apis/v1alpha1"
 )
@@ -174,12 +173,7 @@ var _ = Describe("Reconciliation", func() {

     When("a remote aggregated ServiceImport is stale in the local datastore on startup", func() {
         It("should delete it from the local datastore on reconciliation", func() {
-            obj, err := t.cluster2.localServiceImportClient.Namespace(t.cluster1.service.Namespace).Get(context.TODO(),
-                t.cluster1.service.Name, metav1.GetOptions{})
-            Expect(err).To(Succeed())
-
-            serviceImport := &mcsv1a1.ServiceImport{}
-            Expect(scheme.Scheme.Convert(obj, serviceImport, nil)).To(Succeed())
+            serviceImport := getServiceImport(t.cluster2.localServiceImportClient, t.cluster1.service.Namespace, t.cluster1.service.Name)

             t.afterEach()
             t = newTestDiver()
diff --git a/pkg/agent/controller/service_export_client.go b/pkg/agent/controller/service_export_client.go
index ea939ec68..49c41d603 100644
--- a/pkg/agent/controller/service_export_client.go
+++ b/pkg/agent/controller/service_export_client.go
@@ -58,6 +58,10 @@ func (c *ServiceExportClient) UpdateStatusConditions(ctx context.Context, name,
 func (c *ServiceExportClient) tryUpdateStatusConditions(ctx context.Context, name, namespace string, canReplace bool,
     conditions ...mcsv1a1.ServiceExportCondition,
 ) {
+    if len(conditions) == 0 {
+        return
+    }
+
     findStatusCondition := func(conditions []mcsv1a1.ServiceExportCondition,
         condType mcsv1a1.ServiceExportConditionType,
     ) *mcsv1a1.ServiceExportCondition {
         cond := FindServiceExportStatusCondition(conditions, condType)
@@ -81,6 +85,7 @@ func (c *ServiceExportClient) tryUpdateStatusConditions(ctx context.Context, nam
         if prevCond == nil {
             if condition.Type == mcsv1a1.ServiceExportConflict && condition.Status == corev1.ConditionFalse {
+                // The caller intends to clear the Conflict condition so don't add it.
                 continue
             }

@@ -90,12 +95,14 @@ func (c *ServiceExportClient) tryUpdateStatusConditions(ctx context.Context, nam
             toUpdate.Status.Conditions = append(toUpdate.Status.Conditions, *condition)
             updated = true
         } else if condition.Type == mcsv1a1.ServiceExportConflict {
-            updated = updated || c.mergeConflictCondition(prevCond, condition)
-            if updated {
+            condUpdated := c.mergeConflictCondition(prevCond, condition)
+            if condUpdated {
                 logger.V(log.DEBUG).Infof(
                     "Update status condition for ServiceExport (%s/%s): Type: %q, Status: %q, Reason: %q, Message: %q",
                     namespace, name, condition.Type, prevCond.Status, *prevCond.Reason, *prevCond.Message)
             }
+
+            updated = updated || condUpdated
         } else if serviceExportConditionEqual(prevCond, condition) {
             logger.V(log.TRACE).Infof("Last ServiceExportCondition for (%s/%s) is equal - not updating status: %#v",
                 namespace, name, prevCond)
@@ -116,11 +123,11 @@ func (c *ServiceExportClient) mergeConflictCondition(to, from *mcsv1a1.ServiceEx
     var reasons, messages []string

     if ptr.Deref(to.Reason, "") != "" {
-        reasons = strings.Split(ptr.Deref(to.Reason, ""), ",")
+        reasons = strings.Split(*to.Reason, ",")
     }

     if ptr.Deref(to.Message, "") != "" {
-        messages = strings.Split(ptr.Deref(to.Message, ""), "\n")
+        messages = strings.Split(*to.Message, "\n")
     }

     index := goslices.Index(reasons, *from.Reason)
@@ -196,6 +203,18 @@ func (c *ServiceExportClient) getLocalInstance(name, namespace string) *mcsv1a1.
     return obj.(*mcsv1a1.ServiceExport)
 }

+//nolint:unparam // Ignore `condType` always receives `mcsv1a1.ServiceExportConflict`
+func (c *ServiceExportClient) hasCondition(name, namespace string, condType mcsv1a1.ServiceExportConditionType, reason string) bool {
+    se := c.getLocalInstance(name, namespace)
+    if se == nil {
+        return false
+    }
+
+    cond := FindServiceExportStatusCondition(se.Status.Conditions, condType)
+
+    return cond != nil && strings.Contains(ptr.Deref(cond.Reason, ""), reason)
+}
+
 func serviceExportConditionEqual(c1, c2 *mcsv1a1.ServiceExportCondition) bool {
     return c1.Type == c2.Type && c1.Status == c2.Status && reflect.DeepEqual(c1.Reason, c2.Reason) &&
         reflect.DeepEqual(c1.Message, c2.Message)
diff --git a/pkg/agent/controller/service_import.go b/pkg/agent/controller/service_import.go
index a53b8c132..b584a39ab 100644
--- a/pkg/agent/controller/service_import.go
+++ b/pkg/agent/controller/service_import.go
@@ -21,6 +21,10 @@ package controller
 import (
     "context"
     "fmt"
+    "math"
+    "reflect"
+    "strconv"
+    "strings"

     "github.com/pkg/errors"
     "github.com/prometheus/client_golang/prometheus"
@@ -36,13 +40,17 @@ import (
     corev1 "k8s.io/api/core/v1"
     metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
     "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
+    k8slabels "k8s.io/apimachinery/pkg/labels"
     "k8s.io/apimachinery/pkg/runtime"
+    "k8s.io/apimachinery/pkg/util/validation"
     "k8s.io/client-go/dynamic"
     "k8s.io/client-go/tools/cache"
     "k8s.io/utils/set"
     mcsv1a1 "sigs.k8s.io/mcs-api/pkg/apis/v1alpha1"
 )

+const timestampAnnotationPrefix = "timestamp.submariner.io/"
+
 //nolint:gocritic // (hugeParam) This function modifies syncerConf so we don't want to pass by pointer.
 func newServiceImportController(spec *AgentSpecification, syncerMetricNames AgentConfig, syncerConfig broker.SyncerConfig,
     brokerClient dynamic.Interface, brokerNamespace string, serviceExportClient *ServiceExportClient,
@@ -97,7 +105,7 @@ func newServiceImportController(spec *AgentSpecification, syncerMetricNames Agen
         Federator: federate.NewCreateOrUpdateFederator(syncerConfig.LocalClient, syncerConfig.RestMapper, corev1.NamespaceAll, ""),
         ResourceType:      &mcsv1a1.ServiceImport{},
         Transform:         controller.onRemoteServiceImport,
-        OnSuccessfulSync:  controller.serviceImportMigrator.onSuccessfulSyncFromBroker,
+        OnSuccessfulSync:  controller.onSuccessfulSyncFromBroker,
         Scheme:            syncerConfig.Scheme,
         NamespaceInformer: syncerConfig.NamespaceInformer,
         SyncCounterOpts: &prometheus.GaugeOpts{
@@ -290,12 +298,25 @@ func (c *ServiceImportController) Distribute(ctx context.Context, obj runtime.Ob
     serviceName := serviceImportSourceName(localServiceImport)
     serviceNamespace := localServiceImport.Labels[constants.LabelSourceNamespace]

+    localTimestamp := strconv.FormatInt(int64(math.MaxInt64-1), 10)
+
+    // As per the MCS spec, a conflict will be resolved by assigning precedence based on each ServiceExport's
+    // creationTimestamp, from oldest to newest. We don't have access to other clusters' ServiceExports, so
+    // instead add our ServiceExport's creationTimestamp as an annotation on the aggregated ServiceImport.
+    localServiceExport := c.serviceExportClient.getLocalInstance(serviceName, serviceNamespace)
+    if localServiceExport != nil {
+        localTimestamp = strconv.FormatInt(localServiceExport.CreationTimestamp.UTC().UnixNano(), 10)
+    }
+
+    timestampAnnotationKey := makeTimestampAnnotationKey(c.clusterID)
+
     aggregate := &mcsv1a1.ServiceImport{
         ObjectMeta: metav1.ObjectMeta{
             Name: fmt.Sprintf("%s-%s", serviceName, serviceNamespace),
             Annotations: map[string]string{
                 mcsv1a1.LabelServiceName:       serviceName,
                 constants.LabelSourceNamespace: serviceNamespace,
+                timestampAnnotationKey:         localTimestamp,
             },
         },
         Spec: mcsv1a1.ServiceImportSpec{
@@ -313,12 +334,13 @@ func (c *ServiceImportController) Distribute(ctx context.Context, obj runtime.Ob
         },
     }

-    conflict := false
+    typeConflict := false

-    // Here we just create the aggregated ServiceImport on the broker. We don't merge the local service info until we've
-    // successfully synced our local EndpointSlice to the broker. This is mainly done b/c the aggregated port information
-    // is determined from the constituent clusters' EndpointSlices, thus each cluster must have a consistent view of all
-    // the EndpointSlices in order for the aggregated port information to be eventually consistent.
+    // Here we create the aggregated ServiceImport on the broker or update the existing instance with our local service
+    // info, but we don't add/merge our local service ports until we've successfully synced our local EndpointSlice to
+    // the broker. This is mainly done b/c the aggregated port information is determined from the constituent clusters'
+    // EndpointSlices, thus each cluster must have a consistent view of all the EndpointSlices in order for the
+    // aggregated port information to be eventually consistent.
     result, err := util.CreateOrUpdate(ctx,
         resource.ForDynamic(c.serviceImportAggregator.brokerServiceImportClient()),
@@ -327,27 +349,32 @@
             existing := c.converter.toServiceImport(obj)

             if localServiceImport.Spec.Type != existing.Spec.Type {
-                conflict = true
+                typeConflict = true
                 conflictCondition := newServiceExportCondition(
                     mcsv1a1.ServiceExportConflict, corev1.ConditionTrue, TypeConflictReason,
-                    fmt.Sprintf("The service type %q does not match the type (%q) of the existing service export",
+                    fmt.Sprintf("The local service type (%q) does not match the type (%q) of the existing exported service",
                         localServiceImport.Spec.Type, existing.Spec.Type))
                 c.serviceExportClient.UpdateStatusConditions(ctx, serviceName, serviceNamespace, conflictCondition,
                     newServiceExportCondition(constants.ServiceExportReady, corev1.ConditionFalse, ExportFailedReason,
                         "Unable to export due to an irresolvable conflict"))
             } else {
-                c.serviceExportClient.UpdateStatusConditions(ctx, serviceName, serviceNamespace, newServiceExportCondition(
-                    mcsv1a1.ServiceExportConflict, corev1.ConditionFalse, TypeConflictReason, ""))
-
-                if existing.Spec.SessionAffinity == "" || existing.Spec.SessionAffinity == corev1.ServiceAffinityNone {
-                    existing.Spec.SessionAffinity = localServiceImport.Spec.SessionAffinity
+                if c.serviceExportClient.hasCondition(serviceName, serviceNamespace, mcsv1a1.ServiceExportConflict, TypeConflictReason) {
+                    c.serviceExportClient.UpdateStatusConditions(ctx, serviceName, serviceNamespace, newServiceExportCondition(
+                        mcsv1a1.ServiceExportConflict, corev1.ConditionFalse, TypeConflictReason, ""))
                 }

-                if existing.Spec.SessionAffinityConfig == nil {
-                    existing.Spec.SessionAffinityConfig = localServiceImport.Spec.SessionAffinityConfig
+                if existing.Annotations == nil {
+                    existing.Annotations = map[string]string{}
                 }

+                existing.Annotations[timestampAnnotationKey] = localTimestamp
+
+                // Update the appropriate aggregated ServiceImport fields if we're the oldest exporting cluster
+                _ = c.updateAggregatedServiceImport(existing, localServiceImport)
+
+                c.checkConflicts(ctx, existing, localServiceImport)
+
                 var added bool

                 existing.Status.Clusters, added = slices.AppendIfNotPresent(existing.Status.Clusters,
@@ -361,7 +388,7 @@ func (c *ServiceImportController) Distribute(ctx context.Context, obj runtime.Ob
             return c.converter.toUnstructured(existing), nil
         })

-    if err == nil && !conflict {
+    if err == nil && !typeConflict {
         err = c.startEndpointsController(localServiceImport)
     }

@@ -424,6 +451,101 @@
     return c.serviceImportMigrator.onRemoteServiceImport(serviceImport)
 }

+func (c *ServiceImportController) onSuccessfulSyncFromBroker(synced runtime.Object, op syncer.Operation) bool {
+    ctx := context.TODO()
+
+    retry := c.serviceImportMigrator.onSuccessfulSyncFromBroker(synced, op)
+
+    aggregatedServiceImport := synced.(*mcsv1a1.ServiceImport)
+
+    // Check for conflicts with the local ServiceImport
+
+    siList := c.localSyncer.ListResourcesBySelector(k8slabels.SelectorFromSet(map[string]string{
+        mcsv1a1.LabelServiceName:        aggregatedServiceImport.Name,
+        constants.LabelSourceNamespace:  aggregatedServiceImport.Namespace,
+        constants.MCSLabelSourceCluster: c.clusterID,
+    }))
+
+    if len(siList) == 0 {
+        // Service not exported locally.
+        return retry
+    }
+
+    localServiceImport := siList[0].(*mcsv1a1.ServiceImport)
+
+    // This handles the case where the previously oldest exporting cluster has unexported its service. If we're now
+    // the oldest exporting cluster, then update the appropriate aggregated ServiceImport fields to match those of
+    // our service.
+    if c.updateAggregatedServiceImport(aggregatedServiceImport, localServiceImport) {
+        err := c.serviceImportAggregator.update(ctx, aggregatedServiceImport.Name, aggregatedServiceImport.Namespace,
+            func(aggregated *mcsv1a1.ServiceImport) error {
+                aggregated.Spec.SessionAffinity = localServiceImport.Spec.SessionAffinity
+                aggregated.Spec.SessionAffinityConfig = localServiceImport.Spec.SessionAffinityConfig
+
+                return nil
+            })
+        if err != nil {
+            logger.Errorf(err, "error updating aggregated ServiceImport on broker sync")
+
+            return true
+        }
+    }
+
+    c.checkConflicts(ctx, aggregatedServiceImport, localServiceImport)
+
+    return retry
+}
+
+func (c *ServiceImportController) checkConflicts(ctx context.Context, aggregated, local *mcsv1a1.ServiceImport) {
+    var conditions []mcsv1a1.ServiceExportCondition
+
+    serviceName := local.Labels[mcsv1a1.LabelServiceName]
+    serviceNamespace := local.Labels[constants.LabelSourceNamespace]
+
+    precedentCluster := findClusterWithOldestTimestamp(aggregated.Annotations)
+
+    if local.Spec.SessionAffinity != aggregated.Spec.SessionAffinity {
+        conditions = append(conditions, newServiceExportCondition(mcsv1a1.ServiceExportConflict, corev1.ConditionTrue,
+            SessionAffinityConflictReason,
+            fmt.Sprintf("The local service SessionAffinity %q conflicts with other constituent clusters. "+
+                "Using SessionAffinity %q from the oldest exported service in cluster %q.",
+                local.Spec.SessionAffinity, aggregated.Spec.SessionAffinity, precedentCluster)))
+    } else if c.serviceExportClient.hasCondition(serviceName, serviceNamespace, mcsv1a1.ServiceExportConflict,
+        SessionAffinityConflictReason) {
+        conditions = append(conditions, newServiceExportCondition(
+            mcsv1a1.ServiceExportConflict, corev1.ConditionFalse, SessionAffinityConflictReason, ""))
+    }
+
+    if !reflect.DeepEqual(local.Spec.SessionAffinityConfig, aggregated.Spec.SessionAffinityConfig) {
+        conditions = append(conditions, newServiceExportCondition(mcsv1a1.ServiceExportConflict, corev1.ConditionTrue,
+            SessionAffinityConfigConflictReason,
+            fmt.Sprintf("The local service SessionAffinityConfig %q conflicts with other constituent clusters. "+
+                "Using SessionAffinityConfig %q from the oldest exported service in cluster %q.",
+                toSessionAffinityConfigString(local.Spec.SessionAffinityConfig),
+                toSessionAffinityConfigString(aggregated.Spec.SessionAffinityConfig), precedentCluster)))
+    } else if c.serviceExportClient.hasCondition(serviceName, serviceNamespace, mcsv1a1.ServiceExportConflict,
+        SessionAffinityConfigConflictReason) {
+        conditions = append(conditions, newServiceExportCondition(
+            mcsv1a1.ServiceExportConflict, corev1.ConditionFalse, SessionAffinityConfigConflictReason, ""))
+    }
+
+    c.serviceExportClient.UpdateStatusConditions(ctx, serviceName, serviceNamespace, conditions...)
+}
+
+func (c *ServiceImportController) updateAggregatedServiceImport(aggregated, local *mcsv1a1.ServiceImport) bool {
+    oldestCluster := findClusterWithOldestTimestamp(aggregated.Annotations)
+    if oldestCluster != sanitizeClusterID(c.clusterID) {
+        return false
+    }
+
+    origSpec := aggregated.Spec
+
+    aggregated.Spec.SessionAffinity = local.Spec.SessionAffinity
+    aggregated.Spec.SessionAffinityConfig = local.Spec.SessionAffinityConfig
+
+    return !reflect.DeepEqual(origSpec, aggregated.Spec)
+}
+
 func (c *ServiceImportController) localServiceImportLister(transform func(si *mcsv1a1.ServiceImport) runtime.Object) []runtime.Object {
     siList := c.localSyncer.ListResources()

@@ -442,3 +564,48 @@ func (c *ServiceImportController) localServiceImportLister(transform func(si *mc

     return retList
 }
+
+func findClusterWithOldestTimestamp(from map[string]string) string {
+    oldest := int64(math.MaxInt64)
+    foundCluster := ""
+
+    for k, v := range from {
+        cluster, found := strings.CutPrefix(k, timestampAnnotationPrefix)
+        if !found {
+            continue
+        }
+
+        t, err := strconv.ParseInt(v, 10, 64)
+        if err != nil {
+            logger.Warningf("Invalid timestamp annotation value %q for cluster %q", v, cluster)
+            continue
+        }
+
+        if t < oldest || (t == oldest && cluster < foundCluster) {
+            foundCluster = cluster
+            oldest = t
+        }
+    }
+
+    return foundCluster
+}
+
+func toSessionAffinityConfigString(c *corev1.SessionAffinityConfig) string {
+    if c != nil && c.ClientIP != nil && c.ClientIP.TimeoutSeconds != nil {
+        return fmt.Sprintf("ClientIP TimeoutSeconds: %d", *c.ClientIP.TimeoutSeconds)
+    }
+
+    return "none"
+}
+
+func makeTimestampAnnotationKey(clusterID string) string {
+    return timestampAnnotationPrefix + sanitizeClusterID(clusterID)
+}
+
+func sanitizeClusterID(clusterID string) string {
+    if len(clusterID) > validation.DNS1123LabelMaxLength {
+        clusterID = clusterID[:validation.DNS1123LabelMaxLength]
+    }
+
+    return resource.EnsureValidName(clusterID)
+}
diff --git a/pkg/agent/controller/service_import_aggregator.go b/pkg/agent/controller/service_import_aggregator.go
index 4202ae61f..ef1c446f3 100644
--- a/pkg/agent/controller/service_import_aggregator.go
+++ b/pkg/agent/controller/service_import_aggregator.go
@@ -101,6 +101,8 @@ func (a *ServiceImportAggregator) updateOnDelete(ctx context.Context, name, name
             logger.V(log.DEBUG).Infof("Removed cluster name %q from aggregated ServiceImport %q. New status: %#v",
                 a.clusterID, existing.Name, existing.Status.Clusters)

+            delete(existing.Annotations, makeTimestampAnnotationKey(a.clusterID))
+
             return a.setServicePorts(ctx, existing)
         })
 }
diff --git a/pkg/agent/controller/service_import_migration_test.go b/pkg/agent/controller/service_import_migration_test.go
index 6e221da74..5400702a2 100644
--- a/pkg/agent/controller/service_import_migration_test.go
+++ b/pkg/agent/controller/service_import_migration_test.go
@@ -32,7 +32,6 @@ import (
     apierrors "k8s.io/apimachinery/pkg/api/errors"
     metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
     "k8s.io/client-go/dynamic"
-    "k8s.io/client-go/kubernetes/scheme"
     mcsv1a1 "sigs.k8s.io/mcs-api/pkg/apis/v1alpha1"
 )
@@ -171,12 +170,8 @@ func testServiceImportMigration() {
         // Get the aggregated ServiceImport on the broker.
-        obj, err := t.brokerServiceImportClient.Namespace(test.RemoteNamespace).Get(context.Background(),
-            fmt.Sprintf("%s-%s", t.cluster1.service.Name, t.cluster1.service.Namespace), metav1.GetOptions{})
-        Expect(err).To(Succeed())
-
-        aggregatedServiceImport := &mcsv1a1.ServiceImport{}
-        Expect(scheme.Scheme.Convert(obj, aggregatedServiceImport, nil)).To(Succeed())
+        aggregatedServiceImport := getServiceImport(t.brokerServiceImportClient, test.RemoteNamespace,
+            fmt.Sprintf("%s-%s", t.cluster1.service.Name, t.cluster1.service.Namespace))

         By(fmt.Sprintf("Upgrade the first remote cluster %q", remoteServiceImport1.Status.Clusters[0].Cluster))
diff --git a/pkg/agent/controller/types.go b/pkg/agent/controller/types.go
index f81c02ae8..8254de4a5 100644
--- a/pkg/agent/controller/types.go
+++ b/pkg/agent/controller/types.go
@@ -36,9 +36,11 @@ import (
 )

 const (
-    ExportFailedReason = "ExportFailed"
-    TypeConflictReason = "ConflictingType"
-    PortConflictReason = "ConflictingPorts"
+    ExportFailedReason                  = "ExportFailed"
+    TypeConflictReason                  = "ConflictingType"
+    PortConflictReason                  = "ConflictingPorts"
+    SessionAffinityConflictReason       = "ConflictingSessionAffinity"
+    SessionAffinityConfigConflictReason = "ConflictingSessionAffinityConfig"
 )

 type EndpointSliceListerFn func(selector k8slabels.Selector) []runtime.Object
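
To make the precedence scheme easier to follow, here is a minimal standalone sketch of it. It is illustrative only, not part of the patch: the annotation values are made up, and it assumes Go 1.20+ for strings.CutPrefix. It mirrors findClusterWithOldestTimestamp, including the tie-break on cluster name and the shift of precedence after the oldest exporter's annotation is removed by updateOnDelete.

package main

import (
	"fmt"
	"math"
	"strconv"
	"strings"
)

const timestampAnnotationPrefix = "timestamp.submariner.io/"

// findClusterWithOldestTimestamp scans the aggregated ServiceImport's
// annotations for per-cluster timestamps and returns the cluster holding
// the oldest one.
func findClusterWithOldestTimestamp(from map[string]string) string {
	oldest := int64(math.MaxInt64)
	foundCluster := ""

	for k, v := range from {
		cluster, found := strings.CutPrefix(k, timestampAnnotationPrefix)
		if !found {
			continue
		}

		t, err := strconv.ParseInt(v, 10, 64)
		if err != nil {
			continue // the real code logs a warning and skips the entry
		}

		// Oldest timestamp wins; ties are broken by the lexicographically
		// smaller cluster name so the result is deterministic.
		if t < oldest || (t == oldest && cluster < foundCluster) {
			foundCluster = cluster
			oldest = t
		}
	}

	return foundCluster
}

func main() {
	// Hypothetical annotations on the aggregated ServiceImport after two
	// clusters have exported the same service; cluster1 exported first.
	annotations := map[string]string{
		"timestamp.submariner.io/cluster1": "1724371951000000000",
		"timestamp.submariner.io/cluster2": "1724371951100000000",
	}

	fmt.Println(findClusterWithOldestTimestamp(annotations)) // cluster1

	// When cluster1 unexports, updateOnDelete removes its annotation and
	// precedence shifts to the next-oldest exporting cluster.
	delete(annotations, "timestamp.submariner.io/cluster1")
	fmt.Println(findClusterWithOldestTimestamp(annotations)) // cluster2
}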
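A second sketch, with the same caveats, shows how multiple conflict reasons accumulate, comma-separated, in the single ServiceExportConflict condition, which is the format the combined SessionAffinity/SessionAffinityConfig test asserts. mergeReason is a hypothetical helper that only approximates what mergeConflictCondition does to the Reason field; it assumes Go 1.21+ for the slices package.

package main

import (
	"fmt"
	"slices"
	"strings"
)

// mergeReason adds a reason to the condition's comma-separated Reason set
// if it isn't already present.
func mergeReason(existing, add string) string {
	var reasons []string
	if existing != "" {
		reasons = strings.Split(existing, ",")
	}

	if !slices.Contains(reasons, add) {
		reasons = append(reasons, add)
	}

	return strings.Join(reasons, ",")
}

func main() {
	reason := mergeReason("", "ConflictingSessionAffinity")
	reason = mergeReason(reason, "ConflictingSessionAffinityConfig")

	// Matches the reason asserted by the two-conflict test:
	fmt.Println(reason) // ConflictingSessionAffinity,ConflictingSessionAffinityConfig
}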