Skip to content

Commit

Permalink
Delete stale EndpointSlices on restart
Browse files Browse the repository at this point in the history
If a local LH EndpointSlices exists but the corresponding Service doesn't
exist, delete the EPS.

Also, a headless Service can have multiple EndpointSlices so reconcile
the LH EndpointSlices on startup to handle a K8s EndpointSlice deleted
while the controller wasn't running.

Fixes #1416

Signed-off-by: Tom Pantelis <[email protected]>
  • Loading branch information
tpantelis authored and sridhargaddam committed Nov 2, 2023
1 parent de6fafb commit ace7afe
Show file tree
Hide file tree
Showing 6 changed files with 170 additions and 25 deletions.
9 changes: 7 additions & 2 deletions pkg/agent/controller/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import (
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
k8slabels "k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
validations "k8s.io/apimachinery/pkg/util/validation"
Expand Down Expand Up @@ -113,14 +114,18 @@ func New(spec *AgentSpecification, syncerConf broker.SyncerConfig, syncerMetricN
localSyncer: agentController.serviceExportSyncer,
}

agentController.endpointSliceController, err = newEndpointSliceController(spec, syncerConf, agentController.serviceExportClient)
agentController.endpointSliceController, err = newEndpointSliceController(spec, syncerConf, agentController.serviceExportClient,
agentController.serviceSyncer)
if err != nil {
return nil, err
}

agentController.serviceImportController, err = newServiceImportController(spec, syncerMetricNames, syncerConf,
agentController.endpointSliceController.syncer.GetBrokerClient(),
agentController.endpointSliceController.syncer.GetBrokerNamespace(), agentController.serviceExportClient)
agentController.endpointSliceController.syncer.GetBrokerNamespace(), agentController.serviceExportClient,
func(selector k8slabels.Selector) []runtime.Object {
return agentController.endpointSliceController.syncer.ListLocalResourcesBySelector(&discovery.EndpointSlice{}, selector)
})
if err != nil {
return nil, err
}
Expand Down
29 changes: 27 additions & 2 deletions pkg/agent/controller/endpoint_slice.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
"github.com/submariner-io/lighthouse/pkg/constants"
corev1 "k8s.io/api/core/v1"
discovery "k8s.io/api/discovery/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
k8slabels "k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
Expand All @@ -43,11 +44,12 @@ import (

//nolint:gocritic // (hugeParam) This function modifies syncerConf so we don't want to pass by pointer.
func newEndpointSliceController(spec *AgentSpecification, syncerConfig broker.SyncerConfig,
serviceExportClient *ServiceExportClient,
serviceExportClient *ServiceExportClient, serviceSyncer syncer.Interface,
) (*EndpointSliceController, error) {
c := &EndpointSliceController{
clusterID: spec.ClusterID,
serviceExportClient: serviceExportClient,
serviceSyncer: serviceSyncer,
conflictCheckWorkQueue: workqueue.New("ConflictChecker"),
}

Expand Down Expand Up @@ -116,8 +118,31 @@ func (c *EndpointSliceController) onLocalEndpointSlice(obj runtime.Object, _ int
return nil, false
}

serviceName := endpointSlice.Labels[mcsv1a1.LabelServiceName]

logger.V(log.DEBUG).Infof("Local EndpointSlice \"%s/%s\" for service %q %sd",
endpointSlice.Namespace, endpointSlice.Name, endpointSlice.Labels[mcsv1a1.LabelServiceName], op)
endpointSlice.Namespace, endpointSlice.Name, serviceName, op)

// Check if the associated Service exists and, if not, delete the EndpointSlice. On restart, it's possible the Service could've been
// deleted.
if op == syncer.Create {
_, found, _ := c.serviceSyncer.GetResource(serviceName, endpointSlice.Namespace)
if !found {
logger.Infof("The service %q for EndpointSlice \"%s/%s\" does not exist - deleting it",
serviceName, endpointSlice.Namespace, endpointSlice.Name)

err := c.syncer.GetLocalFederator().Delete(ctx, endpointSlice)
if apierrors.IsNotFound(err) {
err = nil
}

if err != nil {
logger.Errorf(err, "Error deleting EndpointSlice %s/%s", endpointSlice.Namespace, endpointSlice.Name)
}

return nil, err != nil
}
}

return obj, false
}
Expand Down
86 changes: 85 additions & 1 deletion pkg/agent/controller/reconciliation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (

. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
"github.com/submariner-io/admiral/pkg/federate"
"github.com/submariner-io/admiral/pkg/syncer/test"
testutil "github.com/submariner-io/admiral/pkg/test"
"github.com/submariner-io/lighthouse/pkg/constants"
Expand Down Expand Up @@ -59,7 +60,11 @@ var _ = Describe("Reconciliation", func() {
t.cluster1.createService()
t.cluster1.createServiceExport()

t.awaitNonHeadlessServiceExported(&t.cluster1)
if t.cluster1.service.Spec.ClusterIP == corev1.ClusterIPNone {
t.awaitHeadlessServiceExported(&t.cluster1)
} else {
t.awaitNonHeadlessServiceExported(&t.cluster1)
}

var err error

Expand Down Expand Up @@ -144,6 +149,7 @@ var _ = Describe("Reconciliation", func() {
test.CreateResource(t.cluster1.localServiceImportClient.Namespace(test.LocalNamespace), localServiceImport)
test.CreateResource(t.cluster1.localEndpointSliceClient, localEndpointSlice)
t.cluster1.createService()
t.cluster1.createServiceEndpointSlices()
t.cluster1.start(t, *t.syncerConfig)

t.awaitServiceUnexported(&t.cluster1)
Expand All @@ -157,6 +163,7 @@ var _ = Describe("Reconciliation", func() {

restoreBrokerResources()
test.CreateResource(t.cluster1.localServiceImportClient.Namespace(test.LocalNamespace), localServiceImport)
test.CreateResource(t.cluster1.localEndpointSliceClient, localEndpointSlice)
t.cluster1.createServiceExport()
t.cluster1.start(t, *t.syncerConfig)

Expand Down Expand Up @@ -231,6 +238,83 @@ var _ = Describe("Reconciliation", func() {
t.cluster1.service.Name, t.cluster1.clusterID)
})
})

When("a local EndpointSlice is stale on startup", func() {
Context("because the service no longer exists", func() {
It("should delete it from the local datastore", func() {
t.afterEach()
t = newTestDiver()

By("Restarting controllers")

restoreBrokerResources()
test.CreateResource(t.cluster1.localEndpointSliceClient, localEndpointSlice)
t.cluster1.start(t, *t.syncerConfig)

t.awaitServiceUnexported(&t.cluster1)
})
})

Context("because the K8s EndpointSlice no longer exists", func() {
BeforeEach(func() {
t.cluster1.service.Spec.ClusterIP = corev1.ClusterIPNone
})

It("should delete it from the local datastore", func() {
t.afterEach()
t = newTestDiver()

t.cluster1.service.Spec.ClusterIP = corev1.ClusterIPNone

By("Restarting controllers")

restoreBrokerResources()
test.CreateResource(t.cluster1.localServiceImportClient.Namespace(test.LocalNamespace), localServiceImport)
test.CreateResource(t.cluster1.localEndpointSliceClient, localEndpointSlice)
test.CreateResource(t.cluster1.localServiceExportClient, serviceExport)
t.cluster1.createService()

// Create a remote EPS for the same service and ensure it's not deleted.
remoteEndpointSlice := localEndpointSlice.DeepCopy()
remoteEndpointSlice.Name = "remote-eps"
remoteEndpointSlice.Labels[constants.MCSLabelSourceCluster] = t.cluster2.clusterID
remoteEndpointSlice.Labels[federate.ClusterIDLabelKey] = t.cluster2.clusterID
test.CreateResource(t.cluster1.localEndpointSliceClient, remoteEndpointSlice)

remoteEndpointSlice.Namespace = test.RemoteNamespace
test.CreateResource(t.brokerEndpointSliceClient, remoteEndpointSlice)

// Create an EPS for a service in another namespace and ensure it's not deleted.
otherNS := "other-ns"
otherNSEndpointSlice := localEndpointSlice.DeepCopy()
otherNSEndpointSlice.Name = "other-ns-eps"
otherNSEndpointSlice.Namespace = otherNS
otherNSEndpointSlice.Labels[constants.LabelSourceNamespace] = otherNS
test.CreateResource(endpointSliceClientFor(t.cluster1.localDynClient, otherNS), otherNSEndpointSlice)

test.CreateResource(t.cluster1.dynamicServiceClientFor().Namespace(otherNS), &corev1.Service{
ObjectMeta: metav1.ObjectMeta{
Name: t.cluster1.service.Name,
Namespace: otherNS,
},
})

t.cluster1.start(t, *t.syncerConfig)

t.awaitNoEndpointSlice(&t.cluster1)

Consistently(func() bool {
test.AwaitResource(t.cluster1.localEndpointSliceClient, remoteEndpointSlice.Name)
return true
}).Should(BeTrue())

Consistently(func() bool {
test.AwaitResource(endpointSliceClientFor(t.cluster1.localDynClient, otherNS), otherNSEndpointSlice.Name)
return true
}).Should(BeTrue())
})
})
})
})

var _ = Describe("EndpointSlice migration", func() {
Expand Down
24 changes: 24 additions & 0 deletions pkg/agent/controller/service_endpoint_slices.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ import (

func startEndpointSliceController(localClient dynamic.Interface, restMapper meta.RESTMapper, scheme *runtime.Scheme,
serviceImport *mcsv1a1.ServiceImport, clusterID string, globalIngressIPCache *globalIngressIPCache,
localLHEndpointSliceLister EndpointSliceListerFn,
) (*ServiceEndpointSliceController, error) {
serviceNamespace := serviceImport.Labels[constants.LabelSourceNamespace]
serviceName := serviceImportSourceName(serviceImport)
Expand Down Expand Up @@ -89,6 +90,29 @@ func startEndpointSliceController(localClient dynamic.Interface, restMapper meta
return nil, errors.Wrap(err, "error starting Endpoints syncer")
}

if controller.isHeadless() {
controller.epsSyncer.Reconcile(func() []runtime.Object {
list := localLHEndpointSliceLister(k8slabels.SelectorFromSet(map[string]string{
constants.LabelSourceNamespace: serviceNamespace,
mcsv1a1.LabelServiceName: serviceName,
constants.MCSLabelSourceCluster: clusterID,
}))

retList := make([]runtime.Object, 0, len(list))
for _, o := range list {
eps := o.(*discovery.EndpointSlice)
retList = append(retList, &discovery.EndpointSlice{
ObjectMeta: metav1.ObjectMeta{
Name: eps.Labels[constants.LabelSourceName],
Namespace: serviceNamespace,
},
})
}

return retList
})
}

return controller, nil
}

Expand Down
18 changes: 10 additions & 8 deletions pkg/agent/controller/service_import.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,15 +45,17 @@ import (
//nolint:gocritic // (hugeParam) This function modifies syncerConf so we don't want to pass by pointer.
func newServiceImportController(spec *AgentSpecification, syncerMetricNames AgentConfig, syncerConfig broker.SyncerConfig,
brokerClient dynamic.Interface, brokerNamespace string, serviceExportClient *ServiceExportClient,
localLHEndpointSliceLister EndpointSliceListerFn,
) (*ServiceImportController, error) {
controller := &ServiceImportController{
localClient: syncerConfig.LocalClient,
restMapper: syncerConfig.RestMapper,
clusterID: spec.ClusterID,
localNamespace: spec.Namespace,
converter: converter{scheme: syncerConfig.Scheme},
serviceImportAggregator: newServiceImportAggregator(brokerClient, brokerNamespace, spec.ClusterID, syncerConfig.Scheme),
serviceExportClient: serviceExportClient,
localClient: syncerConfig.LocalClient,
restMapper: syncerConfig.RestMapper,
clusterID: spec.ClusterID,
localNamespace: spec.Namespace,
converter: converter{scheme: syncerConfig.Scheme},
serviceImportAggregator: newServiceImportAggregator(brokerClient, brokerNamespace, spec.ClusterID, syncerConfig.Scheme),
serviceExportClient: serviceExportClient,
localLHEndpointSliceLister: localLHEndpointSliceLister,
}

var err error
Expand Down Expand Up @@ -222,7 +224,7 @@ func (c *ServiceImportController) startEndpointsController(serviceImport *mcsv1a
}

endpointController, err := startEndpointSliceController(c.localClient, c.restMapper, c.converter.scheme,
serviceImport, c.clusterID, c.globalIngressIPCache)
serviceImport, c.clusterID, c.globalIngressIPCache, c.localLHEndpointSliceLister)
if err != nil {
return errors.Wrapf(err, "failed to start endpoints controller for %q", key)
}
Expand Down
29 changes: 17 additions & 12 deletions pkg/agent/controller/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/submariner-io/admiral/pkg/watcher"
"github.com/submariner-io/admiral/pkg/workqueue"
"k8s.io/apimachinery/pkg/api/meta"
k8slabels "k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/dynamic"
mcsv1a1 "sigs.k8s.io/mcs-api/pkg/apis/v1alpha1"
Expand All @@ -41,6 +42,8 @@ const (

var BrokerResyncPeriod = time.Minute * 2

type EndpointSliceListerFn func(selector k8slabels.Selector) []runtime.Object

type converter struct {
scheme *runtime.Scheme
}
Expand Down Expand Up @@ -78,18 +81,19 @@ type ServiceImportAggregator struct {
// from the submariner namespace and creates/updates the aggregated ServiceImport on the broker; the other that syncs
// aggregated ServiceImports from the broker to the local service namespace. It also creates a ServiceEndpointSliceController.
type ServiceImportController struct {
localClient dynamic.Interface
restMapper meta.RESTMapper
serviceImportAggregator *ServiceImportAggregator
serviceImportMigrator *ServiceImportMigrator
serviceExportClient *ServiceExportClient
localSyncer syncer.Interface
remoteSyncer syncer.Interface
endpointControllers sync.Map
clusterID string
localNamespace string
converter converter
globalIngressIPCache *globalIngressIPCache
localClient dynamic.Interface
restMapper meta.RESTMapper
serviceImportAggregator *ServiceImportAggregator
serviceImportMigrator *ServiceImportMigrator
serviceExportClient *ServiceExportClient
localSyncer syncer.Interface
remoteSyncer syncer.Interface
endpointControllers sync.Map
clusterID string
localNamespace string
converter converter
globalIngressIPCache *globalIngressIPCache
localLHEndpointSliceLister EndpointSliceListerFn
}

// Each ServiceEndpointSliceController watches for the EndpointSlices that backs a Service and have a ServiceImport.
Expand All @@ -115,6 +119,7 @@ type EndpointSliceController struct {
syncer *broker.Syncer
serviceImportAggregator *ServiceImportAggregator
serviceExportClient *ServiceExportClient
serviceSyncer syncer.Interface
conflictCheckWorkQueue workqueue.Interface
}

Expand Down

0 comments on commit ace7afe

Please sign in to comment.