From 6bb38c90d2e7f14281a650f57cf19e0ec00d9607 Mon Sep 17 00:00:00 2001 From: Tom Pantelis Date: Wed, 29 Mar 2023 08:05:50 -0400 Subject: [PATCH] Implement legacy ServiceImport migration in the agent controller Legacy per-cluster ServiceImports are longer be created but, for rolling cluster upgrades, there may be a period of time where there is a mix of upgraded and non-upgraded clusters. The latter still need to observe the per-cluster ServiceImports so they need to remain on the broker during the transition period. An upgraded cluster can remove its legacy per-cluster ServiceImport from the broker when it observes that all of the legacy ServiceImport cluster names are present in the aggregated ServiceImport status cluster name list. This indicates that all constituent clusters have been upgraded. Introduced a ServiceImportMigrator that integrates with the ServiceImportController to handle syncing of legacy ServiceImports from the broker and removing the local legacy ServiceImport from the broker based on the criteria above. The latter is triggered whenever an aggregated ServiceImport is synced from the broker, either new or updated. Signed-off-by: Tom Pantelis --- pkg/agent/controller/agent.go | 10 - pkg/agent/controller/endpoint.go | 2 +- pkg/agent/controller/reconciliation_test.go | 188 ++++------- pkg/agent/controller/service_import.go | 76 ++--- .../service_import_migration_test.go | 297 ++++++++++++++++++ .../controller/service_import_migrator.go | 147 +++++++++ pkg/agent/controller/types.go | 1 + 7 files changed, 535 insertions(+), 186 deletions(-) create mode 100644 pkg/agent/controller/service_import_migration_test.go create mode 100644 pkg/agent/controller/service_import_migrator.go diff --git a/pkg/agent/controller/agent.go b/pkg/agent/controller/agent.go index 8ceb2751c..f193c5e9b 100644 --- a/pkg/agent/controller/agent.go +++ b/pkg/agent/controller/agent.go @@ -403,16 +403,6 @@ func newServiceExportCondition(condType mcsv1a1.ServiceExportConditionType, stat } } -// This function also checks the legacy source name label for migration - this can be removed after 0.15. -func serviceImportSourceName(serviceImport *mcsv1a1.ServiceImport) string { - name, ok := serviceImport.Labels[mcsv1a1.LabelServiceName] - if ok { - return name - } - - return serviceImport.GetLabels()["lighthouse.submariner.io/sourceName"] -} - func (c converter) toServiceImport(obj runtime.Object) *mcsv1a1.ServiceImport { to := &mcsv1a1.ServiceImport{} utilruntime.Must(c.scheme.Convert(obj, to, nil)) diff --git a/pkg/agent/controller/endpoint.go b/pkg/agent/controller/endpoint.go index 87ba11de8..36efe8f7a 100644 --- a/pkg/agent/controller/endpoint.go +++ b/pkg/agent/controller/endpoint.go @@ -49,7 +49,7 @@ func startEndpointController(localClient dynamic.Interface, restMapper meta.REST serviceImport *mcsv1a1.ServiceImport, clusterID string, globalIngressIPCache *globalIngressIPCache, ) (*EndpointController, error) { serviceNamespace := serviceImport.Labels[constants.LabelSourceNamespace] - serviceName := serviceImport.Labels[mcsv1a1.LabelServiceName] + serviceName := serviceImportSourceName(serviceImport) logger.V(log.DEBUG).Infof("Starting Endpoints controller for service %s/%s", serviceNamespace, serviceName) diff --git a/pkg/agent/controller/reconciliation_test.go b/pkg/agent/controller/reconciliation_test.go index 81eddbeb2..dd9fc62a2 100644 --- a/pkg/agent/controller/reconciliation_test.go +++ b/pkg/agent/controller/reconciliation_test.go @@ -20,7 +20,7 @@ package controller_test import ( "context" - "time" + "strings" . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" @@ -33,16 +33,18 @@ import ( "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/client-go/dynamic/fake" "k8s.io/client-go/kubernetes/scheme" + "k8s.io/client-go/testing" mcsv1a1 "sigs.k8s.io/mcs-api/pkg/apis/v1alpha1" ) -const ( - legacySourceNameLabel = "lighthouse.submariner.io/sourceName" - legacySourceClusterLabel = "lighthouse.submariner.io/sourceCluster" -) - var _ = Describe("Reconciliation", func() { - var t *testDriver + var ( + t *testDriver + localServiceImport *mcsv1a1.ServiceImport + localEndpointSlice *discovery.EndpointSlice + brokerServiceImports *unstructured.UnstructuredList + brokerEndpointSlices *unstructured.UnstructuredList + ) BeforeEach(func() { t = newTestDiver() @@ -53,41 +55,47 @@ var _ = Describe("Reconciliation", func() { t.cluster1.createEndpoints() t.cluster1.createService() t.cluster1.createServiceExport() + + t.awaitNonHeadlessServiceExported(&t.cluster1) + + var err error + + brokerServiceImports, err = t.brokerServiceImportClient.Namespace(test.RemoteNamespace).List(context.TODO(), metav1.ListOptions{}) + Expect(err).To(Succeed()) + + brokerEndpointSlices, err = t.brokerEndpointSliceClient.List(context.TODO(), metav1.ListOptions{}) + Expect(err).To(Succeed()) + + localServiceImport = t.cluster1.findLocalServiceImport() + Expect(localServiceImport).ToNot(BeNil()) + + localEndpointSlice = t.cluster1.findLocalEndpointSlice() + Expect(localEndpointSlice).ToNot(BeNil()) }) AfterEach(func() { t.afterEach() }) - Context("on restart after a service was exported", func() { - It("should retain the exported resources on reconciliation", func() { - t.awaitNonHeadlessServiceExported(&t.cluster1) + restoreBrokerResources := func() { + for i := range brokerServiceImports.Items { + test.CreateResource(t.brokerServiceImportClient.Namespace(test.RemoteNamespace), &brokerServiceImports.Items[i]) + } - localServiceImport := t.cluster1.findLocalServiceImport() - Expect(localServiceImport).ToNot(BeNil()) - - localEndpointSlice := t.cluster1.findLocalEndpointSlice() - Expect(localEndpointSlice).ToNot(BeNil()) - - brokerServiceImports, err := t.brokerServiceImportClient.Namespace(test.RemoteNamespace).List(context.TODO(), metav1.ListOptions{}) - Expect(err).To(Succeed()) - - brokerEndpointSlices, err := t.brokerEndpointSliceClient.List(context.TODO(), metav1.ListOptions{}) - Expect(err).To(Succeed()) + for i := range brokerEndpointSlices.Items { + test.CreateResource(t.brokerEndpointSliceClient, &brokerEndpointSlices.Items[i]) + } + } + Context("on restart after a service was exported", func() { + It("should retain the exported resources on reconciliation", func() { t.afterEach() t = newTestDiver() test.CreateResource(t.cluster1.localServiceImportClient.Namespace(test.LocalNamespace), localServiceImport) test.CreateResource(t.cluster1.localEndpointSliceClient, localEndpointSlice) - for i := range brokerServiceImports.Items { - test.CreateResource(t.brokerServiceImportClient.Namespace(test.RemoteNamespace), &brokerServiceImports.Items[i]) - } - - for i := range brokerEndpointSlices.Items { - test.CreateResource(t.brokerEndpointSliceClient, &brokerEndpointSlices.Items[i]) - } + restoreBrokerResources() t.cluster1.createEndpoints() t.cluster1.createService() @@ -102,26 +110,31 @@ var _ = Describe("Reconciliation", func() { testutil.EnsureNoActionsForResource(&t.cluster1.localDynClient.Fake, "endpointslices", "delete") brokerDynClient := t.syncerConfig.BrokerClient.(*fake.FakeDynamicClient) - testutil.EnsureNoActionsForResource(&brokerDynClient.Fake, "serviceimports", "delete") testutil.EnsureNoActionsForResource(&brokerDynClient.Fake, "endpointslices", "delete") + + // For migration cleanup, it may attempt to delete a local legacy ServiceImport from the broker so ignore it. + Consistently(func() bool { + siActions := brokerDynClient.Fake.Actions() + for i := range siActions { + if siActions[i].GetResource().Resource == "serviceimports" && siActions[i].GetVerb() == "delete" && + !strings.Contains(siActions[i].(testing.DeleteAction).GetName(), t.cluster1.clusterID) { + return true + } + } + + return false + }).Should(BeFalse()) }) }) When("a local ServiceImport is stale on startup due to a missed ServiceExport delete event", func() { It("should unexport the service on reconciliation", func() { - t.awaitNonHeadlessServiceExported(&t.cluster1) - - serviceImport := t.cluster1.findLocalServiceImport() - Expect(serviceImport).ToNot(BeNil()) - - endpointSlice := t.cluster1.findLocalEndpointSlice() - Expect(endpointSlice).ToNot(BeNil()) - t.afterEach() t = newTestDiver() - test.CreateResource(t.cluster1.localServiceImportClient.Namespace(test.LocalNamespace), serviceImport) - test.CreateResource(t.cluster1.localEndpointSliceClient, endpointSlice) + restoreBrokerResources() + test.CreateResource(t.cluster1.localServiceImportClient.Namespace(test.LocalNamespace), localServiceImport) + test.CreateResource(t.cluster1.localEndpointSliceClient, localEndpointSlice) t.cluster1.createService() t.cluster1.start(t, *t.syncerConfig) @@ -131,14 +144,11 @@ var _ = Describe("Reconciliation", func() { When("a local ServiceImport is stale on startup due to a missed Service delete event", func() { It("should unexport the service on reconciliation", func() { - t.awaitNonHeadlessServiceExported(&t.cluster1) - serviceImport := t.cluster1.findLocalServiceImport() - Expect(serviceImport).ToNot(BeNil()) - t.afterEach() t = newTestDiver() - test.CreateResource(t.cluster1.localServiceImportClient.Namespace(test.LocalNamespace), serviceImport) + restoreBrokerResources() + test.CreateResource(t.cluster1.localServiceImportClient.Namespace(test.LocalNamespace), localServiceImport) t.cluster1.createServiceExport() t.cluster1.start(t, *t.syncerConfig) @@ -149,8 +159,6 @@ var _ = Describe("Reconciliation", func() { When("a remote aggregated ServiceImport is stale in the local datastore on startup", func() { It("should delete it from the local datastore on reconciliation", func() { - t.awaitNonHeadlessServiceExported(&t.cluster1) - obj, err := t.cluster2.localServiceImportClient.Namespace(t.cluster1.service.Namespace).Get(context.TODO(), t.cluster1.service.Name, metav1.GetOptions{}) Expect(err).To(Succeed()) @@ -170,17 +178,10 @@ var _ = Describe("Reconciliation", func() { When("a remote aggregated ServiceImport in the broker datastore contains a stale cluster name on startup", func() { It("should delete it on reconciliation", func() { - t.awaitNonHeadlessServiceExported(&t.cluster1) - - brokerServiceImports, err := t.brokerServiceImportClient.Namespace(test.RemoteNamespace).List(context.TODO(), metav1.ListOptions{}) - Expect(err).To(Succeed()) - t.afterEach() t = newTestDiver() - for i := range brokerServiceImports.Items { - test.CreateResource(t.brokerServiceImportClient.Namespace(test.RemoteNamespace), &brokerServiceImports.Items[i]) - } + restoreBrokerResources() t.justBeforeEach() @@ -190,7 +191,6 @@ var _ = Describe("Reconciliation", func() { When("a local EndpointSlice is stale in the broker datastore on startup", func() { It("should delete it from the broker datastore on reconciliation", func() { - t.awaitNonHeadlessServiceExported(&t.cluster1) endpointSlice := findEndpointSlice(t.brokerEndpointSliceClient, t.cluster1.endpoints.Namespace, t.cluster1.endpoints.Name, t.cluster1.clusterID) Expect(endpointSlice).ToNot(BeNil()) @@ -207,7 +207,6 @@ var _ = Describe("Reconciliation", func() { When("a remote EndpointSlice is stale in the local datastore on startup", func() { It("should delete it from the local datastore on reconciliation", func() { - t.awaitNonHeadlessServiceExported(&t.cluster1) endpointSlice := findEndpointSlice(t.cluster2.localEndpointSliceClient, t.cluster1.endpoints.Namespace, t.cluster1.endpoints.Name, t.cluster1.clusterID) Expect(endpointSlice).ToNot(BeNil()) @@ -250,81 +249,4 @@ var _ = Describe("Reconciliation", func() { test.AwaitNoResource(t.brokerEndpointSliceClient, epsName) }) }) - - When("a local ServiceImport with the legacy source labels exists after restart", func() { - var ( - serviceImport *mcsv1a1.ServiceImport - brokerServiceImports *unstructured.UnstructuredList - ) - - JustBeforeEach(func() { - t.awaitNonHeadlessServiceExported(&t.cluster1) - serviceImport = t.cluster1.findLocalServiceImport() - Expect(serviceImport).ToNot(BeNil()) - - var err error - brokerServiceImports, err = t.brokerServiceImportClient.Namespace(test.RemoteNamespace).List(context.TODO(), metav1.ListOptions{}) - - Expect(err).To(Succeed()) - - t.afterEach() - t = newTestDiver() - - serviceImport.Labels[legacySourceNameLabel] = serviceImport.Labels[mcsv1a1.LabelServiceName] - delete(serviceImport.Labels, mcsv1a1.LabelServiceName) - - serviceImport.Labels[legacySourceClusterLabel] = serviceImport.Labels[constants.MCSLabelSourceCluster] - delete(serviceImport.Labels, constants.MCSLabelSourceCluster) - - t.cluster1.createService() - t.cluster1.createEndpoints() - - By("Restarting controller") - }) - - It("should update the ServiceImport labels and sync it", func() { - t.cluster1.start(t, *t.syncerConfig) - t.cluster2.start(t, *t.syncerConfig) - - By("Create the ServiceImport with the legacy labels") - - // We want to verify the transition behavior during the small window where the ServiceImport labels - // haven't been migrated yet. This occurs when the ServiceExport is processed so to force the sequencing - // create the ServiceImport with the legacy labels first then create the ServiceExport. - - test.CreateResource(t.cluster1.localServiceImportClient.Namespace(test.LocalNamespace), serviceImport) - - // The ServiceImport still has the legacy labels so shouldn't be synced to the broker yet. - Eventually(func() int { - list, _ := t.brokerServiceImportClient.Namespace(test.RemoteNamespace).List(context.TODO(), metav1.ListOptions{}) - return len(list.Items) - }, 5*time.Second).Should(BeZero(), "Unexpected ServiceImport found") - - // The EndpointSlice shouldn't be created yet since the ServiceImport still has the legacy source cluster label. - Consistently(func() *discovery.EndpointSlice { - return t.cluster1.findLocalEndpointSlice() - }).Should(BeNil(), "Unexpected EndpointSlice found") - - By("Create the ServiceExport") - - t.cluster1.createServiceExport() - - t.awaitNoEndpointSlice(&t.cluster1) - }) - - Context("and the ServiceExport no longer exists", func() { - It("should delete the ServiceImport on reconciliation", func() { - test.CreateResource(t.cluster1.localServiceImportClient.Namespace(test.LocalNamespace), serviceImport) - - for i := range brokerServiceImports.Items { - test.CreateResource(t.brokerServiceImportClient.Namespace(test.RemoteNamespace), &brokerServiceImports.Items[i]) - } - - t.cluster1.start(t, *t.syncerConfig) - t.cluster2.start(t, *t.syncerConfig) - - t.awaitNoAggregatedServiceImport(&t.cluster1) - }) - }) - }) }) diff --git a/pkg/agent/controller/service_import.go b/pkg/agent/controller/service_import.go index 3ce294b1c..0706b9d75 100644 --- a/pkg/agent/controller/service_import.go +++ b/pkg/agent/controller/service_import.go @@ -36,6 +36,7 @@ import ( corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/util/sets" "k8s.io/client-go/dynamic" "k8s.io/client-go/tools/cache" mcsv1a1 "sigs.k8s.io/mcs-api/pkg/apis/v1alpha1" @@ -86,15 +87,25 @@ func newServiceImportController(spec *AgentSpecification, syncerMetricNames Agen return nil, errors.Wrap(err, "error creating local ServiceImport syncer") } + controller.serviceImportMigrator = &ServiceImportMigrator{ + clusterID: spec.ClusterID, + localNamespace: spec.Namespace, + brokerClient: brokerClient.Resource(serviceImportGVR).Namespace(brokerNamespace), + listLocalServiceImports: controller.localSyncer.ListResources, + converter: converter{scheme: syncerConfig.Scheme}, + deletedLocalServiceImportsOnBroker: sets.New[string](), + } + controller.remoteSyncer, err = syncer.NewResourceSyncer(&syncer.ResourceSyncerConfig{ - Name: "Remote ServiceImport", - SourceClient: brokerClient, - SourceNamespace: brokerNamespace, - RestMapper: syncerConfig.RestMapper, - Federator: federate.NewCreateOrUpdateFederator(syncerConfig.LocalClient, syncerConfig.RestMapper, corev1.NamespaceAll, ""), - ResourceType: &mcsv1a1.ServiceImport{}, - Transform: controller.onRemoteServiceImport, - Scheme: syncerConfig.Scheme, + Name: "Remote ServiceImport", + SourceClient: brokerClient, + SourceNamespace: brokerNamespace, + RestMapper: syncerConfig.RestMapper, + Federator: federate.NewCreateOrUpdateFederator(syncerConfig.LocalClient, syncerConfig.RestMapper, corev1.NamespaceAll, ""), + ResourceType: &mcsv1a1.ServiceImport{}, + Transform: controller.onRemoteServiceImport, + OnSuccessfulSync: controller.serviceImportMigrator.onSuccessfulSyncFromBroker, + Scheme: syncerConfig.Scheme, }) if err != nil { return nil, errors.Wrap(err, "error creating ServiceImport watcher") @@ -191,8 +202,7 @@ func (c *ServiceImportController) reconcileLocalAggregatedServiceImports() { for i := range siList.Items { si := c.converter.toServiceImport(&siList.Items[i]) - _, ok := si.Labels[mcsv1a1.LabelServiceName] - if ok { + if serviceImportSourceName(si) != "" { // This is not an aggregated ServiceImport. continue } @@ -251,33 +261,25 @@ func (c *ServiceImportController) onLocalServiceImport(obj runtime.Object, _ int serviceImport := obj.(*mcsv1a1.ServiceImport) key, _ := cache.MetaNamespaceKeyFunc(serviceImport) + serviceName := serviceImportSourceName(serviceImport) + + sourceCluster := sourceClusterName(serviceImport) + if sourceCluster != c.clusterID { + return nil, false + } + logger.V(log.DEBUG).Infof("Local ServiceImport %q %sd", key, op) if op == syncer.Delete { - c.serviceExportClient.updateStatusConditions(serviceImport.Labels[mcsv1a1.LabelServiceName], - serviceImport.Labels[constants.LabelSourceNamespace], newServiceExportCondition(constants.ServiceExportSynced, + c.serviceExportClient.updateStatusConditions(serviceName, serviceImport.Labels[constants.LabelSourceNamespace], + newServiceExportCondition(constants.ServiceExportSynced, corev1.ConditionFalse, "NoServiceImport", "ServiceImport was deleted")) return obj, false } - sourceCluster := serviceImport.Labels[constants.MCSLabelSourceCluster] - if sourceCluster == "" || sourceCluster != c.clusterID { - // TODO - handle migration of legacy per-cluster ServiceImports - return nil, false - } - - serviceName, ok := serviceImport.Labels[mcsv1a1.LabelServiceName] - if !ok { - // The label is missing - most likely b/c the ServiceImport hasn't yet been migrated from the legacy labels. - logger.Infof("Label %q missing from ServiceImport (%s/%s) - not syncing", mcsv1a1.LabelServiceName, - serviceImport.Namespace, serviceImport.Name) - - return nil, false - } - - c.serviceExportClient.updateStatusConditions(serviceName, - serviceImport.Labels[constants.LabelSourceNamespace], newServiceExportCondition(constants.ServiceExportSynced, + c.serviceExportClient.updateStatusConditions(serviceName, serviceImport.Labels[constants.LabelSourceNamespace], + newServiceExportCondition(constants.ServiceExportSynced, corev1.ConditionFalse, "AwaitingExport", fmt.Sprintf("ServiceImport %sd - awaiting aggregation on the broker", op))) return obj, false @@ -289,7 +291,7 @@ func (c *ServiceImportController) Distribute(obj runtime.Object) error { logger.V(log.DEBUG).Infof("Distribute for local ServiceImport %q", key) - serviceName := localServiceImport.Labels[mcsv1a1.LabelServiceName] + serviceName := serviceImportSourceName(localServiceImport) serviceNamespace := localServiceImport.Labels[constants.LabelSourceNamespace] aggregate := &mcsv1a1.ServiceImport{ @@ -375,7 +377,7 @@ func (c *ServiceImportController) Delete(obj runtime.Object) error { return err } -func (c *ServiceImportController) onRemoteServiceImport(obj runtime.Object, _ int, _ syncer.Operation) (runtime.Object, bool) { +func (c *ServiceImportController) onRemoteServiceImport(obj runtime.Object, _ int, op syncer.Operation) (runtime.Object, bool) { serviceImport := obj.(*mcsv1a1.ServiceImport) serviceName, ok := serviceImport.Annotations[mcsv1a1.LabelServiceName] @@ -390,7 +392,7 @@ func (c *ServiceImportController) onRemoteServiceImport(obj runtime.Object, _ in return serviceImport, false } - return nil, false + return c.serviceImportMigrator.onRemoteServiceImport(serviceImport) } func (c *ServiceImportController) localServiceImportLister(transform func(si *mcsv1a1.ServiceImport) runtime.Object) []runtime.Object { @@ -400,16 +402,6 @@ func (c *ServiceImportController) localServiceImportLister(transform func(si *mc return nil } - // This function also checks the legacy source name label for migration - this can be removed after 0.15. - sourceClusterName := func(serviceImport *mcsv1a1.ServiceImport) string { - name, ok := serviceImport.Labels[constants.MCSLabelSourceCluster] - if ok { - return name - } - - return serviceImport.Labels["lighthouse.submariner.io/sourceCluster"] - } - retList := make([]runtime.Object, 0, len(siList)) for _, obj := range siList { diff --git a/pkg/agent/controller/service_import_migration_test.go b/pkg/agent/controller/service_import_migration_test.go new file mode 100644 index 000000000..deb731277 --- /dev/null +++ b/pkg/agent/controller/service_import_migration_test.go @@ -0,0 +1,297 @@ +/* +SPDX-License-Identifier: Apache-2.0 + +Copyright Contributors to the Submariner project. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + +http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package controller_test + +import ( + "context" + "fmt" + "time" + + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + "github.com/submariner-io/admiral/pkg/syncer/test" + "github.com/submariner-io/lighthouse/pkg/agent/controller" + "github.com/submariner-io/lighthouse/pkg/constants" + discovery "k8s.io/api/discovery/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/dynamic" + "k8s.io/client-go/kubernetes/scheme" + mcsv1a1 "sigs.k8s.io/mcs-api/pkg/apis/v1alpha1" +) + +var _ = Describe("ServiceImport migration", func() { + Describe("after restart on upgrade with a service that was previously exported", testServiceImportMigration) +}) + +func testServiceImportMigration() { + var ( + t *testDriver + legacyServiceImport *mcsv1a1.ServiceImport + ) + + BeforeEach(func() { + t = newTestDiver() + + t.cluster1.createEndpoints() + t.cluster1.createService() + + legacyServiceImport = t.newLegacyServiceImport(t.cluster1.clusterID) + + test.CreateResource(t.cluster1.localServiceImportClient.Namespace(test.LocalNamespace), legacyServiceImport) + + test.CreateResource(t.brokerServiceImportClient.Namespace(test.RemoteNamespace), + test.SetClusterIDLabel(legacyServiceImport, t.cluster1.clusterID)) + + legacyEndpointSlice := &discovery.EndpointSlice{ + ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf("%s-%s-%s", t.cluster1.service.Name, t.cluster1.service.Namespace, t.cluster1.clusterID), + Labels: map[string]string{ + discovery.LabelManagedBy: constants.LabelValueManagedBy, + constants.MCSLabelSourceCluster: t.cluster1.clusterID, + mcsv1a1.LabelServiceName: t.cluster1.service.Name, + constants.LabelSourceNamespace: t.cluster1.service.Namespace, + }, + }, + AddressType: discovery.AddressTypeIPv4, + Endpoints: t.cluster1.endpointSliceAddresses, + Ports: []discovery.EndpointPort{ + { + Name: &t.cluster1.service.Spec.Ports[0].Name, + Protocol: &t.cluster1.service.Spec.Ports[0].Protocol, + Port: &t.cluster1.service.Spec.Ports[0].Port, + }, + { + Name: &t.cluster1.service.Spec.Ports[1].Name, + Protocol: &t.cluster1.service.Spec.Ports[1].Protocol, + Port: &t.cluster1.service.Spec.Ports[1].Port, + }, + }, + } + + test.CreateResource(t.cluster1.localEndpointSliceClient, legacyEndpointSlice) + + legacyEndpointSlice.Labels["submariner-io/originatingNamespace"] = t.cluster1.service.Namespace + test.CreateResource(t.brokerEndpointSliceClient, test.SetClusterIDLabel(legacyEndpointSlice, t.cluster1.clusterID)) + + t.cluster1.createServiceExport() + }) + + JustBeforeEach(func() { + t.justBeforeEach() + }) + + AfterEach(func() { + t.afterEach() + }) + + It("should update the local legacy ServiceImport labels and re-export the service", func() { + t.awaitNonHeadlessServiceExported(&t.cluster1) + test.AwaitNoResource(t.brokerServiceImportClient.Namespace(test.RemoteNamespace), legacyServiceImport.Name) + }) + + Context("", func() { + BeforeEach(func() { + Expect(t.cluster1.localServiceImportClient.Namespace(test.LocalNamespace).Delete(context.Background(), + legacyServiceImport.Name, metav1.DeleteOptions{})).To(Succeed()) + t.cluster1.deleteServiceExport() + }) + + It("should not sync the local legacy ServiceImport from the broker", func() { + Consistently(func() bool { + _, err := t.cluster1.localServiceImportClient.Namespace(test.LocalNamespace).Get(context.Background(), + legacyServiceImport.Name, metav1.GetOptions{}) + return apierrors.IsNotFound(err) + }, time.Millisecond*300).Should(BeTrue()) + }) + }) + + Context("and the ServiceExport no longer exists", func() { + BeforeEach(func() { + t.cluster1.deleteServiceExport() + }) + + It("should delete the local legacy ServiceImport", func() { + test.AwaitNoResource(t.cluster1.localServiceImportClient.Namespace(test.LocalNamespace), legacyServiceImport.Name) + t.awaitNoEndpointSlice(&t.cluster1) + test.AwaitNoResource(t.brokerServiceImportClient.Namespace(test.RemoteNamespace), legacyServiceImport.Name) + }) + }) + + When("there's existing legacy ServiceImports for other clusters", func() { + var ( + remoteServiceImport1 *mcsv1a1.ServiceImport + remoteServiceImport2 *mcsv1a1.ServiceImport + remoteServiceImport3 *mcsv1a1.ServiceImport + ) + + BeforeEach(func() { + remoteServiceImport1 = t.newLegacyServiceImport("region1") + test.CreateResource(t.cluster1.localServiceImportClient.Namespace(test.LocalNamespace), remoteServiceImport1) + test.CreateResource(t.brokerServiceImportClient.Namespace(test.RemoteNamespace), + test.SetClusterIDLabel(remoteServiceImport1, remoteServiceImport1.Status.Clusters[0].Cluster)) + + remoteServiceImport2 = t.newLegacyServiceImport("region2") + test.CreateResource(t.cluster1.localServiceImportClient.Namespace(test.LocalNamespace), remoteServiceImport2) + test.CreateResource(t.brokerServiceImportClient.Namespace(test.RemoteNamespace), + test.SetClusterIDLabel(remoteServiceImport2, remoteServiceImport2.Status.Clusters[0].Cluster)) + + remoteServiceImport3 = t.newLegacyServiceImport("region3") + test.CreateResource(t.cluster1.localServiceImportClient.Namespace(test.LocalNamespace), remoteServiceImport3) + test.CreateResource(t.brokerServiceImportClient.Namespace(test.RemoteNamespace), + test.SetClusterIDLabel(remoteServiceImport3, remoteServiceImport3.Status.Clusters[0].Cluster)) + }) + + Context("that haven't been upgraded yet", func() { + It("should retain the local legacy ServiceImport on the broker", func() { + t.awaitNonHeadlessServiceExported(&t.cluster1) + ensureServiceImport(t.brokerServiceImportClient.Namespace(test.RemoteNamespace), legacyServiceImport.Name) + }) + }) + + Context("and after upgrading the clusters", func() { + It("should eventually remove the local legacy ServiceImport from the broker", func() { + t.awaitAggregatedServiceImport(mcsv1a1.ClusterSetIP, t.cluster1.service.Name, t.cluster1.service.Namespace, &t.cluster1) + + // Get the aggregated ServiceImport on the broker. + + obj, err := t.brokerServiceImportClient.Namespace(test.RemoteNamespace).Get(context.Background(), + fmt.Sprintf("%s-%s", t.cluster1.service.Name, t.cluster1.service.Namespace), metav1.GetOptions{}) + Expect(err).To(Succeed()) + + aggregatedServiceImport := &mcsv1a1.ServiceImport{} + Expect(scheme.Scheme.Convert(obj, aggregatedServiceImport, nil)).To(Succeed()) + + By(fmt.Sprintf("Upgrade the first remote cluster %q", remoteServiceImport1.Status.Clusters[0].Cluster)) + + aggregatedServiceImport.Status.Clusters = append(aggregatedServiceImport.Status.Clusters, mcsv1a1.ClusterStatus{ + Cluster: remoteServiceImport1.Status.Clusters[0].Cluster, + }) + + test.UpdateResource(t.brokerServiceImportClient.Namespace(test.RemoteNamespace), aggregatedServiceImport) + + // Since the other remote clusters aren't upgraded yet, it shouldn't delete the local legacy ServiceImport + // from the broker yet. + + ensureServiceImport(t.brokerServiceImportClient.Namespace(test.RemoteNamespace), legacyServiceImport.Name) + + // Remove the legacy ServiceImport for remote cluster 3 from the broker. + + Expect(t.brokerServiceImportClient.Namespace(test.RemoteNamespace).Delete(context.Background(), + remoteServiceImport3.Name, metav1.DeleteOptions{})).To(Succeed()) + test.AwaitNoResource(t.cluster1.localServiceImportClient.Namespace(test.LocalNamespace), remoteServiceImport3.Name) + + By(fmt.Sprintf("Upgrade the second remote cluster %q", remoteServiceImport2.Status.Clusters[0].Cluster)) + + aggregatedServiceImport.Status.Clusters = append(aggregatedServiceImport.Status.Clusters, mcsv1a1.ClusterStatus{ + Cluster: remoteServiceImport2.Status.Clusters[0].Cluster, + }) + + test.UpdateResource(t.brokerServiceImportClient.Namespace(test.RemoteNamespace), aggregatedServiceImport) + + // Since we also deleted the legacy ServiceImport from remote cluster 3, it should observe that all remote + // clusters have effectively been upgraded and thus should delete the local legacy ServiceImport from the broker. + + test.AwaitNoResource(t.brokerServiceImportClient.Namespace(test.RemoteNamespace), legacyServiceImport.Name) + + // Ensure the sync from the broker doesn't delete the local ServiceImport. + + ensureServiceImport(t.cluster1.localServiceImportClient.Namespace(test.LocalNamespace), legacyServiceImport.Name) + + Expect(t.brokerServiceImportClient.Namespace(test.RemoteNamespace).Delete(context.Background(), + remoteServiceImport1.Name, metav1.DeleteOptions{})).To(Succeed()) + test.AwaitNoResource(t.cluster1.localServiceImportClient.Namespace(test.LocalNamespace), remoteServiceImport1.Name) + }) + }) + + Context("that have already been upgraded", func() { + BeforeEach(func() { + test.CreateResource(t.brokerServiceImportClient.Namespace(test.RemoteNamespace), &mcsv1a1.ServiceImport{ + ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf("%s-%s", t.cluster1.service.Name, t.cluster1.service.Namespace), + Annotations: map[string]string{ + mcsv1a1.LabelServiceName: t.cluster1.service.Name, + constants.LabelSourceNamespace: t.cluster1.service.Namespace, + }, + }, + Spec: mcsv1a1.ServiceImportSpec{ + Type: mcsv1a1.ClusterSetIP, + Ports: []mcsv1a1.ServicePort{port1, port2}, + }, + Status: mcsv1a1.ServiceImportStatus{ + Clusters: []mcsv1a1.ClusterStatus{ + { + Cluster: remoteServiceImport1.Status.Clusters[0].Cluster, + }, + { + Cluster: remoteServiceImport2.Status.Clusters[0].Cluster, + }, + { + Cluster: remoteServiceImport3.Status.Clusters[0].Cluster, + }, + }, + }, + }) + }) + + It("should remove the local legacy ServiceImport from the broker", func() { + test.AwaitNoResource(t.brokerServiceImportClient.Namespace(test.RemoteNamespace), legacyServiceImport.Name) + }) + }) + }) +} + +func ensureServiceImport(client dynamic.ResourceInterface, name string) { + Consistently(func() bool { + _, err := client.Get(context.Background(), name, metav1.GetOptions{}) + return apierrors.IsNotFound(err) + }, time.Millisecond*300).Should(BeFalse()) +} + +func (t *testDriver) newLegacyServiceImport(clusterID string) *mcsv1a1.ServiceImport { + name := t.cluster1.service.Name + namespace := t.cluster1.service.Namespace + + return &mcsv1a1.ServiceImport{ + ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf("%s-%s-%s", name, namespace, clusterID), + Annotations: map[string]string{ + "origin-name": name, + "origin-namespace": namespace, + }, + Labels: map[string]string{ + controller.LegacySourceNameLabel: name, + constants.LabelSourceNamespace: namespace, + controller.LegacySourceClusterLabel: clusterID, + }, + }, + Spec: mcsv1a1.ServiceImportSpec{ + Type: mcsv1a1.ClusterSetIP, + IPs: []string{"1.2.3.4"}, + Ports: []mcsv1a1.ServicePort{port1, port2}, + }, + Status: mcsv1a1.ServiceImportStatus{ + Clusters: []mcsv1a1.ClusterStatus{ + { + Cluster: clusterID, + }, + }, + }, + } +} diff --git a/pkg/agent/controller/service_import_migrator.go b/pkg/agent/controller/service_import_migrator.go new file mode 100644 index 000000000..301518a8f --- /dev/null +++ b/pkg/agent/controller/service_import_migrator.go @@ -0,0 +1,147 @@ +/* +SPDX-License-Identifier: Apache-2.0 + +Copyright Contributors to the Submariner project. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + +http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package controller + +import ( + "context" + "fmt" + + "github.com/submariner-io/admiral/pkg/slices" + "github.com/submariner-io/admiral/pkg/syncer" + "github.com/submariner-io/lighthouse/pkg/constants" + apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/util/sets" + "k8s.io/client-go/dynamic" + mcsv1a1 "sigs.k8s.io/mcs-api/pkg/apis/v1alpha1" +) + +const ( + LegacySourceNameLabel = "lighthouse.submariner.io/sourceName" + LegacySourceClusterLabel = "lighthouse.submariner.io/sourceCluster" +) + +// ServiceImportMigrator handles migration from the legacy per-cluster ServiceImports to aggregated ServiceImports added in 0.15. +type ServiceImportMigrator struct { + brokerClient dynamic.ResourceInterface + listLocalServiceImports func() ([]runtime.Object, error) + clusterID string + localNamespace string + converter converter + deletedLocalServiceImportsOnBroker sets.Set[string] +} + +func (c *ServiceImportMigrator) onRemoteServiceImport(serviceImport *mcsv1a1.ServiceImport) (runtime.Object, bool) { + // Ignore a legacy ServiceImport from the broker for this cluster. + if serviceImport.Labels[LegacySourceClusterLabel] == c.clusterID { + return nil, false + } + + // Remote legacy ServiceImports are synced to the local submariner namespace. + serviceImport.Namespace = c.localNamespace + + return serviceImport, false +} + +func (c *ServiceImportMigrator) onSuccessfulSyncFromBroker(obj runtime.Object, op syncer.Operation) bool { + if op == syncer.Delete { + return false + } + + aggregatedServiceImport := obj.(*mcsv1a1.ServiceImport) + + _, ok := aggregatedServiceImport.Labels[LegacySourceNameLabel] + if ok { + // This is not an aggregated ServiceImport. + return false + } + + localServiceImportName := fmt.Sprintf("%s-%s-%s", aggregatedServiceImport.Name, aggregatedServiceImport.Namespace, c.clusterID) + if c.deletedLocalServiceImportsOnBroker.Has(localServiceImportName) { + // We've already deleted our legacy local ServiceImport from the broker for this service. + return false + } + + siList, err := c.listLocalServiceImports() + if err != nil { + logger.Error(err, "error listing legacy ServiceImports") + return true + } + + totalRemoteClusters := 0 + totalRemoteClustersUpgraded := 0 + foundLocalServiceImport := false + + for _, obj := range siList { + si := obj.(*mcsv1a1.ServiceImport) + + if serviceImportSourceName(si) != aggregatedServiceImport.Name || + si.Labels[constants.LabelSourceNamespace] != aggregatedServiceImport.Namespace { + continue + } + + clusterID := sourceClusterName(si) + if clusterID == c.clusterID { + foundLocalServiceImport = true + continue + } + + totalRemoteClusters++ + + if slices.IndexOf(aggregatedServiceImport.Status.Clusters, clusterID, clusterStatusKey) >= 0 { + totalRemoteClustersUpgraded++ + } + } + + if foundLocalServiceImport && totalRemoteClustersUpgraded == totalRemoteClusters { + logger.Infof("All remote clusters have been upgraded for service \"%s/%s\" - removing local ServiceImport %q from the broker", + aggregatedServiceImport.Namespace, aggregatedServiceImport.Name, localServiceImportName) + + err := c.brokerClient.Delete(context.Background(), localServiceImportName, metav1.DeleteOptions{}) + if err != nil && !apierrors.IsNotFound(err) { + logger.Error(err, "error deleting legacy ServiceImport from the broker") + return true + } + + c.deletedLocalServiceImportsOnBroker.Insert(localServiceImportName) + } + + return false +} + +func serviceImportSourceName(serviceImport *mcsv1a1.ServiceImport) string { + // This function also checks the legacy source name label for migration. + name, ok := serviceImport.Labels[mcsv1a1.LabelServiceName] + if ok { + return name + } + + return serviceImport.Labels[LegacySourceNameLabel] +} + +func sourceClusterName(serviceImport *mcsv1a1.ServiceImport) string { + // This function also checks the legacy source cluster label for migration. + name, ok := serviceImport.Labels[constants.MCSLabelSourceCluster] + if ok { + return name + } + + return serviceImport.Labels[LegacySourceClusterLabel] +} diff --git a/pkg/agent/controller/types.go b/pkg/agent/controller/types.go index f0e09a6c6..f796a803e 100644 --- a/pkg/agent/controller/types.go +++ b/pkg/agent/controller/types.go @@ -75,6 +75,7 @@ type ServiceImportController struct { localClient dynamic.Interface restMapper meta.RESTMapper serviceImportAggregator *ServiceImportAggregator + serviceImportMigrator *ServiceImportMigrator serviceExportClient *ServiceExportClient localSyncer syncer.Interface remoteSyncer syncer.Interface