From 978606f6a6c8ccbabfe6846edc104ff82ed5959e Mon Sep 17 00:00:00 2001 From: Tom Pantelis Date: Thu, 21 Dec 2023 11:03:55 -0500 Subject: [PATCH] Handle transient ServiceImport delete in the resolver During disaster recovery (DR) testing where the broker cluster is recovered from a backup, it was observed that the coredns plugin resolver received a ServiceImport delete event followed by an add event while the EndpointSlice remained. This resulted in failed DNS queries because the service info was removed. The assumption is that a ServiceImport deletion means the service was unexported, which is normally the case. However, during DR scenarios, transient deletions may occur due to reconciliation on Lighthouse (LH) agent startup if it observes that a local copy of a remote resource doesn't yet exist on the broker due to timing. When this occurs, a restart of the coredns pod is required to correct it. To alleviate this issue, make the resolver more resilient by only removing the service info data when there's no more cluster data, i.e., when all the cluster EndpointSlices are deleted. Normally, on unexport, the EndpointSlices are deleted after the aggregated ServiceImport is deleted. 
Signed-off-by: Tom Pantelis --- coredns/resolver/controller_test.go | 18 ++++++++++++++++-- coredns/resolver/endpoint_slice.go | 4 +++- coredns/resolver/service_import.go | 11 ++++++++++- coredns/resolver/types.go | 1 + 4 files changed, 30 insertions(+), 4 deletions(-) diff --git a/coredns/resolver/controller_test.go b/coredns/resolver/controller_test.go index 2514ede90..a06cfc64d 100644 --- a/coredns/resolver/controller_test.go +++ b/coredns/resolver/controller_test.go @@ -69,23 +69,37 @@ var _ = Describe("Controller", func() { }) Context("and then the EndpointSlice is deleted", func() { - Specify("GetDNSRecords should eventually return no DNS record", func() { + JustBeforeEach(func() { t.awaitDNSRecordsFound(namespace1, service1, clusterID1, "", false, expDNSRecord) err := t.endpointSlices.Namespace(namespace1).Delete(context.TODO(), endpointSlice.Name, metav1.DeleteOptions{}) Expect(err).To(Succeed()) + }) + Specify("GetDNSRecords should eventually return no DNS record", func() { t.awaitDNSRecordsFound(namespace1, service1, "", "", false) }) + + Specify("GetDNSRecords should eventually return not found after the ServiceImport is deleted", func() { + err := t.serviceImports.Namespace(namespace1).Delete(context.TODO(), service1, metav1.DeleteOptions{}) + Expect(err).To(Succeed()) + + t.awaitDNSRecords(namespace1, service1, clusterID1, "", false) + }) }) Context("and then the ServiceImport is deleted", func() { - Specify("GetDNSRecords should eventually return not found", func() { + Specify("GetDNSRecords should eventually return not found after the EndpointSlice is deleted", func() { t.awaitDNSRecordsFound(namespace1, service1, clusterID1, "", false, expDNSRecord) err := t.serviceImports.Namespace(namespace1).Delete(context.TODO(), service1, metav1.DeleteOptions{}) Expect(err).To(Succeed()) + t.ensureDNSRecordsFound(namespace1, service1, clusterID1, "", false, expDNSRecord) + + err = t.endpointSlices.Namespace(namespace1).Delete(context.TODO(), endpointSlice.Name, 
metav1.DeleteOptions{}) + Expect(err).To(Succeed()) + t.awaitDNSRecords(namespace1, service1, clusterID1, "", false) }) }) diff --git a/coredns/resolver/endpoint_slice.go b/coredns/resolver/endpoint_slice.go index b02cee30e..e677c8475 100644 --- a/coredns/resolver/endpoint_slice.go +++ b/coredns/resolver/endpoint_slice.go @@ -253,7 +253,9 @@ func (i *Interface) RemoveEndpointSlice(endpointSlice *discovery.EndpointSlice) delete(serviceInfo.clusters, clusterID) - if !serviceInfo.isHeadless { + if len(serviceInfo.clusters) == 0 && !serviceInfo.isExported { + delete(i.serviceMap, key) + } else if !serviceInfo.isHeadless { serviceInfo.mergePorts() serviceInfo.resetLoadBalancing() } diff --git a/coredns/resolver/service_import.go b/coredns/resolver/service_import.go index 5a20a5235..1c36c7c4f 100644 --- a/coredns/resolver/service_import.go +++ b/coredns/resolver/service_import.go @@ -47,6 +47,8 @@ func (i *Interface) PutServiceImport(serviceImport *mcsv1a1.ServiceImport) { i.serviceMap[key] = svcInfo } + svcInfo.isExported = true + if svcInfo.isHeadless || !isLegacy { return } @@ -82,7 +84,14 @@ func (i *Interface) RemoveServiceImport(serviceImport *mcsv1a1.ServiceImport) { i.mutex.Lock() defer i.mutex.Unlock() - delete(i.serviceMap, key) + svcInfo, found := i.serviceMap[key] + if found { + if len(svcInfo.clusters) == 0 { + delete(i.serviceMap, key) + } else { + svcInfo.isExported = false + } + } } func getServiceImportKey(from *mcsv1a1.ServiceImport) (string, bool) { diff --git a/coredns/resolver/types.go b/coredns/resolver/types.go index 72e83c227..24f46d9d0 100644 --- a/coredns/resolver/types.go +++ b/coredns/resolver/types.go @@ -56,5 +56,6 @@ type serviceInfo struct { clusters map[string]*clusterInfo balancer loadbalancer.Interface isHeadless bool + isExported bool ports []mcsv1a1.ServicePort }