From d49172cadb5350f82a6df08f7565ab6c847d1680 Mon Sep 17 00:00:00 2001 From: Tom Manville Date: Thu, 29 Aug 2019 09:54:59 -0700 Subject: [PATCH] Refactor Log Parsing --- pkg/function/kube_task.go | 20 +++-------- pkg/output/log.go | 74 +++++++++++++++++++++++++++++++++++++++ 2 files changed, 78 insertions(+), 16 deletions(-) create mode 100644 pkg/output/log.go diff --git a/pkg/function/kube_task.go b/pkg/function/kube_task.go index 170c1fcc71b..38c16e98cd5 100644 --- a/pkg/function/kube_task.go +++ b/pkg/function/kube_task.go @@ -22,8 +22,8 @@ import ( "k8s.io/client-go/kubernetes" kanister "github.com/kanisterio/kanister/pkg" - "github.com/kanisterio/kanister/pkg/format" "github.com/kanisterio/kanister/pkg/kube" + "github.com/kanisterio/kanister/pkg/output" "github.com/kanisterio/kanister/pkg/param" ) @@ -81,24 +81,12 @@ func kubeTaskPodFunc(cli kubernetes.Interface) func(ctx context.Context, pod *v1 if err != nil { return nil, errors.Wrapf(err, "Failed to fetch logs from the pod") } - defer r.Close() - logCh := format.LogStream(pod.Name, pod.Spec.Containers[0].Name, r) + out, err := output.ParseLog(ctx, r) // Wait for pod completion - err = kube.WaitForPodCompletion(ctx, cli, pod.Namespace, pod.Name) - if err != nil { + if err := kube.WaitForPodCompletion(ctx, cli, pod.Namespace, pod.Name); err != nil { return nil, errors.Wrapf(err, "Failed while waiting for Pod %s to complete", pod.Name) } - out := make(map[string]interface{}) - for l := range logCh { - op, err := parseLogLineForOutput(l) - if err != nil { - return nil, errors.Wrap(err, "Failed to parse phase output") - } - if op != nil { - out[op.Key] = op.Value - } - } - return out, nil + return out, err } } diff --git a/pkg/output/log.go b/pkg/output/log.go new file mode 100644 index 00000000000..73c5678a3c9 --- /dev/null +++ b/pkg/output/log.go @@ -0,0 +1,74 @@ +// Copyright 2019 The Kanister Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package output + +import ( + "bufio" + "context" + "io" + "regexp" + "strings" + + "github.com/pkg/errors" + log "github.com/sirupsen/logrus" +) + +const reStr = `###Phase-output###:(.*?)*$` + +var logRE = regexp.MustCompile(reStr) + +func parseLine(l string) (*Output, error) { + match := logRE.FindAllStringSubmatch(l, 1) + if len(match) == 0 { + return nil, nil + } + op, err := UnmarshalOutput(match[0][1]) + if err != nil { + return nil, err + } + return op, nil +} + +func ParseLog(ctx context.Context, r io.ReadCloser) (map[string]interface{}, error) { + // Call r.Close() if the context is canceled or if s.Scan() == false. + ctx, cancel := context.WithCancel(ctx) + defer cancel() + go func() { + <-ctx.Done() + _ = r.Close() + }() + + // Scan log lines when ready. + s := bufio.NewScanner(r) + out := make(map[string]interface{}) + for s.Scan() { + l := s.Text() + l = strings.TrimSpace(l) + if l == "" { + continue + } + // TODO(supriya): Add fields from ctx. + log.Info("Out: ", l) + o, err := parseLine(l) + if err != nil { + return nil, err + } + if o != nil { + out[o.Key] = o.Value + } + } + err := errors.Wrap(s.Err(), "Failed to parse output") + return out, err +}