Skip to content

Commit

Permalink
Small refactor of cmd dir and fix some bugs for v1alpha2 (#109)
Browse files Browse the repository at this point in the history
* sample refactor code in cmd

* fix panic of v1aplha2

* add new crd for v1alpha2

* add new example for v1alpha2
  • Loading branch information
fisherxu authored and k8s-ci-robot committed May 28, 2019
1 parent a48fd69 commit 4a52f2a
Show file tree
Hide file tree
Showing 11 changed files with 434 additions and 29 deletions.
File renamed without changes.
62 changes: 62 additions & 0 deletions cmd/mpi-operator.v1alpha2/app/options/options.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
// Copyright 2019 The Kubeflow Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package options

import (
"flag"

"k8s.io/api/core/v1"
)

// ServerOption is the main context object for the controller manager.
type ServerOption struct {
Kubeconfig string
MasterURL string
KubectlDeliveryImage string
Threadiness int
PrintVersion bool
EnableGangScheduling bool
Namespace string
}

// NewServerOption creates a new CMServer with a default config.
func NewServerOption() *ServerOption {
s := ServerOption{}
return &s
}

// AddFlags adds flags for a specific CMServer to the specified FlagSet.
func (s *ServerOption) AddFlags(fs *flag.FlagSet) {
fs.StringVar(&s.MasterURL, "master", "",
`The url of the Kubernetes API server,
will overrides any value in kubeconfig, only required if out-of-cluster.`)

fs.StringVar(&s.Kubeconfig, "kubeConfig", "",
"Path to a kubeConfig. Only required if out-of-cluster.")

fs.StringVar(&s.KubectlDeliveryImage, "kubectl-delivery-image", "",
"The container image used to deliver the kubectl binary.")

fs.StringVar(&s.Namespace, "namespace", v1.NamespaceAll,
`The namespace to monitor tfjobs. If unset, it monitors all namespaces cluster-wide.
If set, it only monitors tfjobs in the given namespace.`)

fs.IntVar(&s.Threadiness, "threadiness", 2,
`How many threads to process the main logic`)

fs.BoolVar(&s.PrintVersion, "version", false, "Show version and quit")

fs.BoolVar(&s.EnableGangScheduling, "enable-gang-scheduling", false, "Set true to enable gang scheduling by kube-batch.")
}
115 changes: 115 additions & 0 deletions cmd/mpi-operator.v1alpha2/app/server.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
// Copyright 2019 The Kubeflow Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package app

import (
"os"

"github.com/golang/glog"
controllersv1alpha2 "github.com/kubeflow/mpi-operator/pkg/controllers/v1alpha2"
corev1 "k8s.io/api/core/v1"
kubeinformers "k8s.io/client-go/informers"
policyinformers "k8s.io/client-go/informers/policy/v1beta1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/sample-controller/pkg/signals"

"github.com/kubeflow/mpi-operator/cmd/mpi-operator.v1alpha2/app/options"
clientset "github.com/kubeflow/mpi-operator/pkg/client/clientset/versioned"
informers "github.com/kubeflow/mpi-operator/pkg/client/informers/externalversions"
"github.com/kubeflow/mpi-operator/pkg/version"
)

const (
apiVersion = "v1alpha2"
RecommendedKubeConfigPathEnv = "KUBECONFIG"
)

func Run(opt *options.ServerOption) error {
// Check if the -version flag was passed and, if so, print the version and exit.
if opt.PrintVersion {
version.PrintVersionAndExit(apiVersion)
}

if opt.Namespace == corev1.NamespaceAll {
glog.Info("Using cluster scoped operator")
} else {
glog.Infof("Scoping operator to namespace %s", opt.Namespace)
}

// To help debugging, immediately log version.
glog.Infof("%+v", version.Info(apiVersion))

// set up signals so we handle the first shutdown signal gracefully
stopCh := signals.SetupSignalHandler()

// Note: ENV KUBECONFIG will overwrite user defined Kubeconfig option.
if len(os.Getenv(RecommendedKubeConfigPathEnv)) > 0 {
// use the current context in kubeconfig
// This is very useful for running locally.
opt.Kubeconfig = os.Getenv(RecommendedKubeConfigPathEnv)
}

cfg, err := clientcmd.BuildConfigFromFlags(opt.MasterURL, opt.Kubeconfig)
if err != nil {
glog.Fatalf("Error building kubeConfig: %s", err.Error())
}

kubeClient, err := kubernetes.NewForConfig(cfg)
if err != nil {
glog.Fatalf("Error building kubernetes clientset: %s", err.Error())
}

kubeflowClient, err := clientset.NewForConfig(cfg)
if err != nil {
glog.Fatalf("Error building kubeflow clientset: %s", err.Error())
}

var kubeInformerFactory kubeinformers.SharedInformerFactory
var kubeflowInformerFactory informers.SharedInformerFactory
if opt.Namespace == "" {
kubeInformerFactory = kubeinformers.NewSharedInformerFactory(kubeClient, 0)
kubeflowInformerFactory = informers.NewSharedInformerFactory(kubeflowClient, 0)
} else {
kubeInformerFactory = kubeinformers.NewSharedInformerFactoryWithOptions(kubeClient, 0, kubeinformers.WithNamespace(opt.Namespace), nil)
kubeflowInformerFactory = informers.NewSharedInformerFactoryWithOptions(kubeflowClient, 0, informers.WithNamespace(opt.Namespace), nil)
}

var pdbInformer policyinformers.PodDisruptionBudgetInformer
if opt.EnableGangScheduling {
pdbInformer = kubeInformerFactory.Policy().V1beta1().PodDisruptionBudgets()
}
controller := controllersv1alpha2.NewMPIJobController(
kubeClient,
kubeflowClient,
kubeInformerFactory.Core().V1().ConfigMaps(),
kubeInformerFactory.Core().V1().ServiceAccounts(),
kubeInformerFactory.Rbac().V1().Roles(),
kubeInformerFactory.Rbac().V1().RoleBindings(),
kubeInformerFactory.Apps().V1().StatefulSets(),
kubeInformerFactory.Batch().V1().Jobs(),
pdbInformer,
kubeflowInformerFactory.Kubeflow().V1alpha2().MPIJobs(),
opt.KubectlDeliveryImage,
opt.EnableGangScheduling)

go kubeInformerFactory.Start(stopCh)
go kubeflowInformerFactory.Start(stopCh)

if err = controller.Run(opt.Threadiness, stopCh); err != nil {
glog.Fatalf("Error running controller: %s", err.Error())
}
return nil
}
35 changes: 35 additions & 0 deletions cmd/mpi-operator.v1alpha2/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
// Copyright 2018 The Kubeflow Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package main

import (
"flag"

"github.com/golang/glog"

"github.com/kubeflow/mpi-operator/cmd/mpi-operator.v1alpha2/app"
"github.com/kubeflow/mpi-operator/cmd/mpi-operator.v1alpha2/app/options"
)

func main() {
s := options.NewServerOption()
s.AddFlags(flag.CommandLine)

flag.Parse()

if err := app.Run(s); err != nil {
glog.Fatalf("%v\n", err)
}
}
File renamed without changes.
36 changes: 36 additions & 0 deletions deploy/crd/crd-v1alpha2.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
apiVersion: apiextensions.k8s.io/v1beta1
kind: CustomResourceDefinition
metadata:
name: mpijobs.kubeflow.org
spec:
group: kubeflow.org
version: v1alpha2
scope: Namespaced
names:
plural: mpijobs
singular: mpijob
kind: MPIJob
shortNames:
- mj
- mpij
validation:
openAPIV3Schema:
properties:
spec:
properties:
slotsPerWorker:
type: integer
minimum: 1
mpiReplicaSpecs:
properties:
Launcher:
properties:
replicas:
type: integer
minimum: 1
maximum: 1
Worker:
properties:
replicas:
type: integer
minimum: 1
26 changes: 26 additions & 0 deletions examples/v1alpha2/tensorflow-benchmarks-custom.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
# This file shows how to run multi-node training benchmarks using an MPIJob,
# specifying GPUs explicitly per worker.
apiVersion: kubeflow.org/v1alpha2
kind: MPIJob
metadata:
name: tensorflow-benchmarks-16-custom
spec:
slotsPerWorker: 4
mpiReplicaSpecs:
Launcher:
replicas: 1
template:
spec:
containers:
- image: mpioperator/tensorflow-benchmarks:latest
name: tensorflow-benchmarks
Worker:
replicas: 4
template:
spec:
containers:
- image: mpioperator/tensorflow-benchmarks:latest
name: tensorflow-benchmarks
resources:
limits:
nvidia.com/gpu: 4
55 changes: 55 additions & 0 deletions examples/v1alpha2/tensorflow-benchmarks-imagenet.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
# This file shows how to run multi-node training benchmarks using an MPIJob,
# using real ImageNet data stored in Amazon EFS.
#
# It assume the ImageNet tfrecord files are stored in an EFS directory under
# `/imagenet/{train,validation}`.
apiVersion: kubeflow.org/v1alpha2
kind: MPIJob
metadata:
name: tensorflow-benchmarks-imagenet
spec:
mpiReplicaSpecs:
Launcher:
replicas: 1
template:
spec:
containers:
- image: mpioperator/tensorflow-benchmarks:latest
name: tensorflow-benchmarks
command:
- mpirun
- python
- scripts/tf_cnn_benchmarks/tf_cnn_benchmarks.py
- --data_format=NCHW
- --batch_size=256
- --model=resnet50
- --optimizer=momentum
- --variable_update=horovod
- --nodistortions
- --gradient_repacking=8
- --num_epochs=90
- --weight_decay=1e-4
- --data_dir=/efs/imagenet/train
- --use_fp16
- --train_dir=/models/resnet50
Worker:
replicas: 2
template:
spec:
containers:
- image: mpioperator/tensorflow-benchmarks:latest
name: tensorflow-benchmarks
volumeMounts:
- mountPath: /efs
name: efs
- mountPath: /models
name: models
volumes:
- name: efs
nfs:
server: fs-ab134502.efs.us-west-2.amazonaws.com
path: /
readOnly: true
- name: models
emptyDir: {}

22 changes: 22 additions & 0 deletions examples/v1alpha2/tensorflow-benchmarks.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
apiVersion: kubeflow.org/v1alpha2
kind: MPIJob
metadata:
name: tensorflow-benchmarks-16
spec:
slotsPerWorker: 1
mpiReplicaSpecs:
Launcher:
replicas: 1
template:
spec:
containers:
- image: mpioperator/tensorflow-benchmarks:latest
name: tensorflow-benchmarks
Worker:
replicas: 2
template:
spec:
containers:
- image: mpioperator/tensorflow-benchmarks:latest
name: tensorflow-benchmarks

Loading

0 comments on commit 4a52f2a

Please sign in to comment.