Skip to content

Commit

Permalink
Merge pull request #3325 from belo4ya/fix-data-race-for-shared-devices
Browse files Browse the repository at this point in the history
fix Data Race leading to panic in Scheduler (and use more efficient strings comparison)
  • Loading branch information
volcano-sh-bot authored May 9, 2024
2 parents 15df16e + 88fbcfc commit 9e28783
Show file tree
Hide file tree
Showing 7 changed files with 170 additions and 15 deletions.
4 changes: 2 additions & 2 deletions pkg/scheduler/api/devices/nvidia/vgpu/device_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ func (gs *GPUDevices) AddResource(pod *v1.Pod) {
break
}
for index, gsdevice := range gs.Device {
if strings.Compare(gsdevice.UUID, deviceused.UUID) == 0 {
if gsdevice.UUID == deviceused.UUID {
klog.V(4).Infoln("VGPU recording pod", pod.Name, "device", deviceused)
gs.Device[index].UsedMem += uint(deviceused.Usedmem)
gs.Device[index].UsedNum++
Expand All @@ -156,7 +156,7 @@ func (gs *GPUDevices) SubResource(pod *v1.Pod) {
break
}
for index, gsdevice := range gs.Device {
if strings.Compare(gsdevice.UUID, deviceused.UUID) == 0 {
if gsdevice.UUID == deviceused.UUID {
klog.V(4).Infoln("VGPU subsctracting pod", pod.Name, "device", deviceused)
gs.Device[index].UsedMem -= uint(deviceused.Usedmem)
gs.Device[index].UsedNum--
Expand Down
2 changes: 1 addition & 1 deletion pkg/scheduler/api/devices/nvidia/vgpu/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -296,7 +296,7 @@ func checkType(annos map[string]string, d GPUDevice, n ContainerDeviceRequest) b
if !strings.Contains(d.Type, n.Type) {
return false
}
if strings.Compare(n.Type, NvidiaGPUDevice) == 0 {
if n.Type == NvidiaGPUDevice {
return checkGPUtype(annos, d.Type)
}
klog.Errorf("Unrecognized device %v", n.Type)
Expand Down
7 changes: 4 additions & 3 deletions pkg/scheduler/api/node_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -345,11 +345,12 @@ func (ni *NodeInfo) SetNode(node *v1.Node) {

// setNodeOthersResource initialize sharable devices
func (ni *NodeInfo) setNodeOthersResource(node *v1.Node) {
IgnoredDevicesList = []string{}
ni.Others[GPUSharingDevice] = gpushare.NewGPUDevices(ni.Name, node)
ni.Others[vgpu.DeviceName] = vgpu.NewGPUDevices(ni.Name, node)
IgnoredDevicesList = append(IgnoredDevicesList, ni.Others[GPUSharingDevice].(Devices).GetIgnoredDevices()...)
IgnoredDevicesList = append(IgnoredDevicesList, ni.Others[vgpu.DeviceName].(Devices).GetIgnoredDevices()...)
IgnoredDevicesList.Set(
ni.Others[GPUSharingDevice].(Devices).GetIgnoredDevices(),
ni.Others[vgpu.DeviceName].(Devices).GetIgnoredDevices(),
)
}

// setNode sets kubernetes node object to nodeInfo object without assertion
Expand Down
9 changes: 5 additions & 4 deletions pkg/scheduler/api/resource_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,12 +88,13 @@ func NewResource(rl v1.ResourceList) *Resource {
//NOTE: When converting this back to k8s resource, we need record the format as well as / 1000
if v1helper.IsScalarResourceName(rName) {
ignore := false
for _, val := range IgnoredDevicesList {
if strings.Compare(rName.String(), val) == 0 {
IgnoredDevicesList.Range(func(_ int, val string) bool {
if rName.String() == val {
ignore = true
break
return false
}
}
return true
})
if !ignore {
r.AddScalar(rName, float64(rQuant.MilliValue()))
} else {
Expand Down
30 changes: 28 additions & 2 deletions pkg/scheduler/api/shared_device_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
package api

import (
"sync"

v1 "k8s.io/api/core/v1"
"k8s.io/client-go/kubernetes"

Expand Down Expand Up @@ -73,8 +75,32 @@ type Devices interface {
// make sure GPUDevices implements Devices interface
var _ Devices = new(gpushare.GPUDevices)

var IgnoredDevicesList []string

var RegisteredDevices = []string{
GPUSharingDevice, vgpu.DeviceName,
}

var IgnoredDevicesList = ignoredDevicesList{}

type ignoredDevicesList struct {
sync.RWMutex
ignoredDevices []string
}

func (l *ignoredDevicesList) Set(deviceLists ...[]string) {
l.Lock()
defer l.Unlock()
l.ignoredDevices = l.ignoredDevices[:0]
for _, devices := range deviceLists {
l.ignoredDevices = append(l.ignoredDevices, devices...)
}
}

func (l *ignoredDevicesList) Range(f func(i int, device string) bool) {
l.RLock()
defer l.RUnlock()
for i, device := range l.ignoredDevices {
if !f(i, device) {
break
}
}
}
126 changes: 126 additions & 0 deletions pkg/scheduler/api/shared_device_pool_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
package api

import (
"sync"
"testing"

"github.com/stretchr/testify/assert"
)

func Test_ignoredDevicesList_Set_BasicUsage(t *testing.T) {
tests := []struct {
name string
deviceLists [][]string
expectedIgnoredDevices []string
}{
{
name: "set several values to ignoredDevicesList",
deviceLists: [][]string{{"volcano.sh/vgpu-memory", "volcano.sh/vgpu-memory-percentage", "volcano.sh/vgpu-cores"}},
expectedIgnoredDevices: []string{"volcano.sh/vgpu-memory", "volcano.sh/vgpu-memory-percentage", "volcano.sh/vgpu-cores"},
},
{
name: "set several lists of values to ignoredDevicesList atomically",
deviceLists: [][]string{{"volcano.sh/vgpu-memory"}, {"volcano.sh/vgpu-memory-percentage", "volcano.sh/vgpu-cores"}},
expectedIgnoredDevices: []string{"volcano.sh/vgpu-memory", "volcano.sh/vgpu-memory-percentage", "volcano.sh/vgpu-cores"},
},
{
name: "possible way to clear ignoredDevicesList",
deviceLists: nil,
expectedIgnoredDevices: nil,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
lst := ignoredDevicesList{}
lst.Set(tt.deviceLists...)
assert.Equal(t, tt.expectedIgnoredDevices, lst.ignoredDevices)
})
}
}

func Test_ignoredDevicesList_Range_BasicUsage(t *testing.T) {
lst := ignoredDevicesList{}
lst.Set([]string{"volcano.sh/vgpu-memory", "volcano.sh/vgpu-memory-percentage", "volcano.sh/vgpu-cores"})

t.Run("read and copy values from the ignoredDevicesList", func(t *testing.T) {
ignoredDevices := make([]string, 0, len(lst.ignoredDevices))
lst.Range(func(_ int, device string) bool {
ignoredDevices = append(ignoredDevices, device)
return true
})
assert.Equal(t, lst.ignoredDevices, ignoredDevices)
})

t.Run("break iteration through the ignoredDevicesList", func(t *testing.T) {
i := 0
flag := false
lst.Range(func(_ int, device string) bool {
i++
if lst.ignoredDevices[1] == device {
flag = true
return false
}
return true
})

assert.Equal(t, true, flag)
assert.Equal(t, 2, i)
})
}

func Test_ignoredDevicesList_Set_Concurrent(t *testing.T) {
lst := ignoredDevicesList{}
expected := []string{"volcano.sh/vgpu-memory", "volcano.sh/vgpu-memory-percentage", "volcano.sh/vgpu-cores"}

var wg sync.WaitGroup
wg.Add(8)
for i := 0; i < 8; i++ {
go func() {
defer wg.Done()
lst.Set(expected)
}()
}
wg.Wait()

assert.Equal(t, expected, lst.ignoredDevices)
}

func Test_ignoredDevicesList_Range_Concurrent(t *testing.T) {
lst := ignoredDevicesList{}
lst.Set([]string{"volcano.sh/vgpu-memory", "volcano.sh/vgpu-memory-percentage", "volcano.sh/vgpu-cores"})

var wg sync.WaitGroup
wg.Add(8)
for i := 0; i < 8; i++ {
go func() {
defer wg.Done()
ignoredDevices := make([]string, 0, len(lst.ignoredDevices))
lst.Range(func(_ int, device string) bool {
ignoredDevices = append(ignoredDevices, device)
return true
})
assert.Equal(t, ignoredDevices, lst.ignoredDevices)
}()
}
wg.Wait()
}

func Test_ignoredDevicesList_NoRace(t *testing.T) {
lst := ignoredDevicesList{}

var wg sync.WaitGroup
wg.Add(16)
for i := 0; i < 8; i++ {
go func() {
defer wg.Done()
lst.Set([]string{"volcano.sh/vgpu-memory", "volcano.sh/vgpu-memory-percentage", "volcano.sh/vgpu-cores"})
}()
go func() {
defer wg.Done()
lst.Range(func(_ int, _ string) bool {
return true
})
}()
}
wg.Wait()
}
7 changes: 4 additions & 3 deletions pkg/scheduler/plugins/util/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,12 +106,13 @@ func GetInqueueResource(job *api.JobInfo, allocated *api.Resource) *api.Resource
continue
}
ignore := false
for _, ignoredDevice := range api.IgnoredDevicesList {
api.IgnoredDevicesList.Range(func(_ int, ignoredDevice string) bool {
if len(ignoredDevice) > 0 && strings.Contains(rName.String(), ignoredDevice) {
ignore = true
break
return false
}
}
return true
})
if ignore {
continue
}
Expand Down

0 comments on commit 9e28783

Please sign in to comment.