diff --git a/source/service.go b/source/service.go
index ec9d8575a1..cfb3726ddb 100644
--- a/source/service.go
+++ b/source/service.go
@@ -25,6 +25,7 @@ import (
 
 	log "github.com/Sirupsen/logrus"
 
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/labels"
 	"k8s.io/client-go/kubernetes"
 	"k8s.io/client-go/pkg/api/v1"
@@ -114,6 +115,29 @@ func (sc *serviceSource) Endpoints() ([]*endpoint.Endpoint, error) {
 
 	return endpoints, nil
 }
+
+func (sc *serviceSource) extractHeadlessEndpoint(svc *v1.Service, hostname string) []*endpoint.Endpoint {
+	var endpoints []*endpoint.Endpoint
+
+	pods, err := sc.client.CoreV1().Pods(svc.Namespace).List(metav1.ListOptions{LabelSelector: labels.Set(svc.Spec.Selector).AsSelectorPreValidated().String()})
+	if err != nil {
+		log.Errorf("List Pods of service[%s] error: %v", svc.GetName(), err)
+		return endpoints
+	}
+
+	for _, v := range pods.Items {
+		headlessDomain := v.Spec.Hostname + "." + hostname
+		log.Debugf("Generating matching endpoint %s with HostIP %s", headlessDomain, v.Status.HostIP)
+		// To reduce traffic on the DNS API, only add records for Pods in the Running phase.
+		if v.Status.Phase == v1.PodRunning {
+			endpoints = append(endpoints, endpoint.NewEndpoint(headlessDomain, v.Status.HostIP, endpoint.RecordTypeA))
+		} else {
+			log.Debugf("Pod %s is not in running phase", v.Spec.Hostname)
+		}
+	}
+
+	return endpoints
+}
 
 func (sc *serviceSource) endpointsFromTemplate(svc *v1.Service) ([]*endpoint.Endpoint, error) {
 	var endpoints []*endpoint.Endpoint
@@ -159,6 +183,9 @@ func (sc *serviceSource) generateEndpoints(svc *v1.Service, hostname string) []*endpoint.Endpoint {
 		if sc.publishInternal {
 			endpoints = append(endpoints, extractServiceIps(svc, hostname)...)
 		}
+		if svc.Spec.ClusterIP == v1.ClusterIPNone {
+			endpoints = append(endpoints, sc.extractHeadlessEndpoint(svc, hostname)...)
+		}
 	}
 
 	return endpoints
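Reviewer note: a minimal, runnable sketch of what `extractHeadlessEndpoint` derives from a headless Service, using the same `k8s.io/apimachinery/pkg/labels` package imported above. The `main` wrapper and the sample selector/hostnames are illustrative only, not values from this PR:

```go
package main

import (
	"fmt"

	"k8s.io/apimachinery/pkg/labels"
)

func main() {
	// A headless Service's Spec.Selector (hypothetical sample value).
	selector := map[string]string{"component": "foo"}

	// The same conversion extractHeadlessEndpoint uses to build the
	// LabelSelector string passed to the Pods().List call.
	fmt.Println(labels.Set(selector).AsSelectorPreValidated().String()) // component=foo

	// Each running Pod then contributes one A record of the form
	// <pod.Spec.Hostname>.<hostname annotation> -> <pod.Status.HostIP>.
	fmt.Println("foo-0" + "." + "service.example.org") // foo-0.service.example.org
}
```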
diff --git a/source/service_test.go b/source/service_test.go
index f4ef658782..365e16319a 100644
--- a/source/service_test.go
+++ b/source/service_test.go
@@ -674,6 +674,145 @@ func TestClusterIpServices(t *testing.T) {
 	}
 }
 
+// TestHeadlessServices tests that headless services generate the correct endpoints.
+func TestHeadlessServices(t *testing.T) {
+	for _, tc := range []struct {
+		title           string
+		targetNamespace string
+		svcNamespace    string
+		svcName         string
+		svcType         v1.ServiceType
+		compatibility   string
+		fqdnTemplate    string
+		labels          map[string]string
+		annotations     map[string]string
+		clusterIP       string
+		hostIP          string
+		selector        map[string]string
+		lbs             []string
+		hostnames       []string
+		phases          []v1.PodPhase
+		expected        []*endpoint.Endpoint
+		expectError     bool
+	}{
+		{
+			"annotated headless services return an endpoint for each selected Pod",
+			"",
+			"testing",
+			"foo",
+			v1.ServiceTypeClusterIP,
+			"",
+			"",
+			map[string]string{"component": "foo"},
+			map[string]string{
+				hostnameAnnotationKey: "service.example.org",
+			},
+			v1.ClusterIPNone,
+			"1.1.1.1",
+			map[string]string{
+				"component": "foo",
+			},
+			[]string{},
+			[]string{"foo-0", "foo-1"},
+			[]v1.PodPhase{v1.PodRunning, v1.PodRunning},
+			[]*endpoint.Endpoint{
+				{DNSName: "foo-0.service.example.org", Target: "1.1.1.1"},
+				{DNSName: "foo-1.service.example.org", Target: "1.1.1.1"},
+			},
+			false,
+		},
+		{
+			"annotated headless services return endpoints only for selected Pods in the Running phase",
+			"",
+			"testing",
+			"foo",
+			v1.ServiceTypeClusterIP,
+			"",
+			"",
+			map[string]string{"component": "foo"},
+			map[string]string{
+				hostnameAnnotationKey: "service.example.org",
+			},
+			v1.ClusterIPNone,
+			"1.1.1.1",
+			map[string]string{
+				"component": "foo",
+			},
+			[]string{},
+			[]string{"foo-0", "foo-1"},
+			[]v1.PodPhase{v1.PodRunning, v1.PodFailed},
+			[]*endpoint.Endpoint{
+				{DNSName: "foo-0.service.example.org", Target: "1.1.1.1"},
+			},
+			false,
+		},
+	} {
+		t.Run(tc.title, func(t *testing.T) {
+			// Create a Kubernetes testing client.
+			kubernetes := fake.NewSimpleClientset()
+
+			service := &v1.Service{
+				Spec: v1.ServiceSpec{
+					Type:      tc.svcType,
+					ClusterIP: tc.clusterIP,
+					Selector:  tc.selector,
+				},
+				ObjectMeta: metav1.ObjectMeta{
+					Namespace:   tc.svcNamespace,
+					Name:        tc.svcName,
+					Labels:      tc.labels,
+					Annotations: tc.annotations,
+				},
+				Status: v1.ServiceStatus{},
+			}
+			_, err := kubernetes.CoreV1().Services(service.Namespace).Create(service)
+			require.NoError(t, err)
+
+			for i, hostname := range tc.hostnames {
+				pod := &v1.Pod{
+					Spec: v1.PodSpec{
+						Containers: []v1.Container{},
+						Hostname:   hostname,
+					},
+					ObjectMeta: metav1.ObjectMeta{
+						Namespace:   tc.svcNamespace,
+						Name:        hostname,
+						Labels:      tc.labels,
+						Annotations: tc.annotations,
+					},
+					Status: v1.PodStatus{
+						HostIP: tc.hostIP,
+						Phase:  tc.phases[i],
+					},
+				}
+
+				_, err = kubernetes.CoreV1().Pods(tc.svcNamespace).Create(pod)
+				require.NoError(t, err)
+			}
+
+			// Create our object under test and get the endpoints.
+			client, err := NewServiceSource(
+				kubernetes,
+				tc.targetNamespace,
+				tc.fqdnTemplate,
+				tc.compatibility,
+				true,
+			)
+			require.NoError(t, err)
+
+			endpoints, err := client.Endpoints()
+			if tc.expectError {
+				require.Error(t, err)
+			} else {
+				require.NoError(t, err)
+			}
+
+			// Validate returned endpoints against desired endpoints.
+			validateEndpoints(t, endpoints, tc.expected)
+		})
+	}
+}
+
 func BenchmarkServiceEndpoints(b *testing.B) {
 	kubernetes := fake.NewSimpleClientset()
 
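Reviewer note: the second test case pins down the Running-phase filter. Below is a dependency-free sketch of that filtering logic, where `podInfo` and `headlessRecords` are hypothetical stand-ins for the real types, not part of this PR:

```go
package main

import "fmt"

// podInfo stands in for the v1.Pod fields extractHeadlessEndpoint reads.
type podInfo struct {
	hostname string // pod.Spec.Hostname
	hostIP   string // pod.Status.HostIP
	running  bool   // pod.Status.Phase == v1.PodRunning
}

// headlessRecords mirrors the loop in extractHeadlessEndpoint: only
// Pods in the Running phase contribute a record.
func headlessRecords(pods []podInfo, hostname string) map[string]string {
	records := map[string]string{}
	for _, p := range pods {
		if p.running {
			records[p.hostname+"."+hostname] = p.hostIP
		}
	}
	return records
}

func main() {
	pods := []podInfo{
		{"foo-0", "1.1.1.1", true},
		{"foo-1", "1.1.1.1", false}, // not running, so skipped
	}
	// Prints: map[foo-0.service.example.org:1.1.1.1], matching the
	// second test case's expectation.
	fmt.Println(headlessRecords(pods, "service.example.org"))
}
```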