[receiver/k8scluster] Do not keep metrics in memory (#24769)
Construct metrics on every scrape instead of keeping them in memory and
copying them, with modifications, on every scrape. We keep the k8s
objects in cache anyway, so we can build the metrics from those objects
instead of storing pre-built ones. This reduces RAM utilization.

This also allows us to extract a single metrics builder instance instead
of creating one on every scrape, which is the recommended approach
followed by all other receivers. It ensures that any warnings defined in
metadata.yaml are displayed only once, not on every scrape interval.
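In outline: the receiver used to keep one pre-built pmetric.Metrics per Kubernetes object UID and copy it on every scrape; now a single generated MetricsBuilder is created at receiver start and filled from the cached objects on each scrape. A minimal sketch of the two patterns in Go (the old-pattern helpers buildMetrics and copyWithFreshTimestamps are illustrative names, not the exact code; the new-pattern calls appear in the diffs below):

    // Old pattern: pre-built metrics cached per object UID.
    //   on informer event:  metricsCache[uid] = buildMetrics(obj)
    //   on scrape:          md := copyWithFreshTimestamps(metricsCache, now)

    // New pattern: one builder, metrics rebuilt from the cached k8s objects.
    mb := metadata.NewMetricsBuilder(metricsBuilderConfig, set) // created once

    // On every scrape:
    ts := pcommon.NewTimestampFromTime(time.Now())
    metadataStore.ForEach(gvk.Pod, func(o any) {
        pod.RecordMetrics(logger, mb, o.(*corev1.Pod), ts)
    })
    md := mb.Emit() // returns the accumulated metrics and resets the builder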
dmitryax authored Aug 5, 2023
1 parent b149984 commit 0fa58d2
Showing 37 changed files with 842 additions and 966 deletions.
17 changes: 17 additions & 0 deletions .chloggen/k8sclusterreceiver-improve-memory-utilization.yaml
@@ -0,0 +1,17 @@
+# Use this changelog template to create an entry for release notes.
+# If your change doesn't affect end users, such as a test fix or a tooling change,
+# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
+
+# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
+change_type: enhancement
+
+# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
+component: k8sclusterreceiver
+
+# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
+note: Reduce memory utilization
+
+# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
+issues: [24769]
+
+change_logs: [user]
@@ -5,49 +5,43 @@ package clusterresourcequota // import "github.com/open-telemetry/opentelemetry-

 import (
     "strings"
-    "time"

     quotav1 "github.com/openshift/api/quota/v1"
     "go.opentelemetry.io/collector/pdata/pcommon"
-    "go.opentelemetry.io/collector/pdata/pmetric"
-    "go.opentelemetry.io/collector/receiver"
     v1 "k8s.io/api/core/v1"
     "k8s.io/apimachinery/pkg/api/resource"

-    imetadataphase "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/metadata"
+    "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/metadata"
 )

-func GetMetrics(set receiver.CreateSettings, metricsBuilderConfig imetadataphase.MetricsBuilderConfig, crq *quotav1.ClusterResourceQuota) pmetric.Metrics {
-    mbphase := imetadataphase.NewMetricsBuilder(metricsBuilderConfig, set)
-    ts := pcommon.NewTimestampFromTime(time.Now())
-
+func RecordMetrics(mb *metadata.MetricsBuilder, crq *quotav1.ClusterResourceQuota, ts pcommon.Timestamp) {
     for k, v := range crq.Status.Total.Hard {
         val := extractValue(k, v)
-        mbphase.RecordOpenshiftClusterquotaLimitDataPoint(ts, val, string(k))
+        mb.RecordOpenshiftClusterquotaLimitDataPoint(ts, val, string(k))
     }

     for k, v := range crq.Status.Total.Used {
         val := extractValue(k, v)
-        mbphase.RecordOpenshiftClusterquotaUsedDataPoint(ts, val, string(k))
+        mb.RecordOpenshiftClusterquotaUsedDataPoint(ts, val, string(k))
     }

     for _, ns := range crq.Status.Namespaces {
         for k, v := range ns.Status.Hard {
             val := extractValue(k, v)
-            mbphase.RecordOpenshiftAppliedclusterquotaLimitDataPoint(ts, val, ns.Namespace, string(k))
+            mb.RecordOpenshiftAppliedclusterquotaLimitDataPoint(ts, val, ns.Namespace, string(k))
         }

         for k, v := range ns.Status.Used {
             val := extractValue(k, v)
-            mbphase.RecordOpenshiftAppliedclusterquotaUsedDataPoint(ts, val, ns.Namespace, string(k))
+            mb.RecordOpenshiftAppliedclusterquotaUsedDataPoint(ts, val, ns.Namespace, string(k))
         }
     }

-    rb := imetadataphase.NewResourceBuilder(metricsBuilderConfig.ResourceAttributes)
+    rb := mb.NewResourceBuilder()
     rb.SetOpenshiftClusterquotaName(crq.Name)
     rb.SetOpenshiftClusterquotaUID(string(crq.UID))
     rb.SetOpencensusResourcetype("k8s")
-    return mbphase.Emit(imetadataphase.WithResource(rb.Emit()))
+    mb.EmitForResource(metadata.WithResource(rb.Emit()))
 }

 func extractValue(k v1.ResourceName, v resource.Quantity) int64 {
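Note the resource handling in RecordMetrics above: data points are recorded into the shared builder, and EmitForResource is then called with resource attributes built from the quota object, so each ClusterResourceQuota still ends up in its own ResourceMetrics entry of the final payload even though the builder is shared. Schematically (quotas, mb, and ts stand in for the caller's state; the test diff below exercises the same flow for a single object):

    // One shared builder, many objects per scrape (schematic):
    for _, crq := range quotas {
        clusterresourcequota.RecordMetrics(mb, crq, ts)
    }
    md := mb.Emit() // one payload with a ResourceMetrics entry per quota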
@@ -6,8 +6,10 @@ package clusterresourcequota
 import (
     "path/filepath"
     "testing"
+    "time"

     "github.com/stretchr/testify/require"
+    "go.opentelemetry.io/collector/pdata/pcommon"
     "go.opentelemetry.io/collector/receiver/receivertest"

     "github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal/golden"
@@ -19,7 +21,10 @@ import (
 func TestClusterRequestQuotaMetrics(t *testing.T) {
     crq := testutils.NewClusterResourceQuota("1")

-    m := GetMetrics(receivertest.NewNopCreateSettings(), metadata.DefaultMetricsBuilderConfig(), crq)
+    ts := pcommon.Timestamp(time.Now().UnixNano())
+    mb := metadata.NewMetricsBuilder(metadata.DefaultMetricsBuilderConfig(), receivertest.NewNopCreateSettings())
+    RecordMetrics(mb, crq, ts)
+    m := mb.Emit()

     expected, err := golden.ReadMetrics(filepath.Join("testdata", "expected.yaml"))
     require.NoError(t, err)
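The test constructs a fresh MetricsBuilder per test case, while the receiver now creates one builder and reuses it across scrapes. This works because the generated builder's Emit returns the accumulated metrics and resets the builder's internal state, leaving it ready for the next scrape.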
190 changes: 65 additions & 125 deletions receiver/k8sclusterreceiver/internal/collection/collector.go
@@ -4,29 +4,24 @@
 package collection // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/collection"

 import (
-    "reflect"
     "time"

     quotav1 "github.com/openshift/api/quota/v1"
+    "go.opentelemetry.io/collector/pdata/pcommon"
     "go.opentelemetry.io/collector/pdata/pmetric"
     "go.opentelemetry.io/collector/receiver"
-    "go.uber.org/zap"
     appsv1 "k8s.io/api/apps/v1"
     autoscalingv2 "k8s.io/api/autoscaling/v2"
     autoscalingv2beta2 "k8s.io/api/autoscaling/v2beta2"
     batchv1 "k8s.io/api/batch/v1"
     batchv1beta1 "k8s.io/api/batch/v1beta1"
     corev1 "k8s.io/api/core/v1"
-    "k8s.io/apimachinery/pkg/runtime"
-    "k8s.io/apimachinery/pkg/runtime/schema"
-    "k8s.io/apimachinery/pkg/types"
-    "k8s.io/client-go/tools/cache"

-    "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/experimentalmetricmetadata"
     "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/clusterresourcequota"
     "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/cronjob"
     "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/demonset"
     "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/deployment"
+    "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/gvk"
     "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/hpa"
     "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/jobs"
     "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/metadata"
@@ -42,137 +37,82 @@ import (

 // TODO: Consider moving some of these constants to
 // https://go.opentelemetry.io/collector/blob/main/model/semconv/opentelemetry.go.

-// DataCollector wraps around a metricsStore and a metadaStore exposing
-// methods to perform on the underlying stores. DataCollector also provides
-// an interface to interact with refactored code from SignalFx Agent which is
-// confined to the collection package.
+// DataCollector emits metrics with CollectMetricData based on the Kubernetes API objects in the metadata store.
 type DataCollector struct {
     settings receiver.CreateSettings
-    metricsStore *metricsStore
     metadataStore *metadata.Store
     nodeConditionsToReport []string
     allocatableTypesToReport []string
-    metricsBuilderConfig metadata.MetricsBuilderConfig
+    metricsBuilder *metadata.MetricsBuilder
 }

 // NewDataCollector returns a DataCollector.
-func NewDataCollector(set receiver.CreateSettings, metricsBuilderConfig metadata.MetricsBuilderConfig, nodeConditionsToReport, allocatableTypesToReport []string) *DataCollector {
+func NewDataCollector(set receiver.CreateSettings, ms *metadata.Store,
+    metricsBuilderConfig metadata.MetricsBuilderConfig, nodeConditionsToReport, allocatableTypesToReport []string) *DataCollector {
     return &DataCollector{
-        settings: set,
-        metricsStore: &metricsStore{
-            metricsCache: make(map[types.UID]pmetric.Metrics),
-        },
-        metadataStore: &metadata.Store{},
+        settings: set,
+        metadataStore: ms,
         nodeConditionsToReport: nodeConditionsToReport,
         allocatableTypesToReport: allocatableTypesToReport,
-        metricsBuilderConfig: metricsBuilderConfig,
+        metricsBuilder: metadata.NewMetricsBuilder(metricsBuilderConfig, set),
     }
 }

-// SetupMetadataStore initializes a metadata store for the kubernetes kind.
-func (dc *DataCollector) SetupMetadataStore(gvk schema.GroupVersionKind, store cache.Store) {
-    dc.metadataStore.Setup(gvk, store)
-}
-
-func (dc *DataCollector) RemoveFromMetricsStore(obj interface{}) {
-    if err := dc.metricsStore.remove(obj.(runtime.Object)); err != nil {
-        dc.settings.TelemetrySettings.Logger.Error(
-            "failed to remove from metric cache",
-            zap.String("obj", reflect.TypeOf(obj).String()),
-            zap.Error(err),
-        )
-    }
-}
-
-func (dc *DataCollector) UpdateMetricsStore(obj interface{}, md pmetric.Metrics) {
-    if err := dc.metricsStore.update(obj.(runtime.Object), md); err != nil {
-        dc.settings.TelemetrySettings.Logger.Error(
-            "failed to update metric cache",
-            zap.String("obj", reflect.TypeOf(obj).String()),
-            zap.Error(err),
-        )
-    }
-}
-
 func (dc *DataCollector) CollectMetricData(currentTime time.Time) pmetric.Metrics {
-    return dc.metricsStore.getMetricData(currentTime)
-}
-
-// SyncMetrics updates the metric store with latest metrics from the kubernetes object.
-func (dc *DataCollector) SyncMetrics(obj interface{}) {
-    var md pmetric.Metrics
-
-    switch o := obj.(type) {
-    case *corev1.Pod:
-        md = pod.GetMetrics(dc.settings, dc.metricsBuilderConfig, o)
-    case *corev1.Node:
-        md = node.GetMetrics(dc.settings, dc.metricsBuilderConfig, o, dc.nodeConditionsToReport, dc.allocatableTypesToReport)
-    case *corev1.Namespace:
-        md = namespace.GetMetrics(dc.settings, dc.metricsBuilderConfig, o)
-    case *corev1.ReplicationController:
-        md = replicationcontroller.GetMetrics(dc.settings, dc.metricsBuilderConfig, o)
-    case *corev1.ResourceQuota:
-        md = resourcequota.GetMetrics(dc.settings, dc.metricsBuilderConfig, o)
-    case *appsv1.Deployment:
-        md = deployment.GetMetrics(dc.settings, dc.metricsBuilderConfig, o)
-    case *appsv1.ReplicaSet:
-        md = replicaset.GetMetrics(dc.settings, dc.metricsBuilderConfig, o)
-    case *appsv1.DaemonSet:
-        md = demonset.GetMetrics(dc.settings, dc.metricsBuilderConfig, o)
-    case *appsv1.StatefulSet:
-        md = statefulset.GetMetrics(dc.settings, dc.metricsBuilderConfig, o)
-    case *batchv1.Job:
-        md = jobs.GetMetrics(dc.settings, dc.metricsBuilderConfig, o)
-    case *batchv1.CronJob:
-        md = cronjob.GetMetrics(dc.settings, dc.metricsBuilderConfig, o)
-    case *batchv1beta1.CronJob:
-        md = cronjob.GetMetricsBeta(dc.settings, dc.metricsBuilderConfig, o)
-    case *autoscalingv2.HorizontalPodAutoscaler:
-        md = hpa.GetMetrics(dc.settings, dc.metricsBuilderConfig, o)
-    case *autoscalingv2beta2.HorizontalPodAutoscaler:
-        md = hpa.GetMetricsBeta(dc.settings, dc.metricsBuilderConfig, o)
-    case *quotav1.ClusterResourceQuota:
-        md = clusterresourcequota.GetMetrics(dc.settings, dc.metricsBuilderConfig, o)
-    default:
-        return
-    }
-
-    if md.DataPointCount() == 0 {
-        return
-    }
-
-    dc.UpdateMetricsStore(obj, md)
-}
-
-// SyncMetadata updates the metric store with latest metrics from the kubernetes object
-func (dc *DataCollector) SyncMetadata(obj interface{}) map[experimentalmetricmetadata.ResourceID]*metadata.KubernetesMetadata {
-    km := map[experimentalmetricmetadata.ResourceID]*metadata.KubernetesMetadata{}
-    switch o := obj.(type) {
-    case *corev1.Pod:
-        km = pod.GetMetadata(o, dc.metadataStore, dc.settings.TelemetrySettings.Logger)
-    case *corev1.Node:
-        km = node.GetMetadata(o)
-    case *corev1.ReplicationController:
-        km = replicationcontroller.GetMetadata(o)
-    case *appsv1.Deployment:
-        km = deployment.GetMetadata(o)
-    case *appsv1.ReplicaSet:
-        km = replicaset.GetMetadata(o)
-    case *appsv1.DaemonSet:
-        km = demonset.GetMetadata(o)
-    case *appsv1.StatefulSet:
-        km = statefulset.GetMetadata(o)
-    case *batchv1.Job:
-        km = jobs.GetMetadata(o)
-    case *batchv1.CronJob:
-        km = cronjob.GetMetadata(o)
-    case *batchv1beta1.CronJob:
-        km = cronjob.GetMetadataBeta(o)
-    case *autoscalingv2.HorizontalPodAutoscaler:
-        km = hpa.GetMetadata(o)
-    case *autoscalingv2beta2.HorizontalPodAutoscaler:
-        km = hpa.GetMetadataBeta(o)
-    }
-
-    return km
+    ts := pcommon.NewTimestampFromTime(currentTime)
+    customRMs := pmetric.NewResourceMetricsSlice()
+
+    dc.metadataStore.ForEach(gvk.Pod, func(o any) {
+        pod.RecordMetrics(dc.settings.Logger, dc.metricsBuilder, o.(*corev1.Pod), ts)
+    })
+    dc.metadataStore.ForEach(gvk.Node, func(o any) {
+        crm := node.CustomMetrics(dc.settings, dc.metricsBuilder.NewResourceBuilder(), o.(*corev1.Node),
+            dc.nodeConditionsToReport, dc.allocatableTypesToReport, ts)
+        if crm.ScopeMetrics().Len() > 0 {
+            crm.MoveTo(customRMs.AppendEmpty())
+        }
+    })
+    dc.metadataStore.ForEach(gvk.Namespace, func(o any) {
+        namespace.RecordMetrics(dc.metricsBuilder, o.(*corev1.Namespace), ts)
+    })
+    dc.metadataStore.ForEach(gvk.ReplicationController, func(o any) {
+        replicationcontroller.RecordMetrics(dc.metricsBuilder, o.(*corev1.ReplicationController), ts)
+    })
+    dc.metadataStore.ForEach(gvk.ResourceQuota, func(o any) {
+        resourcequota.RecordMetrics(dc.metricsBuilder, o.(*corev1.ResourceQuota), ts)
+    })
+    dc.metadataStore.ForEach(gvk.Deployment, func(o any) {
+        deployment.RecordMetrics(dc.metricsBuilder, o.(*appsv1.Deployment), ts)
+    })
+    dc.metadataStore.ForEach(gvk.ReplicaSet, func(o any) {
+        replicaset.RecordMetrics(dc.metricsBuilder, o.(*appsv1.ReplicaSet), ts)
+    })
+    dc.metadataStore.ForEach(gvk.DaemonSet, func(o any) {
+        demonset.RecordMetrics(dc.metricsBuilder, o.(*appsv1.DaemonSet), ts)
+    })
+    dc.metadataStore.ForEach(gvk.StatefulSet, func(o any) {
+        statefulset.RecordMetrics(dc.metricsBuilder, o.(*appsv1.StatefulSet), ts)
+    })
+    dc.metadataStore.ForEach(gvk.Job, func(o any) {
+        jobs.RecordMetrics(dc.metricsBuilder, o.(*batchv1.Job), ts)
+    })
+    dc.metadataStore.ForEach(gvk.CronJob, func(o any) {
+        cronjob.RecordMetrics(dc.metricsBuilder, o.(*batchv1.CronJob), ts)
+    })
+    dc.metadataStore.ForEach(gvk.CronJobBeta, func(o any) {
+        cronjob.RecordMetricsBeta(dc.metricsBuilder, o.(*batchv1beta1.CronJob), ts)
+    })
+    dc.metadataStore.ForEach(gvk.HorizontalPodAutoscaler, func(o any) {
+        hpa.RecordMetrics(dc.metricsBuilder, o.(*autoscalingv2.HorizontalPodAutoscaler), ts)
+    })
+    dc.metadataStore.ForEach(gvk.HorizontalPodAutoscalerBeta, func(o any) {
+        hpa.RecordMetricsBeta(dc.metricsBuilder, o.(*autoscalingv2beta2.HorizontalPodAutoscaler), ts)
+    })
+    dc.metadataStore.ForEach(gvk.ClusterResourceQuota, func(o any) {
+        clusterresourcequota.RecordMetrics(dc.metricsBuilder, o.(*quotav1.ClusterResourceQuota), ts)
+    })
+
+    m := dc.metricsBuilder.Emit()
+    customRMs.MoveAndAppendTo(m.ResourceMetrics())
+    return m
 }
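Two details in the new CollectMetricData are worth calling out. First, node metrics are the one case still built outside the generated builder: node.CustomMetrics produces its own ResourceMetrics for the configurable node conditions and allocatable types, so those are gathered in customRMs and appended to the Emit result. Second, the function is now driven entirely by the informer-backed metadata store passed into NewDataCollector. A sketch of how a scraper could drive it (the loop and the consumer/interval/ctx wiring are illustrative; the actual scraper lives elsewhere in the receiver):

    // Schematic scrape loop; consumer is a consumer.Metrics.
    dc := collection.NewDataCollector(set, metadataStore, metricsBuilderConfig,
        nodeConditionsToReport, allocatableTypesToReport)
    ticker := time.NewTicker(interval)
    defer ticker.Stop()
    for range ticker.C {
        md := dc.CollectMetricData(time.Now())
        if err := consumer.ConsumeMetrics(ctx, md); err != nil {
            logger.Error("failed to consume metrics", zap.Error(err))
        }
    }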