Skip to content

Commit

Permalink
update telemetry
Browse files Browse the repository at this point in the history
  • Loading branch information
zhuminyi committed Nov 19, 2024
1 parent f68dea9 commit 2df46e1
Show file tree
Hide file tree
Showing 8 changed files with 633 additions and 40 deletions.
86 changes: 47 additions & 39 deletions cmd/cluster-agent/api/v2/series/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,14 @@ import (
loadstore "github.com/DataDog/datadog-agent/pkg/clusteragent/autoscaling/workload/loadstore"
"github.com/DataDog/datadog-agent/pkg/telemetry"
"github.com/DataDog/datadog-agent/pkg/util/log"
"k8s.io/apimachinery/pkg/util/wait"
"golang.org/x/time/rate"
"k8s.io/client-go/util/workqueue"
)

const (
subsystem = "autoscaling_workload"
subsystem = "autoscaling_workload"
payloadProcessQPS = 500
payloadProcessRateBurst = 50
)

var (
Expand All @@ -30,37 +32,37 @@ var (
// telemetryWorkloadStoreMemory tracks the total memory usage of the store
telemetryWorkloadStoreMemory = telemetry.NewGaugeWithOpts(
subsystem,
"load_store_memory_usage",
"store_memory_usage",
nil,
"Total memory usage of the store",
commonOpts,
)
telemetryWorkloadMetricEntities = telemetry.NewGaugeWithOpts(
subsystem,
"load_store_metric_entities",
"store_metric_entities",
[]string{"metric"},
"Number of entities by metric names in the store",
commonOpts,
)
telemetryWorkloadNamespaceEntities = telemetry.NewGaugeWithOpts(
subsystem,
"load_store_namespace_entities",
"store_namespace_entities",
[]string{"namespace"},
"Number of entities by namespaces in the store",
commonOpts,
)
telemetryWorkloadJobQueueLength = telemetry.NewGaugeWithOpts(
telemetryWorkloadJobQueueLength = telemetry.NewCounterWithOpts(
subsystem,
"load_store_job_queue_length",
nil,
"store_job_queue_length",
[]string{"status"},
"Length of the job queue",
commonOpts,
)
)

// jobQueue is a wrapper around workqueue.DelayingInterface to make it thread-safe.
type jobQueue struct {
queue workqueue.DelayingInterface
taskQueue workqueue.TypedRateLimitingInterface[*gogen.MetricPayload]
isStarted bool
store loadstore.Store
m sync.Mutex
Expand All @@ -69,66 +71,72 @@ type jobQueue struct {
// newJobQueue creates a new jobQueue with no delay for adding items
func newJobQueue(ctx context.Context) *jobQueue {
q := jobQueue{
queue: workqueue.NewDelayingQueueWithConfig(workqueue.DelayingQueueConfig{
Name: "seriesPayloadJobQueue",
}),
taskQueue: workqueue.NewTypedRateLimitingQueue(workqueue.NewTypedMaxOfRateLimiter(
&workqueue.TypedBucketRateLimiter[*gogen.MetricPayload]{
Limiter: rate.NewLimiter(rate.Limit(payloadProcessQPS), payloadProcessRateBurst),
},
)),
store: loadstore.NewEntityStore(ctx),
isStarted: false,
}
go q.start(ctx)
return &q
}

func (jq *jobQueue) worker() {
for jq.processNextWorkItem() {
}
}

func (jq *jobQueue) start(ctx context.Context) {
jq.m.Lock()
if jq.isStarted {
return
}
jq.isStarted = true
jq.m.Unlock()
defer jq.queue.ShutDown()
go wait.Until(jq.worker, time.Second, ctx.Done())
infoTicker := time.NewTicker(60 * time.Second)
defer jq.taskQueue.ShutDown()
jq.reportTelemetry(ctx)
for {
select {
case <-ctx.Done():
log.Infof("Stopping series payload job queue")
return
case <-infoTicker.C:
info := jq.store.GetStoreInfo()
telemetryWorkloadStoreMemory.Set(float64(info.TotalMemoryUsage))
for k, v := range info.EntityCountByMetric {
telemetryWorkloadMetricEntities.Set(float64(v), k)
}
for k, v := range info.EntityCountByNamespace {
telemetryWorkloadNamespaceEntities.Set(float64(v), k)
}
telemetryWorkloadJobQueueLength.Set(float64(jq.queue.Len()))
log.Debugf("Store info: %+v", info)
default:
jq.processNextWorkItem()
}
}
}

func (jq *jobQueue) processNextWorkItem() bool {
obj, shutdown := jq.queue.Get()
metricPayload, shutdown := jq.taskQueue.Get()
if shutdown {
return false
}
defer jq.queue.Done(obj)
metricPayload, ok := obj.(*gogen.MetricPayload)
if !ok {
log.Errorf("Expected MetricPayload but got %T", obj)
return true
}
defer jq.taskQueue.Done(metricPayload)
telemetryWorkloadJobQueueLength.Inc("processed")
loadstore.ProcessLoadPayload(metricPayload, jq.store)
return true
}

func (jq *jobQueue) addJob(payload *gogen.MetricPayload) {
jq.queue.Add(payload)
jq.taskQueue.Add(payload)
telemetryWorkloadJobQueueLength.Inc("queued")
}

func (jq *jobQueue) reportTelemetry(ctx context.Context) {
go func() {
infoTicker := time.NewTicker(60 * time.Second)
for {
select {
case <-ctx.Done():
return
case <-infoTicker.C:
info := jq.store.GetStoreInfo()
telemetryWorkloadStoreMemory.Set(float64(info.TotalMemoryUsage))
for k, v := range info.EntityCountByMetric {
telemetryWorkloadMetricEntities.Set(float64(v), k)
}
for k, v := range info.EntityCountByNamespace {
telemetryWorkloadNamespaceEntities.Set(float64(v), k)
}
log.Infof("Store info: %+v", info)
}
}
}()
}
1 change: 0 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -672,7 +672,6 @@ require (
github.com/DataDog/datadog-agent/comp/trace/compression/impl-zstd v0.56.0-rc.3
github.com/DataDog/datadog-agent/pkg/aggregator/ckey v0.59.0-rc.6
github.com/DataDog/datadog-agent/pkg/api v0.57.1
github.com/DataDog/datadog-agent/pkg/clusteragent/autoscaling/loadstore v0.0.1
github.com/DataDog/datadog-agent/pkg/collector/check/defaults v0.59.0-rc.6
github.com/DataDog/datadog-agent/pkg/config/env v0.59.0-rc.6
github.com/DataDog/datadog-agent/pkg/config/mock v0.58.1
Expand Down
11 changes: 11 additions & 0 deletions pkg/clusteragent/autoscaling/workload/loadstore/doc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
// Unless explicitly stated otherwise all files in this repository are licensed
// under the Apache License Version 2.0.
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2016-present Datadog, Inc.

//go:build kubeapiserver

/*
Package loadstore stores local failover metrics for the workload that need autoscaling
*/
package loadstore
77 changes: 77 additions & 0 deletions pkg/clusteragent/autoscaling/workload/loadstore/entity.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
// Unless explicitly stated otherwise all files in this repository are licensed
// under the Apache License Version 2.0.
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2016-present Datadog, Inc.

//go:build kubeapiserver

package loadstore

import (
"fmt"
"time"
"unsafe"
)

// EntityType defines the type of entity.
type EntityType int

// ValueType defines the datatype of metric value.
type ValueType float64

// Enumeration of entity types.
const (
ContainerType EntityType = iota
UnknownType
)

const (
// maxDataPoints is the maximum number of data points to store per entity.
maxDataPoints = 3
// defaultPurgeInterval is the default interval to purge inactive entities.
defaultPurgeInterval = 3 * time.Minute
// defaultExpireInterval is the default interval to expire entities.
defaultExpireInterval = 3 * time.Minute
)

// Entity represents an entity with a type and its attributes.
type Entity struct {
EntityType EntityType
SourceID string
Host string // serie.Host
EntityName string // display_container_name
Namespace string
MetricName string
}

// String returns a string representation of the Entity.
func (e *Entity) String() string {
return fmt.Sprintf(
" Key: %d,"+
" SourceID: %s,"+
" MetricName: %s"+
" EntityName: %s,"+
" EntityType: %d,"+
" Host: %s,"+
" Namespace: %s",
hashEntityToUInt64(e), e.SourceID, e.MetricName, e.EntityName, e.EntityType, e.Host, e.Namespace)
}

// MemoryUsage returns the memory usage of the entity in bytes.
func (e *Entity) MemoryUsage() uint32 {
return uint32(len(e.SourceID)) + uint32(len(e.Host)) + uint32(len(e.EntityName)) + uint32(len(e.Namespace)) + uint32(len(e.MetricName)) + uint32(unsafe.Sizeof(e.EntityType))
}

// EntityValue represents a metric value with a timestamp.
type EntityValue struct {
value ValueType
timestamp Timestamp
}

// String returns a string representation of the EntityValue.
func (ev *EntityValue) String() string {
// Convert the timestamp to a time.Time object assuming the timestamp is in seconds.
// If the timestamp is in milliseconds, use time.UnixMilli(ev.timestamp) instead.
readableTime := time.Unix(int64(ev.timestamp), 0).Local().Format(time.RFC3339)
return fmt.Sprintf("Value: %f, Timestamp: %s", ev.value, readableTime)
}
115 changes: 115 additions & 0 deletions pkg/clusteragent/autoscaling/workload/loadstore/store.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
// Unless explicitly stated otherwise all files in this repository are licensed
// under the Apache License Version 2.0.
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2016-present Datadog, Inc.

//go:build kubeapiserver

package loadstore

import (
"math"
"strings"

"github.com/DataDog/agent-payload/v5/gogen"
)

// StoreInfo represents the store information, like memory usage and entity count.
type StoreInfo struct {
TotalMemoryUsage uint64
TotalEntityCount uint64
currentTime Timestamp
EntityCountByMetric map[string]uint64
EntityCountByNamespace map[string]uint64
}

// Store is an interface for in-memory storage of entities and their load metric values.
type Store interface {
// SetEntitiesValues sets the values for the given map
SetEntitiesValues(entities map[*Entity]*EntityValue)

// GetStoreInfo returns the store information.
GetStoreInfo() StoreInfo

// GetEntitiesByNamespace to get all entities and values by namespace
GetEntitiesByNamespace(namespace string) map[*Entity]*EntityValue

// GetEntitiesByMetricName to get all entities and values by load metric name
GetEntitiesByMetricName(metricName string) map[*Entity]*EntityValue

// GetAllMetricNamesWithCount to get all metric names and corresponding entity count
GetAllMetricNamesWithCount() map[string]int64

// GetAllNamespaceNamesWithCount to get all namespace names and corresponding entity count
GetAllNamespaceNamesWithCount() map[string]int64

// GetEntityByHashKey to get entity and latest value by hash key
GetEntityByHashKey(hash uint64) (*Entity, *EntityValue)

//DeleteEntityByHashKey to delete entity by hash key
DeleteEntityByHashKey(hash uint64)
}

// createEntitiesFromPayload is a helper function used for creating entities from the metric payload.
func createEntitiesFromPayload(payload *gogen.MetricPayload) map[*Entity]*EntityValue {
entities := make(map[*Entity]*EntityValue)
splitTag := func(tag string) (key string, value string) {
split := strings.SplitN(tag, ":", 2)
if len(split) < 2 || split[0] == "" || split[1] == "" {
return "", ""
}
return split[0], split[1]
}
for _, series := range payload.Series {
metricName := series.GetMetric()
points := series.GetPoints()
tags := series.GetTags()
resources := series.GetResources()
entity := Entity{
EntityType: UnknownType,
SourceID: "",
Host: "",
EntityName: "",
Namespace: "",
MetricName: metricName,
}
for _, resource := range resources {
if resource.Type == "host" {
entity.Host = resource.Name
}
}
for _, tag := range tags {
k, v := splitTag(tag)
switch k {
case "display_container_name":
entity.EntityName = v
case "kube_namespace":
entity.Namespace = v
case "container_id":
entity.SourceID = v
entity.EntityType = ContainerType
}
}
if entity.MetricName == "" || entity.Host == "" || entity.EntityType == UnknownType || entity.Namespace == "" || entity.SourceID == "" {
continue
}
for _, point := range points {
if point != nil && !math.IsNaN(point.Value) {
entities[&entity] = &EntityValue{
value: ValueType(point.Value),
timestamp: Timestamp(point.Timestamp),
}
}
}
}
return entities
}

// ProcessLoadPayload converts the metric payload and stores the entities and their values in the store.
func ProcessLoadPayload(payload *gogen.MetricPayload, store Store) {
if payload == nil || store == nil {
return
}
entities := createEntitiesFromPayload(payload)
store.SetEntitiesValues(entities)
}
Loading

0 comments on commit 2df46e1

Please sign in to comment.