From 147a7ae7c9cb3b8cfaccc0665f0ee5470e97ed8d Mon Sep 17 00:00:00 2001
From: Reham Tarek
Date: Wed, 14 Aug 2024 19:34:41 +0100
Subject: [PATCH] Add Resiliency Metrics

---
 internal/aws/containerinsight/const.go        | 18 ++++
 internal/aws/containerinsight/utils.go        |  6 +-
 internal/aws/containerinsight/utils_test.go   | 37 ++++++++
 internal/aws/k8s/k8sclient/clientset.go       | 19 +++-
 internal/aws/k8s/k8sclient/node.go            | 95 ++++++++++++++++---
 internal/aws/k8s/k8sclient/node_info.go       | 22 ++++-
 internal/aws/k8s/k8sclient/node_test.go       | 41 +++++---
 internal/aws/k8s/k8sutil/util.go              | 31 ++++++
 .../internal/k8sapiserver/k8sapiserver.go     | 43 ++++++++-
 .../k8sapiserver/k8sapiserver_test.go         | 44 +++++++++
 .../internal/k8sapiserver/leaderelection.go   |  6 +-
 .../internal/k8sapiserver/utils.go            | 13 +++
 .../internal/k8sapiserver/utils_test.go       | 19 ++++
 .../internal/stores/nodeinfo.go               |  3 +
 .../internal/stores/utils_test.go             | 11 +++
 15 files changed, 370 insertions(+), 38 deletions(-)

diff --git a/internal/aws/containerinsight/const.go b/internal/aws/containerinsight/const.go
index d4488249005e..186053800a76 100644
--- a/internal/aws/containerinsight/const.go
+++ b/internal/aws/containerinsight/const.go
@@ -146,6 +146,11 @@ const (
 	GpuRequest          = "gpu_request"
 	GpuReservedCapacity = "gpu_reserved_capacity"
 
+	HyperPodUnschedulablePendingReplacement = "unschedulable_pending_replacement"
+	HyperPodUnschedulablePendingReboot      = "unschedulable_pending_reboot"
+	HyperPodSchedulable                     = "schedulable"
+	HyperPodUnschedulable                   = "unschedulable"
+
 	// Define the metric types
 	TypeCluster        = "Cluster"
 	TypeClusterService = "ClusterService"
@@ -179,6 +184,7 @@ const (
 	TypeContainerEFA = "ContainerEFA"
 	TypePodEFA       = "PodEFA"
 	TypeNodeEFA      = "NodeEFA"
+	TypeHyperPodNode = "HyperPodNode"
 
 	// unit
 	UnitBytes = "Bytes"
@@ -202,6 +208,13 @@ var WaitingReasonLookup = map[string]string{
 	"StartError": StatusContainerWaitingReasonStartError,
 }
 
+var HyperPodConditionToMetric = map[string]string{
+	"UnschedulablePendingReplacement": HyperPodUnschedulablePendingReplacement,
+	"UnschedulablePendingReboot":      HyperPodUnschedulablePendingReboot,
+	"Schedulable":                     HyperPodSchedulable,
+	"Unschedulable":                   HyperPodUnschedulable,
+}
+
 var metricToUnitMap map[string]string
 
 func init() {
@@ -330,5 +343,10 @@ func init() {
 		GpuUsageTotal:       UnitCount,
 		GpuRequest:          UnitCount,
 		GpuReservedCapacity: UnitPercent,
+
+		HyperPodUnschedulablePendingReplacement: UnitCount,
+		HyperPodUnschedulablePendingReboot:      UnitCount,
+		HyperPodSchedulable:                     UnitCount,
+		HyperPodUnschedulable:                   UnitCount,
 	}
 }
diff --git a/internal/aws/containerinsight/utils.go b/internal/aws/containerinsight/utils.go
index 4c0b6fa15d0d..d81b016bac14 100644
--- a/internal/aws/containerinsight/utils.go
+++ b/internal/aws/containerinsight/utils.go
@@ -58,7 +58,8 @@ func IsNode(mType string) bool {
 		TypeNodeEFA,
 		TypeNodeFS,
 		TypeNodeGPU,
-		TypeNodeNet:
+		TypeNodeNet,
+		TypeHyperPodNode:
 		return true
 	}
 	return false
@@ -107,6 +108,7 @@ func getPrefixByMetricType(mType string) string {
 	instanceNetPrefix := "instance_interface_"
 	nodeNetPrefix := "node_interface_"
 	nodeEfaPrefix := "node_efa_"
+	hyperPodNodeHealthStatus := "hyper_pod_node_health_status_"
 	podPrefix := "pod_"
 	podNetPrefix := "pod_interface_"
 	podEfaPrefix := "pod_efa_"
@@ -169,6 +171,8 @@ func getPrefixByMetricType(mType string) string {
 		prefix = statefulSet
 	case TypeClusterReplicaSet:
 		prefix = replicaSet
+	case TypeHyperPodNode:
+		prefix = hyperPodNodeHealthStatus
 	default:
 		log.Printf("E! Unexpected MetricType: %s", mType)
 	}
diff --git a/internal/aws/containerinsight/utils_test.go b/internal/aws/containerinsight/utils_test.go
index b9f3e878bf26..68b991d05a2b 100644
--- a/internal/aws/containerinsight/utils_test.go
+++ b/internal/aws/containerinsight/utils_test.go
@@ -79,6 +79,7 @@ func TestIsNode(t *testing.T) {
 	assert.Equal(t, true, IsNode(TypeNodeGPU))
 	assert.Equal(t, true, IsNode(TypeNodeNet))
 	assert.Equal(t, false, IsNode(TypePod))
+	assert.Equal(t, true, IsNode(TypeHyperPodNode))
 }
 
 func TestIsInstance(t *testing.T) {
@@ -929,3 +930,39 @@ func TestConvertToOTLPMetricsForPodEfaMetrics(t *testing.T) {
 	md = ConvertToOTLPMetrics(fields, tags, zap.NewNop())
 	checkMetricsAreExpected(t, md, fields, tags, expectedUnits)
 }
+
+func TestConvertToOTLPMetricsForHyperPodNodeMetrics(t *testing.T) {
+	var fields map[string]any
+	var expectedUnits map[string]string
+	var tags map[string]string
+	var md pmetric.Metrics
+	now := time.Now()
+	timestamp := strconv.FormatInt(now.UnixNano(), 10)
+
+	fields = map[string]any{
+		"unschedulable_pending_replacement": 0,
+		"unschedulable_pending_reboot":      0,
+		"schedulable":                       1,
+		"unschedulable":                     0,
+	}
+	expectedUnits = map[string]string{
+		"unschedulable_pending_replacement": UnitCount,
+		"unschedulable_pending_reboot":      UnitCount,
+		"schedulable":                       UnitCount,
+		"unschedulable":                     UnitCount,
+	}
+	tags = map[string]string{
+		"ClusterName":   "eks-aoc",
+		"InstanceId":    "i-01bf9fb097cbf3205",
+		"InstanceType":  "t2.xlarge",
+		"Namespace":     "amazon-cloudwatch",
+		"NodeName":      "hyperpod-ip-192-168-12-170.ec2.internal",
+		"PodName":       "cloudwatch-agent",
+		"ContainerName": "cloudwatch-agent",
+		"Type":          "HyperPodNode",
+		"Version":       "0",
+		"Timestamp":     timestamp,
+	}
+	md = ConvertToOTLPMetrics(fields, tags, zap.NewNop())
+	checkMetricsAreExpected(t, md, fields, tags, expectedUnits)
+}
diff --git a/internal/aws/k8s/k8sclient/clientset.go b/internal/aws/k8s/k8sclient/clientset.go
index 0db4ec06c610..9739c18caf8b 100644
--- a/internal/aws/k8s/k8sclient/clientset.go
+++ b/internal/aws/k8s/k8sclient/clientset.go
@@ -138,6 +138,17 @@ func CaptureNodeLevelInfo(captureNodeLevelInfo bool) Option {
 	}
 }
 
+// CaptureOnlyNodeLabelsInfo allows one to specify whether node labels
+// should be captured and retained in memory
+func CaptureOnlyNodeLabelsInfo(captureOnlyNodeLabelInfo bool) Option {
+	return Option{
+		name: "captureOnlyNodeLabelInfo:" + strconv.FormatBool(captureOnlyNodeLabelInfo),
+		set: func(kc *K8sClient) {
+			kc.captureOnlyNodeLabelInfo = captureOnlyNodeLabelInfo
+		},
+	}
+}
+
 func getStringifiedOptions(options ...Option) string {
 	opts := make([]string, len(options))
 	for i, option := range options {
@@ -225,8 +236,9 @@ type K8sClient struct {
 	nodeMu sync.Mutex
 	node   nodeClientWithStopper
 
-	nodeSelector         fields.Selector
-	captureNodeLevelInfo bool
+	nodeSelector             fields.Selector
+	captureNodeLevelInfo     bool
+	captureOnlyNodeLabelInfo bool
 
 	jobMu sync.Mutex
 	job   jobClientWithStopper
@@ -326,7 +338,8 @@ func (c *K8sClient) ShutdownPodClient() {
 func (c *K8sClient) GetNodeClient() NodeClient {
 	c.nodeMu.Lock()
 	if c.node == nil {
-		opts := []nodeClientOption{nodeSyncCheckerOption(c.syncChecker), captureNodeLevelInfoOption(c.captureNodeLevelInfo)}
+		opts := []nodeClientOption{nodeSyncCheckerOption(c.syncChecker), captureNodeLevelInfoOption(c.captureNodeLevelInfo),
+			captureOnlyNodeLabelInfoOption(c.captureOnlyNodeLabelInfo)}
 		if c.nodeSelector != nil {
 			opts = append(opts, nodeSelectorOption(c.nodeSelector))
 		}
diff --git a/internal/aws/k8s/k8sclient/node.go 
b/internal/aws/k8s/k8sclient/node.go index 8682417c590d..ef9da26fd1d8 100644 --- a/internal/aws/k8s/k8sclient/node.go +++ b/internal/aws/k8s/k8sclient/node.go @@ -16,6 +16,13 @@ import ( "k8s.io/apimachinery/pkg/watch" "k8s.io/client-go/kubernetes" "k8s.io/client-go/tools/cache" + + "github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/k8s/k8sutil" +) + +const ( + instanceTypeLabelKey = "node.kubernetes.io/instance-type" + instanceTypeLabelKeyBeta = "beta.kubernetes.io/instance-type" ) // This needs to be reviewed for newer versions of k8s. @@ -27,6 +34,7 @@ var failedNodeConditions = map[v1.NodeConditionType]bool{ } type NodeClient interface { + NodeInfos() map[string]*NodeInfo // Get the number of failed nodes for current cluster ClusterFailedNodeCount() int // Get the number of nodes for current cluster @@ -34,6 +42,7 @@ type NodeClient interface { NodeToCapacityMap() map[string]v1.ResourceList NodeToAllocatableMap() map[string]v1.ResourceList NodeToConditionsMap() map[string]map[v1.NodeConditionType]v1.ConditionStatus + NodeToLabelsMap() map[string]map[Label]int8 } type nodeClientOption func(*nodeClient) @@ -56,6 +65,12 @@ func captureNodeLevelInfoOption(captureNodeLevelInfo bool) nodeClientOption { } } +func captureOnlyNodeLabelInfoOption(captureOnlyNodeLabelInfo bool) nodeClientOption { + return func(n *nodeClient) { + n.captureOnlyNodeLabelInfo = captureOnlyNodeLabelInfo + } +} + type nodeClient struct { stopChan chan struct{} store *ObjStore @@ -69,14 +84,26 @@ type nodeClient struct { // The node client can be used in several places, including code paths that execute on both leader and non-leader nodes. // But for logic on the leader node (for ex in k8sapiserver.go), there is no need to obtain node level info since only cluster // level info is needed there. Hence, this optimization allows us to save on memory by not capturing node level info when not needed. 
-	captureNodeLevelInfo bool
+	captureNodeLevelInfo     bool
+	captureOnlyNodeLabelInfo bool
 
 	mu                     sync.RWMutex
+	nodeInfos              map[string]*NodeInfo
 	clusterFailedNodeCount int
 	clusterNodeCount       int
 	nodeToCapacityMap      map[string]v1.ResourceList
 	nodeToAllocatableMap   map[string]v1.ResourceList
 	nodeToConditionsMap    map[string]map[v1.NodeConditionType]v1.ConditionStatus
+	nodeToLabelsMap        map[string]map[Label]int8
+}
+
+func (c *nodeClient) NodeInfos() map[string]*NodeInfo {
+	if c.store.GetResetRefreshStatus() {
+		c.refresh()
+	}
+	c.mu.RLock()
+	defer c.mu.RUnlock()
+	return c.nodeInfos
 }
 
 func (c *nodeClient) ClusterFailedNodeCount() int {
@@ -97,6 +124,18 @@ func (c *nodeClient) ClusterNodeCount() int {
 	return c.clusterNodeCount
 }
 
+func (c *nodeClient) NodeToLabelsMap() map[string]map[Label]int8 {
+	if !c.captureOnlyNodeLabelInfo {
+		c.logger.Warn("trying to access node label info when captureOnlyNodeLabelInfo is not set, will return empty data")
+	}
+	if c.store.GetResetRefreshStatus() {
+		c.refresh()
+	}
+	c.mu.RLock()
+	defer c.mu.RUnlock()
+	return c.nodeToLabelsMap
+}
+
 func (c *nodeClient) NodeToCapacityMap() map[string]v1.ResourceList {
 	if !c.captureNodeLevelInfo {
 		c.logger.Warn("trying to access node level info when captureNodeLevelInfo is not set, will return empty data")
@@ -144,25 +183,37 @@ func (c *nodeClient) refresh() {
 	nodeToCapacityMap := make(map[string]v1.ResourceList)
 	nodeToAllocatableMap := make(map[string]v1.ResourceList)
 	nodeToConditionsMap := make(map[string]map[v1.NodeConditionType]v1.ConditionStatus)
+	nodeToLabelsMap := make(map[string]map[Label]int8)
+
+	nodeInfos := map[string]*NodeInfo{}
 
 	for _, obj := range objsList {
-		node := obj.(*nodeInfo)
+		node := obj.(*NodeInfo)
+		nodeInfos[node.Name] = node
 
 		if c.captureNodeLevelInfo {
-			nodeToCapacityMap[node.name] = node.capacity
-			nodeToAllocatableMap[node.name] = node.allocatable
+			nodeToCapacityMap[node.Name] = node.Capacity
+			nodeToAllocatableMap[node.Name] = node.Allocatable
 			conditionsMap := make(map[v1.NodeConditionType]v1.ConditionStatus)
-			for _, condition := range node.conditions {
+			for _, condition := range node.Conditions {
 				conditionsMap[condition.Type] = condition.Status
 			}
-			nodeToConditionsMap[node.name] = conditionsMap
+			nodeToConditionsMap[node.Name] = conditionsMap
+		}
+
+		if c.captureOnlyNodeLabelInfo {
+			labelsMap := make(map[Label]int8)
+			if hyperPodLabel, ok := node.Labels[SageMakerNodeHealthStatus]; ok {
+				labelsMap[SageMakerNodeHealthStatus] = hyperPodLabel
+				nodeToLabelsMap[node.Name] = labelsMap
+			}
 		}
 		clusterNodeCountNew++
 
 		failed := false
 	Loop:
-		for _, condition := range node.conditions {
+		for _, condition := range node.Conditions {
 			if _, ok := failedNodeConditions[condition.Type]; ok {
 				// match the failedNodeConditions type we care about
 				if condition.Status != v1.ConditionFalse {
@@ -178,11 +229,13 @@
 		}
 	}
 
+	c.nodeInfos = nodeInfos
 	c.clusterFailedNodeCount = clusterFailedNodeCountNew
 	c.clusterNodeCount = clusterNodeCountNew
 	c.nodeToCapacityMap = nodeToCapacityMap
 	c.nodeToAllocatableMap = nodeToAllocatableMap
 	c.nodeToConditionsMap = nodeToConditionsMap
+	c.nodeToLabelsMap = nodeToLabelsMap
 }
 
 func newNodeClient(clientSet kubernetes.Interface, logger *zap.Logger, options ...nodeClientOption) *nodeClient {
@@ -222,17 +275,33 @@ func transformFuncNode(obj any) (any, error) {
 	if !ok {
 		return nil, fmt.Errorf("input obj %v is not Node type", obj)
 	}
-	info := new(nodeInfo)
-	info.name = node.Name
-	info.capacity = node.Status.Capacity
-	info.allocatable = node.Status.Allocatable
-	
info.conditions = []*NodeCondition{} + info := new(NodeInfo) + info.Name = node.Name + info.Capacity = node.Status.Capacity + info.Allocatable = node.Status.Allocatable + if instanceType, ok := node.Labels[instanceTypeLabelKey]; ok { + info.InstanceType = instanceType + } else { + // fallback for compatibility with k8s versions older than v1.17 + // https://kubernetes.io/docs/reference/labels-annotations-taints/#beta-kubernetes-io-instance-type-deprecated + if instanceType, ok := node.Labels[instanceTypeLabelKeyBeta]; ok { + info.InstanceType = instanceType + } + } + info.Conditions = []*NodeCondition{} for _, condition := range node.Status.Conditions { - info.conditions = append(info.conditions, &NodeCondition{ + info.Conditions = append(info.Conditions, &NodeCondition{ Type: condition.Type, Status: condition.Status, }) } + + if sageMakerHealthStatus, ok := node.Labels[SageMakerNodeHealthStatus.String()]; ok { + info.Labels = make(map[Label]int8) + if condition, ok := k8sutil.ParseString(sageMakerHealthStatus); ok { + info.Labels[SageMakerNodeHealthStatus] = condition + } + } return info, nil } diff --git a/internal/aws/k8s/k8sclient/node_info.go b/internal/aws/k8s/k8sclient/node_info.go index 6b1462adadcd..0fc680f2b892 100644 --- a/internal/aws/k8s/k8sclient/node_info.go +++ b/internal/aws/k8s/k8sclient/node_info.go @@ -7,14 +7,26 @@ import ( v1 "k8s.io/api/core/v1" ) -type nodeInfo struct { - name string - conditions []*NodeCondition - capacity v1.ResourceList - allocatable v1.ResourceList +type NodeInfo struct { + Name string + Conditions []*NodeCondition + Capacity v1.ResourceList + Allocatable v1.ResourceList + InstanceType string + Labels map[Label]int8 } type NodeCondition struct { Type v1.NodeConditionType Status v1.ConditionStatus } + +type Label int8 + +const ( + SageMakerNodeHealthStatus Label = iota +) + +func (ct Label) String() string { + return [...]string{"sagemaker.amazonaws.com/node-health-status"}[ct] +} diff --git a/internal/aws/k8s/k8sclient/node_test.go b/internal/aws/k8s/k8sclient/node_test.go index 9c0213001f44..4e8f3974f174 100644 --- a/internal/aws/k8s/k8sclient/node_test.go +++ b/internal/aws/k8s/k8sclient/node_test.go @@ -14,6 +14,8 @@ import ( "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/kubernetes/fake" + + "github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/k8s/k8sutil" ) var nodeArray = []any{ @@ -30,12 +32,13 @@ var nodeArray = []any{ Time: time.Now(), }, Labels: map[string]string{ - "kubernetes.io/arch": "amd64", - "beta.kubernetes.io/instance-type": "t3.medium", - "kubernetes.io/os": "linux", - "failure-domain.beta.kubernetes.io/region": "eu-west-1", - "failure-domain.beta.kubernetes.io/zone": "eu-west-1c", - "kubernetes.io/hostname": "ip-192-168-200-63.eu-west-1.compute.internal", + "kubernetes.io/arch": "amd64", + "beta.kubernetes.io/instance-type": "t3.medium", + "kubernetes.io/os": "linux", + "failure-domain.beta.kubernetes.io/region": "eu-west-1", + "failure-domain.beta.kubernetes.io/zone": "eu-west-1c", + "kubernetes.io/hostname": "ip-192-168-200-63.eu-west-1.compute.internal", + "sagemaker.amazonaws.com/node-health-status": "Schedulable", }, Annotations: map[string]string{ "node.alpha.kubernetes.io/ttl": "0", @@ -126,13 +129,14 @@ var nodeArray = []any{ Time: time.Now(), }, Labels: map[string]string{ - "kubernetes.io/os": "linux", - "failure-domain.beta.kubernetes.io/region": "eu-west-1", - "failure-domain.beta.kubernetes.io/zone": "eu-west-1a", - 
"kubernetes.io/hostname": "ip-192-168-76-61.eu-west-1.compute.internal", - "kubernetes.io/arch": "amd64", - "beta.kubernetes.io/instance-type": "t3.medium", - "node.kubernetes.io/instance-type": "t3.medium", + "kubernetes.io/os": "linux", + "failure-domain.beta.kubernetes.io/region": "eu-west-1", + "failure-domain.beta.kubernetes.io/zone": "eu-west-1a", + "kubernetes.io/hostname": "ip-192-168-76-61.eu-west-1.compute.internal", + "kubernetes.io/arch": "amd64", + "beta.kubernetes.io/instance-type": "t3.medium", + "node.kubernetes.io/instance-type": "t3.medium", + "sagemaker.amazonaws.com/node-health-status": "Unschedulable", }, Annotations: map[string]string{ "node.alpha.kubernetes.io/ttl": "0", @@ -323,12 +327,14 @@ func TestNodeClient(t *testing.T) { "nodeToCapacityMap": map[string]v1.ResourceList{}, // Node level info is not captured by default "nodeToAllocatableMap": map[string]v1.ResourceList{}, // Node level info is not captured by default "nodeToConditionsMap": map[string]map[v1.NodeConditionType]v1.ConditionStatus{}, // Node level info is not captured by default + "NodeToLabelsMap": map[string]map[Label]int8{}, }, }, "CaptureNodeLevelInfo": { options: []nodeClientOption{ nodeSyncCheckerOption(&mockReflectorSyncChecker{}), captureNodeLevelInfoOption(true), + captureOnlyNodeLabelInfoOption(true), }, want: map[string]any{ "clusterNodeCount": 3, @@ -375,6 +381,14 @@ func TestNodeClient(t *testing.T) { "Ready": "False", }, }, + "NodeToLabelsMap": map[string]map[Label]int8{ + "ip-192-168-200-63.eu-west-1.compute.internal": { + SageMakerNodeHealthStatus: int8(k8sutil.Schedulable), + }, + "ip-192-168-76-61.eu-west-1.compute.internal": { + SageMakerNodeHealthStatus: int8(k8sutil.Unschedulable), + }, + }, }, }, } @@ -389,6 +403,7 @@ func TestNodeClient(t *testing.T) { require.Equal(t, testCase.want["nodeToCapacityMap"], client.NodeToCapacityMap()) require.Equal(t, testCase.want["nodeToAllocatableMap"], client.NodeToAllocatableMap()) require.Equal(t, testCase.want["nodeToConditionsMap"], client.NodeToConditionsMap()) + require.Equal(t, testCase.want["NodeToLabelsMap"], client.NodeToLabelsMap()) client.shutdown() assert.True(t, client.stopped) diff --git a/internal/aws/k8s/k8sutil/util.go b/internal/aws/k8s/k8sutil/util.go index 2264446eacfb..94242a92d4ca 100644 --- a/internal/aws/k8s/k8sutil/util.go +++ b/internal/aws/k8s/k8sutil/util.go @@ -22,3 +22,34 @@ func CreateContainerKey(namespace, podName, containerName string) string { } return fmt.Sprintf("namespace:%s,podName:%s,containerName:%s", namespace, podName, containerName) } + +type HyperPodConditionType int8 + +const ( + Schedulable HyperPodConditionType = iota + UnschedulablePendingReplacement + UnschedulablePendingReboot + Unschedulable +) + +func (ct HyperPodConditionType) String() string { + return [...]string{"Schedulable", "UnschedulablePendingReplacement", "UnschedulablePendingReboot", "Unschedulable"}[ct] +} + +func (ct HyperPodConditionType) EnumIndex() int { + return int(ct) +} + +var ( + HyperPodConditionTypeMap = map[string]HyperPodConditionType{ + "Schedulable": Schedulable, + "UnschedulablePendingReplacement": UnschedulablePendingReplacement, + "UnschedulablePendingReboot": UnschedulablePendingReboot, + "Unschedulable": Unschedulable, + } +) + +func ParseString(str string) (int8, bool) { + c, ok := HyperPodConditionTypeMap[str] + return int8(c), ok +} diff --git a/receiver/awscontainerinsightreceiver/internal/k8sapiserver/k8sapiserver.go b/receiver/awscontainerinsightreceiver/internal/k8sapiserver/k8sapiserver.go index 
cc9a17ef801d..354a223360f4 100644 --- a/receiver/awscontainerinsightreceiver/internal/k8sapiserver/k8sapiserver.go +++ b/receiver/awscontainerinsightreceiver/internal/k8sapiserver/k8sapiserver.go @@ -26,6 +26,15 @@ import ( "github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/k8s/k8sutil" ) +var HyperPodConditions = []k8sutil.HyperPodConditionType{ + k8sutil.UnschedulablePendingReplacement, + k8sutil.UnschedulablePendingReboot, + k8sutil.Unschedulable, + k8sutil.Schedulable, +} + +const HyperPodNodePrefix = "hyperpod-" + // eventBroadcaster is adpated from record.EventBroadcaster type eventBroadcaster interface { // StartRecordingToSink starts sending events received from this EventBroadcaster to the given @@ -71,7 +80,6 @@ type Option func(*K8sAPIServer) // NewK8sAPIServer creates a k8sApiServer which can generate cluster-level metrics func NewK8sAPIServer(cnp clusterNameProvider, logger *zap.Logger, leaderElection *LeaderElection, addFullPodNameMetricLabel bool, includeEnhancedMetrics bool, options ...Option) (*K8sAPIServer, error) { - k := &K8sAPIServer{ logger: logger, clusterNameProvider: cnp, @@ -126,6 +134,10 @@ func (k *K8sAPIServer) GetMetrics() []pmetric.Metrics { result = append(result, k.getReplicaSetMetrics(clusterName, timestampNs)...) result = append(result, k.getPendingPodStatusMetrics(clusterName, timestampNs)...) + if k.includeEnhancedMetrics { + result = append(result, k.getHyperPodResiliencyMetrics(clusterName, timestampNs)...) + } + return result } @@ -442,6 +454,35 @@ func (k *K8sAPIServer) getKubernetesBlob(pod *k8sclient.PodInfo, kubernetesBlob } } +func (k *K8sAPIServer) getHyperPodResiliencyMetrics(clusterName, timestampNs string) []pmetric.Metrics { + var metrics []pmetric.Metrics + nodeInfos := k.leaderElection.nodeClient.NodeInfos() + for nodeName, labels := range k.leaderElection.nodeClient.NodeToLabelsMap() { + if nodeInfo, ok := nodeInfos[nodeName]; ok { + if isHyperPodNode(nodeInfo.InstanceType) { + fields := map[string]any{} + attributes := map[string]string{ + ci.ClusterNameKey: clusterName, + ci.MetricType: ci.TypeHyperPodNode, + ci.Timestamp: timestampNs, + ci.Version: "0", + } + + for _, condition := range HyperPodConditions { + if count, ok := isLabelSet(int8(condition), labels, k8sclient.SageMakerNodeHealthStatus); ok { + fields[ci.MetricName(ci.TypeHyperPodNode, ci.HyperPodConditionToMetric[condition.String()])] = count + } + } + attributes[ci.InstanceID] = strings.TrimPrefix(nodeName, HyperPodNodePrefix) + attributes[ci.NodeNameKey] = nodeName + md := ci.ConvertToOTLPMetrics(fields, attributes, k.logger) + metrics = append(metrics, md) + } + } + } + return metrics +} + // Shutdown stops the k8sApiServer func (k *K8sAPIServer) Shutdown() error { if k.cancel != nil { diff --git a/receiver/awscontainerinsightreceiver/internal/k8sapiserver/k8sapiserver_test.go b/receiver/awscontainerinsightreceiver/internal/k8sapiserver/k8sapiserver_test.go index d4d27fbaea00..c151bc1a9df4 100644 --- a/receiver/awscontainerinsightreceiver/internal/k8sapiserver/k8sapiserver_test.go +++ b/receiver/awscontainerinsightreceiver/internal/k8sapiserver/k8sapiserver_test.go @@ -14,6 +14,7 @@ import ( "go.opentelemetry.io/collector/pdata/pmetric" "go.uber.org/zap" v1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/resource" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/watch" "k8s.io/client-go/kubernetes" @@ -22,6 +23,7 @@ import ( ci "github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/containerinsight" 
"github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/k8s/k8sclient" + "github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/k8s/k8sutil" ) func NewService(name, namespace string) k8sclient.Service { @@ -144,11 +146,21 @@ func (client *MockClient) ClusterFailedNodeCount() int { return args.Get(0).(int) } +func (client *MockClient) NodeInfos() map[string]*k8sclient.NodeInfo { + args := client.Called() + return args.Get(0).(map[string]*k8sclient.NodeInfo) +} + func (client *MockClient) ClusterNodeCount() int { args := client.Called() return args.Get(0).(int) } +func (client *MockClient) NodeToLabelsMap() map[string]map[k8sclient.Label]int8 { + args := client.Called() + return args.Get(0).(map[string]map[k8sclient.Label]int8) +} + // k8sclient.EpClient func (client *MockClient) ServiceToPodNum() map[k8sclient.Service]int { args := client.Called() @@ -310,6 +322,30 @@ func TestK8sAPIServer_GetMetrics(t *testing.T) { "namespace:kube-system,podName:coredns-7554568866-shwn6": {"kube-dns"}, }) + mockClient.On("NodeInfos").Return(map[string]*k8sclient.NodeInfo{ + "ip-192-168-57-23.us-west-2.compute.internal": { + Name: "ip-192-168-57-23.us-west-2.compute.internal", + Conditions: []*k8sclient.NodeCondition{ + { + Type: v1.NodeReady, + Status: v1.ConditionTrue, + }, + }, + Capacity: map[v1.ResourceName]resource.Quantity{}, + Allocatable: map[v1.ResourceName]resource.Quantity{}, + InstanceType: "ml.g4dn-12xl", + Labels: map[k8sclient.Label]int8{ + k8sclient.SageMakerNodeHealthStatus: int8(k8sutil.Schedulable), + }, + }, + }) + + mockClient.On("NodeToLabelsMap").Return(map[string]map[k8sclient.Label]int8{ + "ip-192-168-57-23.us-west-2.compute.internal": { + k8sclient.SageMakerNodeHealthStatus: int8(k8sutil.Schedulable), + }, + }) + leaderElection := &LeaderElection{ k8sClient: &mockK8sClient{}, nodeClient: mockClient, @@ -401,6 +437,14 @@ func TestK8sAPIServer_GetMetrics(t *testing.T) { assert.Equal(t, "kube-system", getStringAttrVal(metric, ci.AttributeK8sNamespace)) assert.Equal(t, "Pending", getStringAttrVal(metric, "pod_status")) assert.Equal(t, "Pod", getStringAttrVal(metric, ci.MetricType)) + case ci.TypeHyperPodNode: + assert.Equal(t, "HyperPodNode", getStringAttrVal(metric, ci.MetricType)) + assert.Equal(t, "ip-192-168-57-23.us-west-2.compute.internal", getStringAttrVal(metric, ci.NodeNameKey)) + assert.Equal(t, "ip-192-168-57-23.us-west-2.compute.internal", getStringAttrVal(metric, ci.InstanceID)) + assertMetricValueEqual(t, metric, "hyper_pod_node_health_status_unschedulable_pending_reboot", int64(0)) + assertMetricValueEqual(t, metric, "hyper_pod_node_health_status_schedulable", int64(1)) + assertMetricValueEqual(t, metric, "hyper_pod_node_health_status_unschedulable", int64(0)) + assertMetricValueEqual(t, metric, "hyper_pod_node_health_status_unschedulable_pending_replacement", int64(0)) default: assert.Fail(t, "Unexpected metric type: "+metricType) } diff --git a/receiver/awscontainerinsightreceiver/internal/k8sapiserver/leaderelection.go b/receiver/awscontainerinsightreceiver/internal/k8sapiserver/leaderelection.go index c08b0c939146..74d317e3fbea 100644 --- a/receiver/awscontainerinsightreceiver/internal/k8sapiserver/leaderelection.go +++ b/receiver/awscontainerinsightreceiver/internal/k8sapiserver/leaderelection.go @@ -65,8 +65,10 @@ func WithLeaderLockUsingConfigMapOnly(leaderLockUsingConfigMapOnly bool) LeaderE func NewLeaderElection(logger *zap.Logger, options ...LeaderElectionOption) (*LeaderElection, error) { le := &LeaderElection{ - logger: 
logger, - k8sClient: k8sclient.Get(logger), + logger: logger, + k8sClient: k8sclient.Get(logger, + k8sclient.CaptureOnlyNodeLabelsInfo(true), + ), broadcaster: record.NewBroadcaster(), } diff --git a/receiver/awscontainerinsightreceiver/internal/k8sapiserver/utils.go b/receiver/awscontainerinsightreceiver/internal/k8sapiserver/utils.go index a2efa312805d..e06b3142dbf0 100644 --- a/receiver/awscontainerinsightreceiver/internal/k8sapiserver/utils.go +++ b/receiver/awscontainerinsightreceiver/internal/k8sapiserver/utils.go @@ -127,3 +127,16 @@ func parseDeploymentFromReplicaSet(name string) string { } return name[:lastDash] } + +func isHyperPodNode(instanceType string) bool { + return strings.HasPrefix(instanceType, "ml.") +} + +func isLabelSet(conditionType int8, nodeLabels map[k8sclient.Label]int8, labelKey k8sclient.Label) (int, bool) { + var count = 0 + nodeConditions, labelExists := nodeLabels[labelKey] + if labelExists && nodeConditions == conditionType { + count = 1 + } + return count, labelExists +} diff --git a/receiver/awscontainerinsightreceiver/internal/k8sapiserver/utils_test.go b/receiver/awscontainerinsightreceiver/internal/k8sapiserver/utils_test.go index 6e7e0fd28fc4..396092913d67 100644 --- a/receiver/awscontainerinsightreceiver/internal/k8sapiserver/utils_test.go +++ b/receiver/awscontainerinsightreceiver/internal/k8sapiserver/utils_test.go @@ -10,6 +10,7 @@ import ( v1 "k8s.io/api/core/v1" "github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/k8s/k8sclient" + "github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/k8s/k8sutil" ) func TestUtils_parseDeploymentFromReplicaSet(t *testing.T) { @@ -61,3 +62,21 @@ func TestPodStore_addPodConditionMetrics(t *testing.T) { } assert.Equal(t, expectedFieldsArray, fields) } + +func TestUtils_isHyperPodNode(t *testing.T) { + assert.True(t, isHyperPodNode("ml.t3.medium")) + assert.False(t, isHyperPodNode("t3.medium")) +} + +func TestUtils_LabelsUtils(t *testing.T) { + nodelabels := map[k8sclient.Label]int8{ + k8sclient.SageMakerNodeHealthStatus: int8(k8sutil.Schedulable), + } + status, ok := isLabelSet(int8(k8sutil.Schedulable), nodelabels, k8sclient.SageMakerNodeHealthStatus) + assert.Equal(t, 1, status) + assert.True(t, ok) + + status, ok = isLabelSet(int8(k8sutil.UnschedulablePendingReboot), nodelabels, k8sclient.SageMakerNodeHealthStatus) + assert.Equal(t, 0, status) + assert.True(t, ok) +} diff --git a/receiver/awscontainerinsightreceiver/internal/stores/nodeinfo.go b/receiver/awscontainerinsightreceiver/internal/stores/nodeinfo.go index 34abbcb19054..c73d9c442eb1 100644 --- a/receiver/awscontainerinsightreceiver/internal/stores/nodeinfo.go +++ b/receiver/awscontainerinsightreceiver/internal/stores/nodeinfo.go @@ -10,6 +10,8 @@ import ( "go.uber.org/zap" v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/resource" + + "github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/k8s/k8sclient" ) type nodeStats struct { @@ -41,6 +43,7 @@ type nodeInfoProvider interface { NodeToCapacityMap() map[string]v1.ResourceList NodeToAllocatableMap() map[string]v1.ResourceList NodeToConditionsMap() map[string]map[v1.NodeConditionType]v1.ConditionStatus + NodeToLabelsMap() map[string]map[k8sclient.Label]int8 } func newNodeInfo(nodeName string, provider nodeInfoProvider, logger *zap.Logger) *nodeInfo { diff --git a/receiver/awscontainerinsightreceiver/internal/stores/utils_test.go b/receiver/awscontainerinsightreceiver/internal/stores/utils_test.go index f570415658ad..4448de9bfc9b 100644 --- 
a/receiver/awscontainerinsightreceiver/internal/stores/utils_test.go +++ b/receiver/awscontainerinsightreceiver/internal/stores/utils_test.go @@ -13,6 +13,8 @@ import ( "k8s.io/apimachinery/pkg/api/resource" ci "github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/containerinsight" + "github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/k8s/k8sclient" + "github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/k8s/k8sutil" ) type mockCIMetric struct { @@ -87,6 +89,15 @@ func (m *mockNodeInfoProvider) NodeToAllocatableMap() map[string]v1.ResourceList } } +func (m *mockNodeInfoProvider) NodeToLabelsMap() map[string]map[k8sclient.Label]int8 { + return map[string]map[k8sclient.Label]int8{ + "hyperpod-testNode1": { + k8sclient.SageMakerNodeHealthStatus: int8(k8sutil.Schedulable), + }, + "hyperpod-testNode2": {}, + } +} + func (m *mockNodeInfoProvider) NodeToConditionsMap() map[string]map[v1.NodeConditionType]v1.ConditionStatus { return map[string]map[v1.NodeConditionType]v1.ConditionStatus{ "testNode1": {
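
Notes with illustrative sketches (appended for review; not part of the diff above).

End to end, the patch works like this: the node informer parses the node's sagemaker.amazonaws.com/node-health-status label into a HyperPodConditionType ordinal (k8sutil.ParseString), and on the leader, getHyperPodResiliencyMetrics fans that single value out into four 0/1 gauges whose names carry the hyper_pod_node_health_status_ prefix. Below is a minimal standalone sketch of that fan-out. The main scaffolding, the sample instance type, and the direct string comparison are illustrative simplifications; the patch itself compares parsed int8 ordinals via isLabelSet.

package main

import (
	"fmt"
	"strings"
)

// Mirrors ci.HyperPodConditionToMetric from the patch.
var hyperPodConditionToMetric = map[string]string{
	"Schedulable":                     "schedulable",
	"UnschedulablePendingReplacement": "unschedulable_pending_replacement",
	"UnschedulablePendingReboot":      "unschedulable_pending_reboot",
	"Unschedulable":                   "unschedulable",
}

func main() {
	// Inputs as the leader sees them: node name, instance type, and the
	// value of the sagemaker.amazonaws.com/node-health-status label.
	nodeName := "hyperpod-ip-192-168-12-170.ec2.internal"
	instanceType := "ml.g4dn.12xlarge"
	healthStatus := "Schedulable"

	// HyperPod nodes are detected purely by the "ml." instance-type prefix
	// (see isHyperPodNode in the patch).
	if !strings.HasPrefix(instanceType, "ml.") {
		return
	}

	// One gauge per condition; only the condition named by the label is 1.
	fields := map[string]int{}
	for condition, metric := range hyperPodConditionToMetric {
		value := 0
		if condition == healthStatus {
			value = 1
		}
		fields["hyper_pod_node_health_status_"+metric] = value
	}

	// The InstanceId attribute is the node name minus the "hyperpod-" prefix.
	attributes := map[string]string{
		"Type":       "HyperPodNode",
		"NodeName":   nodeName,
		"InstanceId": strings.TrimPrefix(nodeName, "hyperpod-"),
	}
	fmt.Println(fields, attributes)
}

Emitting all four conditions on every collection, with exactly one set to 1, lets downstream consumers count nodes per state with a plain sum and no gap-filling.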
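A usage sketch for the new clientset option, mirroring the leaderelection.go change: label capture is opt-in, so callers that only need cluster-level data pay no extra memory. This assumes code living inside this module (k8sclient is an internal package); the Nop logger is illustrative and nodeHealthLabels is a hypothetical helper name.

package k8sapiserver

import (
	"go.uber.org/zap"

	"github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/k8s/k8sclient"
)

func nodeHealthLabels() map[string]map[k8sclient.Label]int8 {
	logger := zap.NewNop() // the receiver passes its own logger here

	// Opt in to label capture; without it, NodeToLabelsMap logs a warning
	// and returns empty data.
	client := k8sclient.Get(logger,
		k8sclient.CaptureOnlyNodeLabelsInfo(true),
	)

	// Node name -> {SageMakerNodeHealthStatus: parsed condition ordinal}.
	return client.GetNodeClient().NodeToLabelsMap()
}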
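Finally, the contracts of the two small helpers, matching the patch's unit tests: k8sutil.ParseString maps a label string to its HyperPodConditionType ordinal plus an ok flag, and isLabelSet returns (1, true) when the node's stored ordinal equals the queried condition, (0, true) when the label exists with a different value, and (0, false) when the label is absent. A sketch, assuming it sits in the k8sapiserver package so it can reach the unexported isLabelSet; demoIsLabelSet is a hypothetical name.

package k8sapiserver

import (
	"fmt"

	"github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/k8s/k8sclient"
	"github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/k8s/k8sutil"
)

func demoIsLabelSet() {
	// "Schedulable" is ordinal 0 in the HyperPodConditionType enum.
	ord, ok := k8sutil.ParseString("Schedulable")
	fmt.Println(ord, ok) // 0 true

	labels := map[k8sclient.Label]int8{k8sclient.SageMakerNodeHealthStatus: ord}

	fmt.Println(isLabelSet(ord, labels, k8sclient.SageMakerNodeHealthStatus))                         // 1 true
	fmt.Println(isLabelSet(int8(k8sutil.Unschedulable), labels, k8sclient.SageMakerNodeHealthStatus)) // 0 true
	fmt.Println(isLabelSet(int8(k8sutil.Unschedulable), nil, k8sclient.SageMakerNodeHealthStatus))    // 0 false
}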