Skip to content

Commit

Permalink
chore: Bump kubeflow/common version to 0.3.6
Browse files Browse the repository at this point in the history
  • Loading branch information
Jeffwan committed Aug 29, 2021
1 parent ff5aaf1 commit ca2e0ab
Show file tree
Hide file tree
Showing 12 changed files with 249 additions and 36 deletions.
6 changes: 3 additions & 3 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,12 @@ go 1.14

require (
github.com/go-logr/logr v0.3.0
github.com/go-openapi/spec v0.19.9
github.com/kubeflow/common v0.3.4
github.com/go-openapi/spec v0.20.3
github.com/kubeflow/common v0.3.6
github.com/onrik/logrus v0.2.2-0.20181225141908-a09d5cdcdc62
github.com/onsi/ginkgo v1.14.1
github.com/onsi/gomega v1.10.2
github.com/prometheus/client_golang v1.7.1
github.com/prometheus/client_golang v1.10.0
github.com/sirupsen/logrus v1.6.0
k8s.io/api v0.19.9
k8s.io/apiextensions-apiserver v0.19.9
Expand Down
199 changes: 199 additions & 0 deletions go.sum

Large diffs are not rendered by default.

13 changes: 7 additions & 6 deletions pkg/common/util/reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,10 @@ func SatisfiedExpectations(exp expectation.ControllerExpectationsInterface, jobK
satisfied := false
for _, rtype := range replicaTypes {
// Check the expectations of the pods.
expectationPodsKey := expectation.GenExpectationPodsKey(jobKey, string(rtype))
expectationPodsKey := expectation.GenExpectationPodsKey(jobKey, rtype)
satisfied = satisfied || exp.SatisfiedExpectations(expectationPodsKey)
// Check the expectations of the services.
expectationServicesKey := expectation.GenExpectationServicesKey(jobKey, string(rtype))
expectationServicesKey := expectation.GenExpectationServicesKey(jobKey, rtype)
satisfied = satisfied || exp.SatisfiedExpectations(expectationServicesKey)
}

Expand All @@ -61,11 +61,12 @@ func OnDependentCreateFunc(exp expectation.ControllerExpectationsInterface) func
jobKey := fmt.Sprintf("%s/%s", e.Object.GetNamespace(), controllerRef.Name)
var expectKey string
if _, ok := e.Object.(*corev1.Pod); ok {
expectKey = expectation.GenExpectationPodsKey(jobKey, rtype)
expectKey = expectation.GenExpectationPodsKey(jobKey, commonv1.ReplicaType(rtype))
}
// TODO: convert rtype from string to commonv1.ReplicaType once, instead of casting it at each call site.

if _, ok := e.Object.(*corev1.Service); ok {
expectKey = expectation.GenExpectationServicesKey(jobKey, rtype)
expectKey = expectation.GenExpectationServicesKey(jobKey, commonv1.ReplicaType(rtype))
}
exp.CreationObserved(expectKey)
return true
Expand Down Expand Up @@ -155,11 +156,11 @@ func OnDependentDeleteFunc(exp expectation.ControllerExpectationsInterface) func
jobKey := fmt.Sprintf("%s/%s", e.Object.GetNamespace(), controllerRef.Name)
var expectKey string
if _, ok := e.Object.(*corev1.Pod); ok {
expectKey = expectation.GenExpectationPodsKey(jobKey, rtype)
expectKey = expectation.GenExpectationPodsKey(jobKey, commonv1.ReplicaType(rtype))
}

if _, ok := e.Object.(*corev1.Service); ok {
expectKey = expectation.GenExpectationServicesKey(jobKey, rtype)
expectKey = expectation.GenExpectationServicesKey(jobKey, commonv1.ReplicaType(rtype))
}

exp.DeletionObserved(expectKey)
Expand Down
6 changes: 6 additions & 0 deletions pkg/common/util/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
package util

import (
"strings"

commonv1 "github.com/kubeflow/common/pkg/apis/common/v1"
corev1 "k8s.io/api/core/v1"
)
Expand Down Expand Up @@ -50,3 +52,7 @@ func GetReplicaTypes(specs map[commonv1.ReplicaType]*commonv1.ReplicaSpec) []com
}
return keys
}

