From dbebd137bd6b0f5ddc2a5a213ed37aba9e44d27b Mon Sep 17 00:00:00 2001 From: Tejas Kokje Date: Wed, 10 Apr 2024 01:54:32 -0700 Subject: [PATCH 1/3] Avoid api server calls, use metadatastore --- .../internal/collection/collector.go | 1 + .../internal/constants/constants.go | 5 + .../k8sclusterreceiver/internal/pod/pods.go | 86 +++---------- .../internal/pod/pods_test.go | 102 --------------- .../internal/service/service.go | 1 + receiver/k8sclusterreceiver/watcher.go | 117 ++++++++++++++++++ 6 files changed, 139 insertions(+), 173 deletions(-) diff --git a/receiver/k8sclusterreceiver/internal/collection/collector.go b/receiver/k8sclusterreceiver/internal/collection/collector.go index 43d278858b4a..e71f265ed3be 100644 --- a/receiver/k8sclusterreceiver/internal/collection/collector.go +++ b/receiver/k8sclusterreceiver/internal/collection/collector.go @@ -134,6 +134,7 @@ func (dc *DataCollector) CollectMetricData(currentTime time.Time) pmetric.Metric dc.metadataStore.ForEach(gvk.StatefulSet, func(o any) { statefulset.RecordMetrics(dc.metricsBuilder, o.(*appsv1.StatefulSet), ts) }) + dc.metadataStore.ForEach(gvk.Job, func(o any) { jobs.RecordMetrics(dc.metricsBuilder, o.(*batchv1.Job), ts) }) diff --git a/receiver/k8sclusterreceiver/internal/constants/constants.go b/receiver/k8sclusterreceiver/internal/constants/constants.go index 191a51d365ca..b101f5b5a249 100644 --- a/receiver/k8sclusterreceiver/internal/constants/constants.go +++ b/receiver/k8sclusterreceiver/internal/constants/constants.go @@ -37,3 +37,8 @@ const ( K8sServicePrefix = "k8s.service." ) + +// Middleware.io constants +const ( + MWK8sServiceName = "middleware.io/k8s.service.name" +) diff --git a/receiver/k8sclusterreceiver/internal/pod/pods.go b/receiver/k8sclusterreceiver/internal/pod/pods.go index 2b2a28057e54..b13458a127f0 100644 --- a/receiver/k8sclusterreceiver/internal/pod/pods.go +++ b/receiver/k8sclusterreceiver/internal/pod/pods.go @@ -4,10 +4,6 @@ package pod // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/pod" import ( - "context" - "github.com/open-telemetry/opentelemetry-collector-contrib/internal/k8sconfig" - "k8s.io/apimachinery/pkg/labels" - k8s "k8s.io/client-go/kubernetes" "strings" "time" @@ -73,14 +69,13 @@ func Transform(pod *corev1.Pod) *corev1.Pod { } func RecordMetrics(logger *zap.Logger, mb *metadata.MetricsBuilder, pod *corev1.Pod, ts pcommon.Timestamp) { - client, err := k8sconfig.MakeClient(k8sconfig.APIConfig{ - AuthType: k8sconfig.AuthTypeServiceAccount, - }) - if err != nil { - logger.Error(err.Error()) - } - jobInfo := getJobInfoForPod(client, pod) + var jobName, jobUID string + ownerReference := utils.FindOwnerWithKind(pod.OwnerReferences, constants.K8sKindJob) + if ownerReference != nil && ownerReference.Kind == constants.K8sKindJob { + jobName = ownerReference.Name + jobUID = string(ownerReference.UID) + } mb.RecordK8sPodPhaseDataPoint(ts, int64(phaseToInt(pod.Status.Phase))) mb.RecordK8sPodStatusReasonDataPoint(ts, int64(reasonToInt(pod.Status.Reason))) @@ -91,11 +86,16 @@ func RecordMetrics(logger *zap.Logger, mb *metadata.MetricsBuilder, pod *corev1. rb.SetK8sPodUID(string(pod.UID)) rb.SetK8sPodStartTime(pod.GetCreationTimestamp().String()) rb.SetOpencensusResourcetype("k8s") - rb.SetK8sServiceName(getServiceNameForPod(client, pod)) - rb.SetK8sJobName(jobInfo.Name) - rb.SetK8sJobUID(string(jobInfo.UID)) + + svcName, ok := pod.Annotations[constants.MWK8sServiceName] + if ok { + rb.SetK8sServiceName(svcName) + } + + rb.SetK8sJobName(jobName) + rb.SetK8sJobUID(string(jobUID)) rb.SetK8sPodStartTime(pod.GetCreationTimestamp().String()) - rb.SetK8sServiceAccountName(getServiceAccountNameForPod(client, pod)) + rb.SetK8sServiceAccountName(pod.Spec.ServiceAccountName) rb.SetK8sClusterName("unknown") mb.EmitForResource(metadata.WithResource(rb.Emit())) @@ -104,62 +104,6 @@ func RecordMetrics(logger *zap.Logger, mb *metadata.MetricsBuilder, pod *corev1. } } -func getServiceNameForPod(client k8s.Interface, pod *corev1.Pod) string { - var serviceName string - - serviceList, err := client.CoreV1().Services(pod.Namespace).List(context.TODO(), v1.ListOptions{}) - if err != nil { - return "" - } - - for _, svc := range serviceList.Items { - if svc.Spec.Selector != nil { - if labels.Set(svc.Spec.Selector).AsSelectorPreValidated().Matches(labels.Set(pod.Labels)) { - serviceName = svc.Name - return serviceName - } - } - } - - return "" -} - -type JobInfo struct { - Name string - UID types.UID -} - -func getJobInfoForPod(client k8s.Interface, pod *corev1.Pod) JobInfo { - podSelector := labels.Set(pod.Labels) - jobList, err := client.BatchV1().Jobs(pod.Namespace).List(context.TODO(), v1.ListOptions{ - LabelSelector: podSelector.AsSelector().String(), - }) - if err != nil { - return JobInfo{} - } - - if len(jobList.Items) > 0 { - return JobInfo{ - Name: jobList.Items[0].Name, - UID: jobList.Items[0].UID, - } - } - - return JobInfo{} -} - -func getServiceAccountNameForPod(client k8s.Interface, pod *corev1.Pod) string { - var serviceAccountName string - - podDetails, err := client.CoreV1().Pods(pod.Namespace).Get(context.TODO(), pod.Name, v1.GetOptions{}) - if err != nil { - return "" - } - - serviceAccountName = podDetails.Spec.ServiceAccountName - return serviceAccountName -} - func reasonToInt(reason string) int32 { switch reason { case "Evicted": diff --git a/receiver/k8sclusterreceiver/internal/pod/pods_test.go b/receiver/k8sclusterreceiver/internal/pod/pods_test.go index 122341906c11..7c5e40f8ed85 100644 --- a/receiver/k8sclusterreceiver/internal/pod/pods_test.go +++ b/receiver/k8sclusterreceiver/internal/pod/pods_test.go @@ -4,7 +4,6 @@ package pod import ( - "context" "fmt" "path/filepath" "strings" @@ -22,7 +21,6 @@ import ( "k8s.io/apimachinery/pkg/api/resource" v1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" - "k8s.io/client-go/kubernetes/fake" "github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal/golden" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/experimentalmetricmetadata" @@ -476,103 +474,3 @@ func TestTransform(t *testing.T) { } assert.Equal(t, wantPod, Transform(originalPod)) } - -func TestGetServiceNameForPod(t *testing.T) { - // Create a fake Kubernetes client - client := fake.NewSimpleClientset() - - // Create a Pod with labels - pod := testutils.NewPodWithContainer( - "1", - testutils.NewPodSpecWithContainer("container-name"), - testutils.NewPodStatusWithContainer("container-name", containerIDWithPreifx("container-id")), - ) - - // Create a Service with the same labels as the Pod - service := &corev1.Service{ - ObjectMeta: v1.ObjectMeta{ - Name: "test-service", - Namespace: "test-namespace", - }, - Spec: corev1.ServiceSpec{ - Selector: map[string]string{ - "foo": "bar", - "foo1": "", - }, - }, - } - - // Create the Pod and Service in the fake client - _, err := client.CoreV1().Pods(pod.Namespace).Create(context.TODO(), pod, v1.CreateOptions{}) - assert.NoError(t, err) - - _, err = client.CoreV1().Services(service.Namespace).Create(context.TODO(), service, v1.CreateOptions{}) - assert.NoError(t, err) - - // Call the function - serviceName := getServiceNameForPod(client, pod) - - // Verify the result - expectedServiceName := "test-service" - - assert.Equal(t, expectedServiceName, serviceName) -} - -func TestGetServiceAccountNameForPod(t *testing.T) { - // Create a fake Kubernetes client - client := fake.NewSimpleClientset() - - // Create a Pod with labels - pod := testutils.NewPodWithContainer( - "1", - &corev1.PodSpec{ - ServiceAccountName: "test-service-account", - }, - testutils.NewPodStatusWithContainer("container-name", containerIDWithPreifx("container-id")), - ) - - // Create the Pod in the fake client - _, err := client.CoreV1().Pods(pod.Namespace).Create(context.TODO(), pod, v1.CreateOptions{}) - assert.NoError(t, err) - - // Call the function - serviceAccountName := getServiceAccountNameForPod(client, pod) - - // Verify the result - expectedServiceAccountName := "test-service-account" - - assert.Equal(t, expectedServiceAccountName, serviceAccountName) -} - -func TestGetJobInfoForPod(t *testing.T) { - // Create a fake Kubernetes client - client := fake.NewSimpleClientset() - - // Create a Pod with labels - pod := testutils.NewPodWithContainer( - "1", - testutils.NewPodSpecWithContainer("container-name"), - testutils.NewPodStatusWithContainer("container-name", containerIDWithPreifx("container-id")), - ) - - // Create a Job with the same labels as the Pod - job := testutils.NewJob("1") - - // Create the Pod and Job in the fake client - _, err := client.CoreV1().Pods(pod.Namespace).Create(context.TODO(), pod, v1.CreateOptions{}) - assert.NoError(t, err) - - _, err = client.BatchV1().Jobs(job.Namespace).Create(context.TODO(), job, v1.CreateOptions{}) - assert.NoError(t, err) - - // Call the function - jobInfo := getJobInfoForPod(client, pod) - - // Verify the result - expectedJobInfo := JobInfo{ - Name: "test-job-1", - UID: job.UID, - } - - assert.Equal(t, expectedJobInfo, jobInfo) -} diff --git a/receiver/k8sclusterreceiver/internal/service/service.go b/receiver/k8sclusterreceiver/internal/service/service.go index 2fb077a47c07..4a24f5169980 100644 --- a/receiver/k8sclusterreceiver/internal/service/service.go +++ b/receiver/k8sclusterreceiver/internal/service/service.go @@ -5,6 +5,7 @@ package service // import "github.com/open-telemetry/opentelemetry-collector-con import ( "context" "fmt" + "github.com/open-telemetry/opentelemetry-collector-contrib/internal/k8sconfig" "go.opentelemetry.io/collector/pdata/pcommon" corev1 "k8s.io/api/core/v1" diff --git a/receiver/k8sclusterreceiver/watcher.go b/receiver/k8sclusterreceiver/watcher.go index 7552a0aac53f..d6c7d659268d 100644 --- a/receiver/k8sclusterreceiver/watcher.go +++ b/receiver/k8sclusterreceiver/watcher.go @@ -5,6 +5,7 @@ package k8sclusterreceiver // import "github.com/open-telemetry/opentelemetry-co import ( "context" + "encoding/json" "fmt" "reflect" "sync/atomic" @@ -27,6 +28,8 @@ import ( netv1 "k8s.io/api/networking/v1" rbacv1 "k8s.io/api/rbac/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" + v1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/client-go/informers" "k8s.io/client-go/kubernetes" @@ -36,6 +39,7 @@ import ( "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/experimentalmetricmetadata" "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/clusterrole" "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/clusterrolebinding" + "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/constants" "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/cronjob" "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/demonset" "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/deployment" @@ -295,8 +299,56 @@ func (rw *resourceWatcher) setupInformer(gvk schema.GroupVersionKind, informer c rw.metadataStore.Setup(gvk, informer.GetStore()) } +func (rw *resourceWatcher) getServiceAccountNameForPod(pod *corev1.Pod) string { + podDetails, err := rw.client.CoreV1().Pods(pod.Namespace).Get(context.TODO(), + pod.Name, v1.GetOptions{}) + if err != nil { + return "" + } + + return podDetails.Spec.ServiceAccountName +} + func (rw *resourceWatcher) onAdd(obj interface{}) { rw.waitForInitialInformerSync() + switch obj := obj.(type) { + case *corev1.Pod: + obj.Spec.ServiceAccountName = rw.getServiceAccountNameForPod(obj) + svcList := rw.metadataStore.Get(gvk.Service).List() + b, _ := json.Marshal(svcList) + fmt.Println("onAdd In pod svcList: ", string(b)) + for _, svcObj := range svcList { + svc := svcObj.(*corev1.Service) + if svc.Spec.Selector != nil { + if labels.Set(svc.Spec.Selector).AsSelectorPreValidated().Matches(labels.Set(obj.Labels)) { + // only seting the first match ? + if obj.ObjectMeta.Annotations == nil { + obj.ObjectMeta.Annotations = make(map[string]string) + } + obj.Annotations[constants.MWK8sServiceName] = svc.Name + fmt.Println("on Add In pod service name set for pod ", + obj.Name, obj.Annotations[constants.MWK8sServiceName]) + break + } + } + } + case *corev1.Service: + podList := rw.metadataStore.Get(gvk.Pod).List() + for _, podObj := range podList { + pod := podObj.(*corev1.Pod) + selector := obj.Spec.Selector + if labels.Set(selector).AsSelectorPreValidated().Matches(labels.Set(pod.Labels)) { + if pod.ObjectMeta.Annotations == nil { + pod.ObjectMeta.Annotations = make(map[string]string) + } + //set the service name in the pod annotations + pod.ObjectMeta.Annotations[constants.MWK8sServiceName] = obj.Name + fmt.Println("onAdd In service, service name set for pod ", + obj.Name, obj.Annotations[constants.MWK8sServiceName]) + + } + } + } // Sync metadata only if there's at least one destination for it to sent. if !rw.hasDestination() { @@ -312,6 +364,71 @@ func (rw *resourceWatcher) hasDestination() bool { func (rw *resourceWatcher) onUpdate(oldObj, newObj interface{}) { rw.waitForInitialInformerSync() + switch obj := newObj.(type) { + case *corev1.Pod: + oldLabels := oldObj.(*corev1.Pod).Labels + newLabels := obj.Labels + if !labels.Equals(labels.Set(oldLabels), labels.Set(newLabels)) { + rw.logger.Info("labels changed for pod ", zap.String("name", obj.Name), + zap.String("namespace", obj.Namespace), zap.Any("oldLabels", oldLabels), + zap.Any("newLabels", newLabels)) + fmt.Println("onUpdate labels changed for pod ", obj.Name, obj.Namespace, oldLabels, newLabels) + // Get all the svc list and check if the pod labels match with any of the svc selectors + foundSvc := false + svcList := rw.metadataStore.Get(gvk.Service).List() + for _, svcObj := range svcList { + svc := svcObj.(*corev1.Service) + if svc.Spec.Selector != nil { + if labels.Set(svc.Spec.Selector).AsSelectorPreValidated().Matches(labels.Set(newLabels)) { + // only seting the first match ? + obj.Annotations[constants.MWK8sServiceName] = svc.Name + fmt.Println("onUpdate In pod update service name set for pod ", + obj.Name, obj.Annotations[constants.MWK8sServiceName]) + foundSvc = true + break + } + } + } + + if !foundSvc { + _, ok := obj.Annotations[constants.MWK8sServiceName] + if ok { + delete(obj.Annotations, constants.MWK8sServiceName) + } + } + } + + case *corev1.Service: + oldSelector := oldObj.(*corev1.Service).Spec.Selector + newSelector := obj.Spec.Selector + if !labels.Equals(labels.Set(oldSelector), labels.Set(newSelector)) { + rw.logger.Info("selector changed for service ", zap.String("name", obj.Name), + zap.String("namespace", obj.Namespace), zap.Any("oldSelector", oldSelector), + zap.Any("newSelector", newSelector)) + fmt.Println("onUpdate selector changed for service ", obj.Name, obj.Namespace, oldSelector, newSelector) + + // Get all the pod list and check if the pod labels match with the new svc selectors + podList := rw.metadataStore.Get(gvk.Pod).List() + for _, podObj := range podList { + pod := podObj.(*corev1.Pod) + if labels.Set(newSelector).AsSelectorPreValidated().Matches(labels.Set(pod.Labels)) { + if pod.ObjectMeta.Annotations == nil { + pod.ObjectMeta.Annotations = make(map[string]string) + } + //set the service name in the pod annotations + pod.ObjectMeta.Annotations[constants.MWK8sServiceName] = obj.Name + fmt.Println("onUpdate In service, service name set for pod ", + obj.Name, obj.Annotations[constants.MWK8sServiceName]) + + } else { + svcName, ok := obj.Annotations[constants.MWK8sServiceName] + if ok && svcName == obj.Name { + delete(obj.Annotations, constants.MWK8sServiceName) + } + } + } + } + } // Sync metadata only if there's at least one destination for it to sent. if !rw.hasDestination() { From 508c642f6f02dac8791d68ed06ac1fbd7be9fca0 Mon Sep 17 00:00:00 2001 From: Tejas Kokje Date: Wed, 10 Apr 2024 17:43:54 -0700 Subject: [PATCH 2/3] change pod transformer to preserve serviceAccountName --- .../k8sclusterreceiver/internal/pod/pods.go | 6 +- .../internal/pod/pods_test.go | 2 + receiver/k8sclusterreceiver/watcher.go | 67 ++++++------------- 3 files changed, 26 insertions(+), 49 deletions(-) diff --git a/receiver/k8sclusterreceiver/internal/pod/pods.go b/receiver/k8sclusterreceiver/internal/pod/pods.go index b13458a127f0..4f2fa434cdfc 100644 --- a/receiver/k8sclusterreceiver/internal/pod/pods.go +++ b/receiver/k8sclusterreceiver/internal/pod/pods.go @@ -65,6 +65,7 @@ func Transform(pod *corev1.Pod) *corev1.Pod { }, }) } + newPod.Spec.ServiceAccountName = pod.Spec.ServiceAccountName return newPod } @@ -84,17 +85,16 @@ func RecordMetrics(logger *zap.Logger, mb *metadata.MetricsBuilder, pod *corev1. rb.SetK8sNodeName(pod.Spec.NodeName) rb.SetK8sPodName(pod.Name) rb.SetK8sPodUID(string(pod.UID)) - rb.SetK8sPodStartTime(pod.GetCreationTimestamp().String()) + rb.SetK8sPodStartTime(pod.CreationTimestamp.String()) rb.SetOpencensusResourcetype("k8s") - svcName, ok := pod.Annotations[constants.MWK8sServiceName] + svcName, ok := pod.Labels[constants.MWK8sServiceName] if ok { rb.SetK8sServiceName(svcName) } rb.SetK8sJobName(jobName) rb.SetK8sJobUID(string(jobUID)) - rb.SetK8sPodStartTime(pod.GetCreationTimestamp().String()) rb.SetK8sServiceAccountName(pod.Spec.ServiceAccountName) rb.SetK8sClusterName("unknown") mb.EmitForResource(metadata.WithResource(rb.Emit())) diff --git a/receiver/k8sclusterreceiver/internal/pod/pods_test.go b/receiver/k8sclusterreceiver/internal/pod/pods_test.go index 7c5e40f8ed85..d8327f725395 100644 --- a/receiver/k8sclusterreceiver/internal/pod/pods_test.go +++ b/receiver/k8sclusterreceiver/internal/pod/pods_test.go @@ -409,6 +409,7 @@ func TestTransform(t *testing.T) { }, }, }, + ServiceAccountName: "my-service-account", }, Status: corev1.PodStatus{ Phase: corev1.PodRunning, @@ -458,6 +459,7 @@ func TestTransform(t *testing.T) { }, }, }, + ServiceAccountName: "my-service-account", }, Status: corev1.PodStatus{ Phase: corev1.PodRunning, diff --git a/receiver/k8sclusterreceiver/watcher.go b/receiver/k8sclusterreceiver/watcher.go index d6c7d659268d..d27a607ab66a 100644 --- a/receiver/k8sclusterreceiver/watcher.go +++ b/receiver/k8sclusterreceiver/watcher.go @@ -5,7 +5,6 @@ package k8sclusterreceiver // import "github.com/open-telemetry/opentelemetry-co import ( "context" - "encoding/json" "fmt" "reflect" "sync/atomic" @@ -28,7 +27,6 @@ import ( netv1 "k8s.io/api/networking/v1" rbacv1 "k8s.io/api/rbac/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" - v1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/client-go/informers" @@ -299,35 +297,20 @@ func (rw *resourceWatcher) setupInformer(gvk schema.GroupVersionKind, informer c rw.metadataStore.Setup(gvk, informer.GetStore()) } -func (rw *resourceWatcher) getServiceAccountNameForPod(pod *corev1.Pod) string { - podDetails, err := rw.client.CoreV1().Pods(pod.Namespace).Get(context.TODO(), - pod.Name, v1.GetOptions{}) - if err != nil { - return "" - } - - return podDetails.Spec.ServiceAccountName -} - func (rw *resourceWatcher) onAdd(obj interface{}) { rw.waitForInitialInformerSync() switch obj := obj.(type) { case *corev1.Pod: - obj.Spec.ServiceAccountName = rw.getServiceAccountNameForPod(obj) svcList := rw.metadataStore.Get(gvk.Service).List() - b, _ := json.Marshal(svcList) - fmt.Println("onAdd In pod svcList: ", string(b)) for _, svcObj := range svcList { svc := svcObj.(*corev1.Service) - if svc.Spec.Selector != nil { + if svc.Spec.Selector != nil && len(svc.Spec.Selector) > 0 { if labels.Set(svc.Spec.Selector).AsSelectorPreValidated().Matches(labels.Set(obj.Labels)) { // only seting the first match ? - if obj.ObjectMeta.Annotations == nil { - obj.ObjectMeta.Annotations = make(map[string]string) + if obj.ObjectMeta.Labels == nil { + obj.ObjectMeta.Labels = make(map[string]string) } - obj.Annotations[constants.MWK8sServiceName] = svc.Name - fmt.Println("on Add In pod service name set for pod ", - obj.Name, obj.Annotations[constants.MWK8sServiceName]) + obj.Labels[constants.MWK8sServiceName] = svc.Name break } } @@ -338,14 +321,11 @@ func (rw *resourceWatcher) onAdd(obj interface{}) { pod := podObj.(*corev1.Pod) selector := obj.Spec.Selector if labels.Set(selector).AsSelectorPreValidated().Matches(labels.Set(pod.Labels)) { - if pod.ObjectMeta.Annotations == nil { - pod.ObjectMeta.Annotations = make(map[string]string) + if pod.ObjectMeta.Labels == nil { + pod.ObjectMeta.Labels = make(map[string]string) } - //set the service name in the pod annotations - pod.ObjectMeta.Annotations[constants.MWK8sServiceName] = obj.Name - fmt.Println("onAdd In service, service name set for pod ", - obj.Name, obj.Annotations[constants.MWK8sServiceName]) - + //set the service name in the pod labels + pod.ObjectMeta.Labels[constants.MWK8sServiceName] = obj.Name } } } @@ -372,18 +352,18 @@ func (rw *resourceWatcher) onUpdate(oldObj, newObj interface{}) { rw.logger.Info("labels changed for pod ", zap.String("name", obj.Name), zap.String("namespace", obj.Namespace), zap.Any("oldLabels", oldLabels), zap.Any("newLabels", newLabels)) - fmt.Println("onUpdate labels changed for pod ", obj.Name, obj.Namespace, oldLabels, newLabels) // Get all the svc list and check if the pod labels match with any of the svc selectors foundSvc := false svcList := rw.metadataStore.Get(gvk.Service).List() for _, svcObj := range svcList { svc := svcObj.(*corev1.Service) - if svc.Spec.Selector != nil { + if svc.Spec.Selector != nil && len(svc.Spec.Selector) > 0 { if labels.Set(svc.Spec.Selector).AsSelectorPreValidated().Matches(labels.Set(newLabels)) { // only seting the first match ? - obj.Annotations[constants.MWK8sServiceName] = svc.Name - fmt.Println("onUpdate In pod update service name set for pod ", - obj.Name, obj.Annotations[constants.MWK8sServiceName]) + if obj.ObjectMeta.Labels == nil { + obj.ObjectMeta.Labels = make(map[string]string) + } + obj.ObjectMeta.Labels[constants.MWK8sServiceName] = svc.Name foundSvc = true break } @@ -391,9 +371,9 @@ func (rw *resourceWatcher) onUpdate(oldObj, newObj interface{}) { } if !foundSvc { - _, ok := obj.Annotations[constants.MWK8sServiceName] + _, ok := obj.Labels[constants.MWK8sServiceName] if ok { - delete(obj.Annotations, constants.MWK8sServiceName) + delete(obj.Labels, constants.MWK8sServiceName) } } } @@ -405,25 +385,20 @@ func (rw *resourceWatcher) onUpdate(oldObj, newObj interface{}) { rw.logger.Info("selector changed for service ", zap.String("name", obj.Name), zap.String("namespace", obj.Namespace), zap.Any("oldSelector", oldSelector), zap.Any("newSelector", newSelector)) - fmt.Println("onUpdate selector changed for service ", obj.Name, obj.Namespace, oldSelector, newSelector) - // Get all the pod list and check if the pod labels match with the new svc selectors podList := rw.metadataStore.Get(gvk.Pod).List() for _, podObj := range podList { pod := podObj.(*corev1.Pod) if labels.Set(newSelector).AsSelectorPreValidated().Matches(labels.Set(pod.Labels)) { - if pod.ObjectMeta.Annotations == nil { - pod.ObjectMeta.Annotations = make(map[string]string) + if pod.ObjectMeta.Labels == nil { + pod.ObjectMeta.Labels = make(map[string]string) } - //set the service name in the pod annotations - pod.ObjectMeta.Annotations[constants.MWK8sServiceName] = obj.Name - fmt.Println("onUpdate In service, service name set for pod ", - obj.Name, obj.Annotations[constants.MWK8sServiceName]) - + //set the service name in the pod labes + pod.Labels[constants.MWK8sServiceName] = obj.Name } else { - svcName, ok := obj.Annotations[constants.MWK8sServiceName] + svcName, ok := obj.Labels[constants.MWK8sServiceName] if ok && svcName == obj.Name { - delete(obj.Annotations, constants.MWK8sServiceName) + delete(obj.Labels, constants.MWK8sServiceName) } } } From 3359b6225e2ea2d6403108bc64cf36abdc43c2bb Mon Sep 17 00:00:00 2001 From: Tejas Kokje Date: Thu, 18 Apr 2024 16:19:12 -0700 Subject: [PATCH 3/3] Remove api server calls for service from k8sclusterreceiver --- .../internal/service/service.go | 33 ++++--------------- 1 file changed, 7 insertions(+), 26 deletions(-) diff --git a/receiver/k8sclusterreceiver/internal/service/service.go b/receiver/k8sclusterreceiver/internal/service/service.go index 4a24f5169980..747c2e618217 100644 --- a/receiver/k8sclusterreceiver/internal/service/service.go +++ b/receiver/k8sclusterreceiver/internal/service/service.go @@ -3,13 +3,10 @@ package service // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/service" import ( - "context" "fmt" - "github.com/open-telemetry/opentelemetry-collector-contrib/internal/k8sconfig" "go.opentelemetry.io/collector/pdata/pcommon" corev1 "k8s.io/api/core/v1" - v1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" "k8s.io/client-go/tools/cache" @@ -24,7 +21,10 @@ func Transform(service *corev1.Service) *corev1.Service { return &corev1.Service{ ObjectMeta: metadata.TransformObjectMeta(service.ObjectMeta), Spec: corev1.ServiceSpec{ - Selector: service.Spec.Selector, + Selector: service.Spec.Selector, + ClusterIP: service.Spec.ClusterIP, + Type: service.Spec.Type, + Ports: service.Spec.Ports, }, } } @@ -45,33 +45,14 @@ func GetPodServiceTags(pod *corev1.Pod, services cache.Store) map[string]string } func RecordMetrics(mb *imetadata.MetricsBuilder, svc *corev1.Service, ts pcommon.Timestamp) { - svcDetails := GetServiceDetails(svc) - mb.RecordK8sServicePortCountDataPoint(ts, int64(len(svcDetails.Spec.Ports))) + mb.RecordK8sServicePortCountDataPoint(ts, int64(len(svc.Spec.Ports))) rb := mb.NewResourceBuilder() rb.SetK8sServiceUID(string(svc.UID)) rb.SetK8sServiceName(svc.ObjectMeta.Name) rb.SetK8sServiceNamespace(svc.ObjectMeta.Namespace) - rb.SetK8sServiceClusterIP(svcDetails.Spec.ClusterIP) - rb.SetK8sServiceType(string(svcDetails.Spec.Type)) - rb.SetK8sServiceClusterIP(svcDetails.Spec.ClusterIP) + rb.SetK8sServiceClusterIP(svc.Spec.ClusterIP) + rb.SetK8sServiceType(string(svc.Spec.Type)) rb.SetK8sClusterName("unknown") mb.EmitForResource(metadata.WithResource(rb.Emit())) } - -func GetServiceDetails(svc *corev1.Service) *corev1.Service { - var svcObject *corev1.Service - - client, _ := k8sconfig.MakeClient(k8sconfig.APIConfig{ - AuthType: k8sconfig.AuthTypeServiceAccount, - }) - - service, err := client.CoreV1().Services(svc.ObjectMeta.Namespace).Get(context.TODO(), svc.ObjectMeta.Name, v1.GetOptions{}) - if err != nil { - panic(err) - } else { - svcObject = service - } - - return svcObject -}