Skip to content

Commit

Permalink
node: podresources: implement GetAllocatableResources API
Browse files Browse the repository at this point in the history
Extend the podresources API implementing the GetAllocatableResources endpoint,
as specified in the KEPs:

https://github.com/kubernetes/enhancements/tree/master/keps/sig-node/2043-pod-resource-concrete-assigments
kubernetes/enhancements#2404

Signed-off-by: Francesco Romani <[email protected]>
  • Loading branch information
ffromani committed Mar 7, 2021
1 parent 9f3797e commit 06510cc
Show file tree
Hide file tree
Showing 20 changed files with 379 additions and 35 deletions.
30 changes: 30 additions & 0 deletions pkg/kubelet/apis/podresources/server_v1.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,3 +70,33 @@ func (p *v1PodResourcesServer) List(ctx context.Context, req *v1.ListPodResource
PodResources: podResources,
}, nil
}

// GetAllocatableResources returns information about all the resources known by the server.
// This is more like the capacity than the current amount of free resources.
//
// Every device is reported as its own ContainerDevices entry, once per NUMA node
// listed in its topology. A device whose topology lists no NUMA nodes is still
// reported, with the Topology field left unset, so that it does not disappear
// from the response. The order of the returned devices is not defined, because
// it follows map iteration order.
func (p *v1PodResourcesServer) GetAllocatableResources(ctx context.Context, req *v1.AllocatableResourcesRequest) (*v1.AllocatableResourcesResponse, error) {
	metrics.PodResourcesEndpointRequestsTotalCount.WithLabelValues("v1").Inc()

	allDevices := p.devicesProvider.GetAllocatableDevices()
	var respDevs []*v1.ContainerDevices

	for resourceName, resourceDevs := range allDevices {
		for devID, dev := range resourceDevs {
			nodes := dev.GetTopology().GetNodes()
			if len(nodes) == 0 {
				// No topology information for this device: report it anyway
				// and let the client decide what to do with it.
				respDevs = append(respDevs, &v1.ContainerDevices{
					ResourceName: resourceName,
					DeviceIds:    []string{devID},
				})
				continue
			}

			for _, node := range nodes {
				numaNode := node.GetID()
				respDevs = append(respDevs, &v1.ContainerDevices{
					ResourceName: resourceName,
					DeviceIds:    []string{devID},
					Topology: &v1.TopologyInfo{
						Nodes: []*v1.NUMANode{
							{ID: numaNode},
						},
					},
				})
			}
		}
	}

	return &v1.AllocatableResourcesResponse{
		Devices: respDevs,
		CpuIds:  p.cpusProvider.GetAllocatableCPUs().ToSliceNoSortInt64(),
	}, nil
}
198 changes: 183 additions & 15 deletions pkg/kubelet/apis/podresources/server_v1_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,10 @@ import (
"k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
pluginapi "k8s.io/kubelet/pkg/apis/deviceplugin/v1beta1"
podresourcesapi "k8s.io/kubelet/pkg/apis/podresources/v1"
"k8s.io/kubernetes/pkg/kubelet/cm/cpuset"
"k8s.io/kubernetes/pkg/kubelet/cm/devicemanager"
)

