Skip to content

Commit

Permalink
Test-cases for ENG-1076 & previous functions
Browse files Browse the repository at this point in the history
  • Loading branch information
sanjoyment committed Jan 5, 2024
1 parent 4cb4cfc commit 2afc846
Show file tree
Hide file tree
Showing 8 changed files with 322 additions and 85 deletions.
42 changes: 14 additions & 28 deletions receiver/k8sclusterreceiver/internal/pod/pods.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"context"
"github.com/open-telemetry/opentelemetry-collector-contrib/internal/k8sconfig"
"k8s.io/apimachinery/pkg/labels"
k8s "k8s.io/client-go/kubernetes"
"strings"
"time"

Expand Down Expand Up @@ -72,7 +73,14 @@ func Transform(pod *corev1.Pod) *corev1.Pod {
}

func RecordMetrics(logger *zap.Logger, mb *metadata.MetricsBuilder, pod *corev1.Pod, ts pcommon.Timestamp) {
jobInfo := getJobInfoForPod(pod)
client, err := k8sconfig.MakeClient(k8sconfig.APIConfig{
AuthType: k8sconfig.AuthTypeServiceAccount,
})
if err != nil {
logger.Error(err.Error())
}

jobInfo := getJobInfoForPod(client, pod)

mb.RecordK8sPodPhaseDataPoint(ts, int64(phaseToInt(pod.Status.Phase)))
mb.RecordK8sPodStatusReasonDataPoint(ts, int64(reasonToInt(pod.Status.Reason)))
Expand All @@ -83,10 +91,10 @@ func RecordMetrics(logger *zap.Logger, mb *metadata.MetricsBuilder, pod *corev1.
rb.SetK8sPodUID(string(pod.UID))
rb.SetK8sPodStartTime(pod.GetCreationTimestamp().String())
rb.SetOpencensusResourcetype("k8s")
rb.SetK8sServiceName(getServiceNameForPod(pod))
rb.SetK8sServiceName(getServiceNameForPod(client, pod))
rb.SetK8sJobName(jobInfo.Name)
rb.SetK8sJobUID(string(jobInfo.UID))
rb.SetK8sServiceAccountName(getServiceAccountNameForPod(pod))
rb.SetK8sServiceAccountName(getServiceAccountNameForPod(client, pod))
rb.SetK8sClusterName("unknown")
mb.EmitForResource(metadata.WithResource(rb.Emit()))

Expand All @@ -95,16 +103,9 @@ func RecordMetrics(logger *zap.Logger, mb *metadata.MetricsBuilder, pod *corev1.
}
}

func getServiceNameForPod(pod *corev1.Pod) string {
func getServiceNameForPod(client k8s.Interface, pod *corev1.Pod) string {
var serviceName string

client, err := k8sconfig.MakeClient(k8sconfig.APIConfig{
AuthType: k8sconfig.AuthTypeServiceAccount,
})
if err != nil {
return ""
}

serviceList, err := client.CoreV1().Services(pod.Namespace).List(context.TODO(), v1.ListOptions{})
if err != nil {
return ""
Expand All @@ -127,14 +128,7 @@ type JobInfo struct {
UID types.UID
}

func getJobInfoForPod(pod *corev1.Pod) JobInfo {
client, err := k8sconfig.MakeClient(k8sconfig.APIConfig{
AuthType: k8sconfig.AuthTypeServiceAccount,
})
if err != nil {
return JobInfo{}
}

func getJobInfoForPod(client k8s.Interface, pod *corev1.Pod) JobInfo {
podSelector := labels.Set(pod.Labels)
jobList, err := client.BatchV1().Jobs(pod.Namespace).List(context.TODO(), v1.ListOptions{
LabelSelector: podSelector.AsSelector().String(),
Expand All @@ -153,17 +147,9 @@ func getJobInfoForPod(pod *corev1.Pod) JobInfo {
return JobInfo{}
}

func getServiceAccountNameForPod(pod *corev1.Pod) string {
func getServiceAccountNameForPod(client k8s.Interface, pod *corev1.Pod) string {
var serviceAccountName string

client, err := k8sconfig.MakeClient(k8sconfig.APIConfig{
AuthType: k8sconfig.AuthTypeServiceAccount,
})
if err != nil {
panic(err)
return ""
}

podDetails, err := client.CoreV1().Pods(pod.Namespace).Get(context.TODO(), pod.Name, v1.GetOptions{})
if err != nil {
return ""
Expand Down
102 changes: 102 additions & 0 deletions receiver/k8sclusterreceiver/internal/pod/pods_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
package pod

import (
"context"
"fmt"
"path/filepath"
"strings"
Expand All @@ -21,6 +22,7 @@ import (
"k8s.io/apimachinery/pkg/api/resource"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/kubernetes/fake"

"github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal/golden"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/experimentalmetricmetadata"
Expand Down Expand Up @@ -474,3 +476,103 @@ func TestTransform(t *testing.T) {
}
assert.Equal(t, wantPod, Transform(originalPod))
}

func TestGetServiceNameForPod(t *testing.T) {
// Create a fake Kubernetes client
client := fake.NewSimpleClientset()

// Create a Pod with labels
pod := testutils.NewPodWithContainer(
"1",
testutils.NewPodSpecWithContainer("container-name"),
testutils.NewPodStatusWithContainer("container-name", containerIDWithPreifx("container-id")),
)

// Create a Service with the same labels as the Pod
service := &corev1.Service{
ObjectMeta: v1.ObjectMeta{
Name: "test-service",
Namespace: "test-namespace",
},
Spec: corev1.ServiceSpec{
Selector: map[string]string{
"foo": "bar",
"foo1": "",
},
},
}

// Create the Pod and Service in the fake client
_, err := client.CoreV1().Pods(pod.Namespace).Create(context.TODO(), pod, v1.CreateOptions{})
assert.NoError(t, err)

_, err = client.CoreV1().Services(service.Namespace).Create(context.TODO(), service, v1.CreateOptions{})
assert.NoError(t, err)

// Call the function
serviceName := getServiceNameForPod(client, pod)

// Verify the result
expectedServiceName := "test-service"

assert.Equal(t, expectedServiceName, serviceName)
}

func TestGetServiceAccountNameForPod(t *testing.T) {
// Create a fake Kubernetes client
client := fake.NewSimpleClientset()

// Create a Pod with labels
pod := testutils.NewPodWithContainer(
"1",
&corev1.PodSpec{
ServiceAccountName: "test-service-account",
},
testutils.NewPodStatusWithContainer("container-name", containerIDWithPreifx("container-id")),
)

// Create the Pod in the fake client
_, err := client.CoreV1().Pods(pod.Namespace).Create(context.TODO(), pod, v1.CreateOptions{})
assert.NoError(t, err)

// Call the function
serviceAccountName := getServiceAccountNameForPod(client, pod)

// Verify the result
expectedServiceAccountName := "test-service-account"

assert.Equal(t, expectedServiceAccountName, serviceAccountName)
}

func TestGetJobInfoForPod(t *testing.T) {
// Create a fake Kubernetes client
client := fake.NewSimpleClientset()

// Create a Pod with labels
pod := testutils.NewPodWithContainer(
"1",
testutils.NewPodSpecWithContainer("container-name"),
testutils.NewPodStatusWithContainer("container-name", containerIDWithPreifx("container-id")),
)

// Create a Job with the same labels as the Pod
job := testutils.NewJob("1")

// Create the Pod and Job in the fake client
_, err := client.CoreV1().Pods(pod.Namespace).Create(context.TODO(), pod, v1.CreateOptions{})
assert.NoError(t, err)

_, err = client.BatchV1().Jobs(job.Namespace).Create(context.TODO(), job, v1.CreateOptions{})
assert.NoError(t, err)

// Call the function
jobInfo := getJobInfoForPod(client, pod)

// Verify the result
expectedJobInfo := JobInfo{
Name: "test-job-1",
UID: job.UID,
}

assert.Equal(t, expectedJobInfo, jobInfo)
}
43 changes: 18 additions & 25 deletions receiver/kubeletstatsreceiver/internal/kubelet/accumulator.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
package kubelet // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/kubeletstatsreceiver/internal/kubelet"

import (
"log"
k8s "k8s.io/client-go/kubernetes"
"time"

"github.com/open-telemetry/opentelemetry-collector-contrib/internal/k8sconfig"
Expand Down Expand Up @@ -78,7 +78,7 @@ func (a *metricDataAccumulator) nodeStats(s stats.NodeStats) {
func (a *metricDataAccumulator) getNodeUID(nodeName string) string {
uid, err := a.metadata.getNodeUID(nodeName)
if err != nil {
log.Println(err.Error())
a.logger.Error(err.Error())
return ""
}
return uid
Expand All @@ -96,8 +96,15 @@ func (a *metricDataAccumulator) podStats(s stats.PodStats) {
addFilesystemMetrics(a.mbs.PodMetricsBuilder, metadata.PodFilesystemMetrics, s.EphemeralStorage, currentTime)
addNetworkMetrics(a.mbs.PodMetricsBuilder, metadata.PodNetworkMetrics, s.Network, currentTime)

serviceName := a.getServiceName(s.PodRef.UID)
jobInfo := a.getJobInfo(s.PodRef.UID)
client, err := k8sconfig.MakeClient(k8sconfig.APIConfig{
AuthType: k8sconfig.AuthTypeServiceAccount,
})
if err != nil {
a.logger.Error(err.Error())
}

serviceName := a.getServiceName(client, s.PodRef.UID)
jobInfo := a.getJobInfo(client, s.PodRef.UID)
serviceAccountName := a.getServiceAccountName(s.PodRef.UID)

rb := a.mbs.PodMetricsBuilder.NewResourceBuilder()
Expand All @@ -117,34 +124,20 @@ func (a *metricDataAccumulator) podStats(s stats.PodStats) {
}

// getch k8s service name from metadata
func (a *metricDataAccumulator) getServiceName(podUID string) string {
k8sAPIClient, err := k8sconfig.MakeClient(k8sconfig.APIConfig{
AuthType: k8sconfig.AuthTypeServiceAccount,
})
func (a *metricDataAccumulator) getServiceName(client k8s.Interface, podUID string) string {
name, err := a.metadata.getServiceName(client, podUID)
if err != nil {
return ""
}

name, err := a.metadata.getServiceName(podUID, k8sAPIClient)
if err != nil {
log.Println(err.Error())
a.logger.Error(err.Error())
return ""
}
return name
}

// getch k8s job uid from metadata
func (a *metricDataAccumulator) getJobInfo(podUID string) JobInfo {
k8sAPIClient, err := k8sconfig.MakeClient(k8sconfig.APIConfig{
AuthType: k8sconfig.AuthTypeServiceAccount,
})
if err != nil {
return JobInfo{}
}

jobInfo, err := a.metadata.getJobInfo(podUID, k8sAPIClient)
func (a *metricDataAccumulator) getJobInfo(client k8s.Interface, podUID string) JobInfo {
jobInfo, err := a.metadata.getJobInfo(client, podUID)
if err != nil {
log.Println(err.Error())
a.logger.Error(err.Error())
return JobInfo{}
}
return jobInfo
Expand All @@ -154,7 +147,7 @@ func (a *metricDataAccumulator) getJobInfo(podUID string) JobInfo {
func (a *metricDataAccumulator) getServiceAccountName(podUID string) string {
name, err := a.metadata.getServiceAccountName(podUID)
if err != nil {
log.Println(err.Error())
a.logger.Error(err.Error())
return ""
}
return name
Expand Down
Loading

0 comments on commit 2afc846

Please sign in to comment.