Skip to content

Commit

Permalink
Implement legacy ServiceImport migration in the agent controller
Browse files Browse the repository at this point in the history
Legacy per-cluster ServiceImports are longer be created but, for rolling
cluster upgrades, there may be a period of time where there is a mix of
upgraded and non-upgraded clusters. The latter still need to observe the
per-cluster ServiceImports so they need to remain on the broker during the
transition period.

An upgraded cluster can remove its legacy per-cluster ServiceImport from
the broker when it observes that all of the legacy ServiceImport cluster
names are present in the aggregated ServiceImport status cluster name list.
This indicates that all constituent clusters have been upgraded.

Introduced a ServiceImportMigrator that integrates with the
ServiceImportController to handle syncing of legacy ServiceImports from
the broker and removing the local legacy ServiceImport from the broker
based on the criteria above. The latter is triggered whenever an
aggregated ServiceImport is synced from the broker, either new or
updated.

Signed-off-by: Tom Pantelis <[email protected]>
  • Loading branch information
tpantelis authored and skitt committed Mar 30, 2023
1 parent 64b3ff9 commit 6bb38c9
Show file tree
Hide file tree
Showing 7 changed files with 535 additions and 186 deletions.
10 changes: 0 additions & 10 deletions pkg/agent/controller/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -403,16 +403,6 @@ func newServiceExportCondition(condType mcsv1a1.ServiceExportConditionType, stat
}
}

// This function also checks the legacy source name label for migration - this can be removed after 0.15.
func serviceImportSourceName(serviceImport *mcsv1a1.ServiceImport) string {
name, ok := serviceImport.Labels[mcsv1a1.LabelServiceName]
if ok {
return name
}

return serviceImport.GetLabels()["lighthouse.submariner.io/sourceName"]
}

