diff --git a/receiver/k8sclusterreceiver/internal/collection/collector.go b/receiver/k8sclusterreceiver/internal/collection/collector.go index 43d278858b4a..e71f265ed3be 100644 --- a/receiver/k8sclusterreceiver/internal/collection/collector.go +++ b/receiver/k8sclusterreceiver/internal/collection/collector.go @@ -134,6 +134,7 @@ func (dc *DataCollector) CollectMetricData(currentTime time.Time) pmetric.Metric dc.metadataStore.ForEach(gvk.StatefulSet, func(o any) { statefulset.RecordMetrics(dc.metricsBuilder, o.(*appsv1.StatefulSet), ts) }) + dc.metadataStore.ForEach(gvk.Job, func(o any) { jobs.RecordMetrics(dc.metricsBuilder, o.(*batchv1.Job), ts) }) diff --git a/receiver/k8sclusterreceiver/internal/constants/constants.go b/receiver/k8sclusterreceiver/internal/constants/constants.go index 191a51d365ca..b101f5b5a249 100644 --- a/receiver/k8sclusterreceiver/internal/constants/constants.go +++ b/receiver/k8sclusterreceiver/internal/constants/constants.go @@ -37,3 +37,8 @@ const ( K8sServicePrefix = "k8s.service." ) + +// Middleware.io constants +const ( + MWK8sServiceName = "middleware.io/k8s.service.name" +) diff --git a/receiver/k8sclusterreceiver/internal/pod/pods.go b/receiver/k8sclusterreceiver/internal/pod/pods.go index 2b2a28057e54..4f2fa434cdfc 100644 --- a/receiver/k8sclusterreceiver/internal/pod/pods.go +++ b/receiver/k8sclusterreceiver/internal/pod/pods.go @@ -4,10 +4,6 @@ package pod // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/pod" import ( - "context" - "github.com/open-telemetry/opentelemetry-collector-contrib/internal/k8sconfig" - "k8s.io/apimachinery/pkg/labels" - k8s "k8s.io/client-go/kubernetes" "strings" "time" @@ -69,18 +65,18 @@ func Transform(pod *corev1.Pod) *corev1.Pod { }, }) } + newPod.Spec.ServiceAccountName = pod.Spec.ServiceAccountName return newPod } func RecordMetrics(logger *zap.Logger, mb *metadata.MetricsBuilder, pod *corev1.Pod, ts pcommon.Timestamp) { - client, err := k8sconfig.MakeClient(k8sconfig.APIConfig{ - AuthType: k8sconfig.AuthTypeServiceAccount, - }) - if err != nil { - logger.Error(err.Error()) - } - jobInfo := getJobInfoForPod(client, pod) + var jobName, jobUID string + ownerReference := utils.FindOwnerWithKind(pod.OwnerReferences, constants.K8sKindJob) + if ownerReference != nil && ownerReference.Kind == constants.K8sKindJob { + jobName = ownerReference.Name + jobUID = string(ownerReference.UID) + } mb.RecordK8sPodPhaseDataPoint(ts, int64(phaseToInt(pod.Status.Phase))) mb.RecordK8sPodStatusReasonDataPoint(ts, int64(reasonToInt(pod.Status.Reason))) @@ -89,13 +85,17 @@ func RecordMetrics(logger *zap.Logger, mb *metadata.MetricsBuilder, pod *corev1. rb.SetK8sNodeName(pod.Spec.NodeName) rb.SetK8sPodName(pod.Name) rb.SetK8sPodUID(string(pod.UID)) - rb.SetK8sPodStartTime(pod.GetCreationTimestamp().String()) + rb.SetK8sPodStartTime(pod.CreationTimestamp.String()) rb.SetOpencensusResourcetype("k8s") - rb.SetK8sServiceName(getServiceNameForPod(client, pod)) - rb.SetK8sJobName(jobInfo.Name) - rb.SetK8sJobUID(string(jobInfo.UID)) - rb.SetK8sPodStartTime(pod.GetCreationTimestamp().String()) - rb.SetK8sServiceAccountName(getServiceAccountNameForPod(client, pod)) + + svcName, ok := pod.Labels[constants.MWK8sServiceName] + if ok { + rb.SetK8sServiceName(svcName) + } + + rb.SetK8sJobName(jobName) + rb.SetK8sJobUID(string(jobUID)) + rb.SetK8sServiceAccountName(pod.Spec.ServiceAccountName) rb.SetK8sClusterName("unknown") mb.EmitForResource(metadata.WithResource(rb.Emit())) @@ -104,62 +104,6 @@ func RecordMetrics(logger *zap.Logger, mb *metadata.MetricsBuilder, pod *corev1. } } -func getServiceNameForPod(client k8s.Interface, pod *corev1.Pod) string { - var serviceName string - - serviceList, err := client.CoreV1().Services(pod.Namespace).List(context.TODO(), v1.ListOptions{}) - if err != nil { - return "" - } - - for _, svc := range serviceList.Items { - if svc.Spec.Selector != nil { - if labels.Set(svc.Spec.Selector).AsSelectorPreValidated().Matches(labels.Set(pod.Labels)) { - serviceName = svc.Name - return serviceName - } - } - } - - return "" -} - -type JobInfo struct { - Name string - UID types.UID -} - -func getJobInfoForPod(client k8s.Interface, pod *corev1.Pod) JobInfo { - podSelector := labels.Set(pod.Labels) - jobList, err := client.BatchV1().Jobs(pod.Namespace).List(context.TODO(), v1.ListOptions{ - LabelSelector: podSelector.AsSelector().String(), - }) - if err != nil { - return JobInfo{} - } - - if len(jobList.Items) > 0 { - return JobInfo{ - Name: jobList.Items[0].Name, - UID: jobList.Items[0].UID, - } - } - - return JobInfo{} -} - -func getServiceAccountNameForPod(client k8s.Interface, pod *corev1.Pod) string { - var serviceAccountName string - - podDetails, err := client.CoreV1().Pods(pod.Namespace).Get(context.TODO(), pod.Name, v1.GetOptions{}) - if err != nil { - return "" - } - - serviceAccountName = podDetails.Spec.ServiceAccountName - return serviceAccountName -} - func reasonToInt(reason string) int32 { switch reason { case "Evicted": diff --git a/receiver/k8sclusterreceiver/internal/pod/pods_test.go b/receiver/k8sclusterreceiver/internal/pod/pods_test.go index 122341906c11..d8327f725395 100644 --- a/receiver/k8sclusterreceiver/internal/pod/pods_test.go +++ b/receiver/k8sclusterreceiver/internal/pod/pods_test.go @@ -4,7 +4,6 @@ package pod import ( - "context" "fmt" "path/filepath" "strings" @@ -22,7 +21,6 @@ import ( "k8s.io/apimachinery/pkg/api/resource" v1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" - "k8s.io/client-go/kubernetes/fake" "github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal/golden" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/experimentalmetricmetadata" @@ -411,6 +409,7 @@ func TestTransform(t *testing.T) { }, }, }, + ServiceAccountName: "my-service-account", }, Status: corev1.PodStatus{ Phase: corev1.PodRunning, @@ -460,6 +459,7 @@ func TestTransform(t *testing.T) { }, }, }, + ServiceAccountName: "my-service-account", }, Status: corev1.PodStatus{ Phase: corev1.PodRunning, @@ -476,103 +476,3 @@ func TestTransform(t *testing.T) { } assert.Equal(t, wantPod, Transform(originalPod)) } - -func TestGetServiceNameForPod(t *testing.T) { - // Create a fake Kubernetes client - client := fake.NewSimpleClientset() - - // Create a Pod with labels - pod := testutils.NewPodWithContainer( - "1", - testutils.NewPodSpecWithContainer("container-name"), - testutils.NewPodStatusWithContainer("container-name", containerIDWithPreifx("container-id")), - ) - - // Create a Service with the same labels as the Pod - service := &corev1.Service{ - ObjectMeta: v1.ObjectMeta{ - Name: "test-service", - Namespace: "test-namespace", - }, - Spec: corev1.ServiceSpec{ - Selector: map[string]string{ - "foo": "bar", - "foo1": "", - }, - }, - } - - // Create the Pod and Service in the fake client - _, err := client.CoreV1().Pods(pod.Namespace).Create(context.TODO(), pod, v1.CreateOptions{}) - assert.NoError(t, err) - - _, err = client.CoreV1().Services(service.Namespace).Create(context.TODO(), service, v1.CreateOptions{}) - assert.NoError(t, err) - - // Call the function - serviceName := getServiceNameForPod(client, pod) - - // Verify the result - expectedServiceName := "test-service" - - assert.Equal(t, expectedServiceName, serviceName) -} - -func TestGetServiceAccountNameForPod(t *testing.T) { - // Create a fake Kubernetes client - client := fake.NewSimpleClientset() - - // Create a Pod with labels - pod := testutils.NewPodWithContainer( - "1", - &corev1.PodSpec{ - ServiceAccountName: "test-service-account", - }, - testutils.NewPodStatusWithContainer("container-name", containerIDWithPreifx("container-id")), - ) - - // Create the Pod in the fake client - _, err := client.CoreV1().Pods(pod.Namespace).Create(context.TODO(), pod, v1.CreateOptions{}) - assert.NoError(t, err) - - // Call the function - serviceAccountName := getServiceAccountNameForPod(client, pod) - - // Verify the result - expectedServiceAccountName := "test-service-account" - - assert.Equal(t, expectedServiceAccountName, serviceAccountName) -} - -func TestGetJobInfoForPod(t *testing.T) { - // Create a fake Kubernetes client - client := fake.NewSimpleClientset() - - // Create a Pod with labels - pod := testutils.NewPodWithContainer( - "1", - testutils.NewPodSpecWithContainer("container-name"), - testutils.NewPodStatusWithContainer("container-name", containerIDWithPreifx("container-id")), - ) - - // Create a Job with the same labels as the Pod - job := testutils.NewJob("1") - - // Create the Pod and Job in the fake client - _, err := client.CoreV1().Pods(pod.Namespace).Create(context.TODO(), pod, v1.CreateOptions{}) - assert.NoError(t, err) - - _, err = client.BatchV1().Jobs(job.Namespace).Create(context.TODO(), job, v1.CreateOptions{}) - assert.NoError(t, err) - - // Call the function - jobInfo := getJobInfoForPod(client, pod) - - // Verify the result - expectedJobInfo := JobInfo{ - Name: "test-job-1", - UID: job.UID, - } - - assert.Equal(t, expectedJobInfo, jobInfo) -} diff --git a/receiver/k8sclusterreceiver/internal/service/service.go b/receiver/k8sclusterreceiver/internal/service/service.go index 2fb077a47c07..747c2e618217 100644 --- a/receiver/k8sclusterreceiver/internal/service/service.go +++ b/receiver/k8sclusterreceiver/internal/service/service.go @@ -3,12 +3,10 @@ package service // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/service" import ( - "context" "fmt" - "github.com/open-telemetry/opentelemetry-collector-contrib/internal/k8sconfig" + "go.opentelemetry.io/collector/pdata/pcommon" corev1 "k8s.io/api/core/v1" - v1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" "k8s.io/client-go/tools/cache" @@ -23,7 +21,10 @@ func Transform(service *corev1.Service) *corev1.Service { return &corev1.Service{ ObjectMeta: metadata.TransformObjectMeta(service.ObjectMeta), Spec: corev1.ServiceSpec{ - Selector: service.Spec.Selector, + Selector: service.Spec.Selector, + ClusterIP: service.Spec.ClusterIP, + Type: service.Spec.Type, + Ports: service.Spec.Ports, }, } } @@ -44,33 +45,14 @@ func GetPodServiceTags(pod *corev1.Pod, services cache.Store) map[string]string } func RecordMetrics(mb *imetadata.MetricsBuilder, svc *corev1.Service, ts pcommon.Timestamp) { - svcDetails := GetServiceDetails(svc) - mb.RecordK8sServicePortCountDataPoint(ts, int64(len(svcDetails.Spec.Ports))) + mb.RecordK8sServicePortCountDataPoint(ts, int64(len(svc.Spec.Ports))) rb := mb.NewResourceBuilder() rb.SetK8sServiceUID(string(svc.UID)) rb.SetK8sServiceName(svc.ObjectMeta.Name) rb.SetK8sServiceNamespace(svc.ObjectMeta.Namespace) - rb.SetK8sServiceClusterIP(svcDetails.Spec.ClusterIP) - rb.SetK8sServiceType(string(svcDetails.Spec.Type)) - rb.SetK8sServiceClusterIP(svcDetails.Spec.ClusterIP) + rb.SetK8sServiceClusterIP(svc.Spec.ClusterIP) + rb.SetK8sServiceType(string(svc.Spec.Type)) rb.SetK8sClusterName("unknown") mb.EmitForResource(metadata.WithResource(rb.Emit())) } - -func GetServiceDetails(svc *corev1.Service) *corev1.Service { - var svcObject *corev1.Service - - client, _ := k8sconfig.MakeClient(k8sconfig.APIConfig{ - AuthType: k8sconfig.AuthTypeServiceAccount, - }) - - service, err := client.CoreV1().Services(svc.ObjectMeta.Namespace).Get(context.TODO(), svc.ObjectMeta.Name, v1.GetOptions{}) - if err != nil { - panic(err) - } else { - svcObject = service - } - - return svcObject -} diff --git a/receiver/k8sclusterreceiver/watcher.go b/receiver/k8sclusterreceiver/watcher.go index 7552a0aac53f..d27a607ab66a 100644 --- a/receiver/k8sclusterreceiver/watcher.go +++ b/receiver/k8sclusterreceiver/watcher.go @@ -27,6 +27,7 @@ import ( netv1 "k8s.io/api/networking/v1" rbacv1 "k8s.io/api/rbac/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/client-go/informers" "k8s.io/client-go/kubernetes" @@ -36,6 +37,7 @@ import ( "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/experimentalmetricmetadata" "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/clusterrole" "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/clusterrolebinding" + "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/constants" "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/cronjob" "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/demonset" "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/deployment" @@ -297,6 +299,36 @@ func (rw *resourceWatcher) setupInformer(gvk schema.GroupVersionKind, informer c func (rw *resourceWatcher) onAdd(obj interface{}) { rw.waitForInitialInformerSync() + switch obj := obj.(type) { + case *corev1.Pod: + svcList := rw.metadataStore.Get(gvk.Service).List() + for _, svcObj := range svcList { + svc := svcObj.(*corev1.Service) + if svc.Spec.Selector != nil && len(svc.Spec.Selector) > 0 { + if labels.Set(svc.Spec.Selector).AsSelectorPreValidated().Matches(labels.Set(obj.Labels)) { + // only seting the first match ? + if obj.ObjectMeta.Labels == nil { + obj.ObjectMeta.Labels = make(map[string]string) + } + obj.Labels[constants.MWK8sServiceName] = svc.Name + break + } + } + } + case *corev1.Service: + podList := rw.metadataStore.Get(gvk.Pod).List() + for _, podObj := range podList { + pod := podObj.(*corev1.Pod) + selector := obj.Spec.Selector + if labels.Set(selector).AsSelectorPreValidated().Matches(labels.Set(pod.Labels)) { + if pod.ObjectMeta.Labels == nil { + pod.ObjectMeta.Labels = make(map[string]string) + } + //set the service name in the pod labels + pod.ObjectMeta.Labels[constants.MWK8sServiceName] = obj.Name + } + } + } // Sync metadata only if there's at least one destination for it to sent. if !rw.hasDestination() { @@ -312,6 +344,66 @@ func (rw *resourceWatcher) hasDestination() bool { func (rw *resourceWatcher) onUpdate(oldObj, newObj interface{}) { rw.waitForInitialInformerSync() + switch obj := newObj.(type) { + case *corev1.Pod: + oldLabels := oldObj.(*corev1.Pod).Labels + newLabels := obj.Labels + if !labels.Equals(labels.Set(oldLabels), labels.Set(newLabels)) { + rw.logger.Info("labels changed for pod ", zap.String("name", obj.Name), + zap.String("namespace", obj.Namespace), zap.Any("oldLabels", oldLabels), + zap.Any("newLabels", newLabels)) + // Get all the svc list and check if the pod labels match with any of the svc selectors + foundSvc := false + svcList := rw.metadataStore.Get(gvk.Service).List() + for _, svcObj := range svcList { + svc := svcObj.(*corev1.Service) + if svc.Spec.Selector != nil && len(svc.Spec.Selector) > 0 { + if labels.Set(svc.Spec.Selector).AsSelectorPreValidated().Matches(labels.Set(newLabels)) { + // only seting the first match ? + if obj.ObjectMeta.Labels == nil { + obj.ObjectMeta.Labels = make(map[string]string) + } + obj.ObjectMeta.Labels[constants.MWK8sServiceName] = svc.Name + foundSvc = true + break + } + } + } + + if !foundSvc { + _, ok := obj.Labels[constants.MWK8sServiceName] + if ok { + delete(obj.Labels, constants.MWK8sServiceName) + } + } + } + + case *corev1.Service: + oldSelector := oldObj.(*corev1.Service).Spec.Selector + newSelector := obj.Spec.Selector + if !labels.Equals(labels.Set(oldSelector), labels.Set(newSelector)) { + rw.logger.Info("selector changed for service ", zap.String("name", obj.Name), + zap.String("namespace", obj.Namespace), zap.Any("oldSelector", oldSelector), + zap.Any("newSelector", newSelector)) + // Get all the pod list and check if the pod labels match with the new svc selectors + podList := rw.metadataStore.Get(gvk.Pod).List() + for _, podObj := range podList { + pod := podObj.(*corev1.Pod) + if labels.Set(newSelector).AsSelectorPreValidated().Matches(labels.Set(pod.Labels)) { + if pod.ObjectMeta.Labels == nil { + pod.ObjectMeta.Labels = make(map[string]string) + } + //set the service name in the pod labes + pod.Labels[constants.MWK8sServiceName] = obj.Name + } else { + svcName, ok := obj.Labels[constants.MWK8sServiceName] + if ok && svcName == obj.Name { + delete(obj.Labels, constants.MWK8sServiceName) + } + } + } + } + } // Sync metadata only if there's at least one destination for it to sent. if !rw.hasDestination() {