[receiver/k8scluster] Do not keep metrics in memory
Construct metrics on every scrape instead. The Kubernetes objects are kept in the informer cache anyway, so we can build metrics from those cached objects rather than storing pre-built metrics alongside them. This reduces RAM utilization.

This also lets us extract a single metrics builder instance instead of creating one on every scrape. This is the recommended approach that all other receivers follow, and it ensures that any warnings defined in metadata.yaml are displayed only once rather than on every scrape interval.
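For illustration only, here is a minimal, runnable Go sketch of the pattern this commit adopts. The `pod` and `metricsBuilder` types are simplified stand-ins, not the receiver's real informer cache or `metadata.MetricsBuilder` API: the builder is created once up front, and every scrape re-records data points from the cached objects instead of storing pre-built per-object metrics.

```go
package main

import (
	"fmt"
	"time"
)

// pod is a simplified stand-in for a Kubernetes object held in the informer cache.
type pod struct {
	name  string
	phase int64
}

// metricsBuilder is a simplified stand-in for metadata.MetricsBuilder:
// created once at receiver start and reused on every scrape.
type metricsBuilder struct {
	points []string
}

func (mb *metricsBuilder) recordPodPhase(ts time.Time, name string, phase int64) {
	mb.points = append(mb.points,
		fmt.Sprintf("%s k8s.pod.phase{pod=%q} %d", ts.Format(time.RFC3339), name, phase))
}

// emit hands off the accumulated batch and resets the builder for the next scrape.
func (mb *metricsBuilder) emit() []string {
	out := mb.points
	mb.points = nil
	return out
}

func main() {
	// The informer cache already holds the objects; no pre-built
	// per-object metrics are kept alongside it.
	cache := []pod{{name: "pod-a", phase: 2}, {name: "pod-b", phase: 1}}
	mb := &metricsBuilder{} // one builder for the lifetime of the receiver

	for i := 0; i < 2; i++ { // two scrape intervals
		ts := time.Now()
		for _, p := range cache { // construct metrics on every scrape
			mb.recordPodPhase(ts, p.name, p.phase)
		}
		fmt.Println(mb.emit())
	}
}
```

Each scrape's batch is built on demand and handed off immediately, so the only long-lived state is the object cache that the informers maintain anyway.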
dmitryax committed Aug 5, 2023
1 parent b149984 commit fe8c6f3
Showing 37 changed files with 842 additions and 966 deletions.
17 changes: 17 additions & 0 deletions .chloggen/k8sclusterreceiver-improve-memory-utilization.yaml
@@ -0,0 +1,17 @@
+# Use this changelog template to create an entry for release notes.
+# If your change doesn't affect end users, such as a test fix or a tooling change,
+# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
+
+# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
+change_type: enhancement
+
+# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
+component: k8sclusterreceiver
+
+# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
+note: Reduce memory utilization
+
+# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
+issues: [24769]
+
+change_logs: [user]
@@ -5,49 +5,43 @@ package clusterresourcequota // import "github.com/open-telemetry/opentelemetry-
 
 import (
 	"strings"
-	"time"
 
 	quotav1 "github.com/openshift/api/quota/v1"
 	"go.opentelemetry.io/collector/pdata/pcommon"
-	"go.opentelemetry.io/collector/pdata/pmetric"
-	"go.opentelemetry.io/collector/receiver"
 	v1 "k8s.io/api/core/v1"
 	"k8s.io/apimachinery/pkg/api/resource"
 
-	imetadataphase "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/metadata"
+	"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/metadata"
 )
 
-func GetMetrics(set receiver.CreateSettings, metricsBuilderConfig imetadataphase.MetricsBuilderConfig, crq *quotav1.ClusterResourceQuota) pmetric.Metrics {
-	mbphase := imetadataphase.NewMetricsBuilder(metricsBuilderConfig, set)
-	ts := pcommon.NewTimestampFromTime(time.Now())
-
+func RecordMetrics(mb *metadata.MetricsBuilder, crq *quotav1.ClusterResourceQuota, ts pcommon.Timestamp) {
 	for k, v := range crq.Status.Total.Hard {
 		val := extractValue(k, v)
-		mbphase.RecordOpenshiftClusterquotaLimitDataPoint(ts, val, string(k))
+		mb.RecordOpenshiftClusterquotaLimitDataPoint(ts, val, string(k))
 	}
 
 	for k, v := range crq.Status.Total.Used {
 		val := extractValue(k, v)
-		mbphase.RecordOpenshiftClusterquotaUsedDataPoint(ts, val, string(k))
+		mb.RecordOpenshiftClusterquotaUsedDataPoint(ts, val, string(k))
 	}
 
 	for _, ns := range crq.Status.Namespaces {
 		for k, v := range ns.Status.Hard {
 			val := extractValue(k, v)
-			mbphase.RecordOpenshiftAppliedclusterquotaLimitDataPoint(ts, val, ns.Namespace, string(k))
+			mb.RecordOpenshiftAppliedclusterquotaLimitDataPoint(ts, val, ns.Namespace, string(k))
 		}
 
 		for k, v := range ns.Status.Used {
 			val := extractValue(k, v)
-			mbphase.RecordOpenshiftAppliedclusterquotaUsedDataPoint(ts, val, ns.Namespace, string(k))
+			mb.RecordOpenshiftAppliedclusterquotaUsedDataPoint(ts, val, ns.Namespace, string(k))
 		}
 	}
 
-	rb := imetadataphase.NewResourceBuilder(metricsBuilderConfig.ResourceAttributes)
+	rb := mb.NewResourceBuilder()
 	rb.SetOpenshiftClusterquotaName(crq.Name)
 	rb.SetOpenshiftClusterquotaUID(string(crq.UID))
 	rb.SetOpencensusResourcetype("k8s")
-	return mbphase.Emit(imetadataphase.WithResource(rb.Emit()))
+	mb.EmitForResource(metadata.WithResource(rb.Emit()))
 }
 
 func extractValue(k v1.ResourceName, v resource.Quantity) int64 {
@@ -6,8 +6,10 @@ package clusterresourcequota
 import (
 	"path/filepath"
 	"testing"
+	"time"
 
 	"github.com/stretchr/testify/require"
+	"go.opentelemetry.io/collector/pdata/pcommon"
 	"go.opentelemetry.io/collector/receiver/receivertest"
 
 	"github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal/golden"
@@ -19,7 +21,10 @@ import (
 func TestClusterRequestQuotaMetrics(t *testing.T) {
 	crq := testutils.NewClusterResourceQuota("1")
 
-	m := GetMetrics(receivertest.NewNopCreateSettings(), metadata.DefaultMetricsBuilderConfig(), crq)
+	ts := pcommon.Timestamp(time.Now().UnixNano())
+	mb := metadata.NewMetricsBuilder(metadata.DefaultMetricsBuilderConfig(), receivertest.NewNopCreateSettings())
+	RecordMetrics(mb, crq, ts)
+	m := mb.Emit()
 
 	expected, err := golden.ReadMetrics(filepath.Join("testdata", "expected.yaml"))
 	require.NoError(t, err)
190 changes: 65 additions & 125 deletions receiver/k8sclusterreceiver/internal/collection/collector.go
@@ -4,29 +4,24 @@
 package collection // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/collection"
 
 import (
-	"reflect"
 	"time"
 
 	quotav1 "github.com/openshift/api/quota/v1"
+	"go.opentelemetry.io/collector/pdata/pcommon"
 	"go.opentelemetry.io/collector/pdata/pmetric"
 	"go.opentelemetry.io/collector/receiver"
-	"go.uber.org/zap"
 	appsv1 "k8s.io/api/apps/v1"
 	autoscalingv2 "k8s.io/api/autoscaling/v2"
 	autoscalingv2beta2 "k8s.io/api/autoscaling/v2beta2"
 	batchv1 "k8s.io/api/batch/v1"
 	batchv1beta1 "k8s.io/api/batch/v1beta1"
 	corev1 "k8s.io/api/core/v1"
-	"k8s.io/apimachinery/pkg/runtime"
-	"k8s.io/apimachinery/pkg/runtime/schema"
-	"k8s.io/apimachinery/pkg/types"
-	"k8s.io/client-go/tools/cache"
 
-	"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/experimentalmetricmetadata"
 	"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/clusterresourcequota"
 	"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/cronjob"
 	"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/demonset"
 	"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/deployment"
+	"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/gvk"
 	"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/hpa"
 	"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/jobs"
 	"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/metadata"
@@ -42,137 +37,82 @@ import (
 
 // TODO: Consider moving some of these constants to
 // https://go.opentelemetry.io/collector/blob/main/model/semconv/opentelemetry.go.
 
-// DataCollector wraps around a metricsStore and a metadaStore exposing
-// methods to perform on the underlying stores. DataCollector also provides
-// an interface to interact with refactored code from SignalFx Agent which is
-// confined to the collection package.
+// DataCollector emits metrics with CollectMetricData based on the Kubernetes API objects in the metadata store.
 type DataCollector struct {
 	settings                 receiver.CreateSettings
-	metricsStore             *metricsStore
 	metadataStore            *metadata.Store
 	nodeConditionsToReport   []string
 	allocatableTypesToReport []string
-	metricsBuilderConfig     metadata.MetricsBuilderConfig
+	metricsBuilder           *metadata.MetricsBuilder
 }
 
 // NewDataCollector returns a DataCollector.
-func NewDataCollector(set receiver.CreateSettings, metricsBuilderConfig metadata.MetricsBuilderConfig, nodeConditionsToReport, allocatableTypesToReport []string) *DataCollector {
+func NewDataCollector(set receiver.CreateSettings, ms *metadata.Store,
+	metricsBuilderConfig metadata.MetricsBuilderConfig, nodeConditionsToReport, allocatableTypesToReport []string) *DataCollector {
 	return &DataCollector{
-		settings: set,
-		metricsStore: &metricsStore{
-			metricsCache: make(map[types.UID]pmetric.Metrics),
-		},
-		metadataStore: &metadata.Store{},
+		settings:                 set,
+		metadataStore:            ms,
 		nodeConditionsToReport:   nodeConditionsToReport,
 		allocatableTypesToReport: allocatableTypesToReport,
-		metricsBuilderConfig:     metricsBuilderConfig,
+		metricsBuilder:           metadata.NewMetricsBuilder(metricsBuilderConfig, set),
 	}
 }
 
-// SetupMetadataStore initializes a metadata store for the kubernetes kind.
-func (dc *DataCollector) SetupMetadataStore(gvk schema.GroupVersionKind, store cache.Store) {
-	dc.metadataStore.Setup(gvk, store)
-}
-
-func (dc *DataCollector) RemoveFromMetricsStore(obj interface{}) {
-	if err := dc.metricsStore.remove(obj.(runtime.Object)); err != nil {
-		dc.settings.TelemetrySettings.Logger.Error(
-			"failed to remove from metric cache",
-			zap.String("obj", reflect.TypeOf(obj).String()),
-			zap.Error(err),
-		)
-	}
-}
-
-func (dc *DataCollector) UpdateMetricsStore(obj interface{}, md pmetric.Metrics) {
-	if err := dc.metricsStore.update(obj.(runtime.Object), md); err != nil {
-		dc.settings.TelemetrySettings.Logger.Error(
-			"failed to update metric cache",
-			zap.String("obj", reflect.TypeOf(obj).String()),
-			zap.Error(err),
-		)
-	}
-}
-
 func (dc *DataCollector) CollectMetricData(currentTime time.Time) pmetric.Metrics {
-	return dc.metricsStore.getMetricData(currentTime)
-}
-
-// SyncMetrics updates the metric store with latest metrics from the kubernetes object.
-func (dc *DataCollector) SyncMetrics(obj interface{}) {
-	var md pmetric.Metrics
-
-	switch o := obj.(type) {
-	case *corev1.Pod:
-		md = pod.GetMetrics(dc.settings, dc.metricsBuilderConfig, o)
-	case *corev1.Node:
-		md = node.GetMetrics(dc.settings, dc.metricsBuilderConfig, o, dc.nodeConditionsToReport, dc.allocatableTypesToReport)
-	case *corev1.Namespace:
-		md = namespace.GetMetrics(dc.settings, dc.metricsBuilderConfig, o)
-	case *corev1.ReplicationController:
-		md = replicationcontroller.GetMetrics(dc.settings, dc.metricsBuilderConfig, o)
-	case *corev1.ResourceQuota:
-		md = resourcequota.GetMetrics(dc.settings, dc.metricsBuilderConfig, o)
-	case *appsv1.Deployment:
-		md = deployment.GetMetrics(dc.settings, dc.metricsBuilderConfig, o)
-	case *appsv1.ReplicaSet:
-		md = replicaset.GetMetrics(dc.settings, dc.metricsBuilderConfig, o)
-	case *appsv1.DaemonSet:
-		md = demonset.GetMetrics(dc.settings, dc.metricsBuilderConfig, o)
-	case *appsv1.StatefulSet:
-		md = statefulset.GetMetrics(dc.settings, dc.metricsBuilderConfig, o)
-	case *batchv1.Job:
-		md = jobs.GetMetrics(dc.settings, dc.metricsBuilderConfig, o)
-	case *batchv1.CronJob:
-		md = cronjob.GetMetrics(dc.settings, dc.metricsBuilderConfig, o)
-	case *batchv1beta1.CronJob:
-		md = cronjob.GetMetricsBeta(dc.settings, dc.metricsBuilderConfig, o)
-	case *autoscalingv2.HorizontalPodAutoscaler:
-		md = hpa.GetMetrics(dc.settings, dc.metricsBuilderConfig, o)
-	case *autoscalingv2beta2.HorizontalPodAutoscaler:
-		md = hpa.GetMetricsBeta(dc.settings, dc.metricsBuilderConfig, o)
-	case *quotav1.ClusterResourceQuota:
-		md = clusterresourcequota.GetMetrics(dc.settings, dc.metricsBuilderConfig, o)
-	default:
-		return
-	}
-
-	if md.DataPointCount() == 0 {
-		return
-	}
-
-	dc.UpdateMetricsStore(obj, md)
-}
-
-// SyncMetadata updates the metric store with latest metrics from the kubernetes object
-func (dc *DataCollector) SyncMetadata(obj interface{}) map[experimentalmetricmetadata.ResourceID]*metadata.KubernetesMetadata {
-	km := map[experimentalmetricmetadata.ResourceID]*metadata.KubernetesMetadata{}
-	switch o := obj.(type) {
-	case *corev1.Pod:
-		km = pod.GetMetadata(o, dc.metadataStore, dc.settings.TelemetrySettings.Logger)
-	case *corev1.Node:
-		km = node.GetMetadata(o)
-	case *corev1.ReplicationController:
-		km = replicationcontroller.GetMetadata(o)
-	case *appsv1.Deployment:
-		km = deployment.GetMetadata(o)
-	case *appsv1.ReplicaSet:
-		km = replicaset.GetMetadata(o)
-	case *appsv1.DaemonSet:
-		km = demonset.GetMetadata(o)
-	case *appsv1.StatefulSet:
-		km = statefulset.GetMetadata(o)
-	case *batchv1.Job:
-		km = jobs.GetMetadata(o)
-	case *batchv1.CronJob:
-		km = cronjob.GetMetadata(o)
-	case *batchv1beta1.CronJob:
-		km = cronjob.GetMetadataBeta(o)
-	case *autoscalingv2.HorizontalPodAutoscaler:
-		km = hpa.GetMetadata(o)
-	case *autoscalingv2beta2.HorizontalPodAutoscaler:
-		km = hpa.GetMetadataBeta(o)
-	}
-
-	return km
+	ts := pcommon.NewTimestampFromTime(currentTime)
+	customRMs := pmetric.NewResourceMetricsSlice()
+
+	dc.metadataStore.ForEach(gvk.Pod, func(o any) {
+		pod.RecordMetrics(dc.settings.Logger, dc.metricsBuilder, o.(*corev1.Pod), ts)
+	})
+	dc.metadataStore.ForEach(gvk.Node, func(o any) {
+		crm := node.CustomMetrics(dc.settings, dc.metricsBuilder.NewResourceBuilder(), o.(*corev1.Node),
+			dc.nodeConditionsToReport, dc.allocatableTypesToReport, ts)
+		if crm.ScopeMetrics().Len() > 0 {
+			crm.MoveTo(customRMs.AppendEmpty())
+		}
+	})
+	dc.metadataStore.ForEach(gvk.Namespace, func(o any) {
+		namespace.RecordMetrics(dc.metricsBuilder, o.(*corev1.Namespace), ts)
+	})
+	dc.metadataStore.ForEach(gvk.ReplicationController, func(o any) {
+		replicationcontroller.RecordMetrics(dc.metricsBuilder, o.(*corev1.ReplicationController), ts)
+	})
+	dc.metadataStore.ForEach(gvk.ResourceQuota, func(o any) {
+		resourcequota.RecordMetrics(dc.metricsBuilder, o.(*corev1.ResourceQuota), ts)
+	})
+	dc.metadataStore.ForEach(gvk.Deployment, func(o any) {
+		deployment.RecordMetrics(dc.metricsBuilder, o.(*appsv1.Deployment), ts)
+	})
+	dc.metadataStore.ForEach(gvk.ReplicaSet, func(o any) {
+		replicaset.RecordMetrics(dc.metricsBuilder, o.(*appsv1.ReplicaSet), ts)
+	})
+	dc.metadataStore.ForEach(gvk.DaemonSet, func(o any) {
+		demonset.RecordMetrics(dc.metricsBuilder, o.(*appsv1.DaemonSet), ts)
+	})
+	dc.metadataStore.ForEach(gvk.StatefulSet, func(o any) {
+		statefulset.RecordMetrics(dc.metricsBuilder, o.(*appsv1.StatefulSet), ts)
+	})
+	dc.metadataStore.ForEach(gvk.Job, func(o any) {
+		jobs.RecordMetrics(dc.metricsBuilder, o.(*batchv1.Job), ts)
+	})
+	dc.metadataStore.ForEach(gvk.CronJob, func(o any) {
+		cronjob.RecordMetrics(dc.metricsBuilder, o.(*batchv1.CronJob), ts)
+	})
+	dc.metadataStore.ForEach(gvk.CronJobBeta, func(o any) {
+		cronjob.RecordMetricsBeta(dc.metricsBuilder, o.(*batchv1beta1.CronJob), ts)
+	})
+	dc.metadataStore.ForEach(gvk.HorizontalPodAutoscaler, func(o any) {
+		hpa.RecordMetrics(dc.metricsBuilder, o.(*autoscalingv2.HorizontalPodAutoscaler), ts)
+	})
+	dc.metadataStore.ForEach(gvk.HorizontalPodAutoscalerBeta, func(o any) {
+		hpa.RecordMetricsBeta(dc.metricsBuilder, o.(*autoscalingv2beta2.HorizontalPodAutoscaler), ts)
+	})
+	dc.metadataStore.ForEach(gvk.ClusterResourceQuota, func(o any) {
+		clusterresourcequota.RecordMetrics(dc.metricsBuilder, o.(*quotav1.ClusterResourceQuota), ts)
+	})
+
+	m := dc.metricsBuilder.Emit()
+	customRMs.MoveAndAppendTo(m.ResourceMetrics())
+	return m
 }
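For readers unfamiliar with the iteration style in the new CollectMetricData above, the following self-contained sketch shows the ForEach pattern in isolation. The `kind` and `store` types here are hypothetical stand-ins for the receiver's gvk constants and metadata.Store, not the actual implementation.

```go
package main

import "fmt"

// kind is a hypothetical stand-in for schema.GroupVersionKind.
type kind string

// store mimics the shape of the receiver's metadata.Store: one object
// list per Kubernetes kind, fed by the informer caches.
type store struct {
	objects map[kind][]any
}

// ForEach visits every cached object of the given kind; kinds that were
// never set up are simply skipped.
func (s *store) ForEach(k kind, f func(o any)) {
	for _, o := range s.objects[k] {
		f(o)
	}
}

type pod struct{ name string }
type node struct{ name string }

func main() {
	s := &store{objects: map[kind][]any{
		"Pod":  {pod{name: "pod-a"}, pod{name: "pod-b"}},
		"Node": {node{name: "node-1"}},
	}}

	// Each scrape walks the caches kind by kind, recording data points as
	// it goes; nothing metric-shaped is retained between scrapes.
	s.ForEach("Pod", func(o any) { fmt.Println("record pod:", o.(pod).name) })
	s.ForEach("Node", func(o any) { fmt.Println("record node:", o.(node).name) })
	s.ForEach("ClusterResourceQuota", func(o any) {}) // never set up: no-op
}
```

A kind that was never registered (for example, the OpenShift-only ClusterResourceQuota on a vanilla cluster) yields no objects, so the corresponding callback is never invoked.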