From c500dc9870bd4447c86402650e2489866b70a3ed Mon Sep 17 00:00:00 2001 From: Minyi Zhu Date: Tue, 24 Dec 2024 13:40:13 -0500 Subject: [PATCH] update query --- cmd/cluster-agent/api/v2/series/job.go | 4 +- go.mod | 8 - .../autoscaling/workload/loadstore/entity.go | 84 +++--- .../autoscaling/workload/loadstore/filter.go | 61 ---- .../autoscaling/workload/loadstore/store.go | 82 +++-- .../workload/loadstore/store_test.go | 112 +++++-- .../workload/loadstore/storeimpl.go | 284 +++++++++--------- .../autoscaling/workload/loadstore/util.go | 21 +- 8 files changed, 330 insertions(+), 326 deletions(-) delete mode 100644 pkg/clusteragent/autoscaling/workload/loadstore/filter.go diff --git a/cmd/cluster-agent/api/v2/series/job.go b/cmd/cluster-agent/api/v2/series/job.go index d380a17b35bb37..8dca57e93814fb 100644 --- a/cmd/cluster-agent/api/v2/series/job.go +++ b/cmd/cluster-agent/api/v2/series/job.go @@ -118,8 +118,8 @@ func (jq *jobQueue) reportTelemetry(ctx context.Context) { for _, statsResult := range statsResults { telemetryWorkloadEntities.Set(float64(statsResult.Count), statsResult.Namespace, - statsResult.Deployment, - statsResult.LoadName) + statsResult.PodOwner, + statsResult.MetricName) } } } diff --git a/go.mod b/go.mod index 1a2e331d796fe7..a939a145b37c42 100644 --- a/go.mod +++ b/go.mod @@ -647,15 +647,7 @@ require ( github.com/DataDog/datadog-agent/comp/trace/compression/impl-gzip v0.59.0 github.com/DataDog/datadog-agent/comp/trace/compression/impl-zstd v0.56.0-rc.3 github.com/DataDog/datadog-agent/pkg/aggregator/ckey v0.59.0-rc.6 -<<<<<<< HEAD github.com/DataDog/datadog-agent/pkg/api v0.59.0 -======= - github.com/DataDog/datadog-agent/pkg/api v0.57.1 -<<<<<<< HEAD - github.com/DataDog/datadog-agent/pkg/clusteragent/autoscaling/loadstore v0.0.1 ->>>>>>> e5b5b05bf4 (add load store) -======= ->>>>>>> 8416a0180a (support filter by namespace deployment) github.com/DataDog/datadog-agent/pkg/collector/check/defaults v0.59.0 github.com/DataDog/datadog-agent/pkg/config/env v0.59.0 github.com/DataDog/datadog-agent/pkg/config/mock v0.59.0 diff --git a/pkg/clusteragent/autoscaling/workload/loadstore/entity.go b/pkg/clusteragent/autoscaling/workload/loadstore/entity.go index e93ef5f1b55ee9..7bad865d7df347 100644 --- a/pkg/clusteragent/autoscaling/workload/loadstore/entity.go +++ b/pkg/clusteragent/autoscaling/workload/loadstore/entity.go @@ -10,11 +10,12 @@ package loadstore import ( "fmt" "time" - "unsafe" ) // EntityType defines the type of entity. -type EntityType int +type EntityType int8 + +type PodOwnerType int8 // ValueType defines the datatype of workload value. type ValueType float64 @@ -22,9 +23,16 @@ type ValueType float64 // Enumeration of entity types. const ( ContainerType EntityType = iota + PodType // TODO: PodType is not supported yet UnknownType ) +// Enumeration of pod owner types which is parsed from tags kube_ownerref_kind +const ( + Deployment PodOwnerType = iota + Unsupported +) + const ( // maxDataPoints is the maximum number of data points to store per entity. maxDataPoints = 3 @@ -35,33 +43,36 @@ const ( ) // Entity represents an entity with a type and its attributes. +// if entity is a pod, if entity restarts, a new entity will be created because podname is different +// if entity is a container, the entity will be same type Entity struct { - EntityType EntityType - SourceID string - Host string // serie.Host - EntityName string // display_container_name - Namespace string - LoadName string - Deployment string + EntityType EntityType // required, PodType or ContainerType + + // Use display_container_name for EntityName if EntityType is container + // or use podname for entityName if EntityType is pod + // display_container_name = container.Name + pod.Name + // if container is restarted, the display_container_name will be the same + EntityName string // required + + Namespace string // required + PodOwnerName string // required, parsed from tags kube_ownerref_name + PodOwnerkind PodOwnerType // required, parsed from tags kube_ownerref_kind + PodName string // required, parsed from tags pod_name + ContainerName string // optional, short container name, empty if EntityType is PodType + MetricName string // required, metric name of workload } // String returns a string representation of the Entity. func (e *Entity) String() string { return fmt.Sprintf( " Key: %d,"+ - " SourceID: %s,"+ - " LoadName: %s"+ + " MetricName: %s"+ " EntityName: %s,"+ " EntityType: %d,"+ - " Host: %s,"+ " Namespace: %s,"+ - " Deployment: %s", - hashEntityToUInt64(e), e.SourceID, e.LoadName, e.EntityName, e.EntityType, e.Host, e.Namespace, e.Deployment) -} - -// MemoryUsage returns the memory usage of the entity in bytes. -func (e *Entity) MemoryUsage() uint32 { - return uint32(len(e.SourceID)) + uint32(len(e.Host)) + uint32(len(e.EntityName)) + uint32(len(e.Namespace)) + uint32(len(e.LoadName)) + uint32(unsafe.Sizeof(e.EntityType)) + uint32(len(e.Deployment)) + " PodOwnerName: %s"+ + " PodOwnerType: %d", + hashEntityToUInt64(e), e.MetricName, e.EntityName, e.EntityType, e.Namespace, e.PodOwnerName, e.PodOwnerkind) } // EntityValue represents a value with a timestamp. @@ -80,8 +91,7 @@ func (ev *EntityValue) String() string { // EntityValueQueue represents a queue with a fixed capacity that removes the front element when full type EntityValueQueue struct { - data []ValueType - avg ValueType + data []*EntityValue head int tail int size int @@ -90,7 +100,7 @@ type EntityValueQueue struct { // pushBack adds an element to the back of the queue. // If the queue is full, it removes the front element first. -func (q *EntityValueQueue) pushBack(value ValueType) bool { +func (q *EntityValueQueue) pushBack(value *EntityValue) bool { if q.size == q.capacity { // Remove the front element q.head = (q.head + 1) % q.capacity @@ -101,24 +111,28 @@ func (q *EntityValueQueue) pushBack(value ValueType) bool { q.data[q.tail] = value q.tail = (q.tail + 1) % q.capacity q.size++ - q.avg = q.average() return true } -// average calculates the average value of the queue. -func (q *EntityValueQueue) average() ValueType { +// ToSlice converts the EntityValueQueue data to a slice of EntityValue. +func (q *EntityValueQueue) ToSlice() []EntityValue { if q.size == 0 { - return 0 + return []EntityValue{} } - sum := ValueType(0) - for i := 0; i < q.size; i++ { - index := (q.head + i) % q.capacity - sum += q.data[index] + + result := make([]EntityValue, 0, q.size) + if q.head < q.tail { + for _, v := range q.data[q.head:q.tail] { + result = append(result, *v) + } + } else { + for _, v := range q.data[q.head:] { + result = append(result, *v) + } + for _, v := range q.data[:q.tail] { + result = append(result, *v) + } } - return sum / ValueType(q.size) -} -// value returns the average value of the queue -func (q *EntityValueQueue) value() ValueType { - return q.avg + return result } diff --git a/pkg/clusteragent/autoscaling/workload/loadstore/filter.go b/pkg/clusteragent/autoscaling/workload/loadstore/filter.go deleted file mode 100644 index 91a6aaeb15b058..00000000000000 --- a/pkg/clusteragent/autoscaling/workload/loadstore/filter.go +++ /dev/null @@ -1,61 +0,0 @@ -// Unless explicitly stated otherwise all files in this repository are licensed -// under the Apache License Version 2.0. -// This product includes software developed at Datadog (https://www.datadoghq.com/). -// Copyright 2016-present Datadog, Inc. - -//go:build kubeapiserver - -package loadstore - -// entityFilter defines an interface to include entity based on its attributes. -type entityFilter interface { - IsIncluded(entity *Entity) bool -} - -type namespaceFilter struct { - namespace string -} - -// IsIncluded returns true if the entity's namespace is included. -func (f *namespaceFilter) IsIncluded(entity *Entity) bool { - return f.namespace == "" || f.namespace == entity.Namespace -} - -type loadNameFilter struct { - loadName string -} - -// IsIncluded returns true if the entity's load name is included. -func (f *loadNameFilter) IsIncluded(entity *Entity) bool { - return f.loadName == "" || f.loadName == entity.LoadName -} - -type deploymentFilter struct { - deployment string -} - -// IsIncluded returns true if the entity's load name is included. -func (f *deploymentFilter) IsIncluded(entity *Entity) bool { - return f.deployment == "" || f.deployment == entity.Deployment -} - -// ANDEntityFilter implements a logical AND between given filters -type ANDEntityFilter struct { - Filters []entityFilter -} - -// IsIncluded returns true if the entity is included by all filters. -func (f *ANDEntityFilter) IsIncluded(entity *Entity) bool { - for _, filter := range f.Filters { - if !filter.IsIncluded(entity) { - return false - } - } - return true -} - -func newANDEntityFilter(filters ...entityFilter) *ANDEntityFilter { - f := &ANDEntityFilter{} - f.Filters = append(f.Filters, filters...) - return f -} diff --git a/pkg/clusteragent/autoscaling/workload/loadstore/store.go b/pkg/clusteragent/autoscaling/workload/loadstore/store.go index 2e1dbe1f2d8573..4d118958dd58ac 100644 --- a/pkg/clusteragent/autoscaling/workload/loadstore/store.go +++ b/pkg/clusteragent/autoscaling/workload/loadstore/store.go @@ -13,25 +13,30 @@ import ( "github.com/DataDog/agent-payload/v5/gogen" ) -// StoreInfo represents the store information, like memory usage and entity count. +// StoreInfo represents the store information which aggregates the entities to lowest level, i.e., container level type StoreInfo struct { currentTime Timestamp StatsResults []*StatsResult } -// StatsResult represents the statistics result for the entities aggregated by namespace, deployment, and load name. +// StatsResult provides a summary of the entities, grouped by namespace, podOwner, and metric name. type StatsResult struct { Namespace string - Deployment string - LoadName string - Count int - Min ValueType - P10 ValueType - Medium ValueType - Avg ValueType - P95 ValueType - P99 ValueType - Max ValueType + PodOwner string + MetricName string + Count int // Under , number of containers if container type or pods if pod type +} + +// PodResult provides the time series of entity values for a pod and its containers +type PodResult struct { + PodName string + ContainerValues map[string][]EntityValue // container name to a time series of entity values, e.g cpu usage from past three collection + PodLevelValue []EntityValue // If Pod level value is not available, it will be empty +} + +// QueryResult provides the pod results for a given query +type QueryResult struct { + results []PodResult } // Store is an interface for in-memory storage of entities and their load metric values. @@ -42,8 +47,8 @@ type Store interface { // GetStoreInfo returns the store information. GetStoreInfo() StoreInfo - // GetEntitiesStats to get all entities by given search filters - GetEntitiesStats(namespace string, deployment string, loadName string) StatsResult + // GetMetricsRaw provides the values of qualified entities by given search filters + GetMetricsRaw(metricName string, namespace string, podOwnerName string, containerName string) QueryResult //DeleteEntityByHashKey to delete entity by hash key DeleteEntityByHashKey(hash uint64) @@ -63,36 +68,51 @@ func createEntitiesFromPayload(payload *gogen.MetricPayload) map[*Entity]*Entity metricName := series.GetMetric() points := series.GetPoints() tags := series.GetTags() - resources := series.GetResources() entity := Entity{ - EntityType: UnknownType, - SourceID: "", - Host: "", - EntityName: "", - Namespace: "", - LoadName: metricName, - Deployment: "", - } - for _, resource := range resources { - if resource.Type == "host" { - entity.Host = resource.Name - } + EntityType: UnknownType, + EntityName: "", + Namespace: "", + MetricName: metricName, + PodOwnerName: "", + PodOwnerkind: Unsupported, } for _, tag := range tags { k, v := splitTag(tag) switch k { case "display_container_name": + entity.EntityType = ContainerType entity.EntityName = v case "kube_namespace": entity.Namespace = v case "container_id": - entity.SourceID = v entity.EntityType = ContainerType - case "kube_deployment": - entity.Deployment = v + case "kube_ownerref_name": + entity.PodOwnerName = v + case "kube_ownerref_kind": + switch v { + case "deployment": + entity.PodOwnerkind = Deployment + // TODO: add more cases + default: + entity.PodOwnerkind = Unsupported + } + case "container_name": + entity.ContainerName = v + case "pod_name": + entity.PodName = v } } - if entity.LoadName == "" || entity.Host == "" || entity.EntityType == UnknownType || entity.Namespace == "" || entity.SourceID == "" { + // TODO: + // if PodType, populate entity.type first + // if entity.EntityType == PodType { + // entity.EntityName = entity.PodName + // } + if entity.MetricName == "" || + entity.EntityType == UnknownType || + entity.Namespace == "" || + entity.PodOwnerName == "" || + entity.EntityName == "" || + entity.PodOwnerkind == Unsupported { continue } for _, point := range points { diff --git a/pkg/clusteragent/autoscaling/workload/loadstore/store_test.go b/pkg/clusteragent/autoscaling/workload/loadstore/store_test.go index 747fca8ea59ba0..f27c8539b3a86f 100644 --- a/pkg/clusteragent/autoscaling/workload/loadstore/store_test.go +++ b/pkg/clusteragent/autoscaling/workload/loadstore/store_test.go @@ -9,7 +9,6 @@ package loadstore import ( "fmt" - "strconv" "sync" "testing" "time" @@ -18,23 +17,26 @@ import ( "github.com/stretchr/testify/assert" ) -func createSeriesPayload(i int) *gogen.MetricPayload { - containerID := fmt.Sprintf("container_id:%d", i) - displayContainerName := fmt.Sprintf("display_container_name:%d", i) +func createSeriesPayload(i int, timeDelta int64) *gogen.MetricPayload { + containerID := fmt.Sprintf("container_id:%d", 10) + containerName := "container_name:container_test" + displayContainerName := fmt.Sprintf("display_container_name:pod_%d-container_test", i) namespace := "kube_namespace:test" deployment := "kube_deployment:redis_test" + kubeOwnerrefName := "kube_ownerref_name:redis_test" + podName := fmt.Sprintf("pod_name:redis_%d", i) payload := gogen.MetricPayload{ Series: []*gogen.MetricPayload_MetricSeries{ { - Metric: "datadog.test.run", + Metric: "container.memory.usage", Type: 3, // Gauge Points: []*gogen.MetricPayload_MetricPoint{ { - Timestamp: time.Now().Unix() - 100, // 100 seconds ago + Timestamp: time.Now().Unix() - timeDelta, // timeDelta seconds ago Value: float64(i), }, }, - Tags: []string{containerID, displayContainerName, namespace, deployment}, + Tags: []string{containerID, displayContainerName, namespace, deployment, kubeOwnerrefName, podName, containerName}, Resources: []*gogen.MetricPayload_Resource{ { Type: "host", Name: "localHost", @@ -46,11 +48,14 @@ func createSeriesPayload(i int) *gogen.MetricPayload { return &payload } -func createSeriesPayload2(i int) *gogen.MetricPayload { +func createSeriesPayload2(i int, timeDelta int64) *gogen.MetricPayload { containerID := fmt.Sprintf("container_id:%d", i) - displayContainerName := fmt.Sprintf("display_container_name:%d", i) + containerName := "container_name:container_test" + displayContainerName := fmt.Sprintf("display_container_name:pod_%d-container_test", i) namespace := "kube_namespace:test" deployment := "kube_deployment:nginx_test" + kubeOwnerrefName := "kube_ownerref_name:nginx_test" + podName := fmt.Sprintf("pod_name:nginx_%d", i) payload := gogen.MetricPayload{ Series: []*gogen.MetricPayload_MetricSeries{ { @@ -58,11 +63,11 @@ func createSeriesPayload2(i int) *gogen.MetricPayload { Type: 3, // Gauge Points: []*gogen.MetricPayload_MetricPoint{ { - Timestamp: time.Now().Unix() - 100, // 100 seconds ago + Timestamp: time.Now().Unix() - timeDelta, // timeDelta seconds ago Value: float64(i), }, }, - Tags: []string{containerID, displayContainerName, namespace, deployment}, + Tags: []string{containerID, displayContainerName, namespace, deployment, kubeOwnerrefName, podName, containerName}, Resources: []*gogen.MetricPayload_Resource{ { Type: "host", Name: "localHost2", @@ -77,15 +82,17 @@ func createSeriesPayload2(i int) *gogen.MetricPayload { func TestCreateEntitiesFromPayload(t *testing.T) { numPayloads := 10 for i := 0; i < numPayloads; i++ { - payload := createSeriesPayload(i) + payload := createSeriesPayload(i, 100) entities := createEntitiesFromPayload(payload) assert.Equal(t, len(entities), 1) for k, v := range entities { - assert.Equal(t, "datadog.test.run", k.LoadName) - assert.Equal(t, "localHost", k.Host) - assert.Equal(t, strconv.Itoa(i), k.SourceID) + assert.Equal(t, "container.memory.usage", k.MetricName) assert.Equal(t, ValueType(i), v.value) - assert.Equal(t, "redis_test", k.Deployment) + assert.Equal(t, fmt.Sprintf("redis_%d", i), k.PodName) + assert.Equal(t, "test", k.Namespace) + assert.Equal(t, "redis_test", k.PodOwnerName) + assert.Equal(t, "container_test", k.ContainerName) + assert.Equal(t, fmt.Sprintf("pod_%d-container_test", i), k.EntityName) } } } @@ -94,33 +101,74 @@ func TestStoreAndPurgeEntities(t *testing.T) { numPayloads := 100 store := EntityStore{ key2ValuesMap: make(map[uint64]*dataItem), + keyAttrTable: make(map[compositeKey]podList), lock: sync.RWMutex{}, - keyAttrTable: make(map[string]map[string]map[string]map[uint64]struct{}), } - for i := 0; i < numPayloads; i++ { - payload := createSeriesPayload(i) - entities := createEntitiesFromPayload(payload) - store.SetEntitiesValues(entities) - payload2 := createSeriesPayload2(i) - entities2 := createEntitiesFromPayload(payload2) - store.SetEntitiesValues(entities2) + for _, timeDelta := range []int64{100, 85, 80} { + for i := 0; i < numPayloads; i++ { + payload := createSeriesPayload(i, timeDelta) + entities := createEntitiesFromPayload(payload) + store.SetEntitiesValues(entities) + payload2 := createSeriesPayload2(i, timeDelta) + entities2 := createEntitiesFromPayload(payload2) + store.SetEntitiesValues(entities2) + + } } storeInfo := store.GetStoreInfo() assert.Equal(t, 2, len(storeInfo.StatsResults)) - for _, statsResult := range storeInfo.StatsResults { + for id, statsResult := range storeInfo.StatsResults { assert.Equal(t, numPayloads, statsResult.Count) - assert.EqualValues(t, 99, statsResult.Max) - assert.EqualValues(t, 0, statsResult.Min) - assert.EqualValues(t, 49.5, statsResult.Avg) - assert.EqualValues(t, 49, statsResult.Medium) - assert.EqualValues(t, 98, statsResult.P99) - assert.EqualValues(t, 94, statsResult.P95) + if id == 0 { + assert.Equal(t, "test", statsResult.Namespace) + assert.Equal(t, "redis_test", statsResult.PodOwner) + assert.Equal(t, "container.memory.usage", statsResult.MetricName) + } else { + assert.Equal(t, numPayloads, statsResult.Count) + assert.Equal(t, "test", statsResult.Namespace) + assert.Equal(t, "nginx_test", statsResult.PodOwner) + assert.Equal(t, "container.cpu.usage", statsResult.MetricName) + } } - store.purgeInactiveEntities(30 * time.Second) storeInfo = store.GetStoreInfo() for _, statsResult := range storeInfo.StatsResults { assert.Equal(t, 0, statsResult.Count) } +} + +func TestGetMetrics(t *testing.T) { + numPayloads := 100 + store := EntityStore{ + key2ValuesMap: make(map[uint64]*dataItem), + keyAttrTable: make(map[compositeKey]podList), + lock: sync.RWMutex{}, + } + for _, timeDelta := range []int64{100, 85, 80} { + for i := 0; i < numPayloads; i++ { + payload := createSeriesPayload(i, timeDelta) + entities := createEntitiesFromPayload(payload) + store.SetEntitiesValues(entities) + payload2 := createSeriesPayload2(i, timeDelta) + entities2 := createEntitiesFromPayload(payload2) + store.SetEntitiesValues(entities2) + + } + } + queryResult := store.GetMetricsRaw("container.cpu.usage", "test", "nginx_test", "") + assert.Equal(t, 100, len(queryResult.results)) + for _, podResult := range queryResult.results { + assert.Equal(t, 1, len(podResult.ContainerValues)) + assert.Equal(t, 0, len(podResult.PodLevelValue)) + for containerName, entityValues := range podResult.ContainerValues { + assert.Equal(t, "container_test", containerName) + assert.Equal(t, 3, len(entityValues)) + } + } + + emptyQueryResult := store.GetMetricsRaw("container.cpu.usage", "test", "nginx_test", "container_test2") + assert.Equal(t, 0, len(emptyQueryResult.results)) + filteredQueryResult := store.GetMetricsRaw("container.memory.usage", "test", "redis_test", "container_test") + assert.Equal(t, 100, len(filteredQueryResult.results)) } diff --git a/pkg/clusteragent/autoscaling/workload/loadstore/storeimpl.go b/pkg/clusteragent/autoscaling/workload/loadstore/storeimpl.go index b2dec9fb53fd27..1cc9670f2a8883 100644 --- a/pkg/clusteragent/autoscaling/workload/loadstore/storeimpl.go +++ b/pkg/clusteragent/autoscaling/workload/loadstore/storeimpl.go @@ -9,8 +9,6 @@ package loadstore import ( "context" - "math" - "sort" "sync" "time" @@ -21,22 +19,55 @@ var _ Store = (*EntityStore)(nil) type dataItem struct { entity *Entity - valueQue EntityValueQueue // value queue - lastActiveTs Timestamp + valueQue EntityValueQueue // value queue, default 3 data points + lastActiveTs Timestamp // last active timestamp } -// EntityStore stores entities to values, hash keys to entities mapping. +func convertsToEntityValueSlice(data []*EntityValue) []EntityValue { + result := make([]EntityValue, len(data)) + for i, v := range data { + if v != nil { + result[i] = *v + } + } + return result +} + +// compositeKey is a hash id of composite key for the keyAttrTable, which is used for quick filtering +type compositeKey uint64 + +func generateCompositeKey(namespace, podOwnerName, metricName string) compositeKey { + return compositeKey(generateHash(namespace, podOwnerName, metricName)) +} + +// dataPerPod stores the mapping between contaienr name and entity hash id and pod level entity hash id if available +// {containerName: entityHashId, containerName2: entityHashId2...} +type dataPerPod struct { + containers map[string]uint64 // map container name -> entity hash id + podEntityID uint64 // pod level entity hash id, if not available, it will be 0 +} + +// podList has a map of pod name (i.e. pod name: expod-hash1-hash2 ) to dataPerPod +type podList struct { + pods map[string]*dataPerPod + namespace string + podOwnerName string + metricName string +} + +// EntityStore manages mappings between entities and their hashed keys. type EntityStore struct { - key2ValuesMap map[uint64]*dataItem // Maps hash key to a entity and its values - keyAttrTable map[string]map[string]map[string]map[uint64]struct{} // map namespace -> deployment -> loadName -> hashKeys - lock sync.RWMutex // Protects access to store and entityMap + key2ValuesMap map[uint64]*dataItem // Maps hash(entity) to a dataitem (entity and its values) + keyAttrTable map[compositeKey]podList // map Hash -> pod name -> dataPerPod + lock sync.RWMutex // Protects access to store and entityMap } // NewEntityStore creates a new EntityStore. func NewEntityStore(ctx context.Context) *EntityStore { store := EntityStore{ key2ValuesMap: make(map[uint64]*dataItem), - keyAttrTable: make(map[string]map[string]map[string]map[uint64]struct{}), + keyAttrTable: make(map[compositeKey]podList), + lock: sync.RWMutex{}, } store.startCleanupInBackground(ctx) return &store @@ -47,17 +78,17 @@ func (es *EntityStore) SetEntitiesValues(entities map[*Entity]*EntityValue) { es.lock.Lock() // Lock for writing defer es.lock.Unlock() for entity, value := range entities { - if entity.Deployment == "" || entity.LoadName == "" || entity.Namespace == "" { - log.Tracef("Skipping entity with empty namespace, deployment or loadName: %v", entity) + if entity.EntityName == "" || entity.MetricName == "" || entity.Namespace == "" || entity.PodOwnerName == "" { + log.Tracef("Skipping entity with empty entityName, podOwnerName, namespace or metricName: %v", entity) continue } - hash := hashEntityToUInt64(entity) - data, exists := es.key2ValuesMap[hash] + entityHash := hashEntityToUInt64(entity) + data, exists := es.key2ValuesMap[entityHash] if !exists { data = &dataItem{ entity: entity, valueQue: EntityValueQueue{ - data: make([]ValueType, maxDataPoints), + data: make([]*EntityValue, maxDataPoints), head: 0, tail: 0, size: 0, @@ -65,125 +96,102 @@ func (es *EntityStore) SetEntitiesValues(entities map[*Entity]*EntityValue) { }, lastActiveTs: value.timestamp, } - data.valueQue.pushBack(value.value) - es.key2ValuesMap[hash] = data + data.valueQue.pushBack(value) + es.key2ValuesMap[entityHash] = data } else { if data.lastActiveTs < value.timestamp { // Update the last active timestamp data.lastActiveTs = value.timestamp - data.valueQue.pushBack(value.value) - } + data.valueQue.pushBack(value) + } //else if lastActiveTs is greater than value.timestamp, skip the value because it is outdated } + // Update the key attribute table - if _, ok := es.keyAttrTable[entity.Namespace]; !ok { - es.keyAttrTable[entity.Namespace] = make(map[string]map[string]map[uint64]struct{}) + compositeKeyHash := generateCompositeKey(entity.Namespace, entity.PodOwnerName, entity.MetricName) + if _, ok := es.keyAttrTable[compositeKeyHash]; !ok { + es.keyAttrTable[compositeKeyHash] = podList{ + pods: make(map[string]*dataPerPod), + namespace: entity.Namespace, + podOwnerName: entity.PodOwnerName, + metricName: entity.MetricName, + } } - if _, ok := es.keyAttrTable[entity.Namespace][entity.Deployment]; !ok { - es.keyAttrTable[entity.Namespace][entity.Deployment] = make(map[string]map[uint64]struct{}) + if _, ok := (es.keyAttrTable[compositeKeyHash].pods)[entity.PodName]; !ok { + (es.keyAttrTable[compositeKeyHash].pods)[entity.PodName] = &dataPerPod{ + containers: make(map[string]uint64), + podEntityID: 0, + } + } + // Update the pod level entity hash id + if entity.EntityType == PodType { + (es.keyAttrTable[compositeKeyHash].pods)[entity.PodName].podEntityID = entityHash } - if _, ok := es.keyAttrTable[entity.Namespace][entity.Deployment][entity.LoadName]; !ok { - es.keyAttrTable[entity.Namespace][entity.Deployment][entity.LoadName] = make(map[uint64]struct{}) + if entity.EntityType == ContainerType { + (es.keyAttrTable[compositeKeyHash].pods)[entity.PodName].containers[entity.ContainerName] = entityHash } - es.keyAttrTable[entity.Namespace][entity.Deployment][entity.LoadName][hash] = struct{}{} } } -// GetEntitiesStats to get all entities by given search filters -func (es *EntityStore) GetEntitiesStats(namespace string, deployment string, loadName string) StatsResult { +// GetMetricsRaw to get all entities by given search filters +func (es *EntityStore) GetMetricsRaw(metricName string, + namespace string, + podOwnerName string, + containerName string) QueryResult { es.lock.RLock() // Lock for writing defer es.lock.RUnlock() - return es.calculateStatsByConditions(namespace, deployment, loadName) -} - -// calculateStatsByFilter is an internal function to scan entire table (slow). Caller should acquire lock. -func (es *EntityStore) calculateStatsByFilter(namespace string, deployment string, loadName string) StatsResult { - filter1 := namespaceFilter{namespace: namespace} - filter2 := deploymentFilter{deployment: deployment} - filter3 := loadNameFilter{loadName: loadName} - filter := newANDEntityFilter(&filter1, &filter2, &filter3) - rawData := make([]ValueType, 0, len(es.key2ValuesMap)) - for _, dataItem := range es.key2ValuesMap { - entity := dataItem.entity - if !filter.IsIncluded(entity) { - continue - } - entityValue := dataItem.valueQue.value() - rawData = append(rawData, entityValue) - } - return es.statsCalc(rawData, namespace, deployment, loadName) -} - -func (es *EntityStore) calculateStatsByConditions(namespace string, deployment string, loadName string) StatsResult { - rawData := make([]ValueType, 0, len(es.key2ValuesMap)) - if namespace == "" || deployment == "" || loadName == "" { - // scan entire table (slow) - return es.calculateStatsByFilter(namespace, deployment, loadName) + compositeKeyHash := generateCompositeKey(namespace, podOwnerName, metricName) + podList, ok := es.keyAttrTable[compositeKeyHash] + if !ok { + return QueryResult{} } - - // Traverse the keyAttrTable based on provided conditions - if nsMap, nsExists := es.keyAttrTable[namespace]; nsExists { - // Check if deployment is provided and exists in the namespace map - if depMap, depExists := nsMap[deployment]; depExists { - // Check if loadName is provided and exists in the deployment map - if loadMap, loadExists := depMap[loadName]; loadExists { - // Populate rawData with hash keys - for hash := range loadMap { - rawData = append(rawData, es.key2ValuesMap[hash].valueQue.value()) + var result QueryResult + for podName, dataPerPod := range podList.pods { + if dataPerPod.podEntityID != 0 { // if it is a pod level entity + entity := es.key2ValuesMap[dataPerPod.podEntityID] + podResult := PodResult{ + PodName: podName, + PodLevelValue: convertsToEntityValueSlice(entity.valueQue.data), + } + result.results = append(result.results, podResult) + } else { + podList := PodResult{ + PodName: podName, + ContainerValues: make(map[string][]EntityValue), + } + for containerNameKey, entityHash := range dataPerPod.containers { + if containerName != "" && containerName != containerNameKey { + continue } + entity := es.key2ValuesMap[entityHash] + podList.ContainerValues[containerNameKey] = convertsToEntityValueSlice(entity.valueQue.data) + } + if len(podList.ContainerValues) > 0 { + result.results = append(result.results, podList) } } } - return es.statsCalc(rawData, namespace, deployment, loadName) -} - -func (es *EntityStore) statsCalc(rawData []ValueType, namespace string, deployment string, loadName string) StatsResult { - count := len(rawData) - if count == 0 { - return StatsResult{ - Namespace: namespace, - Deployment: deployment, - LoadName: loadName, - Count: 0, - Min: 0, - Max: 0, - Avg: 0, - Medium: 0, - P10: 0, - P95: 0, - P99: 0, - } - } - sum := ValueType(0) - for _, value := range rawData { - sum += value - } - sort.Slice(rawData, func(i, j int) bool { return rawData[i] < rawData[j] }) - percentileIndexFunc := func(percentile float64) int { - return int(math.Floor(float64(len(rawData)-1) * percentile)) - } - return StatsResult{ - Namespace: namespace, - Deployment: deployment, - LoadName: loadName, - Count: count, - Min: rawData[0], - Max: rawData[len(rawData)-1], - Avg: sum / ValueType(count), - Medium: rawData[percentileIndexFunc(0.5)], - P10: rawData[percentileIndexFunc(0.1)], - P95: rawData[percentileIndexFunc(0.95)], - P99: rawData[percentileIndexFunc(0.99)], - } + return result } func (es *EntityStore) deleteInternal(hash uint64) { - if dataItem, exists := es.key2ValuesMap[hash]; exists { - // Remove the entity from the keyAttrTable - entity := dataItem.entity - delete(es.keyAttrTable[entity.Namespace][entity.Deployment][entity.LoadName], hash) - // Remove the entity from the key2ValuesMap + if toBeDelItem, exists := es.key2ValuesMap[hash]; exists { + compositeKeyHash := generateCompositeKey(toBeDelItem.entity.Namespace, toBeDelItem.entity.PodOwnerName, toBeDelItem.entity.MetricName) + if _, ok := es.keyAttrTable[compositeKeyHash]; ok { + if dataPerPod, ok := (es.keyAttrTable[compositeKeyHash].pods)[toBeDelItem.entity.PodName]; ok { + // Delete the entity from the keyAttrTable + if toBeDelItem.entity.EntityType == ContainerType { + delete(dataPerPod.containers, toBeDelItem.entity.EntityName) + } + // Delete the pod from the keyAttrTable if there is no container + if toBeDelItem.entity.EntityType == PodType || + (len(dataPerPod.containers) == 0 && dataPerPod.podEntityID == 0) { + delete((es.keyAttrTable[compositeKeyHash].pods), toBeDelItem.entity.PodName) + } + } + } + // Delete the entity from the key2ValuesMap + delete(es.key2ValuesMap, hash) } - delete(es.key2ValuesMap, hash) } // DeleteEntityByHashKey deltes an entity from the store. @@ -223,47 +231,29 @@ func (es *EntityStore) startCleanupInBackground(ctx context.Context) { }() } -// GetStoreInfo returns the store information. +// GetStoreInfo returns the store information, aggregated by namespace, podOwner, and metric name func (es *EntityStore) GetStoreInfo() StoreInfo { es.lock.RLock() defer es.lock.RUnlock() - var results []*StatsResult - // Iterate through namespaces - for namespace, nsMap := range es.keyAttrTable { - // Iterate through deployments - for deployment, depMap := range nsMap { - // Iterate through loadNames - for loadName, entities := range depMap { - if len(entities) > 0 { - var rawData []ValueType - // Populate rawData with hash keys - for hash := range entities { - rawData = append(rawData, es.key2ValuesMap[hash].valueQue.value()) - } - // Generate StatsResult and add to results - statsResult := es.statsCalc(rawData, namespace, deployment, loadName) - results = append(results, &statsResult) - } else { - results = append(results, &StatsResult{ - Namespace: namespace, - Deployment: deployment, - LoadName: loadName, - Count: 0, - Min: 0, - Max: 0, - Avg: 0, - Medium: 0, - P10: 0, - P95: 0, - P99: 0, - }) - } + var storeInfo StoreInfo + for _, podList := range es.keyAttrTable { + namespace := podList.namespace + podOwnerName := podList.podOwnerName + metricName := podList.metricName + count := 0 + for _, dataPerPod := range podList.pods { + count += len(dataPerPod.containers) + if dataPerPod.podEntityID != 0 { + count++ } } + storeInfo.StatsResults = append(storeInfo.StatsResults, &StatsResult{ + Namespace: namespace, + PodOwner: podOwnerName, + MetricName: metricName, + Count: count, + }) } - - return StoreInfo{ - currentTime: getCurrentTime(), - StatsResults: results, - } + storeInfo.currentTime = getCurrentTime() + return storeInfo } diff --git a/pkg/clusteragent/autoscaling/workload/loadstore/util.go b/pkg/clusteragent/autoscaling/workload/loadstore/util.go index 8b82a766b69834..ce940f904e8bb1 100644 --- a/pkg/clusteragent/autoscaling/workload/loadstore/util.go +++ b/pkg/clusteragent/autoscaling/workload/loadstore/util.go @@ -15,21 +15,22 @@ import ( // Timestamp is a uint32 representing a timestamp. type Timestamp uint32 -// hashEntityToUInt64 generates an uint64 hash for an Entity. -func hashEntityToUInt64(entity *Entity) uint64 { +// generateHash generates a uint64 hash for an unknown number of strings. +func generateHash(strings ...string) uint64 { // Initialize a new FNV-1a hasher hasher := fnv.New64a() - // Convert and write the entity's SourceID (string) to the hasher - hasher.Write([]byte(entity.SourceID)) - // Convert and write the entity's host (string) to the hasher - hasher.Write([]byte(entity.Host)) - // Convert and write the entity's namespace (string) to the hasher - hasher.Write([]byte(entity.Namespace)) - // Convert and write the entity's LoadName (string) to the hasher - hasher.Write([]byte(entity.LoadName)) + // Iterate over the strings and write each one to the hasher + for _, str := range strings { + hasher.Write([]byte(str)) + } return hasher.Sum64() } +// hashEntityToUInt64 generates an uint64 hash for an Entity. +func hashEntityToUInt64(entity *Entity) uint64 { + return generateHash(entity.EntityName, entity.Namespace, entity.MetricName) +} + // getCurrentTime returns the current time in uint32 func getCurrentTime() Timestamp { return timeToTimestamp(time.Now())