func (c converter) toServiceImport(obj runtime.Object) *mcsv1a1.ServiceImport {
to := &mcsv1a1.ServiceImport{}
utilruntime.Must(c.scheme.Convert(obj, to, nil))
Expand Down
2 changes: 1 addition & 1 deletion pkg/agent/controller/endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ func startEndpointController(localClient dynamic.Interface, restMapper meta.REST
serviceImport *mcsv1a1.ServiceImport, clusterID string, globalIngressIPCache *globalIngressIPCache,
) (*EndpointController, error) {
serviceNamespace := serviceImport.Labels[constants.LabelSourceNamespace]
serviceName := serviceImport.Labels[mcsv1a1.LabelServiceName]
serviceName := serviceImportSourceName(serviceImport)

logger.V(log.DEBUG).Infof("Starting Endpoints controller for service %s/%s", serviceNamespace, serviceName)

Expand Down
188 changes: 55 additions & 133 deletions pkg/agent/controller/reconciliation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ package controller_test

import (
"context"
"time"
"strings"

. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
Expand All @@ -33,16 +33,18 @@ import (
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/client-go/dynamic/fake"
"k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/testing"
mcsv1a1 "sigs.k8s.io/mcs-api/pkg/apis/v1alpha1"
)

const (
legacySourceNameLabel = "lighthouse.submariner.io/sourceName"
legacySourceClusterLabel = "lighthouse.submariner.io/sourceCluster"
)

var _ = Describe("Reconciliation", func() {
var t *testDriver
var (
t *testDriver
localServiceImport *mcsv1a1.ServiceImport
localEndpointSlice *discovery.EndpointSlice
brokerServiceImports *unstructured.UnstructuredList
brokerEndpointSlices *unstructured.UnstructuredList
)

BeforeEach(func() {
t = newTestDiver()
Expand All @@ -53,41 +55,47 @@ var _ = Describe("Reconciliation", func() {
t.cluster1.createEndpoints()
t.cluster1.createService()
t.cluster1.createServiceExport()

t.awaitNonHeadlessServiceExported(&t.cluster1)

var err error

brokerServiceImports, err = t.brokerServiceImportClient.Namespace(test.RemoteNamespace).List(context.TODO(), metav1.ListOptions{})
Expect(err).To(Succeed())

brokerEndpointSlices, err = t.brokerEndpointSliceClient.List(context.TODO(), metav1.ListOptions{})
Expect(err).To(Succeed())

localServiceImport = t.cluster1.findLocalServiceImport()
Expect(localServiceImport).ToNot(BeNil())

localEndpointSlice = t.cluster1.findLocalEndpointSlice()
Expect(localEndpointSlice).ToNot(BeNil())
})

AfterEach(func() {
t.afterEach()
})

Context("on restart after a service was exported", func() {
It("should retain the exported resources on reconciliation", func() {
t.awaitNonHeadlessServiceExported(&t.cluster1)
restoreBrokerResources := func() {
for i := range brokerServiceImports.Items {
test.CreateResource(t.brokerServiceImportClient.Namespace(test.RemoteNamespace), &brokerServiceImports.Items[i])
}

localServiceImport := t.cluster1.findLocalServiceImport()
Expect(localServiceImport).ToNot(BeNil())

localEndpointSlice := t.cluster1.findLocalEndpointSlice()
Expect(localEndpointSlice).ToNot(BeNil())

brokerServiceImports, err := t.brokerServiceImportClient.Namespace(test.RemoteNamespace).List(context.TODO(), metav1.ListOptions{})
Expect(err).To(Succeed())

brokerEndpointSlices, err := t.brokerEndpointSliceClient.List(context.TODO(), metav1.ListOptions{})
Expect(err).To(Succeed())
for i := range brokerEndpointSlices.Items {
test.CreateResource(t.brokerEndpointSliceClient, &brokerEndpointSlices.Items[i])
}
}

Context("on restart after a service was exported", func() {
It("should retain the exported resources on reconciliation", func() {
t.afterEach()
t = newTestDiver()

test.CreateResource(t.cluster1.localServiceImportClient.Namespace(test.LocalNamespace), localServiceImport)
test.CreateResource(t.cluster1.localEndpointSliceClient, localEndpointSlice)

for i := range brokerServiceImports.Items {
test.CreateResource(t.brokerServiceImportClient.Namespace(test.RemoteNamespace), &brokerServiceImports.Items[i])
}

for i := range brokerEndpointSlices.Items {
test.CreateResource(t.brokerEndpointSliceClient, &brokerEndpointSlices.Items[i])
}
restoreBrokerResources()

t.cluster1.createEndpoints()
t.cluster1.createService()
Expand All @@ -102,26 +110,31 @@ var _ = Describe("Reconciliation", func() {
testutil.EnsureNoActionsForResource(&t.cluster1.localDynClient.Fake, "endpointslices", "delete")

brokerDynClient := t.syncerConfig.BrokerClient.(*fake.FakeDynamicClient)
testutil.EnsureNoActionsForResource(&brokerDynClient.Fake, "serviceimports", "delete")
testutil.EnsureNoActionsForResource(&brokerDynClient.Fake, "endpointslices", "delete")

// For migration cleanup, it may attempt to delete a local legacy ServiceImport from the broker so ignore it.
Consistently(func() bool {
siActions := brokerDynClient.Fake.Actions()
for i := range siActions {
if siActions[i].GetResource().Resource == "serviceimports" && siActions[i].GetVerb() == "delete" &&
!strings.Contains(siActions[i].(testing.DeleteAction).GetName(), t.cluster1.clusterID) {
return true
}
}

return false
}).Should(BeFalse())
})
})

When("a local ServiceImport is stale on startup due to a missed ServiceExport delete event", func() {
It("should unexport the service on reconciliation", func() {
t.awaitNonHeadlessServiceExported(&t.cluster1)

serviceImport := t.cluster1.findLocalServiceImport()
Expect(serviceImport).ToNot(BeNil())

endpointSlice := t.cluster1.findLocalEndpointSlice()
Expect(endpointSlice).ToNot(BeNil())

t.afterEach()
t = newTestDiver()

test.CreateResource(t.cluster1.localServiceImportClient.Namespace(test.LocalNamespace), serviceImport)
test.CreateResource(t.cluster1.localEndpointSliceClient, endpointSlice)
restoreBrokerResources()
test.CreateResource(t.cluster1.localServiceImportClient.Namespace(test.LocalNamespace), localServiceImport)
test.CreateResource(t.cluster1.localEndpointSliceClient, localEndpointSlice)
t.cluster1.createService()
t.cluster1.start(t, *t.syncerConfig)

Expand All @@ -131,14 +144,11 @@ var _ = Describe("Reconciliation", func() {

When("a local ServiceImport is stale on startup due to a missed Service delete event", func() {
It("should unexport the service on reconciliation", func() {
t.awaitNonHeadlessServiceExported(&t.cluster1)
serviceImport := t.cluster1.findLocalServiceImport()
Expect(serviceImport).ToNot(BeNil())

t.afterEach()
t = newTestDiver()

test.CreateResource(t.cluster1.localServiceImportClient.Namespace(test.LocalNamespace), serviceImport)
restoreBrokerResources()
test.CreateResource(t.cluster1.localServiceImportClient.Namespace(test.LocalNamespace), localServiceImport)
t.cluster1.createServiceExport()
t.cluster1.start(t, *t.syncerConfig)

Expand All @@ -149,8 +159,6 @@ var _ = Describe("Reconciliation", func() {

When("a remote aggregated ServiceImport is stale in the local datastore on startup", func() {
It("should delete it from the local datastore on reconciliation", func() {
t.awaitNonHeadlessServiceExported(&t.cluster1)

obj, err := t.cluster2.localServiceImportClient.Namespace(t.cluster1.service.Namespace).Get(context.TODO(),
t.cluster1.service.Name, metav1.GetOptions{})
Expect(err).To(Succeed())
Expand All @@ -170,17 +178,10 @@ var _ = Describe("Reconciliation", func() {

When("a remote aggregated ServiceImport in the broker datastore contains a stale cluster name on startup", func() {
It("should delete it on reconciliation", func() {
t.awaitNonHeadlessServiceExported(&t.cluster1)

brokerServiceImports, err := t.brokerServiceImportClient.Namespace(test.RemoteNamespace).List(context.TODO(), metav1.ListOptions{})
Expect(err).To(Succeed())

t.afterEach()
t = newTestDiver()

for i := range brokerServiceImports.Items {
test.CreateResource(t.brokerServiceImportClient.Namespace(test.RemoteNamespace), &brokerServiceImports.Items[i])
}
restoreBrokerResources()

t.justBeforeEach()

Expand All @@ -190,7 +191,6 @@ var _ = Describe("Reconciliation", func() {

When("a local EndpointSlice is stale in the broker datastore on startup", func() {
It("should delete it from the broker datastore on reconciliation", func() {
t.awaitNonHeadlessServiceExported(&t.cluster1)
endpointSlice := findEndpointSlice(t.brokerEndpointSliceClient, t.cluster1.endpoints.Namespace,
t.cluster1.endpoints.Name, t.cluster1.clusterID)
Expect(endpointSlice).ToNot(BeNil())
Expand All @@ -207,7 +207,6 @@ var _ = Describe("Reconciliation", func() {

When("a remote EndpointSlice is stale in the local datastore on startup", func() {
It("should delete it from the local datastore on reconciliation", func() {
t.awaitNonHeadlessServiceExported(&t.cluster1)
endpointSlice := findEndpointSlice(t.cluster2.localEndpointSliceClient, t.cluster1.endpoints.Namespace,
t.cluster1.endpoints.Name, t.cluster1.clusterID)
Expect(endpointSlice).ToNot(BeNil())
Expand Down Expand Up @@ -250,81 +249,4 @@ var _ = Describe("Reconciliation", func() {
test.AwaitNoResource(t.brokerEndpointSliceClient, epsName)
})
})

When("a local ServiceImport with the legacy source labels exists after restart", func() {
var (
serviceImport *mcsv1a1.ServiceImport
brokerServiceImports *unstructured.UnstructuredList
)

JustBeforeEach(func() {
t.awaitNonHeadlessServiceExported(&t.cluster1)
serviceImport = t.cluster1.findLocalServiceImport()
Expect(serviceImport).ToNot(BeNil())

var err error
brokerServiceImports, err = t.brokerServiceImportClient.Namespace(test.RemoteNamespace).List(context.TODO(), metav1.ListOptions{})

Expect(err).To(Succeed())

t.afterEach()
t = newTestDiver()

serviceImport.Labels[legacySourceNameLabel] = serviceImport.Labels[mcsv1a1.LabelServiceName]
delete(serviceImport.Labels, mcsv1a1.LabelServiceName)

serviceImport.Labels[legacySourceClusterLabel] = serviceImport.Labels[constants.MCSLabelSourceCluster]
delete(serviceImport.Labels, constants.MCSLabelSourceCluster)

t.cluster1.createService()
t.cluster1.createEndpoints()

By("Restarting controller")
})

It("should update the ServiceImport labels and sync it", func() {
t.cluster1.start(t, *t.syncerConfig)
t.cluster2.start(t, *t.syncerConfig)

By("Create the ServiceImport with the legacy labels")

// We want to verify the transition behavior during the small window where the ServiceImport labels
// haven't been migrated yet. This occurs when the ServiceExport is processed so to force the sequencing
// create the ServiceImport with the legacy labels first then create the ServiceExport.

test.CreateResource(t.cluster1.localServiceImportClient.Namespace(test.LocalNamespace), serviceImport)

// The ServiceImport still has the legacy labels so shouldn't be synced to the broker yet.
Eventually(func() int {
list, _ := t.brokerServiceImportClient.Namespace(test.RemoteNamespace).List(context.TODO(), metav1.ListOptions{})
return len(list.Items)
}, 5*time.Second).Should(BeZero(), "Unexpected ServiceImport found")

// The EndpointSlice shouldn't be created yet since the ServiceImport still has the legacy source cluster label.
Consistently(func() *discovery.EndpointSlice {
return t.cluster1.findLocalEndpointSlice()
}).Should(BeNil(), "Unexpected EndpointSlice found")

By("Create the ServiceExport")

t.cluster1.createServiceExport()

t.awaitNoEndpointSlice(&t.cluster1)
})

Context("and the ServiceExport no longer exists", func() {
It("should delete the ServiceImport on reconciliation", func() {
test.CreateResource(t.cluster1.localServiceImportClient.Namespace(test.LocalNamespace), serviceImport)

for i := range brokerServiceImports.Items {
test.CreateResource(t.brokerServiceImportClient.Namespace(test.RemoteNamespace), &brokerServiceImports.Items[i])
}

t.cluster1.start(t, *t.syncerConfig)
t.cluster2.start(t, *t.syncerConfig)

t.awaitNoAggregatedServiceImport(&t.cluster1)
})
})
})
})
Loading

0 comments on commit 6bb38c9

Please sign in to comment.