func TestListPodResourcesV1(t *testing.T) {
Expand Down Expand Up @@ -138,6 +140,8 @@ func TestListPodResourcesV1(t *testing.T) {
m.On("GetDevices", string(podUID), containerName).Return(tc.devices)
m.On("GetCPUs", string(podUID), containerName).Return(tc.cpus)
m.On("UpdateAllocatedDevices").Return()
m.On("GetAllocatableCPUs").Return(cpuset.CPUSet{})
m.On("GetAllocatableDevices").Return(devicemanager.NewResourceDeviceInstances())
server := NewV1PodResourcesServer(m, m, m)
resp, err := server.List(context.TODO(), &podresourcesapi.ListPodResourcesRequest{})
if err != nil {
Expand All @@ -150,6 +154,140 @@ func TestListPodResourcesV1(t *testing.T) {
}
}

// TestAllocatableResources checks that GetAllocatableResources reports the CPUs
// and the devices handed out by the mocked providers, for every combination of
// empty and populated CPU and device sets.
func TestAllocatableResources(t *testing.T) {
	allDevs := devicemanager.ResourceDeviceInstances{
		"resource": {
			"dev0": {
				ID:     "GPU-fef8089b-4820-abfc-e83e-94318197576e",
				Health: "Healthy",
				Topology: &pluginapi.TopologyInfo{
					Nodes: []*pluginapi.NUMANode{
						{
							ID: 0,
						},
					},
				},
			},
			"dev1": {
				ID:     "VF-8536e1e8-9dc6-4645-9aea-882db92e31e7",
				Health: "Healthy",
				Topology: &pluginapi.TopologyInfo{
					Nodes: []*pluginapi.NUMANode{
						{
							ID: 1,
						},
					},
				},
			},
		},
	}
	allCPUs := cpuset.NewCPUSet(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16)

	// Expected device entries for allDevs; shared (read-only) by every test
	// case which feeds allDevs to the server. The device IDs in the response
	// are the map keys of allDevs, not the ID fields of the devices.
	expectedDevs := []*podresourcesapi.ContainerDevices{
		{
			ResourceName: "resource",
			DeviceIds:    []string{"dev0"},
			Topology: &podresourcesapi.TopologyInfo{
				Nodes: []*podresourcesapi.NUMANode{
					{
						ID: 0,
					},
				},
			},
		},
		{
			ResourceName: "resource",
			DeviceIds:    []string{"dev1"},
			Topology: &podresourcesapi.TopologyInfo{
				Nodes: []*podresourcesapi.NUMANode{
					{
						ID: 1,
					},
				},
			},
		},
	}

	for _, tc := range []struct {
		desc                                 string
		allCPUs                              cpuset.CPUSet
		allDevices                           devicemanager.ResourceDeviceInstances
		expectedAllocatableResourcesResponse *podresourcesapi.AllocatableResourcesResponse
	}{
		{
			desc:                                 "no devices, no CPUs",
			allCPUs:                              cpuset.CPUSet{},
			allDevices:                           devicemanager.NewResourceDeviceInstances(),
			expectedAllocatableResourcesResponse: &podresourcesapi.AllocatableResourcesResponse{},
		},
		{
			desc:       "no devices, all CPUs",
			allCPUs:    allCPUs,
			allDevices: devicemanager.NewResourceDeviceInstances(),
			expectedAllocatableResourcesResponse: &podresourcesapi.AllocatableResourcesResponse{
				CpuIds: allCPUs.ToSliceNoSortInt64(),
			},
		},
		{
			desc:       "with devices, all CPUs",
			allCPUs:    allCPUs,
			allDevices: allDevs,
			expectedAllocatableResourcesResponse: &podresourcesapi.AllocatableResourcesResponse{
				CpuIds:  allCPUs.ToSliceNoSortInt64(),
				Devices: expectedDevs,
			},
		},
		{
			desc:       "with devices, no CPUs",
			allCPUs:    cpuset.CPUSet{},
			allDevices: allDevs,
			expectedAllocatableResourcesResponse: &podresourcesapi.AllocatableResourcesResponse{
				Devices: expectedDevs,
			},
		},
	} {
		t.Run(tc.desc, func(t *testing.T) {
			m := new(mockProvider)
			m.On("GetDevices", "", "").Return([]*podresourcesapi.ContainerDevices{})
			m.On("GetCPUs", "", "").Return(cpuset.CPUSet{})
			m.On("UpdateAllocatedDevices").Return()
			m.On("GetAllocatableDevices").Return(tc.allDevices)
			m.On("GetAllocatableCPUs").Return(tc.allCPUs)
			server := NewV1PodResourcesServer(m, m, m)

			resp, err := server.GetAllocatableResources(context.TODO(), &podresourcesapi.AllocatableResourcesRequest{})
			if err != nil {
				// Stop here: the response cannot be trusted (and may be nil)
				// when the call failed, so comparing it would be meaningless.
				t.Fatalf("want err = %v, got %q", nil, err)
			}

			if !equalAllocatableResourcesResponse(tc.expectedAllocatableResourcesResponse, resp) {
				t.Errorf("want resp = %s, got %s", tc.expectedAllocatableResourcesResponse.String(), resp.String())
			}
		})
	}
}

func equalListResponse(respA, respB *podresourcesapi.ListPodResourcesResponse) bool {
if len(respA.PodResources) != len(respB.PodResources) {
return false
Expand Down Expand Up @@ -177,26 +315,49 @@ func equalListResponse(respA, respB *podresourcesapi.ListPodResourcesResponse) b
return false
}

if len(cntA.Devices) != len(cntB.Devices) {
if !equalContainerDevices(cntA.Devices, cntB.Devices) {
return false
}
}
}
return true
}

for kdx := 0; kdx < len(cntA.Devices); kdx++ {
cntDevA := cntA.Devices[kdx]
cntDevB := cntB.Devices[kdx]

if cntDevA.ResourceName != cntDevB.ResourceName {
return false
}
if !equalTopology(cntDevA.Topology, cntDevB.Topology) {
return false
}
if !equalStrings(cntDevA.DeviceIds, cntDevB.DeviceIds) {
return false
}
}
// equalContainerDevices reports whether devA and devB describe the same
// container devices, regardless of the order of the entries.
func equalContainerDevices(devA, devB []*podresourcesapi.ContainerDevices) bool {
	if len(devA) != len(devB) {
		return false
	}

	// the ordering of container devices in the response is not defined,
	// so we need to do a full scan, failing at first mismatch.
	// The scan runs in both directions: checking only devA against devB
	// would let a repeated entry in devA hide an entry which is present
	// only in devB.
	for _, dev := range devA {
		if !containsContainerDevice(dev, devB) {
			return false
		}
	}
	for _, dev := range devB {
		if !containsContainerDevice(dev, devA) {
			return false
		}
	}

	return true
}

// containsContainerDevice reports whether devs holds at least one entry
// which equalContainerDevice considers equal to cntDev.
func containsContainerDevice(cntDev *podresourcesapi.ContainerDevices, devs []*podresourcesapi.ContainerDevices) bool {
	for _, candidate := range devs {
		if equalContainerDevice(cntDev, candidate) {
			return true
		}
	}
	return false
}

// equalContainerDevice reports whether the two entries carry the same
// resource name, the same topology and the same device IDs.
func equalContainerDevice(cntDevA, cntDevB *podresourcesapi.ContainerDevices) bool {
	// The checks short-circuit in the same order: name, topology, device IDs.
	return cntDevA.ResourceName == cntDevB.ResourceName &&
		equalTopology(cntDevA.Topology, cntDevB.Topology) &&
		equalStrings(cntDevA.DeviceIds, cntDevB.DeviceIds)
}

Expand Down Expand Up @@ -231,3 +392,10 @@ func equalTopology(a, b *podresourcesapi.TopologyInfo) bool {
}
return reflect.DeepEqual(a, b)
}

