-
Notifications
You must be signed in to change notification settings - Fork 77
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add putfiles
command and add parse
functions on controller.
#496
Changes from 3 commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -26,6 +26,7 @@ package controller | |
|
||
import ( | ||
"context" | ||
"encoding/json" | ||
|
||
log "github.com/inconshreveable/log15" | ||
|
||
|
@@ -104,10 +105,33 @@ func (c *Controller) onAdd(obj interface{}) { | |
job := obj.(*paddlejob.TrainingJob) | ||
log.Debug("TrainingJob resource added", "name", job.ObjectMeta.Name) | ||
c.autoscaler.OnAdd(job) | ||
// TODO: if we need to create training job instance by the resource, | ||
// you should add the following code: | ||
// var parser DefaultJobParser | ||
// c.clientset.ExtensionsV1beta1().ReplicaSets(namespace).Create(parser.ParseToPserver(job)) | ||
|
||
// TODO(gongwb):open it when all are ready. | ||
// All-are-ready means: | ||
// create trainjob from paddlectl | ||
// scheduler can schedule trainjobs | ||
var parser DefaultJobParser | ||
p := parser.ParseToPserver(job) | ||
t := parser.ParseToTrainer(job) | ||
m := parser.ParseToMaster(job) | ||
|
||
b, _ := json.MarshalIndent(p, "", " ") | ||
log.Debug("create pserver:" + string(b[:])) | ||
|
||
b, _ = json.MarshalIndent(t, "", " ") | ||
log.Debug("create trainer-job:" + string(b[:])) | ||
|
||
b, _ = json.MarshalIndent(m, "", " ") | ||
log.Debug("create master:" + string(b[:])) | ||
|
||
// TODO(gongwb): create them | ||
// just like: | ||
// namespace := job.ObjectMeta.Namespace | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Maybe put this code in your notes instead :) There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done.Thanks. |
||
// _, err := c.clientset.ExtensionsV1beta1().ReplicaSets(namespace).Create(p) | ||
// if err != nil { | ||
// b, _ := json.MarshalIndent(p, "", " ") | ||
// log.Debug("create pserver:%s\terror: %v", string(b[:]), err) | ||
// } | ||
} | ||
|
||
func (c *Controller) onUpdate(oldObj, newObj interface{}) { | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -20,6 +20,7 @@ import ( | |
"strconv" | ||
|
||
paddlejob "github.com/PaddlePaddle/cloud/go/api" | ||
apiresource "k8s.io/apimachinery/pkg/api/resource" | ||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" | ||
"k8s.io/client-go/pkg/api/v1" | ||
batchv1 "k8s.io/client-go/pkg/apis/batch/v1" | ||
|
@@ -93,7 +94,7 @@ func (p *DefaultJobParser) ParseToPserver(job *paddlejob.TrainingJob) *v1beta1.R | |
Volumes: podVolumes(job), | ||
Containers: []v1.Container{ | ||
v1.Container{ | ||
Name: job.ObjectMeta.Name, | ||
Name: "pserver", | ||
Image: job.Spec.Image, | ||
Ports: podPorts(job), | ||
Env: podEnv(job), | ||
|
@@ -109,14 +110,124 @@ func (p *DefaultJobParser) ParseToPserver(job *paddlejob.TrainingJob) *v1beta1.R | |
|
||
// ParseToTrainer parse TrainingJob to a kubernetes job resource. | ||
func (p *DefaultJobParser) ParseToTrainer(job *paddlejob.TrainingJob) *batchv1.Job { | ||
// TODO: create job. | ||
return &batchv1.Job{} | ||
replicas := int32(job.Spec.Trainer.MinInstance) | ||
command := make([]string, 2, 2) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is same as There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done.Thanks. |
||
if job.Spec.FaultTolerant { | ||
command = []string{"paddle_k8s", "start_trainer"} | ||
} else { | ||
command = []string{"paddle_k8s", "start_new_trainer"} | ||
} | ||
|
||
return &batchv1.Job{ | ||
TypeMeta: metav1.TypeMeta{ | ||
Kind: "Job", | ||
APIVersion: "batch/v1", | ||
}, | ||
ObjectMeta: metav1.ObjectMeta{ | ||
Name: job.ObjectMeta.Name + "-trainer", | ||
Namespace: job.ObjectMeta.Namespace, | ||
}, | ||
Spec: batchv1.JobSpec{ | ||
Parallelism: &replicas, | ||
Template: v1.PodTemplateSpec{ | ||
ObjectMeta: metav1.ObjectMeta{ | ||
Labels: map[string]string{"paddle-job": job.ObjectMeta.Name}, | ||
}, | ||
Spec: v1.PodSpec{ | ||
Volumes: podVolumes(job), | ||
Containers: []v1.Container{ | ||
v1.Container{ | ||
Name: "trainer", | ||
Image: job.Spec.Image, | ||
ImagePullPolicy: "Always", | ||
Command: command, | ||
VolumeMounts: podVolumeMounts(job), | ||
Ports: podPorts(job), | ||
Env: podEnv(job), | ||
Resources: job.Spec.Trainer.Resources, | ||
}, | ||
}, | ||
RestartPolicy: "Never", | ||
}, | ||
}, | ||
}, | ||
} | ||
} | ||
|
||
func masterResource(job *paddlejob.TrainingJob) *v1.ResourceRequirements { | ||
// TODO(gongwb): config master resource? | ||
return &v1.ResourceRequirements{ | ||
Limits: v1.ResourceList{ | ||
"cpu": *apiresource.NewQuantity(int64(1), apiresource.DecimalSI), | ||
"memory": apiresource.MustParse("500Mi"), | ||
}, | ||
Requests: v1.ResourceList{ | ||
"cpu": *apiresource.NewQuantity(int64(2), apiresource.DecimalSI), | ||
"memory": apiresource.MustParse("1Gi"), | ||
}, | ||
} | ||
} | ||
|
||
func getEtcdPodSpec(job *paddlejob.TrainingJob) *v1.Container { | ||
command := []string{"etcd", "-name", "etcd0", | ||
"-advertise-client-urls", "http://$(POD_IP):2379,http://$(POD_IP):4001", | ||
"-listen-client-urls", "http://0.0.0.0:2379,http://0.0.0.0:4001", | ||
"-initial-advertise-peer-urls", "http://$(POD_IP):2380", | ||
"-listen-peer-urls", "http://0.0.0.0:2380", | ||
"-initial-cluster", "etcd0=http://$(POD_IP):2380", | ||
"-initial-cluster-state", "new"} | ||
|
||
return &v1.Container{ | ||
Name: "etcd", | ||
Image: "quay.io/coreos/etcd:v3.2.1", | ||
ImagePullPolicy: "Always", | ||
// TODO(gongwb): etcd ports? | ||
Env: podEnv(job), | ||
Command: command, | ||
} | ||
} | ||
|
||
// ParseToMaster parse TrainingJob to a kubernetes replicaset resource. | ||
func (p *DefaultJobParser) ParseToMaster(job *paddlejob.TrainingJob) *v1beta1.ReplicaSet { | ||
// TODO: create master if needed. | ||
return &v1beta1.ReplicaSet{} | ||
replicas := int32(1) | ||
// FIXME: refine these part. | ||
command := []string{"paddle_k8s", "start_master"} | ||
|
||
return &v1beta1.ReplicaSet{ | ||
TypeMeta: metav1.TypeMeta{ | ||
Kind: "extensions/v1beta1", | ||
APIVersion: "ReplicaSet", | ||
}, | ||
ObjectMeta: metav1.ObjectMeta{ | ||
Name: job.ObjectMeta.Name + "-master", | ||
Namespace: job.ObjectMeta.Namespace, | ||
}, | ||
Spec: v1beta1.ReplicaSetSpec{ | ||
Replicas: &replicas, | ||
Template: v1.PodTemplateSpec{ | ||
ObjectMeta: metav1.ObjectMeta{ | ||
Labels: map[string]string{"paddle-job-master": job.ObjectMeta.Name}, | ||
}, | ||
Spec: v1.PodSpec{ | ||
// TODO: setup pserver volumes on cloud. | ||
Volumes: podVolumes(job), | ||
Containers: []v1.Container{ | ||
v1.Container{ | ||
Name: "master", | ||
Image: job.Spec.Image, | ||
ImagePullPolicy: "Always", | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done. |
||
Ports: masterPorts(job), | ||
// TODO(gongwb):master env | ||
Command: command, | ||
VolumeMounts: podVolumeMounts(job), | ||
Resources: *masterResource(job), | ||
}, | ||
*getEtcdPodSpec(job), | ||
}, | ||
}, | ||
}, | ||
}, | ||
} | ||
} | ||
|
||
// ----------------------------------------------------------------------- | ||
|
@@ -133,7 +244,21 @@ func podPorts(job *paddlejob.TrainingJob) []v1.ContainerPort { | |
}) | ||
basePort++ | ||
} | ||
return []v1.ContainerPort{} | ||
return ports | ||
} | ||
|
||
func masterPorts(job *paddlejob.TrainingJob) []v1.ContainerPort { | ||
ports := []v1.ContainerPort{ | ||
v1.ContainerPort{ | ||
Name: "master-port", | ||
ContainerPort: 8080, | ||
}, | ||
v1.ContainerPort{ | ||
Name: "etcd-port", | ||
ContainerPort: 2379, | ||
}, | ||
} | ||
return ports | ||
} | ||
|
||
func podEnv(job *paddlejob.TrainingJob) []v1.EnvVar { | ||
|
@@ -150,6 +275,7 @@ func podEnv(job *paddlejob.TrainingJob) []v1.EnvVar { | |
// FIXME: CPU resource value can be less than 1. | ||
trainerCount = int(q.Value()) | ||
} | ||
|
||
return []v1.EnvVar{ | ||
v1.EnvVar{Name: "PADDLE_JOB_NAME", Value: job.ObjectMeta.Name}, | ||
// NOTICE: TRAINERS, PSERVERS, PADDLE_INIT_NUM_GRADIENT_SERVERS | ||
|
@@ -171,23 +297,42 @@ func podEnv(job *paddlejob.TrainingJob) []v1.EnvVar { | |
v1.EnvVar{Name: "PADDLE_INIT_NUM_GRADIENT_SERVERS", Value: strconv.Itoa(job.Spec.Trainer.MinInstance)}, | ||
v1.EnvVar{Name: "PADDLE_INIT_NUM_PASSES", Value: strconv.Itoa(job.Spec.Passes)}, | ||
v1.EnvVar{Name: "PADDLE_INIT_USE_GPU", Value: needGPU}, | ||
|
||
// FIXME(gongwb): LD_LIBRARY_PATH? | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done. |
||
v1.EnvVar{Name: "LD_LIBRARY_PATH", Value: job.Spec.Trainer.Entrypoint}, | ||
v1.EnvVar{Name: "NAMESPACE", ValueFrom: &v1.EnvVarSource{ | ||
FieldRef: &v1.ObjectFieldSelector{ | ||
FieldPath: "metadata.namespace", | ||
}, | ||
}}, | ||
v1.EnvVar{Name: "POD_IP", ValueFrom: &v1.EnvVarSource{ | ||
FieldRef: &v1.ObjectFieldSelector{ | ||
FieldPath: "status.podIP", | ||
}, | ||
}}, | ||
} | ||
} | ||
|
||
func podVolumes(job *paddlejob.TrainingJob) []v1.Volume { | ||
// TODO: prepare volumes. | ||
return []v1.Volume{} | ||
return []v1.Volume{ | ||
v1.Volume{ | ||
Name: job.ObjectMeta.Name + "-workspace", | ||
VolumeSource: v1.VolumeSource{ | ||
HostPath: &v1.HostPathVolumeSource{ | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 可以加个TODO,需要增加cephfs There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Volume需要区分使用的文件系统:ceph, hostpath,还需要同时挂载用户目录和public目录 There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done. |
||
Path: job.Spec.Trainer.Workspace, | ||
}, | ||
}, | ||
}, | ||
} | ||
} | ||
|
||
func podVolumeMounts(job *paddlejob.TrainingJob) []v1.VolumeMount { | ||
// TODO: preapare volume mounts for pods. | ||
return []v1.VolumeMount{} | ||
return []v1.VolumeMount{ | ||
v1.VolumeMount{ | ||
Name: job.ObjectMeta.Name + "-workspace", | ||
MountPath: job.Spec.Trainer.Workspace, | ||
}, | ||
} | ||
} | ||
|
||
// ----------------------------------------------------------------------- | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -6,6 +6,7 @@ import ( | |
"flag" | ||
"fmt" | ||
"os" | ||
"path" | ||
"strings" | ||
|
||
"k8s.io/client-go/pkg/api/v1" | ||
|
@@ -73,9 +74,8 @@ func (*SubmitCmd) Usage() string { | |
|
||
func (p *SubmitCmd) getTrainer() *paddlejob.TrainerSpec { | ||
return &paddlejob.TrainerSpec{ | ||
Entrypoint: p.Entry, | ||
// FIXME(gongwb): workspace | ||
|
||
Entrypoint: p.Entry, | ||
Workspace: getJobPfsPath(p.Jobpackage, p.Jobname), | ||
MinInstance: p.MinInstance, | ||
MaxInstance: p.MaxInstance, | ||
Resources: v1.ResourceRequirements{ | ||
|
@@ -115,7 +115,6 @@ func (p *SubmitCmd) getMaster() *paddlejob.MasterSpec { | |
|
||
// GetTrainingJob get's paddlejob.TrainingJob struct filed by Submitcmd paramters. | ||
func (p *SubmitCmd) GetTrainingJob() *paddlejob.TrainingJob { | ||
|
||
t := paddlejob.TrainingJob{ | ||
metav1.TypeMeta{ | ||
Kind: "TrainingJob", | ||
|
@@ -201,14 +200,30 @@ func NewSubmitter(cmd *SubmitCmd) *Submitter { | |
return &s | ||
} | ||
|
||
func getJobPfsPath(jobPackage, jobName string) string { | ||
_, err := os.Stat(jobPackage) | ||
if os.IsNotExist(err) { | ||
return jobPackage | ||
} | ||
|
||
return path.Join("/pfs", Config.ActiveConfig.Name, "home", Config.ActiveConfig.Username, "jobs", jobName) | ||
} | ||
|
||
// putFiles puts files to pfs and | ||
// if jobPackage is not a local dir, skip uploading package. | ||
func putFiles(jobPackage, jobName string) error { | ||
func putFilesToPfs(jobPackage, jobName string) error { | ||
_, pkgerr := os.Stat(jobPackage) | ||
if pkgerr == nil { | ||
// FIXME: upload job package to paddle cloud. | ||
dest := getJobPfsPath(jobPackage, jobName) | ||
if !strings.HasSuffix(jobPackage, "/") { | ||
jobPackage = jobPackage + "/" | ||
} | ||
err := putFiles(jobPackage, dest) | ||
if err != nil { | ||
return err | ||
} | ||
} else if os.IsNotExist(pkgerr) { | ||
return fmt.Errorf("stat jobpackage '%s' error: %v", jobPackage, pkgerr) | ||
glog.Warning("jobpackage not a local dir, skip upload.") | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Not a merge blocker, but maybe we can switch to There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. OK. Maybe we need an individual PR to fix this. |
||
} | ||
|
||
return nil | ||
|
@@ -229,7 +244,7 @@ func (s *Submitter) Submit(jobPackage string, jobName string) error { | |
return err | ||
} | ||
|
||
if err := putFiles(jobPackage, jobName); err != nil { | ||
if err := putFilesToPfs(jobPackage, jobName); err != nil { | ||
return err | ||
} | ||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
b
instead ofb[:]
is more idiomatic.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done.Thanks.