Skip to content

Commit

Permalink
Handle transient ServiceImport delete in the resolver
Browse files Browse the repository at this point in the history
During disaster recovery (DR) testing where the broker cluster is recovered from a backup,
it was observed that the coredns plugin resolver received a ServiceImport
delete event followed by an add event while the EndpointSlice remained.
This resulted in failed DNS queries because the service info was removed. The
assumption is that a ServiceImport deletion means the service was
unexported, which is normally the case. However, during DR scenarios,
transient deletions may occur due to reconciliation on Lighthouse (LH) agent startup
if it observes a local copy of a remote resource doesn't yet exist on the
broker due to timing. When this occurs a restart of the coredns pod is
required to correct it.

To alleviate this issue, make the resolver more resilient by only
removing the service info data when there's no more cluster data, i.e.,
when all the cluster EndpointSlices are deleted. Normally, on unexport,
the EndpointSlices are deleted after the aggregated ServiceImport is
deleted.

Signed-off-by: Tom Pantelis <[email protected]>
  • Loading branch information
tpantelis committed Dec 21, 2023
1 parent d6554ec commit 978606f
Show file tree
Hide file tree
Showing 4 changed files with 30 additions and 4 deletions.
18 changes: 16 additions & 2 deletions coredns/resolver/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,23 +69,37 @@ var _ = Describe("Controller", func() {
})

Context("and then the EndpointSlice is deleted", func() {
Specify("GetDNSRecords should eventually return no DNS record", func() {
JustBeforeEach(func() {
t.awaitDNSRecordsFound(namespace1, service1, clusterID1, "", false, expDNSRecord)

err := t.endpointSlices.Namespace(namespace1).Delete(context.TODO(), endpointSlice.Name, metav1.DeleteOptions{})
Expect(err).To(Succeed())
})

Specify("GetDNSRecords should eventually return no DNS record", func() {
t.awaitDNSRecordsFound(namespace1, service1, "", "", false)
})

Specify("GetDNSRecords should eventually return not found after the ServiceImport is deleted", func() {
err := t.serviceImports.Namespace(namespace1).Delete(context.TODO(), service1, metav1.DeleteOptions{})
Expect(err).To(Succeed())

t.awaitDNSRecords(namespace1, service1, clusterID1, "", false)
})
})

Context("and then the ServiceImport is deleted", func() {
Specify("GetDNSRecords should eventually return not found", func() {
Specify("GetDNSRecords should eventually return not found after the EndpointSlice is deleted", func() {
t.awaitDNSRecordsFound(namespace1, service1, clusterID1, "", false, expDNSRecord)

err := t.serviceImports.Namespace(namespace1).Delete(context.TODO(), service1, metav1.DeleteOptions{})
Expect(err).To(Succeed())

t.ensureDNSRecordsFound(namespace1, service1, clusterID1, "", false, expDNSRecord)

err = t.endpointSlices.Namespace(namespace1).Delete(context.TODO(), endpointSlice.Name, metav1.DeleteOptions{})
Expect(err).To(Succeed())

t.awaitDNSRecords(namespace1, service1, clusterID1, "", false)
})
})
Expand Down
4 changes: 3 additions & 1 deletion coredns/resolver/endpoint_slice.go
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,9 @@ func (i *Interface) RemoveEndpointSlice(endpointSlice *discovery.EndpointSlice)

delete(serviceInfo.clusters, clusterID)

if !serviceInfo.isHeadless {
if len(serviceInfo.clusters) == 0 && !serviceInfo.isExported {
delete(i.serviceMap, key)
} else if !serviceInfo.isHeadless {
serviceInfo.mergePorts()
serviceInfo.resetLoadBalancing()
}
Expand Down
11 changes: 10 additions & 1 deletion coredns/resolver/service_import.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ func (i *Interface) PutServiceImport(serviceImport *mcsv1a1.ServiceImport) {
i.serviceMap[key] = svcInfo
}

svcInfo.isExported = true

if svcInfo.isHeadless || !isLegacy {
return
}
Expand Down Expand Up @@ -82,7 +84,14 @@ func (i *Interface) RemoveServiceImport(serviceImport *mcsv1a1.ServiceImport) {
i.mutex.Lock()
defer i.mutex.Unlock()

delete(i.serviceMap, key)
svcInfo, found := i.serviceMap[key]
if found {
if len(svcInfo.clusters) == 0 {
delete(i.serviceMap, key)
} else {
svcInfo.isExported = false
}
}
}

func getServiceImportKey(from *mcsv1a1.ServiceImport) (string, bool) {
Expand Down
1 change: 1 addition & 0 deletions coredns/resolver/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,5 +56,6 @@ type serviceInfo struct {
clusters map[string]*clusterInfo
balancer loadbalancer.Interface
isHeadless bool
isExported bool
ports []mcsv1a1.ServicePort
}

0 comments on commit 978606f

Please sign in to comment.