-
Notifications
You must be signed in to change notification settings - Fork 77
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add putfiles
command and add parse
functions on controller.
#496
Changes from 5 commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -20,6 +20,7 @@ import ( | |
"strconv" | ||
|
||
paddlejob "github.com/PaddlePaddle/cloud/go/api" | ||
apiresource "k8s.io/apimachinery/pkg/api/resource" | ||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" | ||
"k8s.io/client-go/pkg/api/v1" | ||
batchv1 "k8s.io/client-go/pkg/apis/batch/v1" | ||
|
@@ -93,7 +94,7 @@ func (p *DefaultJobParser) ParseToPserver(job *paddlejob.TrainingJob) *v1beta1.R | |
Volumes: podVolumes(job), | ||
Containers: []v1.Container{ | ||
v1.Container{ | ||
Name: job.ObjectMeta.Name, | ||
Name: "pserver", | ||
Image: job.Spec.Image, | ||
Ports: podPorts(job), | ||
Env: podEnv(job), | ||
|
@@ -109,14 +110,124 @@ func (p *DefaultJobParser) ParseToPserver(job *paddlejob.TrainingJob) *v1beta1.R | |
|
||
// ParseToTrainer parse TrainingJob to a kubernetes job resource. | ||
func (p *DefaultJobParser) ParseToTrainer(job *paddlejob.TrainingJob) *batchv1.Job { | ||
// TODO: create job. | ||
return &batchv1.Job{} | ||
replicas := int32(job.Spec.Trainer.MinInstance) | ||
command := make([]string, 2) | ||
if job.Spec.FaultTolerant { | ||
command = []string{"paddle_k8s", "start_trainer"} | ||
} else { | ||
command = []string{"paddle_k8s", "start_new_trainer"} | ||
} | ||
|
||
return &batchv1.Job{ | ||
TypeMeta: metav1.TypeMeta{ | ||
Kind: "Job", | ||
APIVersion: "batch/v1", | ||
}, | ||
ObjectMeta: metav1.ObjectMeta{ | ||
Name: job.ObjectMeta.Name + "-trainer", | ||
Namespace: job.ObjectMeta.Namespace, | ||
}, | ||
Spec: batchv1.JobSpec{ | ||
Parallelism: &replicas, | ||
Template: v1.PodTemplateSpec{ | ||
ObjectMeta: metav1.ObjectMeta{ | ||
Labels: map[string]string{"paddle-job": job.ObjectMeta.Name}, | ||
}, | ||
Spec: v1.PodSpec{ | ||
Volumes: podVolumes(job), | ||
Containers: []v1.Container{ | ||
v1.Container{ | ||
Name: "trainer", | ||
Image: job.Spec.Image, | ||
ImagePullPolicy: "Always", | ||
Command: command, | ||
VolumeMounts: podVolumeMounts(job), | ||
Ports: podPorts(job), | ||
Env: podEnv(job), | ||
Resources: job.Spec.Trainer.Resources, | ||
}, | ||
}, | ||
RestartPolicy: "Never", | ||
}, | ||
}, | ||
}, | ||
} | ||
} | ||
|
||
func masterResource(job *paddlejob.TrainingJob) *v1.ResourceRequirements { | ||
// TODO(gongwb): config master resource? | ||
return &v1.ResourceRequirements{ | ||
Limits: v1.ResourceList{ | ||
"cpu": *apiresource.NewQuantity(int64(1), apiresource.DecimalSI), | ||
"memory": apiresource.MustParse("500Mi"), | ||
}, | ||
Requests: v1.ResourceList{ | ||
"cpu": *apiresource.NewQuantity(int64(2), apiresource.DecimalSI), | ||
"memory": apiresource.MustParse("1Gi"), | ||
}, | ||
} | ||
} | ||
|
||
func getEtcdPodSpec(job *paddlejob.TrainingJob) *v1.Container { | ||
command := []string{"etcd", "-name", "etcd0", | ||
"-advertise-client-urls", "http://$(POD_IP):2379,http://$(POD_IP):4001", | ||
"-listen-client-urls", "http://0.0.0.0:2379,http://0.0.0.0:4001", | ||
"-initial-advertise-peer-urls", "http://$(POD_IP):2380", | ||
"-listen-peer-urls", "http://0.0.0.0:2380", | ||
"-initial-cluster", "etcd0=http://$(POD_IP):2380", | ||
"-initial-cluster-state", "new"} | ||
|
||
return &v1.Container{ | ||
Name: "etcd", | ||
Image: "quay.io/coreos/etcd:v3.2.1", | ||
ImagePullPolicy: "Always", | ||
// TODO(gongwb): etcd ports? | ||
Env: podEnv(job), | ||
Command: command, | ||
} | ||
} | ||
|
||
// ParseToMaster parse TrainingJob to a kubernetes replicaset resource. | ||
func (p *DefaultJobParser) ParseToMaster(job *paddlejob.TrainingJob) *v1beta1.ReplicaSet { | ||
// TODO: create master if needed. | ||
return &v1beta1.ReplicaSet{} | ||
replicas := int32(1) | ||
// FIXME: refine these part. | ||
command := []string{"paddle_k8s", "start_master"} | ||
|
||
return &v1beta1.ReplicaSet{ | ||
TypeMeta: metav1.TypeMeta{ | ||
Kind: "extensions/v1beta1", | ||
APIVersion: "ReplicaSet", | ||
}, | ||
ObjectMeta: metav1.ObjectMeta{ | ||
Name: job.ObjectMeta.Name + "-master", | ||
Namespace: job.ObjectMeta.Namespace, | ||
}, | ||
Spec: v1beta1.ReplicaSetSpec{ | ||
Replicas: &replicas, | ||
Template: v1.PodTemplateSpec{ | ||
ObjectMeta: metav1.ObjectMeta{ | ||
Labels: map[string]string{"paddle-job-master": job.ObjectMeta.Name}, | ||
}, | ||
Spec: v1.PodSpec{ | ||
// TODO: setup pserver volumes on cloud. | ||
Volumes: podVolumes(job), | ||
Containers: []v1.Container{ | ||
v1.Container{ | ||
Name: "master", | ||
Image: job.Spec.Image, | ||
ImagePullPolicy: "Always", | ||
Ports: masterPorts(job), | ||
// TODO(gongwb):master env | ||
Command: command, | ||
VolumeMounts: podVolumeMounts(job), | ||
Resources: *masterResource(job), | ||
}, | ||
*getEtcdPodSpec(job), | ||
}, | ||
}, | ||
}, | ||
}, | ||
} | ||
} | ||
|
||
// ----------------------------------------------------------------------- | ||
|
@@ -133,7 +244,21 @@ func podPorts(job *paddlejob.TrainingJob) []v1.ContainerPort { | |
}) | ||
basePort++ | ||
} | ||
return []v1.ContainerPort{} | ||
return ports | ||
} | ||
|
||
func masterPorts(job *paddlejob.TrainingJob) []v1.ContainerPort { | ||
ports := []v1.ContainerPort{ | ||
v1.ContainerPort{ | ||
Name: "master-port", | ||
ContainerPort: 8080, | ||
}, | ||
v1.ContainerPort{ | ||
Name: "etcd-port", | ||
ContainerPort: 2379, | ||
}, | ||
} | ||
return ports | ||
} | ||
|
||
func podEnv(job *paddlejob.TrainingJob) []v1.EnvVar { | ||
|
@@ -150,6 +275,7 @@ func podEnv(job *paddlejob.TrainingJob) []v1.EnvVar { | |
// FIXME: CPU resource value can be less than 1. | ||
trainerCount = int(q.Value()) | ||
} | ||
|
||
return []v1.EnvVar{ | ||
v1.EnvVar{Name: "PADDLE_JOB_NAME", Value: job.ObjectMeta.Name}, | ||
// NOTICE: TRAINERS, PSERVERS, PADDLE_INIT_NUM_GRADIENT_SERVERS | ||
|
@@ -171,23 +297,42 @@ func podEnv(job *paddlejob.TrainingJob) []v1.EnvVar { | |
v1.EnvVar{Name: "PADDLE_INIT_NUM_GRADIENT_SERVERS", Value: strconv.Itoa(job.Spec.Trainer.MinInstance)}, | ||
v1.EnvVar{Name: "PADDLE_INIT_NUM_PASSES", Value: strconv.Itoa(job.Spec.Passes)}, | ||
v1.EnvVar{Name: "PADDLE_INIT_USE_GPU", Value: needGPU}, | ||
|
||
// FIXME(gongwb): LD_LIBRARY_PATH? | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done. |
||
v1.EnvVar{Name: "LD_LIBRARY_PATH", Value: job.Spec.Trainer.Entrypoint}, | ||
v1.EnvVar{Name: "NAMESPACE", ValueFrom: &v1.EnvVarSource{ | ||
FieldRef: &v1.ObjectFieldSelector{ | ||
FieldPath: "metadata.namespace", | ||
}, | ||
}}, | ||
v1.EnvVar{Name: "POD_IP", ValueFrom: &v1.EnvVarSource{ | ||
FieldRef: &v1.ObjectFieldSelector{ | ||
FieldPath: "status.podIP", | ||
}, | ||
}}, | ||
} | ||
} | ||
|
||
func podVolumes(job *paddlejob.TrainingJob) []v1.Volume { | ||
// TODO: prepare volumes. | ||
return []v1.Volume{} | ||
return []v1.Volume{ | ||
v1.Volume{ | ||
Name: job.ObjectMeta.Name + "-workspace", | ||
VolumeSource: v1.VolumeSource{ | ||
HostPath: &v1.HostPathVolumeSource{ | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 可以加个TODO,需要增加cephfs There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Volume需要区分使用的文件系统:ceph, hostpath,还需要同时挂载用户目录和public目录 There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done. |
||
Path: job.Spec.Trainer.Workspace, | ||
}, | ||
}, | ||
}, | ||
} | ||
} | ||
|
||
func podVolumeMounts(job *paddlejob.TrainingJob) []v1.VolumeMount { | ||
// TODO: preapare volume mounts for pods. | ||
return []v1.VolumeMount{} | ||
return []v1.VolumeMount{ | ||
v1.VolumeMount{ | ||
Name: job.ObjectMeta.Name + "-workspace", | ||
MountPath: job.Spec.Trainer.Workspace, | ||
}, | ||
} | ||
} | ||
|
||
// ----------------------------------------------------------------------- | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ImagePullPolicy
好几个地方都用到了,可以定义成一个const,或者启动参数There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done.