Skip to content

Commit

Permalink
Revert kubeflow#135: change rtype to commonv1.ReplicaType
Browse files Browse the repository at this point in the history
Since reconciler.v1 add more changes. This revert's scope is larger than original PR.

Signed-off-by: Jiaxin Shan <[email protected]>
  • Loading branch information
Jeffwan committed Sep 20, 2021
1 parent a86572b commit 070e195
Show file tree
Hide file tree
Showing 20 changed files with 97 additions and 82 deletions.
2 changes: 1 addition & 1 deletion pkg/apis/common/v1/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ type ControllerInterface interface {
UpdateJobStatusInApiServer(job interface{}, jobStatus *JobStatus) error

// SetClusterSpec sets the cluster spec for the pod
SetClusterSpec(job interface{}, podTemplate *v1.PodTemplateSpec, rtype ReplicaType, index string) error
SetClusterSpec(job interface{}, podTemplate *v1.PodTemplateSpec, rtype, index string) error

// Returns the default container name in pod
GetDefaultContainerName() string
Expand Down
6 changes: 4 additions & 2 deletions pkg/controller.v1/common/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package common
import (
"fmt"
"reflect"
"strings"
"time"

apiv1 "github.com/kubeflow/common/pkg/apis/common/v1"
Expand Down Expand Up @@ -300,9 +301,10 @@ func (jc *JobController) ReconcileJobs(
// ResetExpectations reset the expectation for creates and deletes of pod/service to zero.
func (jc *JobController) ResetExpectations(jobKey string, replicas map[apiv1.ReplicaType]*apiv1.ReplicaSpec) {
for rtype := range replicas {
expectationPodsKey := expectation.GenExpectationPodsKey(jobKey, rtype)
rt := strings.ToLower(string(rtype))
expectationPodsKey := expectation.GenExpectationPodsKey(jobKey, rt)
jc.Expectations.SetExpectations(expectationPodsKey, 0, 0)
expectationServicesKey := expectation.GenExpectationServicesKey(jobKey, rtype)
expectationServicesKey := expectation.GenExpectationServicesKey(jobKey, rt)
jc.Expectations.SetExpectations(expectationServicesKey, 0, 0)
}
}
Expand Down
20 changes: 11 additions & 9 deletions pkg/controller.v1/common/pod.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"fmt"
"reflect"
"strconv"
"strings"

apiv1 "github.com/kubeflow/common/pkg/apis/common/v1"
"github.com/kubeflow/common/pkg/controller.v1/control"
Expand Down Expand Up @@ -113,7 +114,6 @@ func (jc *JobController) AddPod(obj interface{}) {

return
}

}

// When a pod is updated, figure out what job is managing it and wake it up.
Expand Down Expand Up @@ -255,7 +255,7 @@ func (jc *JobController) GetPodsForJob(jobObject interface{}) ([]*v1.Pod, error)
}

// FilterPodsForReplicaType returns pods belong to a replicaType.
func (jc *JobController) FilterPodsForReplicaType(pods []*v1.Pod, replicaType apiv1.ReplicaType) ([]*v1.Pod, error) {
func (jc *JobController) FilterPodsForReplicaType(pods []*v1.Pod, replicaType string) ([]*v1.Pod, error) {
return core.FilterPodsForReplicaType(pods, replicaType)
}

Expand Down Expand Up @@ -288,12 +288,14 @@ func (jc *JobController) ReconcilePods(
utilruntime.HandleError(fmt.Errorf("couldn't get key for job object %#v: %v", job, err))
return err
}
expectationPodsKey := expectation.GenExpectationPodsKey(jobKey, rtype)

// Convert ReplicaType to lower string.
logger := commonutil.LoggerForReplica(metaObject, rtype)
rt := strings.ToLower(string(rtype))
expectationPodsKey := expectation.GenExpectationPodsKey(jobKey, rt)

logger := commonutil.LoggerForReplica(metaObject, rt)
// Get all pods for the type rt.
pods, err = jc.FilterPodsForReplicaType(pods, rtype)
pods, err = jc.FilterPodsForReplicaType(pods, rt)
if err != nil {
return err
}
Expand All @@ -311,13 +313,13 @@ func (jc *JobController) ReconcilePods(
podSlices := jc.GetPodSlices(pods, numReplicas, logger)
for index, podSlice := range podSlices {
if len(podSlice) > 1 {
logger.Warningf("We have too many pods for %s %d", rtype, index)
logger.Warningf("We have too many pods for %s %d", rt, index)
} else if len(podSlice) == 0 {
logger.Infof("Need to create new pod: %s-%d", rtype, index)
logger.Infof("Need to create new pod: %s-%d", rt, index)

// check if this replica is the master role
masterRole = jc.Controller.IsMasterRole(replicas, rtype, index)
err = jc.createNewPod(job, rtype, index, spec, masterRole, replicas)
err = jc.createNewPod(job, rt, index, spec, masterRole, replicas)
if err != nil {
return err
}
Expand Down Expand Up @@ -365,7 +367,7 @@ func (jc *JobController) ReconcilePods(
}

// createNewPod creates a new pod for the given index and type.
func (jc *JobController) createNewPod(job interface{}, rt apiv1.ReplicaType, index int, spec *apiv1.ReplicaSpec, masterRole bool,
func (jc *JobController) createNewPod(job interface{}, rt string, index int, spec *apiv1.ReplicaSpec, masterRole bool,
replicas map[apiv1.ReplicaType]*apiv1.ReplicaSpec) error {

metaObject, ok := job.(metav1.Object)
Expand Down
22 changes: 14 additions & 8 deletions pkg/controller.v1/common/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package common
import (
"fmt"
"strconv"
"strings"

apiv1 "github.com/kubeflow/common/pkg/apis/common/v1"
"github.com/kubeflow/common/pkg/controller.v1/control"
Expand Down Expand Up @@ -139,7 +140,7 @@ func (jc *JobController) GetServicesForJob(jobObject interface{}) ([]*v1.Service
}

// FilterServicesForReplicaType returns service belong to a replicaType.
func (jc *JobController) FilterServicesForReplicaType(services []*v1.Service, replicaType apiv1.ReplicaType) ([]*v1.Service, error) {
func (jc *JobController) FilterServicesForReplicaType(services []*v1.Service, replicaType string) ([]*v1.Service, error) {
return core.FilterServicesForReplicaType(services, replicaType)
}

Expand All @@ -158,9 +159,11 @@ func (jc *JobController) ReconcileServices(
rtype apiv1.ReplicaType,
spec *apiv1.ReplicaSpec) error {

// Convert ReplicaType to lower string.
rt := strings.ToLower(string(rtype))
replicas := int(*spec.Replicas)
// Get all services for the type rt.
services, err := jc.FilterServicesForReplicaType(services, rtype)
services, err := jc.FilterServicesForReplicaType(services, rt)
if err != nil {
return err
}
Expand All @@ -171,13 +174,13 @@ func (jc *JobController) ReconcileServices(
// If replica is 4, return a slice with size 4. [[0],[1],[2],[]], a svc with replica-index 3 will be created.
//
// If replica is 1, return a slice with size 3. [[0],[1],[2]], svc with replica-index 1 and 2 are out of range and will be deleted.
serviceSlices := jc.GetServiceSlices(services, replicas, commonutil.LoggerForReplica(job, rtype))
serviceSlices := jc.GetServiceSlices(services, replicas, commonutil.LoggerForReplica(job, rt))

for index, serviceSlice := range serviceSlices {
if len(serviceSlice) > 1 {
commonutil.LoggerForReplica(job, rtype).Warningf("We have too many services for %s %d", rtype, index)
commonutil.LoggerForReplica(job, rt).Warningf("We have too many services for %s %d", rt, index)
} else if len(serviceSlice) == 0 {
commonutil.LoggerForReplica(job, rtype).Infof("need to create new service: %s-%d", rtype, index)
commonutil.LoggerForReplica(job, rt).Infof("need to create new service: %s-%d", rt, index)
err = jc.CreateNewService(job, rtype, spec, strconv.Itoa(index))
if err != nil {
return err
Expand Down Expand Up @@ -212,9 +215,12 @@ func (jc *JobController) CreateNewService(job metav1.Object, rtype apiv1.Replica
return err
}

// Convert ReplicaType to lower string.
rt := strings.ToLower(string(rtype))

// Append ReplicaTypeLabelDeprecated and ReplicaIndexLabelDeprecated labels.
labels := jc.GenLabels(job.GetName())
utillabels.SetReplicaType(labels, rtype)
utillabels.SetReplicaType(labels, rt)
utillabels.SetReplicaIndexStr(labels, index)

ports, err := jc.GetPortsFromJob(spec)
Expand All @@ -236,13 +242,13 @@ func (jc *JobController) CreateNewService(job metav1.Object, rtype apiv1.Replica
service.Spec.Ports = append(service.Spec.Ports, svcPort)
}

service.Name = GenGeneralName(job.GetName(), rtype, index)
service.Name = GenGeneralName(job.GetName(), rt, index)
service.Labels = labels
// Create OwnerReference.
controllerRef := jc.GenOwnerReference(job)

// Creation is expected when there is no error returned
expectationServicesKey := expectation.GenExpectationServicesKey(jobKey, rtype)
expectationServicesKey := expectation.GenExpectationServicesKey(jobKey, rt)
jc.Expectations.RaiseExpectations(expectationServicesKey, 1, 0)

err = jc.ServiceControl.CreateServicesWithControllerRef(job.GetNamespace(), service, job.(runtime.Object), controllerRef)
Expand Down
4 changes: 2 additions & 2 deletions pkg/controller.v1/common/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,8 @@ func (p ReplicasPriority) Swap(i, j int) {
p[i], p[j] = p[j], p[i]
}

func GenGeneralName(jobName string, rtype apiv1.ReplicaType, index string) string {
n := jobName + "-" + strings.ToLower(string(rtype)) + "-" + index
func GenGeneralName(jobName string, rtype, index string) string {
n := jobName + "-" + strings.ToLower(rtype) + "-" + index
return strings.Replace(n, "/", "-", -1)
}

Expand Down
4 changes: 1 addition & 3 deletions pkg/controller.v1/common/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,13 @@ import (
"testing"

"github.com/stretchr/testify/assert"

apiv1 "github.com/kubeflow/common/pkg/apis/common/v1"
)

func TestGenGeneralName(t *testing.T) {
tcs := []struct {
index string
key string
replicaType apiv1.ReplicaType
replicaType string
expectedName string
}{
{
Expand Down
9 changes: 4 additions & 5 deletions pkg/controller.v1/expectation/util.go
Original file line number Diff line number Diff line change
@@ -1,16 +1,15 @@
package expectation

import (
apiv1 "github.com/kubeflow/common/pkg/apis/common/v1"
"strings"
)

// GenExpectationPodsKey generates an expectation key for pods of a job
func GenExpectationPodsKey(jobKey string, replicaType apiv1.ReplicaType) string {
return jobKey + "/" + strings.ToLower(string(replicaType)) + "/pods"
func GenExpectationPodsKey(jobKey string, replicaType string) string {
return jobKey + "/" + strings.ToLower(replicaType) + "/pods"
}

// GenExpectationPodsKey generates an expectation key for services of a job
func GenExpectationServicesKey(jobKey string, replicaType apiv1.ReplicaType) string {
return jobKey + "/" + strings.ToLower(string(replicaType)) + "/services"
func GenExpectationServicesKey(jobKey string, replicaType string) string {
return jobKey + "/" + strings.ToLower(replicaType) + "/services"
}
6 changes: 4 additions & 2 deletions pkg/core/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package core

import (
"sort"
"strings"
"time"

log "github.com/sirupsen/logrus"
Expand Down Expand Up @@ -77,7 +78,7 @@ func PastActiveDeadline(runPolicy *apiv1.RunPolicy, jobStatus apiv1.JobStatus) b
// this method applies only to pods with restartPolicy == OnFailure or Always
func PastBackoffLimit(jobName string, runPolicy *apiv1.RunPolicy,
replicas map[apiv1.ReplicaType]*apiv1.ReplicaSpec, pods []*v1.Pod,
podFilterFunc func(pods []*v1.Pod, replicaType apiv1.ReplicaType) ([]*v1.Pod, error)) (bool, error) {
podFilterFunc func(pods []*v1.Pod, replicaType string) ([]*v1.Pod, error)) (bool, error) {
if runPolicy.BackoffLimit == nil {
return false, nil
}
Expand All @@ -88,7 +89,8 @@ func PastBackoffLimit(jobName string, runPolicy *apiv1.RunPolicy,
continue
}
// Convert ReplicaType to lower string.
pods, err := podFilterFunc(pods, rtype)
rt := strings.ToLower(string(rtype))
pods, err := podFilterFunc(pods, rt)
if err != nil {
return false, err
}
Expand Down
6 changes: 3 additions & 3 deletions pkg/core/pod.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,16 +10,16 @@ import (
)

// FilterPodsForReplicaType returns pods belong to a replicaType.
func FilterPodsForReplicaType(pods []*v1.Pod, replicaType apiv1.ReplicaType) ([]*v1.Pod, error) {
func FilterPodsForReplicaType(pods []*v1.Pod, replicaType string) ([]*v1.Pod, error) {
var result []*v1.Pod

selector := labels.SelectorFromValidatedSet(labels.Set{
apiv1.ReplicaTypeLabel: string(replicaType),
apiv1.ReplicaTypeLabel: replicaType,
})

// TODO(#149): Remove deprecated selector.
deprecatedSelector := labels.SelectorFromValidatedSet(labels.Set{
apiv1.ReplicaTypeLabelDeprecated: string(replicaType),
apiv1.ReplicaTypeLabelDeprecated: replicaType,
})

for _, pod := range pods {
Expand Down
6 changes: 3 additions & 3 deletions pkg/core/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,16 +11,16 @@ import (
)

// FilterServicesForReplicaType returns service belong to a replicaType.
func FilterServicesForReplicaType(services []*v1.Service, replicaType apiv1.ReplicaType) ([]*v1.Service, error) {
func FilterServicesForReplicaType(services []*v1.Service, replicaType string) ([]*v1.Service, error) {
var result []*v1.Service

selector := labels.SelectorFromValidatedSet(labels.Set{
apiv1.ReplicaTypeLabel: string(replicaType),
apiv1.ReplicaTypeLabel: replicaType,
})

// TODO(#149): Remove deprecated selector.
deprecatedSelector := labels.SelectorFromValidatedSet(labels.Set{
apiv1.ReplicaTypeLabelDeprecated: string(replicaType),
apiv1.ReplicaTypeLabelDeprecated: replicaType,
})

for _, service := range services {
Expand Down
6 changes: 2 additions & 4 deletions pkg/core/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,6 @@ package core

import (
"strings"

commonv1 "github.com/kubeflow/common/pkg/apis/common/v1"
)

func MaxInt(x, y int) int {
Expand All @@ -13,7 +11,7 @@ func MaxInt(x, y int) int {
return x
}

func GenGeneralName(jobName string, rtype commonv1.ReplicaType, index string) string {
n := jobName + "-" + strings.ToLower(string(rtype)) + "-" + index
func GenGeneralName(jobName string, rtype string, index string) string {
n := jobName + "-" + strings.ToLower(rtype) + "-" + index
return strings.Replace(n, "/", "-", -1)
}
4 changes: 2 additions & 2 deletions pkg/reconciler.v1/common/gang_volcano.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,12 +165,12 @@ func (r *VolcanoReconciler) ReconcilePodGroup(
}

// DecoratePodForGangScheduling decorates the podTemplate before it's used to generate a pod with information for gang-scheduling
func (r *VolcanoReconciler) DecoratePodForGangScheduling(rtype commonv1.ReplicaType, podTemplate *corev1.PodTemplateSpec, job client.Object) {
func (r *VolcanoReconciler) DecoratePodForGangScheduling(rt string, podTemplate *corev1.PodTemplateSpec, job client.Object) {
if podTemplate.Spec.SchedulerName == "" || podTemplate.Spec.SchedulerName == r.GetGangSchedulerName() {
podTemplate.Spec.SchedulerName = r.GetGangSchedulerName()
} else {
warnMsg := "Another scheduler is specified when gang-scheduling is enabled and it will not be overwritten"
commonutil.LoggerForReplica(job, rtype).Warn(warnMsg)
commonutil.LoggerForReplica(job, rt).Warn(warnMsg)
r.GetRecorder().Event(job, corev1.EventTypeWarning, "PodTemplateSchedulerNameAlreadySet", warnMsg)
}

Expand Down
12 changes: 6 additions & 6 deletions pkg/reconciler.v1/common/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ type GangSchedulingInterface interface {

// DecoratePodForGangScheduling SHOULD be overridden if gang scheduler demands Pods associated with PodGroup to be
// decorated with specific requests.
DecoratePodForGangScheduling(rtype commonv1.ReplicaType, podTemplate *corev1.PodTemplateSpec, job client.Object)
DecoratePodForGangScheduling(rtype string, podTemplate *corev1.PodTemplateSpec, job client.Object)
}

// PodInterface defines the abstract interface for Pod related actions, such like get, create or delete Pod
Expand All @@ -90,14 +90,14 @@ type PodInterface interface {
GetDefaultContainerName() string

// GenPodName CAN be overridden to customize Pod name.
GenPodName(jobName string, rtype commonv1.ReplicaType, index string) string
GenPodName(jobName string, rtype string, index string) string

// GetPodsForJob CAN be overridden to customize how to list all pods with the job.
GetPodsForJob(ctx context.Context, job client.Object) ([]*corev1.Pod, error)

// FilterPodsForReplicaType CAN be overridden if the linking approach between pods and replicaType changes as this
// function filters out pods for specific replica type from all pods associated with the job.
FilterPodsForReplicaType(pods []*corev1.Pod, replicaType commonv1.ReplicaType) ([]*corev1.Pod, error)
FilterPodsForReplicaType(pods []*corev1.Pod, replicaType string) ([]*corev1.Pod, error)

// GetPodSlices SHOULD NOT be overridden as it generates pod slices for further pod processing.
GetPodSlices(pods []*corev1.Pod, replicas int, logger *logrus.Entry) [][]*corev1.Pod
Expand All @@ -121,7 +121,7 @@ type PodInterface interface {

// DecoratePod CAN be overridden if customization to the pod is needed. The default implementation applies nothing
// to the pod.
DecoratePod(rtype commonv1.ReplicaType, podTemplate *corev1.PodTemplateSpec, job client.Object)
DecoratePod(rtype string, podTemplate *corev1.PodTemplateSpec, job client.Object)
}

// ServiceInterface defines the abstract interface for Pod related actions, such like get, create or delete Service
Expand All @@ -137,7 +137,7 @@ type ServiceInterface interface {

// FilterServicesForReplicaType CAN be overridden to customize how to filter out services for this Replica Type.
FilterServicesForReplicaType(services []*corev1.Service,
replicaType commonv1.ReplicaType) ([]*corev1.Service, error)
replicaType string) ([]*corev1.Service, error)

// GetServiceSlices CAN be overridden to customize how to generate service slices.
GetServiceSlices(services []*corev1.Service, replicas int, logger *logrus.Entry) [][]*corev1.Service
Expand All @@ -157,7 +157,7 @@ type ServiceInterface interface {
DeleteService(ns string, name string, job client.Object) error

// DecorateService CAN be overridden to customize this service right before being created
DecorateService(rtype commonv1.ReplicaType, svc *corev1.Service, job client.Object)
DecorateService(rtype string, svc *corev1.Service, job client.Object)
}

// JobInterface defines the abstract interface for Pod related actions, such like get, create or delete TFJob,
Expand Down
Loading

0 comments on commit 070e195

Please sign in to comment.