Skip to content

Commit

Permalink
ENG-1067: Added k8s.pod.filesystem.utilization, k8s.node.filesystem.u…
Browse files Browse the repository at this point in the history
…tilization & container.filesystem.utilization metrics; ENG-1076: Added k8s.job.uid & k8s.job.name for job <> pod mapping.
  • Loading branch information
sanjoyment committed Jan 4, 2024
1 parent 2cff773 commit 4cb4cfc
Show file tree
Hide file tree
Showing 16 changed files with 477 additions and 19 deletions.
35 changes: 35 additions & 0 deletions receiver/k8sclusterreceiver/internal/pod/pods.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,8 @@ func Transform(pod *corev1.Pod) *corev1.Pod {
}

func RecordMetrics(logger *zap.Logger, mb *metadata.MetricsBuilder, pod *corev1.Pod, ts pcommon.Timestamp) {
jobInfo := getJobInfoForPod(pod)

mb.RecordK8sPodPhaseDataPoint(ts, int64(phaseToInt(pod.Status.Phase)))
mb.RecordK8sPodStatusReasonDataPoint(ts, int64(reasonToInt(pod.Status.Reason)))
rb := mb.NewResourceBuilder()
Expand All @@ -82,6 +84,8 @@ func RecordMetrics(logger *zap.Logger, mb *metadata.MetricsBuilder, pod *corev1.
rb.SetK8sPodStartTime(pod.GetCreationTimestamp().String())
rb.SetOpencensusResourcetype("k8s")
rb.SetK8sServiceName(getServiceNameForPod(pod))
rb.SetK8sJobName(jobInfo.Name)
rb.SetK8sJobUID(string(jobInfo.UID))
rb.SetK8sServiceAccountName(getServiceAccountNameForPod(pod))
rb.SetK8sClusterName("unknown")
mb.EmitForResource(metadata.WithResource(rb.Emit()))
Expand Down Expand Up @@ -118,6 +122,37 @@ func getServiceNameForPod(pod *corev1.Pod) string {
return ""
}

type JobInfo struct {
Name string
UID types.UID
}

func getJobInfoForPod(pod *corev1.Pod) JobInfo {
client, err := k8sconfig.MakeClient(k8sconfig.APIConfig{
AuthType: k8sconfig.AuthTypeServiceAccount,
})
if err != nil {
return JobInfo{}
}

podSelector := labels.Set(pod.Labels)
jobList, err := client.BatchV1().Jobs(pod.Namespace).List(context.TODO(), v1.ListOptions{
LabelSelector: podSelector.AsSelector().String(),
})
if err != nil {
return JobInfo{}
}

if len(jobList.Items) > 0 {
return JobInfo{
Name: jobList.Items[0].Name,
UID: jobList.Items[0].UID,
}
}

return JobInfo{}
}

func getServiceAccountNameForPod(pod *corev1.Pod) string {
var serviceAccountName string

Expand Down
2 changes: 0 additions & 2 deletions receiver/k8sclusterreceiver/metadata.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,6 @@ resource_attributes:
type: string
enabled: true



k8s.namespace.uid:
description: The k8s namespace uid.
type: string
Expand Down
26 changes: 26 additions & 0 deletions receiver/kubeletstatsreceiver/documentation.md
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,14 @@ Container filesystem usage
| ---- | ----------- | ---------- |
| By | Gauge | Int |
### container.filesystem.utilization
Container filesystem utilization
| Unit | Metric Type | Value Type |
| ---- | ----------- | ---------- |
| 1 | Gauge | Double |
### container.memory.available
Container memory available
Expand Down Expand Up @@ -140,6 +148,14 @@ Node filesystem usage
| ---- | ----------- | ---------- |
| By | Gauge | Int |
### k8s.node.filesystem.utilization
Node filesystem utilization
| Unit | Metric Type | Value Type |
| ---- | ----------- | ---------- |
| 1 | Gauge | Double |
### k8s.node.memory.available
Node memory available
Expand Down Expand Up @@ -258,6 +274,14 @@ Pod filesystem usage
| ---- | ----------- | ---------- |
| By | Gauge | Int |
### k8s.pod.filesystem.utilization
Pod filesystem utilization
| Unit | Metric Type | Value Type |
| ---- | ----------- | ---------- |
| 1 | Gauge | Double |
### k8s.pod.memory.available
Pod memory available
Expand Down Expand Up @@ -422,6 +446,8 @@ The time since the pod started
| glusterfs.path | Glusterfs volume path | Any Str | true |
| k8s.cluster.name | The name of the Cluster | Any Str | true |
| k8s.container.name | Container name used by container runtime | Any Str | true |
| k8s.job.name | The name of the Job | Any Str | true |
| k8s.job.uid | The UID of the Job | Any Str | true |
| k8s.namespace.name | The name of the namespace that the pod is running in | Any Str | true |
| k8s.node.name | The name of the Node | Any Str | true |
| k8s.node.start_time | The start time of the Node. | Any Str | true |
Expand Down
20 changes: 20 additions & 0 deletions receiver/kubeletstatsreceiver/internal/kubelet/accumulator.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ func (a *metricDataAccumulator) podStats(s stats.PodStats) {
addNetworkMetrics(a.mbs.PodMetricsBuilder, metadata.PodNetworkMetrics, s.Network, currentTime)

serviceName := a.getServiceName(s.PodRef.UID)
jobInfo := a.getJobInfo(s.PodRef.UID)
serviceAccountName := a.getServiceAccountName(s.PodRef.UID)

rb := a.mbs.PodMetricsBuilder.NewResourceBuilder()
Expand All @@ -105,6 +106,8 @@ func (a *metricDataAccumulator) podStats(s stats.PodStats) {
rb.SetK8sPodStartTime(s.StartTime.Time.String())
rb.SetK8sNamespaceName(s.PodRef.Namespace)
rb.SetK8sServiceName(serviceName)
rb.SetK8sJobUID(string(jobInfo.UID))
rb.SetK8sJobName(jobInfo.Name)
rb.SetK8sServiceAccountName(serviceAccountName)
rb.SetK8sClusterName("unknown")
a.m = append(a.m, a.mbs.PodMetricsBuilder.Emit(
Expand All @@ -130,6 +133,23 @@ func (a *metricDataAccumulator) getServiceName(podUID string) string {
return name
}

// getch k8s job uid from metadata
func (a *metricDataAccumulator) getJobInfo(podUID string) JobInfo {
k8sAPIClient, err := k8sconfig.MakeClient(k8sconfig.APIConfig{
AuthType: k8sconfig.AuthTypeServiceAccount,
})
if err != nil {
return JobInfo{}
}

jobInfo, err := a.metadata.getJobInfo(podUID, k8sAPIClient)
if err != nil {
log.Println(err.Error())
return JobInfo{}
}
return jobInfo
}

// getch k8s service account name from metadata
func (a *metricDataAccumulator) getServiceAccountName(podUID string) string {
name, err := a.metadata.getServiceAccountName(podUID)
Expand Down
10 changes: 7 additions & 3 deletions receiver/kubeletstatsreceiver/internal/kubelet/fs.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,22 @@
package kubelet // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/kubeletstatsreceiver/internal/kubelet"

import (
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/kubeletstatsreceiver/internal/metadata"
"go.opentelemetry.io/collector/pdata/pcommon"
stats "k8s.io/kubelet/pkg/apis/stats/v1alpha1"

"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/kubeletstatsreceiver/internal/metadata"
)

func calculateUtilization(usedBytes, capacityBytes *uint64) float64 {
return float64(*usedBytes) / float64(*capacityBytes) * 100
}

func addFilesystemMetrics(mb *metadata.MetricsBuilder, filesystemMetrics metadata.FilesystemMetrics, s *stats.FsStats, currentTime pcommon.Timestamp) {
if s == nil {
return
}

utilization := calculateUtilization(s.UsedBytes, s.CapacityBytes)
recordIntDataPoint(mb, filesystemMetrics.Available, s.AvailableBytes, currentTime)
recordIntDataPoint(mb, filesystemMetrics.Capacity, s.CapacityBytes, currentTime)
recordIntDataPoint(mb, filesystemMetrics.Usage, s.UsedBytes, currentTime)
recordDoubleDataPoint(mb, filesystemMetrics.Utilization, &utilization, currentTime)
}
34 changes: 34 additions & 0 deletions receiver/kubeletstatsreceiver/internal/kubelet/metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,40 @@ func (m *Metadata) getServiceName(podUID string, client k8s.Interface) (string,
return "", nil
}

type JobInfo struct {
Name string
UID types.UID
}

// getJobInfo retrieves k8s.job.name & k8s.job.uid from metadata for given pod uid,
// returns an error if no job found in the metadata that matches the requirements.
func (m *Metadata) getJobInfo(podUID string, client k8s.Interface) (JobInfo, error) {
if m.PodsMetadata == nil {
return JobInfo{}, errors.New("pods metadata were not fetched")
}
uid := types.UID(podUID)
for _, pod := range m.PodsMetadata.Items {
if pod.UID == uid {
podSelector := labels.Set(pod.Labels)
jobList, err := client.BatchV1().Jobs(pod.Namespace).List(context.TODO(), v_one.ListOptions{
LabelSelector: podSelector.AsSelector().String(),
})
if err != nil {
return JobInfo{}, fmt.Errorf("failed to fetch job list for POD: %w", err)
}

if len(jobList.Items) > 0 {
return JobInfo{
Name: jobList.Items[0].Name,
UID: jobList.Items[0].UID,
}, nil
}

}
}
return JobInfo{}, nil
}

// getServiceAccountName retrieves k8s.service_account.name from metadata for given pod uid,
// returns an error if no service account found in the metadata that matches the requirements.
func (m *Metadata) getServiceAccountName(podUID string) (string, error) {
Expand Down
7 changes: 7 additions & 0 deletions receiver/kubeletstatsreceiver/internal/kubelet/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,3 +15,10 @@ func recordIntDataPoint(mb *metadata.MetricsBuilder, recordDataPoint metadata.Re
}
recordDataPoint(mb, currentTime, int64(*value))
}

func recordDoubleDataPoint(mb *metadata.MetricsBuilder, recordDataPoint metadata.RecordDoubleDataPointFunc, value *float64, currentTime pcommon.Timestamp) {
if value == nil {
return
}
recordDataPoint(mb, currentTime, *value)
}

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 4cb4cfc

Please sign in to comment.