Skip to content

Commit

Permalink
Refactor Log Parsing
Browse files Browse the repository at this point in the history
  • Loading branch information
tdmanv committed Aug 29, 2019
1 parent 9d0ab03 commit d49172c
Show file tree
Hide file tree
Showing 2 changed files with 78 additions and 16 deletions.
20 changes: 4 additions & 16 deletions pkg/function/kube_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@ import (
"k8s.io/client-go/kubernetes"

kanister "github.com/kanisterio/kanister/pkg"
"github.com/kanisterio/kanister/pkg/format"
"github.com/kanisterio/kanister/pkg/kube"
"github.com/kanisterio/kanister/pkg/output"
"github.com/kanisterio/kanister/pkg/param"
)

Expand Down Expand Up @@ -81,24 +81,12 @@ func kubeTaskPodFunc(cli kubernetes.Interface) func(ctx context.Context, pod *v1
if err != nil {
return nil, errors.Wrapf(err, "Failed to fetch logs from the pod")
}
defer r.Close()
logCh := format.LogStream(pod.Name, pod.Spec.Containers[0].Name, r)
out, err := output.ParseLog(ctx, r)
// Wait for pod completion
err = kube.WaitForPodCompletion(ctx, cli, pod.Namespace, pod.Name)
if err != nil {
if err := kube.WaitForPodCompletion(ctx, cli, pod.Namespace, pod.Name); err != nil {
return nil, errors.Wrapf(err, "Failed while waiting for Pod %s to complete", pod.Name)
}
out := make(map[string]interface{})
for l := range logCh {
op, err := parseLogLineForOutput(l)
if err != nil {
return nil, errors.Wrap(err, "Failed to parse phase output")
}
if op != nil {
out[op.Key] = op.Value
}
}
return out, nil
return out, err
}
}

Expand Down
74 changes: 74 additions & 0 deletions pkg/output/log.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
// Copyright 2019 The Kanister Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package output

import (
"bufio"
"context"
"io"
"regexp"
"strings"

"github.com/pkg/errors"
log "github.com/sirupsen/logrus"
)

const reStr = `###Phase-output###:(.*?)*$`

var logRE = regexp.MustCompile(reStr)

func parseLine(l string) (*Output, error) {
match := logRE.FindAllStringSubmatch(l, 1)
if len(match) == 0 {
return nil, nil
}
op, err := UnmarshalOutput(match[0][1])
if err != nil {
return nil, err
}
return op, nil
}

func ParseLog(ctx context.Context, r io.ReadCloser) (map[string]interface{}, error) {
// Call r.Close() if the context is canceled or if s.Scan() == false.
ctx, cancel := context.WithCancel(ctx)
defer cancel()
go func() {
<-ctx.Done()
_ = r.Close()
}()

// Scan log lines when ready.
s := bufio.NewScanner(r)
out := make(map[string]interface{})
for s.Scan() {
l := s.Text()
l = strings.TrimSpace(l)
if l == "" {
continue
}
// TODO(supriya): Add fields from ctx.
log.Info("Out: ", l)
o, err := parseLine(l)
if err != nil {
return nil, err
}
if o != nil {
out[o.Key] = o.Value
}
}
err := errors.Wrap(s.Err(), "Failed to parse output")
return out, err
}

0 comments on commit d49172c

Please sign in to comment.