// equalAllocatableResourcesResponse reports whether the two responses carry
// the same CPU IDs and the same devices.
func equalAllocatableResourcesResponse(respA, respB *podresourcesapi.AllocatableResourcesResponse) bool {
	sameCPUs := equalInt64s(respA.CpuIds, respB.CpuIds)
	// Devices are compared only when the CPU IDs already match.
	return sameCPUs && equalContainerDevices(respA.Devices, respB.Devices)
}
11 changes: 11 additions & 0 deletions pkg/kubelet/apis/podresources/server_v1alpha1_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
podresourcesv1 "k8s.io/kubelet/pkg/apis/podresources/v1"
"k8s.io/kubelet/pkg/apis/podresources/v1alpha1"
"k8s.io/kubernetes/pkg/kubelet/cm/cpuset"
"k8s.io/kubernetes/pkg/kubelet/cm/devicemanager"
)

type mockProvider struct {
Expand All @@ -53,6 +54,16 @@ func (m *mockProvider) UpdateAllocatedDevices() {
m.Called()
}

// GetAllocatableDevices records the call on the mock and returns the first
// configured return value, which must be a devicemanager.ResourceDeviceInstances.
func (m *mockProvider) GetAllocatableDevices() devicemanager.ResourceDeviceInstances {
	ret := m.Called().Get(0)
	return ret.(devicemanager.ResourceDeviceInstances)
}

// GetAllocatableCPUs records the call on the mock and returns the first
// configured return value, which must be a cpuset.CPUSet.
func (m *mockProvider) GetAllocatableCPUs() cpuset.CPUSet {
	ret := m.Called().Get(0)
	return ret.(cpuset.CPUSet)
}

func TestListPodResourcesV1alpha1(t *testing.T) {
podName := "pod-name"
podNamespace := "pod-namespace"
Expand Down
10 changes: 9 additions & 1 deletion pkg/kubelet/apis/podresources/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,17 @@ import (
"k8s.io/api/core/v1"
podresourcesapi "k8s.io/kubelet/pkg/apis/podresources/v1"
"k8s.io/kubernetes/pkg/kubelet/cm/cpuset"
"k8s.io/kubernetes/pkg/kubelet/cm/devicemanager"
)

// DevicesProvider knows how to provide the devices used by the given container,
// and the devices known to the manager.
type DevicesProvider interface {
	// UpdateAllocatedDevices frees any Devices that are bound to terminated pods.
	UpdateAllocatedDevices()
	// GetDevices returns information about the devices assigned to pods and containers
	GetDevices(podUID, containerName string) []*podresourcesapi.ContainerDevices
	// GetAllocatableDevices returns information about all the devices known to the manager
	GetAllocatableDevices() devicemanager.ResourceDeviceInstances
}

// PodsProvider knows how to provide the pods admitted by the node
Expand All @@ -35,5 +40,8 @@ type PodsProvider interface {

// CPUsProvider knows how to provide the cpus used by the given container,
// and the allocatable cpus.
type CPUsProvider interface {
// GetCPUs returns information about the cpus assigned to pods and containers
GetCPUs(podUID, containerName string) cpuset.CPUSet
// GetAllocatableCPUs returns the allocatable (not allocated) CPUs
GetAllocatableCPUs() cpuset.CPUSet
}
13 changes: 4 additions & 9 deletions pkg/kubelet/cm/container_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@ import (
// TODO: Migrate kubelet to either use its own internal objects or client library.
v1 "k8s.io/api/core/v1"
internalapi "k8s.io/cri-api/pkg/apis"
podresourcesapi "k8s.io/kubelet/pkg/apis/podresources/v1"
kubeletconfig "k8s.io/kubernetes/pkg/kubelet/apis/config"
"k8s.io/kubernetes/pkg/kubelet/apis/podresources"
"k8s.io/kubernetes/pkg/kubelet/cm/cpuset"
"k8s.io/kubernetes/pkg/kubelet/config"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
Expand Down Expand Up @@ -103,21 +103,16 @@ type ContainerManager interface {
// registration.
GetPluginRegistrationHandler() cache.PluginHandler

// GetDevices returns information about the devices assigned to pods and containers
GetDevices(podUID, containerName string) []*podresourcesapi.ContainerDevices

// GetCPUs returns information about the cpus assigned to pods and containers
GetCPUs(podUID, containerName string) cpuset.CPUSet

// ShouldResetExtendedResourceCapacity returns whether or not the extended resources should be zeroed,
// due to node recreation.
ShouldResetExtendedResourceCapacity() bool

// GetAllocateResourcesPodAdmitHandler returns an instance of a PodAdmitHandler responsible for allocating pod resources.
GetAllocateResourcesPodAdmitHandler() lifecycle.PodAdmitHandler

// UpdateAllocatedDevices frees any Devices that are bound to terminated pods.
UpdateAllocatedDevices()
// Implements the podresources Provider API for CPUs and Devices
podresources.CPUsProvider
podresources.DevicesProvider
}

type NodeConfig struct {
Expand Down
8 changes: 8 additions & 0 deletions pkg/kubelet/cm/container_manager_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -1073,10 +1073,18 @@ func (cm *containerManagerImpl) GetDevices(podUID, containerName string) []*podr
return cm.deviceManager.GetDevices(podUID, containerName)
}

// GetAllocatableDevices forwards the request to the device manager and
// returns its answer as-is.
func (cm *containerManagerImpl) GetAllocatableDevices() devicemanager.ResourceDeviceInstances {
	allocatable := cm.deviceManager.GetAllocatableDevices()
	return allocatable
}

// GetCPUs asks the CPU manager for the cpus of the given container of the
// given pod, and returns a clone of that set.
func (cm *containerManagerImpl) GetCPUs(podUID, containerName string) cpuset.CPUSet {
	assigned := cm.cpuManager.GetCPUs(podUID, containerName)
	return assigned.Clone()
}

// GetAllocatableCPUs forwards the request to the CPU manager and returns
// its answer as-is.
func (cm *containerManagerImpl) GetAllocatableCPUs() cpuset.CPUSet {
	allocatable := cm.cpuManager.GetAllocatableCPUs()
	return allocatable
}

func (cm *containerManagerImpl) ShouldResetExtendedResourceCapacity() bool {
return cm.deviceManager.ShouldResetExtendedResourceCapacity()
}
Expand Down
Loading

0 comments on commit 06510cc

Please sign in to comment.