Skip to content

Commit

Permalink
Implement clusterset IP in the CoredDNS plugin
Browse files Browse the repository at this point in the history
Modify the resolver to return the ServiceImport's IP if it's
set and a specific cluster name wasn't requested. If the latter
then return the cluster's DNS record info as it normally does.

Signed-off-by: Tom Pantelis <[email protected]>
  • Loading branch information
tpantelis committed Sep 23, 2024
1 parent 70fcb84 commit 6bac66a
Show file tree
Hide file tree
Showing 7 changed files with 112 additions and 8 deletions.
53 changes: 53 additions & 0 deletions coredns/plugin/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ var _ = Describe("Lighthouse DNS plugin Handler", func() {
Context("Headless services", testHeadlessService)
Context("Local services", testLocalService)
Context("Service with multiple ports", testSRVMultiplePorts)
Context("Service with clusterset IP", testClusterSetIP)
})

type FailingResponseWriter struct {
Expand Down Expand Up @@ -959,6 +960,58 @@ func testSRVMultiplePorts() {
})
}

func testClusterSetIP() {
const clusterSetIP = "243.1.0.1"

qname := fmt.Sprintf("%s.%s.svc.clusterset.local.", service1, namespace1)

var (
rec *dnstest.Recorder
t *handlerTestDriver
)

BeforeEach(func() {
t = newHandlerTestDriver()

si := newServiceImport(namespace1, service1, mcsv1a1.ClusterSetIP)
si.Spec.IPs = []string{clusterSetIP}
si.Spec.Ports = []mcsv1a1.ServicePort{port1, port2}

t.lh.Resolver.PutServiceImport(si)

t.lh.Resolver.PutEndpointSlices(newEndpointSlice(namespace1, service1, clusterID, []mcsv1a1.ServicePort{port1},
newEndpoint(serviceIP, "", true)))

t.lh.Resolver.PutEndpointSlices(newEndpointSlice(namespace1, service1, clusterID2, []mcsv1a1.ServicePort{port2},
newEndpoint(serviceIP2, "", true)))

rec = dnstest.NewRecorder(&test.ResponseWriter{})
})

Specify("DNS query of Type A record should succeed and write an A record response", func() {
t.executeTestCase(rec, test.Case{
Qname: qname,
Qtype: dns.TypeA,
Rcode: dns.RcodeSuccess,
Answer: []dns.RR{
test.A(fmt.Sprintf("%s 5 IN A %s", qname, clusterSetIP)),
},
})
})

Specify("DNS query of Type SRV should succeed and write an SRV record response", func() {
t.executeTestCase(rec, test.Case{
Qname: qname,
Qtype: dns.TypeSRV,
Rcode: dns.RcodeSuccess,
Answer: []dns.RR{
test.SRV(fmt.Sprintf("%s 5 IN SRV 0 50 %d %s", qname, port2.Port, qname)),
test.SRV(fmt.Sprintf("%s 5 IN SRV 0 50 %d %s", qname, port1.Port, qname)),
},
})
})
}

type handlerTestDriver struct {
mockCs *fakecs.ClusterStatus
lh *lighthouse.Lighthouse
Expand Down
37 changes: 37 additions & 0 deletions coredns/resolver/clusterip_service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ var _ = Describe("GetDNSRecords", func() {
When("a service is present in one cluster", testClusterIPServiceInOneCluster)
When("a service is present in two clusters", testClusterIPServiceInTwoClusters)
When("a service is present in three clusters", testClusterIPServiceInThreeClusters)
Context("with a clusterset IP", testClusterSetIP)

testClusterIPServiceMisc()
})
Expand Down Expand Up @@ -441,3 +442,39 @@ func testClusterIPServiceMisc() {
})
})
}

