Skip to content
This repository has been archived by the owner on Sep 12, 2023. It is now read-only.

WIP: Revert #135: change rtype to commonv1.ReplicaType #163

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pkg/apis/common/v1/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ type ControllerInterface interface {
UpdateJobStatusInApiServer(job interface{}, jobStatus *JobStatus) error

// SetClusterSpec sets the cluster spec for the pod
SetClusterSpec(job interface{}, podTemplate *v1.PodTemplateSpec, rtype ReplicaType, index string) error
SetClusterSpec(job interface{}, podTemplate *v1.PodTemplateSpec, rtype, index string) error

// Returns the default container name in pod
GetDefaultContainerName() string
Expand Down
6 changes: 4 additions & 2 deletions pkg/controller.v1/common/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package common
import (
"fmt"
"reflect"
"strings"
"time"

apiv1 "github.com/kubeflow/common/pkg/apis/common/v1"
Expand Down Expand Up @@ -300,9 +301,10 @@ func (jc *JobController) ReconcileJobs(
// ResetExpectations resets the expectations for creates and deletes of pods/services to zero.
func (jc *JobController) ResetExpectations(jobKey string, replicas map[apiv1.ReplicaType]*apiv1.ReplicaSpec) {
for rtype := range replicas {
expectationPodsKey := expectation.GenExpectationPodsKey(jobKey, rtype)
rt := strings.ToLower(string(rtype))
expectationPodsKey := expectation.GenExpectationPodsKey(jobKey, rt)
jc.Expectations.SetExpectations(expectationPodsKey, 0, 0)
expectationServicesKey := expectation.GenExpectationServicesKey(jobKey, rtype)
expectationServicesKey := expectation.GenExpectationServicesKey(jobKey, rt)
jc.Expectations.SetExpectations(expectationServicesKey, 0, 0)
}
}
Expand Down
20 changes: 11 additions & 9 deletions pkg/controller.v1/common/pod.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"fmt"
"reflect"
"strconv"
"strings"

apiv1 "github.com/kubeflow/common/pkg/apis/common/v1"
"github.com/kubeflow/common/pkg/controller.v1/control"
Expand Down Expand Up @@ -113,7 +114,6 @@ func (jc *JobController) AddPod(obj interface{}) {

return
}

}

// When a pod is updated, figure out what job is managing it and wake it up.
Expand Down Expand Up @@ -255,7 +255,7 @@ func (jc *JobController) GetPodsForJob(jobObject interface{}) ([]*v1.Pod, error)
}

// FilterPodsForReplicaType returns the pods that belong to the given replicaType.
func (jc *JobController) FilterPodsForReplicaType(pods []*v1.Pod, replicaType apiv1.ReplicaType) ([]*v1.Pod, error) {
func (jc *JobController) FilterPodsForReplicaType(pods []*v1.Pod, replicaType string) ([]*v1.Pod, error) {
return core.FilterPodsForReplicaType(pods, replicaType)
}

Expand Down Expand Up @@ -288,12 +288,14 @@ func (jc *JobController) ReconcilePods(
utilruntime.HandleError(fmt.Errorf("couldn't get key for job object %#v: %v", job, err))
return err
}
expectationPodsKey := expectation.GenExpectationPodsKey(jobKey, rtype)

// Convert ReplicaType to a lowercase string.
logger := commonutil.LoggerForReplica(metaObject, rtype)
rt := strings.ToLower(string(rtype))
expectationPodsKey := expectation.GenExpectationPodsKey(jobKey, rt)

logger := commonutil.LoggerForReplica(metaObject, rt)
// Get all pods for the type rt.
pods, err = jc.FilterPodsForReplicaType(pods, rtype)
pods, err = jc.FilterPodsForReplicaType(pods, rt)
if err != nil {
return err
}
Expand All @@ -311,13 +313,13 @@ func (jc *JobController) ReconcilePods(
podSlices := jc.GetPodSlices(pods, numReplicas, logger)
for index, podSlice := range podSlices {
if len(podSlice) > 1 {
logger.Warningf("We have too many pods for %s %d", rtype, index)
logger.Warningf("We have too many pods for %s %d", rt, index)
} else if len(podSlice) == 0 {
logger.Infof("Need to create new pod: %s-%d", rtype, index)
logger.Infof("Need to create new pod: %s-%d", rt, index)

// check if this replica is the master role
masterRole = jc.Controller.IsMasterRole(replicas, rtype, index)
err = jc.createNewPod(job, rtype, index, spec, masterRole, replicas)
err = jc.createNewPod(job, rt, index, spec, masterRole, replicas)
if err != nil {
return err
}
Expand Down Expand Up @@ -365,7 +367,7 @@ func (jc *JobController) ReconcilePods(
}

// createNewPod creates a new pod for the given index and type.
func (jc *JobController) createNewPod(job interface{}, rt apiv1.ReplicaType, index int, spec *apiv1.ReplicaSpec, masterRole bool,
func (jc *JobController) createNewPod(job interface{}, rt string, index int, spec *apiv1.ReplicaSpec, masterRole bool,
replicas map[apiv1.ReplicaType]*apiv1.ReplicaSpec) error {

metaObject, ok := job.(metav1.Object)
Expand Down
22 changes: 14 additions & 8 deletions pkg/controller.v1/common/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package common
import (
"fmt"
"strconv"
"strings"

apiv1 "github.com/kubeflow/common/pkg/apis/common/v1"
"github.com/kubeflow/common/pkg/controller.v1/control"
Expand Down Expand Up @@ -139,7 +140,7 @@ func (jc *JobController) GetServicesForJob(jobObject interface{}) ([]*v1.Service
}

// FilterServicesForReplicaType returns the services that belong to the given replicaType.
func (jc *JobController) FilterServicesForReplicaType(services []*v1.Service, replicaType apiv1.ReplicaType) ([]*v1.Service, error) {
func (jc *JobController) FilterServicesForReplicaType(services []*v1.Service, replicaType string) ([]*v1.Service, error) {
return core.FilterServicesForReplicaType(services, replicaType)
}

Expand All @@ -158,9 +159,11 @@ func (jc *JobController) ReconcileServices(
rtype apiv1.ReplicaType,
spec *apiv1.ReplicaSpec) error {

// Convert ReplicaType to a lowercase string.
rt := strings.ToLower(string(rtype))
replicas := int(*spec.Replicas)
// Get all services for the type rt.
services, err := jc.FilterServicesForReplicaType(services, rtype)
services, err := jc.FilterServicesForReplicaType(services, rt)
if err != nil {
return err
}
Expand All @@ -171,13 +174,13 @@ func (jc *JobController) ReconcileServices(
// If replica is 4, return a slice with size 4. [[0],[1],[2],[]], a svc with replica-index 3 will be created.
//
// If replica is 1, return a slice with size 3. [[0],[1],[2]], svc with replica-index 1 and 2 are out of range and will be deleted.
serviceSlices := jc.GetServiceSlices(services, replicas, commonutil.LoggerForReplica(job, rtype))
serviceSlices := jc.GetServiceSlices(services, replicas, commonutil.LoggerForReplica(job, rt))

for index, serviceSlice := range serviceSlices {
if len(serviceSlice) > 1 {
commonutil.LoggerForReplica(job, rtype).Warningf("We have too many services for %s %d", rtype, index)
commonutil.LoggerForReplica(job, rt).Warningf("We have too many services for %s %d", rt, index)
} else if len(serviceSlice) == 0 {
commonutil.LoggerForReplica(job, rtype).Infof("need to create new service: %s-%d", rtype, index)
commonutil.LoggerForReplica(job, rt).Infof("need to create new service: %s-%d", rt, index)
err = jc.CreateNewService(job, rtype, spec, strconv.Itoa(index))
if err != nil {
return err
Expand Down Expand Up @@ -212,9 +215,12 @@ func (jc *JobController) CreateNewService(job metav1.Object, rtype apiv1.Replica
return err
}

// Convert ReplicaType to a lowercase string.
rt := strings.ToLower(string(rtype))

// Append ReplicaTypeLabelDeprecated and ReplicaIndexLabelDeprecated labels.
labels := jc.GenLabels(job.GetName())
utillabels.SetReplicaType(labels, rtype)
utillabels.SetReplicaType(labels, rt)
utillabels.SetReplicaIndexStr(labels, index)

ports, err := jc.GetPortsFromJob(spec)
Expand All @@ -236,13 +242,13 @@ func (jc *JobController) CreateNewService(job metav1.Object, rtype apiv1.Replica
service.Spec.Ports = append(service.Spec.Ports, svcPort)
}

service.Name = GenGeneralName(job.GetName(), rtype, index)
service.Name = GenGeneralName(job.GetName(), rt, index)
service.Labels = labels
// Create OwnerReference.
controllerRef := jc.GenOwnerReference(job)

// Creation is expected when there is no error returned
expectationServicesKey := expectation.GenExpectationServicesKey(jobKey, rtype)
expectationServicesKey := expectation.GenExpectationServicesKey(jobKey, rt)
jc.Expectations.RaiseExpectations(expectationServicesKey, 1, 0)

err = jc.ServiceControl.CreateServicesWithControllerRef(job.GetNamespace(), service, job.(runtime.Object), controllerRef)
Expand Down
4 changes: 2 additions & 2 deletions pkg/controller.v1/common/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,8 @@ func (p ReplicasPriority) Swap(i, j int) {
p[i], p[j] = p[j], p[i]
}

func GenGeneralName(jobName string, rtype apiv1.ReplicaType, index string) string {
n := jobName + "-" + strings.ToLower(string(rtype)) + "-" + index
func GenGeneralName(jobName string, rtype, index string) string {
n := jobName + "-" + strings.ToLower(rtype) + "-" + index
return strings.Replace(n, "/", "-", -1)
}

Expand Down
4 changes: 1 addition & 3 deletions pkg/controller.v1/common/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,13 @@ import (
"testing"

"github.com/stretchr/testify/assert"

apiv1 "github.com/kubeflow/common/pkg/apis/common/v1"
)

func TestGenGeneralName(t *testing.T) {
tcs := []struct {
index string
key string
replicaType apiv1.ReplicaType
replicaType string
expectedName string
}{
{
Expand Down
9 changes: 4 additions & 5 deletions pkg/controller.v1/expectation/util.go
Original file line number Diff line number Diff line change
@@ -1,16 +1,15 @@
package expectation

import (
apiv1 "github.com/kubeflow/common/pkg/apis/common/v1"
"strings"
)

// GenExpectationPodsKey generates an expectation key for pods of a job
func GenExpectationPodsKey(jobKey string, replicaType apiv1.ReplicaType) string {
return jobKey + "/" + strings.ToLower(string(replicaType)) + "/pods"
func GenExpectationPodsKey(jobKey string, replicaType string) string {
return jobKey + "/" + strings.ToLower(replicaType) + "/pods"
}

// GenExpectationServicesKey generates an expectation key for services of a job
func GenExpectationServicesKey(jobKey string, replicaType apiv1.ReplicaType) string {
return jobKey + "/" + strings.ToLower(string(replicaType)) + "/services"
func GenExpectationServicesKey(jobKey string, replicaType string) string {
return jobKey + "/" + strings.ToLower(replicaType) + "/services"
}
6 changes: 4 additions & 2 deletions pkg/core/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package core

import (
"sort"
"strings"
"time"

log "github.com/sirupsen/logrus"
Expand Down Expand Up @@ -77,7 +78,7 @@ func PastActiveDeadline(runPolicy *apiv1.RunPolicy, jobStatus apiv1.JobStatus) b
// this method applies only to pods with restartPolicy == OnFailure or Always
func PastBackoffLimit(jobName string, runPolicy *apiv1.RunPolicy,
replicas map[apiv1.ReplicaType]*apiv1.ReplicaSpec, pods []*v1.Pod,
podFilterFunc func(pods []*v1.Pod, replicaType apiv1.ReplicaType) ([]*v1.Pod, error)) (bool, error) {
podFilterFunc func(pods []*v1.Pod, replicaType string) ([]*v1.Pod, error)) (bool, error) {
if runPolicy.BackoffLimit == nil {
return false, nil
}
Expand All @@ -88,7 +89,8 @@ func PastBackoffLimit(jobName string, runPolicy *apiv1.RunPolicy,
continue
}
// Convert ReplicaType to a lowercase string.
pods, err := podFilterFunc(pods, rtype)
rt := strings.ToLower(string(rtype))
pods, err := podFilterFunc(pods, rt)
if err != nil {
return false, err
}
Expand Down
6 changes: 3 additions & 3 deletions pkg/core/pod.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,16 +10,16 @@ import (
)

// FilterPodsForReplicaType returns the pods that belong to the given replicaType.
func FilterPodsForReplicaType(pods []*v1.Pod, replicaType apiv1.ReplicaType) ([]*v1.Pod, error) {
func FilterPodsForReplicaType(pods []*v1.Pod, replicaType string) ([]*v1.Pod, error) {
var result []*v1.Pod

selector := labels.SelectorFromValidatedSet(labels.Set{
apiv1.ReplicaTypeLabel: string(replicaType),
apiv1.ReplicaTypeLabel: replicaType,
})

// TODO(#149): Remove deprecated selector.
deprecatedSelector := labels.SelectorFromValidatedSet(labels.Set{
apiv1.ReplicaTypeLabelDeprecated: string(replicaType),
apiv1.ReplicaTypeLabelDeprecated: replicaType,
})

for _, pod := range pods {
Expand Down
6 changes: 3 additions & 3 deletions pkg/core/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,16 +11,16 @@ import (
)

// FilterServicesForReplicaType returns the services that belong to the given replicaType.
func FilterServicesForReplicaType(services []*v1.Service, replicaType apiv1.ReplicaType) ([]*v1.Service, error) {
func FilterServicesForReplicaType(services []*v1.Service, replicaType string) ([]*v1.Service, error) {
var result []*v1.Service

selector := labels.SelectorFromValidatedSet(labels.Set{
apiv1.ReplicaTypeLabel: string(replicaType),
apiv1.ReplicaTypeLabel: replicaType,
})

// TODO(#149): Remove deprecated selector.
deprecatedSelector := labels.SelectorFromValidatedSet(labels.Set{
apiv1.ReplicaTypeLabelDeprecated: string(replicaType),
apiv1.ReplicaTypeLabelDeprecated: replicaType,
})

for _, service := range services {
Expand Down
6 changes: 2 additions & 4 deletions pkg/core/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,6 @@ package core

import (
"strings"

commonv1 "github.com/kubeflow/common/pkg/apis/common/v1"
)

func MaxInt(x, y int) int {
Expand All @@ -13,7 +11,7 @@ func MaxInt(x, y int) int {
return x
}

func GenGeneralName(jobName string, rtype commonv1.ReplicaType, index string) string {
n := jobName + "-" + strings.ToLower(string(rtype)) + "-" + index
func GenGeneralName(jobName string, rtype string, index string) string {
n := jobName + "-" + strings.ToLower(rtype) + "-" + index
return strings.Replace(n, "/", "-", -1)
}
4 changes: 2 additions & 2 deletions pkg/reconciler.v1/common/gang_volcano.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,12 +165,12 @@ func (r *VolcanoReconciler) ReconcilePodGroup(
}

// DecoratePodForGangScheduling decorates the podTemplate before it's used to generate a pod with information for gang-scheduling
func (r *VolcanoReconciler) DecoratePodForGangScheduling(rtype commonv1.ReplicaType, podTemplate *corev1.PodTemplateSpec, job client.Object) {
func (r *VolcanoReconciler) DecoratePodForGangScheduling(rt string, podTemplate *corev1.PodTemplateSpec, job client.Object) {
if podTemplate.Spec.SchedulerName == "" || podTemplate.Spec.SchedulerName == r.GetGangSchedulerName() {
podTemplate.Spec.SchedulerName = r.GetGangSchedulerName()
} else {
warnMsg := "Another scheduler is specified when gang-scheduling is enabled and it will not be overwritten"
commonutil.LoggerForReplica(job, rtype).Warn(warnMsg)
commonutil.LoggerForReplica(job, rt).Warn(warnMsg)
r.GetRecorder().Event(job, corev1.EventTypeWarning, "PodTemplateSchedulerNameAlreadySet", warnMsg)
}

Expand Down
12 changes: 6 additions & 6 deletions pkg/reconciler.v1/common/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ type GangSchedulingInterface interface {

// DecoratePodForGangScheduling SHOULD be overridden if gang scheduler demands Pods associated with PodGroup to be
// decorated with specific requests.
DecoratePodForGangScheduling(rtype commonv1.ReplicaType, podTemplate *corev1.PodTemplateSpec, job client.Object)
DecoratePodForGangScheduling(rtype string, podTemplate *corev1.PodTemplateSpec, job client.Object)
}

// PodInterface defines the abstract interface for Pod related actions, such as get, create or delete Pod
Expand All @@ -90,14 +90,14 @@ type PodInterface interface {
GetDefaultContainerName() string

// GenPodName CAN be overridden to customize Pod name.
GenPodName(jobName string, rtype commonv1.ReplicaType, index string) string
GenPodName(jobName string, rtype string, index string) string

// GetPodsForJob CAN be overridden to customize how to list all pods with the job.
GetPodsForJob(ctx context.Context, job client.Object) ([]*corev1.Pod, error)

// FilterPodsForReplicaType CAN be overridden if the linking approach between pods and replicaType changes as this
// function filters out pods for specific replica type from all pods associated with the job.
FilterPodsForReplicaType(pods []*corev1.Pod, replicaType commonv1.ReplicaType) ([]*corev1.Pod, error)
FilterPodsForReplicaType(pods []*corev1.Pod, replicaType string) ([]*corev1.Pod, error)

// GetPodSlices SHOULD NOT be overridden as it generates pod slices for further pod processing.
GetPodSlices(pods []*corev1.Pod, replicas int, logger *logrus.Entry) [][]*corev1.Pod
Expand All @@ -121,7 +121,7 @@ type PodInterface interface {

// DecoratePod CAN be overridden if customization to the pod is needed. The default implementation applies nothing
// to the pod.
DecoratePod(rtype commonv1.ReplicaType, podTemplate *corev1.PodTemplateSpec, job client.Object)
DecoratePod(rtype string, podTemplate *corev1.PodTemplateSpec, job client.Object)
}

// ServiceInterface defines the abstract interface for Service related actions, such as get, create or delete Service
Expand All @@ -137,7 +137,7 @@ type ServiceInterface interface {

// FilterServicesForReplicaType CAN be overridden to customize how to filter out services for this Replica Type.
FilterServicesForReplicaType(services []*corev1.Service,
replicaType commonv1.ReplicaType) ([]*corev1.Service, error)
replicaType string) ([]*corev1.Service, error)

// GetServiceSlices CAN be overridden to customize how to generate service slices.
GetServiceSlices(services []*corev1.Service, replicas int, logger *logrus.Entry) [][]*corev1.Service
Expand All @@ -157,7 +157,7 @@ type ServiceInterface interface {
DeleteService(ns string, name string, job client.Object) error

// DecorateService CAN be overridden to customize this service right before being created
DecorateService(rtype commonv1.ReplicaType, svc *corev1.Service, job client.Object)
DecorateService(rtype string, svc *corev1.Service, job client.Object)
}

// JobInterface defines the abstract interface for Pod related actions, such like get, create or delete TFJob,
Expand Down
Loading