diff --git a/pkg/kubelet/apis/podresources/server_v1.go b/pkg/kubelet/apis/podresources/server_v1.go index 325a7f727ec01..2e6425a2183a6 100644 --- a/pkg/kubelet/apis/podresources/server_v1.go +++ b/pkg/kubelet/apis/podresources/server_v1.go @@ -70,3 +70,33 @@ func (p *v1PodResourcesServer) List(ctx context.Context, req *v1.ListPodResource PodResources: podResources, }, nil } + +// GetAllocatableResources returns information about all the resources known by the server - this more like the capacity, not like the current amount of free resources. +func (p *v1PodResourcesServer) GetAllocatableResources(ctx context.Context, req *v1.AllocatableResourcesRequest) (*v1.AllocatableResourcesResponse, error) { + metrics.PodResourcesEndpointRequestsTotalCount.WithLabelValues("v1").Inc() + + allDevices := p.devicesProvider.GetAllocatableDevices() + var respDevs []*v1.ContainerDevices + + for resourceName, resourceDevs := range allDevices { + for devID, dev := range resourceDevs { + for _, node := range dev.GetTopology().GetNodes() { + numaNode := node.GetID() + respDevs = append(respDevs, &v1.ContainerDevices{ + ResourceName: resourceName, + DeviceIds: []string{devID}, + Topology: &v1.TopologyInfo{ + Nodes: []*v1.NUMANode{ + {ID: numaNode}, + }, + }, + }) + } + } + } + + return &v1.AllocatableResourcesResponse{ + Devices: respDevs, + CpuIds: p.cpusProvider.GetAllocatableCPUs().ToSliceNoSortInt64(), + }, nil +} diff --git a/pkg/kubelet/apis/podresources/server_v1_test.go b/pkg/kubelet/apis/podresources/server_v1_test.go index ed6033c3d50e9..73dbd54a0f183 100644 --- a/pkg/kubelet/apis/podresources/server_v1_test.go +++ b/pkg/kubelet/apis/podresources/server_v1_test.go @@ -25,8 +25,10 @@ import ( "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" + pluginapi "k8s.io/kubelet/pkg/apis/deviceplugin/v1beta1" podresourcesapi "k8s.io/kubelet/pkg/apis/podresources/v1" "k8s.io/kubernetes/pkg/kubelet/cm/cpuset" + "k8s.io/kubernetes/pkg/kubelet/cm/devicemanager" ) func TestListPodResourcesV1(t *testing.T) { @@ -138,6 +140,8 @@ func TestListPodResourcesV1(t *testing.T) { m.On("GetDevices", string(podUID), containerName).Return(tc.devices) m.On("GetCPUs", string(podUID), containerName).Return(tc.cpus) m.On("UpdateAllocatedDevices").Return() + m.On("GetAllocatableCPUs").Return(cpuset.CPUSet{}) + m.On("GetAllocatableDevices").Return(devicemanager.NewResourceDeviceInstances()) server := NewV1PodResourcesServer(m, m, m) resp, err := server.List(context.TODO(), &podresourcesapi.ListPodResourcesRequest{}) if err != nil { @@ -150,6 +154,140 @@ func TestListPodResourcesV1(t *testing.T) { } } +func TestAllocatableResources(t *testing.T) { + allDevs := devicemanager.ResourceDeviceInstances{ + "resource": { + "dev0": { + ID: "GPU-fef8089b-4820-abfc-e83e-94318197576e", + Health: "Healthy", + Topology: &pluginapi.TopologyInfo{ + Nodes: []*pluginapi.NUMANode{ + { + ID: 0, + }, + }, + }, + }, + "dev1": { + ID: "VF-8536e1e8-9dc6-4645-9aea-882db92e31e7", + Health: "Healthy", + Topology: &pluginapi.TopologyInfo{ + Nodes: []*pluginapi.NUMANode{ + { + ID: 1, + }, + }, + }, + }, + }, + } + allCPUs := cpuset.NewCPUSet(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16) + + for _, tc := range []struct { + desc string + allCPUs cpuset.CPUSet + allDevices devicemanager.ResourceDeviceInstances + expectedAllocatableResourcesResponse *podresourcesapi.AllocatableResourcesResponse + }{ + { + desc: "no devices, no CPUs", + allCPUs: cpuset.CPUSet{}, + allDevices: devicemanager.NewResourceDeviceInstances(), + expectedAllocatableResourcesResponse: &podresourcesapi.AllocatableResourcesResponse{}, + }, + { + desc: "no devices, all CPUs", + allCPUs: allCPUs, + allDevices: devicemanager.NewResourceDeviceInstances(), + expectedAllocatableResourcesResponse: &podresourcesapi.AllocatableResourcesResponse{ + CpuIds: allCPUs.ToSliceNoSortInt64(), + }, + }, + { + desc: "with devices, all CPUs", + allCPUs: allCPUs, + allDevices: allDevs, + expectedAllocatableResourcesResponse: &podresourcesapi.AllocatableResourcesResponse{ + CpuIds: allCPUs.ToSliceNoSortInt64(), + Devices: []*podresourcesapi.ContainerDevices{ + { + ResourceName: "resource", + DeviceIds: []string{"dev0"}, + Topology: &podresourcesapi.TopologyInfo{ + Nodes: []*podresourcesapi.NUMANode{ + { + ID: 0, + }, + }, + }, + }, + { + ResourceName: "resource", + DeviceIds: []string{"dev1"}, + Topology: &podresourcesapi.TopologyInfo{ + Nodes: []*podresourcesapi.NUMANode{ + { + ID: 1, + }, + }, + }, + }, + }, + }, + }, + { + desc: "with devices, no CPUs", + allCPUs: cpuset.CPUSet{}, + allDevices: allDevs, + expectedAllocatableResourcesResponse: &podresourcesapi.AllocatableResourcesResponse{ + Devices: []*podresourcesapi.ContainerDevices{ + { + ResourceName: "resource", + DeviceIds: []string{"dev0"}, + Topology: &podresourcesapi.TopologyInfo{ + Nodes: []*podresourcesapi.NUMANode{ + { + ID: 0, + }, + }, + }, + }, + { + ResourceName: "resource", + DeviceIds: []string{"dev1"}, + Topology: &podresourcesapi.TopologyInfo{ + Nodes: []*podresourcesapi.NUMANode{ + { + ID: 1, + }, + }, + }, + }, + }, + }, + }, + } { + t.Run(tc.desc, func(t *testing.T) { + m := new(mockProvider) + m.On("GetDevices", "", "").Return([]*podresourcesapi.ContainerDevices{}) + m.On("GetCPUs", "", "").Return(cpuset.CPUSet{}) + m.On("UpdateAllocatedDevices").Return() + m.On("GetAllocatableDevices").Return(tc.allDevices) + m.On("GetAllocatableCPUs").Return(tc.allCPUs) + server := NewV1PodResourcesServer(m, m, m) + + resp, err := server.GetAllocatableResources(context.TODO(), &podresourcesapi.AllocatableResourcesRequest{}) + if err != nil { + t.Errorf("want err = %v, got %q", nil, err) + } + + if !equalAllocatableResourcesResponse(tc.expectedAllocatableResourcesResponse, resp) { + t.Errorf("want resp = %s, got %s", tc.expectedAllocatableResourcesResponse.String(), resp.String()) + } + }) + } +} + func equalListResponse(respA, respB *podresourcesapi.ListPodResourcesResponse) bool { if len(respA.PodResources) != len(respB.PodResources) { return false @@ -177,26 +315,49 @@ func equalListResponse(respA, respB *podresourcesapi.ListPodResourcesResponse) b return false } - if len(cntA.Devices) != len(cntB.Devices) { + if !equalContainerDevices(cntA.Devices, cntB.Devices) { return false } + } + } + return true +} - for kdx := 0; kdx < len(cntA.Devices); kdx++ { - cntDevA := cntA.Devices[kdx] - cntDevB := cntB.Devices[kdx] - - if cntDevA.ResourceName != cntDevB.ResourceName { - return false - } - if !equalTopology(cntDevA.Topology, cntDevB.Topology) { - return false - } - if !equalStrings(cntDevA.DeviceIds, cntDevB.DeviceIds) { - return false - } - } +func equalContainerDevices(devA, devB []*podresourcesapi.ContainerDevices) bool { + if len(devA) != len(devB) { + return false + } + + // the ordering of container devices in the response is not defined, + // so we need to do a full scan, failing at first mismatch + for idx := 0; idx < len(devA); idx++ { + if !containsContainerDevice(devA[idx], devB) { + return false } } + + return true +} + +func containsContainerDevice(cntDev *podresourcesapi.ContainerDevices, devs []*podresourcesapi.ContainerDevices) bool { + for idx := 0; idx < len(devs); idx++ { + if equalContainerDevice(cntDev, devs[idx]) { + return true + } + } + return false +} + +func equalContainerDevice(cntDevA, cntDevB *podresourcesapi.ContainerDevices) bool { + if cntDevA.ResourceName != cntDevB.ResourceName { + return false + } + if !equalTopology(cntDevA.Topology, cntDevB.Topology) { + return false + } + if !equalStrings(cntDevA.DeviceIds, cntDevB.DeviceIds) { + return false + } return true } @@ -231,3 +392,10 @@ func equalTopology(a, b *podresourcesapi.TopologyInfo) bool { } return reflect.DeepEqual(a, b) } + +func equalAllocatableResourcesResponse(respA, respB *podresourcesapi.AllocatableResourcesResponse) bool { + if !equalInt64s(respA.CpuIds, respB.CpuIds) { + return false + } + return equalContainerDevices(respA.Devices, respB.Devices) +} diff --git a/pkg/kubelet/apis/podresources/server_v1alpha1_test.go b/pkg/kubelet/apis/podresources/server_v1alpha1_test.go index 9ce97900d7f5d..85156df770eae 100644 --- a/pkg/kubelet/apis/podresources/server_v1alpha1_test.go +++ b/pkg/kubelet/apis/podresources/server_v1alpha1_test.go @@ -28,6 +28,7 @@ import ( podresourcesv1 "k8s.io/kubelet/pkg/apis/podresources/v1" "k8s.io/kubelet/pkg/apis/podresources/v1alpha1" "k8s.io/kubernetes/pkg/kubelet/cm/cpuset" + "k8s.io/kubernetes/pkg/kubelet/cm/devicemanager" ) type mockProvider struct { @@ -53,6 +54,16 @@ func (m *mockProvider) UpdateAllocatedDevices() { m.Called() } +func (m *mockProvider) GetAllocatableDevices() devicemanager.ResourceDeviceInstances { + args := m.Called() + return args.Get(0).(devicemanager.ResourceDeviceInstances) +} + +func (m *mockProvider) GetAllocatableCPUs() cpuset.CPUSet { + args := m.Called() + return args.Get(0).(cpuset.CPUSet) +} + func TestListPodResourcesV1alpha1(t *testing.T) { podName := "pod-name" podNamespace := "pod-namespace" diff --git a/pkg/kubelet/apis/podresources/types.go b/pkg/kubelet/apis/podresources/types.go index 40d00db7954fe..6199e3c52b83b 100644 --- a/pkg/kubelet/apis/podresources/types.go +++ b/pkg/kubelet/apis/podresources/types.go @@ -20,12 +20,17 @@ import ( "k8s.io/api/core/v1" podresourcesapi "k8s.io/kubelet/pkg/apis/podresources/v1" "k8s.io/kubernetes/pkg/kubelet/cm/cpuset" + "k8s.io/kubernetes/pkg/kubelet/cm/devicemanager" ) // DevicesProvider knows how to provide the devices used by the given container type DevicesProvider interface { - GetDevices(podUID, containerName string) []*podresourcesapi.ContainerDevices + // UpdateAllocatedDevices frees any Devices that are bound to terminated pods. UpdateAllocatedDevices() + // GetDevices returns information about the devices assigned to pods and containers + GetDevices(podUID, containerName string) []*podresourcesapi.ContainerDevices + // GetAllocatableDevices returns information about all the devices known to the manager + GetAllocatableDevices() devicemanager.ResourceDeviceInstances } // PodsProvider knows how to provide the pods admitted by the node @@ -35,5 +40,8 @@ type PodsProvider interface { // CPUsProvider knows how to provide the cpus used by the given container type CPUsProvider interface { + // GetCPUs returns information about the cpus assigned to pods and containers GetCPUs(podUID, containerName string) cpuset.CPUSet + // GetAllocatableCPUs returns the allocatable (not allocated) CPUs + GetAllocatableCPUs() cpuset.CPUSet } diff --git a/pkg/kubelet/cm/container_manager.go b/pkg/kubelet/cm/container_manager.go index cefb40f9512fa..dbbde9a1d3c46 100644 --- a/pkg/kubelet/cm/container_manager.go +++ b/pkg/kubelet/cm/container_manager.go @@ -26,8 +26,8 @@ import ( // TODO: Migrate kubelet to either use its own internal objects or client library. v1 "k8s.io/api/core/v1" internalapi "k8s.io/cri-api/pkg/apis" - podresourcesapi "k8s.io/kubelet/pkg/apis/podresources/v1" kubeletconfig "k8s.io/kubernetes/pkg/kubelet/apis/config" + "k8s.io/kubernetes/pkg/kubelet/apis/podresources" "k8s.io/kubernetes/pkg/kubelet/cm/cpuset" "k8s.io/kubernetes/pkg/kubelet/config" kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" @@ -103,12 +103,6 @@ type ContainerManager interface { // registration. GetPluginRegistrationHandler() cache.PluginHandler - // GetDevices returns information about the devices assigned to pods and containers - GetDevices(podUID, containerName string) []*podresourcesapi.ContainerDevices - - // GetCPUs returns information about the cpus assigned to pods and containers - GetCPUs(podUID, containerName string) cpuset.CPUSet - // ShouldResetExtendedResourceCapacity returns whether or not the extended resources should be zeroed, // due to node recreation. ShouldResetExtendedResourceCapacity() bool @@ -118,6 +112,10 @@ type ContainerManager interface { // UpdateAllocatedDevices frees any Devices that are bound to terminated pods. UpdateAllocatedDevices() + + // Implements the podresources Provider API for CPUs and Devices + podresources.CPUsProvider + podresources.DevicesProvider } type NodeConfig struct { diff --git a/pkg/kubelet/cm/container_manager_linux.go b/pkg/kubelet/cm/container_manager_linux.go index a19e218026878..e9dc437d03974 100644 --- a/pkg/kubelet/cm/container_manager_linux.go +++ b/pkg/kubelet/cm/container_manager_linux.go @@ -1073,10 +1073,18 @@ func (cm *containerManagerImpl) GetDevices(podUID, containerName string) []*podr return cm.deviceManager.GetDevices(podUID, containerName) } +func (cm *containerManagerImpl) GetAllocatableDevices() devicemanager.ResourceDeviceInstances { + return cm.deviceManager.GetAllocatableDevices() +} + func (cm *containerManagerImpl) GetCPUs(podUID, containerName string) cpuset.CPUSet { return cm.cpuManager.GetCPUs(podUID, containerName).Clone() } +func (cm *containerManagerImpl) GetAllocatableCPUs() cpuset.CPUSet { + return cm.cpuManager.GetAllocatableCPUs() +} + func (cm *containerManagerImpl) ShouldResetExtendedResourceCapacity() bool { return cm.deviceManager.ShouldResetExtendedResourceCapacity() } diff --git a/pkg/kubelet/cm/container_manager_stub.go b/pkg/kubelet/cm/container_manager_stub.go index 5c8e835a39beb..e84cb9d3e9684 100644 --- a/pkg/kubelet/cm/container_manager_stub.go +++ b/pkg/kubelet/cm/container_manager_stub.go @@ -25,6 +25,7 @@ import ( podresourcesapi "k8s.io/kubelet/pkg/apis/podresources/v1" "k8s.io/kubernetes/pkg/kubelet/cm/cpumanager" "k8s.io/kubernetes/pkg/kubelet/cm/cpuset" + "k8s.io/kubernetes/pkg/kubelet/cm/devicemanager" "k8s.io/kubernetes/pkg/kubelet/cm/memorymanager" "k8s.io/kubernetes/pkg/kubelet/cm/topologymanager" "k8s.io/kubernetes/pkg/kubelet/config" @@ -131,6 +132,14 @@ func (cm *containerManagerStub) GetCPUs(_, _ string) cpuset.CPUSet { return cpuset.CPUSet{} } +func (cm *containerManagerStub) GetAllocatableDevices() devicemanager.ResourceDeviceInstances { + return nil +} + +func (cm *containerManagerStub) GetAllocatableCPUs() cpuset.CPUSet { + return cpuset.CPUSet{} +} + func NewStubContainerManager() ContainerManager { return &containerManagerStub{shouldResetExtendedResourceCapacity: false} } diff --git a/pkg/kubelet/cm/container_manager_windows.go b/pkg/kubelet/cm/container_manager_windows.go index 54ca4f53d01ee..a39fb404a3883 100644 --- a/pkg/kubelet/cm/container_manager_windows.go +++ b/pkg/kubelet/cm/container_manager_windows.go @@ -36,6 +36,7 @@ import ( kubefeatures "k8s.io/kubernetes/pkg/features" "k8s.io/kubernetes/pkg/kubelet/cadvisor" "k8s.io/kubernetes/pkg/kubelet/cm/cpumanager" + "k8s.io/kubernetes/pkg/kubelet/cm/cpuset" "k8s.io/kubernetes/pkg/kubelet/cm/devicemanager" "k8s.io/kubernetes/pkg/kubelet/cm/memorymanager" "k8s.io/kubernetes/pkg/kubelet/cm/topologymanager" @@ -235,3 +236,11 @@ func (cm *containerManagerImpl) UpdateAllocatedDevices() { func (cm *containerManagerImpl) GetCPUs(_, _ string) cpuset.CPUSet { return cpuset.CPUSet{} } + +func (cm *containerManagerImpl) GetAllocatableCPUs() cpuset.CPUSet { + return cpuset.CPUSet{} +} + +func (cm *containerManagerImpl) GetAllocatableDevices() devicemanager.ResourceDeviceInstances { + return nil +} diff --git a/pkg/kubelet/cm/cpumanager/cpu_manager.go b/pkg/kubelet/cm/cpumanager/cpu_manager.go index c0d0f1aa69b9b..673e627db080d 100644 --- a/pkg/kubelet/cm/cpumanager/cpu_manager.go +++ b/pkg/kubelet/cm/cpumanager/cpu_manager.go @@ -85,6 +85,9 @@ type Manager interface { // and is consulted to achieve NUMA aware resource alignment per Pod // among this and other resource controllers. GetPodTopologyHints(pod *v1.Pod) map[string][]topologymanager.TopologyHint + + // GetAllocatableCPUs returns the assignable (not allocated) CPUs + GetAllocatableCPUs() cpuset.CPUSet } type manager struct { @@ -124,6 +127,9 @@ type manager struct { // stateFileDirectory holds the directory where the state file for checkpoints is held. stateFileDirectory string + + // allocatableCPUs is the set of online CPUs as reported by the system + allocatableCPUs cpuset.CPUSet } var _ Manager = &manager{} @@ -150,6 +156,7 @@ func NewManager(cpuPolicyName string, reconcilePeriod time.Duration, machineInfo return nil, err } klog.Infof("[cpumanager] detected CPU topology: %v", topo) + reservedCPUs, ok := nodeAllocatableReservation[v1.ResourceCPU] if !ok { // The static policy cannot initialize without this information. @@ -210,6 +217,8 @@ func (m *manager) Start(activePods ActivePodsFunc, sourcesReady config.SourcesRe return err } + m.allocatableCPUs = m.policy.GetAllocatableCPUs(m.state) + if m.policy.Name() == string(PolicyNone) { return nil } @@ -296,6 +305,10 @@ func (m *manager) GetPodTopologyHints(pod *v1.Pod) map[string][]topologymanager. return m.policy.GetPodTopologyHints(m.state, pod) } +func (m *manager) GetAllocatableCPUs() cpuset.CPUSet { + return m.allocatableCPUs.Clone() +} + type reconciledContainer struct { podName string containerName string diff --git a/pkg/kubelet/cm/cpumanager/cpu_manager_test.go b/pkg/kubelet/cm/cpumanager/cpu_manager_test.go index 82f38262c39cd..51c6ad99251bc 100644 --- a/pkg/kubelet/cm/cpumanager/cpu_manager_test.go +++ b/pkg/kubelet/cm/cpumanager/cpu_manager_test.go @@ -120,6 +120,10 @@ func (p *mockPolicy) GetPodTopologyHints(s state.State, pod *v1.Pod) map[string] return nil } +func (p *mockPolicy) GetAllocatableCPUs(m state.State) cpuset.CPUSet { + return cpuset.NewCPUSet() +} + type mockRuntimeService struct { err error } diff --git a/pkg/kubelet/cm/cpumanager/fake_cpu_manager.go b/pkg/kubelet/cm/cpumanager/fake_cpu_manager.go index 478534b1c1c02..1c7fa8b549847 100644 --- a/pkg/kubelet/cm/cpumanager/fake_cpu_manager.go +++ b/pkg/kubelet/cm/cpumanager/fake_cpu_manager.go @@ -74,6 +74,11 @@ func (m *fakeManager) GetCPUs(podUID, containerName string) cpuset.CPUSet { return cpuset.CPUSet{} } +func (m *fakeManager) GetAllocatableCPUs() cpuset.CPUSet { + klog.Infof("[fake cpumanager] Get Allocatable Cpus") + return cpuset.CPUSet{} +} + // NewFakeManager creates empty/fake cpu manager func NewFakeManager() Manager { return &fakeManager{ diff --git a/pkg/kubelet/cm/cpumanager/policy.go b/pkg/kubelet/cm/cpumanager/policy.go index 54565e5023c08..dd5d977a12013 100644 --- a/pkg/kubelet/cm/cpumanager/policy.go +++ b/pkg/kubelet/cm/cpumanager/policy.go @@ -19,6 +19,7 @@ package cpumanager import ( "k8s.io/api/core/v1" "k8s.io/kubernetes/pkg/kubelet/cm/cpumanager/state" + "k8s.io/kubernetes/pkg/kubelet/cm/cpuset" "k8s.io/kubernetes/pkg/kubelet/cm/topologymanager" ) @@ -38,4 +39,6 @@ type Policy interface { // and is consulted to achieve NUMA aware resource alignment per Pod // among this and other resource controllers. GetPodTopologyHints(s state.State, pod *v1.Pod) map[string][]topologymanager.TopologyHint + // GetAllocatableCPUs returns the assignable (not allocated) CPUs + GetAllocatableCPUs(m state.State) cpuset.CPUSet } diff --git a/pkg/kubelet/cm/cpumanager/policy_none.go b/pkg/kubelet/cm/cpumanager/policy_none.go index abc1c0632b149..5b8f094d2d66d 100644 --- a/pkg/kubelet/cm/cpumanager/policy_none.go +++ b/pkg/kubelet/cm/cpumanager/policy_none.go @@ -20,6 +20,7 @@ import ( "k8s.io/api/core/v1" "k8s.io/klog/v2" "k8s.io/kubernetes/pkg/kubelet/cm/cpumanager/state" + "k8s.io/kubernetes/pkg/kubelet/cm/cpuset" "k8s.io/kubernetes/pkg/kubelet/cm/topologymanager" ) @@ -30,7 +31,7 @@ var _ Policy = &nonePolicy{} // PolicyNone name of none policy const PolicyNone policyName = "none" -// NewNonePolicy returns a cupset manager policy that does nothing +// NewNonePolicy returns a cpuset manager policy that does nothing func NewNonePolicy() Policy { return &nonePolicy{} } @@ -59,3 +60,12 @@ func (p *nonePolicy) GetTopologyHints(s state.State, pod *v1.Pod, container *v1. func (p *nonePolicy) GetPodTopologyHints(s state.State, pod *v1.Pod) map[string][]topologymanager.TopologyHint { return nil } + +// Assignable CPUs are the ones that can be exclusively allocated to pods that meet the exclusivity requirement +// (ie guaranteed QoS class and integral CPU request). +// Assignability of CPUs as a concept is only applicable in case of static policy i.e. scenarios where workloads +// CAN get exclusive access to core(s). +// Hence, we return empty set here: no cpus are assignable according to above definition with this policy. +func (p *nonePolicy) GetAllocatableCPUs(m state.State) cpuset.CPUSet { + return cpuset.NewCPUSet() +} diff --git a/pkg/kubelet/cm/cpumanager/policy_none_test.go b/pkg/kubelet/cm/cpumanager/policy_none_test.go index 732bce2a47222..971279710962d 100644 --- a/pkg/kubelet/cm/cpumanager/policy_none_test.go +++ b/pkg/kubelet/cm/cpumanager/policy_none_test.go @@ -65,3 +65,24 @@ func TestNonePolicyRemove(t *testing.T) { t.Errorf("NonePolicy RemoveContainer() error. expected no error but got %v", err) } } + +func TestNonePolicyGetAllocatableCPUs(t *testing.T) { + // any random topology is fine + + var cpuIDs []int + for cpuID := range topoSingleSocketHT.CPUDetails { + cpuIDs = append(cpuIDs, cpuID) + } + + policy := &nonePolicy{} + + st := &mockState{ + assignments: state.ContainerCPUAssignments{}, + defaultCPUSet: cpuset.NewCPUSet(cpuIDs...), + } + + cpus := policy.GetAllocatableCPUs(st) + if cpus.Size() != 0 { + t.Errorf("NonePolicy GetAllocatableCPUs() error. expected empty set, returned: %v", cpus) + } +} diff --git a/pkg/kubelet/cm/cpumanager/policy_static.go b/pkg/kubelet/cm/cpumanager/policy_static.go index c3309ef728057..ed04f64ae1438 100644 --- a/pkg/kubelet/cm/cpumanager/policy_static.go +++ b/pkg/kubelet/cm/cpumanager/policy_static.go @@ -187,8 +187,8 @@ func (p *staticPolicy) validateState(s state.State) error { return nil } -// assignableCPUs returns the set of unassigned CPUs minus the reserved set. -func (p *staticPolicy) assignableCPUs(s state.State) cpuset.CPUSet { +// GetAllocatableCPUs returns the set of unassigned CPUs minus the reserved set. +func (p *staticPolicy) GetAllocatableCPUs(s state.State) cpuset.CPUSet { return s.GetDefaultCPUSet().Difference(p.reserved) } @@ -258,14 +258,14 @@ func (p *staticPolicy) RemoveContainer(s state.State, podUID string, containerNa func (p *staticPolicy) allocateCPUs(s state.State, numCPUs int, numaAffinity bitmask.BitMask, reusableCPUs cpuset.CPUSet) (cpuset.CPUSet, error) { klog.Infof("[cpumanager] allocateCpus: (numCPUs: %d, socket: %v)", numCPUs, numaAffinity) - assignableCPUs := p.assignableCPUs(s).Union(reusableCPUs) + allocatableCPUs := p.GetAllocatableCPUs(s).Union(reusableCPUs) // If there are aligned CPUs in numaAffinity, attempt to take those first. result := cpuset.NewCPUSet() if numaAffinity != nil { alignedCPUs := cpuset.NewCPUSet() for _, numaNodeID := range numaAffinity.GetBits() { - alignedCPUs = alignedCPUs.Union(assignableCPUs.Intersection(p.topology.CPUDetails.CPUsInNUMANodes(numaNodeID))) + alignedCPUs = alignedCPUs.Union(allocatableCPUs.Intersection(p.topology.CPUDetails.CPUsInNUMANodes(numaNodeID))) } numAlignedToAlloc := alignedCPUs.Size() @@ -282,7 +282,7 @@ func (p *staticPolicy) allocateCPUs(s state.State, numCPUs int, numaAffinity bit } // Get any remaining CPUs from what's leftover after attempting to grab aligned ones. - remainingCPUs, err := takeByTopology(p.topology, assignableCPUs.Difference(result), numCPUs-result.Size()) + remainingCPUs, err := takeByTopology(p.topology, allocatableCPUs.Difference(result), numCPUs-result.Size()) if err != nil { return cpuset.NewCPUSet(), err } @@ -367,11 +367,9 @@ func (p *staticPolicy) GetTopologyHints(s state.State, pod *v1.Pod, container *v } } - // Get a list of available CPUs. - available := p.assignableCPUs(s) - // Get a list of reusable CPUs (e.g. CPUs reused from initContainers). // It should be an empty CPUSet for a newly created pod. + available := p.GetAllocatableCPUs(s) reusable := p.cpusToReuse[string(pod.UID)] // Generate hints. @@ -423,7 +421,7 @@ func (p *staticPolicy) GetPodTopologyHints(s state.State, pod *v1.Pod) map[strin } // Get a list of available CPUs. - available := p.assignableCPUs(s) + available := p.GetAllocatableCPUs(s) // Get a list of reusable CPUs (e.g. CPUs reused from initContainers). // It should be an empty CPUSet for a newly created pod. diff --git a/pkg/kubelet/cm/devicemanager/manager.go b/pkg/kubelet/cm/devicemanager/manager.go index 445cf34ddb8e9..5f10e560e8db5 100644 --- a/pkg/kubelet/cm/devicemanager/manager.go +++ b/pkg/kubelet/cm/devicemanager/manager.go @@ -85,8 +85,8 @@ type ManagerImpl struct { // e.g. a new device is advertised, two old devices are deleted and a running device fails. callback monitorCallback - // allDevices is a map by resource name of all the devices currently registered to the device manager - allDevices map[string]map[string]pluginapi.Device + // allDevices holds all the devices currently registered to the device manager + allDevices ResourceDeviceInstances // healthyDevices contains all of the registered healthy resourceNames and their exported device IDs. healthyDevices map[string]sets.String @@ -1068,6 +1068,15 @@ func (m *ManagerImpl) isDevicePluginResource(resource string) bool { return false } +// GetAllocatableDevices returns information about all the devices known to the manager +func (m *ManagerImpl) GetAllocatableDevices() ResourceDeviceInstances { + m.mutex.Lock() + resp := m.allDevices.Clone() + m.mutex.Unlock() + klog.V(4).Infof("known devices: %d", len(resp)) + return resp +} + // GetDevices returns the devices used by the specified container func (m *ManagerImpl) GetDevices(podUID, containerName string) []*podresourcesapi.ContainerDevices { return m.podDevices.getContainerDevices(podUID, containerName) diff --git a/pkg/kubelet/cm/devicemanager/manager_stub.go b/pkg/kubelet/cm/devicemanager/manager_stub.go index ea04f5a16c021..19da4538ab165 100644 --- a/pkg/kubelet/cm/devicemanager/manager_stub.go +++ b/pkg/kubelet/cm/devicemanager/manager_stub.go @@ -93,3 +93,8 @@ func (h *ManagerStub) ShouldResetExtendedResourceCapacity() bool { func (h *ManagerStub) UpdateAllocatedDevices() { return } + +// GetAllocatableDevices returns nothing +func (h *ManagerStub) GetAllocatableDevices() ResourceDeviceInstances { + return nil +} diff --git a/pkg/kubelet/cm/devicemanager/pod_devices.go b/pkg/kubelet/cm/devicemanager/pod_devices.go index 8e20eb7bb7bd9..93bfd79ab9c43 100644 --- a/pkg/kubelet/cm/devicemanager/pod_devices.go +++ b/pkg/kubelet/cm/devicemanager/pod_devices.go @@ -346,3 +346,21 @@ func (pdev *podDevices) getContainerDevices(podUID, contName string) []*podresou } return cDev } + +// ResourceDeviceInstances is a map ping resource name -> device name -> device data +type ResourceDeviceInstances map[string]map[string]pluginapi.Device + +func NewResourceDeviceInstances() ResourceDeviceInstances { + return make(ResourceDeviceInstances) +} + +func (rdev ResourceDeviceInstances) Clone() ResourceDeviceInstances { + clone := NewResourceDeviceInstances() + for resourceName, resourceDevs := range rdev { + clone[resourceName] = make(map[string]pluginapi.Device) + for devID, dev := range resourceDevs { + clone[resourceName][devID] = dev + } + } + return clone +} diff --git a/pkg/kubelet/cm/devicemanager/types.go b/pkg/kubelet/cm/devicemanager/types.go index 779d91e3df123..a8ef677316a0f 100644 --- a/pkg/kubelet/cm/devicemanager/types.go +++ b/pkg/kubelet/cm/devicemanager/types.go @@ -77,6 +77,9 @@ type Manager interface { // UpdateAllocatedDevices frees any Devices that are bound to terminated pods. UpdateAllocatedDevices() + + // GetAllocatableDevices returns information about all the devices known to the manager + GetAllocatableDevices() ResourceDeviceInstances } // DeviceRunContainerOptions contains the combined container runtime settings to consume its allocated devices. diff --git a/pkg/kubelet/cm/fake_container_manager.go b/pkg/kubelet/cm/fake_container_manager.go index 67aaaec506068..454d4f6620290 100644 --- a/pkg/kubelet/cm/fake_container_manager.go +++ b/pkg/kubelet/cm/fake_container_manager.go @@ -26,6 +26,7 @@ import ( podresourcesapi "k8s.io/kubelet/pkg/apis/podresources/v1" "k8s.io/kubernetes/pkg/kubelet/cm/cpumanager" "k8s.io/kubernetes/pkg/kubelet/cm/cpuset" + "k8s.io/kubernetes/pkg/kubelet/cm/devicemanager" "k8s.io/kubernetes/pkg/kubelet/cm/memorymanager" "k8s.io/kubernetes/pkg/kubelet/cm/topologymanager" "k8s.io/kubernetes/pkg/kubelet/config" @@ -202,3 +203,17 @@ func (cm *FakeContainerManager) GetCPUs(_, _ string) cpuset.CPUSet { cm.CalledFunctions = append(cm.CalledFunctions, "GetCPUs") return cpuset.CPUSet{} } + +func (cm *FakeContainerManager) GetAllocatableDevices() devicemanager.ResourceDeviceInstances { + cm.Lock() + defer cm.Unlock() + cm.CalledFunctions = append(cm.CalledFunctions, "GetAllocatableDevices") + return nil +} + +func (cm *FakeContainerManager) GetAllocatableCPUs() cpuset.CPUSet { + cm.Lock() + defer cm.Unlock() + cm.CalledFunctions = append(cm.CalledFunctions, "GetAllocatableCPUs") + return cpuset.CPUSet{} +}