diff --git a/go/cmd/paddlectl/main.go b/go/cmd/paddlectl/main.go index c2927c4d..5bbbe0d2 100644 --- a/go/cmd/paddlectl/main.go +++ b/go/cmd/paddlectl/main.go @@ -14,14 +14,8 @@ func main() { subcommands.Register(subcommands.CommandsCommand(), "") subcommands.Register(&paddlectl.SubmitCmd{}, "") - // TODO(gongwb): add these commands. - // subcommands.Register(&paddlecloud.LogsCommand{}, "") - // subcommands.Register(&paddlecloud.GetCommand{}, "") - // subcommands.Register(&paddlecloud.KillCommand{}, "") - // subcommands.Register(&paddlecloud.SimpleFileCmd{}, "") - // subcommands.Register(&paddlecloud.RegistryCmd{}, "") - // subcommands.Register(&paddlecloud.DeleteCommand{}, "") - // subcommands.Register(&paddlecloud.PublishCmd{}, "") + // TODO(gongwb): add more commands. + subcommands.Register(&paddlectl.SimpleFileCmd{}, "") flag.Parse() ctx := context.Background() diff --git a/go/controller/controller.go b/go/controller/controller.go index c29a515d..f6721eea 100644 --- a/go/controller/controller.go +++ b/go/controller/controller.go @@ -26,6 +26,7 @@ package controller import ( "context" + "encoding/json" log "github.com/inconshreveable/log15" @@ -104,10 +105,26 @@ func (c *Controller) onAdd(obj interface{}) { job := obj.(*paddlejob.TrainingJob) log.Debug("TrainingJob resource added", "name", job.ObjectMeta.Name) c.autoscaler.OnAdd(job) - // TODO: if we need to create training job instance by the resource, - // you should add the following code: - // var parser DefaultJobParser - // c.clientset.ExtensionsV1beta1().ReplicaSets(namespace).Create(parser.ParseToPserver(job)) + + // TODO(gongwb):open it when all are ready. + // All-are-ready means: + // create trainjob from paddlectl + // scheduler can schedule trainjobs + var parser DefaultJobParser + p := parser.ParseToPserver(job) + t := parser.ParseToTrainer(job) + m := parser.ParseToMaster(job) + + b, _ := json.MarshalIndent(p, "", " ") + log.Debug("create pserver:" + string(b)) + + b, _ = json.MarshalIndent(t, "", " ") + log.Debug("create trainer-job:" + string(b)) + + b, _ = json.MarshalIndent(m, "", " ") + log.Debug("create master:" + string(b)) + + // TODO(gongwb): create them } func (c *Controller) onUpdate(oldObj, newObj interface{}) { diff --git a/go/controller/jobparser.go b/go/controller/jobparser.go index 2f97d1b1..ab9631d9 100644 --- a/go/controller/jobparser.go +++ b/go/controller/jobparser.go @@ -20,12 +20,17 @@ import ( "strconv" paddlejob "github.com/PaddlePaddle/cloud/go/api" + apiresource "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/pkg/api/v1" batchv1 "k8s.io/client-go/pkg/apis/batch/v1" v1beta1 "k8s.io/client-go/pkg/apis/extensions/v1beta1" ) +const ( + imagePullPolicy = "Always" +) + // JobParser is a interface can parse "TrainingJob" to // ReplicaSet and job. type JobParser interface { @@ -93,7 +98,7 @@ func (p *DefaultJobParser) ParseToPserver(job *paddlejob.TrainingJob) *v1beta1.R Volumes: podVolumes(job), Containers: []v1.Container{ v1.Container{ - Name: job.ObjectMeta.Name, + Name: "pserver", Image: job.Spec.Image, Ports: podPorts(job), Env: podEnv(job), @@ -109,14 +114,124 @@ func (p *DefaultJobParser) ParseToPserver(job *paddlejob.TrainingJob) *v1beta1.R // ParseToTrainer parse TrainingJob to a kubernetes job resource. func (p *DefaultJobParser) ParseToTrainer(job *paddlejob.TrainingJob) *batchv1.Job { - // TODO: create job. - return &batchv1.Job{} + replicas := int32(job.Spec.Trainer.MinInstance) + command := make([]string, 2) + if job.Spec.FaultTolerant { + command = []string{"paddle_k8s", "start_trainer"} + } else { + command = []string{"paddle_k8s", "start_new_trainer"} + } + + return &batchv1.Job{ + TypeMeta: metav1.TypeMeta{ + Kind: "Job", + APIVersion: "batch/v1", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: job.ObjectMeta.Name + "-trainer", + Namespace: job.ObjectMeta.Namespace, + }, + Spec: batchv1.JobSpec{ + Parallelism: &replicas, + Template: v1.PodTemplateSpec{ + ObjectMeta: metav1.ObjectMeta{ + Labels: map[string]string{"paddle-job": job.ObjectMeta.Name}, + }, + Spec: v1.PodSpec{ + Volumes: podVolumes(job), + Containers: []v1.Container{ + v1.Container{ + Name: "trainer", + Image: job.Spec.Image, + ImagePullPolicy: imagePullPolicy, + Command: command, + VolumeMounts: podVolumeMounts(job), + Ports: podPorts(job), + Env: podEnv(job), + Resources: job.Spec.Trainer.Resources, + }, + }, + RestartPolicy: "Never", + }, + }, + }, + } +} + +func masterResource(job *paddlejob.TrainingJob) *v1.ResourceRequirements { + // TODO(gongwb): config master resource? + return &v1.ResourceRequirements{ + Limits: v1.ResourceList{ + "cpu": *apiresource.NewQuantity(int64(1), apiresource.DecimalSI), + "memory": apiresource.MustParse("500Mi"), + }, + Requests: v1.ResourceList{ + "cpu": *apiresource.NewQuantity(int64(2), apiresource.DecimalSI), + "memory": apiresource.MustParse("1Gi"), + }, + } +} + +func getEtcdPodSpec(job *paddlejob.TrainingJob) *v1.Container { + command := []string{"etcd", "-name", "etcd0", + "-advertise-client-urls", "http://$(POD_IP):2379,http://$(POD_IP):4001", + "-listen-client-urls", "http://0.0.0.0:2379,http://0.0.0.0:4001", + "-initial-advertise-peer-urls", "http://$(POD_IP):2380", + "-listen-peer-urls", "http://0.0.0.0:2380", + "-initial-cluster", "etcd0=http://$(POD_IP):2380", + "-initial-cluster-state", "new"} + + return &v1.Container{ + Name: "etcd", + Image: "quay.io/coreos/etcd:v3.2.1", + ImagePullPolicy: imagePullPolicy, + // TODO(gongwb): etcd ports? + Env: podEnv(job), + Command: command, + } } // ParseToMaster parse TrainingJob to a kubernetes replicaset resource. func (p *DefaultJobParser) ParseToMaster(job *paddlejob.TrainingJob) *v1beta1.ReplicaSet { - // TODO: create master if needed. - return &v1beta1.ReplicaSet{} + replicas := int32(1) + // FIXME: refine these part. + command := []string{"paddle_k8s", "start_master"} + + return &v1beta1.ReplicaSet{ + TypeMeta: metav1.TypeMeta{ + Kind: "extensions/v1beta1", + APIVersion: "ReplicaSet", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: job.ObjectMeta.Name + "-master", + Namespace: job.ObjectMeta.Namespace, + }, + Spec: v1beta1.ReplicaSetSpec{ + Replicas: &replicas, + Template: v1.PodTemplateSpec{ + ObjectMeta: metav1.ObjectMeta{ + Labels: map[string]string{"paddle-job-master": job.ObjectMeta.Name}, + }, + Spec: v1.PodSpec{ + // TODO: setup pserver volumes on cloud. + Volumes: podVolumes(job), + Containers: []v1.Container{ + v1.Container{ + Name: "master", + Image: job.Spec.Image, + ImagePullPolicy: imagePullPolicy, + Ports: masterPorts(job), + // TODO(gongwb):master env + Command: command, + VolumeMounts: podVolumeMounts(job), + Resources: *masterResource(job), + }, + *getEtcdPodSpec(job), + }, + }, + }, + }, + } } // ----------------------------------------------------------------------- @@ -133,7 +248,21 @@ func podPorts(job *paddlejob.TrainingJob) []v1.ContainerPort { }) basePort++ } - return []v1.ContainerPort{} + return ports +} + +func masterPorts(job *paddlejob.TrainingJob) []v1.ContainerPort { + ports := []v1.ContainerPort{ + v1.ContainerPort{ + Name: "master-port", + ContainerPort: 8080, + }, + v1.ContainerPort{ + Name: "etcd-port", + ContainerPort: 2379, + }, + } + return ports } func podEnv(job *paddlejob.TrainingJob) []v1.EnvVar { @@ -150,6 +279,7 @@ func podEnv(job *paddlejob.TrainingJob) []v1.EnvVar { // FIXME: CPU resource value can be less than 1. trainerCount = int(q.Value()) } + return []v1.EnvVar{ v1.EnvVar{Name: "PADDLE_JOB_NAME", Value: job.ObjectMeta.Name}, // NOTICE: TRAINERS, PSERVERS, PADDLE_INIT_NUM_GRADIENT_SERVERS @@ -171,23 +301,41 @@ func podEnv(job *paddlejob.TrainingJob) []v1.EnvVar { v1.EnvVar{Name: "PADDLE_INIT_NUM_GRADIENT_SERVERS", Value: strconv.Itoa(job.Spec.Trainer.MinInstance)}, v1.EnvVar{Name: "PADDLE_INIT_NUM_PASSES", Value: strconv.Itoa(job.Spec.Passes)}, v1.EnvVar{Name: "PADDLE_INIT_USE_GPU", Value: needGPU}, - v1.EnvVar{Name: "LD_LIBRARY_PATH", Value: job.Spec.Trainer.Entrypoint}, + v1.EnvVar{Name: "LD_LIBRARY_PATH", Value: "/usr/local/cuda/lib64"}, v1.EnvVar{Name: "NAMESPACE", ValueFrom: &v1.EnvVarSource{ FieldRef: &v1.ObjectFieldSelector{ FieldPath: "metadata.namespace", }, }}, + v1.EnvVar{Name: "POD_IP", ValueFrom: &v1.EnvVarSource{ + FieldRef: &v1.ObjectFieldSelector{ + FieldPath: "status.podIP", + }, + }}, } } func podVolumes(job *paddlejob.TrainingJob) []v1.Volume { - // TODO: prepare volumes. - return []v1.Volume{} + return []v1.Volume{ + v1.Volume{ + Name: job.ObjectMeta.Name + "-workspace", + // TODO(gongwb): add support to ceph fs and mount public path. + VolumeSource: v1.VolumeSource{ + HostPath: &v1.HostPathVolumeSource{ + Path: job.Spec.Trainer.Workspace, + }, + }, + }, + } } func podVolumeMounts(job *paddlejob.TrainingJob) []v1.VolumeMount { - // TODO: preapare volume mounts for pods. - return []v1.VolumeMount{} + return []v1.VolumeMount{ + v1.VolumeMount{ + Name: job.ObjectMeta.Name + "-workspace", + MountPath: job.Spec.Trainer.Workspace, + }, + } } // ----------------------------------------------------------------------- diff --git a/go/paddlectl/simplefile.go b/go/paddlectl/simplefile.go new file mode 100644 index 00000000..3258b0f6 --- /dev/null +++ b/go/paddlectl/simplefile.go @@ -0,0 +1,198 @@ +package paddlectl + +import ( + "context" + "encoding/json" + "errors" + "flag" + "fmt" + "io" + "net/url" + "os" + "path" + "path/filepath" + "strings" + + "github.com/PaddlePaddle/cloud/go/utils/restclient" + "github.com/google/subcommands" +) + +// TODO(gongwb): rm simplefile.go under paddlecloud/ + +// SimpleFileCmd define the subcommand of simple file operations. +type SimpleFileCmd struct { +} + +// Name is subcommands name. +func (*SimpleFileCmd) Name() string { return "file" } + +// Synopsis is subcommands synopsis. +func (*SimpleFileCmd) Synopsis() string { return "Simple file operations." } + +// Usage is subcommands Usage. +func (*SimpleFileCmd) Usage() string { + return `file [put|get] or file ls : + dst must be like /pfs/[datacenter]/home/[username] + Options: +` +} + +// SetFlags registers subcommands flags. +func (p *SimpleFileCmd) SetFlags(f *flag.FlagSet) { +} + +// Execute file ops. +func (p *SimpleFileCmd) Execute(_ context.Context, f *flag.FlagSet, _ ...interface{}) subcommands.ExitStatus { + if f.NArg() < 1 || f.NArg() > 3 { + f.Usage() + return subcommands.ExitFailure + } + switch f.Arg(0) { + case "put": + err := putFiles(f.Arg(1), f.Arg(2)) + if err != nil { + fmt.Fprintf(os.Stderr, "put file error: %s\n", err) + return subcommands.ExitFailure + } + case "get": + err := getFile(f.Arg(1), f.Arg(2)) + if err != nil { + fmt.Fprintf(os.Stderr, "get file error: %s\n", err) + return subcommands.ExitFailure + } + case "ls": + err := lsFile(f.Arg(1)) + if err != nil { + fmt.Fprintf(os.Stderr, "ls file error: %s\n", err) + return subcommands.ExitFailure + } + default: + f.Usage() + return subcommands.ExitFailure + } + return subcommands.ExitSuccess +} + +func lsFile(dst string) error { + query := url.Values{} + query.Set("path", dst) + query.Set("dc", Config.ActiveConfig.Name) + respStr, err := restclient.GetCall(Config.ActiveConfig.Endpoint+"/api/v1/filelist/", query) + if err != nil { + return err + } + var respObj interface{} + if err = json.Unmarshal(respStr, &respObj); err != nil { + return err + } + // FIXME: Print an error if error message is not empty. Use response code instead + errMsg := respObj.(map[string]interface{})["msg"].(string) + if len(errMsg) > 0 { + return errors.New("list file error: " + errMsg) + } + items := respObj.(map[string]interface{})["items"].([]interface{}) + for _, fn := range items { + fmt.Println(fn.(string)) + } + return nil +} + +func putFiles(src string, dest string) error { + f, err := os.Stat(src) + if err != nil { + return err + } + if strings.HasPrefix(src, "..") { + return errors.New("src path should be inside your submiting path") + } + switch mode := f.Mode(); { + case mode.IsDir(): + if err := filepath.Walk(src, func(path string, info os.FileInfo, err error) error { + if info.Mode().IsRegular() { + srcs := strings.Split(filepath.Clean(src), string(os.PathSeparator)) + paths := strings.Split(path, string(os.PathSeparator)) + var destFile string + if strings.HasSuffix(src, "/") { + destFile = filepath.Join(dest, strings.Join(paths[len(srcs):len(paths)], string(os.PathSeparator))) + } else { + destFile = filepath.Join(dest, strings.Join(paths[len(srcs)-1:len(paths)], string(os.PathSeparator))) + } + putFile(path, destFile) + } + return nil + }); err != nil { + return err + } + + case mode.IsRegular(): + _, f := filepath.Split(src) + return putFile(src, filepath.Join(dest, f)) + } + return nil +} + +func putFile(src string, dest string) error { + fmt.Printf("Uploading ... %s %s\n", src, dest) + query := url.Values{} + _, srcFile := path.Split(src) + destDir, destFile := path.Split(dest) + var destFullPath string + if len(destFile) == 0 { + destFullPath = path.Join(destDir, srcFile) + } else { + destFullPath = dest + } + query.Set("path", destFullPath) + respStr, err := restclient.PostFile(Config.ActiveConfig.Endpoint+"/api/v1/file/", src, query) + if err != nil { + return err + } + var respObj interface{} + if err = json.Unmarshal(respStr, &respObj); err != nil { + return err + } + // FIXME: Print an error if error message is not empty. Use response code instead + errMsg := respObj.(map[string]interface{})["msg"].(string) + if len(errMsg) > 0 { + fmt.Fprintf(os.Stderr, "upload file error: %s\n", errMsg) + } + return nil +} + +func getFile(src string, dest string) error { + query := url.Values{} + query.Set("path", src) + req, err := restclient.MakeRequestToken(Config.ActiveConfig.Endpoint+"/api/v1/file/", "GET", nil, "", query) + if err != nil { + return err + } + resp, err := restclient.HTTPClient.Do(req) + if err != nil { + return err + } + defer resp.Body.Close() + if resp.Status != restclient.HTTPOK { + return errors.New("server error: " + resp.Status) + } + _, srcFile := path.Split(src) + destDir, destFile := path.Split(dest) + var destFullPath string + if len(destFile) == 0 { + destFullPath = path.Join(destDir, srcFile) + } else { + destFullPath = dest + } + if _, err = os.Stat(destFullPath); err == nil { + return errors.New("file already exist: " + destFullPath) + } + out, err := os.Create(destFullPath) + if err != nil { + return err + } + defer out.Close() + _, err = io.Copy(out, resp.Body) + if err != nil { + return err + } + return nil +} diff --git a/go/paddlectl/submit.go b/go/paddlectl/submit.go index 13635851..5e249caa 100644 --- a/go/paddlectl/submit.go +++ b/go/paddlectl/submit.go @@ -6,6 +6,7 @@ import ( "flag" "fmt" "os" + "path" "strings" "k8s.io/client-go/pkg/api/v1" @@ -73,9 +74,8 @@ func (*SubmitCmd) Usage() string { func (p *SubmitCmd) getTrainer() *paddlejob.TrainerSpec { return &paddlejob.TrainerSpec{ - Entrypoint: p.Entry, - // FIXME(gongwb): workspace - + Entrypoint: p.Entry, + Workspace: getJobPfsPath(p.Jobpackage, p.Jobname), MinInstance: p.MinInstance, MaxInstance: p.MaxInstance, Resources: v1.ResourceRequirements{ @@ -115,7 +115,6 @@ func (p *SubmitCmd) getMaster() *paddlejob.MasterSpec { // GetTrainingJob get's paddlejob.TrainingJob struct filed by Submitcmd paramters. func (p *SubmitCmd) GetTrainingJob() *paddlejob.TrainingJob { - t := paddlejob.TrainingJob{ metav1.TypeMeta{ Kind: "TrainingJob", @@ -201,14 +200,30 @@ func NewSubmitter(cmd *SubmitCmd) *Submitter { return &s } +func getJobPfsPath(jobPackage, jobName string) string { + _, err := os.Stat(jobPackage) + if os.IsNotExist(err) { + return jobPackage + } + + return path.Join("/pfs", Config.ActiveConfig.Name, "home", Config.ActiveConfig.Username, "jobs", jobName) +} + // putFiles puts files to pfs and // if jobPackage is not a local dir, skip uploading package. -func putFiles(jobPackage, jobName string) error { +func putFilesToPfs(jobPackage, jobName string) error { _, pkgerr := os.Stat(jobPackage) if pkgerr == nil { - // FIXME: upload job package to paddle cloud. + dest := getJobPfsPath(jobPackage, jobName) + if !strings.HasSuffix(jobPackage, "/") { + jobPackage = jobPackage + "/" + } + err := putFiles(jobPackage, dest) + if err != nil { + return err + } } else if os.IsNotExist(pkgerr) { - return fmt.Errorf("stat jobpackage '%s' error: %v", jobPackage, pkgerr) + glog.Warning("jobpackage not a local dir, skip upload.") } return nil @@ -229,7 +244,7 @@ func (s *Submitter) Submit(jobPackage string, jobName string) error { return err } - if err := putFiles(jobPackage, jobName); err != nil { + if err := putFilesToPfs(jobPackage, jobName); err != nil { return err } @@ -248,12 +263,9 @@ func (s *Submitter) Submit(jobPackage string, jobName string) error { return err } - if err := kubeutil.CreateTrainingJob(client, namespace, s.args.GetTrainingJob()); err != nil { - return err - } - - return nil + return kubeutil.CreateTrainingJob(client, namespace, s.args.GetTrainingJob()) } + func checkJobName(jobName string) error { if strings.Contains(jobName, "_") || strings.Contains(jobName, ".") { return errors.New(invalidJobName)