Skip to content

Commit

Permalink
[receiver/awscontainerinsightreceiver] Add new node level status metrics for capacity_pods, allocatable_pods and conditions (open-telemetry#17)
Browse files Browse the repository at this point in the history
  • Loading branch information
sky333999 authored May 24, 2023
1 parent 6531448 commit c9ec95a
Show file tree
Hide file tree
Showing 12 changed files with 491 additions and 67 deletions.
17 changes: 17 additions & 0 deletions internal/aws/containerinsight/const.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,14 @@ const (
FSInodesfree = "filesystem_inodes_free"
FSUtilization = "filesystem_utilization"

StatusConditionReady = "status_condition_ready"
StatusConditionDiskPressure = "status_condition_disk_pressure"
StatusConditionMemoryPressure = "status_condition_memory_pressure"
StatusConditionPIDPressure = "status_condition_pid_pressure"
StatusConditionNetworkUnavailable = "status_condition_network_unavailable"
StatusCapacityPods = "status_capacity_pods"
StatusAllocatablePods = "status_allocatable_pods"

RunningPodCount = "number_of_running_pods"
RunningContainerCount = "number_of_running_containers"
ContainerCount = "number_of_containers"
Expand Down Expand Up @@ -201,6 +209,15 @@ func init() {
FSInodesfree: UnitCount,
FSUtilization: UnitPercent,

// status metrics
StatusConditionReady: UnitCount,
StatusConditionDiskPressure: UnitCount,
StatusConditionMemoryPressure: UnitCount,
StatusConditionPIDPressure: UnitCount,
StatusConditionNetworkUnavailable: UnitCount,
StatusCapacityPods: UnitCount,
StatusAllocatablePods: UnitCount,

// cluster metrics
NodeCount: UnitCount,
FailedNodeCount: UnitCount,
Expand Down
33 changes: 32 additions & 1 deletion internal/aws/k8s/k8sclient/clientset.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,13 @@ import (
"path/filepath"
"reflect"
"sort"
"strconv"
"strings"
"sync"
"time"

"go.uber.org/zap"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
Expand Down Expand Up @@ -116,6 +118,28 @@ func InitSyncPollTimeout(pollTimeout time.Duration) Option {
}
}

// NodeSelector returns an Option that restricts the node client to nodes
// matching the given field selector. The selector string is embedded in the
// option name so that differently-scoped clients are cached separately.
func NodeSelector(selector fields.Selector) Option {
	applySelector := func(client *K8sClient) {
		client.nodeSelector = selector
	}
	return Option{
		name: "nodeSelector:" + selector.String(),
		set:  applySelector,
	}
}

// CaptureNodeLevelInfo returns an Option controlling whether node level
// info (capacity, allocatable, conditions) is captured and retained in
// memory by the node client.
func CaptureNodeLevelInfo(capture bool) Option {
	applyCapture := func(client *K8sClient) {
		client.captureNodeLevelInfo = capture
	}
	return Option{
		name: "captureNodeLevelInfo:" + strconv.FormatBool(capture),
		set:  applyCapture,
	}
}

func getStringifiedOptions(options ...Option) string {
var opts []string
for _, option := range options {
Expand Down Expand Up @@ -188,6 +212,9 @@ type K8sClient struct {
nodeMu sync.Mutex
node nodeClientWithStopper

nodeSelector fields.Selector
captureNodeLevelInfo bool

jobMu sync.Mutex
job jobClientWithStopper

Expand Down Expand Up @@ -274,7 +301,11 @@ func (c *K8sClient) ShutdownPodClient() {
func (c *K8sClient) GetNodeClient() NodeClient {
c.nodeMu.Lock()
if c.node == nil {
c.node = newNodeClient(c.clientSet, c.logger, nodeSyncCheckerOption(c.syncChecker))
opts := []nodeClientOption{nodeSyncCheckerOption(c.syncChecker), captureNodeLevelInfoOption(c.captureNodeLevelInfo)}
if c.nodeSelector != nil {
opts = append(opts, nodeSelectorOption(c.nodeSelector))
}
c.node = newNodeClient(c.clientSet, c.logger, opts...)
}
c.nodeMu.Unlock()
return c.node
Expand Down
5 changes: 5 additions & 0 deletions internal/aws/k8s/k8sclient/clientset_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (

"github.com/stretchr/testify/assert"
"go.uber.org/zap"
"k8s.io/apimachinery/pkg/fields"
)

func TestGetShutdown(t *testing.T) {
Expand All @@ -29,6 +30,8 @@ func TestGetShutdown(t *testing.T) {
KubeConfigPath(tmpConfigPath),
InitSyncPollInterval(10*time.Nanosecond),
InitSyncPollTimeout(20*time.Nanosecond),
NodeSelector(fields.OneTermEqualSelector("testField", "testVal")),
CaptureNodeLevelInfo(true),
)
assert.Equal(t, 1, len(optionsToK8sClient))
assert.NotNil(t, k8sClient.GetClientSet())
Expand All @@ -37,6 +40,8 @@ func TestGetShutdown(t *testing.T) {
assert.NotNil(t, k8sClient.GetNodeClient())
assert.NotNil(t, k8sClient.GetPodClient())
assert.NotNil(t, k8sClient.GetReplicaSetClient())
assert.True(t, k8sClient.captureNodeLevelInfo)
assert.Equal(t, "testField=testVal", k8sClient.nodeSelector.String())
k8sClient.Shutdown()
assert.Nil(t, k8sClient.ep)
assert.Nil(t, k8sClient.job)
Expand Down
97 changes: 93 additions & 4 deletions internal/aws/k8s/k8sclient/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"go.uber.org/zap"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/kubernetes"
Expand All @@ -41,6 +42,9 @@ type NodeClient interface {
ClusterFailedNodeCount() int
// Get the number of nodes for current cluster
ClusterNodeCount() int
NodeToCapacityMap() map[string]v1.ResourceList
NodeToAllocatableMap() map[string]v1.ResourceList
NodeToConditionsMap() map[string]map[v1.NodeConditionType]v1.ConditionStatus
}

type nodeClientOption func(*nodeClient)
Expand All @@ -51,16 +55,39 @@ func nodeSyncCheckerOption(checker initialSyncChecker) nodeClientOption {
}
}

// nodeSelectorOption sets the field selector that the node client applies
// to its list and watch calls against the Kubernetes API.
func nodeSelectorOption(selector fields.Selector) nodeClientOption {
	return func(client *nodeClient) {
		client.nodeSelector = selector
	}
}

// captureNodeLevelInfoOption toggles whether per-node capacity, allocatable
// and condition data is retained by the node client.
func captureNodeLevelInfoOption(capture bool) nodeClientOption {
	return func(client *nodeClient) {
		client.captureNodeLevelInfo = capture
	}
}

// nodeClient watches cluster nodes and aggregates cluster-level node counts
// plus, when captureNodeLevelInfo is set, per-node status information.
type nodeClient struct {
	stopChan chan struct{} // closed to stop the backing reflector goroutine
	store    *ObjStore     // cache of transformed *nodeInfo objects fed by the reflector
	logger   *zap.Logger

	stopped     bool
	syncChecker initialSyncChecker // optional checker used to wait for the initial sync

	// nodeSelector, when non-nil, restricts list/watch calls to matching nodes.
	nodeSelector fields.Selector

	// The node client can be used in several places, including code paths that execute on both leader and non-leader nodes.
	// But for logic on the leader node (for ex in k8sapiserver.go), there is no need to obtain node level info since only cluster
	// level info is needed there. Hence, this optimization allows us to save on memory by not capturing node level info when not needed.
	captureNodeLevelInfo bool

	// mu guards all aggregated fields below, which are rebuilt by refresh().
	mu                     sync.RWMutex
	clusterFailedNodeCount int
	clusterNodeCount       int
	// The three maps below are keyed by node name and are only populated
	// when captureNodeLevelInfo is true.
	nodeToCapacityMap    map[string]v1.ResourceList
	nodeToAllocatableMap map[string]v1.ResourceList
	nodeToConditionsMap  map[string]map[v1.NodeConditionType]v1.ConditionStatus
}

func (c *nodeClient) ClusterFailedNodeCount() int {
Expand All @@ -81,6 +108,42 @@ func (c *nodeClient) ClusterNodeCount() int {
return c.clusterNodeCount
}

// NodeToCapacityMap returns each node's capacity resource list keyed by
// node name. The result is empty unless captureNodeLevelInfo was enabled.
func (c *nodeClient) NodeToCapacityMap() map[string]v1.ResourceList {
	if !c.captureNodeLevelInfo {
		c.logger.Warn("trying to access node level info when captureNodeLevelInfo is not set, will return empty data")
	}
	// Rebuild the aggregated data if the store has seen changes.
	if c.store.GetResetRefreshStatus() {
		c.refresh()
	}
	c.mu.RLock()
	capacityByNode := c.nodeToCapacityMap
	c.mu.RUnlock()
	return capacityByNode
}

// NodeToAllocatableMap returns each node's allocatable resource list keyed
// by node name. The result is empty unless captureNodeLevelInfo was enabled.
func (c *nodeClient) NodeToAllocatableMap() map[string]v1.ResourceList {
	if !c.captureNodeLevelInfo {
		c.logger.Warn("trying to access node level info when captureNodeLevelInfo is not set, will return empty data")
	}
	// Rebuild the aggregated data if the store has seen changes.
	if c.store.GetResetRefreshStatus() {
		c.refresh()
	}
	c.mu.RLock()
	allocatableByNode := c.nodeToAllocatableMap
	c.mu.RUnlock()
	return allocatableByNode
}

// NodeToConditionsMap returns, per node name, a map of condition type to
// condition status. The result is empty unless captureNodeLevelInfo was
// enabled.
func (c *nodeClient) NodeToConditionsMap() map[string]map[v1.NodeConditionType]v1.ConditionStatus {
	if !c.captureNodeLevelInfo {
		c.logger.Warn("trying to access node level info when captureNodeLevelInfo is not set, will return empty data")
	}
	// Rebuild the aggregated data if the store has seen changes.
	if c.store.GetResetRefreshStatus() {
		c.refresh()
	}
	c.mu.RLock()
	conditionsByNode := c.nodeToConditionsMap
	c.mu.RUnlock()
	return conditionsByNode
}

func (c *nodeClient) refresh() {
c.mu.Lock()
defer c.mu.Unlock()
Expand All @@ -89,9 +152,22 @@ func (c *nodeClient) refresh() {

clusterFailedNodeCountNew := 0
clusterNodeCountNew := 0
nodeToCapacityMap := make(map[string]v1.ResourceList)
nodeToAllocatableMap := make(map[string]v1.ResourceList)
nodeToConditionsMap := make(map[string]map[v1.NodeConditionType]v1.ConditionStatus)

for _, obj := range objsList {
node := obj.(*nodeInfo)

if c.captureNodeLevelInfo {
nodeToCapacityMap[node.name] = node.capacity
nodeToAllocatableMap[node.name] = node.allocatable
conditionsMap := make(map[v1.NodeConditionType]v1.ConditionStatus)
for _, condition := range node.conditions {
conditionsMap[condition.Type] = condition.Status
}
nodeToConditionsMap[node.name] = conditionsMap
}
clusterNodeCountNew++

failed := false
Expand All @@ -115,11 +191,15 @@ func (c *nodeClient) refresh() {

c.clusterFailedNodeCount = clusterFailedNodeCountNew
c.clusterNodeCount = clusterNodeCountNew
c.nodeToCapacityMap = nodeToCapacityMap
c.nodeToAllocatableMap = nodeToAllocatableMap
c.nodeToConditionsMap = nodeToConditionsMap
}

func newNodeClient(clientSet kubernetes.Interface, logger *zap.Logger, options ...nodeClientOption) *nodeClient {
c := &nodeClient{
stopChan: make(chan struct{}),
logger: logger,
}

for _, option := range options {
Expand All @@ -128,7 +208,7 @@ func newNodeClient(clientSet kubernetes.Interface, logger *zap.Logger, options .

c.store = NewObjStore(transformFuncNode, logger)

lw := createNodeListWatch(clientSet)
lw := c.createNodeListWatch(clientSet)
reflector := cache.NewReflector(lw, &v1.Node{}, c.store, 0)
go reflector.Run(c.stopChan)

Expand All @@ -154,23 +234,32 @@ func transformFuncNode(obj interface{}) (interface{}, error) {
return nil, fmt.Errorf("input obj %v is not Node type", obj)
}
info := new(nodeInfo)
info.conditions = []*nodeCondition{}
info.name = node.Name
info.capacity = node.Status.Capacity
info.allocatable = node.Status.Allocatable
info.conditions = []*NodeCondition{}
for _, condition := range node.Status.Conditions {
info.conditions = append(info.conditions, &nodeCondition{
info.conditions = append(info.conditions, &NodeCondition{
Type: condition.Type,
Status: condition.Status,
})
}
return info, nil
}

func createNodeListWatch(client kubernetes.Interface) cache.ListerWatcher {
func (c *nodeClient) createNodeListWatch(client kubernetes.Interface) cache.ListerWatcher {
ctx := context.Background()
return &cache.ListWatch{
ListFunc: func(opts metav1.ListOptions) (runtime.Object, error) {
if c.nodeSelector != nil {
opts.FieldSelector = c.nodeSelector.String()
}
return client.CoreV1().Nodes().List(ctx, opts)
},
WatchFunc: func(opts metav1.ListOptions) (watch.Interface, error) {
if c.nodeSelector != nil {
opts.FieldSelector = c.nodeSelector.String()
}
return client.CoreV1().Nodes().Watch(ctx, opts)
},
}
Expand Down
7 changes: 5 additions & 2 deletions internal/aws/k8s/k8sclient/node_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,13 @@ import (
)

// nodeInfo is the reduced representation of a v1.Node kept in the object
// store: just the fields needed for count and status metrics.
type nodeInfo struct {
	name        string           // node name (v1.Node.Name)
	conditions  []*NodeCondition // trimmed copies of the node's status conditions
	capacity    v1.ResourceList  // node.Status.Capacity
	allocatable v1.ResourceList  // node.Status.Allocatable
}

type nodeCondition struct {
// NodeCondition is a trimmed-down copy of a node's v1.NodeCondition,
// retaining only the condition type and its status.
type NodeCondition struct {
	Type   v1.NodeConditionType
	Status v1.ConditionStatus
}
Loading

0 comments on commit c9ec95a

Please sign in to comment.