
Merge pull request #31 from G-Research/executor-allocation
Refactoring JobLeaseService and ClusterUtilisationService
JamesMurkin authored Jul 26, 2019
2 parents 0a83cde + 204cd66 commit 50ae74d
Showing 14 changed files with 574 additions and 389 deletions.
2 changes: 1 addition & 1 deletion config/executor/config.yaml
@@ -5,7 +5,7 @@ task:
utilisationReportingInterval: 10s
missingJobEventReconciliationInterval: 10s
jobLeaseRenewalInterval: 10s
requestNewJobsInterval: 10s
allocateSpareClusterCapacityInterval: 10s
armada:
url : "localhost:50051"
events:
1 change: 1 addition & 0 deletions go.mod
@@ -14,6 +14,7 @@ require (
github.com/imdario/mergo v0.3.7 // indirect
github.com/json-iterator/go v1.1.6 // indirect
github.com/kjk/betterguid v0.0.0-20170621091430-c442874ba63a
github.com/oklog/ulid v1.3.1
github.com/spf13/viper v1.4.0
github.com/stretchr/testify v1.3.0
github.com/yuin/gopher-lua v0.0.0-20190514113301-1cd887cd7036 // indirect
2 changes: 1 addition & 1 deletion internal/executor/configuration/types.go
@@ -11,7 +11,7 @@ type TaskConfiguration struct {
UtilisationReportingInterval time.Duration
MissingJobEventReconciliationInterval time.Duration
JobLeaseRenewalInterval time.Duration
RequestNewJobsInterval time.Duration
AllocateSpareClusterCapacityInterval time.Duration
}

type ArmadaConfiguration struct {
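
The renamed field is populated from the `task:` section of the YAML changed above. As a rough sketch of how that binding could work — the `main` wrapper and hard-coded path below are illustrative assumptions, not code from this repository — Viper's default decode hooks turn duration strings such as `10s` into `time.Duration` values:

```go
package main

import (
	"fmt"
	"log"
	"time"

	"github.com/spf13/viper"
)

// TaskConfiguration mirrors internal/executor/configuration.TaskConfiguration after this change.
type TaskConfiguration struct {
	UtilisationReportingInterval          time.Duration
	MissingJobEventReconciliationInterval time.Duration
	JobLeaseRenewalInterval               time.Duration
	AllocateSpareClusterCapacityInterval  time.Duration
}

func main() {
	// Assumed path: the file changed at the top of this commit.
	viper.SetConfigFile("config/executor/config.yaml")
	if err := viper.ReadInConfig(); err != nil {
		log.Fatalf("failed to read config: %v", err)
	}

	var task TaskConfiguration
	// Viper's default decode hooks parse "10s" style strings into time.Duration.
	if err := viper.UnmarshalKey("task", &task); err != nil {
		log.Fatalf("failed to decode task configuration: %v", err)
	}

	fmt.Println(task.AllocateSpareClusterCapacityInterval) // 10s
}
```
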
29 changes: 29 additions & 0 deletions internal/executor/service/cluster_allocation.go
@@ -0,0 +1,29 @@
package service

import (
"fmt"
"github.com/G-Research/k8s-batch/internal/executor/submitter"
)

type ClusterAllocationService struct {
LeaseService JobLeaseService
UtilisationService ClusterUtilisationService
JobSubmitter submitter.JobSubmitter
}

func (allocationService ClusterAllocationService) AllocateSpareClusterCapacity() {
availableResource := allocationService.UtilisationService.GetAvailableClusterCapacity()

newJobs, err := allocationService.LeaseService.RequestJobLeases(availableResource)

if err != nil {
fmt.Printf("Failed to lease new jobs because %s \n", err)
} else {
for _, job := range newJobs {
_, err = allocationService.JobSubmitter.SubmitJob(job)
if err != nil {
fmt.Printf("Failed to submit job %s because %s \n", job.Id, err)
}
}
}
}
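
The new ClusterAllocationService is presumably driven on the `allocateSpareClusterCapacityInterval` from the task configuration above. A minimal sketch of that kind of scheduling loop — the `runAllocationLoop` wiring is an assumption for illustration; the real startup code lives elsewhere in the executor:

```go
package sketch

import (
	"time"

	"github.com/G-Research/k8s-batch/internal/executor/service"
)

// runAllocationLoop repeatedly asks the allocation service to fill spare capacity.
// Sketch only: assumes allocationService has already been constructed with its
// lease, utilisation and submitter dependencies.
func runAllocationLoop(allocationService service.ClusterAllocationService, interval time.Duration, stop <-chan struct{}) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()

	for {
		select {
		case <-ticker.C:
			// Lease as many jobs as the free cluster resource allows and submit them.
			allocationService.AllocateSpareClusterCapacity()
		case <-stop:
			return
		}
	}
}
```
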
121 changes: 92 additions & 29 deletions internal/executor/service/cluster_utilisation_service.go
@@ -1,6 +1,7 @@
package service

import (
"context"
"fmt"
"github.com/G-Research/k8s-batch/internal/armada/api"
"github.com/G-Research/k8s-batch/internal/common"
@@ -20,59 +21,121 @@ type ClusterUtilisationService struct {
}

func (clusterUtilisationService ClusterUtilisationService) ReportClusterUtilisation() {
allAvailableProcessingNodes := clusterUtilisationService.getAllAvailableProcessingNodes()
totalNodeResource := util.CalculateTotalResource(allAvailableProcessingNodes)

allActiveManagedPods := getAllActiveManagedPods(clusterUtilisationService.PodLister)
queueReports := createReportsOfQueueUsages(allActiveManagedPods)

allNodes, err := clusterUtilisationService.NodeLister.List(labels.Everything())
if err != nil {
fmt.Println("Error getting node information")
}

allAvailableProcessingNodes := getAllAvailableProcessingNodes(allNodes)
totalNodeResource := calculateTotalResource(allAvailableProcessingNodes)

clusterUsage := api.ClusterUsageReport{
ClusterId: clusterUtilisationService.ClientId,
ReportTime: time.Now(),
Queues: queueReports,
ClusterCapacity: totalNodeResource,
}

if clusterUsage.ClusterId == "" {
err := clusterUtilisationService.reportUsage(&clusterUsage)

if err != nil {
fmt.Printf("Failed to report cluster usage because %s \n", err)
}
}

func (clusterUtilisationService ClusterUtilisationService) GetAvailableClusterCapacity() *common.ComputeResources {
processingNodes := clusterUtilisationService.getAllAvailableProcessingNodes()
allPods, err := clusterUtilisationService.PodLister.List(labels.Everything())
if err != nil {
fmt.Println("Error getting pod information")
}

//TODO Add back in when API side is ready
//ctx, cancel := context.WithTimeout(context.Background(), time.Second)
//defer cancel()
//_, err = clusterUtilisationService.UsageClient.ReportUsage(ctx, &clusterUsage)
//
//if err != nil {
// fmt.Printf("Failed to report cluster usage because %s", err)
//}
podsOnProcessingNodes := getAllPodsOnNodes(allPods, processingNodes)
activePodsOnProcessingNodes := util.FilterCompletedPods(podsOnProcessingNodes)

totalNodeResource := util.CalculateTotalResource(processingNodes)
totalPodResource := util.CalculateTotalResourceLimit(activePodsOnProcessingNodes)

availableResource := totalNodeResource.DeepCopy()
availableResource.Sub(totalPodResource)

return &availableResource
}

func getAllActiveManagedPods(podLister lister.PodLister) []*v1.Pod {
managedPodSelector, err := util.CreateLabelSelectorForManagedPods()
func (clusterUtilisationService ClusterUtilisationService) getAllAvailableProcessingNodes() []*v1.Node {
allNodes, err := clusterUtilisationService.NodeLister.List(labels.Everything())
if err != nil {
//TODO Handle error case
fmt.Println("Error getting node information")
return []*v1.Node{}
}

allActiveManagedPods, err := podLister.List(managedPodSelector)
allActiveManagedPods = removePodsInTerminalState(allActiveManagedPods)
return allActiveManagedPods
return filterAvailableProcessingNodes(allNodes)
}

func (clusterUtilisationService ClusterUtilisationService) reportUsage(clusterUsage *api.ClusterUsageReport) error {
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
_, err := clusterUtilisationService.UsageClient.ReportUsage(ctx, clusterUsage)

return err
}

func filterAvailableProcessingNodes(nodes []*v1.Node) []*v1.Node {
processingNodes := make([]*v1.Node, 0, len(nodes))

for _, node := range nodes {
if isAvailableProcessingNode(node) {
processingNodes = append(processingNodes, node)
}
}

return processingNodes
}

func isAvailableProcessingNode(node *v1.Node) bool {
if node.Spec.Unschedulable {
return false
}

noSchedule := false

for _, taint := range node.Spec.Taints {
if taint.Effect == v1.TaintEffectNoSchedule {
noSchedule = true
break
}
}

if noSchedule {
return false
}

return true
}

func removePodsInTerminalState(pods []*v1.Pod) []*v1.Pod {
activePods := make([]*v1.Pod, 0)
func getAllPodsOnNodes(pods []*v1.Pod, nodes []*v1.Node) []*v1.Pod {
podsBelongingToNodes := make([]*v1.Pod, 0, len(pods))

nodeMap := make(map[string]*v1.Node)
for _, node := range nodes {
nodeMap[node.Name] = node
}

for _, pod := range pods {
if !util.IsInTerminalState(pod) {
activePods = append(activePods, pod)
if _, present := nodeMap[pod.Spec.NodeName]; present {
podsBelongingToNodes = append(podsBelongingToNodes, pod)
}
}

return activePods
return podsBelongingToNodes
}

func getAllActiveManagedPods(podLister lister.PodLister) []*v1.Pod {
managedPodSelector := util.GetManagedPodSelector()
allActiveManagedPods, err := podLister.List(managedPodSelector)
if err != nil {
//TODO handle error case
}
allActiveManagedPods = util.FilterNonCompletedPods(allActiveManagedPods)
return allActiveManagedPods
}

func createReportsOfQueueUsages(pods []*v1.Pod) []*api.QueueReport {
@@ -95,7 +158,7 @@ func getUsageByQueue(pods []*v1.Pod) map[string]common.ComputeResources {

for _, pod := range pods {
queue := pod.Labels[domain.Queue]
podComputeResource := calculateTotalResourceLimit([]*v1.Pod{pod})
podComputeResource := util.CalculateTotalResourceLimit([]*v1.Pod{pod})

if _, ok := utilisationByQueue[queue]; ok {
utilisationByQueue[queue].Add(podComputeResource)
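
In essence, GetAvailableClusterCapacity reports "allocatable resource on schedulable nodes, minus what the non-completed pods on those nodes already claim". A standalone illustration of that subtraction using plain Kubernetes quantities — the figures are invented; the real code operates on `common.ComputeResources` built from the node and pod lists:

```go
package main

import (
	"fmt"

	"k8s.io/apimachinery/pkg/api/resource"
)

func main() {
	// Suppose the schedulable nodes offer 16 CPUs in total and the active pods
	// already running on them account for 10.
	totalNodeCPU := resource.MustParse("16")
	totalPodCPU := resource.MustParse("10")

	// Same shape as GetAvailableClusterCapacity: copy the total, then subtract usage.
	availableCPU := totalNodeCPU.DeepCopy()
	availableCPU.Sub(totalPodCPU)

	fmt.Printf("spare capacity: %s CPUs\n", availableCPU.String()) // spare capacity: 6 CPUs
}
```
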
95 changes: 95 additions & 0 deletions internal/executor/service/cluster_utilisation_service_test.go
@@ -0,0 +1,95 @@
package service

import (
"github.com/stretchr/testify/assert"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"testing"
)

func TestFilterAvailableProcessingNodes_ShouldReturnAvailableProcessingNodes(t *testing.T) {
node := v1.Node{
Spec: v1.NodeSpec{
Unschedulable: false,
Taints: nil,
},
}

nodes := []*v1.Node{&node}
result := filterAvailableProcessingNodes(nodes)

assert.Equal(t, len(result), 1)
}

func TestFilterAvailableProcessingNodes_ShouldFilterUnschedulableNodes(t *testing.T) {
node := v1.Node{
Spec: v1.NodeSpec{
Unschedulable: true,
Taints: nil,
},
}

nodes := []*v1.Node{&node}
result := filterAvailableProcessingNodes(nodes)

assert.Equal(t, len(result), 0)
}

func TestFilterAvailableProcessingNodes_ShouldFilterNodesWithNoScheduleTaint(t *testing.T) {
taint := v1.Taint{
Effect: v1.TaintEffectNoSchedule,
}
node := v1.Node{
Spec: v1.NodeSpec{
Unschedulable: false,
Taints: []v1.Taint{taint},
},
}

nodes := []*v1.Node{&node}
result := filterAvailableProcessingNodes(nodes)

assert.Equal(t, len(result), 0)
}

func TestGetAllPodsOnNodes_ShouldExcludePodsNoOnGivenNodes(t *testing.T) {
presentNodeName := "Node1"
podOnNode := v1.Pod{
Spec: v1.PodSpec{
NodeName: presentNodeName,
},
}
podNotOnNode := v1.Pod{
Spec: v1.PodSpec{
NodeName: "Node2",
},
}

node := v1.Node{
ObjectMeta: metav1.ObjectMeta{
Name: presentNodeName,
},
}
pods := []*v1.Pod{&podOnNode, &podNotOnNode}
nodes := []*v1.Node{&node}

result := getAllPodsOnNodes(pods, nodes)

assert.Equal(t, len(result), 1)
assert.Equal(t, result[0].Spec.NodeName, presentNodeName)
}

func TestGetAllPodsOnNodes_ShouldHandleNoNodesProvided(t *testing.T) {
podOnNode := v1.Pod{
Spec: v1.PodSpec{
NodeName: "Node1",
},
}

pods := []*v1.Pod{&podOnNode}
var nodes []*v1.Node

result := getAllPodsOnNodes(pods, nodes)

assert.Equal(t, len(result), 0)
}
6 changes: 3 additions & 3 deletions internal/executor/service/job_event_reconciliation_service.go
@@ -15,13 +15,13 @@ type JobEventReconciliationService struct {
}

func (reconciliationService JobEventReconciliationService) ReconcileMissingJobEvents() {
selector, err := util.CreateLabelSelectorForManagedPods()
selector := util.GetManagedPodSelector()
allBatchPods, err := reconciliationService.PodLister.List(selector)
if err != nil {
return
//TODO Handle error case
return
}

allBatchPods, err := reconciliationService.PodLister.List(selector)
allPodsWithMissingEvent := filterPodsWithCurrentStateNotReported(allBatchPods)

for _, pod := range allPodsWithMissingEvent {
(The remaining 7 changed files are not shown.)