func testClusterSetIP() {
const clusterSetIP = "243.1.0.1"

t := newTestDriver()

BeforeEach(func() {
si := newAggregatedServiceImport(namespace1, service1)
si.Spec.IPs = []string{clusterSetIP}
si.Spec.Ports = []mcsv1a1.ServicePort{port1, port2}

t.resolver.PutServiceImport(si)

t.putEndpointSlice(newClusterIPEndpointSlice(namespace1, service1, clusterID1, serviceIP1, true, port1))
t.putEndpointSlice(newClusterIPEndpointSlice(namespace1, service1, clusterID2, serviceIP2, true, port2))
})

Context("and no specific cluster is requested", func() {
It("should return the clusterset IP DNS record", func() {
t.assertDNSRecordsFound(namespace1, service1, "", "", false, resolver.DNSRecord{
IP: clusterSetIP,
Ports: []mcsv1a1.ServicePort{port1, port2},
})
})
})

Context("and a cluster is requested", func() {
It("should return its DNS record", func() {
t.assertDNSRecordsFound(namespace1, service1, clusterID1, "", false, resolver.DNSRecord{
IP: serviceIP1,
Ports: []mcsv1a1.ServicePort{port1},
ClusterName: clusterID1,
})
})
})
}
4 changes: 2 additions & 2 deletions coredns/resolver/endpoint_slice.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ func (i *Interface) PutEndpointSlices(endpointSlices ...*discovery.EndpointSlice
return true
}

if !serviceInfo.isHeadless {
if !serviceInfo.isHeadless() {
return i.putClusterIPEndpointSlice(key, clusterID, endpointSlices[0], serviceInfo)
}

Expand Down Expand Up @@ -264,7 +264,7 @@ func (i *Interface) RemoveEndpointSlice(endpointSlice *discovery.EndpointSlice)

if len(serviceInfo.clusters) == 0 && !serviceInfo.isExported {
delete(i.serviceMap, key)
} else if !serviceInfo.isHeadless {
} else if !serviceInfo.isHeadless() {
serviceInfo.mergePorts()
serviceInfo.resetLoadBalancing()
}
Expand Down
9 changes: 8 additions & 1 deletion coredns/resolver/resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func (i *Interface) GetDNSRecords(namespace, name, clusterID, hostname string) (
return nil, false, false
}

if !serviceInfo.isHeadless {
if !serviceInfo.isHeadless() {
record, found := i.getClusterIPRecord(serviceInfo, clusterID)
if record != nil {
return []DNSRecord{*record}, false, true
Expand All @@ -64,6 +64,13 @@ func (i *Interface) getClusterIPRecord(serviceInfo *serviceInfo, clusterID strin
return &clusterInfo.endpointRecords[0], true
}

if len(serviceInfo.spec.IPs) > 0 {
return &DNSRecord{
IP: serviceInfo.spec.IPs[0],
Ports: serviceInfo.spec.Ports,
}, true
}

// If we are aware of the local cluster and we found some accessible IP, we shall return it.
localClusterID := i.clusterStatus.GetLocalClusterID()
if localClusterID != "" {
Expand Down
11 changes: 7 additions & 4 deletions coredns/resolver/service_import.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,17 +39,20 @@ func (i *Interface) PutServiceImport(serviceImport *mcsv1a1.ServiceImport) {

if !found {
svcInfo = &serviceInfo{
clusters: make(map[string]*clusterInfo),
balancer: loadbalancer.NewSmoothWeightedRR(),
isHeadless: serviceImport.Spec.Type == mcsv1a1.Headless,
clusters: make(map[string]*clusterInfo),
balancer: loadbalancer.NewSmoothWeightedRR(),
}

if !isLegacy {
svcInfo.spec = serviceImport.Spec
}

i.serviceMap[key] = svcInfo
}

svcInfo.isExported = true

if svcInfo.isHeadless || !isLegacy {
if svcInfo.isHeadless() || !isLegacy {
return
}

Expand Down
4 changes: 4 additions & 0 deletions coredns/resolver/service_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,3 +88,7 @@ func (si *serviceInfo) selectIP(checkCluster func(string) bool) *DNSRecord {

return nil
}

func (si *serviceInfo) isHeadless() bool {
return si.spec.Type == mcsv1a1.Headless
}
2 changes: 1 addition & 1 deletion coredns/resolver/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ type clusterInfo struct {
type serviceInfo struct {
clusters map[string]*clusterInfo
balancer loadbalancer.Interface
isHeadless bool
isExported bool
ports []mcsv1a1.ServicePort
spec mcsv1a1.ServiceImportSpec
}

0 comments on commit 6bac66a

Please sign in to comment.