From 197e51a5a39c0733048eec63bbcbf2796b5ac91d Mon Sep 17 00:00:00 2001 From: Tom Manville Date: Thu, 29 Aug 2019 09:54:59 -0700 Subject: [PATCH] Refactor Log Parsing --- pkg/function/kube_exec.go | 17 +-------- pkg/function/kube_task.go | 23 ++++-------- pkg/output/output.go | 26 +++++++++++++- pkg/output/output_test.go | 39 +++++++++++++++++++++ pkg/output/stream.go | 73 +++++++++++++++++++++++++++++++++++++++ 5 files changed, 145 insertions(+), 33 deletions(-) create mode 100644 pkg/output/stream.go diff --git a/pkg/function/kube_exec.go b/pkg/function/kube_exec.go index 3a46e5bc6c..e31259c57b 100644 --- a/pkg/function/kube_exec.go +++ b/pkg/function/kube_exec.go @@ -17,7 +17,6 @@ package function import ( "context" "regexp" - "strings" "github.com/pkg/errors" @@ -56,7 +55,7 @@ func parseLogAndCreateOutput(out string) (map[string]interface{}, error) { var op map[string]interface{} logs := regexp.MustCompile("[\n]").Split(out, -1) for _, l := range logs { - opObj, err := parseLogLineForOutput(l) + opObj, err := output.Parse(l) if err != nil { return nil, err } @@ -71,20 +70,6 @@ func parseLogAndCreateOutput(out string) (map[string]interface{}, error) { return op, nil } -var outputRE = regexp.MustCompile(`###Phase-output###:(.*?)*$`) - -func parseLogLineForOutput(l string) (*output.Output, error) { - if !strings.Contains(l, output.PhaseOpString) { - return nil, nil - } - match := outputRE.FindAllStringSubmatch(l, 1) - op, err := output.UnmarshalOutput(match[0][1]) - if err != nil { - return nil, err - } - return op, nil -} - func (kef *kubeExecFunc) Exec(ctx context.Context, tp param.TemplateParams, args map[string]interface{}) (map[string]interface{}, error) { cli, err := kube.NewClient() if err != nil { diff --git a/pkg/function/kube_task.go b/pkg/function/kube_task.go index 170c1fcc71..619ba18172 100644 --- a/pkg/function/kube_task.go +++ b/pkg/function/kube_task.go @@ -22,8 +22,8 @@ import ( "k8s.io/client-go/kubernetes" kanister "github.com/kanisterio/kanister/pkg" - "github.com/kanisterio/kanister/pkg/format" "github.com/kanisterio/kanister/pkg/kube" + "github.com/kanisterio/kanister/pkg/output" "github.com/kanisterio/kanister/pkg/param" ) @@ -81,24 +81,15 @@ func kubeTaskPodFunc(cli kubernetes.Interface) func(ctx context.Context, pod *v1 if err != nil { return nil, errors.Wrapf(err, "Failed to fetch logs from the pod") } - defer r.Close() - logCh := format.LogStream(pod.Name, pod.Spec.Containers[0].Name, r) - // Wait for pod completion - err = kube.WaitForPodCompletion(ctx, cli, pod.Namespace, pod.Name) + out, err := output.LogAndParse(ctx, r) if err != nil { - return nil, errors.Wrapf(err, "Failed while waiting for Pod %s to complete", pod.Name) + return nil, err } - out := make(map[string]interface{}) - for l := range logCh { - op, err := parseLogLineForOutput(l) - if err != nil { - return nil, errors.Wrap(err, "Failed to parse phase output") - } - if op != nil { - out[op.Key] = op.Value - } + // Wait for pod completion + if err := kube.WaitForPodCompletion(ctx, cli, pod.Namespace, pod.Name); err != nil { + return nil, errors.Wrapf(err, "Failed while waiting for Pod %s to complete", pod.Name) } - return out, nil + return out, err } } diff --git a/pkg/output/output.go b/pkg/output/output.go index 617c07d315..1d8ca40d4a 100644 --- a/pkg/output/output.go +++ b/pkg/output/output.go @@ -17,7 +17,10 @@ package output import ( "encoding/json" "fmt" + "io" + "os" "regexp" + "strings" "github.com/pkg/errors" ) @@ -66,10 +69,31 @@ func ValidateKey(key string) error { // PrintOutput runs the `kando output` command func PrintOutput(key, value string) error { + return fPrintOutput(os.Stdout, key, value) +} + +func fPrintOutput(w io.Writer, key, value string) error { outString, err := marshalOutput(key, value) if err != nil { return err } - fmt.Println(PhaseOpString, outString) + fmt.Fprintln(w, PhaseOpString, outString) return nil } + +const reStr = PhaseOpString + `(.*)$` + +var logRE = regexp.MustCompile(reStr) + +func Parse(l string) (*Output, error) { + l = strings.TrimSpace(l) + match := logRE.FindAllStringSubmatch(l, 1) + if len(match) == 0 { + return nil, nil + } + op, err := UnmarshalOutput(match[0][1]) + if err != nil { + return nil, err + } + return op, nil +} diff --git a/pkg/output/output_test.go b/pkg/output/output_test.go index a63e681030..0f37a13698 100644 --- a/pkg/output/output_test.go +++ b/pkg/output/output_test.go @@ -15,6 +15,7 @@ package output import ( + "bytes" "testing" . "gopkg.in/check.v1" @@ -43,3 +44,41 @@ func (s *OutputSuite) TestValidateKey(c *C) { c.Check(err, tc.checker, Commentf("Key (%s) failed!", tc.key)) } } + +func (s *OutputSuite) TestParseValid(c *C) { + key, val := "foo", "bar" + b := bytes.NewBuffer(nil) + err := fPrintOutput(b, key, val) + c.Check(err, IsNil) + + o, err := Parse(b.String()) + c.Assert(err, IsNil) + c.Assert(o, NotNil) + c.Assert(o.Key, Equals, key) + c.Assert(o.Value, Equals, val) +} + +func (s *OutputSuite) TestParseNoOutput(c *C) { + key, val := "foo", "bar" + b := bytes.NewBuffer(nil) + err := fPrintOutput(b, key, val) + c.Check(err, IsNil) + valid := b.String() + for _, tc := range []struct { + s string + checker Checker + }{ + { + s: valid[0 : len(valid)-2], + checker: NotNil, + }, + { + s: valid[1 : len(valid)-1], + checker: IsNil, + }, + } { + o, err := Parse(tc.s) + c.Assert(err, tc.checker) + c.Assert(o, IsNil) + } +} diff --git a/pkg/output/stream.go b/pkg/output/stream.go new file mode 100644 index 0000000000..bdb0f61af0 --- /dev/null +++ b/pkg/output/stream.go @@ -0,0 +1,73 @@ +// Copyright 2019 The Kanister Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package output + +import ( + "bufio" + "context" + "io" + "strings" + + "github.com/pkg/errors" + log "github.com/sirupsen/logrus" +) + +func splitLines(ctx context.Context, r io.ReadCloser, f func(context.Context, string) error) error { + // Call r.Close() if the context is canceled or if s.Scan() == false. + ctx, cancel := context.WithCancel(ctx) + defer cancel() + go func() { + <-ctx.Done() + _ = r.Close() + }() + + // Scan log lines when ready. + s := bufio.NewScanner(r) + for s.Scan() { + l := s.Text() + l = strings.TrimSpace(l) + if l == "" { + continue + } + if err := f(ctx, l); err != nil { + return err + } + } + return errors.Wrap(s.Err(), "Split lines failed") +} + +func LogAndParse(ctx context.Context, r io.ReadCloser) (map[string]interface{}, error) { + out := make(map[string]interface{}) + err := splitLines(ctx, r, func(ctx context.Context, l string) error { + log.Info("Pod Out:", l) + o, err := Parse(l) + if err != nil { + return err + } + if o != nil { + out[o.Key] = o.Value + } + return nil + }) + return out, err +} + +func Log(ctx context.Context, r io.ReadCloser) error { + err := splitLines(ctx, r, func(ctx context.Context, l string) error { + log.Info("Pod Out:", l) + return nil + }) + return err +}