diff --git a/.chloggen/k8sclusterreceiver-improve-memory-utilization.yaml b/.chloggen/k8sclusterreceiver-improve-memory-utilization.yaml new file mode 100755 index 000000000000..697ee4a5b833 --- /dev/null +++ b/.chloggen/k8sclusterreceiver-improve-memory-utilization.yaml @@ -0,0 +1,17 @@ +# Use this changelog template to create an entry for release notes. +# If your change doesn't affect end users, such as a test fix or a tooling change, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: k8sclusterreceiver + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Reduce memory utilization + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [24769] + +change_logs: [user] diff --git a/receiver/k8sclusterreceiver/internal/clusterresourcequota/clusterresourcequotas.go b/receiver/k8sclusterreceiver/internal/clusterresourcequota/clusterresourcequotas.go index 7bae4aa063a7..f2d90593f064 100644 --- a/receiver/k8sclusterreceiver/internal/clusterresourcequota/clusterresourcequotas.go +++ b/receiver/k8sclusterreceiver/internal/clusterresourcequota/clusterresourcequotas.go @@ -5,49 +5,43 @@ package clusterresourcequota // import "github.com/open-telemetry/opentelemetry- import ( "strings" - "time" quotav1 "github.com/openshift/api/quota/v1" "go.opentelemetry.io/collector/pdata/pcommon" - "go.opentelemetry.io/collector/pdata/pmetric" - "go.opentelemetry.io/collector/receiver" v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/resource" - imetadataphase "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/metadata" + "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/metadata" ) -func GetMetrics(set receiver.CreateSettings, metricsBuilderConfig imetadataphase.MetricsBuilderConfig, crq *quotav1.ClusterResourceQuota) pmetric.Metrics { - mbphase := imetadataphase.NewMetricsBuilder(metricsBuilderConfig, set) - ts := pcommon.NewTimestampFromTime(time.Now()) - +func RecordMetrics(mb *metadata.MetricsBuilder, crq *quotav1.ClusterResourceQuota, ts pcommon.Timestamp) { for k, v := range crq.Status.Total.Hard { val := extractValue(k, v) - mbphase.RecordOpenshiftClusterquotaLimitDataPoint(ts, val, string(k)) + mb.RecordOpenshiftClusterquotaLimitDataPoint(ts, val, string(k)) } for k, v := range crq.Status.Total.Used { val := extractValue(k, v) - mbphase.RecordOpenshiftClusterquotaUsedDataPoint(ts, val, string(k)) + mb.RecordOpenshiftClusterquotaUsedDataPoint(ts, val, string(k)) } for _, ns := range crq.Status.Namespaces { for k, v := range ns.Status.Hard { val := extractValue(k, v) - mbphase.RecordOpenshiftAppliedclusterquotaLimitDataPoint(ts, val, ns.Namespace, string(k)) + mb.RecordOpenshiftAppliedclusterquotaLimitDataPoint(ts, val, ns.Namespace, string(k)) } for k, v := range ns.Status.Used { val := extractValue(k, v) - mbphase.RecordOpenshiftAppliedclusterquotaUsedDataPoint(ts, val, ns.Namespace, string(k)) + mb.RecordOpenshiftAppliedclusterquotaUsedDataPoint(ts, val, ns.Namespace, string(k)) } } - rb := imetadataphase.NewResourceBuilder(metricsBuilderConfig.ResourceAttributes) + rb := mb.NewResourceBuilder() 
rb.SetOpenshiftClusterquotaName(crq.Name) rb.SetOpenshiftClusterquotaUID(string(crq.UID)) rb.SetOpencensusResourcetype("k8s") - return mbphase.Emit(imetadataphase.WithResource(rb.Emit())) + mb.EmitForResource(metadata.WithResource(rb.Emit())) } func extractValue(k v1.ResourceName, v resource.Quantity) int64 { diff --git a/receiver/k8sclusterreceiver/internal/clusterresourcequota/clusterresourcequotas_test.go b/receiver/k8sclusterreceiver/internal/clusterresourcequota/clusterresourcequotas_test.go index 84f59c8cbabd..6c3fd9d6b72a 100644 --- a/receiver/k8sclusterreceiver/internal/clusterresourcequota/clusterresourcequotas_test.go +++ b/receiver/k8sclusterreceiver/internal/clusterresourcequota/clusterresourcequotas_test.go @@ -6,8 +6,10 @@ package clusterresourcequota import ( "path/filepath" "testing" + "time" "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/receiver/receivertest" "github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal/golden" @@ -19,7 +21,10 @@ import ( func TestClusterRequestQuotaMetrics(t *testing.T) { crq := testutils.NewClusterResourceQuota("1") - m := GetMetrics(receivertest.NewNopCreateSettings(), metadata.DefaultMetricsBuilderConfig(), crq) + ts := pcommon.Timestamp(time.Now().UnixNano()) + mb := metadata.NewMetricsBuilder(metadata.DefaultMetricsBuilderConfig(), receivertest.NewNopCreateSettings()) + RecordMetrics(mb, crq, ts) + m := mb.Emit() expected, err := golden.ReadMetrics(filepath.Join("testdata", "expected.yaml")) require.NoError(t, err) diff --git a/receiver/k8sclusterreceiver/internal/collection/collector.go b/receiver/k8sclusterreceiver/internal/collection/collector.go index e6768d40adcb..aa4685286f5f 100644 --- a/receiver/k8sclusterreceiver/internal/collection/collector.go +++ b/receiver/k8sclusterreceiver/internal/collection/collector.go @@ -4,29 +4,24 @@ package collection // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/collection" import ( - "reflect" "time" quotav1 "github.com/openshift/api/quota/v1" + "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/pdata/pmetric" "go.opentelemetry.io/collector/receiver" - "go.uber.org/zap" appsv1 "k8s.io/api/apps/v1" autoscalingv2 "k8s.io/api/autoscaling/v2" autoscalingv2beta2 "k8s.io/api/autoscaling/v2beta2" batchv1 "k8s.io/api/batch/v1" batchv1beta1 "k8s.io/api/batch/v1beta1" corev1 "k8s.io/api/core/v1" - "k8s.io/apimachinery/pkg/runtime" - "k8s.io/apimachinery/pkg/runtime/schema" - "k8s.io/apimachinery/pkg/types" - "k8s.io/client-go/tools/cache" - "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/experimentalmetricmetadata" "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/clusterresourcequota" "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/cronjob" "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/demonset" "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/deployment" + "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/gvk" "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/hpa" "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/jobs" 
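The clusterresourcequota change above is the template applied to every object type in this patch: GetMetrics, which built a fresh MetricsBuilder per object and returned a standalone pmetric.Metrics, becomes RecordMetrics, which writes data points into a MetricsBuilder owned by the caller and attaches them to a resource with EmitForResource, using a single timestamp supplied per scrape. Below is a minimal sketch of why that shape saves memory, using toy stand-ins (toyBuilder and recordActiveJobs are illustrative, not the generated internal/metadata API): one shared builder accumulates all data points and is emitted once, so no per-object pmetric.Metrics has to be kept around.

// Sketch only: toy stand-ins for the mdatagen-generated MetricsBuilder,
// illustrating the record-into-shared-builder pattern used by this PR.
package main

import (
	"fmt"
	"time"

	"go.opentelemetry.io/collector/pdata/pcommon"
	"go.opentelemetry.io/collector/pdata/pmetric"
)

// toyBuilder accumulates data points for the current scrape and emits them once.
type toyBuilder struct {
	metrics pmetric.Metrics
}

func newToyBuilder() *toyBuilder { return &toyBuilder{metrics: pmetric.NewMetrics()} }

// recordActiveJobs appends a single gauge point, analogous to a Record*DataPoint call.
func (b *toyBuilder) recordActiveJobs(ts pcommon.Timestamp, val int64, name string) {
	rm := b.metrics.ResourceMetrics().AppendEmpty()
	rm.Resource().Attributes().PutStr("k8s.cronjob.name", name)
	m := rm.ScopeMetrics().AppendEmpty().Metrics().AppendEmpty()
	m.SetName("k8s.cronjob.active_jobs")
	dp := m.SetEmptyGauge().DataPoints().AppendEmpty()
	dp.SetTimestamp(ts)
	dp.SetIntValue(val)
}

// emit hands the accumulated batch to the caller and resets the builder,
// mirroring MetricsBuilder.Emit at the end of a collection pass.
func (b *toyBuilder) emit() pmetric.Metrics {
	out := b.metrics
	b.metrics = pmetric.NewMetrics()
	return out
}

func main() {
	mb := newToyBuilder()
	ts := pcommon.NewTimestampFromTime(time.Now())
	// One shared builder, many objects, one emit per scrape.
	for _, name := range []string{"cronjob-a", "cronjob-b"} {
		mb.recordActiveJobs(ts, 2, name)
	}
	fmt.Println("resource metrics in this scrape:", mb.emit().ResourceMetrics().Len())
}

In the receiver itself the builder is the mdatagen-generated metadata.MetricsBuilder, created once in NewDataCollector and emitted at the end of CollectMetricData.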
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/metadata" @@ -42,137 +37,82 @@ import ( // TODO: Consider moving some of these constants to // https://go.opentelemetry.io/collector/blob/main/model/semconv/opentelemetry.go. -// DataCollector wraps around a metricsStore and a metadaStore exposing -// methods to perform on the underlying stores. DataCollector also provides -// an interface to interact with refactored code from SignalFx Agent which is -// confined to the collection package. +// DataCollector emits metrics with CollectMetricData based on the Kubernetes API objects in the metadata store. type DataCollector struct { settings receiver.CreateSettings - metricsStore *metricsStore metadataStore *metadata.Store nodeConditionsToReport []string allocatableTypesToReport []string - metricsBuilderConfig metadata.MetricsBuilderConfig + metricsBuilder *metadata.MetricsBuilder } // NewDataCollector returns a DataCollector. -func NewDataCollector(set receiver.CreateSettings, metricsBuilderConfig metadata.MetricsBuilderConfig, nodeConditionsToReport, allocatableTypesToReport []string) *DataCollector { +func NewDataCollector(set receiver.CreateSettings, ms *metadata.Store, + metricsBuilderConfig metadata.MetricsBuilderConfig, nodeConditionsToReport, allocatableTypesToReport []string) *DataCollector { return &DataCollector{ - settings: set, - metricsStore: &metricsStore{ - metricsCache: make(map[types.UID]pmetric.Metrics), - }, - metadataStore: &metadata.Store{}, + settings: set, + metadataStore: ms, nodeConditionsToReport: nodeConditionsToReport, allocatableTypesToReport: allocatableTypesToReport, - metricsBuilderConfig: metricsBuilderConfig, - } -} - -// SetupMetadataStore initializes a metadata store for the kubernetes kind. -func (dc *DataCollector) SetupMetadataStore(gvk schema.GroupVersionKind, store cache.Store) { - dc.metadataStore.Setup(gvk, store) -} - -func (dc *DataCollector) RemoveFromMetricsStore(obj interface{}) { - if err := dc.metricsStore.remove(obj.(runtime.Object)); err != nil { - dc.settings.TelemetrySettings.Logger.Error( - "failed to remove from metric cache", - zap.String("obj", reflect.TypeOf(obj).String()), - zap.Error(err), - ) - } -} - -func (dc *DataCollector) UpdateMetricsStore(obj interface{}, md pmetric.Metrics) { - if err := dc.metricsStore.update(obj.(runtime.Object), md); err != nil { - dc.settings.TelemetrySettings.Logger.Error( - "failed to update metric cache", - zap.String("obj", reflect.TypeOf(obj).String()), - zap.Error(err), - ) + metricsBuilder: metadata.NewMetricsBuilder(metricsBuilderConfig, set), } } func (dc *DataCollector) CollectMetricData(currentTime time.Time) pmetric.Metrics { - return dc.metricsStore.getMetricData(currentTime) -} - -// SyncMetrics updates the metric store with latest metrics from the kubernetes object. 
-func (dc *DataCollector) SyncMetrics(obj interface{}) { - var md pmetric.Metrics - - switch o := obj.(type) { - case *corev1.Pod: - md = pod.GetMetrics(dc.settings, dc.metricsBuilderConfig, o) - case *corev1.Node: - md = node.GetMetrics(dc.settings, dc.metricsBuilderConfig, o, dc.nodeConditionsToReport, dc.allocatableTypesToReport) - case *corev1.Namespace: - md = namespace.GetMetrics(dc.settings, dc.metricsBuilderConfig, o) - case *corev1.ReplicationController: - md = replicationcontroller.GetMetrics(dc.settings, dc.metricsBuilderConfig, o) - case *corev1.ResourceQuota: - md = resourcequota.GetMetrics(dc.settings, dc.metricsBuilderConfig, o) - case *appsv1.Deployment: - md = deployment.GetMetrics(dc.settings, dc.metricsBuilderConfig, o) - case *appsv1.ReplicaSet: - md = replicaset.GetMetrics(dc.settings, dc.metricsBuilderConfig, o) - case *appsv1.DaemonSet: - md = demonset.GetMetrics(dc.settings, dc.metricsBuilderConfig, o) - case *appsv1.StatefulSet: - md = statefulset.GetMetrics(dc.settings, dc.metricsBuilderConfig, o) - case *batchv1.Job: - md = jobs.GetMetrics(dc.settings, dc.metricsBuilderConfig, o) - case *batchv1.CronJob: - md = cronjob.GetMetrics(dc.settings, dc.metricsBuilderConfig, o) - case *batchv1beta1.CronJob: - md = cronjob.GetMetricsBeta(dc.settings, dc.metricsBuilderConfig, o) - case *autoscalingv2.HorizontalPodAutoscaler: - md = hpa.GetMetrics(dc.settings, dc.metricsBuilderConfig, o) - case *autoscalingv2beta2.HorizontalPodAutoscaler: - md = hpa.GetMetricsBeta(dc.settings, dc.metricsBuilderConfig, o) - case *quotav1.ClusterResourceQuota: - md = clusterresourcequota.GetMetrics(dc.settings, dc.metricsBuilderConfig, o) - default: - return - } - - if md.DataPointCount() == 0 { - return - } - - dc.UpdateMetricsStore(obj, md) -} - -// SyncMetadata updates the metric store with latest metrics from the kubernetes object -func (dc *DataCollector) SyncMetadata(obj interface{}) map[experimentalmetricmetadata.ResourceID]*metadata.KubernetesMetadata { - km := map[experimentalmetricmetadata.ResourceID]*metadata.KubernetesMetadata{} - switch o := obj.(type) { - case *corev1.Pod: - km = pod.GetMetadata(o, dc.metadataStore, dc.settings.TelemetrySettings.Logger) - case *corev1.Node: - km = node.GetMetadata(o) - case *corev1.ReplicationController: - km = replicationcontroller.GetMetadata(o) - case *appsv1.Deployment: - km = deployment.GetMetadata(o) - case *appsv1.ReplicaSet: - km = replicaset.GetMetadata(o) - case *appsv1.DaemonSet: - km = demonset.GetMetadata(o) - case *appsv1.StatefulSet: - km = statefulset.GetMetadata(o) - case *batchv1.Job: - km = jobs.GetMetadata(o) - case *batchv1.CronJob: - km = cronjob.GetMetadata(o) - case *batchv1beta1.CronJob: - km = cronjob.GetMetadataBeta(o) - case *autoscalingv2.HorizontalPodAutoscaler: - km = hpa.GetMetadata(o) - case *autoscalingv2beta2.HorizontalPodAutoscaler: - km = hpa.GetMetadataBeta(o) - } - - return km + ts := pcommon.NewTimestampFromTime(currentTime) + customRMs := pmetric.NewResourceMetricsSlice() + + dc.metadataStore.ForEach(gvk.Pod, func(o any) { + pod.RecordMetrics(dc.settings.Logger, dc.metricsBuilder, o.(*corev1.Pod), ts) + }) + dc.metadataStore.ForEach(gvk.Node, func(o any) { + crm := node.CustomMetrics(dc.settings, dc.metricsBuilder.NewResourceBuilder(), o.(*corev1.Node), + dc.nodeConditionsToReport, dc.allocatableTypesToReport, ts) + if crm.ScopeMetrics().Len() > 0 { + crm.MoveTo(customRMs.AppendEmpty()) + } + }) + dc.metadataStore.ForEach(gvk.Namespace, func(o any) { + namespace.RecordMetrics(dc.metricsBuilder, 
o.(*corev1.Namespace), ts) + }) + dc.metadataStore.ForEach(gvk.ReplicationController, func(o any) { + replicationcontroller.RecordMetrics(dc.metricsBuilder, o.(*corev1.ReplicationController), ts) + }) + dc.metadataStore.ForEach(gvk.ResourceQuota, func(o any) { + resourcequota.RecordMetrics(dc.metricsBuilder, o.(*corev1.ResourceQuota), ts) + }) + dc.metadataStore.ForEach(gvk.Deployment, func(o any) { + deployment.RecordMetrics(dc.metricsBuilder, o.(*appsv1.Deployment), ts) + }) + dc.metadataStore.ForEach(gvk.ReplicaSet, func(o any) { + replicaset.RecordMetrics(dc.metricsBuilder, o.(*appsv1.ReplicaSet), ts) + }) + dc.metadataStore.ForEach(gvk.DaemonSet, func(o any) { + demonset.RecordMetrics(dc.metricsBuilder, o.(*appsv1.DaemonSet), ts) + }) + dc.metadataStore.ForEach(gvk.StatefulSet, func(o any) { + statefulset.RecordMetrics(dc.metricsBuilder, o.(*appsv1.StatefulSet), ts) + }) + dc.metadataStore.ForEach(gvk.Job, func(o any) { + jobs.RecordMetrics(dc.metricsBuilder, o.(*batchv1.Job), ts) + }) + dc.metadataStore.ForEach(gvk.CronJob, func(o any) { + cronjob.RecordMetrics(dc.metricsBuilder, o.(*batchv1.CronJob), ts) + }) + dc.metadataStore.ForEach(gvk.CronJobBeta, func(o any) { + cronjob.RecordMetricsBeta(dc.metricsBuilder, o.(*batchv1beta1.CronJob), ts) + }) + dc.metadataStore.ForEach(gvk.HorizontalPodAutoscaler, func(o any) { + hpa.RecordMetrics(dc.metricsBuilder, o.(*autoscalingv2.HorizontalPodAutoscaler), ts) + }) + dc.metadataStore.ForEach(gvk.HorizontalPodAutoscalerBeta, func(o any) { + hpa.RecordMetricsBeta(dc.metricsBuilder, o.(*autoscalingv2beta2.HorizontalPodAutoscaler), ts) + }) + dc.metadataStore.ForEach(gvk.ClusterResourceQuota, func(o any) { + clusterresourcequota.RecordMetrics(dc.metricsBuilder, o.(*quotav1.ClusterResourceQuota), ts) + }) + + m := dc.metricsBuilder.Emit() + customRMs.MoveAndAppendTo(m.ResourceMetrics()) + return m } diff --git a/receiver/k8sclusterreceiver/internal/collection/collector_test.go b/receiver/k8sclusterreceiver/internal/collection/collector_test.go index 50ea8fb0c1e2..0c34bb595b1d 100644 --- a/receiver/k8sclusterreceiver/internal/collection/collector_test.go +++ b/receiver/k8sclusterreceiver/internal/collection/collector_test.go @@ -5,289 +5,118 @@ package collection import ( "testing" + "time" - "github.com/stretchr/testify/require" + "github.com/stretchr/testify/assert" "go.opentelemetry.io/collector/receiver/receivertest" - "go.uber.org/zap" - "go.uber.org/zap/zapcore" - "go.uber.org/zap/zaptest/observer" - corev1 "k8s.io/api/core/v1" - v1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/types" - "github.com/open-telemetry/opentelemetry-collector-contrib/internal/common/maps" - "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/experimentalmetricmetadata" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest/pmetrictest" + "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/gvk" "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/metadata" "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/testutils" ) -var commonPodMetadata = map[string]string{ - "foo": "bar", - "foo1": "", - "pod.creation_timestamp": "0001-01-01T00:00:00Z", -} - -var allPodMetadata = func(metadata map[string]string) map[string]string { - out := maps.MergeStringMaps(metadata, commonPodMetadata) - return out -} +func TestCollectMetricData(t *testing.T) { + ms := metadata.NewStore() + var expectedRMs 
int -func TestDataCollectorSyncMetadata(t *testing.T) { - tests := []struct { - name string - metadataStore *metadata.Store - resource interface{} - want map[experimentalmetricmetadata.ResourceID]*metadata.KubernetesMetadata - }{ - { - name: "Pod and container metadata simple case", - metadataStore: &metadata.Store{}, - resource: testutils.NewPodWithContainer( - "0", + ms.Setup(gvk.Pod, &testutils.MockStore{ + Cache: map[string]interface{}{ + "pod1-uid": testutils.NewPodWithContainer( + "1", testutils.NewPodSpecWithContainer("container-name"), testutils.NewPodStatusWithContainer("container-name", "container-id"), ), - want: map[experimentalmetricmetadata.ResourceID]*metadata.KubernetesMetadata{ - experimentalmetricmetadata.ResourceID("test-pod-0-uid"): { - EntityType: "k8s.pod", - ResourceIDKey: "k8s.pod.uid", - ResourceID: "test-pod-0-uid", - Metadata: commonPodMetadata, - }, - experimentalmetricmetadata.ResourceID("container-id"): { - EntityType: "container", - ResourceIDKey: "container.id", - ResourceID: "container-id", - Metadata: map[string]string{ - "container.status": "running", - }, - }, - }, }, - { - name: "Pod with Owner Reference", - metadataStore: &metadata.Store{}, - resource: testutils.WithOwnerReferences([]v1.OwnerReference{ - { - Kind: "StatefulSet", - Name: "test-statefulset-0", - UID: "test-statefulset-0-uid", - }, - }, testutils.NewPodWithContainer("0", &corev1.PodSpec{}, &corev1.PodStatus{})), - want: map[experimentalmetricmetadata.ResourceID]*metadata.KubernetesMetadata{ - experimentalmetricmetadata.ResourceID("test-pod-0-uid"): { - EntityType: "k8s.pod", - ResourceIDKey: "k8s.pod.uid", - ResourceID: "test-pod-0-uid", - Metadata: allPodMetadata(map[string]string{ - "k8s.workload.kind": "StatefulSet", - "k8s.workload.name": "test-statefulset-0", - "k8s.statefulset.name": "test-statefulset-0", - "k8s.statefulset.uid": "test-statefulset-0-uid", - }), - }, - }, + }) + expectedRMs += 2 // 1 for pod, 1 for container + + ms.Setup(gvk.Node, &testutils.MockStore{ + Cache: map[string]interface{}{ + "node1-uid": testutils.NewNode("1"), + "node2-uid": testutils.NewNode("2"), }, - { - name: "Pod with Service metadata", - metadataStore: &metadata.Store{ - Services: &testutils.MockStore{ - Cache: map[string]interface{}{ - "test-namespace/test-service": &corev1.Service{ - ObjectMeta: v1.ObjectMeta{ - Name: "test-service", - Namespace: "test-namespace", - UID: "test-service-uid", - }, - Spec: corev1.ServiceSpec{ - Selector: map[string]string{ - "k8s-app": "my-app", - }, - }, - }, - }, - }, - }, - resource: podWithAdditionalLabels( - map[string]string{"k8s-app": "my-app"}, - testutils.NewPodWithContainer("0", &corev1.PodSpec{}, &corev1.PodStatus{}), - ), - want: map[experimentalmetricmetadata.ResourceID]*metadata.KubernetesMetadata{ - experimentalmetricmetadata.ResourceID("test-pod-0-uid"): { - EntityType: "k8s.pod", - ResourceIDKey: "k8s.pod.uid", - ResourceID: "test-pod-0-uid", - Metadata: allPodMetadata(map[string]string{ - "k8s.service.test-service": "", - "k8s-app": "my-app", - }), - }, - }, + }) + expectedRMs += 2 + + ms.Setup(gvk.Namespace, &testutils.MockStore{ + Cache: map[string]interface{}{ + "namespace1-uid": testutils.NewNamespace("1"), }, - { - name: "Daemonset simple case", - metadataStore: &metadata.Store{}, - resource: testutils.NewDaemonset("1"), - want: map[experimentalmetricmetadata.ResourceID]*metadata.KubernetesMetadata{ - experimentalmetricmetadata.ResourceID("test-daemonset-1-uid"): { - EntityType: "k8s.daemonset", - ResourceIDKey: "k8s.daemonset.uid", - 
ResourceID: "test-daemonset-1-uid", - Metadata: map[string]string{ - "k8s.workload.kind": "DaemonSet", - "k8s.workload.name": "test-daemonset-1", - "daemonset.creation_timestamp": "0001-01-01T00:00:00Z", - }, - }, - }, + }) + expectedRMs++ + + ms.Setup(gvk.ReplicationController, &testutils.MockStore{ + Cache: map[string]interface{}{ + "replicationcontroller1-uid": testutils.NewReplicationController("1"), }, - { - name: "Deployment simple case", - metadataStore: &metadata.Store{}, - resource: testutils.NewDeployment("1"), - want: map[experimentalmetricmetadata.ResourceID]*metadata.KubernetesMetadata{ - experimentalmetricmetadata.ResourceID("test-deployment-1-uid"): { - EntityType: "k8s.deployment", - ResourceIDKey: "k8s.deployment.uid", - ResourceID: "test-deployment-1-uid", - Metadata: map[string]string{ - "k8s.workload.kind": "Deployment", - "k8s.workload.name": "test-deployment-1", - "k8s.deployment.name": "test-deployment-1", - "deployment.creation_timestamp": "0001-01-01T00:00:00Z", - }, - }, - }, + }) + expectedRMs++ + + ms.Setup(gvk.ResourceQuota, &testutils.MockStore{ + Cache: map[string]interface{}{ + "resourcequota1-uid": testutils.NewResourceQuota("1"), }, - { - name: "HPA simple case", - metadataStore: &metadata.Store{}, - resource: testutils.NewHPA("1"), - want: map[experimentalmetricmetadata.ResourceID]*metadata.KubernetesMetadata{ - experimentalmetricmetadata.ResourceID("test-hpa-1-uid"): { - EntityType: "k8s.hpa", - ResourceIDKey: "k8s.hpa.uid", - ResourceID: "test-hpa-1-uid", - Metadata: map[string]string{ - "k8s.workload.kind": "HPA", - "k8s.workload.name": "test-hpa-1", - "hpa.creation_timestamp": "0001-01-01T00:00:00Z", - }, - }, - }, + }) + expectedRMs++ + + ms.Setup(gvk.Deployment, &testutils.MockStore{ + Cache: map[string]interface{}{ + "deployment1-uid": testutils.NewDeployment("1"), }, - { - name: "Job simple case", - metadataStore: &metadata.Store{}, - resource: testutils.NewJob("1"), - want: map[experimentalmetricmetadata.ResourceID]*metadata.KubernetesMetadata{ - experimentalmetricmetadata.ResourceID("test-job-1-uid"): { - EntityType: "k8s.job", - ResourceIDKey: "k8s.job.uid", - ResourceID: "test-job-1-uid", - Metadata: map[string]string{ - "foo": "bar", - "foo1": "", - "k8s.workload.kind": "Job", - "k8s.workload.name": "test-job-1", - "job.creation_timestamp": "0001-01-01T00:00:00Z", - }, - }, - }, + }) + expectedRMs++ + + ms.Setup(gvk.ReplicaSet, &testutils.MockStore{ + Cache: map[string]interface{}{ + "replicaset1-uid": testutils.NewReplicaSet("1"), }, - { - name: "Node simple case", - metadataStore: &metadata.Store{}, - resource: testutils.NewNode("1"), - want: map[experimentalmetricmetadata.ResourceID]*metadata.KubernetesMetadata{ - experimentalmetricmetadata.ResourceID("test-node-1-uid"): { - EntityType: "k8s.node", - ResourceIDKey: "k8s.node.uid", - ResourceID: "test-node-1-uid", - Metadata: map[string]string{ - "foo": "bar", - "foo1": "", - "k8s.node.name": "test-node-1", - "node.creation_timestamp": "0001-01-01T00:00:00Z", - }, - }, - }, + }) + expectedRMs++ + + ms.Setup(gvk.DaemonSet, &testutils.MockStore{ + Cache: map[string]interface{}{ + "daemonset1-uid": testutils.NewDaemonset("1"), }, - { - name: "ReplicaSet simple case", - metadataStore: &metadata.Store{}, - resource: testutils.NewReplicaSet("1"), - want: map[experimentalmetricmetadata.ResourceID]*metadata.KubernetesMetadata{ - experimentalmetricmetadata.ResourceID("test-replicaset-1-uid"): { - EntityType: "k8s.replicaset", - ResourceIDKey: "k8s.replicaset.uid", - ResourceID: 
"test-replicaset-1-uid", - Metadata: map[string]string{ - "foo": "bar", - "foo1": "", - "k8s.workload.kind": "ReplicaSet", - "k8s.workload.name": "test-replicaset-1", - "replicaset.creation_timestamp": "0001-01-01T00:00:00Z", - }, - }, - }, + }) + expectedRMs++ + + ms.Setup(gvk.StatefulSet, &testutils.MockStore{ + Cache: map[string]interface{}{ + "statefulset1-uid": testutils.NewStatefulset("1"), }, - { - name: "ReplicationController simple case", - metadataStore: &metadata.Store{}, - resource: &corev1.ReplicationController{ - ObjectMeta: v1.ObjectMeta{ - Name: "test-replicationcontroller-1", - Namespace: "test-namespace", - UID: types.UID("test-replicationcontroller-1-uid"), - }, - }, - want: map[experimentalmetricmetadata.ResourceID]*metadata.KubernetesMetadata{ - experimentalmetricmetadata.ResourceID("test-replicationcontroller-1-uid"): { - EntityType: "k8s.replicationcontroller", - ResourceIDKey: "k8s.replicationcontroller.uid", - ResourceID: "test-replicationcontroller-1-uid", - Metadata: map[string]string{ - "k8s.workload.kind": "ReplicationController", - "k8s.workload.name": "test-replicationcontroller-1", - "replicationcontroller.creation_timestamp": "0001-01-01T00:00:00Z", - }, - }, - }, + }) + expectedRMs++ + + ms.Setup(gvk.Job, &testutils.MockStore{ + Cache: map[string]interface{}{ + "job1-uid": testutils.NewJob("1"), }, - } + }) + expectedRMs++ - for _, tt := range tests { - observedLogger, _ := observer.New(zapcore.WarnLevel) - set := receivertest.NewNopCreateSettings() - set.TelemetrySettings.Logger = zap.New(observedLogger) - t.Run(tt.name, func(t *testing.T) { - dc := &DataCollector{ - settings: set, - metadataStore: tt.metadataStore, - nodeConditionsToReport: []string{}, - } + ms.Setup(gvk.CronJob, &testutils.MockStore{ + Cache: map[string]interface{}{ + "cronjob1-uid": testutils.NewCronJob("1"), + }, + }) + expectedRMs++ - actual := dc.SyncMetadata(tt.resource) - require.Equal(t, len(tt.want), len(actual)) + ms.Setup(gvk.HorizontalPodAutoscaler, &testutils.MockStore{ + Cache: map[string]interface{}{ + "horizontalpodautoscaler1-uid": testutils.NewHPA("1"), + }, + }) + expectedRMs++ - for key, item := range tt.want { - got, exists := actual[key] - require.True(t, exists) - require.Equal(t, *item, *got) - } - }) - } -} + dc := NewDataCollector(receivertest.NewNopCreateSettings(), ms, metadata.DefaultMetricsBuilderConfig(), []string{"Ready"}, nil) + m1 := dc.CollectMetricData(time.Now()) -func podWithAdditionalLabels(labels map[string]string, pod *corev1.Pod) interface{} { - if pod.Labels == nil { - pod.Labels = make(map[string]string, len(labels)) - } + // Verify number of resource metrics only, content is tested in other tests. + assert.Equal(t, expectedRMs, m1.ResourceMetrics().Len()) - for k, v := range labels { - pod.Labels[k] = v - } + m2 := dc.CollectMetricData(time.Now()) - return pod + // Second scrape should be the same as the first one except for the timestamp. 
+ assert.NoError(t, pmetrictest.CompareMetrics(m1, m2, pmetrictest.IgnoreTimestamp())) } diff --git a/receiver/k8sclusterreceiver/internal/collection/metricsstore.go b/receiver/k8sclusterreceiver/internal/collection/metricsstore.go deleted file mode 100644 index 4297e00879f2..000000000000 --- a/receiver/k8sclusterreceiver/internal/collection/metricsstore.go +++ /dev/null @@ -1,134 +0,0 @@ -// Copyright The OpenTelemetry Authors -// SPDX-License-Identifier: Apache-2.0 - -package collection // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/collection" - -import ( - "sync" - "time" - - "go.opentelemetry.io/collector/pdata/pcommon" - "go.opentelemetry.io/collector/pdata/pmetric" - "k8s.io/apimachinery/pkg/runtime" - "k8s.io/apimachinery/pkg/types" - - "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/utils" -) - -// metricsStore keeps track of the metrics being pushed along the pipeline -// every interval. Since Kubernetes events that generate these metrics are -// aperiodic, the values in this cache will be pushed along the pipeline -// until the next Kubernetes event pertaining to an object. -type metricsStore struct { - sync.RWMutex - metricsCache map[types.UID]pmetric.Metrics -} - -// updates metricsStore with latest metrics. -func (ms *metricsStore) update(obj runtime.Object, md pmetric.Metrics) error { - ms.Lock() - defer ms.Unlock() - - key, err := utils.GetUIDForObject(obj) - if err != nil { - return err - } - - ms.metricsCache[key] = md - return nil -} - -// removes entry from metric cache when resources are deleted. -func (ms *metricsStore) remove(obj runtime.Object) error { - ms.Lock() - defer ms.Unlock() - - key, err := utils.GetUIDForObject(obj) - if err != nil { - return err - } - - delete(ms.metricsCache, key) - return nil -} - -// getMetricData returns metricsCache stored in the cache at a given point in time. -func (ms *metricsStore) getMetricData(currentTime time.Time) pmetric.Metrics { - ms.RLock() - defer ms.RUnlock() - - currentTimestamp := pcommon.NewTimestampFromTime(currentTime) - out := pmetric.NewMetrics() - for _, md := range ms.metricsCache { - // Set datapoint timestamp to be time of retrieval from cache. 
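The deleted metricsstore.go below is where the previous memory cost lived: a map of fully rendered pmetric.Metrics keyed by object UID, held between scrapes and re-stamped with the current time by applyCurrentTime on every getMetricData call. The rewritten collector test above pins down the replacement behaviour instead — each CollectMetricData call rebuilds the batch from the object caches, so consecutive scrapes differ only in timestamps. A minimal sketch of that per-scrape loop, with a hypothetical recorder map standing in for the per-kind RecordMetrics dispatch in collector.go:

// Sketch only: the shape of the per-scrape loop in CollectMetricData, assuming a
// simplified object store and recorder map; the real code dispatches per GVK.
package main

import (
	"fmt"
	"time"

	"go.opentelemetry.io/collector/pdata/pcommon"
)

type recorder func(obj any, ts pcommon.Timestamp)

func collect(objectsByKind map[string][]any, recordFuncs map[string]recorder, now time.Time) {
	ts := pcommon.NewTimestampFromTime(now)
	for kind, record := range recordFuncs {
		for _, obj := range objectsByKind[kind] {
			record(obj, ts) // data points accumulate in one shared MetricsBuilder
		}
	}
	// a single mb.Emit() would follow here; nothing is cached for the next scrape
}

func main() {
	objects := map[string][]any{"Pod": {"pod-a", "pod-b"}, "Deployment": {"deploy-a"}}
	funcs := map[string]recorder{
		"Pod":        func(obj any, ts pcommon.Timestamp) { fmt.Println("record pod", obj, "at", ts) },
		"Deployment": func(obj any, ts pcommon.Timestamp) { fmt.Println("record deployment", obj, "at", ts) },
	}
	collect(objects, funcs, time.Now())
}

Node condition and allocatable metrics are the one exception in the real collector: they are built as custom resource metrics and appended to the emitted batch rather than recorded through the shared builder.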
- applyCurrentTime(md, currentTimestamp) - rms := pmetric.NewResourceMetricsSlice() - md.ResourceMetrics().CopyTo(rms) - rms.MoveAndAppendTo(out.ResourceMetrics()) - } - - return out -} - -func applyCurrentTime(md pmetric.Metrics, t pcommon.Timestamp) { - rms := md.ResourceMetrics() - for i := 0; i < rms.Len(); i++ { - sms := rms.At(i).ScopeMetrics() - for j := 0; j < sms.Len(); j++ { - ms := sms.At(j).Metrics() - for k := 0; k < ms.Len(); k++ { - switch ms.At(k).Type() { - case pmetric.MetricTypeGauge: - applyCurrentTimeNumberDataPoint(ms.At(k).Gauge().DataPoints(), t) - case pmetric.MetricTypeSum: - applyCurrentTimeNumberDataPoint(ms.At(k).Sum().DataPoints(), t) - case pmetric.MetricTypeEmpty: - case pmetric.MetricTypeHistogram: - applyCurrentTimeHistogramDataPoint(ms.At(k).Histogram().DataPoints(), t) - case pmetric.MetricTypeExponentialHistogram: - applyCurrentTimeExponentialHistogramDataPoint(ms.At(k).ExponentialHistogram().DataPoints(), t) - case pmetric.MetricTypeSummary: - applyCurrentTimeSummaryDataPoint(ms.At(k).Summary().DataPoints(), t) - } - } - } - } -} - -func applyCurrentTimeSummaryDataPoint(dps pmetric.SummaryDataPointSlice, t pcommon.Timestamp) { - for i := 0; i < dps.Len(); i++ { - dp := dps.At(i) - dp.SetTimestamp(t) - } -} -func applyCurrentTimeHistogramDataPoint(dps pmetric.HistogramDataPointSlice, t pcommon.Timestamp) { - for i := 0; i < dps.Len(); i++ { - dp := dps.At(i) - dp.SetTimestamp(t) - for j := 0; j < dp.Exemplars().Len(); j++ { - dp.Exemplars().At(j).SetTimestamp(t) - } - } -} - -func applyCurrentTimeExponentialHistogramDataPoint(dps pmetric.ExponentialHistogramDataPointSlice, t pcommon.Timestamp) { - for i := 0; i < dps.Len(); i++ { - dp := dps.At(i) - dp.SetTimestamp(t) - for j := 0; j < dp.Exemplars().Len(); j++ { - dp.Exemplars().At(j).SetTimestamp(t) - } - } -} - -func applyCurrentTimeNumberDataPoint(dps pmetric.NumberDataPointSlice, t pcommon.Timestamp) { - for i := 0; i < dps.Len(); i++ { - switch dps.At(i).ValueType() { - case pmetric.NumberDataPointValueTypeDouble: - dps.At(i).SetTimestamp(t) - case pmetric.NumberDataPointValueTypeInt: - dps.At(i).SetTimestamp(t) - case pmetric.NumberDataPointValueTypeEmpty: - } - } -} diff --git a/receiver/k8sclusterreceiver/internal/collection/metricsstore_test.go b/receiver/k8sclusterreceiver/internal/collection/metricsstore_test.go deleted file mode 100644 index 9d969084db95..000000000000 --- a/receiver/k8sclusterreceiver/internal/collection/metricsstore_test.go +++ /dev/null @@ -1,76 +0,0 @@ -// Copyright The OpenTelemetry Authors -// SPDX-License-Identifier: Apache-2.0 - -package collection - -import ( - "testing" - "time" - - "github.com/stretchr/testify/require" - "go.opentelemetry.io/collector/pdata/pmetric" - corev1 "k8s.io/api/core/v1" - v1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/types" -) - -func TestMetricsStoreOperations(t *testing.T) { - ms := metricsStore{ - metricsCache: make(map[types.UID]pmetric.Metrics), - } - - updates := []struct { - id types.UID - rm pmetric.Metrics - }{ - { - id: types.UID("test-uid-1"), - rm: func() pmetric.Metrics { - m := pmetric.NewMetrics() - m.ResourceMetrics().AppendEmpty().Resource().Attributes().PutStr("k1", "v1") - m.ResourceMetrics().AppendEmpty().Resource().Attributes().PutStr("k2", "v2") - return m - }(), - }, - { - id: types.UID("test-uid-2"), - rm: func() pmetric.Metrics { - m := pmetric.NewMetrics() - m.ResourceMetrics().AppendEmpty().Resource().Attributes().PutStr("k3", "v3") - return m - }(), - }, - } - - // Update metric store 
with metrics - for _, u := range updates { - require.NoError(t, ms.update(&corev1.Pod{ObjectMeta: v1.ObjectMeta{UID: u.id}}, u.rm)) - } - - // Asset values form updates - expectedMetricData := 0 - for _, u := range updates { - require.Contains(t, ms.metricsCache, u.id) - require.Equal(t, u.rm.ResourceMetrics().Len(), ms.metricsCache[u.id].ResourceMetrics().Len()) - expectedMetricData += u.rm.ResourceMetrics().Len() - } - require.Equal(t, expectedMetricData, ms.getMetricData(time.Now()).ResourceMetrics().Len()) - - // Remove non existent item - require.NoError(t, ms.remove(&corev1.Pod{ - ObjectMeta: v1.ObjectMeta{ - UID: "1", - }, - })) - require.Equal(t, len(updates), len(ms.metricsCache)) - - // Remove valid item from cache - require.NoError(t, ms.remove(&corev1.Pod{ - ObjectMeta: v1.ObjectMeta{ - UID: updates[1].id, - }, - })) - expectedMetricData -= updates[1].rm.ResourceMetrics().Len() - require.Equal(t, len(updates)-1, len(ms.metricsCache)) - require.Equal(t, expectedMetricData, ms.getMetricData(time.Now()).ResourceMetrics().Len()) -} diff --git a/receiver/k8sclusterreceiver/internal/container/containers.go b/receiver/k8sclusterreceiver/internal/container/containers.go index 94cb1cd0c86e..53eb807b2e18 100644 --- a/receiver/k8sclusterreceiver/internal/container/containers.go +++ b/receiver/k8sclusterreceiver/internal/container/containers.go @@ -4,11 +4,7 @@ package container // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/container" import ( - "time" - "go.opentelemetry.io/collector/pdata/pcommon" - "go.opentelemetry.io/collector/pdata/pmetric" - "go.opentelemetry.io/collector/receiver" conventions "go.opentelemetry.io/collector/semconv/v1.6.1" "go.uber.org/zap" corev1 "k8s.io/api/core/v1" @@ -31,11 +27,9 @@ const ( containerStatusTerminated = "terminated" ) -// GetSpecMetrics metricizes values from the container spec. +// RecordSpecMetrics metricizes values from the container spec. // This includes values like resource requests and limits. -func GetSpecMetrics(set receiver.CreateSettings, metricsBuilderConfig imetadata.MetricsBuilderConfig, c corev1.Container, pod *corev1.Pod) pmetric.Metrics { - mb := imetadata.NewMetricsBuilder(metricsBuilderConfig, set) - ts := pcommon.NewTimestampFromTime(time.Now()) +func RecordSpecMetrics(logger *zap.Logger, mb *imetadata.MetricsBuilder, c corev1.Container, pod *corev1.Pod, ts pcommon.Timestamp) { for k, r := range c.Resources.Requests { //exhaustive:ignore switch k { @@ -48,7 +42,7 @@ func GetSpecMetrics(set receiver.CreateSettings, metricsBuilderConfig imetadata. case corev1.ResourceEphemeralStorage: mb.RecordK8sContainerEphemeralstorageRequestDataPoint(ts, r.Value()) default: - set.Logger.Debug("unsupported request type", zap.Any("type", k)) + logger.Debug("unsupported request type", zap.Any("type", k)) } } for k, l := range c.Resources.Limits { @@ -63,7 +57,7 @@ func GetSpecMetrics(set receiver.CreateSettings, metricsBuilderConfig imetadata. case corev1.ResourceEphemeralStorage: mb.RecordK8sContainerEphemeralstorageLimitDataPoint(ts, l.Value()) default: - set.Logger.Debug("unsupported request type", zap.Any("type", k)) + logger.Debug("unsupported request type", zap.Any("type", k)) } } var containerID string @@ -78,7 +72,7 @@ func GetSpecMetrics(set receiver.CreateSettings, metricsBuilderConfig imetadata. 
} } - rb := imetadata.NewResourceBuilder(metricsBuilderConfig.ResourceAttributes) + rb := mb.NewResourceBuilder() rb.SetK8sPodUID(string(pod.UID)) rb.SetK8sPodName(pod.Name) rb.SetK8sNodeName(pod.Spec.NodeName) @@ -88,12 +82,12 @@ func GetSpecMetrics(set receiver.CreateSettings, metricsBuilderConfig imetadata. rb.SetK8sContainerName(c.Name) image, err := docker.ParseImageName(imageStr) if err != nil { - docker.LogParseError(err, imageStr, set.Logger) + docker.LogParseError(err, imageStr, logger) } else { rb.SetContainerImageName(image.Repository) rb.SetContainerImageTag(image.Tag) } - return mb.Emit(imetadata.WithResource(rb.Emit())) + mb.EmitForResource(imetadata.WithResource(rb.Emit())) } func GetMetadata(cs corev1.ContainerStatus) *metadata.KubernetesMetadata { diff --git a/receiver/k8sclusterreceiver/internal/cronjob/cronjobs.go b/receiver/k8sclusterreceiver/internal/cronjob/cronjobs.go index dece80dc2cda..e7a49e6c15da 100644 --- a/receiver/k8sclusterreceiver/internal/cronjob/cronjobs.go +++ b/receiver/k8sclusterreceiver/internal/cronjob/cronjobs.go @@ -4,18 +4,13 @@ package cronjob // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/cronjob" import ( - "time" - "go.opentelemetry.io/collector/pdata/pcommon" - "go.opentelemetry.io/collector/pdata/pmetric" - "go.opentelemetry.io/collector/receiver" batchv1 "k8s.io/api/batch/v1" batchv1beta1 "k8s.io/api/batch/v1beta1" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/experimentalmetricmetadata" "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/constants" "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/metadata" - imetadataphase "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/metadata" ) const ( @@ -24,32 +19,25 @@ const ( cronJobKeyConcurrencyPolicy = "concurrency_policy" ) -func GetMetrics(set receiver.CreateSettings, metricsBuilderConfig imetadataphase.MetricsBuilderConfig, cj *batchv1.CronJob) pmetric.Metrics { - mbphase := imetadataphase.NewMetricsBuilder(metricsBuilderConfig, set) - ts := pcommon.NewTimestampFromTime(time.Now()) - - mbphase.RecordK8sCronjobActiveJobsDataPoint(ts, int64(len(cj.Status.Active))) +func RecordMetrics(mb *metadata.MetricsBuilder, cj *batchv1.CronJob, ts pcommon.Timestamp) { + mb.RecordK8sCronjobActiveJobsDataPoint(ts, int64(len(cj.Status.Active))) - rb := imetadataphase.NewResourceBuilder(metricsBuilderConfig.ResourceAttributes) + rb := mb.NewResourceBuilder() rb.SetK8sNamespaceName(cj.Namespace) rb.SetK8sCronjobUID(string(cj.UID)) rb.SetK8sCronjobName(cj.Name) rb.SetOpencensusResourcetype("k8s") - return mbphase.Emit(imetadataphase.WithResource(rb.Emit())) + mb.EmitForResource(metadata.WithResource(rb.Emit())) } -func GetMetricsBeta(set receiver.CreateSettings, metricsBuilderConfig imetadataphase.MetricsBuilderConfig, cj *batchv1beta1.CronJob) pmetric.Metrics { - mbphase := imetadataphase.NewMetricsBuilder(metricsBuilderConfig, set) - ts := pcommon.NewTimestampFromTime(time.Now()) - - mbphase.RecordK8sCronjobActiveJobsDataPoint(ts, int64(len(cj.Status.Active))) - - rb := imetadataphase.NewResourceBuilder(metricsBuilderConfig.ResourceAttributes) +func RecordMetricsBeta(mb *metadata.MetricsBuilder, cj *batchv1beta1.CronJob, ts pcommon.Timestamp) { + mb.RecordK8sCronjobActiveJobsDataPoint(ts, int64(len(cj.Status.Active))) + rb := mb.NewResourceBuilder() rb.SetK8sNamespaceName(cj.Namespace) 
rb.SetK8sCronjobUID(string(cj.UID)) rb.SetK8sCronjobName(cj.Name) rb.SetOpencensusResourcetype("k8s") - return mbphase.Emit(imetadataphase.WithResource(rb.Emit())) + mb.EmitForResource(metadata.WithResource(rb.Emit())) } func GetMetadata(cj *batchv1.CronJob) map[experimentalmetricmetadata.ResourceID]*metadata.KubernetesMetadata { diff --git a/receiver/k8sclusterreceiver/internal/cronjob/cronjobs_test.go b/receiver/k8sclusterreceiver/internal/cronjob/cronjobs_test.go index 4b0476edda34..c60622d9390e 100644 --- a/receiver/k8sclusterreceiver/internal/cronjob/cronjobs_test.go +++ b/receiver/k8sclusterreceiver/internal/cronjob/cronjobs_test.go @@ -6,23 +6,26 @@ package cronjob import ( "path/filepath" "testing" + "time" "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/receiver/receivertest" - batchv1 "k8s.io/api/batch/v1" - corev1 "k8s.io/api/core/v1" - v1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/types" "github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal/golden" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest/pmetrictest" "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/metadata" + "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/testutils" ) func TestCronJobMetrics(t *testing.T) { - cj := newCronJob("1") + cj := testutils.NewCronJob("1") + + ts := pcommon.Timestamp(time.Now().UnixNano()) + mb := metadata.NewMetricsBuilder(metadata.DefaultMetricsBuilderConfig(), receivertest.NewNopCreateSettings()) + RecordMetrics(mb, cj, ts) + m := mb.Emit() - m := GetMetrics(receivertest.NewNopCreateSettings(), metadata.DefaultMetricsBuilderConfig(), cj) expected, err := golden.ReadMetrics(filepath.Join("testdata", "expected.yaml")) require.NoError(t, err) require.NoError(t, pmetrictest.CompareMetrics(expected, m, @@ -36,7 +39,7 @@ func TestCronJobMetrics(t *testing.T) { } func TestCronJobMetadata(t *testing.T) { - cj := newCronJob("1") + cj := testutils.NewCronJob("1") actualMetadata := GetMetadata(cj) @@ -61,24 +64,3 @@ func TestCronJobMetadata(t *testing.T) { *actualMetadata["test-cronjob-1-uid"], ) } - -func newCronJob(id string) *batchv1.CronJob { - return &batchv1.CronJob{ - ObjectMeta: v1.ObjectMeta{ - Name: "test-cronjob-" + id, - Namespace: "test-namespace", - UID: types.UID("test-cronjob-" + id + "-uid"), - Labels: map[string]string{ - "foo": "bar", - "foo1": "", - }, - }, - Spec: batchv1.CronJobSpec{ - Schedule: "schedule", - ConcurrencyPolicy: "concurrency_policy", - }, - Status: batchv1.CronJobStatus{ - Active: []corev1.ObjectReference{{}, {}}, - }, - } -} diff --git a/receiver/k8sclusterreceiver/internal/demonset/daemonsets.go b/receiver/k8sclusterreceiver/internal/demonset/daemonsets.go index fc07441632e3..303e9c9a24d9 100644 --- a/receiver/k8sclusterreceiver/internal/demonset/daemonsets.go +++ b/receiver/k8sclusterreceiver/internal/demonset/daemonsets.go @@ -4,17 +4,12 @@ package demonset // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/demonset" import ( - "time" - "go.opentelemetry.io/collector/pdata/pcommon" - "go.opentelemetry.io/collector/pdata/pmetric" - "go.opentelemetry.io/collector/receiver" appsv1 "k8s.io/api/apps/v1" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/experimentalmetricmetadata" 
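Every per-kind unit test in this patch is converted to the same skeleton shown for cron jobs above: build the generated MetricsBuilder with the default config, call the kind's RecordMetrics with a fixed timestamp, Emit once, and compare against the golden file while ignoring timestamps and ordering. A condensed version of that skeleton (the test name is illustrative, and the option list follows the pattern used by the other tests in this diff):

// Condensed form of the per-kind test pattern applied throughout this patch
// (shown for the cron job case; other kinds differ only in the fixture and
// RecordMetrics function).
package cronjob

import (
	"path/filepath"
	"testing"
	"time"

	"github.com/stretchr/testify/require"
	"go.opentelemetry.io/collector/pdata/pcommon"
	"go.opentelemetry.io/collector/receiver/receivertest"

	"github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal/golden"
	"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest/pmetrictest"
	"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/metadata"
	"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/testutils"
)

func TestCronJobMetricsPattern(t *testing.T) {
	cj := testutils.NewCronJob("1")

	// One builder per test, a fixed timestamp, record, then emit once.
	ts := pcommon.Timestamp(time.Now().UnixNano())
	mb := metadata.NewMetricsBuilder(metadata.DefaultMetricsBuilderConfig(), receivertest.NewNopCreateSettings())
	RecordMetrics(mb, cj, ts)
	m := mb.Emit()

	// Compare against the golden file, ignoring timestamps and ordering.
	expected, err := golden.ReadMetrics(filepath.Join("testdata", "expected.yaml"))
	require.NoError(t, err)
	require.NoError(t, pmetrictest.CompareMetrics(expected, m,
		pmetrictest.IgnoreTimestamp(),
		pmetrictest.IgnoreStartTimestamp(),
		pmetrictest.IgnoreResourceMetricsOrder(),
		pmetrictest.IgnoreMetricsOrder(),
		pmetrictest.IgnoreScopeMetricsOrder(),
	))
}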
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/constants" "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/metadata" - imetadataphase "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/metadata" ) // Transform transforms the pod to remove the fields that we don't use to reduce RAM utilization. @@ -31,20 +26,18 @@ func Transform(ds *appsv1.DaemonSet) *appsv1.DaemonSet { } } -func GetMetrics(set receiver.CreateSettings, metricsBuilderConfig imetadataphase.MetricsBuilderConfig, ds *appsv1.DaemonSet) pmetric.Metrics { - mbphase := imetadataphase.NewMetricsBuilder(metricsBuilderConfig, set) - ts := pcommon.NewTimestampFromTime(time.Now()) - mbphase.RecordK8sDaemonsetCurrentScheduledNodesDataPoint(ts, int64(ds.Status.CurrentNumberScheduled)) - mbphase.RecordK8sDaemonsetDesiredScheduledNodesDataPoint(ts, int64(ds.Status.DesiredNumberScheduled)) - mbphase.RecordK8sDaemonsetMisscheduledNodesDataPoint(ts, int64(ds.Status.NumberMisscheduled)) - mbphase.RecordK8sDaemonsetReadyNodesDataPoint(ts, int64(ds.Status.NumberReady)) +func RecordMetrics(mb *metadata.MetricsBuilder, ds *appsv1.DaemonSet, ts pcommon.Timestamp) { + mb.RecordK8sDaemonsetCurrentScheduledNodesDataPoint(ts, int64(ds.Status.CurrentNumberScheduled)) + mb.RecordK8sDaemonsetDesiredScheduledNodesDataPoint(ts, int64(ds.Status.DesiredNumberScheduled)) + mb.RecordK8sDaemonsetMisscheduledNodesDataPoint(ts, int64(ds.Status.NumberMisscheduled)) + mb.RecordK8sDaemonsetReadyNodesDataPoint(ts, int64(ds.Status.NumberReady)) - rb := imetadataphase.NewResourceBuilder(metricsBuilderConfig.ResourceAttributes) + rb := mb.NewResourceBuilder() rb.SetK8sNamespaceName(ds.Namespace) rb.SetK8sDaemonsetName(ds.Name) rb.SetK8sDaemonsetUID(string(ds.UID)) rb.SetOpencensusResourcetype("k8s") - return mbphase.Emit(imetadataphase.WithResource(rb.Emit())) + mb.EmitForResource(metadata.WithResource(rb.Emit())) } func GetMetadata(ds *appsv1.DaemonSet) map[experimentalmetricmetadata.ResourceID]*metadata.KubernetesMetadata { diff --git a/receiver/k8sclusterreceiver/internal/demonset/daemonsets_test.go b/receiver/k8sclusterreceiver/internal/demonset/daemonsets_test.go index 2cebe4a3616b..5222d3d91510 100644 --- a/receiver/k8sclusterreceiver/internal/demonset/daemonsets_test.go +++ b/receiver/k8sclusterreceiver/internal/demonset/daemonsets_test.go @@ -6,9 +6,11 @@ package demonset import ( "path/filepath" "testing" + "time" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/receiver/receivertest" appsv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" @@ -23,7 +25,11 @@ import ( func TestDaemonsetMetrics(t *testing.T) { ds := testutils.NewDaemonset("1") - m := GetMetrics(receivertest.NewNopCreateSettings(), metadata.DefaultMetricsBuilderConfig(), ds) + ts := pcommon.Timestamp(time.Now().UnixNano()) + mb := metadata.NewMetricsBuilder(metadata.DefaultMetricsBuilderConfig(), receivertest.NewNopCreateSettings()) + RecordMetrics(mb, ds, ts) + m := mb.Emit() + expected, err := golden.ReadMetrics(filepath.Join("testdata", "expected.yaml")) require.NoError(t, err) require.NoError(t, pmetrictest.CompareMetrics(expected, m, diff --git a/receiver/k8sclusterreceiver/internal/deployment/deployments.go b/receiver/k8sclusterreceiver/internal/deployment/deployments.go index ebbf829cac09..207b5c5caaa0 100644 --- 
a/receiver/k8sclusterreceiver/internal/deployment/deployments.go +++ b/receiver/k8sclusterreceiver/internal/deployment/deployments.go @@ -4,11 +4,7 @@ package deployment // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/deployment" import ( - "time" - "go.opentelemetry.io/collector/pdata/pcommon" - "go.opentelemetry.io/collector/pdata/pmetric" - "go.opentelemetry.io/collector/receiver" conventions "go.opentelemetry.io/collector/semconv/v1.6.1" appsv1 "k8s.io/api/apps/v1" @@ -32,17 +28,15 @@ func Transform(deployment *appsv1.Deployment) *appsv1.Deployment { } } -func GetMetrics(set receiver.CreateSettings, metricsBuilderConfig imetadata.MetricsBuilderConfig, dep *appsv1.Deployment) pmetric.Metrics { - mb := imetadata.NewMetricsBuilder(metricsBuilderConfig, set) - ts := pcommon.NewTimestampFromTime(time.Now()) +func RecordMetrics(mb *imetadata.MetricsBuilder, dep *appsv1.Deployment, ts pcommon.Timestamp) { mb.RecordK8sDeploymentDesiredDataPoint(ts, int64(*dep.Spec.Replicas)) mb.RecordK8sDeploymentAvailableDataPoint(ts, int64(dep.Status.AvailableReplicas)) - rb := imetadata.NewResourceBuilder(metricsBuilderConfig.ResourceAttributes) + rb := mb.NewResourceBuilder() rb.SetK8sDeploymentName(dep.Name) rb.SetK8sDeploymentUID(string(dep.UID)) rb.SetK8sNamespaceName(dep.Namespace) rb.SetOpencensusResourcetype("k8s") - return mb.Emit(imetadata.WithResource(rb.Emit())) + mb.EmitForResource(metadata.WithResource(rb.Emit())) } func GetMetadata(dep *appsv1.Deployment) map[experimentalmetricmetadata.ResourceID]*metadata.KubernetesMetadata { diff --git a/receiver/k8sclusterreceiver/internal/deployment/deployments_test.go b/receiver/k8sclusterreceiver/internal/deployment/deployments_test.go index 24dc7c86bae4..1ce4544310b7 100644 --- a/receiver/k8sclusterreceiver/internal/deployment/deployments_test.go +++ b/receiver/k8sclusterreceiver/internal/deployment/deployments_test.go @@ -6,9 +6,11 @@ package deployment import ( "path/filepath" "testing" + "time" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/pdata/pmetric" "go.opentelemetry.io/collector/receiver/receivertest" appsv1 "k8s.io/api/apps/v1" @@ -24,7 +26,10 @@ import ( func TestDeploymentMetrics(t *testing.T) { dep := testutils.NewDeployment("1") - m := GetMetrics(receivertest.NewNopCreateSettings(), metadata.DefaultMetricsBuilderConfig(), dep) + ts := pcommon.Timestamp(time.Now().UnixNano()) + mb := metadata.NewMetricsBuilder(metadata.DefaultMetricsBuilderConfig(), receivertest.NewNopCreateSettings()) + RecordMetrics(mb, dep, ts) + m := mb.Emit() require.Equal(t, 1, m.ResourceMetrics().Len()) require.Equal(t, 2, m.MetricCount()) @@ -51,7 +56,10 @@ func TestDeploymentMetrics(t *testing.T) { func TestGoldenFile(t *testing.T) { dep := testutils.NewDeployment("1") - m := GetMetrics(receivertest.NewNopCreateSettings(), metadata.DefaultMetricsBuilderConfig(), dep) + ts := pcommon.Timestamp(time.Now().UnixNano()) + mb := metadata.NewMetricsBuilder(metadata.DefaultMetricsBuilderConfig(), receivertest.NewNopCreateSettings()) + RecordMetrics(mb, dep, ts) + m := mb.Emit() expectedFile := filepath.Join("testdata", "expected.yaml") expected, err := golden.ReadMetrics(expectedFile) require.NoError(t, err) diff --git a/receiver/k8sclusterreceiver/internal/hpa/hpa.go b/receiver/k8sclusterreceiver/internal/hpa/hpa.go index 19d965aa1e20..380f1fac87a7 100644 --- a/receiver/k8sclusterreceiver/internal/hpa/hpa.go 
+++ b/receiver/k8sclusterreceiver/internal/hpa/hpa.go @@ -4,45 +4,36 @@ package hpa // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/hpa" import ( - "time" - "go.opentelemetry.io/collector/pdata/pcommon" - "go.opentelemetry.io/collector/pdata/pmetric" - "go.opentelemetry.io/collector/receiver" autoscalingv2 "k8s.io/api/autoscaling/v2" autoscalingv2beta2 "k8s.io/api/autoscaling/v2beta2" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/experimentalmetricmetadata" "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/metadata" - imetadata "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/metadata" ) -func GetMetricsBeta(set receiver.CreateSettings, metricsBuilderConfig imetadata.MetricsBuilderConfig, hpa *autoscalingv2beta2.HorizontalPodAutoscaler) pmetric.Metrics { - mb := imetadata.NewMetricsBuilder(metricsBuilderConfig, set) - ts := pcommon.NewTimestampFromTime(time.Now()) +func RecordMetricsBeta(mb *metadata.MetricsBuilder, hpa *autoscalingv2beta2.HorizontalPodAutoscaler, ts pcommon.Timestamp) { mb.RecordK8sHpaMaxReplicasDataPoint(ts, int64(hpa.Spec.MaxReplicas)) mb.RecordK8sHpaMinReplicasDataPoint(ts, int64(*hpa.Spec.MinReplicas)) mb.RecordK8sHpaCurrentReplicasDataPoint(ts, int64(hpa.Status.CurrentReplicas)) mb.RecordK8sHpaDesiredReplicasDataPoint(ts, int64(hpa.Status.DesiredReplicas)) - rb := imetadata.NewResourceBuilder(metricsBuilderConfig.ResourceAttributes) + rb := mb.NewResourceBuilder() rb.SetK8sHpaUID(string(hpa.UID)) rb.SetK8sHpaName(hpa.Name) rb.SetK8sNamespaceName(hpa.Namespace) - return mb.Emit(imetadata.WithResource(rb.Emit())) + mb.EmitForResource(metadata.WithResource(rb.Emit())) } -func GetMetrics(set receiver.CreateSettings, metricsBuilderConfig imetadata.MetricsBuilderConfig, hpa *autoscalingv2.HorizontalPodAutoscaler) pmetric.Metrics { - mb := imetadata.NewMetricsBuilder(metricsBuilderConfig, set) - ts := pcommon.NewTimestampFromTime(time.Now()) +func RecordMetrics(mb *metadata.MetricsBuilder, hpa *autoscalingv2.HorizontalPodAutoscaler, ts pcommon.Timestamp) { mb.RecordK8sHpaMaxReplicasDataPoint(ts, int64(hpa.Spec.MaxReplicas)) mb.RecordK8sHpaMinReplicasDataPoint(ts, int64(*hpa.Spec.MinReplicas)) mb.RecordK8sHpaCurrentReplicasDataPoint(ts, int64(hpa.Status.CurrentReplicas)) mb.RecordK8sHpaDesiredReplicasDataPoint(ts, int64(hpa.Status.DesiredReplicas)) - rb := imetadata.NewResourceBuilder(metricsBuilderConfig.ResourceAttributes) + rb := mb.NewResourceBuilder() rb.SetK8sHpaUID(string(hpa.UID)) rb.SetK8sHpaName(hpa.Name) rb.SetK8sNamespaceName(hpa.Namespace) - return mb.Emit(imetadata.WithResource(rb.Emit())) + mb.EmitForResource(metadata.WithResource(rb.Emit())) } func GetMetadata(hpa *autoscalingv2.HorizontalPodAutoscaler) map[experimentalmetricmetadata.ResourceID]*metadata.KubernetesMetadata { diff --git a/receiver/k8sclusterreceiver/internal/hpa/hpa_test.go b/receiver/k8sclusterreceiver/internal/hpa/hpa_test.go index 479652033541..5d971412bbe3 100644 --- a/receiver/k8sclusterreceiver/internal/hpa/hpa_test.go +++ b/receiver/k8sclusterreceiver/internal/hpa/hpa_test.go @@ -5,9 +5,11 @@ package hpa import ( "testing" + "time" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/pdata/pmetric" "go.opentelemetry.io/collector/receiver/receivertest" @@ -18,10 +20,13 @@ import ( func TestHPAMetrics(t *testing.T) { hpa := 
testutils.NewHPA("1") - md := GetMetrics(receivertest.NewNopCreateSettings(), metadata.DefaultMetricsBuilderConfig(), hpa) + ts := pcommon.Timestamp(time.Now().UnixNano()) + mb := metadata.NewMetricsBuilder(metadata.DefaultMetricsBuilderConfig(), receivertest.NewNopCreateSettings()) + RecordMetrics(mb, hpa, ts) + m := mb.Emit() - require.Equal(t, 1, md.ResourceMetrics().Len()) - rm := md.ResourceMetrics().At(0) + require.Equal(t, 1, m.ResourceMetrics().Len()) + rm := m.ResourceMetrics().At(0) assert.Equal(t, map[string]any{ "k8s.hpa.uid": "test-hpa-1-uid", diff --git a/receiver/k8sclusterreceiver/internal/jobs/jobs.go b/receiver/k8sclusterreceiver/internal/jobs/jobs.go index 876d8a7ec5b8..fdd0e4e07ade 100644 --- a/receiver/k8sclusterreceiver/internal/jobs/jobs.go +++ b/receiver/k8sclusterreceiver/internal/jobs/jobs.go @@ -4,40 +4,32 @@ package jobs // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/jobs" import ( - "time" - "go.opentelemetry.io/collector/pdata/pcommon" - "go.opentelemetry.io/collector/pdata/pmetric" - "go.opentelemetry.io/collector/receiver" batchv1 "k8s.io/api/batch/v1" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/experimentalmetricmetadata" "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/constants" "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/metadata" - imetadataphase "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/metadata" ) -func GetMetrics(set receiver.CreateSettings, metricsBuilderConfig imetadataphase.MetricsBuilderConfig, j *batchv1.Job) pmetric.Metrics { - mbphase := imetadataphase.NewMetricsBuilder(metricsBuilderConfig, set) - ts := pcommon.NewTimestampFromTime(time.Now()) - - mbphase.RecordK8sJobActivePodsDataPoint(ts, int64(j.Status.Active)) - mbphase.RecordK8sJobFailedPodsDataPoint(ts, int64(j.Status.Failed)) - mbphase.RecordK8sJobSuccessfulPodsDataPoint(ts, int64(j.Status.Succeeded)) +func RecordMetrics(mb *metadata.MetricsBuilder, j *batchv1.Job, ts pcommon.Timestamp) { + mb.RecordK8sJobActivePodsDataPoint(ts, int64(j.Status.Active)) + mb.RecordK8sJobFailedPodsDataPoint(ts, int64(j.Status.Failed)) + mb.RecordK8sJobSuccessfulPodsDataPoint(ts, int64(j.Status.Succeeded)) if j.Spec.Completions != nil { - mbphase.RecordK8sJobDesiredSuccessfulPodsDataPoint(ts, int64(*j.Spec.Completions)) + mb.RecordK8sJobDesiredSuccessfulPodsDataPoint(ts, int64(*j.Spec.Completions)) } if j.Spec.Parallelism != nil { - mbphase.RecordK8sJobMaxParallelPodsDataPoint(ts, int64(*j.Spec.Parallelism)) + mb.RecordK8sJobMaxParallelPodsDataPoint(ts, int64(*j.Spec.Parallelism)) } - rb := imetadataphase.NewResourceBuilder(metricsBuilderConfig.ResourceAttributes) + rb := mb.NewResourceBuilder() rb.SetK8sNamespaceName(j.Namespace) rb.SetK8sJobName(j.Name) rb.SetK8sJobUID(string(j.UID)) rb.SetOpencensusResourcetype("k8s") - return mbphase.Emit(imetadataphase.WithResource(rb.Emit())) + mb.EmitForResource(metadata.WithResource(rb.Emit())) } // Transform transforms the job to remove the fields that we don't use to reduce RAM utilization. 
diff --git a/receiver/k8sclusterreceiver/internal/jobs/jobs_test.go b/receiver/k8sclusterreceiver/internal/jobs/jobs_test.go
index 642657449c05..f76e070dc138 100644
--- a/receiver/k8sclusterreceiver/internal/jobs/jobs_test.go
+++ b/receiver/k8sclusterreceiver/internal/jobs/jobs_test.go
@@ -6,9 +6,11 @@ package jobs
 import (
 	"path/filepath"
 	"testing"
+	"time"
 
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
+	"go.opentelemetry.io/collector/pdata/pcommon"
 	"go.opentelemetry.io/collector/receiver/receivertest"
 	batchv1 "k8s.io/api/batch/v1"
 	corev1 "k8s.io/api/core/v1"
@@ -23,7 +25,10 @@ import (
 
 func TestJobMetrics(t *testing.T) {
 	j := testutils.NewJob("1")
-	m := GetMetrics(receivertest.NewNopCreateSettings(), metadata.DefaultMetricsBuilderConfig(), j)
+	ts := pcommon.Timestamp(time.Now().UnixNano())
+	mb := metadata.NewMetricsBuilder(metadata.DefaultMetricsBuilderConfig(), receivertest.NewNopCreateSettings())
+	RecordMetrics(mb, j, ts)
+	m := mb.Emit()
 
 	expected, err := golden.ReadMetrics(filepath.Join("testdata", "expected.yaml"))
 	require.NoError(t, err)
@@ -35,10 +40,12 @@ func TestJobMetrics(t *testing.T) {
 			pmetrictest.IgnoreScopeMetricsOrder(),
 		),
 	)
 
+	// Test with nil values.
 	j.Spec.Completions = nil
 	j.Spec.Parallelism = nil
-	m = GetMetrics(receivertest.NewNopCreateSettings(), metadata.DefaultMetricsBuilderConfig(), j)
+	RecordMetrics(mb, j, ts)
+	m = mb.Emit()
 	expected, err = golden.ReadMetrics(filepath.Join("testdata", "expected_empty.yaml"))
 	require.NoError(t, err)
 	require.NoError(t, pmetrictest.CompareMetrics(expected, m,
diff --git a/receiver/k8sclusterreceiver/internal/metadata/metadatastore.go b/receiver/k8sclusterreceiver/internal/metadata/metadatastore.go
index 1ec348b13b5f..546f3ba62142 100644
--- a/receiver/k8sclusterreceiver/internal/metadata/metadatastore.go
+++ b/receiver/k8sclusterreceiver/internal/metadata/metadatastore.go
@@ -6,27 +6,40 @@ package metadata // import "github.com/open-telemetry/opentelemetry-collector-co
 import (
 	"k8s.io/apimachinery/pkg/runtime/schema"
 	"k8s.io/client-go/tools/cache"
-
-	"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/gvk"
 )
 
 // Store keeps track of required caches exposed by informers.
 // This store is used while collecting metadata about Pods to be able
 // to correlate other Kubernetes objects with a Pod.
 type Store struct {
-	Services    cache.Store
-	Jobs        cache.Store
-	ReplicaSets cache.Store
+	stores map[schema.GroupVersionKind]cache.Store
+}
+
+// NewStore creates a new Store.
+func NewStore() *Store {
+	return &Store{
+		stores: make(map[schema.GroupVersionKind]cache.Store),
+	}
+}
+
+// Get returns a cache.Store for a given GroupVersionKind.
+func (ms *Store) Get(gvk schema.GroupVersionKind) cache.Store {
+	return ms.stores[gvk]
 }
 
-// Setup tracks metadata of services, jobs and replicasets.
-func (ms *Store) Setup(kind schema.GroupVersionKind, store cache.Store) {
-	switch kind {
-	case gvk.Service:
-		ms.Services = store
-	case gvk.Job:
-		ms.Jobs = store
-	case gvk.ReplicaSet:
-		ms.ReplicaSets = store
+// Setup associates a cache.Store with the given GroupVersionKind.
+func (ms *Store) Setup(gvk schema.GroupVersionKind, store cache.Store) {
+	ms.stores[gvk] = store
+}
+
+// ForEach iterates over all objects in a given cache.Store.
+func (ms *Store) ForEach(gvk schema.GroupVersionKind, f func(o any)) {
+	store := ms.Get(gvk)
+	if store == nil {
+		// This is normal, not all caches are set up, e.g. ClusterResourceQuota is only available in OpenShift.
+ return + } + for _, obj := range store.List() { + f(obj) } } diff --git a/receiver/k8sclusterreceiver/internal/namespace/namespaces.go b/receiver/k8sclusterreceiver/internal/namespace/namespaces.go index 6e4e08377fa3..19fc86377049 100644 --- a/receiver/k8sclusterreceiver/internal/namespace/namespaces.go +++ b/receiver/k8sclusterreceiver/internal/namespace/namespaces.go @@ -4,25 +4,19 @@ package namespace // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/namespace" import ( - "time" - "go.opentelemetry.io/collector/pdata/pcommon" - "go.opentelemetry.io/collector/pdata/pmetric" - "go.opentelemetry.io/collector/receiver" corev1 "k8s.io/api/core/v1" imetadata "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/metadata" ) -func GetMetrics(set receiver.CreateSettings, metricsBuilderConfig imetadata.MetricsBuilderConfig, ns *corev1.Namespace) pmetric.Metrics { - mb := imetadata.NewMetricsBuilder(metricsBuilderConfig, set) - ts := pcommon.NewTimestampFromTime(time.Now()) +func RecordMetrics(mb *imetadata.MetricsBuilder, ns *corev1.Namespace, ts pcommon.Timestamp) { mb.RecordK8sNamespacePhaseDataPoint(ts, int64(namespacePhaseValues[ns.Status.Phase])) - rb := imetadata.NewResourceBuilder(metricsBuilderConfig.ResourceAttributes) + rb := mb.NewResourceBuilder() rb.SetK8sNamespaceUID(string(ns.UID)) rb.SetK8sNamespaceName(ns.Name) rb.SetOpencensusResourcetype("k8s") - return mb.Emit(imetadata.WithResource(rb.Emit())) + mb.EmitForResource(imetadata.WithResource(rb.Emit())) } var namespacePhaseValues = map[corev1.NamespacePhase]int32{ diff --git a/receiver/k8sclusterreceiver/internal/namespace/namespaces_test.go b/receiver/k8sclusterreceiver/internal/namespace/namespaces_test.go index 8f85f3854b91..86a9599e1bb4 100644 --- a/receiver/k8sclusterreceiver/internal/namespace/namespaces_test.go +++ b/receiver/k8sclusterreceiver/internal/namespace/namespaces_test.go @@ -6,21 +6,24 @@ package namespace import ( "path/filepath" "testing" + "time" "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/receiver/receivertest" - corev1 "k8s.io/api/core/v1" - v1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/types" "github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal/golden" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest/pmetrictest" "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/metadata" + "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/testutils" ) func TestNamespaceMetrics(t *testing.T) { - n := newNamespace("1") - m := GetMetrics(receivertest.NewNopCreateSettings(), metadata.DefaultMetricsBuilderConfig(), n) + n := testutils.NewNamespace("1") + ts := pcommon.Timestamp(time.Now().UnixNano()) + mb := metadata.NewMetricsBuilder(metadata.DefaultMetricsBuilderConfig(), receivertest.NewNopCreateSettings()) + RecordMetrics(mb, n, ts) + m := mb.Emit() expected, err := golden.ReadMetrics(filepath.Join("testdata", "expected.yaml")) require.NoError(t, err) @@ -33,19 +36,3 @@ func TestNamespaceMetrics(t *testing.T) { ), ) } - -func newNamespace(id string) *corev1.Namespace { - return &corev1.Namespace{ - ObjectMeta: v1.ObjectMeta{ - Name: "test-namespace-" + id, - UID: types.UID("test-namespace-" + id + "-uid"), - Labels: map[string]string{ - "foo": "bar", - "foo1": "", - }, - }, - Status: 
corev1.NamespaceStatus{ - Phase: corev1.NamespaceTerminating, - }, - } -} diff --git a/receiver/k8sclusterreceiver/internal/node/nodes.go b/receiver/k8sclusterreceiver/internal/node/nodes.go index de5fd3551c1b..77b2981647ca 100644 --- a/receiver/k8sclusterreceiver/internal/node/nodes.go +++ b/receiver/k8sclusterreceiver/internal/node/nodes.go @@ -43,17 +43,11 @@ func Transform(node *corev1.Node) *corev1.Node { return newNode } -func GetMetrics(set receiver.CreateSettings, metricsBuilderConfig metadata.MetricsBuilderConfig, node *corev1.Node, nodeConditionTypesToReport, allocatableTypesToReport []string) pmetric.Metrics { - ts := pcommon.NewTimestampFromTime(time.Now()) - ms := pmetric.NewMetrics() - rm := ms.ResourceMetrics().AppendEmpty() +func CustomMetrics(set receiver.CreateSettings, rb *metadata.ResourceBuilder, node *corev1.Node, nodeConditionTypesToReport, + allocatableTypesToReport []string, ts pcommon.Timestamp) pmetric.ResourceMetrics { + rm := pmetric.NewResourceMetrics() - // TODO: Generate a schema URL for the node metrics in the metadata package and use them here. - rm.SetSchemaUrl(conventions.SchemaURL) sm := rm.ScopeMetrics().AppendEmpty() - sm.Scope().SetName("otelcol/k8sclusterreceiver") - sm.Scope().SetVersion(set.BuildInfo.Version) - // Adding 'node condition type' metrics for _, nodeConditionTypeValue := range nodeConditionTypesToReport { v1NodeConditionTypeValue := corev1.NodeConditionType(nodeConditionTypeValue) @@ -85,13 +79,21 @@ func GetMetrics(set receiver.CreateSettings, metricsBuilderConfig metadata.Metri setNodeAllocatableValue(dp, v1NodeAllocatableTypeValue, quantity) dp.SetTimestamp(ts) } - rb := metadata.NewResourceBuilder(metricsBuilderConfig.ResourceAttributes) + + if sm.Metrics().Len() == 0 { + return pmetric.NewResourceMetrics() + } + + // TODO: Generate a schema URL for the node metrics in the metadata package and use them here. 
+ rm.SetSchemaUrl(conventions.SchemaURL) + sm.Scope().SetName("otelcol/k8sclusterreceiver") + sm.Scope().SetVersion(set.BuildInfo.Version) + rb.SetK8sNodeUID(string(node.UID)) rb.SetK8sNodeName(node.Name) rb.SetOpencensusResourcetype("k8s") rb.Emit().MoveTo(rm.Resource()) - return ms - + return rm } var nodeConditionValues = map[corev1.ConditionStatus]int64{ diff --git a/receiver/k8sclusterreceiver/internal/node/nodes_test.go b/receiver/k8sclusterreceiver/internal/node/nodes_test.go index 1d84700c5ea4..4eb37189be0d 100644 --- a/receiver/k8sclusterreceiver/internal/node/nodes_test.go +++ b/receiver/k8sclusterreceiver/internal/node/nodes_test.go @@ -6,9 +6,12 @@ package node import ( "path/filepath" "testing" + "time" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/pdata/pcommon" + "go.opentelemetry.io/collector/pdata/pmetric" "go.opentelemetry.io/collector/receiver/receivertest" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/resource" @@ -22,7 +25,8 @@ import ( func TestNodeMetricsReportCPUMetrics(t *testing.T) { n := testutils.NewNode("1") - m := GetMetrics(receivertest.NewNopCreateSettings(), metadata.DefaultMetricsBuilderConfig(), n, + rb := metadata.NewResourceBuilder(metadata.DefaultResourceAttributesConfig()) + rm := CustomMetrics(receivertest.NewNopCreateSettings(), rb, n, []string{ "Ready", "MemoryPressure", @@ -41,7 +45,11 @@ func TestNodeMetricsReportCPUMetrics(t *testing.T) { "hugepages-2Mi", "not-present", }, + pcommon.Timestamp(time.Now().UnixNano()), ) + m := pmetric.NewMetrics() + rm.MoveTo(m.ResourceMetrics().AppendEmpty()) + expected, err := golden.ReadMetrics(filepath.Join("testdata", "expected.yaml")) require.NoError(t, err) require.NoError(t, pmetrictest.CompareMetrics(expected, m, diff --git a/receiver/k8sclusterreceiver/internal/pod/pods.go b/receiver/k8sclusterreceiver/internal/pod/pods.go index 798bebf6e12d..575d71c01330 100644 --- a/receiver/k8sclusterreceiver/internal/pod/pods.go +++ b/receiver/k8sclusterreceiver/internal/pod/pods.go @@ -9,8 +9,6 @@ import ( "time" "go.opentelemetry.io/collector/pdata/pcommon" - "go.opentelemetry.io/collector/pdata/pmetric" - "go.opentelemetry.io/collector/receiver" conventions "go.opentelemetry.io/collector/semconv/v1.6.1" "go.uber.org/zap" appsv1 "k8s.io/api/apps/v1" @@ -25,8 +23,8 @@ import ( "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/experimentalmetricmetadata" "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/constants" "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/container" + "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/gvk" "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/metadata" - imetadataphase "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/metadata" "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/utils" ) @@ -71,24 +69,19 @@ func Transform(pod *corev1.Pod) *corev1.Pod { return newPod } -func GetMetrics(set receiver.CreateSettings, metricsBuilderConfig imetadataphase.MetricsBuilderConfig, pod *corev1.Pod) pmetric.Metrics { - mbphase := imetadataphase.NewMetricsBuilder(metricsBuilderConfig, set) - ts := pcommon.NewTimestampFromTime(time.Now()) - mbphase.RecordK8sPodPhaseDataPoint(ts, int64(phaseToInt(pod.Status.Phase))) - rb := 
imetadataphase.NewResourceBuilder(metricsBuilderConfig.ResourceAttributes) +func RecordMetrics(logger *zap.Logger, mb *metadata.MetricsBuilder, pod *corev1.Pod, ts pcommon.Timestamp) { + mb.RecordK8sPodPhaseDataPoint(ts, int64(phaseToInt(pod.Status.Phase))) + rb := mb.NewResourceBuilder() rb.SetK8sNamespaceName(pod.Namespace) rb.SetK8sNodeName(pod.Spec.NodeName) rb.SetK8sPodName(pod.Name) rb.SetK8sPodUID(string(pod.UID)) rb.SetOpencensusResourcetype("k8s") - metrics := mbphase.Emit(imetadataphase.WithResource(rb.Emit())) + mb.EmitForResource(metadata.WithResource(rb.Emit())) for _, c := range pod.Spec.Containers { - specMetrics := container.GetSpecMetrics(set, metricsBuilderConfig, c, pod) - specMetrics.ResourceMetrics().MoveAndAppendTo(metrics.ResourceMetrics()) + container.RecordSpecMetrics(logger, mb, c, pod, ts) } - - return metrics } func phaseToInt(phase corev1.PodPhase) int32 { @@ -127,16 +120,16 @@ func GetMetadata(pod *corev1.Pod, mc *metadata.Store, logger *zap.Logger) map[ex meta[constants.K8sKeyWorkLoadName] = or.Name } - if mc.Services != nil { - meta = maps.MergeStringMaps(meta, getPodServiceTags(pod, mc.Services)) + if store := mc.Get(gvk.Service); store != nil { + meta = maps.MergeStringMaps(meta, getPodServiceTags(pod, store)) } - if mc.Jobs != nil { - meta = maps.MergeStringMaps(meta, collectPodJobProperties(pod, mc.Jobs, logger)) + if store := mc.Get(gvk.Job); store != nil { + meta = maps.MergeStringMaps(meta, collectPodJobProperties(pod, store, logger)) } - if mc.ReplicaSets != nil { - meta = maps.MergeStringMaps(meta, collectPodReplicaSetProperties(pod, mc.ReplicaSets, logger)) + if store := mc.Get(gvk.ReplicaSet); store != nil { + meta = maps.MergeStringMaps(meta, collectPodReplicaSetProperties(pod, store, logger)) } podID := experimentalmetricmetadata.ResourceID(pod.UID) diff --git a/receiver/k8sclusterreceiver/internal/pod/pods_test.go b/receiver/k8sclusterreceiver/internal/pod/pods_test.go index d8bef00a2048..943bdc97adc5 100644 --- a/receiver/k8sclusterreceiver/internal/pod/pods_test.go +++ b/receiver/k8sclusterreceiver/internal/pod/pods_test.go @@ -12,6 +12,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/receiver/receivertest" "go.uber.org/zap" "go.uber.org/zap/zapcore" @@ -24,6 +25,7 @@ import ( "github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal/golden" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/experimentalmetricmetadata" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest/pmetrictest" + "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/gvk" "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/metadata" "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/testutils" ) @@ -41,7 +43,10 @@ func TestPodAndContainerMetricsReportCPUMetrics(t *testing.T) { testutils.NewPodStatusWithContainer("container-name", containerIDWithPreifx("container-id")), ) - m := GetMetrics(receivertest.NewNopCreateSettings(), metadata.DefaultMetricsBuilderConfig(), pod) + ts := pcommon.Timestamp(time.Now().UnixNano()) + mb := metadata.NewMetricsBuilder(metadata.DefaultMetricsBuilderConfig(), receivertest.NewNopCreateSettings()) + RecordMetrics(zap.NewNop(), mb, pod, ts) + m := mb.Emit() expected, err := golden.ReadMetrics(filepath.Join("testdata", "expected.yaml")) 
require.NoError(t, err) require.NoError(t, pmetrictest.CompareMetrics(expected, m, @@ -248,7 +253,7 @@ func expectedKubernetesMetadata(to testCaseOptions) map[experimentalmetricmetada } func mockMetadataStore(to testCaseOptions) *metadata.Store { - ms := &metadata.Store{} + ms := metadata.NewStore() if to.wantNilCache { return ms @@ -261,7 +266,7 @@ func mockMetadataStore(to testCaseOptions) *metadata.Store { switch to.kind { case "Job": - ms.Jobs = store + ms.Setup(gvk.Job, store) if !to.emptyCache { if to.withParentOR { store.Cache["test-namespace/test-job-0"] = testutils.WithOwnerReferences( @@ -279,7 +284,7 @@ func mockMetadataStore(to testCaseOptions) *metadata.Store { } return ms case "ReplicaSet": - ms.ReplicaSets = store + ms.Setup(gvk.ReplicaSet, store) if !to.emptyCache { if to.withParentOR { store.Cache["test-namespace/test-replicaset-0"] = testutils.WithOwnerReferences( diff --git a/receiver/k8sclusterreceiver/internal/replicaset/replicasets.go b/receiver/k8sclusterreceiver/internal/replicaset/replicasets.go index 4562d7606bca..d3fde6ca6f3c 100644 --- a/receiver/k8sclusterreceiver/internal/replicaset/replicasets.go +++ b/receiver/k8sclusterreceiver/internal/replicaset/replicasets.go @@ -4,17 +4,12 @@ package replicaset // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/replicaset" import ( - "time" - "go.opentelemetry.io/collector/pdata/pcommon" - "go.opentelemetry.io/collector/pdata/pmetric" - "go.opentelemetry.io/collector/receiver" appsv1 "k8s.io/api/apps/v1" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/experimentalmetricmetadata" "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/constants" "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/metadata" - imetadataphase "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/metadata" ) // Transform transforms the replica set to remove the fields that we don't use to reduce RAM utilization. 
@@ -31,21 +26,18 @@ func Transform(rs *appsv1.ReplicaSet) *appsv1.ReplicaSet { } } -func GetMetrics(set receiver.CreateSettings, metricsBuilderConfig imetadataphase.MetricsBuilderConfig, rs *appsv1.ReplicaSet) pmetric.Metrics { - - mbphase := imetadataphase.NewMetricsBuilder(metricsBuilderConfig, set) - ts := pcommon.NewTimestampFromTime(time.Now()) +func RecordMetrics(mb *metadata.MetricsBuilder, rs *appsv1.ReplicaSet, ts pcommon.Timestamp) { if rs.Spec.Replicas != nil { - mbphase.RecordK8sReplicasetDesiredDataPoint(ts, int64(*rs.Spec.Replicas)) - mbphase.RecordK8sReplicasetAvailableDataPoint(ts, int64(rs.Status.AvailableReplicas)) + mb.RecordK8sReplicasetDesiredDataPoint(ts, int64(*rs.Spec.Replicas)) + mb.RecordK8sReplicasetAvailableDataPoint(ts, int64(rs.Status.AvailableReplicas)) } - rb := imetadataphase.NewResourceBuilder(metricsBuilderConfig.ResourceAttributes) + rb := mb.NewResourceBuilder() rb.SetK8sNamespaceName(rs.Namespace) rb.SetK8sReplicasetName(rs.Name) rb.SetK8sReplicasetUID(string(rs.UID)) rb.SetOpencensusResourcetype("k8s") - return mbphase.Emit(imetadataphase.WithResource(rb.Emit())) + mb.EmitForResource(metadata.WithResource(rb.Emit())) } func GetMetadata(rs *appsv1.ReplicaSet) map[experimentalmetricmetadata.ResourceID]*metadata.KubernetesMetadata { diff --git a/receiver/k8sclusterreceiver/internal/replicaset/replicasets_test.go b/receiver/k8sclusterreceiver/internal/replicaset/replicasets_test.go index 8059dce935b6..e4e1f005dbdd 100644 --- a/receiver/k8sclusterreceiver/internal/replicaset/replicasets_test.go +++ b/receiver/k8sclusterreceiver/internal/replicaset/replicasets_test.go @@ -6,9 +6,11 @@ package replicaset import ( "path/filepath" "testing" + "time" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/receiver/receivertest" appsv1 "k8s.io/api/apps/v1" v1 "k8s.io/api/core/v1" @@ -23,7 +25,10 @@ import ( func TestReplicasetMetrics(t *testing.T) { rs := testutils.NewReplicaSet("1") - m := GetMetrics(receivertest.NewNopCreateSettings(), metadata.DefaultMetricsBuilderConfig(), rs) + ts := pcommon.Timestamp(time.Now().UnixNano()) + mb := metadata.NewMetricsBuilder(metadata.DefaultMetricsBuilderConfig(), receivertest.NewNopCreateSettings()) + RecordMetrics(mb, rs, ts) + m := mb.Emit() expected, err := golden.ReadMetrics(filepath.Join("testdata", "expected.yaml")) require.NoError(t, err) require.NoError(t, pmetrictest.CompareMetrics(expected, m, diff --git a/receiver/k8sclusterreceiver/internal/replicationcontroller/replicationcontrollers.go b/receiver/k8sclusterreceiver/internal/replicationcontroller/replicationcontrollers.go index 97b31cdb5dce..f4fb422c5356 100644 --- a/receiver/k8sclusterreceiver/internal/replicationcontroller/replicationcontrollers.go +++ b/receiver/k8sclusterreceiver/internal/replicationcontroller/replicationcontrollers.go @@ -4,34 +4,26 @@ package replicationcontroller // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/replicationcontroller" import ( - "time" - "go.opentelemetry.io/collector/pdata/pcommon" - "go.opentelemetry.io/collector/pdata/pmetric" - "go.opentelemetry.io/collector/receiver" corev1 "k8s.io/api/core/v1" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/experimentalmetricmetadata" "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/constants" 
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/metadata" - imetadataphase "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/metadata" ) -func GetMetrics(set receiver.CreateSettings, metricsBuilderConfig imetadataphase.MetricsBuilderConfig, rc *corev1.ReplicationController) pmetric.Metrics { - mbphase := imetadataphase.NewMetricsBuilder(metricsBuilderConfig, set) - ts := pcommon.NewTimestampFromTime(time.Now()) - +func RecordMetrics(mb *metadata.MetricsBuilder, rc *corev1.ReplicationController, ts pcommon.Timestamp) { if rc.Spec.Replicas != nil { - mbphase.RecordK8sReplicationControllerDesiredDataPoint(ts, int64(*rc.Spec.Replicas)) - mbphase.RecordK8sReplicationControllerAvailableDataPoint(ts, int64(rc.Status.AvailableReplicas)) + mb.RecordK8sReplicationControllerDesiredDataPoint(ts, int64(*rc.Spec.Replicas)) + mb.RecordK8sReplicationControllerAvailableDataPoint(ts, int64(rc.Status.AvailableReplicas)) } - rb := imetadataphase.NewResourceBuilder(metricsBuilderConfig.ResourceAttributes) + rb := mb.NewResourceBuilder() rb.SetK8sNamespaceName(rc.Namespace) rb.SetK8sReplicationcontrollerName(rc.Name) rb.SetK8sReplicationcontrollerUID(string(rc.UID)) rb.SetOpencensusResourcetype("k8s") - return mbphase.Emit(imetadataphase.WithResource(rb.Emit())) + mb.EmitForResource(metadata.WithResource(rb.Emit())) } func GetMetadata(rc *corev1.ReplicationController) map[experimentalmetricmetadata.ResourceID]*metadata.KubernetesMetadata { diff --git a/receiver/k8sclusterreceiver/internal/replicationcontroller/replicationcontrollers_test.go b/receiver/k8sclusterreceiver/internal/replicationcontroller/replicationcontrollers_test.go index fe0a765fe938..bfdaefd2274e 100644 --- a/receiver/k8sclusterreceiver/internal/replicationcontroller/replicationcontrollers_test.go +++ b/receiver/k8sclusterreceiver/internal/replicationcontroller/replicationcontrollers_test.go @@ -6,36 +6,25 @@ package replicationcontroller import ( "path/filepath" "testing" + "time" "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/receiver/receivertest" - corev1 "k8s.io/api/core/v1" - v1 "k8s.io/apimachinery/pkg/apis/meta/v1" "github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal/golden" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest/pmetrictest" "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/metadata" + "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/testutils" ) func TestReplicationController(t *testing.T) { + rc := testutils.NewReplicationController("1") - rc := &corev1.ReplicationController{ - ObjectMeta: v1.ObjectMeta{ - Name: "test-replicationcontroller-1", - Namespace: "test-namespace", - UID: "test-replicationcontroller-1-uid", - Labels: map[string]string{ - "app": "my-app", - "version": "v1", - }, - }, - Spec: corev1.ReplicationControllerSpec{ - Replicas: func() *int32 { i := int32(1); return &i }(), - }, - Status: corev1.ReplicationControllerStatus{AvailableReplicas: 2}, - } - - m := GetMetrics(receivertest.NewNopCreateSettings(), metadata.DefaultMetricsBuilderConfig(), rc) + ts := pcommon.Timestamp(time.Now().UnixNano()) + mb := metadata.NewMetricsBuilder(metadata.DefaultMetricsBuilderConfig(), receivertest.NewNopCreateSettings()) + RecordMetrics(mb, rc, ts) + m := mb.Emit() expected, err := 
golden.ReadMetrics(filepath.Join("testdata", "expected.yaml")) require.NoError(t, err) diff --git a/receiver/k8sclusterreceiver/internal/resourcequota/resourcequotas.go b/receiver/k8sclusterreceiver/internal/resourcequota/resourcequotas.go index fc0921357389..6d286a95c2c4 100644 --- a/receiver/k8sclusterreceiver/internal/resourcequota/resourcequotas.go +++ b/receiver/k8sclusterreceiver/internal/resourcequota/resourcequotas.go @@ -5,20 +5,14 @@ package resourcequota // import "github.com/open-telemetry/opentelemetry-collect import ( "strings" - "time" "go.opentelemetry.io/collector/pdata/pcommon" - "go.opentelemetry.io/collector/pdata/pmetric" - "go.opentelemetry.io/collector/receiver" corev1 "k8s.io/api/core/v1" - imetadata "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/metadata" + "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/metadata" ) -func GetMetrics(set receiver.CreateSettings, metricsBuilderConfig imetadata.MetricsBuilderConfig, rq *corev1.ResourceQuota) pmetric.Metrics { - mb := imetadata.NewMetricsBuilder(metricsBuilderConfig, set) - ts := pcommon.NewTimestampFromTime(time.Now()) - +func RecordMetrics(mb *metadata.MetricsBuilder, rq *corev1.ResourceQuota, ts pcommon.Timestamp) { for k, v := range rq.Status.Hard { val := v.Value() if strings.HasSuffix(string(k), ".cpu") { @@ -35,10 +29,10 @@ func GetMetrics(set receiver.CreateSettings, metricsBuilderConfig imetadata.Metr mb.RecordK8sResourceQuotaUsedDataPoint(ts, val, string(k)) } - rb := imetadata.NewResourceBuilder(metricsBuilderConfig.ResourceAttributes) + rb := mb.NewResourceBuilder() rb.SetK8sResourcequotaUID(string(rq.UID)) rb.SetK8sResourcequotaName(rq.Name) rb.SetK8sNamespaceName(rq.Namespace) rb.SetOpencensusResourcetype("k8s") - return mb.Emit(imetadata.WithResource(rb.Emit())) + mb.EmitForResource(metadata.WithResource(rb.Emit())) } diff --git a/receiver/k8sclusterreceiver/internal/resourcequota/resourcequotas_test.go b/receiver/k8sclusterreceiver/internal/resourcequota/resourcequotas_test.go index 27e92c0fb283..3ee8e638e809 100644 --- a/receiver/k8sclusterreceiver/internal/resourcequota/resourcequotas_test.go +++ b/receiver/k8sclusterreceiver/internal/resourcequota/resourcequotas_test.go @@ -6,22 +6,24 @@ package resourcequota import ( "path/filepath" "testing" + "time" "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/receiver/receivertest" - corev1 "k8s.io/api/core/v1" - "k8s.io/apimachinery/pkg/api/resource" - v1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/types" "github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal/golden" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest/pmetrictest" "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/metadata" + "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/testutils" ) func TestRequestQuotaMetrics(t *testing.T) { - rq := newResourceQuota("1") - m := GetMetrics(receivertest.NewNopCreateSettings(), metadata.DefaultMetricsBuilderConfig(), rq) + rq := testutils.NewResourceQuota("1") + ts := pcommon.Timestamp(time.Now().UnixNano()) + mb := metadata.NewMetricsBuilder(metadata.DefaultMetricsBuilderConfig(), receivertest.NewNopCreateSettings()) + RecordMetrics(mb, rq, ts) + m := mb.Emit() expected, err := golden.ReadMetrics(filepath.Join("testdata", 
"expected.yaml")) require.NoError(t, err) @@ -34,25 +36,3 @@ func TestRequestQuotaMetrics(t *testing.T) { ), ) } - -func newResourceQuota(id string) *corev1.ResourceQuota { - return &corev1.ResourceQuota{ - ObjectMeta: v1.ObjectMeta{ - Name: "test-resourcequota-" + id, - UID: types.UID("test-resourcequota-" + id + "-uid"), - Namespace: "test-namespace", - Labels: map[string]string{ - "foo": "bar", - "foo1": "", - }, - }, - Status: corev1.ResourceQuotaStatus{ - Hard: corev1.ResourceList{ - "requests.cpu": *resource.NewQuantity(2, resource.DecimalSI), - }, - Used: corev1.ResourceList{ - "requests.cpu": *resource.NewQuantity(1, resource.DecimalSI), - }, - }, - } -} diff --git a/receiver/k8sclusterreceiver/internal/statefulset/statefulsets.go b/receiver/k8sclusterreceiver/internal/statefulset/statefulsets.go index 97f42c5598e7..e1d4b29e1ce3 100644 --- a/receiver/k8sclusterreceiver/internal/statefulset/statefulsets.go +++ b/receiver/k8sclusterreceiver/internal/statefulset/statefulsets.go @@ -4,11 +4,7 @@ package statefulset // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/statefulset" import ( - "time" - "go.opentelemetry.io/collector/pdata/pcommon" - "go.opentelemetry.io/collector/pdata/pmetric" - "go.opentelemetry.io/collector/receiver" appsv1 "k8s.io/api/apps/v1" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/experimentalmetricmetadata" @@ -39,22 +35,20 @@ func Transform(statefulset *appsv1.StatefulSet) *appsv1.StatefulSet { } } -func GetMetrics(set receiver.CreateSettings, metricsBuilderConfig imetadata.MetricsBuilderConfig, ss *appsv1.StatefulSet) pmetric.Metrics { +func RecordMetrics(mb *imetadata.MetricsBuilder, ss *appsv1.StatefulSet, ts pcommon.Timestamp) { if ss.Spec.Replicas == nil { - return pmetric.NewMetrics() + return } - mb := imetadata.NewMetricsBuilder(metricsBuilderConfig, set) - ts := pcommon.NewTimestampFromTime(time.Now()) mb.RecordK8sStatefulsetDesiredPodsDataPoint(ts, int64(*ss.Spec.Replicas)) mb.RecordK8sStatefulsetReadyPodsDataPoint(ts, int64(ss.Status.ReadyReplicas)) mb.RecordK8sStatefulsetCurrentPodsDataPoint(ts, int64(ss.Status.CurrentReplicas)) mb.RecordK8sStatefulsetUpdatedPodsDataPoint(ts, int64(ss.Status.UpdatedReplicas)) - rb := imetadata.NewResourceBuilder(metricsBuilderConfig.ResourceAttributes) + rb := mb.NewResourceBuilder() rb.SetK8sStatefulsetUID(string(ss.UID)) rb.SetK8sStatefulsetName(ss.Name) rb.SetK8sNamespaceName(ss.Namespace) rb.SetOpencensusResourcetype("k8s") - return mb.Emit(imetadata.WithResource(rb.Emit())) + mb.EmitForResource(imetadata.WithResource(rb.Emit())) } func GetMetadata(ss *appsv1.StatefulSet) map[experimentalmetricmetadata.ResourceID]*metadata.KubernetesMetadata { diff --git a/receiver/k8sclusterreceiver/internal/statefulset/statefulsets_test.go b/receiver/k8sclusterreceiver/internal/statefulset/statefulsets_test.go index 3fda62749978..eac806a3f851 100644 --- a/receiver/k8sclusterreceiver/internal/statefulset/statefulsets_test.go +++ b/receiver/k8sclusterreceiver/internal/statefulset/statefulsets_test.go @@ -5,27 +5,32 @@ package statefulset import ( "testing" + "time" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/receiver/receivertest" appsv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" v1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/types" 
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/metadata" + "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/testutils" ) func TestStatefulsetMetrics(t *testing.T) { - ss := newStatefulset("1") + ss := testutils.NewStatefulset("1") - actualResourceMetrics := GetMetrics(receivertest.NewNopCreateSettings(), metadata.DefaultMetricsBuilderConfig(), ss) + ts := pcommon.Timestamp(time.Now().UnixNano()) + mb := metadata.NewMetricsBuilder(metadata.DefaultMetricsBuilderConfig(), receivertest.NewNopCreateSettings()) + RecordMetrics(mb, ss, ts) + m := mb.Emit() - require.Equal(t, 1, actualResourceMetrics.ResourceMetrics().Len()) - require.Equal(t, 4, actualResourceMetrics.MetricCount()) + require.Equal(t, 1, m.ResourceMetrics().Len()) + require.Equal(t, 4, m.MetricCount()) - rm := actualResourceMetrics.ResourceMetrics().At(0) + rm := m.ResourceMetrics().At(0) assert.Equal(t, map[string]interface{}{ "k8s.statefulset.uid": "test-statefulset-1-uid", @@ -48,7 +53,7 @@ func TestStatefulsetMetrics(t *testing.T) { } func TestStatefulsetMetadata(t *testing.T) { - ss := newStatefulset("1") + ss := testutils.NewStatefulset("1") actualMetadata := GetMetadata(ss) @@ -73,31 +78,6 @@ func TestStatefulsetMetadata(t *testing.T) { ) } -func newStatefulset(id string) *appsv1.StatefulSet { - desired := int32(10) - return &appsv1.StatefulSet{ - ObjectMeta: v1.ObjectMeta{ - Name: "test-statefulset-" + id, - Namespace: "test-namespace", - UID: types.UID("test-statefulset-" + id + "-uid"), - Labels: map[string]string{ - "foo": "bar", - "foo1": "", - }, - }, - Spec: appsv1.StatefulSetSpec{ - Replicas: &desired, - }, - Status: appsv1.StatefulSetStatus{ - ReadyReplicas: 7, - CurrentReplicas: 5, - UpdatedReplicas: 3, - CurrentRevision: "current_revision", - UpdateRevision: "update_revision", - }, - } -} - func TestTransform(t *testing.T) { orig := &appsv1.StatefulSet{ ObjectMeta: v1.ObjectMeta{ diff --git a/receiver/k8sclusterreceiver/internal/testutils/objects.go b/receiver/k8sclusterreceiver/internal/testutils/objects.go index f8f67d8a0eda..8c5ef60f22e6 100644 --- a/receiver/k8sclusterreceiver/internal/testutils/objects.go +++ b/receiver/k8sclusterreceiver/internal/testutils/objects.go @@ -292,3 +292,105 @@ func WithOwnerReferences(or []v1.OwnerReference, obj interface{}) interface{} { } return obj } + +func NewNamespace(id string) *corev1.Namespace { + return &corev1.Namespace{ + ObjectMeta: v1.ObjectMeta{ + Name: "test-namespace-" + id, + UID: types.UID("test-namespace-" + id + "-uid"), + Labels: map[string]string{ + "foo": "bar", + "foo1": "", + }, + }, + Status: corev1.NamespaceStatus{ + Phase: corev1.NamespaceTerminating, + }, + } +} + +func NewReplicationController(id string) *corev1.ReplicationController { + return &corev1.ReplicationController{ + ObjectMeta: v1.ObjectMeta{ + Name: "test-replicationcontroller-" + id, + Namespace: "test-namespace", + UID: types.UID("test-replicationcontroller-" + id + "-uid"), + Labels: map[string]string{ + "app": "my-app", + "version": "v1", + }, + }, + Spec: corev1.ReplicationControllerSpec{ + Replicas: func() *int32 { i := int32(1); return &i }(), + }, + Status: corev1.ReplicationControllerStatus{AvailableReplicas: 2}, + } +} + +func NewResourceQuota(id string) *corev1.ResourceQuota { + return &corev1.ResourceQuota{ + ObjectMeta: v1.ObjectMeta{ + Name: "test-resourcequota-" + id, + UID: types.UID("test-resourcequota-" + id + "-uid"), + Namespace: "test-namespace", + Labels: 
map[string]string{ + "foo": "bar", + "foo1": "", + }, + }, + Status: corev1.ResourceQuotaStatus{ + Hard: corev1.ResourceList{ + "requests.cpu": *resource.NewQuantity(2, resource.DecimalSI), + }, + Used: corev1.ResourceList{ + "requests.cpu": *resource.NewQuantity(1, resource.DecimalSI), + }, + }, + } +} + +func NewStatefulset(id string) *appsv1.StatefulSet { + desired := int32(10) + return &appsv1.StatefulSet{ + ObjectMeta: v1.ObjectMeta{ + Name: "test-statefulset-" + id, + Namespace: "test-namespace", + UID: types.UID("test-statefulset-" + id + "-uid"), + Labels: map[string]string{ + "foo": "bar", + "foo1": "", + }, + }, + Spec: appsv1.StatefulSetSpec{ + Replicas: &desired, + }, + Status: appsv1.StatefulSetStatus{ + ReadyReplicas: 7, + CurrentReplicas: 5, + UpdatedReplicas: 3, + CurrentRevision: "current_revision", + UpdateRevision: "update_revision", + }, + } +} + +func NewCronJob(id string) *batchv1.CronJob { + return &batchv1.CronJob{ + ObjectMeta: v1.ObjectMeta{ + Name: "test-cronjob-" + id, + Namespace: "test-namespace", + UID: types.UID("test-cronjob-" + id + "-uid"), + Labels: map[string]string{ + "foo": "bar", + "foo1": "", + }, + }, + Spec: batchv1.CronJobSpec{ + Schedule: "schedule", + ConcurrencyPolicy: "concurrency_policy", + }, + Status: batchv1.CronJobStatus{ + Active: []corev1.ObjectReference{{}, {}}, + }, + } +} diff --git a/receiver/k8sclusterreceiver/receiver.go b/receiver/k8sclusterreceiver/receiver.go index ea8977b35683..6294402f3827 100644 --- a/receiver/k8sclusterreceiver/receiver.go +++ b/receiver/k8sclusterreceiver/receiver.go @@ -13,6 +13,7 @@ import ( "go.opentelemetry.io/collector/obsreport" "go.opentelemetry.io/collector/receiver" + "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/collection" "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/metadata" ) @@ -25,6 +26,7 @@ const ( var _ receiver.Metrics = (*kubernetesReceiver)(nil) type kubernetesReceiver struct { + dataCollector *collection.DataCollector resourceWatcher *resourceWatcher config *Config @@ -102,8 +104,7 @@ func (kr *kubernetesReceiver) dispatchMetrics(ctx context.Context) { return } - now := time.Now() - mds := kr.resourceWatcher.dataCollector.CollectMetricData(now) + mds := kr.dataCollector.CollectMetricData(time.Now()) c := kr.obsrecv.StartMetricsOp(ctx) @@ -163,8 +164,11 @@ func newReceiver(_ context.Context, set receiver.CreateSettings, cfg component.C if err != nil { return nil, err } + ms := metadata.NewStore() return &kubernetesReceiver{ - resourceWatcher: newResourceWatcher(set, rCfg), + dataCollector: collection.NewDataCollector(set, ms, rCfg.MetricsBuilderConfig, + rCfg.NodeConditionTypesToReport, rCfg.AllocatableTypesToReport), + resourceWatcher: newResourceWatcher(set, rCfg, ms), settings: set, config: rCfg, obsrecv: obsrecv, diff --git a/receiver/k8sclusterreceiver/watcher.go b/receiver/k8sclusterreceiver/watcher.go index 4528e065d096..3f9926738fdb 100644 --- a/receiver/k8sclusterreceiver/watcher.go +++ b/receiver/k8sclusterreceiver/watcher.go @@ -18,6 +18,12 @@ import ( "go.opentelemetry.io/collector/receiver" "go.uber.org/zap" "go.uber.org/zap/zapcore" + appsv1 "k8s.io/api/apps/v1" + autoscalingv2 "k8s.io/api/autoscaling/v2" + autoscalingv2beta2 "k8s.io/api/autoscaling/v2beta2" + batchv1 "k8s.io/api/batch/v1" + batchv1beta1 "k8s.io/api/batch/v1beta1" + corev1 "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/runtime/schema" 
"k8s.io/client-go/informers" @@ -26,9 +32,18 @@ import ( "github.com/open-telemetry/opentelemetry-collector-contrib/internal/k8sconfig" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/experimentalmetricmetadata" - "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/collection" + "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/cronjob" + "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/demonset" + "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/deployment" "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/gvk" + "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/hpa" + "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/jobs" "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/metadata" + "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/node" + "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/pod" + "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/replicaset" + "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/replicationcontroller" + "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/statefulset" "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/utils" ) @@ -41,7 +56,7 @@ type resourceWatcher struct { client kubernetes.Interface osQuotaClient quotaclientset.Interface informerFactories []sharedInformer - dataCollector *collection.DataCollector + metadataStore *metadata.Store logger *zap.Logger sampledLogger *zap.Logger metadataConsumers []metadataConsumer @@ -59,7 +74,7 @@ type resourceWatcher struct { type metadataConsumer func(metadata []*experimentalmetricmetadata.MetadataUpdate) error // newResourceWatcher creates a Kubernetes resource watcher. -func newResourceWatcher(set receiver.CreateSettings, cfg *Config) *resourceWatcher { +func newResourceWatcher(set receiver.CreateSettings, cfg *Config, metadataStore *metadata.Store) *resourceWatcher { // Create a sampled logger for error messages. 
core := zapcore.NewSamplerWithOptions( set.Logger.Core(), @@ -72,7 +87,7 @@ func newResourceWatcher(set receiver.CreateSettings, cfg *Config) *resourceWatch return &resourceWatcher{ logger: set.Logger, sampledLogger: sampledLogger, - dataCollector: collection.NewDataCollector(set, cfg.MetricsBuilderConfig, cfg.NodeConditionTypesToReport, cfg.AllocatableTypesToReport), + metadataStore: metadataStore, initialSyncDone: &atomic.Bool{}, initialSyncTimedOut: &atomic.Bool{}, initialTimeout: defaultInitialSyncTimeout, @@ -239,30 +254,22 @@ func (rw *resourceWatcher) setupInformer(gvk schema.GroupVersionKind, informer c _, err = informer.AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: rw.onAdd, UpdateFunc: rw.onUpdate, - DeleteFunc: rw.onDelete, }) if err != nil { rw.logger.Error("error adding event handler to informer", zap.Error(err)) } - rw.dataCollector.SetupMetadataStore(gvk, informer.GetStore()) + rw.metadataStore.Setup(gvk, informer.GetStore()) } func (rw *resourceWatcher) onAdd(obj interface{}) { rw.waitForInitialInformerSync() - rw.dataCollector.SyncMetrics(obj) // Sync metadata only if there's at least one destination for it to sent. if !rw.hasDestination() { return } - newMetadata := rw.dataCollector.SyncMetadata(obj) - rw.syncMetadataUpdate(map[experimentalmetricmetadata.ResourceID]*metadata.KubernetesMetadata{}, newMetadata) -} - -func (rw *resourceWatcher) onDelete(obj interface{}) { - rw.waitForInitialInformerSync() - rw.dataCollector.RemoveFromMetricsStore(obj) + rw.syncMetadataUpdate(map[experimentalmetricmetadata.ResourceID]*metadata.KubernetesMetadata{}, rw.objMetadata(obj)) } func (rw *resourceWatcher) hasDestination() bool { @@ -271,18 +278,44 @@ func (rw *resourceWatcher) hasDestination() bool { func (rw *resourceWatcher) onUpdate(oldObj, newObj interface{}) { rw.waitForInitialInformerSync() - // Sync metrics from the new object - rw.dataCollector.SyncMetrics(newObj) // Sync metadata only if there's at least one destination for it to sent. if !rw.hasDestination() { return } - oldMetadata := rw.dataCollector.SyncMetadata(oldObj) - newMetadata := rw.dataCollector.SyncMetadata(newObj) + rw.syncMetadataUpdate(rw.objMetadata(oldObj), rw.objMetadata(newObj)) +} - rw.syncMetadataUpdate(oldMetadata, newMetadata) +// objMetadata returns the metadata for the given object. 
+func (rw *resourceWatcher) objMetadata(obj interface{}) map[experimentalmetricmetadata.ResourceID]*metadata.KubernetesMetadata { + switch o := obj.(type) { + case *corev1.Pod: + return pod.GetMetadata(o, rw.metadataStore, rw.logger) + case *corev1.Node: + return node.GetMetadata(o) + case *corev1.ReplicationController: + return replicationcontroller.GetMetadata(o) + case *appsv1.Deployment: + return deployment.GetMetadata(o) + case *appsv1.ReplicaSet: + return replicaset.GetMetadata(o) + case *appsv1.DaemonSet: + return demonset.GetMetadata(o) + case *appsv1.StatefulSet: + return statefulset.GetMetadata(o) + case *batchv1.Job: + return jobs.GetMetadata(o) + case *batchv1.CronJob: + return cronjob.GetMetadata(o) + case *batchv1beta1.CronJob: + return cronjob.GetMetadataBeta(o) + case *autoscalingv2.HorizontalPodAutoscaler: + return hpa.GetMetadata(o) + case *autoscalingv2beta2.HorizontalPodAutoscaler: + return hpa.GetMetadataBeta(o) + } + return nil } func (rw *resourceWatcher) waitForInitialInformerSync() { diff --git a/receiver/k8sclusterreceiver/watcher_test.go b/receiver/k8sclusterreceiver/watcher_test.go index 6a70ffbcf028..f4bbbb72e9dd 100644 --- a/receiver/k8sclusterreceiver/watcher_test.go +++ b/receiver/k8sclusterreceiver/watcher_test.go @@ -13,17 +13,28 @@ import ( "go.opentelemetry.io/collector/consumer/consumertest" "go.opentelemetry.io/collector/receiver/receivertest" "go.uber.org/zap" + "go.uber.org/zap/zapcore" "go.uber.org/zap/zaptest/observer" + corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/informers" "k8s.io/client-go/kubernetes/fake" - "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/collection" + "github.com/open-telemetry/opentelemetry-collector-contrib/internal/common/maps" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/experimentalmetricmetadata" "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/gvk" "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/metadata" + "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/testutils" ) +var commonPodMetadata = map[string]string{ + "foo": "bar", + "foo1": "", + "pod.creation_timestamp": "0001-01-01T00:00:00Z", +} + func TestSetupMetadataExporters(t *testing.T) { type fields struct { metadataConsumers []metadataConsumer @@ -198,7 +209,7 @@ func TestPrepareSharedInformerFactory(t *testing.T) { rw := &resourceWatcher{ client: newFakeClientWithAllResources(), logger: obsLogger, - dataCollector: collection.NewDataCollector(receivertest.NewNopCreateSettings(), metadata.DefaultMetricsBuilderConfig(), []string{}, []string{}), + metadataStore: metadata.NewStore(), config: &Config{}, } @@ -236,7 +247,7 @@ func TestSyncMetadataAndEmitEntityEvents(t *testing.T) { origPod := pods[0] updatedPod := getUpdatedPod(origPod) - rw := newResourceWatcher(receivertest.NewNopCreateSettings(), &Config{}) + rw := newResourceWatcher(receivertest.NewNopCreateSettings(), &Config{}, metadata.NewStore()) rw.entityLogConsumer = logsConsumer step1 := time.Now() @@ -245,25 +256,25 @@ func TestSyncMetadataAndEmitEntityEvents(t *testing.T) { // as a log record. // Pod is created. - rw.syncMetadataUpdate(nil, rw.dataCollector.SyncMetadata(origPod)) + rw.syncMetadataUpdate(nil, rw.objMetadata(origPod)) step2 := time.Now() // Pod is updated. 
- rw.syncMetadataUpdate(rw.dataCollector.SyncMetadata(origPod), rw.dataCollector.SyncMetadata(updatedPod)) + rw.syncMetadataUpdate(rw.objMetadata(origPod), rw.objMetadata(updatedPod)) step3 := time.Now() // Pod is updated again, but nothing changed in the pod. // Should still result in entity event because they are emitted even // if the entity is not changed. - rw.syncMetadataUpdate(rw.dataCollector.SyncMetadata(updatedPod), rw.dataCollector.SyncMetadata(updatedPod)) + rw.syncMetadataUpdate(rw.objMetadata(updatedPod), rw.objMetadata(updatedPod)) step4 := time.Now() // Change pod's state back to original - rw.syncMetadataUpdate(rw.dataCollector.SyncMetadata(updatedPod), rw.dataCollector.SyncMetadata(origPod)) + rw.syncMetadataUpdate(rw.objMetadata(updatedPod), rw.objMetadata(origPod)) step5 := time.Now() // Delete the pod - rw.syncMetadataUpdate(rw.dataCollector.SyncMetadata(origPod), nil) + rw.syncMetadataUpdate(rw.objMetadata(origPod), nil) step6 := time.Now() // Must have 5 entity events. @@ -308,3 +319,266 @@ func TestSyncMetadataAndEmitEntityEvents(t *testing.T) { assert.EqualValues(t, expected, lr.Attributes().AsRaw()) assert.WithinRange(t, lr.Timestamp().AsTime(), step5, step6) } + +func TestObjMetadata(t *testing.T) { + tests := []struct { + name string + metadataStore *metadata.Store + resource interface{} + want map[experimentalmetricmetadata.ResourceID]*metadata.KubernetesMetadata + }{ + { + name: "Pod and container metadata simple case", + metadataStore: metadata.NewStore(), + resource: testutils.NewPodWithContainer( + "0", + testutils.NewPodSpecWithContainer("container-name"), + testutils.NewPodStatusWithContainer("container-name", "container-id"), + ), + want: map[experimentalmetricmetadata.ResourceID]*metadata.KubernetesMetadata{ + experimentalmetricmetadata.ResourceID("test-pod-0-uid"): { + EntityType: "k8s.pod", + ResourceIDKey: "k8s.pod.uid", + ResourceID: "test-pod-0-uid", + Metadata: commonPodMetadata, + }, + experimentalmetricmetadata.ResourceID("container-id"): { + EntityType: "container", + ResourceIDKey: "container.id", + ResourceID: "container-id", + Metadata: map[string]string{ + "container.status": "running", + }, + }, + }, + }, + { + name: "Pod with Owner Reference", + metadataStore: metadata.NewStore(), + resource: testutils.WithOwnerReferences([]metav1.OwnerReference{ + { + Kind: "StatefulSet", + Name: "test-statefulset-0", + UID: "test-statefulset-0-uid", + }, + }, testutils.NewPodWithContainer("0", &corev1.PodSpec{}, &corev1.PodStatus{})), + want: map[experimentalmetricmetadata.ResourceID]*metadata.KubernetesMetadata{ + experimentalmetricmetadata.ResourceID("test-pod-0-uid"): { + EntityType: "k8s.pod", + ResourceIDKey: "k8s.pod.uid", + ResourceID: "test-pod-0-uid", + Metadata: allPodMetadata(map[string]string{ + "k8s.workload.kind": "StatefulSet", + "k8s.workload.name": "test-statefulset-0", + "k8s.statefulset.name": "test-statefulset-0", + "k8s.statefulset.uid": "test-statefulset-0-uid", + }), + }, + }, + }, + { + name: "Pod with Service metadata", + metadataStore: func() *metadata.Store { + ms := metadata.NewStore() + ms.Setup(gvk.Service, &testutils.MockStore{ + Cache: map[string]interface{}{ + "test-namespace/test-service": &corev1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-service", + Namespace: "test-namespace", + UID: "test-service-uid", + }, + Spec: corev1.ServiceSpec{ + Selector: map[string]string{ + "k8s-app": "my-app", + }, + }, + }, + }, + }) + return ms + }(), + resource: podWithAdditionalLabels( + map[string]string{"k8s-app": 
"my-app"}, + testutils.NewPodWithContainer("0", &corev1.PodSpec{}, &corev1.PodStatus{}), + ), + want: map[experimentalmetricmetadata.ResourceID]*metadata.KubernetesMetadata{ + experimentalmetricmetadata.ResourceID("test-pod-0-uid"): { + EntityType: "k8s.pod", + ResourceIDKey: "k8s.pod.uid", + ResourceID: "test-pod-0-uid", + Metadata: allPodMetadata(map[string]string{ + "k8s.service.test-service": "", + "k8s-app": "my-app", + }), + }, + }, + }, + { + name: "Daemonset simple case", + metadataStore: &metadata.Store{}, + resource: testutils.NewDaemonset("1"), + want: map[experimentalmetricmetadata.ResourceID]*metadata.KubernetesMetadata{ + experimentalmetricmetadata.ResourceID("test-daemonset-1-uid"): { + EntityType: "k8s.daemonset", + ResourceIDKey: "k8s.daemonset.uid", + ResourceID: "test-daemonset-1-uid", + Metadata: map[string]string{ + "k8s.workload.kind": "DaemonSet", + "k8s.workload.name": "test-daemonset-1", + "daemonset.creation_timestamp": "0001-01-01T00:00:00Z", + }, + }, + }, + }, + { + name: "Deployment simple case", + metadataStore: &metadata.Store{}, + resource: testutils.NewDeployment("1"), + want: map[experimentalmetricmetadata.ResourceID]*metadata.KubernetesMetadata{ + experimentalmetricmetadata.ResourceID("test-deployment-1-uid"): { + EntityType: "k8s.deployment", + ResourceIDKey: "k8s.deployment.uid", + ResourceID: "test-deployment-1-uid", + Metadata: map[string]string{ + "k8s.workload.kind": "Deployment", + "k8s.workload.name": "test-deployment-1", + "k8s.deployment.name": "test-deployment-1", + "deployment.creation_timestamp": "0001-01-01T00:00:00Z", + }, + }, + }, + }, + { + name: "HPA simple case", + metadataStore: &metadata.Store{}, + resource: testutils.NewHPA("1"), + want: map[experimentalmetricmetadata.ResourceID]*metadata.KubernetesMetadata{ + experimentalmetricmetadata.ResourceID("test-hpa-1-uid"): { + EntityType: "k8s.hpa", + ResourceIDKey: "k8s.hpa.uid", + ResourceID: "test-hpa-1-uid", + Metadata: map[string]string{ + "k8s.workload.kind": "HPA", + "k8s.workload.name": "test-hpa-1", + "hpa.creation_timestamp": "0001-01-01T00:00:00Z", + }, + }, + }, + }, + { + name: "Job simple case", + metadataStore: &metadata.Store{}, + resource: testutils.NewJob("1"), + want: map[experimentalmetricmetadata.ResourceID]*metadata.KubernetesMetadata{ + experimentalmetricmetadata.ResourceID("test-job-1-uid"): { + EntityType: "k8s.job", + ResourceIDKey: "k8s.job.uid", + ResourceID: "test-job-1-uid", + Metadata: map[string]string{ + "foo": "bar", + "foo1": "", + "k8s.workload.kind": "Job", + "k8s.workload.name": "test-job-1", + "job.creation_timestamp": "0001-01-01T00:00:00Z", + }, + }, + }, + }, + { + name: "Node simple case", + metadataStore: &metadata.Store{}, + resource: testutils.NewNode("1"), + want: map[experimentalmetricmetadata.ResourceID]*metadata.KubernetesMetadata{ + experimentalmetricmetadata.ResourceID("test-node-1-uid"): { + EntityType: "k8s.node", + ResourceIDKey: "k8s.node.uid", + ResourceID: "test-node-1-uid", + Metadata: map[string]string{ + "foo": "bar", + "foo1": "", + "k8s.node.name": "test-node-1", + "node.creation_timestamp": "0001-01-01T00:00:00Z", + }, + }, + }, + }, + { + name: "ReplicaSet simple case", + metadataStore: &metadata.Store{}, + resource: testutils.NewReplicaSet("1"), + want: map[experimentalmetricmetadata.ResourceID]*metadata.KubernetesMetadata{ + experimentalmetricmetadata.ResourceID("test-replicaset-1-uid"): { + EntityType: "k8s.replicaset", + ResourceIDKey: "k8s.replicaset.uid", + ResourceID: "test-replicaset-1-uid", + Metadata: 
map[string]string{ + "foo": "bar", + "foo1": "", + "k8s.workload.kind": "ReplicaSet", + "k8s.workload.name": "test-replicaset-1", + "replicaset.creation_timestamp": "0001-01-01T00:00:00Z", + }, + }, + }, + }, + { + name: "ReplicationController simple case", + metadataStore: &metadata.Store{}, + resource: &corev1.ReplicationController{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-replicationcontroller-1", + Namespace: "test-namespace", + UID: types.UID("test-replicationcontroller-1-uid"), + }, + }, + want: map[experimentalmetricmetadata.ResourceID]*metadata.KubernetesMetadata{ + experimentalmetricmetadata.ResourceID("test-replicationcontroller-1-uid"): { + EntityType: "k8s.replicationcontroller", + ResourceIDKey: "k8s.replicationcontroller.uid", + ResourceID: "test-replicationcontroller-1-uid", + Metadata: map[string]string{ + "k8s.workload.kind": "ReplicationController", + "k8s.workload.name": "test-replicationcontroller-1", + "replicationcontroller.creation_timestamp": "0001-01-01T00:00:00Z", + }, + }, + }, + }, + } + + for _, tt := range tests { + observedLogger, _ := observer.New(zapcore.WarnLevel) + set := receivertest.NewNopCreateSettings() + set.TelemetrySettings.Logger = zap.New(observedLogger) + t.Run(tt.name, func(t *testing.T) { + dc := &resourceWatcher{metadataStore: tt.metadataStore} + + actual := dc.objMetadata(tt.resource) + require.Equal(t, len(tt.want), len(actual)) + + for key, item := range tt.want { + got, exists := actual[key] + require.True(t, exists) + require.Equal(t, *item, *got) + } + }) + } +} + +var allPodMetadata = func(metadata map[string]string) map[string]string { + out := maps.MergeStringMaps(metadata, commonPodMetadata) + return out +} + +func podWithAdditionalLabels(labels map[string]string, pod *corev1.Pod) interface{} { + if pod.Labels == nil { + pod.Labels = make(map[string]string, len(labels)) + } + + for k, v := range labels { + pod.Labels[k] = v + } + + return pod +}