Skip to content

Commit

Permalink
[chore] Kubernetes receiver improvements (#52)
Browse files Browse the repository at this point in the history
* Avoid api server calls, use metadatastore

* change pod transformer to preserve serviceAccountName

* Remove api server calls for service from k8sclusterreceiver
  • Loading branch information
tejaskokje-mw authored and bhogayatakb committed Dec 4, 2024
1 parent 711d2f0 commit 2617d6e
Show file tree
Hide file tree
Showing 6 changed files with 126 additions and 201 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,7 @@ func (dc *DataCollector) CollectMetricData(currentTime time.Time) pmetric.Metric
dc.metadataStore.ForEach(gvk.StatefulSet, func(o any) {
statefulset.RecordMetrics(dc.metricsBuilder, o.(*appsv1.StatefulSet), ts)
})

dc.metadataStore.ForEach(gvk.Job, func(o any) {
jobs.RecordMetrics(dc.metricsBuilder, o.(*batchv1.Job), ts)
})
Expand Down
5 changes: 5 additions & 0 deletions receiver/k8sclusterreceiver/internal/constants/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,3 +37,8 @@ const (

K8sServicePrefix = "k8s.service."
)

// Middleware.io constants
const (
MWK8sServiceName = "middleware.io/k8s.service.name"
)
91 changes: 18 additions & 73 deletions receiver/k8sclusterreceiver/internal/pod/pods.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,6 @@
package pod // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/pod"

import (
"context"
"github.com/open-telemetry/opentelemetry-collector-contrib/internal/k8sconfig"
"k8s.io/apimachinery/pkg/labels"
k8s "k8s.io/client-go/kubernetes"
"strings"
"time"

Expand Down Expand Up @@ -72,18 +68,18 @@ func Transform(pod *corev1.Pod) *corev1.Pod {
},
})
}
newPod.Spec.ServiceAccountName = pod.Spec.ServiceAccountName
return newPod
}

