diff --git a/internal/aws/containerinsight/const.go b/internal/aws/containerinsight/const.go
index 4e7db5330509..ee8764f1364e 100644
--- a/internal/aws/containerinsight/const.go
+++ b/internal/aws/containerinsight/const.go
@@ -86,6 +86,14 @@ const (
 	FSInodesfree  = "filesystem_inodes_free"
 	FSUtilization = "filesystem_utilization"
 
+	StatusConditionReady              = "status_condition_ready"
+	StatusConditionDiskPressure      = "status_condition_disk_pressure"
+	StatusConditionMemoryPressure    = "status_condition_memory_pressure"
+	StatusConditionPIDPressure       = "status_condition_pid_pressure"
+	StatusConditionNetworkUnavailable = "status_condition_network_unavailable"
+	StatusCapacityPods               = "status_capacity_pods"
+	StatusAllocatablePods            = "status_allocatable_pods"
+
 	RunningPodCount       = "number_of_running_pods"
 	RunningContainerCount = "number_of_running_containers"
 	ContainerCount        = "number_of_containers"
@@ -201,6 +209,15 @@ func init() {
 		FSInodesfree:  UnitCount,
 		FSUtilization: UnitPercent,
 
+		// status metrics
+		StatusConditionReady:              UnitCount,
+		StatusConditionDiskPressure:      UnitCount,
+		StatusConditionMemoryPressure:    UnitCount,
+		StatusConditionPIDPressure:       UnitCount,
+		StatusConditionNetworkUnavailable: UnitCount,
+		StatusCapacityPods:               UnitCount,
+		StatusAllocatablePods:            UnitCount,
+
 		// cluster metrics
 		NodeCount:       UnitCount,
 		FailedNodeCount: UnitCount,
diff --git a/internal/aws/k8s/k8sclient/clientset.go b/internal/aws/k8s/k8sclient/clientset.go
index 50d08244270e..66abc3f79fd6 100644
--- a/internal/aws/k8s/k8sclient/clientset.go
+++ b/internal/aws/k8s/k8sclient/clientset.go
@@ -19,11 +19,13 @@ import (
 	"path/filepath"
 	"reflect"
 	"sort"
+	"strconv"
 	"strings"
 	"sync"
 	"time"
 
 	"go.uber.org/zap"
+	"k8s.io/apimachinery/pkg/fields"
 	"k8s.io/apimachinery/pkg/util/wait"
 	"k8s.io/client-go/kubernetes"
 	"k8s.io/client-go/rest"
@@ -116,6 +118,28 @@ func InitSyncPollTimeout(pollTimeout time.Duration) Option {
 	}
 }
 
+// NodeSelector provides the option to specify a field selector
+// that the node client applies when listing and watching nodes
+func NodeSelector(nodeSelector fields.Selector) Option {
+	return Option{
+		name: "nodeSelector:" + nodeSelector.String(),
+		set: func(kc *K8sClient) {
+			kc.nodeSelector = nodeSelector
+		},
+	}
+}
+
+// CaptureNodeLevelInfo allows one to specify whether node level info
+// should be captured and retained in memory
+func CaptureNodeLevelInfo(captureNodeLevelInfo bool) Option {
+	return Option{
+		name: "captureNodeLevelInfo:" + strconv.FormatBool(captureNodeLevelInfo),
+		set: func(kc *K8sClient) {
+			kc.captureNodeLevelInfo = captureNodeLevelInfo
+		},
+	}
+}
+
 func getStringifiedOptions(options ...Option) string {
 	var opts []string
 	for _, option := range options {
@@ -188,6 +212,9 @@ type K8sClient struct {
 	nodeMu sync.Mutex
 	node   nodeClientWithStopper
 
+	nodeSelector         fields.Selector
+	captureNodeLevelInfo bool
+
 	jobMu sync.Mutex
 	job   jobClientWithStopper
 
@@ -274,7 +301,11 @@ func (c *K8sClient) ShutdownPodClient() {
 func (c *K8sClient) GetNodeClient() NodeClient {
 	c.nodeMu.Lock()
 	if c.node == nil {
-		c.node = newNodeClient(c.clientSet, c.logger, nodeSyncCheckerOption(c.syncChecker))
+		opts := []nodeClientOption{nodeSyncCheckerOption(c.syncChecker), captureNodeLevelInfoOption(c.captureNodeLevelInfo)}
+		if c.nodeSelector != nil {
+			opts = append(opts, nodeSelectorOption(c.nodeSelector))
+		}
+		c.node = newNodeClient(c.clientSet, c.logger, opts...)
} c.nodeMu.Unlock() return c.node diff --git a/internal/aws/k8s/k8sclient/clientset_test.go b/internal/aws/k8s/k8sclient/clientset_test.go index 9f4e844f734a..69b32cae1d1f 100644 --- a/internal/aws/k8s/k8sclient/clientset_test.go +++ b/internal/aws/k8s/k8sclient/clientset_test.go @@ -20,6 +20,7 @@ import ( "github.com/stretchr/testify/assert" "go.uber.org/zap" + "k8s.io/apimachinery/pkg/fields" ) func TestGetShutdown(t *testing.T) { @@ -29,6 +30,8 @@ func TestGetShutdown(t *testing.T) { KubeConfigPath(tmpConfigPath), InitSyncPollInterval(10*time.Nanosecond), InitSyncPollTimeout(20*time.Nanosecond), + NodeSelector(fields.OneTermEqualSelector("testField", "testVal")), + CaptureNodeLevelInfo(true), ) assert.Equal(t, 1, len(optionsToK8sClient)) assert.NotNil(t, k8sClient.GetClientSet()) @@ -37,6 +40,8 @@ func TestGetShutdown(t *testing.T) { assert.NotNil(t, k8sClient.GetNodeClient()) assert.NotNil(t, k8sClient.GetPodClient()) assert.NotNil(t, k8sClient.GetReplicaSetClient()) + assert.True(t, k8sClient.captureNodeLevelInfo) + assert.Equal(t, "testField=testVal", k8sClient.nodeSelector.String()) k8sClient.Shutdown() assert.Nil(t, k8sClient.ep) assert.Nil(t, k8sClient.job) diff --git a/internal/aws/k8s/k8sclient/node.go b/internal/aws/k8s/k8sclient/node.go index 0f8283f8c549..44d367c2f6dd 100644 --- a/internal/aws/k8s/k8sclient/node.go +++ b/internal/aws/k8s/k8sclient/node.go @@ -22,6 +22,7 @@ import ( "go.uber.org/zap" v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/fields" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/watch" "k8s.io/client-go/kubernetes" @@ -41,6 +42,9 @@ type NodeClient interface { ClusterFailedNodeCount() int // Get the number of nodes for current cluster ClusterNodeCount() int + NodeToCapacityMap() map[string]v1.ResourceList + NodeToAllocatableMap() map[string]v1.ResourceList + NodeToConditionsMap() map[string]map[v1.NodeConditionType]v1.ConditionStatus } type nodeClientOption func(*nodeClient) @@ -51,16 +55,39 @@ func nodeSyncCheckerOption(checker initialSyncChecker) nodeClientOption { } } +func nodeSelectorOption(nodeSelector fields.Selector) nodeClientOption { + return func(n *nodeClient) { + n.nodeSelector = nodeSelector + } +} + +func captureNodeLevelInfoOption(captureNodeLevelInfo bool) nodeClientOption { + return func(n *nodeClient) { + n.captureNodeLevelInfo = captureNodeLevelInfo + } +} + type nodeClient struct { stopChan chan struct{} store *ObjStore + logger *zap.Logger stopped bool syncChecker initialSyncChecker + nodeSelector fields.Selector + + // The node client can be used in several places, including code paths that execute on both leader and non-leader nodes. + // But for logic on the leader node (for ex in k8sapiserver.go), there is no need to obtain node level info since only cluster + // level info is needed there. Hence, this optimization allows us to save on memory by not capturing node level info when not needed. 
+ captureNodeLevelInfo bool + mu sync.RWMutex clusterFailedNodeCount int clusterNodeCount int + nodeToCapacityMap map[string]v1.ResourceList + nodeToAllocatableMap map[string]v1.ResourceList + nodeToConditionsMap map[string]map[v1.NodeConditionType]v1.ConditionStatus } func (c *nodeClient) ClusterFailedNodeCount() int { @@ -81,6 +108,42 @@ func (c *nodeClient) ClusterNodeCount() int { return c.clusterNodeCount } +func (c *nodeClient) NodeToCapacityMap() map[string]v1.ResourceList { + if !c.captureNodeLevelInfo { + c.logger.Warn("trying to access node level info when captureNodeLevelInfo is not set, will return empty data") + } + if c.store.GetResetRefreshStatus() { + c.refresh() + } + c.mu.RLock() + defer c.mu.RUnlock() + return c.nodeToCapacityMap +} + +func (c *nodeClient) NodeToAllocatableMap() map[string]v1.ResourceList { + if !c.captureNodeLevelInfo { + c.logger.Warn("trying to access node level info when captureNodeLevelInfo is not set, will return empty data") + } + if c.store.GetResetRefreshStatus() { + c.refresh() + } + c.mu.RLock() + defer c.mu.RUnlock() + return c.nodeToAllocatableMap +} + +func (c *nodeClient) NodeToConditionsMap() map[string]map[v1.NodeConditionType]v1.ConditionStatus { + if !c.captureNodeLevelInfo { + c.logger.Warn("trying to access node level info when captureNodeLevelInfo is not set, will return empty data") + } + if c.store.GetResetRefreshStatus() { + c.refresh() + } + c.mu.RLock() + defer c.mu.RUnlock() + return c.nodeToConditionsMap +} + func (c *nodeClient) refresh() { c.mu.Lock() defer c.mu.Unlock() @@ -89,9 +152,22 @@ func (c *nodeClient) refresh() { clusterFailedNodeCountNew := 0 clusterNodeCountNew := 0 + nodeToCapacityMap := make(map[string]v1.ResourceList) + nodeToAllocatableMap := make(map[string]v1.ResourceList) + nodeToConditionsMap := make(map[string]map[v1.NodeConditionType]v1.ConditionStatus) + for _, obj := range objsList { node := obj.(*nodeInfo) + if c.captureNodeLevelInfo { + nodeToCapacityMap[node.name] = node.capacity + nodeToAllocatableMap[node.name] = node.allocatable + conditionsMap := make(map[v1.NodeConditionType]v1.ConditionStatus) + for _, condition := range node.conditions { + conditionsMap[condition.Type] = condition.Status + } + nodeToConditionsMap[node.name] = conditionsMap + } clusterNodeCountNew++ failed := false @@ -115,11 +191,15 @@ func (c *nodeClient) refresh() { c.clusterFailedNodeCount = clusterFailedNodeCountNew c.clusterNodeCount = clusterNodeCountNew + c.nodeToCapacityMap = nodeToCapacityMap + c.nodeToAllocatableMap = nodeToAllocatableMap + c.nodeToConditionsMap = nodeToConditionsMap } func newNodeClient(clientSet kubernetes.Interface, logger *zap.Logger, options ...nodeClientOption) *nodeClient { c := &nodeClient{ stopChan: make(chan struct{}), + logger: logger, } for _, option := range options { @@ -128,7 +208,7 @@ func newNodeClient(clientSet kubernetes.Interface, logger *zap.Logger, options . 
c.store = NewObjStore(transformFuncNode, logger) - lw := createNodeListWatch(clientSet) + lw := c.createNodeListWatch(clientSet) reflector := cache.NewReflector(lw, &v1.Node{}, c.store, 0) go reflector.Run(c.stopChan) @@ -154,9 +234,12 @@ func transformFuncNode(obj interface{}) (interface{}, error) { return nil, fmt.Errorf("input obj %v is not Node type", obj) } info := new(nodeInfo) - info.conditions = []*nodeCondition{} + info.name = node.Name + info.capacity = node.Status.Capacity + info.allocatable = node.Status.Allocatable + info.conditions = []*NodeCondition{} for _, condition := range node.Status.Conditions { - info.conditions = append(info.conditions, &nodeCondition{ + info.conditions = append(info.conditions, &NodeCondition{ Type: condition.Type, Status: condition.Status, }) @@ -164,13 +247,19 @@ func transformFuncNode(obj interface{}) (interface{}, error) { return info, nil } -func createNodeListWatch(client kubernetes.Interface) cache.ListerWatcher { +func (c *nodeClient) createNodeListWatch(client kubernetes.Interface) cache.ListerWatcher { ctx := context.Background() return &cache.ListWatch{ ListFunc: func(opts metav1.ListOptions) (runtime.Object, error) { + if c.nodeSelector != nil { + opts.FieldSelector = c.nodeSelector.String() + } return client.CoreV1().Nodes().List(ctx, opts) }, WatchFunc: func(opts metav1.ListOptions) (watch.Interface, error) { + if c.nodeSelector != nil { + opts.FieldSelector = c.nodeSelector.String() + } return client.CoreV1().Nodes().Watch(ctx, opts) }, } diff --git a/internal/aws/k8s/k8sclient/node_info.go b/internal/aws/k8s/k8sclient/node_info.go index ca01fc4545dd..09842a476a44 100644 --- a/internal/aws/k8s/k8sclient/node_info.go +++ b/internal/aws/k8s/k8sclient/node_info.go @@ -19,10 +19,13 @@ import ( ) type nodeInfo struct { - conditions []*nodeCondition + name string + conditions []*NodeCondition + capacity v1.ResourceList + allocatable v1.ResourceList } -type nodeCondition struct { +type NodeCondition struct { Type v1.NodeConditionType Status v1.ConditionStatus } diff --git a/internal/aws/k8s/k8sclient/node_test.go b/internal/aws/k8s/k8sclient/node_test.go index d534a5a28a65..40d28877fdea 100644 --- a/internal/aws/k8s/k8sclient/node_test.go +++ b/internal/aws/k8s/k8sclient/node_test.go @@ -15,13 +15,14 @@ package k8sclient import ( - "log" "testing" "time" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" "go.uber.org/zap" v1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/kubernetes/fake" ) @@ -53,6 +54,12 @@ var nodeArray = []interface{}{ }, }, Status: v1.NodeStatus{ + Capacity: v1.ResourceList{ + v1.ResourcePods: *resource.NewQuantity(5, resource.DecimalSI), + }, + Allocatable: v1.ResourceList{ + v1.ResourcePods: *resource.NewQuantity(5, resource.DecimalSI), + }, Conditions: []v1.NodeCondition{ { Type: "MemoryPressure", @@ -143,6 +150,12 @@ var nodeArray = []interface{}{ }, }, Status: v1.NodeStatus{ + Capacity: v1.ResourceList{ + v1.ResourcePods: *resource.NewQuantity(10, resource.DecimalSI), + }, + Allocatable: v1.ResourceList{ + v1.ResourcePods: *resource.NewQuantity(10, resource.DecimalSI), + }, Conditions: []v1.NodeCondition{ { Type: "MemoryPressure", @@ -233,6 +246,12 @@ var nodeArray = []interface{}{ }, }, Status: v1.NodeStatus{ + Capacity: v1.ResourceList{ + v1.ResourcePods: *resource.NewQuantity(5, resource.DecimalSI), + }, + Allocatable: v1.ResourceList{ + v1.ResourcePods: *resource.NewQuantity(1, resource.DecimalSI), + }, 
Conditions: []v1.NodeCondition{ { Type: "MemoryPressure", @@ -272,7 +291,7 @@ var nodeArray = []interface{}{ }, { Type: "Ready", - Status: "True", + Status: "False", LastHeartbeatTime: metav1.Time{ Time: time.Now(), }, @@ -280,7 +299,7 @@ var nodeArray = []interface{}{ Time: time.Now(), }, Reason: "KubeletReady", - Message: "kubelet is posting ready status", + Message: "kubelet is not posting ready status", }, }, NodeInfo: v1.NodeSystemInfo{ @@ -300,22 +319,91 @@ var nodeArray = []interface{}{ } func TestNodeClient(t *testing.T) { - setOption := nodeSyncCheckerOption(&mockReflectorSyncChecker{}) - - fakeClientSet := fake.NewSimpleClientset() - client := newNodeClient(fakeClientSet, zap.NewNop(), setOption) - assert.NoError(t, client.store.Replace(nodeArray, "")) + testCases := map[string]struct { + options []nodeClientOption + want map[string]interface{} + }{ + "Default": { + options: []nodeClientOption{ + nodeSyncCheckerOption(&mockReflectorSyncChecker{}), + }, + want: map[string]interface{}{ + "clusterNodeCount": 3, + "clusterFailedNodeCount": 1, + "nodeToCapacityMap": map[string]v1.ResourceList{}, // Node level info is not captured by default + "nodeToAllocatableMap": map[string]v1.ResourceList{}, // Node level info is not captured by default + "nodeToConditionsMap": map[string]map[v1.NodeConditionType]v1.ConditionStatus{}, // Node level info is not captured by default + }, + }, + "CaptureNodeLevelInfo": { + options: []nodeClientOption{ + nodeSyncCheckerOption(&mockReflectorSyncChecker{}), + captureNodeLevelInfoOption(true), + }, + want: map[string]interface{}{ + "clusterNodeCount": 3, + "clusterFailedNodeCount": 1, + "nodeToCapacityMap": map[string]v1.ResourceList{ + "ip-192-168-200-63.eu-west-1.compute.internal": { + "pods": *resource.NewQuantity(5, resource.DecimalSI), + }, + "ip-192-168-76-61.eu-west-1.compute.internal": { + "pods": *resource.NewQuantity(10, resource.DecimalSI), + }, + "ip-192-168-153-1.eu-west-1.compute.internal": { + "pods": *resource.NewQuantity(5, resource.DecimalSI), + }, + }, + "nodeToAllocatableMap": map[string]v1.ResourceList{ + "ip-192-168-200-63.eu-west-1.compute.internal": { + "pods": *resource.NewQuantity(5, resource.DecimalSI), + }, + "ip-192-168-76-61.eu-west-1.compute.internal": { + "pods": *resource.NewQuantity(10, resource.DecimalSI), + }, + "ip-192-168-153-1.eu-west-1.compute.internal": { + "pods": *resource.NewQuantity(1, resource.DecimalSI), + }, + }, + "nodeToConditionsMap": map[string]map[v1.NodeConditionType]v1.ConditionStatus{ + "ip-192-168-200-63.eu-west-1.compute.internal": { + "DiskPressure": "False", + "MemoryPressure": "False", + "PIDPressure": "False", + "Ready": "True", + }, + "ip-192-168-76-61.eu-west-1.compute.internal": { + "DiskPressure": "False", + "MemoryPressure": "False", + "PIDPressure": "False", + "Ready": "True", + }, + "ip-192-168-153-1.eu-west-1.compute.internal": { + "DiskPressure": "True", + "MemoryPressure": "False", + "PIDPressure": "False", + "Ready": "False", + }, + }, + }, + }, + } + for name, testCase := range testCases { + t.Run(name, func(t *testing.T) { + fakeClientSet := fake.NewSimpleClientset() + client := newNodeClient(fakeClientSet, zap.NewNop(), testCase.options...) 
+ assert.NoError(t, client.store.Replace(nodeArray, "")) - expectedClusterNodeCount := 3 - expectedClusterFailedNodeCount := 1 - clusterNodeCount := client.ClusterNodeCount() - clusterFailedNodeCount := client.ClusterFailedNodeCount() - log.Printf("clusterNodeCount: %v, clusterFailedNodeCount: %v", clusterNodeCount, clusterFailedNodeCount) + require.Equal(t, testCase.want["clusterNodeCount"], client.ClusterNodeCount()) + require.Equal(t, testCase.want["clusterFailedNodeCount"], client.ClusterFailedNodeCount()) + require.Equal(t, testCase.want["nodeToCapacityMap"], client.NodeToCapacityMap()) + require.Equal(t, testCase.want["nodeToAllocatableMap"], client.NodeToAllocatableMap()) + require.Equal(t, testCase.want["nodeToConditionsMap"], client.NodeToConditionsMap()) - assert.Equal(t, clusterNodeCount, expectedClusterNodeCount) - assert.Equal(t, clusterFailedNodeCount, expectedClusterFailedNodeCount) - client.shutdown() - assert.True(t, client.stopped) + client.shutdown() + assert.True(t, client.stopped) + }) + } } func TestTransformFuncNode(t *testing.T) { diff --git a/receiver/awscontainerinsightreceiver/README.md b/receiver/awscontainerinsightreceiver/README.md index 13c7090c28df..32302aa645a8 100644 --- a/receiver/awscontainerinsightreceiver/README.md +++ b/receiver/awscontainerinsightreceiver/README.md @@ -400,42 +400,49 @@ kubectl apply -f config.yaml

### Node -| Metric | Unit | -|-------------------------------------|---------------| -| node_cpu_limit | Millicore | -| node_cpu_request | Millicore | -| node_cpu_reserved_capacity | Percent | -| node_cpu_usage_system | Millicore | -| node_cpu_usage_total | Millicore | -| node_cpu_usage_user | Millicore | -| node_cpu_utilization | Percent | -| node_memory_cache | Bytes | -| node_memory_failcnt | Count | -| node_memory_hierarchical_pgfault | Count/Second | -| node_memory_hierarchical_pgmajfault | Count/Second | -| node_memory_limit | Bytes | -| node_memory_mapped_file | Bytes | -| node_memory_max_usage | Bytes | -| node_memory_pgfault | Count/Second | -| node_memory_pgmajfault | Count/Second | -| node_memory_request | Bytes | -| node_memory_reserved_capacity | Percent | -| node_memory_rss | Bytes | -| node_memory_swap | Bytes | -| node_memory_usage | Bytes | -| node_memory_utilization | Percent | -| node_memory_working_set | Bytes | -| node_network_rx_bytes | Bytes/Second | -| node_network_rx_dropped | Count/Second | -| node_network_rx_errors | Count/Second | -| node_network_rx_packets | Count/Second | -| node_network_total_bytes | Bytes/Second | -| node_network_tx_bytes | Bytes/Second | -| node_network_tx_dropped | Count/Second | -| node_network_tx_errors | Count/Second | -| node_network_tx_packets | Count/Second | -| node_number_of_running_containers | Count | -| node_number_of_running_pods | Count | +| Metric | Unit | +|-------------------------------------------|--------------| +| node_cpu_limit | Millicore | +| node_cpu_request | Millicore | +| node_cpu_reserved_capacity | Percent | +| node_cpu_usage_system | Millicore | +| node_cpu_usage_total | Millicore | +| node_cpu_usage_user | Millicore | +| node_cpu_utilization | Percent | +| node_memory_cache | Bytes | +| node_memory_failcnt | Count | +| node_memory_hierarchical_pgfault | Count/Second | +| node_memory_hierarchical_pgmajfault | Count/Second | +| node_memory_limit | Bytes | +| node_memory_mapped_file | Bytes | +| node_memory_max_usage | Bytes | +| node_memory_pgfault | Count/Second | +| node_memory_pgmajfault | Count/Second | +| node_memory_request | Bytes | +| node_memory_reserved_capacity | Percent | +| node_memory_rss | Bytes | +| node_memory_swap | Bytes | +| node_memory_usage | Bytes | +| node_memory_utilization | Percent | +| node_memory_working_set | Bytes | +| node_network_rx_bytes | Bytes/Second | +| node_network_rx_dropped | Count/Second | +| node_network_rx_errors | Count/Second | +| node_network_rx_packets | Count/Second | +| node_network_total_bytes | Bytes/Second | +| node_network_tx_bytes | Bytes/Second | +| node_network_tx_dropped | Count/Second | +| node_network_tx_errors | Count/Second | +| node_network_tx_packets | Count/Second | +| node_number_of_running_containers | Count | +| node_number_of_running_pods | Count | +| node_status_condition_ready | Count | +| node_status_condition_pid_pressure | Count | +| node_status_condition_memory_pressure | Count | +| node_status_condition_disk_pressure | Count | +| node_status_condition_network_unavailable | Count | +| node_status_capacity_pods | Count | +| node_status_allocatable_pods | Count |
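
Each `node_status_condition_*` metric is a 0/1 gauge derived from the corresponding [node condition](https://kubernetes.io/docs/concepts/architecture/nodes/#condition): it reports `1` when the condition's status is `True` and `0` when it is `False` or `Unknown`; if the kubelet has not reported the condition at all, the metric is omitted. `node_status_capacity_pods` is the kubelet's reported pod capacity, while `node_status_allocatable_pods` is the portion of that capacity actually available for scheduling pods. A minimal sketch of the condition encoding (the helper `conditionToGauge` is illustrative, not part of the receiver):

```go
package example

import v1 "k8s.io/api/core/v1"

// conditionToGauge maps a reported node condition status to the gauge value
// emitted for the node_status_condition_* metrics. The boolean result is false
// when the condition was never reported, in which case no metric is emitted.
func conditionToGauge(status v1.ConditionStatus, reported bool) (uint64, bool) {
	if !reported {
		return 0, false
	}
	if status == v1.ConditionTrue {
		return 1, true
	}
	return 0, true // v1.ConditionFalse and v1.ConditionUnknown both map to 0
}
```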

| Resource Attribute |

diff --git a/receiver/awscontainerinsightreceiver/internal/stores/nodeinfo.go b/receiver/awscontainerinsightreceiver/internal/stores/nodeinfo.go
index 38cca796be64..4d863f747238 100644
--- a/receiver/awscontainerinsightreceiver/internal/stores/nodeinfo.go
+++ b/receiver/awscontainerinsightreceiver/internal/stores/nodeinfo.go
@@ -19,6 +19,7 @@ import (
 	"sync"
 
 	"go.uber.org/zap"
+	v1 "k8s.io/api/core/v1"
 )
 
 type nodeStats struct {
@@ -29,6 +30,9 @@ type nodeStats struct {
 }
 
 type nodeInfo struct {
+	nodeName string
+	provider nodeInfoProvider
+
 	statsLock sync.RWMutex
 	nodeStats nodeStats
@@ -41,9 +45,17 @@ type nodeInfo struct {
 	logger *zap.Logger
 }
 
-func newNodeInfo(logger *zap.Logger) *nodeInfo {
+type nodeInfoProvider interface {
+	NodeToCapacityMap() map[string]v1.ResourceList
+	NodeToAllocatableMap() map[string]v1.ResourceList
+	NodeToConditionsMap() map[string]map[v1.NodeConditionType]v1.ConditionStatus
+}
+
+func newNodeInfo(nodeName string, provider nodeInfoProvider, logger *zap.Logger) *nodeInfo {
 	nc := &nodeInfo{
-		logger: logger,
+		nodeName: nodeName,
+		provider: provider,
+		logger:   logger,
 	}
 	return nc
 }
@@ -84,6 +96,36 @@ func (n *nodeInfo) getNodeStats() nodeStats {
 	return n.nodeStats
 }
 
+func (n *nodeInfo) getNodeStatusCapacityPods() (uint64, bool) {
+	capacityResources, ok := n.provider.NodeToCapacityMap()[n.nodeName]
+	if !ok {
+		return 0, false
+	}
+	pods, _ := capacityResources.Pods().AsInt64()
+	return forceConvertToInt64(pods, n.logger), true
+}
+
+func (n *nodeInfo) getNodeStatusAllocatablePods() (uint64, bool) {
+	allocatableResources, ok := n.provider.NodeToAllocatableMap()[n.nodeName]
+	if !ok {
+		return 0, false
+	}
+	pods, _ := allocatableResources.Pods().AsInt64()
+	return forceConvertToInt64(pods, n.logger), true
+}
+
+func (n *nodeInfo) getNodeStatusCondition(conditionType v1.NodeConditionType) (uint64, bool) {
+	if nodeConditions, ok := n.provider.NodeToConditionsMap()[n.nodeName]; ok {
+		if conditionStatus, ok := nodeConditions[conditionType]; ok {
+			if conditionStatus == v1.ConditionTrue {
+				return 1, true
+			}
+			return 0, true // v1.ConditionFalse or v1.ConditionUnknown
+		}
+	}
+	return 0, false
+}
+
 func forceConvertToInt64(v interface{}, logger *zap.Logger) uint64 {
 	var value uint64
 
diff --git a/receiver/awscontainerinsightreceiver/internal/stores/nodeinfo_test.go b/receiver/awscontainerinsightreceiver/internal/stores/nodeinfo_test.go
index c3461df5cce9..1874a0b68377 100644
--- a/receiver/awscontainerinsightreceiver/internal/stores/nodeinfo_test.go
+++ b/receiver/awscontainerinsightreceiver/internal/stores/nodeinfo_test.go
@@ -19,10 +19,11 @@ import (
 
 	"github.com/stretchr/testify/assert"
 	"go.uber.org/zap"
+	v1 "k8s.io/api/core/v1"
 )
 
 func TestSetGetCPUCapacity(t *testing.T) {
-	nodeInfo := newNodeInfo(zap.NewNop())
+	nodeInfo := newNodeInfo("testNode1", &mockNodeInfoProvider{}, zap.NewNop())
 
 	nodeInfo.setCPUCapacity(int(4))
 	assert.Equal(t, uint64(4), nodeInfo.getCPUCapacity())
@@ -55,7 +56,7 @@ func TestSetGetCPUCapacity(t *testing.T) {
 }
 
 func TestSetGetMemCapacity(t *testing.T) {
-	nodeInfo := newNodeInfo(zap.NewNop())
+	nodeInfo := newNodeInfo("testNode1", &mockNodeInfoProvider{}, zap.NewNop())
 
 	nodeInfo.setMemCapacity(int(2048))
 	assert.Equal(t, uint64(2048), nodeInfo.getMemCapacity())
@@ -82,3 +83,56 @@ func TestSetGetMemCapacity(t *testing.T) {
 	nodeInfo.setMemCapacity(int64(-2))
 	assert.Equal(t, uint64(0), nodeInfo.getMemCapacity())
 }
+
+func TestGetNodeStatusCapacityPods(t *testing.T) {
+	nodeInfo := newNodeInfo("testNode1", &mockNodeInfoProvider{}, zap.NewNop())
+	nodeStatusCapacityPods, valid := nodeInfo.getNodeStatusCapacityPods()
+	assert.True(t, valid)
+	assert.Equal(t, uint64(5), nodeStatusCapacityPods)
+
+	nodeInfo = newNodeInfo("testNodeNonExistent", &mockNodeInfoProvider{}, zap.NewNop())
+	nodeStatusCapacityPods, valid = nodeInfo.getNodeStatusCapacityPods()
+	assert.False(t, valid)
+	assert.Equal(t, uint64(0), nodeStatusCapacityPods)
+}
+
+func TestGetNodeStatusAllocatablePods(t *testing.T) {
+	nodeInfo := newNodeInfo("testNode1", &mockNodeInfoProvider{}, zap.NewNop())
+	nodeStatusAllocatablePods, valid := nodeInfo.getNodeStatusAllocatablePods()
+	assert.True(t, valid)
+	assert.Equal(t, uint64(15), nodeStatusAllocatablePods)
+
+	nodeInfo = newNodeInfo("testNodeNonExistent", &mockNodeInfoProvider{}, zap.NewNop())
+	nodeStatusAllocatablePods, valid = nodeInfo.getNodeStatusAllocatablePods()
+	assert.False(t, valid)
+	assert.Equal(t, uint64(0), nodeStatusAllocatablePods)
+}
+
+func TestGetNodeStatusCondition(t *testing.T) {
+	nodeInfo := newNodeInfo("testNode1", &mockNodeInfoProvider{}, zap.NewNop())
+	nodeStatusCondition, valid := nodeInfo.getNodeStatusCondition(v1.NodeReady)
+	assert.True(t, valid)
+	assert.Equal(t, uint64(1), nodeStatusCondition)
+	nodeStatusCondition, valid = nodeInfo.getNodeStatusCondition(v1.NodeDiskPressure)
+	assert.True(t, valid)
+	assert.Equal(t, uint64(0), nodeStatusCondition)
+	nodeStatusCondition, valid = nodeInfo.getNodeStatusCondition(v1.NodeMemoryPressure)
+	assert.True(t, valid)
+	assert.Equal(t, uint64(0), nodeStatusCondition)
+	nodeStatusCondition, valid = nodeInfo.getNodeStatusCondition(v1.NodePIDPressure)
+	assert.True(t, valid)
+	assert.Equal(t, uint64(0), nodeStatusCondition)
+	nodeStatusCondition, valid = nodeInfo.getNodeStatusCondition(v1.NodeNetworkUnavailable)
+	assert.True(t, valid)
+	assert.Equal(t, uint64(0), nodeStatusCondition)
+
+	nodeInfo = newNodeInfo("testNode2", &mockNodeInfoProvider{}, zap.NewNop())
+	nodeStatusCondition, valid = nodeInfo.getNodeStatusCondition(v1.NodeNetworkUnavailable)
+	assert.False(t, valid)
+	assert.Equal(t, uint64(0), nodeStatusCondition)
+
+	nodeInfo = newNodeInfo("testNodeNonExistent", &mockNodeInfoProvider{}, zap.NewNop())
+	nodeStatusCondition, valid = nodeInfo.getNodeStatusCondition(v1.NodeReady)
+	assert.False(t, valid)
+	assert.Equal(t, uint64(0), nodeStatusCondition)
+}
diff --git a/receiver/awscontainerinsightreceiver/internal/stores/podstore.go b/receiver/awscontainerinsightreceiver/internal/stores/podstore.go
index a259ec69e2d5..039b6c207742 100644
--- a/receiver/awscontainerinsightreceiver/internal/stores/podstore.go
+++ b/receiver/awscontainerinsightreceiver/internal/stores/podstore.go
@@ -19,6 +19,7 @@ import (
 	"errors"
 	"fmt"
 	"log"
+	"os"
 	"regexp"
 	"strings"
 	"sync"
@@ -26,6 +27,7 @@ import (
 
 	"go.uber.org/zap"
 	corev1 "k8s.io/api/core/v1"
+	"k8s.io/apimachinery/pkg/fields"
 
 	ci "github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/containerinsight"
 	"github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/k8s/k8sclient"
@@ -123,7 +125,15 @@ func NewPodStore(hostIP string, prefFullPodName bool, addFullPodNameMetricLabel
 		return nil, fmt.Errorf("cannot get pod from kubelet, err: %w", err)
 	}
 
-	k8sClient := k8sclient.Get(logger)
+	nodeName := os.Getenv("HOST_NAME")
+	if nodeName == "" {
+		return nil, errors.New("missing environment variable HOST_NAME. Please check your deployment YAML config")
+	}
+
+	k8sClient := k8sclient.Get(logger,
+		k8sclient.NodeSelector(fields.OneTermEqualSelector("metadata.name", nodeName)),
+		k8sclient.CaptureNodeLevelInfo(true),
+	)
 	if k8sClient == nil {
 		return nil, errors.New("failed to start pod store because k8sclient is nil")
 	}
@@ -132,7 +142,7 @@ func NewPodStore(hostIP string, prefFullPodName bool, addFullPodNameMetricLabel
 		cache:            newMapWithExpiry(podsExpiry),
 		prevMeasurements: make(map[string]*mapWithExpiry),
 		podClient:        podClient,
-		nodeInfo:         newNodeInfo(logger),
+		nodeInfo:         newNodeInfo(nodeName, k8sClient.GetNodeClient(), logger),
 		prefFullPodName:  prefFullPodName,
 		k8sClient:        k8sClient,
 		logger:           logger,
@@ -324,6 +334,28 @@ func (p *PodStore) decorateNode(metric CIMetric) {
 
 	metric.AddField(ci.MetricName(ci.TypeNode, ci.RunningPodCount), nodeStats.podCnt)
 	metric.AddField(ci.MetricName(ci.TypeNode, ci.RunningContainerCount), nodeStats.containerCnt)
+
+	if nodeStatusCapacityPods, ok := p.nodeInfo.getNodeStatusCapacityPods(); ok {
+		metric.AddField(ci.MetricName(ci.TypeNode, ci.StatusCapacityPods), nodeStatusCapacityPods)
+	}
+	if nodeStatusAllocatablePods, ok := p.nodeInfo.getNodeStatusAllocatablePods(); ok {
+		metric.AddField(ci.MetricName(ci.TypeNode, ci.StatusAllocatablePods), nodeStatusAllocatablePods)
+	}
+	if nodeStatusConditionReady, ok := p.nodeInfo.getNodeStatusCondition(corev1.NodeReady); ok {
+		metric.AddField(ci.MetricName(ci.TypeNode, ci.StatusConditionReady), nodeStatusConditionReady)
+	}
+	if nodeStatusConditionDiskPressure, ok := p.nodeInfo.getNodeStatusCondition(corev1.NodeDiskPressure); ok {
+		metric.AddField(ci.MetricName(ci.TypeNode, ci.StatusConditionDiskPressure), nodeStatusConditionDiskPressure)
+	}
+	if nodeStatusConditionMemoryPressure, ok := p.nodeInfo.getNodeStatusCondition(corev1.NodeMemoryPressure); ok {
+		metric.AddField(ci.MetricName(ci.TypeNode, ci.StatusConditionMemoryPressure), nodeStatusConditionMemoryPressure)
+	}
+	if nodeStatusConditionPIDPressure, ok := p.nodeInfo.getNodeStatusCondition(corev1.NodePIDPressure); ok {
+		metric.AddField(ci.MetricName(ci.TypeNode, ci.StatusConditionPIDPressure), nodeStatusConditionPIDPressure)
+	}
+	if nodeStatusConditionNetworkUnavailable, ok := p.nodeInfo.getNodeStatusCondition(corev1.NodeNetworkUnavailable); ok {
+		metric.AddField(ci.MetricName(ci.TypeNode, ci.StatusConditionNetworkUnavailable), nodeStatusConditionNetworkUnavailable)
+	}
 }
 
 func (p *PodStore) decorateCPU(metric CIMetric, pod *corev1.Pod) {
diff --git a/receiver/awscontainerinsightreceiver/internal/stores/podstore_test.go b/receiver/awscontainerinsightreceiver/internal/stores/podstore_test.go
index 05a6b3813bb5..1ee568a3d0f1 100644
--- a/receiver/awscontainerinsightreceiver/internal/stores/podstore_test.go
+++ b/receiver/awscontainerinsightreceiver/internal/stores/podstore_test.go
@@ -197,7 +197,7 @@ func getBaseTestPodInfo() *corev1.Pod {
 }
 
 func getPodStore() *PodStore {
-	nodeInfo := newNodeInfo(zap.NewNop())
+	nodeInfo := newNodeInfo("testNode1", &mockNodeInfoProvider{}, zap.NewNop())
 	nodeInfo.setCPUCapacity(4000)
 	nodeInfo.setMemCapacity(400 * 1024 * 1024)
 	return &PodStore{
@@ -600,6 +600,7 @@ func TestPodStore_RefreshTick(t *testing.T) {
 }
 
 func TestPodStore_decorateNode(t *testing.T) {
+	t.Setenv("HOST_NAME", "testNode1")
 	pod := getBaseTestPodInfo()
 	podList := []corev1.Pod{*pod}
 
@@ -629,6 +630,15 @@ func TestPodStore_decorateNode(t *testing.T) {
 
 	assert.Equal(t, int(1), metric.GetField("node_number_of_running_containers").(int))
 	assert.Equal(t, int(1),
metric.GetField("node_number_of_running_pods").(int)) + + assert.Equal(t, uint64(1), metric.GetField("node_status_condition_ready").(uint64)) + assert.Equal(t, uint64(0), metric.GetField("node_status_condition_disk_pressure").(uint64)) + assert.Equal(t, uint64(0), metric.GetField("node_status_condition_memory_pressure").(uint64)) + assert.Equal(t, uint64(0), metric.GetField("node_status_condition_pid_pressure").(uint64)) + assert.Equal(t, uint64(0), metric.GetField("node_status_condition_network_unavailable").(uint64)) + + assert.Equal(t, uint64(5), metric.GetField("node_status_capacity_pods").(uint64)) + assert.Equal(t, uint64(15), metric.GetField("node_status_allocatable_pods").(uint64)) } func TestPodStore_Decorate(t *testing.T) { diff --git a/receiver/awscontainerinsightreceiver/internal/stores/utils_test.go b/receiver/awscontainerinsightreceiver/internal/stores/utils_test.go index 3c4702c57a9c..ddbf5f9737ed 100644 --- a/receiver/awscontainerinsightreceiver/internal/stores/utils_test.go +++ b/receiver/awscontainerinsightreceiver/internal/stores/utils_test.go @@ -20,6 +20,8 @@ import ( "time" "github.com/stretchr/testify/assert" + v1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/resource" ci "github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/containerinsight" ) @@ -57,6 +59,50 @@ func (m *mockCIMetric) RemoveTag(key string) { delete(m.tags, key) } +type mockNodeInfoProvider struct { +} + +func (m *mockNodeInfoProvider) NodeToCapacityMap() map[string]v1.ResourceList { + return map[string]v1.ResourceList{ + "testNode1": { + "pods": *resource.NewQuantity(5, resource.DecimalSI), + }, + "testNode2": { + "pods": *resource.NewQuantity(10, resource.DecimalSI), + }, + } +} + +func (m *mockNodeInfoProvider) NodeToAllocatableMap() map[string]v1.ResourceList { + return map[string]v1.ResourceList{ + "testNode1": { + "pods": *resource.NewQuantity(15, resource.DecimalSI), + }, + "testNode2": { + "pods": *resource.NewQuantity(20, resource.DecimalSI), + }, + } +} + +func (m *mockNodeInfoProvider) NodeToConditionsMap() map[string]map[v1.NodeConditionType]v1.ConditionStatus { + return map[string]map[v1.NodeConditionType]v1.ConditionStatus{ + "testNode1": { + v1.NodeReady: v1.ConditionTrue, + v1.NodeDiskPressure: v1.ConditionFalse, + v1.NodeMemoryPressure: v1.ConditionFalse, + v1.NodePIDPressure: v1.ConditionFalse, + v1.NodeNetworkUnavailable: v1.ConditionUnknown, + }, + "testNode2": { + v1.NodeReady: v1.ConditionFalse, + v1.NodeDiskPressure: v1.ConditionTrue, + v1.NodeMemoryPressure: v1.ConditionFalse, + v1.NodePIDPressure: v1.ConditionFalse, + // v1.NodeNetworkUnavailable: v1.ConditionFalse, Commented out intentionally to test missing scenario + }, + } +} + func TestUtils_parseDeploymentFromReplicaSet(t *testing.T) { assert.Equal(t, "", parseDeploymentFromReplicaSet("cloudwatch-agent")) assert.Equal(t, "cloudwatch-agent", parseDeploymentFromReplicaSet("cloudwatch-agent-42kcz"))
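
Taken together, the new `NodeSelector` and `CaptureNodeLevelInfo` options let a per-node consumer such as the pod store watch only its own Node object while retaining that node's capacity, allocatable, and condition data in memory; restricting the list/watch to `metadata.name` keeps the informer cache at a single object per agent, which is what makes enabling `CaptureNodeLevelInfo` cheap. A minimal sketch of that wiring from inside this repository (the `k8sclient` package is internal), assuming `HOST_NAME` is injected with the node name via the downward API as `NewPodStore` expects, with error handling elided:

```go
package main

import (
	"fmt"
	"os"

	"go.uber.org/zap"
	v1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/fields"

	"github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/k8s/k8sclient"
)

func main() {
	// HOST_NAME is expected to carry this node's name (downward API: spec.nodeName).
	nodeName := os.Getenv("HOST_NAME")

	// Watch only this node, and retain its capacity/allocatable/conditions in memory.
	client := k8sclient.Get(zap.NewNop(),
		k8sclient.NodeSelector(fields.OneTermEqualSelector("metadata.name", nodeName)),
		k8sclient.CaptureNodeLevelInfo(true),
	)
	if client == nil {
		fmt.Println("failed to initialize k8s client")
		return
	}

	nodeClient := client.GetNodeClient()

	// Condition statuses are keyed by node name, then condition type.
	if conditions, ok := nodeClient.NodeToConditionsMap()[nodeName]; ok {
		fmt.Println("node Ready:", conditions[v1.NodeReady] == v1.ConditionTrue)
	}

	// Capacity and allocatable are plain v1.ResourceList values.
	capacity := nodeClient.NodeToCapacityMap()[nodeName]
	pods, _ := capacity.Pods().AsInt64()
	fmt.Println("pod capacity:", pods)
}
```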