Skip to content

Commit

Permalink
update query
Browse files Browse the repository at this point in the history
  • Loading branch information
zhuminyi committed Dec 25, 2024
1 parent 0a3ae53 commit c500dc9
Show file tree
Hide file tree
Showing 8 changed files with 330 additions and 326 deletions.
4 changes: 2 additions & 2 deletions cmd/cluster-agent/api/v2/series/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,8 +118,8 @@ func (jq *jobQueue) reportTelemetry(ctx context.Context) {
for _, statsResult := range statsResults {
telemetryWorkloadEntities.Set(float64(statsResult.Count),
statsResult.Namespace,
statsResult.Deployment,
statsResult.LoadName)
statsResult.PodOwner,
statsResult.MetricName)
}
}
}
Expand Down
8 changes: 0 additions & 8 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -647,15 +647,7 @@ require (
github.com/DataDog/datadog-agent/comp/trace/compression/impl-gzip v0.59.0
github.com/DataDog/datadog-agent/comp/trace/compression/impl-zstd v0.56.0-rc.3
github.com/DataDog/datadog-agent/pkg/aggregator/ckey v0.59.0-rc.6
<<<<<<< HEAD
github.com/DataDog/datadog-agent/pkg/api v0.59.0
=======
github.com/DataDog/datadog-agent/pkg/api v0.57.1
<<<<<<< HEAD
github.com/DataDog/datadog-agent/pkg/clusteragent/autoscaling/loadstore v0.0.1
>>>>>>> e5b5b05bf4 (add load store)
=======
>>>>>>> 8416a0180a (support filter by namespace deployment)
github.com/DataDog/datadog-agent/pkg/collector/check/defaults v0.59.0
github.com/DataDog/datadog-agent/pkg/config/env v0.59.0
github.com/DataDog/datadog-agent/pkg/config/mock v0.59.0
Expand Down
84 changes: 49 additions & 35 deletions pkg/clusteragent/autoscaling/workload/loadstore/entity.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,21 +10,29 @@ package loadstore
import (
"fmt"
"time"
"unsafe"
)

// EntityType defines the type of entity.
type EntityType int
type EntityType int8

type PodOwnerType int8

// ValueType defines the datatype of workload value.
type ValueType float64

// Enumeration of entity types.
const (
ContainerType EntityType = iota
PodType // TODO: PodType is not supported yet
UnknownType
)

// Enumeration of pod owner types which is parsed from tags kube_ownerref_kind
const (
Deployment PodOwnerType = iota
Unsupported
)

const (
// maxDataPoints is the maximum number of data points to store per entity.
maxDataPoints = 3
Expand All @@ -35,33 +43,36 @@ const (
)

// Entity represents an entity with a type and its attributes.
// if entity is a pod, if entity restarts, a new entity will be created because podname is different
// if entity is a container, the entity will be same
type Entity struct {
EntityType EntityType
SourceID string
Host string // serie.Host
EntityName string // display_container_name
Namespace string
LoadName string
Deployment string
EntityType EntityType // required, PodType or ContainerType

// Use display_container_name for EntityName if EntityType is container
// or use podname for entityName if EntityType is pod
// display_container_name = container.Name + pod.Name
// if container is restarted, the display_container_name will be the same
EntityName string // required

Namespace string // required
PodOwnerName string // required, parsed from tags kube_ownerref_name
PodOwnerkind PodOwnerType // required, parsed from tags kube_ownerref_kind
PodName string // required, parsed from tags pod_name
ContainerName string // optional, short container name, empty if EntityType is PodType
MetricName string // required, metric name of workload
}

// String returns a string representation of the Entity.
func (e *Entity) String() string {
return fmt.Sprintf(
" Key: %d,"+
" SourceID: %s,"+
" LoadName: %s"+
" MetricName: %s"+
" EntityName: %s,"+
" EntityType: %d,"+
" Host: %s,"+
" Namespace: %s,"+
" Deployment: %s",
hashEntityToUInt64(e), e.SourceID, e.LoadName, e.EntityName, e.EntityType, e.Host, e.Namespace, e.Deployment)
}

// MemoryUsage returns the memory usage of the entity in bytes.
func (e *Entity) MemoryUsage() uint32 {
return uint32(len(e.SourceID)) + uint32(len(e.Host)) + uint32(len(e.EntityName)) + uint32(len(e.Namespace)) + uint32(len(e.LoadName)) + uint32(unsafe.Sizeof(e.EntityType)) + uint32(len(e.Deployment))
" PodOwnerName: %s"+
" PodOwnerType: %d",
hashEntityToUInt64(e), e.MetricName, e.EntityName, e.EntityType, e.Namespace, e.PodOwnerName, e.PodOwnerkind)
}

// EntityValue represents a value with a timestamp.
Expand All @@ -80,8 +91,7 @@ func (ev *EntityValue) String() string {

// EntityValueQueue represents a queue with a fixed capacity that removes the front element when full
type EntityValueQueue struct {
data []ValueType
avg ValueType
data []*EntityValue
head int
tail int
size int
Expand All @@ -90,7 +100,7 @@ type EntityValueQueue struct {

// pushBack adds an element to the back of the queue.
// If the queue is full, it removes the front element first.
func (q *EntityValueQueue) pushBack(value ValueType) bool {
func (q *EntityValueQueue) pushBack(value *EntityValue) bool {
if q.size == q.capacity {
// Remove the front element
q.head = (q.head + 1) % q.capacity
Expand All @@ -101,24 +111,28 @@ func (q *EntityValueQueue) pushBack(value ValueType) bool {
q.data[q.tail] = value
q.tail = (q.tail + 1) % q.capacity
q.size++
q.avg = q.average()
return true
}

// average calculates the average value of the queue.
func (q *EntityValueQueue) average() ValueType {
// ToSlice converts the EntityValueQueue data to a slice of EntityValue.
func (q *EntityValueQueue) ToSlice() []EntityValue {
if q.size == 0 {
return 0
return []EntityValue{}
}
sum := ValueType(0)
for i := 0; i < q.size; i++ {
index := (q.head + i) % q.capacity
sum += q.data[index]

result := make([]EntityValue, 0, q.size)
if q.head < q.tail {
for _, v := range q.data[q.head:q.tail] {
result = append(result, *v)
}
} else {
for _, v := range q.data[q.head:] {
result = append(result, *v)
}
for _, v := range q.data[:q.tail] {
result = append(result, *v)
}
}
return sum / ValueType(q.size)
}

// value returns the average value of the queue
func (q *EntityValueQueue) value() ValueType {
return q.avg
return result
}
61 changes: 0 additions & 61 deletions pkg/clusteragent/autoscaling/workload/loadstore/filter.go

This file was deleted.

82 changes: 51 additions & 31 deletions pkg/clusteragent/autoscaling/workload/loadstore/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,25 +13,30 @@ import (
"github.com/DataDog/agent-payload/v5/gogen"
)

// StoreInfo represents the store information, like memory usage and entity count.
// StoreInfo represents the store information which aggregates the entities to lowest level, i.e., container level
type StoreInfo struct {
currentTime Timestamp
StatsResults []*StatsResult
}

// StatsResult represents the statistics result for the entities aggregated by namespace, deployment, and load name.
// StatsResult provides a summary of the entities, grouped by namespace, podOwner, and metric name.
type StatsResult struct {
Namespace string
Deployment string
LoadName string
Count int
Min ValueType
P10 ValueType
Medium ValueType
Avg ValueType
P95 ValueType
P99 ValueType
Max ValueType
PodOwner string
MetricName string
Count int // Under <namespace, podOwner, metric>, number of containers if container type or pods if pod type
}

// PodResult provides the time series of entity values for a pod and its containers
type PodResult struct {
PodName string
ContainerValues map[string][]EntityValue // container name to a time series of entity values, e.g cpu usage from past three collection
PodLevelValue []EntityValue // If Pod level value is not available, it will be empty
}

// QueryResult provides the pod results for a given query
type QueryResult struct {
results []PodResult
}

// Store is an interface for in-memory storage of entities and their load metric values.
Expand All @@ -42,8 +47,8 @@ type Store interface {
// GetStoreInfo returns the store information.
GetStoreInfo() StoreInfo

// GetEntitiesStats to get all entities by given search filters
GetEntitiesStats(namespace string, deployment string, loadName string) StatsResult
// GetMetricsRaw provides the values of qualified entities by given search filters
GetMetricsRaw(metricName string, namespace string, podOwnerName string, containerName string) QueryResult

//DeleteEntityByHashKey to delete entity by hash key
DeleteEntityByHashKey(hash uint64)
Expand All @@ -63,36 +68,51 @@ func createEntitiesFromPayload(payload *gogen.MetricPayload) map[*Entity]*Entity
metricName := series.GetMetric()
points := series.GetPoints()
tags := series.GetTags()
resources := series.GetResources()
entity := Entity{
EntityType: UnknownType,
SourceID: "",
Host: "",
EntityName: "",
Namespace: "",
LoadName: metricName,
Deployment: "",
}
for _, resource := range resources {
if resource.Type == "host" {
entity.Host = resource.Name
}
EntityType: UnknownType,
EntityName: "",
Namespace: "",
MetricName: metricName,
PodOwnerName: "",
PodOwnerkind: Unsupported,
}
for _, tag := range tags {
k, v := splitTag(tag)
switch k {
case "display_container_name":
entity.EntityType = ContainerType
entity.EntityName = v
case "kube_namespace":
entity.Namespace = v
case "container_id":
entity.SourceID = v
entity.EntityType = ContainerType
case "kube_deployment":
entity.Deployment = v
case "kube_ownerref_name":
entity.PodOwnerName = v
case "kube_ownerref_kind":
switch v {
case "deployment":
entity.PodOwnerkind = Deployment
// TODO: add more cases
default:
entity.PodOwnerkind = Unsupported
}
case "container_name":
entity.ContainerName = v
case "pod_name":
entity.PodName = v
}
}
if entity.LoadName == "" || entity.Host == "" || entity.EntityType == UnknownType || entity.Namespace == "" || entity.SourceID == "" {
// TODO:
// if PodType, populate entity.type first
// if entity.EntityType == PodType {
// entity.EntityName = entity.PodName
// }
if entity.MetricName == "" ||
entity.EntityType == UnknownType ||
entity.Namespace == "" ||
entity.PodOwnerName == "" ||
entity.EntityName == "" ||
entity.PodOwnerkind == Unsupported {
continue
}
for _, point := range points {
Expand Down
Loading

0 comments on commit c500dc9

Please sign in to comment.