func RecordMetrics(logger *zap.Logger, mb *metadata.MetricsBuilder, pod *corev1.Pod, ts pcommon.Timestamp) {
client, err := k8sconfig.MakeClient(k8sconfig.APIConfig{
AuthType: k8sconfig.AuthTypeServiceAccount,
})
if err != nil {
logger.Error(err.Error())
}

jobInfo := getJobInfoForPod(client, pod)
var jobName, jobUID string
ownerReference := utils.FindOwnerWithKind(pod.OwnerReferences, constants.K8sKindJob)
if ownerReference != nil && ownerReference.Kind == constants.K8sKindJob {
jobName = ownerReference.Name
jobUID = string(ownerReference.UID)
}

mb.RecordK8sPodPhaseDataPoint(ts, int64(phaseToInt(pod.Status.Phase)))
mb.RecordK8sPodStatusReasonDataPoint(ts, int64(reasonToInt(pod.Status.Reason)))
Expand All @@ -92,13 +88,18 @@ func RecordMetrics(logger *zap.Logger, mb *metadata.MetricsBuilder, pod *corev1.
rb.SetK8sNodeName(pod.Spec.NodeName)
rb.SetK8sPodName(pod.Name)
rb.SetK8sPodUID(string(pod.UID))
rb.SetK8sPodStartTime(pod.GetCreationTimestamp().String())
rb.SetK8sPodQosClass(string(pod.Status.QOSClass))
rb.SetK8sServiceName(getServiceNameForPod(client, pod))
rb.SetK8sPodStartTime(pod.CreationTimestamp.String())
rb.SetOpencensusResourcetype("k8s")

svcName, ok := pod.Labels[constants.MWK8sServiceName]
if ok {
rb.SetK8sServiceName(svcName)
}

rb.SetK8sPodQosClass(string(pod.Status.QOSClass))
rb.SetK8sJobName(jobInfo.Name)
rb.SetK8sJobUID(string(jobInfo.UID))
rb.SetK8sPodStartTime(pod.GetCreationTimestamp().String())
rb.SetK8sServiceAccountName(getServiceAccountNameForPod(client, pod))
rb.SetK8sJobUID(string(jobInfo.UID))
rb.SetK8sServiceAccountName(pod.Spec.ServiceAccountName)
rb.SetK8sClusterName("unknown")
mb.EmitForResource(metadata.WithResource(rb.Emit()))

Expand All @@ -107,62 +108,6 @@ func RecordMetrics(logger *zap.Logger, mb *metadata.MetricsBuilder, pod *corev1.
}
}

func getServiceNameForPod(client k8s.Interface, pod *corev1.Pod) string {
var serviceName string

serviceList, err := client.CoreV1().Services(pod.Namespace).List(context.TODO(), v1.ListOptions{})
if err != nil {
return ""
}

for _, svc := range serviceList.Items {
if svc.Spec.Selector != nil {
if labels.Set(svc.Spec.Selector).AsSelectorPreValidated().Matches(labels.Set(pod.Labels)) {
serviceName = svc.Name
return serviceName
}
}
}

return ""
}

type JobInfo struct {
Name string
UID types.UID
}

func getJobInfoForPod(client k8s.Interface, pod *corev1.Pod) JobInfo {
podSelector := labels.Set(pod.Labels)
jobList, err := client.BatchV1().Jobs(pod.Namespace).List(context.TODO(), v1.ListOptions{
LabelSelector: podSelector.AsSelector().String(),
})
if err != nil {
return JobInfo{}
}

if len(jobList.Items) > 0 {
return JobInfo{
Name: jobList.Items[0].Name,
UID: jobList.Items[0].UID,
}
}

return JobInfo{}
}

func getServiceAccountNameForPod(client k8s.Interface, pod *corev1.Pod) string {
var serviceAccountName string

podDetails, err := client.CoreV1().Pods(pod.Namespace).Get(context.TODO(), pod.Name, v1.GetOptions{})
if err != nil {
return ""
}

serviceAccountName = podDetails.Spec.ServiceAccountName
return serviceAccountName
}

func reasonToInt(reason string) int32 {
switch reason {
case "Evicted":
Expand Down
104 changes: 2 additions & 102 deletions receiver/k8sclusterreceiver/internal/pod/pods_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
package pod

import (
"context"
"fmt"
"path/filepath"
"strings"
Expand All @@ -22,7 +21,6 @@ import (
"k8s.io/apimachinery/pkg/api/resource"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/kubernetes/fake"

"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/experimentalmetricmetadata"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/golden"
Expand Down Expand Up @@ -414,6 +412,7 @@ func TestTransform(t *testing.T) {
},
},
},
ServiceAccountName: "my-service-account",
},
Status: corev1.PodStatus{
Phase: corev1.PodRunning,
Expand Down Expand Up @@ -463,6 +462,7 @@ func TestTransform(t *testing.T) {
},
},
},
ServiceAccountName: "my-service-account",
},
Status: corev1.PodStatus{
Phase: corev1.PodRunning,
Expand All @@ -480,103 +480,3 @@ func TestTransform(t *testing.T) {
}
assert.Equal(t, wantPod, Transform(originalPod))
}

func TestGetServiceNameForPod(t *testing.T) {
// Create a fake Kubernetes client
client := fake.NewSimpleClientset()

// Create a Pod with labels
pod := testutils.NewPodWithContainer(
"1",
testutils.NewPodSpecWithContainer("container-name"),
testutils.NewPodStatusWithContainer("container-name", containerIDWithPreifx("container-id")),
)

// Create a Service with the same labels as the Pod
service := &corev1.Service{
ObjectMeta: v1.ObjectMeta{
Name: "test-service",
Namespace: "test-namespace",
},
Spec: corev1.ServiceSpec{
Selector: map[string]string{
"foo": "bar",
"foo1": "",
},
},
}

// Create the Pod and Service in the fake client
_, err := client.CoreV1().Pods(pod.Namespace).Create(context.TODO(), pod, v1.CreateOptions{})
assert.NoError(t, err)

_, err = client.CoreV1().Services(service.Namespace).Create(context.TODO(), service, v1.CreateOptions{})
assert.NoError(t, err)

// Call the function
serviceName := getServiceNameForPod(client, pod)

// Verify the result
expectedServiceName := "test-service"

assert.Equal(t, expectedServiceName, serviceName)
}

