From 6bac66acd2f99d102bd066ce175935f2f2cee71d Mon Sep 17 00:00:00 2001 From: Tom Pantelis Date: Tue, 17 Sep 2024 13:28:23 -0400 Subject: [PATCH] Implement clusterset IP in the CoredDNS plugin Modify the resolver to return the ServiceImport's IP if it's set and a specific cluster name wasn't requested. If the latter then return the cluster's DNS record info as it normally does. Signed-off-by: Tom Pantelis --- coredns/plugin/handler_test.go | 53 ++++++++++++++++++++++ coredns/resolver/clusterip_service_test.go | 37 +++++++++++++++ coredns/resolver/endpoint_slice.go | 4 +- coredns/resolver/resolver.go | 9 +++- coredns/resolver/service_import.go | 11 +++-- coredns/resolver/service_info.go | 4 ++ coredns/resolver/types.go | 2 +- 7 files changed, 112 insertions(+), 8 deletions(-) diff --git a/coredns/plugin/handler_test.go b/coredns/plugin/handler_test.go index 3a02d6f2c..30845624e 100644 --- a/coredns/plugin/handler_test.go +++ b/coredns/plugin/handler_test.go @@ -77,6 +77,7 @@ var _ = Describe("Lighthouse DNS plugin Handler", func() { Context("Headless services", testHeadlessService) Context("Local services", testLocalService) Context("Service with multiple ports", testSRVMultiplePorts) + Context("Service with clusterset IP", testClusterSetIP) }) type FailingResponseWriter struct { @@ -959,6 +960,58 @@ func testSRVMultiplePorts() { }) } +func testClusterSetIP() { + const clusterSetIP = "243.1.0.1" + + qname := fmt.Sprintf("%s.%s.svc.clusterset.local.", service1, namespace1) + + var ( + rec *dnstest.Recorder + t *handlerTestDriver + ) + + BeforeEach(func() { + t = newHandlerTestDriver() + + si := newServiceImport(namespace1, service1, mcsv1a1.ClusterSetIP) + si.Spec.IPs = []string{clusterSetIP} + si.Spec.Ports = []mcsv1a1.ServicePort{port1, port2} + + t.lh.Resolver.PutServiceImport(si) + + t.lh.Resolver.PutEndpointSlices(newEndpointSlice(namespace1, service1, clusterID, []mcsv1a1.ServicePort{port1}, + newEndpoint(serviceIP, "", true))) + + t.lh.Resolver.PutEndpointSlices(newEndpointSlice(namespace1, service1, clusterID2, []mcsv1a1.ServicePort{port2}, + newEndpoint(serviceIP2, "", true))) + + rec = dnstest.NewRecorder(&test.ResponseWriter{}) + }) + + Specify("DNS query of Type A record should succeed and write an A record response", func() { + t.executeTestCase(rec, test.Case{ + Qname: qname, + Qtype: dns.TypeA, + Rcode: dns.RcodeSuccess, + Answer: []dns.RR{ + test.A(fmt.Sprintf("%s 5 IN A %s", qname, clusterSetIP)), + }, + }) + }) + + Specify("DNS query of Type SRV should succeed and write an SRV record response", func() { + t.executeTestCase(rec, test.Case{ + Qname: qname, + Qtype: dns.TypeSRV, + Rcode: dns.RcodeSuccess, + Answer: []dns.RR{ + test.SRV(fmt.Sprintf("%s 5 IN SRV 0 50 %d %s", qname, port2.Port, qname)), + test.SRV(fmt.Sprintf("%s 5 IN SRV 0 50 %d %s", qname, port1.Port, qname)), + }, + }) + }) +} + type handlerTestDriver struct { mockCs *fakecs.ClusterStatus lh *lighthouse.Lighthouse diff --git a/coredns/resolver/clusterip_service_test.go b/coredns/resolver/clusterip_service_test.go index 64cbf688d..d5482ebfd 100644 --- a/coredns/resolver/clusterip_service_test.go +++ b/coredns/resolver/clusterip_service_test.go @@ -32,6 +32,7 @@ var _ = Describe("GetDNSRecords", func() { When("a service is present in one cluster", testClusterIPServiceInOneCluster) When("a service is present in two clusters", testClusterIPServiceInTwoClusters) When("a service is present in three clusters", testClusterIPServiceInThreeClusters) + Context("with a clusterset IP", testClusterSetIP) testClusterIPServiceMisc() }) @@ -441,3 +442,39 @@ func testClusterIPServiceMisc() { }) }) } + +func testClusterSetIP() { + const clusterSetIP = "243.1.0.1" + + t := newTestDriver() + + BeforeEach(func() { + si := newAggregatedServiceImport(namespace1, service1) + si.Spec.IPs = []string{clusterSetIP} + si.Spec.Ports = []mcsv1a1.ServicePort{port1, port2} + + t.resolver.PutServiceImport(si) + + t.putEndpointSlice(newClusterIPEndpointSlice(namespace1, service1, clusterID1, serviceIP1, true, port1)) + t.putEndpointSlice(newClusterIPEndpointSlice(namespace1, service1, clusterID2, serviceIP2, true, port2)) + }) + + Context("and no specific cluster is requested", func() { + It("should return the clusterset IP DNS record", func() { + t.assertDNSRecordsFound(namespace1, service1, "", "", false, resolver.DNSRecord{ + IP: clusterSetIP, + Ports: []mcsv1a1.ServicePort{port1, port2}, + }) + }) + }) + + Context("and a cluster is requested", func() { + It("should return its DNS record", func() { + t.assertDNSRecordsFound(namespace1, service1, clusterID1, "", false, resolver.DNSRecord{ + IP: serviceIP1, + Ports: []mcsv1a1.ServicePort{port1}, + ClusterName: clusterID1, + }) + }) + }) +} diff --git a/coredns/resolver/endpoint_slice.go b/coredns/resolver/endpoint_slice.go index c5927471a..ae4276eaa 100644 --- a/coredns/resolver/endpoint_slice.go +++ b/coredns/resolver/endpoint_slice.go @@ -76,7 +76,7 @@ func (i *Interface) PutEndpointSlices(endpointSlices ...*discovery.EndpointSlice return true } - if !serviceInfo.isHeadless { + if !serviceInfo.isHeadless() { return i.putClusterIPEndpointSlice(key, clusterID, endpointSlices[0], serviceInfo) } @@ -264,7 +264,7 @@ func (i *Interface) RemoveEndpointSlice(endpointSlice *discovery.EndpointSlice) if len(serviceInfo.clusters) == 0 && !serviceInfo.isExported { delete(i.serviceMap, key) - } else if !serviceInfo.isHeadless { + } else if !serviceInfo.isHeadless() { serviceInfo.mergePorts() serviceInfo.resetLoadBalancing() } diff --git a/coredns/resolver/resolver.go b/coredns/resolver/resolver.go index ac308f7f2..ac8d51539 100644 --- a/coredns/resolver/resolver.go +++ b/coredns/resolver/resolver.go @@ -39,7 +39,7 @@ func (i *Interface) GetDNSRecords(namespace, name, clusterID, hostname string) ( return nil, false, false } - if !serviceInfo.isHeadless { + if !serviceInfo.isHeadless() { record, found := i.getClusterIPRecord(serviceInfo, clusterID) if record != nil { return []DNSRecord{*record}, false, true @@ -64,6 +64,13 @@ func (i *Interface) getClusterIPRecord(serviceInfo *serviceInfo, clusterID strin return &clusterInfo.endpointRecords[0], true } + if len(serviceInfo.spec.IPs) > 0 { + return &DNSRecord{ + IP: serviceInfo.spec.IPs[0], + Ports: serviceInfo.spec.Ports, + }, true + } + // If we are aware of the local cluster and we found some accessible IP, we shall return it. localClusterID := i.clusterStatus.GetLocalClusterID() if localClusterID != "" { diff --git a/coredns/resolver/service_import.go b/coredns/resolver/service_import.go index 1c36c7c4f..7ca0e3529 100644 --- a/coredns/resolver/service_import.go +++ b/coredns/resolver/service_import.go @@ -39,9 +39,12 @@ func (i *Interface) PutServiceImport(serviceImport *mcsv1a1.ServiceImport) { if !found { svcInfo = &serviceInfo{ - clusters: make(map[string]*clusterInfo), - balancer: loadbalancer.NewSmoothWeightedRR(), - isHeadless: serviceImport.Spec.Type == mcsv1a1.Headless, + clusters: make(map[string]*clusterInfo), + balancer: loadbalancer.NewSmoothWeightedRR(), + } + + if !isLegacy { + svcInfo.spec = serviceImport.Spec } i.serviceMap[key] = svcInfo @@ -49,7 +52,7 @@ func (i *Interface) PutServiceImport(serviceImport *mcsv1a1.ServiceImport) { svcInfo.isExported = true - if svcInfo.isHeadless || !isLegacy { + if svcInfo.isHeadless() || !isLegacy { return } diff --git a/coredns/resolver/service_info.go b/coredns/resolver/service_info.go index 46511c60b..abdea0753 100644 --- a/coredns/resolver/service_info.go +++ b/coredns/resolver/service_info.go @@ -88,3 +88,7 @@ func (si *serviceInfo) selectIP(checkCluster func(string) bool) *DNSRecord { return nil } + +func (si *serviceInfo) isHeadless() bool { + return si.spec.Type == mcsv1a1.Headless +} diff --git a/coredns/resolver/types.go b/coredns/resolver/types.go index 24f46d9d0..3d89fd93d 100644 --- a/coredns/resolver/types.go +++ b/coredns/resolver/types.go @@ -55,7 +55,7 @@ type clusterInfo struct { type serviceInfo struct { clusters map[string]*clusterInfo balancer loadbalancer.Interface - isHeadless bool isExported bool ports []mcsv1a1.ServicePort + spec mcsv1a1.ServiceImportSpec }