// IsReplicaTypeSame reports whether a and b name the same replica type,
// ignoring letter case (for example "Worker" and "worker").
//
// The comparison uses strings.EqualFold, which compares under Unicode case
// folding without allocating lower-cased copies of both names.
func IsReplicaTypeSame(a, b commonv1.ReplicaType) bool {
	return strings.EqualFold(string(a), string(b))
}
2 changes: 1 addition & 1 deletion pkg/controller.v1/mxnet/mxjob_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -434,7 +434,7 @@ func (r *MXJobReconciler) UpdateJobStatusInApiServer(job interface{}, jobStatus
return nil
}

// SetClusterSpec forwards job, podTemplate, rtype and index unchanged to
// SetPodEnv and returns whatever error SetPodEnv reports.
func (r *MXJobReconciler) SetClusterSpec(job interface{}, podTemplate *corev1.PodTemplateSpec, rtype, index string) error {
func (r *MXJobReconciler) SetClusterSpec(job interface{}, podTemplate *corev1.PodTemplateSpec, rtype commonv1.ReplicaType, index string) error {
return SetPodEnv(job, podTemplate, rtype, index)
}

Expand Down
14 changes: 8 additions & 6 deletions pkg/controller.v1/mxnet/mxnet.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ import (
"strconv"
"strings"

"github.com/kubeflow/tf-operator/pkg/common/util"

commonv1 "github.com/kubeflow/common/pkg/apis/common/v1"
"github.com/kubeflow/common/pkg/controller.v1/common"
mxnetv1 "github.com/kubeflow/tf-operator/pkg/apis/mxnet/v1"
Expand Down Expand Up @@ -66,7 +68,7 @@ type TaskSpec struct {
Index int `json:"index"`
}

func SetPodEnv(job interface{}, podTemplate *corev1.PodTemplateSpec, rtype, index string) error {
func SetPodEnv(job interface{}, podTemplate *corev1.PodTemplateSpec, rtype commonv1.ReplicaType, index string) error {
mxJob, ok := job.(*mxnetv1.MXJob)
if !ok {
return fmt.Errorf("%v is not a type of MXJob", mxJob)
Expand Down Expand Up @@ -133,7 +135,7 @@ func SetPodEnv(job interface{}, podTemplate *corev1.PodTemplateSpec, rtype, inde
return nil
}

func genMXConfig(mxjob *mxnetv1.MXJob, rtype, index string) (MXConfig, error) {
func genMXConfig(mxjob *mxnetv1.MXJob, rtype commonv1.ReplicaType, index string) (MXConfig, error) {
// Configure the MXCONFIG environment variable.
i, err := strconv.ParseInt(index, 0, 32)
if err != nil {
Expand All @@ -154,7 +156,7 @@ func genMXConfig(mxjob *mxnetv1.MXJob, rtype, index string) (MXConfig, error) {
Cluster: cluster,
Labels: labels,
Task: TaskSpec{
Type: rtype,
Type: strings.ToLower(string(rtype)),
Index: int(i),
},
}
Expand All @@ -176,7 +178,7 @@ func genClusterSpec(mxjob *mxnetv1.MXJob) (ClusterSpec, error) {
}
for i := int32(0); i < *spec.Replicas; i++ {
host := UrlPort{
Url: common.GenGeneralName(mxjob.Name, rt, fmt.Sprintf("%d", i)),
Url: common.GenGeneralName(mxjob.Name, rtype, fmt.Sprintf("%d", i)),
Port: int(port),
}
replicaNames = append(replicaNames, host)
Expand Down Expand Up @@ -237,8 +239,8 @@ func getPortFromMXJob(mxJob *mxnetv1.MXJob, rtype commonv1.ReplicaType) (int32,
return -1, errPortNotFound
}

func addBytePSEnv(c *corev1.Container, rtype, index string) {
if rtype == strings.ToLower(string(mxnetv1.MXReplicaTypeWorker)) {
func addBytePSEnv(c *corev1.Container, rtype commonv1.ReplicaType, index string) {
if util.IsReplicaTypeSame(rtype, mxnetv1.MXReplicaTypeWorker) {
c.Env = append(c.Env, corev1.EnvVar{
Name: "DMLC_WORKER_ID",
Value: index,
Expand Down
5 changes: 3 additions & 2 deletions pkg/controller.v1/pytorch/pytorch.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,11 @@ import (

commonv1 "github.com/kubeflow/common/pkg/apis/common/v1"
pytorchv1 "github.com/kubeflow/tf-operator/pkg/apis/pytorch/v1"
commonutil "github.com/kubeflow/tf-operator/pkg/common/util"
corev1 "k8s.io/api/core/v1"
)

func SetPodEnv(obj interface{}, podTemplateSpec *corev1.PodTemplateSpec, rtype, index string) error {
func SetPodEnv(obj interface{}, podTemplateSpec *corev1.PodTemplateSpec, rtype commonv1.ReplicaType, index string) error {
pytorchjob, ok := obj.(*pytorchv1.PyTorchJob)
if !ok {
return fmt.Errorf("%+v is not a type of PyTorchJob", obj)
Expand All @@ -43,7 +44,7 @@ func SetPodEnv(obj interface{}, podTemplateSpec *corev1.PodTemplateSpec, rtype,
}

masterAddr := genGeneralName(pytorchjob.Name, strings.ToLower(string(pytorchv1.PyTorchReplicaTypeMaster)), strconv.Itoa(0))
if rtype == strings.ToLower(string(pytorchv1.PyTorchReplicaTypeMaster)) {
if commonutil.IsReplicaTypeSame(rtype, pytorchv1.PyTorchReplicaTypeMaster) {
if rank != 0 {
return fmt.Errorf("invalid config: There should be only a single master with index=0")
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/controller.v1/pytorch/pytorchjob_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -424,7 +424,7 @@ func (r *PyTorchJobReconciler) UpdateJobStatusInApiServer(job interface{}, jobSt
}

// SetClusterSpec sets the cluster spec for the pod by forwarding job,
// podTemplate, rtype and index unchanged to SetPodEnv and returning its error.
func (r *PyTorchJobReconciler) SetClusterSpec(job interface{}, podTemplate *corev1.PodTemplateSpec, rtype, index string) error {
func (r *PyTorchJobReconciler) SetClusterSpec(job interface{}, podTemplate *corev1.PodTemplateSpec, rtype commonv1.ReplicaType, index string) error {
return SetPodEnv(job, podTemplate, rtype, index)
}

Expand Down
12 changes: 7 additions & 5 deletions pkg/controller.v1/tensorflow/tensorflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ import (
"strconv"
"strings"

commonv1 "github.com/kubeflow/common/pkg/apis/common/v1"

"github.com/kubeflow/common/pkg/controller.v1/common"
tfv1 "github.com/kubeflow/tf-operator/pkg/apis/tensorflow/v1"
)
Expand Down Expand Up @@ -94,7 +96,7 @@ func convertClusterSpecToSparseClusterSpec(clusterSpec ClusterSpec, rtype string
// },
// }
// }
func genTFConfigJSONStr(tfjob *tfv1.TFJob, rtype, index string) (string, error) {
func genTFConfigJSONStr(tfjob *tfv1.TFJob, rtype commonv1.ReplicaType, index string) (string, error) {
// Configure the TFCONFIG environment variable.
i, err := strconv.ParseInt(index, 0, 32)
if err != nil {
Expand All @@ -108,11 +110,11 @@ func genTFConfigJSONStr(tfjob *tfv1.TFJob, rtype, index string) (string, error)

var tfConfigJSONByteSlice []byte
if tfjob.Spec.EnableDynamicWorker {
sparseCluster := convertClusterSpecToSparseClusterSpec(cluster, strings.ToLower(rtype), int32(i))
sparseCluster := convertClusterSpecToSparseClusterSpec(cluster, strings.ToLower(string(rtype)), int32(i))
sparseTFConfig := SparseTFConfig{
Cluster: sparseCluster,
Task: TaskSpec{
Type: strings.ToLower(rtype),
Type: strings.ToLower(string(rtype)),
Index: int(i),
},
}
Expand All @@ -121,7 +123,7 @@ func genTFConfigJSONStr(tfjob *tfv1.TFJob, rtype, index string) (string, error)
tfConfig := TFConfig{
Cluster: cluster,
Task: TaskSpec{
Type: strings.ToLower(rtype),
Type: strings.ToLower(string(rtype)),
Index: int(i),
},
// We need to set environment to cloud otherwise it will default to local which isn't what we want.
Expand Down Expand Up @@ -155,7 +157,7 @@ func genClusterSpec(tfjob *tfv1.TFJob) (ClusterSpec, error) {
// Headless service assigned a DNS A record for a name of the form "my-svc.my-namespace.svc.cluster.local".
// And the last part "svc.cluster.local" is called cluster domain
// which maybe different between kubernetes clusters.
hostName := common.GenGeneralName(tfjob.Name, rt, fmt.Sprintf("%d", i))
hostName := common.GenGeneralName(tfjob.Name, rtype, fmt.Sprintf("%d", i))
svcName := hostName + "." + tfjob.Namespace + "." + "svc"
clusterDomain := os.Getenv(EnvCustomClusterDomain)
if len(clusterDomain) > 0 {
Expand Down
16 changes: 8 additions & 8 deletions pkg/controller.v1/tensorflow/tfjob_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -560,7 +560,7 @@ func (r *TFJobReconciler) UpdateJobStatusInApiServer(job interface{}, jobStatus
}

// Same as Func (tc *TFController) SetClusterSpec(...) in pod.go
func (r *TFJobReconciler) SetClusterSpec(job interface{}, podTemplate *corev1.PodTemplateSpec, rtype, index string) error {
func (r *TFJobReconciler) SetClusterSpec(job interface{}, podTemplate *corev1.PodTemplateSpec, rtype commonv1.ReplicaType, index string) error {
tfjob, ok := job.(*tensorflowv1.TFJob)
if !ok {
return fmt.Errorf("%v is not a type of TFJob", tfjob)
Expand Down Expand Up @@ -638,7 +638,7 @@ func (r *TFJobReconciler) IsWorker0Completed(tfjob *tensorflowv1.TFJob, replicas
// getPodSlices returns a slice, which element is the slice of pod.
// It gives enough information to caller to make decision to up/down scale resources.
func (r *TFJobReconciler) getPodSlices(tfjob *tensorflowv1.TFJob, replicasNum *int32) ([][]*v1.Pod, error) {
logger := commonutil.LoggerForReplica(tfjob, strings.ToLower(string(tensorflowv1.TFReplicaTypeWorker)))
logger := commonutil.LoggerForReplica(tfjob, tensorflowv1.TFReplicaTypeWorker)

pods, err := r.GetPodsForJob(tfjob)
if err != nil {
Expand All @@ -647,7 +647,7 @@ func (r *TFJobReconciler) getPodSlices(tfjob *tensorflowv1.TFJob, replicasNum *i
}

// Get all pods for the type rt.
pods, err = r.JobController.FilterPodsForReplicaType(pods, strings.ToLower(string(tensorflowv1.TFReplicaTypeWorker)))
pods, err = r.JobController.FilterPodsForReplicaType(pods, tensorflowv1.TFReplicaTypeWorker)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -678,7 +678,7 @@ func (r *TFJobReconciler) ReconcilePods(
rt := strings.ToLower(string(rtype))
logger := commonutil.LoggerForJob(tfJob)
// Get all pods for the type rt.
pods, err := r.FilterPodsForReplicaType(pods, rt)
pods, err := r.FilterPodsForReplicaType(pods, rtype)
if err != nil {
return err
}
Expand Down Expand Up @@ -768,12 +768,12 @@ func (r *TFJobReconciler) createNewPod(tfjob *tfv1.TFJob, rt, index string, spec
utilruntime.HandleError(fmt.Errorf("couldn't get key for tfjob object %#v: %v", tfjob, err))
return err
}
expectationPodsKey := expectation.GenExpectationPodsKey(tfjobKey, rt)
expectationPodsKey := expectation.GenExpectationPodsKey(tfjobKey, commonv1.ReplicaType(rt))
err = r.Expectations.ExpectCreations(expectationPodsKey, 1)
if err != nil {
return err
}
logger := commonutil.LoggerForReplica(tfjob, rt)
logger := commonutil.LoggerForReplica(tfjob, commonv1.ReplicaType(rt))
// Create OwnerReference.
controllerRef := r.GenOwnerReference(tfjob)

Expand All @@ -789,7 +789,7 @@ func (r *TFJobReconciler) createNewPod(tfjob *tfv1.TFJob, rt, index string, spec
podTemplate := spec.Template.DeepCopy()

// Set name for the template.
podTemplate.Name = common.GenGeneralName(tfjob.Name, rt, index)
podTemplate.Name = common.GenGeneralName(tfjob.Name, commonv1.ReplicaType(rt), index)

if podTemplate.Labels == nil {
podTemplate.Labels = make(map[string]string)
Expand All @@ -799,7 +799,7 @@ func (r *TFJobReconciler) createNewPod(tfjob *tfv1.TFJob, rt, index string, spec
podTemplate.Labels[key] = value
}

if err := r.SetClusterSpec(tfjob, podTemplate, rt, index); err != nil {
if err := r.SetClusterSpec(tfjob, podTemplate, commonv1.ReplicaType(rt), index); err != nil {
return err
}

Expand Down
8 changes: 5 additions & 3 deletions pkg/controller.v1/xgboost/xgboost.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ import (
"strconv"
"strings"

"github.com/kubeflow/tf-operator/pkg/common/util"

commonv1 "github.com/kubeflow/common/pkg/apis/common/v1"
xgboostv1 "github.com/kubeflow/tf-operator/pkg/apis/xgboost/v1"
corev1 "k8s.io/api/core/v1"
Expand All @@ -28,7 +30,7 @@ import (
// SetPodEnv sets the pod env set for:
// - XGBoost Rabit Tracker and worker
// - LightGBM master and workers
func SetPodEnv(job interface{}, podTemplate *corev1.PodTemplateSpec, rtype, index string) error {
func SetPodEnv(job interface{}, podTemplate *corev1.PodTemplateSpec, rtype commonv1.ReplicaType, index string) error {
xgboostjob, ok := job.(*xgboostv1.XGBoostJob)
if !ok {
return fmt.Errorf("%+v is not a type of XGBoostJob", xgboostjob)
Expand All @@ -40,8 +42,8 @@ func SetPodEnv(job interface{}, podTemplate *corev1.PodTemplateSpec, rtype, inde
}

// Add master offset for worker pods
if strings.ToLower(rtype) == strings.ToLower(string(xgboostv1.XGBoostReplicaTypeWorker)) {
masterSpec := xgboostjob.Spec.XGBReplicaSpecs[commonv1.ReplicaType(xgboostv1.XGBoostReplicaTypeMaster)]
// The offset applies to worker pods only: a worker's rank is shifted by the
// number of master replicas. Comparing against the Master type here would
// shift master ranks and leave workers unshifted.
if util.IsReplicaTypeSame(rtype, xgboostv1.XGBoostReplicaTypeWorker) {
	masterSpec := xgboostjob.Spec.XGBReplicaSpecs[xgboostv1.XGBoostReplicaTypeMaster]
	masterReplicas := int(*masterSpec.Replicas)
	rank += masterReplicas
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/controller.v1/xgboost/xgboostjob_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -431,7 +431,7 @@ func (r *XGBoostJobReconciler) UpdateJobStatusInApiServer(job interface{}, jobSt
}

// SetClusterSpec sets the cluster spec for the pod by forwarding job,
// podTemplate, rtype and index unchanged to SetPodEnv and returning its error.
func (r *XGBoostJobReconciler) SetClusterSpec(job interface{}, podTemplate *corev1.PodTemplateSpec, rtype, index string) error {
func (r *XGBoostJobReconciler) SetClusterSpec(job interface{}, podTemplate *corev1.PodTemplateSpec, rtype commonv1.ReplicaType, index string) error {
return SetPodEnv(job, podTemplate, rtype, index)
}

Expand Down

0 comments on commit ca2e0ab

Please sign in to comment.