func TestGetServiceAccountNameForPod(t *testing.T) {
// Create a fake Kubernetes client
client := fake.NewSimpleClientset()

// Create a Pod with labels
pod := testutils.NewPodWithContainer(
"1",
&corev1.PodSpec{
ServiceAccountName: "test-service-account",
},
testutils.NewPodStatusWithContainer("container-name", containerIDWithPreifx("container-id")),
)

// Create the Pod in the fake client
_, err := client.CoreV1().Pods(pod.Namespace).Create(context.TODO(), pod, v1.CreateOptions{})
assert.NoError(t, err)

// Call the function
serviceAccountName := getServiceAccountNameForPod(client, pod)

// Verify the result
expectedServiceAccountName := "test-service-account"

assert.Equal(t, expectedServiceAccountName, serviceAccountName)
}

func TestGetJobInfoForPod(t *testing.T) {
// Create a fake Kubernetes client
client := fake.NewSimpleClientset()

// Create a Pod with labels
pod := testutils.NewPodWithContainer(
"1",
testutils.NewPodSpecWithContainer("container-name"),
testutils.NewPodStatusWithContainer("container-name", containerIDWithPreifx("container-id")),
)

// Create a Job with the same labels as the Pod
job := testutils.NewJob("1")

// Create the Pod and Job in the fake client
_, err := client.CoreV1().Pods(pod.Namespace).Create(context.TODO(), pod, v1.CreateOptions{})
assert.NoError(t, err)

_, err = client.BatchV1().Jobs(job.Namespace).Create(context.TODO(), job, v1.CreateOptions{})
assert.NoError(t, err)

// Call the function
jobInfo := getJobInfoForPod(client, pod)

// Verify the result
expectedJobInfo := JobInfo{
Name: "test-job-1",
UID: job.UID,
}

assert.Equal(t, expectedJobInfo, jobInfo)
}
34 changes: 8 additions & 26 deletions receiver/k8sclusterreceiver/internal/service/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,10 @@

package service // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/service"
import (
"context"
"fmt"
"github.com/open-telemetry/opentelemetry-collector-contrib/internal/k8sconfig"

"go.opentelemetry.io/collector/pdata/pcommon"
corev1 "k8s.io/api/core/v1"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/client-go/tools/cache"

Expand All @@ -23,7 +21,10 @@ func Transform(service *corev1.Service) *corev1.Service {
return &corev1.Service{
ObjectMeta: metadata.TransformObjectMeta(service.ObjectMeta),
Spec: corev1.ServiceSpec{
Selector: service.Spec.Selector,
Selector: service.Spec.Selector,
ClusterIP: service.Spec.ClusterIP,
Type: service.Spec.Type,
Ports: service.Spec.Ports,
},
}
}
Expand All @@ -44,33 +45,14 @@ func GetPodServiceTags(pod *corev1.Pod, services cache.Store) map[string]string
}

func RecordMetrics(mb *imetadata.MetricsBuilder, svc *corev1.Service, ts pcommon.Timestamp) {
svcDetails := GetServiceDetails(svc)
mb.RecordK8sServicePortCountDataPoint(ts, int64(len(svcDetails.Spec.Ports)))
mb.RecordK8sServicePortCountDataPoint(ts, int64(len(svc.Spec.Ports)))

rb := mb.NewResourceBuilder()
rb.SetK8sServiceUID(string(svc.UID))
rb.SetK8sServiceName(svc.ObjectMeta.Name)
rb.SetK8sServiceNamespace(svc.ObjectMeta.Namespace)
rb.SetK8sServiceClusterIP(svcDetails.Spec.ClusterIP)
rb.SetK8sServiceType(string(svcDetails.Spec.Type))
rb.SetK8sServiceClusterIP(svcDetails.Spec.ClusterIP)
rb.SetK8sServiceClusterIP(svc.Spec.ClusterIP)
rb.SetK8sServiceType(string(svc.Spec.Type))
rb.SetK8sClusterName("unknown")
mb.EmitForResource(metadata.WithResource(rb.Emit()))
}

func GetServiceDetails(svc *corev1.Service) *corev1.Service {
var svcObject *corev1.Service

client, _ := k8sconfig.MakeClient(k8sconfig.APIConfig{
AuthType: k8sconfig.AuthTypeServiceAccount,
})

service, err := client.CoreV1().Services(svc.ObjectMeta.Namespace).Get(context.TODO(), svc.ObjectMeta.Name, v1.GetOptions{})
if err != nil {
panic(err)
} else {
svcObject = service
}

return svcObject
}
Loading

0 comments on commit 2617d6e

Please sign in to comment.