Skip to content

Commit

Permalink
Merge pull request #228 from kanisterio/stream-log
Browse files Browse the repository at this point in the history
Stream logs from kube-task
  • Loading branch information
tdmanv authored Aug 23, 2019
2 parents 519ef06 + 2f6ce5e commit 81a1693
Show file tree
Hide file tree
Showing 5 changed files with 159 additions and 73 deletions.
29 changes: 26 additions & 3 deletions pkg/format/format.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package format

import (
"bufio"
"io"
"regexp"
"strings"

Expand All @@ -11,9 +13,30 @@ func Log(podName string, containerName string, output string) {
if output != "" {
logs := regexp.MustCompile("[\r\n]").Split(output, -1)
for _, l := range logs {
if strings.TrimSpace(l) != "" {
log.Info("Pod: ", podName, " Container: ", containerName, " Out: ", l)
}
info(podName, containerName, l)
}
}
}

func LogStream(podName string, containerName string, output io.ReadCloser) chan string {
logCh := make(chan string)
s := bufio.NewScanner(output)
go func() {
defer close(logCh)
for s.Scan() {
l := s.Text()
info(podName, containerName, l)
logCh <- l
}
if err := s.Err(); err != nil {
log.Error("Pod: ", podName, " Container: ", containerName, " Failed to stream log from pod: ", err.Error())
}
}()
return logCh
}

func info(podName string, containerName string, l string) {
if strings.TrimSpace(l) != "" {
log.Info("Pod: ", podName, " Container: ", containerName, " Out: ", l)
}
}
36 changes: 24 additions & 12 deletions pkg/function/kube_exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,19 +42,31 @@ func parseLogAndCreateOutput(out string) (map[string]interface{}, error) {
var op map[string]interface{}
logs := regexp.MustCompile("[\n]").Split(out, -1)
for _, l := range logs {
// Log should contain "###Phase-output###:" string
if strings.Contains(l, output.PhaseOpString) {
if op == nil {
op = make(map[string]interface{})
}
pattern := regexp.MustCompile(`###Phase-output###:(.*?)*$`)
match := pattern.FindAllStringSubmatch(l, 1)
opObj, err := output.UnmarshalOutput(match[0][1])
if err != nil {
return nil, err
}
op[opObj.Key] = opObj.Value
opObj, err := parseLogLineForOutput(l)
if err != nil {
return nil, err
}
if opObj == nil {
continue
}
if op == nil {
op = make(map[string]interface{})
}
op[opObj.Key] = opObj.Value
}
return op, nil
}

var outputRE = regexp.MustCompile(`###Phase-output###:(.*?)*$`)

func parseLogLineForOutput(l string) (*output.Output, error) {
if !strings.Contains(l, output.PhaseOpString) {
return nil, nil
}
match := outputRE.FindAllStringSubmatch(l, 1)
op, err := output.UnmarshalOutput(match[0][1])
if err != nil {
return nil, err
}
return op, nil
}
Expand Down
26 changes: 20 additions & 6 deletions pkg/function/kube_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,18 +59,32 @@ func kubeTask(ctx context.Context, cli kubernetes.Interface, namespace, image st

func kubeTaskPodFunc(cli kubernetes.Interface) func(ctx context.Context, pod *v1.Pod) (map[string]interface{}, error) {
return func(ctx context.Context, pod *v1.Pod) (map[string]interface{}, error) {
// Wait for pod completion
if err := kube.WaitForPodCompletion(ctx, cli, pod.Namespace, pod.Name); err != nil {
if err := kube.WaitForPodReady(ctx, cli, pod.Namespace, pod.Name); err != nil {
return nil, errors.Wrapf(err, "Failed while waiting for Pod %s to complete", pod.Name)
}
// Fetch logs from the pod
logs, err := kube.GetPodLogs(ctx, cli, pod.Namespace, pod.Name)
r, err := kube.StreamPodLogs(ctx, cli, pod.Namespace, pod.Name)
if err != nil {
return nil, errors.Wrapf(err, "Failed to fetch logs from the pod")
}
format.Log(pod.Name, pod.Spec.Containers[0].Name, logs)
out, err := parseLogAndCreateOutput(logs)
return out, errors.Wrap(err, "Failed to parse phase output")
defer r.Close()
logCh := format.LogStream(pod.Name, pod.Spec.Containers[0].Name, r)
// Wait for pod completion
err = kube.WaitForPodCompletion(ctx, cli, pod.Namespace, pod.Name)
if err != nil {
return nil, errors.Wrapf(err, "Failed while waiting for Pod %s to complete", pod.Name)
}
out := make(map[string]interface{})
for l := range logCh {
op, err := parseLogLineForOutput(l)
if err != nil {
return nil, errors.Wrap(err, "Failed to parse phase output")
}
if op != nil {
out[op.Key] = op.Value
}
}
return out, nil
}
}

Expand Down
113 changes: 74 additions & 39 deletions pkg/function/kube_task_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,61 +47,96 @@ func (s *KubeTaskSuite) TearDownSuite(c *C) {
}
}

func newTaskBlueprint(namespace string) *crv1alpha1.Blueprint {
func outputPhase(namespace string) crv1alpha1.BlueprintPhase {
return crv1alpha1.BlueprintPhase{
Name: "testOutput",
Func: "KubeTask",
Args: map[string]interface{}{
KubeTaskNamespaceArg: namespace,
KubeTaskImageArg: "kanisterio/kanister-tools:0.20.0",
KubeTaskCommandArg: []string{
"sh",
"-c",
"kando output version 0.20.0",
},
},
}
}

func sleepPhase(namespace string) crv1alpha1.BlueprintPhase {
return crv1alpha1.BlueprintPhase{
Name: "testSleep",
Func: "KubeTask",
Args: map[string]interface{}{
KubeTaskNamespaceArg: namespace,
KubeTaskImageArg: "ubuntu:latest",
KubeTaskCommandArg: []string{
"sleep",
"2",
},
},
}
}

func tickPhase(namespace string) crv1alpha1.BlueprintPhase {
return crv1alpha1.BlueprintPhase{
Name: "testTick",
Func: "KubeTask",
Args: map[string]interface{}{
KubeTaskNamespaceArg: namespace,
KubeTaskImageArg: "alpine:3.10",
KubeTaskCommandArg: []string{
"sh",
"-c",
`for i in $(seq 3); do echo Tick: "${i}"; sleep 1; done`,
},
},
}
}

func newTaskBlueprint(phases ...crv1alpha1.BlueprintPhase) *crv1alpha1.Blueprint {
return &crv1alpha1.Blueprint{
Actions: map[string]*crv1alpha1.BlueprintAction{
"test": {
Kind: "StatefulSet",
Phases: []crv1alpha1.BlueprintPhase{
{
Name: "testOutput",
Func: "KubeTask",
Args: map[string]interface{}{
KubeTaskNamespaceArg: namespace,
KubeTaskImageArg: "kanisterio/kanister-tools:0.20.0",
KubeTaskCommandArg: []string{
"sh",
"-c",
"kando output version 0.20.0",
},
},
},
{
Name: "testSleep",
Func: "KubeTask",
Args: map[string]interface{}{
KubeTaskNamespaceArg: namespace,
KubeTaskImageArg: "ubuntu:latest",
KubeTaskCommandArg: []string{
"sleep",
"2",
},
},
},
},
Kind: "StatefulSet",
Phases: phases,
},
},
}
}

func (s *KubeTaskSuite) TestKubeTask(c *C) {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Minute)
defer cancel()
tp := param.TemplateParams{
StatefulSet: &param.StatefulSetParams{
Namespace: s.namespace,
},
}

action := "test"
bp := newTaskBlueprint(s.namespace)
phases, err := kanister.GetPhases(*bp, action, tp)
c.Assert(err, IsNil)
for _, p := range phases {
out, err := p.Exec(ctx, *bp, action, tp)
c.Assert(err, IsNil, Commentf("Phase %s failed", p.Name()))
if out != nil {
c.Assert(out["version"], NotNil)
for _, tc := range []struct {
bp *crv1alpha1.Blueprint
outs []map[string]interface{}
}{
{
bp: newTaskBlueprint(outputPhase(s.namespace), sleepPhase(s.namespace), tickPhase(s.namespace)),
outs: []map[string]interface{}{
map[string]interface{}{
"version": "0.20.0",
},
map[string]interface{}{},
map[string]interface{}{},
},
},
} {

phases, err := kanister.GetPhases(*tc.bp, action, tp)
c.Assert(err, IsNil)
c.Assert(phases, HasLen, len(tc.outs))
for i, p := range phases {
out, err := p.Exec(ctx, *tc.bp, action, tp)
c.Assert(err, IsNil, Commentf("Phase %s failed", p.Name()))
c.Assert(out, DeepEquals, tc.outs[i])
}
}
}
28 changes: 15 additions & 13 deletions pkg/kube/pod.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package kube

import (
"context"
"io"
"io/ioutil"

"github.com/pkg/errors"
Expand Down Expand Up @@ -64,6 +65,13 @@ func DeletePod(ctx context.Context, cli kubernetes.Interface, pod *v1.Pod) error
return nil
}

func StreamPodLogs(ctx context.Context, cli kubernetes.Interface, namespace, name string) (io.ReadCloser, error) {
plo := &v1.PodLogOptions{
Follow: true,
}
return cli.CoreV1().Pods(namespace).GetLogs(name, plo).Stream()
}

// GetPodLogs fetches the logs from the given pod
func GetPodLogs(ctx context.Context, cli kubernetes.Interface, namespace, name string) (string, error) {
reader, err := cli.CoreV1().Pods(namespace).GetLogs(name, &v1.PodLogOptions{}).Stream()
Expand All @@ -78,18 +86,15 @@ func GetPodLogs(ctx context.Context, cli kubernetes.Interface, namespace, name s
return string(bytes), nil
}

// WaitForPodReady waits for a pod to reach Running state
// WaitForPodReady waits for a pod to exit the pending state
func WaitForPodReady(ctx context.Context, cli kubernetes.Interface, namespace, name string) error {
err := poll.Wait(ctx, func(ctx context.Context) (bool, error) {
p, err := cli.CoreV1().Pods(namespace).Get(name, metav1.GetOptions{})
if err != nil {
return false, err
}
return (p.Status.Phase == v1.PodRunning), nil
return p.Status.Phase != v1.PodPending && p.Status.Phase != "", nil
})
if err == nil {
return nil
}
return errors.Wrapf(err, "Pod did not transition into running state. Namespace:%s, Name:%s", namespace, name)
}

Expand All @@ -100,20 +105,17 @@ func WaitForPodCompletion(ctx context.Context, cli kubernetes.Interface, namespa
if err != nil {
return true, err
}
if p.Status.Phase == v1.PodFailed {
return false, errors.Errorf("Pod %s failed", name)
}
if p.Status.Phase == v1.PodRunning {
switch p.Status.Phase {
case v1.PodRunning:
for _, con := range p.Status.ContainerStatuses {
if con.State.Terminated != nil {
return false, errors.Errorf("Container %v is terminated, while Pod %v is Running", con.Name, name)
}
}
case v1.PodFailed:
return false, errors.Errorf("Pod %s failed", name)
}
return (p.Status.Phase == v1.PodSucceeded), nil
return p.Status.Phase == v1.PodSucceeded, nil
})
if err == nil {
return nil
}
return errors.Wrap(err, "Pod did not transition into complete state")
}

0 comments on commit 81a1693

Please sign in to comment.