Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add pod level inject webhook #716

Merged
merged 9 commits into from
Aug 28, 2019
4 changes: 2 additions & 2 deletions pkg/common/v1alpha2/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,8 @@ func GetJobLabelMap(jobKind string, trialName string) map[string]string {
labelMap := make(map[string]string)

if jobKind == "TFJob" {
labelMap["tf-job-name"] = trialName
labelMap["tf-job-role"] = "master"
labelMap["job-name"] = trialName
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

kubeflow/pytorch-operator#204 had made pytorch also can use "job-name"

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done, updated the pytorch label name.

labelMap["job-role"] = "master"
} else if jobKind == "PyTorchJob" {
labelMap["pytorch-job-name"] = trialName
labelMap["pytorch-job-role"] = "master"
Expand Down
20 changes: 12 additions & 8 deletions pkg/util/v1alpha2/metricscollector/metricscollector.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package metricscollector

import (
"bytes"
"errors"
"fmt"
"strings"
Expand Down Expand Up @@ -45,15 +46,18 @@ func (d *MetricsCollector) CollectObservationLog(tId string, jobKind string, met
if len(pl.Items) == 0 {
return nil, fmt.Errorf("No Pods are found in Trial %v", tId)
}
logopt := apiv1.PodLogOptions{Timestamps: true}
logs, err := d.clientset.CoreV1().Pods(namespace).GetLogs(pl.Items[0].ObjectMeta.Name, &logopt).Do().Raw()
if err != nil {
return nil, err
logopt := apiv1.PodLogOptions{Container: "tensorflow", Timestamps: true, Follow: true}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Only works for "tfjob"?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Only a PoC now.

reader, err := d.clientset.CoreV1().Pods(namespace).GetLogs(pl.Items[0].ObjectMeta.Name, &logopt).Stream()
for err != nil {
klog.Errorf("Retry to get logs, Error: %v", err)
time.Sleep(time.Duration(1) * time.Second)
reader, err = d.clientset.CoreV1().Pods(namespace).GetLogs(pl.Items[0].ObjectMeta.Name, &logopt).Stream()
}
if len(logs) == 0 {
return &v1alpha2.ObservationLog{}, nil
}
olog, err := d.parseLogs(tId, strings.Split(string(logs), "\n"), metrics)
buf := new(bytes.Buffer)
buf.ReadFrom(reader)
logs := buf.String()

olog, err := d.parseLogs(tId, strings.Split(logs, "\n"), metrics)
return olog, err
}

Expand Down
152 changes: 152 additions & 0 deletions pkg/webhook/v1alpha2/pod/inject_webhook.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
/*
Copyright 2019 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package pod

import (
"context"
"errors"
"net/http"

logf "sigs.k8s.io/controller-runtime/pkg/runtime/log"

"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/runtime/inject"
"sigs.k8s.io/controller-runtime/pkg/webhook/admission"
"sigs.k8s.io/controller-runtime/pkg/webhook/admission/types"

v1 "k8s.io/api/core/v1"
apitypes "k8s.io/apimachinery/pkg/types"

trialsv1alpha2 "github.com/kubeflow/katib/pkg/api/operators/apis/trial/v1alpha2"
katibmanagerv1alpha2 "github.com/kubeflow/katib/pkg/common/v1alpha2"
)

const (
// JobNameLabel represents the label key for the job name, the value is job name
JobNameLabel = "job-name"
// JobRoleLabel represents the label key for the job role, e.g. the value is master
JobRoleLabel = "job-role"
)

// For debug
var log = logf.Log.WithName("injector-webhook")

// sidecarInjector that inject metrics collect sidecar into master pod
type sidecarInjector struct {
client client.Client
decoder types.Decoder
managerService string
}

var _ admission.Handler = &sidecarInjector{}

func (s *sidecarInjector) Handle(ctx context.Context, req types.Request) types.Response {
pod := &v1.Pod{}
err := s.decoder.Decode(req, pod)
if err != nil {
return admission.ErrorResponse(http.StatusBadRequest, err)
}

// Check whether the pod need to be mutated
if !s.MutationRequired(pod) {
return admission.ValidationResponse(true, "")
}

// Get the namespace from req since the namespace in the pod is empty.
namespace := req.AdmissionRequest.Namespace
// Do mutation
mutatedPod, err := s.Mutate(pod, namespace)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

namespace can be get from pod object, so I think we can just use one argument instead, right?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

According to our experiment, we fail to get the namespace from pod.

if err != nil {
return admission.ErrorResponse(http.StatusBadRequest, err)
}

return admission.PatchResponse(pod, mutatedPod)
}

var _ inject.Client = &sidecarInjector{}

func (s *sidecarInjector) InjectClient(c client.Client) error {
s.client = c
return nil
}

var _ inject.Decoder = &sidecarInjector{}

func (s *sidecarInjector) InjectDecoder(d types.Decoder) error {
s.decoder = d
return nil
}

func NewSidecarInjector(c client.Client, ms string) *sidecarInjector {
return &sidecarInjector{
client: c,
managerService: ms,
}
}

func (s *sidecarInjector) MutationRequired(pod *v1.Pod) bool {
value, err := s.GetLabel(pod, JobRoleLabel)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it can handle tfjob and pytorchjob case, but not cover k8s Job case

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, it is just a PoC. In the future, we will support job

if err != nil || value != "master" {
return false
}
return true
}

func (s *sidecarInjector) GetLabel(pod *v1.Pod, targetLabel string) (string, error) {
labels := pod.Labels
for k, v := range labels {
if k == targetLabel {
return v, nil
}
}
return "", errors.New("Label " + targetLabel + " not found.")
}

func (s *sidecarInjector) Mutate(pod *v1.Pod, namespace string) (*v1.Pod, error) {
mutatedPod := pod.DeepCopy()

// Get the trial info from client
trialName, err := s.GetLabel(pod, JobNameLabel)
if err != nil {
return nil, err
}
trial := &trialsv1alpha2.Trial{}
err = s.client.Get(context.TODO(), apitypes.NamespacedName{Name: trialName, Namespace: namespace}, trial)
if err != nil {
return nil, err
}

metricName := trial.Spec.Objective.ObjectiveMetricName
for _, v := range trial.Spec.Objective.AdditionalMetricNames {
metricName += ";"
metricName += v
}

// Hard code container, inject metrics collector
injectContainer := v1.Container{
Name: "metrics-collector",
Image: "gcr.io/kubeflow-images-public/katib/v1alpha2/metrics-collector",
Command: []string{"./metricscollector"},
Args: []string{"-e", "TODO_Experiment", "-t", trialName, "-k", "TFJob", "-n", namespace, "-m", katibmanagerv1alpha2.GetManagerAddr(), "-mn", metricName},
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we can use "controller-name" label of Pod to get the "-k" value
we can get "-e" value, that is the experiment name, from trial's metadata ownerReferences field

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done, got "-k" and "-e" values. It seems pytorch job does not have "controller-name" label yet?

ImagePullPolicy: v1.PullIfNotPresent,
VolumeMounts: pod.Spec.Containers[0].VolumeMounts,
}
mutatedPod.Spec.Containers = append(mutatedPod.Spec.Containers, injectContainer)
mutatedPod.Spec.ServiceAccountName = pod.Spec.ServiceAccountName
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why we need line 149, line 120 cannot assign ServiceAccountName?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems that we cannot get it. There is a hack.

cc @wuchunghsuan


return mutatedPod, nil
}
15 changes: 14 additions & 1 deletion pkg/webhook/v1alpha2/webhook.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,10 @@ import (

experimentsv1alpha2 "github.com/kubeflow/katib/pkg/api/operators/apis/experiment/v1alpha2"
"github.com/kubeflow/katib/pkg/webhook/v1alpha2/experiment"
"github.com/kubeflow/katib/pkg/webhook/v1alpha2/pod"

admissionregistrationv1beta1 "k8s.io/api/admissionregistration/v1beta1"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"

"sigs.k8s.io/controller-runtime/pkg/manager"
Expand Down Expand Up @@ -84,5 +86,16 @@ func register(manager manager.Manager, server *webhook.Server) error {
if err != nil {
return err
}
return server.Register(mutatingWebhook, validatingWebhook)
injectWebhook, err := builder.NewWebhookBuilder().
Name("mutating.pod.kubeflow.org").
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think maybe we can rename it to one which is katib related

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, do you have any suggestion about the name?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should follow xx.$group.$domain style (group should be katib and domain is kubeflow.org here), we'd better make all others consistent with this style.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

SGTM, it is a historical problem. Can we do the change in another PR? I think we could keep the name here and update it after the PR is merged, with other changes.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sure

Mutating().
Operations(admissionregistrationv1beta1.Create, admissionregistrationv1beta1.Update).
WithManager(manager).
ForType(&v1.Pod{}).
Handlers(pod.NewSidecarInjector(manager.GetClient(), manager.GetConfig().Host)).
Build()
if err != nil {
return err
}
return server.Register(mutatingWebhook, validatingWebhook, injectWebhook)
}