Create Pod instead of Job #344

Merged (1 commit) on Mar 5, 2018
297 changes: 179 additions & 118 deletions pkg/trainer/replicas.go
@@ -20,9 +20,7 @@ import (
"fmt"
"strings"

"github.com/golang/protobuf/proto"
log "github.com/sirupsen/logrus"
batch "k8s.io/api/batch/v1"
log "github.com/golang/glog"
"k8s.io/api/core/v1"
k8s_errors "k8s.io/apimachinery/pkg/api/errors"
meta_v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -118,123 +116,97 @@ func (s *TFReplicaSet) Labels() KubernetesLabels {
}

func (s *TFReplicaSet) Create(config *tfv1alpha1.ControllerConfig) error {
for index := int32(0); index < *s.Spec.Replicas; index++ {
taskLabels := s.Labels()
taskLabels["task_index"] = fmt.Sprintf("%v", index)

// Create the service.
service := &v1.Service{
ObjectMeta: meta_v1.ObjectMeta{
Name: s.jobName(index),
Labels: taskLabels,
OwnerReferences: []meta_v1.OwnerReference{
helper.AsOwner(s.Job.job),
},
},
Spec: v1.ServiceSpec{
Selector: taskLabels,
Ports: []v1.ServicePort{
{
Name: "tf-port",
Port: *s.Spec.TFPort,
},
},
},
}

log.Infof("Creating Service: %v", service.ObjectMeta.Name)
createdService, err := s.ClientSet.CoreV1().Services(s.Job.job.ObjectMeta.Namespace).Create(service)
// Create services
err := s.SyncServices()
if err != nil {
return err
}

// If the job already exists do nothing.
if err != nil {
if k8s_errors.IsAlreadyExists(err) {
log.Infof("Service %v already exists.", s.jobName(index))
} else {
s.recorder.Eventf(s.Job.job, v1.EventTypeWarning, FailedCreateReason, "Error creating: %v", err)
return k8sErrors.NewAggregate([]error{fmt.Errorf("Creating service %v returned error.", createdService.ObjectMeta.Name), err})
}
} else {
s.recorder.Eventf(s.Job.job, v1.EventTypeNormal, SuccessfulCreateReason, "Created service: %v", createdService.Name)
}
// Create pods
return s.SyncPods()
}

// Configure the TFCONFIG environment variable.
tfConfig := TFConfig{
Cluster: s.Job.ClusterSpec(),
Task: TaskSpec{
Type: strings.ToLower(string(s.Spec.TFReplicaType)),
Index: int(index),
// CreateServiceWithIndex creates a new service with the specified index
func (s *TFReplicaSet) CreateServiceWithIndex(index int32) (*v1.Service, error) {
taskLabels := s.Labels()
taskLabels["task_index"] = fmt.Sprintf("%v", index)

// Create the service.
service := &v1.Service{
ObjectMeta: meta_v1.ObjectMeta{
Name: s.genName(index),
Labels: taskLabels,
OwnerReferences: []meta_v1.OwnerReference{
helper.AsOwner(s.Job.job),
},
// We need to set environment to cloud otherwise it will default to local which isn't what we want.
Environment: "cloud",
}

tfConfigJson, err := json.Marshal(tfConfig)
if err != nil {
log.Errorf("Job: %v serializing tfConfig: %v return error; %v", s.Job.job.ObjectMeta.Name, util.Pformat(tfConfig), err)
return err
}

// Make a copy of the template because we will modify it below.
newPodSpecTemplate := s.Spec.Template.DeepCopy()

newJ := &batch.Job{
ObjectMeta: meta_v1.ObjectMeta{
Name: s.jobName(index),
Labels: taskLabels,
OwnerReferences: []meta_v1.OwnerReference{
helper.AsOwner(s.Job.job),
},
Spec: v1.ServiceSpec{
Selector: taskLabels,
Ports: []v1.ServicePort{
{
Name: "tf-port",
Port: *s.Spec.TFPort,
},
},
Spec: batch.JobSpec{
Completions: proto.Int32(1),
Parallelism: proto.Int32(1),
Template: *newPodSpecTemplate,
},
}

if newJ.Spec.Template.ObjectMeta.Labels == nil {
newJ.Spec.Template.ObjectMeta.Labels = make(map[string]string)
}
},
}

// Pods need to be tagged with the labels.
for k, v := range taskLabels {
newJ.Spec.Template.ObjectMeta.Labels[k] = v
}
log.Infof("Creating service: %v", service.ObjectMeta.Name)
return s.ClientSet.CoreV1().Services(s.Job.job.ObjectMeta.Namespace).Create(service)
}

// Add TF_CONFIG environment variable.
for i, _ := range newJ.Spec.Template.Spec.Containers {
// We can't get c in the loop variable because that would be by value so our modifications
// wouldn't have any effect.
c := &newJ.Spec.Template.Spec.Containers[i]
if tfv1alpha1.ContainerName(c.Name) != tfv1alpha1.TENSORFLOW {
continue
}
if len(c.Env) == 0 {
c.Env = make([]v1.EnvVar, 0)
}
c.Env = append(c.Env, v1.EnvVar{
Name: "TF_CONFIG",
Value: string(tfConfigJson),
})
}
// CreatePodWithIndex creates a new pod with the specified index
func (s *TFReplicaSet) CreatePodWithIndex(index int32) (*v1.Pod, error) {
taskLabels := s.Labels()
taskLabels["task_index"] = fmt.Sprintf("%v", index)

pod := &v1.Pod{
ObjectMeta: meta_v1.ObjectMeta{
Name: s.genPodName(index),
Labels: taskLabels,
OwnerReferences: []meta_v1.OwnerReference{
helper.AsOwner(s.Job.job),
},
},
Spec: *s.Spec.Template.Spec.DeepCopy(),
}

log.Infof("Creating Job: %v", newJ.ObjectMeta.Name)
createdJob, err := s.ClientSet.BatchV1().Jobs(s.Job.job.ObjectMeta.Namespace).Create(newJ)
// Configure the TF_CONFIG environment variable.
tfConfig := TFConfig{
Cluster: s.Job.ClusterSpec(),
Task: TaskSpec{
Type: strings.ToLower(string(s.Spec.TFReplicaType)),
Index: int(index),
},
// We need to set environment to cloud otherwise it will default to local which isn't what we want.
Environment: "cloud",
}

// If the job already exists do nothing.
if err != nil {
if k8s_errors.IsAlreadyExists(err) {
log.Infof("%v already exists.", s.jobName(index))
tfConfigJson, err := json.Marshal(tfConfig)
if err != nil {
log.Errorf("Job: %v serializing tfConfig: %v return error; %v", s.Job.job.ObjectMeta.Name, util.Pformat(tfConfig), err)
return nil, err
}

} else {
s.recorder.Eventf(s.Job.job, v1.EventTypeWarning, FailedCreateReason, "Error creating: %v", err)
return k8sErrors.NewAggregate([]error{fmt.Errorf("Creating Job %v returned error.", createdJob.ObjectMeta.Name), err})
}
} else {
s.recorder.Eventf(s.Job.job, v1.EventTypeNormal, SuccessfulCreateReason, "Created job: %v", createdJob.Name)
// Add TF_CONFIG environment variable.
for i, _ := range pod.Spec.Containers {
// We can't get c in the loop variable because that would be by value so our modifications
// wouldn't have any effect.
c := &pod.Spec.Containers[i]
if tfv1alpha1.ContainerName(c.Name) != tfv1alpha1.TENSORFLOW {
continue
}
if len(c.Env) == 0 {
c.Env = make([]v1.EnvVar, 0)
}
c.Env = append(c.Env, v1.EnvVar{
Name: "TF_CONFIG",
Value: string(tfConfigJson),
})
}
return nil

log.Infof("Creating pod: %v", pod.ObjectMeta.Name)
return s.ClientSet.CoreV1().Pods(s.Job.job.ObjectMeta.Namespace).Create(pod)
}
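As a point of reference, the TF_CONFIG value assembled above serializes to the JSON layout TensorFlow expects (cluster, task, environment). A small standalone sketch with simplified stand-in types and made-up service addresses — not the operator's actual TFConfig struct — looks like this:

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Simplified stand-ins for the operator's TFConfig/TaskSpec types, used only
// to show the shape of the TF_CONFIG value injected into the TENSORFLOW container.
type taskSpec struct {
	Type  string `json:"type"`
	Index int    `json:"index"`
}

type tfConfig struct {
	Cluster     map[string][]string `json:"cluster"`
	Task        taskSpec            `json:"task"`
	Environment string              `json:"environment"`
}

func main() {
	cfg := tfConfig{
		// Hypothetical cluster spec: one PS and one worker service, named with
		// the genName scheme and listening on the replica's TF port.
		Cluster: map[string][]string{
			"ps":     {"myjob-ps-a1b2-0:2222"},
			"worker": {"myjob-worker-a1b2-0:2222"},
		},
		Task: taskSpec{Type: "worker", Index: 0},
		// "cloud" rather than the default "local", as in the code above.
		Environment: "cloud",
	}

	b, err := json.Marshal(cfg)
	if err != nil {
		panic(err)
	}
	fmt.Println(string(b))
}
```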

// Delete deletes the replicas
@@ -250,8 +222,8 @@ func (s *TFReplicaSet) Delete() error {
LabelSelector: selector,
}

log.Infof("Deleting Jobs namespace=%v selector=%v", s.Job.job.ObjectMeta.Namespace, selector)
err = s.ClientSet.BatchV1().Jobs(s.Job.job.ObjectMeta.Namespace).DeleteCollection(&meta_v1.DeleteOptions{}, options)
log.V(1).Infof("Deleting Jobs namespace=%v selector=%v", s.Job.job.ObjectMeta.Namespace, selector)
err = s.ClientSet.CoreV1().Pods(s.Job.job.ObjectMeta.Namespace).DeleteCollection(&meta_v1.DeleteOptions{}, options)

if err != nil {
log.Errorf("There was a problem deleting the jobs; %v", err)
@@ -270,11 +242,11 @@ func (s *TFReplicaSet) Delete() error {
// Services doesn't support DeleteCollection so we delete them individually.
// TODO(jlewi): We should check if this has changed with K8s 1.8 or other releases.
for index := int32(0); index < *s.Spec.Replicas; index++ {
log.Infof("Deleting Service %v:%v", s.Job.job.ObjectMeta.Namespace, s.jobName((index)))
err = s.ClientSet.CoreV1().Services(s.Job.job.ObjectMeta.Namespace).Delete(s.jobName(index), &meta_v1.DeleteOptions{})
log.V(1).Infof("Deleting Service %v:%v", s.Job.job.ObjectMeta.Namespace, s.genName((index)))
err = s.ClientSet.CoreV1().Services(s.Job.job.ObjectMeta.Namespace).Delete(s.genName(index), &meta_v1.DeleteOptions{})

if err != nil {
log.Errorf("Error deleting service %v; %v", s.jobName(index), err)
log.Errorf("Error deleting service %v; %v", s.genName(index), err)
failures = true
}
}
@@ -304,7 +276,6 @@ func (s *TFReplicaSet) Delete() error {

// replicaStatusFromPodList returns a status from a list of pods for a job.
func replicaStatusFromPodList(l v1.PodList, name tfv1alpha1.ContainerName) tfv1alpha1.ReplicaState {
log.Infof("Get replicaStatus from PodList: %v", util.Pformat(l))
var latest *v1.Pod
for _, i := range l.Items {
if latest == nil {
@@ -359,13 +330,13 @@ func replicaStatusFromPodList(l v1.PodList, name tfv1alpha1.ContainerName) tfv1alpha1.ReplicaState {
}

func (s *TFReplicaSet) GetSingleReplicaStatus(index int32) tfv1alpha1.ReplicaState {
j, err := s.ClientSet.BatchV1().Jobs(s.Job.job.ObjectMeta.Namespace).Get(s.jobName(index), meta_v1.GetOptions{})
p, err := s.ClientSet.CoreV1().Pods(s.Job.job.ObjectMeta.Namespace).Get(s.genName(index), meta_v1.GetOptions{})

if err != nil {
return tfv1alpha1.ReplicaStateUnknown
}

if j.Status.Succeeded >= 1 {
if v1.PodSucceeded == p.Status.Phase {
return tfv1alpha1.ReplicaStateSucceeded
}

@@ -436,14 +407,104 @@ func (s *TFReplicaSet) GetStatus() (tfv1alpha1.TFReplicaStatus, error) {
return status, nil
}

func (s *TFReplicaSet) jobName(index int32) string {
// SyncPods checks the current pods for this TFReplicaSet and reconciles them with the desired state.
func (s *TFReplicaSet) SyncPods() error {
for index := int32(0); index < *s.Spec.Replicas; index++ {
Review comment (Contributor):

I think deleting failed pods could make debugging difficult. It will be useful to be able to see failed pods by doing kubectl describe pods to get information about why they failed.

Instead of relying on the name to identify pods belonging to a specific replica, can we use the labels? That is, with Get, use a label selector to select on replica_index and replica_type; you can then sort by creation timestamp to get the most recent pod (this is what the current implementation does, so you could probably copy much of the current logic).

The name for the pod can now include a random UUID, so when a pod fails we can just create a new one.
This would simplify some of the logic below since we no longer need to delete the pod; we can just check if one is running and if not we can create a new pod. So the logic is the same whether we are creating the pod for the first time or recovering.
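A minimal standalone sketch of the selection step suggested here — list a replica's pods by label and take the most recently created one — might look like the following; the newestPod helper and the empty-list example are illustrative, not code from this PR.

```go
package main

import (
	"fmt"
	"sort"

	"k8s.io/api/core/v1"
)

// newestPod returns the most recently created pod in the list, or nil if the
// list is empty. Selecting the newest pod per replica (instead of addressing a
// pod by a fixed name) lets the controller simply create another pod with a
// fresh random suffix when the previous one failed, without deleting anything.
func newestPod(pods []v1.Pod) *v1.Pod {
	if len(pods) == 0 {
		return nil
	}
	sort.Slice(pods, func(i, j int) bool {
		// Newest first; CreationTimestamp embeds time.Time.
		return pods[i].CreationTimestamp.After(pods[j].CreationTimestamp.Time)
	})
	return &pods[0]
}

func main() {
	// With no pods matching the replica's labels, the controller would create one.
	if newestPod(nil) == nil {
		fmt.Println("no pod found for this replica; create a new one")
	}
}
```

In practice the pod list would come from s.ClientSet.CoreV1().Pods(...).List with a label selector built from the replica labels, much as SyncPods below already does.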


// Labels to select all pods of this TFReplicaType + index
labels := s.Labels()
labels["task_index"] = fmt.Sprintf("%v", index)

labelSelector, err := labels.ToSelector()
if err != nil {
return err
}

// Filter out inactive pods
fieldSelector := "status.phase!=" + string(v1.PodFailed) +
",deletionTimestamp!=nil"

options := meta_v1.ListOptions{
LabelSelector: labelSelector,
FieldSelector: fieldSelector,
}

// List the matching pods
pl, err := s.ClientSet.CoreV1().Pods(s.Job.job.ObjectMeta.Namespace).List(options)

if len(pl.Items) == 0 {
log.Infof("Pod not found, create new one.")
// Create the pod
createdPod, err := s.CreatePodWithIndex(index)

// If the pod already exists do nothing.
if err != nil {
if k8s_errors.IsAlreadyExists(err) {
log.Infof("Pod: %v already exists.", createdPod.ObjectMeta.Name)
continue
}
s.recorder.Eventf(s.Job.job, v1.EventTypeWarning, FailedCreateReason, "Error creating: %v", err)
return k8sErrors.NewAggregate([]error{fmt.Errorf("Creating pod %v returned error.", createdPod.ObjectMeta.Name), err})
}

s.recorder.Eventf(s.Job.job, v1.EventTypeNormal, SuccessfulCreateReason, "Created pod: %v", createdPod.Name)
continue
}

if err != nil {
// TODO: handle this error
continue
}
}

return nil
}

// SyncServices checks the current services for this TFReplicaSet and reconciles them with the desired state.
func (s *TFReplicaSet) SyncServices() error {
for index := int32(0); index < *s.Spec.Replicas; index++ {
_, err := s.ClientSet.CoreV1().Services(s.Job.job.ObjectMeta.Namespace).Get(s.genName(index), meta_v1.GetOptions{})
if err != nil && k8s_errors.IsNotFound(err) {
log.Infof("Service: %v not found, create new one.", s.genName(index))
// Create the service
createdService, err := s.CreateServiceWithIndex(index)

// If the service already exists do nothing.
if err != nil {
if k8s_errors.IsAlreadyExists(err) {
log.Infof("Service: %v already exists.", s.genName(index))
continue
}
s.recorder.Eventf(s.Job.job, v1.EventTypeWarning, FailedCreateReason, "Error creating: %v", err)
return k8sErrors.NewAggregate([]error{fmt.Errorf("Creating Service %v returned error.", createdService.ObjectMeta.Name), err})
}

s.recorder.Eventf(s.Job.job, v1.EventTypeNormal, SuccessfulCreateReason, "Created Service: %v", createdService.Name)
continue
}

if err != nil {
// TODO: handle this error
continue
}
}

return nil
}

func (s *TFReplicaSet) genName(index int32) string {
// Truncate tfjob name to 40 characters
// The whole job name should be compliant with the DNS_LABEL spec, up to a max length of 63 characters
// Thus jobname(40 chars)-replicaType(6 chars)-runtimeId(4 chars)-index(4 chars), also leaving some spaces
// Thus genName(40 chars)-replicaType(6 chars)-runtimeId(4 chars)-index(4 chars), also leaving some spaces
// See https://github.com/kubernetes/community/blob/master/contributors/design-proposals/architecture/identifiers.md
return fmt.Sprintf("%v-%v-%v-%v", fmt.Sprintf("%.40s", s.Job.job.ObjectMeta.Name), strings.ToLower(string(s.Spec.TFReplicaType)), s.Job.job.Spec.RuntimeId, index)
}

func (s *TFReplicaSet) genPodName(index int32) string {
// Generate a new pod name with random string
return s.genName(index) + "-" + util.RandString(5)
}
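To make the length budget described in the comments above concrete, here is a small standalone example of the resulting names; the job name, runtime id, and random suffix are made-up values, and the code simply reproduces the format strings used by genName and genPodName.

```go
package main

import (
	"fmt"
	"strings"
)

func main() {
	// Hypothetical inputs; the real values come from the TFJob object.
	jobName := "mnist-distributed-training-experiment-with-a-long-name"
	replicaType := "WORKER"
	runtimeID := "a1b2"
	index := int32(0)

	// Same layout as genName: job name truncated to 40 chars, then replica
	// type, runtime id, and task index.
	base := fmt.Sprintf("%v-%v-%v-%v", fmt.Sprintf("%.40s", jobName), strings.ToLower(replicaType), runtimeID, index)
	fmt.Println(base)            // mnist-distributed-training-experiment-wi-worker-a1b2-0
	fmt.Println(len(base) <= 63) // true: the name fits the DNS label limit

	// genPodName additionally appends a random 5-character suffix.
	podName := base + "-" + "x7k2q"
	fmt.Println(podName)
}
```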

func (s *TFReplicaSet) defaultPSConfigMapName() string {
return fmt.Sprintf("cm-ps-%v", s.Job.job.Spec.RuntimeId)
}