From 12608c573fc6734b93ef88d3b386bac0a2b720d7 Mon Sep 17 00:00:00 2001 From: "Wu, ChungHsuan" Date: Wed, 28 Aug 2019 11:48:45 +0800 Subject: [PATCH] Separate the sidecar metrics collector --- .../v1alpha2/Dockerfile | 20 +++ cmd/sidecar-metricscollector/v1alpha2/main.go | 87 +++++++++++++ .../metricscollector/metricscollector.go | 20 ++- .../sidecarmetricscollector.go | 117 ++++++++++++++++++ pkg/webhook/v1alpha2/pod/inject_webhook.go | 6 +- 5 files changed, 235 insertions(+), 15 deletions(-) create mode 100644 cmd/sidecar-metricscollector/v1alpha2/Dockerfile create mode 100644 cmd/sidecar-metricscollector/v1alpha2/main.go create mode 100644 pkg/util/v1alpha2/sidecarmetricscollector/sidecarmetricscollector.go diff --git a/cmd/sidecar-metricscollector/v1alpha2/Dockerfile b/cmd/sidecar-metricscollector/v1alpha2/Dockerfile new file mode 100644 index 00000000000..0579c25a1e9 --- /dev/null +++ b/cmd/sidecar-metricscollector/v1alpha2/Dockerfile @@ -0,0 +1,20 @@ +# Build the manager binary +FROM golang:alpine AS build-env + +# Copy in the go src +ADD . /go/src/github.com/kubeflow/katib + +WORKDIR /go/src/github.com/kubeflow/katib/cmd/sidecar-metricscollector + +# Build +RUN if [ "$(uname -m)" = "ppc64le" ]; then \ + CGO_ENABLED=0 GOOS=linux GOARCH=ppc64le go build -a -o sidecar-metricscollector ./v1alpha2; \ + else \ + CGO_ENABLED=0 GOOS=linux GOARCH=amd64 go build -a -o sidecar-metricscollector ./v1alpha2; \ + fi + +# Copy the controller-manager into a thin image +FROM alpine:3.7 +WORKDIR /app +COPY --from=build-env /go/src/github.com/kubeflow/katib/cmd/sidecar-metricscollector/sidecar-metricscollector . +ENTRYPOINT ["./sidecar-metricscollector"] diff --git a/cmd/sidecar-metricscollector/v1alpha2/main.go b/cmd/sidecar-metricscollector/v1alpha2/main.go new file mode 100644 index 00000000000..298f9cd0356 --- /dev/null +++ b/cmd/sidecar-metricscollector/v1alpha2/main.go @@ -0,0 +1,87 @@ +/* +Copyright 2018 The Kubeflow Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +/* +MetricsCollector is a default metricscollector for worker. +It will collect metrics from pod log. +You should print metrics in {{MetricsName}}={{MetricsValue}} format. +For example, the objective value name is F1 and the metrics are loss, your training code should print like below. + --- + epoch 1: + batch1 loss=0.8 + batch2 loss=0.6 + + F1=0.4 + + epoch 2: + batch1 loss=0.4 + batch2 loss=0.2 + + F1=0.7 + --- +The metrics collector will collect all logs of metrics. +*/ + +package main + +import ( + "context" + "flag" + "strings" + + "google.golang.org/grpc" + "k8s.io/klog" + + api "github.com/kubeflow/katib/pkg/api/v1alpha2" + "github.com/kubeflow/katib/pkg/util/v1alpha2/sidecarmetricscollector" +) + +var experimentName = flag.String("e", "", "Experiment Name") +var trialName = flag.String("t", "", "Trial Name") +var jobKind = flag.String("k", "", "Job Kind") +var namespace = flag.String("n", "", "NameSpace") +var managerService = flag.String("m", "", "Katib Manager service") +var metricNames = flag.String("mn", "", "Metric names") + +func main() { + flag.Parse() + klog.Infof("Experiment Name: %s, Trial Name: %s, Job Kind: %s", *experimentName, *trialName, *jobKind) + conn, err := grpc.Dial(*managerService, grpc.WithInsecure()) + if err != nil { + klog.Fatalf("could not connect: %v", err) + } + defer conn.Close() + c := api.NewManagerClient(conn) + mc, err := sidecarmetricscollector.NewSidecarMetricsCollector() + if err != nil { + klog.Fatalf("Failed to create MetricsCollector: %v", err) + } + ctx := context.Background() + olog, err := mc.CollectObservationLog(*trialName, *jobKind, strings.Split(*metricNames, ";"), *namespace) + if err != nil { + klog.Fatalf("Failed to collect logs: %v", err) + } + reportreq := &api.ReportObservationLogRequest{ + TrialName: *trialName, + ObservationLog: olog, + } + _, err = c.ReportObservationLog(ctx, reportreq) + if err != nil { + klog.Fatalf("Failed to Report logs: %v", err) + } + klog.Infof("Metrics reported. :\n%v", olog) + return +} diff --git a/pkg/util/v1alpha2/metricscollector/metricscollector.go b/pkg/util/v1alpha2/metricscollector/metricscollector.go index e5a4d5f8041..f10b5df4f1d 100644 --- a/pkg/util/v1alpha2/metricscollector/metricscollector.go +++ b/pkg/util/v1alpha2/metricscollector/metricscollector.go @@ -1,7 +1,6 @@ package metricscollector import ( - "bytes" "errors" "fmt" "strings" @@ -46,18 +45,15 @@ func (d *MetricsCollector) CollectObservationLog(tId string, jobKind string, met if len(pl.Items) == 0 { return nil, fmt.Errorf("No Pods are found in Trial %v", tId) } - logopt := apiv1.PodLogOptions{Container: "tensorflow", Timestamps: true, Follow: true} - reader, err := d.clientset.CoreV1().Pods(namespace).GetLogs(pl.Items[0].ObjectMeta.Name, &logopt).Stream() - for err != nil { - klog.Errorf("Retry to get logs, Error: %v", err) - time.Sleep(time.Duration(1) * time.Second) - reader, err = d.clientset.CoreV1().Pods(namespace).GetLogs(pl.Items[0].ObjectMeta.Name, &logopt).Stream() + logopt := apiv1.PodLogOptions{Timestamps: true} + logs, err := d.clientset.CoreV1().Pods(namespace).GetLogs(pl.Items[0].ObjectMeta.Name, &logopt).Do().Raw() + if err != nil { + return nil, err } - buf := new(bytes.Buffer) - buf.ReadFrom(reader) - logs := buf.String() - - olog, err := d.parseLogs(tId, strings.Split(logs, "\n"), metrics) + if len(logs) == 0 { + return &v1alpha2.ObservationLog{}, nil + } + olog, err := d.parseLogs(tId, strings.Split(string(logs), "\n"), metrics) return olog, err } diff --git a/pkg/util/v1alpha2/sidecarmetricscollector/sidecarmetricscollector.go b/pkg/util/v1alpha2/sidecarmetricscollector/sidecarmetricscollector.go new file mode 100644 index 00000000000..e37152c103d --- /dev/null +++ b/pkg/util/v1alpha2/sidecarmetricscollector/sidecarmetricscollector.go @@ -0,0 +1,117 @@ +package sidecarmetricscollector + +import ( + "bytes" + "errors" + "fmt" + "strings" + "time" + + apiv1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/client-go/kubernetes" + "k8s.io/klog" + "sigs.k8s.io/controller-runtime/pkg/client/config" + + v1alpha2 "github.com/kubeflow/katib/pkg/api/v1alpha2" + commonv1alpha2 "github.com/kubeflow/katib/pkg/common/v1alpha2" +) + +type SidecarMetricsCollector struct { + clientset *kubernetes.Clientset +} + +func NewSidecarMetricsCollector() (*SidecarMetricsCollector, error) { + config, err := config.GetConfig() + if err != nil { + return nil, err + } + clientset, err := kubernetes.NewForConfig(config) + if err != nil { + return nil, err + } + return &SidecarMetricsCollector{ + clientset: clientset, + }, nil + +} + +func (d *SidecarMetricsCollector) CollectObservationLog(tId string, jobKind string, metrics []string, namespace string) (*v1alpha2.ObservationLog, error) { + labelMap := commonv1alpha2.GetJobLabelMap(jobKind, tId) + pl, err := d.clientset.CoreV1().Pods(namespace).List(metav1.ListOptions{LabelSelector: labels.Set(labelMap).String(), IncludeUninitialized: true}) + if err != nil { + return nil, err + } + if len(pl.Items) == 0 { + return nil, fmt.Errorf("No Pods are found in Trial %v", tId) + } + logopt := apiv1.PodLogOptions{Container: "tensorflow", Timestamps: true, Follow: true} + reader, err := d.clientset.CoreV1().Pods(namespace).GetLogs(pl.Items[0].ObjectMeta.Name, &logopt).Stream() + for err != nil { + klog.Errorf("Retry to get logs, Error: %v", err) + time.Sleep(time.Duration(1) * time.Second) + reader, err = d.clientset.CoreV1().Pods(namespace).GetLogs(pl.Items[0].ObjectMeta.Name, &logopt).Stream() + } + buf := new(bytes.Buffer) + buf.ReadFrom(reader) + logs := buf.String() + + olog, err := d.parseLogs(tId, strings.Split(logs, "\n"), metrics) + return olog, err +} + +func (d *SidecarMetricsCollector) parseLogs(tId string, logs []string, metrics []string) (*v1alpha2.ObservationLog, error) { + var lasterr error + olog := &v1alpha2.ObservationLog{} + mlogs := []*v1alpha2.MetricLog{} + for _, logline := range logs { + if logline == "" { + continue + } + ls := strings.SplitN(logline, " ", 2) + if len(ls) != 2 { + klog.Errorf("Error parsing log: %s", logline) + lasterr = errors.New("Error parsing log") + continue + } + _, err := time.Parse(time.RFC3339Nano, ls[0]) + if err != nil { + klog.Errorf("Error parsing time %s: %v", ls[0], err) + lasterr = err + continue + } + kvpairs := strings.Fields(ls[1]) + for _, kv := range kvpairs { + v := strings.Split(kv, "=") + if len(v) > 2 { + klog.Infof("Ignoring trailing garbage: %s", kv) + } + if len(v) == 1 { + continue + } + metricName := "" + for _, m := range metrics { + if v[0] == m { + metricName = v[0] + } + } + if metricName == "" { + continue + } + timestamp := ls[0] + mlogs = append(mlogs, &v1alpha2.MetricLog{ + TimeStamp: timestamp, + Metric: &v1alpha2.Metric{ + Name: metricName, + Value: v[1], + }, + }) + } + } + olog.MetricLogs = mlogs + if lasterr != nil { + return olog, lasterr + } + return olog, nil +} diff --git a/pkg/webhook/v1alpha2/pod/inject_webhook.go b/pkg/webhook/v1alpha2/pod/inject_webhook.go index 4b269f5d9d4..e1dcc6c01b3 100644 --- a/pkg/webhook/v1alpha2/pod/inject_webhook.go +++ b/pkg/webhook/v1alpha2/pod/inject_webhook.go @@ -163,9 +163,9 @@ func (s *sidecarInjector) Mutate(pod *v1.Pod, namespace string) (*v1.Pod, error) // Hard code container, inject metrics collector injectContainer := v1.Container{ - Name: "metrics-collector", - Image: "gcr.io/kubeflow-images-public/katib/v1alpha2/metrics-collector", - Command: []string{"./metricscollector"}, + Name: "sidecar-metrics-collector", + Image: "gcr.io/kubeflow-images-public/katib/v1alpha2/sidecar-metrics-collector", + Command: []string{"./sidecar-metricscollector"}, Args: []string{"-e", experimentName, "-t", trialName, "-k", kind, "-n", namespace, "-m", katibmanagerv1alpha2.GetManagerAddr(), "-mn", metricName}, ImagePullPolicy: v1.PullIfNotPresent, VolumeMounts: pod.Spec.Containers[0].VolumeMounts,