Skip to content

Commit

Permalink
Update plugin to honor opaque configs in ResourceClaims / DeviceClasses
Browse files Browse the repository at this point in the history
Signed-off-by: Kevin Klues <[email protected]>
  • Loading branch information
klueska committed Aug 29, 2024
1 parent 6adfa55 commit abf989c
Show file tree
Hide file tree
Showing 2 changed files with 171 additions and 15 deletions.
35 changes: 28 additions & 7 deletions cmd/dra-example-kubeletplugin/cdi.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ func (cdi *CDIHandler) CreateCommonSpecFile() error {
Name: cdiCommonDeviceName,
ContainerEdits: cdispec.ContainerEdits{
Env: []string{
fmt.Sprintf("GPU_NODE_NAME=%s", os.Getenv("NODE_NAME")),
fmt.Sprintf("KUBERNETES_NODE_NAME=%s", os.Getenv("NODE_NAME")),
fmt.Sprintf("DRA_RESOURCE_DRIVER_NAME=%s", DriverName),
},
},
Expand Down Expand Up @@ -89,18 +89,39 @@ func (cdi *CDIHandler) CreateClaimSpecFile(claimUID string, devices PreparedDevi
Devices: []cdispec.Device{},
}

gpuIdx := 0
for _, device := range devices {
for i, device := range devices {
envs := []string{
fmt.Sprintf("GPU_DEVICE_%d=%s", i, device.DeviceName),
}

if device.Config.Sharing != nil {
envs = append(envs, fmt.Sprintf("GPU_DEVICE_%d_SHARING_STRATEGY=%s", i, device.Config.Sharing.Strategy))
}

switch {
case device.Config.Sharing.IsTimeSlicing():
tsconfig, err := device.Config.Sharing.GetTimeSlicingConfig()
if err != nil {
return fmt.Errorf("unable to get time slicing config for device %v: %v", device.DeviceName, err)
}
envs = append(envs, fmt.Sprintf("GPU_DEVICE_%d_TIMESLICE_INTERVAL=%v", i, tsconfig.Interval))

case device.Config.Sharing.IsSpacePartitioning():
spconfig, err := device.Config.Sharing.GetSpacePartitioningConfig()
if err != nil {
return fmt.Errorf("unable to get space partitioning config for device %v: %v", device.DeviceName, err)
}
envs = append(envs, fmt.Sprintf("GPU_DEVICE_%d_PARTITION_COUNT=%v", i, spconfig.PartitionCount))
}

cdiDevice := cdispec.Device{
Name: device.DeviceName,
ContainerEdits: cdispec.ContainerEdits{
Env: []string{
fmt.Sprintf("GPU_DEVICE_%d=%s", gpuIdx, device.DeviceName),
},
Env: envs,
},
}

spec.Devices = append(spec.Devices, cdiDevice)
gpuIdx++
}

minVersion, err := cdiapi.MinimumRequiredVersion(spec)
Expand Down
151 changes: 143 additions & 8 deletions cmd/dra-example-kubeletplugin/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,26 @@ package main

import (
"fmt"
"slices"
"sync"

resourceapi "k8s.io/api/resource/v1alpha3"
"k8s.io/apimachinery/pkg/runtime"
drapbv1 "k8s.io/kubelet/pkg/apis/dra/v1alpha4"
"k8s.io/kubernetes/pkg/kubelet/checkpointmanager"

configapi "sigs.k8s.io/dra-example-driver/api/example.com/resource/gpu/v1alpha1"
)

type AllocatableDevices map[string]resourceapi.Device
type PreparedDevices []*drapbv1.Device
type PreparedDevices []*PreparedDevice
type PreparedClaims map[string]PreparedDevices

// PreparedDevice bundles a kubelet DRA device entry with the GPU
// configuration that is stored alongside it.
type PreparedDevice struct {
// Embedded, so the fields of drapbv1.Device are promoted onto PreparedDevice.
drapbv1.Device
// Config is the GPU configuration kept together with the device.
Config configapi.GpuConfig
}

type DeviceState struct {
sync.Mutex
cdi *CDIHandler
Expand Down Expand Up @@ -95,7 +104,7 @@ func (s *DeviceState) Prepare(claim *resourceapi.ResourceClaim) ([]*drapbv1.Devi
preparedClaims := checkpoint.V1.PreparedClaims

if preparedClaims[claimUID] != nil {
return preparedClaims[claimUID], nil
return preparedClaims[claimUID].GetDevices(), nil
}

preparedDevices, err := s.prepareDevices(claim)
Expand All @@ -112,7 +121,7 @@ func (s *DeviceState) Prepare(claim *resourceapi.ResourceClaim) ([]*drapbv1.Devi
return nil, fmt.Errorf("unable to sync to checkpoint: %v", err)
}

return preparedClaims[claimUID], nil
return preparedClaims[claimUID].GetDevices(), nil
}

func (s *DeviceState) Unprepare(claimUID string) error {
Expand Down Expand Up @@ -151,17 +160,63 @@ func (s *DeviceState) prepareDevices(claim *resourceapi.ResourceClaim) (Prepared
return nil, fmt.Errorf("claim not yet allocated")
}

// Walk through each device allocation and prepare it.
var preparedDevices PreparedDevices
for _, result := range claim.Status.Allocation.Devices.Results {
if _, exists := s.allocatable[result.Device]; !exists {
return nil, fmt.Errorf("requested GPU is not allocatable: %v", result.Device)
}

device := &drapbv1.Device{
RequestNames: []string{result.Request},
PoolName: result.Pool,
DeviceName: result.Device,
CDIDeviceIDs: s.cdi.GetClaimDevices([]string{result.Device}),
// Retrieve the set of device configs for the current allocation.
configs, err := GetOpaqueDeviceConfigs(
configapi.Decoder,
DriverName,
result.Request,
claim.Status.Allocation.Devices.Config,
)
if err != nil {
return nil, fmt.Errorf("error getting GPU configs: %v", err)
}

// Select the GPU config with the highest precedence from those retrieved.
// Use the default GPU config if no GPU configs have been returned.
config := configapi.DefaultGpuConfig()
for _, c := range configs {
switch castConfig := c.(type) {
case *configapi.GpuConfig:
config = castConfig
default:
return nil, fmt.Errorf("runtime object is not a regognized configuration")
}
}

// Normalize the config to set any implied defaults.
if err := config.Normalize(); err != nil {
return nil, fmt.Errorf("error normalizing GPU config: %w", err)
}

// Validate the config to ensure its integrity.
if err := config.Validate(); err != nil {
return nil, fmt.Errorf("error validating GPU config: %w", err)
}

device := &PreparedDevice{
Device: drapbv1.Device{
RequestNames: []string{result.Request},
PoolName: result.Pool,
DeviceName: result.Device,
CDIDeviceIDs: s.cdi.GetClaimDevices([]string{result.Device}),
},
Config: *config,
}

// Apply any requested configuration here.
//
// In this example driver there is nothing to do at this point, but a
// real driver would likely need to do some sort of hardware
// configuration, based on the config that has been passed in.
if err := device.applyConfig(); err != nil {
return nil, fmt.Errorf("error applying GPU config: %v", err)
}

preparedDevices = append(preparedDevices, device)
Expand All @@ -173,3 +228,83 @@ func (s *DeviceState) prepareDevices(claim *resourceapi.ResourceClaim) (Prepared
// unprepareDevices is the teardown counterpart for the devices of a claim.
// It currently ignores both claimUID and devices and always returns nil.
func (s *DeviceState) unprepareDevices(claimUID string, devices PreparedDevices) error {
return nil
}

// applyConfig is the hook where a device's requested configuration would be
// applied. In this example driver there is nothing to do, so it is a no-op
// that always returns nil.
func (p *PreparedDevice) applyConfig() error {
return nil
}

// GetDevices returns pointers to the drapbv1.Device embedded in each entry of
// pds, in the same order. The returned pointers alias the embedded devices
// rather than copies of them. A nil slice is returned when pds is empty.
func (pds PreparedDevices) GetDevices() []*drapbv1.Device {
	if len(pds) == 0 {
		return nil
	}

	out := make([]*drapbv1.Device, len(pds))
	for i := range pds {
		out[i] = &pds[i].Device
	}
	return out
}

// GetOpaqueDeviceConfigs decodes the opaque configs that apply to a single
// device request and belong to the given driver.
//
// A config applies to the request when it lists no requests at all, or when
// request is one of the requests it lists. Applicable configs are grouped by
// source: configs from the device class are ordered before configs from the
// resource claim, and within each source the original list order is kept.
// The result is therefore sorted from lowest to highest precedence, with the
// last element being the one that wins.
//
// An error is returned if a config has an unknown source, if an applicable
// config carries no opaque parameters, or if decoding the parameters of a
// config for this driver fails. Configs addressed to other drivers are
// skipped silently. If nothing matches, the returned slice is nil.
func GetOpaqueDeviceConfigs(
	decoder runtime.Decoder,
	driverName,
	request string,
	possibleConfigs []resourceapi.DeviceAllocationConfiguration,
) ([]runtime.Object, error) {
	// Bucket the applicable configs by where they came from.
	var fromClass, fromClaim []resourceapi.DeviceConfiguration
	for i := range possibleConfigs {
		pc := &possibleConfigs[i]

		// An empty request list means the config applies to every request.
		applies := len(pc.Requests) == 0 || slices.Contains(pc.Requests, request)
		if !applies {
			continue
		}

		switch pc.Source {
		case resourceapi.AllocationConfigSourceClass:
			fromClass = append(fromClass, pc.DeviceConfiguration)
		case resourceapi.AllocationConfigSourceClaim:
			fromClaim = append(fromClaim, pc.DeviceConfiguration)
		default:
			return nil, fmt.Errorf("invalid config source: %v", pc.Source)
		}
	}

	// Class configs go first so that claim configs end up with higher
	// precedence in the final ordering.
	ordered := make([]resourceapi.DeviceConfiguration, 0, len(fromClass)+len(fromClaim))
	ordered = append(ordered, fromClass...)
	ordered = append(ordered, fromClaim...)

	var decoded []runtime.Object
	for _, dc := range ordered {
		opaque := dc.Opaque

		// A config without opaque parameters uses some API extension this
		// driver does not know about; the driver needs to be updated.
		if opaque == nil {
			return nil, fmt.Errorf("only opaque parameters are supported by this driver")
		}

		// One request may be satisfiable by several drivers, so configs
		// meant for other drivers are expected here and simply skipped.
		if opaque.Driver != driverName {
			continue
		}

		obj, err := runtime.Decode(decoder, opaque.Parameters.Raw)
		if err != nil {
			return nil, fmt.Errorf("error decoding config parameters: %w", err)
		}
		decoded = append(decoded, obj)
	}

	return decoded, nil
}

0 comments on commit abf989c

Please sign in to comment.