From 6371e208e5c84d7e4f43b4f61dcb9bf282cd4ff9 Mon Sep 17 00:00:00 2001 From: Tom Pantelis Date: Wed, 15 Feb 2023 12:56:12 -0500 Subject: [PATCH] Introduce resolver module to the CoreDNS plugin Currently, ServiceImport and EndpointSlice processing are in separate components, with ServiceImports used for resolving non-headless service queries and EndpointSlice used for headless services. However, the upcoming ServiceImport aggregation changes will require direct coordination between ServiceImports and EndpointSlices for resolving DNS records. Thus, it makes sense to combine the processing in 'serviceimport.Map' and 'endpointslice.Map' into a single component. A new 'resolver' module is introduced to contain a controller component that watches for ServiceImports and EndpointSlices and feeds them into a resolver component that builds internal structs used to provide DNSRecords on request. Subsequent commits will modify the plugin handler to use the resolver. Signed-off-by: Tom Pantelis --- coredns/resolver/clusterip_service_test.go | 403 +++++++++++++++++++++ coredns/resolver/controller.go | 111 ++++++ coredns/resolver/controller_test.go | 114 ++++++ coredns/resolver/endpoint_slice.go | 235 ++++++++++++ coredns/resolver/endpoint_slice_test.go | 100 +++++ coredns/resolver/fake/cluster_status.go | 47 +++ coredns/resolver/headless_service_test.go | 275 ++++++++++++++ coredns/resolver/resolver.go | 111 ++++++ coredns/resolver/resolver_suite_test.go | 335 +++++++++++++++++ coredns/resolver/service_import.go | 146 ++++++++ coredns/resolver/service_info.go | 89 +++++ coredns/resolver/types.go | 60 +++ 12 files changed, 2026 insertions(+) create mode 100644 coredns/resolver/clusterip_service_test.go create mode 100644 coredns/resolver/controller.go create mode 100644 coredns/resolver/controller_test.go create mode 100644 coredns/resolver/endpoint_slice.go create mode 100644 coredns/resolver/endpoint_slice_test.go create mode 100644 coredns/resolver/fake/cluster_status.go 
create mode 100644 coredns/resolver/headless_service_test.go create mode 100644 coredns/resolver/resolver.go create mode 100644 coredns/resolver/resolver_suite_test.go create mode 100644 coredns/resolver/service_import.go create mode 100644 coredns/resolver/service_info.go create mode 100644 coredns/resolver/types.go diff --git a/coredns/resolver/clusterip_service_test.go b/coredns/resolver/clusterip_service_test.go new file mode 100644 index 000000000..0fdf5da02 --- /dev/null +++ b/coredns/resolver/clusterip_service_test.go @@ -0,0 +1,403 @@ +/* +SPDX-License-Identifier: Apache-2.0 + +Copyright Contributors to the Submariner project. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + +http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package resolver_test + +import ( + . "github.com/onsi/ginkgo/v2" + . 
"github.com/onsi/gomega" + "github.com/submariner-io/lighthouse/coredns/resolver" + mcsv1a1 "sigs.k8s.io/mcs-api/pkg/apis/v1alpha1" +) + +var _ = Describe("GetDNSRecords", func() { + Describe("ClusterIP Service", func() { + When("a service is present in one cluster", testClusterIPServiceInOneCluster) + When("a service is present in two clusters", testClusterIPServiceInTwoClusters) + When("a service is present in three clusters", testClusterIPServiceInThreeClusters) + + testClusterIPServiceMisc() + }) +}) + +func testClusterIPServiceInOneCluster() { + t := newTestDriver() + + expDNSRecord := resolver.DNSRecord{ + IP: serviceIP1, + Ports: []mcsv1a1.ServicePort{port1}, + ClusterName: clusterID1, + } + + BeforeEach(func() { + t.resolver.PutServiceImport(newClusterServiceImport(namespace1, service1, expDNSRecord.IP, expDNSRecord.ClusterName, + expDNSRecord.Ports...)) + + t.putEndpointSlice(newClusterIPEndpointSlice(namespace1, service1, expDNSRecord.ClusterName, expDNSRecord.IP, true)) + }) + + Context("and no specific cluster is requested", func() { + It("should consistently return its DNS record", func() { + for i := 0; i < 5; i++ { + t.assertDNSRecordsFound(namespace1, service1, "", "", false, expDNSRecord) + } + }) + }) + + Context("and the cluster is requested", func() { + It("should consistently return its DNS record", func() { + for i := 0; i < 5; i++ { + t.assertDNSRecordsFound(namespace1, service1, clusterID1, "", false, expDNSRecord) + } + }) + }) + + Context("and it becomes disconnected", func() { + BeforeEach(func() { + t.clusterStatus.ConnectedClusterIDs.RemoveAll() + }) + + It("should return no DNS records", func() { + t.assertDNSRecordsFound(namespace1, service1, "", "", false) + }) + + Context("and the cluster is requested", func() { + It("should still return its DNS record", func() { + t.assertDNSRecordsFound(namespace1, service1, clusterID1, "", false, expDNSRecord) + }) + }) + }) + + Context("and it becomes unhealthy", func() { + BeforeEach(func() { + 
t.putEndpointSlice(newClusterIPEndpointSlice(namespace1, service1, clusterID1, serviceIP1, false)) + }) + + It("should return no DNS records", func() { + t.assertDNSRecordsFound(namespace1, service1, "", "", false) + }) + + Context("and the cluster is requested", func() { + It("should still return its DNS record", func() { + t.assertDNSRecordsFound(namespace1, service1, clusterID1, "", false, expDNSRecord) + }) + }) + }) + + Context("and the service information is updated", func() { + BeforeEach(func() { + t.resolver.PutServiceImport(newClusterServiceImport(namespace1, service1, serviceIP2, clusterID1, port2)) + }) + + It("should return the correct DNS record information", func() { + t.assertDNSRecordsFound(namespace1, service1, "", "", false, resolver.DNSRecord{ + IP: serviceIP2, + Ports: []mcsv1a1.ServicePort{port2}, + ClusterName: clusterID1, + }) + }) + }) + + Context("and a non-existent cluster is specified", func() { + It("should return no DNS records found", func() { + t.assertDNSRecordsNotFound(namespace1, service1, "non-existent", "") + }) + }) +} + +func testClusterIPServiceInTwoClusters() { + t := newTestDriver() + + BeforeEach(func() { + t.resolver.PutServiceImport(newClusterServiceImport(namespace1, service1, serviceIP1, clusterID1, port1)) + t.resolver.PutServiceImport(newClusterServiceImport(namespace1, service1, serviceIP2, clusterID2, port1)) + + t.putEndpointSlice(newClusterIPEndpointSlice(namespace1, service1, clusterID1, serviceIP1, true)) + t.putEndpointSlice(newClusterIPEndpointSlice(namespace1, service1, clusterID2, serviceIP2, true)) + }) + + Context("and no specific cluster is requested", func() { + It("should consistently return the DNS records round-robin", func() { + t.testRoundRobin(namespace1, service1, serviceIP1, serviceIP2) + }) + }) + + Context("and one is the local cluster", func() { + BeforeEach(func() { + t.clusterStatus.LocalClusterID.Store(clusterID1) + }) + + It("should consistently return its DNS record", func() { + for i := 
0; i < 10; i++ { + Expect(t.getNonHeadlessDNSRecord(namespace1, service1, "").IP).To(Equal(serviceIP1)) + } + }) + }) + + Context("and one becomes disconnected", func() { + expDNSRecord := resolver.DNSRecord{ + IP: serviceIP2, + Ports: []mcsv1a1.ServicePort{port1}, + ClusterName: clusterID2, + } + + BeforeEach(func() { + t.clusterStatus.ConnectedClusterIDs.Remove(clusterID1) + }) + + Context("and no specific cluster is requested", func() { + It("should consistently return the DNS record of the connected cluster", func() { + for i := 0; i < 10; i++ { + t.assertDNSRecordsFound(namespace1, service1, "", "", false, expDNSRecord) + } + }) + }) + + Context("and the disconnected cluster is requested", func() { + It("should still return its DNS record", func() { + t.assertDNSRecordsFound(namespace1, service1, clusterID1, "", false, resolver.DNSRecord{ + IP: serviceIP1, + Ports: []mcsv1a1.ServicePort{port1}, + ClusterName: clusterID1, + }) + }) + }) + + Context("and the connected cluster is requested", func() { + It("should return its DNS record", func() { + t.assertDNSRecordsFound(namespace1, service1, clusterID2, "", false, expDNSRecord) + }) + }) + }) + + Context("and both become disconnected", func() { + BeforeEach(func() { + t.clusterStatus.ConnectedClusterIDs.RemoveAll() + }) + + It("should return no DNS records", func() { + t.assertDNSRecordsFound(namespace1, service1, "", "", false) + }) + }) + + Context("and one becomes unhealthy", func() { + expDNSRecord := resolver.DNSRecord{ + IP: serviceIP1, + Ports: []mcsv1a1.ServicePort{port1}, + ClusterName: clusterID1, + } + + BeforeEach(func() { + t.putEndpointSlice(newClusterIPEndpointSlice(namespace1, service1, clusterID2, serviceIP2, false)) + }) + + Context("and no specific cluster is requested", func() { + It("should consistently return the DNS record of the healthy cluster", func() { + for i := 0; i < 10; i++ { + t.assertDNSRecordsFound(namespace1, service1, "", "", false, expDNSRecord) + } + }) + }) + + Context("and 
the unhealthy cluster is requested", func() { + It("should still return its DNS record", func() { + t.assertDNSRecordsFound(namespace1, service1, clusterID2, "", false, resolver.DNSRecord{ + IP: serviceIP2, + Ports: []mcsv1a1.ServicePort{port1}, + ClusterName: clusterID2, + }) + }) + }) + + Context("and the healthy cluster is requested", func() { + It("should return its DNS record", func() { + t.assertDNSRecordsFound(namespace1, service1, clusterID1, "", false, expDNSRecord) + }) + }) + }) + + Context("and one is subsequently removed", func() { + expDNSRecord := resolver.DNSRecord{ + IP: serviceIP1, + Ports: []mcsv1a1.ServicePort{port1}, + ClusterName: clusterID1, + } + + BeforeEach(func() { + t.resolver.RemoveServiceImport(newClusterServiceImport(namespace1, service1, serviceIP2, clusterID2)) + }) + + It("should consistently return the DNS record of the remaining cluster", func() { + for i := 0; i < 10; i++ { + t.assertDNSRecordsFound(namespace1, service1, "", "", false, expDNSRecord) + } + }) + }) + + Context("and a non-existent local cluster is specified", func() { + BeforeEach(func() { + t.clusterStatus.LocalClusterID.Store("non-existent") + }) + + It("should consistently return the DNS records round-robin", func() { + t.testRoundRobin(namespace1, service1, serviceIP1, serviceIP2) + }) + }) +} + +func testClusterIPServiceInThreeClusters() { + t := newTestDriver() + + BeforeEach(func() { + t.resolver.PutServiceImport(newClusterServiceImport(namespace1, service1, serviceIP1, clusterID1, port1)) + t.resolver.PutServiceImport(newClusterServiceImport(namespace1, service1, serviceIP2, clusterID2, port1, port2)) + t.resolver.PutServiceImport(newClusterServiceImport(namespace1, service1, serviceIP3, clusterID3, port1)) + + t.putEndpointSlice(newClusterIPEndpointSlice(namespace1, service1, clusterID1, serviceIP1, true)) + t.putEndpointSlice(newClusterIPEndpointSlice(namespace1, service1, clusterID2, serviceIP2, true)) + 
t.putEndpointSlice(newClusterIPEndpointSlice(namespace1, service1, clusterID3, serviceIP3, true)) + }) + + Context("and no specific cluster is requested", func() { + It("should consistently return the DNS records round-robin", func() { + t.testRoundRobin(namespace1, service1, serviceIP1, serviceIP2, serviceIP3) + }) + + It("should consistently return the merged service ports", func() { + for i := 0; i < 10; i++ { + Expect(t.getNonHeadlessDNSRecord(namespace1, service1, "").Ports).To(Equal([]mcsv1a1.ServicePort{port1})) + } + }) + }) + + Context("and a specific cluster is requested", func() { + expDNSRecord := resolver.DNSRecord{ + IP: serviceIP2, + Ports: []mcsv1a1.ServicePort{port1, port2}, + ClusterName: clusterID2, + } + + It("should consistently return its DNS record", func() { + for i := 0; i < 10; i++ { + t.assertDNSRecordsFound(namespace1, service1, clusterID2, "", false, expDNSRecord) + } + }) + }) + + Context("and one becomes disconnected", func() { + BeforeEach(func() { + t.clusterStatus.ConnectedClusterIDs.Remove(clusterID3) + }) + + It("should consistently return the connected clusters' DNS records round-robin", func() { + t.testRoundRobin(namespace1, service1, serviceIP1, serviceIP2) + }) + }) + + Context("and one becomes unhealthy", func() { + BeforeEach(func() { + t.putEndpointSlice(newClusterIPEndpointSlice(namespace1, service1, clusterID2, serviceIP2, false)) + }) + + It("should consistently return the healthy clusters' DNS records round-robin", func() { + t.testRoundRobin(namespace1, service1, serviceIP1, serviceIP3) + }) + + Context("and subsequently healthy again", func() { + It("should consistently return the all DNS records round-robin", func() { + for i := 0; i < 5; i++ { + Expect(t.getNonHeadlessDNSRecord(namespace1, service1, "").IP).To(Or(Equal(serviceIP1), Equal(serviceIP3))) + } + + t.putEndpointSlice(newClusterIPEndpointSlice(namespace1, service1, clusterID2, serviceIP2, true)) + + t.testRoundRobin(namespace1, service1, serviceIP1, 
serviceIP2, serviceIP3) + }) + }) + }) + + Context("and one becomes disconnected and one becomes unhealthy", func() { + BeforeEach(func() { + t.clusterStatus.ConnectedClusterIDs.Remove(clusterID2) + t.putEndpointSlice(newClusterIPEndpointSlice(namespace1, service1, clusterID3, serviceIP3, false)) + }) + + It("should consistently return the remaining cluster's DNS record round-robin", func() { + t.testRoundRobin(namespace1, service1, serviceIP1) + }) + }) +} + +func testClusterIPServiceMisc() { + t := newTestDriver() + + When("a service exists in two namespaces", func() { + BeforeEach(func() { + t.resolver.PutServiceImport(newClusterServiceImport(namespace1, service1, serviceIP1, clusterID1)) + t.resolver.PutServiceImport(newClusterServiceImport(namespace2, service1, serviceIP2, clusterID1)) + }) + + It("should return the correct DNS record for each namespace", func() { + t.assertDNSRecordsFound(namespace1, service1, clusterID1, "", false, resolver.DNSRecord{ + IP: serviceIP1, + ClusterName: clusterID1, + }) + + t.assertDNSRecordsFound(namespace2, service1, clusterID1, "", false, resolver.DNSRecord{ + IP: serviceIP2, + ClusterName: clusterID1, + }) + }) + }) + + When("a per-cluster ServiceImport has the legacy annotations", func() { + BeforeEach(func() { + si := newClusterServiceImport(namespace1, service1, serviceIP1, clusterID1, port1) + si.Labels = map[string]string{"lighthouse.submariner.io/sourceCluster": clusterID1} + si.Annotations = map[string]string{"origin-name": service1, "origin-namespace": namespace1} + t.resolver.PutServiceImport(si) + }) + + It("should correctly process it and return its DNS record", func() { + t.assertDNSRecordsFound(namespace1, service1, clusterID1, "", false, resolver.DNSRecord{ + IP: serviceIP1, + Ports: []mcsv1a1.ServicePort{port1}, + ClusterName: clusterID1, + }) + }) + }) + + When("a cluster's EndpointSlice is created before its ServiceImport", func() { + It("should correctly process them and return its DNS record", func() { + 
t.resolver.PutServiceImport(newClusterServiceImport(namespace1, service1, serviceIP2, clusterID2)) + + es := newClusterIPEndpointSlice(namespace1, service1, clusterID1, serviceIP1, true) + Expect(t.resolver.PutEndpointSlice(es)).To(BeTrue()) + + t.awaitDNSRecords(namespace1, service1, clusterID1, "", false) + + t.resolver.PutServiceImport(newClusterServiceImport(namespace1, service1, serviceIP1, clusterID1)) + t.putEndpointSlice(es) + + t.assertDNSRecordsFound(namespace1, service1, "", "", false, resolver.DNSRecord{ + IP: serviceIP1, + ClusterName: clusterID1, + }) + }) + }) +} diff --git a/coredns/resolver/controller.go b/coredns/resolver/controller.go new file mode 100644 index 000000000..06099b263 --- /dev/null +++ b/coredns/resolver/controller.go @@ -0,0 +1,111 @@ +/* +SPDX-License-Identifier: Apache-2.0 + +Copyright Contributors to the Submariner project. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + +http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package resolver + +import ( + "github.com/pkg/errors" + "github.com/submariner-io/admiral/pkg/log" + "github.com/submariner-io/admiral/pkg/watcher" + "github.com/submariner-io/lighthouse/coredns/constants" + discovery "k8s.io/api/discovery/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/runtime" + logf "sigs.k8s.io/controller-runtime/pkg/log" + mcsv1a1 "sigs.k8s.io/mcs-api/pkg/apis/v1alpha1" +) + +var logger = log.Logger{Logger: logf.Log.WithName("Resolver")} + +type controller struct { + resolver *Interface + stopCh chan struct{} +} + +func NewController(r *Interface) *controller { + return &controller{ + resolver: r, + stopCh: make(chan struct{}), + } +} + +func (c *controller) Start(config watcher.Config) error { + logger.Infof("Starting Resolver Controller") + + config.ResourceConfigs = []watcher.ResourceConfig{ + { + Name: "EndpointSlice watcher", + ResourceType: &discovery.EndpointSlice{}, + SourceNamespace: metav1.NamespaceAll, + SourceLabelSelector: labels.Set(map[string]string{discovery.LabelManagedBy: constants.LabelValueManagedBy}).String(), + Handler: watcher.EventHandlerFuncs{ + OnCreateFunc: c.onEndpointSliceCreateOrUpdate, + OnUpdateFunc: c.onEndpointSliceCreateOrUpdate, + OnDeleteFunc: c.onEndpointSliceDelete, + }, + }, + { + Name: "ServiceImport watcher", + ResourceType: &mcsv1a1.ServiceImport{}, + SourceNamespace: metav1.NamespaceAll, + Handler: watcher.EventHandlerFuncs{ + OnCreateFunc: c.onServiceImportCreateOrUpdate, + OnUpdateFunc: c.onServiceImportCreateOrUpdate, + OnDeleteFunc: c.onServiceImportDelete, + }, + }, + } + + resourceWatcher, err := watcher.New(&config) + if err != nil { + return errors.Wrap(err, "error creating the resource watcher") + } + + err = resourceWatcher.Start(c.stopCh) + if err != nil { + return errors.Wrap(err, "error starting the resource watcher") + } + + return nil +} + +func (c *controller) Stop() { + close(c.stopCh) + + 
logger.Infof("Resolver Controller stopped") +} + +func (c *controller) onEndpointSliceCreateOrUpdate(obj runtime.Object, _ int) bool { + return c.resolver.PutEndpointSlice(obj.(*discovery.EndpointSlice)) +} + +func (c *controller) onEndpointSliceDelete(obj runtime.Object, _ int) bool { + c.resolver.RemoveEndpointSlice(obj.(*discovery.EndpointSlice)) + return false +} + +func (c *controller) onServiceImportCreateOrUpdate(obj runtime.Object, _ int) bool { + c.resolver.PutServiceImport(obj.(*mcsv1a1.ServiceImport)) + return false +} + +func (c *controller) onServiceImportDelete(obj runtime.Object, _ int) bool { + c.resolver.RemoveServiceImport(obj.(*mcsv1a1.ServiceImport)) + return false +} diff --git a/coredns/resolver/controller_test.go b/coredns/resolver/controller_test.go new file mode 100644 index 000000000..dd9cad37e --- /dev/null +++ b/coredns/resolver/controller_test.go @@ -0,0 +1,114 @@ +/* +SPDX-License-Identifier: Apache-2.0 + +Copyright Contributors to the Submariner project. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + +http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package resolver_test + +import ( + "context" + + . "github.com/onsi/ginkgo/v2" + . 
"github.com/onsi/gomega" + "github.com/submariner-io/lighthouse/coredns/resolver" + discovery "k8s.io/api/discovery/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + mcsv1a1 "sigs.k8s.io/mcs-api/pkg/apis/v1alpha1" +) + +var _ = Describe("Controller", func() { + t := newTestDriver() + + expDNSRecord := resolver.DNSRecord{ + IP: serviceIP1, + Ports: []mcsv1a1.ServicePort{port1}, + ClusterName: clusterID1, + } + + When("a ServiceImport is created", func() { + var serviceImport *mcsv1a1.ServiceImport + + BeforeEach(func() { + serviceImport = newClusterServiceImport(namespace1, service1, serviceIP1, clusterID1, port1) + t.createServiceImport(serviceImport) + }) + + Specify("GetDNSRecords should return the cluster's DNS record when requested", func() { + t.awaitDNSRecordsFound(namespace1, service1, clusterID1, "", false, expDNSRecord) + }) + + Context("and is subsequently deleted", func() { + Specify("GetDNSRecords should eventually return no DNS record found", func() { + t.awaitDNSRecords(namespace1, service1, clusterID1, "", true) + + err := t.serviceImports.Namespace(serviceImport.Namespace).Delete(context.TODO(), serviceImport.Name, metav1.DeleteOptions{}) + Expect(err).To(Succeed()) + + t.awaitDNSRecords(namespace1, service1, clusterID1, "", false) + }) + }) + }) + + When("an EndpointSlice is created", func() { + var endpointSlice *discovery.EndpointSlice + + JustBeforeEach(func() { + endpointSlice = newClusterIPEndpointSlice(namespace1, service1, clusterID1, serviceIP1, true) + t.createEndpointSlice(endpointSlice) + }) + + Context("before a ServiceImport", func() { + Specify("GetDNSRecords should eventually deem the cluster healthy and return its DNS record", func() { + Consistently(func() bool { + _, _, found := t.resolver.GetDNSRecords(namespace1, service1, "", "") + return found + }).Should(BeFalse()) + + t.createServiceImport(newClusterServiceImport(namespace1, service1, serviceIP1, clusterID1, port1)) + + t.awaitDNSRecordsFound(namespace1, service1, 
clusterID1, "", false, expDNSRecord) + }) + }) + + Context("after a ServiceImport", func() { + BeforeEach(func() { + t.createServiceImport(newClusterServiceImport(namespace1, service1, serviceIP1, clusterID1, port1)) + }) + + Context("and then deleted", func() { + Specify("GetDNSRecords should eventually deem the cluster unhealthy and return no DNS record", func() { + t.awaitDNSRecordsFound(namespace1, service1, clusterID1, "", false, expDNSRecord) + + err := t.endpointSlices.Namespace(namespace1).Delete(context.TODO(), endpointSlice.Name, metav1.DeleteOptions{}) + Expect(err).To(Succeed()) + + t.awaitDNSRecordsFound(namespace1, service1, "", "", false) + }) + }) + + Context("and then updated to unhealthy", func() { + Specify("GetDNSRecords should eventually return no DNS record", func() { + t.awaitDNSRecordsFound(namespace1, service1, clusterID1, "", false, expDNSRecord) + + err := t.endpointSlices.Namespace(namespace1).Delete(context.TODO(), endpointSlice.Name, metav1.DeleteOptions{}) + Expect(err).To(Succeed()) + + t.awaitDNSRecordsFound(namespace1, service1, "", "", false) + }) + }) + }) + }) +}) diff --git a/coredns/resolver/endpoint_slice.go b/coredns/resolver/endpoint_slice.go new file mode 100644 index 000000000..663c28b20 --- /dev/null +++ b/coredns/resolver/endpoint_slice.go @@ -0,0 +1,235 @@ +/* +SPDX-License-Identifier: Apache-2.0 + +Copyright Contributors to the Submariner project. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + +http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package resolver + +import ( + "context" + "fmt" + + "github.com/submariner-io/admiral/pkg/log" + "github.com/submariner-io/lighthouse/coredns/constants" + discovery "k8s.io/api/discovery/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" + mcsv1a1 "sigs.k8s.io/mcs-api/pkg/apis/v1alpha1" +) + +func (i *Interface) PutEndpointSlice(endpointSlice *discovery.EndpointSlice) bool { + logger.Infof("Put EndpointSlice: %#v", endpointSlice.ObjectMeta) + + key, clusterID, ok := getKeyInfoFrom(endpointSlice) + if !ok { + return false + } + + var localEndpointSliceErr error + var localEndpointSlice *discovery.EndpointSlice + + localClusterID := i.clusterStatus.GetLocalClusterID() + if localClusterID != "" && clusterID == localClusterID { + // The EndpointSlice is from the local cluster. If globalnet is enabled, the local global endpoint IPs aren't + // routable in the local cluster so we retrieve the K8s EndpointSlice and use those endpoints. Note that this + // only applies to headless services. + localEndpointSlice, localEndpointSliceErr = i.getLocalEndpointSlice(endpointSlice) + } + + i.mutex.Lock() + defer i.mutex.Unlock() + + serviceInfo, found := i.serviceMap[key] + if !found { + // This means we haven't observed a ServiceImport yet for the service. Return true for the controller to re-queue it. 
+ logger.Infof("Service not found for EndpointSlice %q - requeuing", key) + + return true + } + + if !serviceInfo.isHeadless { + return i.putClusterIPEndpointSlice(key, clusterID, endpointSlice, serviceInfo) + } + + if localEndpointSliceErr != nil { + logger.Error(localEndpointSliceErr, "unable to retrieve local EndpointSlice - requeuing") + + return true + } + + i.putHeadlessEndpointSlice(key, clusterID, endpointSlice, serviceInfo, localEndpointSlice) + + return false +} + +func (i *Interface) putClusterIPEndpointSlice(key, clusterID string, endpointSlice *discovery.EndpointSlice, serviceInfo *serviceInfo) bool { + clusterInfo, found := serviceInfo.clusters[clusterID] + if !found { + logger.Infof("Cluster %q not found for EndpointSlice %q - requeuing", clusterID, key) + return true + } + + // For a ClusterIPService we really only care if there are any backing endpoints. + clusterInfo.endpointsHealthy = len(endpointSlice.Endpoints) > 0 + + return false +} + +func (i *Interface) putHeadlessEndpointSlice(key, clusterID string, endpointSlice *discovery.EndpointSlice, serviceInfo *serviceInfo, + localEndpointSlice *discovery.EndpointSlice) { + clusterInfo := &clusterInfo{ + endpointRecordsByHost: make(map[string][]DNSRecord), + } + + serviceInfo.clusters[clusterID] = clusterInfo + + mcsPorts := make([]mcsv1a1.ServicePort, len(endpointSlice.Ports)) + for i, port := range endpointSlice.Ports { + mcsPorts[i] = mcsv1a1.ServicePort{ + Name: *port.Name, + Protocol: *port.Protocol, + AppProtocol: port.AppProtocol, + Port: *port.Port, + } + } + + for i := range endpointSlice.Endpoints { + endpoint := &endpointSlice.Endpoints[i] + + // Skip if not ready. Note: we're treating nil as ready to be on the safe side as the EndpointConditions doc + // states "In most cases consumers should interpret this unknown state (ie nil) as ready". 
+ if endpoint.Conditions.Ready != nil && !*endpoint.Conditions.Ready { + continue + } + + var records []DNSRecord + + addresses := endpoint.Addresses + if localEndpointSlice != nil { + for _, localEndpoint := range localEndpointSlice.Endpoints { + if localEndpoint.NodeName != nil && endpoint.NodeName != nil && *localEndpoint.NodeName == *endpoint.NodeName && + (endpoint.Hostname == nil || localEndpoint.TargetRef.Name == *endpoint.Hostname) { + addresses = localEndpoint.Addresses + } + } + } + + for _, address := range addresses { + + record := DNSRecord{ + IP: address, + Ports: mcsPorts, + ClusterName: clusterID, + } + + if endpoint.Hostname != nil { + record.HostName = *endpoint.Hostname + } + + records = append(records, record) + } + + if endpoint.Hostname != nil { + clusterInfo.endpointRecordsByHost[*endpoint.Hostname] = records + } + + clusterInfo.endpointRecords = append(clusterInfo.endpointRecords, records...) + } + + logger.V(log.DEBUG).Infof("Added records for headless EndpointSlice %q from cluster %q: %#v", + key, clusterID, clusterInfo.endpointRecords) +} + +func (i *Interface) getLocalEndpointSlice(from *discovery.EndpointSlice) (*discovery.EndpointSlice, error) { + epsGVR := schema.GroupVersionResource{ + Group: discovery.SchemeGroupVersion.Group, + Version: discovery.SchemeGroupVersion.Version, + Resource: "endpointslices", + } + + epSlices, err := i.client.Resource(epsGVR).Namespace(from.Labels[constants.LabelSourceNamespace]).List(context.TODO(), + metav1.ListOptions{ + LabelSelector: labels.Set(map[string]string{ + constants.KubernetesServiceName: from.Labels[mcsv1a1.LabelServiceName], + }).String(), + }) + if err != nil { + return nil, err + } + + if len(epSlices.Items) == 0 { + return nil, fmt.Errorf("local EndpointSlice not found for %s/%s", from.Labels[constants.LabelSourceNamespace], + from.Labels[mcsv1a1.LabelServiceName]) + } + + epSlice := &discovery.EndpointSlice{} + _ = 
runtime.DefaultUnstructuredConverter.FromUnstructured(epSlices.Items[0].Object, epSlice) + + return epSlice, nil +} + +func (i *Interface) RemoveEndpointSlice(endpointSlice *discovery.EndpointSlice) { + logger.Infof("Remove EndpointSlice: %#v", endpointSlice.ObjectMeta) + + key, clusterID, ok := getKeyInfoFrom(endpointSlice) + if !ok { + return + } + + i.mutex.Lock() + defer i.mutex.Unlock() + + serviceInfo, found := i.serviceMap[key] + if !found { + return + } + + clusterInfo, found := serviceInfo.clusters[clusterID] + if !found { + return + } + + if !serviceInfo.isHeadless { + clusterInfo.endpointsHealthy = false + return + } + + delete(serviceInfo.clusters, clusterID) +} + +func getKeyInfoFrom(es *discovery.EndpointSlice) (string, string, bool) { + name, ok := es.Labels[mcsv1a1.LabelServiceName] + if !ok { + logger.Warningf("EndpointSlice missing label %q: %#v", mcsv1a1.LabelServiceName, es.ObjectMeta) + return "", "", false + } + + namespace, ok := es.Labels[constants.LabelSourceNamespace] + if !ok { + logger.Warningf("EndpointSlice missing label %q: %#v", constants.LabelSourceNamespace, es.ObjectMeta) + return "", "", false + } + + clusterID, ok := es.Labels[constants.MCSLabelSourceCluster] + if !ok { + logger.Warningf("EndpointSlice missing label %q: %#v", constants.MCSLabelSourceCluster, es.ObjectMeta) + return "", "", false + } + + return keyFunc(namespace, name), clusterID, true +} diff --git a/coredns/resolver/endpoint_slice_test.go b/coredns/resolver/endpoint_slice_test.go new file mode 100644 index 000000000..0eb069ac8 --- /dev/null +++ b/coredns/resolver/endpoint_slice_test.go @@ -0,0 +1,100 @@ +/* +SPDX-License-Identifier: Apache-2.0 + +Copyright Contributors to the Submariner project. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. 
package resolver_test

import (
	. "github.com/onsi/ginkgo/v2"
	"github.com/submariner-io/lighthouse/coredns/constants"
	discovery "k8s.io/api/discovery/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	mcsv1a1 "sigs.k8s.io/mcs-api/pkg/apis/v1alpha1"
)

// Verifies that PutEndpointSlice ignores EndpointSlices lacking any one of the
// three required labels (service name, source namespace, source cluster).
var _ = Describe("PutEndpointSlice", func() {
	t := newTestDriver()

	When("the EndpointSlice is missing the required labels", func() {
		It("should not process it", func() {
			// Missing LabelServiceName
			t.putEndpointSlice(&discovery.EndpointSlice{
				ObjectMeta: metav1.ObjectMeta{
					Name: "test",
					Labels: map[string]string{
						constants.LabelSourceNamespace:  "test",
						constants.MCSLabelSourceCluster: "test",
					},
				},
			})

			// Missing LabelSourceNamespace
			t.putEndpointSlice(&discovery.EndpointSlice{
				ObjectMeta: metav1.ObjectMeta{
					Name: "test",
					Labels: map[string]string{
						constants.MCSLabelSourceCluster: "test",
						mcsv1a1.LabelServiceName:        "test",
					},
				},
			})

			// Missing MCSLabelSourceCluster
			t.putEndpointSlice(&discovery.EndpointSlice{
				ObjectMeta: metav1.ObjectMeta{
					Name: "test",
					Labels: map[string]string{
						constants.LabelSourceNamespace: "test",
						mcsv1a1.LabelServiceName:       "test",
					},
				},
			})
		})
	})
})

// Verifies that RemoveEndpointSlice is a no-op for malformed EndpointSlices and
// for service/cluster state that was never stored.
var _ = Describe("RemoveEndpointSlice", func() {
	t := newTestDriver()

	When("the EndpointSlice is missing a required label", func() {
		It("should not process it", func() {
			// Missing the service name label.
			t.resolver.RemoveEndpointSlice(&discovery.EndpointSlice{
				ObjectMeta: metav1.ObjectMeta{
					Name: "test",
					Labels: map[string]string{
						constants.LabelSourceNamespace:  "test",
						constants.MCSLabelSourceCluster: "test",
					},
				},
			})
		})
	})

	When("the service information doesn't exist", func() {
		It("should not process it", func() {
			t.resolver.RemoveEndpointSlice(newEndpointSlice(namespace1, service1, clusterID1, nil))
		})
	})

	When("the cluster information doesn't exist", func() {
		It("should not process it", func() {
			t.resolver.PutServiceImport(newClusterHeadlessServiceImport(namespace1, service1, clusterID1))

			t.resolver.RemoveEndpointSlice(newEndpointSlice(namespace1, service1, clusterID1, nil))
		})
	})
})
package fake

import (
	"sync/atomic"

	"github.com/submariner-io/admiral/pkg/stringset"
)

// ClusterStatus is a fake implementation of the resolver's ClusterStatus
// interface for tests. Both fields are safe for concurrent use and may be
// mutated by tests to simulate clusters connecting/disconnecting or a change
// of the local cluster.
type ClusterStatus struct {
	// ConnectedClusterIDs backs IsConnected.
	ConnectedClusterIDs stringset.Interface
	// LocalClusterID holds the string returned by GetLocalClusterID.
	LocalClusterID atomic.Value
}

// NewClusterStatus returns a fake reporting the given local cluster ID and
// treating each of the isConnected cluster IDs as connected.
func NewClusterStatus(localClusterID string, isConnected ...string) *ClusterStatus {
	c := &ClusterStatus{
		ConnectedClusterIDs: stringset.NewSynchronized(isConnected...),
	}

	c.LocalClusterID.Store(localClusterID)
	return c
}

// IsConnected reports whether clusterID is in the connected set.
func (c *ClusterStatus) IsConnected(clusterID string) bool {
	return c.ConnectedClusterIDs.Contains(clusterID)
}

// GetLocalClusterID returns the configured local cluster ID.
func (c *ClusterStatus) GetLocalClusterID() string {
	return c.LocalClusterID.Load().(string)
}
// Covers GetDNSRecords for a headless service exported from multiple clusters:
// record aggregation, per-cluster and per-hostname lookup, local-cluster
// resolution via the K8s EndpointSlice, disconnection, removal and updates.
var _ = Describe("GetDNSRecords", func() {
	Describe("Headless Service", func() {
		When("a service is present in multiple clusters", testHeadlessServiceInMultipleClusters)
	})
})

func testHeadlessServiceInMultipleClusters() {
	t := newTestDriver()

	cluster1DNSRecord := resolver.DNSRecord{
		IP:          endpointIP1,
		Ports:       []mcsv1a1.ServicePort{port1},
		ClusterName: clusterID1,
	}

	cluster2DNSRecord := resolver.DNSRecord{
		IP:          endpointIP2,
		Ports:       []mcsv1a1.ServicePort{port2},
		ClusterName: clusterID2,
	}

	cluster3DNSRecord1 := resolver.DNSRecord{
		IP:          endpointIP3,
		Ports:       []mcsv1a1.ServicePort{port3, port4},
		ClusterName: clusterID3,
		HostName:    hostName1,
	}

	cluster3DNSRecord2 := resolver.DNSRecord{
		IP:          endpointIP4,
		Ports:       []mcsv1a1.ServicePort{port3, port4},
		ClusterName: clusterID3,
		HostName:    hostName1,
	}

	cluster3DNSRecord3 := resolver.DNSRecord{
		IP:          endpointIP5,
		Ports:       []mcsv1a1.ServicePort{port3, port4},
		ClusterName: clusterID3,
	}

	cluster3DNSRecord4 := resolver.DNSRecord{
		IP:          endpointIP6,
		Ports:       []mcsv1a1.ServicePort{port3, port4},
		ClusterName: clusterID3,
		HostName:    hostName2,
	}

	JustBeforeEach(func() {
		t.resolver.PutServiceImport(newClusterHeadlessServiceImport(namespace1, service1, clusterID1))
		t.resolver.PutServiceImport(newClusterHeadlessServiceImport(namespace1, service1, clusterID2))
		t.resolver.PutServiceImport(newClusterHeadlessServiceImport(namespace1, service1, clusterID3))

		t.putEndpointSlice(newEndpointSlice(namespace1, service1, clusterID1, []mcsv1a1.ServicePort{port1}, discovery.Endpoint{
			Addresses: []string{endpointIP1},
		}))

		t.putEndpointSlice(newEndpointSlice(namespace1, service1, clusterID2, []mcsv1a1.ServicePort{port2}, discovery.Endpoint{
			Addresses: []string{endpointIP2},
		}))

		// cluster3 has multiple endpoints with host names; the not-ready
		// endpoint (1.2.3.4) must not produce a DNS record.
		t.putEndpointSlice(newEndpointSlice(namespace1, service1, clusterID3, []mcsv1a1.ServicePort{port3, port4},
			discovery.Endpoint{
				Addresses:  []string{endpointIP3, endpointIP4},
				Hostname:   &hostName1,
				NodeName:   &nodeName1,
				Conditions: discovery.EndpointConditions{Ready: &ready},
			},
			discovery.Endpoint{
				Addresses: []string{endpointIP5},
				NodeName:  &nodeName2,
			},
			discovery.Endpoint{
				Addresses: []string{endpointIP6},
				Hostname:  &hostName2,
				NodeName:  &nodeName3,
			},
			discovery.Endpoint{
				Addresses:  []string{"1.2.3.4"},
				Conditions: discovery.EndpointConditions{Ready: &notReady},
			}))
	})

	Context("and no specific cluster is requested", func() {
		It("should return all the DNS records", func() {
			t.assertDNSRecordsFound(namespace1, service1, "", "", true, cluster1DNSRecord,
				cluster2DNSRecord, cluster3DNSRecord1, cluster3DNSRecord2, cluster3DNSRecord3, cluster3DNSRecord4)
		})
	})

	Context("and a specific cluster is requested", func() {
		It("should return all its DNS records", func() {
			t.assertDNSRecordsFound(namespace1, service1, clusterID3, "", true,
				cluster3DNSRecord1, cluster3DNSRecord2, cluster3DNSRecord3, cluster3DNSRecord4)
		})
	})

	Context("and a specific cluster and host name is requested", func() {
		It("should return its host name DNS records", func() {
			t.assertDNSRecordsFound(namespace1, service1, clusterID3, hostName1, true,
				cluster3DNSRecord1, cluster3DNSRecord2)

			t.assertDNSRecordsFound(namespace1, service1, clusterID3, hostName2, true, cluster3DNSRecord4)
		})
	})

	Context("and one is on the local cluster", func() {
		BeforeEach(func() {
			t.clusterStatus.LocalClusterID.Store(clusterID3)

			// If the local cluster EndpointSlice is created before the local K8s EndpointSlice, PutEndpointSlice should
			// return true to requeue.
			t.resolver.PutServiceImport(newClusterHeadlessServiceImport(namespace1, service1, clusterID3))
			Expect(t.resolver.PutEndpointSlice(newEndpointSlice(namespace1, service1, clusterID3, nil))).To(BeTrue())

			t.createEndpointSlice(&discovery.EndpointSlice{
				ObjectMeta: metav1.ObjectMeta{
					Name:      "local-" + service1,
					Namespace: namespace1,
					Labels: map[string]string{
						constants.KubernetesServiceName: service1,
					},
				},
				Endpoints: []discovery.Endpoint{
					{
						Addresses: []string{endpointIP3, endpointIP4},
						NodeName:  &nodeName1,
						TargetRef: &corev1.ObjectReference{
							Name: hostName1,
						},
					},
					{
						Addresses: []string{endpointIP5},
						NodeName:  &nodeName2,
					},
					{
						Addresses: []string{endpointIP6},
						NodeName:  &nodeName3,
						TargetRef: &corev1.ObjectReference{
							Name: hostName2,
						},
					},
				},
			})
		})

		It("should return all its DNS record", func() {
			t.assertDNSRecordsFound(namespace1, service1, clusterID3, "", true,
				cluster3DNSRecord1, cluster3DNSRecord2, cluster3DNSRecord3, cluster3DNSRecord4)
		})
	})

	Context("and one becomes disconnected", func() {
		JustBeforeEach(func() {
			t.clusterStatus.ConnectedClusterIDs.Remove(clusterID3)
		})

		Context("and no specific cluster is requested", func() {
			It("should return the connected clusters' DNS records", func() {
				t.assertDNSRecordsFound(namespace1, service1, "", "", true,
					cluster1DNSRecord, cluster2DNSRecord)
			})
		})

		Context("and the disconnected cluster is requested", func() {
			It("should still return its DNS records", func() {
				t.assertDNSRecordsFound(namespace1, service1, clusterID3, "", true,
					cluster3DNSRecord1, cluster3DNSRecord2, cluster3DNSRecord3, cluster3DNSRecord4)

				t.assertDNSRecordsFound(namespace1, service1, clusterID3, hostName1, true,
					cluster3DNSRecord1, cluster3DNSRecord2)
			})
		})
	})

	Context("and one is subsequently removed", func() {
		JustBeforeEach(func() {
			t.resolver.RemoveEndpointSlice(newEndpointSlice(namespace1, service1, clusterID3, nil))
		})

		Context("and no specific cluster is requested", func() {
			It("should return the remaining clusters' DNS records", func() {
				t.assertDNSRecordsFound(namespace1, service1, "", "", true,
					cluster1DNSRecord, cluster2DNSRecord)
			})
		})

		Context("and the removed cluster is requested", func() {
			It("should return no DNS records found", func() {
				t.assertDNSRecordsNotFound(namespace1, service1, clusterID3, "")
			})
		})
	})

	Context("and the endpoints for one cluster are updated", func() {
		expDNSRecord1 := resolver.DNSRecord{
			IP:          endpointIP4,
			Ports:       []mcsv1a1.ServicePort{port3},
			ClusterName: clusterID3,
			HostName:    hostName1,
		}

		expDNSRecord2 := resolver.DNSRecord{
			IP:          endpointIP5,
			Ports:       []mcsv1a1.ServicePort{port3},
			ClusterName: clusterID3,
			HostName:    hostName2,
		}

		expDNSRecord3 := resolver.DNSRecord{
			IP:          endpointIP6,
			Ports:       []mcsv1a1.ServicePort{port3},
			ClusterName: clusterID3,
			HostName:    hostName2,
		}

		JustBeforeEach(func() {
			t.putEndpointSlice(newEndpointSlice(namespace1, service1, clusterID3, []mcsv1a1.ServicePort{port3},
				discovery.Endpoint{
					Addresses: []string{endpointIP4},
					Hostname:  &hostName1,
				},
				discovery.Endpoint{
					Addresses: []string{endpointIP5, endpointIP6},
					Hostname:  &hostName2,
				}))
		})

		It("should return the updated DNS records", func() {
			t.assertDNSRecordsFound(namespace1, service1, clusterID3, "", true,
				expDNSRecord1, expDNSRecord2, expDNSRecord3)

			t.assertDNSRecordsFound(namespace1, service1, clusterID3, hostName1, true, expDNSRecord1)

			t.assertDNSRecordsFound(namespace1, service1, clusterID3, hostName2, true, expDNSRecord2, expDNSRecord3)
		})
	})

	Context("and a non-existent cluster is specified", func() {
		It("should return no DNS records found", func() {
			t.assertDNSRecordsNotFound(namespace1, service1, "non-existent", "")
		})
	})
}
package resolver

import (
	"k8s.io/client-go/dynamic"
)

// New creates a resolver Interface that builds DNS records from ServiceImports
// and EndpointSlices fed in via the Put/Remove methods.
func New(clusterStatus ClusterStatus, client dynamic.Interface) *Interface {
	return &Interface{
		clusterStatus: clusterStatus,
		serviceMap:    make(map[string]*serviceInfo),
		client:        client,
	}
}

// GetDNSRecords returns the DNS records for the given service. clusterID and
// hostname may be empty to request records across all clusters/hosts. Returns
// whether the service is headless and whether any information was found.
func (i *Interface) GetDNSRecords(namespace, name, clusterID, hostname string) (records []DNSRecord, isHeadless bool, found bool) {
	i.mutex.RLock()
	defer i.mutex.RUnlock()

	serviceInfo, found := i.serviceMap[keyFunc(namespace, name)]
	if !found {
		return nil, false, false
	}

	if !serviceInfo.isHeadless {
		record, found := i.getClusterIPRecord(serviceInfo, clusterID)
		if record != nil {
			return []DNSRecord{*record}, false, true
		}

		// record may be nil with found == true, meaning the service exists but
		// no record is currently available.
		return nil, false, found
	}

	records, found = i.getHeadlessRecords(serviceInfo, clusterID, hostname)
	return records, true, found
}

// getClusterIPRecord resolves a single record for a non-headless service:
// the requested cluster's record if clusterID is given, otherwise the healthy
// local-cluster record if any, otherwise a load-balanced pick across connected,
// healthy clusters. Returns (nil, true) when the service exists but no record
// qualifies.
func (i *Interface) getClusterIPRecord(serviceInfo *serviceInfo, clusterID string) (*DNSRecord, bool) {
	// If a clusterID is specified, we supply it even if the service is not healthy.
	if clusterID != "" {
		clusterInfo, found := serviceInfo.clusters[clusterID]
		if !found {
			return nil, false
		}

		return &clusterInfo.endpointRecords[0], true
	}

	// If we are aware of the local cluster and we found some accessible IP, we shall return it.
	localClusterID := i.clusterStatus.GetLocalClusterID()
	if localClusterID != "" {
		clusterInfo, found := serviceInfo.clusters[localClusterID]
		if found && clusterInfo.endpointsHealthy {
			return serviceInfo.newRecordFrom(&clusterInfo.endpointRecords[0]), true
		}
	}

	// Fall back to selected load balancer (weighted/RR/etc) if service is not present in the local cluster
	record := serviceInfo.selectIP(i.clusterStatus.IsConnected)

	if record != nil {
		return serviceInfo.newRecordFrom(record), true
	}

	return nil, true
}

// getHeadlessRecords resolves records for a headless service. With no
// clusterID, all records from connected clusters are returned; with a
// clusterID, its records are returned regardless of connectivity, optionally
// filtered by endpoint hostname.
func (i *Interface) getHeadlessRecords(serviceInfo *serviceInfo, clusterID, hostname string) ([]DNSRecord, bool) {
	clusterInfo, clusterFound := serviceInfo.clusters[clusterID]

	switch {
	case clusterID == "":
		records := make([]DNSRecord, 0)

		for id, info := range serviceInfo.clusters {
			if i.clusterStatus.IsConnected(id) {
				records = append(records, info.endpointRecords...)
			}
		}

		return records, true
	case !clusterFound:
		return nil, false
	case hostname == "":
		return clusterInfo.endpointRecords, true
	default:
		records, found := clusterInfo.endpointRecordsByHost[hostname]
		return records, found
	}
}

// keyFunc builds the serviceMap key from a service's namespace and name.
func keyFunc(namespace, name string) string {
	return namespace + "/" + name
}
package resolver_test

import (
	"flag"
	"fmt"
	"reflect"
	"testing"
	"time"

	. "github.com/onsi/ginkgo/v2"
	. "github.com/onsi/gomega"
	"github.com/onsi/gomega/format"
	"github.com/submariner-io/admiral/pkg/log/kzerolog"
	"github.com/submariner-io/admiral/pkg/syncer/test"
	"github.com/submariner-io/admiral/pkg/watcher"
	"github.com/submariner-io/lighthouse/coredns/constants"
	"github.com/submariner-io/lighthouse/coredns/resolver"
	"github.com/submariner-io/lighthouse/coredns/resolver/fake"
	corev1 "k8s.io/api/core/v1"
	discovery "k8s.io/api/discovery/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/util/wait"
	"k8s.io/client-go/dynamic"
	fakeClient "k8s.io/client-go/dynamic/fake"
	"k8s.io/client-go/kubernetes/scheme"
	mcsv1a1 "sigs.k8s.io/mcs-api/pkg/apis/v1alpha1"
)

// Shared fixture identifiers and addresses used across the resolver tests.
const (
	clusterID1          = "cluster1"
	clusterID2          = "cluster2"
	clusterID3          = "cluster3"
	service1            = "service1"
	namespace1          = "namespace1"
	namespace2          = "namespace2"
	submarinerNamespace = "submariner-operator"
	serviceIP1          = "192.168.56.21"
	serviceIP2          = "192.168.56.22"
	serviceIP3          = "192.168.56.23"
	endpointIP1         = "100.96.157.101"
	endpointIP2         = "100.96.157.102"
	endpointIP3         = "100.96.157.103"
	endpointIP4         = "100.96.157.104"
	endpointIP5         = "100.96.157.105"
	endpointIP6         = "100.96.157.106"
)

// These are vars (not consts) because the tests take their addresses when
// building EndpointSlice fixtures.
var (
	hostName1 = "host1"
	hostName2 = "host2"

	nodeName1 = "node1"
	nodeName2 = "node2"
	nodeName3 = "node3"

	ready    = true
	notReady = false

	port1 = mcsv1a1.ServicePort{
		Name:     "http",
		Protocol: corev1.ProtocolTCP,
		Port:     8080,
	}

	port2 = mcsv1a1.ServicePort{
		Name:     "POP3",
		Protocol: corev1.ProtocolUDP,
		Port:     110,
	}

	port3 = mcsv1a1.ServicePort{
		Name:     "https",
		Protocol: corev1.ProtocolTCP,
		Port:     443,
	}

	port4 = mcsv1a1.ServicePort{
		Name:     "SMTP",
		Protocol: corev1.ProtocolUDP,
		Port:     25,
	}
)

// init configures kzerolog verbosity for test output.
func init() {
	flags := flag.NewFlagSet("kzerolog", flag.ExitOnError)
	kzerolog.AddFlags(flags)
	_ = flags.Parse([]string{"-v=2"})
}

var _ = BeforeSuite(func() {
	kzerolog.InitK8sLogging()
})

func TestResolver(t *testing.T) {
	RegisterFailHandler(Fail)
	RunSpecs(t, "Resolver Suite")
}

// testDriver wires a resolver, its controller, a fake cluster status and fake
// dynamic clients for EndpointSlices/ServiceImports.
type testDriver struct {
	clusterStatus  *fake.ClusterStatus
	resolver       *resolver.Interface
	endpointSlices dynamic.NamespaceableResourceInterface
	serviceImports dynamic.NamespaceableResourceInterface
}

// newTestDriver registers a BeforeEach that builds a fresh driver per spec and
// starts the resolver controller, stopping it via DeferCleanup.
func newTestDriver() *testDriver {
	t := &testDriver{}

	BeforeEach(func() {
		t.clusterStatus = fake.NewClusterStatus("", clusterID1, clusterID2, clusterID3)

		Expect(discovery.AddToScheme(scheme.Scheme)).To(Succeed())
		Expect(mcsv1a1.AddToScheme(scheme.Scheme)).To(Succeed())

		client := fakeClient.NewSimpleDynamicClient(scheme.Scheme)

		restMapper := test.GetRESTMapperFor(&discovery.EndpointSlice{}, &mcsv1a1.ServiceImport{})

		t.endpointSlices = client.Resource(*test.GetGroupVersionResourceFor(restMapper, &discovery.EndpointSlice{}))
		t.serviceImports = client.Resource(*test.GetGroupVersionResourceFor(restMapper, &mcsv1a1.ServiceImport{}))

		t.resolver = resolver.New(t.clusterStatus, client)
		controller := resolver.NewController(t.resolver)

		Expect(controller.Start(watcher.Config{
			RestMapper: restMapper,
			Client:     client,
		})).To(Succeed())

		DeferCleanup(controller.Stop)
	})

	return t
}

// createServiceImport creates the ServiceImport in the fake datastore.
func (t *testDriver) createServiceImport(si *mcsv1a1.ServiceImport) {
	test.CreateResource(t.serviceImports.Namespace(si.Namespace), si)
}

// createEndpointSlice creates the EndpointSlice in the fake datastore.
func (t *testDriver) createEndpointSlice(es *discovery.EndpointSlice) {
	test.CreateResource(t.endpointSlices.Namespace(es.Namespace), es)
}

// awaitDNSRecordsFound polls until GetDNSRecords returns exactly the expected
// records, then asserts the final state for a useful failure message.
func (t *testDriver) awaitDNSRecordsFound(ns, name, cluster, hostname string, expIsHeadless bool, expRecords ...resolver.DNSRecord) {
	var records []resolver.DNSRecord
	var found, isHeadless bool

	err := wait.PollImmediate(50*time.Millisecond, 5*time.Second, func() (bool, error) {
		records, isHeadless, found = t.resolver.GetDNSRecords(ns, name, cluster, hostname)
		return found && isHeadless == expIsHeadless && reflect.DeepEqual(records, expRecords), nil
	})
	if err == nil {
		return
	}

	Expect(found).To(BeTrue())
	Expect(isHeadless).To(Equal(expIsHeadless))

	t.assertDNSRecords(records, expRecords...)
}

// assertDNSRecordsFound asserts the records synchronously (order-insensitive).
func (t *testDriver) assertDNSRecordsFound(ns, name, cluster, hostname string, expIsHeadless bool, expRecords ...resolver.DNSRecord) {
	records, isHeadless, found := t.resolver.GetDNSRecords(ns, name, cluster, hostname)

	Expect(found).To(BeTrue())
	Expect(isHeadless).To(Equal(expIsHeadless))

	t.assertDNSRecords(records, expRecords...)
}

// assertDNSRecords verifies records matches expRecords as an unordered set.
func (t *testDriver) assertDNSRecords(records []resolver.DNSRecord, expRecords ...resolver.DNSRecord) {
	recordFound := func(r resolver.DNSRecord) bool {
		for i := range expRecords {
			if reflect.DeepEqual(r, expRecords[i]) {
				return true
			}
		}

		return false
	}

	for i := range records {
		if !recordFound(records[i]) {
			Fail(fmt.Sprintf("Unexpected DNS record returned: %s\nExpected:\n%s",
				format.Object(records[i], 1), format.Object(expRecords, 1)))
		}
	}

	if len(records) != len(expRecords) {
		Fail(fmt.Sprintf("Expected %d DNS record returned, received %d.\nActual: %s\nExpected:\n%s",
			len(expRecords), len(records), format.Object(records, 1), format.Object(expRecords, 1)))
	}
}

// getNonHeadlessDNSRecord fetches the single record expected for a
// non-headless service lookup.
func (t *testDriver) getNonHeadlessDNSRecord(ns, name, cluster string) *resolver.DNSRecord {
	records, isHeadless, found := t.resolver.GetDNSRecords(ns, name, cluster, "")

	Expect(found).To(BeTrue())
	Expect(isHeadless).To(BeFalse())
	Expect(records).To(HaveLen(1))

	return &records[0]
}

// assertDNSRecordsNotFound asserts the lookup reports not found.
func (t *testDriver) assertDNSRecordsNotFound(ns, name, cluster, hostname string) {
	_, _, found := t.resolver.GetDNSRecords(ns, name, cluster, hostname)
	Expect(found).To(BeFalse())
}

// awaitDNSRecords waits until the lookup's found flag matches expFound.
func (t *testDriver) awaitDNSRecords(ns, name, cluster, hostname string, expFound bool) {
	Eventually(func() bool {
		_, _, found := t.resolver.GetDNSRecords(ns, name, cluster, hostname)
		return found
	}).Should(Equal(expFound))
}

// testRoundRobin verifies that repeated non-headless lookups cycle through the
// given service IPs in a stable round-robin order.
func (t *testDriver) testRoundRobin(ns, service string, serviceIPs ...string) {
	ipsCount := len(serviceIPs)
	rrIPs := make([]string, 0)

	// First pass: each lookup must yield a new IP from the expected set.
	for i := 0; i < ipsCount; i++ {
		r := t.getNonHeadlessDNSRecord(ns, service, "")
		rrIPs = append(rrIPs, r.IP)
		slice := rrIPs[0:i]
		Expect(slice).ToNot(ContainElement(r.IP))
		Expect(serviceIPs).To(ContainElement(r.IP))
	}

	// Subsequent passes: the same rotation order repeats.
	for i := 0; i < 5; i++ {
		for _, ip := range rrIPs {
			testIP := t.getNonHeadlessDNSRecord(ns, service, "").IP
			Expect(testIP).To(Equal(ip))
		}
	}
}

// putEndpointSlice feeds the EndpointSlice to the resolver, asserting no
// requeue is requested.
func (t *testDriver) putEndpointSlice(es *discovery.EndpointSlice) {
	Expect(t.resolver.PutEndpointSlice(es)).To(BeFalse())
}

// newClusterServiceImport builds a per-cluster ClusterSetIP ServiceImport
// fixture carrying the Lighthouse source labels.
func newClusterServiceImport(namespace, name, serviceIP, clusterID string, ports ...mcsv1a1.ServicePort) *mcsv1a1.ServiceImport {
	var ips []string
	if serviceIP != "" {
		ips = []string{serviceIP}
	}

	return &mcsv1a1.ServiceImport{
		ObjectMeta: metav1.ObjectMeta{
			Name:      name + "-" + namespace + "-" + clusterID,
			Namespace: submarinerNamespace,
			Labels: map[string]string{
				mcsv1a1.LabelServiceName:        name,
				constants.LabelSourceNamespace:  namespace,
				constants.MCSLabelSourceCluster: clusterID,
			},
		},
		Spec: mcsv1a1.ServiceImportSpec{
			Type:  mcsv1a1.ClusterSetIP,
			IPs:   ips,
			Ports: ports,
		},
		Status: mcsv1a1.ServiceImportStatus{
			Clusters: []mcsv1a1.ClusterStatus{
				{
					Cluster: clusterID,
				},
			},
		},
	}
}

// newClusterHeadlessServiceImport builds a Headless ServiceImport fixture.
func newClusterHeadlessServiceImport(namespace, name, clusterID string) *mcsv1a1.ServiceImport {
	si := newClusterServiceImport(namespace, name, "", clusterID)
	si.Spec.Type = mcsv1a1.Headless

	return si
}

// newClusterIPEndpointSlice builds an EndpointSlice for a non-headless service;
// an unhealthy one has no endpoints.
func newClusterIPEndpointSlice(namespace, name, clusterID, clusterIP string, isHealthy bool,
	ports ...mcsv1a1.ServicePort) *discovery.EndpointSlice {
	if isHealthy {
		return newEndpointSlice(namespace, name, clusterID, ports, discovery.Endpoint{
			Addresses: []string{clusterIP},
		})
	}

	return newEndpointSlice(namespace, name, clusterID, ports)
}

// newEndpointSlice builds an EndpointSlice fixture with the labels the
// resolver requires and the given ports/endpoints.
func newEndpointSlice(namespace, name, clusterID string, ports []mcsv1a1.ServicePort,
	endpoints ...discovery.Endpoint) *discovery.EndpointSlice {
	epPorts := make([]discovery.EndpointPort, len(ports))
	for i := range ports {
		epPorts[i] = discovery.EndpointPort{
			Name:        &ports[i].Name,
			Protocol:    &ports[i].Protocol,
			Port:        &ports[i].Port,
			AppProtocol: ports[i].AppProtocol,
		}
	}

	return &discovery.EndpointSlice{
		ObjectMeta: metav1.ObjectMeta{
			Name:      name + "-" + namespace + "-" + clusterID,
			Namespace: namespace,
			Labels: map[string]string{
				discovery.LabelManagedBy:        constants.LabelValueManagedBy,
				constants.LabelSourceNamespace:  namespace,
				constants.MCSLabelSourceCluster: clusterID,
				mcsv1a1.LabelServiceName:        name,
			},
		},
		AddressType: discovery.AddressTypeIPv4,
		Ports:       epPorts,
		Endpoints:   endpoints,
	}
}
+*/ + +package resolver + +import ( + "strconv" + + "github.com/submariner-io/lighthouse/coredns/constants" + "github.com/submariner-io/lighthouse/coredns/loadbalancer" + mcsv1a1 "sigs.k8s.io/mcs-api/pkg/apis/v1alpha1" +) + +func (i *Interface) PutServiceImport(serviceImport *mcsv1a1.ServiceImport) { + logger.Infof("Put %#v", serviceImport) + + name, ok := getSourceName(serviceImport) + if !ok { + return + } + + key := keyFunc(getSourceNamespace(serviceImport), name) + + i.mutex.Lock() + defer i.mutex.Unlock() + + svcInfo, found := i.serviceMap[key] + + if !found { + svcInfo = &serviceInfo{ + clusters: make(map[string]*clusterInfo), + balancer: loadbalancer.NewSmoothWeightedRR(), + isHeadless: serviceImport.Spec.Type == mcsv1a1.Headless, + } + + i.serviceMap[key] = svcInfo + } + + if !svcInfo.isHeadless { + clusterName := getSourceCluster(serviceImport) + + clusterInfo := svcInfo.ensureClusterInfo(clusterName) + clusterInfo.weight = getServiceWeightFrom(serviceImport, i.clusterStatus.GetLocalClusterID()) + clusterInfo.endpointRecords = []DNSRecord{{ + IP: serviceImport.Spec.IPs[0], + Ports: serviceImport.Spec.Ports, + ClusterName: clusterName, + }} + + svcInfo.resetLoadBalancing() + svcInfo.mergePorts() + + return + } +} + +func (i *Interface) RemoveServiceImport(serviceImport *mcsv1a1.ServiceImport) { + logger.Infof("Remove %#v", serviceImport) + + name, found := getSourceName(serviceImport) + if !found { + return + } + + key := keyFunc(getSourceNamespace(serviceImport), name) + + i.mutex.Lock() + defer i.mutex.Unlock() + + serviceInfo, found := i.serviceMap[key] + if !found { + return + } + + for _, info := range serviceImport.Status.Clusters { + delete(serviceInfo.clusters, info.Cluster) + } + + if len(serviceInfo.clusters) == 0 { + delete(i.serviceMap, key) + return + } + + if !serviceInfo.isHeadless { + serviceInfo.resetLoadBalancing() + } + + serviceInfo.mergePorts() +} + +func getSourceName(from *mcsv1a1.ServiceImport) (string, bool) { + name, ok := 
from.Labels[mcsv1a1.LabelServiceName] + if ok { + return name, true + } + + name, ok = from.Annotations["origin-name"] + return name, ok +} + +func getSourceNamespace(from *mcsv1a1.ServiceImport) string { + ns, ok := from.Labels[constants.LabelSourceNamespace] + if ok { + return ns + } + + return from.Annotations["origin-namespace"] +} + +func getSourceCluster(from *mcsv1a1.ServiceImport) string { + c, ok := from.Labels[constants.MCSLabelSourceCluster] + if ok { + return c + } + + return from.Labels["lighthouse.submariner.io/sourceCluster"] +} + +func getServiceWeightFrom(si *mcsv1a1.ServiceImport, forClusterName string) int64 { + weightKey := constants.LoadBalancerWeightAnnotationPrefix + "/" + forClusterName + if val, ok := si.Annotations[weightKey]; ok { + f, err := strconv.ParseInt(val, 0, 64) + if err != nil { + return f + } + + logger.Errorf(err, "Error parsing the %q annotation from ServiceImport %q", weightKey, si.Name) + } + + return 1 // Zero will cause no selection +} diff --git a/coredns/resolver/service_info.go b/coredns/resolver/service_info.go new file mode 100644 index 000000000..42f472a41 --- /dev/null +++ b/coredns/resolver/service_info.go @@ -0,0 +1,89 @@ +/* +SPDX-License-Identifier: Apache-2.0 + +Copyright Contributors to the Submariner project. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + +http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
package resolver

import (
	"fmt"

	"github.com/submariner-io/admiral/pkg/slices"
	mcsv1a1 "sigs.k8s.io/mcs-api/pkg/apis/v1alpha1"
)

// resetLoadBalancing rebuilds the load balancer from scratch with the current
// set of clusters and their weights.
func (si *serviceInfo) resetLoadBalancing() {
	si.balancer.RemoveAll()

	for name, info := range si.clusters {
		err := si.balancer.Add(name, info.weight)
		if err != nil {
			logger.Error(err, "Error adding load balancer info")
		}
	}
}

// mergePorts recomputes the service-level port list as the intersection of
// the first record's ports across all clusters (keyed by name+protocol+port).
func (si *serviceInfo) mergePorts() {
	si.ports = nil

	for _, info := range si.clusters {
		if si.ports == nil {
			// First cluster seen - seed the intersection.
			si.ports = info.endpointRecords[0].Ports
		} else {
			si.ports = slices.Intersect(si.ports, info.endpointRecords[0].Ports, func(p mcsv1a1.ServicePort) string {
				return fmt.Sprintf("%s%s%d", p.Name, p.Protocol, p.Port)
			})
		}
	}
}

// ensureClusterInfo returns the clusterInfo for name, creating an empty one
// if not already present.
func (si *serviceInfo) ensureClusterInfo(name string) *clusterInfo {
	info, ok := si.clusters[name]

	if !ok {
		info = &clusterInfo{
			endpointRecordsByHost: make(map[string][]DNSRecord),
		}

		si.clusters[name] = info
	}

	return info
}

// newRecordFrom copies the given record, substituting the service's merged
// port list for the record's own ports.
func (si *serviceInfo) newRecordFrom(from *DNSRecord) *DNSRecord {
	r := *from
	r.Ports = si.ports

	return &r
}

// selectIP returns the first record whose cluster passes checkCluster and has
// healthy endpoints, advancing the load balancer. At most one full round of
// the balancer's items is attempted; returns nil if none qualifies.
func (si *serviceInfo) selectIP(checkCluster func(string) bool) *DNSRecord {
	queueLength := si.balancer.ItemCount()
	for i := 0; i < queueLength; i++ {
		clusterID := si.balancer.Next().(string)
		clusterInfo := si.clusters[clusterID]

		if checkCluster(clusterID) && clusterInfo.endpointsHealthy {
			return &clusterInfo.endpointRecords[0]
		}

		// Will Skip the cluster until a full "round" of the items is done
		si.balancer.Skip(clusterID)
	}

	return nil
}
package resolver

import (
	"sync"

	"github.com/submariner-io/lighthouse/coredns/loadbalancer"
	"k8s.io/client-go/dynamic"
	mcsv1a1 "sigs.k8s.io/mcs-api/pkg/apis/v1alpha1"
)

// Interface maintains DNS information derived from ServiceImports and
// EndpointSlices and serves DNS records on request via GetDNSRecords.
type Interface struct {
	serviceMap    map[string]*serviceInfo // keyed by "namespace/name" (see keyFunc)
	clusterStatus ClusterStatus
	client        dynamic.Interface
	mutex         sync.RWMutex // guards serviceMap and the data it references
}

// ClusterStatus provides inter-cluster connectivity information used when
// selecting which clusters' records to serve.
type ClusterStatus interface {
	// IsConnected returns whether the given cluster is currently connected.
	IsConnected(clusterID string) bool

	// GetLocalClusterID returns the ID of the local cluster.
	GetLocalClusterID() string
}

// DNSRecord is a single resolvable record for a service in one cluster.
type DNSRecord struct {
	IP          string
	Ports       []mcsv1a1.ServicePort
	HostName    string
	ClusterName string
}

// clusterInfo holds the per-cluster records and load-balancing state for a
// service.
type clusterInfo struct {
	endpointRecords       []DNSRecord            // all records for this cluster
	endpointRecordsByHost map[string][]DNSRecord // records grouped by endpoint host name
	weight                int64                  // load-balancer weight
	endpointsHealthy      bool                   // false when the backing endpoints are gone
}

// serviceInfo aggregates a service's state across all exporting clusters.
type serviceInfo struct {
	clusters   map[string]*clusterInfo // keyed by cluster ID
	balancer   loadbalancer.Interface  // selects a cluster for non-headless lookups
	isHeadless bool
	ports      []mcsv1a1.ServicePort // merged (intersected) ports across clusters
}